Skip to content

feat: add otel to pulsar#46

Open
anson-lee-sl wants to merge 12 commits into
masterfrom
feature/PLAT-2139-Pulsar-Otel
Open

feat: add otel to pulsar#46
anson-lee-sl wants to merge 12 commits into
masterfrom
feature/PLAT-2139-Pulsar-Otel

Conversation

@anson-lee-sl

@anson-lee-sl anson-lee-sl commented Jun 1, 2026

Copy link
Copy Markdown
Contributor

Why?

Adding OpenTelemetry instrumentation for Pulsar. There's no official Otel support by Pulsar Client Go.

How?

Follow https://opentelemetry.io/docs/specs/semconv/messaging/messaging-spans/ and instrument producer & consumer

Test Case

Full Trace
image

Producer
image

Consumer
image

image

- Wrap producer with instrumentedProducer to create Send/SendAsync spans
  (SpanKindProducer) with W3C trace context injection
- Add consumer process spans (SpanKindConsumer) and settle spans
  (SpanKindClient) for ack/nack lifecycle
- Extract W3C trace context on receive; fall back to legacy trace_id
  for backward compatibility
- Align span names, kinds, operation types, and attributes with
  OTel Semantic Conventions v1.41.0
Comment thread plugins/pulsar/otel_consumer.go Outdated
func startProcessSpan(ctx context.Context, topic, subscriptionName string, msgID ap.MessageID) (context.Context, func(error)) {
attrs := []attribute.KeyValue{
messagingSystemPulsar,
attribute.String("messaging.operation.name", "process"),

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

你可以將所有 span name 改做 constant, 咁你唔洗copy嚟copy去 + 怕改漏

btw 我見你成堆code都係有類似嘅logic, 會晤會有其他比較好嘅處理方法 🤔 但係我諗唔到

Comment thread plugins/pulsar/otel_consumer.go Outdated
)

endSpan := func(err error) {
if err != nil {

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same, 可以試下抽做獨立function for report error span

eg

func reportErrorSpan(span XXX, err error) {
			span.RecordError(err)
			span.SetStatus(codes.Error, err.Error())
			span.SetAttributes(attribute.String("error.type", fmt.Sprintf("%T", err)))
}

Comment thread plugins/pulsar/otel_consumer.go Outdated
Comment thread plugins/pulsar/otel_consumer.go Outdated
@anson-lee-sl anson-lee-sl marked this pull request as ready for review June 16, 2026 10:30
Comment thread plugins/pulsar/otel_common.go Outdated

// messagingSystemPulsar: a single shared KeyValue reused across all
// pulsar spans to avoid allocating a new attribute per span.
var messagingSystemPulsar = semconv.MessagingSystemPulsar

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

你用得個一次就唔洗特登 define la, 同埋你直接係 usage call semconv.MessagingSystemPulsar 唔得咩

Comment thread plugins/pulsar/otel_producer.go Outdated
// PRODUCER-kind "send" span with the spec-required attributes.
func (p *instrumentedProducer) startSendSpan(ctx context.Context) (context.Context, trace.Span) {
tracer := otel.Tracer(tracerName)
return tracer.Start(ctx, spanName(spanOpSend, p.topic),

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

可以用 p.Topic(), 唔需要pass topic入wrap到

Drop the redundant topic arg from wrapProducer and read it from the underlying ap.Producer via Topic(). Inline semconv.MessagingSystemPulsar directly in messagingAttributes, removing the local messagingSystemPulsar alias whose original 'avoid per-span allocation' justification was incorrect. Use the existing spanOpAck and spanOpNack constants in the consumer's settle-span call sites. Update fakeProducer with a Topic() method and adjust the 7 wrapProducer test call sites to match the new signature. Rename TestInjectTraceContext_DefensivelyCopies to TestInjectTraceContext_NoDefensiveCopy and flip its assertions to match what PulsarMessageCarrier actually does (map alias, no defensive copy).
…ducer

Follow the Go constructor convention (new<StructName>) and make the function name reflect what it actually returns. The previous name 'wrapProducer' was generic; 'newInstrumentedProducer' pairs with the existing instrumentedProducer struct, signals that the returned type embeds OTel tracing, and matches the test-side type assertion in otel_producer_test.go. Pure rename — definition, one production call site in AddProducer, and the seven test call sites.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants