diff --git a/datadog_lambda/durable.py b/datadog_lambda/durable.py index e9443f92..1833b99a 100644 --- a/datadog_lambda/durable.py +++ b/datadog_lambda/durable.py @@ -2,11 +2,17 @@ # under the Apache License Version 2.0. # This product includes software developed at Datadog (https://www.datadoghq.com/). # Copyright 2019 Datadog, Inc. +import json import logging import re +import sys logger = logging.getLogger(__name__) +# When changing the schema of the durable invocation log, bump this version so +# the extension can handle compatibility between old and new schemas. +DURABLE_INVOCATION_LOG_SCHEMA_VERSION = "1.0.0" + def _parse_durable_execution_arn(arn): """ @@ -47,3 +53,18 @@ def extract_durable_function_tags(event): "durable_function_execution_name": execution_name, "durable_function_execution_id": execution_id, } + + +def emit_durable_execution_log(request_id, execution_name, execution_id): + """ + Emits a structured JSON log to stdout mapping the Lambda request_id to the + durable execution name and ID. This is consumed by the Lambda extension layer + to correlate request IDs with durable executions. + """ + log = { + "request_id": request_id, + "durable_execution_name": execution_name, + "durable_execution_id": execution_id, + "schema_version": DURABLE_INVOCATION_LOG_SCHEMA_VERSION, + } + print(json.dumps(log), file=sys.stdout, flush=True) diff --git a/datadog_lambda/wrapper.py b/datadog_lambda/wrapper.py index c174a501..2a7315b9 100644 --- a/datadog_lambda/wrapper.py +++ b/datadog_lambda/wrapper.py @@ -42,7 +42,10 @@ tracer, propagator, ) -from datadog_lambda.durable import extract_durable_function_tags +from datadog_lambda.durable import ( + extract_durable_function_tags, + emit_durable_execution_log, +) from datadog_lambda.trigger import ( extract_trigger_tags, extract_http_status_code_tag, @@ -245,6 +248,12 @@ def _before(self, event, context): self.trigger_tags = extract_trigger_tags(event, context) self.durable_function_tags = extract_durable_function_tags(event) + if self.durable_function_tags: + emit_durable_execution_log( + context.aws_request_id, + self.durable_function_tags["durable_function_execution_name"], + self.durable_function_tags["durable_function_execution_id"], + ) # Extract Datadog trace context and source from incoming requests dd_context, trace_context_source, event_source = extract_dd_trace_context( event, diff --git a/tests/test_durable.py b/tests/test_durable.py index 60914934..fe3667ce 100644 --- a/tests/test_durable.py +++ b/tests/test_durable.py @@ -2,11 +2,16 @@ # under the Apache License Version 2.0. # This product includes software developed at Datadog (https://www.datadoghq.com/). # Copyright 2019 Datadog, Inc. +import io +import json +import sys import unittest from datadog_lambda.durable import ( _parse_durable_execution_arn, extract_durable_function_tags, + emit_durable_execution_log, + DURABLE_INVOCATION_LOG_SCHEMA_VERSION, ) @@ -89,3 +94,35 @@ def test_returns_empty_dict_when_durable_execution_arn_cannot_be_parsed(self): def test_returns_empty_dict_when_event_is_empty(self): result = extract_durable_function_tags({}) self.assertEqual(result, {}) + + +class TestEmitDurableExecutionLog(unittest.TestCase): + def _capture_stdout(self, fn): + captured = io.StringIO() + original = sys.stdout + sys.stdout = captured + try: + fn() + finally: + sys.stdout = original + return captured.getvalue() + + def test_emits_json_with_all_fields(self): + output = self._capture_stdout( + lambda: emit_durable_execution_log("req-123", "my-execution", "exec-id-456") + ) + data = json.loads(output.strip()) + self.assertEqual(data["request_id"], "req-123") + self.assertEqual(data["durable_execution_name"], "my-execution") + self.assertEqual(data["durable_execution_id"], "exec-id-456") + self.assertEqual(data["schema_version"], DURABLE_INVOCATION_LOG_SCHEMA_VERSION) + + def test_emits_single_json_line(self): + output = self._capture_stdout( + lambda: emit_durable_execution_log("req-1", "name", "id") + ) + lines = [l for l in output.splitlines() if l.strip()] + self.assertEqual(len(lines), 1) + + def test_schema_version_is_correct(self): + self.assertEqual(DURABLE_INVOCATION_LOG_SCHEMA_VERSION, "1.0.0")