
Kafka backend: blocked consumers and known issues #944

@cybermaggedon

Summary

The Kafka pub/sub backend (trustgraph-base/trustgraph/base/kafka_backend.py) is functionally complete but has blocking issues in practice: when deployed, consumers appear to hang and messages are not processed. The root cause has not been identified.

Current State

The implementation covers:

  • Topic parsing and naming (class:topicspace:topic → topicspace.class.topic)
  • Retention policy: 7 days for flow, 5 minutes for request/response/notify
  • Producer with delivery confirmation and Kafka header support
  • Consumer with competing-consumer (shared group) vs per-subscriber (unique group) semantics
  • Acknowledge (commit offset) and negative acknowledge (seek back)
  • Topic lifecycle via AdminClient: create_topic, delete_topic, topic_exists, ensure_topic
  • Factory integration in pubsub.py with CLI args and env vars
  • SASL/SSL auth config passthrough
  • Unit tests for parsing, retention, factory, and CLI args (all passing)
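The naming and retention rules above can be sketched as follows. This is not the code in kafka_backend.py: the function names are hypothetical, and the sketch assumes the topic class is always one of the four named in the retention rule. Kafka topic names may only contain ASCII letters, digits, '.', '_' and '-', which is presumably why the ':'-separated form is rewritten.

```python
# Hypothetical helpers; topic classes assumed from the retention rule above.
FIVE_MINUTES_MS = 5 * 60 * 1000
RETENTION_MS = {
    "flow": 7 * 24 * 60 * 60 * 1000,  # 7 days
    "request": FIVE_MINUTES_MS,
    "response": FIVE_MINUTES_MS,
    "notify": FIVE_MINUTES_MS,
}


def kafka_topic_name(spec: str) -> str:
    """Map a 'class:topicspace:topic' spec to the Kafka name 'topicspace.class.topic'."""
    parts = spec.split(":")
    if len(parts) != 3 or not all(parts):
        raise ValueError(f"expected class:topicspace:topic, got {spec!r}")
    cls, topicspace, topic = parts
    if cls not in RETENTION_MS:
        raise ValueError(f"unknown topic class {cls!r}")
    return f"{topicspace}.{cls}.{topic}"


def topic_config(spec: str) -> dict:
    """Topic-level settings to pass at creation time (e.g. via AdminClient/NewTopic)."""
    cls = spec.split(":", 1)[0]
    # Kafka topic configs are passed as strings.
    return {"retention.ms": str(RETENTION_MS[cls])}
```

Keeping the class-to-retention table in one dict means the naming check and the retention lookup cannot disagree about which classes exist.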

Known Issues

1. Consumer blocking (undiagnosed)

When running with --pubsub-backend kafka, consumers appear to block indefinitely. Messages are not delivered despite producers completing without error. The exact cause is unknown — possible areas to investigate:

  • Group coordinator timeouts: session.timeout.ms=6000 and heartbeat.interval.ms=1000 may be too tight or too loose depending on Kafka version and broker config
  • Partition assignment delay: single-partition topics with consumer groups may take time to assign, and if assignment hasn't completed when receive() is called, poll returns nothing until timeout
  • auto.offset.reset=latest on flow/request consumers: this setting applies whenever the group has no committed offset, so if the consumer joins after messages are published (race during flow startup), those messages are skipped. This is the same class of bug we fixed in the Pulsar backend's unsubscribe() removal
  • enable.auto.commit=False: offsets are only committed on explicit acknowledge(). If a consumer crashes before committing, the message is redelivered. However, if the consumer never successfully polls, no offset is ever committed, so each restart falls back to auto.offset.reset again and skips anything published in the meantime
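To experiment with the settings in this list, it may help to build the consumer config in one place. The sketch below assumes the confluent-kafka client; the function name and the unique-group naming scheme are hypothetical, not what kafka_backend.py does. It makes auto.offset.reset and librdkafka's debug logging switchable. For reference, 6000 ms is also the default broker-side minimum (group.min.session.timeout.ms), so session.timeout.ms cannot be lowered further on a broker with default settings.

```python
import uuid


def consumer_config(bootstrap: str, subscription: str, shared: bool = True,
                    offset_reset: str = "latest", session_ms: int = 6000,
                    heartbeat_ms: int = 1000, debug: bool = False) -> dict:
    """Build a confluent-kafka Consumer config using the settings discussed above."""
    # Kafka's documentation requires heartbeat.interval.ms < session.timeout.ms
    # and recommends keeping it at no more than a third of the session timeout.
    if heartbeat_ms >= session_ms:
        raise ValueError("heartbeat.interval.ms must be below session.timeout.ms")
    # Shared group: competing consumers split the partitions between them.
    # Unique group (hypothetical naming): every subscriber sees every message.
    group_id = subscription if shared else f"{subscription}-{uuid.uuid4().hex}"
    config = {
        "bootstrap.servers": bootstrap,
        "group.id": group_id,
        # 6000 ms equals the default broker minimum (group.min.session.timeout.ms).
        "session.timeout.ms": session_ms,
        "heartbeat.interval.ms": heartbeat_ms,
        # Only consulted when the group has no committed offset for a partition.
        "auto.offset.reset": offset_reset,
        # Offsets are committed explicitly, on acknowledge().
        "enable.auto.commit": False,
    }
    if debug:
        # librdkafka debug contexts covering group join, rebalance and fetching.
        config["debug"] = "consumer,cgrp,topic,fetch"
    return config
```

For example, `Consumer(consumer_config("localhost:9092", "my-sub", offset_reset="earliest", debug=True))` would combine investigation steps 1 and 3 in a single run.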

2. unsubscribe() still present

KafkaBackendConsumer.unsubscribe() (lines 238-243) still exists. We removed unsubscribe() from the Pulsar consumer (consumer.py) because it deleted subscription cursors and caused message loss on flow restarts. The Kafka consumer's unsubscribe() detaches from the consumer group, which has a similar effect: the group's committed offsets may be lost when all members leave. consumer.py no longer calls this method after the Pulsar fix, but it remains as a footgun.

3. No integration/e2e test coverage

Unit tests pass but only cover topic parsing, retention, and factory wiring. There are no integration tests that verify end-to-end message flow through Kafka. The blocking behaviour only manifests with a real broker.
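A minimal end-to-end check could be a function like the one below, called against a real broker (for instance from a pytest test that is skipped when no broker address is configured). It is a sketch assuming the confluent-kafka client and a single-broker cluster (replication factor 1); the function name and the `it.flow.*` topic naming are hypothetical. It uses the plain client rather than the backend classes, creates the topic before subscribing and reads with auto.offset.reset=earliest, so it can pass while the backend still hangs; a version routed through the backend's own producer and consumer would be the actual regression test.

```python
import time
import uuid


def kafka_round_trip(bootstrap: str, timeout: float = 30.0) -> bytes:
    """Create a fresh topic, publish one message and read it back via a consumer group."""
    # Imported lazily so the module can be loaded without confluent-kafka installed.
    from confluent_kafka import Consumer, KafkaException, Producer
    from confluent_kafka.admin import AdminClient, NewTopic

    topic = f"it.flow.{uuid.uuid4().hex}"  # hypothetical test-topic naming
    payload = uuid.uuid4().hex.encode()

    # Create the topic first and wait for the broker to confirm it.
    admin = AdminClient({"bootstrap.servers": bootstrap})
    new_topic = NewTopic(topic, num_partitions=1, replication_factor=1)
    admin.create_topics([new_topic])[topic].result(timeout=timeout)

    try:
        # Publish one message and check its delivery report.
        reports = []
        producer = Producer({"bootstrap.servers": bootstrap})
        producer.produce(topic, value=payload,
                         on_delivery=lambda err, msg: reports.append(err))
        producer.flush(timeout)
        if reports != [None]:
            raise RuntimeError(f"delivery failed or timed out: {reports}")

        # Fresh group reading from the start, so a late group join cannot skip it.
        consumer = Consumer({
            "bootstrap.servers": bootstrap,
            "group.id": f"it-{uuid.uuid4().hex}",
            "auto.offset.reset": "earliest",
            "enable.auto.commit": False,
        })
        consumer.subscribe([topic])
        try:
            last_error = None
            deadline = time.monotonic() + timeout
            while time.monotonic() < deadline:
                msg = consumer.poll(1.0)
                if msg is None:
                    continue
                if msg.error():
                    if msg.error().fatal():
                        raise KafkaException(msg.error())
                    last_error = msg.error()  # e.g. transient metadata errors
                    continue
                if msg.value() != payload:
                    raise AssertionError("unexpected payload")
                consumer.commit(message=msg, asynchronous=False)
                return msg.value()
            raise TimeoutError(
                f"no message on {topic} within {timeout}s (last error: {last_error})")
        finally:
            consumer.close()
    finally:
        admin.delete_topics([topic])  # best-effort cleanup; result not awaited
```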

Also

  • PRs should target the latest release/v2.X branch.
  • PRs should pass the test suite.

Feel free to reach out for assistance with TrustGraph onboarding.

How to test

Set up the environment...

```shell
python3 -m venv env
. env/bin/activate
pip install ./trustgraph-{base,cli,flow,vertexai,bedrock,unstructured}
```

and run tests...

```shell
pytest tests/unit
pytest tests/integration -m 'not slow'
pytest tests/contract
```

Reference

  • Implementation: trustgraph-base/trustgraph/base/kafka_backend.py
  • Tech spec: docs/tech-specs/kafka-backend.md
  • Unit tests: tests/unit/test_pubsub/test_kafka_backend.py
  • Factory/CLI: trustgraph-base/trustgraph/base/pubsub.py

Suggested Investigation

  1. Run with KAFKA_LOG_LEVEL=DEBUG, or set the librdkafka debug config property (e.g. debug=consumer,cgrp,fetch), to trace consumer group join/rebalance/poll activity
  2. Check whether the initial poll in ensure_connected() (line 195), which is meant to kick off the group join, actually completes partition assignment, or whether it needs a longer timeout or a retry loop
  3. Test with auto.offset.reset=earliest on flow consumers to rule out the "join after publish" race
  4. Verify topic creation completes before consumers subscribe — Kafka rejects subscriptions to non-existent topics (unlike Pulsar's auto-create)
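For step 2, one option is to keep polling until the consumer reports an assignment, rather than relying on a single poll. The sketch below assumes a confluent-kafka `Consumer`, which provides `poll(timeout)` and `assignment()`; the helper name and its defaults are hypothetical, and we have not checked how the existing poll at line 195 behaves. A poll made while waiting can already return a message, so the helper hands such messages back to the caller: a kick poll that discarded its result would advance the consumer's position past that message without committing it, skipping it for the rest of that session.

```python
import time
from typing import Any, List


def wait_for_assignment(consumer: Any, timeout: float = 30.0,
                        step: float = 0.5) -> List[Any]:
    """Poll until the consumer has been assigned partitions.

    `consumer` is expected to behave like confluent_kafka.Consumer, i.e. to
    provide poll(timeout) and assignment(). Any messages returned while
    waiting are collected and returned instead of being dropped.
    """
    deadline = time.monotonic() + timeout
    pending: List[Any] = []
    while not consumer.assignment():
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            raise TimeoutError(f"no partitions assigned within {timeout}s")
        # poll() drives the group join/rebalance protocol; it may also return
        # a message (or an error event) once a partition has been assigned.
        msg = consumer.poll(min(step, remaining))
        if msg is not None:
            pending.append(msg)
    return pending
```

A caller would invoke it right after `consumer.subscribe([...])` and process the returned messages before entering its normal receive loop.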
