Browse Source

aggregator: refine code and request data from plus-api

laiwei 8 years ago
parent
commit
a76ed8c6cf

+ 0 - 70
common/sdk/graph/last.go

@@ -1,70 +0,0 @@
-package graph
-
-import (
-	"bytes"
-	"encoding/json"
-	"io/ioutil"
-	"net/http"
-
-	"github.com/open-falcon/falcon-plus/common/model"
-	"github.com/open-falcon/falcon-plus/common/sdk/requests"
-)
-
-var GraphLastUrl = "http://127.0.0.1:9966/graph/last"
-
-func Last(endpoint, counter string) (val float64, ts int64, err error) {
-	param := &model.GraphLastParam{Endpoint: endpoint, Counter: counter}
-	bs, err := json.Marshal([]*model.GraphLastParam{param})
-	if err != nil {
-		return val, ts, err
-	}
-
-	bf := bytes.NewBuffer(bs)
-
-	resp, err := http.Post(GraphLastUrl, "application/json", bf)
-	if err != nil {
-		return val, ts, err
-	}
-
-	defer resp.Body.Close()
-	body, err := ioutil.ReadAll(resp.Body)
-	if err != nil {
-		return val, ts, err
-	}
-
-	var L []*model.GraphLastResp
-	err = json.Unmarshal(body, &L)
-	if err != nil {
-		return val, ts, err
-	}
-
-	if len(L) == 0 {
-		return val, ts, nil
-	}
-
-	v := L[0].Value
-	if v == nil {
-		return val, ts, nil
-	}
-
-	return float64(v.Value), v.Timestamp, nil
-}
-
-func Lasts(params []*model.GraphLastParam) ([]*model.GraphLastResp, error) {
-	if len(params) == 0 {
-		return []*model.GraphLastResp{}, nil
-	}
-
-	body, err := requests.PostJsonBody(GraphLastUrl, params)
-	if err != nil {
-		return []*model.GraphLastResp{}, err
-	}
-
-	var L []*model.GraphLastResp
-	err = json.Unmarshal(body, &L)
-	if err != nil {
-		return []*model.GraphLastResp{}, err
-	}
-
-	return L, nil
-}

+ 0 - 52
common/sdk/portal/hostnames.go

@@ -1,52 +0,0 @@
-package portal
-
-import (
-	"encoding/json"
-	"fmt"
-	"io/ioutil"
-	"log"
-	"net/http"
-)
-
-var HostnamesUrl = "http://127.0.0.1:5050/api/group/%s/hosts.json"
-
-type hostnamesDto struct {
-	Msg  string   `json:"msg"`
-	Data []string `json:"data"`
-}
-
-func Hostnames(groupName string, hostnamesUrl ...string) ([]string, error) {
-	pattern := HostnamesUrl
-	if len(hostnamesUrl) > 0 {
-		pattern = hostnamesUrl[0]
-	}
-
-	url := fmt.Sprintf(pattern, groupName)
-
-	resp, err := http.Get(url)
-	if err != nil {
-		log.Println("[E]", err)
-		return []string{}, err
-	}
-
-	defer resp.Body.Close()
-	body, err := ioutil.ReadAll(resp.Body)
-	if err != nil {
-		log.Println("[E]", err)
-		return []string{}, err
-	}
-
-	if resp.StatusCode != 200 {
-		log.Printf("[E] status code: %d != 200, response: %s", resp.StatusCode, string(body))
-		return []string{}, fmt.Errorf(string(body))
-	}
-
-	var dto hostnamesDto
-	err = json.Unmarshal(body, &dto)
-	if err != nil {
-		log.Println("[E]", err)
-		return []string{}, err
-	}
-
-	return dto.Data, nil
-}

+ 43 - 0
common/sdk/requests/auth_request.go

@@ -0,0 +1,43 @@
+package requests
+
+import (
+	"github.com/toolkits/net/httplib"
+	"time"
+	"encoding/json"
+	"errors"
+)
+
+func CurlPlus(uri, method, token_name, token_sig string, headers, params map[string]string) (req *httplib.BeegoHttpRequest, err error) {
+	if method == "GET" {
+		req = httplib.Get(uri)
+	} else if method == "POST" {
+		req = httplib.Post(uri)
+	} else if method == "PUT" {
+		req = httplib.Put(uri)
+	} else if method == "DELETE" {
+		req = httplib.Delete(uri)
+	} else if method == "HEAD" {
+		req = httplib.Head(uri)
+	} else {
+		err = errors.New("invalid http method")
+		return
+	}
+
+	req = req.SetTimeout(1*time.Second, 5*time.Second)
+
+	token, _ := json.Marshal(map[string]string{
+		"name": token_name,
+		"sig":  token_sig,
+	})
+	req.Header("Apitoken", string(token))
+
+	for hk, hv := range headers {
+		req.Header(hk, hv)
+	}
+
+	for pk, pv := range params {
+		req.Param(pk, pv)
+	}
+
+	return
+}

+ 3 - 3
config/aggregator.json

@@ -11,8 +11,8 @@
         "interval": 55
     },
     "api": {
-        "hostnames": "http://127.0.0.1:5050/api/group/%s/hosts.json",
-        "push": "http://%%TRANSFER_HTTP%%/api/push",
-        "graphLast": "http://%%QUERY_HTTP%%/graph/last"
+        "plus_api": "http://127.0.0.1:8080",
+        "plus_api_token": "used-by-alarm-in-server-side-and-disabled-by-set-to-blank",
+        "push_api": "http://127.0.0.1:1988/v1/push"
     }
 }

+ 2 - 0
docs/assets.css

@@ -513,6 +513,8 @@ code {
   float:left;
   padding: 20px;
   max-width: 700px;
+  overflow: auto;
+  height: 100%;
 }
 
 #content .control {

+ 3 - 3
modules/aggregator/cfg.example.json

@@ -11,8 +11,8 @@
         "interval": 55
     },
     "api": {
-        "hostnames": "http://127.0.0.1:5050/api/group/%s/hosts.json",
-        "push": "http://127.0.0.1:6060/api/push",
-        "graphLast": "http://127.0.0.1:9966/graph/last"
+        "plus_api": "http://127.0.0.1:8080",
+        "plus_api_token": "used-by-alarm-in-server-side-and-disabled-by-set-to-blank",
+        "push_api": "http://127.0.0.1:1988/v1/push"
     }
 }

+ 3 - 14
modules/aggregator/cron/query.go

@@ -1,8 +1,7 @@
 package cron
 
 import (
-	"github.com/open-falcon/falcon-plus/common/model"
-	"github.com/open-falcon/falcon-plus/common/sdk/graph"
+	"github.com/open-falcon/falcon-plus/modules/aggregator/sdk"
 )
 
 func queryCounterLast(numeratorOperands, denominatorOperands, hostnames []string, begin, end int64) (map[string]float64, error) {
@@ -15,19 +14,9 @@ func queryCounterLast(numeratorOperands, denominatorOperands, hostnames []string
 		counters = append(counters, counter)
 	}
 
-	params := []*model.GraphLastParam{}
-	counterSize := len(counters)
-	hostnameSize := len(hostnames)
-
-	for i := 0; i < counterSize; i++ {
-		for j := 0; j < hostnameSize; j++ {
-			params = append(params, &model.GraphLastParam{Endpoint: hostnames[j], Counter: counters[i]})
-		}
-	}
-
-	resp, err := graph.Lasts(params)
+	resp, err := sdk.QueryLastPoints(hostnames, counters)
 	if err != nil {
-		return nil, err
+		return map[string]float64{}, err
 	}
 
 	ret := make(map[string]float64)

+ 2 - 3
modules/aggregator/cron/run.go

@@ -1,10 +1,9 @@
 package cron
 
 import (
-	"fmt"
-	"github.com/open-falcon/falcon-plus/common/sdk/portal"
 	"github.com/open-falcon/falcon-plus/common/sdk/sender"
 	"github.com/open-falcon/falcon-plus/modules/aggregator/g"
+	"github.com/open-falcon/falcon-plus/modules/aggregator/sdk"
 	"log"
 	"strconv"
 	"strings"
@@ -38,7 +37,7 @@ func WorkerRun(item *g.Cluster) {
 		return
 	}
 
-	hostnames, err := portal.Hostnames(fmt.Sprintf("%d", item.GroupId))
+	hostnames, err := sdk.HostnamesByID(item.GroupId)
 	if err != nil || len(hostnames) == 0 {
 		return
 	}

+ 3 - 3
modules/aggregator/g/cfg.go

@@ -20,9 +20,9 @@ type DatabaseConfig struct {
 }
 
 type ApiConfig struct {
-	Hostnames string `json:"hostnames"`
-	Push      string `json:"push"`
-	GraphLast string `json:"graphLast'`
+	PlusApi      string `json:"plus_api"`
+	PlusApiToken string `json:"plus_api_token"`
+	PushApi      string `json:"push_api"`
 }
 
 type GlobalConfig struct {

+ 1 - 5
modules/aggregator/main.go

@@ -7,8 +7,6 @@ import (
 	"os/signal"
 	"syscall"
 
-	"github.com/open-falcon/falcon-plus/common/sdk/graph"
-	"github.com/open-falcon/falcon-plus/common/sdk/portal"
 	"github.com/open-falcon/falcon-plus/common/sdk/sender"
 	"github.com/open-falcon/falcon-plus/modules/aggregator/cron"
 	"github.com/open-falcon/falcon-plus/modules/aggregator/db"
@@ -39,10 +37,8 @@ func main() {
 	go cron.UpdateItems()
 
 	// sdk configuration
-	graph.GraphLastUrl = g.Config().Api.GraphLast
 	sender.Debug = g.Config().Debug
-	sender.PostPushUrl = g.Config().Api.Push
-	portal.HostnamesUrl = g.Config().Api.Hostnames
+	sender.PostPushUrl = g.Config().Api.PushApi
 
 	sender.StartSender()
 

+ 71 - 0
modules/aggregator/sdk/sdk.go

@@ -0,0 +1,71 @@
+package sdk
+
+import (
+	"encoding/json"
+	"fmt"
+	cmodel "github.com/open-falcon/common/model"
+	"github.com/open-falcon/falcon-plus/common/sdk/requests"
+	"github.com/open-falcon/falcon-plus/modules/aggregator/g"
+	f "github.com/open-falcon/falcon-plus/modules/api/app/model/falcon_portal"
+	"github.com/toolkits/net/httplib"
+)
+
+func HostnamesByID(group_id int64) ([]string, error) {
+
+	uri := fmt.Sprintf("%s/api/v1/hostgroup/%d", g.Config().Api.PlusApi, group_id)
+	req, err := requests.CurlPlus(uri, "GET", "aggregator", g.Config().Api.PlusApiToken,
+		map[string]string{}, map[string]string{})
+
+	if err != nil {
+		return []string{}, err
+	}
+
+	type RESP struct {
+		HostGroup f.HostGroup `json:"hostgroup"`
+		Hosts     []f.Host    `json:"hosts"`
+	}
+
+	resp := &RESP{}
+	err = req.ToJson(&resp)
+	if err != nil {
+		return []string{}, err
+	}
+
+	hosts := []string{}
+	for _, x := range resp.Hosts {
+		hosts = append(hosts, x.Hostname)
+	}
+	return hosts, nil
+}
+
+func QueryLastPoints(endpoints, counters []string) (resp []*cmodel.GraphLastResp, err error) {
+	uri := fmt.Sprintf("%s/api/v1/graph/lastpoint", g.Config().Api.PlusApi)
+
+	var req *httplib.BeegoHttpRequest
+	headers := map[string]string{"Content-type": "application/json"}
+	req, err = requests.CurlPlus(uri, "POST", "aggregator", g.Config().Api.PlusApiToken,
+		headers, map[string]string{})
+
+	if err != nil {
+		return
+	}
+
+	body := map[string][]string{
+		"endpoints": endpoints,
+		"counters":  counters,
+	}
+
+	b, err := json.Marshal(body)
+	if err != nil {
+		return
+	}
+
+	req.Body(b)
+
+	err = req.ToJson(&resp)
+	if err != nil {
+		return
+	}
+
+	return resp, nil
+}

+ 31 - 0
modules/aggregator/sdk/sdk_test.go

@@ -0,0 +1,31 @@
+package sdk
+
+import (
+	"testing"
+
+	"github.com/open-falcon/falcon-plus/modules/aggregator/g"
+	. "github.com/smartystreets/goconvey/convey"
+)
+
+func init() {
+	g.ParseConfig("../cfg.example.json")
+}
+
+func TestSDK(t *testing.T) {
+	Convey("get hostnames by id", t, func() {
+		r, err := HostnamesByID(1)
+		t.Log(r, err)
+		So(err, ShouldBeNil)
+		So(len(r), ShouldBeGreaterThanOrEqualTo, 0)
+	})
+
+	Convey("query last points", t, func() {
+		r, err := QueryLastPoints([]string{"laiweiofficemac"}, []string{"agent.alive"})
+		t.Log(r, err)
+		So(err, ShouldBeNil)
+		So(len(r), ShouldBeGreaterThanOrEqualTo, 0)
+		for _, x := range r {
+			t.Log(x)
+		}
+	})
+}