"""
Single-channel classes:
* NoiseRecords: encapsulate a file with noise records
* PulseRecords: encapsulate a file with pulse records
* MicrocalDataSet: encapsulate basically everything about 1 channel's pulses and noise
"""
import numpy as np
import scipy as sp
import matplotlib.pylab as plt
import inspect
import os
import sys
import traceback
import sklearn
from packaging import version
from deprecation import deprecated
# MASS modules
import mass.mathstat.power_spectrum
import mass.mathstat.interpolate
import mass.mathstat.robust
import mass.core.analysis_algorithms
from . import phase_correct
from .pulse_model import PulseModel
from mass.core.cut import Cuts
from mass.core.files import VirtualFile, LJHFile
from mass.core.optimal_filtering import Filter, ArrivalTimeSafeFilter
from mass.core.utilities import show_progress
from mass.calibration.energy_calibration import EnergyCalibration
from mass.calibration.algorithms import EnergyCalibrationAutocal
from mass.mathstat.entropy import laplace_entropy
import mass.off
from mass.core import ljh_util
import logging
LOG = logging.getLogger("mass")
class NoiseRecords:
"""Encapsulate a set of noise records.
The noise records can either be assumed continuous or arbitrarily separated in time.
"""
DEFAULT_MAXSEGMENTSIZE = 32000000
ALLOWED_TYPES = ("ljh", "virtual")
def __init__(self, filename, records_are_continuous=False, use_records=None,
maxsegmentsize=None):
"""Contain and analyze a noise records file.
Args:
filename: name of the noise data file
records_are_continuous: whether to treat all pulses as a continuous
timestream (default False)
use_records: if not None, use only the first `use_records` records of
the file (default None).
maxsegmentsize: the number of bytes to be read at once in a segment
(default self.DEFAULT_MAXSEGMENTSIZE)
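A minimal usage sketch (the .ljh path and the open h5py file `hdf5_file` are
hypothetical, not part of this module):
    noise = NoiseRecords("noise_chan1.ljh", records_are_continuous=True)
    noise.set_hdf5_group(hdf5_file.require_group("chan1/noise"))
    noise.compute_power_spectrum(plot=False)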
"""
if maxsegmentsize is not None:
self.maxsegmentsize = maxsegmentsize
else:
self.maxsegmentsize = self.DEFAULT_MAXSEGMENTSIZE
self.channum = ljh_util.ljh_channum(filename)
self.nSamples = self.nPresamples = self.nPulses = 0
self.n_segments = 0
self.timebase = 0.0
self.timestamp_offset = 0
self.datafile = None
self.data = None
self.saved_auto_cuts = None
self.__open_file(filename, use_records=use_records)
self.continuous = records_are_continuous
self.noise_psd = None
def set_hdf5_group(self, hdf5_group):
if hdf5_group is None:
raise ValueError("hdf5_group should not be None")
self.hdf5_group = hdf5_group
# Copy up some of the most important attributes
for attr in ("nSamples", "nPresamples", "nPulses", "timebase", "channum", "n_segments"):
self.__dict__[attr] = self.datafile.__dict__[attr]
self.hdf5_group.attrs[attr] = self.datafile.__dict__[attr]
self.autocorrelation = self.hdf5_group.require_dataset(
"autocorrelation", shape=(self.nSamples,), dtype=np.float64)
nfreq = 1 + self.nSamples // 2
self.noise_psd = self.hdf5_group.require_dataset(
"noise_psd", shape=(nfreq,), dtype=np.float64)
def __open_file(self, filename, use_records=None, file_format=None):
"""Detect the filetype and open it."""
if file_format is None:
if isinstance(filename, VirtualFile):
file_format = "virtual"
elif filename.endswith("ljh"):
file_format = "ljh"
else:
file_format = "ljh"
if file_format not in self.ALLOWED_TYPES:
raise ValueError("file_format must be None or one of %s" % ",".join(self.ALLOWED_TYPES))
if file_format == "ljh":
self.datafile = LJHFile.open(filename)
elif file_format == "virtual":
vfile = filename # Aha! It must not be a string
self.datafile = vfile
self.datafile.segmentsize = vfile.nPulses * (6 + 2 * vfile.nSamples)
filename = "Virtual file"
else:
raise RuntimeError("It is a programming error to get here")
self.filename = filename
self.records_per_segment = self.datafile.segmentsize // (6 + 2 * self.datafile.nSamples)
self.data = self.datafile.alldata
if use_records is not None:
if use_records < self.datafile.nPulses:
self.datafile.nPulses = use_records
self.datafile.n_segments = use_records // self.records_per_segment
def set_fake_data(self):
"""Use when this does not correspond to a real datafile."""
self.datafile = VirtualFile(np.zeros((0, 0)))
def copy(self):
"""Return a copy of the object.
Handy when coding and you don't want to read the whole data set back in, but
you do want to update the method definitions."""
c = NoiseRecords(self.filename)
c.__dict__.update(self.__dict__)
c.datafile = self.datafile.copy()
return c
def compute_power_spectrum(self, window=None, plot=True,
max_excursion=1000):
if window is None:
window = mass.mathstat.power_spectrum.hann
self.compute_power_spectrum_reshape(window=window, seg_length=None,
max_excursion=max_excursion)
if plot:
self.plot_power_spectrum()
def compute_power_spectrum_reshape(self, window=mass.mathstat.power_spectrum.hann,
seg_length=None, max_excursion=1000):
"""Compute the noise power spectrum with noise "records" reparsed into
separate records of a given length.
Args:
window (ndarray): a window function weighting, or a function that will
return a weighting.
seg_length (int): length of the noise segments (default None). If None,
the record length self.nSamples is used, so each noise record is
treated as one segment.
max_excursion (number): the biggest excursion from the median allowed
in each data segment, or else it will be ignored (default 1000).
By making <seg_length> small, you improve the noise on the PSD estimates at the price of poor
frequency resolution. By making it large, you get good frequency resolution with worse
uncertainty on each PSD estimate. No free lunch, know what I mean?
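A hedged example, assuming `noise` is a NoiseRecords built with
records_are_continuous=True and with its HDF5 group already set (a seg_length
other than the record length requires continuous data):
    spectrum = noise.compute_power_spectrum_reshape(seg_length=4096)
    freq, psd = spectrum.frequencies(), spectrum.spectrum()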
"""
if not self.continuous and seg_length is not None and seg_length != self.nSamples:
raise ValueError(
"This NoiseRecords doesn't have continuous noise; it can't be resegmented.")
if seg_length is None:
seg_length = self.nSamples
spectrum = mass.mathstat.power_spectrum.PowerSpectrum(seg_length // 2, dt=self.timebase)
if window is None:
window = np.ones(seg_length)
else:
window = window(seg_length)
data = self.datafile.alldata
if self.continuous and seg_length is not None:
n = np.prod(data.shape)
n -= n % seg_length
data = data[:n].reshape((n // seg_length, seg_length))
for d in data:
y = d - d.mean()
if y.max() - y.min() < max_excursion and len(y) == spectrum.m2:
spectrum.addDataSegment(y, window=window)
freq = spectrum.frequencies()
psd = spectrum.spectrum()
if self.hdf5_group is not None:
self.noise_psd[:] = psd
self.noise_psd.attrs["delta_f"] = freq[1] - freq[0]
else:
self.noise_psd = psd
return spectrum
def compute_fancy_power_spectra(self, window=mass.mathstat.power_spectrum.hann,
plot=True, seglength_choices=None):
"""Compute a power spectrum using a few long segments for the low freq.
and many short ones for the higher frequencies.
"""
assert self.continuous
n = np.prod(self.datafile.data.shape)
if seglength_choices is None:
longest_seg = 1
while longest_seg <= n // 16:
longest_seg *= 2
seglength_choices = [longest_seg]
while seglength_choices[-1] > 256:
seglength_choices.append(seglength_choices[-1] // 4)
LOG.debug("Will use segments of length: %s", seglength_choices)
spectra = [self.compute_power_spectrum_reshape(window=window, seg_length=seglen)
for seglen in seglength_choices]
if plot:
plt.clf()
lowest_freq = np.array([1. / (s.dt * s.m2) for s in spectra])
start_freq = 0.0
for i, s in enumerate(spectra):
x, y = s.frequencies(), s.spectrum()
if i == len(spectra) - 1:
good = x >= start_freq
else:
good = np.logical_and(x >= start_freq, x < 10 * lowest_freq[i + 1])
plt.loglog(x[good], y[good], '-')
start_freq = lowest_freq[i] * 10
return spectra
def plot_power_spectrum(self, axis=None, scale=1.0, sqrt_psd=False, **kwarg):
"""Plot the power spectrum of this noise record.
Args:
<axis> Which plt.Axes object to plot on. If None, clear the figure and plot there.
<scale> Scale all raw units by this number to convert counts to physical units.
<sqrt_psd> Whether to take the sqrt(PSD) for plotting. Default is no sqrt
"""
if all(self.noise_psd[:] == 0):
self.compute_power_spectrum(plot=False)
if axis is None:
plt.clf()
axis = plt.subplot(111)
yvalue = self.noise_psd[1:] * (scale**2)
if sqrt_psd:
yvalue = np.sqrt(yvalue)
freq = np.arange(1, 1 + len(yvalue)) * self.noise_psd.attrs['delta_f']
axis.plot(freq, yvalue, **kwarg)
plt.loglog()
axis.grid()
axis.set_xlim([10, 3e5])
axis.set_xlabel("Frequency (Hz)")
axis.set_ylabel(r"Power Spectral Density (counts$^2$ Hz$^{-1}$)")
axis.set_title("Noise power spectrum for %s" % self.filename)
def _compute_continuous_autocorrelation(self, n_lags=None, data_samples=None,
max_excursion=1000):
if data_samples is None:
data_samples = [0, self.nSamples * self.nPulses]
n_data = data_samples[1] - data_samples[0]
samples_per_segment = self.records_per_segment * self.nSamples
if n_lags is None:
n_lags = samples_per_segment
if n_lags > samples_per_segment:
n_lags = samples_per_segment
def padded_length(n):
"""Return a sensible number in the range [n, 2n] which is not too
much larger than n, yet is good for FFTs.
Returns:
A number: (1, 3, or 5)*(a power of two), whichever is smallest.
"""
pow2 = np.round(2**np.ceil(np.log2(n)))
if n == pow2:
return int(n)
elif n > 0.75 * pow2:
return int(pow2)
elif n > 0.625 * pow2:
return int(np.round(0.75 * pow2))
else:
return int(np.round(0.625 * pow2))
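# Worked examples of padded_length(), per the rules above:
#   padded_length(1024) -> 1024  (already a power of 2)
#   padded_length(1000) -> 1024  (1000 > 0.75 * 1024)
#   padded_length(700)  -> 768   (700 > 0.625 * 1024, so 3 * 2**8)
#   padded_length(600)  -> 640   (otherwise 5 * 2**7)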
# When there are 10 million data points and only 10,000 lags wanted,
# it's hugely inefficient to compute the full autocorrelation, especially
# in memory. Instead, compute it on chunks several times the length of the desired
# correlation, and average.
CHUNK_MULTIPLE = 15
if n_data < CHUNK_MULTIPLE * n_lags:
n_lags = n_data // CHUNK_MULTIPLE
if n_lags < self.nSamples:
msg = f"There are not enough data to compute at least {self.nSamples} lags."
msg += f"\nn_data={n_data}, n_lags * {CHUNK_MULTIPLE} = {CHUNK_MULTIPLE * n_lags}"
raise ValueError(msg)
# Be sure to pad chunksize samples by AT LEAST n_lags zeros, to prevent
# unwanted wraparound in the autocorrelation.
# padded_data is what we do DFT/InvDFT on; ac is the unnormalized output.
chunksize = CHUNK_MULTIPLE * n_lags
padsize = n_lags
padded_data = np.zeros(padded_length(padsize + chunksize), dtype=float)
ac = np.zeros(n_lags, dtype=float)
entries = 0.0
first, last = data_samples
data = self.datafile.alldata[first:last].ravel()
Nchunks = np.prod(data.shape) // chunksize
datachunks = data[:Nchunks * chunksize].reshape(Nchunks, chunksize)
for data in datachunks:
padded_data[:chunksize] = data - data.mean()
padded_data[chunksize:] = 0.0
if np.abs(padded_data).max() > max_excursion:
continue
ft = np.fft.rfft(padded_data)
ft[0] = 0 # this redundantly removes the mean of the data set
power = (ft * ft.conj()).real
acsum = np.fft.irfft(power)
ac += acsum[:n_lags]
entries += 1.0
if entries == 0:
raise Exception(
"Apparently all chunks had excusions, so no autocorrelation was computed")
ac /= entries
ac /= (np.arange(chunksize, chunksize - n_lags + 0.5, -1.0, dtype=float))
self.autocorrelation[:] = ac
def compute_autocorrelation(self, n_lags=None, data_samples=None, plot=True, max_excursion=1000):
"""Compute the autocorrelation averaged across all "pulses" in the file.
Args:
<n_lags> Number of autocorrelation lags to compute. If None, a default based
on the record length (or segment length, for continuous noise) is used.
<data_samples> If not None, then a range [a,b] to use.
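A minimal sketch, assuming `noise` is a NoiseRecords whose HDF5 group is already set:
    noise.compute_autocorrelation(n_lags=noise.nSamples, plot=False)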
"""
if self.continuous:
self._compute_continuous_autocorrelation(n_lags=n_lags, data_samples=data_samples,
max_excursion=max_excursion)
else:
self._compute_broken_autocorrelation(n_lags=n_lags, data_samples=data_samples,
max_excursion=max_excursion)
if self.hdf5_group is not None:
grp = self.hdf5_group.require_group("reclen%d" % n_lags)
ds = grp.require_dataset("autocorrelation", shape=(n_lags,), dtype=np.float64)
ds[:] = self.autocorrelation[:]
if plot:
self.plot_autocorrelation()
def _compute_broken_autocorrelation(self, n_lags=None, data_samples=None,
max_excursion=1000):
if n_lags is None:
n_lags = self.nSamples
if n_lags > self.nSamples:
raise ValueError("The autocorrelation can't be computed for "
f"n_lags>nsamp={self.nSamples} when data are not continuous")
if data_samples is None:
data_samples = [0, self.nSamples * self.nPulses]
records_used = samples_used = 0
ac = np.zeros(self.nSamples, dtype=float)
first = data_samples[0]
idx_first = first // self.nSamples
if first % self.nSamples > 0:
idx_first += 1
idx_last = data_samples[1] // self.nSamples
for i in range(idx_first, idx_last):
data = 1.0 * self.datafile.alldata[i]
if data.max() - data.min() > max_excursion:
continue
data -= data.mean()
ac += np.correlate(data, data, 'full')[self.nSamples - 1:]
samples_used += self.nSamples
records_used += 1
ac /= records_used
ac /= self.nSamples - np.arange(self.nSamples, dtype=float)
if n_lags < self.nSamples:
ac = ac[:n_lags]
self.autocorrelation[:] = ac
def plot_autocorrelation(self, axis=None, color='blue', label=None):
"""Plot the autocorrelation function."""
if all(self.autocorrelation[:] == 0):
LOG.info("Autocorrelation must be computed before it can be plotted")
self.compute_autocorrelation(plot=False)
if axis is None:
plt.clf()
axis = plt.subplot(111)
t = self.timebase * 1e3 * np.arange(len(self.autocorrelation))
axis.plot(t, self.autocorrelation[:], label=label, color=color)
axis.plot([0], [self.autocorrelation[0]], 'o', color=color)
axis.set_xlabel("Lag (ms)")
axis.set_ylabel(r"Autocorrelation (counts$^2$)")
class PulseRecords:
"""
Encapsulate a set of data containing multiple triggered pulse traces.
The pulses should not be noise records.
This object will not contain derived facts such as pulse summaries, filtered values,
and so forth. It is meant to be only a file interface.
"""
ALLOWED_TYPES = ("ljh", "virtual")
def __init__(self, filename, file_format=None):
"""Contain and analyze a noise records file.
Args:
filename: name of the pulse data file
"""
self.nSamples = 0
self.nPresamples = 0
self.nPulses = 0
self.n_segments = 0
self.segmentsize = 0
self.pulses_per_seg = 0
self.timebase = None
self.timestamp_offset = 0
self.channum = ljh_util.ljh_channum(filename)
self.datafile = None
self.__open_file(filename, file_format=file_format)
def __open_file(self, filename, file_format=None):
"""Detect the filetype and open it."""
if file_format is None:
if isinstance(filename, VirtualFile):
file_format = 'virtual'
elif filename.endswith("ljh"):
file_format = "ljh"
else:
file_format = "ljh"
if file_format not in self.ALLOWED_TYPES:
raise ValueError("file_format must be None or one of %s" % ",".join(self.ALLOWED_TYPES))
if file_format == "ljh":
self.datafile = LJHFile.open(filename)
elif file_format == "virtual":
vfile = filename # Aha! It must not be a string
self.datafile = vfile
else:
raise RuntimeError("It is a programming error to get here")
self.filename = filename
# Copy up some of the most important attributes
for attr in ("nSamples", "nPresamples", "nPulses", "timebase", "channum",
"n_segments", "pulses_per_seg", "segmentsize", "timestamp_offset"):
self.__dict__[attr] = self.datafile.__dict__[attr]
def __str__(self):
return "%s path '%s'\n%d samples (%d pretrigger) at %.2f microsecond sample time" % (
self.__class__.__name__, self.filename, self.nSamples, self.nPresamples,
1e6 * self.timebase)
def __repr__(self):
return f"{self.__class__.__name__}('{self.filename}')"
def set_segment_size(self, seg_size):
"""Update the underlying file's segment size in bytes."""
self.datafile.set_segment_size(seg_size)
self.n_segments = self.datafile.n_segments
self.pulses_per_seg = self.datafile.pulses_per_seg
self.segmentsize = self.datafile.segmentsize
def copy(self):
"""Return a copy of the object.
Handy when coding and you don't want to read the whole data set back in, but
you do want to update the method definitions."""
c = PulseRecords(self.filename)
c.__dict__.update(self.__dict__)
c.datafile = self.datafile.copy()
return c
class GroupLooper:
"""A mixin class to allow TESGroup objects to hold methods that loop over
their constituent channels. (Has to be a mixin, in order to break the import
cycle that would otherwise occur.)"""
pass
def _add_group_loop():
"""Add MicrocalDataSet method `method` to GroupLooper (and hence, to TESGroup).
This is a decorator to add before method definitions inside class MicrocalDataSet.
Usage is:
class MicrocalDataSet(...):
...
@_add_group_loop()
def awesome_function(self, ...):
...
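The wrapped method is then also available on the containing TESGroup and loops
over every channel; e.g., with a hypothetical TESGroup instance `data`:
data.awesome_function(...)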
"""
is_running_tests = "pytest" in sys.modules
def decorator(method):
method_name = method.__name__
def wrapper(self, *args, **kwargs):
rethrow = kwargs.pop("_rethrow", is_running_tests) # always throw errors when testing
for ds in self:
try:
method(ds, *args, **kwargs)
except KeyboardInterrupt as e:
raise e
except Exception as e:
exc_type, exc_value, exc_traceback = sys.exc_info()
s = traceback.format_exception(exc_type, exc_value, exc_traceback)
if rethrow:
raise
self.set_chan_bad(ds.channum, f"failed {method_name} with {e}\ntraceback:\n{s}")
wrapper.__name__ = method_name
# Generate a good doc-string.
lines = ["Loop over self, calling the %s(...) method for each channel." % method_name]
try:
argtext = inspect.signature(method) # Python 3.3 and later
except AttributeError:
arginfo = inspect.getargspec(method)
argtext = inspect.formatargspec(*arginfo)
if method.__doc__ is None:
lines.append(f"\n{method_name}{argtext} has no docstring")
else:
lines.append(f"\n{method_name}{argtext} docstring reads:")
lines.append(method.__doc__)
wrapper.__doc__ = "\n".join(lines)
setattr(GroupLooper, method_name, wrapper)
return method
return decorator
class MicrocalDataSet: # noqa: PLR0904
"""Represent a single microcalorimeter's PROCESSED data."""
# Attributes that all such objects must have.
expected_attributes = ("nSamples", "nPresamples", "nPulses", "timebase", "channum",
"timestamp_offset")
HDF5_CHUNK_SIZE = 256
def __init__(self, pulserec_dict, tes_group=None, hdf5_group=None):
"""
Args:
pulserec_dict: a dictionary (presumably that of a PulseRecords object)
containing the expected attributes that must be copied to this
MicrocalDataSet.
tes_group: the parent TESGroup object of which this is a member
(default None).
hdf5_group: the HDF5 group in which the relevant per-pulse data are
cached. You really want this to exist, for reasons of both performance
and data backup. (default None)
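These are usually built for you by a TESGroup. A minimal direct sketch, in which
the TESGroup `group` and the open h5py file `h5` are hypothetical:
    pr = PulseRecords("pulses_chan1.ljh")
    ds = MicrocalDataSet(pr.__dict__, tes_group=group, hdf5_group=h5.require_group("chan1"))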
"""
self.nSamples = 0
self.nPresamples = 0
self.nPulses = 0
self.timebase = 0.0
self.channum = None
self.timestamp_offset = 0
self.filter = None
self.lastUsedFilterHash = -1
self.drift_correct_info = {}
self.phase_correct_info = {}
self.time_drift_correct_info = {}
self.noise_autocorr = None
self.noise_demodulated = None
self.calibration = {}
for a in self.expected_attributes:
self.__dict__[a] = pulserec_dict[a]
self.filename = pulserec_dict.get('filename', 'virtual data set')
self.pretrigger_ignore_samples = 0  # Ignore this many samples just before the trigger when computing pretrig values
self.cut_pre = 0 # Number of presamples to ignore at start of pulse
self.cut_post = 0 # Number of samples to ignore at end of pulse
self.peak_samplenumber = None # Look for retriggers only after this time.
self.index = None # Index in the larger TESGroup object
self.last_used_calibration = None
self.data = None
self.times = None
self.subframecount = None
self.number_of_rows = None
self.row_number = None
self.number_of_columns = None
self.column_number = None
self.subframe_divisions = None
self.subframe_offset = None
self.subframe_timebase = None
self._filter_type = "ats"
self.tes_group = tes_group
try:
self.hdf5_group = hdf5_group
if "npulses" not in self.hdf5_group.attrs: # to allow TESGroupHDF5 with in read only mode
self.hdf5_group.attrs['npulses'] = self.nPulses
else:
assert self.hdf5_group.attrs['npulses'] == self.nPulses
if "channum" not in self.hdf5_group.attrs: # to allow TESGroupHDF5 with in read only mode
self.hdf5_group.attrs['channum'] = self.channum
else:
assert self.hdf5_group.attrs['channum'] == self.channum
except KeyError:
self.hdf5_group = None
self.__setup_vectors(npulses=self.nPulses)
self.__load_filters_from_hdf5()
self.__load_cals_from_hdf5()
self.__load_auto_cuts()
self.__load_corrections()
def toOffStyle(self):
a = self._makeNumpyArray()
return mass.off.channels.ChannelFromNpArray(a,
channum=self.channum,
shortname=self.shortname,
experimentStateFile=self.tes_group.experimentStateFile)
def _makeNumpyArray(self, fields=None, prefix="p_"):
if fields is None:
fields = [k for k in self.__dict__ if k.startswith(prefix)]
_dtypes = [self.__dict__[k].dtype for k in fields]
dtlist = list(zip(fields, _dtypes))
dtype = np.dtype(dtlist)
a = np.zeros(len(self.__dict__[fields[0]]), dtype)
for k in fields:
a[k] = self.__dict__[k][:]
return a
def __setup_vectors(self, npulses=None):
"""Given the number of pulses, build arrays to hold the relevant facts
about each pulse in memory.
These will include:
* p_filt_value = pulse height after running through filter
* p_filt_value_dc = pulse height after running through filter and applying drift correction
* p_filt_value_phc = phase corrected pulse height
* p_energy = pulse energy determined from applying a calibration to one of the p_filt_value??? variables
* ...and many others.
"""
if npulses is None:
assert self.nPulses > 0
npulses = self.nPulses
h5grp = self.hdf5_group
# Set up the per-pulse vectors
float64_fields = ('timestamp',)
float32_fields = ('pretrig_mean', 'pretrig_rms', 'pulse_average', 'pulse_rms',
'promptness', 'rise_time', 'postpeak_deriv',
'pretrig_deriv', 'pretrig_offset',
'filt_phase', 'filt_phase_corr', 'filt_value', 'filt_value_dc',
'filt_value_phc', 'filt_value_tdc',
'energy')
uint16_fields = ('peak_index', 'peak_value', 'min_value')
int64_fields = ()
bool_fields = ('shift1',)
for dtype, fieldnames in ((np.float64, float64_fields),
(np.float32, float32_fields),
(np.uint16, uint16_fields),
(bool, bool_fields),
(np.int64, int64_fields)):
for field in fieldnames:
self.__dict__['p_%s' % field] = h5grp.require_dataset(field, shape=(npulses,),
dtype=dtype)
# Workaround for fact that this value changed names in Feb 2024 from "rowcount" to "subframecount"
if "rowcount" in h5grp:
self.p_subframecount = h5grp.require_dataset("rowcount", shape=(npulses,), dtype=np.int64)
else:
self.p_subframecount = h5grp.require_dataset("subframecount", shape=(npulses,), dtype=np.int64)
if "peak_samplenumber" in self.p_peak_index.attrs:
self.peak_samplenumber = self.p_peak_index.attrs["peak_samplenumber"]
# Other vectors needed per-channel
self.average_pulse = h5grp.require_dataset('average_pulse', shape=(self.nSamples,),
dtype=np.float32)
self.noise_autocorr = h5grp.require_dataset('noise_autocorr', shape=(self.nSamples,),
dtype=np.float64)
nfreq = 1 + self.nSamples // 2
self.noise_psd = h5grp.require_dataset('noise_psd', shape=(nfreq,),
dtype=np.float64)
grp = self.hdf5_group.require_group('cuts')
self.cuts = Cuts(self.nPulses, self.tes_group, hdf5_group=grp)
def __load_filters_from_hdf5(self, overwrite=False):
if 'filters' not in self.hdf5_group:
return
filter_group = self.hdf5_group['filters']
fmax = filter_group.attrs['fmax'] if 'fmax' in filter_group.attrs else None
f_3db = filter_group.attrs['f_3db'] if 'f_3db' in filter_group.attrs else None
shorten = filter_group.attrs['shorten'] if 'shorten' in filter_group.attrs else None
version = 0 # older hdf5 files with filters have no version number, assign 0
if "version" in filter_group.attrs:
version = filter_group.attrs["version"]
filter_type = "5lag"
if version > 0:
filter_type = filter_group.attrs["filter_type"] # a string
if isinstance(filter_type, bytes):
filter_type = filter_type.decode()
elif "newfilter" in filter_group.attrs:
# version 0 hdf5 files either did or did not have the "newfilter" attribute, newfilter corresponds to ats filters
filter_type = "ats"
if filter_type == "ats":
# arrival time safe filter can be shorter than records by 1 sample, or equal in length
if version > 0:
# Version 1 avg_signal was an attribute until Nov 2021, when we fixed #208.
# Try to read as a dataset, then as attribute so that old HDF5 files still work.
try:
avg_signal = filter_group["avg_signal"][()]
except KeyError:
avg_signal = filter_group.attrs["avg_signal"][()]
aterms = filter_group["filt_aterms"][()]
else:
# version 0 hdf5 files did not store avg_signal, use truncated average_pulse instead
avg_signal, aterms = self.average_pulse[1:], filter_group["filt_aterms"][()]
model = np.vstack([avg_signal, aterms]).T
modelpeak = np.max(avg_signal)
self.filter = ArrivalTimeSafeFilter(model,
self.nPresamples - self.pretrigger_ignore_samples,
self.noise_autocorr,
sample_time=self.timebase,
peak=modelpeak)
elif filter_type == "5lag":
self.filter = Filter(self.average_pulse[...],
self.nPresamples - self.pretrigger_ignore_samples,
self.noise_psd[...],
self.noise_autocorr, sample_time=self.timebase,
shorten=shorten)
else:
raise Exception(f"filter_type={filter_type}, must be `ats` or `5lag`")
self.filter.fmax = fmax
self.filter.f_3db = f_3db
self._filter_type = filter_type
for k in ["filt_fourier", "filt_fourier_full", "filt_noconst",
"filt_baseline", "filt_baseline_pretrig", "filt_aterms"]:
if k in filter_group:
filter_ds = filter_group[k]
setattr(self.filter, k, filter_ds[...])
if 'variance' in filter_ds.attrs:
self.filter.variances[k.split("filt_")[1]] = filter_ds.attrs['variance']
if 'predicted_v_over_dv' in filter_ds.attrs:
self.filter.predicted_v_over_dv[k.split(
"filt_")[1]] = filter_ds.attrs['predicted_v_over_dv']
def __load_cals_from_hdf5(self, overwrite=False):
"""Load all calibrations in self.hdf5_group["calibration"] into the dict
self.calibration.
"""
hdf5_cal_group = self.hdf5_group.require_group('calibration')
for k in hdf5_cal_group.keys():
if not overwrite:
if k in self.calibration.keys():
raise ValueError(
"trying to load over existing calibration, consider passing overwrite=True")
self.calibration[k] = EnergyCalibration.load_from_hdf5(hdf5_cal_group, k)
def __load_corrections(self):
# drift correction should be loaded here, but I don't think it is loaded at all, here or anywhere!
if "phase_correction" in self.hdf5_group:
self.phaseCorrector = phase_correct.PhaseCorrector.fromHDF5(
self.hdf5_group, name="phase_correction")
@property
@deprecated(deprecated_in="0.8.2", details="Use subframecount, which is equivalent but better named")
def rowcount(self):
return self.subframecount
@property
def p_peak_time(self):
peak_index = np.asarray(self.p_peak_index[:], dtype=float)
return (peak_index - self.nPresamples) * self.timebase
@property
def subframes_after_last_external_trigger(self):
try:
return self.hdf5_group["subframes_after_last_external_trigger"]
except KeyError:
raise ValueError(
"run tes_group.calc_external_trigger_timing before accessing this")
@property
def subframes_until_next_external_trigger(self):
try:
return self.hdf5_group["subframes_until_next_external_trigger"]
except KeyError:
raise ValueError(
"run tes_group.calc_external_trigger_timing before accessing this")
@property
def subframes_from_nearest_external_trigger(self):
try:
return self.hdf5_group["subframes_from_nearest_external_trigger"]
except KeyError:
raise ValueError(
"run tes_group.calc_external_trigger_timing before accessing this")
def __str__(self):
return "%s path '%s'\n%d samples (%d pretrigger) at %.2f microsecond sample time" % (
self.__class__.__name__, self.filename, self.nSamples, self.nPresamples,
1e6 * self.timebase)
def __repr__(self):
return f"{self.__class__.__name__}('{self.filename}')"
def updater(self, name):
return self.tes_group.updater(name + f" chan {self.channum:d}")
def good(self, *args, **kwargs):
"""Returns a boolean vector, one per pulse record, saying whether record is good"""
return self.cuts.good(*args, **kwargs)
def bad(self, *args, **kwargs):
"""Returns a boolean vector, one per pulse record, saying whether record is bad"""
return self.cuts.bad(*args, **kwargs)
def resize(self, nPulses):
if self.nPulses < nPulses:
raise ValueError("Can only shrink using resize(), but the requested size %d is larger than current %d" %
(nPulses, self.nPulses))
self.nPulses = nPulses
self.__setup_vectors()
def copy(self):
"""Return a deep copy of the object."""
c = MicrocalDataSet(self.__dict__)
c.__dict__.update(self.__dict__)
for k in self.calibration.keys():
c.calibration[k] = self.calibration[k].copy()
c.cuts = self.cuts.copy()
return c
def _compute_peak_samplenumber(self):
peak_idx = self.data[:, self.cut_pre:self.nSamples
- self.cut_post].argmax(axis=1) + self.cut_pre
if version.parse(sp.__version__) >= version.parse("1.9.0"):
self.peak_samplenumber = int(sp.stats.mode(peak_idx, keepdims=False).mode)
else:
# The old way of using sp.stats.mode is deprecated in sp 1.9.0 and will be removed.
self.peak_samplenumber = int(sp.stats.mode(peak_idx)[0][0])
self.p_peak_index.attrs["peak_samplenumber"] = self.peak_samplenumber
return self.peak_samplenumber
@show_progress("channel.summarize_data")
def summarize_data(self, peak_time_microsec=None, pretrigger_ignore_microsec=None,
cut_pre=0, cut_post=0,
forceNew=False, use_cython=True,
doPretrigFit=False):
"""Summarize the complete data set one chunk at a time.
Store results in the HDF5 datasets p_pretrig_mean and similar.
Args:
peak_time_microsec: the time in microseconds at which this channel's
pulses typically peak (default None). You should leave this as None,
and let the value be estimated from the data.
pretrigger_ignore_microsec: how much time before the trigger to ignore
when computing pretrigger mean (default None). If None, it will
be chosen sensibly.
cut_pre: Cut this many samples from the start of a pulse record when calculating summary values
cut_post: Cut this many samples from the end of a record when calculating summary values
forceNew: whether to re-compute summaries if they exist (default False)
use_cython: whether to use cython for summarizing the data (default True).
doPretrigFit: whether to do a linear fit of the pretrigger data
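A minimal sketch, assuming `ds` is a MicrocalDataSet whose raw data and
pulse_records have been attached (as a TESGroup normally arranges):
    ds.summarize_data(cut_pre=0, cut_post=0, forceNew=False)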
"""
self.number_of_rows = self.pulse_records.datafile.number_of_rows
self.row_number = self.pulse_records.datafile.row_number
self.number_of_columns = self.pulse_records.datafile.number_of_columns
self.column_number = self.pulse_records.datafile.column_number
self.subframe_divisions = self.pulse_records.datafile.subframe_divisions
self.subframe_offset = self.pulse_records.datafile.subframe_offset
self.subframe_timebase = self.timebase / float(self.subframe_divisions)
not_done = all(self.p_pretrig_mean[:] == 0)
if not (not_done or forceNew):
LOG.info('\nchan %d did not summarize because results were already preloaded', self.channum)
return
if len(self.p_timestamp) < self.pulse_records.nPulses:
# make sure vectors are setup correctly
self.__setup_vectors(npulses=self.pulse_records.nPulses)
if peak_time_microsec is None:
self.peak_samplenumber = None
else:
self.peak_samplenumber = self.nPresamples + int(peak_time_microsec * 1e-6 / self.timebase)
if pretrigger_ignore_microsec is None:
self.pretrigger_ignore_samples = 3
else:
self.pretrigger_ignore_samples = int(pretrigger_ignore_microsec * 1e-6 / self.timebase)
self.cut_pre = cut_pre
self.cut_post = cut_post
if use_cython:
if self.peak_samplenumber is None:
self._compute_peak_samplenumber()
self.p_timestamp[:] = self.times[:]
self.p_subframecount[:] = self.subframecount[:]
sumdata = mass.core.cython_channel.summarize_data_cython
for segnum in range(self.pulse_records.n_segments):
first = segnum * self.pulse_records.pulses_per_seg
end = min(first + self.pulse_records.pulses_per_seg, self.nPulses)
results = sumdata(self.data, self.timebase, self.peak_samplenumber,
self.pretrigger_ignore_samples, self.nPresamples,
first, end)
self.p_pretrig_mean[first:end] = results["pretrig_mean"][:]
self.p_pretrig_rms[first:end] = results["pretrig_rms"][:]
self.p_pulse_average[first:end] = results["pulse_average"][:]
self.p_pulse_rms[first:end] = results["pulse_rms"][:]
self.p_promptness[first:end] = results["promptness"][:]
self.p_postpeak_deriv[first:end] = results["postpeak_deriv"][:]
self.p_peak_index[first:end] = results["peak_index"][:]
self.p_peak_value[first:end] = results["peak_value"][:]
self.p_min_value[first:end] = results["min_value"][:]
self.p_rise_time[first:end] = results["rise_times"][:]
self.p_shift1[first:end] = results["shift1"][:]
yield (segnum + 1.0) / self.pulse_records.n_segments
else:
for segnum in range(self.pulse_records.n_segments):
idx_range = np.array([segnum, segnum + 1]) * self.pulse_records.pulses_per_seg
MicrocalDataSet._summarize_data_segment(self, idx_range, doPretrigFit=doPretrigFit)
yield (segnum + 1.0) / self.pulse_records.n_segments
self.hdf5_group.file.flush()
self.__parse_expt_states()
def _summarize_data_segment(self, idx_range, doPretrigFit=False):
"""Summarize one segment of the data file, loading it into cache."""
first, end = idx_range
if first >= self.nPulses:
return
if end > self.nPulses:
end = self.nPulses
if len(self.p_timestamp) <= 0:
self.__setup_vectors(npulses=self.nPulses)
# Don't look for retriggers before this # of samples. Use the most common
# value of the peak index in the currently-loaded segment.
if self.peak_samplenumber is None:
self._compute_peak_samplenumber()
self.p_timestamp[first:end] = self.times[first:end]
self.p_subframecount[first:end] = self.subframecount[first:end]
# Fit line to pretrigger and save the derivative and offset
if doPretrigFit:
presampleNumbers = np.arange(self.cut_pre, self.nPresamples
- self.pretrigger_ignore_samples)
ydata = self.data[first:end, self.cut_pre:self.nPresamples
- self.pretrigger_ignore_samples].T
self.p_pretrig_deriv[first:end], self.p_pretrig_offset[first:end] = \
np.polyfit(presampleNumbers, ydata, deg=1)
self.p_pretrig_mean[first:end] = \
self.data[first:end, self.cut_pre:self.nPresamples
- self.pretrigger_ignore_samples].mean(axis=1)
self.p_pretrig_rms[first:end] = \
self.data[first:end, self.cut_pre:self.nPresamples
- self.pretrigger_ignore_samples].std(axis=1)
self.p_peak_index[first:end] = self.data[first:end,
self.cut_pre:self.nSamples - self.cut_post].argmax(axis=1) + self.cut_pre
self.p_peak_value[first:end] = self.data[first:end,
self.cut_pre:self.nSamples - self.cut_post].max(axis=1)
self.p_min_value[first:end] = self.data[first:end,
self.cut_pre:self.nSamples - self.cut_post].min(axis=1)
self.p_pulse_average[first:end] = self.data[first:end,
self.nPresamples:self.nSamples - self.cut_post].mean(axis=1)
# Remove the pretrigger mean from the peak value and the pulse average figures.
ptm = self.p_pretrig_mean[first:end]
self.p_pulse_average[first:end] -= ptm
self.p_peak_value[first:end] -= np.asarray(ptm, dtype=self.p_peak_value.dtype)
self.p_pulse_rms[first:end] = np.sqrt(
(self.data[first:end, self.nPresamples:self.nSamples - self.cut_post]**2.0).mean(axis=1)
- ptm * (ptm + 2 * self.p_pulse_average[first:end]))
shift1 = (self.data[first:end, self.nPresamples - 1] - ptm
> 4.3 * self.p_pretrig_rms[first:end])
self.p_shift1[first:end] = shift1
halfidx = (self.nPresamples + 2 + self.peak_samplenumber) // 2
pkval = self.p_peak_value[first:end]
prompt = (self.data[first:end, self.nPresamples + 2:halfidx].mean(axis=1)
- ptm) / pkval
prompt[shift1] = (self.data[first:end, self.nPresamples + 1:halfidx - 1][shift1, :].mean(axis=1)
- ptm[shift1]) / pkval[shift1]
self.p_promptness[first:end] = prompt
self.p_rise_time[first:end] = \
mass.core.analysis_algorithms.estimateRiseTime(self.data[first:end, self.cut_pre:self.nSamples - self.cut_post],
timebase=self.timebase,
nPretrig=self.nPresamples - self.cut_pre)
self.p_postpeak_deriv[first:end] = \
mass.core.analysis_algorithms.compute_max_deriv(self.data[first:end, self.cut_pre:self.nSamples - self.cut_post],
ignore_leading=self.peak_samplenumber - self.cut_pre)
def __parse_expt_states(self):
"""
Load experiment states from the state file and store the slices found for each state
as a categorical cut.
"""
esf = self.tes_group.experimentStateFile
if esf is None:
return
nano = self.p_timestamp[:] * 1e9
slicedict = esf.calcStatesDict(nano)
state_codes = np.zeros(self.nPulses, dtype=np.uint32)
for id, state in enumerate(slicedict.keys()):
slice = slicedict[state]
state_codes[slice.start:slice.stop] = id + 1
self.cuts.cut("state", state_codes)
def compute_average_pulse(self, mask, subtract_mean=True, forceNew=False):
"""Compute the average pulse this channel.
Store as self.average_pulse
Args:
mask -- A boolean array saying which records to average.
subtract_mean -- Whether to subtract the pretrigger mean and set the
pretrigger period to strictly zero (default True).
forceNew -- Whether to recompute when already exists (default False)
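A hedged sketch of building a mask by hand for a dataset `ds` (this mirrors what
avg_pulses_auto_masks does automatically):
    use = ds.good()
    median_avg = np.median(ds.p_pulse_average[use])
    mask = np.logical_and(use, np.abs(ds.p_pulse_average[:] / median_avg - 1) < 0.05)
    ds.compute_average_pulse(mask)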
"""
# Don't proceed if not necessary and not forced
already_done = self.average_pulse[-1] != 0
if already_done and not forceNew:
LOG.info("skipping compute average pulse on chan %d", self.channum)
return
average_pulse = self.data[mask, :].mean(axis=0)
if subtract_mean:
average_pulse -= np.mean(average_pulse[:self.nPresamples
- self.pretrigger_ignore_samples])
self.average_pulse[:] = average_pulse
@_add_group_loop()
def avg_pulses_auto_masks(self, max_pulses_to_use=7000, subtract_mean=True, forceNew=False):
"""Compute an average pulse.
Compute average pulse using an automatically generated mask of
+- 5% around the median pulse_average value.
Args:
max_pulses_to_use (int): use no more than the first this many good pulses (default 7000).
subtract_mean (bool): whether to subtract the pretrigger mean, as in compute_average_pulse (default True).
forceNew (bool): whether to re-compute if results already exist (default False)
"""
use = self.good()
if use.sum() <= 0:
raise ValueError("No good pulses")
median_pulse_avg = np.median(self.p_pulse_average[use])
mask = np.abs(self.p_pulse_average[:] / median_pulse_avg - 1) < 0.05
mask = np.logical_and(mask, use)
if mask.sum() <= 0:
raise ValueError("No good pulses within 5%% of median size.")
if np.sum(mask) > max_pulses_to_use:
good_so_far = np.cumsum(mask)
stop_at = (good_so_far == max_pulses_to_use).argmax()
mask[stop_at + 1:] = False
self.compute_average_pulse(mask, subtract_mean=subtract_mean,
forceNew=forceNew)
def _filter_to_hdf5(self):
# Store all filters created to a new HDF5 group
if "filters" in self.hdf5_group:
del self.hdf5_group["filters"]
h5grp = self.hdf5_group.require_group('filters')
if self.filter.f_3db is not None:
h5grp.attrs['f_3db'] = self.filter.f_3db
if self.filter.fmax is not None:
h5grp.attrs['fmax'] = self.filter.fmax
h5grp.attrs['peak'] = self.filter.peak_signal
h5grp.attrs['shorten'] = self.filter.shorten
h5grp.attrs['filter_type'] = self._filter_type
h5grp.attrs["version"] = 1
h5grp.create_dataset("avg_signal", data=self.filter.avg_signal)
for k in ["filt_fourier", "filt_fourier_full", "filt_noconst",
"filt_baseline", "filt_baseline_pretrig", 'filt_aterms']:
if k in h5grp:
del h5grp[k]
if getattr(self.filter, k, None) is not None:
vec = h5grp.create_dataset(k, data=getattr(self.filter, k))
shortname = k.split('filt_')[1]
vec.attrs['variance'] = self.filter.variances.get(shortname, 0.0)
vec.attrs['predicted_v_over_dv'] = self.filter.predicted_v_over_dv.get(
shortname, 0.0)
@property
def shortname(self):
"""return a string containing part of the filename and the channel number, useful for labelling plots"""
s = os.path.split(self.filename)[-1]
chanstr = "chan%g" % self.channum
if chanstr not in s:
s += chanstr
return s
@_add_group_loop()
def compute_5lag_filter(self, fmax=None, f_3db=None, cut_pre=0, cut_post=0, category={}, forceNew=False):
"""Requires that compute_noise has been run and that average pulse has been computed"""
if "filters" in self.hdf5_group and not forceNew:
LOG.info(f"ch {self.channum} skpping comput 5lag filter because it is already done")
return
if all(self.noise_autocorr[:] == 0):
raise Exception("compute noise first")
if not (category is None or category == {}):
raise Exception(
"category argument has no effect on compute_oldfilter, pass None or {}. compute_oldfilter uses self.average_pulse")
f = self._compute_5lag_filter_no_mutation(fmax, f_3db, cut_pre, cut_post)
self.filter = f
self._filter_type = "5lag"
self._filter_to_hdf5()
return f
def _compute_5lag_filter_no_mutation(self, fmax, f_3db, cut_pre, cut_post):
try:
spectrum = self.noise_spectrum.spectrum()
except Exception:
spectrum = self.noise_psd[:]
avg_signal = np.array(self.average_pulse)
if np.sum(np.abs(self.average_pulse)) == 0:
raise Exception("average pulse is all zeros, try avg_pulses_auto_masks first")
f = mass.core.Filter(avg_signal, self.nPresamples - self.pretrigger_ignore_samples,
spectrum, self.noise_autocorr, sample_time=self.timebase,
shorten=2, cut_pre=cut_pre, cut_post=cut_post)
f.compute(fmax=fmax, f_3db=f_3db)
return f
@_add_group_loop()
def compute_ats_filter(self, fmax=None, f_3db=None, transform=None, cut_pre=0, cut_post=0, # noqa: PLR0917, PLR0914
category={}, shift1=True, forceNew=False, minimum_n_pulses=20,
maximum_n_pulses=4000, optimize_dp_dt=True):
"""Compute a arrival-time-safe filter to model the pulse and its time-derivative.
Requires that `compute_noise` has been run.
Args:
fmax: if not None, the hard cutoff in frequency space, above which
the DFT of the filter will be set to zero (default None)
f_3db: if not None, the 3 dB rolloff point in frequency space, above which
the DFT of the filter will rolled off with a 1-pole filter
(default None)
transform: a callable object that will be called on all data records
before filtering (default None)
optimize_dp_dt: bool, try a more elaborate approach to dp_dt than just the finite
difference (works well for x-ray, bad for gamma rays)
cut_pre: Cut this many samples from the start of the filter, giving them 0 weight.
cut_post: Cut this many samples from the end of the filter, giving them 0 weight.
shift1: Potentially shift each pulse by one sample based on its p_shift1 value; the
resulting filter is one sample shorter than the pulse records.
If you used a zero-threshold trigger (e.g., dastard edgeMulti), you can likely use shift1=False.
Returns:
the filter (an ndarray)
Modified in April 2017 to make the model for the rising edge and the rest of
the pulse differently. For the rising edge, we use entropy minimization to understand
the pulse shape dependence on arrival-time. For the rest of the pulse, it
is less noisy and in fact more robust to rely on the finite-difference of
the pulse average to get the arrival-time dependence.
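A minimal sketch for a channel `ds` on which summarize_data and compute_noise have
already been run (the cutoff value is illustrative only):
    f = ds.compute_ats_filter(f_3db=20000.0, shift1=False, forceNew=True)
    ds.filter_data(forceNew=True)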
"""
if "filters" in self.hdf5_group and not forceNew:
LOG.info(f"ch {self.channum} skipping compute_ats_filter because it is already done")
return
if all(self.noise_autocorr[:] == 0):
raise Exception("compute noise first")
# At the moment, 1st-order model vs arrival-time is required.
DEGREE = 1
# The raw training data, which is shifted (trigger-aligned)
data, pulsenums = self.first_n_good_pulses(maximum_n_pulses, category=category)
if len(pulsenums) < minimum_n_pulses:
raise Exception(f"too few good pulses, ngood={len(pulsenums)}")
if shift1:
raw = data[:, 1:]
_shift1 = self.p_shift1[:][pulsenums]
raw[_shift1, :] = data[_shift1, 0:-1]
else:
raw = data[:, :]
# Center promptness around 0, using a simple function of Prms
prompt = self.p_promptness[:][pulsenums]
prms = self.p_pulse_rms[:][pulsenums]
mprms = np.median(prms)
use = np.abs(prms / mprms - 1.0) < 0.3
promptshift = np.poly1d(np.polyfit(prms[use], prompt[use], 1))
prompt -= promptshift(prms)
# Scale promptness quadratically to cover the range -0.5 to +0.5, approximately
x, y, z = np.percentile(prompt[use], [10, 50, 90])
A = np.array([[x * x, x, 1],
[y * y, y, 1],
[z * z, z, 1]])
param = np.linalg.solve(A, [-.4, 0, +.4])
ATime = np.poly1d(param)(prompt)
use = np.logical_and(use, np.abs(ATime) < 0.45)
ATime = ATime[use]
ptm = self.p_pretrig_mean[:][pulsenums]
ptm.shape = (len(pulsenums), 1)
raw = (raw - ptm)[use, :]
if transform is not None:
raw = transform(raw)
rawscale = raw.max(axis=1)
# The 0 component of the model is an average pulse, but do not use
# self.average_pulse, because it doesn't account for the shift1.
if shift1:
model = np.zeros((self.nSamples - 1, 1 + DEGREE), dtype=float)
else:
model = np.zeros((self.nSamples, 1 + DEGREE), dtype=float)
ap = (raw.T / rawscale).mean(axis=1)
apmax = np.max(ap)
model[:, 0] = ap / apmax
model[1:-1, 1] = (ap[2:] - ap[:-2]) * 0.5 / apmax
model[-1, 1] = (ap[-1] - ap[-2]) / apmax
model[:self.nPresamples - 1, :] = 0
if optimize_dp_dt:
# Now use min-entropy computation to model dp/dt on the rising edge
def cost(slope, x, y):
return mass.mathstat.entropy.laplace_entropy(y - x * slope, 0.002)
if self.peak_samplenumber is None:
self._compute_peak_samplenumber()
for samplenum in range(self.nPresamples - 1, self.peak_samplenumber):
y = raw[:, samplenum] / rawscale
bestslope = sp.optimize.brent(cost, (ATime, y), brack=[-.1, .25], tol=1e-7)
model[samplenum, 1] = bestslope
modelpeak = np.median(rawscale)
self.pulsemodel = model
f = ArrivalTimeSafeFilter(model, self.nPresamples, self.noise_autocorr,
sample_time=self.timebase, peak=modelpeak)
f.compute(fmax=fmax, f_3db=f_3db, cut_pre=cut_pre, cut_post=cut_post)
self.filter = f
if np.any(np.isnan(f.filt_noconst)) or np.any(np.isnan(f.filt_aterms)):
raise Exception("{}. model {}, nPresamples {}, noise_autcorr {}, timebase {}, modelpeak {}".format(
"there are nan values in your filters!! BAD",
model, self.nPresamples, self.noise_autocorr, self.timebase, modelpeak))
self._filter_type = "ats"
self._filter_to_hdf5()
return f
@_add_group_loop()
def filter_data(self, filter_name='filt_noconst', transform=None, forceNew=False, use_cython=None):
"""Filter the complete data file one chunk at a time.
Args:
filter_name: the object under self.filter to use for filtering the
data records (default 'filt_noconst')
transform: a callable object that will be called on all data records
before filtering (default None)
forceNew: Whether to recompute when already exists (default False)
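A minimal sketch, assuming a filter has already been computed for this channel `ds`:
    ds.filter_data(filter_name='filt_noconst', forceNew=True)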
"""
if not (forceNew or all(self.p_filt_value[:] == 0)):
LOG.info('\nchan %d did not filter because results were already loaded', self.channum)
return
if use_cython is None:
use_cython = self._filter_type == "5lag"
if self.filter is not None:
filter_values = self.filter.__dict__[filter_name]
else:
filter_values = self.hdf5_group['filters/%s' % filter_name][()]
if use_cython:
if self._filter_type == "ats":
raise ValueError("Cannot perform Arrival-Time-Safe filtering in Cython yet")
fdata = mass.core.analysis_algorithms.filter_data_5lag_cython
fv, fp = fdata(self.data, filter_values)
self.p_filt_value[:] = fv[:]
self.p_filt_phase[:] = fp[:]
self.hdf5_group.file.flush()
return
self._filter_data_nocython(filter_values, transform=transform)
@show_progress("channel.filter_data_tdm")
def _filter_data_nocython(self, filter_values, transform=None):
if self._filter_type == "ats":
if len(filter_values) == self.nSamples - 1:
filterfunction = self._filter_data_segment_ats
elif len(filter_values) == self.nSamples:
# when dastard uses kink model for determining trigger location, we don't need to shift1
# this code path should be followed when filters are created with the shift1=False argument
filterfunction = self._filter_data_segment_ats_dont_shift1
filter_AT = self.filter.filt_aterms[0]
elif self._filter_type == "5lag":
filterfunction = self._filter_data_segment_5lag
filter_AT = None
else:
raise ValueError(f"filter_type={self._filter_type}, must be `ats` or `5lag`")
for s in range(self.pulse_records.n_segments):
first = s * self.pulse_records.pulses_per_seg
end = min(self.nPulses, first + self.pulse_records.pulses_per_seg)
(self.p_filt_phase[first:end],
self.p_filt_value[first:end]) = \
filterfunction(filter_values, filter_AT, first, end, transform)
yield (end + 1) / float(self.nPulses)
self.hdf5_group.file.flush()
def get_pulse_model(self, f, f_5lag, n_basis, pulses_for_svd, extra_n_basis_5lag=0,
maximum_n_pulses=4000, noise_weight_basis=True, category={}):
assert n_basis >= 3
assert isinstance(f, ArrivalTimeSafeFilter), "requires arrival time safe filter"
deriv_like_model = f.pulsemodel[:, 1]
pulse_like_model = f.pulsemodel[:, 0]
if not len(pulse_like_model) == self.nSamples:
raise Exception(f"filter length {len(pulse_like_model)} and nSamples {self.nSamples} don't match, "
"you likely need to use shift1=False in compute_ats_filter")
projectors1 = np.vstack([f.filt_baseline,
f.filt_aterms[0],
f.filt_noconst])
if noise_weight_basis:
basis1 = np.vstack([np.ones(len(pulse_like_model), dtype=float),
deriv_like_model,
pulse_like_model]).T
else:
def joint_norm(x):
"""return y such that x.dot(y)==1 or at least pretty close"""
return x / x.dot(x)
# this leads to terrible results, but I'm leaving it in for now so I don't get tempted to try adding it back for testing
basis1 = np.vstack([joint_norm(p) for p in projectors1]).T
if pulses_for_svd is None:
pulses_for_svd, _ = self.first_n_good_pulses(maximum_n_pulses, category=category)
pulses_for_svd = pulses_for_svd.T
if hasattr(self, "saved_auto_cuts"):
pretrig_rms_median = self.saved_auto_cuts._pretrig_rms_median
pretrig_rms_sigma = self.saved_auto_cuts._pretrig_rms_sigma
else:
raise Exception(
"use autocuts when making projectors, so it can save more info about desired cuts")
v_dv = f.predicted_v_over_dv.get("noconst", 0.0)
self.pulse_model = PulseModel(projectors1, basis1, n_basis, pulses_for_svd,
v_dv, pretrig_rms_median, pretrig_rms_sigma, self.filename,
extra_n_basis_5lag, f_5lag.filt_noconst,
self.average_pulse[:], self.noise_psd[:], self.noise_psd.attrs['delta_f'],
self.noise_autocorr[:])
return self.pulse_model
@_add_group_loop()
def _pulse_model_to_hdf5(self, hdf5_file, n_basis, pulses_for_svd=None, extra_n_basis_5lag=0,
maximum_n_pulses=4000, noise_weight_basis=True, f_3db_5lag=None, category={}):
self.avg_pulses_auto_masks(forceNew=False, max_pulses_to_use=maximum_n_pulses)
f_5lag = self._compute_5lag_filter_no_mutation(
fmax=None, f_3db=f_3db_5lag, cut_pre=0, cut_post=0)
pulse_model = self.get_pulse_model(
self.filter, f_5lag, n_basis, pulses_for_svd, extra_n_basis_5lag,
maximum_n_pulses=maximum_n_pulses, noise_weight_basis=noise_weight_basis, category=category)
save_inverted = self.__dict__.get("invert_data", False)
hdf5_group = hdf5_file.create_group(f"{self.channum}")
pulse_model.toHDF5(hdf5_group, save_inverted)
def _filter_data_segment_5lag(self, filter_values, _filter_AT, first, end, transform=None):
"""Traditional 5-lag filter used by default until 2015."""
if first >= self.nPulses:
return None, None
# These parameters fit a parabola to any 5 evenly-spaced points
fit_array = np.array((
(-6, 24, 34, 24, -6),
(-14, -7, 0, 7, 14),
(10, -5, -10, -5, 10)), dtype=float) / 70.0
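# Fitting y = a + b*x + c*x**2 to the 5 lags at x = -2..+2 (the rows of fit_array yield a, b, c),
# the parabola peaks at x = -b/(2c) with height a - b**2/(4c); see peak_x and peak_y below.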
assert len(filter_values) + 4 == self.nSamples
seg_size = end - first
data = self.data[first:end]
conv = np.zeros((5, seg_size), dtype=float)
if transform is not None:
ptmean = self.p_pretrig_mean[first:end]
ptmean.shape = (seg_size, 1)
data = transform(data - ptmean)
conv[0, :] = np.dot(data[:, 0:-4], filter_values)
conv[1, :] = np.dot(data[:, 1:-3], filter_values)
conv[2, :] = np.dot(data[:, 2:-2], filter_values)
conv[3, :] = np.dot(data[:, 3:-1], filter_values)
conv[4, :] = np.dot(data[:, 4:], filter_values)
param = np.dot(fit_array, conv)
peak_x = -0.5 * param[1, :] / param[2, :]
peak_y = param[0, :] - 0.25 * param[1, :]**2 / param[2, :]
return peak_x, peak_y
def _filter_data_segment_ats(self, filter_values, filter_AT, first, end, transform=None):
"""single-lag filter developed in 2015"""
if first >= self.nPulses:
return None, None
assert len(filter_values) + 1 == self.nSamples
seg_size = end - first
ptmean = self.p_pretrig_mean[first:end]
data = self.data[first:end]
if transform is not None:
ptmean.shape = (seg_size, 1)
data = transform(self.data - ptmean)
ptmean.shape = (seg_size,)
conv0 = np.dot(data[:, 1:], filter_values)
conv1 = np.dot(data[:, 1:], filter_AT)
# Find pulses that triggered 1 sample too late and "want to shift"
want_to_shift = self.p_shift1[first:end]
conv0[want_to_shift] = np.dot(data[want_to_shift, :-1], filter_values)
conv1[want_to_shift] = np.dot(data[want_to_shift, :-1], filter_AT)
AT = conv1 / conv0
return AT, conv0
def _filter_data_segment_ats_dont_shift1(self, filter_values, filter_AT, first, end, transform=None):
# when using a zero-threshold trigger (e.g., dastard using the kink model) no shift is necessary
assert len(filter_values) == self.nSamples
seg_size = end - first
ptmean = self.p_pretrig_mean[first:end]
data = self.data[first:end, :]
if transform is not None:
ptmean.shape = (seg_size, 1)
data = transform(data - ptmean)
ptmean.shape = (seg_size,)
conv0 = np.dot(data, filter_values)
conv1 = np.dot(data, filter_AT)
AT = conv1 / conv0
return AT, conv0
def plot_summaries(self, valid='uncut', downsample=None, log=False):
"""Plot a summary of the data set, including time series and histograms of
key pulse properties.
Args:
valid: An array of booleans self.nPulses long saying which pulses are to be plotted
*OR* 'uncut' or 'cut', meaning that only uncut or cut data are to be plotted
*OR* None or 'all', meaning that all pulses should be plotted.
downsample: To prevent the scatter plots (left panels) from getting too crowded,
plot only one out of this many samples. If None, then plot will be
downsampled to 10,000 total points (default None).
log (bool): Use logarithmic y-axis on the histograms (right panels). (Default False)
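A minimal sketch, assuming summarize_data has already been run on this channel `ds`:
    ds.plot_summaries(valid='uncut', log=True)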
"""
# Convert "uncut" or "cut" to array of all good or all bad data
def isstr(x):
return isinstance(x, str)
status = "Plotting selected data"
if valid is None:
status = "Plotting all data, cut or uncut"
elif isstr(valid):
if "uncut" in valid.lower():
valid = self.cuts.good()
status = "Plotting only uncut data"
elif "cut" in valid.lower():
valid = self.cuts.bad()
status = "Plotting only cut data"
elif 'all' in valid.lower():
valid = None
status = "Plotting all data, cut or uncut"
else:
raise ValueError("If valid is a string, it must contain 'all', 'uncut' or 'cut'.")
if valid is not None:
nrecs = valid.sum()
if downsample is None:
downsample = nrecs // 10000
if downsample < 1:
downsample = 1
hour = self.p_timestamp[valid][::downsample] / 3600.0
else:
nrecs = self.nPulses
if downsample is None:
downsample = self.nPulses // 10000
if downsample < 1:
downsample = 1
hour = self.p_timestamp[::downsample] / 3600.0
LOG.info("%s (%d records; %d in scatter plots)", status, nrecs, len(hour))
plottables = (
(self.p_pulse_rms, 'Pulse RMS', 'magenta', None),
(self.p_pulse_average, 'Pulse Avg', 'purple', None),
(self.p_peak_value, 'Peak value', 'blue', None),
(self.p_pretrig_rms, 'Pretrig RMS', 'green', [0, 4000]),
(self.p_pretrig_mean, 'Pretrig Mean', '#00ff26', None),
(self.p_postpeak_deriv, 'Max PostPk deriv', 'gold', [0, 700]),
(self.p_rise_time[:] * 1e3, 'Rise time (ms)', 'orange', [0, 12]),
(self.p_peak_time[:] * 1e3, 'Peak time (ms)', 'red', [-3, 9])
)
# Plot timeseries with t=0 at the most recent UT midnight during or before the run.
last_record = np.max(self.p_timestamp)
last_midnight = last_record - (last_record % 86400)
hour_offset = last_midnight / 3600.
plt.clf()
for i, (vect, label, color, limits) in enumerate(plottables):
# Time series scatter plots (left-hand panels)
plt.subplot(len(plottables), 2, 1 + i * 2)
plt.ylabel(label)
use_vect = vect
if valid is not None:
use_vect = vect[valid]
plt.plot(hour - hour_offset, use_vect[::downsample], '.', ms=1, color=color)
if i == len(plottables) - 1:
plt.xlabel("Time since last UT midnight (hours)")
# Histogram (right-hand panels)
plt.subplot(len(plottables), 2, 2 + i * 2)
if limits is None:
in_limit = np.ones(len(use_vect), dtype=bool)
else:
in_limit = np.logical_and(use_vect[:] > limits[0], use_vect[:] < limits[1])
contents, _bins, _patches = plt.hist(use_vect[in_limit], 200, log=log,
histtype='stepfilled', fc=color, alpha=0.5)
if log:
plt.ylim(bottom=contents.min())
@_add_group_loop()
def assume_white_noise(self, noise_variance=1.0, forceNew=False):
"""Set the noise variance to `noise_variance` and the spectrum to be white.
This is appropriate when no noise files were taken.
Though you may set `noise_variance` to a value other than 1, this will affect only the
predicted resolution, and will not change the optimal filters that get computed/used.
Args:
noise_variance(number): what to set as the lag-0 noise autocorrelation.
forceNew (bool): whether to update the noise autocorrelation if it's already
been set (default False).
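The stored spectrum is the constant PSD = 2 * noise_variance * self.timebase, so that
integrating it from 0 up to the Nyquist frequency 1/(2*timebase) recovers noise_variance.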
"""
if forceNew or all(self.noise_autocorr[:] == 0):
self.noise_autocorr[1:] = 0.0
self.noise_autocorr[0] = noise_variance
psd = 2.0 * noise_variance * self.timebase
self.noise_psd[:] = psd
@_add_group_loop()
def compute_noise_nlags(self, n_lags, max_excursion=1000, plot=False):
"""Compute the noise autocorrelation and power spectrum of this channel using records of length nlags.
Treats data in separate noise traces as continuous.
Args:
n_lags (int): the number of lags in each noise spectrum and the max lag
for the autocorrelation.
max_excursion (number): the biggest excursion from the median allowed
in each data segment, or else it will be ignored (default 1000).
plot (bool): whether to plot the resulting power spectrum (default False).
"""
self.noise_records_nlags = NoiseRecords(self.noise_records.filename)
self.noise_records_nlags.compute_power_spectrum_reshape(
max_excursion=max_excursion, seg_length=n_lags)
self.noise_records_nlags.compute_autocorrelation(
n_lags=n_lags, plot=False, max_excursion=max_excursion)
if plot:
self.noise_records_nlags.plot_power_spectrum(sqrt_psd=False)
@_add_group_loop()
def compute_noise(self, max_excursion=1000, forceNew=False):
"""Compute the noise autocorrelation and power spectrum of this channel.
Args:
max_excursion (number): the biggest excursion from the median allowed
in each data segment, or else it will be ignored (default 1000).
n_lags: if not None, the number of lags in each noise spectrum and the max lag
for the autocorrelation. If None, the record length is used
(default None).
forceNew (bool): whether to recompute if it already exists (default False).
"""
n_lags = self.noise_records.nSamples
if forceNew or all(self.noise_autocorr[:] == 0):
self.noise_records.compute_power_spectrum_reshape(
max_excursion=max_excursion, seg_length=n_lags)
self.noise_records.compute_autocorrelation(
n_lags=n_lags, plot=False, max_excursion=max_excursion)
self.noise_autocorr[:] = self.noise_records.autocorrelation[:len(
self.noise_autocorr[:])]
self.noise_psd[:] = self.noise_records.noise_psd[:len(self.noise_psd[:])]
self.noise_psd.attrs['delta_f'] = self.noise_records.noise_psd.attrs['delta_f']
else:
LOG.info("chan %d skipping compute_noise because already done", self.channum)
# Rename compute_noise_spectra -> compute_noise, because the latter is a better name!
# But use deprecation to not immediately break all code.
@_add_group_loop()
@deprecated(deprecated_in="0.7.9", details="Use compute_noise(), which is equivalent but better named")
def compute_noise_spectra(self, max_excursion=1000, n_lags=None, forceNew=False):
"""Replaced by the equivalent compute_noise(...)"""
# Note: compute_noise() ignores n_lags; it always uses the full noise record length.
return self.compute_noise(max_excursion=max_excursion, forceNew=forceNew)
@_add_group_loop()
def apply_cuts(self, controls, clear=False, forceNew=True):
"""Apply the cuts.
Args:
controls (AnalysisControl): contains the cuts to apply.
clear (bool): Whether to clear previous cuts first (default False).
forceNew (bool): whether to reapply the cuts even if cuts have already been applied (default True).
"""
if self.nPulses == 0:
return  # don't bother cutting if there are no pulses
if not forceNew:
if self.cuts.good().sum() != self.nPulses:
LOG.info("Chan %d skipped cuts: after %d are good, %d are bad of %d total pulses",
self.channum, self.cuts.good().sum(), self.cuts.bad().sum(), self.nPulses)
return
if clear:
self.clear_cuts()
if controls is None:
controls = mass.controller.standardControl()
c = controls.cuts_prm
self.cuts.cut_parameter(self.p_energy, c['energy'], 'energy')
self.cuts.cut_parameter(self.p_pretrig_rms, c['pretrigger_rms'], 'pretrigger_rms')
self.cuts.cut_parameter(self.p_pretrig_mean, c['pretrigger_mean'], 'pretrigger_mean')
self.cuts.cut_parameter(self.p_peak_time[:] * 1e3, c['peak_time_ms'], 'peak_time_ms')
self.cuts.cut_parameter(self.p_rise_time[:] * 1e3, c['rise_time_ms'], 'rise_time_ms')
self.cuts.cut_parameter(self.p_postpeak_deriv, c['postpeak_deriv'], 'postpeak_deriv')
self.cuts.cut_parameter(self.p_pulse_average, c['pulse_average'], 'pulse_average')
self.cuts.cut_parameter(self.p_peak_value, c['peak_value'], 'peak_value')
self.cuts.cut_parameter(
self.p_min_value[:] - self.p_pretrig_mean[:], c['min_value'], 'min_value')
self.cuts.cut_parameter(self.p_timestamp[:], c['timestamp_sec'], 'timestamp_sec')
if c['timestamp_diff_sec'] is not None:
self.cuts.cut_parameter(np.hstack((np.inf, np.diff(self.p_timestamp))),
c['timestamp_diff_sec'], 'timestamp_diff_sec')
if c['subframecount_diff_sec'] is not None:
self.cuts.cut_parameter(np.hstack((np.inf, np.diff(self.p_subframecount[:] * self.subframe_timebase))),
c['subframecount_diff_sec'], 'subframecount_diff_sec')
if c['pretrigger_mean_departure_from_median'] is not None and self.cuts.good().sum() > 0:
median = np.median(self.p_pretrig_mean[self.cuts.good()])
LOG.debug('applying cut on pretrigger mean around its median value of %f', median)
self.cuts.cut_parameter(self.p_pretrig_mean - median,
c['pretrigger_mean_departure_from_median'],
'pretrigger_mean_departure_from_median')
LOG.info("Chan %d after cuts, %d are good, %d are bad of %d total pulses",
self.channum, self.cuts.good().sum(), self.cuts.bad().sum(), self.nPulses)
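# Usage sketch (illustrative; assumes `ds` is a MicrocalDataSet): build a cuts object
# and apply it. The keyword names mirror the c[...] keys used above; the numeric limits
# below are made-up examples, and (lo, hi) ranges may use None for an open end.
#     controls = mass.core.controller.AnalysisControl(
#         pretrigger_rms=(None, 30.0),
#         postpeak_deriv=(None, 100.0),
#     )
#     ds.apply_cuts(controls, clear=True, forceNew=True)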
@_add_group_loop()
def clear_cuts(self):
"""Clear all cuts."""
self.cuts.clear_cut()
self.saved_auto_cuts = None
def correct_flux_jumps(self, flux_quant):
'''Remove 'flux jumps' from the pretrigger mean.
When using umux readout, if a pulse is recorded that has a very fast
rising edge (e.g. a cosmic ray), the readout system will "slip" an
integer number of flux quanta. This means that the baseline level
returned to after the pulse will differ from the pretrigger value by
an integer number of flux quanta. This causes the pretrigger mean
summary quantity to jump around in a way that causes trouble for the
rest of MASS. This function attempts to correct these jumps.
Arguments:
flux_quant -- size of 1 flux quantum
'''
# remember original value, just in case we need it
self.p_pretrig_mean_orig = self.p_pretrig_mean[:]
corrected = mass.core.analysis_algorithms.correct_flux_jumps(
self.p_pretrig_mean[:], self.good(), flux_quant)
self.p_pretrig_mean[:] = corrected
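# Usage sketch (illustrative, umux data only; assumes `ds` is a MicrocalDataSet).
# flux_quant is the size of one flux quantum in the same units as p_pretrig_mean;
# the value below is hypothetical.
#     PHI0 = 4096.0
#     ds.correct_flux_jumps(flux_quant=PHI0)
#     ds.p_pretrig_mean_orig[:5]   # the uncorrected values remain available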
@_add_group_loop()
def drift_correct(self, attr="p_filt_value", forceNew=False, category={}):
"""Drift correct using the standard entropy-minimizing algorithm"""
doesnt_exist = all(self.p_filt_value_dc[:] == 0) or all(
self.p_filt_value_dc[:] == self.p_filt_value[:])
if not (forceNew or doesnt_exist):
LOG.info(
"chan %d drift correction skipped, because p_filt_value_dc already populated", self.channum)
return
g = self.cuts.good(**category)
uncorrected = getattr(self, attr)[g]
indicator = self.p_pretrig_mean[g]
drift_corr_param, self.drift_correct_info = \
mass.core.analysis_algorithms.drift_correct(indicator, uncorrected)
self.p_filt_value_dc.attrs.update(self.drift_correct_info) # Store in hdf5 file
LOG.info('chan %d best drift correction parameter: %.6f', self.channum, drift_corr_param)
self._apply_drift_correction(attr=attr)
def _apply_drift_correction(self, attr):
# Apply correction
assert self.p_filt_value_dc.attrs["type"] == "ptmean_gain"
ptm_offset = self.p_filt_value_dc.attrs["median_pretrig_mean"]
uncorrected = getattr(self, attr)[:]
gain = 1 + (self.p_pretrig_mean[:] - ptm_offset) * self.p_filt_value_dc.attrs["slope"]
self.p_filt_value_dc[:] = uncorrected * gain
self.hdf5_group.file.flush()
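# Usage sketch (illustrative; assumes `ds` is a MicrocalDataSet with filtered values):
# run the drift correction, then check how the stored attributes map onto the
# "ptmean_gain" correction applied in _apply_drift_correction() above.
#     ds.drift_correct(attr="p_filt_value", forceNew=True)
#     a = ds.p_filt_value_dc.attrs
#     gain = 1 + (ds.p_pretrig_mean[:] - a["median_pretrig_mean"]) * a["slope"]
#     np.allclose(ds.p_filt_value_dc[:], ds.p_filt_value[:] * gain)   # -> True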
@_add_group_loop()
def phase_correct2014(self, typical_resolution, maximum_num_records=50000, plot=False,
forceNew=False, category={}):
"""Apply the phase correction that worked for calibronium-like data as of June 2014.
For more notes, do help(mass.core.analysis_algorithms.FilterTimeCorrection)
Args:
typical_resolution (number): should be an estimated energy resolution in UNITS OF
self.p_pulse_rms. This helps the peak-finding (clustering) algorithm decide
which pulses go together into a single peak. Be careful to use a semi-reasonable
quantity here.
maximum_num_records (int): don't use more than this many records to learn
the correction (default 50000).
plot (bool): whether to make a relevant plot
forceNew (bool): whether to recompute if it already exists (default False).
category (dict): if not None, then a dict giving a category name and the
required category label.
"""
doesnt_exist = all(self.p_filt_value_phc[:] == 0) or all(
self.p_filt_value_phc[:] == self.p_filt_value_dc[:])
if not (forceNew or doesnt_exist):
LOG.info("channel %d skipping phase_correct2014", self.channum)
return
data, g = self.first_n_good_pulses(maximum_num_records, category)
LOG.info("channel %d doing phase_correct2014 with %d good pulses",
self.channum, data.shape[0])
prompt = self.p_promptness[:]
prms = self.p_pulse_rms[:]
if self.filter is not None:
dataFilter = self.filter.__dict__['filt_noconst']
else:
dataFilter = self.hdf5_group['filters/filt_noconst'][:]
tc = mass.core.analysis_algorithms.FilterTimeCorrection(
data, prompt[g], prms[g], dataFilter,
self.nPresamples, typicalResolution=typical_resolution)
self.p_filt_value_phc[:] = self.p_filt_value_dc[:]
self.p_filt_value_phc[:] -= tc(prompt, prms)
if plot:
fnum = plt.gcf().number
plt.figure(5)
plt.clf()
g = self.cuts.good()
plt.plot(prompt[g], self.p_filt_value_dc[g], 'g.')
plt.plot(prompt[g], self.p_filt_value_phc[g], 'b.')
plt.figure(fnum)
@_add_group_loop()
def phase_correct(self, attr="p_filt_value_dc", forceNew=False, category={}, ph_peaks=None,
method2017=True, kernel_width=None, save_to_hdf5=True):
"""Apply the 2017 or 2015 phase correction method.
Args:
forceNew (bool): whether to recompute if it already exists (default False).
category (dict): if not None, then a dict giving a category name and the
required category label.
ph_peaks: Peaks to use for alignment. If None, then use _find_peaks_heuristic()
kernel_width: Width (in PH units) of the kernel-smearing function. If None, use a heuristic.
"""
doesnt_exist = not hasattr(self, "phaseCorrector")
if not (forceNew or doesnt_exist):
LOG.info("channel %d skipping phase_correct", self.channum)
return
good = self.cuts.good(**category)
self.phaseCorrector = phase_correct.phase_correct(self.p_filt_phase[good],
getattr(self, attr)[good],
ph_peaks=ph_peaks, method2017=method2017, kernel_width=kernel_width,
indicatorName="p_filt_phase", uncorrectedName="p_filt_value_dc")
self.p_filt_phase_corr[:] = self.phaseCorrector.phase_uniformifier(self.p_filt_phase[:])
self.p_filt_value_phc[:] = self.phaseCorrector(self.p_filt_phase[:], getattr(self, attr)[:])
if save_to_hdf5:
self.phaseCorrector.toHDF5(self.hdf5_group, overwrite=True)
LOG.info('Channel %3d phase corrected. Correction size: %.2f',
self.channum, mass.mathstat.robust.median_abs_dev(self.p_filt_value_phc[good]
- getattr(self, attr)[good], True))
return self.phaseCorrector
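# Usage sketch (illustrative; assumes `ds` is a MicrocalDataSet that has been drift
# corrected): run the 2017 phase correction and inspect the result.
#     corrector = ds.phase_correct(attr="p_filt_value_dc", forceNew=True)
#     g = ds.good()
#     plt.plot(ds.p_filt_phase[g], ds.p_filt_value_phc[g], '.', ms=1)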
def first_n_good_pulses(self, n=50000, category={}):
"""Return the first good pulse records.
Args:
n: maximum number of good pulses to include (default 50000).
Returns:
(data, g)
data is a (X,Y) array where X is number of records, and Y is number of samples per record
g is a 1d array of the record numbers of the pulses in data.
If we did load all of ds.data at once, this would be roughly equivalent to
return ds.data[ds.cuts.good()][:n], np.nonzero(ds.cuts.good())[0][:n]
"""
g = self.cuts.good(**category)
if g.sum() > n:
dont_use = np.nonzero(g)[0][n:]
g[dont_use] = False
return self.data[g], np.nonzero(g)[0]
def fit_spectral_line(self, prange, mask=None, times=None, fit_type='dc', line='MnKAlpha',
nbins=200, plot=True, **kwargs):
all_values = {'filt': self.p_filt_value,
'phc': self.p_filt_value_phc,
'dc': self.p_filt_value_dc,
'energy': self.p_energy,
}[fit_type]
if mask is not None:
valid = np.array(mask)
else:
valid = self.cuts.good()
if times is not None:
valid = np.logical_and(valid, self.p_timestamp < times[1])
valid = np.logical_and(valid, self.p_timestamp > times[0])
good_values = all_values[valid]
contents, bin_edges = np.histogram(good_values, nbins, prange)
LOG.info("%d events pass cuts; %d are in histogram range", len(good_values), contents.sum())
bin_ctrs = 0.5 * (bin_edges[1:] + bin_edges[:-1])
# Try line first as a number, then as a fluorescence line, then as a Gaussian
try:
energy = float(line)
module = 'mass.calibration.gaussian_lines'
fittername = f'{module}.GaussianFitter({module}.GaussianLine())'
fitter = eval(fittername)
except ValueError:
energy = None
try:
module = 'mass.calibration.fluorescence_lines'
fittername = f'{module}.{line}Fitter()'
fitter = eval(fittername)
except AttributeError:
try:
module = 'mass.calibration.gaussian_lines'
fittername = f'{module}.{line}Fitter()'
fitter = eval(fittername)
except AttributeError:
raise ValueError(
"Cannot understand line=%s as an energy or a known calibration line." % line)
params, covar = fitter.fit(contents, bin_ctrs, plot=plot, **kwargs)
if plot:
mass.plot_as_stepped_hist(plt.gca(), contents, bin_ctrs)
if energy is not None:
scale = energy / params[1]
else:
scale = 1.0
LOG.info('Resolution: %5.2f +- %5.2f eV', params[0] * scale, np.sqrt(covar[0, 0]) * scale)
return params, covar, fitter
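# Usage sketch (illustrative; assumes `ds` is a MicrocalDataSet): fit the Mn K-alpha
# complex to drift-corrected filtered values in a hypothetical pulse-height window.
#     params, covar, fitter = ds.fit_spectral_line(prange=(9000, 9400), fit_type='dc',
#                                                  line='MnKAlpha', nbins=200)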
@property
def pkl_fname(self):
return ljh_util.mass_folder_from_ljh_fname(self.filename, filename="ch%d_calibration.pkl" % self.channum)
@_add_group_loop()
def calibrate(self, attr, line_names, name_ext="", size_related_to_energy_resolution=10, # noqa: PLR0917
fit_range_ev=200, excl=(), plot_on_fail=False,
bin_size_ev=2.0, category={}, forceNew=False, maxacc=0.015, nextra=3,
param_adjust_closure=None, curvetype="gain", approximate=False,
diagnose=False):
calname = attr + name_ext
if not forceNew and calname in self.calibration:
return self.calibration[calname]
LOG.info("Calibrating chan %d to create %s", self.channum, calname)
cal = EnergyCalibration(curvetype=curvetype)
cal.set_use_approximation(approximate)
# It tries to calibrate detector using mass.calibration.algorithm.EnergyCalibrationAutocal.
auto_cal = EnergyCalibrationAutocal(cal,
getattr(self, attr)[self.cuts.good(**category)],
line_names)
auto_cal.guess_fit_params(smoothing_res_ph=size_related_to_energy_resolution,
fit_range_ev=fit_range_ev,
binsize_ev=bin_size_ev,
nextra=nextra, maxacc=maxacc)
if param_adjust_closure:
param_adjust_closure(self, auto_cal)
auto_cal.fit_lines()
if auto_cal.anyfailed:
LOG.warning(
"chan %d failed calibration because on of the fitter was a FailedFitter", self.channum)
raise Exception()
self.calibration[calname] = cal
hdf5_cal_group = self.hdf5_group.require_group('calibration')
cal.save_to_hdf5(hdf5_cal_group, calname)
if diagnose:
auto_cal.diagnose()
self.convert_to_energy(attr, attr + name_ext)
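# Usage sketch (illustrative; assumes `ds` is a MicrocalDataSet with phase-corrected
# filtered values and that these lines appear in the spectrum): calibrate, then use the
# resulting curve directly. convert_to_energy() is called at the end of calibrate(), so
# p_energy is filled in as well.
#     ds.calibrate("p_filt_value_phc", ["MnKAlpha", "MnKBeta", "CuKAlpha"])
#     cal = ds.calibration["p_filt_value_phc"]
#     cal.ph2energy(9000.0)   # convert one pulse height to energy (eV)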
@_add_group_loop()
def convert_to_energy(self, attr, calname=None):
if calname is None:
calname = attr
if calname not in self.calibration:
raise ValueError("For chan %d calibration %s does not exist" % (self.channum, calname))
cal = self.calibration[calname]
self.p_energy[:] = cal.ph2energy(getattr(self, attr))
self.last_used_calibration = cal
def plot_traces(self, pulsenums, pulse_summary=True, axis=None, difference=False, # noqa: PLR0917
residual=False, valid_status=None, shift1=False,
subtract_baseline=False, fcut=None):
"""Plot some example pulses, given by sample number.
Args:
<pulsenums> A sequence of sample numbers, or a single one.
<pulse_summary> Whether to put text about the first few pulses on the plot
<axis> A plt axis to plot on.
<difference> Whether to show successive differences (that is, d(pulse)/dt) or the raw data
<residual> Whether to show the residual between data and opt filtered model, or just raw data.
<valid_status> If None, plot all pulses in <pulsenums>. If "valid" omit any from that set
that have been cut. If "cut", show only those that have been cut.
<shift1> Whether to take pulses with p_shift1==True and delay them by 1 sample
<subtract_baseline> Whether to subtract pretrigger mean prior to plotting the pulse
<fcut> If not none, apply a lowpass filter with this cutoff frequency prior to plotting
"""
if isinstance(pulsenums, int):
pulsenums = (pulsenums,)
pulsenums = np.asarray(pulsenums)
if pulse_summary:
try:
if len(self.p_pretrig_mean) == 0:
pulse_summary = False
except AttributeError:
pulse_summary = False
if valid_status not in {None, "valid", "cut"}:
raise ValueError("valid_status must be one of [None, 'valid', or 'cut']")
if residual and difference:
raise ValueError("Only one of residual and difference can be True.")
dt = (np.arange(self.nSamples) - self.nPresamples) * self.timebase * 1e3
cm = plt.cm.jet
MAX_TO_SUMMARIZE = 30
if axis is None:
plt.clf()
axis = plt.subplot(111)
axis.set_xlabel("Time after trigger (ms)")
axis.set_xlim([dt[0], dt[-1]])
axis.set_ylabel("Feedback (or mix) in [Volts/16384]")
if pulse_summary:
axis.text(.975, .97, r" -PreTrigger- Max Rise t Peak Pulse",
size='medium', family='monospace', transform=axis.transAxes, ha='right')
axis.text(.975, .95, r"Cut P# Mean rms PTDeriv ($\mu$s) value mean",
size='medium', family='monospace', transform=axis.transAxes, ha='right')
cuts_good = self.cuts.good()[pulsenums]
pulses_plotted = -1
for i, pn in enumerate(pulsenums):
if valid_status == 'cut' and cuts_good[i]:
continue
if valid_status == 'valid' and not cuts_good[i]:
continue
pulses_plotted += 1
data = self.read_trace(pn)
if difference:
data = data * 1.0 - np.roll(data, 1)
data[0] = 0
data += np.roll(data, 1) + np.roll(data, -1)
data[0] = 0
elif residual:
model = self.p_filt_value[pn] * self.average_pulse[:] / np.max(self.average_pulse)
# Careful! The following was `data -= model`, but that fails because data
# is now a read-only memmap.
# `data = data - model` rebinds data to a numpy vector, which is allowed.
data = data - model
if shift1 and self.p_shift1[pn]:
data = np.hstack([data[0], data[:-1]])
if fcut is not None:
data = mass.core.analysis_algorithms.filter_signal_lowpass(
data, 1. / self.timebase, fcut)
if subtract_baseline:
# Recalculate the pretrigger mean here, to avoid issues due to flux slipping when
# plotting umux data
data = data - np.mean(data[:self.nPresamples - self.pretrigger_ignore_samples])
cutchar, alpha, linestyle, linewidth = ' ', 1.0, '-', 1
# When plotting both cut and valid, mark the cut data with x and dashed lines
if valid_status is None and not cuts_good[i]:
cutchar, alpha, linestyle, linewidth = 'X', 1.0, '--', 1
color = cm(pulses_plotted * 1.0 / len(cuts_good))
axis.plot(dt, data, color=color,
linestyle=linestyle, alpha=alpha, linewidth=linewidth)
if pulse_summary and pulses_plotted < MAX_TO_SUMMARIZE and len(self.p_pretrig_mean) > pn:
try:
summary = "%s%6d: %5.0f %7.2f %6.1f %5.0f %5.0f %7.1f" % (
cutchar, pn, self.p_pretrig_mean[pn], self.p_pretrig_rms[pn],
self.p_postpeak_deriv[pn], self.p_rise_time[pn] * 1e6,
self.p_peak_value[pn], self.p_pulse_average[pn])
except IndexError:
pulse_summary = False
continue
axis.text(.975, .93 - .025 * pulses_plotted, summary, color=color,
family='monospace', size='medium',
transform=axis.transAxes, ha='right')
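# Usage sketch (illustrative; assumes `ds` is a MicrocalDataSet): plot the first 15
# uncut traces, baseline subtracted and low-pass filtered at a hypothetical 10 kHz.
#     ds.plot_traces(np.arange(15), valid_status="valid",
#                    subtract_baseline=True, fcut=1e4)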
def read_trace(self, record_num):
"""Read (from cache or disk) and return the pulse numbered `record_num`."""
return self.data[record_num, :]
@_add_group_loop()
def time_drift_correct(self, attr="p_filt_value_phc", sec_per_degree=2000,
pulses_per_degree=2000, max_degrees=20, forceNew=False,
category={}):
"""Drift correct over long times with an entropy-minimizing algorithm.
Here we correct as a low-ish-order Legendre polynomial in time.
attr: the attribute of self that is to be corrected. (The result
will be stored in self.p_filt_value_tdc[:]).
sec_per_degree: assign as many as one polynomial degree per this many seconds
pulses_per_degree: assign as many as one polynomial degree per this many pulses
max_degrees: never use more than this many degrees of Legendre polynomial.
forceNew: whether to do this step, if it appears already to have been done.
category: choices for categorical cuts
"""
if all(self.p_filt_value_tdc[:] == 0.0) or forceNew:
LOG.info("chan %d doing time_drift_correct", self.channum)
attr = getattr(self, attr)
g = self.cuts.good(**category)
pk = np.median(attr[g])
g = np.logical_and(g, np.abs(attr[:] / pk - 1) < 0.5)
w = max(pk / 3000., 1.0)
info = time_drift_correct(self.p_timestamp[g], attr[g], w,
limit=[0.5 * pk, 2 * pk])
tnorm = info["normalize"](self.p_timestamp[:])
corrected = attr[:] * (1 + info["model"](tnorm))
self.p_filt_value_tdc[:] = corrected
self.time_drift_correct_info = info
else:
LOG.info("chan %d skipping time_drift_correct", self.channum)
corrected, info = self.p_filt_value_tdc[:], {}
return corrected, info
def compare_calibrations(self):
plt.figure()
for key in self.calibration:
cal = self.calibration[key]
try:
plt.plot(cal.peak_energies, cal.energy_resolutions, 'o', label=key)
except Exception:
pass
plt.legend()
plt.xlabel("energy (eV)")
plt.ylabel("energy resolution fwhm (eV)")
plt.grid("on")
plt.title("chan %d cal comparison" % self.channum)
def count_rate(self, goodonly=False, bin_s=60):
g = self.cuts.good()
if not goodonly:
g[:] = True
if isinstance(bin_s, (int, float)):
bin_edge = np.arange(self.p_timestamp[g][0], self.p_timestamp[g][-1], bin_s)
else:
bin_edge = bin_s
counts, bin_edge = np.histogram(self.p_timestamp[g], bin_edge)
bin_centers = bin_edge[:-1] + 0.5 * (bin_edge[1] - bin_edge[0])
rate = counts / float(bin_edge[1] - bin_edge[0])
return bin_centers, rate
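# Usage sketch (illustrative; assumes `ds` is a MicrocalDataSet): trigger rate of good
# pulses in 10-second bins.
#     centers, rate = ds.count_rate(goodonly=True, bin_s=10)
#     plt.plot(centers, rate)
#     plt.ylabel("counts / s")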
def cut_summary(self):
boolean_fields = [name.decode() for (name, _) in self.tes_group.boolean_cut_desc if name]
for c1 in boolean_fields:
bad1 = self.cuts.bad(c1)
for c2 in boolean_fields:
if c1 is c2:
continue
bad2 = self.cuts.bad(c2)
n_and = np.logical_and(bad1, bad2).sum()
n_or = np.logical_or(bad1, bad2).sum()
print("%6d (and) %6d (or) pulses cut by [%s and/or %s]" %
(n_and, n_or, c1.upper(), c2.upper()))
print()
for cut_name in boolean_fields:
print("%6d pulses cut by %s" % (self.cuts.bad(cut_name).sum(), cut_name.upper()))
print("%6d pulses total" % self.nPulses)
@_add_group_loop()
def auto_cuts(self, nsigma_pt_rms=8.0, nsigma_max_deriv=8.0, pretrig_rms_percentile=None, forceNew=False, clearCuts=True):
"""Compute and apply an appropriate set of automatically generated cuts.
The peak time and rise time come from the measured most-common peak time.
The pulse RMS and postpeak-derivative cuts are based on what's observed in
the (presumably) pulse-free noise file associated with this data file.
Args:
nsigma_pt_rms (float): How big an excursion is allowed in pretrig RMS
(default 8.0).
nsigma_max_deriv (float): How big an excursion is allowed in max
post-peak derivative (default 8.0).
pretrig_rms_percentile (float): Make upper limit for pretrig_rms at
least as large as this percentile of the data. I.e., if you
pass in 99, then the upper limit for pretrig_rms will exclude
no more than the 1 % largest values. This number is a
percentage, *not* a fraction. This should not be routinely used
- it is intended to help auto_cuts work even if there is a
problem during a data acquisition that causes large drifts in
noise properties.
forceNew (bool): Whether to perform auto-cuts even if cuts already exist.
clearCuts (bool): Whether to clear any existing cuts first (default
True).
The two excursion limits are given in units of equivalent sigma from the
noise file. "Equivalent" meaning that the noise file was assessed not for
RMS but for median absolute deviation, normalized to Gaussian distributions.
Returns:
The cut object that was applied.
"""
if self.saved_auto_cuts is None:
forceNew = True
if not forceNew:
LOG.info("channel %g skipping auto cuts because cuts exist", self.channum)
return
if clearCuts:
self.clear_cuts()
# Step 1: peak and rise times
if self.peak_samplenumber is None:
self._compute_peak_samplenumber()
MARGIN = 3 # step at least this many samples forward before cutting.
peak_time_ms = (MARGIN + self.peak_samplenumber - self.nPresamples) * self.timebase * 1000
# Step 2: analyze *noise* so we know how to cut on pretrig rms postpeak_deriv
pretrigger_rms = np.zeros(self.noise_records.nPulses)
max_deriv = np.zeros(self.noise_records.nPulses)
for i in range(self.noise_records.nPulses):
data = self.noise_records.datafile.alldata[i]
pretrigger_rms[i] = data[:self.nPresamples].std()
max_deriv[i] = mass.core.analysis_algorithms.compute_max_deriv(data, ignore_leading=0)
# Multiply MAD by 1.4826 to get into terms of sigma, if distribution were Gaussian.
md_med = np.median(max_deriv)
pt_med = np.median(pretrigger_rms)
md_madn = np.median(np.abs(max_deriv - md_med)) * 1.4826
pt_madn = np.median(np.abs(pretrigger_rms - pt_med)) * 1.4826
md_max = md_med + md_madn * nsigma_max_deriv
pt_max = max(0.0, pt_med + pt_madn * nsigma_pt_rms)
# Step 2.5: In the case of pretrig_rms, cut no more than pretrig_rms_percentile percent
# of the pulses on the upper end. This appears to be appropriate for
# SLEDGEHAMMER gamma devices, but may not be appropriate in cases where
# there are many pulses riding on tails, so by default we don't do
# this.
if pretrig_rms_percentile is not None:
pt_max = max(pt_max, np.percentile(self.p_pretrig_rms, pretrig_rms_percentile))
# Step 3: make the cuts
cuts = mass.core.controller.AnalysisControl(
peak_time_ms=(0, peak_time_ms * 1.25),
rise_time_ms=(0, peak_time_ms * 1.10),
pretrigger_rms=(None, pt_max),
postpeak_deriv=(None, md_max),
)
cuts._pretrig_rms_median = pt_med  # store these so we can access them when writing projectors to hdf5
cuts._pretrig_rms_sigma = pt_madn
self.apply_cuts(cuts, forceNew=True, clear=False)
self.__save_auto_cuts(cuts)
return cuts
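# Usage sketch (illustrative; assumes `ds` is a MicrocalDataSet with a noise file):
# generate cuts with slightly looser thresholds than the defaults, then summarize them.
#     cuts = ds.auto_cuts(nsigma_pt_rms=10.0, nsigma_max_deriv=10.0, forceNew=True)
#     ds.cut_summary()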
def __save_auto_cuts(self, cuts):
"""Store the results of auto-cuts internally and in HDF5."""
self.saved_auto_cuts = cuts
g = self.hdf5_group["cuts"].require_group("auto_cuts")
for attrname in ("peak_time_ms", "rise_time_ms", "pretrigger_rms",
"postpeak_deriv"):
g.attrs[attrname] = cuts.cuts_prm[attrname][1]
def __load_auto_cuts(self):
"""Load the results of auto-cuts, if any, from HDF5."""
try:
g = self.hdf5_group["cuts/auto_cuts"]
except KeyError:
self.saved_auto_cuts = None
return
cuts = mass.AnalysisControl()
for attrname in ("peak_time_ms", "rise_time_ms", "pretrigger_rms",
"postpeak_deriv"):
cuts.cuts_prm[attrname] = (None, g.attrs[attrname])
self.saved_auto_cuts = cuts
@_add_group_loop()
def smart_cuts(self, threshold=10.0, n_trainings=10000, forceNew=False):
"""Young! Why is there no doc string here??"""
# first check to see if this had already been done
if all(self.cuts.good("smart_cuts")) or forceNew:
mdata = np.vstack([self.p_pretrig_mean[:n_trainings], self.p_pretrig_rms[:n_trainings],
self.p_min_value[:n_trainings], self.p_postpeak_deriv[:n_trainings]])
mdata = mdata.transpose()
robust = sklearn.covariance.MinCovDet().fit(mdata)
# It excludes only extreme outliers.
mdata = np.vstack([self.p_pretrig_mean[...], self.p_pretrig_rms[...],
self.p_min_value[...], self.p_postpeak_deriv[...]])
mdata = mdata.transpose()
flag = robust.mahalanobis(mdata) > threshold**2
self.cuts.cut("smart_cuts", flag)
LOG.info("channel %g ran smart cuts, %g of %g pulses passed",
self.channum, self.cuts.good("smart_cuts").sum(), self.nPulses)
else:
LOG.info("channel %g skipping smart cuts because it was already done", self.channum)
@_add_group_loop()
def flag_crosstalking_pulses(self, priorTime, postTime, combineCategories=True,
nearestNeighborsDistances=1, crosstalk_key='is_crosstalking',
forceNew=False):
''' Uses a list of nearest neighbor channels to flag pulses in current channel based
on arrival times of pulses in neighboring channels
Args:
priorTime (float): amount of time to check, in ms, before the pulse arrival time
postTime (float): amount of time to check, in ms, after the pulse arrival time
combineCategories (bool): whether to combine all neighboring-channel pulses across categories into a single crosstalk flag
nearestNeighborsDistances (int or int array): nearest neighbor distances to use for flagging,
i.e. 1 = 1st nearest neighbors, 2 = 2nd nearest neighbors, etc.
crosstalk_key (str): name under which the crosstalk flags are stored in the HDF5 group (default 'is_crosstalking')
forceNew (bool): whether to re-compute the crosstalk cuts (default False)
'''
def crosstalk_flagging_loop(channelsToCompare):
''' Main algorithm for flagging crosstalking pulses in current victim channel,
given a list of perpetrator channels
Args:
channelsToCompare: (int array): list of perpetrator channel numbers to compare against
'''
# Initialize array that will include the pulses from all neighboring channels
compareChannelsPulsesList = np.array([])
# Iterate through all neighboring channels and put pulse timestamps into combined list
for compare_channum in channelsToCompare:
if compare_channum not in self.tes_group.channel:
continue
dsToCompare = self.tes_group.channel[compare_channum]
# Combine the pulses from all neighboring channels into a single array
compareChannelsPulsesList = np.append(compareChannelsPulsesList,
dsToCompare.p_subframecount[:] * dsToCompare.subframe_timebase)
# Create a histogram of the neighboring channel pulses using the bin edges from the channel you are flagging
hist, _bin_edges = np.histogram(compareChannelsPulsesList, bins=combinedEdges)
# Even-indexed bins are the veto windows around this channel's pulses (counts there indicate crosstalk); odd-indexed bins are the gaps between windows
badCountsHist = hist[::2]
# Even only histogram indices map directly to previously good flagged pulse indices for victim channel
crosstalking_pulses = badCountsHist > 0.0
return crosstalking_pulses
# Check to see if nearest neighbors list has already been set, otherwise skip
nn_channel_key = 'nearest_neighbors'
if nn_channel_key in self.hdf5_group.keys():
# Set up combined crosstalk flag array in hdf5 file
h5grp = self.hdf5_group.require_group('crosstalk_flags')
crosstalk_array_dtype = bool
self.__dict__['p_%s' % crosstalk_key] = h5grp.require_dataset(
crosstalk_key, shape=(self.nPulses,), dtype=crosstalk_array_dtype)
if not combineCategories:
for neighborCategory in self.hdf5_group[nn_channel_key]:
categoryField = str(crosstalk_key + '_' + neighborCategory)
self.__dict__['p_%s' % categoryField] = h5grp.require_dataset(
categoryField, shape=(self.nPulses,), dtype=crosstalk_array_dtype)
# Check to see if crosstalk list has already been written and skip, unless forceNew
if (not np.any(h5grp[crosstalk_key][:]) or forceNew):
# Convert from ms input to s used in rest of MASS
priorTime /= 1000.0
postTime /= 1000.0
# Create uneven histogram edges, with a specified amount of time before and after a photon event
pulseTimes = self.p_subframecount[:] * self.subframe_timebase
# Create start and stop edges around pulses corresponding to veto times
startEdges = pulseTimes - priorTime
stopEdges = pulseTimes + postTime
combinedEdges = np.sort(np.append(startEdges, stopEdges))
if combineCategories:
LOG.info('Checking crosstalk between channel %d and combined neighbors...', self.channum)
# Combine all nearest neighbor pairs for this channel into a single list
combinedNearestNeighbors = np.array([])
# Loop through nearest neighbor categories
for neighborCategory in self.hdf5_group[nn_channel_key]:
subgroupName = nn_channel_key + '/' + neighborCategory
subgroupNeighbors = self.hdf5_group[subgroupName + '/neighbors_list'][:]
# Remove duplicates, sort
selectNeighbors = subgroupNeighbors[:, 0][np.isin(
subgroupNeighbors[:, 2], nearestNeighborsDistances)]
combinedNearestNeighbors = np.unique(
np.append(combinedNearestNeighbors, selectNeighbors).astype(int))
if np.sum(np.isin(self.tes_group.channel.keys(), selectNeighbors)) > 0:
h5grp[crosstalk_key][:] = crosstalk_flagging_loop(combinedNearestNeighbors)
else:
msg = "Channel %d skipping crosstalk cuts: no nearest neighbors matching criteria" % self.channum
LOG.info(msg)
else:
for neighborCategory in self.hdf5_group[nn_channel_key]:
categoryField = str(crosstalk_key + '_' + neighborCategory)
subgroupName = nn_channel_key + '/' + neighborCategory
subgroupNeighbors = self.hdf5_group[subgroupName + '/neighbors_list'][:]
selectNeighbors = subgroupNeighbors[:, 0][np.isin(
subgroupNeighbors[:, 2], nearestNeighborsDistances)]
if np.sum(np.isin(self.tes_group.channel.keys(), selectNeighbors)) > 0:
LOG.info('Checking crosstalk between channel %d and %s neighbors...',
self.channum, neighborCategory)
h5grp[categoryField][:] = crosstalk_flagging_loop(selectNeighbors)
h5grp[crosstalk_key][:] = np.logical_or(
h5grp[crosstalk_key], h5grp[categoryField])
else:
msg = "channel %d skipping %s crosstalk cuts because" % (
self.channum, neighborCategory)
msg = msg + " no nearest neighbors matching criteria in category"
LOG.info(msg)
else:
LOG.info("channel %d skipping crosstalk cuts because it was already done", self.channum)
else:
LOG.info("channel %d skipping crosstalk cuts because nearest neighbors not set", self.channum)
@_add_group_loop()
def set_nearest_neighbors_list(self, mapFilename, nearestNeighborCategory='physical',
distanceType='cartesian', forceNew=False):
''' Finds the nearest neighbors in a given space for all channels in a data set
Args:
mapFilename (str): Location of map file in the following format
Column 0 - list of channel numbers.
Remaining column(s) - coordinates that define a particular column in a given space.
For example, can be the row and column number in a physical space
or the frequency order number in a frequency space (umux readout).
nearestNeighborCategory (str): name used to categorize the type of nearest neighbor.
This will be the name given to the subgroup of the hdf5 file under the nearest_neighbor group.
This will also be a key for dictionary nearest_neighbors_dictionary
distanceType (str): Type of distance to measure between nearest neighbors, i.e. cartesian
forceNew (bool): whether to re-compute nearest neighbors list if it exists (default False)
'''
def calculate_cartesian_squared_distances():
'''
Stores the cartesian distance from the current channel to all other channels
on the array within nearest_neighbors_dictionary.
'''
# Create a dictionary to link all squared distances in the given space
# to n for the n-th nearest neighbor.
# Find the maximum distance in each dimension
maxDistances = np.amax(positionValues, axis=0) - np.amin(positionValues, axis=0)
# Create a list of possible squared distances for each dimension
singleDimensionalSquaredDistances = []
for iDim in range(nDims):
tempSquaredDistances = []
for iLength in range(maxDistances[iDim] + 1):
tempSquaredDistances.append(iLength**2)
singleDimensionalSquaredDistances.append(tempSquaredDistances)
# Use np.meshgrid to make an array of all possible combinations of the arrays in each dimension
possibleSquaredCombinations = np.meshgrid(
*[iArray for iArray in singleDimensionalSquaredDistances])
# Sum the squared distances along the corresponding axis to get squared distance at each point
allSquaredDistances = possibleSquaredCombinations[0]
for iDim in range(1, nDims):
allSquaredDistances += possibleSquaredCombinations[iDim]
# Make a sorted list of the unique squared values
uniqueSquaredDistances = np.unique(allSquaredDistances)
# Create a dictionary with squared distance keys and the index of the squared distance array
# as the n value for n-th nearest neighbor
squaredDistanceDictionary = {}
for nthNeighbor, nthNeighborSquaredDistance in enumerate(uniqueSquaredDistances):
squaredDistanceDictionary[nthNeighborSquaredDistance] = nthNeighbor
# Loop through channel map and save an hdf5 dataset including the
# channum, cartesian squared distance, and nearest neighbor n
for channelIndex, distant_channum in enumerate(channelNumbers):
distantPos = np.array(positionValues[distant_channum == channelNumbers][0], ndmin=1)
squaredDistance = 0.0
for iDim in range(nDims):
squaredDistance += (channelPos[iDim] - distantPos[iDim])**2.0
h5grp[nearestNeighborCategory]['neighbors_list'][channelIndex, 0] = distant_channum
h5grp[nearestNeighborCategory]['neighbors_list'][channelIndex, 1] = squaredDistance
h5grp[nearestNeighborCategory]['neighbors_list'][channelIndex, 2] = \
squaredDistanceDictionary[squaredDistance]
def calculate_manhattan_distances():
raise Exception("not implemented")
def process_matching_channel(positionToCompare):
'''
Returns the channel number of a neighboring position after checking for goodness
Args:
positionToCompare (int array) - position to check for nearest neighbor match
'''
# Find the channel number corresponding to the compare position
channelToCompare = channelNumbers[np.all(positionToCompare == positionValues, axis=1)]
# If the new position exists in map file and the channel to compare to is good, return the channel number
if (positionToCompare in positionValues) & (channelToCompare in self.tes_group.good_channels):
return channelToCompare
# Return an empty array if not actually a good nearest neighbor
else:
return np.array([], dtype=int)
# Load channel numbers and positions from map file, define number of dimensions
mapData = np.loadtxt(mapFilename, dtype=int)
channelNumbers = np.array(mapData[:, 0], dtype=int)
positionValues = mapData[:, 1:]
nDims = positionValues.shape[1]
# Set up hdf5 group and repopulate arrays, if already calculated earlier
h5grp = self.hdf5_group.require_group('nearest_neighbors')
h5grp.require_group(nearestNeighborCategory)
# Victim channel position dataset
self.__dict__['position_%s' % nearestNeighborCategory] = \
h5grp[nearestNeighborCategory].require_dataset(
'position', shape=(nDims,), dtype=np.int64)
# Perpetrator channels dataset
hnnc = h5grp[nearestNeighborCategory]
self.__dict__['neighbors_list_%s' % nearestNeighborCategory] = \
hnnc.require_dataset('neighbors_list', shape=((len(channelNumbers), 3)),
dtype=np.int64)
# Check to see if if data set already exists or if forceNew is set to True
if not np.any(h5grp[nearestNeighborCategory]['neighbors_list'][:]) or forceNew:
# Extract channel number and position of current channel, store in hdf5 file
channum = self.channum
channelPos = np.array(positionValues[channum == channelNumbers][0], ndmin=1)
h5grp[nearestNeighborCategory]['position'][:] = channelPos
# Calculate distances and store in hdf5 file
if distanceType == 'cartesian':
calculate_cartesian_squared_distances()
elif distanceType == 'manhattan':
calculate_manhattan_distances()
else:
raise ValueError('Distance type ' + distanceType + ' not recognized.')
def hist(self, bin_edges, attr="p_energy", t0=0, tlast=1e20, category={}, g_func=None):
"""return a tuple of (bin_centers, counts) of p_energy of good pulses (or another attribute).
Automatically filtes out nan values
Parameters
----------
bin_edges : _type_
edges of bins unsed for histogram
attr : str, optional
which attribute to histogram "p_energy" or "p_filt_value", by default "p_energy"
t0 : int, optional
cuts all pulses before this time before fitting, by default 0
tlast : _type_, optional
cuts all pulses after this time before fitting, by default 1e20
category : dict, optional
_description_, by default {}
g_func : _type_, optional
a function a function taking a MicrocalDataSet and returnning a vector like ds.good() would return
This vector is anded with the vector calculated by the histogrammer, by default None
Returns
-------
ndarray, ndarray
Histogram bin *centers*, counts
"""
bin_edges = np.array(bin_edges)
bin_centers = 0.5 * (bin_edges[1:] + bin_edges[:-1])
vals = getattr(self, attr)[:]
# sanitize the data bit
tg = np.logical_and(self.p_timestamp[:] > t0, self.p_timestamp[:] < tlast)
g = np.logical_and(tg, self.good(**category))
g = np.logical_and(g, ~np.isnan(vals))
if g_func is not None:
g = g & g_func(self)
counts, _ = np.histogram(vals[g], bin_edges)
return bin_centers, counts
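# Usage sketch (illustrative; assumes `ds` is a calibrated MicrocalDataSet): histogram
# good-pulse energies in 1 eV bins from 0 to 10 keV.
#     centers, counts = ds.hist(np.arange(0, 10000, 1.0), attr="p_energy")
#     plt.plot(centers, counts, drawstyle="steps-mid")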
def plot_hist(self, bin_edges, attr="p_energy", axis=None, label_lines=[], category={}, g_func=None):
"""plot a coadded histogram from all good datasets and all good pulses
bin_edges -- edges of bins unsed for histogram
attr -- which attribute to histogram "p_energy" or "p_filt_value"
axis -- if None, then create a new figure, otherwise plot onto this axis
annotate_lines -- enter lines names in STANDARD_FEATURES to add to the plot, calls annotate_lines
g_func -- a function a function taking a MicrocalDataSet and returnning a vector like ds.good() would return
This vector is anded with the vector calculated by the histogrammer """
if axis is None:
plt.figure()
axis = plt.gca()
x, y = self.hist(bin_edges, attr, category=category, g_func=g_func)
axis.plot(x, y, drawstyle="steps-mid")
axis.set_xlabel(attr)
de = bin_edges[1] - bin_edges[0]
axis.set_ylabel(f"counts per {de:0.1f} unit bin")
axis.set_title(self.shortname)
mass.core.utilities.annotate_lines(axis, label_lines)
def linefit(self, line_name="MnKAlpha", t0=0, tlast=1e20, axis=None, dlo=50, dhi=50, # noqa: PLR0917
binsize=1, bin_edges=None, attr="p_energy", label="full", plot=True,
guess_params=None, ph_units="eV", category={}, g_func=None,
has_tails=False):
"""Do a fit to `line_name` and return the fitter. You can get the params results with fitter.last_fit_params_dict or any other
way you like.
line_name -- A string like "MnKAlpha" will get "MnKAlphaFitter", or you can pass in a fitter like a mass.GaussianFitter().
t0 and tlast -- cuts all pulses outside this timerange before fitting
axis -- if axis is None and plot==True, will create a new figure, otherwise plot onto this axis
dlo and dhi and binsize -- by default it tries to fit with bin edges given by np.arange(fitter.spect.nominal_peak_energy-dlo,
fitter.spect.nominal_peak_energy+dhi, binsize)
bin_edges -- pass the bin_edges you want as a numpy array
attr -- default is "p_energy", you could pick "p_filt_value" or others. be sure to pass in bin_edges as well because
the default calculation will probably fail for anything other than p_energy
label -- passed to fitter.plot
plot -- passed to fitter.fit, determine if plot happens
guess_params -- passed to fitter.fit, fitter.fit will guess the params on its own if this is None
ph_units -- passed to fitter.fit, used in plot label
category -- pass {"side":"A"} or similar to use categorical cuts
g_func -- a function taking a MicrocalDataSet and returning a boolean vector like ds.good() would return.
This vector is ANDed with the selection computed by the histogrammer.
"""
assert "energy" in attr
model = mass.getmodel(line_name, has_tails=has_tails)
nominal_peak_energy = model.spect.nominal_peak_energy
if bin_edges is None:
bin_edges = np.arange(nominal_peak_energy - dlo, nominal_peak_energy + dhi, binsize)
bin_centers, counts = self.hist(bin_edges, attr, t0, tlast, category, g_func)
params = model.guess(counts, bin_centers=bin_centers, dph_de=1)
params["dph_de"].set(vary=False)
result = model.fit(counts, params=params, bin_centers=bin_centers)
if plot:
result.plotm(ax=axis, xlabel=f"{attr} ({ph_units})",
ylabel=f"counts per {binsize:0.2f} ({ph_units}) bin",
title=f"{self.shortname}\n{model.spect}")
return result
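# Usage sketch (illustrative; assumes `ds` is a calibrated MicrocalDataSet): fit the
# Mn K-alpha line and read back the fitted parameters. The result is an lmfit-style
# fit result; the exact parameter names (e.g. "fwhm") depend on the line model, so
# check result.params for the actual keys.
#     result = ds.linefit("MnKAlpha", dlo=50, dhi=50, binsize=1.0)
#     result.best_values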
def time_drift_correct(time, uncorrected, w, sec_per_degree=2000, # noqa: PLR0914
pulses_per_degree=2000, max_degrees=20, ndeg=None, limit=None):
"""Compute a time-based drift correction that minimizes the spectral entropy.
Args:
time: The "time-axis". Correction will be a low-order polynomial in this.
uncorrected: A filtered pulse height vector, the same length as `time`.
Assumed to have some gain that varies slowly with `time`.
w: the kernel width for the Laplace KDE density estimator
sec_per_degree: assign as many as one polynomial degree per this many seconds
pulses_per_degree: assign as many as one polynomial degree per this many pulses
max_degrees: never use more than this many degrees of Legendre polynomial.
ndeg: If not None, use this many degrees, regardless of the values of
sec_per_degree, pulses_per_degree, and max_degrees. In this case, never downsample.
limit: The [lower,upper] limit of uncorrected values over which entropy is
computed (default None).
The entropy will be computed on corrected values only in the range
[limit[0], limit[1]], so limit should be set to a characteristic large value
of uncorrected. If limit is None (the default), then it will be computed as
25% larger than the 99th-percentile point of uncorrected.
Possible improvements in the future:
* Move this routine to Cython.
* Allow the parameters to be function arguments with defaults: photons per
degree of freedom, seconds per degree of freedom, and max degrees of freedom.
* Figure out how to span the available time with more than one set of legendre
polynomials, so that we can have more than 20 d.o.f. eventually, for long runs.
"""
if limit is None:
pct99 = np.percentile(uncorrected, 99)
limit = [0, 1.25 * pct99]
use = np.logical_and(uncorrected > limit[0], uncorrected < limit[1])
time = time[use]
uncorrected = uncorrected[use]
tmin, tmax = np.min(time), np.max(time)
def normalize(t):
return (t - tmin) / (tmax - tmin) * 2 - 1
info = {
"tmin": tmin,
"tmax": tmax,
"normalize": normalize,
}
dtime = tmax - tmin
N = len(time)
if ndeg is None:
ndeg = int(np.minimum(dtime / sec_per_degree, N / pulses_per_degree))
ndeg = min(ndeg, max_degrees)
ndeg = max(ndeg, 1)
phot_per_degree = N / float(ndeg)
if phot_per_degree >= 2 * pulses_per_degree:
downsample = int(phot_per_degree / pulses_per_degree)
time = time[::downsample]
uncorrected = uncorrected[::downsample]
N = len(time)
else:
downsample = 1
else:
downsample = 1
LOG.info("Using %2d degrees for %6d photons (after %d downsample)", ndeg, N, downsample)
LOG.info("That's %6.1f photons per degree, and %6.1f seconds per degree.", N / float(ndeg), dtime / ndeg)
def model1(pi, i, param, basis):
pcopy = np.array(param)
pcopy[i] = pi
return 1 + np.dot(basis.T, pcopy)
def cost1(pi, i, param, y, w, basis):
return laplace_entropy(y * model1(pi, i, param, basis), w=w)
param = np.zeros(ndeg, dtype=float)
xnorm = np.asarray(normalize(time), dtype=float)
basis = np.vstack([sp.special.legendre(i + 1)(xnorm) for i in range(ndeg)])
fc = 0
model = np.poly1d([0])
info["coefficients"] = np.zeros(ndeg, dtype=float)
for i in range(ndeg):
result, _fval, _iter, funcalls = sp.optimize.brent(
cost1, (i, param, uncorrected, w, basis), [-.001, .001], tol=1e-5, full_output=True)
param[i] = result
fc += funcalls
model += sp.special.legendre(i + 1) * result
info["coefficients"][i] = result
info["funccalls"] = fc
xk = np.linspace(-1, 1, 1 + 2 * ndeg)
model2 = mass.mathstat.interpolate.CubicSpline(xk, model(xk))
H1 = laplace_entropy(uncorrected, w=w)
H2 = laplace_entropy(uncorrected * (1 + model(xnorm)), w=w)
H3 = laplace_entropy(uncorrected * (1 + model2(xnorm)), w=w)
if H2 <= 0 or H2 - H1 > 0.0:
model = np.poly1d([0])
elif H3 <= 0 or H3 - H2 > .00001:
model2 = model
info["entropies"] = (H1, H2, H3)
info["model"] = model
return info
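# Usage sketch (illustrative; `t` and `ph` are assumed arrival-time and pulse-height
# arrays): this mirrors how MicrocalDataSet.time_drift_correct() above consumes the
# returned dict -- normalize the times, evaluate the model, and apply a multiplicative
# (1 + model) gain correction.
#     pk = np.median(ph)
#     info = time_drift_correct(t, ph, w=max(pk / 3000., 1.0), limit=[0.5 * pk, 2 * pk])
#     corrected = ph * (1 + info["model"](info["normalize"](t)))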