Merged
57 changes: 48 additions & 9 deletions CLAUDE.md
@@ -13,10 +13,17 @@ FastFetchBot/
│ ├── config.py # URL patterns, shared env vars
│ ├── models/ # UrlMetadata, MetadataItem, NamedBytesIO, etc.
│ ├── utils/ # parse, image, logger, network, cookie
│ ├── database/
│ │ ├── base.py, engine.py, session.py # SQLAlchemy (user settings)
│ │ ├── models/user_setting.py # UserSetting SQLAlchemy model
│ │ └── mongodb/ # Beanie ODM (scraped content)
│ │ ├── connection.py # init_mongodb(), close_mongodb(), save_instances()
│ │ ├── cache.py # find_cached(), save_metadata() — URL-based cache with TTL + versioning
│ │ └── models/metadata.py # Metadata Document, DatabaseMediaFile
│ └── services/
│ ├── scrapers/ # All platform scrapers + ScraperManager + InfoExtractService
│ │ ├── config.py # ALL scraper env vars (platform creds, Firecrawl, Zyte, Telegraph tokens)
│ │ ├── common.py # Core InfoExtractService (scraping only, no API dependencies)
│ │ ├── common.py # Core InfoExtractService (scraping + MongoDB cache lookup)
│ │ ├── scraper_manager.py
│ │ ├── scraper.py # Base Scraper + DataProcessor ABCs
│ │ ├── templates/ # 13 Jinja2 templates for platform output formatting
@@ -50,7 +57,9 @@ The Telegram Bot communicates with the API server over HTTP (`API_SERVER_URL`).
- **`main.py`** — FastAPI app setup, Sentry integration, lifecycle management
- **`config.py`** — API-only env vars: BASE_URL, API_KEY, DATABASE_ON, MongoDB, Celery, AWS S3, Inoreader, locale/i18n. **No scraper credentials** (those live in `fastfetchbot_shared.services.scrapers.config`)
- **`routers/`** — `scraper.py` (generic endpoint), `scraper_routers.py` (platform-specific), `inoreader.py`, `wechat.py`
- **`services/scrapers/common.py`** — `InfoExtractService` (enriched): extends core `InfoExtractService` from shared with Telegraph publishing, PDF export, DB storage, and video download (youtube/bilibili)
- **`services/scrapers/common.py`** — `InfoExtractService` (enriched): extends the core `InfoExtractService` from shared with Telegraph publishing, PDF export, DB storage (via `save_metadata()`), and video download (youtube/bilibili). Defaults `database_cache_ttl` from `settings.DATABASE_CACHE_TTL`. Skips enrichment on cache hits via the `_cached` flag
- **`database.py`** — Thin wrapper delegating to `fastfetchbot_shared.database.mongodb` (init/close/save)
- **`models/database_model.py`** — Re-export wrapper for `Metadata` from shared
- **`services/file_export/`** — PDF generation, audio transcription (OpenAI), video download
- **`services/amazon/s3.py`** — S3 storage integration
- **`services/telegraph/`** — Re-export wrapper: `from fastfetchbot_shared.services.telegraph import Telegraph`
@@ -59,19 +68,27 @@ The Telegram Bot communicates with the API server over HTTP (`API_SERVER_URL`).

- **`main.py`** — Entry point
- **`api_client.py`** — HTTP client calling the API server
- **`handlers/`** — `messages.py`, `buttons.py`, `url_process.py`
- **`services/`** — `bot_app.py`, `message_sender.py`, `constants.py`
- **`queue_client.py`** — ARQ Redis client for enqueuing scrape jobs (queue mode)
- **`handlers/`** — `messages.py`, `buttons.py`, `url_process.py`, `commands.py` (start, /settings with inline toggles)
- **`services/`** — `bot_app.py`, `message_sender.py`, `user_settings.py` (get/toggle `auto_fetch_in_dm` and `force_refresh_cache`), `constants.py`
- **`webhook/server.py`** — Webhook/polling server
- **`templates/`** — Jinja2 templates for bot messages

### Shared Library (`packages/shared/fastfetchbot_shared/`)

- **`config.py`** — URL patterns (SOCIAL_MEDIA_WEBSITE_PATTERNS, VIDEO_WEBSITE_PATTERNS, BANNED_PATTERNS); shared env vars including `SIGN_SERVER_URL` and `XHS_COOKIE_PATH`
- **`models/`** — `classes.py` (NamedBytesIO), `metadata_item.py`, `telegraph_item.py`, `url_metadata.py`
- **`models/`** — `classes.py` (NamedBytesIO), `metadata_item.py` (MediaFile, MetadataItem, MessageType), `telegraph_item.py`, `url_metadata.py`
- **`utils/`** — `parse.py` (URL parsing, HTML processing, `get_env_bool`), `image.py`, `logger.py`, `network.py`, `cookie.py`
- **`database/`** — Dual database layer:
- **SQLAlchemy** (user settings): `base.py`, `engine.py`, `session.py`, `models/user_setting.py` — `UserSetting` model with `auto_fetch_in_dm` and `force_refresh_cache` toggles. Supports SQLite (dev) and PostgreSQL (prod) via `SETTINGS_DATABASE_URL`. Alembic migrations in `packages/shared/alembic/`
- **`database/mongodb/`** — Beanie ODM for scraped content persistence, shared across API and async worker:
- **`connection.py`** — `init_mongodb(mongodb_url, db_name)`, `close_mongodb()`, `save_instances()`. Parameterized — each app passes its own config at startup
- **`cache.py`** — MongoDB-backed URL cache: `find_cached(url, ttl_seconds)` returns the latest versioned document if within TTL (0 = never expire); `save_metadata(metadata_item)` auto-increments `version` for the same URL before inserting
- **`models/metadata.py`** — `Metadata(Document)` with fields: title, url, author, content, media_files, telegraph_url, timestamp, version, etc. `DatabaseMediaFile(MediaFile)` extends the scraper `MediaFile` dataclass with `file_key` for S3 storage. Compound index on `(url, version)` for efficient cache lookups. `@before_event(Insert)` hook auto-computes text lengths and converts `MediaFile` → `DatabaseMediaFile`
- **`__init__.py`** — Re-exports: `init_mongodb`, `close_mongodb`, `save_instances`, `find_cached`, `save_metadata`, `Metadata`, `DatabaseMediaFile`
- **`services/scrapers/`** — All platform scrapers, fully decoupled from FastAPI:
- **`config.py`** — All scraper env vars: platform credentials (Twitter, Bluesky, Weibo, XHS, Zhihu, Reddit, Instagram), Firecrawl/Zyte config, OpenAI key, Telegraph tokens, `JINJA2_ENV`, cookie file loading. Configurable `CONF_DIR` for cookie/config files
- **`common.py`** — Core `InfoExtractService`: routes URLs to the correct scraper, returns raw metadata. No Telegraph/PDF/DB dependencies
- **`common.py`** — Core `InfoExtractService`: routes URLs to the correct scraper, returns raw metadata. Performs a MongoDB cache lookup at the top of `get_item()` when `store_database=True` and `database_cache_ttl >= 0`. Cache hits return a dict with `_cached=True` so callers can skip enrichment. Lazily imports `find_cached` to avoid an import-time beanie dependency
- **`scraper.py`** — Base `Scraper` and `DataProcessor` abstract classes
- **`scraper_manager.py`** — `ScraperManager` with lazy initialization for bluesky, weibo, and general scrapers
- **`templates/`** — 13 Jinja2 templates for platform-specific output formatting (bundled via `__file__`-relative paths)
@@ -84,7 +101,7 @@ The shared scrapers library can be used standalone without the API server:
from fastfetchbot_shared.services.scrapers import InfoExtractService, ScraperManager
```

Optional dependencies are grouped under `fastfetchbot-shared[scrapers]` (Jinja2, atproto, asyncpraw, firecrawl-py, etc.).
Optional dependencies are grouped under `fastfetchbot-shared[scrapers]` (Jinja2, atproto, asyncpraw, firecrawl-py, etc.) and `fastfetchbot-shared[mongodb]` (beanie, motor).
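Because beanie/motor live behind the `[mongodb]` extra, modules that must import cleanly without it defer their database imports. A minimal sketch of that guard (illustrative only — the actual lazy-import site is inside `get_item()`):

```python
def get_find_cached():
    """Return find_cached if the optional [mongodb] extra is installed, else None."""
    try:
        # Deferred so importing the scrapers package never requires beanie/motor
        from fastfetchbot_shared.database.mongodb.cache import find_cached
    except ImportError:
        return None
    return find_cached
```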

## Development Commands

@@ -161,8 +178,30 @@ See `template.env` for a complete reference. Key variables:
- See `template.env` for all platform-specific variables (Twitter, Weibo, Xiaohongshu, Reddit, Instagram, Bluesky, etc.)

### Database
- Optional MongoDB integration (`DATABASE_ON=true`)
- Uses Beanie ODM for async operations

**MongoDB** (scraped content — optional, feature-gated):
| Variable | Default | Description |
|----------|---------|-------------|
| `DATABASE_ON` | `false` | Enable MongoDB storage of scraped metadata |
| `DATABASE_CACHE_TTL` | `86400` | Cache TTL in seconds. `0` = never expire (always use cache) |
| `MONGODB_HOST` | `localhost` | MongoDB host |
| `MONGODB_PORT` | `27017` | MongoDB port |
| `MONGODB_USERNAME` | `""` | MongoDB username (async worker only; included in derived URL if set) |
| `MONGODB_PASSWORD` | `""` | MongoDB password (async worker only) |
| `MONGODB_URL` | derived | Full MongoDB URI. Overrides host/port/credentials if set explicitly |

MongoDB models and connection logic live in `packages/shared/fastfetchbot_shared/database/mongodb/`. Both the API server and async worker use the same shared ODM layer. The `Metadata` Beanie Document stores scraped content with versioning — each re-scrape of the same URL increments the `version` field. The cache system (`find_cached` / `save_metadata`) queries the latest version and checks TTL before deciding to re-scrape.
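The freshness rule can be sketched in isolation (a simplification of the behavior described for `find_cached` plus the worker's `-1` force-refresh sentinel; the real implementation queries MongoDB):

```python
from datetime import datetime, timedelta, timezone

def cache_is_fresh(doc_timestamp: datetime, ttl_seconds: int) -> bool:
    # ttl_seconds < 0: caller forced a refresh, so bypass the cache entirely
    if ttl_seconds < 0:
        return False
    # ttl_seconds == 0: cache never expires, always a hit
    if ttl_seconds == 0:
        return True
    # Otherwise the cached document is fresh while its age is within the TTL
    return datetime.now(timezone.utc) - doc_timestamp <= timedelta(seconds=ttl_seconds)
```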

**SQLite/PostgreSQL** (user settings — always enabled for the Telegram bot):
| Variable | Default | Description |
|----------|---------|-------------|
| `SETTINGS_DATABASE_URL` | `sqlite+aiosqlite:///data/fastfetchbot.db` | SQLAlchemy connection URL. Use `postgresql+asyncpg://...` for production |

Alembic migrations live in `packages/shared/alembic/`. Run with:
```bash
cd packages/shared
SETTINGS_DATABASE_URL="postgresql+asyncpg://user:pass@host:5432/db" uv run alembic upgrade head
```

## CI/CD

3 changes: 1 addition & 2 deletions apps/api/pyproject.toml
@@ -3,13 +3,12 @@ name = "fastfetchbot-api"
version = "0.1.0"
requires-python = ">=3.12,<3.13"
dependencies = [
"fastfetchbot-shared[scrapers]",
"fastfetchbot-shared[scrapers,mongodb]",
"fastapi>=0.115.12",
"sentry-sdk[fastapi]>=2.27.0",
"gunicorn>=23.0.0",
"uvicorn>=0.34.2",
"babel>=2.17.0",
"beanie>=1.29.0",
"pillow>=10.0.0",
"pydub>=0.25.1",
"xhtml2pdf>=0.2.17",
1 change: 1 addition & 0 deletions apps/api/src/config.py
@@ -28,6 +28,7 @@ class ApiSettings(BaseSettings):

# MongoDB
DATABASE_ON: bool = False
DATABASE_CACHE_TTL: int = 86400 # seconds; 0 = never expire
MONGODB_PORT: int = 27017
MONGODB_HOST: str = "localhost"
MONGODB_URL: str = ""
37 changes: 7 additions & 30 deletions apps/api/src/database.py
@@ -1,37 +1,14 @@
from typing import Optional, Union, List

from motor.motor_asyncio import AsyncIOMotorClient
from beanie import init_beanie, Document, Indexed

from fastfetchbot_shared.database.mongodb import (
init_mongodb,
close_mongodb,
save_instances,
)
from src.config import settings
from src.models.database_model import document_list
from fastfetchbot_shared.utils.logger import logger


async def startup() -> None:
client = AsyncIOMotorClient(settings.MONGODB_URL)
await init_beanie(database=client["telegram_bot"], document_models=document_list)
await init_mongodb(settings.MONGODB_URL)


async def shutdown() -> None:
pass


async def save_instances(instances: Union[Document, List[Document]], *args) -> None:
if instances is None:
raise TypeError("instances must be a Model or a list of Model")

if isinstance(instances, Document):
instance_type = type(instances)
await instance_type.insert(instances)
elif isinstance(instances, list):
instance_type = type(instances[0])
await instance_type.insert_many(instances)
else:
raise TypeError("instances must be a Model or a list of Model")

for arg in args:
if not isinstance(arg, Document):
raise TypeError("args must be a Model")
instance_type = type(arg)
await instance_type.insert_one(arg)
await close_mongodb()
45 changes: 5 additions & 40 deletions apps/api/src/models/database_model.py
@@ -1,41 +1,6 @@
from typing import Optional, Any
from datetime import datetime
from fastfetchbot_shared.database.mongodb.models.metadata import (
Metadata,
document_list,
)

from pydantic import BaseModel, Field
from beanie import Document, Indexed, Insert, after_event, before_event

from fastfetchbot_shared.models.metadata_item import MediaFile, MessageType
from fastfetchbot_shared.utils.logger import logger
from fastfetchbot_shared.utils.parse import get_html_text_length


class Metadata(Document):
title: str = Field(default="untitled")
message_type: MessageType = MessageType.SHORT
url: str
author: Optional[str] = None
author_url: Optional[str] = None
text: Optional[str] = None
text_length: Optional[int] = Field(ge=0)
content: Optional[str] = None
content_length: Optional[int] = Field(ge=0)
category: Optional[str] = None
source: Optional[str] = None
media_files: Optional[list[MediaFile]] = None
telegraph_url: Optional[str] = None
timestamp: datetime = Field(default_factory=datetime.utcnow)
scrape_status: bool = False

@before_event(Insert)
def get_text_length(self):
self.text_length = get_html_text_length(self.text)
self.content_length = get_html_text_length(self.content)

#
@staticmethod
def from_dict(obj: Any) -> "Metadata":
assert isinstance(obj, dict)
return Metadata(**obj)


document_list = [Metadata]
__all__ = ["Metadata", "document_list"]
12 changes: 9 additions & 3 deletions apps/api/src/services/scrapers/common.py
@@ -1,12 +1,10 @@
from typing import Optional, Any

from src.models.database_model import Metadata
from fastfetchbot_shared.models.url_metadata import UrlMetadata
from fastfetchbot_shared.models.metadata_item import MessageType
from fastfetchbot_shared.services.scrapers.common import InfoExtractService as CoreInfoExtractService
from fastfetchbot_shared.services.telegraph import Telegraph
from src.services.file_export import video_download, document_export
from src.database import save_instances
from fastfetchbot_shared.utils.logger import logger
from src.config import settings

@@ -27,20 +25,26 @@ def __init__(
store_database: Optional[bool] = None,
store_telegraph: Optional[bool] = True,
store_document: Optional[bool] = False,
database_cache_ttl: Optional[int] = None,
**kwargs,
):
if store_database is None:
store_database = settings.DATABASE_ON
if database_cache_ttl is None:
database_cache_ttl = settings.DATABASE_CACHE_TTL
super().__init__(
url_metadata,
data=data,
store_database=store_database,
store_telegraph=store_telegraph,
store_document=store_document,
database_cache_ttl=database_cache_ttl,
**kwargs,
)

async def process_item(self, metadata_item: dict) -> dict:
if metadata_item.pop("_cached", False):
return metadata_item
if metadata_item.get("message_type") == MessageType.LONG:
self.store_telegraph = True
logger.info("message type is long, store in telegraph")
@@ -73,5 +77,7 @@ async def process_item(self, metadata_item: dict) -> dict:
metadata_item["title"] = metadata_item["title"].strip()
if self.store_database:
logger.info("store in database")
await save_instances(Metadata.model_construct(**metadata_item))
from fastfetchbot_shared.database.mongodb.cache import save_metadata

await save_metadata(metadata_item)
return metadata_item
8 changes: 7 additions & 1 deletion apps/async-worker/async_worker/config.py
@@ -22,10 +22,13 @@ class AsyncWorkerSettings(BaseSettings):
STORE_TELEGRAPH: bool = True
STORE_DOCUMENT: bool = False
DATABASE_ON: bool = False
DATABASE_CACHE_TTL: int = 86400 # seconds; 0 = never expire

# MongoDB
MONGODB_HOST: str = "localhost"
MONGODB_PORT: int = 27017
MONGODB_USERNAME: str = ""
MONGODB_PASSWORD: str = ""
MONGODB_URL: str = ""

# Timeout
@@ -34,7 +37,10 @@
@model_validator(mode="after")
def _resolve_derived(self) -> "AsyncWorkerSettings":
if not self.MONGODB_URL:
self.MONGODB_URL = f"mongodb://{self.MONGODB_HOST}:{self.MONGODB_PORT}"
if self.MONGODB_USERNAME and self.MONGODB_PASSWORD:
self.MONGODB_URL = f"mongodb://{self.MONGODB_USERNAME}:{self.MONGODB_PASSWORD}@{self.MONGODB_HOST}:{self.MONGODB_PORT}"
else:
self.MONGODB_URL = f"mongodb://{self.MONGODB_HOST}:{self.MONGODB_PORT}"
Comment on lines 39 to +43

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🌐 Web query:

Do MongoDB connection string usernames and passwords need percent-encoding for reserved URI characters such as @, :, /, and %?

💡 Result:

Yes, MongoDB connection string usernames and passwords need percent-encoding for reserved URI characters such as @ (%40), : (%3A), / (%2F), and % (%25). Official MongoDB documentation explicitly states that if the username or password includes characters like $ : / ? # [ ] @, they must be converted using percent encoding. Examples include D1fficultP%40ssw0rd for a password containing @, and %2Ftmp%2Fmongodb-27017.sock for a UNIX socket path with /. This applies to both mongodb:// and mongodb+srv:// formats, and is confirmed across multiple MongoDB docs versions and community examples.

🏁 Script executed:

# Check the actual code at lines 39-43 in config.py
cat -n apps/async-worker/async_worker/config.py | sed -n '35,50p'

Repository: aturret/FastFetchBot

Length of output: 679


🏁 Script executed:

# Check the connection.py file usage
if [ -f "packages/shared/fastfetchbot_shared/database/mongodb/connection.py" ]; then
  cat -n "packages/shared/fastfetchbot_shared/database/mongodb/connection.py" | head -30
else
  find . -name "connection.py" -path "*/mongodb/*" 2>/dev/null
fi

Repository: aturret/FastFetchBot

Length of output: 1217


🏁 Script executed:

# Search for other MongoDB URL construction patterns in the codebase
rg "mongodb://" --type py -B 2 -A 2

Repository: aturret/FastFetchBot

Length of output: 3748


Percent-encode MongoDB credentials in the connection string.

Raw interpolation of MONGODB_USERNAME and MONGODB_PASSWORD into the connection string will fail if credentials contain reserved URI characters like @, :, /, or %. MongoDB URIs require these characters to be percent-encoded. The URL is passed directly to AsyncIOMotorClient() at line 14 of packages/shared/fastfetchbot_shared/database/mongodb/connection.py, so invalid URIs will cause connection failures.

Suggested fix

Add the import:

from urllib.parse import quote_plus

Update the URL construction:

if not self.MONGODB_URL:
    if self.MONGODB_USERNAME and self.MONGODB_PASSWORD:
        username = quote_plus(self.MONGODB_USERNAME)
        password = quote_plus(self.MONGODB_PASSWORD)
        self.MONGODB_URL = (
            f"mongodb://{username}:{password}@"
            f"{self.MONGODB_HOST}:{self.MONGODB_PORT}"
        )
    else:
        self.MONGODB_URL = f"mongodb://{self.MONGODB_HOST}:{self.MONGODB_PORT}"
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@apps/async-worker/async_worker/config.py` around lines 39 - 43, The MongoDB
connection string is built by interpolating MONGODB_USERNAME and
MONGODB_PASSWORD directly into MONGODB_URL in config.py, which will break for
reserved URI characters; fix by importing urllib.parse.quote_plus and
percent-encoding the username and password (e.g., username =
quote_plus(self.MONGODB_USERNAME), password = quote_plus(self.MONGODB_PASSWORD))
before constructing self.MONGODB_URL using the existing MONGODB_HOST and
MONGODB_PORT, leaving the no-credentials branch unchanged.

return self


14 changes: 14 additions & 0 deletions apps/async-worker/async_worker/main.py
@@ -56,3 +56,17 @@ class WorkerSettings:

# Health-check the Redis connection every 30s to prevent stale connections
health_check_interval = 30

@staticmethod
async def on_startup(ctx: dict) -> None:
if settings.DATABASE_ON:
from fastfetchbot_shared.database.mongodb import init_mongodb

await init_mongodb(settings.MONGODB_URL)

@staticmethod
async def on_shutdown(ctx: dict) -> None:
if settings.DATABASE_ON:
from fastfetchbot_shared.database.mongodb import close_mongodb

await close_mongodb()
12 changes: 12 additions & 0 deletions apps/async-worker/async_worker/services/enrichment.py
@@ -10,6 +10,7 @@ async def enrich(
metadata_item: dict,
store_telegraph: bool | None = None,
store_document: bool | None = None,
store_database: bool | None = None,
) -> dict:
"""Apply enrichment steps to a scraped metadata item.

@@ -20,6 +21,8 @@
store_telegraph = settings.STORE_TELEGRAPH
if store_document is None:
store_document = settings.STORE_DOCUMENT
if store_database is None:
store_database = settings.DATABASE_ON

# Force Telegraph for long messages
if metadata_item.get("message_type") == MessageType.LONG:
@@ -62,5 +65,14 @@
except Exception as e:
logger.error(f"Error exporting PDF: {e}")

# MongoDB persistence (versioned)
if store_database:
try:
from fastfetchbot_shared.database.mongodb.cache import save_metadata

await save_metadata(metadata_item)
except Exception as e:
logger.error(f"Error saving to MongoDB: {e}")

metadata_item["title"] = metadata_item["title"].strip()
return metadata_item
17 changes: 11 additions & 6 deletions apps/async-worker/async_worker/tasks/scrape.py
@@ -20,6 +20,7 @@ async def scrape_and_enrich(
bot_id: int | str | None = None,
store_telegraph: bool | None = None,
store_document: bool | None = None,
force_refresh_cache: bool = False,
**kwargs,
) -> dict:
"""ARQ task: scrape a URL, enrich the result, and push to the outbox.
@@ -36,6 +37,7 @@
bot's outbox queue (``scrape:outbox:{bot_id}``).
store_telegraph: Override Telegraph publishing flag.
store_document: Override PDF export flag.
force_refresh_cache: If True, bypass the database cache and re-scrape.
**kwargs: Extra arguments passed to the scraper.
"""
if job_id is None:
@@ -52,18 +54,21 @@
url_metadata=url_metadata,
store_telegraph=False, # We handle enrichment separately
store_document=False,
store_database=settings.DATABASE_ON,
database_cache_ttl=-1 if force_refresh_cache else settings.DATABASE_CACHE_TTL,
celery_app=celery_app,
timeout=settings.DOWNLOAD_VIDEO_TIMEOUT,
**kwargs,
)
metadata_item = await service.get_item()

# Enrich: Telegraph, PDF
metadata_item = await enrichment.enrich(
metadata_item,
store_telegraph=store_telegraph,
store_document=store_document,
)
# Skip enrichment if result came from cache
if not metadata_item.pop("_cached", False):
metadata_item = await enrichment.enrich(
metadata_item,
store_telegraph=store_telegraph,
store_document=store_document,
)

logger.info(f"[{job_id}] Scrape completed successfully")
