Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 11 additions & 6 deletions datadog_sync/utils/resources_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -212,19 +212,24 @@ async def apply_resources(self) -> Tuple[int, int]:

async def _apply_resource_cb(self, q_item: List) -> None:
resource_type, _id = q_item
lock_acquired = False

try:
r_class = self.config.resources[resource_type]
resource = deepcopy(self.config.state.source[resource_type][_id])

if not r_class.resource_config.concurrent:
await r_class.resource_config.async_lock.acquire()

if not r_class.filter(resource):
# Filter BEFORE deepcopy to avoid unnecessary memory allocation.
# Safe because filter() only reads the resource dict, never mutates it.
if not r_class.filter(self.config.state.source[resource_type][_id]):
self.worker.counter.increment_filtered()
self._emit(resource_type, _id, "sync", "filtered")
return

resource = deepcopy(self.config.state.source[resource_type][_id])

if not r_class.resource_config.concurrent:
await r_class.resource_config.async_lock.acquire()
lock_acquired = True

# Run hooks
await r_class._pre_resource_action_hook(_id, resource)
r_class.connect_resources(_id, resource)
Expand Down Expand Up @@ -274,7 +279,7 @@ async def _apply_resource_cb(self, q_item: List) -> None:
finally:
# always place in done queue regardless of exception thrown
self.sorter.done(q_item)
if not r_class.resource_config.concurrent:
if lock_acquired:
r_class.resource_config.async_lock.release()

async def diffs(self) -> None:
Expand Down