-
Notifications
You must be signed in to change notification settings - Fork 14.8k
KAFKA-18615: StreamThread *-ratio metrics suffer from sampling bias #21160
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: trunk
Are you sure you want to change the base?
Conversation
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
Outdated
Show resolved
Hide resolved
mjsax
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we also update the description of each metric, saying it's windowed (eg in ThreadMetrics.java)
bbejeck
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @aliehsaeedii - overall LGTM - can we add a description to docs/upgrade.html section for 4.2?
mjsax
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Overall LGTM. Few more minor things.
| <li>The behavior of <code>org.apache.kafka.streams.KafkaStreams#removeStreamThread</code> has been changed. The consumer has no longer remove once <code>removeStreamThread</code> finished. | ||
| Instead, consumer would be kicked off from the group after <code>org.apache.kafka.streams.processor.internals.StreamThread</code> completes its <code>run</code> function. | ||
| </li> | ||
| <li>TThe streams thread metrics <code>commit-ratio</code>, <code>process-ratio</code>, <code>punctuate-ratio</code>, and <code>poll-ratio</code> have been updated. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| <li>TThe streams thread metrics <code>commit-ratio</code>, <code>process-ratio</code>, <code>punctuate-ratio</code>, and <code>poll-ratio</code> have been updated. | |
| <li>The streams thread metrics <code>commit-ratio</code>, <code>process-ratio</code>, <code>punctuate-ratio</code>, and <code>poll-ratio</code> have been updated. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@bbejeck Did you intent docs/upgrade.html? Not sure if this change it too small for top level upgrade notes and if docs/streams/upgrade-guide.html would be better suited?
| boolean cleanRun = false; | ||
| try { | ||
| taskManager.init(); | ||
| initLatencyWindowsIfNeeded(System.currentTimeMillis()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should never call System.curentTimeMillis()directly, but use time.milliseconds() instead.
| pollLatencyWindowedSum.record(metricsConfig, 0.0, now); | ||
| this.totalCommitLatencyWindowedSum.record(metricsConfig, 0, now); | ||
| this.processLatencyWindowedSum.record(metricsConfig, 0, now); | ||
| this.punctuateLatencyWindowedSum.record(metricsConfig, 0, now); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: remove unnecessary this.
| this.totalCommitLatencyWindowedSum.record(metricsConfig, totalCommitLatency, now); | ||
| this.processLatencyWindowedSum.record(metricsConfig, processLatency, now); | ||
| this.punctuateLatencyWindowedSum.record(metricsConfig, punctuateLatency, now); | ||
| this.runOnceLatencyWindowedSum.record(metricsConfig, runOnceLatency, now); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
as above
This PR implements KAFKA-18615 by adding a windowSum aggregation that
computes the sum of values over a time window, so that
commit-ratio,poll-ratio,process-ratio, andpunctuate-ratiorepresent the ratioof the
{action}over a window duration rather than a single iteration.The effective window duration is whatever you configure for metrics:
metrics.sample.window.ms(per-sample window length)times metrics.num.samples(number of rolling windows)With the default Kafka metrics config, that is typically:
metrics.sample.window.ms = 30000 msmetrics.num.samples = 2→ ~60seconds total rolling window.