mirror of https://github.com/apache/lucene.git
SOLR-7361: Slow loading SolrCores should not hold up all other SolrCores that have finished loading from serving requests.
git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1682060 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
feb1f86d84
commit
88640a507c
solr
CHANGES.txt
core/src
java/org/apache/solr
client/solrj/embedded
cloud
core
servlet
test/org/apache/solr/cloud
solrj/src/java/org/apache/solr
test-framework/src/java/org/apache/solr/cloud
|
@ -86,7 +86,9 @@ New Features
|
|||
|
||||
Bug Fixes
|
||||
----------------------
|
||||
(no changes)
|
||||
|
||||
* SOLR-7361: Slow loading SolrCores should not hold up all other SolrCores that have finished loading from serving
|
||||
requests. (Mark Miller, Timothy Potter, Ramkumar Aiyengar)
|
||||
|
||||
Optimizations
|
||||
----------------------
|
||||
|
|
|
@ -31,17 +31,20 @@ public class JettyConfig {
|
|||
|
||||
public final boolean stopAtShutdown;
|
||||
|
||||
public final Long waitForLoadingCoresToFinishMs;
|
||||
|
||||
public final Map<ServletHolder, String> extraServlets;
|
||||
|
||||
public final Map<Class<? extends Filter>, String> extraFilters;
|
||||
|
||||
public final SSLConfig sslConfig;
|
||||
|
||||
private JettyConfig(int port, String context, boolean stopAtShutdown, Map<ServletHolder, String> extraServlets,
|
||||
private JettyConfig(int port, String context, boolean stopAtShutdown, Long waitForLoadingCoresToFinishMs, Map<ServletHolder, String> extraServlets,
|
||||
Map<Class<? extends Filter>, String> extraFilters, SSLConfig sslConfig) {
|
||||
this.port = port;
|
||||
this.context = context;
|
||||
this.stopAtShutdown = stopAtShutdown;
|
||||
this.waitForLoadingCoresToFinishMs = waitForLoadingCoresToFinishMs;
|
||||
this.extraServlets = extraServlets;
|
||||
this.extraFilters = extraFilters;
|
||||
this.sslConfig = sslConfig;
|
||||
|
@ -67,6 +70,7 @@ public class JettyConfig {
|
|||
int port = 0;
|
||||
String context = "/solr";
|
||||
boolean stopAtShutdown = true;
|
||||
Long waitForLoadingCoresToFinishMs = 300000L;
|
||||
Map<ServletHolder, String> extraServlets = new TreeMap<>();
|
||||
Map<Class<? extends Filter>, String> extraFilters = new TreeMap<>();
|
||||
SSLConfig sslConfig = null;
|
||||
|
@ -86,6 +90,11 @@ public class JettyConfig {
|
|||
return this;
|
||||
}
|
||||
|
||||
public Builder waitForLoadingCoresToFinish(Long waitForLoadingCoresToFinishMs) {
|
||||
this.waitForLoadingCoresToFinishMs = waitForLoadingCoresToFinishMs;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder withServlet(ServletHolder servlet, String servletName) {
|
||||
extraServlets.put(servlet, servletName);
|
||||
return this;
|
||||
|
@ -114,7 +123,7 @@ public class JettyConfig {
|
|||
}
|
||||
|
||||
public JettyConfig build() {
|
||||
return new JettyConfig(port, context, stopAtShutdown, extraServlets, extraFilters, sslConfig);
|
||||
return new JettyConfig(port, context, stopAtShutdown, waitForLoadingCoresToFinishMs, extraServlets, extraFilters, sslConfig);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -17,6 +17,7 @@
|
|||
|
||||
package org.apache.solr.client.solrj.embedded;
|
||||
|
||||
import org.apache.solr.core.CoreContainer;
|
||||
import org.apache.solr.servlet.SolrDispatchFilter;
|
||||
import org.eclipse.jetty.server.Connector;
|
||||
import org.eclipse.jetty.server.HttpConfiguration;
|
||||
|
@ -393,6 +394,8 @@ public class JettySolrRunner {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (config.waitForLoadingCoresToFinishMs != null && config.waitForLoadingCoresToFinishMs > 0L) waitForLoadingCoresToFinish(config.waitForLoadingCoresToFinishMs);
|
||||
} finally {
|
||||
if (prevContext != null) {
|
||||
MDC.setContextMap(prevContext);
|
||||
|
@ -562,4 +565,14 @@ public class JettySolrRunner {
|
|||
public String getSolrHome() {
|
||||
return solrHome;
|
||||
}
|
||||
|
||||
private void waitForLoadingCoresToFinish(long timeoutMs) {
|
||||
if (dispatchFilter != null) {
|
||||
SolrDispatchFilter solrFilter = (SolrDispatchFilter) dispatchFilter.getFilter();
|
||||
CoreContainer cores = solrFilter.getCores();
|
||||
if (cores != null) {
|
||||
cores.waitForLoadingCoresToFinish(timeoutMs);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1902,7 +1902,7 @@ public final class ZkController {
|
|||
} catch (NoNodeException nne) {
|
||||
return;
|
||||
} catch (Exception e) {
|
||||
log.warn("could not readd the overseer designate ", e);
|
||||
log.warn("could not read the overseer designate ", e);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -27,7 +27,10 @@ import java.util.Map;
|
|||
import java.util.Properties;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.collect.Maps;
|
||||
|
@ -100,6 +103,9 @@ public class CoreContainer {
|
|||
|
||||
private UpdateShardHandler updateShardHandler;
|
||||
|
||||
private ExecutorService coreContainerWorkExecutor = ExecutorUtil.newMDCAwareCachedThreadPool(
|
||||
new DefaultSolrThreadFactory("coreContainerWorkExecutor") );
|
||||
|
||||
protected LogWatcher logging = null;
|
||||
|
||||
private CloserThread backgroundCloser = null;
|
||||
|
@ -122,6 +128,8 @@ public class CoreContainer {
|
|||
|
||||
private PluginBag<SolrRequestHandler> containerHandlers = new PluginBag<>(SolrRequestHandler.class, null);
|
||||
|
||||
private boolean asyncSolrCoreLoad;
|
||||
|
||||
public ExecutorService getCoreZkRegisterExecutorService() {
|
||||
return zkSys.getCoreZkRegisterExecutorService();
|
||||
}
|
||||
|
@ -184,12 +192,21 @@ public class CoreContainer {
|
|||
this(config, properties, new CorePropertiesLocator(config.getCoreRootDirectory()));
|
||||
}
|
||||
|
||||
public CoreContainer(NodeConfig config, Properties properties, boolean asyncSolrCoreLoad) {
|
||||
this(config, properties, new CorePropertiesLocator(config.getCoreRootDirectory()), asyncSolrCoreLoad);
|
||||
}
|
||||
|
||||
public CoreContainer(NodeConfig config, Properties properties, CoresLocator locator) {
|
||||
this(config, properties, locator, false);
|
||||
}
|
||||
|
||||
public CoreContainer(NodeConfig config, Properties properties, CoresLocator locator, boolean asyncSolrCoreLoad) {
|
||||
this.loader = config.getSolrResourceLoader();
|
||||
this.solrHome = loader.getInstanceDir();
|
||||
this.cfg = checkNotNull(config);
|
||||
this.coresLocator = locator;
|
||||
this.containerProperties = new Properties(properties);
|
||||
this.asyncSolrCoreLoad = asyncSolrCoreLoad;
|
||||
}
|
||||
|
||||
private void intializeAuthorizationPlugin() {
|
||||
|
@ -359,57 +376,77 @@ public class CoreContainer {
|
|||
ExecutorService coreLoadExecutor = ExecutorUtil.newMDCAwareFixedThreadPool(
|
||||
( zkSys.getZkController() == null ? cfg.getCoreLoadThreadCount() : Integer.MAX_VALUE ),
|
||||
new DefaultSolrThreadFactory("coreLoadExecutor") );
|
||||
|
||||
final List<Future<SolrCore>> futures = new ArrayList<Future<SolrCore>>();
|
||||
try {
|
||||
|
||||
List<CoreDescriptor> cds = coresLocator.discover(this);
|
||||
checkForDuplicateCoreNames(cds);
|
||||
|
||||
List<Callable<SolrCore>> creators = new ArrayList<>();
|
||||
|
||||
for (final CoreDescriptor cd : cds) {
|
||||
if (cd.isTransient() || !cd.isLoadOnStartup()) {
|
||||
solrCores.putDynamicDescriptor(cd.getName(), cd);
|
||||
} else if (asyncSolrCoreLoad) {
|
||||
solrCores.markCoreAsLoading(cd);
|
||||
}
|
||||
if (cd.isLoadOnStartup()) {
|
||||
creators.add(new Callable<SolrCore>() {
|
||||
futures.add(coreLoadExecutor.submit(new Callable<SolrCore>() {
|
||||
@Override
|
||||
public SolrCore call() throws Exception {
|
||||
SolrCore core;
|
||||
try {
|
||||
if (zkSys.getZkController() != null) {
|
||||
zkSys.getZkController().throwErrorIfReplicaReplaced(cd);
|
||||
}
|
||||
return create(cd, false);
|
||||
|
||||
core = create(cd, false);
|
||||
} finally {
|
||||
if (asyncSolrCoreLoad) {
|
||||
solrCores.markCoreAsNotLoading(cd);
|
||||
}
|
||||
});
|
||||
}
|
||||
try {
|
||||
zkSys.registerInZk(core, true);
|
||||
} catch (Throwable t) {
|
||||
SolrException.log(log, "Error registering SolrCore", t);
|
||||
}
|
||||
return core;
|
||||
}
|
||||
}));
|
||||
}
|
||||
}
|
||||
|
||||
try {
|
||||
coreLoadExecutor.invokeAll(creators);
|
||||
}
|
||||
catch (InterruptedException e) {
|
||||
throw new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE, "Interrupted while loading cores");
|
||||
}
|
||||
|
||||
// Start the background thread
|
||||
backgroundCloser = new CloserThread(this, solrCores, cfg);
|
||||
backgroundCloser.start();
|
||||
|
||||
} finally {
|
||||
if (asyncSolrCoreLoad && futures != null) {
|
||||
Thread shutdownThread = new Thread() {
|
||||
public void run() {
|
||||
try {
|
||||
for (Future<SolrCore> future : futures) {
|
||||
try {
|
||||
future.get();
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
} catch (ExecutionException e) {
|
||||
log.error("Error waiting for SolrCore to be created", e);
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
ExecutorUtil.shutdownNowAndAwaitTermination(coreLoadExecutor);
|
||||
}
|
||||
}
|
||||
};
|
||||
coreContainerWorkExecutor.submit(shutdownThread);
|
||||
} else {
|
||||
ExecutorUtil.shutdownAndAwaitTermination(coreLoadExecutor);
|
||||
}
|
||||
}
|
||||
|
||||
if (isZooKeeperAware()) {
|
||||
// register in zk in background threads
|
||||
Collection<SolrCore> cores = getCores();
|
||||
if (cores != null) {
|
||||
for (SolrCore core : cores) {
|
||||
try {
|
||||
zkSys.registerInZk(core, true);
|
||||
} catch (Throwable t) {
|
||||
SolrException.log(log, "Error registering SolrCore", t);
|
||||
}
|
||||
}
|
||||
}
|
||||
zkSys.getZkController().checkOverseerDesignate();
|
||||
}
|
||||
}
|
||||
|
@ -441,6 +478,8 @@ public class CoreContainer {
|
|||
|
||||
isShutDown = true;
|
||||
|
||||
ExecutorUtil.shutdownAndAwaitTermination(coreContainerWorkExecutor);
|
||||
|
||||
if (isZooKeeperAware()) {
|
||||
cancelCoreRecoveries();
|
||||
zkSys.publishCoresAsDown(solrCores.getCores());
|
||||
|
@ -621,7 +660,7 @@ public class CoreContainer {
|
|||
public SolrCore create(CoreDescriptor dcore, boolean publishState) {
|
||||
|
||||
if (isShutDown) {
|
||||
throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE, "Solr has close.");
|
||||
throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE, "Solr has been shutdown.");
|
||||
}
|
||||
|
||||
SolrCore core = null;
|
||||
|
@ -913,6 +952,20 @@ public class CoreContainer {
|
|||
return jarRepository;
|
||||
}
|
||||
|
||||
/**
|
||||
* If using asyncSolrCoreLoad=true, calling this after {@link #load()} will
|
||||
* not return until all cores have finished loading.
|
||||
*
|
||||
* @param timeoutMs timeout, upon which method simply returns
|
||||
*/
|
||||
public void waitForLoadingCoresToFinish(long timeoutMs) {
|
||||
solrCores.waitForLoadingCoresToFinish(timeoutMs);
|
||||
}
|
||||
|
||||
public void waitForLoadingCore(String name, long timeoutMs) {
|
||||
solrCores.waitForLoadingCoreToFinish(name, timeoutMs);
|
||||
}
|
||||
|
||||
// ---------------- CoreContainer request handlers --------------
|
||||
|
||||
protected <T> T createHandler(String handlerClass, Class<T> clazz) {
|
||||
|
@ -1002,6 +1055,10 @@ public class CoreContainer {
|
|||
return loader;
|
||||
}
|
||||
|
||||
public boolean isCoreLoading(String name) {
|
||||
return solrCores.isCoreLoading(name);
|
||||
}
|
||||
|
||||
public AuthorizationPlugin getAuthorizationPlugin() {
|
||||
return authorizationPlugin;
|
||||
}
|
||||
|
|
|
@ -25,12 +25,15 @@ import org.slf4j.LoggerFactory;
|
|||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashSet;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.TreeSet;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
|
||||
class SolrCores {
|
||||
|
@ -47,7 +50,9 @@ class SolrCores {
|
|||
|
||||
private final CoreContainer container;
|
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger(SolrCores.class);
|
||||
private Set<String> currentlyLoadingCores = Collections.newSetFromMap(new ConcurrentHashMap<String,Boolean>());
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(SolrCores.class);
|
||||
|
||||
// This map will hold objects that are being currently operated on. The core (value) may be null in the case of
|
||||
// initial load. The rule is, never to any operation on a core that is currently being operated upon.
|
||||
|
@ -72,7 +77,7 @@ class SolrCores {
|
|||
if (size() > cacheSize) {
|
||||
synchronized (modifyLock) {
|
||||
SolrCore coreToClose = eldest.getValue();
|
||||
logger.info("Closing transient core [{}]", coreToClose.getName());
|
||||
log.info("Closing transient core [{}]", coreToClose.getName());
|
||||
pendingCloses.add(coreToClose); // Essentially just queue this core up for closing.
|
||||
modifyLock.notifyAll(); // Wakes up closer thread too
|
||||
}
|
||||
|
@ -395,6 +400,7 @@ class SolrCores {
|
|||
|
||||
/**
|
||||
* Return the CoreDescriptor corresponding to a given core name.
|
||||
* Blocks if the SolrCore is still loading until it is ready.
|
||||
* @param coreName the name of the core
|
||||
* @return the CoreDescriptor
|
||||
*/
|
||||
|
@ -425,4 +431,63 @@ class SolrCores {
|
|||
}
|
||||
return cds;
|
||||
}
|
||||
|
||||
// cores marked as loading will block on getCore
|
||||
public void markCoreAsLoading(CoreDescriptor cd) {
|
||||
synchronized (modifyLock) {
|
||||
currentlyLoadingCores.add(cd.getName());
|
||||
}
|
||||
}
|
||||
|
||||
//cores marked as loading will block on getCore
|
||||
public void markCoreAsNotLoading(CoreDescriptor cd) {
|
||||
synchronized (modifyLock) {
|
||||
currentlyLoadingCores.remove(cd.getName());
|
||||
}
|
||||
}
|
||||
|
||||
// returns when no cores are marked as loading
|
||||
public void waitForLoadingCoresToFinish(long timeoutMs) {
|
||||
long time = System.nanoTime();
|
||||
long timeout = time + TimeUnit.NANOSECONDS.convert(timeoutMs, TimeUnit.MILLISECONDS);
|
||||
synchronized (modifyLock) {
|
||||
while (!currentlyLoadingCores.isEmpty()) {
|
||||
try {
|
||||
modifyLock.wait(500);
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
if (System.nanoTime() >= timeout) {
|
||||
log.warn("Timed out waiting for SolrCores to finish loading.");
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// returns when core is finished loading, throws exception if no such core loading or loaded
|
||||
public void waitForLoadingCoreToFinish(String core, long timeoutMs) {
|
||||
long time = System.nanoTime();
|
||||
long timeout = time + TimeUnit.NANOSECONDS.convert(timeoutMs, TimeUnit.MILLISECONDS);
|
||||
synchronized (modifyLock) {
|
||||
while (isCoreLoading(core)) {
|
||||
try {
|
||||
modifyLock.wait(500);
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
if (System.nanoTime() >= timeout) {
|
||||
log.warn("Timed out waiting for SolrCore, {}, to finish loading.", core);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public boolean isCoreLoading(String name) {
|
||||
if (currentlyLoadingCores.contains(name)) {
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -71,6 +71,7 @@ import org.apache.http.entity.InputStreamEntity;
|
|||
import org.apache.http.util.EntityUtils;
|
||||
import org.apache.solr.client.solrj.impl.CloudSolrClient;
|
||||
import org.apache.solr.common.SolrException;
|
||||
import org.apache.solr.common.SolrException.ErrorCode;
|
||||
import org.apache.solr.common.cloud.Aliases;
|
||||
import org.apache.solr.common.cloud.ClusterState;
|
||||
import org.apache.solr.common.cloud.Replica;
|
||||
|
@ -232,6 +233,14 @@ public class HttpSolrCall {
|
|||
core = cores.getCore(corename);
|
||||
if (core != null) {
|
||||
path = path.substring(idx);
|
||||
} else if (cores.isCoreLoading(corename)) { // extra mem barriers, so don't look at this before trying to get core
|
||||
throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE, "SolrCore is loading");
|
||||
} else {
|
||||
// the core may have just finished loading
|
||||
core = cores.getCore(corename);
|
||||
if (core != null) {
|
||||
path = path.substring(idx);
|
||||
}
|
||||
}
|
||||
}
|
||||
if (core == null) {
|
||||
|
|
|
@ -136,7 +136,7 @@ public class SolrDispatchFilter extends BaseSolrFilter {
|
|||
*/
|
||||
protected CoreContainer createCoreContainer(String solrHome, Properties extraProperties) {
|
||||
NodeConfig nodeConfig = loadNodeConfig(solrHome, extraProperties);
|
||||
cores = new CoreContainer(nodeConfig, extraProperties);
|
||||
cores = new CoreContainer(nodeConfig, extraProperties, true);
|
||||
cores.load();
|
||||
return cores;
|
||||
}
|
||||
|
|
|
@ -519,7 +519,7 @@ public class BasicDistributedZkTest extends AbstractFullDistribZkTestBase {
|
|||
createCores(httpSolrClient, executor, "multiunload2", 1, cnt);
|
||||
} finally {
|
||||
if (executor != null) {
|
||||
ExecutorUtil.shutdownAndAwaitTermination(executor, 120, TimeUnit.SECONDS);
|
||||
ExecutorUtil.shutdownAndAwaitTermination(executor);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -158,8 +158,6 @@ public class CollectionsAPIDistributedZkTest extends AbstractFullDistribZkTestBa
|
|||
|
||||
public CollectionsAPIDistributedZkTest() {
|
||||
sliceCount = 2;
|
||||
checkCreatedVsState = false;
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -93,8 +93,6 @@ public class CustomCollectionTest extends AbstractFullDistribZkTestBase {
|
|||
|
||||
public CustomCollectionTest() {
|
||||
sliceCount = 2;
|
||||
checkCreatedVsState = false;
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -71,7 +71,6 @@ public class DeleteLastCustomShardedReplicaTest extends AbstractFullDistribZkTes
|
|||
|
||||
public DeleteLastCustomShardedReplicaTest() {
|
||||
sliceCount = 2;
|
||||
checkCreatedVsState = false;
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
|
@ -70,7 +70,6 @@ public class DeleteReplicaTest extends AbstractFullDistribZkTestBase {
|
|||
|
||||
public DeleteReplicaTest() {
|
||||
sliceCount = 2;
|
||||
checkCreatedVsState = false;
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
|
@ -49,7 +49,6 @@ public class ExternalCollectionsTest extends AbstractFullDistribZkTestBase {
|
|||
}
|
||||
|
||||
public ExternalCollectionsTest() {
|
||||
checkCreatedVsState = false;
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -80,8 +80,6 @@ public class OverseerRolesTest extends AbstractFullDistribZkTestBase{
|
|||
public OverseerRolesTest() {
|
||||
sliceCount = 2;
|
||||
fixShardCount(TEST_NIGHTLY ? 6 : 2);
|
||||
|
||||
checkCreatedVsState = false;
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
|
@ -100,8 +100,6 @@ public class SharedFSAutoReplicaFailoverTest extends AbstractFullDistribZkTestBa
|
|||
sliceCount = 2;
|
||||
completionService = new ExecutorCompletionService<>(executor);
|
||||
pending = new HashSet<>();
|
||||
checkCreatedVsState = false;
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
|
@ -22,10 +22,13 @@ import org.apache.lucene.util.LuceneTestCase;
|
|||
import org.apache.lucene.util.LuceneTestCase.SuppressSysoutChecks;
|
||||
import org.apache.solr.SolrTestCaseJ4;
|
||||
import org.apache.solr.client.solrj.SolrQuery;
|
||||
import org.apache.solr.client.solrj.SolrServerException;
|
||||
import org.apache.solr.client.solrj.embedded.JettyConfig;
|
||||
import org.apache.solr.client.solrj.embedded.JettyConfig.Builder;
|
||||
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
|
||||
import org.apache.solr.client.solrj.impl.CloudSolrClient;
|
||||
import org.apache.solr.client.solrj.response.QueryResponse;
|
||||
import org.apache.solr.common.SolrException;
|
||||
import org.apache.solr.common.SolrInputDocument;
|
||||
import org.apache.solr.common.cloud.ClusterState;
|
||||
import org.apache.solr.common.cloud.Replica;
|
||||
|
@ -88,7 +91,9 @@ public class TestMiniSolrCloudCluster extends LuceneTestCase {
|
|||
protected void testCollectionCreateSearchDelete() throws Exception {
|
||||
|
||||
File solrXml = new File(SolrTestCaseJ4.TEST_HOME(), "solr-no-core.xml");
|
||||
MiniSolrCloudCluster miniCluster = new MiniSolrCloudCluster(NUM_SERVERS, null, createTempDir().toFile(), solrXml, null, null);
|
||||
Builder jettyConfig = JettyConfig.builder();
|
||||
jettyConfig.waitForLoadingCoresToFinish(null);
|
||||
MiniSolrCloudCluster miniCluster = new MiniSolrCloudCluster(NUM_SERVERS, createTempDir().toFile(), solrXml, jettyConfig.build());
|
||||
|
||||
try {
|
||||
assertNotNull(miniCluster.getZkServer());
|
||||
|
@ -174,6 +179,16 @@ public class TestMiniSolrCloudCluster extends LuceneTestCase {
|
|||
startedServer = miniCluster.startJettySolrRunner(null, null, null);
|
||||
assertTrue(startedServer.isRunning());
|
||||
assertEquals(NUM_SERVERS, miniCluster.getJettySolrRunners().size());
|
||||
Thread.sleep(15000);
|
||||
try {
|
||||
cloudSolrClient.query(query);
|
||||
fail("Expected exception on query because collection should not be ready - we have turned on async core loading");
|
||||
} catch (SolrServerException e) {
|
||||
SolrException rc = (SolrException) e.getRootCause();
|
||||
assertTrue(rc.code() >= 500 && rc.code() < 600);
|
||||
} catch (SolrException e) {
|
||||
assertTrue(e.code() >= 500 && e.code() < 600);
|
||||
}
|
||||
|
||||
// delete the collection we created earlier
|
||||
miniCluster.deleteCollection(collectionName);
|
||||
|
|
|
@ -56,7 +56,6 @@ public class UnloadDistributedZkTest extends BasicDistributedZkTest {
|
|||
|
||||
public UnloadDistributedZkTest() {
|
||||
super();
|
||||
checkCreatedVsState = false;
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -375,7 +374,7 @@ public class UnloadDistributedZkTest extends BasicDistributedZkTest {
|
|||
// create the cores
|
||||
createCores(adminClient, executor, "multiunload", 2, cnt);
|
||||
} finally {
|
||||
ExecutorUtil.shutdownAndAwaitTermination(executor, 120, TimeUnit.SECONDS);
|
||||
ExecutorUtil.shutdownAndAwaitTermination(executor);
|
||||
}
|
||||
|
||||
executor = new ExecutorUtil.MDCAwareThreadPoolExecutor(0, Integer.MAX_VALUE, 5,
|
||||
|
@ -399,7 +398,7 @@ public class UnloadDistributedZkTest extends BasicDistributedZkTest {
|
|||
Thread.sleep(random().nextInt(50));
|
||||
}
|
||||
} finally {
|
||||
ExecutorUtil.shutdownAndAwaitTermination(executor, 120, TimeUnit.SECONDS);
|
||||
ExecutorUtil.shutdownAndAwaitTermination(executor);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1068,7 +1068,7 @@ public class CloudSolrClient extends SolrClient {
|
|||
for (String s : collectionNames) {
|
||||
if(s!=null) collectionStateCache.remove(s);
|
||||
}
|
||||
throw new SolrException(SolrException.ErrorCode.INVALID_STATE, "Not enough nodes to handle the request");
|
||||
throw new SolrException(SolrException.ErrorCode.INVALID_STATE, "Could not find a healthy node to handle the request.");
|
||||
}
|
||||
|
||||
Collections.shuffle(theUrlList, rand);
|
||||
|
|
|
@ -37,42 +37,37 @@ import org.slf4j.MDC;
|
|||
public class ExecutorUtil {
|
||||
public static Logger log = LoggerFactory.getLogger(ExecutorUtil.class);
|
||||
|
||||
// this will interrupt the threads! Lucene and Solr do not like this because it can close channels, so only use
|
||||
// this if you know what you are doing - you probably want shutdownAndAwaitTermination
|
||||
public static void shutdownNowAndAwaitTermination(ExecutorService pool) {
|
||||
pool.shutdown(); // Disable new tasks from being submitted
|
||||
pool.shutdownNow(); // Cancel currently executing tasks
|
||||
pool.shutdownNow(); // Cancel currently executing tasks - NOTE: this interrupts!
|
||||
boolean shutdown = false;
|
||||
while (!shutdown) {
|
||||
try {
|
||||
// Wait a while for existing tasks to terminate
|
||||
shutdown = pool.awaitTermination(5, TimeUnit.SECONDS);
|
||||
shutdown = pool.awaitTermination(1, TimeUnit.SECONDS);
|
||||
} catch (InterruptedException ie) {
|
||||
// Preserve interrupt status
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
if (!shutdown) {
|
||||
pool.shutdownNow(); // Cancel currently executing tasks
|
||||
pool.shutdownNow(); // Cancel currently executing tasks - NOTE: this interrupts!
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public static void shutdownAndAwaitTermination(ExecutorService pool) {
|
||||
shutdownAndAwaitTermination(pool, 60, TimeUnit.SECONDS);
|
||||
}
|
||||
|
||||
public static void shutdownAndAwaitTermination(ExecutorService pool, long timeout, TimeUnit timeUnit) {
|
||||
pool.shutdown(); // Disable new tasks from being submitted
|
||||
boolean shutdown = false;
|
||||
while (!shutdown) {
|
||||
try {
|
||||
// Wait a while for existing tasks to terminate
|
||||
shutdown = pool.awaitTermination(timeout, timeUnit);
|
||||
shutdown = pool.awaitTermination(1, TimeUnit.SECONDS);
|
||||
} catch (InterruptedException ie) {
|
||||
// Preserve interrupt status
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
if (!shutdown) {
|
||||
pool.shutdownNow(); // Cancel currently executing tasks
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -118,7 +118,6 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes
|
|||
|
||||
protected Map<String,CloudJettyRunner> shardToLeaderJetty = new HashMap<>();
|
||||
private boolean cloudInit;
|
||||
protected boolean checkCreatedVsState;
|
||||
protected boolean useJettyDataDir = true;
|
||||
|
||||
protected Map<URI,SocketProxy> proxies = new HashMap<>();
|
||||
|
@ -305,7 +304,7 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes
|
|||
|
||||
initCloud();
|
||||
|
||||
createJettys(numServers, checkCreatedVsState).size();
|
||||
createJettys(numServers).size();
|
||||
|
||||
int cnt = getTotalReplicas(DEFAULT_COLLECTION);
|
||||
if (cnt > 0) {
|
||||
|
@ -336,10 +335,6 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes
|
|||
}
|
||||
}
|
||||
|
||||
protected List<JettySolrRunner> createJettys(int numJettys) throws Exception {
|
||||
return createJettys(numJettys, false);
|
||||
}
|
||||
|
||||
protected String defaultStateFormat = String.valueOf( 1 + random().nextInt(2));
|
||||
|
||||
protected String getStateFormat() {
|
||||
|
@ -350,13 +345,7 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes
|
|||
return defaultStateFormat; // random
|
||||
}
|
||||
|
||||
/**
|
||||
* @param checkCreatedVsState
|
||||
* if true, make sure the number created (numJettys) matches the
|
||||
* number in the cluster state - if you add more jetties this may not
|
||||
* be the case
|
||||
*/
|
||||
protected List<JettySolrRunner> createJettys(int numJettys, boolean checkCreatedVsState) throws Exception {
|
||||
protected List<JettySolrRunner> createJettys(int numJettys) throws Exception {
|
||||
List<JettySolrRunner> jettys = new ArrayList<>();
|
||||
List<SolrClient> clients = new ArrayList<>();
|
||||
StringBuilder sb = new StringBuilder();
|
||||
|
@ -393,7 +382,7 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes
|
|||
this.clients.addAll(clients);
|
||||
|
||||
int numShards = getTotalReplicas(DEFAULT_COLLECTION);
|
||||
if (checkCreatedVsState) {
|
||||
|
||||
// now wait until we see that the number of shards in the cluster state
|
||||
// matches what we expect
|
||||
int retries = 0;
|
||||
|
@ -402,18 +391,16 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes
|
|||
if (numShards == getShardCount()) break;
|
||||
if (retries++ == 60) {
|
||||
printLayoutOnTearDown = true;
|
||||
fail("Shards in the state does not match what we set:" + numShards
|
||||
+ " vs " + getShardCount());
|
||||
fail("Shards in the state does not match what we set:" + numShards + " vs " + getShardCount());
|
||||
}
|
||||
Thread.sleep(500);
|
||||
}
|
||||
|
||||
ZkStateReader zkStateReader = cloudClient.getZkStateReader();
|
||||
// also make sure we have a leader for each shard
|
||||
// make sure we have a leader for each shard
|
||||
for (int i = 1; i <= sliceCount; i++) {
|
||||
zkStateReader.getLeaderRetry(DEFAULT_COLLECTION, "shard" + i, 10000);
|
||||
}
|
||||
}
|
||||
|
||||
if (numShards > 0) {
|
||||
updateMappingsFromZk(this.jettys, this.clients);
|
||||
|
|
Loading…
Reference in New Issue