fix: use Kafka log timestamps for lag_ms (closes #30)#31
Conversation
klag_consumer_lag_ms previously plateaued at ~30 minutes. Root cause: lag_ms was derived solely from a poll-time (logEndOffset, systemTime) ring buffer with bounded retention (TIME_LAG_INTERPOLATION_BUFFER_SIZE × METRICS_INTERVAL_MS), and PartitionOffsetHistory back-extrapolated using the two oldest points when the committed offset fell behind the window — producing a fictional ceiling near the buffer's time span. Fix introduces a two-tier estimation in LagMsCalculator: - Primary: KafkaOffsetTimestampInterpolator linearly interpolates committed-offset timestamp between Kafka listOffsets log start/end anchors (already populated via OffsetSpec.earliest/maxTimestamp/latest in KafkaClientServiceImpl). No ceiling — reports true lag in ms for arbitrarily old commits. - Fallback: poll-time interpolation, used only when Kafka anchors are invalid (logStart/EndTimestamp ≤ 0 or committed outside log range). Now strictly refuses to extrapolate beyond the oldest retained sample. MetricsCollector.calculateLagMs delegates per-partition estimation to LagMsCalculator, still records poll offsets so the fallback stays warm. Bumps version to 0.1.11. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
Review Summary by QodoFix lag_ms softcap by using Kafka log timestamps with bounded fallback
WalkthroughsDescription• Eliminates ~30-minute softcap on klag_consumer_lag_ms by using Kafka log timestamps • Adds KafkaOffsetTimestampInterpolator for primary unbounded lag estimation • Implements LagMsCalculator orchestrating Kafka timestamps with poll-time fallback • Removes backward extrapolation; fallback now strictly bounded to retained samples • Version bumped to 0.1.11 Diagramflowchart LR
A["PartitionLag<br/>with Kafka anchors"] -->|"hasValidAnchors?"| B["KafkaOffsetTimestampInterpolator"]
B -->|"interpolate<br/>committed offset"| C["Unbounded<br/>lag_ms estimate"]
A -->|"invalid anchors"| D["LagMsCalculator"]
D -->|"fallback"| E["OffsetTimestampTracker<br/>poll history"]
E -->|"within retained<br/>samples only"| F["Bounded<br/>lag_ms estimate"]
C --> G["MetricsCollector<br/>reports lag_ms"]
F --> G
File Changes1. src/main/java/io/github/themoah/klag/metrics/timelag/KafkaOffsetTimestampInterpolator.java
|
Code Review by Qodo
Context used✅ Tickets:
🎫 Time lag softcaps at 30 minutes 1. logEndOffset anchor mismatch
|
Summary
klag_consumer_lag_msreported in Time lag softcaps at 30 minutes #30KafkaOffsetTimestampInterpolator(primary) using KafkalistOffsetslog start/end timestamps + offsets — unbounded window(logEndOffset, systemTime)history as a strict fallback (no backward extrapolation)LagMsCalculatororchestrates primary → fallback per partition0.1.11Root cause
Pre-fix,
lag_mswas derived only from the poll-time ring buffer.With
TIME_LAG_INTERPOLATION_BUFFER_SIZE=60×METRICS_INTERVAL_MS=30000, thebuffer held 30 min of history. The old
PartitionOffsetHistoryback-extrapolatedvia the two oldest points whenever the committed offset fell behind the window,
which fabricated a timestamp near the edge — hence the visible ceiling.
Kafka's actual
logStartTimestamp/logEndTimestamp(fetched viaOffsetSpec.earliest()andOffsetSpec.maxTimestamp()inKafkaClientServiceImpl)was already in
PartitionLagbut never consulted forlag_ms.Design
KafkaOffsetTimestampInterpolator: validates anchors(
logStart/EndTimestamp > 0, valid range, committed within[logStart, logEnd]),linearly interpolates committed-offset timestamp between log boundaries, and
computes
currentTime - interpolatedTs. Reports lag accurately for any age.uses the existing poll-time ring buffer only within retained samples; if
the committed offset is older than the oldest retained sample, returns empty
(no fictitious cap).
OffsetTimestampTrackerkeeps recording every poll so the fallback stays warm.Test plan
gradle clean test— 113 tests pass, including 10 new tests inKafkaOffsetTimestampInterpolatorTestandLagMsCalculatorTestcovering a 2h lag case that previously would have softcappedgradle assemble— shadow jar builds clean0.1.11against the affected cluster, confirmklag_consumer_lag_mstracks the naive-exporter graph beyond 30 min and falls to 0 when consumer catches up🤖 Generated with Claude Code