diff --git a/dbt_sql/dbt_profiles/profiles.yml b/dbt_sql/dbt_profiles/profiles.yml index 0d757ae6..295615e7 100644 --- a/dbt_sql/dbt_profiles/profiles.yml +++ b/dbt_sql/dbt_profiles/profiles.yml @@ -4,35 +4,35 @@ dbt_sql: target: dev # default target outputs: - # Doing local development with the dbt CLI? - # Then you should create your own profile in your .dbt/profiles.yml using 'dbt init' - # (See README.md) - - # The default target when deployed with the Databricks CLI - # N.B. when you use dbt from the command line, it uses the profile from .dbt/profiles.yml - dev: - type: databricks - method: http - catalog: catalog - schema: "{{ var('dev_schema') }}" - - http_path: /sql/1.0/warehouses/abcdef1234567890 - - # The workspace host / token are provided by Databricks - # see databricks.yml for the workspace host used for 'dev' - host: "{{ env_var('DBT_HOST') }}" - token: "{{ env_var('DBT_ACCESS_TOKEN') }}" - - # The production target when deployed with the Databricks CLI - prod: - type: databricks - method: http - catalog: catalog - schema: default - - http_path: /sql/1.0/warehouses/abcdef1234567890 - - # The workspace host / token are provided by Databricks - # see databricks.yml for the workspace host used for 'prod' - host: "{{ env_var('DBT_HOST') }}" - token: "{{ env_var('DBT_ACCESS_TOKEN') }}" + # Doing local development with the dbt CLI? + # Then you should create your own profile in your .dbt/profiles.yml using 'dbt init' + # (See README.md) + + # The default target when deployed with the Databricks CLI + # N.B. 
when you use dbt from the command line, it uses the profile from .dbt/profiles.yml + dev: + type: databricks + method: http + catalog: catalog + schema: "{{ var('dev_schema') }}" + + http_path: /sql/1.0/warehouses/abcdef1234567890 + + # The workspace host / token are provided by Databricks + # see databricks.yml for the workspace host used for 'dev' + host: "{{ env_var('DBT_HOST') }}" + token: "{{ env_var('DBT_ACCESS_TOKEN') }}" + + # The production target when deployed with the Databricks CLI + prod: + type: databricks + method: http + catalog: catalog + schema: default + + http_path: /sql/1.0/warehouses/abcdef1234567890 + + # The workspace host / token are provided by Databricks + # see databricks.yml for the workspace host used for 'prod' + host: "{{ env_var('DBT_HOST') }}" + token: "{{ env_var('DBT_ACCESS_TOKEN') }}" diff --git a/dbt_sql/resources/dbt_sql.job.yml b/dbt_sql/resources/dbt_sql.job.yml index db1d1d43..67224361 100644 --- a/dbt_sql/resources/dbt_sql.job.yml +++ b/dbt_sql/resources/dbt_sql.job.yml @@ -15,6 +15,7 @@ resources: tasks: - task_key: dbt + environment_key: default dbt_task: project_directory: ../ # The default schema, catalog, etc. 
are defined in ../dbt_profiles/profiles.yml @@ -25,17 +26,9 @@ resources: - 'dbt seed --target=${bundle.target} --vars "{ dev_schema: ${workspace.current_user.short_name} }"' - 'dbt run --target=${bundle.target} --vars "{ dev_schema: ${workspace.current_user.short_name} }"' - libraries: - - pypi: - package: dbt-databricks>=1.8.0,<2.0.0 - - new_cluster: - spark_version: 15.4.x-scala2.12 - node_type_id: i3.xlarge - data_security_mode: SINGLE_USER - num_workers: 0 - spark_conf: - spark.master: "local[*, 4]" - spark.databricks.cluster.profile: singleNode - custom_tags: - ResourceClass: SingleNode + environments: + - environment_key: default + spec: + environment_version: "2" + dependencies: + - dbt-databricks>=1.8.0,<2.0.0 diff --git a/default_python/src/pipeline.ipynb b/default_python/src/pipeline.ipynb new file mode 100644 index 00000000..7c551388 --- /dev/null +++ b/default_python/src/pipeline.ipynb @@ -0,0 +1,90 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": {}, + "inputWidgets": {}, + "nuid": "9a626959-61c8-4bba-84d2-2a4ecab1f7ec", + "showTitle": false, + "title": "" + } + }, + "source": [ + "# Lakeflow Declarative Pipeline\n", + "\n", + "This Lakeflow Declarative Pipeline definition is executed using a pipeline defined in resources/default_python.pipeline.yml." 
+ ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": {}, + "inputWidgets": {}, + "nuid": "9198e987-5606-403d-9f6d-8f14e6a4017f", + "showTitle": false, + "title": "" + } + }, + "outputs": [], + "source": [ + "# Import DLT and src/default_python\n", + "import dlt\n", + "import sys\n", + "\n", + "sys.path.append(spark.conf.get(\"bundle.sourcePath\", \".\"))\n", + "from pyspark.sql.functions import expr\n", + "from default_python import main" + ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": {}, + "inputWidgets": {}, + "nuid": "3fc19dba-61fd-4a89-8f8c-24fee63bfb14", + "showTitle": false, + "title": "" + } + }, + "outputs": [], + "source": [ + "@dlt.view\n", + "def taxi_raw():\n", + " return main.find_all_taxis()\n", + "\n", + "\n", + "@dlt.table\n", + "def filtered_taxis():\n", + " return dlt.read(\"taxi_raw\").filter(expr(\"fare_amount < 30\"))" + ] + } + ], + "metadata": { + "application/vnd.databricks.v1+notebook": { + "dashboards": [], + "language": "python", + "notebookMetadata": { + "pythonIndentUnit": 2 + }, + "notebookName": "pipeline", + "widgets": {} + }, + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "name": "python", + "version": "3.11.4" + } + }, + "nbformat": 4, + "nbformat_minor": 0 +} diff --git a/default_python/tests/conftest.py b/default_python/tests/conftest.py new file mode 100644 index 00000000..f80cb439 --- /dev/null +++ b/default_python/tests/conftest.py @@ -0,0 +1,59 @@ +"""This file configures pytest.""" + +import os, sys, pathlib +from contextlib import contextmanager + + +try: + from databricks.connect import DatabricksSession + from databricks.sdk import WorkspaceClient + from pyspark.sql import SparkSession + import pytest +except ImportError: + raise ImportError( + "Test dependencies not found.\n\nRun 
tests using 'uv run pytest'. See http://docs.astral.sh/uv to learn more about uv." + ) + + +def enable_fallback_compute(): + """Enable serverless compute if no compute is specified.""" + conf = WorkspaceClient().config + if conf.serverless_compute_id or conf.cluster_id or os.environ.get("SPARK_REMOTE"): + return + + url = "https://docs.databricks.com/dev-tools/databricks-connect/cluster-config" + print("☁️ no compute specified, falling back to serverless compute", file=sys.stderr) + print(f" see {url} for manual configuration", file=sys.stderr) + + os.environ["DATABRICKS_SERVERLESS_COMPUTE_ID"] = "auto" + + +@contextmanager +def allow_stderr_output(config: pytest.Config): + """Temporarily disable pytest output capture.""" + capman = config.pluginmanager.get_plugin("capturemanager") + if capman: + with capman.global_and_fixture_disabled(): + yield + else: + yield + + +def pytest_configure(config: pytest.Config): + """Configure pytest session.""" + with allow_stderr_output(config): + enable_fallback_compute() + + # Initialize Spark session eagerly, so it is available even when + # SparkSession.builder.getOrCreate() is used. For DB Connect 15+, + # we validate version compatibility with the remote cluster. + if hasattr(DatabricksSession.builder, "validateSession"): + DatabricksSession.builder.validateSession().getOrCreate() + else: + DatabricksSession.builder.getOrCreate() + + +@pytest.fixture(scope="session") +def spark() -> SparkSession: + """Provide a SparkSession fixture for tests.""" + return DatabricksSession.builder.getOrCreate() diff --git a/lakeflow_pipelines_python/README.md b/lakeflow_pipelines_python/README.md index b4270849..b910a0f5 100644 --- a/lakeflow_pipelines_python/README.md +++ b/lakeflow_pipelines_python/README.md @@ -1,10 +1,8 @@ # lakeflow_pipelines_python -The 'lakeflow_pipelines_python' project was generated by using the Lakeflow Pipelines template. +The 'lakeflow_pipelines_python' project was generated by using the default template. 
-* `lib/`: Python source code for this project. -* `lib/shared`: Shared source code across all jobs/pipelines/etc. -* `resources/pipelines_python_etl`: Pipeline code and assets for the pipelines_python_etl pipeline. +* `src/`: Python source code for this project. * `resources/`: Resource configurations (jobs, pipelines, etc.) ## Getting started @@ -46,7 +44,7 @@ with this project. It's also possible to interact with it directly using the CLI $ databricks bundle deploy --target prod ``` Note the default template has a includes a job that runs the pipeline every day - (defined in resources/pipelines_python_etl/pipelines_python_job.job.yml). The schedule + (defined in resources/sample_job.job.yml). The schedule is paused when deploying in development mode (see https://docs.databricks.com/dev-tools/bundles/deployment-modes.html). diff --git a/lakeflow_pipelines_python/pyproject.toml b/lakeflow_pipelines_python/pyproject.toml new file mode 100644 index 00000000..7886f28b --- /dev/null +++ b/lakeflow_pipelines_python/pyproject.toml @@ -0,0 +1,29 @@ +[project] +name = "lakeflow_pipelines_python" +version = "0.0.1" +authors = [{ name = "user@company.com" }] +requires-python = ">=3.10,<=3.13" +dependencies = [ + # Any dependencies for jobs and pipelines in this project can be added here + # See also https://docs.databricks.com/dev-tools/bundles/library-dependencies + # + # LIMITATION: for pipelines, dependencies are cached during development; + # add dependencies to the 'environment' section of pipeline.yml file instead +] + +[dependency-groups] +dev = [ + "pytest", + "databricks-dlt", + "databricks-connect>=15.4,<15.5", +] + +[project.scripts] +main = "lakeflow_pipelines_python.main:main" + +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[tool.black] +line-length = 125 diff --git a/lakeflow_pipelines_python/resources/pipelines_python_etl.pipeline.yml b/lakeflow_pipelines_python/resources/pipelines_python_etl.pipeline.yml new file mode 100644 
index 00000000..bd1f31bf --- /dev/null +++ b/lakeflow_pipelines_python/resources/pipelines_python_etl.pipeline.yml @@ -0,0 +1,20 @@ +# The main pipeline for lakeflow_pipelines_python + +resources: + pipelines: + pipelines_python_etl: + name: pipelines_python_etl + catalog: ${var.catalog} + schema: ${var.schema} + serverless: true + root_path: "../src/pipelines_python_etl" + + libraries: + - glob: + include: ../src/pipelines_python_etl/transformations/** + + environment: + dependencies: + # We include every dependency defined by pyproject.toml by defining an editable environment + # that points to the folder where pyproject.toml is deployed. + - --editable ${workspace.file_path} diff --git a/lakeflow_pipelines_python/resources/sample_job.job.yml b/lakeflow_pipelines_python/resources/sample_job.job.yml new file mode 100644 index 00000000..3fe7701b --- /dev/null +++ b/lakeflow_pipelines_python/resources/sample_job.job.yml @@ -0,0 +1,32 @@ +# A sample job for lakeflow_pipelines_python. + +resources: + jobs: + sample_job: + name: sample_job + + trigger: + # Run this job every day, exactly one day from the last run; see https://docs.databricks.com/api/workspace/jobs/create#trigger + periodic: + interval: 1 + unit: DAYS + + #email_notifications: + # on_failure: + # - your_email@example.com + + parameters: + - name: catalog + default: ${var.catalog} + - name: schema + default: ${var.schema} + + tasks: + - task_key: refresh_pipeline + pipeline_task: + pipeline_id: ${resources.pipelines.pipelines_python_etl.id} + + environments: + - environment_key: default + spec: + environment_version: "2" diff --git a/lakeflow_pipelines_python/src/pipelines_python_etl/README.md b/lakeflow_pipelines_python/src/pipelines_python_etl/README.md new file mode 100644 index 00000000..ba1c3dd8 --- /dev/null +++ b/lakeflow_pipelines_python/src/pipelines_python_etl/README.md @@ -0,0 +1,20 @@ +# lakeflow_pipelines_python + +This folder defines all source code for the lakeflow_pipelines_python 
pipeline: + +- `explorations/`: Ad-hoc notebooks used to explore the data processed by this pipeline. +- `transformations/`: All dataset definitions and transformations. +- `utilities/` (optional): Utility functions and Python modules used in this pipeline. +- `data_sources/` (optional): View definitions describing the source data for this pipeline. + +## Getting Started + +To get started, go to the `transformations` folder -- most of the relevant source code lives there: + +* By convention, every dataset under `transformations` is in a separate file. +* Take a look at the sample called "sample_trips_lakeflow_pipelines_python.py" to get familiar with the syntax. + Read more about the syntax at https://docs.databricks.com/dlt/python-ref.html. +* If you're using the workspace UI, use `Run file` to run and preview a single transformation. +* If you're using the CLI, use `databricks bundle run pipelines_python_etl --select sample_trips_lakeflow_pipelines_python` to run a single transformation. + +For more tutorials and reference material, see https://docs.databricks.com/dlt. diff --git a/lakeflow_pipelines_python/src/pipelines_python_etl/transformations/sample_trips_lakeflow_pipelines_python.py b/lakeflow_pipelines_python/src/pipelines_python_etl/transformations/sample_trips_lakeflow_pipelines_python.py new file mode 100644 index 00000000..6a48a79b --- /dev/null +++ b/lakeflow_pipelines_python/src/pipelines_python_etl/transformations/sample_trips_lakeflow_pipelines_python.py @@ -0,0 +1,12 @@ +from pyspark import pipelines as dp +from pyspark.sql.functions import col + + +# This file defines a sample transformation. +# Edit the sample below or add new transformations +# using "+ Add" in the file browser. 
+ + +@dp.table +def sample_trips_lakeflow_pipelines_python(): + return spark.read.table("samples.nyctaxi.trips") diff --git a/lakeflow_pipelines_python/src/pipelines_python_etl/transformations/sample_zones_lakeflow_pipelines_python.py b/lakeflow_pipelines_python/src/pipelines_python_etl/transformations/sample_zones_lakeflow_pipelines_python.py new file mode 100644 index 00000000..4c52bf52 --- /dev/null +++ b/lakeflow_pipelines_python/src/pipelines_python_etl/transformations/sample_zones_lakeflow_pipelines_python.py @@ -0,0 +1,17 @@ +from pyspark import pipelines as dp +from pyspark.sql.functions import col, sum + + +# This file defines a sample transformation. +# Edit the sample below or add new transformations +# using "+ Add" in the file browser. + + +@dp.table +def sample_zones_lakeflow_pipelines_python(): + # Read from the "sample_trips" table, then sum all the fares + return ( + spark.read.table(f"sample_trips_lakeflow_pipelines_python") + .groupBy(col("pickup_zip")) + .agg(sum("fare_amount").alias("total_fare")) + ) diff --git a/lakeflow_pipelines_sql/README.md b/lakeflow_pipelines_sql/README.md index 20eba12a..1de3c6c3 100644 --- a/lakeflow_pipelines_sql/README.md +++ b/lakeflow_pipelines_sql/README.md @@ -1,10 +1,8 @@ # lakeflow_pipelines_sql -The 'lakeflow_pipelines_sql' project was generated by using the Lakeflow Pipelines template. +The 'lakeflow_pipelines_sql' project was generated by using the default template. -* `lib/`: Python source code for this project. -* `lib/shared`: Shared source code across all jobs/pipelines/etc. -* `resources/pipelines_sql_etl`: Pipeline code and assets for the pipelines_sql_etl pipeline. +* `src/`: SQL source code for this project. * `resources/`: Resource configurations (jobs, pipelines, etc.) ## Getting started @@ -46,7 +44,7 @@ with this project. 
It's also possible to interact with it directly using the CLI $ databricks bundle deploy --target prod ``` Note the default template has a includes a job that runs the pipeline every day - (defined in resources/pipelines_sql_etl/pipelines_sql_job.job.yml). The schedule + (defined in resources/sample_job.job.yml). The schedule is paused when deploying in development mode (see https://docs.databricks.com/dev-tools/bundles/deployment-modes.html). diff --git a/lakeflow_pipelines_sql/pyproject.toml b/lakeflow_pipelines_sql/pyproject.toml new file mode 100644 index 00000000..5e53a638 --- /dev/null +++ b/lakeflow_pipelines_sql/pyproject.toml @@ -0,0 +1,29 @@ +[project] +name = "lakeflow_pipelines_sql" +version = "0.0.1" +authors = [{ name = "user@company.com" }] +requires-python = ">=3.10,<=3.13" +dependencies = [ + # Any dependencies for jobs and pipelines in this project can be added here + # See also https://docs.databricks.com/dev-tools/bundles/library-dependencies + # + # LIMITATION: for pipelines, dependencies are cached during development; + # add dependencies to the 'environment' section of pipeline.yml file instead +] + +[dependency-groups] +dev = [ + "pytest", + "databricks-dlt", + "databricks-connect>=15.4,<15.5", +] + +[project.scripts] +main = "lakeflow_pipelines_sql.main:main" + +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[tool.black] +line-length = 125 diff --git a/lakeflow_pipelines_sql/resources/pipelines_sql_etl.pipeline.yml b/lakeflow_pipelines_sql/resources/pipelines_sql_etl.pipeline.yml new file mode 100644 index 00000000..e8e5c181 --- /dev/null +++ b/lakeflow_pipelines_sql/resources/pipelines_sql_etl.pipeline.yml @@ -0,0 +1,20 @@ +# The main pipeline for lakeflow_pipelines_sql + +resources: + pipelines: + pipelines_sql_etl: + name: pipelines_sql_etl + catalog: ${var.catalog} + schema: ${var.schema} + serverless: true + root_path: "../src/pipelines_sql_etl" + + libraries: + - glob: + include: 
../src/pipelines_sql_etl/transformations/** + + environment: + dependencies: + # We include every dependency defined by pyproject.toml by defining an editable environment + # that points to the folder where pyproject.toml is deployed. + - --editable ${workspace.file_path} diff --git a/lakeflow_pipelines_sql/resources/sample_job.job.yml b/lakeflow_pipelines_sql/resources/sample_job.job.yml new file mode 100644 index 00000000..d883b05c --- /dev/null +++ b/lakeflow_pipelines_sql/resources/sample_job.job.yml @@ -0,0 +1,32 @@ +# A sample job for lakeflow_pipelines_sql. + +resources: + jobs: + sample_job: + name: sample_job + + trigger: + # Run this job every day, exactly one day from the last run; see https://docs.databricks.com/api/workspace/jobs/create#trigger + periodic: + interval: 1 + unit: DAYS + + #email_notifications: + # on_failure: + # - your_email@example.com + + parameters: + - name: catalog + default: ${var.catalog} + - name: schema + default: ${var.schema} + + tasks: + - task_key: refresh_pipeline + pipeline_task: + pipeline_id: ${resources.pipelines.pipelines_sql_etl.id} + + environments: + - environment_key: default + spec: + environment_version: "2" diff --git a/lakeflow_pipelines_sql/src/pipelines_sql_etl/README.md b/lakeflow_pipelines_sql/src/pipelines_sql_etl/README.md new file mode 100644 index 00000000..b1178a57 --- /dev/null +++ b/lakeflow_pipelines_sql/src/pipelines_sql_etl/README.md @@ -0,0 +1,20 @@ +# lakeflow_pipelines_sql + +This folder defines all source code for the lakeflow_pipelines_sql pipeline: + +- `explorations/`: Ad-hoc notebooks used to explore the data processed by this pipeline. +- `transformations/`: All dataset definitions and transformations. +- `utilities/` (optional): Utility functions and Python modules used in this pipeline. +- `data_sources/` (optional): View definitions describing the source data for this pipeline. 
+ +## Getting Started + +To get started, go to the `transformations` folder -- most of the relevant source code lives there: + +* By convention, every dataset under `transformations` is in a separate file. +* Take a look at the sample called "sample_trips_lakeflow_pipelines_sql.sql" to get familiar with the syntax. + Read more about the syntax at https://docs.databricks.com/dlt/sql-ref.html. +* If you're using the workspace UI, use `Run file` to run and preview a single transformation. +* If you're using the CLI, use `databricks bundle run pipelines_sql_etl --select sample_trips_lakeflow_pipelines_sql` to run a single transformation. + +For more tutorials and reference material, see https://docs.databricks.com/dlt. diff --git a/lakeflow_pipelines_sql/src/pipelines_sql_etl/transformations/sample_trips_lakeflow_pipelines_sql.sql b/lakeflow_pipelines_sql/src/pipelines_sql_etl/transformations/sample_trips_lakeflow_pipelines_sql.sql new file mode 100644 index 00000000..336cda70 --- /dev/null +++ b/lakeflow_pipelines_sql/src/pipelines_sql_etl/transformations/sample_trips_lakeflow_pipelines_sql.sql @@ -0,0 +1,10 @@ +-- This file defines a sample transformation. +-- Edit the sample below or add new transformations +-- using "+ Add" in the file browser. + +CREATE MATERIALIZED VIEW sample_trips_lakeflow_pipelines_sql AS +SELECT + pickup_zip, + fare_amount, + trip_distance +FROM samples.nyctaxi.trips diff --git a/lakeflow_pipelines_sql/src/pipelines_sql_etl/transformations/sample_zones_lakeflow_pipelines_sql.sql b/lakeflow_pipelines_sql/src/pipelines_sql_etl/transformations/sample_zones_lakeflow_pipelines_sql.sql new file mode 100644 index 00000000..5f5c567d --- /dev/null +++ b/lakeflow_pipelines_sql/src/pipelines_sql_etl/transformations/sample_zones_lakeflow_pipelines_sql.sql @@ -0,0 +1,10 @@ +-- This file defines a sample transformation. +-- Edit the sample below or add new transformations +-- using "+ Add" in the file browser.
+ +CREATE MATERIALIZED VIEW sample_zones_lakeflow_pipelines_sql AS +SELECT + pickup_zip, + SUM(fare_amount) AS total_fare +FROM sample_trips_lakeflow_pipelines_sql +GROUP BY pickup_zip