feat: DLQ support for processing strategies with batch step #373

@mj0nez

Currently, handling invalid messages with the available DLQ only works if the strategy has a direct processing chain that does not include batching. Consider the following pipeline:

0) consume -> 1) batch -> 2) process in batch -> 3) unbatch -> 4) process message wise -> 5) produce -> 6) commit

The individual strategies after the batch step (2 and 4) cannot raise an InvalidMessage exception to trigger the DLQ, because any exception would dump the entire current batch of messages. Furthermore, if we need the faulty message in the DLQ, the strategy needs some kind of routing producer that sends valid messages to the regular topic and invalid messages to the DLQ topic.
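To illustrate the routing workaround described above, here is a minimal, self-contained sketch. It does not use the library's API: `RoutingProducer`, `process_batch`, the topic names, and the validity check are all hypothetical stand-ins chosen for illustration.

```python
from dataclasses import dataclass, field
from typing import Callable, Dict, List, Sequence


@dataclass
class RoutingProducer:
    """Hypothetical in-memory producer; a stand-in, not a library class."""

    valid_topic: str
    dlq_topic: str
    sent: Dict[str, List[bytes]] = field(default_factory=dict)

    def route(self, payload: bytes, is_valid: bool) -> None:
        # Pick the destination topic per message instead of raising.
        topic = self.valid_topic if is_valid else self.dlq_topic
        self.sent.setdefault(topic, []).append(payload)


def process_batch(
    batch: Sequence[bytes],
    validate: Callable[[bytes], bool],
    producer: RoutingProducer,
) -> None:
    # Every strategy that runs after the batch step has to carry this
    # routing logic itself, which is the duplication the issue objects to.
    for payload in batch:
        producer.route(payload, validate(payload))


producer = RoutingProducer(valid_topic="events", dlq_topic="events-dlq")
process_batch([b"ok-1", b"", b"ok-2"], lambda p: len(p) > 0, producer)
```

Note that this bypasses the configured DLQ policy entirely: the strategy decides where invalid messages go, and any limits or buffering the DLQ would apply are not involved.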

To avoid this, I would like the StreamProcessor to expose its handle_invalid_message to the strategy factory, as it does with commit. This would introduce considerable changes to the API, but it would also allow strategies to pass invalid messages directly to the DLQ handler, using the configured policy and buffered messages while avoiding complex routing logic and duplicated DLQ handling.
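A rough sketch of what this proposal could look like, written against stand-in types rather than the real library. Everything here is an assumption made for illustration: `Record`, `BatchStep`, `Factory.create`, and `FakeProcessor` are hypothetical, and the actual factory signature and DLQ handler would differ.

```python
from dataclasses import dataclass
from typing import Callable, Dict, List, Sequence


@dataclass(frozen=True)
class Record:
    """Hypothetical message: just enough fields to identify it in a DLQ."""

    partition: int
    offset: int
    payload: bytes


# Hypothetical callback types handed from the processor to the factory.
Commit = Callable[[Dict[int, int]], None]
HandleInvalid = Callable[[Record], None]


class BatchStep:
    """Processes a whole batch and reports bad records without raising."""

    def __init__(self, commit: Commit, handle_invalid: HandleInvalid) -> None:
        self._commit = commit
        self._handle_invalid = handle_invalid

    def process(self, batch: Sequence[Record]) -> List[Record]:
        valid: List[Record] = []
        for record in batch:
            if record.payload:  # assumed validity rule: non-empty payload
                valid.append(record)
            else:
                # Hand the record to the processor's DLQ handling; the
                # rest of the batch is kept instead of being dumped.
                self._handle_invalid(record)
        if batch:
            last = batch[-1]
            self._commit({last.partition: last.offset + 1})
        return valid


class Factory:
    # Assumed shape: the factory receives the handler alongside commit.
    def create(self, commit: Commit, handle_invalid: HandleInvalid) -> BatchStep:
        return BatchStep(commit, handle_invalid)


class FakeProcessor:
    """Stand-in for the stream processor; it owns DLQ handling and commits."""

    def __init__(self, factory: Factory) -> None:
        self.dlq: List[Record] = []
        self.committed: Dict[int, int] = {}
        self.strategy = factory.create(self.committed.update, self.dlq.append)


processor = FakeProcessor(Factory())
batch = [Record(0, 10, b"a"), Record(0, 11, b""), Record(0, 12, b"c")]
valid = processor.strategy.process(batch)
```

In this sketch the strategy never needs to know about DLQ topics or policies. It only calls the handler it was given, so the policy stays in one place on the processor side.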

Metadata

Labels: enhancement (New feature or request)

Status: Waiting for: Product Owner
