Currently, handling invalid messages with the available DLQ only works if the strategy has a direct processing chain that does not include batching. Consider the following pipeline:
0) consume -> 1) batch -> 2) process in batch -> 3) unbatch -> 4) process message wise -> 5) produce -> 6) commit
The individual strategies after the batch step (2 and 4) cannot raise an InvalidMessage exception to trigger the DLQ, because any exception would discard the current batch of messages. Furthermore, if the faulty message needs to end up in the DLQ, the strategy needs some kind of routing producer that sends valid messages to one topic and invalid messages to the DLQ topic.
To avoid this, I would like the StreamProcessor to expose its handle_invalid_message to the strategy factory, as it already does with commit. This would change the API considerably, but it would also allow strategies to pass invalid messages directly to the DLQ handler, making use of the configured policy and buffered messages while avoiding both complex routing logic and a duplicated DLQ implementation.
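
To make the proposal more concrete, here is a rough, self-contained sketch of what I have in mind. It does not use the real library classes: FakeStreamProcessor, BatchValidateStrategy, the Message dataclass, and the callback signatures (including the reason argument) are all hypothetical stand-ins, and the actual API would likely look different. The point is only that the factory receives handle_invalid_message next to commit, so a strategy behind the batch step can report a single bad message without raising and losing the batch.

```python
from dataclasses import dataclass
from typing import Callable, List, Sequence, Tuple


@dataclass(frozen=True)
class Message:
    # Hypothetical minimal message: an offset and a string payload.
    offset: int
    payload: str


# Hypothetical callback signatures, not the library's real ones.
Commit = Callable[[int], None]
HandleInvalidMessage = Callable[[Message, str], None]


class BatchValidateStrategy:
    """Buffers messages into a batch, then processes them message by message."""

    def __init__(
        self,
        batch_size: int,
        commit: Commit,
        handle_invalid_message: HandleInvalidMessage,
    ) -> None:
        self._batch_size = batch_size
        self._commit = commit
        self._handle_invalid_message = handle_invalid_message
        self._batch: List[Message] = []
        self.processed: List[Message] = []

    def submit(self, message: Message) -> None:
        self._batch.append(message)
        if len(self._batch) >= self._batch_size:
            self.flush()

    def flush(self) -> None:
        if not self._batch:
            return
        for message in self._batch:
            if message.payload.isdigit():
                self.processed.append(message)
            else:
                # Hand the bad message to the processor instead of raising,
                # so the rest of the batch is not lost.
                self._handle_invalid_message(message, "payload is not numeric")
        # Commit the offset after the last message of the batch.
        self._commit(self._batch[-1].offset + 1)
        self._batch = []


class FakeStreamProcessor:
    """Stand-in for the processor: owns commit and DLQ handling."""

    def __init__(
        self,
        factory: Callable[[Commit, HandleInvalidMessage], BatchValidateStrategy],
    ) -> None:
        self.dlq: List[Tuple[Message, str]] = []
        self.committed: List[int] = []
        # Proposed change: pass handle_invalid_message alongside commit.
        self.strategy = factory(self._commit, self._handle_invalid_message)

    def _commit(self, offset: int) -> None:
        self.committed.append(offset)

    def _handle_invalid_message(self, message: Message, reason: str) -> None:
        # The real handler would apply the configured DLQ policy here.
        self.dlq.append((message, reason))

    def run(self, messages: Sequence[Message]) -> None:
        for message in messages:
            self.strategy.submit(message)
        self.strategy.flush()


processor = FakeStreamProcessor(
    lambda commit, handle_invalid: BatchValidateStrategy(2, commit, handle_invalid)
)
processor.run([Message(0, "1"), Message(1, "x"), Message(2, "3")])
```

In this sketch the invalid message at offset 1 ends up in processor.dlq, while the valid messages are still processed and the offsets are committed as usual. No routing producer is needed in the strategy itself.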