feat: add event filtering framework with pre-deserialization MCL optimization#17593

Open: sgomezvillamor wants to merge 12 commits into master from claude/datahub-actions-config-plan-35owc

@sgomezvillamor
Contributor

Overview

This PR introduces a comprehensive event filtering framework for DataHub Actions pipelines, with a special optimization for MetadataChangeLog (MCL) events that enables pre-deserialization filtering to reduce processing overhead.

Key Changes

1. New Filter Framework

  • Filter base class (datahub_actions/filter/filter.py): Abstract base for all pipeline filters
  • EventTypeFilter (datahub_actions/plugin/filter/event_type_filter.py): Pluggable filter implementation that matches events by type and body predicates with OR/AND semantics
  • Filter registry and utilities: filter_registry.py, match_util.py, filter_stats.py for filter discovery and matching logic
  • Pipeline integration: Filters are now evaluated before transformers with AND semantics across multiple filters

2. MCL Pre-Deserialization Optimization

  • Added enable_mcl_pre_deserialization_filter config flag to KafkaEventSourceConfig
  • Implemented set_filters() method in KafkaEventSource to extract filter criteria for MCL events
  • MCL events can be dropped before expensive avrogen deserialization by checking top-level Avro fields (entityType, aspectName, entityUrn, changeType)
  • Added metrics tracking (MCL_EARLY_FILTER_METRIC) for pre-deserialization filter results

3. Configuration Updates

  • New filters: section in pipeline YAML (replaces deprecated filter: section)
  • Supports multiple filters with pluggable types
  • Updated example configs in docker/datahub-actions/config/ to use new filter syntax
  • Added deprecation warning for old filter: section

4. Documentation

  • Updated docs/actions/README.md with new filter configuration examples
  • Updated docs/actions/concepts.md to document the Filter component and its role in pipelines
  • Clarified filter execution order (before transformers)

5. Testing

  • Comprehensive unit tests for EventTypeFilter matching logic (tests/unit/plugin/filter/test_event_type_filter.py)
  • Integration tests for filter pipeline behavior (tests/unit/filter/test_filter_pipeline_integration.py)
  • Extended Kafka source tests with pre-deserialization filter scenarios (tests/unit/plugin/source/kafka/test_kafka_source.py)

Design Rationale

Why pre-deserialization filtering for MCL only?

  • MCL events have scalar fields (entityType, aspectName, etc.) accessible at the Avro message level without deserialization
  • EntityChangeEvent (ECE) events arrive JSON-encoded inside a PlatformEvent envelope, requiring full deserialization to inspect
  • This optimization provides significant performance gains for MCL-heavy pipelines with selective filtering

Filter semantics:

  • Multiple filters: AND (event must pass all filters)
  • Multiple event types in a filter: OR (event type must match any listed type)
  • Multiple body predicates: OR (event body must satisfy any predicate)
  • Within a predicate: AND (all key/value pairs must match)
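
The four levels above can be sketched as three small functions. This is only an illustration of the stated semantics, not the PR's code: the function names and the plain-dict event shape are hypothetical, and values are compared with simple equality, whereas the real `match_util` supports richer matching (for example list values).

```python
from typing import Any, Dict, List, Optional

# Hypothetical stand-ins for the PR's EventTypeFilter configuration.
Predicate = Dict[str, Any]
TypeSpec = Dict[str, Optional[List[Predicate]]]


def predicate_matches(predicate: Predicate, body: Dict[str, Any]) -> bool:
    """AND within a predicate: every key/value pair must match the body."""
    return all(body.get(key) == value for key, value in predicate.items())


def filter_matches(spec: TypeSpec, event_type: str, body: Dict[str, Any]) -> bool:
    """OR across listed event types and OR across the matched type's predicates."""
    if event_type not in spec:
        return False
    predicates = spec[event_type]
    if predicates is None:
        # No body predicates configured: pass on type match alone.
        return True
    return any(predicate_matches(p, body) for p in predicates)


def pipeline_passes(filters: List[TypeSpec], event_type: str, body: Dict[str, Any]) -> bool:
    """AND across filters: the event must pass every configured filter."""
    return all(filter_matches(f, event_type, body) for f in filters)
```

With a spec listing `{entityType: dataset}` and `{entityType: chart, aspectName: documentation}` for one event type, a dataset event passes regardless of its aspect, while a chart event passes only for the documentation aspect.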

Breaking Changes

The deprecated filter: section in pipeline YAML is replaced by filters: (list-based). Old configs will emit a deprecation warning but continue to work via the FilterTransformer.

Test Coverage

  • Unit tests for EventTypeFilter covering type matching, body predicates, and predicate combinations
  • Integration tests verifying filter execution order and interaction with transformers
  • Kafka source tests validating pre-deserialization filter behavior and metrics
  • All existing tests pass with new filter framework integrated

https://claude.ai/code/session_018Pohzav3u7c3dLpZG2yeKV

claude added 4 commits May 26, 2026 16:00
…r and pre-deserialization KafkaSource optimization

Introduces a pipeline-level `filters:` config section with AND semantics that runs before transformers. Adds the built-in `event_type` filter that supports per-type body predicates with OR semantics across event types and OR semantics across predicate list items.

Key changes:
- New `Filter` ABC, `FilterStats`, `match_util`, and `filter_registry` in `datahub_actions/filter/`
- New `EventTypeFilter` plugin in `datahub_actions/plugin/filter/`
- `PipelineConfig` gains `filters: List[FilterSpec]`; `filter:` section emits a `DeprecationWarning` and continues to run as a transformer for backwards compat
- Pipeline always calls `source.set_filters(filters)`; `EventSource.set_filters()` defaults to no-op
- `KafkaEventSource` gains `enable_pre_deserialization_filter` config flag; when enabled and an `EventTypeFilter` is present, drops MCL messages before the expensive `MetadataChangeLogClass.from_obj()` call
- `FilterTransformer` now delegates matching to `match_util` (no behaviour change)
- Docker config files for executor, doc_propagation, slack, teams migrated to `filters:` + `enable_pre_deserialization_filter: true`
- Docs updated in `docs/actions/concepts.md` and `docs/actions/README.md`
- Unit tests for `EventTypeFilter`, pipeline filter integration, and KafkaSource `set_filters()`

https://claude.ai/code/session_018Pohzav3u7c3dLpZG2yeKV
…ix skip_mcl_entirely bug, restrict early criteria keys

- Rename enable_pre_deserialization_filter → enable_mcl_pre_deserialization_filter to clarify
  that the optimization is MCL-only (ECE is unaffected because its fields are only accessible
  after deserializing the PlatformEvent envelope)
- Fix _skip_mcl_entirely bug: previously, if enable_mcl_pre_deserialization_filter=true but no
  EventTypeFilter was configured, ALL MCL events were silently dropped. Now _skip_mcl_entirely
  is only set when at least one EventTypeFilter exists AND none include MCL
- Restrict early criteria keys to _MCL_EARLY_FILTER_FIELDS={entityType,aspectName,entityUrn,changeType}
  — only top-level Avro scalar fields accessible without avrogen deserialization
- Add logger.debug traces in EventTypeFilter.matches() for all three decision paths
- Add debug/info logging in KafkaEventSource.set_filters() at each decision point
- Add test_handle_pe_not_affected_by_mcl_pre_deserialization_filter proving ECE delivery
  is never affected regardless of _skip_mcl_entirely or _early_mcl_criteria state
- Add test_set_filters_noop_when_no_event_type_filter covering the bug fix scenario
- Update all 4 docker configs and docs to use renamed flag
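
The corrected `_skip_mcl_entirely` rule described above can be sketched as a single predicate. The function name and the representation of each `EventTypeFilter` as a dict keyed by event type are assumptions made for illustration only.

```python
from typing import Any, Dict, List

MCL_EVENT_TYPE = "MetadataChangeLogEvent_v1"


def should_skip_mcl_entirely(event_type_filters: List[Dict[str, Any]]) -> bool:
    """Skip all MCL messages only if filters exist and none of them lists MCL."""
    if not event_type_filters:
        # The bug fix: with no EventTypeFilter configured, nothing may be dropped.
        return False
    return all(MCL_EVENT_TYPE not in f for f in event_type_filters)
```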

https://claude.ai/code/session_018Pohzav3u7c3dLpZG2yeKV
…irely_rejected metric label

- Log a warning when two OR predicate items have conflicting values for the same
  key so operators know that field won't benefit from pre-deserialization filtering
- Rename MCL_EARLY_FILTER_METRIC result label from "rejected" to "entirely_rejected"
  for the _skip_mcl_entirely path to distinguish it from the criteria-miss "rejected"
  path, making the metric useful for dashboards without ambiguity

https://claude.ai/code/session_018Pohzav3u7c3dLpZG2yeKV
…ments to slack/teams configs

- Merge the two executor.yaml MCL predicates into one using a list aspectName value
  so match_util.matches_list() handles OR semantics within a single predicate; this
  avoids the conflicting-values warning and lets aspectName be included in early_criteria
- Add 2-line comment to slack_action.yaml and teams_action.yaml explaining why
  enable_mcl_pre_deserialization_filter is effective: the filter has no MCL type so
  _skip_mcl_entirely fires and every MCL message is dropped before avrogen deserialization

https://claude.ai/code/session_018Pohzav3u7c3dLpZG2yeKV
@github-actions github-actions Bot added the labels docs (Issues and Improvements to docs) and devops (PR or Issue related to DataHub backend & deployment) May 27, 2026
@github-actions
Contributor

Linear: ING-2765

@codecov

codecov Bot commented May 27, 2026

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ All tests successful. No failed tests found.


…nd test helper

- Narrow EventTypeFilter.create() return annotation from Filter to EventTypeFilter
  so callers in tests don't get "Incompatible return value type" errors
- Add Any type annotation to **fields in _env() test helper to satisfy no-untyped-def

https://claude.ai/code/session_018Pohzav3u7c3dLpZG2yeKV
…e stats, and integration test

- EventTypeFilter.matches(): use 'in' to check key presence before retrieving the spec,
  so {"EventType": null} correctly passes (type match, no body predicate) instead of
  being treated identically to a missing key
- Pipeline.__init__: initialize self._stats = PipelineStats() as an instance attribute
  so each pipeline gets isolated stats; previously _stats was a class-level singleton
  shared across all instances, causing failure counts from one pipeline to bleed into
  subsequent pipelines in tests
- test_filter_passes_all_when_no_spec: add test_transformer so TestAction.act() does
  not throw KeyError on meta["test"]; add failed_event_count == 0 assertion
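
The key-presence fix in the first item can be illustrated in isolation. This is a hedged sketch, not the PR's `matches()` code: the mapping and both helper names are hypothetical, and the `.get()` variant only stands for the kind of lookup that conflates a `null` value with a missing key.

```python
from typing import Any, Dict, Optional

# Hypothetical filter mapping: event type -> optional body-predicate spec.
filter_spec: Dict[str, Optional[Dict[str, Any]]] = {
    "EntityChangeEvent_v1": None,  # listed, but with no body predicate
}


def listed_via_get(event_type: str) -> bool:
    # Ambiguous: a key mapped to None looks the same as a missing key.
    return filter_spec.get(event_type) is not None


def listed_via_in(event_type: str) -> bool:
    # Unambiguous: checks presence regardless of the mapped value.
    return event_type in filter_spec
```

Here `listed_via_get("EntityChangeEvent_v1")` is `False` even though the type is configured, while `listed_via_in` reports it as present.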

https://claude.ai/code/session_018Pohzav3u7c3dLpZG2yeKV
…t sharing

PipelineStats had transformer_stats, filter_stats, and action_stats as class-level
attributes. With a single Pipeline._stats class-level instance this was harmless.
After making Pipeline create a fresh PipelineStats per instance, these dicts were
shared across all PipelineStats instances, causing transformer processed_count to
accumulate across pipelines and break test_run assertions.
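
The sharing described here is standard Python behaviour for mutable class-level attributes. A minimal sketch with hypothetical class names (not the real `PipelineStats`):

```python
from typing import Dict


class SharedStats:
    # Class-level dict: a single object shared by every instance.
    counts: Dict[str, int] = {}


class IsolatedStats:
    def __init__(self) -> None:
        # Instance-level dict: a fresh object per instance.
        self.counts: Dict[str, int] = {}


a, b = SharedStats(), SharedStats()
a.counts["processed"] = 1
shared_view = b.counts  # b sees a's mutation because both use the class dict

c, d = IsolatedStats(), IsolatedStats()
c.counts["processed"] = 1
isolated_view = d.counts  # d is unaffected
```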

https://claude.ai/code/session_018Pohzav3u7c3dLpZG2yeKV
… nested objects

Before adding __init__, action_stats/filter_stats/transformer_stats were class-level
attributes absent from self.__dict__, so json.dumps(self.__dict__) only serialized
scalars. Now that they are instance attributes, as_string() must enumerate only the
scalar fields explicitly to preserve the original output and avoid TypeError.

https://claude.ai/code/session_018Pohzav3u7c3dLpZG2yeKV
claude added 2 commits May 27, 2026 12:45
…stats classes

- PipelineStats.as_string() now returns a full recursive snapshot including
  filter_stats, transformer_stats, and action_stats rather than only scalars
- ActionStats, TransformerStats, FilterStats: replace self.__dict__ with explicit
  field enumeration so as_string() returns correct zero values even on freshly
  created instances that have never been mutated (class-level int defaults are
  not in self.__dict__ until first assignment)
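
The `self.__dict__` behaviour mentioned above can be reproduced with a small sketch. The class and field names are hypothetical and stand in for the stats classes named in the commit.

```python
import json


class ClassDefaultStats:
    success_count: int = 0  # class-level default, not stored on the instance

    def as_string(self) -> str:
        # Serializes only attributes that live on the instance itself.
        return json.dumps(self.__dict__)


class ExplicitStats:
    success_count: int = 0

    def as_string(self) -> str:
        # Explicit enumeration also reports untouched defaults.
        return json.dumps({"success_count": self.success_count})


fresh = ClassDefaultStats()
before = fresh.as_string()      # the default is missing from the output
fresh.success_count += 1        # first assignment creates an instance attribute
after = fresh.as_string()
explicit = ExplicitStats().as_string()
```

`before` is an empty JSON object, whereas `explicit` already contains the zero value; after the first increment the class-default version starts reporting the field.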

https://claude.ai/code/session_018Pohzav3u7c3dLpZG2yeKV
…terStats

Replaces class-level int defaults with explicit instance attributes initialized
in __init__, consistent with the PipelineStats fix. Eliminates reliance on the
mutation-creates-instance-shadow behavior that made class-level sharing non-obvious.

https://claude.ai/code/session_018Pohzav3u7c3dLpZG2yeKV
@sgomezvillamor
Contributor Author

✅ Local Testing & Metrics Validation

I've tested this PR locally with Prometheus metrics enabled and can confirm the new MCL pre-deserialization filter feature is working as expected.

Test Setup

  • Generated MCL traffic with datahub docker ingest-sample-data
  • Triggered executions with available system sources
  • Enabled Prometheus metrics on datahub-actions (port 8000)

Metrics Results

Accumulated totals across running pipelines:

| Pipeline | Passed | Rejected | Entirely Rejected | Total MCL Filtered | Total Messages* |
|---|---|---|---|---|---|
| ingestion_executor | 8 | 471 | 0 | 479 | 728 |
| datahub_doc_propagation_action | 0 | 473 | 0 | 473 | 722 |
| TOTAL | 8 | 944 | 0 | 952 | 1,450 |

* Total Messages includes both MCL and PlatformEvent messages

Filter Efficiency

| Pipeline | Efficiency (Rejected/Total) | Performance Benefit |
|---|---|---|
| ingestion_executor | 471/479 = 98.3% | 98.3% of MCL events avoided avrogen deserialization |
| datahub_doc_propagation_action | 473/473 = 100% | All MCL events avoided avrogen deserialization |
| Overall | 944/952 = 99.2% | 🎯 99.2% optimization rate |

Key Observations

  1. New metric is working correctly: kafka_mcl_early_filter_total is being incremented with proper labels (pipeline_name, result)
  2. Filter results are accurate:
    • passed: MCL events matching filter criteria (8 execution requests)
    • rejected: MCL events filtered before avrogen (JSON) deserialization (944 events)
    • entirely_rejected: Not applicable (both pipelines accept some MCL event types)
  3. Significant performance benefit: 99.2% of MCL events were rejected before expensive avrogen (JSON) deserialization, demonstrating the optimization's effectiveness

Configuration Verified

Both test pipelines have enable_mcl_pre_deserialization_filter: true configured and are using the new filters: section with event_type filters as documented.

The feature is production-ready from a metrics and functionality perspective! 🚀

async_commit_interval: int = 10000
commit_retry_count: int = 5
commit_retry_backoff: float = 10.0
enable_mcl_pre_deserialization_filter: bool = Field(
Collaborator

It seems the optimized path is functionally equivalent to the old code, so why would we want to disable it, let alone disable it by default? I think this would contribute to an over-complication of flags. I propose we enable the feature universally and not introduce the flag at all.

Contributor Author

@sgomezvillamor sgomezvillamor May 29, 2026

Agreed - enabled by default in commit 39b0c7c

Having the flag for optional optimizations is still good practice though - it allows operators to:

  • Disable if they encounter edge cases we didn't anticipate
  • A/B test performance impact in production
  • Gradually roll out optimizations with confidence

The flag provides a safety valve without requiring code changes or deployments.

DeprecationWarning,
stacklevel=2,
)
transforms.append(create_filter_transformer(config.filter, ctx))
Collaborator

Two filter execution paths run in parallel: the deprecated filter: becomes a FilterTransformer here, while filters: runs through the new framework. Since match_util
already extracted the shared matching logic, the deprecated path could be auto-translated into a FilterSpec(type="event_type", ...) and pushed onto filters — removing FilterTransformer
and collapsing to a single code path. Backward-compatible, ~100 fewer lines, and the legacy config would even benefit from MCL pre-deserialization for free.
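
The proposed auto-translation could look roughly like the sketch below. It is an assumption-laden illustration rather than code from the PR: the function name is hypothetical, plain dicts mirror the documented `filters:` YAML layout instead of the real `FilterSpec` model, and the legacy shape (`event_type` plus an optional `event` dict) is taken from the discussion of the legacy filter transformer.

```python
from typing import Any, Dict, Optional


def legacy_filter_to_spec(event_type: str, event: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
    """Translate a legacy single-type filter into one `event_type` filter entry."""
    # A legacy body predicate becomes a one-item predicate list; with no
    # predicate, `event` is omitted so the filter passes on type match alone.
    type_spec: Dict[str, Any] = {"event": [event]} if event else {}
    return {"type": "event_type", "config": {"filter": {event_type: type_spec}}}
```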

Contributor Author

For now, we flag it as deprecated and let users migrate at their own pace. An automatic forward migration can be considered when we actually remove FilterTransformer.

Why not do it now? IMO FilterTransformer is fundamentally flawed: it only covers one event type, while actions can consume multiple. I'd rather users migrate in a controlled way, which also nudges them to rethink their filters, many of which probably belong in the action itself rather than being formalized as a filter transformer at all.

Is that a blocking concern?

Comment on lines +30 to +40
class EventTypeFilterSpec(ConfigModel):
    """Body predicates for a single event type.

    `event` is a list of body predicate dicts combined with OR semantics:
    the event passes if it satisfies *any* predicate dict. Within a single
    predicate dict all key/value pairs must match (AND semantics).

    Omitting `event` means "pass on type match alone".
    """

    event: Optional[List[Dict[str, Any]]] = Field(default=None)
Collaborator

Why can't we list the predicates directly, without key event?

Contributor Author

Initially this was shaped by the legacy filter transformer, which had:

event_type: str
event: Dict[str, Any]

and we just promoted event_type as a top-level key.

At some point I considered removing the event key, as you suggest now, but kept it as good practice: it is the key for pattern-matching constraints, which leaves room to extend the spec with other constraint types in the future.

Comment thread docs/actions/README.md
Comment on lines +65 to +74
filters:
  - type: event_type
    config:
      filter:
        # Map from event_type string to optional body predicates.
        # The event passes if its type is listed here and satisfies the associated predicate.
        <event-type-string>:
          event:
            # List of predicate dicts: OR across list items, AND within each dict.
            - <field>: <value>
Collaborator

I am confused about the new filters structure. What would be the use case for filters having more than one item?

Contributor Author

Filters with a clear, single responsibility are more reusable and composable.
It's the same pattern as transformers.

Currently, an event must match all filters: AND behavior.
In the future, we could add an OR mode at the pipeline level, but that's not needed yet, so not included.

Collaborator

I understand, but considering an event is always of one type (is it?), then why would we want to have several filters? We still can list explicitly the field combinations via event list, right?
It would make sense if filters were OR, unless I do not understand something.

Contributor Author

A list of filters doesn't seem necessary now because we only have one filter implementation (EventTypeFilter), so concatenating multiple instances would be unnecessary. Indeed.

However, users can already register and use custom filters via the plugin registry, and we may add new filter implementations in the future. In that case, having a collection of filters becomes essential.

Also, as said before, the AND semantics across filters (an event must pass ALL filters) enable composition patterns, such as combining EventTypeFilter with a future RateLimitFilter or WhateverFancyFilter implementation.

@skrydal
Collaborator

skrydal commented May 28, 2026

I haven't confirmed it in practice, but Claude insists that the optimization path does not correctly handle the case below:

The minimal failing case is one EventTypeFilter, two OR predicates, where the predicates don't have the same set of keys:

  filters:
    - type: event_type
      config:
        filter:
          MetadataChangeLogEvent_v1:
            event:
              - entityType: dataset                          # predicate 1 — no aspectName constraint
              - entityType: chart
                aspectName: documentation                    # predicate 2 — has aspectName

Pipeline filter (per EventTypeFilter.matches): pass if (entityType=dataset) OR (entityType=chart AND aspectName=documentation). So an MCL with entityType=dataset, aspectName=schemaMetadata passes.

Now trace set_filters merge on mcl_predicates = [{entityType: dataset}, {entityType: chart, aspectName: documentation}]:

| Step | early_criteria after |
|---|---|
| predicate 1: entityType=dataset | {entityType: dataset} |
| predicate 2: entityType=chart → conflicts, pop | {} |
| predicate 2: aspectName=documentation → new key, add | {aspectName: documentation} |

So the early filter ends up requiring aspectName=documentation. The MCL above (entityType=dataset, aspectName=schemaMetadata) gets early-rejected before deserialization — even though the
pipeline filter would have passed it. The Action never sees the event.
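
The trace can be replayed with a short script. `flawed_merge` is a reconstruction of the merge steps as described in this comment, not the PR's actual `set_filters` code, and both function names are hypothetical.

```python
from typing import Any, Dict, List

Predicate = Dict[str, Any]


def flawed_merge(predicates: List[Predicate]) -> Predicate:
    """Collapse OR predicates into one dict, dropping a key on conflict (the bug)."""
    early: Predicate = {}
    for predicate in predicates:
        for key, value in predicate.items():
            if key not in early:
                early[key] = value   # new key: add
            elif early[key] != value:
                early.pop(key)       # conflicting value: pop
    return early


def pipeline_filter_passes(predicates: List[Predicate], body: Predicate) -> bool:
    """Reference semantics: OR across predicates, AND within each predicate."""
    return any(all(body.get(k) == v for k, v in p.items()) for p in predicates)


predicates = [
    {"entityType": "dataset"},
    {"entityType": "chart", "aspectName": "documentation"},
]
mcl = {"entityType": "dataset", "aspectName": "schemaMetadata"}

early_criteria = flawed_merge(predicates)
early_pass = all(mcl.get(k) == v for k, v in early_criteria.items())
full_pass = pipeline_filter_passes(predicates, mcl)
```

`early_criteria` ends up as `{"aspectName": "documentation"}`, so `early_pass` is `False` while `full_pass` is `True`: the early filter drops an event the pipeline filter would accept.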

@maggiehays maggiehays added the label pending-submitter-response (Issue/request has been reviewed but requires a response from the submitter) and removed the label needs-review (Label for PRs that need review from a maintainer) May 28, 2026
…lization filter

The previous implementation incorrectly merged OR predicates using AND logic,
causing silent data loss when predicates had different key sets or conflicting
values.

Changes:
- Changed _early_mcl_criteria from single dict to _early_mcl_criteria_list (list of dicts)
- Implemented correct OR semantics: pass if ANY predicate matches
- Conservative by design: extracts extractable fields, ignores complex ones
- Optimization disabled if ANY predicate has no extractable fields (OR semantics requirement)
- Added comprehensive docstring explaining conservative design with examples

Tests:
- Updated existing tests for new data structure
- Added test_kafka_source_conservative_early_filtering.py with 3 comprehensive tests
- Added nested/deep matching tests to test_event_type_filter.py
- All 26 tests passing

Fixes #17593 comment: #17593 (comment)

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
@sgomezvillamor
Contributor Author

@skrydal Excellent catch!

You're absolutely right. The conservative approach of the early filtering was not properly implemented. The bug was in the merge logic attempting to combine OR predicates using AND semantics.

The Issue:
The code tried to merge [{entityType: dataset}, {entityType: chart, aspectName: documentation}] into a single criteria dict, which resulted in {aspectName: documentation} after conflict resolution. This incorrectly rejected events that matched the first predicate.

The Fix:
We now maintain a list of criteria (one per predicate) and implement correct OR semantics:

  • Pass if ANY criterion matches (OR across predicates)
  • Each criterion requires ALL its fields to match (AND within predicate)
  • Conservative by design: We only filter out events we can definitively reject. If resolving predicates requires deserialization (nested fields like aspect.value.executorId), we let the event pass through to be properly filtered after deserialization.
  • Optimization is disabled if ANY predicate has no extractable fields (since we can't safely reject anything due to OR semantics).
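
A minimal sketch of the list-based approach described in these points, under stated assumptions: the function names are hypothetical, values are compared with plain equality (the real matching logic is richer), and the set of early fields is the one listed for `_MCL_EARLY_FILTER_FIELDS` earlier in this PR.

```python
from typing import Any, Dict, List, Optional

# Top-level Avro scalar fields readable without avrogen deserialization.
EARLY_FIELDS = {"entityType", "aspectName", "entityUrn", "changeType"}

Predicate = Dict[str, Any]


def build_early_criteria(predicates: List[Predicate]) -> Optional[List[Predicate]]:
    """Return one criteria dict per predicate, or None to disable early filtering."""
    criteria_list: List[Predicate] = []
    for predicate in predicates:
        extractable = {k: v for k, v in predicate.items() if k in EARLY_FIELDS}
        if not extractable:
            # This predicate cannot be checked early, so under OR semantics
            # no message can be safely rejected before deserialization.
            return None
        criteria_list.append(extractable)
    return criteria_list


def passes_early(criteria_list: Optional[List[Predicate]], raw_fields: Dict[str, Any]) -> bool:
    """OR across criteria, AND within each; always pass when disabled."""
    if criteria_list is None:
        return True
    return any(
        all(raw_fields.get(k) == v for k, v in criteria.items())
        for criteria in criteria_list
    )
```

Passing early only means the message goes on to deserialization, where the full filter still decides; predicates with nested fields keep their extractable part as a coarser, still safe, early check.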

Fix and comprehensive tests added in commit 191dc0c

Changed enable_mcl_pre_deserialization_filter default from False to True.
With the conservative OR semantics fix, the optimization is now safe to
enable by default and will provide performance benefits for most pipelines.

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>

Labels

devops (PR or Issue related to DataHub backend & deployment), docs (Issues and Improvements to docs), pending-submitter-response (Issue/request has been reviewed but requires a response from the submitter)

4 participants