Преглед изворни кода

1.7.0-SNAPSHOT && 增加手动触发功能

hugui пре 8 година
родитељ
комит
a072a27404

+ 1 - 5
README.md

@@ -17,11 +17,7 @@ oschina地址:
 
 这两个地址都会同步更新。感兴趣,请加QQ群:109500214 一起探讨、完善。越多人支持,就越有动力去更新,喜欢记得右上角star哈。
 
-##1.6.9(master)变更主要点
-####1. 取任务优先级,按任务设置的priority优先,再按triggerTime优先, 数据库的索引也会变
-####2. lts_executable_job_queue_ (前缀)表名改为 lts_wjq_ (前缀), 缩短表名
-####3. lts_feedback_job_queue_ (前缀)表名改为 lts_fjq_ (前缀), 缩短表名
-
+##1.7.0-SNAPSHOT(master)变更主要点
 
 ## 框架概况
 LTS 有主要有以下四种节点:

+ 1 - 1
build.cmd

@@ -4,7 +4,7 @@ start mvn clean install -DskipTests
 echo "LTS: mvn clean install -DskipTests"
 echo "LTS: After sub window finished, close it , and press any key to continue" & pause>nul
 
-set VERSION=1.6.9
+set VERSION=1.7.0-SNAPSHOT
 set BASE_HOME=%~dp0%
 set DIST_BIN_DIR=lts-%VERSION%-bin
 

+ 1 - 1
build.sh

@@ -1,6 +1,6 @@
 #!/usr/bin/env bash
 
-VERSION="1.6.9"
+VERSION="1.7.0-SNAPSHOT"
 
 LTS_BIN="${BASH_SOURCE-$0}"
 LTS_BIN="$(dirname "${LTS_BIN}")"

+ 1 - 1
lts-admin/pom.xml

@@ -5,7 +5,7 @@
     <parent>
         <artifactId>lts-parent</artifactId>
         <groupId>com.github.ltsopensource</groupId>
-        <version>1.6.9</version>
+        <version>1.7.0-SNAPSHOT</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
     <packaging>war</packaging>

+ 35 - 0
lts-admin/src/main/java/com/github/ltsopensource/admin/web/api/JobQueueApi.java

@@ -79,6 +79,41 @@ public class JobQueueApi extends AbstractMVC {
         return rsp;
     }
 
+    @RequestMapping("/job-queue/executing-job-trigger")
+    public RestfulResponse triggerJobManually(JobQueueReq request) {
+
+        try {
+            Assert.hasLength(request.getJobId(), "jobId不能为空!");
+            Assert.hasLength(request.getTaskTrackerNodeGroup(), "taskTrackerNodeGroup不能为空!");
+        } catch (IllegalArgumentException e) {
+            return Builder.build(false, e.getMessage());
+        }
+
+        HttpCmd httpCmd = new DefaultHttpCmd();
+        httpCmd.setCommand(HttpCmdNames.HTTP_CMD_TRIGGER_JOB_MANUALLY);
+        httpCmd.addParam("jobId", request.getJobId());
+        httpCmd.addParam("nodeGroup", request.getTaskTrackerNodeGroup());
+
+        List<Node> jobTrackerNodeList = appContext.getNodeMemCacheAccess().getNodeByNodeType(NodeType.JOB_TRACKER);
+        if (CollectionUtils.isEmpty(jobTrackerNodeList)) {
+            return Builder.build(false, I18nManager.getMessage("job.tracker.not.found"));
+        }
+
+        HttpCmdResponse response = null;
+        for (Node node : jobTrackerNodeList) {
+            httpCmd.setNodeIdentity(node.getIdentity());
+            response = HttpCmdClient.doGet(node.getIp(), node.getHttpCmdPort(), httpCmd);
+            if (response.isSuccess()) {
+                return Builder.build(true);
+            }
+        }
+        if (response != null) {
+            return Builder.build(false, response.getMsg());
+        } else {
+            return Builder.build(false, "TriggerFailed failed");
+        }
+    }
+
     @RequestMapping("/job-queue/executing-job-get")
     public RestfulResponse executingJobGet(JobQueueReq request) {
         PaginationRsp<JobPo> paginationRsp = appContext.getExecutingJobQueue().pageSelect(request);

+ 25 - 2
lts-admin/src/main/webapp/WEB-INF/views/templates/executableJobQueue.vm

@@ -102,7 +102,8 @@
                         class="sr-only">关闭</span></button>
                 <h2 class="modal-title">编辑任务信息</h2>
                 <h5 style="color:indianred;" id="cronEditNotice">提示:该任务为Cron任务,修改只对当前这次执行有效,如需永久生效,请到《Cron任务》菜单中修改</h5>
-                <h5 style="color:indianred;" id="repeatEditNotice">提示:该任务为Repeat任务,修改只对当前这次执行有效,如需永久生效,请到《Repeat任务》菜单中修改</h5>
+                <h5 style="color:indianred;" id="repeatEditNotice">
+                    提示:该任务为Repeat任务,修改只对当前这次执行有效,如需永久生效,请到《Repeat任务》菜单中修改</h5>
             </div>
             <div class="modal-body">
                 <div class="row">
@@ -252,11 +253,33 @@
 
         LTS.colFormatter.optFormat = function (v, row) {
             var logUrl = "job-logger.htm?realTaskId=" + row['realTaskId'] + "&taskTrackerNodeGroup=" + row['taskTrackerNodeGroup'];
-            return '<a target="_blank" href="' + logUrl + '"><span class="label label-info"><i class="fa fa-file-code-o"></i> 日志</span></a>&nbsp;' +
+            return '<a class="job-trigger-btn" jobId="' + row['jobId'] + '" taskTrackerNodeGroup="' + row['taskTrackerNodeGroup'] + '" href="javascript:;"><span class="label label-info"><i class="fa fa-file-code-o"></i> 触发</span></a>&nbsp;' +
+                    '<a target="_blank" href="' + logUrl + '"><span class="label label-info"><i class="fa fa-file-code-o"></i> 日志</span></a>&nbsp;' +
                     '<a href="javascript:;" class="job-edit-btn"><span class="label label-success"><i class="fa fa-edit"></i> 编辑</span><span class="hidden lts-data">' + JSON.stringify(row) + '</span></a>&nbsp;' +
                     '<a href="javascript:;" class="job-del-btn" jobId="' + row['jobId'] + '" cronExpression="' + row['cronExpression'] + '" taskTrackerNodeGroup="' + row['taskTrackerNodeGroup'] + '"><span class="label label-primary" style="background-color: #DD6B55;"><i class="fa fa-trash-o"></i> 删除</span></a>';
         }
 
+        $(document).on("click", ".job-trigger-btn", function () {
+            var that = $(this);
+            var jobId = that.attr("jobId");
+            var taskTrackerNodeGroup = that.attr("taskTrackerNodeGroup");
+
+            var params = {jobId: jobId, taskTrackerNodeGroup: taskTrackerNodeGroup};
+            $.ajax({
+                url: 'api/job-queue/executing-job-trigger',
+                type: 'POST',
+                dataType: 'json',
+                data: params,
+                success: function (json) {
+                    if (json && json.success) {
+                        swal("触发成功!", "恭喜你", "success");
+                    } else {
+                        json ? swal(json['msg']) : {};
+                    }
+                }
+            });
+        });
+
         $(document).on("click", ".job-del-btn", function () {
             var that = $(this);
 

+ 1 - 1
lts-core/pom.xml

@@ -5,7 +5,7 @@
     <parent>
         <artifactId>lts-parent</artifactId>
         <groupId>com.github.ltsopensource</groupId>
-        <version>1.6.9</version>
+        <version>1.7.0-SNAPSHOT</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
     <packaging>jar</packaging>

+ 2 - 0
lts-core/src/main/java/com/github/ltsopensource/core/cmd/HttpCmdNames.java

@@ -9,6 +9,8 @@ public interface HttpCmdNames {
 
     String HTTP_CMD_ADD_JOB = "job_add_cmd";
 
+    String HTTP_CMD_TRIGGER_JOB_MANUALLY = "trigger_job_manually_cmd";
+
     String HTTP_CMD_ADD_M_DATA = "monitor_data_add_cmd";
 
     String HTTP_CMD_STATUS_CHECK = "status_check_cmd";

+ 2 - 0
lts-core/src/main/java/com/github/ltsopensource/core/constant/Constants.java

@@ -69,6 +69,8 @@ public interface Constants {
 
     String IS_RETRY_JOB = "__LTS_Is_Retry_Job";
 
+    String OLD_PRIORITY = "__LTS_Tmp_Old_Priority";
+
     // 执行的序号
     String EXE_SEQ_ID = "__LTS_Seq_Id";
 }

+ 1 - 1
lts-core/src/main/java/com/github/ltsopensource/core/support/Version.java

@@ -20,7 +20,7 @@ public final class Version {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(Version.class);
 
-    private static final String VERSION = getVersion(Version.class, "1.6.9");
+    private static final String VERSION = getVersion(Version.class, "1.7.0-SNAPSHOT");
 
     static {
         // 检查是否存在重复的jar包

+ 47 - 9
lts-core/src/main/java/com/github/ltsopensource/queue/AbstractPreLoader.java

@@ -5,6 +5,7 @@ import com.github.ltsopensource.core.commons.concurrent.ConcurrentHashSet;
 import com.github.ltsopensource.core.commons.utils.Callable;
 import com.github.ltsopensource.core.commons.utils.CollectionUtils;
 import com.github.ltsopensource.core.commons.utils.StringUtils;
+import com.github.ltsopensource.core.constant.Constants;
 import com.github.ltsopensource.core.constant.ExtConfig;
 import com.github.ltsopensource.core.factory.NamedThreadFactory;
 import com.github.ltsopensource.core.support.NodeShutdownHook;
@@ -116,6 +117,25 @@ public abstract class AbstractPreLoader implements PreLoader {
         LOAD_SIGNAL.add(FORCE_PREFIX + taskTrackerNodeGroup);
     }
 
+    @Override
+    public void loadOne2First(String taskTrackerNodeGroup, String jobId) {
+        JobPo jobPo = getJob(taskTrackerNodeGroup, jobId);
+        if (jobPo == null) {
+            return;
+        }
+        JobPriorityBlockingQueue queue = getQueue(taskTrackerNodeGroup);
+        jobPo.setInternalExtParam(Constants.OLD_PRIORITY, String.valueOf(jobPo.getPriority()));
+
+        jobPo.setPriority(Integer.MIN_VALUE);
+
+        if (!queue.offer(jobPo)) {
+            queue.poll();
+            queue.offer(jobPo);
+        }
+    }
+
+    protected abstract JobPo getJob(String taskTrackerNodeGroup, String jobId);
+
     /**
      * 锁定任务
      */
@@ -131,14 +151,8 @@ public abstract class AbstractPreLoader implements PreLoader {
     protected abstract List<JobPo> load(String loadTaskTrackerNodeGroup, int loadSize);
 
     private JobPo get(String taskTrackerNodeGroup) {
-        JobPriorityBlockingQueue queue = JOB_MAP.get(taskTrackerNodeGroup);
-        if (queue == null) {
-            queue = new JobPriorityBlockingQueue(loadSize);
-            JobPriorityBlockingQueue oldQueue = JOB_MAP.putIfAbsent(taskTrackerNodeGroup, queue);
-            if (oldQueue != null) {
-                queue = oldQueue;
-            }
-        }
+
+        JobPriorityBlockingQueue queue = getQueue(taskTrackerNodeGroup);
 
         if (queue.size() / loadSize < factor) {
             // 触发加载的请求
@@ -146,7 +160,31 @@ public abstract class AbstractPreLoader implements PreLoader {
                 LOAD_SIGNAL.add(taskTrackerNodeGroup);
             }
         }
-        return queue.poll();
+        JobPo jobPo = queue.poll();
+        if (jobPo != null && jobPo.getPriority() == Integer.MIN_VALUE) {
+            if (CollectionUtils.isNotEmpty(jobPo.getInternalExtParams())) {
+                if (jobPo.getInternalExtParams().containsKey(Constants.OLD_PRIORITY)) {
+                    try {
+                        int priority = Integer.parseInt(jobPo.getInternalExtParam(Constants.OLD_PRIORITY));
+                        jobPo.getInternalExtParams().remove(Constants.OLD_PRIORITY);
+                        jobPo.setPriority(priority);
+                    } catch (NumberFormatException ignored) {
+                    }
+                }
+            }
+        }
+        return jobPo;
     }
 
+    private JobPriorityBlockingQueue getQueue(String taskTrackerNodeGroup) {
+        JobPriorityBlockingQueue queue = JOB_MAP.get(taskTrackerNodeGroup);
+        if (queue == null) {
+            queue = new JobPriorityBlockingQueue(loadSize);
+            JobPriorityBlockingQueue oldQueue = JOB_MAP.putIfAbsent(taskTrackerNodeGroup, queue);
+            if (oldQueue != null) {
+                queue = oldQueue;
+            }
+        }
+        return queue;
+    }
 }

+ 5 - 0
lts-core/src/main/java/com/github/ltsopensource/queue/PreLoader.java

@@ -13,4 +13,9 @@ public interface PreLoader {
      * 如果taskTrackerNodeGroup为空,那么load所有的
      */
     public void load(String taskTrackerNodeGroup);
+
+    /**
+     * 加载某个任务并放置第一个
+     */
+    public void loadOne2First(String taskTrackerNodeGroup, String jobId);
 }

+ 9 - 0
lts-core/src/main/java/com/github/ltsopensource/queue/mongo/MongoPreLoader.java

@@ -27,6 +27,15 @@ public class MongoPreLoader extends AbstractPreLoader {
                 (AdvancedDatastore) DataStoreProvider.getDataStore(appContext.getConfig()));
     }
 
+    @Override
+    protected JobPo getJob(String taskTrackerNodeGroup, String jobId) {
+        String tableName = JobQueueUtils.getExecutableQueueName(taskTrackerNodeGroup);
+        Query<JobPo> query = template.createQuery(tableName, JobPo.class);
+        query.field("jobId").equal(jobId).
+                field("taskTrackerNodeGroup").equal(taskTrackerNodeGroup);
+        return query.get();
+    }
+
     protected boolean lockJob(String taskTrackerNodeGroup, String jobId, String taskTrackerIdentity, Long triggerTime, Long gmtModified) {
         UpdateOperations<JobPo> operations =
                 template.createUpdateOperations(JobPo.class)

+ 12 - 0
lts-core/src/main/java/com/github/ltsopensource/queue/mysql/MysqlPreLoader.java

@@ -29,6 +29,18 @@ public class MysqlPreLoader extends AbstractPreLoader {
         this.sqlTemplate = SqlTemplateFactory.create(appContext.getConfig());
     }
 
+    @Override
+    protected JobPo getJob(String taskTrackerNodeGroup, String jobId) {
+        return new SelectSql(sqlTemplate)
+                .select()
+                .all()
+                .from()
+                .table(getTableName(taskTrackerNodeGroup))
+                .where("job_id = ?", jobId)
+                .and("task_tracker_node_group = ?", taskTrackerNodeGroup)
+                .single(RshHolder.JOB_PO_RSH);
+    }
+
     @Override
     protected boolean lockJob(String taskTrackerNodeGroup, String jobId,
                               String taskTrackerIdentity,

+ 1 - 1
lts-jobclient/pom.xml

@@ -5,7 +5,7 @@
     <parent>
         <artifactId>lts-parent</artifactId>
         <groupId>com.github.ltsopensource</groupId>
-        <version>1.6.9</version>
+        <version>1.7.0-SNAPSHOT</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
 

+ 1 - 1
lts-jobtracker/pom.xml

@@ -5,7 +5,7 @@
     <parent>
         <artifactId>lts-parent</artifactId>
         <groupId>com.github.ltsopensource</groupId>
-        <version>1.6.9</version>
+        <version>1.7.0-SNAPSHOT</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
 

+ 4 - 2
lts-jobtracker/src/main/java/com/github/ltsopensource/jobtracker/JobTracker.java

@@ -6,6 +6,7 @@ import com.github.ltsopensource.core.spi.ServiceLoader;
 import com.github.ltsopensource.jobtracker.channel.ChannelManager;
 import com.github.ltsopensource.jobtracker.cmd.AddJobHttpCmd;
 import com.github.ltsopensource.jobtracker.cmd.LoadJobHttpCmd;
+import com.github.ltsopensource.jobtracker.cmd.TriggerJobManuallyHttpCmd;
 import com.github.ltsopensource.jobtracker.domain.JobTrackerAppContext;
 import com.github.ltsopensource.jobtracker.domain.JobTrackerNode;
 import com.github.ltsopensource.jobtracker.monitor.JobTrackerMStatReporter;
@@ -71,9 +72,10 @@ public class JobTracker extends AbstractServerNode<JobTrackerNode, JobTrackerApp
 
         appContext.getHttpCmdServer().registerCommands(
                 new LoadJobHttpCmd(appContext),     // 手动加载任务
-                new AddJobHttpCmd(appContext));     // 添加任务
+                new AddJobHttpCmd(appContext),
+                new TriggerJobManuallyHttpCmd(appContext));     // 添加任务
 
-        if(appContext.getOldDataHandler() == null){
+        if (appContext.getOldDataHandler() == null) {
             appContext.setOldDataHandler(new OldDataDeletePolicy());
         }
     }

+ 56 - 0
lts-jobtracker/src/main/java/com/github/ltsopensource/jobtracker/cmd/TriggerJobManuallyHttpCmd.java

@@ -0,0 +1,56 @@
+package com.github.ltsopensource.jobtracker.cmd;
+
+import com.github.ltsopensource.cmd.HttpCmdProc;
+import com.github.ltsopensource.cmd.HttpCmdRequest;
+import com.github.ltsopensource.cmd.HttpCmdResponse;
+import com.github.ltsopensource.core.cmd.HttpCmdNames;
+import com.github.ltsopensource.core.commons.utils.StringUtils;
+import com.github.ltsopensource.core.logger.Logger;
+import com.github.ltsopensource.core.logger.LoggerFactory;
+import com.github.ltsopensource.jobtracker.domain.JobTrackerAppContext;
+
+/**
+ * 用来手动触发任务
+ * 内部做的事情就是将某个任务加载到内存中
+ * Created by hugui.hg on 8/4/16.
+ */
+public class TriggerJobManuallyHttpCmd implements HttpCmdProc {
+
+    private final Logger LOGGER = LoggerFactory.getLogger(TriggerJobManuallyHttpCmd.class);
+
+    private JobTrackerAppContext appContext;
+
+    public TriggerJobManuallyHttpCmd(JobTrackerAppContext appContext) {
+        this.appContext = appContext;
+    }
+
+    @Override
+    public String nodeIdentity() {
+        return appContext.getConfig().getIdentity();
+    }
+
+    @Override
+    public String getCommand() {
+        return HttpCmdNames.HTTP_CMD_TRIGGER_JOB_MANUALLY;
+    }
+
+    @Override
+    public HttpCmdResponse execute(HttpCmdRequest request) throws Exception {
+        String taskTrackerNodeGroup = request.getParam("nodeGroup");
+        String jobId = request.getParam("jobId");
+
+        if (StringUtils.isEmpty(taskTrackerNodeGroup)) {
+            return HttpCmdResponse.newResponse(true, "nodeGroup should not be empty");
+        }
+
+        if (StringUtils.isEmpty(jobId)) {
+            return HttpCmdResponse.newResponse(true, "jobId should not be empty");
+        }
+
+        appContext.getPreLoader().loadOne2First(taskTrackerNodeGroup, jobId);
+
+        LOGGER.info("Trigger Job jobId={} taskTrackerNodeGroup={}", jobId, taskTrackerNodeGroup);
+
+        return HttpCmdResponse.newResponse(true, "trigger job succeed");
+    }
+}

+ 1 - 1
lts-monitor/pom.xml

@@ -5,7 +5,7 @@
     <parent>
         <artifactId>lts-parent</artifactId>
         <groupId>com.github.ltsopensource</groupId>
-        <version>1.6.9</version>
+        <version>1.7.0-SNAPSHOT</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
 

+ 1 - 1
lts-spring/pom.xml

@@ -5,7 +5,7 @@
     <parent>
         <artifactId>lts-parent</artifactId>
         <groupId>com.github.ltsopensource</groupId>
-        <version>1.6.9</version>
+        <version>1.7.0-SNAPSHOT</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
     <packaging>jar</packaging>

+ 1 - 1
lts-startup/pom.xml

@@ -5,7 +5,7 @@
     <parent>
         <artifactId>lts-parent</artifactId>
         <groupId>com.github.ltsopensource</groupId>
-        <version>1.6.9</version>
+        <version>1.7.0-SNAPSHOT</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
     <packaging>jar</packaging>

+ 1 - 1
lts-tasktracker/pom.xml

@@ -5,7 +5,7 @@
     <parent>
         <artifactId>lts-parent</artifactId>
         <groupId>com.github.ltsopensource</groupId>
-        <version>1.6.9</version>
+        <version>1.7.0-SNAPSHOT</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
 

+ 1 - 1
lts/pom.xml

@@ -5,7 +5,7 @@
     <parent>
         <artifactId>lts-parent</artifactId>
         <groupId>com.github.ltsopensource</groupId>
-        <version>1.6.9</version>
+        <version>1.7.0-SNAPSHOT</version>
     </parent>
     <packaging>jar</packaging>
     <modelVersion>4.0.0</modelVersion>

+ 1 - 1
pom.xml

@@ -13,7 +13,7 @@
     <groupId>com.github.ltsopensource</groupId>
     <artifactId>lts-parent</artifactId>
     <packaging>pom</packaging>
-    <version>1.6.9</version>
+    <version>1.7.0-SNAPSHOT</version>
     <url>https://github.com/ltsopensource/light-task-scheduler.git</url>
     <modules>
         <module>lts-core</module>