http.go 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198
  1. package group
  2. import (
  3. "fmt"
  4. "net"
  5. "sync"
  6. "sync/atomic"
  7. "github.com/fatedier/frp/pkg/util/vhost"
  8. )
  9. type HTTPGroupController struct {
  10. // groups indexed by group name
  11. groups map[string]*HTTPGroup
  12. // register createConn for each group to vhostRouter.
  13. // createConn will get a connection from one proxy of the group
  14. vhostRouter *vhost.Routers
  15. mu sync.Mutex
  16. }
  17. func NewHTTPGroupController(vhostRouter *vhost.Routers) *HTTPGroupController {
  18. return &HTTPGroupController{
  19. groups: make(map[string]*HTTPGroup),
  20. vhostRouter: vhostRouter,
  21. }
  22. }
  23. func (ctl *HTTPGroupController) Register(
  24. proxyName, group, groupKey string,
  25. routeConfig vhost.RouteConfig,
  26. ) (err error) {
  27. indexKey := group
  28. ctl.mu.Lock()
  29. g, ok := ctl.groups[indexKey]
  30. if !ok {
  31. g = NewHTTPGroup(ctl)
  32. ctl.groups[indexKey] = g
  33. }
  34. ctl.mu.Unlock()
  35. return g.Register(proxyName, group, groupKey, routeConfig)
  36. }
  37. func (ctl *HTTPGroupController) UnRegister(proxyName, group string, _ vhost.RouteConfig) {
  38. indexKey := group
  39. ctl.mu.Lock()
  40. defer ctl.mu.Unlock()
  41. g, ok := ctl.groups[indexKey]
  42. if !ok {
  43. return
  44. }
  45. isEmpty := g.UnRegister(proxyName)
  46. if isEmpty {
  47. delete(ctl.groups, indexKey)
  48. }
  49. }
  50. type HTTPGroup struct {
  51. group string
  52. groupKey string
  53. domain string
  54. location string
  55. routeByHTTPUser string
  56. // CreateConnFuncs indexed by proxy name
  57. createFuncs map[string]vhost.CreateConnFunc
  58. pxyNames []string
  59. index uint64
  60. ctl *HTTPGroupController
  61. mu sync.RWMutex
  62. }
  63. func NewHTTPGroup(ctl *HTTPGroupController) *HTTPGroup {
  64. return &HTTPGroup{
  65. createFuncs: make(map[string]vhost.CreateConnFunc),
  66. pxyNames: make([]string, 0),
  67. ctl: ctl,
  68. }
  69. }
  70. func (g *HTTPGroup) Register(
  71. proxyName, group, groupKey string,
  72. routeConfig vhost.RouteConfig,
  73. ) (err error) {
  74. g.mu.Lock()
  75. defer g.mu.Unlock()
  76. if len(g.createFuncs) == 0 {
  77. // the first proxy in this group
  78. tmp := routeConfig // copy object
  79. tmp.CreateConnFn = g.createConn
  80. tmp.ChooseEndpointFn = g.chooseEndpoint
  81. tmp.CreateConnByEndpointFn = g.createConnByEndpoint
  82. err = g.ctl.vhostRouter.Add(routeConfig.Domain, routeConfig.Location, routeConfig.RouteByHTTPUser, &tmp)
  83. if err != nil {
  84. return
  85. }
  86. g.group = group
  87. g.groupKey = groupKey
  88. g.domain = routeConfig.Domain
  89. g.location = routeConfig.Location
  90. g.routeByHTTPUser = routeConfig.RouteByHTTPUser
  91. } else {
  92. if g.group != group || g.domain != routeConfig.Domain ||
  93. g.location != routeConfig.Location || g.routeByHTTPUser != routeConfig.RouteByHTTPUser {
  94. err = ErrGroupParamsInvalid
  95. return
  96. }
  97. if g.groupKey != groupKey {
  98. err = ErrGroupAuthFailed
  99. return
  100. }
  101. }
  102. if _, ok := g.createFuncs[proxyName]; ok {
  103. err = ErrProxyRepeated
  104. return
  105. }
  106. g.createFuncs[proxyName] = routeConfig.CreateConnFn
  107. g.pxyNames = append(g.pxyNames, proxyName)
  108. return nil
  109. }
  110. func (g *HTTPGroup) UnRegister(proxyName string) (isEmpty bool) {
  111. g.mu.Lock()
  112. defer g.mu.Unlock()
  113. delete(g.createFuncs, proxyName)
  114. for i, name := range g.pxyNames {
  115. if name == proxyName {
  116. g.pxyNames = append(g.pxyNames[:i], g.pxyNames[i+1:]...)
  117. break
  118. }
  119. }
  120. if len(g.createFuncs) == 0 {
  121. isEmpty = true
  122. g.ctl.vhostRouter.Del(g.domain, g.location, g.routeByHTTPUser)
  123. }
  124. return
  125. }
  126. func (g *HTTPGroup) createConn(remoteAddr string) (net.Conn, error) {
  127. var f vhost.CreateConnFunc
  128. newIndex := atomic.AddUint64(&g.index, 1)
  129. g.mu.RLock()
  130. group := g.group
  131. domain := g.domain
  132. location := g.location
  133. routeByHTTPUser := g.routeByHTTPUser
  134. if len(g.pxyNames) > 0 {
  135. name := g.pxyNames[int(newIndex)%len(g.pxyNames)]
  136. f = g.createFuncs[name]
  137. }
  138. g.mu.RUnlock()
  139. if f == nil {
  140. return nil, fmt.Errorf("no CreateConnFunc for http group [%s], domain [%s], location [%s], routeByHTTPUser [%s]",
  141. group, domain, location, routeByHTTPUser)
  142. }
  143. return f(remoteAddr)
  144. }
  145. func (g *HTTPGroup) chooseEndpoint() (string, error) {
  146. newIndex := atomic.AddUint64(&g.index, 1)
  147. name := ""
  148. g.mu.RLock()
  149. group := g.group
  150. domain := g.domain
  151. location := g.location
  152. routeByHTTPUser := g.routeByHTTPUser
  153. if len(g.pxyNames) > 0 {
  154. name = g.pxyNames[int(newIndex)%len(g.pxyNames)]
  155. }
  156. g.mu.RUnlock()
  157. if name == "" {
  158. return "", fmt.Errorf("no healthy endpoint for http group [%s], domain [%s], location [%s], routeByHTTPUser [%s]",
  159. group, domain, location, routeByHTTPUser)
  160. }
  161. return name, nil
  162. }
  163. func (g *HTTPGroup) createConnByEndpoint(endpoint, remoteAddr string) (net.Conn, error) {
  164. var f vhost.CreateConnFunc
  165. g.mu.RLock()
  166. f = g.createFuncs[endpoint]
  167. g.mu.RUnlock()
  168. if f == nil {
  169. return nil, fmt.Errorf("no CreateConnFunc for endpoint [%s] in group [%s]", endpoint, g.group)
  170. }
  171. return f(remoteAddr)
  172. }