Conversation
Caution: review failed. The pull request was closed or merged during review.

📝 Walkthrough

Adds a Redis-backed pipeline: the Telegram bot captures sent-message file_ids and pushes them to a Redis queue; an async worker consumes the queue and writes the telegram_file_id updates back to MongoDB.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Bot as Telegram Bot
    participant Redis as Redis Queue
    participant Worker as Async Worker
    participant DB as MongoDB
    Bot->>Bot: send_item_message()
    Bot->>Bot: media_files_packaging() (uses cached telegram_file_id when present)
    Bot->>Bot: extract_file_id() from sent messages
    Bot->>Redis: LPUSH fileid:updates {metadata_url, file_id_updates}
    Worker->>Worker: startup (DATABASE_ON)
    Worker->>Worker: file_id_consumer.start()
    Worker->>Redis: BRPOP fileid:updates (blocking)
    Redis-->>Worker: payload
    Worker->>Worker: _process_file_id_update(payload)
    Worker->>DB: query latest Metadata by metadata_url
    Worker->>DB: update matching media_files[].telegram_file_id
    Worker->>DB: save document
    loop
        Worker->>Redis: BRPOP (await next)
    end
    Worker->>Worker: shutdown -> file_id_consumer.stop()
    Worker->>Redis: close connection
```
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
🚥 Pre-merge checks: ✅ 2 passed | ❌ 1 failed (1 warning)
Actionable comments posted: 4
Caution: some comments are outside the diff and cannot be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
apps/telegram-bot/core/services/message_sender.py (1)
89-125: ⚠️ Potential issue | 🟠 Major

Start `file_id` capture after document sends too.

`uncached_media_info` is populated for every downloaded item, including ones placed into `file_message_group`, but `all_sent_messages` only contains the media-group results when the background task is created. In document-only flows, capture never starts at all, and in mixed flows the sequences diverge, so document and oversized-image `file_id`s are missed or matched to the wrong URL. Launch capture after both send loops and flatten the document send results into the same order as `uncached_media_info`. Also applies to lines 166-174.
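The index-alignment fix described above can be sketched with hypothetical data, stdlib only: once both send loops have appended their results, positions in `all_sent_messages` line up one-to-one with `uncached_media_info`:

```python
# Hypothetical stand-ins: each "sent message" is represented by its file_id string.
uncached_media_info = [
    {"url": "https://example.com/a.jpg"},   # sent via the media-group loop
    {"url": "https://example.com/b.pdf"},   # sent via the document loop
]

media_group_results = ["file_id_photo_1"]
document_results = ["file_id_doc_1"]

# Flatten document results after the media-group results so indexes align.
all_sent_messages = [*media_group_results, *document_results]

# zip() now pairs each downloaded item with the message that carried it.
pairs = {info["url"]: fid for info, fid in zip(uncached_media_info, all_sent_messages)}
```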
🤖 Prompt for AI Agents

Verify each finding against the current code and only fix it if needed. In `@apps/telegram-bot/core/services/message_sender.py` around lines 89-125: the background file_id capture is only launched after processing `media_message_group`, so documents in `file_message_group` (and oversized images) never get included. Launch capture only after both send loops complete by appending document send results into `all_sent_messages` in the same order as `uncached_media_info` (flatten the document send results into `all_sent_messages` so indexes align), then call `capture_and_push_file_ids(uncached_info=uncached_media_info, sent_messages=tuple(all_sent_messages), ...)` and schedule the task. Apply the same change where the duplicate capture logic exists (the other capture block handling `file_message_group`) so both send paths use the unified `all_sent_messages` before creating the asyncio task.

packages/shared/fastfetchbot_shared/database/mongodb/models/metadata.py (1)
41-47: ⚠️ Potential issue | 🟠 Major

Add a TTL index to the Metadata document settings.
This `Settings` class defines only the URL/version index. The guideline requires TTL support at the document level, but there is no TTL index or TTL field configuration. Currently, TTL is enforced only via application-level logic in `find_cached()`, which checks the `timestamp` field at query time. Implement a MongoDB TTL index in `Settings` to enable automatic document expiry at the database layer, so that `DATABASE_CACHE_TTL` is enforced consistently without relying on application-level checks.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@packages/shared/fastfetchbot_shared/database/mongodb/models/metadata.py` around lines 41 - 47, Add a TTL index to the Metadata document by updating the Settings.indexes in the Settings class: include an index tuple for ("timestamp", ASCENDING) with expireAfterSeconds set to the cache TTL (use the existing DATABASE_CACHE_TTL constant or env-derived value) so MongoDB automatically expires documents; locate the Settings class in metadata.py and append the TTL index to the current indexes list (keeping the existing [("url", DESCENDING), ("version", DESCENDING)] entry) so the database enforces expiry rather than relying solely on find_cached().
🧹 Nitpick comments (1)
apps/telegram-bot/core/services/file_id_capture.py (1)
10: Duplicated queue key constant creates a silent-failure risk.

`FILEID_QUEUE_KEY` is defined identically in `apps/async-worker/async_worker/services/file_id_consumer.py`. If either copy is changed independently (e.g., a typo), the producer and consumer will silently diverge, causing updates to be lost without error. Consider extracting this to a shared location (e.g., `fastfetchbot_shared.constants`) or, at minimum, adding a comment cross-referencing the other definition.

Example shared constant approach: in a shared module (e.g., `fastfetchbot_shared/constants.py`):

```python
FILEID_QUEUE_KEY = "fileid:updates"
```

Then import in both files:

```diff
-FILEID_QUEUE_KEY = "fileid:updates"
+from fastfetchbot_shared.constants import FILEID_QUEUE_KEY
```
Verify each finding against the current code and only fix it if needed. In `@apps/telegram-bot/core/services/file_id_capture.py` at line 10, Duplicate FILEID_QUEUE_KEY constant creates a silent divergence risk; fix by moving the constant into a shared module (e.g., fastfetchbot_shared.constants) and import FILEID_QUEUE_KEY from there in both producers and consumers, or if a shared module is not possible, add a clear comment on the FILEID_QUEUE_KEY definition in file_id_capture.py referencing the other definition (file_id_consumer.py) and the canonical source so future edits stay in sync; update any imports/usages to reference the single shared constant symbol FILEID_QUEUE_KEY.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@apps/async-worker/async_worker/services/file_id_consumer.py`:
- Around line 29-42: The consumer currently uses brpop which removes the payload
before processing (in the block around result = await r.brpop(...) and
subsequent json.loads(...)/_process_file_id_update), risking data loss on parse
or processing failures; change the flow so the message is not discarded on
failure — either switch to a Redis stream with acknowledgment semantics or
atomically move/push the item to a processing queue (or use BRPOPLPUSH/RPOPLPUSH
semantics) and only remove/acknowledge after _process_file_id_update succeeds;
additionally, replace the broad except Exception handler with specific handlers
for json.JSONDecodeError and the database/IO errors raised by
_process_file_id_update, and on any recoverable failure requeue the raw_payload
to a retry or dead-letter queue (including detailed error info in the logger)
before continuing or sleeping.
- Around line 76-83: start() currently only schedules _consume_loop() and
returns before Redis is connected; modify start() to await a Redis-readiness
check before returning (e.g., implement or call an awaitable like
_wait_for_redis_ready() or await the Redis client's connection/ready method used
in this module) and only then create/assign _consumer_task or mark the consumer
ready so callers see a truly initialized consumer; also replace the generic
Exception catch in the consumer connection logic with the Redis client's
specific exception type (e.g., RedisError/ConnectionError used by your Redis
lib) and change logger.error(...) to logger.exception(...) to include the
traceback (update the handler around the Redis connection in
_consume_loop()/connection routine accordingly).
In `@apps/telegram-bot/core/services/file_id_capture.py`:
- Around line 77-78: The except block in file_id_capture.py that currently does
"except Exception: logger.warning(f'Failed to push file_id updates for
{metadata_url}')" should be changed to log the exception details; modify the
handler in the function/method surrounding the try/except that references
metadata_url (the block that pushes file_id updates) to call
logger.exception(...) (or logger.error(..., exc_info=True)) instead of
logger.warning so the exception message and traceback are recorded. Ensure the
log message keeps the metadata_url context (e.g., "Failed to push file_id
updates for {metadata_url}") while using logger.exception or passing
exc_info=True.
In `@apps/telegram-bot/core/services/message_sender.py`:
- Around line 247-267: The early shortcut checks
media_item.get("telegram_file_id") and for media_type "image" always appends
InputMediaPhoto(file_id), which breaks cases where oversized images must be sent
as a document (InputMediaDocument) after size/dimension checks; update the
shortcut in the message_sender logic to either (A) store and use the final send
mode alongside telegram_file_id and branch on that stored mode, or (B) skip
using the cached telegram_file_id for images whose packaging depends on
downloaded bytes so the full size/dimension checks run and the code can append
an InputMediaDocument to file_group and record uncached_media_info
appropriately; modify the block that touches media_item["telegram_file_id"],
media_group, file_group, uncached_media_info, and media_counter to implement one
of these approaches and preserve existing behavior for non-image types.
---
Outside diff comments:
In `@apps/telegram-bot/core/services/message_sender.py`:
- Around line 89-125: The background file_id capture is only launched after
processing media_message_group, so documents in file_message_group (and
oversized images) never get included; update the logic to launch capture only
after both send loops complete by appending document send results into
all_sent_messages in the same order as uncached_media_info (flatten the document
send results into all_sent_messages so indexes align), then call
capture_and_push_file_ids(uncached_info=uncached_media_info,
sent_messages=tuple(all_sent_messages), ...) and schedule the task; apply the
same change where the duplicate capture logic exists (the other capture block
handling file_message_group) so both send paths use the unified
all_sent_messages before creating the asyncio task.
In `@packages/shared/fastfetchbot_shared/database/mongodb/models/metadata.py`:
- Around line 41-47: Add a TTL index to the Metadata document by updating the
Settings.indexes in the Settings class: include an index tuple for ("timestamp",
ASCENDING) with expireAfterSeconds set to the cache TTL (use the existing
DATABASE_CACHE_TTL constant or env-derived value) so MongoDB automatically
expires documents; locate the Settings class in metadata.py and append the TTL
index to the current indexes list (keeping the existing [("url", DESCENDING),
("version", DESCENDING)] entry) so the database enforces expiry rather than
relying solely on find_cached().
---
Nitpick comments:
In `@apps/telegram-bot/core/services/file_id_capture.py`:
- Line 10: Duplicate FILEID_QUEUE_KEY constant creates a silent divergence risk;
fix by moving the constant into a shared module (e.g.,
fastfetchbot_shared.constants) and import FILEID_QUEUE_KEY from there in both
producers and consumers, or if a shared module is not possible, add a clear
comment on the FILEID_QUEUE_KEY definition in file_id_capture.py referencing the
other definition (file_id_consumer.py) and the canonical source so future edits
stay in sync; update any imports/usages to reference the single shared constant
symbol FILEID_QUEUE_KEY.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 1ef0472f-25f8-41cb-846e-7ef48c674ba1
📒 Files selected for processing (11)

- apps/async-worker/async_worker/config.py
- apps/async-worker/async_worker/main.py
- apps/async-worker/async_worker/services/file_id_consumer.py
- apps/telegram-bot/core/services/file_id_capture.py
- apps/telegram-bot/core/services/message_sender.py
- packages/shared/fastfetchbot_shared/database/mongodb/models/metadata.py
- packages/shared/fastfetchbot_shared/models/metadata_item.py
- tests/unit/async_worker/test_file_id_consumer.py
- tests/unit/database/mongodb/test_file_id_fields.py
- tests/unit/telegram_bot/test_file_id_capture.py
- tests/unit/telegram_bot/test_message_sender.py
```python
async def start() -> None:
    """Start the file_id consumer as a background asyncio task."""
    global _consumer_task
    if _consumer_task is not None:
        logger.warning("file_id consumer already running")
        return
    _consumer_task = asyncio.create_task(_consume_loop())
    logger.info("file_id consumer task created")
```
Await Redis readiness before marking the consumer ready, and fix exception logging.

`start()` only schedules `_consume_loop()` without awaiting, so the ready flag is set (line 70 in `main.py`) before the Redis connection actually completes (line 24 in `file_id_consumer.py`). This allows callers to treat the consumer as initialized before it is truly ready. Have `start()` await a readiness check to ensure Redis is connected before returning.

Additionally, line 41 should use `logger.exception()` instead of `logger.error()` to capture the full traceback, and the generic `Exception` should be replaced with a more specific typed exception per guidelines.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@apps/async-worker/async_worker/services/file_id_consumer.py` around lines 76
- 83, start() currently only schedules _consume_loop() and returns before Redis
is connected; modify start() to await a Redis-readiness check before returning
(e.g., implement or call an awaitable like _wait_for_redis_ready() or await the
Redis client's connection/ready method used in this module) and only then
create/assign _consumer_task or mark the consumer ready so callers see a truly
initialized consumer; also replace the generic Exception catch in the consumer
connection logic with the Redis client's specific exception type (e.g.,
RedisError/ConnectionError used by your Redis lib) and change logger.error(...)
to logger.exception(...) to include the traceback (update the handler around the
Redis connection in _consume_loop()/connection routine accordingly).
```python
        except Exception:
            logger.warning(f"Failed to push file_id updates for {metadata_url}")
```
Exception handler should log exception details.
The bare except Exception catches all errors but logger.warning() doesn't capture the exception traceback or message, making debugging difficult. Per coding guidelines, use logger.exception() to include the full traceback, or at minimum log the exception details.
Proposed fix

```diff
-        except Exception:
-            logger.warning(f"Failed to push file_id updates for {metadata_url}")
+        except Exception as e:
+            logger.exception(f"Failed to push file_id updates for {metadata_url}: {e}")
```

Using `logger.exception()` automatically includes the traceback at ERROR level. If you prefer to keep it at WARNING level:

```diff
-        except Exception:
-            logger.warning(f"Failed to push file_id updates for {metadata_url}")
+        except Exception as e:
+            logger.warning(f"Failed to push file_id updates for {metadata_url}: {e}", exc_info=True)
```

As per coding guidelines: "Never use print() or traceback.print_exc() — always use logger.exception() (includes traceback) or logger.error() (message only)".
📝 Committable suggestion

‼️ IMPORTANT: carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test and benchmark the code to ensure it meets the requirements.

```python
        except Exception as e:
            logger.exception(f"Failed to push file_id updates for {metadata_url}: {e}")
```
🧰 Tools
🪛 Ruff (0.15.9)
[warning] 77-77: Do not catch blind exception: `Exception` (BLE001)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@apps/telegram-bot/core/services/file_id_capture.py` around lines 77 - 78, The
except block in file_id_capture.py that currently does "except Exception:
logger.warning(f'Failed to push file_id updates for {metadata_url}')" should be
changed to log the exception details; modify the handler in the function/method
surrounding the try/except that references metadata_url (the block that pushes
file_id updates) to call logger.exception(...) (or logger.error(...,
exc_info=True)) instead of logger.warning so the exception message and traceback
are recorded. Ensure the log message keeps the metadata_url context (e.g.,
"Failed to push file_id updates for {metadata_url}") while using
logger.exception or passing exc_info=True.
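The difference is easy to demonstrate with the stdlib alone: `logger.exception()` (or `exc_info=True`) attaches the active traceback to the record, while a bare `logger.warning(...)` records only the message:

```python
import io
import logging

logger = logging.getLogger("demo")
stream = io.StringIO()
logger.addHandler(logging.StreamHandler(stream))
logger.setLevel(logging.DEBUG)

metadata_url = "https://example.com/post/1"  # illustrative value

try:
    raise ConnectionError("redis down")
except Exception:
    # Plain warning: message only, no trace of what actually failed.
    logger.warning(f"Failed to push file_id updates for {metadata_url}")
output_without = stream.getvalue()

stream.truncate(0)
stream.seek(0)
try:
    raise ConnectionError("redis down")
except Exception:
    # logger.exception(): same call site, but the full traceback is appended.
    logger.exception(f"Failed to push file_id updates for {metadata_url}")
output_with = stream.getvalue()
```

Only the second variant records the exception type and line where `ConnectionError` was raised, which is what makes failed pushes debuggable after the fact.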
```python
        # Check for cached telegram file_id — skip download entirely if available
        file_id = media_item.get("telegram_file_id")
        if file_id:
            media_type = media_item["media_type"]
            if media_type == "image":
                media_group.append(InputMediaPhoto(file_id))
            elif media_type == "gif":
                media_group.append(InputMediaAnimation(file_id))
            elif media_type == "video":
                media_group.append(InputMediaVideo(file_id, supports_streaming=True))
            elif media_type == "audio":
                media_group.append(InputMediaAudio(file_id))
            elif media_type == "document":
                file_group.append(InputMediaDocument(file_id, parse_mode=ParseMode.HTML))
                file_counter += 1
            uncached_media_info.append(None)
            media_counter += 1
            logger.info(
                f"get the {media_counter}th media item (cached file_id), type: {media_type}, url: {media_item['url']}"
            )
            continue
```
The shortcut changes behavior for oversized images.
An "image" item can still go down the later document path after the size/dimension checks, but this early return sends only InputMediaPhoto(file_id) and never recreates the companion InputMediaDocument. Once such an item is cached, users stop receiving the original-file attachment. You either need to cache the final send mode(s), not just one telegram_file_id, or skip the shortcut for images whose packaging depends on the downloaded bytes.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@apps/telegram-bot/core/services/message_sender.py` around lines 247 - 267,
The early shortcut checks media_item.get("telegram_file_id") and for media_type
"image" always appends InputMediaPhoto(file_id), which breaks cases where
oversized images must be sent as a document (InputMediaDocument) after
size/dimension checks; update the shortcut in the message_sender logic to either
(A) store and use the final send mode alongside telegram_file_id and branch on
that stored mode, or (B) skip using the cached telegram_file_id for images whose
packaging depends on downloaded bytes so the full size/dimension checks run and
the code can append an InputMediaDocument to file_group and record
uncached_media_info appropriately; modify the block that touches
media_item["telegram_file_id"], media_group, file_group, uncached_media_info,
and media_counter to implement one of these approaches and preserve existing
behavior for non-image types.
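Option (A) from the prompt, caching the final send mode next to the `file_id`, can be sketched with plain data; strings stand in for python-telegram-bot's `InputMedia*` classes, and the `send_mode` field is a hypothetical addition to the cache schema:

```python
# Hypothetical cache entries: the send mode chosen at first send is stored
# alongside the file_id, so replays never re-run size/dimension checks.
cached_items = [
    {"media_type": "image", "telegram_file_id": "AgAC1", "send_mode": "photo"},
    {"media_type": "image", "telegram_file_id": "AgAC2", "send_mode": "document"},  # oversized
]

media_group: list[tuple[str, str]] = []
file_group: list[tuple[str, str]] = []

for item in cached_items:
    # Branch on the stored mode, not on media_type alone: the second image
    # correctly goes back out as a document, matching its first delivery.
    if item["send_mode"] == "document":
        file_group.append(("InputMediaDocument", item["telegram_file_id"]))
    else:
        media_group.append(("InputMediaPhoto", item["telegram_file_id"]))
```

Persisting the mode keeps cached replays faithful to the original send without re-downloading the bytes, which is the cheaper of the two options the prompt offers.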
Codecov Report

❌ Patch coverage is
```
@@           Coverage Diff           @@
##             main     #74   +/-  ##
=======================================
  Coverage        ?   80.61%
=======================================
  Files           ?       69
  Lines           ?     3636
  Branches        ?        0
=======================================
  Hits            ?     2931
  Misses          ?      705
  Partials        ?        0
```
Summary by CodeRabbit

- New Features
- Documentation
- Tests