diff --git a/contrib/templates/file-push/README.md b/contrib/templates/file-push/README.md new file mode 100644 index 00000000..46b524e3 --- /dev/null +++ b/contrib/templates/file-push/README.md @@ -0,0 +1,10 @@ +# Zerobus - File Mode + +This is an (experimental) template for creating a file push pipeline with Databricks Asset Bundles. + +Install it using +``` +databricks bundle init --template-dir contrib/templates/file-push https://github.com/databricks/bundle-examples +``` + +and follow the generated README.md to get started. \ No newline at end of file diff --git a/contrib/templates/file-push/databricks_template_schema.json b/contrib/templates/file-push/databricks_template_schema.json new file mode 100644 index 00000000..9022c2e1 --- /dev/null +++ b/contrib/templates/file-push/databricks_template_schema.json @@ -0,0 +1,22 @@ +{ + "welcome_message": "\nWelcome to the file-push template for Databricks Asset Bundles!\n\nA workspace was selected based on your current profile. For information about how to change this, see https://docs.databricks.com/dev-tools/cli/profiles.html.\nworkspace_host: {{workspace_host}}", + "properties": { + "catalog_name": { + "type": "string", + "description": "\nPlease provide the name of an EXISTING UC catalog with default storage enabled.\nCatalog Name", + "order": 1, + "default": "main", + "pattern": "^[a-z_][a-z0-9_]{0,254}$", + "pattern_match_failure_message": "Name must only consist of letters, numbers, and underscores." + }, + "schema_name": { + "type": "string", + "description": "\nPlease provide a NEW schema name where the pipelines and tables will land in.\nSchema Name", + "order": 2, + "default": "filepushschema", + "pattern": "^[a-z_][a-z0-9_]{0,254}$", + "pattern_match_failure_message": "Name must only consist of letters, numbers, dashes, and underscores." + } + }, + "success_message": "\nBundle folder '{{.catalog_name}}.{{.schema_name}}' has been created. Please refer to the README.md for next steps." +} diff --git a/contrib/templates/file-push/template/{{.catalog_name}}.{{.schema_name}}/README.md.tmpl b/contrib/templates/file-push/template/{{.catalog_name}}.{{.schema_name}}/README.md.tmpl new file mode 100644 index 00000000..58b6fc27 --- /dev/null +++ b/contrib/templates/file-push/template/{{.catalog_name}}.{{.schema_name}}/README.md.tmpl @@ -0,0 +1,164 @@ +# Zerobus - File Mode + +A lightweight, no‑code file ingestion workflow. Configure a set of tables, get a volume path for each, and drop files into those paths—your data lands in Unity Catalog tables via Auto Loader and Lakeflow Pipeline. + +## Table of Contents +- [Quick Start](#quick-start) + - [Step 1. Configure tables](#step-1-configure-tables) + - [Step 2. Deploy & set up](#step-2-deploy--set-up) + - [Step 3. Retrieve endpoint & push files](#step-3-retrieve-endpoint--push-files) +- [Debug Table Issues](#debug-table-issues) + - [Step 1. Configure tables to debug](#step-1-configure-tables-to-debug) + - [Step 2. Deploy & set up in dev mode](#step-2-deploy--set-up-in-dev-mode) + - [Step 3. Retrieve endpoint & push files to debug](#step-3-retrieve-endpoint--push-files-to-debug) + - [Step 4. Debug table configs](#step-4-debug-table-configs) + - [Step 5. Fix the table configs in production](#step-5-fix-the-table-configs-in-production) + +--- + +## Quick Start + +### Step 1. Configure tables +Edit table configs in `./src/configs/tables.json`. Only `name` and `format` are required. + +Currently supported formats are `csv`, `json`, `avro` and `parquet`. + +For supported `format_options`, see the [Auto Loader options](https://docs.databricks.com/aws/en/ingestion/cloud-object-storage/auto-loader/options). Not all options are supported here. If unsure, specify only `name` and `format`, or follow [Debug Table Issues](#debug-table-issues) to discover the correct options. + +```json +[ + { + "name": "table1", + "format": "csv", + "format_options": + { + "escape": "\"" + }, + "schema_hints": "id int, name string" + }, + { + "name": "table2", + "format": "json" + } +] +``` + +> **Tip:** Keep `schema_hints` minimal; Auto Loader can evolve the schema as new columns appear. + +### Step 2. Deploy & set up + +```bash +databricks bundle deploy +databricks bundle run configuration_job +``` + +Wait for the configuration job to finish before moving on. + +### Step 3. Retrieve endpoint & push files +First, grant write permissions to the volume. This enables the client to push files: + +```bash +databricks bundle open filepush_volume +``` + +Fetch the volume path for uploading files to a specific table (example: `table1`): + +```bash +databricks tables get {{.catalog_name}}.{{.schema_name}}.table1 --output json \ + | jq -r '.properties["filepush.table_volume_path_data"]' +``` + +Example output: + +```text +/Volumes/{{.catalog_name}}/{{.schema_name}}/{{.catalog_name}}_{{.schema_name}}_filepush_volume/data/table1 +``` + +Upload files to the path above using any of the [Volumes file APIs](https://docs.databricks.com/aws/en/volumes/volume-files#methods-for-managing-files-in-volumes). + +**Databricks CLI example** (destination uses the `dbfs:` scheme): + +```bash +databricks fs cp /local/file/path/datafile1.csv \ + dbfs:/Volumes/{{.catalog_name}}/{{.schema_name}}/{{.catalog_name}}_{{.schema_name}}_filepush_volume/data/table1 +``` + +**REST API example**: + +```bash +# prerequisites: export DATABRICKS_HOST and DATABRICKS_TOKEN (PAT token) +curl -X PUT "$DATABRICKS_HOST/api/2.0/fs/files/Volumes/{{.catalog_name}}/{{.schema_name}}/{{.catalog_name}}_{{.schema_name}}_filepush_volume/data/table1/datafile1.csv" \ + -H "Authorization: Bearer $DATABRICKS_TOKEN" \ + -H "Content-Type: application/octet-stream" \ + --data-binary @"/local/file/path/datafile1.csv" +``` + +Within about a minute, the data should appear in the table `{{.catalog_name}}.{{.schema_name}}.table1`. + +--- + +## Debug Table Issues +If data isn’t parsed as expected, use **dev mode** to iterate on table options safely. + +### Step 1. Configure tables to debug +Configure tables as in [Step 1 of Quick Start](#step-1-configure-tables). + +### Step 2. Deploy & set up in **dev mode** + +```bash +databricks bundle deploy -t dev +databricks bundle run configuration_job -t dev +``` + +Wait for the configuration job to finish. Example output: + +```text +2025-09-23 22:03:04,938 [INFO] initialization - ========== +catalog_name: {{.catalog_name}} +schema_name: dev_first_last_{{.schema_name}} +volume_path_root: /Volumes/{{.catalog_name}}/dev_first_last_{{.schema_name}}/{{.catalog_name}}_{{.schema_name}}_filepush_volume +volume_path_data: /Volumes/{{.catalog_name}}/dev_first_last_{{.schema_name}}/{{.catalog_name}}_{{.schema_name}}_filepush_volume/data +volume_path_archive: /Volumes/{{.catalog_name}}/dev_first_last_{{.schema_name}}/{{.catalog_name}}_{{.schema_name}}_filepush_volume/archive +========== +``` + +> **Note:** In **dev mode**, the schema name is **prefixed**. Use the printed schema name for the remaining steps. + +### Step 3. Retrieve endpoint & push files to debug + +Get the dev volume path (note the **prefixed schema**): + +```bash +databricks tables get {{.catalog_name}}.dev_first_last_{{.schema_name}}.table1 --output json \ + | jq -r '.properties["filepush.table_volume_path_data"]' +``` + +Example output: + +```text +/Volumes/{{.catalog_name}}/dev_first_last_{{.schema_name}}/{{.catalog_name}}_{{.schema_name}}_filepush_volume/data/table1 +``` + +Then follow the upload instructions from [Quick Start → Step 3](#step-3-retrieve-endpoint--push-files) to send test files. + +### Step 4. Debug table configs +Open the pipeline in the workspace: + +```bash +databricks bundle open refresh_pipeline -t dev +``` + +Click **Edit pipeline** to launch the development UI. Open the `debug_table_config` notebook and follow its guidance to refine the table options. When satisfied, copy the final config back to `./src/configs/tables.json`. + +### Step 5. Fix the table configs in production +Redeploy the updated config and run a full refresh to correct existing data for an affected table: + +```bash +databricks bundle deploy +databricks bundle run refresh_pipeline --full-refresh table1 +``` + +--- + +**That’s it!** You now have a managed, push-based file ingestion workflow with debuggable table configs and repeatable deployments! + diff --git a/contrib/templates/file-push/template/{{.catalog_name}}.{{.schema_name}}/databricks.yml.tmpl b/contrib/templates/file-push/template/{{.catalog_name}}.{{.schema_name}}/databricks.yml.tmpl new file mode 100644 index 00000000..9a8d779c --- /dev/null +++ b/contrib/templates/file-push/template/{{.catalog_name}}.{{.schema_name}}/databricks.yml.tmpl @@ -0,0 +1,37 @@ +# databricks.yml +# This is the configuration for the file push DAB dab. + +bundle: + name: {{.schema_name}} + uuid: {{bundle_uuid}} + +include: + - resources/*.yml + +targets: + # The deployment targets. See https://docs.databricks.com/en/dev-tools/bundles/deployment-modes.html + dev: + mode: development + workspace: + host: {{workspace_host}} + + prod: + mode: production + default: true + workspace: + host: {{workspace_host}} + root_path: /Workspace/Users/${workspace.current_user.userName}/.bundle/${bundle.name}/${bundle.target} + permissions: + - user_name: ${workspace.current_user.userName} + level: CAN_MANAGE + +variables: + catalog_name: + description: The existing catalog where the NEW schema will be created. + default: {{.catalog_name}} + schema_name: + description: The name of the NEW schema where the tables will be created. + default: {{.schema_name}} + resource_name_prefix: + description: The prefix for the resource names. + default: ${var.catalog_name}_${var.schema_name}_ diff --git a/contrib/templates/file-push/template/{{.catalog_name}}.{{.schema_name}}/resources/job.yml b/contrib/templates/file-push/template/{{.catalog_name}}.{{.schema_name}}/resources/job.yml new file mode 100644 index 00000000..f8fdaac9 --- /dev/null +++ b/contrib/templates/file-push/template/{{.catalog_name}}.{{.schema_name}}/resources/job.yml @@ -0,0 +1,46 @@ +# The main job for schema dab +# This job will trigger in the schema pipeline + +resources: + jobs: + filetrigger_job: + name: ${var.resource_name_prefix}filetrigger_job + tasks: + - task_key: pipeline_refresh + pipeline_task: + pipeline_id: ${resources.pipelines.refresh_pipeline.id} + trigger: + file_arrival: + url: ${resources.volumes.filepush_volume.volume_path}/data/ + configuration_job: + name: ${var.resource_name_prefix}configuration_job + tasks: + - task_key: initialization + spark_python_task: + python_file: ../src/utils/initialization.py + parameters: + - "--catalog_name" + - "{{job.parameters.catalog_name}}" + - "--schema_name" + - "{{job.parameters.schema_name}}" + - "--volume_path_root" + - "{{job.parameters.volume_path_root}}" + - "--logging_level" + - "${bundle.target}" + environment_key: serverless + - task_key: trigger_refresh + run_job_task: + job_id: ${resources.jobs.filetrigger_job.id} + depends_on: + - task_key: initialization + environments: + - environment_key: serverless + spec: + client: "3" + parameters: + - name: catalog_name + default: ${var.catalog_name} + - name: schema_name + default: ${resources.schemas.main_schema.name} + - name: volume_path_root + default: ${resources.volumes.filepush_volume.volume_path} diff --git a/contrib/templates/file-push/template/{{.catalog_name}}.{{.schema_name}}/resources/pipeline.yml b/contrib/templates/file-push/template/{{.catalog_name}}.{{.schema_name}}/resources/pipeline.yml new file mode 100644 index 00000000..1b325543 --- /dev/null +++ b/contrib/templates/file-push/template/{{.catalog_name}}.{{.schema_name}}/resources/pipeline.yml @@ -0,0 +1,16 @@ +# The table refresh pipeline for schema dab + +resources: + pipelines: + refresh_pipeline: + name: ${var.resource_name_prefix}refresh_pipeline + catalog: ${var.catalog_name} + schema: ${resources.schemas.main_schema.name} + serverless: true + libraries: + - file: + path: ../src/ingestion.py + root_path: ../src + configuration: + lakeflow.experimantal.filepush.version: 0.1 + filepush.volume_path_root: ${resources.volumes.filepush_volume.volume_path} diff --git a/contrib/templates/file-push/template/{{.catalog_name}}.{{.schema_name}}/resources/schema.yml b/contrib/templates/file-push/template/{{.catalog_name}}.{{.schema_name}}/resources/schema.yml new file mode 100644 index 00000000..72500a02 --- /dev/null +++ b/contrib/templates/file-push/template/{{.catalog_name}}.{{.schema_name}}/resources/schema.yml @@ -0,0 +1,7 @@ +# The schema dab + +resources: + schemas: + main_schema: + name: ${var.schema_name} + catalog_name: ${var.catalog_name} diff --git a/contrib/templates/file-push/template/{{.catalog_name}}.{{.schema_name}}/resources/volume.yml b/contrib/templates/file-push/template/{{.catalog_name}}.{{.schema_name}}/resources/volume.yml new file mode 100644 index 00000000..ac8929c8 --- /dev/null +++ b/contrib/templates/file-push/template/{{.catalog_name}}.{{.schema_name}}/resources/volume.yml @@ -0,0 +1,8 @@ +# The file staging volume for schema dab + +resources: + volumes: + filepush_volume: + name: ${var.resource_name_prefix}filepush_volume + catalog_name: ${var.catalog_name} + schema_name: ${var.schema_name} diff --git a/contrib/templates/file-push/template/{{.catalog_name}}.{{.schema_name}}/src/configs/tables.json b/contrib/templates/file-push/template/{{.catalog_name}}.{{.schema_name}}/src/configs/tables.json new file mode 100644 index 00000000..0a57f166 --- /dev/null +++ b/contrib/templates/file-push/template/{{.catalog_name}}.{{.schema_name}}/src/configs/tables.json @@ -0,0 +1,18 @@ +[ + { + "name": "example_table_csv", + "format": "csv" + }, + { + "name": "example_table_json", + "format": "json" + }, + { + "name": "example_table_avro", + "format": "avro" + }, + { + "name": "example_table_parquet", + "format": "parquet" + } +] diff --git a/contrib/templates/file-push/template/{{.catalog_name}}.{{.schema_name}}/src/debug_table_config.ipynb b/contrib/templates/file-push/template/{{.catalog_name}}.{{.schema_name}}/src/debug_table_config.ipynb new file mode 100644 index 00000000..968fc5e4 --- /dev/null +++ b/contrib/templates/file-push/template/{{.catalog_name}}.{{.schema_name}}/src/debug_table_config.ipynb @@ -0,0 +1,157 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": {}, + "inputWidgets": {}, + "nuid": "c0609c90-7292-4a15-9e5f-bc5740240c24", + "showTitle": false, + "tableResultSettingsMap": {}, + "title": "" + } + }, + "source": [ + "## This Notebook should be run in a workspace, not locally.\n", + "## Paste the table config JSON you would like to debug from `./configs/tables.json` and assign to variable `table_config`\n", + "For example,\n", + "```\n", + "table_config = r'''\n", + "{\n", + " \"name\": \"all_employees\",\n", + " \"format\": \"csv\",\n", + " \"format_options\": {\n", + " \"escape\": \"\\\"\",\n", + " \"multiLine\": \"false\"\n", + " }\n", + " \"schema_hints\": \"id int, name string\"\n", + "}\n", + "'''\n", + "```\n", + "Only `name` and `format` are required for a table." + ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": {}, + "inputWidgets": {}, + "nuid": "0065f01d-7a87-4810-9dc9-55e1ab56815f", + "showTitle": false, + "tableResultSettingsMap": {}, + "title": "" + } + }, + "outputs": [], + "source": [ + "table_config = r\"\"\"\n", + " {\n", + " \"name\": \"employees\",\n", + " \"format\": \"csv\",\n", + " \"format_options\": {\n", + " \"escape\": \"\\\"\"\n", + " },\n", + " \"schema_hints\": \"id int, name string\"\n", + " }\n", + "\"\"\"" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": {}, + "inputWidgets": {}, + "nuid": "86b00986-55bd-46e1-8258-a8240b7d2e0e", + "showTitle": false, + "tableResultSettingsMap": {}, + "title": "" + } + }, + "source": [ + "## Click `Run all` and inspect the parsed result. Iterate on the config until the result looks good" + ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": {}, + "inputWidgets": {}, + "nuid": "77843022-4cb8-4387-baf7-927705aa104d", + "showTitle": false, + "tableResultSettingsMap": {}, + "title": "" + }, + "jupyter": { + "source_hidden": true + } + }, + "outputs": [], + "source": [ + "import json\n", + "import tempfile\n", + "from utils import tablemanager\n", + "from utils import envmanager\n", + "\n", + "if not envmanager.has_default_storage():\n", + " print(\n", + " \"WARNING: Current catalog is not using default storage, some file push feature may not be available\"\n", + " )\n", + "\n", + "# Load table config\n", + "table_config_json = json.loads(table_config)\n", + "tablemanager.validate_config(table_config_json)\n", + "table_name = table_config_json[\"name\"]\n", + "table_volume_path_data = tablemanager.get_table_volume_path(table_name)\n", + "\n", + "assert tablemanager.has_data_file(table_name), (\n", + " f\"No data file found in {table_volume_path_data}. Please upload at least 1 file to {table_volume_path_data}\"\n", + ")\n", + "\n", + "# Put schema location in temp directory\n", + "with tempfile.TemporaryDirectory() as tmpdir:\n", + " display(tablemanager.get_df_with_config(spark, table_config_json, tmpdir))" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": {}, + "inputWidgets": {}, + "nuid": "071ffa2e-f391-451b-a584-5727edd507c0", + "showTitle": false, + "tableResultSettingsMap": {}, + "title": "" + } + }, + "source": [ + "## Copy and paste the modified config back to the `./configs/tables.json` in the DAB folder" + ] + } + ], + "metadata": { + "application/vnd.databricks.v1+notebook": { + "computePreferences": null, + "dashboards": [], + "environmentMetadata": null, + "inputWidgetPreferences": null, + "language": "python", + "notebookMetadata": { + "pythonIndentUnit": 2 + }, + "notebookName": "debug_table_config", + "widgets": {} + }, + "language_info": { + "name": "python" + } + }, + "nbformat": 4, + "nbformat_minor": 0 +} diff --git a/contrib/templates/file-push/template/{{.catalog_name}}.{{.schema_name}}/src/ingestion.py b/contrib/templates/file-push/template/{{.catalog_name}}.{{.schema_name}}/src/ingestion.py new file mode 100644 index 00000000..e441b5ca --- /dev/null +++ b/contrib/templates/file-push/template/{{.catalog_name}}.{{.schema_name}}/src/ingestion.py @@ -0,0 +1,37 @@ +import dlt +from utils import tablemanager +from utils import formatmanager + + +def _make_append_flow(table_name, table_config, table_volume_path): + def _body(): + # use _rescued_data as placeholder when no data file is present + if not tablemanager.has_data_file(table_name): + return tablemanager.get_placeholder_df_with_config(spark, table_config) + else: + return tablemanager.get_df_with_config(spark, table_config) + + # give the function a unique name + _body.__name__ = f"append_{table_name.lower()}" + + # apply the decorator programmatically + dlt.append_flow(target=table_name, name=table_name)(_body) + + +table_configs = tablemanager.get_configs() + +# create the tables and append flows +for cfg in table_configs: + tablemanager.validate_config(cfg) + tbl = cfg["name"] + path = tablemanager.get_table_volume_path(tbl) + fmt = formatmanager.get_format_manager(cfg["format"]) + expts = fmt.expectations + + dlt.create_streaming_table( + name=tbl, + comment="File push created table", + table_properties={"filepush.table_volume_path_data": path}, + expect_all=expts, + ) + _make_append_flow(tbl, cfg, path) diff --git a/contrib/templates/file-push/template/{{.catalog_name}}.{{.schema_name}}/src/utils/envmanager.py b/contrib/templates/file-push/template/{{.catalog_name}}.{{.schema_name}}/src/utils/envmanager.py new file mode 100644 index 00000000..432c9643 --- /dev/null +++ b/contrib/templates/file-push/template/{{.catalog_name}}.{{.schema_name}}/src/utils/envmanager.py @@ -0,0 +1,43 @@ +import os +import json +from databricks.sdk import WorkspaceClient + + +def get_config() -> dict: + json_path = os.path.join( + os.path.dirname(os.path.dirname(__file__)), "configs", "environment.json" + ) + if not os.path.exists(json_path): + raise RuntimeError( + f"Missing environment file: {json_path}. This should be run in a workspace, not locally. And make sure to run `databricks bundle run configuration_job` at least once." + ) + with open(json_path, "r") as f: + configs = json.load(f) + return configs + + +def has_default_storage() -> bool: + catalog = get_config()["catalog_name"] + + w = WorkspaceClient() + + # Try SDK model first + info = w.catalogs.get(catalog) + storage_root = getattr(info, "storage_root", None) + storage_location = getattr(info, "storage_location", None) + props = getattr(info, "properties", {}) or {} + + # Some workspaces expose fields only via raw JSON; fall back if all empty + if not (storage_root or storage_location or props): + j = w.api_client.do("GET", f"/api/2.1/unity-catalog/catalogs/{catalog}") + storage_root = j.get("storage_root") or j.get("storageLocation") + storage_location = j.get("storage_location") or j.get("storageLocation") + props = j.get("properties", {}) or {} + + # Heuristics: any of these indicates “default storage” is set + return bool( + storage_root + or storage_location + or props.get("defaultManagedLocation") + or props.get("delta.defaultLocation") + ) diff --git a/contrib/templates/file-push/template/{{.catalog_name}}.{{.schema_name}}/src/utils/formatmanager.py b/contrib/templates/file-push/template/{{.catalog_name}}.{{.schema_name}}/src/utils/formatmanager.py new file mode 100644 index 00000000..368109ed --- /dev/null +++ b/contrib/templates/file-push/template/{{.catalog_name}}.{{.schema_name}}/src/utils/formatmanager.py @@ -0,0 +1,144 @@ +from dataclasses import dataclass +from . import envmanager + + +@dataclass(frozen=True, slots=True) +class AutoLoaderOption: + key: str + value: str + hidden: bool = False + + def __iter__(self): + yield (self.key, self) + + +class AutoLoaderFormat: + def __init__(self): + self.name = None + self.options: set[AutoLoaderOption] = { + AutoLoaderOption("cloudFiles.inferColumnTypes", "true", True), + AutoLoaderOption("cloudFiles.schemaEvolutionMode", "addNewColumns"), + AutoLoaderOption("cloudFiles.cleanSource", "MOVE", True), + AutoLoaderOption("cloudFiles.cleanSource.retentionDuration", "1 day", True), + AutoLoaderOption( + "cloudFiles.cleanSource.moveDestination", + f"{envmanager.get_config()['volume_path_archive']}/{{table_name}}", + True, + ), + } + self.expectations: dict[str, str] = { + "Rescued data should be null": "_rescued_data IS NULL" + } + self.default_schema: set[str] = {"_rescued_data STRING"} + + def get_default_schema(self) -> str: + return ", ".join(self.default_schema) + + def get_userfacing_options(self) -> dict[str, str]: + return {opt.key: opt.value for opt in self.options if not opt.hidden} + + def validate_user_options(self, options: dict[str, str]) -> None: + allowed = set(self.get_userfacing_options()) + illegal = set(options) - allowed + if illegal: + raise ValueError( + f"Unsupported or protected options: {sorted(illegal)}. " + f"Allowed user options: {sorted(allowed)}" + ) + + def get_modified_options(self, options: dict[str, str]) -> dict[str, str]: + self.validate_user_options(options) + defaults = self.get_userfacing_options() + return {k: v for k, v in options.items() if k in defaults and v != defaults[k]} + + def get_merged_options( + self, options: dict[str, str], table_name: str, is_placeholder: bool = False + ) -> dict[str, str]: + self.validate_user_options(options) + defaults = {opt.key: opt.value for opt in self.options} + + merged = defaults.copy() + merged.update({k: v for k, v in options.items() if k in defaults}) + + # Do not specify schema evolution mode in placeholder + if is_placeholder: + merged.pop("cloudFiles.schemaEvolutionMode", None) + + # Format the moveDestination with table_name + move_dest_key = "cloudFiles.cleanSource.moveDestination" + if move_dest_key in merged: + merged[move_dest_key] = merged[move_dest_key].format(table_name=table_name) + + return merged + + +class CSV(AutoLoaderFormat): + def __init__(self): + super().__init__() + self.name = "CSV" + self.options |= { + AutoLoaderOption("header", "true"), + AutoLoaderOption("mergeSchema", "true", True), + AutoLoaderOption("mode", "PERMISSIVE", True), + AutoLoaderOption("columnNameOfCorruptRecord", "_corrupt_record", True), + AutoLoaderOption("delimiter", ","), + AutoLoaderOption("escape", '"'), + AutoLoaderOption("multiLine", "false"), + } + self.expectations |= { + "Corrupted record should be null": "_corrupt_record IS NULL" + } + self.default_schema |= {"_corrupt_record STRING"} + + +class JSON(AutoLoaderFormat): + def __init__(self): + super().__init__() + self.name = "JSON" + self.options |= { + AutoLoaderOption("mergeSchema", "true", True), + AutoLoaderOption("mode", "PERMISSIVE", True), + AutoLoaderOption("columnNameOfCorruptRecord", "_corrupt_record", True), + AutoLoaderOption("allowComments", "true"), + AutoLoaderOption("allowSingleQuotes", "true"), + AutoLoaderOption("inferTimestamp", "true"), + AutoLoaderOption("multiLine", "true"), + } + self.expectations |= { + "Corrupted record should be null": "_corrupt_record IS NULL" + } + self.default_schema |= {"_corrupt_record STRING"} + + +class AVRO(AutoLoaderFormat): + def __init__(self): + super().__init__() + self.name = "AVRO" + self.options |= { + AutoLoaderOption("mergeSchema", "true", True), + } + + +class PARQUET(AutoLoaderFormat): + def __init__(self): + super().__init__() + self.name = "PARQUET" + self.options |= { + AutoLoaderOption("mergeSchema", "true", True), + } + + +_supported_formats: dict[str, AutoLoaderFormat] = { + f.name: f for f in (CSV(), JSON(), AVRO(), PARQUET()) +} + + +def get_format_manager(fmt: str) -> dict[str, str]: + key = fmt.strip().upper() + try: + return _supported_formats[key] + except KeyError: + supported = ", ".join(sorted(_supported_formats)) + raise ValueError( + f"{fmt!r} is not a supported format. Supported formats: {supported}" + ) diff --git a/contrib/templates/file-push/template/{{.catalog_name}}.{{.schema_name}}/src/utils/initialization.py b/contrib/templates/file-push/template/{{.catalog_name}}.{{.schema_name}}/src/utils/initialization.py new file mode 100644 index 00000000..7182281a --- /dev/null +++ b/contrib/templates/file-push/template/{{.catalog_name}}.{{.schema_name}}/src/utils/initialization.py @@ -0,0 +1,74 @@ +from databricks.sdk import WorkspaceClient +import argparse +import json +import logging + +# Parse arguments +parser = argparse.ArgumentParser() +parser.add_argument("--catalog_name", type=str, required=True) +parser.add_argument("--schema_name", type=str, required=True) +parser.add_argument("--volume_path_root", type=str, required=True) +parser.add_argument("--logging_level", type=str, required=False, default="dev") +args = parser.parse_args() + +catalog_name = args.catalog_name +schema_name = args.schema_name +volume_path_root = args.volume_path_root +volume_path_data = args.volume_path_root + "/data" +volume_path_archive = args.volume_path_root + "/archive" +logging_level = logging.DEBUG if args.logging_level == "dev" else logging.INFO + +# Logging +logging.basicConfig( + level=logging_level, format="%(asctime)s [%(levelname)s] %(module)s - %(message)s" +) +logger = logging.getLogger(__name__) # per-module logger + +# Initialize workspace client +ws = WorkspaceClient() + +# Set property to schema +logger.info(f"Setting property to schema {catalog_name}.{schema_name}") +logger.debug(f"Volume path root: {volume_path_root}") +logger.debug(f"Volume path data: {volume_path_data}") +ws.schemas.update( + full_name=f"{catalog_name}.{schema_name}", + properties={ + "filepush.volume_path_root": volume_path_root, + "filepush.volume_path_data": volume_path_data, + "filepush.volume_path_data": volume_path_archive, + }, +) +logger.info(f"Schema {catalog_name}.{schema_name} configured") + +# Initialize volume folder structure +logger.info(f"Initializing volume folder structure {volume_path_root}") +logger.debug(f"Creating data directory {volume_path_data}") +ws.files.create_directory(volume_path_data) +logger.debug(f"Creating archive directory {volume_path_archive}") +ws.files.create_directory(volume_path_archive) +with open("../configs/tables.json", "r") as f: + for table in json.load(f): + table_volume_path_data = f"{volume_path_data}/{table['name']}" + logger.debug(f"Creating table directory {table_volume_path_data}") + ws.files.create_directory(table_volume_path_data) + table_volume_path_archive = f"{volume_path_archive}/{table['name']}" + logger.debug(f"Creating table archive directory {table_volume_path_archive}") + ws.files.create_directory(table_volume_path_archive) +logger.info(f"Volume {volume_path_root} configured") + +# Dump configs to environment json +all_configs = { + "catalog_name": catalog_name, + "schema_name": schema_name, + "volume_path_root": volume_path_root, + "volume_path_data": volume_path_data, + "volume_path_archive": volume_path_archive, +} +with open("../configs/environment.json", "w") as f: + json.dump(all_configs, f) + +logger.info( + f"==========\n%s\n==========", + "\n".join(f"{k}: {v}" for k, v in all_configs.items()), +) diff --git a/contrib/templates/file-push/template/{{.catalog_name}}.{{.schema_name}}/src/utils/tablemanager.py b/contrib/templates/file-push/template/{{.catalog_name}}.{{.schema_name}}/src/utils/tablemanager.py new file mode 100644 index 00000000..c411bb1e --- /dev/null +++ b/contrib/templates/file-push/template/{{.catalog_name}}.{{.schema_name}}/src/utils/tablemanager.py @@ -0,0 +1,142 @@ +import os +import json +from . import envmanager +from . import formatmanager +from pyspark.sql.streaming import DataStreamReader +from pyspark.sql import DataFrame, SparkSession +from databricks.sdk import WorkspaceClient +from databricks.sdk.errors.platform import NotFound + + +def validate_config(table_config: dict): + if not table_config.get("name"): + raise ValueError("name is required for table config") + if not table_config.get("format"): + raise ValueError("format is required for table config") + + +def validate_configs(table_configs: list): + names = [cfg.get("name") for cfg in table_configs] + duplicates = set( + [name for name in names if names.count(name) > 1 and name is not None] + ) + if duplicates: + raise ValueError( + f"Duplicate table names found in table configs: {sorted(duplicates)}" + ) + for table_config in table_configs: + validate_config(table_config) + + +def get_configs() -> list: + json_path = os.path.join( + os.path.dirname(os.path.dirname(__file__)), "configs", "tables.json" + ) + if not os.path.exists(json_path): + raise RuntimeError( + f"Missing table configs file: {json_path}. Please following README.md to create one, deploy and run configuration_job." + ) + with open(json_path, "r") as f: + configs = json.load(f) + validate_configs(configs) + return configs + + +def get_table_volume_path(table_name: str) -> str: + ws = WorkspaceClient() + table_volume_path_data = os.path.join( + envmanager.get_config()["volume_path_data"], table_name + ) + try: + ws.files.get_directory_metadata(table_volume_path_data) + except NotFound: + raise RuntimeError( + f"Table data path not found for table `{table_name}`. Have you run `databricks bundle run configuration_job`?" + ) + return table_volume_path_data + + +def has_data_file(table_name: str) -> bool: + ws = WorkspaceClient() + table_volume_path_data = get_table_volume_path(table_name) + try: + iter = ws.files.list_directory_contents(table_volume_path_data) + next(iter) + except StopIteration: + return False + return True + + +def is_table_created(table_name: str) -> bool: + ws = WorkspaceClient() + return ws.tables.exists( + full_name=f"{envmanager.get_config()['catalog_name']}.{envmanager.get_config()['schema_name']}.{table_name}" + ).table_exists + + +def _apply_table_options( + reader: DataStreamReader, table_config: dict, fmt_mgr, is_placeholder: bool = False +) -> DataStreamReader: + name = table_config.get("name") + fmt = table_config.get("format") + + # format options + user_fmt_opts = table_config.get("format_options", {}) + final_fmt_opts = fmt_mgr.get_merged_options(user_fmt_opts, name, is_placeholder) + reader = reader.option("cloudFiles.format", fmt) + for k, v in final_fmt_opts.items(): + reader = reader.option(k, v) + + # schema hints + schema_hints = table_config.get("schema_hints") + + use_schema = ( + final_fmt_opts.get("cloudFiles.schemaEvolutionMode", "").lower() == "rescue" + or is_placeholder + ) + + # schema_hints goes first, then default schema entries (ordered) + base_schema = [] + if schema_hints: + base_schema.append(schema_hints) + base_schema.extend(fmt_mgr.default_schema) + + joined_schema = ", ".join(base_schema) + + if use_schema: + reader = reader.schema(joined_schema) + else: + reader = reader.option("cloudFiles.schemaHints", joined_schema) + + return reader + + +def get_df_with_config( + spark: SparkSession, table_config: dict, schema_location: str = None +) -> DataFrame: + validate_config(table_config) + fmt = table_config.get("format") + fmt_mgr = formatmanager.get_format_manager(fmt) + + reader = spark.readStream.format("cloudFiles") + reader = _apply_table_options(reader, table_config, fmt_mgr) + if schema_location: + reader = reader.option("cloudFiles.schemaLocation", schema_location) + + # include file metadata + return reader.load(get_table_volume_path(table_config.get("name"))).selectExpr( + "*", "_metadata" + ) + + +def get_placeholder_df_with_config( + spark: SparkSession, table_config: dict +) -> DataFrame: + validate_config(table_config) + fmt = table_config.get("format") + fmt_mgr = formatmanager.get_format_manager(fmt) + + reader = spark.readStream.format("cloudFiles") + reader = _apply_table_options(reader, table_config, fmt_mgr, is_placeholder=True) + + return reader.load(get_table_volume_path(table_config.get("name")))