PyDRex: Mesh, configuration and supporting data Input/Output functions.

PyDRex can read/write three kinds of plain text files:

  • PyDRex configuration files, which specify simulation parameters and initial conditions
  • 'SCSV' files, CSV files with YAML frontmatter for (small) scientific datasets
  • Mesh files via meshio, to set up final mineral positions in steady flows.

SCSV files are our custom CSV files with a YAML header. The header is used for data attribution and metadata, as well as a column type spec. There is no official spec for SCSV files at the moment but they should follow the format of existing SCSV files in the data/ folder of the source repository. For supported cell types, see SCSV_TYPEMAP.

"""> PyDRex: Mesh, configuration and supporting data Input/Output functions.

PyDRex can read/write three kinds of plain text files:
- PyDRex configuration files, which specify simulation parameters and initial conditions
- 'SCSV' files, CSV files with YAML frontmatter for (small) scientific datasets
- Mesh files via `meshio`, to set up final mineral positions in steady flows.

SCSV files are our custom CSV files with a YAML header. The header is used for data
attribution and metadata, as well as a column type spec. There is no official spec for
SCSV files at the moment but they should follow the format of existing SCSV files in
the `data/` folder of the source repository. For supported cell types, see
`SCSV_TYPEMAP`.

"""
 16import collections as c
 17import contextlib as cl
 18import csv
 19import functools as ft
 20import io
 21import itertools as it
 22import logging
 23import os
 24import pathlib
 25import re
 26import sys
 28if sys.version_info >= (3, 11):
 29    import tomllib
 31    import tomli as tomllib
 33from importlib.resources import files
 35import h5py
 36import meshio
 37import numpy as np
 38import yaml
 39from tqdm import tqdm
 41from pydrex import core as _core
 42from pydrex import exceptions as _err
 43from pydrex import logger as _log
 44from pydrex import utils as _utils
 45from pydrex import velocity as _velocity
 48    "string": str,
 49    "integer": int,
 50    "float": float,
 51    "boolean": bool,
 52    "complex": complex,
 54"""Mapping of supported SCSV field types to corresponding Python types."""
 57    "s": "string",
 58    "i": "integer",
 59    "f": "float",
 60    "b": "boolean",
 61    "c": "complex",
 63"""Mapping of supported terse format SCSV field types to their standard names."""
 65_SCSV_DEFAULT_TYPE = "string"
 69def extract_h5part(
 70    file, phase: _core.MineralPhase, fabric: _core.MineralFabric, n_grains: int, output
 72    """Extract CPO data from Fluidity h5part file and save to canonical formats."""
 73    from pydrex.minerals import Mineral
 75    with h5py.File(file, "r") as f:
 76        for particle_id in f["Step#0/id"][:]:
 77            # Fluidity writes empty arrays to the particle data after they are deleted.
 78            # We need only the timesteps before deletion of this particle.
 79            steps = []
 80            for k in sorted(list(f.keys()), key=lambda s: int(s.lstrip("Step#"))):
 81                if f[f"{k}/x"].shape[0] >= particle_id:
 82                    steps.append(k)
 84            # Temporary data arrays.
 85            n_timesteps = len(steps)
 86            x = np.zeros(n_timesteps)
 87            y = np.zeros(n_timesteps)
 88            z = np.zeros(n_timesteps)
 89            orientations = np.empty((n_timesteps, n_grains, 3, 3))
 90            fractions = np.empty((n_timesteps, n_grains))
 92            strains = np.zeros(n_timesteps)
 93            for t, k in enumerate(
 94                tqdm(steps, desc=f"Extracting particle {particle_id}")
 95            ):
 96                # Extract particle position.
 97                x[t] = f[f"{k}/x"][particle_id - 1]
 98                y[t] = f[f"{k}/y"][particle_id - 1]
 99                z[t] = f[f"{k}/z"][particle_id - 1]
101                # Extract CPO data.
102                strains[t] = f[f"{k}/CPO_{n_grains * 10 + 1}"][particle_id - 1]
103                vals = np.empty(n_grains * 10)
104                for n in range(len(vals)):
105                    vals[n] = f[f"{k}/CPO_{n+1}"][particle_id - 1]
107                orientations[t] = np.array(
108                    [
109                        np.reshape(vals[n : n + 9], (3, 3))
110                        for n in range(0, 9 * n_grains, 9)
111                    ]
112                )
113                fractions[t] = vals[9 * n_grains :]
115            _postfix = str(particle_id)
116            _fractions = list(fractions)
117            _orientations = list(orientations)
118            mineral = Mineral(
119                phase=phase,
120                fabric=fabric,
121                n_grains=n_grains,
122                fractions_init=_fractions[0],
123                orientations_init=_orientations[0],
124            )
125            mineral.fractions = _fractions
126            mineral.orientations = _orientations
127  , postfix=_postfix)
128            save_scsv(
129                output[:-4] + f"_{_postfix}" + ".scsv",
130                {
131                    "delimiter": ",",
132                    "missing": "-",
133                    "fields": [
134                        {
135                            "name": "strain",
136                            "type": "float",
137                            "unit": "percent",
138                            "fill": np.nan,
139                        },
140                        {
141                            "name": "x",
142                            "type": "float",
143                            "unit": "m",
144                            "fill": np.nan,
145                        },
146                        {
147                            "name": "y",
148                            "type": "float",
149                            "unit": "m",
150                            "fill": np.nan,
151                        },
152                        {
153                            "name": "z",
154                            "type": "float",
155                            "unit": "m",
156                            "fill": np.nan,
157                        },
158                    ],
159                },
160                [strains * 200, x, y, z],
161            )
@_utils.defined_if(sys.version_info >= (3, 12))
def parse_scsv_schema(terse_schema: str) -> dict:
    """Parse terse scsv schema representation and return the expanded schema.

    The terse schema is useful for command line tools and can be specified in a single
    line of text. However, there are some limitations compared to using a Python
    dictionary, all of which are edge cases and not recommended usage:
    - the delimiter cannot be the character `d` or the character `m`
    - the missing data encoding cannot be the character `m`
    - fill values are not able to contain the colon (`:`) character
    - the arbitrary unit/comment for any field is not able to contain parentheses

    The delimiter is specified after the letter `d` and the missing data encoding after
    `m`. These are succeeded by the column specs which are a sequence of column names
    (which must be valid Python identifiers) and their (optional) data type, missing
    data fill value, and unit/comment.

    Returns a dictionary with the keys 'delimiter', 'missing' and 'fields'.
    Raises `pydrex.exceptions.SCSVError` if the string cannot be parsed.

    .. note:: This function is only defined if the version of your Python interpreter is
        greater than 3.11.x.

    >>> #                delimiter
    >>> #                | missing data encoding    column specifications
    >>> #                | |  ______________________|______________________________
    >>> #                v v /                                                     `
    >>> schemastring = "d,m-:colA(s)colB(s:N/A:...)colC()colD(i:999999)colE(f:NaN:%)"
    >>> schema = parse_scsv_schema(schemastring)
    >>> schema["delimiter"]
    ','
    >>> schema["missing"]
    '-'
    >>> schema["fields"][0]
    {'name': 'colA', 'type': 'string', 'fill': ''}
    >>> schema["fields"][1]
    {'name': 'colB', 'type': 'string', 'fill': 'N/A', 'unit': '...'}
    >>> schema["fields"][2]
    {'name': 'colC', 'type': 'string', 'fill': ''}
    >>> schema["fields"][3]
    {'name': 'colD', 'type': 'integer', 'fill': '999999'}
    >>> schema["fields"][4]
    {'name': 'colE', 'type': 'float', 'fill': 'NaN', 'unit': '%'}

    """
    if not terse_schema.startswith("d"):
        raise _err.SCSVError(
            "terse schema must start with delimiter specification (format: d<delimiter>)"
        )
    # The first colon separates the "d<delimiter>m<missing>" preamble from the column
    # specs; the shortest valid preamble ("d,m-") puts it at index 4.
    i_cols = terse_schema.find(":")
    if i_cols < 4:
        raise _err.SCSVError(
            "could not parse missing data encoding from terse SCSV schema"
        )
    i_missing = terse_schema.find("m", 0, i_cols)
    if i_missing < 2:
        raise _err.SCSVError(
            "could not parse missing data encoding from terse SCSV schema"
        )

    delimiter = terse_schema[1:i_missing]
    missing = terse_schema[i_missing + 1 : i_cols]

    # Splitting on parentheses yields alternating column names and spec strings.
    raw_colspecs = re.split(r"\(|\)", terse_schema[i_cols + 1 :])
    raw_colspecs.pop()  # Get rid of additional last empty string element.
    if len(raw_colspecs) < 2:
        raise _err.SCSVError("failed to parse any fields from terse SCSV schema")
    if len(raw_colspecs) % 2 != 0:
        raise _err.SCSVError("invalid field specifications in terse SCSV schema")

    fields = []
    for name, spec in it.batched(raw_colspecs, 2):
        # A spec is "<type>[:<fill>[:<unit>]]", with every part optional.
        _spec = spec.split(":")
        _type = _SCSV_DEFAULT_TYPE
        if _spec[0] != "":
            try:
                _type = SCSV_TERSEMAP[_spec[0]]
            except KeyError:
                raise _err.SCSVError(
                    f"invalid field type {_spec[0]} in terse SCSV schema"
                ) from None
        field = {
            "name": name,
            "type": _type,
            "fill": _spec[1] if len(_spec) > 1 else _SCSV_DEFAULT_FILL,
        }
        if len(_spec) == 3:
            field["unit"] = _spec[2]
        fields.append(field)
    return {"delimiter": delimiter, "missing": missing, "fields": fields}
def read_scsv(file):
    """Read data from an SCSV file.

    Logs the resolved file path and returns a NamedTuple with columns of the csv
    data. The parsed schema is attached to the returned tuple's class as `_schema`
    and the flattened header comments as `_metadata`. See also `save_scsv`.

    Raises `pydrex.exceptions.SCSVError` if the YAML header has no valid schema, if
    the column header row is missing, or if it does not match the schema field names.

    """
    with open(resolve_path(file)) as fileref:
        yaml_lines = []
        csv_lines = []

        # Lines between the first and second "---" markers form the YAML header,
        # everything else is CSV content.
        is_yaml = False
        for line in fileref:
            if line == "\n":  # Empty lines are skipped.
                continue
            if line == "---\n":
                is_yaml = not is_yaml
                continue
            if is_yaml:
                yaml_lines.append(line)
            else:
                csv_lines.append(line)

        metadata = yaml.safe_load(io.StringIO("".join(yaml_lines)))
        if not isinstance(metadata, dict) or "schema" not in metadata:
            raise _err.SCSVError(
                f"no YAML header with a 'schema' section found in '{file}'"
            )
        schema = metadata["schema"]
        if not _validate_scsv_schema(schema):
            raise _err.SCSVError(
                f"unable to parse SCSV schema from '{file}'."
                + " Check logging output for details."
            )
        reader = csv.reader(
            csv_lines, delimiter=schema["delimiter"], skipinitialspace=True
        )

        schema_colnames = [d["name"] for d in schema["fields"]]
        try:
            header_colnames = [s.strip() for s in next(reader)]
        except StopIteration:
            raise _err.SCSVError(f"no column header row found in '{file}'") from None
        if not schema_colnames == header_colnames:
            raise _err.SCSVError(
                f"schema field names must match column headers in '{file}'."
                + f" You've supplied schema fields\n{schema_colnames}"
                + f"\n with column headers\n{header_colnames}"
            )

        # NOTE(review): the call target was lost from the recovered source; restored
        # as an info-level log call — confirm the intended level.
        _log.info("reading SCSV file: %s", resolve_path(file))
        Columns = c.namedtuple("Columns", schema_colnames)
        # __dict__() and __slots__() of NamedTuples is empty :(
        # Set up some pretty printing instead to give a quick view of column names.
        Columns.__str__ = lambda self: f"Columns: {self._fields}"
        Columns._repr_pretty_ = lambda self, p, _: p.text(f"Columns: {self._fields}")
        # Also add some extra attributes to inspect the schema and yaml header.
        Columns._schema = schema
        Columns._metadata = (
            "".join(yaml_lines)
            .replace("# ", "")
            .replace("-\n", "")
            .replace("\n", " ")
            .rsplit("schema:", maxsplit=1)[0]  # Assumes comments are above the schema.
        )
        coltypes = [
            SCSV_TYPEMAP[d.get("type", _SCSV_DEFAULT_TYPE)] for d in schema["fields"]
        ]
        missingstr = schema["missing"]
        fillvals = [d.get("fill", _SCSV_DEFAULT_FILL) for d in schema["fields"]]

        # Transpose rows into columns; strict zips reject ragged rows and any
        # mismatch between the number of columns and the number of schema fields.
        columns = zip(*list(reader), strict=True)
        return Columns._make(
            [
                tuple(
                    _parse_scsv_cell(
                        coltype, cell, missingstr=missingstr, fillval=fill
                    )
                    for cell in column
                )
                for coltype, fill, column in zip(
                    coltypes, fillvals, columns, strict=True
                )
            ]
        )
def write_scsv_header(stream, schema, comments=None):
    """Write YAML header to an SCSV stream.

    - `stream` — open output stream (e.g. file handle) where data should be written
    - `schema` — SCSV schema dictionary, with 'delimiter', 'missing' and 'fields' keys
    - `comments` (optional) — array of comments to be written above the schema, each on
      a new line with an '#' prefix

    Raises `pydrex.exceptions.SCSVError` if the schema does not validate.
    See also `read_scsv`, `save_scsv`.

    """
    if not _validate_scsv_schema(schema):
        raise _err.SCSVError(
            "refusing to write invalid schema to stream."
            + " Check logging output for details."
        )

    # Lines end with "\n" rather than os.linesep: text-mode streams already translate
    # "\n" to the platform line ending, so os.linesep would give "\r\r\n" on Windows.
    lines = ["---\n"]
    if comments is not None:
        lines.extend(f"# {comment}\n" for comment in comments)
    delimiter = schema["delimiter"]
    missing = schema["missing"]
    lines.append("schema:\n")
    lines.append(f"  delimiter: '{delimiter}'\n")
    lines.append(f"  missing: '{missing}'\n")
    lines.append("  fields:\n")

    for field in schema["fields"]:
        name = field["name"]
        kind = field.get("type", _SCSV_DEFAULT_TYPE)
        lines.append(f"    - name: {name}\n")
        lines.append(f"      type: {kind}\n")
        if "unit" in field:
            unit = field["unit"]
            lines.append(f"      unit: {unit}\n")
        if "fill" in field:
            fill = field["fill"]
            lines.append(f"      fill: {fill}\n")
    lines.append("---\n")
    stream.writelines(lines)
def save_scsv(file, schema, data, **kwargs):
    """Save data to SCSV file.

    - `file` — path to the file where the data should be written
    - `schema` — SCSV schema dictionary, with 'delimiter', 'missing' and 'fields' keys
    - `data` — data arrays (columns) of equal length

    Optional keyword arguments are passed to `write_scsv_header`. See also `read_scsv`.

    Values equal to a column's fill value (or NaN, for float and complex columns
    whose fill is NaN) are written as the schema's 'missing' string.

    Raises `pydrex.exceptions.SCSVError` if the columns have unequal length, if a
    value cannot be parsed as its declared column type, or if the number of columns
    does not match the number of schema fields; in the last case the partially
    written file is removed.

    """
    path = resolve_path(file)
    n_rows = len(data[0])
    for col in data[1:]:
        if len(col) != n_rows:
            raise _err.SCSVError(
                "refusing to write data columns of unequal length to SCSV file"
            )

    # NOTE(review): the call target was lost from the recovered source; restored
    # as an info-level log call — confirm the intended level.
    _log.info("writing to SCSV file: %s", file)
    try:  # Check that the output is valid by attempting to parse.
        with open(path, mode="w") as stream:
            write_scsv_header(stream, schema, **kwargs)
            fills = [
                field.get("fill", _SCSV_DEFAULT_FILL) for field in schema["fields"]
            ]
            types = [
                SCSV_TYPEMAP[field.get("type", _SCSV_DEFAULT_TYPE)]
                for field in schema["fields"]
            ]
            names = [field["name"] for field in schema["fields"]]
            missing = schema["missing"]
            # "\n" rather than os.linesep: the text-mode file translates "\n" to the
            # platform line ending itself.
            writer = csv.writer(
                stream, delimiter=schema["delimiter"], lineterminator="\n"
            )
            writer.writerow(names)

            # No need for strict=True here since column lengths were already checked.
            for record in zip(*data):
                row = []
                # strict=True raises ValueError when the number of data columns and
                # schema fields differ; that is converted to SCSVError below.
                for i, (d, t, f) in enumerate(zip(record, types, fills, strict=True)):
                    try:
                        _parse_scsv_cell(t, str(d), missingstr=missing, fillval=f)
                    except ValueError:
                        raise _err.SCSVError(
                            f"invalid data for column '{names[i]}'."
                            + f" Cannot parse {d} as type '{t.__qualname__}'."
                        ) from None
                    if t is bool:
                        # Booleans have no missing-value encoding.
                        row.append(d)
                    elif t in (float, complex):
                        # NaN never compares equal, so it is checked separately.
                        if np.isnan(d) and np.isnan(t(f)):
                            row.append(missing)
                        elif d == t(f):
                            row.append(missing)
                        else:
                            row.append(d)
                    elif t in (int, str) and d == t(f):
                        row.append(missing)
                    else:
                        row.append(d)
                writer.writerow(row)
    except ValueError:
        path.unlink(missing_ok=True)
        raise _err.SCSVError(
            "number of fields declared in schema does not match number of data columns."
            + f" Declared schema fields were {names}; got {len(data)} data columns"
        ) from None
def parse_config(path):
    """Parse a TOML file containing PyDRex configuration.

    Returns the parsed TOML dictionary with the 'name', 'parameters', 'input' and
    'output' entries validated and filled in with defaults. Relative paths inside the
    file are resolved with respect to the directory that contains it.

    """
    path = resolve_path(path)
    # NOTE(review): the call target was lost from the recovered source; restored
    # as an info-level log call — confirm the intended level.
    _log.info("parsing configuration file: %s", path)
    with open(path, "rb") as file:
        toml = tomllib.load(file)

    # Use provided name or set randomized default.
    toml["name"] = toml.get(
        "name", f"pydrex.{np.random.default_rng().integers(1,1e10)}"
    )

    toml["parameters"] = _parse_config_params(toml)
    _params = toml["parameters"]
    toml["input"] = _parse_config_input_common(toml, path)
    _input = toml["input"]

    if "mesh" in _input:
        # Input option 1: velocity gradient mesh + final particle locations.
        _input = _parse_config_input_steadymesh(_input, path)
    elif "velocity_gradient" in _input:
        # Input option 2: velocity gradient callable + initial locations.
        _input = _parse_config_input_calcpaths(_input, path)
    elif "paths" in _input:
        # Input option 3: NPZ or SCSV files with pre-computed input pathlines.
        _input = _parse_config_input_postpaths(_input, path)
    else:
        _input["paths"] = None

    # Output fields are optional, default: most data output, least logging output.
    # setdefault stores a defaulted table on the returned config, not just locally.
    _output = toml.setdefault("output", {})
    if "directory" in _output:
        _output["directory"] = resolve_path(_output["directory"], path.parent)
    else:
        _output["directory"] = resolve_path(pathlib.Path.cwd())

    # Raw output means rotation matrices and grain volumes.
    _parse_output_options(_output, "raw_output", _params["phase_assemblage"])
    # Diagnostic output means texture diagnostics (strength, symmetry, mean angle).
    _parse_output_options(_output, "diagnostics", _params["phase_assemblage"])
    # Anisotropy output means hexagonal symmetry axis and ΔVp (%).
    _output["anisotropy"] = _output.get(
        "anisotropy", ["Voigt", "hexaxis", "moduli", "%decomp"]
    )

    # Optional SCSV or NPZ pathline outputs, not sensible if there are pathline inputs.
    # Every input branch above sets the "paths" key (to None when unused), so the
    # value has to be tested rather than the mere presence of the key.
    if _input.get("paths") is not None and "paths" in _output:
        _log.warning(
            "input pathlines and output pathline filenames are mutually exclusive;"
            + " ignoring output pathline filenames"
        )
        _output["paths"] = None
    _output["paths"] = _output.get("paths", None)

    # Default logging level for all log files.
    _output["log_level"] = _output.get("log_level", "WARNING")

    return toml
def resolve_path(path, refdir=None):
    """Resolve relative paths and create parent directories if necessary.

    Relative paths are interpreted with respect to the current working directory,
    i.e. the directory from which the current Python process was executed,
    unless a specific reference directory is provided with `refdir`.

    - `path` — string or `pathlib.Path` to resolve; absolute paths are kept as they
      are, apart from being resolved
    - `refdir` (optional) — string or `pathlib.Path` used as the base directory for
      relative paths

    Returns the resolved `pathlib.Path`. All missing parent directories of the
    result are created as a side effect.

    """
    # Coerce to Path so that a plain string reference directory also works.
    base = pathlib.Path.cwd() if refdir is None else pathlib.Path(refdir)
    _path = base / path
    _path.parent.mkdir(parents=True, exist_ok=True)
    return _path.resolve()
def _parse_config_params(toml):
    """Parse DRex and other rheology parameters.

    Fills every parameter that is missing from the `parameters` table with the value
    from `pydrex.core.DefaultParams`, then validates the phase fractions, the phase
    assemblage, the initial olivine fabric and the dislocation creep coefficients.
    Raises `pydrex.exceptions.ConfigError` for invalid values.

    """
    defaults = _core.DefaultParams()
    _params = toml.get("parameters", {})
    for key, default in defaults.as_dict().items():
        _params.setdefault(key, default)

    # Make sure volume fractions sum to 1, allowing for floating point round-off
    # (an absolute tolerance below machine epsilon would demand exact equality).
    if not np.isclose(np.sum(_params["phase_fractions"]), 1.0, rtol=0.0, atol=1e-12):
        raise _err.ConfigError(
            "Volume fractions of mineral phases must sum to 1."
            + f" You've provided phase_fractions = {_params['phase_fractions']}."
        )

    # Make sure all mineral phases are accounted for and valid.
    if len(_params["phase_assemblage"]) != len(_params["phase_fractions"]):
        raise _err.ConfigError(
            "All mineral phases must have an associated volume fraction."
            + f" You've provided phase_assemblage = {_params['phase_assemblage']} and"
            + f" phase_fractions = {_params['phase_fractions']}."
        )
    try:
        _params["phase_assemblage"] = tuple(
            _parse_phase(ϕ) for ϕ in _params["phase_assemblage"]
        )
    except AttributeError:
        raise _err.ConfigError(
            f"invalid phase assemblage: {_params['phase_assemblage']}"
        ) from None

    # Make sure initial olivine fabric is valid.
    try:
        _params["initial_olivine_fabric"] = getattr(
            _core.MineralFabric, "olivine_" + _params["initial_olivine_fabric"]
        )
    except AttributeError:
        raise _err.ConfigError(
            f"invalid initial olivine fabric: {_params['initial_olivine_fabric']}"
        ) from None

    # Make sure we have exactly as many unified dislocation creep law coefficients
    # as the defaults define.
    n_provided = len(_params["disl_coefficients"])
    n_required = len(defaults.disl_coefficients)
    if n_provided != n_required:
        raise _err.ConfigError(
            "wrong number of unified dislocation creep law coefficients."
            + f" You've provided {n_provided}/{n_required} coefficients."
        )
    _params["disl_coefficients"] = tuple(_params["disl_coefficients"])

    return _params
579def _parse_config_input_common(toml, path):
580    try:
581        _input = toml["input"]
582    except KeyError:
583        raise _err.ConfigError(f"missing [input] section in '{path}'") from None
584    if "timestep" not in _input and "paths" not in _input:
585        raise _err.ConfigError(f"unspecified input timestep in '{path}'")
587    _input["timestep"] = _input.get("timestep", np.nan)
588    if not isinstance(_input["timestep"], float | int):
589        raise _err.ConfigError(
590            f"timestep must be float or int, not {type(input['timestep'])}"
591        )
593    _input["strain_final"] = _input.get("strain_final", np.inf)
594    if not isinstance(_input["strain_final"], float | int):
595        raise _err.ConfigError(
596            f"final strain must be float or int, not {type(input['strain_final'])}"
597        )
599    return _input
def _parse_config_input_steadymesh(input, path):
    """Parse input options for a steady-state mesh with final particle locations.

    Reads the mesh file and the SCSV file of final locations, both resolved relative
    to the directory of the configuration file at `path`. Options that belong to the
    other input modes are set to None, with a warning if they were supplied.

    """
    # NOTE(review): this call was truncated in the recovered source and has been
    # restored from its surviving `["mesh"], path.parent))` tail — confirm.
    input["mesh"] = meshio.read(resolve_path(input["mesh"], path.parent))
    input["locations_final"] = read_scsv(
        resolve_path(input["locations_final"], path.parent)
    )
    if "velocity_gradient" in input:
        _log.warning(
            "input mesh and velocity gradient callable are mutually exclusive;"
            + " ignoring velocity gradient callable"
        )
    if "locations_initial" in input:
        _log.warning(
            "initial particle locations are not used for pathline interpolation"
            + " and will be ignored"
        )
    if "paths" in input:
        _log.warning(
            "input mesh and input pathlines are mutually exclusive;"
            + " ignoring input pathlines"
        )
    input["velocity_gradient"] = None
    input["locations_initial"] = None
    input["paths"] = None
    return input
def _parse_config_input_calcpaths(input, path):
    """Parse input options for a velocity gradient callable with initial locations.

    The first entry of `input["velocity_gradient"]` names a factory function in
    `pydrex.velocity`, which is called with the remaining entries as positional
    arguments. Initial locations are read from an SCSV file resolved relative to the
    directory of the configuration file at `path`. Options that belong to the other
    input modes are set to None, with a warning if they were supplied.

    """
    factory_name, *factory_args = input["velocity_gradient"]
    input["velocity_gradient"] = getattr(_velocity, factory_name)(*factory_args)
    input["locations_initial"] = read_scsv(
        resolve_path(input["locations_initial"], path.parent)
    )
    if "locations_final" in input:
        _log.warning(
            "final particle locations are not used for forward advection"
            + " and will be ignored"
        )
    if "paths" in input:
        _log.warning(
            "velocity gradient callable and input pathlines are mutually exclusive;"
            + " ignoring input pathlines"
        )
    input["locations_final"] = None
    input["paths"] = None
    input["mesh"] = None
    return input
def _parse_config_input_postpaths(input, path):
    """Parse input options for pre-computed pathline files.

    Every entry of `input["paths"]` is resolved relative to the directory of the
    configuration file at `path` and loaded with `numpy.load`. Options that belong to
    the other input modes are set to None, with a warning if they were supplied.

    """
    input["paths"] = [np.load(resolve_path(p, path.parent)) for p in input["paths"]]
    if "locations_initial" in input:
        _log.warning(
            "input pathlines and initial particle locations are mutually exclusive;"
            + " ignoring initial particle locations"
        )
    if "locations_final" in input:
        _log.warning(
            "input pathlines and final particle locations are mutually exclusive;"
            + " ignoring final particle locations"
        )
    input["locations_initial"] = None
    input["locations_final"] = None
    input["mesh"] = None
    return input
def _parse_output_options(output_opts, level, phase_assemblage):
    """Convert the phase names of one output option to `MineralPhase` members.

    - `output_opts` — output options dictionary, modified in place
    - `level` — key of the option to convert (e.g. 'raw_output' or 'diagnostics')
    - `phase_assemblage` — phases that are being simulated

    Raises `pydrex.exceptions.ConfigError` if a name is not an attribute of
    `pydrex.core.MineralPhase` or if a phase is not part of `phase_assemblage`.

    """
    try:
        output_opts[level] = [
            getattr(_core.MineralPhase, ϕ) for ϕ in output_opts[level]
        ]
    except AttributeError:
        raise _err.ConfigError(
            f"unsupported mineral phase in '{level}' output option.\n"
            + f" You supplied the value: {output_opts[level]}.\n"
            + " Check pydrex.core.MineralPhase for supported phases."
        ) from None
    if any(phase not in phase_assemblage for phase in output_opts[level]):
        raise _err.ConfigError(
            f"cannot output '{level}' for phase that is not being simulated"
        )
def _parse_phase(ϕ: str | _core.MineralPhase | int) -> _core.MineralPhase:
    """Convert a phase given by name, member or integer value to a `MineralPhase`.

    Raises `pydrex.exceptions.ConfigError` if the value does not identify a phase.

    """
    if isinstance(ϕ, str):
        try:
            return getattr(_core.MineralPhase, ϕ)
        except AttributeError:
            raise _err.ConfigError(f"invalid phase in phase assemblage: {ϕ}") from None
    elif isinstance(ϕ, _core.MineralPhase):
        return ϕ
    elif isinstance(ϕ, int):
        try:
            return _core.MineralPhase(ϕ)
        # NOTE(review): a by-value lookup on an Enum raises ValueError, so both are
        # caught; presumably MineralPhase is an Enum — confirm against pydrex.core.
        except (IndexError, ValueError):
            raise _err.ConfigError(f"invalid phase in phase assemblage: {ϕ}") from None
    raise _err.ConfigError(f"invalid phase in phase assemblage: {ϕ}")
def _validate_scsv_schema(schema):
    """Check an SCSV schema dictionary, logging the reason for any failure.

    Returns True if the schema has 'delimiter', 'missing' and at least one entry in
    'fields', if the delimiter does not occur in the missing data encoding, and if
    every field has a valid Python identifier as its name, a supported type, and a
    fill value whenever its type is neither the default type nor 'boolean'.
    Returns False otherwise.

    """
    format_ok = (
        "delimiter" in schema
        and "missing" in schema
        and "fields" in schema
        and len(schema["fields"]) > 0
        and schema["delimiter"] != schema["missing"]
        and schema["delimiter"] not in schema["missing"]
    )
    if not format_ok:
        _log.error(
            "invalid format for SCSV schema: %s"
            + "\nMust contain: 'delimiter', 'missing', 'fields'"
            + "\nMust contain at least one field."
            + "\nMust contain compatible 'missing' and 'delimiter' values.",
            schema,
        )
        return False
    for field in schema["fields"]:
        # A missing or non-string name is reported as invalid instead of raising.
        name = field.get("name")
        if not (isinstance(name, str) and name.isidentifier()):
            _log.error("SCSV field name '%s' is not a valid Python identifier", name)
            return False
        kind = field.get("type", _SCSV_DEFAULT_TYPE)
        if kind not in SCSV_TYPEMAP:
            _log.error("unsupported SCSV field type: '%s'", kind)
            return False
        if kind not in (_SCSV_DEFAULT_TYPE, "boolean") and "fill" not in field:
            _log.error("SCSV field of type '%s' requires a fill value", kind)
            return False
    return True
740def _parse_scsv_bool(x):
741    """Parse boolean from string, for SCSV files."""
742    return str(x).lower() in ("yes", "true", "t", "1")
745def _parse_scsv_cell(func, data, missingstr=None, fillval=None):
746    if data.strip() == missingstr:
747        if fillval == "NaN":
748            return func(np.nan)
749        return func(fillval)
750    elif func.__qualname__ == "bool":
751        return _parse_scsv_bool(data)
752    return func(data.strip())
def stringify(s):
    """Return a cleaned version of a string for use in filenames, etc.

    The argument is converted with `str`, and only the characters that are decimal
    digits or valid single-character Python identifiers (letters and the
    underscore) are kept.

    """
    return "".join(
        char for char in str(s) if str.isidentifier(char) or str.isdecimal(char)
    )
def data(directory):
    """Get resolved path to a pydrex data directory.

    - `directory` — name of the directory inside the packaged data resources

    Raises `NotADirectoryError` if no such directory exists.

    """
    # NOTE(review): the package name was lost from the recovered source (it read
    # `files("")`); "pydrex.data" is inferred from the docstring — confirm.
    resources = files("pydrex.data")
    target = resources / directory
    if target.is_dir():
        return resolve_path(target)
    raise NotADirectoryError(f"{target} is not a directory")
@cl.contextmanager
def logfile_enable(path, level=logging.DEBUG, mode="w"):
    """Enable logging to a file at `path` with given `level`.

    Context manager: the file handler is attached to the PyDRex logger on entry,
    then detached and closed on exit, also when the managed block raises.

    - `path` — log file path, resolved with `resolve_path`
    - `level` — logging level for the file handler
    - `mode` — mode used to open the log file

    """
    logger_file = logging.FileHandler(resolve_path(path), mode=mode)
    logger_file.setFormatter(
        logging.Formatter(
            "%(levelname)s [%(asctime)s] %(name)s: %(message)s",
            datefmt="%Y-%m-%d %H:%M:%S",
        )
    )
    logger_file.setLevel(level)
    _log.LOGGER.addHandler(logger_file)
    try:
        yield
    finally:
        # Detach before closing so the logger does not keep a reference to a closed
        # handler (which could otherwise reopen the file for later records).
        _log.LOGGER.removeHandler(logger_file)
        logger_file.close()
