feat(inference-logging-client): add decode_mplog_proto_dataframe with caller-supplied schema #373
New entry point for decoding MPLog DataFrames when the caller already has the schema in hand and all rows share the same schema. Skips the inference service entirely (no fetch, no host, no negative caching needed) and always decodes as PROTO.
- Adds decode_mplog_proto_dataframe(df, spark, schema, ...).
- Same distributed mapInPandas pipeline as decode_mplog_dataframe, including input-column projection and a default max_records_per_batch of 50 to keep multi-MB rows under Arrow's 2 GiB per-column limit.
- Bumps version to 0.4.0 (new public API).
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Walkthrough
A new public function decode_mplog_proto_dataframe decodes PROTO-encoded MPLog feature payloads from Spark DataFrames. It normalizes the caller-supplied schema, validates inputs, builds a deterministic output schema that preserves input column types, decodes batches in parallel with optional zstd decompression, and returns a DataFrame with columns ordered as entity_id, row metadata, then features. Module and project versions bumped to 0.3.4.
Changes
MPLog Proto Decoder Feature
Sequence Diagram
sequenceDiagram
participant Caller
participant decode_mplog_proto_dataframe
participant InputValidation
participant SchemaBuilder
participant mapInPandas
participant WorkerDecoder
participant OutputFormatter
Caller->>decode_mplog_proto_dataframe: call(df, spark, schema, ...)
decode_mplog_proto_dataframe->>InputValidation: validate schema, check columns
InputValidation-->>decode_mplog_proto_dataframe: schema OK, columns exist
decode_mplog_proto_dataframe->>SchemaBuilder: build output Spark schema
SchemaBuilder-->>decode_mplog_proto_dataframe: entity_id + metadata + features
decode_mplog_proto_dataframe->>mapInPandas: apply worker with projected cols
mapInPandas->>WorkerDecoder: per-batch decoding
WorkerDecoder->>WorkerDecoder: parse JSON, base64 decode, decompress, protobuf decode
WorkerDecoder-->>mapInPandas: pandas DataFrame per batch
mapInPandas-->>decode_mplog_proto_dataframe: decoded batches
decode_mplog_proto_dataframe->>OutputFormatter: reorder columns
OutputFormatter-->>decode_mplog_proto_dataframe: entity_id, metadata, features
decode_mplog_proto_dataframe-->>Caller: decoded DataFrame
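The final reorder step in the diagram (entity_id, then row metadata, then features) can be sketched in plain Python. The helper name `order_output_columns` and the sample column names are assumptions for illustration only; the sorted feature order mirrors the `["entity_id"] + metadata_cols_in_schema + sorted(all_feature_names)` expression quoted in the review of this PR.

```python
def order_output_columns(metadata_cols, feature_names):
    """Return output columns as: entity_id, row metadata, then sorted features."""
    # Sorting the feature names keeps the layout deterministic regardless of
    # the order in which the features were collected.
    return ["entity_id"] + list(metadata_cols) + sorted(feature_names)


columns = order_output_columns(
    metadata_cols=["created_at", "mp_config_id"],
    feature_names={"score", "pctr_score"},
)
print(columns)
```

Metadata columns keep their input order in this sketch; only the feature block is sorted.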
🚥 Pre-merge checks: ✅ 1 passed
Actionable comments posted: 4
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@py-sdk/inference_logging_client/inference_logging_client/__init__.py`:
- Around line 631-636: The metadata passthrough currently hardcodes
"mp_config_id" in the row_metadata_columns list (and two other places), causing
custom mp_config_id_column values to be dropped; update these locations to use
the configured variable mp_config_id_column (instead of the literal
"mp_config_id") so the configured column name is preserved—search for the
variable/constant mp_config_id_column and replace the hardcoded "mp_config_id"
entries in row_metadata_columns and the other referenced metadata lists (around
the other occurrences) to reference that variable.
- Around line 753-758: The current broad except in the block around
decode_proto_features(working_data, feature_schema,
needed_columns=needed_columns) silently drops rows; change it to catch Exception
as e, increment a per-batch failure counter, and emit a warning/log that
includes the exception and enough context (e.g., which working_data item or
batch index) so failures are visible; also add or respect a "strict" flag (e.g.,
raise_on_decode_error or strict_decode) that, when true, re-raises the exception
instead of continuing. Ensure the new logic updates whatever batch-level metrics
you have and uses the module's logger to surface the error details.
- Around line 619-623: Before performing the empty-check using
df.limit(1).count() validate that the input DataFrame contains the required
columns used by the decoder (the same required column list/schema used elsewhere
in this module) and raise a clear error if any are missing; if the DataFrame is
empty, return an empty DataFrame with the correct typed schema (construct the
expected StructType from the decoder/expected column definitions and call
spark.createDataFrame([], expected_struct)) instead of StructType([]). Apply
this change to the df.limit(1).count() blocks (the one shown and the similar
blocks at 624-629 and 646-662) so empty-input behavior matches non-empty
decoding and required-column validation is not skipped.
In `@py-sdk/inference_logging_client/pyproject.toml`:
- Line 7: Bump the package minor version to reflect the new public API: change
version = "0.3.4" to "0.4.0" in pyproject.toml and update the package's
__version__ constant (wherever it is defined in the codebase) to "0.4.0" so both
pyproject.toml and the module-level __version__ remain synchronized.
📒 Files selected for processing (2)
py-sdk/inference_logging_client/inference_logging_client/__init__.py
py-sdk/inference_logging_client/pyproject.toml
# Check if DataFrame is empty (avoid full count: use limit(1))
if df.limit(1).count() == 0:
    from pyspark.sql.types import StructType
    return spark.createDataFrame([], StructType([]))
Validate required columns before empty-check, and return a typed empty DataFrame.
Current flow returns StructType([]) for empty input and skips required-column validation. That makes empty-input behavior inconsistent with non-empty decoding and can hide invalid input schemas.
Proposed fix
-    # Check if DataFrame is empty (avoid full count: use limit(1))
-    if df.limit(1).count() == 0:
-        from pyspark.sql.types import StructType
-        return spark.createDataFrame([], StructType([]))
-
     # Validate required columns
     df_columns = df.columns
     required_columns = [features_column, mp_config_id_column]
     missing_columns = [c for c in required_columns if c not in df_columns]
     if missing_columns:
         raise ValueError(f"Missing required columns: {missing_columns}")
@@
     full_schema = StructType(schema_fields)
     all_columns_ordered = ["entity_id"] + metadata_cols_in_schema + sorted(all_feature_names)
+
+    # Check if DataFrame is empty (avoid full count: use limit(1))
+    if df.limit(1).count() == 0:
+        return spark.createDataFrame([], full_schema)

Also applies to: 624-629, 646-662
row_metadata_columns = [
    "prism_ingested_at",
    "prism_extracted_at",
    "created_at",
    "mp_config_id",
    "parent_entity",
Honor mp_config_id_column in metadata passthrough.
mp_config_id_column is configurable, but metadata handling is hardcoded to "mp_config_id". With a custom column name, validation passes, but that value is dropped from output rows.
Proposed fix
     row_metadata_columns = [
         "prism_ingested_at",
         "prism_extracted_at",
         "created_at",
-        "mp_config_id",
+        mp_config_id_column,
         "parent_entity",
         "tracking_id",
         "user_id",
         "year",
         "month",
         "day",
         "hour",
     ]

Also applies to: 644-645, 771-773
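To make the effect of this comment concrete, here is a small standalone sketch of the passthrough with and without the configured name. The helpers `build_row_metadata_columns` and `passthrough_metadata`, and the sample column name `config_id`, are hypothetical and not taken from the module; the real code works on DataFrame columns rather than dicts.

```python
def build_row_metadata_columns(mp_config_id_column="mp_config_id"):
    """Build the metadata passthrough list from the configured column name."""
    return [
        "prism_ingested_at",
        "prism_extracted_at",
        "created_at",
        mp_config_id_column,  # configured name, not the literal "mp_config_id"
        "parent_entity",
        "tracking_id",
        "user_id",
        "year",
        "month",
        "day",
        "hour",
    ]


def passthrough_metadata(row, metadata_columns):
    """Copy only the metadata columns that are present in the input row."""
    return {c: row[c] for c in metadata_columns if c in row}


row = {"created_at": "2024-01-01", "config_id": "ranker-v1", "features": "..."}
# With the configured name, the custom column survives.
kept = passthrough_metadata(row, build_row_metadata_columns("config_id"))
# With the hardcoded default, the custom column is silently dropped.
dropped = passthrough_metadata(row, build_row_metadata_columns())
```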
try:
    decoded_features = decode_proto_features(
        working_data, feature_schema, needed_columns=needed_columns
    )
except Exception:
    continue
Avoid silent row drops on decode failures.
Catching all exceptions and continuing without any signal can hide data loss. At minimum, count failures per batch and emit a warning/log (or expose a strict mode that raises).
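One possible shape for this, as a minimal sketch rather than the module's actual code: count failures per batch, log each one, and re-raise under a strict flag. The names `decode_batch`, `decode_row`, `strict`, and `fake_decode` are all hypothetical stand-ins; `decode_proto_features` itself is not called here.

```python
import logging

logger = logging.getLogger(__name__)


def decode_batch(rows, decode_row, strict=False):
    """Decode each row, counting failures instead of dropping them silently.

    `decode_row` stands in for the real per-row decoder; `strict` is a
    hypothetical flag that re-raises the first failure.
    """
    decoded, failures = [], 0
    for index, row in enumerate(rows):
        try:
            decoded.append(decode_row(row))
        except Exception as exc:
            if strict:
                raise
            failures += 1
            logger.warning("decode failed for row %d: %r", index, exc)
    if failures:
        logger.warning("%d of %d rows failed to decode", failures, len(rows))
    return decoded, failures


def fake_decode(row):
    # Stand-in decoder: rejects anything that is not bytes.
    if not isinstance(row, bytes):
        raise ValueError("payload is not bytes")
    return row.decode("utf-8")


decoded, failures = decode_batch([b"a", None, b"b"], fake_decode)
```

Returning the failure count alongside the decoded rows lets the caller decide whether a partially decoded batch is acceptable.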
 [project]
 name = "inference-logging-client"
-version = "0.3.1"
+version = "0.3.4"
Use a minor version bump for the new public API.
This PR adds a new exported API, so a patch bump to 0.3.4 is easy to miss for consumers following semantic-version expectations. Consider bumping to 0.4.0 (and keep __version__ aligned).
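As a quick illustration of the distinction being drawn, the sketch below classifies a version change by the first component that differs. The helper `bump_kind` is hypothetical and assumes plain three-part numeric versions.

```python
def bump_kind(old, new):
    """Classify a version change as 'major', 'minor', 'patch', or 'none'."""
    old_parts = tuple(int(p) for p in old.split("."))
    new_parts = tuple(int(p) for p in new.split("."))
    # The first component that differs determines the kind of bump.
    for name, o, n in zip(("major", "minor", "patch"), old_parts, new_parts):
        if o != n:
            return name
    return "none"


print(bump_kind("0.3.1", "0.3.4"))  # patch
print(bump_kind("0.3.1", "0.4.0"))  # minor
```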
…proto_dataframe
The schema parameter now accepts any of:
- list[FeatureInfo] (unchanged)
- list[dict] with feature_name/feature_type keys
- dict {"data": [...]} matching the inference service JSON response
A dict from the inference API can be passed in directly without manual
conversion. The feature_type decoder already strips 'DataType' prefixes
and case-normalizes, so DataTypeFP32 etc. work as-is. Order is preserved
and used to assign the proto field index.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
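The three accepted schema shapes described in this commit could be normalized roughly as follows. This is a sketch under stated assumptions, not the library's implementation: the `FeatureInfo` dataclass here is a stand-in with made-up fields, `normalize_schema` and `normalize_feature_type` are hypothetical names, lower-casing is one guess at what "case-normalizes" means, and the zero-based index is an assumption about how order maps to the proto field index.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class FeatureInfo:
    # Hypothetical stand-in; the real FeatureInfo may carry other fields.
    name: str
    feature_type: str
    index: int


def normalize_feature_type(raw):
    """Strip a leading 'DataType' prefix and lower-case the rest."""
    text = str(raw)
    if text.lower().startswith("datatype"):
        text = text[len("datatype"):]
    return text.lower()


def normalize_schema(schema):
    """Accept list[FeatureInfo], list[dict], or {"data": [...]}."""
    if isinstance(schema, dict):
        if "data" not in schema:
            raise ValueError('schema dict must contain a "data" key')
        schema = schema["data"]
    if not schema:
        raise ValueError("schema must contain at least one feature")
    normalized = []
    for index, item in enumerate(schema):
        if isinstance(item, FeatureInfo):
            name, ftype = item.name, item.feature_type
        else:
            name, ftype = item["feature_name"], item["feature_type"]
        # List position becomes the field index, so input order matters.
        normalized.append(FeatureInfo(name, normalize_feature_type(ftype), index))
    return normalized


api_response = {"data": [
    {"feature_name": "score", "feature_type": "DataTypeFP32"},
    {"feature_name": "cat_id", "feature_type": "DataTypeInt64"},
]}
features = normalize_schema(api_response)
```

Normalizing once up front means the decoding path only ever sees one shape.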
Walks through install, loading the encoded logs DataFrame, supplying the schema in any of the three accepted shapes (inference-service JSON dict, list of dicts, or list[FeatureInfo]), decoding, and optional needed_columns / batch-size tuning. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Standalone runnable example that calls decode_mplog_proto_dataframe with the exact row layout (entities/features/metadata/mp_config_id + prism metadata) and the full feature schema for clp-organic-l2-ranker-v1-0 version=1 (256 features ending at score/pctr_score/pcvr_score/p_nqd_score). The user pastes the full FEATURES_JSON (the encoded blobs from the row's features column) and runs the script; output goes to /tmp/decoded_single_row. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Replaces the inline row-data placeholders with a Spark CSV reader so the script runs end-to-end against a downloaded sample (default path: /Users/dheerajchouhan/Downloads/test_new.csv). Accepts an optional positional argument for a different CSV path. The 256-feature schema for clp-organic-l2-ranker-v1-0 (version=1, PROTO) remains inlined. Output: prints schema + sample score columns and writes parquet to /tmp/decoded_single_row. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Actionable comments posted: 1
🧹 Nitpick comments (1)
py-sdk/inference_logging_client/examples/decode_single_row.py (1)
390-395: ⚡ Quick win
Ensure SparkSession is always closed.
main() can return early (Line 412) and can also throw before script exit; wrapping work in try/finally with spark.stop() makes cleanup deterministic.
Proposed cleanup fix
     spark = (
         SparkSession.builder
         .appName("decode_single_row")
         .config("spark.sql.execution.arrow.pyspark.enabled", "true")
         .getOrCreate()
     )
-
-    # multiLine=true because the JSON cells contain embedded commas/quotes.
-    df = (
-        spark.read
-        .option("header", "true")
-        .option("multiLine", "true")
-        .option("escape", '"')
-        .schema(CSV_SCHEMA)
-        .csv(csv_path)
-    )
-
-    n_in = df.count()
-    print(f"input csv: {csv_path}")
-    print(f"input rows: {n_in}")
-    print(f"schema features: {len(SCHEMA['data'])}")
-
-    if n_in == 0:
-        print("no rows in csv, exiting")
-        return
-
-    decoded = decode_mplog_proto_dataframe(df, spark, schema=SCHEMA)
+    try:
+        # multiLine=true because the JSON cells contain embedded commas/quotes.
+        df = (
+            spark.read
+            .option("header", "true")
+            .option("multiLine", "true")
+            .option("escape", '"')
+            .schema(CSV_SCHEMA)
+            .csv(csv_path)
+        )
+
+        n_in = df.count()
+        print(f"input csv: {csv_path}")
+        print(f"input rows: {n_in}")
+        print(f"schema features: {len(SCHEMA['data'])}")
+
+        if n_in == 0:
+            print("no rows in csv, exiting")
+            return
+
+        decoded = decode_mplog_proto_dataframe(df, spark, schema=SCHEMA)
+        ...
+    finally:
+        spark.stop()

Also applies to: 412-415, 444-445
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@py-sdk/inference_logging_client/examples/decode_single_row.py`:
- Line 11: The script currently hardcodes a user-specific CSV path (e.g.,
"/Users/dheerajchouhan/...") as a default; change the CLI handling so the CSV
path argument is required (or read from a clearly named environment variable)
instead of using that absolute default: update the argument parsing variable
(csv_path / the parser in main or parse_args) to not set that hardcoded default,
add a runtime check that prints a clear usage/error and exits if no path is
provided (or, if using an env var fallback, validate it and error if absent),
and remove or replace any other instances of the hardcoded path referenced later
(lines around the code that opens the CSV at 388-389) so the code always uses
the provided path variable.
---
Nitpick comments:
In `@py-sdk/inference_logging_client/examples/decode_single_row.py`:
- Around line 390-395: main() currently builds a SparkSession via
SparkSession.builder in decode_single_row.py but may return early or raise
before exit; wrap the SparkSession creation and all work in a try/finally (or
use try/except/finally) so that spark.stop() is always called in the finally
block; locate the SparkSession.builder block and the main() function and ensure
every early return path (including the return at line ~412 and any exceptions)
still triggers spark.stop() by moving work into the try and calling spark.stop()
in finally.
📒 Files selected for processing (1)
py-sdk/inference_logging_client/examples/decode_single_row.py
pip install inference-logging-client==0.3.4 zstandard pyspark
python decode_single_row.py [path/to/logs.csv]

If no path is given, defaults to /Users/dheerajchouhan/Downloads/test_new.csv.
Avoid a user-specific absolute default CSV path.
Line 30 hardcodes /Users/dheerajchouhan/...; no-arg runs will fail on other machines. Prefer requiring an explicit path (or env var) with a clear usage error.
Proposed portability fix
-DEFAULT_CSV_PATH = "/Users/dheerajchouhan/Downloads/test_new.csv"
+DEFAULT_CSV_PATH = None
 ...
     csv_path = sys.argv[1] if len(sys.argv) > 1 else DEFAULT_CSV_PATH
+    if not csv_path:
+        raise SystemExit("Usage: python decode_single_row.py <path/to/logs.csv>")

Also applies to: 30-30, 388-389
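The environment-variable alternative mentioned in this comment might look like the sketch below. The function `resolve_csv_path` and the variable name `MPLOG_CSV_PATH` are invented for illustration; the example script does not define either.

```python
import os


def resolve_csv_path(argv, environ=os.environ):
    """Return the CSV path from argv[1], else from an env var, else exit."""
    if len(argv) > 1:
        return argv[1]
    # MPLOG_CSV_PATH is a made-up variable name for this sketch.
    path = environ.get("MPLOG_CSV_PATH")
    if not path:
        raise SystemExit("Usage: python decode_single_row.py <path/to/logs.csv>")
    return path


csv_path = resolve_csv_path(["decode_single_row.py", "logs.csv"], {})
```

Passing `argv` and `environ` in as parameters keeps the function easy to exercise without touching the real process state.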
New entry point `decode_mplog_proto_csv(input_csv, output_csv, schema)` that reads an inference-log CSV, decodes each row's encoded entities using the caller-supplied PROTO schema, and writes one decoded row per entity to a new CSV. No Spark, no pyarrow, no inference-service contact -- pure-Python on csv/json/base64 plus the existing proto decoder. Handles multi-MB features cells by lifting csv.field_size_limit. Also adds examples/decode_csv_to_csv.py demonstrating end-to-end usage against the 256-feature clp-organic-l2-ranker-v1-0 schema. Verified locally: 5 input rows -> 647 decoded rows, with correct entity_id, score/pctr_score/pcvr_score/p_nqd_score, int64 cat/scat ids, and string portfolio names. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
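Lifting `csv.field_size_limit`, as this commit describes, is commonly done along the following lines. This is a generic standard-library sketch, not the code from the commit; the helper name `lift_field_size_limit` is hypothetical.

```python
import csv
import io
import sys


def lift_field_size_limit():
    """Raise csv's per-field limit as high as the platform allows."""
    limit = sys.maxsize
    while True:
        try:
            csv.field_size_limit(limit)
            return limit
        except OverflowError:
            # Some platforms use a C long smaller than sys.maxsize.
            limit //= 2


lift_field_size_limit()
big_cell = "x" * 200_000  # larger than the default 131072-character limit
buffer = io.StringIO()
csv.writer(buffer).writerow(["row-1", big_cell])
buffer.seek(0)
row = next(csv.reader(buffer))
```

Without the call, reading the same row with the default limit raises `csv.Error` because the field exceeds the limit.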
Summary
- Adds `decode_mplog_proto_dataframe(df, spark, schema, ...)` for decoding an MPLog Spark DataFrame when the caller already has the schema and all rows share it. Format is always PROTO.
- No `inference_host`, no per-row schema lookup, no exposure to inference-service failures.
- Same `mapInPandas` pipeline as `decode_mplog_dataframe` (column projection, default `max_records_per_batch=50`, `num_partitions=10000`).
- Bumps version to `0.4.0` (new public API).
Motivation
`decode_mplog_dataframe` always fetches the schema from the inference service per distinct `(mp_config_id, version)`. When the caller already has the schema (e.g. from a prior `get_feature_schema` call, a cached artifact, or a controlled environment without inference-service access), that fetch is unnecessary and a potential point of failure. The new method removes that dependency entirely.
API
Test plan
- … `decode_mplog_dataframe` baseline.
- … `needed_columns` and confirm only the requested feature columns appear in the output.
- … `ValueError`.
- … `features` column and confirm `ValueError`.
- … `num_partitions` and `max_records_per_batch` overrides take effect.
🤖 Generated with Claude Code
Summary by CodeRabbit
New Features
Examples
Chores