Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ OTEL_RESOURCE_ATTRIBUTES=environment=development,cluster=local
- `klag.hot_partition` - Partition throughput × 100 when statistically high (outlier)

**Time-Based Lag Metrics:**
- `klag.consumer.lag.ms` - Lag in milliseconds using interpolation from recorded offset/timestamp history. For each partition, records `(logEndOffset, systemTime)` pairs at each poll interval, then interpolates the timestamp for the committed offset to determine how old unconsumed messages are. Formula: `lag_ms = currentTime - interpolatedTimestamp`. Note: Requires 2 poll intervals (warmup) before data is available.
- `klag.consumer.lag.ms` - Lag in milliseconds (`lag_ms = currentTime - committedMessageTimestamp`). **Primary:** linear interpolation between two Kafka `listOffsets` anchors, `(logStartOffset, logStartTimestamp)` and `(maxTimestampOffset, logEndTimestamp)`; a committed offset at or beyond `maxTimestampOffset` uses `logEndTimestamp` directly. **Fallback:** poll-time `(logEndOffset, systemTime)` history when the Kafka timestamps are invalid (e.g. `logStartTimestamp=0`); requires 2+ poll intervals and does not extrapolate beyond the oldest retained sample (`TIME_LAG_INTERPOLATION_BUFFER_SIZE`).
- `klag.consumer.lag.time_to_close_seconds` - Estimated seconds until lag reaches zero (only when catching up and lag > threshold)

**Data Loss Prevention (DLP) Metrics:**
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ Metrics available at `http://localhost:8888/metrics`
|--------|----------------------------------------------------------------------|
| `klag.consumer.lag` | Current lag per partition (also `.sum`, `.max`, `.min` aggregations) |
| `klag.consumer.lag.velocity` | Rate of change — positive means falling behind |
| `klag.consumer.lag.ms` | Lag in milliseconds based on message timestamps |
| `klag.consumer.lag.ms` | Lag in ms from Kafka log timestamps; poll-history fallback when unavailable |
| `klag.consumer.lag.time_to_close_seconds` | Estimated seconds until lag reaches zero (only when catching up) |
| `klag.consumer.lag.retention_percent` | Lag as percentage of available messages. Use to prevent data loss. |
| `klag.consumer.group.state` | Group health: Stable, Rebalancing, Dead, Empty |
Expand Down
2 changes: 1 addition & 1 deletion build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ plugins {
}

group = "io.github.themoah"
version = "0.1.12"
version = "0.1.13"

repositories {
mavenCentral()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,20 +151,26 @@ public Future<List<PartitionOffsets>> getLogEndOffsets(String topic) {
long logStartOffset = earliestTimestampOffsets.get(tp).getOffset();
long logStartTimestamp = earliestTimestampOffsets.get(tp).getTimestamp();

// Get latest offset and timestamp - prefer MAX_TIMESTAMP if valid
ListOffsetsResultInfo maxTimestampResult = maxTimestampOffsets.get(tp);
ListOffsetsResultInfo latestOffsetResult = latestOffsets.get(tp);

long logEndOffset;
// logEndOffset is ALWAYS the true end-of-log (LATEST). This is the boundary
// for lag = logEndOffset - committedOffset, throughput, and retention.
long logEndOffset = latestOffsetResult.getOffset();

// The timestamp anchor comes from MAX_TIMESTAMP (offset of the highest-timestamp
// record). This offset can be < logEndOffset, so carry it separately rather than
// overwriting logEndOffset, otherwise interpolation anchors and lag disagree.
long logEndTimestamp;
long maxTimestampOffset;

if (maxTimestampResult.getOffset() >= 0 && maxTimestampResult.getTimestamp() > 0) {
// MAX_TIMESTAMP returned valid offset and timestamp
logEndOffset = maxTimestampResult.getOffset();
// MAX_TIMESTAMP returned a valid offset/timestamp anchor
maxTimestampOffset = maxTimestampResult.getOffset();
logEndTimestamp = maxTimestampResult.getTimestamp();
} else {
// MAX_TIMESTAMP failed (pre-3.0 broker or no timestamps) - use LATEST
logEndOffset = latestOffsetResult.getOffset();
// MAX_TIMESTAMP failed (pre-3.0 broker or no timestamps) - fall back to LATEST
maxTimestampOffset = latestOffsetResult.getOffset();
logEndTimestamp = latestOffsetResult.getTimestamp();
}

Expand All @@ -176,16 +182,16 @@ public Future<List<PartitionOffsets>> getLogEndOffsets(String topic) {

// Log first partition's timestamps at INFO level for debugging
if (!loggedSampleTimestamp) {
log.info("Topic {} sample timestamps: partition {} logStart={} (ts={}), logEnd={} (ts={})",
topic, partition.partition(), logStartOffset, logStartTimestamp, logEndOffset, logEndTimestamp);
log.info("Topic {} sample timestamps: partition {} logStart={} (ts={}), logEnd={}, maxTsOffset={} (ts={})",
topic, partition.partition(), logStartOffset, logStartTimestamp, logEndOffset, maxTimestampOffset, logEndTimestamp);
loggedSampleTimestamp = true;
} else {
log.debug("Topic {} partition {}: logStart={} (ts={}), logEnd={} (ts={})",
topic, partition.partition(), logStartOffset, logStartTimestamp, logEndOffset, logEndTimestamp);
log.debug("Topic {} partition {}: logStart={} (ts={}), logEnd={}, maxTsOffset={} (ts={})",
topic, partition.partition(), logStartOffset, logStartTimestamp, logEndOffset, maxTimestampOffset, logEndTimestamp);
}

result.add(new PartitionOffsets(topic, partition.partition(), logEndOffset, logStartOffset,
logEndTimestamp, logStartTimestamp));
logEndTimestamp, maxTimestampOffset, logStartTimestamp));
}

log.info("Retrieved offsets for {} partitions of topic {}", result.size(), topic);
Expand Down
58 changes: 19 additions & 39 deletions src/main/java/io/github/themoah/klag/metrics/MetricsCollector.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import io.github.themoah.klag.kafka.KafkaClientService;
import io.github.themoah.klag.metrics.hotpartition.HotPartitionConfig;
import io.github.themoah.klag.metrics.hotpartition.HotPartitionDetector;
import io.github.themoah.klag.metrics.timelag.LagMsCalculator;
import io.github.themoah.klag.metrics.timelag.OffsetTimestampTracker;
import io.github.themoah.klag.metrics.timelag.TimeLagConfig;
import io.github.themoah.klag.metrics.timelag.TimeLagEstimator;
Expand Down Expand Up @@ -519,6 +520,7 @@ private ConsumerGroupLag buildConsumerGroupLag(
po.logEndOffset(),
po.logStartOffset(),
po.logEndTimestamp(),
po.maxTimestampOffset(),
po.logStartTimestamp(),
committedOffset
);
Expand Down Expand Up @@ -604,12 +606,11 @@ private List<RetentionRisk> calculateRetentionRisks(List<ConsumerGroupLag> lagDa
}

/**
* Calculates lag in milliseconds using interpolation from recorded offset/timestamp history.
* For each partition, records the current log end offset, then interpolates the timestamp
* for the committed offset to determine how old the unconsumed messages are.
* Calculates lag in milliseconds per consumer group and topic.
*
* <p>This approach uses system time instead of Kafka message timestamps, avoiding issues
* where Kafka doesn't provide logStartTimestamp (returns 0).
* <p>Primary path: linear interpolation between Kafka {@code listOffsets} log start/end
* timestamps. Fallback: poll-time {@code (logEndOffset, systemTime)} history when Kafka
* timestamps are unavailable; fallback does not extrapolate beyond the oldest retained sample.
*
* @param lagData list of consumer group lag data
* @return list of lag in milliseconds per consumer group and topic
Expand All @@ -620,53 +621,33 @@ private List<LagMs> calculateLagMs(List<ConsumerGroupLag> lagData) {
}

List<LagMs> lagMsList = new ArrayList<>();
int skippedDueToWarmup = 0;
int skippedPartitions = 0;
long currentTime = System.currentTimeMillis();
Set<String> trackedPartitions = new HashSet<>();

for (ConsumerGroupLag group : lagData) {
// First pass: record all partition offsets to the tracker
for (PartitionLag p : group.partitions()) {
offsetTimestampTracker.recordOffset(p.topic(), p.partition(), p.logEndOffset());
trackedPartitions.add(p.topic() + ":" + p.partition());
}

// Second pass: aggregate lag_ms by topic using max lag_ms across partitions
Map<String, TopicLagMsAggregates> topicAggregates = new HashMap<>();

for (PartitionLag p : group.partitions()) {
// Consumer is caught up - no lag
if (p.lag() <= 0) {
topicAggregates.computeIfAbsent(p.topic(), k -> new TopicLagMsAggregates())
.add(0, p.lag());
continue;
}

// Check if we have interpolation data for this partition
if (!offsetTimestampTracker.hasInterpolationData(p.topic(), p.partition())) {
skippedDueToWarmup++;
log.trace("Skipping lag_ms for {}:{}:{}: insufficient interpolation data (warmup)",
group.consumerGroup(), p.topic(), p.partition());
continue;
}

// Interpolate timestamp for the committed offset
var interpolatedTs = offsetTimestampTracker.getInterpolatedTimestamp(
p.topic(), p.partition(), p.committedOffset());
var lagMs = LagMsCalculator.estimatePartitionLagMs(p, offsetTimestampTracker, currentTime);

if (interpolatedTs.isPresent()) {
long lagMs = Math.max(0, currentTime - interpolatedTs.getAsLong());
if (lagMs.isPresent()) {
topicAggregates.computeIfAbsent(p.topic(), k -> new TopicLagMsAggregates())
.add(lagMs, p.lag());
log.trace("Partition {}:{}:{} lag_ms={} (committed={}, interpolated_ts={})",
group.consumerGroup(), p.topic(), p.partition(), lagMs,
p.committedOffset(), interpolatedTs.getAsLong());
} else {
skippedDueToWarmup++;
.add(lagMs.getAsLong(), p.lag());
log.trace("Partition {}:{}:{} lag_ms={} (committed={})",
group.consumerGroup(), p.topic(), p.partition(), lagMs.getAsLong(), p.committedOffset());
} else if (p.lag() > 0) {
skippedPartitions++;
log.trace("Skipping lag_ms for {}:{}:{}: no Kafka anchors and insufficient poll history",
group.consumerGroup(), p.topic(), p.partition());
}
}

// Create LagMs records for topics that have data
for (Map.Entry<String, TopicLagMsAggregates> entry : topicAggregates.entrySet()) {
String topic = entry.getKey();
TopicLagMsAggregates agg = entry.getValue();
Expand All @@ -679,14 +660,13 @@ private List<LagMs> calculateLagMs(List<ConsumerGroupLag> lagData) {
}
}

// Cleanup stale partitions
offsetTimestampTracker.cleanupStalePartitions(trackedPartitions);

if (skippedDueToWarmup > 0) {
log.debug("Skipped {} partitions due to warmup (insufficient interpolation data)", skippedDueToWarmup);
if (skippedPartitions > 0) {
log.debug("Skipped {} partitions with no lag_ms estimate", skippedPartitions);
}
if (!lagMsList.isEmpty()) {
log.info("Calculated lag_ms for {} consumer-group/topic pairs using interpolation", lagMsList.size());
log.info("Calculated lag_ms for {} consumer-group/topic pairs", lagMsList.size());
}

return lagMsList;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ public void reportHotPartitionThroughput(List<HotPartitionThroughput> hotPartiti
}

/**
* Reports lag in milliseconds based on actual message timestamps.
* Reports lag in milliseconds (Kafka timestamps or poll-history fallback).
*
* @param lagMsData list of lag in milliseconds data
* @param activeKeys set to populate with active gauge keys (can be null)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package io.github.themoah.klag.metrics.timelag;

import io.github.themoah.klag.model.ConsumerGroupLag.PartitionLag;
import java.util.OptionalLong;

/**
 * Derives the message time of a committed offset from two Kafka {@code listOffsets}
 * anchor points and converts it into a lag in milliseconds.
 *
 * <p>The start anchor is {@code (logStartOffset, logStartTimestamp)} and the end anchor is
 * {@code (maxTimestampOffset, logEndTimestamp)}. The end anchor is keyed on
 * {@code maxTimestampOffset} (the offset of the MAX_TIMESTAMP record) instead of
 * {@code logEndOffset} (the true end-of-log), because {@code logEndTimestamp} belongs to
 * the former. The two offsets can differ, and treating them as the same point would either
 * skip interpolation or skew the resulting timestamp.
 */
public final class KafkaOffsetTimestampInterpolator {

    private KafkaOffsetTimestampInterpolator() {}

    /**
     * Checks whether the partition snapshot can be used for Kafka-timestamp interpolation.
     *
     * <p>All of the following must hold: both boundary timestamps are positive, the end
     * timestamp is not older than the start timestamp, the timestamp anchor lies strictly
     * after the log start offset, and the committed offset lies within
     * {@code [logStartOffset, logEndOffset]}.
     *
     * @param partition partition lag snapshot to inspect
     * @return {@code true} when the anchors and the committed offset are usable
     */
    public static boolean hasValidAnchors(PartitionLag partition) {
        boolean timestampsUsable = partition.logStartTimestamp() > 0
                && partition.logEndTimestamp() > 0
                && partition.logEndTimestamp() >= partition.logStartTimestamp();
        if (!timestampsUsable) {
            return false;
        }

        // The two anchors must be separated by at least one offset.
        boolean anchorsSpanOffsets = partition.maxTimestampOffset() > partition.logStartOffset();
        if (!anchorsSpanOffsets) {
            return false;
        }

        // The committed offset may sit between the max-timestamp record and the true end of
        // the log, so the upper bound is the true end offset, not the timestamp anchor.
        return partition.committedOffset() >= partition.logStartOffset()
                && partition.committedOffset() <= partition.logEndOffset();
    }

    /**
     * Estimates the Kafka message timestamp that corresponds to the committed offset.
     *
     * @param partition partition lag snapshot
     * @return estimated message timestamp, or empty when {@link #hasValidAnchors} rejects
     *     the snapshot
     */
    public static OptionalLong interpolateCommittedTimestamp(PartitionLag partition) {
        if (!hasValidAnchors(partition)) {
            return OptionalLong.empty();
        }

        long committed = partition.committedOffset();
        long anchorOffset = partition.maxTimestampOffset();

        // At or past the highest-timestamp record the newest known timestamp applies as-is.
        if (committed >= anchorOffset) {
            return OptionalLong.of(partition.logEndTimestamp());
        }

        long estimatedTimestamp = linearInterpolate(
                partition.logStartOffset(), partition.logStartTimestamp(),
                anchorOffset, partition.logEndTimestamp(),
                committed);
        return OptionalLong.of(estimatedTimestamp);
    }

    /**
     * Estimates lag in milliseconds as {@code currentTime - committedMessageTimestamp},
     * floored at zero.
     *
     * @param partition partition lag snapshot
     * @param currentTime reference time in milliseconds
     * @return non-negative lag in ms, or empty when no timestamp could be interpolated
     */
    public static OptionalLong estimateLagMs(PartitionLag partition, long currentTime) {
        OptionalLong committedTimestamp = interpolateCommittedTimestamp(partition);
        return committedTimestamp.isPresent()
                ? OptionalLong.of(Math.max(0, currentTime - committedTimestamp.getAsLong()))
                : OptionalLong.empty();
    }

    /**
     * Evaluates the straight line through {@code (o1, t1)} and {@code (o2, t2)} at
     * {@code targetOffset}, rounded to the nearest long.
     *
     * @return the interpolated value, or {@code t1} when both anchors share the same offset
     */
    static long linearInterpolate(long o1, long t1, long o2, long t2, long targetOffset) {
        long offsetSpan = o2 - o1;
        if (offsetSpan == 0) {
            // Degenerate line: no slope can be derived from a single offset.
            return t1;
        }
        double msPerOffset = (double) (t2 - t1) / offsetSpan;
        return Math.round(t1 + msPerOffset * (targetOffset - o1));
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package io.github.themoah.klag.metrics.timelag;

import io.github.themoah.klag.model.ConsumerGroupLag.PartitionLag;
import java.util.OptionalLong;

/**
 * Per-partition lag-in-milliseconds estimation. Kafka timestamp anchors are tried first;
 * when they yield nothing, bounded poll-time interpolation history is used instead.
 */
public final class LagMsCalculator {

    private LagMsCalculator() {}

    /**
     * Estimates lag in milliseconds for a partition.
     *
     * <p>A partition whose lag is zero or negative reports {@code 0} immediately. Otherwise
     * the Kafka-anchor estimate is returned when present, and the poll history is consulted
     * only as a fallback.
     *
     * @param partition partition lag snapshot
     * @param pollHistory poll-time offset history (may be null when time lag disabled)
     * @param currentTime current wall-clock time in milliseconds
     * @return lag in ms, or empty when no reliable estimate is available
     */
    public static OptionalLong estimatePartitionLagMs(
            PartitionLag partition,
            OffsetTimestampTracker pollHistory,
            long currentTime) {

        if (partition.lag() <= 0) {
            return OptionalLong.of(0);
        }

        OptionalLong fromKafkaAnchors =
                KafkaOffsetTimestampInterpolator.estimateLagMs(partition, currentTime);
        if (fromKafkaAnchors.isPresent()) {
            return fromKafkaAnchors;
        }

        return pollHistory == null
                ? OptionalLong.empty()
                : estimateFromPollHistory(partition, pollHistory, currentTime);
    }

    /**
     * Fallback estimate based on recorded poll history. Yields empty when the tracker has no
     * interpolation data for the partition, when the committed offset is older than the
     * oldest retained sample, or when the tracker cannot interpolate a timestamp.
     */
    private static OptionalLong estimateFromPollHistory(
            PartitionLag partition,
            OffsetTimestampTracker pollHistory,
            long currentTime) {

        String topic = partition.topic();
        int partitionId = partition.partition();

        if (!pollHistory.hasInterpolationData(topic, partitionId)) {
            return OptionalLong.empty();
        }

        // Never extrapolate below the oldest retained sample.
        OptionalLong oldestRetained = pollHistory.getOldestOffset(topic, partitionId);
        boolean coveredByHistory = oldestRetained.isPresent()
                && partition.committedOffset() >= oldestRetained.getAsLong();
        if (!coveredByHistory) {
            return OptionalLong.empty();
        }

        OptionalLong sampledTimestamp = pollHistory.getInterpolatedTimestamp(
                topic, partitionId, partition.committedOffset());
        return sampledTimestamp.isPresent()
                ? OptionalLong.of(Math.max(0, currentTime - sampledTimestamp.getAsLong()))
                : OptionalLong.empty();
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,22 @@ public void cleanupStalePartitions(Set<String> activeKeys) {
}
}

/**
 * Looks up the oldest offset still held in a partition's poll history.
 *
 * @param topic the topic name
 * @param partition the partition number
 * @return the oldest offset in poll history, or empty if no history exists
 */
public OptionalLong getOldestOffset(String topic, int partition) {
    PartitionOffsetHistory partitionHistory = histories.get(makeKey(topic, partition));
    // No history entry for this partition means there is nothing to report.
    return partitionHistory != null
            ? partitionHistory.getOldestOffset()
            : OptionalLong.empty();
}

/**
* Returns the latest recorded offset for a partition, if available.
*
Expand Down
Loading
Loading