Source code for mutis.flares.bayblocks

""" Flare analysis """

import logging

import numpy as np
import scipy as sp
import pandas as pd

import matplotlib as mplt
import matplotlib.pyplot as plt

import sys
import os
import glob

import re
from datetime import datetime

from astropy.stats import bayesian_blocks

import warnings 

import mutis.flares.flare as flare

log = logging.getLogger(__name__)

[docs]class BayesianBlocks: """ Return a Bayesian Block representation of the signal Attributes ---------- edges: list list of edges defining the blocks. values: list list of values defining the height of the blocks. signal: mutis.Signal the mutis.Signal() instance that this BayesianBlocks() represents. inflare: list list of boolean representing whether a block is in flare or not. """ def __init__(self, signal, p=0.1): edges = bayesian_blocks(signal.times, signal.values, signal.dvalues, fitness='measures', p0=p) values = list() for i in range( len(edges)-1 ): msk = (edges[i] < signal.times) & (signal.times < edges[i+1]) if np.sum(msk) > 0: value = np.average( signal.values[msk], weights=1/signal.dvalues[msk] ) values.append( value ) else: values.append( 0 ) self.edges = np.array(edges) self.values = np.array(values) self.signal = signal self.inflare = None
[docs] def plot(self, style='color.strict', ax=None, **kwargs): """ Plot the bayesian block representation E.g: >>> signal = mutis.Signal(data['jyear'], data['Flux'], data['Flux_err']) >>> BayesianBlocks(signal).plot(color='k') """ ax = plt.gca() if ax is None else ax #self.signal.plot() x = self.edges y = self.values[np.r_[0:len(self.values),-1]] if self.inflare is None or style == 'none': ax.step(x, y, 'k', where='post', **kwargs) else: if style == 'area': ax.step(x, y, 'k', where='post') for i in range(len(self.inflare)): if self.inflare[i]: ax.axvspan(x[i], x[i+1], facecolor='r', edgecolor=None, alpha=0.2, **kwargs) elif style == 'color.simple': for i in range(len(self.edges)-2): ax.step(self.edges[[i,i+1]], self.values[[i,i+1]], 'r' if self.inflare[i] else 'k', where='post', **kwargs) ax.step(self.edges[[-2,-1]], self.values[[-1,-1]], 'r' if self.inflare[-1] else 'k', where='post', **kwargs) elif style == 'color.loose': for i in range(len(self.edges)-2): ax.step(self.edges[[i,i+1]], self.values[[i,i]], 'r' if self.inflare[i] else 'k', where='post', **kwargs) ax.step(self.edges[[i+1,i+1]], self.values[[i,i+1]], 'r' if self.inflare[i] or self.inflare[i+1] else 'k', where='post', **kwargs) ax.step(self.edges[[-2,-1]], self.values[[-1,-1]], 'r' if self.inflare[-1] else 'k', where='post', **kwargs) elif style == 'color.strict': for i in range(len(self.edges)-2): ax.step(self.edges[[i,i+1]], self.values[[i,i]], 'r' if self.inflare[i] else 'k', where='post', **kwargs) ax.step(self.edges[[i+1,i+1]], self.values[[i,i+1]], 'r' if self.inflare[i] and self.inflare[i+1] else 'k', where='post', **kwargs) ax.step(self.edges[[-2,-1]], self.values[[-1,-1]], 'r' if self.inflare[-1] else 'k', where='post', **kwargs) else: raise Exception("Unknown style. Available styles are 'none', 'area', 'color.simple', 'color.strict', 'color.loose'.")
[docs] def get_flares(self, thresh=1): """ Get a list of flares following the algorithm proposed in [Meyer, Scargle, Blandford (2019)] (https://iopscience.iop.org/article/10.3847/1538-4357/ab1651/pdf): ```There is no generally accepted consensus on the best way to determine which data points belong to a flaring state and which characterize the quiescent level. Nalewajko (2013)suggested the simple definition that a flare is a continuous time interval associated with a flux peak in which the flux is larger than half the peak flux value. This definition is intuitive, however, and it is unclear how to treat overlapping flares and identify flux peaks in an objective way. Here we use a simple two-step procedure tailored to the block representation: (1)identify a block that is higher than both the previous and subsequent blocks as a peak, and (2)proceed downward from the peak in both directions as long as the blocks are successively lower.``` """ di_max = 5 # get list of local maxima # bad beheaviour at 0 and -1: #imaxL = sp.signal.argrelextrema(self.values, np.greater)[0] imaxL = list() for i in range(0,len(self.values)): if i == 0 and thresh*self.values[0] > self.values[1]: imaxL.append(i) elif i == len(self.values)-1 and self.values[i-1] < thresh*self.values[i]: imaxL.append(i) elif self.values[i-1] < thresh*self.values[i] > self.values[i+1]: imaxL.append(i) imaxL = np.array(imaxL) inflareL = np.full(self.values.shape, False) # get list of local maxima over a threshold (flare peaks) for imax in imaxL: inflare = True if imax == 0: pass elif not (self.values[imax-1] < thresh*self.values[imax]): inflare = False if imax == len(self.values)-1: pass elif not (thresh*self.values[imax] > self.values[imax+1]): inflare = False inflareL[imax] = inflare # extend the flare to adyacent blocks for imax in np.argwhere(inflareL): if imax == 0: pass else: di = 1 stop = False while not stop and di < di_max: if imax-di-1 < 0: break if self.values[imax-di-1] < thresh*self.values[imax-di]: inflareL[imax-di] = True else: stop = True di = di + 1 if imax == len(self.values)-1: pass else: di = 1 stop = False while not stop and di < di_max: if imax+di+1 > len(self.values)-1: break if thresh*self.values[imax+di] > self.values[imax+di+1]: inflareL[imax+di] = True else: stop = True di = di + 1 self.inflare = np.array(inflareL)
[docs] def get_flare_list(self): """ Join all flares into a list of mutis.flares.Flare() """ groups = np.split(np.arange(len(self.inflare)), np.where((np.abs(np.diff(np.asarray(self.inflare, dtype=int))) == 1))[0]+1) # (groups contains the groups of indices of self.inflare/self.values # that corresond to flares or no flares) #print(self.inflare) #print([list(grp) for grp in groups]) flareL = list() for i, grp in enumerate(groups): if np.all(self.inflare[grp]): #print(grp) tstart = self.edges[grp[0]] tstop = self.edges[grp[-1]+1] # flare ends when no flare begins flareL.append(flare.Flare(tstart,tstop)) #print(flareL) return flareL