Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions datadog_lambda/durable.py
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Although your change says "to use logs instead of traces to enrich logs" still creates a dependency where you need a tracing layer to have complete observability in logs – does that even make sense from a dependency standpoint?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The dependency refers to the one between trace agent and log agent in lambda extension. Using traces adds coupling between them, making extension code harder to maintain. See discussion here DataDog/datadog-lambda-extension#1053 (comment)

Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,17 @@
# under the Apache License Version 2.0.
# This product includes software developed at Datadog (https://www.datadoghq.com/).
# Copyright 2019 Datadog, Inc.
import json
import logging
import re
import sys

logger = logging.getLogger(__name__)

# When changing the schema of the durable invocation log, bump this version so
# the extension can handle compatibility between old and new schemas.
DURABLE_INVOCATION_LOG_SCHEMA_VERSION = "1.0.0"


def _parse_durable_execution_arn(arn):
"""
Expand Down Expand Up @@ -47,3 +53,18 @@ def extract_durable_function_tags(event):
"durable_function_execution_name": execution_name,
"durable_function_execution_id": execution_id,
}


def emit_durable_execution_log(request_id, execution_name, execution_id):
"""
Emits a structured JSON log to stdout mapping the Lambda request_id to the
durable execution name and ID. This is consumed by the Lambda extension layer
to correlate request IDs with durable executions.
"""
log = {
"request_id": request_id,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought extension side has the information of the request_id

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Race conditions often happen (e.g. logs can be delayed), and given a log message, it's hard for the extension to know which request_id it was for, so I think it's safer and easier to add request_id to the log message.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i see. that makes sense.

"durable_execution_name": execution_name,
"durable_execution_id": execution_id,
"schema_version": DURABLE_INVOCATION_LOG_SCHEMA_VERSION,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why we need a schema version here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm worried that we may need to add more fields in the future, though I'm not sure how yet. Adding schema version will make it easier for extension to identify incompatibility (e.g. missing field) and print an error to ask the user to update tracer.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

adding more fields shouldn't require a schema. i'm not very convinced that we need it.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would agree that we don't need schema version. If we need to change this in the future, which I think is very unlikely, then we can add schema version at that point.

}
print(json.dumps(log), file=sys.stdout, flush=True)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not using the logger?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@joeyzhao2018 probably logger injects other things which we don't want

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this message should always be logged, and using logger introduces a risk that this message is dropped due to log level. Do you think it's safe to use logger.info()?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah, you mean the "parsing" part would be a big unpredictable? can we use regex to solve that part?
using logger can make sure it still follows the existing rules such as extra injections or if customers want to change the level.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh, i see, you don't want this to be controlled by log level.

11 changes: 10 additions & 1 deletion datadog_lambda/wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,10 @@
tracer,
propagator,
)
from datadog_lambda.durable import extract_durable_function_tags
from datadog_lambda.durable import (
extract_durable_function_tags,
emit_durable_execution_log,
)
from datadog_lambda.trigger import (
extract_trigger_tags,
extract_http_status_code_tag,
Expand Down Expand Up @@ -245,6 +248,12 @@ def _before(self, event, context):

self.trigger_tags = extract_trigger_tags(event, context)
self.durable_function_tags = extract_durable_function_tags(event)
if self.durable_function_tags:
emit_durable_execution_log(
context.aws_request_id,
self.durable_function_tags["durable_function_execution_name"],
self.durable_function_tags["durable_function_execution_id"],
)
# Extract Datadog trace context and source from incoming requests
dd_context, trace_context_source, event_source = extract_dd_trace_context(
event,
Expand Down
37 changes: 37 additions & 0 deletions tests/test_durable.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,16 @@
# under the Apache License Version 2.0.
# This product includes software developed at Datadog (https://www.datadoghq.com/).
# Copyright 2019 Datadog, Inc.
import io
import json
import sys
import unittest

from datadog_lambda.durable import (
_parse_durable_execution_arn,
extract_durable_function_tags,
emit_durable_execution_log,
DURABLE_INVOCATION_LOG_SCHEMA_VERSION,
)


Expand Down Expand Up @@ -89,3 +94,35 @@ def test_returns_empty_dict_when_durable_execution_arn_cannot_be_parsed(self):
def test_returns_empty_dict_when_event_is_empty(self):
result = extract_durable_function_tags({})
self.assertEqual(result, {})


class TestEmitDurableExecutionLog(unittest.TestCase):
def _capture_stdout(self, fn):
captured = io.StringIO()
original = sys.stdout
sys.stdout = captured
try:
fn()
finally:
sys.stdout = original
return captured.getvalue()

def test_emits_json_with_all_fields(self):
output = self._capture_stdout(
lambda: emit_durable_execution_log("req-123", "my-execution", "exec-id-456")
)
data = json.loads(output.strip())
self.assertEqual(data["request_id"], "req-123")
self.assertEqual(data["durable_execution_name"], "my-execution")
self.assertEqual(data["durable_execution_id"], "exec-id-456")
self.assertEqual(data["schema_version"], DURABLE_INVOCATION_LOG_SCHEMA_VERSION)

def test_emits_single_json_line(self):
output = self._capture_stdout(
lambda: emit_durable_execution_log("req-1", "name", "id")
)
lines = [l for l in output.splitlines() if l.strip()]
self.assertEqual(len(lines), 1)

def test_schema_version_is_correct(self):
self.assertEqual(DURABLE_INVOCATION_LOG_SCHEMA_VERSION, "1.0.0")
Loading