From 4db9d9cf411aeac51d0fde3639b7ad6cbb1c7cce Mon Sep 17 00:00:00 2001 From: sandeshgorde Date: Wed, 6 May 2026 14:58:29 +0530 Subject: [PATCH 1/2] Add KafkaErrorDetector for producer/consumer error detection Detects: - ProducerFencedException - CommitFailedException - WakeupException - org.apache.kafka.common.errors Severity: ERROR Good first issue contribution (#1). --- .../stacklens/classifier/IssueClassifier.java | 1 + .../detector/KafkaErrorDetector.java | 44 +++++++++++++++++ .../stacklens/detector/AllDetectorsTest.java | 47 +++++++++++++++++++ 3 files changed, 92 insertions(+) create mode 100644 src/main/java/com/stacklens/detector/KafkaErrorDetector.java diff --git a/src/main/java/com/stacklens/classifier/IssueClassifier.java b/src/main/java/com/stacklens/classifier/IssueClassifier.java index 1f45db5..e22318a 100644 --- a/src/main/java/com/stacklens/classifier/IssueClassifier.java +++ b/src/main/java/com/stacklens/classifier/IssueClassifier.java @@ -58,6 +58,7 @@ public IssueClassifier() { detectors.add(new Http500Detector()); detectors.add(new FileSystemErrorDetector()); detectors.add(new TransactionErrorDetector()); + detectors.add(new KafkaErrorDetector()); // WARNING severity detectors.add(new TimeoutDetector()); diff --git a/src/main/java/com/stacklens/detector/KafkaErrorDetector.java b/src/main/java/com/stacklens/detector/KafkaErrorDetector.java new file mode 100644 index 0000000..a2e3d36 --- /dev/null +++ b/src/main/java/com/stacklens/detector/KafkaErrorDetector.java @@ -0,0 +1,44 @@ +package com.stacklens.detector; + +import com.stacklens.model.Issue; +import com.stacklens.model.Severity; + +import java.util.List; +import java.util.Optional; + +public class KafkaErrorDetector implements IssueDetector { + + @Override + public String getIssueType() { return "KafkaError"; } + + @Override + public Severity getSeverity() { return Severity.ERROR; } + + @Override + public Optional detect(String line) { + if (line == null) return Optional.empty(); + + if (line.contains("ProducerFencedException") || + line.contains("CommitFailedException") || + line.contains("WakeupException") || + line.contains("org.apache.kafka.common.errors")) { + return Optional.of(new Issue( + getIssueType(), + getSeverity(), + "A Kafka producer or consumer error occurred. This is common in microservice stacks " + + "when there are issues with broker connectivity, consumer group coordination, or " + + "producer/consumer lifecycle management.", + List.of( + "Check that Kafka broker is running and accessible from the application", + "Verify consumer group ID configuration and avoid duplicate consumers joining the group", + "Ensure producer/consumer are properly closed during application shutdown", + "Check for network issues or incorrect broker addresses in configuration", + "Verify that the topic exists and the application has permission to produce/consume" + ), + line.trim() + )); + } + + return Optional.empty(); + } +} \ No newline at end of file diff --git a/src/test/java/com/stacklens/detector/AllDetectorsTest.java b/src/test/java/com/stacklens/detector/AllDetectorsTest.java index d298ad3..ab817fd 100644 --- a/src/test/java/com/stacklens/detector/AllDetectorsTest.java +++ b/src/test/java/com/stacklens/detector/AllDetectorsTest.java @@ -247,4 +247,51 @@ void transactionErrorDetector_doesNotMatchCleanLine() { IssueDetector detector = new TransactionErrorDetector(); assertFalse(detector.detect(CLEAN_LINE).isPresent()); } + + @Test + void kafkaErrorDetector_detectsProducerFencedException() { + IssueDetector detector = new KafkaErrorDetector(); + String line = "org.apache.kafka.common.errors.ProducerFencedException: Producer is fenced"; + + Optional result = detector.detect(line); + + assertTrue(result.isPresent()); + assertEquals("KafkaError", result.get().getType()); + } + + @Test + void kafkaErrorDetector_detectsCommitFailedException() { + IssueDetector detector = new KafkaErrorDetector(); + String line = "org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed"; + + Optional result = detector.detect(line); + + assertTrue(result.isPresent()); + } + + @Test + void kafkaErrorDetector_detectsWakeupException() { + IssueDetector detector = new KafkaErrorDetector(); + String line = "org.apache.kafka.common.errors.WakeupException: Kafka consumer wakeup"; + + Optional result = detector.detect(line); + + assertTrue(result.isPresent()); + } + + @Test + void kafkaErrorDetector_detectsKafkaCommonErrors() { + IssueDetector detector = new KafkaErrorDetector(); + String line = "org.apache.kafka.common.errors.NetworkException: Connection to node failed"; + + Optional result = detector.detect(line); + + assertTrue(result.isPresent()); + } + + @Test + void kafkaErrorDetector_doesNotMatchCleanLine() { + IssueDetector detector = new KafkaErrorDetector(); + assertFalse(detector.detect(CLEAN_LINE).isPresent()); + } } From f87eb98f4a306c564ddb1b143c04ebdf4eda11bb Mon Sep 17 00:00:00 2001 From: sandeshgorde Date: Wed, 6 May 2026 15:43:13 +0530 Subject: [PATCH 2/2] Add Javadoc comments to KafkaErrorDetector Added class-level and method-level Javadoc to fix CodeRabbit's docstring coverage warning. Follows existing detector patterns. --- .../detector/KafkaErrorDetector.java | 29 +++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/src/main/java/com/stacklens/detector/KafkaErrorDetector.java b/src/main/java/com/stacklens/detector/KafkaErrorDetector.java index a2e3d36..3203184 100644 --- a/src/main/java/com/stacklens/detector/KafkaErrorDetector.java +++ b/src/main/java/com/stacklens/detector/KafkaErrorDetector.java @@ -6,14 +6,43 @@ import java.util.List; import java.util.Optional; +/** + * Detects common Kafka producer and consumer errors in log lines. + * + * Matches patterns like ProducerFencedException, CommitFailedException, + * WakeupException, and common errors from org.apache.kafka.common.errors. + * These errors are frequent in microservice stacks with Kafka integration. + */ public class KafkaErrorDetector implements IssueDetector { + /** + * Returns the issue type identifier for Kafka errors. + * + * @return the string "KafkaError" + */ @Override public String getIssueType() { return "KafkaError"; } + /** + * Returns the severity level for Kafka errors. + * + * @return Severity.ERROR + */ @Override public Severity getSeverity() { return Severity.ERROR; } + /** + * Analyzes a log line for Kafka producer/consumer error patterns. + * + * Checks for: + * - ProducerFencedException (producer fenced by broker) + * - CommitFailedException (consumer commit failed) + * - WakeupException (consumer/producer wakeup) + * - org.apache.kafka.common.errors (any Kafka common error) + * + * @param line a single line from the log or stack trace + * @return an Optional containing the detected Issue, or empty if no match + */ @Override public Optional detect(String line) { if (line == null) return Optional.empty();