From da18108ad98b074e6fe7445d3f44bed8af0efd30 Mon Sep 17 00:00:00 2001 From: dheerajchouhan08 Date: Mon, 11 May 2026 19:11:46 +0530 Subject: [PATCH 1/7] feat: add decode_mplog_proto_dataframe with caller-supplied schema New entry point for decoding MPLog DataFrames when the caller already has the schema in hand and all rows share the same schema. Skips the inference service entirely (no fetch, no host, no negative caching needed) and always decodes as PROTO. - Adds decode_mplog_proto_dataframe(df, spark, schema, ...). - Same distributed mapInPandas pipeline as decode_mplog_dataframe, including input-column projection and a default max_records_per_batch of 50 to keep multi-MB rows under Arrow's 2 GiB per-column limit. - Bumps version to 0.4.0 (new public API). Co-Authored-By: Claude Opus 4.7 (1M context) --- .../inference_logging_client/__init__.py | 249 +++++++++++++++++- .../inference_logging_client/pyproject.toml | 2 +- 2 files changed, 249 insertions(+), 2 deletions(-) diff --git a/py-sdk/inference_logging_client/inference_logging_client/__init__.py b/py-sdk/inference_logging_client/inference_logging_client/__init__.py index 7efa8f2d..656c2d9e 100644 --- a/py-sdk/inference_logging_client/inference_logging_client/__init__.py +++ b/py-sdk/inference_logging_client/inference_logging_client/__init__.py @@ -48,7 +48,7 @@ from .types import FORMAT_TYPE_MAP, DecodedMPLog, FeatureInfo, Format from .utils import format_dataframe_floats, get_format_name, unpack_metadata_byte -__version__ = "0.3.1" +__version__ = "0.4.0" # Maximum supported schema version (4 bits = 0-15) _MAX_SCHEMA_VERSION = 15 @@ -56,6 +56,7 @@ __all__ = [ "decode_mplog", "decode_mplog_dataframe", + "decode_mplog_proto_dataframe", "get_mplog_metadata", "get_feature_schema", "clear_schema_cache", @@ -554,3 +555,249 @@ def _decode_batch(iterator): feature_cols = [c for c in result_columns if c not in metadata_cols] column_order = metadata_cols + feature_cols return result_df.select(column_order) + + +def 
decode_mplog_proto_dataframe( + df: "SparkDataFrame", + spark: "SparkSession", + schema: list, + decompress: bool = True, + features_column: str = "features", + mp_config_id_column: str = "mp_config_id", + num_partitions: Optional[int] = None, + max_records_per_batch: Optional[int] = None, + needed_columns: Optional[Collection[str]] = None, +) -> "SparkDataFrame": + """ + Decode MPLog features from a Spark DataFrame using a caller-supplied schema. + + Format is always PROTO. No schema fetch is performed and no inference service + is contacted. The caller is responsible for passing the correct schema for the + encoded payloads in the DataFrame; all rows are decoded against the same schema. + + Expected DataFrame columns: + - features (encoded payloads; JSON-array-of-base64 strings or pre-parsed list of dicts) + - mp_config_id + - optional: entities, parent_entity + - optional row-metadata: prism_ingested_at, prism_extracted_at, created_at, + tracking_id, user_id, year, month, day, hour + + Args: + df: Input Spark DataFrame. + spark: The SparkSession to use for creating the result DataFrame. + schema: Pre-fetched schema (list of FeatureInfo) applied to all rows. + decompress: Whether to attempt zstd decompression on each encoded payload. + features_column: Name of the column containing encoded features (default: "features"). + mp_config_id_column: Name of the column containing model proxy config ID + (default: "mp_config_id"). Pass-through column; not used to look up schema. + num_partitions: Number of partitions for distributed decode. Default 10000. + max_records_per_batch: Max rows per Arrow batch in mapInPandas. Default 50. + needed_columns: Optional set or list of feature names to include. If provided, + only these columns are decoded and returned. + + Returns: + Spark DataFrame with entity_id as first column, followed by available row-metadata + columns, followed by feature columns. 
+ + Example: + >>> from pyspark.sql import SparkSession + >>> from inference_logging_client import ( + ... decode_mplog_proto_dataframe, get_feature_schema, + ... ) + >>> spark = SparkSession.builder.appName("decode").getOrCreate() + >>> df = spark.read.parquet("logs.parquet") + >>> schema = get_feature_schema("my-model", 1) + >>> decoded_df = decode_mplog_proto_dataframe(df, spark, schema=schema) + >>> decoded_df.show() + """ + import base64 + import json + + if not isinstance(schema, list) or not schema: + raise ValueError("schema must be a non-empty list of FeatureInfo") + + # Check if DataFrame is empty (avoid full count: use limit(1)) + if df.limit(1).count() == 0: + from pyspark.sql.types import StructType + return spark.createDataFrame([], StructType([])) + + # Validate required columns + df_columns = df.columns + required_columns = [features_column, mp_config_id_column] + missing_columns = [c for c in required_columns if c not in df_columns] + if missing_columns: + raise ValueError(f"Missing required columns: {missing_columns}") + + row_metadata_columns = [ + "prism_ingested_at", + "prism_extracted_at", + "created_at", + "mp_config_id", + "parent_entity", + "tracking_id", + "user_id", + "year", + "month", + "day", + "hour", + ] + _reserved_columns = {"entity_id"} | {c for c in row_metadata_columns if c in df_columns} + + # Build output schema: entity_id + available metadata cols + feature names + all_feature_names = {f.name for f in schema} + if needed_columns is not None: + all_feature_names = all_feature_names & set(needed_columns) + metadata_cols_in_schema = [c for c in row_metadata_columns if c in df_columns] + + from pyspark.sql.types import StringType, StructField, StructType + input_field_map = {field.name: field.dataType for field in df.schema.fields} + schema_fields = [StructField("entity_id", StringType(), True)] + for c in metadata_cols_in_schema: + original_type = input_field_map.get(c, StringType()) + schema_fields.append(StructField(c, 
original_type, True)) + for c in sorted(all_feature_names): + schema_fields.append(StructField(c, StringType(), True)) + full_schema = StructType(schema_fields) + all_columns_ordered = ["entity_id"] + metadata_cols_in_schema + sorted(all_feature_names) + + # Project to only the columns we actually need on workers + projected_cols = [ + c for c in ( + [features_column, mp_config_id_column, "entities"] + row_metadata_columns + ) + if c in df_columns + ] + seen = set() + projected_cols = [c for c in projected_cols if not (c in seen or seen.add(c))] + df_projected = df.select(*projected_cols) + + # Capture for closure + feature_schema = schema + + def _safe_get(row, col, default=None): + try: + val = row[col] if col in row.index else getattr(row, col, default) + if hasattr(val, "isna") and val.isna(): + return default + return val + except (KeyError, AttributeError): + return default + + def _decode_batch(iterator): + import pandas as pd + for pdf in iterator: + out_rows = [] + for idx, row in pdf.iterrows(): + features_data = _safe_get(row, features_column) + if features_data is None: + continue + if isinstance(features_data, str): + try: + features_list = json.loads(features_data) + except (json.JSONDecodeError, ValueError, TypeError): + continue + else: + features_list = features_data + if not isinstance(features_list, list): + continue + entities_val = None + if "entities" in df_columns: + entities_raw = _safe_get(row, "entities") + if entities_raw is not None: + if isinstance(entities_raw, str): + try: + entities_val = json.loads(entities_raw) + except (json.JSONDecodeError, ValueError): + entities_val = [entities_raw] + elif isinstance(entities_raw, list): + entities_val = entities_raw + else: + entities_val = [entities_raw] + parent_entity_val = None + if "parent_entity" in df_columns: + parent_val = _safe_get(row, "parent_entity") + if parent_val is not None: + if isinstance(parent_val, str): + try: + parent_val = json.loads(parent_val) + except 
(json.JSONDecodeError, ValueError): + parent_val = [parent_val] + if isinstance(parent_val, list): + parent_entity_val = ( + parent_val[0] if len(parent_val) == 1 + else str(parent_val) if len(parent_val) > 1 + else None + ) + else: + parent_entity_val = parent_val + for i, feature_item in enumerate(features_list): + if not isinstance(feature_item, dict): + continue + entity_id = ( + str(entities_val[i]) + if entities_val and i < len(entities_val) + else f"entity_{i}" + ) + encoded_features_b64 = feature_item.get("encoded_features", "") + if not encoded_features_b64: + continue + try: + encoded_bytes = base64.b64decode(encoded_features_b64) + except (ValueError, TypeError): + continue + if len(encoded_bytes) == 0: + continue + working_data = encoded_bytes + if decompress: + working_data = _decompress_zstd(encoded_bytes) + try: + decoded_features = decode_proto_features( + working_data, feature_schema, needed_columns=needed_columns + ) + except Exception: + continue + result_row = {"entity_id": entity_id} + for k, v in decoded_features.items(): + if k in _reserved_columns: + continue + if v is None: + result_row[k] = None + elif isinstance(v, (list, tuple)): + result_row[k] = str(v) + elif isinstance(v, bytes): + result_row[k] = v.hex() + else: + result_row[k] = str(v) + for col in row_metadata_columns: + if col in df_columns: + result_row[col] = _safe_get(row, col) + if parent_entity_val is not None: + result_row["parent_entity"] = parent_entity_val + for col in all_columns_ordered: + if col not in result_row: + result_row[col] = None + out_rows.append(result_row) + if out_rows: + out_pdf = pd.DataFrame(out_rows, columns=all_columns_ordered) + yield out_pdf + + n_partitions = num_partitions if num_partitions is not None else 10000 + df_repart = df_projected.repartition(n_partitions) + + batch_limit = max_records_per_batch if max_records_per_batch is not None else 50 + prev_max_records = spark.conf.get("spark.sql.execution.arrow.maxRecordsPerBatch") + 
spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", str(batch_limit)) + try: + result_df = df_repart.mapInPandas(_decode_batch, full_schema) + finally: + spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", prev_max_records or "10000") + + # Reorder columns: entity_id first, then metadata, then features + result_columns = result_df.columns + metadata_cols = ["entity_id"] + for col in row_metadata_columns: + if col in result_columns: + metadata_cols.append(col) + feature_cols = [c for c in result_columns if c not in metadata_cols] + column_order = metadata_cols + feature_cols + return result_df.select(column_order) diff --git a/py-sdk/inference_logging_client/pyproject.toml b/py-sdk/inference_logging_client/pyproject.toml index 13366434..454f3622 100644 --- a/py-sdk/inference_logging_client/pyproject.toml +++ b/py-sdk/inference_logging_client/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "inference-logging-client" -version = "0.3.1" +version = "0.4.0" description = "Decode MPLog feature logs from proto, arrow, or parquet format" readme = "readme.md" requires-python = ">=3.8" From cde302c4a7e4d2cf33231c1611d947a2f9f7db7d Mon Sep 17 00:00:00 2001 From: dheerajchouhan08 Date: Mon, 11 May 2026 19:12:44 +0530 Subject: [PATCH 2/7] chore: bump version to 0.3.4 Co-Authored-By: Claude Opus 4.7 (1M context) --- .../inference_logging_client/__init__.py | 2 +- py-sdk/inference_logging_client/pyproject.toml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/py-sdk/inference_logging_client/inference_logging_client/__init__.py b/py-sdk/inference_logging_client/inference_logging_client/__init__.py index 656c2d9e..6664581e 100644 --- a/py-sdk/inference_logging_client/inference_logging_client/__init__.py +++ b/py-sdk/inference_logging_client/inference_logging_client/__init__.py @@ -48,7 +48,7 @@ from .types import FORMAT_TYPE_MAP, DecodedMPLog, FeatureInfo, Format from .utils import 
format_dataframe_floats, get_format_name, unpack_metadata_byte -__version__ = "0.4.0" +__version__ = "0.3.4" # Maximum supported schema version (4 bits = 0-15) _MAX_SCHEMA_VERSION = 15 diff --git a/py-sdk/inference_logging_client/pyproject.toml b/py-sdk/inference_logging_client/pyproject.toml index 454f3622..ad55a935 100644 --- a/py-sdk/inference_logging_client/pyproject.toml +++ b/py-sdk/inference_logging_client/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "inference-logging-client" -version = "0.4.0" +version = "0.3.4" description = "Decode MPLog feature logs from proto, arrow, or parquet format" readme = "readme.md" requires-python = ">=3.8" From 69182e40d8e3b8cc1559b505847dd5f35f9cf3fc Mon Sep 17 00:00:00 2001 From: dheerajchouhan08 Date: Thu, 14 May 2026 13:44:14 +0530 Subject: [PATCH 3/7] feat: accept inference-service JSON shape for schema in decode_mplog_proto_dataframe The schema parameter now accepts any of: - list[FeatureInfo] (unchanged) - list[dict] with feature_name/feature_type keys - dict {"data": [...]} matching the inference service JSON response A dict from the inference API can be passed in directly without manual conversion. The feature_type decoder already strips 'DataType' prefixes and case-normalizes, so DataTypeFP32 etc. work as-is. Order is preserved and used to assign the proto field index. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- .../inference_logging_client/__init__.py | 57 +++++++++++++++++-- 1 file changed, 53 insertions(+), 4 deletions(-) diff --git a/py-sdk/inference_logging_client/inference_logging_client/__init__.py b/py-sdk/inference_logging_client/inference_logging_client/__init__.py index 6664581e..1dd010f2 100644 --- a/py-sdk/inference_logging_client/inference_logging_client/__init__.py +++ b/py-sdk/inference_logging_client/inference_logging_client/__init__.py @@ -557,10 +557,56 @@ def _decode_batch(iterator): return result_df.select(column_order) +def _normalize_schema(schema) -> "list[FeatureInfo]": + """Accept either a list[FeatureInfo], a list of raw dicts, or the + inference-service JSON shape ``{"data": [...]}`` and return list[FeatureInfo]. + + Raw dict items must carry ``feature_name`` and ``feature_type`` keys + (matching the inference service response). Order is preserved and used + to assign the ``index`` of each FeatureInfo, which is the proto field + position used by the decoder. 
+ """ + if schema is None: + raise ValueError("schema must not be None") + + # Unwrap {"data": [...]} JSON shape + if isinstance(schema, dict): + if "data" not in schema: + raise ValueError("schema dict must contain a 'data' key") + items = schema["data"] + else: + items = schema + + if not isinstance(items, list) or not items: + raise ValueError("schema must be a non-empty list (or dict with non-empty 'data')") + + # Already FeatureInfo objects + if all(isinstance(it, FeatureInfo) for it in items): + return items + + normalized: list[FeatureInfo] = [] + for idx, item in enumerate(items): + if isinstance(item, FeatureInfo): + normalized.append(item) + continue + if not isinstance(item, dict): + raise ValueError( + f"schema item at index {idx} must be FeatureInfo or dict, got {type(item).__name__}" + ) + name = item.get("feature_name") or item.get("name") + feature_type = item.get("feature_type") + if not name or not feature_type: + raise ValueError( + f"schema item at index {idx} missing 'feature_name'/'name' or 'feature_type'" + ) + normalized.append(FeatureInfo(name=name, feature_type=feature_type, index=idx)) + return normalized + + def decode_mplog_proto_dataframe( df: "SparkDataFrame", spark: "SparkSession", - schema: list, + schema, decompress: bool = True, features_column: str = "features", mp_config_id_column: str = "mp_config_id", @@ -585,7 +631,11 @@ def decode_mplog_proto_dataframe( Args: df: Input Spark DataFrame. spark: The SparkSession to use for creating the result DataFrame. - schema: Pre-fetched schema (list of FeatureInfo) applied to all rows. + schema: Schema applied to all rows. Accepted shapes: + - list[FeatureInfo] + - list[dict] with keys 'feature_name' (or 'name') and 'feature_type' + - dict {"data": [...]} matching the inference service JSON response + Order is used to assign the proto field index; do not reorder. decompress: Whether to attempt zstd decompression on each encoded payload. 
features_column: Name of the column containing encoded features (default: "features"). mp_config_id_column: Name of the column containing model proxy config ID @@ -613,8 +663,7 @@ def decode_mplog_proto_dataframe( import base64 import json - if not isinstance(schema, list) or not schema: - raise ValueError("schema must be a non-empty list of FeatureInfo") + schema = _normalize_schema(schema) # Check if DataFrame is empty (avoid full count: use limit(1)) if df.limit(1).count() == 0: From b44f7391c92eae7132764ed2e86a2a7efbd6b802 Mon Sep 17 00:00:00 2001 From: dheerajchouhan08 Date: Thu, 14 May 2026 13:54:00 +0530 Subject: [PATCH 4/7] docs: add example notebook for decode_mplog_proto_dataframe Walks through install, loading the encoded logs DataFrame, supplying the schema in any of the three accepted shapes (inference-service JSON dict, list of dicts, or list[FeatureInfo]), decoding, and optional needed_columns / batch-size tuning. Co-Authored-By: Claude Opus 4.7 (1M context) --- ...decode_mplog_proto_dataframe_example.ipynb | 285 ++++++++++++++++++ 1 file changed, 285 insertions(+) create mode 100644 py-sdk/inference_logging_client/examples/decode_mplog_proto_dataframe_example.ipynb diff --git a/py-sdk/inference_logging_client/examples/decode_mplog_proto_dataframe_example.ipynb b/py-sdk/inference_logging_client/examples/decode_mplog_proto_dataframe_example.ipynb new file mode 100644 index 00000000..67d7f882 --- /dev/null +++ b/py-sdk/inference_logging_client/examples/decode_mplog_proto_dataframe_example.ipynb @@ -0,0 +1,285 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# `decode_mplog_proto_dataframe` — example notebook\n", + "\n", + "This notebook demonstrates the `decode_mplog_proto_dataframe` API added in `inference-logging-client` 0.3.4.\n", + "\n", + "Use this method when:\n", + "- All rows in the input DataFrame are encoded as **proto** (not arrow / parquet).\n", + "- You already have the feature schema in hand (from your 
own API, a cached JSON, or a prior `get_feature_schema` call).\n", + "- You want to avoid contacting the inference service at decode time (no schema fetch, no positive cache, no negative cache, no per-worker fallback).\n", + "\n", + "Compared to `decode_mplog_dataframe`, this method skips: driver-side `distinct().collect()` for schema discovery, per-row metadata-byte parsing, format dispatch, and the `schema_cache.get()` per row. Same distributed `mapInPandas` pipeline, same Arrow 2 GiB safety (default `max_records_per_batch=50`), same input-column projection." + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## 1. Install\n", + "\n", + "On Databricks, install at the cluster or notebook level:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "%pip install --upgrade inference-logging-client==0.3.4 zstandard" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "dbutils.library.restartPython() # Databricks only" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## 2. Imports and Spark session" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from pyspark.sql import SparkSession\n", + "from pyspark.sql import functions as F\n", + "\n", + "from inference_logging_client import decode_mplog_proto_dataframe\n", + "\n", + "spark = SparkSession.builder.appName(\"decode_mplog_proto_example\").getOrCreate()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## 3. Load the encoded logs DataFrame\n", + "\n", + "Adjust the table name / path and the partition filter for your environment. The DataFrame must have at least a `features` column (the encoded payloads) and a `mp_config_id` column. 
Optional columns that get passed through if present: `entities`, `parent_entity`, `prism_ingested_at`, `prism_extracted_at`, `created_at`, `tracking_id`, `user_id`, `year`, `month`, `day`, `hour`." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "logs_df = (\n", + " spark.table(\"silver.ML_Platform__model_proxy_inference_logs\")\n", + " .filter(F.col(\"mp_config_id\") == \"my-model-proxy-id\")\n", + " .filter(F.concat_ws(\"-\", \"year\", \"month\", \"day\") == \"2026-05-09\")\n", + ")\n", + "\n", + "logs_df.printSchema()\n", + "logs_df.limit(3).show(truncate=80)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## 4. Provide the feature schema\n", + "\n", + "The `schema` argument accepts three shapes — pick whichever your source already produces:\n", + "\n", + "**Option A — inference-service JSON response shape (most common):**\n", + "```python\n", + "schema = {\"data\": [{\"feature_name\": \"...\", \"feature_type\": \"DataTypeFP32\", \"feature_size\": 1}, ...]}\n", + "```\n", + "\n", + "**Option B — plain list of dicts:**\n", + "```python\n", + "schema = [{\"feature_name\": \"...\", \"feature_type\": \"DataTypeFP32\"}, ...]\n", + "```\n", + "\n", + "**Option C — typed `FeatureInfo` list (returned by `get_feature_schema`):**\n", + "```python\n", + "from inference_logging_client import get_feature_schema\n", + "schema = get_feature_schema(\"my-model-proxy-id\", 1)\n", + "```\n", + "\n", + "Array order is preserved and used as the proto field index — do not reorder." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Example: schema fetched from your own API, in the inference-service JSON shape.\n", + "schema = {\n", + " \"data\": [\n", + " {\"feature_name\": \"user:derived_2_fp32:log_views_56day\", \"feature_type\": \"DataTypeFP32\", \"feature_size\": 1},\n", + " {\"feature_name\": \"user:derived_2_fp32:orders_by_clicks_laplace_56day\", \"feature_type\": \"DataTypeFP32\", \"feature_size\": 1},\n", + " {\"feature_name\": \"user:derived_2_fp32:avg_click_catalog_nqd_30day\", \"feature_type\": \"DataTypeFP32\", \"feature_size\": 1},\n", + " {\"feature_name\": \"user:derived_2_fp32:avg_order__catalog_arp_sscat_percentile__90day\", \"feature_type\": \"DataTypeFP32\", \"feature_size\": 1},\n", + " {\"feature_name\": \"user:derived_2_fp32:browse_time_last_7day\", \"feature_type\": \"DataTypeFP32\", \"feature_size\": 1},\n", + " {\"feature_name\": \"user:derived_2_fp32:clicks_by_views_laplace_28day\", \"feature_type\": \"DataTypeFP32\", \"feature_size\": 1},\n", + " {\"feature_name\": \"user:derived_2_fp32:engagement_click_percent\", \"feature_type\": \"DataTypeFP32\", \"feature_size\": 1},\n", + " {\"feature_name\": \"user:derived_2_fp32:retention_90_days\", \"feature_type\": \"DataTypeFP32\", \"feature_size\": 1},\n", + " {\"feature_name\": \"user:derived_2_fp32:user__nqp\", \"feature_type\": \"DataTypeFP32\", \"feature_size\": 1},\n", + " {\"feature_name\": \"user:derived_2_fp32:user__nqp_by_nqd\", \"feature_type\": \"DataTypeFP32\", \"feature_size\": 1},\n", + " ]\n", + "}\n", + "\n", + "print(f\"schema has {len(schema['data'])} features\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## 5. 
Decode\n", + "\n", + "Defaults that matter:\n", + "- `decompress=True` — automatically zstd-decompresses each payload if needed.\n", + "- `num_partitions=10000` — keeps each worker task small when rows carry multi-MB payloads.\n", + "- `max_records_per_batch=50` — keeps each Arrow batch under the 2 GiB per-column limit.\n", + "- `needed_columns=None` — decode all schema columns; pass a list/set to project early." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "decoded_df = decode_mplog_proto_dataframe(\n", + " df=logs_df,\n", + " spark=spark,\n", + " schema=schema,\n", + ")\n", + "\n", + "decoded_df.printSchema()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "decoded_df.limit(5).show(truncate=80)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## 6. Decode only the columns you need (optional)\n", + "\n", + "Pass `needed_columns` so workers skip decoding and emitting features you don't care about. Significantly reduces output size and worker memory when the schema is wide." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "subset_df = decode_mplog_proto_dataframe(\n", + " df=logs_df,\n", + " spark=spark,\n", + " schema=schema,\n", + " needed_columns={\n", + " \"user:derived_2_fp32:log_views_56day\",\n", + " \"user:derived_2_fp32:retention_90_days\",\n", + " },\n", + ")\n", + "\n", + "subset_df.printSchema()\n", + "subset_df.limit(5).show(truncate=80)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## 7. 
Tuning knobs (optional)\n", + "\n", + "Adjust if your rows are unusually small or unusually large:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "tuned_df = decode_mplog_proto_dataframe(\n", + " df=logs_df,\n", + " spark=spark,\n", + " schema=schema,\n", + " num_partitions=20000, # raise if executors are CPU-starved\n", + " max_records_per_batch=20, # lower further if you still hit Arrow overflow\n", + " decompress=True,\n", + ")\n", + "\n", + "tuned_df.limit(3).show(truncate=80)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## 8. Persist the decoded output" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "(\n", + " decoded_df\n", + " .write\n", + " .mode(\"overwrite\")\n", + " .partitionBy(\"year\", \"month\", \"day\")\n", + " .parquet(\"s3://your-bucket/decoded_mplog/my-model-proxy-id/\")\n", + ")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Notes\n", + "\n", + "- **Format is always PROTO.** If your logs use arrow or parquet encoding, use `decode_mplog_dataframe` instead.\n", + "- **Schema is applied to every row.** All rows in the input DataFrame must have been encoded against the schema you pass. If you have multiple `(mp_config_id, version)` combos in the same DataFrame and they use different schemas, filter and decode each group separately.\n", + "- **Type strings.** `DataTypeFP32`, `FP32`, `fp32` — all work. The decoder strips the `DataType` prefix and case-normalizes internally.\n", + "- **`feature_size`.** Ignored. The decoder infers scalar vs vector from the type name (`FP32` vs `FP32Vector`)." 
+ ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "name": "python", + "version": "3.10" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} From 66967761d565ca4712fc05fe0176b8254c5affb7 Mon Sep 17 00:00:00 2001 From: dheerajchouhan08 Date: Thu, 14 May 2026 17:39:44 +0530 Subject: [PATCH 5/7] docs: add single-row decode script with full 256-feature schema Standalone runnable example that calls decode_mplog_proto_dataframe with the exact row layout (entities/features/metadata/mp_config_id + prism metadata) and the full feature schema for clp-organic-l2-ranker-v1-0 version=1 (256 features ending at score/pctr_score/pcvr_score/p_nqd_score). The user pastes the full FEATURES_JSON (the encoded blobs from the row's features column) and runs the script; output goes to /tmp/decoded_single_row. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../examples/decode_single_row.py | 501 ++++++++++++++++++ 1 file changed, 501 insertions(+) create mode 100644 py-sdk/inference_logging_client/examples/decode_single_row.py diff --git a/py-sdk/inference_logging_client/examples/decode_single_row.py b/py-sdk/inference_logging_client/examples/decode_single_row.py new file mode 100644 index 00000000..9709f257 --- /dev/null +++ b/py-sdk/inference_logging_client/examples/decode_single_row.py @@ -0,0 +1,501 @@ +""" +Standalone script that decodes ONE inference-log row using +`decode_mplog_proto_dataframe` with the full 256-feature schema. + +Run on Databricks (or any pyspark environment with the package installed): + + pip install inference-logging-client==0.3.4 zstandard pyspark + python decode_single_row.py + +The script: + 1. Builds a one-row Spark DataFrame with the exact `entities`, `features`, + and `metadata` strings provided. + 2. Calls `decode_mplog_proto_dataframe(df, spark, schema=SCHEMA)`. + 3. 
Prints the decoded DataFrame (one row per entity) and writes parquet + to `/tmp/decoded_single_row/`. + +If the `features` JSON or `entities` JSON is very long, paste the full +strings into the two placeholders below (FEATURES_JSON and ENTITIES_JSON). +""" + +from pyspark.sql import Row, SparkSession + +from inference_logging_client import decode_mplog_proto_dataframe + + +# --------------------------------------------------------------------------- +# 1. Row data +# --------------------------------------------------------------------------- + +PRISM_INGESTED_AT = 1778093134932 +PRISM_EXTRACTED_AT = 1778093172000 +CREATED_AT = "2026-05-07T00:15:34.000+05:30" + +# Fill in the actual mp_config_id for this row (pass-through column). +MP_CONFIG_ID = "clp-organic-l2-ranker-v1-0" + +YEAR, MONTH, DAY, HOUR = "2026", "05", "07", "00" + +# Metadata byte = 0x04 -> compression=False, version=1, format=PROTO +METADATA_JSON = '["BA=="]' + +# Paste the full entities JSON-array string from the row here (300 entity ids). 
+ENTITIES_JSON = ( + '["129858133","172683451","40159726","163432064","157307877","127153263",' + '"150494430","91634625","105262423","143362971","152885639","141723254",' + '"67612028","90190259","90799634","37254681","69442240","12704667",' + '"182883452","101545685","92571270","109083227","122270821","189312092",' + '"147408625","88849258","105146149","157307878","178517767","68884198",' + '"124324363","183029599","101509512","187296531","185250355","105290811",' + '"94130043","110902288","162834270","46188585","193161","187058552",' + '"143662941","182577489","178489609","78387570","165354806","23923038",' + '"124383285","102178839","187855725","67999990","112389083","53622608",' + '"157307876","40322544","193681","87261452","91976737","58163190",' + '"182561787","159505400","174790442","162708871","125787271","189130322",' + '"133152244","154338081","78656868","18307540","185845733","165178985",' + '"105045470","115278211","168328734","127711153","47093013","173028789",' + '"152891706","150483687","119108032","148530689","137907125","189735602",' + '"140606894","55894553","113416713","134191955","137961683","108117958",' + '"126322517","81413078","88758009","76830400","188805968","162409042",' + '"101610920","129403638","14712806","114253030","188269192","15729965",' + '"77084232","123680130","105910520","133951061","116614716","182882195",' + '"81376971","150810651","148564784","138414869","11645744","32447000",' + '"114509225","110751353","60133034","95408000","100316387","125457803",' + '"105186243","138102163","87461454","110808182","187527955","174901426",' + '"112985436","158026050","176144041","125544584","90201083","9105405",' + '"57338961","23055087","90191915","178738841","125250712","151895413",' + '"112335263","163340375","161219586","182447297","124901483","182847433",' + '"149289846","108108708","96851092","116066124","148109044","92858017",' + '"81310699","80426512","123293294","105146150","130108227","166968286",' + 
'"189302536","120632248","181328675","162162514","55835366","120336101",' + '"164118551","169249718","96818909","119434054","113454362","156102338",' + '"124943412","184408554","99651509","120571620","124904974","161748042",' + '"92858620","40159729","14770797","136077639","67658808","90035215",' + '"72921911","118273836","133708453","137496876","106377352","81906055",' + '"158064774","22371023","131621161","132315814","160010257","9640992",' + '"75121762","55205272","105838887","127970505","96546979","184455344",' + '"178084048","42691306","117911911","154907429","101610459","120345620",' + '"187298515","90519128","182727359","101497968","13157465","80803175",' + '"188408895","67656425","91403106","155610407","142739602","185836448",' + '"159831050","138611069","103394964","134702173","144832430","645632",' + '"166130025","174766204","123236695","18745107","62959962","127979490",' + '"101508311","103660271","177931974","67610484","181645149","137370294",' + '"40159730","129572066","127117824","127212003","43596020","70468712",' + '"129395854","78664565","161719437","134874694","180101337","113065467",' + '"117541611","67655129","99498602","536757","45502433","114285885",' + '"77234578","40964091","183387920","90519125","73522956","139548067",' + '"115084443","108526129","40322281","172987169","138291358","125458478",' + '"129140995","175694187","40988876","187451321","41704353","90930750",' + '"117304498","112026524","120715082","181587439","129737618","120534472",' + '"123654440","156204061","95892707","186627409","89870249","156174988",' + '"158606418","73774749","184531000","41005967","151464858","123351724",' + '"129180516","134941738","50414200","12805166","156839738","151884108",' + '"92552214","118409718","49832509","161418189","108108982","97081458"]' +) + +# Paste the full features JSON-array string here. This is the cell content +# from the `features` column, exactly as stored. Truncated here for brevity; +# replace with the full value when running. 
+FEATURES_JSON = ( + '[{"encoded_features":""},' + ' {"encoded_features":""},' + ' ... 300 entries total ...' + ']' +) + + +# --------------------------------------------------------------------------- +# 2. Full feature schema (256 features, version=1 for this mp_config_id) +# --------------------------------------------------------------------------- + +SCHEMA = { + "data": [ + {"feature_name": "user_scat:derived_fp32:orders_by_clicks_laplace_56day", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "user_scat:derived_fp32:clicks_by_views_laplace_3day", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "user_scat:derived_fp32:clicks_by_views_laplace_7day", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "user_scat:derived_fp32:log_clicks_14day", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "user_scat:derived_fp32:log_clicks_56day", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "user_scat:derived_fp32:log_clicks_7day", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "user_scat:derived_fp32:log_views_7day", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "catalog_clp:derived_fp32:clicks_by_views_3_days_percentile", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "catalog_clp:derived_fp32:clicks_by_views_7_days_percentile", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "catalog_clp:derived_fp32:orders_by_views_28_days_percentile", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "catalog_clp:derived_fp32:orders_by_clicks_28_days_percentile", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "catalog_clp:derived_fp32:orders_by_views_7_days_percentile", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "catalog_clp:derived_fp32:clicks_3_days_percentile", "feature_type": "DataTypeFP32", 
"feature_size": 1}, + {"feature_name": "catalog_clp:derived_fp32:orders_28_days_percentile", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "catalog_clp:derived_fp32:orders_3_days_percentile", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "catalog_clp:derived_fp32:orders_7_days_percentile", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "catalog:derived_fp32:ads_interactions_timeseries_transforms_orders_7day_percentile", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "catalog:derived_fp32:catalog__nqp_90_days", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "catalog:derived_fp32:clicks_by_views_28_days_percentile", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "catalog:derived_fp32:low_risk_user_orders_percentage", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "catalog:derived_fp32:od_more_than_75p_user_api_user_orders_percentage_90day", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "catalog:derived_fp32:vrs_boosting_factor", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "catalog:derived_fp32:ads_interactions_timeseries_transforms_clicks_28day_percentile", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "catalog:derived_fp32:ads_interactions_timeseries_transforms_orders_3day_percentile", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "catalog:derived_fp32:clicks_28_days_percentile", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "catalog:derived_fp32:clicks_3_days_percentile", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "catalog:derived_fp32:clicks_7_days_percentile", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "catalog:derived_fp32:log_reviews", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": 
"catalog:derived_fp32:low_risk_user_orders_percentage_90day", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "catalog:derived_fp32:ads_interactions_timeseries_transforms_clicks_3day_percentile", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "catalog:derived_fp32:catalog__per_qr_return", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "catalog:derived_fp32:orders_by_clicks_28_days_percentile", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "catalog:derived_fp32:price_discount_percent", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "catalog:derived_fp32:views_7_days_percentile", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "catalog:derived_fp32:ads_interactions_timeseries_transforms_views_28day_percentile", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "catalog:derived_fp32:avg_rating", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "catalog:derived_fp32:catalog__per_return", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "catalog:derived_fp32:orders_7_days_percentile", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "catalog:derived_fp32:orders_by_clicks_7_days_percentile", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "catalog:derived_fp32:nqd_boosting_factor_gbm_model_v0", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "catalog:derived_fp32:nqp_by_nqd", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "catalog:derived_fp32:od_20_plus_order_users_orders_percentage_90day", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "catalog:derived_fp32:price_decrease_percent_decay", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "catalog:derived_fp32:ads_interactions_timeseries_transforms_views_3day_percentile", 
"feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "catalog:derived_fp32:catalog__ads_orders_by_clicks_28_days_percentile", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "catalog:derived_fp32:loyalty_boosting_factor", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "catalog:derived_fp32:nqp", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "catalog:derived_fp32:orders_28_days_percentile", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "catalog:derived_fp32:views_3_days_percentile", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "catalog:derived_fp32:ads_interactions_timeseries_transforms_orders_28day_percentile", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "catalog:derived_fp32:catalog__ads_clicks_by_views_7_days_percentile", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "catalog:derived_fp32:catalog__ads_orders_by_clicks_7_days_percentile", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "catalog:derived_fp32:clicks_by_views_7_days_percentile", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "catalog:derived_fp32:net_order_by_gross_order_smoothened", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "catalog:derived_fp32:nqp_by_nqd_90day", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "catalog:derived_fp32:views_28_days_percentile", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "catalog:derived_fp32:ads_interactions_timeseries_transforms_clicks_7day_percentile", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "catalog:derived_fp32:ads_interactions_timeseries_transforms_views_7day_percentile", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "catalog:derived_fp32:catalog__ads_clicks_by_views_28_days_percentile", 
"feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "catalog:derived_fp32:catalog__mean_price_90_days", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "catalog:derived_fp32:orders_3_days_percentile", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "catalog:derived_int32:catalog__is_non_gst", "feature_type": "DataTypeInt32", "feature_size": 1}, + {"feature_name": "catalog:derived_int32:catalog__price_arp", "feature_type": "DataTypeInt32", "feature_size": 1}, + {"feature_name": "catalog:derived_int32:fds_attributes_raw_age", "feature_type": "DataTypeInt32", "feature_size": 1}, + {"feature_name": "catalog:derived_int32:is_mall", "feature_type": "DataTypeInt32", "feature_size": 1}, + {"feature_name": "catalog:derived_int32:od_less_than_25p_user_aov_user_orders_90day", "feature_type": "DataTypeInt32", "feature_size": 1}, + {"feature_name": "catalog:derived_int32:price_shipping", "feature_type": "DataTypeInt32", "feature_size": 1}, + {"feature_name": "catalog:derived_int32:top_supplier_id", "feature_type": "DataTypeInt32", "feature_size": 1}, + {"feature_name": "catalog:derived_int32:arp", "feature_type": "DataTypeInt32", "feature_size": 1}, + {"feature_name": "catalog:derived_string:portfolio_name", "feature_type": "DataTypeString", "feature_size": 1}, + {"feature_name": "catalog:derived_string:sscat_level_attribute_value_2", "feature_type": "DataTypeString", "feature_size": 1}, + {"feature_name": "catalog:derived_string:super_portfolio_name", "feature_type": "DataTypeString", "feature_size": 1}, + {"feature_name": "catalog:derived_string:attribute_value_1", "feature_type": "DataTypeString", "feature_size": 1}, + {"feature_name": "catalog:derived_string:attribute_value_2", "feature_type": "DataTypeString", "feature_size": 1}, + {"feature_name": "catalog:derived_string:attribute_value_3", "feature_type": "DataTypeString", "feature_size": 1}, + {"feature_name": 
"catalog:embeddings_fp16:search__flava_embedding_str", "feature_type": "DataTypeFP16Vector", "feature_size": 1}, + {"feature_name": "catalog:realtime_int64:cat_id", "feature_type": "DataTypeInt64", "feature_size": 1}, + {"feature_name": "catalog:realtime_int64:scat_id", "feature_type": "DataTypeInt64", "feature_size": 1}, + {"feature_name": "catalog:realtime_int64:sscat_id", "feature_type": "DataTypeInt64", "feature_size": 1}, + {"feature_name": "clp:derived_int32:unique_sscats", "feature_type": "DataTypeInt32", "feature_size": 1}, + {"feature_name": "clp:derived_string:name", "feature_type": "DataTypeString", "feature_size": 1}, + {"feature_name": "supplier_user_quality_segment:rollup:supplier__nqd_review_media_engagement_cohort", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "supplier_user_quality_segment:rollup:supplier__nqp_90_days_gender_cohort", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "supplier_user_quality_segment:rollup:supplier__nqd_90_days_review_media_engagement_cohort", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "supplier_user_quality_segment:rollup:supplier__nqp_90_days_od_cohort", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "supplier_user_quality_segment:rollup:supplier__nqp_by_nqd_division_cohort", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "supplier_user_quality_segment:rollup:supplier__nqp_by_nqd_review_media_engagement_cohort", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "supplier_user_quality_segment:rollup:supplier__nqp_review_media_engagement_cohort", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "supplier_user_quality_segment:rollup:supplier__nqp_by_nqd_90_days_od_cohort", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "supplier_user_quality_segment:rollup:supplier__nqp_by_nqd_90_days_review_media_engagement_cohort", "feature_type": 
"DataTypeFP32", "feature_size": 1}, + {"feature_name": "supplier_user_quality_segment:rollup:supplier__nqp_by_nqd_first_order_age_bin_cohort", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "supplier_user_quality_segment:rollup:supplier__avg_rating_90_days_od_cohort", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "supplier_user_quality_segment:rollup:supplier__nqp_90_days_division_cohort", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "supplier_user_quality_segment:rollup:supplier__nqp_90_days_review_media_engagement_cohort", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "supplier_user_quality_segment:rollup:supplier__nqp_by_nqd_90_days_division_cohort", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "supplier_user_quality_segment:rollup:supplier__nqp_by_nqd_90_days_gender_cohort", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "supplier_user_quality_segment:rollup:supplier__avg_rating_90_days_review_media_engagement_cohort", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "supplier_user_quality_segment:rollup:supplier__nqp_by_nqd_90_days_first_order_age_bin_cohort", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "supplier_user_quality_segment:rollup:supplier__nqp_by_nqd_gender_cohort", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "supplier_user_quality_segment:rollup:supplier__nqp_by_nqd_od_cohort", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "user_cat:derived_fp32:clicks_by_views_laplace_7day", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "user_cat:derived_fp32:log_clicks_28day", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "user_cat:derived_fp32:log_clicks_56day", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": 
"user_cat:derived_fp32:log_views_3day", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "user_cat:derived_fp32:log_views_7day", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "user_cat:derived_fp32:orders_by_clicks_laplace_56day", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "cat:derived_string:name", "feature_type": "DataTypeString", "feature_size": 1}, + {"feature_name": "clp_catalog:derived_fp32:clicks_by_views_laplace_3day", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "clp_catalog:derived_fp32:clicks_by_views_laplace_7day", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "clp_catalog:derived_fp32:orders_by_clicks_laplace_28day_percentile", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "clp_catalog:derived_fp32:orders_by_clicks_laplace_3day", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "clp_catalog:derived_fp32:orders_by_clicks_laplace_7day", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "clp_catalog:derived_fp32:views_3_days_percentile", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "clp_catalog:derived_fp32:orders_by_views_laplace_28day_percentile", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "clp_catalog:derived_fp32:price_ratio", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "clp_catalog:derived_fp32:orders_by_views_laplace_28day", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "clp_catalog:derived_fp32:clicks_3_days_percentile", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "clp_catalog:derived_fp32:clicks_by_views_laplace_28day", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "clp_catalog:derived_fp32:clicks_by_views_laplace_3day_percentile", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": 
"clp_catalog:derived_fp32:orders_3_days_percentile", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "clp_catalog:derived_fp32:orders_7_days_percentile", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "clp_catalog:derived_fp32:orders_by_clicks_laplace_28day", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "clp_catalog:derived_fp32:clicks_by_views_laplace_28day_percentile", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "clp_catalog:derived_fp32:clicks_by_views_laplace_7day_percentile", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "clp_catalog:derived_fp32:orders_by_views_laplace_7day", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "clp_catalog:derived_int32:price_diff", "feature_type": "DataTypeInt32", "feature_size": 1}, + {"feature_name": "supplier:derived_2_fp32:supplier__nqp_28_days", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "supplier:derived_2_fp32:supplier__nqp_By_nqd", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "supplier:derived_2_fp32:supplier__nqd", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "supplier:derived_2_fp32:supplier__nqd_28_days", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "supplier:derived_2_fp32:supplier__nqp_By_nqd_90_days", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "supplier:derived_2_fp32:supplier__per_qr_return", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "supplier:derived_2_fp32:supplier__per_return", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "supplier:derived_2_fp32:supplier__per_wfr_return", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "supplier:derived_2_fp32:supplier__pq_return_per", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": 
"supplier:derived_2_fp32:supplier__nqp", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "supplier:derived_2_fp32:supplier__nqp_90_days", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "supplier_sscat:derived_fp32:net_orders_by_gross_orders_90day", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "supplier_sscat:derived_fp32:num_video_review_by_num_review_7day", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "supplier_sscat:derived_fp32:supplier_sscat__nqd_28_days", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "supplier_sscat:derived_fp32:supplier_sscat__nqp_28_days", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "supplier_sscat:derived_fp32:supplier_sscat__num_nps_response_By_num_user_with_nps_response_28_days", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "catalog_user_quality_segment:rollup:catalog__nqp_90_days_gender_cohort", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "catalog_user_quality_segment:rollup:catalog__nqp_90_days_division_cohort", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "catalog_user_quality_segment:rollup:catalog__per_qr_return_division_cohort", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "catalog_user_quality_segment:rollup:catalog__per_return_first_order_age_bin_cohort", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "catalog_user_quality_segment:rollup:catalog__per_wfr_return_division_cohort", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "catalog_user_quality_segment:rollup:catalog__avg_rating_90_days_od_cohort", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "catalog_user_quality_segment:rollup:catalog__avg_rating_90_days_review_media_engagement_cohort", "feature_type": "DataTypeFP32", "feature_size": 1}, + 
{"feature_name": "catalog_user_quality_segment:rollup:catalog__nqd_review_media_engagement_cohort", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "catalog_user_quality_segment:rollup:catalog__nqp_90_days_od_cohort", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "catalog_user_quality_segment:rollup:catalog__nqp_by_nqd_90_days_first_order_age_bin_cohort", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "catalog_user_quality_segment:rollup:catalog__nqp_by_nqd_90_days_gender_cohort", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "catalog_user_quality_segment:rollup:catalog__nqp_review_media_engagement_cohort", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "catalog_user_quality_segment:rollup:catalog__per_wfr_return_gender_cohort", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "catalog_user_quality_segment:rollup:catalog__pq_return_per_first_order_age_bin_cohort", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "catalog_user_quality_segment:rollup:catalog__nqp_by_nqd_90_days_od_cohort", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "catalog_user_quality_segment:rollup:catalog__nqp_by_nqd_division_cohort", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "catalog_user_quality_segment:rollup:catalog__nqp_by_nqd_first_order_age_bin_cohort", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "catalog_user_quality_segment:rollup:catalog__per_return_od_cohort", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "catalog_user_quality_segment:rollup:catalog__nqp_by_nqd_gender_cohort", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "catalog_user_quality_segment:rollup:catalog__wfr_return_per_gender_cohort", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": 
"catalog_user_quality_segment:rollup:catalog__nqp_by_nqd_90_days_division_cohort", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "catalog_user_quality_segment:rollup:catalog__nqp_by_nqd_od_cohort", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "catalog_user_quality_segment:rollup:catalog__per_return_division_cohort", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "catalog_user_quality_segment:rollup:catalog__per_return_gender_cohort", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "catalog_user_quality_segment:rollup:catalog__nqd_90_days_review_media_engagement_cohort", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "catalog_user_quality_segment:rollup:catalog__per_return_review_media_engagement_cohort", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "catalog_user_quality_segment:rollup:catalog__wfr_return_per_first_order_age_bin_cohort", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "catalog_user_quality_segment:rollup:catalog__nqp_90_days_review_media_engagement_cohort", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "catalog_user_quality_segment:rollup:catalog__nqp_by_nqd_90_days_review_media_engagement_cohort", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "catalog_user_quality_segment:rollup:catalog__nqp_by_nqd_review_media_engagement_cohort", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "catalog_user_quality_segment:rollup:catalog__per_qr_return_first_order_age_bin_cohort", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "catalog_user_quality_segment:rollup:catalog__per_qr_return_gender_cohort", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "catalog_user_quality_segment:rollup:catalog__per_qr_return_review_media_engagement_cohort", "feature_type": "DataTypeFP32", 
"feature_size": 1}, + {"feature_name": "catalog_user_quality_segment:rollup:catalog__per_wfr_return_first_order_age_bin_cohort", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "clp_sscat:derived_fp32:view_contribution_3_days", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "clp_sscat:derived_fp32:clp_sscat__clicks_28_days__log", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "clp_sscat:derived_fp32:clp_sscat__clicks_7_days__log", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "clp_sscat:derived_fp32:clp_sscat__orders_28_days__log", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "clp_sscat:derived_fp32:clp_sscat__orders_7_days__log", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "clp_sscat:derived_fp32:clp_sscat__views_28_days__log", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "clp_sscat:derived_fp32:clp_sscat__views_7_days__log", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "clp_sscat:derived_fp32:order_contribution_30_days", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "clp_sscat:derived_int32:clicks_28day", "feature_type": "DataTypeInt32", "feature_size": 1}, + {"feature_name": "clp_sscat:derived_int32:clicks_7day", "feature_type": "DataTypeInt32", "feature_size": 1}, + {"feature_name": "clp_sscat:derived_int32:orders_28day", "feature_type": "DataTypeInt32", "feature_size": 1}, + {"feature_name": "clp_sscat:derived_int32:orders_7day", "feature_type": "DataTypeInt32", "feature_size": 1}, + {"feature_name": "clp_sscat:derived_int32:views_7day", "feature_type": "DataTypeInt32", "feature_size": 1}, + {"feature_name": "clp_sscat:derived_int64:views_28day", "feature_type": "DataTypeInt64", "feature_size": 1}, + {"feature_name": "sscat:derived_fp32:sscat__mean_price_90_days", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": 
"sscat:derived_string:name", "feature_type": "DataTypeString", "feature_size": 1}, + {"feature_name": "user:derived_2_int32:user__app_count", "feature_type": "DataTypeInt32", "feature_size": 1}, + {"feature_name": "user:derived_2_int32:voice_search_count", "feature_type": "DataTypeInt32", "feature_size": 1}, + {"feature_name": "user:derived_2_int32:percentile_clicks_bin_28day", "feature_type": "DataTypeInt32", "feature_size": 1}, + {"feature_name": "user:derived_2_int32:text_search_count", "feature_type": "DataTypeInt32", "feature_size": 1}, + {"feature_name": "user:derived_2_int32:total_click_gold_catalog_count_30day", "feature_type": "DataTypeInt32", "feature_size": 1}, + {"feature_name": "user:derived_2_int32:total_click_rated_catalog_count_30day", "feature_type": "DataTypeInt32", "feature_size": 1}, + {"feature_name": "user:derived_2_string:aov_bin_90day", "feature_type": "DataTypeString", "feature_size": 1}, + {"feature_name": "user:derived_2_string:app_language", "feature_type": "DataTypeString", "feature_size": 1}, + {"feature_name": "user:derived_2_string:net_orders_bin_30day", "feature_type": "DataTypeString", "feature_size": 1}, + {"feature_name": "user:derived_2_string:price_index_bin_90day", "feature_type": "DataTypeString", "feature_size": 1}, + {"feature_name": "user:derived_2_string:order_stage_bin", "feature_type": "DataTypeString", "feature_size": 1}, + {"feature_name": "user:derived_2_string:order_stage_bin_90day", "feature_type": "DataTypeString", "feature_size": 1}, + {"feature_name": "user:derived_2_string:pincode", "feature_type": "DataTypeString", "feature_size": 1}, + {"feature_name": "user:derived_2_string:aov_bin", "feature_type": "DataTypeString", "feature_size": 1}, + {"feature_name": "user:derived_2_string:first_order_bin", "feature_type": "DataTypeString", "feature_size": 1}, + {"feature_name": "user:derived_2_string:gross_orders_bin_30day", "feature_type": "DataTypeString", "feature_size": 1}, + {"feature_name": 
"user:derived_2_string:last_month_r_segment", "feature_type": "DataTypeString", "feature_size": 1}, + {"feature_name": "user:derived_2_string:occupation", "feature_type": "DataTypeString", "feature_size": 1}, + {"feature_name": "user:derived_2_string:region", "feature_type": "DataTypeString", "feature_size": 1}, + {"feature_name": "user:derived_2_string:user_quality_segment", "feature_type": "DataTypeString", "feature_size": 1}, + {"feature_name": "user:derived_2_string:gross_orders_bin", "feature_type": "DataTypeString", "feature_size": 1}, + {"feature_name": "user:derived_2_string:install_source", "feature_type": "DataTypeString", "feature_size": 1}, + {"feature_name": "user:derived_2_string:net_orders_bin", "feature_type": "DataTypeString", "feature_size": 1}, + {"feature_name": "user:derived_2_string:price_index_bin", "feature_type": "DataTypeString", "feature_size": 1}, + {"feature_name": "user:derived_2_string:user__division", "feature_type": "DataTypeString", "feature_size": 1}, + {"feature_name": "user:derived_2_string:age_bin", "feature_type": "DataTypeString", "feature_size": 1}, + {"feature_name": "user:derived_2_string:gender", "feature_type": "DataTypeString", "feature_size": 1}, + {"feature_name": "user:derived_2_fp32:user__avg_rating", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "user:derived_2_fp32:avg_order_catalog_nqd_90day", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "user:derived_2_fp32:gross_orders", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "user:derived_2_fp32:net_orders", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "user:derived_2_fp32:avg_click_price_30day", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "user:derived_2_fp32:retention_90_days", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "user:derived_2_fp32:user__nqp", "feature_type": "DataTypeFP32", "feature_size": 1}, + 
{"feature_name": "user:derived_2_fp32:avg_click_catalog_nqd_30day", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "user:derived_2_fp32:log_clicks_56day", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "user:derived_2_fp32:percentage_click_rated_catalog_count_30day", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "user:derived_2_fp32:orders_by_clicks_laplace_28day", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "user:derived_2_fp32:browse_time_last_7day", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "user:derived_2_fp32:clicks_by_views_laplace_28day", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "user:derived_2_fp32:clicks_by_views_laplace_7day", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "user:derived_2_fp32:net_order_by_gross_order", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "user:derived_2_fp32:orders_by_clicks_laplace_56day", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "user:derived_2_fp32:percentage_click_female_catalog_7day", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "user:derived_2_fp32:user__nqd", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "user:derived_2_fp32:user__nqp_by_nqd", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "user:derived_2_fp32:log_clicks_7day", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "user:derived_2_fp32:log_views_56day", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "user:derived_2_fp32:mean_price_index", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "user:derived_2_fp32:clicks_by_views_laplace_56day", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "user:derived_2_fp32:log_orders_lifetime", "feature_type": "DataTypeFP32", 
"feature_size": 1}, + {"feature_name": "user:derived_2_fp32:orders_by_clicks_laplace_7day", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "user:derived_2_fp32:log_clicks_14day", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "user:derived_2_fp32:clicks_by_views_laplace_14day", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "user:derived_2_fp32:engagement_click_percent", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "user_sscat:derived_fp32:clicks_by_views_laplace_3day", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "user_sscat:derived_fp32:log_clicks_56day", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "user_sscat:derived_fp32:orders_by_clicks_laplace_56day", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "user_sscat:derived_fp32:log_views_7day", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "user_sscat:derived_fp32:nqd", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "user_sscat:derived_fp32:nqp", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "user_sscat:derived_fp32:clicks_by_views_laplace_7day", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "user_sscat:derived_fp32:log_clicks_14day", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "user_sscat:derived_fp32:log_clicks_28day", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "user_sscat:derived_fp32:log_clicks_7day", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "clp_query:embeddings_fp16:embedding_str", "feature_type": "DataTypeFP16Vector", "feature_size": 1}, + {"feature_name": "score", "feature_type": "DataTypeFP32", "feature_size": 1}, + {"feature_name": "clp_query_val", "feature_type": "BYTES", "feature_size": 1}, + {"feature_name": "pctr_pre:portfolio_name", "feature_type": 
"FP16", "feature_size": 1}, + {"feature_name": "pctr_pre:super_portfolio_name", "feature_type": "FP16", "feature_size": 1}, + {"feature_name": "pctr_pre:user__net_orders_bin", "feature_type": "FP16", "feature_size": 1}, + {"feature_name": "pctr_pre:sscat_level_attribute_value_2", "feature_type": "FP16", "feature_size": 1}, + {"feature_name": "pctr_pre:user__region", "feature_type": "FP16", "feature_size": 1}, + {"feature_name": "pctr_pre:user__first_order_bin", "feature_type": "FP16", "feature_size": 1}, + {"feature_name": "pctr_pre:user__aov_bin", "feature_type": "FP16", "feature_size": 1}, + {"feature_name": "pctr_pre:user__aov_bin_90_days", "feature_type": "FP16", "feature_size": 1}, + {"feature_name": "pctr_pre:user__order_stage_bin", "feature_type": "FP16", "feature_size": 1}, + {"feature_name": "pctr_pre:user__order_stage_bin_90_days", "feature_type": "FP16", "feature_size": 1}, + {"feature_name": "pctr_pre:user__price_index_bin", "feature_type": "FP16", "feature_size": 1}, + {"feature_name": "pctr_pre:user__price_index_bin_90_days", "feature_type": "FP16", "feature_size": 1}, + {"feature_name": "pctr_pre:user__age_bin_on_meesho_in_days__30_days", "feature_type": "FP16", "feature_size": 1}, + {"feature_name": "pctr_pre:user__gross_orders_bin", "feature_type": "FP16", "feature_size": 1}, + {"feature_name": "pctr_pre:user__net_orders_bin__30_days", "feature_type": "FP16", "feature_size": 1}, + {"feature_name": "pctr_pre:user__gross_orders_bin__30_days", "feature_type": "FP16", "feature_size": 1}, + {"feature_name": "pctr_pre:cat_id", "feature_type": "FP16", "feature_size": 1}, + {"feature_name": "pctr_pre:user__app_language", "feature_type": "FP16", "feature_size": 1}, + {"feature_name": "pctr_pre:user__occupation", "feature_type": "FP16", "feature_size": 1}, + {"feature_name": "pctr_pre:user__last_month_r_segment", "feature_type": "FP16", "feature_size": 1}, + {"feature_name": "pctr_pre:is_mall", "feature_type": "FP16", "feature_size": 1}, + {"feature_name": 
"pctr_pre:catalog_level_attribute_value_1", "feature_type": "FP16", "feature_size": 1}, + {"feature_name": "pctr_pre:catalog_level_attribute_value_2", "feature_type": "FP16", "feature_size": 1}, + {"feature_name": "pctr_pre:catalog_level_attribute_value_3", "feature_type": "FP16", "feature_size": 1}, + {"feature_name": "pctr_pre:catalog__is_non_gst", "feature_type": "FP16", "feature_size": 1}, + {"feature_name": "pctr_pre:predicted_nqd", "feature_type": "FP16", "feature_size": 1}, + {"feature_name": "pctr_pre:clp_sscat__obyc_7_days", "feature_type": "FP16", "feature_size": 1}, + {"feature_name": "pctr_pre:clp_sscat__obyc_28_days", "feature_type": "FP16", "feature_size": 1}, + {"feature_name": "pctr_pre:clp_sscat__cbyv_7_days", "feature_type": "FP16", "feature_size": 1}, + {"feature_name": "pctr_pre:clp_sscat__cbyv_28_days", "feature_type": "FP16", "feature_size": 1}, + {"feature_name": "pcvr_pre:portfolio_name", "feature_type": "FP16", "feature_size": 1}, + {"feature_name": "pcvr_pre:user__region", "feature_type": "FP16", "feature_size": 1}, + {"feature_name": "pcvr_pre:super_portfolio_name", "feature_type": "FP16", "feature_size": 1}, + {"feature_name": "pcvr_pre:user__first_order_bin", "feature_type": "FP16", "feature_size": 1}, + {"feature_name": "pcvr_pre:user__app_language", "feature_type": "FP16", "feature_size": 1}, + {"feature_name": "pcvr_pre:user__order_stage_bin_90_days", "feature_type": "FP16", "feature_size": 1}, + {"feature_name": "pcvr_pre:user__age_bin_on_meesho_in_days__30_days", "feature_type": "FP16", "feature_size": 1}, + {"feature_name": "pcvr_pre:user__gender", "feature_type": "FP16", "feature_size": 1}, + {"feature_name": "pcvr_pre:user__aov_bin", "feature_type": "FP16", "feature_size": 1}, + {"feature_name": "pcvr_pre:user__aov_bin_90_days", "feature_type": "FP16", "feature_size": 1}, + {"feature_name": "pcvr_pre:user__order_stage_bin", "feature_type": "FP16", "feature_size": 1}, + {"feature_name": "pcvr_pre:user__price_index_bin", 
"feature_type": "FP16", "feature_size": 1}, + {"feature_name": "pcvr_pre:user__price_index_bin_90_days", "feature_type": "FP16", "feature_size": 1}, + {"feature_name": "pcvr_pre:user__net_orders_bin", "feature_type": "FP16", "feature_size": 1}, + {"feature_name": "pcvr_pre:user__gross_orders_bin", "feature_type": "FP16", "feature_size": 1}, + {"feature_name": "pcvr_pre:user__net_orders_bin__30_days", "feature_type": "FP16", "feature_size": 1}, + {"feature_name": "pcvr_pre:user__gross_orders_bin__30_days", "feature_type": "FP16", "feature_size": 1}, + {"feature_name": "pcvr_pre:cat_id", "feature_type": "FP16", "feature_size": 1}, + {"feature_name": "pcvr_pre:user__install_source", "feature_type": "FP16", "feature_size": 1}, + {"feature_name": "pcvr_pre:is_mall", "feature_type": "FP16", "feature_size": 1}, + {"feature_name": "pcvr_pre:catalog_level_attribute_value_1", "feature_type": "FP16", "feature_size": 1}, + {"feature_name": "pcvr_pre:catalog_level_attribute_value_2", "feature_type": "FP16", "feature_size": 1}, + {"feature_name": "pcvr_pre:catalog_level_attribute_value_3", "feature_type": "FP16", "feature_size": 1}, + {"feature_name": "pcvr_pre:catalog__is_non_gst", "feature_type": "FP16", "feature_size": 1}, + {"feature_name": "pcvr_pre:predicted_nqd", "feature_type": "FP16", "feature_size": 1}, + {"feature_name": "pcvr_pre:clp_sscat__obyc_7_days", "feature_type": "FP16", "feature_size": 1}, + {"feature_name": "pcvr_pre:clp_sscat__obyc_28_days", "feature_type": "FP16", "feature_size": 1}, + {"feature_name": "pcvr_pre:clp_sscat__cbyv_7_days", "feature_type": "FP16", "feature_size": 1}, + {"feature_name": "pcvr_pre:clp_sscat__cbyv_28_days", "feature_type": "FP16", "feature_size": 1}, + {"feature_name": "pctr_score", "feature_type": "FP16", "feature_size": 1}, + {"feature_name": "pcvr_score", "feature_type": "FP16", "feature_size": 1}, + {"feature_name": "p_nqd_score", "feature_type": "FP32", "feature_size": 1}, + ] +} + + +# 
--------------------------------------------------------------------------- +# 3. Build the DataFrame and decode +# --------------------------------------------------------------------------- + +def main(): + spark = ( + SparkSession.builder + .appName("decode_single_row") + .getOrCreate() + ) + + row = Row( + prism_ingested_at=PRISM_INGESTED_AT, + prism_extracted_at=PRISM_EXTRACTED_AT, + created_at=CREATED_AT, + entities=ENTITIES_JSON, + features=FEATURES_JSON, + metadata=METADATA_JSON, + mp_config_id=MP_CONFIG_ID, + parent_entity=None, + tracking_id=None, + user_id=None, + year=YEAR, + month=MONTH, + day=DAY, + hour=HOUR, + ) + df = spark.createDataFrame([row]) + + print(f"input rows: {df.count()}") + print(f"schema features: {len(SCHEMA['data'])}") + + decoded = decode_mplog_proto_dataframe(df, spark, schema=SCHEMA) + + print("decoded schema:") + decoded.printSchema() + print(f"decoded rows: {decoded.count()}") + + # Quick look at the most interesting score columns + quick_cols = [ + c for c in [ + "entity_id", + "score", + "pctr_score", + "pcvr_score", + "p_nqd_score", + "catalog:realtime_int64:cat_id", + "catalog:realtime_int64:scat_id", + "catalog:realtime_int64:sscat_id", + "catalog:derived_string:portfolio_name", + ] + if c in decoded.columns + ] + if quick_cols: + decoded.select(*quick_cols).show(20, truncate=False) + + out_path = "/tmp/decoded_single_row" + decoded.write.mode("overwrite").parquet(out_path) + print(f"wrote: {out_path}") + + +if __name__ == "__main__": + main() From 494c0e7644df54a99bd5be928fb65a7be04619ce Mon Sep 17 00:00:00 2001 From: dheerajchouhan08 Date: Thu, 14 May 2026 17:54:02 +0530 Subject: [PATCH 6/7] docs: read logs from local CSV in decode_single_row.py Replaces the inline row-data placeholders with a Spark CSV reader so the script runs end-to-end against a downloaded sample (default path: /Users/dheerajchouhan/Downloads/test_new.csv). Accepts an optional positional argument for a different CSV path. 
The 256-feature schema for clp-organic-l2-ranker-v1-0 (version=1, PROTO) remains inlined. Output: prints schema + sample score columns and writes parquet to /tmp/decoded_single_row. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../examples/decode_single_row.py | 160 ++++++------------ 1 file changed, 52 insertions(+), 108 deletions(-) diff --git a/py-sdk/inference_logging_client/examples/decode_single_row.py b/py-sdk/inference_logging_client/examples/decode_single_row.py index 9709f257..6013015c 100644 --- a/py-sdk/inference_logging_client/examples/decode_single_row.py +++ b/py-sdk/inference_logging_client/examples/decode_single_row.py @@ -1,107 +1,33 @@ """ -Standalone script that decodes ONE inference-log row using -`decode_mplog_proto_dataframe` with the full 256-feature schema. +Standalone script that decodes inference-log rows from a local CSV using +`decode_mplog_proto_dataframe` with the full 256-feature schema for +`clp-organic-l2-ranker-v1-0` (version=1, PROTO format). -Run on Databricks (or any pyspark environment with the package installed): +Run locally: pip install inference-logging-client==0.3.4 zstandard pyspark - python decode_single_row.py + python decode_single_row.py [path/to/logs.csv] -The script: - 1. Builds a one-row Spark DataFrame with the exact `entities`, `features`, - and `metadata` strings provided. - 2. Calls `decode_mplog_proto_dataframe(df, spark, schema=SCHEMA)`. - 3. Prints the decoded DataFrame (one row per entity) and writes parquet - to `/tmp/decoded_single_row/`. +If no path is given, defaults to /Users/dheerajchouhan/Downloads/test_new.csv. -If the `features` JSON or `entities` JSON is very long, paste the full -strings into the two placeholders below (FEATURES_JSON and ENTITIES_JSON). 
+Output: + - prints schema, row counts, and a sample of score columns + - writes parquet to /tmp/decoded_single_row/ """ -from pyspark.sql import Row, SparkSession +import sys + +from pyspark.sql import SparkSession +from pyspark.sql.types import LongType, StringType, StructField, StructType from inference_logging_client import decode_mplog_proto_dataframe # --------------------------------------------------------------------------- -# 1. Row data +# 1. Input CSV path # --------------------------------------------------------------------------- -PRISM_INGESTED_AT = 1778093134932 -PRISM_EXTRACTED_AT = 1778093172000 -CREATED_AT = "2026-05-07T00:15:34.000+05:30" - -# Fill in the actual mp_config_id for this row (pass-through column). -MP_CONFIG_ID = "clp-organic-l2-ranker-v1-0" - -YEAR, MONTH, DAY, HOUR = "2026", "05", "07", "00" - -# Metadata byte = 0x04 -> compression=False, version=1, format=PROTO -METADATA_JSON = '["BA=="]' - -# Paste the full entities JSON-array string from the row here (300 entity ids). 
-ENTITIES_JSON = ( - '["129858133","172683451","40159726","163432064","157307877","127153263",' - '"150494430","91634625","105262423","143362971","152885639","141723254",' - '"67612028","90190259","90799634","37254681","69442240","12704667",' - '"182883452","101545685","92571270","109083227","122270821","189312092",' - '"147408625","88849258","105146149","157307878","178517767","68884198",' - '"124324363","183029599","101509512","187296531","185250355","105290811",' - '"94130043","110902288","162834270","46188585","193161","187058552",' - '"143662941","182577489","178489609","78387570","165354806","23923038",' - '"124383285","102178839","187855725","67999990","112389083","53622608",' - '"157307876","40322544","193681","87261452","91976737","58163190",' - '"182561787","159505400","174790442","162708871","125787271","189130322",' - '"133152244","154338081","78656868","18307540","185845733","165178985",' - '"105045470","115278211","168328734","127711153","47093013","173028789",' - '"152891706","150483687","119108032","148530689","137907125","189735602",' - '"140606894","55894553","113416713","134191955","137961683","108117958",' - '"126322517","81413078","88758009","76830400","188805968","162409042",' - '"101610920","129403638","14712806","114253030","188269192","15729965",' - '"77084232","123680130","105910520","133951061","116614716","182882195",' - '"81376971","150810651","148564784","138414869","11645744","32447000",' - '"114509225","110751353","60133034","95408000","100316387","125457803",' - '"105186243","138102163","87461454","110808182","187527955","174901426",' - '"112985436","158026050","176144041","125544584","90201083","9105405",' - '"57338961","23055087","90191915","178738841","125250712","151895413",' - '"112335263","163340375","161219586","182447297","124901483","182847433",' - '"149289846","108108708","96851092","116066124","148109044","92858017",' - '"81310699","80426512","123293294","105146150","130108227","166968286",' - 
'"189302536","120632248","181328675","162162514","55835366","120336101",' - '"164118551","169249718","96818909","119434054","113454362","156102338",' - '"124943412","184408554","99651509","120571620","124904974","161748042",' - '"92858620","40159729","14770797","136077639","67658808","90035215",' - '"72921911","118273836","133708453","137496876","106377352","81906055",' - '"158064774","22371023","131621161","132315814","160010257","9640992",' - '"75121762","55205272","105838887","127970505","96546979","184455344",' - '"178084048","42691306","117911911","154907429","101610459","120345620",' - '"187298515","90519128","182727359","101497968","13157465","80803175",' - '"188408895","67656425","91403106","155610407","142739602","185836448",' - '"159831050","138611069","103394964","134702173","144832430","645632",' - '"166130025","174766204","123236695","18745107","62959962","127979490",' - '"101508311","103660271","177931974","67610484","181645149","137370294",' - '"40159730","129572066","127117824","127212003","43596020","70468712",' - '"129395854","78664565","161719437","134874694","180101337","113065467",' - '"117541611","67655129","99498602","536757","45502433","114285885",' - '"77234578","40964091","183387920","90519125","73522956","139548067",' - '"115084443","108526129","40322281","172987169","138291358","125458478",' - '"129140995","175694187","40988876","187451321","41704353","90930750",' - '"117304498","112026524","120715082","181587439","129737618","120534472",' - '"123654440","156204061","95892707","186627409","89870249","156174988",' - '"158606418","73774749","184531000","41005967","151464858","123351724",' - '"129180516","134941738","50414200","12805166","156839738","151884108",' - '"92552214","118409718","49832509","161418189","108108982","97081458"]' -) - -# Paste the full features JSON-array string here. This is the cell content -# from the `features` column, exactly as stored. Truncated here for brevity; -# replace with the full value when running. 
-FEATURES_JSON = ( - '[{"encoded_features":""},' - ' {"encoded_features":""},' - ' ... 300 entries total ...' - ']' -) +DEFAULT_CSV_PATH = "/Users/dheerajchouhan/Downloads/test_new.csv" # --------------------------------------------------------------------------- @@ -437,44 +363,62 @@ # --------------------------------------------------------------------------- -# 3. Build the DataFrame and decode +# 3. Build the DataFrame from CSV and decode # --------------------------------------------------------------------------- +CSV_SCHEMA = StructType([ + StructField("prism_ingested_at", LongType(), True), + StructField("prism_extracted_at", LongType(), True), + StructField("created_at", StringType(), True), + StructField("entities", StringType(), True), + StructField("features", StringType(), True), + StructField("metadata", StringType(), True), + StructField("mp_config_id", StringType(), True), + StructField("parent_entity", StringType(), True), + StructField("tracking_id", StringType(), True), + StructField("user_id", StringType(), True), + StructField("year", StringType(), True), + StructField("month", StringType(), True), + StructField("day", StringType(), True), + StructField("hour", StringType(), True), +]) + + def main(): + csv_path = sys.argv[1] if len(sys.argv) > 1 else DEFAULT_CSV_PATH + spark = ( SparkSession.builder .appName("decode_single_row") + .config("spark.sql.execution.arrow.pyspark.enabled", "true") .getOrCreate() ) - row = Row( - prism_ingested_at=PRISM_INGESTED_AT, - prism_extracted_at=PRISM_EXTRACTED_AT, - created_at=CREATED_AT, - entities=ENTITIES_JSON, - features=FEATURES_JSON, - metadata=METADATA_JSON, - mp_config_id=MP_CONFIG_ID, - parent_entity=None, - tracking_id=None, - user_id=None, - year=YEAR, - month=MONTH, - day=DAY, - hour=HOUR, + # multiLine=true because the JSON cells contain embedded commas/quotes. 
+ df = ( + spark.read + .option("header", "true") + .option("multiLine", "true") + .option("escape", '"') + .schema(CSV_SCHEMA) + .csv(csv_path) ) - df = spark.createDataFrame([row]) - print(f"input rows: {df.count()}") + n_in = df.count() + print(f"input csv: {csv_path}") + print(f"input rows: {n_in}") print(f"schema features: {len(SCHEMA['data'])}") + if n_in == 0: + print("no rows in csv, exiting") + return + decoded = decode_mplog_proto_dataframe(df, spark, schema=SCHEMA) print("decoded schema:") decoded.printSchema() print(f"decoded rows: {decoded.count()}") - # Quick look at the most interesting score columns quick_cols = [ c for c in [ "entity_id", From 4a7a4926107b13fb19b7e72dd0626387921b6d40 Mon Sep 17 00:00:00 2001 From: dheerajchouhan08 Date: Thu, 14 May 2026 18:17:20 +0530 Subject: [PATCH 7/7] feat: add decode_mplog_proto_csv for Spark-free CSV decode New entry point `decode_mplog_proto_csv(input_csv, output_csv, schema)` that reads an inference-log CSV, decodes each row's encoded entities using the caller-supplied PROTO schema, and writes one decoded row per entity to a new CSV. No Spark, no pyarrow, no inference-service contact -- pure-Python on csv/json/base64 plus the existing proto decoder. Handles multi-MB features cells by lifting csv.field_size_limit. Also adds examples/decode_csv_to_csv.py demonstrating end-to-end usage against the 256-feature clp-organic-l2-ranker-v1-0 schema. Verified locally: 5 input rows -> 647 decoded rows, with correct entity_id, score/pctr_score/pcvr_score/p_nqd_score, int64 cat/scat ids, and string portfolio names. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- .../examples/decode_csv_to_csv.py | 43 +++++ .../inference_logging_client/__init__.py | 176 ++++++++++++++++++ 2 files changed, 219 insertions(+) create mode 100644 py-sdk/inference_logging_client/examples/decode_csv_to_csv.py diff --git a/py-sdk/inference_logging_client/examples/decode_csv_to_csv.py b/py-sdk/inference_logging_client/examples/decode_csv_to_csv.py new file mode 100644 index 00000000..08c0d945 --- /dev/null +++ b/py-sdk/inference_logging_client/examples/decode_csv_to_csv.py @@ -0,0 +1,43 @@ +""" +Decode an inference-log CSV directly to another CSV using the caller-supplied +schema. No Spark required; pure-Python (csv + json + base64 + the proto decoder). + +Usage: + python decode_csv_to_csv.py [input.csv] [output.csv] + +Defaults: + input = /Users/dheerajchouhan/Downloads/test_new.csv + output = /tmp/decoded_test_new.csv +""" + +import sys + +from inference_logging_client import decode_mplog_proto_csv + +# Import the full 256-feature schema from the sibling script. 
+from decode_single_row import SCHEMA
+
+
+DEFAULT_INPUT = "/Users/dheerajchouhan/Downloads/test_new.csv"
+DEFAULT_OUTPUT = "/tmp/decoded_test_new.csv"
+
+
+def main():
+    input_csv = sys.argv[1] if len(sys.argv) > 1 else DEFAULT_INPUT
+    output_csv = sys.argv[2] if len(sys.argv) > 2 else DEFAULT_OUTPUT
+
+    print(f"input : {input_csv}")
+    print(f"output : {output_csv}")
+    print(f"schema : {len(SCHEMA['data'])} features")
+
+    n = decode_mplog_proto_csv(
+        input_csv=input_csv,
+        output_csv=output_csv,
+        schema=SCHEMA,
+    )
+
+    print(f"decoded rows written: {n}")
+
+
+if __name__ == "__main__":
+    main()
diff --git a/py-sdk/inference_logging_client/inference_logging_client/__init__.py b/py-sdk/inference_logging_client/inference_logging_client/__init__.py
index 1dd010f2..2658ec66 100644
--- a/py-sdk/inference_logging_client/inference_logging_client/__init__.py
+++ b/py-sdk/inference_logging_client/inference_logging_client/__init__.py
@@ -57,6 +57,7 @@
     "decode_mplog",
     "decode_mplog_dataframe",
     "decode_mplog_proto_dataframe",
+    "decode_mplog_proto_csv",
     "get_mplog_metadata",
     "get_feature_schema",
     "clear_schema_cache",
@@ -850,3 +851,214 @@ def _decode_batch(iterator):
     feature_cols = [c for c in result_columns if c not in metadata_cols]
     column_order = metadata_cols + feature_cols
     return result_df.select(column_order)
+
+
+def decode_mplog_proto_csv(
+    input_csv: str,
+    output_csv: str,
+    schema,
+    decompress: bool = True,
+    features_column: str = "features",
+    mp_config_id_column: str = "mp_config_id",
+    needed_columns: Optional[Collection[str]] = None,
+) -> int:
+    """
+    Decode an MPLog CSV file directly to another CSV, without Spark.
+
+    Reads the input CSV row-by-row, decodes each row's encoded entities using
+    the caller-supplied PROTO schema, and writes one decoded row per entity to
+    output_csv. Pure-Python: standard library plus decode_proto_features.
+
+    Decoding is best-effort: rows whose features cell is empty or is not a JSON
+    list, and entities whose payload is empty or fails base64, decompression or
+    proto decoding, are skipped. A summary of what was skipped is logged.
+
+    Expected input columns: features, mp_config_id, optionally entities,
+    parent_entity, and the row-metadata columns (prism_ingested_at, etc).
+
+    Args:
+        input_csv: Path to the input CSV.
+        output_csv: Path where the decoded CSV will be written. Must not
+            resolve to the same file as input_csv.
+        schema: Same shapes accepted by decode_mplog_proto_dataframe:
+            list[FeatureInfo], list[dict], or {"data": [...]}.
+        decompress: Attempt zstd decompression per encoded payload.
+        features_column: Column with the encoded features JSON.
+        mp_config_id_column: Pass-through column name; copied to the output
+            under the same name when present in the input.
+        needed_columns: Optional set of feature names to keep.
+
+    Returns:
+        Number of decoded rows written.
+
+    Raises:
+        ValueError: If output_csv is the same file as input_csv, the input has
+            no header row, or features_column is missing from the header.
+    """
+    import base64
+    import csv as _csv
+    import json
+    import logging
+    import os
+    import sys as _sys
+
+    logger = logging.getLogger(__name__)
+
+    # The output is opened in "w" mode, which truncates it; writing onto the
+    # input would wipe the data before it has been read.
+    if os.path.realpath(input_csv) == os.path.realpath(output_csv):
+        raise ValueError("output_csv must not be the same file as input_csv")
+
+    # MPLog features cells can be multi-MB; lift the csv field-size cap.
+    # Note: csv.field_size_limit is a process-wide setting.
+    try:
+        _csv.field_size_limit(_sys.maxsize)
+    except OverflowError:
+        _csv.field_size_limit(2**31 - 1)
+
+    schema_list = _normalize_schema(schema)
+    needed_set = set(needed_columns) if needed_columns is not None else None
+
+    feature_names = [f.name for f in schema_list]
+    if needed_set is not None:
+        feature_names = [n for n in feature_names if n in needed_set]
+
+    # Use mp_config_id_column rather than a hard-coded "mp_config_id" so a
+    # custom-named pass-through column is not silently dropped. dict.fromkeys
+    # removes a duplicate if the caller's name matches another metadata column.
+    row_metadata_columns = list(dict.fromkeys([
+        "prism_ingested_at",
+        "prism_extracted_at",
+        "created_at",
+        mp_config_id_column,
+        "parent_entity",
+        "tracking_id",
+        "user_id",
+        "year",
+        "month",
+        "day",
+        "hour",
+    ]))
+
+    def _decode_payload(encoded_b64):
+        """Return the decoded feature dict for one entity, or None to skip it."""
+        try:
+            payload = base64.b64decode(encoded_b64)
+        except (ValueError, TypeError):
+            return None
+        if not payload:
+            return None
+        # Broad catch is deliberate: one corrupt payload must not abort the
+        # whole file. The failure is logged instead of being swallowed.
+        try:
+            if decompress:
+                payload = _decompress_zstd(payload)
+            return decode_proto_features(
+                payload, schema_list, needed_columns=needed_set
+            )
+        except Exception:
+            logger.debug("Skipping undecodable MPLog payload", exc_info=True)
+            return None
+
+    def _to_cell(value):
+        """Render one decoded feature value as a CSV cell."""
+        if value is None:
+            return ""
+        if isinstance(value, (list, tuple)):
+            return str(value)
+        if isinstance(value, bytes):
+            return value.hex()
+        return value
+
+    n_written = 0
+    skipped_rows = 0
+    skipped_entities = 0
+
+    with open(input_csv, "r", newline="", encoding="utf-8") as f_in:
+        reader = _csv.DictReader(f_in)
+        if reader.fieldnames is None:
+            raise ValueError(f"Input CSV {input_csv} has no header row")
+        input_columns = set(reader.fieldnames)
+
+        if features_column not in input_columns:
+            raise ValueError(f"Missing required column: {features_column}")
+
+        present_metadata_cols = [c for c in row_metadata_columns if c in input_columns]
+        out_columns = ["entity_id"] + present_metadata_cols + sorted(feature_names)
+
+        # The output is created only after the input header has been validated.
+        with open(output_csv, "w", newline="", encoding="utf-8") as f_out:
+            writer = _csv.DictWriter(f_out, fieldnames=out_columns, extrasaction="ignore")
+            writer.writeheader()
+
+            for row in reader:
+                features_data = row.get(features_column)
+                try:
+                    features_list = json.loads(features_data) if features_data else None
+                except (ValueError, TypeError):
+                    features_list = None
+                if not isinstance(features_list, list):
+                    skipped_rows += 1
+                    continue
+
+                entities_val = None
+                entities_raw = row.get("entities")
+                if entities_raw:
+                    try:
+                        parsed = json.loads(entities_raw)
+                        entities_val = parsed if isinstance(parsed, list) else [parsed]
+                    except ValueError:
+                        entities_val = [entities_raw]
+
+                base_metadata = {c: row.get(c) for c in present_metadata_cols}
+
+                # parent_entity may be a JSON list: unwrap a single element, and
+                # stringify longer lists. An empty list or JSON null leaves the raw cell.
+                parent_raw = row.get("parent_entity")
+                if parent_raw:
+                    try:
+                        parsed = json.loads(parent_raw)
+                    except ValueError:
+                        parsed = parent_raw
+                    if isinstance(parsed, list):
+                        parsed = (
+                            parsed[0] if len(parsed) == 1
+                            else str(parsed) if parsed
+                            else None
+                        )
+                    if parsed is not None:
+                        base_metadata["parent_entity"] = parsed
+
+                for i, feature_item in enumerate(features_list):
+                    encoded_b64 = (
+                        feature_item.get("encoded_features", "")
+                        if isinstance(feature_item, dict)
+                        else ""
+                    )
+                    decoded = _decode_payload(encoded_b64) if encoded_b64 else None
+                    if decoded is None:
+                        skipped_entities += 1
+                        continue
+
+                    entity_id = (
+                        str(entities_val[i])
+                        if entities_val and i < len(entities_val)
+                        else f"entity_{i}"
+                    )
+
+                    out_row = {"entity_id": entity_id, **base_metadata}
+                    for k, v in decoded.items():
+                        if needed_set is None or k in needed_set:
+                            out_row[k] = _to_cell(v)
+                    writer.writerow(out_row)
+                    n_written += 1
+
+    if skipped_rows or skipped_entities:
+        logger.warning(
+            "decode_mplog_proto_csv: skipped %d input row(s) and %d entity payload(s) in %s",
+            skipped_rows,
+            skipped_entities,
+            input_csv,
+        )
+
+    return n_written