
Conversation

@adriangb
Contributor

adriangb commented Jan 27, 2026

Replaces #18192 using the APIs in #19437.

Similar to #18192, the end goal here is specifically to enable deduplication of DynamicFilterPhysicalExpr so that distributed query engines can get one step closer to using dynamic filters.

Because it's actually simpler, we apply this deduplication to all PhysicalExprs. This has the added benefit of more faithfully preserving the original expression tree (instead of adding new duplicate branches), with the immediate impact of, e.g., not duplicating large InListExprs.
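
As a rough illustration of the pointer-identity idea, here is a minimal sketch (the `PhysicalExpr` stub, `expr_id`, and the dedup map are all hypothetical stand-ins, not the PR's actual code):

```rust
use std::collections::HashMap;
use std::sync::Arc;

// Stand-in for DataFusion's PhysicalExpr trait object.
trait PhysicalExpr {}
struct Column(String);
impl PhysicalExpr for Column {}

/// Derive an id for an expression from its Arc pointer. Two clones of the
/// same Arc yield the same id, so a shared subtree can be written once and
/// referenced by id everywhere else it appears.
fn expr_id(expr: &Arc<dyn PhysicalExpr>) -> u64 {
    Arc::as_ptr(expr) as *const () as u64
}

fn main() {
    let shared: Arc<dyn PhysicalExpr> = Arc::new(Column("a".into()));
    let mut seen: HashMap<u64, Arc<dyn PhysicalExpr>> = HashMap::new();

    // Pretend two branches of a plan reference the same expression node.
    for expr in [Arc::clone(&shared), Arc::clone(&shared)] {
        let id = expr_id(&expr);
        if seen.insert(id, expr).is_some() {
            // Already serialized: emit a reference to `id` instead of
            // re-serializing the whole subtree.
            println!("duplicate subtree, emit reference to id {id}");
        }
    }
}
```

Two clones of the same Arc produce the same key, so a shared subtree can be serialized once and every later occurrence encoded as a reference to its id.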

The github-actions bot added the documentation (Improvements or additions to documentation) and proto (Related to proto crate) labels on Jan 27, 2026.
@adriangb changed the title from "Dedupe expressions" to "Preserve PhysicalExpr graph in proto round trip using Arc pointers as unique identifiers" on Jan 27, 2026.
@adriangb marked this pull request as ready for review on January 27, 2026 at 20:45.
@adriangb
Contributor Author

@milenkovicm I'm curious what you think of this now with the simpler implementation thanks to Tim's refactor?

I think your concern about re-use and pointer collision still exists. I added some comments to try to address this. If you think it's still a concern, we could add an optional method on the trait called reset_state() or something that top-level functions call automatically? Or we could make them accept an owned value instead of a reference.

@milenkovicm
Contributor

Will have a look, but I still believe that if it is needed, the distributed engine should implement it, not DataFusion.

Could that be an option?

```rust
///
/// This cache should only be used for a single plan deserialization.
/// Create a new converter instance for each plan to avoid cross-plan collisions.
dedup_cache: RwLock<HashMap<u64, Arc<dyn PhysicalExpr>>>,
```
Contributor

I still believe this should not be pushed as the default for all users. If it is needed for distribution, it should be implemented by the specific distribution engine.

@adriangb
Contributor Author

adriangb commented Jan 27, 2026

> Will have a look, but I still believe that if it is needed, the distributed engine should implement it, not DataFusion.
>
> Could that be an option?

Thanks for taking a look.

It could be, but as per the PR title, I think there is actually a real benefit for DataFusion as well.

There are cases of serializing and deserializing plans that do not involve distributed engines. I don't personally use the Python APIs / FFI, but I think those users also use protobuf serde.

Basically, at the moment dynamic filters (a built-in feature of DataFusion that is critical to query performance) and protobuf serde (also a built-in feature of DataFusion) are incompatible. And although it is not completely broken, we do not do faithful serde of expression and execution node graphs/trees at the moment: when deserializing, we create duplicate trees/branches that don't exist in the original version. This can cause duplication of large InList expressions, large scalar values, and possibly other negative consequences.

I also don't see any harm to other users of DataFusion. This change is fully contained within the protobuf serde code, so it definitely has no impact on users that don't use that part of DataFusion. For users that do, the only breaking change is the extra field in the proto, and most users are not creating proto structs by hand (they are created by our functions). So I don't think there are really any breaking changes. There is also no performance impact (getting Arc pointers is a very cheap operation, and I suspect - but have not run benchmarks to prove it - that the cost of setting O(num_expressions) u64 keys in a HashMap is negligible compared to other serde overhead).

This is my reasoning for implementing this in DataFusion.

@milenkovicm
Contributor

I still have the same concerns as last time.
I don't mind having a caching implementation as part of DataFusion, but it should not be the default behaviour; let users choose that behaviour if it is needed. I guess that's the whole point of Tim's change.

@adriangb
Contributor Author

adriangb commented Jan 27, 2026

What problems do you see with having it as the default behavior? Maybe I can address them if I understand them better. I know last time a big concern was the code churn, but I think that has been addressed by Tim's work.

If I provide a case of broken behavior without it that doesn't even involve dynamic filters, would that be compelling?

If we can't come to an agreement, I am open to making it an optional choice in DataFusion, and I am glad you're open to at least including it. I think it would be a shame if we have to tell users "go set this config option for things to work properly", but it's better than nothing.

@milenkovicm
Contributor

My main fear is that, since we do not control id generation, it might introduce spurious, very-hard-to-debug errors. Probably my PTSD from debugging a similar gRPC issue.

Also, we're talking about a very advanced use case, where users probably know what they're doing, so they can opt in to this feature should they need it.

Anyway, I think we both have a valid point, and I do agree it would make sense to introduce it for feature consistency.

@milenkovicm
Contributor

Also, this API does not prevent converter reuse, so a user can keep a default converter static.

@adriangb
Contributor Author

adriangb commented Jan 27, 2026

> Also, this API does not prevent converter reuse, so a user can keep a default converter static.

I think I can address this. In 3ae26af I added recursion depth tracking that resets the cache after each plan is deserialized. That should make it impossible to misuse. This also ensures the cache's steady-state memory usage is ~0 bytes. No API changes required. Wdyt?
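
A rough sketch of the depth-tracking idea (hypothetical shape; it uses single-threaded interior mutability rather than the RwLock in the actual diff):

```rust
use std::cell::{Cell, RefCell};
use std::collections::HashMap;
use std::sync::Arc;

trait PhysicalExpr {}

/// Hypothetical converter: recursion depth is tracked so the per-plan cache
/// is dropped automatically when the outermost call returns. Cell/RefCell
/// make the type !Sync, so cross-thread sharing is ruled out at compile time.
#[derive(Default)]
struct Converter {
    depth: Cell<usize>,
    cache: RefCell<HashMap<u64, Arc<dyn PhysicalExpr>>>,
}

impl Converter {
    fn deserialize_expr(&self, id: u64) -> Option<Arc<dyn PhysicalExpr>> {
        self.depth.set(self.depth.get() + 1);
        // Look up the id; a real implementation would decode the proto node
        // and recurse into children here, inserting new entries as it goes.
        let result = self.cache.borrow().get(&id).cloned();
        self.depth.set(self.depth.get() - 1);
        if self.depth.get() == 0 {
            // Back at the top level: clear the cache so a reused converter
            // cannot leak ids into the next plan it deserializes.
            self.cache.borrow_mut().clear();
        }
        result
    }
}
```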

@adriangb force-pushed the dedupe-expressions branch 2 times, most recently from 1dd0fbe to 3ae26af, on January 28, 2026 at 03:01.
@milenkovicm
Contributor

I believe the converter should not be shared between threads either; my guess is that it's Send and Sync at the moment.

As the converter is pluggable, my suggestion would be to implement the caching implementation in datafusion-distributed, give it a run for a few months, battle-test it, and then we could re-open this conversation. Or we could implement it here as a CachingConverter, but keep DefaultConverter as the default no-op (as it was).
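
A minimal sketch of that split, with DefaultConverter as the no-op and CachingConverter deduping by id (the ExprConverter trait shape is hypothetical, not DataFusion's actual converter API):

```rust
use std::collections::HashMap;
use std::sync::{Arc, RwLock};

trait PhysicalExpr {}

/// Hypothetical converter trait: `resolve` either rebuilds an expression or
/// returns a previously seen one for the same id.
trait ExprConverter {
    fn resolve(
        &self,
        id: u64,
        build: impl FnOnce() -> Arc<dyn PhysicalExpr>,
    ) -> Arc<dyn PhysicalExpr>;
}

/// No-op default: every occurrence of an id rebuilds the expression,
/// exactly as deserialization behaves today.
struct DefaultConverter;

impl ExprConverter for DefaultConverter {
    fn resolve(
        &self,
        _id: u64,
        build: impl FnOnce() -> Arc<dyn PhysicalExpr>,
    ) -> Arc<dyn PhysicalExpr> {
        build()
    }
}

/// Opt-in caching variant: the first occurrence of an id builds the
/// expression, later occurrences reuse the same Arc.
#[derive(Default)]
struct CachingConverter {
    cache: RwLock<HashMap<u64, Arc<dyn PhysicalExpr>>>,
}

impl ExprConverter for CachingConverter {
    fn resolve(
        &self,
        id: u64,
        build: impl FnOnce() -> Arc<dyn PhysicalExpr>,
    ) -> Arc<dyn PhysicalExpr> {
        if let Some(hit) = self.cache.read().unwrap().get(&id) {
            return Arc::clone(hit);
        }
        let expr = build();
        self.cache.write().unwrap().insert(id, Arc::clone(&expr));
        expr
    }
}
```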

@adriangb
Contributor Author

We may have found the limitation in datafusion-distributed, but this is fixing a bug in vanilla DataFusion. As such I think it should eventually be merged into DataFusion. Most bug reports and changes in DataFusion are discovered in this way; this one is no different. We hit this bug in our system and we don't use datafusion-distributed.

Thank you for giving the option of implementing it here as an optional feature. I think it's important that it be included here, since it fixes a bug that has nothing to do with datafusion-distributed.

I'll look into Send/Sync; it should be easy to prohibit that. I can also add a salt to make the ids non-collidable (or at least give a vanishingly small probability of collision in the u64 space).

@milenkovicm
Contributor

I don't really think this is a bug. And personally, all these "hacks" to make this somewhat sound are exactly why I don't believe this should be pushed as the default. That's the main reason why we made it pluggable in the first place.

@adriangb
Contributor Author

I don’t see how this isn’t a bug. You roundtrip a plan through protobuf and it becomes 1000x slower or uses 1000x more memory. Does that not impact users?

In my mind the reason to make the change was so that we could implement this sort of thing in the first place, not so it could live somewhere else. The pluggable aspect was to support serialization of unknown dynamic hooks like SchemaAdapter.

@adriangb
Contributor Author

> And personally, all these "hacks" to make this somewhat sound are exactly why I don't believe this should be pushed as the default.

Maybe we can address it. What is unsound? What are the "hacks"?

@milenkovicm
Contributor

As far as I remember, the outcome of our discussion was to make this interface pluggable, and if the cache is needed, users can implement it. That was on the community call a few months back. I believe that with Tim's PR this discussion was closed.
I was very surprised that you opened this discussion again.

The converter is not consumed; it can be reused, and that is the core problem. It can also be reused across multiple threads, and "reset depth" is broken from that perspective as well.

From the perspective of user impact, I see no complaints. I do not remember seeing plan ser/de show up as a problem in any heat map.

@adriangb
Contributor Author

> From the perspective of user impact, I see no complaints. I do not remember seeing plan ser/de show up as a problem in any heat map.

It's not that the serde itself is slow (although this PR does improve it considerably). It's that the plan loses information essential for it to perform correctly / not allocate extra memory. Thus when you execute it, execution will be orders of magnitude slower. I can give you concrete examples of impacted queries: `select max(col) from t`, `select ... from t order by col limit 10`, `select ... from t1 join t2 on t1.pk = t2.t1_pk`, and many others.

I think you may not see complaints about it because:

1. It's very hard for users to detect that something never got 1000x faster for them (i.e. they don't see the lack of improvement when upgrading from 51.0 to 52.0; it's not a regression for them).
2. The speedups that this cancels out are relatively new (51.0 and 52.0).
3. In cases like the duplication of InList expressions (which has been an issue for a long time), while the extra memory consumption is theoretically unbounded, in a real-world system it's probably only a couple percent on average. I'm sure users would love 5% less memory consumption, but it's also very hard to detect that an extra 5% is being left on the table, especially if you have nothing to compare to.

@adriangb
Contributor Author

> As far as I remember, the outcome of our discussion was to make this interface pluggable, and if the cache is needed, users can implement it. That was on the community call a few months back. I believe that with Tim's PR this discussion was closed.
>
> I was very surprised that you opened this discussion again.

I don't think that's the case. Tim's PR helped make this work easier, and it did resolve another long-standing bug (an inability to preserve extension hooks on scans), but it did not resolve this bug, hence why I am bringing it up again. My hope was that, with the new APIs allowing a much smaller diff and no breaking changes for users, your concerns would be addressed.

@adriangb
Contributor Author

> The converter is not consumed; it can be reused, and that is the core problem. It can also be reused across multiple threads, and "reset depth" is broken from that perspective as well.

I will make some changes to add the salt and make it even safer to reuse, as well as making it non-Send. I will also make it non-default.

I don't expect you to accept the changes just because of it, but I'd appreciate it if you could consider them. If they still don't meet your bar for inclusion in DataFusion, we can leave this as an open bug and hack a fix in externally.

@milenkovicm
Contributor

The core issues of the id cache are still there; nothing has really changed (id generation, the ability to mess up "converter" usage). The point of making the "converter" pluggable was to enable the cache without pushing it on everyone, yet here we are having this discussion again.

@milenkovicm
Contributor

> > The converter is not consumed; it can be reused, and that is the core problem. It can also be reused across multiple threads, and "reset depth" is broken from that perspective as well.
>
> I will make some changes to add the salt and make it even safer to reuse, as well as making it non-Send. I will also make it non-default.
>
> I don't expect you to accept the changes just because of it, but I'd appreciate it if you could consider them. If they still don't meet your bar for inclusion in DataFusion, we can leave this as an open bug and hack a fix in externally.

I have nothing against keeping the default implementation as it was and adding a caching implementation to the public API; in that case it's up to users to decide what they want to do with it (and it does not matter if it is Send or not; maybe users will find a way to generate ids some other way and it could be multi-threaded).

An opt-in option would mean users understand what the sharp edges of the cached implementation are, and that they have to take care not to abuse it.

Would that make sense?

@adriangb
Contributor Author

> > > The converter is not consumed; it can be reused, and that is the core problem. It can also be reused across multiple threads, and "reset depth" is broken from that perspective as well.
> >
> > I will make some changes to add the salt and make it even safer to reuse, as well as making it non-Send. I will also make it non-default. I don't expect you to accept the changes just because of it, but I'd appreciate it if you could consider them. If they still don't meet your bar for inclusion in DataFusion, we can leave this as an open bug and hack a fix in externally.
>
> I have nothing against keeping the default implementation as it was and adding a caching implementation to the public API; in that case it's up to users to decide what they want to do with it (and it does not matter if it is Send or not; maybe users will find a way to generate ids some other way and it could be multi-threaded).
>
> An opt-in option would mean users understand what the sharp edges of the cached implementation are, and that they have to take care not to abuse it.
>
> Would that make sense?

Yes, that sounds good; I'll change the caching to be optional. If we find there are no major issues, and if there are user complaints about having to set the option, then we can make it the default in the future.

@milenkovicm
Contributor

milenkovicm commented Jan 28, 2026

Users could just decide this at compile time: at the place in the code where they call proto ser/de, they can just call the _with_converter variant with the cached implementation. I believe there is a single place in the code where serialisation is done and one where deserialisation is done.

It's up to implementors to decide which one they use.

@milenkovicm
Contributor

milenkovicm commented Jan 28, 2026

"datafusion distributed" should use "cached" implementation directly, i don't believe it should be configurable.

The github-actions bot removed the documentation (Improvements or additions to documentation) label on Jan 28, 2026.
@adriangb
Contributor Author

adriangb commented Jan 28, 2026

@milenkovicm I've pushed the changes mentioned above.

In particular:

1. I removed recursion tracking. It now uses a cleaner approach of creating throwaway serializers/deserializers.
2. I made it robust to collisions even under API misuse, as far as I can tell. Specifically, sharing across threads is safe, and combining multiple proto payloads from different serializations and deserializing them together is safe (or at least the probability of a collision is vanishingly small); see the sketch below.
3. I split the caching out into its own implementation. The default implementation is now unchanged.
4. The change is <300 LOC; the tests are most of the diff.
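
A sketch of how the collision robustness in point 2 could work (hypothetical, not the PR's exact code): salt each converter instance with per-instance randomness, so ids minted by different instances only collide with vanishingly small probability, while dedup within one instance still works because the same pointer always hashes to the same id.

```rust
use std::collections::hash_map::RandomState;
use std::hash::{BuildHasher, Hash, Hasher};
use std::sync::Arc;

trait PhysicalExpr {}

/// Hypothetical salted id generator. RandomState is randomly seeded per
/// instance, so two converter instances mint ids in effectively disjoint
/// regions of the u64 space: mixing payloads from different serializations
/// can no longer alias, except with vanishingly small probability.
struct SaltedIds {
    state: RandomState,
}

impl SaltedIds {
    fn new() -> Self {
        Self { state: RandomState::new() }
    }

    /// Same pointer + same instance => same id, so dedup within one
    /// serialization still works; different instances produce unrelated ids.
    fn id_for(&self, expr: &Arc<dyn PhysicalExpr>) -> u64 {
        let mut hasher = self.state.build_hasher();
        (Arc::as_ptr(expr) as *const () as u64).hash(&mut hasher);
        hasher.finish()
    }
}
```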

@milenkovicm
Contributor

milenkovicm left a comment

thanks @adriangb

@adriangb
Contributor Author

Thank you for the patience and review, Marko. I'll leave this open for another day or so before merging to allow more feedback.
