@aliehsaeedii (Contributor) commented Dec 15, 2025

This PR implements KAFKA-18615 by adding a windowSum aggregation that
computes the sum of values over a time window. As a result, commit-ratio,
poll-ratio, process-ratio, and punctuate-ratio now represent the ratio
of time spent on the {action} over a window duration rather than over a
single iteration.

The effective window duration is the product of two metrics settings:

  • metrics.sample.window.ms (length of each sample window)
  • metrics.num.samples (number of rolling sample windows)

With the default Kafka metrics config, that is:
metrics.sample.window.ms = 30000 and metrics.num.samples = 2, giving a
rolling window of about 60 seconds in total.
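
To make the window arithmetic concrete, here is a minimal, self-contained sketch of a ratio computed over a rolling window. It is not the code in this PR: `WindowedRatioSketch`, `RollingSum`, and `ratio` are hypothetical names, the eviction is per entry rather than per sample bucket, and dividing by the windowed run-once latency is an assumption about how the ratio is formed.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical sketch; class and method names are illustrative, not Kafka APIs.
class WindowedRatioSketch {

    // Effective rolling window = per-sample window length * number of samples.
    static long effectiveWindowMs(long sampleWindowMs, int numSamples) {
        return sampleWindowMs * numSamples;
    }

    // Simplified rolling sum that keeps individual (timestamp, value) entries.
    // A bucketed implementation would expire whole samples instead.
    static final class RollingSum {
        private final long windowMs;
        private final Deque<long[]> entries = new ArrayDeque<>();

        RollingSum(long windowMs) {
            this.windowMs = windowMs;
        }

        void record(long value, long nowMs) {
            entries.addLast(new long[] {nowMs, value});
        }

        long sum(long nowMs) {
            // Drop entries that have aged out of the window.
            while (!entries.isEmpty() && entries.peekFirst()[0] <= nowMs - windowMs) {
                entries.removeFirst();
            }
            long total = 0;
            for (long[] e : entries) {
                total += e[1];
            }
            return total;
        }
    }

    // Assumed definition: windowed action latency / windowed run-once latency.
    static double ratio(RollingSum action, RollingSum runOnce, long nowMs) {
        long total = runOnce.sum(nowMs);
        return total == 0 ? 0.0 : (double) action.sum(nowMs) / total;
    }

    public static void main(String[] args) {
        long windowMs = effectiveWindowMs(30_000L, 2); // default config values
        RollingSum poll = new RollingSum(windowMs);
        RollingSum runOnce = new RollingSum(windowMs);

        poll.record(40, 1_000);
        runOnce.record(100, 1_000);
        poll.record(10, 31_000);
        runOnce.record(100, 31_000);

        // Both iterations are inside the 60 s window: (40 + 10) / (100 + 100).
        System.out.println(ratio(poll, runOnce, 31_000));
        // The first iteration has aged out: 10 / 100.
        System.out.println(ratio(poll, runOnce, 61_000));
    }
}
```

With a single-iteration ratio, the second reading would reflect only the most recent loop; with the windowed form, it reflects every iteration still inside the rolling window.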

@github-actions github-actions bot added triage PRs from the community streams small Small PRs labels Dec 15, 2025
@github-actions github-actions bot removed the small Small PRs label Dec 16, 2025
@mjsax (Member) left a comment

Should we also update the description of each metric to say that it is windowed (e.g., in ThreadMetrics.java)?

@github-actions github-actions bot removed the triage PRs from the community label Dec 16, 2025
@github-actions github-actions bot added the small Small PRs label Dec 16, 2025
@bbejeck (Member) left a comment

Thanks @aliehsaeedii - overall LGTM - can we add a description to the 4.2 section of docs/upgrade.html?

@github-actions github-actions bot removed the small Small PRs label Dec 17, 2025
@mjsax (Member) left a comment

Overall LGTM. A few more minor things.

<li>The behavior of <code>org.apache.kafka.streams.KafkaStreams#removeStreamThread</code> has been changed. The consumer has no longer remove once <code>removeStreamThread</code> finished.
Instead, consumer would be kicked off from the group after <code>org.apache.kafka.streams.processor.internals.StreamThread</code> completes its <code>run</code> function.
</li>
<li>TThe streams thread metrics <code>commit-ratio</code>, <code>process-ratio</code>, <code>punctuate-ratio</code>, and <code>poll-ratio</code> have been updated.
Member

Suggested change
- <li>TThe streams thread metrics <code>commit-ratio</code>, <code>process-ratio</code>, <code>punctuate-ratio</code>, and <code>poll-ratio</code> have been updated.
+ <li>The streams thread metrics <code>commit-ratio</code>, <code>process-ratio</code>, <code>punctuate-ratio</code>, and <code>poll-ratio</code> have been updated.

Member

@bbejeck Did you intend docs/upgrade.html? I'm not sure whether this change is too small for the top-level upgrade notes; would docs/streams/upgrade-guide.html be better suited?

boolean cleanRun = false;
try {
    taskManager.init();
    initLatencyWindowsIfNeeded(System.currentTimeMillis());
Member

We should never call System.currentTimeMillis() directly, but use time.milliseconds() instead.
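
One reason for routing every timestamp through an injected clock is that tests can then advance time deterministically. The sketch below illustrates the idea with hypothetical stand-ins: `TimeSource`, `SystemTimeSource`, `ManualTimeSource`, and `LatencyTracker` are invented for this example and only mimic the shape of a `time.milliseconds()` call; they are not the Kafka classes.

```java
// Hypothetical stand-in for an injectable clock exposing milliseconds().
interface TimeSource {
    long milliseconds();
}

// Production flavor: delegates to the wall clock in exactly one place.
final class SystemTimeSource implements TimeSource {
    @Override
    public long milliseconds() {
        return System.currentTimeMillis();
    }
}

// Test flavor: time only moves when the test says so.
final class ManualTimeSource implements TimeSource {
    private long nowMs;

    ManualTimeSource(long startMs) {
        this.nowMs = startMs;
    }

    @Override
    public long milliseconds() {
        return nowMs;
    }

    void sleep(long ms) {
        nowMs += ms;
    }
}

// Hypothetical component that needs "now"; it never touches System directly.
final class LatencyTracker {
    private final TimeSource time;
    private long windowStartMs = -1L;

    LatencyTracker(TimeSource time) {
        this.time = time;
    }

    // Initializes the window start once, using the injected clock.
    void initWindowsIfNeeded() {
        if (windowStartMs < 0) {
            windowStartMs = time.milliseconds();
        }
    }

    long elapsedMs() {
        return time.milliseconds() - windowStartMs;
    }
}
```

In a test, constructing `LatencyTracker` with a `ManualTimeSource` and calling `sleep(250)` after `initWindowsIfNeeded()` yields an elapsed time of exactly 250 ms, which a direct wall-clock call could not guarantee.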

pollLatencyWindowedSum.record(metricsConfig, 0.0, now);
this.totalCommitLatencyWindowedSum.record(metricsConfig, 0, now);
this.processLatencyWindowedSum.record(metricsConfig, 0, now);
this.punctuateLatencyWindowedSum.record(metricsConfig, 0, now);
Member

nit: remove unnecessary this.

this.totalCommitLatencyWindowedSum.record(metricsConfig, totalCommitLatency, now);
this.processLatencyWindowedSum.record(metricsConfig, processLatency, now);
this.punctuateLatencyWindowedSum.record(metricsConfig, punctuateLatency, now);
this.runOnceLatencyWindowedSum.record(metricsConfig, runOnceLatency, now);
Member

as above
