Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 32 additions & 32 deletions dbt_sql/dbt_profiles/profiles.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,35 +4,35 @@ dbt_sql:
target: dev # default target
outputs:

# Doing local development with the dbt CLI?
# Then you should create your own profile in your .dbt/profiles.yml using 'dbt init'
# (See README.md)

# The default target when deployed with the Databricks CLI
# N.B. when you use dbt from the command line, it uses the profile from .dbt/profiles.yml
dev:
type: databricks
method: http
catalog: catalog
schema: "{{ var('dev_schema') }}"

http_path: /sql/1.0/warehouses/abcdef1234567890

# The workspace host / token are provided by Databricks
# see databricks.yml for the workspace host used for 'dev'
host: "{{ env_var('DBT_HOST') }}"
token: "{{ env_var('DBT_ACCESS_TOKEN') }}"

# The production target when deployed with the Databricks CLI
prod:
type: databricks
method: http
catalog: catalog
schema: default

http_path: /sql/1.0/warehouses/abcdef1234567890

# The workspace host / token are provided by Databricks
# see databricks.yml for the workspace host used for 'prod'
host: "{{ env_var('DBT_HOST') }}"
token: "{{ env_var('DBT_ACCESS_TOKEN') }}"
# Doing local development with the dbt CLI?
# Then you should create your own profile in your .dbt/profiles.yml using 'dbt init'
# (See README.md)

# The default target when deployed with the Databricks CLI
# N.B. when you use dbt from the command line, it uses the profile from .dbt/profiles.yml
dev:
type: databricks
method: http
catalog: catalog
schema: "{{ var('dev_schema') }}"

http_path: /sql/1.0/warehouses/abcdef1234567890

# The workspace host / token are provided by Databricks
# see databricks.yml for the workspace host used for 'dev'
host: "{{ env_var('DBT_HOST') }}"
token: "{{ env_var('DBT_ACCESS_TOKEN') }}"

# The production target when deployed with the Databricks CLI
prod:
type: databricks
method: http
catalog: catalog
schema: default

http_path: /sql/1.0/warehouses/abcdef1234567890

# The workspace host / token are provided by Databricks
# see databricks.yml for the workspace host used for 'prod'
host: "{{ env_var('DBT_HOST') }}"
token: "{{ env_var('DBT_ACCESS_TOKEN') }}"
21 changes: 7 additions & 14 deletions dbt_sql/resources/dbt_sql.job.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ resources:

tasks:
- task_key: dbt
environment_key: default
dbt_task:
project_directory: ../
# The default schema, catalog, etc. are defined in ../dbt_profiles/profiles.yml
Expand All @@ -25,17 +26,9 @@ resources:
- 'dbt seed --target=${bundle.target} --vars "{ dev_schema: ${workspace.current_user.short_name} }"'
- 'dbt run --target=${bundle.target} --vars "{ dev_schema: ${workspace.current_user.short_name} }"'

libraries:
- pypi:
package: dbt-databricks>=1.8.0,<2.0.0

new_cluster:
spark_version: 15.4.x-scala2.12
node_type_id: i3.xlarge
data_security_mode: SINGLE_USER
num_workers: 0
spark_conf:
spark.master: "local[*, 4]"
spark.databricks.cluster.profile: singleNode
custom_tags:
ResourceClass: SingleNode
environments:
- environment_key: default
spec:
environment_version: "2"
dependencies:
- dbt-databricks>=1.8.0,<2.0.0
90 changes: 90 additions & 0 deletions default_python/src/pipeline.ipynb

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This file is not referred to anywhere.

Does this come from a released CLI?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now that pipeline sources are all in a directory, do we still need the notebook or can it be removed?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is still the old template from main 🤷 We'll get rid of this soon!

Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
{
"cells": [
{
"cell_type": "markdown",
"metadata": {
"application/vnd.databricks.v1+cell": {
"cellMetadata": {},
"inputWidgets": {},
"nuid": "9a626959-61c8-4bba-84d2-2a4ecab1f7ec",
"showTitle": false,
"title": ""
}
},
"source": [
"# Lakeflow Declarative Pipeline\n",
"\n",
"This Lakeflow Declarative Pipeline definition is executed using a pipeline defined in resources/default_python.pipeline.yml."
]
},
{
"cell_type": "code",
"execution_count": 0,
"metadata": {
"application/vnd.databricks.v1+cell": {
"cellMetadata": {},
"inputWidgets": {},
"nuid": "9198e987-5606-403d-9f6d-8f14e6a4017f",
"showTitle": false,
"title": ""
}
},
"outputs": [],
"source": [
"# Import DLT and src/default_python\n",
"import dlt\n",
"import sys\n",
"\n",
"sys.path.append(spark.conf.get(\"bundle.sourcePath\", \".\"))\n",
"from pyspark.sql.functions import expr\n",
"from default_python import main"
]
},
{
"cell_type": "code",
"execution_count": 0,
"metadata": {
"application/vnd.databricks.v1+cell": {
"cellMetadata": {},
"inputWidgets": {},
"nuid": "3fc19dba-61fd-4a89-8f8c-24fee63bfb14",
"showTitle": false,
"title": ""
}
},
"outputs": [],
"source": [
"@dlt.view\n",
"def taxi_raw():\n",
" return main.find_all_taxis()\n",
"\n",
"\n",
"@dlt.table\n",
"def filtered_taxis():\n",
" return dlt.read(\"taxi_raw\").filter(expr(\"fare_amount < 30\"))"
]
}
],
"metadata": {
"application/vnd.databricks.v1+notebook": {
"dashboards": [],
"language": "python",
"notebookMetadata": {
"pythonIndentUnit": 2
},
"notebookName": "pipeline",
"widgets": {}
},
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"name": "python",
"version": "3.11.4"
}
},
"nbformat": 4,
"nbformat_minor": 0
}
59 changes: 59 additions & 0 deletions default_python/tests/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
"""This file configures pytest."""

import os, sys, pathlib
from contextlib import contextmanager


try:
from databricks.connect import DatabricksSession
from databricks.sdk import WorkspaceClient
from pyspark.sql import SparkSession
import pytest
except ImportError:
raise ImportError(
"Test dependencies not found.\n\nRun tests using 'uv run pytest'. See http://docs.astral.sh/uv to learn more about uv."
)


def enable_fallback_compute():
"""Enable serverless compute if no compute is specified."""
conf = WorkspaceClient().config
if conf.serverless_compute_id or conf.cluster_id or os.environ.get("SPARK_REMOTE"):
return

url = "https://docs.databricks.com/dev-tools/databricks-connect/cluster-config"
print("☁️ no compute specified, falling back to serverless compute", file=sys.stderr)
print(f" see {url} for manual configuration", file=sys.stderr)

os.environ["DATABRICKS_SERVERLESS_COMPUTE_ID"] = "auto"


@contextmanager
def allow_stderr_output(config: pytest.Config):
"""Temporarily disable pytest output capture."""
capman = config.pluginmanager.get_plugin("capturemanager")
if capman:
with capman.global_and_fixture_disabled():
yield
else:
yield


def pytest_configure(config: pytest.Config):
"""Configure pytest session."""
with allow_stderr_output(config):
enable_fallback_compute()

# Initialize Spark session eagerly, so it is available even when
# SparkSession.builder.getOrCreate() is used. For DB Connect 15+,
# we validate version compatibility with the remote cluster.
if hasattr(DatabricksSession.builder, "validateSession"):
DatabricksSession.builder.validateSession().getOrCreate()
else:
DatabricksSession.builder.getOrCreate()


@pytest.fixture(scope="session")
def spark() -> SparkSession:
"""Provide a SparkSession fixture for tests."""
return DatabricksSession.builder.getOrCreate()
8 changes: 3 additions & 5 deletions lakeflow_pipelines_python/README.md
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
# lakeflow_pipelines_python

The 'lakeflow_pipelines_python' project was generated by using the Lakeflow Pipelines template.
The 'lakeflow_pipelines_python' project was generated by using the default template.

* `lib/`: Python source code for this project.
* `lib/shared`: Shared source code across all jobs/pipelines/etc.
* `resources/pipelines_python_etl`: Pipeline code and assets for the pipelines_python_etl pipeline.
* `src/`: Python source code for this project.
* `resources/`: Resource configurations (jobs, pipelines, etc.)

## Getting started
Expand Down Expand Up @@ -46,7 +44,7 @@ with this project. It's also possible to interact with it directly using the CLI
$ databricks bundle deploy --target prod
```
Note the default template has a includes a job that runs the pipeline every day
(defined in resources/pipelines_python_etl/pipelines_python_job.job.yml). The schedule
(defined in resources/sample_job.job.yml). The schedule
is paused when deploying in development mode (see
https://docs.databricks.com/dev-tools/bundles/deployment-modes.html).

Expand Down
29 changes: 29 additions & 0 deletions lakeflow_pipelines_python/pyproject.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
[project]
name = "lakeflow_pipelines_python"
version = "0.0.1"
authors = [{ name = "user@company.com" }]
requires-python = ">=3.10,<=3.13"
dependencies = [
# Any dependencies for jobs and pipelines in this project can be added here
# See also https://docs.databricks.com/dev-tools/bundles/library-dependencies
#
# LIMITATION: for pipelines, dependencies are cached during development;
# add dependencies to the 'environment' section of pipeline.yml file instead
]

[dependency-groups]
dev = [
"pytest",
"databricks-dlt",
"databricks-connect>=15.4,<15.5",
]

[project.scripts]
main = "lakeflow_pipelines_python.main:main"

[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"

[tool.black]
line-length = 125
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# The main pipeline for lakeflow_pipelines_python

resources:
pipelines:
pipelines_python_etl:
name: pipelines_python_etl
catalog: ${var.catalog}
schema: ${var.schema}
serverless: true
root_path: "../src/pipelines_python_etl"

libraries:
- glob:
include: ../src/pipelines_python_etl/transformations/**

environment:
dependencies:
# We include every dependency defined by pyproject.toml by defining an editable environment
# that points to the folder where pyproject.toml is deployed.
- --editable ${workspace.file_path}
32 changes: 32 additions & 0 deletions lakeflow_pipelines_python/resources/sample_job.job.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# A sample job for lakeflow_pipelines_python.

resources:
jobs:
sample_job:
name: sample_job

trigger:
# Run this job every day, exactly one day from the last run; see https://docs.databricks.com/api/workspace/jobs/create#trigger
periodic:
interval: 1
unit: DAYS

#email_notifications:
# on_failure:
# - your_email@example.com

parameters:
- name: catalog
default: ${var.catalog}
- name: schema
default: ${var.schema}

tasks:
- task_key: refresh_pipeline
pipeline_task:
pipeline_id: ${resources.pipelines.pipelines_python_etl.id}

environments:
- environment_key: default
spec:
environment_version: "2"
20 changes: 20 additions & 0 deletions lakeflow_pipelines_python/src/pipelines_python_etl/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# lakeflow_pipelines_python

This folder defines all source code for the lakeflow_pipelines_python pipeline:

- `explorations/`: Ad-hoc notebooks used to explore the data processed by this pipeline.
- `transformations/`: All dataset definitions and transformations.
- `utilities/` (optional): Utility functions and Python modules used in this pipeline.
- `data_sources/` (optional): View definitions describing the source data for this pipeline.

## Getting Started

To get started, go to the `transformations` folder -- most of the relevant source code lives there:

* By convention, every dataset under `transformations` is in a separate file.
* Take a look at the sample called "sample_trips_lakeflow_pipelines_python.py" to get familiar with the syntax.
Read more about the syntax at https://docs.databricks.com/dlt/python-ref.html.
* If you're using the workspace UI, use `Run file` to run and preview a single transformation.
* If you're using the CLI, use `databricks bundle run pipelines_python_etl --select sample_trips_lakeflow_pipelines_python` to run a single transformation.

For more tutorials and reference material, see https://docs.databricks.com/dlt.
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
from pyspark import pipelines as dp
from pyspark.sql.functions import col


# This file defines a sample transformation.
# Edit the sample below or add new transformations
# using "+ Add" in the file browser.


@dp.table
def sample_trips_lakeflow_pipelines_python():
return spark.read.table("samples.nyctaxi.trips")
Loading