# Pipeline Guide
RoboSystems uses Dagster for all data orchestration. Data flows from external sources through adapters and DuckDB staging into the knowledge graph (LadybugDB).
Related documentation:
- Architecture Overview - System architecture
- Bootstrap Guide - Deployment instructions
| Pipeline | Status | Tracking |
|---|---|---|
| SEC | Production-ready | #117 |
| QuickBooks | In Development | #118 |
| Plaid | Scaffolded | Coming soon |
Infrastructure:
- Shared replicas deployment: #113
RoboSystems uses two distinct patterns based on data source characteristics:
```
XBRL Files → Arelle Processing → Parquet → DuckDB Staging → LadybugDB
             (semantic extraction)          (already graph-shaped)
```
- Used for: SEC EDGAR filings (XBRL format)
- Why: Arelle extracts XBRL semantics (concepts, contexts, facts) directly into graph-compatible structures
- No dbt needed: Output is already graph-shaped
```
API JSON → S3 Raw → dbt transforms → S3 Processed → DuckDB Staging → LadybugDB
           (chunks)  (JSON → Parquet)  (graph-shaped)
```
- Used for: API-based integrations (QuickBooks, Plaid, custom ERPs)
- Why: Raw JSON needs transformation into graph-compatible node/relationship Parquet
- dbt provides: SQL-based transforms, testing, documentation
Each adapter is self-contained: client, processors, and pipeline (Dagster orchestration) all live together. `dagster/definitions.py` collects adapter pipelines via the `get_dagster_components()` discovery pattern.
```
robosystems/adapters/
├── base.py                    # SharedRepositoryManifest dataclass
├── sec/                       # SEC EDGAR adapter (self-contained)
│   ├── manifest.py            # SEC_MANIFEST (plans, rates, endpoints, credits)
│   ├── client/                # EDGAR API, EFTS, Arelle, Downloader
│   ├── processors/            # XBRL processing, metadata, ingestion
│   │   ├── metadata.py        # SECMetadataLoader with caching
│   │   ├── xbrl_graph.py      # XBRLGraphProcessor
│   │   ├── processing.py      # Single filing processing
│   │   ├── consolidation.py   # Parquet consolidation
│   │   └── ingestion/         # DuckDB/LadybugDB ingestion
│   └── pipeline/              # Dagster orchestration
│       ├── __init__.py        # get_dagster_components() discovery
│       ├── configs.py         # Run configurations
│       ├── download.py        # sec_raw_filings asset
│       ├── process.py         # sec_processed_filings asset
│       ├── stage.py           # DuckDB staging assets
│       ├── materialize.py     # LadybugDB materialization
│       ├── entity_update.py   # Entity incremental update
│       ├── backup.py          # SEC backup asset
│       ├── jobs.py            # Job definitions
│       └── sensors.py         # Sensors + schedule
├── quickbooks/                # QuickBooks adapter
│   ├── client/                # QB OAuth client
│   ├── dbt/                   # dbt transforms (JSON → graph-shaped Parquet)
│   └── pipeline/              # Dagster extract/transform/load assets
└── plaid/                     # Plaid adapter (scaffolded)
    ├── client/                # Plaid API client
    └── processors/            # Transaction sync
```
```
robosystems/dagster/           # Platform orchestration (collector)
├── definitions.py             # Collects platform + adapter pipelines
├── resources/                 # Shared Dagster resources (DB, S3, Graph)
├── assets/
│   ├── graphs.py              # User graph operation assets
│   └── shared_repositories/   # S3 publish + replica refresh
├── jobs/                      # Platform jobs (billing, infrastructure, graph, provisioning)
└── sensors/
    └── provisioning.py        # Subscription/repository provisioning
```
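The collection side of the discovery pattern can be sketched in plain Python. This is an illustrative sketch, not the actual contents of `dagster/definitions.py`: the helper name `collect_components` and the per-adapter functions are assumptions, and strings stand in for real Dagster asset/job/sensor objects (asset and sensor names are taken from the trees above).

```python
def sec_components():
    # Each adapter's pipeline/__init__.py exposes get_dagster_components()
    # returning this four-key shape.
    return {
        "assets": ["sec_raw_filings", "sec_processed_filings"],
        "jobs": ["sec_materialize_job"],
        "sensors": ["sec_post_materialize_publish_sensor"],
        "schedules": [],
    }

def quickbooks_components():
    return {"assets": ["qb_extract", "qb_transform"], "jobs": [],
            "sensors": [], "schedules": []}

def collect_components(*component_fns):
    """Merge every adapter's components into one set of definitions."""
    merged = {"assets": [], "jobs": [], "sensors": [], "schedules": []}
    for fn in component_fns:
        components = fn()
        for key in merged:
            merged[key].extend(components.get(key, []))
    return merged

defs = collect_components(sec_components, quickbooks_components)
```

In the real collector the merged lists would feed a Dagster `Definitions` object; the merge logic is the part the sketch illustrates.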
Dagster runs on ECS Fargate (orchestration only). Heavy compute happens on the shared master via Graph API.
```
┌────────────────────────────────────────────────────────────────┐
│ DAGSTER (ECS Fargate) - Orchestration                          │
│  ├── Daemon: 512 CPU / 1024 MB (singleton, runs migrations)    │
│  ├── Webserver: 512 CPU / 1024 MB (1-3 replicas, auto-scale)   │
│  ├── Capacity: 80% SPOT + 20% On-Demand                        │
│  └── Jobs: Download, process Arelle, call Graph API            │
├────────────────────────────────────────────────────────────────┤
│ SHARED MASTER (graph-ladybug.yaml) - Heavy Compute             │
│  ├── EC2 r7g.large with dynamic EBS (Volume Manager Lambda)    │
│  ├── DuckDB staging + LadybugDB materialization                │
│  ├── node_type: shared_master (in DynamoDB registry)           │
│  └── Single source of truth for shared repositories           │
├────────────────────────────────────────────────────────────────┤
│ SHARED REPLICAS (graph-ladybug-replicas.yaml)                  │
│  ├── ASG: Min=2, Max=10, TargetTracking on CPU                 │
│  ├── S3 download: Pull .lbug + .duckdb from S3 on boot         │
│  ├── Instance: r7g.medium (read-only, smaller than master)     │
│  └── ALB on port 8001, health check /status                    │
└────────────────────────────────────────────────────────────────┘
```
Key insight: Dagster orchestrates (lightweight), Graph API computes (heavy). This keeps Dagster simple and reuses existing graph infrastructure.
| Task | Dagster (Fargate) | Graph API (Shared Master) |
|---|---|---|
| Download XBRL | ✓ Orchestrates | - |
| Process with Arelle | ✓ Runs on Fargate | - |
| Parquet to S3 | ✓ Handles | - |
| DuckDB staging | - | ✓ Graph API handles |
| LadybugDB materialization | - | ✓ Graph API handles |
| S3 publish (.lbug/.duckdb) | ✓ Orchestrates | ✓ Uploads files |
| Replica refresh | ✓ AWS API calls | - |
1. Dagster sensor: `sec_post_materialize_publish_sensor`
   - Publish `.lbug` to S3 (LadybugDB graph database)
   - Publish `.duckdb` to S3 (DuckDB with embeddings for vector search)
2. Dagster sensor: triggers `shared_repository_refresh_replicas_job`
   - Rolling ASG instance refresh (min_healthy=100%, max_healthy=200%)
3. New replicas boot alongside old ones
   - Download `.lbug` + `.duckdb` from S3 (~15 min for ~85GB)
   - Start Graph API, pass health check
   - Register with ALB, old instance terminated
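The rolling refresh in step 2 could be triggered with a boto3 call along these lines. This is a sketch, not the actual job code: the function names and the ASG name argument are illustrative, and boto3 is imported lazily so the pure helper stays importable without AWS dependencies.

```python
def refresh_preferences(min_healthy=100, max_healthy=200):
    """Preferences for a rolling refresh: keep all current replicas in
    service while new ones boot (min 100%), and allow up to double
    capacity during the overlap window (max 200%)."""
    return {
        "MinHealthyPercentage": min_healthy,
        "MaxHealthyPercentage": max_healthy,
    }

def start_replica_refresh(asg_name):
    """Kick off an EC2 Auto Scaling instance refresh for the replica ASG."""
    import boto3  # lazy import: only needed when actually triggering AWS

    client = boto3.client("autoscaling")
    resp = client.start_instance_refresh(
        AutoScalingGroupName=asg_name,
        Preferences=refresh_preferences(),
    )
    return resp["InstanceRefreshId"]
```

With `MinHealthyPercentage=100` and `MaxHealthyPercentage=200`, the ASG launches replacement instances alongside the old ones and only terminates an old replica once its replacement passes health checks, matching step 3 above.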
Shared repositories (like SEC) are defined by adapter manifests. Each manifest declares everything about a repo: identity, data source, schema, rate limits, plans/pricing, endpoint access, and credit costs.
To add a new shared repository:
- Create `adapters/{name}/manifest.py` with a `SharedRepositoryManifest` (see `adapters/sec/manifest.py` as a template)
- Add one import + `_register()` call to `config/shared_repositories.py` → `_load_manifests()`
No billing config files, no DB migrations, no hardcoded lists to update.
Registry (`config/shared_repositories.py`): Lazy-loads manifests on first access. Provides the query API used by billing, middleware, and operations: `get_manifest()`, `get_plan_details()`, `get_rate_limits()`, `is_shared_repository()`, etc.
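A minimal sketch of how such a lazy-loading registry can work, assuming a simplified `SharedRepositoryManifest` with only a couple of fields (the real dataclass in `adapters/base.py` also carries data source, schema, endpoints, and credit costs):

```python
from dataclasses import dataclass, field

@dataclass
class SharedRepositoryManifest:
    name: str
    plans: dict = field(default_factory=dict)
    rate_limits: dict = field(default_factory=dict)

_manifests = None  # populated on first query, never at import time

def _load_manifests():
    registry = {}

    def _register(manifest):
        registry[manifest.name] = manifest

    # One import + one _register() call per adapter goes here:
    _register(SharedRepositoryManifest(name="sec", plans={"starter": {}}))
    return registry

def _registry():
    global _manifests
    if _manifests is None:  # lazy-load on first access
        _manifests = _load_manifests()
    return _manifests

def get_manifest(name):
    return _registry()[name]

def is_shared_repository(name):
    return name in _registry()
```

Because adapters only touch the registry through `_register()`, adding a repository is exactly the two-step change described above, with no billing config files or migrations.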
See PR #308 and RFC #191 for the broader extensible adapter design.
The `custom_*/` namespace is reserved for fork additions that won't conflict with upstream updates:
- Create `adapters/custom_myservice/` with `client/`, `processors/`, and `pipeline/`
- Implement `pipeline/__init__.py` with `get_dagster_components()` returning `{"assets": [...], "jobs": [...], "sensors": [...], "schedules": [...]}`
- Import and collect in `dagster/definitions.py` (see the `# === FORK` comment)
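A hypothetical `adapters/custom_myservice/pipeline/__init__.py` might look like the following. The asset and job names are placeholders, and strings stand in for real Dagster objects; only the `get_dagster_components()` return shape comes from the guide.

```python
# Placeholder component lists; in a real adapter these would be
# Dagster asset/job/sensor/schedule objects defined in this package.
_ASSETS = ["myservice_raw_data", "myservice_processed_data"]
_JOBS = ["myservice_sync_job"]

def get_dagster_components():
    """Entry point discovered and collected by dagster/definitions.py."""
    return {
        "assets": list(_ASSETS),
        "jobs": list(_JOBS),
        "sensors": [],
        "schedules": [],
    }
```

Keeping all four keys present, even when empty, lets the collector merge every adapter uniformly without special-casing.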
See Adapters README for details and RFC #191 for the broader extensible adapter design.
All pipelines use DuckDB staging regardless of source:
```
┌───────────────────────────────────────────────────────────────────┐
│                 UNIFIED INGESTION (ALL PIPELINES)                 │
├───────────────────────────────────────────────────────────────────┤
│                                                                   │
│   ANY Pipeline Output (Parquet in S3)                             │
│        │                                                          │
│        ▼                                                          │
│   1. GraphFile.create()                                           │
│      Register file in PostgreSQL (provenance tracking)            │
│        │                                                          │
│        ▼                                                          │
│   2. client.create_table(s3_files=[...])                          │
│      Load parquet → DuckDB staging table (queryable for debug)    │
│        │                                                          │
│        ▼                                                          │
│   3. client.materialize_table(file_ids=[...])                     │
│      DuckDB → LadybugDB (incremental by file_id)                  │
│        │                                                          │
│        ▼                                                          │
│   4. GraphFile.mark_graph_ingested()                              │
│      Track completion in PostgreSQL                               │
│                                                                   │
└───────────────────────────────────────────────────────────────────┘
```
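In adapter code, the four steps reduce to a short driver. This is a sketch, not the actual ingestion helper: `GraphFile` and the Graph API client are passed in as stand-ins, and any signature details beyond the method names in the diagram are assumptions.

```python
def ingest_parquet(graph_file_model, client, s3_files):
    """Run the unified ingestion steps in order for one batch of
    Parquet files, returning the file ids that were materialized."""
    # 1. Register each file in PostgreSQL for provenance tracking
    file_ids = [graph_file_model.create(path) for path in s3_files]

    # 2. Load parquet into a DuckDB staging table (queryable for debug)
    client.create_table(s3_files=s3_files)

    # 3. Materialize DuckDB -> LadybugDB, incremental by file_id
    client.materialize_table(file_ids=file_ids)

    # 4. Track completion in PostgreSQL
    for file_id in file_ids:
        graph_file_model.mark_graph_ingested(file_id)

    return file_ids
```

Because every pipeline funnels through this same sequence, a failure between steps 2 and 4 leaves the staging table queryable and the PostgreSQL record un-ingested, which is what makes the flow debuggable and safely retryable.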
Why DuckDB:
- Validation layer before graph ingestion
- Handles S3 Parquet natively (httpfs extension)
- Same pattern for SEC, QuickBooks, Plaid, custom
- Scale proof: If it handles SEC (1TB+), it handles any company's data
Graph API endpoints:
- `/databases/{graph_id}/tables` - Create staging table
- `/databases/{graph_id}/tables/query` - SQL validation
- `/databases/{graph_id}/tables/{name}/ingest` - Materialize to graph
| Aspect | Shared Repos (SEC) | Company Graphs (QuickBooks) |
|---|---|---|
| Write pattern | Nightly incremental + rebuild | Incremental daily |
| Read pattern | High volume, cacheable | Low volume, fresh |
| Scaling | Horizontal (replicas) | Vertical (bigger instance) |
| Data size | 1TB+ | 1-100GB |
| DuckDB role | Transport (full rebuild) | Staging (incremental) |
| Component | Approach | EBS Pattern | Reason |
|---|---|---|---|
| Dagster (all jobs) | ECS Fargate | None (ephemeral) | Orchestration only. Heavy compute via Graph API. |
| Shared master | Raw EC2 + ASG | Dynamic (Vol Manager) | Graph API handles DuckDB + LadybugDB. |
| Customer graph writers | Raw EC2 + ASG | Dynamic per-customer | Volume Manager assigns EBS per customer allocation. |
| Shared replicas | Raw EC2 + ASG | S3 download at boot | All replicas identical. Download .lbug/.duckdb from S3. |
Dagster's role is orchestration, not compute:
- Download XBRL files (small, fits in Fargate's 200GB ephemeral)
- Process with Arelle → parquet (CPU-bound, Fargate handles)
- Upload parquet to S3 (network I/O)
- Call Graph API for materialization (HTTP call, master does work)
- AWS API calls for replica refresh (lightweight)
Benefits:
- Simpler infrastructure (no EC2 capacity provider)
- No EBS management for Dagster
- Scale to zero automatically
- Cost efficient (pay only for orchestration time)
Graph instances need:
- Large EBS volumes (100GB-1TB+)
- Volume Manager Lambda coordination (dynamic assignment)
- Long-running processes (Graph API serving requests)
ECS doesn't help here - core complexity (EBS management) remains regardless.
| Decision | Rationale |
|---|---|
| Dagster for all orchestration | One system (eliminated Celery), one UI for pipelines + billing + infrastructure |
| Fargate-only Dagster | Dagster orchestrates, Graph API computes; no EC2 capacity provider complexity |
| DuckDB-only staging | Standardized validation layer for all pipelines |
| Two pipeline patterns | Arelle for XBRL semantics (SEC), dbt for API JSON transforms (QB/Plaid) |
| GitHub repos for adapters | Customization is the value prop; dbt projects don't fit pip |
| S3 publish for replicas | Dagster publishes to S3, triggers rolling refresh; replicas download on boot |
Some functions must stay as Lambda (event-driven, not orchestration):
| Lambda | Reason |
|---|---|
| `graph_volume_manager.py` | Called synchronously from EC2 userdata during boot |
| `graph_volume_monitor.py` | Triggered by CloudWatch alarms (SNS) |
| `graph_volume_detachment.py` | ASG lifecycle hook handler |
| `postgres_rotation.py` | AWS Secrets Manager rotation |
| `valkey_rotation.py` | AWS Secrets Manager rotation |
| `graph_api_rotation.py` | AWS Secrets Manager rotation |
| `postgres_init.py` | RDS event handler (secrets + database creation) |
- Dagster README: `/robosystems/dagster/README.md`
- SEC Pipeline README: `/robosystems/adapters/sec/pipeline/README.md`
- Adapter READMEs: `/robosystems/adapters/*/README.md`
- CloudFormation: `/cloudformation/dagster.yaml`, `/cloudformation/graph-ladybug-replicas.yaml`
© 2025 RFS LLC