Home
胡贵 edited this page 9 years ago

名称简写说明: JobTracker 简称 jt, TaskTracker检查 tt, JobClient检查 jc

####JobTracker启动 项目需要引入 lts-job-tracker.jar, 或者 使用maven构建,将lts所有的jar包上传到本地的maven仓库中。后面我会将这个上传到maven中央仓库中。

final JobTracker jobTracker = new JobTracker();
        // 节点信息配置
        jobTracker.setRegistryAddress("zookeeper://127.0.0.1:2181");  // zookeeper 作为注册中心
//        jobTracker.setRegistryAddress("redis://127.0.0.1:6379");    // redis 做注册中心
//        jobTracker.setListenPort(35002); // 默认 35001
        jobTracker.setClusterName("test_cluster");                    // 集群名称(jt,tt,jc 的集群名称必须一样,才会是一个集群)

        jobTracker.addMasterChangeListener(new MasterChangeListenerImpl());   // jt master 节点监听器

        // 设置业务日志记录
        jobTracker.addConfig("job.logger", "mongo");
//        jobTracker.addConfig("job.logger", "mysql");
        // 任务队列用mongo
        jobTracker.addConfig("job.queue", "mongo");
        // mongo 配置
        jobTracker.addConfig("mongo.addresses", "127.0.0.1:27017");     // 多个地址用逗号分割
        jobTracker.addConfig("mongo.database", "lts");

        jobTracker.setOldDataHandler(new OldDataDeletePolicy());
        // 设置 zk 客户端用哪个, 可选 zkclient, curator 默认是 zkclient
//        jobTracker.addConfig("zk.client", "zkclient");
        // 启动节点
        jobTracker.start();

        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
            @Override
            public void run() {
                jobTracker.stop();
            }
        }));

####TaskTracker启动

final TaskTracker taskTracker = new TaskTracker();
        taskTracker.setJobRunnerClass(TestJobRunner.class);
        taskTracker.setRegistryAddress("zookeeper://127.0.0.1:2181");
//        taskTracker.setRegistryAddress("redis://127.0.0.1:6379");
        taskTracker.setNodeGroup("test_trade_TaskTracker");
        taskTracker.setClusterName("test_cluster");
        taskTracker.setWorkThreads(20);                  // 工作线程
//        taskTracker.setFailStorePath(Constants.USER_HOME);
        taskTracker.addMasterChangeListener(new MasterChangeListenerImpl());
//        taskTracker.setBizLoggerLevel(Level.INFO);        // 业务日志级别
        taskTracker.start();

        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
            @Override
            public void run() {
                taskTracker.stop();
            }
        }));
public class TestJobRunner implements JobRunner {

    @Override
    public void run(Job job) throws Throwable {

        System.out.println(
                new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())
                        + " 我要执行:" + job + "shopId=" + job.getParam("shopId"));

        BizLogger bizLogger = LtsLoggerFactory.getBizLogger();
        // 会发送到 LTS (JobTracker上)
        bizLogger.info("测试,业务日志啊啊啊啊啊");
        // 这里是用户的业务逻辑
        try {
            System.out.println("我要睡个1s");
            Thread.sleep(1000L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

    }
}

####JobClient启动

final JobClient jobClient = new RetryJobClient();
//      final JobClient jobClient = new JobClient();
        jobClient.setNodeGroup("test_jobClient");
        jobClient.setClusterName("test_cluster");
        jobClient.setRegistryAddress("zookeeper://127.0.0.1:2181");
//        jobClient.setRegistryAddress("redis://127.0.0.1:6379");
        // 任务重试保存地址,默认用户目录下
//        jobClient.setFailStorePath(Constants.USER_HOME);
        jobClient.setJobFinishedHandler(new JobFinishedHandlerImpl());     // 任务完成监听器
        jobClient.addMasterChangeListener(new MasterChangeListenerImpl());
//      jobClient.addConfig("job.fail.store", "leveldb");     // 默认
//      jobClient.addConfig("job.fail.store", "berkeleydb");
//      jobClient.addConfig("job.fail.store", "rocksdb");
        jobClient.addConfig("job.submit.concurrency.size", "20");    // 并发提交任务线程数
        jobClient.start();