123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102 |
- package backend
- import (
- "log"
- "math"
- "net"
- "net/rpc"
- "net/rpc/jsonrpc"
- "sync"
- "time"
- "github.com/710leo/urlooker/modules/web/g"
- )
- type SingleConnRpcClient struct {
- sync.Mutex
- rpcClient *rpc.Client
- RpcServer string
- Timeout time.Duration
- }
- func (this *SingleConnRpcClient) close() {
- if this.rpcClient != nil {
- this.rpcClient.Close()
- this.rpcClient = nil
- }
- }
- func (this *SingleConnRpcClient) insureConn() {
- if this.rpcClient != nil {
- return
- }
- var err error
- var retry int = 1
- for {
- if this.rpcClient != nil {
- return
- }
- this.rpcClient, err = NewClient("tcp", this.RpcServer, this.Timeout)
- if err == nil {
- return
- }
- log.Printf("dial %s fail: %v", this.RpcServer, err)
- if retry > 8 {
- retry = 1
- }
- time.Sleep(time.Duration(math.Pow(2.0, float64(retry))) * time.Second)
- retry++
- }
- }
- func (this *SingleConnRpcClient) Call(method string, args interface{}, reply interface{}) error {
- this.Lock()
- defer this.Unlock()
- this.insureConn()
- timeout := time.Duration(15 * time.Second)
- done := make(chan error, 1)
- go func() {
- err := this.rpcClient.Call(method, args, reply)
- done <- err
- }()
- select {
- case <-time.After(timeout):
- log.Printf("[WARN] rpc call timeout %v => %v", this.rpcClient, this.RpcServer)
- this.close()
- case err := <-done:
- if err != nil {
- this.close()
- return err
- }
- this.close()
- }
- return nil
- }
- func NewClient(network, address string, timeout time.Duration) (*rpc.Client, error) {
- conn, err := net.DialTimeout(network, address, timeout)
- if err != nil {
- return nil, err
- }
- return jsonrpc.NewClient(conn), err
- }
- func NewRpcClient(addr string) *SingleConnRpcClient {
- return &SingleConnRpcClient{
- RpcServer: addr,
- Timeout: time.Duration(g.Config.Alarm.CallTimeout) * time.Millisecond,
- }
- }
|