Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 52 additions & 29 deletions backend/api/database_search/router.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@
from ..services.citation_search.pubmed_citation_collection import PubMedCitationCollector
from ..services.citation_search.europePMC_citation_collection import EuropePMCCitationCollector
from ..services.citation_search.scopus_citation_collection import ScopusDataProcessor
from ..services.citation_search.citation_search_helper import *
from ..services.storage import storage_service
from io import BytesIO

import logging
import os
Expand All @@ -18,28 +21,6 @@

import datetime

def azure_client(container_name: str):
    """Return an Azure ContainerClient for *container_name*.

    Builds a fresh ``BlobServiceClient`` from the connection string in
    settings on every call; callers get per-blob clients from the returned
    object.  NOTE(review): despite the local name ``blob_client``, the
    returned value is a *container* client, not a blob client.
    """
    connection_string = settings.AZURE_STORAGE_CONNECTION_STRING
    blob_service_client = BlobServiceClient.from_connection_string(connection_string)
    blob_client = blob_service_client.get_container_client(container_name)
    return blob_client


def read_blob_to_df(blob_name: str, container_name: str) -> pd.DataFrame:
    """Read a CSV blob into a pandas DataFrame.

    NOTE(review): the previous docstring claimed "(Parquet or CSV)", but the
    blob is downloaded as text and parsed with ``pd.read_csv`` only — Parquet
    content would fail here.
    """
    blob_client = azure_client(container_name).get_blob_client(blob_name)
    downloaded_blob = blob_client.download_blob().content_as_text()
    blob_df = pd.read_csv(StringIO(downloaded_blob), sep=",")
    return blob_df


def write_df_to_blob(df: pd.DataFrame, blob_name: str, container_name: str):
    """Write a pandas DataFrame to a blob as CSV, overwriting any existing blob.

    NOTE(review): the previous docstring said "as Parquet", but ``to_csv`` is
    what is actually written.
    """
    buffer = BytesIO()
    df.to_csv(buffer, index=False, encoding='utf-8')
    buffer.seek(0)
    blob_client = azure_client(container_name).get_blob_client(blob_name)
    blob_client.upload_blob(buffer, overwrite=True)

router = APIRouter()

Expand All @@ -58,6 +39,9 @@ async def database_search(
MAXDATE = None
database = payload.database
SEARCH_TERM = payload.search_term

if not storage_service:
raise HTTPException(status_code=500, detail="Storage not configured")

if not SEARCH_TERM :
SEARCH_TERM = '(("epidemiological parameters"[Title/Abstract] OR "incidence"[MeSH Terms]))'
Expand Down Expand Up @@ -104,19 +88,30 @@ async def database_search(


blob_name = f"{database}-{datetime.datetime.now().strftime('%Y-%m-%d')}.csv"
container_name = f"citation-data/bronze-data/{database}/archive"

archive_path = (
f"{settings.STORAGE_CONTAINER_NAME}/"
f"citation-data/bronze-data/{database}/archive/{blob_name}"
)

try:
write_df_to_blob(content, blob_name, container_name)
logging.info(f"Uploaded blob '{blob_name}' to container '{container_name}'")
await save_df(archive_path, content)
logging.info(f"Uploaded blob '{blob_name}' to container '{settings.STORAGE_CONTAINER_NAME}'")
except AzureError as e:
logging.error(f"Azure Blob Storage error: {e.message if hasattr(e, 'message') else str(e)}")
raise HTTPException(status_code=500, detail="Failed to write to Azure Blob")
except Exception as ex:
logging.error(f"Unexpected error: {str(ex)}")
raise HTTPException(status_code=500, detail="Unexpected error while writing to blob")

all_path = (
f"{settings.STORAGE_CONTAINER_NAME}/"
f"citation-data/bronze-data/{database}/{database}-all-citations.csv"
)

try:
all_citations_df = read_blob_to_df(f"{database}-all-citations.csv", f"citation-data/bronze-data/{database}")
bytes_data, _ = await storage_service.get_bytes_by_path(all_path)
all_citations_df = pd.read_csv(BytesIO(bytes_data))
except Exception:
all_citations_df = content.iloc[0:0].copy()

Expand All @@ -132,7 +127,35 @@ async def database_search(

logging.info('New citations found: %s', len(new_citations))

write_df_to_blob(new_all_citations, f"{database}-all-citations.csv", f"citation-data/bronze-data/{database}")
write_df_to_blob(new_citations, blob_name, "citation-deduplicate/to-process")
await save_df(all_path, new_all_citations)

silver_path = (
f"{settings.STORAGE_CONTAINER_NAME}/"
f"citation-data/silver-data/silver-citation-data.csv"
)
combined_df = await load_all_database_searches()
combined_df = normalize_ids(combined_df)
await save_df(silver_path, combined_df)

return {"message": f"{database} collection completed", "new_citations_count": len(new_citations)}

@router.get("/{sr_id}/combine")
async def combine_sources(sr_id: str):
    """Merge every per-database citation export into the silver-layer CSV.

    Loads the per-database "all citations" files, normalizes their id
    columns, and writes the combined result to the silver data path.

    ``sr_id`` is part of the route for consistency with sibling endpoints
    but is not currently used by the merge itself.
    """
    combined_df = await load_all_database_searches()

    if combined_df.empty:
        return {"message": "No data found, perform search first"}

    combined_df = normalize_ids(combined_df)

    silver_path = (
        f"{settings.STORAGE_CONTAINER_NAME}/"
        f"citation-data/silver-data/silver-citation-data.csv"
    )

    await save_df(silver_path, combined_df)

    # Bug fix: a stale `return {"message": f"{database} collection completed", ...}`
    # sat here referencing undefined names (`database`, `new_citations`) and
    # would have raised NameError before reaching the real response.
    return {
        "total citations": len(combined_df)
    }

52 changes: 52 additions & 0 deletions backend/api/services/citation_search/citation_search_helper.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
from typing import Dict, Any, List, Optional
from ...core.security import get_current_active_user
from ...core.config import settings
from ..storage import storage_service

import logging
import os
import pandas as pd
from io import StringIO, BytesIO
import azure.functions as func
from azure.core.exceptions import AzureError
from azure.storage.blob import BlobServiceClient

def normalize_ids(df: pd.DataFrame) -> pd.DataFrame:
    """Return *df* with a unified ``id`` column for cross-source merging.

    Source databases expose different identifier columns (PubMed provides
    ``pmid``, Scopus provides ``eid``); the first one present is copied into
    ``id``.  If neither exists and there is no pre-existing ``id`` column,
    ``id`` is filled with ``None``.

    Bug fix: the original mutated the caller's DataFrame in place; this
    version works on a copy (both visible call sites reassign the result,
    so the change is backward-compatible).
    """
    normalized = df.copy()
    if "pmid" in normalized.columns:
        normalized["id"] = normalized["pmid"]
    elif "eid" in normalized.columns:
        normalized["id"] = normalized["eid"]
    elif "id" not in normalized.columns:
        # No known identifier column: keep the rows but mark ids as missing.
        normalized["id"] = None
    return normalized

async def load_all_database_searches():
    """Concatenate the per-database "all citations" CSVs into one DataFrame.

    Each loaded frame is tagged with a ``source`` column naming the database
    it came from.  A database whose blob cannot be fetched or parsed is
    skipped with a warning.  Returns an empty DataFrame when nothing loads.
    """
    base_path = "citation-data/bronze-data"
    frames = []

    for db_name in ("Pubmed", "EuropePMC", "Scopus"):
        citations_path = (
            f"{settings.STORAGE_CONTAINER_NAME}/"
            f"{base_path}/{db_name}/{db_name}-all-citations.csv"
        )
        try:
            raw_bytes, _ = await storage_service.get_bytes_by_path(citations_path)
            frame = pd.read_csv(BytesIO(raw_bytes))
            frame["source"] = db_name
            frames.append(frame)
        except Exception as e:
            # Best effort: a missing or unreadable export should not block
            # the other databases from being combined.
            logging.warning(f"{db_name} missing: {e}")

    return pd.concat(frames, ignore_index=True) if frames else pd.DataFrame()

async def save_df(path: str, df: pd.DataFrame) -> None:
    """Serialize *df* as UTF-8 CSV (no index column) and upload it to *path*.

    *path* is a storage path of the form ``container/.../name.csv`` as
    understood by ``storage_service.put_bytes_by_path``; the blob is stored
    with a ``text/csv`` content type.
    """
    # to_csv() with no target returns a str; encoding it directly removes the
    # intermediate BytesIO buffer and the redundant seek(0) the old code did
    # (getvalue() ignores the stream position anyway).
    csv_bytes = df.to_csv(index=False).encode("utf-8")
    await storage_service.put_bytes_by_path(path, csv_bytes, "text/csv")
3 changes: 1 addition & 2 deletions backend/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -62,5 +62,4 @@ itsdangerous==2.2.0
biopython==1.86
clean-text==0.7.1
azure-functions==1.24.0

pandas
pandas==2.2.2
Loading