|
@@ -7,6 +7,7 @@ import com.google.common.collect.Maps;
|
|
|
import com.google.common.escape.Escaper;
|
|
|
import com.google.common.net.UrlEscapers;
|
|
|
|
|
|
+import com.ctrip.framework.apollo.Apollo;
|
|
|
import com.ctrip.framework.apollo.core.ConfigConsts;
|
|
|
import com.ctrip.framework.apollo.core.dto.ApolloConfig;
|
|
|
import com.ctrip.framework.apollo.core.dto.ApolloConfigNotification;
|
|
@@ -14,6 +15,7 @@ import com.ctrip.framework.apollo.core.dto.ServiceDTO;
|
|
|
import com.ctrip.framework.apollo.core.schedule.ExponentialSchedulePolicy;
|
|
|
import com.ctrip.framework.apollo.core.schedule.SchedulePolicy;
|
|
|
import com.ctrip.framework.apollo.core.utils.ApolloThreadFactory;
|
|
|
+import com.ctrip.framework.apollo.exceptions.ApolloConfigException;
|
|
|
import com.ctrip.framework.apollo.util.ConfigUtil;
|
|
|
import com.ctrip.framework.apollo.util.ExceptionUtil;
|
|
|
import com.ctrip.framework.apollo.util.http.HttpRequest;
|
|
@@ -54,13 +56,19 @@ public class RemoteConfigRepository extends AbstractConfigRepository {
|
|
|
private final ConfigUtil m_configUtil;
|
|
|
private volatile AtomicReference<ApolloConfig> m_configCache;
|
|
|
private final String m_namespace;
|
|
|
- private final ScheduledExecutorService m_executorService;
|
|
|
+ private final static ScheduledExecutorService m_executorService;
|
|
|
private final ExecutorService m_longPollingService;
|
|
|
private final AtomicBoolean m_longPollingStopped;
|
|
|
- private SchedulePolicy m_longPollSchedulePolicy;
|
|
|
+ private SchedulePolicy m_longPollFailSchedulePolicyInSecond;
|
|
|
+ private SchedulePolicy m_longPollSuccessSchedulePolicyInMS;
|
|
|
private AtomicReference<ServiceDTO> m_longPollServiceDto;
|
|
|
private AtomicReference<ApolloConfigNotification> m_longPollResult;
|
|
|
|
|
|
+ static {
|
|
|
+ m_executorService = Executors.newScheduledThreadPool(1,
|
|
|
+ ApolloThreadFactory.create("RemoteConfigRepository", true));
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* Constructor.
|
|
|
*
|
|
@@ -76,13 +84,12 @@ public class RemoteConfigRepository extends AbstractConfigRepository {
|
|
|
m_serviceLocator = m_container.lookup(ConfigServiceLocator.class);
|
|
|
} catch (ComponentLookupException ex) {
|
|
|
Cat.logError(ex);
|
|
|
- throw new IllegalStateException("Unable to load component!", ex);
|
|
|
+ throw new ApolloConfigException("Unable to load component!", ex);
|
|
|
}
|
|
|
- m_longPollSchedulePolicy = new ExponentialSchedulePolicy(1, 120);
|
|
|
+ m_longPollFailSchedulePolicyInSecond = new ExponentialSchedulePolicy(1, 120); //in second
|
|
|
+ m_longPollSuccessSchedulePolicyInMS = new ExponentialSchedulePolicy(100, 1000); //in millisecond
|
|
|
m_longPollingStopped = new AtomicBoolean(false);
|
|
|
- m_executorService = Executors.newScheduledThreadPool(1,
|
|
|
- ApolloThreadFactory.create("RemoteConfigRepository", true));
|
|
|
- m_longPollingService = Executors.newFixedThreadPool(2,
|
|
|
+ m_longPollingService = Executors.newSingleThreadExecutor(
|
|
|
ApolloThreadFactory.create("RemoteConfigRepository-LongPolling", true));
|
|
|
m_longPollServiceDto = new AtomicReference<>();
|
|
|
m_longPollResult = new AtomicReference<>();
|
|
@@ -100,8 +107,8 @@ public class RemoteConfigRepository extends AbstractConfigRepository {
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- public void setFallback(ConfigRepository fallbackConfigRepository) {
|
|
|
- //remote config doesn't need fallback
|
|
|
+ public void setUpstreamRepository(ConfigRepository upstreamConfigRepository) {
|
|
|
+ //remote config doesn't need upstream
|
|
|
}
|
|
|
|
|
|
private void schedulePeriodicRefresh() {
|
|
@@ -111,12 +118,10 @@ public class RemoteConfigRepository extends AbstractConfigRepository {
|
|
|
new Runnable() {
|
|
|
@Override
|
|
|
public void run() {
|
|
|
+ Cat.logEvent("Apollo.ConfigService", String.format("periodicRefresh: %s", m_namespace));
|
|
|
logger.debug("refresh config for namespace: {}", m_namespace);
|
|
|
- Transaction transaction = Cat.newTransaction("Apollo.ConfigService", "periodicRefresh");
|
|
|
- boolean syncSuccess = trySync();
|
|
|
- String status = syncSuccess ? Message.SUCCESS : "-1";
|
|
|
- transaction.setStatus(status);
|
|
|
- transaction.complete();
|
|
|
+ trySync();
|
|
|
+ Cat.logEvent("Apollo.Client.Version", Apollo.VERSION);
|
|
|
}
|
|
|
}, m_configUtil.getRefreshInterval(), m_configUtil.getRefreshInterval(),
|
|
|
m_configUtil.getRefreshTimeUnit());
|
|
@@ -124,19 +129,26 @@ public class RemoteConfigRepository extends AbstractConfigRepository {
|
|
|
|
|
|
@Override
|
|
|
protected synchronized void sync() {
|
|
|
- ApolloConfig previous = m_configCache.get();
|
|
|
- ApolloConfig current = loadApolloConfig();
|
|
|
-
|
|
|
- //HTTP 304, nothing changed
|
|
|
- if (previous == current) {
|
|
|
- return;
|
|
|
- }
|
|
|
+ Transaction transaction = Cat.newTransaction("Apollo.ConfigService", "syncRemoteConfig");
|
|
|
|
|
|
- logger.debug("Remote Config refreshed!");
|
|
|
-
|
|
|
- m_configCache.set(current);
|
|
|
+ try {
|
|
|
+ ApolloConfig previous = m_configCache.get();
|
|
|
+ ApolloConfig current = loadApolloConfig();
|
|
|
+
|
|
|
+ //reference equals means HTTP 304
|
|
|
+ if (previous != current) {
|
|
|
+ logger.debug("Remote Config refreshed!");
|
|
|
+ m_configCache.set(current);
|
|
|
+ this.fireRepositoryChange(m_namespace, this.getConfig());
|
|
|
+ }
|
|
|
|
|
|
- this.fireRepositoryChange(m_namespace, this.getConfig());
|
|
|
+ transaction.setStatus(Message.SUCCESS);
|
|
|
+ } catch (Throwable ex) {
|
|
|
+ transaction.setStatus(ex);
|
|
|
+ throw ex;
|
|
|
+ } finally {
|
|
|
+ transaction.complete();
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
private Properties transformApolloConfigToProperties(ApolloConfig apolloConfig) {
|
|
@@ -210,7 +222,7 @@ public class RemoteConfigRepository extends AbstractConfigRepository {
|
|
|
String message = String.format(
|
|
|
"Load Apollo Config failed - appId: %s, cluster: %s, namespace: %s, services: %s",
|
|
|
appId, cluster, m_namespace, configServices);
|
|
|
- throw new RuntimeException(message, exception);
|
|
|
+ throw new ApolloConfigException(message, exception);
|
|
|
}
|
|
|
|
|
|
private String assembleQueryConfigUrl(String uri, String appId, String cluster, String namespace,
|
|
@@ -253,17 +265,17 @@ public class RemoteConfigRepository extends AbstractConfigRepository {
|
|
|
m_longPollingService.submit(new Runnable() {
|
|
|
@Override
|
|
|
public void run() {
|
|
|
- doLongPollingRefresh(appId, cluster, dataCenter, m_longPollingService);
|
|
|
+ doLongPollingRefresh(appId, cluster, dataCenter);
|
|
|
}
|
|
|
});
|
|
|
}
|
|
|
|
|
|
- private void doLongPollingRefresh(String appId, String cluster, String dataCenter,
|
|
|
- ExecutorService longPollingService) {
|
|
|
+ private void doLongPollingRefresh(String appId, String cluster, String dataCenter) {
|
|
|
final Random random = new Random();
|
|
|
ServiceDTO lastServiceDto = null;
|
|
|
- Transaction transaction = null;
|
|
|
while (!m_longPollingStopped.get() && !Thread.currentThread().isInterrupted()) {
|
|
|
+ Transaction transaction = Cat.newTransaction("Apollo.ConfigService", "pollNotification");
|
|
|
+ long sleepTime = 50; //default 50 ms
|
|
|
try {
|
|
|
if (lastServiceDto == null) {
|
|
|
List<ServiceDTO> configServices = getConfigServices();
|
|
@@ -279,7 +291,6 @@ public class RemoteConfigRepository extends AbstractConfigRepository {
|
|
|
//longer timeout for read - 1 minute
|
|
|
request.setReadTimeout(60000);
|
|
|
|
|
|
- transaction = Cat.newTransaction("Apollo.ConfigService", "pollNotification");
|
|
|
transaction.addData("Url", url);
|
|
|
|
|
|
HttpResponse<ApolloConfigNotification> response =
|
|
@@ -292,35 +303,37 @@ public class RemoteConfigRepository extends AbstractConfigRepository {
|
|
|
m_longPollResult.set(response.getBody());
|
|
|
transaction.addData("Result", response.getBody().toString());
|
|
|
}
|
|
|
- longPollingService.submit(new Runnable() {
|
|
|
+ m_executorService.submit(new Runnable() {
|
|
|
@Override
|
|
|
public void run() {
|
|
|
trySync();
|
|
|
}
|
|
|
});
|
|
|
+ m_longPollSuccessSchedulePolicyInMS.success();
|
|
|
}
|
|
|
- m_longPollSchedulePolicy.success();
|
|
|
+
|
|
|
+ if (response.getStatusCode() == 304) {
|
|
|
+ sleepTime = m_longPollSuccessSchedulePolicyInMS.fail();
|
|
|
+ }
|
|
|
+ m_longPollFailSchedulePolicyInSecond.success();
|
|
|
transaction.addData("StatusCode", response.getStatusCode());
|
|
|
transaction.setStatus(Message.SUCCESS);
|
|
|
} catch (Throwable ex) {
|
|
|
lastServiceDto = null;
|
|
|
Cat.logError(ex);
|
|
|
- if (transaction != null) {
|
|
|
- transaction.setStatus(ex);
|
|
|
- }
|
|
|
- long sleepTime = m_longPollSchedulePolicy.fail();
|
|
|
+ transaction.setStatus(ex);
|
|
|
+ long sleepTimeInSecond = m_longPollFailSchedulePolicyInSecond.fail();
|
|
|
logger.warn(
|
|
|
"Long polling failed, will retry in {} seconds. appId: {}, cluster: {}, namespace: {}, reason: {}",
|
|
|
- sleepTime, appId, cluster, m_namespace, ExceptionUtil.getDetailMessage(ex));
|
|
|
+ sleepTimeInSecond, appId, cluster, m_namespace, ExceptionUtil.getDetailMessage(ex));
|
|
|
+ sleepTime = sleepTimeInSecond * 1000;
|
|
|
+ } finally {
|
|
|
+ transaction.complete();
|
|
|
try {
|
|
|
- TimeUnit.SECONDS.sleep(sleepTime);
|
|
|
+ TimeUnit.MILLISECONDS.sleep(sleepTime);
|
|
|
} catch (InterruptedException ie) {
|
|
|
//ignore
|
|
|
}
|
|
|
- } finally {
|
|
|
- if (transaction != null) {
|
|
|
- transaction.complete();
|
|
|
- }
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -364,7 +377,7 @@ public class RemoteConfigRepository extends AbstractConfigRepository {
|
|
|
private List<ServiceDTO> getConfigServices() {
|
|
|
List<ServiceDTO> services = m_serviceLocator.getConfigServices();
|
|
|
if (services.size() == 0) {
|
|
|
- throw new RuntimeException("No available config service");
|
|
|
+ throw new ApolloConfigException("No available config service");
|
|
|
}
|
|
|
|
|
|
return services;
|