feat(inference-logging-client): add decode_mplog_proto_dataframe with caller-supplied schema#373

Open: dheerajchouhan08 wants to merge 7 commits into develop from feat/inference-logging-decode-proto-dataframe

Conversation

Contributor

@dheerajchouhan08 dheerajchouhan08 commented May 11, 2026

Summary

  • New API decode_mplog_proto_dataframe(df, spark, schema, ...) for decoding an MPLog Spark DataFrame when the caller already has the schema and all rows share it. Format is always PROTO.
  • No schema fetch: no inference_host, no per-row schema lookup, no exposure to inference-service failures.
  • Same distributed mapInPandas pipeline as decode_mplog_dataframe (column projection, default max_records_per_batch=50, num_partitions=10000).
  • Bumps version to 0.4.0 (new public API).

Motivation

decode_mplog_dataframe always fetches the schema from the inference service once per distinct (mp_config_id, version). When the caller already has the schema (e.g. from a prior get_feature_schema call, a cached artifact, or a controlled environment without inference-service access), that fetch is unnecessary and introduces an avoidable failure mode. The new method removes that dependency entirely.

API

from inference_logging_client import decode_mplog_proto_dataframe, get_feature_schema

schema = get_feature_schema("my-model", 1)
decoded_df = decode_mplog_proto_dataframe(df, spark, schema=schema)

Test plan

  • Run on a proto-encoded log dataset with a known schema; verify decoded rows and feature column values match a decode_mplog_dataframe baseline.
  • Pass needed_columns and confirm only the requested feature columns appear in the output.
  • Pass an empty schema list and confirm ValueError.
  • Pass a DataFrame missing features column and confirm ValueError.
  • Verify num_partitions and max_records_per_batch overrides take effect.

🤖 Generated with Claude Code

Summary by CodeRabbit

  • New Features

    • Added an MPLog PROTO decoder for Spark DataFrames with base64 support, optional decompression, schema normalization, and selective column decoding; exposed as a new public decoder API.
  • Examples

    • Added a full Jupyter notebook demonstrating decoder usage and tuning.
    • Added a standalone script for decoding single-row inputs and writing Parquet output.
  • Chores

    • Package version bumped to 0.3.4.

Review Change Stack

dheerajchouhan08 and others added 2 commits May 11, 2026 19:11
New entry point for decoding MPLog DataFrames when the caller already has
the schema in hand and all rows share the same schema. Skips the inference
service entirely (no fetch, no host, no negative caching needed) and always
decodes as PROTO.

- Adds decode_mplog_proto_dataframe(df, spark, schema, ...).
- Same distributed mapInPandas pipeline as decode_mplog_dataframe, including
  input-column projection and a default max_records_per_batch of 50 to keep
  multi-MB rows under Arrow's 2 GiB per-column limit.
- Bumps version to 0.4.0 (new public API).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

coderabbitai Bot commented May 11, 2026

Walkthrough

A new public function decode_mplog_proto_dataframe decodes PROTO-encoded MPLog feature payloads from Spark DataFrames. It normalizes caller schema, validates inputs, builds a deterministic output schema preserving input column types, performs parallel per-batch decoding with optional zstd decompression, and returns a DataFrame with columns ordered as entity_id, row-metadata, then features. Module and project versions bumped to 0.3.4.
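
The per-row byte handling named in the walkthrough (base64 decode, then optional zstd decompression) could be sketched roughly as below. This is an illustration under stated assumptions, not the PR's code: the helper name `decode_feature_blob` is hypothetical, and detecting compression from the zstd frame magic number is an assumption (the real decoder may rely on a metadata flag instead).

```python
import base64

# zstd frames begin with the magic number 0xFD2FB528, stored little-endian.
ZSTD_MAGIC = b"\x28\xb5\x2f\xfd"


def decode_feature_blob(encoded: str) -> bytes:
    """Base64-decode a payload and decompress it only if it looks like zstd.

    Hypothetical helper: the magic-number check is an assumption about how
    compressed payloads are recognised.
    """
    raw = base64.b64decode(encoded)
    if raw.startswith(ZSTD_MAGIC):
        # Imported lazily so uncompressed payloads work without the dependency.
        import zstandard

        # decompressobj() also handles frames that omit the content size.
        return zstandard.ZstdDecompressor().decompressobj().decompress(raw)
    return raw
```

The resulting bytes would then be handed to the proto decoder; that step is left out here because its signature is only partially visible in this PR.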

Changes

MPLog Proto Decoder Feature

| Layer | File(s) | Summary |
| --- | --- | --- |
| Version & Export Declaration | `py-sdk/inference_logging_client/inference_logging_client/__init__.py` | `__version__` bumped to 0.3.4 and `__all__` updated to export `decode_mplog_proto_dataframe`. |
| Project Version | `py-sdk/inference_logging_client/pyproject.toml` | Project version updated from 0.3.1 to 0.3.4. |
| Example notebook | `py-sdk/inference_logging_client/examples/decode_mplog_proto_dataframe_example.ipynb` | Added notebook demonstrating installation, Spark setup, loading encoded logs, supported schema shapes, decode usage, `needed_columns` filtering, tuning parameters, and persisting decoded parquet. |
| Schema normalizer | `py-sdk/inference_logging_client/inference_logging_client/__init__.py` | Added `_normalize_schema` to accept several schema shapes and return `list[FeatureInfo]` with indices assigned by list order. |
| API Entry & Validation | `py-sdk/inference_logging_client/inference_logging_client/__init__.py` | Declared `decode_mplog_proto_dataframe`; normalizes schema, returns early for empty input, and validates presence of required input columns. |
| Output Schema & Projection | `py-sdk/inference_logging_client/inference_logging_client/__init__.py` | Builds output Spark schema (`entity_id` + preserved row-metadata types + feature columns filtered by `needed_columns`) and projects input to minimal columns for worker decoding. |
| Worker helpers | `py-sdk/inference_logging_client/inference_logging_client/__init__.py` | Adds `_safe_get` for robust pandas row access within the decoding closure. |
| mapInPandas decoding loop | `py-sdk/inference_logging_client/inference_logging_client/__init__.py` | Per-batch decoding: parse JSON/list payloads, derive entity ids, base64-decode and optionally zstd-decompress encoded feature bytes, call `decode_proto_features`, stringify/hex complex values, preserve metadata, and fill missing features with `None`. |
| Execution wiring & output ordering | `py-sdk/inference_logging_client/inference_logging_client/__init__.py` | Optional repartitioning, temporarily set `spark.sql.execution.arrow.maxRecordsPerBatch` during mapInPandas, apply worker closure, restore setting, and reorder columns to `entity_id`, metadata, features. |
| Standalone example script | `py-sdk/inference_logging_client/examples/decode_single_row.py` | Added script reading CSV, defining `SCHEMA` and `CSV_SCHEMA`, calling decoder, showing counts/samples, and writing decoded Parquet. |
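
The "temporarily set, then restore" handling of `spark.sql.execution.arrow.maxRecordsPerBatch` described in the execution-wiring row might look something like the sketch below. It is not taken from the PR: `temporary_conf` is a hypothetical helper, and `conf` is assumed to expose `get(key, default)`, `set(key, value)`, and `unset(key)` the way `spark.conf` does.

```python
from contextlib import contextmanager

ARROW_BATCH_KEY = "spark.sql.execution.arrow.maxRecordsPerBatch"


@contextmanager
def temporary_conf(conf, key, value):
    """Set conf[key] for the duration of the block, then restore the old state.

    Hypothetical helper; `conf` is assumed to behave like `spark.conf`.
    """
    previous = conf.get(key, None)  # None means the key was not set explicitly
    conf.set(key, str(value))
    try:
        yield
    finally:
        # Restore even if the body raised, so the session is left unchanged.
        if previous is None:
            conf.unset(key)
        else:
            conf.set(key, previous)
```

A caller would wrap the decode step as `with temporary_conf(spark.conf, ARROW_BATCH_KEY, 50): ...`. Using `try/finally` keeps the session setting from leaking if decoding fails partway through.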

Sequence Diagram

sequenceDiagram
  participant Caller
  participant decode_mplog_proto_dataframe
  participant InputValidation
  participant SchemaBuilder
  participant mapInPandas
  participant WorkerDecoder
  participant OutputFormatter

  Caller->>decode_mplog_proto_dataframe: call(df, spark, schema, ...)
  decode_mplog_proto_dataframe->>InputValidation: validate schema, check columns
  InputValidation-->>decode_mplog_proto_dataframe: schema OK, columns exist
  decode_mplog_proto_dataframe->>SchemaBuilder: build output Spark schema
  SchemaBuilder-->>decode_mplog_proto_dataframe: entity_id + metadata + features
  decode_mplog_proto_dataframe->>mapInPandas: apply worker with projected cols
  mapInPandas->>WorkerDecoder: per-batch decoding
  WorkerDecoder->>WorkerDecoder: parse JSON, base64 decode, decompress, protobuf decode
  WorkerDecoder-->>mapInPandas: pandas DataFrame per batch
  mapInPandas-->>decode_mplog_proto_dataframe: decoded batches
  decode_mplog_proto_dataframe->>OutputFormatter: reorder columns
  OutputFormatter-->>decode_mplog_proto_dataframe: entity_id, metadata, features
  decode_mplog_proto_dataframe-->>Caller: decoded DataFrame
🚥 Pre-merge checks | ✅ 1
✅ Passed checks (1 passed)
| Check name | Status | Explanation |
| --- | --- | --- |
| Dynamic Configuration Validation | ✅ Passed | No changes to application-dyn-*.yml files detected. PR only modifies Python source files and Jupyter notebooks in the inference_logging_client package. Check requirements satisfied. |


@coderabbitai coderabbitai Bot left a comment

Actionable comments posted: 4

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@py-sdk/inference_logging_client/inference_logging_client/__init__.py`:
- Around line 631-636: The metadata passthrough currently hardcodes
"mp_config_id" in the row_metadata_columns list (and two other places), causing
custom mp_config_id_column values to be dropped; update these locations to use
the configured variable mp_config_id_column (instead of the literal
"mp_config_id") so the configured column name is preserved—search for the
variable/constant mp_config_id_column and replace the hardcoded "mp_config_id"
entries in row_metadata_columns and the other referenced metadata lists (around
the other occurrences) to reference that variable.
- Around line 753-758: The current broad except in the block around
decode_proto_features(working_data, feature_schema,
needed_columns=needed_columns) silently drops rows; change it to catch Exception
as e, increment a per-batch failure counter, and emit a warning/log that
includes the exception and enough context (e.g., which working_data item or
batch index) so failures are visible; also add or respect a "strict" flag (e.g.,
raise_on_decode_error or strict_decode) that, when true, re-raises the exception
instead of continuing. Ensure the new logic updates whatever batch-level metrics
you have and uses the module's logger to surface the error details.
- Around line 619-623: Before performing the empty-check using
df.limit(1).count() validate that the input DataFrame contains the required
columns used by the decoder (the same required column list/schema used elsewhere
in this module) and raise a clear error if any are missing; if the DataFrame is
empty, return an empty DataFrame with the correct typed schema (construct the
expected StructType from the decoder/expected column definitions and call
spark.createDataFrame([], expected_struct)) instead of StructType([]). Apply
this change to the df.limit(1).count() blocks (the one shown and the similar
blocks at 624-629 and 646-662) so empty-input behavior matches non-empty
decoding and required-column validation is not skipped.

In `@py-sdk/inference_logging_client/pyproject.toml`:
- Line 7: Bump the package minor version to reflect the new public API: change
version = "0.3.4" to "0.4.0" in pyproject.toml and update the package's
__version__ constant (wherever it is defined in the codebase) to "0.4.0" so both
pyproject.toml and the module-level __version__ remain synchronized.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 09f6de9d-1727-496d-b94d-50dd91cbd98a

📥 Commits

Reviewing files that changed from the base of the PR and between 37b045d and cde302c.

📒 Files selected for processing (2)
  • py-sdk/inference_logging_client/inference_logging_client/__init__.py
  • py-sdk/inference_logging_client/pyproject.toml

Comment on lines +619 to +623
    # Check if DataFrame is empty (avoid full count: use limit(1))
    if df.limit(1).count() == 0:
        from pyspark.sql.types import StructType
        return spark.createDataFrame([], StructType([]))

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Validate required columns before empty-check, and return a typed empty DataFrame.

Current flow returns StructType([]) for empty input and skips required-column validation. That makes empty-input behavior inconsistent with non-empty decoding and can hide invalid input schemas.

Proposed fix
-    # Check if DataFrame is empty (avoid full count: use limit(1))
-    if df.limit(1).count() == 0:
-        from pyspark.sql.types import StructType
-        return spark.createDataFrame([], StructType([]))
-
     # Validate required columns
     df_columns = df.columns
     required_columns = [features_column, mp_config_id_column]
     missing_columns = [c for c in required_columns if c not in df_columns]
     if missing_columns:
         raise ValueError(f"Missing required columns: {missing_columns}")
@@
     full_schema = StructType(schema_fields)
     all_columns_ordered = ["entity_id"] + metadata_cols_in_schema + sorted(all_feature_names)
+
+    # Check if DataFrame is empty (avoid full count: use limit(1))
+    if df.limit(1).count() == 0:
+        return spark.createDataFrame([], full_schema)

Also applies to: 624-629, 646-662

Comment on lines +631 to +636
    row_metadata_columns = [
        "prism_ingested_at",
        "prism_extracted_at",
        "created_at",
        "mp_config_id",
        "parent_entity",

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Honor mp_config_id_column in metadata passthrough.

mp_config_id_column is configurable, but metadata handling is hardcoded to "mp_config_id". With a custom column name, validation passes, but that value is dropped from output rows.

Proposed fix
-    row_metadata_columns = [
+    row_metadata_columns = [
         "prism_ingested_at",
         "prism_extracted_at",
         "created_at",
-        "mp_config_id",
+        mp_config_id_column,
         "parent_entity",
         "tracking_id",
         "user_id",
         "year",
         "month",
         "day",
         "hour",
     ]

Also applies to: 644-645, 771-773

Comment on lines +753 to +758
try:
    decoded_features = decode_proto_features(
        working_data, feature_schema, needed_columns=needed_columns
    )
except Exception:
    continue

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Avoid silent row drops on decode failures.

Catching all exceptions and continuing without any signal can hide data loss. At minimum, count failures per batch and emit a warning/log (or expose a strict mode that raises).
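
One possible shape for the counting-plus-strict-mode behavior suggested above is sketched here. It is a minimal sketch, not the PR's worker loop: `decode_batch`, `decode_one`, and the `strict` flag are hypothetical names, and `decode_one` stands in for whatever per-row call wraps `decode_proto_features`.

```python
import logging

logger = logging.getLogger(__name__)


def decode_batch(rows, decode_one, strict=False):
    """Decode each row; log and count failures instead of dropping them silently.

    Hypothetical helper: `decode_one` is any callable that decodes a single row
    and raises on malformed input.
    """
    decoded, failures = [], 0
    for index, row in enumerate(rows):
        try:
            decoded.append(decode_one(row))
        except Exception as exc:
            if strict:
                raise  # strict mode surfaces the first failure to the caller
            failures += 1
            logger.warning("decode failed for row %d in batch: %r", index, exc)
    if failures:
        logger.warning("skipped %d of %d rows in batch", failures, len(rows))
    return decoded, failures
```

Returning the failure count alongside the decoded rows lets the caller decide whether a partially decoded batch is acceptable.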

 [project]
 name = "inference-logging-client"
-version = "0.3.1"
+version = "0.3.4"

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Use a minor version bump for the new public API.

This PR adds a new exported API, so a patch bump to 0.3.4 is easy to miss for consumers following semantic-version expectations. Consider bumping to 0.4.0 (and keep __version__ aligned).
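
To make the patch-versus-minor distinction concrete, here is a small sketch that classifies a version change. `bump_kind` is a hypothetical helper written for illustration and is not part of the package; it assumes plain `MAJOR.MINOR.PATCH` strings with no pre-release suffix.

```python
def bump_kind(old: str, new: str) -> str:
    """Classify the change between two MAJOR.MINOR.PATCH version strings."""
    old_parts = tuple(int(p) for p in old.split("."))
    new_parts = tuple(int(p) for p in new.split("."))
    if len(old_parts) != 3 or len(new_parts) != 3:
        raise ValueError("expected MAJOR.MINOR.PATCH versions")
    if new_parts <= old_parts:
        return "none"  # not an increase
    # The first component that differs determines the kind of bump.
    for name, o, n in zip(("major", "minor", "patch"), old_parts, new_parts):
        if n != o:
            return name
    return "none"


print(bump_kind("0.3.1", "0.3.4"))  # patch
print(bump_kind("0.3.1", "0.4.0"))  # minor
```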

dheerajchouhan08 and others added 4 commits May 14, 2026 13:44
…proto_dataframe

The schema parameter now accepts any of:
- list[FeatureInfo] (unchanged)
- list[dict] with feature_name/feature_type keys
- dict {"data": [...]} matching the inference service JSON response

A dict from the inference API can be passed in directly without manual
conversion. The feature_type decoder already strips 'DataType' prefixes
and case-normalizes, so DataTypeFP32 etc. work as-is. Order is preserved
and used to assign the proto field index.
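
The three accepted shapes can be illustrated with a rough sketch of a normalizer. This is not the PR's `_normalize_schema`: the `FeatureInfo` dataclass below is a hypothetical stand-in whose field names are assumptions, and passing existing `FeatureInfo` entries through unchanged is likewise an assumption.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class FeatureInfo:  # hypothetical stand-in for the package's FeatureInfo
    name: str
    feature_type: str
    index: int


def normalize_schema(schema) -> list[FeatureInfo]:
    """Accept the three documented shapes and return list[FeatureInfo]."""
    if isinstance(schema, dict):
        schema = schema.get("data")  # inference-service JSON response shape
    if not isinstance(schema, list) or not schema:
        raise ValueError("schema must be a non-empty list or {'data': [...]}")
    normalized = []
    for index, item in enumerate(schema):
        if isinstance(item, FeatureInfo):
            normalized.append(item)  # assumed to be passed through unchanged
        elif isinstance(item, dict):
            # List position becomes the proto field index.
            normalized.append(
                FeatureInfo(item["feature_name"], item["feature_type"], index)
            )
        else:
            raise ValueError(f"unsupported schema entry at position {index}: {item!r}")
    return normalized
```

Because list order determines the field index in this sketch, callers would need to pass features in the same order the encoder used.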

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Walks through install, loading the encoded logs DataFrame, supplying the
schema in any of the three accepted shapes (inference-service JSON dict,
list of dicts, or list[FeatureInfo]), decoding, and optional
needed_columns / batch-size tuning.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Standalone runnable example that calls decode_mplog_proto_dataframe with
the exact row layout (entities/features/metadata/mp_config_id + prism
metadata) and the full feature schema for clp-organic-l2-ranker-v1-0
version=1 (256 features ending at score/pctr_score/pcvr_score/p_nqd_score).

The user pastes the full FEATURES_JSON (the encoded blobs from the row's
features column) and runs the script; output goes to /tmp/decoded_single_row.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Replaces the inline row-data placeholders with a Spark CSV reader so the
script runs end-to-end against a downloaded sample (default path:
/Users/dheerajchouhan/Downloads/test_new.csv). Accepts an optional
positional argument for a different CSV path.

The 256-feature schema for clp-organic-l2-ranker-v1-0 (version=1, PROTO)
remains inlined. Output: prints schema + sample score columns and writes
parquet to /tmp/decoded_single_row.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

@coderabbitai coderabbitai Bot left a comment

Actionable comments posted: 1

🧹 Nitpick comments (1)
py-sdk/inference_logging_client/examples/decode_single_row.py (1)

390-395: ⚡ Quick win

Ensure SparkSession is always closed.

main() can return early (Line 412) and can also throw before script exit; wrapping work in try/finally with spark.stop() makes cleanup deterministic.

Proposed cleanup fix
-    spark = (
+    spark = (
         SparkSession.builder
         .appName("decode_single_row")
         .config("spark.sql.execution.arrow.pyspark.enabled", "true")
         .getOrCreate()
     )
-
-    # multiLine=true because the JSON cells contain embedded commas/quotes.
-    df = (
-        spark.read
-        .option("header", "true")
-        .option("multiLine", "true")
-        .option("escape", '"')
-        .schema(CSV_SCHEMA)
-        .csv(csv_path)
-    )
-
-    n_in = df.count()
-    print(f"input csv: {csv_path}")
-    print(f"input rows: {n_in}")
-    print(f"schema features: {len(SCHEMA['data'])}")
-
-    if n_in == 0:
-        print("no rows in csv, exiting")
-        return
-
-    decoded = decode_mplog_proto_dataframe(df, spark, schema=SCHEMA)
+    try:
+        # multiLine=true because the JSON cells contain embedded commas/quotes.
+        df = (
+            spark.read
+            .option("header", "true")
+            .option("multiLine", "true")
+            .option("escape", '"')
+            .schema(CSV_SCHEMA)
+            .csv(csv_path)
+        )
+
+        n_in = df.count()
+        print(f"input csv: {csv_path}")
+        print(f"input rows: {n_in}")
+        print(f"schema features: {len(SCHEMA['data'])}")
+
+        if n_in == 0:
+            print("no rows in csv, exiting")
+            return
+
+        decoded = decode_mplog_proto_dataframe(df, spark, schema=SCHEMA)
+        ...
+    finally:
+        spark.stop()

Also applies to: 412-415, 444-445

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@py-sdk/inference_logging_client/examples/decode_single_row.py`:
- Line 11: The script currently hardcodes a user-specific CSV path (e.g.,
"/Users/dheerajchouhan/...") as a default; change the CLI handling so the CSV
path argument is required (or read from a clearly named environment variable)
instead of using that absolute default: update the argument parsing variable
(csv_path / the parser in main or parse_args) to not set that hardcoded default,
add a runtime check that prints a clear usage/error and exits if no path is
provided (or, if using an env var fallback, validate it and error if absent),
and remove or replace any other instances of the hardcoded path referenced later
(lines around the code that opens the CSV at 388-389) so the code always uses
the provided path variable.

---

Nitpick comments:
In `@py-sdk/inference_logging_client/examples/decode_single_row.py`:
- Around line 390-395: main() currently builds a SparkSession via
SparkSession.builder in decode_single_row.py but may return early or raise
before exit; wrap the SparkSession creation and all work in a try/finally (or
use try/except/finally) so that spark.stop() is always called in the finally
block; locate the SparkSession.builder block and the main() function and ensure
every early return path (including the return at line ~412 and any exceptions)
still triggers spark.stop() by moving work into the try and calling spark.stop()
in finally.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 3b1fe738-9674-4d83-a8e9-b941a558b14d

📥 Commits

Reviewing files that changed from the base of the PR and between b44f739 and 494c0e7.

📒 Files selected for processing (1)
  • py-sdk/inference_logging_client/examples/decode_single_row.py

pip install inference-logging-client==0.3.4 zstandard pyspark
python decode_single_row.py [path/to/logs.csv]

If no path is given, defaults to /Users/dheerajchouhan/Downloads/test_new.csv.

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Avoid a user-specific absolute default CSV path.

Line 30 hardcodes /Users/dheerajchouhan/...; no-arg runs will fail on other machines. Prefer requiring an explicit path (or env var) with a clear usage error.

Proposed portability fix
-DEFAULT_CSV_PATH = "/Users/dheerajchouhan/Downloads/test_new.csv"
+DEFAULT_CSV_PATH = None
...
-    csv_path = sys.argv[1] if len(sys.argv) > 1 else DEFAULT_CSV_PATH
+    csv_path = sys.argv[1] if len(sys.argv) > 1 else DEFAULT_CSV_PATH
+    if not csv_path:
+        raise SystemExit("Usage: python decode_single_row.py <path/to/logs.csv>")

Also applies to: 30-30, 388-389
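
For the environment-variable alternative mentioned in this comment, a small sketch follows. The function name `resolve_csv_path` and the variable name `MPLOG_CSV_PATH` are hypothetical and chosen only for illustration.

```python
import os


def resolve_csv_path(argv, env=None):
    """Pick the CSV path from argv[1], else from MPLOG_CSV_PATH, else exit."""
    env = os.environ if env is None else env
    if len(argv) > 1:
        return argv[1]
    path = env.get("MPLOG_CSV_PATH")  # hypothetical variable name
    if not path:
        # No machine-specific default: fail with a clear usage message instead.
        raise SystemExit("Usage: python decode_single_row.py <path/to/logs.csv>")
    return path
```

In the script this would be called as `csv_path = resolve_csv_path(sys.argv)`; accepting `env` as a parameter keeps the function easy to test without touching the real environment.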

New entry point `decode_mplog_proto_csv(input_csv, output_csv, schema)` that
reads an inference-log CSV, decodes each row's encoded entities using the
caller-supplied PROTO schema, and writes one decoded row per entity to a
new CSV. No Spark, no pyarrow, no inference-service contact -- pure-Python
on csv/json/base64 plus the existing proto decoder.

Handles multi-MB features cells by lifting csv.field_size_limit.
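
Lifting the limit is commonly done along the lines of the sketch below. This is not necessarily how the commit implements it, and `lift_csv_field_limit` is a hypothetical helper name; the back-off loop covers platforms where `sys.maxsize` does not fit the underlying C type.

```python
import csv
import sys


def lift_csv_field_limit() -> int:
    """Raise csv's per-field size limit as high as the platform allows."""
    limit = sys.maxsize
    while True:
        try:
            csv.field_size_limit(limit)
            return limit
        except OverflowError:
            # Some platforms store the limit in a C long; back off until it fits.
            limit //= 2
```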

Also adds examples/decode_csv_to_csv.py demonstrating end-to-end usage
against the 256-feature clp-organic-l2-ranker-v1-0 schema.

Verified locally: 5 input rows -> 647 decoded rows, with correct
entity_id, score/pctr_score/pcvr_score/p_nqd_score, int64 cat/scat ids,
and string portfolio names.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>