client.go 9.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372
  1. package statsd
  2. import (
  3. "errors"
  4. "fmt"
  5. "log"
  6. "math/rand"
  7. "net"
  8. "os"
  9. "strings"
  10. "time"
  11. "github.com/quipo/statsd/event"
  12. )
  13. // Logger interface compatible with log.Logger
  14. type Logger interface {
  15. Println(v ...interface{})
  16. }
// UDPPayloadSize is the maximum number of bytes SendEvents tries to pack
// into a single write to the socket. SendEvents bundles as many events as
// fit within this limit before flushing.
//
// Change this value to match the network's capabilities. For example, to
// raise it to 16KB:
//
//	import "github.com/quipo/statsd"
//
//	func init() {
//		statsd.UDPPayloadSize = 16 * 1024
//	}
var UDPPayloadSize = 512
// Hostname is the value substituted for the %HOST% placeholder in prefixes
// and stat names. init sets it from os.Hostname when that call succeeds; it
// is exported so clients can set it to something different than the default.
var Hostname string
  28. var errNotConnected = fmt.Errorf("cannot send stats, not connected to StatsD server")
  29. // errors
  30. var (
  31. ErrInvalidCount = errors.New("count is less than 0")
  32. ErrInvalidSampleRate = errors.New("sample rate is larger than 1 or less then 0")
  33. )
  34. func init() {
  35. if host, err := os.Hostname(); nil == err {
  36. Hostname = host
  37. }
  38. }
  39. type socketType string
  40. const (
  41. udpSocket socketType = "udp"
  42. tcpSocket socketType = "tcp"
  43. )
  44. // StatsdClient is a client library to send events to StatsD
  45. type StatsdClient struct {
  46. conn net.Conn
  47. addr string
  48. prefix string
  49. sockType socketType
  50. Logger Logger
  51. }
  52. // NewStatsdClient - Factory
  53. func NewStatsdClient(addr string, prefix string) *StatsdClient {
  54. // allow %HOST% in the prefix string
  55. prefix = strings.Replace(prefix, "%HOST%", Hostname, 1)
  56. return &StatsdClient{
  57. addr: addr,
  58. prefix: prefix,
  59. Logger: log.New(os.Stdout, "[StatsdClient] ", log.Ldate|log.Ltime),
  60. }
  61. }
// String returns the StatsD server address the client was configured with.
func (c *StatsdClient) String() string {
	return c.addr
}
  66. // CreateSocket creates a UDP connection to a StatsD server
  67. func (c *StatsdClient) CreateSocket() error {
  68. conn, err := net.DialTimeout(string(udpSocket), c.addr, 5*time.Second)
  69. if err != nil {
  70. return err
  71. }
  72. c.conn = conn
  73. c.sockType = udpSocket
  74. return nil
  75. }
  76. // CreateTCPSocket creates a TCP connection to a StatsD server
  77. func (c *StatsdClient) CreateTCPSocket() error {
  78. conn, err := net.DialTimeout(string(tcpSocket), c.addr, 5*time.Second)
  79. if err != nil {
  80. return err
  81. }
  82. c.conn = conn
  83. c.sockType = tcpSocket
  84. return nil
  85. }
  86. // Close the UDP connection
  87. func (c *StatsdClient) Close() error {
  88. if nil == c.conn {
  89. return nil
  90. }
  91. return c.conn.Close()
  92. }
  93. // See statsd data types here: http://statsd.readthedocs.org/en/latest/types.html
  94. // or also https://github.com/b/statsd_spec
  95. // Incr - Increment a counter metric. Often used to note a particular event
  96. func (c *StatsdClient) Incr(stat string, count int64) error {
  97. return c.IncrWithSampling(stat, count, 1)
  98. }
  99. // IncrWithSampling - Increment a counter metric with sampling between 0 and 1
  100. func (c *StatsdClient) IncrWithSampling(stat string, count int64, sampleRate float32) error {
  101. if err := checkSampleRate(sampleRate); err != nil {
  102. return err
  103. }
  104. if !shouldFire(sampleRate) {
  105. return nil // ignore this call
  106. }
  107. if err := checkCount(count); err != nil {
  108. return err
  109. }
  110. return c.send(stat, "%d|c", count, sampleRate)
  111. }
  112. // Decr - Decrement a counter metric. Often used to note a particular event
  113. func (c *StatsdClient) Decr(stat string, count int64) error {
  114. return c.DecrWithSampling(stat, count, 1)
  115. }
  116. // DecrWithSampling - Decrement a counter metric with sampling between 0 and 1
  117. func (c *StatsdClient) DecrWithSampling(stat string, count int64, sampleRate float32) error {
  118. if err := checkSampleRate(sampleRate); err != nil {
  119. return err
  120. }
  121. if !shouldFire(sampleRate) {
  122. return nil // ignore this call
  123. }
  124. if err := checkCount(count); err != nil {
  125. return err
  126. }
  127. return c.send(stat, "%d|c", -count, sampleRate)
  128. }
  129. // Timing - Track a duration event
  130. // the time delta must be given in milliseconds
  131. func (c *StatsdClient) Timing(stat string, delta int64) error {
  132. return c.TimingWithSampling(stat, delta, 1)
  133. }
  134. // TimingWithSampling - Track a duration event
  135. func (c *StatsdClient) TimingWithSampling(stat string, delta int64, sampleRate float32) error {
  136. if err := checkSampleRate(sampleRate); err != nil {
  137. return err
  138. }
  139. if !shouldFire(sampleRate) {
  140. return nil // ignore this call
  141. }
  142. return c.send(stat, "%d|ms", delta, sampleRate)
  143. }
  144. // PrecisionTiming - Track a duration event
  145. // the time delta has to be a duration
  146. func (c *StatsdClient) PrecisionTiming(stat string, delta time.Duration) error {
  147. return c.send(stat, "%.6f|ms", float64(delta)/float64(time.Millisecond), 1)
  148. }
  149. // Gauge - Gauges are a constant data type. They are not subject to averaging,
  150. // and they don’t change unless you change them. That is, once you set a gauge value,
  151. // it will be a flat line on the graph until you change it again. If you specify
  152. // delta to be true, that specifies that the gauge should be updated, not set. Due to the
  153. // underlying protocol, you can't explicitly set a gauge to a negative number without
  154. // first setting it to zero.
  155. func (c *StatsdClient) Gauge(stat string, value int64) error {
  156. return c.GaugeWithSampling(stat, value, 1)
  157. }
  158. // GaugeWithSampling - Gauges are a constant data type.
  159. func (c *StatsdClient) GaugeWithSampling(stat string, value int64, sampleRate float32) error {
  160. if err := checkSampleRate(sampleRate); err != nil {
  161. return err
  162. }
  163. if !shouldFire(sampleRate) {
  164. return nil // ignore this call
  165. }
  166. if value < 0 {
  167. err := c.send(stat, "%d|g", 0, 1)
  168. if nil != err {
  169. return err
  170. }
  171. }
  172. return c.send(stat, "%d|g", value, sampleRate)
  173. }
  174. // GaugeDelta -- Send a change for a gauge
  175. func (c *StatsdClient) GaugeDelta(stat string, value int64) error {
  176. // Gauge Deltas are always sent with a leading '+' or '-'. The '-' takes care of itself but the '+' must added by hand
  177. if value < 0 {
  178. return c.send(stat, "%d|g", value, 1)
  179. }
  180. return c.send(stat, "+%d|g", value, 1)
  181. }
  182. // FGauge -- Send a floating point value for a gauge
  183. func (c *StatsdClient) FGauge(stat string, value float64) error {
  184. return c.FGaugeWithSampling(stat, value, 1)
  185. }
  186. // FGaugeWithSampling - Gauges are a constant data type.
  187. func (c *StatsdClient) FGaugeWithSampling(stat string, value float64, sampleRate float32) error {
  188. if err := checkSampleRate(sampleRate); err != nil {
  189. return err
  190. }
  191. if !shouldFire(sampleRate) {
  192. return nil
  193. }
  194. if value < 0 {
  195. err := c.send(stat, "%d|g", 0, 1)
  196. if nil != err {
  197. return err
  198. }
  199. }
  200. return c.send(stat, "%g|g", value, sampleRate)
  201. }
  202. // FGaugeDelta -- Send a floating point change for a gauge
  203. func (c *StatsdClient) FGaugeDelta(stat string, value float64) error {
  204. if value < 0 {
  205. return c.send(stat, "%g|g", value, 1)
  206. }
  207. return c.send(stat, "+%g|g", value, 1)
  208. }
  209. // Absolute - Send absolute-valued metric (not averaged/aggregated)
  210. func (c *StatsdClient) Absolute(stat string, value int64) error {
  211. return c.send(stat, "%d|a", value, 1)
  212. }
  213. // FAbsolute - Send absolute-valued floating point metric (not averaged/aggregated)
  214. func (c *StatsdClient) FAbsolute(stat string, value float64) error {
  215. return c.send(stat, "%g|a", value, 1)
  216. }
  217. // Total - Send a metric that is continously increasing, e.g. read operations since boot
  218. func (c *StatsdClient) Total(stat string, value int64) error {
  219. return c.send(stat, "%d|t", value, 1)
  220. }
  221. // write a UDP packet with the statsd event
  222. func (c *StatsdClient) send(stat string, format string, value interface{}, sampleRate float32) error {
  223. if c.conn == nil {
  224. return errNotConnected
  225. }
  226. stat = strings.Replace(stat, "%HOST%", Hostname, 1)
  227. metricString := c.prefix + stat + ":" + fmt.Sprintf(format, value)
  228. if sampleRate != 1 {
  229. metricString = fmt.Sprintf("%s|@%f", metricString, sampleRate)
  230. }
  231. // if sending tcp append a newline
  232. if c.sockType == tcpSocket {
  233. metricString += "\n"
  234. }
  235. _, err := fmt.Fprint(c.conn, metricString)
  236. return err
  237. }
  238. // SendEvent - Sends stats from an event object
  239. func (c *StatsdClient) SendEvent(e event.Event) error {
  240. if c.conn == nil {
  241. return errNotConnected
  242. }
  243. for _, stat := range e.Stats() {
  244. //fmt.Printf("SENDING EVENT %s%s\n", c.prefix, strings.Replace(stat, "%HOST%", Hostname, 1))
  245. _, err := fmt.Fprintf(c.conn, "%s%s", c.prefix, strings.Replace(stat, "%HOST%", Hostname, 1))
  246. if nil != err {
  247. return err
  248. }
  249. }
  250. return nil
  251. }
  252. // SendEvents - Sends stats from all the event objects.
  253. // Tries to bundle many together into one fmt.Fprintf based on UDPPayloadSize.
  254. func (c *StatsdClient) SendEvents(events map[string]event.Event) error {
  255. if c.conn == nil {
  256. return errNotConnected
  257. }
  258. var n int
  259. var stats = make([]string, 0)
  260. for _, e := range events {
  261. for _, stat := range e.Stats() {
  262. stat = c.prefix + strings.Replace(stat, "%HOST%", Hostname, 1)
  263. _n := n + len(stat) + 1
  264. if _n > UDPPayloadSize {
  265. // with this last event, the UDP payload would be too big
  266. if _, err := fmt.Fprintf(c.conn, strings.Join(stats, "\n")+"\n"); err != nil {
  267. return err
  268. }
  269. // reset payload after flushing, and add the last event
  270. stats = []string{stat}
  271. n = len(stat)
  272. continue
  273. }
  274. // can fit more into the current payload
  275. n = _n
  276. stats = append(stats, stat)
  277. }
  278. }
  279. if len(stats) != 0 {
  280. if _, err := fmt.Fprintf(c.conn, strings.Join(stats, "\n")+"\n"); err != nil {
  281. return err
  282. }
  283. }
  284. return nil
  285. }
  286. func checkCount(c int64) error {
  287. if c <= 0 {
  288. return ErrInvalidCount
  289. }
  290. return nil
  291. }
  292. func checkSampleRate(r float32) error {
  293. if r < 0 || r > 1 {
  294. return ErrInvalidSampleRate
  295. }
  296. return nil
  297. }
  298. func shouldFire(sampleRate float32) bool {
  299. if sampleRate == 1 {
  300. return true
  301. }
  302. r := rand.New(rand.NewSource(time.Now().Unix()))
  303. return r.Float32() <= sampleRate
  304. }