"""
Functions LJHModify, ljh_copy_traces, ljh_append_traces, and ljh_truncate, all of
which can modify existing LJH files or write modified copies of them.
"""
import os
import struct
import time
import numpy as np
from packaging.version import Version
from mass.core.files import LJHFile, make_ljh_header
from mass.core.utilities import InlineUpdater
[docs]
def LJHModify(input_filename, output_filename, callback, overwrite=False):
    """Copy an LJH file to a new LJH file, transforming each raw data record.

    The output file `output_filename` gets the same header as `input_filename`,
    and each raw data record is replaced by the result of `callback`.

    The function (or other callable object) `callback` should be of the form
    `modified = callback(record)`, where `record` is a single raw data record
    of shape (Nsamples, ). For example:

        def mycallback(record):
            return 1000 + (record/2)  # or whatever operations you need.

    NOT IMPLEMENTED: this version of LJHModify does *not* allow the caller to
    modify the per-pulse row counter or posix time. Please file an issue if this
    becomes a problem.

    Args:
        input_filename: name of the existing LJH file to read.
        output_filename: name of the LJH file to write.
        callback: callable applied to each record; its return value is stored
            as that record's data in the output file.
        overwrite: if False (default), refuse to replace an existing output file.

    Raises:
        ValueError: if input and output are the same file, or if the output
            file exists and `overwrite` is not true.
    """
    # Check for file problems, then open the input and output LJH files.
    if os.path.exists(output_filename):
        if os.path.samefile(input_filename, output_filename):
            raise ValueError(f"Input '{input_filename}' and output '{output_filename}' "
                             "are the same file, which is not allowed.")
        if overwrite:
            print(f"WARNING: overwriting output file '{output_filename}'")
        else:
            raise ValueError(f"Output file '{output_filename}' exists. "
                             "Call with overwrite=True to proceed anyway.")

    infile = LJHFile.open(input_filename)
    n_pulses = infile.nPulses
    # The per-record layout depends on the file version; decide once, not per pulse.
    has_wide_prefix = Version(infile.version_str.decode()) >= Version("2.2.0")

    with open(output_filename, "wb") as outfile:
        # Copy the header. The file is opened in binary mode, so the header is
        # written as bytes, one "key: value" line per header entry (the same
        # way ljh_truncate writes it).
        for (k, v) in infile.header_dict.items():
            outfile.write(k + b": " + v + b"\r\n")
        outfile.write(b"#End of Header\r\n")

        # For now, we are not modifying the times and row #s
        # If we wanted to, that would require a fancier callback, I guess.
        updater = InlineUpdater("LJHModify")
        for i in range(n_pulses):
            data = callback(infile.alldata[i])
            if has_wide_prefix:
                # One structured record: this pulse's own counter and timestamp
                # (index i, not the whole per-file arrays) plus the new data.
                x = np.zeros((1,), dtype=infile.dtype)
                x["subframecount"] = infile.subframecount[i]
                # Use the raw integer timestamp to avoid a float round trip.
                x["posix_usec"] = infile.datatimes_raw[i]
                x["data"] = data
            else:
                # NOTE(review): for files older than 2.2.0 the three leading
                # uint16 words of each record are written as zeros, so the
                # original per-record prefix is NOT carried over -- confirm
                # whether that is intended.
                x = np.zeros((1, 3 + infile.nSamples), dtype=np.uint16)
                x[:, 3:] = data
            x.tofile(outfile)
            if i % 100 == 0:
                updater.update(float(i + 1) / n_pulses)
        updater.update(1.0)
# A callback that does nothing
[docs]
def dummy_callback(data):
    """Return `data` unchanged: a no-op callback in the `modified=callback(record)` form."""
    return data
# Here's how you supply a simple callback without any free parameters.
# This function will invert every data value. For an unsigned int, it might
# not be clear what "invert" means. I mean that we replace every 0 with 0xffff,
# every 1 with 0xfffe, and so on.
[docs]
def callback_invert(record):
    """Return `record` with every value v replaced by 0xffff - v.

    `record` must have dtype uint16; this is checked with an assertion.
    """
    assert record.dtype == np.uint16
    full_scale = 0xffff  # largest value a uint16 can hold
    inverted = full_scale - record
    return inverted
# Here's how to supply a callback with a free parameter (some kind of "state").
# This creates a "function object", which is callable but also stores internally
# the number that you wanted to add to every raw data value.
[docs]
class callback_shift:
    """Callable object that adds a fixed offset to whatever it is called with."""

    def __init__(self, shiftby):
        """Remember `shiftby`, the amount added on every call, as `self.shift`."""
        self.shift = shiftby

    def __call__(self, segdata):
        """Return `segdata + self.shift`."""
        shifted = segdata + self.shift
        return shifted
[docs]
def helper_write_pulse(dest, src, i):
    """Write pulse number `i` of `src` to the open binary file `dest`.

    The record written is two little-endian unsigned 64-bit integers (the
    subframe count, then the timestamp in microseconds) followed by the
    trace samples in binary form.

    Args:
        dest: file object opened for binary writing.
        src: object providing `read_trace_with_timing(i)`, which returns the
            tuple (subframecount, timestamp_usec, trace).
        i: index of the pulse to write.
    """
    subframecount, timestamp_usec, trace = src.read_trace_with_timing(i)
    # Both prefix fields share one encoding, so emit them in order with one loop.
    for field in (subframecount, timestamp_usec):
        dest.write(struct.pack('<Q', int(field)))
    trace.tofile(dest, sep="")
[docs]
def ljh_copy_traces(src_name, dest_name, pulses, overwrite=False):
    """Copy traces from one ljh file to another.

    The destination file will be of LJH version 2.2.0. Can be used to grab
    specific traces from some other ljh file, and put them into a new file.

    Args:
        src_name: the name of the source file
        dest_name: the name of the destination file
        pulses: indices of the pulses to copy
        overwrite: If the destination file exists and overwrite is not True,
            then the copy fails (default False).

    Raises:
        OSError: if `dest_name` exists and `overwrite` is not true.
        ValueError: if `src_name` and `dest_name` are the same file.
    """
    if os.path.exists(dest_name):
        if not overwrite:
            raise OSError(f"The ljhfile '{dest_name}' exists and overwrite was not set to True")
        # Opening the destination for writing truncates it, so copying a file
        # onto itself would destroy the source before any pulse is read.
        if os.path.samefile(src_name, dest_name):
            raise ValueError(f"Source '{src_name}' and destination '{dest_name}' "
                             "are the same file, which is not allowed.")

    src = LJHFile.open(src_name)

    # Build the new header from the source file's attributes, stamped with the
    # current UTC time and the output format version.
    header_dict = src.__dict__.copy()
    header_dict['asctime'] = time.asctime(time.gmtime())
    header_dict['version_str'] = '2.2.0'
    ljh_header = make_ljh_header(header_dict)

    with open(dest_name, "wb") as dest_fp:
        dest_fp.write(ljh_header)
        for i in pulses:
            helper_write_pulse(dest_fp, src, i)
[docs]
def ljh_append_traces(src_name, dest_name, pulses=None):
    """Append traces from one LJH file onto another.

    The destination file is assumed to be version 2.2.0. Can be used to grab
    specific traces from some other ljh file, and append them onto an existing
    ljh file.

    Args:
        src_name: the name of the source file
        dest_name: the name of the destination file
        pulses: indices of the pulses to copy (default: None, meaning copy all)
    """
    source = LJHFile.open(src_name)
    # No explicit selection means every pulse in the source, in order.
    indices = range(source.nPulses) if pulses is None else pulses
    with open(dest_name, "ab") as out:
        for idx in indices:
            helper_write_pulse(out, source, idx)
[docs]
def ljh_truncate(input_filename, output_filename, n_pulses=None, timestamp=None, segmentsize=None):
    """Truncate an LJH file.

    Writes a new copy of an LJH file, with the identical header, but with a
    smaller number of raw data pulses.

    Arguments:
    input_filename  -- name of file to truncate
    output_filename -- filename for truncated file
    n_pulses        -- truncate to include only this many pulses (default None).
                       If the file holds fewer pulses, all of them are copied.
    timestamp       -- truncate to include only the pulses before the first one
                       whose timestamp is later than this number (default None).
                       Copying stops at that first later pulse.
    segmentsize     -- number of bytes per segment; this is primarily here to
                       facilitate testing (defaults to same value as in LJHFile)

    Exactly one of n_pulses and timestamp must be specified.

    Raises:
    ValueError          -- if neither or both of n_pulses and timestamp are given,
                           or if input and output are the same file.
    NotImplementedError -- if the input file's LJH version is older than 2.2.0.
    """
    # Exactly one of the two criteria may be given.
    if (n_pulses is None) == (timestamp is None):
        msg = "Must specify exactly one of n_pulses, timestamp."
        msg = msg + f" Values were {str(n_pulses)}, {str(timestamp)}"
        raise ValueError(msg)

    # Check for file problems, then open the input and output LJH files.
    if os.path.exists(output_filename) and os.path.samefile(input_filename, output_filename):
        msg = f"Input '{input_filename}' and output '{output_filename}' are the same file, which is not allowed"
        raise ValueError(msg)

    infile = LJHFile.open(input_filename)
    if segmentsize is not None:
        infile.set_segment_size(segmentsize)

    if Version(infile.version_str.decode()) < Version("2.2.0"):
        raise NotImplementedError(f"Don't know how to truncate this LJH version [{infile.version_str}]")

    # Never try to read past the end of the input file.
    if n_pulses is None:
        n_pulses = infile.nPulses
    else:
        n_pulses = min(n_pulses, infile.nPulses)

    with open(output_filename, "wb") as outfile:
        # Write the header, one "key: value" line per header entry.
        for (k, v) in infile.header_dict.items():
            outfile.write(k + b": " + v + b"\r\n")
        outfile.write(b"#End of Header\r\n")

        # Write pulses. Stop reading from the original file as soon as possible.
        for i in range(n_pulses):
            if timestamp is not None and infile.datatimes_float[i] > timestamp:
                break
            # Record prefix: two little-endian uint64 values (subframe count,
            # then raw timestamp), followed by the raw samples.
            prefix = struct.pack('<QQ', int(infile.subframecount[i]), int(infile.datatimes_raw[i]))
            outfile.write(prefix)
            trace = infile.alldata[i, :]
            trace.tofile(outfile, sep="")