pydrex.io

PyDRex: Mesh, configuration and supporting data Input/Output functions.

PyDRex can read/write three kinds of plain text files:

  • PyDRex configuration files, which specify simulation parameters and initial conditions
  • 'SCSV' files, CSV files with YAML frontmatter for (small) scientific datasets
  • Mesh files via meshio, to set up final mineral positions in steady flows.

SCSV files are our custom CSV files with a YAML header. The header is used for data attribution and metadata, as well as a column type spec. There is no official spec for SCSV files at the moment but they should follow the format of existing SCSV files in the data/ folder of the source repository. For supported cell types, see SCSV_TYPEMAP.

  1"""> PyDRex: Mesh, configuration and supporting data Input/Output functions.
  2
  3PyDRex can read/write three kinds of plain text files:
  4- PyDRex configuration files, which specify simulation parameters and initial conditions
  5- 'SCSV' files, CSV files with YAML frontmatter for (small) scientific datasets
  6- Mesh files via `meshio`, to set up final mineral positions in steady flows.
  7
  8SCSV files are our custom CSV files with a YAML header. The header is used for data
  9attribution and metadata, as well as a column type spec. There is no official spec for
 10SCSV files at the moment but they should follow the format of existing  SCSV files in
 11the `data/` folder of the source repository. For supported cell types, see
 12`SCSV_TYPEMAP`.
 13
 14"""
 15
 16import collections as c
 17import contextlib as cl
 18import csv
 19import functools as ft
 20import io
 21import itertools as it
 22import logging
 23import os
 24import pathlib
 25import re
 26import sys
 27
 28if sys.version_info >= (3, 11):
 29    import tomllib
 30else:
 31    import tomli as tomllib
 32
 33from importlib.resources import files
 34
 35import h5py
 36import meshio
 37import numpy as np
 38import yaml
 39from tqdm import tqdm
 40
 41from pydrex import core as _core
 42from pydrex import exceptions as _err
 43from pydrex import logger as _log
 44from pydrex import utils as _utils
 45from pydrex import velocity as _velocity
 46
 47SCSV_TYPEMAP = {
 48    "string": str,
 49    "integer": int,
 50    "float": float,
 51    "boolean": bool,
 52    "complex": complex,
 53}
 54"""Mapping of supported SCSV field types to corresponding Python types."""
 55
 56SCSV_TERSEMAP = {
 57    "s": "string",
 58    "i": "integer",
 59    "f": "float",
 60    "b": "boolean",
 61    "c": "complex",
 62}
 63"""Mapping of supported terse format SCSV field types to their standard names."""
 64
 65_SCSV_DEFAULT_TYPE = "string"
 66_SCSV_DEFAULT_FILL = ""
 67
 68
 69def extract_h5part(
 70    file, phase: _core.MineralPhase, fabric: _core.MineralFabric, n_grains: int, output
 71):
 72    """Extract CPO data from Fluidity h5part file and save to canonical formats."""
 73    from pydrex.minerals import Mineral
 74
 75    with h5py.File(file, "r") as f:
 76        for particle_id in f["Step#0/id"][:]:
 77            # Fluidity writes empty arrays to the particle data after they are deleted.
 78            # We need only the timesteps before deletion of this particle.
 79            steps = []
 80            for k in sorted(list(f.keys()), key=lambda s: int(s.lstrip("Step#"))):
 81                if f[f"{k}/x"].shape[0] >= particle_id:
 82                    steps.append(k)
 83
 84            # Temporary data arrays.
 85            n_timesteps = len(steps)
 86            x = np.zeros(n_timesteps)
 87            y = np.zeros(n_timesteps)
 88            z = np.zeros(n_timesteps)
 89            orientations = np.empty((n_timesteps, n_grains, 3, 3))
 90            fractions = np.empty((n_timesteps, n_grains))
 91
 92            strains = np.zeros(n_timesteps)
 93            for t, k in enumerate(
 94                tqdm(steps, desc=f"Extracting particle {particle_id}")
 95            ):
 96                # Extract particle position.
 97                x[t] = f[f"{k}/x"][particle_id - 1]
 98                y[t] = f[f"{k}/y"][particle_id - 1]
 99                z[t] = f[f"{k}/z"][particle_id - 1]
100
101                # Extract CPO data.
102                strains[t] = f[f"{k}/CPO_{n_grains * 10 + 1}"][particle_id - 1]
103                vals = np.empty(n_grains * 10)
104                for n in range(len(vals)):
105                    vals[n] = f[f"{k}/CPO_{n+1}"][particle_id - 1]
106
107                orientations[t] = np.array(
108                    [
109                        np.reshape(vals[n : n + 9], (3, 3))
110                        for n in range(0, 9 * n_grains, 9)
111                    ]
112                )
113                fractions[t] = vals[9 * n_grains :]
114
115            _postfix = str(particle_id)
116            _fractions = list(fractions)
117            _orientations = list(orientations)
118            mineral = Mineral(
119                phase=phase,
120                fabric=fabric,
121                n_grains=n_grains,
122                fractions_init=_fractions[0],
123                orientations_init=_orientations[0],
124            )
125            mineral.fractions = _fractions
126            mineral.orientations = _orientations
127            mineral.save(output, postfix=_postfix)
128            save_scsv(
129                output[:-4] + f"_{_postfix}" + ".scsv",
130                {
131                    "delimiter": ",",
132                    "missing": "-",
133                    "fields": [
134                        {
135                            "name": "strain",
136                            "type": "float",
137                            "unit": "percent",
138                            "fill": np.nan,
139                        },
140                        {
141                            "name": "x",
142                            "type": "float",
143                            "unit": "m",
144                            "fill": np.nan,
145                        },
146                        {
147                            "name": "y",
148                            "type": "float",
149                            "unit": "m",
150                            "fill": np.nan,
151                        },
152                        {
153                            "name": "z",
154                            "type": "float",
155                            "unit": "m",
156                            "fill": np.nan,
157                        },
158                    ],
159                },
160                [strains * 200, x, y, z],
161            )
162
163
164@_utils.defined_if(sys.version_info >= (3, 12))
165def parse_scsv_schema(terse_schema: str) -> dict:
166    """Parse terse scsv schema representation and return the expanded schema.
167
168    The terse schema is useful for command line tools and can be specified in a single
169    line of text. However, there are some limitations compared to using a Python
170    dictionary, all of which are edge cases and not recommended usage:
171    - the delimiter cannot be the character `d` or the character `m`
172    - the missing data encoding cannot be the character `m`
173    - fill values are not able to contain the colon (`:`) character
174    - the arbitrary unit/comment for any field is not able to contain parentheses
175
176    The delimiter is specified after the letter `d` and the missing data encoding after
177    `m`. These are succeeded by the column specs which are a sequence of column names
178    (which must be valid Python identifiers) and their (optional) data type, missing
179    data fill value, and unit/comment.
180
181    .. note:: This function is only defined if the version of your Python interpreter is
182        greater than 3.11.x.
183
184    >>> #                delimiter
185    >>> #                | missing data encoding    column specifications
186    >>> #                | |  ______________________|______________________________
187    >>> #                v v /                                                     `
188    >>> schemastring = "d,m-:colA(s)colB(s:N/A:...)colC()colD(i:999999)colE(f:NaN:%)"
189    >>> schema = parse_scsv_schema(schemastring)
190    >>> schema["delimiter"]
191    ','
192    >>> schema["missing"]
193    '-'
194    >>> schema["fields"][0]
195    {'name': 'colA', 'type': 'string', 'fill': ''}
196    >>> schema["fields"][1]
197    {'name': 'colB', 'type': 'string', 'fill': 'N/A', 'unit': '...'}
198    >>> schema["fields"][2]
199    {'name': 'colC', 'type': 'string', 'fill': ''}
200    >>> schema["fields"][3]
201    {'name': 'colD', 'type': 'integer', 'fill': '999999'}
202    >>> schema["fields"][4]
203    {'name': 'colE', 'type': 'float', 'fill': 'NaN', 'unit': '%'}
204
205    """
206    if not terse_schema.startswith("d"):
207        raise _err.SCSVError(
208            "terse schema must start with delimiter specification (format: d<delimiter>)"
209        )
210    i_cols = terse_schema.find(":")
211    if i_cols < 4:
212        raise _err.SCSVError(
213            "could not parse missing data encoding from terse SCSV schema"
214        )
215    i_missing = terse_schema.find("m", 0, i_cols)
216    if i_missing < 2:
217        raise _err.SCSVError(
218            "could not parse missing data encoding from terse SCSV schema"
219        )
220
221    delimiter = terse_schema[1:i_missing]
222    missing = terse_schema[i_missing + 1 : i_cols]
223
224    raw_colspecs = re.split(r"\(|\)", terse_schema[i_cols + 1 :])
225    raw_colspecs.pop()  # Get rid of additional last empty string element.
226    if len(raw_colspecs) < 2:
227        raise _err.SCSVError("failed to parse any fields from terse SCSV schema")
228    if len(raw_colspecs) % 2 != 0:
229        raise _err.SCSVError("invalid field specifications in terse SCSV schema")
230
231    fields = []
232    for name, spec in it.batched(raw_colspecs, 2):
233        _spec = spec.split(":")
234        _type = _SCSV_DEFAULT_TYPE
235        if _spec[0] != "":
236            try:
237                _type = SCSV_TERSEMAP[_spec[0]]
238            except KeyError:
239                raise _err.SCSVError(
240                    f"invalid field type {_spec[0]} in terse SCSV schema"
241                ) from None
242        field = {
243            "name": name,
244            "type": _type,
245            "fill": _spec[1] if len(_spec) > 1 else _SCSV_DEFAULT_FILL,
246        }
247        if len(_spec) == 3:
248            field["unit"] = _spec[2]
249        fields.append(field)
250    return {"delimiter": delimiter, "missing": missing, "fields": fields}
251
252
253def read_scsv(file):
254    """Read data from an SCSV file.
255
256    Prints the YAML header section to output and returns a NamedTuple with columns of
257    the csv data. See also `save_scsv`.
258
259    """
260    with open(resolve_path(file)) as fileref:
261        yaml_lines = []
262        csv_lines = []
263
264        is_yaml = False
265        for line in fileref:
266            if line == "\n":  # Empty lines are skipped.
267                continue
268            if line == "---\n":
269                if is_yaml:
270                    is_yaml = False  # Second --- ends YAML section.
271                    continue
272                else:
273                    is_yaml = True  # First --- begins YAML section.
274                    continue
275
276            if is_yaml:
277                yaml_lines.append(line)
278            else:
279                csv_lines.append(line)
280
281        metadata = yaml.safe_load(io.StringIO("".join(yaml_lines)))
282        schema = metadata["schema"]
283        if not _validate_scsv_schema(schema):
284            raise _err.SCSVError(
285                f"unable to parse SCSV schema from '{file}'."
286                + " Check logging output for details."
287            )
288        reader = csv.reader(
289            csv_lines, delimiter=schema["delimiter"], skipinitialspace=True
290        )
291
292        schema_colnames = [d["name"] for d in schema["fields"]]
293        header_colnames = [s.strip() for s in next(reader)]
294        if not schema_colnames == header_colnames:
295            raise _err.SCSVError(
296                f"schema field names must match column headers in '{file}'."
297                + f" You've supplied schema fields\n{schema_colnames}"
298                + f"\n with column headers\n{header_colnames}"
299            )
300
301        _log.info("reading SCSV file: %s", resolve_path(file))
302        Columns = c.namedtuple("Columns", schema_colnames)
303        # __dict__() and __slots__() of NamedTuples is empty :(
304        # Set up some pretty printing instead to give a quick view of column names.
305        Columns.__str__ = lambda self: f"Columns: {self._fields}"
306        Columns._repr_pretty_ = lambda self, p, _: p.text(f"Columns: {self._fields}")
307        # Also add some extra attributes to inspect the schema and yaml header.
308        Columns._schema = schema
309        Columns._metadata = (
310            "".join(yaml_lines)
311            .replace("# ", "")
312            .replace("-\n", "")
313            .replace("\n", " ")
314            .rsplit("schema:", maxsplit=1)[0]  # Assumes comments are above the schema.
315        )
316        coltypes = [
317            SCSV_TYPEMAP[d.get("type", _SCSV_DEFAULT_TYPE)] for d in schema["fields"]
318        ]
319        missingstr = schema["missing"]
320        fillvals = [d.get("fill", _SCSV_DEFAULT_FILL) for d in schema["fields"]]
321        return Columns._make(
322            [
323                tuple(
324                    map(
325                        ft.partial(
326                            _parse_scsv_cell, f, missingstr=missingstr, fillval=fill
327                        ),
328                        x,
329                    )
330                )
331                for f, fill, x in zip(
332                    coltypes, fillvals, zip(*list(reader), strict=True), strict=True
333                )
334            ]
335        )
336
337
338def write_scsv_header(stream, schema, comments=None):
339    """Write YAML header to an SCSV stream.
340
341    - `stream` — open output stream (e.g. file handle) where data should be written
342    - `schema` — SCSV schema dictionary, with 'delimiter', 'missing' and 'fields' keys
343    - `comments` (optional) — array of comments to be written above the schema, each on
344      a new line with an '#' prefix
345
346    See also `read_scsv`, `save_scsv`.
347
348    """
349    if not _validate_scsv_schema(schema):
350        raise _err.SCSVError(
351            "refusing to write invalid schema to stream."
352            + " Check logging output for details."
353        )
354
355    stream.write("---" + os.linesep)
356    if comments is not None:
357        for comment in comments:
358            stream.write("# " + comment + os.linesep)
359    stream.write("schema:" + os.linesep)
360    delimiter = schema["delimiter"]
361    missing = schema["missing"]
362    stream.write(f"  delimiter: '{delimiter}'{os.linesep}")
363    stream.write(f"  missing: '{missing}'{os.linesep}")
364    stream.write("  fields:" + os.linesep)
365
366    for field in schema["fields"]:
367        name = field["name"]
368        kind = field.get("type", _SCSV_DEFAULT_TYPE)
369        stream.write(f"    - name: {name}{os.linesep}")
370        stream.write(f"      type: {kind}{os.linesep}")
371        if "unit" in field:
372            unit = field["unit"]
373            stream.write(f"      unit: {unit}{os.linesep}")
374        if "fill" in field:
375            fill = field["fill"]
376            stream.write(f"      fill: {fill}{os.linesep}")
377    stream.write("---" + os.linesep)
378
379
380def save_scsv(file, schema, data, **kwargs):
381    """Save data to SCSV file.
382
383    - `file` — path to the file where the data should be written
384    - `schema` — SCSV schema dictionary, with 'delimiter', 'missing' and 'fields' keys
385    - `data` — data arrays (columns) of equal length
386
387    Optional keyword arguments are passed to `write_scsv_header`. See also `read_scsv`.
388
389    """
390    path = resolve_path(file)
391    n_rows = len(data[0])
392    for col in data[1:]:
393        if len(col) != n_rows:
394            raise _err.SCSVError(
395                "refusing to write data columns of unequal length to SCSV file"
396            )
397
398    _log.info("writing to SCSV file: %s", file)
399    try:  # Check that the output is valid by attempting to parse.
400        with open(path, mode="w") as stream:
401            write_scsv_header(stream, schema, **kwargs)
402            fills = [
403                field.get("fill", _SCSV_DEFAULT_FILL) for field in schema["fields"]
404            ]
405            types = [
406                SCSV_TYPEMAP[field.get("type", _SCSV_DEFAULT_TYPE)]
407                for field in schema["fields"]
408            ]
409            names = [field["name"] for field in schema["fields"]]
410            writer = csv.writer(
411                stream, delimiter=schema["delimiter"], lineterminator=os.linesep
412            )
413            writer.writerow(names)
414
415            # No need for strict=True here since column lengths were already checked.
416            for col in zip(*data):
417                row = []
418                for i, (d, t, f) in enumerate(zip(col, types, fills, strict=True)):
419                    try:
420                        _parse_scsv_cell(
421                            t, str(d), missingstr=schema["missing"], fillval=f
422                        )
423                    except ValueError:
424                        raise _err.SCSVError(
425                            f"invalid data for column '{names[i]}'."
426                            + f" Cannot parse {d} as type '{t.__qualname__}'."
427                        ) from None
428                    if isinstance(t, bool):
429                        row.append(d)
430                    elif t in (float, complex):
431                        if np.isnan(d) and np.isnan(t(f)):
432                            row.append(schema["missing"])
433                        elif d == t(f):
434                            row.append(schema["missing"])
435                        else:
436                            row.append(d)
437                    elif t in (int, str) and d == t(f):
438                        row.append(schema["missing"])
439                    else:
440                        row.append(d)
441                writer.writerow(row)
442    except ValueError:
443        path.unlink(missing_ok=True)
444        raise _err.SCSVError(
445            "number of fields declared in schema does not match number of data columns."
446            + f" Declared schema fields were {names}; got {len(data)} data columns"
447        ) from None
448
449
450def parse_config(path):
451    """Parse a TOML file containing PyDRex configuration."""
452    path = resolve_path(path)
453    _log.info("parsing configuration file: %s", path)
454    with open(path, "rb") as file:
455        toml = tomllib.load(file)
456
457    # Use provided name or set randomized default.
458    toml["name"] = toml.get(
459        "name", f"pydrex.{np.random.default_rng().integers(1,1e10)}"
460    )
461
462    toml["parameters"] = _parse_config_params(toml)
463    _params = toml["parameters"]
464    toml["input"] = _parse_config_input_common(toml, path)
465    _input = toml["input"]
466
467    if "mesh" in _input:
468        # Input option 1: velocity gradient mesh + final particle locations.
469        _input = _parse_config_input_steadymesh(_input, path)
470    elif "velocity_gradient" in _input:
471        # Input option 2: velocity gradient callable + initial locations.
472        _input = _parse_config_input_calcpaths(_input, path)
473    elif "paths" in _input:
474        # Input option 3: NPZ or SCSV files with pre-computed input pathlines.
475        _input = _parse_config_input_postpaths(_input, path)
476    else:
477        _input["paths"] = None
478
479    # Output fields are optional, default: most data output, least logging output.
480    _output = toml.get("output", {})
481    if "directory" in _output:
482        _output["directory"] = resolve_path(_output["directory"], path.parent)
483    else:
484        _output["directory"] = resolve_path(pathlib.Path.cwd())
485
486    # Raw output means rotation matrices and grain volumes.
487    _parse_output_options(_output, "raw_output", _params["phase_assemblage"])
488    # Diagnostic output means texture diagnostics (strength, symmetry, mean angle).
489    _parse_output_options(_output, "diagnostics", _params["phase_assemblage"])
490    # Anisotropy output means hexagonal symmetry axis and ΔVp (%).
491    _output["anisotropy"] = _output.get(
492        "anisotropy", ["Voigt", "hexaxis", "moduli", "%decomp"]
493    )
494
495    # Optional SCSV or NPZ pathline outputs, not sensible if there are pathline inputs.
496    if "paths" in _input and "paths" in _output:
497        _log.warning(
498            "input pathlines and output pathline filenames are mutually exclusive;"
499            + " ignoring output pathline filenames"
500        )
501        _output["paths"] = None
502    _output["paths"] = _output.get("paths", None)
503
504    # Default logging level for all log files.
505    _output["log_level"] = _output.get("log_level", "WARNING")
506
507    return toml
508
509
510def resolve_path(path, refdir=None):
511    """Resolve relative paths and create parent directories if necessary.
512
513    Relative paths are interpreted with respect to the current working directory,
514    i.e. the directory from whith the current Python process was executed,
515    unless a specific reference directory is provided with `refdir`.
516
517    """
518    cwd = pathlib.Path.cwd()
519    if refdir is None:
520        _path = cwd / path
521    else:
522        _path = refdir / path
523    _path.parent.mkdir(parents=True, exist_ok=True)
524    return _path.resolve()
525
526
527def _parse_config_params(toml):
528    """Parse DRex and other rheology parameters."""
529    _params = toml.get("parameters", {})
530    for key, default in _core.DefaultParams().as_dict().items():
531        _params[key] = _params.get(key, default)
532
533    # Make sure volume fractions sum to 1.
534    if np.abs(np.sum(_params["phase_fractions"]) - 1.0) > 1e-16:
535        raise _err.ConfigError(
536            "Volume fractions of mineral phases must sum to 1."
537            + f" You've provided phase_fractions = {_params['phase_fractions']}."
538        )
539
540    # Make sure all mineral phases are accounted for and valid.
541    if len(_params["phase_assemblage"]) != len(_params["phase_fractions"]):
542        raise _err.ConfigError(
543            "All mineral phases must have an associated volume fraction."
544            + f" You've provided phase_assemblage = {_params['phase_assemblage']} and"
545            + f" phase_fractions = {_params['phase_fractions']}."
546        )
547    try:
548        _params["phase_assemblage"] = tuple(
549            _parse_phase(ϕ) for ϕ in _params["phase_assemblage"]
550        )
551    except AttributeError:
552        raise _err.ConfigError(
553            f"invalid phase assemblage: {_params['phase_assemblage']}"
554        ) from None
555
556    # Make sure initial olivine fabric is valid.
557    try:
558        _params["initial_olivine_fabric"] = getattr(
559            _core.MineralFabric, "olivine_" + _params["initial_olivine_fabric"]
560        )
561    except AttributeError:
562        raise _err.ConfigError(
563            f"invalid initial olivine fabric: {_params['initial_olivine_fabric']}"
564        ) from None
565
566    # Make sure we have enough unified dislocation creep law coefficients.
567    n_provided = len(_params["disl_coefficients"])
568    n_required = len(_core.DefaultParams().disl_coefficients)
569    if n_provided != n_required:
570        raise _err.ConfigError(
571            "not enough unified dislocation creep law coefficients."
572            + f"You've provided {n_provided}/{n_required} coefficients."
573        )
574    _params["disl_coefficients"] = tuple(_params["disl_coefficients"])
575
576    return _params
577
578
579def _parse_config_input_common(toml, path):
580    try:
581        _input = toml["input"]
582    except KeyError:
583        raise _err.ConfigError(f"missing [input] section in '{path}'") from None
584    if "timestep" not in _input and "paths" not in _input:
585        raise _err.ConfigError(f"unspecified input timestep in '{path}'")
586
587    _input["timestep"] = _input.get("timestep", np.nan)
588    if not isinstance(_input["timestep"], float | int):
589        raise _err.ConfigError(
590            f"timestep must be float or int, not {type(input['timestep'])}"
591        )
592
593    _input["strain_final"] = _input.get("strain_final", np.inf)
594    if not isinstance(_input["strain_final"], float | int):
595        raise _err.ConfigError(
596            f"final strain must be float or int, not {type(input['strain_final'])}"
597        )
598
599    return _input
600
601
602def _parse_config_input_steadymesh(input, path):
603    input["mesh"] = meshio.read(resolve_path(input["mesh"], path.parent))
604    input["locations_final"] = read_scsv(
605        resolve_path(input["locations_final"], path.parent)
606    )
607    if "velocity_gradient" in input:
608        _log.warning(
609            "input mesh and velocity gradient callable are mutually exclusive;"
610            + " ignoring velocity gradient callable"
611        )
612    if "locations_initial" in input:
613        _log.warning(
614            "initial particle locations are not used for pathline interpolation"
615            + " and will be ignored"
616        )
617    if "paths" in input:
618        _log.warning(
619            "input mesh and input pathlines are mutually exclusive;"
620            + " ignoring input pathlines"
621        )
622    input["velocity_gradient"] = None
623    input["locations_initial"] = None
624    input["paths"] = None
625    return input
626
627
628def _parse_config_input_calcpaths(input, path):
629    _velocity_gradient_func = getattr(_velocity, input["velocity_gradient"][0])
630    input["velocity_gradient"] = _velocity_gradient_func(
631        *input["velocity_gradient"][1:]
632    )
633    input["locations_initial"] = read_scsv(
634        resolve_path(input["locations_initial"], path.parent)
635    )
636    if "locations_final" in input:
637        _log.warning(
638            "final particle locations are not used for forward advection"
639            + " and will be ignored"
640        )
641    if "paths" in input:
642        _log.warning(
643            "velocity gradient callable and input pathlines are mutually exclusive;"
644            + " ignoring input pathlines"
645        )
646    input["locations_final"] = None
647    input["paths"] = None
648    input["mesh"] = None
649    return input
650
651
652def _parse_config_input_postpaths(input, path):
653    input["paths"] = [np.load(resolve_path(p, path.parent)) for p in input["paths"]]
654    if "locations_initial" in input:
655        _log.warning(
656            "input pathlines and initial particle locations are mutually exclusive;"
657            + " ignoring initial particle locations"
658        )
659    if "locations_final" in input:
660        _log.warning(
661            "input pathlines and final particle locations are mutually exclusive;"
662            + " ignoring final particle locations"
663        )
664    input["locations_initial"] = None
665    input["locations_final"] = None
666    input["mesh"] = None
667    return input
668
669
670def _parse_output_options(output_opts, level, phase_assemblage):
671    try:
672        output_opts[level] = [
673            getattr(_core.MineralPhase, ϕ) for ϕ in output_opts[level]
674        ]
675    except AttributeError:
676        raise _err.ConfigError(
677            f"unsupported mineral phase in '{level}' output option.\n"
678            + f" You supplied the value: {output_opts[level]}.\n"
679            + " Check pydrex.core.MineralPhase for supported phases."
680        ) from None
681    for phase in output_opts[level]:
682        if phase not in phase_assemblage:
683            raise _err.ConfigError(
684                f"cannot output '{level}' for phase that is not being simulated"
685            )
686
687
688def _parse_phase(ϕ: str | _core.MineralPhase | int) -> _core.MineralPhase:
689    if isinstance(ϕ, str):
690        try:
691            return getattr(_core.MineralPhase, ϕ)
692        except AttributeError:
693            raise _err.ConfigError(f"invalid phase in phase assemblage: {ϕ}") from None
694    elif isinstance(ϕ, _core.MineralPhase):
695        return ϕ
696    elif isinstance(ϕ, int):
697        try:
698            return _core.MineralPhase(ϕ)
699        except IndexError:
700            raise _err.ConfigError(f"invalid phase in phase assemblage: {ϕ}") from None
701    raise _err.ConfigError(f"invalid phase in phase assemblage: {ϕ}") from None
702
703
704def _validate_scsv_schema(schema):
705    format_ok = (
706        "delimiter" in schema
707        and "missing" in schema
708        and "fields" in schema
709        and len(schema["fields"]) > 0
710        and schema["delimiter"] != schema["missing"]
711        and schema["delimiter"] not in schema["missing"]
712    )
713    if not format_ok:
714        _log.error(
715            "invalid format for SCSV schema: %s"
716            + "\nMust contain: 'delimiter', 'missing', 'fields'"
717            + "\nMust contain at least one field."
718            + "\nMust contain compatible 'missing' and 'delimiter' values.",
719            schema,
720        )
721        return False
722    for field in schema["fields"]:
723        if not field["name"].isidentifier():
724            _log.error(
725                "SCSV field name '%s' is not a valid Python identifier", field["name"]
726            )
727            return False
728        if field.get("type", _SCSV_DEFAULT_TYPE) not in SCSV_TYPEMAP.keys():
729            _log.error("unsupported SCSV field type: '%s'", field["type"])
730            return False
731        if (
732            field.get("type", _SCSV_DEFAULT_TYPE) not in (_SCSV_DEFAULT_TYPE, "boolean")
733            and "fill" not in field
734        ):
735            _log.error("SCSV field of type '%s' requires a fill value", field["type"])
736            return False
737    return True
738
739
740def _parse_scsv_bool(x):
741    """Parse boolean from string, for SCSV files."""
742    return str(x).lower() in ("yes", "true", "t", "1")
743
744
745def _parse_scsv_cell(func, data, missingstr=None, fillval=None):
746    if data.strip() == missingstr:
747        if fillval == "NaN":
748            return func(np.nan)
749        return func(fillval)
750    elif func.__qualname__ == "bool":
751        return _parse_scsv_bool(data)
752    return func(data.strip())
753
754
755def stringify(s):
756    """Return a cleaned version of a string for use in filenames, etc."""
757    return "".join(filter(lambda c: str.isidentifier(c) or str.isdecimal(c), str(s)))
758
759
760def data(directory):
761    """Get resolved path to a pydrex data directory."""
762    resources = files("pydrex.data")
763    if (resources / directory).is_dir():
764        return resolve_path(resources / directory)
765    else:
766        raise NotADirectoryError(f"{resources / directory} is not a directory")
767
768
769@cl.contextmanager
770def logfile_enable(path, level=logging.DEBUG, mode="w"):
771    """Enable logging to a file at `path` with given `level`."""
772    logger_file = logging.FileHandler(resolve_path(path), mode=mode)
773    logger_file.setFormatter(
774        logging.Formatter(
775            "%(levelname)s [%(asctime)s] %(name)s: %(message)s",
776            datefmt="%Y-%m-%d %H:%M:%S",
777        )
778    )
779    logger_file.setLevel(level)
780    _log.LOGGER.addHandler(logger_file)
781    yield
782    logger_file.close()
SCSV_TYPEMAP = {'string': <class 'str'>, 'integer': <class 'int'>, 'float': <class 'float'>, 'boolean': <class 'bool'>, 'complex': <class 'complex'>}

Mapping of supported SCSV field types to corresponding Python types.

SCSV_TERSEMAP = {'s': 'string', 'i': 'integer', 'f': 'float', 'b': 'boolean', 'c': 'complex'}

Mapping of supported terse format SCSV field types to their standard names.

def extract_h5part( file, phase: pydrex.core.MineralPhase, fabric: pydrex.core.MineralFabric, n_grains: int, output):
 70def extract_h5part(
 71    file, phase: _core.MineralPhase, fabric: _core.MineralFabric, n_grains: int, output
 72):
 73    """Extract CPO data from Fluidity h5part file and save to canonical formats."""
 74    from pydrex.minerals import Mineral
 75
 76    with h5py.File(file, "r") as f:
 77        for particle_id in f["Step#0/id"][:]:
 78            # Fluidity writes empty arrays to the particle data after they are deleted.
 79            # We need only the timesteps before deletion of this particle.
 80            steps = []
 81            for k in sorted(list(f.keys()), key=lambda s: int(s.lstrip("Step#"))):
 82                if f[f"{k}/x"].shape[0] >= particle_id:
 83                    steps.append(k)
 84
 85            # Temporary data arrays.
 86            n_timesteps = len(steps)
 87            x = np.zeros(n_timesteps)
 88            y = np.zeros(n_timesteps)
 89            z = np.zeros(n_timesteps)
 90            orientations = np.empty((n_timesteps, n_grains, 3, 3))
 91            fractions = np.empty((n_timesteps, n_grains))
 92
 93            strains = np.zeros(n_timesteps)
 94            for t, k in enumerate(
 95                tqdm(steps, desc=f"Extracting particle {particle_id}")
 96            ):
 97                # Extract particle position.
 98                x[t] = f[f"{k}/x"][particle_id - 1]
 99                y[t] = f[f"{k}/y"][particle_id - 1]
100                z[t] = f[f"{k}/z"][particle_id - 1]
101
102                # Extract CPO data.
103                strains[t] = f[f"{k}/CPO_{n_grains * 10 + 1}"][particle_id - 1]
104                vals = np.empty(n_grains * 10)
105                for n in range(len(vals)):
106                    vals[n] = f[f"{k}/CPO_{n+1}"][particle_id - 1]
107
108                orientations[t] = np.array(
109                    [
110                        np.reshape(vals[n : n + 9], (3, 3))
111                        for n in range(0, 9 * n_grains, 9)
112                    ]
113                )
114                fractions[t] = vals[9 * n_grains :]
115
116            _postfix = str(particle_id)
117            _fractions = list(fractions)
118            _orientations = list(orientations)
119            mineral = Mineral(
120                phase=phase,
121                fabric=fabric,
122                n_grains=n_grains,
123                fractions_init=_fractions[0],
124                orientations_init=_orientations[0],
125            )
126            mineral.fractions = _fractions
127            mineral.orientations = _orientations
128            mineral.save(output, postfix=_postfix)
129            save_scsv(
130                output[:-4] + f"_{_postfix}" + ".scsv",
131                {
132                    "delimiter": ",",
133                    "missing": "-",
134                    "fields": [
135                        {
136                            "name": "strain",
137                            "type": "float",
138                            "unit": "percent",
139                            "fill": np.nan,
140                        },
141                        {
142                            "name": "x",
143                            "type": "float",
144                            "unit": "m",
145                            "fill": np.nan,
146                        },
147                        {
148                            "name": "y",
149                            "type": "float",
150                            "unit": "m",
151                            "fill": np.nan,
152                        },
153                        {
154                            "name": "z",
155                            "type": "float",
156                            "unit": "m",
157                            "fill": np.nan,
158                        },
159                    ],
160                },
161                [strains * 200, x, y, z],
162            )

Extract CPO data from Fluidity h5part file and save to canonical formats.

def parse_scsv_schema(terse_schema: str) -> dict:
165@_utils.defined_if(sys.version_info >= (3, 12))
166def parse_scsv_schema(terse_schema: str) -> dict:
167    """Parse terse scsv schema representation and return the expanded schema.
168
169    The terse schema is useful for command line tools and can be specified in a single
170    line of text. However, there are some limitations compared to using a Python
171    dictionary, all of which are edge cases and not recommended usage:
172    - the delimiter cannot be the character `d` or the character `m`
173    - the missing data encoding cannot be the character `m`
174    - fill values are not able to contain the colon (`:`) character
175    - the arbitrary unit/comment for any field is not able to contain parentheses
176
177    The delimiter is specified after the letter `d` and the missing data encoding after
178    `m`. These are succeeded by the column specs which are a sequence of column names
179    (which must be valid Python identifiers) and their (optional) data type, missing
180    data fill value, and unit/comment.
181
182    .. note:: This function is only defined if the version of your Python interpreter is
183        greater than 3.11.x.
184
185    >>> #                delimiter
186    >>> #                | missing data encoding    column specifications
187    >>> #                | |  ______________________|______________________________
188    >>> #                v v /                                                     `
189    >>> schemastring = "d,m-:colA(s)colB(s:N/A:...)colC()colD(i:999999)colE(f:NaN:%)"
190    >>> schema = parse_scsv_schema(schemastring)
191    >>> schema["delimiter"]
192    ','
193    >>> schema["missing"]
194    '-'
195    >>> schema["fields"][0]
196    {'name': 'colA', 'type': 'string', 'fill': ''}
197    >>> schema["fields"][1]
198    {'name': 'colB', 'type': 'string', 'fill': 'N/A', 'unit': '...'}
199    >>> schema["fields"][2]
200    {'name': 'colC', 'type': 'string', 'fill': ''}
201    >>> schema["fields"][3]
202    {'name': 'colD', 'type': 'integer', 'fill': '999999'}
203    >>> schema["fields"][4]
204    {'name': 'colE', 'type': 'float', 'fill': 'NaN', 'unit': '%'}
205
206    """
207    if not terse_schema.startswith("d"):
208        raise _err.SCSVError(
209            "terse schema must start with delimiter specification (format: d<delimiter>)"
210        )
211    i_cols = terse_schema.find(":")
212    if i_cols < 4:
213        raise _err.SCSVError(
214            "could not parse missing data encoding from terse SCSV schema"
215        )
216    i_missing = terse_schema.find("m", 0, i_cols)
217    if i_missing < 2:
218        raise _err.SCSVError(
219            "could not parse missing data encoding from terse SCSV schema"
220        )
221
222    delimiter = terse_schema[1:i_missing]
223    missing = terse_schema[i_missing + 1 : i_cols]
224
225    raw_colspecs = re.split(r"\(|\)", terse_schema[i_cols + 1 :])
226    raw_colspecs.pop()  # Get rid of additional last empty string element.
227    if len(raw_colspecs) < 2:
228        raise _err.SCSVError("failed to parse any fields from terse SCSV schema")
229    if len(raw_colspecs) % 2 != 0:
230        raise _err.SCSVError("invalid field specifications in terse SCSV schema")
231
232    fields = []
233    for name, spec in it.batched(raw_colspecs, 2):
234        _spec = spec.split(":")
235        _type = _SCSV_DEFAULT_TYPE
236        if _spec[0] != "":
237            try:
238                _type = SCSV_TERSEMAP[_spec[0]]
239            except KeyError:
240                raise _err.SCSVError(
241                    f"invalid field type {_spec[0]} in terse SCSV schema"
242                ) from None
243        field = {
244            "name": name,
245            "type": _type,
246            "fill": _spec[1] if len(_spec) > 1 else _SCSV_DEFAULT_FILL,
247        }
248        if len(_spec) == 3:
249            field["unit"] = _spec[2]
250        fields.append(field)
251    return {"delimiter": delimiter, "missing": missing, "fields": fields}

Parse terse scsv schema representation and return the expanded schema.

The terse schema is useful for command line tools and can be specified in a single line of text. However, there are some limitations compared to using a Python dictionary, all of which are edge cases and not recommended usage:

  • the delimiter cannot be the character d or the character m
  • the missing data encoding cannot be the character m
  • fill values are not able to contain the colon (:) character
  • the arbitrary unit/comment for any field is not able to contain parentheses

The delimiter is specified after the letter d and the missing data encoding after m. These are succeeded by the column specs which are a sequence of column names (which must be valid Python identifiers) and their (optional) data type, missing data fill value, and unit/comment.

This function is only defined if the version of your Python interpreter is

greater than 3.11.x.

>>> #                delimiter
>>> #                | missing data encoding    column specifications
>>> #                | |  ______________________|______________________________
>>> #                v v /                                                     `
>>> schemastring = "d,m-:colA(s)colB(s:N/A:...)colC()colD(i:999999)colE(f:NaN:%)"
>>> schema = parse_scsv_schema(schemastring)
>>> schema["delimiter"]
','
>>> schema["missing"]
'-'
>>> schema["fields"][0]
{'name': 'colA', 'type': 'string', 'fill': ''}
>>> schema["fields"][1]
{'name': 'colB', 'type': 'string', 'fill': 'N/A', 'unit': '...'}
>>> schema["fields"][2]
{'name': 'colC', 'type': 'string', 'fill': ''}
>>> schema["fields"][3]
{'name': 'colD', 'type': 'integer', 'fill': '999999'}
>>> schema["fields"][4]
{'name': 'colE', 'type': 'float', 'fill': 'NaN', 'unit': '%'}
def read_scsv(file):
254def read_scsv(file):
255    """Read data from an SCSV file.
256
257    Prints the YAML header section to output and returns a NamedTuple with columns of
258    the csv data. See also `save_scsv`.
259
260    """
261    with open(resolve_path(file)) as fileref:
262        yaml_lines = []
263        csv_lines = []
264
265        is_yaml = False
266        for line in fileref:
267            if line == "\n":  # Empty lines are skipped.
268                continue
269            if line == "---\n":
270                if is_yaml:
271                    is_yaml = False  # Second --- ends YAML section.
272                    continue
273                else:
274                    is_yaml = True  # First --- begins YAML section.
275                    continue
276
277            if is_yaml:
278                yaml_lines.append(line)
279            else:
280                csv_lines.append(line)
281
282        metadata = yaml.safe_load(io.StringIO("".join(yaml_lines)))
283        schema = metadata["schema"]
284        if not _validate_scsv_schema(schema):
285            raise _err.SCSVError(
286                f"unable to parse SCSV schema from '{file}'."
287                + " Check logging output for details."
288            )
289        reader = csv.reader(
290            csv_lines, delimiter=schema["delimiter"], skipinitialspace=True
291        )
292
293        schema_colnames = [d["name"] for d in schema["fields"]]
294        header_colnames = [s.strip() for s in next(reader)]
295        if not schema_colnames == header_colnames:
296            raise _err.SCSVError(
297                f"schema field names must match column headers in '{file}'."
298                + f" You've supplied schema fields\n{schema_colnames}"
299                + f"\n with column headers\n{header_colnames}"
300            )
301
302        _log.info("reading SCSV file: %s", resolve_path(file))
303        Columns = c.namedtuple("Columns", schema_colnames)
304        # __dict__() and __slots__() of NamedTuples is empty :(
305        # Set up some pretty printing instead to give a quick view of column names.
306        Columns.__str__ = lambda self: f"Columns: {self._fields}"
307        Columns._repr_pretty_ = lambda self, p, _: p.text(f"Columns: {self._fields}")
308        # Also add some extra attributes to inspect the schema and yaml header.
309        Columns._schema = schema
310        Columns._metadata = (
311            "".join(yaml_lines)
312            .replace("# ", "")
313            .replace("-\n", "")
314            .replace("\n", " ")
315            .rsplit("schema:", maxsplit=1)[0]  # Assumes comments are above the schema.
316        )
317        coltypes = [
318            SCSV_TYPEMAP[d.get("type", _SCSV_DEFAULT_TYPE)] for d in schema["fields"]
319        ]
320        missingstr = schema["missing"]
321        fillvals = [d.get("fill", _SCSV_DEFAULT_FILL) for d in schema["fields"]]
322        return Columns._make(
323            [
324                tuple(
325                    map(
326                        ft.partial(
327                            _parse_scsv_cell, f, missingstr=missingstr, fillval=fill
328                        ),
329                        x,
330                    )
331                )
332                for f, fill, x in zip(
333                    coltypes, fillvals, zip(*list(reader), strict=True), strict=True
334                )
335            ]
336        )

Read data from an SCSV file.

Prints the YAML header section to output and returns a NamedTuple with columns of the csv data. See also save_scsv.

def write_scsv_header(stream, schema, comments=None):
339def write_scsv_header(stream, schema, comments=None):
340    """Write YAML header to an SCSV stream.
341
342    - `stream` — open output stream (e.g. file handle) where data should be written
343    - `schema` — SCSV schema dictionary, with 'delimiter', 'missing' and 'fields' keys
344    - `comments` (optional) — array of comments to be written above the schema, each on
345      a new line with an '#' prefix
346
347    See also `read_scsv`, `save_scsv`.
348
349    """
350    if not _validate_scsv_schema(schema):
351        raise _err.SCSVError(
352            "refusing to write invalid schema to stream."
353            + " Check logging output for details."
354        )
355
356    stream.write("---" + os.linesep)
357    if comments is not None:
358        for comment in comments:
359            stream.write("# " + comment + os.linesep)
360    stream.write("schema:" + os.linesep)
361    delimiter = schema["delimiter"]
362    missing = schema["missing"]
363    stream.write(f"  delimiter: '{delimiter}'{os.linesep}")
364    stream.write(f"  missing: '{missing}'{os.linesep}")
365    stream.write("  fields:" + os.linesep)
366
367    for field in schema["fields"]:
368        name = field["name"]
369        kind = field.get("type", _SCSV_DEFAULT_TYPE)
370        stream.write(f"    - name: {name}{os.linesep}")
371        stream.write(f"      type: {kind}{os.linesep}")
372        if "unit" in field:
373            unit = field["unit"]
374            stream.write(f"      unit: {unit}{os.linesep}")
375        if "fill" in field:
376            fill = field["fill"]
377            stream.write(f"      fill: {fill}{os.linesep}")
378    stream.write("---" + os.linesep)

Write YAML header to an SCSV stream.

  • stream — open output stream (e.g. file handle) where data should be written
  • schema — SCSV schema dictionary, with 'delimiter', 'missing' and 'fields' keys
  • comments (optional) — array of comments to be written above the schema, each on a new line with an '#' prefix

See also read_scsv, save_scsv.

def save_scsv(file, schema, data, **kwargs):
381def save_scsv(file, schema, data, **kwargs):
382    """Save data to SCSV file.
383
384    - `file` — path to the file where the data should be written
385    - `schema` — SCSV schema dictionary, with 'delimiter', 'missing' and 'fields' keys
386    - `data` — data arrays (columns) of equal length
387
388    Optional keyword arguments are passed to `write_scsv_header`. See also `read_scsv`.
389
390    """
391    path = resolve_path(file)
392    n_rows = len(data[0])
393    for col in data[1:]:
394        if len(col) != n_rows:
395            raise _err.SCSVError(
396                "refusing to write data columns of unequal length to SCSV file"
397            )
398
399    _log.info("writing to SCSV file: %s", file)
400    try:  # Check that the output is valid by attempting to parse.
401        with open(path, mode="w") as stream:
402            write_scsv_header(stream, schema, **kwargs)
403            fills = [
404                field.get("fill", _SCSV_DEFAULT_FILL) for field in schema["fields"]
405            ]
406            types = [
407                SCSV_TYPEMAP[field.get("type", _SCSV_DEFAULT_TYPE)]
408                for field in schema["fields"]
409            ]
410            names = [field["name"] for field in schema["fields"]]
411            writer = csv.writer(
412                stream, delimiter=schema["delimiter"], lineterminator=os.linesep
413            )
414            writer.writerow(names)
415
416            # No need for strict=True here since column lengths were already checked.
417            for col in zip(*data):
418                row = []
419                for i, (d, t, f) in enumerate(zip(col, types, fills, strict=True)):
420                    try:
421                        _parse_scsv_cell(
422                            t, str(d), missingstr=schema["missing"], fillval=f
423                        )
424                    except ValueError:
425                        raise _err.SCSVError(
426                            f"invalid data for column '{names[i]}'."
427                            + f" Cannot parse {d} as type '{t.__qualname__}'."
428                        ) from None
429                    if isinstance(t, bool):
430                        row.append(d)
431                    elif t in (float, complex):
432                        if np.isnan(d) and np.isnan(t(f)):
433                            row.append(schema["missing"])
434                        elif d == t(f):
435                            row.append(schema["missing"])
436                        else:
437                            row.append(d)
438                    elif t in (int, str) and d == t(f):
439                        row.append(schema["missing"])
440                    else:
441                        row.append(d)
442                writer.writerow(row)
443    except ValueError:
444        path.unlink(missing_ok=True)
445        raise _err.SCSVError(
446            "number of fields declared in schema does not match number of data columns."
447            + f" Declared schema fields were {names}; got {len(data)} data columns"
448        ) from None

Save data to SCSV file.

  • file — path to the file where the data should be written
  • schema — SCSV schema dictionary, with 'delimiter', 'missing' and 'fields' keys
  • data — data arrays (columns) of equal length

Optional keyword arguments are passed to write_scsv_header. See also read_scsv.

def parse_config(path):
451def parse_config(path):
452    """Parse a TOML file containing PyDRex configuration."""
453    path = resolve_path(path)
454    _log.info("parsing configuration file: %s", path)
455    with open(path, "rb") as file:
456        toml = tomllib.load(file)
457
458    # Use provided name or set randomized default.
459    toml["name"] = toml.get(
460        "name", f"pydrex.{np.random.default_rng().integers(1,1e10)}"
461    )
462
463    toml["parameters"] = _parse_config_params(toml)
464    _params = toml["parameters"]
465    toml["input"] = _parse_config_input_common(toml, path)
466    _input = toml["input"]
467
468    if "mesh" in _input:
469        # Input option 1: velocity gradient mesh + final particle locations.
470        _input = _parse_config_input_steadymesh(_input, path)
471    elif "velocity_gradient" in _input:
472        # Input option 2: velocity gradient callable + initial locations.
473        _input = _parse_config_input_calcpaths(_input, path)
474    elif "paths" in _input:
475        # Input option 3: NPZ or SCSV files with pre-computed input pathlines.
476        _input = _parse_config_input_postpaths(_input, path)
477    else:
478        _input["paths"] = None
479
480    # Output fields are optional, default: most data output, least logging output.
481    _output = toml.get("output", {})
482    if "directory" in _output:
483        _output["directory"] = resolve_path(_output["directory"], path.parent)
484    else:
485        _output["directory"] = resolve_path(pathlib.Path.cwd())
486
487    # Raw output means rotation matrices and grain volumes.
488    _parse_output_options(_output, "raw_output", _params["phase_assemblage"])
489    # Diagnostic output means texture diagnostics (strength, symmetry, mean angle).
490    _parse_output_options(_output, "diagnostics", _params["phase_assemblage"])
491    # Anisotropy output means hexagonal symmetry axis and ΔVp (%).
492    _output["anisotropy"] = _output.get(
493        "anisotropy", ["Voigt", "hexaxis", "moduli", "%decomp"]
494    )
495
496    # Optional SCSV or NPZ pathline outputs, not sensible if there are pathline inputs.
497    if "paths" in _input and "paths" in _output:
498        _log.warning(
499            "input pathlines and output pathline filenames are mutually exclusive;"
500            + " ignoring output pathline filenames"
501        )
502        _output["paths"] = None
503    _output["paths"] = _output.get("paths", None)
504
505    # Default logging level for all log files.
506    _output["log_level"] = _output.get("log_level", "WARNING")
507
508    return toml

Parse a TOML file containing PyDRex configuration.

def resolve_path(path, refdir=None):
511def resolve_path(path, refdir=None):
512    """Resolve relative paths and create parent directories if necessary.
513
514    Relative paths are interpreted with respect to the current working directory,
515    i.e. the directory from whith the current Python process was executed,
516    unless a specific reference directory is provided with `refdir`.
517
518    """
519    cwd = pathlib.Path.cwd()
520    if refdir is None:
521        _path = cwd / path
522    else:
523        _path = refdir / path
524    _path.parent.mkdir(parents=True, exist_ok=True)
525    return _path.resolve()

Resolve relative paths and create parent directories if necessary.

Relative paths are interpreted with respect to the current working directory, i.e. the directory from whith the current Python process was executed, unless a specific reference directory is provided with refdir.

def stringify(s):
756def stringify(s):
757    """Return a cleaned version of a string for use in filenames, etc."""
758    return "".join(filter(lambda c: str.isidentifier(c) or str.isdecimal(c), str(s)))

Return a cleaned version of a string for use in filenames, etc.

def data(directory):
761def data(directory):
762    """Get resolved path to a pydrex data directory."""
763    resources = files("pydrex.data")
764    if (resources / directory).is_dir():
765        return resolve_path(resources / directory)
766    else:
767        raise NotADirectoryError(f"{resources / directory} is not a directory")

Get resolved path to a pydrex data directory.

@cl.contextmanager
def logfile_enable(path, level=10, mode='w'):
770@cl.contextmanager
771def logfile_enable(path, level=logging.DEBUG, mode="w"):
772    """Enable logging to a file at `path` with given `level`."""
773    logger_file = logging.FileHandler(resolve_path(path), mode=mode)
774    logger_file.setFormatter(
775        logging.Formatter(
776            "%(levelname)s [%(asctime)s] %(name)s: %(message)s",
777            datefmt="%Y-%m-%d %H:%M:%S",
778        )
779    )
780    logger_file.setLevel(level)
781    _log.LOGGER.addHandler(logger_file)
782    yield
783    logger_file.close()

Enable logging to a file at path with given level.