feat(news_sources): adapter implementations + aggregator (Wave 1 PR β; requires lib v0.15.0)#226
Merged
Merged
Conversation
…; requires lib v0.15.0) Wave 1 PR β of the institutional data-revamp arc (plan doc: ~/Development/alpha-engine-docs/private/data-revamp-260513.md). Producer-side concrete adapter implementations + multi-source aggregator. Pairs with alpha-engine-lib PR #46 (PR α, v0.15.0) which defined NewsSource Protocol + NewsArticle shape. Architectural pattern: data is the producer; lib defines the contract; research is the consumer (will read producer outputs via S3 + RAG retrieval in future sub-PRs, never imports adapters directly). New modules: collectors/news_sources/ polygon.py — FREE. Uses our existing polygon_client (data repo's copy) for rate-limit reuse. Normalizes /v2/reference/news. gdelt.py — FREE (no key). GDELT 2.0 DOC API; academic-grade event-extracted news. Requires ticker→name map for query building. yahoo_rss.py — FREE (fallback). Pure feedparser-based; matches existing collectors/alternative.py pattern but normalized into NewsArticle. benzinga.py — PAID stub. Raises NotImplementedError on init. ravenpack.py — PAID stub. bloomberg.py — PAID stub. collectors/news_aggregator.py NewsAggregator(sources, trust_weights) — fan-in across enabled NewsSource adapters → dedup (composite fingerprint: normalized title + URL path hash with querystring/fragment stripped) → preserve all source-provenance variants → return AggregatedNewsArticle records sorted by earliest_published_at desc. DEFAULT_TRUST_WEIGHTS: paid 0.95-1.0, polygon 0.9, gdelt 0.85, edgar_press 0.95, yahoo_rss 0.5. Lib pin bumps (lockstep, both must move per the pin-lockstep test): requirements.txt v0.12.0 → v0.15.0 Dockerfile v0.12.0 → v0.15.0 What's deferred (subsequent Wave 1 sub-PRs): PR A.1 — NLP pipeline (Loughran-McDonald + FinBERT + spaCy NER + LLM event extraction). Heavier deps; separate PR. PR A.2 — Structured aggregates writer (S3 parquet per ticker per day). Joined onto research's snapshot in PR F. PR A.3 — RAG ingest path: news → chunked → embedded → indexed in pgvector alongside existing SEC filings corpus. PR B — Filings substrate expansion (EDGAR full coverage: 10-K/Q/14A/S-1/13D/G/13F/Form-4). PR C — Analyst substrate (yfinance + FMP adapters + self-derived revisions tracking from daily snapshots). PR D — Async + S3 cache + per-vendor rate limiters. PR E — Wire RAG retrieval tools into research repo's thesis_update + sector agents. PR F — Wire new substrate into research's fetch_data (supersedes #170's per-ticker pre-fetch). +37 unit tests: - Protocol structural-subtyping for all 3 free adapters - Polygon: happy + transient-failure-per-ticker + schema-drift-skip - GDELT: happy + query building (multi-word vs single-word) + failure-skips-ticker + missing-name-map-fallback - Yahoo RSS: happy + entries-older-than-cutoff-dropped + no-link-skipped + fetch-failure-skips-ticker - Paid stubs: all 3 raise NotImplementedError on init - Aggregator: fan-in + URL/title dedup + canonical-title-longest + canonical-url-highest-trust + ticker-union + one-broken-source- isolated + output-sorted-desc + empty-fan-in - Trust weights: defaults + overrides + unknown-source-defaults-half - Fingerprint determinism - Lib shape contract pin (extra='forbid' + frozen) Suite: 848 passing. Composes with: - alpha-engine-lib PR #46 (v0.15.0) — required for shapes + Protocols - alpha-engine-research PR #172 (CLOSED) — original mis-located substrate; relocated here per architectural correction Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2 tasks
cipher813
added a commit
that referenced
this pull request
May 13, 2026
… event extraction) (#227) Wave 1 PR A.1 of the institutional data-revamp arc (plan doc: ~/Development/alpha-engine-docs/private/data-revamp-260513.md). Producer-side NLP layer that consumes AggregatedNewsArticle output from PR β's aggregator and emits three parallel structured streams: sentiment scores, entity mentions, and event flags. Output is ready for the PR A.2 parquet writer + PR A.3 RAG ingest pass. Architecture: each NLP dimension is a Protocol with one or more concrete implementations. The pipeline orchestrator (NewsNLPPipeline) composes them without knowing which concrete classes are wired — upgrade paths drop in as new adapter classes without touching the orchestrator or downstream consumers. New modules: collectors/nlp/protocols.py SentimentScore, EntityMention, EventFlag Pydantic shapes (frozen + extra='forbid' so parquet writer has stable column schema) SentimentScorer, EntityExtractor, EventExtractor Protocols (runtime_checkable; structural subtyping) collectors/nlp/loughran_mcdonald.py LoughranMcDonaldScorer — finance-domain dictionary sentiment, the academic gold standard (Loughran & McDonald 2011). Composite = (positive - negative) / total_tokens clipped to [-clip, +clip]. Uncertainty counted separately from polarity. load_lm_master_dict() CSV parser — tolerates missing file (logs warning, returns empty dict so production fails clearly). collectors/nlp/event_extraction.py AnthropicEventExtractor — Haiku-tier structured event extraction via tool_use API. Closed taxonomy of 18 event categories (DEFAULT_EVENT_CATEGORIES — earnings, M&A, IPO, management change, regulatory action, FDA action, etc.). Cost-tracked under agent_id='news_event_extractor'. Tolerates transient failures (returns empty) + malformed entries (drops single entry, keeps others). Handles tool_use.input as either dict or JSON string. collectors/nlp/pipeline.py NewsNLPPipeline(sentiment_scorers, entity_extractors, event_extractors).process(articles) → NewsNLPOutput. Composes any number of components per stream. Per-component exceptions are isolated — one broken scorer can't take down a batch. Article text uses canonical_title + longest body_excerpt across variants. scripts/download_lm_dict.py Operator script. Fetches the canonical Loughran-McDonald 2022 Master Dictionary CSV from Notre Dame's research page. Idempotent (overwrites). Pin via VCS commit if reproducibility matters. URL is overridable when the source moves. What's deferred to subsequent sub-PRs: PR A.1.1 — FinBERT scorer (HF transformer; heavier deps; lives in collectors/nlp/finbert.py as a drop-in) PR A.1.2 — spaCy NER entity extractor PR A.2 — Structured aggregates writer (S3 parquet per (ticker, date) joining sentiment + entity + event streams) PR A.3 — RAG ingest path (raw article text → chunked → embedded → pgvector alongside SEC filings) +42 unit tests: - Pydantic shape construction + frozen + extra='forbid' (3 shapes) - Protocol structural-subtyping (3 protocols, structural matches) - Tokenization (alphabetic only, lowercase, drops digits) - _truthy helper (year-stamp / 0 / blank / non-numeric) - LM scorer: pure positive / pure negative / balanced / dilution-by-neutral / uncertainty-counted-separately / empty-text / clipped-to-range / empty-dict-warns-and-yields-zero - load_lm_master_dict: canonical-CSV / missing-file-warns / blank-rows-skipped - Anthropic event extractor: tool-spec-includes-default-categories / happy-path / empty-text-skips-call / transient-LLM-failure / malformed-entry-dropped / tool_use-input-as-JSON-string / no-tool-use-block-returns-empty - Pipeline: empty / sentiment-per-article / multiple-scorers / scorer-exception-isolated / event-extractor-receives-tickers / empty-article-text-skipped / uses-longest-excerpt-across-variants Suite: 890 passing (1 skipped). Composes with: - PR α (alpha-engine-lib #46, v0.15.0) — NewsArticle Pydantic shape - PR β (this repo #226) — AggregatedNewsArticle input - data-revamp-260513.md plan doc — full 4-wave arc context Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
cipher813
added a commit
that referenced
this pull request
May 13, 2026
…-ticker per-day parquet) (#228) Wave 1 PR A.2 of the institutional data-revamp arc (plan doc: ~/Development/alpha-engine-docs/private/data-revamp-260513.md). Joins the 3 NLP-pipeline output streams (sentiment_scores, entity_mentions, event_flags) with the aggregator's source-provenance information and produces one row per (ticker, aggregate_date) in S3 parquet. This is the canonical structured signal that downstream consumers (research's fetch_data) read into input_data_snapshot for thesis_update + sector_quant + sector_qual agents. New module: data/derived/news_aggregates.py NewsTickerDailyAggregate dataclass (frozen) — canonical row shape with schema_version pinned to column. 19 columns covering: - identity: ticker, aggregate_date, schema_version - volume: n_articles, n_articles_trusted_weighted, n_articles_by_source_json - LM sentiment: lm_sentiment_mean/max/min/trusted_mean + LM word counts (positive, negative, uncertainty, total_tokens) - events: event_count, severity_max/mean, categories (sorted comma-joined), top_event_descriptions (top-N by severity desc, " | "-joined) - entities: entity_mentions_count build_news_aggregates_df() — pure aggregation, no I/O. Accepts NewsNLPOutput + AggregatedNewsArticle list + aggregate_date + optional NewsAggregator (for trust-weighted means). write_news_aggregates_parquet() / read_news_aggregates_parquet() — S3 parquet I/O. Idempotent overwrite at ``s3://alpha-engine-research/data/news_aggregates/{date}.parquet``. aggregate_and_write() — orchestrator-friendly end-to-end helper returning (key, df) so callers can log row counts / emit CW metrics without re-reading from S3. Aggregation semantics: - Multi-ticker article emits one row per ticker (each row sees the full article). - Event.tickers gates per-ticker event attachment; empty tickers list means "inherit from article" (extractor didn't gate). - Trust-weighted sentiment mean weights each article by max trust across its source variants. n_articles_trusted_weighted sums trust per VARIANT (so a 3-source dedup counts 3× its trust contribution). - Top-3 event descriptions chosen by severity desc; ties broken by appearance order. Stored as " | "-joined string for parquet column friendliness. Why one parquet per date (vs per ticker): - ~25 held + ~50 universe tickers = ~75 rows/day. Single parquet reads faster than 75 separate files. - Single S3 object per day = cheaper LIST, atomic overwrite. - Schema migration simpler — one file per day, not 75. What's deferred: PR A.3 — RAG ingest path (full article text → chunked → embedded → pgvector alongside SEC filings corpus) PR D — Async + S3 cache + per-vendor rate limiters PR F — Wire substrate into research's fetch_data (supersedes #170's per-ticker pre-fetch) +21 unit tests: - Per-ticker aggregation: one-article-one-ticker / multi-ticker article emits one row per / multiple-articles-per-ticker / empty-articles-produces-empty-df-with-schema - Trust weighting: trusted-mean / n_articles_trusted_weighted / source counts json per variant - Event aggregation: tickers-set filters to those tickers / no-tickers inherits from article / top-N sorted by severity desc / categories sorted+joined / severity_mean computed - S3 round-trip: write-read preserves rows / missing parquet returns empty schema df / overwrite / s3 key format / custom prefix - End-to-end aggregate_and_write helper + accepts datetime - Schema version pinned to int constant + on every row In-memory S3 mock used for round-trip tests (no moto dep added). Suite: 911 passing (1 skipped). Composes with: - PR β (#226) — AggregatedNewsArticle input shape - PR A.1 (#227) — NewsNLPOutput shape - PR α (lib v0.15.0) — NewsArticle base shape - data-revamp-260513.md plan doc Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
cipher813
added a commit
that referenced
this pull request
May 13, 2026
Wave 1 PR A.3 of the institutional data-revamp arc (plan doc:
~/Development/alpha-engine-docs/private/data-revamp-260513.md).
Indexes aggregated news articles into the existing pgvector RAG
corpus alongside SEC filings. Consumer agents (thesis_update,
sector_quant/qual) retrieve relevant news at inference time via the
same hybrid-retrieval API the qual analyst's query_filings tool
already uses.
Pairs with PRs β/A.1/A.2 — completes the Wave 1 producer-side news
substrate. Consumer-side tool wiring (PR E) and fetch_data
integration (PR F) follow.
New module: rag/pipelines/ingest_news.py
ingest_articles(articles, *, filed_date, ticker_to_sector,
embed_texts_fn, document_exists_fn,
ingest_document_fn, dry_run) -> stats dict
Architecture:
- Mirrors ingest_8k_filings.py pattern (canonical RAG-pipeline shape)
- One document per (ticker, article) pair — multi-ticker articles
index once per ticker so the ticker-keyed RAG schema surfaces
them when the qual agent queries by any constituent
- Idempotent via document_exists pre-check — re-runs skip the
embedding call entirely (saves vector-API cost)
- Chunking: one chunk per article (title + longest body excerpt).
Polygon/GDELT/Yahoo bodies are short (typically <500 tokens);
multi-chunk splitting can land in a follow-up if Benzinga or
a full-body adapter joins
- Source labeling: RAG `source` field prefixed `news_` so
consumer queries can filter "news only" vs "filings only" by
source-prefix without enumerating vendors
Canonical source selection:
Picks alphabetically across variants so re-ingests produce the
same document (deterministic across runs). Composes with the
aggregator's source-provenance preservation in PR β.
Failure isolation:
Per-document failures (transient pgvector / embed-API hiccup)
isolated to that document; batch continues. ingest_document
returning None (lib's failure signal) counts as a failure
without crashing.
Composition chain (full Wave 1 dataflow):
PolygonNewsAdapter + GdeltNewsAdapter + YahooRssNewsAdapter (PR β)
│
▼
NewsAggregator.fetch() — fan-in + dedup + trust
│
▼
AggregatedNewsArticle list
│
┌─────────────────┼─────────────────┐
▼ ▼ ▼
NewsNLPPipeline aggregate_and_write ingest_articles
(PR A.1) (PR A.2 — parquet) (PR A.3 — RAG)
│ │ │
▼ ▼ ▼
sentiment+events per-ticker per-day pgvector docs
+entities streams structured parquet alongside filings
+18 unit tests:
- _rag_source prefix
- _chunk_text combines title + longest body
- _chunk_text handles missing title / all empty
- _canonical_source deterministic alphabetical / single variant
- Single-ticker happy path: embed + ingest called with right shape
- Idempotency: document_exists short-circuits embed AND ingest
- Empty / too-short bodies skipped (counter increments)
- Multi-ticker: emits one doc per ticker
- Multi-ticker per-ticker existence check (AAPL exists, MSFT new)
- Sector lookup: ticker_to_sector passed through; missing -> None
- dry_run mode skips embed/ingest but counts
- Failure isolation: one bad doc continues batch
- ingest returning None counts as failure not crash
- Stats dict shape pinned to 6 canonical keys
Suite: 929 passing (1 skipped).
Composes with:
- PR β (#226) — AggregatedNewsArticle input shape
- PR A.1 (#227) — NewsNLPOutput shape (joined to article fingerprints)
- PR A.2 (#228) — structured aggregates writer (parallel write path)
- PR α (lib v0.15.0) — NewsArticle base shape
- alpha_engine_lib.rag (embed_texts + document_exists + ingest_document)
- data-revamp-260513.md plan doc
Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
cipher813
added a commit
that referenced
this pull request
May 13, 2026
…or rate limiters (#232) Wave 1 PR D of the institutional data-revamp arc (plan doc: ~/Development/alpha-engine-docs/private/data-revamp-260513.md). Wraps the sync NewsSource adapters (PR β) in an async fan-in pattern using anyio.to_thread.run_sync — adapters stay synchronous (existing tests + Lambda dispatch unchanged); aggregation becomes concurrent. Adds a generic S3-backed TTL cache used by the async aggregator and available to any future producer-side fetcher. New modules: data/cache.py S3TtlCache(s3_client, bucket, prefix, default_ttl_seconds) class. - get(key) -> bytes | None (returns None on missing OR expired) - set(key, value, *, ttl_seconds) (idempotent overwrite) - cached_call(key, *, compute_fn, ttl_seconds) (sync fetch-or-compute) - cached_acall(key, *, async_compute_fn, ttl_seconds) (async variant) - get_json / set_json convenience wrappers Storage: s3://{bucket}/{prefix}/{sha1(key)}.bin with S3 metadata: cache-key : original cache key (debug) cache-cached-at : ISO-8601 UTC write timestamp cache-ttl-seconds : applied TTL cache-expires-at : ISO-8601 UTC expiry timestamp Lazy eviction (expired entries NOT deleted on read — overwritten on next .set). Entries without our metadata stamp are treated as expired so external writes don't return stale or garbled content. collectors/news_aggregator_async.py AsyncNewsAggregator(sources, *, trust_weights, per_source_concurrency, cache, per_source_ttl_seconds, max_retry_attempts, retry_initial_wait) Architecture: - anyio.create_task_group for parallel adapter fan-in - anyio.Semaphore per vendor (DEFAULT_PER_SOURCE_CONCURRENCY: polygon=2 / gdelt=1 / yahoo_rss=4 / edgar_press=2 / paid=4) - tenacity AsyncRetrying with exp backoff (default 3 attempts, 2s/4s/8s wait, max 30s) - Optional S3TtlCache; per-source TTLs default 30m-4h by vendor cadence (polygon=30m, gdelt=1h, yahoo=1h, edgar=4h, paid=30m) - Reuses sync NewsAggregator._dedup + DEFAULT_TRUST_WEIGHTS verbatim so the canonical AggregatedNewsArticle output shape is identical to the sync path Adapter failure modes: - Transient (timeout, 5xx): tenacity exp-backoff retry; if final attempt fails returns empty list for that adapter - Final-attempt failure: caught at task-group level, logs + continues with remaining adapters (defense in depth matching sync path) - Cache deserialization failure: logs + treats as cache miss (re-fetches from adapter) +23 unit tests: - Cache helpers: hash deterministic / handles special chars - S3TtlCache: set→get round-trip / missing returns None / expired returns None / entry-without-metadata treated as expired / overwrite extends TTL / default TTL applied / get_json round-trip / get_json malformed returns None - cached_call: miss calls compute + caches / hit skips compute - cached_acall: same shape (async) - Cache key (async): stable under ticker reorder / changes with hours / changes with vendor - Async aggregator: parallel fan-in combines sources / per-source failure isolated / retry recovers from transient failure / retry exhausted returns empty - Caching: hit skips adapter / expiry re-runs adapter / isolated per-tickers - Semaphore concurrency: with concurrency=1, 2 same-named-vendor adapters serialize (max in-flight = 1 verified via lock-tracked counter) Suite: 1003 passing (1 skipped). Composes with: - PR β (#226) — NewsSource Protocol + sync NewsAggregator - PR A.1 / A.2 / A.3 — same NewsSource adapters - PR α (lib v0.15.0) — NewsArticle shape - alpha_engine_data already has anyio + tenacity in venv What's deferred: - Sources are still sync; if a vendor SDK becomes natively async, add an isinstance check in _invoke_adapter to dispatch awaitably - Adapters for analyst (AnalystSource) + filings (FilingSource) don't have async aggregators yet — same pattern lifts trivially when they're wired into Saturday SF fan-in Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
Wave 1 PR β of the institutional data-revamp arc (plan doc:
~/Development/alpha-engine-docs/private/data-revamp-260513.md).Producer-side concrete adapter implementations + multi-source aggregator. Pairs with alpha-engine-lib PR #46 (PR α, v0.15.0) which defined the
NewsSourceProtocol +NewsArticleshape.Merge order: lib PR #46 must merge + tag v0.15.0 first; this PR then merges. The lockstep test pins the version pair (requirements.txt + Dockerfile) — both bump in lockstep so the Lambda image and venv stay in sync.
Architectural pattern
alpha-engine-lib) — defines the contract (NewsSource Protocol + NewsArticle shape)Closes the relocation of
alpha-engine-research#172(CLOSED — substrate was mis-located in the consumer repo).What's in
News adapters (
collectors/news_sources/)polygon.pypolygon_client.pyfor rate-limit reuse. Normalizes/v2/reference/news. Polygon aggregates Benzinga + Zacks + MT Newswires (institutional content on free tier).gdelt.pyyahoo_rss.pycollectors/alternative.py::_fetch_newspattern but normalized. Trust 0.5 (fallback).benzinga.pyNotImplementedErroron init — fails loud.ravenpack.pybloomberg.pyAggregator (
collectors/news_aggregator.py)NewsAggregator(sources, trust_weights)— fan-in across enabled adapters → dedup → preserve source-provenance → returnAggregatedNewsArticlerecords sorted byearliest_published_atdesc.DEFAULT_TRUST_WEIGHTSranges paid 0.95-1.0 → free 0.5-0.9 by quality.Lib pin bumps (lockstep)
requirements.txtv0.12.0 → v0.15.0Dockerfilev0.12.0 → v0.15.0Test plan
tests/test_news_sources_and_aggregator.py):isinstance(adapter, NewsSource))NotImplementedErroron initextra='forbid'+ frozen)test_lib_pin_lockstep.py): 848 passing in 4s.What's deferred
Per the plan doc, remaining Wave 1 sub-PRs:
fetch_data(supersedes fix(infra): drop inline EB-SFN role IAM writes #170)🤖 Generated with Claude Code