1
0

bufferedclient.go 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234
  1. package statsd
  2. import (
  3. "log"
  4. "os"
  5. "time"
  6. "github.com/quipo/statsd/event"
  7. )
  8. // request to close the buffered statsd collector
  9. type closeRequest struct {
  10. reply chan error
  11. }
  12. // StatsdBuffer is a client library to aggregate events in memory before
  13. // flushing aggregates to StatsD, useful if the frequency of events is extremely high
  14. // and sampling is not desirable
  15. type StatsdBuffer struct {
  16. statsd Statsd
  17. flushInterval time.Duration
  18. eventChannel chan event.Event
  19. events map[string]event.Event
  20. closeChannel chan closeRequest
  21. Logger Logger
  22. Verbose bool
  23. }
  24. // NewStatsdBuffer Factory
  25. func NewStatsdBuffer(interval time.Duration, client Statsd) *StatsdBuffer {
  26. sb := &StatsdBuffer{
  27. flushInterval: interval,
  28. statsd: client,
  29. eventChannel: make(chan event.Event, 100),
  30. events: make(map[string]event.Event),
  31. closeChannel: make(chan closeRequest),
  32. Logger: log.New(os.Stdout, "[BufferedStatsdClient] ", log.Ldate|log.Ltime),
  33. Verbose: true,
  34. }
  35. go sb.collector()
  36. return sb
  37. }
  38. // CreateSocket creates a UDP connection to a StatsD server
  39. func (sb *StatsdBuffer) CreateSocket() error {
  40. return sb.statsd.CreateSocket()
  41. }
  42. // CreateTCPSocket creates a TCP connection to a StatsD server
  43. func (sb *StatsdBuffer) CreateTCPSocket() error {
  44. return sb.statsd.CreateTCPSocket()
  45. }
  46. // Incr - Increment a counter metric. Often used to note a particular event
  47. func (sb *StatsdBuffer) Incr(stat string, count int64) error {
  48. if 0 != count {
  49. sb.eventChannel <- &event.Increment{Name: stat, Value: count}
  50. }
  51. return nil
  52. }
  53. // Decr - Decrement a counter metric. Often used to note a particular event
  54. func (sb *StatsdBuffer) Decr(stat string, count int64) error {
  55. if 0 != count {
  56. sb.eventChannel <- &event.Increment{Name: stat, Value: -count}
  57. }
  58. return nil
  59. }
  60. // Timing - Track a duration event
  61. func (sb *StatsdBuffer) Timing(stat string, delta int64) error {
  62. sb.eventChannel <- event.NewTiming(stat, delta)
  63. return nil
  64. }
  65. // PrecisionTiming - Track a duration event
  66. // the time delta has to be a duration
  67. func (sb *StatsdBuffer) PrecisionTiming(stat string, delta time.Duration) error {
  68. sb.eventChannel <- event.NewPrecisionTiming(stat, delta)
  69. return nil
  70. }
  71. // Gauge - Gauges are a constant data type. They are not subject to averaging,
  72. // and they don’t change unless you change them. That is, once you set a gauge value,
  73. // it will be a flat line on the graph until you change it again
  74. func (sb *StatsdBuffer) Gauge(stat string, value int64) error {
  75. sb.eventChannel <- &event.Gauge{Name: stat, Value: value}
  76. return nil
  77. }
  78. // GaugeDelta records a delta from the previous value (as int64)
  79. func (sb *StatsdBuffer) GaugeDelta(stat string, value int64) error {
  80. sb.eventChannel <- &event.GaugeDelta{Name: stat, Value: value}
  81. return nil
  82. }
  83. // FGauge is a Gauge working with float64 values
  84. func (sb *StatsdBuffer) FGauge(stat string, value float64) error {
  85. sb.eventChannel <- &event.FGauge{Name: stat, Value: value}
  86. return nil
  87. }
  88. // FGaugeDelta records a delta from the previous value (as float64)
  89. func (sb *StatsdBuffer) FGaugeDelta(stat string, value float64) error {
  90. sb.eventChannel <- &event.FGaugeDelta{Name: stat, Value: value}
  91. return nil
  92. }
  93. // Absolute - Send absolute-valued metric (not averaged/aggregated)
  94. func (sb *StatsdBuffer) Absolute(stat string, value int64) error {
  95. sb.eventChannel <- &event.Absolute{Name: stat, Values: []int64{value}}
  96. return nil
  97. }
  98. // FAbsolute - Send absolute-valued metric (not averaged/aggregated)
  99. func (sb *StatsdBuffer) FAbsolute(stat string, value float64) error {
  100. sb.eventChannel <- &event.FAbsolute{Name: stat, Values: []float64{value}}
  101. return nil
  102. }
  103. // Total - Send a metric that is continously increasing, e.g. read operations since boot
  104. func (sb *StatsdBuffer) Total(stat string, value int64) error {
  105. sb.eventChannel <- &event.Total{Name: stat, Value: value}
  106. return nil
  107. }
  108. // SendEvents - Sends stats from all the event objects.
  109. func (sb *StatsdBuffer) SendEvents(events map[string]event.Event) error {
  110. for _, e := range events {
  111. sb.eventChannel <- e
  112. }
  113. return nil
  114. }
  115. // avoid too many allocations by memoizing the "type|key" pair for an event
  116. // @see https://gobyexample.com/closures
  117. func initMemoisedKeyMap() func(typ string, key string) string {
  118. m := make(map[string]map[string]string)
  119. return func(typ string, key string) string {
  120. if _, ok := m[typ]; !ok {
  121. m[typ] = make(map[string]string)
  122. }
  123. k, ok := m[typ][key]
  124. if !ok {
  125. m[typ][key] = typ + "|" + key
  126. return m[typ][key]
  127. }
  128. return k // memoized value
  129. }
  130. }
  131. // handle flushes and updates in one single thread (instead of locking the events map)
  132. func (sb *StatsdBuffer) collector() {
  133. // on a panic event, flush all the pending stats before panicking
  134. defer func(sb *StatsdBuffer) {
  135. if r := recover(); r != nil {
  136. sb.Logger.Println("Caught panic, flushing stats before throwing the panic again")
  137. err := sb.flush()
  138. if nil != err {
  139. sb.Logger.Println("Error flushing stats", err.Error())
  140. }
  141. panic(r)
  142. }
  143. }(sb)
  144. keyFor := initMemoisedKeyMap() // avoid allocations (https://gobyexample.com/closures)
  145. ticker := time.NewTicker(sb.flushInterval)
  146. for {
  147. select {
  148. case <-ticker.C:
  149. //sb.Logger.Println("Flushing stats")
  150. err := sb.flush()
  151. if nil != err {
  152. sb.Logger.Println("Error flushing stats", err.Error())
  153. }
  154. case e := <-sb.eventChannel:
  155. //sb.Logger.Println("Received ", e.String())
  156. // issue #28: unable to use Incr and PrecisionTiming with the same key (also fixed #27)
  157. k := keyFor(e.TypeString(), e.Key()) // avoid allocations
  158. if e2, ok := sb.events[k]; ok {
  159. //sb.Logger.Println("Updating existing event")
  160. err := e2.Update(e)
  161. if nil != err {
  162. sb.Logger.Println("Error updating stats", err.Error())
  163. }
  164. sb.events[k] = e2
  165. } else {
  166. //sb.Logger.Println("Adding new event")
  167. sb.events[k] = e
  168. }
  169. case c := <-sb.closeChannel:
  170. if sb.Verbose {
  171. sb.Logger.Println("Asked to terminate. Flushing stats before returning.")
  172. }
  173. c.reply <- sb.flush()
  174. return
  175. }
  176. }
  177. }
  178. // Close sends a close event to the collector asking to stop & flush pending stats
  179. // and closes the statsd client
  180. func (sb *StatsdBuffer) Close() (err error) {
  181. // 1. send a close event to the collector
  182. req := closeRequest{reply: make(chan error)}
  183. sb.closeChannel <- req
  184. // 2. wait for the collector to drain the queue and respond
  185. err = <-req.reply
  186. // 3. close the statsd client
  187. err2 := sb.statsd.Close()
  188. if err != nil {
  189. return err
  190. }
  191. return err2
  192. }
  193. // send the events to StatsD and reset them.
  194. // This function is NOT thread-safe, so it must only be invoked synchronously
  195. // from within the collector() goroutine
  196. func (sb *StatsdBuffer) flush() (err error) {
  197. n := len(sb.events)
  198. if n == 0 {
  199. return nil
  200. }
  201. if err := sb.statsd.SendEvents(sb.events); err != nil {
  202. sb.Logger.Println(err)
  203. return err
  204. }
  205. sb.events = make(map[string]event.Event)
  206. return nil
  207. }