"""Module to handle Locality Preserving Projections (LPP)."""
# TODO: Replace camelcase and populate API docstrings.
# TODO: Maybe the boat has sailed but maybe some of these functions,
# especially the ones that pass around mapInfo should have been class methods.
from exovetter import lightkurve_utils
import copy
import warnings
import numpy as np
from astropy import units as u
from astropy.utils.data import download_file, _is_url
from scipy import io as spio
__all__ = ['compute_lpp_Transitmetric', 'runningMedian', 'foldBinLightCurve',
'computeRawLPPTransitMetric', 'knnDistance_fromKnown',
'periodNormalLPPTransitMetric', 'lpp_onetransit',
'lpp_averageIndivTransit', 'plot_lpp_diagnostic', 'Lppdata',
'Loadmap']
[docs]
def compute_lpp_Transitmetric(data, mapInfo):
"""Take a data class with light curve info
and the mapInfo with information about the mapping to use.
It then returns a lpp metric value.
"""
binFlux, binPhase = foldBinLightCurve(data, mapInfo.ntrfr, mapInfo.npts)
plot_data = dict()
plot_data['bin_flux'] = binFlux
plot_data['bin_phase'] = binPhase
# Dimensionality Reduction and knn parts
rawTLpp, transformedTransit = computeRawLPPTransitMetric(binFlux, mapInfo)
# Normalize by Period Dependence
normTLpp = periodNormalLPPTransitMetric(
rawTLpp, np.array([data.period, data.mes]), mapInfo)
plot_data['lpp_transform'] = transformedTransit
return normTLpp, rawTLpp, plot_data
[docs]
def foldBinLightCurve(data, ntrfr, npts):
"""Fold and bin lightcurve for input to LPP metric calculation.
Parameters
----------
data
Contains time, tzero, dur, period, mes and flux (centered around zero).
ntrfr
Number of transit fraction for binning around transit ~1.5.
npts
Number of points in the final binning.
"""
# Create a phased light curve
phaselc = np.mod((data.time - (data.tzero - 0.5 * data.period)) /
data.period, 1)
flux = data.flux
mes = data.mes
# Determine the fraction of the time the planet transits the star.
# Insist that ntrfr * transit fraction
if ~np.isnan(data.dur) & (data.dur > 0):
transit_dur = data.dur
else:
transit_dur = 0.2 * data.period / 24.
transit_fr = transit_dur / 24. / data.period
if (transit_fr * ntrfr) > 0.5:
transit_fr = 0.5 / ntrfr
# Specify the out of transit (a) and the in transit regions
binover = 1.3
if mes <= 20:
binover = -(1 / 8.0) * mes + 3.8
endfr = .03
midfr = .11
a = np.concatenate((np.arange(endfr, .5 - midfr, 1 / npts),
np.arange((0.5 + midfr), (1 - endfr), 1 / npts)),
axis=None)
ovsamp = 4.0
# bstep=(ovsamp*ntrfr*transit_fr)/npts
b_num = 41
b = np.linspace((0.5 - ntrfr * transit_fr),
(0.5 + ntrfr * transit_fr), b_num)
[runta, runya] = runningMedian(phaselc, flux, binover / npts, a)
[runtb, runyb] = runningMedian(
phaselc, flux, (binover * ovsamp * ntrfr * transit_fr) / npts, b)
# Combine the two sets of bins
runymess = np.array(runya + runyb)
runtmess = np.array(runta + runtb)
srt = np.argsort(runtmess)
runy = runymess[srt]
runt = runtmess[srt]
# Scale the flux by the depth so everything has the same depth.
# Catch or dividing by zero is to not scale.
scale = -1 * np.min(runyb)
if scale != 0:
scaledFlux = runy / scale
else:
scaledFlux = runy
binnedFlux = scaledFlux
phasebins = runt
return binnedFlux, phasebins
[docs]
def computeRawLPPTransitMetric(binFlux, mapInfo):
"""Perform the matrix transformation with LPP.
Do the KNN test to get a raw LPP transit metric number.
.. note:: This requires ``lpproj`` package.
"""
# https://github.com/jakevdp/lpproj
from lpproj import LocalityPreservingProjection
Yorig = mapInfo.YmapMapped
lpp = LocalityPreservingProjection(n_components=mapInfo.n_dim)
lpp.projection_ = mapInfo.YmapM
# To equate to Matlab LPP methods, we need to remove mean of transform.
# Check if this is correct, YmapMean is an array that is transit shapped
normBinFlux = binFlux - mapInfo.YmapMean
inputY = lpp.transform(normBinFlux.reshape(1, -1))
knownTransitsY = Yorig[mapInfo.knnGood, :]
dist, ind = knnDistance_fromKnown(knownTransitsY, inputY, mapInfo.knn)
rawLppTrMetric = np.mean(dist)
return rawLppTrMetric, binFlux
[docs]
def knnDistance_fromKnown(knownTransits, new, knn):
"""For a group of known transits and a new one,
use KNN to determine how close the new one is to the known transits.
.. note:: Using KNN ``minkowski p = 2 ()``. Requires ``scikit-learn``.
"""
from sklearn.neighbors import NearestNeighbors
# p=3 sets a minkowski distance of 3.
# TODO: Check that you really used 3 for matlab.
nbrs = NearestNeighbors(n_neighbors=int(knn), algorithm='kd_tree', p=2)
nbrs.fit(knownTransits)
distances, indices = nbrs.kneighbors(new)
return distances, indices
[docs]
def periodNormalLPPTransitMetric(rawTLpp, newPerMes, mapInfo):
"""Normalize the rawTransitMetric value by those with the closest period.
This part removes the period dependence of the metric at short periods.
Plus, it makes a value near one be the threshold between good and bad.
``newPerMes`` is the ``np.array([period, mes])`` of the new sample.
"""
knownTrPeriods = mapInfo.mappedPeriods[mapInfo.knnGood]
knownTrMes = mapInfo.mappedMes[mapInfo.knnGood]
knownTrrawLpp = mapInfo.dymeans[mapInfo.knnGood]
nPercentil = mapInfo.nPercentil
nPsample = mapInfo.nPsample
# Find the those with the nearest periods Npsample-nneighbors
logPeriods = np.log10(knownTrPeriods)
logMes = np.log10(knownTrMes)
knownPerMes = np.stack((logPeriods, logMes), axis=-1)
np.shape(knownPerMes)
logNew = np.log10(newPerMes).reshape(1, -1)
# logNew=np.array([np.log10(newPeriod)]).reshape(1,1)
dist, ind = knnDistance_fromKnown(knownPerMes, logNew, nPsample)
# Find the nthPercentile of the rawLpp of these indicies
nearPeriodLpp = knownTrrawLpp[ind]
LppNPercentile = np.percentile(nearPeriodLpp, nPercentil)
NormLppTransitMetric = rawTLpp / LppNPercentile
return NormLppTransitMetric
[docs]
def lpp_onetransit(tcedata, mapInfo, ntransit):
"""Chop down the full time series to one orbital period.
Then gather the lpp value for that one transit.
"""
startTime = tcedata.time[0] + ntransit * tcedata.period
# A few cadences of overlap
endTime = tcedata.time[0] + (ntransit + 1) * tcedata.period + 3 / 24.0
want = (tcedata.time >= startTime) & (tcedata.time <= endTime)
newtime = tcedata.time[want]
newflux = tcedata.flux[want]
nExpCad = (tcedata.time[-1] - tcedata.time[0]) / tcedata.period
if len(newtime > nExpCad * 0.75):
onetransit = copy.deepcopy(tcedata)
onetransit.time = newtime
onetransit.flux = newflux
normTLpp, rawTLpp, transformedTr = compute_lpp_Transitmetric(
onetransit, mapInfo)
else:
normTLpp = np.nan
rawTLpp = np.nan
return normTLpp, rawTLpp
[docs]
def lpp_averageIndivTransit(tcedata, mapInfo):
"""Create the loop over individual transits and return
array normalized lpp values, mean, and std.
Input TCE object and mapInfo object.
It is unclear that this individual transit approach
separates out several new false positives.
It probably would require re-tuning for low SNR signals.
"""
length = tcedata.time[-1] - tcedata.time[0]
ntransits = int(np.floor(length / tcedata.period))
lppNorms = np.ones(ntransits)
lppRaws = np.ones(ntransits)
# nExpCad=(tcedata.time[-1]-tcedata.time[0])/tcedata.period
for i in range(ntransits):
lppNorms[i], lppRaws[i] = lpp_onetransit(tcedata, mapInfo, i)
lppMed = np.nanmedian(lppNorms)
lppStd = np.nanstd(lppNorms)
return lppNorms, lppMed, lppStd, ntransits
[docs]
def plot_lpp_diagnostic(data, target, norm_lpp):
"""Plot LPP data for diagnostics.
.. note:: Requires ``matplotlib``.
Parameters
----------
data : dict
Contains ``bin_flux`` and ``bin_phase`` for plotting.
target : str
Contains target name on the plot.
norm_lpp : float
Normalized LPP transit metric value.
Used as string on the top of the plot.
Returns
-------
fig : obj
Matplotlib figure.
"""
import matplotlib.pyplot as plt
phase = data['bin_phase']
flux = data['bin_flux']
fig = plt.figure()
ax1 = fig.add_subplot(211)
ax1.plot(phase, flux, 'b.', ms=5, label="LPP Bins")
ax1.set_xlabel('Phase')
ax1.set_ylabel('Normalized Flux')
ax1.set_title(f"LPP Binning for {target}")
ax1.legend(loc="best")
ax2 = fig.add_subplot(212)
ax2.plot(flux, 'k.', ms=5, label=f"LPP Norm = {norm_lpp:5.3f}")
ax2.set_xlabel('Bin Number')
ax2.legend()
fig.tight_layout()
plt.draw()
return fig
[docs]
class Lppdata:
"""Class to handle LPP data.
Parameters
----------
tce : `~exovetter.tce.TCE`
TCE object.
lc : obj
``lightkurve`` object.
"""
def __init__(self, tce, lc, lc_name="flux", default_snr=10.):
# TODO: Needs a check lightcurve function
self.check_tce(tce, default_snr)
# FIXME: This looks more correct but fails the test.
# from exovetter import const as exo_const
# self.tzero = tce.get_epoch(
# getattr(exo_const, lc.time_format)).to_value(u.day)
self.tzero = tce['epoch'].to_value(u.day)
self.dur = tce['duration'].to_value(u.hr)
self.period = tce['period'].to_value(u.day)
self.mes = default_snr
if 'snr' in tce.keys():
self.mes = tce['snr']
self.time, self.flux, _ = \
lightkurve_utils.unpack_lk_version(lc, lc_name)
# make sure flux is zero norm.
if np.round(np.median(self.flux)) != 0:
self.flux = self.flux - np.median(self.flux)
[docs]
def check_tce(self, tce, default_snr):
"""Validate TCE."""
if 'period' not in tce.keys():
raise KeyError('Period required for the TCE to run LPP.')
if 'snr' not in tce.keys():
warnings.warn('LPP requires a MES or SNR value stored as snr '
f'in the tce. Using a value of {default_snr}.')
[docs]
class Loadmap:
"""Class to handle map info parsing.
Read in MATLAB blob. Use the DV trained one by default.
.. note:: Requires ``scipy``.
Parameters
----------
filename : str or `None`
Full path to a LPP ``.mat`` file.
If not provided, a built-in default is used.
If URL is provided, it will be cached using :ref:`astropy:utils-data`
"""
builtin_mat = 'https://stsci.box.com/shared/static/1ffi1t1fhae82d7xeqexw4ymlhlk0ov4.mat' # noqa
def __init__(self, filename=None):
if filename is None:
filename = self.builtin_mat
if _is_url(filename):
self.filename = download_file(filename, cache=True)
else:
self.filename = filename
mat = spio.loadmat(self.filename, matlab_compatible=True)
# Pull out the information we need.
# FIXME: No mapInfoDV nor commented stuff below in SourceForge version
key = 'mapInfoDV'
# key = 'map'
self.n_dim = mat[key]['nDim'][0][0][0][0]
self.Ymap = mat[key]['Ymap'][0][0][0][0]
self.YmapMapping = self.Ymap['mapping']
self.YmapMean = self.YmapMapping['mean'][0][0][0]
self.YmapM = self.YmapMapping['M'][0][0]
self.YmapMapped = self.Ymap['mapped']
self.knn = mat[key]['knn'][0][0][0][0]
self.knnGood = mat[key]['knnGood'][0][0][:, 0]
self.mappedPeriods = mat[key]['periods'][0][0][0]
self.mappedMes = mat[key]['mes'][0][0][0]
self.nPsample = mat[key]['nPsample'][0][0][0][0] # number to sample # noqa: E501
self.nPercentil = mat[key]['npercentilTM'][0][0][0][0]
self.dymeans = mat[key]['dymean'][0][0][0]
self.ntrfr = 2.0
self.npts = 80.0