feat: add otel to pulsar#46
Open
anson-lee-sl wants to merge 12 commits into
Open
Conversation
- Wrap producer with instrumentedProducer to create Send/SendAsync spans (SpanKindProducer) with W3C trace context injection - Add consumer process spans (SpanKindConsumer) and settle spans (SpanKindClient) for ack/nack lifecycle - Extract W3C trace context on receive; fall back to legacy trace_id for backward compatibility - Align span names, kinds, operation types, and attributes with OTel Semantic Conventions v1.41.0
ThomasKwan-shopline
requested changes
Jun 15, 2026
| func startProcessSpan(ctx context.Context, topic, subscriptionName string, msgID ap.MessageID) (context.Context, func(error)) { | ||
| attrs := []attribute.KeyValue{ | ||
| messagingSystemPulsar, | ||
| attribute.String("messaging.operation.name", "process"), |
There was a problem hiding this comment.
你可以將所有 span name 改做 constant, 咁你唔洗copy嚟copy去 + 怕改漏
btw 我見你成堆code都係有類似嘅logic, 會晤會有其他比較好嘅處理方法 🤔 但係我諗唔到
| ) | ||
|
|
||
| endSpan := func(err error) { | ||
| if err != nil { |
There was a problem hiding this comment.
same, 可以試下抽做獨立function for report error span
eg
func reportErrorSpan(span XXX, err error) {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
span.SetAttributes(attribute.String("error.type", fmt.Sprintf("%T", err)))
}
tonytoshopline
requested changes
Jun 15, 2026
ThomasKwan-shopline
requested changes
Jun 17, 2026
|
|
||
| // messagingSystemPulsar: a single shared KeyValue reused across all | ||
| // pulsar spans to avoid allocating a new attribute per span. | ||
| var messagingSystemPulsar = semconv.MessagingSystemPulsar |
There was a problem hiding this comment.
你用得個一次就唔洗特登 define la, 同埋你直接係 usage call semconv.MessagingSystemPulsar 唔得咩
| // PRODUCER-kind "send" span with the spec-required attributes. | ||
| func (p *instrumentedProducer) startSendSpan(ctx context.Context) (context.Context, trace.Span) { | ||
| tracer := otel.Tracer(tracerName) | ||
| return tracer.Start(ctx, spanName(spanOpSend, p.topic), |
There was a problem hiding this comment.
可以用 p.Topic(), 唔需要pass topic入wrap到
Drop the redundant topic arg from wrapProducer and read it from the underlying ap.Producer via Topic(). Inline semconv.MessagingSystemPulsar directly in messagingAttributes, removing the local messagingSystemPulsar alias whose original 'avoid per-span allocation' justification was incorrect. Use the existing spanOpAck and spanOpNack constants in the consumer's settle-span call sites. Update fakeProducer with a Topic() method and adjust the 7 wrapProducer test call sites to match the new signature. Rename TestInjectTraceContext_DefensivelyCopies to TestInjectTraceContext_NoDefensiveCopy and flip its assertions to match what PulsarMessageCarrier actually does (map alias, no defensive copy).
…ducer Follow the Go constructor convention (new<StructName>) and make the function name reflect what it actually returns. The previous name 'wrapProducer' was generic; 'newInstrumentedProducer' pairs with the existing instrumentedProducer struct, signals that the returned type embeds OTel tracing, and matches the test-side type assertion in otel_producer_test.go. Pure rename — definition, one production call site in AddProducer, and the seven test call sites.
ThomasKwan-shopline
approved these changes
Jun 22, 2026
tonytoshopline
approved these changes
Jun 22, 2026
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Why?
Adding OpenTelemetry instrumentation for Pulsar. There's no official Otel support by Pulsar Client Go.
How?
Follow https://opentelemetry.io/docs/specs/semconv/messaging/messaging-spans/ and instrument producer & consumer
Test Case
Full Trace

Producer

Consumer
