From 1301e007a23ff5d9c54cb88370e20f5e37c481f5 Mon Sep 17 00:00:00 2001 From: chi-yang-db Date: Tue, 7 Oct 2025 17:33:47 -0700 Subject: [PATCH 01/15] Initial commit for file push --- contrib/templates/file-push/README.md | 10 ++ .../file-push/databricks_template_schema.json | 22 +++ .../README.md.tmpl | 167 ++++++++++++++++++ .../databricks.yml.tmpl | 36 ++++ .../resources/job.yml | 46 +++++ .../resources/pipeline.yml | 15 ++ .../resources/schema.yml | 7 + .../resources/volume.yml | 8 + .../src/configs/tables.json | 6 + .../src/debug_table_config.py | 63 +++++++ .../src/ingestion.py | 35 ++++ .../src/utils/envmanager.py | 38 ++++ .../src/utils/formatmanager.py | 105 +++++++++++ .../src/utils/initialization.py | 69 ++++++++ .../src/utils/tablemanager.py | 98 ++++++++++ 15 files changed, 725 insertions(+) create mode 100644 contrib/templates/file-push/README.md create mode 100644 contrib/templates/file-push/databricks_template_schema.json create mode 100644 contrib/templates/file-push/template/{{.catalog_name}}.{{.schema_name}}/README.md.tmpl create mode 100644 contrib/templates/file-push/template/{{.catalog_name}}.{{.schema_name}}/databricks.yml.tmpl create mode 100644 contrib/templates/file-push/template/{{.catalog_name}}.{{.schema_name}}/resources/job.yml create mode 100644 contrib/templates/file-push/template/{{.catalog_name}}.{{.schema_name}}/resources/pipeline.yml create mode 100644 contrib/templates/file-push/template/{{.catalog_name}}.{{.schema_name}}/resources/schema.yml create mode 100644 contrib/templates/file-push/template/{{.catalog_name}}.{{.schema_name}}/resources/volume.yml create mode 100644 contrib/templates/file-push/template/{{.catalog_name}}.{{.schema_name}}/src/configs/tables.json create mode 100644 contrib/templates/file-push/template/{{.catalog_name}}.{{.schema_name}}/src/debug_table_config.py create mode 100644 contrib/templates/file-push/template/{{.catalog_name}}.{{.schema_name}}/src/ingestion.py create mode 100644 
contrib/templates/file-push/template/{{.catalog_name}}.{{.schema_name}}/src/utils/envmanager.py create mode 100644 contrib/templates/file-push/template/{{.catalog_name}}.{{.schema_name}}/src/utils/formatmanager.py create mode 100644 contrib/templates/file-push/template/{{.catalog_name}}.{{.schema_name}}/src/utils/initialization.py create mode 100644 contrib/templates/file-push/template/{{.catalog_name}}.{{.schema_name}}/src/utils/tablemanager.py diff --git a/contrib/templates/file-push/README.md b/contrib/templates/file-push/README.md new file mode 100644 index 00000000..f8d179b3 --- /dev/null +++ b/contrib/templates/file-push/README.md @@ -0,0 +1,10 @@ +# file-push + +This is an (experimental) template for creating a file push pipeline with Databricks Asset Bundles. + +Install it using +``` +databricks bundle init --template-dir contrib/templates/file-push https://github.com/databricks/bundle-examples +``` + +and follow the generated README.md to get started. \ No newline at end of file diff --git a/contrib/templates/file-push/databricks_template_schema.json b/contrib/templates/file-push/databricks_template_schema.json new file mode 100644 index 00000000..7ca7ee56 --- /dev/null +++ b/contrib/templates/file-push/databricks_template_schema.json @@ -0,0 +1,22 @@ +{ + "welcome_message": "\nWelcome to the file-push template for Databricks Asset Bundles!\n\nA workspace was selected based on your current profile. For information about how to change this, see https://docs.databricks.com/dev-tools/cli/profiles.html.\nworkspace_host: {{workspace_host}}", + "properties": { + "catalog_name": { + "type": "string", + "description": "\nPlease provide the name of an EXISTING UC catalog with default storage enabled.\nCatalog Name", + "order": 1, + "default": "main", + "pattern": "^[a-z_][a-z0-9_]{0,254}$", + "pattern_match_failure_message": "Name must only consist of letters, numbers, and underscores." 
+ }, + "schema_name": { + "type": "string", + "description": "\nPlease provide a NEW schema name where the pipelines and tables will land in.\nSchema Name", + "order": 2, + "default": "filepushschema", + "pattern": "^[a-z_][a-z0-9_]{0,254}$", + "pattern_match_failure_message": "Name must only consist of lowercase letters, numbers, and underscores." + } + }, + "success_message": "\nBundle folder '{{.catalog_name}}.{{.schema_name}}' has been created. Please refer to the README.md for next steps." +} \ No newline at end of file diff --git a/contrib/templates/file-push/template/{{.catalog_name}}.{{.schema_name}}/README.md.tmpl b/contrib/templates/file-push/template/{{.catalog_name}}.{{.schema_name}}/README.md.tmpl new file mode 100644 index 00000000..f9b18a20 --- /dev/null +++ b/contrib/templates/file-push/template/{{.catalog_name}}.{{.schema_name}}/README.md.tmpl @@ -0,0 +1,167 @@ +--- +title: "File Push" +language: python +author: "Chi Yang" +date: 2025-08-07 + +tags: +- ingestion +- file +- nocode +--- + +# File Push + +A lightweight, no‑code file ingestion workflow. Configure a set of tables, get a volume path for each, and drop files into those paths—your data lands in Unity Catalog tables via Auto Loader and Lakeflow Pipeline. + +## Table of Contents +- [Quick Start](#quick-start) + - [Step 1. Configure tables](#step-1-configure-tables) + - [Step 2. Deploy & set up](#step-2-deploy--set-up) + - [Step 3. Retrieve endpoint & push files](#step-3-retrieve-endpoint--push-files) +- [Debug Table Issues](#debug-table-issues) + - [Step 1. Configure tables to debug](#step-1-configure-tables-to-debug) + - [Step 2. Deploy & set up in dev mode](#step-2-deploy--set-up-in-dev-mode) + - [Step 3. Retrieve endpoint & push files to debug](#step-3-retrieve-endpoint--push-files-to-debug) + - [Step 4. Debug table configs](#step-4-debug-table-configs) + - [Step 5. Fix the table configs in production](#step-5-fix-the-table-configs-in-production) + +--- + +## Quick Start + +### Step 1. 
Configure tables +Edit table configs in `./src/configs/tables.json`. Only `name` and `format` are required. + +For supported `format_options`, see the [Auto Loader options](https://docs.databricks.com/aws/en/ingestion/cloud-object-storage/auto-loader/options). Not all options are supported here. If unsure, specify only `name` and `format`, or follow [Debug Table Issues](#debug-table-issues) to discover the correct options. + +```json +[ + { + "name": "table1", + "format": "csv", + "format_options": + { + "escape": "\"" + }, + "schema_hints": "id int, name string" + }, + { + "name": "table2", + "format": "json" + } +] +``` + +> **Tip:** Keep `schema_hints` minimal; Auto Loader can evolve the schema as new columns appear. + +### Step 2. Deploy & set up + +```bash +databricks bundle deploy +databricks bundle run configuration_job +``` + +Wait for the configuration job to finish before moving on. + +### Step 3. Retrieve endpoint & push files +Fetch the volume path for uploading files to a specific table (example: `table1`): + +```bash +databricks tables get {{.catalog_name}}.{{.schema_name}}.table1 --output json \ + | jq -r '.properties["filepush.table_volume_path_data"]' +``` + +Example output: + +```text +/Volumes/{{.catalog_name}}/{{.schema_name}}/{{.catalog_name}}_{{.schema_name}}_filepush_volume/data/table1 +``` + +Upload files to the path above using any of the [Volumes file APIs](https://docs.databricks.com/aws/en/volumes/volume-files#methods-for-managing-files-in-volumes). 
+ +**Databricks CLI example** (destination uses the `dbfs:` scheme): + +```bash +databricks fs cp /local/file/path/datafile1.csv \ + dbfs:/Volumes/{{.catalog_name}}/{{.schema_name}}/{{.catalog_name}}_{{.schema_name}}_filepush_volume/data/table1 +``` + +**REST API example**: + +```bash +# prerequisites: export DATABRICKS_HOST and DATABRICKS_TOKEN (PAT token) +curl -X PUT "$DATABRICKS_HOST/api/2.0/fs/files/Volumes/{{.catalog_name}}/{{.schema_name}}/{{.catalog_name}}_{{.schema_name}}_filepush_volume/data/table1/datafile1.csv" \ + -H "Authorization: Bearer $DATABRICKS_TOKEN" \ + -H "Content-Type: application/octet-stream" \ + --data-binary @"/local/file/path/datafile1.csv" +``` + +Within about a minute, the data should appear in the table `{{.catalog_name}}.{{.schema_name}}.table1`. + +--- + +## Debug Table Issues +If data isn’t parsed as expected, use **dev mode** to iterate on table options safely. + +### Step 1. Configure tables to debug +Configure tables as in [Step 1 of Quick Start](#step-1-configure-tables). + +### Step 2. Deploy & set up in **dev mode** + +```bash +databricks bundle deploy -t dev +databricks bundle run configuration_job -t dev +``` + +Wait for the configuration job to finish. Example output: + +```text +2025-09-23 22:03:04,938 [INFO] initialization - ========== +catalog_name: {{.catalog_name}} +schema_name: dev_chi_yang_{{.schema_name}} +volume_path_root: /Volumes/{{.catalog_name}}/dev_chi_yang_{{.schema_name}}/{{.catalog_name}}_{{.schema_name}}_filepush_volume +volume_path_data: /Volumes/{{.catalog_name}}/dev_chi_yang_{{.schema_name}}/{{.catalog_name}}_{{.schema_name}}_filepush_volume/data +volume_path_archive: /Volumes/{{.catalog_name}}/dev_chi_yang_{{.schema_name}}/{{.catalog_name}}_{{.schema_name}}_filepush_volume/archive +========== +``` + +> **Note:** In **dev mode**, the schema name is **prefixed**. Use the printed schema name for the remaining steps. + +### Step 3. 
# databricks.yml
# Bundle configuration for the file push template.

bundle:
  name: dab

include:
  - resources/*.yml

targets:
  # The deployment targets. See https://docs.databricks.com/en/dev-tools/bundles/deployment-modes.html
  dev:
    mode: development
    workspace:
      # Filled in at `bundle init` time from the profile the user selected,
      # instead of pinning every generated bundle to one fixed workspace.
      host: {{workspace_host}}

  prod:
    mode: production
    default: true
    workspace:
      host: {{workspace_host}}
      root_path: /Workspace/Users/${workspace.current_user.userName}/.bundle/${bundle.name}/${bundle.target}
    permissions:
      - user_name: ${workspace.current_user.userName}
        level: CAN_MANAGE

variables:
  catalog_name:
    description: The existing catalog where the NEW schema will be created.
    default: {{.catalog_name}}
  schema_name:
    description: The name of the NEW schema where the tables will be created.
    default: {{.schema_name}}
  resource_name_prefix:
    description: The prefix for the resource names.
    default: ${var.catalog_name}_${var.schema_name}_
"${bundle.target}" + environment_key: serverless + - task_key: trigger_refresh + run_job_task: + job_id: ${resources.jobs.filetrigger_job.id} + depends_on: + - task_key: initialization + environments: + - environment_key: serverless + spec: + client: "3" + parameters: + - name: catalog_name + default: ${var.catalog_name} + - name: schema_name + default: ${resources.schemas.main_schema.name} + - name: volume_path_root + default: ${resources.volumes.filepush_volume.volume_path} diff --git a/contrib/templates/file-push/template/{{.catalog_name}}.{{.schema_name}}/resources/pipeline.yml b/contrib/templates/file-push/template/{{.catalog_name}}.{{.schema_name}}/resources/pipeline.yml new file mode 100644 index 00000000..e30c4ae6 --- /dev/null +++ b/contrib/templates/file-push/template/{{.catalog_name}}.{{.schema_name}}/resources/pipeline.yml @@ -0,0 +1,15 @@ +# The table refresh pipeline for schema dab + +resources: + pipelines: + refresh_pipeline: + name: ${var.resource_name_prefix}refresh_pipeline + catalog: ${var.catalog_name} + schema: ${resources.schemas.main_schema.name} + serverless: true + libraries: + - file: + path: ../src/ingestion.py + root_path: ../src + configuration: + filepush.volume_path_root: ${resources.volumes.filepush_volume.volume_path} diff --git a/contrib/templates/file-push/template/{{.catalog_name}}.{{.schema_name}}/resources/schema.yml b/contrib/templates/file-push/template/{{.catalog_name}}.{{.schema_name}}/resources/schema.yml new file mode 100644 index 00000000..72500a02 --- /dev/null +++ b/contrib/templates/file-push/template/{{.catalog_name}}.{{.schema_name}}/resources/schema.yml @@ -0,0 +1,7 @@ +# The schema dab + +resources: + schemas: + main_schema: + name: ${var.schema_name} + catalog_name: ${var.catalog_name} diff --git a/contrib/templates/file-push/template/{{.catalog_name}}.{{.schema_name}}/resources/volume.yml b/contrib/templates/file-push/template/{{.catalog_name}}.{{.schema_name}}/resources/volume.yml new file mode 100644 index 
00000000..ac8929c8 --- /dev/null +++ b/contrib/templates/file-push/template/{{.catalog_name}}.{{.schema_name}}/resources/volume.yml @@ -0,0 +1,8 @@ +# The file staging volume for schema dab + +resources: + volumes: + filepush_volume: + name: ${var.resource_name_prefix}filepush_volume + catalog_name: ${var.catalog_name} + schema_name: ${var.schema_name} diff --git a/contrib/templates/file-push/template/{{.catalog_name}}.{{.schema_name}}/src/configs/tables.json b/contrib/templates/file-push/template/{{.catalog_name}}.{{.schema_name}}/src/configs/tables.json new file mode 100644 index 00000000..a231c03f --- /dev/null +++ b/contrib/templates/file-push/template/{{.catalog_name}}.{{.schema_name}}/src/configs/tables.json @@ -0,0 +1,6 @@ +[ + { + "name": "example_table", + "format": "csv" + } +] diff --git a/contrib/templates/file-push/template/{{.catalog_name}}.{{.schema_name}}/src/debug_table_config.py b/contrib/templates/file-push/template/{{.catalog_name}}.{{.schema_name}}/src/debug_table_config.py new file mode 100644 index 00000000..0d697fcd --- /dev/null +++ b/contrib/templates/file-push/template/{{.catalog_name}}.{{.schema_name}}/src/debug_table_config.py @@ -0,0 +1,63 @@ +# Databricks notebook source +# MAGIC %md +# MAGIC ## Paste the table config JSON you would like to debug from `./configs/tables.json` and assign to variable `table_config` +# MAGIC For example, +# MAGIC ``` +# MAGIC table_config = r''' +# MAGIC { +# MAGIC "name": "all_employees", +# MAGIC "format": "csv", +# MAGIC "format_options": { +# MAGIC "escape": "\"", +# MAGIC "multiLine": "false" +# MAGIC }, +# MAGIC "schema_hints": "id int, name string" +# MAGIC } +# MAGIC ''' +# MAGIC ``` +# MAGIC Only `name` and `format` are required for a table. 
def _make_append_flow(table_name, table_config, table_volume_path):
    """Register an append flow that feeds the streaming table ``table_name``.

    The flow body picks its reader at run time: the regular Auto Loader
    reader when the table's volume folder already contains something, and the
    placeholder reader otherwise.

    Args:
        table_name: Name of the target streaming table; also used as the
            flow name.
        table_config: The table's entry from ``tables.json``.
        table_volume_path: Accepted for symmetry with the caller; not read
            here.
    """
    def _body():
        # With data present, read it normally; otherwise fall back to the
        # placeholder reader (built from the default schema).
        if tablemanager.has_data_file(table_name):
            return tablemanager.get_df_with_config(spark, table_config)
        return tablemanager.get_placeholder_df_with_config(spark, table_config)

    # Each flow function gets its own name so the generated functions do not
    # all appear as "_body".
    _body.__name__ = f"append_{table_name.lower()}"

    # Equivalent to decorating _body with @dlt.append_flow(...).
    register_flow = dlt.append_flow(target=table_name, name=table_name)
    register_flow(_body)


table_configs = tablemanager.get_configs()

# Declare one streaming table per configured entry, then attach its flow.
for cfg in table_configs:
    tablemanager.validate_config(cfg)
    tbl = cfg["name"]
    path = tablemanager.get_table_volume_path(tbl)
    fmt = formatmanager.get_format_manager(cfg["format"])
    expts = fmt.expectations

    # The upload folder is published as a table property so clients can look
    # it up from the table itself.
    table_properties = {"filepush.table_volume_path_data": path}
    dlt.create_streaming_table(
        name=tbl,
        comment="File push created table",
        table_properties=table_properties,
        expect_all=expts,
    )
    _make_append_flow(tbl, cfg, path)
def has_default_storage() -> bool:
    """Report whether the configured catalog appears to have a storage location.

    The catalog name comes from the environment config. The catalog is first
    inspected through the SDK model; if that exposes neither a storage root,
    a storage location nor any properties, the raw REST payload is consulted
    instead.

    Returns:
        True when a storage root/location is present, or when the catalog
        properties carry ``defaultManagedLocation`` or
        ``delta.defaultLocation``; False otherwise.
    """
    catalog = get_config()["catalog_name"]
    client = WorkspaceClient()

    # First attempt: typed SDK response.
    details = client.catalogs.get(catalog)
    root = getattr(details, "storage_root", None)
    location = getattr(details, "storage_location", None)
    properties = getattr(details, "properties", {}) or {}

    # Second attempt: raw JSON, only when the typed response was empty.
    if not root and not location and not properties:
        raw = client.api_client.do("GET", f"/api/2.1/unity-catalog/catalogs/{catalog}")
        root = raw.get("storage_root") or raw.get("storageLocation")
        location = raw.get("storage_location") or raw.get("storageLocation")
        properties = raw.get("properties", {}) or {}

    # Heuristic: any one of these signals counts as "default storage".
    if root or location:
        return True
    return bool(
        properties.get("defaultManagedLocation")
        or properties.get("delta.defaultLocation")
    )
import envmanager + +@dataclass(frozen=True, slots=True) +class AutoLoaderOption: + key: str + value: str + hidden: bool = False + def __iter__(self): + yield (self.key, self) + +class AutoLoaderFormat: + def __init__(self): + self.name = None + self.options: set[AutoLoaderOption] = { + AutoLoaderOption("cloudFiles.inferColumnTypes", "true", True), + AutoLoaderOption("cloudFiles.schemaEvolutionMode", "addNewColumns", True), + AutoLoaderOption("cloudFiles.cleanSource", "MOVE", True), + AutoLoaderOption("cloudFiles.cleanSource.retentionDuration", "14 days", True), + AutoLoaderOption("cloudFiles.cleanSource.moveDestination", f"{envmanager.get_config()['volume_path_archive']}/{{table_name}}", True) + } + self.expectations: dict[str, str] = { + "Rescued data should be null": "_rescued_data IS NULL" + } + self.default_schema: set[str] = {"_rescued_data STRING"} + + def get_default_schema(self) -> str: + return ", ".join(self.default_schema) + + def get_userfacing_options(self) -> dict[str, str]: + return {opt.key: opt.value for opt in self.options if not opt.hidden} + + def validate_user_options(self, options: dict[str, str]) -> None: + allowed = set(self.get_userfacing_options()) + illegal = set(options) - allowed + if illegal: + raise ValueError( + f"Unsupported or protected options: {sorted(illegal)}. 
" + f"Allowed user options: {sorted(allowed)}" + ) + + def get_modified_options(self, options: dict[str, str]) -> dict[str, str]: + self.validate_user_options(options) + defaults = self.get_userfacing_options() + return {k: v for k, v in options.items() if k in defaults and v != defaults[k]} + + def get_merged_options(self, options: dict[str, str], table_name: str) -> dict[str, str]: + self.validate_user_options(options) + defaults = self.get_userfacing_options() + + merged = defaults.copy() + merged.update({k: v for k, v in options.items() if k in defaults}) + + # Format the moveDestination with table_name + move_dest_key = "cloudFiles.cleanSource.moveDestination" + if move_dest_key in merged: + merged[move_dest_key] = merged[move_dest_key].format(table_name=table_name) + + return merged + +class CSV(AutoLoaderFormat): + def __init__(self): + super().__init__() + self.name = "CSV" + self.options |= { + AutoLoaderOption("header", "true", True), + AutoLoaderOption("mergeSchema", "true", True), + AutoLoaderOption("mode", "PERMISSIVE", True), + AutoLoaderOption("columnNameOfCorruptRecord", "_corrupt_record", True), + AutoLoaderOption("delimiter", ","), + AutoLoaderOption("escape", "\""), + AutoLoaderOption("multiLine", "false"), + } + self.expectations |= { + "Corrupted record should be null": "_corrupt_record IS NULL" + } + self.default_schema |= {"_corrupt_record STRING"} + +class JSON(AutoLoaderFormat): + def __init__(self): + super().__init__() + self.name = "JSON" + self.options |= { + AutoLoaderOption("mergeSchema", "true", True), + AutoLoaderOption("mode", "PERMISSIVE", True), + AutoLoaderOption("columnNameOfCorruptRecord", "_corrupt_record", True), + AutoLoaderOption("allowComments", "true"), + AutoLoaderOption("allowSingleQuotes", "true"), + AutoLoaderOption("inferTimestamp", "true"), + AutoLoaderOption("multiLine", "true"), + } + self.expectations |= { + "Corrupted record should be null": "_corrupt_record IS NULL" + } + self.default_schema |= 
def get_format_manager(fmt: str) -> "AutoLoaderFormat":
    """Return the format manager registered for ``fmt``.

    The lookup ignores surrounding whitespace and letter case, so ``" csv "``
    resolves to the ``CSV`` manager.

    Args:
        fmt: Format name as written in the table config.

    Returns:
        The shared ``AutoLoaderFormat`` instance for that format. The
        previous ``dict[str, str]`` annotation did not match what is
        returned.

    Raises:
        ValueError: If no manager is registered under that name.
    """
    key = fmt.strip().upper()
    try:
        return _supported_formats[key]
    except KeyError:
        supported = ", ".join(sorted(_supported_formats))
        # "from None": the internal KeyError adds nothing for the caller.
        raise ValueError(
            f"{fmt!r} is not a supported format. Supported formats: {supported}"
        ) from None
# Publish the volume layout as schema properties. Each path needs its own
# key: the archive path used to be written under "filepush.volume_path_data",
# which silently replaced the data path and left no archive property at all.
ws.schemas.update(full_name=f"{catalog_name}.{schema_name}", properties={
    "filepush.volume_path_root": volume_path_root,
    "filepush.volume_path_data": volume_path_data,
    "filepush.volume_path_archive": volume_path_archive,
})
logger.info(f"Schema {catalog_name}.{schema_name} configured")

# Initialize volume folder structure
logger.info(f"Initializing volume folder structure {volume_path_root}")
logger.debug(f"Creating data directory {volume_path_data}")
ws.files.create_directory(volume_path_data)
logger.debug(f"Creating archive directory {volume_path_archive}")
ws.files.create_directory(volume_path_archive)
# One data folder and one archive folder per configured table.
with open("../configs/tables.json", "r") as f:
    for table in json.load(f):
        table_volume_path_data = f"{volume_path_data}/{table['name']}"
        logger.debug(f"Creating table directory {table_volume_path_data}")
        ws.files.create_directory(table_volume_path_data)
        table_volume_path_archive = f"{volume_path_archive}/{table['name']}"
        logger.debug(f"Creating table archive directory {table_volume_path_archive}")
        ws.files.create_directory(table_volume_path_archive)
logger.info(f"Volume {volume_path_root} configured")

# Dump configs to environment json
all_configs = {
    "catalog_name": catalog_name,
    "schema_name": schema_name,
    "volume_path_root": volume_path_root,
    "volume_path_data": volume_path_data,
    "volume_path_archive": volume_path_archive,
}
with open("../configs/environment.json", "w") as f:
    json.dump(all_configs, f)

# Plain %-style template: the logger fills in %s, so no f-string is needed.
logger.info("==========\n%s\n==========", "\n".join(f"{k}: {v}" for k, v in all_configs.items()))
def validate_config(table_config: dict):
    """Check that a single table config carries the mandatory fields.

    Raises:
        ValueError: If ``name`` or ``format`` is missing or empty; ``name``
            is checked first.
    """
    required = (
        ("name", "name is required for table config"),
        ("format", "format is required for table config"),
    )
    for key, message in required:
        if not table_config.get(key):
            raise ValueError(message)


def validate_configs(table_configs: list):
    """Validate a list of table configs as a whole and one by one.

    Duplicate names are reported before any per-table check runs. Entries
    without a name are not counted as duplicates of each other; they fail the
    per-table validation instead.

    Raises:
        ValueError: On duplicate table names, or when an entry fails
            ``validate_config``.
    """
    seen_names = [entry.get("name") for entry in table_configs]
    repeated = {
        candidate
        for candidate in seen_names
        if candidate is not None and seen_names.count(candidate) > 1
    }
    if repeated:
        raise ValueError(f"Duplicate table names found in table configs: {sorted(repeated)}")
    for entry in table_configs:
        validate_config(entry)


def get_configs() -> list:
    """Load and validate ``configs/tables.json`` from the source root.

    The file is looked up one directory above this module's directory.

    Returns:
        The parsed list of table configs.

    Raises:
        RuntimeError: If the config file does not exist.
        ValueError: If the loaded configs fail validation.
    """
    source_root = os.path.dirname(os.path.dirname(__file__))
    json_path = os.path.join(source_root, "configs", "tables.json")
    if os.path.exists(json_path):
        with open(json_path, "r") as handle:
            configs = json.load(handle)
        validate_configs(configs)
        return configs
    raise RuntimeError(f"Missing table configs file: {json_path}. Please following README.md to create one, deploy and run configuration_job.")
Have you run `databricks bundle run configuration_job`?") + return table_volume_path_data + +def has_data_file(table_name: str) -> bool: + ws = WorkspaceClient() + table_volume_path_data = get_table_volume_path(table_name) + try: + iter = ws.files.list_directory_contents(table_volume_path_data) + next(iter) + except StopIteration: + return False + return True + +def is_table_created(table_name: str) -> bool: + ws = WorkspaceClient() + return ws.tables.exists(full_name=f"{envmanager.get_config()['catalog_name']}.{envmanager.get_config()['schema_name']}.{table_name}").table_exists + +def _apply_table_options(reader: DataStreamReader, table_config: dict, fmt_mgr) -> DataStreamReader: + name = table_config.get("name") + fmt = table_config.get("format") + + # format options + user_fmt_opts = table_config.get("format_options", {}) + final_fmt_opts = fmt_mgr.get_merged_options(user_fmt_opts, name) + reader = reader.option("cloudFiles.format", fmt) + for k, v in final_fmt_opts.items(): + reader = reader.option(k, v) + + # schema hints + schema_hints = table_config.get("schema_hints") + if schema_hints: + reader = reader.option("cloudFiles.schemaHints", ", ".join({schema_hints} | fmt_mgr.default_schema)) + else: + reader = reader.option("cloudFiles.schemaHints", ", ".join(fmt_mgr.default_schema)) + + return reader + +def get_df_with_config(spark: SparkSession, table_config: dict, schema_location: str = None) -> DataFrame: + validate_config(table_config) + fmt = table_config.get("format") + fmt_mgr = formatmanager.get_format_manager(fmt) + + reader = spark.readStream.format("cloudFiles") + reader = _apply_table_options(reader, table_config, fmt_mgr) + if schema_location: + reader = reader.option("cloudFiles.schemaLocation", schema_location) + + # include file metadata + return reader.load(get_table_volume_path(table_config.get("name"))).selectExpr("*", "_metadata") + +def get_placeholder_df_with_config(spark: SparkSession, table_config: dict) -> DataFrame: + 
validate_config(table_config) + fmt = table_config.get("format") + fmt_mgr = formatmanager.get_format_manager(fmt) + + reader = spark.readStream.format("cloudFiles") + reader = _apply_table_options(reader, table_config, fmt_mgr).schema(fmt_mgr.get_default_schema()) + + return reader.load(get_table_volume_path(table_config.get("name"))) + \ No newline at end of file From 8008eed53458513ff10623de228f07193ffdda7b Mon Sep 17 00:00:00 2001 From: chi-yang-db Date: Wed, 8 Oct 2025 10:35:59 -0700 Subject: [PATCH 02/15] Update doc and inject version number to pipeline --- .../file-push/databricks_template_schema.json | 2 +- .../README.md.tmpl | 14 +++++++------- .../resources/pipeline.yml | 1 + 3 files changed, 9 insertions(+), 8 deletions(-) diff --git a/contrib/templates/file-push/databricks_template_schema.json b/contrib/templates/file-push/databricks_template_schema.json index 7ca7ee56..9022c2e1 100644 --- a/contrib/templates/file-push/databricks_template_schema.json +++ b/contrib/templates/file-push/databricks_template_schema.json @@ -19,4 +19,4 @@ } }, "success_message": "\nBundle folder '{{.catalog_name}}.{{.schema_name}}' has been created. Please refer to the README.md for next steps." -} \ No newline at end of file +} diff --git a/contrib/templates/file-push/template/{{.catalog_name}}.{{.schema_name}}/README.md.tmpl b/contrib/templates/file-push/template/{{.catalog_name}}.{{.schema_name}}/README.md.tmpl index f9b18a20..dbf27fea 100644 --- a/contrib/templates/file-push/template/{{.catalog_name}}.{{.schema_name}}/README.md.tmpl +++ b/contrib/templates/file-push/template/{{.catalog_name}}.{{.schema_name}}/README.md.tmpl @@ -119,10 +119,10 @@ Wait for the configuration job to finish. 
Example output: ```text 2025-09-23 22:03:04,938 [INFO] initialization - ========== catalog_name: {{.catalog_name}} -schema_name: dev_chi_yang_{{.schema_name}} -volume_path_root: /Volumes/{{.catalog_name}}/dev_chi_yang_{{.schema_name}}/{{.catalog_name}}_{{.schema_name}}_filepush_volume -volume_path_data: /Volumes/{{.catalog_name}}/dev_chi_yang_{{.schema_name}}/{{.catalog_name}}_{{.schema_name}}_filepush_volume/data -volume_path_archive: /Volumes/{{.catalog_name}}/dev_chi_yang_{{.schema_name}}/{{.catalog_name}}_{{.schema_name}}_filepush_volume/archive +schema_name: dev_first_last_{{.schema_name}} +volume_path_root: /Volumes/{{.catalog_name}}/dev_first_last_{{.schema_name}}/{{.catalog_name}}_{{.schema_name}}_filepush_volume +volume_path_data: /Volumes/{{.catalog_name}}/dev_first_last_{{.schema_name}}/{{.catalog_name}}_{{.schema_name}}_filepush_volume/data +volume_path_archive: /Volumes/{{.catalog_name}}/dev_first_last_{{.schema_name}}/{{.catalog_name}}_{{.schema_name}}_filepush_volume/archive ========== ``` @@ -132,14 +132,14 @@ volume_path_archive: /Volumes/{{.catalog_name}}/dev_chi_yang_{{.schema_name}}/{{ Get the dev volume path (note the **prefixed schema**): ```bash -databricks tables get {{.catalog_name}}.dev_chi_yang_{{.schema_name}}.table1 --output json \ +databricks tables get {{.catalog_name}}.dev_first_last_{{.schema_name}}.table1 --output json \ | jq -r '.properties["filepush.table_volume_path_data"]' ``` Example output: ```text -/Volumes/{{.catalog_name}}/dev_chi_yang_{{.schema_name}}/{{.catalog_name}}_{{.schema_name}}_filepush_volume/data/table1 +/Volumes/{{.catalog_name}}/dev_first_last_{{.schema_name}}/{{.catalog_name}}_{{.schema_name}}_filepush_volume/data/table1 ``` Then follow the upload instructions from [Quick Start → Step 3](#step-3-retrieve-endpoint--push-files) to send test files. 
@@ -163,5 +163,5 @@ databricks bundle run refresh_pipeline --full-refresh table1 --- -**That’s it!** You now have a managed, push-based file ingestion workflow with debuggable table configs and repeatable deployments. +**That’s it!** You now have a managed, push-based file ingestion workflow with debuggable table configs and repeatable deployments! diff --git a/contrib/templates/file-push/template/{{.catalog_name}}.{{.schema_name}}/resources/pipeline.yml b/contrib/templates/file-push/template/{{.catalog_name}}.{{.schema_name}}/resources/pipeline.yml index e30c4ae6..1b325543 100644 --- a/contrib/templates/file-push/template/{{.catalog_name}}.{{.schema_name}}/resources/pipeline.yml +++ b/contrib/templates/file-push/template/{{.catalog_name}}.{{.schema_name}}/resources/pipeline.yml @@ -12,4 +12,5 @@ resources: path: ../src/ingestion.py root_path: ../src configuration: + lakeflow.experimantal.filepush.version: 0.1 filepush.volume_path_root: ${resources.volumes.filepush_volume.volume_path} From 0670999847c05351c5aae62bfe2caaff9c06fa53 Mon Sep 17 00:00:00 2001 From: chi-yang-db Date: Wed, 8 Oct 2025 10:48:59 -0700 Subject: [PATCH 03/15] EOL warning --- .../{{.catalog_name}}.{{.schema_name}}/src/utils/envmanager.py | 1 - .../{{.catalog_name}}.{{.schema_name}}/src/utils/tablemanager.py | 1 - 2 files changed, 2 deletions(-) diff --git a/contrib/templates/file-push/template/{{.catalog_name}}.{{.schema_name}}/src/utils/envmanager.py b/contrib/templates/file-push/template/{{.catalog_name}}.{{.schema_name}}/src/utils/envmanager.py index ba822a98..138aac39 100644 --- a/contrib/templates/file-push/template/{{.catalog_name}}.{{.schema_name}}/src/utils/envmanager.py +++ b/contrib/templates/file-push/template/{{.catalog_name}}.{{.schema_name}}/src/utils/envmanager.py @@ -35,4 +35,3 @@ def has_default_storage() -> bool: props.get("defaultManagedLocation") or props.get("delta.defaultLocation") ) - \ No newline at end of file diff --git 
a/contrib/templates/file-push/template/{{.catalog_name}}.{{.schema_name}}/src/utils/tablemanager.py b/contrib/templates/file-push/template/{{.catalog_name}}.{{.schema_name}}/src/utils/tablemanager.py index 5aaf9a55..46cf7aa3 100644 --- a/contrib/templates/file-push/template/{{.catalog_name}}.{{.schema_name}}/src/utils/tablemanager.py +++ b/contrib/templates/file-push/template/{{.catalog_name}}.{{.schema_name}}/src/utils/tablemanager.py @@ -95,4 +95,3 @@ def get_placeholder_df_with_config(spark: SparkSession, table_config: dict) -> D reader = _apply_table_options(reader, table_config, fmt_mgr).schema(fmt_mgr.get_default_schema()) return reader.load(get_table_volume_path(table_config.get("name"))) - \ No newline at end of file From ec450658f4c455201f9e187896e4c4517f623cdc Mon Sep 17 00:00:00 2001 From: chi-yang-db Date: Wed, 8 Oct 2025 10:57:53 -0700 Subject: [PATCH 04/15] Switch to ipynb notebook --- .../src/debug_table_config.ipynb | 153 ++++++++++++++++++ .../src/debug_table_config.py | 63 -------- .../src/utils/envmanager.py | 2 +- 3 files changed, 154 insertions(+), 64 deletions(-) create mode 100644 contrib/templates/file-push/template/{{.catalog_name}}.{{.schema_name}}/src/debug_table_config.ipynb delete mode 100644 contrib/templates/file-push/template/{{.catalog_name}}.{{.schema_name}}/src/debug_table_config.py diff --git a/contrib/templates/file-push/template/{{.catalog_name}}.{{.schema_name}}/src/debug_table_config.ipynb b/contrib/templates/file-push/template/{{.catalog_name}}.{{.schema_name}}/src/debug_table_config.ipynb new file mode 100644 index 00000000..89bd4615 --- /dev/null +++ b/contrib/templates/file-push/template/{{.catalog_name}}.{{.schema_name}}/src/debug_table_config.ipynb @@ -0,0 +1,153 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": {}, + "inputWidgets": {}, + "nuid": "c0609c90-7292-4a15-9e5f-bc5740240c24", + "showTitle": false, + "tableResultSettingsMap": {}, + 
"title": "" + } + }, + "source": [ + "## This Notebook should be run in a workspace, not locally.\n", + "## Paste the table config JSON you would like to debug from `./configs/tables.json` and assign to variable `table_config`\n", + "For example,\n", + "```\n", + "table_config = r'''\n", + "{\n", + " \"name\": \"all_employees\",\n", + " \"format\": \"csv\",\n", + " \"format_options\": {\n", + " \"escape\": \"\\\"\",\n", + " \"multiLine\": \"false\"\n", + " }\n", + " \"schema_hints\": \"id int, name string\"\n", + "}\n", + "'''\n", + "```\n", + "Only `name` and `format` are required for a table." + ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": {}, + "inputWidgets": {}, + "nuid": "0065f01d-7a87-4810-9dc9-55e1ab56815f", + "showTitle": false, + "tableResultSettingsMap": {}, + "title": "" + } + }, + "outputs": [], + "source": [ + "table_config = r'''\n", + " {\n", + " \"name\": \"employees\",\n", + " \"format\": \"csv\",\n", + " \"format_options\": {\n", + " \"escape\": \"\\\"\"\n", + " },\n", + " \"schema_hints\": \"id int, name string\"\n", + " }\n", + "'''" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": {}, + "inputWidgets": {}, + "nuid": "86b00986-55bd-46e1-8258-a8240b7d2e0e", + "showTitle": false, + "tableResultSettingsMap": {}, + "title": "" + } + }, + "source": [ + "## Click `Run all` and inspect the parsed result. 
Iterate on the config until the result looks good" + ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": {}, + "inputWidgets": {}, + "nuid": "77843022-4cb8-4387-baf7-927705aa104d", + "showTitle": false, + "tableResultSettingsMap": {}, + "title": "" + }, + "jupyter": { + "source_hidden": true + } + }, + "outputs": [], + "source": [ + "import json\n", + "import tempfile\n", + "from utils import tablemanager\n", + "from utils import envmanager\n", + "\n", + "if not envmanager.has_default_storage():\n", + " print(\"WARNING: Current catalog is not using default storage, some file push feature may not be available\")\n", + "\n", + "# Load table config\n", + "table_config_json = json.loads(table_config)\n", + "tablemanager.validate_config(table_config_json)\n", + "table_name = table_config_json[\"name\"]\n", + "table_volume_path_data = tablemanager.get_table_volume_path(table_name)\n", + "\n", + "assert tablemanager.has_data_file(table_name), f\"No data file found in {table_volume_path_data}. 
Please upload at least 1 file to {table_volume_path_data}\"\n", + "\n", + "# Put schema location in temp directory\n", + "with tempfile.TemporaryDirectory() as tmpdir:\n", + " display(tablemanager.get_df_with_config(spark, table_config_json, tmpdir))" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": {}, + "inputWidgets": {}, + "nuid": "071ffa2e-f391-451b-a584-5727edd507c0", + "showTitle": false, + "tableResultSettingsMap": {}, + "title": "" + } + }, + "source": [ + "## Copy and paste the modified config back to the `./configs/tables.json` in the DAB folder" + ] + } + ], + "metadata": { + "application/vnd.databricks.v1+notebook": { + "computePreferences": null, + "dashboards": [], + "environmentMetadata": null, + "inputWidgetPreferences": null, + "language": "python", + "notebookMetadata": { + "pythonIndentUnit": 2 + }, + "notebookName": "debug_table_config", + "widgets": {} + }, + "language_info": { + "name": "python" + } + }, + "nbformat": 4, + "nbformat_minor": 0 +} diff --git a/contrib/templates/file-push/template/{{.catalog_name}}.{{.schema_name}}/src/debug_table_config.py b/contrib/templates/file-push/template/{{.catalog_name}}.{{.schema_name}}/src/debug_table_config.py deleted file mode 100644 index 0d697fcd..00000000 --- a/contrib/templates/file-push/template/{{.catalog_name}}.{{.schema_name}}/src/debug_table_config.py +++ /dev/null @@ -1,63 +0,0 @@ -# Databricks notebook source -# MAGIC %md -# MAGIC ## Paste the table config JSON you would like to debug from `./configs/tables.json` and assign to variable `table_config` -# MAGIC For example, -# MAGIC ``` -# MAGIC table_config = r''' -# MAGIC { -# MAGIC "name": "all_employees", -# MAGIC "format": "csv", -# MAGIC "format_options": { -# MAGIC "escape": "\"", -# MAGIC "multiLine": "false" -# MAGIC } -# MAGIC "schema_hints": "id int, name string" -# MAGIC } -# MAGIC ''' -# MAGIC ``` -# MAGIC Only `name` and `format` are required for a table. 
- -# COMMAND ---------- - -table_config = r''' - { - "name": "employees", - "format": "csv", - "format_options": { - "escape": "\"" - }, - "schema_hints": "id int, name string" - } -''' - -# COMMAND ---------- - -# MAGIC %md -# MAGIC ## Click `Run all` and inspect the parsed result. Iterate on the config until the result looks good - -# COMMAND ---------- - -import json -import tempfile -from utils import tablemanager -from utils import envmanager - -if not envmanager.has_default_storage(): - print("WARNING: Current catalog is not using default storage, some file push feature may not be available") - -# Load table config -table_config_json = json.loads(table_config) -tablemanager.validate_config(table_config_json) -table_name = table_config_json["name"] -table_volume_path_data = tablemanager.get_table_volume_path(table_name) - -assert tablemanager.has_data_file(table_name), f"No data file found in {table_volume_path_data}. Please upload at least 1 file to {table_volume_path_data}" - -# Put schema location in temp directory -with tempfile.TemporaryDirectory() as tmpdir: - display(tablemanager.get_df_with_config(spark, table_config_json, tmpdir)) - -# COMMAND ---------- - -# MAGIC %md -# MAGIC ## Copy and paste the modified config back to the `./configs/tables.json` in the DAB folder \ No newline at end of file diff --git a/contrib/templates/file-push/template/{{.catalog_name}}.{{.schema_name}}/src/utils/envmanager.py b/contrib/templates/file-push/template/{{.catalog_name}}.{{.schema_name}}/src/utils/envmanager.py index 138aac39..25aa9d0c 100644 --- a/contrib/templates/file-push/template/{{.catalog_name}}.{{.schema_name}}/src/utils/envmanager.py +++ b/contrib/templates/file-push/template/{{.catalog_name}}.{{.schema_name}}/src/utils/envmanager.py @@ -5,7 +5,7 @@ def get_config() -> dict: json_path = os.path.join(os.path.dirname(os.path.dirname(__file__)), "configs", "environment.json") if not os.path.exists(json_path): - raise RuntimeError(f"Missing environment file: 
{json_path}. Have you run `databricks bundle run configuration_job`?") + raise RuntimeError(f"Missing environment file: {json_path}. This should be run in a workspace, not locally. And make sure to run `databricks bundle run configuration_job` at least once.") with open(json_path, "r") as f: configs = json.load(f) return configs From 96b9c72ca0684d30b2cbe01fedb5be5cbc1f77fb Mon Sep 17 00:00:00 2001 From: chi-yang-db Date: Wed, 8 Oct 2025 13:50:07 -0700 Subject: [PATCH 05/15] Add 2 more supported formats --- .../src/utils/formatmanager.py | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/contrib/templates/file-push/template/{{.catalog_name}}.{{.schema_name}}/src/utils/formatmanager.py b/contrib/templates/file-push/template/{{.catalog_name}}.{{.schema_name}}/src/utils/formatmanager.py index 663b897a..7871f443 100644 --- a/contrib/templates/file-push/template/{{.catalog_name}}.{{.schema_name}}/src/utils/formatmanager.py +++ b/contrib/templates/file-push/template/{{.catalog_name}}.{{.schema_name}}/src/utils/formatmanager.py @@ -94,7 +94,23 @@ def __init__(self): } self.default_schema |= {"_corrupt_record STRING"} -_supported_formats: dict[str, AutoLoaderFormat] = {f.name: f for f in (CSV(), JSON())} +class AVRO(AutoLoaderFormat): + def __init__(self): + super().__init__() + self.name = "AVRO" + self.options |= { + AutoLoaderOption("mergeSchema", "true", True), + } + +class PARQUET(AutoLoaderFormat): + def __init__(self): + super().__init__() + self.name = "PARQUET" + self.options |= { + AutoLoaderOption("mergeSchema", "true", True), + } + +_supported_formats: dict[str, AutoLoaderFormat] = {f.name: f for f in (CSV(), JSON(), AVRO(), PARQUET())} def get_format_manager(fmt: str) -> dict[str, str]: key = fmt.strip().upper() From 99e90583ef32fc564f8190670566d019ce1b25d2 Mon Sep 17 00:00:00 2001 From: chi-yang-db Date: Wed, 8 Oct 2025 16:44:37 -0700 Subject: [PATCH 06/15] Update supported formats --- .../README.md.tmpl | 2 ++ 
.../src/configs/tables.json | 14 +++++++++++++- 2 files changed, 15 insertions(+), 1 deletion(-) diff --git a/contrib/templates/file-push/template/{{.catalog_name}}.{{.schema_name}}/README.md.tmpl b/contrib/templates/file-push/template/{{.catalog_name}}.{{.schema_name}}/README.md.tmpl index dbf27fea..6d23f98d 100644 --- a/contrib/templates/file-push/template/{{.catalog_name}}.{{.schema_name}}/README.md.tmpl +++ b/contrib/templates/file-push/template/{{.catalog_name}}.{{.schema_name}}/README.md.tmpl @@ -33,6 +33,8 @@ A lightweight, no‑code file ingestion workflow. Configure a set of tables, get ### Step 1. Configure tables Edit table configs in `./src/configs/tables.json`. Only `name` and `format` are required. +Currently supported formats are `csv`, `json`, `avro` and `parquet`. + For supported `format_options`, see the [Auto Loader options](https://docs.databricks.com/aws/en/ingestion/cloud-object-storage/auto-loader/options). Not all options are supported here. If unsure, specify only `name` and `format`, or follow [Debug Table Issues](#debug-table-issues) to discover the correct options. 
```json diff --git a/contrib/templates/file-push/template/{{.catalog_name}}.{{.schema_name}}/src/configs/tables.json b/contrib/templates/file-push/template/{{.catalog_name}}.{{.schema_name}}/src/configs/tables.json index a231c03f..0a57f166 100644 --- a/contrib/templates/file-push/template/{{.catalog_name}}.{{.schema_name}}/src/configs/tables.json +++ b/contrib/templates/file-push/template/{{.catalog_name}}.{{.schema_name}}/src/configs/tables.json @@ -1,6 +1,18 @@ [ { - "name": "example_table", + "name": "example_table_csv", "format": "csv" + }, + { + "name": "example_table_json", + "format": "json" + }, + { + "name": "example_table_avro", + "format": "avro" + }, + { + "name": "example_table_parquet", + "format": "parquet" } ] From 313df6c3151f5d85b218448d6ed4c9d414e2767a Mon Sep 17 00:00:00 2001 From: chi-yang-db Date: Fri, 17 Oct 2025 16:37:34 -0700 Subject: [PATCH 07/15] Allow use to alter csv header option --- .../src/utils/formatmanager.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/contrib/templates/file-push/template/{{.catalog_name}}.{{.schema_name}}/src/utils/formatmanager.py b/contrib/templates/file-push/template/{{.catalog_name}}.{{.schema_name}}/src/utils/formatmanager.py index 7871f443..0cd835db 100644 --- a/contrib/templates/file-push/template/{{.catalog_name}}.{{.schema_name}}/src/utils/formatmanager.py +++ b/contrib/templates/file-push/template/{{.catalog_name}}.{{.schema_name}}/src/utils/formatmanager.py @@ -63,7 +63,7 @@ def __init__(self): super().__init__() self.name = "CSV" self.options |= { - AutoLoaderOption("header", "true", True), + AutoLoaderOption("header", "true"), AutoLoaderOption("mergeSchema", "true", True), AutoLoaderOption("mode", "PERMISSIVE", True), AutoLoaderOption("columnNameOfCorruptRecord", "_corrupt_record", True), From dfb16deec5953a36768f53dda65d49c887e5e8f5 Mon Sep 17 00:00:00 2001 From: chi-yang-db Date: Fri, 17 Oct 2025 16:40:12 -0700 Subject: [PATCH 08/15] Update README.md --- .../README.md.tmpl | 
21 +++++++------------ 1 file changed, 8 insertions(+), 13 deletions(-) diff --git a/contrib/templates/file-push/template/{{.catalog_name}}.{{.schema_name}}/README.md.tmpl b/contrib/templates/file-push/template/{{.catalog_name}}.{{.schema_name}}/README.md.tmpl index 6d23f98d..58b6fc27 100644 --- a/contrib/templates/file-push/template/{{.catalog_name}}.{{.schema_name}}/README.md.tmpl +++ b/contrib/templates/file-push/template/{{.catalog_name}}.{{.schema_name}}/README.md.tmpl @@ -1,16 +1,4 @@ ---- -title: "File Push" -language: python -author: "Chi Yang" -date: 2025-08-07 - -tags: -- ingestion -- file -- nocode ---- - -# File Push +# Zerobus - File Mode A lightweight, no‑code file ingestion workflow. Configure a set of tables, get a volume path for each, and drop files into those paths—your data lands in Unity Catalog tables via Auto Loader and Lakeflow Pipeline. @@ -67,6 +55,12 @@ databricks bundle run configuration_job Wait for the configuration job to finish before moving on. ### Step 3. Retrieve endpoint & push files +First, grant write permissions to the volume. This enables the client to push files: + +```bash +databricks bundle open filepush_volume +``` + Fetch the volume path for uploading files to a specific table (example: `table1`): ```bash @@ -131,6 +125,7 @@ volume_path_archive: /Volumes/{{.catalog_name}}/dev_first_last_{{.schema_name}}/ > **Note:** In **dev mode**, the schema name is **prefixed**. Use the printed schema name for the remaining steps. ### Step 3. 
Retrieve endpoint & push files to debug + Get the dev volume path (note the **prefixed schema**): ```bash From cf6834b174ad4188449ecab90a74ca02666148a2 Mon Sep 17 00:00:00 2001 From: chi-yang-db Date: Fri, 17 Oct 2025 16:50:39 -0700 Subject: [PATCH 09/15] Ran ruff formatter --- .../src/debug_table_config.ipynb | 14 +- .../src/ingestion.py | 48 ++-- .../src/utils/envmanager.py | 66 ++--- .../src/utils/formatmanager.py | 227 ++++++++++-------- .../src/utils/initialization.py | 47 ++-- .../src/utils/tablemanager.py | 193 +++++++++------ 6 files changed, 335 insertions(+), 260 deletions(-) diff --git a/contrib/templates/file-push/template/{{.catalog_name}}.{{.schema_name}}/src/debug_table_config.ipynb b/contrib/templates/file-push/template/{{.catalog_name}}.{{.schema_name}}/src/debug_table_config.ipynb index 89bd4615..968fc5e4 100644 --- a/contrib/templates/file-push/template/{{.catalog_name}}.{{.schema_name}}/src/debug_table_config.ipynb +++ b/contrib/templates/file-push/template/{{.catalog_name}}.{{.schema_name}}/src/debug_table_config.ipynb @@ -47,7 +47,7 @@ }, "outputs": [], "source": [ - "table_config = r'''\n", + "table_config = r\"\"\"\n", " {\n", " \"name\": \"employees\",\n", " \"format\": \"csv\",\n", @@ -56,7 +56,7 @@ " },\n", " \"schema_hints\": \"id int, name string\"\n", " }\n", - "'''" + "\"\"\"" ] }, { @@ -99,7 +99,9 @@ "from utils import envmanager\n", "\n", "if not envmanager.has_default_storage():\n", - " print(\"WARNING: Current catalog is not using default storage, some file push feature may not be available\")\n", + " print(\n", + " \"WARNING: Current catalog is not using default storage, some file push feature may not be available\"\n", + " )\n", "\n", "# Load table config\n", "table_config_json = json.loads(table_config)\n", @@ -107,11 +109,13 @@ "table_name = table_config_json[\"name\"]\n", "table_volume_path_data = tablemanager.get_table_volume_path(table_name)\n", "\n", - "assert tablemanager.has_data_file(table_name), f\"No data file found 
in {table_volume_path_data}. Please upload at least 1 file to {table_volume_path_data}\"\n", + "assert tablemanager.has_data_file(table_name), (\n", + " f\"No data file found in {table_volume_path_data}. Please upload at least 1 file to {table_volume_path_data}\"\n", + ")\n", "\n", "# Put schema location in temp directory\n", "with tempfile.TemporaryDirectory() as tmpdir:\n", - " display(tablemanager.get_df_with_config(spark, table_config_json, tmpdir))" + " display(tablemanager.get_df_with_config(spark, table_config_json, tmpdir))" ] }, { diff --git a/contrib/templates/file-push/template/{{.catalog_name}}.{{.schema_name}}/src/ingestion.py b/contrib/templates/file-push/template/{{.catalog_name}}.{{.schema_name}}/src/ingestion.py index 0ddd7f26..e441b5ca 100644 --- a/contrib/templates/file-push/template/{{.catalog_name}}.{{.schema_name}}/src/ingestion.py +++ b/contrib/templates/file-push/template/{{.catalog_name}}.{{.schema_name}}/src/ingestion.py @@ -2,34 +2,36 @@ from utils import tablemanager from utils import formatmanager + def _make_append_flow(table_name, table_config, table_volume_path): - def _body(): - # use _rescued_data as placeholder when no data file is present - if not tablemanager.has_data_file(table_name): - return tablemanager.get_placeholder_df_with_config(spark, table_config) - else: - return tablemanager.get_df_with_config(spark, table_config) + def _body(): + # use _rescued_data as placeholder when no data file is present + if not tablemanager.has_data_file(table_name): + return tablemanager.get_placeholder_df_with_config(spark, table_config) + else: + return tablemanager.get_df_with_config(spark, table_config) + + # give the function a unique name + _body.__name__ = f"append_{table_name.lower()}" - # give the function a unique name - _body.__name__ = f"append_{table_name.lower()}" + # apply the decorator programmatically + dlt.append_flow(target=table_name, name=table_name)(_body) - # apply the decorator programmatically - 
dlt.append_flow(target=table_name, name=table_name)(_body) table_configs = tablemanager.get_configs() # create the tables and append flows for cfg in table_configs: - tablemanager.validate_config(cfg) - tbl = cfg["name"] - path = tablemanager.get_table_volume_path(tbl) - fmt = formatmanager.get_format_manager(cfg["format"]) - expts = fmt.expectations - - dlt.create_streaming_table( - name=tbl, - comment="File push created table", - table_properties={"filepush.table_volume_path_data": path}, - expect_all=expts - ) - _make_append_flow(tbl, cfg, path) + tablemanager.validate_config(cfg) + tbl = cfg["name"] + path = tablemanager.get_table_volume_path(tbl) + fmt = formatmanager.get_format_manager(cfg["format"]) + expts = fmt.expectations + + dlt.create_streaming_table( + name=tbl, + comment="File push created table", + table_properties={"filepush.table_volume_path_data": path}, + expect_all=expts, + ) + _make_append_flow(tbl, cfg, path) diff --git a/contrib/templates/file-push/template/{{.catalog_name}}.{{.schema_name}}/src/utils/envmanager.py b/contrib/templates/file-push/template/{{.catalog_name}}.{{.schema_name}}/src/utils/envmanager.py index 25aa9d0c..432c9643 100644 --- a/contrib/templates/file-push/template/{{.catalog_name}}.{{.schema_name}}/src/utils/envmanager.py +++ b/contrib/templates/file-push/template/{{.catalog_name}}.{{.schema_name}}/src/utils/envmanager.py @@ -2,36 +2,42 @@ import json from databricks.sdk import WorkspaceClient + def get_config() -> dict: - json_path = os.path.join(os.path.dirname(os.path.dirname(__file__)), "configs", "environment.json") - if not os.path.exists(json_path): - raise RuntimeError(f"Missing environment file: {json_path}. This should be run in a workspace, not locally. 
And make sure to run `databricks bundle run configuration_job` at least once.") - with open(json_path, "r") as f: - configs = json.load(f) - return configs + json_path = os.path.join( + os.path.dirname(os.path.dirname(__file__)), "configs", "environment.json" + ) + if not os.path.exists(json_path): + raise RuntimeError( + f"Missing environment file: {json_path}. This should be run in a workspace, not locally. And make sure to run `databricks bundle run configuration_job` at least once." + ) + with open(json_path, "r") as f: + configs = json.load(f) + return configs + def has_default_storage() -> bool: - catalog = get_config()["catalog_name"] - - w = WorkspaceClient() - - # Try SDK model first - info = w.catalogs.get(catalog) - storage_root = getattr(info, "storage_root", None) - storage_location = getattr(info, "storage_location", None) - props = getattr(info, "properties", {}) or {} - - # Some workspaces expose fields only via raw JSON; fall back if all empty - if not (storage_root or storage_location or props): - j = w.api_client.do("GET", f"/api/2.1/unity-catalog/catalogs/{catalog}") - storage_root = j.get("storage_root") or j.get("storageLocation") - storage_location = j.get("storage_location") or j.get("storageLocation") - props = j.get("properties", {}) or {} - - # Heuristics: any of these indicates “default storage” is set - return bool( - storage_root or - storage_location or - props.get("defaultManagedLocation") or - props.get("delta.defaultLocation") - ) + catalog = get_config()["catalog_name"] + + w = WorkspaceClient() + + # Try SDK model first + info = w.catalogs.get(catalog) + storage_root = getattr(info, "storage_root", None) + storage_location = getattr(info, "storage_location", None) + props = getattr(info, "properties", {}) or {} + + # Some workspaces expose fields only via raw JSON; fall back if all empty + if not (storage_root or storage_location or props): + j = w.api_client.do("GET", f"/api/2.1/unity-catalog/catalogs/{catalog}") + storage_root 
= j.get("storage_root") or j.get("storageLocation") + storage_location = j.get("storage_location") or j.get("storageLocation") + props = j.get("properties", {}) or {} + + # Heuristics: any of these indicates “default storage” is set + return bool( + storage_root + or storage_location + or props.get("defaultManagedLocation") + or props.get("delta.defaultLocation") + ) diff --git a/contrib/templates/file-push/template/{{.catalog_name}}.{{.schema_name}}/src/utils/formatmanager.py b/contrib/templates/file-push/template/{{.catalog_name}}.{{.schema_name}}/src/utils/formatmanager.py index 0cd835db..efed584d 100644 --- a/contrib/templates/file-push/template/{{.catalog_name}}.{{.schema_name}}/src/utils/formatmanager.py +++ b/contrib/templates/file-push/template/{{.catalog_name}}.{{.schema_name}}/src/utils/formatmanager.py @@ -1,121 +1,142 @@ from dataclasses import dataclass from . import envmanager + @dataclass(frozen=True, slots=True) class AutoLoaderOption: - key: str - value: str - hidden: bool = False - def __iter__(self): - yield (self.key, self) + key: str + value: str + hidden: bool = False + + def __iter__(self): + yield (self.key, self) + class AutoLoaderFormat: - def __init__(self): - self.name = None - self.options: set[AutoLoaderOption] = { - AutoLoaderOption("cloudFiles.inferColumnTypes", "true", True), - AutoLoaderOption("cloudFiles.schemaEvolutionMode", "addNewColumns", True), - AutoLoaderOption("cloudFiles.cleanSource", "MOVE", True), - AutoLoaderOption("cloudFiles.cleanSource.retentionDuration", "14 days", True), - AutoLoaderOption("cloudFiles.cleanSource.moveDestination", f"{envmanager.get_config()['volume_path_archive']}/{{table_name}}", True) - } - self.expectations: dict[str, str] = { - "Rescued data should be null": "_rescued_data IS NULL" - } - self.default_schema: set[str] = {"_rescued_data STRING"} - - def get_default_schema(self) -> str: - return ", ".join(self.default_schema) - - def get_userfacing_options(self) -> dict[str, str]: - return 
{opt.key: opt.value for opt in self.options if not opt.hidden} - - def validate_user_options(self, options: dict[str, str]) -> None: - allowed = set(self.get_userfacing_options()) - illegal = set(options) - allowed - if illegal: - raise ValueError( - f"Unsupported or protected options: {sorted(illegal)}. " - f"Allowed user options: {sorted(allowed)}" - ) - - def get_modified_options(self, options: dict[str, str]) -> dict[str, str]: - self.validate_user_options(options) - defaults = self.get_userfacing_options() - return {k: v for k, v in options.items() if k in defaults and v != defaults[k]} - - def get_merged_options(self, options: dict[str, str], table_name: str) -> dict[str, str]: - self.validate_user_options(options) - defaults = self.get_userfacing_options() - - merged = defaults.copy() - merged.update({k: v for k, v in options.items() if k in defaults}) - - # Format the moveDestination with table_name - move_dest_key = "cloudFiles.cleanSource.moveDestination" - if move_dest_key in merged: - merged[move_dest_key] = merged[move_dest_key].format(table_name=table_name) - - return merged + def __init__(self): + self.name = None + self.options: set[AutoLoaderOption] = { + AutoLoaderOption("cloudFiles.inferColumnTypes", "true", True), + AutoLoaderOption("cloudFiles.schemaEvolutionMode", "addNewColumns", True), + AutoLoaderOption("cloudFiles.cleanSource", "MOVE", True), + AutoLoaderOption( + "cloudFiles.cleanSource.retentionDuration", "14 days", True + ), + AutoLoaderOption( + "cloudFiles.cleanSource.moveDestination", + f"{envmanager.get_config()['volume_path_archive']}/{{table_name}}", + True, + ), + } + self.expectations: dict[str, str] = { + "Rescued data should be null": "_rescued_data IS NULL" + } + self.default_schema: set[str] = {"_rescued_data STRING"} + + def get_default_schema(self) -> str: + return ", ".join(self.default_schema) + + def get_userfacing_options(self) -> dict[str, str]: + return {opt.key: opt.value for opt in self.options if not opt.hidden} + 
+ def validate_user_options(self, options: dict[str, str]) -> None: + allowed = set(self.get_userfacing_options()) + illegal = set(options) - allowed + if illegal: + raise ValueError( + f"Unsupported or protected options: {sorted(illegal)}. " + f"Allowed user options: {sorted(allowed)}" + ) + + def get_modified_options(self, options: dict[str, str]) -> dict[str, str]: + self.validate_user_options(options) + defaults = self.get_userfacing_options() + return {k: v for k, v in options.items() if k in defaults and v != defaults[k]} + + def get_merged_options( + self, options: dict[str, str], table_name: str + ) -> dict[str, str]: + self.validate_user_options(options) + defaults = self.get_userfacing_options() + + merged = defaults.copy() + merged.update({k: v for k, v in options.items() if k in defaults}) + + # Format the moveDestination with table_name + move_dest_key = "cloudFiles.cleanSource.moveDestination" + if move_dest_key in merged: + merged[move_dest_key] = merged[move_dest_key].format(table_name=table_name) + + return merged + class CSV(AutoLoaderFormat): - def __init__(self): - super().__init__() - self.name = "CSV" - self.options |= { - AutoLoaderOption("header", "true"), - AutoLoaderOption("mergeSchema", "true", True), - AutoLoaderOption("mode", "PERMISSIVE", True), - AutoLoaderOption("columnNameOfCorruptRecord", "_corrupt_record", True), - AutoLoaderOption("delimiter", ","), - AutoLoaderOption("escape", "\""), - AutoLoaderOption("multiLine", "false"), - } - self.expectations |= { - "Corrupted record should be null": "_corrupt_record IS NULL" - } - self.default_schema |= {"_corrupt_record STRING"} + def __init__(self): + super().__init__() + self.name = "CSV" + self.options |= { + AutoLoaderOption("header", "true"), + AutoLoaderOption("mergeSchema", "true", True), + AutoLoaderOption("mode", "PERMISSIVE", True), + AutoLoaderOption("columnNameOfCorruptRecord", "_corrupt_record", True), + AutoLoaderOption("delimiter", ","), + AutoLoaderOption("escape", '"'), 
+ AutoLoaderOption("multiLine", "false"), + } + self.expectations |= { + "Corrupted record should be null": "_corrupt_record IS NULL" + } + self.default_schema |= {"_corrupt_record STRING"} + class JSON(AutoLoaderFormat): - def __init__(self): - super().__init__() - self.name = "JSON" - self.options |= { - AutoLoaderOption("mergeSchema", "true", True), - AutoLoaderOption("mode", "PERMISSIVE", True), - AutoLoaderOption("columnNameOfCorruptRecord", "_corrupt_record", True), - AutoLoaderOption("allowComments", "true"), - AutoLoaderOption("allowSingleQuotes", "true"), - AutoLoaderOption("inferTimestamp", "true"), - AutoLoaderOption("multiLine", "true"), - } - self.expectations |= { - "Corrupted record should be null": "_corrupt_record IS NULL" - } - self.default_schema |= {"_corrupt_record STRING"} + def __init__(self): + super().__init__() + self.name = "JSON" + self.options |= { + AutoLoaderOption("mergeSchema", "true", True), + AutoLoaderOption("mode", "PERMISSIVE", True), + AutoLoaderOption("columnNameOfCorruptRecord", "_corrupt_record", True), + AutoLoaderOption("allowComments", "true"), + AutoLoaderOption("allowSingleQuotes", "true"), + AutoLoaderOption("inferTimestamp", "true"), + AutoLoaderOption("multiLine", "true"), + } + self.expectations |= { + "Corrupted record should be null": "_corrupt_record IS NULL" + } + self.default_schema |= {"_corrupt_record STRING"} + class AVRO(AutoLoaderFormat): - def __init__(self): - super().__init__() - self.name = "AVRO" - self.options |= { - AutoLoaderOption("mergeSchema", "true", True), - } + def __init__(self): + super().__init__() + self.name = "AVRO" + self.options |= { + AutoLoaderOption("mergeSchema", "true", True), + } + class PARQUET(AutoLoaderFormat): - def __init__(self): - super().__init__() - self.name = "PARQUET" - self.options |= { - AutoLoaderOption("mergeSchema", "true", True), - } + def __init__(self): + super().__init__() + self.name = "PARQUET" + self.options |= { + AutoLoaderOption("mergeSchema", "true", 
True), + } + + +_supported_formats: dict[str, AutoLoaderFormat] = { + f.name: f for f in (CSV(), JSON(), AVRO(), PARQUET()) +} -_supported_formats: dict[str, AutoLoaderFormat] = {f.name: f for f in (CSV(), JSON(), AVRO(), PARQUET())} def get_format_manager(fmt: str) -> dict[str, str]: - key = fmt.strip().upper() - try: - return _supported_formats[key] - except KeyError: - supported = ", ".join(sorted(_supported_formats)) - raise ValueError(f"{fmt!r} is not a supported format. Supported formats: {supported}") + key = fmt.strip().upper() + try: + return _supported_formats[key] + except KeyError: + supported = ", ".join(sorted(_supported_formats)) + raise ValueError( + f"{fmt!r} is not a supported format. Supported formats: {supported}" + ) diff --git a/contrib/templates/file-push/template/{{.catalog_name}}.{{.schema_name}}/src/utils/initialization.py b/contrib/templates/file-push/template/{{.catalog_name}}.{{.schema_name}}/src/utils/initialization.py index aa106072..7182281a 100644 --- a/contrib/templates/file-push/template/{{.catalog_name}}.{{.schema_name}}/src/utils/initialization.py +++ b/contrib/templates/file-push/template/{{.catalog_name}}.{{.schema_name}}/src/utils/initialization.py @@ -20,8 +20,7 @@ # Logging logging.basicConfig( - level=logging_level, - format="%(asctime)s [%(levelname)s] %(module)s - %(message)s" + level=logging_level, format="%(asctime)s [%(levelname)s] %(module)s - %(message)s" ) logger = logging.getLogger(__name__) # per-module logger @@ -32,11 +31,14 @@ logger.info(f"Setting property to schema {catalog_name}.{schema_name}") logger.debug(f"Volume path root: {volume_path_root}") logger.debug(f"Volume path data: {volume_path_data}") -ws.schemas.update(full_name=f"{catalog_name}.{schema_name}", properties={ - "filepush.volume_path_root": volume_path_root, - "filepush.volume_path_data": volume_path_data, - "filepush.volume_path_data": volume_path_archive -}) +ws.schemas.update( + full_name=f"{catalog_name}.{schema_name}", + properties={ + 
"filepush.volume_path_root": volume_path_root, + "filepush.volume_path_data": volume_path_data, + "filepush.volume_path_data": volume_path_archive, + }, +) logger.info(f"Schema {catalog_name}.{schema_name} configured") # Initialize volume folder structure @@ -46,24 +48,27 @@ logger.debug(f"Creating archive directory {volume_path_archive}") ws.files.create_directory(volume_path_archive) with open("../configs/tables.json", "r") as f: - for table in json.load(f): - table_volume_path_data = f"{volume_path_data}/{table['name']}" - logger.debug(f"Creating table directory {table_volume_path_data}") - ws.files.create_directory(table_volume_path_data) - table_volume_path_archive = f"{volume_path_archive}/{table['name']}" - logger.debug(f"Creating table archive directory {table_volume_path_archive}") - ws.files.create_directory(table_volume_path_archive) + for table in json.load(f): + table_volume_path_data = f"{volume_path_data}/{table['name']}" + logger.debug(f"Creating table directory {table_volume_path_data}") + ws.files.create_directory(table_volume_path_data) + table_volume_path_archive = f"{volume_path_archive}/{table['name']}" + logger.debug(f"Creating table archive directory {table_volume_path_archive}") + ws.files.create_directory(table_volume_path_archive) logger.info(f"Volume {volume_path_root} configured") # Dump configs to environment json all_configs = { - "catalog_name": catalog_name, - "schema_name": schema_name, - "volume_path_root": volume_path_root, - "volume_path_data": volume_path_data, - "volume_path_archive": volume_path_archive + "catalog_name": catalog_name, + "schema_name": schema_name, + "volume_path_root": volume_path_root, + "volume_path_data": volume_path_data, + "volume_path_archive": volume_path_archive, } with open("../configs/environment.json", "w") as f: - json.dump(all_configs, f) + json.dump(all_configs, f) -logger.info(f"==========\n%s\n==========", "\n".join(f"{k}: {v}" for k, v in all_configs.items())) +logger.info( + 
f"==========\n%s\n==========", + "\n".join(f"{k}: {v}" for k, v in all_configs.items()), +) diff --git a/contrib/templates/file-push/template/{{.catalog_name}}.{{.schema_name}}/src/utils/tablemanager.py b/contrib/templates/file-push/template/{{.catalog_name}}.{{.schema_name}}/src/utils/tablemanager.py index 46cf7aa3..2da7766f 100644 --- a/contrib/templates/file-push/template/{{.catalog_name}}.{{.schema_name}}/src/utils/tablemanager.py +++ b/contrib/templates/file-push/template/{{.catalog_name}}.{{.schema_name}}/src/utils/tablemanager.py @@ -7,91 +7,128 @@ from databricks.sdk import WorkspaceClient from databricks.sdk.errors.platform import NotFound + def validate_config(table_config: dict): - if not table_config.get("name"): - raise ValueError("name is required for table config") - if not table_config.get("format"): - raise ValueError("format is required for table config") + if not table_config.get("name"): + raise ValueError("name is required for table config") + if not table_config.get("format"): + raise ValueError("format is required for table config") + def validate_configs(table_configs: list): - names = [cfg.get("name") for cfg in table_configs] - duplicates = set([name for name in names if names.count(name) > 1 and name is not None]) - if duplicates: - raise ValueError(f"Duplicate table names found in table configs: {sorted(duplicates)}") - for table_config in table_configs: - validate_config(table_config) - + names = [cfg.get("name") for cfg in table_configs] + duplicates = set( + [name for name in names if names.count(name) > 1 and name is not None] + ) + if duplicates: + raise ValueError( + f"Duplicate table names found in table configs: {sorted(duplicates)}" + ) + for table_config in table_configs: + validate_config(table_config) + + def get_configs() -> list: - json_path = os.path.join(os.path.dirname(os.path.dirname(__file__)), "configs", "tables.json") - if not os.path.exists(json_path): - raise RuntimeError(f"Missing table configs file: {json_path}. 
Please following README.md to create one, deploy and run configuration_job.") - with open(json_path, "r") as f: - configs = json.load(f) - validate_configs(configs) - return configs + json_path = os.path.join( + os.path.dirname(os.path.dirname(__file__)), "configs", "tables.json" + ) + if not os.path.exists(json_path): + raise RuntimeError( + f"Missing table configs file: {json_path}. Please following README.md to create one, deploy and run configuration_job." + ) + with open(json_path, "r") as f: + configs = json.load(f) + validate_configs(configs) + return configs + def get_table_volume_path(table_name: str) -> str: - ws = WorkspaceClient() - table_volume_path_data = os.path.join(envmanager.get_config()["volume_path_data"], table_name) - try: - ws.files.get_directory_metadata(table_volume_path_data) - except NotFound: - raise RuntimeError(f"Table data path not found for table `{table_name}`. Have you run `databricks bundle run configuration_job`?") - return table_volume_path_data + ws = WorkspaceClient() + table_volume_path_data = os.path.join( + envmanager.get_config()["volume_path_data"], table_name + ) + try: + ws.files.get_directory_metadata(table_volume_path_data) + except NotFound: + raise RuntimeError( + f"Table data path not found for table `{table_name}`. Have you run `databricks bundle run configuration_job`?" 
+ ) + return table_volume_path_data + def has_data_file(table_name: str) -> bool: - ws = WorkspaceClient() - table_volume_path_data = get_table_volume_path(table_name) - try: - iter = ws.files.list_directory_contents(table_volume_path_data) - next(iter) - except StopIteration: - return False - return True + ws = WorkspaceClient() + table_volume_path_data = get_table_volume_path(table_name) + try: + iter = ws.files.list_directory_contents(table_volume_path_data) + next(iter) + except StopIteration: + return False + return True + def is_table_created(table_name: str) -> bool: - ws = WorkspaceClient() - return ws.tables.exists(full_name=f"{envmanager.get_config()['catalog_name']}.{envmanager.get_config()['schema_name']}.{table_name}").table_exists - -def _apply_table_options(reader: DataStreamReader, table_config: dict, fmt_mgr) -> DataStreamReader: - name = table_config.get("name") - fmt = table_config.get("format") - - # format options - user_fmt_opts = table_config.get("format_options", {}) - final_fmt_opts = fmt_mgr.get_merged_options(user_fmt_opts, name) - reader = reader.option("cloudFiles.format", fmt) - for k, v in final_fmt_opts.items(): - reader = reader.option(k, v) - - # schema hints - schema_hints = table_config.get("schema_hints") - if schema_hints: - reader = reader.option("cloudFiles.schemaHints", ", ".join({schema_hints} | fmt_mgr.default_schema)) - else: - reader = reader.option("cloudFiles.schemaHints", ", ".join(fmt_mgr.default_schema)) - - return reader - -def get_df_with_config(spark: SparkSession, table_config: dict, schema_location: str = None) -> DataFrame: - validate_config(table_config) - fmt = table_config.get("format") - fmt_mgr = formatmanager.get_format_manager(fmt) - - reader = spark.readStream.format("cloudFiles") - reader = _apply_table_options(reader, table_config, fmt_mgr) - if schema_location: - reader = reader.option("cloudFiles.schemaLocation", schema_location) - - # include file metadata - return 
reader.load(get_table_volume_path(table_config.get("name"))).selectExpr("*", "_metadata") - -def get_placeholder_df_with_config(spark: SparkSession, table_config: dict) -> DataFrame: - validate_config(table_config) - fmt = table_config.get("format") - fmt_mgr = formatmanager.get_format_manager(fmt) - - reader = spark.readStream.format("cloudFiles") - reader = _apply_table_options(reader, table_config, fmt_mgr).schema(fmt_mgr.get_default_schema()) - - return reader.load(get_table_volume_path(table_config.get("name"))) + ws = WorkspaceClient() + return ws.tables.exists( + full_name=f"{envmanager.get_config()['catalog_name']}.{envmanager.get_config()['schema_name']}.{table_name}" + ).table_exists + + +def _apply_table_options( + reader: DataStreamReader, table_config: dict, fmt_mgr +) -> DataStreamReader: + name = table_config.get("name") + fmt = table_config.get("format") + + # format options + user_fmt_opts = table_config.get("format_options", {}) + final_fmt_opts = fmt_mgr.get_merged_options(user_fmt_opts, name) + reader = reader.option("cloudFiles.format", fmt) + for k, v in final_fmt_opts.items(): + reader = reader.option(k, v) + + # schema hints + schema_hints = table_config.get("schema_hints") + if schema_hints: + reader = reader.option( + "cloudFiles.schemaHints", ", ".join({schema_hints} | fmt_mgr.default_schema) + ) + else: + reader = reader.option( + "cloudFiles.schemaHints", ", ".join(fmt_mgr.default_schema) + ) + + return reader + + +def get_df_with_config( + spark: SparkSession, table_config: dict, schema_location: str = None +) -> DataFrame: + validate_config(table_config) + fmt = table_config.get("format") + fmt_mgr = formatmanager.get_format_manager(fmt) + + reader = spark.readStream.format("cloudFiles") + reader = _apply_table_options(reader, table_config, fmt_mgr) + if schema_location: + reader = reader.option("cloudFiles.schemaLocation", schema_location) + + # include file metadata + return 
reader.load(get_table_volume_path(table_config.get("name"))).selectExpr( + "*", "_metadata" + ) + + +def get_placeholder_df_with_config( + spark: SparkSession, table_config: dict +) -> DataFrame: + validate_config(table_config) + fmt = table_config.get("format") + fmt_mgr = formatmanager.get_format_manager(fmt) + + reader = spark.readStream.format("cloudFiles") + reader = _apply_table_options(reader, table_config, fmt_mgr).schema( + fmt_mgr.get_default_schema() + ) + + return reader.load(get_table_volume_path(table_config.get("name"))) From 9858624dc69cd7c57802f0b04fa60ab8576b7871 Mon Sep 17 00:00:00 2001 From: chi-yang-db Date: Fri, 17 Oct 2025 21:31:56 -0700 Subject: [PATCH 10/15] All user to alter schema evolution mode per discussion today --- .../src/utils/formatmanager.py | 10 +++++-- .../src/utils/tablemanager.py | 30 ++++++++++++------- 2 files changed, 26 insertions(+), 14 deletions(-) diff --git a/contrib/templates/file-push/template/{{.catalog_name}}.{{.schema_name}}/src/utils/formatmanager.py b/contrib/templates/file-push/template/{{.catalog_name}}.{{.schema_name}}/src/utils/formatmanager.py index efed584d..ab43a162 100644 --- a/contrib/templates/file-push/template/{{.catalog_name}}.{{.schema_name}}/src/utils/formatmanager.py +++ b/contrib/templates/file-push/template/{{.catalog_name}}.{{.schema_name}}/src/utils/formatmanager.py @@ -20,7 +20,7 @@ def __init__(self): AutoLoaderOption("cloudFiles.schemaEvolutionMode", "addNewColumns", True), AutoLoaderOption("cloudFiles.cleanSource", "MOVE", True), AutoLoaderOption( - "cloudFiles.cleanSource.retentionDuration", "14 days", True + "cloudFiles.cleanSource.retentionDuration", "1 day", True ), AutoLoaderOption( "cloudFiles.cleanSource.moveDestination", @@ -54,14 +54,18 @@ def get_modified_options(self, options: dict[str, str]) -> dict[str, str]: return {k: v for k, v in options.items() if k in defaults and v != defaults[k]} def get_merged_options( - self, options: dict[str, str], table_name: str + self, 
options: dict[str, str], table_name: str, is_placeholder: bool=False ) -> dict[str, str]: self.validate_user_options(options) - defaults = self.get_userfacing_options() + defaults = {opt.key: opt.value for opt in self.options} merged = defaults.copy() merged.update({k: v for k, v in options.items() if k in defaults}) + # Do not specify schema evolution mode in placeholder + if is_placeholder: + merged.pop("cloudFiles.schemaEvolutionMode", None) + # Format the moveDestination with table_name move_dest_key = "cloudFiles.cleanSource.moveDestination" if move_dest_key in merged: diff --git a/contrib/templates/file-push/template/{{.catalog_name}}.{{.schema_name}}/src/utils/tablemanager.py b/contrib/templates/file-push/template/{{.catalog_name}}.{{.schema_name}}/src/utils/tablemanager.py index 2da7766f..c411bb1e 100644 --- a/contrib/templates/file-push/template/{{.catalog_name}}.{{.schema_name}}/src/utils/tablemanager.py +++ b/contrib/templates/file-push/template/{{.catalog_name}}.{{.schema_name}}/src/utils/tablemanager.py @@ -75,28 +75,38 @@ def is_table_created(table_name: str) -> bool: def _apply_table_options( - reader: DataStreamReader, table_config: dict, fmt_mgr + reader: DataStreamReader, table_config: dict, fmt_mgr, is_placeholder: bool = False ) -> DataStreamReader: name = table_config.get("name") fmt = table_config.get("format") # format options user_fmt_opts = table_config.get("format_options", {}) - final_fmt_opts = fmt_mgr.get_merged_options(user_fmt_opts, name) + final_fmt_opts = fmt_mgr.get_merged_options(user_fmt_opts, name, is_placeholder) reader = reader.option("cloudFiles.format", fmt) for k, v in final_fmt_opts.items(): reader = reader.option(k, v) # schema hints schema_hints = table_config.get("schema_hints") + + use_schema = ( + final_fmt_opts.get("cloudFiles.schemaEvolutionMode", "").lower() == "rescue" + or is_placeholder + ) + + # schema_hints goes first, then default schema entries (ordered) + base_schema = [] if schema_hints: - reader = 
reader.option( - "cloudFiles.schemaHints", ", ".join({schema_hints} | fmt_mgr.default_schema) - ) + base_schema.append(schema_hints) + base_schema.extend(fmt_mgr.default_schema) + + joined_schema = ", ".join(base_schema) + + if use_schema: + reader = reader.schema(joined_schema) else: - reader = reader.option( - "cloudFiles.schemaHints", ", ".join(fmt_mgr.default_schema) - ) + reader = reader.option("cloudFiles.schemaHints", joined_schema) return reader @@ -127,8 +137,6 @@ def get_placeholder_df_with_config( fmt_mgr = formatmanager.get_format_manager(fmt) reader = spark.readStream.format("cloudFiles") - reader = _apply_table_options(reader, table_config, fmt_mgr).schema( - fmt_mgr.get_default_schema() - ) + reader = _apply_table_options(reader, table_config, fmt_mgr, is_placeholder=True) return reader.load(get_table_volume_path(table_config.get("name"))) From dc2578b467d4b00992d79f682d521217f630d3ea Mon Sep 17 00:00:00 2001 From: chi-yang-db Date: Fri, 17 Oct 2025 21:32:46 -0700 Subject: [PATCH 11/15] Run ruff format --- .../src/utils/formatmanager.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/contrib/templates/file-push/template/{{.catalog_name}}.{{.schema_name}}/src/utils/formatmanager.py b/contrib/templates/file-push/template/{{.catalog_name}}.{{.schema_name}}/src/utils/formatmanager.py index ab43a162..8b4825ed 100644 --- a/contrib/templates/file-push/template/{{.catalog_name}}.{{.schema_name}}/src/utils/formatmanager.py +++ b/contrib/templates/file-push/template/{{.catalog_name}}.{{.schema_name}}/src/utils/formatmanager.py @@ -19,9 +19,7 @@ def __init__(self): AutoLoaderOption("cloudFiles.inferColumnTypes", "true", True), AutoLoaderOption("cloudFiles.schemaEvolutionMode", "addNewColumns", True), AutoLoaderOption("cloudFiles.cleanSource", "MOVE", True), - AutoLoaderOption( - "cloudFiles.cleanSource.retentionDuration", "1 day", True - ), + AutoLoaderOption("cloudFiles.cleanSource.retentionDuration", "1 day", True), AutoLoaderOption( 
"cloudFiles.cleanSource.moveDestination", f"{envmanager.get_config()['volume_path_archive']}/{{table_name}}", @@ -54,7 +52,7 @@ def get_modified_options(self, options: dict[str, str]) -> dict[str, str]: return {k: v for k, v in options.items() if k in defaults and v != defaults[k]} def get_merged_options( - self, options: dict[str, str], table_name: str, is_placeholder: bool=False + self, options: dict[str, str], table_name: str, is_placeholder: bool = False ) -> dict[str, str]: self.validate_user_options(options) defaults = {opt.key: opt.value for opt in self.options} From 460de00d0accaca25609dea0ffb11025af414e44 Mon Sep 17 00:00:00 2001 From: chi-yang-db Date: Fri, 17 Oct 2025 21:57:31 -0700 Subject: [PATCH 12/15] allow user to change evolution mode --- .../src/utils/formatmanager.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/contrib/templates/file-push/template/{{.catalog_name}}.{{.schema_name}}/src/utils/formatmanager.py b/contrib/templates/file-push/template/{{.catalog_name}}.{{.schema_name}}/src/utils/formatmanager.py index 8b4825ed..368109ed 100644 --- a/contrib/templates/file-push/template/{{.catalog_name}}.{{.schema_name}}/src/utils/formatmanager.py +++ b/contrib/templates/file-push/template/{{.catalog_name}}.{{.schema_name}}/src/utils/formatmanager.py @@ -17,7 +17,7 @@ def __init__(self): self.name = None self.options: set[AutoLoaderOption] = { AutoLoaderOption("cloudFiles.inferColumnTypes", "true", True), - AutoLoaderOption("cloudFiles.schemaEvolutionMode", "addNewColumns", True), + AutoLoaderOption("cloudFiles.schemaEvolutionMode", "addNewColumns"), AutoLoaderOption("cloudFiles.cleanSource", "MOVE", True), AutoLoaderOption("cloudFiles.cleanSource.retentionDuration", "1 day", True), AutoLoaderOption( From 72b535b7dd93a1e01f06df22d2b815be2eaf21a5 Mon Sep 17 00:00:00 2001 From: chi-yang-db Date: Fri, 17 Oct 2025 22:00:27 -0700 Subject: [PATCH 13/15] Update project name --- contrib/templates/file-push/README.md | 2 +- 1 file changed, 
1 insertion(+), 1 deletion(-) diff --git a/contrib/templates/file-push/README.md b/contrib/templates/file-push/README.md index f8d179b3..46b524e3 100644 --- a/contrib/templates/file-push/README.md +++ b/contrib/templates/file-push/README.md @@ -1,4 +1,4 @@ -# file-push +# Zerobus - File Mode This is an (experimental) template for creating a file push pipeline with Databricks Asset Bundles. From 3894f5ae05b07b602ebc8d06ec63b516e069e867 Mon Sep 17 00:00:00 2001 From: chi-yang-db Date: Tue, 21 Oct 2025 11:42:54 -0700 Subject: [PATCH 14/15] Parameterize workspace host --- .../{{.catalog_name}}.{{.schema_name}}/databricks.yml.tmpl | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/contrib/templates/file-push/template/{{.catalog_name}}.{{.schema_name}}/databricks.yml.tmpl b/contrib/templates/file-push/template/{{.catalog_name}}.{{.schema_name}}/databricks.yml.tmpl index 33ca957a..3d5466be 100644 --- a/contrib/templates/file-push/template/{{.catalog_name}}.{{.schema_name}}/databricks.yml.tmpl +++ b/contrib/templates/file-push/template/{{.catalog_name}}.{{.schema_name}}/databricks.yml.tmpl @@ -2,7 +2,8 @@ # This is the configuration for the file push DAB dab. 
bundle: - name: dab + name: {{.project_name}} + uuid: {{bundle_uuid}} include: - resources/*.yml @@ -12,13 +13,13 @@ targets: dev: mode: development workspace: - host: https://e2-dogfood.staging.cloud.databricks.com + host: {{workspace_host}} prod: mode: production default: true workspace: - host: https://e2-dogfood.staging.cloud.databricks.com + host: {{workspace_host}} root_path: /Workspace/Users/${workspace.current_user.userName}/.bundle/${bundle.name}/${bundle.target} permissions: - user_name: ${workspace.current_user.userName} From f8a4adc3cec46bb120bba3303cd61872004c39f4 Mon Sep 17 00:00:00 2001 From: chi-yang-db Date: Tue, 21 Oct 2025 11:48:45 -0700 Subject: [PATCH 15/15] Fix bundle name --- .../{{.catalog_name}}.{{.schema_name}}/databricks.yml.tmpl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/contrib/templates/file-push/template/{{.catalog_name}}.{{.schema_name}}/databricks.yml.tmpl b/contrib/templates/file-push/template/{{.catalog_name}}.{{.schema_name}}/databricks.yml.tmpl index 3d5466be..9a8d779c 100644 --- a/contrib/templates/file-push/template/{{.catalog_name}}.{{.schema_name}}/databricks.yml.tmpl +++ b/contrib/templates/file-push/template/{{.catalog_name}}.{{.schema_name}}/databricks.yml.tmpl @@ -2,7 +2,7 @@ # This is the configuration for the file push DAB dab. bundle: - name: {{.project_name}} + name: {{.schema_name}} uuid: {{bundle_uuid}} include: