Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,23 @@ GitHub Actions (`.github/workflows/ci.yml`) builds and pushes all four images on
6. Add platform-specific router in `apps/api/src/routers/` (if API endpoints are needed)
7. Add any new pip dependencies to `packages/shared/pyproject.toml` under `[project.optional-dependencies] scrapers`

### Exception Handling
- **Custom exceptions** are defined in `packages/shared/fastfetchbot_shared/exceptions.py`:
- `FastFetchBotError` — base for all domain errors
- `ScraperError` / `ScraperNetworkError` / `ScraperParseError` — scraper failures
- `TelegraphPublishError` — Telegraph publishing failures
- `FileExportError` — file export (PDF, video, audio) failures
- `ExternalServiceError` — external service call failures (OpenAI, Firecrawl, Zyte, XHS sign server, etc.)
- **Always use typed exceptions** instead of generic `RuntimeError`, `ValueError`, or `Exception` for domain errors. Pick the most specific subclass that fits.
- **Use `from e` chaining** when wrapping exceptions: `raise ScraperError("message") from e`
- **Boundary-level handlers** catch exceptions at service boundaries:
- FastAPI: global `@app.exception_handler(FastFetchBotError)` returns 502, generic `Exception` returns 500
- Telegram bot: `error_process` handler catches handler exceptions; webhook server protects endpoints
- Celery/ARQ workers: existing task-level try/except with outbox error push
- **Never use `print()` or `traceback.print_exc()`** — always use `logger.exception()` (includes traceback) or `logger.error()` (message only)
- **Never silently swallow exceptions** — if catching an exception, either re-raise it or handle it explicitly with logging. Do not return `None` or empty data on failure.
- **Fail fast after fallback chains** — scrapers may try multiple methods/APIs, but must raise a typed error when all fallbacks are exhausted

### Key Conventions
- **`packages/shared/` (`fastfetchbot-shared`)** is for shared async logic — scrapers, templates, Telegraph, and async Celery task wrappers (file_export). Most code here is async and reusable across apps
- **`packages/file-export/` (`fastfetchbot-file-export`)** is exclusively for synchronous Celery worker jobs — the heavy I/O operations that run inside the Celery worker process (yt-dlp video download, WeasyPrint PDF generation, OpenAI audio transcription). Apps never import this package directly; they use the async wrappers in `fastfetchbot_shared.services.file_export` which submit tasks to the Celery worker
Expand Down
21 changes: 19 additions & 2 deletions apps/api/src/main.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
import sentry_sdk

from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse
from contextlib import asynccontextmanager
from starlette.middleware.base import BaseHTTPMiddleware

from src import database
from src.routers import inoreader, scraper_routers, scraper
from src.config import settings
from fastfetchbot_shared.utils.logger import logger
from fastfetchbot_shared.exceptions import FastFetchBotError

SENTRY_DSN = ""

Expand Down Expand Up @@ -38,12 +40,27 @@ def __init__(self, app):

async def dispatch(self, request: Request, call_next):
logger.info(f"{request.method} {request.url}")
response = await call_next(request)
return response
try:
response = await call_next(request)
return response
except Exception:
logger.exception(f"Unhandled error during {request.method} {request.url}")
raise


def create_app():
fastapi_app = FastAPI(lifespan=lifespan)

@fastapi_app.exception_handler(FastFetchBotError)
async def fastfetchbot_error_handler(request: Request, exc: FastFetchBotError):
logger.error(f"Domain error on {request.method} {request.url}: {exc}")
return JSONResponse(status_code=502, content={"error": str(exc)})

@fastapi_app.exception_handler(Exception)
async def generic_error_handler(request: Request, exc: Exception):
logger.exception(f"Unhandled error on {request.method} {request.url}")
return JSONResponse(status_code=500, content={"error": "Internal server error"})

fastapi_app.add_middleware(LogMiddleware)
fastapi_app.include_router(inoreader.router)
fastapi_app.include_router(scraper.router)
Expand Down
44 changes: 29 additions & 15 deletions apps/telegram-bot/core/services/message_sender.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,18 +163,20 @@ async def send_item_message(
else False,
disable_notification=True,
)
except Exception as e:
logger.error(e)
traceback.print_exc()
except Exception:
logger.exception("Failed to send item message")
await send_debug_channel(traceback.format_exc())


async def send_debug_channel(message: str) -> None:
import html as html_module
from core.config import TELEBOT_DEBUG_CHANNEL
application = _get_application()
if TELEBOT_DEBUG_CHANNEL is not None:
await application.bot.send_message(
chat_id=TELEBOT_DEBUG_CHANNEL, text=message, parse_mode=ParseMode.HTML
chat_id=TELEBOT_DEBUG_CHANNEL,
text=html_module.escape(message),
parse_mode=ParseMode.HTML,
)


Expand Down Expand Up @@ -234,9 +236,13 @@ async def media_files_packaging(media_files: list, data: dict) -> tuple:
"https",
]: # if the url is a http url, download the file
file_format = "mp4" if media_item["media_type"] == "video" else None
io_object = await download_file_by_metadata_item(
media_item["url"], data=data, file_format=file_format
)
try:
io_object = await download_file_by_metadata_item(
media_item["url"], data=data, file_format=file_format
)
except Exception:
logger.warning(f"Skipping media download: {media_item['url']}")
continue
filename = io_object.name
file_size = io_object.size
else: # if the url is a local file path, just add it to the media group
Expand Down Expand Up @@ -300,9 +306,13 @@ async def media_files_packaging(media_files: list, data: dict) -> tuple:
or img_width > settings.TELEGRAM_IMAGE_DIMENSION_LIMIT
or img_height > settings.TELEGRAM_IMAGE_DIMENSION_LIMIT
) and data["category"] not in ["xiaohongshu"]:
io_object = await download_file_by_metadata_item(
url=image_url, data=data
)
try:
io_object = await download_file_by_metadata_item(
url=image_url, data=data
)
except Exception:
logger.warning(f"Skipping document download: {image_url}")
continue
if not io_object.name.endswith(".gif"):
if not io_object.name.endswith(ext.lower()):
io_object.name = io_object.name + "." + ext.lower()
Expand All @@ -312,11 +322,15 @@ async def media_files_packaging(media_files: list, data: dict) -> tuple:
)
file_counter += 1
elif media_item["media_type"] == "gif":
io_object = await download_file_by_metadata_item(
url=media_item["url"],
data=data,
file_name="gif_image-" + str(media_counter) + ".gif",
)
try:
io_object = await download_file_by_metadata_item(
url=media_item["url"],
data=data,
file_name="gif_image-" + str(media_counter) + ".gif",
)
except Exception:
logger.warning(f"Skipping gif download: {media_item['url']}")
continue
io_object.name = io_object.name + ".gif"
media_group.append(InputMediaAnimation(io_object))
elif media_item["media_type"] == "video":
Expand Down
19 changes: 4 additions & 15 deletions apps/telegram-bot/core/services/outbox_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import redis.asyncio as aioredis

from core.config import settings
from core.services.message_sender import send_item_message
from core.services.message_sender import send_item_message, send_debug_channel
from fastfetchbot_shared.utils.logger import logger

_redis: aioredis.Redis | None = None
Expand Down Expand Up @@ -42,7 +42,9 @@ async def _consume_loop() -> None:

if error:
logger.warning(f"[{job_id}] Scrape failed: {error}")
await _send_error_to_chat(chat_id, error)
await send_debug_channel(
f"[Scrape Error] job_id={job_id}\nchat_id: {chat_id}\n\n{error}"
)
else:
Comment on lines 43 to 48
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Verify producer intent vs consumer behavior for scrape errors.
rg -n -C2 "Push error to outbox so the bot can notify the user|error=str\\(e\\)|chat_id=chat_id" apps/async-worker/async_worker/tasks/scrape.py
rg -n -C4 "if error:|send_debug_channel|send_item_message" apps/telegram-bot/core/services/outbox_consumer.py

Repository: aturret/FastFetchBot

Length of output: 1708


🏁 Script executed:

fd message_sender.py

Repository: aturret/FastFetchBot

Length of output: 113


🏁 Script executed:

cat -n apps/telegram-bot/core/services/message_sender.py

Repository: aturret/FastFetchBot

Length of output: 19294


Error payloads are no longer delivered to the originating user chat.

Lines 43–48 now route scrape failures only to the debug channel. The producer at apps/async-worker/async_worker/tasks/scrape.py:85 explicitly documents that errors are pushed to outbox "so the bot can notify the user," but the consumer never calls send_item_message() to deliver the error to the originating chat_id. As a result, user flows fail silently: the requester never receives a failure response.

Additionally, the raw error string (from str(e)) is sent to send_debug_channel() which uses ParseMode.HTML without escaping, risking malformed messages if the exception text contains HTML-unsafe characters like <, >, or &.

metadata_item = payload.get("metadata_item")
if metadata_item and chat_id:
Expand All @@ -63,19 +65,6 @@ async def _consume_loop() -> None:
await asyncio.sleep(1)


async def _send_error_to_chat(chat_id: int | str, error: str) -> None:
    """Tell the requesting chat that its request failed.

    Delivery is best-effort: any exception raised while importing the bot
    application, building the text, or sending the message is logged and
    not propagated to the caller.

    Args:
        chat_id: Identifier of the chat to notify.
        error: Error description appended to the apology text.
    """
    try:
        # Imported at call time, exactly as the original does; an import
        # failure is therefore handled by the same except clause below.
        from core.services.bot_app import application

        apology = f"Sorry, an error occurred while processing your request:\n\n{error}"
        bot = application.bot
        await bot.send_message(chat_id=chat_id, text=apology)
    except Exception as send_failure:
        logger.error(f"Failed to send error message to chat {chat_id}: {send_failure}")


async def start(bot_id: int) -> None:
"""Start the outbox consumer as a background asyncio task.

Expand Down
33 changes: 24 additions & 9 deletions apps/telegram-bot/core/webhook/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,24 +51,39 @@ async def lifespan(app):
await database.shutdown()


def _log_task_exception(task: asyncio.Task):
    """Done-callback for fire-and-forget tasks that logs the exception they raised.

    Cancelled tasks and tasks that finished without an exception are ignored.

    Args:
        task: The finished task this callback was attached to.
    """
    import traceback

    if task.cancelled():
        return
    exc = task.exception()
    if exc is None:
        return
    # A done-callback runs outside any ``except`` block, so there is no
    # "current" exception for ``logger.exception()`` to pick up. Render the
    # traceback from the task's own exception so it is logged whichever
    # logging backend ``logger`` is.
    # NOTE(review): assumes ``logger`` may be loguru, which ignores an
    # ``exc_info=`` keyword — confirm against fastfetchbot_shared.utils.logger.
    rendered = "".join(
        traceback.format_exception(type(exc), exc, exc.__traceback__)
    )
    logger.error(f"Unhandled error in background task\n{rendered}")


async def telegram_webhook(request: Request):
secret = request.headers.get("X-Telegram-Bot-Api-Secret-Token")
if secret != settings.TELEGRAM_BOT_SECRET_TOKEN:
return JSONResponse({"error": "unauthorized"}, status_code=401)
data = await request.json()
try:
data = await request.json()
except Exception:
logger.exception("Failed to parse webhook request body")
return JSONResponse({"error": "invalid JSON"}, status_code=400)
logger.debug(f"Telegram webhook update received: {data.get('update_id', 'unknown')}")
asyncio.create_task(process_telegram_update(data))
task = asyncio.create_task(process_telegram_update(data))
task.add_done_callback(_log_task_exception)
return JSONResponse({"status": "ok"})


async def send_message_endpoint(request: Request):
data = await request.json()
metadata_item = data["data"]
chat_id = data.get("chat_id")
if isinstance(chat_id, str) and chat_id.startswith("-"):
chat_id = int(chat_id)
await send_item_message(metadata_item, chat_id=chat_id)
return JSONResponse({"status": "ok"})
try:
data = await request.json()
metadata_item = data["data"]
chat_id = data.get("chat_id")
if isinstance(chat_id, str) and chat_id.startswith("-"):
chat_id = int(chat_id)
await send_item_message(metadata_item, chat_id=chat_id)
return JSONResponse({"status": "ok"})
except Exception:
logger.exception("Failed to handle send_message request")
return JSONResponse({"error": "Internal server error"}, status_code=500)


async def health(request: Request):
Expand Down
22 changes: 14 additions & 8 deletions packages/file-export/fastfetchbot_file_export/pdf_export.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

from weasyprint import HTML, CSS
from weasyprint.text.fonts import FontConfiguration
from loguru import logger
from fastfetchbot_shared.exceptions import FileExportError

CSS_FILE = os.path.join(os.path.dirname(os.path.abspath(__file__)), "pdf_export.css")

Expand All @@ -19,7 +21,7 @@ def convert_html_to_pdf(
elif html_string:
html_item = HTML(string=html_string)
else:
raise ValueError("Either html_string or html_file must be provided")
raise FileExportError("Either html_string or html_file must be provided")
html_item.write_pdf(output_filename, stylesheets=[css_item])


Expand All @@ -30,10 +32,14 @@ def export_pdf(
download_dir: str = "/tmp",
) -> str:
"""Export HTML to PDF and return the output file path."""
output_path = os.path.join(download_dir, output_filename)
convert_html_to_pdf(
output_filename=output_path,
html_string=html_string,
html_file=html_file,
)
return output_path
try:
output_path = os.path.join(download_dir, output_filename)
convert_html_to_pdf(
output_filename=output_path,
html_string=html_string,
html_file=html_file,
)
return output_path
except Exception as e:
logger.exception(f"PDF export failed for {output_filename}")
raise FileExportError(f"PDF export failed for {output_filename}") from e
79 changes: 42 additions & 37 deletions packages/file-export/fastfetchbot_file_export/transcribe.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from pydub import AudioSegment
from openai import OpenAI
from loguru import logger
from fastfetchbot_shared.exceptions import FileExportError

TRANSCRIBE_MODEL = "whisper-1"
SEGMENT_LENGTH = 5 * 60 # 5 minutes in seconds
Expand Down Expand Up @@ -74,40 +75,44 @@ def get_audio_text(audio_file: str, openai_api_key: str) -> str:

Returns formatted string with summary and full transcript.
"""
client = OpenAI(api_key=openai_api_key)
transcript = ""
AudioSegment.converter = "ffmpeg"
audio_file_non_ext, audio_file_ext = os.path.splitext(audio_file)
ext = audio_file_ext.lstrip(".")
audio_item = AudioSegment.from_file(audio_file, ext)
start_trim = milliseconds_until_sound(audio_item)
audio_item = audio_item[start_trim:]
audio_length = int(audio_item.duration_seconds) + 1

for index, i in enumerate(range(0, audio_length * 1000, SEGMENT_LENGTH * 1000)):
start_time = i
end_time = i + SEGMENT_LENGTH * 1000
if end_time >= audio_length * 1000:
audio_segment = audio_item[start_time:]
else:
audio_segment = audio_item[start_time:end_time]

segment_path = f"{audio_file_non_ext}-{index + 1}{audio_file_ext}"
audio_segment.export(segment_path)
logger.info(f"audio_segment_path: {segment_path}")

with open(segment_path, "rb") as f:
result = client.audio.transcriptions.create(
model=TRANSCRIBE_MODEL, file=f
)
transcript += result.text

os.remove(segment_path)

transcript = punctuation_assistant(client, transcript)
transcript = (
f"全文总结:\n{summary_assistant(client, transcript)}\n原文:\n{transcript}"
)
logger.info(f"transcript: {transcript}")
os.remove(audio_file)
return transcript
try:
client = OpenAI(api_key=openai_api_key)
transcript = ""
AudioSegment.converter = "ffmpeg"
audio_file_non_ext, audio_file_ext = os.path.splitext(audio_file)
ext = audio_file_ext.lstrip(".")
audio_item = AudioSegment.from_file(audio_file, ext)
start_trim = milliseconds_until_sound(audio_item)
audio_item = audio_item[start_trim:]
audio_length = int(audio_item.duration_seconds) + 1

for index, i in enumerate(range(0, audio_length * 1000, SEGMENT_LENGTH * 1000)):
start_time = i
end_time = i + SEGMENT_LENGTH * 1000
if end_time >= audio_length * 1000:
audio_segment = audio_item[start_time:]
else:
audio_segment = audio_item[start_time:end_time]

segment_path = f"{audio_file_non_ext}-{index + 1}{audio_file_ext}"
audio_segment.export(segment_path)
logger.info(f"audio_segment_path: {segment_path}")

with open(segment_path, "rb") as f:
result = client.audio.transcriptions.create(
model=TRANSCRIBE_MODEL, file=f
)
transcript += result.text

os.remove(segment_path)

transcript = punctuation_assistant(client, transcript)
transcript = (
f"全文总结:\n{summary_assistant(client, transcript)}\n原文:\n{transcript}"
)
logger.info(f"transcript: {transcript}")
os.remove(audio_file)
return transcript
except Exception as e:
logger.exception(f"Audio transcription failed for {audio_file}")
raise FileExportError(f"Audio transcription failed for {audio_file}") from e
10 changes: 5 additions & 5 deletions packages/file-export/fastfetchbot_file_export/video_download.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import os
import traceback

from loguru import logger
from yt_dlp import YoutubeDL
from fastfetchbot_shared.exceptions import FileExportError


def get_video_orientation(content_info: dict, extractor: str) -> str:
Expand Down Expand Up @@ -39,7 +39,7 @@ def get_format_for_orientation(
if hd and bilibili_cookie:
return "bestvideo[ext=mp4]+bestaudio[ext=m4a]/best[ext=mp4]/best"
return "bv*[height<=480]+ba/b[height<=480] / wv*+ba/w"
raise ValueError("no available extractor found")
raise FileExportError("no available extractor found")


def init_yt_downloader(
Expand Down Expand Up @@ -201,6 +201,6 @@ def download_video(
"orientation": orientation,
"file_path": file_path_output,
}
except Exception:
logger.exception(f"download_video failed: url={url}\n{traceback.format_exc()}")
raise
except Exception as e:
logger.exception(f"download_video failed: url={url}")
raise FileExportError(f"download_video failed: url={url}") from e
Loading
Loading