Source code for mass.core.cut

"""
Created on Apr 22, 2016

"""

import functools
import operator
import numpy as np

from ..common import isstr


def _get_max_width(a, min_width=0):
    max_width = max(map(len, a))
    max_width = max(max_width, min_width)

    return max_width


[docs] class CutDesc(np.ndarray): def __repr__(self): w = _get_max_width(self["name"], min_width=4) + 2 header = "{0:^{width}s}|{1:^34s}".format("name", "mask", width=w) spacer = "-" * w + "+" + "-" * 34 rows = ["{0:^{width}s}| {1:032b} ".format(name.decode(), mask, width=w) for name, mask in self if mask != 0] return "\n".join([header, spacer] + rows)
[docs] class CategoryList(np.ndarray): def __repr__(self): fw = _get_max_width(self["field"], min_width=5) + 2 cw = _get_max_width(self["category"], min_width=8) + 2 w = _get_max_width(map(str, self["code"]), min_width=4) + 2 header = "{0:^{fw}s}|{1:^{cw}s}|{2:^{w}s}".format( "field", "category", "code", fw=fw, cw=cw, w=w) spacer = "-" * fw + "+" + "-" * cw + "+" + "-" * w rows = ["{0:^{fw}s}|{1:^{cw}s}|{2:^{w}d} ".format(field.decode(), category.decode(), code, fw=fw, cw=cw, w=w) for field, category, code in self] return "\n".join([header, spacer] + rows)
[docs] class CutFieldMixin: """A mixin object that gives a class access to lots of features involving boolean per-pulse cuts and per-pulse categorization. """ BUILTIN_BOOLEAN_CUT_FIELDS = ['pretrigger_rms', 'pretrigger_mean', 'pretrigger_mean_departure_from_median', 'peak_time_ms', 'rise_time_ms', 'postpeak_deriv', 'pulse_average', 'min_value', 'timestamp_sec', 'timestamp_diff_sec', 'subframecount_diff_sec', 'peak_value', 'energy', 'timing', "p_filt_phase", 'smart_cuts'] # Categorical cut field item format # [name of field, list of categories, default category] BUILTIN_CATEGORICAL_CUT_FIELDS = [ ['calibration', ['in', 'out'], 'in'], ] CUT_BOOLEAN_FIELD_DESC_DTYPE = np.dtype([("name", np.bytes_, 64), ("mask", np.uint32)]) CUT_CATEGORICAL_FIELD_DESC_DTYPE = np.dtype([("name", np.bytes_, 64), ("mask", np.uint32)]) CUT_CATEGORY_LIST_DTYPE = np.dtype([("field", np.bytes_, 64), ("category", np.bytes_, 64), ("code", np.uint32)])
[docs] def cut_field_desc_init(self): """Initialize the cut field descriptions. This methods expects self to have the hdf5_file attribute. """ if self.hdf5_file: if 'cut_num_used_bits' in self.hdf5_file.attrs: self.hdf5_file.attrs['cut_format_ver'] = b'1' # convert from version 1 to the verion 2 cut_num_used_bits = np.uint32(self.hdf5_file.attrs["cut_num_used_bits"]) self.hdf5_file.attrs['cut_used_bit_flags'] = \ np.uint32((np.uint64(1) << cut_num_used_bits) - 1) self.boolean_cut_desc = np.asarray(self.boolean_cut_desc, dtype=self.CUT_BOOLEAN_FIELD_DESC_DTYPE) self.categorical_cut_desc = np.asarray(self.categorical_cut_desc[['name', 'mask']], dtype=self.CUT_CATEGORICAL_FIELD_DESC_DTYPE) self.cut_category_list = np.asarray(list(self.cut_category_list), dtype=self.CUT_CATEGORY_LIST_DTYPE) del self.hdf5_file.attrs['cut_num_used_bits'] # here, we can assume that cut descriptions are empty or version 2 if "cut_used_bit_flags" not in self.hdf5_file.attrs: self.hdf5_file.attrs['cut_used_bit_flags'] = np.uint32(0) if "cut_boolean_field_desc" not in self.hdf5_file.attrs: self.boolean_cut_desc = np.zeros(32, dtype=self.CUT_BOOLEAN_FIELD_DESC_DTYPE) self.register_boolean_cut_fields(*self.BUILTIN_BOOLEAN_CUT_FIELDS) if ("cut_categorical_field_desc" not in self.hdf5_file.attrs) and \ ("cut_category_list" not in self.hdf5_file.attrs): self.categorical_cut_desc = np.zeros(0, dtype=self.CUT_CATEGORICAL_FIELD_DESC_DTYPE) self.cut_category_list = np.zeros(0, dtype=self.CUT_CATEGORY_LIST_DTYPE) for categorical_desc in self.BUILTIN_CATEGORICAL_CUT_FIELDS: self.register_categorical_cut_field(*categorical_desc) if "cut_format_ver" not in self.hdf5_file.attrs: # to allow TESGroupHDF5 with in read only mode self.hdf5_file.attrs['cut_format_ver'] = b'2'
@property def boolean_cut_desc(self): return self.hdf5_file.attrs["cut_boolean_field_desc"].view(CutDesc) @boolean_cut_desc.setter def boolean_cut_desc(self, value): self.hdf5_file.attrs["cut_boolean_field_desc"] = value @property def categorical_cut_desc(self): return self.hdf5_file.attrs["cut_categorical_field_desc"].view(CutDesc) @categorical_cut_desc.setter def categorical_cut_desc(self, value): self.hdf5_file.attrs["cut_categorical_field_desc"] = value @property def cut_category_list(self): return self.hdf5_file.attrs["cut_category_list"].view(CategoryList) @cut_category_list.setter def cut_category_list(self, value): self.hdf5_file.attrs["cut_category_list"] = value @property def cut_used_bit_flags(self): return self.hdf5_file.attrs["cut_used_bit_flags"] @cut_used_bit_flags.setter def cut_used_bit_flags(self, value): self.hdf5_file.attrs["cut_used_bit_flags"] = np.uint32(value)
[docs] def cut_field_categories(self, field_name): category_list = self.cut_category_list return {category.decode(): code for field, category, code in category_list if field == field_name.encode()}
@staticmethod def __lowest_available_cut_bit(cut_used_bit_flags): """Returns the index of lowest available cut bit. Args: cut_used_bit_flags (np.uint32): This number represents a status of used cut bits. It doesn't need to be same with the current status of cut bits. Return: np.uint32 """ uint32_one = np.uint32(1) for i in range(32): trial_bit_pos = np.uint32(i) trial_bit = uint32_one << trial_bit_pos if cut_used_bit_flags & trial_bit == 0: return trial_bit_pos raise ValueError("No available cut bit.")
[docs] def register_boolean_cut_fields(self, *names): """Register one or more boolean cut field(s). If any of given boolean cut fields already exist, it silently ignore. Args: names (sequence of str): name(s) of one or more cut fields(s). """ boolean_fields = self.boolean_cut_desc cut_used_bit_flags = self.cut_used_bit_flags new_fields = [n.encode() for n in names if n.encode() not in boolean_fields["name"]] uint32_one = np.uint32(1) for new_field in new_fields: available_bit_pos = self.__lowest_available_cut_bit(cut_used_bit_flags) boolean_fields[available_bit_pos] = (new_field, uint32_one << available_bit_pos) cut_used_bit_flags |= (uint32_one << available_bit_pos) self.boolean_cut_desc = boolean_fields self.cut_used_bit_flags = cut_used_bit_flags
[docs] def unregister_boolean_cut_fields(self, *names): """Unregister one or more boolean cut fields. Args: names (sequence of str): name(s) of one or more boolean cut fields(s). Raises: KeyError: when any of cut fields don't exist. """ boolean_fields = self.boolean_cut_desc enc_names = [name.encode() for name in names] for name in enc_names: if not name or name not in boolean_fields['name']: raise KeyError(f"{name.decode():s} is not a registered boolean field.") clear_mask = np.uint32(0) for i in range(32): if boolean_fields[i][0] in enc_names: clear_mask |= boolean_fields[i][1] boolean_fields[i] = (b'', 0) self.boolean_cut_desc = boolean_fields self.cut_used_bit_flags &= ~clear_mask
[docs] def register_categorical_cut_field(self, name, categories, default="uncategorized"): """Register one categorical cut field. Args: name (str): the name of a new categorical cut field. categories (list[str]): the list of the names of categories of the cut field. "uncategorized" category will be added if it doesn't have already. default (str): the name of default category. """ categorical_fields = self.categorical_cut_desc cut_used_bit_flags = self.cut_used_bit_flags if name.encode() in categorical_fields["name"]: return # categories might be an immutable tuple. # duplicated categories are dropped. And original order is intact. # And it converts categories into str(s). category_list = [] for category in map(str, categories): if category not in category_list: category_list.append(category) default = str(default) # if the default category is already included, it's temporarily removed from the category_list # and insert into at the head of the category_list. if default in category_list: category_list.remove(default) category_list.insert(0, default) num_bits = 1 while (1 << num_bits) < len(category_list): num_bits += 1 individual_bit_masks = [] bit_mask = np.uint32(0) lowest_bit_pos = np.uint32(31) uint32_one = np.uint32(1) for _ in range(num_bits): bit_pos = self.__lowest_available_cut_bit(cut_used_bit_flags | bit_mask) if bit_pos < lowest_bit_pos: lowest_bit_pos = bit_pos bit_mask |= (uint32_one << bit_pos) individual_bit_masks.insert(0, uint32_one << bit_pos) # Updates the 'cut_category_list' attribute new_list = [] for i, category in enumerate(category_list): digits = map(np.uint32, f"{i:032b}"[-num_bits:]) code = np.sum([a * b for a, b in zip(individual_bit_masks, digits)]) new_list.append((name.encode(), category.encode(), code >> lowest_bit_pos)) new_list = np.array(new_list, dtype=self.CUT_CATEGORY_LIST_DTYPE) self.cut_category_list = np.hstack([self.cut_category_list, new_list]) # Needs to update the 'cut_categorical_field_desc' attribute. field_desc_item = np.array([(name.encode(), bit_mask)], dtype=self.CUT_CATEGORICAL_FIELD_DESC_DTYPE) self.categorical_cut_desc = np.hstack([categorical_fields, field_desc_item]) self.cut_used_bit_flags |= bit_mask
[docs] def unregister_categorical_cut_field(self, name): """Unregister one categorical cut field Args: name (str): the name of a categorical cut field to be unregistered. Raises: KeyError: when any of cut fields don't exist. """ categorical_fields = self.categorical_cut_desc category_list = self.cut_category_list if not np.any(categorical_fields['name'] == name.encode()): raise KeyError(f"{name:s} field is not a registered categorical field.") new_categorical_fields = categorical_fields[categorical_fields['name'] != name.encode()] new_category_list = category_list[category_list['field'] != name.encode()] clear_mask = categorical_fields['mask'][categorical_fields['name'] == name.encode()][0] self.categorical_cut_desc = new_categorical_fields self.cut_category_list = new_category_list self.cut_used_bit_flags &= ~clear_mask
[docs] class Cuts: """Object to hold a 32-bit cut mask for each triggered record.""" def __init__(self, n, tes_group, hdf5_group=None): """Create an object to hold n masks of 32 bits each. Args: n (int): the number of pulses tes_group (mass.core.TESGroup): the TESGroup object that holds all channels. hdf5_group : the hdf5 group for the channel that owns the cut. """ self.tes_group = tes_group self.hdf5_group = hdf5_group if hdf5_group is None: self._mask = np.zeros(n, dtype=np.uint32) else: try: self._mask = hdf5_group.require_dataset('mask', shape=(n,), dtype=np.uint32) except TypeError: temp = hdf5_group.require_dataset('mask', shape=(n,), dtype=np.int32)[...] del hdf5_group['mask'] self._mask = hdf5_group.require_dataset('mask', shape=(n,), dtype=np.uint32) self._mask[...] = np.asarray(temp, dtype=np.uint32)
[docs] def cut(self, cut_num, mask): """Set the mask of a single field. It could be a boolean or categorical field. Args: cut_num (string or int): the name of a cut field. mask (np.array(dtype=bool) or np.array(dtype=np.uint32): a cut mask for the cut field. Raises: ValueError: if cut_num or mask don't make sense. """ assert (mask.size == self._mask.size) boolean_field = self.tes_group.boolean_cut_desc categorical_field = self.tes_group.categorical_cut_desc if isinstance(cut_num, (int, np.uint, int)): cut_num = int(cut_num) if (cut_num < 0) or (cut_num > 31): raise ValueError(str(cut_num) + " is out of range.") if boolean_field[cut_num]['name'] == b'': raise ValueError(str(cut_num) + " is not a registered boolean cut.") _, bit_mask = boolean_field[cut_num] self._mask[mask] |= bit_mask elif isstr(cut_num): boolean_g = (boolean_field["name"] == cut_num.encode()) if np.any(boolean_g): _, bit_mask = boolean_field[boolean_g][0] self._mask[mask] |= bit_mask else: categorical_g = (categorical_field["name"] == cut_num.encode()) if np.any(categorical_g): _, bit_mask = categorical_field[categorical_g][0] for i in range(32): bit_pos = np.uint32(i) if (bit_mask >> bit_pos) & np.uint32(1): break temp = self._mask[...] & ~bit_mask category_values = np.asarray(mask, dtype=np.uint32) << bit_pos self._mask[...] = temp | (category_values & bit_mask) else: raise ValueError(cut_num + " field is not found.") else: raise ValueError("cut_num should be a number or a string but is '%s'" % type(cut_num))
[docs] def cut_categorical(self, field, booldict): """Set the value of one categorical cut. Args: field (str): the name of category booldict (dict{str: np.array(dtype=bool)}): Keys are categories of the field and entries are bool vectors of length equal to mask indicating belongingness. Raises: ValueError: if any pulse is assigned to more than one category. """ category_names = self.tes_group.cut_field_categories(field) labels = np.zeros(len(self._mask), dtype=np.uint32) for (category, catbool) in booldict.items(): labels[catbool] = category_names[category] for (category, catbool) in booldict.items(): if not all(labels[booldict[category]] == category_names[category]): raise ValueError("bools passed for %s conflict with some other" % category) self.cut(field, labels)
[docs] def cut_parameter(self, data, allowed, cut_id): """Apply a cut on some per-pulse parameter. Args: <data> The per-pulse parameter to cut on. It can be an attribute of self, or it can be computed from one or more arrays, but it must be an array of length self.nPulses <allowed> The cut to apply (see below). <cut_id> The bit number (range [0,31]) to identify this cut or (as a string) the name of the cut. <allowed> is a 2-element sequence (a,b), then the cut requires a < data < b. Either a or b may be None, indicating no cut. OR <allowed> is a sequence of 2-element sequences (a,b), then the cut cuts data that does not meet a <= data <=b for any of the two element sequences. """ if allowed is None: # no cut here! return if isinstance(cut_id, int): if cut_id < 0 or cut_id >= 32: raise ValueError("cut_id must be in the range [0,31]") elif isstr(cut_id): boolean_cut_fields = self.tes_group.boolean_cut_desc g = boolean_cut_fields["name"] == cut_id.encode() if not np.any(g): raise ValueError(cut_id + " is not found.") # determine if allowed is a sequence or a sequence of sequences if np.size(allowed[0]) == 2 or allowed[0] == 'invert': doInvert = False cut_vec = np.ones_like(data, dtype='bool') for element in allowed: if np.size(element) == 2: try: a, b = element if a is not None and b is not None: index = np.logical_and(data[:] >= a, data[:] <= b) elif a is not None: index = data[:] >= a elif b is not None: index = data[:] <= b cut_vec[index] = False except Exception: raise ValueError( '%s passed as a cut element, only two element lists or tuples are valid' % str(element)) elif element == 'invert': doInvert = True if doInvert: self.cut(cut_id, ~cut_vec) else: self.cut(cut_id, cut_vec) else: try: a, b = allowed if a and b: self.cut(cut_id, (data[:] <= a) | (data[:] >= b)) else: if a is not None: self.cut(cut_id, data[:] <= a) if b is not None: self.cut(cut_id, data[:] >= b) except ValueError: raise ValueError('%s was passed as a cut element, but only two-element sequences are valid.' % str(allowed))
[docs] def select_category(self, **kwargs): """Select pulses belongs to all of specified categories. Returns: A numpy array of booleans. """ category_field_bit_mask = np.uint32(0) category_field_target_bits = np.uint32(0) categorical_fields = self.tes_group.categorical_cut_desc category_list = self.tes_group.cut_category_list for name, category_label in kwargs.items(): categorical_g = (categorical_fields["name"] == name.encode()) if not np.any(categorical_g): raise ValueError(name + " categorical field is not found.") category_g = (category_list["field"] == name.encode()) &\ (category_list["category"] == category_label.encode()) if not np.any(category_g): raise ValueError(category_label + " category is not found.") _, bit_mask = categorical_fields[categorical_g][0] _, _, code = category_list[category_g][0] for i in range(32): bit_pos = np.uint32(i) if (bit_mask >> bit_pos) & np.uint32(1): break category_field_bit_mask |= bit_mask category_field_target_bits |= code << bit_pos return (self._mask[...] & category_field_bit_mask) == category_field_target_bits
[docs] def category_codes(self, name): """Returns the category codes of a single categorical cut field. Args: name (str): the name of a categorical cut field. Returns: numpy array of uint32 : category codes of a categorical cut field 'name'. Raises: KeyError when a name is not a registered categorical cut field. """ categorical_field = self.tes_group.categorical_cut_desc categorical_field_g = categorical_field["name"] == name.encode() if np.any(categorical_field_g): _, bit_mask = categorical_field[categorical_field_g][0] for i in range(32): bit_pos = np.uint32(i) if (bit_mask >> bit_pos) & np.uint32(1): break else: raise KeyError(name + " is not found.") return (self._mask[...] & bit_mask) >> bit_pos
[docs] def cut_mask(self, *fields): """Retrieves masks of multiple cut fields. They could be boolean or categorical. Args: fields (list(str)): cut field name or names. """ boolean_field = self.tes_group.boolean_cut_desc categorical_field = self.tes_group.categorical_cut_desc if fields: boolean_field_names = [str(name.decode()) for name, _ in boolean_field if name.decode() in fields] categorical_field_names = [str(name.decode()) for name, _ in categorical_field if name.decode() in fields] not_found = set(fields) - (set(boolean_field_names).union(set(categorical_field_names))) if not_found: raise ValueError(",".join(not_found) + " are not found.") else: boolean_field_names = [str(name.decode()) for name, _ in boolean_field if name] categorical_field_names = [str(name.decode()) for name, _, in categorical_field] mask_dtype = np.dtype([(name, bool) for name in boolean_field_names] + [(name, np.uint32) for name in categorical_field_names]) cut_mask = np.zeros(self._mask.shape[0], dtype=mask_dtype) for name in boolean_field_names: cut_mask[:][name] = self.good(name) for name in categorical_field_names: cut_mask[:][name] = self.category_codes(name) return cut_mask
[docs] def clear_cut(self, *args): """Clear one or more boolean fields. Args: *args: one or more args giving the names to clear. If no name is given, clear all boolean fields. """ bit_mask = self._boolean_fields_bit_mask(args) self._mask[:] &= ~bit_mask
def _boolean_fields_bit_mask(self, names): """Calculate the bit mask for any combination of boolean cut fields.""" boolean_fields = self.tes_group.boolean_cut_desc if names: all_field_names = set([name.decode() for name, mask in boolean_fields if name]) not_found_fields = set(names) - all_field_names if not_found_fields: raise ValueError(", ".join(not_found_fields) + " not found.") bit_masks = [mask for name, mask in boolean_fields if name.decode() in names] else: bit_masks = [mask for name, mask in boolean_fields if name] bit_mask = functools.reduce(operator.or_, bit_masks, np.uint32(0)) return bit_mask
[docs] def good(self, *args, **kwargs): """Select pulses which are good for all of specified boolean cut fields. If any categorical cut fields are given, only pulses in the combination of categories are considered. """ bit_mask = self._boolean_fields_bit_mask(args) g = ((self._mask[...] & bit_mask) == 0) if kwargs: return g & self.select_category(**kwargs) return g
[docs] def bad(self, *args, **kwargs): """Select pulses which are bad for at least one of specified boolean cut fields. If any categorical cut fields are given, only pulses in the combination of categories are considered. """ bit_mask = self._boolean_fields_bit_mask(args) g = (self._mask[...] & bit_mask != 0) if kwargs: return g & self.select_category(**kwargs) return g
def __repr__(self): return "Cuts(%d)" % len(self._mask) def __str__(self): return "Cuts(%d) with %d cut and %d uncut" % (len(self._mask), self.bad().sum(), self.good().sum())