MysqlExecutableJobQueue.java 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125
  1. package com.lts.queue.mysql;
  2. import com.lts.core.cluster.Config;
  3. import com.lts.core.commons.collect.ConcurrentHashSet;
  4. import com.lts.core.commons.file.FileUtils;
  5. import com.lts.core.commons.utils.StringUtils;
  6. import com.lts.core.constant.Constants;
  7. import com.lts.core.domain.JobQueueRequest;
  8. import com.lts.core.logger.Logger;
  9. import com.lts.core.logger.LoggerFactory;
  10. import com.lts.core.support.JobQueueUtils;
  11. import com.lts.core.support.SystemClock;
  12. import com.lts.queue.ExecutableJobQueue;
  13. import com.lts.queue.domain.JobPo;
  14. import com.lts.queue.exception.JobQueueException;
  15. import com.lts.queue.mysql.support.ResultSetHandlerHolder;
  16. import java.io.InputStream;
  17. import java.sql.SQLException;
  18. import java.util.List;
  19. import java.util.concurrent.ConcurrentHashMap;
  20. import java.util.concurrent.Semaphore;
  21. import java.util.concurrent.TimeUnit;
  22. /**
  23. * @author Robert HG (254963746@qq.com) on 5/31/15.
  24. */
  25. public class MysqlExecutableJobQueue extends AbstractMysqlJobQueue implements ExecutableJobQueue {
  26. // 用来缓存SQL,不用每次去生成,可以重用
  27. private final ConcurrentHashMap<String, String> SQL_CACHE_MAP = new ConcurrentHashMap<String, String>();
  28. public MysqlExecutableJobQueue(Config config) {
  29. super(config);
  30. }
  31. @Override
  32. protected String getTableName(JobQueueRequest request) {
  33. if (StringUtils.isEmpty(request.getTaskTrackerNodeGroup())) {
  34. throw new IllegalArgumentException(" takeTrackerNodeGroup cat not be null");
  35. }
  36. return getTableName(request.getTaskTrackerNodeGroup());
  37. }
  38. @Override
  39. public boolean createQueue(String taskTrackerNodeGroup) {
  40. // create table
  41. try {
  42. InputStream is = this.getClass().getClassLoader().getResourceAsStream("sql/lts_executable_job_queue.sql");
  43. String sql = FileUtils.read(is);
  44. getSqlTemplate().update(getRealSql(sql, taskTrackerNodeGroup));
  45. return true;
  46. } catch (Exception e) {
  47. throw new JobQueueException("create table error!", e);
  48. }
  49. }
  50. private String getTableName(String taskTrackerNodeGroup) {
  51. return JobQueueUtils.getExecutableQueueName(taskTrackerNodeGroup);
  52. }
  53. private String getRealSql(String sql, String taskTrackerNodeGroup) {
  54. String key = sql.concat(taskTrackerNodeGroup);
  55. String fineSQL = SQL_CACHE_MAP.get(key);
  56. // 这里可以不用锁,多生成一次也不会产生什么问题
  57. if (fineSQL == null) {
  58. fineSQL = sql.replace("{tableName}", getTableName(taskTrackerNodeGroup));
  59. SQL_CACHE_MAP.put(key, fineSQL);
  60. }
  61. return fineSQL;
  62. }
  63. @Override
  64. public boolean add(JobPo jobPo) {
  65. jobPo.setGmtCreated(SystemClock.now());
  66. jobPo.setGmtModified(jobPo.getGmtCreated());
  67. try {
  68. return super.add(getTableName(jobPo.getTaskTrackerNodeGroup()), jobPo);
  69. } catch (JobQueueException e) {
  70. if (e.getMessage().contains("doesn't exist Query:")) {
  71. createQueue(jobPo.getTaskTrackerNodeGroup());
  72. add(jobPo);
  73. }
  74. }
  75. return true;
  76. }
  77. private String removeSQL = "DELETE FROM `{tableName}` WHERE job_id = ?";
  78. @Override
  79. public boolean remove(String taskTrackerNodeGroup, String jobId) {
  80. try {
  81. return getSqlTemplate().update(getRealSql(removeSQL, taskTrackerNodeGroup), jobId) == 1;
  82. } catch (SQLException e) {
  83. throw new JobQueueException(e);
  84. }
  85. }
  86. private String resumeSQL = "UPDATE `{tableName}` SET " +
  87. "`is_running` = ?," +
  88. "`task_tracker_identity` = ?," +
  89. "`gmt_modified` = ?" +
  90. " WHERE job_id = ? ";
  91. @Override
  92. public void resume(JobPo jobPo) {
  93. try {
  94. Object[] params = new Object[]{false, null, SystemClock.now(), jobPo.getJobId()};
  95. getSqlTemplate().update(getRealSql(resumeSQL, jobPo.getTaskTrackerNodeGroup()), params);
  96. } catch (SQLException e) {
  97. throw new JobQueueException(e);
  98. }
  99. }
  100. private String getDeadJobSQL = "SELECT * FROM `{tableName}` WHERE is_running = ? AND gmt_modified < ?";
  101. @Override
  102. public List<JobPo> getDeadJob(String taskTrackerNodeGroup, long deadline) {
  103. try {
  104. return getSqlTemplate().query(getRealSql(getDeadJobSQL, taskTrackerNodeGroup), ResultSetHandlerHolder.JOB_PO_LIST_RESULT_SET_HANDLER, true, deadline);
  105. } catch (SQLException e) {
  106. throw new JobQueueException(e);
  107. }
  108. }
  109. }