diff --git a/CHANGELOG.md b/CHANGELOG.md index 4c3fbf2..7d7b3ce 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,21 @@ All notable changes to Augur are recorded in this file. Format follows [Keep a C ## [Unreleased] +### Added — Labeling Pipeline + +- `src/augur_labels/` package with Pydantic data contracts for `NewsworthyEvent`, `EventCandidate`, `SourcePublication`, `QualifyingSource`, `LabelDecision`, `AnnotatorIdentity`, and `AgreementReport`. The closed `source_id` literal set (reuters, bloomberg, ap, ft) is load-bearing across adapters, storage, and workflow. +- Four source adapters (`ReutersAdapter`, `BloombergAdapter`, `ApAdapter`, `FtAdapter`) implementing `AbstractSourceAdapter` against their respective REST APIs with shared exponential-backoff retry. Credentials are read from the env vars documented in `docs/methodology/labeling-protocol.md`; missing credentials fail loud at construction except for the FT adapter, which gracefully degrades to empty output on missing API key. +- Append-only Parquet writer with per-date partitioning and `filelock`-based concurrent-write safety. `supersede()` implements the protocol's correction path in-place under the partition lock. `LabelReader` exposes `events_in_window`, `events_for_market`, and `coverage_by_category` with partition pruning. +- Inter-annotator agreement metrics via Cohen's kappa on event existence and category assignment, 60-second timestamp agreement, and mean market-association Jaccard. `compute_agreement` pairs decisions by `candidate_id` and evaluates the four targets from `docs/methodology/labeling-protocol.md §Inter-Annotator Agreement`. +- `WorkflowEnforcer.can_promote` and `promotion_warnings` enforce the two-annotator promotion gate: two distinct annotators, existence agreement, timestamp within 5-minute hard fail, and strictly positive market Jaccard. +- `join_signals_to_events` implements the canonical TP/FP criteria: market_id match plus lead time in (0, 24h]. 
Multiple events on the same market match the earliest qualifying one; non-labeled statuses (candidate, superseded, rejected) are excluded. +- Click-driven `augur-label` CLI with `candidates`, `inspect`, `decide`, `promote`, `correct`, and `coverage` commands. The CLI persists queue state to `labels/queue.json` and writes promoted events to the parquet corpus. +- `config/labeling.toml` mirrors `docs/methodology/labeling-protocol.md` defaults (rate limits, agreement targets, storage paths, join windows). + +### Operational Handoff — Labeling + +After merge a labeler can run `augur-label candidates`, `augur-label decide`, and `augur-label promote` against real candidates. The nightly calibration job (Phase 1's `scripts/calibrate.py`) consumes `join_signals_to_events` output to rebuild reliability curves. The first 90 days of operation require double labeling per `docs/methodology/labeling-protocol.md §Inter-Annotator Agreement`; CI reports agreement metrics during that window. + ### Added - Pydantic data contracts: `MarketSnapshot`, `FeatureVector`, `MarketSignal`, `SignalContext`, `RelatedMarketState`, and the closed enums `SignalType`, `ManipulationFlag`, `ConsumerType`, `InterpretationMode`. `MarketSignal` enforces `calibration_provenance` via a model validator; every model is frozen and rejects unknown fields. JSON schemas exported to `schemas/*.json` and kept in sync by `scripts/export_schemas.py`. diff --git a/config/labeling.toml b/config/labeling.toml new file mode 100644 index 0000000..e34aaad --- /dev/null +++ b/config/labeling.toml @@ -0,0 +1,42 @@ +# Labeling pipeline configuration. Schema mirrors +# docs/methodology/labeling-protocol.md §Source Hierarchy and +# phase-2 §11 verbatim. Annotator processes consume this file at +# startup via augur_labels._config.LabelingConfig. 
+ +[sources.reuters] +enabled = true +rate_limit_per_hour = 1000 +api_key_env = "REUTERS_API_KEY" + +[sources.bloomberg] +enabled = true +rate_limit_per_hour = 500 +client_id_env = "BLOOMBERG_CLIENT_ID" +client_secret_env = "BLOOMBERG_CLIENT_SECRET" + +[sources.ap] +enabled = true +rate_limit_per_hour = 500 +api_key_env = "AP_API_KEY" + +[sources.ft] +enabled = true +rate_limit_per_hour = 200 +api_key_env = "FT_API_KEY" + +[workflow] +double_label_window_days = 90 +timestamp_agreement_window_seconds = 60 +timestamp_hard_fail_seconds = 300 +market_jaccard_target = 0.85 +market_jaccard_hard_fail = 0.0 +category_kappa_target = 0.90 +event_existence_kappa_target = 0.95 + +[storage] +labels_root = "labels/newsworthy_events" +file_lock_timeout_seconds = 30 + +[join] +lead_window_hours = 24 +true_negative_window_hours = 24 diff --git a/pyproject.toml b/pyproject.toml index d386d8a..bf95622 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -66,6 +66,24 @@ explicit_package_bases = true module = ["uuid_extensions.*"] ignore_missing_imports = true +[[tool.mypy.overrides]] +module = ["pyarrow.*"] +ignore_missing_imports = true + +[[tool.mypy.overrides]] +module = ["filelock.*"] +ignore_missing_imports = true + +# pyarrow has no type stubs; relax the no-any-unimported gate for the +# storage modules that pass pyarrow types across their public surface. +[[tool.mypy.overrides]] +module = [ + "augur_labels.storage._schema", + "augur_labels.storage.parquet_writer", + "augur_labels.storage.reader", +] +disallow_any_unimported = false + [tool.pytest.ini_options] testpaths = ["tests"] asyncio_mode = "auto" diff --git a/scripts/label.py b/scripts/label.py index 3724eae..0041cef 100644 --- a/scripts/label.py +++ b/scripts/label.py @@ -1,19 +1,21 @@ """Annotator CLI entrypoint. -Launches the two-annotator labeling workflow over the newsworthy-event -feed per docs/methodology/labeling-protocol.md and persists labels to -``labels/newsworthy_events.parquet``. 
- -Stub until the labeling workstream lands. +Launches the augur-label click CLI over the newsworthy-event candidate +queue and the append-only parquet corpus. Available commands are +implemented in augur_labels.annotator.cli; run ``python scripts/label.py +--help`` to discover them. """ from __future__ import annotations import sys +from augur_labels.annotator.cli import cli + def main() -> int: - raise NotImplementedError("annotator CLI not yet implemented") + cli(standalone_mode=True) + return 0 if __name__ == "__main__": diff --git a/src/augur_labels/augur_labels/_config.py b/src/augur_labels/augur_labels/_config.py new file mode 100644 index 0000000..3bbba92 --- /dev/null +++ b/src/augur_labels/augur_labels/_config.py @@ -0,0 +1,91 @@ +"""Labeling-pipeline configuration models. + +Schema mirrors the blocks in config/labeling.toml and matches the +defaults documented in phase-2 §11. Every field is validated at +startup via augur_signals._config.load_config; a missing required +value fails loudly rather than coercing. +""" + +from __future__ import annotations + +from pydantic import BaseModel, ConfigDict, Field + + +class ReutersSourceConfig(BaseModel): + model_config = ConfigDict(frozen=True, extra="forbid") + + enabled: bool = True + rate_limit_per_hour: int = Field(default=1000, gt=0) + api_key_env: str = "REUTERS_API_KEY" + + +class BloombergSourceConfig(BaseModel): + model_config = ConfigDict(frozen=True, extra="forbid") + + enabled: bool = True + rate_limit_per_hour: int = Field(default=500, gt=0) + client_id_env: str = "BLOOMBERG_CLIENT_ID" + # Name of the env var that holds the secret, not the secret itself. 
+ client_secret_env: str = "BLOOMBERG_CLIENT_SECRET" # noqa: S105 + + +class ApSourceConfig(BaseModel): + model_config = ConfigDict(frozen=True, extra="forbid") + + enabled: bool = True + rate_limit_per_hour: int = Field(default=500, gt=0) + api_key_env: str = "AP_API_KEY" + + +class FtSourceConfig(BaseModel): + model_config = ConfigDict(frozen=True, extra="forbid") + + enabled: bool = True + rate_limit_per_hour: int = Field(default=200, gt=0) + api_key_env: str = "FT_API_KEY" + + +class SourcesConfig(BaseModel): + model_config = ConfigDict(frozen=True, extra="forbid") + + reuters: ReutersSourceConfig = Field(default_factory=ReutersSourceConfig) + bloomberg: BloombergSourceConfig = Field(default_factory=BloombergSourceConfig) + ap: ApSourceConfig = Field(default_factory=ApSourceConfig) + ft: FtSourceConfig = Field(default_factory=FtSourceConfig) + + +class WorkflowConfig(BaseModel): + model_config = ConfigDict(frozen=True, extra="forbid") + + double_label_window_days: int = Field(default=90, gt=0) + timestamp_agreement_window_seconds: int = Field(default=60, gt=0) + timestamp_hard_fail_seconds: int = Field(default=300, gt=0) + market_jaccard_target: float = Field(default=0.85, ge=0.0, le=1.0) + market_jaccard_hard_fail: float = Field(default=0.0, ge=0.0, le=1.0) + category_kappa_target: float = Field(default=0.90, ge=-1.0, le=1.0) + event_existence_kappa_target: float = Field(default=0.95, ge=-1.0, le=1.0) + + +class StorageConfig(BaseModel): + model_config = ConfigDict(frozen=True, extra="forbid") + + labels_root: str = "labels/newsworthy_events" + file_lock_timeout_seconds: int = Field(default=30, gt=0) + + +class JoinConfig(BaseModel): + model_config = ConfigDict(frozen=True, extra="forbid") + + lead_window_hours: int = Field(default=24, gt=0) + true_negative_window_hours: int = Field(default=24, gt=0) + + +class LabelingConfig(BaseModel): + """Top-level labeling configuration loaded from config/labeling.toml.""" + + model_config = ConfigDict(frozen=True, 
extra="forbid") + + sources: SourcesConfig = Field(default_factory=SourcesConfig) + workflow: WorkflowConfig = Field(default_factory=WorkflowConfig) + storage: StorageConfig = Field(default_factory=StorageConfig) + join: JoinConfig = Field(default_factory=JoinConfig) diff --git a/src/augur_labels/augur_labels/_protocol.py b/src/augur_labels/augur_labels/_protocol.py new file mode 100644 index 0000000..105a2f5 --- /dev/null +++ b/src/augur_labels/augur_labels/_protocol.py @@ -0,0 +1,14 @@ +"""Labeling-protocol constants shared across modules. + +The protocol version is the single source of truth for +``label_protocol_version`` on every produced NewsworthyEvent and +SignalLabel. Bumping this constant triggers recomputation of any +calibration metric derived from the affected labels per +docs/methodology/labeling-protocol.md §Versioning. +""" + +from __future__ import annotations + +LABEL_PROTOCOL_VERSION: str = "1.0" + +MIN_DISTINCT_QUALIFYING_SOURCES: int = 2 diff --git a/src/augur_labels/augur_labels/annotator/__init__.py b/src/augur_labels/augur_labels/annotator/__init__.py new file mode 100644 index 0000000..882673f --- /dev/null +++ b/src/augur_labels/augur_labels/annotator/__init__.py @@ -0,0 +1,3 @@ +"""Annotator workflow, agreement metrics, and CLI entrypoint.""" + +from __future__ import annotations diff --git a/src/augur_labels/augur_labels/annotator/agreement.py b/src/augur_labels/augur_labels/annotator/agreement.py new file mode 100644 index 0000000..5e2f340 --- /dev/null +++ b/src/augur_labels/augur_labels/annotator/agreement.py @@ -0,0 +1,160 @@ +"""Inter-annotator agreement metrics. + +Implements Cohen's kappa, 60-second timestamp agreement, and mean +Jaccard overlap of market-association sets per the targets in +docs/methodology/labeling-protocol.md §Inter-Annotator Agreement. + +Paired decisions are matched by ``candidate_id``; decisions on +candidates only one annotator reviewed are excluded from the report. 
+""" + +from __future__ import annotations + +from collections.abc import Sequence +from datetime import datetime, timedelta + +from augur_labels.models import AgreementReport, LabelDecision + +# Thresholds mirror labeling-protocol.md §Inter-Annotator Agreement. +EVENT_EXISTENCE_KAPPA_TARGET: float = 0.95 +TIMESTAMP_AGREEMENT_TARGET: float = 0.90 +MARKET_JACCARD_TARGET: float = 0.85 +CATEGORY_KAPPA_TARGET: float = 0.90 +TIMESTAMP_AGREEMENT_WINDOW: timedelta = timedelta(seconds=60) + + +def _cohens_kappa(labels_a: Sequence[object], labels_b: Sequence[object]) -> float: + """Cohen's kappa on two equal-length sequences of categorical labels. + + Raises ValueError on length mismatch so a data-integrity bug in the + caller surfaces immediately rather than masquerading as low kappa. + Empty inputs short-circuit to 0.0 because an empty window has no + meaningful agreement metric. + """ + if len(labels_a) != len(labels_b): + raise ValueError( + f"label sequences have mismatched lengths: {len(labels_a)} vs {len(labels_b)}" + ) + if not labels_a: + return 0.0 + n = len(labels_a) + observed = sum(1 for a, b in zip(labels_a, labels_b, strict=True) if a == b) / n + all_labels = set(labels_a) | set(labels_b) + expected = 0.0 + for label in all_labels: + pa = sum(1 for x in labels_a if x == label) / n + pb = sum(1 for x in labels_b if x == label) / n + expected += pa * pb + if expected >= 1.0: + return 1.0 + return (observed - expected) / (1.0 - expected) + + +def _jaccard(a: Sequence[str], b: Sequence[str]) -> float: + set_a = set(a) + set_b = set(b) + union = set_a | set_b + if not union: + return 1.0 + return len(set_a & set_b) / len(union) + + +def _pair_decisions( + decisions_a: Sequence[LabelDecision], + decisions_b: Sequence[LabelDecision], +) -> tuple[list[tuple[LabelDecision, LabelDecision]], int]: + """Return (paired_decisions, unpaired_count). 
def compute_agreement(
    decisions_a: Sequence[LabelDecision],
    decisions_b: Sequence[LabelDecision],
    window_start: datetime,
    window_end: datetime,
) -> AgreementReport:
    """Compute the four-metric agreement report for two annotators' decisions.

    Decisions are paired by ``candidate_id`` via ``_pair_decisions``; only
    paired decisions feed the metrics, and the count of unpaired candidates is
    carried on the report.

    Args:
        decisions_a: Decisions recorded by the first annotator.
        decisions_b: Decisions recorded by the second annotator.
        window_start: Start of the reporting window. Copied onto the report;
            it is not used here to filter decisions.
        window_end: End of the reporting window. Copied onto the report; it
            is not used here to filter decisions.

    Returns:
        An ``AgreementReport``. When no candidate was decided by both
        annotators every metric is 0.0 and ``meets_targets`` is False.
    """
    pairs, unpaired = _pair_decisions(decisions_a, decisions_b)
    # Sort the two ids themselves rather than a set of them: a set collapses
    # two identical ids into a single element and would yield a 1-tuple
    # instead of the two-element pair produced in every other case.
    annotator_ids = (
        tuple(sorted((decisions_a[0].annotator_id, decisions_b[0].annotator_id)))
        if decisions_a and decisions_b
        else ("unknown-a", "unknown-b")
    )
    if not pairs:
        return AgreementReport(
            annotator_pair=annotator_ids,  # type: ignore[arg-type]
            window_start=window_start,
            window_end=window_end,
            candidate_count=0,
            unpaired_count=unpaired,
            event_existence_kappa=0.0,
            timestamp_agreement_60s=0.0,
            market_association_jaccard_mean=0.0,
            category_assignment_kappa=0.0,
            meets_targets=False,
        )

    # Event existence: kappa over the qualifies flag of every pair.
    event_kappa = _cohens_kappa(
        [a.qualifies for a, _ in pairs],
        [b.qualifies for _, b in pairs],
    )

    # Timestamp and market metrics only consider pairs where both annotators
    # qualified the candidate and both supplied a timestamp.
    qualifying_pairs = [
        (a, b)
        for a, b in pairs
        if a.qualifies and b.qualifies and a.timestamp is not None and b.timestamp is not None
    ]
    if qualifying_pairs:
        threshold = TIMESTAMP_AGREEMENT_WINDOW.total_seconds()
        within = sum(
            1
            for a, b in qualifying_pairs
            if abs((a.timestamp - b.timestamp).total_seconds()) <= threshold
        )
        timestamp_agreement = within / len(qualifying_pairs)
        # Market Jaccard: mean across the same qualifying pairs.
        jaccards = [_jaccard(a.market_ids, b.market_ids) for a, b in qualifying_pairs]
        jaccard_mean = sum(jaccards) / len(jaccards)
    else:
        timestamp_agreement = 0.0
        jaccard_mean = 0.0

    # Category kappa: only pairs where both annotators set a category.
    category_pairs = [(a.category, b.category) for a, b in pairs if a.category and b.category]
    if category_pairs:
        category_kappa = _cohens_kappa(
            [first for first, _ in category_pairs],
            [second for _, second in category_pairs],
        )
    else:
        category_kappa = 0.0

    meets_targets = (
        event_kappa >= EVENT_EXISTENCE_KAPPA_TARGET
        and timestamp_agreement >= TIMESTAMP_AGREEMENT_TARGET
        and jaccard_mean >= MARKET_JACCARD_TARGET
        and category_kappa >= CATEGORY_KAPPA_TARGET
    )
    return AgreementReport(
        annotator_pair=annotator_ids,  # type: ignore[arg-type]
        window_start=window_start,
        window_end=window_end,
        candidate_count=len(pairs),
        unpaired_count=unpaired,
        event_existence_kappa=event_kappa,
        timestamp_agreement_60s=timestamp_agreement,
        market_association_jaccard_mean=jaccard_mean,
        category_assignment_kappa=category_kappa,
        meets_targets=meets_targets,
    )
class CandidateQueue:
    """Candidate store held in memory and keyed by ``candidate_id``.

    Each candidate carries the list of decisions recorded against it, in the
    order they were recorded.
    """

    def __init__(self) -> None:
        self._candidates: dict[str, EventCandidate] = {}
        self._decisions: dict[str, list[LabelDecision]] = {}

    def enqueue(self, candidates: Iterable[EventCandidate]) -> None:
        """Add candidates; an id that is already queued keeps its first entry."""
        for item in candidates:
            key = item.candidate_id
            if key not in self._candidates:
                self._candidates[key] = item
                self._decisions.setdefault(key, [])

    def record(self, decision: LabelDecision) -> None:
        """Attach *decision* to its candidate.

        Raises:
            KeyError: If the decision's candidate was never enqueued.
            ValueError: If the same annotator already decided on the candidate.
        """
        key = decision.candidate_id
        if key not in self._candidates:
            raise KeyError(f"unknown candidate_id={decision.candidate_id!r}")
        recorded = self._decisions[key]
        if any(prior.annotator_id == decision.annotator_id for prior in recorded):
            raise ValueError(
                f"annotator {decision.annotator_id!r} has already decided "
                f"on candidate {decision.candidate_id!r}"
            )
        recorded.append(decision)

    def decisions_for(self, candidate_id: str) -> list[LabelDecision]:
        """Return a copy of the decisions for *candidate_id* (empty if none)."""
        return [*self._decisions.get(candidate_id, [])]

    def get(self, candidate_id: str) -> EventCandidate:
        """Return the queued candidate; raises KeyError for an unknown id."""
        return self._candidates[candidate_id]

    def pending(self) -> list[EventCandidate]:
        """Return candidates that have fewer than two recorded decisions."""
        waiting: list[EventCandidate] = []
        for key, item in self._candidates.items():
            if len(self._decisions.get(key, [])) < 2:
                waiting.append(item)
        return waiting

    def all_candidates(self) -> list[EventCandidate]:
        """Return every queued candidate in insertion order."""
        return [*self._candidates.values()]

    def all_decisions(self) -> list[LabelDecision]:
        """Return every recorded decision, grouped by candidate insertion order."""
        return [entry for bucket in self._decisions.values() for entry in bucket]

    def __contains__(self, candidate_id: object) -> bool:
        return candidate_id in self._candidates
def _save_queue(queue: CandidateQueue, path: Path) -> None:
    """Persist the queue's candidates and decisions to *path* as JSON.

    The payload is written to a sibling staging file and then renamed over
    *path*. Writing directly would truncate the existing file first, so an
    interrupted write would leave invalid JSON behind and every later
    invocation would fail while loading the queue.

    Args:
        queue: Queue whose ``all_candidates()`` and ``all_decisions()`` are
            serialised via ``model_dump(mode="json")``.
        path: Destination file; missing parent directories are created.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    payload = {
        "candidates": [c.model_dump(mode="json") for c in queue.all_candidates()],
        "decisions": [d.model_dump(mode="json") for d in queue.all_decisions()],
    }
    serialized = json.dumps(payload, indent=2, sort_keys=True)
    # Same directory as the target so the rename stays on one filesystem.
    staging = path.with_name(f"{path.name}.tmp")
    try:
        staging.write_text(serialized, encoding="utf-8")
        staging.replace(path)
    finally:
        # After a successful rename the staging file no longer exists; on
        # failure this removes the partial file instead of leaving it behind.
        staging.unlink(missing_ok=True)
@cli.command("decide")
@click.argument("candidate_id")
@click.option("--annotator", "annotator_id", required=True)
@click.option("--qualifies/--reject", default=True)
@click.option("--timestamp", "ts_iso", default=None)
@click.option("--market-ids", default="")
@click.option("--category", default=None)
@click.option("--notes", default=None)
@click.pass_context
def cmd_decide(
    ctx: click.Context,
    candidate_id: str,
    annotator_id: str,
    qualifies: bool,
    ts_iso: str | None,
    market_ids: str,
    category: str | None,
    notes: str | None,
) -> None:
    """Record an annotator's decision on a candidate.

    Timestamp, market ids, and category are only kept for a qualifying
    decision; a rejection stores none of them. Operator errors (unknown
    candidate, malformed ``--timestamp``, a second decision by the same
    annotator) are reported on stderr with exit code 1, matching
    ``augur-label inspect``, instead of surfacing as a traceback.
    """
    queue: CandidateQueue = ctx.obj["queue"]
    if candidate_id not in queue:
        click.echo(f"unknown candidate_id={candidate_id!r}", err=True)
        ctx.exit(1)
    ts: datetime | None = None
    if ts_iso and qualifies:
        try:
            ts = datetime.fromisoformat(ts_iso)
        except ValueError:
            click.echo(f"invalid --timestamp {ts_iso!r}: expected an ISO 8601 value", err=True)
            ctx.exit(1)
    markets = [m.strip() for m in market_ids.split(",") if m.strip()] if qualifies else []
    decision = LabelDecision(
        decision_id=str(uuid4()),
        candidate_id=candidate_id,
        annotator_id=annotator_id,
        decided_at=datetime.now(tz=UTC),
        qualifies=qualifies,
        timestamp=ts,
        market_ids=markets,
        category=category if qualifies else None,
        notes=notes,
    )
    try:
        queue.record(decision)
    except ValueError as exc:
        # The queue rejects a second decision from the same annotator.
        click.echo(f"cannot record decision: {exc}", err=True)
        ctx.exit(1)
    _save_queue(queue, ctx.obj["queue_file"])
    click.echo(f"recorded decision {decision.decision_id}")
+@click.argument("candidate_id") +@click.pass_context +def cmd_promote(ctx: click.Context, candidate_id: str) -> None: + """Promote a qualifying candidate into the labeled corpus.""" + queue: CandidateQueue = ctx.obj["queue"] + config: LabelingConfig = ctx.obj["config"] + enforcer = WorkflowEnforcer(config.workflow, queue) + decision = enforcer.can_promote(candidate_id) + if not decision.allowed: + click.echo(f"cannot promote: {decision.reason}", err=True) + ctx.exit(1) + for warning in enforcer.promotion_warnings(candidate_id): + click.echo(f"warning: {warning}", err=True) + try: + event = _compose_event(queue, candidate_id) + except InsufficientSourcesError as exc: + click.echo(f"cannot promote: {exc}", err=True) + ctx.exit(1) + raise # unreachable; ctx.exit raises, but keeps mypy satisfied. + writer = AppendOnlyParquetWriter(Path(config.storage.labels_root)) + writer.append([event]) + click.echo(f"promoted {candidate_id} to event {event.event_id}") + + +@cli.command("correct") +@click.argument("event_id") +@click.option("--replacement-id", required=True) +@click.pass_context +def cmd_correct(ctx: click.Context, event_id: str, replacement_id: str) -> None: + """Mark an existing event as superseded by *replacement_id*.""" + config: LabelingConfig = ctx.obj["config"] + writer = AppendOnlyParquetWriter(Path(config.storage.labels_root)) + writer.supersede(event_id, replacement_id) + click.echo(f"superseded {event_id} → {replacement_id}") + + +@cli.command("coverage") +@click.option("--since", "since_iso", default=None) +@click.pass_context +def cmd_coverage(ctx: click.Context, since_iso: str | None) -> None: + """Print labeled-event counts per category since *since*.""" + config: LabelingConfig = ctx.obj["config"] + since = datetime.fromisoformat(since_iso) if since_iso else datetime(2020, 1, 1, tzinfo=UTC) + reader = LabelReader(Path(config.storage.labels_root)) + counts = reader.coverage_by_category(since=since) + for category, count in sorted(counts.items()): + 
def _compose_event(queue: CandidateQueue, candidate_id: str) -> NewsworthyEvent:
    """Assemble the NewsworthyEvent for a candidate from its qualifying decisions.

    The candidate must carry publications from at least
    ``MIN_DISTINCT_QUALIFYING_SOURCES`` distinct publishers (protocol
    §Definition-of-a-Newsworthy-Event). The ground-truth timestamp and the
    headline are taken from the earliest publication (protocol §Ground-Truth
    Timestamp Rule); markets are the union of the qualifying decisions'
    markets, and the category is the first one any qualifying decision set,
    falling back to "markets".

    Raises:
        InsufficientSourcesError: If the candidate has no publications or too
            few distinct publishers.
    """
    candidate = queue.get(candidate_id)
    if not candidate.publications:
        raise InsufficientSourcesError(f"candidate {candidate_id!r} has no publications")
    publisher_count = len({pub.source_id for pub in candidate.publications})
    if publisher_count < MIN_DISTINCT_QUALIFYING_SOURCES:
        raise InsufficientSourcesError(
            f"candidate {candidate_id!r} has {publisher_count} "
            f"distinct qualifying publisher(s); protocol requires at least "
            f"{MIN_DISTINCT_QUALIFYING_SOURCES}"
        )

    # A stable sort keeps ties in their original order, so the head of the
    # list is the same publication an earliest-by-timestamp scan would pick.
    chronological = sorted(candidate.publications, key=lambda p: p.timestamp)
    first_publication = chronological[0]
    # dict.fromkeys drops repeats while keeping earliest-first order, so the
    # record lists each URL and each publisher once.
    source_urls: list[str] = list(dict.fromkeys(str(pub.url) for pub in chronological))
    source_publishers: list[str] = list(dict.fromkeys(pub.source_id for pub in chronological))

    approvals = [d for d in queue.decisions_for(candidate_id) if d.qualifies]
    merged_markets = sorted({market for d in approvals for market in d.market_ids})
    category = next((d.category for d in approvals if d.category), "markets")
    labeler_ids = sorted({d.annotator_id for d in approvals})

    return NewsworthyEvent(
        event_id=str(uuid4()),
        ground_truth_timestamp=first_publication.timestamp,
        market_ids=merged_markets,
        category=category,
        headline=first_publication.headline,
        source_urls=source_urls,
        source_publishers=source_publishers,  # type: ignore[arg-type]
        labeler_ids=labeler_ids,
        label_protocol_version=LABEL_PROTOCOL_VERSION,
        corrects=None,
        status="labeled",
        created_at=datetime.now(tz=UTC),
    )
class WorkflowEnforcer:
    """Gate deciding whether a candidate may become a NewsworthyEvent.

    Reads the candidate's decisions from the queue and applies the thresholds
    held in the workflow config; it never mutates either.
    """

    def __init__(self, config: WorkflowConfig, queue: CandidateQueue) -> None:
        self._config = config
        self._queue = queue

    def can_promote(self, candidate_id: str) -> PromotionDecision:
        """Run the hard checks in order and return the first failure, if any."""
        if candidate_id not in self._queue:
            return PromotionDecision(False, "unknown candidate")
        recorded = self._queue.decisions_for(candidate_id)
        annotators = {d.annotator_id for d in recorded}
        if len(annotators) < 2:
            return PromotionDecision(False, "needs two distinct annotators")
        approvals = [d for d in recorded if d.qualifies]
        if len(approvals) < 2:
            return PromotionDecision(False, "annotators disagree on event existence")
        # Timestamp check runs first; the market check is skipped once it fails.
        for check in (self._timestamp_failure, self._market_failure):
            failure = check(approvals)
            if failure is not None:
                return failure
        return PromotionDecision(True, "eligible")

    def _timestamp_failure(self, qualifying: list[LabelDecision]) -> PromotionDecision | None:
        """Fail when fewer than two timestamps exist or their span exceeds the hard limit."""
        stamps = [d.timestamp for d in qualifying if d.timestamp is not None]
        if len(stamps) < 2:
            return PromotionDecision(False, "qualifying decisions missing timestamps")
        span = max(stamps) - min(stamps)
        limit = timedelta(seconds=self._config.timestamp_hard_fail_seconds)
        if span > limit:
            return PromotionDecision(False, f"timestamp span {span} exceeds hard fail")
        return None

    def _market_failure(self, qualifying: list[LabelDecision]) -> PromotionDecision | None:
        """Fail when the decisions name no markets or overlap at or below the hard limit."""
        market_sets = [set(d.market_ids) for d in qualifying]
        if not market_sets:
            return PromotionDecision(False, "qualifying decisions missing markets")
        union = set.union(*market_sets)
        if not union:
            return PromotionDecision(False, "qualifying decisions list no markets")
        jaccard = len(set.intersection(*market_sets)) / len(union)
        if jaccard <= self._config.market_jaccard_hard_fail:
            return PromotionDecision(False, f"market Jaccard {jaccard:.2f} at or below hard fail")
        return None

    def promotion_warnings(self, candidate_id: str) -> list[str]:
        """Return non-fatal advisory warnings (kept separate from hard fails)."""
        approvals = [d for d in self._queue.decisions_for(candidate_id) if d.qualifies]
        if not approvals:
            return []
        notes: list[str] = []

        stamps = [d.timestamp for d in approvals if d.timestamp is not None]
        if len(stamps) >= 2:
            span = max(stamps) - min(stamps)
            if span > timedelta(seconds=self._config.timestamp_agreement_window_seconds):
                notes.append(f"timestamp span {span} exceeds warning window")

        market_sets = [set(d.market_ids) for d in approvals]
        union = set.union(*market_sets)
        if union:
            jaccard = len(set.intersection(*market_sets)) / len(union)
            target = self._config.market_jaccard_target
            if jaccard < target:
                notes.append(
                    f"market Jaccard {jaccard:.2f} below target "
                    f"{self._config.market_jaccard_target:.2f}"
                )
        return notes
@dataclass(frozen=True, slots=True)
class SignalLabel:
    """Classification of a single signal against the labeled event corpus."""

    signal_id: str
    # Matched event, or None when the signal matched no event.
    event_id: str | None
    label: Literal["true_positive", "false_positive", "true_negative"]
    # Whole seconds between detection and the matched event; None if unmatched.
    lead_time_seconds: int | None
    labeled_at: datetime
    label_protocol_version: str


def join_signals_to_events(
    signals: Sequence[MarketSignal],
    events: Sequence[NewsworthyEvent],
    now: datetime,
    lead_window: timedelta = timedelta(hours=24),
    label_protocol_version: str = LABEL_PROTOCOL_VERSION,
) -> list[SignalLabel]:
    """Label every signal as a true or false positive.

    Only events whose status is "labeled" take part. A signal is a true
    positive when an event on the same market has a ground-truth timestamp
    in (detected_at, detected_at + lead_window]; among several such events
    the one with the earliest ground-truth timestamp wins. Every other
    signal is a false positive. The result has one entry per input signal,
    in input order, each stamped with *now* and *label_protocol_version*.
    """
    # Index labeled events per market so each signal needs one dict lookup.
    by_market: dict[str, list[NewsworthyEvent]] = {}
    for candidate_event in events:
        if candidate_event.status == "labeled":
            for market in candidate_event.market_ids:
                by_market.setdefault(market, []).append(candidate_event)
    # Chronological order lets the matcher stop at the first hit.
    for market_events in by_market.values():
        market_events.sort(key=lambda item: item.ground_truth_timestamp)

    results: list[SignalLabel] = []
    for signal in signals:
        hit = _earliest_match(signal, by_market.get(signal.market_id, []), lead_window)
        if hit is not None:
            elapsed = hit.ground_truth_timestamp - signal.detected_at
            row = SignalLabel(
                signal_id=signal.signal_id,
                event_id=hit.event_id,
                label="true_positive",
                lead_time_seconds=int(elapsed.total_seconds()),
                labeled_at=now,
                label_protocol_version=label_protocol_version,
            )
        else:
            row = SignalLabel(
                signal_id=signal.signal_id,
                event_id=None,
                label="false_positive",
                lead_time_seconds=None,
                labeled_at=now,
                label_protocol_version=label_protocol_version,
            )
        results.append(row)
    return results


def _earliest_match(
    signal: MarketSignal,
    candidates: Sequence[NewsworthyEvent],
    lead_window: timedelta,
) -> NewsworthyEvent | None:
    """Return the first candidate whose lead time lies in (0, lead_window].

    Candidates are scanned in the order given; the caller passes them
    sorted by ground-truth timestamp, so the first hit is the earliest.
    """
    no_lead = timedelta(0)
    # Compare timedeltas directly rather than float seconds so the boundary
    # at lead_window is exact.
    return next(
        (
            event
            for event in candidates
            if no_lead < event.ground_truth_timestamp - signal.detected_at <= lead_window
        ),
        None,
    )
class AgreementReport(BaseModel):
    """Summary of one pair of annotators' agreement over a window.

    The model is frozen and rejects unknown fields. It only carries the
    metric values; nothing in this class computes or checks them.
    """

    model_config = ConfigDict(frozen=True, extra="forbid")

    # The two annotator ids being compared.
    annotator_pair: tuple[str, str]
    # Bounds of the window the report covers.
    window_start: datetime
    window_end: datetime
    candidate_count: int
    # Candidates exactly one annotator reviewed; excluded from metrics
    # but surfaced so consumers know the pairing coverage.
    unpaired_count: int = 0
    # Cohen's kappa on whether an event exists.
    event_existence_kappa: float
    # NOTE(review): presumably the fraction of pairs whose timestamps agree
    # within 60 seconds — confirm against the metric implementation.
    timestamp_agreement_60s: float
    # Mean Jaccard overlap of the annotators' market associations.
    market_association_jaccard_mean: float
    # Cohen's kappa on category assignment.
    category_assignment_kappa: float
    # Precomputed by the producer; this model does not derive it.
    meets_targets: bool


class AnnotatorIdentity(BaseModel):
    """Opaque annotator identifier plus optional display name.

    Frozen; unknown fields are rejected.
    """

    model_config = ConfigDict(frozen=True, extra="forbid")

    annotator_id: str
    display_name: str | None = None


class LabelDecision(BaseModel):
    """One annotator's call on one candidate.

    Only the identifiers, ``decided_at`` and ``qualifies`` are required at
    construction, so an annotator can record a "does not qualify" decision
    without supplying event metadata. ``timestamp``, ``market_ids`` and
    ``category`` default to empty values.

    NOTE(review): this class defines no validator. Any "required if
    qualifies" enforcement has to happen elsewhere (presumably at
    promotion time) — confirm where.
    """

    model_config = ConfigDict(frozen=True, extra="forbid")

    decision_id: str
    candidate_id: str
    annotator_id: str
    # When the annotator recorded the decision.
    decided_at: datetime
    # True when the annotator judges the candidate to be a real event.
    qualifies: bool
    # The annotator's event timestamp; None when not supplied.
    timestamp: datetime | None = None
    # Markets the annotator associates with the event; empty by default.
    market_ids: list[str] = Field(default_factory=list)
    category: str | None = None
    notes: str | None = None
class EventCandidate(BaseModel):
    """A candidate awaiting annotator decisions.

    Frozen; unknown fields are rejected.
    """

    model_config = ConfigDict(frozen=True, extra="forbid")

    candidate_id: str
    discovered_at: datetime
    # Publications attached to this candidate.
    publications: list[SourcePublication]
    # Optional market hints; empty by default.
    suggested_market_ids: list[str] = Field(default_factory=list)


class NewsworthyEvent(BaseModel):
    """A labeled event that survived the two-annotator workflow.

    Frozen; unknown fields are rejected. Every field except ``corrects``
    is required at construction.
    """

    model_config = ConfigDict(frozen=True, extra="forbid")

    event_id: str
    # Reference time used for lead-time computation and storage partitioning.
    ground_truth_timestamp: datetime
    # Markets this event is associated with.
    market_ids: list[str]
    category: str
    headline: str
    source_urls: list[str]
    # Restricted to the closed SourceId set.
    source_publishers: list[SourceId]
    labeler_ids: list[str]
    label_protocol_version: str
    # NOTE(review): presumably the id of a related event in a correction
    # chain; the direction of the link is not established here — confirm.
    corrects: str | None = None
    # Lifecycle state; only "labeled" events take part in the signal join.
    status: Literal["labeled", "candidate", "superseded", "rejected"]
    created_at: datetime
# Closed set of publisher tags shared by adapters, models and storage.
SourceId = Literal["reuters", "bloomberg", "ap", "ft"]


class QualifyingSource(BaseModel):
    """One of the four protocol-approved publishers.

    Frozen; unknown fields are rejected.
    """

    model_config = ConfigDict(frozen=True, extra="forbid")

    source_id: SourceId
    # Human-readable publisher name.
    name: str


class SourcePublication(BaseModel):
    """A single publication returned by a source adapter.

    Frozen; unknown fields are rejected. ``url`` is validated as an HTTP
    URL by Pydantic.
    """

    model_config = ConfigDict(frozen=True, extra="forbid")

    publication_id: str
    source_id: SourceId
    # Publication time as reported by the source.
    timestamp: datetime
    headline: str
    url: HttpUrl
    # Optional summary text; None when the source provides none.
    body_excerpt: str | None = None
    # Source-provided topic tags; empty by default.
    keywords: list[str] = Field(default_factory=list)
+""" + +from __future__ import annotations + +import asyncio +from collections.abc import Awaitable, Callable +from dataclasses import dataclass + + +@dataclass(frozen=True, slots=True) +class HttpBackoff: + """Backoff schedule used by every source adapter.""" + + initial_seconds: float = 1.0 + max_seconds: float = 60.0 + max_retries: int = 5 + + +class HttpRetryExhaustedError(RuntimeError): + """Raised when every adapter retry attempt fails.""" + + def __init__(self, attempts: int, last_error: BaseException) -> None: + super().__init__(f"http retry exhausted after {attempts} attempts: {last_error!r}") + self.attempts = attempts + self.last_error = last_error + + +async def request_with_backoff[T]( + factory: Callable[[], Awaitable[T]], + policy: HttpBackoff, + sleep: Callable[[float], Awaitable[None]] = asyncio.sleep, +) -> T: + """Invoke *factory* with exponential backoff.""" + delay = policy.initial_seconds + last_error: BaseException | None = None + for attempt in range(1, policy.max_retries + 1): + try: + return await factory() + except Exception as err: + last_error = err + if attempt == policy.max_retries: + break + await sleep(delay) + delay = min(delay * 2.0, policy.max_seconds) + if last_error is None: # pragma: no cover + raise RuntimeError("http retry loop exited without capturing an error") + raise HttpRetryExhaustedError(attempts=policy.max_retries, last_error=last_error) diff --git a/src/augur_labels/augur_labels/sources/ap.py b/src/augur_labels/augur_labels/sources/ap.py new file mode 100644 index 0000000..c16922d --- /dev/null +++ b/src/augur_labels/augur_labels/sources/ap.py @@ -0,0 +1,83 @@ +"""Associated Press REST adapter. + +Uses the AP_API_KEY env var. Coverage is broad but throughput is +lower than Reuters; the rate_limit_per_hour in config/labeling.toml +caps concurrent discovery runs. 
class ApAdapter:
    """Source adapter for the Associated Press REST API.

    The API key is sent as the ``apikey`` query parameter on every
    request. Construction fails when no key is available.
    """

    source_id: SourceId = "ap"

    def __init__(
        self,
        client: httpx.AsyncClient,
        base_url: str = "https://api.ap.org/v1",
        api_key: str | None = None,
        backoff: HttpBackoff | None = None,
    ) -> None:
        """Store the client and resolve the key (argument, then AP_API_KEY).

        Raises:
            RuntimeError: neither *api_key* nor the env var is set.
        """
        resolved_key = api_key if api_key else os.environ.get("AP_API_KEY")
        if not resolved_key:
            raise RuntimeError("ApAdapter requires AP_API_KEY environment variable")
        self._client = client
        self._base_url = base_url.rstrip("/")
        self._api_key = resolved_key
        self._backoff = backoff if backoff else HttpBackoff()

    async def _get(self, path: str, params: dict[str, str] | None = None) -> dict[str, Any]:
        """GET *path* with the key merged into the query and return the JSON body."""
        # Caller-supplied params are applied after the key, as in a dict merge.
        query: dict[str, str] = {"apikey": self._api_key}
        if params:
            query.update(params)

        async def _attempt() -> dict[str, Any]:
            reply = await self._client.get(
                f"{self._base_url}{path}", params=query, timeout=30.0
            )
            reply.raise_for_status()
            body: dict[str, Any] = reply.json()
            return body

        return await request_with_backoff(_attempt, self._backoff)

    async def fetch_recent(
        self,
        since: datetime,
        keywords: Sequence[str] | None = None,
    ) -> list[SourcePublication]:
        """Search content created since *since*, optionally filtered by keywords.

        Keywords are joined with spaces into the ``q`` parameter.
        """
        query = {"min_date": since.isoformat().replace("+00:00", "Z")}
        if keywords:
            query["q"] = " ".join(keywords)
        payload = await self._get("/content/search", params=query)
        publications: list[SourcePublication] = []
        for entry in payload.get("items", []):
            publications.append(_parse_publication(entry))
        return publications

    async def health_check(self) -> bool:
        """Return True when a search request succeeds, False on any error."""
        try:
            await self._get("/content/search", params={"min_date": "1970-01-01T00:00:00Z"})
        except Exception:
            return False
        else:
            return True


def _parse_publication(item: dict[str, Any]) -> SourcePublication:
    """Map one AP search item onto a SourcePublication.

    Raises KeyError when a required key (itemid, firstcreated, headline,
    link) is absent.
    """
    created = str(item["firstcreated"]).replace("Z", "+00:00")
    return SourcePublication(
        publication_id=str(item["itemid"]),
        source_id="ap",
        timestamp=datetime.fromisoformat(created),
        headline=str(item["headline"]),
        url=str(item["link"]),  # type: ignore[arg-type]
        body_excerpt=item.get("summary"),
        keywords=list(item.get("subject", [])),
    )


class AbstractSourceAdapter(Protocol):
    """Structural interface shared by all wire-service adapters."""

    source_id: SourceId

    async def fetch_recent(
        self,
        since: datetime,
        keywords: Sequence[str] | None = None,
    ) -> list[SourcePublication]:
        """Return publications published since *since*.

        With *keywords*, an adapter filters at the source where the API
        supports it and otherwise filters after fetching.
        """
        ...

    async def health_check(self) -> bool:
        """Report whether credentials and connectivity are working."""
        ...
class BloombergAdapter:
    """Source adapter for Bloomberg using OAuth2 client credentials.

    The bearer token is fetched on first use and cached. A 401 response
    clears the cache so the next retry attempt authenticates again.
    """

    source_id: SourceId = "bloomberg"

    def __init__(
        self,
        client: httpx.AsyncClient,
        base_url: str = "https://api.bloomberg.com/v1",
        token_url: str = "https://api.bloomberg.com/oauth2/token",  # noqa: S107
        client_id: str | None = None,
        client_secret: str | None = None,
        backoff: HttpBackoff | None = None,
    ) -> None:
        """Resolve credentials from the arguments, then the environment.

        Raises:
            RuntimeError: the client id or secret cannot be resolved.
        """
        resolved_id = client_id if client_id else os.environ.get("BLOOMBERG_CLIENT_ID")
        resolved_secret = (
            client_secret if client_secret else os.environ.get("BLOOMBERG_CLIENT_SECRET")
        )
        if not (resolved_id and resolved_secret):
            raise RuntimeError(
                "BloombergAdapter requires BLOOMBERG_CLIENT_ID and "
                "BLOOMBERG_CLIENT_SECRET environment variables"
            )
        self._client = client
        self._base_url = base_url.rstrip("/")
        self._token_url = token_url
        self._client_id = resolved_id
        self._client_secret = resolved_secret
        self._backoff = backoff if backoff else HttpBackoff()
        self._token: str | None = None

    async def _ensure_token(self) -> str:
        """Return the cached token, requesting a new one when none is cached."""
        cached = self._token
        if cached is not None:
            return cached

        async def _request_token() -> str:
            reply = await self._client.post(
                self._token_url,
                data={"grant_type": "client_credentials"},
                auth=(self._client_id, self._client_secret),
                timeout=30.0,
            )
            reply.raise_for_status()
            body: dict[str, Any] = reply.json()
            return str(body["access_token"])

        fresh = await request_with_backoff(_request_token, self._backoff)
        self._token = fresh
        return fresh

    async def _get(self, path: str, params: dict[str, str] | None = None) -> dict[str, Any]:
        """GET *path* with bearer auth and return the JSON body."""

        async def _attempt() -> dict[str, Any]:
            # Resolve the token per attempt: after a 401 cleared the cache,
            # the retry must not reuse the rejected credential.
            bearer = await self._ensure_token()
            reply = await self._client.get(
                f"{self._base_url}{path}",
                headers={"Authorization": f"Bearer {bearer}"},
                params=params,
                timeout=30.0,
            )
            if reply.status_code == 401:
                # Drop the cached token before the error reaches the retry loop.
                self._token = None
            reply.raise_for_status()
            body: dict[str, Any] = reply.json()
            return body

        return await request_with_backoff(_attempt, self._backoff)

    async def fetch_recent(
        self,
        since: datetime,
        keywords: Sequence[str] | None = None,
    ) -> list[SourcePublication]:
        """Fetch news since *since*; keywords become a comma-joined ``topic``."""
        query = {"since": since.isoformat().replace("+00:00", "Z")}
        if keywords:
            query["topic"] = ",".join(keywords)
        payload = await self._get("/news", params=query)
        publications: list[SourcePublication] = []
        for entry in payload.get("articles", []):
            publications.append(_parse_publication(entry))
        return publications

    async def health_check(self) -> bool:
        """Return True when a token is cached or can be obtained, else False."""
        try:
            await self._ensure_token()
        except Exception:
            return False
        else:
            return True


def _parse_publication(item: dict[str, Any]) -> SourcePublication:
    """Map one Bloomberg article onto a SourcePublication.

    Raises KeyError when a required key (id, published, headline, url) is
    absent.
    """
    published = str(item["published"]).replace("Z", "+00:00")
    return SourcePublication(
        publication_id=str(item["id"]),
        source_id="bloomberg",
        timestamp=datetime.fromisoformat(published),
        headline=str(item["headline"]),
        url=str(item["url"]),  # type: ignore[arg-type]
        body_excerpt=item.get("lead_paragraph"),
        keywords=list(item.get("topics", [])),
    )
_LOGGER = logging.getLogger(__name__)


class FtAdapter:
    """Concrete AbstractSourceAdapter for the Financial Times.

    Unlike the other adapters, a missing API key does not fail at
    construction: ``fetch_recent`` logs a warning and returns no
    publications. The same happens when the API rejects the key with 401
    or 403. ``rss_url`` is stored but not used by any method of this class.
    """

    source_id: SourceId = "ft"

    def __init__(
        self,
        client: httpx.AsyncClient,
        base_url: str = "https://api.ft.com/v1",
        rss_url: str = "https://www.ft.com/rss/home",
        api_key: str | None = None,
        backoff: HttpBackoff | None = None,
    ) -> None:
        """Store the client and resolve the key (argument, then FT_API_KEY)."""
        self._client = client
        self._base_url = base_url.rstrip("/")
        self._rss_url = rss_url
        self._api_key = api_key or os.environ.get("FT_API_KEY")
        self._backoff = backoff or HttpBackoff()

    def _headers(self) -> dict[str, str]:
        """Return the API-key header, or no headers when no key is set."""
        if self._api_key:
            return {"X-API-Key": self._api_key}
        return {}

    @staticmethod
    def _auth_rejection_status(error: BaseException) -> int | None:
        """Return 401 or 403 when *error* represents an auth rejection.

        The retry helper wraps the final failure and exposes it as
        ``last_error``, so both the wrapper and a bare HTTP status error
        are inspected. Any other error yields None.
        """
        underlying = getattr(error, "last_error", error)
        response = getattr(underlying, "response", None)
        status = getattr(response, "status_code", None)
        return status if status in (401, 403) else None

    async def _get_json(self, path: str, params: dict[str, str] | None = None) -> dict[str, Any]:
        """GET *path* through the shared retry helper and return the JSON body."""

        async def _call() -> dict[str, Any]:
            response = await self._client.get(
                f"{self._base_url}{path}",
                headers=self._headers(),
                params=params,
                timeout=30.0,
            )
            response.raise_for_status()
            data: dict[str, Any] = response.json()
            return data

        return await request_with_backoff(_call, self._backoff)

    async def fetch_recent(
        self,
        since: datetime,
        keywords: Sequence[str] | None = None,
    ) -> list[SourcePublication]:
        """Search FT content published since *since*.

        Returns an empty list, after logging a warning, when no API key is
        configured or when the API answers 401/403. Every other failure
        propagates to the caller.
        """
        if not self._api_key:
            _LOGGER.warning(
                "FT adapter skipped: no FT_API_KEY set — discover will "
                "proceed with reduced source coverage"
            )
            return []
        params = {"since": since.isoformat().replace("+00:00", "Z")}
        if keywords:
            params["q"] = " ".join(keywords)
        try:
            payload = await self._get_json("/content/search", params=params)
        except Exception as exc:
            # The retry helper re-raises failures wrapped in its own error
            # type, so catching only the HTTP status error would never
            # match. Inspect the wrapped cause; re-raise anything else.
            status = self._auth_rejection_status(exc)
            if status is None:
                raise
            _LOGGER.warning("FT adapter skipped: API rejected credentials (HTTP %s)", status)
            return []
        return [_parse_publication(item) for item in payload.get("results", [])]

    async def health_check(self) -> bool:
        """Return True when a key is set and the /health request succeeds."""
        if not self._api_key:
            return False
        try:
            await self._get_json("/health")
        except Exception:
            return False
        return True


def _parse_publication(item: dict[str, Any]) -> SourcePublication:
    """Map one FT search result onto a SourcePublication.

    Raises KeyError when a required key (id, publishedDate, title, webUrl)
    is absent.
    """
    return SourcePublication(
        publication_id=str(item["id"]),
        source_id="ft",
        timestamp=datetime.fromisoformat(str(item["publishedDate"]).replace("Z", "+00:00")),
        headline=str(item["title"]),
        url=str(item["webUrl"]),  # type: ignore[arg-type]
        body_excerpt=item.get("standfirst"),
        keywords=list(item.get("topics", [])),
    )
class ReutersAdapter:
    """Source adapter for the Reuters REST API with bearer-token auth.

    Construction fails when no API key is available.
    """

    source_id: SourceId = "reuters"

    def __init__(
        self,
        client: httpx.AsyncClient,
        base_url: str = "https://api.reuters.com/v1",
        api_key: str | None = None,
        backoff: HttpBackoff | None = None,
    ) -> None:
        """Store the client and resolve the key (argument, then REUTERS_API_KEY).

        Raises:
            RuntimeError: neither *api_key* nor the env var is set.
        """
        resolved_key = api_key if api_key else os.environ.get("REUTERS_API_KEY")
        if not resolved_key:
            raise RuntimeError("ReutersAdapter requires REUTERS_API_KEY environment variable")
        self._client = client
        self._base_url = base_url.rstrip("/")
        self._api_key = resolved_key
        self._backoff = backoff if backoff else HttpBackoff()

    def _headers(self) -> dict[str, str]:
        """Build the Authorization header carrying the API key."""
        bearer = f"Bearer {self._api_key}"
        return {"Authorization": bearer}

    async def _get(self, path: str, params: dict[str, str] | None = None) -> dict[str, Any]:
        """GET *path* through the shared retry helper and return the JSON body."""

        async def _attempt() -> dict[str, Any]:
            reply = await self._client.get(
                f"{self._base_url}{path}",
                headers=self._headers(),
                params=params,
                timeout=30.0,
            )
            reply.raise_for_status()
            body: dict[str, Any] = reply.json()
            return body

        return await request_with_backoff(_attempt, self._backoff)

    async def fetch_recent(
        self,
        since: datetime,
        keywords: Sequence[str] | None = None,
    ) -> list[SourcePublication]:
        """Fetch articles since *since*; keywords become a space-joined ``q``."""
        query = {"since": since.isoformat().replace("+00:00", "Z")}
        if keywords:
            query["q"] = " ".join(keywords)
        payload = await self._get("/articles", params=query)
        publications: list[SourcePublication] = []
        for entry in payload.get("articles", []):
            publications.append(_parse_publication(entry))
        return publications

    async def health_check(self) -> bool:
        """Return True when the /health request succeeds, False on any error."""
        try:
            await self._get("/health")
        except Exception:
            return False
        else:
            return True


def _parse_publication(item: dict[str, Any]) -> SourcePublication:
    """Map one Reuters article onto a SourcePublication.

    Raises KeyError when a required key (id, published_at, title, url) is
    absent.
    """
    published = str(item["published_at"]).replace("Z", "+00:00")
    return SourcePublication(
        publication_id=str(item["id"]),
        source_id="reuters",
        timestamp=datetime.fromisoformat(published),
        headline=str(item["title"]),
        url=str(item["url"]),  # type: ignore[arg-type]
        body_excerpt=item.get("summary"),
        keywords=list(item.get("keywords", [])),
    )
# Arrow schema for one partition file. Column names and order match the
# fields of NewsworthyEvent; list-valued fields are lists of strings.
NEWSWORTHY_EVENTS_SCHEMA: pa.Schema = pa.schema(
    [
        ("event_id", pa.string()),
        # Microsecond precision, stored in UTC.
        ("ground_truth_timestamp", pa.timestamp("us", tz="UTC")),
        ("market_ids", pa.list_(pa.string())),
        ("category", pa.string()),
        ("headline", pa.string()),
        ("source_urls", pa.list_(pa.string())),
        # Publisher tags are stored as plain strings.
        ("source_publishers", pa.list_(pa.string())),
        ("labeler_ids", pa.list_(pa.string())),
        ("label_protocol_version", pa.string()),
        ("corrects", pa.string()),
        # Lifecycle state stored as a plain string.
        ("status", pa.string()),
        # Microsecond precision, stored in UTC.
        ("created_at", pa.timestamp("us", tz="UTC")),
    ]
)
class AppendOnlyParquetWriter:
    """Concurrent-safe append-only writer for the labeled corpus.

    Events are grouped into one ``date=YYYY-MM-DD/events.parquet`` file
    per UTC calendar date of ``ground_truth_timestamp``. Every
    read-modify-write of a partition happens under that partition's file
    lock and is published with a write-then-rename.
    """

    def __init__(self, root: Path, lock_timeout_seconds: float = 30.0) -> None:
        """Remember *root* (created if missing) and the lock timeout."""
        self._root = root
        self._timeout = lock_timeout_seconds
        self._root.mkdir(parents=True, exist_ok=True)

    def _partition_dir(self, partition: date) -> Path:
        return self._root / f"date={partition.isoformat()}"

    def _partition_file(self, partition: date) -> Path:
        return self._partition_dir(partition) / "events.parquet"

    def _lock_path(self, partition: date) -> Path:
        return self._partition_dir(partition) / ".lock"

    def append(self, events: Sequence[NewsworthyEvent]) -> None:
        """Append *events* to their partitions, acquiring one lock per partition.

        Raises:
            ValueError: an event's ``ground_truth_timestamp`` is naive.
        """
        by_partition: dict[date, list[NewsworthyEvent]] = {}
        for event in events:
            # Key on the UTC date: the stored column is UTC, so using the
            # timestamp's own offset could file an event under a date that
            # disagrees with its stored value.
            key = _to_utc(event.ground_truth_timestamp).date()
            by_partition.setdefault(key, []).append(event)
        for partition, group in by_partition.items():
            self._append_partition(partition, group)

    def _append_partition(self, partition: date, events: Sequence[NewsworthyEvent]) -> None:
        """Rewrite one partition file with *events* added at the end."""
        partition_dir = self._partition_dir(partition)
        partition_dir.mkdir(parents=True, exist_ok=True)
        lock = FileLock(self._lock_path(partition), timeout=self._timeout)
        with lock:
            new_table = _to_table(events)
            target = self._partition_file(partition)
            if target.exists():
                existing = pq.read_table(target, schema=NEWSWORTHY_EVENTS_SCHEMA)
                combined = pa.concat_tables([existing, new_table])
            else:
                combined = new_table
            _write_atomically(combined, target)

    def supersede(self, event_id: str, replacement_id: str) -> None:
        """Mark an existing event as superseded by *replacement_id*.

        Scans partitions in sorted order, and in the first one containing
        *event_id* sets that row's ``status`` to "superseded" and its
        ``corrects`` to *replacement_id*, then rewrites the file. The
        replacement event itself must already have been appended
        separately.

        Raises:
            KeyError: no partition contains *event_id*.
        """
        for partition_dir in sorted(self._root.glob("date=*")):
            target = partition_dir / "events.parquet"
            if not target.exists():
                continue
            lock = FileLock(partition_dir / ".lock", timeout=self._timeout)
            with lock:
                table = pq.read_table(target, schema=NEWSWORTHY_EVENTS_SCHEMA)
                event_ids = table.column("event_id").to_pylist()
                if event_id not in event_ids:
                    continue
                columns = {name: table.column(name).to_pylist() for name in table.schema.names}
                idx = event_ids.index(event_id)
                columns["status"][idx] = "superseded"
                # NOTE(review): this stores the replacement id on the old
                # row; confirm that matches the intended meaning of
                # ``corrects``.
                columns["corrects"][idx] = replacement_id
                updated = pa.table(columns, schema=NEWSWORTHY_EVENTS_SCHEMA)
                _write_atomically(updated, target)
                return
        raise KeyError(f"event_id={event_id!r} not found in labeled corpus")


def _write_atomically(table: pa.Table, target: Path) -> None:
    """Write *table* to a staging file, then rename it over *target*."""
    staging = target.with_suffix(".parquet.tmp")
    try:
        pq.write_table(table, staging)
        staging.replace(target)
    finally:
        # After a successful rename the staging path no longer exists; after
        # a failed write this removes the partial file.
        staging.unlink(missing_ok=True)


def _to_table(events: Sequence[NewsworthyEvent]) -> pa.Table:
    """Convert *events* to an Arrow table with the corpus schema.

    Raises:
        ValueError: any timestamp is naive.
    """
    columns: dict[str, list[object]] = {
        "event_id": [e.event_id for e in events],
        "ground_truth_timestamp": [_to_utc(e.ground_truth_timestamp) for e in events],
        "market_ids": [list(e.market_ids) for e in events],
        "category": [e.category for e in events],
        "headline": [e.headline for e in events],
        "source_urls": [list(e.source_urls) for e in events],
        "source_publishers": [list(e.source_publishers) for e in events],
        "labeler_ids": [list(e.labeler_ids) for e in events],
        "label_protocol_version": [e.label_protocol_version for e in events],
        "corrects": [e.corrects for e in events],
        "status": [e.status for e in events],
        "created_at": [_to_utc(e.created_at) for e in events],
    }
    return pa.table(columns, schema=NEWSWORTHY_EVENTS_SCHEMA)


def _to_utc(value: datetime) -> datetime:
    """Return *value* converted to UTC.

    Raises:
        ValueError: *value* is naive (carries no usable UTC offset).
    """
    # Function-scope import: the module-level import brings in only
    # ``date`` and ``datetime``.
    from datetime import timezone

    if value.tzinfo is None or value.utcoffset() is None:
        raise ValueError("timestamps must carry tzinfo")
    return value.astimezone(timezone.utc)
ts > end: + continue + events.append(_row_to_event(row)) + events.sort(key=lambda e: e.ground_truth_timestamp) + return events + + def events_for_market( + self, market_id: str, since: datetime, status: str = "labeled" + ) -> list[NewsworthyEvent]: + now = since + timedelta(days=365 * 10) # effectively "until forever" + window = self.events_in_window(since, now, status=status) + return [event for event in window if market_id in event.market_ids] + + def coverage_by_category(self, since: datetime) -> dict[str, int]: + now = since + timedelta(days=365 * 10) + events = self.events_in_window(since, now) + counter: Counter[str] = Counter() + for event in events: + counter[event.category] += 1 + return dict(counter) + + def _partitions_in_range(self, start: date, end: date) -> list[Path]: + if not self._root.exists(): + return [] + selected: list[Path] = [] + for partition_dir in sorted(self._root.glob("date=*")): + try: + partition_date = date.fromisoformat(partition_dir.name.removeprefix("date=")) + except ValueError: + continue + if start <= partition_date <= end: + selected.append(partition_dir) + return selected + + +def _rows(table: Any) -> list[dict[str, Any]]: + return [ + dict(zip(table.schema.names, row, strict=True)) + for row in zip(*[c.to_pylist() for c in table.columns], strict=True) + ] + + +def _row_to_event(row: dict[str, Any]) -> NewsworthyEvent: + return NewsworthyEvent( + event_id=row["event_id"], + ground_truth_timestamp=row["ground_truth_timestamp"], + market_ids=list(row["market_ids"]), + category=row["category"], + headline=row["headline"], + source_urls=list(row["source_urls"]), + source_publishers=list(row["source_publishers"]), + labeler_ids=list(row["labeler_ids"]), + label_protocol_version=row["label_protocol_version"], + corrects=row["corrects"], + status=row["status"], + created_at=row["created_at"], + ) diff --git a/src/augur_labels/pyproject.toml b/src/augur_labels/pyproject.toml index 99dd9d2..8f91f12 100644 --- 
a/src/augur_labels/pyproject.toml +++ b/src/augur_labels/pyproject.toml @@ -8,6 +8,9 @@ dependencies = [ "pydantic>=2.7", "pyarrow>=17.0", "httpx>=0.27", + "click>=8.1", + "filelock>=3.15", + "augur-signals", ] [build-system] diff --git a/tests/labels/test_agreement.py b/tests/labels/test_agreement.py new file mode 100644 index 0000000..b216912 --- /dev/null +++ b/tests/labels/test_agreement.py @@ -0,0 +1,138 @@ +"""Tests for inter-annotator agreement metrics.""" + +from __future__ import annotations + +from datetime import UTC, datetime, timedelta + +import pytest + +from augur_labels.annotator.agreement import compute_agreement +from augur_labels.models import LabelDecision + + +def _decision( + annotator_id: str, + candidate_id: str, + *, + qualifies: bool = True, + timestamp_offset_seconds: int = 0, + market_ids: list[str] | None = None, + category: str | None = "monetary_policy", +) -> LabelDecision: + base = datetime(2026, 3, 15, 12, 0, tzinfo=UTC) + ts = base + timedelta(seconds=timestamp_offset_seconds) if qualifies else None + resolved_markets = (market_ids or []) if qualifies else [] + resolved_category = category if qualifies else None + return LabelDecision( + decision_id=f"{annotator_id}-{candidate_id}", + candidate_id=candidate_id, + annotator_id=annotator_id, + decided_at=base, + qualifies=qualifies, + timestamp=ts, + market_ids=resolved_markets, + category=resolved_category, + ) + + +@pytest.mark.unit +def test_perfect_agreement_meets_all_targets() -> None: + decisions_a = [ + _decision("ann1", "c1", market_ids=["kalshi_fed"]), + _decision("ann1", "c2", market_ids=["kalshi_fed", "polymarket_a"]), + ] + decisions_b = [ + _decision("ann2", "c1", market_ids=["kalshi_fed"]), + _decision("ann2", "c2", market_ids=["kalshi_fed", "polymarket_a"]), + ] + report = compute_agreement( + decisions_a, + decisions_b, + window_start=datetime(2026, 3, 1, tzinfo=UTC), + window_end=datetime(2026, 3, 31, tzinfo=UTC), + ) + assert report.event_existence_kappa == 
pytest.approx(1.0) + assert report.timestamp_agreement_60s == pytest.approx(1.0) + assert report.market_association_jaccard_mean == pytest.approx(1.0) + assert report.category_assignment_kappa == pytest.approx(1.0) + assert report.meets_targets + + +@pytest.mark.unit +def test_disagreement_on_event_existence_fails_targets() -> None: + decisions_a = [ + _decision("ann1", "c1", qualifies=True), + _decision("ann1", "c2", qualifies=True), + ] + decisions_b = [ + _decision("ann2", "c1", qualifies=False), + _decision("ann2", "c2", qualifies=False), + ] + report = compute_agreement( + decisions_a, + decisions_b, + window_start=datetime(2026, 3, 1, tzinfo=UTC), + window_end=datetime(2026, 3, 31, tzinfo=UTC), + ) + assert report.event_existence_kappa < 0.95 + assert not report.meets_targets + + +@pytest.mark.unit +def test_timestamp_within_60_seconds_counts_as_agreement() -> None: + decisions_a = [ + _decision("ann1", "c1", timestamp_offset_seconds=0, market_ids=["m"]), + ] + decisions_b = [ + _decision("ann2", "c1", timestamp_offset_seconds=45, market_ids=["m"]), + ] + report = compute_agreement( + decisions_a, + decisions_b, + window_start=datetime(2026, 3, 1, tzinfo=UTC), + window_end=datetime(2026, 3, 31, tzinfo=UTC), + ) + assert report.timestamp_agreement_60s == pytest.approx(1.0) + + +@pytest.mark.unit +def test_timestamp_over_60_seconds_is_disagreement() -> None: + decisions_a = [ + _decision("ann1", "c1", timestamp_offset_seconds=0, market_ids=["m"]), + ] + decisions_b = [ + _decision("ann2", "c1", timestamp_offset_seconds=120, market_ids=["m"]), + ] + report = compute_agreement( + decisions_a, + decisions_b, + window_start=datetime(2026, 3, 1, tzinfo=UTC), + window_end=datetime(2026, 3, 31, tzinfo=UTC), + ) + assert report.timestamp_agreement_60s == pytest.approx(0.0) + + +@pytest.mark.unit +def test_market_jaccard_partial_overlap() -> None: + decisions_a = [_decision("ann1", "c1", market_ids=["a", "b"])] + decisions_b = [_decision("ann2", "c1", market_ids=["a", 
"c"])] + report = compute_agreement( + decisions_a, + decisions_b, + window_start=datetime(2026, 3, 1, tzinfo=UTC), + window_end=datetime(2026, 3, 31, tzinfo=UTC), + ) + # Jaccard = |a| ∩ |a,c| / |a,b,c| = 1/3. + assert report.market_association_jaccard_mean == pytest.approx(1.0 / 3.0) + + +@pytest.mark.unit +def test_empty_pair_returns_zero_metrics() -> None: + report = compute_agreement( + [], + [], + window_start=datetime(2026, 3, 1, tzinfo=UTC), + window_end=datetime(2026, 3, 31, tzinfo=UTC), + ) + assert report.candidate_count == 0 + assert not report.meets_targets diff --git a/tests/labels/test_cli.py b/tests/labels/test_cli.py new file mode 100644 index 0000000..5d3a327 --- /dev/null +++ b/tests/labels/test_cli.py @@ -0,0 +1,175 @@ +"""Tests for the augur-label CLI.""" + +from __future__ import annotations + +import json +from datetime import UTC, datetime +from pathlib import Path + +import pytest +from click.testing import CliRunner + +from augur_labels.annotator.cli import cli +from augur_labels.models import EventCandidate, SourcePublication + + +def _publication(pub_id: str, source: str = "reuters") -> SourcePublication: + return SourcePublication( + publication_id=pub_id, + source_id=source, # type: ignore[arg-type] + timestamp=datetime(2026, 3, 15, 12, 0, tzinfo=UTC), + headline="Fed holds rates", + url="https://example.com/story", # type: ignore[arg-type] + ) + + +def _seed_queue(queue_path: Path) -> None: + candidate = EventCandidate( + candidate_id="c1", + discovered_at=datetime(2026, 3, 15, 12, 5, tzinfo=UTC), + publications=[_publication("p1"), _publication("p2", "bloomberg")], + suggested_market_ids=["kalshi_fed"], + ) + queue_path.parent.mkdir(parents=True, exist_ok=True) + queue_path.write_text( + json.dumps( + {"candidates": [candidate.model_dump(mode="json")], "decisions": []}, + default=str, + indent=2, + ), + encoding="utf-8", + ) + + +@pytest.fixture +def tmp_paths(tmp_path: Path) -> tuple[Path, Path]: + queue_path = tmp_path / 
"queue.json" + labels_root = tmp_path / "labels" + _seed_queue(queue_path) + return queue_path, labels_root + + +def _common_args(queue_path: Path, labels_root: Path) -> list[str]: + return ["--queue-file", str(queue_path)] + + +@pytest.mark.unit +def test_candidates_lists_seeded_candidate(tmp_paths: tuple[Path, Path]) -> None: + queue_path, labels_root = tmp_paths + runner = CliRunner() + result = runner.invoke(cli, [*_common_args(queue_path, labels_root), "candidates"]) + assert result.exit_code == 0 + assert "c1" in result.output + + +@pytest.mark.unit +def test_inspect_shows_publications(tmp_paths: tuple[Path, Path]) -> None: + queue_path, labels_root = tmp_paths + runner = CliRunner() + result = runner.invoke(cli, [*_common_args(queue_path, labels_root), "inspect", "c1"]) + assert result.exit_code == 0 + assert "Fed holds rates" in result.output + + +@pytest.mark.unit +def test_inspect_unknown_candidate_exits_nonzero(tmp_paths: tuple[Path, Path]) -> None: + queue_path, labels_root = tmp_paths + runner = CliRunner() + result = runner.invoke(cli, [*_common_args(queue_path, labels_root), "inspect", "missing"]) + assert result.exit_code != 0 + + +@pytest.mark.unit +def test_decide_persists_decision_to_queue_file(tmp_paths: tuple[Path, Path]) -> None: + queue_path, labels_root = tmp_paths + runner = CliRunner() + result = runner.invoke( + cli, + [ + *_common_args(queue_path, labels_root), + "decide", + "c1", + "--annotator", + "ann1", + "--timestamp", + "2026-03-15T12:00:00+00:00", + "--market-ids", + "kalshi_fed", + "--category", + "monetary_policy", + ], + ) + assert result.exit_code == 0, result.output + data = json.loads(queue_path.read_text(encoding="utf-8")) + assert len(data["decisions"]) == 1 + assert data["decisions"][0]["annotator_id"] == "ann1" + + +@pytest.mark.unit +def test_promote_refuses_with_single_annotator(tmp_paths: tuple[Path, Path]) -> None: + queue_path, labels_root = tmp_paths + runner = CliRunner() + runner.invoke( + cli, + [ + 
*_common_args(queue_path, labels_root), + "decide", + "c1", + "--annotator", + "ann1", + "--timestamp", + "2026-03-15T12:00:00+00:00", + "--market-ids", + "kalshi_fed", + ], + ) + result = runner.invoke( + cli, [*_common_args(queue_path, labels_root), "promote", "c1"] + ) + assert result.exit_code != 0 + assert "two distinct" in result.output + + +@pytest.mark.unit +def test_promote_writes_event_on_agreement(tmp_paths: tuple[Path, Path]) -> None: + queue_path, labels_root = tmp_paths + runner = CliRunner() + for annotator, offset in [("ann1", 0), ("ann2", 30)]: + runner.invoke( + cli, + [ + *_common_args(queue_path, labels_root), + "decide", + "c1", + "--annotator", + annotator, + "--timestamp", + f"2026-03-15T12:00:{offset:02d}+00:00", + "--market-ids", + "kalshi_fed", + "--category", + "monetary_policy", + ], + ) + # Create a labeling config pointing at labels_root. + config_path = labels_root.parent / "labeling.toml" + config_path.write_text( + f'[storage]\nlabels_root = "{labels_root}"\nfile_lock_timeout_seconds = 30\n', + encoding="utf-8", + ) + result = runner.invoke( + cli, + [ + "--queue-file", + str(queue_path), + "--config", + str(config_path), + "promote", + "c1", + ], + ) + assert result.exit_code == 0, result.output + assert "promoted c1" in result.output + # Partition file exists. 
+ partitions = list((labels_root).glob("date=*/events.parquet")) + assert len(partitions) == 1 diff --git a/tests/labels/test_join.py b/tests/labels/test_join.py new file mode 100644 index 0000000..55f0ec7 --- /dev/null +++ b/tests/labels/test_join.py @@ -0,0 +1,169 @@ +"""Tests for the signal-to-event join.""" + +from __future__ import annotations + +from datetime import UTC, datetime, timedelta + +import pytest + +from augur_labels.join.signal_to_event import join_signals_to_events +from augur_labels.models import NewsworthyEvent +from augur_signals.models import MarketSignal, SignalType, new_signal_id + + +def _signal( + market_id: str = "kalshi_fed", + detected_at: datetime | None = None, +) -> MarketSignal: + return MarketSignal( + signal_id=new_signal_id(), + market_id=market_id, + platform="kalshi", + signal_type=SignalType.PRICE_VELOCITY, + magnitude=0.8, + direction=1, + confidence=0.8, + fdr_adjusted=False, + detected_at=detected_at or datetime(2026, 3, 15, 12, 0, tzinfo=UTC), + window_seconds=300, + liquidity_tier="high", + raw_features={"calibration_provenance": "d@identity_v0"}, + ) + + +def _event( + event_id: str, + market_ids: list[str] | None = None, + ground_truth_offset_hours: float = 1.0, + status: str = "labeled", +) -> NewsworthyEvent: + return NewsworthyEvent( + event_id=event_id, + ground_truth_timestamp=datetime(2026, 3, 15, 12, 0, tzinfo=UTC) + + timedelta(hours=ground_truth_offset_hours), + market_ids=market_ids or ["kalshi_fed"], + category="monetary_policy", + headline=f"Event {event_id}", + source_urls=["https://a", "https://b"], + source_publishers=["reuters", "bloomberg"], + labeler_ids=["ann1", "ann2"], + label_protocol_version="1.0", + status=status, # type: ignore[arg-type] + created_at=datetime(2026, 3, 15, 13, 0, tzinfo=UTC), + ) + + +@pytest.mark.unit +def test_true_positive_on_event_within_lead_window() -> None: + labels = join_signals_to_events( + [_signal()], + [_event("e1", ground_truth_offset_hours=2.0)], + 
now=datetime(2026, 3, 16, tzinfo=UTC), + ) + assert len(labels) == 1 + assert labels[0].label == "true_positive" + assert labels[0].event_id == "e1" + assert labels[0].lead_time_seconds == 2 * 3600 + + +@pytest.mark.unit +def test_false_positive_when_no_matching_event() -> None: + labels = join_signals_to_events( + [_signal()], + [], + now=datetime(2026, 3, 16, tzinfo=UTC), + ) + assert labels[0].label == "false_positive" + assert labels[0].event_id is None + assert labels[0].lead_time_seconds is None + + +@pytest.mark.unit +def test_false_positive_when_event_outside_lead_window() -> None: + # Event 48 hours after signal: outside 24h lead window. + labels = join_signals_to_events( + [_signal()], + [_event("e1", ground_truth_offset_hours=48.0)], + now=datetime(2026, 3, 16, tzinfo=UTC), + ) + assert labels[0].label == "false_positive" + + +@pytest.mark.unit +def test_false_positive_when_event_before_signal() -> None: + # Signal at 12:00; event at 10:00 (negative lead time). + labels = join_signals_to_events( + [_signal()], + [_event("e1", ground_truth_offset_hours=-2.0)], + now=datetime(2026, 3, 16, tzinfo=UTC), + ) + assert labels[0].label == "false_positive" + + +@pytest.mark.unit +def test_match_earliest_event_when_multiple_in_window() -> None: + labels = join_signals_to_events( + [_signal()], + [ + _event("e2", ground_truth_offset_hours=6.0), + _event("e1", ground_truth_offset_hours=1.0), + ], + now=datetime(2026, 3, 16, tzinfo=UTC), + ) + assert labels[0].event_id == "e1" + assert labels[0].lead_time_seconds == 3600 + + +@pytest.mark.unit +def test_ignores_candidate_and_superseded_events() -> None: + labels = join_signals_to_events( + [_signal()], + [ + _event("e1", ground_truth_offset_hours=1.0, status="candidate"), + _event("e2", ground_truth_offset_hours=2.0, status="superseded"), + ], + now=datetime(2026, 3, 16, tzinfo=UTC), + ) + assert labels[0].label == "false_positive" + + +@pytest.mark.unit +def test_market_id_mismatch_produces_false_positive() -> None: 
+ labels = join_signals_to_events( + [_signal(market_id="kalshi_fed")], + [_event("e1", market_ids=["polymarket_other"], ground_truth_offset_hours=2.0)], + now=datetime(2026, 3, 16, tzinfo=UTC), + ) + assert labels[0].label == "false_positive" + + +@pytest.mark.unit +def test_empty_signal_list_returns_empty() -> None: + assert ( + join_signals_to_events( + [], [_event("e1")], now=datetime(2026, 3, 16, tzinfo=UTC) + ) + == [] + ) + + +@pytest.mark.unit +def test_lead_time_boundary_at_zero_is_false_positive() -> None: + # Signal and event at same instant: lead_time = 0, outside (0, 24h]. + signal_time = datetime(2026, 3, 15, 12, 0, tzinfo=UTC) + labels = join_signals_to_events( + [_signal(detected_at=signal_time)], + [_event("e1", ground_truth_offset_hours=0.0)], + now=datetime(2026, 3, 16, tzinfo=UTC), + ) + assert labels[0].label == "false_positive" + + +@pytest.mark.unit +def test_lead_time_boundary_at_24h_is_true_positive() -> None: + labels = join_signals_to_events( + [_signal()], + [_event("e1", ground_truth_offset_hours=24.0)], + now=datetime(2026, 3, 16, tzinfo=UTC), + ) + assert labels[0].label == "true_positive" diff --git a/tests/labels/test_models.py b/tests/labels/test_models.py new file mode 100644 index 0000000..08c68f4 --- /dev/null +++ b/tests/labels/test_models.py @@ -0,0 +1,129 @@ +"""Tests for the labeling pipeline data contracts.""" + +from __future__ import annotations + +from datetime import UTC, datetime + +import pytest +from pydantic import ValidationError + +from augur_labels.models import ( + AgreementReport, + AnnotatorIdentity, + EventCandidate, + LabelDecision, + NewsworthyEvent, + QualifyingSource, + SourcePublication, +) + + +def _publication(pub_id: str = "p1", source_id: str = "reuters") -> SourcePublication: + return SourcePublication( + publication_id=pub_id, + source_id=source_id, # type: ignore[arg-type] + timestamp=datetime(2026, 3, 15, 12, 0, tzinfo=UTC), + headline="Fed holds rates", + url="https://example.com/story", # type: 
ignore[arg-type] + body_excerpt="The Federal Reserve left rates unchanged.", + keywords=["fed", "rates"], + ) + + +@pytest.mark.unit +def test_qualifying_source_rejects_unknown_source() -> None: + with pytest.raises(ValidationError): + QualifyingSource(source_id="nyt", name="New York Times") # type: ignore[arg-type] + + +@pytest.mark.unit +def test_source_publication_preserves_keywords_and_excerpt() -> None: + pub = _publication() + assert pub.keywords == ["fed", "rates"] + assert pub.body_excerpt is not None + + +@pytest.mark.unit +def test_event_candidate_holds_multiple_publications() -> None: + cand = EventCandidate( + candidate_id="c1", + discovered_at=datetime(2026, 3, 15, 12, 5, tzinfo=UTC), + publications=[_publication("p1", "reuters"), _publication("p2", "bloomberg")], + suggested_market_ids=["kalshi_fed"], + ) + assert len(cand.publications) == 2 + assert {p.source_id for p in cand.publications} == {"reuters", "bloomberg"} + + +@pytest.mark.unit +def test_newsworthy_event_accepts_protocol_fields() -> None: + event = NewsworthyEvent( + event_id="e1", + ground_truth_timestamp=datetime(2026, 3, 15, 12, 0, tzinfo=UTC), + market_ids=["kalshi_fed"], + category="monetary_policy", + headline="Fed holds rates", + source_urls=["https://a", "https://b"], + source_publishers=["reuters", "bloomberg"], + labeler_ids=["ann1", "ann2"], + label_protocol_version="1.0", + status="labeled", + created_at=datetime(2026, 3, 15, 13, 0, tzinfo=UTC), + ) + assert event.status == "labeled" + assert event.corrects is None + + +@pytest.mark.unit +def test_newsworthy_event_rejects_unknown_status() -> None: + with pytest.raises(ValidationError): + NewsworthyEvent( + event_id="e1", + ground_truth_timestamp=datetime(2026, 3, 15, tzinfo=UTC), + market_ids=["m"], + category="monetary_policy", + headline="h", + source_urls=["https://a"], + source_publishers=["reuters"], + labeler_ids=["a"], + label_protocol_version="1.0", + status="draft", # type: ignore[arg-type] + created_at=datetime(2026, 
3, 15, tzinfo=UTC), + ) + + +@pytest.mark.unit +def test_label_decision_qualifies_without_timestamp_by_default() -> None: + decision = LabelDecision( + decision_id="d1", + candidate_id="c1", + annotator_id="ann1", + decided_at=datetime(2026, 3, 15, 13, 0, tzinfo=UTC), + qualifies=False, + ) + assert decision.timestamp is None + assert decision.market_ids == [] + + +@pytest.mark.unit +def test_annotator_identity_accepts_optional_display_name() -> None: + id1 = AnnotatorIdentity(annotator_id="ann1") + id2 = AnnotatorIdentity(annotator_id="ann1", display_name="Annotator 1") + assert id1.display_name is None + assert id2.display_name == "Annotator 1" + + +@pytest.mark.unit +def test_agreement_report_structure() -> None: + report = AgreementReport( + annotator_pair=("ann1", "ann2"), + window_start=datetime(2026, 3, 1, tzinfo=UTC), + window_end=datetime(2026, 3, 31, tzinfo=UTC), + candidate_count=10, + event_existence_kappa=0.97, + timestamp_agreement_60s=0.95, + market_association_jaccard_mean=0.88, + category_assignment_kappa=0.92, + meets_targets=True, + ) + assert report.meets_targets is True diff --git a/tests/labels/test_parquet_writer.py b/tests/labels/test_parquet_writer.py new file mode 100644 index 0000000..fd32776 --- /dev/null +++ b/tests/labels/test_parquet_writer.py @@ -0,0 +1,123 @@ +"""Tests for the append-only Parquet writer and reader.""" + +from __future__ import annotations + +from datetime import UTC, datetime +from pathlib import Path + +import pytest + +from augur_labels.models import NewsworthyEvent +from augur_labels.storage.parquet_writer import AppendOnlyParquetWriter +from augur_labels.storage.reader import LabelReader + + +def _event( + event_id: str, + offset_days: int = 0, + market_ids: list[str] | None = None, + status: str = "labeled", + corrects: str | None = None, +) -> NewsworthyEvent: + return NewsworthyEvent( + event_id=event_id, + ground_truth_timestamp=datetime(2026, 3, 15 + offset_days, 12, 0, tzinfo=UTC), + market_ids=market_ids 
or ["kalshi_fed"], + category="monetary_policy", + headline=f"Event {event_id}", + source_urls=["https://a", "https://b"], + source_publishers=["reuters", "bloomberg"], + labeler_ids=["ann1", "ann2"], + label_protocol_version="1.0", + corrects=corrects, + status=status, # type: ignore[arg-type] + created_at=datetime(2026, 3, 16, tzinfo=UTC), + ) + + +@pytest.mark.unit +def test_writer_appends_single_event(tmp_path: Path) -> None: + writer = AppendOnlyParquetWriter(tmp_path) + writer.append([_event("e1")]) + partition = tmp_path / "date=2026-03-15" / "events.parquet" + assert partition.exists() + + +@pytest.mark.unit +def test_writer_appends_across_partitions(tmp_path: Path) -> None: + writer = AppendOnlyParquetWriter(tmp_path) + writer.append([_event("e1", offset_days=0), _event("e2", offset_days=1)]) + assert (tmp_path / "date=2026-03-15" / "events.parquet").exists() + assert (tmp_path / "date=2026-03-16" / "events.parquet").exists() + + +@pytest.mark.unit +def test_writer_appends_are_idempotent_across_calls(tmp_path: Path) -> None: + writer = AppendOnlyParquetWriter(tmp_path) + writer.append([_event("e1")]) + writer.append([_event("e2")]) + reader = LabelReader(tmp_path) + events = reader.events_in_window( + datetime(2026, 3, 1, tzinfo=UTC), + datetime(2026, 3, 31, tzinfo=UTC), + ) + assert {e.event_id for e in events} == {"e1", "e2"} + + +@pytest.mark.unit +def test_writer_supersede_updates_status(tmp_path: Path) -> None: + writer = AppendOnlyParquetWriter(tmp_path) + writer.append([_event("e1")]) + writer.append([_event("e2")]) + writer.supersede("e1", replacement_id="e2") + reader = LabelReader(tmp_path) + superseded = reader.events_in_window( + datetime(2026, 3, 1, tzinfo=UTC), + datetime(2026, 3, 31, tzinfo=UTC), + status="superseded", + ) + assert len(superseded) == 1 + assert superseded[0].event_id == "e1" + assert superseded[0].corrects == "e2" + + +@pytest.mark.unit +def test_writer_supersede_missing_raises(tmp_path: Path) -> None: + writer = 
AppendOnlyParquetWriter(tmp_path) + with pytest.raises(KeyError, match="missing"): + writer.supersede("missing", replacement_id="e2") + + +@pytest.mark.unit +def test_reader_events_for_market_filters(tmp_path: Path) -> None: + writer = AppendOnlyParquetWriter(tmp_path) + writer.append( + [ + _event("e1", market_ids=["kalshi_fed"]), + _event("e2", market_ids=["kalshi_other"]), + ] + ) + reader = LabelReader(tmp_path) + fed_events = reader.events_for_market( + "kalshi_fed", since=datetime(2026, 3, 1, tzinfo=UTC) + ) + assert [e.event_id for e in fed_events] == ["e1"] + + +@pytest.mark.unit +def test_reader_coverage_by_category(tmp_path: Path) -> None: + writer = AppendOnlyParquetWriter(tmp_path) + writer.append([_event("e1"), _event("e2", offset_days=1)]) + reader = LabelReader(tmp_path) + coverage = reader.coverage_by_category(since=datetime(2026, 3, 1, tzinfo=UTC)) + assert coverage == {"monetary_policy": 2} + + +@pytest.mark.unit +def test_reader_returns_empty_on_no_root(tmp_path: Path) -> None: + reader = LabelReader(tmp_path / "does-not-exist") + events = reader.events_in_window( + datetime(2026, 3, 1, tzinfo=UTC), + datetime(2026, 3, 31, tzinfo=UTC), + ) + assert events == [] diff --git a/tests/labels/test_sources.py b/tests/labels/test_sources.py new file mode 100644 index 0000000..07da9e9 --- /dev/null +++ b/tests/labels/test_sources.py @@ -0,0 +1,91 @@ +"""Tests for source adapter construction, auth requirements, and HTTP retry.""" + +from __future__ import annotations + +import pytest + +from augur_labels.sources._http import ( + HttpBackoff, + HttpRetryExhaustedError, + request_with_backoff, +) +from augur_labels.sources.ap import ApAdapter +from augur_labels.sources.bloomberg import BloombergAdapter +from augur_labels.sources.ft import FtAdapter +from augur_labels.sources.reuters import ReutersAdapter + + +@pytest.mark.unit +async def test_request_with_backoff_returns_on_success() -> None: + calls = 0 + + async def factory() -> str: + nonlocal calls + 
calls += 1 + return "ok" + + async def fake_sleep(_: float) -> None: + return None + + result = await request_with_backoff( + factory, HttpBackoff(max_retries=3), sleep=fake_sleep + ) + assert result == "ok" + assert calls == 1 + + +@pytest.mark.unit +async def test_request_with_backoff_raises_on_exhaustion() -> None: + async def factory() -> str: + raise ConnectionError("always") + + async def fake_sleep(_: float) -> None: + return None + + with pytest.raises(HttpRetryExhaustedError) as excinfo: + await request_with_backoff( + factory, HttpBackoff(initial_seconds=0.0, max_retries=3), sleep=fake_sleep + ) + assert excinfo.value.attempts == 3 + + +@pytest.mark.unit +def test_reuters_adapter_requires_api_key(monkeypatch: pytest.MonkeyPatch) -> None: + import httpx + + monkeypatch.delenv("REUTERS_API_KEY", raising=False) + with pytest.raises(RuntimeError, match="REUTERS_API_KEY"): + ReutersAdapter(httpx.AsyncClient()) + + +@pytest.mark.unit +def test_bloomberg_adapter_requires_credentials(monkeypatch: pytest.MonkeyPatch) -> None: + import httpx + + monkeypatch.delenv("BLOOMBERG_CLIENT_ID", raising=False) + monkeypatch.delenv("BLOOMBERG_CLIENT_SECRET", raising=False) + with pytest.raises(RuntimeError, match="BLOOMBERG"): + BloombergAdapter(httpx.AsyncClient()) + + +@pytest.mark.unit +def test_ap_adapter_requires_api_key(monkeypatch: pytest.MonkeyPatch) -> None: + import httpx + + monkeypatch.delenv("AP_API_KEY", raising=False) + with pytest.raises(RuntimeError, match="AP_API_KEY"): + ApAdapter(httpx.AsyncClient()) + + +@pytest.mark.unit +async def test_ft_adapter_returns_empty_without_api_key( + monkeypatch: pytest.MonkeyPatch, +) -> None: + import httpx + from datetime import UTC, datetime + + monkeypatch.delenv("FT_API_KEY", raising=False) + adapter = FtAdapter(httpx.AsyncClient()) + pubs = await adapter.fetch_recent(datetime(2026, 3, 1, tzinfo=UTC)) + assert pubs == [] + assert await adapter.health_check() is False diff --git a/tests/labels/test_workflow.py 
b/tests/labels/test_workflow.py
new file mode 100644
index 0000000..7939068
--- /dev/null
+++ b/tests/labels/test_workflow.py
@@ -0,0 +1,174 @@
+"""Tests for the two-annotator workflow enforcer."""
+
+from __future__ import annotations
+
+from datetime import UTC, datetime, timedelta
+
+import pytest
+
+from augur_labels._config import WorkflowConfig
+from augur_labels.annotator.candidate_queue import CandidateQueue
+from augur_labels.annotator.workflow import WorkflowEnforcer
+from augur_labels.models import EventCandidate, LabelDecision, SourcePublication
+
+
+def _publication(pub_id: str = "p1") -> SourcePublication:
+    """Build a Reuters publication stamped 2026-03-15 12:00 UTC."""
+    return SourcePublication(
+        publication_id=pub_id,
+        source_id="reuters",
+        timestamp=datetime(2026, 3, 15, 12, 0, tzinfo=UTC),
+        headline="h",
+        url="https://example.com/story",  # type: ignore[arg-type]
+    )
+
+
+def _candidate(candidate_id: str = "c1") -> EventCandidate:
+    """Build a candidate backed by two publications and one suggested market."""
+    return EventCandidate(
+        candidate_id=candidate_id,
+        discovered_at=datetime(2026, 3, 15, 12, 0, tzinfo=UTC),
+        publications=[_publication("p1"), _publication("p2")],
+        suggested_market_ids=["kalshi_fed"],
+    )
+
+
+def _decision(
+    annotator_id: str,
+    candidate_id: str = "c1",
+    *,
+    qualifies: bool = True,
+    offset_seconds: int = 0,
+    market_ids: list[str] | None = None,
+    category: str | None = "monetary_policy",
+) -> LabelDecision:
+    """Build one annotator's decision anchored at 2026-03-15 12:00 UTC.
+
+    ``offset_seconds`` shifts the labeled event timestamp away from the
+    anchor so a test can control the span between two annotators. A
+    non-qualifying decision carries no timestamp and no category.
+    ``market_ids`` defaults to ``["kalshi_fed"]`` when ``qualifies`` is true
+    and to ``[]`` otherwise; an explicit list, even an empty one, is kept.
+    """
+    base = datetime(2026, 3, 15, 12, 0, tzinfo=UTC)
+    # Compare against None rather than truthiness so an explicit [] is not
+    # silently replaced by the default market.
+    if market_ids is None:
+        market_ids = ["kalshi_fed"] if qualifies else []
+    return LabelDecision(
+        decision_id=f"{annotator_id}-{candidate_id}",
+        candidate_id=candidate_id,
+        annotator_id=annotator_id,
+        decided_at=base,
+        qualifies=qualifies,
+        timestamp=(base + timedelta(seconds=offset_seconds)) if qualifies else None,
+        market_ids=market_ids,
+        category=category if qualifies else None,
+    )
+
+
+@pytest.fixture
+def enforcer() -> tuple[WorkflowEnforcer, CandidateQueue]:
+    """Return an enforcer and the queue passed to it, holding candidate ``c1``."""
+    queue = CandidateQueue()
+    queue.enqueue([_candidate()])
+    return WorkflowEnforcer(WorkflowConfig(), queue), queue
+
+
+@pytest.mark.unit
+def test_cannot_promote_without_any_decisions(
+    enforcer: tuple[WorkflowEnforcer, CandidateQueue],
+) -> None:
+    """With no decisions recorded, promotion is refused."""
+    enf, _ = enforcer
+    decision = enf.can_promote("c1")
+    assert not decision.allowed
+    assert "two distinct" in decision.reason
+
+
+@pytest.mark.unit
+def test_cannot_promote_with_one_annotator(
+    enforcer: tuple[WorkflowEnforcer, CandidateQueue],
+) -> None:
+    """A single annotator's decision is not enough to promote."""
+    enf, queue = enforcer
+    queue.record(_decision("ann1"))
+    decision = enf.can_promote("c1")
+    assert not decision.allowed
+    assert "two distinct" in decision.reason
+
+
+@pytest.mark.unit
+def test_cannot_promote_on_existence_disagreement(
+    enforcer: tuple[WorkflowEnforcer, CandidateQueue],
+) -> None:
+    """Annotators disagreeing on ``qualifies`` blocks promotion."""
+    enf, queue = enforcer
+    queue.record(_decision("ann1", qualifies=True))
+    queue.record(_decision("ann2", qualifies=False))
+    decision = enf.can_promote("c1")
+    assert not decision.allowed
+    assert "disagree" in decision.reason
+
+
+@pytest.mark.unit
+def test_promotion_allowed_when_timestamps_close_and_markets_match(
+    enforcer: tuple[WorkflowEnforcer, CandidateQueue],
+) -> None:
+    """Timestamps 30 s apart with identical markets allow promotion."""
+    enf, queue = enforcer
+    queue.record(_decision("ann1", offset_seconds=0))
+    queue.record(_decision("ann2", offset_seconds=30))
+    decision = enf.can_promote("c1")
+    assert decision.allowed
+
+
+@pytest.mark.unit
+def test_promotion_blocked_on_timestamp_hard_fail(
+    enforcer: tuple[WorkflowEnforcer, CandidateQueue],
+) -> None:
+    """A 600 s timestamp gap is rejected with a hard-fail reason."""
+    enf, queue = enforcer
+    queue.record(_decision("ann1", offset_seconds=0))
+    queue.record(_decision("ann2", offset_seconds=600))  # 10 min > 5 min hard fail
+    decision = enf.can_promote("c1")
+    assert not decision.allowed
+    assert "hard fail" in decision.reason
+
+
+@pytest.mark.unit
+def test_promotion_blocked_on_zero_market_jaccard(
+    enforcer: tuple[WorkflowEnforcer, CandidateQueue],
+) -> None:
+    """Disjoint market sets are rejected with a Jaccard reason."""
+    enf, queue = enforcer
+    queue.record(_decision("ann1", market_ids=["a"]))
+    queue.record(_decision("ann2", market_ids=["b"]))
+    decision = enf.can_promote("c1")
+    assert not decision.allowed
+    assert "Jaccard" in decision.reason
+
+
+@pytest.mark.unit
+def test_promotion_warnings_fire_below_target(
+    enforcer: tuple[WorkflowEnforcer, CandidateQueue],
+) -> None:
+    """A 90 s span and partial market overlap each produce a warning."""
+    enf, queue = enforcer
+    queue.record(_decision("ann1", offset_seconds=0, market_ids=["a", "b"]))
+    queue.record(_decision("ann2", offset_seconds=90, market_ids=["a", "c"]))
+    warnings = enf.promotion_warnings("c1")
+    # Timestamp span 90s > 60s warning window; Jaccard = 1/3 < 0.85 target.
+    assert any("timestamp span" in w for w in warnings)
+    assert any("Jaccard" in w for w in warnings)
+
+
+@pytest.mark.unit
+def test_candidate_queue_rejects_double_decisions_from_same_annotator(
+    enforcer: tuple[WorkflowEnforcer, CandidateQueue],
+) -> None:
+    """Recording a second decision for the same annotator raises ``ValueError``."""
+    _, queue = enforcer
+    queue.record(_decision("ann1"))
+    with pytest.raises(ValueError, match="already decided"):
+        queue.record(_decision("ann1"))
diff --git a/uv.lock b/uv.lock
index c263f71..80de3e3 100644
--- a/uv.lock
+++ b/uv.lock
@@ -237,6 +237,9 @@ name = "augur-labels"
 version = "0.0.0"
 source = { editable = "src/augur_labels" }
 dependencies = [
+    { name = "augur-signals" },
+    { name = "click" },
+    { name = "filelock" },
     { name = "httpx" },
     { name = "pyarrow" },
     { name = "pydantic" },
@@ -244,6 +247,9 @@ dependencies = [
 
 [package.metadata]
 requires-dist = [
+    { name = "augur-signals", editable = "src/augur_signals" },
+    { name = "click", specifier = ">=8.1" },
+    { name = "filelock", specifier = ">=3.15" },
     { name = "httpx", specifier = ">=0.27" },
     { name = "pyarrow", specifier = ">=17.0" },
     { name = "pydantic", specifier = ">=2.7" },
@@ -292,6 +298,18 @@ wheels = [
     { url = "https://files.pythonhosted.org/packages/db/3c/33bac158f8ab7f89b2e59426d5fe2e4f63f7ed25df84c036890172b412b5/cfgv-3.5.0-py2.py3-none-any.whl", hash = "sha256:a8dc6b26ad22ff227d2634a65cb388215ce6cc96bbcc5cfde7641ae87e8dacc0", size = 7445 },
 ]
 
+[[package]]
+name = "click"
+version = "8.3.2"
+source = { registry = "https://pypi.org/simple" }
+dependencies = [
+    { name = "colorama", marker = "sys_platform == 'win32'" },
+]
+sdist = { url = "https://files.pythonhosted.org/packages/57/75/31212c6bf2503fdf920d87fee5d7a86a2e3bcf444984126f13d8e4016804/click-8.3.2.tar.gz", hash = "sha256:14162b8b3b3550a7d479eafa77dfd3c38d9dc8951f6f69c78913a8f9a7540fd5", size = 302856 }
+wheels = [
+    { url = "https://files.pythonhosted.org/packages/e4/20/71885d8b97d4f3dde17b1fdb92dbd4908b00541c5a3379787137285f602e/click-8.3.2-py3-none-any.whl", hash = "sha256:1924d2c27c5653561cd2cae4548d1406039cb79b858b747cfea24924bbc1616d", size = 108379 },
+]
+
 [[package]]
 name = "colorama"
 version = "0.4.6"