Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ public IssueClassifier() {
detectors.add(new Http500Detector());
detectors.add(new FileSystemErrorDetector());
detectors.add(new TransactionErrorDetector());
detectors.add(new KafkaErrorDetector());

// WARNING severity
detectors.add(new TimeoutDetector());
Expand Down
73 changes: 73 additions & 0 deletions src/main/java/com/stacklens/detector/KafkaErrorDetector.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package com.stacklens.detector;

import com.stacklens.model.Issue;
import com.stacklens.model.Severity;

import java.util.List;
import java.util.Optional;

/**
* Detects common Kafka producer and consumer errors in log lines.
*
* Matches patterns like ProducerFencedException, CommitFailedException,
* WakeupException, and common errors from org.apache.kafka.common.errors.
* These errors are frequent in microservice stacks with Kafka integration.
*/
public class KafkaErrorDetector implements IssueDetector {

/**
* Returns the issue type identifier for Kafka errors.
*
* @return the string "KafkaError"
*/
@Override
public String getIssueType() { return "KafkaError"; }

/**
* Returns the severity level for Kafka errors.
*
* @return Severity.ERROR
*/
@Override
public Severity getSeverity() { return Severity.ERROR; }

/**
* Analyzes a log line for Kafka producer/consumer error patterns.
*
* Checks for:
* - ProducerFencedException (producer fenced by broker)
* - CommitFailedException (consumer commit failed)
* - WakeupException (consumer/producer wakeup)
* - org.apache.kafka.common.errors (any Kafka common error)
*
* @param line a single line from the log or stack trace
* @return an Optional containing the detected Issue, or empty if no match
*/
@Override
public Optional<Issue> detect(String line) {
if (line == null) return Optional.empty();

if (line.contains("ProducerFencedException") ||
line.contains("CommitFailedException") ||
line.contains("WakeupException") ||
line.contains("org.apache.kafka.common.errors")) {
return Optional.of(new Issue(
getIssueType(),
getSeverity(),
"A Kafka producer or consumer error occurred. This is common in microservice stacks " +
"when there are issues with broker connectivity, consumer group coordination, or " +
"producer/consumer lifecycle management.",
List.of(
"Check that Kafka broker is running and accessible from the application",
"Verify consumer group ID configuration and avoid duplicate consumers joining the group",
"Ensure producer/consumer are properly closed during application shutdown",
"Check for network issues or incorrect broker addresses in configuration",
"Verify that the topic exists and the application has permission to produce/consume"
),
line.trim()
));
}

return Optional.empty();
}
}
47 changes: 47 additions & 0 deletions src/test/java/com/stacklens/detector/AllDetectorsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -247,4 +247,51 @@ void transactionErrorDetector_doesNotMatchCleanLine() {
IssueDetector detector = new TransactionErrorDetector();
assertFalse(detector.detect(CLEAN_LINE).isPresent());
}

@Test
void kafkaErrorDetector_detectsProducerFencedException() {
IssueDetector detector = new KafkaErrorDetector();
String line = "org.apache.kafka.common.errors.ProducerFencedException: Producer is fenced";

Optional<Issue> result = detector.detect(line);

assertTrue(result.isPresent());
assertEquals("KafkaError", result.get().getType());
}

@Test
void kafkaErrorDetector_detectsCommitFailedException() {
IssueDetector detector = new KafkaErrorDetector();
String line = "org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed";

Optional<Issue> result = detector.detect(line);

assertTrue(result.isPresent());
}

@Test
void kafkaErrorDetector_detectsWakeupException() {
IssueDetector detector = new KafkaErrorDetector();
String line = "org.apache.kafka.common.errors.WakeupException: Kafka consumer wakeup";

Optional<Issue> result = detector.detect(line);

assertTrue(result.isPresent());
}

@Test
void kafkaErrorDetector_detectsKafkaCommonErrors() {
IssueDetector detector = new KafkaErrorDetector();
String line = "org.apache.kafka.common.errors.NetworkException: Connection to node failed";

Optional<Issue> result = detector.detect(line);

assertTrue(result.isPresent());
}

@Test
void kafkaErrorDetector_doesNotMatchCleanLine() {
IssueDetector detector = new KafkaErrorDetector();
assertFalse(detector.detect(CLEAN_LINE).isPresent());
}
}
Loading