Pipeline cutover → next#58
Open
boringethan wants to merge 146 commits into
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Bake in decisions made during plan walkthrough: - DB endpoint is off by default. Caller passes db_path to MotionInterface at construction; when set, every scan opens a session row in that file. When None (default), the SDK behaves byte-for-byte the same as today. - Drop the per-scan write_to_db and db_path fields from ScanRequest. Keep write_raw_to_db and notes (per-scan opt-in for raw histograms and free-text annotation). - Provenance kept, but _build_meta uses the real attribute paths on current next: self.console.get_version(), self.left.get_version(), self.right.get_version(), plus get_hardware_id() (cached when available, hex-encoded for bytes results). - Floats in both session_data and session_raw rounded to 6 decimals at insert time, matching the corrected CSV writer's precision. Task 9's cell-for-cell equivalence assertion can now treat both endpoints as exact-equal (kept abs_tol=1e-6 as a safety margin around float repr). - Plan now refers to omotion/MotionInterface.py (the actual file name on next) rather than the omotion/Interface.py that the original draft referenced (a renamed-in-a-prototype-branch artifact). - Added a note up front warning that the printed line numbers are stale relative to current next — readers should grep for symbols. No task structure changes; the 10-task sequence and checkbox tracking are unchanged. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Plan Task 1. ScanDatabase class is now importable as ``from omotion import ScanDatabase`` so the upcoming ScanDBSink (and any downstream caller) doesn't need to ``sys.path``-hack into the stream-db/ directory. No behavior changes — the moved file is byte-identical to the previous stream-db/scan_db.py. The five-test smoke suite in tests/test_scan_database.py exercises create/get session, raw-frame round-trips (compressed + uncompressed), session_data streaming, and unicode/None survival in session_meta. The four stream-db/ tools that imported ``from scan_db import ...`` (db_browser, db_validator, importer, sensor_module_simulator) are temporarily broken; Task 2 switches them to ``from omotion import ScanDatabase`` next. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Plan Task 2 follow-up to Task 1's move. The three stream-db tools that imported scan_db at the top level (db_validator, importer, sensor_module_simulator) now import ``from omotion import ScanDatabase`` with a sys.path bootstrap so they continue to work when run standalone from inside the stream-db/ directory. db_browser.py was listed in the original plan but turned out not to use the ScanDatabase class — it talks to the DB directly via sqlite3 — so no change there. Tick Task 1 + Task 2 checkboxes in the plan. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Plan Task 3. ScanDBSink wraps one ScanDatabase connection plus one open session row for the duration of a scan, with a threading.Lock to serialise the raw-frame buffer and a callback surface that mirrors what ScanWorkflow will fire (on_raw_frame, on_corrected_batch). In this commit the callbacks are deliberately no-ops past the "ScanDBSink.* invoked before open()" guard; the real bodies land in Tasks 4 (raw frames, batched executemany) and 5 (corrected batches). Idempotent close() lets the wrapped on_complete_fn fire it safely on both success and error paths. Three tests cover the open/close lifecycle, idempotent close, and the pre-open guard. Tick Task 3 checkboxes in the plan. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Plan Task 4. Implements the raw-frame ingest path: * When write_raw=False (the default), on_raw_frame is a no-op after the pre-open guard. The hot path stays a single function-call cost for the common case where the user only wants corrected data. * When write_raw=True, frames are appended to a per-sink buffer under the lock. When the buffer hits raw_batch_size (default 200, ~5 s of data per side at 40 Hz across 8 cams), it flushes via ScanDatabase.insert_raw_frames (executemany over a single transaction). This keeps per-frame insert overhead amortised. * Float fields (timestamp_s, temp, tcm, tcl, pdc) are rounded to 6 decimals at insert time per project precision policy. Histogram blobs are stored verbatim (encoder may apply zlib compression depending on compress_raw_hist). * Two ordering nits handled: (1) the pre-open RuntimeError must fire before the write_raw=False early return, otherwise misuse against a default-constructed sink would be swallowed; (2) if on_raw_frame loses a race with close() and finds self._db is None inside the lock, drop silently rather than raise from a worker thread. Five new tests cover: write_raw=False no-op, per-call row inserts, batch-size flush behaviour, concurrent left+right writers (400 frames total), and 6-decimal rounding of every float field. Tick Task 4 checkboxes in the plan. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
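The batching described for Task 4 can be sketched as follows. This is a minimal illustration rather than the actual ScanDBSink code: the `RawFrameBuffer` class, the `open_demo_db` helper, and the reduced six-column `session_raw` layout are hypothetical, and only the lock, the batch-size flush, the single-transaction `executemany`, and the 6-decimal rounding come from the commit message.

```python
import sqlite3
import threading


def open_demo_db():
    """In-memory DB with a cut-down, hypothetical session_raw table."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE session_raw (session_id INTEGER, side INTEGER, "
        "cam_id INTEGER, frame_id INTEGER, timestamp_s REAL, hist BLOB)"
    )
    return conn


class RawFrameBuffer:
    """Hypothetical stand-in for the raw-frame half of a DB sink."""

    def __init__(self, conn, session_id, batch_size=200):
        self._conn = conn
        self._session_id = session_id
        self._batch_size = batch_size
        self._lock = threading.Lock()
        self._pending = []

    def add(self, side, cam_id, frame_id, timestamp_s, hist_blob):
        # Floats are rounded at insert time; the histogram blob is stored verbatim.
        row = (self._session_id, side, cam_id, frame_id,
               round(timestamp_s, 6), hist_blob)
        with self._lock:
            self._pending.append(row)
            if len(self._pending) >= self._batch_size:
                self._flush_locked()

    def flush(self):
        with self._lock:
            self._flush_locked()

    def _flush_locked(self):
        # Caller must hold self._lock.
        if not self._pending:
            return
        with self._conn:  # one transaction per batch
            self._conn.executemany(
                "INSERT INTO session_raw VALUES (?, ?, ?, ?, ?, ?)",
                self._pending,
            )
        self._pending.clear()
```

Amortising the commit over a batch is the point of the design: the per-frame cost is a list append under a lock, and the SQLite transaction overhead is paid once per `batch_size` frames.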
Plan Task 5, the load-bearing piece. Each Sample in the CorrectedBatch becomes one session_data row via the bulk insert_session_data_rows path:
* side encoding: 'left' → 0, 'right' → 1. Anything else is logged at WARNING, counted in self._insert_errors, and skipped (better than inserting a NULL-side row that breaks downstream queries).
* Floats (bfi, bvi, contrast, mean, timestamp_s) rounded to 6 decimals to match the corrected CSV writer, which gives Task 9 a clean cell-for-cell equivalence check.
* Flush pending raw frames at the top of every corrected batch so raw and corrected rows land in roughly causal order (the corrected batch is computed from frames that may still be sitting in the raw buffer at this moment).
* Empty batch is a no-op.
* Insert errors from the DB are caught, logged, and counted; the callback never raises into the ScanWorkflow worker thread.
Five new tests cover: standard 3-sample batch, raw-buffer flush ordering, empty batch no-op, 6-decimal rounding on every float field, and the unknown-side defensive skip. With Task 3 + 4 + 5 the full sink test file now runs 13/13 in ~0.2 s. Tick Task 5 checkboxes.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
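A rough sketch of the Sample-to-row mapping described above. The `Sample` dataclass here is a hypothetical subset of the real type, and `batch_to_rows` is an illustrative helper name; the side encoding, the skip-and-count handling of unknown sides, and the 6-decimal rounding follow the commit message.

```python
import logging
from dataclasses import dataclass

log = logging.getLogger("scan_db_sink_sketch")

SIDE_CODES = {"left": 0, "right": 1}


@dataclass
class Sample:
    """Hypothetical subset of the SDK's Sample fields."""
    side: str
    cam_id: int
    timestamp_s: float
    bfi: float
    bvi: float
    contrast: float
    mean: float


def batch_to_rows(session_id, samples):
    """Return (rows, skipped): DB-ready tuples plus the count of rejected samples."""
    rows, skipped = [], 0
    for s in samples:
        code = SIDE_CODES.get(s.side)
        if code is None:
            # Skip rather than insert a NULL-side row that breaks queries.
            log.warning("unknown side %r, skipping sample", s.side)
            skipped += 1
            continue
        rows.append((
            session_id, code, s.cam_id,
            round(s.timestamp_s, 6), round(s.bfi, 6), round(s.bvi, 6),
            round(s.contrast, 6), round(s.mean, 6),
        ))
    return rows, skipped
```

An empty batch falls out naturally as `([], 0)`, so the caller can skip the insert without a special case.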
Plan Task 6. Two new optional callbacks on ``ScanWorkflow.start_scan``, both no-ops when not supplied so existing callers are unaffected: * ``on_scan_start_fn(ts: str, session_start: float)`` fires in ``_worker`` immediately after the canonical YYYYMMDD_HHMMSS timestamp is computed. Lets a downstream sink (ScanDBSink for issue #92) open its session with the same label CSVs use and a wall-clock start captured at the same instant. * ``on_raw_frame_fn(side, cam_id, frame_id, timestamp_s, hist, temp, sum_counts, tcm, tcl, pdc)`` fires from the per-side ``_make_row_handler`` after the science pipeline has been fed. Reuses ``extra_cols_fn`` for the tcm/tcl/pdc telemetry so the DB sees the same values that land in the raw CSV writer. ScanWorkflow itself stays unaware of any database — it just emits the callbacks and a wrapper at the MotionInterface level (Task 7) will route them into ScanDBSink. Regression check: tests/test_corrected_csv_output.py — all 20 pass unchanged. Tick Task 6 checkboxes. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Plan Task 7 — opt-in DB sink integration at SDK construction:
* MotionInterface.__init__ gains db_path: Optional[str] = None.
Stashed as self._db_path. Default (None) leaves SDK behavior
byte-for-byte identical to today — no sink built, no DB code in
the scan hot path.
* start_scan checks self._db_path and, when set, routes through a new
_wrap_kwargs_with_db_sink helper that:
- Constructs a ScanDBSink at the configured path (auto-mkdir of
parent dir so the caller can pass a fresh subdir).
- Composes user-supplied on_scan_start_fn / on_raw_frame_fn /
on_corrected_batch_fn / on_complete_fn with the sink's
callbacks (sink first, user second).
- Builds the session_meta provenance dict using the real APIs on
current next: self.console.get_version() /
self.left.get_version() / self.right.get_version() plus
get_cached_hardware_id() (falling back to get_hardware_id()),
hex-encoded for bytes results. Disconnect-time exceptions are
swallowed by a _safe_call wrapper so meta gets None
placeholders rather than crashing the scan.
* ScanRequest gains two minimal fields: write_raw_to_db: bool = False
(per-scan raw opt-in, only meaningful when the SDK has a db_path)
and notes: str = "" (attached to session_notes).
End-to-end smoke (no hardware): MotionInterface(db_path=...) →
_wrap_kwargs_with_db_sink → manually fire the four callbacks → DB has
1 session row with label "{ts}_{subject_id}", session_notes, full
17-key session_meta, and 1 session_data row from the corrected
batch. Parent dir auto-created.
Full regression sweep across the new + impacted tests
(test_scan_database, test_scan_db_sink, test_corrected_csv_output):
38/38 pass. Tick Task 7 checkboxes.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
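The callback composition and the exception-swallowing provenance lookup from Task 7 might look roughly like this. `compose` and `safe_call` are illustrative names (the commit only names `_wrap_kwargs_with_db_sink` and `_safe_call`, without showing their bodies), so treat the signatures as assumptions.

```python
def compose(sink_fn, user_fn):
    """Call the sink's callback first, then the user's (if one was supplied)."""
    def _both(*args, **kwargs):
        sink_fn(*args, **kwargs)
        if user_fn is not None:
            user_fn(*args, **kwargs)
    return _both


def safe_call(fn, default=None):
    """Run a provenance getter; return a placeholder instead of raising.

    bytes results (e.g. a hardware id) are hex-encoded so the meta dict
    stays JSON-serialisable.
    """
    try:
        value = fn()
    except Exception:
        return default
    if isinstance(value, (bytes, bytearray)):
        return value.hex()
    return value
```

Running the sink first means the DB row exists by the time a user callback observes the event, and a disconnected device yields `None` in `session_meta` instead of aborting the scan.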
… (#92)
The SDK was inconsistent about timestamp origin: ``Sample.timestamp_s`` carried the raw firmware-clock value (~seconds since sensor boot or clock reset), and only the corrected CSV writer normalized it to a per-scan 0-based origin, via a ``_corr_base_ts = min(timestamp_s)`` captured at the first complete corrected frame, then ``rel_ts = timestamp_s - _corr_base_ts`` per row, duplicated across three sites in ScanWorkflow._worker. Other consumers (the raw CSV writer, the on_uncorrected_fn / on_corrected_batch_fn / on_raw_frame_fn callbacks, the science pipeline, the brand-new ScanDBSink) all inherited the firmware-clock value, so per-scan time origins disagreed across outputs of the same scan.
Promote the corrected-CSV convention to the SDK-wide one. Apply the offset at the single chokepoint where every per-scan sample is seen exactly once: inside ``parse_histogram_stream``, right after the firmware-rollover unwrap, before any consumer (raw CSV, row handler, science pipeline) reads ``sample.timestamp_s``.
Mechanism:
* MotionProcessing.parse_histogram_stream gains an optional ``t0_normalizer: Callable[[float], float] | None`` kwarg. When set, the unwrapped timestamp is replaced with the normalized value before any downstream read. Default None preserves the old raw-firmware-clock behavior for unrelated callers (stream_queue_to_csv_file etc.).
* ScanWorkflow._worker constructs the normalizer as a double-checked-locking closure over a per-scan ``_session_t0: float | None = None`` and ``_t0_lock``. First writer thread to call it (whichever side fires first) captures t0; every subsequent call returns ``ts - _session_t0``. Passes the closure into both per-side parse_histogram_stream calls so they share one origin.
* The three ``_corr_base_ts`` blocks in the corrected CSV writer are dropped; ``entry["timestamp_s"]`` is now already relative, so the writer emits it directly.
* No API change to Sample, CorrectedBatch, ScanRequest, or any callback signature.
The only observable behavior change is that ``Sample.timestamp_s`` is now seconds-since-scan-start on every emission path. The raw CSV's ``timestamp_s`` column also now starts at 0 (it used to inherit firmware-clock). * The full 38-test regression sweep still passes, including test_timestamp_starts_near_zero which is the explicit invariant this change enforces SDK-wide. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
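The double-checked-locking closure described in the mechanism can be sketched like this. `make_t0_normalizer` is a hypothetical factory name; the commit only specifies the closed-over `_session_t0` / `_t0_lock` pair and the `ts - _session_t0` return value.

```python
import threading


def make_t0_normalizer():
    """Build a per-scan normalizer shared by both per-side parser threads."""
    session_t0 = None
    t0_lock = threading.Lock()

    def normalize(ts):
        nonlocal session_t0
        # Fast path: after the first call this check skips the lock entirely.
        if session_t0 is None:
            with t0_lock:
                # Re-check under the lock so only one thread captures t0.
                if session_t0 is None:
                    session_t0 = ts
        return ts - session_t0

    return normalize
```

Because the origin is whichever side's first sample arrives first, the other side's earliest samples can come out slightly negative if its firmware clock is behind; consumers that assume non-negative time should tolerate that.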
Adds docs/ScanDatabase.md covering the new SQLite scan-data endpoint shipped with the rest of issue #92: * TL;DR + opt-in model (MotionInterface(db_path=...) is off by default; CSVs are unaffected; DB is additive). * App-side configuration: scanDbEnabled / writeRawData / writeRawDataDurationSec keys in bloodflow-app's app_config.json, and which of those are startup-only vs live-toggleable. * Schema reference: sessions, session_data, session_raw, database_settings — column-by-column, with the session_meta JSON shape spelled out and the per-side timestamp-origin caveat called out. * Lifecycle diagram from MotionInterface.start_scan through ScanDBSink.close, including where raw frames buffer and where corrected rows flush. * ScanDBSink internals (lock, idempotent close, swallowed-exception semantics). * Querying examples — sqlite3 CLI with json_extract, ScanDatabase Python API, and the stream-db/db_browser.py GUI. * Relationship table comparing all the per-scan outputs the SDK writes side-by-side, with the per-side-clock note that motivates joining by frame_id rather than timestamp. * Forward note about schema versioning (no schema_version column yet; CREATE … IF NOT EXISTS makes existing DBs safe to re-open). Architecture.md gains a short callout under the ``ScanWorkflow`` section pointing at the new doc so anyone reading the layer overview finds the DB story. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The raw-write duration cap was wired through ``ScanRequest.raw_csv_duration_sec`` into ``parse_histogram_stream``'s ``csv_deadline``, so the raw CSV writer correctly stopped emitting rows once the deadline fired. The DB sink shared the cap *flag* (bloodflow-app's connector passes ``write_raw_to_db=self._write_raw_data`` from the same master ``writeRawData`` switch), but the *duration cap* never reached the ``on_raw_frame_fn`` path.
Result: a scan run with ``writeRawData=true``, ``writeRawDataDurationSec=30``, and ``scanDbEnabled=true`` would produce a 30 s raw CSV alongside a DB ``session_raw`` table populated for the entire scan length. For a 12-hour scan that meant ~13.8 M raw rows (~7–8 GB) when the user expected ~9.6 k rows (a few MB).
Gate ``on_raw_frame_fn`` on the same ``_csv_stop_evt`` that ``parse_histogram_stream`` already flips when its deadline hits. Once either side's writer thread crosses the deadline, both CSV and DB stop accepting new raw frames so the two targets capture the same window. The science pipeline keeps receiving samples (we don't gate ``p.enqueue(...)``), so corrected output for the full scan is unaffected.
docs/ScanDatabase.md updated to call out that the cap covers both targets and to spell out the raw-row volume the DB would otherwise accumulate for an uncapped multi-hour scan.
Full SDK regression sweep (test_scan_database + test_scan_db_sink + test_corrected_csv_output): 38/38 pass.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
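The row counts quoted above can be reproduced with a few lines of arithmetic. The 40 Hz frame rate appears elsewhere in this PR; the figure of 8 active cameras is an assumption, inferred because it is the value that makes both quoted totals come out.

```python
FRAME_RATE_HZ = 40   # per-camera frame rate quoted in the PR
ACTIVE_CAMERAS = 8   # assumption: inferred from the quoted row totals


def raw_rows(duration_s):
    """Raw rows written when every active camera reports every frame."""
    return duration_s * FRAME_RATE_HZ * ACTIVE_CAMERAS


uncapped_rows = raw_rows(12 * 3600)  # 12-hour scan with no cap
capped_rows = raw_rows(30)           # writeRawDataDurationSec=30
```

Under those assumptions the uncapped scan writes 13,824,000 rows and the capped one 9,600, a factor of 1,440 (the ratio of 12 hours to 30 seconds).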
Step A of the unified storage-endpoint refactor — define the common
interface that every persistence target will satisfy, and migrate
ScanDBSink onto it. CsvSink (Step B) and the ScanWorkflow fan-out
refactor (Step C) build on this.
* New ``omotion.Sink`` ABC. Four hooks (``on_scan_start``,
``on_raw_frame``, ``on_corrected_batch``, ``on_complete``) all
default to no-ops so subclasses opt into only what they need. Hook
docstrings spell out the wire-up contract — ``timestamp_s`` is
already normalized to per-scan t=0 by ``parse_histogram_stream``,
``meta`` is the JSON-serialisable provenance dict, ``request`` is
the full ScanRequest so sinks can read flags / paths.
* ``ScanDBSink`` now inherits ``Sink`` and uses the new method names.
``open(*, label, start_ts, notes, meta) → int`` becomes
``on_scan_start(*, ts, session_start_ts, request, meta)``; the
sink composes ``label = f"{ts}_{request.subject_id}"`` and reads
``notes`` off the request itself. ``close(end_ts)`` becomes
``on_complete(result)`` and computes ``end_ts = time.time()``
internally so the workflow doesn't have to.
* ``MotionInterface._wrap_kwargs_with_db_sink`` switched to the new
method names. Existing tests + 1 new equivalence test all updated
to construct a minimal ``ScanRequest`` for the sink instead of
passing ``label`` / ``end_ts`` directly. Sink test count is
unchanged (13/13 + 5 ScanDatabase = 18/18 green).
* Drafted ``tests/test_db_matches_corrected_csv.py`` (the issue #92
plan's Task 9 equivalence test) — drives the corrected pipeline on
the canonical fixture CSVs, fans ``on_corrected_batch`` to both
``ScanDBSink`` and an in-memory CSV-merge builder, then asserts
per-cell equivalence. The test still has a row-ordering issue to
resolve, so the assertions aren't all green yet; it'll get fixed
alongside Step E (test matrix) once CsvSink is in place.
No behavior change for users of MotionInterface — only the Sink-side
API tightened. CSV writing is still baked into ScanWorkflow at this
commit; Step B extracts it.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
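A minimal sketch of what a no-op-by-default base class with these four hooks could look like. The hook names and the keyword-only arguments of `on_scan_start` come from the commit message; the positional parameters of `on_raw_frame` mirror the Task 6 callback signature, and the `RowCounter` subclass is purely illustrative.

```python
from abc import ABC


class Sink(ABC):
    """Common interface for persistence targets; every hook is a no-op."""

    def on_scan_start(self, *, ts, session_start_ts, request, meta):
        pass

    def on_raw_frame(self, side, cam_id, frame_id, timestamp_s, hist,
                     temp, sum_counts, tcm, tcl, pdc):
        pass

    def on_corrected_batch(self, batch):
        pass

    def on_complete(self, result):
        pass


class RowCounter(Sink):
    """Illustrative subclass that opts into a single hook."""

    def __init__(self):
        self.samples_seen = 0

    def on_corrected_batch(self, batch):
        self.samples_seen += len(batch)
```

With no abstract methods, a subclass overrides only what it needs and the workflow can call all four hooks on every sink unconditionally.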
Step B1 of the storage-endpoint refactor — add a ``CsvSink``
implementation of the ``Sink`` protocol that owns the corrected-CSV
merge + flush logic. Raw-CSV and telemetry-CSV ports follow in B2/B3.
* New ``omotion.CsvSink`` (Sink subclass).
- ``on_scan_start`` opens ``<data_dir>/{ts}_{subject_id}.csv``,
writes the header (5 metrics × 16 (sides × cams) = 80 data cols),
and primes the per-frame buffer + expected-cell set from the
request masks. Honors ``request.write_corrected_csv`` and skips
silently when disabled.
- ``on_corrected_batch`` does the merge: for each Sample, either
initialises a per-``absolute_frame_id`` entry (timestamp = the
sample's timestamp_s) or refines the existing entry's timestamp
with the per-sample minimum. After every batch it flushes the
frames whose values dict contains all of the expected
``{bfi,bvi,mean,contrast,temp}_{side}{cam+1}`` cells.
- ``on_complete`` drains any frames still waiting on cells as
partial rows (same as the inline writer did at scan end) and
closes the file handle. Idempotent.
- Reduced-mode path averages all active cameras per side per frame
and writes only ``bfi_left/right`` / ``bvi_left/right`` columns,
matching the existing inline reduced-mode behavior.
* The logic is ported verbatim from the pre-extraction
``ScanWorkflow._on_corrected_batch`` code. ScanWorkflow's inline
path is unchanged in this commit — both code paths now exist in
parallel. B2 cuts ScanWorkflow over to fan events to sinks
(including CsvSink) and removes the inline copy once the
byte-identical equivalence test below confirms no drift.
* ``tests/test_csv_sink.py`` (7 tests):
1–6. Standalone shape + lifecycle (header, no-file when disabled,
complete-frame flush, hold-incomplete-until-all-cams-report,
partial flush at scan end, reduced-mode averaging).
7. **Byte-identical equivalence** — drives the corrected pipeline
on the canonical fixture scan CSVs, fans every
``CorrectedBatch`` to BOTH a ``CsvSink`` and an in-test
``_InlineCorrectedMirror`` that replays the exact pre-extraction
ScanWorkflow logic (same 6-decimal rounding, same column set,
same complete-row gating). Diffs the two output files byte-for-
byte. They match.
Existing suites all green: 5 ScanDatabase + 13 ScanDBSink + 7
CsvSink + 20 corrected_csv_output = **45/45 passing**.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
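The merge-and-gate behaviour described for ``on_corrected_batch`` and ``on_complete`` can be sketched as below. `FrameMerger` and its method names are hypothetical, and cell names are treated as opaque strings; the per-frame entry, the minimum-timestamp rule, the flush-when-all-expected-cells-present gate, and the partial-row drain follow the commit message.

```python
class FrameMerger:
    """Hypothetical core of a per-frame corrected-CSV merge."""

    def __init__(self, expected_cells):
        self._expected = frozenset(expected_cells)
        self._pending = {}  # frame_id -> {"timestamp_s": float, "values": dict}

    def add(self, frame_id, timestamp_s, cell, value):
        entry = self._pending.get(frame_id)
        if entry is None:
            entry = {"timestamp_s": timestamp_s, "values": {}}
            self._pending[frame_id] = entry
        else:
            # Refine the row timestamp with the per-sample minimum.
            entry["timestamp_s"] = min(entry["timestamp_s"], timestamp_s)
        entry["values"][cell] = round(value, 6)

    def pop_complete(self):
        """Return rows whose expected cells have all reported, in frame order."""
        ready = sorted(
            fid for fid, e in self._pending.items()
            if self._expected.issubset(e["values"])
        )
        return [self._emit(fid) for fid in ready]

    def drain(self):
        """Scan end: emit whatever is still waiting, as partial rows."""
        return [self._emit(fid) for fid in sorted(self._pending)]

    def _emit(self, frame_id):
        entry = self._pending.pop(frame_id)
        return (frame_id, entry["timestamp_s"], entry["values"])
```

Calling `drain()` a second time returns an empty list, which is what makes an idempotent ``on_complete`` straightforward.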
Step B2 — extract the raw-CSV writing path into CsvSink.on_raw_frame.
ScanWorkflow still owns the live raw-CSV writers inline; the cut-over
happens in B4. This commit puts the destination code in place +
tests so the cut-over is a simple substitution.
* CsvSink.on_scan_start now opens one ``{ts}_{subject}_{side}_mask{XX}_raw.csv``
per active side (mask != 0), gated by request.write_raw_csv. Header
row mirrors ScanWorkflow._RAW_CSV_HEADERS exactly: ``cam_id, frame_id,
timestamp_s, 0..1023 (histogram bins), temperature, sum, tcm, tcl,
pdc`` — 1032 cols.
* CsvSink.on_raw_frame unpacks the 4096-byte histogram blob via a
precomputed ``struct.Struct('<1024I')`` and writes the row to the
matching side's csv.writer. Cap honored: once ``timestamp_s >
request.raw_csv_duration_sec`` we close that side's writer cleanly
on a frame boundary, so the on-disk file ends at exactly the
configured window (same semantics ScanWorkflow's csv_deadline +
csv_stop_event combo implements today).
* CsvSink.on_complete closes any raw writers still open (writers that
hit the cap are already closed in-line). Internal _raw_lock serialises
the left/right writer-thread access just like the inline raw CSV
writer used.
* tests/test_csv_sink.py: 6 new tests covering writer open, no-file
when write_raw_csv=False, no-file for inactive sides, per-call row
writes (including bin verification + telemetry-cell verification),
duration-cap behavior (41 rows for a 1.0 s cap at 40 Hz), and
silent drop when called before on_scan_start. 13/13 total CsvSink
tests green.
* docs/ScanDatabase-HardwareVerification.md: the three-pass test plan
for verifying the issue #92 storage refactor on real hardware once
B2-B5 are pushed. CSV-only mode regression check, both-modes
equivalence with DB↔CSV cross-check, DB-only mode (after Step B5),
plus smoke checks for History modal, Visualize buttons, reduced
mode, and contact-quality.
Full sweep: 45 prior + 6 new raw-CSV = 51/51 SDK tests green.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
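As a sketch of the raw-row layout and the cap arithmetic in this commit: the precomputed ``struct.Struct('<1024I')`` and the ``timestamp_s > cap`` check come from the message, while `build_raw_row` and `count_rows_before_cap` are hypothetical helper names used only for illustration.

```python
import struct

# 1024 little-endian uint32 bins = 4096 bytes per histogram blob.
HIST_STRUCT = struct.Struct("<1024I")


def build_raw_row(cam_id, frame_id, timestamp_s, hist_blob,
                  temp, sum_counts, tcm, tcl, pdc):
    """Assemble one raw-CSV row: 3 id columns + 1024 bins + 5 telemetry columns."""
    bins = HIST_STRUCT.unpack(hist_blob)
    return [cam_id, frame_id, timestamp_s, *bins, temp, sum_counts, tcm, tcl, pdc]


def count_rows_before_cap(frame_rate_hz, cap_s, n_frames):
    """Count frames written before the first one with timestamp > cap."""
    written = 0
    for i in range(n_frames):
        ts = i / frame_rate_hz
        if ts > cap_s:
            break  # writer closes on this frame boundary
        written += 1
    return written
```

The 41-row figure follows from the strict comparison: frames at t = 0.000, 0.025, ..., 1.000 all satisfy `ts <= cap`, so a 1.0 s cap at 40 Hz admits 40 intervals plus the frame at t = 0.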
ScanWorkflow's inline per-frame merge / write / drain for the corrected
CSV is now a thin pass-through to CsvSink.on_corrected_batch. The
~180-line block (state init, file open, _on_corrected_batch CSV merge in
both normal and reduced modes, final partial-row flush) goes away in
favor of three CsvSink hook calls: on_scan_start, on_corrected_batch,
and on_complete.
The 6-decimal rounding policy, the "flush only when every expected
(side, cam) cell has reported" gating, the partial-row drain at scan
end, and the reduced-mode per-side averaging are unchanged — that's
the property the existing equivalence test (test_csv_sink::
test_csv_sink_matches_inline_corrected_csv) was set up to catch and
keeps passing here.
Raw CSV writing is still inline (parse_histogram_stream's csv_writer +
the per-side writer threads); CsvSink is constructed with
enable_raw=False so it owns only the corrected target for now. Step
B4b will flip the flag and remove the inline raw path. The
enable_corrected/enable_raw constructor knobs come out then.
ScanResult.corrected_path now sources from CsvSink.corrected_path, so
its behavior is unchanged ("" when write_corrected_csv is False or the
file fails to open; the real path otherwise).
Tests (76 across test_corrected_csv_output, test_csv_sink,
test_scan_db_sink, test_pipeline_csv) green. The hardware-attached
tests in tests/test_sequences.py and tests/test_comm_paths.py are
unrelated and were already red before this change.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
ScanWorkflow's per-side ``_writer`` thread no longer opens / writes / closes the raw histogram CSV. CsvSink.on_raw_frame now owns the file handles, header, ``raw_csv_duration_sec`` cap, and on-frame-boundary close. The writer thread keeps existing because it still drives ``parse_histogram_stream``, which fans every frame into the ``_on_row`` callback that fires every connected sink in turn:
1. ``_csv_sink.on_raw_frame(...)``: CsvSink (always present)
2. ``on_raw_frame_fn(...)``: ScanDBSink + user callbacks
The duration cap used to be plumbed via a shared ``_csv_stop_evt`` that the inline CSV writer set when it hit its deadline (commit c06263d); both that event and ``_trigger_armed_evt`` are gone. CsvSink times its own cap from the first frame it sees, and ``_on_row`` does a direct ``ts > raw_csv_duration_sec`` check before firing the external raw-frame hook so the DB sink stays in sync.
The ``enable_corrected`` / ``enable_raw`` toggles on CsvSink stay for one more commit; pruning them is on the post-merge cleanup list.
left_path / right_path / on_side_stream_fn now source from CsvSink.raw_paths so ScanResult and external listeners see exactly what the sink chose.
The same 76 tests across test_corrected_csv_output, test_csv_sink, test_scan_db_sink, and test_pipeline_csv stay green; the raw-CSV unit tests in test_csv_sink::TestRawCsv (file-open, no-file-when-disabled, no-file-for-inactive-side, header + bin verification, 41-row cap at 1.0 s / 40 Hz, silent-when-not-started) exercise the path this change now relies on.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The cross-endpoint equivalence test assumed ``session_data`` rows appear in the order the science pipeline emitted their corresponding Samples — but the dark-frame intervals for left and right close independently, so corrected batches interleave by side. Walking ``zip(emitted, db_rows)`` started failing at row 0 because the first DB row was a right-side sample and the first emitted Sample was a left-side sample. Switch to a content-based lookup keyed by (side, cam_id, timestamp_s). Each emitted Sample finds and pops its matching DB row; the cell-for-cell field comparisons (bfi / bvi / contrast / mean, 6-decimal rounding) stay byte-identical. The cross-CSV check below already used per-(frame, side, cam) lookup so it didn't have this bug. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
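The switch from positional ``zip`` to a content-based lookup might be sketched as follows. Rows and samples are modelled as plain dicts and `match_rows` is a hypothetical helper; the (side, cam_id, timestamp_s) key and the find-and-pop behaviour are taken from the commit message.

```python
def match_rows(emitted, db_rows):
    """Pair each emitted sample with its DB row regardless of insert order.

    Returns (pairs, leftovers); raises AssertionError for an unmatched sample.
    """
    def key_of(item):
        return (item["side"], item["cam_id"], round(item["timestamp_s"], 6))

    index = {}
    for row in db_rows:
        index.setdefault(key_of(row), []).append(row)

    pairs = []
    for sample in emitted:
        bucket = index.get(key_of(sample))
        if not bucket:
            raise AssertionError(f"no DB row for {key_of(sample)}")
        pairs.append((sample, bucket.pop()))

    leftovers = [row for bucket in index.values() for row in bucket]
    return pairs, leftovers
```

Popping each matched row means duplicates and extra DB rows surface as `leftovers` instead of silently matching twice.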
Step F — add ``frame_id`` to ``session_data``: The corrected CSV writer uses ``Sample.absolute_frame_id`` as the merge key (one row per frame, every active (side, cam) cell merged in). Until now, ``session_data`` only carried the per-side timestamp, which has ~6 ms clock skew between sides — too noisy to merge by. The new column lets read_session reproduce the exact per-frame row layout the CSV writer would have produced. ``_init_schema`` now runs an idempotent ALTER on legacy DBs (column defaults to -1, a sentinel SessionPlayback uses to detect "this DB predates the migration and can't be played back"). Fresh DBs get the column via CREATE TABLE. Index on (session_id, frame_id) is created unconditionally so both paths can lookup-by-frame fast. Step D — ``omotion.materialize_corrected_csv(db, sid, out)``: New module ``omotion/SessionPlayback.py`` reads ``session_data`` for a session and writes a corrected-format CSV value-equivalent to what ``CsvSink`` would have written live. Reads ``session_meta.sdk_flags .reduced_mode`` for column layout; cells without DB rows (inactive cams or missing temperature/std — DB doesn't carry those) are left empty, matching what ``plot_corrected_scan.py`` already tolerates. The streaming merge buffers one frame at a time and flushes when ``frame_id`` advances — same per-frame ``min(timestamp_s)`` rule as CsvSink. Tests: - ``test_materialize_corrected_csv_matches_live_merge`` — drives the fixture pipeline into ScanDBSink, materializes, and asserts every bfi/bvi/contrast/mean cell matches an in-memory inline merge within 1e-6. - Plus the unknown-session and pre-Step-F sentinel-detection cases. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
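An idempotent column migration of the kind described for Step F can be written with SQLite's ``PRAGMA table_info``. This is a sketch under assumptions: `ensure_frame_id_column` and the index name are hypothetical, and the real ``_init_schema`` may detect the column differently.

```python
import sqlite3


def ensure_frame_id_column(conn):
    """Add session_data.frame_id to legacy DBs; safe to call repeatedly."""
    # table_info rows are (cid, name, type, notnull, dflt_value, pk).
    columns = {row[1] for row in conn.execute("PRAGMA table_info(session_data)")}
    if "frame_id" not in columns:
        # Existing rows pick up the -1 sentinel, marking them pre-migration.
        conn.execute(
            "ALTER TABLE session_data ADD COLUMN frame_id INTEGER DEFAULT -1"
        )
    # Created unconditionally so fresh and migrated DBs both get the index.
    conn.execute(
        "CREATE INDEX IF NOT EXISTS idx_session_data_frame "
        "ON session_data (session_id, frame_id)"
    )
    conn.commit()
```

A playback reader can then treat any row with `frame_id == -1` as belonging to a session recorded before the migration.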
CsvSink's ``enable_corrected`` / ``enable_raw`` constructor flags were only there to let ScanWorkflow cut the inline writers over in two commits (B4a corrected, B4b raw). Both inline paths are gone now, so the flags are dead code — and they invited confusion about whether they should be exposed to MotionInterface callers (they shouldn't; ``write_corrected_csv`` / ``write_raw_csv`` on the ``ScanRequest`` are the user-facing knobs). Also refresh the now-stale rollout-era comments in CsvSink + the ScanWorkflow construction site to describe today's design instead of the migration plan. docs/ScanDatabase.md: new "Playback — rebuild a corrected CSV from the DB" section documenting ``materialize_corrected_csv``, plus a Step F sub-section under "Notes on schema evolution" explaining the ``session_data.frame_id`` migration and what the -1 sentinel means for legacy sessions. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
When ``scanDbEnabled`` opens a ScanDBSink for a scan, the assigned session id is currently hidden inside the wrapper closure in ``_wrap_kwargs_with_db_sink``. The bloodflow-app's History modal needs it to push post-scan notes edits back into ``sessions.session_notes`` without having to look the session up by label. Track the active sink on ``self._active_db_sink`` for the duration of the scan (set in ``_wrap_kwargs_with_db_sink`` after sink construction, cleared in the sink's ``on_complete`` wrapper) and expose its session id via ``MotionInterface.active_db_session_id`` (None when no scan is recording to the DB). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The user's on_complete fired AFTER we cleared ``self._active_db_sink``, so a synchronous handler reading ``active_db_session_id`` always saw None. The bloodflow-app uses that property to capture the just-completed session id for pushing post-scan notes into session_notes, so the net result was the connector silently dropping every notes write. Swap the order: run the user callback first, then clear. Anything the handler reads synchronously can now see the id while the handler is on the stack; asynchronous work it kicks off must capture the id before the handler returns. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
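The ordering fix can be illustrated with a stripped-down model. `InterfaceSketch`, `FakeSink`, and `wrap_on_complete` are hypothetical stand-ins for MotionInterface, ScanDBSink, and the wrapper built inside ``_wrap_kwargs_with_db_sink``; only the user-callback-then-clear order and the ``active_db_session_id`` property are taken from the commits.

```python
class FakeSink:
    """Hypothetical sink exposing just a session id."""

    def __init__(self, session_id):
        self.session_id = session_id

    def on_complete(self, result):
        pass


class InterfaceSketch:
    def __init__(self):
        self._active_db_sink = None

    @property
    def active_db_session_id(self):
        sink = self._active_db_sink
        return None if sink is None else sink.session_id

    def wrap_on_complete(self, sink, user_on_complete=None):
        self._active_db_sink = sink

        def _on_complete(result):
            sink.on_complete(result)
            try:
                # User handler runs while the id is still readable.
                if user_on_complete is not None:
                    user_on_complete(result)
            finally:
                # Clear last, even if the user handler raised.
                self._active_db_sink = None

        return _on_complete
```

The `try`/`finally` is a design choice of this sketch, not something the commit states: it keeps a raising user handler from leaving a stale session id behind.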
Two unused code paths from before CsvSink owned all CSV writing: 1. ``stream_queue_to_csv_file`` — high-level wrapper that opens a file and feeds parse_histogram_stream with a csv.writer. Never called anywhere in the codebase. Deleted. 2. The ``csv_writer`` / ``csv_deadline`` / ``csv_stop_event`` / ``on_csv_closed_fn`` parameters on ``parse_histogram_stream`` itself, plus the ``extra_cols_fn`` it used to assemble CSV rows. ScanWorkflow's writer thread already passes all of these as None (#92 B4b moved every CSV-writing responsibility into CsvSink), so the deadline machinery + per-frame ``_csv_active`` check + the ``sample.to_csv_row()`` call are dead. Stripped. What's left in parse_histogram_stream is the actual job: timestamp unwrapping, t0 normalization, packet parsing + resync, and the ``on_row_fn`` fanout. ~130 lines lighter and the function signature is now honest about what it does. The 80-test sweep across CsvSink, ScanDBSink, pipeline, session playback, and DB↔CSV equivalence stays green. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Findings writeup, supporting plots, and the reproducible ``generate_plots.py`` for the data-pipeline experiment that's characterizing dark-frame drift to support real-time dark correction (instead of today's wait-for-two-darks-then-interpolate post-hoc batched approach). What's in data-processing/dark-drift-study/: * findings.md — full writeup with embedded plots * 01..08 PNGs — drift over time / vs temperature / fits / gain * generate_plots.py — single-entry-point reproducer * calibration.json — per-camera quadratic coefficients (u1_poly, std_poly, gain, T_range, RMSE) Headline findings: * std drift is the dominant signal (Δstd 0.9–6.6 bin over 60 min; contrast = std/mean, so std error feeds straight into BFI/BVI). * u1 drift is smaller (Δu1 0.14–0.49 bin) but real. * Sensor temperature is essentially the only driver — correlations 0.88–0.96 for std vs T, no hysteresis. * Per-camera quadratics fit both u1(T) and std(T) to RMSE 0.05–0.21 on std and 0.02–0.03 on u1 — at or near the per-frame measurement noise floor. * Firmware gain table [16,4,2,1,1,2,4,16] partially explains cam-to-cam variation; std² ≈ a² + (gain·b(T))² physics-style decomposition fits the structure. Also tweak ``scripts/plot_dark_drift.py`` labels to use 1-indexed cam IDs (cam2/cam3/cam6/cam7) instead of raw-CSV 0-indexed values, matching the firmware / QML / docs naming convention. Next experiment per findings.md § "Next steps": cold-start reproducibility scan to verify the per-camera f(T) curves are run-invariant. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Spec for capturing one photodiode-current sample per camera frame via a console-firmware ring buffer drained by the existing ConsoleTelemetry poller, with the telemetry CSV becoming per-frame. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Phase 2 of the dark-drift study — picks an online estimator that
needs no per-device calibration, validates it against the long-scan
recording, and proposes an SDK integration plan.
What's new in data-processing/dark-drift-study/:
* online_estimators.md — full Phase 2 writeup. The picks:
- u1 = avg of last 3 darks (RMSE 0.020 bin)
- std = linear extrap in time (RMSE 0.022 bin, flat across both the
steep-rise and asymptotic regimes)
Backed by a strategy bake-off (zoh1, avg3/5/10, linear-time,
linear-T, avg3-on-std) and a dropped-dark stress test (random
rate sweep 0-50% and burst gaps 1/3/5 darks at two scan
timepoints). Headline robustness finding: predictors don't need
a fallback layer for normal operation — u1 is essentially
drop-immune, std RMSE stays under 0.05 bin even at 30% drops.
* integration_proposal.md — implementation plan. Adds a new
``on_realtime_corrected_fn`` callback to ``create_science_pipeline``
that fires per non-dark frame with predicted-dark correction
applied. Purely additive — the existing batched-interpolation
path that feeds the saved CSV / session_data stays as the ground
truth. Bloodflow app opts in via a ``realtimeDarkCorrection``
config flag, default-false at first.
* simulate_online_estimators.py — reproducer for the strategy
bake-off. Leave-one-out simulation: for each scheduled dark,
predict it using only earlier darks; aggregate RMSE per strategy.
* simulate_drop_stress.py — reproducer for the dropped-dark stress
test. Synthetically removes darks from the visible history,
re-runs the predictor, measures degradation.
* 11..14 PNGs — supporting plots embedded in online_estimators.md.
Phase-1 findings.md picks up a header note pointing forward to the
superseded recommendation. The drift / temperature / polyfit / gain
characterisation stays valid as physics context for the online
design.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
OpenMOTION's dark-frame scheme retimes the laser pulse to fire outside the camera exposure window rather than disabling the laser. The safety FPGA still produces a fresh peak_power_value on every frame, so the PDC stream is fully populated for bright and dark slots alike. Updated: - Background: laser always fires; dark = long-slot timing. - Producer ISR: hook LASER_TIMER period-elapsed (falling edge) instead of LSYNC_DelayElapsedCallback (rising edge), so the read lands after the FPGA's averaging window completes. - Tag bit semantics: bit 0 is now long_slot, not laser_enabled. - New current_slot_is_long flag maintained in FSYNC ISR; open question flags possible off-by-one between slot decision and pulse it gates. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
- Rename long_slot to dark_slot throughout (matches SciencePipeline terminology so analysts find one consistent name across CSVs). - Add a dedicated dark_slot bool column to the telemetry CSV alongside the pdc_flags bitfield, so downstream analysis can filter without unpacking bits. - Add a "Two sources of dark-frame truth" section explaining how the new firmware-truth signal relates to the science pipeline's derived dark-frame schedule, and that disagreements are a useful diagnostic. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Adds a third output stream to ``SciencePipeline`` alongside the
existing uncorrected (per-frame) and batched-corrected (per-dark-
interval) streams: a real-time corrected stream that fires
immediately per non-dark frame, with dark subtraction applied
using *predicted* darks rather than observed-and-interpolated darks.
The predictors are:
* u1 = mean of the last 3 observed darks (avg-of-last-3)
* std = linear extrapolation in time through the last 2 darks
These were picked and validated against a 60-min long-scan recording
in ``data-processing/dark-drift-study/online_estimators.md`` —
prediction RMSEs of 0.020 / 0.022 bin, robust to 30+% dropped
darks and 5-burst gaps.
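The two predictors can be sketched as below. This is a minimal illustration, not the SDK code: the function name ``predict_dark``, averaging over fewer than three darks when only two are available, and holding the last std on a zero time step are all assumptions (the commit only says a zero-dt fallback exists).

```python
from collections import deque
from typing import Deque, Optional, Tuple

DarkObs = Tuple[float, float, float]  # (ts, u1, std) for one observed dark


def predict_dark(history: Deque[DarkObs], ts_now: float) -> Optional[Tuple[float, float]]:
    """Return (u1_pred, std_pred) at ts_now, or None while still warming up."""
    if len(history) < 2:
        return None  # warmup gate: the std slope needs two observed darks
    darks = list(history)

    # u1: mean of the most recent (up to) three darks.
    recent = darks[-3:]
    u1_pred = sum(u1 for _, u1, _ in recent) / len(recent)

    # std: straight line through the last two darks, extended to ts_now.
    (t0, _, s0), (t1, _, s1) = darks[-2:]
    dt = t1 - t0
    if dt == 0:
        std_pred = s1  # assumed fallback: hold the last observed std
    else:
        std_pred = s1 + (s1 - s0) / dt * (ts_now - t1)
    return u1_pred, std_pred


# Ring buffer of depth 4, matching the documented default history size.
history: Deque[DarkObs] = deque(maxlen=4)
for obs in [(0.0, 100.0, 10.0), (16.0, 101.0, 11.0), (32.0, 102.0, 12.0)]:
    history.append(obs)
prediction = predict_dark(history, ts_now=48.0)
```

With the three synthetic darks above, the u1 prediction is their mean and the std prediction continues the last slope by one more interval.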
Architecture:
* New constructor / factory parameter ``on_realtime_corrected_fn``
(default None — no behavior change for existing callers).
* Per-(side, cam_id) ring buffer ``_realtime_dark_history`` of
depth ``realtime_dark_history_size`` (default 4) storing
``(ts, u1, std)`` tuples for the most recent darks.
* On every dark frame the history gets a push; on every non-dark
frame, when the new callback is set, we predict + emit.
* Warmup gate: needs ≥ 2 observed darks for the std slope. Until
then the realtime stream stays silent and callers see only the
existing uncorrected stream (i.e. the pre-existing behavior).
* Same dark-subtraction + shot-noise + bfi/bvi calibration
arithmetic as the batched path, so the two streams are
numerically equivalent on real-world data modulo the predictor
RMSE.
The existing batched-interpolation stream (``on_corrected_batch_fn``)
is unchanged. It remains the source of truth for the saved
corrected CSV and ``session_data`` rows. The new realtime stream is
purely additive: it is meant for the bloodflow app's live plots, so the
user sees corrected BFI/BVI from ~15 s into every scan instead of
waiting for each batched dark interval to close.
Tests:
* tests/test_realtime_dark_estimator.py — 7 unit tests covering
the predictor math, warmup gate, history ring buffer, and
edge cases (zero-dt fallback).
* tests/test_realtime_dark_equivalence.py — fixture-driven smoke
test verifying the realtime stream wires through the pipeline
end-to-end with sane sample shapes. (Strict batched-vs-realtime
equivalence isn't asserted on this fixture because batched uses
a retroactive terminal-dark interpolation the realtime path
can't see by design — see the docstring for the full rationale.
Real validation is on hardware.)
App wiring follows in a separate commit. Default
``realtime_dark_history_size=4``, default ``on_realtime_corrected_fn=None``:
purely opt-in until the bloodflow-app side lands.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Task 14 (Phase D, pipeline cutover):
- New module omotion/ContactQualityWorkflow.py with three public types:
CamCQResult, ContactQualityResult, ContactQualityWorkflow.
- _ContactQualitySink (channels={"rolling"}) collects per-camera rolling-BFI
samples and evaluates them against dark/light thresholds supplied by the
caller. Skips warmup/stale/dark frame types.
- ContactQualityWorkflow.check() runs a short scan via the sink-based
ScanRequest API (sinks=[sink], skip_default_storage=True, rolling_avg_enabled=True)
and calls await_complete() for synchronous completion.
- 12 unit + end-to-end tests in tests/test_contact_quality_workflow.py cover
all threshold cases (ok, below_dark, above_light, no_signal), frame-type
filtering, channel gating, multi-camera pass/fail aggregation, and the
workflow's request shape.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Task 15 (Phase D, pipeline cutover):
- Add _cq_workflow = None to __init__ alongside the existing _scan_workflow
and _calibration_workflow slots.
- Add contact_quality_workflow @property that lazy-loads
ContactQualityWorkflow wired to the same scan_workflow instance (shared
scan-running lock).
- Two new tests in test_motion_interface.py: lazy-load returns a
ContactQualityWorkflow instance and is cached; cq._scan_workflow is the
same object as motion.scan_workflow.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…nner Constructs default_pipeline + LiveUsbSource + auto-injected storage sinks + auto-wired telemetry source. Deletes ~440 LOC of legacy SciencePipeline construction and reduced-mode interception closures. The collector sinks added by Phase D (CalibrationWorkflow, ContactQualityWorkflow) are now actually fed data via the runner; their legacy callbacks were dormant before. - ScanRequest.disable_laser now has a default (False) so Phase E tests can omit it; adds batch_size_frames field (default 10) - self._runner set synchronously before worker thread spawns so callers can assert on runner attributes immediately after start_scan returns - Legacy callback kwargs silently discarded with a warning (Phase G will migrate CalibrationWorkflow / app callers to the sink API) - MotionInterface.start_scan simplified: no longer wraps db_path into callback chain (scan_db_path on the interface auto-injects ScanDBSink) - cancel() / cancel_scan() updated to close source + telemetry_source - await_complete() now also usable via new _scan_thread alias Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Deletes ~1500 LOC: SciencePipeline class, FrameIdUnwrapper, all _emit_*, _calibrate_*, _check_dark_integrity, compute_realtime_metrics, create_science_pipeline factory, feed_pipeline_from_csv. The new pipeline package owns this logic now (since Phase E commit 0ac12f1). Kept: parse_histogram_stream + parse_histogram_packet_structured (still used by LiveUsbSource._reader_loop), _rle_decompress + _util_crc16 helpers, Sample + CorrectedBatch + HistogramSample dataclasses, all constants (EXPECTED_HISTOGRAM_SUM, PEDESTAL_HEIGHT, ADC_GAIN, etc.). Also removes dead create_science_pipeline/parse_histogram_stream imports from ScanWorkflow.py (leftover from Phase E rewrite). Deletes 12 legacy test files that exercised SciencePipeline directly; their coverage now lives in tests/test_pipeline/. Adds tests/test_motion_processing_shim.py asserting the shim contract. LOC: 1941 -> 752 in MotionProcessing.py. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Phase C wired the source based on a planned API that didn't match the actual MotionConsole. The poller exposes telemetry via console.telemetry (a ConsoleTelemetryPoller), not console.poll_telemetry(). Fix the method name and field mapping: - Call console.telemetry.get_snapshot() instead of poll_telemetry() - Map ConsoleTelemetry fields (timestamp, pdc, tec_set_raw, tec_v_raw, etc.) to TelemetryEvent fields - Add time.sleep() between iterations so source doesn't busy-loop - Handle None snapshot (poller hasn't seen data yet) - Update test mock to use correct interface Symptom: telemetry thread crashes with "AttributeError: 'MotionConsole' object has no attribute 'poll_telemetry'" whenever a TelemetrySink is in the request's sinks list. Now caught in test_console_telemetry_source_*. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The new source accessed sensor.histo_stream, which does not exist on MotionSensor — the correct path is sensor.uart.histo (the StreamInterface on the MotionComposite). This blocked every scan with an AttributeError. Also restore the legacy stop sequence so the final DMA-buffered frame is not lost: trigger off -> ~0.35 s wait -> stop_streaming -> drain_final drain_final's recovered chunks are pushed back into the per-side packet queue so the parser can consume them before the reader thread exits. Update _FakeSensor in test_sources.py to match the new accessor.
Thresholds and per-camera results are now background-subtracted DN
(display_mean from PedestalSubtractionStage), matching the legacy
ContactQuality module. Two failure modes mirror the legacy semantics:
AMBIENT_LIGHT: any dark frame display_mean > dark_threshold
POOR_CONTACT : rolling avg of light display_mean < light_threshold
The sink now subscribes to live (not rolling) so it sees every frame's
display_mean and buckets it by frame_type. It maintains its own rolling
window per camera for the light-frame check (legacy did the same; see
omotion/ContactQuality.py @ 506c856).
CamCQResult field rename: avg_bfi -> light_avg_dn (+ new dark_max_dn).
Reason rename: above_light -> ambient_light, below_dark -> poor_contact.
Reverts the unintentional BFI-scale interpretation introduced in PR 2;
bloodflow-app's [3.0, 15.0] DN defaults are honoured as DN once again.
…try refactor
C1 - CalibrationWorkflow._run_subscan_capture stops passing dead legacy
kwargs (on_corrected_batch_fn / on_dark_frame_fn / on_complete_fn /
log_dark_endpoints) to start_scan. _CalibrationCollectorSink now
subscribes to final (light samples) AND live (dark frames),
converts EnrichedCorrectedFrame -> Sample and pedestal-subtracted
display_mean -> dark Sample, and the workflow drains the sink after
await_complete instead of relying on callbacks that no longer fire.
ScanWorkflow gains `last_scan_error` and `last_scan_canceled` props
so callers can detect failure without callbacks.
C2 - CsvSink._consume_raw and ScanDBSink._consume_raw now read
temperature_c[i, side_idx, cam_id] instead of [i, 0, cam_id], so
right-side frames no longer record 0.0 deg C.
C3 - TelemetryIngestStage populates batch.pdc/tcm/tcl as numpy arrays
(NaN for missing pdc, 0 for missing counters) instead of Python
lists with None placeholders. The raw sinks previously crashed with
TypeError on every frame when telemetry was running (swallowed by
_safe_consume -> all raw rows silently dropped).
C4 - LiveUsbSource.close ordering rewrite. Previous fix set _stop before
drain_final, so parse_histogram_stream could exit before drained
chunks were enqueued AND the runner could exit __iter__ before any
resulting FrameBatch was delivered. New sequence:
stop_streaming -> drain_final + enqueue chunks
set self._stop -> parse drains queue and exits
join readers -> all batches are now on _batch_queue
push None sentinel -> __iter__ delivers them, then exits
Telemetry refactor (folded in because C3 needed it):
* new omotion/console_telemetry_conversions.py — TEC ADC->C/V/A math
+ 10K3CG R-T lookup table moved out of bloodflow-app
* TelemetryEvent: tec_setpoint_c/tec_actual_c now real C (converted
via SDK helper); +tec_setpoint_raw/tec_actual_raw; +tcm/tcl
counters; -console_temp_c, -fan_rpm (not in ConsoleTelemetry
snapshot)
* TelemetrySink CSV gains tec_*_raw + tcm/tcl columns
Bonus: TelemetryIngestStage.reset no longer wipes the aggregator —
telemetry history must outlive transient pipeline exceptions.
…h, runner robustness, guard race
Three IMPORTANT (not CRITICAL) correctness fixes flagged on feature/pipeline-cutover at commit cc575d8:
1. FrameBatch.side_ids: sources now stamp each row with its source-assigned side (0=left, 1=right). FrameClassificationStage and DarkCorrectionStage read batch.side_ids[i] instead of inferring side via np.argmax(raw_histograms[i].sum(...)). The argmax path silently defaulted to 0 whenever a row's histogram was all zeros (firmware drop, USB stall, dropped packet), so right-side dropped frames were misrouted into the left-side unwrapper / dark history.
2. ScanWorkflow._duration_guard cancel race: when cancel_scan() runs mid-scan, source.close() and stop_trigger() have already fired synchronously. The guard's poll loop used to wake up and re-run stop_trigger + sleep(0.35) + source.close() unconditionally, which blew up on torn-down USB endpoints and blocked on the drained batch queue's sentinel slot. The guard now checks source._stop before continuing and exits if the source is already closing. The fallback close() is also try/except'd so it can't kill the worker.
3. ScanRunner.on_scan_start sink failures: a sink whose on_scan_start raised was still kept in self.sinks, so consume() and on_complete() subsequently ran against a partially-initialized object (no _meta, no open file handle). Failed sinks are now tracked in _failed_sinks and skipped by _sinks_for() + the on_complete loop.
Bonus check (#4 in the review): CQ and Calibration workflow sinks already iterate ("left", "right") correctly via enumerate, so the right_camera_mask path was sound; no fix required.
Tests:
- tests/test_pipeline/test_classify_stage.py:
test_zero_filled_row_keeps_source_assigned_side
- tests/test_pipeline/test_dark_correction_stage.py:
test_zero_filled_row_routes_to_source_assigned_right_side
- tests/test_pipeline/test_runner.py:
test_runner_skips_sink_whose_on_scan_start_raised
test_runner_failed_sink_does_not_receive_diagnostic_or_final_events
- tests/test_scan_workflow.py:
test_duration_guard_skips_redundant_stop_trigger_and_close_on_cancel
(verified to fail without the ScanWorkflow.py change)
pytest tests/test_pipeline tests/test_calibration_workflow.py tests/test_contact_quality_workflow.py -m "not sensor" -q → 160 passed (up from 156).
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
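The side-misrouting failure in fix 1 is easy to reproduce in isolation. The sketch below assumes a per-row histogram shaped (side, cam, bin) with 8 cameras and 1024 bins; that shape and the variable names are illustrative assumptions, not the FrameBatch layout.

```python
import numpy as np

# One dropped row: every bin is zero. Shape (side, cam, bin) is assumed.
dropped_row = np.zeros((2, 8, 1024))

# Legacy inference: argmax over per-side totals. All-zero totals tie, and
# np.argmax returns the first index, so the row is attributed to side 0.
inferred_side = int(np.argmax(dropped_row.sum(axis=(1, 2))))

# Source-stamped side: the source records which side produced each row,
# so a zero-filled row from the right side keeps side 1.
side_ids = np.array([1])
stamped_side = int(side_ids[0])
```

Reading the stamped value removes the dependency on histogram content entirely, which is why a zero-filled row can no longer change sides.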
cancel_scan() and the duration-guard's stop-trigger fallback can both race to call source.close(). The previous implementation set self._stop at the END of close() (after drain_final), so a concurrent caller's 'source already closing?' check saw False mid-close and ran its own stop_streaming + drain_final, producing two 'Streaming stopped — N USB read chunk(s) received' log lines per side. Gate close() on self._stop at entry: the first caller does the work, subsequent callers no-op. parse_histogram_stream still drains its queue via 'not stop_evt.is_set() or not q.empty()', so setting stop early does not lose drained bytes — drain_final's chunks land in the packet queue before the parser exits.
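A first-caller-wins gate of this kind might look like the sketch below. The class name, the ``_close_lock`` used to make the check-and-set atomic, and the ``teardown_runs`` counter are assumptions added for illustration; the commit itself only states that close() is gated on ``self._stop`` at entry.

```python
import threading


class ClosableSource:
    """Toy source whose close() runs its teardown at most once."""

    def __init__(self) -> None:
        self._stop = threading.Event()
        self._close_lock = threading.Lock()  # assumed: guards check-and-set
        self.teardown_runs = 0

    def close(self) -> bool:
        # Gate at entry: only the first caller proceeds to the teardown.
        with self._close_lock:
            if self._stop.is_set():
                return False  # somebody else is already closing
            self._stop.set()
        self._teardown()
        return True

    def _teardown(self) -> None:
        # Stand-in for stop_streaming + drain_final.
        self.teardown_runs += 1


source = ClosableSource()
callers = [threading.Thread(target=source.close) for _ in range(8)]
for t in callers:
    t.start()
for t in callers:
    t.join()
```

Setting the stop flag before the teardown is what lets a concurrent "already closing?" check see True for the whole duration of the drain.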
Two related bugs were causing multi-scan workflows to silently lose
right-side data after the first scan:
1. Sequential per-side teardown in LiveUsbSource.close serialized
stop_streaming + drain_final per side. While the LEFT side drained
(~350ms - 2s), the RIGHT MCU kept pumping data into its USB host
endpoint buffer. By the time RIGHT got its stop, the buffer was
full and the still-running stream loop was stuck in a blocking
dev.read it couldn't release for many seconds. drain_final then
raced into the same endpoint and got a pipe error (errno 32),
sometimes leaving the stream thread alive in a half-dead state.
Fix: tear down both sides in parallel threads.
2. StreamInterface.start_streaming silently bailed when self.thread
was still alive ('Stream already running'), so the next scan's
packet_queue was never wired to USB reads and that side's data
went nowhere. Symptom: scans 2+ would have no right_mask*_raw.csv
and packets_received would report a stale count from scan 1.
Fix: when the old thread is alive at start_streaming, force-stop
it (set stop_event, join with timeout) and start a fresh thread
anyway. Log loudly so the orphan is visible.
Also tightened the inner stream loop:
- bounded data_queue.put (timeout=1.0) so the loop can't block
forever on a stopped/slow parser
- stop_streaming logs a warning when join times out so an orphan
shows up in the run log rather than dying silently
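The bounded put can be sketched as follows. ``put_bounded`` is a hypothetical helper, and retrying until the stop event is set is an assumed policy; the commit only says the put now carries ``timeout=1.0`` so the loop cannot block forever.

```python
import queue
import threading


def put_bounded(q: "queue.Queue", item, stop_event: threading.Event,
                timeout: float = 1.0) -> bool:
    """Try to enqueue item, re-checking stop_event after every timeout."""
    while not stop_event.is_set():
        try:
            q.put(item, timeout=timeout)
            return True
        except queue.Full:
            continue  # consumer is slow; loop back and re-check the stop flag
    return False  # stop requested before the item could be enqueued
```

Because the stop flag is re-checked at least once per timeout, a stalled consumer delays shutdown by at most one timeout period instead of wedging the stream thread.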
Removes the in-pipeline telemetry path entirely so we can stabilise the
core scan-data flow without the added complexity:
- omotion/pipeline/telemetry.py (deleted)
- tests/test_pipeline/test_telemetry.py (deleted)
- tests/test_pipeline/test_telemetry_sink.py (deleted)
- TelemetryEvent + tcm/tcl/pdc fields stripped from FrameBatch
- TelemetryAggregator/TelemetryIngestStage references stripped from
factory.py, pipeline.py, runner.py
- TelemetrySink + raw-CSV tcm/tcl/pdc columns removed from sinks.py
- ScanWorkflow no longer wires up a telemetry source / sink
- ContactQualityWorkflow / dark stage updates to match
- test fixtures + goldens regenerated against the simplified pipeline
Telemetry capture will come back in a later branch once the scan path
is stable. The ConsoleTelemetry poller still exists on the console
device itself — only the per-scan ingest/sink wiring is gone.
Symptom: 14s scans (under dark_interval=600 = 15s) produced no corrected CSV, and the terminal-dark flush logged 'no terminal dark frame found ... last frame abs_id=573 u1=219 exceeds dark threshold'.
Cause: close() sets self._stop at entry (for cancel/duration-guard race safety) and then spends ~2s draining the firmware DMA backlog via drain_final. The drained data contains the dark frames captured after stop_trigger turned the laser off, which the terminal flush needs to close the trailing open interval. But __iter__ exited as soon as _batch_queue was empty for 1s with _stop set. The parser thread emits a batch only every ~250ms, so a queue-empty window of >1s during the 2s drain caused __iter__ to return early. on_scan_stop then ran before the drained dark frames had been processed into pi._light. The terminal flush looked at the last (still-light) frame, failed the dark-threshold check, and silently dropped the entire interval: no IntervalClosed event, no 'final' channel output, no corrected CSV.
Fix: __iter__ now waits for the explicit sentinel (None) that close() pushes after teardown is complete. Stop alone is not a termination signal; close() is bounded (10s teardown + 5s reader join), so blocking on the queue indefinitely until the sentinel arrives is safe.
Safety hatch: if 15s elapse past _stop with no sentinel (close() crashed), break out with a warning so the runner can't deadlock. Also moved the sentinel push into a try/finally so an exception mid-teardown still releases the runner.
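A sentinel-terminated iterator with a safety hatch can be sketched as below. ``iter_batches``, ``poll_s``, and the way the hatch clock starts (at the first empty poll after stop is set) are assumptions for illustration; only the None sentinel and the 15s hatch come from the commit.

```python
import logging
import queue
import threading
import time


def iter_batches(batch_queue: "queue.Queue", stop_event: threading.Event,
                 hatch_s: float = 15.0, poll_s: float = 0.25):
    """Yield batches until the None sentinel arrives; stop alone does not end it."""
    stop_seen_at = None
    while True:
        try:
            item = batch_queue.get(timeout=poll_s)
        except queue.Empty:
            if stop_event.is_set():
                if stop_seen_at is None:
                    stop_seen_at = time.monotonic()
                elif time.monotonic() - stop_seen_at > hatch_s:
                    logging.warning("no sentinel %.1fs after stop; giving up", hatch_s)
                    break  # safety hatch: close() presumably crashed
            continue
        if item is None:
            break  # sentinel: teardown finished, nothing more will arrive
        yield item
```

An empty queue while the stop flag is set only starts the hatch timer, so batches produced late in the drain are still delivered.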
Symptom: second scan in a row fails at the flash stage with 'left camera N not READY for FPGA/config.'
Cause: pipeline cutover reordered teardown so source.close() (stop_streaming + drain_final) runs before disable_camera. After stop_trigger the laser is off but the camera FSIN scheduler keeps capturing dark frames into the firmware DMA buffer. With no host reader, that buffer accumulates until disable_camera finally fires in the worker's finally block ~3s later. The camera firmware ends up in a half-state that fails the next scan's READY check.
Restore the legacy sequence in _duration_guard:
stop_trigger -> 0.5s -> disable_camera per side -> 0.35s -> close
Calling disable_camera first stops the DMA being filled, so close() drains a small/empty buffer instead of fighting the still-running firmware, AND the camera lands in clean standby ready for the next flash.
The worker finally still calls stop_trigger + disable_camera as idempotent safety nets for the cancel_scan path (which bypasses the guard via source_already_closing). Dropped the redundant 0.5s sleep from that fallback path; cancel_scan is synchronous and doesn't need the wait.
Bonus: this should also significantly reduce the ~88-chunk drain backlog we'd been seeing, since the firmware now stops producing during the 0.35s pre-close wait rather than continuing to fill DMA.
…scan
Symptom: pressing Stop took ~3s before the scan actually stopped.
Cause: cancel_scan called cancel() (which fires source.close()) before disable_camera. Same root issue as the not-READY bug from e485912: after stop_trigger the camera FSIN scheduler keeps capturing into the firmware DMA buffer, and source.close()'s drain_final has to wait out 2-3s of buffered data with the firmware actively producing more.
The duration-guard path already does the right order (stop_trigger -> 0.5s -> disable_camera -> 0.35s -> close). cancel_scan now mirrors it.
Snapshot active (side, mask, sensor) tuples to self._scan_active_sides in the worker so cancel_scan can read them without re-resolving the masks from the request. The list is empty before the worker has set them up, which fast-paths cancel_scan to just cancel() + join (no harm if the worker hasn't gotten that far yet).
Expected behavior: stop now returns in ~1s instead of ~3s. The 0.5s post-stop_trigger gives the laser controller time to disengage; the 0.35s post-disable_camera flushes the (now small) DMA buffer; then close() drains an empty endpoint and returns immediately.
Symptom: live plot's first sample spiked to BFI ~ 10, then dropped
to the actual scan baseline (~0.5 on a static phantom).
Cause: the first dark frame is emitted with mean_dc_rt = NaN
(no prior light frame to hold over via _last_realtime). ShotNoise
then did:
contrast = np.where(mean > 0, std_sn / mean, 0.0)
NaN comparisons evaluate False, so 'mean > 0' was False for NaN
input. The else branch fed back a literal 0.0 contrast, which
BfiBviStage turned into BFI = (1 - 0/C_max) * 10 = **10** — a
"perfect signal" reading for what was actually 'no signal'.
Fix: split the where into three cases — normal (mean > 0), zero/
negative real mean (legacy zero contrast), and NaN mean (propagate
NaN downstream). BfiBvi's math then carries NaN through to
bfi_live, and the plot sink can skip-or-render-gap instead of
plotting a fake max-value spike.
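The three-way split can be sketched with nested ``np.where`` calls. The function names and the ``c_max`` value used in the demo are assumptions; the contrast and BFI expressions follow the ones quoted in this message.

```python
import numpy as np


def shot_noise_contrast(mean, std_sn):
    """Contrast = std_sn / mean, with separate handling for zero and NaN means."""
    mean = np.asarray(mean, dtype=float)
    std_sn = np.asarray(std_sn, dtype=float)
    positive = mean > 0                          # False for NaN as well as <= 0
    safe_mean = np.where(positive, mean, 1.0)    # avoid dividing by 0 or NaN
    contrast = np.where(positive, std_sn / safe_mean, 0.0)  # legacy zero contrast
    return np.where(np.isnan(mean), np.nan, contrast)       # NaN stays NaN


def bfi_from_contrast(contrast, c_max):
    """BFI = (1 - contrast / C_max) * 10; c_max is supplied by the caller."""
    return (1.0 - np.asarray(contrast, dtype=float) / c_max) * 10.0


contrast = shot_noise_contrast([200.0, 0.0, np.nan], [20.0, 5.0, 5.0])
bfi = bfi_from_contrast(contrast, c_max=0.5)  # 0.5 is an arbitrary demo value
```

The NaN entry now yields a NaN BFI that a plot sink can skip, while a genuine zero mean still maps to zero contrast as before.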
The corrected CSV was being written as {scan_id}_corrected.csv (just
timestamp, no subject), but the raw CSVs include subject_id:
{scan_id}_{subject_id}_left_mask*_raw.csv
Bloodflow-app's History modal calls get_scan_list() which globs the
data dir, strips the _corrected suffix from each canonical CSV's
stem, and uses what's left as the scan_id. With no subject in the
filename, scan_id ended up as a bare YYYYMMDD_HHMMSS string. That
broke two things:
1. The dropdown showed only date/time entries — no user label.
2. get_scan_details() regex-tests for r'^\d{8}_\d{6}_' (with a
trailing underscore) to detect the new/mid format. A bare
timestamp fails the regex, so it fell through to the legacy
parser ('scan_userLabel_TS') and looked for files that don't
exist. correctedPath came back empty -> Visualize buttons
disabled.
New filename: {scan_id}_{subject_id}_corrected.csv (falls back to
the old {scan_id}_corrected.csv when subject_id is empty, e.g. for
calibration scans where the subject_id is a placeholder).
Old corrected CSVs already on disk are unaffected; they'll still
appear in the dropdown but with no user label and disabled
Visualize buttons. Rename them to include the subject_id if you
want them to show up properly.
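The regex failure described above can be reproduced with a few lines. The helper names and the sample stems (including ``subj01``) are hypothetical; the pattern is the one quoted from get_scan_details().

```python
import re

# Pattern quoted above: timestamp followed by a trailing underscore.
NEW_FORMAT_RE = re.compile(r"^\d{8}_\d{6}_")


def scan_id_from_stem(stem: str, suffix: str = "_corrected") -> str:
    """Strip the corrected-CSV suffix from a file stem to get the scan_id."""
    return stem[: -len(suffix)] if stem.endswith(suffix) else stem


def is_new_format(scan_id: str) -> bool:
    return NEW_FORMAT_RE.match(scan_id) is not None


bare = scan_id_from_stem("20250101_120000_corrected")
labelled = scan_id_from_stem("20250101_120000_subj01_corrected")
```

The bare timestamp has nothing after the time field, so the trailing underscore in the pattern cannot match and the scan falls through to the legacy parser.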
74a9521 added subject_id to the corrected CSV filename but kept the _corrected suffix that issue #44 (commit 71bee4c, May 2) had explicitly dropped. The canonical naming is: Corrected: {scan_id}_{subject_id}.csv Raw histo: {scan_id}_{subject_id}_{side}_mask{XX}_raw.csv Telemetry: {scan_id}_{subject_id}_telemetry.csv The _raw suffix on histogram CSVs disambiguates them from the canonical corrected output, so the corrected stream — which is the default product of the pipeline — doesn't need its own suffix. Updated 5 globs in tests + scripts that were looking for '*corrected*.csv' to use the negation pattern instead: [p for p in out.glob('*.csv') if not p.name.endswith('_raw.csv')]
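The negation pattern can be wrapped in a small helper, sketched here with a hypothetical name (``corrected_csvs``) and sorted output for determinism.

```python
from pathlib import Path
from typing import List


def corrected_csvs(out_dir: Path) -> List[Path]:
    """Return every CSV in out_dir that is not a raw histogram file."""
    return sorted(
        p for p in Path(out_dir).glob("*.csv")
        if not p.name.endswith("_raw.csv")
    )
```

Note that this filter only excludes ``_raw.csv`` files, so a ``*_telemetry.csv`` in the same directory would also pass it and would need its own exclusion if one is present.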
The pipeline cutover replaced the callback-based sink chain
(start_scan(on_*_fn=...) with caller-supplied callbacks + a chained
ScanDBSink wrapper) with channel-subscribed sinks under
omotion/pipeline/sinks.py. The old plumbing has been dead since the
cutover landed but the files + supporting code remained, creating
two coexisting sink modules with overlapping class names. This
commit removes the dead path so reviewers only see one sink
hierarchy.
Removed:
omotion/Sink.py (104 LOC) — old Sink base class
omotion/CsvSink.py (455 LOC) — old callback CSV sink
omotion/ScanDBSink.py (241 LOC) — old callback DB sink
omotion/MotionInterface.py:
- _wrap_kwargs_with_db_sink method (~140 LOC) — never called
- active_db_session_id property + _active_db_sink attr — set by
the dead wrap method, read by nothing
- associated docstring + ScanDBSink import path
omotion/ScanWorkflow.py:
- 'from omotion.CsvSink import CsvSink' — vestigial import,
file body references the new pipeline CsvSink instead
omotion/__init__.py:
- re-exports of Sink/CsvSink/ScanDBSink (and __all__ entries)
tests/test_scan_db_sink.py — tested the deleted ScanDBSink.
Equivalent coverage exists at tests/test_pipeline/test_scan_db_sink.py.
Updated:
docs/ScanDatabase.md
- session_meta source reference now points at the pipeline sink
- lifecycle diagram tagged as legacy (full rewrite is a separate
follow-up; schema/semantics described elsewhere in the doc are
unchanged)
Behaviour: no functional change. New pipeline sinks were already
the active write path. bloodflow-app never read active_db_session_id
(it writes notes to a sidecar .txt file, not the DB).
Tests: 293 passed, 37 skipped, 136 deselected (was 306; the 13-test
drop is from deleting tests/test_scan_db_sink.py, covered by
tests/test_pipeline/test_scan_db_sink.py).
Structured review plan for the cutover landing: three buckets (data pipeline, data sinks, ancillary), per-bucket code-review checklists + automated test commands + manual bench scenarios. Pre-merge gates and the staged next-next -> next merge plan are at the bottom. Lives at docs/superpowers/plans/ alongside the other planning docs for this project so future reviewers / auditors can trace what was verified before merge.
Pipeline cutover (stage to next-next)
These are internal research / planning materials that don't belong in
the public-facing SDK repo. Same content now lives in
Projects/motion-analysis/ for internal use.
Removed:
data-processing/dark-drift-study/ (19 files — plots, sims, findings)
data-processing/pdc-investigation/ (1 file — findings.md)
pdc-investigation/ (9 files — plots, scripts, FINDINGS.md)
docs/superpowers/ (17 files — all specs + plans)
Kept (still useful in the SDK):
data-processing/*.py (analyze_camera_analytics, check_csv,
compare_pipelines, plot_*, etc. —
pre-existed on origin/next)
scripts/plot_dark_drift.py (standalone diagnostic — useful for
any user inspecting a scan's dark
frame stability, not just the study)
No active code referenced the removed paths. Tests: 293 passed.
Fix prose references that used 'OpenMotion' or 'OpenMOTION' instead of the canonical hyphenated brand spelling.
Affected:
docs/Architecture.md: title + overview
docs/TestPlan.md: title + overview
docs/ScanDatabase.md: intro line + app description
docs/MOTION_Interface_API.md: sample print() line
README.md: repo overview
AGENTS.md: repo description
Untouched (technical identifiers, leave as-is): omotion package name, OPENMOTION_DEMO env var, repo names, class names (MotionInterface etc.), filenames.
Note: class-name references in docs (MOTIONInterface, MOTIONConsole, MOTIONUart) are also miscapitalized; the actual classes are MotionInterface / MotionConsole / MotionUart. That's a separate hygiene issue not addressed here.
Architecture.md was describing an old connection model:
- DualMotionComposite (deleted from omotion/ — replaced by
ConnectionMonitor + per-sensor MotionSensor state machines)
- Class names with all-caps MOTION* prefix (actual classes are
MotionInterface / MotionConsole / MotionSensor / MotionUart /
MotionSignal)
This rewrites the affected sections to match the current code:
- Top-level architecture diagram (MotionInterface composes the
three stable handles + a single ConnectionMonitor daemon thread)
- Device-abstraction section (MotionConsole / MotionSensor /
ConnectionMonitor / MotionInterface paragraphs)
- Signal + event flow diagram (driven by ConnectionMonitor's event
queue, no more DualMotionComposite)
- Demo mode paragraph (MotionInterface(demo_mode=True) /
OPENMOTION_DEMO=1, no more 'DualMotionComposite accepts demo_mode')
- Error handling table, threading model table (Mot prefix casing)
Also rewrote the Packet structures section per a review pass:
- Dropped the I2C packet rows (not useful for an architecture doc)
- Expanded the UART packet row into a labelled wire-format block
with field semantics + max payload sizes (console vs sensor)
- Added the histogram packet wire format from MotionProcessing.py
(header / optional timestamp / per-camera blocks of 4103B /
footer + CRC; TYPE_HISTO_CMP variant with payload + transport
CRC; EXPECTED_HISTOGRAM_SUM validation)
docs/MOTION_Interface_API.md — one Notes bullet that referenced the
removed DualMotionComposite; rewrote to describe the actual
ConnectionMonitor-driven flow.
tests/test_comm_paths.py — deleted test_left_right_port_assignment
which referenced motion._dual_composite, an attribute that no longer
exists. Port assignment is now an internal ConnectionMonitor concern
(see connection_monitor.py). The other test in the same section
(test_dual_sensor_independent_pings) uses the standard sensor_left /
sensor_right fixtures and is unaffected — kept.
A genuine laser-off dark frame should be within a few DN of the sensor pedestal; the 30 DN default was loose enough to silently accept frames where the laser was still partly on. Drop to 5 DN.
DarkIntegrityGuard.max_above_pedestal: 30.0 → 5.0
DarkCorrectionStage.integrity_max_above_pedestal: 30.0 → 5.0
Tests:
- test_dark_integrity_guard tests pass explicit thresholds (30.0, 50.0) so they're unaffected.
- Three terminal-flush tests in test_dark_correction_stage used synthetic 'dark' frames with u1=80/85/90 (16-26 DN above pedestal=64) that were dark enough under the loose 30 DN gate but not under 5 DN. Updated to u1=65/66 (1-2 DN above pedestal) so they read as realistic darks. One assertion (light.mean == 415.0) was numerically coupled to the old dark values; recomputed for the new ones (= 434.5).
docs/SciencePipeline.md updated to match (§5.7.1 prose + defaults table).
Tests: 293 passed.
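The effect of tightening the gate on the test fixtures can be checked with a one-line predicate. ``dark_passes_integrity`` is a hypothetical stand-in for the guard, and treating the bound as inclusive is an assumption.

```python
PEDESTAL = 64.0  # pedestal value quoted in the test notes above


def dark_passes_integrity(u1: float, max_above_pedestal: float = 5.0,
                          pedestal: float = PEDESTAL) -> bool:
    """Accept a dark frame only if its mean sits close to the pedestal."""
    return (u1 - pedestal) <= max_above_pedestal
```

Under this sketch the old synthetic darks (u1=80 and above) pass only with the 30 DN gate, while the updated u1=65/66 fixtures pass the new 5 DN default.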
The three files documented an API that no longer exists:
docs/MOTION_Interface_API.md — module path 'omotion.Interface'
(actual: omotion.MotionInterface),
class 'MOTIONInterface' (actual:
MotionInterface), attributes
console_module + sensors[] (actual:
console + left + right), async API
start_monitoring/stop_monitoring
(gone, replaced by start/stop +
ConnectionMonitor), static factory
acquire_motion_interface (gone),
get_camera_histogram (replaced by
get_single_histogram with different
signature), PyQt5 examples (PyQt6
only).
docs/MOTION_Console_API.md — same era, same disease.
docs/MOTION_Sensor_API.md — same.
Every code example in these docs would fail at the import line. No
inbound references from any other doc or code. Source-of-truth is
the docstrings in MotionInterface.py / MotionConsole.py /
MotionSensor.py + the high-level Architecture.md.
Also removed the matching rows from CLAUDE.md's 'Existing in-repo
docs' table and the 'see MOTION_Console_API.md' nudge in the
Gotchas section.
display_mean (= max(0, mean_raw - pedestal)) is the right quantity
for measuring ambient light on dark frames — baseline is the
zero-light pedestal, so any non-zero value is stray light leaking
onto the sensor. But for light frames it includes the dark baseline:
display_mean = signal + dark, which isn't what the POOR_CONTACT
threshold should test against.
_ContactQualitySink now reads:
- Dark frames: display_mean (unchanged — AMBIENT_LIGHT gate)
- Light frames: mean_dc_rt (= mean_raw - predicted_dark_baseline
from DarkCorrectionStage). Actual laser-driven signal strength
above the just-measured dark, not signal + dark.
Early light frames before the first dark observation have
mean_dc_rt = NaN (predictor returns None). The existing isfinite
check skips them; the rolling window fills up once the first dark
lands.
Test fixture _FakeFrameBatch now carries both arrays (seeded with
the same DN value) so existing tests stay valid.
SciencePipeline.md §11.2 + §5.6 + §5.11 + §6 (channels table)
updated to describe the dual-signal reading + the two-pass
refinement pattern.
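The dual-signal reading described in this commit can be sketched as follows. This is an illustration under assumptions, not the `_ContactQualitySink` code: the function name `cq_signal` and its scalar arguments are hypothetical, and the real sink works on batch arrays.

```python
import math
from typing import Optional


def cq_signal(
    is_dark: bool,
    mean_raw: float,
    pedestal: float,
    predicted_dark_baseline: Optional[float],
) -> Optional[float]:
    """Hypothetical per-frame selector for the contact-quality input.

    Dark frames use display_mean = max(0, mean_raw - pedestal).
    Light frames use mean_dc_rt = mean_raw - predicted_dark_baseline,
    which is NaN until the first dark frame has been observed.
    Returns None when the frame should be skipped.
    """
    if is_dark:
        return max(0.0, mean_raw - pedestal)
    if predicted_dark_baseline is None:
        mean_dc_rt = float("nan")  # predictor has no dark observation yet
    else:
        mean_dc_rt = mean_raw - predicted_dark_baseline
    # Mirrors the isfinite check that skips early light frames.
    return mean_dc_rt if math.isfinite(mean_dc_rt) else None


ambient = cq_signal(True, 70.0, 64.0, None)           # dark frame
early_light = cq_signal(False, 500.0, 64.0, None)     # before first dark
later_light = cq_signal(False, 500.0, 64.0, 66.0)     # after first dark
```

The point of the split is that the light-frame value excludes the dark baseline, so a POOR_CONTACT threshold sees laser-driven signal only.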
…gain, t0 normalization fix
Pipeline simplification:
- Remove unused RollingAverageStage + Tee("rolling") + bfi_rolling/bvi_rolling fields. CQ rolling-mean preserved (independent, lives in _ContactQualitySink).
- Remove QtUiSink stub; the UI sinks (_LivePlotSink + _FinalBatchSink) live in the bloodflow-app now.
- MotionProcessing.py tier A: drop dead HISTO_BINS / HISTO_BINS_SQ / CAMERA_GAIN_MAP / ADC_GAIN re-exports (zero live consumers); update docstring.
Telemetry CSV restored:
- Wire ConsoleTelemetryPoller into ScanWorkflow via _TelemetryCsvWriter listener. The feature was dead post-cutover (ScanRequest.write_telemetry_csv, _TELEMETRY_HEADERS, _snap_to_row, ScanResult.telemetry_path all unwired). New scans now produce {scan_id}_{subject_id}_telemetry.csv alongside the existing raw / corrected outputs.
Per-side ADC gain:
- DarkCorrectionStage and ShotNoiseCorrectionStage now take SensorPedestals and derive per-side adc_gain via adc_gain_for_pedestal((1024 - p) / 11_000). The previous (1024 - 64) / 11_000 hardcode applied legacy-firmware gain to current-firmware pedestal=128 sensors. Mixed-firmware modules now get the right gain on each side.
- Move CAMERA_GAIN_MAP, HISTO_BINS, HISTO_BINS_SQ to omotion/config.py. Add adc_gain_for_pedestal() to omotion/pipeline/pedestal.py.
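A rough sketch of the per-side gain derivation, assuming the formula is (1024 - p) / 11_000 with p the side's pedestal in DN. The function name `adc_gain_sketch`, the constant names, and the left/right pedestal assignment are hypothetical; the actual signature of adc_gain_for_pedestal() is not shown in this commit message.

```python
import math

ADC_CEILING_DN = 1024    # from the commit text: (1024 - p)
GAIN_DIVISOR = 11_000    # from the commit text: / 11_000


def adc_gain_sketch(pedestal_dn: float) -> float:
    """Hypothetical stand-in: gain as a function of the sensor pedestal."""
    return (ADC_CEILING_DN - pedestal_dn) / GAIN_DIVISOR


# Assumed mixed-firmware module: legacy pedestal (64) on one side,
# current-firmware pedestal (128) on the other.
pedestals = {"left": 64.0, "right": 128.0}
per_side_gain = {side: adc_gain_sketch(p) for side, p in pedestals.items()}

# The old hardcode applied the pedestal=64 value to both sides.
legacy_hardcode = (1024 - 64) / 11_000
right_error = legacy_hardcode - per_side_gain["right"]
```

With these numbers the hardcode overstates the pedestal=128 side's gain by 64 / 11_000, which is the mismatch the per-side derivation removes.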
t0 normalization bug fix:
- LiveUsbSource.parse_histogram_stream was passed t0_normalizer=getattr(self, "_t0_normalize", None) but no _t0_normalize method existed. Silent fallback to None meant batch.timestamp_s carried absolute firmware TIM5 counter values (since-boot, modulo ~43000 s rollover) all the way through to corrected.csv and raw.csv. Post-scan plots showed time axes starting at e.g. 1182 s instead of 0.
- Add _t0_normalize scalar method to _BaseSource that shares self._t0 with the existing array-based _apply_timestamp_normalization. Replace getattr with direct call so future regressions raise AttributeError loudly.
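To illustrate the failure mode, here is a small sketch of how a getattr default hides a missing method while a direct attribute access does not. The class names are hypothetical, and latching t0 on the first timestamp is an assumption about how the normalization works; rollover handling is omitted.

```python
from typing import Callable, Optional


class SourceWithoutMethod:
    """Stand-in for the pre-fix source: no _t0_normalize is defined."""

    def normalizer_via_getattr(self) -> Optional[Callable[[float], float]]:
        # Silently yields None, so timestamps stay absolute downstream.
        return getattr(self, "_t0_normalize", None)

    def normalizer_direct(self) -> Callable[[float], float]:
        # Raises AttributeError as soon as the method goes missing.
        return self._t0_normalize


class SourceWithMethod:
    """Stand-in for the fixed source (assumed behaviour)."""

    def __init__(self) -> None:
        self._t0: Optional[float] = None

    def _t0_normalize(self, timestamp_s: float) -> float:
        # Assumption: the first timestamp seen becomes the zero point.
        if self._t0 is None:
            self._t0 = timestamp_s
        return timestamp_s - self._t0


silent = SourceWithoutMethod().normalizer_via_getattr()

try:
    SourceWithoutMethod().normalizer_direct()
    raised = False
except AttributeError:
    raised = True

fixed = SourceWithMethod()
first = fixed._t0_normalize(1182.0)
second = fixed._t0_normalize(1183.5)
```

With the fixed source a scan whose firmware counter starts at 1182 s reports 0 s for its first frame, matching the expected post-scan time axis.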
Dead-code cleanup:
- Delete 3 sensor-marked tests that ImportErrored on the long-gone create_science_pipeline (test_sequences.py:test_streaming_acquisition, test_dual_sensor_frame_alignment; test_comm_paths.py:test_stream_interface_no_data_loss).
- Delete data-processing/compare_pipelines.py (compared old vs new pipeline; the old pipeline is gone).
- Delete 4 scripts that duplicated bytes_to_integers and used the dead omotion.Interface import (camera_tester, test_receive_frame, test_receive_multi_frame, test_stream_all_cameras).
- Other Interface-era scripts left for a separate triage pass.
Docs: SciencePipeline.md (channels, ADC_GAIN explanation, UI sink split, removed §5.12/§5.13/§8.3 stale sections), Architecture.md (channels table, ASCII diagram, file table), CLAUDE.md (don't cite the broken flash_sensors.py script as canonical).
Tests: 287 software tests pass.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…se positive
Odometer:
- Add three console controller opcodes (omotion/config.py):
  OW_CTRL_GET_SYSTEM_ODO = 0x26 (returns uint32 LE: minutes of console uptime)
  OW_CTRL_GET_LASER_ODO = 0x27 (returns uint32 LE: cumulative LSYNC pulses)
  OW_CTRL_RESET_ODO = 0x28 (1-byte target: 0=system, 1=laser, 2=both)
- Add MotionConsole.get_system_odometer_minutes / get_laser_odometer_pulses / reset_odometer. NAK-tolerant: builds that predate the feature return None (or False for reset) instead of raising.
- Extend MotionConsole.log_device_info to log the odometer at every console connect (called from MotionInterface.log_console_info). On firmware that doesn't have the opcodes, the line is silently omitted (no scary warning).
- New scripts/check_odometer.py: CLI to read (and optionally reset) the odometer. Pattern mirrors enter_dfu.py.
Calibration cancel-flag fix:
- ScanWorkflow.last_scan_canceled was reading the same Event (_stop_evt) that the worker's inner finally pulses on every clean exit to wake the duration guard. Result: every scan completion was marked canceled. Nothing caught it until CalibrationWorkflow._run_subscan_capture got the "if canceled: return empty" path on the way to a (clean) calibration scan. Calibration aborted with "active camera (left, cam=1) produced no corrected samples" while the pipeline was actually producing thousands of valid samples.
- Add an explicit self._cancel_requested flag, set only by cancel() and the mid-scan disconnect handler. Read that for last_scan_canceled instead of _stop_evt.
Tests pass; lifecycle semantics unchanged for actual cancel paths.
Tests: 143 passing.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
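The cancel-flag bug can be reproduced in a few lines. This is a simplified sketch, not the ScanWorkflow code: the class `WorkerSketch` and the `last_scan_canceled_buggy` property are hypothetical, and only the attribute names _stop_evt and _cancel_requested come from the commit message.

```python
import threading


class WorkerSketch:
    """Hypothetical reduction of the scan worker's stop/cancel state."""

    def __init__(self) -> None:
        self._stop_evt = threading.Event()
        self._cancel_requested = False

    def cancel(self) -> None:
        # Only an explicit cancel sets the dedicated flag.
        self._cancel_requested = True
        self._stop_evt.set()

    def run_to_completion(self) -> None:
        try:
            pass  # the scan itself would run here
        finally:
            # Set on every exit, clean or not, to wake the duration guard.
            self._stop_evt.set()

    @property
    def last_scan_canceled_buggy(self) -> bool:
        # Old behaviour: conflates "worker finished" with "user canceled".
        return self._stop_evt.is_set()

    @property
    def last_scan_canceled(self) -> bool:
        # Fixed behaviour: reads the explicit flag.
        return self._cancel_requested


clean = WorkerSketch()
clean.run_to_completion()

canceled = WorkerSketch()
canceled.cancel()
canceled.run_to_completion()
```

After a clean run the Event-based property reports a cancel that never happened, while the explicit flag stays False; after a real cancel both agree.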
The final-review PR for the pipeline-cutover landing. next-next was staged via #57; this PR moves the full set into next. Same diff content as #57, fresh review eyes for code-quality assessment.
Scope (138 commits ahead of next)
Three review buckets from the review/test plan at docs/superpowers/plans/2026-05-26-pipeline-cutover-review-and-test-plan.md:
A: Data pipeline rework. New omotion/pipeline/ package (Source / Stage / Sink / Runner / FrameBatch) replaces the legacy MotionProcessing.SciencePipeline class. MotionProcessing.py collapsed 1941 → 752 LOC (parsing + dataclasses). ScanWorkflow.py rewritten around the pipeline. StreamInterface.py orphan-stream recovery + bounded put. Doc rewrites: SciencePipeline.md (777 lines), Architecture.md.
B: Data sinks rework. Sink-based API (ScanRequest(sinks=[...]) with auto-injection of default CsvSink + ScanDBSink based on MotionInterface(data_dir=…, scan_db_path=…, operator_id=…)). New ContactQualityWorkflow (DN-scale CQ). CalibrationWorkflow reshaped around a sink for sample collection. Old omotion/Sink.py / CsvSink.py / ScanDBSink.py deleted (commit 8e6cd0f).
C: Ancillary. omotion/console_telemetry_conversions.py + RT lookup table (TEC ADC→°C/V/A moved into SDK). Issue #92 (ScanDatabase, SessionPlayback). Dark-drift study + PDC investigation research artifacts. AGENTS.md + CLAUDE.md.
Tests
293 passed, 37 skipped, 136 deselected.
Bench-verified
dark_interval) produce corrected CSVs covering ~97% of the scan via terminal-dark flush.
Companion PRs
next-next → next flow once next-next is set up there)
🤖 Generated with Claude Code