feat: add snmptrapreceiver and webhookexporter custom OTel components #104
Copilot wants to merge 4 commits.
- receiver/snmptrapreceiver: listens for SNMP v1/v2c/v3 traps over UDP, decodes packets via gosnmp, emits one plog.LogRecord per trap with structured attributes and JSON body, size-guards the varbind list
- exporter/webhookexporter: consumes plog.Logs from the pipeline and POSTs them to a configurable HTTP endpoint in single or batch mode, supports custom headers, TLS, and retryable/permanent error classification
- internal/shared/components_base.go: registers both new factories
- configs/snmptrap-col-config.yaml: example collector configuration
- go.mod: promotes gosnmp to direct dependency

Agent-Logs-Url: https://github.com/kloudmate/km-agent/sessions/1d8d5e08-f27b-489f-b4b6-0862b66082a8
Co-authored-by: amitava82 <1279530+amitava82@users.noreply.github.com>
@ansh-devs this is ready for review
Pull request overview
Adds two custom OpenTelemetry Collector components to the km-agent distribution: an SNMP trap logs receiver (snmptrap) and an HTTP webhook logs exporter (webhook), plus registration and an example Collector config to route traps to OTLP and a webhook.
Changes:
- Introduces `receiver/snmptrapreceiver` to listen for SNMP traps over UDP, normalize them, and emit one OTel `plog.LogRecord` per trap.
- Introduces `exporter/webhookexporter` to flatten `plog.Logs` and POST JSON payloads to a configurable HTTP endpoint.
- Registers the new factories in the shared component set and adds an example pipeline configuration.
Reviewed changes
Copilot reviewed 14 out of 15 changed files in this pull request and generated 11 comments.
| File | Description |
|---|---|
| receiver/snmptrapreceiver/types.go | Defines the internal normalized SNMP trap model (TrapEvent, TrapVarbind). |
| receiver/snmptrapreceiver/receiver.go | Implements UDP trap listener start/shutdown and forwarding into the logs pipeline. |
| receiver/snmptrapreceiver/mapper.go | Normalizes SNMP packets and varbind values into internal structures. |
| receiver/snmptrapreceiver/logs.go | Maps TrapEvent into OTel logs (resource/scope/record, attrs, body). |
| receiver/snmptrapreceiver/config.go | Adds receiver config + validation helpers. |
| receiver/snmptrapreceiver/factory.go | Adds receiver factory and default config. |
| exporter/webhookexporter/payload.go | Flattens OTel logs into outbound JSON record structures. |
| exporter/webhookexporter/exporter.go | Implements the HTTP delivery logic and HTTP error classification. |
| exporter/webhookexporter/config.go | Adds exporter configuration and validation. |
| exporter/webhookexporter/factory.go | Adds exporter factory and default config. |
| exporter/webhookexporter/client.go | Adds TLS/transport helpers (currently unused). |
| internal/shared/components_base.go | Registers snmptrap receiver and webhook exporter factories. |
| configs/snmptrap-col-config.yaml | Example Collector pipeline wiring traps to OTLP and webhook. |
| go.mod / go.sum | Adds gosnmp dependency and updates module requirements/sums. |
```go
tl := gosnmp.NewTrapListener()
tl.Params = gosnmp.Default
tl.Params.Logger = gosnmp.NewLogger(&zapGosnmpLogger{logger: r.logger})
tl.OnNewTrap = r.handleTrap
```
tl.Params = gosnmp.Default reuses and mutates the global gosnmp.Default instance (you then set tl.Params.Logger). This can have side effects on any other code using gosnmp.Default in-process. Also, receiver config fields like version, community, transport, and v3 are never applied to the listener params or used to filter/validate incoming traps, so the receiver cannot actually honor its configuration.
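The aliasing hazard described here can be sketched without the SNMP library. The `params` type and `defaultParams` variable below are hypothetical stand-ins for a library's parameter struct and its package-level default; they are not the gosnmp API. The point is only that assigning a shared pointer and then writing through it changes the default for every other caller, while a copy keeps the write local.

```go
package main

import "fmt"

// params is a hypothetical stand-in for a listener's parameter struct.
type params struct {
	Community string
	Logger    string
}

// defaultParams mimics a package-level default shared by all callers.
var defaultParams = &params{Community: "public"}

// sharedParams returns the global pointer itself, so any write through it
// is visible to every other user of defaultParams.
func sharedParams() *params { return defaultParams }

// freshParams returns a pointer to a shallow copy, so writes stay local.
// Pointer or slice fields inside the struct would still be shared.
func freshParams() *params {
	p := *defaultParams
	return &p
}

func main() {
	a := freshParams()
	a.Logger = "receiver-logger"
	fmt.Printf("after fresh copy: default logger = %q\n", defaultParams.Logger)

	b := sharedParams()
	b.Logger = "receiver-logger"
	fmt.Printf("after shared use: default logger = %q\n", defaultParams.Logger)
}
```

In the receiver, the equivalent fix would be to build a new parameter value per listener and then apply the configured version, community, and v3 settings to it.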
```go
// isClosedConnErr returns true if the error is a non-timeout net.Error, which is
// the signature of a "use of closed network connection" error produced when the
// listener is stopped cleanly.
func isClosedConnErr(err error) bool {
	if err == nil {
		return false
	}
	var netErr net.Error
	if errors.As(err, &netErr) && !netErr.Timeout() {
		return true
	}
	return false
```
isClosedConnErr currently returns true for any non-timeout net.Error. That will suppress logging of real failures (e.g., bind permission denied / address already in use) because those errors often satisfy net.Error with Timeout()==false. Consider checking errors.Is(err, net.ErrClosed) (or matching the specific closed-connection message) instead of treating all net.Error values as clean shutdowns.
Suggested change:

```diff
-// isClosedConnErr returns true if the error is a non-timeout net.Error, which is
-// the signature of a "use of closed network connection" error produced when the
-// listener is stopped cleanly.
-func isClosedConnErr(err error) bool {
-	if err == nil {
-		return false
-	}
-	var netErr net.Error
-	if errors.As(err, &netErr) && !netErr.Timeout() {
-		return true
-	}
-	return false
+// isClosedConnErr returns true if the error indicates the network connection
+// was closed as part of a clean listener shutdown.
+func isClosedConnErr(err error) bool {
+	if err == nil {
+		return false
+	}
+	return errors.Is(err, net.ErrClosed)
```
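To see why the two checks differ, the sketch below runs both against simulated errors. The errors are built by hand to have the same shape as those the `net` package returns (`*net.OpError` wrapping an inner cause); the inner "address already in use" error is a plain placeholder, not a real syscall error, so this illustrates the classification logic rather than reproducing an actual bind failure.

```go
package main

import (
	"errors"
	"fmt"
	"net"
)

// oldCheck reproduces the logic under review: any non-timeout net.Error
// is treated as a clean shutdown.
func oldCheck(err error) bool {
	var netErr net.Error
	return errors.As(err, &netErr) && !netErr.Timeout()
}

// newCheck is the suggested replacement: only errors wrapping net.ErrClosed
// count as a clean shutdown. errors.Is returns false for a nil error.
func newCheck(err error) bool {
	return errors.Is(err, net.ErrClosed)
}

// simulatedClosedErr mimics the error from reading a closed socket.
func simulatedClosedErr() error {
	return &net.OpError{Op: "read", Net: "udp", Err: net.ErrClosed}
}

// simulatedBindErr mimics a bind failure; the inner error is a placeholder.
func simulatedBindErr() error {
	return &net.OpError{Op: "listen", Net: "udp", Err: errors.New("address already in use")}
}

func main() {
	for name, err := range map[string]error{
		"closed": simulatedClosedErr(),
		"bind":   simulatedBindErr(),
	} {
		fmt.Printf("%-6s old=%v new=%v\n", name, oldCheck(err), newCheck(err))
	}
}
```

Under the old check both errors are swallowed; under the new one only the closed-connection error is.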
```go
// Validate checks that the configuration is self-consistent.
func (c *Config) Validate() error {
	if c.Endpoint == "" {
		return errors.New("endpoint is required")
	}

	// Parse the address out of "udp://host:port" or plain "host:port".
	addr := c.Endpoint
	if strings.HasPrefix(addr, "udp://") {
		addr = strings.TrimPrefix(addr, "udp://")
	}
	if addr == "" {
		return errors.New("endpoint address must not be empty")
	}

	allowed := map[string]bool{"1": true, "2c": true, "3": true, "auto": true}
	if !allowed[c.Version] {
		return fmt.Errorf("version must be one of 1, 2c, 3, auto; got %q", c.Version)
	}
```
Config validation doesn't enforce the documented transport constraint (only UDP supported) and doesn't validate that the endpoint is a valid host:port (it only strips a udp:// prefix). This means invalid endpoints/transport values can pass Validate() but fail later at Start(). It would be better to validate transport and run net.SplitHostPort (after stripping scheme) inside Validate() to fail fast with a clear error.
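A minimal sketch of the fail-fast validation suggested here, using only the standard library. The function name `validateListenAddr` and its two-argument shape are assumptions for illustration; the PR's `Config` struct is not reproduced.

```go
package main

import (
	"errors"
	"fmt"
	"net"
	"strconv"
	"strings"
)

// validateListenAddr is a hypothetical helper that checks the transport and
// the host:port form of an endpoint before any socket is opened.
func validateListenAddr(transport, endpoint string) error {
	if strings.ToLower(transport) != "udp" {
		return fmt.Errorf("transport must be \"udp\"; got %q", transport)
	}
	// Accept both "udp://host:port" and plain "host:port".
	addr := strings.TrimPrefix(endpoint, "udp://")
	if addr == "" {
		return errors.New("endpoint address must not be empty")
	}
	_, portStr, err := net.SplitHostPort(addr)
	if err != nil {
		return fmt.Errorf("endpoint must be host:port: %w", err)
	}
	port, err := strconv.Atoi(portStr)
	if err != nil || port < 1 || port > 65535 {
		return fmt.Errorf("endpoint port must be in 1..65535; got %q", portStr)
	}
	return nil
}

func main() {
	for _, ep := range []string{"udp://0.0.0.0:162", "0.0.0.0", "localhost:99999"} {
		fmt.Printf("%-20s -> %v\n", ep, validateListenAddr("udp", ep))
	}
}
```

Note that `net.SplitHostPort` accepts an empty host (for example `:162`), which is a valid "all interfaces" listen address.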
```go
a := lr.Attributes()

a.PutStr("snmp.version", evt.Version)
a.PutStr("snmp.community", evt.Community)
```
The receiver always emits snmp.community as a log attribute. SNMP community strings are effectively credentials; emitting them into logs risks leaking secrets to downstream systems. Consider omitting this attribute by default and/or adding an explicit opt-in config to include/redact it.
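The opt-in behaviour can be sketched with a plain map standing in for the log record's attributes. The `trapAttrs` function and its `includeCommunity` parameter are hypothetical; the real code would write to the pdata attribute map instead.

```go
package main

import "fmt"

// trapAttrs builds the attribute set for one trap. The community string is
// added only when the caller has explicitly opted in.
func trapAttrs(version, community string, includeCommunity bool) map[string]string {
	attrs := map[string]string{"snmp.version": version}
	if includeCommunity {
		attrs["snmp.community"] = community
	}
	return attrs
}

func main() {
	fmt.Println(trapAttrs("2c", "s3cret", false))
	fmt.Println(trapAttrs("2c", "s3cret", true))
}
```

Defaulting the flag to false means an operator has to make a deliberate choice before the credential leaves the collector.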
```go
	lr.Attributes().PutBool("snmp.payload_truncated", true)
	lr.Attributes().PutInt("snmp.original_varbind_count", int64(originalCount))
	lr.Attributes().PutInt("snmp.dropped_varbind_count", int64(dropped))
}

// Set body as a string-valued map in the pdata body.
lr.Body().SetStr(string(raw))
```
The PR description and this file’s comments describe a structured JSON body, but the implementation sets the body to a JSON-encoded string (lr.Body().SetStr(string(raw))). This will cause downstream consumers (including webhookexporter) to see body as a string rather than an object. If you want a structured body, populate a pcommon.Value map (or set AnyValue fields) instead of serializing to a string; otherwise update the docs/comments accordingly.
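The downstream effect of a string body versus a structured one shows up as soon as the record is re-serialised. The sketch below uses only `encoding/json`; the `record` type and the `trap_oid` key are hypothetical, chosen to mimic an exporter that embeds the body in its own JSON envelope.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// record is a hypothetical outbound shape. Body is `any` so it can hold
// either a pre-serialised string or a structured map.
type record struct {
	Body any `json:"body"`
}

// encode serialises a record carrying the given body.
func encode(body any) string {
	out, err := json.Marshal(record{Body: body})
	if err != nil {
		panic(err)
	}
	return string(out)
}

func main() {
	trap := map[string]any{"trap_oid": "1.3.6.1.6.3.1.1.5.3"}
	raw, _ := json.Marshal(trap)

	// String body: the inner JSON is escaped and must be parsed twice.
	fmt.Println(encode(string(raw)))
	// Structured body: the consumer receives a nested object directly.
	fmt.Println(encode(trap))
}
```

A receiver of the first form has to decode the `body` field a second time, which is the mismatch the comment points out.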
```go
// extractVarbinds converts gosnmp PDU variables to TrapVarbind slice.
func extractVarbinds(vars []gosnmp.SnmpPDU, version string) []TrapVarbind {
	out := make([]TrapVarbind, 0, len(vars))
	for _, v := range vars {
		oid := normalizeOID(v.Name)
		// Skip the standard header varbinds for v2c/v3 – they are already decoded
		// into TrapEvent fields; include them in v1 where they are user payload.
		if version != "1" && (oid == snmpTrapOIDObject || oid == sysUpTimeObject) {
			continue
		}
		vb := TrapVarbind{
			OID:  oid,
			Name: oid, // No MIB resolution in this implementation; OID is used as the name.
			Type: asn1BERName(v.Type),
		}
		vb.Value, vb.StringValue = normalizeValue(v)
		out = append(out, vb)
```
emit_raw_varbinds is exposed in config, but varbind extraction always uses numeric OIDs and always includes the raw Value in the body payload; the flag is never consulted. Either wire this config through (e.g., pass cfg into extractVarbinds and conditionally include raw OIDs/values vs. only stringified/sanitized fields) or remove the unused setting to avoid a misleading config surface.
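One way the flag could be honoured is sketched below. The `varbind` type, the `varbindBody` function, and the output key names are all assumptions made for illustration; they are not taken from the PR's types.

```go
package main

import "fmt"

// varbind is a hypothetical, reduced form of a decoded SNMP variable binding.
type varbind struct {
	OID         string
	Value       any
	StringValue string
}

// varbindBody renders one varbind for the log body. The typed raw value is
// included only when emitRaw is set; the string form is always present.
func varbindBody(vb varbind, emitRaw bool) map[string]any {
	m := map[string]any{"oid": vb.OID, "string_value": vb.StringValue}
	if emitRaw {
		m["value"] = vb.Value
	}
	return m
}

func main() {
	vb := varbind{OID: "1.3.6.1.2.1.2.2.1.1", Value: 7, StringValue: "7"}
	fmt.Println(varbindBody(vb, false))
	fmt.Println(varbindBody(vb, true))
}
```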
```go
// ConsumeLogs serialises and delivers the provided log records to the
// configured webhook endpoint.
func (e *webhookExporter) ConsumeLogs(ctx context.Context, logs plog.Logs) error {
	records := flattenLogs(logs)
	if len(records) == 0 {
		return nil
	}

	if e.cfg.PayloadMode == "single" {
		for _, rec := range records {
			payload, err := json.Marshal(buildBatchPayload([]outboundRecord{rec}))
			if err != nil {
				return consumererror.NewPermanent(fmt.Errorf("webhookexporter: failed to serialise record: %w", err))
			}
			if err := e.send(ctx, payload); err != nil {
				return err
			}
		}
		return nil
	}

	// batch mode
	batches := buildBatches(records, e.cfg.MaxBatchSize)
	for _, batch := range batches {
		payload, err := json.Marshal(buildBatchPayload(batch))
		if err != nil {
			return consumererror.NewPermanent(fmt.Errorf("webhookexporter: failed to serialise batch: %w", err))
		}
		if len(payload) > e.cfg.MaxPayloadBytes {
			e.logger.Warn("webhookexporter: payload exceeds max_payload_bytes; sending anyway",
				zap.Int("payload_bytes", len(payload)),
				zap.Int("max_payload_bytes", e.cfg.MaxPayloadBytes))
		}
		if err := e.send(ctx, payload); err != nil {
			return err
		}
	}
	return nil
```
RetryOnFailure is part of the config surface (and the example YAML), but it is never used to implement any retry/backoff behavior. Right now retryable HTTP statuses just return an error and rely on upstream behavior. Either remove the config fields or wrap this exporter with the Collector’s helper retry/queue utilities so the advertised retry settings are actually honored.
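If the exporter were to implement retries itself rather than rely on the Collector's helpers, the loop might look roughly like the following. This is a standard-library sketch only: `retrySettings`, `retryWithBackoff`, `errPermanent`, `flaky`, and `demo` are hypothetical names, and the field set is a simplification, not the Collector's retry configuration type.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// retrySettings is a hypothetical, trimmed-down retry configuration.
type retrySettings struct {
	InitialInterval time.Duration
	MaxInterval     time.Duration
	MaxAttempts     int
}

// errPermanent marks failures that must not be retried (for example a 4xx reply).
var errPermanent = errors.New("permanent failure")

// retryWithBackoff calls send until it succeeds, fails permanently, runs out
// of attempts, or ctx is cancelled. It returns the number of attempts made.
func retryWithBackoff(ctx context.Context, rs retrySettings, send func(context.Context) error) (int, error) {
	attempts := rs.MaxAttempts
	if attempts < 1 {
		attempts = 1
	}
	delay := rs.InitialInterval
	for n := 1; ; n++ {
		err := send(ctx)
		if err == nil || errors.Is(err, errPermanent) || n == attempts {
			return n, err
		}
		select {
		case <-ctx.Done():
			return n, ctx.Err()
		case <-time.After(delay):
		}
		// Double the delay after each failure, capped at MaxInterval.
		delay *= 2
		if delay > rs.MaxInterval {
			delay = rs.MaxInterval
		}
	}
}

// flaky returns a sender that fails with a retryable error `failures` times
// and then succeeds.
func flaky(failures int) func(context.Context) error {
	return func(context.Context) error {
		if failures > 0 {
			failures--
			return errors.New("503 service unavailable")
		}
		return nil
	}
}

// demo runs retryWithBackoff against a flaky sender with millisecond delays.
func demo(failures, maxAttempts int) (int, error) {
	rs := retrySettings{InitialInterval: time.Millisecond, MaxInterval: 4 * time.Millisecond, MaxAttempts: maxAttempts}
	return retryWithBackoff(context.Background(), rs, flaky(failures))
}

func main() {
	n, err := demo(2, 5)
	fmt.Printf("attempts=%d err=%v\n", n, err)
}
```

A production version would normally add jitter to the delay; that is left out here to keep the sketch short.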
```go
		payload, err := json.Marshal(buildBatchPayload(batch))
		if err != nil {
			return consumererror.NewPermanent(fmt.Errorf("webhookexporter: failed to serialise batch: %w", err))
		}
		if len(payload) > e.cfg.MaxPayloadBytes {
			e.logger.Warn("webhookexporter: payload exceeds max_payload_bytes; sending anyway",
				zap.Int("payload_bytes", len(payload)),
				zap.Int("max_payload_bytes", e.cfg.MaxPayloadBytes))
		}
		if err := e.send(ctx, payload); err != nil {
			return err
		}
```
max_payload_bytes is treated as advisory only: when exceeded you log a warning and still send the oversized payload. This contradicts the config name and the comment in payload.go about enforcing byte-level splitting during serialization. Consider splitting batches further until under the limit, or returning a (permanent) error when the configured cap cannot be met.
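Enforcing the cap as a hard limit could be done with a single pass that marshals each record once and starts a new batch when the running size would exceed the limit. The sketch below assumes the payload is a bare JSON array of records; the PR wraps batches in an envelope via `buildBatchPayload`, whose overhead would have to be added to the size accounting. `partitionByBytes` and `encodedLen` are hypothetical names.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// partitionByBytes groups consecutive records so that each group, encoded as
// a JSON array, is at most maxBytes long. Records that cannot fit even alone
// are skipped and counted in the second return value.
func partitionByBytes(records []map[string]any, maxBytes int) ([][]json.RawMessage, int, error) {
	var batches [][]json.RawMessage
	var cur []json.RawMessage
	oversized := 0
	size := 2 // the enclosing "[" and "]"
	for _, r := range records {
		b, err := json.Marshal(r)
		if err != nil {
			return nil, 0, err
		}
		if len(b)+2 > maxBytes {
			oversized++
			continue
		}
		add := len(b)
		if len(cur) > 0 {
			add++ // separating comma
		}
		if size+add > maxBytes {
			// Current batch is full: flush it and start a new one.
			batches = append(batches, cur)
			cur, size, add = nil, 2, len(b)
		}
		cur = append(cur, b)
		size += add
	}
	if len(cur) > 0 {
		batches = append(batches, cur)
	}
	return batches, oversized, nil
}

// encodedLen returns the serialised size of a batch, or -1 on error.
func encodedLen(batch []json.RawMessage) int {
	out, err := json.Marshal(batch)
	if err != nil {
		return -1
	}
	return len(out)
}

func main() {
	recs := []map[string]any{{"a": "xxxxxxxx"}, {"b": "yyyyyyyy"}, {"c": "zzzzzzzz"}}
	batches, oversized, _ := partitionByBytes(recs, 35)
	for i, b := range batches {
		fmt.Printf("batch %d: %d records, %d bytes\n", i, len(b), encodedLen(b))
	}
	fmt.Println("oversized:", oversized)
}
```

What to do with a record that exceeds the limit on its own (drop, truncate, or return a permanent error) is a policy decision this sketch leaves to the caller.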
```go
}

// buildBatches splits records into batches respecting maxCount.
// Byte-level splitting is enforced during serialisation in the exporter.
```
Comment says “Byte-level splitting is enforced during serialisation in the exporter”, but the exporter currently only warns when max_payload_bytes is exceeded and sends anyway. Update the comment or implement the byte-level enforcement so behavior matches the documentation/config.
Suggested change:

```diff
-// Byte-level splitting is enforced during serialisation in the exporter.
+// It does not enforce any serialized byte-size limit.
```
```go
import (
	"crypto/tls"
	"net/http"
)

// tlsConfig returns a *tls.Config for the HTTP transport.
func tlsConfig(insecureSkipVerify bool) *tls.Config {
	return &tls.Config{
		InsecureSkipVerify: insecureSkipVerify, // #nosec G402
		MinVersion:         tls.VersionTLS12,
	}
}

// newHTTPTransport builds an http.RoundTripper with the provided TLS config.
func newHTTPTransport(cfg *tls.Config) http.RoundTripper {
	return &http.Transport{
		TLSClientConfig: cfg,
	}
}
```
tlsConfig / newHTTPTransport are currently unused (the exporter constructs its own http.Transport inline). Either use these helpers from Start() or remove this file to avoid dead code and duplicated TLS setup logic.
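Wiring the helpers into client construction might look like the sketch below. The two helpers are copied from the hunk above; `newHTTPClient` and `clientMinTLS` are hypothetical additions, and the timeout-in-seconds parameter is an assumption made to keep the example small.

```go
package main

import (
	"crypto/tls"
	"fmt"
	"net/http"
	"time"
)

// tlsConfig returns a *tls.Config for the HTTP transport.
func tlsConfig(insecureSkipVerify bool) *tls.Config {
	return &tls.Config{
		InsecureSkipVerify: insecureSkipVerify, // #nosec G402
		MinVersion:         tls.VersionTLS12,
	}
}

// newHTTPTransport builds an http.RoundTripper with the provided TLS config.
func newHTTPTransport(cfg *tls.Config) http.RoundTripper {
	return &http.Transport{TLSClientConfig: cfg}
}

// newHTTPClient (hypothetical) builds the exporter's client from the helpers
// so the TLS setup lives in exactly one place.
func newHTTPClient(insecureSkipVerify bool, timeoutSeconds int) *http.Client {
	return &http.Client{
		Transport: newHTTPTransport(tlsConfig(insecureSkipVerify)),
		Timeout:   time.Duration(timeoutSeconds) * time.Second,
	}
}

// clientMinTLS (hypothetical) reports the minimum TLS version configured on
// the client's transport, or 0 if it cannot be determined.
func clientMinTLS(c *http.Client) uint16 {
	if t, ok := c.Transport.(*http.Transport); ok && t.TLSClientConfig != nil {
		return t.TLSClientConfig.MinVersion
	}
	return 0
}

func main() {
	c := newHTTPClient(false, 10)
	fmt.Printf("min TLS version: %#x, timeout: %s\n", clientMinTLS(c), c.Timeout)
}
```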
receiver/snmptrapreceiver:
- config.go: validate transport (must be "udp") and host:port format in Validate() instead of deferring to Start(); add IncludeCommunity opt-in field
- receiver.go: create fresh GoSNMP params instead of mutating gosnmp.Default; wire version/community/v3 config to listener; implement snmpVersionFromConfig and applyV3Params helpers; fix isClosedConnErr to use errors.Is(net.ErrClosed)
- logs.go: emit snmp.community only when include_community:true; set body as structured pcommon.Map instead of JSON string; wire emit_raw_varbinds through body builder (typed value vs string representation)
- mapper.go: correct misleading "last two OID components" comment

exporter/webhookexporter:
- exporter.go: implement sendWithRetry with exponential back-off using RetryOnFailure config; sub-split batches by bytes (splitByBytes) to enforce max_payload_bytes as a hard limit; use client.go helpers instead of inlining TLS config
- payload.go: add splitByBytes; fix comment about byte-level enforcement
- client.go: now used by exporter.go (no longer dead code)

docs:
- receiver/snmptrapreceiver/README.md: comprehensive doc (config, schema, OTel mapping, body format, varbind types, pipeline integration, SNMPv3, security)
- exporter/webhookexporter/README.md: comprehensive doc (config, payload format, splitting, retry, auth, security, performance, error handling)
- configs/snmptrap-col-config.yaml: add transport and include_community fields

Agent-Logs-Url: https://github.com/kloudmate/km-agent/sessions/d3d5d8f8-5ca8-472a-bf08-05a540e36af0
Co-authored-by: amitava82 <1279530+amitava82@users.noreply.github.com>
- go.mod: restore require lines for km_classifier and ebpfreceiver that components_linux.go and components_kubernetes.go still depend on. The previous removal broke the Linux and Kubernetes builds.
- snmptrap receiver: decouple the gosnmp callback from ConsumeLogs with a bounded queue + worker goroutine; configurable via queue_size. Drops are counted atomically and logged with throttling.
- snmptrap receiver: gate SNMPv3 USM params strictly on version == "3" so v1/v2c traps under "auto" are no longer rejected.
- snmptrap receiver: replace the O(N^2) body size guard with a single-pass per-varbind estimator (selectVarbindsForBudget).
- webhookexporter: replace the O(N^2) splitByBytes with an O(N) pass that marshals each record once and partitions by accumulated size.
- webhookexporter: rename component type "webhook" -> "km_webhook" to avoid collision with any future upstream component; delete the unused capability() helper.
- Config validation tightened: in-place transport normalisation, support for udp4:// / udp6:// scheme prefixes, end-to-end SNMPv3 USM checks.
- mapper: add OpaqueFloat / OpaqueDouble / BitString to asn1BERName and normalizeValue; use strconv.Atoi instead of fmt.Sscanf in splitAddr.
- Docs: KloudMate Incident webhook integration section + a standalone snmptrap-incident-config.yaml showing the transform/incident OTTL recipe for the six generic SNMP traps (severity, status, dedup_key).
- Tests: config / mapper / logs / payload coverage for both packages.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
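The bounded queue with a worker goroutine and atomic drop counting mentioned in this commit can be sketched as follows. This is a generic illustration of the pattern, not the receiver's code: `trapQueue`, `offer`, and `close` are hypothetical names, events are plain strings, and the throttled drop logging is omitted.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// trapQueue decouples a producer callback from a slow consumer.
type trapQueue struct {
	ch      chan string
	dropped atomic.Int64
	wg      sync.WaitGroup
}

// newTrapQueue starts one worker that feeds queued events to consume.
func newTrapQueue(size int, consume func(string)) *trapQueue {
	q := &trapQueue{ch: make(chan string, size)}
	q.wg.Add(1)
	go func() {
		defer q.wg.Done()
		for evt := range q.ch {
			consume(evt)
		}
	}()
	return q
}

// offer never blocks: if the buffer is full the event is dropped and counted.
// It must not be called after close.
func (q *trapQueue) offer(evt string) bool {
	select {
	case q.ch <- evt:
		return true
	default:
		q.dropped.Add(1)
		return false
	}
}

// close stops accepting events, waits for the worker to drain the buffer,
// and returns the total number of dropped events.
func (q *trapQueue) close() int64 {
	close(q.ch)
	q.wg.Wait()
	return q.dropped.Load()
}

func main() {
	gate := make(chan struct{}) // holds the consumer back to force drops
	q := newTrapQueue(2, func(string) { <-gate })
	accepted := 0
	for i := 0; i < 10; i++ {
		if q.offer("trap") {
			accepted++
		}
	}
	close(gate)
	fmt.Printf("accepted=%d dropped=%d\n", accepted, q.close())
}
```

The trade-off is deliberate: under sustained overload the listener keeps reading from the socket and sheds load visibly instead of stalling inside the trap callback.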
@ansh-devs please review and test when you get a chance. We need to release this for testing with some users.
Review fixes
- `config.go`: validate `transport` (must be "udp") and `host:port` format inside `Validate()`; add `include_community` opt-in field
- `receiver.go`: create fresh `GoSNMP` params (never mutate `gosnmp.Default`); wire `version`/`community`/v3 config to listener params; add `snmpVersionFromConfig` + `applyV3Params`; fix `isClosedConnErr` to use `errors.Is(err, net.ErrClosed)`
- `logs.go`: `snmp.community` emitted only when `include_community: true`; body set as structured `pcommon.Map` (not JSON string); `emit_raw_varbinds` wired through body builder
- `exporter.go`: retry with exponential back-off using `RetryOnFailure` config; enforce `max_payload_bytes` via `splitByBytes` (hard limit, not just a warning); use `client.go` TLS helpers
- `payload.go`: add `splitByBytes`; fix comment
- `client.go`: now used by `exporter.go` (no longer dead code)

Documentation
- `receiver/snmptrapreceiver/README.md`: comprehensive doc covering config reference, validation, OTel log schema, body format, varbind types, v3, security, performance, error handling, local development
- `exporter/webhookexporter/README.md`: comprehensive doc covering config reference, payload format, batch splitting, retry, payload modes comparison, auth, security, performance, error handling

Validation
- `go build ./receiver/snmptrapreceiver/... ./exporter/webhookexporter/...` passes
- `go vet ./receiver/snmptrapreceiver/... ./exporter/webhookexporter/...` passes