Source code for mass.core.fake_data

"""
fake_data - Objects to make fake data for use, e.g., in demonstration scripts.

Joe Fowler, NIST

November 7, 2011
"""

import numpy as np

from mass.core.files import VirtualFile
from mass.core.channel import PulseRecords
from mass.core.channel_group import TESGroup

__all__ = ['FakeDataGenerator']


[docs] class FakeDataGenerator: """An object to create fake data in memory. Can generate a single mass.MicrocalDataSet or a 1+channel mass.TESGroup. Only basic functionality is here so far. Many interesting randomizations could be added in the future, such as gain variation, baseline drift. Pulse pileup should definitely be added. """ def __init__(self, sample_time, n_samples, n_presamples=None, model_peak=None, seed=None): self.rng = np.random.default_rng(seed) # Some defaults that can be overridden before generating fake data self.pretrig_level = 1000 self.rise_speed_us = 200. # in us self.fall_speed_us = 1200. # in us self.white_noise = 30.0 self.model = None self.sample_time_us = sample_time # in us self.n_samples = n_samples if n_presamples is None: self.n_presamples = self.n_samples / 4 else: self.n_presamples = n_presamples self.compute_model(model_peak=model_peak)
[docs] def compute_model(self, model_peak=None): """Compute the noise-free model pulse shape, given the 2 time constants.""" dt_us = (np.arange(self.n_samples) - self.n_presamples - 0.5) * self.sample_time_us self.model = np.exp(-dt_us / self.fall_speed_us) - np.exp(-dt_us / self.rise_speed_us) self.model[dt_us <= 0] = 0 if model_peak is not None: self.model = model_peak * self.model / self.model.max()
def _generate_virtual_file(self, n_pulses, distributions=None, distribution_weights=None, rate=1.0, channum=1): """Return a VirtualFile object with random pulses. Args: n_pulses number of pulses to put in the "file" distributions random distribution of scale factors. If none, all pulses are of unit height distribution_weights relative contribution from each distribution in <distributions>. if None, then all will be weighted equally rate expected number of pulses per second. """ data = np.zeros((n_pulses, self.n_samples), dtype=np.uint16) pulse_times = self.rng.exponential(1.0 / rate, size=n_pulses).cumsum() if distributions is None: scale = np.ones(n_pulses, dtype=float) else: weights = np.asarray(distribution_weights, dtype=float) weights = n_pulses * weights / weights.sum() weights = np.asarray(weights, dtype=int) weights[weights.argmax()] += n_pulses - weights.sum() scale = [] for n, distrib in zip(weights, distributions): scale.append(distrib.rvs(size=n)) scale = np.hstack(scale) self.rng.shuffle(scale) for i in range(n_pulses): data[i, :] = self.model * scale[i] + self.pretrig_level + \ 0.5 + self.rng.standard_normal(self.n_samples) * self.white_noise vfile = VirtualFile(data, times=pulse_times) vfile.filename = "virtual_file_chan%d.vtf" % channum vfile.timebase = self.sample_time_us / 1e6 vfile.nPresamples = self.n_presamples return vfile def _generate_virtual_noise_file(self, n_pulses, lowpass_kludge=0): """Return a VirtualFile object with random noise. Args: n_pulses: number of pulses to put in the "file" """ print('Making fake noise') data = np.zeros((n_pulses, self.n_samples), dtype=np.uint16) pulse_times = np.arange(n_pulses, dtype=float) * self.sample_time_us / 1e6 raw_noise = self.rng.standard_normal((n_pulses, self.n_samples)) * self.white_noise for i in range(lowpass_kludge): raw_noise = 0.5 * (raw_noise + np.roll(raw_noise, 2**i)) data[:, :] = 0.5 + raw_noise vfile = VirtualFile(data, times=pulse_times) vfile.timebase = self.sample_time_us / 1e6 vfile.nPresamples = self.n_presamples return vfile
[docs] def generate_microcal_dataset(self, n_pulses, distributions=None): """Return a single mass.MicrocalDataset""" vfile = self._generate_virtual_file(n_pulses, distributions=distributions) return PulseRecords(vfile), None
[docs] def generate_tesgroup(self, n_pulses, n_noise=1024, distributions=None, distribution_weights=None, nchan=1): """Return a mass.TESGroup with multiple channels in it. Args: n_pulses (int): the number of pulses per channel n_noise (int): the number of noise records per channel distributions: ?? distribution_weights: ?? nchan (int): how many channels to generate (default 1). """ vfiles = [self._generate_virtual_file(n_pulses, distributions=distributions, distribution_weights=distribution_weights) for _i in range(nchan)] nfiles = [self._generate_virtual_noise_file(n_noise) for _i in range(nchan)] data = TESGroup(vfiles, nfiles) # Have to fake the channel numbers, b/c they aren't encoded in filename for i, ds in enumerate(data.datasets): ds.channum = i * 2 + 1 data.channel = {} for ds in data.datasets: data.channel[ds.channum] = ds return data