Browse Source

解决一个jvm 运行两个 node 的问题

胡贵 10 years ago
parent
commit
d947e2850b
61 changed files with 522 additions and 549 deletions
  1. 1 1
      README.md
  2. 15 0
      job-admin/pom.xml
  3. 1 1
      job-client/pom.xml
  4. 3 1
      job-client/src/main/java/com/lts/job/client/JobClient.java
  5. 1 1
      job-client/src/main/java/com/lts/job/client/RetryJobClient.java
  6. 1 1
      job-client/src/main/java/com/lts/job/client/processor/RemotingDispatcher.java
  7. 1 1
      job-core/pom.xml
  8. 6 6
      job-core/src/main/java/com/lts/job/core/cluster/AbstractClientNode.java
  9. 14 6
      job-core/src/main/java/com/lts/job/core/cluster/AbstractJobNode.java
  10. 2 3
      job-core/src/main/java/com/lts/job/core/cluster/AbstractServerNode.java
  11. 20 15
      job-core/src/main/java/com/lts/job/core/cluster/MasterElector.java
  12. 15 10
      job-core/src/main/java/com/lts/job/core/cluster/NodeFactory.java
  13. 15 9
      job-core/src/main/java/com/lts/job/core/cluster/NodeManager.java
  14. 15 9
      job-core/src/main/java/com/lts/job/core/cluster/PathParser.java
  15. 17 6
      job-core/src/main/java/com/lts/job/core/constant/Constants.java
  16. 11 6
      job-core/src/main/java/com/lts/job/core/listener/MasterNodeElectionListener.java
  17. 0 7
      job-core/src/main/java/com/lts/job/core/protocol/command/AbstractCommandBody.java
  18. 25 0
      job-core/src/main/java/com/lts/job/core/protocol/command/CommandWrapper.java
  19. 1 1
      job-core/src/main/java/com/lts/job/core/protocol/command/HeartBeatRequest.java
  20. 17 14
      job-core/src/main/java/com/lts/job/core/registry/NodeRegistry.java
  21. 3 1
      job-core/src/main/java/com/lts/job/core/remoting/HeartBeatMonitor.java
  22. 4 2
      job-core/src/main/java/com/lts/job/core/remoting/HeartBeater.java
  23. 13 8
      job-core/src/main/java/com/lts/job/core/remoting/RemotingClientDelegate.java
  24. 9 4
      job-core/src/main/java/com/lts/job/core/remoting/RemotingServerDelegate.java
  25. 41 4
      job-core/src/main/java/com/lts/job/core/support/Application.java
  26. 5 5
      job-core/src/main/java/com/lts/job/core/support/RetryScheduler.java
  27. 8 0
      job-core/src/main/java/com/lts/job/core/support/Singleton.java
  28. 5 3
      job-core/src/main/java/com/lts/job/core/support/SingletonBeanContext.java
  29. 2 1
      job-core/src/main/java/com/lts/job/store/mongo/AbstractMongoRepository.java
  30. 0 10
      job-core/src/test/java/com/lts/job/core/RegistryTest.java
  31. 0 41
      job-core/src/test/java/com/lts/job/core/support/RetrySchedulerTest.java
  32. 1 1
      job-example/pom.xml
  33. 1 1
      job-extensions/job-ext-spring/pom.xml
  34. 1 1
      job-extensions/pom.xml
  35. 1 1
      job-task-tracker/pom.xml
  36. 10 3
      job-task-tracker/src/main/java/com/lts/job/task/tracker/TaskTracker.java
  37. 9 2
      job-task-tracker/src/main/java/com/lts/job/task/tracker/processor/JobAskProcessor.java
  38. 12 5
      job-task-tracker/src/main/java/com/lts/job/task/tracker/processor/JobPushProcessor.java
  39. 1 1
      job-task-tracker/src/main/java/com/lts/job/task/tracker/processor/RemotingDispatcher.java
  40. 2 2
      job-task-tracker/src/main/java/com/lts/job/task/tracker/runner/DefaultRunnerFactory.java
  41. 6 6
      job-task-tracker/src/main/java/com/lts/job/task/tracker/runner/JobRunnerDelegate.java
  42. 25 14
      job-task-tracker/src/main/java/com/lts/job/task/tracker/runner/RunnerPool.java
  43. 0 45
      job-task-tracker/src/test/java/RunnerPoolTest.java
  44. 1 1
      job-tracker/pom.xml
  45. 12 13
      job-tracker/src/main/java/com/lts/job/tracker/JobTracker.java
  46. 10 14
      job-tracker/src/main/java/com/lts/job/tracker/channel/ChannelManager.java
  47. 3 1
      job-tracker/src/main/java/com/lts/job/tracker/processor/HeartBeatProcessor.java
  48. 6 3
      job-tracker/src/main/java/com/lts/job/tracker/processor/JobFinishedProcessor.java
  49. 5 1
      job-tracker/src/main/java/com/lts/job/tracker/processor/JobSubmitProcessor.java
  50. 8 4
      job-tracker/src/main/java/com/lts/job/tracker/processor/RemotingDispatcher.java
  51. 17 7
      job-tracker/src/main/java/com/lts/job/tracker/support/ClientNotifier.java
  52. 28 13
      job-tracker/src/main/java/com/lts/job/tracker/support/DeadJobChecker.java
  53. 16 6
      job-tracker/src/main/java/com/lts/job/tracker/support/FeedbackJobSendChecker.java
  54. 8 6
      job-tracker/src/main/java/com/lts/job/tracker/support/JobClientManager.java
  55. 19 12
      job-tracker/src/main/java/com/lts/job/tracker/support/JobController.java
  56. 21 8
      job-tracker/src/main/java/com/lts/job/tracker/support/JobNodeChangeListener.java
  57. 16 6
      job-tracker/src/main/java/com/lts/job/tracker/support/JobTrackerMasterChangeListener.java
  58. 0 19
      job-tracker/src/main/java/com/lts/job/tracker/support/RemotingServerManager.java
  59. 10 7
      job-tracker/src/main/java/com/lts/job/tracker/support/TaskTrackerManager.java
  60. 0 168
      job-tracker/src/test/java/RemotingTest.java
  61. 2 1
      pom.xml

+ 1 - 1
README.md

@@ -29,7 +29,7 @@ LTS 轻量级分布式任务调度框架(Light Task Schedule)
 ## 特性
 
 * 负载均衡:
-     * JobClient 和 TaskTracker会随机连接JobTracker节点组中的一个节点,实现JobTracker负载均衡。当连接上后,将一直保持连接这个节点,保持连接通道,知道这个节点不可用,减少每次都重新连接一个节点带来的性能开销。
+     * JobClient 和 TaskTracker会随机连接JobTracker节点组中的一个节点,实现JobTracker负载均衡。当连接上后,将一直保持连接这个节点,保持连接通道,直到这个节点不可用,减少每次都重新连接一个节点带来的性能开销。
      * JobTracker 分发任务时,是优先分配给最空闲的一个TaskTracker节点,实现TaskTracker节点的负载均衡。
 
 * 健壮性:

+ 15 - 0
job-admin/pom.xml

@@ -0,0 +1,15 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>light-task-schedule</artifactId>
+        <groupId>com.lts</groupId>
+        <version>1.3-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+    <packaging>war</packaging>
+    <artifactId>job-admin</artifactId>
+
+
+</project>

+ 1 - 1
job-client/pom.xml

@@ -5,7 +5,7 @@
     <parent>
         <artifactId>light-task-schedule</artifactId>
         <groupId>com.lts</groupId>
-        <version>1.2-SNAPSHOT</version>
+        <version>1.3-SNAPSHOT</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
 

+ 3 - 1
job-client/src/main/java/com/lts/job/client/JobClient.java

@@ -10,6 +10,7 @@ import com.lts.job.client.domain.Response;
 import com.lts.job.client.domain.ResponseCode;
 import com.lts.job.core.exception.JobTrackerNotFoundException;
 import com.lts.job.core.protocol.JobProtos;
+import com.lts.job.core.protocol.command.CommandWrapper;
 import com.lts.job.core.protocol.command.JobSubmitRequest;
 import com.lts.job.core.protocol.command.JobSubmitResponse;
 import com.lts.job.core.util.BatchUtils;
@@ -40,6 +41,7 @@ public class JobClient<T extends JobClientNode> extends AbstractClientNode<JobCl
     public JobClient() {
         // 设置默认节点组
         config.setNodeGroup(Constants.DEFAULT_NODE_JOB_CLIENT_GROUP);
+        
     }
 
     /**
@@ -55,7 +57,7 @@ public class JobClient<T extends JobClientNode> extends AbstractClientNode<JobCl
         final Response response = new Response();
 
         try {
-            JobSubmitRequest jobSubmitRequest = new JobSubmitRequest();
+            JobSubmitRequest jobSubmitRequest = application.getCommandWrapper().wrapper(new JobSubmitRequest());
             jobSubmitRequest.setJobs(jobs);
 
             RemotingCommand requestCommand = RemotingCommand.createRequestCommand(JobProtos.RequestCode.SUBMIT_JOB.code(), jobSubmitRequest);

+ 1 - 1
job-client/src/main/java/com/lts/job/client/RetryJobClient.java

@@ -20,7 +20,7 @@ public class RetryJobClient extends JobClient<JobClientNode> {
     @Override
     protected void nodeStart() {
 
-        retryScheduler = new RetryScheduler<Job>(30) {
+        retryScheduler = new RetryScheduler<Job>(application, 30) {
             @Override
             protected boolean isRemotingEnable() {
                 return isServerEnable();

+ 1 - 1
job-client/src/main/java/com/lts/job/client/processor/RemotingDispatcher.java

@@ -21,7 +21,7 @@ import static com.lts.job.core.protocol.JobProtos.RequestCode.valueOf;
  */
 public class RemotingDispatcher extends AbstractProcessor {
 
-    private static final Map<JobProtos.RequestCode, NettyRequestProcessor> processors = new HashMap<JobProtos.RequestCode, NettyRequestProcessor>();
+    private final Map<JobProtos.RequestCode, NettyRequestProcessor> processors = new HashMap<JobProtos.RequestCode, NettyRequestProcessor>();
 
     public RemotingDispatcher(RemotingClientDelegate remotingClient, JobFinishedHandler jobFinishedHandler) {
         super(remotingClient);

+ 1 - 1
job-core/pom.xml

@@ -5,7 +5,7 @@
     <parent>
         <artifactId>light-task-schedule</artifactId>
         <groupId>com.lts</groupId>
-        <version>1.2-SNAPSHOT</version>
+        <version>1.3-SNAPSHOT</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
 

+ 6 - 6
job-core/src/main/java/com/lts/job/core/cluster/AbstractClientNode.java

@@ -3,7 +3,6 @@ package com.lts.job.core.cluster;
 import com.lts.job.core.constant.Constants;
 import com.lts.job.core.remoting.HeartBeatMonitor;
 import com.lts.job.core.remoting.RemotingClientDelegate;
-import com.lts.job.core.support.Application;
 import com.lts.job.core.util.StringUtils;
 import com.lts.job.remoting.netty.NettyClientConfig;
 import com.lts.job.remoting.netty.NettyRemotingClient;
@@ -13,7 +12,7 @@ import java.util.concurrent.Executors;
 
 /**
  * @author Robert HG (254963746@qq.com) on 8/18/14.
- * 抽象 netty 客户端
+ *         抽象 netty 客户端
  */
 public abstract class AbstractClientNode<T extends Node> extends AbstractJobNode<T> {
 
@@ -21,7 +20,7 @@ public abstract class AbstractClientNode<T extends Node> extends AbstractJobNode
     private HeartBeatMonitor heartBeatMonitor;
 
     public AbstractClientNode() {
-        this.remotingClient = new RemotingClientDelegate(new NettyRemotingClient(getNettyClientConfig()));
+        this.remotingClient = new RemotingClientDelegate(new NettyRemotingClient(getNettyClientConfig()), application);
         this.heartBeatMonitor = new HeartBeatMonitor(remotingClient);
     }
 
@@ -53,19 +52,20 @@ public abstract class AbstractClientNode<T extends Node> extends AbstractJobNode
 
     public void setWorkThreads(int workThreads) {
         config.setWorkThreads(workThreads);
-        Application.setAttribute(Constants.KEY_AVAILABLE_THREADS, config.getWorkThreads());
+        application.setAttribute(Constants.KEY_AVAILABLE_THREADS, config.getWorkThreads());
     }
 
     /**
      * 设置节点组名
+     *
      * @param nodeGroup
      */
-    public void setNodeGroup(String nodeGroup){
+    public void setNodeGroup(String nodeGroup) {
         config.setNodeGroup(nodeGroup);
     }
 
     public void setJobInfoSavePath(String jobInfoSavePath) {
-        if(StringUtils.isNotEmpty(jobInfoSavePath)){
+        if (StringUtils.isNotEmpty(jobInfoSavePath)) {
             config.setJobInfoSavePath(jobInfoSavePath + "/.job");
         }
     }

+ 14 - 6
job-core/src/main/java/com/lts/job/core/cluster/AbstractJobNode.java

@@ -4,6 +4,7 @@ import com.lts.job.core.constant.Constants;
 import com.lts.job.core.domain.JobNodeConfig;
 import com.lts.job.core.listener.MasterNodeChangeListener;
 import com.lts.job.core.listener.NodeChangeListener;
+import com.lts.job.core.protocol.command.CommandWrapper;
 import com.lts.job.core.registry.NodeRegistry;
 import com.lts.job.core.support.Application;
 import com.lts.job.core.listener.MasterNodeElectionListener;
@@ -23,8 +24,11 @@ public abstract class AbstractJobNode<T extends Node> implements JobNode {
     protected NodeRegistry registry;
     protected T node;
     protected JobNodeConfig config;
+    protected Application application;
+    protected NodeFactory nodeFactory;
 
     public AbstractJobNode() {
+        application = new Application();
         config = new JobNodeConfig();
         config.setIdentity(StringUtils.generateUUID());
         config.setWorkThreads(Constants.AVAILABLE_PROCESSOR);
@@ -35,20 +39,24 @@ public abstract class AbstractJobNode<T extends Node> implements JobNode {
         config.setJobInfoSavePath(Constants.USER_HOME + "/.job");
         config.setClusterName(Constants.DEFAULT_CLUSTER_NAME);
         // 可用的线程数
-        Application.setAttribute(Constants.KEY_AVAILABLE_THREADS, config.getWorkThreads());
+        application.setAttribute(Constants.KEY_AVAILABLE_THREADS, config.getWorkThreads());
 
-        Application.Config = config;
+        application.setConfig(config);
+        application.setCommandWrapper(new CommandWrapper(application));
+        application.setNodeManager(new NodeManager(application));
+        nodeFactory = new NodeFactory(application);
+        application.setMasterElector(new MasterElector(application));
 
-        this.registry = new NodeRegistry();
+        this.registry = new NodeRegistry(application);
         // 用于master选举的监听器
-        addNodeChangeListener(new MasterNodeElectionListener());
+        addNodeChangeListener(new MasterNodeElectionListener(application));
     }
 
     final public void start() {
         try {
 
             Class<T> nodeClass = GenericsUtils.getSuperClassGenericType(this.getClass());
-            node = NodeFactory.create(nodeClass, config);
+            node = nodeFactory.create(nodeClass, config);
             config.setNodeType(node.getNodeType());
 
             LOGGER.info("当前节点配置:{}", config);
@@ -116,6 +124,6 @@ public abstract class AbstractJobNode<T extends Node> implements JobNode {
      * @param masterNodeChangeListener
      */
     public void addMasterNodeChangeListener(MasterNodeChangeListener masterNodeChangeListener) {
-        MasterElector.addMasterNodeChangeListener(masterNodeChangeListener);
+        application.getMasterElector().addMasterNodeChangeListener(masterNodeChangeListener);
     }
 }

+ 2 - 3
job-core/src/main/java/com/lts/job/core/cluster/AbstractServerNode.java

@@ -1,7 +1,6 @@
 package com.lts.job.core.cluster;
 
 import com.lts.job.core.remoting.RemotingServerDelegate;
-import com.lts.job.core.support.Application;
 import com.lts.job.remoting.netty.NettyRemotingServer;
 import com.lts.job.remoting.netty.NettyRequestProcessor;
 import com.lts.job.remoting.netty.NettyServerConfig;
@@ -22,7 +21,7 @@ public abstract class AbstractServerNode<T extends Node> extends AbstractJobNode
         // config 配置
         config.setListenPort(getListenPort());
 
-        remotingServer = new RemotingServerDelegate(new NettyRemotingServer(config));
+        remotingServer = new RemotingServerDelegate(new NettyRemotingServer(config), application);
 
         remotingServer.start();
 
@@ -55,7 +54,7 @@ public abstract class AbstractServerNode<T extends Node> extends AbstractJobNode
      * @return
      */
     protected Integer getListenPort() {
-        return Application.Config.getListenPort();
+        return application.getConfig().getListenPort();
     }
 
 }

+ 20 - 15
job-core/src/main/java/com/lts/job/core/cluster/MasterElector.java

@@ -11,31 +11,36 @@ import java.util.List;
 
 /**
  * @author Robert HG (254963746@qq.com) on 8/23/14.
- * Master 选举
- * 选举思想:
- * 选出每种节点中得master, 通过每个节点的创建时间来决定 (创建时间最小的便是master, 即最早创建的是master)
- * 当master 挂掉之后, 要重新选举
+ *         Master 选举
+ *         选举思想:
+ *         选出每种节点中得master, 通过每个节点的创建时间来决定 (创建时间最小的便是master, 即最早创建的是master)
+ *         当master 挂掉之后, 要重新选举
  */
 public class MasterElector {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(MasterElector.class);
 
-    private static List<MasterNodeChangeListener> masterNodeChangeListenerList;
-    private static Node master;
+    private Application application;
+    private List<MasterNodeChangeListener> masterNodeChangeListenerList;
+    private Node master;
 
-    public static void addMasterNodeChangeListener(MasterNodeChangeListener masterNodeChangeListener) {
+    public MasterElector(Application application) {
+        this.application = application;
+    }
+
+    public void addMasterNodeChangeListener(MasterNodeChangeListener masterNodeChangeListener) {
         if (masterNodeChangeListenerList == null) {
             masterNodeChangeListenerList = new ArrayList<MasterNodeChangeListener>();
         }
         masterNodeChangeListenerList.add(masterNodeChangeListener);
     }
 
-    public static Node getMaster() {
+    public Node getMaster() {
         return master;
     }
 
 
-    public static void addNodes(List<Node> nodes) {
+    public void addNodes(List<Node> nodes) {
         Node newMaster = null;
         for (Node node : nodes) {
             if (newMaster == null) {
@@ -46,13 +51,13 @@ public class MasterElector {
                 }
             }
         }
-        if(master != newMaster){
+        if (master != newMaster) {
             master = newMaster;
             notifyListener();
         }
     }
 
-    public static void addNode(Node newNode) {
+    public void addNode(Node newNode) {
         if (master == null) {
             master = newNode;
             notifyListener();
@@ -64,12 +69,12 @@ public class MasterElector {
         }
     }
 
-    public static void removeNode(Node removedNode) {
+    public void removeNode(Node removedNode) {
 
         if (master != null) {
             if (master.getIdentity().equals(removedNode.getIdentity())) {
                 // 如果挂掉的是master, 需要重新选举
-                List<Node> nodes = NodeManager.getNodeList(Application.Config.getNodeType(), Application.Config.getNodeGroup());
+                List<Node> nodes = application.getNodeManager().getNodeList(application.getConfig().getNodeType(), application.getConfig().getNodeGroup());
                 if (CollectionUtils.isNotEmpty(nodes)) {
                     Node newMaster = null;
                     for (Node node : nodes) {
@@ -88,9 +93,9 @@ public class MasterElector {
         }
     }
 
-    private static void notifyListener() {
+    private void notifyListener() {
         boolean isMaster = false;
-        if (Application.Config.getIdentity().equals(master.getIdentity())) {
+        if (application.getConfig().getIdentity().equals(master.getIdentity())) {
             LOGGER.info("Master节点变化为当前节点:{}", master.getPath());
             isMaster = true;
         } else {

+ 15 - 10
job-core/src/main/java/com/lts/job/core/cluster/NodeFactory.java

@@ -13,11 +13,16 @@ import org.slf4j.LoggerFactory;
 public class NodeFactory {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(NodeFactory.class);
-
-    private NodeFactory() {
+    
+    private Application application;
+    private PathParser pathParser;
+    
+    public NodeFactory(Application application) {
+        this.pathParser = new PathParser(application);
+        this.application = application;
     }
 
-    public static <T extends Node> T create(Class<T> clazz, JobNodeConfig config) {
+    public <T extends Node> T create(Class<T> clazz, JobNodeConfig config) {
         try {
             T node = clazz.newInstance();
             node.setIp(NetUtils.getLocalHost());
@@ -25,7 +30,7 @@ public class NodeFactory {
             node.setThreads(config.getWorkThreads());
             node.setPort(config.getListenPort());
             node.setIdentity(config.getIdentity());
-            node.setPath(PathUtils.getPath(node));
+            node.setPath(pathParser.getPath(node));
             return node;
         } catch (InstantiationException e ) {
             LOGGER.error(e.getMessage(), e);
@@ -35,16 +40,16 @@ public class NodeFactory {
         return null;
     }
 
-    public static <T extends Node> T create(Class<T> clazz) {
+    public <T extends Node> T create(Class<T> clazz) {
         try {
             T node = clazz.newInstance();
 
             node.setIp(NetUtils.getLocalHost());
-            node.setGroup(Application.Config.getNodeGroup());
-            node.setThreads(Application.Config.getWorkThreads());
-            node.setPort(Application.Config.getListenPort());
-            node.setIdentity(Application.Config.getIdentity());
-            node.setPath(PathUtils.getPath(node));
+            node.setGroup(application.getConfig().getNodeGroup());
+            node.setThreads(application.getConfig().getWorkThreads());
+            node.setPort(application.getConfig().getListenPort());
+            node.setIdentity(application.getConfig().getIdentity());
+            node.setPath(pathParser.getPath(node));
             return node;
         } catch (InstantiationException e) {
             LOGGER.error(e.getMessage(), e);

+ 15 - 9
job-core/src/main/java/com/lts/job/core/cluster/NodeManager.java

@@ -18,16 +18,22 @@ import java.util.concurrent.CopyOnWriteArrayList;
 public class NodeManager {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(NodeManager.class);
-    private static final ConcurrentHashMap<NodeType, List<Node>> NODES = new ConcurrentHashMap<NodeType, List<Node>>();
+    private final ConcurrentHashMap<NodeType, List<Node>> NODES = new ConcurrentHashMap<NodeType, List<Node>>();
+    
+    private Application application;
 
-    public static void addNode(Node node) {
+    public NodeManager(Application application) {
+        this.application = application;
+    }
+
+    public void addNode(Node node) {
 
         // JobClient 和 TaskTracker 只对自己同一个组的节点关注和JobTracker节点关注
         // JobTracker 要对所有节点都要关注
         if (    (NodeType.JOB_TRACKER.equals(node.getNodeType())) ||
-                ((Application.Config.getNodeType().equals(node.getNodeType())
-                && Application.Config.getNodeGroup().equals(node.getGroup()))
-                || (NodeType.JOB_TRACKER.equals(Application.Config.getNodeType())))
+                ((application.getConfig().getNodeType().equals(node.getNodeType())
+                && application.getConfig().getNodeGroup().equals(node.getGroup()))
+                || (NodeType.JOB_TRACKER.equals(application.getConfig().getNodeType())))
                 ) {
 
             List<Node> nodeList = NODES.get(node.getNodeType());
@@ -40,7 +46,7 @@ public class NodeManager {
         }
     }
 
-    public static List<Node> getNodeList(NodeType nodeType, final String nodeGroup) {
+    public List<Node> getNodeList(NodeType nodeType, final String nodeGroup) {
 
         List<Node> nodes = NODES.get(nodeType);
 
@@ -52,18 +58,18 @@ public class NodeManager {
         });
     }
 
-    public static List<Node> getNodeList(NodeType nodeType) {
+    public List<Node> getNodeList(NodeType nodeType) {
         return NODES.get(nodeType);
     }
 
-    public static void removeNode(Node node) {
+    public void removeNode(Node node) {
         List<Node> nodeList = NODES.get(node.getNodeType());
         if (nodeList.remove(node)) {
             LOGGER.info("删除节点{}", node);
         }
     }
 
-    public static void destroy() {
+    public void destroy() {
         NODES.clear();
     }
 }

+ 15 - 9
job-core/src/main/java/com/lts/job/core/cluster/PathUtils.java → job-core/src/main/java/com/lts/job/core/cluster/PathParser.java

@@ -1,6 +1,7 @@
 package com.lts.job.core.cluster;
 
 
+
 import com.lts.job.core.support.Application;
 
 import java.util.regex.Matcher;
@@ -13,15 +14,20 @@ import java.util.regex.Pattern;
  * <p/>
  * 节点 zookeeper上路径解析工具
  */
-public class PathUtils {
+public class PathParser {
+
+    private Application application;
+
+    public PathParser(Application application) {
+        this.application = application;
+    }
 
-    public static String getBasePath(){
+    public String getBasePath(){
         // 集群名字
-        String clusterName =  Application.Config.getClusterName();
-        return "/LTS/" + clusterName + "/NODES";
+        return "/LTS/" + application.getConfig().getClusterName() + "/NODES";
     }
 
-    public static Node parse(String fullPath) {
+    public Node parse(String fullPath) {
         Node node = new Node();
 
         node.setPath(fullPath);
@@ -69,13 +75,13 @@ public class PathUtils {
         return node;
     }
 
-    public static NodeType parseNodeType(String path) {
+    public NodeType parseNodeType(String path) {
         String nodeType = getMatcher(getBasePath() + "/(.*)/", path);
         return NodeType.valueOf(nodeType);
     }
 
 
-    public static String getPath(Node node) {
+    public String getPath(Node node) {
         StringBuilder path = new StringBuilder();
         path.append(getBasePath())
                 .append("/")
@@ -116,11 +122,11 @@ public class PathUtils {
         return path.toString();
     }
 
-    public static String getPath(NodeType nodeType) {
+    public String getPath(NodeType nodeType) {
         return getBasePath() + "/" + nodeType;
     }
 
-    private static String getMatcher(String regex, String source) {
+    private String getMatcher(String regex, String source) {
         Pattern pattern = Pattern.compile(regex);
         Matcher matcher = pattern.matcher(source);
         while (matcher.find()) {

+ 17 - 6
job-core/src/main/java/com/lts/job/core/constant/Constants.java

@@ -3,7 +3,7 @@ package com.lts.job.core.constant;
 
 /**
  * @author Robert HG (254963746@qq.com) on 7/24/14.
- * 一些配置常量
+ *         一些配置常量
  */
 public interface Constants {
 
@@ -18,16 +18,27 @@ public interface Constants {
     public static final String DEFAULT_CLUSTER_NAME = "defaultCluster";
 
     // 默认JobTracker节点组
-    public static final String DEFAULT_NODE_JOB_TRACKER_GROUP = "JOB_TRACKER_GROUP";
+    public static final String DEFAULT_NODE_JOB_TRACKER_GROUP = "jobTrackerGroup";
     // 默认JobClient节点组
-    public static final String DEFAULT_NODE_JOB_CLIENT_GROUP = "TASK_TRACKER_GROUP";
+    public static final String DEFAULT_NODE_JOB_CLIENT_GROUP = "jobClientGroup";
     // 默认TaskTracker节点组
-    public static final String DEFAULT_NODE_TASK_TRACKER_GROUP = "TASK_TRACKER_GROUP";
+    public static final String DEFAULT_NODE_TASK_TRACKER_GROUP = "taskTrackerGroup";
 
     public static final String KEY_AVAILABLE_THREADS = "availableThreads";
 
-    public static final String JOB_RUNNING_CLASS = "JOB_RUNNING_CLASS";
+    public static final String JOB_RUNNING_CLASS = "JobRunningClass";
 
-    public static final String RUNNER_FACTORY = "RUNNER_FACTORY";
+    public static final String RUNNER_FACTORY = "RunnerFactory";
+
+    public static final String DEAD_JOB_CHECKER = "DeadJobChecker";
+
+    public static final String REMOTING_SERVER = "RemotingServer";
+
+    public static final String JOB_CLIENT_MANAGER = "JobClientManager";
+    public static final String TASK_TRACKER_MANAGER = "TaskTrackerManager";
+
+    public static final String TASK_TRACKER_RUNNER_POOL = "TaskTrackerRunnerPool";
+
+    public static final String CHANNEL_MANAGER = "ChannelManager";
 
 }

+ 11 - 6
job-core/src/main/java/com/lts/job/core/listener/MasterNodeElectionListener.java

@@ -1,6 +1,5 @@
 package com.lts.job.core.listener;
 
-import com.lts.job.core.cluster.MasterElector;
 import com.lts.job.core.cluster.Node;
 import com.lts.job.core.support.Application;
 
@@ -13,17 +12,23 @@ import java.util.List;
  */
 public class MasterNodeElectionListener implements NodeChangeListener {
 
+    private Application application;
+
+    public MasterNodeElectionListener(Application application) {
+        this.application = application;
+    }
+
     @Override
     public void addNode(Node node) {
         if (isSameGroup(node)) {
-            MasterElector.addNode(node);
+            application.getMasterElector().addNode(node);
         }
     }
 
     @Override
     public void removeNode(Node node) {
         if (isSameGroup(node)) {
-            MasterElector.removeNode(node);
+            application.getMasterElector().removeNode(node);
         }
     }
 
@@ -37,7 +42,7 @@ public class MasterNodeElectionListener implements NodeChangeListener {
             }
         }
         if(groupNodes.size() > 0){
-            MasterElector.addNodes(groupNodes);
+            application.getMasterElector().addNodes(groupNodes);
         }
     }
 
@@ -47,7 +52,7 @@ public class MasterNodeElectionListener implements NodeChangeListener {
      * @return
      */
     private boolean isSameGroup(Node node){
-        return node.getNodeType().equals(Application.Config.getNodeType())
-                && node.getGroup().equals(Application.Config.getNodeGroup());
+        return node.getNodeType().equals(application.getConfig().getNodeType())
+                && node.getGroup().equals(application.getConfig().getNodeGroup());
     }
 }

+ 0 - 7
job-core/src/main/java/com/lts/job/core/protocol/command/AbstractCommandBody.java

@@ -43,13 +43,6 @@ public class AbstractCommandBody implements CommandBody {
     @Nullable
     private Map<String, Object> extParams;
 
-    public AbstractCommandBody() {
-        this.nodeGroup = Application.Config.getNodeGroup();
-        this.nodeType = Application.Config.getNodeType().name();
-        this.identity = Application.Config.getIdentity();
-        this.availableThreads = Application.getAttribute(Constants.KEY_AVAILABLE_THREADS);
-    }
-
     public Integer getAvailableThreads() {
         return availableThreads;
     }

+ 25 - 0
job-core/src/main/java/com/lts/job/core/protocol/command/CommandWrapper.java

@@ -0,0 +1,25 @@
+package com.lts.job.core.protocol.command;
+
+import com.lts.job.core.constant.Constants;
+import com.lts.job.core.support.Application;
+
+/**
+ * Created by hugui on 3/13/15.
+ */
+public class CommandWrapper {
+
+    private Application application;
+
+    public CommandWrapper(Application application) {
+        this.application = application;
+    }
+
+    public <T extends AbstractCommandBody> T wrapper(T commandBody) {
+        commandBody.setNodeGroup(application.getConfig().getNodeGroup());
+        commandBody.setNodeType(application.getConfig().getNodeType().name());
+        commandBody.setIdentity(application.getConfig().getIdentity());
+        commandBody.setAvailableThreads((Integer) application.getAttribute(Constants.KEY_AVAILABLE_THREADS));
+        return commandBody;
+    }
+
+}

+ 1 - 1
job-core/src/main/java/com/lts/job/core/protocol/command/HeartBeatRequest.java

@@ -2,7 +2,7 @@ package com.lts.job.core.protocol.command;
 
 /**
  * @author Robert HG (254963746@qq.com) on 7/24/14.
- * 节点之间心跳的 头信息
+ *         节点之间心跳的 头信息
  */
 public class HeartBeatRequest extends AbstractCommandBody {
 

+ 17 - 14
job-core/src/main/java/com/lts/job/core/registry/NodeRegistry.java

@@ -1,9 +1,8 @@
 package com.lts.job.core.registry;
 
-import com.lts.job.core.cluster.NodeManager;
 import com.lts.job.core.cluster.Node;
 import com.lts.job.core.cluster.NodeType;
-import com.lts.job.core.cluster.PathUtils;
+import com.lts.job.core.cluster.PathParser;
 import com.lts.job.core.listener.NodeChangeListener;
 import com.lts.job.core.support.Application;
 import com.lts.job.core.util.CollectionUtils;
@@ -30,9 +29,13 @@ public class NodeRegistry implements Registry {
     private final ConcurrentHashMap<String/*path*/, List<String/*children*/>> NODE_CHILDREN_MAP = new ConcurrentHashMap<String, List<String>>();
     private ChildChangeListener listener;
     private List<NodeChangeListener> nodeChangeListeners;
+    private Application application;
+    private PathParser pathParser;
 
-    public NodeRegistry() {
-        listener = new ChildChangeListener();
+    public NodeRegistry(Application application) {
+        this.listener = new ChildChangeListener();
+        this.application = application;
+        this.pathParser = new PathParser(application);
     }
 
     /**
@@ -56,7 +59,7 @@ public class NodeRegistry implements Registry {
 
     @Override
     public void register(final Node node) {
-        zkClient = new ZkClientZookeeperClient(Application.Config.getZookeeperAddress());
+        zkClient = new ZkClientZookeeperClient(application.getConfig().getZookeeperAddress());
         zkClient.addStateListener(new StateListener() {
             @Override
             public void stateChanged(int state) {
@@ -84,7 +87,7 @@ public class NodeRegistry implements Registry {
         if (CollectionUtils.isNotEmpty(listenNodeTypes)) {
 
             for (NodeType nodeType : listenNodeTypes) {
-                String listenNodePath = PathUtils.getPath(nodeType);
+                String listenNodePath = pathParser.getPath(nodeType);
                 // 为自己关注的 节点 添加监听
                 zkClient.addChildListener(listenNodePath, listener);
 
@@ -93,9 +96,9 @@ public class NodeRegistry implements Registry {
                 if (CollectionUtils.isNotEmpty(children)) {
                     List<Node> listenedNodes = new ArrayList<Node>();
                     for (String child : children) {
-                        Node listenedNode = PathUtils.parse(listenNodePath + "/" + child);
+                        Node listenedNode = pathParser.parse(listenNodePath + "/" + child);
                         listenedNodes.add(listenedNode);
-                        NodeManager.addNode(listenedNode);
+                        application.getNodeManager().addNode(listenedNode);
                     }
                     if (CollectionUtils.isNotEmpty(nodeChangeListeners)) {
                         for (NodeChangeListener nodeChangeListener : nodeChangeListeners) {
@@ -114,9 +117,9 @@ public class NodeRegistry implements Registry {
         List<NodeType> listenNodeTypes = node.getListenNodeTypes();
         if (CollectionUtils.isNotEmpty(listenNodeTypes)) {
             for (NodeType nodeType : listenNodeTypes) {
-                zkClient.removeChildListener(PathUtils.getPath(nodeType), listener);
+                zkClient.removeChildListener(pathParser.getPath(nodeType), listener);
 
-                NodeManager.destroy();
+                application.getNodeManager().destroy();
             }
         }
     }
@@ -149,9 +152,9 @@ public class NodeRegistry implements Registry {
 
             if (CollectionUtils.isNotEmpty(addChildren)) {
                 for (String child : addChildren) {
-                    Node node = PathUtils.parse(path + "/" + child);
+                    Node node = pathParser.parse(path + "/" + child);
 
-                    NodeManager.addNode(node);
+                    application.getNodeManager().addNode(node);
                     if (CollectionUtils.isNotEmpty(nodeChangeListeners)) {
                         for (NodeChangeListener nodeChangeListener : nodeChangeListeners) {
                             nodeChangeListener.addNode(node);
@@ -162,8 +165,8 @@ public class NodeRegistry implements Registry {
 
             if (CollectionUtils.isNotEmpty(decChildren)) {
                 for (String child : decChildren) {
-                    Node node = PathUtils.parse(path + "/" + child);
-                    NodeManager.removeNode(node);
+                    Node node = pathParser.parse(path + "/" + child);
+                    application.getNodeManager().removeNode(node);
                     if (CollectionUtils.isNotEmpty(nodeChangeListeners)) {
                         for (NodeChangeListener nodeChangeListener : nodeChangeListeners) {
                             nodeChangeListener.removeNode(node);

+ 3 - 1
job-core/src/main/java/com/lts/job/core/remoting/HeartBeatMonitor.java

@@ -6,6 +6,8 @@ import com.lts.job.core.support.Application;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.text.SimpleDateFormat;
+import java.util.Date;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -30,7 +32,7 @@ public class HeartBeatMonitor {
     public void start() {
         // 设置 JobClient 心跳10s一次,TaskTracker 5s一次
         int delay = 5;
-        if(NodeType.CLIENT.equals(Application.Config.getNodeType())){
+        if(NodeType.CLIENT.equals(remotingClient.getApplication().getConfig().getNodeType())){
             delay = 10;
         }
         HEART_BEAT_EXECUTOR_SERVICE.scheduleWithFixedDelay(

+ 4 - 2
job-core/src/main/java/com/lts/job/core/remoting/HeartBeater.java

@@ -26,10 +26,12 @@ public class HeartBeater {
      */
     public static boolean beat(RemotingClientDelegate remotingClient, String addr) {
 
-        RemotingCommand request = RemotingCommand.createRequestCommand(JobProtos.RequestCode.HEART_BEAT.code(), new HeartBeatRequest());
+        HeartBeatRequest commandBody = remotingClient.getApplication().getCommandWrapper().wrapper(new HeartBeatRequest());
+
+        RemotingCommand request = RemotingCommand.createRequestCommand(JobProtos.RequestCode.HEART_BEAT.code(), commandBody);
         RemotingCommand response = null;
         try {
-            response = remotingClient.getNettyClient().invokeSync(addr, request, Application.Config.getInvokeTimeoutMillis());
+            response = remotingClient.getNettyClient().invokeSync(addr, request, remotingClient.getApplication().getConfig().getInvokeTimeoutMillis());
         } catch (Exception e) {
             if(LOGGER.isDebugEnabled()){
                 LOGGER.debug(e.getMessage(), e);

+ 13 - 8
job-core/src/main/java/com/lts/job/core/remoting/RemotingClientDelegate.java

@@ -1,6 +1,5 @@
 package com.lts.job.core.remoting;
 
-import com.lts.job.core.cluster.NodeManager;
 import com.lts.job.core.cluster.Node;
 import com.lts.job.core.cluster.NodeType;
 import com.lts.job.core.exception.JobTrackerNotFoundException;
@@ -31,12 +30,14 @@ public class RemotingClientDelegate {
     private NettyRemotingClient remotingClient;
     // 连接的jobTracker node
     private Node stickyJobTrackerNode;
+    private Application application;
 
     // JobTracker 是否可用
     private boolean serverEnable = false;
 
-    public RemotingClientDelegate(NettyRemotingClient remotingClient) {
+    public RemotingClientDelegate(NettyRemotingClient remotingClient, Application application) {
         this.remotingClient = remotingClient;
+        this.application = application;
     }
 
     public Node getStickyJobTrackerNode() throws JobTrackerNotFoundException {
@@ -53,7 +54,7 @@ public class RemotingClientDelegate {
             return;
         }
 
-        List<Node> jobTrackerNodes = NodeManager.getNodeList(NodeType.JOB_TRACKER);
+        List<Node> jobTrackerNodes = application.getNodeManager().getNodeList(NodeType.JOB_TRACKER);
         if (CollectionUtils.isEmpty(jobTrackerNodes)) {
             throw new JobTrackerNotFoundException("没有找到可用的JobTracker节点!");
         }
@@ -73,7 +74,7 @@ public class RemotingClientDelegate {
      */
     private Node getJobTackerNodeByRandom(List<Node> jobTrackerNodes) {
 
-        if(jobTrackerNodes.size() == 0){
+        if (jobTrackerNodes.size() == 0) {
             return null;
         }
 
@@ -86,7 +87,7 @@ public class RemotingClientDelegate {
 
         if (HeartBeater.beat(this, jobTackerNode.getAddress())) {
             return jobTackerNode;
-        }else{
+        } else {
             // 如果这个节点不可用,那么移除
             jobTrackerNodes.remove(index);
         }
@@ -112,7 +113,7 @@ public class RemotingClientDelegate {
 
             String addr = getStickyJobTrackerNode().getAddress();
 
-            RemotingCommand response = remotingClient.invokeSync(addr, request, Application.Config.getInvokeTimeoutMillis());
+            RemotingCommand response = remotingClient.invokeSync(addr, request, application.getConfig().getInvokeTimeoutMillis());
             this.serverEnable = true;
             return response;
 
@@ -153,7 +154,7 @@ public class RemotingClientDelegate {
 
             String addr = getStickyJobTrackerNode().getAddress();
 
-            remotingClient.invokeAsync(addr, request, Application.Config.getInvokeTimeoutMillis(), invokeCallback);
+            remotingClient.invokeAsync(addr, request, application.getConfig().getInvokeTimeoutMillis(), invokeCallback);
             this.serverEnable = true;
 
         } catch (RemotingCommandFieldCheckException e) {
@@ -192,7 +193,7 @@ public class RemotingClientDelegate {
 
             String addr = getStickyJobTrackerNode().getAddress();
 
-            remotingClient.invokeOneway(addr, request, Application.Config.getInvokeTimeoutMillis());
+            remotingClient.invokeOneway(addr, request, application.getConfig().getInvokeTimeoutMillis());
             this.serverEnable = true;
 
         } catch (RemotingCommandFieldCheckException e) {
@@ -241,4 +242,8 @@ public class RemotingClientDelegate {
     public NettyRemotingClient getNettyClient() {
         return remotingClient;
     }
+
+    public Application getApplication() {
+        return application;
+    }
 }

+ 9 - 4
job-core/src/main/java/com/lts/job/core/remoting/RemotingServerDelegate.java

@@ -18,9 +18,11 @@ import java.util.concurrent.ExecutorService;
 public class RemotingServerDelegate {
 
     private RemotingServer remotingServer;
+    private Application application;
 
-    public RemotingServerDelegate(RemotingServer remotingServer) {
+    public RemotingServerDelegate(RemotingServer remotingServer, Application application) {
         this.remotingServer = remotingServer;
+        this.application = application;
     }
 
     public void start() {
@@ -44,7 +46,7 @@ public class RemotingServerDelegate {
 
             request.checkCommandBody();
 
-            return remotingServer.invokeSync(channel, request, Application.Config.getInvokeTimeoutMillis());
+            return remotingServer.invokeSync(channel, request, application.getConfig().getInvokeTimeoutMillis());
         } catch (RemotingCommandFieldCheckException e) {
             throw e;
         } catch (Throwable t) {
@@ -57,7 +59,7 @@ public class RemotingServerDelegate {
 
             request.checkCommandBody();
 
-            remotingServer.invokeAsync(channel, request, Application.Config.getInvokeTimeoutMillis(), invokeCallback);
+            remotingServer.invokeAsync(channel, request, application.getConfig().getInvokeTimeoutMillis(), invokeCallback);
         } catch (RemotingCommandFieldCheckException e) {
             throw e;
         } catch (Throwable t) {
@@ -70,7 +72,7 @@ public class RemotingServerDelegate {
 
             request.checkCommandBody();
 
-            remotingServer.invokeOneway(channel, request, Application.Config.getInvokeTimeoutMillis());
+            remotingServer.invokeOneway(channel, request, application.getConfig().getInvokeTimeoutMillis());
         } catch (RemotingCommandFieldCheckException e) {
             throw e;
         } catch (Throwable t) {
@@ -83,4 +85,7 @@ public class RemotingServerDelegate {
         remotingServer.shutdown();
     }
 
+    public Application getApplication() {
+        return application;
+    }
 }

+ 41 - 4
job-core/src/main/java/com/lts/job/core/support/Application.java

@@ -1,6 +1,9 @@
 package com.lts.job.core.support;
 
+import com.lts.job.core.cluster.MasterElector;
+import com.lts.job.core.cluster.NodeManager;
 import com.lts.job.core.domain.JobNodeConfig;
+import com.lts.job.core.protocol.command.CommandWrapper;
 
 import java.util.concurrent.ConcurrentHashMap;
 
@@ -10,13 +13,13 @@ import java.util.concurrent.ConcurrentHashMap;
  */
 public class Application {
 
-    private static final ConcurrentHashMap<String, Object> keyValue = new ConcurrentHashMap<String, Object>();
+    private final ConcurrentHashMap<String, Object> keyValue = new ConcurrentHashMap<String, Object>();
 
-    public static void setAttribute(String key, Object value) {
+    public void setAttribute(String key, Object value) {
         keyValue.put(key, value);
     }
 
-    public static <T> T getAttribute(String key) {
+    public <T> T getAttribute(String key) {
         Object object = keyValue.get(key);
         if (object == null) {
             return null;
@@ -25,6 +28,40 @@ public class Application {
         return (T) object;
     }
 
-    public static JobNodeConfig Config;
+    private JobNodeConfig Config;
+    private NodeManager nodeManager;
+    private MasterElector masterElector;
+    private CommandWrapper commandWrapper;
 
+    public CommandWrapper getCommandWrapper() {
+        return commandWrapper;
+    }
+
+    public void setCommandWrapper(CommandWrapper commandWrapper) {
+        this.commandWrapper = commandWrapper;
+    }
+
+    public JobNodeConfig getConfig() {
+        return Config;
+    }
+
+    public void setConfig(JobNodeConfig config) {
+        Config = config;
+    }
+
+    public NodeManager getNodeManager() {
+        return nodeManager;
+    }
+
+    public void setNodeManager(NodeManager nodeManager) {
+        this.nodeManager = nodeManager;
+    }
+
+    public MasterElector getMasterElector() {
+        return masterElector;
+    }
+
+    public void setMasterElector(MasterElector masterElector) {
+        this.masterElector = masterElector;
+    }
 }

+ 5 - 5
job-core/src/main/java/com/lts/job/core/support/RetryScheduler.java

@@ -33,17 +33,17 @@ public abstract class RetryScheduler<T> {
     // 批量发送的消息数
     private int batchSize = 5;
 
-    public RetryScheduler() {
+    public RetryScheduler(Application application) {
         try {
-            levelDBStore = new LevelDBStore(Application.Config.getFilePath());
-            dbLock = new FileAccessor(Application.Config.getFilePath() + "___db.lock");
+            levelDBStore = new LevelDBStore(application.getConfig().getFilePath());
+            dbLock = new FileAccessor(application.getConfig().getFilePath() + "___db.lock");
         } catch (FileException e) {
             throw new RuntimeException(e);
         }
     }
 
-    protected RetryScheduler(int batchSize) {
-        this();
+    protected RetryScheduler(Application application, int batchSize) {
+        this(application);
         this.batchSize = batchSize;
     }
 

+ 8 - 0
job-core/src/main/java/com/lts/job/core/support/Singleton.java

@@ -0,0 +1,8 @@
+package com.lts.job.core.support;
+
+/**
+ * 如果想使用{@link com.lts.job.core.support.SingletonBeanContext } 必须实现这个接口
+ * Created by hugui on 3/13/15.
+ */
+public interface Singleton {
+}

+ 5 - 3
job-core/src/main/java/com/lts/job/core/support/SingletonBeanContext.java

@@ -4,8 +4,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
 
 /**
  * @author Robert HG (254963746@qq.com) on 8/18/14.
@@ -28,7 +26,11 @@ public class SingletonBeanContext {
                 }
                 try {
                     bean = clazz.newInstance();
-                    beanMap.put(clazz, bean);
+                    if (bean instanceof Singleton) {
+                        beanMap.put(clazz, bean);
+                    } else {
+                        LOGGER.error("{} is not implements {}", clazz.getName(), Singleton.class.getName());
+                    }
                 } catch (InstantiationException e) {
                     LOGGER.error(e.getMessage(), e);
                 } catch (IllegalAccessException e) {

+ 2 - 1
job-core/src/main/java/com/lts/job/store/mongo/AbstractMongoRepository.java

@@ -3,6 +3,7 @@ package com.lts.job.store.mongo;
 import com.google.code.morphia.Datastore;
 import com.google.code.morphia.Key;
 import com.google.code.morphia.query.Query;
+import com.lts.job.core.support.Singleton;
 import com.mongodb.WriteResult;
 import java.lang.reflect.ParameterizedType;
 import java.lang.reflect.Type;
@@ -13,7 +14,7 @@ import java.util.Map;
  * @author Robert HG (254963746@qq.com) on 8/8/14.
  *         通用的mongo存储类
  */
-public abstract class AbstractMongoRepository<T> {
+public abstract class AbstractMongoRepository<T> implements Singleton {
 
     protected static Datastore ds;
 

+ 0 - 10
job-core/src/test/java/com/lts/job/core/RegistryTest.java

@@ -19,16 +19,6 @@ public class RegistryTest {
     @Test
     public void test_registry() throws IOException {
 
-        String address = "localhost:2181";
-        NodeRegistry registry = new NodeRegistry();
-
-        List<NodeType> nodeTypeList = new ArrayList<NodeType>();
-        nodeTypeList.add(NodeType.JOB_TRACKER);
-
-        JobClientNode node = NodeFactory.create(JobClientNode.class, new JobNodeConfig());
-        registry.register(node);
-
-        System.in.read();
     }
 
     public class JobClientNode extends Node {

+ 0 - 41
job-core/src/test/java/com/lts/job/core/support/RetrySchedulerTest.java

@@ -1,41 +0,0 @@
-package com.lts.job.core.support;
-
-import com.lts.job.core.cluster.NodeType;
-import com.lts.job.core.constant.Constants;
-import com.lts.job.core.domain.Job;
-import com.lts.job.core.domain.JobNodeConfig;
-import org.junit.Test;
-
-import java.util.List;
-
-import static org.junit.Assert.*;
-
-public class RetrySchedulerTest {
-
-    @Test
-    public void testInSchedule() throws Exception {
-
-        JobNodeConfig config = new JobNodeConfig();
-        config.setJobInfoSavePath(Constants.USER_HOME + "/.job");
-        config.setNodeType(NodeType.CLIENT);
-        config.setNodeGroup("TEST");
-        Application.Config = config;
-
-        RetryScheduler<Job> retryScheduler = new RetryScheduler<Job>() {
-            @Override
-            protected boolean isRemotingEnable() {
-                return true;
-            }
-
-            @Override
-            protected boolean retry(List<Job> list) {
-                for (Job job : list) {
-                    System.out.println(job);
-                }
-                return true;
-            }
-        };
-
-        retryScheduler.inSchedule("11111", "232323");
-    }
-}

+ 1 - 1
job-example/pom.xml

@@ -5,7 +5,7 @@
     <parent>
         <artifactId>light-task-schedule</artifactId>
         <groupId>com.lts</groupId>
-        <version>1.2-SNAPSHOT</version>
+        <version>1.3-SNAPSHOT</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
     <artifactId>job-example</artifactId>

+ 1 - 1
job-extensions/job-ext-spring/pom.xml

@@ -5,7 +5,7 @@
     <parent>
         <artifactId>job-extensions</artifactId>
         <groupId>com.lts</groupId>
-        <version>1.2-SNAPSHOT</version>
+        <version>1.3-SNAPSHOT</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
 

+ 1 - 1
job-extensions/pom.xml

@@ -5,7 +5,7 @@
     <parent>
         <artifactId>light-task-schedule</artifactId>
         <groupId>com.lts</groupId>
-        <version>1.2-SNAPSHOT</version>
+        <version>1.3-SNAPSHOT</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
     <packaging>pom</packaging>

+ 1 - 1
job-task-tracker/pom.xml

@@ -5,7 +5,7 @@
     <parent>
         <artifactId>light-task-schedule</artifactId>
         <groupId>com.lts</groupId>
-        <version>1.2-SNAPSHOT</version>
+        <version>1.3-SNAPSHOT</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
 

+ 10 - 3
job-task-tracker/src/main/java/com/lts/job/task/tracker/TaskTracker.java

@@ -6,10 +6,11 @@ import com.lts.job.core.support.Application;
 import com.lts.job.remoting.netty.NettyRequestProcessor;
 import com.lts.job.task.tracker.domain.TaskTrackerNode;
 import com.lts.job.task.tracker.processor.RemotingDispatcher;
+import com.lts.job.task.tracker.runner.RunnerPool;
 
 /**
  * @author Robert HG (254963746@qq.com) on 8/14/14.
- * 任务执行节点
+ *         任务执行节点
  */
 public class TaskTracker extends AbstractClientNode<TaskTrackerNode> {
 
@@ -18,12 +19,18 @@ public class TaskTracker extends AbstractClientNode<TaskTrackerNode> {
         config.setNodeGroup(Constants.DEFAULT_NODE_TASK_TRACKER_GROUP);
     }
 
+    @Override
+    protected void nodeStart() {
+        application.setAttribute(Constants.TASK_TRACKER_RUNNER_POOL, new RunnerPool(application));
+        super.nodeStart();
+    }
+
     @Override
     protected NettyRequestProcessor getDefaultProcessor() {
         return new RemotingDispatcher(remotingClient);
     }
 
-    public void setJobRunnerClass(Class clazz){
-        Application.setAttribute(Constants.JOB_RUNNING_CLASS, clazz);
+    public void setJobRunnerClass(Class clazz) {
+        application.setAttribute(Constants.JOB_RUNNING_CLASS, clazz);
     }
 }

+ 9 - 2
job-task-tracker/src/main/java/com/lts/job/task/tracker/processor/JobAskProcessor.java

@@ -1,5 +1,7 @@
 package com.lts.job.task.tracker.processor;
 
+import com.lts.job.core.constant.Constants;
+import com.lts.job.core.protocol.command.CommandWrapper;
 import com.lts.job.core.protocol.command.JobAskRequest;
 import com.lts.job.core.protocol.command.JobAskResponse;
 import com.lts.job.core.remoting.RemotingClientDelegate;
@@ -16,8 +18,13 @@ import java.util.List;
  */
 public class JobAskProcessor extends AbstractProcessor {
 
+    private RunnerPool runnerPool;
+    private CommandWrapper commandWrapper;
+
     protected JobAskProcessor(RemotingClientDelegate remotingClient) {
         super(remotingClient);
+        this.runnerPool = remotingClient.getApplication().getAttribute(Constants.TASK_TRACKER_RUNNER_POOL);
+        this.commandWrapper = remotingClient.getApplication().getCommandWrapper();
     }
 
     @Override
@@ -27,9 +34,9 @@ public class JobAskProcessor extends AbstractProcessor {
 
         List<String> jobIds = requestBody.getJobIds();
 
-        List<String> notExistJobIds = RunnerPool.RunningJobManager.getNotExists(jobIds);
+        List<String> notExistJobIds = runnerPool.getRunningJobManager().getNotExists(jobIds);
 
-        JobAskResponse responseBody = new JobAskResponse();
+        JobAskResponse responseBody = commandWrapper.wrapper(new JobAskResponse());
         responseBody.setJobIds(notExistJobIds);
 
         return RemotingCommand.createResponseCommand(RemotingProtos.ResponseCode.SUCCESS.code(), "查询成功", responseBody);

+ 12 - 5
job-task-tracker/src/main/java/com/lts/job/task/tracker/processor/JobPushProcessor.java

@@ -1,14 +1,16 @@
 package com.lts.job.task.tracker.processor;
 
+import com.lts.job.core.constant.Constants;
 import com.lts.job.core.domain.Job;
 import com.lts.job.core.domain.JobResult;
 import com.lts.job.core.exception.JobTrackerNotFoundException;
 import com.lts.job.core.protocol.JobProtos;
+import com.lts.job.core.protocol.command.CommandWrapper;
 import com.lts.job.core.protocol.command.JobFinishedRequest;
 import com.lts.job.core.protocol.command.JobPushRequest;
 import com.lts.job.core.remoting.RemotingClientDelegate;
+import com.lts.job.core.support.Application;
 import com.lts.job.core.support.RetryScheduler;
-import com.lts.job.core.support.SingletonBeanContext;
 import com.lts.job.remoting.InvokeCallback;
 import com.lts.job.remoting.exception.RemotingCommandException;
 import com.lts.job.remoting.exception.RemotingCommandFieldCheckException;
@@ -36,11 +38,17 @@ public class JobPushProcessor extends AbstractProcessor {
 
     private RetryScheduler retryScheduler;
     private JobRunnerCallback jobRunnerCallback;
+    private Application application;
+    private RunnerPool runnerPool;
+    private CommandWrapper commandWrapper;
 
     protected JobPushProcessor(final RemotingClientDelegate remotingClient) {
         super(remotingClient);
 
-        retryScheduler = new RetryScheduler<JobResult>(3) {
+        this.application = remotingClient.getApplication();
+        this.commandWrapper = application.getCommandWrapper();
+        this.runnerPool = application.getAttribute(Constants.TASK_TRACKER_RUNNER_POOL);
+        retryScheduler = new RetryScheduler<JobResult>(application, 3) {
             @Override
             protected boolean isRemotingEnable() {
                 return remotingClient.isServerEnable();
@@ -67,7 +75,6 @@ public class JobPushProcessor extends AbstractProcessor {
         final Job job = requestBody.getJob();
 
         try {
-            RunnerPool runnerPool = SingletonBeanContext.getBean(RunnerPool.class);
             runnerPool.execute(job, jobRunnerCallback);
         } catch (NoAvailableJobRunnerException e) {
             // 任务推送失败
@@ -90,7 +97,7 @@ public class JobPushProcessor extends AbstractProcessor {
             jobResult.setJob(response.getJob());
             jobResult.setSuccess(response.isSuccess());
             jobResult.setMsg(response.getMsg());
-            JobFinishedRequest requestBody = new JobFinishedRequest();
+            JobFinishedRequest requestBody = commandWrapper.wrapper(new JobFinishedRequest());
             requestBody.addJobResult(jobResult);
             requestBody.setReceiveNewJob(response.isReceiveNewJob());     // 设置可以接受新任务
 
@@ -153,7 +160,7 @@ public class JobPushProcessor extends AbstractProcessor {
      */
     private boolean sendJobResults(List<JobResult> jobResults) {
         // 发送消息给 JobTracker
-        JobFinishedRequest requestBody = new JobFinishedRequest();
+        JobFinishedRequest requestBody = commandWrapper.wrapper(new JobFinishedRequest());
         requestBody.setJobResults(jobResults);
         requestBody.setReSend(true);
 

+ 1 - 1
job-task-tracker/src/main/java/com/lts/job/task/tracker/processor/RemotingDispatcher.java

@@ -19,7 +19,7 @@ import static com.lts.job.core.protocol.JobProtos.RequestCode.*;
  */
 public class RemotingDispatcher extends AbstractProcessor {
 
-    private static final Map<JobProtos.RequestCode, NettyRequestProcessor> processors = new HashMap<JobProtos.RequestCode, NettyRequestProcessor>();
+    private final Map<JobProtos.RequestCode, NettyRequestProcessor> processors = new HashMap<JobProtos.RequestCode, NettyRequestProcessor>();
 
     public RemotingDispatcher(RemotingClientDelegate remotingClient) {
         super(remotingClient);

+ 2 - 2
job-task-tracker/src/main/java/com/lts/job/task/tracker/runner/DefaultRunnerFactory.java

@@ -13,8 +13,8 @@ public class DefaultRunnerFactory implements RunnerFactory {
     private static final Logger LOGGER = LoggerFactory.getLogger(RunnerFactory.class);
     private Class clazz;
 
-    public DefaultRunnerFactory() {
-        clazz = Application.getAttribute(Constants.JOB_RUNNING_CLASS);
+    public DefaultRunnerFactory(Application application) {
+        clazz = application.getAttribute(Constants.JOB_RUNNING_CLASS);
     }
 
     public JobRunner newRunner() {

+ 6 - 6
job-task-tracker/src/main/java/com/lts/job/task/tracker/runner/JobRunnerDelegate.java

@@ -16,12 +16,12 @@ import java.io.StringWriter;
 public class JobRunnerDelegate implements Runnable {
 
     private static final Logger LOGGER = LoggerFactory.getLogger("JobRunner");
-    private RunnerFactory runnerFactory;
+    private RunnerPool runnerPool;
     private Job job;
     private RunnerCallback callback;
 
-    public JobRunnerDelegate(RunnerFactory runnerFactory, Job job, RunnerCallback callback) {
-        this.runnerFactory = runnerFactory;
+    public JobRunnerDelegate(RunnerPool runnerPool, Job job, RunnerCallback callback) {
+        this.runnerPool = runnerPool;
         this.job = job;
         this.callback = callback;
     }
@@ -31,12 +31,12 @@ public class JobRunnerDelegate implements Runnable {
 
         while (job != null) {
 
-            RunnerPool.RunningJobManager.in(job.getJobId());
+            runnerPool.getRunningJobManager().in(job.getJobId());
 
             Response response = new Response();
             response.setJob(job);
             try {
-                runnerFactory.newRunner().run(job);
+                runnerPool.getRunnerFactory().newRunner().run(job);
                 response.setSuccess(true);
                 LOGGER.info("执行任务成功 : {}", job);
             } catch (Throwable t) {
@@ -52,7 +52,7 @@ public class JobRunnerDelegate implements Runnable {
                     LOGGER.info("任务执行失败: {} {}", job, t.getMessage(), t);
                 }
             } finally {
-                RunnerPool.RunningJobManager.out(job.getJobId());
+                runnerPool.getRunningJobManager().out(job.getJobId());
             }
             job = callback.runComplete(response);
         }

+ 25 - 14
job-task-tracker/src/main/java/com/lts/job/task/tracker/runner/RunnerPool.java

@@ -19,17 +19,21 @@ import java.util.concurrent.*;
  */
 public class RunnerPool {
 
-    private static final Logger LOGGER = LoggerFactory.getLogger("RunnerPool");
+    private final Logger LOGGER = LoggerFactory.getLogger("RunnerPool");
 
     private ThreadPoolExecutor threadPoolExecutor = null;
     // 定时更新可用线程
     private ScheduledExecutorService REFRESH_EXECUTOR_SERVICE = null;
 
     private RunnerFactory runnerFactory;
+    private Application application;
+    private RunningJobManager runningJobManager;
 
-    public RunnerPool() {
+    public RunnerPool(final Application application) {
+        this.application = application;
+        this.runningJobManager = new RunningJobManager();
 
-        int maxSize = Application.Config.getWorkThreads();
+        int maxSize = application.getConfig().getWorkThreads();
         int minSize = 4 > maxSize ? maxSize : 4;
 
         threadPoolExecutor = new ThreadPoolExecutor(minSize, maxSize, 30, TimeUnit.SECONDS,
@@ -40,22 +44,22 @@ public class RunnerPool {
         REFRESH_EXECUTOR_SERVICE.scheduleWithFixedDelay(new Runnable() {
             @Override
             public void run() {
-                Application.setAttribute(Constants.KEY_AVAILABLE_THREADS, getAvailablePoolSize());
+                application.setAttribute(Constants.KEY_AVAILABLE_THREADS, getAvailablePoolSize());
             }
         }, 60, 30, TimeUnit.SECONDS);
 
-        runnerFactory = Application.getAttribute(Constants.RUNNER_FACTORY);
+        runnerFactory = application.getAttribute(Constants.RUNNER_FACTORY);
         if (runnerFactory == null) {
-            runnerFactory = new DefaultRunnerFactory();
+            runnerFactory = new DefaultRunnerFactory(application);
         }
     }
 
     public void execute(Job job, RunnerCallback callback) throws NoAvailableJobRunnerException {
         try {
             threadPoolExecutor.execute(
-                    new JobRunnerDelegate(runnerFactory, job, callback));
+                    new JobRunnerDelegate(this, job, callback));
             // 更新应用可用线程数
-            Application.setAttribute(Constants.KEY_AVAILABLE_THREADS, getAvailablePoolSize());
+            application.setAttribute(Constants.KEY_AVAILABLE_THREADS, getAvailablePoolSize());
             if (LOGGER.isDebugEnabled()) {
                 LOGGER.debug("receive job success ! " + job);
             }
@@ -83,22 +87,26 @@ public class RunnerPool {
         return threadPoolExecutor.getMaximumPoolSize();
     }
 
+    public RunnerFactory getRunnerFactory() {
+        return runnerFactory;
+    }
+
     /**
      * 用来管理正在执行的任务
      */
-    public static class RunningJobManager {
+    public class RunningJobManager {
 
-        private static final Set<String/*jobId*/> RUNNING_JOB_ID_SET = new ConcurrentHashSet<String>();
+        private final Set<String/*jobId*/> RUNNING_JOB_ID_SET = new ConcurrentHashSet<String>();
 
-        public static void in(String jobId) {
+        public void in(String jobId) {
             RUNNING_JOB_ID_SET.add(jobId);
         }
 
-        public static void out(String jobId) {
+        public void out(String jobId) {
             RUNNING_JOB_ID_SET.remove(jobId);
         }
 
-        public static boolean running(String jobId) {
+        public boolean running(String jobId) {
             return RUNNING_JOB_ID_SET.contains(jobId);
         }
 
@@ -108,7 +116,7 @@ public class RunnerPool {
          * @param jobIds
          * @return
          */
-        public static List<String> getNotExists(List<String> jobIds) {
+        public List<String> getNotExists(List<String> jobIds) {
             if (LOGGER.isDebugEnabled()) {
                 LOGGER.debug("running jobs :" + RUNNING_JOB_ID_SET);
                 LOGGER.debug("ask jobs:" + jobIds);
@@ -123,4 +131,7 @@ public class RunnerPool {
         }
     }
 
+    public RunningJobManager getRunningJobManager() {
+        return runningJobManager;
+    }
 }

+ 0 - 45
job-task-tracker/src/test/java/RunnerPoolTest.java

@@ -1,45 +0,0 @@
-import com.lts.job.core.domain.Job;
-import com.lts.job.task.tracker.domain.Response;
-import com.lts.job.task.tracker.expcetion.NoAvailableJobRunnerException;
-import com.lts.job.task.tracker.runner.RunnerCallback;
-import com.lts.job.task.tracker.runner.RunnerPool;
-import org.junit.Test;
-
-import java.io.IOException;
-
-/**
- * @author Robert HG (254963746@qq.com) on 8/16/14.
- */
-public class RunnerPoolTest {
-
-    @Test
-    public void test() {
-
-
-        for (int i = 0; i < 18; i++) {
-
-            try {
-                try {
-                    Thread.sleep(50L);
-                } catch (InterruptedException e) {
-                    e.printStackTrace();
-                }
-                RunnerPool runnerPool = new RunnerPool();
-                runnerPool.execute(new Job(), new RunnerCallback() {
-                    @Override
-                    public Job runComplete(Response response) {
-                        return null;
-                    }
-                });
-            } catch (NoAvailableJobRunnerException e) {
-                e.printStackTrace();
-            }
-        }
-
-        try {
-            System.in.read();
-        } catch (IOException e) {
-            e.printStackTrace();
-        }
-    }
-}

+ 1 - 1
job-tracker/pom.xml

@@ -5,7 +5,7 @@
     <parent>
         <artifactId>light-task-schedule</artifactId>
         <groupId>com.lts</groupId>
-        <version>1.2-SNAPSHOT</version>
+        <version>1.3-SNAPSHOT</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
 

+ 12 - 13
job-tracker/src/main/java/com/lts/job/tracker/JobTracker.java

@@ -2,45 +2,44 @@ package com.lts.job.tracker;
 
 import com.lts.job.core.cluster.*;
 import com.lts.job.core.constant.Constants;
-import com.lts.job.core.support.SingletonBeanContext;
 import com.lts.job.remoting.netty.NettyRequestProcessor;
 import com.lts.job.store.Config;
 import com.lts.job.store.mongo.DatastoreHolder;
+import com.lts.job.tracker.channel.ChannelManager;
 import com.lts.job.tracker.domain.JobTrackerNode;
 import com.lts.job.tracker.processor.RemotingDispatcher;
-import com.lts.job.tracker.support.DeadJobChecker;
+import com.lts.job.tracker.support.JobClientManager;
 import com.lts.job.tracker.support.JobNodeChangeListener;
 import com.lts.job.tracker.support.JobTrackerMasterChangeListener;
-import com.lts.job.tracker.support.RemotingServerManager;
+import com.lts.job.tracker.support.TaskTrackerManager;
 
 /**
  * @author Robert HG (254963746@qq.com) on 7/23/14.
  */
 public class JobTracker extends AbstractServerNode<JobTrackerNode> {
 
-    private DeadJobChecker deadJobChecker;
-
     public JobTracker() {
         config.setNodeGroup(Constants.DEFAULT_NODE_JOB_TRACKER_GROUP);
         config.setListenPort(Constants.JOB_TRACKER_DEFAULT_LISTEN_PORT);
-        addNodeChangeListener(new JobNodeChangeListener());
-        addMasterNodeChangeListener(new JobTrackerMasterChangeListener());
     }
 
     @Override
     protected void nodeStart() {
-        super.nodeStart();
-        // 改为这里初始化, 主要是为了 让 mongo延迟初始化,可以让设置的 storeConfig起作用
-        deadJobChecker = SingletonBeanContext.getBean(DeadJobChecker.class);
-        deadJobChecker.start();
 
-        RemotingServerManager.setRemotingServer(remotingServer);
+        ChannelManager channelManager = new ChannelManager();
+        application.setAttribute(Constants.CHANNEL_MANAGER, channelManager);
+        application.setAttribute(Constants.JOB_CLIENT_MANAGER, new JobClientManager(channelManager));
+        application.setAttribute(Constants.TASK_TRACKER_MANAGER, new TaskTrackerManager(channelManager));
+        addNodeChangeListener(new JobNodeChangeListener(application));
+        addMasterNodeChangeListener(new JobTrackerMasterChangeListener(application));
+
+        super.nodeStart();
+        application.setAttribute(Constants.REMOTING_SERVER, remotingServer);
     }
 
     @Override
     protected void nodeStop() {
         super.nodeStop();
-        deadJobChecker.stop();
     }
 
     @Override

+ 10 - 14
job-tracker/src/main/java/com/lts/job/tracker/channel/ChannelManager.java

@@ -18,16 +18,15 @@ import java.util.concurrent.TimeUnit;
  */
 public class ChannelManager {
 
-    private static final Logger LOGGER = LoggerFactory.getLogger(ChannelManager.class);
+    private final Logger LOGGER = LoggerFactory.getLogger(ChannelManager.class);
     // 客户端列表 (要保证同一个group的node要是无状态的)
-    private static final ConcurrentHashMap<String/*clientGroup*/, List<ChannelWrapper>> clientChannelMap = new ConcurrentHashMap<String, List<ChannelWrapper>>();
+    private final ConcurrentHashMap<String/*clientGroup*/, List<ChannelWrapper>> clientChannelMap = new ConcurrentHashMap<String, List<ChannelWrapper>>();
     // 任务节点列表
-    private static final ConcurrentHashMap<String/*taskTrackerGroup*/, List<ChannelWrapper>> taskTrackerChannelMap = new ConcurrentHashMap<String, List<ChannelWrapper>>();
+    private final ConcurrentHashMap<String/*taskTrackerGroup*/, List<ChannelWrapper>> taskTrackerChannelMap = new ConcurrentHashMap<String, List<ChannelWrapper>>();
     // 用来定时检查已经关闭的channel
-    private static final ScheduledExecutorService channelCheckExecutorService = Executors.newScheduledThreadPool(1);
-
-    static {
+    private final ScheduledExecutorService channelCheckExecutorService = Executors.newScheduledThreadPool(1);
 
+    public ChannelManager() {
         channelCheckExecutorService.scheduleWithFixedDelay(new Runnable() {
             @Override
             public void run() {
@@ -44,15 +43,12 @@ public class ChannelManager {
         }, 10, 30, TimeUnit.SECONDS);
     }
 
-    private ChannelManager() {
-    }
-
     /**
      * 检查 关闭的channel
      *
      * @param channelMap
      */
-    private static void checkCloseChannel(ConcurrentHashMap<String, List<ChannelWrapper>> channelMap) {
+    private void checkCloseChannel(ConcurrentHashMap<String, List<ChannelWrapper>> channelMap) {
         for (Map.Entry<String, List<ChannelWrapper>> entry : channelMap.entrySet()) {
             List<ChannelWrapper> channels = entry.getValue();
             List<ChannelWrapper> removeList = new ArrayList<ChannelWrapper>();
@@ -66,7 +62,7 @@ public class ChannelManager {
         }
     }
 
-    public static List<ChannelWrapper> getChannels(String nodeGroup, NodeType nodeType) {
+    public List<ChannelWrapper> getChannels(String nodeGroup, NodeType nodeType) {
         if (nodeType == NodeType.CLIENT) {
             return clientChannelMap.get(nodeGroup);
         } else if (nodeType == NodeType.TASK_TRACKER) {
@@ -82,7 +78,7 @@ public class ChannelManager {
      * @param identity
      * @return
      */
-    public static ChannelWrapper getChannel(String nodeGroup, NodeType nodeType, String identity) {
+    public ChannelWrapper getChannel(String nodeGroup, NodeType nodeType, String identity) {
         List<ChannelWrapper> channelWrappers = getChannels(nodeGroup, nodeType);
         if (channelWrappers != null && channelWrappers.size() != 0) {
             for (ChannelWrapper channelWrapper : channelWrappers) {
@@ -99,7 +95,7 @@ public class ChannelManager {
      *
      * @param channel
      */
-    public static void offerChannel(ChannelWrapper channel) {
+    public void offerChannel(ChannelWrapper channel) {
         String nodeGroup = channel.getNodeGroup();
         NodeType nodeType = channel.getNodeType();
         List<ChannelWrapper> channels = getChannels(nodeGroup, nodeType);
@@ -120,7 +116,7 @@ public class ChannelManager {
         }
     }
 
-    public static void removeChannel(ChannelWrapper channel) {
+    public void removeChannel(ChannelWrapper channel) {
         String nodeGroup = channel.getNodeGroup();
         NodeType nodeType = channel.getNodeType();
         List<ChannelWrapper> channels = getChannels(nodeGroup, nodeType);

+ 3 - 1
job-tracker/src/main/java/com/lts/job/tracker/processor/HeartBeatProcessor.java

@@ -23,11 +23,13 @@ public class HeartBeatProcessor extends AbstractProcessor {
     private final static Logger LOGGER = LoggerFactory.getLogger(HeartBeatProcessor.class);
 
     private final ExecutorService executor;
+    private JobController jobController;
 
     public HeartBeatProcessor(RemotingServerDelegate remotingServer) {
         super(remotingServer);
 
         executor = Executors.newCachedThreadPool();
+        jobController = new JobController(remotingServer.getApplication());
     }
 
     @Override
@@ -44,7 +46,7 @@ public class HeartBeatProcessor extends AbstractProcessor {
                 @Override
                 public void run() {
                     try {
-                        JobController.pushJob(remotingServer, ctx, request);
+                        jobController.pushJob(remotingServer, ctx, request);
                     } catch (Throwable t) {
                         LOGGER.error(t.getMessage(), t);
                     }

+ 6 - 3
job-tracker/src/main/java/com/lts/job/tracker/processor/JobFinishedProcessor.java

@@ -3,6 +3,7 @@ package com.lts.job.tracker.processor;
 import com.lts.job.core.domain.Job;
 import com.lts.job.core.domain.JobResult;
 import com.lts.job.core.domain.LogType;
+import com.lts.job.core.protocol.command.CommandWrapper;
 import com.lts.job.core.protocol.command.JobFinishedRequest;
 import com.lts.job.core.protocol.command.JobPushRequest;
 import com.lts.job.core.remoting.RemotingServerDelegate;
@@ -35,11 +36,13 @@ public class JobFinishedProcessor extends AbstractProcessor {
     private ClientNotifier clientNotifier;
     private JobFeedbackQueueMongoRepository jobFeedbackQueueMongoRepository;
     private static final Logger LOGGER = LoggerFactory.getLogger(JobFinishedProcessor.class.getSimpleName());
+    private CommandWrapper commandWrapper;
 
     public JobFinishedProcessor(RemotingServerDelegate remotingServer) {
         super(remotingServer);
         this.jobFeedbackQueueMongoRepository = SingletonBeanContext.getBean(JobFeedbackQueueMongoRepository.class);
-        this.clientNotifier = new ClientNotifier(remotingServer, new ClientNotifyHandler() {
+        this.commandWrapper = remotingServer.getApplication().getCommandWrapper();
+        this.clientNotifier = new ClientNotifier(remotingServer.getApplication(), new ClientNotifyHandler() {
             @Override
             public void handleSuccess(List<JobResult> jobResults) {
                 finishedJob(jobResults);
@@ -127,10 +130,10 @@ public class JobFinishedProcessor extends AbstractProcessor {
             // 查看有没有其他可以执行的任务
             JobPo jobPo = jobRepository.getJobPo(requestBody.getNodeGroup(), requestBody.getIdentity());
             if (jobPo != null) {
-                JobPushRequest jobPushRequest = new JobPushRequest();
+                JobPushRequest jobPushRequest = commandWrapper.wrapper(new JobPushRequest());
                 Job job = JobDomainConverter.convert(jobPo);
                 jobPushRequest.setJob(job);
-                if(LOGGER.isDebugEnabled()){
+                if (LOGGER.isDebugEnabled()) {
                     LOGGER.debug("发送任务{}给 {} {} ", job, requestBody.getNodeGroup(), requestBody.getIdentity());
                 }
                 // 返回 新的任务

+ 5 - 1
job-tracker/src/main/java/com/lts/job/tracker/processor/JobSubmitProcessor.java

@@ -2,6 +2,7 @@ package com.lts.job.tracker.processor;
 
 import com.lts.job.core.exception.JobReceiveException;
 import com.lts.job.core.protocol.JobProtos;
+import com.lts.job.core.protocol.command.CommandWrapper;
 import com.lts.job.core.protocol.command.JobSubmitRequest;
 import com.lts.job.core.protocol.command.JobSubmitResponse;
 import com.lts.job.core.remoting.RemotingServerDelegate;
@@ -20,8 +21,11 @@ public class JobSubmitProcessor extends AbstractProcessor {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(JobSubmitProcessor.class);
 
+    private CommandWrapper commandWrapper;
+
     public JobSubmitProcessor(RemotingServerDelegate remotingServer) {
         super(remotingServer);
+        this.commandWrapper = remotingServer.getApplication().getCommandWrapper();
     }
 
     @Override
@@ -29,7 +33,7 @@ public class JobSubmitProcessor extends AbstractProcessor {
 
         JobSubmitRequest jobSubmitRequest =  request.getBody();
 
-        JobSubmitResponse jobSubmitResponse = new JobSubmitResponse();
+        JobSubmitResponse jobSubmitResponse = commandWrapper.wrapper(new JobSubmitResponse());
         RemotingCommand response = null;
         try {
             JobReceiver.receive(jobSubmitRequest);

+ 8 - 4
job-tracker/src/main/java/com/lts/job/tracker/processor/RemotingDispatcher.java

@@ -1,6 +1,7 @@
 package com.lts.job.tracker.processor;
 
 import com.lts.job.core.cluster.NodeType;
+import com.lts.job.core.constant.Constants;
 import com.lts.job.core.protocol.command.AbstractCommandBody;
 import com.lts.job.core.remoting.RemotingServerDelegate;
 import com.lts.job.remoting.exception.RemotingCommandException;
@@ -24,11 +25,14 @@ import static com.lts.job.core.protocol.JobProtos.RequestCode.*;
  */
 public class RemotingDispatcher extends AbstractProcessor {
 
-    private static final Map<RequestCode, NettyRequestProcessor> processors = new HashMap<RequestCode, NettyRequestProcessor>();
+    private final Map<RequestCode, NettyRequestProcessor> processors = new HashMap<RequestCode, NettyRequestProcessor>();
+    private TaskTrackerManager taskTrackerManager;
+    private ChannelManager channelManager;
 
     public RemotingDispatcher(RemotingServerDelegate remotingServer) {
         super(remotingServer);
-
+        this.taskTrackerManager = remotingServer.getApplication().getAttribute(Constants.TASK_TRACKER_MANAGER);
+        this.channelManager = remotingServer.getApplication().getAttribute(Constants.CHANNEL_MANAGER);
         processors.put(SUBMIT_JOB, new JobSubmitProcessor(remotingServer));
         processors.put(HEART_BEAT, new HeartBeatProcessor(remotingServer));
         processors.put(JOB_FINISHED, new JobFinishedProcessor(remotingServer));
@@ -60,13 +64,13 @@ public class RemotingDispatcher extends AbstractProcessor {
         NodeType nodeType = NodeType.valueOf(commandBody.getNodeType());
 
         // 1. 将 channel 纳入管理中(不存在就加入)
-        ChannelManager.offerChannel(new ChannelWrapper(ctx.channel(), nodeType, nodeGroup, identity));
+        channelManager.offerChannel(new ChannelWrapper(ctx.channel(), nodeType, nodeGroup, identity));
 
         // 2. 更新 TaskTracker 节点信息(可用线程数)
         if (NodeType.TASK_TRACKER.equals(nodeType)) {
             // 更新 TaskTracker 节点信息(可用线程数)
             Integer availableThreads = commandBody.getAvailableThreads();
-            TaskTrackerManager.INSTANCE.updateTaskTrackerAvailableThreads(nodeGroup, identity, availableThreads, commandBody.getTimestamp());
+            taskTrackerManager.updateTaskTrackerAvailableThreads(nodeGroup, identity, availableThreads, commandBody.getTimestamp());
         }
     }
 

+ 17 - 7
job-tracker/src/main/java/com/lts/job/tracker/support/ClientNotifier.java

@@ -1,10 +1,13 @@
 package com.lts.job.tracker.support;
 
+import com.lts.job.core.constant.Constants;
 import com.lts.job.core.domain.JobResult;
 import com.lts.job.core.exception.RemotingSendException;
 import com.lts.job.core.protocol.JobProtos;
+import com.lts.job.core.protocol.command.CommandWrapper;
 import com.lts.job.core.protocol.command.JobFinishedRequest;
 import com.lts.job.core.remoting.RemotingServerDelegate;
+import com.lts.job.core.support.Application;
 import com.lts.job.remoting.InvokeCallback;
 import com.lts.job.remoting.exception.RemotingCommandFieldCheckException;
 import com.lts.job.remoting.netty.ResponseFuture;
@@ -22,14 +25,17 @@ import java.util.concurrent.CountDownLatch;
 public class ClientNotifier {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(ClientNotifier.class.getSimpleName());
-    private RemotingServerDelegate remotingServer;
     private ClientNotifyHandler clientNotifyHandler;
+    private JobClientManager jobClientManager;
+    private CommandWrapper commandWrapper;
+    private Application application;
 
-    public ClientNotifier(RemotingServerDelegate remotingServer, ClientNotifyHandler clientNotifyHandler) {
-        this.remotingServer = remotingServer;
+    public ClientNotifier(Application application, ClientNotifyHandler clientNotifyHandler) {
+        this.application = application;
         this.clientNotifyHandler = clientNotifyHandler;
+        this.jobClientManager = application.getAttribute(Constants.JOB_CLIENT_MANAGER);
+        this.commandWrapper = application.getCommandWrapper();
     }
-
     /**
      * 发送给客户端
      *
@@ -80,20 +86,20 @@ public class ClientNotifier {
      */
     private boolean send0(String nodeGroup, final List<JobResult> jobResults) {
         // 得到 可用的客户端节点
-        JobClientNode jobClientNode = JobClientManager.INSTANCE.getAvailableJobClient(nodeGroup);
+        JobClientNode jobClientNode = jobClientManager.getAvailableJobClient(nodeGroup);
 
         if (jobClientNode == null) {
             return false;
         }
 
-        JobFinishedRequest requestBody = new JobFinishedRequest();
+        JobFinishedRequest requestBody = commandWrapper.wrapper(new JobFinishedRequest());
         requestBody.setJobResults(jobResults);
         RemotingCommand commandRequest = RemotingCommand.createRequestCommand(JobProtos.RequestCode.JOB_FINISHED.code(), requestBody);
 
         final boolean[] result = new boolean[1];
         try {
             final CountDownLatch latch = new CountDownLatch(1);
-            remotingServer.invokeAsync(jobClientNode.getChannel().getChannel(), commandRequest, new InvokeCallback() {
+            getRemotingServer().invokeAsync(jobClientNode.getChannel().getChannel(), commandRequest, new InvokeCallback() {
                 @Override
                 public void operationComplete(ResponseFuture responseFuture) {
                     try {
@@ -124,4 +130,8 @@ public class ClientNotifier {
         return result[0];
     }
 
+    private RemotingServerDelegate getRemotingServer(){
+        return (RemotingServerDelegate)application.getAttribute(Constants.REMOTING_SERVER);
+    }
+
 }

+ 28 - 13
job-tracker/src/main/java/com/lts/job/tracker/support/DeadJobChecker.java

@@ -1,13 +1,15 @@
 package com.lts.job.tracker.support;
 
 import com.lts.job.core.cluster.Node;
-import com.lts.job.core.cluster.NodeManager;
 import com.lts.job.core.cluster.NodeType;
+import com.lts.job.core.constant.Constants;
 import com.lts.job.core.domain.LogType;
 import com.lts.job.core.protocol.JobProtos;
+import com.lts.job.core.protocol.command.CommandWrapper;
 import com.lts.job.core.protocol.command.JobAskRequest;
 import com.lts.job.core.protocol.command.JobAskResponse;
 import com.lts.job.core.remoting.RemotingServerDelegate;
+import com.lts.job.core.support.Application;
 import com.lts.job.core.support.SingletonBeanContext;
 import com.lts.job.core.repository.JobMongoRepository;
 import com.lts.job.core.repository.po.JobPo;
@@ -37,7 +39,7 @@ public class DeadJobChecker {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(DeadJobChecker.class);
 
-    private static JobMongoRepository jobRepository;
+    private JobMongoRepository jobRepository;
     // 5 分钟没有收到反馈信息 (并且该节点不存在了),表示这个任务已经死掉了
     private static final long MAX_DEAD_CHECK_TIME = 5 * 60 * 1000;
     // 5 分钟没有收到反馈信息 并且该节点存在, 那么主动去询问taskTracker 这个任务是否在执行, 如果没有,则表示这个任务已经死掉了
@@ -45,8 +47,14 @@ public class DeadJobChecker {
 
     private final ScheduledExecutorService FIXED_EXECUTOR_SERVICE = Executors.newScheduledThreadPool(1);
 
-    public DeadJobChecker() {
-        jobRepository = SingletonBeanContext.getBean(JobMongoRepository.class);
+    private Application application;
+    private ChannelManager channelManager;
+    private CommandWrapper commandWrapper;
+
+    public DeadJobChecker(Application application) {
+        this.application = application;
+        this.channelManager = application.getAttribute(Constants.CHANNEL_MANAGER);
+        this.commandWrapper = application.getCommandWrapper();
     }
 
     public void start() {
@@ -56,9 +64,9 @@ public class DeadJobChecker {
                 try {
                     // 查询出所有死掉的任务 (其实可以直接在数据库中fix的, 查询出来主要是为了日志打印)
                     // 一般来说这个是没有多大的,我就不分页去查询了
-                    List<JobPo> jobPos = jobRepository.getDeadJob(MAX_DEAD_CHECK_TIME);
+                    List<JobPo> jobPos = getJobRepository().getDeadJob(MAX_DEAD_CHECK_TIME);
                     if (jobPos != null && jobPos.size() > 0) {
-                        List<Node> nodes = NodeManager.getNodeList(NodeType.TASK_TRACKER);
+                        List<Node> nodes = application.getNodeManager().getNodeList(NodeType.TASK_TRACKER);
                         HashSet<String/*identity*/> identities = new HashSet<String>();
                         if (CollectionUtils.isNotEmpty(nodes)) {
                             for (Node node : nodes) {
@@ -85,12 +93,12 @@ public class DeadJobChecker {
                         }
 
                         if (CollectionUtils.isNotEmpty(timeoutMap)) {
-                            RemotingServerDelegate remotingServer = RemotingServerManager.getRemotingServer();
+                            RemotingServerDelegate remotingServer = application.getAttribute(Constants.REMOTING_SERVER);
                             for (Map.Entry<TaskTrackerNode, List<String>> entry : timeoutMap.entrySet()) {
                                 TaskTrackerNode taskTrackerNode = entry.getKey();
-                                ChannelWrapper channelWrapper = ChannelManager.getChannel(taskTrackerNode.getNodeGroup(), NodeType.TASK_TRACKER, taskTrackerNode.getIdentity());
+                                ChannelWrapper channelWrapper = channelManager.getChannel(taskTrackerNode.getNodeGroup(), NodeType.TASK_TRACKER, taskTrackerNode.getIdentity());
                                 if (channelWrapper != null && channelWrapper.getChannel() != null && channelWrapper.isOpen()) {
-                                    JobAskRequest requestBody = new JobAskRequest();
+                                    JobAskRequest requestBody = commandWrapper.wrapper(new JobAskRequest());
                                     requestBody.setJobIds(entry.getValue());
                                     RemotingCommand request = RemotingCommand.createRequestCommand(JobProtos.RequestCode.JOB_ASK.code(), requestBody);
                                     RemotingCommand response = remotingServer.invokeSync(channelWrapper.getChannel(), request);
@@ -123,9 +131,9 @@ public class DeadJobChecker {
      *
      * @param node
      */
-    public static void fixedDeadLock(Node node) {
+    public void fixedDeadLock(Node node) {
         try {
-            List<JobPo> jobPos = jobRepository.getJobByTaskTracker(node.getIdentity());
+            List<JobPo> jobPos = getJobRepository().getJobByTaskTracker(node.getIdentity());
             if (CollectionUtils.isNotEmpty(jobPos)) {
                 for (JobPo jobPo : jobPos) {
                     fixedDeadJob(jobPo);
@@ -136,12 +144,19 @@ public class DeadJobChecker {
         }
     }
 
-    private static void fixedDeadJob(JobPo jobPo) {
-        jobRepository.setJobRunnable(jobPo);
+    private void fixedDeadJob(JobPo jobPo) {
+        getJobRepository().setJobRunnable(jobPo);
         JobLogger.log(jobPo, LogType.FIXED_DEAD);
         LOGGER.info("修复死掉的任务成功! {}", jobPo);
     }
 
+    private JobMongoRepository getJobRepository() {
+        if (jobRepository == null) {
+            jobRepository = SingletonBeanContext.getBean(JobMongoRepository.class);
+        }
+        return jobRepository;
+    }
+
     public void stop() {
         FIXED_EXECUTOR_SERVICE.shutdown();
     }

+ 16 - 6
job-tracker/src/main/java/com/lts/job/tracker/support/FeedbackJobSendChecker.java

@@ -1,8 +1,11 @@
 package com.lts.job.tracker.support;
 
+import com.lts.job.core.constant.Constants;
 import com.lts.job.core.domain.JobResult;
+import com.lts.job.core.remoting.RemotingServerDelegate;
 import com.lts.job.core.repository.JobFeedbackQueueMongoRepository;
 import com.lts.job.core.repository.po.JobFeedbackQueuePo;
+import com.lts.job.core.support.Application;
 import com.lts.job.core.support.SingletonBeanContext;
 import com.lts.job.core.util.CollectionUtils;
 import org.slf4j.Logger;
@@ -28,6 +31,7 @@ public class FeedbackJobSendChecker {
     private volatile boolean start = false;
     private ClientNotifier clientNotifier;
 
+
     /**
      * 是否已经启动
      *
@@ -37,13 +41,12 @@ public class FeedbackJobSendChecker {
         return start;
     }
 
-    public FeedbackJobSendChecker() {
-        jobFeedbackQueueMongoRepository = SingletonBeanContext.getBean(JobFeedbackQueueMongoRepository.class);
-        clientNotifier = new ClientNotifier(RemotingServerManager.getRemotingServer(), new ClientNotifyHandler() {
+    public FeedbackJobSendChecker(Application application) {
+        clientNotifier = new ClientNotifier(application, new ClientNotifyHandler() {
             @Override
             public void handleSuccess(List<JobResult> jobResults) {
                 for (JobResult jobResult : jobResults) {
-                    jobFeedbackQueueMongoRepository.delJobFeedback(((JobFeedbackQueuePo) jobResult).getId());
+                    getJobFeedbackQueueMongoRepository().delJobFeedback(((JobFeedbackQueuePo) jobResult).getId());
                 }
             }
 
@@ -79,12 +82,19 @@ public class FeedbackJobSendChecker {
         }
     }
 
+    private JobFeedbackQueueMongoRepository getJobFeedbackQueueMongoRepository(){
+        if(jobFeedbackQueueMongoRepository == null){
+            jobFeedbackQueueMongoRepository = SingletonBeanContext.getBean(JobFeedbackQueueMongoRepository.class);
+        }
+        return jobFeedbackQueueMongoRepository;
+    }
+
     private class Runner implements Runnable {
         @Override
         public void run() {
             try {
 
-                long count = jobFeedbackQueueMongoRepository.count();
+                long count = getJobFeedbackQueueMongoRepository().count();
                 if (count == 0) {
                     return;
                 }
@@ -94,7 +104,7 @@ public class FeedbackJobSendChecker {
                 int offset = 0;
                 int limit = 5;
                 do {
-                    jobFeedbackQueuePos = jobFeedbackQueueMongoRepository.get(offset, limit);
+                    jobFeedbackQueuePos = getJobFeedbackQueueMongoRepository().get(offset, limit);
                     if (CollectionUtils.isEmpty(jobFeedbackQueuePos)) {
                         return;
                     }

+ 8 - 6
job-tracker/src/main/java/com/lts/job/tracker/support/JobClientManager.java

@@ -2,6 +2,8 @@ package com.lts.job.tracker.support;
 
 import com.lts.job.core.cluster.Node;
 import com.lts.job.core.cluster.NodeType;
+import com.lts.job.core.constant.Constants;
+import com.lts.job.core.support.Application;
 import com.lts.job.core.util.ConcurrentHashSet;
 import com.lts.job.tracker.channel.ChannelManager;
 import com.lts.job.tracker.channel.ChannelWrapper;
@@ -22,11 +24,11 @@ public class JobClientManager {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(JobClientManager.class);
 
-    public static final JobClientManager INSTANCE = new JobClientManager();
+    private final ConcurrentHashMap<String/*nodeGroup*/, ConcurrentHashSet<JobClientNode>> NODE_MAP = new ConcurrentHashMap<String, ConcurrentHashSet<JobClientNode>>();
 
-    private static final ConcurrentHashMap<String/*nodeGroup*/, ConcurrentHashSet<JobClientNode>> NODE_MAP = new ConcurrentHashMap<String, ConcurrentHashSet<JobClientNode>>();
-
-    private JobClientManager() {
+    private ChannelManager channelManager;
+    public JobClientManager(ChannelManager channelManager) {
+        this.channelManager = channelManager;
     }
 
     /**
@@ -36,7 +38,7 @@ public class JobClientManager {
      */
     public void addNode(Node node) {
         //  channel 可能为 null
-        ChannelWrapper channel = ChannelManager.getChannel(node.getGroup(), node.getNodeType(), node.getIdentity());
+        ChannelWrapper channel = channelManager.getChannel(node.getGroup(), node.getNodeType(), node.getIdentity());
         ConcurrentHashSet<JobClientNode> jobClientNodes = NODE_MAP.get(node.getGroup());
 
         synchronized (NODE_MAP) {
@@ -94,7 +96,7 @@ public class JobClientManager {
             jobClientNode = list.get(index);
             // 如果 channel 已经关闭, 更新channel, 如果没有channel, 略过
             if (jobClientNode != null && (jobClientNode.getChannel() == null || jobClientNode.getChannel().isClosed())) {
-                ChannelWrapper channel = ChannelManager.getChannel(jobClientNode.getNodeGroup(), NodeType.CLIENT, jobClientNode.getIdentity());
+                ChannelWrapper channel = channelManager.getChannel(jobClientNode.getNodeGroup(), NodeType.CLIENT, jobClientNode.getIdentity());
                 if (channel != null) {
                     // 更新channel
                     jobClientNode.setChannel(channel);

+ 19 - 12
job-tracker/src/main/java/com/lts/job/tracker/support/JobController.java

@@ -1,11 +1,14 @@
 package com.lts.job.tracker.support;
 
+import com.lts.job.core.constant.Constants;
 import com.lts.job.core.domain.Job;
 import com.lts.job.core.exception.RemotingSendException;
 import com.lts.job.core.protocol.JobProtos;
 import com.lts.job.core.protocol.command.AbstractCommandBody;
+import com.lts.job.core.protocol.command.CommandWrapper;
 import com.lts.job.core.protocol.command.JobPushRequest;
 import com.lts.job.core.remoting.RemotingServerDelegate;
+import com.lts.job.core.support.Application;
 import com.lts.job.core.support.JobDomainConverter;
 import com.lts.job.core.support.SingletonBeanContext;
 import com.lts.job.remoting.exception.RemotingCommandFieldCheckException;
@@ -25,13 +28,18 @@ import java.util.HashSet;
  */
 public class JobController {
 
-    private static final Logger LOGGER = LoggerFactory.getLogger(JobController.class);
-    private static JobMongoRepository jobRepository;
+    private final Logger LOGGER = LoggerFactory.getLogger(JobController.class);
+    private JobMongoRepository jobRepository;
+    private TaskTrackerManager taskTrackerManager;
+    private CommandWrapper commandWrapper;
 
-    static {
-        jobRepository = SingletonBeanContext.getBean(JobMongoRepository.class);
+    public JobController(Application application) {
+        this.jobRepository = SingletonBeanContext.getBean(JobMongoRepository.class);
+        this.taskTrackerManager = application.getAttribute(Constants.TASK_TRACKER_MANAGER);
+        this.commandWrapper = application.getCommandWrapper();
     }
 
+
     /**
      * 对 TaskTracker的每次请求进行处理
      * 分发任务等
@@ -40,7 +48,7 @@ public class JobController {
      * @param ctx
      * @param request
      */
-    public static void pushJob(RemotingServerDelegate remotingServer, ChannelHandlerContext ctx, RemotingCommand request) {
+    public void pushJob(RemotingServerDelegate remotingServer, ChannelHandlerContext ctx, RemotingCommand request) {
 
         AbstractCommandBody requestBody = request.getBody();
         String nodeGroup = requestBody.getNodeGroup();
@@ -50,7 +58,7 @@ public class JobController {
 
         while (true) {
 
-            TaskTrackerNode taskTrackerNode = TaskTrackerManager.INSTANCE.getIdleTaskTrackerNode(nodeGroup, failedNodes);
+            TaskTrackerNode taskTrackerNode = taskTrackerManager.getIdleTaskTrackerNode(nodeGroup, failedNodes);
 
             if (taskTrackerNode != null) {
                 // 推送任务
@@ -70,11 +78,11 @@ public class JobController {
     }
 
     // 没有任务可执行
-    private static final int NO_JOB = 1;
+    private final int NO_JOB = 1;
     // 推送成功
-    private static final int PUSH_SUCCESS = 2;
+    private final int PUSH_SUCCESS = 2;
     // 推送失败
-    private static final int PUSH_FAILED = 3;
+    private final int PUSH_FAILED = 3;
 
 
     /**
@@ -84,7 +92,7 @@ public class JobController {
      * @param taskTrackerNode
      * @return
      */
-    private static int pushJob(RemotingServerDelegate remotingServer, TaskTrackerNode taskTrackerNode) {
+    private int pushJob(RemotingServerDelegate remotingServer, TaskTrackerNode taskTrackerNode) {
 
         String nodeGroup = taskTrackerNode.getNodeGroup();
         String identity = taskTrackerNode.getIdentity();
@@ -96,7 +104,7 @@ public class JobController {
             return NO_JOB;
         }
 
-        JobPushRequest body = new JobPushRequest();
+        JobPushRequest body = commandWrapper.wrapper(new JobPushRequest());
         Job job = JobDomainConverter.convert(jobPo);
         body.setJob(job);
         RemotingCommand commandRequest = RemotingCommand.createRequestCommand(JobProtos.RequestCode.PUSH_JOB.code(), body);
@@ -108,7 +116,6 @@ public class JobController {
             RemotingCommand commandResponse = remotingServer.invokeSync(taskTrackerNode.getChannel().getChannel(), commandRequest);
 
             if (commandResponse.getCode() == JobProtos.ResponseCode.JOB_PUSH_SUCCESS.code()) {
-//                    JobLogger.log(job, LogType.PUSH);
                 pushSuccess = true;
             }
 

+ 21 - 8
job-tracker/src/main/java/com/lts/job/tracker/support/JobNodeChangeListener.java

@@ -2,32 +2,45 @@ package com.lts.job.tracker.support;
 
 import com.lts.job.core.cluster.Node;
 import com.lts.job.core.cluster.NodeType;
+import com.lts.job.core.constant.Constants;
 import com.lts.job.core.listener.NodeChangeListener;
+import com.lts.job.core.support.Application;
 
 import java.util.List;
 
 /**
  * @author Robert HG (254963746@qq.com) on 8/17/14.
- * 节点变化监听器
+ *         节点变化监听器
  */
 public class JobNodeChangeListener implements NodeChangeListener {
 
+    private Application application;
+
+    public JobNodeChangeListener(Application application) {
+        this.application = application;
+        this.jobClientManager = application.getAttribute(Constants.JOB_CLIENT_MANAGER);
+        this.taskTrackerManager = application.getAttribute(Constants.TASK_TRACKER_MANAGER);
+    }
+
+    private TaskTrackerManager taskTrackerManager;
+    private JobClientManager jobClientManager;
+
     @Override
     public void addNode(Node node) {
         if (node.getNodeType().equals(NodeType.TASK_TRACKER)) {
-            TaskTrackerManager.INSTANCE.addNode(node);
-        }else if(node.getNodeType().equals(NodeType.CLIENT)){
-            JobClientManager.INSTANCE.addNode(node);
+            taskTrackerManager.addNode(node);
+        } else if (node.getNodeType().equals(NodeType.CLIENT)) {
+            jobClientManager.addNode(node);
         }
     }
 
     @Override
     public void removeNode(Node node) {
         if (node.getNodeType().equals(NodeType.TASK_TRACKER)) {
-            TaskTrackerManager.INSTANCE.removeNode(node);
-            DeadJobChecker.fixedDeadLock(node);
-        }else if(node.getNodeType().equals(NodeType.CLIENT)){
-            JobClientManager.INSTANCE.removeNode(node);
+            taskTrackerManager.removeNode(node);
+            ((DeadJobChecker) application.getAttribute(Constants.DEAD_JOB_CHECKER)).fixedDeadLock(node);
+        } else if (node.getNodeType().equals(NodeType.CLIENT)) {
+            jobClientManager.removeNode(node);
         }
     }
 

+ 16 - 6
job-tracker/src/main/java/com/lts/job/tracker/support/JobTrackerMasterChangeListener.java

@@ -1,31 +1,41 @@
 package com.lts.job.tracker.support;
 
 import com.lts.job.core.cluster.Node;
+import com.lts.job.core.constant.Constants;
 import com.lts.job.core.listener.MasterNodeChangeListener;
 import com.lts.job.core.support.Application;
-import com.lts.job.core.support.SingletonBeanContext;
 
 /**
  * @author Robert HG (254963746@qq.com) on 8/24/14.
- * JobTracker master 节点变化之后
+ *         JobTracker master 节点变化之后
  */
 public class JobTrackerMasterChangeListener implements MasterNodeChangeListener {
 
+    private Application application;
+    private DeadJobChecker deadJobChecker;
+    private FeedbackJobSendChecker feedbackJobSendChecker;
+
+    public JobTrackerMasterChangeListener(Application application) {
+        this.application = application;
+        this.deadJobChecker = new DeadJobChecker(application);
+        this.application.setAttribute(Constants.DEAD_JOB_CHECKER, deadJobChecker);
+        this.feedbackJobSendChecker = new FeedbackJobSendChecker(application);
+    }
+
     @Override
     public void change(Node master, boolean isMaster) {
 
-        FeedbackJobSendChecker feedbackJobSendChecker = SingletonBeanContext.getBean(FeedbackJobSendChecker.class);
-
-        if (Application.Config.getIdentity().equals(master.getIdentity())) {
+        if (application.getConfig().getIdentity().equals(master.getIdentity())) {
             // 如果 master 节点是自己
             // 2. 启动通知客户端失败检查重发的定时器
             feedbackJobSendChecker.start();
-
+            deadJobChecker.start();
         } else {
             // 如果 master 节点不是自己
 
             // 2. 关闭通知客户端失败检查重发的定时器
             feedbackJobSendChecker.stop();
+            deadJobChecker.stop();
         }
     }
 }

+ 0 - 19
job-tracker/src/main/java/com/lts/job/tracker/support/RemotingServerManager.java

@@ -1,19 +0,0 @@
-package com.lts.job.tracker.support;
-
-import com.lts.job.core.remoting.RemotingServerDelegate;
-
-/**
- * @author Robert HG (254963746@qq.com) on 8/21/14.
- */
-public class RemotingServerManager {
-
-    private static RemotingServerDelegate remotingServer;
-
-    public static RemotingServerDelegate getRemotingServer() {
-        return RemotingServerManager.remotingServer;
-    }
-
-    public static void setRemotingServer(RemotingServerDelegate remotingServer) {
-        RemotingServerManager.remotingServer = remotingServer;
-    }
-}

+ 10 - 7
job-tracker/src/main/java/com/lts/job/tracker/support/TaskTrackerManager.java

@@ -3,6 +3,8 @@ package com.lts.job.tracker.support;
 
 import com.lts.job.core.cluster.Node;
 import com.lts.job.core.cluster.NodeType;
+import com.lts.job.core.constant.Constants;
+import com.lts.job.core.support.Application;
 import com.lts.job.core.util.ConcurrentHashSet;
 import com.lts.job.tracker.channel.ChannelManager;
 import com.lts.job.tracker.channel.ChannelWrapper;
@@ -15,16 +17,17 @@ import java.util.concurrent.ConcurrentHashMap;
 
 /**
  * @author Robert HG (254963746@qq.com) on 8/16/14.
- * Task Tracker 管理器 (对 TaskTracker 节点的记录 和 可用线程的记录)
+ *         Task Tracker 管理器 (对 TaskTracker 节点的记录 和 可用线程的记录)
  */
 public class TaskTrackerManager {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(TaskTrackerManager.class);
     // 单例
-    public static final TaskTrackerManager INSTANCE = new TaskTrackerManager();
-    private static final ConcurrentHashMap<String/*nodeGroup*/, ConcurrentHashSet<TaskTrackerNode>> NODE_MAP = new ConcurrentHashMap<String, ConcurrentHashSet<TaskTrackerNode>>();
+    private final ConcurrentHashMap<String/*nodeGroup*/, ConcurrentHashSet<TaskTrackerNode>> NODE_MAP = new ConcurrentHashMap<String, ConcurrentHashSet<TaskTrackerNode>>();
+    private ChannelManager channelManager;
 
-    private TaskTrackerManager() {
+    public TaskTrackerManager(ChannelManager channelManager) {
+        this.channelManager = channelManager;
     }
 
     /**
@@ -34,7 +37,7 @@ public class TaskTrackerManager {
      */
     public void addNode(Node node) {
         //  channel 可能为 null
-        ChannelWrapper channel = ChannelManager.getChannel(node.getGroup(), node.getNodeType(), node.getIdentity());
+        ChannelWrapper channel = channelManager.getChannel(node.getGroup(), node.getNodeType(), node.getIdentity());
         ConcurrentHashSet<TaskTrackerNode> taskTrackerNodes = NODE_MAP.get(node.getGroup());
 
         synchronized (NODE_MAP) {
@@ -81,7 +84,7 @@ public class TaskTrackerManager {
                 if (trackerNode.getIdentity().equals(identity) && trackerNode.getTimestamp() <= timestamp) {
                     trackerNode.setAvailableThread(availableThreads);
                     trackerNode.setTimestamp(timestamp);
-                    if(LOGGER.isDebugEnabled()){
+                    if (LOGGER.isDebugEnabled()) {
                         LOGGER.debug("更新节点线程数: {}", trackerNode);
                     }
                 }
@@ -114,7 +117,7 @@ public class TaskTrackerManager {
 
             if (taskTrackerNode.getChannel() == null || taskTrackerNode.getChannel().isClosed()) {
                 // 如果 channel 已经关闭, 更新channel, 如果没有channel, 略过
-                ChannelWrapper channel = ChannelManager.getChannel(taskTrackerNode.getNodeGroup(), NodeType.TASK_TRACKER, taskTrackerNode.getIdentity());
+                ChannelWrapper channel = channelManager.getChannel(taskTrackerNode.getNodeGroup(), NodeType.TASK_TRACKER, taskTrackerNode.getIdentity());
                 if (channel != null) {
                     // 更新channel
                     taskTrackerNode.setChannel(channel);

+ 0 - 168
job-tracker/src/test/java/RemotingTest.java

@@ -1,168 +0,0 @@
-import com.alibaba.fastjson.JSON;
-import com.alibaba.fastjson.JSONObject;
-import com.lts.job.core.cluster.NodeType;
-import com.lts.job.core.protocol.JobProtos;
-import com.lts.job.core.protocol.command.JobSubmitRequest;
-import com.lts.job.core.remoting.RemotingServerDelegate;
-import com.lts.job.remoting.RemotingClient;
-import com.lts.job.remoting.RemotingServer;
-import com.lts.job.remoting.exception.RemotingCommandException;
-import com.lts.job.remoting.exception.RemotingConnectException;
-import com.lts.job.remoting.exception.RemotingSendRequestException;
-import com.lts.job.remoting.exception.RemotingTimeoutException;
-import com.lts.job.remoting.netty.*;
-import com.lts.job.remoting.protocol.RemotingCommand;
-import com.lts.job.remoting.protocol.RemotingProtos;
-import com.lts.job.remoting.protocol.RemotingSerializable;
-import com.lts.job.tracker.processor.RemotingDispatcher;
-import io.netty.channel.ChannelHandlerContext;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.Executors;
-
-/**
- * @author Robert HG (254963746@qq.com) on 7/23/14.
- */
-public class RemotingTest {
-
-    private static final Logger logger = LoggerFactory.getLogger(RemotingTest.class);
-
-    @Test
-    public void test_processor() throws IOException, InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
-
-        NettyServerConfig serverConfig = new NettyServerConfig();
-        RemotingServer remotingServer = new NettyRemotingServer(serverConfig);
-
-        try {
-            remotingServer.start();
-        } catch (InterruptedException e) {
-            e.printStackTrace();
-        }
-
-        remotingServer.registerDefaultProcessor(new RemotingDispatcher(new RemotingServerDelegate(remotingServer)),
-                Executors.newCachedThreadPool());
-
-        System.in.read();
-    }
-
-    @Test
-    public void test_processor2() throws IOException {
-
-        NettyClientConfig clientConfig = new NettyClientConfig();
-        RemotingClient client = new NettyRemotingClient(clientConfig);
-        client.start();
-
-        client.registerProcessor(RemotingProtos.ResponseCode.SUCCESS.code(), new NettyRequestProcessor() {
-            @Override
-            public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
-                System.out.println("client receive server request = " + request);
-                request.setRemark("client remark");
-                return request;
-            }
-        }, Executors.newCachedThreadPool());
-
-
-        JobSubmitRequest header = new JobSubmitRequest();
-        header.setNodeType(NodeType.CLIENT.name());
-        header.setNodeGroup("CLIENT_GROUP");
-        header.putExtParam("测试", "好的");
-
-        RemotingCommand request = RemotingCommand.createRequestCommand(JobProtos.RequestCode.HEART_BEAT.code(), header);
-        RemotingCommand response = null;
-        try {
-            response = client.invokeSync("127.0.0.1:8888", request, 1000 * 3);
-        } catch (InterruptedException e) {
-            logger.error(e.getMessage(), e);
-        } catch (RemotingConnectException e) {
-            logger.error(e.getMessage(), e);
-        } catch (RemotingSendRequestException e) {
-            logger.error(e.getMessage(), e);
-        } catch (RemotingTimeoutException e) {
-            e.printStackTrace();
-        }
-        System.out.println("invoke result = " + response);
-        try {
-            response = client.invokeSync("127.0.0.1:8888", request, 1000 * 3);
-        } catch (InterruptedException e) {
-            e.printStackTrace();
-        } catch (RemotingConnectException e) {
-            e.printStackTrace();
-        } catch (RemotingSendRequestException e) {
-            e.printStackTrace();
-        } catch (RemotingTimeoutException e) {
-            e.printStackTrace();
-        }
-
-
-        System.out.println("invoke result = " + response);
-
-        try {
-            Thread.sleep(5000);
-        } catch (InterruptedException e) {
-            e.printStackTrace();
-        }
-
-        try {
-            response = client.invokeSync("127.0.0.1:8888", request, 1000 * 3);
-        } catch (InterruptedException e) {
-            e.printStackTrace();
-        } catch (RemotingConnectException e) {
-            e.printStackTrace();
-        } catch (RemotingSendRequestException e) {
-            e.printStackTrace();
-        } catch (RemotingTimeoutException e) {
-            e.printStackTrace();
-        }
-
-
-        System.out.println("invoke result = " + response);
-
-
-        System.in.read();
-    }
-
-
-    @Test
-    public void testJSON() {
-
-        Map<String, Object> map2 = new HashMap<String, Object>();
-        map2.put("111", "333");
-
-        byte[] json = RemotingSerializable.encode(map2);
-
-        System.out.println(new String(json));
-
-        Map<String, Object> map = RemotingSerializable.decode(json, Map.class);
-
-        System.out.println(map);
-
-
-        JobSubmitRequest header = new JobSubmitRequest();
-        header.setNodeGroup("CLIENT_GROUP");
-        header.setNodeType(NodeType.CLIENT.name());
-        header.putExtParam("测试", "好的");
-
-        RemotingCommand request = RemotingCommand.createRequestCommand(JobProtos.RequestCode.HEART_BEAT.code(), header);
-
-        byte[] bytes = RemotingSerializable.encode(request);
-
-        System.out.println(JSONObject.toJSONString(request, false));
-
-        System.out.println(new String(bytes));
-    }
-
-    @Test
-    public void testJSON2() {
-
-        Boolean json = false;
-        Double d = 1d;
-        System.out.println(JSON.toJSONString(d, false));
-    }
-
-
-}

+ 2 - 1
pom.xml

@@ -7,7 +7,7 @@
     <groupId>com.lts</groupId>
     <artifactId>light-task-schedule</artifactId>
     <packaging>pom</packaging>
-    <version>1.2-SNAPSHOT</version>
+    <version>1.3-SNAPSHOT</version>
     <modules>
         <module>job-core</module>
         <module>job-tracker</module>
@@ -15,6 +15,7 @@
         <module>job-client</module>
         <module>job-example</module>
         <module>job-extensions</module>
+        <!--<module>job-admin</module>-->
     </modules>
 
     <dependencies>