Browse Source

performance: jobclient JobCancel的时候, 也会remove掉 暂停队列

hugui 8 năm trước cách đây
mục cha
commit
42ed1f575d

+ 1 - 0
lts-core/src/main/java/com/github/ltsopensource/queue/AbstractPreLoader.java

@@ -59,6 +59,7 @@ public abstract class AbstractPreLoader implements PreLoader {
                             // 加入到内存中
                             if (CollectionUtils.isNotEmpty(loads)) {
                                 for (JobPo load : loads) {
+                                    // TODO 这里可以优化,对于force这种场景,可以移除执行优先级低的
                                     if (!queue.offer(load)) {
                                         // 没有成功说明已经满了
                                         break;

+ 5 - 1
lts-core/src/main/java/com/github/ltsopensource/queue/SuspendJobQueue.java

@@ -8,7 +8,7 @@ import com.github.ltsopensource.store.jdbc.exception.DupEntryException;
  *
  * @author Robert HG (254963746@qq.com) on 5/27/15.
  */
-public interface SuspendJobQueue extends JobQueue{
+public interface SuspendJobQueue extends JobQueue {
 
     /**
      * 添加任务
@@ -24,4 +24,8 @@ public interface SuspendJobQueue extends JobQueue{
      */
     boolean remove(String jobId);
 
+    /**
+     * 得到JobPo
+     */
+    JobPo getJob(String taskTrackerNodeGroup, String taskId);
 }

+ 9 - 0
lts-core/src/main/java/com/github/ltsopensource/queue/mongo/MongoSuspendJobQueue.java

@@ -67,4 +67,13 @@ public class MongoSuspendJobQueue extends AbstractMongoJobQueue implements Suspe
         return wr.getN() == 1;
     }
 
+    @Override
+    public JobPo getJob(String taskTrackerNodeGroup, String taskId) {
+        String tableName = JobQueueUtils.getExecutableQueueName(taskTrackerNodeGroup);
+        Query<JobPo> query = template.createQuery(tableName, JobPo.class);
+        query.field("taskId").equal(taskId).
+                field("taskTrackerNodeGroup").equal(taskTrackerNodeGroup);
+        return query.get();
+    }
+
 }

+ 12 - 0
lts-core/src/main/java/com/github/ltsopensource/queue/mysql/MysqlSuspendJobQueue.java

@@ -50,6 +50,18 @@ public class MysqlSuspendJobQueue extends AbstractMysqlJobQueue implements Suspe
                 .doDelete() == 1;
     }
 
+    @Override
+    public JobPo getJob(String taskTrackerNodeGroup, String taskId) {
+        return new SelectSql(getSqlTemplate())
+                .select()
+                .all()
+                .from()
+                .table(getTableName())
+                .where("task_id = ?", taskId)
+                .and("task_tracker_node_group = ?", taskTrackerNodeGroup)
+                .single(RshHolder.JOB_PO_RSH);
+    }
+
     private String getTableName() {
         return JobQueueUtils.SUSPEND_JOB_QUEUE;
     }

+ 6 - 0
lts-jobtracker/src/main/java/com/github/ltsopensource/jobtracker/processor/JobCancelProcessor.java

@@ -40,14 +40,20 @@ public class JobCancelProcessor extends AbstractRemotingProcessor {
         if (jobPo == null) {
             jobPo = appContext.getExecutableJobQueue().getJob(taskTrackerNodeGroup, taskId);
         }
+        if (jobPo == null) {
+            jobPo = appContext.getSuspendJobQueue().getJob(taskTrackerNodeGroup, taskId);
+        }
 
         if (jobPo != null) {
+            // 队列都remove下吧
             appContext.getExecutableJobQueue().removeBatch(jobPo.getRealTaskId(), jobPo.getTaskTrackerNodeGroup());
             if (jobPo.isCron()) {
                 appContext.getCronJobQueue().remove(jobPo.getJobId());
             } else if (jobPo.isRepeatable()) {
                 appContext.getRepeatJobQueue().remove(jobPo.getJobId());
             }
+            appContext.getSuspendJobQueue().remove(jobPo.getJobId());
+
             // 记录日志
             JobLogPo jobLogPo = JobDomainConverter.convertJobLog(jobPo);
             jobLogPo.setSuccess(true);