group.go 7.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261
  1. package features
  2. import (
  3. "fmt"
  4. "strconv"
  5. "sync"
  6. "time"
  7. "github.com/onsi/ginkgo/v2"
  8. "github.com/fatedier/frp/test/e2e/framework"
  9. "github.com/fatedier/frp/test/e2e/framework/consts"
  10. "github.com/fatedier/frp/test/e2e/mock/server/httpserver"
  11. "github.com/fatedier/frp/test/e2e/mock/server/streamserver"
  12. "github.com/fatedier/frp/test/e2e/pkg/request"
  13. )
  14. var _ = ginkgo.Describe("[Feature: Group]", func() {
  15. f := framework.NewDefaultFramework()
  16. newHTTPServer := func(port int, respContent string) *httpserver.Server {
  17. return httpserver.New(
  18. httpserver.WithBindPort(port),
  19. httpserver.WithHandler(framework.SpecifiedHTTPBodyHandler([]byte(respContent))),
  20. )
  21. }
  22. validateFooBarResponse := func(resp *request.Response) bool {
  23. if string(resp.Content) == "foo" || string(resp.Content) == "bar" {
  24. return true
  25. }
  26. return false
  27. }
  28. doFooBarHTTPRequest := func(vhostPort int, host string) []string {
  29. results := []string{}
  30. var wait sync.WaitGroup
  31. var mu sync.Mutex
  32. expectFn := func() {
  33. framework.NewRequestExpect(f).Port(vhostPort).
  34. RequestModify(func(r *request.Request) {
  35. r.HTTP().HTTPHost(host)
  36. }).
  37. Ensure(validateFooBarResponse, func(resp *request.Response) bool {
  38. mu.Lock()
  39. defer mu.Unlock()
  40. results = append(results, string(resp.Content))
  41. return true
  42. })
  43. }
  44. for i := 0; i < 10; i++ {
  45. wait.Add(1)
  46. go func() {
  47. defer wait.Done()
  48. expectFn()
  49. }()
  50. }
  51. wait.Wait()
  52. return results
  53. }
  54. ginkgo.Describe("Load Balancing", func() {
  55. ginkgo.It("TCP", func() {
  56. serverConf := consts.LegacyDefaultServerConfig
  57. clientConf := consts.LegacyDefaultClientConfig
  58. fooPort := f.AllocPort()
  59. fooServer := streamserver.New(streamserver.TCP, streamserver.WithBindPort(fooPort), streamserver.WithRespContent([]byte("foo")))
  60. f.RunServer("", fooServer)
  61. barPort := f.AllocPort()
  62. barServer := streamserver.New(streamserver.TCP, streamserver.WithBindPort(barPort), streamserver.WithRespContent([]byte("bar")))
  63. f.RunServer("", barServer)
  64. remotePort := f.AllocPort()
  65. clientConf += fmt.Sprintf(`
  66. [foo]
  67. type = tcp
  68. local_port = %d
  69. remote_port = %d
  70. group = test
  71. group_key = 123
  72. [bar]
  73. type = tcp
  74. local_port = %d
  75. remote_port = %d
  76. group = test
  77. group_key = 123
  78. `, fooPort, remotePort, barPort, remotePort)
  79. f.RunProcesses([]string{serverConf}, []string{clientConf})
  80. fooCount := 0
  81. barCount := 0
  82. for i := 0; i < 10; i++ {
  83. framework.NewRequestExpect(f).Explain("times " + strconv.Itoa(i)).Port(remotePort).Ensure(func(resp *request.Response) bool {
  84. switch string(resp.Content) {
  85. case "foo":
  86. fooCount++
  87. case "bar":
  88. barCount++
  89. default:
  90. return false
  91. }
  92. return true
  93. })
  94. }
  95. framework.ExpectTrue(fooCount > 1 && barCount > 1, "fooCount: %d, barCount: %d", fooCount, barCount)
  96. })
  97. })
  98. ginkgo.Describe("Health Check", func() {
  99. ginkgo.It("TCP", func() {
  100. serverConf := consts.LegacyDefaultServerConfig
  101. clientConf := consts.LegacyDefaultClientConfig
  102. fooPort := f.AllocPort()
  103. fooServer := streamserver.New(streamserver.TCP, streamserver.WithBindPort(fooPort), streamserver.WithRespContent([]byte("foo")))
  104. f.RunServer("", fooServer)
  105. barPort := f.AllocPort()
  106. barServer := streamserver.New(streamserver.TCP, streamserver.WithBindPort(barPort), streamserver.WithRespContent([]byte("bar")))
  107. f.RunServer("", barServer)
  108. remotePort := f.AllocPort()
  109. clientConf += fmt.Sprintf(`
  110. [foo]
  111. type = tcp
  112. local_port = %d
  113. remote_port = %d
  114. group = test
  115. group_key = 123
  116. health_check_type = tcp
  117. health_check_interval_s = 1
  118. [bar]
  119. type = tcp
  120. local_port = %d
  121. remote_port = %d
  122. group = test
  123. group_key = 123
  124. health_check_type = tcp
  125. health_check_interval_s = 1
  126. `, fooPort, remotePort, barPort, remotePort)
  127. f.RunProcesses([]string{serverConf}, []string{clientConf})
  128. // check foo and bar is ok
  129. results := []string{}
  130. for i := 0; i < 10; i++ {
  131. framework.NewRequestExpect(f).Port(remotePort).Ensure(validateFooBarResponse, func(resp *request.Response) bool {
  132. results = append(results, string(resp.Content))
  133. return true
  134. })
  135. }
  136. framework.ExpectContainElements(results, []string{"foo", "bar"})
  137. // close bar server, check foo is ok
  138. barServer.Close()
  139. time.Sleep(2 * time.Second)
  140. for i := 0; i < 10; i++ {
  141. framework.NewRequestExpect(f).Port(remotePort).ExpectResp([]byte("foo")).Ensure()
  142. }
  143. // resume bar server, check foo and bar is ok
  144. f.RunServer("", barServer)
  145. time.Sleep(2 * time.Second)
  146. results = []string{}
  147. for i := 0; i < 10; i++ {
  148. framework.NewRequestExpect(f).Port(remotePort).Ensure(validateFooBarResponse, func(resp *request.Response) bool {
  149. results = append(results, string(resp.Content))
  150. return true
  151. })
  152. }
  153. framework.ExpectContainElements(results, []string{"foo", "bar"})
  154. })
  155. ginkgo.It("HTTP", func() {
  156. vhostPort := f.AllocPort()
  157. serverConf := consts.LegacyDefaultServerConfig + fmt.Sprintf(`
  158. vhost_http_port = %d
  159. `, vhostPort)
  160. clientConf := consts.LegacyDefaultClientConfig
  161. fooPort := f.AllocPort()
  162. fooServer := newHTTPServer(fooPort, "foo")
  163. f.RunServer("", fooServer)
  164. barPort := f.AllocPort()
  165. barServer := newHTTPServer(barPort, "bar")
  166. f.RunServer("", barServer)
  167. clientConf += fmt.Sprintf(`
  168. [foo]
  169. type = http
  170. local_port = %d
  171. custom_domains = example.com
  172. group = test
  173. group_key = 123
  174. health_check_type = http
  175. health_check_interval_s = 1
  176. health_check_url = /healthz
  177. [bar]
  178. type = http
  179. local_port = %d
  180. custom_domains = example.com
  181. group = test
  182. group_key = 123
  183. health_check_type = http
  184. health_check_interval_s = 1
  185. health_check_url = /healthz
  186. `, fooPort, barPort)
  187. f.RunProcesses([]string{serverConf}, []string{clientConf})
  188. // send first HTTP request
  189. var contents []string
  190. framework.NewRequestExpect(f).Port(vhostPort).
  191. RequestModify(func(r *request.Request) {
  192. r.HTTP().HTTPHost("example.com")
  193. }).
  194. Ensure(func(resp *request.Response) bool {
  195. contents = append(contents, string(resp.Content))
  196. return true
  197. })
  198. // send second HTTP request, should be forwarded to another service
  199. framework.NewRequestExpect(f).Port(vhostPort).
  200. RequestModify(func(r *request.Request) {
  201. r.HTTP().HTTPHost("example.com")
  202. }).
  203. Ensure(func(resp *request.Response) bool {
  204. contents = append(contents, string(resp.Content))
  205. return true
  206. })
  207. framework.ExpectContainElements(contents, []string{"foo", "bar"})
  208. // check foo and bar is ok
  209. results := doFooBarHTTPRequest(vhostPort, "example.com")
  210. framework.ExpectContainElements(results, []string{"foo", "bar"})
  211. // close bar server, check foo is ok
  212. barServer.Close()
  213. time.Sleep(2 * time.Second)
  214. results = doFooBarHTTPRequest(vhostPort, "example.com")
  215. framework.ExpectContainElements(results, []string{"foo"})
  216. framework.ExpectNotContainElements(results, []string{"bar"})
  217. // resume bar server, check foo and bar is ok
  218. f.RunServer("", barServer)
  219. time.Sleep(2 * time.Second)
  220. results = doFooBarHTTPRequest(vhostPort, "example.com")
  221. framework.ExpectContainElements(results, []string{"foo", "bar"})
  222. })
  223. })
  224. })