rpc.go 1.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102
  1. package backend
  2. import (
  3. "log"
  4. "math"
  5. "net"
  6. "net/rpc"
  7. "net/rpc/jsonrpc"
  8. "sync"
  9. "time"
  10. "github.com/710leo/urlooker/modules/web/g"
  11. )
  12. type SingleConnRpcClient struct {
  13. sync.Mutex
  14. rpcClient *rpc.Client
  15. RpcServer string
  16. Timeout time.Duration
  17. }
  18. func (this *SingleConnRpcClient) close() {
  19. if this.rpcClient != nil {
  20. this.rpcClient.Close()
  21. this.rpcClient = nil
  22. }
  23. }
  24. func (this *SingleConnRpcClient) insureConn() {
  25. if this.rpcClient != nil {
  26. return
  27. }
  28. var err error
  29. var retry int = 1
  30. for {
  31. if this.rpcClient != nil {
  32. return
  33. }
  34. this.rpcClient, err = NewClient("tcp", this.RpcServer, this.Timeout)
  35. if err == nil {
  36. return
  37. }
  38. log.Printf("dial %s fail: %v", this.RpcServer, err)
  39. if retry > 8 {
  40. retry = 1
  41. }
  42. time.Sleep(time.Duration(math.Pow(2.0, float64(retry))) * time.Second)
  43. retry++
  44. }
  45. }
  46. func (this *SingleConnRpcClient) Call(method string, args interface{}, reply interface{}) error {
  47. this.Lock()
  48. defer this.Unlock()
  49. this.insureConn()
  50. timeout := time.Duration(15 * time.Second)
  51. done := make(chan error, 1)
  52. go func() {
  53. err := this.rpcClient.Call(method, args, reply)
  54. done <- err
  55. }()
  56. select {
  57. case <-time.After(timeout):
  58. log.Printf("[WARN] rpc call timeout %v => %v", this.rpcClient, this.RpcServer)
  59. this.close()
  60. case err := <-done:
  61. if err != nil {
  62. this.close()
  63. return err
  64. }
  65. this.close()
  66. }
  67. return nil
  68. }
  69. func NewClient(network, address string, timeout time.Duration) (*rpc.Client, error) {
  70. conn, err := net.DialTimeout(network, address, timeout)
  71. if err != nil {
  72. return nil, err
  73. }
  74. return jsonrpc.NewClient(conn), err
  75. }
  76. func NewRpcClient(addr string) *SingleConnRpcClient {
  77. return &SingleConnRpcClient{
  78. RpcServer: addr,
  79. Timeout: time.Duration(g.Config.Alarm.CallTimeout) * time.Millisecond,
  80. }
  81. }