RetryJobClient.java 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596
  1. package com.lts.jobclient;
  2. import com.lts.core.json.JSON;
  3. import com.lts.core.domain.Job;
  4. import com.lts.core.support.RetryScheduler;
  5. import com.lts.jobclient.domain.JobClientApplication;
  6. import com.lts.jobclient.domain.JobClientNode;
  7. import com.lts.jobclient.domain.Response;
  8. import com.lts.jobclient.domain.ResponseCode;
  9. import com.lts.jobclient.support.JobSubmitProtectException;
  10. import java.util.Collections;
  11. import java.util.List;
  12. /**
  13. * @author Robert HG (254963746@qq.com) on 8/14/14.
  14. * 重试 客户端, 如果 没有可用的JobTracker, 那么存文件, 定时重试
  15. */
  16. public class RetryJobClient extends JobClient<JobClientNode, JobClientApplication> {
  17. private RetryScheduler<Job> retryScheduler;
  18. @Override
  19. protected void beforeStart() {
  20. super.beforeStart();
  21. retryScheduler = new RetryScheduler<Job>(application, 30) {
  22. @Override
  23. protected boolean isRemotingEnable() {
  24. return isServerEnable();
  25. }
  26. @Override
  27. protected boolean retry(List<Job> jobs) {
  28. try {
  29. // 重试必须走同步,不然会造成文件锁,死锁
  30. return superSubmitJob(jobs, SubmitType.SYNC).isSuccess();
  31. } catch (Throwable t) {
  32. RetryScheduler.LOGGER.error(t.getMessage(), t);
  33. }
  34. return false;
  35. }
  36. };
  37. retryScheduler.setName(RetryJobClient.class.getSimpleName());
  38. retryScheduler.start();
  39. }
  40. @Override
  41. protected void beforeStop() {
  42. super.beforeStop();
  43. retryScheduler.stop();
  44. }
  45. @Override
  46. public Response submitJob(Job job) {
  47. return submitJob(Collections.singletonList(job));
  48. }
  49. @Override
  50. public Response submitJob(List<Job> jobs) {
  51. Response response;
  52. try {
  53. response = superSubmitJob(jobs);
  54. } catch (JobSubmitProtectException e) {
  55. response = new Response();
  56. response.setSuccess(true);
  57. response.setFailedJobs(jobs);
  58. response.setCode(ResponseCode.SUBMIT_TOO_BUSY_AND_SAVE_FOR_LATER);
  59. response.setMsg(response.getMsg() + ", submit too busy , save local fail store and send later !");
  60. LOGGER.warn(JSON.toJSONString(response));
  61. return response;
  62. }
  63. if (!response.isSuccess()) {
  64. try {
  65. for (Job job : response.getFailedJobs()) {
  66. retryScheduler.inSchedule(job.getTaskId(), job);
  67. }
  68. response.setSuccess(true);
  69. response.setCode(ResponseCode.SUBMIT_FAILED_AND_SAVE_FOR_LATER);
  70. response.setMsg(response.getMsg() + ", save local fail store and send later !");
  71. LOGGER.warn(JSON.toJSONString(response));
  72. } catch (Exception e) {
  73. response.setSuccess(false);
  74. response.setMsg(e.getMessage());
  75. }
  76. }
  77. return response;
  78. }
  79. private Response superSubmitJob(List<Job> jobs) {
  80. return super.submitJob(jobs);
  81. }
  82. private Response superSubmitJob(List<Job> jobs, SubmitType type) {
  83. return super.submitJob(jobs, type);
  84. }
  85. }