From 377b4678ee4c055aec1803dc9265878aa1a91b3f Mon Sep 17 00:00:00 2001 From: Aviv Dozorets Date: Wed, 27 May 2026 09:56:15 +0300 Subject: [PATCH 1/3] fix: use Kafka log timestamps for lag_ms (closes #30) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit klag_consumer_lag_ms previously plateaued at ~30 minutes. Root cause: lag_ms was derived solely from a poll-time (logEndOffset, systemTime) ring buffer with bounded retention (TIME_LAG_INTERPOLATION_BUFFER_SIZE × METRICS_INTERVAL_MS), and PartitionOffsetHistory back-extrapolated using the two oldest points when the committed offset fell behind the window — producing a fictional ceiling near the buffer's time span. Fix introduces a two-tier estimation in LagMsCalculator: - Primary: KafkaOffsetTimestampInterpolator linearly interpolates committed-offset timestamp between Kafka listOffsets log start/end anchors (already populated via OffsetSpec.earliest/maxTimestamp/latest in KafkaClientServiceImpl). No ceiling — reports true lag in ms for arbitrarily old commits. - Fallback: poll-time interpolation, used only when Kafka anchors are invalid (logStart/EndTimestamp ≤ 0 or committed outside log range). Now strictly refuses to extrapolate beyond the oldest retained sample. MetricsCollector.calculateLagMs delegates per-partition estimation to LagMsCalculator, still records poll offsets so the fallback stays warm. Bumps version to 0.1.11. Co-Authored-By: Claude Opus 4.7 --- CLAUDE.md | 2 +- README.md | 2 +- build.gradle.kts | 2 +- .../klag/metrics/MetricsCollector.java | 57 ++++------- .../klag/metrics/MicrometerReporter.java | 2 +- .../KafkaOffsetTimestampInterpolator.java | 72 ++++++++++++++ .../klag/metrics/timelag/LagMsCalculator.java | 56 +++++++++++ .../timelag/OffsetTimestampTracker.java | 16 ++++ .../timelag/PartitionOffsetHistory.java | 27 +++--- .../metrics/timelag/TimeLagEstimator.java | 4 +- .../io/github/themoah/klag/model/LagMs.java | 6 +- .../KafkaOffsetTimestampInterpolatorTest.java | 94 +++++++++++++++++++ .../metrics/timelag/LagMsCalculatorTest.java | 71 ++++++++++++++ .../timelag/PartitionOffsetHistoryTest.java | 26 ++--- 14 files changed, 357 insertions(+), 80 deletions(-) create mode 100644 src/main/java/io/github/themoah/klag/metrics/timelag/KafkaOffsetTimestampInterpolator.java create mode 100644 src/main/java/io/github/themoah/klag/metrics/timelag/LagMsCalculator.java create mode 100644 src/test/java/io/github/themoah/klag/metrics/timelag/KafkaOffsetTimestampInterpolatorTest.java create mode 100644 src/test/java/io/github/themoah/klag/metrics/timelag/LagMsCalculatorTest.java diff --git a/CLAUDE.md b/CLAUDE.md index 20c58a2..2848760 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -113,7 +113,7 @@ OTEL_RESOURCE_ATTRIBUTES=environment=development,cluster=local - `klag.hot_partition` - Partition throughput × 100 when statistically high (outlier) **Time-Based Lag Metrics:** -- `klag.consumer.lag.ms` - Lag in milliseconds using interpolation from recorded offset/timestamp history. For each partition, records `(logEndOffset, systemTime)` pairs at each poll interval, then interpolates the timestamp for the committed offset to determine how old unconsumed messages are. Formula: `lag_ms = currentTime - interpolatedTimestamp`. Note: Requires 2 poll intervals (warmup) before data is available. +- `klag.consumer.lag.ms` - Lag in milliseconds (`lag_ms = currentTime - committedMessageTimestamp`). **Primary:** linear interpolation between Kafka `listOffsets` log start/end timestamps and offsets. **Fallback:** poll-time `(logEndOffset, systemTime)` history when Kafka timestamps are invalid (e.g. `logStartTimestamp=0`); requires 2+ poll intervals and does not extrapolate beyond the oldest retained sample (`TIME_LAG_INTERPOLATION_BUFFER_SIZE`). - `klag.consumer.lag.time_to_close_seconds` - Estimated seconds until lag reaches zero (only when catching up and lag > threshold) **Data Loss Prevention (DLP) Metrics:** diff --git a/README.md b/README.md index 16d0c5f..e1baea6 100644 --- a/README.md +++ b/README.md @@ -55,7 +55,7 @@ Metrics available at `http://localhost:8888/metrics` |--------|----------------------------------------------------------------------| | `klag.consumer.lag` | Current lag per partition (also `.sum`, `.max`, `.min` aggregations) | | `klag.consumer.lag.velocity` | Rate of change — positive means falling behind | -| `klag.consumer.lag.ms` | Lag in milliseconds based on message timestamps | +| `klag.consumer.lag.ms` | Lag in ms from Kafka log timestamps; poll-history fallback when unavailable | | `klag.consumer.lag.time_to_close_seconds` | Estimated seconds until lag reaches zero (only when catching up) | | `klag.consumer.lag.retention_percent` | Lag as percentage of available messages. Use to prevent data loss. | | `klag.consumer.group.state` | Group health: Stable, Rebalancing, Dead, Empty | diff --git a/build.gradle.kts b/build.gradle.kts index 2357a22..d3604e3 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -8,7 +8,7 @@ plugins { } group = "io.github.themoah" -version = "0.1.10" +version = "0.1.11" repositories { mavenCentral() diff --git a/src/main/java/io/github/themoah/klag/metrics/MetricsCollector.java b/src/main/java/io/github/themoah/klag/metrics/MetricsCollector.java index dfd4ed2..dd1ea4a 100644 --- a/src/main/java/io/github/themoah/klag/metrics/MetricsCollector.java +++ b/src/main/java/io/github/themoah/klag/metrics/MetricsCollector.java @@ -5,6 +5,7 @@ import io.github.themoah.klag.kafka.KafkaClientService; import io.github.themoah.klag.metrics.hotpartition.HotPartitionConfig; import io.github.themoah.klag.metrics.hotpartition.HotPartitionDetector; +import io.github.themoah.klag.metrics.timelag.LagMsCalculator; import io.github.themoah.klag.metrics.timelag.OffsetTimestampTracker; import io.github.themoah.klag.metrics.timelag.TimeLagConfig; import io.github.themoah.klag.metrics.timelag.TimeLagEstimator; @@ -618,12 +619,11 @@ private List calculateRetentionRisks(List lagDa } /** - * Calculates lag in milliseconds using interpolation from recorded offset/timestamp history. - * For each partition, records the current log end offset, then interpolates the timestamp - * for the committed offset to determine how old the unconsumed messages are. + * Calculates lag in milliseconds per consumer group and topic. * - *

This approach uses system time instead of Kafka message timestamps, avoiding issues - * where Kafka doesn't provide logStartTimestamp (returns 0). + *

Primary path: linear interpolation between Kafka {@code listOffsets} log start/end + * timestamps. Fallback: poll-time {@code (logEndOffset, systemTime)} history when Kafka + * timestamps are unavailable; fallback does not extrapolate beyond the oldest retained sample. * * @param lagData list of consumer group lag data * @return list of lag in milliseconds per consumer group and topic @@ -634,53 +634,33 @@ private List calculateLagMs(List lagData) { } List lagMsList = new ArrayList<>(); - int skippedDueToWarmup = 0; + int skippedPartitions = 0; long currentTime = System.currentTimeMillis(); Set trackedPartitions = new HashSet<>(); for (ConsumerGroupLag group : lagData) { - // First pass: record all partition offsets to the tracker for (PartitionLag p : group.partitions()) { offsetTimestampTracker.recordOffset(p.topic(), p.partition(), p.logEndOffset()); trackedPartitions.add(p.topic() + ":" + p.partition()); } - // Second pass: aggregate lag_ms by topic using max lag_ms across partitions Map topicAggregates = new HashMap<>(); for (PartitionLag p : group.partitions()) { - // Consumer is caught up - no lag - if (p.lag() <= 0) { - topicAggregates.computeIfAbsent(p.topic(), k -> new TopicLagMsAggregates()) - .add(0, p.lag()); - continue; - } - - // Check if we have interpolation data for this partition - if (!offsetTimestampTracker.hasInterpolationData(p.topic(), p.partition())) { - skippedDueToWarmup++; - log.trace("Skipping lag_ms for {}:{}:{}: insufficient interpolation data (warmup)", - group.consumerGroup(), p.topic(), p.partition()); - continue; - } - - // Interpolate timestamp for the committed offset - var interpolatedTs = offsetTimestampTracker.getInterpolatedTimestamp( - p.topic(), p.partition(), p.committedOffset()); + var lagMs = LagMsCalculator.estimatePartitionLagMs(p, offsetTimestampTracker, currentTime); - if (interpolatedTs.isPresent()) { - long lagMs = Math.max(0, currentTime - interpolatedTs.getAsLong()); + if (lagMs.isPresent()) { topicAggregates.computeIfAbsent(p.topic(), k -> new TopicLagMsAggregates()) - .add(lagMs, p.lag()); - log.trace("Partition {}:{}:{} lag_ms={} (committed={}, interpolated_ts={})", - group.consumerGroup(), p.topic(), p.partition(), lagMs, - p.committedOffset(), interpolatedTs.getAsLong()); - } else { - skippedDueToWarmup++; + .add(lagMs.getAsLong(), p.lag()); + log.trace("Partition {}:{}:{} lag_ms={} (committed={})", + group.consumerGroup(), p.topic(), p.partition(), lagMs.getAsLong(), p.committedOffset()); + } else if (p.lag() > 0) { + skippedPartitions++; + log.trace("Skipping lag_ms for {}:{}:{}: no Kafka anchors and insufficient poll history", + group.consumerGroup(), p.topic(), p.partition()); } } - // Create LagMs records for topics that have data for (Map.Entry entry : topicAggregates.entrySet()) { String topic = entry.getKey(); TopicLagMsAggregates agg = entry.getValue(); @@ -693,14 +673,13 @@ private List calculateLagMs(List lagData) { } } - // Cleanup stale partitions offsetTimestampTracker.cleanupStalePartitions(trackedPartitions); - if (skippedDueToWarmup > 0) { - log.debug("Skipped {} partitions due to warmup (insufficient interpolation data)", skippedDueToWarmup); + if (skippedPartitions > 0) { + log.debug("Skipped {} partitions with no lag_ms estimate", skippedPartitions); } if (!lagMsList.isEmpty()) { - log.info("Calculated lag_ms for {} consumer-group/topic pairs using interpolation", lagMsList.size()); + log.info("Calculated lag_ms for {} consumer-group/topic pairs", lagMsList.size()); } return lagMsList; diff --git a/src/main/java/io/github/themoah/klag/metrics/MicrometerReporter.java b/src/main/java/io/github/themoah/klag/metrics/MicrometerReporter.java index 6be5072..ccaabaf 100644 --- a/src/main/java/io/github/themoah/klag/metrics/MicrometerReporter.java +++ b/src/main/java/io/github/themoah/klag/metrics/MicrometerReporter.java @@ -200,7 +200,7 @@ public void reportHotPartitionThroughput(List hotPartiti } /** - * Reports lag in milliseconds based on actual message timestamps. + * Reports lag in milliseconds (Kafka timestamps or poll-history fallback). * * @param lagMsData list of lag in milliseconds data * @param activeKeys set to populate with active gauge keys (can be null) diff --git a/src/main/java/io/github/themoah/klag/metrics/timelag/KafkaOffsetTimestampInterpolator.java b/src/main/java/io/github/themoah/klag/metrics/timelag/KafkaOffsetTimestampInterpolator.java new file mode 100644 index 0000000..e4f7377 --- /dev/null +++ b/src/main/java/io/github/themoah/klag/metrics/timelag/KafkaOffsetTimestampInterpolator.java @@ -0,0 +1,72 @@ +package io.github.themoah.klag.metrics.timelag; + +import io.github.themoah.klag.model.ConsumerGroupLag.PartitionLag; +import java.util.OptionalLong; + +/** + * Estimates committed-offset message time from Kafka {@code listOffsets} anchor points + * ({@code logStartOffset/logStartTimestamp} and {@code logEndOffset/logEndTimestamp}). + */ +public final class KafkaOffsetTimestampInterpolator { + + private KafkaOffsetTimestampInterpolator() {} + + /** + * Returns true when partition log boundaries have usable Kafka message timestamps + * and the committed offset lies within the log span (or at log end). + */ + public static boolean hasValidAnchors(PartitionLag partition) { + if (partition.logStartTimestamp() <= 0 || partition.logEndTimestamp() <= 0) { + return false; + } + if (partition.logEndTimestamp() < partition.logStartTimestamp()) { + return false; + } + if (partition.logEndOffset() <= partition.logStartOffset()) { + return false; + } + if (partition.committedOffset() < partition.logStartOffset()) { + return false; + } + return partition.committedOffset() <= partition.logEndOffset(); + } + + /** + * Interpolates the Kafka message timestamp for {@code committedOffset} between log boundaries. + * + * @return estimated message timestamp, or empty when anchors are invalid + */ + public static OptionalLong interpolateCommittedTimestamp(PartitionLag partition) { + if (!hasValidAnchors(partition)) { + return OptionalLong.empty(); + } + + if (partition.committedOffset() >= partition.logEndOffset()) { + return OptionalLong.of(partition.logEndTimestamp()); + } + + return OptionalLong.of(linearInterpolate( + partition.logStartOffset(), partition.logStartTimestamp(), + partition.logEndOffset(), partition.logEndTimestamp(), + partition.committedOffset())); + } + + /** + * Estimates lag in milliseconds as {@code currentTime - committedMessageTimestamp}. + */ + public static OptionalLong estimateLagMs(PartitionLag partition, long currentTime) { + OptionalLong ts = interpolateCommittedTimestamp(partition); + if (ts.isEmpty()) { + return OptionalLong.empty(); + } + return OptionalLong.of(Math.max(0, currentTime - ts.getAsLong())); + } + + static long linearInterpolate(long o1, long t1, long o2, long t2, long targetOffset) { + if (o1 == o2) { + return t1; + } + double slope = (double) (t2 - t1) / (o2 - o1); + return Math.round(t1 + slope * (targetOffset - o1)); + } +} diff --git a/src/main/java/io/github/themoah/klag/metrics/timelag/LagMsCalculator.java b/src/main/java/io/github/themoah/klag/metrics/timelag/LagMsCalculator.java new file mode 100644 index 0000000..a4bf9cb --- /dev/null +++ b/src/main/java/io/github/themoah/klag/metrics/timelag/LagMsCalculator.java @@ -0,0 +1,56 @@ +package io.github.themoah.klag.metrics.timelag; + +import io.github.themoah.klag.model.ConsumerGroupLag.PartitionLag; +import java.util.OptionalLong; + +/** + * Computes per-partition lag in milliseconds using Kafka timestamps when available, + * otherwise bounded poll-time interpolation history. + */ +public final class LagMsCalculator { + + private LagMsCalculator() {} + + /** + * Estimates lag in milliseconds for a partition. + * + * @param partition partition lag snapshot + * @param pollHistory poll-time offset history (may be null when time lag disabled) + * @param currentTime current wall-clock time in milliseconds + * @return lag in ms, or empty when no reliable estimate is available + */ + public static OptionalLong estimatePartitionLagMs( + PartitionLag partition, + OffsetTimestampTracker pollHistory, + long currentTime) { + + if (partition.lag() <= 0) { + return OptionalLong.of(0); + } + + var kafkaLagMs = KafkaOffsetTimestampInterpolator.estimateLagMs(partition, currentTime); + if (kafkaLagMs.isPresent()) { + return kafkaLagMs; + } + + if (pollHistory == null) { + return OptionalLong.empty(); + } + + if (!pollHistory.hasInterpolationData(partition.topic(), partition.partition())) { + return OptionalLong.empty(); + } + + var oldestOffset = pollHistory.getOldestOffset(partition.topic(), partition.partition()); + if (oldestOffset.isEmpty() || partition.committedOffset() < oldestOffset.getAsLong()) { + return OptionalLong.empty(); + } + + OptionalLong ts = pollHistory.getInterpolatedTimestamp( + partition.topic(), partition.partition(), partition.committedOffset()); + if (ts.isEmpty()) { + return OptionalLong.empty(); + } + return OptionalLong.of(Math.max(0, currentTime - ts.getAsLong())); + } +} diff --git a/src/main/java/io/github/themoah/klag/metrics/timelag/OffsetTimestampTracker.java b/src/main/java/io/github/themoah/klag/metrics/timelag/OffsetTimestampTracker.java index 4c0d855..b6f466b 100644 --- a/src/main/java/io/github/themoah/klag/metrics/timelag/OffsetTimestampTracker.java +++ b/src/main/java/io/github/themoah/klag/metrics/timelag/OffsetTimestampTracker.java @@ -119,6 +119,22 @@ public void cleanupStalePartitions(Set activeKeys) { } } + /** + * Returns the oldest retained offset for a partition, if available. + * + * @param topic the topic name + * @param partition the partition number + * @return the oldest offset in poll history, or empty if no history exists + */ + public OptionalLong getOldestOffset(String topic, int partition) { + String key = makeKey(topic, partition); + PartitionOffsetHistory history = histories.get(key); + if (history == null) { + return OptionalLong.empty(); + } + return history.getOldestOffset(); + } + /** * Returns the latest recorded offset for a partition, if available. * diff --git a/src/main/java/io/github/themoah/klag/metrics/timelag/PartitionOffsetHistory.java b/src/main/java/io/github/themoah/klag/metrics/timelag/PartitionOffsetHistory.java index 063a666..03c4c22 100644 --- a/src/main/java/io/github/themoah/klag/metrics/timelag/PartitionOffsetHistory.java +++ b/src/main/java/io/github/themoah/klag/metrics/timelag/PartitionOffsetHistory.java @@ -10,11 +10,11 @@ /** * Per-partition interpolation table for converting message offsets to timestamps. - * Records (logEndOffset, systemTimestamp) pairs at each poll interval. + * Records (logEndOffset, systemTimestamp) pairs at each poll interval for fallback + * lag.ms estimation when Kafka log timestamps are unavailable. * - *

This enables interpolation/extrapolation to estimate when a given offset - * was produced, which is used to calculate lag in milliseconds without relying - * on Kafka message timestamps (which are often unavailable). + *

Interpolates only within the retained sample window; does not extrapolate + * backward beyond the oldest sample. */ public class PartitionOffsetHistory { @@ -75,7 +75,6 @@ public OptionalLong interpolateTimestamp(long targetOffset) { } List pointsList = new ArrayList<>(points); - OffsetTimestampPoint oldest = pointsList.get(0); OffsetTimestampPoint latest = pointsList.get(pointsList.size() - 1); // Consumer is caught up or ahead @@ -103,13 +102,7 @@ public OptionalLong interpolateTimestamp(long targetOffset) { return OptionalLong.of(linearInterpolate(lower, upper, targetOffset)); } - // Target offset is older than our oldest point - extrapolate backward - if (targetOffset < oldest.offset() && pointsList.size() >= 2) { - OffsetTimestampPoint p1 = pointsList.get(0); - OffsetTimestampPoint p2 = pointsList.get(1); - return OptionalLong.of(linearInterpolate(p1, p2, targetOffset)); - } - + // Target offset is older than retained history — do not extrapolate backward return OptionalLong.empty(); } @@ -175,4 +168,14 @@ public OptionalLong getLatestOffset() { } return OptionalLong.of(points.getLast().offset()); } + + /** + * Returns the oldest retained offset, or empty if no points exist. + */ + public OptionalLong getOldestOffset() { + if (points.isEmpty()) { + return OptionalLong.empty(); + } + return OptionalLong.of(points.getFirst().offset()); + } } diff --git a/src/main/java/io/github/themoah/klag/metrics/timelag/TimeLagEstimator.java b/src/main/java/io/github/themoah/klag/metrics/timelag/TimeLagEstimator.java index 5bd30da..acb8c7f 100644 --- a/src/main/java/io/github/themoah/klag/metrics/timelag/TimeLagEstimator.java +++ b/src/main/java/io/github/themoah/klag/metrics/timelag/TimeLagEstimator.java @@ -16,8 +16,8 @@ *

  • Time to close (seconds) - estimated time until lag reaches zero (only when catching up)
  • * * - *

    Note: Lag in milliseconds (klag.consumer.lag.ms) is now calculated directly from - * Kafka message timestamps in MetricsCollector, not from velocity. + *

    Note: Lag in milliseconds ({@code klag.consumer.lag.ms}) is calculated in + * {@link LagMsCalculator} / {@link MetricsCollector}, not from velocity. */ public class TimeLagEstimator { diff --git a/src/main/java/io/github/themoah/klag/model/LagMs.java b/src/main/java/io/github/themoah/klag/model/LagMs.java index bb1413c..409003c 100644 --- a/src/main/java/io/github/themoah/klag/model/LagMs.java +++ b/src/main/java/io/github/themoah/klag/model/LagMs.java @@ -1,13 +1,13 @@ package io.github.themoah.klag.model; /** - * Lag in milliseconds for a consumer group and topic. - * Calculated from actual Kafka message timestamps. + * Lag in milliseconds for a consumer group and topic (max across partitions). + * Uses Kafka log start/end timestamps when available; otherwise bounded poll-time estimation. * * @param consumerGroup the consumer group ID * @param topic the topic name * @param lagMessages the current lag in messages - * @param lagMs the lag in milliseconds (based on message timestamps) + * @param lagMs the lag in milliseconds */ public record LagMs( String consumerGroup, diff --git a/src/test/java/io/github/themoah/klag/metrics/timelag/KafkaOffsetTimestampInterpolatorTest.java b/src/test/java/io/github/themoah/klag/metrics/timelag/KafkaOffsetTimestampInterpolatorTest.java new file mode 100644 index 0000000..52191be --- /dev/null +++ b/src/test/java/io/github/themoah/klag/metrics/timelag/KafkaOffsetTimestampInterpolatorTest.java @@ -0,0 +1,94 @@ +package io.github.themoah.klag.metrics.timelag; + +import static org.junit.jupiter.api.Assertions.*; + +import io.github.themoah.klag.model.ConsumerGroupLag.PartitionLag; +import java.util.OptionalLong; +import org.junit.jupiter.api.Test; + +public class KafkaOffsetTimestampInterpolatorTest { + + private static PartitionLag partition( + long logStartOffset, long logStartTs, + long logEndOffset, long logEndTs, + long committedOffset) { + return PartitionLag.of("topic", 0, logEndOffset, logStartOffset, logEndTs, logStartTs, committedOffset); + } + + @Test + void hasValidAnchors_returnsTrue_forNormalPartition() { + assertTrue(KafkaOffsetTimestampInterpolator.hasValidAnchors( + partition(0, 1_000_000L, 10_000, 8_200_000L, 2_000))); + } + + @Test + void hasValidAnchors_returnsFalse_whenTimestampsInvalid() { + assertFalse(KafkaOffsetTimestampInterpolator.hasValidAnchors( + partition(0, 0, 10_000, 8_200_000L, 2_000))); + assertFalse(KafkaOffsetTimestampInterpolator.hasValidAnchors( + partition(0, 1_000_000L, 10_000, -1, 2_000))); + } + + @Test + void hasValidAnchors_returnsFalse_whenOffsetSpanDegenerate() { + assertFalse(KafkaOffsetTimestampInterpolator.hasValidAnchors( + partition(100, 1_000_000L, 100, 1_000_000L, 100))); + } + + @Test + void hasValidAnchors_returnsFalse_whenCommittedBeforeLogStart() { + assertFalse(KafkaOffsetTimestampInterpolator.hasValidAnchors( + partition(100, 1_000_000L, 10_000, 8_200_000L, 50))); + } + + @Test + void hasValidAnchors_returnsFalse_whenCommittedAfterLogEnd() { + assertFalse(KafkaOffsetTimestampInterpolator.hasValidAnchors( + partition(0, 1_000_000L, 10_000, 8_200_000L, 10_001))); + } + + @Test + void estimateLagMs_reportsBeyondThirtyMinutes() { + long twoHoursMs = 2 * 60 * 60 * 1000L; + long logStartTs = 1_000_000L; + long logEndTs = logStartTs + twoHoursMs; + long currentTime = logEndTs + 60_000L; + // committed near log start → message ~2h old + PartitionLag p = partition(0, logStartTs, 10_000, logEndTs, 100); + + OptionalLong lagMs = KafkaOffsetTimestampInterpolator.estimateLagMs(p, currentTime); + + assertTrue(lagMs.isPresent()); + assertTrue(lagMs.getAsLong() > 1_800_000L, + "lag_ms should exceed 30 minutes when Kafka timestamps span 2 hours"); + } + + @Test + void interpolateCommittedTimestamp_interpolatesMidpoint() { + PartitionLag p = partition(0, 1_000L, 1000, 11_000L, 500); + + OptionalLong ts = KafkaOffsetTimestampInterpolator.interpolateCommittedTimestamp(p); + + assertTrue(ts.isPresent()); + assertEquals(6_000L, ts.getAsLong()); + } + + @Test + void estimateLagMs_returnsZero_whenCaughtUpAtLogEnd() { + long logEndTs = 8_000_000L; + long currentTime = logEndTs + 5_000L; + PartitionLag p = partition(0, 1_000_000L, 10_000, logEndTs, 10_000); + + OptionalLong lagMs = KafkaOffsetTimestampInterpolator.estimateLagMs(p, currentTime); + + assertTrue(lagMs.isPresent()); + assertEquals(5_000L, lagMs.getAsLong()); + } + + @Test + void estimateLagMs_returnsEmpty_whenAnchorsInvalid() { + PartitionLag p = partition(0, 0, 10_000, 8_200_000L, 2_000); + + assertTrue(KafkaOffsetTimestampInterpolator.estimateLagMs(p, System.currentTimeMillis()).isEmpty()); + } +} diff --git a/src/test/java/io/github/themoah/klag/metrics/timelag/LagMsCalculatorTest.java b/src/test/java/io/github/themoah/klag/metrics/timelag/LagMsCalculatorTest.java new file mode 100644 index 0000000..8eadf27 --- /dev/null +++ b/src/test/java/io/github/themoah/klag/metrics/timelag/LagMsCalculatorTest.java @@ -0,0 +1,71 @@ +package io.github.themoah.klag.metrics.timelag; + +import static org.junit.jupiter.api.Assertions.*; + +import io.github.themoah.klag.model.ConsumerGroupLag.PartitionLag; +import java.util.OptionalLong; +import org.junit.jupiter.api.Test; + +public class LagMsCalculatorTest { + + private static PartitionLag partition( + long logStartOffset, long logStartTs, + long logEndOffset, long logEndTs, + long committedOffset) { + return PartitionLag.of("topic", 0, logEndOffset, logStartOffset, logEndTs, logStartTs, committedOffset); + } + + @Test + void estimatePartitionLagMs_usesKafkaPath_beyondThirtyMinutes() { + long twoHoursMs = 2 * 60 * 60 * 1000L; + long logStartTs = 1_000_000L; + long logEndTs = logStartTs + twoHoursMs; + long currentTime = logEndTs + 30_000L; + PartitionLag p = partition(0, logStartTs, 10_000, logEndTs, 100); + + OptionalLong lagMs = LagMsCalculator.estimatePartitionLagMs(p, null, currentTime); + + assertTrue(lagMs.isPresent()); + assertTrue(lagMs.getAsLong() > 1_800_000L); + } + + @Test + void estimatePartitionLagMs_omitsWhenFallbackCommittedBeforeOldestSample() { + OffsetTimestampTracker tracker = new OffsetTimestampTracker(60, 180_000); + long now = 10_000_000L; + tracker.recordOffset("topic", 0, 9000, now - 30_000); + tracker.recordOffset("topic", 0, 9500, now - 15_000); + + PartitionLag p = partition(0, 0, 10_000, 0, 100); + + OptionalLong lagMs = LagMsCalculator.estimatePartitionLagMs(p, tracker, now); + + assertTrue(lagMs.isEmpty(), "Invalid Kafka timestamps and committed before poll history"); + } + + @Test + void estimatePartitionLagMs_usesFallbackWhenKafkaTimestampsMissing() { + OffsetTimestampTracker tracker = new OffsetTimestampTracker(60, 180_000); + long t0 = 1_000_000L; + tracker.recordOffset("topic", 0, 1000, t0); + tracker.recordOffset("topic", 0, 2000, t0 + 60_000); + + PartitionLag p = partition(0, 0, 2000, 0, 1500); + long currentTime = t0 + 90_000; + + OptionalLong lagMs = LagMsCalculator.estimatePartitionLagMs(p, tracker, currentTime); + + assertTrue(lagMs.isPresent()); + assertEquals(60_000L, lagMs.getAsLong()); + } + + @Test + void estimatePartitionLagMs_returnsZeroWhenCaughtUp() { + PartitionLag p = partition(0, 1_000_000L, 10_000, 8_000_000L, 10_000); + + OptionalLong lagMs = LagMsCalculator.estimatePartitionLagMs(p, null, 8_100_000L); + + assertTrue(lagMs.isPresent()); + assertEquals(0, lagMs.getAsLong()); + } +} diff --git a/src/test/java/io/github/themoah/klag/metrics/timelag/PartitionOffsetHistoryTest.java b/src/test/java/io/github/themoah/klag/metrics/timelag/PartitionOffsetHistoryTest.java index 72d66b4..8f8ac68 100644 --- a/src/test/java/io/github/themoah/klag/metrics/timelag/PartitionOffsetHistoryTest.java +++ b/src/test/java/io/github/themoah/klag/metrics/timelag/PartitionOffsetHistoryTest.java @@ -116,39 +116,31 @@ void interpolateTimestamp_interpolatesCorrectly_withThreePoints() { } @Test - void interpolateTimestamp_extrapolatesBackward_whenOffsetOlderThanHistory() { + void interpolateTimestamp_returnsEmpty_whenOffsetOlderThanHistory() { PartitionOffsetHistory history = new PartitionOffsetHistory("topic", 0, 10); - // Record: offset 100 at time 1000, offset 200 at time 2000 history.addPoint(100, 1000); history.addPoint(200, 2000); - // Target offset 50 should extrapolate backward: time 500 - // Using slope from first two points: (2000-1000)/(200-100) = 10 ms/offset - // t = 1000 + (50-100) * 10 = 1000 - 500 = 500 OptionalLong result = history.interpolateTimestamp(50); - assertTrue(result.isPresent()); - assertEquals(500, result.getAsLong()); + assertTrue(result.isEmpty(), "Should not extrapolate beyond oldest retained sample"); } @Test - void interpolateTimestamp_extrapolatesBackward_toNegativeTimestamp() { + void interpolateTimestamp_returnsEmpty_whenOffsetWellBeforeHistory() { PartitionOffsetHistory history = new PartitionOffsetHistory("topic", 0, 10); - // Record: offset 100 at time 1000, offset 200 at time 2000 history.addPoint(100, 1000); history.addPoint(200, 2000); - // Target offset 0 should extrapolate to: 1000 + (0-100) * 10 = 0 OptionalLong result = history.interpolateTimestamp(0); - assertTrue(result.isPresent()); - assertEquals(0, result.getAsLong()); + assertTrue(result.isEmpty()); } @Test - void bufferSize_evictsOldestPoints() { + void bufferSize_evictsOldestPoints_andRejectsOutOfRangeInterpolation() { PartitionOffsetHistory history = new PartitionOffsetHistory("topic", 0, 3); history.addPoint(100, 1000); @@ -156,17 +148,11 @@ void bufferSize_evictsOldestPoints() { history.addPoint(300, 3000); assertEquals(3, history.size()); - // Adding 4th point should evict the oldest history.addPoint(400, 4000); assertEquals(3, history.size()); - // Now the oldest point is (200, 2000) - // Extrapolating to offset 100 should use points (200,2000) and (300,3000) - // slope = (3000-2000)/(300-200) = 10 - // t = 2000 + (100-200) * 10 = 2000 - 1000 = 1000 OptionalLong result = history.interpolateTimestamp(100); - assertTrue(result.isPresent()); - assertEquals(1000, result.getAsLong()); + assertTrue(result.isEmpty(), "Offset evicted from buffer must not be extrapolated"); } @Test From 31c57209b661c653adf2ceee95c844d3ba08bec8 Mon Sep 17 00:00:00 2001 From: Aviv Dozorets Date: Sat, 30 May 2026 18:21:54 +0300 Subject: [PATCH 2/3] logEndOffset anchor mismatch --- .../klag/kafka/KafkaClientServiceImpl.java | 28 +++++---- .../klag/metrics/MetricsCollector.java | 1 + .../KafkaOffsetTimestampInterpolator.java | 20 +++++-- .../themoah/klag/model/ConsumerGroupLag.java | 25 +++++++- .../themoah/klag/model/PartitionOffsets.java | 5 ++ .../klag/metrics/MetricsCollectorTest.java | 34 +++++------ .../KafkaOffsetTimestampInterpolatorTest.java | 57 +++++++++++++++++++ 7 files changed, 135 insertions(+), 35 deletions(-) diff --git a/src/main/java/io/github/themoah/klag/kafka/KafkaClientServiceImpl.java b/src/main/java/io/github/themoah/klag/kafka/KafkaClientServiceImpl.java index fc5d4b9..68354ea 100644 --- a/src/main/java/io/github/themoah/klag/kafka/KafkaClientServiceImpl.java +++ b/src/main/java/io/github/themoah/klag/kafka/KafkaClientServiceImpl.java @@ -151,20 +151,26 @@ public Future> getLogEndOffsets(String topic) { long logStartOffset = earliestTimestampOffsets.get(tp).getOffset(); long logStartTimestamp = earliestTimestampOffsets.get(tp).getTimestamp(); - // Get latest offset and timestamp - prefer MAX_TIMESTAMP if valid ListOffsetsResultInfo maxTimestampResult = maxTimestampOffsets.get(tp); ListOffsetsResultInfo latestOffsetResult = latestOffsets.get(tp); - long logEndOffset; + // logEndOffset is ALWAYS the true end-of-log (LATEST). This is the boundary + // for lag = logEndOffset - committedOffset, throughput, and retention. + long logEndOffset = latestOffsetResult.getOffset(); + + // The timestamp anchor comes from MAX_TIMESTAMP (offset of the highest-timestamp + // record). This offset can be < logEndOffset, so carry it separately rather than + // overwriting logEndOffset, otherwise interpolation anchors and lag disagree. long logEndTimestamp; + long maxTimestampOffset; if (maxTimestampResult.getOffset() >= 0 && maxTimestampResult.getTimestamp() > 0) { - // MAX_TIMESTAMP returned valid offset and timestamp - logEndOffset = maxTimestampResult.getOffset(); + // MAX_TIMESTAMP returned a valid offset/timestamp anchor + maxTimestampOffset = maxTimestampResult.getOffset(); logEndTimestamp = maxTimestampResult.getTimestamp(); } else { - // MAX_TIMESTAMP failed (pre-3.0 broker or no timestamps) - use LATEST - logEndOffset = latestOffsetResult.getOffset(); + // MAX_TIMESTAMP failed (pre-3.0 broker or no timestamps) - fall back to LATEST + maxTimestampOffset = latestOffsetResult.getOffset(); logEndTimestamp = latestOffsetResult.getTimestamp(); } @@ -176,16 +182,16 @@ public Future> getLogEndOffsets(String topic) { // Log first partition's timestamps at INFO level for debugging if (!loggedSampleTimestamp) { - log.info("Topic {} sample timestamps: partition {} logStart={} (ts={}), logEnd={} (ts={})", - topic, partition.partition(), logStartOffset, logStartTimestamp, logEndOffset, logEndTimestamp); + log.info("Topic {} sample timestamps: partition {} logStart={} (ts={}), logEnd={}, maxTsOffset={} (ts={})", + topic, partition.partition(), logStartOffset, logStartTimestamp, logEndOffset, maxTimestampOffset, logEndTimestamp); loggedSampleTimestamp = true; } else { - log.debug("Topic {} partition {}: logStart={} (ts={}), logEnd={} (ts={})", - topic, partition.partition(), logStartOffset, logStartTimestamp, logEndOffset, logEndTimestamp); + log.debug("Topic {} partition {}: logStart={} (ts={}), logEnd={}, maxTsOffset={} (ts={})", + topic, partition.partition(), logStartOffset, logStartTimestamp, logEndOffset, maxTimestampOffset, logEndTimestamp); } result.add(new PartitionOffsets(topic, partition.partition(), logEndOffset, logStartOffset, - logEndTimestamp, logStartTimestamp)); + logEndTimestamp, maxTimestampOffset, logStartTimestamp)); } log.info("Retrieved offsets for {} partitions of topic {}", result.size(), topic); diff --git a/src/main/java/io/github/themoah/klag/metrics/MetricsCollector.java b/src/main/java/io/github/themoah/klag/metrics/MetricsCollector.java index dd1ea4a..7c275c6 100644 --- a/src/main/java/io/github/themoah/klag/metrics/MetricsCollector.java +++ b/src/main/java/io/github/themoah/klag/metrics/MetricsCollector.java @@ -506,6 +506,7 @@ private ConsumerGroupLag buildConsumerGroupLag( po.logEndOffset(), po.logStartOffset(), po.logEndTimestamp(), + po.maxTimestampOffset(), po.logStartTimestamp(), committedOffset ); diff --git a/src/main/java/io/github/themoah/klag/metrics/timelag/KafkaOffsetTimestampInterpolator.java b/src/main/java/io/github/themoah/klag/metrics/timelag/KafkaOffsetTimestampInterpolator.java index e4f7377..db338ea 100644 --- a/src/main/java/io/github/themoah/klag/metrics/timelag/KafkaOffsetTimestampInterpolator.java +++ b/src/main/java/io/github/themoah/klag/metrics/timelag/KafkaOffsetTimestampInterpolator.java @@ -4,8 +4,13 @@ import java.util.OptionalLong; /** - * Estimates committed-offset message time from Kafka {@code listOffsets} anchor points - * ({@code logStartOffset/logStartTimestamp} and {@code logEndOffset/logEndTimestamp}). + * Estimates committed-offset message time from Kafka {@code listOffsets} anchor points. + * + *

    Anchors are {@code (logStartOffset, logStartTimestamp)} and + * {@code (maxTimestampOffset, logEndTimestamp)}. The end anchor uses {@code maxTimestampOffset} + * (the offset of the MAX_TIMESTAMP record), NOT {@code logEndOffset} (the true end-of-log), + * because only the former is the offset that actually carries {@code logEndTimestamp}. The two + * can differ; conflating them would skip interpolation or skew the timestamp. */ public final class KafkaOffsetTimestampInterpolator { @@ -13,7 +18,7 @@ private KafkaOffsetTimestampInterpolator() {} /** * Returns true when partition log boundaries have usable Kafka message timestamps - * and the committed offset lies within the log span (or at log end). + * and the committed offset lies within the log span (or at/after the timestamp anchor). */ public static boolean hasValidAnchors(PartitionLag partition) { if (partition.logStartTimestamp() <= 0 || partition.logEndTimestamp() <= 0) { @@ -22,12 +27,14 @@ public static boolean hasValidAnchors(PartitionLag partition) { if (partition.logEndTimestamp() < partition.logStartTimestamp()) { return false; } - if (partition.logEndOffset() <= partition.logStartOffset()) { + if (partition.maxTimestampOffset() <= partition.logStartOffset()) { return false; } if (partition.committedOffset() < partition.logStartOffset()) { return false; } + // Committed may sit between the max-timestamp record and the true end of the log; + // bound it by the true end offset, not the timestamp anchor. return partition.committedOffset() <= partition.logEndOffset(); } @@ -41,13 +48,14 @@ public static OptionalLong interpolateCommittedTimestamp(PartitionLag partition) return OptionalLong.empty(); } - if (partition.committedOffset() >= partition.logEndOffset()) { + // At or beyond the highest-timestamp record: the newest known timestamp applies. + if (partition.committedOffset() >= partition.maxTimestampOffset()) { return OptionalLong.of(partition.logEndTimestamp()); } return OptionalLong.of(linearInterpolate( partition.logStartOffset(), partition.logStartTimestamp(), - partition.logEndOffset(), partition.logEndTimestamp(), + partition.maxTimestampOffset(), partition.logEndTimestamp(), partition.committedOffset())); } diff --git a/src/main/java/io/github/themoah/klag/model/ConsumerGroupLag.java b/src/main/java/io/github/themoah/klag/model/ConsumerGroupLag.java index 75042c0..e9430c9 100644 --- a/src/main/java/io/github/themoah/klag/model/ConsumerGroupLag.java +++ b/src/main/java/io/github/themoah/klag/model/ConsumerGroupLag.java @@ -15,6 +15,11 @@ public record ConsumerGroupLag( /** * Lag details for a single partition. + * + *

    {@code logEndOffset} is the true end-of-log (LATEST) offset and is the basis for + * {@code lag}, throughput, and retention. {@code maxTimestampOffset} is the offset of the + * record carrying {@code logEndTimestamp} (the MAX_TIMESTAMP record); it can be less than + * {@code logEndOffset} and is the correct end anchor for time-based interpolation. */ public record PartitionLag( String topic, @@ -22,10 +27,14 @@ public record PartitionLag( long logEndOffset, long logStartOffset, long logEndTimestamp, + long maxTimestampOffset, long logStartTimestamp, long committedOffset, long lag ) { + /** + * Convenience factory where the max-timestamp anchor coincides with the true end offset. + */ public static PartitionLag of( String topic, int partition, @@ -34,10 +43,24 @@ public static PartitionLag of( long logEndTimestamp, long logStartTimestamp, long committedOffset + ) { + return of(topic, partition, logEndOffset, logStartOffset, logEndTimestamp, + logEndOffset, logStartTimestamp, committedOffset); + } + + public static PartitionLag of( + String topic, + int partition, + long logEndOffset, + long logStartOffset, + long logEndTimestamp, + long maxTimestampOffset, + long logStartTimestamp, + long committedOffset ) { long lag = Math.max(0, logEndOffset - committedOffset); return new PartitionLag(topic, partition, logEndOffset, logStartOffset, - logEndTimestamp, logStartTimestamp, committedOffset, lag); + logEndTimestamp, maxTimestampOffset, logStartTimestamp, committedOffset, lag); } } diff --git a/src/main/java/io/github/themoah/klag/model/PartitionOffsets.java b/src/main/java/io/github/themoah/klag/model/PartitionOffsets.java index 82b0f18..c0dbb0a 100644 --- a/src/main/java/io/github/themoah/klag/model/PartitionOffsets.java +++ b/src/main/java/io/github/themoah/klag/model/PartitionOffsets.java @@ -2,6 +2,10 @@ /** * Offset information for a topic partition, including timestamps. + * + *

    {@code logEndOffset} is the true end-of-log (LATEST) offset used for lag, throughput, and + * retention. {@code maxTimestampOffset} is the offset of the record carrying {@code logEndTimestamp} + * (the MAX_TIMESTAMP record), used as the end anchor for time-based interpolation. */ public record PartitionOffsets( String topic, @@ -9,5 +13,6 @@ public record PartitionOffsets( long logEndOffset, long logStartOffset, long logEndTimestamp, + long maxTimestampOffset, long logStartTimestamp ) {} diff --git a/src/test/java/io/github/themoah/klag/metrics/MetricsCollectorTest.java b/src/test/java/io/github/themoah/klag/metrics/MetricsCollectorTest.java index a723af4..ed14a75 100644 --- a/src/test/java/io/github/themoah/klag/metrics/MetricsCollectorTest.java +++ b/src/test/java/io/github/themoah/klag/metrics/MetricsCollectorTest.java @@ -18,8 +18,8 @@ public class MetricsCollectorTest { @Test void topicAggregates_singlePartition() { // Test TopicAggregates aggregation logic with single partition - // PartitionLag: topic, partition, logEndOffset, logStartOffset, logEndTimestamp, logStartTimestamp, committedOffset, lag - PartitionLag partition = new PartitionLag("topic1", 0, 1000, 0, 1000000L, 0L, 800, 200); + // PartitionLag: topic, partition, logEndOffset, logStartOffset, logEndTimestamp, maxTimestampOffset, logStartTimestamp, committedOffset, lag + PartitionLag partition = new PartitionLag("topic1", 0, 1000, 0, 1000000L, 1000, 0L, 800, 200); long totalLogEnd = partition.logEndOffset(); long totalCommitted = partition.committedOffset(); @@ -33,10 +33,10 @@ void topicAggregates_singlePartition() { @Test void topicAggregates_multiplePartitions_sumsCorrectly() { // Test aggregation across multiple partitions of same topic - // PartitionLag: topic, partition, logEndOffset, logStartOffset, logEndTimestamp, logStartTimestamp, committedOffset, lag - PartitionLag p1 = new PartitionLag("topic1", 0, 1000, 0, 1000000L, 0L, 800, 200); - PartitionLag p2 = new PartitionLag("topic1", 1, 1500, 0, 1000000L, 0L, 1200, 300); - PartitionLag p3 = new PartitionLag("topic1", 2, 2000, 0, 1000000L, 0L, 1800, 200); + // PartitionLag: topic, partition, logEndOffset, logStartOffset, logEndTimestamp, maxTimestampOffset, logStartTimestamp, committedOffset, lag + PartitionLag p1 = new PartitionLag("topic1", 0, 1000, 0, 1000000L, 1000, 0L, 800, 200); + PartitionLag p2 = new PartitionLag("topic1", 1, 1500, 0, 1000000L, 1500, 0L, 1200, 300); + PartitionLag p3 = new PartitionLag("topic1", 2, 2000, 0, 1000000L, 2000, 0L, 1800, 200); // Simulate aggregation as done in MetricsCollector long totalLogEnd = p1.logEndOffset() + p2.logEndOffset() + p3.logEndOffset(); @@ -193,7 +193,7 @@ void retentionRisk_basicPercentageCalculation() { // retention_window = 35000 - 31000 = 4000 // lag = 35000 - 34300 = 700 // percent = (700 / 4000) * 100 = 17.5% - PartitionLag partition = new PartitionLag("topic1", 0, 35000, 31000, 0L, 0L, 34300, 700); + PartitionLag partition = new PartitionLag("topic1", 0, 35000, 31000, 0L, 35000, 0L, 34300, 700); long retentionWindow = partition.logEndOffset() - partition.logStartOffset(); double percent = (partition.lag() / (double) retentionWindow) * 100.0; @@ -207,7 +207,7 @@ void retentionRisk_basicPercentageCalculation() { void retentionRisk_emptyPartitionSkipped() { // Empty partition: logEndOffset == logStartOffset // Should skip (retentionWindow = 0) - PartitionLag partition = new PartitionLag("topic1", 0, 1000, 1000, 0L, 0L, 1000, 0); + PartitionLag partition = new PartitionLag("topic1", 0, 1000, 1000, 0L, 1000, 0L, 1000, 0); long retentionWindow = partition.logEndOffset() - partition.logStartOffset(); @@ -218,7 +218,7 @@ void retentionRisk_emptyPartitionSkipped() { void retentionRisk_consumerCaughtUp_zeroPercent() { // Consumer caught up: committedOffset >= logEndOffset // Should report 0% - PartitionLag partition = new PartitionLag("topic1", 0, 5000, 1000, 0L, 0L, 5000, 0); + PartitionLag partition = new PartitionLag("topic1", 0, 5000, 1000, 0L, 5000, 0L, 5000, 0); long retentionWindow = partition.logEndOffset() - partition.logStartOffset(); double percent = partition.lag() <= 0 ? 0.0 : (partition.lag() / (double) retentionWindow) * 100.0; @@ -232,7 +232,7 @@ void retentionRisk_consumerCaughtUp_zeroPercent() { void retentionRisk_consumerBehindLogStart_100Percent() { // Consumer behind log_start: committedOffset < logStartOffset // Data loss already occurred - should report 100% - PartitionLag partition = new PartitionLag("topic1", 0, 5000, 2000, 0L, 0L, 1500, 3500); + PartitionLag partition = new PartitionLag("topic1", 0, 5000, 2000, 0L, 5000, 0L, 1500, 3500); boolean dataLossOccurred = partition.committedOffset() < partition.logStartOffset(); @@ -248,9 +248,9 @@ void retentionRisk_multiplePartitions_maxAggregation() { // Partition 2: lag=100, window=5000 -> 2% // Max should be 26.3% - PartitionLag p0 = new PartitionLag("topic1", 0, 35000, 31000, 0L, 0L, 34300, 700); - PartitionLag p1 = new PartitionLag("topic1", 1, 35300, 31500, 0L, 0L, 34300, 1000); - PartitionLag p2 = new PartitionLag("topic1", 2, 40000, 35000, 0L, 0L, 39900, 100); + PartitionLag p0 = new PartitionLag("topic1", 0, 35000, 31000, 0L, 35000, 0L, 34300, 700); + PartitionLag p1 = new PartitionLag("topic1", 1, 35300, 31500, 0L, 35300, 0L, 34300, 1000); + PartitionLag p2 = new PartitionLag("topic1", 2, 40000, 35000, 0L, 40000, 0L, 39900, 100); double percent0 = (p0.lag() / (double)(p0.logEndOffset() - p0.logStartOffset())) * 100.0; double percent1 = (p1.lag() / (double)(p1.logEndOffset() - p1.logStartOffset())) * 100.0; @@ -270,8 +270,8 @@ void retentionRisk_activeConsumer_percentageDecreases() { // Poll 1: lag=1000, window=4000 -> 25% // Poll 2: consumer processed 500 messages, lag=500, window=4000 -> 12.5% - PartitionLag poll1 = new PartitionLag("topic1", 0, 35000, 31000, 0L, 0L, 34000, 1000); - PartitionLag poll2 = new PartitionLag("topic1", 0, 35000, 31000, 0L, 0L, 34500, 500); + PartitionLag poll1 = new PartitionLag("topic1", 0, 35000, 31000, 0L, 35000, 0L, 34000, 1000); + PartitionLag poll2 = new PartitionLag("topic1", 0, 35000, 31000, 0L, 35000, 0L, 34500, 500); double percent1 = (poll1.lag() / (double)(poll1.logEndOffset() - poll1.logStartOffset())) * 100.0; double percent2 = (poll2.lag() / (double)(poll2.logEndOffset() - poll2.logStartOffset())) * 100.0; @@ -287,8 +287,8 @@ void retentionRisk_staleConsumer_shrinkingWindow_percentageIncreases() { // Poll 1: lag=1000, window=4000 (logEnd=35000, logStart=31000) -> 25% // Poll 2: retention cleaned up 500 messages, lag=1000, window=3500 (logEnd=35000, logStart=31500) -> 28.57% - PartitionLag poll1 = new PartitionLag("topic1", 0, 35000, 31000, 0L, 0L, 34000, 1000); - PartitionLag poll2 = new PartitionLag("topic1", 0, 35000, 31500, 0L, 0L, 34000, 1000); + PartitionLag poll1 = new PartitionLag("topic1", 0, 35000, 31000, 0L, 35000, 0L, 34000, 1000); + PartitionLag poll2 = new PartitionLag("topic1", 0, 35000, 31500, 0L, 35000, 0L, 34000, 1000); double percent1 = (poll1.lag() / (double)(poll1.logEndOffset() - poll1.logStartOffset())) * 100.0; double percent2 = (poll2.lag() / (double)(poll2.logEndOffset() - poll2.logStartOffset())) * 100.0; diff --git a/src/test/java/io/github/themoah/klag/metrics/timelag/KafkaOffsetTimestampInterpolatorTest.java b/src/test/java/io/github/themoah/klag/metrics/timelag/KafkaOffsetTimestampInterpolatorTest.java index 52191be..8d582e5 100644 --- a/src/test/java/io/github/themoah/klag/metrics/timelag/KafkaOffsetTimestampInterpolatorTest.java +++ b/src/test/java/io/github/themoah/klag/metrics/timelag/KafkaOffsetTimestampInterpolatorTest.java @@ -15,6 +15,15 @@ private static PartitionLag partition( return PartitionLag.of("topic", 0, logEndOffset, logStartOffset, logEndTs, logStartTs, committedOffset); } + // Variant where the true end offset (LATEST) differs from the max-timestamp offset. + private static PartitionLag partition( + long logStartOffset, long logStartTs, + long logEndOffset, long logEndTs, long maxTimestampOffset, + long committedOffset) { + return PartitionLag.of("topic", 0, logEndOffset, logStartOffset, logEndTs, + maxTimestampOffset, logStartTs, committedOffset); + } + @Test void hasValidAnchors_returnsTrue_forNormalPartition() { assertTrue(KafkaOffsetTimestampInterpolator.hasValidAnchors( @@ -91,4 +100,52 @@ void estimateLagMs_returnsEmpty_whenAnchorsInvalid() { assertTrue(KafkaOffsetTimestampInterpolator.estimateLagMs(p, System.currentTimeMillis()).isEmpty()); } + + // Regression: LATEST (true end) > MAX_TIMESTAMP offset. Committed sits between the + // max-timestamp record and the true end; interpolation must NOT be skipped, and the + // timestamp anchor must come from maxTimestampOffset/logEndTimestamp (not logEndOffset). + @Test + void hasValidAnchors_returnsTrue_whenLatestExceedsMaxTimestampOffset() { + // logStart=0, maxTimestampOffset=1000, true logEnd=1200, committed=1100 (past max-ts record) + PartitionLag p = partition(0, 1_000L, 1200, 11_000L, 1000, 1100); + assertTrue(KafkaOffsetTimestampInterpolator.hasValidAnchors(p)); + } + + @Test + void interpolate_clampsToLogEndTimestamp_whenCommittedPastMaxTimestampOffset() { + PartitionLag p = partition(0, 1_000L, 1200, 11_000L, 1000, 1100); + + OptionalLong ts = KafkaOffsetTimestampInterpolator.interpolateCommittedTimestamp(p); + + assertTrue(ts.isPresent()); + assertEquals(11_000L, ts.getAsLong(), + "committed beyond max-timestamp record should clamp to logEndTimestamp"); + } + + @Test + void interpolate_usesMaxTimestampOffsetSlope_notLogEndOffset() { + // Span: offset 0..1000 maps to ts 1000..11000 (slope 10ms/offset). True end is 5000. + // committed=500 → 1000 + 500*10 = 6000. If the (wrong) logEndOffset=5000 were used as + // the anchor, slope would be 2ms/offset → 2000, a very different result. + PartitionLag p = partition(0, 1_000L, 5000, 11_000L, 1000, 500); + + OptionalLong ts = KafkaOffsetTimestampInterpolator.interpolateCommittedTimestamp(p); + + assertTrue(ts.isPresent()); + assertEquals(6_000L, ts.getAsLong()); + } + + @Test + void estimateLagMs_notSkipped_whenLatestExceedsMaxTimestampOffset() { + long logStartTs = 1_000_000L; + long logEndTs = logStartTs + 2 * 60 * 60 * 1000L; // 2h span + long currentTime = logEndTs + 60_000L; + // committed near log start → ~2h old; true end (10_000) well past max-ts offset (8_000) + PartitionLag p = partition(0, logStartTs, 10_000, logEndTs, 8_000, 100); + + OptionalLong lagMs = KafkaOffsetTimestampInterpolator.estimateLagMs(p, currentTime); + + assertTrue(lagMs.isPresent(), "must not silently skip when LATEST != MAX_TIMESTAMP offset"); + assertTrue(lagMs.getAsLong() > 1_800_000L); + } } From 5c77a2bec95fa0330a984fc79729ce339168669d Mon Sep 17 00:00:00 2001 From: Aviv Dozorets Date: Sat, 30 May 2026 22:18:18 +0300 Subject: [PATCH 3/3] bump the version --- build.gradle.kts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/build.gradle.kts b/build.gradle.kts index d3604e3..e87d144 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -8,7 +8,7 @@ plugins { } group = "io.github.themoah" -version = "0.1.11" +version = "0.1.13" repositories { mavenCentral()