MCP Server for DQX#1252
Conversation
Codecov Report✅ All modified and coverable lines are covered by tests. Additional details and impacted files@@ Coverage Diff @@
## main #1252 +/- ##
==========================================
- Coverage 92.56% 92.45% -0.11%
==========================================
Files 102 102
Lines 10075 10075
==========================================
- Hits 9326 9315 -11
- Misses 749 760 +11
Flags with carried forward coverage won't be shown. Click here to find out more. ☔ View full report in Codecov by Harness. 🚀 New features to boost your workflow:
|
|
✅ 788/788 passed, 41 skipped, 4h27m56s total Running from acceptance #4952 |
|
✅ 194/194 passed, 2 skipped, 6h19m37s total Running from anomaly #1066 |
| spec: | ||
| environment_version: "2" | ||
| dependencies: | ||
| - databricks-labs-dqx |
There was a problem hiding this comment.
pls leave a comment that this should be pinned in produtction
There was a problem hiding this comment.
Added an inline comment on the dependency noting it's unpinned for dev and should be pinned to an explicit version in production so runner environments stay reproducible. (The line now also carries the [datacontract] extra for the new contract-based rule generation.)
There was a problem hiding this comment.
Review: MCP Server for DQX
Strong PR — the OBO + definer's-rights-view + SP-job architecture is the right pattern, and it neatly avoids needing a Jobs scope at the user auth level. Nice touches: the 4.5 MB notebook.exit() guard, pure-ASGI OBO middleware, idempotent setup grants, and a fixed OPERATIONS dispatch (no arbitrary code exec).
A few things to address before merge — inline comments below. Two summary-level points:
Rebase on main (stale base). The root pyproject.toml mypy-comment change reintroduces a reference to apx dev check, but PR #1223 (already merged) removed apx and replaced it with first-party scripts (bun run tsc -b + basedpyright). The branch is behind main; please rebase so this drift is resolved.
Add an integration test with Genie (see inline on the tests). app.py is explicitly tuned for "Genie Code compatibility" (stateless_http, json_response, CORS preflight) — but nothing exercises that path. An end-to-end test that drives the MCP server through Databricks Genie (Genie Code as the MCP client) over the deployed app would protect exactly the behaviour these settings exist for.
Priorities: the rebase are blockers; CORS and the in-memory run-state are important follow-ups.
| # ── Jobs API — async submit + poll ─────────────────────────────────── | ||
|
|
||
| # Track pending runs: run_id → metadata (view to clean up, table name, etc.) | ||
| _pending_runs: dict[int, dict[str, Any]] = {} |
There was a problem hiding this comment.
_pending_runs is process-local in-memory state. On a Databricks App that can restart or run multiple replicas, this means: (1) orphaned temp views leak in the shared tmp schema if the user never polls or the app restarts, and (2) a get_run_result landing on a different replica won't find the metadata (no view cleanup, can't re-attach table_name). Consider TTL-based view cleanup (the view name has a UUID — adding a timestamp would let a sweeper drop stale views) or reconstructing cleanup from job tags.
There was a problem hiding this comment.
So this is an interesting one, The App/MCP SP is not the schema owner, and has no MANAGE. The schema is owned by whoever ran databricks bundle run dqx_setup (the comment shows a human admin via --profile). Each temp view is owned by the OBO user who created it. A sweeper running as the SP cannot drop user-created views. UC will reject the DROP. our best option is to give the app SP manage on the tmp schema.
|
|
||
| mock_job.assert_called_once_with("list_available_checks", {}) | ||
|
|
||
| def test_get_workflow_returns_steps(self): |
There was a problem hiding this comment.
Add an integration test with Genie. The server is explicitly built for "Genie Code compatibility" (stateless_http/json_response/CORS preflight in app.py), but no test exercises that end-to-end path. An integration test that registers the deployed MCP app with Databricks Genie and drives the documented workflow (get_table_schema → profile_table → generate_rules → run_checks, polling get_run_result) would protect the OBO header propagation and the async submit/poll contract that these settings exist for. Unit coverage of the server layer here is otherwise good.
There was a problem hiding this comment.
Will evaluate feasibility of this, I doubt if we have any APIs yet for Genie Code. The UCode integration is using just API Gateway https://github.com/databricks/ucode
| } | ||
|
|
||
| @mcp_server.tool | ||
| def generate_rules(profiles: list[dict], criticality: str = "error"): |
There was a problem hiding this comment.
it would be good to provide option to generate rules based on user input as well, both are supported today in DQX Core. I would leave one method and make user input optional
There was a problem hiding this comment.
good shout. Let's think about this @souravg-db2 - there are two approaches for this -
- we already have a purpose built DQX Agent which has DQX specific instructions and validation we wire that as a tool?
- another is the calling Agent itself analyses the user need and only leverages granular tools that we provide via this MCP. I just think perhaps the output would be wildly different on basis of the quality of the calling agent.
There was a problem hiding this comment.
Hi @mwojtyczka @souravg-db2 - I tried this out I think I am leaning on leaving the MCP granular and deterministic with composable primitives. seems to be what Anthropic also advises https://www.anthropic.com/engineering/writing-tools-for-agents
mwojtyczka
left a comment
There was a problem hiding this comment.
Going in the right direction, left some comments to address
|
|
||
| elapsed += poll_interval | ||
| if elapsed < max_wait: | ||
| time.sleep(poll_interval) |
There was a problem hiding this comment.
get_run_status blocks here for up to 90s (time.sleep(10) × 9) before returning. The MCP tools are sync def, so FastMCP runs each call in an anyio worker thread — the blocking sleep won't stall the event loop, but two costs remain: (1) it holds the HTTP connection open for up to 90s per get_run_result poll, which can trip client/proxy timeouts (Genie Code / Databricks Apps front-end), and (2) it occupies one of the limited anyio threadpool tokens for the whole wait, so a burst of concurrent polls can saturate the pool and stall unrelated tool calls. Since the tool already returns status: running for the caller to retry, consider a much shorter internal wait (a single short poll) and let the client drive the cadence.
mwojtyczka
left a comment
There was a problem hiding this comment.
Closed some previous comments, there are still open one and a few new.
Close the gap between rule generation and operational use in the DQX MCP server: - load_checks / save_checks: retrieve and persist rule sets from a table, UC volume, or workspace file (backend inferred from the location string) - apply_checks_and_save_to_table: operationalized run_checks that writes valid/quarantine rows to Delta tables instead of returning a sample - generate_rules_from_contract: derive checks from an ODCS data contract (deterministic by default; no LLM) Each tool reuses the existing async submit + OBO temp-view pattern. The runner gains matching operations and the [datacontract] extra; get_workflow documents the new tools. Adds unit tests (50 passing). Co-authored-by: Isaac
Address review comments on the MCP server: - pyproject.toml: reduce dependencies to what the server actually imports (fastmcp, starlette, uvicorn, databricks-sdk). Drop unused fastapi and databricks-labs-dqx (the latter is only used by the runner notebook, which installs it via the job environment spec in databricks.yml). mcp/pydantic remain as transitive deps via fastmcp. - requirements.txt: align exactly with pyproject so the runtime manifest and the dev/packaging manifest can no longer drift; add starlette, which was imported directly but only listed here before. - uv.lock: regenerated via `make lock-mcp-dependencies`. - utils.py: remove dead code (get_workspace_client, make_json_safe) — neither is referenced by the server or tests. Co-authored-by: Isaac
Add an inline comment in databricks.yml flagging that databricks-labs-dqx is unpinned for development and should be pinned to an explicit version for production runner job environments, per review feedback. Co-authored-by: Isaac
Temp views are created via the user's OBO token and are owned by that user. In Unity Catalog only the view owner, a principal with MANAGE, the parent-schema owner, or a metastore admin can DROP a view — so the app SP (granted only USE SCHEMA + SELECT) could not drop them, and the existing SP-side view cleanup was failing silently and leaking views in the shared tmp schema. Transfer ownership of catalog.tmp to the app SP in setup.py so it can manage the lifecycle of temp views created by any user. Data governance is unaffected: the views are definer's-rights, so the SP still reads source data as the creating user, never directly. Ownership transfer runs after the GRANTs (so they are issued while the setup principal still owns the schema) and is idempotent. Co-authored-by: Isaac
Cleanup previously relied on _pending_runs, a process-local in-memory dict. On a Databricks App that can restart or run multiple replicas this leaked temp views (when the user never polled, the app restarted, or a poll landed on a different replica) and lost the table_name re-attach. Remove that state entirely: - runner.py: the runner job drops its own input temp view in a finally (it runs as the SP, which now owns the temp schema) and echoes table_name into its result. Cleanup now happens in the guaranteed job execution, independent of whether/where the user polls. - utils.py: temp view names carry a creation epoch (v_<epoch>_<uuid>); add a throttled, best-effort sweep_stale_views() backstop (invoked from submit_job_async) that drops orphaned views older than a TTL — for the rare case a job never ran or died before its own cleanup. Remove _pending_runs and the metadata parameter; get_run_status is now stateless (a pure function of run_id) and no longer drops views or re-attaches table_name. - tools.py: pass table_name in the job params (echoed back by the runner) instead of server-side metadata. - tests: update for the stateless model; add sweeper coverage. Co-authored-by: Isaac
The previous implementation slept up to 90s (10s x 9) inside get_run_status, holding the HTTP connection open (risking client/proxy timeouts) and pinning an anyio worker thread for the whole wait (saturating the pool under concurrent polls). MCP's model for long-running work is requestor-driven polling, so do a single status check and return immediately: 'completed', 'failed', or 'running'. The client drives the poll cadence (the tool already returns 'running' to retry). Co-authored-by: Isaac
Adds two test layers above the existing handler unit tests, per the testing-pyramid research: - test_mcp_protocol.py: drives the server through FastMCP's in-memory Client over the REAL MCP protocol (capability negotiation, tool registration/schemas, call_tool dispatch) with the workspace boundary faked. Asserts all 12 tools are exposed with schemas and that representative tools behave correctly through the protocol — coverage the direct-handler tests could not provide. - test_app_http.py: integration tests over the real ASGI app (combined_app) via Starlette TestClient — health route, MCP initialize/tools-list over Streamable HTTP, and OBO token propagation (header -> OBOAuthMiddleware -> contextvar -> get_obo_client), including rejection when the forwarded token is absent. Closes the "add a Genie integration test" review request. CORS remains covered by test_cors.py. Full mcp-server suite: 62 passed. No new test dependencies (starlette/fastmcp only). Co-authored-by: Isaac
Adds an LLM-in-the-loop integration test (MCPEval-style) that connects a real MCP client to the deployed server, hands a tool-calling Databricks model-serving endpoint the server's tool schemas plus a natural-language instruction, and asserts the model DISCOVERS and INVOKES the right tools — verifying the tools are usable by an arbitrary agent, not just our own code. Follows the DQX integration patterns: gated/skipped unless a deployed server + workspace LLM endpoint are configured (DQX_MCP_SERVER_URL / DATABRICKS_HOST / DATABRICKS_TOKEN), with a reachability probe-and-skip mirroring the anomaly ai_query_endpoint fixture, and structural/trajectory assertions (tool was called, sensible final answer) rather than exact-match on non-deterministic LLM output. Registers the 'integration' pytest marker. Verified live against the deployed mcp-dqx-vb: the model autonomously called get_table_schema and answered with the real columns. Skips cleanly in unit CI (62 passed, 1 skipped). Co-authored-by: Isaac
The CI fmt job runs `make fmt` (black ., ruff --fix, mypy, pylint) then `git diff --exit-code`. black traverses into mcp-server/ (only [tool.mypy] excludes it, not [tool.black]), and three recently-added files were not black-formatted. Reformat them via black so the fmt gate passes — no lint suppressions or config exclusions added. Verified: `make fmt` now leaves the tree clean (black 353 unchanged, ruff passed, mypy ok, pylint 10.00/10); mcp-server suite still 62 passed, 1 skipped. Co-authored-by: Isaac
…gration test Previously the agent-in-the-loop test pointed at the shared, pre-existing samples.nyctaxi.trips and created no resources — so it did not follow DQX's scaffold-and-teardown convention. Add a `dq_test_table` yield-fixture that creates its own isolated schema+table via the Databricks SDK, yields the FQN, and drops the table on teardown (runs even if the test fails) — replicating the pytester factory(create, delete) guarantee. The mcp-server test env lacks the root project's pytester fixtures, so the equivalent is built with the SDK. The test now asserts the agent reports the columns of OUR scaffolded table, removing the dependency on an external sample table. Gated on DQX_MCP_TEST_CATALOG in addition to the server/host/token env. Verified live against the field-eng workspace: table created, agent read it back, table dropped (no residue); make fmt clean; suite still 62 passed / 1 skipped without env. Co-authored-by: Isaac
The runner job previously installed databricks-labs-dqx[datacontract] from PyPI, so it ran a published release rather than the source in this repo — drift between what's tested/deployed and what the PR actually changes. Mirror the app/ bundle: build the DQX library wheel from the repo root (`uv build ../ --wheel --out-dir ./.build`) as a bundle artifact, sync .build, and point the runner job's serverless environment at `./.build/databricks_labs_dqx-*.whl` (+ datacontract-cli for the [datacontract] extra, since pip extras can't be applied to a wheel path). This matches app/databricks.yml's task-runner job, which installs `./.build/databricks_labs_dqx-*.whl` the same way. The MCP server process needs no change — it has no DQX dependency; only the runner job does. Verified on the field-eng workspace: deploy built and uploaded the wheel; the runner job env resolved to the uploaded artifact wheel (not the PyPI package); a runner job ran successfully on it (list_available_checks returned 76 functions). Co-authored-by: Isaac
The MCP integration test needs a deployed Databricks App, which the library's acceptance/anomaly harness (ephemeral workspace + pytest via the sandbox/acceptance action) does not provision. Add a dedicated workflow that deploys an isolated copy of the MCP bundle to a persistent CI workspace, runs the test suite against it, then tears it down. - databricks.yml: parameterize resource names/catalog/secret-scope via bundle variables (name_prefix, catalog_name, config_secret_scope) with defaults preserving current behaviour, so CI can deploy an isolated copy via --var without forking the file. - scripts/ci_deploy.sh: deploy bundle (builds the in-repo DQX wheel) + run setup + start/deploy the app, emit the app URL. Verified end-to-end against a live workspace. - scripts/ci_destroy.sh: best-effort teardown (bundle destroy + drop CI secret scope). - .github/workflows/mcp-integration.yml: not-a-fork gate -> deploy -> make mcp-test (runs the live agent integration test) -> teardown. Requires a maintainer to set the CI workspace secrets/vars documented in the workflow header (DQX_MCP_CI_HOST/TOKEN, DQX_MCP_CI_CATALOG) and to pin the setup-cli action SHA. Co-authored-by: Isaac
The integration workflow's job now skips cleanly unless vars.DQX_MCP_CI_CATALOG is set, so an unconfigured CI environment no longer red-blocks every mcp-server PR (the deterministic MCP tests still run in push.yml's `mcp` job). ci_deploy.sh now fails fast naming the exact secret/var to set (DQX_MCP_CI_HOST / DQX_MCP_CI_TOKEN / DQX_MCP_CI_CATALOG) when run without them. Co-authored-by: Isaac
…space/auth) Rework the MCP integration test to follow the anomaly suite instead of a bespoke CI workspace + token: - tests/integration_mcp/: a suite driven by the shared acceptance harness, reusing its workspace, authentication, fixed TEST_CATALOG, make_schema (create+teardown), and Model Serving endpoint (DQX_AI_QUERY_TEST_ENDPOINT, default databricks-claude-sonnet-4-5). A session-scoped `deployed_mcp` fixture deploys ONE isolated MCP app for the whole session and tears it down (cleanup runs on deploy failure too); the agent test creates its table via SQL (no Spark dependency). - .github/workflows/mcp.yml: mirrors anomaly.yml exactly (not-a-fork gate, tool env, setup-env, prebuild-wheel, databrickslabs/sandbox/acceptance with vault/OIDC, codegen_path tests/integration_mcp/.codegen.json) — no MCP-specific secrets. - Remove the bespoke mcp-integration.yml (new workspace + DQX_MCP_CI_* secrets) and the now-moved mcp-server/tests/test_integration_agent.py + its pytest marker. - ci_deploy.sh: wait for compute_status==ACTIVE before `apps deploy` (a fresh app's app_status can't be RUNNING pre-deploy), and use if-blocks for the GITHUB_OUTPUT/ENV writes so the script exits 0 on success when run locally. Verified end-to-end against the field-eng workspace (dqx-mcp): deploy -> agent loop (model selected get_table_schema and reported the real columns) -> teardown, 1 passed. Catalog is overridable via DQX_MCP_TEST_CATALOG for local runs. Co-authored-by: Isaac
|
❌ 1 skipped, 1s total Running from mcp #2 |
…RICKS_TOKEN The MCP integration suite shelled out to ci_deploy.sh and made raw HTTP calls (serving endpoint + the app's /mcp host) that read DATABRICKS_HOST/TOKEN straight from the environment. The acceptance harness only authenticates the SDK client and never exports a token, so CI failed with "DATABRICKS_TOKEN is not set". Add a session-scoped workspace_auth fixture that mints (host, bearer) via the SDK's config.authenticate(), working under the acceptance action's OIDC auth, a local profile, or env vars alike. Thread host/token into the deploy script env and the test's HTTP helpers so no DATABRICKS_TOKEN needs to be set anywhere. Verified end-to-end against a live workspace: deploy -> app start/deploy -> agent discovers and invokes get_table_schema -> teardown, no leaked resources. Co-authored-by: Isaac
Changes
Adds an MCP (Model Context Protocol) server for DQX, exposing DQX's data-quality
capabilities as tools that any MCP-compatible AI agent (Claude, Genie Code, Cursor, Mosaic AI)
can discover and orchestrate. It runs as a Databricks App with on-behalf-of (OBO)
authentication, so all data access is governed by the calling user's Unity Catalog permissions.
Architecture
no PySpark/DQX dependency.
over the source table using the user's forwarded token (
X-Forwarded-Access-Token), so theservice principal reads data as the user, never directly.
job and return a
run_idimmediately; the client pollsget_run_result.get_run_statusdoesa single non-blocking poll (the client drives cadence) — this matches MCP's canonical
long-running-tool model and avoids holding the HTTP connection / saturating the worker pool.
stateless_http+json_response+ CORS preflight scoped toDatabricks domains.
Tools
get_workflowget_table_schemaprofile_tablegenerate_rulesgenerate_rules_from_contractvalidate_checksrun_checksapply_checks_and_save_to_tablesave_checks/load_checkslist_available_checksget_run_resultTemp-view lifecycle (stateless, restart/replica-safe)
finally(it runs as the SP, which owns thetemp schema — see setup), so cleanup happens in the guaranteed job execution regardless of
whether/where the user polls.
v_<epoch>_<uuid>view names) reaps any orphans whosejob never ran. No per-request server state is kept, so app restarts / multiple replicas don't
leak views or lose context.
Security & governance
tmpschema (object lifecycle), not the underlying data.and backtick-quoted; log values sanitized (CWE-117); catalog name sourced from a Databricks secret.
Deployment
databricks.yml): runner job, one-time setup job (UC grants + schemaownership), and the app.
requirements.txt(the App's runtime manifest) andpyproject.tomlarekept in sync; the runner installs
databricks-labs-dqx[datacontract].maketargets andfmt/CI wiring for the sub-project.Linked issues
Resolves #1045
Tests
Layered MCP-server test suite (62 passing, deterministic, no workspace needed for CI):
test_tools.py)Clientover the real MCP protocol (test_mcp_protocol.py)initialize/tools-list, and OBO tokenpropagation (
test_app_http.py); CORS policy (test_cors.py)endpoint): a tool-calling model is handed the tool schemas and an instruction, and we assert it
discovers and invokes the right tools (
test_integration_agent.py)Documentation and Demos
docs/dqx/docs/guide/dqx_mcp_server.mdx)This description was written by Isaac.