This repository details the architecture of a cloud-native, event-driven Node.js/TypeScript backend platform built on Express.js and PostgreSQL. It serves as the authoritative data layer for user profiles, consumption tracking, and health metrics, orchestrating multi-device data synchronization, high-throughput health ingestion, asynchronous projection pipelines, and AI-powered analytics.
We designed this system for the real world, not the happy path. It is engineered to handle unreliable mobile networks, long offline periods, bursty health data ingestion, and concurrent multi-device modifications. The architecture prioritizes data integrity, idempotency, and explicit consistency tracking, so that the platform stays correct, observable, and recoverable under each of these failure conditions.
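| Category | Technology |
|---|---|
| Runtime & Language | Node.js, TypeScript |
| API Framework | Express.js |
| Database & ORM | PostgreSQL, Prisma |
| Cache & Background Jobs | Redis, BullMQ |
| Authentication & Security | AWS Cognito (JWT), Google OAuth |
| Real-Time & Sync | Socket.IO |
| Validation & Observability | Zod, OpenTelemetry |
| Cloud Infrastructure | AWS (Secrets Manager) |
| Testing & API | |
| CI/CD | |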
Every domain event is written to an OutboxEvent table within the same database transaction as the primary data change. A separate processor claims, routes, and completes events. If the app crashes between commit and delivery, events survive in the outbox. The dual-write problem is structurally eliminated.
Goal: No data change silently drops its downstream side effects.
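The outbox flow above can be sketched with an in-memory stand-in for PostgreSQL. Everything here is hypothetical (InMemoryDb, logConsumption, processOutbox, the consumption.created event name); the sketch only shows the shape of the guarantee: the domain row and its event commit or roll back together, and the processor claims PENDING events before dispatching them. A production processor on PostgreSQL would typically claim rows with SELECT ... FOR UPDATE SKIP LOCKED and reclaim stale PROCESSING rows after a lease timeout, both omitted here.

```typescript
// Hypothetical in-memory stand-in for PostgreSQL; table and function names are illustrative.
type OutboxStatus = "PENDING" | "PROCESSING" | "COMPLETED";

interface OutboxEvent {
  id: number;
  eventType: string;
  payload: Record<string, unknown>;
  status: OutboxStatus;
  retryCount: number;
}

interface Tables {
  consumptions: Array<{ id: number; userId: string; amount: number }>;
  outbox: OutboxEvent[];
}

class InMemoryDb {
  tables: Tables = { consumptions: [], outbox: [] };
  private nextId = 1;

  // Runs `work` against a deep copy and swaps it in only if `work` returns normally,
  // mimicking COMMIT on success and ROLLBACK on a thrown error.
  transaction<T>(work: (tx: Tables, newId: () => number) => T): T {
    const draft = JSON.parse(JSON.stringify(this.tables)) as Tables;
    let id = this.nextId;
    const result = work(draft, () => id++);
    this.tables = draft;
    this.nextId = id;
    return result;
  }
}

// Write path: the domain row and its OutboxEvent are committed together or not at all.
function logConsumption(db: InMemoryDb, userId: string, amount: number, crash = false): void {
  db.transaction((tx, newId) => {
    const row = { id: newId(), userId, amount };
    tx.consumptions.push(row);
    tx.outbox.push({
      id: newId(),
      eventType: "consumption.created",
      payload: { consumptionId: row.id, userId },
      status: "PENDING",
      retryCount: 0,
    });
    if (crash) throw new Error("simulated crash before commit");
  });
}

// Processor: claim PENDING events (so other pollers skip them), dispatch, then complete.
function processOutbox(db: InMemoryDb, handle: (e: OutboxEvent) => void, batchSize = 10): number {
  const claimed = db.transaction((tx) => {
    const batch = tx.outbox.filter((e) => e.status === "PENDING").slice(0, batchSize);
    batch.forEach((e) => {
      e.status = "PROCESSING";
    });
    return batch.map((e) => e.id);
  });
  for (const id of claimed) {
    db.transaction((tx) => {
      const event = tx.outbox.find((e) => e.id === id);
      if (!event) return;
      try {
        handle(event);
        event.status = "COMPLETED";
      } catch {
        // At-least-once: a failed event returns to PENDING for a later poll.
        event.retryCount += 1;
        event.status = "PENDING";
      }
    });
  }
  return claimed.length;
}
```

Because delivery is at-least-once, downstream handlers must themselves be idempotent; the next two principles cover that side.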
Optimistic locking detects concurrent modifications. Conflict resolution is config-driven: field-level merge policies (such as LOCAL_WINS, MONOTONIC, and MAX_VALUE) are declared in a shared contract package consumed by both backend and mobile. No ad-hoc merge logic exists inside service methods.
Goal: Every conflict outcome is reproducible, auditable, and testable in isolation.
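A minimal sketch of config-driven, field-level merging, assuming policies are plain data keyed by field name. The FieldPolicy shape, the SESSION_FIELD_POLICIES table and its fields, the SERVER_WINS fallback for undeclared fields, and the semantics chosen for MONOTONIC (a value may only advance along a declared order) are assumptions for illustration, not the contents of the real shared contract package.

```typescript
// Hypothetical contract shapes; the real ENTITY_CONFLICT_CONFIG may use different names.
type FieldPolicy =
  | { kind: "LOCAL_WINS" }
  | { kind: "SERVER_WINS" }
  | { kind: "MAX_VALUE" }
  | { kind: "MONOTONIC"; order: readonly string[] };

type Entity = Record<string, unknown>;

// Declared once in a shared package and imported by both backend and mobile.
const SESSION_FIELD_POLICIES: Record<string, FieldPolicy> = {
  notes: { kind: "LOCAL_WINS" },
  consumptionCount: { kind: "MAX_VALUE" },
  status: { kind: "MONOTONIC", order: ["ACTIVE", "ENDED"] },
};

function resolveField(policy: FieldPolicy, server: unknown, local: unknown): unknown {
  switch (policy.kind) {
    case "LOCAL_WINS":
      return local === undefined ? server : local;
    case "SERVER_WINS":
      return server;
    case "MAX_VALUE":
      if (typeof server === "number" && typeof local === "number") return Math.max(server, local);
      return typeof local === "number" ? local : server;
    case "MONOTONIC": {
      // A lifecycle value may only move forward in the declared order.
      const order = policy.order;
      const rank = (v: unknown): number => order.indexOf(String(v));
      return rank(local) > rank(server) ? local : server;
    }
  }
}

// Pure function: the same (server, local, policies) always yields the same merged entity.
function mergeEntity(server: Entity, local: Entity, policies: Record<string, FieldPolicy>): Entity {
  const merged: Entity = {};
  for (const field of Object.keys({ ...server, ...local })) {
    const policy: FieldPolicy = policies[field] ?? { kind: "SERVER_WINS" };
    merged[field] = resolveField(policy, server[field], local[field]);
  }
  return merged;
}
```

Because mergeEntity depends only on its arguments, each conflict outcome can be unit-tested without a database or a sync session.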
Request-level idempotency uses requestId + payloadHash in a persistent tracking table. Sample-level deduplication relies on composite unique constraints. Sync operations are tracked by clientSyncOperationId with cached resultPayload. Retries are always safe.
Goal: Network unreliability, client bugs, and background job retries never produce duplicate data.
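A sketch of request-level idempotency, assuming the payload hash is SHA-256 over a key-sorted JSON encoding so that semantically identical bodies hash the same. IdempotencyGuard, canonicalize, and the in-memory Map are hypothetical stand-ins for the persistent tracking table; the real service may hash differently or cache full HTTP responses rather than values.

```typescript
import { createHash } from "node:crypto";

type IngestStatus = "PROCESSING" | "COMPLETED";

interface IngestRecord {
  payloadHash: string;
  status: IngestStatus;
  result?: unknown;
}

// Key-sorted JSON so that {a:1,b:2} and {b:2,a:1} produce the same hash.
function canonicalize(value: unknown): string {
  if (Array.isArray(value)) return `[${value.map(canonicalize).join(",")}]`;
  if (value !== null && typeof value === "object") {
    const obj = value as Record<string, unknown>;
    const parts = Object.keys(obj)
      .sort()
      .map((k) => `${JSON.stringify(k)}:${canonicalize(obj[k])}`);
    return `{${parts.join(",")}}`;
  }
  return JSON.stringify(value);
}

function payloadHash(body: unknown): string {
  return createHash("sha256").update(canonicalize(body)).digest("hex");
}

class IdempotencyGuard {
  // Stand-in for a persistent table keyed by requestId.
  private records = new Map<string, IngestRecord>();

  run<T>(requestId: string, body: unknown, work: () => T): { replayed: boolean; result: T } {
    const hash = payloadHash(body);
    const existing = this.records.get(requestId);
    if (existing) {
      // Same requestId with a different body is a client bug, not a retry.
      if (existing.payloadHash !== hash) throw new Error("requestId reused with a different payload");
      if (existing.status === "PROCESSING") throw new Error("request still in progress");
      return { replayed: true, result: existing.result as T };
    }
    this.records.set(requestId, { payloadHash: hash, status: "PROCESSING" });
    try {
      const result = work();
      this.records.set(requestId, { payloadHash: hash, status: "COMPLETED", result });
      return { replayed: false, result };
    } catch (err) {
      this.records.delete(requestId); // allow a clean retry after a failed attempt
      throw err;
    }
  }
}
```

Rejecting a reused requestId with a mismatched hash, instead of replaying, avoids silently returning the result of a different request.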
Raw HealthSample ingestion is the write path: high throughput, append-only, minimal blocking. Derived read models (daily rollups, sleep summaries, session impacts, product impacts) are computed asynchronously via event-driven projection handlers, each with independent checkpoints and watermark-based freshness tracking.
Goal: API reads are fast (pre-computed), writes are durable (transactional), and staleness is explicit (watermarks).
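Watermark-based freshness can be sketched as two small rules, assuming each read model row stores the user watermark it was computed from. ReadModelRow, readWithFreshness, upsertIfNewer, and the FRESH/STALE/MISSING labels are hypothetical; sequence numbers are plain numbers here for brevity, whereas Prisma surfaces BigInt columns as bigint.

```typescript
interface ReadModelRow<T> {
  data: T;
  sourceWatermark: number; // user watermark the row was computed from
}

type Freshness = "FRESH" | "STALE" | "MISSING";

interface ProjectionRead<T> {
  data: T | null;
  freshness: Freshness;
  lag: number; // ingest sequences not yet reflected in the row
}

// Read side: compare the row's sourceWatermark with the user's current watermark.
function readWithFreshness<T>(row: ReadModelRow<T> | undefined, currentWatermark: number): ProjectionRead<T> {
  if (!row) return { data: null, freshness: "MISSING", lag: currentWatermark };
  const lag = Math.max(0, currentWatermark - row.sourceWatermark);
  return { data: row.data, freshness: lag > 0 ? "STALE" : "FRESH", lag };
}

// Write side: a projection handler never overwrites a row computed from a newer
// watermark, so a slow or replayed handler cannot regress the read model.
function upsertIfNewer<T>(store: Map<string, ReadModelRow<T>>, key: string, incoming: ReadModelRow<T>): boolean {
  const existing = store.get(key);
  if (existing && existing.sourceWatermark >= incoming.sourceWatermark) return false;
  store.set(key, incoming);
  return true;
}
```

The read path can then serve a pre-computed row immediately and report its lag, instead of blocking on recomputation or hiding the staleness.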
bootstrap.ts is the single composition root. Every service receives its dependencies through constructor injection. No service locators, no ambient singletons, no hidden coupling. The entire dependency graph is visible in one file.
Goal: Maximum testability, minimal coupling, and complete transparency of system wiring.
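A trimmed-down sketch of the composition-root idea. The services (ConsoleLogger, InMemoryOutbox, SessionService) and the overrides parameter are hypothetical illustrations, not the contents of bootstrap.ts; the point is that construction and wiring happen in one function and every dependency is a constructor argument.

```typescript
interface Logger {
  info(msg: string): void;
}

interface OutboxWriter {
  record(eventType: string, payload: unknown): void;
}

class ConsoleLogger implements Logger {
  info(msg: string): void {
    console.log(msg);
  }
}

class InMemoryOutbox implements OutboxWriter {
  readonly events: Array<{ eventType: string; payload: unknown }> = [];
  record(eventType: string, payload: unknown): void {
    this.events.push({ eventType, payload });
  }
}

class SessionService {
  private readonly outbox: OutboxWriter;
  private readonly logger: Logger;

  // Dependencies arrive through the constructor; nothing is looked up globally.
  constructor(outbox: OutboxWriter, logger: Logger) {
    this.outbox = outbox;
    this.logger = logger;
  }

  endSession(sessionId: string): void {
    this.outbox.record("session.ended", { sessionId });
    this.logger.info(`session ${sessionId} ended`);
  }
}

interface Container {
  logger: Logger;
  outbox: OutboxWriter;
  sessionService: SessionService;
}

// Composition root: the one place that constructs services and wires the graph.
// Tests pass overrides instead of patching module-level singletons.
function bootstrap(overrides: Partial<Container> = {}): Container {
  const logger = overrides.logger ?? new ConsoleLogger();
  const outbox = overrides.outbox ?? new InMemoryOutbox();
  const sessionService = overrides.sessionService ?? new SessionService(outbox, logger);
  return { logger, outbox, sessionService };
}
```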
The sync subsystem orchestrates bidirectional data synchronization between multiple offline-first mobile clients and the backend. Users make changes while offline; all local changes eventually reconcile with the server's authoritative state through deterministic conflict resolution. The engine uses cursor-based pagination, optimistic locking, and a transactional outbox to guarantee eventual consistency with zero data loss.
The push phase dequeues local outbox commands, performs FK translation and dependency ordering, and submits batched changes to the backend. The backend acquires a distributed lock, checks idempotency, detects conflicts via version comparison, and applies changes atomically using config-driven merge rules. The pull phase fetches incremental changes via composite cursors, applies them locally, then validates relational integrity before committing cursor advancement.
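The push-side decision for a single change can be sketched as follows, following the rule in the diagram below (a conflict exists when the server version is greater than the client's base version). The types, the ENTITY_STRATEGY table, and the choice not to bump the version on SERVER_WINS are assumptions; locking, the idempotency check, the surrounding transaction, and SyncChange recording are omitted.

```typescript
type Strategy = "SERVER_WINS" | "CLIENT_WINS" | "MERGE";
type Data = Record<string, unknown>;

interface Versioned { id: string; version: number; data: Data }
interface PushChange { entityType: string; id: string; baseVersion: number; data: Data }

type PushOutcome =
  | { kind: "APPLIED"; entity: Versioned }
  | { kind: "CONFLICT_RESOLVED"; strategy: Strategy; entity: Versioned };

// Hypothetical stand-in for a per-entity-type strategy table such as ENTITY_CONFLICT_CONFIG.
const ENTITY_STRATEGY: Record<string, Strategy> = { session: "MERGE", device: "SERVER_WINS" };

function applyPush(server: Versioned | undefined, change: PushChange, merge: (s: Data, l: Data) => Data): PushOutcome {
  // No conflict: the entity is new, or the client edited the version the server still holds.
  if (server === undefined || server.version <= change.baseVersion) {
    const version = (server ? server.version : 0) + 1;
    const data = { ...(server ? server.data : {}), ...change.data };
    return { kind: "APPLIED", entity: { id: change.id, version, data } };
  }
  // Conflict: the server moved past the client's base version while the client was offline.
  const strategy: Strategy = ENTITY_STRATEGY[change.entityType] ?? "SERVER_WINS";
  let data: Data;
  if (strategy === "CLIENT_WINS") data = { ...server.data, ...change.data };
  else if (strategy === "MERGE") data = merge(server.data, change.data);
  else data = server.data;
  // SERVER_WINS leaves the row untouched, so its version does not move.
  const version = strategy === "SERVER_WINS" ? server.version : server.version + 1;
  return { kind: "CONFLICT_RESOLVED", strategy, entity: { id: server.id, version, data } };
}
```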
Sync Engine Sequence Diagram
```mermaid
sequenceDiagram
    participant Client as Mobile Client
    participant API as API Gateway
    participant Sync as SyncService
    participant SS as SyncState
    participant SC as SyncChange
    participant EH as EntityHandler
    participant DB as PostgreSQL
    Note over Client,API: Pull Cycle
    Client->>API: GET /sync/changes?cursor=...&entityTypes=...
    API->>Sync: getIncrementalChanges(userId, cursor, entityTypes)
    Sync->>SS: acquireSyncLock(userId, deviceId)
    Sync->>SC: getChangesSince(cursor, entityTypes)
    Sync->>Sync: buildCompositeCursor(changes)
    Sync->>SS: updateCursorPosition + releaseLock
    API-->>Client: {changes[], nextCursor, hasMore}
    Note over Client,API: Push Cycle
    Client->>API: POST /sync/push {changes[], syncOperationId}
    API->>Sync: processPushSync(userId, changes)
    Sync->>SS: acquireSyncLock(userId, deviceId)
    Sync->>DB: findByClientSyncOperationId (idempotency check)
    loop Each Change
        Sync->>EH: fetchServerVersion(entityId)
        alt Server version > Client version (Conflict)
            Sync->>Sync: resolveConflictStrategy(ENTITY_CONFLICT_CONFIG)
            alt MERGE Strategy
                Sync->>EH: merge(server, local, fieldPolicies)
                Sync->>DB: update(mergedData)
            else SERVER_WINS / CLIENT_WINS
                Sync->>DB: update(winningData)
            end
        else No Conflict
            Sync->>EH: create/update/delete(changeData)
        end
        Sync->>SC: createSyncChangeRecord(change)
    end
    Sync->>DB: markAsCompleted(SyncOperation, resultPayload)
    Sync->>SS: releaseSyncLock
    API-->>Client: {successful[], conflicts[], failed[]}
```
Guarantee: Cursors never advance past corrupted state. Data integrity is enforced structurally, not by convention.
For full implementation details -- conflict resolution policies, cursor semantics, and failure handling -- see docs/features/multi-device-sync-engine.md.
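The pull-side rule that cursors never advance past corrupted state can be sketched as a staging step followed by an integrity check. The shapes (PulledChange, PullPage, LocalStore) and the refs list are hypothetical, and the sketch assumes a parent entity always arrives in the same page as, or an earlier page than, anything that references it; a real client would stage into a local database transaction rather than a Map.

```typescript
interface Ref { entityType: string; id: string }
interface PulledChange { entityType: string; id: string; refs: Ref[] }
interface PullPage { changes: PulledChange[]; nextCursor: string; hasMore: boolean }
interface LocalStore { cursor: string | null; entities: Map<string, PulledChange> }

const refKey = (r: Ref): string => `${r.entityType}:${r.id}`;

// Stage the page, verify every foreign reference resolves, and only then commit both
// the data and the new cursor. On failure nothing changes and the old cursor is kept.
function applyPullPage(store: LocalStore, page: PullPage): { advanced: boolean; missing: string[] } {
  const staged = new Map(store.entities);
  for (const change of page.changes) staged.set(refKey(change), change);

  const missing: string[] = [];
  for (const change of page.changes) {
    for (const ref of change.refs) {
      if (!staged.has(refKey(ref))) missing.push(refKey(ref));
    }
  }
  if (missing.length > 0) return { advanced: false, missing };

  store.entities = staged;
  store.cursor = page.nextCursor;
  return { advanced: true, missing };
}
```

Because the cursor and the data are committed together, a rejected page is simply pulled again on the next cycle from the same position.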
The health pipeline is divided into two decoupled phases: a high-throughput ingestion write path that validates, deduplicates, and persists raw health samples with transactional outbox events, and an async projection pipeline that transforms raw data into pre-computed read models with watermark-based freshness tracking.
End-to-End Ingestion Sequence Diagram
```mermaid
sequenceDiagram
    participant Client as Mobile Client
    participant API as API Gateway
    participant HSS as HealthSampleService
    participant HQS as IngestQueueService
    participant Worker as BullMQ Worker
    participant DB as PostgreSQL
    participant Outbox as OutboxService
    participant OPS as OutboxProcessor
    participant HPC as ProjectionCoordinator
    participant PH as ProjectionHandlers
    Client->>API: POST /health/samples/batch-upsert
    API->>HSS: batchUpsertSamples(userId, samples, requestId, payloadHash)
    Note over HSS: Layer 1: Request Idempotency
    HSS->>DB: checkRequestIdempotency(requestId, payloadHash)
    alt Idempotency Hit
        DB-->>HSS: Cached BatchUpsertResult
        HSS-->>Client: 200 OK (cached response)
    else New Request
        HSS->>DB: createIngestRequest(PROCESSING)
        Note over HSS: Privacy & Validation
        HSS->>HSS: assertHealthUploadAllowed(privacySettings)
        HSS->>HSS: filterBlockedMetrics + validate + normalize
        Note over HSS: Layer 2: Sample Deduplication
        HSS->>DB: batchUpsertWithIdempotency (ON CONFLICT DO UPDATE)
        HSS->>DB: incrementWatermark(sequenceNumber)
        Note over HSS,Outbox: Atomic Event Recording
        HSS->>Outbox: createOutboxCallback (SAME transaction)
        Outbox->>DB: INSERT health.samples.changed OutboxEvent
        HSS-->>Client: 200 OK / 207 Multi-Status
    end
    Note over OPS: Async Event Processing
    OPS->>DB: claimBatch(PENDING outbox events)
    OPS->>HPC: processHealthSamplesChanged(event)
    loop Each ProjectionHandler
        HPC->>DB: tryAcquireProjectionLease
        HPC->>PH: handle(payload)
        PH->>DB: Query raw HealthSamples
        PH->>DB: Upsert derived read model
        HPC->>DB: markCompleted / markFailed
    end
    OPS->>DB: markAsCompleted(OutboxEvent)
```
Guarantee: Zero dual-writes. Raw samples and outbox events are committed in a single atomic transaction. Downstream projections use watermark sequence numbers to detect and recover from staleness.
For full implementation details -- idempotency layers, projection checkpoints, and watermark freshness -- see docs/features/idempotent-health-ingestion.md.
The backend separates work into two distinct asynchronous pipelines: a BullMQ job-processing subsystem for compute-heavy background tasks, and a transactional outbox pipeline for event-driven projections. Both run in a dedicated Worker Service process, sharing no state with the Web Service except through PostgreSQL and Redis.
Guarantee: Background jobs survive worker crashes via durable Redis queues. Outbox events survive application crashes via transactional persistence. Both pipelines are idempotent and retry-safe.
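Retry safety also depends on how retries are spaced. A sketch of exponential backoff with "full jitter" (wait a random time in [0, min(cap, base * 2^attempt))), assuming callers classify errors as transient or not. RetryOptions and retryWithBackoff are hypothetical names, not the API of retry.util.ts; random and sleep are injectable so the behavior can be tested deterministically.

```typescript
interface RetryOptions {
  maxAttempts: number;
  baseDelayMs: number;
  maxDelayMs: number;
  isTransient?: (err: unknown) => boolean; // default: every error is retried
  random?: () => number; // default: Math.random
  sleep?: (ms: number) => Promise<void>; // default: setTimeout
}

// Full jitter: spreads retries from many clients so they do not hit the dependency in lockstep.
function backoffDelay(attempt: number, opts: RetryOptions): number {
  const ceiling = Math.min(opts.maxDelayMs, opts.baseDelayMs * 2 ** attempt);
  return Math.floor((opts.random ?? Math.random)() * ceiling);
}

async function retryWithBackoff<T>(op: (attempt: number) => Promise<T>, opts: RetryOptions): Promise<T> {
  const sleep = opts.sleep ?? ((ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms)));
  for (let attempt = 0; ; attempt++) {
    try {
      return await op(attempt);
    } catch (err) {
      const transient = opts.isTransient ? opts.isTransient(err) : true;
      // Give up on permanent errors or once the attempt budget is spent.
      if (!transient || attempt + 1 >= opts.maxAttempts) throw err;
      await sleep(backoffDelay(attempt, opts));
    }
  }
}
```

Retrying is only safe because the operations being retried are idempotent; backoff limits load, it does not prevent duplicates.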
Problem: Ensuring domain events are reliably emitted and processed from PostgreSQL, even with crashes or network failures.
Solution: The OutboxService implements the Transactional Outbox Pattern. Events are written to a dedicated OutboxEvent table within the same database transaction as the primary data change. An idempotent OutboxProcessorService polls, processes, and marks events as COMPLETED. Health events are further grouped for efficiency by outbox-coalescing.ts.
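One plausible shape for coalescing, assuming each health.samples.changed payload carries a user, the affected metrics, a time window, and the watermark after the write. The SamplesChangedEvent fields and coalesceHealthEvents are hypothetical; the real outbox-coalescing.ts may group differently. The idea is that one projection run per user over the widened window replaces many small runs, and all source events are then completed together.

```typescript
interface SamplesChangedEvent {
  id: string;
  userId: string;
  metricCodes: string[];
  rangeStart: number; // epoch ms of the earliest affected sample
  rangeEnd: number; // epoch ms of the latest affected sample
  watermark: number; // user's watermark sequence after the write
}

interface CoalescedEvent extends Omit<SamplesChangedEvent, "id"> {
  sourceEventIds: string[]; // all of these are marked COMPLETED once the merged work succeeds
}

function coalesceHealthEvents(events: SamplesChangedEvent[]): CoalescedEvent[] {
  const byUser = new Map<string, CoalescedEvent>();
  for (const e of events) {
    const acc = byUser.get(e.userId);
    if (!acc) {
      byUser.set(e.userId, {
        userId: e.userId,
        metricCodes: [...e.metricCodes],
        rangeStart: e.rangeStart,
        rangeEnd: e.rangeEnd,
        watermark: e.watermark,
        sourceEventIds: [e.id],
      });
      continue;
    }
    // Union of metrics, widest time window, highest watermark.
    for (const m of e.metricCodes) if (!acc.metricCodes.includes(m)) acc.metricCodes.push(m);
    acc.rangeStart = Math.min(acc.rangeStart, e.rangeStart);
    acc.rangeEnd = Math.max(acc.rangeEnd, e.rangeEnd);
    acc.watermark = Math.max(acc.watermark, e.watermark);
    acc.sourceEventIds.push(e.id);
  }
  return Array.from(byUser.values());
}
```

The trade-off is that a widened window may recompute some days that no individual event touched, in exchange for far fewer projection runs during bursts.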
Problem: Reconciling complex, divergent local client states from potentially long offline periods with the server, handling version conflicts, and maintaining data integrity across all synced entities.
Solution: The SyncService orchestrates a cursor-based, bidirectional sync. Optimistic locking with version fields detects conflicts. Resolution uses configurable strategies (LAST_WRITE_WINS, MERGE) with field-level policies from shared contracts. Each entity type has a dedicated handler for specific merge logic.
Problem: Reliably processing large, bursty streams of sensitive health data without creating duplicates on retry, ensuring PHI redaction for AI, and maintaining performance.
Solution: Two-layer idempotency. Request-level idempotency uses a HealthIngestRequest table keyed by requestId and payloadHash. Sample-level deduplication relies on a unique constraint on (user_id, source_id, source_record_id, start_at). Large batches are offloaded to BullMQ workers. The AiPhiRedactionService sanitizes PHI before AI processing.
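The sample-level layer can be emulated in memory to show what the composite unique key buys. HealthSampleInput, dedupKey, collapseBatch, batchUpsert, and the fields used to decide "unchanged" are hypothetical. One real constraint worth reflecting: PostgreSQL rejects a single INSERT ... ON CONFLICT DO UPDATE that proposes the same conflict key twice, so the sketch collapses in-batch duplicates (keeping the last occurrence) before upserting.

```typescript
interface HealthSampleInput {
  userId: string;
  sourceId: string;
  sourceRecordId: string;
  startAt: string; // ISO-8601, assumed normalized upstream
  metricCode: string;
  value: number;
  unit: string;
}

type UpsertCounts = { inserted: number; updated: number; unchanged: number };

// Mirrors the unique constraint (user_id, source_id, source_record_id, start_at).
// JSON array encoding avoids delimiter collisions between key parts.
const dedupKey = (s: HealthSampleInput): string =>
  JSON.stringify([s.userId, s.sourceId, s.sourceRecordId, s.startAt]);

// Keep only the last occurrence of each key within one batch.
function collapseBatch(batch: HealthSampleInput[]): HealthSampleInput[] {
  const byKey = new Map<string, HealthSampleInput>();
  for (const s of batch) byKey.set(dedupKey(s), s);
  return Array.from(byKey.values());
}

// In-memory equivalent of INSERT ... ON CONFLICT (...) DO UPDATE.
function batchUpsert(table: Map<string, HealthSampleInput>, batch: HealthSampleInput[]): UpsertCounts {
  const counts: UpsertCounts = { inserted: 0, updated: 0, unchanged: 0 };
  for (const sample of collapseBatch(batch)) {
    const key = dedupKey(sample);
    const existing = table.get(key);
    if (!existing) counts.inserted++;
    else if (existing.value === sample.value && existing.unit === sample.unit && existing.metricCode === sample.metricCode) counts.unchanged++;
    else counts.updated++;
    table.set(key, sample);
  }
  return counts;
}
```

A retried upload of the same samples therefore lands in the unchanged bucket instead of creating rows, regardless of whether the request-level layer caught it first.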
| Domain | Description | Key Services |
|---|---|---|
| API Gateway & Middleware | Request routing, authentication, authorization, rate limiting, security headers, correlation context | auth.middleware.ts, rateLimitQueue.middleware.ts, correlationContext.middleware.ts |
| Core Business Logic | Domain operations for users, consumption, journaling, inventory, purchases, goals, achievements | consumption.service.ts, session.service.ts, journal.service.ts |
| Data Storage & Access | Prisma ORM repositories for all entities | repository.factory.ts, health-sample.repository.ts |
| Multi-Client Sync | Bidirectional sync with conflict detection and resolution | sync.service.ts, syncLease.service.ts, handlers/*.handler.ts |
| Health Pipeline | Ingestion, projection, aggregation, and read service | healthSample.service.ts, health-projection-coordinator.service.ts |
| Async Jobs | Background processing with BullMQ | job-processor.ts, job-manager.service.ts |
| AI Integration | LLM orchestration, PHI redaction, cost tracking | ai.service.ts, ai-phi-redaction.service.ts |
| Real-time | WebSocket communication via Socket.IO | socket.service.ts, WebSocketBroadcaster.ts |
| Observability | Tracing, logging, performance monitoring | opentelemetry.ts, performanceMonitoring.service.ts |
For granular analysis of each subsystem, refer to the domain-specific documentation below:
| Document | Focus Area |
|---|---|
| System Architecture | Layered system model, deployment topology, service boundaries, data flows, and bootstrap lifecycle. |
| Bidirectional Sync Engine | Cursor-based sync, conflict resolution strategies, entity handlers, and admission control. |
| Health Ingestion Pipeline | Trust boundary validation, throughput routing, dual-layer idempotency, and transactional outbox. |
| Health Projection Pipeline | CQRS read models, projection handlers, watermark freshness, and checkpoint coordination. |
| Data Integrity & Concurrency | ACID boundaries, optimistic locking, idempotency guarantees, and concurrency control. |
| Failure Modes & Resilience | Retry policies, circuit breakers, dead-letter queues, and self-healing mechanisms. |
| Architectural Decision Records | 21 ADRs covering DI, outbox, sync, watermarks, privacy gating, and more. |
| Worker Scalability | BullMQ topology, concurrency control, backpressure, distributed locks, and auto-scaling. |
| Multi-Device Sync Engine | Bidirectional sync feature deep-dive with conflict resolution and cursor semantics. |
| Idempotent Health Ingestion | Two-layer idempotency, batch processing, privacy gating, and PHI redaction. |
| Real-Time Session Telemetry | Watermark-based caching, bounded compute, WebSocket delivery, and async recomputation. |
| ADR-001: Transactional Outbox | Outbox pattern, event coalescing, retry/dead-letter, and crash safety. |
| ADR-002: Cursor-Based Sync | Composite cursors, optimistic locking, and config-driven merge policies. |
| ADR-003: Dual-Driver Health Ingestion | Native Swift + TypeScript fallback driver architecture for iOS HealthKit. |
| ADR-004: Health Projection Pipeline | CQRS read models, watermark freshness, and per-projection checkpoints. |
| ADR-005: Offline-First Local Outbox | Client-side transactional outbox, integrity gate, and offline durability. |
| Pattern | Implementation |
|---|---|
| Transactional Outbox | Atomic DB write + event recording; at-least-once delivery guaranteed |
| Optimistic Locking | version fields detect concurrent modifications and prevent lost updates during sync |
| Cursor-Based Pagination | O(log N) keyset pagination, stable under concurrent writes |
| Config-Driven Conflict Resolution | ENTITY_CONFLICT_CONFIG with field-level policies (LOCAL_WINS, MERGE_ARRAYS, MONOTONIC) |
| Event-Driven Architecture | Domain events drive analytics, achievements, projections via decoupled subscribers |
| Health Projection & Watermarks | Derived read models with sourceWatermark for explicit freshness tracking |
| Bounded Concurrency | BoundedComputeCoordinator prevents resource exhaustion during high-volume processing |
| Circuit Breaker | Protects external API integrations from cascading failures |
| Retry with Backoff + Jitter | Exponential backoff with jitter applied to transient errors (retry.util.ts) |
| PHI Redaction Pipeline | AiPhiRedactionService removes health information before AI processing |
| Strategy Pattern for Sync | SyncEntityHandler implementations for entity-specific merge logic |
| Pure DI Composition Root | bootstrap.ts explicitly wires entire dependency graph |
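The keyset pagination row above can be illustrated with a composite (changedAt, id) cursor. ChangeRow, SyncCursor, the base64url JSON cursor encoding, and fetchPage are hypothetical; the in-memory filter stands in for an indexed WHERE (changed_at, id) > ($1, $2) ORDER BY changed_at, id LIMIT n query. The sketch assumes ids compare the same way in JS as in the database, which holds for lowercase UUID strings against a uuid column. It also assumes changedAt is assigned in commit order; a transaction that commits late with an earlier timestamp would be skipped, which is why sync cursors are often keyed on a sequence.

```typescript
interface ChangeRow { id: string; changedAt: number } // changedAt: epoch ms
interface SyncCursor { changedAt: number; id: string }

function encodeCursor(c: SyncCursor): string {
  return Buffer.from(JSON.stringify([c.changedAt, c.id])).toString("base64url");
}

function decodeCursor(s: string): SyncCursor {
  const [changedAt, id] = JSON.parse(Buffer.from(s, "base64url").toString("utf8")) as [number, string];
  return { changedAt, id };
}

// Row-value comparison, equivalent to SQL: (changed_at, id) > ($1, $2)
function isAfter(row: ChangeRow, c: SyncCursor): boolean {
  return row.changedAt > c.changedAt || (row.changedAt === c.changedAt && row.id > c.id);
}

function fetchPage(
  rows: ChangeRow[],
  cursor: string | null,
  limit: number,
): { items: ChangeRow[]; nextCursor: string | null; hasMore: boolean } {
  const after = cursor ? decodeCursor(cursor) : null;
  const sorted = [...rows].sort(
    (a, b) => a.changedAt - b.changedAt || (a.id < b.id ? -1 : a.id > b.id ? 1 : 0),
  );
  // Fetch one extra row to learn whether another page exists.
  const page = sorted.filter((r) => !after || isAfter(r, after)).slice(0, limit + 1);
  const items = page.slice(0, limit);
  const last = items[items.length - 1];
  return { items, nextCursor: last ? encodeCursor(last) : cursor, hasMore: page.length > limit };
}
```

Unlike OFFSET pagination, rows inserted after the cursor position do not shift earlier pages, so a client resuming from nextCursor neither skips nor repeats rows it has already seen.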
```mermaid
erDiagram
    User ||--o{ Product : owns
    User ||--o{ Consumption : logs
    User ||--o{ ConsumptionSession : creates
    User ||--o{ Purchase : makes
    User ||--o{ JournalEntry : writes
    User ||--o{ HealthSample : collects
    User ||--o| UserHealthWatermark : tracks
    User ||--o{ SyncOperation : performs
    Product ||--o{ Consumption : "consumed as"
    Product ||--o{ UserProductImpactRollup : "impact tracked"
    ConsumptionSession ||--o{ Consumption : contains
    ConsumptionSession ||--o{ UserSessionImpactSummary : "impact analyzed"
    ConsumptionSession ||--o{ UserSleepNightSummary : "sleep correlated"
    HealthSample }o--|| HealthIngestRequest : "tracked by"
    UserHealthWatermark ||--o{ UserHealthRollupDay : "freshness source"
    UserHealthWatermark ||--o{ UserSleepNightSummary : "freshness source"
    UserHealthWatermark ||--o{ UserSessionImpactSummary : "freshness source"
    UserHealthWatermark ||--o{ UserProductImpactRollup : "freshness source"
    OutboxEvent ||--o{ ProjectionCheckpoint : "tracked per projection"
    User {
        UUID id PK
        String username
        String email
        Int version
        JSONB privacySettings
    }
    HealthSample {
        UUID id PK
        UUID userId FK
        String sourceRecordId
        String metricCode
        Decimal value
        String unit
        DateTime startAt
        Boolean isDeleted
    }
    HealthIngestRequest {
        UUID id PK
        String requestId UK
        String payloadHash
        String status
    }
    UserHealthWatermark {
        UUID userId PK
        BigInt sequenceNumber
        DateTime lastChangedAt
    }
    UserHealthRollupDay {
        String id PK
        UUID userId FK
        String metricCode
        Date dayUtc
        String status
        BigInt sourceWatermark
    }
    UserSleepNightSummary {
        String id PK
        UUID userId FK
        Date nightLocalDate
        Int totalSleepMin
        String status
        BigInt sourceWatermark
    }
    UserSessionImpactSummary {
        String id PK
        UUID sessionId FK
        String metricCode
        Decimal deltaAfterPct
        Boolean isReliable
        String status
    }
    UserProductImpactRollup {
        String id PK
        UUID productId FK
        String metricCode
        Int periodDays
        Decimal avgDeltaAfterPct
        String confidenceTier
        String status
    }
    OutboxEvent {
        UUID id PK
        String eventType
        JSONB payload
        String status
        Int retryCount
    }
    ProjectionCheckpoint {
        UUID id PK
        UUID outboxEventId FK
        String projectionName
        String status
    }
    SyncOperation {
        UUID id PK
        UUID userId FK
        String clientSyncOperationId UK
        JSONB resultPayload
    }
```
Health Data Ingestion & Query:
- POST /health/samples/batch-upsert -- High-throughput, idempotent batch upload
- GET /health/samples/cursor -- Cursor-based paginated health samples
- GET /health/rollups -- Daily aggregated health metrics
- GET /health/sleep -- Summarized sleep data per night
- GET /health/session-impact -- Health impact analysis per consumption session
- GET /health/impact/by-product -- Aggregated product health impact
Data Synchronization:
- POST /sync/lease -- Sync lease admission control
- GET /sync/changes -- Incremental cursor-based data pull
- POST /sync/push -- Push local client changes with conflict resolution
- POST /sync/conflicts/batch-resolve -- Batch conflict resolution
AI/ML Integrations:
- POST /ai/chat/message -- AI chat with conversation history
- GET /ai/analysis/journal -- Journal entry analysis
- GET /ai/recommendations/variant -- Personalized product recommendations
Real-Time Events (Socket.IO):
- session:started/ended/updated -- Real-time session lifecycle
- consumption:update -- Live consumption data streaming
- sync:required/completed -- Sync coordination signals
```
code-snippets/
├── index.ts # Application entry point
├── app.ts # Express application setup
├── server.ts # HTTP server lifecycle management
├── bootstrap.ts # Pure DI composition root (single wiring point)
├── worker.ts # BullMQ worker process entry point
├── instrumentation.ts # OpenTelemetry tracing setup
│
├── api/v1/
│ ├── controllers/ # Request handlers
│ │ ├── health.controller.ts # Health ingestion + projection reads
│ │ ├── sync.controller.ts # Bidirectional sync orchestration
│ │ ├── session.controller.ts # Session lifecycle + telemetry
│ │ ├── device.controller.ts # BLE device management
│ │ ├── telemetry.controller.ts # Device telemetry ingestion
│ │ ├── user.controller.ts # User profile CRUD
│ │ └── websocket.controller.ts # WebSocket event handling
│ ├── middleware/ # 22 middleware modules
│ │ ├── auth.middleware.ts # Cognito JWT verification + user ID mapping
│ │ ├── apiCache.middleware.ts # Response caching with race-condition protection
│ │ ├── apiGateway.middleware.ts # External API circuit breaker + retry
│ │ ├── rateLimitQueue.middleware.ts # Token-bucket rate limiting
│ │ ├── sync-lease.middleware.ts # Distributed sync admission control
│ │ ├── correlationContext.middleware.ts # Request correlation tracking
│ │ ├── error.middleware.ts # Centralized error handling
│ │ └── ... # Security, logging, validation, HTTPS enforcement
│ ├── routes/ # Express route definitions
│ │ ├── health.routes.ts # /health/* endpoints
│ │ ├── sync.routes.ts # /sync/* endpoints
│ │ ├── session.routes.ts # /sessions/* endpoints
│ │ ├── auth.routes.ts # /auth/* endpoints
│ │ └── ... # device, telemetry, user, monitoring, security
│ └── schemas/ # Zod request/response validation
│ ├── sync.schemas.ts
│ ├── session.schemas.ts
│ ├── user.schemas.ts
│ └── validation-utils.ts # Shared decimal, UUID, enum validators
│
├── config/ # Configuration management
│ ├── index.ts # Async config initialization (AWS Secrets Manager)
│ └── auth.config.ts # Cognito + OAuth provider configuration
│
├── core/ # DI infrastructure
│ ├── controller-registry.ts # Type-safe controller container
│ ├── route-registry.ts # Declarative route-to-controller binding
│ ├── middleware-factory.ts # Middleware instantiation with DI
│ ├── controller.types.ts # Controller interface contracts
│ ├── type-guards.ts # Runtime type narrowing utilities
│ └── types.ts # Shared core type aliases
│
├── events/ # Domain event system
│ ├── domain.events.ts # 30+ typed event interfaces (EventTypeMap)
│ └── domain-event.service.ts # Pub/sub with dead-letter queue
│
├── jobs/ # BullMQ async job processing
│ ├── job-manager.service.ts # Queue management + Redis configuration
│ ├── job-processor.ts # Job handler dispatch
│ ├── job.types.ts # Job payload type definitions
│ └── schedules.ts # Repeatable job schedules (cron)
│
├── models/ # Zod schemas + Prisma type exports
│ ├── index.ts # Validation schemas for all entities
│ └── __tests__/ # Schema validation test suites
│
├── realtime/ # WebSocket infrastructure
│ ├── contracts/events.ts # Typed Socket.IO event contracts
│ ├── WebSocketBroadcaster.ts # User-scoped event emission
│ └── index.ts # Socket.IO server initialization
│
├── repositories/ # Prisma data access layer (20 repositories)
│ ├── base.repository.ts # Abstract base with error handling + soft delete
│ ├── repository.factory.ts # Lazy singleton repository instantiation
│ ├── health-sample.repository.ts # Health sample CRUD + batch upsert
│ ├── session.repository.ts # Session lifecycle + consumption queries
│ ├── sync-change.repository.ts # Sync change tracking
│ ├── outbox-event.repository.ts # Transactional outbox persistence
│ ├── projection-checkpoint.repository.ts # Per-projection completion tracking
│ ├── user-health-watermark.repository.ts # Monotonic watermark management
│ └── ... # device, telemetry-cache, rollup, sleep, impact
│
├── services/ # Business logic layer
│ ├── healthSample.service.ts # Health ingestion orchestration
│ ├── health-aggregation.service.ts # Metric aggregation computations
│ ├── health-projection-coordinator.service.ts # CQRS projection fanout
│ ├── health-projection-read.service.ts # Projection query service
│ ├── health-insight-engine.service.ts # Rule-based health insights
│ ├── healthIngestQueue.service.ts # BullMQ batch offloading
│ ├── product-impact-compute.ts # Statistical product impact analysis
│ ├── session.service.ts # Session lifecycle management
│ ├── session-telemetry.service.ts # Telemetry cache computation
│ ├── sync.service.ts # Bidirectional sync orchestration
│ ├── syncLease.service.ts # Distributed sync locking
│ ├── outbox.service.ts # Transactional outbox writes
│ ├── outbox-processor.service.ts # Outbox polling + event dispatch
│ ├── outbox-coalescing.ts # Event deduplication + batching
│ ├── auth.service.ts # Auth flows (Cognito, Google OAuth)
│ ├── cognito.service.ts # AWS Cognito client wrapper
│ ├── cache.service.ts # Redis cache with tag-based invalidation
│ ├── database.service.ts # Prisma client lifecycle
│ ├── logger.service.ts # Structured logging
│ ├── performanceMonitoring.service.ts # Runtime metrics + alerting
│ ├── drift-detector.service.ts # Schema/data drift detection
│ ├── sync/
│ │ ├── handlers/
│ │ │ ├── session.handler.ts # Session-specific sync merge logic
│ │ │ └── device.handler.ts # Device-specific sync merge logic
│ │ ├── conflict-merge.ts # Field-level conflict resolution engine
│ │ ├── sync.types.ts # Sync handler interface contracts
│ │ └── __tests__/ # Conflict merge test suite
│ └── ... # security, rate limiting, request validation
│
├── subscribers/ # Domain event handlers
│ ├── analytics.subscriber.ts # Consumption → DailyStat aggregation
│ ├── session-telemetry.subscriber.ts # Session end → telemetry cache trigger
│ └── domain.subscribers.ts # Subscriber initialization + wiring
│
├── types/ # TypeScript type definitions
│ ├── auth.types.ts # Authentication types
│ ├── cognito.types.ts # AWS Cognito response types
│ ├── express.d.ts # Express request augmentation
│ ├── socket.io.d.ts # Socket.IO type extensions
│ └── ... # middleware, request, database types
│
├── utils/ # Shared utilities
│ ├── AppError.ts # Structured error hierarchy
│ ├── error-handler.ts # Error classification + normalization
│ ├── retry.util.ts # Exponential backoff with jitter
│ ├── decimal-serializer.ts # Prisma Decimal → JSON serialization
│ ├── jwt-validation.utils.ts # JWT secret strength validation
│ ├── secure-id.utils.ts # Cryptographic ID generation
│ └── ... # auth, cognito, constants, correlation
│
└── websocket/
    └── socket.service.ts # Socket.IO authentication + room management
prisma/
└── schema.prisma # PostgreSQL schema (20 models, 10 enums)
docs/
├── Architecture.md # System architecture + bootstrap lifecycle
├── Sync-Backend.md # Sync engine specification
├── HealthIngestion.md # Ingestion pipeline deep-dive
├── HealthProjection.md # CQRS projection pipeline
├── Data-Integrity.md # ACID guarantees + concurrency control
├── Failure-Modes.md # Resilience patterns + recovery
├── Decisions.md # 21 Architectural Decision Records
├── Worker-Scalability.md # BullMQ topology + auto-scaling
├── ADRs/ # Standalone ADR documents (5)
└── features/ # Feature deep-dives (3)
media/
├── diagrams/ # Architecture SVG diagrams (10)
└── MERMAID/ # Mermaid-rendered diagrams
```