diff --git a/backend/api/database_search/router.py b/backend/api/database_search/router.py index a5b4e79f..9324ed95 100644 --- a/backend/api/database_search/router.py +++ b/backend/api/database_search/router.py @@ -7,6 +7,9 @@ from ..services.citation_search.pubmed_citation_collection import PubMedCitationCollector from ..services.citation_search.europePMC_citation_collection import EuropePMCCitationCollector from ..services.citation_search.scopus_citation_collection import ScopusDataProcessor +from ..services.citation_search.citation_search_helper import * +from ..services.storage import storage_service +from io import BytesIO import logging import os @@ -18,28 +21,6 @@ import datetime -def azure_client(container_name: str): - connection_string = settings.AZURE_STORAGE_CONNECTION_STRING - blob_service_client = BlobServiceClient.from_connection_string(connection_string) - blob_client = blob_service_client.get_container_client(container_name) - return blob_client - - -def read_blob_to_df(blob_name: str, container_name: str) -> pd.DataFrame: - """Read a blob (Parquet or CSV) into a pandas DataFrame.""" - blob_client = azure_client(container_name).get_blob_client(blob_name) - downloaded_blob = blob_client.download_blob().content_as_text() - blob_df = pd.read_csv(StringIO(downloaded_blob), sep=",") - return blob_df - - -def write_df_to_blob(df: pd.DataFrame, blob_name: str, container_name: str): - """Write a pandas DataFrame to blob as Parquet.""" - buffer = BytesIO() - df.to_csv(buffer, index=False, encoding='utf-8') - buffer.seek(0) - blob_client = azure_client(container_name).get_blob_client(blob_name) - blob_client.upload_blob(buffer, overwrite=True) router = APIRouter() @@ -58,6 +39,9 @@ async def database_search( MAXDATE = None database = payload.database SEARCH_TERM = payload.search_term + + if not storage_service: + raise HTTPException(status_code=500, detail="Storage not configured") if not SEARCH_TERM : SEARCH_TERM = '(("epidemiological parameters"[Title/Abstract] OR "incidence"[MeSH Terms]))' @@ -104,10 +88,15 @@ async def database_search( blob_name = f"{database}-{datetime.datetime.now().strftime('%Y-%m-%d')}.csv" - container_name = f"citation-data/bronze-data/{database}/archive" + + archive_path = ( + f"{settings.STORAGE_CONTAINER_NAME}/" + f"citation-data/bronze-data/{database}/archive/{blob_name}" + ) + try: - write_df_to_blob(content, blob_name, container_name) - logging.info(f"Uploaded blob '{blob_name}' to container '{container_name}'") + await save_df(archive_path, content) + logging.info(f"Uploaded blob '{blob_name}' to container '{settings.STORAGE_CONTAINER_NAME}'") except AzureError as e: logging.error(f"Azure Blob Storage error: {e.message if hasattr(e, 'message') else str(e)}") raise HTTPException(status_code=500, detail="Failed to write to Azure Blob") @@ -115,8 +104,14 @@ async def database_search( logging.error(f"Unexpected error: {str(ex)}") raise HTTPException(status_code=500, detail="Unexpected error while writing to blob") + all_path = ( + f"{settings.STORAGE_CONTAINER_NAME}/" + f"citation-data/bronze-data/{database}/{database}-all-citations.csv" + ) + try: - all_citations_df = read_blob_to_df(f"{database}-all-citations.csv", f"citation-data/bronze-data/{database}") + bytes_data, _ = await storage_service.get_bytes_by_path(all_path) + all_citations_df = pd.read_csv(BytesIO(bytes_data)) except Exception: all_citations_df = content.iloc[0:0].copy() @@ -132,7 +127,35 @@ async def database_search( logging.info('New citations found: %s', len(new_citations)) - write_df_to_blob(new_all_citations, f"{database}-all-citations.csv", f"citation-data/bronze-data/{database}") - write_df_to_blob(new_citations, blob_name, "citation-deduplicate/to-process") + await save_df(all_path, new_all_citations) + + silver_path = ( + f"{settings.STORAGE_CONTAINER_NAME}/" + f"citation-data/silver-data/silver-citation-data.csv" + ) + combined_df = await load_all_database_searches() + combined_df = normalize_ids(combined_df) + await save_df(silver_path, combined_df) + + return {"message": f"{database} collection completed", "new_citations_count": len(new_citations)} + +@router.get("/{sr_id}/combine") +async def combine_sources(sr_id: str): + combined_df = await load_all_database_searches() + + if combined_df.empty: + return {"message": "No data found, perform search first"} + + combined_df = normalize_ids(combined_df) + + silver_path = ( + f"{settings.STORAGE_CONTAINER_NAME}/" + f"citation-data/silver-data/silver-citation-data.csv" + ) + + await save_df(silver_path, combined_df) - return {"message": f"{database} collection completed", "new_citations_count": len(new_citations)} \ No newline at end of file + return { + "total citations": len(combined_df) + } + \ No newline at end of file diff --git a/backend/api/services/citation_search/citation_search_helper.py b/backend/api/services/citation_search/citation_search_helper.py new file mode 100644 index 00000000..85a091f2 --- /dev/null +++ b/backend/api/services/citation_search/citation_search_helper.py @@ -0,0 +1,52 @@ +from typing import Dict, Any, List, Optional +from ...core.security import get_current_active_user +from ...core.config import settings +from ..storage import storage_service + +import logging +import os +import pandas as pd +from io import StringIO, BytesIO +import azure.functions as func +from azure.core.exceptions import AzureError +from azure.storage.blob import BlobServiceClient + +def normalize_ids(df: pd.DataFrame): + if "pmid" in df.columns: + df["id"] = df["pmid"] + elif "eid" in df.columns: + df["id"] = df["eid"] + elif "id" not in df.columns: + df["id"] = None + return df + +async def load_all_database_searches(): + base_path = "citation-data/bronze-data" + + databases = ["Pubmed", "EuropePMC", "Scopus"] + dfs = [] + + for database in databases: + all_path = ( + f"{settings.STORAGE_CONTAINER_NAME}/" + f"{base_path}/{database}/{database}-all-citations.csv" + ) + + try: + bytes_data, _ = await storage_service.get_bytes_by_path(all_path) + df = pd.read_csv(BytesIO(bytes_data)) + df["source"] = database + dfs.append(df) + except Exception as e: + logging.warning(f"{database} missing: {e}") + + if not dfs: + return pd.DataFrame() + + return pd.concat(dfs, ignore_index=True) + +async def save_df(path: str, df: pd.DataFrame): + buffer = BytesIO() + df.to_csv(buffer, index=False, encoding="utf-8") + buffer.seek(0) + await storage_service.put_bytes_by_path(path, buffer.getvalue(), "text/csv") \ No newline at end of file diff --git a/backend/requirements.txt b/backend/requirements.txt index 1f1d93e6..fb7bee77 100644 --- a/backend/requirements.txt +++ b/backend/requirements.txt @@ -62,5 +62,4 @@ itsdangerous==2.2.0 biopython==1.86 clean-text==0.7.1 azure-functions==1.24.0 - -pandas +pandas==2.2.2