pool.go 2.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145
  1. package utils
  2. import (
  3. "log"
  4. "sync"
  5. "time"
  6. )
  7. //ConnPool to use
  8. type ConnPool interface {
  9. Get() (conn interface{}, err error)
  10. Put(conn interface{})
  11. ReleaseAll()
  12. Len() (length int)
  13. }
  14. type poolConfig struct {
  15. Factory func() (interface{}, error)
  16. IsActive func(interface{}) bool
  17. Release func(interface{})
  18. InitialCap int
  19. MaxCap int
  20. }
  21. func NewConnPool(poolConfig poolConfig) (pool ConnPool, err error) {
  22. p := netPool{
  23. config: poolConfig,
  24. conns: make(chan interface{}, poolConfig.MaxCap),
  25. lock: &sync.Mutex{},
  26. }
  27. //log.Printf("pool MaxCap:%d", poolConfig.MaxCap)
  28. if poolConfig.MaxCap > 0 {
  29. err = p.initAutoFill(false)
  30. if err == nil {
  31. p.initAutoFill(true)
  32. }
  33. }
  34. return &p, nil
  35. }
  36. type netPool struct {
  37. conns chan interface{}
  38. lock *sync.Mutex
  39. config poolConfig
  40. }
  41. func (p *netPool) initAutoFill(async bool) (err error) {
  42. var worker = func() (err error) {
  43. for {
  44. //log.Printf("pool fill: %v , len: %d", p.Len() <= p.config.InitialCap/2, p.Len())
  45. if p.Len() <= p.config.InitialCap/2 {
  46. p.lock.Lock()
  47. errN := 0
  48. for i := 0; i < p.config.InitialCap; i++ {
  49. c, err := p.config.Factory()
  50. if err != nil {
  51. errN++
  52. if async {
  53. continue
  54. } else {
  55. p.lock.Unlock()
  56. return err
  57. }
  58. }
  59. select {
  60. case p.conns <- c:
  61. default:
  62. p.config.Release(c)
  63. break
  64. }
  65. if p.Len() >= p.config.InitialCap {
  66. break
  67. }
  68. }
  69. if errN > 0 {
  70. log.Printf("fill conn pool fail , ERRN:%d", errN)
  71. }
  72. p.lock.Unlock()
  73. }
  74. if !async {
  75. return
  76. }
  77. time.Sleep(time.Second * 2)
  78. }
  79. }
  80. if async {
  81. go worker()
  82. } else {
  83. err = worker()
  84. }
  85. return
  86. }
  87. func (p *netPool) Get() (conn interface{}, err error) {
  88. // defer func() {
  89. // log.Printf("pool len : %d", p.Len())
  90. // }()
  91. p.lock.Lock()
  92. defer p.lock.Unlock()
  93. // for {
  94. select {
  95. case conn = <-p.conns:
  96. if p.config.IsActive(conn) {
  97. return
  98. }
  99. p.config.Release(conn)
  100. default:
  101. conn, err = p.config.Factory()
  102. if err != nil {
  103. return nil, err
  104. }
  105. return conn, nil
  106. }
  107. // }
  108. return
  109. }
  110. func (p *netPool) Put(conn interface{}) {
  111. if conn == nil {
  112. return
  113. }
  114. p.lock.Lock()
  115. defer p.lock.Unlock()
  116. if !p.config.IsActive(conn) {
  117. p.config.Release(conn)
  118. }
  119. select {
  120. case p.conns <- conn:
  121. default:
  122. p.config.Release(conn)
  123. }
  124. }
  125. func (p *netPool) ReleaseAll() {
  126. p.lock.Lock()
  127. defer p.lock.Unlock()
  128. close(p.conns)
  129. for c := range p.conns {
  130. p.config.Release(c)
  131. }
  132. p.conns = make(chan interface{}, p.config.InitialCap)
  133. }
  134. func (p *netPool) Len() (length int) {
  135. return len(p.conns)
  136. }