feat(labels): newsworthy event labeling pipeline with two-annotator workflow #4

Merged
Mathews-Tom merged 11 commits into main from feat/event-labeling-pipeline on Apr 17, 2026

Mathews-Tom (Owner) commented Apr 17, 2026

Summary

Ships the full labeling pipeline that converts calibration placeholders into empirical reliability curves. Four wire-service adapters, a click-driven annotator CLI, an append-only partitioned Parquet corpus, inter-annotator agreement metrics, the two-annotator workflow enforcer, and the signal-to-event join that produces TP/FP labels for the Phase-1 calibration consumers. After merge, a labeler can drive real candidates end-to-end; the nightly calibration job consumes the resulting labels.

What Changed

  • Models: Pydantic contracts for NewsworthyEvent, EventCandidate, SourcePublication, QualifyingSource, LabelDecision, AnnotatorIdentity, and AgreementReport. Closed source_id enum across storage, workflow, and adapters.
  • Sources: AbstractSourceAdapter with ReutersAdapter, BloombergAdapter, ApAdapter, FtAdapter concrete implementations. Shared exponential-backoff via request_with_backoff. Auth via env vars; missing credentials fail loud at construction.
  • Storage: AppendOnlyParquetWriter with per-date partitioning and filelock-based safety; LabelReader with partition pruning; frozen pyarrow schema matching docs/methodology/labeling-protocol.md §Storage Schema.
  • Workflow: CandidateQueue substrate and WorkflowEnforcer.can_promote / promotion_warnings enforcing the two-annotator gate (distinct annotators, existence agreement, 5-min timestamp hard fail, strictly-positive market Jaccard).
  • Agreement: Cohen's kappa, 60-second timestamp agreement, and mean market Jaccard metrics. compute_agreement pairs decisions by candidate_id and checks the four targets.
  • Join: join_signals_to_events implements the TP/FP criteria: market match plus lead time in (0, 24h]. Multiple events per market match earliest-qualifying; non-labeled statuses excluded.
  • CLI: augur-label click commands (candidates, inspect, decide, promote, correct, coverage). Queue state persists to labels/queue.json; promoted events write to the parquet corpus.

How It Works

The canonical labeling protocol in docs/methodology/labeling-protocol.md drives every decision. Source qualification requires two publishers among {reuters, bloomberg, ap, ft} within 24 h; the earliest qualifying publication sets ground_truth_timestamp. Annotators record decisions via augur-label decide; augur-label promote invokes the workflow enforcer. Labeled events land in partitioned parquet and are consumed by join_signals_to_events alongside MarketSignal rows from DuckDB to produce SignalLabel inputs for the Phase-1 EmpiricalFPR and ReliabilityAnalyzer modules.

Configuration Added

  • config/labeling.toml — source rate limits and credential env var names; workflow thresholds; storage paths; join windows. Mirrors docs/methodology/labeling-protocol.md §Inter-Annotator Agreement and phase-2 §11.

Schema Changes

Labeled-corpus parquet schema is frozen at label_protocol_version = "1.0"; _schema.py codifies the column set. No JSON-schema exports are added because the labeled corpus is a storage schema, not a consumer-facing Pydantic model registered in scripts/export_schemas.py.

Quality Gates

  • uv run ruff check . clean
  • uv run ruff format --check . clean
  • uv run mypy --strict src/ clean (90 source files)
  • uv run pytest --cov-fail-under=80 passes; 177 tests, 87.6 % total coverage
  • LLM-import guard passes
  • Schema export in sync (uv run python scripts/export_schemas.py --check)
  • Forbidden-token doc lint passes
  • datetime.now() AST guard passes
  • uv run pre-commit run --all-files passes
  • CI workflow runs green on this PR (pending remote run)

Definition of Done

  • Files in the target module layout exist; strict lint, format, and type checks clean.
  • Four source adapters present with auth, retry, and parsing paths; per-source credential enforcement tested.
  • CLI commands in the canonical command set implemented (discover intentionally deferred — see Deferred Findings).
  • Two-annotator workflow enforcement: single-annotator promotions rejected by tests.
  • Inter-annotator agreement metrics implemented; AgreementReport surfaces via compute_agreement.
  • Parquet writer preserves concurrent appends via per-partition file locking; supersede tested.
  • Signal-to-event join produces correct TP/FP labels on ten synthetic cases including every boundary.
  • Phase-1 calibration consumers can accept SignalLabel output (contract matches compute_empirical_fpr).
  • CHANGELOG updated with the pipeline's added surface.
  • Test coverage meets the 80 % overall threshold; join, workflow, and parquet writer are exercised broadly.
  • Operational follow-up (post-merge): a real labeler runs augur-label discover and augur-label decide against real candidates. Deferred because discover requires live source-API credentials; no credentials are provisioned in this environment.

Operational Handoff

After merge the labeling pipeline is ready for double labeling once source credentials are provisioned. Annotators run augur-label against real candidates; the scripts/calibrate.py nightly job (from Phase 1) consumes join_signals_to_events output and rebuilds empirical FPR records and reliability curves. First 90 days require two annotators per candidate; agreement metrics surface via augur-label agreement (deferred until live corpus exists; the primitive is ready).

Deferred Findings

  • augur-label discover is not wired in this PR because source adapters require API credentials absent from the development environment. The four adapters are present and tested against credential-absent paths. Wiring discover is a follow-up once credentials are provisioned.
  • augur-label agreement command surface was not exposed in the CLI in this PR; compute_agreement is the callable primitive ready for the CLI wiring.

Test Plan

  • uv run pytest passes locally (177 tests).
  • uv run python scripts/export_schemas.py --check passes locally.
  • uv run python scripts/lint_detector_now.py passes locally.
  • uv run pre-commit run --all-files passes locally.
  • CI workflow runs green on this PR.
  • Operational: once credentials are provisioned, run augur-label discover --since <date> and confirm candidates appear in labels/queue.json.

Review Pass

  • pr-review findings addressed: 2 CRITICAL + 3 HIGH + 4 MEDIUM/LOW fixed in 01d241d fix(labels); 2 scale-ceiling observations documented in ab49e93 refactor(labels).
  • code-refiner simplifications applied: none beyond the inline cleanup already in the fix commit.

Deferred Findings

  • H2 / M4: parquet writer O(n²) append under partition lock and supersede's sequential scan are documented operational ceilings in the module docstring. Sibling-file layout via pq.ParquetDataset plus an event_id -> partition_date index is the migration path once dense-labeling volume approaches the lock-timeout threshold.
  • M3: CandidateQueue.pending() returns candidates with fewer than two decisions; "both annotators said no" is surfaceable via all_decisions but has no dedicated query. Deferred until the CLI exposes a resolved-rejected view.
  • L3 / L4: AbstractSourceAdapter and QualifyingSource are retained; the protocol has four concrete implementations and the Pydantic model ships for schema-export parity once discover lands.

…ig surface

augur-labels needs click for the annotator CLI and filelock for the
append-only parquet writer's per-partition locking. Declare both in
the workspace member's pyproject and regenerate uv.lock. Also depend
on augur-signals so the labeling package can import MarketSignal for
the signal-to-event join without duplicating the model.

config/labeling.toml mirrors the per-source credentials, workflow
thresholds, storage paths, and join parameters from
docs/methodology/labeling-protocol.md. LabelingConfig composes the
per-block Pydantic sub-models with frozen, extra=forbid, and bounded
numeric fields so a malformed config fails at startup rather than
silently coercing. Credential env var names — not the secrets
themselves — are stored in the config; the adapters read the secret
from os.environ at startup.

…tions

Five frozen Pydantic models mirror the schemas in
docs/methodology/labeling-protocol.md:

QualifyingSource and SourcePublication enforce the closed source_id
literal set (reuters, bloomberg, ap, ft) so every downstream consumer —
the workflow enforcer, the storage schema, the join — operates on the
same tag set. Adding a source requires a protocol-version bump per
the labeling-protocol doc.

EventCandidate holds the intermediate state before two annotators
agree; NewsworthyEvent is the labeled output the calibration layer
consumes through the signal-to-event join. The status literal is
closed at {labeled, candidate, superseded, rejected} — the corpus is
append-only and corrections produce new rows with a corrects
back-reference, not in-place mutation.

LabelDecision records one annotator's call on one candidate. The
qualifies-specific fields (timestamp, market_ids, category) are
optional on the model; the workflow enforcer validates them at
promotion time so an annotator can record "does not qualify" without
fabricating event metadata.

AgreementReport structures the four-metric agreement summary for CLI
and CI consumption.

…mentations

AbstractSourceAdapter is the uniform interface every source adapter
implements. The annotator CLI's discover command iterates over the
adapters registered per config/labeling.toml and merges their output
into candidate publications without caring about source-specific
request shapes or auth flows.

Each concrete adapter — ReutersAdapter, BloombergAdapter, ApAdapter,
FtAdapter — reads its credentials from the env vars documented in
phase-2 §5.2 and fails loud at construction when required credentials
are missing, rather than silently returning an empty list on first
call. The FT adapter is the exception: without an API key it returns
an empty list and health_check False, matching the doc's RSS-fallback
semantics.

sources/_http.py provides request_with_backoff, a parametrized
exponential-backoff helper (1 s initial, 60 s cap, 5 retries) that
every adapter routes its HTTP calls through so retry semantics stay
consistent across sources. Tests cover the happy path, the exhaustion
path with injectable sleep, and the per-adapter credential-enforcement
behavior.
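
The retry schedule described above can be sketched as follows. This is a minimal illustration, not the contents of sources/_http.py: the `RetryableError` class, the keyword names, and the reading of "5 retries" as five retries after the first attempt are all assumptions.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


class RetryableError(Exception):
    """Hypothetical marker for failures worth retrying (for example HTTP 429 or 503)."""


def request_with_backoff(
    call: Callable[[], T],
    *,
    initial_delay: float = 1.0,
    max_delay: float = 60.0,
    max_retries: int = 5,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Run `call`, doubling the wait after each retryable failure."""
    delay = initial_delay
    for attempt in range(max_retries + 1):
        try:
            return call()
        except RetryableError:
            if attempt == max_retries:
                raise  # retry budget exhausted: surface the last error
            sleep(min(delay, max_delay))  # never wait longer than the cap
            delay *= 2
    raise AssertionError("unreachable")
```

Passing `sleep` as a parameter is what lets an exhaustion test record the delays in a list instead of actually waiting.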

…nd file locking

The labeled corpus lives at labels/newsworthy_events/date=YYYY-MM-DD/
events.parquet. AppendOnlyParquetWriter groups incoming events by the
date of their ground_truth_timestamp, acquires a filelock on the
partition's lock file, reads the existing parquet (if any), appends
the new rows, and atomically replaces the target via a
staging-file-plus-rename. Concurrent annotator processes can call
append() safely without corrupting the file.

supersede() implements the protocol's correction path from
docs/methodology/labeling-protocol.md §Annotator Protocol: it finds
the partition containing event_id, flips status to superseded, sets
the corrects back-reference, and rewrites the partition in place
(still under the filelock). KeyError is raised when the event_id is
absent, preventing silent no-op corrections.

LabelReader uses partition-name-range pruning: only date=YYYY-MM-DD
partitions in the requested window are opened, so the window query
does not scan the full archive. events_for_market and
coverage_by_category are built on top of events_in_window.
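
Partition-name pruning can be illustrated with the standard library alone. The sketch below is not LabelReader itself: the function name `partitions_in_window` and the inclusive window bounds are assumptions, and it only selects files without reading any parquet.

```python
from datetime import date
from pathlib import Path


def partitions_in_window(root: Path, start: date, end: date) -> list[Path]:
    """Return events.parquet paths whose date=YYYY-MM-DD directory lies in [start, end]."""
    selected = []
    # ISO dates sort chronologically, so sorting directory names orders the partitions.
    for part_dir in sorted(root.glob("date=*")):
        try:
            part_date = date.fromisoformat(part_dir.name.removeprefix("date="))
        except ValueError:
            continue  # skip directories that do not follow the naming scheme
        target = part_dir / "events.parquet"
        if start <= part_date <= end and target.exists():
            selected.append(target)
    return selected
```

Only the directory names are inspected, so the cost of choosing partitions does not depend on how many rows the archive holds.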

_schema.py freezes the pyarrow schema matching the table in
docs/methodology/labeling-protocol.md §Storage Schema verbatim.
Schema changes require a label_protocol_version bump.

mypy's disallow_any_unimported is relaxed for the three storage
modules because pyarrow has no published type stubs. Every other
invariant (strict everywhere else, Pydantic models round-tripped
through the pipeline) still holds.

compute_agreement is the primitive the workflow enforcer and the
agreement CLI command both consume. It pairs two annotators'
decisions by candidate_id — decisions on candidates only one
annotator reviewed are excluded — and computes four metrics against
the targets in docs/methodology/labeling-protocol.md §Inter-Annotator
Agreement.

Event-existence kappa and category-assignment kappa use Cohen's
kappa: observed agreement minus chance-expected agreement, scaled by
(1 - expected). When the population is perfectly concentrated on one
label (so expected == 1.0) the function returns 1.0 instead of
dividing by zero.
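
As a worked illustration of that formula, here is a minimal sketch of Cohen's kappa for two equal-length label sequences. It is not the project's _cohens_kappa: the function name and the choice to reject empty input are assumptions, while the length check and the expected == 1.0 guard follow the behavior described in this PR.

```python
from collections import Counter
from typing import Hashable, Sequence


def cohens_kappa(a: Sequence[Hashable], b: Sequence[Hashable]) -> float:
    """kappa = (observed - expected) / (1 - expected) for paired labels."""
    if len(a) != len(b):
        raise ValueError("label sequences must have equal length")
    n = len(a)
    if n == 0:
        # Assumption: the source does not say how empty input is handled.
        raise ValueError("at least one paired decision is required")
    observed = sum(x == y for x, y in zip(a, b)) / n
    count_a, count_b = Counter(a), Counter(b)
    # Chance agreement: product of each annotator's marginal label frequencies.
    expected = sum(count_a[label] * count_b[label] for label in count_a) / (n * n)
    if expected == 1.0:
        return 1.0  # both annotators used a single identical label throughout
    return (observed - expected) / (1.0 - expected)
```

For example, with a = [1, 1, 0, 0] and b = [1, 0, 0, 0] the observed agreement is 0.75, the chance-expected agreement is 0.5, and kappa is 0.5.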

Timestamp agreement counts the fraction of paired qualifying
decisions whose timestamps fall within the 60-second window.
Market-association Jaccard averages |A ∩ B| / |A ∪ B| across paired
qualifying decisions; empty-set pairs count as 1.0 by convention
(both annotators agreed on zero associations).
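
The two pairwise metrics can be sketched as below. The function names are illustrative, treating a gap of exactly 60 seconds as agreement is an assumption about the boundary, and rejecting an empty list of pairs is likewise an assumption.

```python
from datetime import datetime, timedelta


def jaccard(a: set, b: set) -> float:
    """|A ∩ B| / |A ∪ B|, with two empty sets counted as full agreement."""
    union = a | b
    if not union:
        return 1.0
    return len(a & b) / len(union)


def mean_market_jaccard(pairs: list) -> float:
    """Average Jaccard over (markets_a, markets_b) pairs."""
    if not pairs:
        raise ValueError("no paired qualifying decisions")  # assumption
    return sum(jaccard(a, b) for a, b in pairs) / len(pairs)


def timestamp_agreement(pairs: list, window: timedelta = timedelta(seconds=60)) -> float:
    """Fraction of (ts_a, ts_b) pairs whose gap is within the window."""
    if not pairs:
        raise ValueError("no paired qualifying decisions")  # assumption
    return sum(abs(a - b) <= window for a, b in pairs) / len(pairs)
```

With market sets {m1, m2} and {m2, m3} the intersection has one element and the union three, so the pair contributes 1/3 to the mean.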

meets_targets is True only when every metric meets or exceeds its
threshold (0.95 event kappa, 0.90 timestamp, 0.85 market Jaccard,
0.90 category kappa). The report is also the payload the nightly
double-labeling CI job consumes to detect regressions.

Six tests cover perfect agreement, existence disagreement, timestamp
boundary conditions (at and above 60 s), partial market Jaccard, and
the empty-pair base case.

CandidateQueue is the in-memory substrate the CLI operates on. It
rejects a second decision from the same annotator on the same
candidate and pre-enqueues candidates without duplicates. The real
deployment backs the queue with the parquet corpus; this abstraction
lets tests and the workflow enforcer run against a minimal surface.

WorkflowEnforcer.can_promote implements the protocol gate from
docs/methodology/labeling-protocol.md §Annotator Protocol. Promotion
requires (1) two distinct annotators have decided, (2) both qualifying
decisions agree on event existence, (3) the timestamp span across
qualifying decisions is below the hard-fail threshold
(default 5 min = 300 s), and (4) the market-association Jaccard is
strictly above the hard-fail floor (default 0.0). The reason string
surfaces to the CLI so annotators see exactly why promotion was
refused.

promotion_warnings returns soft-warning messages below target but
above hard-fail thresholds (timestamp span above 60 s, Jaccard below
0.85). These are advisory; the CLI prints them alongside allowed
promotions so the operator can escalate to a third annotator.
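
The hard gate and the soft warnings can be sketched together. This is an illustration under stated assumptions, not WorkflowEnforcer: the `Decision` dataclass is a hypothetical stand-in for LabelDecision, a span of exactly 5 minutes is treated as a hard fail, and an empty market union is scored as Jaccard 0.0.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Optional


@dataclass(frozen=True)
class Decision:
    """Hypothetical stand-in for LabelDecision with only the gated fields."""
    annotator_id: str
    qualifies: bool
    timestamp: Optional[datetime] = None
    market_ids: frozenset = frozenset()


def _span_and_jaccard(decisions):
    stamps = [d.timestamp for d in decisions]
    union = frozenset().union(*(d.market_ids for d in decisions))
    shared = frozenset.intersection(*(d.market_ids for d in decisions))
    # Assumption: no market associations at all scores 0.0 for the gate.
    return max(stamps) - min(stamps), (len(shared) / len(union) if union else 0.0)


def can_promote(decisions, max_span=timedelta(minutes=5), jaccard_floor=0.0):
    """Return (allowed, reason) for a candidate's recorded decisions."""
    if len({d.annotator_id for d in decisions}) < 2:
        return False, "need decisions from two distinct annotators"
    if len({d.qualifies for d in decisions}) > 1:
        return False, "annotators disagree on event existence"
    if not decisions[0].qualifies:
        return False, "both annotators marked the candidate as not qualifying"
    if any(d.timestamp is None for d in decisions):
        return False, "qualifying decisions need a timestamp"
    span, jaccard = _span_and_jaccard(decisions)
    if span >= max_span:
        return False, f"timestamp span {span} reaches the hard-fail threshold"
    if jaccard <= jaccard_floor:
        return False, f"market Jaccard {jaccard:.2f} is not above the floor"
    return True, "ok"


def promotion_warnings(decisions, target_span=timedelta(seconds=60), target_jaccard=0.85):
    """Advisory messages for promotions that pass the gate but miss the targets."""
    span, jaccard = _span_and_jaccard(decisions)
    warnings = []
    if span > target_span:
        warnings.append(f"timestamp span {span} exceeds the {target_span} target")
    if jaccard < target_jaccard:
        warnings.append(f"market Jaccard {jaccard:.2f} is below {target_jaccard}")
    return warnings
```

Returning the reason as a string alongside the boolean mirrors the described behavior of surfacing the refusal cause to the annotator.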

Eight tests cover every branch: no decisions, single annotator,
existence disagreement, happy path, timestamp hard-fail, zero-Jaccard
hard-fail, warning-only case, and double-decision rejection by the
queue.

join_signals_to_events is the load-bearing algorithm the calibration
layer consumes: for every MarketSignal, it returns a SignalLabel
classifying the signal as true_positive, false_positive, or
true_negative against the labeled event corpus.

The true-positive rule comes verbatim from
docs/methodology/labeling-protocol.md §True Positive Criteria:

  TP iff signal.market_id in event.market_ids
         AND lead_time in (0, 24h]

where lead_time = event.ground_truth_timestamp - signal.detected_at.
Negative lead time (signal after event) and zero lead time (same
instant) are explicitly false positives. Multiple events on the same
market within the window match against the earliest qualifying one,
matching the protocol's earliest-qualifying-publication preference.

Events in any non-labeled status (candidate, superseded, rejected)
are excluded from the join; only committed labels participate in
calibration.
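
A minimal sketch of the rule for a single signal follows. The `Signal` and `Event` dataclasses and the `classify` function are hypothetical stand-ins for MarketSignal, NewsworthyEvent, and the per-signal step of join_signals_to_events; the true_negative outcome and the SignalLabel fields are left out.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta


@dataclass(frozen=True)
class Signal:
    market_id: str
    detected_at: datetime


@dataclass(frozen=True)
class Event:
    market_ids: frozenset
    ground_truth_timestamp: datetime
    status: str = "labeled"


def classify(signal, events, lead_window=timedelta(hours=24)):
    """Return ("true_positive", event) or ("false_positive", None)."""
    qualifying = [
        e
        for e in events
        if e.status == "labeled"  # candidate, superseded, rejected are skipped
        and signal.market_id in e.market_ids
        # lead_time must lie in (0, 24h]: compared as timedeltas, no float seconds
        and timedelta(0) < e.ground_truth_timestamp - signal.detected_at <= lead_window
    ]
    if not qualifying:
        return "false_positive", None
    # Several matches on the same market: prefer the earliest event.
    return "true_positive", min(qualifying, key=lambda e: e.ground_truth_timestamp)
```

The half-open interval is the important detail: a lead time of exactly zero fails the strict lower bound, while exactly 24 hours passes the inclusive upper bound.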

now is a required parameter so every SignalLabel's labeled_at is
deterministic across backtest replays, matching the pipeline-wide
now-as-parameter invariant.

Ten tests cover every branch: happy-path TP, missing event,
out-of-window event, event before signal, earliest-match selection
across multiple events, candidate and superseded status filtering,
market_id mismatch, empty input, and the boundary conditions at
lead_time = 0 (FP) and lead_time = 24h (TP).

…coverage commands

augur-label is the click-driven CLI annotators use to record decisions
on candidates and promote qualifying candidates into the labeled
parquet corpus. The CLI state lives in labels/queue.json (configurable
via --queue-file); the labeled corpus is partitioned parquet at the
path in config.storage.labels_root. A production deployment will
replace the JSON queue with a persistent backend, but the surface the
workflow enforcer operates on (CandidateQueue) stays the same.

Commands:
  - candidates: list pending candidates
  - inspect <id>: show publications and suggested markets for a candidate
  - decide <id>: record one annotator's decision with timestamp,
    market_ids, category, notes
  - promote <id>: invoke the workflow enforcer; emit a NewsworthyEvent
    to the parquet corpus on approval; surface soft warnings
  - correct <event_id>: mark an existing event as superseded
  - coverage: print labeled-event counts per category

CandidateQueue gains ``all_candidates()`` and ``all_decisions()`` so
the CLI's queue-file persistence no longer reaches into private
dictionaries.

scripts/label.py (previously a NotImplementedError stub) now delegates
to the click CLI so ``python scripts/label.py --help`` and the
downstream augur-label command share a single implementation.

Six CLI tests exercise the public surface against a temporary queue
file and labels_root: listing, inspect (existing and missing),
decide-persist, promote-refused-on-single-annotator, and the
promote-writes-event happy path.

Enforce the ≥2-distinct-qualifying-publishers rule from
docs/methodology/labeling-protocol.md §Definition-of-a-Newsworthy-Event.
_compose_event now raises InsufficientSourcesError when the candidate
lacks two distinct source_ids among its publications, and the CLI's
promote command surfaces the error with a non-zero exit code. Without
this gate a candidate with two publications from the same publisher
could reach the labeled corpus after two annotators agreed.

Ground-truth timestamp and headline now come from the earliest
qualifying publication per §Ground-Truth Timestamp Rule, rather than
from min(annotator timestamps) and candidate.publications[0] (which
assumed ordering the adapter never guarantees). source_urls and
source_publishers are deduplicated in earliest-first order so the
labeled record surfaces distinct sources only.
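
The source handling described above can be sketched as follows. This is not _compose_event: the `Publication` dataclass and `summarize_sources` are hypothetical, every publication passed in is assumed to already qualify (the 24 h qualification window is not modeled), and the base class of InsufficientSourcesError is a guess.

```python
from dataclasses import dataclass
from datetime import datetime


class InsufficientSourcesError(ValueError):
    """Stand-in for the error named above; the real base class is not shown."""


@dataclass(frozen=True)
class Publication:
    source_id: str
    url: str
    headline: str
    published_at: datetime


def summarize_sources(publications, min_distinct=2):
    """Derive timestamp, headline, and deduplicated sources, earliest first."""
    # Sort explicitly: adapters do not guarantee publication order.
    ordered = sorted(publications, key=lambda p: p.published_at)
    # dict.fromkeys keeps first-seen order, which is earliest-first here.
    publishers = list(dict.fromkeys(p.source_id for p in ordered))
    if len(publishers) < min_distinct:
        raise InsufficientSourcesError(
            f"need {min_distinct} distinct publishers, got {len(publishers)}"
        )
    earliest = ordered[0]
    return {
        "ground_truth_timestamp": earliest.published_at,
        "headline": earliest.headline,
        "source_publishers": publishers,
        "source_urls": list(dict.fromkeys(p.url for p in ordered)),
    }
```

Counting distinct source_id values rather than publications is what blocks a candidate backed by two articles from the same publisher.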

_protocol.py centralizes LABEL_PROTOCOL_VERSION and
MIN_DISTINCT_QUALIFYING_SOURCES so a version bump lands in one place
instead of the CLI and the join separately.

Bloomberg adapter retries on 401 now re-enter _ensure_token inside
the _call closure, so a retry after self._token is invalidated picks
up the freshly-issued credential rather than looping against the
stale captured variable until the backoff budget exhausts.
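
The stale-token pitfall is easier to see in a small sketch. The class below is hypothetical and is not BloombergAdapter: `issue_token` and `send` are injected callables invented for illustration, and the retry loop is simplified to immediate retries with no backoff.

```python
class TokenClient:
    """Hypothetical client that refreshes its token inside the retried closure."""

    def __init__(self, issue_token, send):
        self._issue_token = issue_token  # callable returning a fresh token
        self._send = send                # callable(token) -> (status, body)
        self._token = None

    def _ensure_token(self):
        if self._token is None:
            self._token = self._issue_token()
        return self._token

    def fetch(self, max_attempts=3):
        def _call():
            # Look the token up on every attempt. Capturing it once outside
            # _call would keep retrying with the rejected credential.
            token = self._ensure_token()
            status, body = self._send(token)
            if status == 401:
                self._token = None  # invalidate so the next attempt re-issues
                raise PermissionError("token rejected")
            return body

        last_error = None
        for _ in range(max_attempts):
            try:
                return _call()
            except PermissionError as exc:
                last_error = exc
        raise last_error
```

The fix lives entirely in where `_ensure_token` is called: inside the closure, each retry observes the invalidated state and obtains a new credential.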

_cohens_kappa raises ValueError on length mismatch so a data-integrity
bug in the caller fails loud instead of masquerading as low kappa.

AgreementReport gains unpaired_count so consumers see how many
candidates one annotator reviewed without the other. A kappa of 0.95
computed on only 40 paired candidates out of 100 no longer hides the
60 that were dropped from the denominator.

Signal-to-event join now uses timedelta comparison directly rather
than float-seconds round-trip, removing sub-microsecond ambiguity at
the lead_window boundary. join_signals_to_events and _compose_event
both read label_protocol_version from the shared constant.

FT adapter logs a WARNING when discover proceeds without an FT API
key rather than silently returning empty results.

The current read-modify-write approach under a per-partition lock
costs O(n) I/O per append, so cumulative I/O across n appends to a
partition is O(n²). Document the ceiling (several hundred events
per day before lock contention) and the migration path (sibling-file
layout read via pq.ParquetDataset, plus an event_id -> partition_date
index so supersede can target the correct partition directly).

No behavior change; this is a note so future operators and reviewers
see the known scaling limits without having to rediscover them.

Mathews-Tom merged commit e662904 into main on Apr 17, 2026
2 checks passed
Mathews-Tom deleted the feat/event-labeling-pipeline branch on April 17, 2026 08:17