Эх сурвалжийг харах

加入Monitor节点选举的负载均衡策略支持

hugui 9 жил өмнө
parent
commit
e4ed8f6eb3

+ 20 - 0
docs/包引入说明.md

@@ -36,6 +36,26 @@
 </dependency>
 ```
 
+`zookeeper包`
+
+```java
+<dependency>
+    <groupId>org.apache.zookeeper</groupId>
+    <artifactId>zookeeper</artifactId>
+    <version>${zk.version}</version>
+    <exclusions>
+        <exclusion>
+            <groupId>org.jboss.netty</groupId>
+            <artifactId>netty</artifactId>
+        </exclusion>
+        <exclusion>
+            <groupId>log4j</groupId>
+            <artifactId>log4j</artifactId>
+        </exclusion>
+    </exclusions>
+</dependency>
+```
+
 #####1.3 通讯包
 netty或者mina, 二选一, 通过 addConfig("lts.remoting", "可选值: netty, mina") 设置
 

+ 14 - 5
lts-core/src/main/java/com/lts/core/monitor/MStatReportWorker.java

@@ -14,8 +14,10 @@ import com.lts.core.commons.utils.CollectionUtils;
 import com.lts.core.domain.monitor.MData;
 import com.lts.core.domain.monitor.MNode;
 import com.lts.core.json.JSON;
+import com.lts.core.loadbalance.LoadBalance;
 import com.lts.core.logger.Logger;
 import com.lts.core.logger.LoggerFactory;
+import com.lts.core.spi.ServiceLoader;
 import com.lts.core.support.SystemClock;
 import com.lts.jvmmonitor.JVMCollector;
 
@@ -39,11 +41,13 @@ public class MStatReportWorker implements Runnable {
     private final static int MAX_RETRY_RETAIN = 500;
     private final static int BATCH_REPORT_SIZE = 10;
     private volatile boolean running = false;
+    private LoadBalance loadBalance;
 
     public MStatReportWorker(AppContext appContext, AbstractMStatReporter reporter) {
         this.appContext = appContext;
         this.reporter = reporter;
         interval = appContext.getConfig().getParameter("lts.monitor.report.interval", 1);
+        this.loadBalance = ServiceLoader.load(LoadBalance.class, appContext.getConfig(), "monitor.select.loadbalance");
     }
 
     @Override
@@ -141,8 +145,8 @@ public class MStatReportWorker implements Runnable {
 
     // 发送请求
     private boolean sendReq(List<Node> monitorNodes, HttpCmd cmd) {
-        boolean success = false;
-        for (Node node : monitorNodes) {
+        while (true) {
+            Node node = selectMNode(monitorNodes);
             try {
                 cmd.setNodeIdentity(node.getIdentity());
                 HttpCmdResponse response = HttpCmdClient.doPost(node.getIp(), node.getPort(), cmd);
@@ -150,19 +154,24 @@ public class MStatReportWorker implements Runnable {
                     if (LOGGER.isDebugEnabled()) {
                         LOGGER.debug("Report Monitor Data Success.");
                     }
-                    success = true;
-                    break;
+                    return true;
                 } else {
                     LOGGER.warn("Report Monitor Data Failed: " + response.getMsg());
+                    monitorNodes.remove(node);
                 }
             } catch (Exception e) {
                 LOGGER.warn("Report Monitor Data Error: " + e.getMessage(), e);
                 // 重试下一个
             }
+            if (monitorNodes.size() == 0) {
+                return false;
+            }
         }
-        return success;
     }
 
+    private Node selectMNode(List<Node> monitorNodes) {
+        return loadBalance.select(monitorNodes, appContext.getConfig().getIdentity());
+    }
 
     private MNode buildMNode() {
         MNode mNode = new MNode();

+ 6 - 0
lts-core/src/main/java/com/lts/core/spi/ServiceLoader.java

@@ -37,6 +37,12 @@ public class ServiceLoader {
         return load(clazz, dynamicServiceName, identity);
     }
 
+    public static <T> T load(Class<T> clazz, Config config, String configKey) {
+        String dynamicServiceName = config.getParameter(configKey);
+        String identity = config.getIdentity();
+        return load(clazz, dynamicServiceName, identity);
+    }
+
     public static <T> T loadDefault(Class<T> clazz) {
         return load(clazz, "");
     }