Source code for mass.calibration.algorithms

"""
This file is intended to include algorithms that could be generally useful
for calibration. Mostly they are pulled out of the former
mass.calibration.young module.
"""

import itertools
import operator
import numpy as np

from collections.abc import Iterable

import matplotlib.pyplot as plt
import matplotlib.transforms as mtrans
from matplotlib.ticker import MaxNLocator

from ..common import isstr
from mass.calibration.energy_calibration import STANDARD_FEATURES
import mass.calibration
from .energy_calibration import EnergyCalibration


[docs] def line_names_and_energies(line_names): """Given a list of line_names, return (names, energies) in eV. Can also accept energies in eV directly and return (names, energies). """ if len(line_names) <= 0: return [], [] energies = [STANDARD_FEATURES.get(name_or_energy, name_or_energy) for name_or_energy in line_names] # names = [str(name_or_energy) for name_or_energy in line_names] return zip(*sorted(zip(line_names, energies), key=operator.itemgetter(1)))
[docs] def find_local_maxima(pulse_heights, gaussian_fwhm): """Smears each pulse by a gaussian of gaussian_fhwm and finds local maxima, returns a list of their locations in pulse_height units (sorted by number of pulses in peak) AND their peak values as: (peak_locations, peak_intensities) Args: pulse_heights (np.array(dtype=float)): a list of pulse heights (eg p_filt_value) gaussian_fwhm = fwhm of a gaussian that each pulse is smeared with, in same units as pulse heights """ # kernel density estimation (with a gaussian kernel) n = 128 * 1024 gaussian_fwhm = float(gaussian_fwhm) # The above ensures that lo & hi are floats, so that (lo-hi)/n is always a float in python2 sigma = gaussian_fwhm / (np.sqrt(np.log(2) * 2) * 2) tbw = 1.0 / sigma / (np.pi * 2) lo = np.min(pulse_heights) - 3 * gaussian_fwhm hi = np.max(pulse_heights) + 3 * gaussian_fwhm hist, bins = np.histogram(pulse_heights, np.linspace(lo, hi, n + 1)) tx = np.fft.rfftfreq(n, (lo - hi) / n) ty = np.exp(-tx**2 / 2 / tbw**2) x = (bins[1:] + bins[:-1]) / 2 y = np.fft.irfft(np.fft.rfft(hist) * ty) flag = (y[1:-1] > y[:-2]) & (y[1:-1] > y[2:]) lm = np.arange(1, n - 1)[flag] lm = lm[np.argsort(-y[lm])] return np.array(x[lm]), np.array(y[lm])
[docs] def find_opt_assignment(peak_positions, line_names, nextra=2, nincrement=3, nextramax=8, maxacc=0.015): """Tries to find an assignment of peaks to line names that is reasonably self consistent and smooth Args: peak_positions (np.array(dtype=float)): a list of peak locations in arb units, e.g. p_filt_value units line_names (list[str or float)]): a list of calibration lines either as number (which is energies in eV), or name to be looked up in STANDARD_FEATURES nextra (int): the algorithm starts with the first len(line_names) + nextra peak_positions nincrement (int): each the algorithm fails to find a satisfactory peak assignment, it uses nincrement more lines nextramax (int): the algorithm stops incrementint nextra past this value, instead failing with a ValueError saying "no peak assignment succeeded" maxacc (float): an empirical number that determines if an assignment is good enough. The default number works reasonably well for tupac data """ name_e, e_e = line_names_and_energies(line_names) n_sel_pp = len(line_names) + nextra # number of peak_positions to use to line up to line_names nmax = len(line_names) + nextramax while True: sel_positions = np.asarray(peak_positions[:n_sel_pp], dtype="float") energies = np.asarray(e_e, dtype="float") assign = np.array(list(itertools.combinations(sel_positions, len(line_names)))) assign.sort(axis=1) fracs = np.divide(energies[1:-1] - energies[:-2], energies[2:] - energies[:-2]) est_pos = assign[:, :-2] * (1 - fracs) + assign[:, 2:] * fracs acc_est = np.linalg.norm(np.divide(est_pos - assign[:, 1:-1], assign[:, 2:] - assign[:, :-2]), axis=1) opt_assign_i = np.argmin(acc_est) acc = acc_est[opt_assign_i] opt_assign = assign[opt_assign_i] if acc > maxacc * np.sqrt(len(energies)): n_sel_pp += nincrement if n_sel_pp > nmax: msg = f"no peak assignment succeeded: acc {acc:g}, maxacc*sqrt(len(energies)) "\ f"{maxacc * np.sqrt(len(energies)):g}" raise ValueError(msg) else: continue else: return name_e, energies, list(opt_assign)
[docs] def build_fit_ranges_ph(line_names, excluded_line_names, approx_ecal, fit_width_ev): """Call build_fit_ranges() to get (lo,hi) for fitranges in energy units, then convert to ph using approx_ecal""" e_e, fit_lo_hi_energy, slopes_de_dph = build_fit_ranges( line_names, excluded_line_names, approx_ecal, fit_width_ev) fit_lo_hi_ph = [] for lo, hi in fit_lo_hi_energy: lo_ph = approx_ecal.energy2ph(lo) hi_ph = approx_ecal.energy2ph(hi) fit_lo_hi_ph.append((lo_ph, hi_ph)) return e_e, fit_lo_hi_ph, slopes_de_dph
[docs] def build_fit_ranges(line_names, excluded_line_names, approx_ecal, fit_width_ev): """Returns a list of (lo,hi) where lo and hi have units of energy of ranges to fit in for each energy in line_names. Args: line_names (list[str or float]): list or line names or energies excluded_line_names (list[str or float]): list of line_names or energies to avoid when making fit ranges approx_ecal: an EnergyCalibration object containing an approximate calibration fit_width_ev (float): full size in eV of fit ranges """ _names, e_e = line_names_and_energies(line_names) _excl_names, excl_e_e = line_names_and_energies(excluded_line_names) half_width_ev = fit_width_ev / 2.0 all_e = np.sort(np.hstack((e_e, excl_e_e))) assert (len(all_e) == len(np.unique(all_e))) fit_lo_hi_energy = [] slopes_de_dph = [] for e in e_e: slope_de_dph = approx_ecal.energy2dedph(e) if any(all_e < e): nearest_below = all_e[all_e < e][-1] else: nearest_below = -np.inf if any(all_e > e): nearest_above = all_e[all_e > e][0] else: nearest_above = np.inf lo = max(e - half_width_ev, (e + nearest_below) / 2.0) hi = min(e + half_width_ev, (e + nearest_above) / 2.0) fit_lo_hi_energy.append((lo, hi)) slopes_de_dph.append(slope_de_dph) return e_e, fit_lo_hi_energy, slopes_de_dph
[docs] class FailedFit: def __init__(self, hist, bins): self.hist = hist self.bins = bins
[docs] class FailedToGetModelException(Exception): pass
[docs] def get_model(lineNameOrEnergy, has_linear_background=True, has_tails=False): if isinstance(lineNameOrEnergy, mass.GenericLineModel): line = lineNameOrEnergy.spect elif isinstance(lineNameOrEnergy, mass.SpectralLine): line = lineNameOrEnergy elif isinstance(lineNameOrEnergy, str): if lineNameOrEnergy in mass.spectra: line = mass.spectra[lineNameOrEnergy] elif lineNameOrEnergy in mass.STANDARD_FEATURES: energy = mass.STANDARD_FEATURES[lineNameOrEnergy] line = mass.SpectralLine.quick_monochromatic_line(lineNameOrEnergy, energy, 0.001, 0) else: raise FailedToGetModelException( f"failed to get line from lineNameOrEnergy={lineNameOrEnergy}") else: try: energy = float(lineNameOrEnergy) except Exception: raise FailedToGetModelException( f"lineNameOrEnergy = {lineNameOrEnergy} is not convertable to float or " "a str in mass.spectra or mass.STANDARD_FEATURES") line = mass.SpectralLine.quick_monochromatic_line( f"{lineNameOrEnergy}eV", float(lineNameOrEnergy), 0.001, 0) return line.model(has_linear_background=has_linear_background, has_tails=has_tails)
# support both names as they were both used historically getmodel = get_model
[docs] def multifit(ph, line_names, fit_lo_hi, binsize_ev, slopes_de_dph, hide_deprecation=False): """ Args: ph (np.array(dtype=float)): list of pulse heights line_names: names of calibration lines fit_lo_hi (list[list[float]]): a list of (lo,hi) with units of ph, used as edges of histograms for fitting binsize_ev (list[float]): list of binsizes in eV for calibration lines slopes_de_dph (list[float]): - list of slopes de_dph (e in eV) hide_deprecation: whether to suppress deprecation warnings """ name_e, e_e = line_names_and_energies(line_names) results = [] peak_ph = [] eres = [] for i, name in enumerate(name_e): lo, hi = fit_lo_hi[i] dP_dE = 1 / slopes_de_dph[i] binsize_ph = binsize_ev[i] * dP_dE result = singlefit(ph, name, lo, hi, binsize_ph, dP_dE) results.append(result) peak_ph.append(result.best_values["peak_ph"]) eres.append(result.best_values["fwhm"]) return {"results": results, "peak_ph": peak_ph, "eres": eres, "line_names": name_e, "energies": e_e}
[docs] def singlefit(ph, name, lo, hi, binsize_ph, approx_dP_dE): nbins = (hi - lo) / binsize_ph if nbins > 5000: raise Exception("too damn many bins, dont like running out of memory") counts, bin_edges = np.histogram(ph, np.arange(lo, hi, binsize_ph)) e = bin_edges[:-1] + 0.5 * (bin_edges[1] - bin_edges[0]) model = getmodel(name) guess_params = model.guess(counts, bin_centers=e, dph_de=approx_dP_dE) if "Gaussian" not in model.name: guess_params["dph_de"].set(approx_dP_dE, vary=False) result = model.fit(counts, guess_params, bin_centers=e, minimum_bins_per_fwhm=1.5) result.energies = e return result
[docs] class EnergyCalibrationAutocal: def __init__(self, calibration, ph=None, line_names=None): """ Args: line_names (list[str]): names of calibration lines. Names doesn't need to be ordered in their energies. """ if not isinstance(calibration, EnergyCalibration): raise ValueError("EnergyCalibrationAutocal requires an EnergyCalibration calibration.") self.calibration = calibration # store a reference to EnergyCalibrationAutocal in self.calibration # because we want to be able to run diagnose, and examine fits later, even though # normally only self.calibration is stored later in mass # this isn't the best API, but I was able to add it without changing any APIs so nothing breaks self.calibration.autocal = self self.calibration.diagnose = self.diagnose self.results = None self.energy_resolutions = None self.line_names = line_names self.energies_opt = None self.ph_opt = None self.fit_range_ev = None self.fit_lo_hi = None self.slopes_de_dph = None self.binsize_ev = None self.ph = ph self._hide_deprecation = False
[docs] def guess_fit_params(self, smoothing_res_ph=20, fit_range_ev=200.0, binsize_ev=1.0, nextra=2, nincrement=3, nextramax=8, maxacc=0.015): """Calculate reasonable parameters for complex models or Gaussian models. Args: binsize_ev (float or list[float]): bin sizes of the histograms of given calibration lines. If a single number is given, this same number will be used for all calibration lines. """ lm, _ = find_local_maxima(self.ph, smoothing_res_ph) # Note that find_opt_assignment does not require line_names be sorted by energies. self.line_names, self.energies_opt, self.ph_opt = find_opt_assignment( lm, self.line_names, nextra, nincrement, nextramax, maxacc=maxacc) if isinstance(binsize_ev, Iterable): self.binsize_ev = binsize_ev else: self.binsize_ev = [binsize_ev] * len(self.energies_opt) approx_cal = mass.energy_calibration.EnergyCalibration(1, approximate=False) for ph, e in zip(self.ph_opt, self.energies_opt): approx_cal.add_cal_point(ph, e) self.fit_range_ev = fit_range_ev # Default fit range width is 100 eV for each line. # But you can customize these numbers after self.guess_fit_params is finished. # New self.fit_lo_hi values will be in self.fit_lines in the next step. _, self.fit_lo_hi, self.slopes_de_dph = build_fit_ranges_ph(self.energies_opt, [], approx_cal, self.fit_range_ev)
[docs] def fit_lines(self): """All calibration emission lines are fitted with appropriate model. self.line_names will be sored by energy after this method is finished. """ mresult = multifit(self.ph, self.line_names, self.fit_lo_hi, self.binsize_ev, self.slopes_de_dph, hide_deprecation=self._hide_deprecation) for ph, e, n in zip(mresult["peak_ph"], mresult["energies"], mresult['line_names']): self.calibration.add_cal_point(ph, e, name=str(n)) self.results = mresult["results"] self.energy_resolutions = mresult["eres"] self.line_names = mresult["line_names"] return self.calibration
[docs] def autocal(self, smoothing_res_ph=20, fit_range_ev=200.0, binsize_ev=1.0, nextra=2, nincrement=3, nextramax=8, maxacc=0.015): self.guess_fit_params(smoothing_res_ph, fit_range_ev, binsize_ev, nextra, nincrement, nextramax, maxacc) self.fit_lines() return self.calibration
@property def anyfailed(self): return any([isinstance(cf, FailedFit) for cf in self.results])
[docs] def diagnose(self): fig = plt.figure(figsize=(16, 9)) n = int(np.ceil(np.sqrt(len(self.line_names)))) w, h, lm, bm, hs, vs = 0.6, 0.9, 0.05, 0.08, 0.1, 0.1 for i, (el, result, eres) in enumerate(zip(self.line_names, self.results, self.energy_resolutions)): ax = fig.add_axes([w * (i % n) / n + lm, h * (i // n) / n + bm, (w - hs) / n, (h - vs) / n]) ax.xaxis.set_major_locator(MaxNLocator(4, integer=True)) binsize = result.energies[1] - result.energies[0] bin_edges = np.linspace(result.energies[0] - binsize / 2.0, result.energies[-1] + binsize / 2.0, len(result.energies) + 1) ax.fill(np.repeat(bin_edges, 2), np.hstack([[0], np.repeat(result.data, 2), [0]]), lw=1, fc="#4c4ce6", ec="#1a1aff", alpha=0.8) x = np.linspace(result.energies[0], result.energies[-1], 201) if "Gaussian" in result.model.name: ax.text(0.05, 0.97, str(el) + ' (eV)\n' + f"Resolution: {eres:.1f} (eV)", transform=ax.transAxes, ha='left', va='top') else: ax.text(0.05, 0.97, el.replace('Alpha', r'$_{\alpha}$').replace('Beta', r'$_{\beta}$') + '\n' + f"Resolution: {eres:.1f} (eV)", transform=ax.transAxes, ha='left', va='top') y = result.model.eval(result.params, bin_centers=x) ax.plot(x, y, '-', color=(0.9, 0.1, 0.1), lw=2) ax.set_xlim(np.min(x), np.max(x)) ax.set_ylim(0, np.max(result.data) * 1.3) ax = fig.add_axes([lm + w, bm, (1.0 - lm - w) - 0.06, h - 0.05]) for el, pht, energy in zip(self.line_names, self.calibration.cal_point_phs, self.calibration.cal_point_energies): peak_name = 'Unknown' if isstr(el): peak_name = el.replace('Alpha', r'$_{\alpha}$').replace('Beta', r'$_{\beta}$') elif isinstance(el, (int, float)): peak_name = f"{energy:.1f} (eV)" ax.text(pht, energy, peak_name, ha='left', va='top', transform=ax.transData + mtrans.ScaledTranslation(5.0 / 72, -12.0 / 72, fig.dpi_scale_trans)) ax.scatter(self.calibration.cal_point_phs, self.calibration.cal_point_energies, s=36, c="#3333cc") lb = np.amin(self.calibration.cal_point_phs) ub = np.amax(self.calibration.cal_point_phs) width = ub - lb x = np.linspace(lb - width / 10, ub + width / 10, 101) y = self.calibration(x) ax.plot(x, y, '--', color='orange', lw=2, zorder=-2) ax.yaxis.set_tick_params(labelleft=False, labelright=True) ax.yaxis.set_label_position('right') ax.set_xlabel('Pulse height') ax.set_ylabel('Energy (eV)') ax.set_xlim(lb - width / 10, ub + width / 10)