فهرست منبع

performance: ltsdb

hugui 8 سال پیش
والد
کامیت
db5ab0bb87

+ 19 - 1
lts-core/src/main/java/com/github/ltsopensource/core/commons/utils/BatchUtils.java

@@ -12,7 +12,7 @@ public class BatchUtils {
     /**
      * 批量处理切分的时候,返回第index个List
      */
-    public static <E> List<E> getBatchList(Integer index, int batchSize, Collection<E> collection) {
+    private static <E> List<E> getBatchList(Integer index, int batchSize, Collection<E> collection) {
         List<E> list = null;
         if (collection instanceof List) {
             list = (List<E>) collection;
@@ -27,6 +27,24 @@ public class BatchUtils {
         }
     }
 
+    public static <E> void batchExecute(int totalSize, int batchSize, Collection<E> collection, Executor<E> executor) {
+
+        for (int i = 0; i <= totalSize / batchSize; i++) {
+            List<E> list = BatchUtils.getBatchList(i, batchSize, collection);
+
+            if (CollectionUtils.isNotEmpty(list)) {
+                if (!executor.execute(list)) {
+                    break;
+                }
+            }
+        }
+    }
+
+    public interface Executor<E> {
+
+        // 返回是否需要继续
+        boolean execute(List<E> list);
+    }
 }
 
 

+ 16 - 13
lts-core/src/main/java/com/github/ltsopensource/core/monitor/MStatReportWorker.java

@@ -11,6 +11,7 @@ import com.github.ltsopensource.core.cmd.HttpCmdNames;
 import com.github.ltsopensource.core.cmd.HttpCmdParamNames;
 import com.github.ltsopensource.core.commons.utils.BatchUtils;
 import com.github.ltsopensource.core.commons.utils.CollectionUtils;
+import com.github.ltsopensource.core.commons.utils.Holder;
 import com.github.ltsopensource.core.constant.ExtConfig;
 import com.github.ltsopensource.core.domain.monitor.MData;
 import com.github.ltsopensource.core.domain.monitor.MNode;
@@ -104,7 +105,7 @@ public class MStatReportWorker implements Runnable {
             mDataQueue = mDataQueue.subList(size - MAX_RETRY_RETAIN, size);
         }
 
-        List<Node> monitorNodes = appContext.getSubscribedNodeManager().getNodeList(NodeType.MONITOR);
+        final List<Node> monitorNodes = appContext.getSubscribedNodeManager().getNodeList(NodeType.MONITOR);
         if (CollectionUtils.isEmpty(monitorNodes)) {
             if (LOGGER.isDebugEnabled()) {
                 LOGGER.debug("Please Start LTS-Monitor");
@@ -112,37 +113,39 @@ public class MStatReportWorker implements Runnable {
             return;
         }
 
-        int toIndex = 0;
+        final Holder<Integer> toIndex = new Holder<Integer>(0);
         size = mDataQueue.size();
         try {
-            for (int i = 0; i <= size / BATCH_REPORT_SIZE; i++) {
-                List<MData> mDatas = BatchUtils.getBatchList(i, BATCH_REPORT_SIZE, mDataQueue);
-                if (CollectionUtils.isNotEmpty(mDatas)) {
+
+            BatchUtils.batchExecute(size, BATCH_REPORT_SIZE, mDataQueue, new BatchUtils.Executor<MData>() {
+                @Override
+                public boolean execute(List<MData> list) {
                     try {
                         HttpCmd cmd = new DefaultHttpCmd();
                         cmd.setCommand(HttpCmdNames.HTTP_CMD_ADD_M_DATA);
                         cmd.addParam(HttpCmdParamNames.M_NODE, JSON.toJSONString(buildMNode()));
-                        cmd.addParam(HttpCmdParamNames.M_DATA, JSON.toJSONString(mDatas));
+                        cmd.addParam(HttpCmdParamNames.M_DATA, JSON.toJSONString(list));
 
                         if (sendReq(monitorNodes, cmd)) {
-                            toIndex = toIndex + CollectionUtils.sizeOf(mDatas);
+                            toIndex.set(toIndex.get() + CollectionUtils.sizeOf(list));
                         } else {
-                            break;
+                            return false;
                         }
                     } catch (Exception e) {
                         LOGGER.warn("Report monitor data Error : " + e.getMessage(), e);
-                        break;
+                        return false;
                     }
+                    return true;
                 }
-            }
+            });
         } finally {
             // to delete
-            if (toIndex == 0) {
+            if (toIndex.get() == 0) {
                 // do nothing
-            } else if (size == toIndex) {
+            } else if (size == toIndex.get()) {
                 mDataQueue.clear();
             } else {
-                mDataQueue = mDataQueue.subList(toIndex + 1, size);
+                mDataQueue = mDataQueue.subList(toIndex.get() + 1, size);
             }
         }
     }

+ 1 - 0
lts-core/src/main/java/com/github/ltsopensource/kv/DBImpl.java

@@ -47,6 +47,7 @@ public class DBImpl<K, V> implements DB<K, V>, Closeable {
             this.index = new MemIndex<K, V>(storeConfig, dataBlockEngine, dataCache);
             this.txLogReplay = new TxLogReplay<K, V>(storeTxLogEngine, dataBlockEngine, index, dataCache);
             this.indexSnapshot = new MemIndexSnapshot<K, V>(txLogReplay, index, storeConfig, serializer);
+            ((MemIndex<K, V>) this.index).setIndexSnapshot(this.indexSnapshot);
         } else {
             throw new IllegalArgumentException("Illegal IndexEngine " + storeConfig.getIndexType());
         }

+ 19 - 0
lts-core/src/main/java/com/github/ltsopensource/kv/StoreConfig.java

@@ -20,6 +20,7 @@ public class StoreConfig {
     private File indexPath;
     // 索引snapshot的间隔
     private int indexSnapshotInterval = 60 * 1000;     // 60s
+    private int indexSnapshotThreshold = 1000;         // 当超过一千个改变的时候snapshot
     // 定时刷盘时间
     private int flushDataInterval = 1000;   // 1s
     // 是否开启定时刷盘, 默认不开启
@@ -42,6 +43,8 @@ public class StoreConfig {
     private int maxIndexSnapshotSize = 3;
     // DataBlock 合并检查间隔
     private int dataBlockCompactCheckInterval = 60 * 1000;
+    // index每次批量写入的size是100条
+    private int indexSnapshotBatchSize = 100;
 
     public int getDbLogFlushInterval() {
         return dbLogFlushInterval;
@@ -170,4 +173,20 @@ public class StoreConfig {
     public void setDataBlockCompactCheckInterval(int dataBlockCompactCheckInterval) {
         this.dataBlockCompactCheckInterval = dataBlockCompactCheckInterval;
     }
+
+    public int getIndexSnapshotThreshold() {
+        return indexSnapshotThreshold;
+    }
+
+    public void setIndexSnapshotThreshold(int indexSnapshotThreshold) {
+        this.indexSnapshotThreshold = indexSnapshotThreshold;
+    }
+
+    public int getIndexSnapshotBatchSize() {
+        return indexSnapshotBatchSize;
+    }
+
+    public void setIndexSnapshotBatchSize(int indexSnapshotBatchSize) {
+        this.indexSnapshotBatchSize = indexSnapshotBatchSize;
+    }
 }

+ 9 - 4
lts-core/src/main/java/com/github/ltsopensource/kv/data/DataBlockEngine.java

@@ -3,6 +3,8 @@ package com.github.ltsopensource.kv.data;
 import com.github.ltsopensource.core.commons.file.FileUtils;
 import com.github.ltsopensource.core.commons.io.UnsafeByteArrayInputStream;
 import com.github.ltsopensource.core.commons.io.UnsafeByteArrayOutputStream;
+import com.github.ltsopensource.core.json.TypeReference;
+import com.github.ltsopensource.core.logger.Logger;
 import com.github.ltsopensource.kv.CapacityNotEnoughException;
 import com.github.ltsopensource.kv.DB;
 import com.github.ltsopensource.kv.DBException;
@@ -10,8 +12,6 @@ import com.github.ltsopensource.kv.StoreConfig;
 import com.github.ltsopensource.kv.index.IndexItem;
 import com.github.ltsopensource.kv.serializer.StoreSerializer;
 import com.github.ltsopensource.kv.txlog.StoreTxLogPosition;
-import com.github.ltsopensource.core.json.TypeReference;
-import com.github.ltsopensource.core.logger.Logger;
 
 import java.io.File;
 import java.io.FilenameFilter;
@@ -93,7 +93,7 @@ public class DataBlockEngine<K, V> {
         storeConfig.setLastTxLogPositionOnDataBlock(maxTxLog);
     }
 
-    protected List<DataBlock> getReadonlyBlocks(){
+    protected List<DataBlock> getReadonlyBlocks() {
         return readonlyBlocks;
     }
 
@@ -102,15 +102,20 @@ public class DataBlockEngine<K, V> {
      */
     public DataAppendResult append(StoreTxLogPosition storeTxLogPosition, K key, V value) {
 
+        UnsafeByteArrayOutputStream out = new UnsafeByteArrayOutputStream();
         try {
             DataEntry<K, V> dataEntry = new DataEntry<K, V>(key, value);
-            UnsafeByteArrayOutputStream out = new UnsafeByteArrayOutputStream();
             serializer.serialize(dataEntry, out);
 
             return append(storeTxLogPosition, out.toByteArray());
 
         } catch (Exception e) {
             throw new DBException("Persistent data error: " + e.getMessage(), e);
+        } finally {
+            try {
+                out.close();
+            } catch (IOException ignored) {
+            }
         }
     }
 

+ 3 - 1
lts-core/src/main/java/com/github/ltsopensource/kv/data/DataBlockFileHeader.java

@@ -89,7 +89,9 @@ public class DataBlockFileHeader extends AbstractFileHeader {
 
     @Override
     public void read(FileChannel fileChannel) throws IOException {
-
+        if (fileChannel.size() == 0) {
+            return;
+        }
         fileChannel.position(0);
         fileChannel.read(byteBuffer());
 

+ 4 - 0
lts-core/src/main/java/com/github/ltsopensource/kv/index/IndexSnapshotFileHeader.java

@@ -10,6 +10,7 @@ import java.nio.channels.FileChannel;
  * 1. magic                  2 byte
  * 3. storeTxLogPosition     8 byte      // 快照到的最后一条事务日志ID
  * <p/>
+ *
  * @author Robert HG (254963746@qq.com) on 12/19/15.
  */
 public class IndexSnapshotFileHeader extends AbstractFileHeader {
@@ -25,6 +26,9 @@ public class IndexSnapshotFileHeader extends AbstractFileHeader {
 
     @Override
     public void read(FileChannel fileChannel) throws IOException {
+        if (fileChannel.size() == 0) {
+            return;
+        }
 
         fileChannel.position(0);
         fileChannel.read(byteBuffer());

+ 29 - 3
lts-core/src/main/java/com/github/ltsopensource/kv/index/MemIndex.java

@@ -1,5 +1,8 @@
 package com.github.ltsopensource.kv.index;
 
+import com.github.ltsopensource.core.factory.NamedThreadFactory;
+import com.github.ltsopensource.core.logger.Logger;
+import com.github.ltsopensource.core.logger.LoggerFactory;
 import com.github.ltsopensource.kv.Entry;
 import com.github.ltsopensource.kv.StoreConfig;
 import com.github.ltsopensource.kv.cache.DataCache;
@@ -8,25 +11,43 @@ import com.github.ltsopensource.kv.iterator.DBIterator;
 import com.github.ltsopensource.kv.iterator.MemIteratorImpl;
 import com.github.ltsopensource.kv.txlog.StoreTxLogPosition;
 
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicLong;
 
 /**
  * @author Robert HG (254963746@qq.com) on 12/16/15.
  */
 public class MemIndex<K, V> implements Index<K, V> {
 
+    private static final Logger LOGGER = LoggerFactory.getLogger(MemIndex.class);
     private StoreTxLogPosition lastTxLog;
     private ConcurrentMap<K, IndexItem<K>> indexMap;
     private StoreConfig storeConfig;
     private DataBlockEngine<K, V> dataBlockEngine;
     private DataCache<K, V> dataCache;
+    private AtomicLong lastSnapshotChangeNum = new AtomicLong(0);
+    private AtomicLong currentChangeNum = new AtomicLong(0);
+    private IndexSnapshot<K, V> indexSnapshot;
 
-    public MemIndex(StoreConfig storeConfig, DataBlockEngine<K, V> dataBlockEngine, DataCache<K, V> dataCache) {
+    public MemIndex(final StoreConfig storeConfig, DataBlockEngine<K, V> dataBlockEngine, DataCache<K, V> dataCache) {
         this.indexMap = new ConcurrentSkipListMap<K, IndexItem<K>>();
         this.storeConfig = storeConfig;
         this.dataBlockEngine = dataBlockEngine;
         this.dataCache = dataCache;
+        ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("ltsdb-index-snapshot-check-service", true));
+        executorService.scheduleWithFixedDelay(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    // 检查一下当改变的量达到一定量时要snapshot
+                    if (currentChangeNum.get() - lastSnapshotChangeNum.get() > storeConfig.getIndexSnapshotThreshold()) {
+                        indexSnapshot.snapshot();
+                    }
+                } catch (Throwable t) {
+                    LOGGER.error("SNAPSHOT Error", t);
+                }
+            }
+        }, 3, 2, TimeUnit.SECONDS);
     }
 
     public IndexItem<K> getIndexItem(K key) {
@@ -37,6 +58,7 @@ public class MemIndex<K, V> implements Index<K, V> {
     public IndexItem<K> removeIndexItem(StoreTxLogPosition txLogResult, K key) {
         IndexItem<K> value = indexMap.remove(key);
         this.lastTxLog = txLogResult;
+        currentChangeNum.incrementAndGet();
         return value;
     }
 
@@ -44,6 +66,7 @@ public class MemIndex<K, V> implements Index<K, V> {
     public void putIndexItem(StoreTxLogPosition txLogResult, K key, IndexItem<K> indexItem) {
         indexMap.put(key, indexItem);
         this.lastTxLog = txLogResult;
+        currentChangeNum.incrementAndGet();
     }
 
     @Override
@@ -78,6 +101,9 @@ public class MemIndex<K, V> implements Index<K, V> {
         this.indexMap = indexMap;
     }
 
+    public void setIndexSnapshot(IndexSnapshot<K, V> indexSnapshot) {
+        this.indexSnapshot = indexSnapshot;
+    }
 }
 
 

+ 42 - 30
lts-core/src/main/java/com/github/ltsopensource/kv/index/MemIndexSnapshot.java

@@ -3,11 +3,11 @@ package com.github.ltsopensource.kv.index;
 import com.github.ltsopensource.core.commons.file.FileUtils;
 import com.github.ltsopensource.core.commons.io.UnsafeByteArrayInputStream;
 import com.github.ltsopensource.core.commons.io.UnsafeByteArrayOutputStream;
+import com.github.ltsopensource.core.json.TypeReference;
 import com.github.ltsopensource.kv.StoreConfig;
 import com.github.ltsopensource.kv.replay.TxLogReplay;
 import com.github.ltsopensource.kv.serializer.StoreSerializer;
 import com.github.ltsopensource.kv.txlog.StoreTxLogPosition;
-import com.github.ltsopensource.core.json.TypeReference;
 
 import java.io.File;
 import java.io.FilenameFilter;
@@ -20,6 +20,7 @@ import java.util.Arrays;
 import java.util.Comparator;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * @author Robert HG (254963746@qq.com) on 12/19/15.
@@ -27,6 +28,7 @@ import java.util.concurrent.ConcurrentSkipListMap;
 public class MemIndexSnapshot<K, V> extends AbstractIndexSnapshot<K, V> {
 
     private TxLogReplay<K, V> txLogReplay;
+    private AtomicBoolean snapshoting = new AtomicBoolean(false);
 
     public MemIndexSnapshot(TxLogReplay<K, V> txLogReplay, Index<K, V> index, StoreConfig storeConfig, StoreSerializer serializer) {
         super(index, storeConfig, serializer);
@@ -137,46 +139,56 @@ public class MemIndexSnapshot<K, V> extends AbstractIndexSnapshot<K, V> {
 
     @Override
     public void snapshot() throws IOException {
-
-        StoreTxLogPosition storeTxLogPosition = index.lastTxLog();
-
-        if (storeTxLogPosition == null) {
+        if (!snapshoting.compareAndSet(false, true)) {
             return;
         }
-        if (lastStoreTxLogPosition != null && lastStoreTxLogPosition.getRecordId() == storeTxLogPosition.getRecordId()) {
-            return;
-        }
-
-        ConcurrentMap<K, IndexItem<K>> indexMap = ((MemIndex<K, V>) index).getIndexMap();
+        try {
+            StoreTxLogPosition storeTxLogPosition = index.lastTxLog();
 
-        String name = System.currentTimeMillis() + ".snapshot";
-        File snapshot = new File(storeConfig.getIndexPath(), name);
-        FileChannel fileChannel = FileUtils.newFileChannel(snapshot, "rw");
+            if (storeTxLogPosition == null) {
+                return;
+            }
+            if (lastStoreTxLogPosition != null && lastStoreTxLogPosition.getRecordId() == storeTxLogPosition.getRecordId()) {
+                return;
+            }
 
-        IndexSnapshotFileHeader fileHeader = new IndexSnapshotFileHeader();
+            ConcurrentMap<K, IndexItem<K>> indexMap = ((MemIndex<K, V>) index).getIndexMap();
 
-        UnsafeByteArrayOutputStream os = new UnsafeByteArrayOutputStream();
-        serializer.serialize(indexMap, os);
-        byte[] payload = os.toByteArray();
-        ReadableByteChannel src = Channels.newChannel(new UnsafeByteArrayInputStream(payload));
+            String name = System.currentTimeMillis() + ".snapshot";
+            File snapshot = new File(storeConfig.getIndexPath(), name);
+            FileChannel fileChannel = FileUtils.newFileChannel(snapshot, "rw");
 
-        // 先写一个空的文件头
-        fileHeader.write(fileChannel);
+            IndexSnapshotFileHeader fileHeader = new IndexSnapshotFileHeader();
+            UnsafeByteArrayOutputStream os = new UnsafeByteArrayOutputStream();
+            try {
+                serializer.serialize(indexMap, os);
+                byte[] payload = os.toByteArray();
+                ReadableByteChannel src = Channels.newChannel(new UnsafeByteArrayInputStream(payload));
+
+                // 先写一个空的文件头
+                fileHeader.write(fileChannel);
+
+                // 写内容
+                fileChannel.transferFrom(src, fileHeader.getLength(), payload.length);
+            } finally {
+                os.close();
+            }
 
-        // 写内容
-        fileChannel.transferFrom(src, fileHeader.getLength(), payload.length);
-        fileChannel.force(true);
+            fileChannel.force(true);
 
-        // 写真实的文件头
-        fileHeader.setStoreTxLogRecordId(storeTxLogPosition.getRecordId());
-        fileHeader.write(fileChannel);
+            // 写真实的文件头
+            fileHeader.setStoreTxLogRecordId(storeTxLogPosition.getRecordId());
+            fileHeader.write(fileChannel);
 
-        // 删除多余的快照数目
-        deleteOverSnapshot();
+            // 删除多余的快照数目
+            deleteOverSnapshot();
 
-        LOGGER.info("snapshot index finished: [" + name + "]");
+            LOGGER.info("snapshot index finished: [" + name + "]");
 
-        lastStoreTxLogPosition = storeTxLogPosition;
+            lastStoreTxLogPosition = storeTxLogPosition;
+        } finally {
+            snapshoting.set(false);
+        }
     }
 
     /**

+ 8 - 6
lts-core/src/main/java/com/github/ltsopensource/kv/replay/TxLogReplay.java

@@ -1,5 +1,7 @@
 package com.github.ltsopensource.kv.replay;
 
+import com.github.ltsopensource.core.logger.Logger;
+import com.github.ltsopensource.core.support.SystemClock;
 import com.github.ltsopensource.kv.*;
 import com.github.ltsopensource.kv.cache.DataCache;
 import com.github.ltsopensource.kv.data.DataAppendResult;
@@ -10,7 +12,6 @@ import com.github.ltsopensource.kv.txlog.StoreTxLogCursorEntry;
 import com.github.ltsopensource.kv.txlog.StoreTxLogEngine;
 import com.github.ltsopensource.kv.txlog.StoreTxLogEntry;
 import com.github.ltsopensource.kv.txlog.StoreTxLogPosition;
-import com.github.ltsopensource.core.logger.Logger;
 
 /**
  * @author Robert HG (254963746@qq.com) on 12/19/15.
@@ -37,6 +38,7 @@ public class TxLogReplay<K, V> {
         Cursor<StoreTxLogCursorEntry<K, V>> cursor = storeTxLogEngine.cursor(startPosition);
 
         int count = 0;
+        long startTime = SystemClock.now();
 
         while (cursor.hasNext()) {
             StoreTxLogCursorEntry<K, V> storeTxLogCursorEntry = cursor.next();
@@ -55,8 +57,8 @@ public class TxLogReplay<K, V> {
                 DataAppendResult dataAppendResult = dataBlockEngine.append(position, key, value);
                 // 2. 写Index
                 index.putIndexItem(position, key, DBImpl.convertToIndex(key, dataAppendResult));
-                // 3. 写缓存
-                dataCache.put(key, value);
+//                 3. 写缓存
+//                dataCache.put(key, value);
 
             } else if (op == Operation.REMOVE) {
                 // 1. 移除Index
@@ -65,8 +67,8 @@ public class TxLogReplay<K, V> {
                     // 2. 移除Data
                     dataBlockEngine.remove(position, indexItem);
                 }
-                // 2. 移除缓存
-                dataCache.remove(key);
+//                // 2. 移除缓存
+//                dataCache.remove(key);
             } else {
                 throw new DBException("error op=" + op);
             }
@@ -74,7 +76,7 @@ public class TxLogReplay<K, V> {
             count++;
         }
 
-        LOGGER.info("replay txLog complete, txLog size:" + count);
+        LOGGER.info("replay txLog complete, txLog size:" + count + ", cost mills:" + (SystemClock.now() - startTime));
 
     }
 

+ 8 - 5
lts-core/src/main/java/com/github/ltsopensource/kv/txlog/StoreTxLogEngine.java

@@ -3,9 +3,9 @@ package com.github.ltsopensource.kv.txlog;
 import com.github.ltsopensource.core.commons.file.FileUtils;
 import com.github.ltsopensource.core.commons.io.UnsafeByteArrayInputStream;
 import com.github.ltsopensource.core.commons.io.UnsafeByteArrayOutputStream;
+import com.github.ltsopensource.core.json.TypeReference;
 import com.github.ltsopensource.kv.*;
 import com.github.ltsopensource.kv.serializer.StoreSerializer;
-import com.github.ltsopensource.core.json.TypeReference;
 
 import java.io.File;
 import java.io.FilenameFilter;
@@ -135,10 +135,13 @@ public class StoreTxLogEngine<K, V> {
         }
 
         UnsafeByteArrayOutputStream out = new UnsafeByteArrayOutputStream();
-        serializer.serialize(entry, out);
-
-        byte[] entryBytes = out.toByteArray();
-        return storeTxLog.append(entryBytes);
+        try {
+            serializer.serialize(entry, out);
+            byte[] entryBytes = out.toByteArray();
+            return storeTxLog.append(entryBytes);
+        } finally {
+            out.close();
+        }
     }
 
     public Cursor<StoreTxLogCursorEntry<K, V>> cursor(StoreTxLogPosition position) {

+ 3 - 0
lts-core/src/main/java/com/github/ltsopensource/kv/txlog/StoreTxLogFileHeader.java

@@ -35,6 +35,9 @@ public class StoreTxLogFileHeader extends AbstractFileHeader {
 
     @Override
     public void read(FileChannel fileChannel) throws IOException {
+        if (fileChannel.size() == 0) {
+            return;
+        }
         fileChannel.position(0);
         fileChannel.read(byteBuffer());
 

+ 4 - 4
lts-core/src/test/java/com/github/ltsopensource/core/failstore/ltsdb/DBTest.java

@@ -42,10 +42,10 @@ public class DBTest {
 
         long start = System.currentTimeMillis();
 
-        for (int i = 0; i < 1000000; i++) {
+        for (int i = 0; i < 100000; i++) {
             db.put("testKey" + i, "testvalue" + i);
         }
-        // 17458 待优化
+        // 4597 待优化
         System.out.println(System.currentTimeMillis() - start);
     }
 
@@ -65,10 +65,10 @@ public class DBTest {
 
         long start = System.currentTimeMillis();
 
-        for (int i = 0; i < 1000000; i++) {
+        for (int i = 0; i < 100000; i++) {
             db.put(("testKey" + i).getBytes("UTF-8"), ("testvalue" + i).getBytes("UTF-8"));
         }
-        //3856
+        //972
         System.out.println(System.currentTimeMillis() - start);
     }
 

+ 8 - 8
lts-jobclient/src/main/java/com/github/ltsopensource/jobclient/JobClient.java

@@ -242,22 +242,22 @@ public class JobClient<T extends JobClientNode, Context extends AppContext> exte
 
     public Response submitJob(List<Job> jobs) throws JobSubmitException {
         checkStart();
-        Response response = new Response();
+        final Response response = new Response();
         response.setSuccess(true);
         int size = jobs.size();
-        for (int i = 0; i <= size / BATCH_SIZE; i++) {
-            List<Job> subJobs = BatchUtils.getBatchList(i, BATCH_SIZE, jobs);
 
-            if (CollectionUtils.isNotEmpty(subJobs)) {
-                Response subResponse = protectSubmit(subJobs);
+        BatchUtils.batchExecute(size, BATCH_SIZE, jobs, new BatchUtils.Executor<Job>() {
+            @Override
+            public boolean execute(List<Job> list) {
+                Response subResponse = protectSubmit(list);
                 if (!subResponse.isSuccess()) {
                     response.setSuccess(false);
-                    response.addFailedJobs(subJobs);
+                    response.addFailedJobs(list);
                     response.setMsg(subResponse.getMsg());
                 }
+                return true;
             }
-        }
-
+        });
         return response;
     }