# -*- coding: utf-8 -*-
"""Reader class for HDF5 files from BLISS."""
__author__ = "Kevin Pounot, Matteo Levantino"
__contact__ = "matteo.levantino@esrf.fr"
__licence__ = "MIT"
__copyright__ = "ESRF - The European Synchrotron, Grenoble, France"
__date__ = "24/10/2022"
import os
from glob import glob
import re
import numpy as np
import time
from tqdm import tqdm
from dateutil.parser import parse
from functools import lru_cache
import json
from silx.io.h5py_utils import File
import fabio
from txs.utils import get_fnames, get_ai, convert_units
from txs.detectors.jungfrau import JungfrauData
_DETECTORS = "rayonix|jungfrau1m|mpx_5x1|mpx_2x1"
def load_images(folder, extension="h5", exclude=None, return_fnames=False,
verbose=True):
"""Load images in folder.
Parameters
----------
folder : str
Folder where images are located.
extension : str, optional
File extension of images. Default is 'edf'.
exclude : list or None, optional
List of filenames (basenames) that will not be loaded.
Default is None.
return_fnames : bool, optional
If True, a list of images filenames (basenames) is also returned.
Default is False.
Returns
-------
imgs : list
Images as returned as a list of numpy arrays.
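
    Examples
    --------
    A minimal usage sketch (the folder path is hypothetical):

    >>> imgs, fnames = load_images('/data/scan0001', extension='h5',
    ...                            return_fnames=True)
    >>> first_img = imgs[0]  # images are accessed lazily via the iterator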
"""
if extension == "edf":
imgs, fnames = load_EDF_images(folder, exclude, verbose)
if verbose and len(imgs) == 0:
print("No EDF images in folder: %s" % folder)
elif extension == 'h5':
imgs = ImageIteratorHDF5(folder, exclude=exclude)
fnames = imgs.fnames
if verbose and len(imgs) == 0:
print("No HDF5 images in folder: %s" % folder)
else:
raise NotImplementedError("NOT IMPLEMENTED YET!")
if return_fnames:
return imgs, fnames
else:
return imgs
# --- SPEC part ---
def load_EDF_images(folder, exclude=None, verbose=True):
"""Load EDF images in folder.
Parameters
----------
folder : str
Folder where images are located.
exclude : list or None, optional
List of filenames (basenames) that will not be loaded.
Default is None.
Returns
-------
imgs : list
Images as returned as a list of numpy arrays.
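
    Examples
    --------
    A minimal usage sketch (the folder path is hypothetical):

    >>> imgs, fnames = load_EDF_images('/data/scan0001')
    >>> stack = imgs[:]  # load all images as a numpy array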
"""
pattern = "*.edf"
fnames = get_fnames(pattern=pattern, folder=folder)
if exclude is None:
exclude = []
if not isinstance(exclude, (list, np.ndarray)):
raise TypeError("'exclude' must be None or list.")
exclude = [os.path.join(folder, f) for f in exclude]
fnames = [f for f in fnames if f not in exclude]
imgs = ImageIteratorEDF(fnames, verbose)
fnames = [os.path.basename(f) for f in fnames]
return imgs, fnames
class ImageIteratorEDF:
"""
Class to handle the loading of EDF images and metadata from the SPEC macro
'waxscollect'.
The class provides iterator and slicing capabilities to access images.
Parameters
----------
fnames : list
List of EDF filenames.
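
    Examples
    --------
    A minimal usage sketch (filenames are hypothetical):

    >>> it = ImageIteratorEDF(['img_0000.edf', 'img_0001.edf'])
    >>> for img in it:
    ...     print(img.shape)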
"""
def __init__(self, fnames, verbose=True):
if isinstance(fnames, str):
fnames = [fnames]
self.fnames = fnames
self.nimgs = len(fnames)
self.verbose = verbose
def __iter__(self):
self.n = 0
return self
def __next__(self):
if self.n < self.nimgs:
img = fabio.open(self.fnames[self.n]).data
self.n += 1
return img
else:
raise StopIteration
    def __getitem__(self, key):
        # avoid shadowing the built-in 'slice' with the argument name
        sel_imgs = self.fnames[key]
        if isinstance(sel_imgs, str):
            sel_imgs = [sel_imgs]
if self.verbose:
# only verbose if multiple images are loaded at once
if len(sel_imgs) > 1:
print(
"\nLoading %d images from %s ..."
% (len(sel_imgs), os.path.abspath(sel_imgs[0]))
)
loader = tqdm(sel_imgs) if len(sel_imgs) > 1 else sel_imgs
imgs = [fabio.open(img).data for img in loader]
return np.array(imgs).squeeze()
def __len__(self):
return len(self.fnames)
def get_info_from_id09_log_file(lines):
"""Get parameters values from ID09 log file comment section.
Paramters
---------
lines : list
List of comment lines extracted with readlines.
Returns
-------
pars : dict
Retrieved parameters (info).
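
    Examples
    --------
    A minimal sketch with hypothetical comment lines:

    >>> lines = ["# detector distance : 0.35", "# pulses per image : 10"]
    >>> get_info_from_id09_log_file(lines)
    {'d': '0.35', 'npulses': '10'}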
"""
pars = {}
pars_lbls = {'mode': ['mode'],
'primary_slits': ['primary slits'],
'filters': ['filters'],
'sblen': ['pulse duration'],
'sample_slits': ['sample_slits'],
'd': ['detector distance'],
'npulses': ['pulses per image'],
'time_between_pulses': ['time between pulses'],
'wpo': ['waveplate'],
'pd1_range': ['pd1 range'],
'pd2_range': ['pd2 range'],
'pd3_range': ['pd3 range'],
'pd4_range': ['pd4 range'],
'pd1_dark': ['pd1 dark'],
'pd2_dark': ['pd2 dark'],
'pd3_dark': ['pd3 dark'],
'pd4_dark': ['pd4 dark'],
'spindle': ['spindle']}
for line in lines:
for k, v_list in pars_lbls.items():
if any(v in line for v in v_list):
pars[k] = line.split(":")[-1].strip()
return pars
def read_id09_log_file(fname, skip=None, format='auto',
out_format='datastorage', get_info=False):
"""Read ID09 log file associated to a tr-waxs dataset.
Parameters
----------
fname : str
Name of log file or folder. If 'fname' is a folder, the file name
will be guessed automatically.
skip : int or tuple or None, optional
First/last lines to be skipped. Default is None.
format : str {'waxscollect', 'bliss', 'auto'}, optional
Log file format. Default is 'waxscollect'.
out_format : str, optional
Can be 'ndarray', 'dict' or 'datastorage'. Default is 'datastorage'.
get_info : boot, optional
If True, extra info (parameters value) is also returned.
Default is False.
Returns
-------
data : ndarray or dict or DataStorage
Retrieved data from id09 log file.
Use 'out_format' to choose between np.genfromtxt() ndarray or dict.
info : dict
Retrieved info from id09 log file comment section. Returned only
if 'get_info' is True.
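
    Examples
    --------
    A minimal usage sketch (the path is hypothetical):

    >>> data, info = read_id09_log_file('/data/scan0001', get_info=True,
    ...                                 out_format='dict')
    >>> list(data.keys())  # column names plus 'log_fname'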
"""
if format == 'waxscollect':
if os.path.isdir(fname):
fnames = get_fnames("*.log", folder=fname)
if len(fnames) == 0:
full_path = os.path.join(os.getcwd(), fname)
raise Exception("No *.log file found in '%s'." % full_path)
diagnostics = os.path.join(fname, 'diagnostics.log')
if diagnostics in fnames:
fnames.remove(diagnostics)
fname = fnames[0]
if len(fnames) > 1:
print("WARNING: found more that one *.log that is " +
"not diagnostics.log\nUsing: %s" % fname)
if not os.path.exists(fname):
raise Exception("'%s' does not exist." % fname)
# read whole file content
with open(fname, 'r') as f:
lines = f.readlines()
lines = [line.strip() for line in lines]
        # find the first line not starting with '#'
        # (lines are already stripped, so startswith avoids an IndexError
        # on empty lines)
        for iline, line in enumerate(lines):
            if not line.startswith('#'):
                break
        if iline == 0:
            raise Exception("No comment lines. This does not seem to be "
                            "a waxscollect log file.")
# extract info from comments
if get_info:
info = get_info_from_id09_log_file(lines[:iline-1])
# extract names from last commented line
names = lines[iline-1][1:].split()
# read data only (ndarray)
        # NOTE: if the file is read while the last row is being written,
        # an error will be raised because the last row has fewer columns
        # than the previous ones
try:
data = np.genfromtxt(
fname, skip_header=iline, names=names, dtype=None,
encoding=None, converters=None, excludelist=[]
)
except ValueError:
time.sleep(0.03)
data = np.genfromtxt(
fname, skip_header=iline, names=names, dtype=None,
encoding=None, converters=None, excludelist=[]
)
# skip first/last lines
if skip is not None:
if isinstance(skip, int):
data = data[skip:]
elif isinstance(skip, tuple):
data = data[skip[0]:skip[1]]
else:
raise TypeError("'skip' must be int, tuple or None.")
# strip '_' from each 'data.dtype.names'
data.dtype.names = [name.strip('_') for name in data.dtype.names]
if not isinstance(out_format, str):
raise TypeError("'out_format' must be str.")
if out_format.lower() not in ['ndarray', 'dict', 'datastorage']:
raise ValueError("'out_format' must be 'ndarray', 'dict' or " +
"'datastorage'.")
if out_format.lower() == 'ndarray':
if get_info:
return data, info
else:
return data
else:
# convert data to dictionary
data = dict((name, data[name]) for name in data.dtype.names)
data["log_fname"] = fname
if get_info:
return data, info
else:
return data
elif format == 'bliss':
scan = ImageIteratorHDF5(fname)
return scan.metadata
elif format == 'auto':
path = os.path.abspath(fname)
if os.path.isdir(path):
edf_files = glob(path + "/*.edf")
if len(edf_files) == 0:
return read_id09_log_file(
fname, skip, 'bliss', out_format, get_info
)
else:
return read_id09_log_file(
fname, skip, 'waxscollect', out_format, get_info
)
else:
            raise Exception(
                "'fname' must be a directory when format='auto'."
            )
    else:
        raise ValueError(
            "The `format` argument for the log file is not valid.\n"
            "It should be 'waxscollect', 'bliss' or 'auto'."
        )
# --- BLISS part ---
class BlissDataset:
"""Class to handle 'datasets' created by BLISS.
This is intended to handle simultaneously the HDF5 file in a 'Dataset'
folder and the associated HDF5 files of the corresponding scans.
Metadata are gathered mainly from the dataset HDF5 file, while images
are loaded directly from the scans HDF5 files.
Parameters
----------
fname : str
Name of the HDF5 file corresponding to the dataset.
detector : str
The name of the detector in the HDF5 file.
counters : 'all', list of str or None, optional
Additional counters to be read along with the images.
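
    Examples
    --------
    A minimal usage sketch (the file path is hypothetical):

    >>> dset = BlissDataset('/data/sample/dataset/dataset.h5')
    >>> scan = dset[1]  # ImageIteratorHDF5 for the first scan
    >>> print(dset.nscans)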
"""
def __init__(self, fname, detector='auto', counters=None):
self.fname = os.path.abspath(fname)
self.detector = detector
self.counters = counters
with File(fname, 'r') as data:
if detector == 'auto':
self.detector = _find_detector(data)
@property
    def scans(self):
        with File(self.fname, 'r') as data:
            # scan keys (e.g. '1.1', '2.1') are sorted numerically
            scans = sorted(data.keys(), key=float)
        return scans
@property
def nscans(self):
return len(self.scans)
def __iter__(self):
self.n = 1
return self
def __next__(self):
if self.n <= self.nscans:
folder = self._get_scan_paths(self.n)
data = ImageIteratorHDF5(
folder,
detector=self.detector,
counters=self.counters,
dataset_file=self.fname,
)
self.n += 1
return data
else:
raise StopIteration
@lru_cache
def __getitem__(self, scan_number):
"""Return the images corresponding to the selected datasets."""
if isinstance(scan_number, int):
if scan_number >= 0:
scan_number = int(max(1, scan_number) - 1)
folder = self._get_scan_paths(scan_number)
return ImageIteratorHDF5(
folder,
detector=self.detector,
counters=self.counters,
dataset_file=self.fname,
)
        if isinstance(scan_number, slice):
            scan_number = np.arange(
                scan_number.start if scan_number.start is not None else 1,
                # scans are numbered from 1, so 'self.nscans + 1' includes
                # the last scan when no stop is given
                scan_number.stop
                if scan_number.stop is not None else self.nscans + 1,
            )
if isinstance(scan_number, str):
folder = self._get_scan_paths(scan_number)
return ImageIteratorHDF5(
folder,
detector=self.detector,
counters=self.counters,
dataset_file=self.fname,
)
out = []
for idx in scan_number:
if idx >= 0:
idx = int(max(1, idx) - 1)
folder = self._get_scan_paths(idx)
ii = ImageIteratorHDF5(
folder,
detector=self.detector,
counters=self.counters,
dataset_file=self.fname,
)
out.append(ii)
return out
@classmethod
def from_scan_folder(cls, folder, detector='auto', counters=None):
"""Returns a `BlissDataset` instance and an iterator on the scan.
Parameters
----------
folder : str
The folder containing the data of the BLISS `scan`.
detector : str, optional
The detector to be used.
(default, 'auto')
counters : list of str
Additional counters to be read with the images.
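
        Examples
        --------
        A minimal usage sketch (the folder path is hypothetical):

        >>> dset, scan = BlissDataset.from_scan_folder(
        ...     '/data/sample/dataset/scan0001'
        ... )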
"""
        folder = os.path.abspath(folder)
        dset_folder = os.path.dirname(folder.rstrip('/'))
        dset_file = os.path.join(
            dset_folder, os.path.basename(dset_folder) + '.h5'
        )
dset = cls(dset_file, detector, counters)
scan_idx = dset.get_scan_from_folder(folder)
return dset, dset[scan_idx]
def get_scan_from_folder(self, folder):
"""Return the scan number corresponding to the folder."""
scan_folders = self._get_scan_paths()
folder = os.path.abspath(folder)
for key, val in scan_folders.items():
if folder in val:
return key
def get_counters(self, scan_number):
"""Returns the values of the counters corresponding to the scan."""
if isinstance(scan_number, int):
base = f"{self.scans[scan_number - 1]}/measurement"
elif isinstance(scan_number, str):
base = f"{scan_number}/measurement"
else:
raise ValueError(
"The argument `scan_number` could not be understand.\n"
"It should be either of type int or str."
)
counters = self.counters
if counters is None:
counters = []
with File(self.fname, 'r') as data:
if self.counters == 'all':
counters = [
val for val in data[f"{base}"] if val != self.detector
]
out = {
counter: data[f"{base}/{counter}"][()] for counter in counters
if counter in data[f"{base}"].keys()
}
return out
    def _get_arborescence(self):
        """Split the dataset file path into experiment root, raw-data
        folder, sample ID and dataset ID."""
dirname = os.path.dirname(self.fname)
red_path, dset_id = os.path.split(dirname)
red_path, sample_id = os.path.split(red_path)
exp_root, _ = os.path.split(red_path)
raw = "RAW_DATA"
if not os.path.exists(
f"{exp_root}/{raw}/{sample_id}/{dset_id}"
):
exp_root = red_path
raw = ""
return exp_root, raw, sample_id, dset_id
def _get_scan_paths(self, idx=None):
"""Return the list of the absolute path of each scan."""
exp_root, raw, sample_id, dset_id = self._get_arborescence()
path = {}
for scan in self.scans:
path[scan] = ''
with File(self.fname, 'r') as data:
if "measurement" in data[f"{scan}"]:
if self.detector in data[f"{scan}/measurement"].keys():
scan_number = int(scan.split('.')[0])
scan_path = (f"{exp_root}/{raw}/{sample_id}/{dset_id}/"
f"scan{scan_number:04}")
scan_folder = glob(scan_path)
if len(scan_folder) == 1:
path[scan] = os.path.abspath(scan_folder[0])
if idx is not None:
if isinstance(idx, int):
if self.scans[idx] not in path.keys():
raise KeyError(
f"No images or folder can be found for scan "
f"{self.scans[idx]}.\n"
f"Verify that the folder "
f"'scan{int(self.scans[idx].split('.')[0]):04}' exists "
f"in the dataset folder."
)
path = path[self.scans[idx]]
elif isinstance(idx, str):
path = path[idx]
else:
raise TypeError(
"`idx` must be int, str or None."
)
return path
def __len__(self):
return len(self.scans)
def filter(self, **conditions):
"""Filter the scans based on the given conditions
Parameters
----------
conditions : keyword arguments
Conditions based on the metadata or the images.
Should in the form of a callable that takes
a single argument, the value of the metadata or the
images.
Returns
-------
sample : :py:class:`Sample`
An instance of the :py:class:`Sample` class which contains
the scans that passed the condition(s).
Examples
--------
>>> sample = Sample('my_file.h5')
>>> filt_sample = sample.filter(
... positions=lambda val: val['gony'] > 0.5,
... positions=lambda val: val['gonz'] < 1,
... images=lambda val: np.mean(val) > 1000
... )
"""
raise NotImplementedError
class ImageIteratorHDF5:
"""A class to handle HDF5 files corresponding to a single BLISS scan.
This does not provide live monitoring yet (retry from silx for instance).
That is, if a file is still being written by BLISS, the images and
c unters will not be available and the iterator will stop at the
frame of the last closed file. #20221120
However, the list of available images is updated each time the iterator is
called or each time a slice is requested such that the class does not
need to be re-instanciated.
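
    Examples
    --------
    A minimal usage sketch (the folder path is hypothetical):

    >>> it = ImageIteratorHDF5('/data/sample/dataset/scan0001')
    >>> stack = it[:10]  # first 10 images as a numpy array
    >>> for img in it:
    ...     pass  # process each image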
"""
def __init__(
self,
folder,
scan='',
detector='rayonix',
counters=None,
exclude=None,
dataset_file=None,
verbose=False,
):
"""Instantiate the class.
Parameters
----------
folder : str
The folder where file(s) are to be opened and read.
scan : str, optional
The scan and subscan numbers as it appears in the dataset HDF5 file.
detector : str, optional
The name of the detector in the HDF5 file.
counters : list of str, optional
Additional counters to be read along with the images.
exclude : list of str, optional
List of files to be excluded from the iterator.
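
        Examples
        --------
        A minimal sketch (paths and counter names are hypothetical):

        >>> it = ImageIteratorHDF5(
        ...     '/data/sample/dataset/scan0001', counters=['pd1', 'pd2']
        ... )
        >>> item = it[0]  # dict: {detector: images, 'pd1': ..., 'pd2': ...}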
"""
self.detector = detector
self.folder = os.path.abspath(folder)
self.scan = scan
self.counters = counters
self.exclude = list(exclude) if exclude is not None else []
self.verbose = verbose
self.dataset_file = dataset_file
        if dataset_file is None:
            # assume the standard BLISS directory tree and guess the
            # dataset file from the parent folder name
            dset_folder = os.path.dirname(self.folder.rstrip('/'))
            dset_file = dset_folder + f'/{os.path.split(dset_folder)[-1]}.h5'
            self.dataset_file = dset_file
self._chunk_size = None
self._load_frames()
        if len(self.fnames) == 0 and self.verbose:
            print(
                f"WARNING: no images found yet in folder: {folder}.\n"
                f"Either no image file has finished being written yet "
                f"or the folder path is not correct.\n"
            )
self._shape = (0, 0, 0)
def _load_frames(self):
"""Update the list of available frames"""
folder_files = sorted(
glob(self.folder + f"/*{self.detector}*.h5")
)
# Cut the list of files at the first missing file in the serie
available_files = []
for index, fname in enumerate(folder_files):
match = re.match(r".*_(?P<index>\d+).h5$", fname)
if match is None or int(match.group("index")) != index:
break
available_files.append(fname)
fnames = [os.path.abspath(fname) for fname in available_files]
# first call, check the image files to find the chunk size and
# possibly the detector
if self._chunk_size is None:
for fname in fnames:
try:
with File(fname, 'r') as data:
if 'creator' in data.attrs.keys():
if 'LIMA' in data.attrs['creator']:
self._chunk_size = data[
'entry_0000/measurement/data'
].shape[0]
if self.detector == 'auto':
self.detector = _find_detector(data)
break # break at first image file found
self.exclude.append(fname)
                except OSError:
                    # assume the file has not finished being written
                    # (still locked)
                    pass
        # check whether the last file has finished being recorded
        if fnames:
            try:
                with File(fnames[-1], 'r'):
                    pass
            except Exception:
                # assume the file has not finished being written
                # (still locked)
                fnames = fnames[:-1]
        self._available_fnames = [
            fname for fname in fnames if fname not in self.exclude
        ]
@property
def fnames(self):
return self._available_fnames
@property
def shapes(self):
fnames = self.fnames
if len(fnames) > 0:
with File(fnames[0], 'r') as data:
shape = data['entry_0000/measurement/data'].shape
with File(fnames[-1], 'r') as data:
last_shape = data['entry_0000/measurement/data'].shape
else:
shape = self._shape
last_shape = (0, 0, 0)
self._shape = shape
shapes = (len(fnames) - 1) * [shape] + [last_shape]
return np.array(shapes)
@property
def nfiles(self):
return len(self.fnames)
@property
def nimgs(self):
return int(np.sum(self.shapes[:, 0]))
@property
def _last_first_index(self):
last = np.cumsum(self.shapes[:, 0])
first = np.insert(last, 0, 0)[:-1]
return (last, first)
@property
@lru_cache
    def metadata(self):
        """Metadata of the scan, read from the dataset HDF5 file."""
        dset = BlissDataset(self.dataset_file, self.detector, self.counters)
        scan_idx = dset.get_scan_from_folder(self.folder)
        return dset.metadata(scan_idx)
def __iter__(self):
self.n = 0
# snapshot of the images and fnames at the beginning of the iteration.
self._fnames = self.fnames
self._nimgs = self.nimgs
self._stream = None
return self
    def __next__(self):
        if self.n < self._nimgs:
            fidx = self.n // self._chunk_size
            img_idx = self.n % self._chunk_size
# open a stream for the next chunk of images.
if img_idx == 0:
if self._stream is not None:
self._stream.close()
self._stream = File(self._fnames[fidx], 'r')
img = self._stream["entry_0000/measurement/data"][img_idx]
if self.counters is None:
out = np.atleast_2d(img.squeeze())
else:
out = {self.detector: img, **self._get_counters(self.n)}
self.n += 1
return out
else:
self._stream.close()
raise StopIteration
    def __getitem__(self, key):
        """Return the images corresponding to the selected frames."""
        # avoid shadowing the built-in 'slice' with the argument name
        fnames, _, indices = self._get_imgs_fnames(key, True)
        imgs = []
        for idx, fname in enumerate(fnames):
            with File(fname, 'r') as data:
                imgs.append(data["entry_0000/measurement/data"][indices[idx]])
        imgs = np.concatenate(imgs).squeeze()
        if self.counters is None:
            return imgs
        else:
            counters = self._get_counters(key)
            return {self.detector: imgs, **counters}
def __len__(self):
return self.nimgs
    def _get_counters(self, frames):
        """Read the selected counters for the given frames."""
        dset = BlissDataset(self.dataset_file, self.detector, self.counters)
        scan_idx = dset.get_scan_from_folder(self.folder)
        counters = dset.get_counters(scan_idx)
counters = {
key: val[frames]
for key, val in counters.items()
}
return counters
def _get_imgs_fnames(self, imgs, return_indices=True):
"""Returns the file names corresponding to the images.
Parameters
----------
imgs : int, list of int or slice
The imgs to be selected.
return_indices : bool, optional
If True, returns the indices for each file in `self.fnames`
that corresponds to the selected imgs.
(default, True)
Returns
-------
fnames : list of str
The names of the files corresponding to the selected imgs.
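
        Examples
        --------
        The mapping relies on simple chunk arithmetic: with a chunk size
        of 10 frames per file, image 23 lives in file index 2 at frame 3:

        >>> 23 // 10, 23 % 10
        (2, 3)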
"""
if len(self.fnames) == 0:
return ([], [], []) if return_indices else []
if isinstance(imgs, slice):
imgs = np.arange(
imgs.start if imgs.start is not None else 0,
imgs.stop if imgs.stop is not None else self.nimgs,
)
fnames = []
indices1 = []
indices2 = []
last_idx, first_idx = self._last_first_index
# determine where to start and where to stop for faster iteration
# (concerns iteration on last_idx and first_idx)
        if isinstance(imgs, int):
            start_idx = imgs // self._chunk_size
            stop_idx = start_idx + 1
        else:
            first_img = min(imgs)
            last_img = max(imgs)
            start_idx = first_img // self._chunk_size
            stop_idx = last_img // self._chunk_size + 1
        for idx, val in enumerate(last_idx[start_idx:stop_idx],
                                  start=start_idx):
imgs_range = np.arange(first_idx[idx], val)
ids, ids1, ids2 = np.intersect1d(
imgs, imgs_range, return_indices=True
)
if len(ids) > 0:
indices1.append(ids1)
indices2.append(ids2)
fnames.append(self.fnames[idx])
if return_indices:
return fnames, indices1, indices2
else:
return fnames
def get_ai(self):
"""Generate an `AzimuthalIntegrator` object from the metadata."""
meta = self.metadata
try:
energy = meta["energy"]
energy_units = 'keV'
center_x, center_y = meta["center"]
pixel_x = meta["x_pixel_size"]
pixel_x_units = meta["x_pixel_size_units"]
pixel_y = meta["y_pixel_size"]
pixel_y_units = meta["y_pixel_size_units"]
distance = meta["dist"]
distance_units = "m"
binning = meta["binning"]
        except KeyError as err:
            print(
                "The metadata do not contain the necessary information "
                "to build the `AzimuthalIntegrator` automatically "
                "(missing key: %s)." % err
            )
            return
ai = get_ai(
convert_units(energy, energy_units, 'eV'),
convert_units(distance, distance_units, 'm'),
(center_x, center_y),
(
convert_units(pixel_x, pixel_x_units, 'm'),
convert_units(pixel_y, pixel_y_units, 'm'),
),
self.detector,
binning,
)
return ai
# helper function for the different classes in this file
def _find_detector(stream):
"""Try to identify the detector used.
Parameters
----------
stream : :py:class:`silx.io.h5py_utils.File`
An opened HDF5 file stream.
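
    Examples
    --------
    A minimal sketch of the underlying regex match (the HDF5 path is
    hypothetical):

    >>> import re
    >>> re.search(_DETECTORS, "1.1/measurement/rayonix").group()
    'rayonix'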
"""
def det_finder(key):
return re.search(_DETECTORS, key)
detector = stream.visit(det_finder)
if detector is not None:
detector = detector.group()
if detector is None:
raise ValueError(
"The detector could not be detected automatically.\n"
"Either there are no images in the data set or "
"a non-registered detector was used.\n"
"Please provide a detector name as it appears in the "
"BLISS HDF5 file.\n"
)
return detector