Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,21 @@ All notable changes to Augur are recorded in this file. Format follows [Keep a C

## [Unreleased]

### Added — Labeling Pipeline

- `src/augur_labels/` package with Pydantic data contracts for `NewsworthyEvent`, `EventCandidate`, `SourcePublication`, `QualifyingSource`, `LabelDecision`, `AnnotatorIdentity`, and `AgreementReport`. The closed `source_id` literal set (reuters, bloomberg, ap, ft) is load-bearing across adapters, storage, and workflow.
- Four source adapters (`ReutersAdapter`, `BloombergAdapter`, `ApAdapter`, `FtAdapter`) implementing `AbstractSourceAdapter` against their respective REST APIs with shared exponential-backoff retry. Credentials are read from the env vars documented in `docs/methodology/labeling-protocol.md`; missing credentials fail loud at construction except for the FT adapter, which gracefully degrades to empty output on missing API key.
- Append-only Parquet writer with per-date partitioning and `filelock`-based concurrent-write safety. `supersede()` implements the protocol's correction path in-place under the partition lock. `LabelReader` exposes `events_in_window`, `events_for_market`, and `coverage_by_category` with partition pruning.
- Inter-annotator agreement metrics via Cohen's kappa on event existence and category assignment, 60-second timestamp agreement, and mean market-association Jaccard. `compute_agreement` pairs decisions by `candidate_id` and evaluates the four targets from `docs/methodology/labeling-protocol.md §Inter-Annotator Agreement`.
- `WorkflowEnforcer.can_promote` and `promotion_warnings` enforce the two-annotator promotion gate: two distinct annotators, existence agreement, timestamp within 5-minute hard fail, and strictly positive market Jaccard.
- `join_signals_to_events` implements the canonical TP/FP criteria: market_id match plus lead time in (0, 24h]. Multiple events on the same market match the earliest qualifying one; non-labeled statuses (candidate, superseded, rejected) are excluded.
- Click-driven `augur-label` CLI with `candidates`, `inspect`, `decide`, `promote`, `correct`, and `coverage` commands. The CLI persists queue state to `labels/queue.json` and writes promoted events to the parquet corpus.
- `config/labeling.toml` mirrors `docs/methodology/labeling-protocol.md` defaults (rate limits, agreement targets, storage paths, join windows).

### Operational Handoff — Labeling

After merge a labeler can run `augur-label candidates`, `augur-label decide`, and `augur-label promote` against real candidates. The nightly calibration job (Phase 1's `scripts/calibrate.py`) consumes `join_signals_to_events` output to rebuild reliability curves. The first 90 days of operation require double labeling per `docs/methodology/labeling-protocol.md §Inter-Annotator Agreement`; CI reports agreement metrics during that window.

### Added

- Pydantic data contracts: `MarketSnapshot`, `FeatureVector`, `MarketSignal`, `SignalContext`, `RelatedMarketState`, and the closed enums `SignalType`, `ManipulationFlag`, `ConsumerType`, `InterpretationMode`. `MarketSignal` enforces `calibration_provenance` via a model validator; every model is frozen and rejects unknown fields. JSON schemas exported to `schemas/*.json` and kept in sync by `scripts/export_schemas.py`.
Expand Down
42 changes: 42 additions & 0 deletions config/labeling.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
# Labeling pipeline configuration. Schema mirrors
# docs/methodology/labeling-protocol.md §Source Hierarchy and
# phase-2 §11 verbatim. Annotator processes consume this file at
# startup via augur_labels._config.LabelingConfig.

[sources.reuters]
enabled = true
rate_limit_per_hour = 1000
api_key_env = "REUTERS_API_KEY"

[sources.bloomberg]
enabled = true
rate_limit_per_hour = 500
client_id_env = "BLOOMBERG_CLIENT_ID"
client_secret_env = "BLOOMBERG_CLIENT_SECRET"

[sources.ap]
enabled = true
rate_limit_per_hour = 500
api_key_env = "AP_API_KEY"

[sources.ft]
enabled = true
rate_limit_per_hour = 200
api_key_env = "FT_API_KEY"

[workflow]
double_label_window_days = 90
timestamp_agreement_window_seconds = 60
timestamp_hard_fail_seconds = 300
market_jaccard_target = 0.85
market_jaccard_hard_fail = 0.0
category_kappa_target = 0.90
event_existence_kappa_target = 0.95

[storage]
labels_root = "labels/newsworthy_events"
file_lock_timeout_seconds = 30

[join]
lead_window_hours = 24
true_negative_window_hours = 24
18 changes: 18 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,24 @@ explicit_package_bases = true
module = ["uuid_extensions.*"]
ignore_missing_imports = true

[[tool.mypy.overrides]]
module = ["pyarrow.*"]
ignore_missing_imports = true

[[tool.mypy.overrides]]
module = ["filelock.*"]
ignore_missing_imports = true

# pyarrow has no type stubs; relax the no-any-unimported gate for the
# storage modules that pass pyarrow types across their public surface.
[[tool.mypy.overrides]]
module = [
"augur_labels.storage._schema",
"augur_labels.storage.parquet_writer",
"augur_labels.storage.reader",
]
disallow_any_unimported = false

[tool.pytest.ini_options]
testpaths = ["tests"]
asyncio_mode = "auto"
Expand Down
14 changes: 8 additions & 6 deletions scripts/label.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,21 @@
"""Annotator CLI entrypoint.

Launches the two-annotator labeling workflow over the newsworthy-event
feed per docs/methodology/labeling-protocol.md and persists labels to
``labels/newsworthy_events.parquet``.

Stub until the labeling workstream lands.
Launches the augur-label click CLI over the newsworthy-event candidate
queue and the append-only parquet corpus. Available commands are
implemented in augur_labels.annotator.cli; run ``python scripts/label.py
--help`` to discover them.
"""

from __future__ import annotations

import sys

from augur_labels.annotator.cli import cli


def main() -> int:
raise NotImplementedError("annotator CLI not yet implemented")
cli(standalone_mode=True)
return 0


if __name__ == "__main__":
Expand Down
91 changes: 91 additions & 0 deletions src/augur_labels/augur_labels/_config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
"""Labeling-pipeline configuration models.

Schema mirrors the blocks in config/labeling.toml and matches the
defaults documented in phase-2 §11. Every field is validated at
startup via augur_signals._config.load_config; a missing required
value fails loudly rather than coercing.
"""

from __future__ import annotations

from pydantic import BaseModel, ConfigDict, Field


class ReutersSourceConfig(BaseModel):
model_config = ConfigDict(frozen=True, extra="forbid")

enabled: bool = True
rate_limit_per_hour: int = Field(default=1000, gt=0)
api_key_env: str = "REUTERS_API_KEY"


class BloombergSourceConfig(BaseModel):
model_config = ConfigDict(frozen=True, extra="forbid")

enabled: bool = True
rate_limit_per_hour: int = Field(default=500, gt=0)
client_id_env: str = "BLOOMBERG_CLIENT_ID"
# Name of the env var that holds the secret, not the secret itself.
client_secret_env: str = "BLOOMBERG_CLIENT_SECRET" # noqa: S105


class ApSourceConfig(BaseModel):
model_config = ConfigDict(frozen=True, extra="forbid")

enabled: bool = True
rate_limit_per_hour: int = Field(default=500, gt=0)
api_key_env: str = "AP_API_KEY"


class FtSourceConfig(BaseModel):
model_config = ConfigDict(frozen=True, extra="forbid")

enabled: bool = True
rate_limit_per_hour: int = Field(default=200, gt=0)
api_key_env: str = "FT_API_KEY"


class SourcesConfig(BaseModel):
model_config = ConfigDict(frozen=True, extra="forbid")

reuters: ReutersSourceConfig = Field(default_factory=ReutersSourceConfig)
bloomberg: BloombergSourceConfig = Field(default_factory=BloombergSourceConfig)
ap: ApSourceConfig = Field(default_factory=ApSourceConfig)
ft: FtSourceConfig = Field(default_factory=FtSourceConfig)


class WorkflowConfig(BaseModel):
model_config = ConfigDict(frozen=True, extra="forbid")

double_label_window_days: int = Field(default=90, gt=0)
timestamp_agreement_window_seconds: int = Field(default=60, gt=0)
timestamp_hard_fail_seconds: int = Field(default=300, gt=0)
market_jaccard_target: float = Field(default=0.85, ge=0.0, le=1.0)
market_jaccard_hard_fail: float = Field(default=0.0, ge=0.0, le=1.0)
category_kappa_target: float = Field(default=0.90, ge=-1.0, le=1.0)
event_existence_kappa_target: float = Field(default=0.95, ge=-1.0, le=1.0)


class StorageConfig(BaseModel):
model_config = ConfigDict(frozen=True, extra="forbid")

labels_root: str = "labels/newsworthy_events"
file_lock_timeout_seconds: int = Field(default=30, gt=0)


class JoinConfig(BaseModel):
model_config = ConfigDict(frozen=True, extra="forbid")

lead_window_hours: int = Field(default=24, gt=0)
true_negative_window_hours: int = Field(default=24, gt=0)


class LabelingConfig(BaseModel):
"""Top-level labeling configuration loaded from config/labeling.toml."""

model_config = ConfigDict(frozen=True, extra="forbid")

sources: SourcesConfig = Field(default_factory=SourcesConfig)
workflow: WorkflowConfig = Field(default_factory=WorkflowConfig)
storage: StorageConfig = Field(default_factory=StorageConfig)
join: JoinConfig = Field(default_factory=JoinConfig)
14 changes: 14 additions & 0 deletions src/augur_labels/augur_labels/_protocol.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
"""Labeling-protocol constants shared across modules.

The protocol version is the single source of truth for
``label_protocol_version`` on every produced NewsworthyEvent and
SignalLabel. Bumping this constant triggers recomputation of any
calibration metric derived from the affected labels per
docs/methodology/labeling-protocol.md §Versioning.
"""

from __future__ import annotations

LABEL_PROTOCOL_VERSION: str = "1.0"

MIN_DISTINCT_QUALIFYING_SOURCES: int = 2
3 changes: 3 additions & 0 deletions src/augur_labels/augur_labels/annotator/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
"""Annotator workflow, agreement metrics, and CLI entrypoint."""

from __future__ import annotations
160 changes: 160 additions & 0 deletions src/augur_labels/augur_labels/annotator/agreement.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
"""Inter-annotator agreement metrics.

Implements Cohen's kappa, 60-second timestamp agreement, and mean
Jaccard overlap of market-association sets per the targets in
docs/methodology/labeling-protocol.md §Inter-Annotator Agreement.

Paired decisions are matched by ``candidate_id``; decisions on
candidates only one annotator reviewed are excluded from the report.
"""

from __future__ import annotations

from collections.abc import Sequence
from datetime import datetime, timedelta

from augur_labels.models import AgreementReport, LabelDecision

# Thresholds mirror labeling-protocol.md §Inter-Annotator Agreement.
EVENT_EXISTENCE_KAPPA_TARGET: float = 0.95
TIMESTAMP_AGREEMENT_TARGET: float = 0.90
MARKET_JACCARD_TARGET: float = 0.85
CATEGORY_KAPPA_TARGET: float = 0.90
TIMESTAMP_AGREEMENT_WINDOW: timedelta = timedelta(seconds=60)


def _cohens_kappa(labels_a: Sequence[object], labels_b: Sequence[object]) -> float:
"""Cohen's kappa on two equal-length sequences of categorical labels.

Raises ValueError on length mismatch so a data-integrity bug in the
caller surfaces immediately rather than masquerading as low kappa.
Empty inputs short-circuit to 0.0 because an empty window has no
meaningful agreement metric.
"""
if len(labels_a) != len(labels_b):
raise ValueError(
f"label sequences have mismatched lengths: {len(labels_a)} vs {len(labels_b)}"
)
if not labels_a:
return 0.0
n = len(labels_a)
observed = sum(1 for a, b in zip(labels_a, labels_b, strict=True) if a == b) / n
all_labels = set(labels_a) | set(labels_b)
expected = 0.0
for label in all_labels:
pa = sum(1 for x in labels_a if x == label) / n
pb = sum(1 for x in labels_b if x == label) / n
expected += pa * pb
if expected >= 1.0:
return 1.0
return (observed - expected) / (1.0 - expected)


def _jaccard(a: Sequence[str], b: Sequence[str]) -> float:
set_a = set(a)
set_b = set(b)
union = set_a | set_b
if not union:
return 1.0
return len(set_a & set_b) / len(union)


def _pair_decisions(
decisions_a: Sequence[LabelDecision],
decisions_b: Sequence[LabelDecision],
) -> tuple[list[tuple[LabelDecision, LabelDecision]], int]:
"""Return (paired_decisions, unpaired_count).

Unpaired decisions (candidate reviewed by only one annotator) are
surfaced so ``compute_agreement`` can report them without silently
dropping from the denominator.
"""
by_candidate_a = {d.candidate_id: d for d in decisions_a}
by_candidate_b = {d.candidate_id: d for d in decisions_b}
shared = set(by_candidate_a) & set(by_candidate_b)
unpaired = len(set(by_candidate_a) ^ set(by_candidate_b))
return [(by_candidate_a[c], by_candidate_b[c]) for c in sorted(shared)], unpaired


def compute_agreement(
decisions_a: Sequence[LabelDecision],
decisions_b: Sequence[LabelDecision],
window_start: datetime,
window_end: datetime,
) -> AgreementReport:
"""Compute the four-metric report for paired decisions."""
pairs, unpaired = _pair_decisions(decisions_a, decisions_b)
annotator_ids = (
tuple(sorted({decisions_a[0].annotator_id, decisions_b[0].annotator_id}))
if decisions_a and decisions_b
else ("unknown-a", "unknown-b")
)
if not pairs:
return AgreementReport(
annotator_pair=annotator_ids, # type: ignore[arg-type]
window_start=window_start,
window_end=window_end,
candidate_count=0,
unpaired_count=unpaired,
event_existence_kappa=0.0,
timestamp_agreement_60s=0.0,
market_association_jaccard_mean=0.0,
category_assignment_kappa=0.0,
meets_targets=False,
)
qualifies_a = [a.qualifies for a, _ in pairs]
qualifies_b = [b.qualifies for _, b in pairs]
event_kappa = _cohens_kappa(qualifies_a, qualifies_b)

# Timestamp agreement: only paired qualifying decisions contribute.
qualifying_pairs = [
(a, b) for a, b in pairs if a.qualifies and b.qualifies and a.timestamp and b.timestamp
]
if qualifying_pairs:
within = 0
threshold = TIMESTAMP_AGREEMENT_WINDOW.total_seconds()
for a, b in qualifying_pairs:
ts_a = a.timestamp
ts_b = b.timestamp
if ts_a is None or ts_b is None:
continue
if abs((ts_a - ts_b).total_seconds()) <= threshold:
within += 1
timestamp_agreement = within / len(qualifying_pairs)
else:
timestamp_agreement = 0.0

# Market Jaccard — mean across paired qualifying decisions.
if qualifying_pairs:
jaccards = [_jaccard(a.market_ids, b.market_ids) for a, b in qualifying_pairs]
jaccard_mean = sum(jaccards) / len(jaccards)
else:
jaccard_mean = 0.0

# Category kappa — pairs with both categories set.
category_pairs = [(a.category, b.category) for a, b in pairs if a.category and b.category]
if category_pairs:
category_kappa = _cohens_kappa(
[p[0] for p in category_pairs], [p[1] for p in category_pairs]
)
else:
category_kappa = 0.0

meets_targets = (
event_kappa >= EVENT_EXISTENCE_KAPPA_TARGET
and timestamp_agreement >= TIMESTAMP_AGREEMENT_TARGET
and jaccard_mean >= MARKET_JACCARD_TARGET
and category_kappa >= CATEGORY_KAPPA_TARGET
)
return AgreementReport(
annotator_pair=annotator_ids, # type: ignore[arg-type]
window_start=window_start,
window_end=window_end,
candidate_count=len(pairs),
unpaired_count=unpaired,
event_existence_kappa=event_kappa,
timestamp_agreement_60s=timestamp_agreement,
market_association_jaccard_mean=jaccard_mean,
category_assignment_kappa=category_kappa,
meets_targets=meets_targets,
)
Loading
Loading