Merged
23 changes: 18 additions & 5 deletions CLAUDE.md
@@ -2,7 +2,7 @@

## Project Overview

-FastFetchBot is a social media content fetching service built as a **UV workspace monorepo** with four microservices: a FastAPI server (API), a Telegram Bot client, a Celery worker for file operations, and an ARQ-based async worker for off-path scraping. It scrapes and archives content from various social media platforms including Twitter, Weibo, Xiaohongshu, Reddit, Bluesky, Instagram, Zhihu, Douban, YouTube, and Bilibili.
+FastFetchBot is a social media content fetching service built as a **UV workspace monorepo** with four microservices: a FastAPI server (API), a Telegram Bot client, a Celery worker for file operations, and an ARQ-based async worker for off-path scraping and file_id persistence. It scrapes and archives content from various social media platforms including Twitter, Weibo, Xiaohongshu, Reddit, Bluesky, Instagram, Zhihu, Douban, YouTube, and Bilibili.

## Architecture

@@ -36,7 +36,7 @@ FastFetchBot/
├── apps/api/ # FastAPI server: enriched service, routing, storage
├── apps/telegram-bot/ # Telegram Bot: webhook/polling, message handling
├── apps/worker/ # Celery worker: sync file operations (video, PDF, audio)
├── apps/async-worker/ # ARQ async worker: off-path scraping + enrichment
├── apps/async-worker/ # ARQ async worker: off-path scraping + enrichment + file_id persistence
├── pyproject.toml # Root workspace configuration
└── uv.lock # Lockfile for the entire workspace
```
@@ -70,21 +70,27 @@ The Telegram Bot communicates with the API server over HTTP (`API_SERVER_URL`).
- **`api_client.py`** — HTTP client calling the API server
- **`queue_client.py`** — ARQ Redis client for enqueuing scrape jobs (queue mode)
- **`handlers/`** — `messages.py`, `buttons.py`, `url_process.py`, `commands.py` (start, /settings with inline toggles)
-- **`services/`** — `bot_app.py`, `message_sender.py`, `user_settings.py` (get/toggle `auto_fetch_in_dm` and `force_refresh_cache`), `constants.py`
+- **`services/`** — `bot_app.py`, `message_sender.py` (media packaging with file_id shortcut + background file_id capture), `file_id_capture.py` (extracts file_ids from sent messages, pushes to Redis for async worker persistence), `user_settings.py` (get/toggle `auto_fetch_in_dm` and `force_refresh_cache`), `constants.py`
- **`webhook/server.py`** — Webhook/polling server
- **`templates/`** — Jinja2 templates for bot messages

### Async Worker (`apps/async-worker/async_worker/`)

- **`main.py`** — ARQ worker entry point with `on_startup`/`on_shutdown` hooks for MongoDB and file_id consumer lifecycle
- **`config.py`** — `AsyncWorkerSettings` with MongoDB, Redis, and runtime flags (`file_id_consumer_ready`)
- **`services/file_id_consumer.py`** — Background Redis BRPOP consumer for `fileid:updates` queue. Receives file_id payloads from the Telegram bot, matches media URLs to the latest `Metadata` document in MongoDB, and persists `telegram_file_id` values. Lifecycle managed via `start()`/`stop()` during worker startup/shutdown when `DATABASE_ON` is true

### Shared Library (`packages/shared/fastfetchbot_shared/`)

- **`config.py`** — URL patterns (SOCIAL_MEDIA_WEBSITE_PATTERNS, VIDEO_WEBSITE_PATTERNS, BANNED_PATTERNS); shared env vars including `SIGN_SERVER_URL` and `XHS_COOKIE_PATH`
-- **`models/`** — `classes.py` (NamedBytesIO), `metadata_item.py` (MediaFile, MetadataItem, MessageType), `telegraph_item.py`, `url_metadata.py`
+- **`models/`** — `classes.py` (NamedBytesIO), `metadata_item.py` (MediaFile with optional `telegram_file_id` for Telegram file_id caching, MetadataItem, MessageType), `telegraph_item.py`, `url_metadata.py`
- **`utils/`** — `parse.py` (URL parsing, HTML processing, `get_env_bool`), `image.py`, `logger.py`, `network.py`, `cookie.py`
- **`database/`** — Dual database layer:
- **SQLAlchemy** (user settings): `base.py`, `engine.py`, `session.py`, `models/user_setting.py` — `UserSetting` model with `auto_fetch_in_dm` and `force_refresh_cache` toggles. Supports SQLite (dev) and PostgreSQL (prod) via `SETTINGS_DATABASE_URL`. Alembic migrations in `packages/shared/alembic/`
- **`database/mongodb/`** — Beanie ODM for scraped content persistence, shared across API and async worker:
- **`connection.py`** — `init_mongodb(mongodb_url, db_name)`, `close_mongodb()`, `save_instances()`. Parameterized — each app passes its own config at startup
- **`cache.py`** — MongoDB-backed URL cache: `find_cached(url, ttl_seconds)` returns the latest versioned document if within TTL (0 = never expire); `save_metadata(metadata_item)` auto-increments `version` for the same URL before inserting
-  - **`models/metadata.py`** — `Metadata(Document)` with fields: title, url, author, content, media_files, telegraph_url, timestamp, version, etc. `DatabaseMediaFile(MediaFile)` extends the scraper `MediaFile` dataclass with `file_key` for S3 storage. Compound index on `(url, version)` for efficient cache lookups. `@before_event(Insert)` hook auto-computes text lengths and converts `MediaFile` → `DatabaseMediaFile`
+  - **`models/metadata.py`** — `Metadata(Document)` with fields: title, url, author, content, media_files, telegraph_url, timestamp, version, etc. `DatabaseMediaFile(MediaFile)` extends the scraper `MediaFile` dataclass with `file_key` for S3 storage and inherits `telegram_file_id` for Telegram file_id caching. Compound index on `(url, version)` for efficient cache lookups. `@before_event(Insert)` hook auto-computes text lengths and converts `MediaFile` → `DatabaseMediaFile`. Custom `bson_encoders = {DatabaseMediaFile: asdict}` ensures proper BSON serialization of pydantic dataclasses
- **`__init__.py`** — Re-exports: `init_mongodb`, `close_mongodb`, `save_instances`, `find_cached`, `save_metadata`, `Metadata`, `DatabaseMediaFile`
- **`services/scrapers/`** — All platform scrapers, fully decoupled from FastAPI:
- **`config.py`** — All scraper env vars: platform credentials (Twitter, Bluesky, Weibo, XHS, Zhihu, Reddit, Instagram), Firecrawl/Zyte config, OpenAI key, Telegraph tokens, `JINJA2_ENV`, cookie file loading. Configurable `CONF_DIR` for cookie/config files
@@ -192,6 +198,13 @@ See `template.env` for a complete reference. Key variables:

MongoDB models and connection logic live in `packages/shared/fastfetchbot_shared/database/mongodb/`. Both the API server and async worker use the same shared ODM layer. The `Metadata` Beanie Document stores scraped content with versioning — each re-scrape of the same URL increments the `version` field. The cache system (`find_cached` / `save_metadata`) queries the latest version and checks TTL before deciding to re-scrape.

**Telegram file_id caching** — automatic when `DATABASE_ON` is true and `SCRAPE_MODE` is `queue`:
- When the bot sends media to users, it extracts Telegram `file_id` values from the `send_media_group` response
- File_ids are pushed to Redis queue `fileid:updates` via the bot's `file_id_capture` module (fire-and-forget background task)
- The async worker's `file_id_consumer` processes the queue and persists file_ids to the corresponding `Metadata.media_files[*].telegram_file_id` in MongoDB
- On subsequent cache hits, `media_files_packaging` uses stored file_ids directly via `InputMediaPhoto(file_id)` etc., skipping HTTP download entirely
- The bot has no direct MongoDB access — all database writes go through the async worker via Redis
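Putting the steps above together, the wire format on `fileid:updates` and the consumer's URL-matching pass look roughly like this. All values are made up, and plain dicts stand in for `DatabaseMediaFile` documents.

```python
import json

# Shape of one payload on the `fileid:updates` Redis list, as produced
# by the bot's file_id_capture module (values are illustrative).
payload = {
    "metadata_url": "https://example.com/post/123",
    "file_id_updates": [
        {
            "url": "https://cdn.example.com/a.jpg",
            "media_type": "image",
            "telegram_file_id": "AgACAgEAAx...",
        }
    ],
}
wire = json.dumps(payload, ensure_ascii=False)

# The async worker matches each update to a media file by URL and only
# fills file_ids that are still unset, mirroring _process_file_id_update.
media_files = [{"url": "https://cdn.example.com/a.jpg", "telegram_file_id": None}]
for update in json.loads(wire)["file_id_updates"]:
    for mf in media_files:
        if mf["url"] == update["url"] and mf["telegram_file_id"] is None:
            mf["telegram_file_id"] = update["telegram_file_id"]
```

On the next cache hit the stored file_id lets the bot skip the media download entirely.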

**SQLite/PostgreSQL** (user settings — always enabled for the Telegram bot):
| Variable | Default | Description |
|----------|---------|-------------|
3 changes: 3 additions & 0 deletions apps/async-worker/async_worker/config.py
@@ -34,6 +34,9 @@ class AsyncWorkerSettings(BaseSettings):
    # Timeout
    DOWNLOAD_VIDEO_TIMEOUT: int = 600

    # Runtime flag (not loaded from env; set at startup)
    file_id_consumer_ready: bool = False

    @model_validator(mode="after")
    def _resolve_derived(self) -> "AsyncWorkerSettings":
        if not self.MONGODB_URL:
10 changes: 10 additions & 0 deletions apps/async-worker/async_worker/main.py
@@ -64,8 +64,18 @@ async def on_startup(ctx: dict) -> None:

        await init_mongodb(settings.MONGODB_URL)

        from async_worker.services import file_id_consumer

        await file_id_consumer.start()
        settings.file_id_consumer_ready = True

    @staticmethod
    async def on_shutdown(ctx: dict) -> None:
        if settings.file_id_consumer_ready:
            from async_worker.services import file_id_consumer

            await file_id_consumer.stop()

        if settings.DATABASE_ON:
            from fastfetchbot_shared.database.mongodb import close_mongodb
141 changes: 141 additions & 0 deletions apps/async-worker/async_worker/services/file_id_consumer.py
@@ -0,0 +1,141 @@
import asyncio
import json

import redis.asyncio as aioredis

from async_worker.config import settings
from fastfetchbot_shared.utils.logger import logger

FILEID_QUEUE_KEY = "fileid:updates"
FILEID_DLQ_KEY = "fileid:updates:dlq"
_MAX_RETRIES = 3

_redis: aioredis.Redis | None = None
_consumer_task: asyncio.Task | None = None


async def _get_redis() -> aioredis.Redis:
    global _redis
    if _redis is None:
        _redis = aioredis.from_url(settings.OUTBOX_REDIS_URL, decode_responses=True)
    return _redis


async def _consume_loop() -> None:
    """Background loop: BRPOP from the file_id updates queue and persist to MongoDB."""
    r = await _get_redis()
    logger.info(f"file_id consumer started, listening on '{FILEID_QUEUE_KEY}'")

    while True:
        raw_payload = None
        try:
            result = await r.brpop(FILEID_QUEUE_KEY, timeout=0)
            if result is None:
                continue

            _, raw_payload = result
            payload = json.loads(raw_payload)
            await _process_file_id_update(payload)

        except asyncio.CancelledError:
            # Shutdown requested — requeue unprocessed payload so it isn't lost.
            if raw_payload is not None:
                try:
                    await r.lpush(FILEID_QUEUE_KEY, raw_payload)
                    logger.info("Requeued in-flight payload before shutdown")
                except Exception:
                    logger.warning(f"Failed to requeue payload on shutdown: {raw_payload}")
            logger.info("file_id consumer cancelled, shutting down")
            break
        except json.JSONDecodeError as e:
            # Permanent failure — payload is malformed, send to dead-letter queue.
            logger.error(f"Malformed JSON in file_id queue, moving to DLQ: {e}")
            try:
                await r.lpush(FILEID_DLQ_KEY, raw_payload)
            except Exception:
                logger.warning(f"Failed to push to DLQ: {raw_payload}")
        except Exception as e:
            # Transient failure (DB unavailable, network blip, etc.) — requeue
            # the payload so it can be retried on the next loop iteration.
            logger.error(f"file_id consumer error, requeuing payload: {e}")
            if raw_payload is not None:
                retry_count = 0
                try:
                    payload = json.loads(raw_payload)
                    retry_count = payload.get("_retry_count", 0)
                except (json.JSONDecodeError, TypeError):
                    pass

                if retry_count < _MAX_RETRIES:
                    try:
                        # Stamp retry count so we can detect repeated failures.
                        payload["_retry_count"] = retry_count + 1
                        await r.lpush(FILEID_QUEUE_KEY, json.dumps(payload, ensure_ascii=False))
                    except Exception:
                        logger.warning(f"Failed to requeue payload: {raw_payload}")
                else:
                    logger.error(f"Max retries ({_MAX_RETRIES}) exceeded, moving to DLQ: {raw_payload}")
                    try:
                        await r.lpush(FILEID_DLQ_KEY, raw_payload)
                    except Exception:
                        logger.warning(f"Failed to push to DLQ: {raw_payload}")
            await asyncio.sleep(1)


async def _process_file_id_update(payload: dict) -> None:
    """Update the latest Metadata document with telegram file_ids."""
    from fastfetchbot_shared.database.mongodb.models.metadata import Metadata

    metadata_url = payload.get("metadata_url", "")
    updates = payload.get("file_id_updates", [])

    if not metadata_url or not updates:
        logger.warning(f"Invalid file_id update payload: {payload}")
        return

    doc = await Metadata.find(
        Metadata.url == metadata_url
    ).sort("-version").limit(1).first_or_none()

    if doc is None or not doc.media_files:
        logger.warning(f"No metadata found for file_id update: {metadata_url}")
        return

    matched = 0
    for update in updates:
        for mf in doc.media_files:
            if mf.url == update["url"] and mf.telegram_file_id is None:
                mf.telegram_file_id = update["telegram_file_id"]
                matched += 1

    if matched > 0:
        await doc.save()
        logger.info(f"Updated {matched}/{len(updates)} file_ids for {metadata_url}")


async def start() -> None:
    """Start the file_id consumer as a background asyncio task."""
    global _consumer_task
    if _consumer_task is not None:
        logger.warning("file_id consumer already running")
        return
    _consumer_task = asyncio.create_task(_consume_loop())
    logger.info("file_id consumer task created")
Comment on lines +116 to +123
⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

🏁 Scripts executed (repository: aturret/FastFetchBot; command outputs omitted):

- `find . -name "file_id_consumer.py" -type f` — locate the file
- `ls -la apps/async-worker/ 2>/dev/null || ls -la . | head -20` — inspect the project layout
- `cat -n ./apps/async-worker/async_worker/services/file_id_consumer.py` — read the full implementation
- `rg "file_id_consumer\.(start|stop)" --type py -B 2 -A 2` — find start()/stop() call sites
- `rg "from.*file_id_consumer|import.*file_id_consumer" --type py -B 1 -A 1` — find imports of file_id_consumer
- `cat -n ./apps/async-worker/async_worker/main.py` — review the startup/shutdown context
- `cat -n ./tests/unit/async_worker/test_file_id_consumer.py` — check the unit tests for failure patterns
- `rg "asyncio\.(new_event_loop|set_event_loop|get_event_loop)" --type py -B 2 -A 2` — survey event-loop management elsewhere in the codebase
- `find . -name "*.yml" -o -name "*.yaml" -o -name ".github" -type d | head -20` — locate CI configuration
- `find . -name "README*" -o -name "*.md" | xargs grep -l "event loop\|different loop\|await wasn't used" 2>/dev/null` — search docs for known loop errors (no matches)
- `cat -n ./.github/workflows/ci.yml` — check CI workflows for loop-related failures
- `git log --all --oneline --grep="loop\|event" 2>/dev/null | head -20` — search commit history for event-loop issues
- `rg "TODO|FIXME|BUG|XXX" apps/async-worker/async_worker/services/file_id_consumer.py -B 1 -A 1` — look for TODOs mentioning the problem
- `rg "on_startup|on_shutdown|WorkerSettings" apps/async-worker/ -A 3 -B 1 --type py` — verify ARQ startup/shutdown wiring
- `rg "logger\.(error|exception)\(" apps/async-worker/ --type py | head -15` — check exception-logging patterns in the codebase

Await Redis readiness before marking the consumer ready, and fix exception logging.

`start()` only schedules `_consume_loop()` without awaiting it, so the ready flag is set (line 70 in `main.py`) before the Redis connection actually completes (line 24 in `file_id_consumer.py`). This allows callers to treat the consumer as initialized before it is truly ready. Have `start()` await a readiness check to ensure Redis is connected before returning.

Additionally, line 41 should use `logger.exception()` instead of `logger.error()` to capture the full traceback, and the generic `Exception` should be replaced with a more specific typed exception per guidelines.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@apps/async-worker/async_worker/services/file_id_consumer.py` around lines 76
- 83, start() currently only schedules _consume_loop() and returns before Redis
is connected; modify start() to await a Redis-readiness check before returning
(e.g., implement or call an awaitable like _wait_for_redis_ready() or await the
Redis client's connection/ready method used in this module) and only then
create/assign _consumer_task or mark the consumer ready so callers see a truly
initialized consumer; also replace the generic Exception catch in the consumer
connection logic with the Redis client's specific exception type (e.g.,
RedisError/ConnectionError used by your Redis lib) and change logger.error(...)
to logger.exception(...) to include the traceback (update the handler around the
Redis connection in _consume_loop()/connection routine accordingly).
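One possible shape for the fix the reviewer describes, sketched with a fake client standing in for `redis.asyncio.Redis` so it runs without a server. The real `start()` would ping the live client before creating the task; the `FakeRedis` class and `_noop_loop` are purely illustrative.

```python
import asyncio


class FakeRedis:
    """Stand-in for redis.asyncio.Redis in this sketch."""

    def __init__(self, reachable: bool = True) -> None:
        self.reachable = reachable

    async def ping(self) -> bool:
        if not self.reachable:
            raise ConnectionError("redis unreachable")
        return True


async def start(r, consume_loop) -> asyncio.Task:
    # Await a readiness probe *before* scheduling the consumer, so the
    # caller only sees a task (and sets its ready flag) once Redis is
    # actually reachable.
    await r.ping()
    return asyncio.create_task(consume_loop())


async def _noop_loop() -> None:
    await asyncio.sleep(0)


async def _demo() -> str:
    task = await start(FakeRedis(), _noop_loop)
    await task  # consumer task was only created after a successful probe
    try:
        await start(FakeRedis(reachable=False), _noop_loop)
    except ConnectionError:
        return "probe failed fast"
    return "unexpected"


result = asyncio.run(_demo())
```

Failing fast at startup is usually preferable here: a worker that cannot reach Redis should crash loudly rather than report itself ready and silently drop file_id updates.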



async def stop() -> None:
    """Stop the file_id consumer and close the Redis connection."""
    global _consumer_task, _redis

    if _consumer_task is not None:
        _consumer_task.cancel()
        try:
            await _consumer_task
        except asyncio.CancelledError:
            pass
        _consumer_task = None
        logger.info("file_id consumer task stopped")

    if _redis is not None:
        await _redis.aclose()
        _redis = None
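The consumer's transient-failure handling can be distilled into a pure routing function. This is a simplified sketch, not the merged code: here a payload that cannot be re-parsed goes straight to the dead-letter queue instead of falling through.

```python
import json

MAX_RETRIES = 3


def route_failed_payload(raw_payload: str) -> tuple[str, str]:
    """Decide where a failed payload goes, mirroring the consumer's
    transient-failure branch: requeue with a bumped _retry_count until
    MAX_RETRIES is reached, then dead-letter."""
    try:
        payload = json.loads(raw_payload)
        retry_count = payload.get("_retry_count", 0)
    except (json.JSONDecodeError, TypeError):
        return ("dlq", raw_payload)  # unparseable: dead-letter immediately
    if retry_count < MAX_RETRIES:
        payload["_retry_count"] = retry_count + 1
        return ("retry", json.dumps(payload, ensure_ascii=False))
    return ("dlq", raw_payload)
```

Stamping the retry count into the payload itself keeps the queue stateless: no side table is needed to remember how often a message has already failed.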
78 changes: 78 additions & 0 deletions apps/telegram-bot/core/services/file_id_capture.py
@@ -0,0 +1,78 @@
import json
from typing import Optional

import redis.asyncio as aioredis
from telegram import Message

from core.config import settings
from fastfetchbot_shared.utils.logger import logger

FILEID_QUEUE_KEY = "fileid:updates"

_redis: aioredis.Redis | None = None


async def _get_redis() -> aioredis.Redis:
    global _redis
    if _redis is None:
        _redis = aioredis.from_url(settings.OUTBOX_REDIS_URL, decode_responses=True)
    return _redis


def extract_file_id(message: Message, media_type: str) -> Optional[str]:
    """Extract the telegram file_id from a sent Message based on media type."""
    if media_type == "image" and message.photo:
        return message.photo[-1].file_id
    elif media_type == "video" and message.video:
        return message.video.file_id
    elif media_type == "gif" and message.animation:
        return message.animation.file_id
    elif media_type == "audio" and message.audio:
        return message.audio.file_id
    elif media_type == "document" and message.document:
        return message.document.file_id
    return None


async def capture_and_push_file_ids(
    uncached_info: list[dict],
    sent_messages: tuple[Message, ...],
    metadata_url: str,
) -> None:
    """Extract file_ids from sent messages and push updates to Redis for the async worker.

    Args:
        uncached_info: list of {"url": str, "media_type": str} for items that were
            downloaded (not served from cached file_id). Parallel to sent_messages —
            each entry corresponds to a message at the same position.
        sent_messages: tuple of Message objects returned by send_media_group.
        metadata_url: the original scraped page URL, used as the key for MongoDB lookup.
    """
    file_id_updates = []

    for i, info in enumerate(uncached_info):
        if info is None:
            continue
        if i >= len(sent_messages):
            break
        file_id = extract_file_id(sent_messages[i], info["media_type"])
        if file_id:
            file_id_updates.append({
                "url": info["url"],
                "media_type": info["media_type"],
                "telegram_file_id": file_id,
            })

    if not file_id_updates:
        return

    try:
        r = await _get_redis()
        payload = json.dumps({
            "metadata_url": metadata_url,
            "file_id_updates": file_id_updates,
        }, ensure_ascii=False)
        await r.lpush(FILEID_QUEUE_KEY, payload)
        logger.info(f"Pushed {len(file_id_updates)} file_id updates for {metadata_url}")
    except Exception:
        logger.warning(f"Failed to push file_id updates for {metadata_url}")
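The positional pairing in `capture_and_push_file_ids` can be exercised in isolation by substituting raw file_id strings for `Message` objects. `pair_file_ids` is a hypothetical helper for illustration, not part of the module above.

```python
def pair_file_ids(uncached_info: list, extracted_file_ids: list) -> list[dict]:
    """Pair downloaded media (by position) with the file_ids Telegram
    returned. Entries served from a cached file_id are None and skipped;
    the index still advances so alignment with the messages is preserved."""
    updates = []
    for i, info in enumerate(uncached_info):
        if info is None:
            continue
        if i >= len(extracted_file_ids):
            break
        file_id = extracted_file_ids[i]
        if file_id:
            updates.append({
                "url": info["url"],
                "media_type": info["media_type"],
                "telegram_file_id": file_id,
            })
    return updates
```

The positional contract is the fragile part of this design: if the sender ever reorders or drops media before calling `send_media_group`, the wrong file_id would be attached to a URL, which is why the docstring in the real module spells the invariant out.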
Comment on lines +77 to +78

⚠️ Potential issue | 🟡 Minor

Exception handler should log exception details.

The bare except Exception catches all errors but logger.warning() doesn't capture the exception traceback or message, making debugging difficult. Per coding guidelines, use logger.exception() to include the full traceback, or at minimum log the exception details.

Proposed fix
-    except Exception:
-        logger.warning(f"Failed to push file_id updates for {metadata_url}")
+    except Exception as e:
+        logger.exception(f"Failed to push file_id updates for {metadata_url}: {e}")

Using logger.exception() automatically includes the traceback at ERROR level. If you prefer to keep it at WARNING level:

-    except Exception:
-        logger.warning(f"Failed to push file_id updates for {metadata_url}")
+    except Exception as e:
+        logger.warning(f"Failed to push file_id updates for {metadata_url}: {e}", exc_info=True)

As per coding guidelines: "Never use print() or traceback.print_exc() — always use logger.exception() (includes traceback) or logger.error() (message only)".

🧰 Tools
🪛 Ruff (0.15.9)

[warning] 77-77: Do not catch blind exception: `Exception` (BLE001)

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@apps/telegram-bot/core/services/file_id_capture.py` around lines 77 - 78, The
except block in file_id_capture.py that currently does "except Exception:
logger.warning(f'Failed to push file_id updates for {metadata_url}')" should be
changed to log the exception details; modify the handler in the function/method
surrounding the try/except that references metadata_url (the block that pushes
file_id updates) to call logger.exception(...) (or logger.error(...,
exc_info=True)) instead of logger.warning so the exception message and traceback
are recorded. Ensure the log message keeps the metadata_url context (e.g.,
"Failed to push file_id updates for {metadata_url}") while using
logger.exception or passing exc_info=True.
