-
Notifications
You must be signed in to change notification settings - Fork 8
feat(sqs): add OpenTelemetry instrumentation for SQS client and worker #48
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
f35b9f8
1ab9cbf
ae7db86
7aed6cd
3b659c1
0f4fa7b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,4 +1,4 @@ | ||
| .PHONY: all build test | ||
|
|
||
| test: | ||
| @PROJECT_ROOT=$(PWD) APP_ENV=test go test -timeout 5s -tags grpc,pulsar,newrelic,kitex ./... | ||
| @PROJECT_ROOT=$(PWD) APP_ENV=test go test -timeout 5s -tags grpc,pulsar,newrelic,kitex,sqs,sqs_worker,otel ./... |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,39 @@ | ||
| //go:build sqs && otel | ||
| // +build sqs,otel | ||
|
|
||
| package sqs | ||
|
|
||
| import ( | ||
| "github.com/aws/aws-sdk-go/aws" | ||
| aws_sqs "github.com/aws/aws-sdk-go/service/sqs" | ||
| "go.opentelemetry.io/otel/propagation" | ||
| ) | ||
|
|
||
| // SQSMessageCarrier adapts an SQS MessageAttributeValue map to the | ||
| // propagation.TextMapCarrier interface so W3C trace context can be injected | ||
| // and extracted from SQS messages. | ||
| type SQSMessageCarrier map[string]*aws_sqs.MessageAttributeValue | ||
|
|
||
| var _ propagation.TextMapCarrier = SQSMessageCarrier{} | ||
|
|
||
| func (c SQSMessageCarrier) Get(key string) string { | ||
| if v, ok := c[key]; ok && v != nil && v.StringValue != nil { | ||
| return *v.StringValue | ||
| } | ||
| return "" | ||
| } | ||
|
|
||
| func (c SQSMessageCarrier) Set(key, val string) { | ||
| c[key] = &aws_sqs.MessageAttributeValue{ | ||
| DataType: aws.String("String"), | ||
| StringValue: aws.String(val), | ||
| } | ||
| } | ||
|
|
||
| func (c SQSMessageCarrier) Keys() []string { | ||
| keys := make([]string, 0, len(c)) | ||
| for k := range c { | ||
| keys = append(keys, k) | ||
| } | ||
| return keys | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,262 @@ | ||
| //go:build sqs && otel | ||
| // +build sqs,otel | ||
|
|
||
| package sqs | ||
|
|
||
| import ( | ||
| "context" | ||
| "fmt" | ||
| "net/url" | ||
| "strconv" | ||
| "strings" | ||
|
|
||
| "github.com/aws/aws-sdk-go/aws/request" | ||
| aws_sqs "github.com/aws/aws-sdk-go/service/sqs" | ||
| "go.opentelemetry.io/otel" | ||
| "go.opentelemetry.io/otel/attribute" | ||
| "go.opentelemetry.io/otel/codes" | ||
| "go.opentelemetry.io/otel/trace" | ||
| ) | ||
|
|
||
| const tracerName = "github.com/shoplineapp/go-app/plugins/sqs" | ||
|
|
||
// opClassification describes how a given SQS API operation should be mapped
// onto OTel messaging semantic conventions. Values are produced by
// classifyOperation and consumed by startSpan when it builds the span.
type opClassification struct {
	// spanKind is the span kind passed to the tracer when the span is started.
	spanKind trace.SpanKind
	opType   string // value of the messaging.operation.type attribute
	opName   string // value of the messaging.operation.name attribute; also the span-name prefix
}
|
|
||
| // classifyOperation returns the classification for a given SQS API operation | ||
| // name. The second return value is false for operations that are not | ||
| // messaging-relevant and should be skipped. | ||
| func classifyOperation(name string) (opClassification, bool) { | ||
| switch name { | ||
| case "SendMessage": | ||
| return opClassification{trace.SpanKindProducer, "send", "send"}, true | ||
| case "SendMessageBatch": | ||
| return opClassification{trace.SpanKindProducer, "send", "send_batch"}, true | ||
| case "ReceiveMessage": | ||
| return opClassification{trace.SpanKindConsumer, "receive", "receive"}, true | ||
| case "DeleteMessage": | ||
| return opClassification{trace.SpanKindClient, "settle", "delete"}, true | ||
| case "DeleteMessageBatch": | ||
| return opClassification{trace.SpanKindClient, "settle", "delete_batch"}, true | ||
| case "ChangeMessageVisibility": | ||
| return opClassification{trace.SpanKindClient, "settle", "change_visibility"}, true | ||
| case "ChangeMessageVisibilityBatch": | ||
| return opClassification{trace.SpanKindClient, "settle", "change_visibility_batch"}, true | ||
| } | ||
| return opClassification{}, false | ||
| } | ||
|
|
||
// InstrumentSQSClient attaches OpenTelemetry instrumentation to a v1
// aws-sdk-go SQS client by registering request handlers on it. The handlers:
//   - start a span in the Validate phase (before params are serialized)
//   - inject W3C trace context into outgoing message attributes for send
//     operations (also Validate phase, after span start)
//   - end the span in the Complete phase, recording any error and setting
//     AWS/messaging semconv attributes.
//
// It is the exported entry point and simply delegates to instrument; a nil
// client is ignored.
func InstrumentSQSClient(c *aws_sqs.SQS) {
	instrument(c)
}
|
|
||
| func instrument(c *aws_sqs.SQS) { | ||
| if c == nil { | ||
| return | ||
| } | ||
| c.Handlers.Validate.PushFrontNamed(request.NamedHandler{ | ||
| Name: "otel.sqs.StartSpan", | ||
| Fn: startSpan, | ||
| }) | ||
| c.Handlers.Validate.PushBackNamed(request.NamedHandler{ | ||
| Name: "otel.sqs.InjectTraceContext", | ||
| Fn: injectTraceContext, | ||
| }) | ||
| c.Handlers.Complete.PushBackNamed(request.NamedHandler{ | ||
| Name: "otel.sqs.EndSpan", | ||
| Fn: endSpan, | ||
| }) | ||
| } | ||
|
|
||
| func startSpan(r *request.Request) { | ||
| class, ok := classifyOperation(r.Operation.Name) | ||
| if !ok { | ||
| return | ||
| } | ||
|
|
||
| queueURL := extractQueueURL(r.Params) | ||
| queueName := queueNameFromURL(queueURL) | ||
|
|
||
| attrs := []attribute.KeyValue{ | ||
| attribute.String("messaging.system", "aws_sqs"), | ||
| attribute.String("messaging.operation.name", class.opName), | ||
| attribute.String("messaging.operation.type", class.opType), | ||
| } | ||
| if queueURL != "" { | ||
| attrs = append(attrs, attribute.String("aws.sqs.queue.url", queueURL)) | ||
| } | ||
| if queueName != "" { | ||
| attrs = append(attrs, attribute.String("messaging.destination.name", queueName)) | ||
| } | ||
| if host, port := HostPortFromURL(queueURL); host != "" { | ||
| attrs = append(attrs, attribute.String("server.address", host)) | ||
| if port > 0 { | ||
| attrs = append(attrs, attribute.Int("server.port", port)) | ||
| } | ||
| } | ||
|
|
||
| spanName := class.opName | ||
| if queueName != "" { | ||
| spanName = class.opName + " " + queueName | ||
| } | ||
|
|
||
| tracer := otel.Tracer(tracerName) | ||
| ctx, _ := tracer.Start(r.Context(), spanName, | ||
| trace.WithSpanKind(class.spanKind), | ||
| trace.WithAttributes(attrs...), | ||
| ) | ||
| r.SetContext(ctx) | ||
| } | ||
|
|
||
| func injectTraceContext(r *request.Request) { | ||
| propagator := otel.GetTextMapPropagator() | ||
| switch p := r.Params.(type) { | ||
| case *aws_sqs.SendMessageInput: | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 點解淨係得send先要inject trace context, to update the sqs message with trace id?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes. When receiving, we will be using addReceiveLinks to extract the trace id from the message. So
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
漏左?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We don't need to inject a trace id when consuming a message. |
||
| if p == nil { | ||
| return | ||
| } | ||
| if p.MessageAttributes == nil { | ||
| p.MessageAttributes = map[string]*aws_sqs.MessageAttributeValue{} | ||
| } | ||
| propagator.Inject(r.Context(), SQSMessageCarrier(p.MessageAttributes)) | ||
| case *aws_sqs.SendMessageBatchInput: | ||
| if p == nil { | ||
| return | ||
| } | ||
| for i := range p.Entries { | ||
| if p.Entries[i] == nil { | ||
| continue | ||
| } | ||
| if p.Entries[i].MessageAttributes == nil { | ||
| p.Entries[i].MessageAttributes = map[string]*aws_sqs.MessageAttributeValue{} | ||
| } | ||
| propagator.Inject(r.Context(), SQSMessageCarrier(p.Entries[i].MessageAttributes)) | ||
| } | ||
| } | ||
| } | ||
|
|
||
| func endSpan(r *request.Request) { | ||
| span := trace.SpanFromContext(r.Context()) | ||
| if !span.IsRecording() { | ||
|
kelvinlam-shopline marked this conversation as resolved.
|
||
| return | ||
| } | ||
|
|
||
| if r.RequestID != "" { | ||
| span.SetAttributes(attribute.String("aws.request_id", r.RequestID)) | ||
| } | ||
|
|
||
| switch out := r.Data.(type) { | ||
| case *aws_sqs.SendMessageOutput: | ||
| if out != nil && out.MessageId != nil { | ||
| span.SetAttributes(attribute.String("messaging.message.id", *out.MessageId)) | ||
| } | ||
| case *aws_sqs.ReceiveMessageOutput: | ||
| addReceiveLinks(span, out) | ||
|
ThomasKwan-shopline marked this conversation as resolved.
|
||
| } | ||
|
|
||
| if r.Error != nil { | ||
| span.RecordError(r.Error) | ||
| span.SetStatus(codes.Error, r.Error.Error()) | ||
| span.SetAttributes(attribute.String("error.type", fmt.Sprintf("%T", r.Error))) | ||
| } | ||
|
|
||
| span.End() | ||
| } | ||
|
|
||
| func addReceiveLinks(span trace.Span, out *aws_sqs.ReceiveMessageOutput) { | ||
| if out == nil { | ||
| return | ||
| } | ||
| propagator := otel.GetTextMapPropagator() | ||
| for _, msg := range out.Messages { | ||
| if msg == nil { | ||
| continue | ||
| } | ||
| carrier := SQSMessageCarrier(msg.MessageAttributes) | ||
| ctx := propagator.Extract(context.Background(), carrier) | ||
|
ThomasKwan-shopline marked this conversation as resolved.
|
||
| sc := trace.SpanContextFromContext(ctx) | ||
| if sc.IsValid() { | ||
| span.AddLink(trace.Link{SpanContext: sc}) | ||
| } | ||
| } | ||
| } | ||
|
|
||
| // extractQueueURL returns the QueueUrl from a known SQS input type, or "". | ||
| func extractQueueURL(params interface{}) string { | ||
| switch p := params.(type) { | ||
| case *aws_sqs.SendMessageInput: | ||
| return derefString(p.QueueUrl) | ||
| case *aws_sqs.SendMessageBatchInput: | ||
| return derefString(p.QueueUrl) | ||
| case *aws_sqs.ReceiveMessageInput: | ||
| return derefString(p.QueueUrl) | ||
| case *aws_sqs.DeleteMessageInput: | ||
| return derefString(p.QueueUrl) | ||
| case *aws_sqs.DeleteMessageBatchInput: | ||
| return derefString(p.QueueUrl) | ||
| case *aws_sqs.ChangeMessageVisibilityInput: | ||
| return derefString(p.QueueUrl) | ||
| case *aws_sqs.ChangeMessageVisibilityBatchInput: | ||
| return derefString(p.QueueUrl) | ||
| } | ||
| return "" | ||
| } | ||
|
|
||
// queueNameFromURL returns the final path segment of queueURL, which for a
// standard SQS queue URL such as
// https://sqs.us-east-1.amazonaws.com/123456789012/MyQueue is the queue
// name. It returns "" when queueURL is empty, cannot be parsed, or has no
// path segments.
func queueNameFromURL(queueURL string) string {
	if queueURL == "" {
		return ""
	}
	parsed, err := url.Parse(queueURL)
	if err != nil {
		return ""
	}
	// Drop surrounding slashes so a trailing "/" does not yield an empty
	// name.
	segments := strings.Trim(parsed.Path, "/")
	// LastIndex is -1 when no slash remains, so the slice then starts at 0
	// and returns the whole (possibly empty) string.
	return segments[strings.LastIndex(segments, "/")+1:]
}
|
|
||
// HostPortFromURL returns the hostname and port of an SQS queue URL.
// Both results are zero ("" and 0) when queueURL is empty or cannot be
// parsed. Port is 0 when the URL has no explicit port or the port is not a
// valid integer.
func HostPortFromURL(queueURL string) (host string, port int) {
	if queueURL == "" {
		return "", 0
	}
	parsed, err := url.Parse(queueURL)
	if err != nil {
		return "", 0
	}
	// Atoi fails on the empty string, which leaves port at 0 for URLs
	// without an explicit port.
	if n, convErr := strconv.Atoi(parsed.Port()); convErr == nil {
		port = n
	}
	return parsed.Hostname(), port
}
|
|
||
// derefString returns the string s points to, or "" when s is nil.
func derefString(s *string) string {
	if s != nil {
		return *s
	}
	return ""
}
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,9 @@ | ||
| //go:build sqs && !otel | ||
|
|
||
| package sqs | ||
|
|
||
| import aws_sqs "github.com/aws/aws-sdk-go/service/sqs" | ||
|
|
||
// InstrumentSQSClient is the no-op variant compiled when the otel build tag
// is not set (this file is constrained to sqs && !otel). It leaves the
// client untouched.
func InstrumentSQSClient(c *aws_sqs.SQS) {}
|
|
||
// instrument is the no-op variant compiled when the otel build tag is not
// set (this file is constrained to sqs && !otel). It registers no handlers.
func instrument(c *aws_sqs.SQS) {}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: the `class` variable might be better named `operation`.