
PyDRex: 2D simple shear tests.

  1"""> PyDRex: 2D simple shear tests."""
  3import contextlib as cl
  4import functools as ft
  5from time import process_time
  7import numpy as np
  8import pytest
  9from numpy import asarray as Ŋ
 10from numpy import testing as nt
 11from scipy.interpolate import PchipInterpolator
 13from pydrex import core as _core
 14from pydrex import diagnostics as _diagnostics
 15from pydrex import geometry as _geo
 16from pydrex import io as _io
 17from pydrex import logger as _log
 18from pydrex import minerals as _minerals
 19from pydrex import pathlines as _paths
 20from pydrex import stats as _stats
 21from pydrex import utils as _utils
 22from pydrex import velocity as _velocity
 23from pydrex import visualisation as _vis
 25Pool, HAS_RAY = _utils.import_proc_pool()
 27# Subdirectory of `outdir` used to store outputs from these tests.
 28SUBDIR = "2d_simple_shear"
 31class TestPreliminaries:
 32    """Preliminary tests to check that various auxiliary routines are working."""
 34    def test_strain_increment(self):
 35        """Test for accumulating strain via strain increment calculations."""
 36        _, get_velocity_gradient = _velocity.simple_shear_2d("X", "Z", 1)
 37        timestamps = np.linspace(0, 1, 10)  # Solve until D₀t=1 (tensorial strain).
 38        strains_inc = np.zeros_like(timestamps)
 39        L = get_velocity_gradient(np.nan, Ŋ([0e0, 0e0, 0e0]))
 40        for i, ε in enumerate(strains_inc[1:]):
 41            strains_inc[i + 1] = strains_inc[i] + _utils.strain_increment(
 42                timestamps[1] - timestamps[0],
 43                L,
 44            )
 45        # For constant timesteps, check strains == positive_timestamps * strain_rate.
 46        nt.assert_allclose(strains_inc, timestamps, atol=6e-16, rtol=0)
 48        # Same thing, but for strain rate similar to experiments.
 49        _, get_velocity_gradient = _velocity.simple_shear_2d("Y", "X", 1e-5)
 50        timestamps = np.linspace(0, 1e6, 10)  # Solve until D₀t=10 (tensorial strain).
 51        strains_inc = np.zeros_like(timestamps)
 52        L = get_velocity_gradient(np.nan, Ŋ([0e0, 0e0, 0e0]))
 53        for i, ε in enumerate(strains_inc[1:]):
 54            strains_inc[i + 1] = strains_inc[i] + _utils.strain_increment(
 55                timestamps[1] - timestamps[0],
 56                L,
 57            )
 58        nt.assert_allclose(strains_inc, timestamps * 1e-5, atol=5e-15, rtol=0)
 60        # Again, but this time the particle will move (using get_pathline).
 61        # We use a 400km x 400km box and a strain rate of 1e-15 s⁻¹.
 62        get_velocity, get_velocity_gradient = _velocity.simple_shear_2d("X", "Z", 1e-15)
 63        timestamps, get_position = _paths.get_pathline(
 64            Ŋ([1e5, 0e0, 1e5]),
 65            get_velocity,
 66            get_velocity_gradient,
 67            Ŋ([-2e5, 0e0, -2e5]),
 68            Ŋ([2e5, 0e0, 2e5]),
 69            2,
 70            regular_steps=10,
 71        )
 72        positions = [get_position(t) for t in timestamps]
 73        velocity_gradients = [get_velocity_gradient(np.nan, Ŋ(x)) for x in positions]
 75        # Check that polycrystal is experiencing steady velocity gradient.
 76        nt.assert_array_equal(
 77            velocity_gradients, np.full_like(velocity_gradients, velocity_gradients[0])
 78        )
 79        # Check that positions are changing as expected.
 80        xdiff = np.diff(Ŋ([x[0] for x in positions]))
 81        zdiff = np.diff(Ŋ([x[2] for x in positions]))
 82        assert xdiff[0] > 0
 83        assert zdiff[0] == 0
 84        nt.assert_allclose(xdiff, np.full_like(xdiff, xdiff[0]), rtol=0, atol=1e-10)
 85        nt.assert_allclose(zdiff, np.full_like(zdiff, zdiff[0]), rtol=0, atol=1e-10)
 86        strains_inc = np.zeros_like(timestamps)
 87        for t, time in enumerate(timestamps[:-1], start=1):
 88            strains_inc[t] = strains_inc[t - 1] + (
 89                _utils.strain_increment(timestamps[t] - time, velocity_gradients[t])
 90            )
 91        # fig, ax, _, _ = _vis.pathline_box2d(
 92        #     None,
 93        #     get_velocity,
 94        #     "xz",
 95        #     strains_inc,
 96        #     positions,
 97        #     ".",
 98        #     Ŋ([-2e5, -2e5]),
 99        #     Ŋ([2e5, 2e5]),
100        #     [20, 20],
101        # )
102        # fig.savefig("/tmp/fig.png")
103        nt.assert_allclose(
104            strains_inc,
105            (timestamps - timestamps[0]) * 1e-15,
106            atol=5e-15,
107            rtol=0,
108        )
111class TestOlivineA:
112    """Tests for stationary A-type olivine polycrystals in 2D simple shear."""
114    class_id = "olivineA"
116    @classmethod
117    def run(
118        cls,
119        params,
120        timestamps,
121        strain_rate,
122        get_velocity_gradient,
123        shear_direction,
124        seed=None,
125        return_fse=None,
126        get_position=lambda t: np.zeros(3),  # Stationary particles by default.
127    ):
128        """Reusable logic for 2D olivine (A-type) simple shear tests.
130        Returns a tuple with the mineral and the FSE angle (or `None` if `return_fse` is
131        `None`).
133        """
134        mineral = _minerals.Mineral(
135            phase=_core.MineralPhase.olivine,
136            fabric=_core.MineralFabric.olivine_A,
137            regime=_core.DeformationRegime.matrix_dislocation,
138            n_grains=params["number_of_grains"],
139            seed=seed,
140        )
141        deformation_gradient = np.eye(3)  # Undeformed initial state.
142        θ_fse = np.empty_like(timestamps)
143        θ_fse[0] = 45
145        for t, time in enumerate(timestamps[:-1], start=1):
146            # Set up logging message depending on dynamic parameter and seeds.
147            msg_start = (
148                f"N = {params['number_of_grains']}; "
149                + f"λ∗ = {params['nucleation_efficiency']}; "
150                + f"X = {params['gbs_threshold']}; "
151                + f"M∗ = {params['gbm_mobility']}; "
152            )
153            if seed is not None:
154                msg_start += f"# {seed}; "
156            _log.info(msg_start + "step %s/%s (t = %s)", t, len(timestamps) - 1, time)
158            deformation_gradient = mineral.update_orientations(
159                params,
160                deformation_gradient,
161                get_velocity_gradient,
162                pathline=(time, timestamps[t], get_position),
163            )
164            _log.debug(
165                "› velocity gradient = %s",
166                get_velocity_gradient(np.nan, np.full(3, np.nan)).flatten(),
167            )
168            _log.debug("› strain D₀t = %.2f", strain_rate * timestamps[t])
169            _log.debug(
170                "› grain fractions: median = %s, max = %s, min = %s",
171                np.median(mineral.fractions[-1]),
172                np.max(mineral.fractions[-1]),
173                np.min(mineral.fractions[-1]),
174            )
175            if return_fse:
176                _, fse_v = _diagnostics.finite_strain(deformation_gradient)
177                θ_fse[t] = _diagnostics.smallest_angle(fse_v, shear_direction)
178            else:
179                θ_fse = None
181        return mineral, θ_fse
183    @classmethod
184    def interp_GBM_Kaminski2001(cls, strains):
185        """Interpolate Kaminski & Ribe, 2001 data to get target angles at `strains`."""
186        _log.info("interpolating target CPO angles...")
187        data = _io.read_scsv(_io.data("thirdparty") / "Kaminski2001_GBMshear.scsv")
188        cs_M0 = PchipInterpolator(
189            _utils.remove_nans(data.equivalent_strain_M0) / 200,
190            _utils.remove_nans(data.angle_M0),
191        )
192        cs_M50 = PchipInterpolator(
193            _utils.remove_nans(data.equivalent_strain_M50) / 200,
194            _utils.remove_nans(data.angle_M50),
195        )
196        cs_M200 = PchipInterpolator(
197            _utils.remove_nans(data.equivalent_strain_M200) / 200,
198            _utils.remove_nans(data.angle_M200),
199        )
200        return [cs_M0(strains), cs_M50(strains), cs_M200(strains)]
202    @classmethod
203    def interp_GBM_FortranDRex(cls, strains):
204        """Interpolate angles produced using 'tools/drex_forward_simpleshear.f90'."""
205        _log.info("interpolating target CPO  angles...")
206        data = _io.read_scsv(_io.data("drexF90") / "olA_D1E4_dt50_X0_L5.scsv")
207        data_strains = np.linspace(0, 1, 200)
208        cs_M0 = PchipInterpolator(data_strains, _utils.remove_nans(data.M0_angle))
209        cs_M50 = PchipInterpolator(data_strains, _utils.remove_nans(data.M50_angle))
210        cs_M200 = PchipInterpolator(data_strains, _utils.remove_nans(data.M200_angle))
211        return [cs_M0(strains), cs_M50(strains), cs_M200(strains)]
213    @classmethod
214    def interp_GBS_FortranDRex(cls, strains):
215        """Interpolate angles produced using 'tools/drex_forward_simpleshear.f90'."""
216        _log.info("interpolating target CPO  angles...")
217        data = _io.read_scsv(_io.data("thirdparty") / "a_axis_GBS_fortran.scsv")
218        data_strains = np.linspace(0, 1, 200)
219        cs_X0 = PchipInterpolator(data_strains, _utils.remove_nans(data.a_mean_X0))
220        cs_X20 = PchipInterpolator(data_strains, _utils.remove_nans(data.a_mean_X20))
221        cs_X40 = PchipInterpolator(data_strains, _utils.remove_nans(data.a_mean_X40))
222        return [cs_X0(strains), cs_X20(strains), cs_X40(strains)]
224    @classmethod
225    def interp_GBS_long_FortranDRex(cls, strains):
226        """Interpolate angles produced using 'tools/drex_forward_simpleshear.f90'."""
227        _log.info("interpolating target CPO  angles...")
228        data = _io.read_scsv(_io.data("thirdparty") / "a_axis_GBS_long_fortran.scsv")
229        data_strains = np.linspace(0, 2.5, 500)
230        cs_X0 = PchipInterpolator(data_strains, _utils.remove_nans(data.a_mean_X0))
231        cs_X20 = PchipInterpolator(data_strains, _utils.remove_nans(data.a_mean_X20))
232        cs_X40 = PchipInterpolator(data_strains, _utils.remove_nans(data.a_mean_X40))
233        return [cs_X0(strains), cs_X20(strains), cs_X40(strains)]
235    @classmethod
236    def interp_GBS_Kaminski2004(cls, strains):
237        """Interpolate Kaminski & Ribe, 2001 data to get target angles at `strains`."""
238        _log.info("interpolating target CPO angles...")
239        data = _io.read_scsv(_io.data("thirdparty") / "Kaminski2004_GBSshear.scsv")
240        cs_X0 = PchipInterpolator(
241            _utils.remove_nans(data.dimensionless_time_X0),
242            45 + _utils.remove_nans(data.angle_X0),
243        )
244        cs_X0d2 = PchipInterpolator(
245            _utils.remove_nans(data.dimensionless_time_X0d2),
246            45 + _utils.remove_nans(data.angle_X0d2),
247        )
248        cs_X0d4 = PchipInterpolator(
249            _utils.remove_nans(data.dimensionless_time_X0d4),
250            45 + _utils.remove_nans(data.angle_X0d4),
251        )
252        return [cs_X0(strains), cs_X0d2(strains), cs_X0d4(strains)]
254    @pytest.mark.skipif(_utils.in_ci("win32"), reason="Unable to allocate memory")
255    def test_zero_recrystallisation(self, seed):
256        """Check that M*=0 is a reliable switch to turn off recrystallisation."""
257        params = _core.DefaultParams().as_dict()
258        params["gbm_mobility"] = 0
259        strain_rate = 1
260        timestamps = np.linspace(0, 1, 25)  # Solve until D₀t=1 (tensorial strain).
261        shear_direction = Ŋ([0, 1, 0], dtype=np.float64)
262        _, get_velocity_gradient = _velocity.simple_shear_2d("Y", "X", strain_rate)
263        mineral, _ = self.run(
264            params,
265            timestamps,
266            strain_rate,
267            get_velocity_gradient,
268            shear_direction,
269            seed=seed,
270        )
271        for fractions in mineral.fractions[1:]:
272            nt.assert_allclose(fractions, mineral.fractions[0], atol=1e-15, rtol=0)
274    @pytest.mark.skipif(_utils.in_ci("win32"), reason="Unable to allocate memory")
275    @pytest.mark.parametrize("gbm_mobility", [50, 100, 150])
276    def test_grainsize_median(self, seed, gbm_mobility):
277        """Check that M*={50,100,150}, λ*=5 causes decreasing grain size median."""
278        params = _core.DefaultParams().as_dict()
279        params["gbm_mobility"] = gbm_mobility
280        params["nucleation_efficiency"] = 5
281        strain_rate = 1
282        timestamps = np.linspace(0, 1, 25)  # Solve until D₀t=1 (tensorial strain).
283        n_timestamps = len(timestamps)
284        shear_direction = Ŋ([0, 1, 0], dtype=np.float64)
285        _, get_velocity_gradient = _velocity.simple_shear_2d("Y", "X", strain_rate)
286        mineral, _ = self.run(
287            params,
288            timestamps,
289            strain_rate,
290            get_velocity_gradient,
291            shear_direction,
292            seed=seed,
293        )
294        medians = np.empty(n_timestamps)
295        for i, fractions in enumerate(mineral.fractions):
296            medians[i] = np.median(fractions)
298        # The first diff is positive (~1e-6) before nucleation sets in.
299        nt.assert_array_less(np.diff(medians)[1:], np.full(n_timestamps - 2, 0))
301    @pytest.mark.slow
302    @pytest.mark.parametrize("gbs_threshold", [0, 0.2, 0.4])
303    @pytest.mark.parametrize("nucleation_efficiency", [3, 5, 10])
304    def test_dvdx_ensemble(
305        self, outdir, seeds_nearX45, ncpus, gbs_threshold, nucleation_efficiency
306    ):
307        r"""Test a-axis alignment to shear in Y direction (init. SCCS near 45° from X).
309        Velocity gradient:
310        $$\bm{L} = \begin{bmatrix} 0 & 0 & 0 \cr 2 & 0 & 0 \cr 0 & 0 & 0 \end{bmatrix}$$
312        """
313        strain_rate = 1
314        timestamps = np.linspace(0, 1, 201)  # Solve until D₀t=1 ('shear' γ=2).
315        n_timestamps = len(timestamps)
316        # Use `seeds` instead of `seeds_nearX45` if you have even more RAM and CPU time.
317        _seeds = seeds_nearX45
318        n_seeds = len(_seeds)
320        shear_direction = Ŋ([0, 1, 0], dtype=np.float64)
321        _, get_velocity_gradient = _velocity.simple_shear_2d("Y", "X", strain_rate)
323        gbm_mobilities = [0, 50, 125, 200]
324        markers = ("x", "*", "d", "s")
326        _id = f"X{_io.stringify(gbs_threshold)}_L{_io.stringify(nucleation_efficiency)}"
327        # Output setup with optional logging and data series labels.
328        θ_fse = np.empty_like(timestamps)
329        angles = np.empty((len(gbm_mobilities), n_seeds, n_timestamps))
330        optional_logging = cl.nullcontext()
331        if outdir is not None:
332            out_basepath = f"{outdir}/{SUBDIR}/{self.class_id}_dvdx_ensemble_{_id}"
333            optional_logging = _io.logfile_enable(f"{out_basepath}.log")
334            labels = []
336        with optional_logging:
337            clock_start = process_time()
338            for m, gbm_mobility in enumerate(gbm_mobilities):
339                if m == 0:
340                    return_fse = True
341                else:
342                    return_fse = False
344                params = {
345                    "phase_assemblage": (_core.MineralPhase.olivine,),
346                    "phase_fractions": (1.0,),
347                    "enstatite_fraction": 0.0,
348                    "stress_exponent": 1.5,
349                    "deformation_exponent": 3.5,
350                    "gbm_mobility": gbm_mobility,
351                    "gbs_threshold": gbs_threshold,
352                    "nucleation_efficiency": nucleation_efficiency,
353                    "number_of_grains": 5000,
354                    "initial_olivine_fabric": "A",
355                }
357                _run = ft.partial(
358                    self.run,
359                    params,
360                    timestamps,
361                    strain_rate,
362                    get_velocity_gradient,
363                    shear_direction,
364                    return_fse=return_fse,
365                )
366                with Pool(processes=ncpus) as pool:
367                    for s, out in enumerate(pool.imap_unordered(_run, _seeds)):
368                        mineral, fse_angles = out
369                        angles[m, s, :] = [
370                            _diagnostics.smallest_angle(v, shear_direction)
371                            for v in _diagnostics.elasticity_components(
372                                _minerals.voigt_averages([mineral], params)
373                            )["hexagonal_axis"]
374                        ]
375                        # Save the whole mineral for the first seed only.
376                        if outdir is not None and s == 0:
377                            postfix = (
378                                f"M{_io.stringify(gbm_mobility)}"
379                                + f"_X{_io.stringify(gbs_threshold)}"
380                                + f"_L{_io.stringify(nucleation_efficiency)}"
381                            )
382                            mineral.save(f"{out_basepath}.npz", postfix=postfix)
383                        if return_fse:
384                            θ_fse += fse_angles
386                if return_fse:
387                    θ_fse /= n_seeds
389                if outdir is not None:
390                    labels.append(f"$M^∗$ = {gbm_mobility}")
392            _log.info("elapsed CPU time: %s", np.abs(process_time() - clock_start))
394        # Take ensemble means and optionally plot figure.
395        strains = timestamps * strain_rate
396        _log.info("postprocessing results for %s", _id)
397        result_angles = angles.mean(axis=1)
398        result_angles_err = angles.std(axis=1)
400        if outdir is not None:
401            schema = {
402                "delimiter": ",",
403                "missing": "-",
404                "fields": [
405                    {
406                        "name": "strain",
407                        "type": "integer",
408                        "unit": "percent",
409                        "fill": 999999,
410                    }
411                ],
412            }
413            np.savez(
414                f"{out_basepath}.npz",
415                angles=result_angles,
416                angles_err=result_angles_err,
417            )
418            _io.save_scsv(
419                f"{out_basepath}_strains.scsv",
420                schema,
421                [[int(D * 200) for D in strains]],  # Shear strain % is 200 * D₀.
422            )
423            fig, ax, colors = _vis.alignment(
424                None,
425                strains,
426                result_angles,
427                markers,
428                labels,
429                err=result_angles_err,
430                θ_max=60,
431                θ_fse=θ_fse,
432            )
433            fig.savefig(_io.resolve_path(f"{out_basepath}.pdf"))
435    @pytest.mark.slow
436    def test_dvdx_GBM(self, outdir, seeds_nearX45, ncpus):
437        r"""Test a-axis alignment to shear in Y direction (init. SCCS near 45° from X).
439        Velocity gradient:
440        $$
441        \bm{L} = 10^{-4} ×
442            \begin{bmatrix} 0 & 0 & 0 \cr 2 & 0 & 0 \cr 0 & 0 & 0 \end{bmatrix}
443        $$
445        Results are compared to the Fortran 90 output.
447        """
448        shear_direction = Ŋ([0, 1, 0], dtype=np.float64)
449        strain_rate = 1e-4
450        _, get_velocity_gradient = _velocity.simple_shear_2d("Y", "X", strain_rate)
451        timestamps = np.linspace(0, 1e4, 51)  # Solve until D₀t=1 ('shear' γ=2).
452        i_strain_40p = 10  # Index of 40% strain, lower strains are not relevant here.
453        i_strain_100p = 25  # Index of 100% strain, when M*=0 matches FSE.
454        params = _core.DefaultParams().as_dict()
455        params["gbs_threshold"] = 0  # No GBS, to match the Fortran parameters.
456        gbm_mobilities = (0, 10, 50, 125, 200)  # Must be in ascending order.
457        markers = ("x", ".", "*", "d", "s")
458        # Use `seeds` instead of `seeds_nearX45` if you have even more RAM and CPU time.
459        _seeds = seeds_nearX45
460        n_seeds = len(_seeds)
461        angles = np.empty((len(gbm_mobilities), n_seeds, len(timestamps)))
462        θ_fse = np.zeros_like(timestamps)
463        strains = timestamps * strain_rate
464        M0_drexF90, M50_drexF90, M200_drexF90 = self.interp_GBM_FortranDRex(strains)
466        optional_logging = cl.nullcontext()
467        if outdir is not None:
468            out_basepath = f"{outdir}/{SUBDIR}/{self.class_id}_mobility"
469            optional_logging = _io.logfile_enable(f"{out_basepath}.log")
470            labels = []
472        with optional_logging:
473            clock_start = process_time()
474            for m, gbm_mobility in enumerate(gbm_mobilities):
475                if m == 0:
476                    return_fse = True
477                else:
478                    return_fse = False
479                params["gbm_mobility"] = gbm_mobility
481                _run = ft.partial(
482                    self.run,
483                    params,
484                    timestamps,
485                    strain_rate,
486                    get_velocity_gradient,
487                    shear_direction,
488                    return_fse=True,
489                )
490                with Pool(processes=ncpus) as pool:
491                    for s, out in enumerate(pool.imap_unordered(_run, _seeds)):
492                        mineral, fse_angles = out
493                        angles[m, s, :] = [
494                            _diagnostics.smallest_angle(v, shear_direction)
495                            for v in _diagnostics.elasticity_components(
496                                _minerals.voigt_averages([mineral], params)
497                            )["hexagonal_axis"]
498                        ]
499                        # Save the whole mineral for the first seed only.
500                        if outdir is not None and s == 0:
501                            mineral.save(
502                                f"{out_basepath}.npz",
503                                postfix=f"M{_io.stringify(gbm_mobility)}",
504                            )
505                        if return_fse:
506                            θ_fse += fse_angles
508                if return_fse:
509                    θ_fse /= n_seeds
511                if outdir is not None:
512                    labels.append(f"$M^∗$ = {params['gbm_mobility']}")
514            _log.info("elapsed CPU time: %s", np.abs(process_time() - clock_start))
516            # Take ensemble means and optionally plot figure.
517            _log.info("postprocessing results...")
518            result_angles = angles.mean(axis=1)
519            result_angles_err = angles.std(axis=1)
521            if outdir is not None:
522                schema = {
523                    "delimiter": ",",
524                    "missing": "-",
525                    "fields": [
526                        {
527                            "name": "strain",
528                            "type": "integer",
529                            "unit": "percent",
530                            "fill": 999999,
531                        }
532                    ],
533                }
534                _io.save_scsv(
535                    f"{out_basepath}_strains.scsv",
536                    schema,
537                    [[int(D * 200) for D in strains]],  # Shear strain % is 200 * D₀.
538                )
539                np.savez(
540                    f"{out_basepath}_angles.npz",
541                    angles=result_angles,
542                    err=result_angles_err,
543                )
544                fig, ax, colors = _vis.alignment(
545                    None,
546                    strains,
547                    result_angles,
548                    markers,
549                    labels,
550                    err=result_angles_err,
551                    θ_max=60,
552                    θ_fse=θ_fse,
553                )
554                ax.plot(strains, M0_drexF90, c=colors[0])
555                ax.plot(strains, M50_drexF90, c=colors[2])
556                ax.plot(strains, M200_drexF90, c=colors[4])
557                _vis.show_Skemer2016_ShearStrainAngles(
558                    ax,
559                    ["Z&K 1200 C", "Z&K 1300 C"],
560                    ["v", "^"],
561                    ["k", "k"],
562                    ["none", None],
563                    [
564                        "Zhang & Karato, 1995\n(1473 K)",
565                        "Zhang & Karato, 1995\n(1573 K)",
566                    ],
567                    _core.MineralFabric.olivine_A,
568                )
569                # There is a lot of stuff on this legend, so put it outside the axes.
570                # These values might need to be tweaked depending on the font size, etc.
571                _legend = _utils.redraw_legend(ax, fig=fig, bbox_to_anchor=(1.66, 0.99))
572                fig.savefig(
573                    _io.resolve_path(f"{out_basepath}.pdf"),
574                    bbox_extra_artists=(_legend,),
575                    bbox_inches="tight",
576                )
578            # Check that GBM speeds up the alignment between 40% and 100% strain.
579            _log.info("checking grain orientations...")
580            for i, θ in enumerate(result_angles[:-1], start=1):
581                nt.assert_array_less(
582                    result_angles[i][i_strain_40p:i_strain_100p],
583                    θ[i_strain_40p:i_strain_100p],
584                )
586            # Check that M*=0 matches FSE (±1°) past 100% strain.
587            nt.assert_allclose(
588                result_angles[0][i_strain_100p:],
589                θ_fse[i_strain_100p:],
590                atol=1,
591                rtol=0,
592            )
594            # Check that results match Fortran output past 40% strain.
595            nt.assert_allclose(
596                result_angles[0][i_strain_40p:],
597                M0_drexF90[i_strain_40p:],
598                atol=0,
599                rtol=0.1,  # At 40% strain the match is worse than at higher strain.
600            )
601            nt.assert_allclose(
602                result_angles[2][i_strain_40p:],
603                M50_drexF90[i_strain_40p:],
604                atol=1,
605                rtol=0,
606            )
607            nt.assert_allclose(
608                result_angles[4][i_strain_40p:],
609                M200_drexF90[i_strain_40p:],
610                atol=1.5,
611                rtol=0,
612            )
614    @pytest.mark.slow
615    def test_GBM_calibration(self, outdir, seeds, ncpus):
616        r"""Compare results for various values of $$M^∗$$ to A-type olivine data.
618        Velocity gradient:
619        $$
620        \bm{L} = 10^{-4} ×
621            \begin{bmatrix} 0 & 0 & 0 \cr 2 & 0 & 0 \cr 0 & 0 & 0 \end{bmatrix}
622        $$
624        Unlike `test_dvdx_GBM`,
625        grain boudary sliding is enabled here (see `_core.DefaultParams`).
626        Data are provided by [Skemer & Hansen, 2016](http://dx.doi.org/10.1016/j.tecto.2015.12.003).
628        """
629        shear_direction = Ŋ([0, 1, 0], dtype=np.float64)
630        strain_rate = 1
631        _, get_velocity_gradient = _velocity.simple_shear_2d("Y", "X", strain_rate)
632        timestamps = np.linspace(0, 3.2, 65)  # Solve until D₀t=3.2 ('shear' γ=6.4).
633        params = _core.DefaultParams().as_dict()
634        params["number_of_grains"] = 5000
635        gbm_mobilities = (0, 10, 50, 125)  # Must be in ascending order.
636        markers = ("x", "*", "1", ".")
637        # Uses 100 seeds by default; use all 1000 if you have more RAM and CPU time.
638        _seeds = seeds[:100]
639        n_seeds = len(_seeds)
640        angles = np.empty((len(gbm_mobilities), n_seeds, len(timestamps)))
641        θ_fse = np.zeros_like(timestamps)
642        strains = timestamps * strain_rate
644        optional_logging = cl.nullcontext()
645        if outdir is not None:
646            out_basepath = f"{outdir}/{SUBDIR}/{self.class_id}_calibration"
647            optional_logging = _io.logfile_enable(f"{out_basepath}.log")
648            labels = []
650        with optional_logging:
651            clock_start = process_time()
652            for m, gbm_mobility in enumerate(gbm_mobilities):
653                return_fse = True if m == 0 else False
654                params["gbm_mobility"] = gbm_mobility
655                _run = ft.partial(
656                    self.run,
657                    params,
658                    timestamps,
659                    strain_rate,
660                    get_velocity_gradient,
661                    shear_direction,
662                    return_fse=return_fse,
663                )
664                with Pool(processes=ncpus) as pool:
665                    for s, out in enumerate(pool.imap_unordered(_run, _seeds)):
666                        mineral, fse_angles = out
667                        angles[m, s, :] = [
668                            _diagnostics.smallest_angle(v, shear_direction)
669                            for v in _diagnostics.elasticity_components(
670                                _minerals.voigt_averages([mineral], params)
671                            )["hexagonal_axis"]
672                        ]
673                        # Save the whole mineral for the first seed only.
674                        if outdir is not None and s == 0:
675                            mineral.save(
676                                f"{out_basepath}.npz",
677                                postfix=f"M{_io.stringify(gbm_mobility)}",
678                            )
679                        if return_fse:
680                            θ_fse += fse_angles
682                if return_fse:
683                    θ_fse /= n_seeds
685                if outdir is not None:
686                    labels.append(f"$M^∗$ = {params['gbm_mobility']}")
688            _log.info("elapsed CPU time: %s", np.abs(process_time() - clock_start))
690            # Take ensemble means and optionally plot figure.
691            _log.info("postprocessing results...")
692            result_angles = angles.mean(axis=1)
693            result_angles_err = angles.std(axis=1)
695            if outdir is not None:
696                schema = {
697                    "delimiter": ",",
698                    "missing": "-",
699                    "fields": [
700                        {
701                            "name": "strain",
702                            "type": "integer",
703                            "unit": "percent",
704                            "fill": 999999,
705                        }
706                    ],
707                }
708                _io.save_scsv(
709                    f"{out_basepath}_strains.scsv",
710                    schema,
711                    [[int(D * 200) for D in strains]],  # Shear strain % is 200 * D₀.
712                )
713                np.savez(
714                    _io.resolve_path(f"{out_basepath}_ensemble_means.npz"),
715                    angles=result_angles,
716                    err=result_angles_err,
717                )
718                fig = _vis.figure(
719                    figsize=(_vis.DEFAULT_FIG_WIDTH * 3, _vis.DEFAULT_FIG_HEIGHT)
720                )
721                fig, ax, colors = _vis.alignment(
722                    fig.add_subplot(),
723                    strains,
724                    result_angles,
725                    markers,
726                    labels,
727                    err=result_angles_err,
728                    θ_max=80,
729                    θ_fse=θ_fse,
730                )
731                (
732                    _,
733                    _,
734                    _,
735                    data_Skemer2016,
736                    indices,
737                ) = _vis.show_Skemer2016_ShearStrainAngles(
738                    ax,
739                    [
740                        "Z&K 1200 C",
741                        "Z&K 1300 C",
742                        "Skemer 2011",
743                        "Hansen 2014",
744                        "Warren 2008",
745                        "Webber 2010",
746                        "H&W 2015",
747                    ],
748                    ["v", "^", "o", "s", "v", "o", "s"],
749                    ["k", "k", "k", "k", "k", "k", "k"],
750                    ["none", "none", "none", "none", None, None, None],
751                    [
752                        "Zhang & Karato, 1995 (1473 K)",
753                        "Zhang & Karato, 1995 (1573 K)",
754                        "Skemer et al., 2011 (1500 K)",
755                        "Hansen et al., 2014 (1473 K)",
756                        "Warren et al., 2008",
757                        "Webber et al., 2010",
758                        "Hansen & Warren, 2015",
759                    ],
760                    fabric=_core.MineralFabric.olivine_A,
761                )
762                _legend = _utils.redraw_legend(ax, loc="upper right", ncols=3)
763                fig.savefig(
764                    _io.resolve_path(f"{out_basepath}.pdf"),
765                    bbox_extra_artists=(_legend,),
766                    bbox_inches="tight",
767                )
768                r2vals = []
769                for angles in result_angles:
770                    _angles = PchipInterpolator(strains, angles)
771                    r2 = np.sum(
772                        [
773                            (a - b) ** 2
774                            for a, b in zip(
775                                _angles(
776                                    np.take(data_Skemer2016.shear_strain, indices) / 200
777                                ),
778                                np.take(data_Skemer2016.angle, indices),
779                            )
780                        ]
781                    )
782                    r2vals.append(r2)
783                _log.info(
784                    "Sums of squared residuals (r-values) for each M∗: %s", r2vals
785                )
787    @pytest.mark.big
788    def test_dudz_pathline(self, outdir, seed):
789        """Test alignment of olivine a-axis for a polycrystal advected on a pathline."""
790        test_id = "dudz_pathline"
791        optional_logging = cl.nullcontext()
792        if outdir is not None:
793            out_basepath = f"{outdir}/{SUBDIR}/{self.class_id}_{test_id}"
794            optional_logging = _io.logfile_enable(f"{out_basepath}.log")
796        with optional_logging:
797            shear_direction = Ŋ([1, 0, 0], dtype=np.float64)
798            strain_rate = 1e-15  # Moderate, realistic shear in the upper mantle.
799            get_velocity, get_velocity_gradient = _velocity.simple_shear_2d(
800                "X", "Z", strain_rate
801            )
802            n_timesteps = 10
803            timestamps, get_position = _paths.get_pathline(
804                Ŋ([1e5, 0e0, 1e5]),
805                get_velocity,
806                get_velocity_gradient,
807                Ŋ([-2e5, 0e0, -2e5]),
808                Ŋ([2e5, 0e0, 2e5]),
809                2,
810                regular_steps=n_timesteps,
811            )
812            positions = [get_position(t) for t in timestamps]
813            velocity_gradients = [
814                get_velocity_gradient(np.nan, Ŋ(x)) for x in positions
815            ]
817            params = _core.DefaultParams().as_dict()
818            params["gbm_mobility"] = 10
819            params["number_of_grains"] = 5000
820            olA = _minerals.Mineral(n_grains=params["number_of_grains"], seed=seed)
821            deformation_gradient = np.eye(3)
822            strains = np.zeros_like(timestamps)
823            for t, time in enumerate(timestamps[:-1], start=1):
824                strains[t] = strains[t - 1] + (
825                    _utils.strain_increment(timestamps[t] - time, velocity_gradients[t])
826                )
827                _log.info("step %d/%d (ε = %.2f)", t, len(timestamps) - 1, strains[t])
828                deformation_gradient = olA.update_orientations(
829                    params,
830                    deformation_gradient,
831                    get_velocity_gradient,
832                    pathline=(time, timestamps[t], get_position),
833                )
835            if outdir is not None:
836                olA.save(f"{out_basepath}.npz")
838            orient_resampled, fractions_resampled = _stats.resample_orientations(
839                olA.orientations, olA.fractions, seed=seed
840            )
841            # About 36GB, 26 min needed with float64. GitHub macos runner has 14GB.
842            misorient_indices = _diagnostics.misorientation_indices(
843                orient_resampled,
844                _geo.LatticeSystem.orthorhombic,
845                ncpus=3,
846            )
847            cpo_vectors = np.zeros((n_timesteps + 1, 3))
848            cpo_angles = np.zeros(n_timesteps + 1)
849            for i, matrices in enumerate(orient_resampled):
850                cpo_vectors[i] = _diagnostics.bingham_average(
851                    matrices,
852                    axis=_minerals.OLIVINE_PRIMARY_AXIS[olA.fabric],
853                )
854                cpo_angles[i] = _diagnostics.smallest_angle(
855                    cpo_vectors[i], shear_direction
856                )
858            # Check for mostly decreasing CPO angles (exclude initial condition).
859            _log.debug("cpo angles: %s", cpo_angles)
860            nt.assert_array_less(np.diff(cpo_angles[1:]), np.ones(n_timesteps - 1))
861            # Check for increasing CPO strength (M-index).
862            _log.debug("cpo strengths: %s", misorient_indices)
863            nt.assert_array_less(
864                np.full(n_timesteps, -0.01), np.diff(misorient_indices)
865            )
866            # Check that last angle is <5° (M*=125) or <10° (M*=10).
867            assert cpo_angles[-1] < 10
869        if outdir is not None:
870            fig, ax, _, _ = _vis.steady_box2d(
871                None,
872                (get_velocity, [20, 20]),
873                (positions, Ŋ([-2e5, -2e5]), Ŋ([2e5, 2e5])),
874                "xz",
875                (cpo_vectors, misorient_indices),
876                strains,
877            )
878            fig.savefig(f"{out_basepath}.pdf")
