
qualink

Official Website

Blazing fast data quality framework for Python, built on Apache DataFusion.

Features

  • High Performance: Leverages Apache DataFusion for fast data processing and validation.
  • Flexible Constraints: Supports various data quality constraints including completeness, uniqueness, and custom assertions.
  • YAML Configuration: Define validation suites declaratively using YAML files.
  • CLI – qualinkctl: Run YAML-driven validations from the terminal — no Python script required.
  • Cloud Object Stores: Read data directly from Amazon S3 (and S3-compatible services).
  • Multiple Output Formats: Results can be formatted as human-readable text, JSON, or Markdown.
  • Async Support: Built with asyncio for non-blocking operations.
  • Analyzers: Compute reusable dataset and column metrics independent of pass/fail checks.
  • Metrics Repository: Persist analyzer outputs over time using tagged result keys.
  • Anomaly Detection: Detect unexpected metric shifts from historical baselines.
  • Intelligent Rule Suggestions: Generate candidate validation rules from column profiles.
  • Easy Integration: Simple API for defining and running validation suites.

Installation

Install qualink using uv:

uv add qualink

Or using pip:

pip install qualink

Quick Start

Here's a basic example of using qualink to validate a CSV file:

import asyncio
from datafusion import SessionContext
from qualink.checks import Check, Level
from qualink.constraints import Assertion
from qualink.core import ValidationSuite
from qualink.formatters import MarkdownFormatter


async def main() -> None:
    ctx = SessionContext()
    ctx.register_csv("users", "examples/users.csv")

    result = await (
        ValidationSuite()
        .on_data(ctx, "users")
        .with_name("User Data Quality")
        .add_check(Check.builder("Critical Checks").with_level(Level.ERROR).is_complete("user_id").build())
        .add_check(
            Check.builder("Data Quality")
            .with_level(Level.WARNING)
            .has_completeness("name", Assertion.greater_than_or_equal(0.95))
            .build()
        )
        .run()
    )

    print(MarkdownFormatter().format(result))


if __name__ == "__main__":
    asyncio.run(main())

YAML Configuration

You can also define validation suites using YAML files for a declarative approach:

suite:
  name: "User Data Quality"

data_sources:
  - name: users_source
    format: csv
    path: "examples/users.csv"
    table_name: users

checks:
  - name: "Critical Checks"
    level: error
    rules:
      - is_complete: user_id
      - is_unique: email
      - has_size:
          gt: 0
  - name: "Data Quality"
    level: warning
    rules:
      - has_completeness:
          column: name
          gte: 0.95

Run the YAML configuration:

import asyncio
from qualink.config import run_yaml
from qualink.formatters import HumanFormatter


async def main() -> None:
    result = await run_yaml("path/to/your/config.yaml")
    print(HumanFormatter().format(result))


if __name__ == "__main__":
    asyncio.run(main())

run_yaml() also accepts filesystem URIs such as s3://my-bucket/checks.yaml or file:///absolute/path/to/checks.yaml, in addition to local file paths and inline YAML strings.

CLI – qualinkctl

The simplest way to run a YAML validation is with qualinkctl:

# Human-readable output (default)
uv run qualinkctl checks.yaml

# JSON output
uv run qualinkctl checks.yaml -f json

# Markdown report saved to file
uv run qualinkctl checks.yaml -f markdown -o report.md

# JSON report written to object storage
uv run qualinkctl checks.yaml -f json -o s3://my-bucket/qualink/results.json

# Show all constraints (including passed) with debug logging
uv run qualinkctl checks.yaml --show-passed -v

qualinkctl exits with code 0 on success and 1 on failure, making it easy to use in CI/CD pipelines:

uv run qualinkctl checks.yaml -f json -o results.json || echo "Validation failed!"

Run uv run qualinkctl --help for a full list of options.
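Because of the 0/1 exit code, qualinkctl drops straight into CI. A sketch of a GitHub Actions step (workflow structure, paths, and artifact names are illustrative, not part of qualink):

```yaml
jobs:
  data-quality:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: astral-sh/setup-uv@v5
      - run: uv sync
      # A non-zero exit code fails the job when any error-level check fails
      - run: uv run qualinkctl checks.yaml -f json -o results.json
      - uses: actions/upload-artifact@v4
        if: always()
        with:
          name: qualink-results
          path: results.json
```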

Advanced Features

Runnable end-to-end examples are available in:

  • examples/adbc_sqlite_example.py
  • examples/analyzers_example.py
  • examples/metrics_repository_example.py
  • examples/anomaly_detection_example.py
  • examples/intelligent_rule_suggestions_example.py
  • examples/output_results_example.py
  • examples/file_uri_validation.py

ADBC Datasources

qualink can also register database-backed sources through ADBC and materialize them into DataFusion tables before running checks.

A minimal SQLite configuration looks like:

connections:
  sqlite_local:
    uri: sqlite:///tmp/users.db

data_sources:
  - name: users_source
    connection: sqlite_local
    table: users
    table_name: users

To run the SQLite example after installing the optional ADBC packages:

uv sync --group adbc
uv run python examples/adbc_sqlite_example.py

Secret-backed Connections

Sensitive connection values can be resolved inline from environment variables, AWS Systems Manager Parameter Store, AWS Secrets Manager, or GCP Secret Manager.

Example:

connections:
  sqlite_local:
    uri:
      from: env
      key: QUALINK_SQLITE_URI

data_sources:
  - name: users_source
    connection: sqlite_local
    table: users
    table_name: users

AWS SSM example:

connections:
  postgres_prod:
    uri:
      from: aws_ssm
      key: /qualink/prod/postgres/uri
      region: us-east-1

AWS Secrets Manager JSON field extraction:

connections:
  snowflake_prod:
    uri:
      from: aws_secretsmanager
      key: qualink/prod/snowflake
      field: uri
      region: eu-west-1

The checked-in reference config is examples/secret_backed_connections.yaml.

Result Outputs to Filesystems

Validation results can be written to local paths or filesystem URIs backed by PyArrow filesystems such as S3, GCS, and Azure Blob/Data Lake.

CLI example:

uv run qualinkctl checks.yaml -f json -o s3://my-bucket/qualink/results.json
uv run qualinkctl checks.yaml -f markdown -o gs://my-bucket/qualink/report.md

YAML-driven outputs:

outputs:
  - path: reports/results.json
    format: json
    show_passed: true
  - uri: s3://my-bucket/qualink/results.md
    format: markdown

Python API example:

import asyncio

from qualink.config import run_yaml
from qualink.config.parser import load_yaml
from qualink.output import OutputService, normalize_output_specs


async def main() -> None:
    config = load_yaml("examples/output_results.yaml")
    result = await run_yaml("examples/output_results.yaml")
    OutputService().emit_many(result, normalize_output_specs(config))


asyncio.run(main())

S3 Object Store Sources

qualink can read data directly from Amazon S3 using DataFusion's built-in AmazonS3 object store:

suite:
  name: "Cloud Data Quality"

data_sources:
  - name: users_source
    format: parquet
    path: s3://my-data-lake/data/users.parquet
    table_name: users

checks:
  - name: "Completeness"
    level: error
    rules:
      - is_complete: user_id
      - is_unique: email

Use the standard AWS credential chain. On Glue, ECS, EKS, or EC2 with an attached role, explicit keys are usually not required.

Constraints

qualink supports the following constraint types:

  • Completeness: Ensures a column has no null values or meets a minimum completeness ratio.
  • Uniqueness: Checks for duplicate values in a column.
  • Assertion: Custom assertions using SQL expressions.
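These constraint types can be combined in a single check. A minimal YAML fragment using only rule keys already demonstrated above:

```yaml
checks:
  - name: "Combined Constraints"
    level: error
    rules:
      - is_complete: user_id    # completeness: no null values
      - is_unique: email        # uniqueness: no duplicate values
      - has_completeness:       # ratio-based completeness
          column: name
          gte: 0.95
      - has_size:               # dataset-level assertion
          gt: 0
```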

Formatters

Results can be formatted using:

  • HumanFormatter: Human-readable text output.
  • JsonFormatter: JSON format for programmatic processing.
  • MarkdownFormatter: Markdown tables for documentation.

Benchmarks

qualink ships with a real-world benchmark suite that validates ~42 million NYC Yellow Taxi trip records (654 MB of Parquet data) through 12 check groups and 92 constraints — in under 1.5 seconds.

========================================================================
  qualink Benchmark — NYC Taxi Trips
========================================================================
  Parquet files : 3
  Total size    : 654.3 MB
  Data dir      : benchmarks/data
  YAML config   : benchmarks/nyc_taxi_validation.yaml

    • data-200901.parquet  (211.9 MB)
    • data-201206.parquet  (231.1 MB)
    • data-201501.parquet  (211.3 MB)
========================================================================

⏱  Running benchmark with 'human' formatter …

Verification PASSED: NYC Taxi Trips – qualink Benchmark Suite

Checks          12
Constraints     92
Passed          91
Failed          1
Skipped         0
Pass rate       98.9%
Execution time  1440 ms

Status    Check       Message
--------  ----------  ---------------------------------------------
[FAIL]    Uniqueness  Uniqueness of (id) is 0.0000, expected >= 1.0

========================================================================
  Status         : ✅ PASSED
  Total records  : 41.94M
  Wall-clock     : 1.455s
  Checks         : 12
  Constraints    : 92
  Passed         : 91
  Failed         : 1
  Pass rate      : 98.9%
  Engine time    : 0.02m
========================================================================

Run it yourself

# 1. Download data (parquet files from public S3)
./benchmarks/download_data.sh 3

# 2. Run the benchmark
uv run python benchmarks/run_benchmark.py

# Other output formats
uv run python benchmarks/run_benchmark.py --format markdown
uv run python benchmarks/run_benchmark.py --format json

See benchmarks/README.md for full dataset details and configuration.

Development

To set up the development environment:

git clone https://github.com/gopidesupavan/qualink.git
cd qualink
uv sync

Run tests:

uv run pytest

Contributing

Contributions are welcome! Please feel free to submit a Pull Request.

License

This project is licensed under the Apache License 2.0 - see the LICENSE file for details.
