Add Redis subscribe usage ingest#138
Conversation
There was a problem hiding this comment.
Code Review
This pull request refactors the usage ingestion architecture by replacing the legacy RedisDrain with a more modular system comprising a RedisIngestRunner and a RedisProcessRunner. The new implementation introduces a state machine that supports Redis SUBSCRIBE, Redis pull, and HTTP pull with built-in fallback and recovery logic. Additionally, the Redis client now includes security limits for RESP parsing to improve stability. Review feedback highlighted several areas for improvement: ensuring the backfill logic fully drains queues during startup to prevent data lag, allowing the ingest runner to "upgrade" back to higher-priority modes if they become available after a fallback, and making the Redis subscription channel configurable to ensure proper data isolation in shared environments.
Ensure subscribe reconnect catch-up drains legacy pull sources to an empty batch while keeping per-batch details at debug level.
Keep usage queue keys, subscribe channel, and Redis commands in the CPA endpoint constants so pull and subscribe paths share one source of truth.
Wait for asynchronous log writes before canceling test runners so slower CI schedulers do not miss expected backfill and subscribe debug logs.
Ensure fixed redis_pull mode clears stale status errors when Redis succeeds again after a full pull-chain failure.
Reject Redis queue batch sizes above the RESP array limit and make poller log capture safe for race-enabled tests.
Summary
Test plan