Pipeline Guide

Joseph T. French edited this page Mar 10, 2026 · 5 revisions

RoboSystems uses Dagster for all data orchestration. Data flows from external sources through adapters and DuckDB staging into the knowledge graph (LadybugDB).

Current Pipelines

| Pipeline | Status | Tracking |
|---|---|---|
| SEC | Production-ready | #117 |
| QuickBooks | In Development | #118 |
| Plaid | Scaffolded | Coming soon |

Infrastructure:

  • Shared replicas deployment: #113

Two Pipeline Patterns

RoboSystems uses two distinct patterns based on data source characteristics:

Pattern A: Arelle-Based (SEC)

XBRL Files β†’ Arelle Processing β†’ Parquet β†’ DuckDB Staging β†’ LadybugDB
            (semantic extraction)   (already graph-shaped)
  • Used for: SEC EDGAR filings (XBRL format)
  • Why: Arelle extracts XBRL semantics (concepts, contexts, facts) directly into graph-compatible structures
  • No dbt needed: Output is already graph-shaped

Pattern B: dbt-Based (QuickBooks, Plaid, Custom)

API JSON β†’ S3 Raw β†’ dbt transforms β†’ S3 Processed β†’ DuckDB Staging β†’ LadybugDB
          (chunks)  (JSON β†’ Parquet)  (graph-shaped)
  • Used for: API-based integrations (QuickBooks, Plaid, custom ERPs)
  • Why: Raw JSON needs transformation into graph-compatible node/relationship Parquet
  • dbt provides: SQL-based transforms, testing, documentation
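Conceptually, the dbt step reshapes raw API JSON into node and relationship rows. dbt does this in SQL over S3 files; the Python below is only an illustrative sketch, and the invoice field names are invented for the example:

```python
import json

# Hypothetical raw API payload (field names invented for this sketch).
RAW_INVOICE = json.loads("""
{"Id": "1042", "CustomerRef": {"value": "77", "name": "Acme Co"},
 "TotalAmt": 1250.00, "TxnDate": "2026-03-01"}
""")

def to_graph_rows(invoice: dict) -> tuple[list[dict], list[dict]]:
    """Split one raw invoice into graph-shaped node rows and relationship rows."""
    nodes = [
        {"id": f"invoice:{invoice['Id']}", "label": "Invoice",
         "total": invoice["TotalAmt"], "date": invoice["TxnDate"]},
        {"id": f"customer:{invoice['CustomerRef']['value']}",
         "label": "Customer", "name": invoice["CustomerRef"]["name"]},
    ]
    rels = [
        {"from": f"invoice:{invoice['Id']}",
         "to": f"customer:{invoice['CustomerRef']['value']}",
         "type": "BILLED_TO"},
    ]
    return nodes, rels

nodes, rels = to_graph_rows(RAW_INVOICE)
```

In the real pipeline these rows land in S3 as node/relationship Parquet files, which is what makes the downstream DuckDB staging step source-agnostic.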

Adapter Structure

Each adapter is self-contained: client, processors, and pipeline (Dagster orchestration) all live together. dagster/definitions.py collects adapter pipelines via the get_dagster_components() discovery pattern.

robosystems/adapters/
β”œβ”€β”€ base.py                     # SharedRepositoryManifest dataclass
β”œβ”€β”€ sec/                        # SEC EDGAR adapter (self-contained)
β”‚   β”œβ”€β”€ manifest.py             # SEC_MANIFEST (plans, rates, endpoints, credits)
β”‚   β”œβ”€β”€ client/                 # EDGAR API, EFTS, Arelle, Downloader
β”‚   β”œβ”€β”€ processors/             # XBRL processing, metadata, ingestion
β”‚   β”‚   β”œβ”€β”€ metadata.py         # SECMetadataLoader with caching
β”‚   β”‚   β”œβ”€β”€ xbrl_graph.py       # XBRLGraphProcessor
β”‚   β”‚   β”œβ”€β”€ processing.py       # Single filing processing
β”‚   β”‚   β”œβ”€β”€ consolidation.py    # Parquet consolidation
β”‚   β”‚   └── ingestion/          # DuckDB/LadybugDB ingestion
β”‚   └── pipeline/               # Dagster orchestration
β”‚       β”œβ”€β”€ __init__.py         # get_dagster_components() discovery
β”‚       β”œβ”€β”€ configs.py          # Run configurations
β”‚       β”œβ”€β”€ download.py         # sec_raw_filings asset
β”‚       β”œβ”€β”€ process.py          # sec_processed_filings asset
β”‚       β”œβ”€β”€ stage.py            # DuckDB staging assets
β”‚       β”œβ”€β”€ materialize.py      # LadybugDB materialization
β”‚       β”œβ”€β”€ entity_update.py    # Entity incremental update
β”‚       β”œβ”€β”€ backup.py           # SEC backup asset
β”‚       β”œβ”€β”€ jobs.py             # Job definitions
β”‚       └── sensors.py          # Sensors + schedule
β”œβ”€β”€ quickbooks/                 # QuickBooks adapter
β”‚   β”œβ”€β”€ client/                 # QB OAuth client
β”‚   β”œβ”€β”€ dbt/                    # dbt transforms (JSON β†’ graph-shaped Parquet)
β”‚   └── pipeline/               # Dagster extract/transform/load assets
└── plaid/                      # Plaid adapter (scaffolded)
    β”œβ”€β”€ client/                 # Plaid API client
    └── processors/             # Transaction sync

robosystems/dagster/            # Platform orchestration (collector)
β”œβ”€β”€ definitions.py              # Collects platform + adapter pipelines
β”œβ”€β”€ resources/                  # Shared Dagster resources (DB, S3, Graph)
β”œβ”€β”€ assets/
β”‚   β”œβ”€β”€ graphs.py               # User graph operation assets
β”‚   └── shared_repositories/    # S3 publish + replica refresh
β”œβ”€β”€ jobs/                       # Platform jobs (billing, infrastructure, graph, provisioning)
└── sensors/
    └── provisioning.py         # Subscription/repository provisioning

Dagster Architecture

Dagster runs on ECS Fargate (orchestration only). Heavy compute happens on the shared master via Graph API.

β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚  DAGSTER (ECS Fargate) - Orchestration                       β”‚
β”‚  β”œβ”€β”€ Daemon: 512 CPU / 1024 MB (singleton, runs migrations)  β”‚
β”‚  β”œβ”€β”€ Webserver: 512 CPU / 1024 MB (1-3 replicas, auto-scale) β”‚
β”‚  β”œβ”€β”€ Capacity: 80% SPOT + 20% On-Demand                      β”‚
β”‚  └── Jobs: Download, process Arelle, call Graph API          β”‚
β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€
β”‚  SHARED MASTER (graph-ladybug.yaml) - Heavy Compute          β”‚
β”‚  β”œβ”€β”€ EC2 r7g.large with dynamic EBS (Volume Manager Lambda)  β”‚
β”‚  β”œβ”€β”€ DuckDB staging + LadybugDB materialization              β”‚
β”‚  β”œβ”€β”€ node_type: shared_master (in DynamoDB registry)         β”‚
β”‚  └── Single source of truth for shared repositories          β”‚
β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€
β”‚  SHARED REPLICAS (graph-ladybug-replicas.yaml)               β”‚
β”‚  β”œβ”€β”€ ASG: Min=2, Max=10, TargetTracking on CPU               β”‚
β”‚  β”œβ”€β”€ S3 download: Pull .lbug + .duckdb from S3 on boot      β”‚
β”‚  β”œβ”€β”€ Instance: r7g.medium (read-only, smaller than master)   β”‚
β”‚  └── ALB on port 8001, health check /status                  β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜

Key insight: Dagster orchestrates (lightweight), Graph API computes (heavy). This keeps Dagster simple and reuses existing graph infrastructure.

What Dagster Does vs What Graph API Does

| Task | Dagster (Fargate) | Graph API (Shared Master) |
|---|---|---|
| Download XBRL | ✅ Orchestrates | - |
| Process with Arelle | ✅ Runs on Fargate | - |
| Parquet to S3 | ✅ Handles | - |
| DuckDB staging | - | ✅ Handles |
| LadybugDB materialization | - | ✅ Handles |
| S3 publish (.lbug/.duckdb) | ✅ Orchestrates | ✅ Uploads files |
| Replica refresh | ✅ AWS API calls | - |

S3 Publish β†’ Replica Refresh Flow

1. Dagster sensor: sec_post_materialize_publish_sensor
   β”œβ”€β”€ Publish .lbug to S3 (LadybugDB graph database)
   └── Publish .duckdb to S3 (DuckDB with embeddings for vector search)

2. Dagster sensor: triggers shared_repository_refresh_replicas_job
   └── Rolling ASG instance refresh (min_healthy=100%, max_healthy=200%)

3. New replicas boot alongside old ones
   β”œβ”€β”€ Download .lbug + .duckdb from S3 (~15 min for ~85GB)
   β”œβ”€β”€ Start Graph API, pass health check
   └── Register with ALB, old instance terminated
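The rolling refresh in step 2 maps to an EC2 Auto Scaling instance refresh. A minimal sketch of the request parameters, assuming the ASG name shown (the real trigger lives in shared_repository_refresh_replicas_job):

```python
def build_refresh_request(asg_name: str) -> dict:
    """Parameters for EC2 Auto Scaling StartInstanceRefresh matching
    min_healthy=100% / max_healthy=200%, so new replicas boot alongside
    old ones before the old instances are terminated."""
    return {
        "AutoScalingGroupName": asg_name,
        "Strategy": "Rolling",
        "Preferences": {
            "MinHealthyPercentage": 100,
            "MaxHealthyPercentage": 200,
        },
    }

# ASG name is a placeholder for this sketch.
params = build_refresh_request("graph-ladybug-replicas")
# With boto3 this would be issued as:
#   boto3.client("autoscaling").start_instance_refresh(**params)
```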

Adding a Shared Repository

Shared repositories (like SEC) are defined by adapter manifests. Each manifest declares everything about a repo: identity, data source, schema, rate limits, plans/pricing, endpoint access, and credit costs.

To add a new shared repository:

  1. Create adapters/{name}/manifest.py with a SharedRepositoryManifest (see adapters/sec/manifest.py as a template)
  2. Add one import + _register() call to config/shared_repositories.py β†’ _load_manifests()

No billing config files, no DB migrations, no hardcoded lists to update.

Registry (config/shared_repositories.py): Lazy-loads manifests on first access. Provides the query API used by billing, middleware, and operations β€” get_manifest(), get_plan_details(), get_rate_limits(), is_shared_repository(), etc.
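A minimal sketch of the manifest-plus-registry shape. The class and function names come from the text above; the manifest fields and registration details are invented for illustration (see adapters/sec/manifest.py for the real fields):

```python
from dataclasses import dataclass, field

@dataclass(frozen=True)
class SharedRepositoryManifest:
    """Sketch only; the real dataclass lives in adapters/base.py."""
    name: str
    rate_limits: dict = field(default_factory=dict)
    plans: dict = field(default_factory=dict)

_MANIFESTS: dict[str, SharedRepositoryManifest] = {}
_loaded = False

def _register(manifest: SharedRepositoryManifest) -> None:
    _MANIFESTS[manifest.name] = manifest

def _load_manifests() -> None:
    # One import + _register() call per adapter (step 2 above).
    global _loaded
    if _loaded:
        return
    _register(SharedRepositoryManifest(name="sec", plans={"starter": {}}))
    _loaded = True

def get_manifest(name: str) -> SharedRepositoryManifest:
    _load_manifests()  # lazy-load on first access
    return _MANIFESTS[name]

def is_shared_repository(name: str) -> bool:
    _load_manifests()
    return name in _MANIFESTS
```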

See PR #308 and RFC #191 for the broader extensible adapter design.

Adding Custom Adapters

The custom_*/ namespace is reserved for fork additions that won't conflict with upstream updates:

  1. Create adapters/custom_myservice/ with client/, processors/, and pipeline/
  2. Implement pipeline/__init__.py with get_dagster_components() returning {"assets": [...], "jobs": [...], "sensors": [...], "schedules": [...]}
  3. Import and collect in dagster/definitions.py (see the # === FORK comment)
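The discovery contract can be sketched as follows. The component lists are left empty here; in a real adapter they hold the Dagster assets, jobs, sensors, and schedules from your pipeline modules, and the collector helper is invented for illustration:

```python
# adapters/custom_myservice/pipeline/__init__.py (sketch)
def get_dagster_components() -> dict:
    """Return this adapter's Dagster components for the collector."""
    return {"assets": [], "jobs": [], "sensors": [], "schedules": []}

# dagster/definitions.py (sketch of the collector side)
def collect(*component_fns) -> dict:
    """Merge each adapter's component dict into one set of definitions."""
    merged = {"assets": [], "jobs": [], "sensors": [], "schedules": []}
    for fn in component_fns:
        for key, items in fn().items():
            merged[key].extend(items)
    return merged

components = collect(get_dagster_components)  # === FORK: add custom adapters here
```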

See Adapters README for details and RFC #191 for the broader extensible adapter design.


Unified Ingestion Pattern

All pipelines use DuckDB staging regardless of source:

β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚              UNIFIED INGESTION (ALL PIPELINES)                  β”‚
β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€
β”‚                                                                 β”‚
β”‚  ANY Pipeline Output (Parquet in S3)                            β”‚
β”‚           β”‚                                                     β”‚
β”‚           β–Ό                                                     β”‚
β”‚  1. GraphFile.create()                                          β”‚
β”‚     Register file in PostgreSQL (provenance tracking)           β”‚
β”‚           β”‚                                                     β”‚
β”‚           β–Ό                                                     β”‚
β”‚  2. client.create_table(s3_files=[...])                         β”‚
β”‚     Load parquet β†’ DuckDB staging table (queryable for debug)   β”‚
β”‚           β”‚                                                     β”‚
β”‚           β–Ό                                                     β”‚
β”‚  3. client.materialize_table(file_ids=[...])                    β”‚
β”‚     DuckDB β†’ LadybugDB (incremental by file_id)                 β”‚
β”‚           β”‚                                                     β”‚
β”‚           β–Ό                                                     β”‚
β”‚  4. GraphFile.mark_graph_ingested()                             β”‚
β”‚     Track completion in PostgreSQL                              β”‚
β”‚                                                                 β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜

Why DuckDB:

  • Validation layer before graph ingestion
  • Handles S3 Parquet natively (httpfs extension)
  • Same pattern for SEC, QuickBooks, Plaid, custom
  • Proven at scale: if it handles SEC (1TB+), it handles any company's data

Graph API endpoints:

  • /databases/{graph_id}/tables - Create staging table
  • /databases/{graph_id}/tables/query - SQL validation
  • /databases/{graph_id}/tables/{name}/ingest - Materialize to graph
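A sketch of how a client might address these endpoints. The path shapes come from the list above; the base URL, helper, and table name are invented for the example:

```python
from urllib.parse import quote

BASE = "https://graph-api.example.internal"  # placeholder host

def endpoint(graph_id: str, *parts: str) -> str:
    """Build a Graph API table-endpoint URL for one database."""
    path = "/".join(["databases", quote(graph_id), "tables", *map(quote, parts)])
    return f"{BASE}/{path}"

create_url = endpoint("kg1a2b3c")                       # create staging table
query_url = endpoint("kg1a2b3c", "query")               # SQL validation
ingest_url = endpoint("kg1a2b3c", "filings", "ingest")  # materialize to graph
```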

Company Graphs vs Shared Repositories

| Aspect | Shared Repos (SEC) | Company Graphs (QuickBooks) |
|---|---|---|
| Write pattern | Nightly incremental + rebuild | Incremental daily |
| Read pattern | High volume, cacheable | Low volume, fresh |
| Scaling | Horizontal (replicas) | Vertical (bigger instance) |
| Data size | 1TB+ | 1-100GB |
| DuckDB role | Transport (full rebuild) | Staging (incremental) |

Compute Infrastructure

| Component | Approach | EBS Pattern | Reason |
|---|---|---|---|
| Dagster (all jobs) | ECS Fargate | None (ephemeral) | Orchestration only; heavy compute via Graph API. |
| Shared master | Raw EC2 + ASG | Dynamic (Vol Manager) | Graph API handles DuckDB + LadybugDB. |
| Customer graph writers | Raw EC2 + ASG | Dynamic per-customer | Volume Manager assigns EBS per customer allocation. |
| Shared replicas | Raw EC2 + ASG | S3 download at boot | All replicas identical; download .lbug/.duckdb from S3. |

Why Fargate for Dagster

Dagster's role is orchestration, not compute:

  • Download XBRL files (small, fits in Fargate's 200GB ephemeral)
  • Process with Arelle β†’ parquet (CPU-bound, Fargate handles)
  • Upload parquet to S3 (network I/O)
  • Call Graph API for materialization (HTTP call, master does work)
  • AWS API calls for replica refresh (lightweight)

Benefits:

  • Simpler infrastructure (no EC2 capacity provider)
  • No EBS management for Dagster
  • Scale to zero automatically
  • Cost efficient (pay only for orchestration time)

Why Raw EC2 for Graph Instances

Graph instances need:

  • Large EBS volumes (100GB-1TB+)
  • Volume Manager Lambda coordination (dynamic assignment)
  • Long-running processes (Graph API serving requests)

ECS doesn't help here: the core complexity (EBS management) remains regardless.


Key Architectural Decisions

| Decision | Rationale |
|---|---|
| Dagster for all orchestration | One system (eliminated Celery), one UI for pipelines + billing + infrastructure |
| Fargate-only Dagster | Dagster orchestrates, Graph API computes; no EC2 capacity provider complexity |
| DuckDB-only staging | Standardized validation layer for all pipelines |
| Two pipeline patterns | Arelle for XBRL semantics (SEC), dbt for API JSON transforms (QB/Plaid) |
| GitHub repos for adapters | Customization is the value prop; dbt projects don't fit pip |
| S3 publish for replicas | Dagster publishes to S3, triggers rolling refresh; replicas download on boot |

Lambda Functions

Some functions must stay as Lambda (event-driven, not orchestration):

| Lambda | Reason |
|---|---|
| graph_volume_manager.py | Called synchronously from EC2 userdata during boot |
| graph_volume_monitor.py | Triggered by CloudWatch alarms (SNS) |
| graph_volume_detachment.py | ASG lifecycle hook handler |
| postgres_rotation.py | AWS Secrets Manager rotation |
| valkey_rotation.py | AWS Secrets Manager rotation |
| graph_api_rotation.py | AWS Secrets Manager rotation |
| postgres_init.py | RDS event handler (secrets + database creation) |

Reference

  • Dagster README: /robosystems/dagster/README.md
  • SEC Pipeline README: /robosystems/adapters/sec/pipeline/README.md
  • Adapter READMEs: /robosystems/adapters/*/README.md
  • CloudFormation: /cloudformation/dagster.yaml, /cloudformation/graph-ladybug-replicas.yaml