diff --git a/CLAUDE.md b/CLAUDE.md index b8184e0..461a04a 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -13,10 +13,17 @@ FastFetchBot/ │ ├── config.py # URL patterns, shared env vars │ ├── models/ # UrlMetadata, MetadataItem, NamedBytesIO, etc. │ ├── utils/ # parse, image, logger, network, cookie +│ ├── database/ +│ │ ├── base.py, engine.py, session.py # SQLAlchemy (user settings) +│ │ ├── models/user_setting.py # UserSetting SQLAlchemy model +│ │ └── mongodb/ # Beanie ODM (scraped content) +│ │ ├── connection.py # init_mongodb(), close_mongodb(), save_instances() +│ │ ├── cache.py # find_cached(), save_metadata() — URL-based cache with TTL + versioning +│ │ └── models/metadata.py # Metadata Document, DatabaseMediaFile │ └── services/ │ ├── scrapers/ # All platform scrapers + ScraperManager + InfoExtractService │ │ ├── config.py # ALL scraper env vars (platform creds, Firecrawl, Zyte, Telegraph tokens) -│ │ ├── common.py # Core InfoExtractService (scraping only, no API dependencies) +│ │ ├── common.py # Core InfoExtractService (scraping + MongoDB cache lookup) │ │ ├── scraper_manager.py │ │ ├── scraper.py # Base Scraper + DataProcessor ABCs │ │ ├── templates/ # 13 Jinja2 templates for platform output formatting @@ -50,7 +57,9 @@ The Telegram Bot communicates with the API server over HTTP (`API_SERVER_URL`). - **`main.py`** — FastAPI app setup, Sentry integration, lifecycle management - **`config.py`** — API-only env vars: BASE_URL, API_KEY, DATABASE_ON, MongoDB, Celery, AWS S3, Inoreader, locale/i18n. 
**No scraper credentials** (those live in `fastfetchbot_shared.services.scrapers.config`) - **`routers/`** — `scraper.py` (generic endpoint), `scraper_routers.py` (platform-specific), `inoreader.py`, `wechat.py` -- **`services/scrapers/common.py`** — `InfoExtractService` (enriched): extends core `InfoExtractService` from shared with Telegraph publishing, PDF export, DB storage, and video download (youtube/bilibili) +- **`services/scrapers/common.py`** — `InfoExtractService` (enriched): extends core `InfoExtractService` from shared with Telegraph publishing, PDF export, DB storage (via `save_metadata()`), and video download (youtube/bilibili). Defaults `database_cache_ttl` from `settings.DATABASE_CACHE_TTL`. Skips enrichment on cache hits via `_cached` flag +- **`database.py`** — Thin wrapper delegating to `fastfetchbot_shared.database.mongodb` (init/close/save) +- **`models/database_model.py`** — Re-export wrapper for `Metadata` from shared - **`services/file_export/`** — PDF generation, audio transcription (OpenAI), video download - **`services/amazon/s3.py`** — S3 storage integration - **`services/telegraph/`** — Re-export wrapper: `from fastfetchbot_shared.services.telegraph import Telegraph` @@ -59,19 +68,27 @@ The Telegram Bot communicates with the API server over HTTP (`API_SERVER_URL`). 
- **`main.py`** — Entry point - **`api_client.py`** — HTTP client calling the API server -- **`handlers/`** — `messages.py`, `buttons.py`, `url_process.py` -- **`services/`** — `bot_app.py`, `message_sender.py`, `constants.py` +- **`queue_client.py`** — ARQ Redis client for enqueuing scrape jobs (queue mode) +- **`handlers/`** — `messages.py`, `buttons.py`, `url_process.py`, `commands.py` (start, /settings with inline toggles) +- **`services/`** — `bot_app.py`, `message_sender.py`, `user_settings.py` (get/toggle `auto_fetch_in_dm` and `force_refresh_cache`), `constants.py` - **`webhook/server.py`** — Webhook/polling server - **`templates/`** — Jinja2 templates for bot messages ### Shared Library (`packages/shared/fastfetchbot_shared/`) - **`config.py`** — URL patterns (SOCIAL_MEDIA_WEBSITE_PATTERNS, VIDEO_WEBSITE_PATTERNS, BANNED_PATTERNS); shared env vars including `SIGN_SERVER_URL` and `XHS_COOKIE_PATH` -- **`models/`** — `classes.py` (NamedBytesIO), `metadata_item.py`, `telegraph_item.py`, `url_metadata.py` +- **`models/`** — `classes.py` (NamedBytesIO), `metadata_item.py` (MediaFile, MetadataItem, MessageType), `telegraph_item.py`, `url_metadata.py` - **`utils/`** — `parse.py` (URL parsing, HTML processing, `get_env_bool`), `image.py`, `logger.py`, `network.py`, `cookie.py` +- **`database/`** — Dual database layer: + - **SQLAlchemy** (user settings): `base.py`, `engine.py`, `session.py`, `models/user_setting.py` — `UserSetting` model with `auto_fetch_in_dm` and `force_refresh_cache` toggles. Supports SQLite (dev) and PostgreSQL (prod) via `SETTINGS_DATABASE_URL`. Alembic migrations in `packages/shared/alembic/` + - **`database/mongodb/`** — Beanie ODM for scraped content persistence, shared across API and async worker: + - **`connection.py`** — `init_mongodb(mongodb_url, db_name)`, `close_mongodb()`, `save_instances()`. 
Parameterized — each app passes its own config at startup + - **`cache.py`** — MongoDB-backed URL cache: `find_cached(url, ttl_seconds)` returns the latest versioned document if within TTL (0 = never expire); `save_metadata(metadata_item)` auto-increments `version` for the same URL before inserting + - **`models/metadata.py`** — `Metadata(Document)` with fields: title, url, author, content, media_files, telegraph_url, timestamp, version, etc. `DatabaseMediaFile(MediaFile)` extends the scraper `MediaFile` dataclass with `file_key` for S3 storage. Compound index on `(url, version)` for efficient cache lookups. `@before_event(Insert)` hook auto-computes text lengths and converts `MediaFile` → `DatabaseMediaFile` + - **`__init__.py`** — Re-exports: `init_mongodb`, `close_mongodb`, `save_instances`, `find_cached`, `save_metadata`, `Metadata`, `DatabaseMediaFile` - **`services/scrapers/`** — All platform scrapers, fully decoupled from FastAPI: - **`config.py`** — All scraper env vars: platform credentials (Twitter, Bluesky, Weibo, XHS, Zhihu, Reddit, Instagram), Firecrawl/Zyte config, OpenAI key, Telegraph tokens, `JINJA2_ENV`, cookie file loading. Configurable `CONF_DIR` for cookie/config files - - **`common.py`** — Core `InfoExtractService`: routes URLs to the correct scraper, returns raw metadata. No Telegraph/PDF/DB dependencies + - **`common.py`** — Core `InfoExtractService`: routes URLs to the correct scraper, returns raw metadata. Includes MongoDB cache lookup at the top of `get_item()` when `store_database=True` and `database_cache_ttl >= 0`. Cache hits return a dict with `_cached=True` so callers can skip enrichment. 
Uses lazy imports for `find_cached` to avoid import-time beanie dependency - **`scraper.py`** — Base `Scraper` and `DataProcessor` abstract classes - **`scraper_manager.py`** — `ScraperManager` with lazy initialization for bluesky, weibo, and general scrapers - **`templates/`** — 13 Jinja2 templates for platform-specific output formatting (bundled via `__file__`-relative paths) @@ -84,7 +101,7 @@ The shared scrapers library can be used standalone without the API server: from fastfetchbot_shared.services.scrapers import InfoExtractService, ScraperManager ``` -Optional dependencies are grouped under `fastfetchbot-shared[scrapers]` (Jinja2, atproto, asyncpraw, firecrawl-py, etc.). +Optional dependencies are grouped under `fastfetchbot-shared[scrapers]` (Jinja2, atproto, asyncpraw, firecrawl-py, etc.) and `fastfetchbot-shared[mongodb]` (beanie, motor). ## Development Commands @@ -161,8 +178,30 @@ See `template.env` for a complete reference. Key variables: - See `template.env` for all platform-specific variables (Twitter, Weibo, Xiaohongshu, Reddit, Instagram, Bluesky, etc.) ### Database -- Optional MongoDB integration (`DATABASE_ON=true`) -- Uses Beanie ODM for async operations + +**MongoDB** (scraped content — optional, feature-gated): +| Variable | Default | Description | +|----------|---------|-------------| +| `DATABASE_ON` | `false` | Enable MongoDB storage of scraped metadata | +| `DATABASE_CACHE_TTL` | `86400` | Cache TTL in seconds. `0` = never expire (always use cache) | +| `MONGODB_HOST` | `localhost` | MongoDB host | +| `MONGODB_PORT` | `27017` | MongoDB port | +| `MONGODB_USERNAME` | `""` | MongoDB username (async worker only; included in derived URL if set) | +| `MONGODB_PASSWORD` | `""` | MongoDB password (async worker only) | +| `MONGODB_URL` | derived | Full MongoDB URI. Overrides host/port/credentials if set explicitly | + +MongoDB models and connection logic live in `packages/shared/fastfetchbot_shared/database/mongodb/`. 
Both the API server and async worker use the same shared ODM layer. The `Metadata` Beanie Document stores scraped content with versioning — each re-scrape of the same URL increments the `version` field. The cache system (`find_cached` / `save_metadata`) queries the latest version and checks TTL before deciding to re-scrape. + +**SQLite/PostgreSQL** (user settings — always enabled for the Telegram bot): +| Variable | Default | Description | +|----------|---------|-------------| +| `SETTINGS_DATABASE_URL` | `sqlite+aiosqlite:///data/fastfetchbot.db` | SQLAlchemy connection URL. Use `postgresql+asyncpg://...` for production | + +Alembic migrations live in `packages/shared/alembic/`. Run with: +```bash +cd packages/shared +SETTINGS_DATABASE_URL="postgresql+asyncpg://user:pass@host:5432/db" uv run alembic upgrade head +``` ## CI/CD diff --git a/apps/api/pyproject.toml b/apps/api/pyproject.toml index b3fda22..122848c 100644 --- a/apps/api/pyproject.toml +++ b/apps/api/pyproject.toml @@ -3,13 +3,12 @@ name = "fastfetchbot-api" version = "0.1.0" requires-python = ">=3.12,<3.13" dependencies = [ - "fastfetchbot-shared[scrapers]", + "fastfetchbot-shared[scrapers,mongodb]", "fastapi>=0.115.12", "sentry-sdk[fastapi]>=2.27.0", "gunicorn>=23.0.0", "uvicorn>=0.34.2", "babel>=2.17.0", - "beanie>=1.29.0", "pillow>=10.0.0", "pydub>=0.25.1", "xhtml2pdf>=0.2.17", diff --git a/apps/api/src/config.py b/apps/api/src/config.py index 1ce4adc..119b649 100644 --- a/apps/api/src/config.py +++ b/apps/api/src/config.py @@ -28,6 +28,7 @@ class ApiSettings(BaseSettings): # MongoDB DATABASE_ON: bool = False + DATABASE_CACHE_TTL: int = 86400 # seconds; 0 = never expire MONGODB_PORT: int = 27017 MONGODB_HOST: str = "localhost" MONGODB_URL: str = "" diff --git a/apps/api/src/database.py b/apps/api/src/database.py index 40224f8..bea76dd 100644 --- a/apps/api/src/database.py +++ b/apps/api/src/database.py @@ -1,37 +1,14 @@ -from typing import Optional, Union, List - -from motor.motor_asyncio import 
AsyncIOMotorClient -from beanie import init_beanie, Document, Indexed - +from fastfetchbot_shared.database.mongodb import ( + init_mongodb, + close_mongodb, + save_instances, +) from src.config import settings -from src.models.database_model import document_list -from fastfetchbot_shared.utils.logger import logger async def startup() -> None: - client = AsyncIOMotorClient(settings.MONGODB_URL) - await init_beanie(database=client["telegram_bot"], document_models=document_list) + await init_mongodb(settings.MONGODB_URL) async def shutdown() -> None: - pass - - -async def save_instances(instances: Union[Document, List[Document]], *args) -> None: - if instances is None: - raise TypeError("instances must be a Model or a list of Model") - - if isinstance(instances, Document): - instance_type = type(instances) - await instance_type.insert(instances) - elif isinstance(instances, list): - instance_type = type(instances[0]) - await instance_type.insert_many(instances) - else: - raise TypeError("instances must be a Model or a list of Model") - - for arg in args: - if not isinstance(arg, Document): - raise TypeError("args must be a Model") - instance_type = type(arg) - await instance_type.insert_one(arg) + await close_mongodb() diff --git a/apps/api/src/models/database_model.py b/apps/api/src/models/database_model.py index 049756f..c96bd9e 100644 --- a/apps/api/src/models/database_model.py +++ b/apps/api/src/models/database_model.py @@ -1,41 +1,6 @@ -from typing import Optional, Any -from datetime import datetime +from fastfetchbot_shared.database.mongodb.models.metadata import ( + Metadata, + document_list, +) -from pydantic import BaseModel, Field -from beanie import Document, Indexed, Insert, after_event, before_event - -from fastfetchbot_shared.models.metadata_item import MediaFile, MessageType -from fastfetchbot_shared.utils.logger import logger -from fastfetchbot_shared.utils.parse import get_html_text_length - - -class Metadata(Document): - title: str = 
Field(default="untitled") - message_type: MessageType = MessageType.SHORT - url: str - author: Optional[str] = None - author_url: Optional[str] = None - text: Optional[str] = None - text_length: Optional[int] = Field(ge=0) - content: Optional[str] = None - content_length: Optional[int] = Field(ge=0) - category: Optional[str] = None - source: Optional[str] = None - media_files: Optional[list[MediaFile]] = None - telegraph_url: Optional[str] = None - timestamp: datetime = Field(default_factory=datetime.utcnow) - scrape_status: bool = False - - @before_event(Insert) - def get_text_length(self): - self.text_length = get_html_text_length(self.text) - self.content_length = get_html_text_length(self.content) - - # - @staticmethod - def from_dict(obj: Any) -> "Metadata": - assert isinstance(obj, dict) - return Metadata(**obj) - - -document_list = [Metadata] +__all__ = ["Metadata", "document_list"] diff --git a/apps/api/src/services/scrapers/common.py b/apps/api/src/services/scrapers/common.py index 2782dec..d159d7e 100644 --- a/apps/api/src/services/scrapers/common.py +++ b/apps/api/src/services/scrapers/common.py @@ -1,12 +1,10 @@ from typing import Optional, Any -from src.models.database_model import Metadata from fastfetchbot_shared.models.url_metadata import UrlMetadata from fastfetchbot_shared.models.metadata_item import MessageType from fastfetchbot_shared.services.scrapers.common import InfoExtractService as CoreInfoExtractService from fastfetchbot_shared.services.telegraph import Telegraph from src.services.file_export import video_download, document_export -from src.database import save_instances from fastfetchbot_shared.utils.logger import logger from src.config import settings @@ -27,20 +25,26 @@ def __init__( store_database: Optional[bool] = None, store_telegraph: Optional[bool] = True, store_document: Optional[bool] = False, + database_cache_ttl: Optional[int] = None, **kwargs, ): if store_database is None: store_database = settings.DATABASE_ON + if 
database_cache_ttl is None: + database_cache_ttl = settings.DATABASE_CACHE_TTL super().__init__( url_metadata, data=data, store_database=store_database, store_telegraph=store_telegraph, store_document=store_document, + database_cache_ttl=database_cache_ttl, **kwargs, ) async def process_item(self, metadata_item: dict) -> dict: + if metadata_item.pop("_cached", False): + return metadata_item if metadata_item.get("message_type") == MessageType.LONG: self.store_telegraph = True logger.info("message type is long, store in telegraph") @@ -73,5 +77,7 @@ async def process_item(self, metadata_item: dict) -> dict: metadata_item["title"] = metadata_item["title"].strip() if self.store_database: logger.info("store in database") - await save_instances(Metadata.model_construct(**metadata_item)) + from fastfetchbot_shared.database.mongodb.cache import save_metadata + + await save_metadata(metadata_item) return metadata_item diff --git a/apps/async-worker/async_worker/config.py b/apps/async-worker/async_worker/config.py index d5cb61b..63dca52 100644 --- a/apps/async-worker/async_worker/config.py +++ b/apps/async-worker/async_worker/config.py @@ -22,10 +22,13 @@ class AsyncWorkerSettings(BaseSettings): STORE_TELEGRAPH: bool = True STORE_DOCUMENT: bool = False DATABASE_ON: bool = False + DATABASE_CACHE_TTL: int = 86400 # seconds; 0 = never expire # MongoDB MONGODB_HOST: str = "localhost" MONGODB_PORT: int = 27017 + MONGODB_USERNAME: str = "" + MONGODB_PASSWORD: str = "" MONGODB_URL: str = "" # Timeout @@ -34,7 +37,10 @@ class AsyncWorkerSettings(BaseSettings): @model_validator(mode="after") def _resolve_derived(self) -> "AsyncWorkerSettings": if not self.MONGODB_URL: - self.MONGODB_URL = f"mongodb://{self.MONGODB_HOST}:{self.MONGODB_PORT}" + if self.MONGODB_USERNAME and self.MONGODB_PASSWORD: + self.MONGODB_URL = f"mongodb://{self.MONGODB_USERNAME}:{self.MONGODB_PASSWORD}@{self.MONGODB_HOST}:{self.MONGODB_PORT}" + else: + self.MONGODB_URL = 
f"mongodb://{self.MONGODB_HOST}:{self.MONGODB_PORT}" return self diff --git a/apps/async-worker/async_worker/main.py b/apps/async-worker/async_worker/main.py index be4f3bd..4f599b4 100644 --- a/apps/async-worker/async_worker/main.py +++ b/apps/async-worker/async_worker/main.py @@ -56,3 +56,17 @@ class WorkerSettings: # Health-check the Redis connection every 30s to prevent stale connections health_check_interval = 30 + + @staticmethod + async def on_startup(ctx: dict) -> None: + if settings.DATABASE_ON: + from fastfetchbot_shared.database.mongodb import init_mongodb + + await init_mongodb(settings.MONGODB_URL) + + @staticmethod + async def on_shutdown(ctx: dict) -> None: + if settings.DATABASE_ON: + from fastfetchbot_shared.database.mongodb import close_mongodb + + await close_mongodb() diff --git a/apps/async-worker/async_worker/services/enrichment.py b/apps/async-worker/async_worker/services/enrichment.py index 2254bcb..c33485e 100644 --- a/apps/async-worker/async_worker/services/enrichment.py +++ b/apps/async-worker/async_worker/services/enrichment.py @@ -10,6 +10,7 @@ async def enrich( metadata_item: dict, store_telegraph: bool | None = None, store_document: bool | None = None, + store_database: bool | None = None, ) -> dict: """Apply enrichment steps to a scraped metadata item. 
@@ -20,6 +21,8 @@ async def enrich( store_telegraph = settings.STORE_TELEGRAPH if store_document is None: store_document = settings.STORE_DOCUMENT + if store_database is None: + store_database = settings.DATABASE_ON # Force Telegraph for long messages if metadata_item.get("message_type") == MessageType.LONG: @@ -62,5 +65,14 @@ async def enrich( except Exception as e: logger.error(f"Error exporting PDF: {e}") + # MongoDB persistence (versioned) + if store_database: + try: + from fastfetchbot_shared.database.mongodb.cache import save_metadata + + await save_metadata(metadata_item) + except Exception as e: + logger.error(f"Error saving to MongoDB: {e}") + metadata_item["title"] = metadata_item["title"].strip() return metadata_item diff --git a/apps/async-worker/async_worker/tasks/scrape.py b/apps/async-worker/async_worker/tasks/scrape.py index e362608..43f2b33 100644 --- a/apps/async-worker/async_worker/tasks/scrape.py +++ b/apps/async-worker/async_worker/tasks/scrape.py @@ -20,6 +20,7 @@ async def scrape_and_enrich( bot_id: int | str | None = None, store_telegraph: bool | None = None, store_document: bool | None = None, + force_refresh_cache: bool = False, **kwargs, ) -> dict: """ARQ task: scrape a URL, enrich the result, and push to the outbox. @@ -36,6 +37,7 @@ async def scrape_and_enrich( bot's outbox queue (``scrape:outbox:{bot_id}``). store_telegraph: Override Telegraph publishing flag. store_document: Override PDF export flag. + force_refresh_cache: If True, bypass the database cache and re-scrape. **kwargs: Extra arguments passed to the scraper. 
""" if job_id is None: @@ -52,18 +54,21 @@ async def scrape_and_enrich( url_metadata=url_metadata, store_telegraph=False, # We handle enrichment separately store_document=False, + store_database=settings.DATABASE_ON, + database_cache_ttl=-1 if force_refresh_cache else settings.DATABASE_CACHE_TTL, celery_app=celery_app, timeout=settings.DOWNLOAD_VIDEO_TIMEOUT, **kwargs, ) metadata_item = await service.get_item() - # Enrich: Telegraph, PDF - metadata_item = await enrichment.enrich( - metadata_item, - store_telegraph=store_telegraph, - store_document=store_document, - ) + # Skip enrichment if result came from cache + if not metadata_item.pop("_cached", False): + metadata_item = await enrichment.enrich( + metadata_item, + store_telegraph=store_telegraph, + store_document=store_document, + ) logger.info(f"[{job_id}] Scrape completed successfully") diff --git a/apps/async-worker/pyproject.toml b/apps/async-worker/pyproject.toml index 6c4a00a..ffc8b80 100644 --- a/apps/async-worker/pyproject.toml +++ b/apps/async-worker/pyproject.toml @@ -3,11 +3,10 @@ name = "fastfetchbot-async-worker" version = "0.1.0" requires-python = ">=3.12,<3.13" dependencies = [ - "fastfetchbot-shared[scrapers]", + "fastfetchbot-shared[scrapers,mongodb]", "arq>=0.26.1", "redis[hiredis]>=5.0.0", "celery[redis]>=5.4.0", - "beanie>=1.29.0", ] [build-system] diff --git a/apps/telegram-bot/core/handlers/buttons.py b/apps/telegram-bot/core/handlers/buttons.py index 5d85149..c5f0578 100644 --- a/apps/telegram-bot/core/handlers/buttons.py +++ b/apps/telegram-bot/core/handlers/buttons.py @@ -10,6 +10,7 @@ from fastfetchbot_shared.models.metadata_item import MessageType from core import api_client from core.services.message_sender import send_item_message +from core.services.user_settings import get_force_refresh_cache from fastfetchbot_shared.utils.logger import logger from core.config import settings, TELEGRAM_CHANNEL_ID @@ -57,6 +58,11 @@ async def buttons_process(update: Update, context: 
CallbackContext) -> None: await query.answer("Video processing...") extra_args = data["extra_args"] if "extra_args" in data else {} + # Look up user's force_refresh_cache preference + force_refresh = await get_force_refresh_cache(query.from_user.id) + if force_refresh: + extra_args["force_refresh_cache"] = True + if settings.SCRAPE_MODE == "queue": from core import queue_client diff --git a/apps/telegram-bot/core/handlers/commands.py b/apps/telegram-bot/core/handlers/commands.py index fa6ec01..7bc0be1 100644 --- a/apps/telegram-bot/core/handlers/commands.py +++ b/apps/telegram-bot/core/handlers/commands.py @@ -5,6 +5,8 @@ ensure_user_settings, get_auto_fetch_in_dm, toggle_auto_fetch_in_dm, + get_force_refresh_cache, + toggle_force_refresh_cache, ) @@ -26,10 +28,11 @@ async def settings_command(update: Update, context: CallbackContext) -> None: user_id = update.effective_user.id await ensure_user_settings(user_id) auto_fetch = await get_auto_fetch_in_dm(user_id) + force_refresh = await get_force_refresh_cache(user_id) - keyboard = _build_settings_keyboard(auto_fetch) + keyboard = _build_settings_keyboard(auto_fetch, force_refresh) await update.message.reply_text( - text=_build_settings_text(auto_fetch), + text=_build_settings_text(auto_fetch, force_refresh), reply_markup=InlineKeyboardMarkup(keyboard), ) @@ -40,44 +43,61 @@ async def settings_callback(update: Update, context: CallbackContext) -> None: await query.answer() data = query.data + user_id = update.effective_user.id if data == "settings:close": await query.message.delete() return - if data != "settings:toggle_auto_fetch": + if data == "settings:toggle_auto_fetch": + await toggle_auto_fetch_in_dm(user_id) + elif data == "settings:toggle_force_refresh": + await toggle_force_refresh_cache(user_id) + else: return - user_id = update.effective_user.id - new_value = await toggle_auto_fetch_in_dm(user_id) + auto_fetch = await get_auto_fetch_in_dm(user_id) + force_refresh = await get_force_refresh_cache(user_id) - 
keyboard = _build_settings_keyboard(new_value) + keyboard = _build_settings_keyboard(auto_fetch, force_refresh) await query.edit_message_text( - text=_build_settings_text(new_value), + text=_build_settings_text(auto_fetch, force_refresh), reply_markup=InlineKeyboardMarkup(keyboard), ) -def _build_settings_keyboard(auto_fetch: bool) -> list[list[InlineKeyboardButton]]: - status = "ON" if auto_fetch else "OFF" +def _build_settings_keyboard( + auto_fetch: bool, force_refresh: bool +) -> list[list[InlineKeyboardButton]]: + auto_fetch_status = "ON" if auto_fetch else "OFF" + force_refresh_status = "ON" if force_refresh else "OFF" return [ [ InlineKeyboardButton( - f"Auto-fetch in DM: {status}", + f"Auto-fetch in DM: {auto_fetch_status}", callback_data="settings:toggle_auto_fetch", ) ], + [ + InlineKeyboardButton( + f"Force refresh cache: {force_refresh_status}", + callback_data="settings:toggle_force_refresh", + ) + ], [ InlineKeyboardButton("Close", callback_data="settings:close"), ], ] -def _build_settings_text(auto_fetch: bool) -> str: - status = "enabled" if auto_fetch else "disabled" +def _build_settings_text(auto_fetch: bool, force_refresh: bool) -> str: + auto_fetch_status = "enabled" if auto_fetch else "disabled" + force_refresh_status = "enabled" if force_refresh else "disabled" return ( f"Your Settings\n\n" - f"Auto-fetch in DM: {status}\n" + f"Auto-fetch in DM: {auto_fetch_status}\n" f"When enabled, URLs sent in private chat will be automatically processed.\n" - f"When disabled, you will see action buttons to choose how to process each URL." + f"When disabled, you will see action buttons to choose how to process each URL.\n\n" + f"Force refresh cache: {force_refresh_status}\n" + f"When enabled, cached results are ignored and content is always re-scraped." 
) diff --git a/apps/telegram-bot/core/handlers/url_process.py b/apps/telegram-bot/core/handlers/url_process.py index a3a962a..5d39241 100644 --- a/apps/telegram-bot/core/handlers/url_process.py +++ b/apps/telegram-bot/core/handlers/url_process.py @@ -8,7 +8,7 @@ ) from core.services.message_sender import send_item_message -from core.services.user_settings import get_auto_fetch_in_dm +from core.services.user_settings import get_auto_fetch_in_dm, get_force_refresh_cache from fastfetchbot_shared.utils.config import SOCIAL_MEDIA_WEBSITE_PATTERNS, VIDEO_WEBSITE_PATTERNS from fastfetchbot_shared.utils.logger import logger from core.config import ( @@ -79,10 +79,12 @@ async def _fetch_and_send( async def https_url_process(update: Update, context: CallbackContext) -> None: message = update.message - # Check user's auto-fetch preference - auto_fetch = await get_auto_fetch_in_dm(message.from_user.id) + # Check user's preferences + user_id = message.from_user.id + auto_fetch = await get_auto_fetch_in_dm(user_id) + force_refresh = await get_force_refresh_cache(user_id) if auto_fetch: - await _auto_fetch_urls(message) + await _auto_fetch_urls(message, force_refresh_cache=force_refresh) return welcome_message = await message.reply_text( @@ -110,6 +112,7 @@ async def https_url_process(update: Update, context: CallbackContext) -> None: chat_id=message.chat_id, source=url_metadata.get("source", ""), content_type=url_metadata.get("content_type", ""), + force_refresh_cache=force_refresh, ) await process_message.edit_text( text=f"For the {i + 1} th url, no supported url found." 
@@ -261,7 +264,7 @@ async def https_url_process(update: Update, context: CallbackContext) -> None: await process_message.delete() -async def _auto_fetch_urls(message) -> None: +async def _auto_fetch_urls(message, force_refresh_cache: bool = False) -> None: """Auto-fetch all URLs in a DM message without showing action buttons.""" url_dict = message.parse_entities(types=["url"]) for i, url in enumerate(url_dict.values()): @@ -274,6 +277,7 @@ async def _auto_fetch_urls(message) -> None: chat_id=message.chat_id, source=url_metadata.get("source", ""), content_type=url_metadata.get("content_type", ""), + force_refresh_cache=force_refresh_cache, ) elif url_metadata["source"] == "unknown" or url_metadata["source"] == "banned": logger.debug(f"for the {i + 1}th url {url}, no supported url found.") @@ -284,6 +288,7 @@ async def _auto_fetch_urls(message) -> None: chat_id=message.chat_id, source=url_metadata.get("source", ""), content_type=url_metadata.get("content_type", ""), + force_refresh_cache=force_refresh_cache, ) if url_metadata.get("source") in VIDEO_WEBSITE_PATTERNS.keys(): await _fetch_and_send( @@ -291,6 +296,7 @@ async def _auto_fetch_urls(message) -> None: chat_id=message.chat_id, source=url_metadata.get("source", ""), content_type=url_metadata.get("content_type", ""), + force_refresh_cache=force_refresh_cache, ) diff --git a/apps/telegram-bot/core/services/user_settings.py b/apps/telegram-bot/core/services/user_settings.py index 9d17c0b..969303a 100644 --- a/apps/telegram-bot/core/services/user_settings.py +++ b/apps/telegram-bot/core/services/user_settings.py @@ -50,3 +50,32 @@ async def toggle_auto_fetch_in_dm(user_id: int) -> bool: else: user_setting.auto_fetch_in_dm = not user_setting.auto_fetch_in_dm return user_setting.auto_fetch_in_dm + + +async def get_force_refresh_cache(user_id: int) -> bool: + """Return the user's force_refresh_cache preference. 
Defaults to False.""" + async with get_session() as session: + result = await session.execute( + select(UserSetting.force_refresh_cache).where( + UserSetting.telegram_user_id == user_id + ) + ) + value = result.scalar_one_or_none() + return value if value is not None else False + + +async def toggle_force_refresh_cache(user_id: int) -> bool: + """Toggle force_refresh_cache for the given user. Returns the new value.""" + async with get_session() as session: + result = await session.execute( + select(UserSetting).where(UserSetting.telegram_user_id == user_id) + ) + user_setting = result.scalar_one_or_none() + if user_setting is None: + user_setting = UserSetting( + telegram_user_id=user_id, force_refresh_cache=True + ) + session.add(user_setting) + else: + user_setting.force_refresh_cache = not user_setting.force_refresh_cache + return user_setting.force_refresh_cache diff --git a/docker-compose.template.yml b/docker-compose.template.yml index fd79123..d552921 100644 --- a/docker-compose.template.yml +++ b/docker-compose.template.yml @@ -19,6 +19,7 @@ services: depends_on: - telegram-bot-api - redis + # - mongodb # Uncomment when using MongoDB for scraped content caching telegram-bot: image: ghcr.io/aturret/fastfetchbot-telegram-bot:latest @@ -86,6 +87,22 @@ services: ports: - 6379:6379 + # Uncomment to enable MongoDB for scraped content caching. + # Set DATABASE_ON=true in your .env file to activate the cache system. + # Also set MONGODB_URL=mongodb://fastfetchbot:fastfetchbot@mongodb:27017 + # or set MONGODB_HOST=mongodb, MONGODB_USERNAME=fastfetchbot, MONGODB_PASSWORD=fastfetchbot. 
+ # mongodb: + # image: mongo:7-jammy + # container_name: fastfetchbot-mongodb + # restart: always + # volumes: + # - mongodb_data:/data/db + # environment: + # - MONGO_INITDB_ROOT_USERNAME=fastfetchbot + # - MONGO_INITDB_ROOT_PASSWORD=fastfetchbot + # ports: + # - 27017:27017 + async-worker: image: ghcr.io/aturret/fastfetchbot-async-worker:latest # build: @@ -104,6 +121,7 @@ services: - ./conf:/app/conf depends_on: - redis + # - mongodb # Uncomment when using MongoDB for scraped content caching worker: image: ghcr.io/aturret/fastfetchbot-worker:latest @@ -129,5 +147,6 @@ volumes: telegram-bot-api-data-cache: telegram_bot_data: # postgres_data: # Uncomment when using PostgreSQL + # mongodb_data: # Uncomment when using MongoDB redis_data: shared_files: diff --git a/packages/shared/alembic/versions/0002_add_force_refresh_cache_column.py b/packages/shared/alembic/versions/0002_add_force_refresh_cache_column.py new file mode 100644 index 0000000..eab59e7 --- /dev/null +++ b/packages/shared/alembic/versions/0002_add_force_refresh_cache_column.py @@ -0,0 +1,32 @@ +"""add force_refresh_cache column to user_settings + +Revision ID: 0002 +Revises: 0001 +Create Date: 2026-03-29 + +""" +from typing import Sequence, Union + +from alembic import op +import sqlalchemy as sa + +revision: str = "0002" +down_revision: Union[str, None] = "0001" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + op.add_column( + "user_settings", + sa.Column( + "force_refresh_cache", + sa.Boolean(), + server_default="0", + nullable=False, + ), + ) + + +def downgrade() -> None: + op.drop_column("user_settings", "force_refresh_cache") diff --git a/packages/shared/fastfetchbot_shared/database/models/user_setting.py b/packages/shared/fastfetchbot_shared/database/models/user_setting.py index 2c2d1d3..65cbff3 100644 --- a/packages/shared/fastfetchbot_shared/database/models/user_setting.py +++ 
b/packages/shared/fastfetchbot_shared/database/models/user_setting.py @@ -15,6 +15,9 @@ class UserSetting(Base): auto_fetch_in_dm: Mapped[bool] = mapped_column( Boolean, default=True, server_default="1" ) + force_refresh_cache: Mapped[bool] = mapped_column( + Boolean, default=False, server_default="0" + ) created_at: Mapped[datetime] = mapped_column( DateTime(timezone=True), default=lambda: datetime.now(timezone.utc), diff --git a/packages/shared/fastfetchbot_shared/database/mongodb/__init__.py b/packages/shared/fastfetchbot_shared/database/mongodb/__init__.py new file mode 100644 index 0000000..3e1f87b --- /dev/null +++ b/packages/shared/fastfetchbot_shared/database/mongodb/__init__.py @@ -0,0 +1,23 @@ +from fastfetchbot_shared.database.mongodb.connection import ( + init_mongodb, + close_mongodb, + save_instances, +) +from fastfetchbot_shared.database.mongodb.models.metadata import ( + DatabaseMediaFile, + Metadata, +) +from fastfetchbot_shared.database.mongodb.cache import ( + find_cached, + save_metadata, +) + +__all__ = [ + "init_mongodb", + "close_mongodb", + "save_instances", + "find_cached", + "save_metadata", + "DatabaseMediaFile", + "Metadata", +] diff --git a/packages/shared/fastfetchbot_shared/database/mongodb/cache.py b/packages/shared/fastfetchbot_shared/database/mongodb/cache.py new file mode 100644 index 0000000..3684332 --- /dev/null +++ b/packages/shared/fastfetchbot_shared/database/mongodb/cache.py @@ -0,0 +1,79 @@ +"""MongoDB cache layer for scraped metadata. + +Provides URL-based cache lookup with TTL support and versioned saves. +""" + +from datetime import datetime, timedelta +from typing import Optional + +from fastfetchbot_shared.database.mongodb.models.metadata import Metadata +from fastfetchbot_shared.utils.logger import logger + + +async def find_cached(url: str, ttl_seconds: int) -> Optional[Metadata]: + """Look up the latest cached Metadata document for a URL. + + Args: + url: The URL to look up. + ttl_seconds: Maximum age in seconds. 
``0`` disables expiry + (always returns the cached document if it exists). + + Returns: + The cached Metadata document, or ``None`` if no valid cache entry exists. + """ + doc = await ( + Metadata.find(Metadata.url == url) + .sort(-Metadata.version) + .limit(1) + .first_or_none() + ) + if doc is None: + return None + + # ttl_seconds == 0 means never expire + if ttl_seconds != 0: + age = datetime.utcnow() - doc.timestamp + if age > timedelta(seconds=ttl_seconds): + logger.info( + f"Cache expired for {url} (age={age}, ttl={ttl_seconds}s)" + ) + return None + + logger.info(f"Cache hit for {url} (version={doc.version})") + return doc + + +async def save_metadata(metadata_item: dict) -> Metadata: + """Insert a new Metadata document with auto-incremented version. + + If a document with the same URL already exists, the new document's + version is set to ``latest_version + 1``. Otherwise it starts at 1. + + Args: + metadata_item: Scraper output dict (MetadataItem fields). + Must contain a non-empty ``url`` key. + + Returns: + The inserted Metadata document. + + Raises: + ValueError: If ``url`` is missing or empty. 
+ """ + url = metadata_item.get("url", "") + if not url or not url.strip(): + raise ValueError("metadata_item must contain a non-empty 'url'") + + latest = await ( + Metadata.find(Metadata.url == url) + .sort(-Metadata.version) + .limit(1) + .first_or_none() + ) + new_version = (latest.version + 1) if latest else 1 + metadata_item["version"] = new_version + + doc = Metadata.model_construct(**metadata_item) + await Metadata.insert(doc) + + logger.info(f"Saved metadata for {url} (version={new_version})") + return doc diff --git a/packages/shared/fastfetchbot_shared/database/mongodb/connection.py b/packages/shared/fastfetchbot_shared/database/mongodb/connection.py new file mode 100644 index 0000000..110e2c3 --- /dev/null +++ b/packages/shared/fastfetchbot_shared/database/mongodb/connection.py @@ -0,0 +1,46 @@ +from typing import Union, List + +from motor.motor_asyncio import AsyncIOMotorClient +from beanie import init_beanie, Document + +from fastfetchbot_shared.database.mongodb.models.metadata import document_list +from fastfetchbot_shared.utils.logger import logger + +_client: AsyncIOMotorClient | None = None + + +async def init_mongodb(mongodb_url: str, db_name: str = "telegram_bot") -> None: + global _client + _client = AsyncIOMotorClient(mongodb_url) + await init_beanie(database=_client[db_name], document_models=document_list) + logger.info(f"MongoDB initialized: {db_name}") + + +async def close_mongodb() -> None: + global _client + if _client is not None: + _client.close() + _client = None + logger.info("MongoDB connection closed") + + +async def save_instances(instances: Union[Document, List[Document]], *args) -> None: + if instances is None: + raise TypeError("instances must be a Model or a list of Model") + + if isinstance(instances, Document): + instance_type = type(instances) + await instance_type.insert(instances) + elif isinstance(instances, list): + if not instances: + return + instance_type = type(instances[0]) + await 
instance_type.insert_many(instances) + else: + raise TypeError("instances must be a Model or a list of Model") + + for arg in args: + if not isinstance(arg, Document): + raise TypeError("args must be a Model") + instance_type = type(arg) + await instance_type.insert_one(arg) diff --git a/packages/shared/fastfetchbot_shared/database/mongodb/models/__init__.py b/packages/shared/fastfetchbot_shared/database/mongodb/models/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/packages/shared/fastfetchbot_shared/database/mongodb/models/metadata.py b/packages/shared/fastfetchbot_shared/database/mongodb/models/metadata.py new file mode 100644 index 0000000..cd893a8 --- /dev/null +++ b/packages/shared/fastfetchbot_shared/database/mongodb/models/metadata.py @@ -0,0 +1,62 @@ +from typing import Optional, Any +from datetime import datetime + +from pydantic import Field +from pydantic.dataclasses import dataclass as pydantic_dataclass +from beanie import Document, Insert, before_event +from pymongo import DESCENDING + +from fastfetchbot_shared.models.metadata_item import MediaFile, MessageType +from fastfetchbot_shared.utils.logger import logger +from fastfetchbot_shared.utils.parse import get_html_text_length + + +@pydantic_dataclass +class DatabaseMediaFile(MediaFile): + """Media file for MongoDB storage. 
Inherits all MediaFile fields and adds
+    ``file_key`` for the S3 object key after upload."""
+
+    file_key: Optional[str] = None
+
+
+class Metadata(Document):
+    title: str = Field(default="untitled")
+    message_type: MessageType = MessageType.SHORT
+    url: str
+    author: Optional[str] = None
+    author_url: Optional[str] = None
+    text: Optional[str] = None
+    text_length: Optional[int] = Field(default=None, ge=0)
+    content: Optional[str] = None
+    content_length: Optional[int] = Field(default=None, ge=0)
+    category: Optional[str] = None
+    source: Optional[str] = None
+    media_files: Optional[list[DatabaseMediaFile]] = None
+    telegraph_url: Optional[str] = None
+    timestamp: datetime = Field(default_factory=datetime.utcnow)
+    scrape_status: bool = False
+    version: int = Field(default=1, ge=1)
+
+    class Settings:
+        indexes = [
+            [("url", DESCENDING), ("version", DESCENDING)],
+        ]
+
+    @before_event(Insert)
+    def prepare_for_insert(self):
+        self.text_length = get_html_text_length(self.text)
+        self.content_length = get_html_text_length(self.content)
+        if self.media_files:
+            self.media_files = [
+                item if isinstance(item, DatabaseMediaFile)
+                else DatabaseMediaFile(**(item if isinstance(item, dict) else item.__dict__))
+                for item in self.media_files
+            ]
+
+    @staticmethod
+    def from_dict(obj: Any) -> "Metadata":
+        assert isinstance(obj, dict)
+        return Metadata(**obj)
+
+
+document_list = [Metadata]
diff --git a/packages/shared/fastfetchbot_shared/services/scrapers/common.py b/packages/shared/fastfetchbot_shared/services/scrapers/common.py
index f4e6e4f..f1d3570 100644
--- a/packages/shared/fastfetchbot_shared/services/scrapers/common.py
+++ b/packages/shared/fastfetchbot_shared/services/scrapers/common.py
@@ -54,6 +54,7 @@ def __init__(
         store_database: Optional[bool] = False,
         store_telegraph: Optional[bool] = True,
         store_document: Optional[bool] = False,
+        database_cache_ttl: int = -1,
         **kwargs,
     ):
         url_metadata = url_metadata.to_dict()
@@ -65,6 +66,7 @@ def __init__(
         self.store_database = store_database
self.store_telegraph = store_telegraph self.store_document = store_document + self.database_cache_ttl = database_cache_ttl @property def category(self) -> str: @@ -79,6 +81,20 @@ def _resolve_scraper_class(self, category: str): raise ScraperError(f"No scraper registered for category: {category}") async def get_item(self, metadata_item: Optional[dict] = None) -> dict: + # Cache lookup: skip scraping entirely if a valid cached document exists + if self.store_database and self.database_cache_ttl >= 0: + try: + from fastfetchbot_shared.database.mongodb.cache import find_cached + + cached = await find_cached(self.url, self.database_cache_ttl) + if cached is not None: + logger.info("Cache hit, returning cached metadata") + result = cached.model_dump(mode="json", exclude={"id"}) + result["_cached"] = True + return result + except Exception as e: + logger.error(f"Cache lookup failed, proceeding with scrape: {e}") + if not metadata_item: try: if self.category in ["bluesky", "weibo", "other", "unknown"]: diff --git a/packages/shared/pyproject.toml b/packages/shared/pyproject.toml index 2ab8616..08e316f 100644 --- a/packages/shared/pyproject.toml +++ b/packages/shared/pyproject.toml @@ -21,6 +21,7 @@ dependencies = [ [project.optional-dependencies] postgres = ["asyncpg>=0.30.0"] migrate = ["alembic>=1.15.0"] +mongodb = ["beanie>=1.29.0", "motor>=3.3.0"] scrapers = [ "jinja2>=3.1.6", "jmespath>=1.0.1", diff --git a/template.env b/template.env index 4d95693..ee034bd 100644 --- a/template.env +++ b/template.env @@ -158,6 +158,29 @@ ZYTE_API_KEY= # PostgreSQL: postgresql+asyncpg://user:password@host:5432/dbname SETTINGS_DATABASE_URL=sqlite+aiosqlite:///data/fastfetchbot.db +# MongoDB (Scraped Content Cache) +# Enable MongoDB storage and caching of scraped metadata. Default: `false` +DATABASE_ON=false + +# Cache TTL in seconds. Cached results older than this are re-scraped. +# Set to 0 to never expire (always use cache). 
Default: `86400` (24 hours) +DATABASE_CACHE_TTL=86400 + +# MongoDB host. Default: `localhost`. Use `mongodb` in Docker. +MONGODB_HOST=localhost + +# MongoDB port. Default: `27017` +MONGODB_PORT=27017 + +# MongoDB credentials (used by async worker to build the connection URL). +# Leave empty for unauthenticated connections. +MONGODB_USERNAME= +MONGODB_PASSWORD= + +# Full MongoDB connection URI. Overrides MONGODB_HOST/PORT/USERNAME/PASSWORD if set. +# Example: mongodb://user:password@host:27017 +MONGODB_URL= + # Celery Worker # Redis URL for Celery message broker. Default: `redis://localhost:6379/0` CELERY_BROKER_URL=redis://redis:6379/0 diff --git a/tests/unit/async_worker/test_enrichment_database.py b/tests/unit/async_worker/test_enrichment_database.py new file mode 100644 index 0000000..6999159 --- /dev/null +++ b/tests/unit/async_worker/test_enrichment_database.py @@ -0,0 +1,124 @@ +"""Tests for MongoDB persistence in enrichment.enrich(). + +Covers: store_database parameter, save_metadata call, error handling. +""" + +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest + +from async_worker.services.enrichment import enrich + + +# --------------------------------------------------------------------------- +# Fixtures +# --------------------------------------------------------------------------- + + +@pytest.fixture +def base_metadata_item(): + """Minimal metadata item dict for enrichment.""" + return { + "title": " Test Title ", + "content": "
Test content
", + "telegraph_url": "https://existing.url", + "media_files": [], + "message_type": "short", + } + + +# --------------------------------------------------------------------------- +# store_database +# --------------------------------------------------------------------------- + + +class TestEnrichStoreDatabase: + @pytest.mark.asyncio + async def test_saves_to_mongodb_when_store_database_true( + self, base_metadata_item + ): + with patch( + "fastfetchbot_shared.database.mongodb.cache.save_metadata", + new_callable=AsyncMock, + ) as mock_save: + result = await enrich( + base_metadata_item, + store_telegraph=False, + store_document=False, + store_database=True, + ) + + mock_save.assert_awaited_once_with(base_metadata_item) + + @pytest.mark.asyncio + async def test_skips_mongodb_when_store_database_false( + self, base_metadata_item + ): + with patch( + "fastfetchbot_shared.database.mongodb.cache.save_metadata", + new_callable=AsyncMock, + ) as mock_save: + await enrich( + base_metadata_item, + store_telegraph=False, + store_document=False, + store_database=False, + ) + + mock_save.assert_not_awaited() + + @pytest.mark.asyncio + async def test_mongodb_error_does_not_crash(self, base_metadata_item): + with patch( + "fastfetchbot_shared.database.mongodb.cache.save_metadata", + new_callable=AsyncMock, + side_effect=RuntimeError("MongoDB connection failed"), + ): + # Should not raise + result = await enrich( + base_metadata_item, + store_telegraph=False, + store_document=False, + store_database=True, + ) + + assert result["title"] == "Test Title" + + @pytest.mark.asyncio + async def test_store_database_defaults_to_settings(self, base_metadata_item): + """When store_database is None, it should use settings.DATABASE_ON.""" + from async_worker.config import settings as aw_settings + + with patch.object(aw_settings, "DATABASE_ON", True), \ + patch( + "fastfetchbot_shared.database.mongodb.cache.save_metadata", + new_callable=AsyncMock, + ) as mock_save: + await enrich( + 
base_metadata_item, + store_telegraph=False, + store_document=False, + store_database=None, # should fall back to settings.DATABASE_ON + ) + + mock_save.assert_awaited_once() + + @pytest.mark.asyncio + async def test_store_database_defaults_false_from_settings( + self, base_metadata_item + ): + from async_worker.config import settings as aw_settings + + with patch.object(aw_settings, "DATABASE_ON", False), \ + patch( + "fastfetchbot_shared.database.mongodb.cache.save_metadata", + new_callable=AsyncMock, + ) as mock_save: + await enrich( + base_metadata_item, + store_telegraph=False, + store_document=False, + store_database=None, + ) + + mock_save.assert_not_awaited() diff --git a/tests/unit/async_worker/test_scrape_cache.py b/tests/unit/async_worker/test_scrape_cache.py new file mode 100644 index 0000000..6d70a1c --- /dev/null +++ b/tests/unit/async_worker/test_scrape_cache.py @@ -0,0 +1,209 @@ +"""Tests for cache-related behavior in scrape_and_enrich task. + +Covers: force_refresh_cache parameter, _cached flag handling, database_cache_ttl passthrough. +""" + +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest + +from async_worker.tasks.scrape import scrape_and_enrich + + +# --------------------------------------------------------------------------- +# Fixtures +# --------------------------------------------------------------------------- + + +@pytest.fixture +def ctx(): + """ARQ worker context dict.""" + return {"redis": MagicMock()} + + +@pytest.fixture +def mock_outbox(): + """Patch outbox.push in the scrape module.""" + with patch("async_worker.tasks.scrape.outbox") as mock_mod: + mock_mod.push = AsyncMock() + yield mock_mod + + +@pytest.fixture +def mock_enrichment(): + """Patch enrichment.enrich in the scrape module.""" + with patch("async_worker.tasks.scrape.enrichment") as mock_mod: + mock_mod.enrich = AsyncMock( + return_value={ + "title": "Enriched", + "content": "hi
", + "media_files": [], + "telegraph_url": "https://telegra.ph/test", + } + ) + yield mock_mod + + +# --------------------------------------------------------------------------- +# force_refresh_cache → database_cache_ttl +# --------------------------------------------------------------------------- + + +class TestForceRefreshCache: + @pytest.mark.asyncio + async def test_force_refresh_sets_ttl_negative_one( + self, ctx, mock_outbox, mock_enrichment + ): + """When force_refresh_cache=True, database_cache_ttl should be -1.""" + with patch("async_worker.tasks.scrape.InfoExtractService") as MockCls: + instance = AsyncMock() + instance.get_item = AsyncMock( + return_value={"title": "Test", "content": "", "media_files": []} + ) + MockCls.return_value = instance + + await scrape_and_enrich( + ctx, + url="https://example.com", + chat_id=1, + force_refresh_cache=True, + ) + + call_kwargs = MockCls.call_args.kwargs + assert call_kwargs["database_cache_ttl"] == -1 + + @pytest.mark.asyncio + async def test_no_force_refresh_uses_config_ttl( + self, ctx, mock_outbox, mock_enrichment + ): + """When force_refresh_cache=False, use settings.DATABASE_CACHE_TTL.""" + with patch("async_worker.tasks.scrape.InfoExtractService") as MockCls, \ + patch("async_worker.tasks.scrape.settings") as mock_settings: + mock_settings.DATABASE_ON = True + mock_settings.DATABASE_CACHE_TTL = 86400 + mock_settings.DOWNLOAD_VIDEO_TIMEOUT = 60 + + instance = AsyncMock() + instance.get_item = AsyncMock( + return_value={"title": "Test", "content": "", "media_files": []} + ) + MockCls.return_value = instance + + await scrape_and_enrich( + ctx, + url="https://example.com", + chat_id=1, + force_refresh_cache=False, + ) + + call_kwargs = MockCls.call_args.kwargs + assert call_kwargs["database_cache_ttl"] == 86400 + + @pytest.mark.asyncio + async def test_default_force_refresh_is_false( + self, ctx, mock_outbox, mock_enrichment + ): + """force_refresh_cache defaults to False.""" + with 
patch("async_worker.tasks.scrape.InfoExtractService") as MockCls, \ + patch("async_worker.tasks.scrape.settings") as mock_settings: + mock_settings.DATABASE_ON = True + mock_settings.DATABASE_CACHE_TTL = 3600 + mock_settings.DOWNLOAD_VIDEO_TIMEOUT = 60 + + instance = AsyncMock() + instance.get_item = AsyncMock( + return_value={"title": "Test", "content": "", "media_files": []} + ) + MockCls.return_value = instance + + await scrape_and_enrich( + ctx, + url="https://example.com", + chat_id=1, + # force_refresh_cache not passed + ) + + call_kwargs = MockCls.call_args.kwargs + assert call_kwargs["database_cache_ttl"] == 3600 + + +# --------------------------------------------------------------------------- +# _cached flag → skip enrichment +# --------------------------------------------------------------------------- + + +class TestCachedFlagHandling: + @pytest.mark.asyncio + async def test_cached_result_skips_enrichment( + self, ctx, mock_outbox + ): + """When get_item returns _cached=True, enrichment should NOT be called.""" + with patch("async_worker.tasks.scrape.InfoExtractService") as MockCls, \ + patch("async_worker.tasks.scrape.enrichment") as mock_enrich: + instance = AsyncMock() + instance.get_item = AsyncMock( + return_value={ + "title": "Cached", + "content": "", + "media_files": [], + "_cached": True, + } + ) + MockCls.return_value = instance + mock_enrich.enrich = AsyncMock() + + result = await scrape_and_enrich( + ctx, url="https://example.com", chat_id=1 + ) + + mock_enrich.enrich.assert_not_awaited() + assert result["status"] == "success" + + @pytest.mark.asyncio + async def test_cached_flag_popped_from_result( + self, ctx, mock_outbox + ): + """The _cached flag should be popped (removed) from the metadata_item.""" + with patch("async_worker.tasks.scrape.InfoExtractService") as MockCls, \ + patch("async_worker.tasks.scrape.enrichment") as mock_enrich: + metadata = { + "title": "Cached", + "content": "", + "media_files": [], + "_cached": True, + } + 
instance = AsyncMock() + instance.get_item = AsyncMock(return_value=metadata) + MockCls.return_value = instance + mock_enrich.enrich = AsyncMock() + + await scrape_and_enrich( + ctx, url="https://example.com", chat_id=1 + ) + + # The outbox should receive metadata without _cached + outbox_call_kwargs = mock_outbox.push.call_args.kwargs + pushed_item = outbox_call_kwargs["metadata_item"] + assert "_cached" not in pushed_item + + @pytest.mark.asyncio + async def test_non_cached_result_runs_enrichment( + self, ctx, mock_outbox, mock_enrichment + ): + """When _cached is not in result, enrichment should be called.""" + with patch("async_worker.tasks.scrape.InfoExtractService") as MockCls: + instance = AsyncMock() + instance.get_item = AsyncMock( + return_value={ + "title": "Fresh", + "content": "", + "media_files": [], + } + ) + MockCls.return_value = instance + + await scrape_and_enrich( + ctx, url="https://example.com", chat_id=1 + ) + + mock_enrichment.enrich.assert_awaited_once() diff --git a/tests/unit/async_worker/test_worker_lifecycle.py b/tests/unit/async_worker/test_worker_lifecycle.py new file mode 100644 index 0000000..fbc4f09 --- /dev/null +++ b/tests/unit/async_worker/test_worker_lifecycle.py @@ -0,0 +1,74 @@ +"""Tests for WorkerSettings on_startup / on_shutdown MongoDB lifecycle hooks.""" + +from unittest.mock import AsyncMock, patch + +import pytest + +from async_worker.main import WorkerSettings + + +# --------------------------------------------------------------------------- +# on_startup +# --------------------------------------------------------------------------- + + +class TestWorkerStartup: + @pytest.mark.asyncio + async def test_init_mongodb_called_when_database_on(self): + with patch("async_worker.main.settings") as mock_settings, \ + patch( + "fastfetchbot_shared.database.mongodb.init_mongodb", + new_callable=AsyncMock, + ) as mock_init: + mock_settings.DATABASE_ON = True + mock_settings.MONGODB_URL = "mongodb://localhost:27017" + + await 
WorkerSettings.on_startup({}) + + mock_init.assert_awaited_once_with("mongodb://localhost:27017") + + @pytest.mark.asyncio + async def test_init_mongodb_skipped_when_database_off(self): + with patch("async_worker.main.settings") as mock_settings, \ + patch( + "fastfetchbot_shared.database.mongodb.init_mongodb", + new_callable=AsyncMock, + ) as mock_init: + mock_settings.DATABASE_ON = False + + await WorkerSettings.on_startup({}) + + mock_init.assert_not_awaited() + + +# --------------------------------------------------------------------------- +# on_shutdown +# --------------------------------------------------------------------------- + + +class TestWorkerShutdown: + @pytest.mark.asyncio + async def test_close_mongodb_called_when_database_on(self): + with patch("async_worker.main.settings") as mock_settings, \ + patch( + "fastfetchbot_shared.database.mongodb.close_mongodb", + new_callable=AsyncMock, + ) as mock_close: + mock_settings.DATABASE_ON = True + + await WorkerSettings.on_shutdown({}) + + mock_close.assert_awaited_once() + + @pytest.mark.asyncio + async def test_close_mongodb_skipped_when_database_off(self): + with patch("async_worker.main.settings") as mock_settings, \ + patch( + "fastfetchbot_shared.database.mongodb.close_mongodb", + new_callable=AsyncMock, + ) as mock_close: + mock_settings.DATABASE_ON = False + + await WorkerSettings.on_shutdown({}) + + mock_close.assert_not_awaited() diff --git a/tests/unit/database/__init__.py b/tests/unit/database/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/unit/database/mongodb/__init__.py b/tests/unit/database/mongodb/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/unit/database/mongodb/test_cache.py b/tests/unit/database/mongodb/test_cache.py new file mode 100644 index 0000000..0bdb549 --- /dev/null +++ b/tests/unit/database/mongodb/test_cache.py @@ -0,0 +1,196 @@ +"""Tests for packages/shared/fastfetchbot_shared/database/mongodb/cache.py""" + +from 
datetime import datetime, timedelta +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def _make_mock_metadata(url="https://example.com", version=1, timestamp=None): + """Create a mock Metadata document.""" + doc = MagicMock() + doc.url = url + doc.version = version + doc.timestamp = timestamp or datetime.utcnow() + return doc + + +def _make_find_chain(result): + """Build a mock chain for Metadata.find().sort().limit().first_or_none().""" + mock_first = AsyncMock(return_value=result) + mock_limit = MagicMock() + mock_limit.first_or_none = mock_first + mock_sort = MagicMock() + mock_sort.limit.return_value = mock_limit + mock_find = MagicMock() + mock_find.sort.return_value = mock_sort + return mock_find + + +# --------------------------------------------------------------------------- +# find_cached +# --------------------------------------------------------------------------- + + +class TestFindCached: + @pytest.mark.asyncio + async def test_returns_none_when_no_document_found(self): + mock_find = _make_find_chain(None) + + with patch( + "fastfetchbot_shared.database.mongodb.cache.Metadata" + ) as MockMetadata: + MockMetadata.find.return_value = mock_find.sort.return_value.__class__() + # Simpler approach: patch the full chain + MockMetadata.find.return_value = mock_find + + from fastfetchbot_shared.database.mongodb.cache import find_cached + + result = await find_cached("https://example.com", ttl_seconds=3600) + + assert result is None + + @pytest.mark.asyncio + async def test_returns_document_when_ttl_zero(self): + """ttl_seconds=0 means never expire — always return cached doc.""" + doc = _make_mock_metadata(timestamp=datetime.utcnow() - timedelta(days=365)) + mock_find = _make_find_chain(doc) + + with patch( + "fastfetchbot_shared.database.mongodb.cache.Metadata" + 
) as MockMetadata: + MockMetadata.find.return_value = mock_find + + from fastfetchbot_shared.database.mongodb.cache import find_cached + + result = await find_cached("https://example.com", ttl_seconds=0) + + assert result is doc + + @pytest.mark.asyncio + async def test_returns_document_within_ttl(self): + doc = _make_mock_metadata(timestamp=datetime.utcnow() - timedelta(seconds=30)) + mock_find = _make_find_chain(doc) + + with patch( + "fastfetchbot_shared.database.mongodb.cache.Metadata" + ) as MockMetadata: + MockMetadata.find.return_value = mock_find + + from fastfetchbot_shared.database.mongodb.cache import find_cached + + result = await find_cached("https://example.com", ttl_seconds=3600) + + assert result is doc + + @pytest.mark.asyncio + async def test_returns_none_when_ttl_expired(self): + doc = _make_mock_metadata( + timestamp=datetime.utcnow() - timedelta(seconds=7200) + ) + mock_find = _make_find_chain(doc) + + with patch( + "fastfetchbot_shared.database.mongodb.cache.Metadata" + ) as MockMetadata: + MockMetadata.find.return_value = mock_find + + from fastfetchbot_shared.database.mongodb.cache import find_cached + + result = await find_cached("https://example.com", ttl_seconds=3600) + + assert result is None + + +# --------------------------------------------------------------------------- +# save_metadata +# --------------------------------------------------------------------------- + + +class TestSaveMetadata: + @pytest.mark.asyncio + async def test_first_save_uses_version_1(self): + mock_find = _make_find_chain(None) # No existing doc + + with patch( + "fastfetchbot_shared.database.mongodb.cache.Metadata" + ) as MockMetadata: + MockMetadata.find.return_value = mock_find + mock_constructed = MagicMock() + MockMetadata.model_construct.return_value = mock_constructed + MockMetadata.insert = AsyncMock() + + from fastfetchbot_shared.database.mongodb.cache import save_metadata + + item = {"url": "https://example.com", "title": "Test"} + result = await 
save_metadata(item) + + assert item["version"] == 1 + MockMetadata.model_construct.assert_called_once() + MockMetadata.insert.assert_awaited_once_with(mock_constructed) + assert result is mock_constructed + + @pytest.mark.asyncio + async def test_increments_version_from_existing(self): + existing_doc = _make_mock_metadata(version=3) + mock_find = _make_find_chain(existing_doc) + + with patch( + "fastfetchbot_shared.database.mongodb.cache.Metadata" + ) as MockMetadata: + MockMetadata.find.return_value = mock_find + mock_constructed = MagicMock() + MockMetadata.model_construct.return_value = mock_constructed + MockMetadata.insert = AsyncMock() + + from fastfetchbot_shared.database.mongodb.cache import save_metadata + + item = {"url": "https://example.com", "title": "Test"} + await save_metadata(item) + + assert item["version"] == 4 + + @pytest.mark.asyncio + async def test_uses_url_from_metadata_item(self): + mock_find = _make_find_chain(None) + + with patch( + "fastfetchbot_shared.database.mongodb.cache.Metadata" + ) as MockMetadata: + MockMetadata.find.return_value = mock_find + MockMetadata.model_construct.return_value = MagicMock() + MockMetadata.insert = AsyncMock() + + from fastfetchbot_shared.database.mongodb.cache import save_metadata + + item = {"url": "https://specific.com/path", "title": "Test"} + await save_metadata(item) + + # Verify the find was called (to look up existing version) + MockMetadata.find.assert_called() + + @pytest.mark.asyncio + async def test_missing_url_raises_value_error(self): + from fastfetchbot_shared.database.mongodb.cache import save_metadata + + with pytest.raises(ValueError, match="non-empty 'url'"): + await save_metadata({"title": "No URL"}) + + @pytest.mark.asyncio + async def test_empty_url_raises_value_error(self): + from fastfetchbot_shared.database.mongodb.cache import save_metadata + + with pytest.raises(ValueError, match="non-empty 'url'"): + await save_metadata({"url": "", "title": "Empty URL"}) + + @pytest.mark.asyncio 
+ async def test_whitespace_only_url_raises_value_error(self): + from fastfetchbot_shared.database.mongodb.cache import save_metadata + + with pytest.raises(ValueError, match="non-empty 'url'"): + await save_metadata({"url": " ", "title": "Whitespace URL"}) diff --git a/tests/unit/database/mongodb/test_connection.py b/tests/unit/database/mongodb/test_connection.py new file mode 100644 index 0000000..662f55a --- /dev/null +++ b/tests/unit/database/mongodb/test_connection.py @@ -0,0 +1,171 @@ +"""Tests for packages/shared/fastfetchbot_shared/database/mongodb/connection.py""" + +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest + + +@pytest.fixture(autouse=True) +def _reset_client(): + """Reset the module-level _client before each test.""" + from fastfetchbot_shared.database.mongodb import connection + + connection._client = None + yield + connection._client = None + + +# --------------------------------------------------------------------------- +# init_mongodb +# --------------------------------------------------------------------------- + + +class TestInitMongodb: + @pytest.mark.asyncio + async def test_creates_motor_client_and_calls_init_beanie(self): + with patch( + "fastfetchbot_shared.database.mongodb.connection.AsyncIOMotorClient" + ) as MockMotor, patch( + "fastfetchbot_shared.database.mongodb.connection.init_beanie", + new_callable=AsyncMock, + ) as mock_init_beanie: + mock_client = MagicMock() + mock_db = MagicMock() + mock_client.__getitem__ = MagicMock(return_value=mock_db) + MockMotor.return_value = mock_client + + from fastfetchbot_shared.database.mongodb.connection import ( + init_mongodb, + document_list, + ) + + await init_mongodb("mongodb://localhost:27017", "test_db") + + MockMotor.assert_called_once_with("mongodb://localhost:27017") + mock_client.__getitem__.assert_called_once_with("test_db") + mock_init_beanie.assert_awaited_once_with( + database=mock_db, document_models=document_list + ) + + @pytest.mark.asyncio + async def 
test_default_db_name_is_telegram_bot(self): + with patch( + "fastfetchbot_shared.database.mongodb.connection.AsyncIOMotorClient" + ) as MockMotor, patch( + "fastfetchbot_shared.database.mongodb.connection.init_beanie", + new_callable=AsyncMock, + ): + mock_client = MagicMock() + mock_client.__getitem__ = MagicMock(return_value=MagicMock()) + MockMotor.return_value = mock_client + + from fastfetchbot_shared.database.mongodb.connection import init_mongodb + + await init_mongodb("mongodb://localhost:27017") + + mock_client.__getitem__.assert_called_once_with("telegram_bot") + + @pytest.mark.asyncio + async def test_sets_module_level_client(self): + with patch( + "fastfetchbot_shared.database.mongodb.connection.AsyncIOMotorClient" + ) as MockMotor, patch( + "fastfetchbot_shared.database.mongodb.connection.init_beanie", + new_callable=AsyncMock, + ): + mock_client = MagicMock() + mock_client.__getitem__ = MagicMock(return_value=MagicMock()) + MockMotor.return_value = mock_client + + from fastfetchbot_shared.database.mongodb import connection + from fastfetchbot_shared.database.mongodb.connection import init_mongodb + + await init_mongodb("mongodb://localhost:27017") + + assert connection._client is mock_client + + +# --------------------------------------------------------------------------- +# close_mongodb +# --------------------------------------------------------------------------- + + +class TestCloseMongodb: + @pytest.mark.asyncio + async def test_closes_client_and_sets_to_none(self): + from fastfetchbot_shared.database.mongodb import connection + from fastfetchbot_shared.database.mongodb.connection import close_mongodb + + mock_client = MagicMock() + connection._client = mock_client + + await close_mongodb() + + mock_client.close.assert_called_once() + assert connection._client is None + + @pytest.mark.asyncio + async def test_noop_when_client_is_none(self): + from fastfetchbot_shared.database.mongodb import connection + from 
fastfetchbot_shared.database.mongodb.connection import close_mongodb + + connection._client = None + + # Should not raise + await close_mongodb() + assert connection._client is None + + +# --------------------------------------------------------------------------- +# save_instances +# --------------------------------------------------------------------------- + + +class TestSaveInstances: + @pytest.mark.asyncio + async def test_raises_type_error_for_none(self): + from fastfetchbot_shared.database.mongodb.connection import save_instances + + with pytest.raises(TypeError, match="instances must be a Model"): + await save_instances(None) + + @pytest.mark.asyncio + async def test_single_document_calls_insert(self): + """Test that passing a single Document instance calls type.insert().""" + from beanie import Document + from fastfetchbot_shared.database.mongodb.connection import save_instances + + # Create a mock that passes isinstance(x, Document) + mock_doc = MagicMock(spec=Document) + mock_doc_type = type(mock_doc) + mock_doc_type.insert = AsyncMock() + + await save_instances(mock_doc) + mock_doc_type.insert.assert_awaited_once_with(mock_doc) + + @pytest.mark.asyncio + async def test_list_of_documents_calls_insert_many(self): + from fastfetchbot_shared.database.mongodb.connection import save_instances + + mock_doc1 = MagicMock() + mock_doc2 = MagicMock() + mock_doc_type = type(mock_doc1) + mock_doc_type.insert_many = AsyncMock() + + docs = [mock_doc1, mock_doc2] + await save_instances(docs) + mock_doc_type.insert_many.assert_awaited_once_with(docs) + + @pytest.mark.asyncio + async def test_empty_list_is_noop(self): + from fastfetchbot_shared.database.mongodb.connection import save_instances + + # Should return early without error + await save_instances([]) + + @pytest.mark.asyncio + async def test_raises_type_error_for_invalid_type(self): + from fastfetchbot_shared.database.mongodb.connection import save_instances + + with pytest.raises(TypeError, match="instances 
must be a Model"): + await save_instances("not_a_document") diff --git a/tests/unit/database/mongodb/test_metadata_model.py b/tests/unit/database/mongodb/test_metadata_model.py new file mode 100644 index 0000000..c3bdcd9 --- /dev/null +++ b/tests/unit/database/mongodb/test_metadata_model.py @@ -0,0 +1,202 @@ +"""Tests for packages/shared/fastfetchbot_shared/database/mongodb/models/metadata.py + +Only tests pure logic (prepare_for_insert, from_dict, DatabaseMediaFile). +Does NOT require a running MongoDB instance. + +NOTE: Beanie Document.__init__ calls get_motor_collection() which requires +init_beanie(). We use model_construct() to bypass validation and init for +most tests, since we're testing business logic not Beanie integration. +""" + +from unittest.mock import patch + +import pytest + +from fastfetchbot_shared.models.metadata_item import MediaFile, MessageType +from fastfetchbot_shared.database.mongodb.models.metadata import ( + DatabaseMediaFile, + Metadata, + document_list, +) + + +# --------------------------------------------------------------------------- +# DatabaseMediaFile +# --------------------------------------------------------------------------- + + +class TestDatabaseMediaFile: + def test_inherits_from_media_file(self): + assert issubclass(DatabaseMediaFile, MediaFile) + + def test_default_file_key_is_none(self): + dmf = DatabaseMediaFile(media_type="photo", url="https://img.com/1.jpg") + assert dmf.file_key is None + + def test_file_key_can_be_set(self): + dmf = DatabaseMediaFile( + media_type="photo", + url="https://img.com/1.jpg", + file_key="s3://bucket/key.jpg", + ) + assert dmf.file_key == "s3://bucket/key.jpg" + + def test_inherits_media_file_fields(self): + dmf = DatabaseMediaFile( + media_type="video", + url="https://vid.com/v.mp4", + original_url="https://original.com/v.mp4", + caption="test caption", + ) + assert dmf.media_type == "video" + assert dmf.url == "https://vid.com/v.mp4" + assert dmf.original_url == 
"https://original.com/v.mp4" + assert dmf.caption == "test caption" + + +# --------------------------------------------------------------------------- +# Metadata model (using model_construct to avoid Beanie init) +# --------------------------------------------------------------------------- + + +def _make_metadata(**kwargs): + """Create a Metadata via model_construct (bypasses Beanie motor check).""" + defaults = { + "url": "https://example.com", + "title": "untitled", + "message_type": MessageType.SHORT, + "text_length": 0, + "content_length": 0, + "scrape_status": False, + "version": 1, + } + defaults.update(kwargs) + return Metadata.model_construct(**defaults) + + +class TestMetadataModel: + def test_document_list_contains_metadata(self): + assert Metadata in document_list + + def test_default_field_values(self): + m = _make_metadata() + assert m.title == "untitled" + assert m.message_type == MessageType.SHORT + assert m.version == 1 + assert m.scrape_status is False + + def test_custom_fields(self): + m = _make_metadata( + url="https://example.com", + title="Test", + version=2, + ) + assert m.url == "https://example.com" + assert m.title == "Test" + assert m.version == 2 + + def test_from_dict_raises_for_non_dict(self): + with pytest.raises(AssertionError): + Metadata.from_dict("not a dict") + + def test_settings_has_indexes(self): + assert hasattr(Metadata, "Settings") + assert hasattr(Metadata.Settings, "indexes") + indexes = Metadata.Settings.indexes + assert len(indexes) >= 1 + # First index should be (url, version) compound index + first_index = indexes[0] + field_names = [idx[0] for idx in first_index] + assert "url" in field_names + assert "version" in field_names + + +# --------------------------------------------------------------------------- +# prepare_for_insert +# --------------------------------------------------------------------------- + + +class TestPrepareForInsert: + def test_computes_text_length(self): + m = _make_metadata(text="Hello world
") + with patch( + "fastfetchbot_shared.database.mongodb.models.metadata.get_html_text_length", + return_value=11, + ): + m.prepare_for_insert() + assert m.text_length == 11 + + def test_computes_content_length(self): + m = _make_metadata(content="