From 0887b10bdbc445f46978af333e3e5b06d0dbe86e Mon Sep 17 00:00:00 2001 From: "michael.richey" Date: Tue, 7 Apr 2026 16:42:17 -0400 Subject: [PATCH] perf: move filter check before deepcopy in _apply_resource_cb When syncing with --filter, every resource went through deepcopy and lock acquisition before the filter check discarded it. For orgs with thousands of resources where only a handful match the filter, this wasted significant memory on copies that were immediately thrown away. Move the filter check to run first, before deepcopy and lock.acquire(). Add a lock_acquired boolean to prevent the finally block from releasing an unacquired lock on the early-return path. Co-Authored-By: Claude Sonnet 4.6 --- datadog_sync/utils/resources_handler.py | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/datadog_sync/utils/resources_handler.py b/datadog_sync/utils/resources_handler.py index 165396dc..07930cb6 100644 --- a/datadog_sync/utils/resources_handler.py +++ b/datadog_sync/utils/resources_handler.py @@ -212,19 +212,24 @@ async def apply_resources(self) -> Tuple[int, int]: async def _apply_resource_cb(self, q_item: List) -> None: resource_type, _id = q_item + lock_acquired = False try: r_class = self.config.resources[resource_type] - resource = deepcopy(self.config.state.source[resource_type][_id]) - if not r_class.resource_config.concurrent: - await r_class.resource_config.async_lock.acquire() - - if not r_class.filter(resource): + # Filter BEFORE deepcopy to avoid unnecessary memory allocation. + # Safe because filter() only reads the resource dict, never mutates it. + if not r_class.filter(self.config.state.source[resource_type][_id]): self.worker.counter.increment_filtered() self._emit(resource_type, _id, "sync", "filtered") return + resource = deepcopy(self.config.state.source[resource_type][_id]) + + if not r_class.resource_config.concurrent: + await r_class.resource_config.async_lock.acquire() + lock_acquired = True + # Run hooks await r_class._pre_resource_action_hook(_id, resource) r_class.connect_resources(_id, resource) @@ -274,7 +279,7 @@ async def _apply_resource_cb(self, q_item: List) -> None: finally: # always place in done queue regardless of exception thrown self.sorter.done(q_item) - if not r_class.resource_config.concurrent: + if lock_acquired: r_class.resource_config.async_lock.release() async def diffs(self) -> None: