feat: add event filtering framework with pre-deserialization MCL optimization#17593
feat: add event filtering framework with pre-deserialization MCL optimization#17593sgomezvillamor wants to merge 12 commits into
Conversation
…r and pre-deserialization KafkaSource optimization Introduces a pipeline-level `filters:` config section with AND semantics that runs before transformers. Adds the built-in `event_type` filter that supports per-type body predicates with OR semantics across event types and OR semantics across predicate list items. Key changes: - New `Filter` ABC, `FilterStats`, `match_util`, and `filter_registry` in `datahub_actions/filter/` - New `EventTypeFilter` plugin in `datahub_actions/plugin/filter/` - `PipelineConfig` gains `filters: List[FilterSpec]`; `filter:` section emits a `DeprecationWarning` and continues to run as a transformer for backwards compat - Pipeline always calls `source.set_filters(filters)`; `EventSource.set_filters()` defaults to no-op - `KafkaEventSource` gains `enable_pre_deserialization_filter` config flag; when enabled and an `EventTypeFilter` is present, drops MCL messages before the expensive `MetadataChangeLogClass.from_obj()` call - `FilterTransformer` now delegates matching to `match_util` (no behaviour change) - Docker config files for executor, doc_propagation, slack, teams migrated to `filters:` + `enable_pre_deserialization_filter: true` - Docs updated in `docs/actions/concepts.md` and `docs/actions/README.md` - Unit tests for `EventTypeFilter`, pipeline filter integration, and KafkaSource `set_filters()` https://claude.ai/code/session_018Pohzav3u7c3dLpZG2yeKV
…ix skip_mcl_entirely bug, restrict early criteria keys
- Rename enable_pre_deserialization_filter → enable_mcl_pre_deserialization_filter to clarify
that the optimization is MCL-only (ECE is unaffected because its fields are only accessible
after deserializing the PlatformEvent envelope)
- Fix _skip_mcl_entirely bug: previously, if enable_mcl_pre_deserialization_filter=true but no
EventTypeFilter was configured, ALL MCL events were silently dropped. Now _skip_mcl_entirely
is only set when at least one EventTypeFilter exists AND none include MCL
- Restrict early criteria keys to _MCL_EARLY_FILTER_FIELDS={entityType,aspectName,entityUrn,changeType}
— only top-level Avro scalar fields accessible without avrogen deserialization
- Add logger.debug traces in EventTypeFilter.matches() for all three decision paths
- Add debug/info logging in KafkaEventSource.set_filters() at each decision point
- Add test_handle_pe_not_affected_by_mcl_pre_deserialization_filter proving ECE delivery
is never affected regardless of _skip_mcl_entirely or _early_mcl_criteria state
- Add test_set_filters_noop_when_no_event_type_filter covering the bug fix scenario
- Update all 4 docker configs and docs to use renamed flag
https://claude.ai/code/session_018Pohzav3u7c3dLpZG2yeKV
…irely_rejected metric label - Log a warning when two OR predicate items have conflicting values for the same key so operators know that field won't benefit from pre-deserialization filtering - Rename MCL_EARLY_FILTER_METRIC result label from "rejected" to "entirely_rejected" for the _skip_mcl_entirely path to distinguish it from the criteria-miss "rejected" path, making the metric useful for dashboards without ambiguity https://claude.ai/code/session_018Pohzav3u7c3dLpZG2yeKV
…ments to slack/teams configs - Merge the two executor.yaml MCL predicates into one using a list aspectName value so match_util.matches_list() handles OR semantics within a single predicate; this avoids the conflicting-values warning and lets aspectName be included in early_criteria - Add 2-line comment to slack_action.yaml and teams_action.yaml explaining why enable_mcl_pre_deserialization_filter is effective: the filter has no MCL type so _skip_mcl_entirely fires and every MCL message is dropped before avrogen deserialization https://claude.ai/code/session_018Pohzav3u7c3dLpZG2yeKV
|
Linear: ING-2765 |
Codecov Report✅ All modified and coverable lines are covered by tests. 📢 Thoughts on this report? Let us know! |
…nd test helper - Narrow EventTypeFilter.create() return annotation from Filter to EventTypeFilter so callers in tests don't get "Incompatible return value type" errors - Add Any type annotation to **fields in _env() test helper to satisfy no-untyped-def https://claude.ai/code/session_018Pohzav3u7c3dLpZG2yeKV
…e stats, and integration test
- EventTypeFilter.matches(): use 'in' to check key presence before retrieving the spec,
so {"EventType": null} correctly passes (type match, no body predicate) instead of
being treated identically to a missing key
- Pipeline.__init__: initialize self._stats = PipelineStats() as an instance attribute
so each pipeline gets isolated stats; previously _stats was a class-level singleton
shared across all instances, causing failure counts from one pipeline to bleed into
subsequent pipelines in tests
- test_filter_passes_all_when_no_spec: add test_transformer so TestAction.act() does
not throw KeyError on meta["test"]; add failed_event_count == 0 assertion
https://claude.ai/code/session_018Pohzav3u7c3dLpZG2yeKV
…t sharing PipelineStats had transformer_stats, filter_stats, and action_stats as class-level attributes. With a single Pipeline._stats class-level instance this was harmless. After making Pipeline create a fresh PipelineStats per instance, these dicts were shared across all PipelineStats instances, causing transformer processed_count to accumulate across pipelines and break test_run assertions. https://claude.ai/code/session_018Pohzav3u7c3dLpZG2yeKV
… nested objects Before adding __init__, action_stats/filter_stats/transformer_stats were class-level attributes absent from self.__dict__, so json.dumps(self.__dict__) only serialized scalars. Now that they are instance attributes, as_string() must enumerate only the scalar fields explicitly to preserve the original output and avoid TypeError. https://claude.ai/code/session_018Pohzav3u7c3dLpZG2yeKV
…stats classes - PipelineStats.as_string() now returns a full recursive snapshot including filter_stats, transformer_stats, and action_stats rather than only scalars - ActionStats, TransformerStats, FilterStats: replace self.__dict__ with explicit field enumeration so as_string() returns correct zero values even on freshly created instances that have never been mutated (class-level int defaults are not in self.__dict__ until first assignment) https://claude.ai/code/session_018Pohzav3u7c3dLpZG2yeKV
…terStats Replaces class-level int defaults with explicit instance attributes initialized in __init__, consistent with the PipelineStats fix. Eliminates reliance on the mutation-creates-instance-shadow behavior that made class-level sharing non-obvious. https://claude.ai/code/session_018Pohzav3u7c3dLpZG2yeKV
✅ Local Testing & Metrics ValidationI've tested this PR locally with Prometheus metrics enabled and can confirm the new MCL pre-deserialization filter feature is working as expected. Test Setup
Metrics ResultsAccumulated totals across running pipelines:
* Total Messages includes both MCL and PlatformEvent messages Filter Efficiency
Key Observations
Configuration VerifiedBoth test pipelines have The feature is production-ready from a metrics and functionality perspective! 🚀 |
| async_commit_interval: int = 10000 | ||
| commit_retry_count: int = 5 | ||
| commit_retry_backoff: float = 10.0 | ||
| enable_mcl_pre_deserialization_filter: bool = Field( |
There was a problem hiding this comment.
It seems that performance improvement is functionally equal to the old code, why would we want to disable it? Moreover - disable it by default. I think this would contribute to the over-complication of flags. I propose we enable the feature universally and not introduce the flag at all.
There was a problem hiding this comment.
Agreed - enabled by default in commit 39b0c7c
Having the flag for optional optimizations is still good practice though - it allows operators to:
- Disable if they encounter edge cases we didn't anticipate
- A/B test performance impact in production
- Gradually roll out optimizations with confidence
The flag provides a safety valve without requiring code changes or deployments.
| DeprecationWarning, | ||
| stacklevel=2, | ||
| ) | ||
| transforms.append(create_filter_transformer(config.filter, ctx)) |
There was a problem hiding this comment.
Two filter execution paths run in parallel: the deprecated filter: becomes a FilterTransformer here, while filters: runs through the new framework. Since match_util
already extracted the shared matching logic, the deprecated path could be auto-translated into a FilterSpec(type="event_type", ...) and pushed onto filters — removing FilterTransformer
and collapsing to a single code path. Backward-compatible, ~100 fewer lines, and the legacy config would even benefit from MCL pre-deserialization for free.
There was a problem hiding this comment.
For now, we warn it as deprecated and let users migrate at their own pace. An automatic forward migration can be considered when we actually remove FilterTransform.
Why not do it now? IMO FilterTransform is fundamentally flawed — it only covers one event type, while actions can consume multiple. I'd rather users migrate in a controlled way, which also nudges them to rethink their filters — many of which probably belong in the action itself, not formalized as a filter transform at all.
Is that a blocking concern?
| class EventTypeFilterSpec(ConfigModel): | ||
| """Body predicates for a single event type. | ||
|
|
||
| `event` is a list of body predicate dicts combined with OR semantics: | ||
| the event passes if it satisfies *any* predicate dict. Within a single | ||
| predicate dict all key/value pairs must match (AND semantics). | ||
|
|
||
| Omitting `event` means "pass on type match alone". | ||
| """ | ||
|
|
||
| event: Optional[List[Dict[str, Any]]] = Field(default=None) |
There was a problem hiding this comment.
Why can't we list the predicates directly, without key event?
There was a problem hiding this comment.
Initially this was shaped by the legacy filter transformer, which had:
event_type: str
event: Dict[str, Any]
and we just promoted event_type as a top-level key.
At some point I considered removing the event key as you suggest now, but kept it as a good practice, event it's the key for pattern matching constraints, leaving room to extend the spec with other constraint types in the future.
| filters: | ||
| - type: event_type | ||
| config: | ||
| filter: | ||
| # Map from event_type string to optional body predicates. | ||
| # The event passes if its type is listed here and satisfies the associated predicate. | ||
| <event-type-string>: | ||
| event: | ||
| # List of predicate dicts — OR across list items, AND within each dict. | ||
| - <field>: <value> |
There was a problem hiding this comment.
I am confused about new filters structure. What would be the use-case for filters have more than one item?
There was a problem hiding this comment.
Filters with a clear, single responsibility are more reusable and composable.
It's the same pattern than transforms.
Currently, an event must match all filters: AND behavior.
In the future, we could add an OR mode at the pipeline level, but that's not needed yet, so not included.
There was a problem hiding this comment.
I understand, but considering an event is always of one type (is it?), then why would we want to have several filters? We still can list explicitly the field combinations via event list, right?
It would make sense if filters were OR, unless I do not understand something.
There was a problem hiding this comment.
A list of filters doesn't seem necessary now because we only have one filter implementation (EventTypeFilter), so concatenating multiple instances would be unnecessary. Indeed.
However, users can already register and use custom filters via the plugin registry, and we may add new filter implementations in the future. In that case, having a collection of filters becomes essential.
Also, as said before, the AND semantics across filters (event must pass ALL filters) enables powerful composition patterns - like combining EventTypeFilter with eg future RateLimitFilter or WhateverFancyFilter implementations.
|
I haven't confirmed it practically, but Claude insists that the optimization path does not handle correct below case: The minimal failing case is one EventTypeFilter, two OR predicates, where the predicates don't have the same set of keys: filters:
- type: event_type
config:
filter:
MetadataChangeLogEvent_v1:
event:
- entityType: dataset # predicate 1 — no aspectName constraint
- entityType: chart
aspectName: documentation # predicate 2 — has aspectNamePipeline filter (per Now trace set_filters merge on mcl_predicates =
So the early filter ends up requiring aspectName=documentation. The MCL above ( |
…lization filter The previous implementation incorrectly merged OR predicates using AND logic, causing silent data loss when predicates had different key sets or conflicting values. Changes: - Changed _early_mcl_criteria from single dict to _early_mcl_criteria_list (list of dicts) - Implemented correct OR semantics: pass if ANY predicate matches - Conservative by design: extracts extractable fields, ignores complex ones - Optimization disabled if ANY predicate has no extractable fields (OR semantics requirement) - Added comprehensive docstring explaining conservative design with examples Tests: - Updated existing tests for new data structure - Added test_kafka_source_conservative_early_filtering.py with 3 comprehensive tests - Added nested/deep matching tests to test_event_type_filter.py - All 26 tests passing Fixes #17593 comment: #17593 (comment) Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
|
@skrydal Excellent catch! You're absolutely right. The conservative approach of the early filtering was not properly implemented. The bug was in the merge logic attempting to combine OR predicates using AND semantics. The Issue: The Fix:
Fix and comprehensive tests added in commit 191dc0c |
Changed enable_mcl_pre_deserialization_filter default from False to True. With the conservative OR semantics fix, the optimization is now safe to enable by default and will provide performance benefits for most pipelines. Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
Overview
This PR introduces a comprehensive event filtering framework for DataHub Actions pipelines, with a special optimization for MetadataChangeLog (MCL) events that enables pre-deserialization filtering to reduce processing overhead.
Key Changes
1. New Filter Framework
Filterbase class (datahub_actions/filter/filter.py): Abstract base for all pipeline filtersEventTypeFilter(datahub_actions/plugin/filter/event_type_filter.py): Pluggable filter implementation that matches events by type and body predicates with OR/AND semanticsfilter_registry.py,match_util.py,filter_stats.pyfor filter discovery and matching logic2. MCL Pre-Deserialization Optimization
enable_mcl_pre_deserialization_filterconfig flag toKafkaEventSourceConfigset_filters()method inKafkaEventSourceto extract filter criteria for MCL eventsentityType,aspectName,entityUrn,changeType)MCL_EARLY_FILTER_METRIC) for pre-deserialization filter results3. Configuration Updates
filters:section in pipeline YAML (replaces deprecatedfilter:section)docker/datahub-actions/config/to use new filter syntaxfilter:section4. Documentation
docs/actions/README.mdwith new filter configuration examplesdocs/actions/concepts.mdto document the Filter component and its role in pipelines5. Testing
EventTypeFiltermatching logic (tests/unit/plugin/filter/test_event_type_filter.py)tests/unit/filter/test_filter_pipeline_integration.py)tests/unit/plugin/source/kafka/test_kafka_source.py)Design Rationale
Why pre-deserialization filtering for MCL only?
entityType,aspectName, etc.) accessible at the Avro message level without deserializationFilter semantics:
Breaking Changes
The deprecated
filter:section in pipeline YAML is replaced byfilters:(list-based). Old configs will emit a deprecation warning but continue to work via theFilterTransformer.Test Coverage
EventTypeFiltercovering type matching, body predicates, and predicate combinationshttps://claude.ai/code/session_018Pohzav3u7c3dLpZG2yeKV