Skip to content

analysis

analysis

Analysis

Source code in src/easydiffraction/analysis/analysis.py
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
class Analysis:
    """Coordinates the analysis stage of a project.

    Bundles parameter inspection, calculator and minimizer selection,
    user-defined constraints, and the fitting of sample models against
    experiments attached to the owning project.
    """

    # Default calculation engine, created once and shared by all instances.
    _calculator = CalculatorFactory.create_calculator('cryspy')

    def __init__(self, project) -> None:
        self.project = project
        self.aliases = Aliases()
        self.constraints = Constraints()
        self.constraints_handler = ConstraintsHandler.get()
        self.calculator = Analysis._calculator  # Default calculator shared by project
        self._calculator_key: str = 'cryspy'  # Tracks the name of the current calculator
        self._fit_mode: str = 'single'
        self.fitter = DiffractionMinimizer('lmfit (leastsq)')

    def _get_params_as_dataframe(
        self,
        params: List[Union[Descriptor, Parameter]],
    ) -> pd.DataFrame:
        """
        Convert a list of parameters to a DataFrame.

        Args:
            params: List of Descriptor or Parameter objects.

        Returns:
            A pandas DataFrame containing parameter information.
        """
        rows = []
        for param in params:
            common_attrs = {}
            if isinstance(param, (Descriptor, Parameter)):
                common_attrs = {
                    'datablock': param.datablock_id,
                    'category': param.category_key,
                    'entry': param.collection_entry_id,
                    'parameter': param.name,
                    'value': param.value,
                    'units': param.units,
                    'fittable': False,
                }
            param_attrs = {}
            if isinstance(param, Parameter):
                # Parameter-specific columns override the generic ones above.
                # NOTE(review): an uncertainty of exactly 0.0 is rendered as an
                # empty string here — confirm this is the intended display.
                param_attrs = {
                    'fittable': True,
                    'free': param.free,
                    'min': param.min,
                    'max': param.max,
                    'uncertainty': f'{param.uncertainty:.4f}' if param.uncertainty else '',
                    'value': f'{param.value:.4f}',
                    'units': param.units,
                }
            row = common_attrs | param_attrs
            rows.append(row)

        dataframe = pd.DataFrame(rows)
        return dataframe

    def show_all_params(self) -> None:
        """Print tables of all parameters for sample models and experiments."""
        sample_models_params = self.project.sample_models.get_all_params()
        experiments_params = self.project.experiments.get_all_params()

        if not sample_models_params and not experiments_params:
            print(warning('No parameters found.'))
            return

        columns_headers = [
            'datablock',
            'category',
            'entry',
            'parameter',
            'value',
            'fittable',
        ]
        columns_alignment = [
            'left',
            'left',
            'left',
            'left',
            'right',
            'left',
        ]

        sample_models_dataframe = self._get_params_as_dataframe(sample_models_params)
        sample_models_dataframe = sample_models_dataframe[columns_headers]

        print(paragraph('All parameters for all sample models (🧩 data blocks)'))
        render_table(
            columns_headers=columns_headers,
            columns_alignment=columns_alignment,
            columns_data=sample_models_dataframe,
            show_index=True,
        )

        experiments_dataframe = self._get_params_as_dataframe(experiments_params)
        experiments_dataframe = experiments_dataframe[columns_headers]

        print(paragraph('All parameters for all experiments (🔬 data blocks)'))
        render_table(
            columns_headers=columns_headers,
            columns_alignment=columns_alignment,
            columns_data=experiments_dataframe,
            show_index=True,
        )

    def show_fittable_params(self) -> None:
        """Print tables of the fittable parameters for sample models and experiments."""
        sample_models_params = self.project.sample_models.get_fittable_params()
        experiments_params = self.project.experiments.get_fittable_params()

        if not sample_models_params and not experiments_params:
            print(warning('No fittable parameters found.'))
            return

        columns_headers = [
            'datablock',
            'category',
            'entry',
            'parameter',
            'value',
            'uncertainty',
            'units',
            'free',
        ]
        columns_alignment = [
            'left',
            'left',
            'left',
            'left',
            'right',
            'right',
            'left',
            'left',
        ]

        sample_models_dataframe = self._get_params_as_dataframe(sample_models_params)
        sample_models_dataframe = sample_models_dataframe[columns_headers]

        print(paragraph('Fittable parameters for all sample models (🧩 data blocks)'))
        render_table(
            columns_headers=columns_headers,
            columns_alignment=columns_alignment,
            columns_data=sample_models_dataframe,
            show_index=True,
        )

        experiments_dataframe = self._get_params_as_dataframe(experiments_params)
        experiments_dataframe = experiments_dataframe[columns_headers]

        print(paragraph('Fittable parameters for all experiments (🔬 data blocks)'))
        render_table(
            columns_headers=columns_headers,
            columns_alignment=columns_alignment,
            columns_data=experiments_dataframe,
            show_index=True,
        )

    def show_free_params(self) -> None:
        """Print one combined table of the currently free (refined) parameters."""
        sample_models_params = self.project.sample_models.get_free_params()
        experiments_params = self.project.experiments.get_free_params()
        free_params = sample_models_params + experiments_params

        if not free_params:
            print(warning('No free parameters found.'))
            return

        columns_headers = [
            'datablock',
            'category',
            'entry',
            'parameter',
            'value',
            'uncertainty',
            'min',
            'max',
            'units',
        ]
        columns_alignment = [
            'left',
            'left',
            'left',
            'left',
            'right',
            'right',
            'right',
            'right',
            'left',
        ]

        dataframe = self._get_params_as_dataframe(free_params)
        dataframe = dataframe[columns_headers]

        print(paragraph('Free parameters for both sample models (🧩 data blocks) and experiments (🔬 data blocks)'))
        render_table(
            columns_headers=columns_headers, columns_alignment=columns_alignment, columns_data=dataframe, show_index=True
        )

    def how_to_access_parameters(self) -> None:
        """Print, for every parameter, the Python access path and the CIF unique id."""
        sample_models_params = self.project.sample_models.get_all_params()
        experiments_params = self.project.experiments.get_all_params()
        params = {'sample_models': sample_models_params, 'experiments': experiments_params}

        # Check the parameter lists themselves: the dict above always has two
        # keys, so a truthiness test on the dict could never detect emptiness.
        if not sample_models_params and not experiments_params:
            print(warning('No parameters found.'))
            return

        columns_headers = [
            'datablock',
            'category',
            'entry',
            'parameter',
            'How to Access in Python Code',
            'Unique Identifier for CIF Constraints',
        ]

        columns_alignment = [
            'left',
            'left',
            'left',
            'left',
            'left',
            'left',
        ]

        columns_data = []
        project_varname = self.project._varname
        # Use a distinct loop variable to avoid shadowing the 'params' dict.
        for datablock_type, datablock_params in params.items():
            for param in datablock_params:
                if isinstance(param, (Descriptor, Parameter)):
                    datablock_id = param.datablock_id
                    category_key = param.category_key
                    entry_id = param.collection_entry_id
                    param_key = param.name
                    code_variable = f"{project_varname}.{datablock_type}['{datablock_id}'].{category_key}"
                    if entry_id:
                        code_variable += f"['{entry_id}']"
                    code_variable += f'.{param_key}'
                    cif_uid = param._generate_human_readable_unique_id()
                    columns_data.append([datablock_id, category_key, entry_id, param_key, code_variable, cif_uid])

        print(paragraph('How to access parameters'))
        render_table(
            columns_headers=columns_headers, columns_alignment=columns_alignment, columns_data=columns_data, show_index=True
        )

    def show_current_calculator(self) -> None:
        """Print the name of the calculator currently in use."""
        print(paragraph('Current calculator'))
        print(self.current_calculator)

    @staticmethod
    def show_supported_calculators() -> None:
        """Print the list of calculation engines supported by the factory."""
        CalculatorFactory.show_supported_calculators()

    @property
    def current_calculator(self) -> str:
        """Name of the currently selected calculation engine."""
        return self._calculator_key

    @current_calculator.setter
    def current_calculator(self, calculator_name: str) -> None:
        # Silently keep the current calculator if the requested one
        # cannot be created (factory returns None).
        calculator = CalculatorFactory.create_calculator(calculator_name)
        if calculator is None:
            return
        self.calculator = calculator
        self._calculator_key = calculator_name
        print(paragraph('Current calculator changed to'))
        print(self.current_calculator)

    def show_current_minimizer(self) -> None:
        """Print the name of the minimizer currently in use."""
        print(paragraph('Current minimizer'))
        print(self.current_minimizer)

    @staticmethod
    def show_available_minimizers() -> None:
        """Print the list of minimizers supported by the factory."""
        MinimizerFactory.show_available_minimizers()

    @property
    def current_minimizer(self) -> Optional[str]:
        """Selection string of the current minimizer, or None if no fitter is set."""
        return self.fitter.selection if self.fitter else None

    @current_minimizer.setter
    def current_minimizer(self, selection: str) -> None:
        self.fitter = DiffractionMinimizer(selection)
        print(paragraph('Current minimizer changed to'))
        print(self.current_minimizer)

    @property
    def fit_mode(self) -> str:
        """Current fitting strategy: 'single' or 'joint'."""
        return self._fit_mode

    @fit_mode.setter
    def fit_mode(self, strategy: str) -> None:
        if strategy not in ['single', 'joint']:
            raise ValueError("Fit mode must be either 'single' or 'joint'")
        self._fit_mode = strategy
        if strategy == 'joint':
            if not hasattr(self, 'joint_fit_experiments'):
                # Pre-populate all experiments with weight 0.5
                self.joint_fit_experiments = JointFitExperiments()
                # 'expt_id' avoids shadowing the builtin 'id'.
                for expt_id in self.project.experiments.ids:
                    self.joint_fit_experiments.add(expt_id, weight=0.5)
        print(paragraph('Current fit mode changed to'))
        print(self._fit_mode)

    def show_available_fit_modes(self) -> None:
        """Print a table describing the supported fit modes."""
        strategies = [
            {
                'Strategy': 'single',
                'Description': 'Independent fitting of each experiment; no shared parameters',
            },
            {
                'Strategy': 'joint',
                'Description': 'Simultaneous fitting of all experiments; some parameters are shared',
            },
        ]

        columns_headers = ['Strategy', 'Description']
        columns_alignment = ['left', 'left']
        columns_data = []
        for item in strategies:
            strategy = item['Strategy']
            description = item['Description']
            columns_data.append([strategy, description])

        print(paragraph('Available fit modes'))
        render_table(columns_headers=columns_headers, columns_alignment=columns_alignment, columns_data=columns_data)

    def show_current_fit_mode(self) -> None:
        """Print the currently selected fit mode."""
        print(paragraph('Current fit mode'))
        print(self.fit_mode)

    def calculate_pattern(self, expt_name: str) -> Optional[np.ndarray]:
        """
        Calculate the diffraction pattern for a given experiment.

        Args:
            expt_name: The name of the experiment.

        Returns:
            The calculated pattern as a NumPy array.
        """
        experiment = self.project.experiments[expt_name]
        sample_models = self.project.sample_models
        calculated_pattern = self.calculator.calculate_pattern(sample_models, experiment)
        return calculated_pattern

    def show_constraints(self) -> None:
        """Print a table of the user-defined constraints."""
        constraints_dict = self.constraints._items

        if not constraints_dict:
            print(warning('No constraints defined.'))
            return

        rows = []
        for constraint in constraints_dict.values():
            row = {
                'lhs_alias': constraint.lhs_alias.value,
                'rhs_expr': constraint.rhs_expr.value,
                'full expression': f'{constraint.lhs_alias.value} = {constraint.rhs_expr.value}',
            }
            rows.append(row)

        headers = ['lhs_alias', 'rhs_expr', 'full expression']
        alignments = ['left', 'left', 'left']
        rows = [[row[header] for header in headers] for row in rows]

        print(paragraph('User defined constraints'))
        render_table(columns_headers=headers, columns_alignment=alignments, columns_data=rows)

    def apply_constraints(self):
        """Push the current aliases and constraints to the handler and apply them."""
        if not self.constraints._items:
            print(warning('No constraints defined.'))
            return

        self.constraints_handler.set_aliases(self.aliases)
        self.constraints_handler.set_constraints(self.constraints)
        self.constraints_handler.apply()

    def fit(self):
        """Run the fitting process according to the current fit mode.

        Stores the fitter's results in ``self.fit_results`` on success;
        prints a message and returns early when prerequisites are missing.
        """
        sample_models = self.project.sample_models
        if not sample_models:
            print('No sample models found in the project. Cannot run fit.')
            return

        experiments = self.project.experiments
        if not experiments:
            print('No experiments found in the project. Cannot run fit.')
            return

        calculator = self.calculator
        if not calculator:
            print('No calculator is set. Cannot run fit.')
            return

        # Run the fitting process
        experiment_ids = experiments.ids

        if self.fit_mode == 'joint':
            # 'joint_fit_experiments' is created by the fit_mode setter.
            print(paragraph(f"Using all experiments 🔬 {experiment_ids} for '{self.fit_mode}' fitting"))
            self.fitter.fit(sample_models, experiments, calculator, weights=self.joint_fit_experiments)
        elif self.fit_mode == 'single':
            for expt_name in experiments.ids:
                print(paragraph(f"Using experiment 🔬 '{expt_name}' for '{self.fit_mode}' fitting"))
                experiment = experiments[expt_name]
                # Wrap the single experiment in a one-element collection so
                # the fitter sees the same interface as in joint mode.
                single_expt_collection = Experiments()
                single_expt_collection.add(experiment)
                self.fitter.fit(sample_models, single_expt_collection, calculator)
        else:
            raise NotImplementedError(f'Fit mode {self.fit_mode} not implemented yet.')

        # After fitting, get the results
        self.fit_results = self.fitter.results

    def as_cif(self):
        """Return the analysis settings, aliases and constraints as CIF text."""
        current_minimizer = self.current_minimizer
        # Quote minimizer names containing spaces so they remain a single CIF
        # token; guard against None (no fitter selected).
        if current_minimizer and ' ' in current_minimizer:
            current_minimizer = f'"{current_minimizer}"'

        lines = []
        lines.append(f'_analysis.calculator_engine  {self.current_calculator}')
        lines.append(f'_analysis.fitting_engine  {current_minimizer}')
        lines.append(f'_analysis.fit_mode  {self.fit_mode}')

        lines.append('')
        lines.append(self.aliases.as_cif())

        lines.append('')
        lines.append(self.constraints.as_cif())

        return '\n'.join(lines)

    def show_as_cif(self) -> None:
        """Render the CIF representation of the analysis settings."""
        cif_text: str = self.as_cif()
        paragraph_title: str = paragraph('Analysis 🧮 info as cif')
        render_cif(cif_text, paragraph_title)

calculate_pattern(expt_name)

Calculate the diffraction pattern for a given experiment.

Parameters:

Name Type Description Default
expt_name str

The name of the experiment.

required

Returns:

Type Description
Optional[ndarray]

The calculated pattern as a NumPy array, or None.

Source code in src/easydiffraction/analysis/analysis.py
355
356
357
358
359
360
361
362
363
364
365
366
367
368
def calculate_pattern(self, expt_name: str) -> Optional[np.ndarray]:
    """
    Compute the diffraction pattern of a single experiment.

    Args:
        expt_name: Name of the experiment to calculate the pattern for.

    Returns:
        The pattern produced by the currently selected calculator.
    """
    models = self.project.sample_models
    selected_experiment = self.project.experiments[expt_name]
    return self.calculator.calculate_pattern(models, selected_experiment)

calculation

DiffractionCalculator

Invokes calculation engines for pattern generation.

Source code in src/easydiffraction/analysis/calculation.py
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
class DiffractionCalculator:
    """
    Facade that delegates pattern generation to a pluggable backend engine.
    """

    def __init__(self, engine: str = 'cryspy') -> None:
        """
        Set up the facade with the requested backend engine.

        Args:
            engine: Type of the calculation engine to use.
                    Supported types: 'crysfml', 'cryspy', 'pdffit'.
                    Default is 'cryspy'.
        """
        factory = CalculatorFactory()
        self.calculator_factory = factory
        self._calculator = factory.create_calculator(engine)

    def set_calculator(self, engine: str) -> None:
        """
        Swap in a different calculator engine at runtime.

        Args:
            engine: New calculation engine type to use.
        """
        self._calculator = self.calculator_factory.create_calculator(engine)

    def calculate_structure_factors(
        self,
        sample_models: SampleModels,
        experiments: Experiments,
    ) -> Optional[List[Any]]:
        """
        Compute HKL intensities (structure factors) for the given inputs.

        Args:
            sample_models: Collection of sample models.
            experiments: Collection of experiments.

        Returns:
            HKL intensities as produced by the active backend calculator.
        """
        backend = self._calculator
        return backend.calculate_structure_factors(sample_models, experiments)

    def calculate_pattern(
        self,
        sample_models: SampleModels,
        experiment: Experiment,
    ) -> np.ndarray:
        """
        Compute the diffraction pattern for the given models and experiment.

        Args:
            sample_models: Collection of sample models.
            experiment: A single experiment object.

        Returns:
            Diffraction pattern as produced by the active backend calculator.
        """
        backend = self._calculator
        return backend.calculate_pattern(sample_models, experiment)

__init__(engine='cryspy')

Initialize the DiffractionCalculator with a specified backend engine.

Parameters:

Name Type Description Default
engine str

Type of the calculation engine to use. Supported types: 'crysfml', 'cryspy', 'pdffit'. Default is 'cryspy'.

'cryspy'
Source code in src/easydiffraction/analysis/calculation.py
22
23
24
25
26
27
28
29
30
31
32
def __init__(self, engine: str = 'cryspy') -> None:
    """
    Construct the calculator facade with a chosen backend engine.

    Args:
        engine: Type of the calculation engine to use.
                Supported types: 'crysfml', 'cryspy', 'pdffit'.
                Default is 'cryspy'.
    """
    factory = CalculatorFactory()
    self.calculator_factory = factory
    self._calculator = factory.create_calculator(engine)

calculate_pattern(sample_models, experiment)

Calculate diffraction pattern based on sample models and experiment.

Parameters:

Name Type Description Default
sample_models SampleModels

Collection of sample models.

required
experiment Experiment

A single experiment object.

required

Returns:

Type Description
ndarray

Diffraction pattern calculated by the backend calculator.

Source code in src/easydiffraction/analysis/calculation.py
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
def calculate_pattern(
    self,
    sample_models: SampleModels,
    experiment: Experiment,
) -> np.ndarray:
    """
    Compute the diffraction pattern for the given models and experiment.

    Args:
        sample_models: Collection of sample models.
        experiment: A single experiment object.

    Returns:
        Diffraction pattern as produced by the active backend calculator.
    """
    backend = self._calculator
    return backend.calculate_pattern(sample_models, experiment)

calculate_structure_factors(sample_models, experiments)

Calculate HKL intensities (structure factors) for sample models and experiments.

Parameters:

Name Type Description Default
sample_models SampleModels

Collection of sample models.

required
experiments Experiments

Collection of experiments.

required

Returns:

Type Description
Optional[List[Any]]

HKL intensities calculated by the backend calculator.

Source code in src/easydiffraction/analysis/calculation.py
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
def calculate_structure_factors(
    self,
    sample_models: SampleModels,
    experiments: Experiments,
) -> Optional[List[Any]]:
    """
    Compute HKL intensities (structure factors) for the given inputs.

    Args:
        sample_models: Collection of sample models.
        experiments: Collection of experiments.

    Returns:
        HKL intensities as produced by the active backend calculator.
    """
    backend = self._calculator
    return backend.calculate_structure_factors(sample_models, experiments)

set_calculator(engine)

Switch to a different calculator engine at runtime.

Parameters:

Name Type Description Default
engine str

New calculation engine type to use.

required
Source code in src/easydiffraction/analysis/calculation.py
34
35
36
37
38
39
40
41
def set_calculator(self, engine: str) -> None:
    """
    Replace the active calculator engine at runtime.

    Args:
        engine: Name of the new calculation engine to use.
    """
    new_backend = self.calculator_factory.create_calculator(engine)
    self._calculator = new_backend

calculators

calculator_base

CalculatorBase

Bases: ABC

Base API for diffraction calculation engines.

Source code in src/easydiffraction/analysis/calculators/calculator_base.py
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
class CalculatorBase(ABC):
    """
    Base API for diffraction calculation engines.

    Concrete engines implement the abstract members below; the shared
    ``calculate_pattern`` drives the per-phase calculation and assembles
    the total pattern (scaled phase contributions plus background).
    """

    @property
    @abstractmethod
    def name(self) -> str:
        """Short identifier of the backend engine (e.g. 'crysfml')."""
        pass

    @property
    @abstractmethod
    def engine_imported(self) -> bool:
        """Whether the backend library was successfully imported."""
        pass

    @abstractmethod
    def calculate_structure_factors(
        self,
        sample_model: SampleModel,
        experiment: Experiment,
    ) -> None:
        """
        Calculate structure factors for a single sample model and experiment.

        Args:
            sample_model: The sample model object.
            experiment: The experiment object.
        """
        pass

    def calculate_pattern(
        self,
        sample_models: SampleModels,
        experiment: Experiment,
        called_by_minimizer: bool = False,
    ) -> np.ndarray:
        """
        Calculate the diffraction pattern for multiple sample models and a single experiment.

        Also stores the background and the total pattern on
        ``experiment.datastore.pattern`` as a side effect.

        Args:
            sample_models: Collection of sample models.
            experiment: The experiment object.
            called_by_minimizer: Whether the calculation is called by a minimizer.

        Returns:
            The calculated diffraction pattern as a NumPy array.
        """
        x_data = experiment.datastore.pattern.x
        y_calc_zeros = np.zeros_like(x_data)

        # Only phases that are both linked to the experiment and present in
        # the sample-model collection contribute to the pattern.
        valid_linked_phases = self._get_valid_linked_phases(sample_models, experiment)

        # Apply user constraints to all sample models
        constraints = ConstraintsHandler.get()
        constraints.apply()

        # Calculate contributions from valid linked sample models.
        # The in-place '+=' below accumulates directly into this array
        # (which aliases y_calc_zeros — unused afterwards).
        y_calc_scaled = y_calc_zeros
        for linked_phase in valid_linked_phases:
            sample_model_id = linked_phase._entry_id
            sample_model_scale = linked_phase.scale.value
            sample_model = sample_models[sample_model_id]

            # Apply symmetry constraints
            sample_model.apply_symmetry_constraints()

            sample_model_y_calc = self._calculate_single_model_pattern(
                sample_model,
                experiment,
                called_by_minimizer=called_by_minimizer,
            )

            sample_model_y_calc_scaled = sample_model_scale * sample_model_y_calc
            y_calc_scaled += sample_model_y_calc_scaled

        # Calculate background contribution; experiments without a
        # 'background' attribute get a zero background.
        y_bkg = np.zeros_like(x_data)
        if hasattr(experiment, 'background'):
            y_bkg = experiment.background.calculate(x_data)
        experiment.datastore.pattern.bkg = y_bkg

        # Calculate total pattern and store it on the experiment's datastore
        y_calc_total = y_calc_scaled + y_bkg
        experiment.datastore.pattern.calc = y_calc_total

        return y_calc_total

    @abstractmethod
    def _calculate_single_model_pattern(
        self,
        sample_model: SampleModel,
        experiment: Experiment,
        called_by_minimizer: bool,
    ) -> np.ndarray:
        """
        Calculate the diffraction pattern for a single sample model and experiment.

        Args:
            sample_model: The sample model object.
            experiment: The experiment object.
            called_by_minimizer: Whether the calculation is called by a minimizer.

        Returns:
            The calculated diffraction pattern as a NumPy array.
        """
        pass

    def _get_valid_linked_phases(
        self,
        sample_models: SampleModels,
        experiment: Experiment,
    ) -> List[Any]:
        """
        Get valid linked phases from the experiment.

        A linked phase is valid when its entry id is present in the
        sample-model collection; invalid ones are skipped with a warning.

        Args:
            sample_models: Collection of sample models.
            experiment: The experiment object.

        Returns:
            A list of valid linked phases.
        """
        if not experiment.linked_phases:
            print('Warning: No linked phases found. Returning empty pattern.')
            return []

        valid_linked_phases = []
        for linked_phase in experiment.linked_phases:
            if linked_phase._entry_id not in sample_models.get_ids():
                print(f"Warning: Linked phase '{linked_phase.id.value}' not found in Sample Models {sample_models.get_ids()}")
                continue
            valid_linked_phases.append(linked_phase)

        if not valid_linked_phases:
            print('Warning: None of the linked phases found in Sample Models. Returning empty pattern.')

        return valid_linked_phases
calculate_pattern(sample_models, experiment, called_by_minimizer=False)

Calculate the diffraction pattern for multiple sample models and a single experiment.

Parameters:

Name Type Description Default
sample_models SampleModels

Collection of sample models.

required
experiment Experiment

The experiment object.

required
called_by_minimizer bool

Whether the calculation is called by a minimizer.

False

Returns:

Type Description
ndarray

The calculated diffraction pattern as a NumPy array.

Source code in src/easydiffraction/analysis/calculators/calculator_base.py
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
def calculate_pattern(
    self,
    sample_models: SampleModels,
    experiment: Experiment,
    called_by_minimizer: bool = False,
) -> np.ndarray:
    """
    Calculate the diffraction pattern for multiple sample models and a single experiment.

    Also stores the background and the total pattern on
    ``experiment.datastore.pattern`` as a side effect.

    Args:
        sample_models: Collection of sample models.
        experiment: The experiment object.
        called_by_minimizer: Whether the calculation is called by a minimizer.

    Returns:
        The calculated diffraction pattern as a NumPy array.
    """
    x_data = experiment.datastore.pattern.x
    y_calc_zeros = np.zeros_like(x_data)

    # Only phases that are both linked to the experiment and present in
    # the sample-model collection contribute to the pattern.
    valid_linked_phases = self._get_valid_linked_phases(sample_models, experiment)

    # Apply user constraints to all sample models
    constraints = ConstraintsHandler.get()
    constraints.apply()

    # Calculate contributions from valid linked sample models.
    # The in-place '+=' below accumulates directly into this array
    # (which aliases y_calc_zeros — unused afterwards).
    y_calc_scaled = y_calc_zeros
    for linked_phase in valid_linked_phases:
        sample_model_id = linked_phase._entry_id
        sample_model_scale = linked_phase.scale.value
        sample_model = sample_models[sample_model_id]

        # Apply symmetry constraints
        sample_model.apply_symmetry_constraints()

        sample_model_y_calc = self._calculate_single_model_pattern(
            sample_model,
            experiment,
            called_by_minimizer=called_by_minimizer,
        )

        sample_model_y_calc_scaled = sample_model_scale * sample_model_y_calc
        y_calc_scaled += sample_model_y_calc_scaled

    # Calculate background contribution; experiments without a
    # 'background' attribute get a zero background.
    y_bkg = np.zeros_like(x_data)
    if hasattr(experiment, 'background'):
        y_bkg = experiment.background.calculate(x_data)
    experiment.datastore.pattern.bkg = y_bkg

    # Calculate total pattern and store it on the experiment's datastore
    y_calc_total = y_calc_scaled + y_bkg
    experiment.datastore.pattern.calc = y_calc_total

    return y_calc_total
calculate_structure_factors(sample_model, experiment) abstractmethod

Calculate structure factors for a single sample model and experiment.

Source code in src/easydiffraction/analysis/calculators/calculator_base.py
32
33
34
35
36
37
38
39
40
41
@abstractmethod
def calculate_structure_factors(
    self,
    sample_model: SampleModel,
    experiment: Experiment,
) -> None:
    """
    Calculate structure factors for a single sample model and experiment.

    Must be implemented by concrete calculator engines.

    Args:
        sample_model: The sample model object.
        experiment: The experiment object.
    """
    pass

calculator_crysfml

CrysfmlCalculator

Bases: CalculatorBase

Wrapper for Crysfml library.

Source code in src/easydiffraction/analysis/calculators/calculator_crysfml.py
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
class CrysfmlCalculator(CalculatorBase):
    """
    Wrapper for Crysfml library.
    """

    # True when the optional cfml_py_utilities backend could be imported.
    engine_imported: bool = cfml_py_utilities is not None

    @property
    def name(self) -> str:
        return 'crysfml'

    def calculate_structure_factors(
        self,
        sample_models: SampleModels,
        experiments: Experiments,
    ) -> None:
        """
        Call Crysfml to calculate structure factors.

        Args:
            sample_models: The sample models to calculate structure factors for.
            experiments: The experiments associated with the sample models.

        Raises:
            NotImplementedError: Always; HKL calculation is not supported yet.
        """
        raise NotImplementedError('HKL calculation is not implemented for CrysfmlCalculator.')

    def _calculate_single_model_pattern(
        self,
        sample_model: SampleModel,
        experiment: Experiment,
        called_by_minimizer: bool = False,
    ) -> Union[np.ndarray, List[float]]:
        """
        Calculates the diffraction pattern using Crysfml for the given sample model and experiment.

        Args:
            sample_model: The sample model to calculate the pattern for.
            experiment: The experiment associated with the sample model.
            called_by_minimizer: Whether the calculation is called by a minimizer.

        Returns:
            The calculated diffraction pattern as a NumPy array or a list of floats
            (empty list when the backend produced no data).
        """
        crysfml_dict = self._crysfml_dict(sample_model, experiment)
        try:
            _, y = cfml_py_utilities.cw_powder_pattern_from_dict(crysfml_dict)
            # PyCrysFML may return more points than requested; trim to match.
            y = self._adjust_pattern_length(y, len(experiment.datastore.pattern.x))
        except KeyError:
            print('[CrysfmlCalculator] Error: No calculated data')
            y = []
        return y

    def _adjust_pattern_length(
        self,
        pattern: List[float],
        target_length: int,
    ) -> List[float]:
        """
        Adjusts the length of the pattern to match the target length.

        Args:
            pattern: The pattern to adjust.
            target_length: The desired length of the pattern.

        Returns:
            The adjusted pattern.
        """
        # TODO: Check the origin of this discrepancy coming from PyCrysFML
        if len(pattern) > target_length:
            return pattern[:target_length]
        return pattern

    def _crysfml_dict(
        self,
        sample_model: SampleModel,
        experiment: Experiment,
    ) -> Dict[str, List[Dict[str, Any]]]:
        """
        Converts the sample model and experiment into a dictionary format for Crysfml.

        Args:
            sample_model: The sample model to convert.
            experiment: The experiment to convert.

        Returns:
            A dictionary with 'phases' and 'experiments' lists as expected by
            cfml_py_utilities.
        """
        sample_model_dict = self._convert_sample_model_to_dict(sample_model)
        experiment_dict = self._convert_experiment_to_dict(experiment)
        return {
            'phases': [sample_model_dict],
            'experiments': [experiment_dict],
        }

    def _convert_sample_model_to_dict(
        self,
        sample_model: SampleModel,
    ) -> Dict[str, Any]:
        """
        Converts a sample model into a dictionary format.

        Args:
            sample_model: The sample model to convert.

        Returns:
            A dictionary representation of the sample model.
        """
        sample_model_dict = {
            sample_model.name: {
                '_space_group_name_H-M_alt': sample_model.space_group.name_h_m.value,
                '_cell_length_a': sample_model.cell.length_a.value,
                '_cell_length_b': sample_model.cell.length_b.value,
                '_cell_length_c': sample_model.cell.length_c.value,
                '_cell_angle_alpha': sample_model.cell.angle_alpha.value,
                '_cell_angle_beta': sample_model.cell.angle_beta.value,
                '_cell_angle_gamma': sample_model.cell.angle_gamma.value,
                '_atom_site': [],
            }
        }

        for atom in sample_model.atom_sites:
            atom_site = {
                '_label': atom.label.value,
                '_type_symbol': atom.type_symbol.value,
                '_fract_x': atom.fract_x.value,
                '_fract_y': atom.fract_y.value,
                '_fract_z': atom.fract_z.value,
                '_occupancy': atom.occupancy.value,
                '_adp_type': 'Biso',  # Assuming Biso for simplicity
                '_B_iso_or_equiv': atom.b_iso.value,
            }
            sample_model_dict[sample_model.name]['_atom_site'].append(atom_site)

        return sample_model_dict

    def _convert_experiment_to_dict(
        self,
        experiment: Experiment,
    ) -> Dict[str, Any]:
        """
        Converts an experiment into a dictionary format.

        Args:
            experiment: The experiment to convert.

        Returns:
            A dictionary representation of the experiment.
        """
        expt_type = getattr(experiment, 'type', None)
        instrument = getattr(experiment, 'instrument', None)
        peak = getattr(experiment, 'peak', None)

        x_data = experiment.datastore.pattern.x
        twotheta_min = float(x_data.min())
        twotheta_max = float(x_data.max())
        # N points spanning [min, max] define N-1 intervals; guard against
        # a single-point pattern to avoid division by zero.
        twotheta_inc = (twotheta_max - twotheta_min) / max(len(x_data) - 1, 1)

        exp_dict = {
            'NPD': {
                '_diffrn_radiation_probe': expt_type.radiation_probe.value if expt_type else 'neutron',
                '_diffrn_radiation_wavelength': instrument.setup_wavelength.value if instrument else 1.0,
                '_pd_instr_resolution_u': peak.broad_gauss_u.value if peak else 0.0,
                '_pd_instr_resolution_v': peak.broad_gauss_v.value if peak else 0.0,
                '_pd_instr_resolution_w': peak.broad_gauss_w.value if peak else 0.0,
                '_pd_instr_resolution_x': peak.broad_lorentz_x.value if peak else 0.0,
                '_pd_instr_resolution_y': peak.broad_lorentz_y.value if peak else 0.0,
                # "_pd_instr_reflex_s_l": peak_asymm.s_l.value if peak_asymm else 0.0,
                # "_pd_instr_reflex_d_l": peak_asymm.d_l.value if peak_asymm else 0.0,
                '_pd_meas_2theta_offset': instrument.calib_twotheta_offset.value if instrument else 0.0,
                '_pd_meas_2theta_range_min': twotheta_min,
                '_pd_meas_2theta_range_max': twotheta_max,
                '_pd_meas_2theta_range_inc': twotheta_inc,
            }
        }

        return exp_dict
calculate_structure_factors(sample_models, experiments)

Call Crysfml to calculate structure factors.

Parameters:

Name Type Description Default
sample_models SampleModels

The sample models to calculate structure factors for.

required
experiments Experiments

The experiments associated with the sample models.

required
Source code in src/easydiffraction/analysis/calculators/calculator_crysfml.py
38
39
40
41
42
43
44
45
46
47
48
49
50
def calculate_structure_factors(
    self,
    sample_models: SampleModels,
    experiments: Experiments,
) -> None:
    """
    Structure-factor (HKL) calculation entry point for the Crysfml backend.

    Args:
        sample_models: The sample models for which HKLs would be computed.
        experiments: The experiments associated with the sample models.

    Raises:
        NotImplementedError: Always; this backend does not support HKLs yet.
    """
    message = 'HKL calculation is not implemented for CrysfmlCalculator.'
    raise NotImplementedError(message)

calculator_cryspy

CryspyCalculator

Bases: CalculatorBase

Cryspy-based diffraction calculator. Converts EasyDiffraction models into Cryspy objects and computes patterns.

Source code in src/easydiffraction/analysis/calculators/calculator_cryspy.py
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
class CryspyCalculator(CalculatorBase):
    """
    Cryspy-based diffraction calculator.
    Converts EasyDiffraction models into Cryspy objects and computes patterns.
    """

    # True when the optional cryspy backend could be imported.
    engine_imported: bool = cryspy is not None

    @property
    def name(self) -> str:
        return 'cryspy'

    def __init__(self) -> None:
        super().__init__()
        # Cache of cryspy dictionaries keyed by '<model-name>_<experiment-name>'.
        self._cryspy_dicts: Dict[str, Dict[str, Any]] = {}

    def calculate_structure_factors(
        self,
        sample_model: SampleModel,
        experiment: Experiment,
    ) -> None:
        """
        Raises a NotImplementedError as HKL calculation is not implemented.

        Args:
            sample_model: The sample model to calculate structure factors for.
            experiment: The experiment associated with the sample model.
        """
        raise NotImplementedError('HKL calculation is not implemented for CryspyCalculator.')

    def _calculate_single_model_pattern(
        self,
        sample_model: SampleModel,
        experiment: Experiment,
        called_by_minimizer: bool = False,
    ) -> Union[np.ndarray, List[float]]:
        """
        Calculates the diffraction pattern using Cryspy for the given sample model and experiment.

        We only recreate the cryspy_obj if this method is
         - NOT called by the minimizer, or
         - the cryspy_dict is NOT yet created.
        In other cases, we are modifying the existing cryspy_dict.
        This allows significantly speeding up the calculation.

        Args:
            sample_model: The sample model to calculate the pattern for.
            experiment: The experiment associated with the sample model.
            called_by_minimizer: Whether the calculation is called by a minimizer.

        Returns:
            The calculated diffraction pattern as a NumPy array or a list of floats
            (empty list on failure).
        """
        combined_name = f'{sample_model.name}_{experiment.name}'

        # Fast path during minimization: mutate the cached dict in place
        # instead of rebuilding the whole cryspy object from CIF.
        if called_by_minimizer and combined_name in self._cryspy_dicts:
            cryspy_dict = self._recreate_cryspy_dict(sample_model, experiment)
        else:
            cryspy_obj = self._recreate_cryspy_obj(sample_model, experiment)
            cryspy_dict = cryspy_obj.get_dictionary()

        self._cryspy_dicts[combined_name] = copy.deepcopy(cryspy_dict)

        cryspy_in_out_dict: Dict[str, Any] = {}

        # Calculate the pattern using Cryspy
        # TODO: Redirect stderr to suppress Cryspy warnings.
        #  This is a temporary solution to avoid cluttering the output.
        #  E.g. cryspy/A_functions_base/powder_diffraction_tof.py:106:
        #  RuntimeWarning: overflow encountered in exp
        #  Remove this when Cryspy is updated to handle warnings better.
        with contextlib.redirect_stderr(io.StringIO()):
            rhochi_calc_chi_sq_by_dictionary(
                cryspy_dict,
                dict_in_out=cryspy_in_out_dict,
                flag_use_precalculated_data=False,
                flag_calc_analytical_derivatives=False,
            )

        # Cryspy names its output blocks by beam mode ('pd' / 'tof' prefix).
        prefixes = {'constant wavelength': 'pd', 'time-of-flight': 'tof'}
        beam_mode = experiment.type.beam_mode.value
        if beam_mode in prefixes:
            cryspy_block_name = f'{prefixes[beam_mode]}_{experiment.name}'
        else:
            print(f'[CryspyCalculator] Error: Unknown beam mode {beam_mode}')
            return []

        try:
            signal_plus = cryspy_in_out_dict[cryspy_block_name]['signal_plus']
            signal_minus = cryspy_in_out_dict[cryspy_block_name]['signal_minus']
            y_calc = signal_plus + signal_minus
        except KeyError:
            print(f'[CryspyCalculator] Error: No calculated data for {cryspy_block_name}')
            return []

        return y_calc

    def _recreate_cryspy_dict(
        self,
        sample_model: SampleModel,
        experiment: Experiment,
    ) -> Dict[str, Any]:
        """
        Recreates the Cryspy dictionary for the given sample model and experiment.

        Args:
            sample_model: The sample model to update.
            experiment: The experiment to update.

        Returns:
            The updated Cryspy dictionary.
        """
        combined_name = f'{sample_model.name}_{experiment.name}'
        # Work on a copy so the cached baseline dict stays untouched.
        cryspy_dict = copy.deepcopy(self._cryspy_dicts[combined_name])

        cryspy_model_id = f'crystal_{sample_model.name}'
        cryspy_model_dict = cryspy_dict[cryspy_model_id]

        # Cell (cryspy stores angles in radians)
        cryspy_cell = cryspy_model_dict['unit_cell_parameters']
        cryspy_cell[0] = sample_model.cell.length_a.value
        cryspy_cell[1] = sample_model.cell.length_b.value
        cryspy_cell[2] = sample_model.cell.length_c.value
        cryspy_cell[3] = np.deg2rad(sample_model.cell.angle_alpha.value)
        cryspy_cell[4] = np.deg2rad(sample_model.cell.angle_beta.value)
        cryspy_cell[5] = np.deg2rad(sample_model.cell.angle_gamma.value)

        # Atomic coordinates
        cryspy_xyz = cryspy_model_dict['atom_fract_xyz']
        for idx, atom_site in enumerate(sample_model.atom_sites):
            cryspy_xyz[0][idx] = atom_site.fract_x.value
            cryspy_xyz[1][idx] = atom_site.fract_y.value
            cryspy_xyz[2][idx] = atom_site.fract_z.value

        # Atomic occupancies
        cryspy_occ = cryspy_model_dict['atom_occupancy']
        for idx, atom_site in enumerate(sample_model.atom_sites):
            cryspy_occ[idx] = atom_site.occupancy.value

        # Atomic ADPs - Biso only for now
        cryspy_biso = cryspy_model_dict['atom_b_iso']
        for idx, atom_site in enumerate(sample_model.atom_sites):
            cryspy_biso[idx] = atom_site.b_iso.value

        # ---------- Update experiment parameters ----------
        if experiment.type.beam_mode.value == 'constant wavelength':
            cryspy_expt_name = f'pd_{experiment.name}'
            cryspy_expt_dict = cryspy_dict[cryspy_expt_name]
            # Instrument
            cryspy_expt_dict['offset_ttheta'][0] = np.deg2rad(experiment.instrument.calib_twotheta_offset.value)
            cryspy_expt_dict['wavelength'][0] = experiment.instrument.setup_wavelength.value
            # Peak
            cryspy_resolution = cryspy_expt_dict['resolution_parameters']
            cryspy_resolution[0] = experiment.peak.broad_gauss_u.value
            cryspy_resolution[1] = experiment.peak.broad_gauss_v.value
            cryspy_resolution[2] = experiment.peak.broad_gauss_w.value
            cryspy_resolution[3] = experiment.peak.broad_lorentz_x.value
            cryspy_resolution[4] = experiment.peak.broad_lorentz_y.value

        elif experiment.type.beam_mode.value == 'time-of-flight':
            cryspy_expt_name = f'tof_{experiment.name}'
            cryspy_expt_dict = cryspy_dict[cryspy_expt_name]
            # Instrument
            cryspy_expt_dict['zero'][0] = experiment.instrument.calib_d_to_tof_offset.value
            cryspy_expt_dict['dtt1'][0] = experiment.instrument.calib_d_to_tof_linear.value
            cryspy_expt_dict['dtt2'][0] = experiment.instrument.calib_d_to_tof_quad.value
            cryspy_expt_dict['ttheta_bank'] = np.deg2rad(experiment.instrument.setup_twotheta_bank.value)
            # Peak
            cryspy_sigma = cryspy_expt_dict['profile_sigmas']
            cryspy_sigma[0] = experiment.peak.broad_gauss_sigma_0.value
            cryspy_sigma[1] = experiment.peak.broad_gauss_sigma_1.value
            cryspy_sigma[2] = experiment.peak.broad_gauss_sigma_2.value

            cryspy_beta = cryspy_expt_dict['profile_betas']
            cryspy_beta[0] = experiment.peak.broad_mix_beta_0.value
            cryspy_beta[1] = experiment.peak.broad_mix_beta_1.value

            cryspy_alpha = cryspy_expt_dict['profile_alphas']
            cryspy_alpha[0] = experiment.peak.asym_alpha_0.value
            cryspy_alpha[1] = experiment.peak.asym_alpha_1.value

        return cryspy_dict

    def _recreate_cryspy_obj(
        self,
        sample_model: SampleModel,
        experiment: Experiment,
    ) -> Any:
        """
        Recreates the Cryspy object for the given sample model and experiment.

        Args:
            sample_model: The sample model to recreate.
            experiment: The experiment to recreate.

        Returns:
            The recreated Cryspy object.
        """
        cryspy_obj = str_to_globaln('')

        cryspy_sample_model_cif = self._convert_sample_model_to_cryspy_cif(sample_model)
        cryspy_sample_model_obj = str_to_globaln(cryspy_sample_model_cif)
        cryspy_obj.add_items(cryspy_sample_model_obj.items)

        # Add single experiment to cryspy_obj
        cryspy_experiment_cif = self._convert_experiment_to_cryspy_cif(
            experiment,
            linked_phase=sample_model,
        )

        cryspy_experiment_obj = str_to_globaln(cryspy_experiment_cif)
        cryspy_obj.add_items(cryspy_experiment_obj.items)

        return cryspy_obj

    def _convert_sample_model_to_cryspy_cif(
        self,
        sample_model: SampleModel,
    ) -> str:
        """
        Converts a sample model to a Cryspy CIF string.

        Args:
            sample_model: The sample model to convert.

        Returns:
            The Cryspy CIF string representation of the sample model.
        """
        return sample_model.as_cif()

    def _convert_experiment_to_cryspy_cif(
        self,
        experiment: Experiment,
        linked_phase: Any,
    ) -> str:
        """
        Converts an experiment to a Cryspy CIF string.

        Args:
            experiment: The experiment to convert.
            linked_phase: The linked phase associated with the experiment.

        Returns:
            The Cryspy CIF string representation of the experiment.
        """
        expt_type = getattr(experiment, 'type', None)
        instrument = getattr(experiment, 'instrument', None)
        peak = getattr(experiment, 'peak', None)

        # Resolve the beam mode once; None when the experiment has no type,
        # so the mode-specific sections below are skipped instead of raising
        # AttributeError on a missing 'type' attribute.
        beam_mode = expt_type.beam_mode.value if expt_type is not None else None

        cif_lines = [f'data_{experiment.name}']

        if expt_type is not None:
            cif_lines.append('')
            radiation_probe = expt_type.radiation_probe.value
            radiation_probe = radiation_probe.replace('neutron', 'neutrons')
            radiation_probe = radiation_probe.replace('xray', 'X-rays')
            cif_lines.append(f'_setup_radiation {radiation_probe}')

        if instrument:
            # Map EasyDiffraction attribute names to cryspy CIF keys.
            instrument_mapping = {
                'setup_wavelength': '_setup_wavelength',
                'calib_twotheta_offset': '_setup_offset_2theta',
                'setup_twotheta_bank': '_tof_parameters_2theta_bank',
                'calib_d_to_tof_offset': '_tof_parameters_Zero',
                'calib_d_to_tof_linear': '_tof_parameters_Dtt1',
                'calib_d_to_tof_quad': '_tof_parameters_dtt2',
            }
            cif_lines.append('')
            for local_attr_name, engine_key_name in instrument_mapping.items():
                if hasattr(instrument, local_attr_name):
                    attr_value = getattr(instrument, local_attr_name).value
                    cif_lines.append(f'{engine_key_name} {attr_value}')

        if peak:
            # Map EasyDiffraction peak attribute names to cryspy CIF keys.
            peak_mapping = {
                'broad_gauss_u': '_pd_instr_resolution_U',
                'broad_gauss_v': '_pd_instr_resolution_V',
                'broad_gauss_w': '_pd_instr_resolution_W',
                'broad_lorentz_x': '_pd_instr_resolution_X',
                'broad_lorentz_y': '_pd_instr_resolution_Y',
                'broad_gauss_sigma_0': '_tof_profile_sigma0',
                'broad_gauss_sigma_1': '_tof_profile_sigma1',
                'broad_gauss_sigma_2': '_tof_profile_sigma2',
                'broad_mix_beta_0': '_tof_profile_beta0',
                'broad_mix_beta_1': '_tof_profile_beta1',
                'asym_alpha_0': '_tof_profile_alpha0',
                'asym_alpha_1': '_tof_profile_alpha1',
            }
            cif_lines.append('')
            if beam_mode == 'time-of-flight':
                cif_lines.append('_tof_profile_peak_shape Gauss')
            for local_attr_name, engine_key_name in peak_mapping.items():
                if hasattr(peak, local_attr_name):
                    attr_value = getattr(peak, local_attr_name).value
                    cif_lines.append(f'{engine_key_name} {attr_value}')

        # NOTE(review): for time-of-flight data x is TOF, not 2theta; the
        # variable names below are reused for both axes.
        x_data = experiment.datastore.pattern.x
        twotheta_min = float(x_data.min())
        twotheta_max = float(x_data.max())
        cif_lines.append('')
        if beam_mode == 'constant wavelength':
            cif_lines.append(f'_range_2theta_min {twotheta_min}')
            cif_lines.append(f'_range_2theta_max {twotheta_max}')
        elif beam_mode == 'time-of-flight':
            cif_lines.append(f'_range_time_min {twotheta_min}')
            cif_lines.append(f'_range_time_max {twotheta_max}')

        cif_lines.append('')
        cif_lines.append('loop_')
        cif_lines.append('_phase_label')
        cif_lines.append('_phase_scale')
        cif_lines.append(f'{linked_phase.name} 1.0')

        # Flat zero background; the real background is handled outside cryspy.
        if beam_mode == 'constant wavelength':
            cif_lines.append('')
            cif_lines.append('loop_')
            cif_lines.append('_pd_background_2theta')
            cif_lines.append('_pd_background_intensity')
            cif_lines.append(f'{twotheta_min} 0.0')
            cif_lines.append(f'{twotheta_max} 0.0')
        elif beam_mode == 'time-of-flight':
            cif_lines.append('')
            cif_lines.append('loop_')
            cif_lines.append('_tof_backgroundpoint_time')
            cif_lines.append('_tof_backgroundpoint_intensity')
            cif_lines.append(f'{twotheta_min} 0.0')
            cif_lines.append(f'{twotheta_max} 0.0')

        if beam_mode == 'constant wavelength':
            cif_lines.append('')
            cif_lines.append('loop_')
            cif_lines.append('_pd_meas_2theta')
            cif_lines.append('_pd_meas_intensity')
            cif_lines.append('_pd_meas_intensity_sigma')
        elif beam_mode == 'time-of-flight':
            cif_lines.append('')
            cif_lines.append('loop_')
            cif_lines.append('_tof_meas_time')
            cif_lines.append('_tof_meas_intensity')
            cif_lines.append('_tof_meas_intensity_sigma')

        y_data = experiment.datastore.pattern.meas
        sy_data = experiment.datastore.pattern.meas_su
        for x_val, y_val, sy_val in zip(x_data, y_data, sy_data):
            cif_lines.append(f'  {x_val:.5f}   {y_val:.5f}   {sy_val:.5f}')

        cryspy_experiment_cif = '\n'.join(cif_lines)

        return cryspy_experiment_cif
calculate_structure_factors(sample_model, experiment)

Raises a NotImplementedError as HKL calculation is not implemented.

Parameters:

Name Type Description Default
sample_model SampleModel

The sample model to calculate structure factors for.

required
experiment Experiment

The experiment associated with the sample model.

required
Source code in src/easydiffraction/analysis/calculators/calculator_cryspy.py
46
47
48
49
50
51
52
53
54
55
56
57
58
def calculate_structure_factors(
    self,
    sample_model: SampleModel,
    experiment: Experiment,
) -> None:
    """
    Structure-factor (HKL) calculation entry point for the Cryspy backend.

    Args:
        sample_model: The sample model for which HKLs would be computed.
        experiment: The experiment associated with the sample model.

    Raises:
        NotImplementedError: Always; this backend does not support HKLs yet.
    """
    message = 'HKL calculation is not implemented for CryspyCalculator.'
    raise NotImplementedError(message)

calculator_pdffit

PdffitCalculator

Bases: CalculatorBase

Wrapper for Pdffit library.

Source code in src/easydiffraction/analysis/calculators/calculator_pdffit.py
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
class PdffitCalculator(CalculatorBase):
    """
    Wrapper for Pdffit library.
    """

    # True when the optional pdffit backend could be imported.
    engine_imported: bool = pdffit is not None

    @property
    def name(self) -> str:
        return 'pdffit'

    def calculate_structure_factors(self, sample_models, experiments):
        """
        No-op stub: PDF analysis has no notion of HKL structure factors.

        Args:
            sample_models: The sample models (unused).
            experiments: The experiments (unused).

        Returns:
            An empty list, as HKLs are not applicable to PDF.
        """
        # PDF doesn't compute HKL but we keep interface consistent
        print('[pdffit] Calculating HKLs (not applicable)...')
        return []

    def _calculate_single_model_pattern(
        self,
        sample_model: SampleModel,
        experiment: Experiment,
        called_by_minimizer: bool = False,
    ) -> np.ndarray:
        """
        Calculates the PDF pattern using PDFfit for the given sample model and experiment.

        Args:
            sample_model: The sample model to calculate the pattern for.
            experiment: The experiment associated with the sample model.
            called_by_minimizer: Whether the calculation is called by a minimizer
                (unused here; PDFfit has no cached fast path).

        Returns:
            The calculated PDF pattern as a NumPy array.
        """
        # Create PDF calculator object
        calculator = pdffit()

        # ---------------------------
        # Set sample model parameters
        # ---------------------------

        # TODO: move CIF v2 -> CIF v1 conversion to a separate module
        # Convert the sample model to CIF supported by PDFfit
        cif_string_v2 = sample_model.as_cif()
        # Convert to version 1 of CIF format. This means: replace all dots
        # with underscores for cases where the dot is surrounded by letters
        # on both sides.
        dot_between_letters = r'(?<=[a-zA-Z])\.(?=[a-zA-Z])'
        cif_string_v1 = re.sub(dot_between_letters, '_', cif_string_v2)

        # Create the PDFit structure
        structure = pdffit_cif_parser().parse(cif_string_v1)

        # Set all model parameters:
        # space group, cell parameters, and atom sites (including ADPs)
        calculator.add_structure(structure)

        # -------------------------
        # Set experiment parameters
        # -------------------------

        # Set some peak-related parameters
        calculator.setvar('pscale', experiment.linked_phases[sample_model.name].scale.value)
        calculator.setvar('delta1', experiment.peak.sharp_delta_1.value)
        calculator.setvar('delta2', experiment.peak.sharp_delta_2.value)
        calculator.setvar('spdiameter', experiment.peak.damp_particle_diameter.value)

        # Data: only the r-grid matters; intensities are supplied as zeros
        # since we are calculating, not fitting, here.
        measured_pattern = experiment.datastore.pattern
        r_grid = list(measured_pattern.x)
        g_zeros = list(np.zeros_like(measured_pattern.x))

        # Assign the data to the PDFfit calculator
        calculator.read_data_lists(
            stype=experiment.type.radiation_probe.value[0].upper(),
            qmax=experiment.peak.cutoff_q.value,
            qdamp=experiment.peak.damp_q.value,
            r_data=r_grid,
            Gr_data=g_zeros,
        )

        # qbroad must be set after read_data_lists
        calculator.setvar('qbroad', experiment.peak.broad_q.value)

        # -----------------
        # Calculate pattern
        # -----------------

        # Calculate the PDF pattern
        calculator.calc()

        # Get the calculated PDF pattern
        y_calc = np.array(calculator.getpdf_fit())

        return y_calc

collections

joint_fit_experiments

JointFitExperiments

Bases: Collection

Collection manager for experiments that are fitted together in a joint fit.

Source code in src/easydiffraction/analysis/collections/joint_fit_experiments.py
43
44
45
46
47
48
49
50
51
52
53
54
55
class JointFitExperiments(Collection):
    """
    Collection manager for experiments that are fitted together
    in a `joint` fit.
    """

    @property
    def _type(self) -> str:
        # Marker consumed by the base Collection machinery.
        return 'category'  # datablock or category

    @property
    def _child_class(self) -> Type[JointFitExperiment]:
        # Items held by this collection are JointFitExperiment entries.
        return JointFitExperiment

minimization

DiffractionMinimizer

Handles the fitting workflow using a pluggable minimizer.

Source code in src/easydiffraction/analysis/minimization.py
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
class DiffractionMinimizer:
    """
    Handles the fitting workflow using a pluggable minimizer.

    The engine name (e.g. 'lmfit' or 'dfols') is derived from the selection
    string, and the concrete minimizer is created via `MinimizerFactory`.
    """

    def __init__(self, selection: str = 'lmfit (leastsq)') -> None:
        self.selection: str = selection
        self.engine: str = selection.split(' ')[0]  # Extracts 'lmfit' or 'dfols'
        self.minimizer = MinimizerFactory.create_minimizer(selection)
        self.results: Optional[FitResults] = None

    def fit(
        self,
        sample_models: SampleModels,
        experiments: Experiments,
        calculator: Any,
        weights: Optional[np.ndarray] = None,
    ) -> None:
        """
        Run the fitting process.

        Args:
            sample_models: Collection of sample models.
            experiments: Collection of experiments.
            calculator: The calculator to use for pattern generation.
            weights: Optional weights for joint fitting.
        """
        # Reuse the shared helper instead of duplicating the collection logic inline.
        params = self._collect_free_parameters(sample_models, experiments)

        if not params:
            print('⚠️ No parameters selected for fitting.')
            return None

        # Remember the starting value of every refined parameter so it can be
        # reported alongside the fitted value.
        for param in params:
            param.start_value = param.value

        def objective_function(engine_params: Dict[str, Any]) -> np.ndarray:
            return self._residual_function(
                engine_params=engine_params,
                parameters=params,
                sample_models=sample_models,
                experiments=experiments,
                calculator=calculator,
                weights=weights,
            )

        # Perform fitting
        self.results = self.minimizer.fit(params, objective_function)

        # Post-fit processing
        self._process_fit_results(sample_models, experiments, calculator)

    def _process_fit_results(
        self,
        sample_models: SampleModels,
        experiments: Experiments,
        calculator: CalculatorBase,
    ) -> None:
        """
        Collect reliability inputs and display results after fitting.

        Args:
            sample_models: Collection of sample models.
            experiments: Collection of experiments.
            calculator: The calculator used for pattern generation.
        """
        y_obs, y_calc, y_err = get_reliability_inputs(
            sample_models,
            experiments,
            calculator,
        )

        # Placeholder for future f_obs / f_calc retrieval
        f_obs, f_calc = None, None

        if self.results:
            self.results.display_results(
                y_obs=y_obs,
                y_calc=y_calc,
                y_err=y_err,
                f_obs=f_obs,
                f_calc=f_calc,
            )

    def _collect_free_parameters(
        self,
        sample_models: SampleModels,
        experiments: Experiments,
    ) -> List[Parameter]:
        """
        Collect free parameters from sample models and experiments.

        Args:
            sample_models: Collection of sample models.
            experiments: Collection of experiments.

        Returns:
            List of free parameters.
        """
        free_params: List[Parameter] = sample_models.get_free_params() + experiments.get_free_params()
        return free_params

    def _residual_function(
        self,
        engine_params: Dict[str, Any],
        parameters: List[Parameter],
        sample_models: SampleModels,
        experiments: Experiments,
        calculator: CalculatorBase,
        weights: Optional[np.ndarray] = None,
    ) -> np.ndarray:
        """
        Residual function computes the difference between measured and calculated patterns.
        It updates the parameter values according to the optimizer-provided engine_params.

        Args:
            engine_params: Engine-specific parameter dict.
            parameters: List of parameters being optimized.
            sample_models: Collection of sample models.
            experiments: Collection of experiments.
            calculator: The calculator to use for pattern generation.
            weights: Optional weights for joint fitting.

        Returns:
            Array of weighted residuals.
        """
        # Sync parameters back to objects
        self.minimizer._sync_result_to_parameters(parameters, engine_params)

        # Prepare weights for joint fitting
        num_expts: int = len(experiments.ids)
        if weights is None:
            _weights = np.ones(num_expts)
        else:
            _weights_list: List[float] = []
            # 'expt_id' instead of 'id' to avoid shadowing the builtin.
            for expt_id in experiments.ids:
                _weight = weights._items[expt_id].weight.value
                _weights_list.append(_weight)
            _weights = np.array(_weights_list, dtype=np.float64)

        # Normalize weights so they sum to num_expts
        # We should obtain the same reduced chi_squared when a single dataset is split into
        # two parts and fit together. If weights sum to one, then reduced chi_squared
        # will be half as large as expected.
        _weights *= num_expts / np.sum(_weights)
        residuals: List[float] = []

        for (expt_id, experiment), weight in zip(experiments._items.items(), _weights):
            # Calculate the difference between measured and calculated patterns
            y_calc: np.ndarray = calculator.calculate_pattern(
                sample_models,
                experiment,
                called_by_minimizer=True,
            )
            y_meas: np.ndarray = experiment.datastore.pattern.meas
            y_meas_su: np.ndarray = experiment.datastore.pattern.meas_su
            diff = (y_meas - y_calc) / y_meas_su

            # Residuals are squared before going into reduced chi-squared
            diff *= np.sqrt(weight)

            # Append the residuals for this experiment
            residuals.extend(diff)

        return self.minimizer.tracker.track(np.array(residuals), parameters)

fit(sample_models, experiments, calculator, weights=None)

Run the fitting process.

Parameters:

Name Type Description Default
sample_models SampleModels

Collection of sample models.

required
experiments Experiments

Collection of experiments.

required
calculator Any

The calculator to use for pattern generation.

required
weights Optional[array]

Optional weights for joint fitting.

None
Source code in src/easydiffraction/analysis/minimization.py
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
def fit(
    self,
    sample_models: SampleModels,
    experiments: Experiments,
    calculator: Any,
    weights: Optional[np.array] = None,
) -> None:
    """
    Run the fitting process.

    Args:
        sample_models: Collection of sample models.
        experiments: Collection of experiments.
        calculator: The calculator to use for pattern generation.
        weights: Optional weights for joint fitting.

    """
    free_params = sample_models.get_free_params() + experiments.get_free_params()

    # Nothing to refine: report and bail out early.
    if not free_params:
        print('⚠️ No parameters selected for fitting.')
        return None

    # Remember the starting value of every refined parameter.
    for free_param in free_params:
        free_param.start_value = free_param.value

    def objective_function(engine_params: Dict[str, Any]) -> np.ndarray:
        # Delegate residual evaluation to the shared helper, binding the
        # current refinement context into the closure.
        return self._residual_function(
            engine_params=engine_params,
            parameters=free_params,
            sample_models=sample_models,
            experiments=experiments,
            calculator=calculator,
            weights=weights,
        )

    # Run the minimization, keep the results, then post-process them.
    self.results = self.minimizer.fit(free_params, objective_function)
    self._process_fit_results(sample_models, experiments, calculator)

minimizers

fitting_progress_tracker

FittingProgressTracker

Tracks and reports the reduced chi-square during the optimization process.

Source code in src/easydiffraction/analysis/minimizers/fitting_progress_tracker.py
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
class FittingProgressTracker:
    """
    Tracks and reports the reduced chi-square during the optimization process.

    Progress rows are rendered either as a live-updating table in a notebook
    (via a display handle) or as a box-drawn table on the terminal.
    """

    def __init__(self) -> None:
        # Number of residual evaluations performed so far.
        self._iteration: int = 0
        # Last chi-square that was displayed; used for the improvement check.
        self._previous_chi2: Optional[float] = None
        # Chi-square and iteration number of the most recent evaluation.
        self._last_chi2: Optional[float] = None
        self._last_iteration: Optional[int] = None
        # Smallest chi-square seen so far and the iteration it occurred at.
        self._best_chi2: Optional[float] = None
        self._best_iteration: Optional[int] = None
        # Wall-clock duration of the fit, set by stop_timer().
        self._fitting_time: Optional[float] = None

        # Rows rendered so far (notebook mode re-renders the whole table).
        self._df_rows: List[List[str]] = []
        self._display_handle: Optional[DisplayHandle] = None

    def reset(self) -> None:
        """Reset chi-square tracking state before a new fit.

        Note: display state (_df_rows, _display_handle) is re-initialized
        in start_tracking() instead.
        """
        self._iteration = 0
        self._previous_chi2 = None
        self._last_chi2 = None
        self._last_iteration = None
        self._best_chi2 = None
        self._best_iteration = None
        self._fitting_time = None

    def track(
        self,
        residuals: np.ndarray,
        parameters: List[float],
    ) -> np.ndarray:
        """
        Track chi-square progress during the optimization process.

        Parameters:
            residuals (np.ndarray): Array of residuals between measured and calculated data.
            parameters (list): List of free parameters being fitted.

        Returns:
            np.ndarray: Residuals unchanged, for optimizer consumption.
        """
        self._iteration += 1

        reduced_chi2 = calculate_reduced_chi_square(residuals, len(parameters))

        # A non-empty row means there is something new to display.
        row: List[str] = []

        # First iteration, initialize tracking
        if self._previous_chi2 is None:
            self._previous_chi2 = reduced_chi2
            self._best_chi2 = reduced_chi2
            self._best_iteration = self._iteration

            row = [
                str(self._iteration),
                f'{reduced_chi2:.2f}',
                '',
            ]

        # Improvement check
        # A row is only emitted when the relative improvement exceeds
        # SIGNIFICANT_CHANGE_THRESHOLD, keeping output short for long fits.
        elif (self._previous_chi2 - reduced_chi2) / self._previous_chi2 > SIGNIFICANT_CHANGE_THRESHOLD:
            change_percent = (self._previous_chi2 - reduced_chi2) / self._previous_chi2 * 100

            row = [
                str(self._iteration),
                f'{reduced_chi2:.2f}',
                f'{change_percent:.1f}% ↓',
            ]

            self._previous_chi2 = reduced_chi2

        # Output if there is something new to display
        if row:
            self.add_tracking_info(row)

        # Update best chi-square if better
        if reduced_chi2 < self._best_chi2:
            self._best_chi2 = reduced_chi2
            self._best_iteration = self._iteration

        # Store last chi-square and iteration
        self._last_chi2 = reduced_chi2
        self._last_iteration = self._iteration

        return residuals

    @property
    def best_chi2(self) -> Optional[float]:
        """Smallest reduced chi-square observed so far (None before tracking)."""
        return self._best_chi2

    @property
    def best_iteration(self) -> Optional[int]:
        """Iteration number at which the best chi-square was observed."""
        return self._best_iteration

    @property
    def iteration(self) -> int:
        """Total number of iterations tracked so far."""
        return self._iteration

    @property
    def fitting_time(self) -> Optional[float]:
        """Elapsed fitting time in seconds (None until stop_timer() is called)."""
        return self._fitting_time

    def start_timer(self) -> None:
        """Record the wall-clock start time of the fit."""
        self._start_time = time.perf_counter()

    def stop_timer(self) -> None:
        """Record the end time and compute the elapsed fitting time."""
        self._end_time = time.perf_counter()
        self._fitting_time = self._end_time - self._start_time

    def start_tracking(self, minimizer_name: str) -> None:
        """Print the fit header and prepare the progress table for output."""
        print(f"🚀 Starting fit process with '{minimizer_name}'...")
        print('📈 Goodness-of-fit (reduced χ²) change:')

        if is_notebook() and display is not None:
            # Reset the DataFrame rows
            self._df_rows = []

            # Recreate display handle for updating the table
            self._display_handle = DisplayHandle()

            # Create placeholder for display
            self._display_handle.display(HTML(''))

            # Show empty table with headers
            render_table(
                columns_data=self._df_rows,
                columns_alignment=DEFAULT_ALIGNMENTS,
                columns_headers=DEFAULT_HEADERS,
                display_handle=self._display_handle,
            )
        else:
            # Top border
            print('╒' + '╤'.join(['═' * FIXED_WIDTH for _ in DEFAULT_HEADERS]) + '╕')

            # Header row (all centered)
            header_row = '│' + '│'.join([format_cell(h, align='center') for h in DEFAULT_HEADERS]) + '│'
            print(header_row)

            # Separator
            print('╞' + '╪'.join(['═' * FIXED_WIDTH for _ in DEFAULT_HEADERS]) + '╡')

    def add_tracking_info(self, row: List[str]) -> None:
        """Append one progress row: re-render the notebook table or print a line."""
        if is_notebook() and display is not None:
            # Add row to DataFrame
            self._df_rows.append(row)

            # Show fully updated table
            render_table(
                columns_data=self._df_rows,
                columns_alignment=DEFAULT_ALIGNMENTS,
                columns_headers=DEFAULT_HEADERS,
                display_handle=self._display_handle,
            )
        else:
            # Alignments for each column
            formatted_row = (
                '│' + '│'.join([format_cell(cell, align=DEFAULT_ALIGNMENTS[i]) for i, cell in enumerate(row)]) + '│'
            )

            # Print the new row
            print(formatted_row)

    def finish_tracking(self) -> None:
        """Emit the final row, close the terminal table and print the best result."""
        # Add last iteration as last row
        row: List[str] = [
            str(self._last_iteration),
            f'{self._last_chi2:.2f}' if self._last_chi2 is not None else '',
            '',
        ]
        self.add_tracking_info(row)

        # Bottom border for terminal only
        if not is_notebook() or display is None:
            # Bottom border for terminal only
            print('╘' + '╧'.join(['═' * FIXED_WIDTH for _ in range(len(row))]) + '╛')

        # Print best result
        print(f'🏆 Best goodness-of-fit (reduced χ²) is {self._best_chi2:.2f} at iteration {self._best_iteration}')
        print('✅ Fitting complete.')

track(residuals, parameters)

Track chi-square progress during the optimization process.

Parameters:

Name Type Description Default
residuals ndarray

Array of residuals between measured and calculated data.

required
parameters list

List of free parameters being fitted.

required

Returns:

Type Description
ndarray

np.ndarray: Residuals unchanged, for optimizer consumption.

Source code in src/easydiffraction/analysis/minimizers/fitting_progress_tracker.py
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
def track(
    self,
    residuals: np.ndarray,
    parameters: List[float],
) -> np.ndarray:
    """
    Track chi-square progress during the optimization process.

    Parameters:
        residuals (np.ndarray): Array of residuals between measured and calculated data.
        parameters (list): List of free parameters being fitted.

    Returns:
        np.ndarray: Residuals unchanged, for optimizer consumption.
    """
    self._iteration += 1

    reduced_chi2 = calculate_reduced_chi_square(residuals, len(parameters))

    # A non-empty row means there is something new to display.
    row: List[str] = []

    # First iteration, initialize tracking
    if self._previous_chi2 is None:
        self._previous_chi2 = reduced_chi2
        self._best_chi2 = reduced_chi2
        self._best_iteration = self._iteration

        row = [
            str(self._iteration),
            f'{reduced_chi2:.2f}',
            '',
        ]

    # Improvement check
    # A row is only emitted when the relative improvement exceeds
    # SIGNIFICANT_CHANGE_THRESHOLD, keeping output short for long fits.
    elif (self._previous_chi2 - reduced_chi2) / self._previous_chi2 > SIGNIFICANT_CHANGE_THRESHOLD:
        change_percent = (self._previous_chi2 - reduced_chi2) / self._previous_chi2 * 100

        row = [
            str(self._iteration),
            f'{reduced_chi2:.2f}',
            f'{change_percent:.1f}% ↓',
        ]

        self._previous_chi2 = reduced_chi2

    # Output if there is something new to display
    if row:
        self.add_tracking_info(row)

    # Update best chi-square if better
    if reduced_chi2 < self._best_chi2:
        self._best_chi2 = reduced_chi2
        self._best_iteration = self._iteration

    # Store last chi-square and iteration
    self._last_chi2 = reduced_chi2
    self._last_iteration = self._iteration

    return residuals

minimizer_base

MinimizerBase

Bases: ABC

Abstract base class for minimizer implementations. Provides shared logic and structure for concrete minimizers.

Source code in src/easydiffraction/analysis/minimizers/minimizer_base.py
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
class MinimizerBase(ABC):
    """
    Abstract base class for minimizer implementations.
    Provides shared logic and structure for concrete minimizers.

    Subclasses must implement _prepare_solver_args(), _run_solver(),
    _sync_result_to_parameters() and _check_success().
    """

    def __init__(
        self,
        name: Optional[str] = None,
        method: Optional[str] = None,
        max_iterations: Optional[int] = None,
    ) -> None:
        self.name: Optional[str] = name
        self.method: Optional[str] = method
        self.max_iterations: Optional[int] = max_iterations
        self.result: Optional[FitResults] = None
        self._previous_chi2: Optional[float] = None
        self._iteration: Optional[int] = None
        self._best_chi2: Optional[float] = None
        self._best_iteration: Optional[int] = None
        self._fitting_time: Optional[float] = None
        self.tracker: FittingProgressTracker = FittingProgressTracker()

    def _start_tracking(self, minimizer_name: str) -> None:
        """Reset the tracker, start progress display and start timing."""
        self.tracker.reset()
        self.tracker.start_tracking(minimizer_name)
        self.tracker.start_timer()

    def _stop_tracking(self) -> None:
        """Stop timing and finalize the progress display."""
        self.tracker.stop_timer()
        self.tracker.finish_tracking()

    @abstractmethod
    def _prepare_solver_args(self, parameters: List[Any]) -> Dict[str, Any]:
        """
        Prepare the solver arguments directly from the list of free parameters.
        """
        pass

    @abstractmethod
    def _run_solver(
        self,
        objective_function: Callable[..., Any],
        engine_parameters: Dict[str, Any],
    ) -> Any:
        """Run the underlying solver with the prepared arguments."""
        pass

    @abstractmethod
    def _sync_result_to_parameters(
        self,
        parameters: List[Any],
        raw_result: Any,
    ) -> None:
        """
        Write the solver result back onto the parameter objects.

        Note: the argument order (parameters, raw_result) matches the call
        in _finalize_fit() and the concrete implementations; the previous
        abstract declaration listed the arguments in the opposite order.
        """
        pass

    def _finalize_fit(
        self,
        parameters: List[Any],
        raw_result: Any,
    ) -> FitResults:
        """Sync results onto parameters and build the FitResults object."""
        self._sync_result_to_parameters(parameters, raw_result)
        success = self._check_success(raw_result)
        self.result = FitResults(
            success=success,
            parameters=parameters,
            reduced_chi_square=self.tracker.best_chi2,
            engine_result=raw_result,
            starting_parameters=parameters,
            fitting_time=self.tracker.fitting_time,
        )
        return self.result

    @abstractmethod
    def _check_success(self, raw_result: Any) -> bool:
        """
        Determine whether the fit was successful.
        This must be implemented by concrete minimizers.
        """
        pass

    def fit(
        self,
        parameters: List[Any],
        objective_function: Callable[..., Any],
    ) -> FitResults:
        """Run the full fit workflow: track progress, solve, finalize.

        Args:
            parameters: Free parameters to refine.
            objective_function: Callable returning residuals for the engine.

        Returns:
            The finalized FitResults object.
        """
        minimizer_name = self.name or 'Unnamed Minimizer'
        if self.method is not None:
            minimizer_name += f' ({self.method})'

        self._start_tracking(minimizer_name)

        solver_args = self._prepare_solver_args(parameters)
        raw_result = self._run_solver(objective_function, **solver_args)

        self._stop_tracking()

        result = self._finalize_fit(parameters, raw_result)

        return result

    def _objective_function(
        self,
        engine_params: Dict[str, Any],
        parameters: List[Any],
        sample_models: Any,
        experiments: Any,
        calculator: Any,
    ) -> np.ndarray:
        # NOTE(review): delegates to self._compute_residuals, which is not
        # defined in this base class — presumably supplied by a subclass or
        # mixin; verify before relying on this helper.
        return self._compute_residuals(
            engine_params,
            parameters,
            sample_models,
            experiments,
            calculator,
        )

    def _create_objective_function(
        self,
        parameters: List[Any],
        sample_models: Any,
        experiments: Any,
        calculator: Any,
    ) -> Callable[[Dict[str, Any]], np.ndarray]:
        """Bind the current refinement context into an engine-facing callable."""
        return lambda engine_params: self._objective_function(
            engine_params,
            parameters,
            sample_models,
            experiments,
            calculator,
        )

minimizer_dfols

DfolsMinimizer

Bases: MinimizerBase

Minimizer using the DFO-LS package (Derivative-Free Optimization for Least-Squares).

Source code in src/easydiffraction/analysis/minimizers/minimizer_dfols.py
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
class DfolsMinimizer(MinimizerBase):
    """Minimizer backed by DFO-LS (Derivative-Free Optimization for Least-Squares)."""

    def __init__(
        self,
        name: str = 'dfols',
        max_iterations: int = DEFAULT_MAX_ITERATIONS,
        **kwargs: Any,
    ) -> None:
        # DFO-LS exposes no selectable sub-method, hence method=None.
        super().__init__(name=name, method=None, max_iterations=max_iterations)

    def _prepare_solver_args(self, parameters: List[Any]) -> Dict[str, Any]:
        """Build the x0 vector and (lower, upper) bound arrays for dfols.solve()."""
        x0 = np.array([param.value for param in parameters])
        lower = np.array([param.min if param.min is not None else -np.inf for param in parameters])
        upper = np.array([param.max if param.max is not None else np.inf for param in parameters])
        return {'x0': x0, 'bounds': (lower, upper)}

    def _run_solver(self, objective_function: Any, **kwargs: Any) -> Any:
        """Invoke dfols.solve() with the prepared starting point and bounds."""
        return solve(
            objective_function,
            x0=kwargs.get('x0'),
            bounds=kwargs.get('bounds'),
            maxfun=self.max_iterations,
        )

    def _sync_result_to_parameters(
        self,
        parameters: List[Any],
        raw_result: Any,
    ) -> None:
        """
        Synchronizes the result from the solver to the parameters.

        Args:
            parameters: List of parameters being optimized.
            raw_result: The result object returned by the solver.
        """
        # dfols.solve() returns an object with an `.x` attribute; fall back
        # to treating raw_result itself as the value sequence otherwise.
        result_values = raw_result.x if hasattr(raw_result, 'x') else raw_result

        for index, param in enumerate(parameters):
            param.value = result_values[index]
            # DFO-LS doesn't provide uncertainties; set to None or calculate later if needed
            param.uncertainty = None

    def _check_success(self, raw_result: Any) -> bool:
        """
        Determines success from DFO-LS result dictionary.

        Args:
            raw_result: The result object returned by the solver.

        Returns:
            True if the optimization was successful, False otherwise.
        """
        return raw_result.flag == raw_result.EXIT_SUCCESS

minimizer_factory

MinimizerFactory

Source code in src/easydiffraction/analysis/minimizers/minimizer_factory.py
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
class MinimizerFactory:
    # Registry mapping a selection string to its engine configuration.
    _available_minimizers: Dict[str, Dict[str, Any]] = {
        'lmfit': {
            'engine': 'lmfit',
            'method': 'leastsq',
            'description': 'LMFIT library using the default Levenberg-Marquardt least squares method',
            'class': LmfitMinimizer,
        },
        'lmfit (leastsq)': {
            'engine': 'lmfit',
            'method': 'leastsq',
            'description': 'LMFIT library with Levenberg-Marquardt least squares method',
            'class': LmfitMinimizer,
        },
        'lmfit (least_squares)': {
            'engine': 'lmfit',
            'method': 'least_squares',
            'description': 'LMFIT library with SciPy’s trust region reflective algorithm',
            'class': LmfitMinimizer,
        },
        'dfols': {
            'engine': 'dfols',
            'method': None,
            'description': 'DFO-LS library for derivative-free least-squares optimization',
            'class': DfolsMinimizer,
        },
    }

    @classmethod
    def list_available_minimizers(cls) -> List[str]:
        """
        List all available minimizers.

        Returns:
            A list of minimizer names.
        """
        return [*cls._available_minimizers]

    @classmethod
    def show_available_minimizers(cls) -> None:
        """
        Display a table of available minimizers and their descriptions.
        """
        rows: List[List[str]] = [
            [name, config.get('description', 'No description provided.')]
            for name, config in cls._available_minimizers.items()
        ]

        print(paragraph('Supported minimizers'))
        render_table(
            columns_headers=['Minimizer', 'Description'],
            columns_alignment=['left', 'left'],
            columns_data=rows,
        )

    @classmethod
    def create_minimizer(cls, selection: str) -> MinimizerBase:
        """
        Create a minimizer instance based on the selection.

        Args:
            selection: The name of the minimizer to create.

        Returns:
            An instance of the selected minimizer.

        Raises:
            ValueError: If the selection is not a valid minimizer.
        """
        config = cls._available_minimizers.get(selection)
        if not config:
            raise ValueError(f"Unknown minimizer '{selection}'. Use one of {cls.list_available_minimizers()}")

        minimizer_class: Type[MinimizerBase] = config.get('class')
        method: Optional[str] = config.get('method')

        # Only forward a 'method' kwarg when the engine defines one.
        kwargs: Dict[str, Any] = {} if method is None else {'method': method}

        return minimizer_class(**kwargs)

    @classmethod
    def register_minimizer(
        cls,
        name: str,
        minimizer_cls: Type[MinimizerBase],
        method: Optional[str] = None,
        description: str = 'No description provided.',
    ) -> None:
        """
        Register a new minimizer.

        Args:
            name: The name of the minimizer.
            minimizer_cls: The class of the minimizer.
            method: The method used by the minimizer (optional).
            description: A description of the minimizer.
        """
        cls._available_minimizers[name] = {
            'engine': name,
            'method': method,
            'description': description,
            'class': minimizer_cls,
        }
create_minimizer(selection) classmethod

Create a minimizer instance based on the selection.

Parameters:

Name Type Description Default
selection str

The name of the minimizer to create.

required

Returns:

Type Description
MinimizerBase

An instance of the selected minimizer.

Raises:

Type Description
ValueError

If the selection is not a valid minimizer.

Source code in src/easydiffraction/analysis/minimizers/minimizer_factory.py
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
@classmethod
def create_minimizer(cls, selection: str) -> MinimizerBase:
    """
    Create a minimizer instance based on the selection.

    Args:
        selection: The name of the minimizer to create.

    Returns:
        An instance of the selected minimizer.

    Raises:
        ValueError: If the selection is not a valid minimizer.
    """
    config = cls._available_minimizers.get(selection)
    if not config:
        raise ValueError(f"Unknown minimizer '{selection}'. Use one of {cls.list_available_minimizers()}")

    minimizer_class: Type[MinimizerBase] = config.get('class')
    method: Optional[str] = config.get('method')

    # Only forward a 'method' kwarg when the engine defines one.
    kwargs: Dict[str, Any] = {} if method is None else {'method': method}

    return minimizer_class(**kwargs)
list_available_minimizers() classmethod

List all available minimizers.

Returns:

Type Description
List[str]

A list of minimizer names.

Source code in src/easydiffraction/analysis/minimizers/minimizer_factory.py
46
47
48
49
50
51
52
53
54
@classmethod
def list_available_minimizers(cls) -> List[str]:
    """
    List all available minimizers.

    Returns:
        A list of minimizer names.
    """
    # Iterating a dict yields its keys in insertion order.
    return [*cls._available_minimizers]
register_minimizer(name, minimizer_cls, method=None, description='No description provided.') classmethod

Register a new minimizer.

Parameters:

Name Type Description Default
name str

The name of the minimizer.

required
minimizer_cls Type[MinimizerBase]

The class of the minimizer.

required
method Optional[str]

The method used by the minimizer (optional).

None
description str

A description of the minimizer.

'No description provided.'
Source code in src/easydiffraction/analysis/minimizers/minimizer_factory.py
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
@classmethod
def register_minimizer(
    cls,
    name: str,
    minimizer_cls: Type[MinimizerBase],
    method: Optional[str] = None,
    description: str = 'No description provided.',
) -> None:
    """
    Register a new minimizer.

    Args:
        name: The name of the minimizer.
        minimizer_cls: The class of the minimizer.
        method: The method used by the minimizer (optional).
        description: A description of the minimizer.
    """
    entry: Dict[str, Any] = {
        'engine': name,
        'method': method,
        'description': description,
        'class': minimizer_cls,
    }
    cls._available_minimizers[name] = entry
show_available_minimizers() classmethod

Display a table of available minimizers and their descriptions.

Source code in src/easydiffraction/analysis/minimizers/minimizer_factory.py
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
@classmethod
def show_available_minimizers(cls) -> None:
    """
    Display a table of available minimizers and their descriptions.
    """
    # Build one [name, description] row per registered minimizer.
    rows: List[List[str]] = [
        [name, config.get('description', 'No description provided.')]
        for name, config in cls._available_minimizers.items()
    ]

    print(paragraph('Supported minimizers'))
    render_table(
        columns_headers=['Minimizer', 'Description'],
        columns_alignment=['left', 'left'],
        columns_data=rows,
    )

minimizer_lmfit

LmfitMinimizer

Bases: MinimizerBase

Minimizer using the lmfit package.

Source code in src/easydiffraction/analysis/minimizers/minimizer_lmfit.py
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
class LmfitMinimizer(MinimizerBase):
    """
    Minimizer backed by the lmfit package.
    """

    def __init__(
        self,
        name: str = 'lmfit',
        method: str = DEFAULT_METHOD,
        max_iterations: int = DEFAULT_MAX_ITERATIONS,
    ) -> None:
        super().__init__(
            name=name,
            method=method,
            max_iterations=max_iterations,
        )

    def _prepare_solver_args(
        self,
        parameters: List[Any],
    ) -> Dict[str, Any]:
        """
        Build the lmfit.Parameters container used by the solver.

        Args:
            parameters: List of parameters to be optimized.

        Returns:
            A dictionary containing the prepared lmfit.Parameters object.
        """
        lmfit_params = lmfit.Parameters()
        for parameter in parameters:
            lmfit_params.add(
                name=parameter.minimizer_uid,
                value=parameter.value,
                vary=parameter.free,
                min=parameter.min,
                max=parameter.max,
            )
        return {'engine_parameters': lmfit_params}

    def _run_solver(self, objective_function: Any, **kwargs: Any) -> Any:
        """
        Run the lmfit solver.

        Args:
            objective_function: The objective function to minimize.
            **kwargs: Additional arguments for the solver.

        Returns:
            The result of the lmfit minimization.
        """
        return lmfit.minimize(
            objective_function,
            params=kwargs.get('engine_parameters'),
            method=self.method,
            nan_policy='propagate',
            max_nfev=self.max_iterations,
        )

    def _sync_result_to_parameters(
        self,
        parameters: List[Any],
        raw_result: Any,
    ) -> None:
        """
        Copy the solver results back onto the parameter objects.

        Args:
            parameters: List of parameters being optimized.
            raw_result: The result object returned by the solver.
        """
        # Fall back to the raw result itself when it carries no
        # 'params' attribute.
        param_values = getattr(raw_result, 'params', raw_result)

        for param in parameters:
            fitted = param_values.get(param.minimizer_uid)
            if fitted is None:
                continue
            param.value = fitted.value
            param.uncertainty = getattr(fitted, 'stderr', None)

    def _check_success(self, raw_result: Any) -> bool:
        """
        Determine success from the lmfit MinimizerResult.

        Args:
            raw_result: The result object returned by the solver.

        Returns:
            True if the optimization was successful, False otherwise.
        """
        return getattr(raw_result, 'success', False)

    def _iteration_callback(
        self,
        params: lmfit.Parameters,
        iter: int,
        resid: Any,
        *args: Any,
        **kwargs: Any,
    ) -> None:
        """
        Record the current iteration; invoked by lmfit on each iteration.

        Args:
            params: The current parameters.
            iter: The current iteration number (name mandated by the
                lmfit ``iter_cb`` convention; shadows the builtin).
            resid: The residuals.
            *args: Additional positional arguments.
            **kwargs: Additional keyword arguments.
        """
        self._iteration = iter

reliability_factors

calculate_r_factor(y_obs, y_calc)

Calculate the R-factor (reliability factor) between observed and calculated data.

Parameters:

Name Type Description Default
y_obs ndarray

Observed data points.

required
y_calc ndarray

Calculated data points.

required

Returns:

Type Description
float

R-factor value.

Source code in src/easydiffraction/analysis/reliability_factors.py
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
def calculate_r_factor(
    y_obs: np.ndarray,
    y_calc: np.ndarray,
) -> float:
    """
    Calculate the R-factor (reliability factor) between observed and calculated data.

    Args:
        y_obs: Observed data points.
        y_calc: Calculated data points.

    Returns:
        R-factor value, or NaN when the observed data has zero total magnitude.
    """
    obs = np.asarray(y_obs)
    calc = np.asarray(y_calc)
    denominator = np.abs(obs).sum()
    if denominator == 0:
        return np.nan
    return np.abs(obs - calc).sum() / denominator

calculate_r_factor_squared(y_obs, y_calc)

Calculate the R-factor squared between observed and calculated data.

Parameters:

Name Type Description Default
y_obs ndarray

Observed data points.

required
y_calc ndarray

Calculated data points.

required

Returns:

Type Description
float

R-factor squared value.

Source code in src/easydiffraction/analysis/reliability_factors.py
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
def calculate_r_factor_squared(
    y_obs: np.ndarray,
    y_calc: np.ndarray,
) -> float:
    """
    Calculate the R-factor squared between observed and calculated data.

    Args:
        y_obs: Observed data points.
        y_calc: Calculated data points.

    Returns:
        R-factor squared value, or NaN when the observed data is all zero.
    """
    obs = np.asarray(y_obs)
    calc = np.asarray(y_calc)
    sum_sq_obs = (obs**2).sum()
    if sum_sq_obs == 0:
        return np.nan
    return np.sqrt(((obs - calc) ** 2).sum() / sum_sq_obs)

calculate_rb_factor(y_obs, y_calc)

Calculate the Bragg R-factor between observed and calculated data.

Parameters:

Name Type Description Default
y_obs ndarray

Observed data points.

required
y_calc ndarray

Calculated data points.

required

Returns:

Type Description
float

Bragg R-factor value.

Source code in src/easydiffraction/analysis/reliability_factors.py
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
def calculate_rb_factor(
    y_obs: np.ndarray,
    y_calc: np.ndarray,
) -> float:
    """
    Calculate the Bragg R-factor between observed and calculated data.

    Args:
        y_obs: Observed data points.
        y_calc: Calculated data points.

    Returns:
        Bragg R-factor value, or NaN when the observed data sums to zero.
    """
    obs = np.asarray(y_obs)
    calc = np.asarray(y_calc)
    # Note: the denominator is the plain (not absolute-value) sum.
    total_obs = obs.sum()
    if total_obs == 0:
        return np.nan
    return np.abs(obs - calc).sum() / total_obs

calculate_reduced_chi_square(residuals, num_parameters)

Calculate the reduced chi-square statistic.

Parameters:

Name Type Description Default
residuals ndarray

Residuals between observed and calculated data.

required
num_parameters int

Number of free parameters used in the model.

required

Returns:

Type Description
float

Reduced chi-square value.

Source code in src/easydiffraction/analysis/reliability_factors.py
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
def calculate_reduced_chi_square(
    residuals: np.ndarray,
    num_parameters: int,
) -> float:
    """
    Calculate the reduced chi-square statistic.

    Args:
        residuals: Residuals between observed and calculated data.
        num_parameters: Number of free parameters used in the model.

    Returns:
        Reduced chi-square value, or NaN when the degrees of freedom
        are not positive.
    """
    res = np.asarray(residuals)
    # Degrees of freedom: number of data points minus free parameters.
    dof = len(res) - num_parameters
    if dof <= 0:
        return np.nan
    return (res**2).sum() / dof

calculate_weighted_r_factor(y_obs, y_calc, weights)

Calculate the weighted R-factor between observed and calculated data.

Parameters:

Name Type Description Default
y_obs ndarray

Observed data points.

required
y_calc ndarray

Calculated data points.

required
weights ndarray

Weights for each data point.

required

Returns:

Type Description
float

Weighted R-factor value.

Source code in src/easydiffraction/analysis/reliability_factors.py
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
def calculate_weighted_r_factor(
    y_obs: np.ndarray,
    y_calc: np.ndarray,
    weights: np.ndarray,
) -> float:
    """
    Calculate the weighted R-factor between observed and calculated data.

    Args:
        y_obs: Observed data points.
        y_calc: Calculated data points.
        weights: Weights for each data point.

    Returns:
        Weighted R-factor value, or NaN when the weighted observed
        sum of squares is zero.
    """
    obs = np.asarray(y_obs)
    calc = np.asarray(y_calc)
    w = np.asarray(weights)
    denominator = (w * obs**2).sum()
    if denominator == 0:
        return np.nan
    return np.sqrt((w * (obs - calc) ** 2).sum() / denominator)

get_reliability_inputs(sample_models, experiments, calculator)

Collect observed and calculated data points for reliability calculations.

Parameters:

Name Type Description Default
sample_models SampleModels

Collection of sample models.

required
experiments Experiments

Collection of experiments.

required
calculator CalculatorBase

The calculator to use for pattern generation.

required

Returns:

Type Description
Tuple[ndarray, ndarray, Optional[ndarray]]

Tuple containing arrays of (observed values, calculated values, error values)

Source code in src/easydiffraction/analysis/reliability_factors.py
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
def get_reliability_inputs(
    sample_models: SampleModels,
    experiments: Experiments,
    calculator: CalculatorBase,
) -> Tuple[np.ndarray, np.ndarray, Optional[np.ndarray]]:
    """
    Collect observed and calculated data points for reliability calculations.

    Args:
        sample_models: Collection of sample models.
        experiments: Collection of experiments.
        calculator: The calculator to use for pattern generation.

    Returns:
        Tuple containing arrays of (observed values, calculated values,
        error values). The error array is None when no experiment
        contributed any data.
    """
    y_obs_all = []
    y_calc_all = []
    y_err_all = []
    # Only the experiment objects are needed; the registry keys (names)
    # are not used here, so iterate over values directly.
    for experiment in experiments._items.values():
        y_calc = calculator.calculate_pattern(sample_models, experiment)
        y_meas = experiment.datastore.pattern.meas
        y_meas_su = experiment.datastore.pattern.meas_su

        # Skip experiments that lack measured or calculated data.
        if y_meas is None or y_calc is None:
            continue

        # If standard uncertainty is not provided, use ones
        if y_meas_su is None:
            y_meas_su = np.ones_like(y_meas)

        y_obs_all.extend(y_meas)
        y_calc_all.extend(y_calc)
        y_err_all.extend(y_meas_su)

    return (
        np.array(y_obs_all),
        np.array(y_calc_all),
        np.array(y_err_all) if y_err_all else None,
    )