Browse Source

Merge pull request #146 from qq254963746/develop

Develop
醉枫 9 years ago
parent
commit
7357f62173

+ 2 - 1
lts-admin/src/main/webapp/WEB-INF/views/templates/jobLogger.vm

@@ -163,7 +163,8 @@
             FINISHED: '完成任务',
             FIXED_DEAD: '修复死任务',
             BIZ: '业务日志',
-            RESEND: '重新反馈任务'
+            RESEND: '重新反馈任务',
+            DEL: '删除'
         };
 
         LTS.colFormatter.logTypeLabel = function (v) {

+ 8 - 5
lts-core/src/main/java/com/lts/core/protocol/JobProtos.java

@@ -2,7 +2,7 @@ package com.lts.core.protocol;
 
 /**
  * @author Robert HG (254963746@qq.com) on 7/23/14.
- * 用于定义通信协议中的一些code
+ *         用于定义通信协议中的一些code
  */
 public class JobProtos {
 
@@ -26,7 +26,8 @@ public class JobProtos {
         JOB_PULL(16),
         // TaskTracker的业务日志
         BIZ_LOG_SEND(17),
-        ;
+        // 取消(删除)任务
+        CANCEL_JOB(18),;
 
         private int code;
 
@@ -74,9 +75,11 @@ public class JobProtos {
         // 任务推送
         JOB_PULL_SUCCESS(21),
         // 业务日志发送成功
-        BIZ_LOG_SEND_SUCCESS(22)
-
-        ;
+        BIZ_LOG_SEND_SUCCESS(22),
+        // 任务删除成功
+        JOB_CANCEL_SUCCESS(23),
+        // 任务删除失败
+        JOB_CANCEL_FAILED(24);
 
 
         private int code;

+ 28 - 0
lts-core/src/main/java/com/lts/core/protocol/command/JobCancelRequest.java

@@ -0,0 +1,28 @@
+package com.lts.core.protocol.command;
+
+/**
+ * 取消(删除)任务
+ * @author Robert HG (254963746@qq.com) on 11/7/15.
+ */
+public class JobCancelRequest extends AbstractRemotingCommandBody{
+
+    private String taskId;
+
+    private String taskTrackerNodeGroup;
+
+    public String getTaskId() {
+        return taskId;
+    }
+
+    public void setTaskId(String taskId) {
+        this.taskId = taskId;
+    }
+
+    public String getTaskTrackerNodeGroup() {
+        return taskTrackerNodeGroup;
+    }
+
+    public void setTaskTrackerNodeGroup(String taskTrackerNodeGroup) {
+        this.taskTrackerNodeGroup = taskTrackerNodeGroup;
+    }
+}

+ 20 - 0
lts-example/src/main/java/com/lts/example/api/JobClientTest.java

@@ -24,6 +24,7 @@ public class JobClientTest extends BaseJobClientTest {
 //        submitWidthReplaceOnExist();
         console();
 //        testProtector();
+//        cancelJob();
     }
 
     public static void submitWidthReplaceOnExist() throws IOException {
@@ -49,6 +50,25 @@ public class JobClientTest extends BaseJobClientTest {
         System.out.println(response);
     }
 
+    public static void cancelJob(){
+        JobClient jobClient = new RetryJobClient();
+        jobClient.setNodeGroup("test_jobClient");
+        jobClient.setClusterName("test_cluster");
+        jobClient.setRegistryAddress("zookeeper://127.0.0.1:2181");
+//         jobClient.setRegistryAddress("redis://127.0.0.1:6379");
+        // 任务重试保存地址,默认用户目录下
+        // jobClient.setDataPath(Constants.USER_HOME);
+        // 任务完成反馈接口
+        jobClient.setJobFinishedHandler(new JobFinishedHandlerImpl());
+        // master 节点变化监听器,当有集群中只需要一个节点执行某个事情的时候,可以监听这个事件
+        jobClient.addMasterChangeListener(new MasterChangeListenerImpl());
+        // 可选址  leveldb(默认), rocksdb, berkeleydb
+        // taskTracker.addConfig("job.fail.store", "leveldb");
+        jobClient.start();
+
+        jobClient.cancelJob("t_4", "test_trade_TaskTracker");
+    }
+
 
     public static void console() throws IOException {
         // 推荐使用RetryJobClient

+ 51 - 4
lts-jobclient/src/main/java/com/lts/jobclient/JobClient.java

@@ -2,6 +2,7 @@ package com.lts.jobclient;
 
 import com.lts.core.Application;
 import com.lts.core.cluster.AbstractClientNode;
+import com.lts.core.commons.utils.Assert;
 import com.lts.core.commons.utils.BatchUtils;
 import com.lts.core.commons.utils.CollectionUtils;
 import com.lts.core.commons.utils.CommonUtils;
@@ -13,6 +14,7 @@ import com.lts.core.logger.Logger;
 import com.lts.core.logger.LoggerFactory;
 import com.lts.core.protocol.JobProtos;
 import com.lts.core.protocol.command.CommandBodyWrapper;
+import com.lts.core.protocol.command.JobCancelRequest;
 import com.lts.core.protocol.command.JobSubmitRequest;
 import com.lts.core.protocol.command.JobSubmitResponse;
 import com.lts.core.support.LoggerName;
@@ -49,8 +51,6 @@ public class JobClient<T extends JobClientNode, App extends Application> extends
     // 过载保护的提交者
     private JobSubmitProtector protector;
 
-    private JobFinishedHandler jobFinishedHandler;
-
     @Override
     protected void beforeStart() {
         application.setRemotingClient(remotingClient);
@@ -84,6 +84,47 @@ public class JobClient<T extends JobClientNode, App extends Application> extends
         });
     }
 
+    /**
+     * 取消任务
+     */
+    public Response cancelJob(String taskId, String taskTrackerNodeGroup) {
+
+        final Response response = new Response();
+
+        Assert.hasText(taskId, "taskId can not be empty");
+        Assert.hasText(taskTrackerNodeGroup, "taskTrackerNodeGroup can not be empty");
+
+        JobCancelRequest request = CommandBodyWrapper.wrapper(application, new JobCancelRequest());
+        request.setTaskId(taskId);
+        request.setTaskTrackerNodeGroup(taskTrackerNodeGroup);
+
+        RemotingCommand requestCommand = RemotingCommand.createRequestCommand(
+                JobProtos.RequestCode.CANCEL_JOB.code(), request);
+
+        try {
+            RemotingCommand remotingResponse = remotingClient.invokeSync(requestCommand);
+
+            if (JobProtos.ResponseCode.JOB_CANCEL_SUCCESS.code() == remotingResponse.getCode()) {
+                LOGGER.info("Cancel job success taskId={}, taskTrackerNodeGroup={} ", taskId, taskTrackerNodeGroup);
+                response.setSuccess(true);
+                return response;
+            }
+
+            response.setSuccess(false);
+            response.setCode(JobProtos.ResponseCode.valueOf(remotingResponse.getCode()).name());
+            response.setMsg(remotingResponse.getRemark());
+            LOGGER.warn("Cancel job failed: taskId={}, taskTrackerNodeGroup={}, msg={}", taskId,
+                    taskTrackerNodeGroup, remotingResponse.getRemark());
+            return response;
+
+        } catch (JobTrackerNotFoundException e) {
+            response.setSuccess(false);
+            response.setCode(ResponseCode.JOB_TRACKER_NOT_FOUND);
+            response.setMsg("Can not found JobTracker node!");
+            return response;
+        }
+    }
+
     private void checkFields(List<Job> jobs) {
         // 参数验证
         if (CollectionUtils.isEmpty(jobs)) {
@@ -152,6 +193,9 @@ public class JobClient<T extends JobClientNode, App extends Application> extends
         return response;
     }
 
+    /**
+     * 异步提交任务
+     */
     private void asyncSubmit(RemotingCommand requestCommand, final SubmitCallback submitCallback)
             throws JobTrackerNotFoundException {
         final CountDownLatch latch = new CountDownLatch(1);
@@ -172,6 +216,9 @@ public class JobClient<T extends JobClientNode, App extends Application> extends
         }
     }
 
+    /**
+     * 同步提交任务
+     */
     private void syncSubmit(RemotingCommand requestCommand, final SubmitCallback submitCallback)
             throws JobTrackerNotFoundException {
         submitCallback.call(remotingClient.invokeSync(requestCommand));
@@ -200,14 +247,14 @@ public class JobClient<T extends JobClientNode, App extends Application> extends
 
     @Override
     protected RemotingProcessor getDefaultProcessor() {
-        return new RemotingDispatcher(jobFinishedHandler);
+        return new RemotingDispatcher(application);
     }
 
     /**
      * 设置任务完成接收器
      */
     public void setJobFinishedHandler(JobFinishedHandler jobFinishedHandler) {
-        this.jobFinishedHandler = jobFinishedHandler;
+        application.setJobFinishedHandler(jobFinishedHandler);
     }
 
     enum SubmitType {

+ 11 - 0
lts-jobclient/src/main/java/com/lts/jobclient/domain/JobClientApplication.java

@@ -2,6 +2,7 @@ package com.lts.jobclient.domain;
 
 import com.lts.core.Application;
 import com.lts.core.remoting.RemotingClientDelegate;
+import com.lts.jobclient.support.JobFinishedHandler;
 
 /**
  * @author Robert HG (254963746@qq.com) on 3/30/15.
@@ -10,6 +11,16 @@ public class JobClientApplication extends Application{
 
     private RemotingClientDelegate remotingClient;
 
+    private JobFinishedHandler jobFinishedHandler;
+
+    public JobFinishedHandler getJobFinishedHandler() {
+        return jobFinishedHandler;
+    }
+
+    public void setJobFinishedHandler(JobFinishedHandler jobFinishedHandler) {
+        this.jobFinishedHandler = jobFinishedHandler;
+    }
+
     public RemotingClientDelegate getRemotingClient() {
         return remotingClient;
     }

+ 0 - 10
lts-jobclient/src/main/java/com/lts/jobclient/processor/AbstractProcessor.java

@@ -1,10 +0,0 @@
-package com.lts.jobclient.processor;
-
-import com.lts.remoting.RemotingProcessor;
-
-/**
- * @author Robert HG (254963746@qq.com) on 8/16/14.
- */
-public abstract class AbstractProcessor implements RemotingProcessor {
-
-}

+ 10 - 8
lts-jobclient/src/main/java/com/lts/jobclient/processor/JobFinishedProcessor.java

@@ -4,22 +4,23 @@ import com.lts.core.logger.Logger;
 import com.lts.core.logger.LoggerFactory;
 import com.lts.core.protocol.JobProtos;
 import com.lts.core.protocol.command.JobFinishedRequest;
-import com.lts.jobclient.support.JobFinishedHandler;
+import com.lts.jobclient.domain.JobClientApplication;
 import com.lts.remoting.Channel;
+import com.lts.remoting.RemotingProcessor;
 import com.lts.remoting.exception.RemotingCommandException;
 import com.lts.remoting.protocol.RemotingCommand;
 
 /**
  * @author Robert HG (254963746@qq.com) on 8/18/14.
  */
-public class JobFinishedProcessor extends AbstractProcessor {
+public class JobFinishedProcessor implements RemotingProcessor {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(JobFinishedProcessor.class);
 
-    private JobFinishedHandler jobFinishedHandler;
+    private JobClientApplication application;
 
-    public JobFinishedProcessor(JobFinishedHandler jobFinishedHandler) {
-        this.jobFinishedHandler = jobFinishedHandler;
+    public JobFinishedProcessor(JobClientApplication application) {
+        this.application = application;
     }
 
     @Override
@@ -28,13 +29,14 @@ public class JobFinishedProcessor extends AbstractProcessor {
 
         JobFinishedRequest requestBody = request.getBody();
         try {
-            if (jobFinishedHandler != null) {
-                jobFinishedHandler.handle(requestBody.getJobResults());
+            if (application.getJobFinishedHandler() != null) {
+                application.getJobFinishedHandler().handle(requestBody.getJobResults());
             }
         } catch (Exception t) {
             LOGGER.error(t.getMessage(), t);
         }
 
-        return RemotingCommand.createResponseCommand(JobProtos.ResponseCode.JOB_NOTIFY_SUCCESS.code(), "received successful");
+        return RemotingCommand.createResponseCommand(JobProtos.ResponseCode.JOB_NOTIFY_SUCCESS.code(),
+                "received successful");
     }
 }

+ 6 - 6
lts-jobclient/src/main/java/com/lts/jobclient/processor/RemotingDispatcher.java

@@ -1,10 +1,10 @@
 package com.lts.jobclient.processor;
 
 import com.lts.core.protocol.JobProtos;
-import com.lts.jobclient.support.JobFinishedHandler;
+import com.lts.jobclient.domain.JobClientApplication;
 import com.lts.remoting.Channel;
-import com.lts.remoting.exception.RemotingCommandException;
 import com.lts.remoting.RemotingProcessor;
+import com.lts.remoting.exception.RemotingCommandException;
 import com.lts.remoting.protocol.RemotingCommand;
 import com.lts.remoting.protocol.RemotingProtos;
 
@@ -16,14 +16,14 @@ import static com.lts.core.protocol.JobProtos.RequestCode.valueOf;
 
 /**
  * @author Robert HG (254963746@qq.com) on 7/25/14.
- * 客户端默认通信处理器
+ *         客户端默认通信处理器
  */
-public class RemotingDispatcher extends AbstractProcessor {
+public class RemotingDispatcher implements RemotingProcessor {
 
     private final Map<JobProtos.RequestCode, RemotingProcessor> processors = new HashMap<JobProtos.RequestCode, RemotingProcessor>();
 
-    public RemotingDispatcher(JobFinishedHandler jobFinishedHandler) {
-        processors.put(JOB_FINISHED, new JobFinishedProcessor(jobFinishedHandler));
+    public RemotingDispatcher(JobClientApplication application) {
+        processors.put(JOB_FINISHED, new JobFinishedProcessor(application));
     }
 
     @Override

+ 62 - 0
lts-jobtracker/src/main/java/com/lts/jobtracker/processor/JobCancelProcessor.java

@@ -0,0 +1,62 @@
+package com.lts.jobtracker.processor;
+
+import com.lts.biz.logger.domain.JobLogPo;
+import com.lts.biz.logger.domain.LogType;
+import com.lts.core.constant.Level;
+import com.lts.core.logger.Logger;
+import com.lts.core.logger.LoggerFactory;
+import com.lts.core.protocol.JobProtos;
+import com.lts.core.protocol.command.JobCancelRequest;
+import com.lts.core.support.SystemClock;
+import com.lts.jobtracker.domain.JobTrackerApplication;
+import com.lts.jobtracker.support.JobDomainConverter;
+import com.lts.queue.domain.JobPo;
+import com.lts.remoting.Channel;
+import com.lts.remoting.exception.RemotingCommandException;
+import com.lts.remoting.protocol.RemotingCommand;
+
+/**
+ * @author Robert HG (254963746@qq.com) on 11/7/15.
+ */
+public class JobCancelProcessor extends AbstractRemotingProcessor {
+
+    private final Logger LOGGER = LoggerFactory.getLogger(JobCancelProcessor.class);
+
+    public JobCancelProcessor(JobTrackerApplication application) {
+        super(application);
+    }
+
+    @Override
+    public RemotingCommand processRequest(Channel channel, RemotingCommand request) throws RemotingCommandException {
+
+        JobCancelRequest jobCancelRequest = request.getBody();
+
+        String taskId = jobCancelRequest.getTaskId();
+        String taskTrackerNodeGroup = jobCancelRequest.getTaskTrackerNodeGroup();
+        JobPo job = application.getCronJobQueue().getJob(taskTrackerNodeGroup, taskId);
+        if (job == null) {
+            job = application.getExecutableJobQueue().getJob(taskTrackerNodeGroup, taskId);
+        }
+
+        if (job != null) {
+            application.getExecutableJobQueue().remove(job.getTaskTrackerNodeGroup(), job.getJobId());
+            if (job.isSchedule()) {
+                application.getCronJobQueue().remove(job.getJobId());
+            }
+            // 记录日志
+            JobLogPo jobLogPo = JobDomainConverter.convertJobLog(job);
+            jobLogPo.setSuccess(true);
+            jobLogPo.setLogType(LogType.DEL);
+            jobLogPo.setLogTime(SystemClock.now());
+            jobLogPo.setLevel(Level.INFO);
+            application.getJobLogger().log(jobLogPo);
+
+            LOGGER.info("Cancel Job success , jobId={}, taskId={}, taskTrackerNodeGroup={}", job.getJobId(), taskId, taskTrackerNodeGroup);
+            return RemotingCommand.createResponseCommand(JobProtos
+                    .ResponseCode.JOB_CANCEL_SUCCESS.code());
+        }
+
+        return RemotingCommand.createResponseCommand(JobProtos
+                .ResponseCode.JOB_CANCEL_FAILED.code(), "Job maybe running");
+    }
+}

+ 1 - 1
lts-jobtracker/src/main/java/com/lts/jobtracker/processor/JobSubmitProcessor.java

@@ -37,7 +37,7 @@ public class JobSubmitProcessor extends AbstractRemotingProcessor {
                     JobProtos.ResponseCode.JOB_RECEIVE_SUCCESS.code(), "job submit success!", jobSubmitResponse);
 
         } catch (JobReceiveException e) {
-            LOGGER.error("receive job failed , jobs = " + jobSubmitRequest.getJobs(), e);
+            LOGGER.error("Receive job failed , jobs = " + jobSubmitRequest.getJobs(), e);
             jobSubmitResponse.setSuccess(false);
             jobSubmitResponse.setMsg(e.getMessage());
             jobSubmitResponse.setFailedJobs(e.getJobs());

+ 1 - 0
lts-jobtracker/src/main/java/com/lts/jobtracker/processor/RemotingDispatcher.java

@@ -30,6 +30,7 @@ public class RemotingDispatcher extends AbstractRemotingProcessor {
         processors.put(RequestCode.JOB_FINISHED, new JobFinishedProcessor(application));
         processors.put(RequestCode.JOB_PULL, new JobPullProcessor(application));
         processors.put(RequestCode.BIZ_LOG_SEND, new JobBizLogProcessor(application));
+        processors.put(RequestCode.CANCEL_JOB, new JobCancelProcessor(application));
     }
 
     @Override

+ 2 - 1
lts-logger/lts-logger-api/src/main/java/com/lts/biz/logger/domain/LogType.java

@@ -10,6 +10,7 @@ public enum LogType {
     FINISHED,        // 任务执行完成
     RESEND,          // TaskTracker 重新发送的任务执行结果
     FIXED_DEAD,       // 修复死掉的任务
-    BIZ             // 业务日志
+    BIZ,             // 业务日志
+    DEL             // 删除
     ;
 }