udp.go 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266
  1. // Copyright 2017 fatedier, fatedier@gmail.com
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package net
  15. import (
  16. "fmt"
  17. "io"
  18. "net"
  19. "strconv"
  20. "sync"
  21. "time"
  22. "github.com/fatedier/golib/pool"
  23. )
  24. type UDPPacket struct {
  25. Buf []byte
  26. LocalAddr net.Addr
  27. RemoteAddr net.Addr
  28. }
  29. type FakeUDPConn struct {
  30. l *UDPListener
  31. localAddr net.Addr
  32. remoteAddr net.Addr
  33. packets chan []byte
  34. closeFlag bool
  35. lastActive time.Time
  36. mu sync.RWMutex
  37. }
  38. func NewFakeUDPConn(l *UDPListener, laddr, raddr net.Addr) *FakeUDPConn {
  39. fc := &FakeUDPConn{
  40. l: l,
  41. localAddr: laddr,
  42. remoteAddr: raddr,
  43. packets: make(chan []byte, 20),
  44. }
  45. go func() {
  46. for {
  47. time.Sleep(5 * time.Second)
  48. fc.mu.RLock()
  49. if time.Since(fc.lastActive) > 10*time.Second {
  50. fc.mu.RUnlock()
  51. fc.Close()
  52. break
  53. }
  54. fc.mu.RUnlock()
  55. }
  56. }()
  57. return fc
  58. }
  59. func (c *FakeUDPConn) putPacket(content []byte) {
  60. defer func() {
  61. _ = recover()
  62. }()
  63. select {
  64. case c.packets <- content:
  65. default:
  66. }
  67. }
  68. func (c *FakeUDPConn) Read(b []byte) (n int, err error) {
  69. content, ok := <-c.packets
  70. if !ok {
  71. return 0, io.EOF
  72. }
  73. c.mu.Lock()
  74. c.lastActive = time.Now()
  75. c.mu.Unlock()
  76. if len(b) < len(content) {
  77. n = len(b)
  78. } else {
  79. n = len(content)
  80. }
  81. copy(b, content)
  82. return n, nil
  83. }
  84. func (c *FakeUDPConn) Write(b []byte) (n int, err error) {
  85. c.mu.RLock()
  86. if c.closeFlag {
  87. c.mu.RUnlock()
  88. return 0, io.ErrClosedPipe
  89. }
  90. c.mu.RUnlock()
  91. packet := &UDPPacket{
  92. Buf: b,
  93. LocalAddr: c.localAddr,
  94. RemoteAddr: c.remoteAddr,
  95. }
  96. _ = c.l.writeUDPPacket(packet)
  97. c.mu.Lock()
  98. c.lastActive = time.Now()
  99. c.mu.Unlock()
  100. return len(b), nil
  101. }
  102. func (c *FakeUDPConn) Close() error {
  103. c.mu.Lock()
  104. defer c.mu.Unlock()
  105. if !c.closeFlag {
  106. c.closeFlag = true
  107. close(c.packets)
  108. }
  109. return nil
  110. }
  111. func (c *FakeUDPConn) IsClosed() bool {
  112. c.mu.RLock()
  113. defer c.mu.RUnlock()
  114. return c.closeFlag
  115. }
  116. func (c *FakeUDPConn) LocalAddr() net.Addr {
  117. return c.localAddr
  118. }
  119. func (c *FakeUDPConn) RemoteAddr() net.Addr {
  120. return c.remoteAddr
  121. }
  122. func (c *FakeUDPConn) SetDeadline(_ time.Time) error {
  123. return nil
  124. }
  125. func (c *FakeUDPConn) SetReadDeadline(_ time.Time) error {
  126. return nil
  127. }
  128. func (c *FakeUDPConn) SetWriteDeadline(_ time.Time) error {
  129. return nil
  130. }
  131. type UDPListener struct {
  132. addr net.Addr
  133. acceptCh chan net.Conn
  134. writeCh chan *UDPPacket
  135. readConn net.Conn
  136. closeFlag bool
  137. fakeConns map[string]*FakeUDPConn
  138. }
  139. func ListenUDP(bindAddr string, bindPort int) (l *UDPListener, err error) {
  140. udpAddr, err := net.ResolveUDPAddr("udp", net.JoinHostPort(bindAddr, strconv.Itoa(bindPort)))
  141. if err != nil {
  142. return l, err
  143. }
  144. readConn, err := net.ListenUDP("udp", udpAddr)
  145. l = &UDPListener{
  146. addr: udpAddr,
  147. acceptCh: make(chan net.Conn),
  148. writeCh: make(chan *UDPPacket, 1000),
  149. fakeConns: make(map[string]*FakeUDPConn),
  150. }
  151. // for reading
  152. go func() {
  153. for {
  154. buf := pool.GetBuf(1450)
  155. n, remoteAddr, err := readConn.ReadFromUDP(buf)
  156. if err != nil {
  157. close(l.acceptCh)
  158. close(l.writeCh)
  159. return
  160. }
  161. fakeConn, exist := l.fakeConns[remoteAddr.String()]
  162. if !exist || fakeConn.IsClosed() {
  163. fakeConn = NewFakeUDPConn(l, l.Addr(), remoteAddr)
  164. l.fakeConns[remoteAddr.String()] = fakeConn
  165. }
  166. fakeConn.putPacket(buf[:n])
  167. l.acceptCh <- fakeConn
  168. }
  169. }()
  170. // for writing
  171. go func() {
  172. for {
  173. packet, ok := <-l.writeCh
  174. if !ok {
  175. return
  176. }
  177. if addr, ok := packet.RemoteAddr.(*net.UDPAddr); ok {
  178. _, _ = readConn.WriteToUDP(packet.Buf, addr)
  179. }
  180. }
  181. }()
  182. return
  183. }
  184. func (l *UDPListener) writeUDPPacket(packet *UDPPacket) (err error) {
  185. defer func() {
  186. if errRet := recover(); errRet != nil {
  187. err = fmt.Errorf("udp write closed listener")
  188. }
  189. }()
  190. l.writeCh <- packet
  191. return
  192. }
  193. func (l *UDPListener) WriteMsg(buf []byte, remoteAddr *net.UDPAddr) (err error) {
  194. // only set remote addr here
  195. packet := &UDPPacket{
  196. Buf: buf,
  197. RemoteAddr: remoteAddr,
  198. }
  199. err = l.writeUDPPacket(packet)
  200. return
  201. }
  202. func (l *UDPListener) Accept() (net.Conn, error) {
  203. conn, ok := <-l.acceptCh
  204. if !ok {
  205. return conn, fmt.Errorf("channel for udp listener closed")
  206. }
  207. return conn, nil
  208. }
  209. func (l *UDPListener) Close() error {
  210. if !l.closeFlag {
  211. l.closeFlag = true
  212. if l.readConn != nil {
  213. l.readConn.Close()
  214. }
  215. }
  216. return nil
  217. }
  218. func (l *UDPListener) Addr() net.Addr {
  219. return l.addr
  220. }
  221. // ConnectedUDPConn is a wrapper for net.UDPConn which converts WriteTo syscalls
  222. // to Write syscalls that are 4 times faster on some OS'es. This should only be
  223. // used for connections that were produced by a net.Dial* call.
  224. type ConnectedUDPConn struct{ *net.UDPConn }
  225. // WriteTo redirects all writes to the Write syscall, which is 4 times faster.
  226. func (c *ConnectedUDPConn) WriteTo(b []byte, _ net.Addr) (int, error) { return c.Write(b) }