Source code for mutis.signal

# Licensed under a 3-clause BSD style license - see LICENSE
"""Synthetic generation of light curves."""

import logging

import matplotlib.pyplot as plt
import numpy as np
import scipy.optimize as scipy_optimize
import scipy.signal as scipy_signal
import scipy.special as scipy_special
import scipy.stats as scipy_stats
from matplotlib.offsetbox import AnchoredText

from mutis.lib.signal import *

__all__ = ["Signal"]

log = logging.getLogger(__name__)


[docs]class Signal: """Analysis and generation of a signal. Description goes here. Parameters ---------- times : :class:`numpy.ndarray` or :class:`pandas.Series` Values of the time axis. values : :class:`numpy.ndarray` or :class:`pandas.Series` Values of the signal axis. fgen : :class:`str` Method used to generate the synthetic signal. """ def __init__(self, times, values, dvalues=None, fgen=None): self.times = np.array(times) self.values = np.array(values) self.fgen = fgen self.dvalues = np.array(dvalues) if dvalues is not None else None self.synth = None # TODO make attributes below specific of OU method / not the entire class self.OU_theta = None self.OU_mu = None self.OU_sigma = None
[docs] def plot(self, ax=None): if ax is None: ax = plt.gca() return ax.plot(self.times, self.values, ".-", lw=1, alpha=0.7)
[docs] def gen_synth(self, samples): """Description goes here.""" self.synth = np.empty((samples, self.times.size)) for n in range(samples): self.synth[n] = fgen_wrapper(fgen=self.fgen, t=self.times, y=self.values, fgen_params={'theta':self.OU_theta, 'mu':self.OU_mu, 'sigma':self.OU_sigma})
[docs] def OU_fit(self, bins=None, rang=None, a=1e-5, b=100): """Fit the signal to an OU stochastic process, using several statistical approaches. This function tries to fit the signal to an OU stochastic process using both basic curve fitting and the Maximum Likelihood Estimation method, and returns some plots of the signals and its properties, and the estimated parameters. """ # TODO: make a generic fit method res = dict() y = self.values t = self.times dy = np.diff(y) dt = np.diff(t) # estimate sigma try: sigma_est = (np.nanmean(dy ** 2 / y[:-1] ** 2 / dt)) ** 0.5 res["sigma_est"] = sigma_est except Exception as e: log.error(f"Could not estimate sigma: {e}") # plot histogram if bins is None: bins = np.int(y.size ** 0.5 / 1.5) # bins='auto' if rang is None: rang = (np.percentile(y, 0), np.percentile(y, 99)) p, x = np.histogram(y, density=True, bins=bins, range=rang) # bins='sqrt') x = (x + np.roll(x, -1))[:-1] / 2.0 plt.subplots() plt.hist(y, density=True, alpha=0.75, bins=bins, range=rang) plt.plot(x, p, "r-", alpha=0.5) anchored_text = AnchoredText( f"mean {np.mean(y):.2g} \n " f"median {np.median(y):.2g} \n " f"mode {scipy_stats.mode(y)[0][0]:.2g} \n " f"std {np.std(y):.2g} \n " f"var {np.var(y):.2g}", loc="upper right", ) plt.gca().add_artist(anchored_text) # curve_fit try: popt, pcov = scipy_optimize.curve_fit(f=self.pdf, xdata=x, ydata=p) # print('curve_fit: (l, mu)') # print('popt: ') # print(popt) # print('pcov: ') # print(np.sqrt(np.diag(pcov))) x_c = np.linspace(1e-5, 1.1 * np.max(x), 1000) plt.plot(x_c, self.pdf(x_c, *popt), "k-", label="curve_fit", alpha=0.8) res["curve_fit"] = (popt, np.sqrt(np.diag(pcov))) except Exception as e: log.error(f"Some error fitting with curve_fit {e}") # fit pdf with MLE # TODO: place outside as a helper class # mu here is different than self.mu class OU(scipy_stats.rv_continuous): def _pdf(self, x, l, mu): return ( (l * mu) ** (1 + l) / scipy_special.gamma(1 + l) * np.exp(-l * mu / x) / x ** (l + 2) ) try: fit = OU(a=a, b=np.percentile(y, b)).fit(y, 1, 1, floc=0, fscale=1) # print('MLE fit: (l, mu)') # print(fit) x_c = np.linspace(0, 1.1 * np.max(x), 1000) plt.plot(x_c, self.pdf(x_c, fit[0], fit[1]), "k-.", label="MLE", alpha=0.8) res["MLE_fit"] = fit[:-2] except Exception as e: log.error(f"Some error fitting with MLE {e}") plt.legend(loc="lower right") # plt.show() # estimate theta (from curve_fit) try: res["th_est1"] = fit[0] * sigma_est ** 2 / 2 except NameError as e: res["th_est1"] = None # estimate theta (from MLE) try: res["th_est2"] = popt[0] * sigma_est ** 2 / 2 except NameError as e: res["th_est2"] = None return res
[docs] def check_gen(self, fgen=None, fgen_params=None, fpsd="lombscargle", **axes): """Check the generation of synthetic signals. This function checks the generation of a synthetic light curve. Parameters: ----------- fgen: :str: A valid fgen method name. fgen_params: :dict: Parameters for fgen (e.g. for fgen='lc_gen_ou', a dict containing values for theta, mu and sigma). Returns: -------- axes: :tuple: Tuple of three axes, on which: - The first plot show both the original signal and the synthetic one. - The second plot shows the histogram of the values taken by both signals. - The third plot shows their PSD. """ t, y = self.times, self.values y2 = fgen_wrapper(fgen=fgen, t=t, y=y, fgen_params=fgen_params) print(f"Original vs Synthethic:", f"mean: {np.mean(y)} / {np.mean(y2)}", f"std: {np.std(y)} / {np.std(y2)}", sep='\n') if ("ax1" not in axes) or ("ax2" not in axes) or ("ax3" not in axes): fig, (axes["ax1"], axes["ax2"], axes["ax3"]) = plt.subplots( nrows=1, ncols=3, figsize=(20, 4) ) # plot the two signals axes["ax1"].plot(t, y, "b-", label="orig", lw=0.5, alpha=0.8) # ax1p = ax1.twinx() axes["ax1"].plot(t, y2, "r-", label="gen", lw=0.5, alpha=0.8) axes["ax1"].set_title("light curves") # plot their histogram bins = "auto" # bins = np.int(y.size**0.5/1.5) # rang = (np.percentile(y, 0), np.percentile(y, 99)) axes["ax2"].hist(y, density=True, color="b", alpha=0.4, bins=bins, range=rang) # ax2p = ax2.twinx() bins = "auto" # bins = np.int(y.size**0.5/1.5) # rang = (np.percentile(y2, 0), np.percentile(y2, 99)) axes["ax2"].hist(y2, density=True, color="r", alpha=0.4, bins=bins, range=rang) axes["ax2"].set_title("pdf") # plot their PSD if fpsd == "lombscargle": k = np.linspace(1e-3, self.values.size / 2, self.values.size // 2) freqs = k / 2 / np.pi pxx = scipy_signal.lombscargle(t, y, freqs, normalize=True) axes["ax3"].plot(freqs, pxx, "b-", lw=1, alpha=0.5) pxx2 = scipy_signal.lombscargle(t, y2, freqs, normalize=True) axes["ax3"].plot(freqs, pxx2, "r-", lw=1, alpha=0.5) axes["ax3"].set_xscale("log") # ax3.set_yscale('log') else: axes["ax3"].psd(y, color="b", lw=1, alpha=0.5) ax3p = axes["ax3"].twinx() ax3p.psd(y2, color="r", lw=1, alpha=0.5) axes["ax3"].set_title("PSD") return axes
[docs] @staticmethod def pdf(xx, ll, mu): """Helper func to fit pdf as a curve.""" return ( (ll * mu) ** (1 + ll) / scipy_special.gamma(1 + ll) * np.exp(-ll * mu / xx) / xx ** (ll + 2) )