-
Notifications
You must be signed in to change notification settings - Fork 4
feat: Telegram file id update #74
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,141 @@ | ||
| import asyncio | ||
| import json | ||
|
|
||
| import redis.asyncio as aioredis | ||
|
|
||
| from async_worker.config import settings | ||
| from fastfetchbot_shared.utils.logger import logger | ||
|
|
||
| FILEID_QUEUE_KEY = "fileid:updates" | ||
| FILEID_DLQ_KEY = "fileid:updates:dlq" | ||
| _MAX_RETRIES = 3 | ||
|
|
||
| _redis: aioredis.Redis | None = None | ||
| _consumer_task: asyncio.Task | None = None | ||
|
|
||
|
|
||
async def _get_redis() -> aioredis.Redis:
    """Lazily create and return the module-wide Redis client singleton.

    The client is built from ``settings.OUTBOX_REDIS_URL`` on first use and
    reused afterwards; ``decode_responses=True`` makes all queue payloads str.
    """
    global _redis
    if _redis is not None:
        return _redis
    _redis = aioredis.from_url(settings.OUTBOX_REDIS_URL, decode_responses=True)
    return _redis
|
|
||
|
|
||
async def _consume_loop() -> None:
    """Background loop: BRPOP from the file_id updates queue and persist to MongoDB.

    Failure policy:
      * Malformed JSON is permanent -> the raw payload goes to the dead-letter
        queue (``FILEID_DLQ_KEY``).
      * Any other exception is treated as transient -> the payload is requeued
        with an incremented ``_retry_count`` stamp, up to ``_MAX_RETRIES``
        attempts, after which it is dead-lettered.
      * On task cancellation (shutdown) an in-flight payload is requeued so it
        is not lost.
    """
    r = await _get_redis()
    logger.info(f"file_id consumer started, listening on '{FILEID_QUEUE_KEY}'")

    while True:
        raw_payload = None
        try:
            # timeout=0 blocks until an item is available.
            result = await r.brpop(FILEID_QUEUE_KEY, timeout=0)
            if result is None:
                continue

            _, raw_payload = result
            payload = json.loads(raw_payload)
            await _process_file_id_update(payload)

        except asyncio.CancelledError:
            # Shutdown requested — requeue unprocessed payload so it isn't lost.
            if raw_payload is not None:
                try:
                    await r.lpush(FILEID_QUEUE_KEY, raw_payload)
                    logger.info("Requeued in-flight payload before shutdown")
                except Exception:
                    logger.warning(f"Failed to requeue payload on shutdown: {raw_payload}")
            logger.info("file_id consumer cancelled, shutting down")
            break
        except json.JSONDecodeError as e:
            # Permanent failure — payload is malformed, send to dead-letter queue.
            logger.error(f"Malformed JSON in file_id queue, moving to DLQ: {e}")
            try:
                await r.lpush(FILEID_DLQ_KEY, raw_payload)
            except Exception:
                logger.warning(f"Failed to push to DLQ: {raw_payload}")
        except Exception as e:
            # Transient failure (DB unavailable, network blip, etc.) — requeue
            # the payload so it can be retried on the next loop iteration.
            # logger.exception captures the traceback, not just the message.
            logger.exception(f"file_id consumer error, requeuing payload: {e}")
            if raw_payload is not None:
                # Re-parse defensively instead of reusing a possibly stale or
                # unbound `payload` from the failed iteration. AttributeError
                # covers valid JSON that is not a dict (no .get method).
                parsed = None
                retry_count = 0
                try:
                    parsed = json.loads(raw_payload)
                    retry_count = parsed.get("_retry_count", 0)
                except (json.JSONDecodeError, TypeError, AttributeError):
                    parsed = None

                if parsed is not None and retry_count < _MAX_RETRIES:
                    try:
                        # Stamp retry count so we can detect repeated failures.
                        parsed["_retry_count"] = retry_count + 1
                        await r.lpush(FILEID_QUEUE_KEY, json.dumps(parsed, ensure_ascii=False))
                    except Exception:
                        logger.warning(f"Failed to requeue payload: {raw_payload}")
                else:
                    # Either retries exhausted or the payload cannot be
                    # re-stamped — dead-letter it rather than dropping it.
                    logger.error(f"Max retries ({_MAX_RETRIES}) exceeded, moving to DLQ: {raw_payload}")
                    try:
                        await r.lpush(FILEID_DLQ_KEY, raw_payload)
                    except Exception:
                        logger.warning(f"Failed to push to DLQ: {raw_payload}")
            # Back off briefly so a persistent failure doesn't spin the loop.
            await asyncio.sleep(1)
|
|
||
|
|
||
async def _process_file_id_update(payload: dict) -> None:
    """Update the latest Metadata document with telegram file_ids.

    Looks up the highest-version Metadata document for the payload's
    ``metadata_url`` and fills in ``telegram_file_id`` on every media file
    whose URL matches an update entry and is not already cached.
    """
    # Imported lazily so the model is only loaded when an update arrives.
    from fastfetchbot_shared.database.mongodb.models.metadata import Metadata

    metadata_url = payload.get("metadata_url", "")
    updates = payload.get("file_id_updates", [])

    if not (metadata_url and updates):
        logger.warning(f"Invalid file_id update payload: {payload}")
        return

    # Newest document wins: sort by version descending, take the first.
    doc = await (
        Metadata.find(Metadata.url == metadata_url)
        .sort("-version")
        .limit(1)
        .first_or_none()
    )

    if doc is None or not doc.media_files:
        logger.warning(f"No metadata found for file_id update: {metadata_url}")
        return

    applied = 0
    for entry in updates:
        for media in doc.media_files:
            # Only fill slots that are still empty — never overwrite a
            # previously cached file_id.
            if media.url == entry["url"] and media.telegram_file_id is None:
                media.telegram_file_id = entry["telegram_file_id"]
                applied += 1

    if applied > 0:
        await doc.save()
        logger.info(f"Updated {applied}/{len(updates)} file_ids for {metadata_url}")
|
|
||
|
|
||
async def start() -> None:
    """Start the file_id consumer as a background asyncio task.

    Idempotent: a second call while the consumer is running only logs a
    warning and leaves the existing task untouched.
    """
    global _consumer_task
    if _consumer_task is None:
        _consumer_task = asyncio.create_task(_consume_loop())
        logger.info("file_id consumer task created")
    else:
        logger.warning("file_id consumer already running")
|
Comment on lines
+116
to
+123
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🧩 Analysis chain🏁 Script executed: # First, locate the file and check if it exists
find . -name "file_id_consumer.py" -type fRepository: aturret/FastFetchBot Length of output: 125 🏁 Script executed: # List the repository structure to understand the project layout
ls -la apps/async-worker/ 2>/dev/null || ls -la . | head -20Repository: aturret/FastFetchBot Length of output: 360 🏁 Script executed: # Read the full file to understand the implementation
cat -n ./apps/async-worker/async_worker/services/file_id_consumer.pyRepository: aturret/FastFetchBot Length of output: 3864 🏁 Script executed: # Search for calls to start() and stop() functions
rg "file_id_consumer\.(start|stop)" --type py -B 2 -A 2Repository: aturret/FastFetchBot Length of output: 755 🏁 Script executed: # Search for imports of file_id_consumer
rg "from.*file_id_consumer|import.*file_id_consumer" --type py -B 1 -A 1Repository: aturret/FastFetchBot Length of output: 4514 🏁 Script executed: # Read main.py to understand the startup/shutdown context
cat -n ./apps/async-worker/async_worker/main.pyRepository: aturret/FastFetchBot Length of output: 3159 🏁 Script executed: # Check test file for any failure patterns or CI issues
cat -n ./tests/unit/async_worker/test_file_id_consumer.pyRepository: aturret/FastFetchBot Length of output: 16256 🏁 Script executed: # Check if there are any other event loop management patterns in the codebase
rg "asyncio\.(new_event_loop|set_event_loop|get_event_loop)" --type py -B 2 -A 2Repository: aturret/FastFetchBot Length of output: 507 🏁 Script executed: # Look for any CI configuration that might show loop-related failures
find . -name "*.yml" -o -name "*.yaml" -o -name ".github" -type d | head -20Repository: aturret/FastFetchBot Length of output: 202 🏁 Script executed: # Check for any README or error logs mentioning loop errors
find . -name "README*" -o -name "*.md" | xargs grep -l "event loop\|different loop\|await wasn't used" 2>/dev/null || echo "No matches found"Repository: aturret/FastFetchBot Length of output: 80 🏁 Script executed: # Check CI workflow files for any test failures or patterns related to event loops
cat -n ./.github/workflows/ci.ymlRepository: aturret/FastFetchBot Length of output: 4565 🏁 Script executed: # Check if there are any git commits mentioning "different loop" or event loop issues
git log --all --oneline --grep="loop\|event" 2>/dev/null | head -20 || echo "Git log not available"Repository: aturret/FastFetchBot Length of output: 46 🏁 Script executed: # Look for any existing issues or TODOs in the code mentioning the problem
rg "TODO|FIXME|BUG|XXX" apps/async-worker/async_worker/services/file_id_consumer.py -B 1 -A 1Repository: aturret/FastFetchBot Length of output: 46 🏁 Script executed: # Verify the exact behavior: check if ARQ Worker guarantees same event loop for startup/shutdown
rg "on_startup|on_shutdown|WorkerSettings" apps/async-worker/ -A 3 -B 1 --type pyRepository: aturret/FastFetchBot Length of output: 2066 🏁 Script executed: # Check exception handling pattern in rest of codebase
rg "logger\.(error|exception)\(" apps/async-worker/ --type py | head -15Repository: aturret/FastFetchBot Length of output: 712 Await Redis readiness before marking consumer ready, and fix exception logging.
Additionally, line 41 should use `logger.exception` (or pass `exc_info=True`) inside the `except` block so the stack trace is recorded along with the error message.
||
|
|
||
|
|
||
async def stop() -> None:
    """Stop the file_id consumer and close the Redis connection.

    Cancels the background task (swallowing the resulting CancelledError),
    then closes and clears the shared Redis client. Safe to call when the
    consumer was never started.
    """
    global _consumer_task, _redis

    task = _consumer_task
    if task is not None:
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            # Expected outcome of cancelling the consumer task.
            pass
        _consumer_task = None
        logger.info("file_id consumer task stopped")

    client = _redis
    if client is not None:
        await client.aclose()
        _redis = None
| Original file line number | Diff line number | Diff line change | ||||||||
|---|---|---|---|---|---|---|---|---|---|---|
| @@ -0,0 +1,78 @@ | ||||||||||
| import json | ||||||||||
| from typing import Optional | ||||||||||
|
|
||||||||||
| import redis.asyncio as aioredis | ||||||||||
| from telegram import Message | ||||||||||
|
|
||||||||||
| from core.config import settings | ||||||||||
| from fastfetchbot_shared.utils.logger import logger | ||||||||||
|
|
||||||||||
| FILEID_QUEUE_KEY = "fileid:updates" | ||||||||||
|
|
||||||||||
| _redis: aioredis.Redis | None = None | ||||||||||
|
|
||||||||||
|
|
||||||||||
async def _get_redis() -> aioredis.Redis:
    """Lazily create and return the module-wide Redis client singleton.

    Built from ``settings.OUTBOX_REDIS_URL`` on first call and cached;
    ``decode_responses=True`` keeps queue payloads as str.
    """
    global _redis
    if _redis is not None:
        return _redis
    _redis = aioredis.from_url(settings.OUTBOX_REDIS_URL, decode_responses=True)
    return _redis
|
|
||||||||||
|
|
||||||||||
def extract_file_id(message: Message, media_type: str) -> Optional[str]:
    """Extract the telegram file_id from a sent Message based on media type.

    Returns None when the message carries no media matching ``media_type``
    or when the type is unrecognized. For photos, Telegram provides multiple
    sizes; the last entry is the largest, so that file_id is returned.
    """
    if media_type == "image":
        photos = message.photo
        return photos[-1].file_id if photos else None

    # Single-attachment media types map directly onto Message attributes.
    attachment_by_type = {
        "video": message.video,
        "gif": message.animation,
        "audio": message.audio,
        "document": message.document,
    }
    attachment = attachment_by_type.get(media_type)
    return attachment.file_id if attachment else None
|
|
||||||||||
|
|
||||||||||
async def capture_and_push_file_ids(
    uncached_info: list[dict],
    sent_messages: tuple[Message, ...],
    metadata_url: str,
) -> None:
    """Extract file_ids from sent messages and push updates to Redis for the async worker.

    Args:
        uncached_info: list of {"url": str, "media_type": str} for items that were
            downloaded (not served from cached file_id). Parallel to sent_messages —
            each entry corresponds to a message at the same position; None entries
            are skipped while preserving index alignment.
        sent_messages: tuple of Message objects returned by send_media_group.
        metadata_url: the original scraped page URL, used as the key for MongoDB lookup.
    """
    file_id_updates = []

    for i, info in enumerate(uncached_info):
        if info is None:
            continue
        if i >= len(sent_messages):
            # Fewer messages than info entries (e.g. partial send) — stop here.
            break
        file_id = extract_file_id(sent_messages[i], info["media_type"])
        if file_id:
            file_id_updates.append({
                "url": info["url"],
                "media_type": info["media_type"],
                "telegram_file_id": file_id,
            })

    if not file_id_updates:
        return

    try:
        r = await _get_redis()
        payload = json.dumps({
            "metadata_url": metadata_url,
            "file_id_updates": file_id_updates,
        }, ensure_ascii=False)
        await r.lpush(FILEID_QUEUE_KEY, payload)
        logger.info(f"Pushed {len(file_id_updates)} file_id updates for {metadata_url}")
    except Exception as e:
        # Best-effort: a caching failure must not break message delivery, but
        # log the exception details (with traceback) so Redis outages or
        # serialization errors are diagnosable instead of silently swallowed.
        logger.warning(
            f"Failed to push file_id updates for {metadata_url}: {e}", exc_info=True
        )
|
Comment on lines
+77
to
+78
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Exception handler should log exception details. The bare Proposed fix- except Exception:
- logger.warning(f"Failed to push file_id updates for {metadata_url}")
+ except Exception as e:
+ logger.exception(f"Failed to push file_id updates for {metadata_url}: {e}")Using - except Exception:
- logger.warning(f"Failed to push file_id updates for {metadata_url}")
+ except Exception as e:
+ logger.warning(f"Failed to push file_id updates for {metadata_url}: {e}", exc_info=True)As per coding guidelines: "Never use 📝 Committable suggestion
Suggested change
🧰 Tools🪛 Ruff (0.15.9)[warning] 77-77: Do not catch blind exception: (BLE001) 🤖 Prompt for AI Agents |
||||||||||
Uh oh!
There was an error while loading. Please reload this page.