"""Helper functions for the analysis of TR-WAXS data."""
from copy import deepcopy
from dateutil.parser import parse
import numpy as np
from scipy.optimize import curve_fit
from scipy.interpolate import interp1d
from txs.utils import t2str, str2t
[docs]
def get_time_delta(data, delay, return_abs=False):
"""Measurement time of difference patterns for given delay.
Parameters
----------
data : dict
Assumes a dictionary output from `txs.datared.datared`.
delay : str or int
Either a string corresponding to the time delay to be used
or an integer giving the index of this time delay in the
`data['t']` value.
return_abs : bool, optional
If True, the absolute patterns and errors are also returned
for the corresponding measurement time deltas.
"""
if isinstance(delay, str):
delay = data['t'].index(delay)
if "elapsed_time" not in data['log']:
datetime = np.array(
[
parse(' '.join(val)) for val in
zip(data['log']['date'], data['log']['time'])
]
)
tdelta = np.array(
[np.round((val - datetime[0]).seconds / 60, 1) for val in datetime]
)
to_keep = data['log']['delay'] == data['t'][delay]
else:
tdelta = data['log']['elapsed_time'] / 60
to_keep = np.array(t2str(data['log']['delay'], 3)) == data['t'][delay]
outliers = data['outliers'][delay]
tdelta = tdelta[to_keep]
if len(outliers) > 0:
step = int(np.ceil(len(tdelta) / len(outliers)))
tdelta = tdelta[::step][~outliers]
if not return_abs:
return tdelta
else:
out = (
tdelta,
data['i'][:, to_keep][:, ::step][:, ~outliers],
data['e'][:, to_keep][:, ::step][:, ~outliers]
)
return out
[docs]
def common_delay_rescale(data, delay="1ms", ref=0, loss='linear'):
"""Rescale the data sets to minimize the differences on common delay.
Parameters
----------
data : list of dict
Assumes a list of dictionary output from `txs.datared.datared` each
corresponding to a measurement run with multiple time delays.
delay : str, optional
Delay label to be used. It should present in each data set in `data`
argument.
(default, '1ms')
ref : int, optional
Index of a reference data set to be used for rescaling.
(default, 0)
loss : str, {'linear', 'cauchy', 'soft_l1', 'huber', 'arctan'}
Loss function to be used for the least-square problem.
(default, 'linear', standard least-square)
"""
data = deepcopy(data)
avgs = []
for s_idx, sample in enumerate(data):
try:
delay_idx = list(sample['t']).index(delay)
avgs.append(sample['diff_av'][:, delay_idx])
except ValueError as err:
print(f"Error for data at index {s_idx}: {err}")
q_ref = data[ref]['q']
for sidx, sample in enumerate(data):
interp = interp1d(
sample['q'],
avgs[sidx],
bounds_error=False,
fill_value="extrapolate"
)
avgs[sidx] = interp(q_ref)
avgs = np.array(avgs)
res = []
for _, val in enumerate(avgs):
popt, _ = curve_fit(
lambda x, p: p * val,
val,
avgs[ref],
p0=1,
bounds=(0., np.inf),
ftol=1e-15,
maxfev=10000,
loss=loss,
)
res.append(popt[0])
for s_idx, sample in enumerate(data):
data[s_idx]['diff_av'] = sample['diff_av'] * res[s_idx]
data[s_idx]['diff_err'] = sample['diff_err'] * res[s_idx]
data[s_idx]['i'] = sample['i'] * res[s_idx]
data[s_idx]['e'] = sample['e'] * res[s_idx]
for t_idx, delay in enumerate(sample['t']):
data[s_idx]['diffs'][t_idx] = sample['diffs'][t_idx] * res[s_idx]
print(f"Using following factors to rescale provided data: {res}")
return data
[docs]
def qrange_rescale(data, qrange=(1.4, 1.5), ref=0, loss='linear'):
"""Rescale the data sets to minimize the differences on a given q-range.
Parameters
----------
data : list of dict
Assumes a list of dictionary output from `txs.datared.datared` each
corresponding to a measurement run with multiple time delays.
qrange : tuple, optional
Range of q-values to be used.
(default, (1.4, 1.5))
ref : int, optional
Index of a reference data set to be used for rescaling.
(default, 0)
loss : str, {'linear', 'cauchy', 'soft_l1', 'huber', 'arctan'}
Loss function to be used for the least-square problem.
(default, 'linear', standard least-square)
"""
data = deepcopy(data)
avgs = []
for s_idx, sample in enumerate(data):
try:
qmask = (sample['q'] > qrange[0]) & (sample['q'] < qrange[1])
avgs.append(sample['diff_av'][qmask].mean(1))
except ValueError as err:
print(f"Error for data at index {s_idx}: {err}")
avgs = np.array(avgs)
res = []
for _, val in enumerate(avgs):
popt, _ = curve_fit(
lambda x, p: p * val,
val,
avgs[ref],
p0=1,
bounds=(0., np.inf),
ftol=1e-15,
maxfev=10000,
loss=loss,
)
res.append(popt[0])
for s_idx, sample in enumerate(data):
data[s_idx]['diff_av'] = sample['diff_av'] * res[s_idx]
data[s_idx]['diff_err'] = sample['diff_err'] * res[s_idx]
data[s_idx]['i'] = sample['i'] * res[s_idx]
data[s_idx]['e'] = sample['e'] * res[s_idx]
for t_idx, delay in enumerate(sample['t']):
data[s_idx]['diffs'][t_idx] = sample['diffs'][t_idx] * res[s_idx]
print(f"Using following factors to rescale provided data: {res}")
return data
[docs]
def manual_merging_selection(data, labels=None, invert=False):
"""Group data with common time delays and ask user which are to be kept.
The data are copied and a new list is returned where the unwanted time
delays for each sample has been removed.
Parameters
----------
data : list of dict
Assumes a list of dictionary output from `txs.datared.datared` each
corresponding to a measurement run with multiple time delays.
labels : list of str, optional
The names corresponding to each sample in `data` for easier
identification.
(default, None, numbers are used instead of names)
invert : bool
If True, selected data are discarded instead of being kept.
"""
out = deepcopy(data)
delays = {}
for s_idx, sample in enumerate(data):
for _, delay in enumerate(sample['t']):
if delay not in delays:
delays[delay] = []
delays[delay].append(s_idx)
print(
f"Select which samples are to be {'kept' if not invert else 'discarded'} for each time delay:\n"
"Example use: \n"
"'-20us: 1+3, 3ms: 1'\n"
"If a time delay is not given, all samples are kept, unless "
"`invert` argument is set to True.\n\n"
"Samples are:\n"
)
for key, val in delays.items():
if labels is not None:
val = [f"{label}: {labels[label]}" for label in val]
print(f"{key} -> {val}")
sel = input("Type your selection: ")
if sel == '':
return out
sel = {
val.split(":")[0].strip(): val.split(":")[1].split("+")
for val in sel.split(",")
}
for key, val in sel.items():
val = np.array([int(label) for label in val])
samples_at_t = np.array(delays[key])
mask = np.isin(samples_at_t, val)
mask = mask if invert else ~mask
for s_idx in samples_at_t[mask]:
t_idx = list(out[s_idx]['t']).index(key)
out[s_idx]['diffs'] = [
val for idx, val in enumerate(out[s_idx]['diffs'])
if idx != t_idx
]
out[s_idx]['diff_av'] = np.delete(out[s_idx]['diff_av'], t_idx, 1)
out[s_idx]['diff_err'] = np.delete(out[s_idx]['diff_err'], t_idx, 1)
out[s_idx]['red_chi2'] = [
val for idx, val in enumerate(out[s_idx]['red_chi2'])
if idx != t_idx
]
out[s_idx]['pts_perc'] = [
val for idx, val in enumerate(out[s_idx]['pts_perc'])
if idx != t_idx
]
out[s_idx]['diffs_outliers'] = [
val for idx, val in enumerate(out[s_idx]['diffs_outliers'])
if idx != t_idx
]
out[s_idx]['outliers'] = [
val for idx, val in enumerate(out[s_idx]['outliers'])
if idx != t_idx
]
out[s_idx]['diffs_unfilt'] = [
val for idx, val in enumerate(out[s_idx]['diffs_unfilt'])
if idx != t_idx
]
out[s_idx]['diff_av_unfilt'] = np.delete(
out[s_idx]['diff_av_unfilt'], t_idx, 1
)
out[s_idx]['diff_err_unfilt'] = np.delete(
out[s_idx]['diff_err_unfilt'], t_idx, 1
)
out[s_idx]['red_chi2_unfilt'] = [
val for idx, val in enumerate(out[s_idx]['red_chi2_unfilt'])
if idx != t_idx
]
out[s_idx]['pts_perc_unfilt'] = [
val for idx, val in enumerate(out[s_idx]['pts_perc_unfilt'])
if idx != t_idx
]
out[s_idx]['filt_res'] = [
val for idx, val in enumerate(out[s_idx]['filt_res'])
if idx != t_idx
]
out[s_idx]['signoise_av'] = [
val for idx, val in enumerate(out[s_idx]['signoise_av'])
if idx != t_idx
]
out[s_idx]['t'] = [
val for idx, val in enumerate(out[s_idx]['t'])
if idx != t_idx
]
return out
[docs]
def merge_same_delays(
data,
max_chi_square=10,
cutoff=None,
verbose=True,
):
"""Average data corresponding to the same time delay.
Parameters
----------
data : list of dict
Assumes a list of dictionary output from `txs.datared.datared` each
corresponding to a measurement run with multiple time delays.
max_chi_square : float, optional
Maximum value for the chi-square computed between the average of the
data and the individual data sets. The chi-square is averaged over
matching time delays. Each data set that has a chi-square
value bigger that provided argument is discarded.
(default, 10)
cutoff : float or None, optional
Time in minute where the data should be cut. The average is computed
up to this point for all measurement in `data`.
verbose : bool, optional
Whether or not report on the number of shots kept for each time delay.
(optional, True)
Returns
-------
out_av : 2D array
The averaged difference patterns for each time delay (columns).
out_err : 2D array
The standard deviation for each time delay (columns).
delays : list of str
The time delays corresponding to each column in `out_av` and `out_err`.
"""
out = deepcopy(data[0])
diffs = {}
diff_av = {}
diff_err = {}
chi_square = {}
outliers = {}
# Get the minimum q vector
qlist = [val['q'] for val in data]
q_min_idx = np.argmin([len(val) for val in qlist])
q_out = qlist[q_min_idx]
# First, regroup all same time delays together in diffs
for _, val in enumerate(data):
for idx, delay in enumerate(val['t']):
diffs_t = interp1d(
val['q'],
val['diffs'][idx].T,
bounds_error=False,
fill_value='extrapolate'
)
diffs_t = diffs_t(q_out).T
if cutoff is not None:
tdelta = get_time_delta(val, delay)
diffs_t = diffs_t[:, tdelta <= cutoff]
if delay not in diffs:
diffs[delay] = diffs_t
else:
diffs[delay] = np.column_stack((diffs[delay], diffs_t))
# Second, perform averages and compute errors
for key, val in diffs.items():
err = np.nanstd(val, 1)
avg = np.median(val, 1)
diff_av[key] = avg
diff_err[key] = err / np.sqrt(len(avg))
chi_square[key] = np.sum(
(val - avg[:, None]) ** 2 / err[:, None] ** 2,
0
) / avg.size
outliers = {
key: np.zeros(val.shape[1]).astype(bool) for key, val in diffs.items()
}
out['diff_av_unfilt'] = np.column_stack([val for val in diff_av.values()])
out['diff_err_unfilt'] = np.column_stack([val for val in diff_err.values()])
out['red_chi2'] = [val for val in chi_square.values()]
# Third, filter diffs based on chi-square
if max_chi_square is not None:
for key, val in diffs.items():
mask = chi_square[key] <= max_chi_square
outliers[key] = ~mask
filt_diffs = val[:, mask]
err = np.nanstd(filt_diffs, 1)
avg = np.median(filt_diffs, 1)
diff_av[key] = avg
diff_err[key] = err / np.sqrt(len(avg))
if verbose:
print(
f"delay: {key}, keeping {np.sum(mask)} "
f"out of {val.shape[1]}"
)
out['q'] = q_out
out['t'] = np.array(list(diff_av.keys()))
out['diffs'] = [val[:, ~outliers[key]] for key, val in diffs.items()]
out['diffs_unfilt'] = [val for val in diffs.values()]
out['diff_av'] = np.column_stack([val for val in diff_av.values()])
out['diff_err'] = np.column_stack([val for val in diff_err.values()])
out['outliers'] = [val for val in outliers.values()]
return out
[docs]
def sort_delays(data, ref_delays=None, include_ref=True):
"""Sort the data according to the time delays
Parameters
----------
data : `txs.datared.datared`
A `txs.datared.datared` instance.
ref_delays : list of str
The delays corresponding to the reference dark measurements.
include_ref : bool
If False, the reference delays given in `ref_delays` are removed
from the data.
"""
delays = np.array(data['t'])
if not include_ref:
if ref_delays is None:
raise ValueError(
"'ref_delay' cannot be None if 'include_ref' is False."
)
delays = []
diff_av = []
for idx, val in enumerate(data['t']):
if val not in ref_delays:
delays.append(val)
diff_av.append(data['diff_av'][:, idx])
delays = np.array(delays)
data['diff_av'] = np.column_stack(diff_av)
# to avoid bad dorting due to 'off' in the entry
has_off = False
if 'off' in delays:
has_off = True
off_index = list(delays).index('off')
delays = np.delete(delays, off_index)
data['diff_av'] = np.delete(data['diff_av'], off_index, 1)
data['diff_err'] = np.delete(data['diff_err'], off_index, 1)
off_diffs = data['diffs'].pop(off_index)
indices = np.argsort([str2t(val) for val in delays])
delays = delays[indices]
diff_av = data['diff_av'][:, indices]
diff_err = data['diff_err'][:, indices]
diffs = [data['diffs'][idx] for idx in indices]
if has_off:
delays = ['off'] + list(delays)
diff_av = np.column_stack((data['diff_av'][:, off_index], diff_av))
diff_err = np.column_stack((data['diff_err'][:, off_index], diff_err))
diffs = [off_diffs] + diffs
data['t'] = list(delays)
data['diff_av'] = diff_av
data['diff_err'] = diff_err
data['diffs'] = diffs
return data