sudp.go 6.5 KB


  1. // Copyright 2017 fatedier, fatedier@gmail.com
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package visitor
  15. import (
  16. "fmt"
  17. "io"
  18. "net"
  19. "strconv"
  20. "sync"
  21. "time"
  22. "github.com/fatedier/golib/errors"
  23. libio "github.com/fatedier/golib/io"
  24. v1 "github.com/fatedier/frp/pkg/config/v1"
  25. "github.com/fatedier/frp/pkg/msg"
  26. "github.com/fatedier/frp/pkg/proto/udp"
  27. netpkg "github.com/fatedier/frp/pkg/util/net"
  28. "github.com/fatedier/frp/pkg/util/util"
  29. "github.com/fatedier/frp/pkg/util/xlog"
  30. )
  31. type SUDPVisitor struct {
  32. *BaseVisitor
  33. checkCloseCh chan struct{}
  34. // udpConn is the listener of udp packet
  35. udpConn *net.UDPConn
  36. readCh chan *msg.UDPPacket
  37. sendCh chan *msg.UDPPacket
  38. cfg *v1.SUDPVisitorConfig
  39. }
  40. // SUDP Run start listen a udp port
  41. func (sv *SUDPVisitor) Run() (err error) {
  42. xl := xlog.FromContextSafe(sv.ctx)
  43. addr, err := net.ResolveUDPAddr("udp", net.JoinHostPort(sv.cfg.BindAddr, strconv.Itoa(sv.cfg.BindPort)))
  44. if err != nil {
  45. return fmt.Errorf("sudp ResolveUDPAddr error: %v", err)
  46. }
  47. sv.udpConn, err = net.ListenUDP("udp", addr)
  48. if err != nil {
  49. return fmt.Errorf("listen udp port %s error: %v", addr.String(), err)
  50. }
  51. sv.sendCh = make(chan *msg.UDPPacket, 1024)
  52. sv.readCh = make(chan *msg.UDPPacket, 1024)
  53. xl.Infof("sudp start to work, listen on %s", addr)
  54. go sv.dispatcher()
  55. go udp.ForwardUserConn(sv.udpConn, sv.readCh, sv.sendCh, int(sv.clientCfg.UDPPacketSize))
  56. return
  57. }
  58. func (sv *SUDPVisitor) dispatcher() {
  59. xl := xlog.FromContextSafe(sv.ctx)
  60. var (
  61. visitorConn net.Conn
  62. err error
  63. firstPacket *msg.UDPPacket
  64. )
  65. for {
  66. select {
  67. case firstPacket = <-sv.sendCh:
  68. if firstPacket == nil {
  69. xl.Infof("frpc sudp visitor proxy is closed")
  70. return
  71. }
  72. case <-sv.checkCloseCh:
  73. xl.Infof("frpc sudp visitor proxy is closed")
  74. return
  75. }
  76. visitorConn, err = sv.getNewVisitorConn()
  77. if err != nil {
  78. xl.Warnf("newVisitorConn to frps error: %v, try to reconnect", err)
  79. continue
  80. }
  81. // visitorConn always be closed when worker done.
  82. sv.worker(visitorConn, firstPacket)
  83. select {
  84. case <-sv.checkCloseCh:
  85. return
  86. default:
  87. }
  88. }
  89. }
  90. func (sv *SUDPVisitor) worker(workConn net.Conn, firstPacket *msg.UDPPacket) {
  91. xl := xlog.FromContextSafe(sv.ctx)
  92. xl.Debugf("starting sudp proxy worker")
  93. wg := &sync.WaitGroup{}
  94. wg.Add(2)
  95. closeCh := make(chan struct{})
  96. // udp service -> frpc -> frps -> frpc visitor -> user
  97. workConnReaderFn := func(conn net.Conn) {
  98. defer func() {
  99. conn.Close()
  100. close(closeCh)
  101. wg.Done()
  102. }()
  103. for {
  104. var (
  105. rawMsg msg.Message
  106. errRet error
  107. )
  108. // frpc will send heartbeat in workConn to frpc visitor for keeping alive
  109. _ = conn.SetReadDeadline(time.Now().Add(60 * time.Second))
  110. if rawMsg, errRet = msg.ReadMsg(conn); errRet != nil {
  111. xl.Warnf("read from workconn for user udp conn error: %v", errRet)
  112. return
  113. }
  114. _ = conn.SetReadDeadline(time.Time{})
  115. switch m := rawMsg.(type) {
  116. case *msg.Ping:
  117. xl.Debugf("frpc visitor get ping message from frpc")
  118. continue
  119. case *msg.UDPPacket:
  120. if errRet := errors.PanicToError(func() {
  121. sv.readCh <- m
  122. xl.Tracef("frpc visitor get udp packet from workConn: %s", m.Content)
  123. }); errRet != nil {
  124. xl.Infof("reader goroutine for udp work connection closed")
  125. return
  126. }
  127. }
  128. }
  129. }
  130. // udp service <- frpc <- frps <- frpc visitor <- user
  131. workConnSenderFn := func(conn net.Conn) {
  132. defer func() {
  133. conn.Close()
  134. wg.Done()
  135. }()
  136. var errRet error
  137. if firstPacket != nil {
  138. if errRet = msg.WriteMsg(conn, firstPacket); errRet != nil {
  139. xl.Warnf("sender goroutine for udp work connection closed: %v", errRet)
  140. return
  141. }
  142. xl.Tracef("send udp package to workConn: %s", firstPacket.Content)
  143. }
  144. for {
  145. select {
  146. case udpMsg, ok := <-sv.sendCh:
  147. if !ok {
  148. xl.Infof("sender goroutine for udp work connection closed")
  149. return
  150. }
  151. if errRet = msg.WriteMsg(conn, udpMsg); errRet != nil {
  152. xl.Warnf("sender goroutine for udp work connection closed: %v", errRet)
  153. return
  154. }
  155. xl.Tracef("send udp package to workConn: %s", udpMsg.Content)
  156. case <-closeCh:
  157. return
  158. }
  159. }
  160. }
  161. go workConnReaderFn(workConn)
  162. go workConnSenderFn(workConn)
  163. wg.Wait()
  164. xl.Infof("sudp worker is closed")
  165. }
  166. func (sv *SUDPVisitor) getNewVisitorConn() (net.Conn, error) {
  167. xl := xlog.FromContextSafe(sv.ctx)
  168. visitorConn, err := sv.helper.ConnectServer()
  169. if err != nil {
  170. return nil, fmt.Errorf("frpc connect frps error: %v", err)
  171. }
  172. now := time.Now().Unix()
  173. newVisitorConnMsg := &msg.NewVisitorConn{
  174. RunID: sv.helper.RunID(),
  175. ProxyName: sv.cfg.ServerName,
  176. SignKey: util.GetAuthKey(sv.cfg.SecretKey, now),
  177. Timestamp: now,
  178. UseEncryption: sv.cfg.Transport.UseEncryption,
  179. UseCompression: sv.cfg.Transport.UseCompression,
  180. }
  181. err = msg.WriteMsg(visitorConn, newVisitorConnMsg)
  182. if err != nil {
  183. return nil, fmt.Errorf("frpc send newVisitorConnMsg to frps error: %v", err)
  184. }
  185. var newVisitorConnRespMsg msg.NewVisitorConnResp
  186. _ = visitorConn.SetReadDeadline(time.Now().Add(10 * time.Second))
  187. err = msg.ReadMsgInto(visitorConn, &newVisitorConnRespMsg)
  188. if err != nil {
  189. return nil, fmt.Errorf("frpc read newVisitorConnRespMsg error: %v", err)
  190. }
  191. _ = visitorConn.SetReadDeadline(time.Time{})
  192. if newVisitorConnRespMsg.Error != "" {
  193. return nil, fmt.Errorf("start new visitor connection error: %s", newVisitorConnRespMsg.Error)
  194. }
  195. var remote io.ReadWriteCloser
  196. remote = visitorConn
  197. if sv.cfg.Transport.UseEncryption {
  198. remote, err = libio.WithEncryption(remote, []byte(sv.cfg.SecretKey))
  199. if err != nil {
  200. xl.Errorf("create encryption stream error: %v", err)
  201. return nil, err
  202. }
  203. }
  204. if sv.cfg.Transport.UseCompression {
  205. remote = libio.WithCompression(remote)
  206. }
  207. return netpkg.WrapReadWriteCloserToConn(remote, visitorConn), nil
  208. }
  209. func (sv *SUDPVisitor) Close() {
  210. sv.mu.Lock()
  211. defer sv.mu.Unlock()
  212. select {
  213. case <-sv.checkCloseCh:
  214. return
  215. default:
  216. close(sv.checkCloseCh)
  217. }
  218. sv.BaseVisitor.Close()
  219. if sv.udpConn != nil {
  220. sv.udpConn.Close()
  221. }
  222. close(sv.readCh)
  223. close(sv.sendCh)
  224. }