diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..337ad02 --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +statsd-github.iml +.idea diff --git a/bufferedclient.go b/bufferedclient.go index 4572452..5170057 100644 --- a/bufferedclient.go +++ b/bufferedclient.go @@ -1,12 +1,13 @@ package statsd import ( + "expvar" "log" + "math/rand" "os" + "github.com/wyndhblb/statsd/event" "strings" "time" - - "github.com/quipo/statsd/event" ) // request to close the buffered statsd collector @@ -14,40 +15,90 @@ type closeRequest struct { reply chan error } +var ( + ExpMapCounts *expvar.Map + ExpMapGauges *expvar.Map + ExpMapTimers *expvar.Map +) + +func init() { + + ExpMapCounts = expvar.NewMap("counters") + ExpMapGauges = expvar.NewMap("gauges") + ExpMapTimers = expvar.NewMap("timers") +} + // StatsdBuffer is a client library to aggregate events in memory before // flushing aggregates to StatsD, useful if the frequency of events is extremely high // and sampling is not desirable type StatsdBuffer struct { - statsd *StatsdClient - flushInterval time.Duration - eventChannel chan event.Event - events map[string]event.Event - closeChannel chan closeRequest - Logger *log.Logger + statsd *StatsdClient + name string + flushInterval time.Duration + eventChannel chan event.Event + events map[string]event.Event + closeChannel chan closeRequest + Logger *log.Logger + RetainKeys bool + AddExpvar bool // add stats to expvar list of vars + ReCycleConnection bool //close the Statsd connection each time after send (helps with picking up new hosts) + BufferLength int //buffer length before we send a multi line stat + SampleRate float32 + TimerSampleRate float32 } // NewStatsdBuffer Factory -func NewStatsdBuffer(interval time.Duration, client *StatsdClient) *StatsdBuffer { +func NewStatsdBuffer(name string, interval time.Duration, client *StatsdClient) *StatsdBuffer { sb := &StatsdBuffer{ - flushInterval: interval, - statsd: client, - eventChannel: make(chan event.Event, 100), - events: make(map[string]event.Event, 0), - closeChannel: make(chan closeRequest, 0), - Logger: log.New(os.Stdout, "[BufferedStatsdClient] ", log.Ldate|log.Ltime), + flushInterval: interval, + name: name, + statsd: client, + eventChannel: make(chan event.Event, 100), + events: make(map[string]event.Event, 0), + closeChannel: make(chan closeRequest, 0), + Logger: log.New(os.Stdout, "[BufferedStatsdClient] ", log.Ldate|log.Ltime), + RetainKeys: false, + AddExpvar: true, + ReCycleConnection: true, + BufferLength: 512, + SampleRate: 1.0, + TimerSampleRate: 1.0, } + rand.Seed(time.Now().UnixNano()) go sb.collector() return sb } +func (sb *StatsdBuffer) String() string { + return sb.statsd.String() +} + // CreateSocket creates a UDP connection to a StatsD server func (sb *StatsdBuffer) CreateSocket() error { return sb.statsd.CreateSocket() } +func (sb *StatsdBuffer) registerStat() bool { + if sb.SampleRate >= 1.0 { + return true + } + return rand.Float32() < sb.SampleRate +} + +func (sb *StatsdBuffer) registerTimerStat(samplerate float32) bool { + if samplerate >= 1.0 { + return true + } + return rand.Float32() < samplerate +} + // Incr - Increment a counter metric. Often used to note a particular event func (sb *StatsdBuffer) Incr(stat string, count int64) error { + if !sb.registerStat() { + return nil + } if 0 != count { + //ct := int64(float32(count) / sb.SampleRate) sb.eventChannel <- &event.Increment{Name: stat, Value: count} } return nil @@ -55,7 +106,11 @@ func (sb *StatsdBuffer) Incr(stat string, count int64) error { // Decr - Decrement a counter metric. Often used to note a particular event func (sb *StatsdBuffer) Decr(stat string, count int64) error { + if !sb.registerStat() { + return nil + } if 0 != count { + //ct := int64(float32(count) / sb.SampleRate) sb.eventChannel <- &event.Increment{Name: stat, Value: -count} } return nil @@ -63,13 +118,38 @@ func (sb *StatsdBuffer) Decr(stat string, count int64) error { // Timing - Track a duration event func (sb *StatsdBuffer) Timing(stat string, delta int64) error { - sb.eventChannel <- event.NewTiming(stat, delta) + if !sb.registerTimerStat(sb.TimerSampleRate) { + return nil + } + sb.eventChannel <- event.NewTiming(stat, delta, float64(sb.TimerSampleRate)) + return nil +} + +// TimingSampling - Track a duration event at a sampling rate +func (sb *StatsdBuffer) TimingSampling(stat string, delta int64, samplerate float32) error { + if !sb.registerTimerStat(samplerate) { + return nil + } + sb.eventChannel <- event.NewTiming(stat, delta, float64(sb.TimerSampleRate)) + return nil +} + +// PrecisionTiming - Track a duration event +// the time delta has to be a duration +func (sb *StatsdBuffer) PrecisionTimingSampling(stat string, delta time.Duration, samplerate float32) error { + if !sb.registerTimerStat(samplerate) { + return nil + } + sb.eventChannel <- event.NewPrecisionTiming(stat, time.Duration(float64(delta)/float64(time.Millisecond))) return nil } // PrecisionTiming - Track a duration event // the time delta has to be a duration func (sb *StatsdBuffer) PrecisionTiming(stat string, delta time.Duration) error { + if !sb.registerTimerStat(sb.TimerSampleRate) { + return nil + } sb.eventChannel <- event.NewPrecisionTiming(stat, time.Duration(float64(delta)/float64(time.Millisecond))) return nil } @@ -78,43 +158,179 @@ func (sb *StatsdBuffer) PrecisionTiming(stat string, delta time.Duration) error // and they don’t change unless you change them. That is, once you set a gauge value, // it will be a flat line on the graph until you change it again func (sb *StatsdBuffer) Gauge(stat string, value int64) error { + if !sb.registerStat() { + return nil + } sb.eventChannel <- &event.Gauge{Name: stat, Value: value} return nil } +// GaugeAbsolute which DOES NOT "add" values in the buffer, just +// resets the value to the current value, here in the non-buffered client it is +// the same as a Gauge +func (sb *StatsdBuffer) GaugeAbsolute(stat string, value int64) error { + if !sb.registerStat() { + return nil + } + sb.eventChannel <- &event.GaugeAbsolute{Name: stat, Value: value} + return nil +} + +func (sb *StatsdBuffer) GaugeAvg(stat string, value int64) error { + if !sb.registerStat() { + return nil + } + sb.eventChannel <- &event.GaugeAvg{Name: stat, Value: value} + return nil +} + func (sb *StatsdBuffer) GaugeDelta(stat string, value int64) error { + if !sb.registerStat() { + return nil + } sb.eventChannel <- &event.GaugeDelta{Name: stat, Value: value} return nil } func (sb *StatsdBuffer) FGauge(stat string, value float64) error { + if !sb.registerStat() { + return nil + } sb.eventChannel <- &event.FGauge{Name: stat, Value: value} return nil } func (sb *StatsdBuffer) FGaugeDelta(stat string, value float64) error { + if !sb.registerStat() { + return nil + } sb.eventChannel <- &event.FGaugeDelta{Name: stat, Value: value} return nil } // Absolute - Send absolute-valued metric (not averaged/aggregated) func (sb *StatsdBuffer) Absolute(stat string, value int64) error { + if !sb.registerStat() { + return nil + } sb.eventChannel <- &event.Absolute{Name: stat, Values: []int64{value}} return nil } // FAbsolute - Send absolute-valued metric (not averaged/aggregated) func (sb *StatsdBuffer) FAbsolute(stat string, value float64) error { + if !sb.registerStat() { + return nil + } sb.eventChannel <- &event.FAbsolute{Name: stat, Values: []float64{value}} return nil } // Total - Send a metric that is continously increasing, e.g. read operations since boot func (sb *StatsdBuffer) Total(stat string, value int64) error { + if !sb.registerStat() { + return nil + } sb.eventChannel <- &event.Total{Name: stat, Value: value} return nil } +func (sb *StatsdBuffer) expCollect(ev event.Event) { + payload := ev.Payload() + k := ev.Key() + + var use_map *expvar.Map + + switch ev.StatClass() { + case "counter": + use_map = ExpMapCounts + switch payload.(type) { + case int64: + use_map.Add(k, payload.(int64)) + + case float64: + use_map.AddFloat(k, payload.(float64)) + + case []int64: + p_len := len(payload.([]int64)) + if p_len > 0 { + use_map.Add(k, payload.([]int64)[p_len-1]) + } + case []float64: + p_len := len(payload.([]float64)) + if p_len > 0 { + use_map.AddFloat(k, payload.([]float64)[p_len-1]) + } + case map[string]interface{}: + use_map.Add(k, payload.(map[string]interface{})["val"].(int64)) + + } + case "gauge": + use_map = ExpMapGauges + switch payload.(type) { + case int64: + v := new(expvar.Int) + v.Set(payload.(int64)) + use_map.Set(k, v) + case float64: + v := new(expvar.Float) + v.Set(payload.(float64)) + use_map.Set(k, v) + + case []int64: + p_len := len(payload.([]int64)) + if p_len > 0 { + v := new(expvar.Int) + v.Set(payload.([]int64)[p_len-1]) + use_map.Set(k, v) + } + case []float64: + p_len := len(payload.([]float64)) + if p_len > 0 { + v := new(expvar.Float) + v.Set(payload.([]float64)[p_len-1]) + use_map.Set(k, v) + } + case map[string]interface{}: + v := new(expvar.Int) + v.Set(payload.(map[string]interface{})["val"].(int64)) + use_map.Set(k, v) + } + case "timer": + use_map = ExpMapTimers + switch payload.(type) { + case int64: + v := new(expvar.Int) + v.Set(payload.(int64)) + use_map.Set(k, v) + case float64: + v := new(expvar.Float) + v.Set(payload.(float64)) + use_map.Set(k, v) + + case []int64: + p_len := len(payload.([]int64)) + if p_len > 0 { + v := new(expvar.Int) + v.Set(payload.([]int64)[p_len-1]) + use_map.Set(k, v) + } + case []float64: + p_len := len(payload.([]float64)) + if p_len > 0 { + v := new(expvar.Float) + v.Set(payload.([]float64)[p_len-1]) + use_map.Set(k, v) + } + case map[string]interface{}: + v := new(expvar.Int) + v.Set(payload.(map[string]interface{})["val"].(int64)) + use_map.Set(k, v) + } + } + +} + // handle flushes and updates in one single thread (instead of locking the events map) func (sb *StatsdBuffer) collector() { // on a panic event, flush all the pending stats before panicking @@ -147,6 +363,8 @@ func (sb *StatsdBuffer) collector() { //sb.Logger.Println("Adding new event") sb.events[k] = e } + sb.expCollect(sb.events[k]) + case c := <-sb.closeChannel: sb.Logger.Println("Asked to terminate. Flushing stats before returning.") c.reply <- sb.flush() @@ -180,16 +398,48 @@ func (sb *StatsdBuffer) flush() (err error) { return nil } err = sb.statsd.CreateSocket() + + if sb.ReCycleConnection { + defer sb.statsd.Close() + } + if nil != err { sb.Logger.Println("Error establishing UDP connection for sending statsd events:", err) } + + //buffer stats in BufferLength blocks save some frames + var out_string = "" + for k, v := range sb.events { - err := sb.statsd.SendEvent(v) + var str string + switch v.(type) { + case *event.Timing, *event.PrecisionTiming: + str, err = sb.statsd.EventStatsdStringTimerSample(v, sb.flushInterval, sb.TimerSampleRate) + + default: + str, err = sb.statsd.EventStatsdString(v, sb.flushInterval, sb.SampleRate) + } if nil != err { sb.Logger.Println(err) + continue + } + if len([]byte(out_string+str)) >= sb.BufferLength { + sb.statsd.SendRaw(out_string) + out_string = "" } - //sb.Logger.Println("Sent", v.String()) - delete(sb.events, k) + + out_string += str + + // if we are "retaining" the names (so that we can send "0"s do NOT delete + if sb.RetainKeys { + sb.events[k].Reset() + } else { + delete(sb.events, k) + } + } + + if len(out_string) > 0 { + sb.statsd.SendRaw(out_string) } return nil diff --git a/client.go b/client.go index a8037e7..9e49b44 100644 --- a/client.go +++ b/client.go @@ -3,12 +3,13 @@ package statsd import ( "fmt" "log" + "math/rand" "net" "os" "strings" "time" - "github.com/quipo/statsd/event" + "github.com/wyndhblb/statsd/event" ) // note Hostname is exported so clients can set it to something different than the default @@ -23,23 +24,42 @@ func init() { // StatsdClient is a client library to send events to StatsD type StatsdClient struct { - conn net.Conn - addr string - prefix string - Logger *log.Logger + conn net.Conn + addr string + prefix string + Logger *log.Logger + SampleRate float32 + TimerSampleRate float32 } // NewStatsdClient - Factory func NewStatsdClient(addr string, prefix string) *StatsdClient { // allow %HOST% in the prefix string prefix = strings.Replace(prefix, "%HOST%", Hostname, 1) + rand.Seed(time.Now().UnixNano()) return &StatsdClient{ - addr: addr, - prefix: prefix, - Logger: log.New(os.Stdout, "[StatsdClient] ", log.Ldate|log.Ltime), + addr: addr, + prefix: prefix, + Logger: log.New(os.Stdout, "[StatsdClient] ", log.Ldate|log.Ltime), + SampleRate: 1.0, + TimerSampleRate: 1.0, } } +func (sb *StatsdClient) registerStat() bool { + if sb.SampleRate >= 1.0 { + return true + } + return rand.Float32() < sb.SampleRate +} + +func (sb *StatsdClient) registerTimerStat() bool { + if sb.SampleRate >= 1.0 { + return true + } + return rand.Float32() < sb.TimerSampleRate +} + // String returns the StatsD server address func (c *StatsdClient) String() string { return c.addr @@ -68,6 +88,9 @@ func (c *StatsdClient) Close() error { // Incr - Increment a counter metric. Often used to note a particular event func (c *StatsdClient) Incr(stat string, count int64) error { + if !c.registerStat() { + return nil + } if 0 != count { return c.send(stat, "%d|c", count) } @@ -76,6 +99,9 @@ func (c *StatsdClient) Incr(stat string, count int64) error { // Decr - Decrement a counter metric. Often used to note a particular event func (c *StatsdClient) Decr(stat string, count int64) error { + if !c.registerStat() { + return nil + } if 0 != count { return c.send(stat, "%d|c", -count) } @@ -85,12 +111,18 @@ func (c *StatsdClient) Decr(stat string, count int64) error { // Timing - Track a duration event // the time delta must be given in milliseconds func (c *StatsdClient) Timing(stat string, delta int64) error { + if !c.registerStat() { + return nil + } return c.send(stat, "%d|ms", delta) } // PrecisionTiming - Track a duration event // the time delta has to be a duration func (c *StatsdClient) PrecisionTiming(stat string, delta time.Duration) error { + if !c.registerTimerStat() { + return nil + } return c.send(stat, fmt.Sprintf("%.6f%s|ms", float64(delta)/float64(time.Millisecond), "%d"), 0) } @@ -101,6 +133,9 @@ func (c *StatsdClient) PrecisionTiming(stat string, delta time.Duration) error { // underlying protocol, you can't explicitly set a gauge to a negative number without // first setting it to zero. func (c *StatsdClient) Gauge(stat string, value int64) error { + if !c.registerStat() { + return nil + } if value < 0 { c.send(stat, "%d|g", 0) return c.send(stat, "%d|g", value) @@ -108,8 +143,25 @@ func (c *StatsdClient) Gauge(stat string, value int64) error { return c.send(stat, "%d|g", value) } +// the Buffered client has a GaugeAbsolute which DOES NOT "add" values in the buffer, just +// resets the value to the current value, here in the non-buffered client it is +// the same as a Gauge +func (c *StatsdClient) GaugeAbsolute(stat string, value int64) error { + return c.Gauge(stat, value) +} + +// the Buffered client has a GaugeAbsolute which DOES NOT "add" values in the buffer, just +// resets the value to the current value, here in the non-buffered client it is +// the same as a Gauge +func (c *StatsdClient) GaugeAvg(stat string, value int64) error { + return c.Gauge(stat, value) +} + // GaugeDelta -- Send a change for a gauge func (c *StatsdClient) GaugeDelta(stat string, value int64) error { + if !c.registerStat() { + return nil + } // Gauge Deltas are always sent with a leading '+' or '-'. The '-' takes care of itself but the '+' must added by hand if value < 0 { return c.send(stat, "%d|g", value) @@ -119,6 +171,9 @@ func (c *StatsdClient) GaugeDelta(stat string, value int64) error { // FGauge -- Send a floating point value for a gauge func (c *StatsdClient) FGauge(stat string, value float64) error { + if !c.registerStat() { + return nil + } if value < 0 { c.send(stat, "%d|g", 0) return c.send(stat, "%g|g", value) @@ -128,6 +183,9 @@ func (c *StatsdClient) FGauge(stat string, value float64) error { // FGaugeDelta -- Send a floating point change for a gauge func (c *StatsdClient) FGaugeDelta(stat string, value float64) error { + if !c.registerStat() { + return nil + } if value < 0 { return c.send(stat, "%g|g", value) } @@ -136,17 +194,26 @@ func (c *StatsdClient) FGaugeDelta(stat string, value float64) error { // Absolute - Send absolute-valued metric (not averaged/aggregated) func (c *StatsdClient) Absolute(stat string, value int64) error { - return c.send(stat, "%d|a", value) + if !c.registerStat() { + return nil + } + return c.send(stat, "%d|c", value) } // FAbsolute - Send absolute-valued floating point metric (not averaged/aggregated) func (c *StatsdClient) FAbsolute(stat string, value float64) error { - return c.send(stat, "%g|a", value) + if !c.registerStat() { + return nil + } + return c.send(stat, "%g|c", value) } // Total - Send a metric that is continously increasing, e.g. read operations since boot func (c *StatsdClient) Total(stat string, value int64) error { - return c.send(stat, "%d|t", value) + if !c.registerStat() { + return nil + } + return c.send(stat, "%d|c", value) } // write a UDP packet with the statsd event @@ -160,12 +227,61 @@ func (c *StatsdClient) send(stat string, format string, value interface{}) error return err } +func (c *StatsdClient) SendRaw(buffer string) error { + if c.conn == nil { + return fmt.Errorf("not connected") + } + //log.Printf("SENDING EVENT %s", buffer) + + _, err := fmt.Fprintf(c.conn, buffer) + return err + +} + +func (c *StatsdClient) EventStatsdString(e event.Event, tick time.Duration, samplerate float32) (string, error) { + var out_str = "" + + for _, stat := range e.Stats(tick) { + str := fmt.Sprintf("%s%s", c.prefix, stat) + + if len(str) > 0 { + if samplerate < 1.0 { + str += fmt.Sprintf("|@%f", samplerate) + } + out_str += str + "\n" + } + } + return out_str, nil + +} + +// special case for timers, if we are a buffered client, +// then we DO NOT add the sample rate to the acctuall "timers" parts of the metric lines +// (lower, upper, median, etc...) just to the "count and count_ps" +func (c *StatsdClient) EventStatsdStringTimerSample(e event.Event, tick time.Duration, samplerate float32) (string, error) { + var out_str = "" + + for _, stat := range e.Stats(tick) { + str := fmt.Sprintf("%s%s", c.prefix, stat) + + if len(str) > 0 { + // just count and count ps + if samplerate < 1.0 && (strings.Contains(stat, "count:") || strings.Contains(stat, "count_ps:")) { + str += fmt.Sprintf("|@%f", samplerate) + } + out_str += str + "\n" + } + } + return out_str, nil + +} + // SendEvent - Sends stats from an event object -func (c *StatsdClient) SendEvent(e event.Event) error { +func (c *StatsdClient) SendEvent(e event.Event, tick time.Duration) error { if c.conn == nil { return fmt.Errorf("cannot send stats, not connected to StatsD server") } - for _, stat := range e.Stats() { + for _, stat := range e.Stats(tick) { //fmt.Printf("SENDING EVENT %s%s\n", c.prefix, stat) _, err := fmt.Fprintf(c.conn, "%s%s", c.prefix, stat) if nil != err { diff --git a/echoclient.go b/echoclient.go new file mode 100644 index 0000000..46445c6 --- /dev/null +++ b/echoclient.go @@ -0,0 +1,31 @@ +package statsd + +import ( + "log" + "time" +) + +//impliment a "echo" statsd for debuging statsd +type StatsdEcho struct{} + +func prit(stat string, val interface{}) error { + log.Printf("%s:%v", stat, val) + return nil +} + +func (s StatsdEcho) String() string { return "EchoClient" } +func (s StatsdEcho) CreateSocket() error { return nil } +func (s StatsdEcho) Close() error { return nil } +func (s StatsdEcho) Incr(stat string, count int64) error { return prit(stat, count) } +func (s StatsdEcho) Decr(stat string, count int64) error { return prit(stat, count) } +func (s StatsdEcho) Timing(stat string, count int64) error { return prit(stat, count) } +func (s StatsdEcho) PrecisionTiming(stat string, delta time.Duration) error { return prit(stat, delta) } +func (s StatsdEcho) Gauge(stat string, value int64) error { return prit(stat, value) } +func (s StatsdEcho) GaugeAbsolute(stat string, value int64) error { return prit(stat, value) } +func (s StatsdEcho) GaugeAvg(stat string, value int64) error { return prit(stat, value) } +func (s StatsdEcho) GaugeDelta(stat string, value int64) error { return prit(stat, value) } +func (s StatsdEcho) Absolute(stat string, value int64) error { return prit(stat, value) } +func (s StatsdEcho) Total(stat string, value int64) error { return prit(stat, value) } +func (s StatsdEcho) FGauge(stat string, value float64) error { return prit(stat, value) } +func (s StatsdEcho) FGaugeDelta(stat string, value float64) error { return prit(stat, value) } +func (s StatsdEcho) FAbsolute(stat string, value float64) error { return prit(stat, value) } diff --git a/event/absolute.go b/event/absolute.go index 67be7f8..0fe0214 100644 --- a/event/absolute.go +++ b/event/absolute.go @@ -1,33 +1,49 @@ package event -import "fmt" +import ( + "fmt" + "sync" + "time" +) // Absolute is a metric that is not averaged/aggregated. // We keep each value distinct and then we flush them all individually. type Absolute struct { Name string + mu sync.Mutex Values []int64 } +func (e *Absolute) StatClass() string { + return "counter" +} + // Update the event with metrics coming from a new one of the same type and with the same key func (e *Absolute) Update(e2 Event) error { if e.Type() != e2.Type() { return fmt.Errorf("statsd event type conflict: %s vs %s ", e.String(), e2.String()) } + e.mu.Lock() + defer e.mu.Unlock() e.Values = append(e.Values, e2.Payload().([]int64)...) return nil } +//Reset the value +func (e *Absolute) Reset() { + e.Values = []int64{} +} + // Payload returns the aggregated value for this event func (e Absolute) Payload() interface{} { return e.Values } // Stats returns an array of StatsD events as they travel over UDP -func (e Absolute) Stats() []string { +func (e Absolute) Stats(tick time.Duration) []string { ret := make([]string, 0, len(e.Values)) for v := range e.Values { - ret = append(ret, fmt.Sprintf("%s:%d|a", e.Name, v)) + ret = append(ret, fmt.Sprintf("%s:%d|c", e.Name, v)) } return ret } diff --git a/event/fabsolute.go b/event/fabsolute.go index ec13011..f93825b 100644 --- a/event/fabsolute.go +++ b/event/fabsolute.go @@ -1,33 +1,49 @@ package event -import "fmt" +import ( + "fmt" + "sync" + "time" +) // Absolute is a metric that is not averaged/aggregated. // We keep each value distinct and then we flush them all individually. type FAbsolute struct { Name string + mu sync.Mutex Values []float64 } +func (e *FAbsolute) StatClass() string { + return "counter" +} + // Update the event with metrics coming from a new one of the same type and with the same key func (e *FAbsolute) Update(e2 Event) error { if e.Type() != e2.Type() { return fmt.Errorf("statsd event type conflict: %s vs %s ", e.String(), e2.String()) } + e.mu.Lock() + defer e.mu.Unlock() e.Values = append(e.Values, e2.Payload().([]float64)...) return nil } +//Reset the value +func (e *FAbsolute) Reset() { + e.Values = []float64{} +} + // Payload returns the aggregated value for this event func (e FAbsolute) Payload() interface{} { return e.Values } // Stats returns an array of StatsD events as they travel over UDP -func (e FAbsolute) Stats() []string { +func (e FAbsolute) Stats(tick time.Duration) []string { ret := make([]string, 0, len(e.Values)) for v := range e.Values { - ret = append(ret, fmt.Sprintf("%s:%g|a", e.Name, v)) + ret = append(ret, fmt.Sprintf("%s:%g|c", e.Name, v)) } return ret } diff --git a/event/fgauge.go b/event/fgauge.go index 463c3a4..2859fb3 100644 --- a/event/fgauge.go +++ b/event/fgauge.go @@ -1,6 +1,10 @@ package event -import "fmt" +import ( + "fmt" + "sync" + "time" +) // Gauge - Gauges are a constant data type. They are not subject to averaging, // and they don’t change unless you change them. That is, once you set a gauge value, @@ -8,6 +12,11 @@ import "fmt" type FGauge struct { Name string Value float64 + mu sync.Mutex +} + +func (e *FGauge) StatClass() string { + return "gauge" } // Update the event with metrics coming from a new one of the same type and with the same key @@ -15,17 +24,23 @@ func (e *FGauge) Update(e2 Event) error { if e.Type() != e2.Type() { return fmt.Errorf("statsd event type conflict: %s vs %s ", e.String(), e2.String()) } + e.mu.Lock() + defer e.mu.Unlock() e.Value += e2.Payload().(float64) return nil } +//Reset the value GAUGES TO NOT RESET +func (e *FGauge) Reset() { +} + // Payload returns the aggregated value for this event func (e FGauge) Payload() interface{} { return e.Value } // Stats returns an array of StatsD events as they travel over UDP -func (e FGauge) Stats() []string { +func (e FGauge) Stats(tick time.Duration) []string { if e.Value < 0 { // because a leading '+' or '-' in the value of a gauge denotes a delta, to send // a negative gauge value we first set the gauge absolutely to 0, then send the diff --git a/event/fgaugedelta.go b/event/fgaugedelta.go index 232098a..1f213e2 100644 --- a/event/fgaugedelta.go +++ b/event/fgaugedelta.go @@ -1,6 +1,10 @@ package event -import "fmt" +import ( + "fmt" + "sync" + "time" +) // Gauge - Gauges are a constant data type. They are not subject to averaging, // and they don’t change unless you change them. That is, once you set a gauge value, @@ -8,6 +12,11 @@ import "fmt" type FGaugeDelta struct { Name string Value float64 + mu sync.Mutex +} + +func (e *FGaugeDelta) StatClass() string { + return "gauge" } // Update the event with metrics coming from a new one of the same type and with the same key @@ -15,6 +24,8 @@ func (e *FGaugeDelta) Update(e2 Event) error { if e.Type() != e2.Type() { return fmt.Errorf("statsd event type conflict: %s vs %s ", e.String(), e2.String()) } + e.mu.Lock() + defer e.mu.Unlock() e.Value += e2.Payload().(float64) return nil } @@ -24,8 +35,13 @@ func (e FGaugeDelta) Payload() interface{} { return e.Value } +//Reset the value +func (e *FGaugeDelta) Reset() { + e.Value = 0.0 +} + // Stats returns an array of StatsD events as they travel over UDP -func (e FGaugeDelta) Stats() []string { +func (e FGaugeDelta) Stats(tick time.Duration) []string { if e.Value < 0 { // because a leading '+' or '-' in the value of a gauge denotes a delta, to send // a negative gauge value we first set the gauge absolutely to 0, then send the diff --git a/event/gauge.go b/event/gauge.go index 0a36905..eaa0f68 100644 --- a/event/gauge.go +++ b/event/gauge.go @@ -1,6 +1,10 @@ package event -import "fmt" +import ( + "fmt" + "sync/atomic" + "time" +) // Gauge - Gauges are a constant data type. They are not subject to averaging, // and they don’t change unless you change them. That is, once you set a gauge value, @@ -10,12 +14,16 @@ type Gauge struct { Value int64 } +func (e *Gauge) StatClass() string { + return "gauge" +} + // Update the event with metrics coming from a new one of the same type and with the same key func (e *Gauge) Update(e2 Event) error { if e.Type() != e2.Type() { return fmt.Errorf("statsd event type conflict: %s vs %s ", e.String(), e2.String()) } - e.Value += e2.Payload().(int64) + atomic.AddInt64(&e.Value, e2.Payload().(int64)) return nil } @@ -24,8 +32,12 @@ func (e Gauge) Payload() interface{} { return e.Value } +func (e *Gauge) Reset() { + e.Value = 0 +} + // Stats returns an array of StatsD events as they travel over UDP -func (e Gauge) Stats() []string { +func (e Gauge) Stats(tick time.Duration) []string { if e.Value < 0 { // because a leading '+' or '-' in the value of a gauge denotes a delta, to send // a negative gauge value we first set the gauge absolutely to 0, then send the diff --git a/event/gaugeabsolute.go b/event/gaugeabsolute.go new file mode 100644 index 0000000..d64e100 --- /dev/null +++ b/event/gaugeabsolute.go @@ -0,0 +1,77 @@ +package event + +import ( + "fmt" + "time" +) + +// GaugeAbsolute - Gauges are a constant data type. They are not subject to averaging, +// and they don’t change unless you change them. That is, once you set a gauge value, +// it will be a flat line on the graph until you change it again +type GaugeAbsolute struct { + Name string + Value int64 +} + +func (e *GaugeAbsolute) StatClass() string { + return "gauge" +} + +// Update the event with metrics coming from a new one of the same type and with the same key +func (e *GaugeAbsolute) Update(e2 Event) error { + if e.Type() != e2.Type() { + return fmt.Errorf("statsd event type conflict: %s vs %s ", e.String(), e2.String()) + } + //just RESET the value + e.Value = e2.Payload().(int64) + return nil +} + +// Payload returns the aggregated value for this event +func (e GaugeAbsolute) Payload() interface{} { + return e.Value +} + +//Reset the value GAUGES TO NOT RESET +func (e *GaugeAbsolute) Reset() { + +} + +// Stats returns an array of StatsD events as they travel over UDP +func (e GaugeAbsolute) Stats(tick time.Duration) []string { + if e.Value < 0 { + // because a leading '+' or '-' in the value of a gauge denotes a delta, to send + // a negative gauge value we first set the gauge absolutely to 0, then send the + // negative value as a delta from 0 (that's just how the spec works :-) + return []string{ + fmt.Sprintf("%s:%d|g", e.Name, 0), + fmt.Sprintf("%s:%d|g", e.Name, e.Value), + } + } + return []string{fmt.Sprintf("%s:%d|g", e.Name, e.Value)} +} + +// Key returns the name of this metric +func (e GaugeAbsolute) Key() string { + return e.Name +} + +// SetKey sets the name of this metric +func (e *GaugeAbsolute) SetKey(key string) { + e.Name = key +} + +// Type returns an integer identifier for this type of metric +func (e GaugeAbsolute) Type() int { + return EventGaugeAbsolute +} + +// TypeString returns a name for this type of metric +func (e GaugeAbsolute) TypeString() string { + return "GaugeAbsolute" +} + +// String returns a debug-friendly representation of this metric +func (e GaugeAbsolute) String() string { + return fmt.Sprintf("{Type: %s, Key: %s, Value: %d}", e.TypeString(), e.Name, e.Value) +} diff --git a/event/gaugeavg.go b/event/gaugeavg.go new file mode 100644 index 0000000..88da9be --- /dev/null +++ b/event/gaugeavg.go @@ -0,0 +1,92 @@ +package event + +import ( + "fmt" + "sync/atomic" + "time" +) + +// GaugeAvg - Since this is buffered, we keep a list of the gauge values +// sent, then at send time average them for the "gauge" value to statsd +type GaugeAvg struct { + Name string + Value int64 + Times int64 +} + +func (e *GaugeAvg) StatClass() string { + return "gauge" +} + +// Update the event with metrics coming from a new one of the same type and with the same key +func (e *GaugeAvg) Update(e2 Event) error { + if e.Type() != e2.Type() { + return fmt.Errorf("statsd event type conflict: %s vs %s ", e.String(), e2.String()) + } + //add it to the current + if e.Value == -1 { + e.Value = 0 + } + atomic.AddInt64(&e.Value, e2.Payload().(int64)) + atomic.AddInt64(&e.Times, 1) + return nil +} + +// Payload returns the aggregated value for this event +func (e GaugeAvg) Payload() interface{} { + return e.Value +} + +//this gauge +func (e *GaugeAvg) Reset() { + e.Times = 1 + e.Value = -1 +} + +// Stats returns an array of StatsD events as they travel over UDP +func (e GaugeAvg) Stats(tick time.Duration) []string { + + //return NOTHING if value is -1 + if e.Value == -1 { + return []string{} + } + true_val := e.Value + if e.Times > 0 { + true_val = e.Value / e.Times + } + if true_val < 0 { + // because a leading '+' or '-' in the value of a gauge denotes a delta, to send + // a negative gauge value we first set the gauge absolutely to 0, then send the + // negative value as a delta from 0 (that's just how the spec works :-) + return []string{ + fmt.Sprintf("%s:%d|g", e.Name, 0), + fmt.Sprintf("%s:%d|g", e.Name, true_val), + } + } + return []string{fmt.Sprintf("%s:%d|g", e.Name, true_val)} +} + +// Key returns the name of this metric +func (e GaugeAvg) Key() string { + return e.Name +} + +// SetKey sets the name of this metric +func (e *GaugeAvg) SetKey(key string) { + e.Name = key +} + +// Type returns an integer identifier for this type of metric +func (e GaugeAvg) Type() int { + return EventGaugeAvg +} + +// TypeString returns a name for this type of metric +func (e GaugeAvg) TypeString() string { + return "Gauge" +} + +// String returns a debug-friendly representation of this metric +func (e GaugeAvg) String() string { + return fmt.Sprintf("{Type: %s, Key: %s, Value: %d}", e.TypeString(), e.Name, e.Value) +} diff --git a/event/gaugedelta.go b/event/gaugedelta.go index 2a5ea7b..c43e033 100644 --- a/event/gaugedelta.go +++ b/event/gaugedelta.go @@ -1,6 +1,10 @@ package event -import "fmt" +import ( + "fmt" + "sync/atomic" + "time" +) // GaugeDelta - Gauges are a constant data type. They are not subject to averaging, // and they don’t change unless you change them. That is, once you set a gauge value, @@ -10,12 +14,16 @@ type GaugeDelta struct { Value int64 } +func (e *GaugeDelta) StatClass() string { + return "gauge" +} + // Update the event with metrics coming from a new one of the same type and with the same key func (e *GaugeDelta) Update(e2 Event) error { if e.Type() != e2.Type() { return fmt.Errorf("statsd event type conflict: %s vs %s ", e.String(), e2.String()) } - e.Value += e2.Payload().(int64) + atomic.AddInt64(&e.Value, e2.Payload().(int64)) return nil } @@ -24,8 +32,13 @@ func (e GaugeDelta) Payload() interface{} { return e.Value } +//Reset the value +func (e *GaugeDelta) Reset() { + e.Value = 0 +} + // Stats returns an array of StatsD events as they travel over UDP -func (e GaugeDelta) Stats() []string { +func (e GaugeDelta) Stats(tick time.Duration) []string { if e.Value < 0 { // because a leading '+' or '-' in the value of a gauge denotes a delta, to send // a negative gauge value we first set the gauge absolutely to 0, then send the diff --git a/event/increment.go b/event/increment.go index e617083..5421360 100644 --- a/event/increment.go +++ b/event/increment.go @@ -1,6 +1,10 @@ package event -import "fmt" +import ( + "fmt" + "sync/atomic" + "time" +) // Increment represents a metric whose value is averaged over a minute type Increment struct { @@ -8,22 +12,31 @@ type Increment struct { Value int64 } +func (e *Increment) StatClass() string { + return "counter" +} + // Update the event with metrics coming from a new one of the same type and with the same key func (e *Increment) Update(e2 Event) error { if e.Type() != e2.Type() { return fmt.Errorf("statsd event type conflict: %s vs %s ", e.String(), e2.String()) } - e.Value += e2.Payload().(int64) + atomic.AddInt64(&e.Value, e2.Payload().(int64)) return nil } +//Reset the value +func (e *Increment) Reset() { + e.Value = 0 +} + // Payload returns the aggregated value for this event func (e Increment) Payload() interface{} { return e.Value } // Stats returns an array of StatsD events as they travel over UDP -func (e Increment) Stats() []string { +func (e Increment) Stats(tick time.Duration) []string { return []string{fmt.Sprintf("%s:%d|c", e.Name, e.Value)} } diff --git a/event/interface.go b/event/interface.go index f174855..d352c35 100644 --- a/event/interface.go +++ b/event/interface.go @@ -1,5 +1,9 @@ package event +import ( + "time" +) + // constant event type identifiers const ( EventIncr = iota @@ -12,11 +16,14 @@ const ( EventFGaugeDelta EventFAbsolute EventPrecisionTiming + EventGaugeAbsolute + EventGaugeAvg ) // Event is an interface to a generic StatsD event, used by the buffered client collator type Event interface { - Stats() []string + //tick duration for those in the buffered that need a "persecond" metrix as well + Stats(tick time.Duration) []string Type() int TypeString() string Payload() interface{} @@ -24,4 +31,6 @@ type Event interface { String() string Key() string SetKey(string) + Reset() + StatClass() string // counter, gauge, timer } diff --git a/event/precisiontiming.go b/event/precisiontiming.go index 587cd85..85244c0 100644 --- a/event/precisiontiming.go +++ b/event/precisiontiming.go @@ -2,18 +2,24 @@ package event import ( "fmt" + "sync" "time" ) // PrecisionTiming keeps min/max/avg information about a timer over a certain interval type PrecisionTiming struct { Name string + mu sync.Mutex Min time.Duration Max time.Duration Value time.Duration Count int64 } +func (e *PrecisionTiming) StatClass() string { + return "timer" +} + // NewPrecisionTiming is a factory for a Timing event, setting the Count to 1 to prevent div_by_0 errors func NewPrecisionTiming(k string, delta time.Duration) *PrecisionTiming { return &PrecisionTiming{Name: k, Min: delta, Max: delta, Value: delta, Count: 1} @@ -25,6 +31,8 @@ func (e *PrecisionTiming) Update(e2 Event) error { return fmt.Errorf("statsd event type conflict: %s vs %s ", e.String(), e2.String()) } p := e2.Payload().(PrecisionTiming) + e.mu.Lock() + defer e.mu.Unlock() e.Count += p.Count e.Value += p.Value e.Min = time.Duration(minInt64(int64(e.Min), int64(p.Min))) @@ -32,17 +40,26 @@ func (e *PrecisionTiming) Update(e2 Event) error { return nil } +//Reset the value +func (e *PrecisionTiming) Reset() { + e.Value = 0 + e.Count = 1 + e.Min = time.Duration(0) + e.Max = time.Duration(0) +} + // Payload returns the aggregated value for this event func (e PrecisionTiming) Payload() interface{} { return e } // Stats returns an array of StatsD events as they travel over UDP -func (e PrecisionTiming) Stats() []string { +func (e PrecisionTiming) Stats(tick time.Duration) []string { return []string{ - fmt.Sprintf("%s.avg:%.6f|a", e.Name, float64(int64(e.Value)/e.Count)), // make sure e.Count != 0 - fmt.Sprintf("%s.min:%.6f|a", e.Name, e.Min), - fmt.Sprintf("%s.max:%.6f|a", e.Name, e.Max), + fmt.Sprintf("%s.count:%d|c", e.Name, int64(e.Count)), + fmt.Sprintf("%s.avg:%.6f|ms", e.Name, float64(int64(e.Value)/e.Count)), // make sure e.Count != 0 + fmt.Sprintf("%s.min:%.6f|ms", e.Name, float64(e.Min)), + fmt.Sprintf("%s.max:%.6f|ms", e.Name, float64(e.Max)), } } diff --git a/event/timing.go b/event/timing.go index 273a1ce..7217913 100644 --- a/event/timing.go +++ b/event/timing.go @@ -1,19 +1,48 @@ package event -import "fmt" +import ( + "fmt" + "math" + "sort" + "sync" + "time" +) // Timing keeps min/max/avg information about a timer over a certain interval type Timing struct { - Name string - Min int64 - Max int64 - Value int64 - Count int64 + Name string + + mu sync.Mutex + Min int64 + Max int64 + Value int64 + Values []int64 + Count int64 + Sample float64 + PercentThreshold []float64 +} + +func (e *Timing) StatClass() string { + return "timer" +} + +// for sorting +type statdInt64arr []int64 + +func (a statdInt64arr) Len() int { return len(a) } +func (a statdInt64arr) Swap(i int, j int) { a[i], a[j] = a[j], a[i] } +func (a statdInt64arr) Less(i, j int) bool { return (a[i] - a[j]) < 0 } //this is the sorting statsd uses for its timings + +func round(a float64) float64 { + if a < 0 { + return math.Ceil(a - 0.5) + } + return math.Floor(a + 0.5) } // NewTiming is a factory for a Timing event, setting the Count to 1 to prevent div_by_0 errors -func NewTiming(k string, delta int64) *Timing { - return &Timing{Name: k, Min: delta, Max: delta, Value: delta, Count: 1} +func NewTiming(k string, delta int64, sample float64) *Timing { + return &Timing{Name: k, Min: delta, Max: delta, Value: delta, Count: 1, Sample: 1.0 / sample, PercentThreshold: []float64{0.9, 0.99}} } // Update the event with metrics coming from a new one of the same type and with the same key @@ -21,31 +50,84 @@ func (e *Timing) Update(e2 Event) error { if e.Type() != e2.Type() { return fmt.Errorf("statsd event type conflict: %s vs %s ", e.String(), e2.String()) } - p := e2.Payload().(map[string]int64) - e.Count += p["cnt"] - e.Value += p["val"] - e.Min = minInt64(e.Min, p["min"]) - e.Max = maxInt64(e.Max, p["max"]) + e.mu.Lock() + defer e.mu.Unlock() + p := e2.Payload().(map[string]interface{}) + e.Sample += p["sample"].(float64) + e.Count += p["cnt"].(int64) + e.Value += p["val"].(int64) + e.Min = minInt64(e.Min, p["min"].(int64)) + e.Max = maxInt64(e.Max, p["max"].(int64)) + e.Values = append(e.Values, p["val"].(int64)) return nil } +//Reset the value +func (e *Timing) Reset() { + e.mu.Lock() + defer e.mu.Unlock() + e.Value = 0 + e.Count = 1 + e.Min = 0 + e.Max = 0 + e.Values = []int64{} +} + // Payload returns the aggregated value for this event func (e Timing) Payload() interface{} { - return map[string]int64{ - "min": e.Min, - "max": e.Max, - "val": e.Value, - "cnt": e.Count, + return map[string]interface{}{ + "min": e.Min, + "sample": e.Sample, + "max": e.Max, + "val": e.Value, + "cnt": e.Count, + "vals": e.Values, } } -// Stats returns an array of StatsD events as they travel over UDP -func (e Timing) Stats() []string { - return []string{ - fmt.Sprintf("%s.avg:%d|a", e.Name, int64(e.Value/e.Count)), // make sure e.Count != 0 - fmt.Sprintf("%s.min:%d|a", e.Name, e.Min), - fmt.Sprintf("%s.max:%d|a", e.Name, e.Max), +// Stats returns an array of StatsD events +func (e Timing) Stats(tick time.Duration) []string { + e.mu.Lock() + defer e.mu.Unlock() + + std := float64(0) + avg := float64(e.Value / e.Count) + cumulativeValues := []int64{e.Min} + + sort.Sort(statdInt64arr(e.Values)) + + for idx, v := range e.Values { + std += math.Pow((float64(v) - avg), 2.0) + if idx > 0 { + cumulativeValues = append(cumulativeValues, v+cumulativeValues[idx-1]) + } } + + base := []string{ + fmt.Sprintf("%s.count:%d|c", e.Name, e.Count), + fmt.Sprintf("%s.min:%d|ms", e.Name, e.Min), + fmt.Sprintf("%s.max:%d|ms", e.Name, e.Max), + fmt.Sprintf("%s.sum:%d|c", e.Name, int64(e.Value)), + } + + if e.Count > 0 { + mid := int(math.Floor(float64(e.Count) / 2.0)) + median := int64(0) + if mid < len(e.Values) { + if math.Mod(float64(mid), 2.0) == 0 { + median = e.Values[mid] + } else if len(e.Values) > 1 { + median = (e.Values[mid-1] + e.Values[mid]) / 2.0 + } + } + std = math.Sqrt(std / float64(e.Count)) + base = append(base, + fmt.Sprintf("%s.median:%d|ms", e.Name, int64(median)), + fmt.Sprintf("%s.std:%d|ms", e.Name, int64(e.Value)), + ) + } + + return base } // Key returns the name of this metric diff --git a/event/total.go b/event/total.go index 6c3ca3e..19b727e 100644 --- a/event/total.go +++ b/event/total.go @@ -1,6 +1,9 @@ package event -import "fmt" +import ( + "fmt" + "time" +) // Total represents a metric that is continously increasing, e.g. read operations since boot type Total struct { @@ -8,6 +11,10 @@ type Total struct { Value int64 } +func (e *Total) StatClass() string { + return "counter" +} + // Update the event with metrics coming from a new one of the same type and with the same key func (e *Total) Update(e2 Event) error { if e.Type() != e2.Type() { @@ -17,14 +24,17 @@ func (e *Total) Update(e2 Event) error { return nil } +// Never reset this value +func (e *Total) Reset() {} + // Payload returns the aggregated value for this event func (e Total) Payload() interface{} { return e.Value } // Stats returns an array of StatsD events as they travel over UDP -func (e Total) Stats() []string { - return []string{fmt.Sprintf("%s:%d|t", e.Name, e.Value)} +func (e Total) Stats(tick time.Duration) []string { + return []string{fmt.Sprintf("%s:%d|c", e.Name, e.Value)} } // Key returns the name of this metric diff --git a/interface.go b/interface.go index 4657259..74c3e5e 100644 --- a/interface.go +++ b/interface.go @@ -4,6 +4,8 @@ import "time" // Statsd is an interface to a StatsD client (buffered/unbuffered) type Statsd interface { + String() string + CreateSocket() error Close() error Incr(stat string, count int64) error @@ -11,6 +13,8 @@ type Statsd interface { Timing(stat string, delta int64) error PrecisionTiming(stat string, delta time.Duration) error Gauge(stat string, value int64) error + GaugeAbsolute(stat string, value int64) error + GaugeAvg(stat string, value int64) error GaugeDelta(stat string, value int64) error Absolute(stat string, value int64) error Total(stat string, value int64) error diff --git a/noopclient.go b/noopclient.go new file mode 100644 index 0000000..b01bdf7 --- /dev/null +++ b/noopclient.go @@ -0,0 +1,25 @@ +package statsd + +import ( + "time" +) + +//impliment a "noop" statsd in case there is no statsd +type StatsdNoop struct{} + +func (s StatsdNoop) String() string { return "NoopClient" } +func (s StatsdNoop) CreateSocket() error { return nil } +func (s StatsdNoop) Close() error { return nil } +func (s StatsdNoop) Incr(stat string, count int64) error { return nil } +func (s StatsdNoop) Decr(stat string, count int64) error { return nil } +func (s StatsdNoop) Timing(stat string, count int64) error { return nil } +func (s StatsdNoop) PrecisionTiming(stat string, delta time.Duration) error { return nil } +func (s StatsdNoop) Gauge(stat string, value int64) error { return nil } +func (s StatsdNoop) GaugeAbsolute(stat string, value int64) error { return nil } +func (s StatsdNoop) GaugeAvg(stat string, value int64) error { return nil } +func (s StatsdNoop) GaugeDelta(stat string, value int64) error { return nil } +func (s StatsdNoop) Absolute(stat string, value int64) error { return nil } +func (s StatsdNoop) Total(stat string, value int64) error { return nil } +func (s StatsdNoop) FGauge(stat string, value float64) error { return nil } +func (s StatsdNoop) FGaugeDelta(stat string, value float64) error { return nil } +func (s StatsdNoop) FAbsolute(stat string, value float64) error { return nil }