From 209bb8c6a110d9486d2049616d1acecb6d886211 Mon Sep 17 00:00:00 2001 From: Bo Blanton Date: Fri, 13 Feb 2015 08:29:31 -0800 Subject: [PATCH 1/3] Add a few things 1) more stats to timers 2) RetainKeys options for Buffer client to keep sending stats even if they are 0 3) send stats in many lines (not one connection per line) 4) Add "Reset" to events (to deal with retain keys) 5) Add a NoOp client for easy switching when there is no statsd in your land 6) ReCycleConnection for buffered client to close connections after sending (in order to pick up any new IPs) --- bufferedclient.go | 65 +++++++++++++++++++++++++++++----------- client.go | 28 +++++++++++++++-- event/absolute.go | 5 ++++ event/fabsolute.go | 5 ++++ event/fgauge.go | 4 +++ event/fgaugedelta.go | 5 ++++ event/gauge.go | 5 ++++ event/gaugedelta.go | 5 ++++ event/increment.go | 5 ++++ event/interface.go | 1 + event/precisiontiming.go | 13 ++++++-- event/timing.go | 53 ++++++++++++++++++++++---------- event/total.go | 5 ++++ noopclient.go | 20 +++++++++++++ 14 files changed, 181 insertions(+), 38 deletions(-) create mode 100644 noopclient.go diff --git a/bufferedclient.go b/bufferedclient.go index 4572452..5ab833c 100644 --- a/bufferedclient.go +++ b/bufferedclient.go @@ -1,12 +1,11 @@ package statsd import ( + "./event" "log" "os" "strings" "time" - - "github.com/quipo/statsd/event" ) // request to close the buffered statsd collector @@ -18,23 +17,27 @@ type closeRequest struct { // flushing aggregates to StatsD, useful if the frequency of events is extremely high // and sampling is not desirable type StatsdBuffer struct { - statsd *StatsdClient - flushInterval time.Duration - eventChannel chan event.Event - events map[string]event.Event - closeChannel chan closeRequest - Logger *log.Logger + statsd *StatsdClient + flushInterval time.Duration + eventChannel chan event.Event + events map[string]event.Event + closeChannel chan closeRequest + Logger *log.Logger + RetainKeys bool + ReCycleConnection bool //close
the Statsd connection each time after send (helps with picking up new hosts) } // NewStatsdBuffer Factory func NewStatsdBuffer(interval time.Duration, client *StatsdClient) *StatsdBuffer { sb := &StatsdBuffer{ - flushInterval: interval, - statsd: client, - eventChannel: make(chan event.Event, 100), - events: make(map[string]event.Event, 0), - closeChannel: make(chan closeRequest, 0), - Logger: log.New(os.Stdout, "[BufferedStatsdClient] ", log.Ldate|log.Ltime), + flushInterval: interval, + statsd: client, + eventChannel: make(chan event.Event, 100), + events: make(map[string]event.Event, 0), + closeChannel: make(chan closeRequest, 0), + Logger: log.New(os.Stdout, "[BufferedStatsdClient] ", log.Ldate|log.Ltime), + RetainKeys: false, + ReCycleConnection: true, } go sb.collector() return sb @@ -149,7 +152,7 @@ func (sb *StatsdBuffer) collector() { } case c := <-sb.closeChannel: sb.Logger.Println("Asked to terminate. Flushing stats before returning.") - c.reply <- sb.flush() + c.reply <- sb.flush() break } } @@ -180,16 +183,42 @@ func (sb *StatsdBuffer) flush() (err error) { return nil } err = sb.statsd.CreateSocket() + + if sb.ReCycleConnection { + defer sb.statsd.Close() + } + if nil != err { sb.Logger.Println("Error establishing UDP connection for sending statsd events:", err) } + + //buffer stats in 512k blocks save some frames + var buffersize = 512 + var out_string = "" + for k, v := range sb.events { - err := sb.statsd.SendEvent(v) + + if len([]byte(out_string)) >= buffersize { + sb.statsd.SendRaw(out_string) + out_string = "" + } + str, err := sb.statsd.EventStatsdString(v) if nil != err { sb.Logger.Println(err) + continue + } + out_string += str + + // if we are "retaining" the names (so that we can send "0"s do NOT delete + if sb.RetainKeys { + sb.events[k].Reset() + } else { + delete(sb.events, k) } - //sb.Logger.Println("Sent", v.String()) - delete(sb.events, k) + } + + if len(out_string) > 0 { + sb.statsd.SendRaw(out_string) } return nil diff --git 
a/client.go b/client.go index a8037e7..0af4a80 100644 --- a/client.go +++ b/client.go @@ -8,7 +8,7 @@ import ( "strings" "time" - "github.com/quipo/statsd/event" + event "./event" ) // note Hostname is exported so clients can set it to something different than the default @@ -160,13 +160,37 @@ func (c *StatsdClient) send(stat string, format string, value interface{}) error return err } +func (c *StatsdClient) SendRaw(buffer string) error { + if c.conn == nil { + return fmt.Errorf("not connected") + } + //log.Printf("SENDING EVENT %s", buffer) + + _, err := fmt.Fprintf(c.conn, buffer) + return err + +} + +func (c *StatsdClient) EventStatsdString(e event.Event) (string, error) { + var out_str = "" + + for _, stat := range e.Stats() { + str := fmt.Sprintf("%s%s", c.prefix, stat) + if len(str) > 0 { + out_str += str + "\n" + } + } + return out_str, nil + +} + // SendEvent - Sends stats from an event object func (c *StatsdClient) SendEvent(e event.Event) error { if c.conn == nil { return fmt.Errorf("cannot send stats, not connected to StatsD server") } for _, stat := range e.Stats() { - //fmt.Printf("SENDING EVENT %s%s\n", c.prefix, stat) + fmt.Printf("SENDING EVENT %s%s\n", c.prefix, stat) _, err := fmt.Fprintf(c.conn, "%s%s", c.prefix, stat) if nil != err { return err diff --git a/event/absolute.go b/event/absolute.go index 67be7f8..f1aded5 100644 --- a/event/absolute.go +++ b/event/absolute.go @@ -18,6 +18,11 @@ func (e *Absolute) Update(e2 Event) error { return nil } +//Reset the value +func (e *Absolute) Reset() { + e.Values = []int64{} +} + // Payload returns the aggregated value for this event func (e Absolute) Payload() interface{} { return e.Values diff --git a/event/fabsolute.go b/event/fabsolute.go index ec13011..b18350c 100644 --- a/event/fabsolute.go +++ b/event/fabsolute.go @@ -18,6 +18,11 @@ func (e *FAbsolute) Update(e2 Event) error { return nil } +//Reset the value +func (e *FAbsolute) Reset() { + e.Values = []float64{} +} + // Payload returns the 
aggregated value for this event func (e FAbsolute) Payload() interface{} { return e.Values diff --git a/event/fgauge.go b/event/fgauge.go index 463c3a4..69a7340 100644 --- a/event/fgauge.go +++ b/event/fgauge.go @@ -19,6 +19,10 @@ func (e *FGauge) Update(e2 Event) error { return nil } +//Reset the value GAUGES TO NOT RESET +func (e *FGauge) Reset() { +} + // Payload returns the aggregated value for this event func (e FGauge) Payload() interface{} { return e.Value diff --git a/event/fgaugedelta.go b/event/fgaugedelta.go index 232098a..9891299 100644 --- a/event/fgaugedelta.go +++ b/event/fgaugedelta.go @@ -24,6 +24,11 @@ func (e FGaugeDelta) Payload() interface{} { return e.Value } +//Reset the value +func (e *FGaugeDelta) Reset() { + e.Value = 0.0 +} + // Stats returns an array of StatsD events as they travel over UDP func (e FGaugeDelta) Stats() []string { if e.Value < 0 { diff --git a/event/gauge.go b/event/gauge.go index 0a36905..2c12784 100644 --- a/event/gauge.go +++ b/event/gauge.go @@ -24,6 +24,11 @@ func (e Gauge) Payload() interface{} { return e.Value } +//Reset the value GAUGES TO NOT RESET +func (e *Gauge) Reset() { + +} + // Stats returns an array of StatsD events as they travel over UDP func (e Gauge) Stats() []string { if e.Value < 0 { diff --git a/event/gaugedelta.go b/event/gaugedelta.go index 2a5ea7b..9a884d0 100644 --- a/event/gaugedelta.go +++ b/event/gaugedelta.go @@ -24,6 +24,11 @@ func (e GaugeDelta) Payload() interface{} { return e.Value } +//Reset the value +func (e *GaugeDelta) Reset() { + e.Value = 0 +} + // Stats returns an array of StatsD events as they travel over UDP func (e GaugeDelta) Stats() []string { if e.Value < 0 { diff --git a/event/increment.go b/event/increment.go index e617083..4369b06 100644 --- a/event/increment.go +++ b/event/increment.go @@ -17,6 +17,11 @@ func (e *Increment) Update(e2 Event) error { return nil } +//Reset the value +func (e *Increment) Reset() { + e.Value = 0 +} + // Payload returns the aggregated value 
for this event func (e Increment) Payload() interface{} { return e.Value diff --git a/event/interface.go b/event/interface.go index f174855..310f45c 100644 --- a/event/interface.go +++ b/event/interface.go @@ -24,4 +24,5 @@ type Event interface { String() string Key() string SetKey(string) + Reset() } diff --git a/event/precisiontiming.go b/event/precisiontiming.go index 587cd85..f2ab542 100644 --- a/event/precisiontiming.go +++ b/event/precisiontiming.go @@ -32,6 +32,14 @@ func (e *PrecisionTiming) Update(e2 Event) error { return nil } +//Reset the value +func (e *PrecisionTiming) Reset() { + e.Value = 0 + e.Count = 1 + e.Min = time.Duration(0) + e.Max = time.Duration(0) +} + // Payload returns the aggregated value for this event func (e PrecisionTiming) Payload() interface{} { return e @@ -40,9 +48,10 @@ func (e PrecisionTiming) Payload() interface{} { // Stats returns an array of StatsD events as they travel over UDP func (e PrecisionTiming) Stats() []string { return []string{ + fmt.Sprintf("%s.count:%d|a", e.Name, int64(e.Count)), fmt.Sprintf("%s.avg:%.6f|a", e.Name, float64(int64(e.Value)/e.Count)), // make sure e.Count != 0 - fmt.Sprintf("%s.min:%.6f|a", e.Name, e.Min), - fmt.Sprintf("%s.max:%.6f|a", e.Name, e.Max), + fmt.Sprintf("%s.min:%.6f|a", e.Name, float64(e.Min)), + fmt.Sprintf("%s.max:%.6f|a", e.Name, float64(e.Max)), } } diff --git a/event/timing.go b/event/timing.go index 273a1ce..9fae141 100644 --- a/event/timing.go +++ b/event/timing.go @@ -1,14 +1,16 @@ package event import "fmt" +import "math" // Timing keeps min/max/avg information about a timer over a certain interval type Timing struct { - Name string - Min int64 - Max int64 - Value int64 - Count int64 + Name string + Min int64 + Max int64 + Value int64 + Values []int64 + Count int64 } // NewTiming is a factory for a Timing event, setting the Count to 1 to prevent div_by_0 errors @@ -21,30 +23,49 @@ func (e *Timing) Update(e2 Event) error { if e.Type() != e2.Type() { return fmt.Errorf("statsd 
event type conflict: %s vs %s ", e.String(), e2.String()) } - p := e2.Payload().(map[string]int64) - e.Count += p["cnt"] - e.Value += p["val"] - e.Min = minInt64(e.Min, p["min"]) - e.Max = maxInt64(e.Max, p["max"]) + p := e2.Payload().(map[string]interface{}) + e.Count += p["cnt"].(int64) + e.Value += p["val"].(int64) + e.Min = minInt64(e.Min, p["min"].(int64)) + e.Max = maxInt64(e.Max, p["max"].(int64)) + e.Values = append(e.Values, p["val"].(int64)) return nil } +//Reset the value +func (e *Timing) Reset() { + e.Value = 0 + e.Count = 1 + e.Min = 0 + e.Max = 0 + e.Values = []int64{} +} + // Payload returns the aggregated value for this event func (e Timing) Payload() interface{} { - return map[string]int64{ - "min": e.Min, - "max": e.Max, - "val": e.Value, - "cnt": e.Count, + return map[string]interface{}{ + "min": e.Min, + "max": e.Max, + "val": e.Value, + "cnt": e.Count, + "vals": e.Values, } } // Stats returns an array of StatsD events as they travel over UDP func (e Timing) Stats() []string { + std := float64(0) + avg := float64(e.Value / e.Count) + for _, v := range e.Values { + std += math.Pow((float64(v) - avg), 2.0) + } + std = math.Sqrt(std / float64(e.Count)) return []string{ - fmt.Sprintf("%s.avg:%d|a", e.Name, int64(e.Value/e.Count)), // make sure e.Count != 0 + fmt.Sprintf("%s.count:%d|a", e.Name, int64(e.Count)), + fmt.Sprintf("%s.avg:%d|a", e.Name, int64(avg)), // make sure e.Count != 0 fmt.Sprintf("%s.min:%d|a", e.Name, e.Min), fmt.Sprintf("%s.max:%d|a", e.Name, e.Max), + fmt.Sprintf("%s.std:%d|a", e.Name, int64(std)), } } diff --git a/event/total.go b/event/total.go index 6c3ca3e..23a0385 100644 --- a/event/total.go +++ b/event/total.go @@ -17,6 +17,11 @@ func (e *Total) Update(e2 Event) error { return nil } +//Reset the value +func (e *Total) Reset() { + e.Value = 0 +} + // Payload returns the aggregated value for this event func (e Total) Payload() interface{} { return e.Value diff --git a/noopclient.go b/noopclient.go new file mode 100644 index 
0000000..a06a62d --- /dev/null +++ b/noopclient.go @@ -0,0 +1,20 @@ +package statsd + + +//impliment a "noop" statsd in case there is no statsd +type StatsdNoop struct{} + +func (s StatsdNoop) CreateSocket() error { return nil } +func (s StatsdNoop) Close() error { return nil } +func (s StatsdNoop) Incr(stat string, count int64) error { return nil } +func (s StatsdNoop) Decr(stat string, count int64) error { return nil } +func (s StatsdNoop) Timing(stat string, count int64) error { return nil } +func (s StatsdNoop) PrecisionTiming(stat string, delta time.Duration) error { return nil } +func (s StatsdNoop) Gauge(stat string, value int64) error { return nil } +func (s StatsdNoop) GaugeDelta(stat string, value int64) error { return nil } +func (s StatsdNoop) Absolute(stat string, value int64) error { return nil } +func (s StatsdNoop) Total(stat string, value int64) error { return nil } +func (s StatsdNoop) FGauge(stat string, value float64) error { return nil } +func (s StatsdNoop) FGaugeDelta(stat string, value float64) error { return nil } +func (s StatsdNoop) FAbsolute(stat string, value float64) error { return nil } + From e27fa002093941b6bc4c65e7c7e919095ef1c1c0 Mon Sep 17 00:00:00 2001 From: Bo Blanton Date: Fri, 13 Feb 2015 08:34:52 -0800 Subject: [PATCH 2/3] remove printf --- client.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client.go b/client.go index 0af4a80..0070256 100644 --- a/client.go +++ b/client.go @@ -190,7 +190,7 @@ func (c *StatsdClient) SendEvent(e event.Event) error { return fmt.Errorf("cannot send stats, not connected to StatsD server") } for _, stat := range e.Stats() { - fmt.Printf("SENDING EVENT %s%s\n", c.prefix, stat) + //fmt.Printf("SENDING EVENT %s%s\n", c.prefix, stat) _, err := fmt.Fprintf(c.conn, "%s%s", c.prefix, stat) if nil != err { return err From 616bf4912c06aac88ce861347ca91378f3644b77 Mon Sep 17 00:00:00 2001 From: Bo Blanton Date: Thu, 18 Aug 2016 16:17:11 -0700 Subject: [PATCH 3/3] Several changes - 
EchoClient: prints the lines as they would go out - FloatGauge - String() methods for nice printing - mend some go race conditions - add median/std to buffered timers - add sample rates for clients (static: one for counters, one for timers) - add stats to the expvars so we can see them on the profile pages --- .gitignore | 2 + bufferedclient.go | 241 +++++++++++++++++++++++++++++++++++++-- client.go | 122 +++++++++++++++++--- echoclient.go | 31 +++++ event/absolute.go | 17 ++- event/fabsolute.go | 17 ++- event/fgauge.go | 15 ++- event/fgaugedelta.go | 15 ++- event/gauge.go | 17 ++- event/gaugeabsolute.go | 77 +++++++++++++ event/gaugeavg.go | 92 +++++++++++++++ event/gaugedelta.go | 14 ++- event/increment.go | 14 ++- event/interface.go | 10 +- event/precisiontiming.go | 18 ++- event/timing.go | 111 ++++++++++++++---- event/total.go | 19 +-- interface.go | 4 + noopclient.go | 7 +- 19 files changed, 758 insertions(+), 85 deletions(-) create mode 100644 .gitignore create mode 100644 echoclient.go create mode 100644 event/gaugeabsolute.go create mode 100644 event/gaugeavg.go diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..337ad02 --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +statsd-github.iml +.idea diff --git a/bufferedclient.go b/bufferedclient.go index 5ab833c..5170057 100644 --- a/bufferedclient.go +++ b/bufferedclient.go @@ -1,9 +1,11 @@ package statsd import ( - "./event" + "expvar" "log" + "math/rand" "os" + "github.com/wyndhblb/statsd/event" "strings" "time" ) @@ -13,44 +15,90 @@ type closeRequest struct { reply chan error } +var ( + ExpMapCounts *expvar.Map + ExpMapGauges *expvar.Map + ExpMapTimers *expvar.Map +) + +func init() { + + ExpMapCounts = expvar.NewMap("counters") + ExpMapGauges = expvar.NewMap("gauges") + ExpMapTimers = expvar.NewMap("timers") +} + // StatsdBuffer is a client library to aggregate events in memory before // flushing aggregates to StatsD, useful if the frequency of events is extremely high // and sampling is
not desirable type StatsdBuffer struct { statsd *StatsdClient + name string flushInterval time.Duration eventChannel chan event.Event events map[string]event.Event closeChannel chan closeRequest Logger *log.Logger RetainKeys bool + AddExpvar bool // add stats to expvar list of vars ReCycleConnection bool //close the Statsd connection each time after send (helps with picking up new hosts) + BufferLength int //buffer length before we send a multi line stat + SampleRate float32 + TimerSampleRate float32 } // NewStatsdBuffer Factory -func NewStatsdBuffer(interval time.Duration, client *StatsdClient) *StatsdBuffer { +func NewStatsdBuffer(name string, interval time.Duration, client *StatsdClient) *StatsdBuffer { sb := &StatsdBuffer{ flushInterval: interval, + name: name, statsd: client, eventChannel: make(chan event.Event, 100), events: make(map[string]event.Event, 0), closeChannel: make(chan closeRequest, 0), Logger: log.New(os.Stdout, "[BufferedStatsdClient] ", log.Ldate|log.Ltime), RetainKeys: false, + AddExpvar: true, ReCycleConnection: true, + BufferLength: 512, + SampleRate: 1.0, + TimerSampleRate: 1.0, } + rand.Seed(time.Now().UnixNano()) go sb.collector() return sb } +func (sb *StatsdBuffer) String() string { + return sb.statsd.String() +} + // CreateSocket creates a UDP connection to a StatsD server func (sb *StatsdBuffer) CreateSocket() error { return sb.statsd.CreateSocket() } +func (sb *StatsdBuffer) registerStat() bool { + if sb.SampleRate >= 1.0 { + return true + } + return rand.Float32() < sb.SampleRate +} + +func (sb *StatsdBuffer) registerTimerStat(samplerate float32) bool { + if samplerate >= 1.0 { + return true + } + return rand.Float32() < samplerate +} + // Incr - Increment a counter metric. 
Often used to note a particular event func (sb *StatsdBuffer) Incr(stat string, count int64) error { + if !sb.registerStat() { + return nil + } if 0 != count { + //ct := int64(float32(count) / sb.SampleRate) sb.eventChannel <- &event.Increment{Name: stat, Value: count} } return nil @@ -58,7 +106,11 @@ func (sb *StatsdBuffer) Incr(stat string, count int64) error { // Decr - Decrement a counter metric. Often used to note a particular event func (sb *StatsdBuffer) Decr(stat string, count int64) error { + if !sb.registerStat() { + return nil + } if 0 != count { + //ct := int64(float32(count) / sb.SampleRate) sb.eventChannel <- &event.Increment{Name: stat, Value: -count} } return nil @@ -66,13 +118,38 @@ func (sb *StatsdBuffer) Decr(stat string, count int64) error { // Timing - Track a duration event func (sb *StatsdBuffer) Timing(stat string, delta int64) error { - sb.eventChannel <- event.NewTiming(stat, delta) + if !sb.registerTimerStat(sb.TimerSampleRate) { + return nil + } + sb.eventChannel <- event.NewTiming(stat, delta, float64(sb.TimerSampleRate)) + return nil +} + +// TimingSampling - Track a duration event at a sampling rate +func (sb *StatsdBuffer) TimingSampling(stat string, delta int64, samplerate float32) error { + if !sb.registerTimerStat(samplerate) { + return nil + } + sb.eventChannel <- event.NewTiming(stat, delta, float64(sb.TimerSampleRate)) + return nil +} + +// PrecisionTiming - Track a duration event +// the time delta has to be a duration +func (sb *StatsdBuffer) PrecisionTimingSampling(stat string, delta time.Duration, samplerate float32) error { + if !sb.registerTimerStat(samplerate) { + return nil + } + sb.eventChannel <- event.NewPrecisionTiming(stat, time.Duration(float64(delta)/float64(time.Millisecond))) return nil } // PrecisionTiming - Track a duration event // the time delta has to be a duration func (sb *StatsdBuffer) PrecisionTiming(stat string, delta time.Duration) error { + if !sb.registerTimerStat(sb.TimerSampleRate) { + return nil 
+ } sb.eventChannel <- event.NewPrecisionTiming(stat, time.Duration(float64(delta)/float64(time.Millisecond))) return nil } @@ -81,43 +158,179 @@ func (sb *StatsdBuffer) PrecisionTiming(stat string, delta time.Duration) error // and they don’t change unless you change them. That is, once you set a gauge value, // it will be a flat line on the graph until you change it again func (sb *StatsdBuffer) Gauge(stat string, value int64) error { + if !sb.registerStat() { + return nil + } sb.eventChannel <- &event.Gauge{Name: stat, Value: value} return nil } +// GaugeAbsolute which DOES NOT "add" values in the buffer, just +// resets the value to the current value, here in the non-buffered client it is +// the same as a Gauge +func (sb *StatsdBuffer) GaugeAbsolute(stat string, value int64) error { + if !sb.registerStat() { + return nil + } + sb.eventChannel <- &event.GaugeAbsolute{Name: stat, Value: value} + return nil +} + +func (sb *StatsdBuffer) GaugeAvg(stat string, value int64) error { + if !sb.registerStat() { + return nil + } + sb.eventChannel <- &event.GaugeAvg{Name: stat, Value: value} + return nil +} + func (sb *StatsdBuffer) GaugeDelta(stat string, value int64) error { + if !sb.registerStat() { + return nil + } sb.eventChannel <- &event.GaugeDelta{Name: stat, Value: value} return nil } func (sb *StatsdBuffer) FGauge(stat string, value float64) error { + if !sb.registerStat() { + return nil + } sb.eventChannel <- &event.FGauge{Name: stat, Value: value} return nil } func (sb *StatsdBuffer) FGaugeDelta(stat string, value float64) error { + if !sb.registerStat() { + return nil + } sb.eventChannel <- &event.FGaugeDelta{Name: stat, Value: value} return nil } // Absolute - Send absolute-valued metric (not averaged/aggregated) func (sb *StatsdBuffer) Absolute(stat string, value int64) error { + if !sb.registerStat() { + return nil + } sb.eventChannel <- &event.Absolute{Name: stat, Values: []int64{value}} return nil } // FAbsolute - Send absolute-valued metric (not 
averaged/aggregated) func (sb *StatsdBuffer) FAbsolute(stat string, value float64) error { + if !sb.registerStat() { + return nil + } sb.eventChannel <- &event.FAbsolute{Name: stat, Values: []float64{value}} return nil } // Total - Send a metric that is continously increasing, e.g. read operations since boot func (sb *StatsdBuffer) Total(stat string, value int64) error { + if !sb.registerStat() { + return nil + } sb.eventChannel <- &event.Total{Name: stat, Value: value} return nil } +func (sb *StatsdBuffer) expCollect(ev event.Event) { + payload := ev.Payload() + k := ev.Key() + + var use_map *expvar.Map + + switch ev.StatClass() { + case "counter": + use_map = ExpMapCounts + switch payload.(type) { + case int64: + use_map.Add(k, payload.(int64)) + + case float64: + use_map.AddFloat(k, payload.(float64)) + + case []int64: + p_len := len(payload.([]int64)) + if p_len > 0 { + use_map.Add(k, payload.([]int64)[p_len-1]) + } + case []float64: + p_len := len(payload.([]float64)) + if p_len > 0 { + use_map.AddFloat(k, payload.([]float64)[p_len-1]) + } + case map[string]interface{}: + use_map.Add(k, payload.(map[string]interface{})["val"].(int64)) + + } + case "gauge": + use_map = ExpMapGauges + switch payload.(type) { + case int64: + v := new(expvar.Int) + v.Set(payload.(int64)) + use_map.Set(k, v) + case float64: + v := new(expvar.Float) + v.Set(payload.(float64)) + use_map.Set(k, v) + + case []int64: + p_len := len(payload.([]int64)) + if p_len > 0 { + v := new(expvar.Int) + v.Set(payload.([]int64)[p_len-1]) + use_map.Set(k, v) + } + case []float64: + p_len := len(payload.([]float64)) + if p_len > 0 { + v := new(expvar.Float) + v.Set(payload.([]float64)[p_len-1]) + use_map.Set(k, v) + } + case map[string]interface{}: + v := new(expvar.Int) + v.Set(payload.(map[string]interface{})["val"].(int64)) + use_map.Set(k, v) + } + case "timer": + use_map = ExpMapTimers + switch payload.(type) { + case int64: + v := new(expvar.Int) + v.Set(payload.(int64)) + use_map.Set(k, v) + 
case float64: + v := new(expvar.Float) + v.Set(payload.(float64)) + use_map.Set(k, v) + + case []int64: + p_len := len(payload.([]int64)) + if p_len > 0 { + v := new(expvar.Int) + v.Set(payload.([]int64)[p_len-1]) + use_map.Set(k, v) + } + case []float64: + p_len := len(payload.([]float64)) + if p_len > 0 { + v := new(expvar.Float) + v.Set(payload.([]float64)[p_len-1]) + use_map.Set(k, v) + } + case map[string]interface{}: + v := new(expvar.Int) + v.Set(payload.(map[string]interface{})["val"].(int64)) + use_map.Set(k, v) + } + } + +} + // handle flushes and updates in one single thread (instead of locking the events map) func (sb *StatsdBuffer) collector() { // on a panic event, flush all the pending stats before panicking @@ -150,9 +363,11 @@ func (sb *StatsdBuffer) collector() { //sb.Logger.Println("Adding new event") sb.events[k] = e } + sb.expCollect(sb.events[k]) + case c := <-sb.closeChannel: sb.Logger.Println("Asked to terminate. Flushing stats before returning.") - c.reply <- sb.flush() + c.reply <- sb.flush() break } } @@ -192,21 +407,27 @@ func (sb *StatsdBuffer) flush() (err error) { sb.Logger.Println("Error establishing UDP connection for sending statsd events:", err) } - //buffer stats in 512k blocks save some frames - var buffersize = 512 + //buffer stats in BufferLength blocks save some frames var out_string = "" for k, v := range sb.events { + var str string + switch v.(type) { + case *event.Timing, *event.PrecisionTiming: + str, err = sb.statsd.EventStatsdStringTimerSample(v, sb.flushInterval, sb.TimerSampleRate) - if len([]byte(out_string)) >= buffersize { - sb.statsd.SendRaw(out_string) - out_string = "" + default: + str, err = sb.statsd.EventStatsdString(v, sb.flushInterval, sb.SampleRate) } - str, err := sb.statsd.EventStatsdString(v) if nil != err { sb.Logger.Println(err) continue } + if len([]byte(out_string+str)) >= sb.BufferLength { + sb.statsd.SendRaw(out_string) + out_string = "" + } + out_string += str // if we are "retaining" the names 
(so that we can send "0"s do NOT delete diff --git a/client.go b/client.go index 0070256..9e49b44 100644 --- a/client.go +++ b/client.go @@ -3,12 +3,13 @@ package statsd import ( "fmt" "log" + "math/rand" "net" "os" "strings" "time" - event "./event" + "github.com/wyndhblb/statsd/event" ) // note Hostname is exported so clients can set it to something different than the default @@ -23,23 +24,42 @@ func init() { // StatsdClient is a client library to send events to StatsD type StatsdClient struct { - conn net.Conn - addr string - prefix string - Logger *log.Logger + conn net.Conn + addr string + prefix string + Logger *log.Logger + SampleRate float32 + TimerSampleRate float32 } // NewStatsdClient - Factory func NewStatsdClient(addr string, prefix string) *StatsdClient { // allow %HOST% in the prefix string prefix = strings.Replace(prefix, "%HOST%", Hostname, 1) + rand.Seed(time.Now().UnixNano()) return &StatsdClient{ - addr: addr, - prefix: prefix, - Logger: log.New(os.Stdout, "[StatsdClient] ", log.Ldate|log.Ltime), + addr: addr, + prefix: prefix, + Logger: log.New(os.Stdout, "[StatsdClient] ", log.Ldate|log.Ltime), + SampleRate: 1.0, + TimerSampleRate: 1.0, } } +func (sb *StatsdClient) registerStat() bool { + if sb.SampleRate >= 1.0 { + return true + } + return rand.Float32() < sb.SampleRate +} + +func (sb *StatsdClient) registerTimerStat() bool { + if sb.SampleRate >= 1.0 { + return true + } + return rand.Float32() < sb.TimerSampleRate +} + // String returns the StatsD server address func (c *StatsdClient) String() string { return c.addr @@ -68,6 +88,9 @@ func (c *StatsdClient) Close() error { // Incr - Increment a counter metric. Often used to note a particular event func (c *StatsdClient) Incr(stat string, count int64) error { + if !c.registerStat() { + return nil + } if 0 != count { return c.send(stat, "%d|c", count) } @@ -76,6 +99,9 @@ func (c *StatsdClient) Incr(stat string, count int64) error { // Decr - Decrement a counter metric. 
Often used to note a particular event func (c *StatsdClient) Decr(stat string, count int64) error { + if !c.registerStat() { + return nil + } if 0 != count { return c.send(stat, "%d|c", -count) } @@ -85,12 +111,18 @@ func (c *StatsdClient) Decr(stat string, count int64) error { // Timing - Track a duration event // the time delta must be given in milliseconds func (c *StatsdClient) Timing(stat string, delta int64) error { + if !c.registerStat() { + return nil + } return c.send(stat, "%d|ms", delta) } // PrecisionTiming - Track a duration event // the time delta has to be a duration func (c *StatsdClient) PrecisionTiming(stat string, delta time.Duration) error { + if !c.registerTimerStat() { + return nil + } return c.send(stat, fmt.Sprintf("%.6f%s|ms", float64(delta)/float64(time.Millisecond), "%d"), 0) } @@ -101,6 +133,9 @@ func (c *StatsdClient) PrecisionTiming(stat string, delta time.Duration) error { // underlying protocol, you can't explicitly set a gauge to a negative number without // first setting it to zero. 
func (c *StatsdClient) Gauge(stat string, value int64) error { + if !c.registerStat() { + return nil + } if value < 0 { c.send(stat, "%d|g", 0) return c.send(stat, "%d|g", value) @@ -108,8 +143,25 @@ func (c *StatsdClient) Gauge(stat string, value int64) error { return c.send(stat, "%d|g", value) } +// the Buffered client has a GaugeAbsolute which DOES NOT "add" values in the buffer, just +// resets the value to the current value, here in the non-buffered client it is +// the same as a Gauge +func (c *StatsdClient) GaugeAbsolute(stat string, value int64) error { + return c.Gauge(stat, value) +} + +// the Buffered client has a GaugeAbsolute which DOES NOT "add" values in the buffer, just +// resets the value to the current value, here in the non-buffered client it is +// the same as a Gauge +func (c *StatsdClient) GaugeAvg(stat string, value int64) error { + return c.Gauge(stat, value) +} + // GaugeDelta -- Send a change for a gauge func (c *StatsdClient) GaugeDelta(stat string, value int64) error { + if !c.registerStat() { + return nil + } // Gauge Deltas are always sent with a leading '+' or '-'. 
The '-' takes care of itself but the '+' must added by hand if value < 0 { return c.send(stat, "%d|g", value) @@ -119,6 +171,9 @@ func (c *StatsdClient) GaugeDelta(stat string, value int64) error { // FGauge -- Send a floating point value for a gauge func (c *StatsdClient) FGauge(stat string, value float64) error { + if !c.registerStat() { + return nil + } if value < 0 { c.send(stat, "%d|g", 0) return c.send(stat, "%g|g", value) @@ -128,6 +183,9 @@ func (c *StatsdClient) FGauge(stat string, value float64) error { // FGaugeDelta -- Send a floating point change for a gauge func (c *StatsdClient) FGaugeDelta(stat string, value float64) error { + if !c.registerStat() { + return nil + } if value < 0 { return c.send(stat, "%g|g", value) } @@ -136,17 +194,26 @@ func (c *StatsdClient) FGaugeDelta(stat string, value float64) error { // Absolute - Send absolute-valued metric (not averaged/aggregated) func (c *StatsdClient) Absolute(stat string, value int64) error { - return c.send(stat, "%d|a", value) + if !c.registerStat() { + return nil + } + return c.send(stat, "%d|c", value) } // FAbsolute - Send absolute-valued floating point metric (not averaged/aggregated) func (c *StatsdClient) FAbsolute(stat string, value float64) error { - return c.send(stat, "%g|a", value) + if !c.registerStat() { + return nil + } + return c.send(stat, "%g|c", value) } // Total - Send a metric that is continously increasing, e.g. 
read operations since boot func (c *StatsdClient) Total(stat string, value int64) error { - return c.send(stat, "%d|t", value) + if !c.registerStat() { + return nil + } + return c.send(stat, "%d|c", value) } // write a UDP packet with the statsd event @@ -171,12 +238,37 @@ func (c *StatsdClient) SendRaw(buffer string) error { } -func (c *StatsdClient) EventStatsdString(e event.Event) (string, error) { +func (c *StatsdClient) EventStatsdString(e event.Event, tick time.Duration, samplerate float32) (string, error) { + var out_str = "" + + for _, stat := range e.Stats(tick) { + str := fmt.Sprintf("%s%s", c.prefix, stat) + + if len(str) > 0 { + if samplerate < 1.0 { + str += fmt.Sprintf("|@%f", samplerate) + } + out_str += str + "\n" + } + } + return out_str, nil + +} + +// special case for timers, if we are a buffered client, +// then we DO NOT add the sample rate to the acctuall "timers" parts of the metric lines +// (lower, upper, median, etc...) just to the "count and count_ps" +func (c *StatsdClient) EventStatsdStringTimerSample(e event.Event, tick time.Duration, samplerate float32) (string, error) { var out_str = "" - for _, stat := range e.Stats() { + for _, stat := range e.Stats(tick) { str := fmt.Sprintf("%s%s", c.prefix, stat) + if len(str) > 0 { + // just count and count ps + if samplerate < 1.0 && (strings.Contains(stat, "count:") || strings.Contains(stat, "count_ps:")) { + str += fmt.Sprintf("|@%f", samplerate) + } out_str += str + "\n" } } @@ -185,11 +277,11 @@ func (c *StatsdClient) EventStatsdString(e event.Event) (string, error) { } // SendEvent - Sends stats from an event object -func (c *StatsdClient) SendEvent(e event.Event) error { +func (c *StatsdClient) SendEvent(e event.Event, tick time.Duration) error { if c.conn == nil { return fmt.Errorf("cannot send stats, not connected to StatsD server") } - for _, stat := range e.Stats() { + for _, stat := range e.Stats(tick) { //fmt.Printf("SENDING EVENT %s%s\n", c.prefix, stat) _, err := 
fmt.Fprintf(c.conn, "%s%s", c.prefix, stat) if nil != err { diff --git a/echoclient.go b/echoclient.go new file mode 100644 index 0000000..46445c6 --- /dev/null +++ b/echoclient.go @@ -0,0 +1,31 @@ +package statsd + +import ( + "log" + "time" +) + +//impliment a "echo" statsd for debuging statsd +type StatsdEcho struct{} + +func prit(stat string, val interface{}) error { + log.Printf("%s:%v", stat, val) + return nil +} + +func (s StatsdEcho) String() string { return "EchoClient" } +func (s StatsdEcho) CreateSocket() error { return nil } +func (s StatsdEcho) Close() error { return nil } +func (s StatsdEcho) Incr(stat string, count int64) error { return prit(stat, count) } +func (s StatsdEcho) Decr(stat string, count int64) error { return prit(stat, count) } +func (s StatsdEcho) Timing(stat string, count int64) error { return prit(stat, count) } +func (s StatsdEcho) PrecisionTiming(stat string, delta time.Duration) error { return prit(stat, delta) } +func (s StatsdEcho) Gauge(stat string, value int64) error { return prit(stat, value) } +func (s StatsdEcho) GaugeAbsolute(stat string, value int64) error { return prit(stat, value) } +func (s StatsdEcho) GaugeAvg(stat string, value int64) error { return prit(stat, value) } +func (s StatsdEcho) GaugeDelta(stat string, value int64) error { return prit(stat, value) } +func (s StatsdEcho) Absolute(stat string, value int64) error { return prit(stat, value) } +func (s StatsdEcho) Total(stat string, value int64) error { return prit(stat, value) } +func (s StatsdEcho) FGauge(stat string, value float64) error { return prit(stat, value) } +func (s StatsdEcho) FGaugeDelta(stat string, value float64) error { return prit(stat, value) } +func (s StatsdEcho) FAbsolute(stat string, value float64) error { return prit(stat, value) } diff --git a/event/absolute.go b/event/absolute.go index f1aded5..0fe0214 100644 --- a/event/absolute.go +++ b/event/absolute.go @@ -1,19 +1,30 @@ package event -import "fmt" +import ( + "fmt" + "sync" + 
"time" +) // Absolute is a metric that is not averaged/aggregated. // We keep each value distinct and then we flush them all individually. type Absolute struct { Name string + mu sync.Mutex Values []int64 } +func (e *Absolute) StatClass() string { + return "counter" +} + // Update the event with metrics coming from a new one of the same type and with the same key func (e *Absolute) Update(e2 Event) error { if e.Type() != e2.Type() { return fmt.Errorf("statsd event type conflict: %s vs %s ", e.String(), e2.String()) } + e.mu.Lock() + defer e.mu.Unlock() e.Values = append(e.Values, e2.Payload().([]int64)...) return nil } @@ -29,10 +40,10 @@ func (e Absolute) Payload() interface{} { } // Stats returns an array of StatsD events as they travel over UDP -func (e Absolute) Stats() []string { +func (e Absolute) Stats(tick time.Duration) []string { ret := make([]string, 0, len(e.Values)) for v := range e.Values { - ret = append(ret, fmt.Sprintf("%s:%d|a", e.Name, v)) + ret = append(ret, fmt.Sprintf("%s:%d|c", e.Name, v)) } return ret } diff --git a/event/fabsolute.go b/event/fabsolute.go index b18350c..f93825b 100644 --- a/event/fabsolute.go +++ b/event/fabsolute.go @@ -1,19 +1,30 @@ package event -import "fmt" +import ( + "fmt" + "sync" + "time" +) // Absolute is a metric that is not averaged/aggregated. // We keep each value distinct and then we flush them all individually. type FAbsolute struct { Name string + mu sync.Mutex Values []float64 } +func (e *FAbsolute) StatClass() string { + return "counter" +} + // Update the event with metrics coming from a new one of the same type and with the same key func (e *FAbsolute) Update(e2 Event) error { if e.Type() != e2.Type() { return fmt.Errorf("statsd event type conflict: %s vs %s ", e.String(), e2.String()) } + e.mu.Lock() + defer e.mu.Unlock() e.Values = append(e.Values, e2.Payload().([]float64)...) 
return nil } @@ -29,10 +40,10 @@ func (e FAbsolute) Payload() interface{} { } // Stats returns an array of StatsD events as they travel over UDP -func (e FAbsolute) Stats() []string { +func (e FAbsolute) Stats(tick time.Duration) []string { ret := make([]string, 0, len(e.Values)) for v := range e.Values { - ret = append(ret, fmt.Sprintf("%s:%g|a", e.Name, v)) + ret = append(ret, fmt.Sprintf("%s:%g|c", e.Name, v)) } return ret } diff --git a/event/fgauge.go b/event/fgauge.go index 69a7340..2859fb3 100644 --- a/event/fgauge.go +++ b/event/fgauge.go @@ -1,6 +1,10 @@ package event -import "fmt" +import ( + "fmt" + "sync" + "time" +) // Gauge - Gauges are a constant data type. They are not subject to averaging, // and they don’t change unless you change them. That is, once you set a gauge value, @@ -8,6 +12,11 @@ import "fmt" type FGauge struct { Name string Value float64 + mu sync.Mutex +} + +func (e *FGauge) StatClass() string { + return "gauge" } // Update the event with metrics coming from a new one of the same type and with the same key @@ -15,6 +24,8 @@ func (e *FGauge) Update(e2 Event) error { if e.Type() != e2.Type() { return fmt.Errorf("statsd event type conflict: %s vs %s ", e.String(), e2.String()) } + e.mu.Lock() + defer e.mu.Unlock() e.Value += e2.Payload().(float64) return nil } @@ -29,7 +40,7 @@ func (e FGauge) Payload() interface{} { } // Stats returns an array of StatsD events as they travel over UDP -func (e FGauge) Stats() []string { +func (e FGauge) Stats(tick time.Duration) []string { if e.Value < 0 { // because a leading '+' or '-' in the value of a gauge denotes a delta, to send // a negative gauge value we first set the gauge absolutely to 0, then send the diff --git a/event/fgaugedelta.go b/event/fgaugedelta.go index 9891299..1f213e2 100644 --- a/event/fgaugedelta.go +++ b/event/fgaugedelta.go @@ -1,6 +1,10 @@ package event -import "fmt" +import ( + "fmt" + "sync" + "time" +) // Gauge - Gauges are a constant data type. 
They are not subject to averaging, // and they don’t change unless you change them. That is, once you set a gauge value, @@ -8,6 +12,11 @@ import "fmt" type FGaugeDelta struct { Name string Value float64 + mu sync.Mutex +} + +func (e *FGaugeDelta) StatClass() string { + return "gauge" } // Update the event with metrics coming from a new one of the same type and with the same key @@ -15,6 +24,8 @@ func (e *FGaugeDelta) Update(e2 Event) error { if e.Type() != e2.Type() { return fmt.Errorf("statsd event type conflict: %s vs %s ", e.String(), e2.String()) } + e.mu.Lock() + defer e.mu.Unlock() e.Value += e2.Payload().(float64) return nil } @@ -30,7 +41,7 @@ func (e *FGaugeDelta) Reset() { } // Stats returns an array of StatsD events as they travel over UDP -func (e FGaugeDelta) Stats() []string { +func (e FGaugeDelta) Stats(tick time.Duration) []string { if e.Value < 0 { // because a leading '+' or '-' in the value of a gauge denotes a delta, to send // a negative gauge value we first set the gauge absolutely to 0, then send the diff --git a/event/gauge.go b/event/gauge.go index 2c12784..eaa0f68 100644 --- a/event/gauge.go +++ b/event/gauge.go @@ -1,6 +1,10 @@ package event -import "fmt" +import ( + "fmt" + "sync/atomic" + "time" +) // Gauge - Gauges are a constant data type. They are not subject to averaging, // and they don’t change unless you change them. 
That is, once you set a gauge value, @@ -10,12 +14,16 @@ type Gauge struct { Value int64 } +func (e *Gauge) StatClass() string { + return "gauge" +} + // Update the event with metrics coming from a new one of the same type and with the same key func (e *Gauge) Update(e2 Event) error { if e.Type() != e2.Type() { return fmt.Errorf("statsd event type conflict: %s vs %s ", e.String(), e2.String()) } - e.Value += e2.Payload().(int64) + atomic.AddInt64(&e.Value, e2.Payload().(int64)) return nil } @@ -24,13 +32,12 @@ func (e Gauge) Payload() interface{} { return e.Value } -//Reset the value GAUGES TO NOT RESET func (e *Gauge) Reset() { - + e.Value = 0 } // Stats returns an array of StatsD events as they travel over UDP -func (e Gauge) Stats() []string { +func (e Gauge) Stats(tick time.Duration) []string { if e.Value < 0 { // because a leading '+' or '-' in the value of a gauge denotes a delta, to send // a negative gauge value we first set the gauge absolutely to 0, then send the diff --git a/event/gaugeabsolute.go b/event/gaugeabsolute.go new file mode 100644 index 0000000..d64e100 --- /dev/null +++ b/event/gaugeabsolute.go @@ -0,0 +1,77 @@ +package event + +import ( + "fmt" + "time" +) + +// GaugeAbsolute - Gauges are a constant data type. They are not subject to averaging, +// and they don’t change unless you change them. 
That is, once you set a gauge value, +// it will be a flat line on the graph until you change it again +type GaugeAbsolute struct { + Name string + Value int64 +} + +func (e *GaugeAbsolute) StatClass() string { + return "gauge" +} + +// Update the event with metrics coming from a new one of the same type and with the same key +func (e *GaugeAbsolute) Update(e2 Event) error { + if e.Type() != e2.Type() { + return fmt.Errorf("statsd event type conflict: %s vs %s ", e.String(), e2.String()) + } + //just RESET the value + e.Value = e2.Payload().(int64) + return nil +} + +// Payload returns the aggregated value for this event +func (e GaugeAbsolute) Payload() interface{} { + return e.Value +} + +//Reset the value GAUGES TO NOT RESET +func (e *GaugeAbsolute) Reset() { + +} + +// Stats returns an array of StatsD events as they travel over UDP +func (e GaugeAbsolute) Stats(tick time.Duration) []string { + if e.Value < 0 { + // because a leading '+' or '-' in the value of a gauge denotes a delta, to send + // a negative gauge value we first set the gauge absolutely to 0, then send the + // negative value as a delta from 0 (that's just how the spec works :-) + return []string{ + fmt.Sprintf("%s:%d|g", e.Name, 0), + fmt.Sprintf("%s:%d|g", e.Name, e.Value), + } + } + return []string{fmt.Sprintf("%s:%d|g", e.Name, e.Value)} +} + +// Key returns the name of this metric +func (e GaugeAbsolute) Key() string { + return e.Name +} + +// SetKey sets the name of this metric +func (e *GaugeAbsolute) SetKey(key string) { + e.Name = key +} + +// Type returns an integer identifier for this type of metric +func (e GaugeAbsolute) Type() int { + return EventGaugeAbsolute +} + +// TypeString returns a name for this type of metric +func (e GaugeAbsolute) TypeString() string { + return "GaugeAbsolute" +} + +// String returns a debug-friendly representation of this metric +func (e GaugeAbsolute) String() string { + return fmt.Sprintf("{Type: %s, Key: %s, Value: %d}", e.TypeString(), e.Name, 
e.Value) +} diff --git a/event/gaugeavg.go b/event/gaugeavg.go new file mode 100644 index 0000000..88da9be --- /dev/null +++ b/event/gaugeavg.go @@ -0,0 +1,92 @@ +package event + +import ( + "fmt" + "sync/atomic" + "time" +) + +// GaugeAvg - Since this is buffered, we keep a list of the gauge values +// sent, then at send time average them for the "gauge" value to statsd +type GaugeAvg struct { + Name string + Value int64 + Times int64 +} + +func (e *GaugeAvg) StatClass() string { + return "gauge" +} + +// Update the event with metrics coming from a new one of the same type and with the same key +func (e *GaugeAvg) Update(e2 Event) error { + if e.Type() != e2.Type() { + return fmt.Errorf("statsd event type conflict: %s vs %s ", e.String(), e2.String()) + } + //add it to the current + if e.Value == -1 { + e.Value = 0 + } + atomic.AddInt64(&e.Value, e2.Payload().(int64)) + atomic.AddInt64(&e.Times, 1) + return nil +} + +// Payload returns the aggregated value for this event +func (e GaugeAvg) Payload() interface{} { + return e.Value +} + +//this gauge +func (e *GaugeAvg) Reset() { + e.Times = 1 + e.Value = -1 +} + +// Stats returns an array of StatsD events as they travel over UDP +func (e GaugeAvg) Stats(tick time.Duration) []string { + + //return NOTHING if value is -1 + if e.Value == -1 { + return []string{} + } + true_val := e.Value + if e.Times > 0 { + true_val = e.Value / e.Times + } + if true_val < 0 { + // because a leading '+' or '-' in the value of a gauge denotes a delta, to send + // a negative gauge value we first set the gauge absolutely to 0, then send the + // negative value as a delta from 0 (that's just how the spec works :-) + return []string{ + fmt.Sprintf("%s:%d|g", e.Name, 0), + fmt.Sprintf("%s:%d|g", e.Name, true_val), + } + } + return []string{fmt.Sprintf("%s:%d|g", e.Name, true_val)} +} + +// Key returns the name of this metric +func (e GaugeAvg) Key() string { + return e.Name +} + +// SetKey sets the name of this metric +func (e *GaugeAvg) 
SetKey(key string) { + e.Name = key +} + +// Type returns an integer identifier for this type of metric +func (e GaugeAvg) Type() int { + return EventGaugeAvg +} + +// TypeString returns a name for this type of metric +func (e GaugeAvg) TypeString() string { + return "Gauge" +} + +// String returns a debug-friendly representation of this metric +func (e GaugeAvg) String() string { + return fmt.Sprintf("{Type: %s, Key: %s, Value: %d}", e.TypeString(), e.Name, e.Value) +} diff --git a/event/gaugedelta.go b/event/gaugedelta.go index 9a884d0..c43e033 100644 --- a/event/gaugedelta.go +++ b/event/gaugedelta.go @@ -1,6 +1,10 @@ package event -import "fmt" +import ( + "fmt" + "sync/atomic" + "time" +) // GaugeDelta - Gauges are a constant data type. They are not subject to averaging, // and they don’t change unless you change them. That is, once you set a gauge value, @@ -10,12 +14,16 @@ type GaugeDelta struct { Value int64 } +func (e *GaugeDelta) StatClass() string { + return "gauge" +} + // Update the event with metrics coming from a new one of the same type and with the same key func (e *GaugeDelta) Update(e2 Event) error { if e.Type() != e2.Type() { return fmt.Errorf("statsd event type conflict: %s vs %s ", e.String(), e2.String()) } - e.Value += e2.Payload().(int64) + atomic.AddInt64(&e.Value, e2.Payload().(int64)) return nil } @@ -30,7 +38,7 @@ func (e *GaugeDelta) Reset() { } // Stats returns an array of StatsD events as they travel over UDP -func (e GaugeDelta) Stats() []string { +func (e GaugeDelta) Stats(tick time.Duration) []string { if e.Value < 0 { // because a leading '+' or '-' in the value of a gauge denotes a delta, to send // a negative gauge value we first set the gauge absolutely to 0, then send the diff --git a/event/increment.go b/event/increment.go index 4369b06..5421360 100644 --- a/event/increment.go +++ b/event/increment.go @@ -1,6 +1,10 @@ package event -import "fmt" +import ( + "fmt" + "sync/atomic" + "time" +) // Increment represents a metric 
whose value is averaged over a minute type Increment struct { @@ -8,12 +12,16 @@ type Increment struct { Value int64 } +func (e *Increment) StatClass() string { + return "counter" +} + // Update the event with metrics coming from a new one of the same type and with the same key func (e *Increment) Update(e2 Event) error { if e.Type() != e2.Type() { return fmt.Errorf("statsd event type conflict: %s vs %s ", e.String(), e2.String()) } - e.Value += e2.Payload().(int64) + atomic.AddInt64(&e.Value, e2.Payload().(int64)) return nil } @@ -28,7 +36,7 @@ func (e Increment) Payload() interface{} { } // Stats returns an array of StatsD events as they travel over UDP -func (e Increment) Stats() []string { +func (e Increment) Stats(tick time.Duration) []string { return []string{fmt.Sprintf("%s:%d|c", e.Name, e.Value)} } diff --git a/event/interface.go b/event/interface.go index 310f45c..d352c35 100644 --- a/event/interface.go +++ b/event/interface.go @@ -1,5 +1,9 @@ package event +import ( + "time" +) + // constant event type identifiers const ( EventIncr = iota @@ -12,11 +16,14 @@ const ( EventFGaugeDelta EventFAbsolute EventPrecisionTiming + EventGaugeAbsolute + EventGaugeAvg ) // Event is an interface to a generic StatsD event, used by the buffered client collator type Event interface { - Stats() []string + //tick duration for those in the buffered that need a "persecond" metrix as well + Stats(tick time.Duration) []string Type() int TypeString() string Payload() interface{} @@ -25,4 +32,5 @@ type Event interface { Key() string SetKey(string) Reset() + StatClass() string // counter, gauge, timer } diff --git a/event/precisiontiming.go b/event/precisiontiming.go index f2ab542..85244c0 100644 --- a/event/precisiontiming.go +++ b/event/precisiontiming.go @@ -2,18 +2,24 @@ package event import ( "fmt" + "sync" "time" ) // PrecisionTiming keeps min/max/avg information about a timer over a certain interval type PrecisionTiming struct { Name string + mu sync.Mutex Min time.Duration 
Max time.Duration Value time.Duration Count int64 } +func (e *PrecisionTiming) StatClass() string { + return "timer" +} + // NewPrecisionTiming is a factory for a Timing event, setting the Count to 1 to prevent div_by_0 errors func NewPrecisionTiming(k string, delta time.Duration) *PrecisionTiming { return &PrecisionTiming{Name: k, Min: delta, Max: delta, Value: delta, Count: 1} @@ -25,6 +31,8 @@ func (e *PrecisionTiming) Update(e2 Event) error { return fmt.Errorf("statsd event type conflict: %s vs %s ", e.String(), e2.String()) } p := e2.Payload().(PrecisionTiming) + e.mu.Lock() + defer e.mu.Unlock() e.Count += p.Count e.Value += p.Value e.Min = time.Duration(minInt64(int64(e.Min), int64(p.Min))) @@ -46,12 +54,12 @@ func (e PrecisionTiming) Payload() interface{} { } // Stats returns an array of StatsD events as they travel over UDP -func (e PrecisionTiming) Stats() []string { +func (e PrecisionTiming) Stats(tick time.Duration) []string { return []string{ - fmt.Sprintf("%s.count:%d|a", e.Name, int64(e.Count)), - fmt.Sprintf("%s.avg:%.6f|a", e.Name, float64(int64(e.Value)/e.Count)), // make sure e.Count != 0 - fmt.Sprintf("%s.min:%.6f|a", e.Name, float64(e.Min)), - fmt.Sprintf("%s.max:%.6f|a", e.Name, float64(e.Max)), + fmt.Sprintf("%s.count:%d|c", e.Name, int64(e.Count)), + fmt.Sprintf("%s.avg:%.6f|ms", e.Name, float64(int64(e.Value)/e.Count)), // make sure e.Count != 0 + fmt.Sprintf("%s.min:%.6f|ms", e.Name, float64(e.Min)), + fmt.Sprintf("%s.max:%.6f|ms", e.Name, float64(e.Max)), } } diff --git a/event/timing.go b/event/timing.go index 9fae141..7217913 100644 --- a/event/timing.go +++ b/event/timing.go @@ -1,21 +1,48 @@ package event -import "fmt" -import "math" +import ( + "fmt" + "math" + "sort" + "sync" + "time" +) // Timing keeps min/max/avg information about a timer over a certain interval type Timing struct { - Name string - Min int64 - Max int64 - Value int64 - Values []int64 - Count int64 + Name string + + mu sync.Mutex + Min int64 + Max int64 + Value 
int64 + Values []int64 + Count int64 + Sample float64 + PercentThreshold []float64 +} + +func (e *Timing) StatClass() string { + return "timer" +} + +// for sorting +type statdInt64arr []int64 + +func (a statdInt64arr) Len() int { return len(a) } +func (a statdInt64arr) Swap(i int, j int) { a[i], a[j] = a[j], a[i] } +func (a statdInt64arr) Less(i, j int) bool { return (a[i] - a[j]) < 0 } //this is the sorting statsd uses for its timings + +func round(a float64) float64 { + if a < 0 { + return math.Ceil(a - 0.5) + } + return math.Floor(a + 0.5) } // NewTiming is a factory for a Timing event, setting the Count to 1 to prevent div_by_0 errors -func NewTiming(k string, delta int64) *Timing { - return &Timing{Name: k, Min: delta, Max: delta, Value: delta, Count: 1} +func NewTiming(k string, delta int64, sample float64) *Timing { + return &Timing{Name: k, Min: delta, Max: delta, Value: delta, Count: 1, Sample: 1.0 / sample, PercentThreshold: []float64{0.9, 0.99}} } // Update the event with metrics coming from a new one of the same type and with the same key @@ -23,7 +50,10 @@ func (e *Timing) Update(e2 Event) error { if e.Type() != e2.Type() { return fmt.Errorf("statsd event type conflict: %s vs %s ", e.String(), e2.String()) } + e.mu.Lock() + defer e.mu.Unlock() p := e2.Payload().(map[string]interface{}) + e.Sample += p["sample"].(float64) e.Count += p["cnt"].(int64) e.Value += p["val"].(int64) e.Min = minInt64(e.Min, p["min"].(int64)) @@ -34,6 +64,8 @@ func (e *Timing) Update(e2 Event) error { //Reset the value func (e *Timing) Reset() { + e.mu.Lock() + defer e.mu.Unlock() e.Value = 0 e.Count = 1 e.Min = 0 @@ -44,29 +76,58 @@ func (e *Timing) Reset() { // Payload returns the aggregated value for this event func (e Timing) Payload() interface{} { return map[string]interface{}{ - "min": e.Min, - "max": e.Max, - "val": e.Value, - "cnt": e.Count, - "vals": e.Values, + "min": e.Min, + "sample": e.Sample, + "max": e.Max, + "val": e.Value, + "cnt": e.Count, + "vals": 
e.Values, } } -// Stats returns an array of StatsD events as they travel over UDP -func (e Timing) Stats() []string { +// Stats returns an array of StatsD events +func (e Timing) Stats(tick time.Duration) []string { + e.mu.Lock() + defer e.mu.Unlock() + std := float64(0) avg := float64(e.Value / e.Count) - for _, v := range e.Values { + cumulativeValues := []int64{e.Min} + + sort.Sort(statdInt64arr(e.Values)) + + for idx, v := range e.Values { std += math.Pow((float64(v) - avg), 2.0) + if idx > 0 { + cumulativeValues = append(cumulativeValues, v+cumulativeValues[idx-1]) + } } - std = math.Sqrt(std / float64(e.Count)) - return []string{ - fmt.Sprintf("%s.count:%d|a", e.Name, int64(e.Count)), - fmt.Sprintf("%s.avg:%d|a", e.Name, int64(avg)), // make sure e.Count != 0 - fmt.Sprintf("%s.min:%d|a", e.Name, e.Min), - fmt.Sprintf("%s.max:%d|a", e.Name, e.Max), - fmt.Sprintf("%s.std:%d|a", e.Name, int64(std)), + + base := []string{ + fmt.Sprintf("%s.count:%d|c", e.Name, e.Count), + fmt.Sprintf("%s.min:%d|ms", e.Name, e.Min), + fmt.Sprintf("%s.max:%d|ms", e.Name, e.Max), + fmt.Sprintf("%s.sum:%d|c", e.Name, int64(e.Value)), } + + if e.Count > 0 { + mid := int(math.Floor(float64(e.Count) / 2.0)) + median := int64(0) + if mid < len(e.Values) { + if math.Mod(float64(mid), 2.0) == 0 { + median = e.Values[mid] + } else if len(e.Values) > 1 { + median = (e.Values[mid-1] + e.Values[mid]) / 2.0 + } + } + std = math.Sqrt(std / float64(e.Count)) + base = append(base, + fmt.Sprintf("%s.median:%d|ms", e.Name, int64(median)), + fmt.Sprintf("%s.std:%d|ms", e.Name, int64(e.Value)), + ) + } + + return base } // Key returns the name of this metric diff --git a/event/total.go b/event/total.go index 23a0385..19b727e 100644 --- a/event/total.go +++ b/event/total.go @@ -1,6 +1,9 @@ package event -import "fmt" +import ( + "fmt" + "time" +) // Total represents a metric that is continously increasing, e.g. 
read operations since boot type Total struct { @@ -8,6 +11,10 @@ type Total struct { Value int64 } +func (e *Total) StatClass() string { + return "counter" +} + // Update the event with metrics coming from a new one of the same type and with the same key func (e *Total) Update(e2 Event) error { if e.Type() != e2.Type() { @@ -17,10 +24,8 @@ func (e *Total) Update(e2 Event) error { return nil } -//Reset the value -func (e *Total) Reset() { - e.Value = 0 -} +// Never reset this value +func (e *Total) Reset() {} // Payload returns the aggregated value for this event func (e Total) Payload() interface{} { @@ -28,8 +33,8 @@ func (e Total) Payload() interface{} { } // Stats returns an array of StatsD events as they travel over UDP -func (e Total) Stats() []string { - return []string{fmt.Sprintf("%s:%d|t", e.Name, e.Value)} +func (e Total) Stats(tick time.Duration) []string { + return []string{fmt.Sprintf("%s:%d|c", e.Name, e.Value)} } // Key returns the name of this metric diff --git a/interface.go b/interface.go index 4657259..74c3e5e 100644 --- a/interface.go +++ b/interface.go @@ -4,6 +4,8 @@ import "time" // Statsd is an interface to a StatsD client (buffered/unbuffered) type Statsd interface { + String() string + CreateSocket() error Close() error Incr(stat string, count int64) error @@ -11,6 +13,8 @@ type Statsd interface { Timing(stat string, delta int64) error PrecisionTiming(stat string, delta time.Duration) error Gauge(stat string, value int64) error + GaugeAbsolute(stat string, value int64) error + GaugeAvg(stat string, value int64) error GaugeDelta(stat string, value int64) error Absolute(stat string, value int64) error Total(stat string, value int64) error diff --git a/noopclient.go b/noopclient.go index a06a62d..b01bdf7 100644 --- a/noopclient.go +++ b/noopclient.go @@ -1,9 +1,13 @@ package statsd +import ( + "time" +) //impliment a "noop" statsd in case there is no statsd type StatsdNoop struct{} +func (s StatsdNoop) String() string { return "NoopClient" } 
func (s StatsdNoop) CreateSocket() error { return nil } func (s StatsdNoop) Close() error { return nil } func (s StatsdNoop) Incr(stat string, count int64) error { return nil } @@ -11,10 +15,11 @@ func (s StatsdNoop) Decr(stat string, count int64) error { re func (s StatsdNoop) Timing(stat string, count int64) error { return nil } func (s StatsdNoop) PrecisionTiming(stat string, delta time.Duration) error { return nil } func (s StatsdNoop) Gauge(stat string, value int64) error { return nil } +func (s StatsdNoop) GaugeAbsolute(stat string, value int64) error { return nil } +func (s StatsdNoop) GaugeAvg(stat string, value int64) error { return nil } func (s StatsdNoop) GaugeDelta(stat string, value int64) error { return nil } func (s StatsdNoop) Absolute(stat string, value int64) error { return nil } func (s StatsdNoop) Total(stat string, value int64) error { return nil } func (s StatsdNoop) FGauge(stat string, value float64) error { return nil } func (s StatsdNoop) FGaugeDelta(stat string, value float64) error { return nil } func (s StatsdNoop) FAbsolute(stat string, value float64) error { return nil } -