server.go 9.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387
  1. // Copyright 2023 The frp Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package ssh
  15. import (
  16. "context"
  17. "encoding/binary"
  18. "errors"
  19. "fmt"
  20. "net"
  21. "slices"
  22. "strings"
  23. "sync"
  24. "time"
  25. libio "github.com/fatedier/golib/io"
  26. "github.com/spf13/cobra"
  27. flag "github.com/spf13/pflag"
  28. "golang.org/x/crypto/ssh"
  29. "github.com/fatedier/frp/client/proxy"
  30. "github.com/fatedier/frp/pkg/config"
  31. v1 "github.com/fatedier/frp/pkg/config/v1"
  32. "github.com/fatedier/frp/pkg/msg"
  33. "github.com/fatedier/frp/pkg/util/log"
  34. netpkg "github.com/fatedier/frp/pkg/util/net"
  35. "github.com/fatedier/frp/pkg/util/util"
  36. "github.com/fatedier/frp/pkg/util/xlog"
  37. "github.com/fatedier/frp/pkg/virtual"
  38. )
const (
	// ChannelTypeServerOpenChannel is the SSH channel type used by openConn
	// when the server opens a channel towards the client for a forwarded
	// connection.
	// https://datatracker.ietf.org/doc/html/rfc4254#page-16
	ChannelTypeServerOpenChannel = "forwarded-tcpip"
	// RequestTypeForward is the type of the global request that carries the
	// forward address (decoded into tcpipForward).
	RequestTypeForward = "tcpip-forward"
)
// tcpipForward is the payload of a "tcpip-forward" global request.
// It is decoded with ssh.Unmarshal, so the order and types of the fields
// define the expected wire layout and must not be rearranged.
type tcpipForward struct {
	// Host is the address string carried in the request.
	Host string
	// Port is the port number carried in the request.
	Port uint32
}
// forwardedTCPPayload is the extra data sent when opening a
// "forwarded-tcpip" channel. It is encoded with ssh.Marshal, so the order and
// types of the fields define the wire layout and must not be rearranged.
// https://datatracker.ietf.org/doc/html/rfc4254#page-16
type forwardedTCPPayload struct {
	// Addr and Port identify the forwarded address.
	Addr string
	Port uint32
	// OriginAddr and OriginPort describe the originator of the connection.
	// openConn fills them with the forwarded address rather than the real
	// source address.
	OriginAddr string
	OriginPort uint32
}
// TunnelServer serves a single SSH connection and bridges it to a virtual
// frp client: work connections handed out by the virtual client are joined
// with "forwarded-tcpip" channels opened on the SSH connection.
type TunnelServer struct {
	// underlyingConn is the raw connection on which the SSH handshake runs.
	underlyingConn net.Conn
	// sshConn is the established SSH connection; it is set by Run.
	sshConn *ssh.ServerConn
	// sc is the SSH server configuration used for the handshake.
	sc *ssh.ServerConfig
	// firstChannel is the first channel accepted from the client; it is the
	// channel writeToClient uses to send text back to the user.
	firstChannel ssh.Channel
	// vc is the virtual client created by Run.
	vc *virtual.Client
	// peerServerListener receives the connections accepted from the virtual
	// client's peer listener.
	peerServerListener *netpkg.InternalListener
	// doneCh is closed when the tunnel shuts down; keepAlive and
	// waitProxyStatusReady stop when it is closed.
	doneCh chan struct{}
	// closeDoneChOnce ensures doneCh is closed only once.
	closeDoneChOnce sync.Once
}
  65. func NewTunnelServer(conn net.Conn, sc *ssh.ServerConfig, peerServerListener *netpkg.InternalListener) (*TunnelServer, error) {
  66. s := &TunnelServer{
  67. underlyingConn: conn,
  68. sc: sc,
  69. peerServerListener: peerServerListener,
  70. doneCh: make(chan struct{}),
  71. }
  72. return s, nil
  73. }
  74. func (s *TunnelServer) Run() error {
  75. sshConn, channels, requests, err := ssh.NewServerConn(s.underlyingConn, s.sc)
  76. if err != nil {
  77. return err
  78. }
  79. s.sshConn = sshConn
  80. addr, extraPayload, err := s.waitForwardAddrAndExtraPayload(channels, requests, 3*time.Second)
  81. if err != nil {
  82. return err
  83. }
  84. clientCfg, pc, helpMessage, err := s.parseClientAndProxyConfigurer(addr, extraPayload)
  85. if err != nil {
  86. if errors.Is(err, flag.ErrHelp) {
  87. s.writeToClient(helpMessage)
  88. return nil
  89. }
  90. s.writeToClient(err.Error())
  91. return fmt.Errorf("parse flags from ssh client error: %v", err)
  92. }
  93. clientCfg.Complete()
  94. if sshConn.Permissions != nil {
  95. clientCfg.User = util.EmptyOr(sshConn.Permissions.Extensions["user"], clientCfg.User)
  96. }
  97. pc.Complete(clientCfg.User)
  98. vc, err := virtual.NewClient(virtual.ClientOptions{
  99. Common: clientCfg,
  100. Spec: &msg.ClientSpec{
  101. Type: "ssh-tunnel",
  102. // If ssh does not require authentication, then the virtual client needs to authenticate through a token.
  103. // Otherwise, once ssh authentication is passed, the virtual client does not need to authenticate again.
  104. AlwaysAuthPass: !s.sc.NoClientAuth,
  105. },
  106. HandleWorkConnCb: func(base *v1.ProxyBaseConfig, workConn net.Conn, m *msg.StartWorkConn) bool {
  107. // join workConn and ssh channel
  108. c, err := s.openConn(addr)
  109. if err != nil {
  110. log.Tracef("open conn error: %v", err)
  111. workConn.Close()
  112. return false
  113. }
  114. libio.Join(c, workConn)
  115. return false
  116. },
  117. })
  118. if err != nil {
  119. return err
  120. }
  121. s.vc = vc
  122. // transfer connection from virtual client to server peer listener
  123. go func() {
  124. l := s.vc.PeerListener()
  125. for {
  126. conn, err := l.Accept()
  127. if err != nil {
  128. return
  129. }
  130. _ = s.peerServerListener.PutConn(conn)
  131. }
  132. }()
  133. xl := xlog.New().AddPrefix(xlog.LogPrefix{Name: "sshVirtualClient", Value: "sshVirtualClient", Priority: 100})
  134. ctx := xlog.NewContext(context.Background(), xl)
  135. go func() {
  136. vcErr := s.vc.Run(ctx)
  137. if vcErr != nil {
  138. s.writeToClient(vcErr.Error())
  139. }
  140. // If vc.Run returns, it means that the virtual client has been closed, and the ssh tunnel connection should be closed.
  141. // One scenario is that the virtual client exits due to login failure.
  142. s.closeDoneChOnce.Do(func() {
  143. _ = sshConn.Close()
  144. close(s.doneCh)
  145. })
  146. }()
  147. s.vc.UpdateProxyConfigurer([]v1.ProxyConfigurer{pc})
  148. if ps, err := s.waitProxyStatusReady(pc.GetBaseConfig().Name, time.Second); err != nil {
  149. s.writeToClient(err.Error())
  150. log.Warnf("wait proxy status ready error: %v", err)
  151. } else {
  152. // success
  153. s.writeToClient(createSuccessInfo(clientCfg.User, pc, ps))
  154. _ = sshConn.Wait()
  155. }
  156. s.vc.Close()
  157. log.Tracef("ssh tunnel connection from %v closed", sshConn.RemoteAddr())
  158. s.closeDoneChOnce.Do(func() {
  159. _ = sshConn.Close()
  160. close(s.doneCh)
  161. })
  162. return nil
  163. }
  164. func (s *TunnelServer) writeToClient(data string) {
  165. if s.firstChannel == nil {
  166. return
  167. }
  168. _, _ = s.firstChannel.Write([]byte(data + "\n"))
  169. }
  170. func (s *TunnelServer) waitForwardAddrAndExtraPayload(
  171. channels <-chan ssh.NewChannel,
  172. requests <-chan *ssh.Request,
  173. timeout time.Duration,
  174. ) (*tcpipForward, string, error) {
  175. addrCh := make(chan *tcpipForward, 1)
  176. extraPayloadCh := make(chan string, 1)
  177. // get forward address
  178. go func() {
  179. addrGot := false
  180. for req := range requests {
  181. if req.Type == RequestTypeForward && !addrGot {
  182. payload := tcpipForward{}
  183. if err := ssh.Unmarshal(req.Payload, &payload); err != nil {
  184. return
  185. }
  186. addrGot = true
  187. addrCh <- &payload
  188. }
  189. if req.WantReply {
  190. _ = req.Reply(true, nil)
  191. }
  192. }
  193. }()
  194. // get extra payload
  195. go func() {
  196. for newChannel := range channels {
  197. // extraPayload will send to extraPayloadCh
  198. go s.handleNewChannel(newChannel, extraPayloadCh)
  199. }
  200. }()
  201. var (
  202. addr *tcpipForward
  203. extraPayload string
  204. )
  205. timer := time.NewTimer(timeout)
  206. defer timer.Stop()
  207. for {
  208. select {
  209. case v := <-addrCh:
  210. addr = v
  211. case extra := <-extraPayloadCh:
  212. extraPayload = extra
  213. case <-timer.C:
  214. return nil, "", fmt.Errorf("get addr and extra payload timeout")
  215. }
  216. if addr != nil && extraPayload != "" {
  217. break
  218. }
  219. }
  220. return addr, extraPayload, nil
  221. }
  222. func (s *TunnelServer) parseClientAndProxyConfigurer(_ *tcpipForward, extraPayload string) (*v1.ClientCommonConfig, v1.ProxyConfigurer, string, error) {
  223. helpMessage := ""
  224. cmd := &cobra.Command{
  225. Use: "ssh v0@{address} [command]",
  226. Short: "ssh v0@{address} [command]",
  227. Run: func(*cobra.Command, []string) {},
  228. }
  229. cmd.SetGlobalNormalizationFunc(config.WordSepNormalizeFunc)
  230. args := strings.Split(extraPayload, " ")
  231. if len(args) < 1 {
  232. return nil, nil, helpMessage, fmt.Errorf("invalid extra payload")
  233. }
  234. proxyType := strings.TrimSpace(args[0])
  235. supportTypes := []string{"tcp", "http", "https", "tcpmux", "stcp"}
  236. if !slices.Contains(supportTypes, proxyType) {
  237. return nil, nil, helpMessage, fmt.Errorf("invalid proxy type: %s, support types: %v", proxyType, supportTypes)
  238. }
  239. pc := v1.NewProxyConfigurerByType(v1.ProxyType(proxyType))
  240. if pc == nil {
  241. return nil, nil, helpMessage, fmt.Errorf("new proxy configurer error")
  242. }
  243. config.RegisterProxyFlags(cmd, pc, config.WithSSHMode())
  244. clientCfg := v1.ClientCommonConfig{}
  245. config.RegisterClientCommonConfigFlags(cmd, &clientCfg, config.WithSSHMode())
  246. cmd.InitDefaultHelpCmd()
  247. if err := cmd.ParseFlags(args); err != nil {
  248. if errors.Is(err, flag.ErrHelp) {
  249. helpMessage = cmd.UsageString()
  250. }
  251. return nil, nil, helpMessage, err
  252. }
  253. // if name is not set, generate a random one
  254. if pc.GetBaseConfig().Name == "" {
  255. id, err := util.RandIDWithLen(8)
  256. if err != nil {
  257. return nil, nil, helpMessage, fmt.Errorf("generate random id error: %v", err)
  258. }
  259. pc.GetBaseConfig().Name = fmt.Sprintf("sshtunnel-%s-%s", proxyType, id)
  260. }
  261. return &clientCfg, pc, helpMessage, nil
  262. }
  263. func (s *TunnelServer) handleNewChannel(channel ssh.NewChannel, extraPayloadCh chan string) {
  264. ch, reqs, err := channel.Accept()
  265. if err != nil {
  266. return
  267. }
  268. if s.firstChannel == nil {
  269. s.firstChannel = ch
  270. }
  271. go s.keepAlive(ch)
  272. for req := range reqs {
  273. if req.WantReply {
  274. _ = req.Reply(true, nil)
  275. }
  276. if req.Type != "exec" || len(req.Payload) <= 4 {
  277. continue
  278. }
  279. end := 4 + binary.BigEndian.Uint32(req.Payload[:4])
  280. if len(req.Payload) < int(end) {
  281. continue
  282. }
  283. extraPayload := string(req.Payload[4:end])
  284. select {
  285. case extraPayloadCh <- extraPayload:
  286. default:
  287. }
  288. }
  289. }
  290. func (s *TunnelServer) keepAlive(ch ssh.Channel) {
  291. tk := time.NewTicker(time.Second * 30)
  292. defer tk.Stop()
  293. for {
  294. select {
  295. case <-tk.C:
  296. _, err := ch.SendRequest("heartbeat", false, nil)
  297. if err != nil {
  298. return
  299. }
  300. case <-s.doneCh:
  301. return
  302. }
  303. }
  304. }
  305. func (s *TunnelServer) openConn(addr *tcpipForward) (net.Conn, error) {
  306. payload := forwardedTCPPayload{
  307. Addr: addr.Host,
  308. Port: addr.Port,
  309. // Note: Here is just for compatibility, not the real source address.
  310. OriginAddr: addr.Host,
  311. OriginPort: addr.Port,
  312. }
  313. channel, reqs, err := s.sshConn.OpenChannel(ChannelTypeServerOpenChannel, ssh.Marshal(&payload))
  314. if err != nil {
  315. return nil, fmt.Errorf("open ssh channel error: %v", err)
  316. }
  317. go ssh.DiscardRequests(reqs)
  318. conn := netpkg.WrapReadWriteCloserToConn(channel, s.underlyingConn)
  319. return conn, nil
  320. }
  321. func (s *TunnelServer) waitProxyStatusReady(name string, timeout time.Duration) (*proxy.WorkingStatus, error) {
  322. ticker := time.NewTicker(100 * time.Millisecond)
  323. defer ticker.Stop()
  324. timer := time.NewTimer(timeout)
  325. defer timer.Stop()
  326. statusExporter := s.vc.Service().StatusExporter()
  327. for {
  328. select {
  329. case <-ticker.C:
  330. ps, ok := statusExporter.GetProxyStatus(name)
  331. if !ok {
  332. continue
  333. }
  334. switch ps.Phase {
  335. case proxy.ProxyPhaseRunning:
  336. return ps, nil
  337. case proxy.ProxyPhaseStartErr, proxy.ProxyPhaseClosed:
  338. return ps, errors.New(ps.Err)
  339. }
  340. case <-timer.C:
  341. return nil, fmt.Errorf("wait proxy status ready timeout")
  342. case <-s.doneCh:
  343. return nil, fmt.Errorf("ssh tunnel server closed")
  344. }
  345. }
  346. }