SOLR-7503: Recovery after ZK session expiration should happen in parallel for all cores using the thread-pool managed by ZkContainer instead of a single thread.

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1679607 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Timothy Potter 2015-05-15 17:17:46 +00:00
parent ccaadb1257
commit 5e447065a2
4 changed files with 40 additions and 4 deletions

View File

@ -379,6 +379,10 @@ Other Changes
ZkCmdExecutor#ensureExists as they were doing the same thing. Also ZkCmdExecutor#ensureExists now respects the
CreateMode passed to it. (Varun Thacker)
* SOLR-7503: Recovery after ZK session expiration should happen in parallel for all cores
using the thread-pool managed by ZkContainer instead of a single thread.
(Jessica Cheng Mallet, Timothy Potter, shalin, Mark Miller)
================== 5.1.0 ==================
Consult the LUCENE_CHANGES.txt file for additional, low level, changes in this release

View File

@ -40,6 +40,8 @@ import java.util.Locale;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@ -207,6 +209,25 @@ public final class ZkController {
// keeps track of a list of objects that need to know a new ZooKeeper session was created after expiration occurred
private List<OnReconnect> reconnectListeners = new ArrayList<OnReconnect>();
private class RegisterCoreAsync implements Callable {
CoreDescriptor descriptor;
boolean recoverReloadedCores;
boolean afterExpiration;
RegisterCoreAsync(CoreDescriptor descriptor, boolean recoverReloadedCores, boolean afterExpiration) {
this.descriptor = descriptor;
this.recoverReloadedCores = recoverReloadedCores;
this.afterExpiration = afterExpiration;
}
public Object call() throws Exception {
log.info("Registering core {} afterExpiration? {}", descriptor.getName(), afterExpiration);
register(descriptor.getName(), descriptor, recoverReloadedCores, afterExpiration);
return descriptor;
}
}
public ZkController(final CoreContainer cc, String zkServerAddress, int zkClientConnectTimeout, CloudConfig cloudConfig, final CurrentCoreDescriptorProvider registerOnReconnect)
throws InterruptedException, TimeoutException, IOException {
@ -293,10 +314,10 @@ public final class ZkController {
// we have to register as live first to pick up docs in the buffer
createEphemeralLiveNode();
List<CoreDescriptor> descriptors = registerOnReconnect
.getCurrentDescriptors();
List<CoreDescriptor> descriptors = registerOnReconnect.getCurrentDescriptors();
// re register all descriptors
if (descriptors != null) {
ExecutorService executorService = (cc != null) ? cc.getCoreZkRegisterExecutorService() : null;
for (CoreDescriptor descriptor : descriptors) {
// TODO: we need to think carefully about what happens when it
// was
@ -307,7 +328,11 @@ public final class ZkController {
// unload solrcores that have been 'failed over'
throwErrorIfReplicaReplaced(descriptor);
register(descriptor.getName(), descriptor, true, true);
if (executorService != null) {
executorService.submit(new RegisterCoreAsync(descriptor, true, true));
} else {
register(descriptor.getName(), descriptor, true, true);
}
} catch (Exception e) {
SolrException.log(log, "Error registering SolrCore", e);
}

View File

@ -112,6 +112,10 @@ public class CoreContainer {
private PluginBag<SolrRequestHandler> containerHandlers = new PluginBag<>(SolrRequestHandler.class, null);
public ExecutorService getCoreZkRegisterExecutorService() {
return zkSys.getCoreZkRegisterExecutorService();
}
public SolrRequestHandler getRequestHandler(String path) {
return RequestHandlerBase.getRequestHandler(path, containerHandlers);
}
@ -964,5 +968,4 @@ class CloserThread extends Thread {
}
}
}
}

View File

@ -251,4 +251,8 @@ public class ZkContainer {
}
}
public ExecutorService getCoreZkRegisterExecutorService() {
return coreZkRegister;
}
}