KAFKA-17019: Producer TimeoutException should include root cause #20159
frankvicky
left a comment
@chickenchickenlove: Thanks for the patch.
    this.await(timeout, unit, null);
}

public void await(long timeout, TimeUnit unit, PotentialCauseException potentialCauseException) {
Could we change PotentialCauseException to Supplier<PotentialCauseException>?
Creating an exception instance is very expensive; lazy creation could help us avoid this situation.
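The lazy-creation idea can be sketched roughly as follows. This is only an illustration, not the PR's code: `LazyCauseSketch`, its nested `TimeoutException`, the `completedInTime` flag, and the `CREATED` counter are all hypothetical stand-ins, chosen so the example runs without the Kafka client on the classpath.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical sketch; the nested TimeoutException is a stand-in, not Kafka's class.
final class LazyCauseSketch {
    static class TimeoutException extends RuntimeException {
        TimeoutException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    // Counts how many cause exceptions were actually constructed.
    static final AtomicInteger CREATED = new AtomicInteger();

    static RuntimeException newCause() {
        CREATED.incrementAndGet();
        return new RuntimeException("potential cause");
    }

    // completedInTime stands in for the real wait. The supplier is invoked
    // only on the timeout path, so the success path builds no exception.
    static void await(boolean completedInTime, Supplier<? extends Throwable> causeSupplier) {
        if (!completedInTime) {
            throw new TimeoutException("Timed out waiting for completion", causeSupplier.get());
        }
    }

    public static void main(String[] args) {
        await(true, LazyCauseSketch::newCause);
        System.out.println("causes created on success path: " + CREATED.get());
        try {
            await(false, LazyCauseSketch::newCause);
        } catch (TimeoutException e) {
            System.out.println("causes created on timeout path: " + CREATED.get());
        }
    }
}
```

With an eager parameter the cause would be constructed on every call, including the calls that complete in time; with the supplier it is constructed only when the timeout is actually reported.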
Thanks for your comments; that sounds great!
I have addressed the review comments.
When you have time, please take another look. 🙇♂️
a77073f to
abc86a6
Compare
Thanks for the PR. Because you've created a new public exception class
chia7712
left a comment
@chickenchickenlove thanks for this patch. One major comment is left. Please take a look.
throw new TimeoutException(errorMessage);
if (ex.getCause() != null)
    throw new TimeoutException(errorMessage, ex.getCause());
throw new TimeoutException(errorMessage, new PotentialCauseException("Metadata update timed out ― topic missing, auth denied, broker/partition unavailable, or client sender/buffer stalled."));
It seems PotentialCauseException does not offer more useful information than the error message, right? Perhaps we could enrich the error message instead of introducing a new nonspecific exception
@chia7712 thanks for your comment. 🙇♂️
I agree with your opinion, especially that 'we could enrich the error message instead of introducing a new nonspecific exception'.
However, introducing a new nonspecific exception to solve this issue has pros and cons.
Let me explain more.
KAFKA-17019 requires that every org.apache.kafka.common.errors.TimeoutException include the root cause as a nested exception.
It’s hard to pinpoint the root cause in every situation where a TimeoutException occurs.
kafka/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
Lines 405 to 418 in c6cf517
accumulator.resetNextBatchExpiryTime();
List<ProducerBatch> expiredInflightBatches = getExpiredInflightBatches(now);
List<ProducerBatch> expiredBatches = this.accumulator.expiredBatches(now);
expiredBatches.addAll(expiredInflightBatches);
// Reset the producer id if an expired batch has previously been sent to the broker. Also update the metrics
// for expired batches. see the documentation of @TransactionState.resetIdempotentProducerId to understand why
// we need to reset the producer id here.
if (!expiredBatches.isEmpty())
    log.trace("Expired {} batches in accumulator", expiredBatches.size());
for (ProducerBatch expiredBatch : expiredBatches) {
    String errorMessage = "Expiring " + expiredBatch.recordCount + " record(s) for " + expiredBatch.topicPartition
        + ":" + (now - expiredBatch.createdMs) + " ms has passed since batch creation";
    failBatch(expiredBatch, new TimeoutException(errorMessage), false);
Here, both expiredInflightBatches and expiredBatches can cause a TimeoutException.
However, the TimeoutException is thrown purely as a result of calculating elapsed time.
Therefore, expiredInflightBatches and expiredBatches carry no information about the root cause.
Consider the situations an expired in-flight batch might have run into:
- Normal case - no errors; the request succeeds.
- Network issue - an error might occur, but only after the expiry period.
- A slightly slow network (limited bandwidth, and so on) - no errors, only a slow response.
- Busy CPU for various reasons - no errors; the code is simply called late.
Even this simple analysis shows many scenarios in which a TimeoutException can occur.
However, the expired in-flight batch can actually surface an error only in case 2 (for example, connection closed or failure to establish a connection).
After spending a considerable amount of time reviewing the code and analyzing the TimeoutExceptions thrown by the Producer, I concluded that it's difficult to extract the root cause at every point where a TimeoutException is created.
Idea + Pros and Cons
Developers usually have a good understanding of what kind of error might occur in a given context.
Therefore, in cases where it's difficult to catch the actual root cause, it's possible to include an expected exception as the root cause instead.
Here’s a summary of the pros and cons (compared to simply enhancing the error message):
- pros: At call sites (for example, kafka streams, kafka connect, kafka producer internal, and so on) where a TimeoutException is expected, the root cause can be used to handle different cases conditionally.
  - For example, you could create NetworkPotentialCauseException and CPUBusyPotentialCauseException as subclasses of PotentialCauseException, and handle different branches based on the root cause: branch A if the root cause is a NetworkPotentialCauseException, and branch B if it’s a CPUBusyPotentialCauseException.
- cons: Higher instance creation cost compared to a String instance.
I spent quite a bit of time thinking through the direction of this PR.
What are your thoughts on it?
Please let me know. 🙇♂️
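The branching described under "pros" could look something like the sketch below. All of the classes are hypothetical stand-ins defined only for this illustration: NetworkPotentialCauseException and CPUBusyPotentialCauseException are the example names from the comment, not existing Kafka classes, and the nested TimeoutException merely mimics Kafka's.

```java
// Hypothetical sketch: every class here is a stand-in defined for illustration.
final class CauseBranchingSketch {
    static class PotentialCauseException extends RuntimeException {
        PotentialCauseException(String message) {
            super(message);
        }
    }

    static class NetworkPotentialCauseException extends PotentialCauseException {
        NetworkPotentialCauseException(String message) {
            super(message);
        }
    }

    static class CPUBusyPotentialCauseException extends PotentialCauseException {
        CPUBusyPotentialCauseException(String message) {
            super(message);
        }
    }

    static class TimeoutException extends RuntimeException {
        TimeoutException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    // A call site picks a branch from the type of the nested cause.
    static String handle(TimeoutException e) {
        Throwable cause = e.getCause();
        if (cause instanceof NetworkPotentialCauseException) {
            return "branch A";
        }
        if (cause instanceof CPUBusyPotentialCauseException) {
            return "branch B";
        }
        return "default handling";
    }

    public static void main(String[] args) {
        TimeoutException network =
            new TimeoutException("timed out", new NetworkPotentialCauseException("network"));
        TimeoutException cpu =
            new TimeoutException("timed out", new CPUBusyPotentialCauseException("cpu busy"));
        System.out.println(handle(network));
        System.out.println(handle(cpu));
    }
}
```

Enriching only the message string would not support this kind of dispatch, since the call site would have to parse text instead of checking a type.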
Thanks for your explanation. I agree that specific exception types could help developers catch the actual root cause. My question is, do we really need PotentialCauseException to be the base exception for NetworkPotentialCauseException and CPUBusyPotentialCauseException? Perhaps KafkaException is good enough.
Another exception design uses TimeoutException as a parent class, similar to e032a36. The benefit of this is simpler code, since developers wouldn't need to dig the root cause of a TimeoutException out of the root cause of an ExecutionException 😄
Thanks for bringing up this great discussion. I'd like to see the Kafka exception hierarchy become more developer-friendly.
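The difference between the two designs at a call site can be sketched as below. This is an illustration under assumptions: `HierarchySketch` and its nested classes (`NetworkCause`, `NetworkTimeoutException`, and the stand-in `TimeoutException`) are hypothetical names; only `java.util.concurrent.ExecutionException` is a real class.

```java
import java.util.concurrent.ExecutionException;

// Hypothetical sketch comparing "specific cause" with "specific subclass".
final class HierarchySketch {
    static class TimeoutException extends RuntimeException {
        TimeoutException(String message) {
            super(message);
        }

        TimeoutException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    // Design 1: the specific information travels as the cause of the timeout.
    static class NetworkCause extends RuntimeException {
        NetworkCause(String message) {
            super(message);
        }
    }

    // Design 2: the specific information is a subclass of the timeout itself.
    static class NetworkTimeoutException extends TimeoutException {
        NetworkTimeoutException(String message) {
            super(message);
        }
    }

    // Two levels of unwrapping: ExecutionException -> TimeoutException -> cause.
    static boolean isNetworkViaCause(ExecutionException e) {
        Throwable timeout = e.getCause();
        return timeout instanceof TimeoutException && timeout.getCause() instanceof NetworkCause;
    }

    // One level of unwrapping: ExecutionException -> specific subclass.
    static boolean isNetworkViaSubclass(ExecutionException e) {
        return e.getCause() instanceof NetworkTimeoutException;
    }

    public static void main(String[] args) {
        ExecutionException nested =
            new ExecutionException(new TimeoutException("timed out", new NetworkCause("network")));
        ExecutionException subclassed =
            new ExecutionException(new NetworkTimeoutException("timed out"));
        System.out.println(isNetworkViaCause(nested));
        System.out.println(isNetworkViaSubclass(subclassed));
    }
}
```

The subclass design saves one level of `getCause()` when the exception arrives wrapped in an `ExecutionException`, which is the simplification referred to above.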
@chia7712
Thank you for considering my proposal and for sharing your valuable thoughts.
As you mentioned, I also think it’s a good idea to reuse an existing Exception class.
Personally, I think using KafkaException would be a better approach.
Since there can be multiple potential causes for a TimeoutException in certain code paths, it might be difficult to pinpoint the exact cause. In such cases, it could be unclear which subclass of TimeoutException should be used. (e032a36)
So, my suggestion is as follows:
- Include a KafkaException as the root cause of the TimeoutException, and describe the possible scenario in the error message of the KafkaException.
- Let the detailed error information be available via the root cause of the TimeoutException.
- Keep the current message format of the TimeoutException itself (which currently only includes the elapsed time before it expired).
I’m thinking of revising the PR in this direction.
This way, I believe we can preserve the current semantics of TimeoutException while still conveying helpful contextual information (such as a potential cause) when necessary.
In the future, if there's a need to branch logic based on a more specific cause of the timeout—even if no actual exception was thrown—then the developer could define a concrete PotentialCauseException class as needed.
Also, if this direction sounds reasonable, I think it wouldn’t require a KIP change.
What do you think?
Please share your opinion 🙇♂️
The proposal of reusing KafkaException sounds good. +1 to it.
Sounds good to me also. There's no need to create a KIP for this approach and it gives just as much information to the user.
@AndrewJSchofield Thanks for your comments 🙇♂️ https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals Thanks in advance!
Yes, that's right. You would follow the instructions in that wiki page and make a new KIP using the template. The discussion would proceed on the dev mailing list. For an interesting KIP, the community will generally not need any encouragement to get involved in the discussion.
54df407 to
66215de
Compare
@chia7712 @AndrewJSchofield @frankvicky Hi!
@chia7712 @AndrewJSchofield @frankvicky
AndrewJSchofield
left a comment
Thanks for the update. I think this is a good idea, but the consistency of the error messages could be better. I suggest defining something like:
private static final String INIT_TRANSACTIONS_TIME_OUT_MESSAGE = "InitTransactions timed out – could not discover the transaction coordinator or receive the InitProducerId response within max.block.ms (broker unavailable, network lag, or ACL denial).";
and the same for the other new error strings.
Then, you'll find it easier to use the same form for things like "ACL" and "ACL denial" and "ACL issue" and "auth denial" which I guess are all the same thing just written differently, and hard to see because the strings are spread across the code.
Yunyung
left a comment
Thanks for the PR. Some comments left. Also, could you add tests to validate that the messages are thrown correctly as expected?
} catch (TimeoutException ex) {
    // Rethrow with original maxWaitMs to prevent logging exception with remainingWaitMs
    final String errorMessage = getErrorMessage(partitionsCount, topic, partition, maxWaitMs);
    if (metadata.getError(topic) != null) {
Would it be better to refactor it like this:
final Throwable cause;
Errors error = metadata.getError(topic);
if (error != null) {
    cause = error.exception();
} else if (ex.getCause() != null) {
    cause = ex.getCause();
} else {
    cause = new KafkaException(METADATA_TIMEOUT_MSG);
}
throw new TimeoutException(errorMessage, cause);
Also, since ex.getCause() != null can never be true based on the current awaitUpdate implementation, do we need to check for it?
public void await() {
    this.await(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
    this.await(Long.MAX_VALUE, TimeUnit.MILLISECONDS, () -> new KafkaException("Unknown reason."));
this.await(Long.MAX_VALUE, TimeUnit.MILLISECONDS, () -> new KafkaException("Unknown reason."));
this.await(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
@Yunyung thanks for your suggestion. 🙇♂️
However, I would like to keep my current code for this review.
The purpose of KAFKA-17019 is to ensure that any TimeoutException thrown by ProducerClient always includes a root cause.
To elaborate, the root cause may sometimes appear meaningless, such as Unknown Error.
However, by doing so, every call site that catches a TimeoutException can use the root cause without having to check whether it is null or not.
What do you think?
Hi, these two lines are exactly the same for your current change, right?
this.await(Long.MAX_VALUE, TimeUnit.MILLISECONDS); (This will call public void await(long timeout, TimeUnit unit))
this.await(Long.MAX_VALUE, TimeUnit.MILLISECONDS, () -> new KafkaException("Unknown reason."));
Oh, My Bad...!!!
Thanks for pointing it out 🙇♂️
@AndrewJSchofield @Yunyung
@AndrewJSchofield @chia7712 @Yunyung @frankvicky
@chickenchickenlove please rebase code to include CI changes.
b11a4f2 to
ddb777c
Compare
@chia7712 thanks for your comments!
Also, I have something to tell you.
It seems that would only be possible if I propose a KIP to add a new API to the
and that KIP gets approved. What do you think?
Are there situations where using a custom exception is necessary for clearer error reporting?
chia7712
left a comment
@chickenchickenlove thanks for updates. I have left a couple of comments
private static final String INIT_TXN_TIMEOUT_MSG =
    "InitTransactions timed out – could not discover the transaction coordinator or "
    + "receive the InitProducerId response within max.block.ms (broker unavailable, "
ditto - could we avoid those specific root cause?
I have revised it to make the content more general. 👍
private static final String SEND_OFFSETS_TIMEOUT_MSG =
    "SendOffsetsToTransaction timed out – unable to reach the consumer-group or "
    + "transaction coordinator or to receive the TxnOffsetCommit/AddOffsetsToTxn response "
    + "within max.block.ms (coordinator unavailable, rebalance in progress, network lag, or ACL denial).";
ditto
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@chia7712, thanks for your comments.
// https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L1224
private ClusterAndWaitTime waitOnMetadata(String topic, Integer partition, long nowMs, long maxWaitMs) throws InterruptedException {
    ...
    try {
        metadata.awaitUpdate(version, remainingWaitMs); // <----
    }
    ...
Here, a
@chia7712
the case
@chia7712 Ah, sorry. You are right.
@chia7712 Hi!
    throw new TimeoutException(errorMessage, metadata.getError(topic).exception());
}
throw new TimeoutException(errorMessage);
if (metadata.getError(topic) != null) {
I left a comment on the origin PR https://github.com/apache/kafka/pull/16344/files#r2517753553. Understanding the origin design point would be very useful
@chia7712 thanks for your comments.
I will stay tuned!
If there is anything I can do, please let me know; I'm happy to help.
Thanks always! 🙇♂️
Could you please revert this change? The non-retriable error should not be included within the retriable TimeoutException
@chia7712 thanks for digging into it!
I have reverted the change. Here is how I understood it:
TimeoutException is a type of RetriableException, but in this code we only want to include a cause when a more explicit RetriableException is present.
After reading the discussion at #16344 (comment), I note that adding KafkaException creates a strange scenario where a retriable exception has a non-retriable error as its root cause. This could confuse users when handling the exception. Did we discuss simply enriching the exception message directly instead?
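The "strange scenario" can be made concrete with a small sketch. The nested classes below are stand-ins that only mirror the relevant part of Kafka's hierarchy (TimeoutException extends RetriableException, which in turn is a KafkaException); `RetriableCauseSketch` and its two helper methods are hypothetical and not part of any Kafka API.

```java
// Hypothetical sketch; the nested classes are stand-ins mirroring Kafka's hierarchy.
final class RetriableCauseSketch {
    static class KafkaException extends RuntimeException {
        KafkaException(String message) {
            super(message);
        }

        KafkaException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    static class RetriableException extends KafkaException {
        RetriableException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    static class TimeoutException extends RetriableException {
        TimeoutException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    // Classifies by the type of the exception that was actually thrown.
    static boolean retriableByType(Throwable t) {
        return t instanceof RetriableException;
    }

    // Classifies by the deepest cause, as an error handler that unwraps
    // exceptions before deciding might do.
    static boolean retriableByRootCause(Throwable t) {
        Throwable root = t;
        while (root.getCause() != null) {
            root = root.getCause();
        }
        return root instanceof RetriableException;
    }

    public static void main(String[] args) {
        TimeoutException e =
            new TimeoutException("timed out", new KafkaException("potential cause"));
        System.out.println("by type: " + retriableByType(e));
        System.out.println("by root cause: " + retriableByRootCause(e));
    }
}
```

The two checks disagree for the same exception, so whether the failure is retried would depend on which style of check the caller happens to use.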
@chia7712
Gently ping and sorry to bother you 😢
Could you share your thoughts with me?
Thanks a lot 🙇♂️
I have to say, my concern remains the inconsistent exception hierarchy. This inconsistency could lead users to mistakenly catch a non-retryable exception.
ping @mjsax WDYT?
Hi @chia7712 !
I updated the PR based on your comments (reverted the KafkaException change and improved the log messages).
When you have time, please take another look.
Thank you again for your time 🙇♂️
@chickenchickenlove thanks for your updates. I think the current version is less controversial, so we can accept it first. If there is more discussion later, we can modify the error structure (maybe it needs a KIP).
Thank you for taking the time to review this PR. 🙇♂️
Thanks to you, I learned a lot, and I’ll make sure to take these points into consideration when writing my next PR.
1b9dba0 to
aa7e6b9
Compare
I rebased on f685d57 (latest commit).
@chia7712 Sorry to frequently mention you 😭
public static final String NETWORK_THREAD_PREFIX = "kafka-producer-network-thread";
public static final String PRODUCER_METRIC_GROUP_NAME = "producer-metrics";

private static final String INIT_TXN_TIMEOUT_MSG = "InitTransactions timed out — " +
Let's stick to the ASCII hyphen - and avoid using the longer dashes – or —. I will prepare a patch for this change
Thanks for comments.
I'll make sure to keep this in mind for the next PR. Thanks in advance for your patch as well. 🙇♂️
Don't worry, a small issue won't bring down Kafka 😄
open #21148
…he#20159)

### Changes
- Add a new exception class, `PotentialCauseException`.
- Every `org.apache.kafka.common.errors.TimeoutException` in `KafkaProducer` has a `PotentialCauseException` as its root cause if no other exception was caught.

### Describe
`TimeoutException` can be thrown for various reasons. However, it is often difficult to identify the root cause, because so many factors can lead to a `TimeoutException`. For example:
1. The `ProducerClient` might be busy, so it may not be able to send the request in time. As a result, some batches may expire, leading to a `TimeoutException`.
2. The `broker` might be unavailable due to network issues or internal failures.
3. A request may be in flight, and although the broker successfully handles and responds to it, the response might arrive slightly late.

As shown above, there are many possible causes. In some cases, no `exception` is caught in the `catch` block, and a `TimeoutException` is thrown simply by comparing the `elapsed time`. However, the developer using `TimeoutException` in `KafkaProducer` likely already knows which specific reasons could cause it in that context. Therefore, I think it would be helpful to include a `PotentialCauseException` that reflects the likely reason, based on the developer’s knowledge.

Reviewers: TengYao Chi <kitingiao@gmail.com>, Yung <yungyung7654321@gmail.com>, Andrew Schofield <aschofield@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>