"""
channel_group.py
Part of the Microcalorimeter Analysis Software System (MASS).
This module defines classes that handle one or more TES data streams together.
"""
import os
import logging
import re
from collections.abc import Iterable
from deprecation import deprecated
import numpy as np
import matplotlib.pyplot as plt
import h5py
import mass.core.analysis_algorithms
import mass.calibration.energy_calibration
from mass.calibration.energy_calibration import EnergyCalibration
from mass.core.channel import MicrocalDataSet, PulseRecords, NoiseRecords, GroupLooper
from mass.core.cut import CutFieldMixin
from mass.core.utilities import InlineUpdater, show_progress, plot_multipage
from mass.core.ljh_util import remove_unpaired_channel_files, filename_glob_expand, ljh_get_extern_trig_fnames
from ..common import isstr
LOG = logging.getLogger("mass")
def _generate_hdf5_filename(rawname):
"""Generate the appropriate HDF5 filename based on a file's LJH name.
Takes /path/to/data_chan33.ljh --> /path/to/data_mass.hdf5
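    Example:
        >>> _generate_hdf5_filename("/path/to/data_chan33.ljh")
        '/path/to/data_mass.hdf5'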
"""
fparts = re.split(r"_chan\d+", rawname)
prefix_path = fparts[0]
if rawname.endswith("noi"):
prefix_path += '_noise'
return prefix_path + "_mass.hdf5"
def RestoreTESGroup(hdf5filename, hdf5noisename=None):
"""Generate a TESGroup object from a data summary HDF5 file.
Args:
hdf5filename (string): the data summary file
hdf5noisename (string): the noise summary file; this can often be inferred from
the noise raw filenames, which are stored in the pulse HDF5 file (assuming you
aren't doing something weird). (default None)
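    Example (a sketch; the path is hypothetical):
        data = RestoreTESGroup("/path/to/data_mass.hdf5")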
"""
pulsefiles = []
channum = []
noisefiles = []
generated_noise_hdf5_name = None
with h5py.File(hdf5filename, "r") as h5file:
for name, group in h5file.items():
if not name.startswith("chan"):
continue
pulsefiles.append(group.attrs['filename'])
channum.append(group.attrs['channum'])
if hdf5noisename is None:
fname = group.attrs['noise_filename']
if generated_noise_hdf5_name is None:
generated_noise_hdf5_name = _generate_hdf5_filename(fname)
elif generated_noise_hdf5_name != _generate_hdf5_filename(fname):
                    msg = f"""The implied HDF5 noise file names are not the same for all channels.
The first channel implies '{generated_noise_hdf5_name}'
and another implies '{_generate_hdf5_filename(fname)}'.
Instead, you should run RestoreTESGroup with an explicit hdf5noisename argument."""
raise RuntimeError(msg)
noisefiles.append(fname)
if hdf5noisename is not None:
with h5py.File(hdf5noisename, "r") as h5file:
for ch in channum:
group = h5file[f'chan{ch}']
noisefiles.append(group.attrs['filename'])
else:
hdf5noisename = generated_noise_hdf5_name
return TESGroup(pulsefiles, noisefiles, hdf5_filename=hdf5filename,
hdf5_noisefilename=hdf5noisename)
class TESGroup(CutFieldMixin, GroupLooper): # noqa: PLR0904, PLR0917
"""The interface for a group of one or more microcalorimeters."""
def __init__(self, filenames, noise_filenames=None, noise_only=False, # noqa: PLR0917
noise_is_continuous=True,
hdf5_filename=None, hdf5_noisefilename=None,
never_use=None, use_only=None, max_chans=None,
experimentStateFile=None, excludeStates="auto", overwrite_hdf5_file=False):
"""Set up a group of related data sets by their filenames.
Args:
filenames: either a sequence of pulse filenames, or a shell "glob pattern"
that can be expanded to a list of filenames
noise_filenames: either a sequence of noise filenames, or a shell
"glob pattern" that can be expanded to a list of filenames, or
None (if there are to be no noise data). (default None)
noise_only (bool): if True, then treat this as a pulse-free analysis
and take filenames to be a list of noise files (default False)
noise_is_continuous (bool): whether to treat sequential noise records
as continuous in time (default True)
hdf5_filename: if not None, the filename to use for backing the
analyzed data in HDF5 (default None). If None, choose a sensible
filename based on the input data filenames.
hdf5_noisefilename: if not None, the filename to use for backing the
analyzed noise data in HDF5 (default None). If None, choose a sensible
filename based on the input data filenames.
never_use: if not None, a sequence of channel numbers to ignore
(default None).
use_only: if not None, a sequence of channel numbers to use, i.e.
ignore all channels not on this list (default None).
max_chans: open at most this many ljh files
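        Example (a sketch; the glob patterns are hypothetical):
            data = TESGroup("/data/run1_chan*.ljh", "/data/run1_noise_chan*.noi")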
"""
if noise_filenames is not None and len(noise_filenames) == 0:
noise_filenames = None
# In the noise_only case, you can put the noise file names either in the
# usual (pulse) filenames argument or in the noise_filenames argument.
self.noise_only = noise_only
if noise_only and noise_filenames is None:
filenames, noise_filenames = (), filenames
# Handle the case that either filename list is a glob pattern (e.g.,
# "files_chan*.ljh"). Note that this will return a list, never a string,
# even if there is only one result from the pattern matching.
pattern = filenames
filenames = filename_glob_expand(filenames)
if (filenames is None or len(filenames) == 0) and (not noise_only):
raise ValueError(f"Pulse filename pattern {pattern} expanded to no files")
if noise_filenames is not None:
pattern = noise_filenames
noise_filenames = filename_glob_expand(noise_filenames)
if noise_filenames is None or len(noise_filenames) == 0:
raise ValueError(f"Noise filename pattern {pattern} expanded to no files")
# If using a glob pattern especially, we have to be careful to eliminate files that are
# missing a partner, either noise without pulse or pulse without noise.
remove_unpaired_channel_files(filenames, noise_filenames,
never_use=never_use, use_only=use_only)
# enforce max_chans
if max_chans is not None:
n = min(max_chans, len(filenames))
filenames = filenames[:n]
noise_filenames = noise_filenames[:n]
# Figure out where the 2 HDF5 files are to live, if the default argument
# was given for their paths.
if hdf5_filename is None and not noise_only:
hdf5_filename = _generate_hdf5_filename(filenames[0])
if hdf5_noisefilename is None and noise_filenames is not None:
hdf5_noisefilename = _generate_hdf5_filename(noise_filenames[0])
# Handle the pulse files.
if noise_only:
self.filenames = ()
self.hdf5_file = None
else:
# Convert a single filename to a tuple of size one
if isstr(filenames):
filenames = (filenames,)
self.filenames = tuple(filenames)
self.n_channels = len(self.filenames)
if overwrite_hdf5_file:
self.hdf5_file = h5py.File(hdf5_filename, 'w')
else:
self.hdf5_file = h5py.File(hdf5_filename, 'a')
            # Cut parameter descriptions need to be initialized.
self.cut_field_desc_init()
# Same for noise filenames
self.noise_filenames = None
self.hdf5_noisefile = None
if noise_filenames is not None:
if isstr(noise_filenames):
noise_filenames = (noise_filenames,)
self.noise_filenames = noise_filenames
try:
intent = "w" if overwrite_hdf5_file else "a"
self.hdf5_noisefile = h5py.File(hdf5_noisefilename, intent)
except OSError:
# if the noise file is corrupted, we will get an OSError
# open with write intent, which will clobber the existing file
self.hdf5_noisefile = h5py.File(hdf5_noisefilename, 'w')
if noise_only:
self.n_channels = len(self.noise_filenames)
# Load up experiment state file
self.experimentStateFile = None
if not noise_only:
if experimentStateFile is None:
try:
self.experimentStateFile = mass.off.ExperimentStateFile(
datasetFilename=self.filenames[0], excludeStates=excludeStates)
except OSError as e:
LOG.debug('Skipping loading of experiment state file because %s', e)
else:
self.experimentStateFile = mass.off.channels.ExperimentStateFile(
experimentStateFile, excludeStates=excludeStates)
if self.experimentStateFile is not None:
valid_state_labels = self.experimentStateFile.labels
self.register_categorical_cut_field("state", valid_state_labels)
# Set up other aspects of the object
self.nhits = None
self.n_segments = 0
self.nPulses = 0
self.nPresamples = 0
self.nSamples = 0
self.timebase = 0.0
self._allowed_pnum_ranges = None
self.pulses_per_seg = None
self._bad_channums = {}
self._external_trigger_subframe_count = None
if self.noise_only:
self._setup_per_channel_objects_noiseonly(noise_is_continuous)
else:
self._setup_per_channel_objects(noise_is_continuous)
self.updater = InlineUpdater
def toOffStyle(self):
channels = [ds.toOffStyle() for ds in self]
g = mass.off.channels.ChannelGroupFromNpArrays(channels,
shortname=self.shortname,
experimentStateFile=self.experimentStateFile)
return g
def _setup_per_channel_objects(self, noise_is_continuous=True):
pulse_list = []
noise_list = []
dset_list = []
for i, fname in enumerate(self.filenames):
# Create the pulse records file interface and the overall MicrocalDataSet
pulse = PulseRecords(fname)
LOG.info("%s %i", fname, pulse.nPulses)
if pulse.nPulses == 0:
LOG.info("TESGroup is skipping a file that has zero pulses: %s", fname)
continue # don't load files with zero pulses
hdf5_group = self.hdf5_file.require_group(f"chan{pulse.channum}")
hdf5_group.attrs['filename'] = fname
dset = MicrocalDataSet(pulse.__dict__, tes_group=self, hdf5_group=hdf5_group)
if 'calibration' in hdf5_group:
hdf5_cal_grp = hdf5_group['calibration']
for cal_name in hdf5_cal_grp:
dset.calibration[cal_name] = EnergyCalibration.load_from_hdf5(
hdf5_cal_grp, cal_name)
if 'why_bad' in hdf5_group.attrs:
self._bad_channums[dset.channum] = [comment.decode() for comment in
hdf5_group.attrs['why_bad']]
# If appropriate, add to the MicrocalDataSet the NoiseRecords file interface
if self.noise_filenames is not None:
nf = self.noise_filenames[i]
hdf5_group.attrs['noise_filename'] = nf
try:
hdf5_noisegroup = self.hdf5_noisefile.require_group(f"chan{pulse.channum}")
hdf5_noisegroup.attrs['filename'] = nf
except Exception:
hdf5_noisegroup = None
noise = NoiseRecords(nf, records_are_continuous=noise_is_continuous)
noise.set_hdf5_group(hdf5_noisegroup)
if pulse.channum != noise.channum:
LOG.warning(
"WARNING: TESGroup did not add data: channums don't match %s, %s", fname, nf)
continue
dset.noise_records = noise
assert (dset.channum == dset.noise_records.channum)
noise_list.append(noise)
pulse_list.append(pulse)
dset_list.append(dset)
if self.n_segments == 0:
for attr in ("nSamples", "nPresamples", "timebase"):
self.__dict__[attr] = pulse.__dict__[attr]
else:
for attr in ("nSamples", "nPresamples", "timebase"):
if self.__dict__[attr] != pulse.__dict__[attr]:
msg = f"Unequal values of '{attr}': {float(self.__dict__[attr])} != {float(pulse.__dict__[attr])}"
raise ValueError(msg)
self.n_segments = max(self.n_segments, pulse.n_segments)
self.nPulses = max(self.nPulses, pulse.nPulses)
# Store relevant facts as attributes to the HDF5 file
if self.hdf5_file is not None:
self.hdf5_file.attrs.update({'npulses': self.nPulses,
'nsamples': self.nSamples,
'npresamples': self.nPresamples,
'frametime': self.timebase})
self.channels = tuple(pulse_list)
self.noise_channels = tuple(noise_list)
self.datasets = tuple(dset_list)
for index, (pr, ds) in enumerate(zip(self.channels, self.datasets)):
ds.pulse_records = pr
ds.data = pr.datafile.alldata
ds.times = pr.datafile.datatimes_float
ds.subframecount = pr.datafile.subframecount
ds.index = index
if len(pulse_list) > 0:
self.pulses_per_seg = pulse_list[0].pulses_per_seg
def _setup_per_channel_objects_noiseonly(self, noise_is_continuous=True):
noise_list = []
dset_list = []
for fname in self.noise_filenames:
noise = NoiseRecords(fname, records_are_continuous=noise_is_continuous)
hdf5_group = self.hdf5_noisefile.require_group(f"chan{noise.channum}")
hdf5_group.attrs['filename'] = fname
noise.set_hdf5_group(hdf5_group)
dset = MicrocalDataSet(noise.__dict__, hdf5_group=hdf5_group)
dset.noise_records = noise
noise_list.append(noise)
dset_list.append(dset)
if self.n_segments == 0:
for attr in ("nSamples", "nPresamples", "timebase"):
self.__dict__[attr] = noise.__dict__[attr]
else:
for attr in ("nSamples", "nPresamples", "timebase"):
if self.__dict__[attr] != noise.__dict__[attr]:
msg = f"Unequal values of '{attr}': {float(self.__dict__[attr])} != {float(noise.__dict__[attr])}"
raise ValueError(msg)
self.n_segments = max(self.n_segments, noise.n_segments)
self.nPulses = max(self.nPulses, noise.nPulses)
# Store relevant facts as attributes to the HDF5 file
if self.hdf5_file is not None:
self.hdf5_file.attrs.update({'npulses': self.nPulses,
'nsamples': self.nSamples,
'npresamples': self.nPresamples,
'frametime': self.timebase})
self.channels = ()
self.noise_channels = tuple(noise_list)
self.datasets = tuple(dset_list)
for index, (pr, ds) in enumerate(zip(self.channels, self.datasets)):
ds.pulse_records = pr
ds.index = index
def __repr__(self):
clname = self.__class__.__name__
if self.noise_filenames is None:
pname = os.path.dirname(self.filenames[0])
return f"{clname}(pulse={pname}, noise=None)"
nname = os.path.dirname(self.noise_filenames[0])
if self.noise_only:
return f"{clname}(noise={nname}, noise_only=True)"
pname = os.path.dirname(self.filenames[0])
return f"{clname}(pulse={pname}, noise={nname})"
def __iter__(self):
"""Iterator over the self.datasets in channel number order"""
yield from self.iter_channels()
def iter_channels(self, include_badchan=False):
"""Iterator over the self.datasets in channel number order
Args:
include_badchan (bool): whether to include officially bad channels
in the result (default False).
"""
for ds in self.datasets:
if not include_badchan:
if ds.channum in self._bad_channums:
continue
yield ds
def iter_channel_numbers(self, include_badchan=False):
"""Iterator over the channel numbers in channel number order
Args:
include_badchan (bool): whether to include officially bad channels
in the result (default False).
"""
for ds in self.iter_channels(include_badchan=include_badchan):
yield ds.channum
def set_chan_good(self, *args):
"""Set one or more channels to be good.
(No effect for channels already listed as good.)
Args:
            *args: Arguments to this function are integers or containers of integers. Each
integer is removed from the bad-channels list.
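        Examples (channel numbers are hypothetical):
            data.set_chan_good(1)
            data.set_chan_good(3, [5, 7])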
"""
added_to_list = set.union(*[set(x) if isinstance(x, Iterable) else {x} for x in args])
for channum in added_to_list:
if channum in self._bad_channums:
comment = self._bad_channums.pop(channum)
del self.hdf5_file[f"chan{channum}"].attrs['why_bad']
LOG.info("chan %d set good, had previously been set bad for %s", channum, str(comment))
else:
LOG.info("chan %d not set good because it was not set bad", channum)
def set_chan_bad(self, *args):
"""Set one or more channels to be bad.
(No effect for channels already listed as bad.)
Args:
            *args: Arguments to this function are integers or containers of integers. Each
integer is added to the bad-channels list.
Examples:
data.set_chan_bad(1, "too few good pulses")
data.set_chan_bad(103, [1, 3, 5], "detector unstable")
"""
added_to_list = set.union(*[set(x) if isinstance(x, Iterable)
else {x} for x in args if not isstr(x)])
        comments = [x for x in args if isstr(x)]
        comment = comments[-1] if comments else ''
for channum in added_to_list:
new_comment = self._bad_channums.get(channum, []) + [comment]
self._bad_channums[channum] = new_comment
LOG.warning('WARNING: Chan %s flagged bad because %s', channum, comment)
self.hdf5_file[f"chan{channum}"].attrs['why_bad'] = \
np.asarray(new_comment, dtype=np.bytes_)
def set_all_chan_good(self):
"""Set all channels to be good."""
# Must do it this way (copying the list) so that you aren't iterating over a list
# while also changing that list
        bad_chan_list = list(self._bad_channums)
for channum in bad_chan_list:
self.set_chan_good(channum)
def n_good_channels(self):
        return self.n_channels - len(self._bad_channums)
@property
def timestamp_offset(self):
ts = set([ds.timestamp_offset for ds in self if ds.channum not in self._bad_channums])
if len(ts) == 1:
return ts.pop()
return None
@property
def channel(self):
return {ds.channum: ds for ds in self.datasets}
@property
def good_channels(self):
return [ds.channum for ds in self if ds.channum not in self._bad_channums]
@property
def num_good_channels(self):
return len(self.good_channels)
@property
def first_good_dataset(self):
if self.num_good_channels <= 0:
raise IndexError("WARNING: All datasets flagged bad, most things won't work.")
return self.channel[self.good_channels[0]]
@property
def why_chan_bad(self):
return self._bad_channums.copy()
@deprecated(deprecated_in="0.7.9", details="Use compute_noise(), which is equivalent but better named")
def compute_noise_spectra(self, max_excursion=1000, forceNew=False):
"""Replaced by the equivalent compute_noise(...)"""
        # This is needed because the @_add_group_loop decorator does not preserve warnings
        # and pass them up.
for ds in self:
ds.compute_noise(max_excursion=max_excursion, forceNew=forceNew)
def sample2segnum(self, samplenum):
"""Returns the segment number of sample number <samplenum>."""
if samplenum >= self.nPulses:
samplenum = self.nPulses - 1
return samplenum // self.pulses_per_seg
def segnum2sample_range(self, segnum):
"""Return the (first,end) sample numbers of the segment numbered <segnum>.
Note that <end> is 1 beyond the last sample number in that segment."""
return segnum * self.pulses_per_seg, (segnum + 1) * self.pulses_per_seg
@property
def shortname(self):
"""Return a string containing part of the filename and the number of good channels"""
ngoodchan = len([ds for ds in self])
return mass.ljh_util.ljh_basename_channum(os.path.split(self.datasets[0].filename)[-1])[0] + f", {ngoodchan} chans"
@show_progress("summarize_data")
def summarize_data(self, peak_time_microsec=None, pretrigger_ignore_microsec=None,
cut_pre=0, cut_post=0,
include_badchan=False, forceNew=False, use_cython=True, doPretrigFit=False):
"""Summarize the data with per-pulse summary quantities for each channel.
peak_time_microsec will be determined automatically if None, and will be
stored in channels as ds.peak_samplenumber.
Args:
            use_cython (bool): whether to use the Cython (faster) implementation of summarize.
"""
nchan = float(len(self.channel.keys())) if include_badchan else float(
self.num_good_channels)
for i, ds in enumerate(self.iter_channels(include_badchan)):
try:
ds.summarize_data(peak_time_microsec=peak_time_microsec,
pretrigger_ignore_microsec=pretrigger_ignore_microsec,
cut_pre=cut_pre,
cut_post=cut_post,
forceNew=forceNew, use_cython=use_cython,
doPretrigFit=doPretrigFit)
yield (i + 1.0) / nchan
self.hdf5_file.flush()
except Exception as e:
self.set_chan_bad(ds.channum, f"summarize_data failed with {e}")
def compute_filters(self, fmax=None, f_3db=None, cut_pre=0, cut_post=0, forceNew=False, category=None, filter_type="ats"):
if category is None:
category = {}
LOG.warning(
'compute_filters is deprecated and will eventually be removed, please '
'use compute_ats_filter or compute_5lag_filter directly')
for ds in self.datasets:
if hasattr(ds, "_use_new_filters"):
raise Exception(
"ds._use_new_filters is deprecated, use the filter_type argument to this function instead")
if filter_type == "ats":
self.compute_ats_filter(fmax=fmax, f_3db=f_3db, cut_pre=cut_pre,
cut_post=cut_post, forceNew=forceNew, category=category)
elif filter_type == "5lag":
self.compute_5lag_filter(fmax=fmax, f_3db=f_3db, cut_pre=cut_pre,
cut_post=cut_post, forceNew=forceNew, category=category)
else:
raise Exception("filter_type must be one of `ats` or `5lag`")
def pulse_model_to_hdf5(self, hdf5_file=None, n_basis=6, replace_output=False,
maximum_n_pulses=4000, extra_n_basis_5lag=0, noise_weight_basis=True,
category=None, f_3db_5lag=None, _rethrow=False):
if category is None:
category = {}
if hdf5_file is None:
basename, _ = self.datasets[0].filename.split("chan")
hdf5_filename = basename + "model.hdf5"
if os.path.isfile(hdf5_filename):
if not replace_output:
raise Exception(
f"file {hdf5_filename} already exists, pass replace_output = True to overwrite")
with h5py.File(hdf5_filename, "w") as h5:
self._pulse_model_to_hdf5(
h5, n_basis, pulses_for_svd=None,
extra_n_basis_5lag=extra_n_basis_5lag, maximum_n_pulses=maximum_n_pulses,
category=category, noise_weight_basis=noise_weight_basis, f_3db_5lag=f_3db_5lag, _rethrow=_rethrow)
LOG.info("writing pulse_model to %s", hdf5_filename)
else:
hdf5_filename = hdf5_file.filename
LOG.info("writing pulse_model to %s", hdf5_filename)
self._pulse_model_to_hdf5(
hdf5_file, n_basis, maximum_n_pulses=maximum_n_pulses,
extra_n_basis_5lag=extra_n_basis_5lag, f_3db_5lag=f_3db_5lag, category=category)
return hdf5_filename
@property
def external_trigger_subframe_count(self):
if self._external_trigger_subframe_count is None:
self.subframe_divisions = self.first_good_dataset.subframe_divisions
possible_files = ljh_get_extern_trig_fnames(self.first_good_dataset.filename)
if os.path.isfile(possible_files["hdf5"]):
h5 = h5py.File(possible_files["hdf5"], "r")
ds_name = "trig_times_w_offsets" if "trig_times_w_offsets" in h5 else "trig_times"
self._external_trigger_subframe_count = h5[ds_name]
elif os.path.isfile(possible_files["binary"]):
with open(possible_files["binary"], "rb") as f:
header_text = f.readline().decode()
m = re.match(r".*\(nrow=(.*)\)\n", header_text)
if m is not None:
self.subframe_divisions = int(m.groups()[0])
for ds in self.datasets:
ds.subframe_divisions = self.subframe_divisions
self._external_trigger_subframe_count = np.fromfile(f, dtype="int64")
else:
raise OSError("No external trigger files found: ", possible_files)
self.subframe_timebase = self.timebase / float(self.subframe_divisions)
for ds in self.datasets:
ds.subframe_timebase = self.subframe_timebase
return self._external_trigger_subframe_count
@property
def external_trigger_subframe_as_seconds(self):
"""This is not a posix timestamp, it is just the external trigger subframecount converted to seconds
based on the nominal clock rate of the crate.
"""
return self.external_trigger_subframe_count[:] / float(self.subframe_divisions) * self.timebase
def calc_external_trigger_timing(self, forceNew=False):
ds = self.first_good_dataset
external_trigger_subframe_count = np.asarray(ds.external_trigger_subframe_count[:], dtype=np.int64)
for ds in self:
try:
if ("subframes_after_last_external_trigger" in ds.hdf5_group) and \
("subframes_until_next_external_trigger" in ds.hdf5_group) and \
("subframes_from_nearest_external_trigger" in ds.hdf5_group) and (not forceNew):
continue
subframes_after_last, subframes_until_next = \
mass.core.analysis_algorithms.nearest_arrivals(ds.p_subframecount[:],
external_trigger_subframe_count)
nearest = np.fmin(subframes_after_last, subframes_until_next)
for name, values in zip(("subframes_after_last_external_trigger",
"subframes_until_next_external_trigger",
"subframes_from_nearest_external_trigger"),
(subframes_after_last, subframes_until_next, nearest)):
h5dset = ds.hdf5_group.require_dataset(name, (ds.nPulses,), dtype=np.int64)
h5dset[:] = values
except Exception:
self.set_chan_bad(ds.channum, "calc_external_trigger_timing")
def read_trace(self, record_num, dataset_num=0, channum=None):
"""Read one trace from cache or disk.
Args:
record_num (int): the pulse record number to read.
dataset_num (int): the dataset number to use
channum (int): the channel number to use (if both this and dataset_num
are given, use channum in preference).
Returns:
an ndarray: the pulse numbered <record_num>
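        Example (record and channel numbers are hypothetical):
            trace = data.read_trace(0, channum=1)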
"""
ds = self.channel.get(channum, self.datasets[dataset_num])
return ds.read_trace(record_num)
def plot_traces(self, pulsenums, dataset_num=0, channum=None, pulse_summary=True, axis=None, # noqa: PLR0917
difference=False, residual=False, valid_status=None, shift1=False):
"""Plot some example pulses, given by record number.
Args:
<pulsenums> A sequence of record numbers, or a single number.
<dataset_num> Dataset index (0 to n_dets-1, inclusive). Will be used only if
<channum> is invalid.
<channum> Channel number. If valid, it will be used instead of dataset_num.
<pulse_summary> Whether to put text about the first few pulses on the plot
(default True)
<axis> A plt axis to plot on (default None, i.e., create a new axis)
<difference> Whether to show successive differences (that is, d(pulse)/dt) or the raw data
(default False).
<residual> Whether to show the residual between data and opt filtered model,
or just raw data (default False).
<valid_status> If None, plot all pulses in <pulsenums>. If "valid" omit any from that set
that have been cut. If "cut", show only those that have been cut.
(default None).
<shift1> Whether to take pulses with p_shift1==True and delay them by
1 sample (default False, i.e., show the pure raw data w/o shifting).
"""
if channum in self.channel:
dataset = self.channel[channum]
dataset_num = dataset.index
else:
dataset = self.datasets[dataset_num]
if channum is not None:
LOG.info("Cannot find channum[%d], so using dataset #%d", channum, dataset_num)
return dataset.plot_traces(pulsenums, pulse_summary, axis, difference,
residual, valid_status, shift1)
def plot_summaries(self, quantity, valid='uncut', downsample=None, log=False, hist_limits=None, # noqa: PLR0914
channel_numbers=None, dataset_numbers=None):
"""Plot a summary of one quantity from the data set.
This plot includes time series and histograms of this quantity. This
method plots all channels in the group, but only one quantity. If you
would rather see all quantities for one channel, then use the group's
group.channel[i].plot_summaries() method.
Args:
            quantity: A case-insensitive, whitespace-ignored name from the following
                list, or the number that goes with it:
"Pulse RMS" (0)
"Pulse Avg" (1)
"Peak Value" (2)
"Pretrig RMS" (3)
"Pretrig Mean" (4)
"Max PT Deriv" (5)
"Rise Time" (6)
"Peak Time" (7)
"Peak Index" (8)
valid: The words 'uncut' or 'cut', meaning that only uncut or cut data
are to be plotted *OR* None, meaning that all pulses should be plotted.
downsample (int): To prevent the scatter plots (left panels) from getting too crowded,
plot only one out of this many samples. If None, then plot will be
downsampled to 10,000 total points.
log (bool): Use logarithmic y-axis on the histograms (right panels).
hist_limits: if not None, limit the right-panel histograms to this range.
channel_numbers: A sequence of channel numbers to plot. If None, then plot all.
dataset_numbers: A sequence of the datasets [0...n_channels-1] to plot. If None
(the default) then plot all datasets in numerical order. But ignored
if channel_numbers is not None.
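        Example (a sketch):
            data.plot_summaries("Pretrig RMS", valid="uncut", log=True)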
"""
plottables = (
("p_pulse_rms", 'Pulse RMS', 'magenta', None),
("p_pulse_average", 'Pulse Avg', 'purple', [0, 5000]),
("p_peak_value", 'Peak value', 'blue', None),
("p_pretrig_rms", 'Pretrig RMS', 'green', [0, 4000]),
("p_pretrig_mean", 'Pretrig Mean', '#00ff26', None),
("p_postpeak_deriv", 'Max PostPk deriv', 'gold', [0, 700]),
("p_rise_time[:]*1e3", 'Rise time (ms)', 'orange', [0, 12]),
("p_peak_time[:]*1e3", 'Peak time (ms)', 'red', [-3, 9]),
("p_peak_index[:]", 'Peak index', 'red', [600, 800])
)
quant_names = [p[1].lower().replace(" ", "") for p in plottables]
if quantity in range(len(quant_names)):
plottable = plottables[quantity]
else:
quantity = quant_names.index(quantity.lower().replace(" ", ""))
plottable = plottables[quantity]
MAX_TO_PLOT = 16
if channel_numbers is None:
if dataset_numbers is None:
datasets = list(self)
if len(datasets) > MAX_TO_PLOT:
datasets = datasets[:MAX_TO_PLOT]
else:
datasets = [self.datasets[i] for i in dataset_numbers]
channel_numbers = [ds.channum for ds in datasets]
else:
datasets = [self.channel[i] for i in channel_numbers]
        # Plot timeseries with t=0 at the last UT midnight at or before the last record.
last_record = np.max([ds.p_timestamp[-1] for ds in self])
last_midnight = last_record - (last_record % 86400)
hour_offset = last_midnight / 3600.
plt.clf()
ny_plots = len(datasets)
for i, (channum, ds) in enumerate(zip(channel_numbers, datasets)):
# Convert "uncut" or "cut" to array of all good or all bad data
            valid_mask = None
            if isstr(valid):
if "uncut" in valid.lower():
valid_mask = ds.cuts.good()
LOG.info("Plotting only uncut data")
elif "cut" in valid.lower():
valid_mask = ds.cuts.bad()
LOG.info("Plotting only cut data")
elif 'all' in valid.lower():
valid_mask = None
LOG.info("Plotting all data, cut or uncut")
else:
raise ValueError(
"If valid is a string, it must contain 'all', 'uncut' or 'cut'.")
if valid_mask is not None:
nrecs = valid_mask.sum()
if downsample is None:
downsample = max(nrecs // 10000, 1)
hour = ds.p_timestamp[valid_mask][::downsample] / 3600.0
else:
nrecs = ds.nPulses
if downsample is None:
downsample = max(ds.nPulses // 10000, 1)
hour = ds.p_timestamp[::downsample] / 3600.0
LOG.info("Chan %3d (%d records; %d in scatter plots)", channum, nrecs, hour.shape[0])
(vect, label, color, default_limits) = plottable
if hist_limits is None:
limits = default_limits
else:
limits = hist_limits
# Vectors are being sampled and multiplied, so eval() is needed.
            vect = eval(f"ds.{vect}")
            if valid_mask is not None:
                vect = vect[valid_mask]
# Scatter plots on left half of figure
if i == 0:
ax_master = plt.subplot(ny_plots, 2, 1 + i * 2)
else:
plt.subplot(ny_plots, 2, 1 + i * 2, sharex=ax_master)
if len(vect) > 0:
plt.plot(hour - hour_offset, vect[::downsample], '.', ms=1, color=color)
else:
plt.text(.5, .5, 'empty', ha='center', va='center', size='large',
transform=plt.gca().transAxes)
if i == 0:
plt.title(label)
plt.ylabel(f"Ch {channum}")
if i == ny_plots - 1:
plt.xlabel("Time since last UT midnight (hours)")
# Histograms on right half of figure
if i == 0:
axh_master = plt.subplot(ny_plots, 2, 2 + i * 2)
elif 'Pretrig Mean' == label:
plt.subplot(ny_plots, 2, 2 + i * 2)
else:
plt.subplot(ny_plots, 2, 2 + i * 2, sharex=axh_master)
if limits is None:
in_limit = np.ones(len(vect), dtype=bool)
else:
in_limit = np.logical_and(vect > limits[0], vect < limits[1])
if in_limit.sum() <= 0:
plt.text(.5, .5, 'empty', ha='center', va='center', size='large',
transform=plt.gca().transAxes)
else:
contents, _bins, _patches = plt.hist(vect[in_limit], 200, log=log,
histtype='stepfilled', fc=color, alpha=0.5)
if i == ny_plots - 1:
plt.xlabel(label)
if log:
                    plt.ylim(bottom=contents.min())
plt.tight_layout()
def make_masks(self, pulse_avg_range=None,
pulse_peak_range=None,
pulse_rms_range=None, gains=None):
"""Generate a sequence of masks for use in compute_average_pulses().
Args:
pulse_avg_range -- A 2-sequence giving the (minimum,maximum) p_pulse_average
pulse_peak_range -- A 2-sequence giving the (minimum,maximum) p_peak_value
pulse_rms_range -- A 2-sequence giving the (minimum,maximum) p_pulse_rms
gains -- The set of gains to use, if any.
Returns:
            a list of boolean ndarrays, one per channel.
Each vector says whether each pulse in that channel is in the given
range of allowed pulse sizes.
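        Example (a sketch; the range is hypothetical):
            masks = data.make_masks(pulse_avg_range=(1000, 2000))
            data.compute_average_pulse(masks)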
"""
for ds in self:
if ds.nPulses == 0:
self.set_chan_bad(ds.channum, "has 0 pulses")
masks = []
if gains is None:
gains = np.ones(self.n_channels)
nranges = 0
if pulse_avg_range is not None:
nranges += 1
vectname = "p_pulse_average"
pmin, pmax = pulse_avg_range
if pulse_peak_range is not None:
nranges += 1
vectname = "p_peak_value"
pmin, pmax = pulse_peak_range
if pulse_rms_range is not None:
nranges += 1
vectname = "p_pulse_rms"
pmin, pmax = pulse_rms_range
if nranges == 0:
raise ValueError("Call make_masks with one of pulse_avg_range"
" pulse_rms_range, or pulse_peak_range specified.")
if nranges > 1:
LOG.warning(
"Warning: make_masks uses only one range argument. Checking only '%s'.", vectname)
middle = 0.5 * (pmin + pmax)
abs_lim = 0.5 * np.abs(pmax - pmin)
for gain, dataset in zip(gains, self.datasets):
v = dataset.__dict__[vectname][:]
m = np.abs(v / gain - middle) <= abs_lim
m = np.logical_and(m, dataset.cuts.good())
masks.append(m)
return masks
def compute_average_pulse(self, masks, subtract_mean=True, forceNew=False):
"""Compute an average pulse in each TES channel.
        Store the averages in self.datasets.average_pulse, a length-nSamples vector.
Note that this method replaces any previously computed self.datasets.average_pulse
Args:
masks: A sequence of length self.n_channels, one sequence per channel.
The elements of `masks` should be booleans or interpretable as booleans.
subtract_mean (bool): whether each average pulse will subtract a constant
to ensure that the pretrigger mean (first self.nPresamples elements) is zero.
"""
if len(masks) != len(self.datasets):
raise ValueError("masks must include exactly one mask per data channel")
# Make sure that masks is a sequence of 1D arrays of the right shape
for i, m in enumerate(masks):
if not isinstance(m, np.ndarray):
raise ValueError(f"masks[{i}] is not a np.ndarray")
for (mask, ds) in zip(masks, self.datasets):
if ds.channum not in self.good_channels:
continue
ds.compute_average_pulse(mask, subtract_mean=subtract_mean, forceNew=forceNew)
def plot_average_pulses(self, axis=None, channels=None, cmap=None, legend=True,
fcut=None, include_badchan=False):
"""Plot average pulse for channel number <channum> on matplotlib.Axes
<axis>, or on a new Axes if <axis> is None. If <channum> is not a valid
channel number, then plot all average pulses. If <fcut> is not None,
then lowpass filter the traces with this cutoff frequency prior to
plotting.
"""
if axis is None:
plt.clf()
axis = plt.subplot(111)
if cmap is None:
cmap = plt.cm.nipy_spectral
dt = (np.arange(self.nSamples) - self.nPresamples) * self.timebase * 1e3
if channels is None:
dsets = list(self.iter_channels(include_badchan=include_badchan))
else:
dsets = [self.channel[c] for c in channels]
nplot = len(dsets)
for i, ds in enumerate(dsets):
avg_pulse = ds.average_pulse[:].copy()
if fcut is not None:
avg_pulse = mass.core.analysis_algorithms.filter_signal_lowpass(
avg_pulse, 1. / self.timebase, fcut)
plt.plot(dt, avg_pulse, label=f"Chan {ds.channum}", color=cmap(float(i) / nplot))
plt.title("Average pulse for each channel when it is hit")
plt.xlabel("Time past trigger (ms)")
plt.ylabel("Raw counts")
plt.xlim([dt[0], dt[-1]])
if legend:
plt.legend(loc='best')
if nplot > 12:
ltext = axis.get_legend().get_texts()
plt.setp(ltext, fontsize='small')
def plot_filters(self, axis=None, channels=None, cmap=None,
filtname="filt_noconst", legend=True):
"""Plot the optimal filters.
Args:
channels: Sequence of channel numbers to display. If None, then show all.
"""
if axis is None:
plt.clf()
axis = plt.subplot(111)
if channels is None:
channels = list(self.channel.keys())
channels.sort()
if cmap is None:
cmap = plt.cm.nipy_spectral
axis.grid(True)
for ds_num, channum in enumerate(channels):
if channum not in self.channel:
continue
ds = self.channel[channum]
if ds.filter is None:
continue
plt.plot(ds.filter.__dict__[filtname], label=f"Chan {ds.channum}",
color=cmap(float(ds_num) / len(channels)))
plt.xlabel("Sample number")
if legend:
plt.legend(loc='best')
if len(channels) > 12:
ltext = axis.get_legend().get_texts()
plt.setp(ltext, fontsize='small')
def summarize_filters(self, filter_name='noconst', std_energy=5898.8):
rms_fwhm = np.sqrt(np.log(2) * 8) # FWHM is this much times the RMS
LOG.info('V/dV for time, Fourier filters: ')
for i, ds in enumerate(self):
try:
if ds.filter is not None:
rms = ds.filter.variances[filter_name]**0.5
else:
rms = ds.hdf5_group[f'filters/filt_{filter_name}'].attrs['variance']**0.5
v_dv = (1 / rms) / rms_fwhm
LOG.info("Chan %3d filter %-15s Predicted V/dV %6.1f Predicted res at %.1f eV: %6.1f eV",
ds.channum, filter_name, v_dv, std_energy, std_energy / v_dv)
except Exception as e:
LOG.warning("Filter %d can't be used", i)
LOG.warning(e)
def report(self):
"""Report on the number of data points and similar."""
for ds in self.datasets:
good = ds.cuts.good()
ng = ds.cuts.good().sum()
dt = (ds.p_timestamp[good][-1] * 1.0 - ds.p_timestamp[good][0]) # seconds
npulse = np.arange(len(good))[good][-1] - good.argmax() + 1
rate = (npulse - 1.0) / dt
print(f"chan {ds.channum:3d} {npulse:6d} pulses ({rate:6.3f} Hz "
f"over {dt / 3600.:6.4f} hr) {100. * ng / npulse:6.3f}% good")
def plot_noise_autocorrelation(self, axis=None, channels=None, cmap=None,
legend=True):
"""Plot the noise autocorrelation functions.
Args:
channels: Sequence of channel numbers to display. If None, then show all.
"""
if axis is None:
plt.clf()
axis = plt.subplot(111)
if channels is None:
channels = list(self.channel.keys())
channels.sort()
if cmap is None:
cmap = plt.cm.nipy_spectral
axis.grid(True)
for ds_num, channum in enumerate(channels):
if channum not in self.channel:
continue
ds = self.channel[channum]
noise = ds.noise_records
noise.plot_autocorrelation(axis=axis, label=f'Chan {channum}',
color=cmap(float(ds_num) / len(channels)))
plt.xlabel("Time lag (ms)")
if legend:
plt.legend(loc='best')
if len(channels) > 12:
ltext = axis.get_legend().get_texts()
plt.setp(ltext, fontsize='small')
def save_pulse_energies_ascii(self, filename='all'):
filename += '.energies'
energy = []
for ds in self:
energy = np.hstack((energy, ds.p_energy[ds.cuts.good()]))
np.savetxt(filename, energy, fmt='%.10e')
def copy(self):
g = TESGroup(self.filenames, self.noise_filenames)
g.__dict__.update(self.__dict__)
g.datasets = tuple([d.copy() for d in self.datasets])
return g
def set_segment_size(self, seg_size):
self.n_segments = 0
for ds in self:
ds.pulse_records.set_segment_size(seg_size)
            self.n_segments = max(self.n_segments, ds.pulse_records.n_segments)
self.pulses_per_seg = self.first_good_dataset.pulse_records.pulses_per_seg
for ds in self:
assert ds.pulse_records.pulses_per_seg == self.pulses_per_seg
def plot_noise(self, axis=None, channels=None, cmap=None, scale_factor=1.0,
sqrt_psd=False, legend=True, include_badchan=False):
"""Plot the noise power spectra.
Args:
channels: Sequence of channels to display. If None, then show all.
scale_factor: Multiply counts by this number to get physical units.
sqrt_psd: Whether to show the sqrt(PSD) or (by default) the PSD itself.
cmap: A matplotlib color map. Defaults to something.
legend (bool): Whether to plot the legend (default True)
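        Example (a sketch):
            data.plot_noise(sqrt_psd=True)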
"""
if axis is None:
plt.clf()
axis = plt.subplot(111)
if channels is None:
dsets = list(self.iter_channels(include_badchan=include_badchan))
else:
dsets = [self.channel[c] for c in channels]
nplot = len(dsets)
if cmap is None:
cmap = plt.cm.nipy_spectral
if scale_factor == 1.0:
units = "Counts"
else:
units = "Scaled counts"
axis.grid(True)
for i, ds in enumerate(dsets):
channum = ds.channum
yvalue = ds.noise_psd[:] * scale_factor**2
if sqrt_psd:
yvalue = np.sqrt(yvalue)
axis.set_ylabel(f"PSD$^{1 / 2}$ ({units}/Hz$^{1 / 2}$)")
try:
df = ds.noise_psd.attrs['delta_f']
freq = np.arange(1, 1 + len(yvalue)) * df
axis.plot(freq, yvalue, label=f'Chan {channum}',
color=cmap(float(i) / nplot))
except Exception:
LOG.warning("WARNING: Could not plot channel %4d.", channum)
continue
axis.set_xlim([freq[1] * 0.9, freq[-1] * 1.1])
axis.set_ylabel(f"Power Spectral Density ({units}^2/Hz)")
axis.set_xlabel("Frequency (Hz)")
axis.loglog()
if legend:
plt.legend(loc='best')
if nplot > 12:
ltext = axis.get_legend().get_texts()
plt.setp(ltext, fontsize='small')
def correct_flux_jumps(self, flux_quant):
        '''Remove "flux jumps" from the pretrigger mean.
        When using umux readout, if a pulse is recorded that has a very fast
        rising edge (e.g. a cosmic ray), the readout system will "slip" an
        integer number of flux quanta. This means that the baseline level
        returned to after the pulse will differ from the pretrigger value by
        an integer number of flux quanta. This causes the pretrigger mean
        summary quantity to jump around in a way that causes trouble for the
        rest of MASS. This function attempts to correct these jumps.
Arguments:
flux_quant -- size of 1 flux quantum
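        Example (a sketch; the quantum size is hypothetical):
            data.correct_flux_jumps(flux_quant=2**12)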
'''
for ds in self:
try:
ds.correct_flux_jumps(flux_quant)
except Exception:
self.set_chan_bad(ds.channum, "failed to correct flux jumps")
def sanitize_p_filt_phase(self):
self.register_boolean_cut_fields("filt_phase")
for ds in self:
ds.cuts.cut("filt_phase", np.abs(ds.p_filt_phase[:]) > 2)
def plot_count_rate(self, bin_s=60):
bin_edge = np.arange(self.first_good_dataset.p_timestamp[0],
np.amax(self.first_good_dataset.p_timestamp), bin_s)
bin_centers = bin_edge[:-1] + 0.5 * (bin_edge[1] - bin_edge[0])
rates_all = np.array([ds.count_rate(False, bin_edge)[1] for ds in self])
rates_good = np.array([ds.count_rate(True, bin_edge)[1] for ds in self])
plt.figure()
plt.subplot(311)
plt.plot(bin_centers, rates_all.T)
plt.ylabel("all by chan")
plt.subplot(312)
plt.plot(bin_centers, rates_good.T)
plt.ylabel("good by chan")
plt.subplot(313)
LOG.info(rates_all.sum(axis=-1).shape)
plt.plot(bin_centers, rates_all.sum(axis=0))
plt.ylabel("all array")
plt.grid("on")
plt.figure()
plt.plot([ds.channum for ds in self], rates_all.mean(axis=1), 'o', label="all")
plt.plot([ds.channum for ds in self], rates_good.mean(axis=1), 'o', label="good")
plt.xlabel("channel number")
plt.ylabel("average trigger/s")
plt.grid("on")
plt.legend()
def plot_summary_pages(self, x_attr, y_attr, x_range=None, y_range=None, subplot_shape=(3, 4), # noqa: PLR0917
suffix=None, lines=None, down=10, fileformat='png', one_file=False):
'''Make scatter plots of summary quantities for all channels.
This creates the plots for each good channel, placing multiple plots on
each page, and saves each page to its own file. Pulses that pass cuts are
plotted in blue, and cut pulses are plotted in gray. The file names have
the form "<x_attr>.vs.<y-attr>-<suffix>-<page number>.png". The default
value for the suffix is that pulsefile's base name.
Arguments:
x_attr -- string containing name of X value attribute
y_attr -- string containing name of Y value attribute
x_range -- if not None, values to use for x limits. Defaults to None.
y_range -- if not None, values to use for y limits. Defaults to None.
subplot_shape -- tuple indicating shape of subplots. First element is
number of rows, second is number of columns.
suffix -- suffix to use for filenames. Defaults to None, which causes the
function to use the first 15 characters of the pulse filename
for the first data set (which typically will have a value
like '20171017_103454')
            lines -- if not None, must be a dict keyed by channel number.
                The value for each channel is a list of numbers. A dashed
                horizontal line is plotted for each value in this list.
                Defaults to None.
down -- downsample by this factor. Defaults to 10
fileformat -- output format ('png', 'pdf', etc). Must be a value supported by
your installation of matplotlib.
one_file -- If True, combine all pages to one pdf file. If False, use
separate files for all pages. Defaults to False. If format is
something other than 'pdf', this uses the ImageMagick program
`convert` to combine the files. You can install it on ubuntu
via `apt-get install imagemagick`.
'''
if suffix is None:
suffix = os.path.basename(self.channels[0].datafile.filename)[:15]
        filename_template_per_file = f'{y_attr}.vs.{x_attr}-{suffix}-%03d.{fileformat}'
        filename_template_glob = f'{y_attr}.vs.{x_attr}-{suffix}-[0-9][0-9][0-9].{fileformat}'
        filename_one_file = f'{y_attr}.vs.{x_attr}-{suffix}.pdf'
def helper(ds, ax):
ch = ds.channum
g = ds.good()
b = np.logical_not(g)
x_g = getattr(ds, x_attr)[g][::down]
x_b = getattr(ds, x_attr)[b][::down]
y_g = getattr(ds, y_attr)[g][::down]
y_b = getattr(ds, y_attr)[b][::down]
if x_attr == 'p_timestamp':
x_g = (x_g - getattr(ds, x_attr)[0]) / (60 * 60)
x_b = (x_b - getattr(ds, x_attr)[0]) / (60 * 60)
plt.plot(x_b, y_b, '.', markersize=2.5, color='gray')
plt.plot(x_g, y_g, '.', markersize=2.5, color='blue')
if lines is not None:
x_lo = min(np.amin(x_g), np.amin(x_b))
x_hi = max(np.amax(x_g), np.amax(x_b))
for line in lines[ch]:
plt.plot([x_lo, x_hi], [line, line], '--k')
if x_range is not None:
plt.xlim(x_range)
if y_range is not None:
plt.ylim(y_range)
if x_attr == 'p_timestamp':
plt.xlabel('Time (hours)')
else:
plt.xlabel(x_attr, fontsize=8)
plt.ylabel(y_attr, fontsize=8)
ax.tick_params(axis='both', labelsize=8)
plt.title('MATTER Ch%d' % ch, fontsize=10)
        plot_multipage(self, subplot_shape, helper, filename_template_per_file,
                       filename_template_glob, filename_one_file, fileformat, one_file)
def plot_histogram_pages(self, attr, valrange, bins, y_range=None, subplot_shape=(3, 4), # noqa: PLR0917
suffix=None, lines=None, fileformat='png', one_file=False):
'''Make plots of histograms for all channels.
This creates the plots for each good channel, placing multiple plots on
each page, and saves each page to its own file. Only pulses that pass cuts
are included. The file names have the form "<attr>-hist-<suffix>-<page
number>.png". The default value for the suffix is that pulsefile's base
name.
Arguments:
attr -- string containing name of attribute to plot
valrange -- range of value over which to histogram (passed into histogram function)
bins -- number of bins (passed into histogram function)
y_range -- if not None, values to use for y limits. Defaults to None.
subplot_shape -- tuple indicating shape of subplots. First element is
number of rows, second is number of columns.
suffix -- suffix to use for filenames. Defaults to None, which causes the
function to use the first 15 characters of the pulse filename
for the first data set (which typically will have a value
like '20171017_103454')
            lines -- if not None, must be a dict keyed by channel number.
                The value for each channel is a list of numbers. A
                horizontal line is plotted for each value in this list.
                Defaults to None.
fileformat -- output format ('png', 'pdf', etc). Must be a value supported by
your installation of matplotlib.
one_file -- If True, combine all pages to one pdf file. If False, use
separate files for all pages. Defaults to False. If format is
something other than 'pdf', this uses the ImageMagick program
`convert` to combine the files. You can install it on ubuntu
via `apt-get install imagemagick`.
'''
if suffix is None:
suffix = os.path.basename(self.channels[0].datafile.filename)[:15]
        filename_template_per_file = f'{attr}-hist-{suffix}-%03d.{fileformat}'
filename_template_glob = f'{attr}-hist-{suffix}-[0-9][0-9][0-9].{fileformat}'
filename_one_file = f'{attr}-hist-{suffix}.pdf'
def helper(ds, ax):
g = ds.good()
x_g = getattr(ds, attr)[g]
# I generally prefer the "stepped" histtype, but that seems to interact
# poorly with log scale - the automatic choice of axis limits gets
# screwed up.
            plt.hist(x_g, range=valrange, bins=bins, histtype='bar')
plt.yscale('log')
if lines is not None:
x_lo = np.amin(x_g)
x_hi = np.amax(x_g)
for line in lines[ds.channum]:
plt.plot([x_lo, x_hi], [line, line], '-k')
if y_range is not None:
plt.ylim(y_range)
plt.xlabel(attr, fontsize=8)
plt.ylabel('Counts / bin', fontsize=8)
ax.tick_params(axis='both', labelsize=8)
plt.title(f'MATTER Ch{ds.channum}', fontsize=10)
        plot_multipage(self, subplot_shape, helper, filename_template_per_file,
                       filename_template_glob, filename_one_file, fileformat, one_file)
    def hists(self, bin_edges, attr="p_energy", t0=0, tlast=1e20, category=None, g_func=None):
        """Return a tuple of (bin_centers, countsdict), where countsdict is a dictionary
        mapping channel numbers to numpy arrays of counts. Automatically filters out nan values.
        bin_edges -- edges of bins used for histogram
        attr -- which attribute to histogram, "p_energy" or "p_filt_value"
        t0 and tlast -- cuts all pulses outside this time range before histogramming
        g_func -- a function taking a MicrocalDataSet and returning a vector like ds.good() would return.
            This vector is ANDed with the vector calculated by the histogrammer.
        """
        if category is None:
            category = {}
        bin_edges = np.array(bin_edges)
        bin_centers = 0.5 * (bin_edges[1:] + bin_edges[:-1])
countsdict = {ds.channum: ds.hist(bin_edges, attr, t0, tlast, category, g_func)[1] for ds in self}
return bin_centers, countsdict
    def hist(self, bin_edges, attr="p_energy", t0=0, tlast=1e20, category=None, g_func=None):
        """Return a tuple of (bin_centers, counts) of p_energy of good pulses in all good
        datasets (use .hists to get the histograms individually). Filters out nan values.
        bin_edges -- edges of bins used for histogram
        attr -- which attribute to histogram, "p_energy" or "p_filt_value"
        t0 and tlast -- cuts all pulses outside this time range before histogramming
        g_func -- a function taking a MicrocalDataSet and returning a vector like ds.good() would return.
            This vector is ANDed with the vector calculated by the histogrammer.
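        Example (a sketch; the bin range is hypothetical):
            bin_centers, counts = data.hist(np.arange(0, 10000, 1.0))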
"""
bin_centers, countsdict = self.hists(bin_edges, attr, t0, tlast, category, g_func)
counts = np.zeros_like(bin_centers, dtype="int")
for (k, v) in countsdict.items():
counts += v
return bin_centers, counts
def linefit(self, line_name="MnKAlpha", t0=0, tlast=1e20, axis=None, dlo=50, dhi=50, # noqa: PLR0917
binsize=1, bin_edges=None, attr="p_energy", label="full", plot=True,
                guess_params=None, ph_units="eV", category=None, g_func=None, has_tails=False):
        """Do a fit to `line_name` and return the fit result. You can get the fitted
        parameter values from result.params or any other way you like.
        line_name -- a string like "MnKAlpha", passed to mass.getmodel to look up the line model
        t0 and tlast -- cuts all pulses outside this time range before fitting
        axis -- if axis is None and plot==True, will create a new figure, otherwise plot onto this axis
        dlo and dhi and binsize -- by default it tries to fit with bin edges given by
            np.arange(model.spect.nominal_peak_energy-dlo, model.spect.nominal_peak_energy+dhi, binsize)
        bin_edges -- pass the bin_edges you want as a numpy array
        attr -- which attribute to histogram; must contain "energy" (default "p_energy")
        label -- currently unused
        plot -- whether to plot the fit result
        guess_params -- currently unused; the model guesses its own starting parameters
        ph_units -- used in plot labels
        category -- pass {"side": "A"} or similar to use categorical cuts
        g_func -- a function taking a MicrocalDataSet and returning a vector like ds.good() would return.
            This vector is ANDed with the vector calculated by the histogrammer.
        This should be the same as ds.linefit, but for now the code is copied and pasted.
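        Example (a sketch):
            result = data.linefit("MnKAlpha", dlo=50, dhi=50)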
"""
assert "energy" in attr
model = mass.getmodel(line_name, has_tails=has_tails)
nominal_peak_energy = model.spect.nominal_peak_energy
if bin_edges is None:
bin_edges = np.arange(nominal_peak_energy - dlo, nominal_peak_energy + dhi, binsize)
bin_centers, counts = self.hist(bin_edges, attr, t0, tlast, category, g_func)
params = model.guess(counts, bin_centers=bin_centers, dph_de=1)
params["dph_de"].set(vary=False)
result = model.fit(counts, params=params, bin_centers=bin_centers)
if plot:
result.plotm(ax=axis, xlabel=f"{attr} ({ph_units})",
ylabel=f"counts per {binsize:0.2f} ({ph_units}) bin",
title=f"{self.shortname}\n{model.spect}")
return result
class CrosstalkVeto:
"""An object to allow vetoing of data in 1 channel when another is hit."""
def __init__(self, datagroup=None, window_ms=(-10, 3), pileup_limit=100):
if datagroup is None:
return
window_ms = np.array(window_ms, dtype=int)
self.window_ms = window_ms
self.n_channels = datagroup.n_channels
self.n_pulses = datagroup.nPulses
ms0 = np.array([ds.p_timestamp[0] for ds in datagroup.datasets]).min() * 1e3 + window_ms[0]
ms9 = np.array([ds.p_timestamp[-1] for ds in datagroup.datasets]).max() * 1e3 + window_ms[1]
self.nhits = np.zeros(int(ms9 - ms0 + 1), dtype=np.int8)
self.time0 = ms0
for ds in datagroup.datasets:
g = ds.cuts.good()
vetotimes = np.asarray(ds.p_timestamp[g] * 1e3 - ms0, dtype=np.int64)
vetotimes[vetotimes < 0] = 0
a, b = window_ms
b += 1
for t in vetotimes:
self.nhits[t + a:t + b] += 1
pileuptimes = vetotimes[ds.p_postpeak_deriv[g] > pileup_limit]
LOG.info("%s %d %f %d", vetotimes, len(vetotimes), 1.0e3
* ds.nPulses / (ms9 - ms0), len(pileuptimes))
for t in pileuptimes:
self.nhits[t + b:t + b + 8] += 1
def copy(self):
"""Deep copy."""
v = CrosstalkVeto()
v.__dict__ = self.__dict__.copy()
return v
def veto(self, times_sec):
"""Return boolean vector for whether a given moment is vetoed. Times are given in
seconds. Resolution is 1 ms for the veto."""
index = np.asarray(times_sec * 1e3 - self.time0 + 0.5, dtype=int)
return self.nhits[index] > 1