Source code for utils

# -*- coding: utf-8 -*-
"""Utilities functions for trx."""

__author__ = "Matteo Levantino"
__contact__ = "matteo.levantino@esrf.fr"
__licence__ = "MIT"
__copyright__ = "ESRF - The European Synchrotron, Grenoble, France"
__date__ = "01/09/2021"


import os
import re
import glob
import time
import numpy as np
import h5py
import fabio

from tqdm import tqdm
from shutil import copy
from collections import namedtuple

from pyFAI.azimuthalIntegrator import AzimuthalIntegrator
from pyFAI.detectors import RayonixMx170
from pyFAI.detectors import Maxipix2x2
from pyFAI.detectors import Maxipix5x1

try:
    from pyFAI.detectors import Jungfrau1M
except ImportError:
    from pyFAI.detectors import Eiger
    class Jungfrau1M(Eiger):
        MAX_SHAPE = (1064, 1032)
        aliases = ["Jungfrau 1M"]
    # from pyFAI.detectors import Eiger1M as Jungfrau1M
    # print("WARNING:Jungfrau1M not available among pyFAI.detectors")

from txs.common import en_to_wl


RayonixMx170.aliases = ['Rayonix MX170-HS', 'Rayonix MX170', 'RayonixMX170HS',
                        'RayonixMX170']
Maxipix2x2.aliases = ['Maxipix 2x2', 'Maxipix2x2']
Maxipix5x1.aliases = ['Maxipix 5x1', 'Maxipix5x1']


DETECTORS = {
    'rayonix': RayonixMx170(),
    'maxipix5x1': Maxipix5x1(),
    'maxipix2x2': Maxipix2x2(),
    'jungfrau1m': Jungfrau1M()
}



si = {-15: {'multiplier': 1e15, 'prefix': 'fs'},
      -14: {'multiplier': 1e15, 'prefix': 'fs'},
      -13: {'multiplier': 1e15, 'prefix': 'fs'},
      -12: {'multiplier': 1e12, 'prefix': 'ps'},
      -11: {'multiplier': 1e12, 'prefix': 'ps'},
      -10: {'multiplier': 1e12, 'prefix': 'ps'},
      -9: {'multiplier': 1e9, 'prefix': 'ns'},
      -8: {'multiplier': 1e9, 'prefix': 'ns'},
      -7: {'multiplier': 1e9, 'prefix': 'ns'},
      -6: {'multiplier': 1e6, 'prefix': 'us'},
      -5: {'multiplier': 1e6, 'prefix': 'us'},
      -4: {'multiplier': 1e6, 'prefix': 'us'},
      -3: {'multiplier': 1e3, 'prefix': 'ms'},
      -2: {'multiplier': 1e3, 'prefix': 'ms'},
      -1: {'multiplier': 1e3, 'prefix': 'ms'},
      0: {'multiplier': 1, 'prefix': 's'},
      1: {'multiplier': 1, 'prefix': 's'},
      2: {'multiplier': 1, 'prefix': 's'},
      3: {'multiplier': 1, 'prefix': 's'},
      4: {'multiplier': 1, 'prefix': 's'},
     }

# TO DO: add very long delays (min, hours, ...)
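
# How the 'si' table is used (illustrative sketch, not part of the original
# module): the decimal exponent of a delay selects the SI prefix and the
# multiplier that rescales the value for pretty-printing.
#
#   exp = int(np.floor(np.log10(5e-9)))   # -> -9
#   si[exp]['prefix']                     # -> 'ns'
#   5e-9 * si[exp]['multiplier']          # -> 5.0 (up to fp rounding), "5 ns"
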

def get_ai(energy, distance, center, pixel=None, detector=None, binning=None):
    """Initialize pyFAI azimuthal integrator obj.

    Parameters
    ----------
    energy : float
        X-ray photon energy (eV).
    distance : float
        Sample-to-detector distance (m).
    center : tuple
        Coordinates of the image center (hor, ver) (pixel).
    pixel : float or tuple or None, optional
        Pixel size (m). If tuple, first value is the horizontal pixel size
        and second value is the vertical pixel size. Default is None.
    detector : str or None, optional
        Detector nickname. Default is None.
    binning : tuple or None, optional
        Detector binning (hor, ver). Default is None.

    Returns
    -------
    pyfai_ai : pyFAI.azimuthalIntegrator.AzimuthalIntegrator
        pyFAI azimuthal integrator.

    Examples
    --------
    >>> ai = get_ai(18e3, 0.05, (960, 960), detector='rayonix',
    ...             binning=(2, 2))

    """
    if detector is not None:
        if detector.lower() in DETECTORS.keys():
            detector = DETECTORS[detector.lower()]
        else:
            raise ValueError("'%s' not among ID09 detectors.\n" % detector +
                             "Available detectors are: %s"
                             % list(DETECTORS.keys()))
        pixel1 = pixel2 = None
        if binning is not None:
            detector.set_binning(binning)
        elif pixel is not None:
            detector_binnings = list(detector.BINNED_PIXEL_SIZE.values())
            if not isinstance(pixel, (tuple, list, np.ndarray)):
                k = np.argmin(np.abs(np.array(detector_binnings)-pixel))
            else:
                pixel_ave = 0.5*(pixel[0]+pixel[1])
                k = np.argmin(np.abs(np.array(detector_binnings)-pixel_ave))
            detector.set_binning((k+1, k+1))
        else:
            raise Exception("If 'detector' is not None, either 'pixel' or "
                            "'binning' must not be None.")
    elif pixel is not None:
        if isinstance(pixel, (tuple, list)):
            if len(pixel) != 2:
                raise ValueError("'pixel' must have len=2 if tuple or list.")
            else:
                pixel1, pixel2 = pixel[0], pixel[1]
        elif not isinstance(pixel, (int, float)):
            raise TypeError("'pixel' must be scalar, tuple or list.")
        else:
            pixel1 = pixel2 = pixel
    else:
        raise ValueError("'pixel' and 'detector' cannot be both None.")

    if not isinstance(center, (tuple, list)):
        raise TypeError("'center' must be tuple or list.")
    elif len(center) != 2:
        raise ValueError("'center' must have length=2.")

    wavelength = en_to_wl(energy)

    pyfai_ai = AzimuthalIntegrator(pixel1=pixel1, pixel2=pixel2,
                                   detector=detector, wavelength=wavelength)

    pyfai_ai.setFit2D(centerX=float(center[0]), centerY=float(center[1]),
                      directDist=distance*1e3)

    # add properties and methods
    Fit2D = pyfai_ai.getFit2D()
    pyfai_ai.center = (Fit2D['centerX'], Fit2D['centerY'])
    pyfai_ai.tilt = (Fit2D['tilt'], Fit2D['tiltPlanRotation'])
    pyfai_ai.energy = energy/1e3

    def as_dict(self):
        """Convert pyFAI azimuthal integrator obj to dictionary."""
        methods = dir(self)
        methods = [m for m in methods if m.find("get_") == 0]
        names = [m[4:] for m in methods]
        values = [getattr(self, m)() for m in methods]
        ret = dict(zip(names, values))
        ret["detector"] = self.detector.get_name()
        ret["center"] = self.center
        ret["energy"] = self.energy
        ret['binning'] = self.detector.binning
        return ret

    def as_str(self):
        """Convert pyFAI azimuthal integrator obj to str."""
        det = self.detector.name
        spline = self.spline
        if spline is None:
            spline = "None"
        px = (self.pixel1, self.pixel2)
        dist = self.dist
        wl = self.wavelength
        en = self.energy
        # poni1, poni2 = self.poni1, self.poni2
        rot1, rot2, rot3 = self.rot1, self.rot2, self.rot3
        cpx = self.center
        cmm = (cpx[0]*px[0]*1e3, cpx[1]*px[1]*1e3)
        tilt, tpr = self.tilt[0], self.tilt[1]
        s = ["# Detector : %s" % det,
             "# Spline : %s" % spline,
             "# Binning : %dx%d" % self.detector.binning,
             "# Pixel [um] : (%.2f, %.2f)" % (px[0]*1e6, px[1]*1e6),
             "# Distance [mm] : %.3f" % (dist*1e3),
             "# Center [px] : (%.2f, %.2f)" % (cpx[0], cpx[1]),
             "# Center [mm] : (%.3f, %.3f)" % (cmm[0], cmm[1]),
             "# Wavelength [A] : %.5f" % (wl*1e10),
             "# Energy [keV] : %.3f" % en,
             "# Rotations [rad] : (%.3f, %.3f, %.3f)" % (rot1, rot2, rot3),
             "# Tilt [rad] : %.3f" % tilt,
             "# Tilt Plane Rotation [rad] : %.3f" % tpr]
        return "\n".join(s)

    def show(self):
        print(self.as_str())

    import types
    pyfai_ai.as_dict = types.MethodType(as_dict, pyfai_ai)
    pyfai_ai.as_str = types.MethodType(as_str, pyfai_ai)
    pyfai_ai.show = types.MethodType(show, pyfai_ai)

    return pyfai_ai


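# Usage sketch (illustrative, not part of the original module): build an
# integrator for the Rayonix detector. The beam center (960, 960) is a
# hypothetical value.
#
#   ai = get_ai(18e3, 0.05, (960, 960), detector='rayonix', binning=(2, 2))
#   ai.show()           # print the geometry summary built by as_str()
#   geo = ai.as_dict()  # geometry as a plain dictionary

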
def get_fnames(pattern, folder="./"):
    """Get sorted list of file names matching a pattern."""
    fnames = glob.glob(os.path.join(folder, pattern))
    fnames.sort()
    return fnames


def load_image(fname):
    """Load image using fabio."""
    img = fabio.open(fname).data
    return img


def load_mask(fname):
    """Load mask using fabio and check if pyFAI compatible."""
    mask = load_image(fname)
    if not all(item in mask for item in (0, 1)):
        print("WARNING: image is not a pyFAI-type mask.")
    return mask


def save_dict_as_hdf5(d, h5fname, h5path='/', create_dataset_args=None,
                      verbose=False):
    """Save dictionary to hdf5 file.

    Strings, scalars and array_like with size <= 2 are saved as attributes.
    Everything else is saved as dataset. If sub-dictionaries exist,
    corresponding sub-groups are created.

    Parameters
    ----------
    d : dict
        Data dictionary.
    h5fname : str
        Output file name.
    h5path : str, optional
        Target path in HDF5 file in which groups are created.
        Default is '/'.
    create_dataset_args : dict or None, optional
        Parameters to pass to `h5py.create_dataset`. This allows filters
        and compression parameters to be specified. Default is None.
    verbose : bool, optional
        If True, print the saving time. Default is False.

    Credits
    -------
    - silx.io.dictdump.dicttoh5
    - M. Cammarata (https://github.com/marcocamma/datastorage)

    Example
    -------
    >> save_dict_as_hdf5(mydict, 'test.h5',
                         create_dataset_args={'compression': 'gzip',
                                              'compression_opts': 9})

    """
    if not isinstance(d, dict):
        raise TypeError("'d' must be dict.")

    os.environ["HDF5_USE_FILE_LOCKING"] = "TRUE"

    def fill_h5(d, f, h5path, create_dataset_args):
        """
        Function to fill (even recursively) the hdf5 file from within a
        context manager.
        """
        for k, v in tqdm(d.items()):
            if v is None or (isinstance(v, (dict, tuple)) and not len(v)):
                # create empty group (error with h5py v2.10)
                f[h5path].attrs[k] = h5py.Empty(dtype="f")
            elif isinstance(v, h5py.Empty):
                f[h5path].attrs[k] = h5py.Empty("f")
            elif isinstance(v, dict):  # recurse
                f.create_group(h5path + k)
                fill_h5(v, f, h5path + k + "/",
                        create_dataset_args=create_dataset_args)
            elif isinstance(v, str):
                f[h5path].attrs[k] = str(v)
            elif isinstance(v, (int, float, np.integer)):
                f[h5path].attrs[k] = v
            elif isinstance(v, (bool, np.bool_)):
                f[h5path].attrs[k] = v
            elif isinstance(v, (list, tuple, np.ndarray)):
                # take care of cases with lists having different lengths
                orig_shapes = []
                v_new = None  # guard: unset if np.array() raises ValueError
                try:
                    v_new = np.array(v)
                    if v_new.dtype.kind == 'O':
                        orig_shapes = [val.shape for val in v_new]
                        v = np.concatenate(v_new)
                except ValueError:
                    orig_shapes = [val.shape for val in v]
                    v = [np.ravel(val) for val in v]
                    v = np.concatenate(v)
                if v_new is not None and v_new.dtype.kind in ["S", "U"]:
                    v = np.array(v_new, dtype=np.bytes_)
                # if size <= 2, assume it is attribute rather than dataset
                if np.size(v) <= 2:
                    f[h5path].attrs[k] = v
                else:
                    # handle list of strings or numpy array of strings
                    f.create_dataset(h5path+k, data=v, **create_dataset_args)
                    f[h5path+k].attrs['orig_shapes'] = orig_shapes
            else:
                print("ERROR: unrecognized data type for key='%s'" % k)

    t0 = time.time()

    if create_dataset_args is None:
        create_dataset_args = {}

    with h5py.File(h5fname, "w", locking=True) as f:
        fill_h5(d, f, h5path, create_dataset_args)
        # Add NeXus plottable data group
        base_group = f[h5path]
        base_group.attrs["default"] = "integrated"
        nxdata = base_group.create_group("integrated")
        nxdata.attrs["NX_class"] = "NXdata"
        nxdata.attrs["signal"] = "intensity"
        nxdata.attrs["axes"] = [".", "q"]
        nxdata.attrs["interpretation"] = "spectrum"
        # Need to duplicate data to transpose it, alternative is to use
        # virtual datasets
        nxdata.create_dataset("intensity", data=np.transpose(d["i"]),
                              **create_dataset_args)
        nxdata.create_dataset("intensity_errors", data=np.transpose(d["e"]),
                              **create_dataset_args)
        nxdata["q"] = h5py.SoftLink(f"{base_group.name}/q")

    if verbose:
        print("Saving time: %.3f s" % (time.time() - t0))


def load_hdf5_as_dict(h5fname, h5path='/', verbose=False):
    """Read an HDF5 file and return a nested dictionary.

    Parameters
    ----------
    h5fname : str
        HDF5 file name.
    h5path : str, optional
        Name of HDF5 group to use as dictionary root level.
        Default is '/'.
    verbose : bool, optional
        If True, print the loading time. Default is False.

    Credits
    -------
    - silx.io.dictdump.h5todict

    """
    os.environ["HDF5_USE_FILE_LOCKING"] = "FALSE"

    ddict = {}

    t0 = time.time()

    with h5py.File(h5fname, mode='r', locking=False) as f:
        for k in f[h5path]:
            if isinstance(f[h5path+k], h5py.Group):
                if f[h5path+k].attrs.get("NX_class") == "NXdata":
                    continue  # skip NeXus plottable data group
                ddict[k] = load_hdf5_as_dict(h5fname, h5path+k+"/")
            else:
                # convert HDF5 dataset to numpy array
                ddict[k] = f[h5path+k][...]
                # convert to unicode string, int or float if scalar
                if ddict[k].size == 1:
                    if ddict[k].dtype.kind in ['S', 'U']:
                        ddict[k] = ddict[k].astype('unicode').astype(str)
                    elif ddict[k].dtype.kind == 'i':
                        ddict[k] = int(ddict[k])
                    elif ddict[k].dtype.kind == 'f':
                        ddict[k] = float(ddict[k])
                    else:
                        raise Exception("Unrecognized data type for '%s'" % k)
                # convert to array of unicode strings if array of strings
                elif ddict[k].dtype.kind == 'S':
                    ddict[k] = ddict[k].astype('unicode').astype(str)

                if "orig_shapes" in f[h5path + k].attrs.keys():
                    if f[h5path + k].attrs['orig_shapes'].size > 0:
                        entry = []
                        start_idx = 0
                        for shape in f[h5path + k].attrs["orig_shapes"]:
                            # split the concatenated dataset back into the
                            # original blocks (cumulative slicing)
                            end_idx = start_idx + int(np.prod(shape))
                            entry.append(
                                ddict[k][start_idx:end_idx].reshape(shape)
                            )
                            start_idx = end_idx
                        ddict[k] = entry

                # special case
                if k == "filt_res":
                    ddict[k] = [tuple(val) for val in ddict[k]]

        for k, v in f[h5path].attrs.items():
            if k == "default":
                continue  # skip NeXus default attribute
            if isinstance(v, np.ndarray):
                if v.dtype.kind == "S":
                    v = v.astype(str)
                if k == "filt_res":
                    v = [tuple(val) for val in v]
            ddict[k] = v

    if verbose:
        print("Loading time: %.3f s" % (time.time() - t0))

    return ddict


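# Round-trip sketch (illustrative; 'test.h5' and the dict content are
# hypothetical). Note that save_dict_as_hdf5() builds a NeXus 'integrated'
# group from the 'q', 'i' and 'e' entries, so the dict must provide them.
#
#   d = {'sample': 'H2O',
#        'q': np.linspace(0, 5, 100),
#        'i': np.random.rand(3, 100),
#        'e': np.random.rand(3, 100),
#        'info': {'nshots': 10}}
#   save_dict_as_hdf5(d, 'test.h5')
#   d2 = load_hdf5_as_dict('test.h5')
#   # 'sample' and 'nshots' come back from HDF5 attributes, 'q', 'i', 'e'
#   # from datasets; the sub-dict 'info' is restored from a sub-group.

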
def get_exponent_and_mantissa(f):
    """Get exponent and mantissa from float."""
    if abs(f) < 1e-15:
        exponent = 0
    else:
        exponent = int(np.floor(np.log10(abs(f))))
    mantissa = f*si[exponent]['multiplier']
    return exponent, mantissa


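# Examples (illustrative): the exponent is floor(log10(|f|)) and the
# mantissa is the value rescaled by the matching 'si' multiplier.
#
#   get_exponent_and_mantissa(2.5e-9)   # -> (-9, 2.5), i.e. 2.5 ns
#   get_exponent_and_mantissa(0.0)      # -> (0, 0.0), below-1e-15 cutoff

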
def t2str(delay, digits=None, decimals=None, strip_spaces=True):
    """
    Convert delay from float to str.

    Parameters
    ----------
    delay : float or array-like
        Float delay.
    digits : int or None, optional
        Number of significant digits of the number without units.
    decimals : int or None, optional
        Number of digits after the dot.
    strip_spaces : bool, optional
        If True, remove the space between number and unit.
        Default is True.

    Returns
    -------
    t : str or array_like
        String delay.

    """
    if isinstance(delay, str):
        return delay

    if isinstance(delay, float):
        delay = [delay]

    if not isinstance(delay, (list, tuple, np.ndarray)):
        raise TypeError("'delay' must be float or array-like.")

    if digits is not None and not isinstance(digits, int):
        raise TypeError("'digits' must be int or None.")

    if decimals is not None and not isinstance(decimals, int):
        raise TypeError("'decimals' must be int or None.")

    if digits is not None and digits < 1:
        raise ValueError("'digits' must be > 0")

    t = []

    for d in delay:

        if isinstance(d, str):
            t.append(d)
            continue

        if abs(d) < 1e-15:
            t.append("0")
            continue

        if d in [9999.999, 9999, 10000]:
            t.append("off")
            continue

        exponent, mantissa = get_exponent_and_mantissa(d)

        if digits is not None:
            mantissa_exponent = np.floor(np.log10(abs(mantissa)))
            mantissa /= 10**mantissa_exponent
            mantissa = np.round(mantissa, digits-1)
            mantissa *= 10**mantissa_exponent
            if abs(mantissa) == 1000:
                exponent += 1
                mantissa = 1 if mantissa > 0 else -1

        if decimals is not None:
            mantissa = np.round(mantissa, decimals)

        if (abs(mantissa) - int(abs(mantissa))) == 0:
            # to avoid truncation issues with floats
            mantissa = int(mantissa)

        string = str(mantissa) + " " + si[exponent]["prefix"]

        if strip_spaces:
            string = string.replace(" ", "")

        t.append(string)

    if len(t) == 1:
        t = t[0]

    return t


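# Examples (illustrative; exact output may vary with floating-point
# rounding):
#
#   t2str(2e-6)                       # -> '2us'
#   t2str([-5e-12, 0.0, 1e-3])        # -> ['-5ps', '0', '1ms']
#   t2str(1e-3, strip_spaces=False)   # -> '1 ms'

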
_t_in_str_regex = re.compile(r"_(-?\d+\.?\d*(?:ps|ns|us|ms)?)")


def get_delay_from_str(string):
    """Extract the first delay-like token (e.g. '_100ps') from a string."""
    match = _t_in_str_regex.search(string)
    return match and match.group(1) or None


_t_regex = re.compile(r"(-?\d+\.?\d*)\s*((?:s|fs|ms|ns|ps|us)?)")


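# Example (illustrative): extract the delay token from a file name.
#
#   get_delay_from_str('run3_-5ps_0001.edf')   # -> '-5ps'
#   get_delay_from_str('no_delay_here')        # -> None

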
def str2t(delay):
    """
    Convert delay from string to float.

    Parameters
    ----------
    delay : str or array-like
        String delay.

    Returns
    -------
    t : float or array_like
        Float delay.

    """
    if isinstance(delay, str):
        delay = [delay]
        return_array = False
    else:
        return_array = True

    if not isinstance(delay, (list, tuple, np.ndarray)):
        raise TypeError("'delay' must be str or array-like.")

    t2val = dict(fs=1e-15, ps=1e-12, ns=1e-9, us=1e-6, ms=1e-3, s=1)

    t = []
    for d in delay:
        if isinstance(d, bytes):
            d = d.decode('ascii')
        match = _t_regex.search(d)
        if match:
            n, t0 = float(match.group(1)), match.group(2)
            val = t2val.get(t0, 1)
            t.append(n*val)
        else:
            # unrecognized string will be kept
            t.append(d)

    if not return_array:
        t = t[0]

    return t


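# Examples (illustrative; values up to floating-point rounding):
#
#   str2t('100ps')          # -> 1e-10
#   str2t(['1us', '-5ps'])  # -> [1e-06, -5e-12]
#   str2t(['off'])          # -> ['off'] (unrecognized strings are kept)

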
def sort_string_delays(delays, digits=None, return_indices=False):
    """Sort string delays in ascending order.

    Parameters
    ----------
    delays : array-like
        Array of strings.
    digits : int or None, optional
        Number of significant digits passed on to `t2str`.
    return_indices : bool
        If True, returns also the indices of the original array arranged
        according to the new, sorted array. Can be used to sort data as
        well later on.

    Returns
    -------
    sorted_delays : array-like
        Array of strings.
    sorted_indices : array-like
        Array of integers giving the position of the old array elements
        in the new, sorted array.

    Notes
    -----
    - If a string that cannot be converted to float is present
      (e.g. 'off'), it is placed at the beginning of the array.

    """
    delays = str2t(delays)

    num_delays = [d for d in delays if not isinstance(d, str)]
    str_delays = [d for d in delays if isinstance(d, str)]

    sorted_indices = np.argsort(num_delays)
    num_delays = [num_delays[idx] for idx in sorted_indices]

    # add possible string delays and update the indices accordingly.
    sorted_indices = np.concatenate((
        np.arange(len(str_delays)),
        sorted_indices + len(str_delays)
    ))

    sorted_delays = t2str(str_delays + num_delays, digits=digits, decimals=1)

    if return_indices:
        return sorted_delays, sorted_indices
    else:
        return sorted_delays


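# Example (illustrative): non-numeric strings like 'off' end up first.
#
#   sort_string_delays(['10ns', 'off', '-5ps', '1us'])
#   # -> ['off', '-5ps', '10ns', '1us']

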
def sim_live_dataset(orig_path, dest_path, sleep_time):
    """Simulate a live dataset by gradually copying images and log lines."""
    # check what *.log files are available in 'orig_path'
    fnames = get_fnames("*.log", folder=orig_path)

    if len(fnames) == 0:
        full_path = os.path.join(os.getcwd(), orig_path)
        raise Exception("No *.log file found in '%s'." % full_path)

    diagnostics = os.path.join(orig_path, 'diagnostics.log')
    if diagnostics in fnames:
        fnames.remove(diagnostics)

    orig_logfile = fnames[0]
    if len(fnames) > 1:
        print("WARNING: found more than one *.log that is "
              "not diagnostics.log\nUsing: %s" % orig_logfile)

    dest_logfile = os.path.join(dest_path, os.path.basename(orig_logfile))

    filelist = get_fnames("*.edf", folder=orig_path)

    if not os.path.isdir(dest_path):
        os.mkdir(dest_path)

    # empty destination folder:
    for f in os.listdir(dest_path):
        os.remove(os.path.join(dest_path, f))

    # read whole file content
    with open(orig_logfile, 'r') as f:
        lines = f.readlines()

    # find last line starting with '#'
    for iline, line in enumerate(lines):
        if not line.lstrip().startswith('#'):
            break

    if iline == 0:
        raise Exception("No comment lines. "
                        "Does not seem a waxscollect log file.")

    for line in lines[:iline]:
        with open(dest_logfile, 'a') as log:
            log.write(line)

    loglines = lines[iline:]

    for f, line in zip(filelist, loglines):
        with open(dest_logfile, 'a') as log:
            print(line)
            log.write(line)
        time.sleep(sleep_time)
        print(f)
        copy(f, dest_path)


def convert_units(value, unit, target_unit):
    """Convert the value to the given target units.

    The base units used are the SI units.

    Parameters
    ----------
    value : float
        The value to be converted.
    unit : str
        The unit the provided value is given in. For instance, 'm', 'eV',
        'g/L', 'J/m^2', 'm.s^(-1)', 'm.s^{-1}', 'J/m^2/s'.
    target_unit : str
        The unit of the value to be returned.

    """
    prefixes = {
        'f': 1e-15,
        'p': 1e-12,
        'angs': 1e-10,  # special case for distances
        'n': 1e-9,
        'u': 1e-6,
        'm': 1e-3,
        'c': 1e-2,
        'd': 1e-1,
        '': 1,
        'h': 1e2,
        'k': 1e3,
        'M': 1e6,
        'G': 1e9,
        'T': 1e12,
        'P': 1e15,
    }

    current_singlets = _parse_unit(unit)
    target_singlets = _parse_unit(target_unit)

    for key, val in current_singlets.items():
        curr_scale = prefixes[val['prefix']] ** val['exponent']
        if key not in target_singlets.keys():
            raise KeyError(
                "The current and target units do not seem to correspond."
            )
        target = target_singlets[key]
        target_scale = prefixes[target['prefix']] ** target['exponent']
        scale = curr_scale / target_scale
        value = value * scale

    return value


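# Examples (illustrative, up to floating-point rounding):
#
#   convert_units(1.0, 'm', 'mm')       # -> 1000.0
#   convert_units(2.0, 'g/L', 'mg/mL')  # -> 2.0 (both scalings cancel)

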
def _parse_unit(unit):
    bases = [
        "angs", "mol", "rad", "cd", "eV", "Da", "ohm", "s", "g", "A", "K",
        "J", "Hz", "N", "Pa", "W", "V", "F", "T", "L", "dB",
    ]
    # SI conversion table for derived units (kept for reference; not used
    # below)
    converters = {
        "rad": "m/m",
        "J": "kg.m^2.s^{-2}",
        "Hz": "s^{-1}",
        "N": "kg.m.s^{-2}",
        "Pa": "kg.m^{-1}.s^{-2}",
        "W": "kg.m^2.s^{-3}",
        "V": "kg.m^2.s^{-3}.A^{-1}",
        "F": "kg^{-1}.m^{-2}.s^4.A^2",
        "ohm": "kg.m^2.s^{-3}.A^{-2}",
        "T": "kg.s^{-2}.A^{-1}",
        "L": "dm^3",
    }
    pattern = re.compile(r"([a-zA-Z]+)\^?[({]?(-?\d*)[)}]?")

    singlets = {}

    # get the denominators and set the exponent sign to negative
    units = unit.split('/')
    for idx, val in enumerate(units):
        val = val.split(".")
        for sub_idx, single_unit in enumerate(val):
            is_negative = False
            # first element has negative exponent due to the '/'.
            if idx > 0 and sub_idx == 0:
                is_negative = True

            name, exponent = pattern.search(single_unit).groups()

            # process the name, look for a prefix
            if re.match(r"\w?m$", name):  # a distance
                base_start_idx = name.rfind("m")
            else:
                for base in bases:
                    base_start_idx = name.rfind(base)
                    if base_start_idx != -1:
                        break

            if base_start_idx == -1:
                raise ValueError(
                    f"Unit {name} is not recognized.\n"
                    f"Available values (with or without scaling "
                    f"prefix) are: {bases}.\n"
                )
            elif base_start_idx == 0:
                prefix = ''
            else:
                prefix = name[:base_start_idx]

            name = name[base_start_idx:]
            if name == 'angs':
                name = 'm'
                prefix = 'angs'

            # process the exponent, if any
            exponent = float(exponent) if exponent != '' else 1.
            if exponent < 0:
                is_negative = not is_negative

            singlets[name] = {
                "prefix": prefix,
                "exponent": -abs(exponent) if is_negative else abs(exponent),
            }

    return singlets


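# Example (illustrative): units are split on '/' and '.', and each singlet
# is reduced to a base name, an SI prefix and a signed exponent.
#
#   _parse_unit('J/m^2')
#   # -> {'J': {'prefix': '', 'exponent': 1.0},
#   #     'm': {'prefix': '', 'exponent': -2.0}}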