Bladeren bron

优化 队列策略,大大提升性能

hugui 9 jaren geleden
bovenliggende
commit
796fd87830

+ 32 - 37
lts-core/src/main/java/com/lts/core/constant/Constants.java

@@ -11,80 +11,75 @@ import java.util.regex.Pattern;
 public interface Constants {
 
     // 可用的处理器个数
-    public static final int AVAILABLE_PROCESSOR = Runtime.getRuntime().availableProcessors();
+    int AVAILABLE_PROCESSOR = Runtime.getRuntime().availableProcessors();
 
-    public static final String USER_HOME = System.getProperty("user.home");
+    String USER_HOME = System.getProperty("user.home");
 
-    public static final int JOB_TRACKER_DEFAULT_LISTEN_PORT = 35001;
+    int JOB_TRACKER_DEFAULT_LISTEN_PORT = 35001;
 
     // 默认集群名字
-    public static final String DEFAULT_CLUSTER_NAME = "defaultCluster";
+    String DEFAULT_CLUSTER_NAME = "defaultCluster";
 
     // 默认JobTracker节点组
-    public static final String DEFAULT_NODE_JOB_TRACKER_GROUP = "jobTrackerGroup";
+    String DEFAULT_NODE_JOB_TRACKER_GROUP = "jobTrackerGroup";
     // 默认JobClient节点组
-    public static final String DEFAULT_NODE_JOB_CLIENT_GROUP = "jobClientGroup";
+    String DEFAULT_NODE_JOB_CLIENT_GROUP = "jobClientGroup";
     // 默认TaskTracker节点组
-    public static final String DEFAULT_NODE_TASK_TRACKER_GROUP = "taskTrackerGroup";
+    String DEFAULT_NODE_TASK_TRACKER_GROUP = "taskTrackerGroup";
 
-    public static final String CHARSET = "utf-8";
+    String CHARSET = "utf-8";
 
-    public static final int DEFAULT_TIMEOUT = 1000;
+    int DEFAULT_TIMEOUT = 1000;
 
-    public static final String TIMEOUT_KEY = "timeout";
+    String TIMEOUT_KEY = "timeout";
 
-    public static final String SESSION_TIMEOUT_KEY = "session";
+    String SESSION_TIMEOUT_KEY = "session";
 
-    public static final int DEFAULT_SESSION_TIMEOUT = 60 * 1000;
+    int DEFAULT_SESSION_TIMEOUT = 60 * 1000;
 
-    public static final String REGISTER = "register";
+    String REGISTER = "register";
 
-    public static final String UNREGISTER = "unregister";
+    String UNREGISTER = "unregister";
 
-    public static final String SUBSCRIBE = "subscribe";
+    String SUBSCRIBE = "subscribe";
 
-    public static final String UNSUBSCRIBE = "unsubscribe";
+    String UNSUBSCRIBE = "unsubscribe";
     /**
      * 注册中心失败事件重试事件
      */
-    public static final String REGISTRY_RETRY_PERIOD_KEY = "retry.period";
+    String REGISTRY_RETRY_PERIOD_KEY = "retry.period";
 
     /**
      * 重试周期
      */
-    public static final int DEFAULT_REGISTRY_RETRY_PERIOD = 5 * 1000;
+    int DEFAULT_REGISTRY_RETRY_PERIOD = 5 * 1000;
 
-    public static final Pattern COMMA_SPLIT_PATTERN = Pattern.compile("\\s*[,]+\\s*");
+    Pattern COMMA_SPLIT_PATTERN = Pattern.compile("\\s*[,]+\\s*");
 
     /**
      * 注册中心自动重连时间
      */
-    public static final String REGISTRY_RECONNECT_PERIOD_KEY = "reconnect.period";
+    String REGISTRY_RECONNECT_PERIOD_KEY = "reconnect.period";
 
-    public static final int DEFAULT_REGISTRY_RECONNECT_PERIOD = 3 * 1000;
+    int DEFAULT_REGISTRY_RECONNECT_PERIOD = 3 * 1000;
 
-    public static final String ZK_CLIENT_KEY = "zk.client";
+    String ZK_CLIENT_KEY = "zk.client";
 
-    public static final String JOB_LOGGER_KEY = "job.logger";
+    String JOB_LOGGER_KEY = "job.logger";
 
-    public static final String JOB_QUEUE_KEY = "job.queue";
+    String JOB_QUEUE_KEY = "job.queue";
     // 客户端提交并发请求size
-    public static final String JOB_SUBMIT_CONCURRENCY_SIZE = "job.submit.concurrency.size";
-    public static final int DEFAULT_JOB_SUBMIT_CONCURRENCY_SIZE = 100;
+    String JOB_SUBMIT_CONCURRENCY_SIZE = "job.submit.concurrency.size";
+    int DEFAULT_JOB_SUBMIT_CONCURRENCY_SIZE = 100;
 
-    public static final String PROCESSOR_THREAD = "job.processor.thread";
-    public static final int DEFAULT_PROCESSOR_THREAD = 32 + AVAILABLE_PROCESSOR * 5;
+    String PROCESSOR_THREAD = "job.processor.thread";
+    int DEFAULT_PROCESSOR_THREAD = 32 + AVAILABLE_PROCESSOR * 5;
 
-    public static final int LATCH_TIMEOUT_MILLIS = 10 * 60 * 1000;      // 10分钟
-
-    // 取任务的时候的并发数控制
-    public static final String JOB_TAKE_PARALLEL_SIZE = "job.take.parallel.size";
-    public static final String JOB_TAKE_ACQUIRE_TIMEOUT = "job.take.acquire.timeout";
-    public static final int DEFAULT_JOB_TAKE_PARALLEL_SIZE = 20;
+    int LATCH_TIMEOUT_MILLIS = 10 * 60 * 1000;      // 10分钟
 
     // 任务最多重试次数
-    public static final String JOB_MAX_RETRY_TIMES = "job.max.retry.times";
-    public static final int DEFAULT_JOB_MAX_RETRY_TIMES = 10;
+    String JOB_MAX_RETRY_TIMES = "job.max.retry.times";
+    int DEFAULT_JOB_MAX_RETRY_TIMES = 10;
 
-    public static final Charset UTF_8 = Charset.forName("UTF-8");
+    Charset UTF_8 = Charset.forName("UTF-8");
 }

+ 2 - 1
lts-example/src/main/java/com/lts/example/api/TaskTrackerTest.java

@@ -1,6 +1,7 @@
 package com.lts.example.api;
 
 import com.lts.example.support.MasterChangeListenerImpl;
+import com.lts.example.support.NoopJobRunner;
 import com.lts.example.support.TestJobRunner;
 import com.lts.tasktracker.TaskTracker;
 
@@ -19,7 +20,7 @@ public class TaskTrackerTest {
         // taskTracker.setRegistryAddress("redis://127.0.0.1:6379");
         taskTracker.setNodeGroup("test_trade_TaskTracker"); // 同一个TaskTracker集群这个名字相同
         taskTracker.setClusterName("test_cluster");
-        taskTracker.setWorkThreads(20);
+        taskTracker.setWorkThreads(100);
         // 反馈任务给JobTracker失败,存储本地文件路径
         // taskTracker.setFailStorePath(Constants.USER_HOME);
         // master 节点变化监听器,当有集群中只需要一个节点执行某个事情的时候,可以监听这个事件

+ 90 - 0
lts-example/src/main/java/com/lts/example/benchmark/JobClientMain.java

@@ -0,0 +1,90 @@
+package com.lts.example.benchmark;
+
+import com.lts.core.commons.utils.StringUtils;
+import com.lts.core.domain.Job;
+import com.lts.example.support.JobFinishedHandlerImpl;
+import com.lts.example.support.MasterChangeListenerImpl;
+import com.lts.jobclient.JobClient;
+import com.lts.jobclient.RetryJobClient;
+import com.lts.jobclient.domain.Response;
+
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * @author Robert HG (254963746@qq.com) on 8/13/15.
+ */
+public class JobClientMain {
+
+    public static void main(String[] args) {
+        // 推荐使用RetryJobClient
+        final JobClient jobClient = new RetryJobClient();
+        jobClient.setNodeGroup("test_jobClient");
+        jobClient.setClusterName("test_cluster");
+        jobClient.setRegistryAddress("zookeeper://127.0.0.1:2181");
+        // jobClient.setRegistryAddress("redis://127.0.0.1:6379");
+        // 任务重试保存地址,默认用户目录下
+        // jobClient.setFailStorePath(Constants.USER_HOME);
+        // 任务完成反馈接口
+        jobClient.setJobFinishedHandler(new JobFinishedHandlerImpl());
+        // master 节点变化监听器,当有集群中只需要一个节点执行某个事情的时候,可以监听这个事件
+        jobClient.addMasterChangeListener(new MasterChangeListenerImpl());
+        // 可选址  leveldb(默认), rocksdb, bekeleydb
+        // taskTracker.addConfig("job.fail.store", "leveldb");
+        jobClient.addConfig("job.submit.concurrency.size", "100");
+        jobClient.start();
+
+        try {
+            // 休息1s 等待 连上JobTracker
+            Thread.sleep(1000);
+        } catch (InterruptedException e) {
+            e.printStackTrace();
+        }
+
+        final AtomicLong num = new AtomicLong();
+        System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
+
+        // 假设分了 20 个 partition
+
+        final int partition = 100;
+
+        for (int i = 0; i < 100; i++) {
+
+            new Thread(new Runnable() {
+                @Override
+                public void run() {
+                    while (true) {
+                        Job job = new Job();
+                        job.setTaskId(StringUtils.generateUUID());
+                        job.setTaskTrackerNodeGroup("test_trade_TaskTracker_" + (num.get() % partition));
+                        job.setParam("shopId", "111");
+                        job.setNeedFeedback(false);
+
+                        Response response = jobClient.submitJob(job);
+                        if (!response.isSuccess()) {
+                            System.out.println(response.getMsg());
+                        } else {
+                            num.incrementAndGet();
+                        }
+                    }
+                }
+            }).start();
+        }
+
+        new Thread(new Runnable() {
+            @Override
+            public void run() {
+                while(true){
+                    try {
+                        Thread.sleep(5000);
+                    } catch (InterruptedException e) {
+                        e.printStackTrace();
+                    }
+                    System.out.println(num.get());
+                }
+            }
+        }).start();
+    }
+
+}

+ 65 - 0
lts-example/src/main/java/com/lts/example/benchmark/JobTrackerMain.java

@@ -0,0 +1,65 @@
+package com.lts.example.benchmark;
+
+import com.lts.example.support.MasterChangeListenerImpl;
+import com.lts.jobtracker.JobTracker;
+import com.lts.jobtracker.support.policy.OldDataDeletePolicy;
+
+/**
+ * @author Robert HG (254963746@qq.com) on 8/13/15.
+ */
+public class JobTrackerMain {
+
+    public static void main(String[] args) {
+
+//        final JobTracker jobTracker = new JobTracker();
+//        // 节点信息配置
+//        jobTracker.setRegistryAddress("zookeeper://127.0.0.1:2181");
+////        jobTracker.setRegistryAddress("redis://127.0.0.1:6379");
+//        jobTracker.setListenPort(35001); // 默认 35001
+//        jobTracker.setClusterName("test_cluster");
+//
+//        jobTracker.addMasterChangeListener(new MasterChangeListenerImpl());
+//
+//        // 设置业务日志记录 mysql
+//        jobTracker.addConfig("job.logger", "mysql");
+//        // 任务队列用mysql
+//        jobTracker.addConfig("job.queue", "mysql");
+//        // mysql 配置
+//         jobTracker.addConfig("jdbc.url", "jdbc:mysql://127.0.0.1:3306/lts");
+//         jobTracker.addConfig("jdbc.username", "root");
+//         jobTracker.addConfig("jdbc.password", "root");
+//
+//        jobTracker.setOldDataHandler(new OldDataDeletePolicy());
+//        // 设置 zk 客户端用哪个, 可选 zkclient, curator 默认是 zkclient
+////        jobTracker.addConfig("zk.client", "zkclient");
+//        // 启动节点
+//        jobTracker.start();
+
+        final JobTracker jobTracker = new JobTracker();
+        // 节点信息配置
+        jobTracker.setRegistryAddress("zookeeper://127.0.0.1:2181");
+        jobTracker.setClusterName("test_cluster");   // 三种节点都要保持一致
+//        jobTracker.setListenPort(50001);
+        // master 节点变化监听器,当有集群中只需要一个节点执行某个事情的时候,可以监听这个事件
+        jobTracker.addMasterChangeListener(new MasterChangeListenerImpl());
+        // 设置业务日志记录, 可选值: mongo, mysql , console
+        jobTracker.addConfig("job.logger", "mongo");
+        // 任务队列用mongo
+        jobTracker.addConfig("job.queue", "mongo");
+        // mongo 配置
+        jobTracker.addConfig("mongo.addresses", "127.0.0.1:27017");     // 多个地址用逗号分割
+        jobTracker.addConfig("mongo.database", "lts2");
+        // 这个是对于 返回给客户端 任务的 老数据删除策略
+        jobTracker.setOldDataHandler(new OldDataDeletePolicy());
+        // 设置 zk 客户端用哪个, 可选 zkclient(默认), curator
+        jobTracker.addConfig("zk.client", "zkclient");
+        // 启动节点
+        jobTracker.start();
+        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
+            @Override
+            public void run() {
+                jobTracker.stop();
+            }
+        }));
+    }
+}

+ 38 - 0
lts-example/src/main/java/com/lts/example/benchmark/TaskTrackerMain.java

@@ -0,0 +1,38 @@
+package com.lts.example.benchmark;
+
+import com.lts.example.support.MasterChangeListenerImpl;
+import com.lts.example.support.NoopJobRunner;
+import com.lts.tasktracker.TaskTracker;
+
+/**
+ * @author Robert HG (254963746@qq.com) on 8/13/15.
+ */
+public class TaskTrackerMain {
+
+    public static void main(String[] args) {
+        for (int i = 0; i < 10; i++) {
+            startTaskTracker(i);
+        }
+    }
+
+    private static void startTaskTracker(int index) {
+        final TaskTracker taskTracker = new TaskTracker();
+        // 任务执行类,实现JobRunner 接口
+        taskTracker.setJobRunnerClass(NoopJobRunner.class);
+        taskTracker.setRegistryAddress("zookeeper://127.0.0.1:2181");
+        // taskTracker.setRegistryAddress("redis://127.0.0.1:6379");
+        taskTracker.setNodeGroup("test_trade_TaskTracker_" + index); // 同一个TaskTracker集群这个名字相同
+        taskTracker.setClusterName("test_cluster");
+        taskTracker.setWorkThreads(10);
+        // 反馈任务给JobTracker失败,存储本地文件路径
+        // taskTracker.setFailStorePath(Constants.USER_HOME);
+        // master 节点变化监听器,当有集群中只需要一个节点执行某个事情的时候,可以监听这个事件
+        taskTracker.addMasterChangeListener(new MasterChangeListenerImpl());
+        // 业务日志级别
+        // taskTracker.setBizLoggerLevel(Level.INFO);
+        // 可选址  leveldb(默认), rocksdb, bekeleydb
+        // taskTracker.addConfig("job.fail.store", "leveldb");
+        taskTracker.start();
+    }
+
+}

+ 21 - 0
lts-example/src/main/java/com/lts/example/support/NoopJobRunner.java

@@ -0,0 +1,21 @@
+package com.lts.example.support;
+
+import com.lts.core.domain.Action;
+import com.lts.core.domain.Job;
+import com.lts.tasktracker.Result;
+import com.lts.tasktracker.runner.JobRunner;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * @author Robert HG (254963746@qq.com) on 8/13/15.
+ */
+public class NoopJobRunner implements JobRunner {
+
+    static AtomicInteger num = new AtomicInteger(0);
+    @Override
+    public Result run(Job job) throws Throwable {
+        System.out.println(num.incrementAndGet());
+        return new Result(Action.EXECUTE_SUCCESS);
+    }
+}

+ 2 - 2
lts-example/src/main/resources/log4j.properties

@@ -1,7 +1,7 @@
 
-log4j.rootLogger=INFO,stdout
+log4j.rootLogger=WARN,stdout
 
-log4j.appender.stdout.Threshold=INFO
+log4j.appender.stdout.Threshold=WARN
 log4j.appender.stdout=org.apache.log4j.ConsoleAppender
 log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
 log4j.appender.stdout.layout.ConversionPattern=[%-5p] [%d{HH:mm:ss}] %c - %m%n

+ 3 - 1
lts-jobtracker/src/main/java/com/lts/jobtracker/JobTracker.java

@@ -28,6 +28,8 @@ public class JobTracker extends AbstractServerNode<JobTrackerNode, JobTrackerApp
     private JobFeedbackQueueFactory jobFeedbackQueueFactory = ExtensionLoader.getExtensionLoader(JobFeedbackQueueFactory.class).getAdaptiveExtension();
     private NodeGroupStoreFactory nodeGroupStoreFactory = ExtensionLoader.getExtensionLoader(NodeGroupStoreFactory.class).getAdaptiveExtension();
 
+    private PreLoaderFactory preLoaderFactory = ExtensionLoader.getExtensionLoader(PreLoaderFactory.class).getAdaptiveExtension();
+
     public JobTracker() {
         // 添加节点变化监听器
         addNodeChangeListener(new JobNodeChangeListener(application));
@@ -50,7 +52,7 @@ public class JobTracker extends AbstractServerNode<JobTrackerNode, JobTrackerApp
         application.setCronJobQueue(cronJobQueueFactory.getQueue(config));
         application.setJobFeedbackQueue(jobFeedbackQueueFactory.getQueue(config));
         application.setNodeGroupStore(nodeGroupStoreFactory.getStore(config));
-
+        application.setPreLoader(preLoaderFactory.getPreLoader(config, application));
         application.getChannelManager().start();
     }
 

+ 10 - 0
lts-jobtracker/src/main/java/com/lts/jobtracker/domain/JobTrackerApplication.java

@@ -46,6 +46,16 @@ public class JobTrackerApplication extends Application {
     // job id generator
     private IdGenerator idGenerator;
 
+    private PreLoader preLoader;
+
+    public PreLoader getPreLoader() {
+        return preLoader;
+    }
+
+    public void setPreLoader(PreLoader preLoader) {
+        this.preLoader = preLoader;
+    }
+
     public JobLogger getJobLogger() {
         return jobLogger;
     }

+ 1 - 1
lts-jobtracker/src/main/java/com/lts/jobtracker/processor/JobFinishedProcessor.java

@@ -235,7 +235,7 @@ public class JobFinishedProcessor extends AbstractProcessor {
      */
     private JobPushRequest getNewJob(String taskTrackerNodeGroup, String taskTrackerIdentity) {
 
-        JobPo jobPo = application.getExecutableJobQueue().take(taskTrackerNodeGroup, taskTrackerIdentity);
+        JobPo jobPo = application.getPreLoader().take(taskTrackerNodeGroup, taskTrackerIdentity);
         if (jobPo == null) {
             return null;
         }

+ 1 - 8
lts-jobtracker/src/main/java/com/lts/jobtracker/support/JobPusher.java

@@ -7,7 +7,6 @@ import com.lts.core.constant.Constants;
 import com.lts.core.constant.Level;
 import com.lts.core.exception.RemotingSendException;
 import com.lts.core.exception.RequestTimeoutException;
-import com.lts.core.extension.ExtensionLoader;
 import com.lts.core.factory.NamedThreadFactory;
 import com.lts.core.logger.Logger;
 import com.lts.core.logger.LoggerFactory;
@@ -18,8 +17,6 @@ import com.lts.core.remoting.RemotingServerDelegate;
 import com.lts.core.support.SystemClock;
 import com.lts.jobtracker.domain.JobTrackerApplication;
 import com.lts.jobtracker.domain.TaskTrackerNode;
-import com.lts.queue.PreLoader;
-import com.lts.queue.PreLoaderFactory;
 import com.lts.queue.domain.JobPo;
 import com.lts.queue.exception.DuplicateJobException;
 import com.lts.remoting.InvokeCallback;
@@ -40,15 +37,11 @@ public class JobPusher {
     private final Logger LOGGER = LoggerFactory.getLogger(JobPusher.class);
     private JobTrackerApplication application;
     private final ExecutorService executorService;
-    private PreLoader preLoader;
-
 
     public JobPusher(JobTrackerApplication application) {
         this.application = application;
         this.executorService = Executors.newFixedThreadPool(Constants.AVAILABLE_PROCESSOR * 5,
                 new NamedThreadFactory(JobPusher.class.getSimpleName()));
-
-        preLoader = ExtensionLoader.getExtensionLoader(PreLoaderFactory.class).getAdaptiveExtension().getPreLoader(application.getConfig(), application);
     }
 
     public void push(final RemotingServerDelegate remotingServer, final JobPullRequest request) {
@@ -115,7 +108,7 @@ public class JobPusher {
         final String identity = taskTrackerNode.getIdentity();
 
         // 从mongo 中取一个可运行的job
-        final JobPo jobPo = preLoader.take(nodeGroup, identity);
+        final JobPo jobPo = application.getPreLoader().take(nodeGroup, identity);
         if (jobPo == null) {
             if (LOGGER.isDebugEnabled()) {
                 LOGGER.debug("Job push failed: no job! nodeGroup=" + nodeGroup + ", identity=" + identity);

+ 0 - 5
lts-queue/lts-queue-api/src/main/java/com/lts/queue/ExecutableJobQueue.java

@@ -21,11 +21,6 @@ public interface ExecutableJobQueue extends JobQueue{
      */
     boolean add(JobPo jobPo);
 
-    /**
-     * 从队列中取一个元素,并锁住这个元素
-     */
-    JobPo take(String taskTrackerNodeGroup, String taskTrackerIdentity);
-
     /**
      * 出队列
      */

+ 0 - 62
lts-queue/lts-queue-mongo/src/main/java/com/lts/queue/mongo/MongoExecutableJobQueue.java

@@ -4,7 +4,6 @@ import com.lts.core.cluster.Config;
 import com.lts.core.commons.collect.ConcurrentHashSet;
 import com.lts.core.commons.utils.CollectionUtils;
 import com.lts.core.commons.utils.StringUtils;
-import com.lts.core.constant.Constants;
 import com.lts.core.logger.Logger;
 import com.lts.core.logger.LoggerFactory;
 import com.lts.core.support.JobQueueUtils;
@@ -19,11 +18,8 @@ import com.mongodb.DuplicateKeyException;
 import com.mongodb.WriteResult;
 import org.mongodb.morphia.query.Query;
 import org.mongodb.morphia.query.UpdateOperations;
-import org.mongodb.morphia.query.UpdateResults;
 
 import java.util.List;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeUnit;
 
 /**
  * @author Robert HG (254963746@qq.com) on 5/28/15.
@@ -32,18 +28,8 @@ public class MongoExecutableJobQueue extends AbstractMongoJobQueue implements Ex
 
     private static final Logger LOGGER = LoggerFactory.getLogger(MongoExecutableJobQueue.class);
 
-    // 这里做一下流控
-    private Semaphore semaphore;
-    private long acquireTimeout = 2000; // 2s
-
     public MongoExecutableJobQueue(Config config) {
         super(config);
-        int permits = config.getParameter(Constants.JOB_TAKE_PARALLEL_SIZE, 10000);
-        if (permits <= 10) {
-            permits = Constants.DEFAULT_JOB_TAKE_PARALLEL_SIZE;
-        }
-        this.acquireTimeout = config.getParameter(Constants.JOB_TAKE_ACQUIRE_TIMEOUT, acquireTimeout);
-        this.semaphore = new Semaphore(permits);
     }
 
     @Override
@@ -92,54 +78,6 @@ public class MongoExecutableJobQueue extends AbstractMongoJobQueue implements Ex
         return true;
     }
 
-    @Override
-    public JobPo take(String taskTrackerNodeGroup, String taskTrackerIdentity) {
-        boolean acquire = false;
-        try {
-            acquire = semaphore.tryAcquire(acquireTimeout, TimeUnit.MILLISECONDS);
-            if (!acquire) {
-                // 直接返回null
-                return null;
-            }
-        } catch (InterruptedException e) {
-            LOGGER.warn("Try to take job failed.", e);
-        }
-        try {
-            while (true) {
-                String tableName = JobQueueUtils.getExecutableQueueName(taskTrackerNodeGroup);
-                Query<JobPo> query = template.createQuery(tableName, JobPo.class);
-                query.field("isRunning").equal(false)
-                        .filter("triggerTime < ", SystemClock.now())
-                        .order(" triggerTime, priority , gmtCreated");
-
-                JobPo jobPo = query.get();
-                if (jobPo == null) {
-                    return null;
-                }
-                UpdateOperations<JobPo> operations =
-                        template.createUpdateOperations(JobPo.class)
-                                .set("isRunning", true)
-                                .set("taskTrackerIdentity", taskTrackerIdentity)
-                                .set("gmtModified", SystemClock.now());
-                Query<JobPo> updateQuery = template.createQuery(tableName, JobPo.class);
-                updateQuery.field("jobId").equal(jobPo.getJobId())
-                        .field("isRunning").equal(false);
-                UpdateResults updateResult = template.update(updateQuery, operations);
-                if (updateResult.getUpdatedCount() == 1) {
-                    return jobPo;
-                }
-                try {
-                    Thread.sleep(10);
-                } catch (InterruptedException ignored) {
-                }
-            }
-        } finally {
-            if (acquire) {
-                semaphore.release();
-            }
-        }
-    }
-
     @Override
     public boolean remove(String taskTrackerNodeGroup, String jobId) {
         String tableName = JobQueueUtils.getExecutableQueueName(taskTrackerNodeGroup);

+ 0 - 80
lts-queue/lts-queue-mysql/src/main/java/com/lts/queue/mysql/MysqlExecutableJobQueue.java

@@ -27,22 +27,11 @@ import java.util.concurrent.TimeUnit;
  */
 public class MysqlExecutableJobQueue extends AbstractMysqlJobQueue implements ExecutableJobQueue {
 
-    private static final Logger LOGGER = LoggerFactory.getLogger(MysqlExecutableJobQueue.class);
-
-    // 这里做一下流控, 尽量减小 mysql dead lock 的概率
-    private Semaphore semaphore;
-    private long acquireTimeout = 2000; // 2s
     // 用来缓存SQL,不用每次去生成,可以重用
     private final ConcurrentHashMap<String, String> SQL_CACHE_MAP = new ConcurrentHashMap<String, String>();
 
     public MysqlExecutableJobQueue(Config config) {
         super(config);
-        int permits = config.getParameter(Constants.JOB_TAKE_PARALLEL_SIZE, Constants.DEFAULT_JOB_TAKE_PARALLEL_SIZE);
-        if (permits <= 10) {
-            permits = Constants.DEFAULT_JOB_TAKE_PARALLEL_SIZE;
-        }
-        this.acquireTimeout = config.getParameter(Constants.JOB_TAKE_ACQUIRE_TIMEOUT, acquireTimeout);
-        this.semaphore = new Semaphore(permits);
     }
 
     @Override
@@ -96,75 +85,6 @@ public class MysqlExecutableJobQueue extends AbstractMysqlJobQueue implements Ex
         return true;
     }
 
-    private String takeSelectSQL = "SELECT *" +
-            " FROM `{tableName}` " +
-            " WHERE is_running = ? " +
-            " AND `trigger_time` < ? " +
-            " ORDER BY `trigger_time` ASC, `priority` ASC, `gmt_created` ASC " +
-            " LIMIT 0, 1";
-
-    private String taskUpdateSQL = "UPDATE `{tableName}` SET " +
-            "`is_running` = ?, " +
-            "`task_tracker_identity` = ?, " +
-            "`gmt_modified` = ?" +
-            " WHERE job_id = ? AND is_running = ?";
-
-    @Override
-    public JobPo take(final String taskTrackerNodeGroup, final String taskTrackerIdentity) {
-        boolean acquire = false;
-        try {
-            acquire = semaphore.tryAcquire(acquireTimeout, TimeUnit.MILLISECONDS);
-            if (!acquire) {
-                // 直接返回null
-                return null;
-            }
-        } catch (InterruptedException e) {
-            LOGGER.warn("Try to take job failed.", e);
-        }
-        try {
-            /**
-             * 这里从SELECT FOR UPDATE 优化为 CAS 乐观锁
-             */
-            Long now = SystemClock.now();
-            Object[] selectParams = new Object[]{false, now};
-
-            JobPo jobPo = getSqlTemplate().query(getRealSql(takeSelectSQL, taskTrackerNodeGroup),
-                    ResultSetHandlerHolder.JOB_PO_RESULT_SET_HANDLER, selectParams);
-            if (jobPo == null) {
-                return null;
-            }
-
-            Object[] params = new Object[]{
-                    true, taskTrackerIdentity, now, jobPo.getJobId(), false
-            };
-            // 返回影响的行数
-            int affectedRow = 0;
-            try {
-                affectedRow = getSqlTemplate().update(getRealSql(taskUpdateSQL, taskTrackerNodeGroup), params);
-            } catch (SQLException e) {
-                //  dead lock ignore
-                if (e.getMessage().contains("Deadlock found when trying to get lock")) {
-                    return null;
-                }
-                throw e;
-            }
-            if (affectedRow == 0) {
-                return take(taskTrackerNodeGroup, taskTrackerIdentity);
-            } else {
-                jobPo.setIsRunning(true);
-                jobPo.setTaskTrackerIdentity(taskTrackerIdentity);
-                jobPo.setGmtModified(now);
-                return jobPo;
-            }
-        } catch (SQLException e) {
-            throw new JobQueueException(e);
-        } finally {
-            if(acquire){
-                semaphore.release();
-            }
-        }
-    }
-
     private String removeSQL = "DELETE FROM `{tableName}` WHERE job_id = ?";
 
     @Override