Summary
The Kafka pub/sub backend (trustgraph-base/trustgraph/base/kafka_backend.py) is functionally complete but has blocking issues in practice. When deployed, consumers appear to hang/block and messages are not processed. The root cause has not been identified.
Current State
The implementation covers:
- Topic parsing and naming (class:topicspace:topic → topicspace.class.topic)
- Retention policy: 7 days for flow, 5 minutes for request/response/notify
- Producer with delivery confirmation and Kafka header support
- Consumer with competing-consumer (shared group) vs per-subscriber (unique group) semantics
- Acknowledge (commit offset) and negative acknowledge (seek back)
- Topic lifecycle via AdminClient: create_topic, delete_topic, topic_exists, ensure_topic
- Factory integration in pubsub.py with CLI args and env vars
- SASL/SSL auth config passthrough
- Unit tests for parsing, retention, factory, and CLI args (all passing)
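For orientation, the topic naming and retention rules listed above can be sketched as below. This is only an illustration of the stated mapping, not the code in kafka_backend.py: the names RETENTION_MS, parse_topic, kafka_topic_name and retention_ms are hypothetical, and the example topic strings are made up.

```python
# Retention per topic class in milliseconds, using the values from the list above.
RETENTION_MS = {
    "flow": 7 * 24 * 60 * 60 * 1000,  # 7 days
    "request": 5 * 60 * 1000,         # 5 minutes
    "response": 5 * 60 * 1000,        # 5 minutes
    "notify": 5 * 60 * 1000,          # 5 minutes
}


def parse_topic(topic: str) -> tuple:
    """Split 'class:topicspace:topic' into (class, topicspace, topic)."""
    parts = topic.split(":")
    if len(parts) != 3 or not all(parts):
        raise ValueError(f"expected class:topicspace:topic, got {topic!r}")
    cls, topicspace, name = parts
    if cls not in RETENTION_MS:
        raise ValueError(f"unknown topic class {cls!r}")
    return cls, topicspace, name


def kafka_topic_name(topic: str) -> str:
    """Map class:topicspace:topic to the Kafka name topicspace.class.topic."""
    cls, topicspace, name = parse_topic(topic)
    return f"{topicspace}.{cls}.{name}"


def retention_ms(topic: str) -> int:
    """Return the retention period for the topic's class."""
    cls, _, _ = parse_topic(topic)
    return RETENTION_MS[cls]
```

For example, kafka_topic_name("flow:tg:document-load") gives "tg.flow.document-load", and retention_ms for any flow topic is 604800000.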
Known Issues
1. Consumer blocking (undiagnosed)
When running with --pubsub-backend kafka, consumers appear to block indefinitely. Messages are not delivered despite producers completing without error. The exact cause is unknown — possible areas to investigate:
- Group coordinator timeouts: session.timeout.ms=6000 and heartbeat.interval.ms=1000 may be too tight or too loose depending on Kafka version and broker config
- Partition assignment delay: single-partition topics with consumer groups may take time to assign, and if assignment hasn't completed when receive() is called, poll returns nothing until timeout
- auto.offset.reset=latest on flow/request consumers: if the consumer joins after messages are published (race during flow startup), those messages are skipped. This is the same class of bug we fixed in the Pulsar backend's unsubscribe() removal
- enable.auto.commit=False: offsets are only committed on explicit acknowledge(). If a consumer crashes before committing, the message is redelivered, but if the consumer never successfully polls, offsets are never established
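To make the settings under suspicion easier to vary while debugging, they can be collected in one place. The sketch below builds a plain configuration dict using the property names and values quoted above. The function name consumer_config and its parameters are hypothetical, and the real backend may assemble its configuration differently. The optional debug entry uses librdkafka's debug property, which takes a comma-separated list of contexts such as "cgrp,topic,fetch".

```python
def consumer_config(bootstrap, group_id, *, replay_from_start=False, debug=None):
    """Build a consumer configuration dict (a sketch; names are illustrative)."""
    conf = {
        "bootstrap.servers": bootstrap,
        "group.id": group_id,
        # Values quoted in the issue; candidates for tuning.
        "session.timeout.ms": 6000,
        "heartbeat.interval.ms": 1000,
        # Only consulted when the group has no committed offset for a partition.
        # "latest" skips anything published before the consumer joined.
        "auto.offset.reset": "earliest" if replay_from_start else "latest",
        # Offsets advance only on an explicit commit.
        "enable.auto.commit": False,
    }
    if debug:
        # Turns on client-side debug logging for the named contexts.
        conf["debug"] = debug
    return conf
```

A dict like this can be passed directly to confluent_kafka.Consumer. Switching replay_from_start to True on a fresh group is a quick way to tell whether messages are being skipped rather than never delivered.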
2. unsubscribe() still present
KafkaBackendConsumer.unsubscribe() (lines 238-243) still exists. We removed unsubscribe() from the Pulsar consumer (consumer.py) because it deleted subscription cursors and caused message loss on flow restarts. The Kafka consumer's unsubscribe() detaches from the consumer group, which has a similar effect: the group's committed offsets may be lost when all members leave. This method is no longer called by consumer.py after the Pulsar fix, but it remains as a footgun.
3. No integration/e2e test coverage
Unit tests pass but only cover topic parsing, retention, and factory wiring. There are no integration tests that verify end-to-end message flow through Kafka. The blocking behaviour only manifests with a real broker.
Also
- The PR should target the latest release/v2.X branch.
- All tests should pass.
Feel free to reach out for assistance on TrustGraph onboarding.
How to test
Set up the environment...
python3 -m venv env
. env/bin/activate
pip install ./trustgraph-{base,cli,flow,vertexai,bedrock,unstructured}
and run tests...
pytest tests/unit
pytest tests/integration -m 'not slow'
pytest tests/contract
Reference
- Implementation: trustgraph-base/trustgraph/base/kafka_backend.py
- Tech spec: docs/tech-specs/kafka-backend.md
- Unit tests: tests/unit/test_pubsub/test_kafka_backend.py
- Factory/CLI: trustgraph-base/trustgraph/base/pubsub.py
Suggested Investigation
- Run with KAFKA_LOG_LEVEL=DEBUG or equivalent confluent-kafka debug config to trace consumer group join/rebalance/poll activity
- Check whether ensure_connected() poll kick (line 195) actually completes partition assignment, or whether it needs a longer timeout or retry loop
- Test with auto.offset.reset=earliest on flow consumers to rule out the "join after publish" race
- Verify topic creation completes before consumers subscribe — Kafka rejects subscriptions to non-existent topics (unlike Pulsar's auto-create)
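One way to test the partition-assignment theory is to replace the single poll kick with a bounded retry loop that polls until the consumer reports an assignment. The sketch below is an assumption about how that could look, not the existing ensure_connected() code. The name wait_for_assignment is hypothetical. It relies only on a consumer object exposing poll(timeout) and assignment(), which confluent_kafka.Consumer provides.

```python
import time


def wait_for_assignment(consumer, timeout_s=30.0, poll_s=1.0):
    """Poll until the consumer has partitions assigned or timeout_s elapses.

    Returns (assigned, buffered), where buffered holds anything poll()
    returned while waiting, so the caller can process it instead of losing it.
    """
    deadline = time.monotonic() + timeout_s
    buffered = []
    while time.monotonic() < deadline:
        # poll() drives the group join and rebalance; it returns None when idle.
        msg = consumer.poll(poll_s)
        if msg is not None:
            buffered.append(msg)
        # assignment() is an empty list until the rebalance has completed.
        if consumer.assignment():
            return True, buffered
    return False, buffered
```

If this returns False after a generous timeout against a real broker, the problem is more likely in the group join or topic existence than in the receive() timeout.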