Browse Source

Merge pull request #151 from qq254963746/develop

add mapdb FailStore and other upgrade
qq254963746 9 years ago
parent
commit
694f4403bf
34 changed files with 760 additions and 100 deletions
  1. 1 1
      README.md
  2. 43 0
      lts-admin/src/main/java/com/lts/web/support/csrf/CSRFHandlerInterceptor.java
  3. 40 0
      lts-admin/src/main/java/com/lts/web/support/csrf/CSRFTokenManager.java
  4. 16 0
      lts-admin/src/main/java/com/lts/web/support/csrf/CSRFTool.java
  5. 28 0
      lts-admin/src/main/java/com/lts/web/support/xss/XssFilter.java
  6. 70 0
      lts-admin/src/main/java/com/lts/web/support/xss/XssHttpServletRequestWrapper.java
  7. 3 1
      lts-admin/src/main/webapp/WEB-INF/views/templates/jobLogger.vm
  8. 5 0
      lts-core/pom.xml
  9. 84 0
      lts-core/src/main/java/com/lts/core/commons/io/UnsafeByteArrayInputStream.java
  10. 99 0
      lts-core/src/main/java/com/lts/core/commons/io/UnsafeByteArrayOutputStream.java
  11. 27 0
      lts-core/src/main/java/com/lts/core/commons/utils/LRUCache.java
  12. 2 0
      lts-core/src/main/java/com/lts/core/constant/Constants.java
  13. 6 3
      lts-core/src/main/java/com/lts/core/failstore/AbstractFailStore.java
  14. 26 0
      lts-core/src/main/java/com/lts/core/failstore/AbstractFailStoreFactory.java
  15. 5 6
      lts-core/src/main/java/com/lts/core/failstore/berkeleydb/BerkeleydbFailStore.java
  16. 12 9
      lts-core/src/main/java/com/lts/core/failstore/berkeleydb/BerkeleydbFailStoreFactory.java
  17. 4 5
      lts-core/src/main/java/com/lts/core/failstore/leveldb/LeveldbFailStore.java
  18. 12 9
      lts-core/src/main/java/com/lts/core/failstore/leveldb/LeveldbFailStoreFactory.java
  19. 139 0
      lts-core/src/main/java/com/lts/core/failstore/mapdb/MapdbFailStore.java
  20. 22 0
      lts-core/src/main/java/com/lts/core/failstore/mapdb/MapdbFailStoreFactory.java
  21. 5 6
      lts-core/src/main/java/com/lts/core/failstore/rocksdb/RocksdbFailStore.java
  22. 11 9
      lts-core/src/main/java/com/lts/core/failstore/rocksdb/RocksdbFailStoreFactory.java
  23. 0 5
      lts-core/src/main/java/com/lts/core/support/RetryScheduler.java
  24. 3 4
      lts-core/src/main/java/com/lts/remoting/netty/NettyCodecFactory.java
  25. 5 17
      lts-core/src/main/java/com/lts/remoting/serialize/Hessian2Serializable.java
  26. 5 11
      lts-core/src/main/java/com/lts/remoting/serialize/JavaSerializable.java
  27. 2 1
      lts-core/src/main/resources/META-INF/lts/internal/com.lts.core.failstore.FailStoreFactory
  28. 4 4
      lts-core/src/test/java/com/lts/core/failstore/berkeleydb/BerkeleydbFailStoreTest.java
  29. 7 3
      lts-core/src/test/java/com/lts/core/failstore/leveldb/LeveldbFailStoreTest.java
  30. 57 0
      lts-core/src/test/java/com/lts/core/failstore/mapdb/MapdbFailStoreTest.java
  31. 4 4
      lts-core/src/test/java/com/lts/core/failstore/rocksdb/RocksdbFailStoreTest.java
  32. 4 0
      lts-example/pom.xml
  33. 3 2
      lts-example/src/main/java/com/lts/example/api/JobClientTest.java
  34. 6 0
      pom.xml

+ 1 - 1
README.md

@@ -295,7 +295,7 @@ public class LTSSpringConfig implements ApplicationContextAware {
 |job.max.retry.times|可选|10|JobTracker|addConfig("job.max.retry.times", "xx")|任务的最大重试次数|
 |lts.monitor.url|可选|无|JobTracker,TaskTracker|addConfig("lts.monitor.url", "xx")|监控中心地址,也就是LTS-Admin地址,如 http://localhost:8081|
 |stop.working|可选|false|TaskTracker|addConfig("stop.working", "true")|主要用于当TaskTracker与JobTracker出现网络隔离的时候,超过一定时间隔离之后,TaskTracker自动停止当前正在运行的任务|
-|job.fail.store|可选|leveldb|JobClient,TaskTracker|addConfig("job.fail.store", "leveldb")|可选值:leveldb(默认), rocksdb, berkeleydb, FailStore实现|
+|job.fail.store|可选|leveldb|JobClient,TaskTracker|addConfig("job.fail.store", "leveldb")|可选值:leveldb(默认), rocksdb, berkeleydb, mapdb FailStore实现, leveldb有问题的同学,可以试试mapdb|
 |lazy.job.logger|可选|false|JobTracker|addConfig("lazy.job.logger", "true")|可选值:ture,false, 是否延迟批量刷盘日志, 如果启用,采用队列的方式批量将日志刷盘(在应用关闭的时候,可能会造成日志丢失)|
 |dataPath|可选|user.home|JobClient,TaskTracker,JobTracker|setDataPath("xxxx")|FailStore文件存储路径及其它数据存储路径|
 |lts.monitor.interval|可选|1|JobClient,TaskTracker,JobTracker|addConfig("lts.monitor.interval", "2")|分钟,整数,建议1-5分钟|

+ 43 - 0
lts-admin/src/main/java/com/lts/web/support/csrf/CSRFHandlerInterceptor.java

@@ -0,0 +1,43 @@
+package com.lts.web.support.csrf;
+
+import org.springframework.web.servlet.handler.HandlerInterceptorAdapter;
+import org.springframework.web.servlet.resource.DefaultServletHttpRequestHandler;
+
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+/**
+ * A Spring MVC <code>HandlerInterceptor</code> which is responsible to enforce CSRF token validity on incoming posts
+ * requests. The interceptor should be registered with Spring MVC servlet using the following syntax:
+ * <p/>
+ * <mvc:interceptors>
+ * <bean class="com.lts.web.support.csrf.CSRFHandlerInterceptor"/>
+ * </mvc:interceptors>
+ *
+ * @author Robert HG (254963746@qq.com) on 11/10/15.
+ */
+public class CSRFHandlerInterceptor extends HandlerInterceptorAdapter {
+
+    @Override
+    public boolean preHandle(HttpServletRequest request,
+                             HttpServletResponse response, Object handler) throws Exception {
+
+        if (handler instanceof DefaultServletHttpRequestHandler) {
+            return true;
+        }
+
+        if (request.getMethod().equalsIgnoreCase("GET")) {
+            return true;
+        } else {
+            String sessionToken = CSRFTokenManager.getToken(request.getSession());
+            String requestToken = CSRFTokenManager.getToken(request);
+            // 检查 csrf token是否正确
+            if (sessionToken.equals(requestToken)) {
+                return true;
+            } else {
+                response.sendError(HttpServletResponse.SC_FORBIDDEN, "Bad or missing CSRF value");
+                return false;
+            }
+        }
+    }
+}

+ 40 - 0
lts-admin/src/main/java/com/lts/web/support/csrf/CSRFTokenManager.java

@@ -0,0 +1,40 @@
+package com.lts.web.support.csrf;
+
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpSession;
+import java.util.UUID;
+
+/**
+ * @author Robert HG (254963746@qq.com) on 11/10/15.
+ */
+public final class CSRFTokenManager {
+
+    static final String CSRF_PARAM_NAME = "csrfToken";
+
+    public final static String CSRF_TOKEN_FOR_SESSION_ATTR_NAME = CSRFTokenManager.class.getSimpleName() + ".token";
+
+    private CSRFTokenManager() {
+    }
+
+    public static String getToken(HttpSession session) {
+        String token = null;
+
+        synchronized (session) {
+            token = (String) session.getAttribute(CSRF_TOKEN_FOR_SESSION_ATTR_NAME);
+            if (null == token) {
+                token = UUID.randomUUID().toString();
+                session.setAttribute(CSRF_TOKEN_FOR_SESSION_ATTR_NAME, token);
+            }
+        }
+        return token;
+    }
+
+    public static String getToken(HttpServletRequest request) {
+        String token = request.getParameter(CSRF_PARAM_NAME);
+        if (token == null || "".equals(token)) {
+            token = request.getHeader(CSRF_PARAM_NAME);
+        }
+        return token;
+    }
+
+}

+ 16 - 0
lts-admin/src/main/java/com/lts/web/support/csrf/CSRFTool.java

@@ -0,0 +1,16 @@
+package com.lts.web.support.csrf;
+
+import javax.servlet.http.HttpServletRequest;
+
+/**
+ * 配置在 velocity tools 中
+ *
+ * <input type="hidden" name="csrfToken" value="$csrfTool.getToken($request)"/>
+ *
+ * @author Robert HG (254963746@qq.com) on 11/10/15.
+ */
+public class CSRFTool {
+    public static String getToken(HttpServletRequest request) {
+        return CSRFTokenManager.getToken(request.getSession());
+    }
+}

+ 28 - 0
lts-admin/src/main/java/com/lts/web/support/xss/XssFilter.java

@@ -0,0 +1,28 @@
+package com.lts.web.support.xss;
+
+import javax.servlet.*;
+import javax.servlet.http.HttpServletRequest;
+import java.io.IOException;
+
+/**
+ * @author Robert HG (254963746@qq.com) on 11/10/15.
+ */
+public class XssFilter implements Filter {
+
+    @Override
+    public void init(FilterConfig filterConfig) throws ServletException {
+
+    }
+
+    @Override
+    public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) throws IOException, ServletException {
+
+        chain.doFilter(new XssHttpServletRequestWrapper((HttpServletRequest) request), response);
+
+    }
+
+    @Override
+    public void destroy() {
+
+    }
+}

+ 70 - 0
lts-admin/src/main/java/com/lts/web/support/xss/XssHttpServletRequestWrapper.java

@@ -0,0 +1,70 @@
+package com.lts.web.support.xss;
+
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletRequestWrapper;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.regex.Pattern;
+
+/**
+ * @author Robert HG (254963746@qq.com) on 11/10/15.
+ */
+public class XssHttpServletRequestWrapper extends HttpServletRequestWrapper {
+
+    public XssHttpServletRequestWrapper(HttpServletRequest servletRequest) {
+        super(servletRequest);
+    }
+
+    public String[] getParameterValues(String parameter) {
+        String[] values = super.getParameterValues(parameter);
+        if (values == null) {
+            return null;
+        }
+        int count = values.length;
+        String[] encodedValues = new String[count];
+        for (int i = 0; i < count; i++) {
+            encodedValues[i] = cleanXSS(values[i]);
+        }
+        return encodedValues;
+    }
+
+    public String getParameter(String parameter) {
+        String value = super.getParameter(parameter);
+        if (value == null) {
+            return null;
+        }
+        return cleanXSS(value);
+    }
+
+    public String getHeader(String name) {
+        String value = super.getHeader(name);
+        if (value == null)
+            return null;
+        return cleanXSS(value);
+    }
+
+    private static final List<Pattern> PATTERNS = new CopyOnWriteArrayList<Pattern>();
+
+    static {
+        PATTERNS.add(Pattern.compile("<script>(.*?)</script>", Pattern.CASE_INSENSITIVE));
+        PATTERNS.add(Pattern.compile("src[\r\n]*=[\r\n]*\\\'(.*?)\\\'", Pattern.CASE_INSENSITIVE | Pattern.MULTILINE | Pattern.DOTALL));
+        PATTERNS.add(Pattern.compile("src[\r\n]*=[\r\n]*\\\"(.*?)\\\"", Pattern.CASE_INSENSITIVE | Pattern.MULTILINE | Pattern.DOTALL));
+        PATTERNS.add(Pattern.compile("</script>", Pattern.CASE_INSENSITIVE));
+        PATTERNS.add(Pattern.compile("<script(.*?)>", Pattern.CASE_INSENSITIVE | Pattern.MULTILINE | Pattern.DOTALL));
+        PATTERNS.add(Pattern.compile("eval\\((.*?)\\)", Pattern.CASE_INSENSITIVE | Pattern.MULTILINE | Pattern.DOTALL));
+        PATTERNS.add(Pattern.compile("e­xpression\\((.*?)\\)", Pattern.CASE_INSENSITIVE | Pattern.MULTILINE | Pattern.DOTALL));
+        PATTERNS.add(Pattern.compile("javascript:", Pattern.CASE_INSENSITIVE));
+        PATTERNS.add(Pattern.compile("vbscript:", Pattern.CASE_INSENSITIVE));
+        PATTERNS.add(Pattern.compile("onload(.*?)=", Pattern.CASE_INSENSITIVE | Pattern.MULTILINE | Pattern.DOTALL));
+    }
+
+    private String cleanXSS(String value) {
+        if (value != null) {
+            for (Pattern pattern : PATTERNS) {
+                value = pattern.matcher(value).replaceAll("");
+            }
+        }
+        return value;
+    }
+
+}

+ 3 - 1
lts-admin/src/main/webapp/WEB-INF/views/templates/jobLogger.vm

@@ -94,6 +94,7 @@
             <th>日志记录时间</th>
             <th data-hide="all">日志创建时间</th>
             <th>执行节点组</th>
+            <th data-hide="all">执行节点标识</th>
             <th data-hide="phone,tablet">提交节点组</th>
             <th>日志类型</th>
             <th>执行结果</th>
@@ -114,6 +115,7 @@
             <td>{{row.logTime | dateFormat:'yyyy-MM-dd HH:mm:ss'}}</td>
             <td>{{row.gmtCreated | dateFormat:'yyyy-MM-dd HH:mm:ss'}}</td>
             <td>{{row.taskTrackerNodeGroup}}</td>
+            <td>{{row.taskTrackerIdentity}}</td>
             <td>{{row.submitNodeGroup}}</td>
             <td>{{row.logType | format:'logTypeLabel'}}</td>
             <td>{{row.success | format:'successLabel'}}</td>
@@ -131,7 +133,7 @@
         {{/each}}
         {{if results == 0}}
         <tr>
-            <td colspan="15">暂无数据</td>
+            <td colspan="16">暂无数据</td>
         </tr>
         {{/if}}
         </tbody>

+ 5 - 0
lts-core/pom.xml

@@ -115,5 +115,10 @@
             <artifactId>hessian</artifactId>
             <scope>provided</scope>
         </dependency>
+        <dependency>
+            <groupId>org.mapdb</groupId>
+            <artifactId>mapdb</artifactId>
+            <scope>provided</scope>
+        </dependency>
     </dependencies>
 </project>

+ 84 - 0
lts-core/src/main/java/com/lts/core/commons/io/UnsafeByteArrayInputStream.java

@@ -0,0 +1,84 @@
+package com.lts.core.commons.io;
+
+import java.io.InputStream;
+
+/**
+ * @author Robert HG (254963746@qq.com)
+ */
+public class UnsafeByteArrayInputStream extends InputStream {
+
+    protected byte buf[];
+
+    protected int pos;
+
+    protected int mark = 0;
+
+    protected int count;
+
+    public UnsafeByteArrayInputStream(byte buf[]) {
+        this.buf = buf;
+        this.pos = 0;
+        this.count = buf.length;
+    }
+
+    public UnsafeByteArrayInputStream(byte buf[], int offset, int length) {
+        this.buf = buf;
+        this.pos = offset;
+        this.count = Math.min(offset + length, buf.length);
+        this.mark = offset;
+    }
+
+    public int read() {
+        return (pos < count) ? (buf[pos++] & 0xff) : -1;
+    }
+
+    public int read(byte b[], int off, int len) {
+        if (b == null) {
+            throw new NullPointerException();
+        } else if (off < 0 || len < 0 || len > b.length - off) {
+            throw new IndexOutOfBoundsException();
+        }
+
+        if (pos >= count) {
+            return -1;
+        }
+
+        int avail = count - pos;
+        if (len > avail) {
+            len = avail;
+        }
+        if (len <= 0) {
+            return 0;
+        }
+        System.arraycopy(buf, pos, b, off, len);
+        pos += len;
+        return len;
+    }
+
+    public long skip(long n) {
+        long k = count - pos;
+        if (n < k) {
+            k = n < 0 ? 0 : n;
+        }
+
+        pos += k;
+        return k;
+    }
+
+    public int available() {
+        return count - pos;
+    }
+
+    public boolean markSupported() {
+        return true;
+    }
+
+    public void mark(int readAheadLimit) {
+        mark = pos;
+    }
+
+    public void reset() {
+        pos = mark;
+    }
+
+}

+ 99 - 0
lts-core/src/main/java/com/lts/core/commons/io/UnsafeByteArrayOutputStream.java

@@ -0,0 +1,99 @@
+package com.lts.core.commons.io;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.UnsupportedEncodingException;
+import java.util.Arrays;
+
+/**
+ * @author Robert HG (254963746@qq.com)
+ */
+public class UnsafeByteArrayOutputStream extends OutputStream {
+
+    protected byte buf[];
+
+    protected int count;
+
+    public UnsafeByteArrayOutputStream() {
+        this(32);
+    }
+
+    public UnsafeByteArrayOutputStream(int size) {
+        if (size < 0) {
+            throw new IllegalArgumentException("Negative initial size: "
+                    + size);
+        }
+        buf = new byte[size];
+    }
+
+    private void ensureCapacity(int minCapacity) {
+        // overflow-conscious code
+        if (minCapacity - buf.length > 0)
+            grow(minCapacity);
+    }
+
+    private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
+
+    private void grow(int minCapacity) {
+        // overflow-conscious code
+        int oldCapacity = buf.length;
+        int newCapacity = oldCapacity << 1;
+        if (newCapacity - minCapacity < 0)
+            newCapacity = minCapacity;
+        if (newCapacity - MAX_ARRAY_SIZE > 0)
+            newCapacity = hugeCapacity(minCapacity);
+        buf = Arrays.copyOf(buf, newCapacity);
+    }
+
+    private static int hugeCapacity(int minCapacity) {
+        if (minCapacity < 0) // overflow
+            throw new OutOfMemoryError();
+        return (minCapacity > MAX_ARRAY_SIZE) ?
+                Integer.MAX_VALUE :
+                MAX_ARRAY_SIZE;
+    }
+
+    public void write(int b) {
+        ensureCapacity(count + 1);
+        buf[count] = (byte) b;
+        count += 1;
+    }
+
+    public void write(byte b[], int off, int len) {
+        if ((off < 0) || (off > b.length) || (len < 0) ||
+                ((off + len) - b.length > 0)) {
+            throw new IndexOutOfBoundsException();
+        }
+        ensureCapacity(count + len);
+        System.arraycopy(b, off, buf, count, len);
+        count += len;
+    }
+
+    public void writeTo(OutputStream out) throws IOException {
+        out.write(buf, 0, count);
+    }
+    public void reset() {
+        count = 0;
+    }
+
+    public byte toByteArray()[] {
+        return Arrays.copyOf(buf, count);
+    }
+
+    public int size() {
+        return count;
+    }
+
+    public String toString() {
+        return new String(buf, 0, count);
+    }
+
+    public String toString(String charsetName)
+            throws UnsupportedEncodingException {
+        return new String(buf, 0, count, charsetName);
+    }
+
+    public void close() throws IOException {
+    }
+
+}

+ 27 - 0
lts-core/src/main/java/com/lts/core/commons/utils/LRUCache.java

@@ -0,0 +1,27 @@
+package com.lts.core.commons.utils;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+/**
+ * @author Robert HG (254963746@qq.com) on 11/9/15.
+ */
+public class LRUCache<K, V> extends LinkedHashMap<K, V> {
+
+    private static final long serialVersionUID = 1L;
+    private final int         maxSize;
+
+    public LRUCache(int maxSize){
+        this(maxSize, 16, 0.75f, false);
+    }
+
+    public LRUCache(int maxSize, int initialCapacity, float loadFactor, boolean accessOrder){
+        super(initialCapacity, loadFactor, accessOrder);
+        this.maxSize = maxSize;
+    }
+
+    protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
+        return this.size() > this.maxSize;
+    }
+}
+

+ 2 - 0
lts-core/src/main/java/com/lts/core/constant/Constants.java

@@ -37,6 +37,8 @@ public interface Constants {
     String SUBSCRIBE = "subscribe";
 
     String UNSUBSCRIBE = "unsubscribe";
+
+    int DEFAULT_BUFFER_SIZE = 16 * 1024;
     /**
      * 注册中心失败事件重试事件
      */

+ 6 - 3
lts-core/src/main/java/com/lts/core/failstore/AbstractFailStore.java

@@ -23,10 +23,13 @@ public abstract class AbstractFailStore implements FailStore {
     protected File dbPath;
     private static final String dbLockName = "___db.lock";
 
-    public AbstractFailStore(File dbPath) {
+    public AbstractFailStore(File dbPath, boolean needLock) {
         this.dbPath = dbPath;
         String path = dbPath.getPath();
         this.home = path.substring(0, path.indexOf(getName())).concat(getName());
+        if (needLock) {
+            getLock(dbPath.getPath());
+        }
         init();
     }
 
@@ -71,8 +74,8 @@ public abstract class AbstractFailStore implements FailStore {
 
     private FailStore getFailStore(File dbPath) {
         try {
-            Constructor constructor = this.getClass().getConstructor(File.class);
-            return (FailStore) constructor.newInstance(dbPath);
+            Constructor constructor = this.getClass().getConstructor(File.class, boolean.class);
+            return (FailStore) constructor.newInstance(dbPath, false);
         } catch (Exception e) {
             LOGGER.error("new instance failStore failed,", e);
         }

+ 26 - 0
lts-core/src/main/java/com/lts/core/failstore/AbstractFailStoreFactory.java

@@ -0,0 +1,26 @@
+package com.lts.core.failstore;
+
+import com.lts.core.cluster.Config;
+import com.lts.core.commons.file.FileUtils;
+import com.lts.core.commons.utils.StringUtils;
+
+import java.io.File;
+
+/**
+ * @author Robert HG (254963746@qq.com) on 11/10/15.
+ */
+public abstract class AbstractFailStoreFactory implements FailStoreFactory {
+    @Override
+    public final FailStore getFailStore(Config config, String storePath) {
+        if (StringUtils.isEmpty(storePath)) {
+            storePath = config.getFailStorePath();
+        }
+        File dbPath = new File(storePath.concat(getName()).concat("/").concat(config.getIdentity()));
+        FileUtils.createDirIfNotExist(dbPath);
+        return newInstance(dbPath);
+    }
+
+    protected abstract String getName();
+
+    protected abstract FailStore newInstance(File dbPath);
+}

+ 5 - 6
lts-core/src/main/java/com/lts/core/failstore/berkeleydb/BerkeleydbFailStore.java

@@ -27,7 +27,11 @@ public class BerkeleydbFailStore extends AbstractFailStore {
     private DatabaseConfig dbConfig;
 
     public BerkeleydbFailStore(File dbPath) {
-        super(dbPath);
+        super(dbPath, true);
+    }
+
+    public BerkeleydbFailStore(File dbPath, boolean needLock) {
+        super(dbPath, needLock);
     }
 
     public static final String name = "berkeleydb";
@@ -54,11 +58,6 @@ public class BerkeleydbFailStore extends AbstractFailStore {
         }
     }
 
-    public BerkeleydbFailStore(String storePath, String identity) {
-        this(new File(storePath.concat(name).concat("/").concat(identity)));
-        getLock(dbPath.getPath());
-    }
-
     @Override
     public void open() throws FailStoreException {
         try {

+ 12 - 9
lts-core/src/main/java/com/lts/core/failstore/berkeleydb/BerkeleydbFailStoreFactory.java

@@ -1,19 +1,22 @@
 package com.lts.core.failstore.berkeleydb;
 
-import com.lts.core.cluster.Config;
-import com.lts.core.commons.utils.StringUtils;
+import com.lts.core.failstore.AbstractFailStoreFactory;
 import com.lts.core.failstore.FailStore;
-import com.lts.core.failstore.FailStoreFactory;
+
+import java.io.File;
 
 /**
  * Robert HG (254963746@qq.com) on 5/26/15.
  */
-public class BerkeleydbFailStoreFactory implements FailStoreFactory {
+public class BerkeleydbFailStoreFactory extends AbstractFailStoreFactory{
+
+    @Override
+    protected String getName() {
+        return BerkeleydbFailStore.name;
+    }
+
     @Override
-    public FailStore getFailStore(Config config, String storePath) {
-        if (StringUtils.isEmpty(storePath)) {
-            storePath = config.getFailStorePath();
-        }
-        return new BerkeleydbFailStore(storePath, config.getIdentity());
+    protected FailStore newInstance(File dbPath) {
+        return new BerkeleydbFailStore(dbPath);
     }
 }

+ 4 - 5
lts-core/src/main/java/com/lts/core/failstore/leveldb/LeveldbFailStore.java

@@ -28,13 +28,12 @@ public class LeveldbFailStore extends AbstractFailStore {
 
     public static final String name = "leveldb";
 
-    public LeveldbFailStore(String storePath, String identity) {
-        this(new File(storePath.concat(name).concat("/").concat(identity)));
-        getLock(dbPath.getPath());
+    public LeveldbFailStore(File dbPath) {
+        super(dbPath, true);
     }
 
-    public LeveldbFailStore(File dbPath) {
-        super(dbPath);
+    public LeveldbFailStore(File dbPath, boolean needLock) {
+        super(dbPath, needLock);
     }
 
     @Override

+ 12 - 9
lts-core/src/main/java/com/lts/core/failstore/leveldb/LeveldbFailStoreFactory.java

@@ -1,19 +1,22 @@
 package com.lts.core.failstore.leveldb;
 
-import com.lts.core.cluster.Config;
-import com.lts.core.commons.utils.StringUtils;
+import com.lts.core.failstore.AbstractFailStoreFactory;
 import com.lts.core.failstore.FailStore;
-import com.lts.core.failstore.FailStoreFactory;
+
+import java.io.File;
 
 /**
  * Robert HG (254963746@qq.com) on 5/21/15.
  */
-public class LeveldbFailStoreFactory implements FailStoreFactory {
+public class LeveldbFailStoreFactory extends AbstractFailStoreFactory {
+
+    @Override
+    protected String getName() {
+        return LeveldbFailStore.name;
+    }
+
     @Override
-    public FailStore getFailStore(Config config, String storePath) {
-        if (StringUtils.isEmpty(storePath)) {
-            storePath = config.getFailStorePath();
-        }
-        return new LeveldbFailStore(storePath, config.getIdentity());
+    protected FailStore newInstance(File dbPath) {
+        return new LeveldbFailStore(dbPath);
     }
 }

+ 139 - 0
lts-core/src/main/java/com/lts/core/failstore/mapdb/MapdbFailStore.java

@@ -0,0 +1,139 @@
+package com.lts.core.failstore.mapdb;
+
+import com.lts.core.commons.file.FileUtils;
+import com.lts.core.commons.utils.JSONUtils;
+import com.lts.core.domain.KVPair;
+import com.lts.core.failstore.AbstractFailStore;
+import com.lts.core.failstore.FailStoreException;
+import org.mapdb.DB;
+import org.mapdb.DBMaker;
+
+import java.io.File;
+import java.lang.reflect.Type;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentNavigableMap;
+
+/**
+ * see http://www.mapdb.org/
+ *
+ * @author Robert HG (254963746@qq.com) on 11/10/15.
+ */
+public class MapdbFailStore extends AbstractFailStore {
+
+    public static final String name = "mapdb";
+    private DB db;
+    private ConcurrentNavigableMap<String, String> map;
+
+    public MapdbFailStore(File dbPath) {
+        super(dbPath, true);
+    }
+
+    public MapdbFailStore(File dbPath, boolean needLock) {
+        super(dbPath, needLock);
+    }
+
+    @Override
+    protected void init() {
+        String dbName = dbPath.getPath() + "/lts.db";
+        db = DBMaker.fileDB(new File(dbName))
+                .closeOnJvmShutdown()
+                .encryptionEnable("lts")
+                .make();
+    }
+
+    @Override
+    protected String getName() {
+        return name;
+    }
+
+    @Override
+    public void open() throws FailStoreException {
+        try {
+            map = db.treeMap("lts");
+        } catch (Exception e) {
+            throw new FailStoreException(e);
+        }
+    }
+
+    @Override
+    public void put(String key, Object value) throws FailStoreException {
+        try {
+            String valueString = JSONUtils.toJSONString(value);
+            map.put(key, valueString);
+            // persist changes into disk
+            db.commit();
+        } catch (Exception e) {
+            db.rollback();
+            throw new FailStoreException(e);
+        }
+    }
+
+    @Override
+    public void delete(String key) throws FailStoreException {
+        try {
+            map.remove(key);
+            // persist changes into disk
+            db.commit();
+        } catch (Exception e) {
+            db.rollback();
+            throw new FailStoreException(e);
+        }
+    }
+
+    @Override
+    public void delete(List<String> keys) throws FailStoreException {
+        if (keys == null || keys.size() == 0) {
+            return;
+        }
+        try {
+            for (String key : keys) {
+                map.remove(key);
+            }
+            db.commit();
+        } catch (Exception e) {
+            db.rollback();
+            throw new FailStoreException(e);
+        }
+    }
+
+    @Override
+    public <T> List<KVPair<String, T>> fetchTop(int size, Type type) throws FailStoreException {
+
+        List<KVPair<String, T>> list = new ArrayList<KVPair<String, T>>(size);
+        if (map.size() == 0) {
+            return list;
+        }
+        for (Map.Entry<String, String> entry : map.entrySet()) {
+            String key = entry.getKey();
+            T value = JSONUtils.parse(entry.getValue(), type);
+            KVPair<String, T> pair = new KVPair<String, T>(key, value);
+            list.add(pair);
+            if (list.size() >= size) {
+                break;
+            }
+        }
+        return list;
+    }
+
+    @Override
+    public void close() throws FailStoreException {
+        try {
+            db.close();
+        } catch (Exception e) {
+            throw new FailStoreException(e);
+        }
+    }
+
+    @Override
+    public void destroy() throws FailStoreException {
+        try {
+            close();
+        } catch (Exception e) {
+            throw new FailStoreException(e);
+        } finally {
+            FileUtils.delete(dbPath);
+        }
+    }
+}

+ 22 - 0
lts-core/src/main/java/com/lts/core/failstore/mapdb/MapdbFailStoreFactory.java

@@ -0,0 +1,22 @@
+package com.lts.core.failstore.mapdb;
+
+import com.lts.core.failstore.AbstractFailStoreFactory;
+import com.lts.core.failstore.FailStore;
+
+import java.io.File;
+
+/**
+ * @author Robert HG (254963746@qq.com) on 11/10/15.
+ */
+public class MapdbFailStoreFactory extends AbstractFailStoreFactory{
+
+    @Override
+    protected String getName() {
+        return MapdbFailStore.name;
+    }
+
+    @Override
+    protected FailStore newInstance(File dbPath) {
+        return new MapdbFailStore(dbPath);
+    }
+}

+ 5 - 6
lts-core/src/main/java/com/lts/core/failstore/rocksdb/RocksdbFailStore.java

@@ -23,7 +23,11 @@ public class RocksdbFailStore extends AbstractFailStore {
     private Options options;
 
     public RocksdbFailStore(File dbPath) {
-        super(dbPath);
+        super(dbPath, true);
+    }
+
+    public RocksdbFailStore(File dbPath, boolean needLock) {
+        super(dbPath, needLock);
     }
 
     public static final String name = "rocksdb";
@@ -53,11 +57,6 @@ public class RocksdbFailStore extends AbstractFailStore {
         options.setTableFormatConfig(tableConfig);
     }
 
-    public RocksdbFailStore(String storePath, String identity) {
-        this(new File(storePath.concat(name).concat("/").concat(identity)));
-        getLock(dbPath.getPath());
-    }
-
     @Override
     protected String getName() {
         return name;

+ 11 - 9
lts-core/src/main/java/com/lts/core/failstore/rocksdb/RocksdbFailStoreFactory.java

@@ -1,20 +1,22 @@
 package com.lts.core.failstore.rocksdb;
 
-import com.lts.core.cluster.Config;
-import com.lts.core.commons.utils.StringUtils;
+import com.lts.core.failstore.AbstractFailStoreFactory;
 import com.lts.core.failstore.FailStore;
-import com.lts.core.failstore.FailStoreFactory;
+
+import java.io.File;
 
 /**
  * Robert HG (254963746@qq.com) on 5/27/15.
  */
-public class RocksdbFailStoreFactory implements FailStoreFactory {
+public class RocksdbFailStoreFactory extends AbstractFailStoreFactory{
+
+    @Override
+    protected String getName() {
+        return RocksdbFailStore.name;
+    }
 
     @Override
-    public FailStore getFailStore(Config config, String storePath) {
-        if (StringUtils.isEmpty(storePath)) {
-            storePath = config.getFailStorePath();
-        }
-        return new RocksdbFailStore(storePath, config.getIdentity());
+    protected FailStore newInstance(File dbPath) {
+        return new RocksdbFailStore(dbPath);
     }
 }

+ 0 - 5
lts-core/src/main/java/com/lts/core/support/RetryScheduler.java

@@ -276,16 +276,11 @@ public abstract class RetryScheduler<T> {
 
     /**
      * 远程连接是否可用
-     *
-     * @return
      */
     protected abstract boolean isRemotingEnable();
 
     /**
      * 重试
-     *
-     * @param list
-     * @return
      */
     protected abstract boolean retry(List<T> list);
 

+ 3 - 4
lts-core/src/main/java/com/lts/remoting/netty/NettyCodecFactory.java

@@ -1,5 +1,6 @@
 package com.lts.remoting.netty;
 
+import com.lts.core.constant.Constants;
 import com.lts.core.logger.Logger;
 import com.lts.core.logger.LoggerFactory;
 import com.lts.remoting.Channel;
@@ -27,8 +28,6 @@ public class NettyCodecFactory {
         this.codec = codec;
     }
 
-    private ChannelHandler encoder = new NettyEncoder();
-
     @ChannelHandler.Sharable
     public class NettyEncoder extends MessageToByteEncoder<RemotingCommand> {
         @Override
@@ -53,7 +52,7 @@ public class NettyCodecFactory {
 
     public class NettyDecoder extends LengthFieldBasedFrameDecoder {
 
-        private static final int FRAME_MAX_LENGTH = 1024 * 1024 * 8;
+        private static final int FRAME_MAX_LENGTH = Constants.DEFAULT_BUFFER_SIZE;
 
         public NettyDecoder() {
             super(FRAME_MAX_LENGTH, 0, 4, 0, 4);
@@ -85,7 +84,7 @@ public class NettyCodecFactory {
     }
 
     public ChannelHandler getEncoder() {
-        return encoder;
+        return new NettyEncoder();
     }
 
     public ChannelHandler getDecoder() {

+ 5 - 17
lts-core/src/main/java/com/lts/remoting/serialize/Hessian2Serializable.java

@@ -2,9 +2,8 @@ package com.lts.remoting.serialize;
 
 import com.caucho.hessian.io.Hessian2Input;
 import com.caucho.hessian.io.Hessian2Output;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
+import com.lts.core.commons.io.UnsafeByteArrayInputStream;
+import com.lts.core.commons.io.UnsafeByteArrayOutputStream;
 
 /**
  * @author Robert HG (254963746@qq.com) on 11/6/15.
@@ -19,35 +18,24 @@ public class Hessian2Serializable implements RemotingSerializable {
     @Override
     public byte[] serialize(Object obj) throws Exception {
 
-        ByteArrayOutputStream bos = new ByteArrayOutputStream();
-
+        UnsafeByteArrayOutputStream bos = new UnsafeByteArrayOutputStream();
         Hessian2Output out = new Hessian2Output(bos);
-
         out.startMessage();
-
         out.writeObject(obj);
-
         out.completeMessage();
         out.close();
-        bos.close();
-
         return bos.toByteArray();
     }
 
     @Override
     public <T> T deserialize(byte[] data, Class<T> clazz) throws Exception {
 
-        ByteArrayInputStream bin = new ByteArrayInputStream(data);
+        UnsafeByteArrayInputStream bin = new UnsafeByteArrayInputStream(data);
         Hessian2Input in = new Hessian2Input(bin);
-
         in.startMessage();
-
-        Object obj = in.readObject();
-
+        Object obj = in.readObject(clazz);
         in.completeMessage();
         in.close();
-        bin.close();
-
         return (T) obj;
     }
 

+ 5 - 11
lts-core/src/main/java/com/lts/remoting/serialize/JavaSerializable.java

@@ -1,5 +1,8 @@
 package com.lts.remoting.serialize;
 
+import com.lts.core.commons.io.UnsafeByteArrayInputStream;
+import com.lts.core.commons.io.UnsafeByteArrayOutputStream;
+
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.ObjectInputStream;
@@ -18,38 +21,29 @@ public class JavaSerializable implements RemotingSerializable {
     @Override
     public byte[] serialize(Object obj) throws Exception {
 
-        ByteArrayOutputStream bos = new ByteArrayOutputStream();
+        UnsafeByteArrayOutputStream bos = new UnsafeByteArrayOutputStream();
         ObjectOutputStream oos = new ObjectOutputStream(bos);
 
         try {
-
             oos.writeObject(obj);
-
             oos.flush();
-
             return bos.toByteArray();
-
         } finally {
             oos.close();
-            bos.close();
         }
     }
 
     @Override
     public <T> T deserialize(byte[] data, Class<T> clazz) throws Exception {
 
-        ByteArrayInputStream bin = new ByteArrayInputStream(data);
+        UnsafeByteArrayInputStream bin = new UnsafeByteArrayInputStream(data);
         ObjectInputStream ois = new ObjectInputStream(bin);
 
         try {
-
             Object obj = ois.readObject();
-
             return (T) obj;
-
         } finally {
             ois.close();
-            bin.close();
         }
     }
 

+ 2 - 1
lts-core/src/main/resources/META-INF/lts/internal/com.lts.core.failstore.FailStoreFactory

@@ -1,3 +1,4 @@
 leveldb=com.lts.core.failstore.leveldb.LeveldbFailStoreFactory
 berkeleydb=com.lts.core.failstore.berkeleydb.BerkeleydbFailStoreFactory
-rocksdb=com.lts.core.failstore.rocksdb.RocksdbFailStoreFactory
+rocksdb=com.lts.core.failstore.rocksdb.RocksdbFailStoreFactory
+mapdb=com.lts.core.failstore.mapdb.MapdbFailStoreFactory

+ 4 - 4
lts-core/src/test/java/com/lts/core/failstore/berkeleydb/BerkeleydbFailStoreTest.java

@@ -27,11 +27,11 @@ public class BerkeleydbFailStoreTest {
     @Before
     public void setup() throws FailStoreException {
         Config config = new Config();
-        config.setNodeGroup("berkeleydb_test");
-        config.setNodeType(NodeType.JOB_CLIENT);
         config.setDataPath(Constants.USER_HOME);
-        config.setIdentity(StringUtils.generateUUID());
-        failStore = new BerkeleydbFailStore(config.getFailStorePath(), config.getIdentity());
+        config.setNodeGroup("test");
+        config.setNodeType(NodeType.JOB_CLIENT);
+        config.setIdentity("testIdentity");
+        failStore = new BerkeleydbFailStoreFactory().getFailStore(config, config.getFailStorePath());
         failStore.open();
     }
 

+ 7 - 3
lts-core/src/test/java/com/lts/core/failstore/leveldb/LeveldbFailStoreTest.java

@@ -1,9 +1,11 @@
 package com.lts.core.failstore.leveldb;
 
 import com.lts.core.cluster.Config;
+import com.lts.core.cluster.NodeType;
 import com.lts.core.commons.utils.CollectionUtils;
 import com.lts.core.commons.utils.JSONUtils;
 import com.lts.core.commons.utils.StringUtils;
+import com.lts.core.constant.Constants;
 import com.lts.core.domain.Job;
 import com.lts.core.domain.KVPair;
 import com.lts.core.failstore.FailStore;
@@ -25,9 +27,11 @@ public class LeveldbFailStoreTest {
     @Before
     public void setup() throws FailStoreException {
         Config config = new Config();
-        config.setIdentity(StringUtils.generateUUID());
-        config.setDataPath("/Users/hugui/.lts/TASK_TRACKER/test_trade_TaskTracker/");
-        failStore = new LeveldbFailStore(config.getFailStorePath(), config.getIdentity());
+        config.setIdentity("testIdentity");
+        config.setDataPath(Constants.USER_HOME);
+        config.setNodeGroup("test");
+        config.setNodeType(NodeType.JOB_CLIENT);
+        failStore = new LeveldbFailStoreFactory().getFailStore(config, config.getFailStorePath());
         failStore.open();
     }
 

+ 57 - 0
lts-core/src/test/java/com/lts/core/failstore/mapdb/MapdbFailStoreTest.java

@@ -0,0 +1,57 @@
+package com.lts.core.failstore.mapdb;
+
+import com.lts.core.cluster.Config;
+import com.lts.core.cluster.NodeType;
+import com.lts.core.commons.utils.CollectionUtils;
+import com.lts.core.commons.utils.JSONUtils;
+import com.lts.core.commons.utils.StringUtils;
+import com.lts.core.constant.Constants;
+import com.lts.core.domain.Job;
+import com.lts.core.domain.KVPair;
+import com.lts.core.failstore.FailStore;
+import com.lts.core.failstore.FailStoreException;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.List;
+
+/**
+ * Created by hugui on 11/10/15.
+ */
+public class MapdbFailStoreTest {
+
+    private String key = "23412x";
+    FailStore failStore;
+
+    @Before
+    public void setup() throws FailStoreException {
+        Config config = new Config();
+        config.setIdentity("testIdentity");
+        config.setDataPath(Constants.USER_HOME);
+        config.setNodeGroup("test");
+        config.setNodeType(NodeType.JOB_CLIENT);
+        failStore = new MapdbFailStoreFactory().getFailStore(config, config.getFailStorePath());
+        failStore.open();
+    }
+
+    @Test
+    public void put() throws FailStoreException {
+        Job job = new Job();
+        job.setTaskId("2131232");
+        for (int i = 0; i < 100; i++) {
+            failStore.put(key + "" + i, job);
+        }
+        System.out.println("这里debug测试多线程");
+        failStore.close();
+    }
+
+    @Test
+    public void fetchTop() throws FailStoreException {
+        List<KVPair<String, Job>> kvPairs = failStore.fetchTop(5, Job.class);
+        if (CollectionUtils.isNotEmpty(kvPairs)) {
+            for (KVPair<String, Job> kvPair : kvPairs) {
+                System.out.println(JSONUtils.toJSONString(kvPair));
+            }
+        }
+    }
+}

+ 4 - 4
lts-core/src/test/java/com/lts/core/failstore/rocksdb/RocksdbFailStoreTest.java

@@ -27,11 +27,11 @@ public class RocksdbFailStoreTest {
     @Before
     public void setup() throws FailStoreException {
         Config config = new Config();
-        config.setIdentity(StringUtils.generateUUID());
-        config.setNodeGroup("berkeleydb_test");
-        config.setNodeType(NodeType.JOB_CLIENT);
+        config.setIdentity("testIdentity");
         config.setDataPath(Constants.USER_HOME);
-        failStore = new RocksdbFailStore(config.getFailStorePath(), config.getIdentity());
+        config.setNodeGroup("test");
+        config.setNodeType(NodeType.JOB_CLIENT);
+        failStore = new RocksdbFailStoreFactory().getFailStore(config, config.getFailStorePath());
         failStore.open();
     }
 

+ 4 - 0
lts-example/pom.xml

@@ -118,5 +118,9 @@
             <groupId>com.caucho</groupId>
             <artifactId>hessian</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.mapdb</groupId>
+            <artifactId>mapdb</artifactId>
+        </dependency>
     </dependencies>
 </project>

+ 3 - 2
lts-example/src/main/java/com/lts/example/api/JobClientTest.java

@@ -83,8 +83,9 @@ public class JobClientTest extends BaseJobClientTest {
         jobClient.setJobFinishedHandler(new JobFinishedHandlerImpl());
         // master 节点变化监听器,当有集群中只需要一个节点执行某个事情的时候,可以监听这个事件
         jobClient.addMasterChangeListener(new MasterChangeListenerImpl());
-        // 可选址  leveldb(默认), rocksdb, berkeleydb
-        // taskTracker.addConfig("job.fail.store", "leveldb");
+        // 可选址  leveldb(默认), rocksdb, berkeleydb, mapdb
+//        jobClient.addConfig("job.fail.store", "mapdb");
+        jobClient.addConfig("lts.remoting.serializable.default", "hessian2");
         jobClient.start();
 
         JobClientTest jobClientTest = new JobClientTest();

+ 6 - 0
pom.xml

@@ -47,6 +47,7 @@
         <springframework.version>4.1.6.RELEASE</springframework.version>
         <mina.version>2.0.9</mina.version>
         <hessian.version>4.0.38</hessian.version>
+        <mapdb.version>2.0-beta10</mapdb.version>
     </properties>
 
     <dependencyManagement>
@@ -196,6 +197,11 @@
                 <artifactId>hessian</artifactId>
                 <version>${hessian.version}</version>
             </dependency>
+            <dependency>
+                <groupId>org.mapdb</groupId>
+                <artifactId>mapdb</artifactId>
+                <version>${mapdb.version}</version>
+            </dependency>
         </dependencies>
     </dependencyManagement>
     <build>