Browse Source

优化 队列策略,大大提升性能

hugui 9 years ago
parent
commit
e7af10a2a7
24 changed files with 419 additions and 27 deletions
  1. 8 0
      lts-core/src/main/java/com/lts/core/cluster/SubscribedNodeManager.java
  2. 5 0
      lts-core/src/main/java/com/lts/core/constant/EcTopic.java
  3. 3 2
      lts-example/src/main/java/com/lts/example/api/JobClientTest.java
  4. 1 1
      lts-example/src/main/java/com/lts/example/spring/SpringJobClientTest.java
  5. 1 1
      lts-example/src/main/java/com/lts/example/spring/SpringJobTrackerTest.java
  6. 1 1
      lts-example/src/main/java/com/lts/example/spring/SpringTaskTrackerTest.java
  7. 1 1
      lts-example/src/main/java/com/lts/example/support/SpringBean.java
  8. 8 2
      lts-jobtracker/src/main/java/com/lts/jobtracker/support/JobPusher.java
  9. 1 1
      lts-logger/lts-logger-mongo/src/main/java/com/lts/biz/logger/mongo/MongoJobLogger.java
  10. 143 0
      lts-queue/lts-queue-api/src/main/java/com/lts/queue/AbstractPreLoader.java
  11. 12 0
      lts-queue/lts-queue-api/src/main/java/com/lts/queue/PreLoader.java
  12. 18 0
      lts-queue/lts-queue-api/src/main/java/com/lts/queue/PreLoaderFactory.java
  13. 1 1
      lts-queue/lts-queue-mongo/src/main/java/com/lts/queue/mongo/MongoCronJobQueue.java
  14. 39 14
      lts-queue/lts-queue-mongo/src/main/java/com/lts/queue/mongo/MongoExecutableJobQueue.java
  15. 1 1
      lts-queue/lts-queue-mongo/src/main/java/com/lts/queue/mongo/MongoExecutingJobQueue.java
  16. 1 1
      lts-queue/lts-queue-mongo/src/main/java/com/lts/queue/mongo/MongoJobFeedbackQueue.java
  17. 1 1
      lts-queue/lts-queue-mongo/src/main/java/com/lts/queue/mongo/MongoNodeGroupStore.java
  18. 56 0
      lts-queue/lts-queue-mongo/src/main/java/com/lts/queue/mongo/MongoPreLoader.java
  19. 16 0
      lts-queue/lts-queue-mongo/src/main/java/com/lts/queue/mongo/MongoPreLoaderFactory.java
  20. 1 0
      lts-queue/lts-queue-mongo/src/main/resources/META-INF/lts/internal/com.lts.queue.PreLoaderFactory
  21. 1 0
      lts-queue/lts-queue-mysql/src/main/java/com/lts/queue/mysql/MysqlExecutableJobQueue.java
  22. 83 0
      lts-queue/lts-queue-mysql/src/main/java/com/lts/queue/mysql/MysqlPreLoader.java
  23. 16 0
      lts-queue/lts-queue-mysql/src/main/java/com/lts/queue/mysql/MysqlPreLoaderFactory.java
  24. 1 0
      lts-queue/lts-queue-mysql/src/main/resources/META-INF/lts/internal/com.lts.queue.PreLoaderFactory

+ 8 - 0
lts-core/src/main/java/com/lts/core/cluster/SubscribedNodeManager.java

@@ -4,9 +4,11 @@ package com.lts.core.cluster;
 import com.lts.core.Application;
 import com.lts.core.commons.utils.CollectionUtils;
 import com.lts.core.commons.utils.ListUtils;
+import com.lts.core.constant.EcTopic;
 import com.lts.core.listener.NodeChangeListener;
 import com.lts.core.logger.Logger;
 import com.lts.core.logger.LoggerFactory;
+import com.lts.ec.EventInfo;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -58,6 +60,9 @@ public class SubscribedNodeManager implements NodeChangeListener {
             }
         }
         nodeList.add(node);
+        EventInfo eventInfo = new EventInfo(EcTopic.NODE_ADD);
+        eventInfo.setParam("node", node);
+        application.getEventCenter().publishSync(eventInfo);
         LOGGER.info("Add {}", node);
     }
 
@@ -94,6 +99,9 @@ public class SubscribedNodeManager implements NodeChangeListener {
             for (Node node : nodeList) {
                 if (node.getIdentity().equals(delNode.getIdentity())) {
                     nodeList.remove(node);
+                    EventInfo eventInfo = new EventInfo(EcTopic.NODE_REMOVE);
+                    eventInfo.setParam("node", node);
+                    application.getEventCenter().publishSync(eventInfo);
                     LOGGER.info("Remove {}", node);
                 }
             }

+ 5 - 0
lts-core/src/main/java/com/lts/core/constant/EcTopic.java

@@ -18,4 +18,9 @@ public interface EcTopic {
     String JOB_TRACKER_AVAILABLE = "JOB_TRACKER_AVAILABLE";
     // master 节点改变了
     String MASTER_CHANGED = "MASTER_CHANGED";
+
+    String NODE_ADD = "NODE_ADD";
+
+    String NODE_REMOVE = "NODE_REMOVE";
+
 }

+ 3 - 2
lts-example/src/main/java/com/lts/example/api/JobClientTest.java

@@ -21,8 +21,8 @@ import java.util.concurrent.atomic.AtomicLong;
 public class JobClientTest extends BaseJobClientTest {
 
     public static void main(String[] args) throws IOException {
-        console();
-//        testProtector();
+//        console();
+        testProtector();
     }
 
     public static void console() throws IOException {
@@ -82,6 +82,7 @@ public class JobClientTest extends BaseJobClientTest {
                         job.setTaskId(StringUtils.generateUUID());
                         job.setTaskTrackerNodeGroup("test_trade_TaskTracker");
                         job.setParam("shopId", "111");
+                        job.setNeedFeedback(false);
                         try {
                             Response response = jobClient.submitJob(job);
                             System.out.print(" " + num.incrementAndGet());

+ 1 - 1
lts-example/src/main/java/com/lts/example/spring/SpringJobClientTest.java

@@ -8,7 +8,7 @@ import org.springframework.context.support.ClassPathXmlApplicationContext;
 import java.io.IOException;
 
 /**
- * Created by hugui on 8/4/15.
+ * @author Robert HG (254963746@qq.com) on 8/4/15.
  */
 public class SpringJobClientTest extends BaseJobClientTest {
 

+ 1 - 1
lts-example/src/main/java/com/lts/example/spring/SpringJobTrackerTest.java

@@ -5,7 +5,7 @@ import org.springframework.context.support.ClassPathXmlApplicationContext;
 import java.io.IOException;
 
 /**
- * Created by hugui on 8/4/15.
+ * @author Robert HG (254963746@qq.com) on 8/4/15.
  */
 public class SpringJobTrackerTest {
 

+ 1 - 1
lts-example/src/main/java/com/lts/example/spring/SpringTaskTrackerTest.java

@@ -5,7 +5,7 @@ import org.springframework.context.support.ClassPathXmlApplicationContext;
 import java.io.IOException;
 
 /**
- * Created by hugui on 8/4/15.
+ * @author Robert HG (254963746@qq.com) on 8/4/15.
  */
 public class SpringTaskTrackerTest {
 

+ 1 - 1
lts-example/src/main/java/com/lts/example/support/SpringBean.java

@@ -2,7 +2,7 @@ package com.lts.example.support;
 
 /**
  * 测试bean 注入
- * Created by hugui on 8/4/15.
+ * @author Robert HG (254963746@qq.com) on 8/4/15.
  */
 public class SpringBean {
 

+ 8 - 2
lts-jobtracker/src/main/java/com/lts/jobtracker/support/JobPusher.java

@@ -7,6 +7,7 @@ import com.lts.core.constant.Constants;
 import com.lts.core.constant.Level;
 import com.lts.core.exception.RemotingSendException;
 import com.lts.core.exception.RequestTimeoutException;
+import com.lts.core.extension.ExtensionLoader;
 import com.lts.core.factory.NamedThreadFactory;
 import com.lts.core.logger.Logger;
 import com.lts.core.logger.LoggerFactory;
@@ -17,6 +18,8 @@ import com.lts.core.remoting.RemotingServerDelegate;
 import com.lts.core.support.SystemClock;
 import com.lts.jobtracker.domain.JobTrackerApplication;
 import com.lts.jobtracker.domain.TaskTrackerNode;
+import com.lts.queue.PreLoader;
+import com.lts.queue.PreLoaderFactory;
 import com.lts.queue.domain.JobPo;
 import com.lts.queue.exception.DuplicateJobException;
 import com.lts.remoting.InvokeCallback;
@@ -37,11 +40,15 @@ public class JobPusher {
     private final Logger LOGGER = LoggerFactory.getLogger(JobPusher.class);
     private JobTrackerApplication application;
     private final ExecutorService executorService;
+    private PreLoader preLoader;
+
 
     public JobPusher(JobTrackerApplication application) {
         this.application = application;
         this.executorService = Executors.newFixedThreadPool(Constants.AVAILABLE_PROCESSOR * 5,
                 new NamedThreadFactory(JobPusher.class.getSimpleName()));
+
+        preLoader = ExtensionLoader.getExtensionLoader(PreLoaderFactory.class).getAdaptiveExtension().getPreLoader(application.getConfig(), application);
     }
 
     public void push(final RemotingServerDelegate remotingServer, final JobPullRequest request) {
@@ -108,8 +115,7 @@ public class JobPusher {
         final String identity = taskTrackerNode.getIdentity();
 
         // 从mongo 中取一个可运行的job
-        final JobPo jobPo = application.getExecutableJobQueue().take(nodeGroup, identity);
-
+        final JobPo jobPo = preLoader.take(nodeGroup, identity);
         if (jobPo == null) {
             if (LOGGER.isDebugEnabled()) {
                 LOGGER.debug("Job push failed: no job! nodeGroup=" + nodeGroup + ", identity=" + identity);

+ 1 - 1
lts-logger/lts-logger-mongo/src/main/java/com/lts/biz/logger/mongo/MongoJobLogger.java

@@ -28,7 +28,7 @@ public class MongoJobLogger extends MongoRepository implements JobLogger {
         DBCollection dbCollection = template.getCollection();
         List<DBObject> indexInfo = dbCollection.getIndexInfo();
         // create index if not exist
-        if (CollectionUtils.isEmpty(indexInfo)) {
+        if (CollectionUtils.sizeOf(indexInfo) <= 1) {
             template.ensureIndex("idx_logTime", "logTime");
             template.ensureIndex("idx_taskId_taskTrackerNodeGroup", "taskId,taskTrackerNodeGroup");
         }

+ 143 - 0
lts-queue/lts-queue-api/src/main/java/com/lts/queue/AbstractPreLoader.java

@@ -0,0 +1,143 @@
+package com.lts.queue;
+
+import com.lts.core.Application;
+import com.lts.core.cluster.Node;
+import com.lts.core.cluster.NodeType;
+import com.lts.core.commons.collect.ConcurrentHashSet;
+import com.lts.core.commons.utils.CollectionUtils;
+import com.lts.core.constant.EcTopic;
+import com.lts.ec.EventInfo;
+import com.lts.ec.EventSubscriber;
+import com.lts.ec.Observer;
+import com.lts.queue.domain.JobPo;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * @author Robert HG (254963746@qq.com) on 8/14/15.
+ */
+public abstract class AbstractPreLoader implements PreLoader {
+
+    // 当前节点的序号
+    private int curSequence = 0;
+    private int totalNodes = 1;
+    // 没个节点的步长
+    protected int step = 500;
+    // 预取阀值
+    private double factor = 0.8;
+
+    private ConcurrentHashMap<String/*taskTrackerNodeGroup*/, List<JobPo>>
+            JOB_MAP = new ConcurrentHashMap<String, List<JobPo>>();
+
+    // 加载的信号
+    private ConcurrentHashSet<String> LOAD_SIGNAL = new ConcurrentHashSet<String>();
+    private ScheduledExecutorService LOAD_EXECUTOR_SERVICE = Executors.newSingleThreadScheduledExecutor();
+    private ScheduledFuture<?> scheduledFuture;
+    private AtomicBoolean start = new AtomicBoolean(false);
+
+    public AbstractPreLoader(final Application application) {
+        if (start.compareAndSet(false, true)) {
+            scheduledFuture = LOAD_EXECUTOR_SERVICE.scheduleWithFixedDelay(new Runnable() {
+                @Override
+                public void run() {
+
+                    for (String loadTaskTrackerNodeGroup : LOAD_SIGNAL) {
+                        if (JOB_MAP.get(loadTaskTrackerNodeGroup).size() / step < factor) {
+                            // load
+                            List<JobPo> loads = load(loadTaskTrackerNodeGroup, curSequence * step);
+                            // 加入到内存中
+                            if (CollectionUtils.isNotEmpty(loads)) {
+                                JOB_MAP.get(loadTaskTrackerNodeGroup).addAll(loads);
+                            }
+                        }
+                    }
+                }
+            }, 3, 1, TimeUnit.SECONDS);
+        }
+
+        application.getEventCenter().subscribe(new EventSubscriber(application.getConfig().getIdentity() + "_preLoader", new Observer() {
+            @Override
+            public void onObserved(EventInfo eventInfo) {
+                setCurSequence(application);
+            }
+        }), EcTopic.NODE_ADD, EcTopic.NODE_REMOVE);
+
+        setCurSequence(application);
+    }
+
+    private void setCurSequence(Application application) {
+        List<Node> nodes = application.getSubscribedNodeManager().getNodeList(NodeType.JOB_TRACKER);
+        totalNodes = CollectionUtils.sizeOf(nodes);
+        if (totalNodes == 0) {
+            curSequence = 0;
+        } else if (totalNodes == 1) {
+            curSequence = 0;
+        } else {
+            List<Node> copy = new ArrayList<Node>(nodes);
+            Collections.sort(copy, new Comparator<Node>() {
+                @Override
+                public int compare(Node left, Node right) {
+                    return left.getCreateTime().compareTo(right.getCreateTime());
+                }
+            });
+
+            int index = 0;
+            for (Node node : copy) {
+                if (node.getIdentity().equals(application.getConfig().getIdentity())) {
+                    // 当前节点
+                    curSequence = index;
+                    break;
+                }
+                index++;
+            }
+        }
+    }
+
+    public JobPo take(String taskTrackerNodeGroup, String taskTrackerIdentity) {
+        while (true) {
+            JobPo jobPo = get(taskTrackerNodeGroup);
+            if (jobPo == null) {
+                return null;
+            }
+            // update jobPo
+            if (lockJob(taskTrackerNodeGroup, jobPo.getJobId(), taskTrackerIdentity)) {
+                return jobPo;
+            }
+        }
+    }
+
+    protected abstract boolean lockJob(String taskTrackerNodeGroup, String jobId, String taskTrackerIdentity);
+
+    protected abstract List<JobPo> load(String loadTaskTrackerNodeGroup, int offset);
+
+    private JobPo get(String taskTrackerNodeGroup) {
+        List<JobPo> jobPos = JOB_MAP.get(taskTrackerNodeGroup);
+        if (jobPos == null) {
+            jobPos = new CopyOnWriteArrayList<JobPo>();
+            List<JobPo> oldJobPos = JOB_MAP.putIfAbsent(taskTrackerNodeGroup, jobPos);
+            if (oldJobPos != null) {
+                jobPos = oldJobPos;
+            }
+        }
+
+        if (jobPos.size() / step < factor) {
+            // 触发加载的请求
+            if (!LOAD_SIGNAL.contains(taskTrackerNodeGroup)) {
+                LOAD_SIGNAL.add(taskTrackerNodeGroup);
+            }
+        }
+        if (jobPos.size() > 0) {
+            try {
+                return jobPos.remove(0);
+            } catch (ArrayIndexOutOfBoundsException e) {
+                return null;
+            }
+        }
+        return null;
+    }
+}

+ 12 - 0
lts-queue/lts-queue-api/src/main/java/com/lts/queue/PreLoader.java

@@ -0,0 +1,12 @@
+package com.lts.queue;
+
+import com.lts.queue.domain.JobPo;
+
+/**
+ * @author Robert HG (254963746@qq.com) on 8/14/15.
+ */
+public interface PreLoader {
+
+    public JobPo take(String taskTrackerNodeGroup, String taskTrackerIdentity);
+
+}

+ 18 - 0
lts-queue/lts-queue-api/src/main/java/com/lts/queue/PreLoaderFactory.java

@@ -0,0 +1,18 @@
+package com.lts.queue;
+
+import com.lts.core.Application;
+import com.lts.core.cluster.Config;
+import com.lts.core.extension.Adaptive;
+import com.lts.core.extension.SPI;
+import com.lts.queue.domain.JobPo;
+
+/**
+ * @author Robert HG (254963746@qq.com) on 8/14/15.
+ */
+@SPI("mysql")
+public interface PreLoaderFactory {
+
+    @Adaptive("job.queue")
+    public PreLoader getPreLoader(Config config, Application application);
+
+}

+ 1 - 1
lts-queue/lts-queue-mongo/src/main/java/com/lts/queue/mongo/MongoCronJobQueue.java

@@ -29,7 +29,7 @@ public class MongoCronJobQueue extends AbstractMongoJobQueue implements CronJobQ
         DBCollection dbCollection = template.getCollection();
         List<DBObject> indexInfo = dbCollection.getIndexInfo();
         // create index if not exist
-        if (CollectionUtils.isEmpty(indexInfo)) {
+        if (CollectionUtils.sizeOf(indexInfo) <= 1) {
             template.ensureIndex("idx_jobId", "jobId", true, true);
             template.ensureIndex("idx_taskId_taskTrackerNodeGroup", "taskId, taskTrackerNodeGroup", true, true);
         }

+ 39 - 14
lts-queue/lts-queue-mongo/src/main/java/com/lts/queue/mongo/MongoExecutableJobQueue.java

@@ -1,6 +1,7 @@
 package com.lts.queue.mongo;
 
 import com.lts.core.cluster.Config;
+import com.lts.core.commons.collect.ConcurrentHashSet;
 import com.lts.core.commons.utils.CollectionUtils;
 import com.lts.core.commons.utils.StringUtils;
 import com.lts.core.constant.Constants;
@@ -18,6 +19,7 @@ import com.mongodb.DuplicateKeyException;
 import com.mongodb.WriteResult;
 import org.mongodb.morphia.query.Query;
 import org.mongodb.morphia.query.UpdateOperations;
+import org.mongodb.morphia.query.UpdateResults;
 
 import java.util.List;
 import java.util.concurrent.Semaphore;
@@ -36,7 +38,7 @@ public class MongoExecutableJobQueue extends AbstractMongoJobQueue implements Ex
 
     public MongoExecutableJobQueue(Config config) {
         super(config);
-        int permits = config.getParameter(Constants.JOB_TAKE_PARALLEL_SIZE, 50);
+        int permits = config.getParameter(Constants.JOB_TAKE_PARALLEL_SIZE, 10000);
         if (permits <= 10) {
             permits = Constants.DEFAULT_JOB_TAKE_PARALLEL_SIZE;
         }
@@ -52,13 +54,15 @@ public class MongoExecutableJobQueue extends AbstractMongoJobQueue implements Ex
         return JobQueueUtils.getExecutableQueueName(taskTrackerNodeGroup);
     }
 
+    private ConcurrentHashSet<String> EXIST_TABLE = new ConcurrentHashSet<String>();
+
     @Override
     public boolean createQueue(String taskTrackerNodeGroup) {
         String tableName = JobQueueUtils.getExecutableQueueName(taskTrackerNodeGroup);
         DBCollection dbCollection = template.getCollection(tableName);
         List<DBObject> indexInfo = dbCollection.getIndexInfo();
         // create index if not exist
-        if (CollectionUtils.isEmpty(indexInfo)) {
+        if (CollectionUtils.sizeOf(indexInfo) <= 1) {
             template.ensureIndex(tableName, "idx_jobId", "jobId", true, true);
             template.ensureIndex(tableName, "idx_taskId_taskTrackerNodeGroup", "taskId, taskTrackerNodeGroup", true, true);
             template.ensureIndex(tableName, "idx_taskTrackerIdentity", "taskTrackerIdentity");
@@ -66,13 +70,18 @@ public class MongoExecutableJobQueue extends AbstractMongoJobQueue implements Ex
             template.ensureIndex(tableName, "idx_isRunning", "isRunning");
             LOGGER.info("create queue " + tableName);
         }
+        EXIST_TABLE.add(tableName);
         return true;
     }
 
     @Override
     public boolean add(JobPo jobPo) {
         try {
+            // TODO 这里后面改掉,所有的nodeGroup 需要先在lts-admin中申请,就不用判断了
             String tableName = JobQueueUtils.getExecutableQueueName(jobPo.getTaskTrackerNodeGroup());
+            if (!EXIST_TABLE.contains(tableName)) {
+                createQueue(jobPo.getTaskTrackerNodeGroup());
+            }
             jobPo.setGmtCreated(SystemClock.now());
             jobPo.setGmtModified(jobPo.getGmtCreated());
             template.save(tableName, jobPo);
@@ -96,18 +105,34 @@ public class MongoExecutableJobQueue extends AbstractMongoJobQueue implements Ex
             LOGGER.warn("Try to take job failed.", e);
         }
         try {
-            String tableName = JobQueueUtils.getExecutableQueueName(taskTrackerNodeGroup);
-            Query<JobPo> query = template.createQuery(tableName, JobPo.class);
-            query.field("isRunning").equal(false)
-                    .filter("triggerTime < ", SystemClock.now())
-                    .order(" triggerTime, priority , gmtCreated");
-
-            UpdateOperations<JobPo> operations =
-                    template.createUpdateOperations(JobPo.class)
-                            .set("isRunning", true)
-                            .set("taskTrackerIdentity", taskTrackerIdentity)
-                            .set("gmtModified", SystemClock.now());
-            return template.findAndModify(query, operations, false);
+            while (true) {
+                String tableName = JobQueueUtils.getExecutableQueueName(taskTrackerNodeGroup);
+                Query<JobPo> query = template.createQuery(tableName, JobPo.class);
+                query.field("isRunning").equal(false)
+                        .filter("triggerTime < ", SystemClock.now())
+                        .order(" triggerTime, priority , gmtCreated");
+
+                JobPo jobPo = query.get();
+                if (jobPo == null) {
+                    return null;
+                }
+                UpdateOperations<JobPo> operations =
+                        template.createUpdateOperations(JobPo.class)
+                                .set("isRunning", true)
+                                .set("taskTrackerIdentity", taskTrackerIdentity)
+                                .set("gmtModified", SystemClock.now());
+                Query<JobPo> updateQuery = template.createQuery(tableName, JobPo.class);
+                updateQuery.field("jobId").equal(jobPo.getJobId())
+                        .field("isRunning").equal(false);
+                UpdateResults updateResult = template.update(updateQuery, operations);
+                if (updateResult.getUpdatedCount() == 1) {
+                    return jobPo;
+                }
+                try {
+                    Thread.sleep(10);
+                } catch (InterruptedException ignored) {
+                }
+            }
         } finally {
             if (acquire) {
                 semaphore.release();

+ 1 - 1
lts-queue/lts-queue-mongo/src/main/java/com/lts/queue/mongo/MongoExecutingJobQueue.java

@@ -29,7 +29,7 @@ public class MongoExecutingJobQueue extends AbstractMongoJobQueue implements Exe
         DBCollection dbCollection = template.getCollection();
         List<DBObject> indexInfo = dbCollection.getIndexInfo();
         // create index if not exist
-        if (CollectionUtils.isEmpty(indexInfo)) {
+        if (CollectionUtils.sizeOf(indexInfo) <= 1) {
             template.ensureIndex("idx_jobId", "jobId", true, true);
             template.ensureIndex("idx_taskId_taskTrackerNodeGroup", "taskId, taskTrackerNodeGroup", true, true);
             template.ensureIndex("idx_taskTrackerIdentity", "taskTrackerIdentity");

+ 1 - 1
lts-queue/lts-queue-mongo/src/main/java/com/lts/queue/mongo/MongoJobFeedbackQueue.java

@@ -36,7 +36,7 @@ public class MongoJobFeedbackQueue extends MongoRepository implements JobFeedbac
         DBCollection dbCollection = template.getCollection(tableName);
         List<DBObject> indexInfo = dbCollection.getIndexInfo();
         // create index if not exist
-        if (CollectionUtils.isEmpty(indexInfo)) {
+        if (CollectionUtils.sizeOf(indexInfo) <= 1) {
             template.ensureIndex(tableName, "idx_gmtCreated", "gmtCreated");
             LOGGER.info("create queue " + tableName);
         }

+ 1 - 1
lts-queue/lts-queue-mongo/src/main/java/com/lts/queue/mongo/MongoNodeGroupStore.java

@@ -28,7 +28,7 @@ public class MongoNodeGroupStore extends MongoRepository implements NodeGroupSto
         DBCollection dbCollection = template.getCollection();
         List<DBObject> indexInfo = dbCollection.getIndexInfo();
         // create index if not exist
-        if (CollectionUtils.isEmpty(indexInfo)) {
+        if (CollectionUtils.sizeOf(indexInfo) <= 1) {
             template.ensureIndex("idx_nodeType_name", "nodeType,name", true, true);
         }
     }

+ 56 - 0
lts-queue/lts-queue-mongo/src/main/java/com/lts/queue/mongo/MongoPreLoader.java

@@ -0,0 +1,56 @@
+package com.lts.queue.mongo;
+
+import com.lts.core.Application;
+import com.lts.core.support.JobQueueUtils;
+import com.lts.core.support.SystemClock;
+import com.lts.queue.AbstractPreLoader;
+import com.lts.queue.domain.JobPo;
+import com.lts.store.mongo.DataStoreProvider;
+import com.lts.store.mongo.MongoTemplate;
+import org.mongodb.morphia.AdvancedDatastore;
+import org.mongodb.morphia.query.Query;
+import org.mongodb.morphia.query.UpdateOperations;
+import org.mongodb.morphia.query.UpdateResults;
+
+import java.util.List;
+
+/**
+ * @author Robert HG (254963746@qq.com) on 8/13/15.
+ */
+public class MongoPreLoader extends AbstractPreLoader {
+
+    private MongoTemplate template;
+
+    public MongoPreLoader(final Application application) {
+        super(application);
+        this.template = new MongoTemplate(
+                (AdvancedDatastore) DataStoreProvider.getDataStore(application.getConfig()));
+    }
+
+    protected boolean lockJob(String taskTrackerNodeGroup, String jobId, String taskTrackerIdentity) {
+        UpdateOperations<JobPo> operations =
+                template.createUpdateOperations(JobPo.class)
+                        .set("isRunning", true)
+                        .set("taskTrackerIdentity", taskTrackerIdentity)
+                        .set("gmtModified", SystemClock.now());
+
+        String tableName = JobQueueUtils.getExecutableQueueName(taskTrackerNodeGroup);
+
+        Query<JobPo> updateQuery = template.createQuery(tableName, JobPo.class);
+        updateQuery.field("jobId").equal(jobId)
+                .field("isRunning").equal(false);
+        UpdateResults updateResult = template.update(updateQuery, operations);
+        return updateResult.getUpdatedCount() == 1;
+    }
+
+    protected List<JobPo> load(String loadTaskTrackerNodeGroup, int offset) {
+        // load
+        String tableName = JobQueueUtils.getExecutableQueueName(loadTaskTrackerNodeGroup);
+        Query<JobPo> query = template.createQuery(tableName, JobPo.class);
+        query.field("isRunning").equal(false)
+                .filter("triggerTime < ", SystemClock.now())
+                .order(" triggerTime, priority , gmtCreated").offset(offset).limit(step);
+        return query.asList();
+    }
+
+}

+ 16 - 0
lts-queue/lts-queue-mongo/src/main/java/com/lts/queue/mongo/MongoPreLoaderFactory.java

@@ -0,0 +1,16 @@
+package com.lts.queue.mongo;
+
+import com.lts.core.Application;
+import com.lts.core.cluster.Config;
+import com.lts.queue.PreLoader;
+import com.lts.queue.PreLoaderFactory;
+
+/**
+ * @author Robert HG (254963746@qq.com) on 8/14/15.
+ */
+public class MongoPreLoaderFactory implements PreLoaderFactory {
+    @Override
+    public PreLoader getPreLoader(Config config, Application application) {
+        return new MongoPreLoader(application);
+    }
+}

+ 1 - 0
lts-queue/lts-queue-mongo/src/main/resources/META-INF/lts/internal/com.lts.queue.PreLoaderFactory

@@ -0,0 +1 @@
+mongo=com.lts.queue.mongo.MongoPreLoaderFactory

+ 1 - 0
lts-queue/lts-queue-mysql/src/main/java/com/lts/queue/mysql/MysqlExecutableJobQueue.java

@@ -1,6 +1,7 @@
 package com.lts.queue.mysql;
 
 import com.lts.core.cluster.Config;
+import com.lts.core.commons.collect.ConcurrentHashSet;
 import com.lts.core.commons.file.FileUtils;
 import com.lts.core.commons.utils.StringUtils;
 import com.lts.core.constant.Constants;

+ 83 - 0
lts-queue/lts-queue-mysql/src/main/java/com/lts/queue/mysql/MysqlPreLoader.java

@@ -0,0 +1,83 @@
+package com.lts.queue.mysql;
+
+import com.lts.core.Application;
+import com.lts.core.support.JobQueueUtils;
+import com.lts.core.support.SystemClock;
+import com.lts.queue.AbstractPreLoader;
+import com.lts.queue.domain.JobPo;
+import com.lts.queue.mysql.support.ResultSetHandlerHolder;
+import com.lts.store.jdbc.DataSourceProviderFactory;
+import com.lts.store.jdbc.SqlTemplate;
+
+import java.sql.SQLException;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * @author Robert HG (254963746@qq.com) on 8/14/15.
+ */
+public class MysqlPreLoader extends AbstractPreLoader {
+
+    private SqlTemplate sqlTemplate;
+
+    public MysqlPreLoader(Application application) {
+        super(application);
+
+        sqlTemplate = new SqlTemplate(
+                DataSourceProviderFactory.create(application.getConfig())
+                        .getDataSource(application.getConfig()));
+    }
+
+
+    private String taskUpdateSQL = "UPDATE `{tableName}` SET " +
+            "`is_running` = ?, " +
+            "`task_tracker_identity` = ?, " +
+            "`gmt_modified` = ?" +
+            " WHERE job_id = ? AND is_running = ?";
+
+    @Override
+    protected boolean lockJob(String taskTrackerNodeGroup, String jobId, String taskTrackerIdentity) {
+        try {
+            int affectedRow = sqlTemplate.update(getRealSql(taskUpdateSQL, taskTrackerNodeGroup), true, taskTrackerIdentity, SystemClock.now(), jobId, false);
+            return affectedRow == 1;
+        } catch (SQLException e) {
+            return false;
+        }
+    }
+
+    private String takeSelectSQL = "SELECT *" +
+            " FROM `{tableName}` " +
+            " WHERE is_running = ? " +
+            " AND `trigger_time` < ? " +
+            " ORDER BY `trigger_time` ASC, `priority` ASC, `gmt_created` ASC " +
+            " LIMIT ?, ?";
+
+    @Override
+    protected List<JobPo> load(String loadTaskTrackerNodeGroup, int offset) {
+        try {
+            Long now = SystemClock.now();
+            return sqlTemplate.query(getRealSql(takeSelectSQL, loadTaskTrackerNodeGroup),
+                    ResultSetHandlerHolder.JOB_PO_LIST_RESULT_SET_HANDLER,
+                    false, now, offset, step);
+        } catch (SQLException e) {
+            return null;
+        }
+    }
+
+    private String getRealSql(String sql, String taskTrackerNodeGroup) {
+        String key = sql.concat(taskTrackerNodeGroup);
+        String fineSQL = SQL_CACHE_MAP.get(key);
+        // 这里可以不用锁,多生成一次也不会产生什么问题
+        if (fineSQL == null) {
+            fineSQL = sql.replace("{tableName}", getTableName(taskTrackerNodeGroup));
+            SQL_CACHE_MAP.put(key, fineSQL);
+        }
+        return fineSQL;
+    }
+
+    private final ConcurrentHashMap<String, String> SQL_CACHE_MAP = new ConcurrentHashMap<String, String>();
+
+    private String getTableName(String taskTrackerNodeGroup) {
+        return JobQueueUtils.getExecutableQueueName(taskTrackerNodeGroup);
+    }
+}

+ 16 - 0
lts-queue/lts-queue-mysql/src/main/java/com/lts/queue/mysql/MysqlPreLoaderFactory.java

@@ -0,0 +1,16 @@
+package com.lts.queue.mysql;
+
+import com.lts.core.Application;
+import com.lts.core.cluster.Config;
+import com.lts.queue.PreLoader;
+import com.lts.queue.PreLoaderFactory;
+
+/**
+ * @author Robert HG (254963746@qq.com) on 8/14/15.
+ */
+public class MysqlPreLoaderFactory implements PreLoaderFactory {
+    @Override
+    public PreLoader getPreLoader(Config config, Application application) {
+        return new MysqlPreLoader(application);
+    }
+}

+ 1 - 0
lts-queue/lts-queue-mysql/src/main/resources/META-INF/lts/internal/com.lts.queue.PreLoaderFactory

@@ -0,0 +1 @@
+mysql=com.lts.queue.mysql.MysqlPreLoaderFactory