pydrex.pathlines

PyDRex: Functions for pathline construction.

  1"""> PyDRex: Functions for pathline construction."""
  2
  3import numpy as np
  4from scipy import integrate as si
  5
  6from pydrex import logger as _log
  7from pydrex import utils as _utils
  8
  9
 10def get_pathline(
 11    final_location,
 12    get_velocity,
 13    get_velocity_gradient,
 14    min_coords,
 15    max_coords,
 16    max_strain,
 17    regular_steps=None,
 18    **kwargs,
 19):
 20    """Determine the pathline for a particle in a steady state flow.
 21
 22    The pathline will terminate at the given `final_location` and follow a curve
 23    determined by the velocity gradient. It works for both 2D (rectangular) and 3D
 24    (orthopiped¹) domains, so long as the provided callables expect/return arrays of the
 25    appropriate dimension.
 26
 27    .. note::
 28        The pathline is calculated backwards in time (t < 0) from the given endpoint.
 29        Therefore, the returned position callable should be evaluated at negative times.
 30
 31    - `final_location` (array) — coordinates of the final location
 32    - `get_velocity` (callable) — returns velocity vector at a point
 33    - `get_velocity_gradient` (callable) — returns velocity gradient matrix at a point
 34    - `min_coords` (array) — lower bound coordinates of the box
 35    - `max_coords` (array) — upper bound coordinates of the box
 36    - `max_strain` (float) — target strain (given as “tensorial” strain ε) at the final
 37      location, useful if the pathline never inflows into the domain (the pathline will
 38      only be traced backwards until a strain of 0 is reached, unless a domain boundary
 39      is reached first)
 40    - `regular_steps` (float, optional) — number of time steps to use for regular
 41      resampling between the start (t << 0) and end (t <= 0) of the pathline
 42      (if `None`, which is the default, then the timestamps obtained from
 43      `scipy.integrate.solve_ivp` are returned instead)
 44
 45    Optional keyword arguments will be passed to `scipy.integrate.solve_ivp`. However,
 46    some of the arguments to the `solve_ivp` call may not be modified, and a warning
 47    will be raised if they are provided.
 48
 49    Returns a tuple containing the time points and an interpolant that can be used
 50    to evaluate the pathline position (see `scipy.integrate.OdeSolution`).
 51
 52    ¹An “orthopiped” is a 3D rectangle (called a “box” when we are in a hurry), see
 53    <https://www.whatistoday.net/2020/04/cuboid-dilemma.html>.
 54
 55    """
 56
 57    def _terminate(
 58        time, point, get_velocity, get_velocity_gradient, min_coords, max_coords
 59    ):
 60        # Track “previous” (last seen) timestamp and total strain value.
 61        nonlocal _time_prev, _strain
 62
 63        if _is_inside(point, min_coords, max_coords):
 64             = _utils.strain_increment(
 65                time - _time_prev, get_velocity_gradient(np.nan, point)
 66            )
 67            if time > _time_prev:  # Timestamps jump around for SciPy to find the root.
 68                _strain += 
 69            else:  # Subtract strain increment because we are going backwards in time.
 70                _strain -= 
 71            _time_prev = time
 72            return _strain
 73        # If we are outside the domain, always terminate.
 74        return 0
 75
 76    _terminate.terminal = True
 77    _strain = max_strain
 78    _time_prev = 0
 79    _event_flag = False
 80
 81    # Illegal keyword args, check the call below. Remove them and warn about it.
 82    for key in ("events", "jac", "dense_output", "args"):
 83        try:
 84            kwargs.pop(key)
 85        except KeyError:
 86            continue
 87        else:
 88            _log.warning("ignoring illegal keyword argument: %s", key)
 89
 90    # We don't want to stop at a particular time,
 91    # so integrate time for 100 Myr, in seconds (“forever”).
 92    path = si.solve_ivp(
 93        _ivp_func,
 94        [0, -100e6 * 365.25 * 8.64e4],
 95        final_location,
 96        method=kwargs.pop("method", "LSODA"),
 97        events=[_terminate],
 98        args=(get_velocity, get_velocity_gradient, min_coords, max_coords),
 99        dense_output=True,
100        jac=_ivp_jac,
101        atol=kwargs.pop("atol", 1e-8),
102        rtol=kwargs.pop("rtol", 1e-5),
103        **kwargs,
104    )
105    _log.info(
106        "calculated pathline from %s (t = %e) to %s (t = %e)",
107        path.sol(path.t[0]),
108        path.t[0],
109        path.sol(path.t[-1]),
110        path.t[-1],
111    )
112
113    if regular_steps is None:
114        return path.t[::-1], path.sol
115    else:
116        return np.linspace(path.t[-1], path.t[0], regular_steps + 1), path.sol
117
118
119def _ivp_func(time, point, get_velocity, get_velocity_gradient, min_coords, max_coords):
120    """Internal use only, must have the same signature as `get_pathline`."""
121    if _is_inside(point, min_coords, max_coords):
122        return get_velocity(np.nan, point)
123    return np.zeros_like(point)
124
125
126def _ivp_jac(time, point, get_velocity, get_velocity_gradient, min_coords, max_coords):
127    """Internal use only, must have the same signature as `_ivp_func`."""
128    if _is_inside(point, min_coords, max_coords):
129        return get_velocity_gradient(np.nan, point)
130    return np.zeros((np.array(point).size,) * 2)
131
132
133def _is_inside(point, min_coords, max_coords):
134    """Check if the point lies within the numerical domain."""
135    assert np.array(point).size == len(min_coords) == len(max_coords)
136    if np.any(np.array(point) < min_coords) or np.any(np.array(point) > max_coords):
137        return False
138    return True
def get_pathline( final_location, get_velocity, get_velocity_gradient, min_coords, max_coords, max_strain, regular_steps=None, **kwargs):
 11def get_pathline(
 12    final_location,
 13    get_velocity,
 14    get_velocity_gradient,
 15    min_coords,
 16    max_coords,
 17    max_strain,
 18    regular_steps=None,
 19    **kwargs,
 20):
 21    """Determine the pathline for a particle in a steady state flow.
 22
 23    The pathline will terminate at the given `final_location` and follow a curve
 24    determined by the velocity gradient. It works for both 2D (rectangular) and 3D
 25    (orthopiped¹) domains, so long as the provided callables expect/return arrays of the
 26    appropriate dimension.
 27
 28    .. note::
 29        The pathline is calculated backwards in time (t < 0) from the given endpoint.
 30        Therefore, the returned position callable should be evaluated at negative times.
 31
 32    - `final_location` (array) — coordinates of the final location
 33    - `get_velocity` (callable) — returns velocity vector at a point
 34    - `get_velocity_gradient` (callable) — returns velocity gradient matrix at a point
 35    - `min_coords` (array) — lower bound coordinates of the box
 36    - `max_coords` (array) — upper bound coordinates of the box
 37    - `max_strain` (float) — target strain (given as “tensorial” strain ε) at the final
 38      location, useful if the pathline never inflows into the domain (the pathline will
 39      only be traced backwards until a strain of 0 is reached, unless a domain boundary
 40      is reached first)
 41    - `regular_steps` (float, optional) — number of time steps to use for regular
 42      resampling between the start (t << 0) and end (t <= 0) of the pathline
 43      (if `None`, which is the default, then the timestamps obtained from
 44      `scipy.integrate.solve_ivp` are returned instead)
 45
 46    Optional keyword arguments will be passed to `scipy.integrate.solve_ivp`. However,
 47    some of the arguments to the `solve_ivp` call may not be modified, and a warning
 48    will be raised if they are provided.
 49
 50    Returns a tuple containing the time points and an interpolant that can be used
 51    to evaluate the pathline position (see `scipy.integrate.OdeSolution`).
 52
 53    ¹An “orthopiped” is a 3D rectangle (called a “box” when we are in a hurry), see
 54    <https://www.whatistoday.net/2020/04/cuboid-dilemma.html>.
 55
 56    """
 57
 58    def _terminate(
 59        time, point, get_velocity, get_velocity_gradient, min_coords, max_coords
 60    ):
 61        # Track “previous” (last seen) timestamp and total strain value.
 62        nonlocal _time_prev, _strain
 63
 64        if _is_inside(point, min_coords, max_coords):
 65             = _utils.strain_increment(
 66                time - _time_prev, get_velocity_gradient(np.nan, point)
 67            )
 68            if time > _time_prev:  # Timestamps jump around for SciPy to find the root.
 69                _strain += 
 70            else:  # Subtract strain increment because we are going backwards in time.
 71                _strain -= 
 72            _time_prev = time
 73            return _strain
 74        # If we are outside the domain, always terminate.
 75        return 0
 76
 77    _terminate.terminal = True
 78    _strain = max_strain
 79    _time_prev = 0
 80    _event_flag = False
 81
 82    # Illegal keyword args, check the call below. Remove them and warn about it.
 83    for key in ("events", "jac", "dense_output", "args"):
 84        try:
 85            kwargs.pop(key)
 86        except KeyError:
 87            continue
 88        else:
 89            _log.warning("ignoring illegal keyword argument: %s", key)
 90
 91    # We don't want to stop at a particular time,
 92    # so integrate time for 100 Myr, in seconds (“forever”).
 93    path = si.solve_ivp(
 94        _ivp_func,
 95        [0, -100e6 * 365.25 * 8.64e4],
 96        final_location,
 97        method=kwargs.pop("method", "LSODA"),
 98        events=[_terminate],
 99        args=(get_velocity, get_velocity_gradient, min_coords, max_coords),
100        dense_output=True,
101        jac=_ivp_jac,
102        atol=kwargs.pop("atol", 1e-8),
103        rtol=kwargs.pop("rtol", 1e-5),
104        **kwargs,
105    )
106    _log.info(
107        "calculated pathline from %s (t = %e) to %s (t = %e)",
108        path.sol(path.t[0]),
109        path.t[0],
110        path.sol(path.t[-1]),
111        path.t[-1],
112    )
113
114    if regular_steps is None:
115        return path.t[::-1], path.sol
116    else:
117        return np.linspace(path.t[-1], path.t[0], regular_steps + 1), path.sol

Determine the pathline for a particle in a steady state flow.

The pathline will terminate at the given final_location and follow a curve determined by the velocity gradient. It works for both 2D (rectangular) and 3D (orthopiped¹) domains, so long as the provided callables expect/return arrays of the appropriate dimension.

The pathline is calculated backwards in time (t < 0) from the given endpoint. Therefore, the returned position callable should be evaluated at negative times.

  • final_location (array) — coordinates of the final location
  • get_velocity (callable) — returns velocity vector at a point
  • get_velocity_gradient (callable) — returns velocity gradient matrix at a point
  • min_coords (array) — lower bound coordinates of the box
  • max_coords (array) — upper bound coordinates of the box
  • max_strain (float) — target strain (given as “tensorial” strain ε) at the final location, useful if the pathline never inflows into the domain (the pathline will only be traced backwards until a strain of 0 is reached, unless a domain boundary is reached first)
  • regular_steps (float, optional) — number of time steps to use for regular resampling between the start (t << 0) and end (t <= 0) of the pathline (if None, which is the default, then the timestamps obtained from scipy.integrate.solve_ivp are returned instead)

Optional keyword arguments will be passed to scipy.integrate.solve_ivp. However, some of the arguments to the solve_ivp call may not be modified, and a warning will be raised if they are provided.

Returns a tuple containing the time points and an interpolant that can be used to evaluate the pathline position (see scipy.integrate.OdeSolution).

¹An “orthopiped” is a 3D rectangle (called a “box” when we are in a hurry), see https://www.whatistoday.net/2020/04/cuboid-dilemma.html.