Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
.PHONY: all build test

test:
@PROJECT_ROOT=$(PWD) APP_ENV=test go test -timeout 5s -tags grpc,pulsar,newrelic,kitex ./...
@PROJECT_ROOT=$(PWD) APP_ENV=test go test -timeout 5s -tags grpc,pulsar,newrelic,kitex,sqs,sqs_worker,otel ./...
2 changes: 1 addition & 1 deletion plugins/grpc/presets/grpc_with_error_reporting.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//go:build grpc && newrelic && otel && sentry
// +build grpc,newrelic,sentry,otel
// +build grpc,newrelic,otel,sentry

package presets

Expand Down
23 changes: 23 additions & 0 deletions plugins/sqs/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,26 @@ func main() {
}
```

## OpenTelemetry

When built with `-tags "sqs otel"`, every `*sqs.SQS` client produced by
`AwsTopicManager.AddTopic` is transparently instrumented via the
`aws-sdk-go` v1 request handler chain. Each SQS API call emits a span
following the OpenTelemetry Semantic Conventions 1.41.0 — [messaging spans](https://opentelemetry.io/docs/specs/semconv/messaging/messaging-spans/) and [AWS SQS](https://opentelemetry.io/docs/specs/semconv/messaging/sqs/):

| Operation | Span kind | `messaging.operation.type` | `messaging.operation.name` |
| --------------------------------------------- | --------- | -------------------------- | -------------------------- |
| `SendMessage` | Producer | `send` | `send` |
| `SendMessageBatch` | Producer | `send` | `send_batch` |
| `ReceiveMessage` | Consumer | `receive` | `receive` |
| `DeleteMessage(Batch)` | Client | `settle` | `delete[_batch]` |
| `ChangeMessageVisibility(Batch)` | Client | `settle` | `change_visibility[_batch]`|

Common attributes include `messaging.system=aws_sqs`,
`messaging.destination.name`, `aws.sqs.queue.url`, `aws.request_id`,
`server.address`, and — for `SendMessage` — `messaging.message.id`.

W3C trace context (`traceparent`) is injected into the outgoing message's
`MessageAttributes` (per entry for `SendMessageBatch`), enabling downstream
consumers to continue the trace.

39 changes: 39 additions & 0 deletions plugins/sqs/otel_carrier.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
//go:build sqs && otel
// +build sqs,otel

package sqs

import (
"github.com/aws/aws-sdk-go/aws"
aws_sqs "github.com/aws/aws-sdk-go/service/sqs"
"go.opentelemetry.io/otel/propagation"
)

// SQSMessageCarrier adapts an SQS MessageAttributeValue map to the
// propagation.TextMapCarrier interface so W3C trace context can be injected
// and extracted from SQS messages.
type SQSMessageCarrier map[string]*aws_sqs.MessageAttributeValue

var _ propagation.TextMapCarrier = SQSMessageCarrier{}

func (c SQSMessageCarrier) Get(key string) string {
if v, ok := c[key]; ok && v != nil && v.StringValue != nil {
return *v.StringValue
}
return ""
}

func (c SQSMessageCarrier) Set(key, val string) {
c[key] = &aws_sqs.MessageAttributeValue{
DataType: aws.String("String"),
StringValue: aws.String(val),
}
}

func (c SQSMessageCarrier) Keys() []string {
keys := make([]string, 0, len(c))
for k := range c {
keys = append(keys, k)
}
return keys
}
262 changes: 262 additions & 0 deletions plugins/sqs/otel_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,262 @@
//go:build sqs && otel
// +build sqs,otel

package sqs

import (
"context"
"fmt"
"net/url"
"strconv"
"strings"

"github.com/aws/aws-sdk-go/aws/request"
aws_sqs "github.com/aws/aws-sdk-go/service/sqs"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"
)

const tracerName = "github.com/shoplineapp/go-app/plugins/sqs"

// opClassification describes how a given SQS API operation should be mapped
// onto OTel messaging semantic conventions.
type opClassification struct {
spanKind trace.SpanKind
opType string // messaging.operation.type
opName string // messaging.operation.name
}

// classifyOperation returns the classification for a given SQS API operation
// name. The second return value is false for operations that are not
// messaging-relevant and should be skipped.
func classifyOperation(name string) (opClassification, bool) {
switch name {
case "SendMessage":
return opClassification{trace.SpanKindProducer, "send", "send"}, true
case "SendMessageBatch":
return opClassification{trace.SpanKindProducer, "send", "send_batch"}, true
case "ReceiveMessage":
return opClassification{trace.SpanKindConsumer, "receive", "receive"}, true
case "DeleteMessage":
return opClassification{trace.SpanKindClient, "settle", "delete"}, true
case "DeleteMessageBatch":
return opClassification{trace.SpanKindClient, "settle", "delete_batch"}, true
case "ChangeMessageVisibility":
return opClassification{trace.SpanKindClient, "settle", "change_visibility"}, true
case "ChangeMessageVisibilityBatch":
return opClassification{trace.SpanKindClient, "settle", "change_visibility_batch"}, true
}
return opClassification{}, false
}

// InstrumentSQSClient attaches OpenTelemetry instrumentation to a v1
// aws-sdk-go SQS client. It:
// - starts a span in the Validate phase (before params are serialized)
// - injects W3C trace context into outgoing message attributes for send
// operations (also Validate phase, after span start)
// - ends the span in the Complete phase, recording any error and setting
// AWS/messaging semconv attributes.
func InstrumentSQSClient(c *aws_sqs.SQS) {
instrument(c)
}

func instrument(c *aws_sqs.SQS) {
if c == nil {
return
}
c.Handlers.Validate.PushFrontNamed(request.NamedHandler{
Name: "otel.sqs.StartSpan",
Fn: startSpan,
})
c.Handlers.Validate.PushBackNamed(request.NamedHandler{
Name: "otel.sqs.InjectTraceContext",
Fn: injectTraceContext,
})
c.Handlers.Complete.PushBackNamed(request.NamedHandler{
Name: "otel.sqs.EndSpan",
Fn: endSpan,
})
}

func startSpan(r *request.Request) {
class, ok := classifyOperation(r.Operation.Name)

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: the class variable maybe call operation is better

if !ok {
return
}

queueURL := extractQueueURL(r.Params)
queueName := queueNameFromURL(queueURL)

attrs := []attribute.KeyValue{
attribute.String("messaging.system", "aws_sqs"),
attribute.String("messaging.operation.name", class.opName),
attribute.String("messaging.operation.type", class.opType),
}
if queueURL != "" {
attrs = append(attrs, attribute.String("aws.sqs.queue.url", queueURL))
}
if queueName != "" {
attrs = append(attrs, attribute.String("messaging.destination.name", queueName))
}
if host, port := HostPortFromURL(queueURL); host != "" {
attrs = append(attrs, attribute.String("server.address", host))
if port > 0 {
attrs = append(attrs, attribute.Int("server.port", port))
}
}

spanName := class.opName
if queueName != "" {
spanName = class.opName + " " + queueName
}

tracer := otel.Tracer(tracerName)
ctx, _ := tracer.Start(r.Context(), spanName,
trace.WithSpanKind(class.spanKind),
trace.WithAttributes(attrs...),
)
r.SetContext(ctx)
}

func injectTraceContext(r *request.Request) {
propagator := otel.GetTextMapPropagator()
switch p := r.Params.(type) {
case *aws_sqs.SendMessageInput:

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

點解淨係得send先要inject trace context, to update the sqs message with trace id?

@anson-lee-sl anson-lee-sl Jun 10, 2026

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. When receiving, we will be using addReceiveLinks to extract the trace id from the message. So

  • Before send, we inject a trace id.
  • Before processing the message in consumer, we extract the trace id from the message

@ThomasKwan-shopline ThomasKwan-shopline Jun 12, 2026

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Before processing the message, we extract the message

漏左?

@anson-lee-sl anson-lee-sl Jun 12, 2026

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't need to inject a trace id when consuming a message.

if p == nil {
return
}
if p.MessageAttributes == nil {
p.MessageAttributes = map[string]*aws_sqs.MessageAttributeValue{}
}
propagator.Inject(r.Context(), SQSMessageCarrier(p.MessageAttributes))
case *aws_sqs.SendMessageBatchInput:
if p == nil {
return
}
for i := range p.Entries {
if p.Entries[i] == nil {
continue
}
if p.Entries[i].MessageAttributes == nil {
p.Entries[i].MessageAttributes = map[string]*aws_sqs.MessageAttributeValue{}
}
propagator.Inject(r.Context(), SQSMessageCarrier(p.Entries[i].MessageAttributes))
}
}
}

func endSpan(r *request.Request) {
span := trace.SpanFromContext(r.Context())
if !span.IsRecording() {
Comment thread
kelvinlam-shopline marked this conversation as resolved.
return
}

if r.RequestID != "" {
span.SetAttributes(attribute.String("aws.request_id", r.RequestID))
}

switch out := r.Data.(type) {
case *aws_sqs.SendMessageOutput:
if out != nil && out.MessageId != nil {
span.SetAttributes(attribute.String("messaging.message.id", *out.MessageId))
}
case *aws_sqs.ReceiveMessageOutput:
addReceiveLinks(span, out)
Comment thread
ThomasKwan-shopline marked this conversation as resolved.
}

if r.Error != nil {
span.RecordError(r.Error)
span.SetStatus(codes.Error, r.Error.Error())
span.SetAttributes(attribute.String("error.type", fmt.Sprintf("%T", r.Error)))
}

span.End()
}

func addReceiveLinks(span trace.Span, out *aws_sqs.ReceiveMessageOutput) {
if out == nil {
return
}
propagator := otel.GetTextMapPropagator()
for _, msg := range out.Messages {
if msg == nil {
continue
}
carrier := SQSMessageCarrier(msg.MessageAttributes)
ctx := propagator.Extract(context.Background(), carrier)
Comment thread
ThomasKwan-shopline marked this conversation as resolved.
sc := trace.SpanContextFromContext(ctx)
if sc.IsValid() {
span.AddLink(trace.Link{SpanContext: sc})
}
}
}

// extractQueueURL returns the QueueUrl from a known SQS input type, or "".
func extractQueueURL(params interface{}) string {
switch p := params.(type) {
case *aws_sqs.SendMessageInput:
return derefString(p.QueueUrl)
case *aws_sqs.SendMessageBatchInput:
return derefString(p.QueueUrl)
case *aws_sqs.ReceiveMessageInput:
return derefString(p.QueueUrl)
case *aws_sqs.DeleteMessageInput:
return derefString(p.QueueUrl)
case *aws_sqs.DeleteMessageBatchInput:
return derefString(p.QueueUrl)
case *aws_sqs.ChangeMessageVisibilityInput:
return derefString(p.QueueUrl)
case *aws_sqs.ChangeMessageVisibilityBatchInput:
return derefString(p.QueueUrl)
}
return ""
}

// queueNameFromURL extracts the queue name (last path segment) from a
// standard SQS queue URL such as
// https://sqs.us-east-1.amazonaws.com/123456789012/MyQueue.
func queueNameFromURL(queueURL string) string {
if queueURL == "" {
return ""
}
u, err := url.Parse(queueURL)
if err != nil {
return ""
}
path := strings.Trim(u.Path, "/")
if path == "" {
return ""
}
if idx := strings.LastIndex(path, "/"); idx >= 0 {
return path[idx+1:]
}
return path
}

// HostPortFromURL returns the hostname and port of an SQS queue URL.
// Port is 0 when the URL has no explicit port.
func HostPortFromURL(queueURL string) (host string, port int) {
if queueURL == "" {
return "", 0
}
u, err := url.Parse(queueURL)
if err != nil {
return "", 0
}
host = u.Hostname()
if p := u.Port(); p != "" {
if n, err := strconv.Atoi(p); err == nil {
port = n
}
}
return host, port
}

func derefString(s *string) string {
if s == nil {
return ""
}
return *s
}
9 changes: 9 additions & 0 deletions plugins/sqs/otel_client_noop.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
//go:build sqs && !otel

package sqs

import aws_sqs "github.com/aws/aws-sdk-go/service/sqs"

func InstrumentSQSClient(c *aws_sqs.SQS) {}

func instrument(c *aws_sqs.SQS) {}
Loading
Loading