Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 10 additions & 7 deletions src/cli/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -364,16 +364,18 @@ def sync_papers(
total += count
console.print(f" [dim]{src}: {count} papers[/dim]")

# Sync citing papers if DOIs are configured
# Sync citing papers if DOIs are configured. Counts are fetched complete
# (uncapped) from OpenAlex; only the stored sample of recent citing papers
# uses the default cap, independent of the query --limit above.
if include_citations:
dois = _get_community_paper_dois(community)
if dois:
console.print(f"\n[dim]Syncing papers citing {len(dois)} DOI(s)...[/dim]")
with console.status("[green]Syncing citing papers...[/green]"):
citing_count = sync_citing_papers(dois, limit, project=community)
console.print(f"\n[dim]Syncing citations for {len(dois)} DOI(s)...[/dim]")
with console.status("[green]Syncing citations...[/green]"):
citing_count = sync_citing_papers(dois, project=community)
results_by_source["citing"] = citing_count
total += citing_count
console.print(f"[dim]Citing papers: {citing_count}[/dim]")
console.print(f"[dim]Recent citing papers stored: {citing_count}[/dim]")

console.print(f"\n[green]Total papers synced for {community}: {total}[/green]")

Expand Down Expand Up @@ -579,10 +581,11 @@ def sync_all(
)
paper_total += sum(paper_results.values())

# Sync citing papers
# Sync citing papers. Counts are uncapped; the stored sample uses
# sync_citing_papers' own default cap, not the per-query --limit.
if dois:
with console.status("[green]Syncing citing papers...[/green]"):
citing_count = sync_citing_papers(dois, max_results=limit, project=comm_id)
citing_count = sync_citing_papers(dois, project=comm_id)
paper_total += citing_count

console.print(f"[green]Papers: {paper_total} items[/green]")
Expand Down
41 changes: 41 additions & 0 deletions src/knowledge/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,18 @@ def active_mirror_context(mirror_id: str) -> Iterator[None]:
UNIQUE(source_type, source_name)
);

-- True per-year citation counts per canonical DOI, fetched from OpenAlex
-- group_by (complete, uncapped). This is the source of truth for the public
-- citations dashboard; the papers table only stores a recent sample of the
-- citing papers themselves for the search tool.
CREATE TABLE IF NOT EXISTS citation_counts (
cites_doi TEXT NOT NULL,
year INTEGER NOT NULL,
count INTEGER NOT NULL,
synced_at TEXT NOT NULL,
PRIMARY KEY (cites_doi, year)
);

-- Docstrings extracted from source code
CREATE TABLE IF NOT EXISTS docstrings (
id INTEGER PRIMARY KEY AUTOINCREMENT,
Expand Down Expand Up @@ -752,6 +764,35 @@ def update_sync_metadata(
conn.commit()


def replace_citation_counts(cites_doi: str, counts: dict[int, int], project: str = "hed") -> None:
"""Replace the stored per-year citation counts for one canonical DOI.

The counts are an exact, complete histogram from OpenAlex, so the row set
is replaced wholesale (delete + insert) inside one transaction: this keeps
the table an accurate mirror and drops any year that no longer appears.

Args:
cites_doi: Canonical DOI whose citations these counts describe.
counts: Mapping of publication year to citing-paper count.
project: Assistant/project name. Defaults to 'hed'.
"""
now = _now_iso()
with get_connection(project) as conn:
try:
conn.execute("DELETE FROM citation_counts WHERE cites_doi = ?", (cites_doi,))
if counts:
conn.executemany(
"INSERT INTO citation_counts (cites_doi, year, count, synced_at) "
"VALUES (?, ?, ?, ?)",
[(cites_doi, year, count, now) for year, count in counts.items()],
)
conn.commit()
except Exception:
# Keep the delete+insert atomic: never leave a DOI half-replaced.
conn.rollback()
raise


def upsert_bep_item(
conn: sqlite3.Connection,
*,
Expand Down
180 changes: 180 additions & 0 deletions src/knowledge/openalex_citations.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
"""Direct OpenAlex client for citation analysis.

opencite returns citing papers from a single page (<=200), ordered for its own
ranking, with no pagination and no aggregation exposed. For a citations
dashboard that silently truncates recent citations (the first page skews to
older, highly-cited works). We therefore query OpenAlex directly:

- ``counts_by_year`` uses ``group_by=publication_year`` for the *exact,
complete* per-year histogram with no cap.
- ``recent_citing_papers`` cursor-paginates ``sort=publication_date:desc`` to
collect the latest N citing papers for the search corpus.

The client takes an optional injected ``httpx.Client`` so tests can supply an
``httpx.MockTransport`` instead of hitting the network.
"""

import logging
from dataclasses import dataclass

import httpx

logger = logging.getLogger(__name__)

OPENALEX_BASE = "https://api.openalex.org"
_TIMEOUT = 30.0
_PER_PAGE = 200 # OpenAlex maximum page size


@dataclass
class CitingPaper:
"""A minimal citing-paper record for the search corpus."""

openalex_id: str
doi: str | None
title: str
publication_date: str | None
url: str


def _strip_id(value: str | None) -> str:
"""Reduce an OpenAlex IRI (https://openalex.org/W123) to its bare id."""
if not value:
return ""
return value.rstrip("/").rsplit("/", 1)[-1]


def _strip_doi(value: str | None) -> str | None:
"""Reduce a DOI URL to the bare ``10.xxxx/yyyy`` form."""
if not value:
return None
cleaned = value.strip()
for prefix in ("https://doi.org/", "http://doi.org/", "https://dx.doi.org/"):
if cleaned.lower().startswith(prefix):
cleaned = cleaned[len(prefix) :]
break
return cleaned or None


class OpenAlexCitationClient:
"""Queries OpenAlex for citation counts and recent citing papers."""

def __init__(
self,
*,
email: str = "",
api_key: str = "",
client: httpx.Client | None = None,
) -> None:
self._email = email
self._api_key = api_key
self._owns_client = client is None
self._client = client or httpx.Client(timeout=_TIMEOUT)

def __enter__(self) -> "OpenAlexCitationClient":
return self

def __exit__(self, *exc: object) -> None:
self.close()

def close(self) -> None:
if self._owns_client:
self._client.close()

def _params(self, **extra: object) -> dict[str, object]:
params: dict[str, object] = dict(extra)
# mailto routes to the polite pool; api_key unlocks premium throughput.
if self._email:
params["mailto"] = self._email
if self._api_key:
params["api_key"] = self._api_key
return params

def resolve_work_id(self, doi: str) -> str | None:
"""Resolve a DOI to its OpenAlex work id (e.g. ``W2128495200``)."""
resp = self._client.get(
f"{OPENALEX_BASE}/works/doi:{doi}",
params=self._params(select="id"),
)
if resp.status_code == 404:
logger.warning("OpenAlex has no work for DOI %s", doi)
return None
resp.raise_for_status()
work_id = _strip_id(resp.json().get("id"))
return work_id or None

def counts_by_year(self, work_id: str) -> dict[int, int]:
"""Return the complete per-year count of works citing ``work_id``.

Uses OpenAlex ``group_by`` so the counts are exact and uncapped,
independent of how many citing papers are stored.
"""
resp = self._client.get(
f"{OPENALEX_BASE}/works",
params=self._params(filter=f"cites:{work_id}", group_by="publication_year"),
)
resp.raise_for_status()
counts: dict[int, int] = {}
for group in resp.json().get("group_by", []):
try:
year = int(group["key"])
except (KeyError, TypeError, ValueError):
continue # non-year buckets (e.g. "unknown") are skipped
counts[year] = int(group.get("count", 0))
return counts

def recent_citing_papers(self, work_id: str, limit: int = 2000) -> list[CitingPaper]:
"""Collect up to ``limit`` most-recent works citing ``work_id``.

Cursor-paginates ``sort=publication_date:desc`` so the stored sample is
the newest citations rather than an arbitrary first page.
"""
papers: list[CitingPaper] = []
cursor: str | None = "*"
# Bound the page count: a highly-cited work may have title-less records
# that never accumulate, so cap pages (with headroom) to avoid spinning.
pages = 0
max_pages = (limit // _PER_PAGE) + 50
while cursor and len(papers) < limit and pages < max_pages:
pages += 1
page_size = min(_PER_PAGE, limit - len(papers))
resp = self._client.get(
f"{OPENALEX_BASE}/works",
params=self._params(
filter=f"cites:{work_id}",
sort="publication_date:desc",
select="id,doi,title,publication_date",
cursor=cursor,
**{"per-page": page_size},
),
)
resp.raise_for_status()
data = resp.json()
results = data.get("results", [])
if not results:
break # no more works; a non-null cursor with no rows would spin
for work in results:
title = work.get("title")
if not title:
continue
doi = _strip_doi(work.get("doi"))
papers.append(
CitingPaper(
openalex_id=_strip_id(work.get("id")),
doi=doi,
title=title,
publication_date=work.get("publication_date"),
url=f"https://doi.org/{doi}" if doi else (work.get("id") or ""),
)
)
if len(papers) >= limit:
break
cursor = data.get("meta", {}).get("next_cursor")
if pages >= max_pages and cursor:
logger.warning(
"recent_citing_papers hit page cap for %s (%d pages, %d stored)",
work_id,
pages,
len(papers),
)
return papers
Loading
Loading