31 changes: 21 additions & 10 deletions src/orcapod/core/cached_function_pod.py
@@ -5,6 +5,7 @@
import logging
from typing import TYPE_CHECKING, Any

from orcapod.core.datagrams import Datagram
from orcapod.core.function_pod import WrappedFunctionPod
from orcapod.core.result_cache import ResultCache
from orcapod.protocols.core_protocols import (
@@ -95,12 +96,17 @@ def process_packet(
tag, output = self._function_pod.process_packet(tag, packet, logger=logger)
if output is not None:
pf = self._function_pod.packet_function
self._cache.store(
packet,
output,
variation_data=pf.get_function_variation_data(),
execution_data=pf.get_execution_data(),
var_dg = Datagram(
pf.get_function_variation_data(),
python_schema=pf.get_function_variation_data_schema(),
data_context=pf.data_context,
)
exec_dg = Datagram(
pf.get_execution_data(),
python_schema=pf.get_execution_data_schema(),
data_context=pf.data_context,
)
self._cache.store(packet, output, var_dg, exec_dg)
output = output.with_meta_columns(**{self.RESULT_COMPUTED_FLAG: True})
return tag, output

@@ -128,12 +134,17 @@ async def async_process_packet(
)
if output is not None:
pf = self._function_pod.packet_function
self._cache.store(
packet,
output,
variation_data=pf.get_function_variation_data(),
execution_data=pf.get_execution_data(),
var_dg = Datagram(
pf.get_function_variation_data(),
python_schema=pf.get_function_variation_data_schema(),
data_context=pf.data_context,
)
exec_dg = Datagram(
pf.get_execution_data(),
python_schema=pf.get_execution_data_schema(),
data_context=pf.data_context,
)
self._cache.store(packet, output, var_dg, exec_dg)
output = output.with_meta_columns(**{self.RESULT_COMPUTED_FLAG: True})
return tag, output
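
The two hunks above apply the same refactor, so a consolidated sketch may be a useful reference. The helper name below is invented for illustration; the Datagram constructor and PacketFunction accessors are the ones used in this diff.

```python
# Sketch only: restates the caching pattern shared by both hunks above.
# `store_with_schemas` is an illustrative helper, not part of orcapod.
from orcapod.core.datagrams import Datagram

def store_with_schemas(cache, packet, output, pf) -> None:
    # Wrap the raw dicts in Datagrams so the cache receives explicit Python
    # schemas and a data context instead of re-inferring types from values.
    var_dg = Datagram(
        pf.get_function_variation_data(),
        python_schema=pf.get_function_variation_data_schema(),
        data_context=pf.data_context,
    )
    exec_dg = Datagram(
        pf.get_execution_data(),
        python_schema=pf.get_execution_data_schema(),
        data_context=pf.data_context,
    )
    cache.store(packet, output, var_dg, exec_dg)
```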

44 changes: 31 additions & 13 deletions src/orcapod/core/datagrams/datagram.py
@@ -185,19 +185,28 @@ def _init_from_table(
meta_table.select(keep), new_meta
)

# Store meta as dict (always); Arrow table is lazy.
# Derive schema via infer_python_schema_from_pylist_data (same as DictDatagram)
# to avoid typing.Any values that arrow_schema_to_python_schema may emit.
# Store meta as dict (always); keep the Arrow table when available.
# IMPORTANT: use the universal converter (not raw to_pylist()) to
# convert Arrow → Python so that complex types round-trip correctly.
# For example, Arrow map columns (used for dict[str, str]) become
# list[struct] via to_pylist() but proper Python dicts via the
# converter. Without the converter, downstream schema inference
# produces list[Any] which cannot be converted back to Arrow.
#
# Derive the Python schema from the Arrow schema (not from inference)
# to preserve precise types even for empty containers (e.g. an empty
# dict {} would infer as dict[Any, Any] but the Arrow schema knows
# it's dict[str, str]).
if meta_table is not None and meta_table.num_columns > 0:
self._meta = meta_table.to_pylist()[0]
self._meta_python_schema = infer_python_schema_from_pylist_data(
[self._meta], default_type=str
self._meta = self.converter.arrow_table_to_python_dicts(meta_table)[0]
self._meta_python_schema = self.converter.arrow_schema_to_python_schema(
meta_table.schema
)
self._meta_table = meta_table
else:
self._meta = {}
self._meta_python_schema = Schema.empty()

self._meta_table = None # built lazily
self._meta_table = None
self._context_table = None

# ------------------------------------------------------------------
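
The map-column behavior the new comment describes can be reproduced with pyarrow alone; a minimal sketch, independent of orcapod:

```python
# pyarrow-only illustration of the round-trip issue: a map<string, string>
# column does not come back as a Python dict from to_pylist(), so naive
# re-inference over those values sees a list of key/value pairs rather
# than dict[str, str].
import pyarrow as pa

meta_type = pa.map_(pa.string(), pa.string())
table = pa.table({"meta": pa.array([{"source": "run-1"}], type=meta_type)})

print(table.schema.field("meta").type)  # map<string, string>
print(table["meta"].to_pylist())        # [[('source', 'run-1')]] -- tuples, not a dict

# Recovering proper dicts takes an explicit conversion step, which is what
# routing through the data context's converter accomplishes in the change above.
print([dict(pairs) for pairs in table["meta"].to_pylist()])  # [{'source': 'run-1'}]
```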
@@ -530,9 +539,16 @@ def with_meta_columns(self, **meta_updates: DataValue) -> Self:
}
new_d = self.copy(include_cache=False)
new_d._meta = {**self._meta, **prefixed}
new_d._meta_python_schema = infer_python_schema_from_pylist_data(
[new_d._meta], default_type=str
# Preserve existing schema types for unchanged fields. This avoids
# re-inference which can lose precise types (e.g. an empty dict {}
# infers as dict[Any, Any] instead of dict[str, str], and
# arrow-derived types like timestamps may use Any).
new_inferred = infer_python_schema_from_pylist_data(
[prefixed], default_type=str
)
merged = dict(self._meta_python_schema)
merged.update(new_inferred)
new_d._meta_python_schema = Schema(merged)
return new_d

def drop_meta_columns(self, *keys: str, ignore_missing: bool = False) -> Self:
@@ -547,9 +563,11 @@ def drop_meta_columns(self, *keys: str, ignore_missing: bool = False) -> Self:
)
new_d = self.copy(include_cache=False)
new_d._meta = {k: v for k, v in self._meta.items() if k not in prefixed}
new_d._meta_python_schema = infer_python_schema_from_pylist_data(
[new_d._meta], default_type=str
)
# Preserve existing schema types for remaining fields (see
# with_meta_columns for rationale).
new_d._meta_python_schema = Schema({
k: v for k, v in self._meta_python_schema.items() if k not in prefixed
})
return new_d

# ------------------------------------------------------------------
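
The schema rule shared by with_meta_columns and drop_meta_columns, written out in plain Python. Field names below are invented for illustration; orcapod's Schema wraps a comparable name-to-type mapping.

```python
# Plain-Python sketch of the schema rule used above.
existing = {"_run_id": str, "_params": dict[str, str]}   # types derived from Arrow
freshly_inferred = {"_result_computed": bool}            # only the new keys

# with_meta_columns: keep every existing type, layer the new keys on top,
# so "_params" stays dict[str, str] instead of degrading to dict[Any, Any].
after_add = {**existing, **freshly_inferred}

# drop_meta_columns: filter the existing schema rather than re-inferring
# from the remaining values.
after_drop = {k: t for k, t in after_add.items() if k != "_run_id"}
print(after_add, after_drop, sep="\n")
```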
4 changes: 2 additions & 2 deletions src/orcapod/core/executors/__init__.py
@@ -1,7 +1,7 @@
from orcapod.core.executors.base import PythonFunctionExecutorBase
from orcapod.core.executors.local import LocalExecutor
from orcapod.core.executors.local import LocalPythonFunctionExecutor

__all__ = [
"PythonFunctionExecutorBase",
"LocalExecutor",
"LocalPythonFunctionExecutor",
]
10 changes: 9 additions & 1 deletion src/orcapod/core/executors/base.py
@@ -5,6 +5,8 @@
from collections.abc import Callable
from typing import TYPE_CHECKING, Any

from orcapod.types import Schema

if TYPE_CHECKING:
from orcapod.protocols.observability_protocols import PacketExecutionLoggerProtocol

@@ -105,10 +107,16 @@ async def async_execute_callable(
"""
return self.execute_callable(fn, kwargs, executor_options, logger=logger)

def get_execution_data(self) -> dict[str, Any]:
def get_executor_data(self) -> dict[str, Any]:
"""Return metadata describing the execution environment.

Recorded alongside results for observability but does not affect
content or pipeline hashes. The default returns the executor type id.
"""
return {"executor_type": self.executor_type_id}

def get_executor_data_schema(self) -> Schema:
"""Return schema for the data returned by ``get_executor_data``."""
return Schema({
"executor_type": str
})
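
get_executor_data and get_executor_data_schema are meant to be extended together. Here is a hedged fragment of what a subclass would look like, mirroring the RayExecutor change later in this PR; ThreadedExecutor and worker_count are invented, and the rest of the executor interface is omitted.

```python
# Fragment, not a full executor: shows how a subclass extends both methods
# in tandem. "worker_count" is an invented field for illustration only.
from typing import Any

from orcapod.core.executors.base import PythonFunctionExecutorBase
from orcapod.types import Schema


class ThreadedExecutor(PythonFunctionExecutorBase):  # other required methods omitted
    def __init__(self, worker_count: int = 4) -> None:
        self._worker_count = worker_count

    def get_executor_data(self) -> dict[str, Any]:
        data = super().get_executor_data()   # {"executor_type": ...}
        data["worker_count"] = self._worker_count
        return data

    def get_executor_data_schema(self) -> Schema:
        return Schema({"executor_type": str, "worker_count": int})
```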
12 changes: 7 additions & 5 deletions src/orcapod/core/executors/local.py
@@ -13,7 +13,7 @@
from orcapod.protocols.observability_protocols import PacketExecutionLoggerProtocol


class LocalExecutor(PythonFunctionExecutorBase):
class LocalPythonFunctionExecutor(PythonFunctionExecutorBase):
"""Default executor -- runs the packet function directly in the current process.

Supports all packet function types (``supported_function_type_ids``
@@ -127,9 +127,11 @@ async def async_execute_callable(
logger.record(**captured.as_dict())
return raw_result

def with_options(self, **opts: Any) -> LocalExecutor:
"""Return a new ``LocalExecutor``.
def with_options(self, **opts: Any) -> LocalPythonFunctionExecutor:
"""Return a new ``LocalPythonFunctionExecutor``.

``LocalExecutor`` carries no state, so options are ignored.
``LocalPythonFunctionExecutor`` carries no state, so options are ignored.
"""
return LocalExecutor()
return LocalPythonFunctionExecutor()
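
Usage-wise, the rename does not change with_options semantics: it still returns a fresh, equivalent instance and discards any options. A small sketch (num_cpus is an arbitrary example keyword, not a documented option):

```python
# Usage sketch: the local executor accepts arbitrary keyword options but
# ignores them and simply returns a fresh instance.
from orcapod.core.executors import LocalPythonFunctionExecutor

executor = LocalPythonFunctionExecutor()
configured = executor.with_options(num_cpus=4)    # options have no effect here
assert isinstance(configured, LocalPythonFunctionExecutor)
assert configured is not executor                 # a new instance each time
```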


31 changes: 23 additions & 8 deletions src/orcapod/core/executors/ray.py
@@ -6,6 +6,7 @@
from typing import TYPE_CHECKING, Any

from orcapod.core.executors.base import PythonFunctionExecutorBase
from orcapod.types import Schema
from orcapod.core.executors.capture_wrapper import make_capture_wrapper

if TYPE_CHECKING:
@@ -333,12 +334,26 @@ def with_options(self, **opts: Any) -> RayExecutor:
**merged,
)

def get_execution_data(self) -> dict[str, Any]:
data: dict[str, Any] = {
"executor_type": self.executor_type_id,
def get_executor_data(self) -> dict[str, Any]:
executor_data = super().get_executor_data()
executor_data.update({
"ray_address": self._ray_address or "auto",
**self._remote_opts,
}
if self._runtime_env is not None:
data["runtime_env"] = True # flag presence without dumping contents
return data
"remote_opts": self._remote_opts,
"runtime_env": self._runtime_env is not None,
})
return executor_data

def get_executor_data_schema(self) -> Schema:
"""Schema reflecting actual return types of ``get_executor_data``.

Note: ``remote_opts`` values are ``Any`` (ints, nested dicts, etc.)
but the PacketFunction layer stringifies them into ``dict[str, str]``
before storage. The schema here describes the pre-stringification
executor output.
"""
return Schema({
"executor_type": str,
"ray_address": str,
"remote_opts": dict[str, Any],
"runtime_env": bool,
})
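
To make the docstring's note concrete, here is a plain-Python illustration of the pre- versus post-stringification shapes. The values are invented and the actual conversion lives in the PacketFunction layer, not in this executor.

```python
# Shape of what get_executor_data returns at this layer, before any
# downstream stringification. All values here are illustrative.
from typing import Any

executor_data: dict[str, Any] = {
    "executor_type": "ray",            # str (illustrative id)
    "ray_address": "auto",             # str
    "remote_opts": {"num_cpus": 2},    # dict[str, Any] -- ints, nested dicts, ...
    "runtime_env": True,               # bool -- presence flag only
}

# A stand-in for the downstream conversion: non-string values are flattened
# to strings before storage, producing a dict[str, str]-compatible record.
stringified = {k: v if isinstance(v, str) else repr(v) for k, v in executor_data.items()}
print(stringified["remote_opts"])  # "{'num_cpus': 2}"
```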
13 changes: 12 additions & 1 deletion src/orcapod/core/nodes/function_node.py
@@ -1206,7 +1206,18 @@ def as_table(

converter = self.data_context.type_converter

struct_packets = converter.python_dicts_to_struct_dicts(all_packets)
# Derive the Python schema from the Arrow schema when available,
# rather than re-inferring from dict values. This preserves precise
# types for empty containers (e.g. {} infers as dict[Any, Any] but
# the Arrow schema knows it's dict[str, str]).
packet_python_schema = (
converter.arrow_schema_to_python_schema(packet_schema)
if packet_schema is not None
else None
)
struct_packets = converter.python_dicts_to_struct_dicts(
all_packets, python_schema=packet_python_schema
)
all_tags_as_tables: pa.Table = pa.Table.from_pylist(
all_tags, schema=tag_schema
)
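
The empty-container case the new comment mentions can be checked with pyarrow directly (the params column name is illustrative):

```python
# pyarrow-only check: the Arrow schema still records the precise map type
# even when the value is empty and gives inference nothing to work with.
import pyarrow as pa

packet_schema = pa.schema([("params", pa.map_(pa.string(), pa.string()))])
table = pa.Table.from_pylist([{"params": {}}], schema=packet_schema)

print(table.schema.field("params").type)   # map<string, string>
print(table.to_pylist()[0]["params"])      # [] -- nothing to infer a type from
```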