diff --git a/Makefile b/Makefile index 072865f..c2ed369 100644 --- a/Makefile +++ b/Makefile @@ -1,4 +1,4 @@ .PHONY: all build test test: - @PROJECT_ROOT=$(PWD) APP_ENV=test go test -timeout 5s -tags grpc,pulsar,kitex ./... + @PROJECT_ROOT=$(PWD) APP_ENV=test go test -timeout 5s -tags grpc,pulsar,kitex,otel ./... diff --git a/common/context.go b/common/context.go index b471692..8750de9 100644 --- a/common/context.go +++ b/common/context.go @@ -2,8 +2,11 @@ package common import ( "context" + "encoding/binary" + "math/rand/v2" "github.com/google/uuid" + "go.opentelemetry.io/otel/trace" ) func NewContextWithTraceID(ctx context.Context, traceId string) context.Context { @@ -13,10 +16,38 @@ func NewContextWithTraceID(ctx context.Context, traceId string) context.Context return context.WithValue(ctx, "trace_id", traceId) } +// GetTraceID returns the trace_id associated with ctx, looking in three places +// in order: +// +// 1. The legacy ctx["trace_id"] string key (seeded by the gRPC/Kitex bridge +// interceptors from the OTel SpanContext, or by NewContextWithTraceID). +// 2. The active OTel SpanContext on ctx (covers code paths that wrap a span +// without going through the bridge — e.g. background goroutines that +// inherit a parent span, or direct tracer.Start callers). +// 3. A freshly generated 16-byte W3C TraceID, so callers that invoke +// GetTraceID on a context.Background() / unset context still receive a +// valid W3C-format ID for log/Sentry/Pulsar correlation rather than "". func GetTraceID(ctx context.Context) string { - traceId := ctx.Value("trace_id") - if traceId == nil { - return uuid.New().String() + if v, ok := ctx.Value("trace_id").(string); ok && v != "" { + return v + } + if sc := trace.SpanContextFromContext(ctx); sc.IsValid() { + return sc.TraceID().String() + } + return newTraceID().String() +} + +// newTraceID returns a fresh non-zero 16-byte OTel TraceID, matching the +// OTel SDK's own randomIDGenerator.NewIDs pattern (see +// go.opentelemetry.io/otel/sdk/trace/id_generator.go). math/rand/v2's +// package-level functions are goroutine-safe, so no mutex is required. +func newTraceID() trace.TraceID { + var tid trace.TraceID + for { + binary.NativeEndian.PutUint64(tid[:8], rand.Uint64()) + binary.NativeEndian.PutUint64(tid[8:], rand.Uint64()) + if tid.IsValid() { + return tid + } } - return traceId.(string) } diff --git a/go.mod b/go.mod index 8c88604..48578eb 100644 --- a/go.mod +++ b/go.mod @@ -13,6 +13,7 @@ require ( github.com/joho/godotenv v1.4.0 github.com/joonix/log v0.0.0-20200409080653-9c1d2ceb5f1d github.com/kamva/mgm/v3 v3.4.1 + github.com/kitex-contrib/obs-opentelemetry v0.3.0 github.com/pkg/errors v0.9.1 github.com/sirupsen/logrus v1.9.2 github.com/stoewer/go-strcase v1.3.0 @@ -39,8 +40,8 @@ require ( github.com/ardielle/ardielle-go v1.5.2 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/bytedance/gopkg v0.1.3 // indirect - github.com/bytedance/sonic v1.14.1 // indirect - github.com/bytedance/sonic/loader v0.3.0 // indirect + github.com/bytedance/sonic v1.14.2 // indirect + github.com/bytedance/sonic/loader v0.4.0 // indirect github.com/cenkalti/backoff/v5 v5.0.3 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/cloudwego/base64x v0.1.6 // indirect @@ -101,7 +102,7 @@ require ( go.opentelemetry.io/proto/otlp v1.7.1 // indirect go.uber.org/atomic v1.7.0 // indirect go.uber.org/dig v1.14.0 // indirect - go.uber.org/multierr v1.6.0 // indirect + go.uber.org/multierr v1.11.0 // indirect go.uber.org/zap v1.21.0 // indirect golang.org/x/arch v0.14.0 // indirect golang.org/x/crypto v0.41.0 // indirect @@ -110,7 +111,9 @@ require ( golang.org/x/sys v0.35.0 // indirect golang.org/x/term v0.34.0 // indirect golang.org/x/text v0.28.0 // indirect - google.golang.org/genproto v0.0.0-20230306155012-7f2fa6fef1f4 // indirect + google.golang.org/genproto v0.0.0-20240227224415-6ceb2ff114de // indirect + google.golang.org/genproto/googleapis/api v0.0.0-20250825161204-c5933d9347a5 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20250825161204-c5933d9347a5 // indirect google.golang.org/protobuf v1.36.8 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index f1daa58..a3280b8 100644 --- a/go.sum +++ b/go.sum @@ -77,10 +77,10 @@ github.com/bmizerany/perks v0.0.0-20141205001514-d9a9656a3a4b/go.mod h1:ac9efd0D github.com/bytedance/gopkg v0.1.1/go.mod h1:576VvJ+eJgyCzdjS+c4+77QF3p7ubbtiKARP3TxducM= github.com/bytedance/gopkg v0.1.3 h1:TPBSwH8RsouGCBcMBktLt1AymVo2TVsBVCY4b6TnZ/M= github.com/bytedance/gopkg v0.1.3/go.mod h1:576VvJ+eJgyCzdjS+c4+77QF3p7ubbtiKARP3TxducM= -github.com/bytedance/sonic v1.14.1 h1:FBMC0zVz5XUmE4z9wF4Jey0An5FueFvOsTKKKtwIl7w= -github.com/bytedance/sonic v1.14.1/go.mod h1:gi6uhQLMbTdeP0muCnrjHLeCUPyb70ujhnNlhOylAFc= -github.com/bytedance/sonic/loader v0.3.0 h1:dskwH8edlzNMctoruo8FPTJDF3vLtDT0sXZwvZJyqeA= -github.com/bytedance/sonic/loader v0.3.0/go.mod h1:N8A3vUdtUebEY2/VQC0MyhYeKUFosQU6FxH2JmUe6VI= +github.com/bytedance/sonic v1.14.2 h1:k1twIoe97C1DtYUo+fZQy865IuHia4PR5RPiuGPPIIE= +github.com/bytedance/sonic v1.14.2/go.mod h1:T80iDELeHiHKSc0C9tubFygiuXoGzrkjKzX2quAx980= +github.com/bytedance/sonic/loader v0.4.0 h1:olZ7lEqcxtZygCK9EKYKADnpQoYkRQxaeY2NYzevs+o= +github.com/bytedance/sonic/loader v0.4.0/go.mod h1:AR4NYCk5DdzZizZ5djGqQ92eEhCCcdf5x77udYiSJRo= github.com/cenkalti/backoff/v5 v5.0.3 h1:ZN+IMa753KfX5hd8vVaMixjnqRZ3y8CuJKRKj1xcsSM= github.com/cenkalti/backoff/v5 v5.0.3/go.mod h1:rkhZdG3JZukswDf7f0cwqPNk4K0sa+F97BxZthm/crw= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= @@ -346,6 +346,8 @@ github.com/karrick/godirwalk v1.8.0/go.mod h1:H5KPZjojv4lE+QYImBI8xVtrBRgYrIVsaR github.com/karrick/godirwalk v1.10.3/go.mod h1:RoGL9dQei4vP9ilrpETWE8CLOZ1kiN0LhBygSwrAsHA= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= +github.com/kitex-contrib/obs-opentelemetry v0.3.0 h1:STAuMGRhmtZP1zHKZVl9vj7sxMXpu7nU3IqrslShzbo= +github.com/kitex-contrib/obs-opentelemetry v0.3.0/go.mod h1:OReZqYd24Q5djEtkRU2kMQEMq4auWtxJNk4FTKPlGHE= github.com/klauspost/compress v1.9.5/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A= github.com/klauspost/compress v1.14.4/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk= github.com/klauspost/compress v1.17.8 h1:YcnTYrq7MikUT7k0Yb5eceMmALQPYBW/Xltxn0NAMnU= @@ -505,6 +507,7 @@ github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw= @@ -551,6 +554,10 @@ go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJyS go.opentelemetry.io/auto/sdk v1.1.0/go.mod h1:3wSPjt5PWp2RhlCcmmOial7AvC4DQqZb7a7wCow3W8A= go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.63.0 h1:YH4g8lQroajqUwWbq/tr2QX1JFmEXaDLgG+ew9bLMWo= go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.63.0/go.mod h1:fvPi2qXDqFs8M4B4fmJhE92TyQs9Ydjlg3RvfUp+NbQ= +go.opentelemetry.io/contrib/propagators/b3 v1.20.0 h1:Yty9Vs4F3D6/liF1o6FNt0PvN85h/BJJ6DQKJ3nrcM0= +go.opentelemetry.io/contrib/propagators/b3 v1.20.0/go.mod h1:On4VgbkqYL18kbJlWsa18+cMNe6rYpBnPi1ARI/BrsU= +go.opentelemetry.io/contrib/propagators/ot v1.25.0 h1:9+54ye9caWA5XplhJoN6E8ECDKGeEsw/mqR4BIuZUfg= +go.opentelemetry.io/contrib/propagators/ot v1.25.0/go.mod h1:Fn0a9xFTClSSwNLpS1l0l55PkLHzr70RYlu+gUsPhHo= go.opentelemetry.io/otel v1.38.0 h1:RkfdswUDRimDg0m2Az18RKOsnI8UDzppJAtj01/Ymk8= go.opentelemetry.io/otel v1.38.0/go.mod h1:zcmtmQ1+YmQM9wrNsTGV/q/uyusom3P8RxwExxkZhjM= go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.38.0 h1:GqRJVj7UmLjCVyVJ3ZFLdPRmhDUp2zFmQe3RHIOsw24= @@ -580,8 +587,9 @@ go.uber.org/goleak v1.1.11/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0= -go.uber.org/multierr v1.6.0 h1:y6IPFStTAIT5Ytl7/XYmHvzXQ7S3g/IeZW9hyZ5thw4= go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU= +go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= +go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q= go.uber.org/zap v1.17.0/go.mod h1:MXVU+bhUf/A7Xi2HNOnopQOrmycQ5Ih87HtOu4q5SSo= go.uber.org/zap v1.21.0 h1:WefMeulhovoZ2sYXz7st6K0sLj7bBhpiFaud4r4zST8= @@ -963,8 +971,12 @@ google.golang.org/genproto v0.0.0-20210310155132-4ce2db91004e/go.mod h1:FWY/as6D google.golang.org/genproto v0.0.0-20210319143718-93e7006c17a6/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no= google.golang.org/genproto v0.0.0-20210402141018-6c239bbf2bb1/go.mod h1:9lPAdzaEmUacj36I+k7YKbEc5CXzPIeORRgDAUOu28A= google.golang.org/genproto v0.0.0-20210602131652-f16073e35f0c/go.mod h1:UODoCrxHCcBojKKwX1terBiRUaqAsFqJiF615XL43r0= -google.golang.org/genproto v0.0.0-20230306155012-7f2fa6fef1f4 h1:DdoeryqhaXp1LtT/emMP1BRJPHHKFi5akj/nbx/zNTA= -google.golang.org/genproto v0.0.0-20230306155012-7f2fa6fef1f4/go.mod h1:NWraEVixdDnqcqQ30jipen1STv2r/n24Wb7twVTGR4s= +google.golang.org/genproto v0.0.0-20240227224415-6ceb2ff114de h1:F6qOa9AZTYJXOUEr4jDysRDLrm4PHePlge4v4TGAlxY= +google.golang.org/genproto v0.0.0-20240227224415-6ceb2ff114de/go.mod h1:VUhTRKeHn9wwcdrk73nvdC9gF178Tzhmt/qyaFcPLSo= +google.golang.org/genproto/googleapis/api v0.0.0-20250825161204-c5933d9347a5 h1:BIRfGDEjiHRrk0QKZe3Xv2ieMhtgRGeLcZQ0mIVn4EY= +google.golang.org/genproto/googleapis/api v0.0.0-20250825161204-c5933d9347a5/go.mod h1:j3QtIyytwqGr1JUDtYXwtMXWPKsEa5LtzIFN1Wn5WvE= +google.golang.org/genproto/googleapis/rpc v0.0.0-20250825161204-c5933d9347a5 h1:eaY8u2EuxbRv7c3NiGK0/NedzVsCcV6hDuU5qPX5EGE= +google.golang.org/genproto/googleapis/rpc v0.0.0-20250825161204-c5933d9347a5/go.mod h1:M4/wBTSeyLxupu3W3tJtOgB14jILAS/XWPSSa3TAlJc= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38= google.golang.org/grpc v1.21.1/go.mod h1:oYelfM1adQP15Ek0mdvEgi9Df8B9CZIaU1084ijfRaM= diff --git a/plugins/grpc/interceptors/opentelemetry.go b/plugins/grpc/interceptors/opentelemetry.go deleted file mode 100644 index 2c2465d..0000000 --- a/plugins/grpc/interceptors/opentelemetry.go +++ /dev/null @@ -1,94 +0,0 @@ -//go:build grpc && otel -// +build grpc,otel - -package interceptors - -import ( - "context" - "path" - "strings" - - grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware" - "github.com/shoplineapp/go-app/plugins" - "github.com/shoplineapp/go-app/plugins/opentelemetry" - "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/trace" - "google.golang.org/grpc" - "google.golang.org/grpc/metadata" - "google.golang.org/grpc/status" -) - -func init() { - plugins.Registry = append(plugins.Registry, NewOtelInterceptor) -} - -type OtelInterceptor struct { - agent *opentelemetry.OtelAgent -} - -func (i OtelInterceptor) Handler() grpc.UnaryServerInterceptor { - customNewrelicInterceptor := func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) { - tracer := opentelemetry.GetTracer() - service := path.Dir(info.FullMethod)[1:] - if tracer == nil || service == "grpc.health.v1.Health" { - return handler(ctx, req) - } - - if v, ok := ctx.Value("trace_id").(string); ok && v != "" { - traceIDHex := strings.ReplaceAll(v, "-", "") - if tid, err := trace.TraceIDFromHex(traceIDHex); err == nil && tid.IsValid() { - spanContext := trace.SpanContextFromContext(ctx) - if !spanContext.IsValid() || spanContext.TraceID().String() != tid.String() { - sc := trace.NewSpanContext(trace.SpanContextConfig{ - TraceID: tid, - }) - ctx = trace.ContextWithSpanContext(ctx, sc) - } - } - } - - newCtx, span := tracer.Start(ctx, info.FullMethod) - - defer span.End() - - var attrs []attribute.KeyValue - - resp, err = handler(newCtx, req) - - if err != nil { - st, _ := status.FromError(err) - attrs = append(attrs, attribute.KeyValue{ - Key: "GrpcStatusMessage", - Value: attribute.StringValue(st.Message()), - }) - attrs = append(attrs, attribute.KeyValue{ - Key: "GrpcStatusCode", - Value: attribute.StringValue(st.Code().String()), - }) - span.RecordError(err) - } - - if md, ok := metadata.FromIncomingContext(ctx); ok { - for key, value := range md { - attrs = append(attrs, attribute.KeyValue{ - Key: attribute.Key(key), - Value: attribute.StringSliceValue(value), - }) - } - } - - span.SetAttributes(attrs...) - - return resp, err - } - - return grpc_middleware.ChainUnaryServer( - customNewrelicInterceptor, - ) -} - -func NewOtelInterceptor(agent *opentelemetry.OtelAgent) *OtelInterceptor { - return &OtelInterceptor{ - agent: agent, - } -} diff --git a/plugins/grpc/interceptors/recovery.go b/plugins/grpc/interceptors/recovery.go index eec988e..fd9f607 100644 --- a/plugins/grpc/interceptors/recovery.go +++ b/plugins/grpc/interceptors/recovery.go @@ -9,6 +9,7 @@ import ( "github.com/pkg/errors" "github.com/shoplineapp/go-app/plugins" app_grpc "github.com/shoplineapp/go-app/plugins/grpc" + "go.opentelemetry.io/otel/trace" "google.golang.org/grpc" "google.golang.org/grpc/codes" ) @@ -41,7 +42,10 @@ func (i RecoveryInterceptor) Handler() grpc.UnaryServerInterceptor { } // trace_id is captured into the ApplicationError for downstream // reporters (Sentry, structured logs) to attribute the panic. - traceID, _ := ctx.Value("trace_id").(string) + var traceID string + if sc := trace.SpanContextFromContext(ctx); sc.IsValid() { + traceID = sc.TraceID().String() + } err = app_grpc.NewApplicationError(traceID, err, codes.Internal, false, "panic recovered from RecoveryInterceptor") } }() diff --git a/plugins/grpc/interceptors/request_log.go b/plugins/grpc/interceptors/request_log.go index 47d79ba..6f37ca4 100644 --- a/plugins/grpc/interceptors/request_log.go +++ b/plugins/grpc/interceptors/request_log.go @@ -16,6 +16,7 @@ import ( "github.com/shoplineapp/go-app/plugins/env" "github.com/shoplineapp/go-app/plugins/logger" "github.com/sirupsen/logrus" + "go.opentelemetry.io/otel/trace" "google.golang.org/grpc" ) @@ -123,8 +124,13 @@ func (i RequestLogInterceptor) Handler() grpc.UnaryServerInterceptor { service := path.Dir(info.FullMethod)[1:] + var traceID string + if sc := trace.SpanContextFromContext(ctx); sc.IsValid() { + traceID = sc.TraceID().String() + } + log := i.logger.WithFields(logrus.Fields{ - "trace_id": ctx.Value("trace_id"), + "trace_id": traceID, "service": service, "method": path.Base(info.FullMethod), }) diff --git a/plugins/grpc/interceptors/trace_id.go b/plugins/grpc/interceptors/trace_id.go index 44325ea..c7093d0 100644 --- a/plugins/grpc/interceptors/trace_id.go +++ b/plugins/grpc/interceptors/trace_id.go @@ -1,15 +1,15 @@ -//go:build grpc -// +build grpc +//go:build grpc && otel +// +build grpc,otel package interceptors import ( "context" - "github.com/google/uuid" + "github.com/shoplineapp/go-app/common" "github.com/shoplineapp/go-app/plugins" + "go.opentelemetry.io/otel/trace" "google.golang.org/grpc" - "google.golang.org/grpc/metadata" ) func init() { @@ -19,24 +19,16 @@ func init() { type TraceIdInterceptor struct { } +// Handler bridges the OTel SpanContext (populated by otelgrpc.NewServerHandler +// which is installed as a gRPC StatsHandler on the server) into the legacy +// ctx["trace_id"] string key that downstream consumers (e.g. common.GetTraceID, +// plugins/pulsar/producer.go) still rely on. func (i TraceIdInterceptor) Handler() grpc.UnaryServerInterceptor { - return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) { - traceId := uuid.New().String() - if md, ok := metadata.FromIncomingContext(ctx); ok { - if v := md.Get("x-trace-id"); len(v) > 0 { - traceId = v[0] - } else if v := md.Get("trace_id"); len(v) > 0 { - traceId = v[0] - } + return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { + if sc := trace.SpanContextFromContext(ctx); sc.IsValid() { + ctx = common.NewContextWithTraceID(ctx, sc.TraceID().String()) } - - ctx = context.WithValue(ctx, "trace_id", traceId) - - grpc.SetHeader(ctx, metadata.Pairs("x-trace-id", traceId)) - - resp, err = handler(ctx, req) - - return resp, err + return handler(ctx, req) } } diff --git a/plugins/grpc/presets/grpc_with_error_reporting.go b/plugins/grpc/presets/grpc_with_error_reporting.go index b549aab..cfd0f64 100644 --- a/plugins/grpc/presets/grpc_with_error_reporting.go +++ b/plugins/grpc/presets/grpc_with_error_reporting.go @@ -37,7 +37,6 @@ func NewDefaultGrpcServerWithErrorReporting( requestLog *interceptors.RequestLogInterceptor, recovery *interceptors.RecoveryInterceptor, sentry *interceptors.SentryInterceptor, - otlp *interceptors.OtelInterceptor, ) *DefaultGrpcServerWithErrorReporting { s := *grpcServer plugin := &DefaultGrpcServerWithErrorReporting{ @@ -51,7 +50,6 @@ func NewDefaultGrpcServerWithErrorReporting( sentry.Handler(), deadline.Handler(), recovery.Handler(), - otlp.Handler(), } grpc_plugin.SetGlobalServerOptions( diff --git a/plugins/grpc/presets/grpc_with_sentry.go b/plugins/grpc/presets/grpc_with_sentry.go index 63398b0..87c5e2c 100644 --- a/plugins/grpc/presets/grpc_with_sentry.go +++ b/plugins/grpc/presets/grpc_with_sentry.go @@ -37,7 +37,6 @@ func NewDefaultGrpcServerWithSentry( requestLog *interceptors.RequestLogInterceptor, recovery *interceptors.RecoveryInterceptor, sentry *interceptors.SentryInterceptor, - otlp *interceptors.OtelInterceptor, ) *DefaultGrpcServerWithSentry { s := *grpcServer plugin := &DefaultGrpcServerWithSentry{ @@ -51,7 +50,6 @@ func NewDefaultGrpcServerWithSentry( sentry.Handler(), deadline.Handler(), recovery.Handler(), - otlp.Handler(), } grpc_plugin.SetGlobalServerOptions( diff --git a/plugins/kitex/kitex.go b/plugins/kitex/kitex.go index 86a5cb2..5af3f35 100644 --- a/plugins/kitex/kitex.go +++ b/plugins/kitex/kitex.go @@ -1,5 +1,5 @@ -//go:build kitex -// +build kitex +//go:build kitex && otel +// +build kitex,otel package kitex @@ -14,6 +14,7 @@ import ( "github.com/cloudwego/kitex/pkg/endpoint" "github.com/cloudwego/kitex/server" kitex_server "github.com/cloudwego/kitex/server" + "github.com/kitex-contrib/obs-opentelemetry/tracing" "github.com/shoplineapp/go-app/plugins" "github.com/shoplineapp/go-app/plugins/env" "github.com/shoplineapp/go-app/plugins/kitex/middlewares" @@ -32,6 +33,7 @@ type KitexServer struct { wg *sync.WaitGroup server server.Server middlewares []endpoint.Middleware + suites []server.Suite kitexExit chan error } @@ -54,6 +56,12 @@ func (s *KitexServer) Configure(initializer func(opts ...kitex_server.Option) ki }), } + if s.suites != nil { + for _, suite := range s.suites { + options = append(options, kitex_server.WithSuite(suite)) + } + } + if s.middlewares != nil { for _, middleware := range s.middlewares { options = append(options, kitex_server.WithMiddleware(middleware)) @@ -70,6 +78,12 @@ func (s *KitexServer) SetMiddlewares(middlewares []endpoint.Middleware) { s.middlewares = middlewares } +// SetSuites attaches Kitex server.Suite options (e.g. OpenTelemetry tracing). +// The suites are applied in order after middlewares when Configure is called. +func (s *KitexServer) SetSuites(suites []server.Suite) { + s.suites = suites +} + func (s *KitexServer) RegisterGracefullyShutdown(lc fx.Lifecycle) { s.wg = &sync.WaitGroup{} @@ -120,6 +134,9 @@ func NewKitexServer( requestLogMiddleware.Handler, deadlineMiddleware.Handler, }, + suites: []server.Suite{ + tracing.NewServerSuite(), + }, } plugin.RegisterGracefullyShutdown(lc) return plugin diff --git a/plugins/kitex/middlewares/request_log.go b/plugins/kitex/middlewares/request_log.go index 385b0ca..f0bd3e7 100644 --- a/plugins/kitex/middlewares/request_log.go +++ b/plugins/kitex/middlewares/request_log.go @@ -9,6 +9,7 @@ import ( "github.com/shoplineapp/go-app/plugins" "github.com/shoplineapp/go-app/plugins/logger" "github.com/sirupsen/logrus" + "go.opentelemetry.io/otel/trace" ) func init() { @@ -22,9 +23,13 @@ type KitexRequestLogMiddleware struct { func (m KitexRequestLogMiddleware) Handler(next endpoint.Endpoint) endpoint.Endpoint { return func(ctx context.Context, request, response interface{}) error { ri := rpcinfo.GetRPCInfo(ctx) + var traceID string + if sc := trace.SpanContextFromContext(ctx); sc.IsValid() { + traceID = sc.TraceID().String() + } logger := logrus.WithFields(logrus.Fields{ "Method": ri.To().Method(), - "trace_id": ctx.Value("trace_id"), + "trace_id": traceID, }) ctx = context.WithValue(ctx, "logger", logger) diff --git a/plugins/kitex/middlewares/trace_id.go b/plugins/kitex/middlewares/trace_id.go index 86e1660..0c30644 100644 --- a/plugins/kitex/middlewares/trace_id.go +++ b/plugins/kitex/middlewares/trace_id.go @@ -1,13 +1,15 @@ +//go:build kitex && otel +// +build kitex,otel + package middlewares import ( "context" "github.com/cloudwego/kitex/pkg/endpoint" - "github.com/google/uuid" + "github.com/shoplineapp/go-app/common" "github.com/shoplineapp/go-app/plugins" - "google.golang.org/grpc" - "google.golang.org/grpc/metadata" + "go.opentelemetry.io/otel/trace" ) func init() { @@ -17,19 +19,16 @@ func init() { type KitexTraceIDMiddleware struct { } +// Handler bridges the OTel SpanContext (populated by +// kitex-contrib/obs-opentelemetry's tracing.NewServerSuite) into the legacy +// ctx["trace_id"] string key that downstream consumers (e.g. common.GetTraceID, +// plugins/pulsar/producer.go) still rely on. func (m KitexTraceIDMiddleware) Handler(next endpoint.Endpoint) endpoint.Endpoint { return func(ctx context.Context, request, response interface{}) error { - traceId := uuid.New().String() - if md, ok := metadata.FromIncomingContext(ctx); ok { - if v := md.Get("x-trace-id"); len(v) > 0 { - traceId = v[0] - } + if sc := trace.SpanContextFromContext(ctx); sc.IsValid() { + ctx = common.NewContextWithTraceID(ctx, sc.TraceID().String()) } - - ctx = context.WithValue(ctx, "trace_id", traceId) - grpc.SetHeader(ctx, metadata.Pairs("x-trace-id", traceId)) - err := next(ctx, request, response) - return err + return next(ctx, request, response) } }