From e1d4f6335a06b491265f325a556c72e0088062ff Mon Sep 17 00:00:00 2001 From: Seyed Yahya Shirazi Date: Tue, 9 Jun 2026 17:59:05 -0700 Subject: [PATCH 1/4] feat(citations): OpenAlex client + citation_counts table Add a direct OpenAlex client (openalex_citations.py) that resolves a DOI to its work id, returns the complete per-year citation histogram via group_by (uncapped), and cursor-paginates the latest N citing papers sorted by publication date. Add a citation_counts(cites_doi, year, count) table and a replace_citation_counts helper that mirrors the histogram wholesale. opencite caps citing-paper fetches at one page (<=200) with no pagination and no aggregation, which silently truncated recent citations and inverted the per-year curve; this is the foundation for fixing that. --- src/knowledge/db.py | 36 ++++++ src/knowledge/openalex_citations.py | 164 ++++++++++++++++++++++++++++ 2 files changed, 200 insertions(+) create mode 100644 src/knowledge/openalex_citations.py diff --git a/src/knowledge/db.py b/src/knowledge/db.py index ba2dfb6..79c859f 100644 --- a/src/knowledge/db.py +++ b/src/knowledge/db.py @@ -174,6 +174,18 @@ def active_mirror_context(mirror_id: str) -> Iterator[None]: UNIQUE(source_type, source_name) ); +-- True per-year citation counts per canonical DOI, fetched from OpenAlex +-- group_by (complete, uncapped). This is the source of truth for the public +-- citations dashboard; the papers table only stores a recent sample of the +-- citing papers themselves for the search tool. +CREATE TABLE IF NOT EXISTS citation_counts ( + cites_doi TEXT NOT NULL, + year INTEGER NOT NULL, + count INTEGER NOT NULL, + synced_at TEXT NOT NULL, + PRIMARY KEY (cites_doi, year) +); + -- Docstrings extracted from source code CREATE TABLE IF NOT EXISTS docstrings ( id INTEGER PRIMARY KEY AUTOINCREMENT, @@ -752,6 +764,30 @@ def update_sync_metadata( conn.commit() +def replace_citation_counts(cites_doi: str, counts: dict[int, int], project: str = "hed") -> None: + """Replace the stored per-year citation counts for one canonical DOI. + + The counts are an exact, complete histogram from OpenAlex, so the row set + is replaced wholesale (delete + insert) inside one transaction: this keeps + the table an accurate mirror and drops any year that no longer appears. + + Args: + cites_doi: Canonical DOI whose citations these counts describe. + counts: Mapping of publication year to citing-paper count. + project: Assistant/project name. Defaults to 'hed'. + """ + now = _now_iso() + with get_connection(project) as conn: + conn.execute("DELETE FROM citation_counts WHERE cites_doi = ?", (cites_doi,)) + if counts: + conn.executemany( + "INSERT INTO citation_counts (cites_doi, year, count, synced_at) " + "VALUES (?, ?, ?, ?)", + [(cites_doi, year, count, now) for year, count in counts.items()], + ) + conn.commit() + + def upsert_bep_item( conn: sqlite3.Connection, *, diff --git a/src/knowledge/openalex_citations.py b/src/knowledge/openalex_citations.py new file mode 100644 index 0000000..9e692cc --- /dev/null +++ b/src/knowledge/openalex_citations.py @@ -0,0 +1,164 @@ +"""Direct OpenAlex client for citation analysis. + +opencite returns citing papers from a single page (<=200), ordered for its own +ranking, with no pagination and no aggregation exposed. For a citations +dashboard that silently truncates recent citations (the first page skews to +older, highly-cited works). We therefore query OpenAlex directly: + +- ``counts_by_year`` uses ``group_by=publication_year`` for the *exact, + complete* per-year histogram with no cap. +- ``recent_citing_papers`` cursor-paginates ``sort=publication_date:desc`` to + collect the latest N citing papers for the search corpus. + +The client takes an optional injected ``httpx.Client`` so tests can supply an +``httpx.MockTransport`` instead of hitting the network. +""" + +import logging +from dataclasses import dataclass + +import httpx + +logger = logging.getLogger(__name__) + +OPENALEX_BASE = "https://api.openalex.org" +_TIMEOUT = 30.0 +_PER_PAGE = 200 # OpenAlex maximum page size + + +@dataclass +class CitingPaper: + """A minimal citing-paper record for the search corpus.""" + + openalex_id: str + doi: str | None + title: str + publication_date: str | None + url: str + + +def _strip_id(value: str | None) -> str: + """Reduce an OpenAlex IRI (https://openalex.org/W123) to its bare id.""" + if not value: + return "" + return value.rstrip("/").rsplit("/", 1)[-1] + + +def _strip_doi(value: str | None) -> str | None: + """Reduce a DOI URL to the bare ``10.xxxx/yyyy`` form.""" + if not value: + return None + cleaned = value.strip() + for prefix in ("https://doi.org/", "http://doi.org/", "https://dx.doi.org/"): + if cleaned.lower().startswith(prefix): + cleaned = cleaned[len(prefix) :] + break + return cleaned or None + + +class OpenAlexCitationClient: + """Queries OpenAlex for citation counts and recent citing papers.""" + + def __init__( + self, + *, + email: str = "", + api_key: str = "", + client: httpx.Client | None = None, + ) -> None: + self._email = email + self._api_key = api_key + self._owns_client = client is None + self._client = client or httpx.Client(timeout=_TIMEOUT) + + def __enter__(self) -> "OpenAlexCitationClient": + return self + + def __exit__(self, *exc: object) -> None: + self.close() + + def close(self) -> None: + if self._owns_client: + self._client.close() + + def _params(self, **extra: object) -> dict[str, object]: + params: dict[str, object] = dict(extra) + # mailto routes to the polite pool; api_key unlocks premium throughput. + if self._email: + params["mailto"] = self._email + if self._api_key: + params["api_key"] = self._api_key + return params + + def resolve_work_id(self, doi: str) -> str | None: + """Resolve a DOI to its OpenAlex work id (e.g. ``W2128495200``).""" + resp = self._client.get( + f"{OPENALEX_BASE}/works/doi:{doi}", + params=self._params(select="id"), + ) + if resp.status_code == 404: + logger.warning("OpenAlex has no work for DOI %s", doi) + return None + resp.raise_for_status() + work_id = _strip_id(resp.json().get("id")) + return work_id or None + + def counts_by_year(self, work_id: str) -> dict[int, int]: + """Return the complete per-year count of works citing ``work_id``. + + Uses OpenAlex ``group_by`` so the counts are exact and uncapped, + independent of how many citing papers are stored. + """ + resp = self._client.get( + f"{OPENALEX_BASE}/works", + params=self._params(filter=f"cites:{work_id}", group_by="publication_year"), + ) + resp.raise_for_status() + counts: dict[int, int] = {} + for group in resp.json().get("group_by", []): + try: + year = int(group["key"]) + except (KeyError, TypeError, ValueError): + continue # non-year buckets (e.g. "unknown") are skipped + counts[year] = int(group.get("count", 0)) + return counts + + def recent_citing_papers(self, work_id: str, limit: int = 2000) -> list[CitingPaper]: + """Collect up to ``limit`` most-recent works citing ``work_id``. + + Cursor-paginates ``sort=publication_date:desc`` so the stored sample is + the newest citations rather than an arbitrary first page. + """ + papers: list[CitingPaper] = [] + cursor: str | None = "*" + while cursor and len(papers) < limit: + page_size = min(_PER_PAGE, limit - len(papers)) + resp = self._client.get( + f"{OPENALEX_BASE}/works", + params=self._params( + filter=f"cites:{work_id}", + sort="publication_date:desc", + select="id,doi,title,publication_date", + cursor=cursor, + **{"per-page": page_size}, + ), + ) + resp.raise_for_status() + data = resp.json() + for work in data.get("results", []): + title = work.get("title") + if not title: + continue + papers.append( + CitingPaper( + openalex_id=_strip_id(work.get("id")), + doi=_strip_doi(work.get("doi")), + title=title, + publication_date=work.get("publication_date"), + url=work.get("doi") or work.get("id") or "", + ) + ) + if len(papers) >= limit: + break + cursor = data.get("meta", {}).get("next_cursor") + return papers From 10daccadb890f37479fc8f490eb660721559a545 Mon Sep 17 00:00:00 2001 From: Seyed Yahya Shirazi Date: Tue, 9 Jun 2026 17:59:05 -0700 Subject: [PATCH 2/4] feat(citations): true uncapped per-year counts; store latest 2000 papers sync_citing_papers now queries OpenAlex directly per canonical DOI: it stores the exact, complete per-year counts in citation_counts (source of truth for the dashboard) and upserts the latest 2000 citing papers (publication date desc) into the papers table for the search tool. get_citation_stats reads the counts table (empty, not error, before the first sync). The CLI decouples the citation storage cap from the query --limit. --- src/cli/sync.py | 12 ++-- src/knowledge/papers_sync.py | 135 ++++++++++++++++++++++------------- src/knowledge/search.py | 28 ++++---- 3 files changed, 104 insertions(+), 71 deletions(-) diff --git a/src/cli/sync.py b/src/cli/sync.py index ce4503a..2abf636 100644 --- a/src/cli/sync.py +++ b/src/cli/sync.py @@ -364,16 +364,18 @@ def sync_papers( total += count console.print(f" [dim]{src}: {count} papers[/dim]") - # Sync citing papers if DOIs are configured + # Sync citing papers if DOIs are configured. Counts are fetched complete + # (uncapped) from OpenAlex; only the stored sample of recent citing papers + # uses the default cap, independent of the query --limit above. if include_citations: dois = _get_community_paper_dois(community) if dois: - console.print(f"\n[dim]Syncing papers citing {len(dois)} DOI(s)...[/dim]") - with console.status("[green]Syncing citing papers...[/green]"): - citing_count = sync_citing_papers(dois, limit, project=community) + console.print(f"\n[dim]Syncing citations for {len(dois)} DOI(s)...[/dim]") + with console.status("[green]Syncing citations...[/green]"): + citing_count = sync_citing_papers(dois, project=community) results_by_source["citing"] = citing_count total += citing_count - console.print(f"[dim]Citing papers: {citing_count}[/dim]") + console.print(f"[dim]Recent citing papers stored: {citing_count}[/dim]") console.print(f"\n[green]Total papers synced for {community}: {total}[/green]") diff --git a/src/knowledge/papers_sync.py b/src/knowledge/papers_sync.py index f185e27..9d7ce8d 100644 --- a/src/knowledge/papers_sync.py +++ b/src/knowledge/papers_sync.py @@ -21,11 +21,16 @@ from typing import Any, TypeVar from opencite import Config, Paper -from opencite.citations import CitationExplorer from opencite.exceptions import APIKeyError, ConfigurationError, OpenCiteError from opencite.search import SearchOrchestrator -from src.knowledge.db import get_connection, update_sync_metadata, upsert_paper +from src.knowledge.db import ( + get_connection, + replace_citation_counts, + update_sync_metadata, + upsert_paper, +) +from src.knowledge.openalex_citations import CitingPaper, OpenAlexCitationClient from src.knowledge.search import SearchResult logger = logging.getLogger(__name__) @@ -252,27 +257,6 @@ async def _search_queries( return out -async def _citing_for_dois( - config: Config, - dois: list[str], - max_results: int, -) -> list[tuple[str, list[Paper]]]: - """Fetch citing papers for every DOI through one shared CitationExplorer.""" - out: list[tuple[str, list[Paper]]] = [] - async with CitationExplorer(config) as explorer: - for doi in dois: - try: - result = await explorer.citing_papers(doi, max_results=max_results) - out.append((doi, result.papers)) - except (OpenCiteError, TimeoutError) as e: - logger.warning("opencite citation error for DOI %s: %s", doi, e) - out.append((doi, [])) - except Exception: - logger.exception("unexpected error fetching citations for DOI %s", doi) - out.append((doi, [])) - return out - - def _sync_single_source( query: str, max_results: int, @@ -389,51 +373,100 @@ def sync_all_papers( return results +def _store_citing_papers(papers: Iterable[CitingPaper], project: str, *, cites_doi: str) -> int: + """Upsert OpenAlex citing-paper records into the papers table. + + Returns the number of rows stored. Each row is labelled with ``cites_doi`` + so it links back to the canonical paper it cites. + """ + stored = 0 + with get_connection(project) as conn: + for paper in papers: + if not paper.openalex_id or not paper.title: + continue + upsert_paper( + conn, + source="openalex", + external_id=paper.openalex_id, + title=paper.title, + first_message=None, + url=paper.url or (f"https://doi.org/{paper.doi}" if paper.doi else ""), + created_at=paper.publication_date, + cites_doi=cites_doi, + ) + stored += 1 + conn.commit() + return stored + + def sync_citing_papers( dois: list[str], - max_results: int = 100, + max_results: int = 2000, project: str = "hed", openalex_api_key: str | None = None, openalex_email: str | None = None, ) -> int: - """Sync papers that cite the given DOIs using opencite's citation graph. + """Sync citation data for the given canonical DOIs from OpenAlex. + + For each DOI this records two things, queried directly from OpenAlex + (opencite caps citing-paper fetches at one page and exposes no aggregation, + which truncates recent citations): + + 1. The *complete, uncapped* per-year citation histogram, via + ``group_by=publication_year``, stored in ``citation_counts``. This is + the source of truth for the public citations dashboard. + 2. The latest ``max_results`` citing papers (publication date descending), + upserted into the ``papers`` table for the search corpus. Args: - dois: List of DOIs to find citations for. Bare format preferred - (e.g. "10.1016/j.neuroimage.2021.118809"); opencite auto-detects - and resolves the identifier. Unresolved DOIs are skipped with a - warning. - max_results: Maximum number of citing papers per DOI. + dois: Canonical DOIs to track citations for (bare ``10.xxxx/yyyy``). + Unresolved DOIs are skipped with a warning. + max_results: Maximum number of recent citing papers stored per DOI. + Does not limit the per-year counts, which are always complete. project: Project/community ID for database isolation. - openalex_api_key: Optional OpenAlex API key for premium access. - openalex_email: Optional email for OpenAlex polite pool. + openalex_api_key: Optional OpenAlex API key for premium throughput. + openalex_email: Optional email for the OpenAlex polite pool. Returns: - Total number of citing papers synced. + Total citing papers stored across all DOIs (counts are uncapped). """ if isinstance(dois, str): raise TypeError(f"dois must be a list of strings, not a bare string: {dois!r}") - config = _build_config(openalex_api_key=openalex_api_key, openalex_email=openalex_email) - try: - cited = _run(_citing_for_dois(config, dois, max_results)) - except Exception as e: - logger.warning("opencite citation lookup failed for %s: %s", project, e) - return 0 + email = openalex_email or _OPENALEX_EMAIL or "" + api_key = openalex_api_key or _OPENALEX_API_KEY or "" - total = 0 - for doi, papers in cited: - try: - counts = _store_papers(papers, project, cites_doi=doi) - count = sum(counts.values()) - update_sync_metadata("papers", f"citing_{doi}", count, project) - logger.info("Synced %d papers citing %s", count, doi) - total += count - except Exception: - # Isolate per-DOI so one DB failure does not abort the batch. - logger.exception("failed to store citing papers for %s (%s)", doi, project) + total_stored = 0 + with OpenAlexCitationClient(email=email, api_key=api_key) as client: + for doi in dois: + try: + work_id = client.resolve_work_id(doi) + if not work_id: + logger.warning("Skipping citations: cannot resolve DOI %s", doi) + continue + + # 1. Complete per-year counts (source of truth for the chart). + counts = client.counts_by_year(work_id) + replace_citation_counts(doi, counts, project) + total_citations = sum(counts.values()) + + # 2. Latest citing papers for the search corpus. + papers = client.recent_citing_papers(work_id, limit=max_results) + stored = _store_citing_papers(papers, project, cites_doi=doi) + + update_sync_metadata("citations", f"citing_{doi}", total_citations, project) + logger.info( + "Citations for %s: %d total across years, stored %d recent papers", + doi, + total_citations, + stored, + ) + total_stored += stored + except Exception: + # Isolate per-DOI so one failure does not abort the batch. + logger.exception("citation sync failed for %s (%s)", doi, project) - return total + return total_stored def _config_from_env() -> Config: diff --git a/src/knowledge/search.py b/src/knowledge/search.py index c8d0b7a..c3b0222 100644 --- a/src/knowledge/search.py +++ b/src/knowledge/search.py @@ -393,11 +393,11 @@ class CitationStats: def get_citation_stats(project: str = "eeglab") -> CitationStats: """Aggregate citation counts for the public citations dashboard. - Counts papers that cite a community's canonical DOIs (``papers.cites_doi`` - is set), grouped by the citing paper's publication year. The year is the - leading four digits of ``created_at`` (ISO date or bare year); rows whose - ``created_at`` is missing or not a four-digit year are skipped so a bad - date never lands in a bogus year bucket. + Reads the ``citation_counts`` table, which holds the exact, complete + per-year histogram per canonical DOI fetched from OpenAlex ``group_by`` + (not the capped sample of citing papers in the ``papers`` table). A + community that has not yet had its citations synced (table absent) yields + empty stats rather than an error. Args: project: Community ID for database isolation. Defaults to 'eeglab'. @@ -407,14 +407,7 @@ def get_citation_stats(project: str = "eeglab") -> CitationStats: stacked ``by_paper`` breakdown (canonical DOI -> year -> count). Years are sorted ascending in every mapping. """ - sql = """ - SELECT cites_doi, substr(created_at, 1, 4) AS yr, COUNT(*) AS cnt - FROM papers - WHERE cites_doi IS NOT NULL - AND created_at IS NOT NULL - AND substr(created_at, 1, 4) GLOB '[0-9][0-9][0-9][0-9]' - GROUP BY cites_doi, yr - """ + sql = "SELECT cites_doi, year, count FROM citation_counts" per_year: dict[str, int] = {} by_paper: dict[str, dict[str, int]] = {} @@ -423,12 +416,17 @@ def get_citation_stats(project: str = "eeglab") -> CitationStats: with get_connection(project) as conn: for row in conn.execute(sql): doi = row["cites_doi"] - year = row["yr"] - count = row["cnt"] + year = str(row["year"]) + count = row["count"] per_year[year] = per_year.get(year, 0) + count by_paper.setdefault(doi, {})[year] = count total += count except sqlite3.OperationalError as e: + # The table is created on the first citation sync; before then, treat + # the feed as empty instead of failing the request. + if "no such table" in str(e).lower(): + logger.info("citation_counts not yet present for project %s", project) + return CitationStats(total=0, per_year={}, by_paper={}) logger.error( "Database operational error computing citation stats: %s", e, From d67f5ecf7b24a7db70728841f92728c32aad4605 Mon Sep 17 00:00:00 2001 From: Seyed Yahya Shirazi Date: Tue, 9 Jun 2026 17:59:05 -0700 Subject: [PATCH 3/4] test(citations): OpenAlex client, counts-based stats, end-to-end sync - OpenAlex client tests via httpx.MockTransport (resolve/404, group_by parsing, cursor pagination, limit, titleless skip, error propagation). - get_citation_stats and the endpoint now assert against citation_counts; add replace-overwrites and missing-table-is-empty cases. - End-to-end sync_citing_papers test (real client + real DB, mock transport): stores true counts and links recent papers; unresolved DOI skipped. --- tests/test_api/test_citations_feed.py | 27 +-- tests/test_knowledge/test_citation_stats.py | 72 +++--- .../test_knowledge/test_openalex_citations.py | 205 ++++++++++++++++++ tests/test_knowledge/test_papers_sync.py | 83 +++++++ 4 files changed, 335 insertions(+), 52 deletions(-) create mode 100644 tests/test_knowledge/test_openalex_citations.py diff --git a/tests/test_api/test_citations_feed.py b/tests/test_api/test_citations_feed.py index 596f6e5..d992e2e 100644 --- a/tests/test_api/test_citations_feed.py +++ b/tests/test_api/test_citations_feed.py @@ -19,7 +19,7 @@ from src.api.routers.community import create_community_router from src.assistants import discover_assistants, registry from src.core.config.community import PublicFeedsConfig -from src.knowledge.db import get_connection, init_db, upsert_paper +from src.knowledge.db import init_db, replace_citation_counts COMMUNITY_ID = "eeglab" DOI_A = "10.1016/j.jneumeth.2003.10.009" @@ -30,30 +30,13 @@ @pytest.fixture def citations_db(tmp_path: Path) -> Iterator[Path]: - """Temp knowledge DB with citing papers across two canonical DOIs.""" + """Temp knowledge DB with per-year citation counts for two canonical DOIs.""" db_path = tmp_path / "knowledge" / "test.db" with patch("src.knowledge.db.get_db_path", return_value=db_path): init_db(COMMUNITY_ID) - with get_connection(COMMUNITY_ID) as conn: - rows = [ - ("a1", "2019-05-01", DOI_A), - ("a2", "2019-11-20", DOI_A), - ("a3", "2020", DOI_A), - ("b1", "2020-02-02", DOI_B), - ("k1", "2021", None), # keyword-only, excluded from stats - ] - for external_id, created_at, cites_doi in rows: - upsert_paper( - conn, - source="openalex", - external_id=external_id, - title=f"Paper {external_id}", - first_message=None, - url=f"https://doi.org/10.test/{external_id}", - created_at=created_at, - cites_doi=cites_doi, - ) - conn.commit() + # DOI_A: 2 in 2019, 1 in 2020 ; DOI_B: 1 in 2020 + replace_citation_counts(DOI_A, {2019: 2, 2020: 1}, project=COMMUNITY_ID) + replace_citation_counts(DOI_B, {2020: 1}, project=COMMUNITY_ID) yield db_path diff --git a/tests/test_knowledge/test_citation_stats.py b/tests/test_knowledge/test_citation_stats.py index 4d828cb..2178e68 100644 --- a/tests/test_knowledge/test_citation_stats.py +++ b/tests/test_knowledge/test_citation_stats.py @@ -9,7 +9,12 @@ import pytest -from src.knowledge.db import get_connection, init_db, upsert_paper +from src.knowledge.db import ( + get_connection, + init_db, + replace_citation_counts, + upsert_paper, +) from src.knowledge.search import CitationStats, get_citation_stats DOI_A = "10.1016/j.jneumeth.2003.10.009" @@ -30,59 +35,53 @@ def _add_paper(conn, external_id, *, created_at, cites_doi=None, source="openale @pytest.fixture -def citations_db(tmp_path: Path): - """Temp DB with citing papers across two canonical DOIs and several years.""" +def counts_db(tmp_path: Path): + """Temp DB with per-year citation counts for two canonical DOIs.""" db_path = tmp_path / "knowledge" / "test.db" with patch("src.knowledge.db.get_db_path", return_value=db_path): init_db() - with get_connection() as conn: - # DOI_A: 2 in 2019, 1 in 2020 - _add_paper(conn, "a1", created_at="2019-05-01", cites_doi=DOI_A) - _add_paper(conn, "a2", created_at="2019-11-20", cites_doi=DOI_A) - _add_paper(conn, "a3", created_at="2020", cites_doi=DOI_A) - # DOI_B: 1 in 2020, 1 in 2021 - _add_paper(conn, "b1", created_at="2020-02-02", cites_doi=DOI_B) - _add_paper(conn, "b2", created_at="2021-07-07", cites_doi=DOI_B) - # Keyword-search paper (no citation link) - excluded from stats - _add_paper(conn, "k1", created_at="2022", cites_doi=None) - # Citing paper with an unusable date - excluded from year buckets - _add_paper(conn, "x1", created_at="", cites_doi=DOI_A) - _add_paper(conn, "x2", created_at=None, cites_doi=DOI_B) - conn.commit() + replace_citation_counts(DOI_A, {2019: 2, 2020: 1}, project="eeglab") + replace_citation_counts(DOI_B, {2020: 1, 2021: 1}, project="eeglab") yield db_path class TestGetCitationStats: - def test_returns_citation_stats_object(self, citations_db: Path): - with patch("src.knowledge.db.get_db_path", return_value=citations_db): + def test_returns_citation_stats_object(self, counts_db: Path): + with patch("src.knowledge.db.get_db_path", return_value=counts_db): stats = get_citation_stats(project="eeglab") assert isinstance(stats, CitationStats) - def test_total_excludes_unlinked_and_undated(self, citations_db: Path): - with patch("src.knowledge.db.get_db_path", return_value=citations_db): + def test_total_sums_all_counts(self, counts_db: Path): + with patch("src.knowledge.db.get_db_path", return_value=counts_db): stats = get_citation_stats(project="eeglab") - # 5 linked papers with valid years (a1,a2,a3,b1,b2); k1 unlinked, - # x1/x2 undated are excluded. - assert stats.total == 5 + assert stats.total == 5 # 2+1 + 1+1 - def test_per_year_aggregates_across_dois(self, citations_db: Path): - with patch("src.knowledge.db.get_db_path", return_value=citations_db): + def test_per_year_aggregates_across_dois(self, counts_db: Path): + with patch("src.knowledge.db.get_db_path", return_value=counts_db): stats = get_citation_stats(project="eeglab") assert stats.per_year == {"2019": 2, "2020": 2, "2021": 1} - def test_per_year_is_sorted_ascending(self, citations_db: Path): - with patch("src.knowledge.db.get_db_path", return_value=citations_db): + def test_per_year_is_sorted_ascending(self, counts_db: Path): + with patch("src.knowledge.db.get_db_path", return_value=counts_db): stats = get_citation_stats(project="eeglab") assert list(stats.per_year.keys()) == sorted(stats.per_year.keys()) - def test_by_paper_stacked_breakdown(self, citations_db: Path): - with patch("src.knowledge.db.get_db_path", return_value=citations_db): + def test_by_paper_stacked_breakdown(self, counts_db: Path): + with patch("src.knowledge.db.get_db_path", return_value=counts_db): stats = get_citation_stats(project="eeglab") assert stats.by_paper == { DOI_A: {"2019": 2, "2020": 1}, DOI_B: {"2020": 1, "2021": 1}, } + def test_replace_overwrites_previous_counts(self, counts_db: Path): + """A re-sync replaces a DOI's histogram wholesale (no stale years).""" + with patch("src.knowledge.db.get_db_path", return_value=counts_db): + replace_citation_counts(DOI_A, {2025: 9}, project="eeglab") + stats = get_citation_stats(project="eeglab") + assert stats.by_paper[DOI_A] == {"2025": 9} + assert "2019" not in stats.per_year # old DOI_A years gone + def test_empty_database(self, tmp_path: Path): db_path = tmp_path / "knowledge" / "empty.db" with patch("src.knowledge.db.get_db_path", return_value=db_path): @@ -92,6 +91,19 @@ def test_empty_database(self, tmp_path: Path): assert stats.per_year == {} assert stats.by_paper == {} + def test_missing_table_returns_empty(self, tmp_path: Path): + """Before any citation sync (table absent), stats are empty, not an error.""" + db_path = tmp_path / "knowledge" / "noinit.db" + with patch("src.knowledge.db.get_db_path", return_value=db_path): + # Create the DB file with a connection but never run init_db, so + # citation_counts does not exist. + with get_connection() as conn: + conn.execute("CREATE TABLE placeholder (id INTEGER)") + conn.commit() + stats = get_citation_stats(project="eeglab") + assert stats.total == 0 + assert stats.by_paper == {} + class TestCitesDoiUpsert: def test_backfill_sets_link_on_existing_row(self, tmp_path: Path): diff --git a/tests/test_knowledge/test_openalex_citations.py b/tests/test_knowledge/test_openalex_citations.py new file mode 100644 index 0000000..af8d014 --- /dev/null +++ b/tests/test_knowledge/test_openalex_citations.py @@ -0,0 +1,205 @@ +"""Tests for the direct OpenAlex citation client. + +Uses httpx.MockTransport to serve canned OpenAlex responses at the transport +layer (an HTTP fixture, not a mock of business logic) so the client's parsing, +pagination, and error handling are exercised without network access. +""" + +import httpx +import pytest + +from src.knowledge.openalex_citations import ( + CitingPaper, + OpenAlexCitationClient, + _strip_doi, + _strip_id, +) + + +def _client(handler) -> OpenAlexCitationClient: + transport = httpx.MockTransport(handler) + return OpenAlexCitationClient(email="t@example.org", client=httpx.Client(transport=transport)) + + +class TestHelpers: + def test_strip_id(self): + assert _strip_id("https://openalex.org/W123") == "W123" + assert _strip_id("W123") == "W123" + assert _strip_id(None) == "" + + def test_strip_doi(self): + assert _strip_doi("https://doi.org/10.1/x") == "10.1/x" + assert _strip_doi("10.1/x") == "10.1/x" + assert _strip_doi(None) is None + + +class TestResolveWorkId: + def test_resolves_doi_to_work_id(self): + def handler(request: httpx.Request) -> httpx.Response: + assert "/works/doi:10.1/x" in str(request.url) + return httpx.Response(200, json={"id": "https://openalex.org/W999"}) + + with _client(handler) as c: + assert c.resolve_work_id("10.1/x") == "W999" + + def test_unresolved_doi_returns_none(self): + def handler(_request: httpx.Request) -> httpx.Response: + return httpx.Response(404, json={"error": "not found"}) + + with _client(handler) as c: + assert c.resolve_work_id("10.1/missing") is None + + def test_includes_mailto_param(self): + seen = {} + + def handler(request: httpx.Request) -> httpx.Response: + seen["mailto"] = request.url.params.get("mailto") + return httpx.Response(200, json={"id": "https://openalex.org/W1"}) + + with _client(handler) as c: + c.resolve_work_id("10.1/x") + assert seen["mailto"] == "t@example.org" + + +class TestCountsByYear: + def test_parses_group_by_counts(self): + def handler(request: httpx.Request) -> httpx.Response: + assert request.url.params.get("group_by") == "publication_year" + assert request.url.params.get("filter") == "cites:W1" + return httpx.Response( + 200, + json={ + "meta": {"count": 17}, + "group_by": [ + {"key": "2024", "count": 10}, + {"key": "2023", "count": 5}, + {"key": "2022", "count": 2}, + ], + }, + ) + + with _client(handler) as c: + counts = c.counts_by_year("W1") + assert counts == {2024: 10, 2023: 5, 2022: 2} + + def test_skips_non_year_buckets(self): + def handler(_request: httpx.Request) -> httpx.Response: + return httpx.Response( + 200, + json={ + "group_by": [ + {"key": "2024", "count": 3}, + {"key": "unknown", "count": 9}, + {"key": None, "count": 1}, + ] + }, + ) + + with _client(handler) as c: + counts = c.counts_by_year("W1") + assert counts == {2024: 3} + + +class TestRecentCitingPapers: + def test_paginates_with_cursor(self): + # Two pages: cursor "*" -> two works + next_cursor "p2"; "p2" -> one work, end. + def handler(request: httpx.Request) -> httpx.Response: + cursor = request.url.params.get("cursor") + assert request.url.params.get("sort") == "publication_date:desc" + if cursor == "*": + return httpx.Response( + 200, + json={ + "meta": {"next_cursor": "p2"}, + "results": [ + { + "id": "https://openalex.org/W10", + "doi": "https://doi.org/10.1/a", + "title": "Newest", + "publication_date": "2026-01-01", + }, + { + "id": "https://openalex.org/W11", + "doi": None, + "title": "Second", + "publication_date": "2025-06-01", + }, + ], + }, + ) + return httpx.Response( + 200, + json={ + "meta": {"next_cursor": None}, + "results": [ + { + "id": "https://openalex.org/W12", + "doi": "10.1/c", + "title": "Third", + "publication_date": "2025-01-01", + } + ], + }, + ) + + with _client(handler) as c: + papers = c.recent_citing_papers("W1", limit=100) + + assert [p.openalex_id for p in papers] == ["W10", "W11", "W12"] + assert all(isinstance(p, CitingPaper) for p in papers) + assert papers[0].doi == "10.1/a" # url-form DOI normalized + assert papers[1].doi is None + assert papers[0].url == "https://doi.org/10.1/a" + + def test_respects_limit_across_pages(self): + def handler(request: httpx.Request) -> httpx.Response: + # Always offer a next cursor; the client must stop at the limit. + return httpx.Response( + 200, + json={ + "meta": {"next_cursor": "more"}, + "results": [ + { + "id": f"https://openalex.org/W{request.url.params.get('cursor')}", + "doi": None, + "title": "P", + "publication_date": "2025-01-01", + } + ], + }, + ) + + with _client(handler) as c: + papers = c.recent_citing_papers("W1", limit=3) + assert len(papers) == 3 + + def test_skips_titleless_works(self): + def handler(_request: httpx.Request) -> httpx.Response: + return httpx.Response( + 200, + json={ + "meta": {"next_cursor": None}, + "results": [ + {"id": "https://openalex.org/W1", "title": None, "doi": None}, + { + "id": "https://openalex.org/W2", + "title": "Has title", + "doi": None, + "publication_date": "2025-01-01", + }, + ], + }, + ) + + with _client(handler) as c: + papers = c.recent_citing_papers("W1", limit=10) + assert [p.openalex_id for p in papers] == ["W2"] + + +class TestErrorPropagation: + def test_http_error_raises(self): + def handler(_request: httpx.Request) -> httpx.Response: + return httpx.Response(500, json={"error": "server"}) + + with _client(handler) as c, pytest.raises(httpx.HTTPStatusError): + c.counts_by_year("W1") diff --git a/tests/test_knowledge/test_papers_sync.py b/tests/test_knowledge/test_papers_sync.py index edf45c3..aa12600 100644 --- a/tests/test_knowledge/test_papers_sync.py +++ b/tests/test_knowledge/test_papers_sync.py @@ -9,11 +9,13 @@ from pathlib import Path from unittest.mock import patch +import httpx import pytest from opencite import IDSet, Paper import src.knowledge.papers_sync as ps from src.knowledge.db import get_connection, init_db +from src.knowledge.openalex_citations import OpenAlexCitationClient from src.knowledge.papers_sync import ( _cache_papers_async, _paper_source_and_id, @@ -26,6 +28,7 @@ sync_citing_papers, sync_openalex_papers, ) +from src.knowledge.search import get_citation_stats @pytest.fixture @@ -329,3 +332,83 @@ def test_sync_all_papers_rejects_bare_string(self) -> None: def test_sync_citing_papers_rejects_bare_string(self) -> None: with pytest.raises(TypeError, match="must be a list of strings"): sync_citing_papers(dois="10.3389/fnins.2013.00267") # type: ignore[arg-type] + + +class TestSyncCitingPapers: + """End-to-end sync via a mock OpenAlex transport (real client + real DB).""" + + def _handler(self, request: httpx.Request) -> httpx.Response: + url = str(request.url) + if "/works/doi:" in url: + return httpx.Response(200, json={"id": "https://openalex.org/W1"}) + if request.url.params.get("group_by") == "publication_year": + return httpx.Response( + 200, + json={"group_by": [{"key": "2024", "count": 3}, {"key": "2025", "count": 7}]}, + ) + # recent citing papers page (single page) + return httpx.Response( + 200, + json={ + "meta": {"next_cursor": None}, + "results": [ + { + "id": "https://openalex.org/W2", + "doi": "10.1/citing-a", + "title": "Citing paper A", + "publication_date": "2025-03-01", + }, + { + "id": "https://openalex.org/W3", + "doi": None, + "title": "Citing paper B", + "publication_date": "2024-09-01", + }, + ], + }, + ) + + def test_stores_true_counts_and_recent_papers(self, tmp_path: Path, monkeypatch) -> None: + def factory(**_kwargs): + transport = httpx.MockTransport(self._handler) + return OpenAlexCitationClient(client=httpx.Client(transport=transport)) + + monkeypatch.setattr(ps, "OpenAlexCitationClient", factory) + + db_path = tmp_path / "knowledge" / "test.db" + with patch("src.knowledge.db.get_db_path", return_value=db_path): + init_db("test") + stored = sync_citing_papers(["10.1/canon"], project="test") + stats = get_citation_stats("test") + with get_connection("test") as conn: + rows = conn.execute( + "SELECT external_id, cites_doi FROM papers WHERE cites_doi IS NOT NULL" + ).fetchall() + + # Counts come from the (uncapped) group_by histogram, not the stored rows. + assert stats.by_paper == {"10.1/canon": {"2024": 3, "2025": 7}} + assert stats.total == 10 + # Two recent citing papers stored and linked to the canonical DOI. + assert stored == 2 + assert {r["external_id"] for r in rows} == {"W2", "W3"} + assert all(r["cites_doi"] == "10.1/canon" for r in rows) + + def test_unresolved_doi_skipped(self, tmp_path: Path, monkeypatch) -> None: + def handler(_request: httpx.Request) -> httpx.Response: + return httpx.Response(404, json={"error": "not found"}) + + def factory(**_kwargs): + return OpenAlexCitationClient( + client=httpx.Client(transport=httpx.MockTransport(handler)) + ) + + monkeypatch.setattr(ps, "OpenAlexCitationClient", factory) + + db_path = tmp_path / "knowledge" / "test.db" + with patch("src.knowledge.db.get_db_path", return_value=db_path): + init_db("test") + stored = sync_citing_papers(["10.1/missing"], project="test") + stats = get_citation_stats("test") + + assert stored == 0 + assert stats.total == 0 From 53bba5e47a25046a2a21c24be84551388563f949 Mon Sep 17 00:00:00 2001 From: Seyed Yahya Shirazi Date: Tue, 9 Jun 2026 18:04:23 -0700 Subject: [PATCH 4/4] fix(citations): address PR review findings MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Critical: never wipe stored counts on an empty histogram (likely a transient OpenAlex gap) — skip the DOI with a warning instead. - sync all no longer forwards --limit to citations (would re-cap the stored sample at 100); uses the 2000 default like sync papers. - recent_citing_papers: bound page count and stop on an empty results page so a stuck/non-null cursor can't spin; build the stored URL from the normalized DOI for consistency. - replace_citation_counts: explicit rollback so a DOI is never half-replaced. - Per-DOI failure log includes the exception type. Tests: empty-counts-does-not-wipe, empty-results stop, absent-meta stop, normalized-URL. --- src/cli/sync.py | 5 +- src/knowledge/db.py | 21 +++++--- src/knowledge/openalex_citations.py | 24 +++++++-- src/knowledge/papers_sync.py | 23 +++++++-- .../test_knowledge/test_openalex_citations.py | 50 +++++++++++++++++++ tests/test_knowledge/test_papers_sync.py | 32 +++++++++++- 6 files changed, 137 insertions(+), 18 deletions(-) diff --git a/src/cli/sync.py b/src/cli/sync.py index 2abf636..f57bbec 100644 --- a/src/cli/sync.py +++ b/src/cli/sync.py @@ -581,10 +581,11 @@ def sync_all( ) paper_total += sum(paper_results.values()) - # Sync citing papers + # Sync citing papers. Counts are uncapped; the stored sample uses + # sync_citing_papers' own default cap, not the per-query --limit. if dois: with console.status("[green]Syncing citing papers...[/green]"): - citing_count = sync_citing_papers(dois, max_results=limit, project=comm_id) + citing_count = sync_citing_papers(dois, project=comm_id) paper_total += citing_count console.print(f"[green]Papers: {paper_total} items[/green]") diff --git a/src/knowledge/db.py b/src/knowledge/db.py index 79c859f..416b454 100644 --- a/src/knowledge/db.py +++ b/src/knowledge/db.py @@ -778,14 +778,19 @@ def replace_citation_counts(cites_doi: str, counts: dict[int, int], project: str """ now = _now_iso() with get_connection(project) as conn: - conn.execute("DELETE FROM citation_counts WHERE cites_doi = ?", (cites_doi,)) - if counts: - conn.executemany( - "INSERT INTO citation_counts (cites_doi, year, count, synced_at) " - "VALUES (?, ?, ?, ?)", - [(cites_doi, year, count, now) for year, count in counts.items()], - ) - conn.commit() + try: + conn.execute("DELETE FROM citation_counts WHERE cites_doi = ?", (cites_doi,)) + if counts: + conn.executemany( + "INSERT INTO citation_counts (cites_doi, year, count, synced_at) " + "VALUES (?, ?, ?, ?)", + [(cites_doi, year, count, now) for year, count in counts.items()], + ) + conn.commit() + except Exception: + # Keep the delete+insert atomic: never leave a DOI half-replaced. + conn.rollback() + raise def upsert_bep_item( diff --git a/src/knowledge/openalex_citations.py b/src/knowledge/openalex_citations.py index 9e692cc..db66b84 100644 --- a/src/knowledge/openalex_citations.py +++ b/src/knowledge/openalex_citations.py @@ -131,7 +131,12 @@ def recent_citing_papers(self, work_id: str, limit: int = 2000) -> list[CitingPa """ papers: list[CitingPaper] = [] cursor: str | None = "*" - while cursor and len(papers) < limit: + # Bound the page count: a highly-cited work may have title-less records + # that never accumulate, so cap pages (with headroom) to avoid spinning. + pages = 0 + max_pages = (limit // _PER_PAGE) + 50 + while cursor and len(papers) < limit and pages < max_pages: + pages += 1 page_size = min(_PER_PAGE, limit - len(papers)) resp = self._client.get( f"{OPENALEX_BASE}/works", @@ -145,20 +150,31 @@ def recent_citing_papers(self, work_id: str, limit: int = 2000) -> list[CitingPa ) resp.raise_for_status() data = resp.json() - for work in data.get("results", []): + results = data.get("results", []) + if not results: + break # no more works; a non-null cursor with no rows would spin + for work in results: title = work.get("title") if not title: continue + doi = _strip_doi(work.get("doi")) papers.append( CitingPaper( openalex_id=_strip_id(work.get("id")), - doi=_strip_doi(work.get("doi")), + doi=doi, title=title, publication_date=work.get("publication_date"), - url=work.get("doi") or work.get("id") or "", + url=f"https://doi.org/{doi}" if doi else (work.get("id") or ""), ) ) if len(papers) >= limit: break cursor = data.get("meta", {}).get("next_cursor") + if pages >= max_pages and cursor: + logger.warning( + "recent_citing_papers hit page cap for %s (%d pages, %d stored)", + work_id, + pages, + len(papers), + ) return papers diff --git a/src/knowledge/papers_sync.py b/src/knowledge/papers_sync.py index 9d7ce8d..cba7f03 100644 --- a/src/knowledge/papers_sync.py +++ b/src/knowledge/papers_sync.py @@ -390,7 +390,7 @@ def _store_citing_papers(papers: Iterable[CitingPaper], project: str, *, cites_d external_id=paper.openalex_id, title=paper.title, first_message=None, - url=paper.url or (f"https://doi.org/{paper.doi}" if paper.doi else ""), + url=paper.url, created_at=paper.publication_date, cites_doi=cites_doi, ) @@ -447,6 +447,17 @@ def sync_citing_papers( # 1. Complete per-year counts (source of truth for the chart). counts = client.counts_by_year(work_id) + if not counts: + # A canonical paper with zero citations is implausible; an + # empty histogram almost always means a transient OpenAlex + # gap. Do not wipe existing counts on a likely-bad read. + logger.warning( + "Empty citation histogram for %s (work %s); keeping existing " + "counts and skipping this DOI", + doi, + work_id, + ) + continue replace_citation_counts(doi, counts, project) total_citations = sum(counts.values()) @@ -462,9 +473,15 @@ def sync_citing_papers( stored, ) total_stored += stored - except Exception: + except Exception as exc: # Isolate per-DOI so one failure does not abort the batch. - logger.exception("citation sync failed for %s (%s)", doi, project) + logger.exception( + "citation sync failed for %s (%s): %s: %s", + doi, + project, + type(exc).__name__, + exc, + ) return total_stored diff --git a/tests/test_knowledge/test_openalex_citations.py b/tests/test_knowledge/test_openalex_citations.py index af8d014..7e270bb 100644 --- a/tests/test_knowledge/test_openalex_citations.py +++ b/tests/test_knowledge/test_openalex_citations.py @@ -173,6 +173,56 @@ def handler(request: httpx.Request) -> httpx.Response: papers = c.recent_citing_papers("W1", limit=3) assert len(papers) == 3 + def test_stops_on_empty_results_page(self): + # A non-null cursor with no results must not spin forever. + calls = {"n": 0} + + def handler(request: httpx.Request) -> httpx.Response: + calls["n"] += 1 + if request.url.params.get("cursor") == "*": + return httpx.Response( + 200, + json={ + "meta": {"next_cursor": "p2"}, + "results": [ + { + "id": "https://openalex.org/W1", + "doi": None, + "title": "P", + "publication_date": "2025-01-01", + } + ], + }, + ) + # Second page: cursor still present but no results -> must stop. + return httpx.Response(200, json={"meta": {"next_cursor": "p3"}, "results": []}) + + with _client(handler) as c: + papers = c.recent_citing_papers("W1", limit=100) + assert len(papers) == 1 + assert calls["n"] == 2 # stopped at the empty page, did not continue + + def test_absent_meta_stops_pagination(self): + def handler(_request: httpx.Request) -> httpx.Response: + return httpx.Response( + 200, + json={ + "results": [ + { + "id": "https://openalex.org/W1", + "doi": "10.1/x", + "title": "P", + "publication_date": "2025-01-01", + } + ] + }, + ) + + with _client(handler) as c: + papers = c.recent_citing_papers("W1", limit=100) + assert len(papers) == 1 + assert papers[0].url == "https://doi.org/10.1/x" # url built from stripped doi + def test_skips_titleless_works(self): def handler(_request: httpx.Request) -> httpx.Response: return httpx.Response( diff --git a/tests/test_knowledge/test_papers_sync.py b/tests/test_knowledge/test_papers_sync.py index aa12600..9c4ba93 100644 --- a/tests/test_knowledge/test_papers_sync.py +++ b/tests/test_knowledge/test_papers_sync.py @@ -14,7 +14,7 @@ from opencite import IDSet, Paper import src.knowledge.papers_sync as ps -from src.knowledge.db import get_connection, init_db +from src.knowledge.db import get_connection, init_db, replace_citation_counts from src.knowledge.openalex_citations import OpenAlexCitationClient from src.knowledge.papers_sync import ( _cache_papers_async, @@ -412,3 +412,33 @@ def factory(**_kwargs): assert stored == 0 assert stats.total == 0 + + def test_empty_counts_does_not_wipe_existing(self, tmp_path: Path, monkeypatch) -> None: + # An empty histogram (likely a transient API gap) must not erase the + # previously stored counts for that canonical DOI. + def handler(request: httpx.Request) -> httpx.Response: + if "/works/doi:" in str(request.url): + return httpx.Response(200, json={"id": "https://openalex.org/W1"}) + if request.url.params.get("group_by"): + return httpx.Response(200, json={"group_by": []}) # transient gap + return httpx.Response(200, json={"meta": {"next_cursor": None}, "results": []}) + + def factory(**_kwargs): + return OpenAlexCitationClient( + client=httpx.Client(transport=httpx.MockTransport(handler)) + ) + + monkeypatch.setattr(ps, "OpenAlexCitationClient", factory) + + db_path = tmp_path / "knowledge" / "test.db" + with patch("src.knowledge.db.get_db_path", return_value=db_path): + init_db("test") + # Seed good counts as if a prior healthy sync ran. + replace_citation_counts("10.1/canon", {2024: 50, 2025: 80}, project="test") + + stored = sync_citing_papers(["10.1/canon"], project="test") + stats = get_citation_stats("test") + + assert stored == 0 + # Existing histogram is preserved, not wiped to empty. + assert stats.by_paper == {"10.1/canon": {"2024": 50, "2025": 80}}