Просмотр исходного кода

增加有中断接口的 InterruptibleJobRunner

hugui 9 лет назад
Родитель
Сommit
5ebf955a7a

+ 1 - 1
lts-core/src/main/java/com/lts/core/cmd/HttpCmdServer.java

@@ -37,7 +37,7 @@ public class HttpCmdServer {
                 LOGGER.info("Start succeed at port {}", port);
             }
         } catch (Exception t) {
-            LOGGER.error("Start error at port {} , use lts.command.port config change the port.", port, t);
+            LOGGER.error("Start error at port {} , use [lts.command.port] config change the port.", port, t);
             throw new HttpCmdException(t);
         }
     }

+ 5 - 0
lts-tasktracker/pom.xml

@@ -17,6 +17,11 @@
             <artifactId>lts-core</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
 

+ 12 - 0
lts-tasktracker/src/main/java/com/lts/tasktracker/logger/BizLoggerAdapter.java

@@ -0,0 +1,12 @@
+package com.lts.tasktracker.logger;
+
+/**
+ * @author Robert HG (254963746@qq.com) on 2/21/16.
+ */
+public abstract class BizLoggerAdapter implements BizLogger {
+
+    public abstract void setId(String jobId, String taskId);
+
+    public abstract void removeId();
+
+}

+ 1 - 1
lts-tasktracker/src/main/java/com/lts/tasktracker/logger/BizLoggerImpl.java

@@ -27,7 +27,7 @@ import java.util.List;
  *
  * @author Robert HG (254963746@qq.com) on 3/27/15.
  */
-public class BizLoggerImpl implements BizLogger {
+public class BizLoggerImpl extends BizLoggerAdapter implements BizLogger {
 
     private Level level;
     private RemotingClientDelegate remotingClient;

+ 10 - 1
lts-tasktracker/src/main/java/com/lts/tasktracker/logger/MockBizLogger.java

@@ -7,7 +7,7 @@ import com.lts.core.logger.LoggerFactory;
 /**
  * @author Robert HG (254963746@qq.com) on 9/12/15.
  */
-public class MockBizLogger implements BizLogger {
+public class MockBizLogger extends BizLoggerAdapter implements BizLogger {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(MockBizLogger.class);
     private Level level;
@@ -40,4 +40,13 @@ public class MockBizLogger implements BizLogger {
         }
     }
 
+    @Override
+    public void setId(String jobId, String taskId) {
+
+    }
+
+    @Override
+    public void removeId() {
+
+    }
 }

+ 13 - 0
lts-tasktracker/src/main/java/com/lts/tasktracker/runner/InterruptibleJobRunner.java

@@ -0,0 +1,13 @@
+package com.lts.tasktracker.runner;
+
+/**
+ * 实现这个类可以自定义在中断时候的操作
+ * @author Robert HG (254963746@qq.com) on 2/21/16.
+ */
+public interface InterruptibleJobRunner extends JobRunner {
+
+    /**
+     * 当任务被cancel(中断)的时候,调用这个
+     */
+    void interrupt();
+}

+ 35 - 4
lts-tasktracker/src/main/java/com/lts/tasktracker/runner/JobRunnerDelegate.java

@@ -9,9 +9,12 @@ import com.lts.core.support.SystemClock;
 import com.lts.tasktracker.Result;
 import com.lts.tasktracker.domain.Response;
 import com.lts.tasktracker.domain.TaskTrackerAppContext;
+import com.lts.tasktracker.logger.BizLogger;
+import com.lts.tasktracker.logger.BizLoggerAdapter;
 import com.lts.tasktracker.logger.BizLoggerFactory;
 import com.lts.tasktracker.logger.BizLoggerImpl;
 import com.lts.tasktracker.monitor.TaskTrackerMonitor;
+import sun.nio.ch.Interruptible;
 
 import java.io.PrintWriter;
 import java.io.StringWriter;
@@ -29,24 +32,38 @@ public class JobRunnerDelegate implements Runnable {
     private static final Logger LOGGER = LoggerFactory.getLogger(LoggerName.TaskTracker);
     private JobWrapper jobWrapper;
     private RunnerCallback callback;
-    private BizLoggerImpl logger;
+    private BizLoggerAdapter logger;
     private TaskTrackerAppContext appContext;
     private TaskTrackerMonitor monitor;
+    private Interruptible interruptor;
+    private JobRunner curJobRunner;
 
     public JobRunnerDelegate(TaskTrackerAppContext appContext,
                              JobWrapper jobWrapper, RunnerCallback callback) {
         this.jobWrapper = jobWrapper;
         this.callback = callback;
         this.appContext = appContext;
-        this.logger = (BizLoggerImpl) BizLoggerFactory.getLogger(
+        this.logger =  (BizLoggerAdapter)BizLoggerFactory.getLogger(
                 appContext.getBizLogLevel(),
                 appContext.getRemotingClient(), appContext);
         monitor = (TaskTrackerMonitor) appContext.getMonitor();
+        this.interruptor = new Interruptible() {
+            @Override
+            public void interrupt() {
+                JobRunnerDelegate.this.interrupt();
+            }
+        };
     }
 
     @Override
     public void run() {
         try {
+
+            blockedOn(interruptor);
+            if (Thread.currentThread().isInterrupted()) {
+                interruptor.interrupt();
+            }
+
             LtsLoggerFactory.setLogger(logger);
 
             while (jobWrapper != null) {
@@ -58,8 +75,10 @@ public class JobRunnerDelegate implements Runnable {
                 try {
                     appContext.getRunnerPool().getRunningJobManager()
                             .in(jobWrapper.getJobId());
-                    Result result = appContext.getRunnerPool().getRunnerFactory()
-                            .newRunner().run(jobWrapper.getJob());
+
+                    this.curJobRunner = appContext.getRunnerPool().getRunnerFactory().newRunner();
+                    Result result = curJobRunner.run(jobWrapper.getJob());
+
                     if (result == null) {
                         response.setAction(Action.EXECUTE_SUCCESS);
                     } else {
@@ -70,6 +89,7 @@ public class JobRunnerDelegate implements Runnable {
                         }
                         response.setMsg(result.getMsg());
                     }
+
                     long time = SystemClock.now() - startTime;
                     monitor.addRunningTime(time);
                     LOGGER.info("Job execute completed : {}, time:{} ms.", jobWrapper, time);
@@ -97,6 +117,13 @@ public class JobRunnerDelegate implements Runnable {
             }
         } finally {
             LtsLoggerFactory.remove();
+            blockedOn(null);
+        }
+    }
+
+    private void interrupt() {
+        if (this.curJobRunner != null && this.curJobRunner instanceof InterruptibleJobRunner) {
+            ((InterruptibleJobRunner) this.curJobRunner).interrupt();
         }
     }
 
@@ -120,4 +147,8 @@ public class JobRunnerDelegate implements Runnable {
         }
     }
 
+    private static void blockedOn(Interruptible interruptible) {
+        sun.misc.SharedSecrets.getJavaLangAccess().blockedOn(Thread.currentThread(), interruptible);
+    }
+
 }

+ 2 - 0
lts-tasktracker/src/main/java/com/lts/tasktracker/runner/RunnerPool.java

@@ -10,6 +10,7 @@ import com.lts.ec.EventSubscriber;
 import com.lts.ec.Observer;
 import com.lts.tasktracker.domain.TaskTrackerAppContext;
 import com.lts.tasktracker.expcetion.NoAvailableJobRunnerException;
+import sun.nio.ch.Interruptible;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -112,6 +113,7 @@ public class RunnerPool {
      * 它试图终止线程的方法是通过调用Thread.interrupt()方法来实现的,但是大家知道,这种方法的作用有限,
      * 如果线程中没有sleep 、wait、Condition、定时锁等应用, interrupt()方法是无法中断当前的线程的。
      * 所以,ShutdownNow()并不代表线程池就一定立即就能退出,它可能必须要等待所有正在执行的任务都执行完成了才能退出。
+     * 特殊的时候可以通过使用{@link InterruptibleJobRunner}来解决
      */
     public void stopWorking() {
         try {

+ 98 - 0
lts-tasktracker/src/test/java/com/lts/tasktracker/interrupter/InterruptRead.java

@@ -0,0 +1,98 @@
+package com.lts.tasktracker.interrupter;
+
+import sun.nio.ch.Interruptible;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+
+abstract class InterruptSupport {
+    private volatile boolean interrupted = false;
+    private Interruptible interruptor = new Interruptible() {
+        public void interrupt() {
+            interrupted = true;
+            InterruptSupport.this.interrupt(); // 位置3
+        }
+    };
+
+    public final boolean execute() throws InterruptedException {
+        try {
+            blockedOn(interruptor); // 位置1
+            System.out.println("=======1");
+            if (Thread.currentThread().isInterrupted()) { // 立马被interrupted
+                interruptor.interrupt();
+                System.out.println("=======2");
+            }
+            // 执行业务代码
+            bussiness();
+            System.out.println("=======3");
+        } finally {
+            blockedOn(null); // 位置2
+            System.out.println("=======4");
+        }
+        return interrupted;
+    }
+
+    public abstract void bussiness();
+
+    public abstract void interrupt();
+
+    // -- sun.misc.SharedSecrets --
+    static void blockedOn(Interruptible intr) { // package-private
+        sun.misc.SharedSecrets.getJavaLangAccess().blockedOn(Thread.currentThread(), intr);
+    }
+}
+
+public class InterruptRead extends InterruptSupport {
+    private FileInputStream in;
+
+    @Override
+    public void bussiness() {
+        File file = new File("/dev/urandom"); // 读取linux黑洞,永远读不完
+        try {
+            in = new FileInputStream(file);
+            byte[] bytes = new byte[1024];
+            while (in.read(bytes, 0, 1024) > 0) {
+                //
+            }
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    public FileInputStream getIn() {
+        return in;
+    }
+
+    @Override
+    public void interrupt() {
+        try {
+            in.getChannel().close();
+            System.out.println("=======6");
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+    }
+
+    public static void main(String args[]) throws Exception {
+        final InterruptRead test = new InterruptRead();
+        Thread t = new Thread() {
+            @Override
+            public void run() {
+                long start = System.currentTimeMillis();
+                try {
+                    System.out.println("InterruptRead start!");
+                    test.execute();
+                } catch (InterruptedException e) {
+                    System.out.println("InterruptRead end! cost time : " + (System.currentTimeMillis() - start));
+                    e.printStackTrace();
+                }
+            }
+        };
+        t.start();
+        // 先让Read执行3秒
+        Thread.sleep(3000);
+        // 发出interrupt中断
+        t.interrupt();
+    }
+}

+ 21 - 0
lts-tasktracker/src/test/java/com/lts/tasktracker/runner/NormalJobRunner.java

@@ -0,0 +1,21 @@
+package com.lts.tasktracker.runner;
+
+import com.lts.core.domain.Job;
+import com.lts.core.json.JSON;
+import com.lts.tasktracker.Result;
+
+/**
+ * @author Robert HG (254963746@qq.com) on 2/21/16.
+ */
+public class NormalJobRunner implements JobRunner {
+    boolean stop = false;
+    @Override
+    public Result run(Job job) throws Throwable {
+        System.out.println("我开始执行:" + JSON.toJSONString(job));
+        while (!stop) {
+            int i = 1;
+        }
+        System.out.println("我退出了");
+        return null;
+    }
+}

+ 71 - 0
lts-tasktracker/src/test/java/com/lts/tasktracker/runner/RunnerPoolTest.java

@@ -0,0 +1,71 @@
+package com.lts.tasktracker.runner;
+
+import com.lts.core.cluster.Config;
+import com.lts.core.cluster.LTSConfig;
+import com.lts.core.constant.Environment;
+import com.lts.core.domain.Job;
+import com.lts.core.domain.JobWrapper;
+import com.lts.core.json.JSON;
+import com.lts.ec.injvm.InjvmEventCenter;
+import com.lts.tasktracker.domain.Response;
+import com.lts.tasktracker.domain.TaskTrackerAppContext;
+import com.lts.tasktracker.expcetion.NoAvailableJobRunnerException;
+import com.lts.tasktracker.monitor.TaskTrackerMonitor;
+import org.junit.Test;
+
+/**
+ * @author Robert HG (254963746@qq.com) on 2/21/16.
+ */
+public class RunnerPoolTest {
+
+    @Test
+    public void testInterruptor() throws NoAvailableJobRunnerException {
+
+        LTSConfig.setEnvironment(Environment.UNIT_TEST);
+
+        Config config = new Config();
+        config.setWorkThreads(10);
+        config.setIdentity("fjdaslfjlasj");
+
+        TaskTrackerAppContext appContext = new TaskTrackerAppContext();
+        appContext.setConfig(config);
+        appContext.setEventCenter(new InjvmEventCenter());
+//        appContext.setJobRunnerClass(TestInterruptorJobRunner.class);
+        appContext.setJobRunnerClass(NormalJobRunner.class);
+
+        RunnerPool runnerPool = new RunnerPool(appContext);
+
+        appContext.setRunnerPool(runnerPool);
+
+        TaskTrackerMonitor monitor = new TaskTrackerMonitor(appContext);
+        appContext.setMonitor(monitor);
+
+        RunnerCallback callback = new RunnerCallback(){
+
+            @Override
+            public JobWrapper runComplete(Response response) {
+                System.out.println("complete:" + JSON.toJSONString(response));
+                return null;
+            }
+        };
+
+        Job job = new Job();
+        job.setTaskId("fdsafas");
+
+        JobWrapper jobWrapper = new JobWrapper();
+        jobWrapper.setJobId("111111");
+        jobWrapper.setJob(job);
+
+        runnerPool.execute(jobWrapper, callback);
+
+
+        try {
+            Thread.sleep(5000L);
+        } catch (InterruptedException e) {
+            e.printStackTrace();
+        }
+        // 5s之后停止
+        runnerPool.stopWorking();
+    }
+
+}

+ 29 - 0
lts-tasktracker/src/test/java/com/lts/tasktracker/runner/TestInterruptorJobRunner.java

@@ -0,0 +1,29 @@
+package com.lts.tasktracker.runner;
+
+import com.lts.core.domain.Job;
+import com.lts.core.json.JSON;
+import com.lts.tasktracker.Result;
+
+/**
+ * @author Robert HG (254963746@qq.com) on 2/21/16.
+ */
+public class TestInterruptorJobRunner implements InterruptibleJobRunner {
+
+    private boolean stop = false;
+
+    @Override
+    public void interrupt() {
+        System.out.println("我设置停止标识");
+        stop = true;
+    }
+
+    @Override
+    public Result run(Job job) throws Throwable {
+        System.out.println("我开始执行:" + JSON.toJSONString(job));
+        while (!stop) {
+            int i = 1;
+        }
+        System.out.println("我退出了");
+        return null;
+    }
+}