pydrex.io
PyDRex: Mesh, configuration and supporting data Input/Output functions.

PyDRex can read/write three kinds of plain text files:

- PyDRex configuration files, which specify simulation parameters and initial conditions
- 'SCSV' files, CSV files with YAML frontmatter for (small) scientific datasets
- Mesh files via `meshio`, to set up final mineral positions in steady flows.

SCSV files are our custom CSV files with a YAML header. The header is used for data
attribution and metadata, as well as a column type spec. There is no official spec for
SCSV files at the moment but they should follow the format of existing SCSV files in
the `data/` folder of the source repository. For supported cell types, see
`SCSV_TYPEMAP`.
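
To make the layout concrete, a minimal SCSV file might look like the sketch below (the comment, column names and values are invented for illustration): a YAML header delimited by `---` lines and containing the schema, followed by the CSV body.

```
---
# Example comment attributing the (hypothetical) dataset.
schema:
  delimiter: ','
  missing: '-'
  fields:
    - name: strain
      type: float
      unit: percent
      fill: NaN
    - name: x
      type: float
      unit: m
      fill: NaN
---
strain,x
50.0,0.1
-,0.2
```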
1"""> PyDRex: Mesh, configuration and supporting data Input/Output functions. 2 3PyDRex can read/write three kinds of plain text files: 4- PyDRex configuration files, which specify simulation parameters and initial conditions 5- 'SCSV' files, CSV files with YAML frontmatter for (small) scientific datasets 6- Mesh files via `meshio`, to set up final mineral positions in steady flows. 7 8SCSV files are our custom CSV files with a YAML header. The header is used for data 9attribution and metadata, as well as a column type spec. There is no official spec for 10SCSV files at the moment but they should follow the format of existing SCSV files in 11the `data/` folder of the source repository. For supported cell types, see 12`SCSV_TYPEMAP`. 13 14""" 15 16import collections as c 17import contextlib as cl 18import csv 19import functools as ft 20import io 21import itertools as it 22import logging 23import os 24import pathlib 25import re 26import sys 27 28if sys.version_info >= (3, 11): 29 import tomllib 30else: 31 import tomli as tomllib 32 33from importlib.resources import files 34 35import h5py 36import meshio 37import numpy as np 38import yaml 39from tqdm import tqdm 40 41from pydrex import core as _core 42from pydrex import exceptions as _err 43from pydrex import logger as _log 44from pydrex import utils as _utils 45from pydrex import velocity as _velocity 46 47SCSV_TYPEMAP = { 48 "string": str, 49 "integer": int, 50 "float": float, 51 "boolean": bool, 52 "complex": complex, 53} 54"""Mapping of supported SCSV field types to corresponding Python types.""" 55 56SCSV_TERSEMAP = { 57 "s": "string", 58 "i": "integer", 59 "f": "float", 60 "b": "boolean", 61 "c": "complex", 62} 63"""Mapping of supported terse format SCSV field types to their standard names.""" 64 65_SCSV_DEFAULT_TYPE = "string" 66_SCSV_DEFAULT_FILL = "" 67 68 69def extract_h5part( 70 file, phase: _core.MineralPhase, fabric: _core.MineralFabric, n_grains: int, output 71): 72 """Extract CPO data from Fluidity h5part file and save to canonical formats.""" 73 from pydrex.minerals import Mineral 74 75 with h5py.File(file, "r") as f: 76 for particle_id in f["Step#0/id"][:]: 77 # Fluidity writes empty arrays to the particle data after they are deleted. 78 # We need only the timesteps before deletion of this particle. 79 steps = [] 80 for k in sorted(list(f.keys()), key=lambda s: int(s.lstrip("Step#"))): 81 if f[f"{k}/x"].shape[0] >= particle_id: 82 steps.append(k) 83 84 # Temporary data arrays. 85 n_timesteps = len(steps) 86 x = np.zeros(n_timesteps) 87 y = np.zeros(n_timesteps) 88 z = np.zeros(n_timesteps) 89 orientations = np.empty((n_timesteps, n_grains, 3, 3)) 90 fractions = np.empty((n_timesteps, n_grains)) 91 92 strains = np.zeros(n_timesteps) 93 for t, k in enumerate( 94 tqdm(steps, desc=f"Extracting particle {particle_id}") 95 ): 96 # Extract particle position. 97 x[t] = f[f"{k}/x"][particle_id - 1] 98 y[t] = f[f"{k}/y"][particle_id - 1] 99 z[t] = f[f"{k}/z"][particle_id - 1] 100 101 # Extract CPO data. 
102 strains[t] = f[f"{k}/CPO_{n_grains * 10 + 1}"][particle_id - 1] 103 vals = np.empty(n_grains * 10) 104 for n in range(len(vals)): 105 vals[n] = f[f"{k}/CPO_{n+1}"][particle_id - 1] 106 107 orientations[t] = np.array( 108 [ 109 np.reshape(vals[n : n + 9], (3, 3)) 110 for n in range(0, 9 * n_grains, 9) 111 ] 112 ) 113 fractions[t] = vals[9 * n_grains :] 114 115 _postfix = str(particle_id) 116 _fractions = list(fractions) 117 _orientations = list(orientations) 118 mineral = Mineral( 119 phase=phase, 120 fabric=fabric, 121 n_grains=n_grains, 122 fractions_init=_fractions[0], 123 orientations_init=_orientations[0], 124 ) 125 mineral.fractions = _fractions 126 mineral.orientations = _orientations 127 mineral.save(output, postfix=_postfix) 128 save_scsv( 129 output[:-4] + f"_{_postfix}" + ".scsv", 130 { 131 "delimiter": ",", 132 "missing": "-", 133 "fields": [ 134 { 135 "name": "strain", 136 "type": "float", 137 "unit": "percent", 138 "fill": np.nan, 139 }, 140 { 141 "name": "x", 142 "type": "float", 143 "unit": "m", 144 "fill": np.nan, 145 }, 146 { 147 "name": "y", 148 "type": "float", 149 "unit": "m", 150 "fill": np.nan, 151 }, 152 { 153 "name": "z", 154 "type": "float", 155 "unit": "m", 156 "fill": np.nan, 157 }, 158 ], 159 }, 160 [strains * 200, x, y, z], 161 ) 162 163 164@_utils.defined_if(sys.version_info >= (3, 12)) 165def parse_scsv_schema(terse_schema: str) -> dict: 166 """Parse terse scsv schema representation and return the expanded schema. 167 168 The terse schema is useful for command line tools and can be specified in a single 169 line of text. However, there are some limitations compared to using a Python 170 dictionary, all of which are edge cases and not recommended usage: 171 - the delimiter cannot be the character `d` or the character `m` 172 - the missing data encoding cannot be the character `m` 173 - fill values are not able to contain the colon (`:`) character 174 - the arbitrary unit/comment for any field is not able to contain parentheses 175 176 The delimiter is specified after the letter `d` and the missing data encoding after 177 `m`. These are succeeded by the column specs which are a sequence of column names 178 (which must be valid Python identifiers) and their (optional) data type, missing 179 data fill value, and unit/comment. 180 181 .. note:: This function is only defined if the version of your Python interpreter is 182 greater than 3.11.x. 
183 184 >>> # delimiter 185 >>> # | missing data encoding column specifications 186 >>> # | | ______________________|______________________________ 187 >>> # v v / ` 188 >>> schemastring = "d,m-:colA(s)colB(s:N/A:...)colC()colD(i:999999)colE(f:NaN:%)" 189 >>> schema = parse_scsv_schema(schemastring) 190 >>> schema["delimiter"] 191 ',' 192 >>> schema["missing"] 193 '-' 194 >>> schema["fields"][0] 195 {'name': 'colA', 'type': 'string', 'fill': ''} 196 >>> schema["fields"][1] 197 {'name': 'colB', 'type': 'string', 'fill': 'N/A', 'unit': '...'} 198 >>> schema["fields"][2] 199 {'name': 'colC', 'type': 'string', 'fill': ''} 200 >>> schema["fields"][3] 201 {'name': 'colD', 'type': 'integer', 'fill': '999999'} 202 >>> schema["fields"][4] 203 {'name': 'colE', 'type': 'float', 'fill': 'NaN', 'unit': '%'} 204 205 """ 206 if not terse_schema.startswith("d"): 207 raise _err.SCSVError( 208 "terse schema must start with delimiter specification (format: d<delimiter>)" 209 ) 210 i_cols = terse_schema.find(":") 211 if i_cols < 4: 212 raise _err.SCSVError( 213 "could not parse missing data encoding from terse SCSV schema" 214 ) 215 i_missing = terse_schema.find("m", 0, i_cols) 216 if i_missing < 2: 217 raise _err.SCSVError( 218 "could not parse missing data encoding from terse SCSV schema" 219 ) 220 221 delimiter = terse_schema[1:i_missing] 222 missing = terse_schema[i_missing + 1 : i_cols] 223 224 raw_colspecs = re.split(r"\(|\)", terse_schema[i_cols + 1 :]) 225 raw_colspecs.pop() # Get rid of additional last empty string element. 226 if len(raw_colspecs) < 2: 227 raise _err.SCSVError("failed to parse any fields from terse SCSV schema") 228 if len(raw_colspecs) % 2 != 0: 229 raise _err.SCSVError("invalid field specifications in terse SCSV schema") 230 231 fields = [] 232 for name, spec in it.batched(raw_colspecs, 2): 233 _spec = spec.split(":") 234 _type = _SCSV_DEFAULT_TYPE 235 if _spec[0] != "": 236 try: 237 _type = SCSV_TERSEMAP[_spec[0]] 238 except KeyError: 239 raise _err.SCSVError( 240 f"invalid field type {_spec[0]} in terse SCSV schema" 241 ) from None 242 field = { 243 "name": name, 244 "type": _type, 245 "fill": _spec[1] if len(_spec) > 1 else _SCSV_DEFAULT_FILL, 246 } 247 if len(_spec) == 3: 248 field["unit"] = _spec[2] 249 fields.append(field) 250 return {"delimiter": delimiter, "missing": missing, "fields": fields} 251 252 253def read_scsv(file): 254 """Read data from an SCSV file. 255 256 Prints the YAML header section to output and returns a NamedTuple with columns of 257 the csv data. See also `save_scsv`. 258 259 """ 260 with open(resolve_path(file)) as fileref: 261 yaml_lines = [] 262 csv_lines = [] 263 264 is_yaml = False 265 for line in fileref: 266 if line == "\n": # Empty lines are skipped. 267 continue 268 if line == "---\n": 269 if is_yaml: 270 is_yaml = False # Second --- ends YAML section. 271 continue 272 else: 273 is_yaml = True # First --- begins YAML section. 274 continue 275 276 if is_yaml: 277 yaml_lines.append(line) 278 else: 279 csv_lines.append(line) 280 281 metadata = yaml.safe_load(io.StringIO("".join(yaml_lines))) 282 schema = metadata["schema"] 283 if not _validate_scsv_schema(schema): 284 raise _err.SCSVError( 285 f"unable to parse SCSV schema from '{file}'." 286 + " Check logging output for details." 
287 ) 288 reader = csv.reader( 289 csv_lines, delimiter=schema["delimiter"], skipinitialspace=True 290 ) 291 292 schema_colnames = [d["name"] for d in schema["fields"]] 293 header_colnames = [s.strip() for s in next(reader)] 294 if not schema_colnames == header_colnames: 295 raise _err.SCSVError( 296 f"schema field names must match column headers in '{file}'." 297 + f" You've supplied schema fields\n{schema_colnames}" 298 + f"\n with column headers\n{header_colnames}" 299 ) 300 301 _log.info("reading SCSV file: %s", resolve_path(file)) 302 Columns = c.namedtuple("Columns", schema_colnames) 303 # __dict__() and __slots__() of NamedTuples is empty :( 304 # Set up some pretty printing instead to give a quick view of column names. 305 Columns.__str__ = lambda self: f"Columns: {self._fields}" 306 Columns._repr_pretty_ = lambda self, p, _: p.text(f"Columns: {self._fields}") 307 # Also add some extra attributes to inspect the schema and yaml header. 308 Columns._schema = schema 309 Columns._metadata = ( 310 "".join(yaml_lines) 311 .replace("# ", "") 312 .replace("-\n", "") 313 .replace("\n", " ") 314 .rsplit("schema:", maxsplit=1)[0] # Assumes comments are above the schema. 315 ) 316 coltypes = [ 317 SCSV_TYPEMAP[d.get("type", _SCSV_DEFAULT_TYPE)] for d in schema["fields"] 318 ] 319 missingstr = schema["missing"] 320 fillvals = [d.get("fill", _SCSV_DEFAULT_FILL) for d in schema["fields"]] 321 return Columns._make( 322 [ 323 tuple( 324 map( 325 ft.partial( 326 _parse_scsv_cell, f, missingstr=missingstr, fillval=fill 327 ), 328 x, 329 ) 330 ) 331 for f, fill, x in zip( 332 coltypes, fillvals, zip(*list(reader), strict=True), strict=True 333 ) 334 ] 335 ) 336 337 338def write_scsv_header(stream, schema, comments=None): 339 """Write YAML header to an SCSV stream. 340 341 - `stream` — open output stream (e.g. file handle) where data should be written 342 - `schema` — SCSV schema dictionary, with 'delimiter', 'missing' and 'fields' keys 343 - `comments` (optional) — array of comments to be written above the schema, each on 344 a new line with an '#' prefix 345 346 See also `read_scsv`, `save_scsv`. 347 348 """ 349 if not _validate_scsv_schema(schema): 350 raise _err.SCSVError( 351 "refusing to write invalid schema to stream." 352 + " Check logging output for details." 353 ) 354 355 stream.write("---" + os.linesep) 356 if comments is not None: 357 for comment in comments: 358 stream.write("# " + comment + os.linesep) 359 stream.write("schema:" + os.linesep) 360 delimiter = schema["delimiter"] 361 missing = schema["missing"] 362 stream.write(f" delimiter: '{delimiter}'{os.linesep}") 363 stream.write(f" missing: '{missing}'{os.linesep}") 364 stream.write(" fields:" + os.linesep) 365 366 for field in schema["fields"]: 367 name = field["name"] 368 kind = field.get("type", _SCSV_DEFAULT_TYPE) 369 stream.write(f" - name: {name}{os.linesep}") 370 stream.write(f" type: {kind}{os.linesep}") 371 if "unit" in field: 372 unit = field["unit"] 373 stream.write(f" unit: {unit}{os.linesep}") 374 if "fill" in field: 375 fill = field["fill"] 376 stream.write(f" fill: {fill}{os.linesep}") 377 stream.write("---" + os.linesep) 378 379 380def save_scsv(file, schema, data, **kwargs): 381 """Save data to SCSV file. 382 383 - `file` — path to the file where the data should be written 384 - `schema` — SCSV schema dictionary, with 'delimiter', 'missing' and 'fields' keys 385 - `data` — data arrays (columns) of equal length 386 387 Optional keyword arguments are passed to `write_scsv_header`. See also `read_scsv`. 
388 389 """ 390 path = resolve_path(file) 391 n_rows = len(data[0]) 392 for col in data[1:]: 393 if len(col) != n_rows: 394 raise _err.SCSVError( 395 "refusing to write data columns of unequal length to SCSV file" 396 ) 397 398 _log.info("writing to SCSV file: %s", file) 399 try: # Check that the output is valid by attempting to parse. 400 with open(path, mode="w") as stream: 401 write_scsv_header(stream, schema, **kwargs) 402 fills = [ 403 field.get("fill", _SCSV_DEFAULT_FILL) for field in schema["fields"] 404 ] 405 types = [ 406 SCSV_TYPEMAP[field.get("type", _SCSV_DEFAULT_TYPE)] 407 for field in schema["fields"] 408 ] 409 names = [field["name"] for field in schema["fields"]] 410 writer = csv.writer( 411 stream, delimiter=schema["delimiter"], lineterminator=os.linesep 412 ) 413 writer.writerow(names) 414 415 # No need for strict=True here since column lengths were already checked. 416 for col in zip(*data): 417 row = [] 418 for i, (d, t, f) in enumerate(zip(col, types, fills, strict=True)): 419 try: 420 _parse_scsv_cell( 421 t, str(d), missingstr=schema["missing"], fillval=f 422 ) 423 except ValueError: 424 raise _err.SCSVError( 425 f"invalid data for column '{names[i]}'." 426 + f" Cannot parse {d} as type '{t.__qualname__}'." 427 ) from None 428 if isinstance(t, bool): 429 row.append(d) 430 elif t in (float, complex): 431 if np.isnan(d) and np.isnan(t(f)): 432 row.append(schema["missing"]) 433 elif d == t(f): 434 row.append(schema["missing"]) 435 else: 436 row.append(d) 437 elif t in (int, str) and d == t(f): 438 row.append(schema["missing"]) 439 else: 440 row.append(d) 441 writer.writerow(row) 442 except ValueError: 443 path.unlink(missing_ok=True) 444 raise _err.SCSVError( 445 "number of fields declared in schema does not match number of data columns." 446 + f" Declared schema fields were {names}; got {len(data)} data columns" 447 ) from None 448 449 450def parse_config(path): 451 """Parse a TOML file containing PyDRex configuration.""" 452 path = resolve_path(path) 453 _log.info("parsing configuration file: %s", path) 454 with open(path, "rb") as file: 455 toml = tomllib.load(file) 456 457 # Use provided name or set randomized default. 458 toml["name"] = toml.get( 459 "name", f"pydrex.{np.random.default_rng().integers(1,1e10)}" 460 ) 461 462 toml["parameters"] = _parse_config_params(toml) 463 _params = toml["parameters"] 464 toml["input"] = _parse_config_input_common(toml, path) 465 _input = toml["input"] 466 467 if "mesh" in _input: 468 # Input option 1: velocity gradient mesh + final particle locations. 469 _input = _parse_config_input_steadymesh(_input, path) 470 elif "velocity_gradient" in _input: 471 # Input option 2: velocity gradient callable + initial locations. 472 _input = _parse_config_input_calcpaths(_input, path) 473 elif "paths" in _input: 474 # Input option 3: NPZ or SCSV files with pre-computed input pathlines. 475 _input = _parse_config_input_postpaths(_input, path) 476 else: 477 _input["paths"] = None 478 479 # Output fields are optional, default: most data output, least logging output. 480 _output = toml.get("output", {}) 481 if "directory" in _output: 482 _output["directory"] = resolve_path(_output["directory"], path.parent) 483 else: 484 _output["directory"] = resolve_path(pathlib.Path.cwd()) 485 486 # Raw output means rotation matrices and grain volumes. 487 _parse_output_options(_output, "raw_output", _params["phase_assemblage"]) 488 # Diagnostic output means texture diagnostics (strength, symmetry, mean angle). 
489 _parse_output_options(_output, "diagnostics", _params["phase_assemblage"]) 490 # Anisotropy output means hexagonal symmetry axis and ΔVp (%). 491 _output["anisotropy"] = _output.get( 492 "anisotropy", ["Voigt", "hexaxis", "moduli", "%decomp"] 493 ) 494 495 # Optional SCSV or NPZ pathline outputs, not sensible if there are pathline inputs. 496 if "paths" in _input and "paths" in _output: 497 _log.warning( 498 "input pathlines and output pathline filenames are mutually exclusive;" 499 + " ignoring output pathline filenames" 500 ) 501 _output["paths"] = None 502 _output["paths"] = _output.get("paths", None) 503 504 # Default logging level for all log files. 505 _output["log_level"] = _output.get("log_level", "WARNING") 506 507 return toml 508 509 510def resolve_path(path, refdir=None): 511 """Resolve relative paths and create parent directories if necessary. 512 513 Relative paths are interpreted with respect to the current working directory, 514 i.e. the directory from whith the current Python process was executed, 515 unless a specific reference directory is provided with `refdir`. 516 517 """ 518 cwd = pathlib.Path.cwd() 519 if refdir is None: 520 _path = cwd / path 521 else: 522 _path = refdir / path 523 _path.parent.mkdir(parents=True, exist_ok=True) 524 return _path.resolve() 525 526 527def _parse_config_params(toml): 528 """Parse DRex and other rheology parameters.""" 529 _params = toml.get("parameters", {}) 530 for key, default in _core.DefaultParams().as_dict().items(): 531 _params[key] = _params.get(key, default) 532 533 # Make sure volume fractions sum to 1. 534 if np.abs(np.sum(_params["phase_fractions"]) - 1.0) > 1e-16: 535 raise _err.ConfigError( 536 "Volume fractions of mineral phases must sum to 1." 537 + f" You've provided phase_fractions = {_params['phase_fractions']}." 538 ) 539 540 # Make sure all mineral phases are accounted for and valid. 541 if len(_params["phase_assemblage"]) != len(_params["phase_fractions"]): 542 raise _err.ConfigError( 543 "All mineral phases must have an associated volume fraction." 544 + f" You've provided phase_assemblage = {_params['phase_assemblage']} and" 545 + f" phase_fractions = {_params['phase_fractions']}." 546 ) 547 try: 548 _params["phase_assemblage"] = tuple( 549 _parse_phase(ϕ) for ϕ in _params["phase_assemblage"] 550 ) 551 except AttributeError: 552 raise _err.ConfigError( 553 f"invalid phase assemblage: {_params['phase_assemblage']}" 554 ) from None 555 556 # Make sure initial olivine fabric is valid. 557 try: 558 _params["initial_olivine_fabric"] = getattr( 559 _core.MineralFabric, "olivine_" + _params["initial_olivine_fabric"] 560 ) 561 except AttributeError: 562 raise _err.ConfigError( 563 f"invalid initial olivine fabric: {_params['initial_olivine_fabric']}" 564 ) from None 565 566 # Make sure we have enough unified dislocation creep law coefficients. 567 n_provided = len(_params["disl_coefficients"]) 568 n_required = len(_core.DefaultParams().disl_coefficients) 569 if n_provided != n_required: 570 raise _err.ConfigError( 571 "not enough unified dislocation creep law coefficients." 572 + f"You've provided {n_provided}/{n_required} coefficients." 
573 ) 574 _params["disl_coefficients"] = tuple(_params["disl_coefficients"]) 575 576 return _params 577 578 579def _parse_config_input_common(toml, path): 580 try: 581 _input = toml["input"] 582 except KeyError: 583 raise _err.ConfigError(f"missing [input] section in '{path}'") from None 584 if "timestep" not in _input and "paths" not in _input: 585 raise _err.ConfigError(f"unspecified input timestep in '{path}'") 586 587 _input["timestep"] = _input.get("timestep", np.nan) 588 if not isinstance(_input["timestep"], float | int): 589 raise _err.ConfigError( 590 f"timestep must be float or int, not {type(input['timestep'])}" 591 ) 592 593 _input["strain_final"] = _input.get("strain_final", np.inf) 594 if not isinstance(_input["strain_final"], float | int): 595 raise _err.ConfigError( 596 f"final strain must be float or int, not {type(input['strain_final'])}" 597 ) 598 599 return _input 600 601 602def _parse_config_input_steadymesh(input, path): 603 input["mesh"] = meshio.read(resolve_path(input["mesh"], path.parent)) 604 input["locations_final"] = read_scsv( 605 resolve_path(input["locations_final"], path.parent) 606 ) 607 if "velocity_gradient" in input: 608 _log.warning( 609 "input mesh and velocity gradient callable are mutually exclusive;" 610 + " ignoring velocity gradient callable" 611 ) 612 if "locations_initial" in input: 613 _log.warning( 614 "initial particle locations are not used for pathline interpolation" 615 + " and will be ignored" 616 ) 617 if "paths" in input: 618 _log.warning( 619 "input mesh and input pathlines are mutually exclusive;" 620 + " ignoring input pathlines" 621 ) 622 input["velocity_gradient"] = None 623 input["locations_initial"] = None 624 input["paths"] = None 625 return input 626 627 628def _parse_config_input_calcpaths(input, path): 629 _velocity_gradient_func = getattr(_velocity, input["velocity_gradient"][0]) 630 input["velocity_gradient"] = _velocity_gradient_func( 631 *input["velocity_gradient"][1:] 632 ) 633 input["locations_initial"] = read_scsv( 634 resolve_path(input["locations_initial"], path.parent) 635 ) 636 if "locations_final" in input: 637 _log.warning( 638 "final particle locations are not used for forward advection" 639 + " and will be ignored" 640 ) 641 if "paths" in input: 642 _log.warning( 643 "velocity gradient callable and input pathlines are mutually exclusive;" 644 + " ignoring input pathlines" 645 ) 646 input["locations_final"] = None 647 input["paths"] = None 648 input["mesh"] = None 649 return input 650 651 652def _parse_config_input_postpaths(input, path): 653 input["paths"] = [np.load(resolve_path(p, path.parent)) for p in input["paths"]] 654 if "locations_initial" in input: 655 _log.warning( 656 "input pathlines and initial particle locations are mutually exclusive;" 657 + " ignoring initial particle locations" 658 ) 659 if "locations_final" in input: 660 _log.warning( 661 "input pathlines and final particle locations are mutually exclusive;" 662 + " ignoring final particle locations" 663 ) 664 input["locations_initial"] = None 665 input["locations_final"] = None 666 input["mesh"] = None 667 return input 668 669 670def _parse_output_options(output_opts, level, phase_assemblage): 671 try: 672 output_opts[level] = [ 673 getattr(_core.MineralPhase, ϕ) for ϕ in output_opts[level] 674 ] 675 except AttributeError: 676 raise _err.ConfigError( 677 f"unsupported mineral phase in '{level}' output option.\n" 678 + f" You supplied the value: {output_opts[level]}.\n" 679 + " Check pydrex.core.MineralPhase for supported phases." 
680 ) from None 681 for phase in output_opts[level]: 682 if phase not in phase_assemblage: 683 raise _err.ConfigError( 684 f"cannot output '{level}' for phase that is not being simulated" 685 ) 686 687 688def _parse_phase(ϕ: str | _core.MineralPhase | int) -> _core.MineralPhase: 689 if isinstance(ϕ, str): 690 try: 691 return getattr(_core.MineralPhase, ϕ) 692 except AttributeError: 693 raise _err.ConfigError(f"invalid phase in phase assemblage: {ϕ}") from None 694 elif isinstance(ϕ, _core.MineralPhase): 695 return ϕ 696 elif isinstance(ϕ, int): 697 try: 698 return _core.MineralPhase(ϕ) 699 except IndexError: 700 raise _err.ConfigError(f"invalid phase in phase assemblage: {ϕ}") from None 701 raise _err.ConfigError(f"invalid phase in phase assemblage: {ϕ}") from None 702 703 704def _validate_scsv_schema(schema): 705 format_ok = ( 706 "delimiter" in schema 707 and "missing" in schema 708 and "fields" in schema 709 and len(schema["fields"]) > 0 710 and schema["delimiter"] != schema["missing"] 711 and schema["delimiter"] not in schema["missing"] 712 ) 713 if not format_ok: 714 _log.error( 715 "invalid format for SCSV schema: %s" 716 + "\nMust contain: 'delimiter', 'missing', 'fields'" 717 + "\nMust contain at least one field." 718 + "\nMust contain compatible 'missing' and 'delimiter' values.", 719 schema, 720 ) 721 return False 722 for field in schema["fields"]: 723 if not field["name"].isidentifier(): 724 _log.error( 725 "SCSV field name '%s' is not a valid Python identifier", field["name"] 726 ) 727 return False 728 if field.get("type", _SCSV_DEFAULT_TYPE) not in SCSV_TYPEMAP.keys(): 729 _log.error("unsupported SCSV field type: '%s'", field["type"]) 730 return False 731 if ( 732 field.get("type", _SCSV_DEFAULT_TYPE) not in (_SCSV_DEFAULT_TYPE, "boolean") 733 and "fill" not in field 734 ): 735 _log.error("SCSV field of type '%s' requires a fill value", field["type"]) 736 return False 737 return True 738 739 740def _parse_scsv_bool(x): 741 """Parse boolean from string, for SCSV files.""" 742 return str(x).lower() in ("yes", "true", "t", "1") 743 744 745def _parse_scsv_cell(func, data, missingstr=None, fillval=None): 746 if data.strip() == missingstr: 747 if fillval == "NaN": 748 return func(np.nan) 749 return func(fillval) 750 elif func.__qualname__ == "bool": 751 return _parse_scsv_bool(data) 752 return func(data.strip()) 753 754 755def stringify(s): 756 """Return a cleaned version of a string for use in filenames, etc.""" 757 return "".join(filter(lambda c: str.isidentifier(c) or str.isdecimal(c), str(s))) 758 759 760def data(directory): 761 """Get resolved path to a pydrex data directory.""" 762 resources = files("pydrex.data") 763 if (resources / directory).is_dir(): 764 return resolve_path(resources / directory) 765 else: 766 raise NotADirectoryError(f"{resources / directory} is not a directory") 767 768 769@cl.contextmanager 770def logfile_enable(path, level=logging.DEBUG, mode="w"): 771 """Enable logging to a file at `path` with given `level`.""" 772 logger_file = logging.FileHandler(resolve_path(path), mode=mode) 773 logger_file.setFormatter( 774 logging.Formatter( 775 "%(levelname)s [%(asctime)s] %(name)s: %(message)s", 776 datefmt="%Y-%m-%d %H:%M:%S", 777 ) 778 ) 779 logger_file.setLevel(level) 780 _log.LOGGER.addHandler(logger_file) 781 yield 782 logger_file.close()
SCSV_TYPEMAP

```python
SCSV_TYPEMAP = {
    "string": str,
    "integer": int,
    "float": float,
    "boolean": bool,
    "complex": complex,
}
```

Mapping of supported SCSV field types to corresponding Python types.
SCSV_TERSEMAP

```python
SCSV_TERSEMAP = {
    "s": "string",
    "i": "integer",
    "f": "float",
    "b": "boolean",
    "c": "complex",
}
```

Mapping of supported terse format SCSV field types to their standard names.
extract_h5part

```python
def extract_h5part(
    file, phase: _core.MineralPhase, fabric: _core.MineralFabric, n_grains: int, output
):
    """Extract CPO data from Fluidity h5part file and save to canonical formats."""
    from pydrex.minerals import Mineral

    with h5py.File(file, "r") as f:
        for particle_id in f["Step#0/id"][:]:
            # Fluidity writes empty arrays to the particle data after they are deleted.
            # We need only the timesteps before deletion of this particle.
            steps = []
            for k in sorted(list(f.keys()), key=lambda s: int(s.lstrip("Step#"))):
                if f[f"{k}/x"].shape[0] >= particle_id:
                    steps.append(k)

            # Temporary data arrays.
            n_timesteps = len(steps)
            x = np.zeros(n_timesteps)
            y = np.zeros(n_timesteps)
            z = np.zeros(n_timesteps)
            orientations = np.empty((n_timesteps, n_grains, 3, 3))
            fractions = np.empty((n_timesteps, n_grains))

            strains = np.zeros(n_timesteps)
            for t, k in enumerate(
                tqdm(steps, desc=f"Extracting particle {particle_id}")
            ):
                # Extract particle position.
                x[t] = f[f"{k}/x"][particle_id - 1]
                y[t] = f[f"{k}/y"][particle_id - 1]
                z[t] = f[f"{k}/z"][particle_id - 1]

                # Extract CPO data.
                strains[t] = f[f"{k}/CPO_{n_grains * 10 + 1}"][particle_id - 1]
                vals = np.empty(n_grains * 10)
                for n in range(len(vals)):
                    vals[n] = f[f"{k}/CPO_{n+1}"][particle_id - 1]

                orientations[t] = np.array(
                    [
                        np.reshape(vals[n : n + 9], (3, 3))
                        for n in range(0, 9 * n_grains, 9)
                    ]
                )
                fractions[t] = vals[9 * n_grains :]

            _postfix = str(particle_id)
            _fractions = list(fractions)
            _orientations = list(orientations)
            mineral = Mineral(
                phase=phase,
                fabric=fabric,
                n_grains=n_grains,
                fractions_init=_fractions[0],
                orientations_init=_orientations[0],
            )
            mineral.fractions = _fractions
            mineral.orientations = _orientations
            mineral.save(output, postfix=_postfix)
            save_scsv(
                output[:-4] + f"_{_postfix}" + ".scsv",
                {
                    "delimiter": ",",
                    "missing": "-",
                    "fields": [
                        {"name": "strain", "type": "float", "unit": "percent", "fill": np.nan},
                        {"name": "x", "type": "float", "unit": "m", "fill": np.nan},
                        {"name": "y", "type": "float", "unit": "m", "fill": np.nan},
                        {"name": "z", "type": "float", "unit": "m", "fill": np.nan},
                    ],
                },
                [strains * 200, x, y, z],
            )
```
Extract CPO data from a Fluidity h5part file and save it to canonical formats.
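
A hedged usage sketch: the file names are hypothetical, and the enum member spellings `pydrex.core.MineralPhase.olivine` and `pydrex.core.MineralFabric.olivine_A` are assumptions here, not confirmed by this module.

```python
from pydrex import core, io

# Hypothetical input/output paths; `output` should end in ".npz" because a
# "_<particle_id>" postfix and an ".scsv" companion file are derived from it.
io.extract_h5part(
    "particles.h5part",
    phase=core.MineralPhase.olivine,
    fabric=core.MineralFabric.olivine_A,
    n_grains=2500,
    output="olivine_CPO.npz",
)
```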
parse_scsv_schema

```python
@_utils.defined_if(sys.version_info >= (3, 12))
def parse_scsv_schema(terse_schema: str) -> dict:
    """Parse terse scsv schema representation and return the expanded schema.

    The terse schema is useful for command line tools and can be specified in a single
    line of text. However, there are some limitations compared to using a Python
    dictionary, all of which are edge cases and not recommended usage:
    - the delimiter cannot be the character `d` or the character `m`
    - the missing data encoding cannot be the character `m`
    - fill values are not able to contain the colon (`:`) character
    - the arbitrary unit/comment for any field is not able to contain parentheses

    The delimiter is specified after the letter `d` and the missing data encoding after
    `m`. These are succeeded by the column specs which are a sequence of column names
    (which must be valid Python identifiers) and their (optional) data type, missing
    data fill value, and unit/comment.

    .. note:: This function is only defined if the version of your Python interpreter is
        greater than 3.11.x.

    >>> # delimiter
    >>> # | missing data encoding    column specifications
    >>> # | |  ______________________|______________________________
    >>> # v v /                                                      `
    >>> schemastring = "d,m-:colA(s)colB(s:N/A:...)colC()colD(i:999999)colE(f:NaN:%)"
    >>> schema = parse_scsv_schema(schemastring)
    >>> schema["delimiter"]
    ','
    >>> schema["missing"]
    '-'
    >>> schema["fields"][0]
    {'name': 'colA', 'type': 'string', 'fill': ''}
    >>> schema["fields"][1]
    {'name': 'colB', 'type': 'string', 'fill': 'N/A', 'unit': '...'}
    >>> schema["fields"][2]
    {'name': 'colC', 'type': 'string', 'fill': ''}
    >>> schema["fields"][3]
    {'name': 'colD', 'type': 'integer', 'fill': '999999'}
    >>> schema["fields"][4]
    {'name': 'colE', 'type': 'float', 'fill': 'NaN', 'unit': '%'}

    """
    if not terse_schema.startswith("d"):
        raise _err.SCSVError(
            "terse schema must start with delimiter specification (format: d<delimiter>)"
        )
    i_cols = terse_schema.find(":")
    if i_cols < 4:
        raise _err.SCSVError(
            "could not parse missing data encoding from terse SCSV schema"
        )
    i_missing = terse_schema.find("m", 0, i_cols)
    if i_missing < 2:
        raise _err.SCSVError(
            "could not parse missing data encoding from terse SCSV schema"
        )

    delimiter = terse_schema[1:i_missing]
    missing = terse_schema[i_missing + 1 : i_cols]

    raw_colspecs = re.split(r"\(|\)", terse_schema[i_cols + 1 :])
    raw_colspecs.pop()  # Get rid of additional last empty string element.
    if len(raw_colspecs) < 2:
        raise _err.SCSVError("failed to parse any fields from terse SCSV schema")
    if len(raw_colspecs) % 2 != 0:
        raise _err.SCSVError("invalid field specifications in terse SCSV schema")

    fields = []
    for name, spec in it.batched(raw_colspecs, 2):
        _spec = spec.split(":")
        _type = _SCSV_DEFAULT_TYPE
        if _spec[0] != "":
            try:
                _type = SCSV_TERSEMAP[_spec[0]]
            except KeyError:
                raise _err.SCSVError(
                    f"invalid field type {_spec[0]} in terse SCSV schema"
                ) from None
        field = {
            "name": name,
            "type": _type,
            "fill": _spec[1] if len(_spec) > 1 else _SCSV_DEFAULT_FILL,
        }
        if len(_spec) == 3:
            field["unit"] = _spec[2]
        fields.append(field)
    return {"delimiter": delimiter, "missing": missing, "fields": fields}
```
Parse a terse SCSV schema representation and return the expanded schema.

The terse schema is useful for command line tools and can be specified in a single line of text. However, there are some limitations compared to using a Python dictionary, all of which are edge cases and not recommended usage:

- the delimiter cannot be the character `d` or the character `m`
- the missing data encoding cannot be the character `m`
- fill values cannot contain the colon (`:`) character
- the arbitrary unit/comment for any field cannot contain parentheses

The delimiter is specified after the letter `d` and the missing data encoding after
`m`. These are followed by the column specs, which are a sequence of column names
(which must be valid Python identifiers) and their (optional) data type, missing
data fill value, and unit/comment.

Note: this function is only defined if the version of your Python interpreter is
greater than 3.11.x.
>>> # delimiter
>>> # | missing data encoding column specifications
>>> # | | ______________________|______________________________
>>> # v v / `
>>> schemastring = "d,m-:colA(s)colB(s:N/A:...)colC()colD(i:999999)colE(f:NaN:%)"
>>> schema = parse_scsv_schema(schemastring)
>>> schema["delimiter"]
','
>>> schema["missing"]
'-'
>>> schema["fields"][0]
{'name': 'colA', 'type': 'string', 'fill': ''}
>>> schema["fields"][1]
{'name': 'colB', 'type': 'string', 'fill': 'N/A', 'unit': '...'}
>>> schema["fields"][2]
{'name': 'colC', 'type': 'string', 'fill': ''}
>>> schema["fields"][3]
{'name': 'colD', 'type': 'integer', 'fill': '999999'}
>>> schema["fields"][4]
{'name': 'colE', 'type': 'float', 'fill': 'NaN', 'unit': '%'}
read_scsv

```python
def read_scsv(file):
    """Read data from an SCSV file.

    Prints the YAML header section to output and returns a NamedTuple with columns of
    the csv data. See also `save_scsv`.

    """
    with open(resolve_path(file)) as fileref:
        yaml_lines = []
        csv_lines = []

        is_yaml = False
        for line in fileref:
            if line == "\n":  # Empty lines are skipped.
                continue
            if line == "---\n":
                if is_yaml:
                    is_yaml = False  # Second --- ends YAML section.
                    continue
                else:
                    is_yaml = True  # First --- begins YAML section.
                    continue

            if is_yaml:
                yaml_lines.append(line)
            else:
                csv_lines.append(line)

        metadata = yaml.safe_load(io.StringIO("".join(yaml_lines)))
        schema = metadata["schema"]
        if not _validate_scsv_schema(schema):
            raise _err.SCSVError(
                f"unable to parse SCSV schema from '{file}'."
                + " Check logging output for details."
            )
        reader = csv.reader(
            csv_lines, delimiter=schema["delimiter"], skipinitialspace=True
        )

        schema_colnames = [d["name"] for d in schema["fields"]]
        header_colnames = [s.strip() for s in next(reader)]
        if not schema_colnames == header_colnames:
            raise _err.SCSVError(
                f"schema field names must match column headers in '{file}'."
                + f" You've supplied schema fields\n{schema_colnames}"
                + f"\n with column headers\n{header_colnames}"
            )

        _log.info("reading SCSV file: %s", resolve_path(file))
        Columns = c.namedtuple("Columns", schema_colnames)
        # __dict__() and __slots__() of NamedTuples is empty :(
        # Set up some pretty printing instead to give a quick view of column names.
        Columns.__str__ = lambda self: f"Columns: {self._fields}"
        Columns._repr_pretty_ = lambda self, p, _: p.text(f"Columns: {self._fields}")
        # Also add some extra attributes to inspect the schema and yaml header.
        Columns._schema = schema
        Columns._metadata = (
            "".join(yaml_lines)
            .replace("# ", "")
            .replace("-\n", "")
            .replace("\n", " ")
            .rsplit("schema:", maxsplit=1)[0]  # Assumes comments are above the schema.
        )
        coltypes = [
            SCSV_TYPEMAP[d.get("type", _SCSV_DEFAULT_TYPE)] for d in schema["fields"]
        ]
        missingstr = schema["missing"]
        fillvals = [d.get("fill", _SCSV_DEFAULT_FILL) for d in schema["fields"]]
        return Columns._make(
            [
                tuple(
                    map(
                        ft.partial(
                            _parse_scsv_cell, f, missingstr=missingstr, fillval=fill
                        ),
                        x,
                    )
                )
                for f, fill, x in zip(
                    coltypes, fillvals, zip(*list(reader), strict=True), strict=True
                )
            ]
        )
```
Read data from an SCSV file.

Prints the YAML header section to output and returns a NamedTuple with columns of
the csv data. See also `save_scsv`.
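
For example (the file name and column names here are hypothetical; the actual names come from the file's own schema):

```python
from pydrex import io

cols = io.read_scsv("olivine_CPO_1.scsv")  # hypothetical file written by save_scsv
print(cols)           # Columns: ('strain', 'x', 'y', 'z')
print(cols._schema)   # the schema dictionary parsed from the YAML header
strain = cols.strain  # each column is a tuple of values converted to the schema type
```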
write_scsv_header

```python
def write_scsv_header(stream, schema, comments=None):
    """Write YAML header to an SCSV stream.

    - `stream` — open output stream (e.g. file handle) where data should be written
    - `schema` — SCSV schema dictionary, with 'delimiter', 'missing' and 'fields' keys
    - `comments` (optional) — array of comments to be written above the schema, each on
      a new line with an '#' prefix

    See also `read_scsv`, `save_scsv`.

    """
    if not _validate_scsv_schema(schema):
        raise _err.SCSVError(
            "refusing to write invalid schema to stream."
            + " Check logging output for details."
        )

    stream.write("---" + os.linesep)
    if comments is not None:
        for comment in comments:
            stream.write("# " + comment + os.linesep)
    stream.write("schema:" + os.linesep)
    delimiter = schema["delimiter"]
    missing = schema["missing"]
    stream.write(f"  delimiter: '{delimiter}'{os.linesep}")
    stream.write(f"  missing: '{missing}'{os.linesep}")
    stream.write("  fields:" + os.linesep)

    for field in schema["fields"]:
        name = field["name"]
        kind = field.get("type", _SCSV_DEFAULT_TYPE)
        stream.write(f"    - name: {name}{os.linesep}")
        stream.write(f"      type: {kind}{os.linesep}")
        if "unit" in field:
            unit = field["unit"]
            stream.write(f"      unit: {unit}{os.linesep}")
        if "fill" in field:
            fill = field["fill"]
            stream.write(f"      fill: {fill}{os.linesep}")
    stream.write("---" + os.linesep)
```
Write YAML header to an SCSV stream.

- `stream` — open output stream (e.g. file handle) where data should be written
- `schema` — SCSV schema dictionary, with 'delimiter', 'missing' and 'fields' keys
- `comments` (optional) — array of comments to be written above the schema, each on
  a new line with an '#' prefix

See also `read_scsv`, `save_scsv`.
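
A minimal sketch writing just a header to an in-memory stream (the field names and comment text are arbitrary):

```python
import io as stdlib_io  # renamed so it does not shadow pydrex.io in this example

from pydrex.io import write_scsv_header

schema = {
    "delimiter": ",",
    "missing": "-",
    "fields": [
        {"name": "strain", "type": "float", "unit": "percent", "fill": "NaN"},
        {"name": "x", "type": "float", "unit": "m", "fill": "NaN"},
    ],
}
stream = stdlib_io.StringIO()
write_scsv_header(stream, schema, comments=["Example dataset, header only."])
print(stream.getvalue())  # '---', the comment, the schema block, then '---'
```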
save_scsv

```python
def save_scsv(file, schema, data, **kwargs):
    """Save data to SCSV file.

    - `file` — path to the file where the data should be written
    - `schema` — SCSV schema dictionary, with 'delimiter', 'missing' and 'fields' keys
    - `data` — data arrays (columns) of equal length

    Optional keyword arguments are passed to `write_scsv_header`. See also `read_scsv`.

    """
    path = resolve_path(file)
    n_rows = len(data[0])
    for col in data[1:]:
        if len(col) != n_rows:
            raise _err.SCSVError(
                "refusing to write data columns of unequal length to SCSV file"
            )

    _log.info("writing to SCSV file: %s", file)
    try:  # Check that the output is valid by attempting to parse.
        with open(path, mode="w") as stream:
            write_scsv_header(stream, schema, **kwargs)
            fills = [
                field.get("fill", _SCSV_DEFAULT_FILL) for field in schema["fields"]
            ]
            types = [
                SCSV_TYPEMAP[field.get("type", _SCSV_DEFAULT_TYPE)]
                for field in schema["fields"]
            ]
            names = [field["name"] for field in schema["fields"]]
            writer = csv.writer(
                stream, delimiter=schema["delimiter"], lineterminator=os.linesep
            )
            writer.writerow(names)

            # No need for strict=True here since column lengths were already checked.
            for col in zip(*data):
                row = []
                for i, (d, t, f) in enumerate(zip(col, types, fills, strict=True)):
                    try:
                        _parse_scsv_cell(
                            t, str(d), missingstr=schema["missing"], fillval=f
                        )
                    except ValueError:
                        raise _err.SCSVError(
                            f"invalid data for column '{names[i]}'."
                            + f" Cannot parse {d} as type '{t.__qualname__}'."
                        ) from None
                    if isinstance(t, bool):
                        row.append(d)
                    elif t in (float, complex):
                        if np.isnan(d) and np.isnan(t(f)):
                            row.append(schema["missing"])
                        elif d == t(f):
                            row.append(schema["missing"])
                        else:
                            row.append(d)
                    elif t in (int, str) and d == t(f):
                        row.append(schema["missing"])
                    else:
                        row.append(d)
                writer.writerow(row)
    except ValueError:
        path.unlink(missing_ok=True)
        raise _err.SCSVError(
            "number of fields declared in schema does not match number of data columns."
            + f" Declared schema fields were {names}; got {len(data)} data columns"
        ) from None
```
Save data to SCSV file.

- `file` — path to the file where the data should be written
- `schema` — SCSV schema dictionary, with 'delimiter', 'missing' and 'fields' keys
- `data` — data arrays (columns) of equal length

Optional keyword arguments are passed to `write_scsv_header`. See also `read_scsv`.
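
Combined with `read_scsv`, this gives a simple round trip; the output path and column values below are made up for illustration:

```python
import numpy as np

from pydrex.io import read_scsv, save_scsv

schema = {
    "delimiter": ",",
    "missing": "-",
    "fields": [
        {"name": "step", "type": "integer", "fill": "999999"},
        {"name": "angle", "type": "float", "unit": "degree", "fill": "NaN"},
    ],
}
# NaN matches the float column's fill value, so it is written as the 'missing' marker.
save_scsv("example.scsv", schema, [[0, 1, 2], [12.5, np.nan, 9.0]])
table = read_scsv("example.scsv")
assert table.step == (0, 1, 2)
```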
parse_config

```python
def parse_config(path):
    """Parse a TOML file containing PyDRex configuration."""
    path = resolve_path(path)
    _log.info("parsing configuration file: %s", path)
    with open(path, "rb") as file:
        toml = tomllib.load(file)

    # Use provided name or set randomized default.
    toml["name"] = toml.get(
        "name", f"pydrex.{np.random.default_rng().integers(1,1e10)}"
    )

    toml["parameters"] = _parse_config_params(toml)
    _params = toml["parameters"]
    toml["input"] = _parse_config_input_common(toml, path)
    _input = toml["input"]

    if "mesh" in _input:
        # Input option 1: velocity gradient mesh + final particle locations.
        _input = _parse_config_input_steadymesh(_input, path)
    elif "velocity_gradient" in _input:
        # Input option 2: velocity gradient callable + initial locations.
        _input = _parse_config_input_calcpaths(_input, path)
    elif "paths" in _input:
        # Input option 3: NPZ or SCSV files with pre-computed input pathlines.
        _input = _parse_config_input_postpaths(_input, path)
    else:
        _input["paths"] = None

    # Output fields are optional, default: most data output, least logging output.
    _output = toml.get("output", {})
    if "directory" in _output:
        _output["directory"] = resolve_path(_output["directory"], path.parent)
    else:
        _output["directory"] = resolve_path(pathlib.Path.cwd())

    # Raw output means rotation matrices and grain volumes.
    _parse_output_options(_output, "raw_output", _params["phase_assemblage"])
    # Diagnostic output means texture diagnostics (strength, symmetry, mean angle).
    _parse_output_options(_output, "diagnostics", _params["phase_assemblage"])
    # Anisotropy output means hexagonal symmetry axis and ΔVp (%).
    _output["anisotropy"] = _output.get(
        "anisotropy", ["Voigt", "hexaxis", "moduli", "%decomp"]
    )

    # Optional SCSV or NPZ pathline outputs, not sensible if there are pathline inputs.
    if "paths" in _input and "paths" in _output:
        _log.warning(
            "input pathlines and output pathline filenames are mutually exclusive;"
            + " ignoring output pathline filenames"
        )
        _output["paths"] = None
    _output["paths"] = _output.get("paths", None)

    # Default logging level for all log files.
    _output["log_level"] = _output.get("log_level", "WARNING")

    return toml
```
Parse a TOML file containing PyDRex configuration.
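
The recognised keys are those consumed by `pydrex.core.DefaultParams` and the input/output parsing above. The sketch below shows the general shape of a configuration using the "velocity gradient callable" input option; the callable name, its argument, the file paths and the `olivine`/`A` enum spellings are illustrative assumptions, not values confirmed by this module.

```toml
name = "example_run"              # optional; a randomised name is used if omitted

[parameters]                      # missing keys fall back to pydrex.core.DefaultParams()
phase_assemblage = ["olivine"]
phase_fractions = [1.0]
initial_olivine_fabric = "A"

[input]
timestep = 1e10                   # required unless pre-computed pathlines are given
strain_final = 2.5
# First element: name of a callable in pydrex.velocity; the rest are its arguments.
velocity_gradient = ["<callable_name>", "<argument>"]
locations_initial = "initial_locations.scsv"

[output]
directory = "output"              # resolved relative to this TOML file
raw_output = ["olivine"]
diagnostics = ["olivine"]
log_level = "INFO"
```

Passing the path of such a file to `parse_config` returns the parsed dictionary with defaults filled in, enum members substituted for phase and fabric names, and the input/output sections normalised.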
resolve_path

```python
def resolve_path(path, refdir=None):
    """Resolve relative paths and create parent directories if necessary.

    Relative paths are interpreted with respect to the current working directory,
    i.e. the directory from which the current Python process was executed,
    unless a specific reference directory is provided with `refdir`.

    """
    cwd = pathlib.Path.cwd()
    if refdir is None:
        _path = cwd / path
    else:
        _path = refdir / path
    _path.parent.mkdir(parents=True, exist_ok=True)
    return _path.resolve()
```
Resolve relative paths and create parent directories if necessary.

Relative paths are interpreted with respect to the current working directory,
i.e. the directory from which the current Python process was executed,
unless a specific reference directory is provided with `refdir`.
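
For example (the paths are arbitrary):

```python
import pathlib

from pydrex.io import resolve_path

# Relative to the current working directory; parent directories are created if needed.
out_file = resolve_path("out/results.scsv")
# The same relative path, resolved against an explicit reference directory instead.
out_file2 = resolve_path("out/results.scsv", refdir=pathlib.Path("/tmp/pydrex_example"))
```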
stringify

```python
def stringify(s):
    """Return a cleaned version of a string for use in filenames, etc."""
    return "".join(filter(lambda c: str.isidentifier(c) or str.isdecimal(c), str(s)))
```
Return a cleaned version of a string for use in filenames, etc.
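
For example, characters that are neither valid in Python identifiers nor decimal digits are simply dropped:

```python
from pydrex.io import stringify

tag = stringify("olivine A-type (70%)")  # -> "olivineAtype70"
```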
data

```python
def data(directory):
    """Get resolved path to a pydrex data directory."""
    resources = files("pydrex.data")
    if (resources / directory).is_dir():
        return resolve_path(resources / directory)
    else:
        raise NotADirectoryError(f"{resources / directory} is not a directory")
```
Get resolved path to a pydrex data directory.
logfile_enable

```python
@cl.contextmanager
def logfile_enable(path, level=logging.DEBUG, mode="w"):
    """Enable logging to a file at `path` with given `level`."""
    logger_file = logging.FileHandler(resolve_path(path), mode=mode)
    logger_file.setFormatter(
        logging.Formatter(
            "%(levelname)s [%(asctime)s] %(name)s: %(message)s",
            datefmt="%Y-%m-%d %H:%M:%S",
        )
    )
    logger_file.setLevel(level)
    _log.LOGGER.addHandler(logger_file)
    yield
    logger_file.close()
```
Enable logging to a file at `path` with given `level`.
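
A usage sketch (the log file path is arbitrary):

```python
import logging

from pydrex import io

# Capture PyDRex log records in a file for the duration of the block.
with io.logfile_enable("pydrex_run.log", level=logging.INFO):
    ...  # run a simulation, read/write SCSV files, etc.
```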