Preserve PhysicalExpr graph in proto round trip using Arc pointers as unique identifiers #20037
Conversation
@milenkovicm I'm curious what you think of this now, with the simpler implementation thanks to Tim's refactor. I think your concern about reuse and pointer collision still exists; I added some comments to try to address it. If you think it's still a concern, we could add an optional method on the trait.
Will have a look, but I still believe that if it is needed, the distributed engine should implement it, not DataFusion. Could that be an option?
Force-pushed `0c4e1aa` to `5f57a8c`.
```rust
/// This cache should only be used for a single plan deserialization.
/// Create a new converter instance for each plan to avoid cross-plan collisions.
dedup_cache: RwLock<HashMap<u64, Arc<dyn PhysicalExpr>>>,
```
I still believe this should not be pushed as the default for all users. If it is needed for distribution, it should be implemented by the specific distribution engine.
Thanks for taking a look. It could be, but as per the PR title I think there is actually a real benefit for DataFusion as well. There are cases of serializing and deserializing plans that do not involve distributed engines; I don't personally use the Python APIs / FFI, but I think those users also use protobuf serde. Basically, at the moment dynamic filters (a built-in feature of DataFusion that is critical to query performance) and protobuf serde (a built-in feature of DataFusion) are incompatible. And although not completely broken, we do not do faithful serde of expression and execution node graphs/trees at the moment: we create duplicate trees/branches when deserializing that don't exist in the original version. This can cause duplication of large expressions such as `InList`s.

I also don't see any harm to other users of DataFusion. This change is fully contained within the protobuf serde code, so it definitely has no impact on users that don't use that part of DataFusion. For users that do, the only breaking change is the extra field in the proto, and most users are not creating proto structs by hand (they are created by our functions). So I don't think there are really any breaking changes. There is also no performance impact: getting Arc pointers is a very cheap operation, and I suspect (but have not run benchmarks to prove it) that the cost of setting O(num_expressions) u64 keys in a HashMap is negligible compared to other serde overhead.

This is my reasoning for implementing this in DataFusion.
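A minimal sketch of the pointer-identity idea described above (the names `dedup_key` and the `Arc<String>` stand-in are hypothetical, not DataFusion's actual API): the `Arc`'s data pointer doubles as a cheap u64 key, so a subtree shared via `Arc::clone` resolves to one cache entry, while an equal-but-separate allocation gets its own id.

```rust
use std::collections::HashMap;
use std::sync::Arc;

// Hypothetical helper: derive a dedup key from the Arc's allocation address.
// Getting this pointer is a constant-time operation, so keying a HashMap by
// it adds negligible overhead to serde.
fn dedup_key<T>(expr: &Arc<T>) -> u64 {
    Arc::as_ptr(expr) as usize as u64
}

fn main() {
    let shared = Arc::new(String::from("a = 1"));
    let clone = Arc::clone(&shared); // same graph node, same allocation
    let distinct = Arc::new(String::from("a = 1")); // equal value, new allocation

    let mut cache: HashMap<u64, Arc<String>> = HashMap::new();
    cache.insert(dedup_key(&shared), Arc::clone(&shared));

    // A clone of the same Arc hits the cache entry; an equal-but-distinct
    // allocation does not, so it would be serialized as its own node.
    assert!(cache.contains_key(&dedup_key(&clone)));
    assert!(!cache.contains_key(&dedup_key(&distinct)));
    println!("ok");
}
```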
I still have the same concerns as last time.
What problems do you see with having it as the default behavior? Maybe I can address them if I understand them better. I know that last time a big concern was the code churn, but I think that has been addressed by Tim's work. If I provide a case of broken behavior without it that doesn't even involve dynamic filters, would that be compelling? If we can't come to an agreement I am open to making it an optional choice in DataFusion, and I am glad you're open to at least including it. I think it would be a shame if we have to tell users "go set this config option for things to work properly", but it's better than nothing.
My main fear is that, since we do not control id generation, it might introduce spurious, very hard to debug errors. Probably my PTSD from debugging a similar gRPC issue. Also, we're talking about a very advanced use case, where users probably know what they're doing, so they can opt in to this feature should they need it. Anyway, I think we both have valid points, and I do agree it would make sense to introduce it for feature consistency.
Also, this API does not prevent converter reuse, so a user can keep a default converter as a static.
I think I can address this. I added recursion-depth tracking in 3ae26af that resets the cache after each plan is deserialized. That should make it impossible to misuse. It also ensures the cache's steady-state memory usage is ~0 bytes. No API changes required. Wdyt?
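The depth-tracking idea above can be sketched as follows (the `Converter` type and `Arc<String>` nodes are illustrative stand-ins, not DataFusion's actual types): the dedup cache is cleared when the outermost deserialization call returns, so a reused converter cannot carry pointer ids from one plan into the next.

```rust
use std::collections::HashMap;
use std::sync::Arc;

// Hypothetical converter: tracks recursion depth so the cache lives exactly
// as long as one top-level plan deserialization.
struct Converter {
    depth: usize,
    dedup_cache: HashMap<u64, Arc<String>>,
}

impl Converter {
    fn new() -> Self {
        Self { depth: 0, dedup_cache: HashMap::new() }
    }

    // Deserialize a whole "plan" (here just a flat list of expression ids).
    fn plan(&mut self, ids: &[u64]) -> Vec<Arc<String>> {
        self.depth += 1;
        let exprs = ids.iter().map(|&id| self.expr(id)).collect();
        self.depth -= 1;
        if self.depth == 0 {
            // Outermost call finished: reset so memory use returns to ~0 and
            // the next plan cannot collide with these pointer-derived ids.
            self.dedup_cache.clear();
        }
        exprs
    }

    // Repeated ids within a single top-level call resolve to the same Arc.
    fn expr(&mut self, id: u64) -> Arc<String> {
        self.dedup_cache
            .entry(id)
            .or_insert_with(|| Arc::new(format!("expr#{id}")))
            .clone()
    }
}

fn main() {
    let mut c = Converter::new();
    let p1 = c.plan(&[1, 2, 1]);
    assert!(Arc::ptr_eq(&p1[0], &p1[2])); // deduped within one plan
    assert!(c.dedup_cache.is_empty());    // cache reset after the plan
    let p2 = c.plan(&[1]);
    assert!(!Arc::ptr_eq(&p1[0], &p2[0])); // no cross-plan reuse
    println!("ok");
}
```

Note this only protects sequential reuse; it does not by itself make concurrent sharing of one converter across threads safe, which is discussed below.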
Force-pushed `1dd0fbe` to `3ae26af`.
I believe the converter should not be shared between threads either; my guess is that it currently can be. As the converter is pluggable, my suggestion would be to implement the caching implementation separately.
We may have found the limitation there.

Thank you for giving the option of implementing it here as an optional feature. I think it's important that it be included here, since it's fixing a bug that is not specific to distributed engines.

I'll look into Send/Sync; it should be easy to prohibit that. I can also add a salt to make the ids non-collideable (or give them a vanishingly small probability of collision in the u64 space).
I don't really think this is a bug. And personally, all these "hacks" to make this somewhat sound are exactly why I don't believe this should be pushed as the default. That's the main reason why we made it pluggable in the first place.
I don't see how this isn't a bug. You round-trip a plan through protobuf and it becomes 1000x slower or uses 1000x more memory. Does that not impact users?

In my mind the reason to make the change was so that we could implement this sort of thing in the first place, not so it could live somewhere else. The pluggable aspect was to support serialization of unknown dynamic hooks like SchemaAdapter.

Maybe we can address it. What is unsound? What are the "hacks"?
As far as I remember, the outcome of our discussion was to make this interface pluggable, and if the cache is needed, users can implement it. That was on the community call a few months back. I believed that with Tim's PR this discussion was closed.

The converter is not consumed; it can be reused. That is the core problem. It can also be reused across multiple threads, and the "reset depth" approach is broken from that perspective as well.

From the perspective of user impact, I see no complaints. I do not remember seeing plan ser/de in any heat map as a problem.
It's not that the serde itself is slow (although this PR does improve it considerably). It's that the plan loses essential information it needs to perform correctly and not allocate extra memory, so when you execute it, execution is orders of magnitude slower. I can give you concrete examples of queries impacted.

I think you may not see complaints about it because:

1. It's very hard for users to detect that something never got 1000x faster for them (i.e. they don't see the lack of improvement when upgrading from 51.0 to 52.0; it's not a regression for them).
2. The speedups that this cancels out are relatively new (51.0 and 52.0).
3. In cases like the duplication of InList expressions (which has been an issue for a long time), while the extra memory consumption is theoretically unlimited, in a real-world system it's probably only a couple of percent on average. I'm sure users would love 5% less memory consumption, but it's also very hard to detect that an extra 5% is being left on the table, especially if you have nothing to compare against.
I don't think that's the case. Tim's PR helped make this work easier, and it did resolve another long-standing bug with an inability to preserve extension hooks on scans, but it did not resolve this bug, hence why I am bringing it up again. My hope was that with the new APIs allowing a much smaller diff and no breaking changes for users, we would have addressed your concerns.
I will make some changes to add the salt and make it even safer to reuse, as well as making it non-Send. I will also make it non-default. I don't expect you to accept the changes just because of that, but I'd appreciate it if you could consider them. If they still don't meet your bar for inclusion in DataFusion, we can leave this as an open bug and hack a fix in externally.
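The salt idea can be sketched roughly as follows (the `SaltedIds` type and method names are hypothetical, not the PR's actual code): mix a random per-converter salt into the pointer-derived value, so ids produced by different serialization passes are not directly comparable and accidental cross-pass collisions become vanishingly unlikely in the u64 space.

```rust
use std::collections::hash_map::{DefaultHasher, RandomState};
use std::hash::{BuildHasher, Hash, Hasher};
use std::sync::Arc;

// Hypothetical id generator: one random salt per converter instance.
struct SaltedIds {
    salt: u64,
}

impl SaltedIds {
    fn new() -> Self {
        // Draw a random salt from std's randomly keyed hasher state
        // (RandomState is seeded with fresh random keys per instance).
        Self { salt: RandomState::new().build_hasher().finish() }
    }

    // Stable within this converter instance, salted against other instances.
    fn id_for<T>(&self, expr: &Arc<T>) -> u64 {
        let mut h = DefaultHasher::new();
        self.salt.hash(&mut h);
        (Arc::as_ptr(expr) as usize).hash(&mut h);
        h.finish()
    }
}

fn main() {
    let e = Arc::new(String::from("a = 1"));
    let a = SaltedIds::new();
    let b = SaltedIds::new();
    assert_eq!(a.id_for(&e), a.id_for(&e)); // stable within one converter
    assert_ne!(a.id_for(&e), b.id_for(&e)); // salted across converters
    println!("ok");
}
```

Because the salt differs per converter, a stale id accidentally carried over from a previous serialization pass would not match anything in a new pass, rather than silently resolving to the wrong expression.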
The core issues of the id cache are still there; nothing has really changed (id generation, the ability to mess up "converter" usage). The change to make the "converter" pluggable was to enable caching without pushing it on everyone, yet here we are having this discussion again.
I have nothing against keeping the default implementation as it was and adding a caching implementation to the public API. In that case it's up to users to decide what they want to do with it (and it does not matter if it is Send or not; maybe users will find a way to generate ids in some other way and it could be multi-threaded). An opt-in option would mean users understand what the sharp edges of the cached implementation are, and that they have to take care not to abuse it. Would that make sense?
Yes, that sounds good; I'll change the caching to be optional. If we find there are no major issues, and there are user complaints about having to set the option, then we can make it the default in the future.
Users could just decide at compile time: at the place in the code where they call proto ser/de, they can just call the implementation they want. It's up to implementors to decide which one they use.
"datafusion distributed" should use the "cached" implementation directly; I don't believe it should be configurable.
@milenkovicm I've pushed the changes mentioned above.
Force-pushed `48a1ca5` to `497cd34`.
milenkovicm left a comment:
thanks @adriangb
Thank you for the patience and review, Marko. I'll leave this open for another day or so before merging to allow more feedback.
Replaces #18192 using the APIs in #19437.

Similar to #18192, the end goal here is specifically to enable deduplication of `DynamicFilterPhysicalExpr` so that distributed query engines can get one step closer to using dynamic filters. Because it's actually simpler, we apply this deduplication to all `PhysicalExpr`s, with the added benefit that we more faithfully preserve the original expression tree (instead of adding new duplicate branches), which will have the immediate impact of e.g. not duplicating large `InListExpr`s.