from ewoksid02.id02_format import Path
from pyFAI.units import to_unit
from ewoksid02.tasks.id02processingtask import ID02ProcessingTask
from ewoksid02.utils.average import calculate_average, get_array_limit
[docs]
class AverageTask(
ID02ProcessingTask,
optional_input_names=[
"dataset_sum_signal",
"dataset_sum_normalization",
"dataset_sum_variance",
"radial_array",
"azimuth_array",
"Dummy",
"unit",
"azimuth_range",
"pca_parameters",
],
output_names=[
"dataset_average_signal_norm",
"radial_array",
],
):
"""The `AverageTask` class is responsible for calculating the average of datasets in the ID02 SAXS pipeline.
It extends the `ID02ProcessingTask` class and provides additional functionality for handling averaging-specific
inputs and processing logic. If azimuth_range is not provided, a full average will be performed.
Optional Inputs:
- dataset_sum_signal (numpy.ndarray): Sum of signal, non-normalized from an ai.integrate2d result.
- dataset_sum_normalization (numpy.ndarray): Sum of normalized pixels, from an ai.integrate2d result.
- dataset_sum_variance (numpy.ndarray): Sum of variance, from an ai.integrate2d result.
- radial_array (numpy.ndarray): Radial axis array for the dataset.
- azimuth_array (numpy.ndarray): Azimuthal axis array for the dataset.
- Dummy (float): Value to replace invalid pixels in the dataset.
- unit (str): Unit for the radial axis (e.g., "q_nm^-1").
- azimuth_range (list of tuples): Azimuthal ranges for averaging (e.g., `[(0, 90), (270, 360)]`).
- pca_parameters (dict): Parameters for Principal Component Analysis (PCA) if applicable.
Outputs Parameters:
- dataset_average_signal_norm (numpy.ndarray): Normalized average signal dataset.
- radial_array (numpy.ndarray): Radial axis array for the averaged data.
"""
[docs]
def get_processing_parameters(self) -> dict:
if not self.processing_params:
azimuth_range = self.get_input_value("azimuth_range", None)
array_ranges = None
if azimuth_range is None:
self.log_info("There is no azimuth_range. Full average will be done.")
array_ranges = [(0, -1)]
else:
azimuth_array = self.get_input_value("azimuth_array", None)
if azimuth_array is None:
self.log_warning(
"There is no azimuth_array , azimuth ranges cannot be transformed into array limits. Full average will be done."
)
array_ranges = [(0, -1)]
else:
array_ranges = [
get_array_limit(
azimuth_array=azimuth_array, azimuth_range=az_range
)
for az_range in azimuth_range
]
self.processing_params = {
"array_ranges": array_ranges,
"Dummy": self.get_parameter("Dummy"),
"azimuth_range": azimuth_range,
}
return self.processing_params
[docs]
def process(self):
(
dataset_average_intensity,
dataset_average_signal_norm,
dataset_average_variance,
dataset_average_sigma,
) = calculate_average(
dataset_intensity=self.dataset_signal,
dataset_sum_signal=self.get_input_value("dataset_sum_signal", None),
dataset_sum_norm=self.get_input_value("dataset_sum_normalization", None),
dataset_sum_variance=self.get_input_value("dataset_sum_variance", None),
calculate_variance=self.get_input_value("save_variance", False),
**self.get_processing_parameters(),
)
self.outputs.dataset_signal = dataset_average_intensity
self.outputs.dataset_average_signal_norm = dataset_average_signal_norm
self.outputs.dataset_variance = dataset_average_variance
self.outputs.dataset_sigma = dataset_average_sigma
self.outputs.radial_array = self.get_input_value("radial_array", None)
[docs]
def save(self) -> None:
super().save()
with self.open_id02_file(
filename=self.processing_filename,
mode="a",
) as processed_file:
processed_file.update_dataset(
added_dataset=self.outputs.dataset_average_signal_norm,
h5_dataset_path=str(
Path(processed_file.nxdata_path) / "data_signal_norm"
),
index_read=self.index_range_last,
datatype=self.get_input_value("datatype", "float32"),
)
# Add nexus data information (only once)
unit = self.get_parameter("unit")
radial_unit = to_unit(unit)
nexus_data_grp = processed_file[processed_file.nxdata_path]
if radial_unit is not None and radial_unit.short_name not in nexus_data_grp:
if self.get_input_value("radial_array", None) is not None:
radial_dset = nexus_data_grp.create_dataset(
name=radial_unit.short_name,
data=self.get_input_value("radial_array", None),
)
radial_dset.attrs.update(
{
"unit": str(radial_unit),
"interpretation": "scalar",
"axis": "2",
}
)
nexus_data_grp.attrs.update(
{
"axes": [".", radial_unit.short_name],
"signal": "data",
}
)
[docs]
def processing_info(self) -> list:
azimuth_range = self.get_input_value("azimuth_range", [0, 360])
return [{"h5path": "entry_0000", "name": "ave_limits", "value": azimuth_range}]
def _save_in_gallery(self) -> str:
if self.loop_nb != 1:
return
dataset_signal = self.outputs.dataset_signal
if dataset_signal is None or dataset_signal.size == 0:
return
import matplotlib.pyplot as plt
unit = self.get_parameter("unit")
radial_unit = to_unit(unit)
signal = dataset_signal.mean(axis=0)
q = self.outputs.radial_array
fig, ax = plt.subplots(figsize=(15, 10))
ax.plot(q, signal)
ax.set_xlabel(f"{radial_unit.short_name} ({radial_unit.unit_symbol})")
ax.set_ylabel("Intensity (arb. units)")
ax.set_title(f"scan {self.scan_nb} - average")
filename_png = self._get_filename_gallery()
fig.savefig(filename_png)