123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234 |
- package statsd
- import (
- "log"
- "os"
- "time"
- "github.com/quipo/statsd/event"
- )
- type closeRequest struct {
- reply chan error
- }
- type StatsdBuffer struct {
- statsd Statsd
- flushInterval time.Duration
- eventChannel chan event.Event
- events map[string]event.Event
- closeChannel chan closeRequest
- Logger Logger
- Verbose bool
- }
- func NewStatsdBuffer(interval time.Duration, client Statsd) *StatsdBuffer {
- sb := &StatsdBuffer{
- flushInterval: interval,
- statsd: client,
- eventChannel: make(chan event.Event, 100),
- events: make(map[string]event.Event),
- closeChannel: make(chan closeRequest),
- Logger: log.New(os.Stdout, "[BufferedStatsdClient] ", log.Ldate|log.Ltime),
- Verbose: true,
- }
- go sb.collector()
- return sb
- }
- func (sb *StatsdBuffer) CreateSocket() error {
- return sb.statsd.CreateSocket()
- }
- func (sb *StatsdBuffer) CreateTCPSocket() error {
- return sb.statsd.CreateTCPSocket()
- }
- func (sb *StatsdBuffer) Incr(stat string, count int64) error {
- if 0 != count {
- sb.eventChannel <- &event.Increment{Name: stat, Value: count}
- }
- return nil
- }
- func (sb *StatsdBuffer) Decr(stat string, count int64) error {
- if 0 != count {
- sb.eventChannel <- &event.Increment{Name: stat, Value: -count}
- }
- return nil
- }
- func (sb *StatsdBuffer) Timing(stat string, delta int64) error {
- sb.eventChannel <- event.NewTiming(stat, delta)
- return nil
- }
- func (sb *StatsdBuffer) PrecisionTiming(stat string, delta time.Duration) error {
- sb.eventChannel <- event.NewPrecisionTiming(stat, delta)
- return nil
- }
- func (sb *StatsdBuffer) Gauge(stat string, value int64) error {
- sb.eventChannel <- &event.Gauge{Name: stat, Value: value}
- return nil
- }
- func (sb *StatsdBuffer) GaugeDelta(stat string, value int64) error {
- sb.eventChannel <- &event.GaugeDelta{Name: stat, Value: value}
- return nil
- }
- func (sb *StatsdBuffer) FGauge(stat string, value float64) error {
- sb.eventChannel <- &event.FGauge{Name: stat, Value: value}
- return nil
- }
- func (sb *StatsdBuffer) FGaugeDelta(stat string, value float64) error {
- sb.eventChannel <- &event.FGaugeDelta{Name: stat, Value: value}
- return nil
- }
- func (sb *StatsdBuffer) Absolute(stat string, value int64) error {
- sb.eventChannel <- &event.Absolute{Name: stat, Values: []int64{value}}
- return nil
- }
- func (sb *StatsdBuffer) FAbsolute(stat string, value float64) error {
- sb.eventChannel <- &event.FAbsolute{Name: stat, Values: []float64{value}}
- return nil
- }
- func (sb *StatsdBuffer) Total(stat string, value int64) error {
- sb.eventChannel <- &event.Total{Name: stat, Value: value}
- return nil
- }
- func (sb *StatsdBuffer) SendEvents(events map[string]event.Event) error {
- for _, e := range events {
- sb.eventChannel <- e
- }
- return nil
- }
- func initMemoisedKeyMap() func(typ string, key string) string {
- m := make(map[string]map[string]string)
- return func(typ string, key string) string {
- if _, ok := m[typ]; !ok {
- m[typ] = make(map[string]string)
- }
- k, ok := m[typ][key]
- if !ok {
- m[typ][key] = typ + "|" + key
- return m[typ][key]
- }
- return k
- }
- }
- func (sb *StatsdBuffer) collector() {
-
- defer func(sb *StatsdBuffer) {
- if r := recover(); r != nil {
- sb.Logger.Println("Caught panic, flushing stats before throwing the panic again")
- err := sb.flush()
- if nil != err {
- sb.Logger.Println("Error flushing stats", err.Error())
- }
- panic(r)
- }
- }(sb)
- keyFor := initMemoisedKeyMap()
- ticker := time.NewTicker(sb.flushInterval)
- for {
- select {
- case <-ticker.C:
-
- err := sb.flush()
- if nil != err {
- sb.Logger.Println("Error flushing stats", err.Error())
- }
- case e := <-sb.eventChannel:
-
-
- k := keyFor(e.TypeString(), e.Key())
- if e2, ok := sb.events[k]; ok {
-
- err := e2.Update(e)
- if nil != err {
- sb.Logger.Println("Error updating stats", err.Error())
- }
- sb.events[k] = e2
- } else {
-
- sb.events[k] = e
- }
- case c := <-sb.closeChannel:
- if sb.Verbose {
- sb.Logger.Println("Asked to terminate. Flushing stats before returning.")
- }
- c.reply <- sb.flush()
- return
- }
- }
- }
- func (sb *StatsdBuffer) Close() (err error) {
-
- req := closeRequest{reply: make(chan error)}
- sb.closeChannel <- req
-
- err = <-req.reply
-
- err2 := sb.statsd.Close()
- if err != nil {
- return err
- }
- return err2
- }
- func (sb *StatsdBuffer) flush() (err error) {
- n := len(sb.events)
- if n == 0 {
- return nil
- }
- if err := sb.statsd.SendEvents(sb.events); err != nil {
- sb.Logger.Println(err)
- return err
- }
- sb.events = make(map[string]event.Event)
- return nil
- }
|