Add retries for coordinator fetch and lookup start in LookupReferencesManager (#5029)

* Add retries for coordinator fetch and lookup start in LookupReferencesManager

* Fix LookupConfigTest

* Address comments

* Address more comments

* And address more comments

* Address comms

* Recognize 'not found' lookups in LookupReferencesManager.tryGetLookupListFromCoordinator(), by @egor-ryashin
This commit is contained in:
Roman Leventov 2017-11-09 02:30:36 -03:00 committed by GitHub
parent e6ec4310b1
commit a8dc056c09
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 235 additions and 124 deletions

View File

@ -322,9 +322,11 @@ It is possible to save the configuration across restarts such that a node will n
|Property|Description|Default|
|--------|-----------|-------|
|`druid.lookup.snapshotWorkingDir`| Working path used to store snapshot of current lookup configuration, leaving this property null will disable snapshot/bootstrap utility|null|
|`druid.lookup.numLookupLoadingThreads`| Number of threads for loading the lookups in parallel on startup. This thread pool is destroyed once startup is done. It is not kept during the lifetime of the JVM|Available Processors / 2|
|`druid.lookup.snapshotWorkingDir`|Working path used to store snapshot of current lookup configuration, leaving this property null will disable snapshot/bootstrap utility|null|
|`druid.lookup.enableLookupSyncOnStartup`|Enable the lookup synchronization process with coordinator on startup. The queryable nodes will fetch and load the lookups from the coordinator instead of waiting for the coordinator to load the lookups for them. Users may opt to disable this option if there are no lookups configured in the cluster.|true|
|`druid.lookup.numLookupLoadingThreads`|Number of threads for loading the lookups in parallel on startup. This thread pool is destroyed once startup is done. It is not kept during the lifetime of the JVM|Available Processors / 2|
|`druid.lookup.coordinatorFetchRetries`|How many times to retry to fetch the lookup bean list from coordinator, during the sync on startup.|3|
|`druid.lookup.lookupStartRetries`|How many times to retry to start each lookup, either during the sync on startup, or during the runtime.|3|
## Introspect a Lookup

View File

@ -23,6 +23,9 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Strings;
import javax.validation.constraints.Min;
import java.util.Objects;
public class LookupConfig
{
@ -32,13 +35,21 @@ public class LookupConfig
@JsonProperty("enableLookupSyncOnStartup")
private boolean enableLookupSyncOnStartup = true;
@Min(1)
@JsonProperty("numLookupLoadingThreads")
private int numLookupLoadingThreads = Runtime.getRuntime().availableProcessors() / 2;
@Min(1)
@JsonProperty("coordinatorFetchRetries")
private int coordinatorFetchRetries = 3;
@Min(1)
@JsonProperty("lookupStartRetries")
private int lookupStartRetries = 3;
/**
* @param snapshotWorkingDir working directory to store lookups snapshot file, passing null or empty string will disable the snapshot utility
* @param numLookupLoadingThreads number of threads for loading the lookups as part of the synchronization process
* @param enableLookupSyncOnStartup decides whether the lookup synchronization process should be enabled at startup
* @param snapshotWorkingDir working directory to store lookups snapshot file, passing null or empty string will
* disable the snapshot utility
*/
@JsonCreator
public LookupConfig(
@ -63,6 +74,16 @@ public class LookupConfig
return enableLookupSyncOnStartup;
}
public int getCoordinatorFetchRetries()
{
return coordinatorFetchRetries;
}
public int getLookupStartRetries()
{
return lookupStartRetries;
}
@Override
public boolean equals(Object o)
{
@ -75,19 +96,34 @@ public class LookupConfig
LookupConfig that = (LookupConfig) o;
return snapshotWorkingDir.equals(that.snapshotWorkingDir) &&
return Objects.equals(snapshotWorkingDir, that.snapshotWorkingDir) &&
enableLookupSyncOnStartup == that.enableLookupSyncOnStartup &&
numLookupLoadingThreads == that.numLookupLoadingThreads;
numLookupLoadingThreads == that.numLookupLoadingThreads &&
coordinatorFetchRetries == that.coordinatorFetchRetries &&
lookupStartRetries == that.lookupStartRetries;
}
@Override
public int hashCode()
{
return Objects.hash(
snapshotWorkingDir,
enableLookupSyncOnStartup,
numLookupLoadingThreads,
coordinatorFetchRetries,
lookupStartRetries
);
}
@Override
public String toString()
{
return "LookupConfig{" +
"snapshotWorkingDir='" + getSnapshotWorkingDir() + '\'' +
" numLookupLoadingThreads='" + getNumLookupLoadingThreads() + '\'' +
" enableLookupSyncOnStartup='" + getEnableLookupSyncOnStartup() + '\'' +
"snapshotWorkingDir='" + snapshotWorkingDir + '\'' +
", enableLookupSyncOnStartup=" + enableLookupSyncOnStartup +
", numLookupLoadingThreads=" + numLookupLoadingThreads +
", coordinatorFetchRetries=" + coordinatorFetchRetries +
", lookupStartRetries=" + lookupStartRetries +
'}';
}
}

View File

@ -51,7 +51,9 @@ public class LookupConfigTest
String json = "{\n"
+ " \"enableLookupSyncOnStartup\": false,\n"
+ " \"snapshotWorkingDir\": \"/tmp\",\n"
+ " \"numLookupLoadingThreads\": 4 \n"
+ " \"numLookupLoadingThreads\": 4,\n"
+ " \"coordinatorFetchRetries\": 4,\n"
+ " \"lookupStartRetries\": 4 \n"
+ "}\n";
LookupConfig config = mapper.readValue(
mapper.writeValueAsString(
@ -63,5 +65,7 @@ public class LookupConfigTest
Assert.assertEquals("/tmp", config.getSnapshotWorkingDir());
Assert.assertEquals(false, config.getEnableLookupSyncOnStartup());
Assert.assertEquals(4, config.getNumLookupLoadingThreads());
Assert.assertEquals(4, config.getCoordinatorFetchRetries());
Assert.assertEquals(4, config.getLookupStartRetries());
}
}

View File

@ -35,17 +35,20 @@ import io.druid.concurrent.LifecycleLock;
import io.druid.discovery.DruidLeaderClient;
import io.druid.guice.ManageLifecycle;
import io.druid.guice.annotations.Json;
import io.druid.java.util.common.IOE;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.RE;
import io.druid.java.util.common.RetryUtils;
import io.druid.java.util.common.StringUtils;
import io.druid.java.util.common.concurrent.Execs;
import io.druid.java.util.common.lifecycle.LifecycleStart;
import io.druid.java.util.common.lifecycle.LifecycleStop;
import org.apache.commons.lang.mutable.MutableBoolean;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import javax.annotation.Nullable;
import java.io.IOException;
import java.net.MalformedURLException;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.HashMap;
@ -53,6 +56,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
@ -61,6 +65,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;
import java.util.function.Function;
import java.util.stream.Collectors;
/**
* This class provide a basic {@link LookupExtractorFactory} references manager.
@ -258,7 +263,7 @@ public class LookupReferencesManager
public void add(String lookupName, LookupExtractorFactoryContainer lookupExtractorFactoryContainer)
{
Preconditions.checkState(lifecycleLock.awaitStarted(1, TimeUnit.MILLISECONDS));
addNotice(new LoadNotice(lookupName, lookupExtractorFactoryContainer));
addNotice(new LoadNotice(lookupName, lookupExtractorFactoryContainer, lookupConfig.getLookupStartRetries()));
}
public void remove(String lookupName)
@ -341,7 +346,7 @@ public class LookupReferencesManager
private void loadAllLookupsAndInitStateRef()
{
List<LookupBean> lookupBeanList = getLookupsListFromLookupConfig();
List<LookupBean> lookupBeanList = getLookupsList();
if (lookupBeanList != null) {
startLookups(lookupBeanList);
} else {
@ -350,6 +355,26 @@ public class LookupReferencesManager
}
}
/**
* Gets the lookup list from coordinator or from snapshot.
*/
@Nullable
private List<LookupBean> getLookupsList()
{
List<LookupBean> lookupBeanList;
if (lookupConfig.getEnableLookupSyncOnStartup()) {
String tier = lookupListeningAnnouncerConfig.getLookupTier();
lookupBeanList = getLookupListFromCoordinator(tier);
if (lookupBeanList == null) {
LOG.info("Coordinator is unavailable. Loading saved snapshot instead");
lookupBeanList = getLookupListFromSnapshot();
}
} else {
lookupBeanList = getLookupListFromSnapshot();
}
return lookupBeanList;
}
/**
* Returns a list of lookups from the coordinator if the coordinator is available. If it's not available, returns null.
*
@ -361,32 +386,29 @@ public class LookupReferencesManager
private List<LookupBean> getLookupListFromCoordinator(String tier)
{
try {
final FullResponseHolder response = fetchLookupsForTier(tier);
List<LookupBean> lookupBeanList = new ArrayList<>();
if (!response.getStatus().equals(HttpResponseStatus.OK)) {
LOG.error(
"Error while fetching lookup code from Coordinator with status[%s] and content[%s]",
response.getStatus(),
response.getContent()
);
return null;
}
// Older version of getSpecificTier returns a list of lookup names.
// Lookup loading is performed via snapshot if older version is present.
// This check is only for backward compatibility and should be removed in a future release
if (response.getContent().startsWith("[")) {
LOG.info("Failed to retrieve lookup information from coordinator. Attempting to load lookups using snapshot instead");
return null;
} else {
Map<String, LookupExtractorFactoryContainer> lookupMap = jsonMapper.readValue(
response.getContent(),
LOOKUPS_ALL_REFERENCE
);
MutableBoolean firstAttempt = new MutableBoolean(true);
Map<String, LookupExtractorFactoryContainer> lookupMap = RetryUtils.retry(
() -> {
if (firstAttempt.isTrue()) {
firstAttempt.setValue(false);
} else {
// Adding an extra minute in addition to the retry wait. In RetryUtils, retry wait starts from a few
// seconds, that is likely not enough to coordinator to be back to healthy state, e. g. if it experiences
// 30-second GC pause.
Thread.sleep(60_000);
}
return tryGetLookupListFromCoordinator(tier);
},
e -> true,
lookupConfig.getCoordinatorFetchRetries()
);
if (lookupMap != null) {
List<LookupBean> lookupBeanList = new ArrayList<>();
lookupMap.forEach((k, v) -> lookupBeanList.add(new LookupBean(k, null, v)));
return lookupBeanList;
} else {
return null;
}
return lookupBeanList;
}
catch (Exception e) {
LOG.error(e, "Error while trying to get lookup list from coordinator for tier[%s]", tier);
@ -394,8 +416,39 @@ public class LookupReferencesManager
}
}
@Nullable
private Map<String, LookupExtractorFactoryContainer> tryGetLookupListFromCoordinator(String tier) throws Exception
{
final FullResponseHolder response = fetchLookupsForTier(tier);
if (response.getStatus().equals(HttpResponseStatus.NOT_FOUND)) {
LOG.warn("No lookups found for tier [%s], response [%s]", tier, response);
return null;
} else if (!response.getStatus().equals(HttpResponseStatus.OK)) {
throw new IOE(
"Error while fetching lookup code from Coordinator with status[%s] and content[%s]",
response.getStatus(),
response.getContent()
);
}
// Older version of getSpecificTier returns a list of lookup names.
// Lookup loading is performed via snapshot if older version is present.
// This check is only for backward compatibility and should be removed in a future release
if (response.getContent().startsWith("[")) {
LOG.info(
"Failed to retrieve lookup information from coordinator, " +
"because coordinator appears to be running on older Druid version. " +
"Attempting to load lookups using snapshot instead"
);
return null;
} else {
return jsonMapper.readValue(response.getContent(), LOOKUPS_ALL_REFERENCE);
}
}
/**
* Returns a list of lookups from the snapshot if the lookupsnapshottaker is configured. If it's not available, returns null.
* Returns a list of lookups from the snapshot if the lookupsnapshottaker is configured. If it's not available,
* returns null.
*
* @return list of LookupBean objects, or null
*/
@ -417,89 +470,96 @@ public class LookupReferencesManager
return lookups;
}
private List<LookupBean> getLookupsListFromLookupConfig()
{
List<LookupBean> lookupBeanList;
if (lookupConfig.getEnableLookupSyncOnStartup()) {
String tier = lookupListeningAnnouncerConfig.getLookupTier();
lookupBeanList = getLookupListFromCoordinator(tier);
if (lookupBeanList == null) {
LOG.info("Coordinator is unavailable. Loading saved snapshot instead");
lookupBeanList = getLookupListFromSnapshot();
}
} else {
lookupBeanList = getLookupListFromSnapshot();
}
return lookupBeanList;
}
private void startLookups(List<LookupBean> lookupBeanList)
private void startLookups(final List<LookupBean> lookupBeanList)
{
ImmutableMap.Builder<String, LookupExtractorFactoryContainer> builder = ImmutableMap.builder();
ExecutorService executorService = Execs.multiThreaded(
lookupConfig.getNumLookupLoadingThreads(),
"LookupReferencesManager-Startup-%s"
);
ExecutorCompletionService completionService = new ExecutorCompletionService(executorService);
List<Future<Map.Entry>> futures = new ArrayList<>();
CompletionService<Map.Entry<String, LookupExtractorFactoryContainer>> completionService =
new ExecutorCompletionService<>(executorService);
try {
LOG.info("Starting lookup loading process");
for (LookupBean lookupBean : lookupBeanList) {
futures.add(
completionService.submit(
() -> {
LookupExtractorFactoryContainer container = lookupBean.getContainer();
LOG.info(
"Starting lookup [%s]:[%s]",
lookupBean.getName(),
container
);
if (container.getLookupExtractorFactory().start()) {
LOG.info(
"Started lookup [%s]:[%s]",
lookupBean.getName(),
container
);
return new AbstractMap.SimpleImmutableEntry<>(lookupBean.getName(), container);
} else {
LOG.error(
"Failed to start lookup [%s]:[%s]",
lookupBean.getName(),
container
);
return null;
}
}
)
);
}
for (int i = 0; i < futures.size(); i++) {
try {
final Future<AbstractMap.SimpleImmutableEntry<String, LookupExtractorFactoryContainer>> completedFuture = completionService
.take();
final AbstractMap.SimpleImmutableEntry<String, LookupExtractorFactoryContainer> lookupResult = completedFuture
.get();
if (lookupResult != null) {
builder.put(lookupResult);
}
}
catch (ExecutionException e) {
LOG.error(e, "Execution error during lookup loading.");
List<LookupBean> remainingLookups = lookupBeanList;
for (int i = 0; i < lookupConfig.getLookupStartRetries(); i++) {
LOG.info("Round of attempts #%d, [%d] lookups", i + 1, remainingLookups.size());
Map<String, LookupExtractorFactoryContainer> successfulLookups =
startLookups(remainingLookups, completionService);
builder.putAll(successfulLookups);
List<LookupBean> failedLookups = remainingLookups
.stream()
.filter(l -> !successfulLookups.containsKey(l.getName()))
.collect(Collectors.toList());
if (failedLookups.isEmpty()) {
break;
} else {
// next round
remainingLookups = failedLookups;
}
}
LOG.info(
"Failed to start the following lookups after [%d] attempts: [%s]",
lookupConfig.getLookupStartRetries(),
remainingLookups
);
stateRef.set(new LookupUpdateState(builder.build(), ImmutableList.of(), ImmutableList.of()));
}
catch (Exception e) {
catch (InterruptedException | RuntimeException e) {
LOG.error(e, "Failed to finish lookup load process.");
for (Future future : futures) {
future.cancel(true);
}
}
finally {
executorService.shutdownNow();
}
}
/**
* @return a map with successful lookups
*/
private Map<String, LookupExtractorFactoryContainer> startLookups(
List<LookupBean> lookupBeans,
CompletionService<Map.Entry<String, LookupExtractorFactoryContainer>> completionService
) throws InterruptedException
{
for (LookupBean lookupBean : lookupBeans) {
completionService.submit(() -> startLookup(lookupBean));
}
Map<String, LookupExtractorFactoryContainer> successfulLookups = new HashMap<>();
for (int i = 0; i < lookupBeans.size(); i++) {
Future<Map.Entry<String, LookupExtractorFactoryContainer>> completedFuture = completionService.take();
try {
Map.Entry<String, LookupExtractorFactoryContainer> lookupResult = completedFuture.get();
if (lookupResult != null) {
successfulLookups.put(lookupResult.getKey(), lookupResult.getValue());
}
}
catch (ExecutionException e) {
LOG.error(e.getCause(), "Exception while starting a lookup");
// not adding to successfulLookups
}
}
return successfulLookups;
}
@Nullable
private Map.Entry<String, LookupExtractorFactoryContainer> startLookup(LookupBean lookupBean)
{
LookupExtractorFactoryContainer container = lookupBean.getContainer();
LOG.info("Starting lookup [%s]:[%s]", lookupBean.getName(), container);
try {
if (container.getLookupExtractorFactory().start()) {
LOG.info("Started lookup [%s]:[%s]", lookupBean.getName(), container);
return new AbstractMap.SimpleImmutableEntry<>(lookupBean.getName(), container);
} else {
LOG.error("Failed to start lookup [%s]:[%s]", lookupBean.getName(), container);
return null;
}
}
catch (RuntimeException e) {
throw new RE(e, "Failed to start lookup [%s]:[%s]", lookupBean.getName(), container);
}
}
private LookupUpdateState atomicallyUpdateStateRef(Function<LookupUpdateState, LookupUpdateState> fn)
{
while (true) {
@ -512,37 +572,37 @@ public class LookupReferencesManager
}
private FullResponseHolder fetchLookupsForTier(String tier)
throws ExecutionException, InterruptedException, MalformedURLException, IOException
throws ExecutionException, InterruptedException, IOException
{
return druidLeaderClient.go(
druidLeaderClient.makeRequest(
HttpMethod.GET,
StringUtils.format(
"/druid/coordinator/v1/lookups/%s?detailed=true",
tier
)
));
StringUtils.format("/druid/coordinator/v1/lookups/%s?detailed=true", tier)
)
);
}
@VisibleForTesting
interface Notice
{
void handle(Map<String, LookupExtractorFactoryContainer> lookupMap);
void handle(Map<String, LookupExtractorFactoryContainer> lookupMap) throws Exception;
}
private static class LoadNotice implements Notice
{
private final String lookupName;
private final LookupExtractorFactoryContainer lookupExtractorFactoryContainer;
private final int startRetries;
public LoadNotice(String lookupName, LookupExtractorFactoryContainer lookupExtractorFactoryContainer)
LoadNotice(String lookupName, LookupExtractorFactoryContainer lookupExtractorFactoryContainer, int startRetries)
{
this.lookupName = lookupName;
this.lookupExtractorFactoryContainer = lookupExtractorFactoryContainer;
this.startRetries = startRetries;
}
@Override
public void handle(Map<String, LookupExtractorFactoryContainer> lookupMap)
public void handle(Map<String, LookupExtractorFactoryContainer> lookupMap) throws Exception
{
LookupExtractorFactoryContainer old = lookupMap.get(lookupName);
if (old != null && !lookupExtractorFactoryContainer.replaces(old)) {
@ -554,13 +614,20 @@ public class LookupReferencesManager
return;
}
if (!lookupExtractorFactoryContainer.getLookupExtractorFactory().start()) {
throw new ISE(
"start method returned false for lookup [%s]:[%s]",
lookupName,
lookupExtractorFactoryContainer
);
}
RetryUtils.retry(
() -> {
if (!lookupExtractorFactoryContainer.getLookupExtractorFactory().start()) {
throw new ISE(
"start method returned false for lookup [%s]:[%s]",
lookupName,
lookupExtractorFactoryContainer
);
}
return null;
},
e -> true,
startRetries
);
old = lookupMap.put(lookupName, lookupExtractorFactoryContainer);
@ -587,7 +654,7 @@ public class LookupReferencesManager
{
private final String lookupName;
public DropNotice(String lookupName)
DropNotice(String lookupName)
{
this.lookupName = lookupName;
}

View File

@ -556,13 +556,14 @@ public class LookupReferencesManagerTest
expect(config.getLookupTier()).andReturn(LOOKUP_TIER);
replay(config);
expect(druidLeaderClient.makeRequest(HttpMethod.GET, "/druid/coordinator/v1/lookups/lookupTier?detailed=true"))
.andReturn(request);
.andReturn(request)
.anyTimes();
FullResponseHolder responseHolder = new FullResponseHolder(
HttpResponseStatus.NOT_FOUND,
EasyMock.createNiceMock(HttpResponse.class),
new StringBuilder().append(strResult)
);
expect(druidLeaderClient.go(request)).andThrow(new IllegalStateException());
expect(druidLeaderClient.go(request)).andThrow(new IllegalStateException()).anyTimes();
replay(druidLeaderClient);
lookupReferencesManager.start();
@ -579,8 +580,9 @@ public class LookupReferencesManagerTest
expect(config.getLookupTier()).andReturn(LOOKUP_TIER);
replay(config);
expect(druidLeaderClient.makeRequest(HttpMethod.GET, "/druid/coordinator/v1/lookups/lookupTier?detailed=true"))
.andReturn(request);
expect(druidLeaderClient.go(request)).andThrow(new IllegalStateException());
.andReturn(request)
.anyTimes();
expect(druidLeaderClient.go(request)).andThrow(new IllegalStateException()).anyTimes();
replay(druidLeaderClient);
lookupReferencesManager.start();
Assert.assertEquals(container, lookupReferencesManager.get("testMockForLoadLookupOnCoordinatorFailure"));