rpc.go 2.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108
  1. package backend
  2. import (
  3. "fmt"
  4. "log"
  5. "math/rand"
  6. "net/rpc"
  7. "strings"
  8. "sync"
  9. "time"
  10. "github.com/toolkits/net"
  11. )
  12. type BackendClients struct {
  13. sync.RWMutex
  14. Clients map[string]*rpc.Client
  15. Addresses []string
  16. }
  17. func (this *BackendClients) GetAddresses() []string {
  18. return this.Addresses
  19. }
  20. func (this *BackendClients) InitAddresses(addresses []string) {
  21. this.Addresses = addresses
  22. }
  23. func (this *BackendClients) InitClients(clients map[string]*rpc.Client) {
  24. this.Lock()
  25. defer this.Unlock()
  26. this.Clients = clients
  27. }
  28. func (this *BackendClients) ReplaceClient(addr string, client *rpc.Client) {
  29. this.Lock()
  30. defer this.Unlock()
  31. old, has := this.Clients[addr]
  32. if has && old != nil {
  33. old.Close()
  34. }
  35. this.Clients[addr] = client
  36. }
  37. func (this *BackendClients) GetClient(addr string) (*rpc.Client, bool) {
  38. this.RLock()
  39. defer this.RUnlock()
  40. c, has := this.Clients[addr]
  41. return c, has
  42. }
  43. var Clients = &BackendClients{Clients: make(map[string]*rpc.Client)}
  44. func InitClients(addresses []string) {
  45. Clients.InitAddresses(addresses)
  46. cs := make(map[string]*rpc.Client)
  47. for _, endpoint := range addresses {
  48. client, err := net.JsonRpcClient("tcp", endpoint, time.Second)
  49. if err != nil {
  50. log.Fatalln("cannot connect to", endpoint)
  51. }
  52. cs[endpoint] = client
  53. }
  54. Clients.InitClients(cs)
  55. }
  56. func CallRpc(method string, args, reply interface{}) error {
  57. addrs := Clients.GetAddresses()
  58. r := rand.New(rand.NewSource(time.Now().UnixNano()))
  59. for _, i := range r.Perm(len(addrs)) {
  60. addr := addrs[i]
  61. client, has := Clients.GetClient(addr)
  62. if !has {
  63. log.Println(addr, "has no client")
  64. continue
  65. }
  66. err := client.Call(method, args, reply)
  67. if err == nil {
  68. return nil
  69. }
  70. if err == rpc.ErrShutdown || strings.Contains(err.Error(), "connection refused") {
  71. // 后端可能重启了以至于原来持有的连接关闭,或者后端挂了
  72. // 可以尝试再次建立连接,搞定重启的情况
  73. client, err = net.JsonRpcClient("tcp", addr, time.Second)
  74. if err != nil {
  75. log.Println(addr, "is dead")
  76. continue
  77. } else {
  78. // 重新建立了与该实例的连接
  79. Clients.ReplaceClient(addr, client)
  80. return client.Call(method, args, reply)
  81. }
  82. }
  83. // 刚开始后端没挂,但是仍然失败了,比如请求时间比较长,还没有结束,后端重启了,unexpected EOF
  84. // 不确定此时后端逻辑是否真的执行过了,防止后端逻辑不幂等,无法重试
  85. return err
  86. }
  87. return fmt.Errorf("all backends are dead")
  88. }