Skip to content

fix: use Kafka log timestamps for lag_ms (closes #30)#31

Merged
themoah merged 4 commits into
mainfrom
bugfix/timelag-softcap
May 30, 2026
Merged

fix: use Kafka log timestamps for lag_ms (closes #30)#31
themoah merged 4 commits into
mainfrom
bugfix/timelag-softcap

Conversation

@themoah
Copy link
Copy Markdown
Owner

@themoah themoah commented May 27, 2026

Summary

  • Eliminates the ~30-minute softcap on klag_consumer_lag_ms reported in Time lag softcaps at 30 minutes #30
  • Adds KafkaOffsetTimestampInterpolator (primary) using Kafka listOffsets log start/end timestamps + offsets — unbounded window
  • Keeps poll-time (logEndOffset, systemTime) history as a strict fallback (no backward extrapolation)
  • LagMsCalculator orchestrates primary → fallback per partition
  • Version bumped to 0.1.11

Root cause

Pre-fix, lag_ms was derived only from the poll-time ring buffer.
With TIME_LAG_INTERPOLATION_BUFFER_SIZE=60 × METRICS_INTERVAL_MS=30000, the
buffer held 30 min of history. The old PartitionOffsetHistory back-extrapolated
via the two oldest points whenever the committed offset fell behind the window,
which fabricated a timestamp near the edge — hence the visible ceiling.

Kafka's actual logStartTimestamp / logEndTimestamp (fetched via
OffsetSpec.earliest() and OffsetSpec.maxTimestamp() in KafkaClientServiceImpl)
was already in PartitionLag but never consulted for lag_ms.

Design

  • PrimaryKafkaOffsetTimestampInterpolator: validates anchors
    (logStart/EndTimestamp > 0, valid range, committed within [logStart, logEnd]),
    linearly interpolates committed-offset timestamp between log boundaries, and
    computes currentTime - interpolatedTs. Reports lag accurately for any age.
  • Fallback — when Kafka anchors are missing/invalid (e.g. pre-3.0 broker),
    uses the existing poll-time ring buffer only within retained samples; if
    the committed offset is older than the oldest retained sample, returns empty
    (no fictitious cap).
  • OffsetTimestampTracker keeps recording every poll so the fallback stays warm.

Test plan

  • gradle clean test — 113 tests pass, including 10 new tests in KafkaOffsetTimestampInterpolatorTest and LagMsCalculatorTest covering a 2h lag case that previously would have softcapped
  • gradle assemble — shadow jar builds clean
  • Manual: deploy 0.1.11 against the affected cluster, confirm klag_consumer_lag_ms tracks the naive-exporter graph beyond 30 min and falls to 0 when consumer catches up

🤖 Generated with Claude Code

klag_consumer_lag_ms previously plateaued at ~30 minutes. Root cause: lag_ms
was derived solely from a poll-time (logEndOffset, systemTime) ring buffer
with bounded retention (TIME_LAG_INTERPOLATION_BUFFER_SIZE × METRICS_INTERVAL_MS),
and PartitionOffsetHistory back-extrapolated using the two oldest points when
the committed offset fell behind the window — producing a fictional ceiling
near the buffer's time span.

Fix introduces a two-tier estimation in LagMsCalculator:

- Primary: KafkaOffsetTimestampInterpolator linearly interpolates committed-offset
  timestamp between Kafka listOffsets log start/end anchors (already populated
  via OffsetSpec.earliest/maxTimestamp/latest in KafkaClientServiceImpl). No
  ceiling — reports true lag in ms for arbitrarily old commits.
- Fallback: poll-time interpolation, used only when Kafka anchors are invalid
  (logStart/EndTimestamp ≤ 0 or committed outside log range). Now strictly
  refuses to extrapolate beyond the oldest retained sample.

MetricsCollector.calculateLagMs delegates per-partition estimation to
LagMsCalculator, still records poll offsets so the fallback stays warm.

Bumps version to 0.1.11.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
@qodo-code-review
Copy link
Copy Markdown

Review Summary by Qodo

Fix lag_ms softcap by using Kafka log timestamps with bounded fallback

🐞 Bug fix

Grey Divider

Walkthroughs

Description
• Eliminates ~30-minute softcap on klag_consumer_lag_ms by using Kafka log timestamps
• Adds KafkaOffsetTimestampInterpolator for primary unbounded lag estimation
• Implements LagMsCalculator orchestrating Kafka timestamps with poll-time fallback
• Removes backward extrapolation; fallback now strictly bounded to retained samples
• Version bumped to 0.1.11
Diagram
flowchart LR
  A["PartitionLag<br/>with Kafka anchors"] -->|"hasValidAnchors?"| B["KafkaOffsetTimestampInterpolator"]
  B -->|"interpolate<br/>committed offset"| C["Unbounded<br/>lag_ms estimate"]
  A -->|"invalid anchors"| D["LagMsCalculator"]
  D -->|"fallback"| E["OffsetTimestampTracker<br/>poll history"]
  E -->|"within retained<br/>samples only"| F["Bounded<br/>lag_ms estimate"]
  C --> G["MetricsCollector<br/>reports lag_ms"]
  F --> G

Loading

Grey Divider

File Changes

1. src/main/java/io/github/themoah/klag/metrics/timelag/KafkaOffsetTimestampInterpolator.java ✨ Enhancement +72/-0

New primary lag estimation using Kafka log timestamps

src/main/java/io/github/themoah/klag/metrics/timelag/KafkaOffsetTimestampInterpolator.java


2. src/main/java/io/github/themoah/klag/metrics/timelag/LagMsCalculator.java ✨ Enhancement +56/-0

New orchestrator for two-tier lag estimation strategy

src/main/java/io/github/themoah/klag/metrics/timelag/LagMsCalculator.java


3. src/main/java/io/github/themoah/klag/metrics/MetricsCollector.java ✨ Enhancement +18/-39

Delegates per-partition lag calculation to LagMsCalculator

src/main/java/io/github/themoah/klag/metrics/MetricsCollector.java


View more (11)
4. src/main/java/io/github/themoah/klag/metrics/timelag/OffsetTimestampTracker.java ✨ Enhancement +16/-0

Adds method to retrieve oldest retained offset for validation

src/main/java/io/github/themoah/klag/metrics/timelag/OffsetTimestampTracker.java


5. src/main/java/io/github/themoah/klag/metrics/timelag/PartitionOffsetHistory.java 🐞 Bug fix +15/-12

Removes backward extrapolation; adds oldest offset accessor

src/main/java/io/github/themoah/klag/metrics/timelag/PartitionOffsetHistory.java


6. src/main/java/io/github/themoah/klag/metrics/timelag/TimeLagEstimator.java 📝 Documentation +2/-2

Updates documentation to reference new lag calculation path

src/main/java/io/github/themoah/klag/metrics/timelag/TimeLagEstimator.java


7. src/main/java/io/github/themoah/klag/model/LagMs.java 📝 Documentation +3/-3

Updates javadoc to describe dual-path lag estimation

src/main/java/io/github/themoah/klag/model/LagMs.java


8. src/main/java/io/github/themoah/klag/metrics/MicrometerReporter.java 📝 Documentation +1/-1

Updates javadoc for lag_ms reporting method

src/main/java/io/github/themoah/klag/metrics/MicrometerReporter.java


9. src/test/java/io/github/themoah/klag/metrics/timelag/KafkaOffsetTimestampInterpolatorTest.java 🧪 Tests +94/-0

New tests for Kafka timestamp interpolation and 2-hour lag case

src/test/java/io/github/themoah/klag/metrics/timelag/KafkaOffsetTimestampInterpolatorTest.java


10. src/test/java/io/github/themoah/klag/metrics/timelag/LagMsCalculatorTest.java 🧪 Tests +71/-0

New tests for two-tier lag estimation orchestration

src/test/java/io/github/themoah/klag/metrics/timelag/LagMsCalculatorTest.java


11. src/test/java/io/github/themoah/klag/metrics/timelag/PartitionOffsetHistoryTest.java 🧪 Tests +6/-20

Updates tests to verify no backward extrapolation occurs

src/test/java/io/github/themoah/klag/metrics/timelag/PartitionOffsetHistoryTest.java


12. CLAUDE.md 📝 Documentation +1/-1

Updates metric documentation with new dual-path lag strategy

CLAUDE.md


13. README.md 📝 Documentation +1/-1

Updates lag_ms metric description with Kafka timestamps and fallback

README.md


14. build.gradle.kts ⚙️ Configuration changes +1/-1

Bumps version from 0.1.10 to 0.1.11

build.gradle.kts


Grey Divider

Qodo Logo

@qodo-code-review
Copy link
Copy Markdown

qodo-code-review Bot commented May 27, 2026

Code Review by Qodo

🐞 Bugs (1) 📘 Rule violations (0) 📎 Requirement gaps (0)

Context used

Grey Divider


Remediation recommended

1. logEndOffset anchor mismatch 🐞 Bug ≡ Correctness
Description
KafkaOffsetTimestampInterpolator treats logEndOffset as the log’s upper offset boundary
(committedOffset must be <= logEndOffset), but KafkaClientServiceImpl may set logEndOffset from the
MAX_TIMESTAMP query (offset with highest timestamp) instead of the LATEST offset. If MAX_TIMESTAMP’s
offset is behind the true latest offset, lag_ms primary interpolation will be rejected and other
lag/throughput metrics derived from logEndOffset become incorrect.
Code

src/main/java/io/github/themoah/klag/metrics/timelag/KafkaOffsetTimestampInterpolator.java[R18-31]

Evidence
The interpolator requires committedOffset <= logEndOffset to consider Kafka anchors valid; meanwhile
KafkaClientServiceImpl can assign logEndOffset from MAX_TIMESTAMP (“offset with the highest
timestamp”) even though it also fetches LATEST (“actual latest offset”). PartitionLag.lag is derived
from logEndOffset, so any mismatch affects both lag_ms anchor validity and core lag computations.

src/main/java/io/github/themoah/klag/metrics/timelag/KafkaOffsetTimestampInterpolator.java[18-32]
src/main/java/io/github/themoah/klag/kafka/KafkaClientServiceImpl.java[102-170]
src/main/java/io/github/themoah/klag/model/ConsumerGroupLag.java[19-41]

Agent prompt
The issue below was found during a code review. Follow the provided context and guidance below and implement a solution

### Issue description
`KafkaOffsetTimestampInterpolator.hasValidAnchors()` assumes `PartitionLag.logEndOffset` is the true end-of-log offset boundary and rejects interpolation if `committedOffset > logEndOffset`. However `KafkaClientServiceImpl.getLogEndOffsets()` can set `logEndOffset` from the MAX_TIMESTAMP listOffsets query (documented in-code as “offset with the highest timestamp”), while also fetching `OffsetSpec.LATEST`.

This conflates two different notions:
- the true end offset (needed for `lag = logEndOffset - committedOffset`, throughput, retention window)
- the offset associated with the max-timestamp record (needed if you want a `(offset,timestamp)` anchor for interpolation)

When these differ, the new lag_ms primary path can be skipped (anchors invalid) and existing lag/throughput/retention computations can be wrong because they all use `logEndOffset`.

### Issue Context
- `KafkaClientServiceImpl` explicitly issues **both** MAX_TIMESTAMP and LATEST queries but assigns `logEndOffset` from MAX_TIMESTAMP when it returns a valid result.
- `ConsumerGroupLag.PartitionLag.of()` computes `lag` directly from `logEndOffset`.

### Fix Focus Areas
- src/main/java/io/github/themoah/klag/kafka/KafkaClientServiceImpl.java[102-170]
- src/main/java/io/github/themoah/klag/model/ConsumerGroupLag.java[19-41]
- src/main/java/io/github/themoah/klag/metrics/timelag/KafkaOffsetTimestampInterpolator.java[18-32]

### Suggested fix approach
1. Make `logEndOffset` consistently represent the **LATEST** (true end) offset.
2. If you still need a timestamp anchor from MAX_TIMESTAMP, carry it separately (e.g., add `maxTimestampOffset` alongside `logEndTimestamp`, or add a dedicated end-anchor struct for interpolation).
3. Update `KafkaOffsetTimestampInterpolator` to use the correct offset that corresponds to `logEndTimestamp` (the max-timestamp offset) for interpolation, while `PartitionLag.lag` and other offset-based metrics continue using the true end offset.
4. Add a unit test around `buildConsumerGroupLag` / lag_ms estimation to cover a case where `LATEST offset != MAX_TIMESTAMP offset` and ensure lag and lag_ms behave sensibly (no silent skipping due to mismatched anchors).

ⓘ Copy this prompt and use it to remediate the issue with your preferred AI generation tools


Grey Divider

Qodo Logo

@themoah themoah merged commit 9653d2f into main May 30, 2026
9 checks passed
@themoah themoah deleted the bugfix/timelag-softcap branch May 30, 2026 19:27
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant