MonitorDataService.java 8.5 KB


  1. package com.lts.web.service;
  2. import com.alibaba.fastjson.TypeReference;
  3. import com.lts.core.commons.utils.BeanUtils;
  4. import com.lts.core.commons.utils.CollectionUtils;
  5. import com.lts.core.commons.utils.JSONUtils;
  6. import com.lts.core.domain.monitor.JVMMonitorData;
  7. import com.lts.core.domain.monitor.JobTrackerMonitorData;
  8. import com.lts.core.domain.monitor.MonitorData;
  9. import com.lts.core.domain.monitor.TaskTrackerMonitorData;
  10. import com.lts.core.support.SystemClock;
  11. import com.lts.web.repository.domain.*;
  12. import com.lts.web.repository.mapper.*;
  13. import com.lts.web.request.MonitorDataAddRequest;
  14. import com.lts.web.request.MonitorDataRequest;
  15. import org.springframework.beans.factory.annotation.Autowired;
  16. import org.springframework.stereotype.Service;
  17. import java.lang.reflect.Method;
  18. import java.util.ArrayList;
  19. import java.util.List;
  20. import java.util.Map;
  21. import java.util.concurrent.ConcurrentHashMap;
  22. /**
  23. * @author Robert HG (254963746@qq.com) on 9/1/15.
  24. */
  25. @Service
  26. public class MonitorDataService {
  27. @Autowired
  28. TaskTrackerMonitorRepo taskTrackerMonitorRepo;
  29. @Autowired
  30. JobTrackerMonitorRepo jobTrackerMonitorDataRepo;
  31. @Autowired
  32. JVMInfoRepo jvmInfoRepo;
  33. @Autowired
  34. JVMGCRepo jvmGCRepo;
  35. @Autowired
  36. JVMMemoryRepo jvmMemoryRepo;
  37. @Autowired
  38. JVMThreadRepo jvmThreadRepo;
  39. /**
  40. * 添加TaskTracker监控数据
  41. */
  42. public void addTaskTrackerMonitorData(MonitorDataAddRequest request) {
  43. List<TaskTrackerMonitorData> mds = JSONUtils.parse(request.getData(),
  44. new TypeReference<List<TaskTrackerMonitorData>>() {
  45. });
  46. if (CollectionUtils.isEmpty(mds)) {
  47. throw new IllegalArgumentException("monitorData can not be null");
  48. }
  49. List<TaskTrackerMonitorDataPo> pos = new ArrayList<TaskTrackerMonitorDataPo>(mds.size());
  50. for (TaskTrackerMonitorData monitorData : mds) {
  51. TaskTrackerMonitorDataPo po = new TaskTrackerMonitorDataPo();
  52. BeanUtils.copyProperties(po, monitorData);
  53. po.setNodeType(request.getNodeType());
  54. po.setIdentity(request.getIdentity());
  55. po.setNodeGroup(request.getNodeGroup());
  56. po.setGmtCreated(SystemClock.now());
  57. pos.add(po);
  58. }
  59. taskTrackerMonitorRepo.insert(pos);
  60. // add JVM monitor data
  61. addJVMMonitorData(mds, request);
  62. }
  63. public List<? extends AbstractMonitorDataPo> queryMonitorDataSum(MonitorDataRequest request) {
  64. switch (request.getNodeType()) {
  65. case JOB_CLIENT:
  66. return null;
  67. case JOB_TRACKER:
  68. return jobTrackerMonitorDataRepo.querySum(request);
  69. case TASK_TRACKER:
  70. return taskTrackerMonitorRepo.querySum(request);
  71. default:
  72. return null;
  73. }
  74. }
  75. /**
  76. * 添加JobTracker监控数据
  77. */
  78. public void addJobTrackerMonitorData(MonitorDataAddRequest request) {
  79. List<JobTrackerMonitorData> mds = JSONUtils.parse(request.getData(),
  80. new TypeReference<List<JobTrackerMonitorData>>() {
  81. });
  82. if (CollectionUtils.isEmpty(mds)) {
  83. throw new IllegalArgumentException("monitorData can not be null");
  84. }
  85. List<JobTrackerMonitorDataPo> pos = new ArrayList<JobTrackerMonitorDataPo>(mds.size());
  86. for (JobTrackerMonitorData monitorData : mds) {
  87. JobTrackerMonitorDataPo po = new JobTrackerMonitorDataPo();
  88. BeanUtils.copyProperties(po, monitorData);
  89. po.setNodeType(request.getNodeType());
  90. po.setIdentity(request.getIdentity());
  91. po.setNodeGroup(request.getNodeGroup());
  92. po.setGmtCreated(SystemClock.now());
  93. pos.add(po);
  94. }
  95. jobTrackerMonitorDataRepo.insert(pos);
  96. // add JVM monitor data
  97. addJVMMonitorData(mds, request);
  98. }
  99. private <T extends MonitorData> void addJVMMonitorData(List<T> mds, MonitorDataAddRequest request) {
  100. int size = mds.size();
  101. List<JVMGCDataPo> jvmGCDataPos = new ArrayList<JVMGCDataPo>(size);
  102. List<JVMMemoryDataPo> jvmMemoryDataPos = new ArrayList<JVMMemoryDataPo>(size);
  103. List<JVMThreadDataPo> jvmThreadDataPos = new ArrayList<JVMThreadDataPo>(size);
  104. for (T md : mds) {
  105. JVMMonitorData jvmMonitorData = md.getJvmMonitorData();
  106. Long timestamp = md.getTimestamp();
  107. // gc
  108. JVMGCDataPo jvmgcDataPo = getDataPo(jvmMonitorData.getGcMap(), JVMGCDataPo.class, request, timestamp);
  109. jvmGCDataPos.add(jvmgcDataPo);
  110. // memory
  111. JVMMemoryDataPo jvmMemoryDataPo = getDataPo(jvmMonitorData.getMemoryMap(), JVMMemoryDataPo.class, request, timestamp);
  112. jvmMemoryDataPos.add(jvmMemoryDataPo);
  113. // thread
  114. JVMThreadDataPo jvmThreadDataPo = getDataPo(jvmMonitorData.getThreadMap(), JVMThreadDataPo.class, request, timestamp);
  115. jvmThreadDataPos.add(jvmThreadDataPo);
  116. }
  117. jvmGCRepo.insert(jvmGCDataPos);
  118. jvmMemoryRepo.insert(jvmMemoryDataPos);
  119. jvmThreadRepo.insert(jvmThreadDataPos);
  120. }
  121. private static final Map<String, Method> CACHED_METHOD_MAP = new ConcurrentHashMap<String, Method>();
  122. static {
  123. cacheMethod(JVMGCDataPo.class);
  124. cacheMethod(JVMMemoryDataPo.class);
  125. cacheMethod(JVMThreadDataPo.class);
  126. }
  127. private static void cacheMethod(Class<?> clazz) {
  128. Method[] methods = clazz.getDeclaredMethods();
  129. for (Method method : methods) {
  130. if (method.getName().startsWith("set")) {
  131. CACHED_METHOD_MAP.put(clazz.getSimpleName() + "_" + method.getName().substring(3), method);
  132. }
  133. }
  134. }
  135. /**
  136. * 根据Map得到 持久化对象
  137. */
  138. private <T extends AbstractMonitorDataPo> T getDataPo(Map<String, Object> dataMap, Class<T> clazz,
  139. MonitorDataAddRequest request, Long timestamp) {
  140. try {
  141. T data = clazz.newInstance();
  142. if (CollectionUtils.isNotEmpty(dataMap)) {
  143. for (Map.Entry<String, Object> entry : dataMap.entrySet()) {
  144. Method method = CACHED_METHOD_MAP.get(clazz.getSimpleName() + "_" + entry.getKey());
  145. if (method != null) {
  146. String string = String.valueOf(entry.getValue());
  147. Object value = entry.getValue();
  148. Class<?> parameterType = method.getParameterTypes()[0];
  149. if (parameterType == Long.class || parameterType == long.class) {
  150. value = Long.valueOf(string);
  151. } else if (parameterType == Integer.class || parameterType == int.class) {
  152. value = Integer.valueOf(string);
  153. } else if (parameterType == Float.class || parameterType == float.class) {
  154. value = Float.valueOf(string);
  155. } else if (parameterType == Double.class || parameterType == double.class) {
  156. value = Double.valueOf(string);
  157. } else if (parameterType == Short.class || parameterType == short.class) {
  158. value = Short.valueOf(string);
  159. } else if (parameterType == Boolean.class || parameterType == boolean.class) {
  160. value = Boolean.valueOf(string);
  161. } else if (parameterType == String.class) {
  162. value = string;
  163. }
  164. // TODO others
  165. method.invoke(data, value);
  166. }
  167. }
  168. }
  169. data.setNodeType(request.getNodeType());
  170. data.setNodeGroup(request.getNodeGroup());
  171. data.setIdentity(request.getIdentity());
  172. data.setGmtCreated(SystemClock.now());
  173. data.setTimestamp(timestamp);
  174. return data;
  175. } catch (Exception e) {
  176. throw new IllegalStateException(e);
  177. }
  178. }
  179. /**
  180. * 添加JVMInfo 信息
  181. */
  182. public void addJVMInfoData(MonitorDataAddRequest request) {
  183. JVMInfoDataPo data = new JVMInfoDataPo();
  184. data.setNodeType(request.getNodeType());
  185. data.setNodeGroup(request.getNodeGroup());
  186. data.setIdentity(request.getIdentity());
  187. data.setGmtCreated(SystemClock.now());
  188. data.setTimestamp(SystemClock.now());
  189. data.setJvmInfo(request.getData());
  190. jvmInfoRepo.insert(data);
  191. }
  192. }