Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 12 additions & 4 deletions docs/source/api.rst
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,24 @@ This page provides an auto-generated summary of IMAS-Python's API. For more deta
and examples, refer to the relevant chapters in the main part of the
documentation.

IMAS-Python IDS manipulation
----------------------------
IMAS-Python public API
----------------------

.. currentmodule:: imas

.. autosummary::

convert_core_edge_plasma.convert_to_plasma_profiles
convert_core_edge_plasma.convert_to_plasma_sources
convert_core_edge_plasma.convert_to_plasma_transport
db_entry.DBEntry
ids_convert.convert_ids
ids_data_type.IDSDataType
ids_factory.IDSFactory
ids_toplevel.IDSToplevel
ids_identifiers.identifiers
ids_metadata.IDSMetadata
ids_metadata.IDSType
ids_primitive.IDSPrimitive
ids_structure.IDSStructure
ids_struct_array.IDSStructArray
ids_structure.IDSStructure
ids_toplevel.IDSToplevel
58 changes: 43 additions & 15 deletions imas/__init__.py
Original file line number Diff line number Diff line change
@@ -1,30 +1,58 @@
# This file is part of IMAS-Python.
# You should have received the IMAS-Python LICENSE file with this project.

# isort: skip_file

from packaging.version import Version as _V

from ._version import version as __version__ # noqa: F401
from ._version import version_tuple # noqa: F401

# Import logging _first_
from . import setup_logging
# isort: off
from . import setup_logging # noqa: F401

# isort: on

# Import main user API objects in the imas module
# Ensure that `imas.util` is loaded when importing imas
from . import util # noqa: F401

# Public API:
from ._version import version as __version__
from ._version import version_tuple
from .convert_core_edge_plasma import (
convert_to_plasma_profiles,
convert_to_plasma_sources,
convert_to_plasma_transport,
)
from .db_entry import DBEntry
from .ids_factory import IDSFactory
from .ids_convert import convert_ids
from .ids_data_type import IDSDataType
from .ids_factory import IDSFactory
from .ids_identifiers import identifiers

# Load the IMAS-Python IMAS AL/DD core
from . import (
db_entry,
dd_zip,
util,
)
from .ids_metadata import IDSMetadata, IDSType
from .ids_primitive import IDSPrimitive
from .ids_struct_array import IDSStructArray
from .ids_structure import IDSStructure
from .ids_toplevel import IDSToplevel

PUBLISHED_DOCUMENTATION_ROOT = "https://imas-python.readthedocs.io/en/latest/"
"""URL to the published documentation."""
OLDEST_SUPPORTED_VERSION = _V("3.22.0")
"""Oldest Data Dictionary version that is supported by IMAS-Python."""

__all__ = [
"__version__",
"version_tuple",
"DBEntry",
"IDSDataType",
"IDSFactory",
"IDSMetadata",
"IDSPrimitive",
"IDSStructure",
"IDSStructArray",
"IDSToplevel",
"IDSType",
"convert_ids",
"convert_to_plasma_profiles",
"convert_to_plasma_sources",
"convert_to_plasma_transport",
"identifiers",
"PUBLISHED_DOCUMENTATION_ROOT",
"OLDEST_SUPPORTED_VERSION",
]
69 changes: 66 additions & 3 deletions imas/command/cli.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# This file is part of IMAS-Python.
# You should have received the IMAS-Python LICENSE file with this project.
""" Main CLI entry point """
"""Main CLI entry point"""

import logging
import sys
Expand All @@ -22,7 +22,13 @@

import imas
import imas.backends.imas_core.imas_interface
from imas import DBEntry, dd_zip
from imas import (
DBEntry,
dd_zip,
convert_to_plasma_profiles,
convert_to_plasma_sources,
convert_to_plasma_transport,
)
from imas.backends.imas_core.imas_interface import ll_interface
from imas.command.db_analysis import analyze_db, process_db_analysis
from imas.command.helpers import min_version_guard, setup_rich_log_handler
Expand Down Expand Up @@ -109,6 +115,23 @@ def print_ids(uri, ids, occurrence, print_all):
imas.util.print_tree(ids_obj, not print_all)


def _check_convert_to_plasma_ids(idss_with_occurrences):
"""Check if no plasma_ IDS is present when converting a core_ or edge_ IDS."""
idsnames = {ids_name for ids_name, _ in idss_with_occurrences}
for suffix in ("_profiles", "_sources", "_transport"):
if f"plasma{suffix}" in idsnames:
if f"core{suffix}" in idsnames:
overlap = "core"
elif f"edge{suffix}" in idsnames:
overlap = "edge"
else:
continue
raise RuntimeError(
f"Cannot convert {overlap}{suffix} IDS to plasma{suffix}: "
f"there already exists a plasma{suffix} IDS in the data source."
)


@cli.command("convert", no_args_is_help=True)
@click.argument("uri_in")
@click.argument("dd_version")
Expand All @@ -127,8 +150,21 @@ def print_ids(uri, ids, occurrence, print_all):
is_flag=True,
help="Don't add provenance metadata to the converted IDS.",
)
@click.option(
"--convert-to-plasma-ids",
is_flag=True,
help="Convert core/edge profiles/transport/sources to the corresponding plasma IDS",
)
def convert_ids(
uri_in, dd_version, uri_out, ids, occurrence, quiet, timeit, no_provenance
uri_in,
dd_version,
uri_out,
ids,
occurrence,
quiet,
timeit,
no_provenance,
convert_to_plasma_ids,
):
"""Convert a Data Entry (or a single IDS) to the target DD version.

Expand Down Expand Up @@ -174,6 +210,10 @@ def convert_ids(
else:
idss_with_occurrences.append((ids_name, occurrence))

if convert_to_plasma_ids: # Sanity checks for conversion to plasma IDSs
_check_convert_to_plasma_ids(idss_with_occurrences)
next_plasma_occurrence = {"_profiles": 0, "_transport": 0, "_sources": 0}

# Create progress bar and task
columns = (
TimeElapsedColumn(),
Expand Down Expand Up @@ -209,6 +249,29 @@ def convert_ids(
provenance_origin_uri=provenance_origin_uri,
)

# Convert to plasma_profiles/plasma_sources/plasma_transport IDS
if convert_to_plasma_ids and ids_name.startswith(("core", "edge")):
suffix = ids_name[4:]
logger.info(
"Storing IDS %s/%d as plasma%s/%d",
ids_name,
occurrence,
suffix,
next_plasma_occurrence[suffix],
)
occurrence = next_plasma_occurrence[suffix]
next_plasma_occurrence[suffix] += 1

name2 = f"[bold green]plasma{suffix}[/][green]/{occurrence}[/]"
progress.update(task, description=f"Converting {name} to {name2}")
if suffix == "_profiles":
ids2 = convert_to_plasma_profiles(ids2)
elif suffix == "_sources":
ids2 = convert_to_plasma_sources(ids2)
elif suffix == "_transport":
ids2 = convert_to_plasma_transport(ids2)
name = name2

# Store in output entry:
progress.update(task, description=f"Storing {name}", advance=1)
with timer("Put", name):
Expand Down
124 changes: 124 additions & 0 deletions imas/convert_core_edge_plasma.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
# This file is part of IMAS-Python.
# You should have received the IMAS-Python LICENSE file with this project.
"""Logic to convert core/edge IDSs to their corresponding plasma ID."""

from packaging.version import Version

from imas.ids_toplevel import IDSToplevel
from imas.ids_factory import IDSFactory
from imas.exception import IDSNameError
from imas.ids_convert import DDVersionMap, NBCPathMap, _copy_structure


def convert_to_plasma_profiles(
core_or_edge_profiles: IDSToplevel, *, deepcopy: bool = False
) -> IDSToplevel:
"""Convert a core_profiles or edge_profiles IDS to a plasma_profiles IDS.

The input IDS must use a Data Dictionary version for which the plasma_profiles IDS
exists (3.42.0 or newer).

Args:
core_or_edge_profiles: The core_profiles or edge_profiles IDS to be converted.

Keyword Args:
deepcopy: When True, performs a deep copy of all data. When False (default),
numpy arrays are not copied and the converted IDS shares the same underlying
data buffers.
"""
return _convert_to_plasma(core_or_edge_profiles, "profiles", deepcopy)


def convert_to_plasma_sources(
core_or_edge_sources: IDSToplevel, *, deepcopy: bool = False
) -> IDSToplevel:
"""Convert a core_sources or edge_sources IDS to a plasma_sources IDS.

The input IDS must use a Data Dictionary version for which the plasma_sources IDS
exists (3.42.0 or newer).

Args:
core_or_edge_sources: The core_sources or edge_sources IDS to be converted.

Keyword Args:
deepcopy: When True, performs a deep copy of all data. When False (default),
numpy arrays are not copied and the converted IDS shares the same underlying
data buffers.
"""
return _convert_to_plasma(core_or_edge_sources, "sources", deepcopy)


def convert_to_plasma_transport(
core_or_edge_transport: IDSToplevel, *, deepcopy: bool = False
) -> IDSToplevel:
"""Convert a core_transport or edge_transport IDS to a plasma_transport IDS.

The input IDS must use a Data Dictionary version for which the plasma_transport IDS
exists (3.42.0 or newer).

Args:
core_or_edge_transport: The core_transport or edge_transport IDS to be
converted.

Keyword Args:
deepcopy: When True, performs a deep copy of all data. When False (default),
numpy arrays are not copied and the converted IDS shares the same underlying
data buffers.
"""
return _convert_to_plasma(core_or_edge_transport, "transport", deepcopy)


class _CoreEdgePlasmaMap(DDVersionMap):
"""Subclass of DDVersionMap to generate an NBCPathMap that is suitable to copy
between a core/edge IDS and the corresponding plasma IDS."""

def __init__(self, source, target, factory):
self.ids_name = source
self.old_version = factory._etree
self.new_version = factory._etree
self.version_old = Version(factory.version)

self.old_to_new = NBCPathMap()
self.new_to_old = NBCPathMap()

old_ids_object = factory._etree.find(f"IDS[@name='{source}']")
new_ids_object = factory._etree.find(f"IDS[@name='{target}']")
self._build_map(old_ids_object, new_ids_object)


def _convert_to_plasma(source: IDSToplevel, suffix: str, deepcopy: bool) -> IDSToplevel:
# Sanity checks for input data
if not isinstance(source, IDSToplevel):
raise TypeError(
f"First argument to convert_to_plasma_{suffix} must be a core_{suffix} or "
f"edge_{suffix} of type IDSToplevel. Got a type {type(source)} instead."
)
if source.metadata.name not in [f"core_{suffix}", f"edge_{suffix}"]:
raise ValueError(
f"First argument to convert_to_plasma_{suffix} must be a core_{suffix} or "
f"edge_{suffix} IDS. Got a {source.metadata.name} IDS instead."
)
if source._lazy:
raise NotImplementedError(
"IDS conversion is not implemented for lazy-loaded IDSs"
)

# Construct target plasma_{suffix} IDS
factory: IDSFactory = source._parent
try:
target = factory.new(f"plasma_{suffix}")
except IDSNameError:
raise ValueError(
f"Cannot convert {source.metadata.name} IDS to plasma_{suffix}: the source "
f"IDS uses Data Dictionary version {factory.dd_version} which doesn't have "
f"a plasma_{suffix} IDS. Please convert the source IDS to a supported Data "
"Dictionary version using `imas.convert_ids` and try again."
) from None

# Leverage existing logic from ids_convert to do the copying
# First construct a map (to handle missing items in the target IDS)
data_map = _CoreEdgePlasmaMap(source.metadata.name, target.metadata.name, factory)
path_map = data_map.old_to_new # old = core/edge, new = plasma IDS
_copy_structure(source, target, deepcopy, path_map)

return target
2 changes: 1 addition & 1 deletion imas/db_entry.py
Original file line number Diff line number Diff line change
Expand Up @@ -561,7 +561,7 @@ def _get(
raise RuntimeError("Database entry is not open.")
if lazy and destination:
raise ValueError("Cannot supply a destination IDS when lazy loading.")
if not self._ids_factory.exists(ids_name):
if not self._ids_factory.exists(ids_name) and autoconvert:
raise IDSNameError(ids_name, self._ids_factory)

# Note: this will raise an exception when the ids/occurrence is not filled:
Expand Down
37 changes: 36 additions & 1 deletion imas/test/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import pytest
from click.testing import CliRunner

from imas.command.cli import print_version
from imas.command.cli import print_version, convert_ids
from imas.command.db_analysis import analyze_db, process_db_analysis
from imas.db_entry import DBEntry
from imas.test.test_helpers import fill_with_random_data
Expand Down Expand Up @@ -100,3 +100,38 @@ def test_db_analysis_csv(tmp_path, requires_imas):
wall,ids_properties/version_put/data_dictionary,,1.0,1.0
""" # noqa: E501 (line too long)
)


def test_imas_convert_with_plasma(tmp_path):
in_db = tmp_path / "in"
out_db = tmp_path / "out"
with DBEntry(f"imas:hdf5?path={in_db}", "w", dd_version="3.39.0") as entry:
for core_edge in ("core", "edge"):
for suffix in ("profiles", "sources", "transport"):
ids = entry.factory.new(f"{core_edge}_{suffix}")
ids.ids_properties.homogeneous_time = 2
for i in range(4):
ids.ids_properties.comment = f"{core_edge}_{suffix} occurrence {i}"
entry.put(ids, i)

runner = CliRunner()
with runner.isolated_filesystem(tmp_path):
convert_result = runner.invoke(
convert_ids,
[
"--convert-to-plasma-ids",
f"imas:hdf5?path={in_db}",
"4.1.0",
f"imas:hdf5?path={out_db}",
],
)
assert convert_result.exit_code == 0

with DBEntry(f"imas:hdf5?path={out_db}", "r", dd_version="4.1.0") as entry:
for suffix in ("profiles", "sources", "transport"):
for i in range(8):
# We expect 8 occurrences, first 4 core, then 4 edge ones
core_edge = "core" if i < 4 else "edge"
expected_comment = f"{core_edge}_{suffix} occurrence {i % 4}"
ids = entry.get(f"plasma_{suffix}", i)
assert ids.ids_properties.comment == expected_comment
Loading
Loading