import os
import pandas as pd
import numpy as np
import logging
from scipy import stats
import matplotlib.pyplot as plt
from matplotlib.lines import Line2D
from matplotlib.markers import MarkerStyle
from matplotlib.transforms import Affine2D
from timewise.wise_data_base import WISEDataBase
from timewise.utils import get_excess_variance
logger = logging.getLogger(__name__)
[docs]
class WiseDataByVisit(WISEDataBase):
"""
WISEData class to bin lightcurve by visits. The visits typically consist of some tens of observations.
The individual visits are separated by about six months. The mean flux for one visit is calculated by the
weighted mean of the data. The error on that mean is calculated by the root-mean-squared and corrected by the
t-value. Outliers per visit are identified if they are more than 20 times the rms away from the mean.
In addition to the attributes of :class:`timewise.WISEDataBase` this class has the following attributes:
:param clean_outliers_when_binning: whether to remove outliers by brightness when binning
:type clean_outliers_when_binning: bool
:param mean_key: the key for the mean
:type mean_key: str
:param median_key: the key for the median
:type median_key: str
:param rms_key: the key for the rms
:type rms_key: str
:param upper_limit_key: the key for the upper limit
:type upper_limit_key: str
:param Npoints_key: the key for the number of points
:type Npoints_key: str
:param zeropoint_key_ext: the key for the zeropoint
:type zeropoint_key_ext: str
"""
mean_key = '_mean'
median_key = '_median'
rms_key = '_rms'
upper_limit_key = '_ul'
Npoints_key = '_Npoints'
zeropoint_key_ext = '_zeropoint'
flux_error_factor = {
"W1": 1.6,
"W2": 1.2
}
[docs]
def __init__(
self,
base_name,
parent_sample_class,
min_sep_arcsec,
n_chunks,
clean_outliers_when_binning=True,
multiply_flux_error=True
):
"""
Constructor of the WISEDataByVisit class.
:param base_name: the base name of the data directory
:type base_name: str
:param parent_sample_class: the parent sample class
:type parent_sample_class: ParentSampleBase
:param min_sep_arcsec: query region around source for positional query
:type min_sep_arcsec: float
:param n_chunks: number of chunks to split the sample into
:type n_chunks: int
:param clean_outliers_when_binning: if True, clean outliers when binning
:type clean_outliers_when_binning: bool
"""
super().__init__(base_name, parent_sample_class, min_sep_arcsec, n_chunks)
self.clean_outliers_when_binning = clean_outliers_when_binning
self.multiply_flux_error = multiply_flux_error
[docs]
def calculate_epochs(self, f, e, visit_mask, counts, remove_outliers, outlier_mask=None):
"""
Calculates the binned epochs of a lightcurve.
:param f: the fluxes
:type f: np.array
:param e: the flux errors
:type e: np.array
:param visit_mask: the visit mask
:type visit_mask: np.array
:param counts: the counts
:type counts: np.array
:param remove_outliers: whether to remove outliers
:type remove_outliers: bool
:param outlier_mask: the outlier mask
:type outlier_mask: np.array
:return: the epoch
:rtype: float
"""
if len(f) == 0:
return [], [], [], [], [], []
u_lims = pd.isna(e)
nan_mask = pd.isna(f)
# --------------------- remove outliers in the bins ---------------------- #
# if we do not want to clean outliers just set the threshold to infinity
outlier_thresh = np.inf if not remove_outliers else 20
# set up empty masks
outlier_mask = np.array([False] * len(f)) if outlier_mask is None else outlier_mask
median = np.nan
u = np.nan
use_mask = None
n_points = counts
# set up dummy values for number of remaining outliers
n_remaining_outlier = np.inf
# --------------------- flag upper limits ---------------------- #
bin_n_ulims = np.bincount(visit_mask, weights=u_lims, minlength=len(counts))
bin_ulim_bool = (counts - bin_n_ulims) == 0
use_mask_ul = ~u_lims | (u_lims & bin_ulim_bool[visit_mask])
n_loops = 0
# recalculate uncertainty and median as long as no outliers left
while n_remaining_outlier > 0:
# make a mask of values to use
use_mask = ~outlier_mask & use_mask_ul & ~nan_mask
n_points = np.bincount(visit_mask, weights=use_mask)
zero_points_mask = n_points == 0
# ------------------------- calculate median ------------------------- #
median = np.zeros_like(counts, dtype=float)
visits_at_least_one_point = np.unique(visit_mask[~zero_points_mask[visit_mask]])
visits_zero_points = np.unique(visit_mask[zero_points_mask[visit_mask]])
median[visits_at_least_one_point] = np.array([
np.median(f[(visit_mask == i) & use_mask]) for i in visits_at_least_one_point
])
median[visits_zero_points] = np.nan
# median is NaN for visits with 0 detections, (i.e. detections in one band and not the other)
# if median is NaN for other visits raise Error
if np.any(np.isnan(median[n_points > 0])):
nan_indices = np.where(np.isnan(median))[0]
msg = ""
for inan_index in nan_indices:
nanf = f[visit_mask == inan_index]
msg += f"median is nan for {inan_index}th bin\n{nanf}\n\n"
raise ValueError(msg)
# --------------------- calculate uncertainty ---------------------- #
mean_deviation = np.bincount(
visit_mask[use_mask],
weights=(f[use_mask] - median[visit_mask[use_mask]]) ** 2,
minlength=len(counts)
)
one_points_mask = n_points <= 1
# calculate standard deviation
std = np.zeros_like(counts, dtype=float)
std[~one_points_mask] = (
np.sqrt(mean_deviation[~one_points_mask])
/ (n_points[~one_points_mask] - 1)
* stats.t.interval(0.68, df=n_points[~one_points_mask] - 1)[1]
# for visits with small number of detections we have to correct according to the t distribution
)
std[one_points_mask] = -np.inf
# calculate the propagated errors of the single exposure measurements
single_exp_measurement_errors = np.sqrt(np.bincount(
visit_mask[use_mask],
weights=e[use_mask] ** 2,
minlength=len(counts)
))
e_meas = np.zeros_like(std, dtype=float)
e_meas[~zero_points_mask] = single_exp_measurement_errors[n_points > 0] / n_points[n_points > 0]
e_meas[zero_points_mask] = np.nan
# take the maximum value of the measured single exposure errors and the standard deviation
u = np.maximum(std, e_meas)
# calculate 90% confidence interval
u70 = np.zeros_like(counts, dtype=float)
u70[one_points_mask] = 1e-10
visits_at_least_two_point = np.unique(visit_mask[~one_points_mask[visit_mask]])
u70[visits_at_least_two_point] = np.array([
np.quantile(abs(f[(visit_mask == i) & use_mask] - median[i]), .7, method="interpolated_inverted_cdf")
for i in visits_at_least_two_point
])
# --------------------- remove outliers in the bins ---------------------- #
remaining_outliers = (abs(median[visit_mask] - f) > outlier_thresh * u70[visit_mask]) & ~outlier_mask
outlier_mask |= remaining_outliers
n_remaining_outlier = sum(remaining_outliers) if remove_outliers else 0
# setting remaining_outliers to 0 will exit the while loop
n_loops += 1
if n_loops > 20:
raise Exception(f"{n_loops}!")
return median, u, bin_ulim_bool, outlier_mask, use_mask, n_points
[docs]
@staticmethod
def get_visit_map(lightcurve):
"""
Create a map datapoint to visit
:param lightcurve: the unbinned lightcurve
:type lightcurve: pd.DataFrame
:returns: visit map
:rtype: np.ndarray
"""
# ------------------------- find epoch intervals -------------------------- #
sorted_mjds = np.sort(lightcurve.mjd)
epoch_bounds_mask = (sorted_mjds[1:] - sorted_mjds[:-1]) > 100
epoch_bins = np.array(
[lightcurve.mjd.min() * 0.99] + # this makes sure that the first datapoint gets selected
list(((sorted_mjds[1:] + sorted_mjds[:-1]) / 2)[epoch_bounds_mask]) + # finding the middle between
# two visits
[lightcurve.mjd.max() * 1.01] # this just makes sure that the last datapoint gets selected as well
)
visit_mask = np.digitize(lightcurve.mjd, epoch_bins) - 1
return visit_mask
[docs]
def bin_lightcurve(self, lightcurve):
"""
Combine the data by visits of the satellite of one region in the sky.
The visits typically consist of some tens of observations. The individual visits are separated by about
six months.
The mean flux for one visit is calculated by the weighted mean of the data.
The error on that mean is calculated by the root-mean-squared and corrected by the t-value.
Outliers per visit are identified if they are more than 100 times the rms away from the mean. These outliers
are removed from the calculation of the mean and the error if self.clean_outliers_when_binning is True.
:param lightcurve: the unbinned lightcurve
:type lightcurve: pandas.DataFrame
:return: the binned lightcurve
:rtype: pandas.DataFrame
"""
# ------------------------- create visit mask -------------------------- #
visit_map = self.get_visit_map(lightcurve)
counts = np.bincount(visit_map)
binned_data = dict()
# ------------------------- calculate mean mjd -------------------------- #
binned_data["mean_mjd"] = np.bincount(visit_map, weights=lightcurve.mjd) / counts
# ------------------------- loop through bands -------------------------- #
for b in self.bands:
# loop through magnitude and flux and save the respective datapoints
outlier_masks = dict()
use_masks = dict()
bin_ulim_bools = dict()
for lum_ext in [self.flux_key_ext, self.mag_key_ext]:
f = lightcurve[f"{b}{lum_ext}"]
e = lightcurve[f"{b}{lum_ext}{self.error_key_ext}"]
# we will flag outliers based on the flux only
remove_outliers = lum_ext == self.flux_key_ext and self.clean_outliers_when_binning
outlier_mask = outlier_masks.get(self.flux_key_ext, None)
mean, u, bin_ulim_bool, outlier_mask, use_mask, n_points = self.calculate_epochs(
f, e, visit_map,
counts,
remove_outliers=remove_outliers,
outlier_mask=outlier_mask
)
n_outliers = np.sum(outlier_mask)
if (n_outliers > 0):
logger.debug(f"removed {n_outliers} outliers by brightness for {b} {lum_ext}")
binned_data[f'{b}{self.mean_key}{lum_ext}'] = mean
binned_data[f'{b}{lum_ext}{self.rms_key}'] = u
binned_data[f'{b}{lum_ext}{self.upper_limit_key}'] = bin_ulim_bool
binned_data[f'{b}{lum_ext}{self.Npoints_key}'] = n_points
outlier_masks[lum_ext] = outlier_mask
use_masks[lum_ext] = use_mask
bin_ulim_bools[lum_ext] = bin_ulim_bool
# ------- calculate the zeropoints per exposure ------- #
# this might look wrong since we use the flux mask on the magnitudes but it s right
# for each flux measurement we need the corresponding magnitude to get the zeropoint
mags = lightcurve[f'{b}{self.mag_key_ext}']
inst_fluxes = lightcurve[f'{b}{self.flux_key_ext}']
pos_m = inst_fluxes > 0 # select only positive fluxes, i.e. detections
zp_mask = pos_m & use_masks[self.flux_key_ext]
# calculate zero points
zps = np.zeros_like(inst_fluxes)
zps[zp_mask] = mags[zp_mask] + 2.5 * np.log10(inst_fluxes[zp_mask])
# find visits with no zeropoints
n_valid_zps = np.bincount(visit_map, weights=zp_mask)
at_least_one_valid_zp = n_valid_zps > 0
# calculate the median zeropoint for each visit
zps_median = np.zeros_like(n_valid_zps, dtype=float)
zps_median[n_valid_zps > 0] = np.array([
np.median(zps[(visit_map == i) & zp_mask])
for i in np.unique(visit_map[at_least_one_valid_zp[visit_map]])
])
# if there are only non-detections then fall back to default zeropoint
zps_median[n_valid_zps == 0] = self.magnitude_zeropoints['Mag'][b]
# if the visit only has upper limits then use the fall-back zeropoint
zps_median[bin_ulim_bools[self.flux_key_ext]] = self.magnitude_zeropoints['Mag'][b]
# --------------- calculate flux density from instrument flux ---------------- #
# get the instrument flux [digital numbers], i.e. source count
inst_fluxes_e = lightcurve[f'{b}{self.flux_key_ext}{self.error_key_ext}']
# calculate the proportionality constant between flux density and source count
mag_zp = self.magnitude_zeropoints['F_nu'][b].to('mJy').value
flux_dens_const = mag_zp * 10 ** (-zps_median / 2.5)
# calculate flux densities from instrument counts
flux_densities = inst_fluxes * flux_dens_const[visit_map]
flux_densities_e = inst_fluxes_e * flux_dens_const[visit_map]
# bin flux densities
mean_fd, u_fd, ul_fd, outlier_mask_fd, use_mask_fd, n_points_fd = self.calculate_epochs(
flux_densities,
flux_densities_e,
visit_map, counts,
remove_outliers=False,
outlier_mask=outlier_masks[self.flux_key_ext]
)
binned_data[f'{b}{self.mean_key}{self.flux_density_key_ext}'] = mean_fd
binned_data[f'{b}{self.flux_density_key_ext}{self.rms_key}'] = u_fd
binned_data[f'{b}{self.flux_density_key_ext}{self.upper_limit_key}'] = ul_fd
binned_data[f'{b}{self.flux_density_key_ext}{self.Npoints_key}'] = n_points_fd
return pd.DataFrame(binned_data)
[docs]
def plot_diagnostic_binning(
self,
service,
ind,
lum_key="mag",
interactive=False,
fn=None,
save=True,
which="panstarrs",
arcsec=20
):
"""
Show a skymap of the single detections and which bin they belong to next to the binned lightcurve
:param service: service used to download data, either of 'tap' or 'gator'
:type service: str
:param ind: index of the object in the parent sample
:type ind: str, int
:param lum_key: the key of the brightness unit, either of `flux` (instrument flux in counts) or `mag`
:type lum_key: str
:param interactive: if function is used interactively, return mpl.Figure and mpl.axes if True
:type interactive: bool
:param fn: filename for saving
:type fn: str
:param save: saves figure if True
:type save: bool
:param which: survey to get the cutout from, either of 'sdss' or 'panstarrs'
:type which: str
:param arcsec: size of cutout
:type arcsec: float
:returns: Figure and axes if `interactive=True`
:rtype: mpl.Figure, mpl.Axes
"""
logger.info(f"making binning diagnostic plot")
# get position
pos = self.parent_sample.df.loc[
ind,
[self.parent_sample.default_keymap["ra"], self.parent_sample.default_keymap["dec"]]
]
# get chunk number to be able to load the unstacked lightcurve
chunk_number = self._get_chunk_number(parent_sample_index=ind)
# load the unstacked lightcurves
if service == "tap":
unbinned_lcs = self.get_unbinned_lightcurves(chunk_number=chunk_number)
else:
unbinned_lcs = self._get_unbinned_lightcurves_gator(chunk_number=chunk_number)
# select the datapoints corresponding to our object
lightcurve = unbinned_lcs[unbinned_lcs[self._tap_orig_id_key] == ind]
# get visit map
visit_map = self.get_visit_map(lightcurve)
counts = np.bincount(visit_map)
# get the masks indicating outliers per visit based on brightness
outlier_masks = dict()
for b in self.bands:
f = lightcurve[f"{b}{self.flux_key_ext}"]
e = lightcurve[f"{b}{self.flux_key_ext}{self.error_key_ext}"]
mean, u, bin_ulim_bool, outlier_mask, use_mask, n_points = self.calculate_epochs(
f, e, visit_map,
counts,
remove_outliers=True
)
if np.any(outlier_mask):
outlier_masks[b] = outlier_mask
# get a mask indicating outliers based on position
ra, dec = pos.astype(float)
bad_indices_position, cluster_res, data_mask, allwise_mask = self.calculate_position_mask(
lightcurve, ra=ra, dec=dec, return_all=True, whitelist_region=self.whitelist_region.to("arcsec").value
)
position_mask = (
~lightcurve.index.isin(bad_indices_position)
if len(bad_indices_position) > 0
else np.array([True] * len(lightcurve))
)
# calculate the stacked lightcurve
binned_lightcurve = self.bin_lightcurve(lightcurve[position_mask])
# ----------------- create the plot ----------------- #
fig, axs = plt.subplots(nrows=2, gridspec_kw={"height_ratios": [3, 2]}, figsize=(5, 8))
# plot a panstarrs cutout
kwargs = {"plot_color_image": True} if which == "panstarrs" else dict()
self.parent_sample.plot_cutout(ind=ind, ax=axs[0], which=which, arcsec=arcsec, **kwargs)
# plot the lightcurve
if len(binned_lightcurve) > 0:
self._plot_lc(
lightcurve=binned_lightcurve,
unbinned_lc=lightcurve[position_mask],
lum_key=lum_key,
ax=axs[-1],
save=False
)
self._plot_lc(
unbinned_lc=lightcurve[~position_mask],
lum_key=lum_key,
ax=axs[-1],
save=False,
colors={"W1": "gray", "W2": "lightgray"}
)
# set markers for visits
markers_strings = list(Line2D.filled_markers) + ["$1$", "$2$", "$3$", "$4$", "$5$", "$6$", "$7$", "$8$", "$9$"]
markers_straight = [MarkerStyle(im) for im in markers_strings]
rot = Affine2D().rotate_deg(180)
markers_rotated = [MarkerStyle(im, transform=rot) for im in markers_strings]
markers = markers_straight + markers_rotated
# calculate ra and dec relative to center of cutout
ra = (lightcurve.ra - pos[self.parent_sample.default_keymap["ra"]]) * 3600
dec = (lightcurve.dec - pos[self.parent_sample.default_keymap["dec"]]) * 3600
# for each visit plot the datapoints on the cutout
for visit in np.unique(visit_map):
m = visit_map == visit
label = str(visit)
axs[0].plot([], [], marker=markers[visit], label=label, mec="k", mew=1, mfc="none", ls="")
for im, mec, zorder in zip(
[position_mask, ~position_mask],
["k", "none"],
[1, 0]
):
mask = m & im
for i_data_mask, i_data in zip([data_mask, ~data_mask], ["data", "other_allwise"]):
datapoints = lightcurve[mask & i_data_mask]
cluster_labels = (
cluster_res.labels_[mask[i_data_mask]] if i_data == "data"
else np.array([-1] * len(datapoints))
)
for cluster_label in np.unique(cluster_labels):
cluster_label_mask = cluster_labels == cluster_label
datapoints_cluster = datapoints[cluster_label_mask]
# make a marker for the visit and colored by cluster
color = f"C{cluster_label}" if cluster_label != -1 else "grey"
if ("sigra" in datapoints_cluster.columns) and ("sigdec" in datapoints_cluster.columns):
has_sig = ~datapoints_cluster.sigra.isna() & ~datapoints_cluster.sigdec.isna()
_ra = ra[mask & i_data_mask][cluster_label_mask]
_dec = dec[mask & i_data_mask][cluster_label_mask]
axs[0].errorbar(
_ra[has_sig],
_dec[has_sig],
xerr=datapoints_cluster.sigra[has_sig] / 3600,
yerr=datapoints_cluster.sigdec[has_sig] / 3600,
marker=markers[visit],
ls="",
color=color,
zorder=zorder,
ms=10,
mec=mec,
mew=0.1
)
axs[0].scatter(
_ra[~has_sig],
_dec[~has_sig],
marker=markers[visit],
color=color,
zorder=zorder,
edgecolors=mec,
linewidths=0.1,
)
else:
axs[0].scatter(
ra[mask], dec[mask],
marker=markers[visit],
color=color,
zorder=zorder,
edgecolors=mec,
linewidths=0.1,
)
# for each band indicate the outliers based on brightness with circles
for b, outlier_mask in outlier_masks.items():
brightness_outlier_scatter_style = {
"marker": '$\u25cc$',
"facecolors": self.band_plot_colors[b],
"edgecolors": self.band_plot_colors[b],
"s": 60,
"lw": 2
}
axs[0].scatter(
ra[outlier_mask],
dec[outlier_mask],
label=f"{b} outlier by brightness",
**brightness_outlier_scatter_style
)
axs[1].scatter(
lightcurve.mjd[outlier_mask],
lightcurve[f"{b}_{lum_key}"][outlier_mask],
**brightness_outlier_scatter_style
)
# indicate the query region with a circle
circle = plt.Circle((0, 0), self.min_sep.to("arcsec").value, color='r', fill=False, ls="-", lw=3, zorder=0)
axs[0].add_artist(circle)
# indicate the whitelist region of 1 arcsec with a circle
circle = plt.Circle(
(0, 0),
self.whitelist_region.to("arcsec").value,
color='g',
fill=False,
ls="-",
lw=3,
zorder=0
)
axs[0].add_artist(circle)
# formatting
title = axs[0].get_title()
axs[-1].set_ylabel("Apparent Vega Magnitude")
axs[-1].grid(ls=":", alpha=0.5)
axs[0].set_title("")
axs[0].legend(ncol=5, bbox_to_anchor=(0, 1, 1, 0), loc="lower left", mode="expand", title=title)
axs[0].set_aspect(1, adjustable="box")
fig.tight_layout()
if save:
if fn is None:
fn = self.plots_dir / f"{ind}_binning_diag_{which}cutout.pdf"
logger.debug(f"saving under {fn}")
fig.savefig(fn)
if interactive:
return fig, axs
else:
plt.close()