Explorar o código

fix the issue that release messages might be missed in certain scenarios

Jason Song %!s(int64=3) %!d(string=hai) anos
pai
achega
f9c69dbb18

+ 1 - 0
CHANGES.md

@@ -59,6 +59,7 @@ Apollo 1.9.0
 * [support release apollo-client-config-data](https://github.com/ctripcorp/apollo/pull/3822)
 * [Reduce bootstrap time in the situation with large properties](https://github.com/ctripcorp/apollo/pull/3816)
 * [docs: English catalog in sidebar](https://github.com/ctripcorp/apollo/pull/3831)
+* [fix the issue that release messages might be missed in certain scenarios](https://github.com/ctripcorp/apollo/pull/3819)
 ------------------
 All issues and pull requests are [here](https://github.com/ctripcorp/apollo/milestone/6?closed=1)
 

+ 54 - 4
apollo-biz/src/main/java/com/ctrip/framework/apollo/biz/message/ReleaseMessageScanner.java

@@ -16,7 +16,12 @@
  */
 package com.ctrip.framework.apollo.biz.message;
 
+import com.google.common.collect.Maps;
+import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -40,19 +45,22 @@ import com.google.common.collect.Lists;
  */
 public class ReleaseMessageScanner implements InitializingBean {
   private static final Logger logger = LoggerFactory.getLogger(ReleaseMessageScanner.class);
+  private static final int missingReleaseMessageMaxAge = 10; // hardcoded to 10, could be configured via BizConfig if necessary
   @Autowired
   private BizConfig bizConfig;
   @Autowired
   private ReleaseMessageRepository releaseMessageRepository;
   private int databaseScanInterval;
-  private List<ReleaseMessageListener> listeners;
-  private ScheduledExecutorService executorService;
+  private final List<ReleaseMessageListener> listeners;
+  private final ScheduledExecutorService executorService;
+  private final Map<Long, Integer> missingReleaseMessages; // missing release message id => age counter
   private long maxIdScanned;
 
   public ReleaseMessageScanner() {
     listeners = Lists.newCopyOnWriteArrayList();
     executorService = Executors.newScheduledThreadPool(1, ApolloThreadFactory
         .create("ReleaseMessageScanner", true));
+    missingReleaseMessages = Maps.newHashMap();
   }
 
   @Override
@@ -62,6 +70,7 @@ public class ReleaseMessageScanner implements InitializingBean {
     executorService.scheduleWithFixedDelay(() -> {
       Transaction transaction = Tracer.newTransaction("Apollo.ReleaseMessageScanner", "scanMessage");
       try {
+        scanMissingMessages();
         scanMessages();
         transaction.setStatus(Transaction.SUCCESS);
       } catch (Throwable ex) {
@@ -108,10 +117,51 @@ public class ReleaseMessageScanner implements InitializingBean {
     }
     fireMessageScanned(releaseMessages);
     int messageScanned = releaseMessages.size();
-    maxIdScanned = releaseMessages.get(messageScanned - 1).getId();
+    long newMaxIdScanned = releaseMessages.get(messageScanned - 1).getId();
+    // check id gaps, possible reasons are release message not committed yet or already rolled back
+    if (newMaxIdScanned - maxIdScanned > messageScanned) {
+      recordMissingReleaseMessageIds(releaseMessages, maxIdScanned);
+    }
+    maxIdScanned = newMaxIdScanned;
     return messageScanned == 500;
   }
 
+  private void scanMissingMessages() {
+    Set<Long> missingReleaseMessageIds = missingReleaseMessages.keySet();
+    Iterable<ReleaseMessage> releaseMessages = releaseMessageRepository
+        .findAllById(missingReleaseMessageIds);
+    fireMessageScanned(releaseMessages);
+    releaseMessages.forEach(releaseMessage -> {
+      missingReleaseMessageIds.remove(releaseMessage.getId());
+    });
+    growAndCleanMissingMessages();
+  }
+
+  private void growAndCleanMissingMessages() {
+    Iterator<Entry<Long, Integer>> iterator = missingReleaseMessages.entrySet()
+        .iterator();
+    while (iterator.hasNext()) {
+      Entry<Long, Integer> entry = iterator.next();
+      if (entry.getValue() > missingReleaseMessageMaxAge) {
+        iterator.remove();
+      } else {
+        entry.setValue(entry.getValue() + 1);
+      }
+    }
+  }
+
+  private void recordMissingReleaseMessageIds(List<ReleaseMessage> messages, long startId) {
+    for (ReleaseMessage message : messages) {
+      long currentId = message.getId();
+      if (currentId - startId > 1) {
+        for (long i = startId + 1; i < currentId; i++) {
+          missingReleaseMessages.putIfAbsent(i, 1);
+        }
+      }
+      startId = currentId;
+    }
+  }
+
   /**
    * find largest message id as the current start point
    * @return current largest message id
@@ -125,7 +175,7 @@ public class ReleaseMessageScanner implements InitializingBean {
    * Notify listeners with messages loaded
    * @param messages
    */
-  private void fireMessageScanned(List<ReleaseMessage> messages) {
+  private void fireMessageScanned(Iterable<ReleaseMessage> messages) {
     for (ReleaseMessage message : messages) {
       for (ReleaseMessageListener listener : listeners) {
         try {

+ 88 - 0
apollo-biz/src/test/java/com/ctrip/framework/apollo/biz/message/ReleaseMessageScannerTest.java

@@ -18,12 +18,15 @@ package com.ctrip.framework.apollo.biz.message;
 
 import com.ctrip.framework.apollo.biz.config.BizConfig;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.SettableFuture;
 
 import com.ctrip.framework.apollo.biz.AbstractUnitTest;
 import com.ctrip.framework.apollo.biz.entity.ReleaseMessage;
 import com.ctrip.framework.apollo.biz.repository.ReleaseMessageRepository;
 
+import java.util.ArrayList;
+import org.awaitility.Awaitility;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mock;
@@ -31,7 +34,9 @@ import org.springframework.test.util.ReflectionTestUtils;
 
 import java.util.concurrent.TimeUnit;
 
+import static org.awaitility.Awaitility.await;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
 import static org.mockito.Mockito.when;
 
 /**
@@ -54,6 +59,10 @@ public class ReleaseMessageScannerTest extends AbstractUnitTest {
     databaseScanInterval = 100; //100 ms
     when(bizConfig.releaseMessageScanIntervalInMilli()).thenReturn(databaseScanInterval);
     releaseMessageScanner.afterPropertiesSet();
+
+    Awaitility.reset();
+    Awaitility.setDefaultTimeout(databaseScanInterval * 5, TimeUnit.MILLISECONDS);
+    Awaitility.setDefaultPollInterval(databaseScanInterval, TimeUnit.MILLISECONDS);
   }
 
   @Test
@@ -91,7 +100,86 @@ public class ReleaseMessageScannerTest extends AbstractUnitTest {
 
     assertEquals(anotherMessage, anotherListenerMessage.getMessage());
     assertEquals(anotherId, anotherListenerMessage.getId());
+  }
+
+  @Test
+  public void testScanMessageWithGapAndNotifyMessageListener() throws Exception {
+    String someMessage = "someMessage";
+    long someId = 1;
+    ReleaseMessage someReleaseMessage = assembleReleaseMessage(someId, someMessage);
+
+    String someMissingMessage = "someMissingMessage";
+    long someMissingId = 2;
+    ReleaseMessage someMissingReleaseMessage = assembleReleaseMessage(someMissingId, someMissingMessage);
+
+    String anotherMessage = "anotherMessage";
+    long anotherId = 3;
+    ReleaseMessage anotherReleaseMessage = assembleReleaseMessage(anotherId, anotherMessage);
+
+    String anotherMissingMessage = "anotherMissingMessage";
+    long anotherMissingId = 4;
+    ReleaseMessage anotherMissingReleaseMessage = assembleReleaseMessage(anotherMissingId, anotherMissingMessage);
+
+    long someRolledBackId = 5;
+
+    String yetAnotherMessage = "yetAnotherMessage";
+    long yetAnotherId = 6;
+    ReleaseMessage yetAnotherReleaseMessage = assembleReleaseMessage(yetAnotherId, yetAnotherMessage);
+
+    ArrayList<ReleaseMessage> receivedMessage = Lists.newArrayList();
+    SettableFuture<ReleaseMessage> someListenerFuture = SettableFuture.create();
+    ReleaseMessageListener someListener = (message, channel) -> receivedMessage.add(message);
+    releaseMessageScanner.addMessageListener(someListener);
+
+    when(releaseMessageRepository.findFirst500ByIdGreaterThanOrderByIdAsc(0L)).thenReturn(
+        Lists.newArrayList(someReleaseMessage));
+
+    await().untilAsserted(() -> {
+      assertEquals(1, receivedMessage.size());
+      assertSame(someReleaseMessage, receivedMessage.get(0));
+    });
+
+    when(releaseMessageRepository.findFirst500ByIdGreaterThanOrderByIdAsc(someId)).thenReturn(
+        Lists.newArrayList(anotherReleaseMessage));
 
+    await().untilAsserted(() -> {
+      assertEquals(2, receivedMessage.size());
+      assertSame(someReleaseMessage, receivedMessage.get(0));
+      assertSame(anotherReleaseMessage, receivedMessage.get(1));
+    });
+
+    when(releaseMessageRepository.findAllById(Sets.newHashSet(someMissingId)))
+        .thenReturn(Lists.newArrayList(someMissingReleaseMessage));
+
+    await().untilAsserted(() -> {
+      assertEquals(3, receivedMessage.size());
+      assertSame(someReleaseMessage, receivedMessage.get(0));
+      assertSame(anotherReleaseMessage, receivedMessage.get(1));
+      assertSame(someMissingReleaseMessage, receivedMessage.get(2));
+    });
+
+    when(releaseMessageRepository.findFirst500ByIdGreaterThanOrderByIdAsc(anotherId)).thenReturn(
+        Lists.newArrayList(yetAnotherReleaseMessage));
+
+    await().untilAsserted(() -> {
+      assertEquals(4, receivedMessage.size());
+      assertSame(someReleaseMessage, receivedMessage.get(0));
+      assertSame(anotherReleaseMessage, receivedMessage.get(1));
+      assertSame(someMissingReleaseMessage, receivedMessage.get(2));
+      assertSame(yetAnotherReleaseMessage, receivedMessage.get(3));
+    });
+
+    when(releaseMessageRepository.findAllById(Sets.newHashSet(anotherMissingId, someRolledBackId)))
+        .thenReturn(Lists.newArrayList(anotherMissingReleaseMessage));
+
+    await().untilAsserted(() -> {
+      assertEquals(5, receivedMessage.size());
+      assertSame(someReleaseMessage, receivedMessage.get(0));
+      assertSame(anotherReleaseMessage, receivedMessage.get(1));
+      assertSame(someMissingReleaseMessage, receivedMessage.get(2));
+      assertSame(yetAnotherReleaseMessage, receivedMessage.get(3));
+      assertSame(anotherMissingReleaseMessage, receivedMessage.get(4));
+    });
   }
 
   private ReleaseMessage assembleReleaseMessage(long id, String message) {