feat(labels): newsworthy event labeling pipeline with two-annotator workflow#4
Merged
Conversation
…ig surface augur-labels needs click for the annotator CLI and filelock for the append-only parquet writer's per-partition locking. Declare both in the workspace member's pyproject and regenerate uv.lock. Also depend on augur-signals so the labeling package can import MarketSignal for the signal-to-event join without duplicating the model. config/labeling.toml mirrors the per-source credentials, workflow thresholds, storage paths, and join parameters from docs/methodology/labeling-protocol.md. LabelingConfig composes the per-block Pydantic sub-models with frozen, extra=forbid, and bounded numeric fields so a malformed config fails at startup rather than silently coercing. Credential env var names — not the secrets themselves — are stored in the config; the adapters read the secret from os.environ at startup.
…tions
Five frozen Pydantic models mirror the schemas in
docs/methodology/labeling-protocol.md:
QualifyingSource and SourcePublication enforce the closed source_id
literal set (reuters, bloomberg, ap, ft) so every downstream consumer —
the workflow enforcer, the storage schema, the join — operates on the
same tag set. Adding a source requires a protocol-version bump per
the labeling-protocol doc.
EventCandidate holds the intermediate state before two annotators
agree; NewsworthyEvent is the labeled output the calibration layer
consumes through the signal-to-event join. The status literal is
closed at {labeled, candidate, superseded, rejected} — the corpus is
append-only and corrections produce new rows with a corrects
back-reference, not in-place mutation.
LabelDecision records one annotator's call on one candidate. The
qualifies-specific fields (timestamp, market_ids, category) are
optional on the model; the workflow enforcer validates them at
promotion time so an annotator can record "does not qualify" without
fabricating event metadata.
AgreementReport structures the four-metric agreement summary for CLI
and CI consumption.
…mentations AbstractSourceAdapter is the uniform interface every source adapter implements. The annotator CLI's discover command iterates over the adapters registered per config/labeling.toml and merges their output into candidate publications without caring about source-specific request shapes or auth flows. Each concrete adapter — ReutersAdapter, BloombergAdapter, ApAdapter, FtAdapter — reads its credentials from the env vars documented in phase-2 §5.2 and fails loud at construction when required credentials are missing, rather than silently returning an empty list on first call. The FT adapter is the exception: without an API key it returns an empty list and health_check False, matching the doc's RSS-fallback semantics. sources/_http.py provides request_with_backoff, a parametrized exponential-backoff helper (1 s initial, 60 s cap, 5 retries) that every adapter routes its HTTP calls through so retry semantics stay consistent across sources. Tests cover the happy path, the exhaustion path with injectable sleep, and the per-adapter credential-enforcement behavior.
…nd file locking The labeled corpus lives at labels/newsworthy_events/date=YYYY-MM-DD/ events.parquet. AppendOnlyParquetWriter groups incoming events by the date of their ground_truth_timestamp, acquires a filelock on the partition's lock file, reads the existing parquet (if any), appends the new rows, and atomically replaces the target via a staging-file-plus-rename. Concurrent annotator processes can call append() safely without corrupting the file. supersede() implements the protocol's correction path from docs/methodology/labeling-protocol.md §Annotator Protocol: it finds the partition containing event_id, flips status to superseded, sets the corrects back-reference, and rewrites the partition in place (still under the filelock). KeyError is raised when the event_id is absent, preventing silent no-op corrections. LabelReader uses partition-name-range pruning: only date=YYYY-MM-DD partitions in the requested window are opened, so the window query does not scan the full archive. events_for_market and coverage_by_category are built on top of events_in_window. _schema.py freezes the pyarrow schema matching the table in docs/methodology/labeling-protocol.md §Storage Schema verbatim. Schema changes require a label_protocol_version bump. mypy's disallow_any_unimported is relaxed for the three storage modules because pyarrow has no published type stubs. Every other invariant (strict everywhere else, Pydantic models round-tripped through the pipeline) still holds.
compute_agreement is the primitive the workflow enforcer and the agreement CLI command both consume. It pairs two annotators' decisions by candidate_id — decisions on candidates only one annotator reviewed are excluded — and computes four metrics against the targets in docs/methodology/labeling-protocol.md §Inter-Annotator Agreement. Event-existence kappa and category-assignment kappa use Cohen's kappa: observed agreement minus chance-expected agreement, scaled by (1 - expected). When the population is perfectly concentrated on one label (so expected == 1.0) the function returns 1.0 instead of dividing by zero. Timestamp agreement counts the fraction of paired qualifying decisions whose timestamps fall within the 60-second window. Market-association Jaccard averages |A ∩ B| / |A ∪ B| across paired qualifying decisions; empty-set pairs count as 1.0 by convention (both annotators agreed on zero associations). meets_targets is True only when every metric meets or exceeds its threshold (0.95 event kappa, 0.90 timestamp, 0.85 market Jaccard, 0.90 category kappa). The report is also the payload the nightly double-labeling CI job consumes to detect regressions. Six tests cover perfect agreement, existence disagreement, timestamp boundary conditions (at and above 60 s), partial market Jaccard, and the empty-pair base case.
CandidateQueue is the in-memory substrate the CLI operates on. It rejects a second decision from the same annotator on the same candidate and pre-enqueues candidates without duplicates. The real deployment backs the queue with the parquet corpus; this abstraction lets tests and the workflow enforcer run against a minimal surface. WorkflowEnforcer.can_promote implements the protocol gate from docs/methodology/labeling-protocol.md §Annotator Protocol. Promotion requires (1) two distinct annotators have decided, (2) both qualifying decisions agree on event existence, (3) the timestamp span across qualifying decisions is below the hard-fail threshold (default 5 min = 300 s), and (4) the market-association Jaccard is strictly above the hard-fail floor (default 0.0). The reason string surfaces to the CLI so annotators see exactly why promotion was refused. promotion_warnings returns soft-warning messages below target but above hard-fail thresholds (timestamp span above 60 s, Jaccard below 0.85). These are advisory; the CLI prints them alongside allowed promotions so the operator can escalate to a third annotator. Eight tests cover every branch: no decisions, single annotator, existence disagreement, happy path, timestamp hard-fail, zero-Jaccard hard-fail, warning-only case, and double-decision rejection by the queue.
join_signals_to_events is the load-bearing algorithm the calibration
layer consumes: for every MarketSignal, it returns a SignalLabel
classifying the signal as true_positive, false_positive, or
true_negative against the labeled event corpus.
The true-positive rule comes verbatim from
docs/methodology/labeling-protocol.md §True Positive Criteria:
TP iff signal.market_id in event.market_ids
AND lead_time in (0, 24h]
where lead_time = event.ground_truth_timestamp - signal.detected_at.
Negative lead time (signal after event) and zero lead time (same
instant) are explicitly false positives. Multiple events on the same
market within the window match against the earliest qualifying one,
matching the protocol's earliest-qualifying-publication preference.
Events in any non-labeled status (candidate, superseded, rejected)
are excluded from the join; only committed labels participate in
calibration.
now is a required parameter so every SignalLabel's labeled_at is
deterministic across backtest replays, matching the pipeline-wide
now-as-parameter invariant.
Ten tests cover every branch: happy-path TP, missing event,
out-of-window event, event before signal, earliest-match selection
across multiple events, candidate and superseded status filtering,
market_id mismatch, empty input, and the boundary conditions at
lead_time = 0 (FP) and lead_time = 24h (TP).
…coverage commands
augur-label is the click-driven CLI annotators use to record decisions
on candidates and promote qualifying candidates into the labeled
parquet corpus. The CLI state lives in labels/queue.json (configurable
via --queue-file); the labeled corpus is partitioned parquet at the
path in config.storage.labels_root. A production deployment will
replace the JSON queue with a persistent backend, but the surface the
workflow enforcer operates on (CandidateQueue) stays the same.
Commands:
- candidates: list pending candidates
- inspect <id>: show publications and suggested markets for a candidate
- decide <id>: record one annotator's decision with timestamp,
market_ids, category, notes
- promote <id>: invoke the workflow enforcer; emit a NewsworthyEvent
to the parquet corpus on approval; surface soft warnings
- correct <event_id>: mark an existing event as superseded
- coverage: print labeled-event counts per category
CandidateQueue gains ``all_candidates()`` and ``all_decisions()`` so
the CLI's queue-file persistence no longer reaches into private
dictionaries.
scripts/label.py (previously a NotImplementedError stub) now delegates
to the click CLI so ``python scripts/label.py --help`` and the
downstream augur-label command share a single implementation.
Six CLI tests exercise the public surface against a temporary queue
file and labels_root: listing, inspect (existing and missing),
decide-persist, promote-refused-on-single-annotator, and the
promote-writes-event happy path.
Enforce the ≥2-distinct-qualifying-publishers rule from docs/methodology/labeling-protocol.md §Definition-of-a-Newsworthy-Event. _compose_event now raises InsufficientSourcesError when the candidate lacks two distinct source_ids among its publications, and the CLI's promote command surfaces the error with a non-zero exit code. Without this gate a candidate with two publications from the same publisher could reach the labeled corpus after two annotators agreed. Ground-truth timestamp and headline now come from the earliest qualifying publication per §Ground-Truth Timestamp Rule, rather than from min(annotator timestamps) and candidate.publications[0] (which assumed ordering the adapter never guarantees). source_urls and source_publishers are deduplicated in earliest-first order so the labeled record surfaces distinct sources only. _protocol.py centralizes LABEL_PROTOCOL_VERSION and MIN_DISTINCT_QUALIFYING_SOURCES so a version bump lands in one place instead of the CLI and the join separately. Bloomberg adapter retries on 401 now re-enter _ensure_token inside the _call closure, so a retry after self._token is invalidated picks up the freshly-issued credential rather than looping against the stale captured variable until the backoff budget exhausts. _cohens_kappa raises ValueError on length mismatch so a data-integrity bug in the caller fails loud instead of masquerading as low kappa. AgreementReport gains unpaired_count so consumers see how many candidates one annotator reviewed without the other — a kappa of 0.95 on 40 of 100 candidates is no longer silently dropped from the denominator. Signal-to-event join now uses timedelta comparison directly rather than float-seconds round-trip, removing sub-microsecond ambiguity at the lead_window boundary. join_signals_to_events and _compose_event both read label_protocol_version from the shared constant. FT adapter logs a WARNING when discover proceeds without an FT API key rather than silently returning empty results.
The current read-modify-write approach under a per-partition lock is O(n²) I/O per append. Document the ceiling (several hundred events per day before lock contention) and the migration path (sibling-file layout read via pq.ParquetDataset, plus an event_id -> partition_date index so supersede can target the correct partition directly). No behavior change; this is a note so future operators and reviewers see the known scaling limits without having to rediscover them.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
Ships the full labeling pipeline that converts calibration placeholders into empirical reliability curves. Four wire-service adapters, a click-driven annotator CLI, an append-only partitioned Parquet corpus, inter-annotator agreement metrics, the two-annotator workflow enforcer, and the signal-to-event join that produces TP/FP labels for the Phase-1 calibration consumers. After merge, a labeler can drive real candidates end-to-end; the nightly calibration job consumes the resulting labels.
What Changed
NewsworthyEvent,EventCandidate,SourcePublication,QualifyingSource,LabelDecision,AnnotatorIdentity, andAgreementReport. Closedsource_idenum across storage, workflow, and adapters.AbstractSourceAdapterwithReutersAdapter,BloombergAdapter,ApAdapter,FtAdapterconcrete implementations. Shared exponential-backoff viarequest_with_backoff. Auth via env vars; missing credentials fail loud at construction.AppendOnlyParquetWriterwith per-date partitioning andfilelock-based safety;LabelReaderwith partition pruning; frozen pyarrow schema matchingdocs/methodology/labeling-protocol.md §Storage Schema.CandidateQueuesubstrate andWorkflowEnforcer.can_promote/promotion_warningsenforcing the two-annotator gate (distinct annotators, existence agreement, 5-min timestamp hard fail, strictly-positive market Jaccard).compute_agreementpairs decisions bycandidate_idand checks the four targets.join_signals_to_eventsimplements the TP/FP criteria: market match plus lead time in (0, 24h]. Multiple events per market match earliest-qualifying; non-labeled statuses excluded.augur-labelclick commands (candidates,inspect,decide,promote,correct,coverage). Queue state persists tolabels/queue.json; promoted events write to the parquet corpus.How It Works
The canonical labeling protocol in
docs/methodology/labeling-protocol.mddrives every decision. Source qualification requires two publishers among {reuters, bloomberg, ap, ft} within 24 h; the earliest qualifying publication setsground_truth_timestamp. Annotators record decisions viaaugur-label decide;augur-label promoteinvokes the workflow enforcer. Labeled events land in partitioned parquet and are consumed byjoin_signals_to_eventsalongsideMarketSignalrows from DuckDB to produce SignalLabel inputs for the Phase-1EmpiricalFPRandReliabilityAnalyzermodules.Configuration Added
config/labeling.toml— source rate limits and credential env var names; workflow thresholds; storage paths; join windows. Mirrorsdocs/methodology/labeling-protocol.md §Inter-Annotator Agreementand phase-2 §11.Schema Changes
Labeled-corpus parquet schema is frozen at
label_protocol_version = "1.0";_schema.pycodifies the column set. No JSON-schema exports are added because the labeled corpus is a storage schema, not a consumer-facing Pydantic model registered inscripts/export_schemas.py.Quality Gates
uv run ruff check .cleanuv run ruff format --check .cleanuv run mypy --strict src/clean (90 source files)uv run pytest --cov-fail-under=80passes; 177 tests, 87.6 % total coverageuv run python scripts/export_schemas.py --check)datetime.now()AST guard passesuv run pre-commit run --all-filespassesDefinition of Done
AgreementReportsurfaces viacompute_agreement.supersedetested.SignalLabeloutput (contract matchescompute_empirical_fpr).augur-label discoverandaugur-label decideagainst real candidates. Deferred because discover requires live source-API credentials; no credentials are provisioned in this environment.Operational Handoff
After merge the labeling pipeline is ready for double labeling once source credentials are provisioned. Annotators run
augur-labelagainst real candidates; thescripts/calibrate.pynightly job (from Phase 1) consumesjoin_signals_to_eventsoutput and rebuilds empirical FPR records and reliability curves. First 90 days require two annotators per candidate; agreement metrics surface viaaugur-label agreement(deferred until live corpus exists; the primitive is ready).Deferred Findings
augur-label discoveris not wired in this PR because source adapters require API credentials absent from the development environment. The four adapters are present and tested against credential-absent paths. Wiring discover is a follow-up once credentials are provisioned.augur-label agreementcommand surface was not exposed in the CLI in this PR;compute_agreementis the callable primitive ready for the CLI wiring.Test Plan
uv run pytestpasses locally (177 tests).uv run python scripts/export_schemas.py --checkpasses locally.uv run python scripts/lint_detector_now.pypasses locally.uv run pre-commit run --all-filespasses locally.augur-label discover --since <date>and confirm candidates appear inlabels/queue.json.Review Pass
pr-reviewfindings addressed: 2 CRITICAL + 3 HIGH + 4 MEDIUM/LOW fixed in01d241d fix(labels); 2 scale-ceiling observations documented inab49e93 refactor(labels).code-refinersimplifications applied: none beyond the inline cleanup already in the fix commit.Deferred Findings
pq.ParquetDatasetplus anevent_id -> partition_dateindex is the migration path once dense-labeling volume approaches the lock-timeout threshold.CandidateQueue.pending()returns candidates with fewer than two decisions; "both annotators said no" is surfaceable viaall_decisionsbut has no dedicated query. Deferred until the CLI exposes a resolved-rejected view.AbstractSourceAdapterandQualifyingSourceare retained; the protocol has four concrete implementations and the Pydantic model ships for schema-export parity once discover lands.