SOLR-7180: MiniSolrCloudCluster starts up and shuts down its jetties in parallel

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1665353 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Alan Woodward 2015-03-09 20:51:28 +00:00
parent 91de592387
commit b4ee7b6eb4
3 changed files with 238 additions and 98 deletions

View File

@ -285,6 +285,9 @@ Other Changes
* SOLR-6804: Untangle SnapPuller and ReplicationHandler (Ramkumar Aiyengar)
* SOLR-7180: MiniSolrCloudCluster will startup and shutdown its jetties in
parallel (Alan Woodward, Tomás Fernández Löbbe, Vamsee Yarlagadda)
================== 5.0.0 ==================
Consult the LUCENE_CHANGES.txt file for additional, low level, changes in this release.

View File

@ -22,9 +22,9 @@ import org.apache.lucene.util.LuceneTestCase;
import org.apache.lucene.util.LuceneTestCase.SuppressSysoutChecks;
import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.embedded.JettyConfig;
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.cloud.ClusterState;
@ -34,8 +34,6 @@ import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.core.CoreDescriptor;
import org.apache.solr.util.RevertDefaultThreadHandlerRule;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
@ -45,10 +43,12 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
/**
* Test of the MiniSolrCloudCluster functionality. Keep in mind,
@ -62,7 +62,6 @@ public class TestMiniSolrCloudCluster extends LuceneTestCase {
private static final int NUM_SERVERS = 5;
private static final int NUM_SHARDS = 2;
private static final int REPLICATION_FACTOR = 2;
private static MiniSolrCloudCluster miniCluster;
@Rule
public TestRule solrTestRules = RuleChain
@ -73,99 +72,151 @@ public class TestMiniSolrCloudCluster extends LuceneTestCase {
new SystemPropertiesRestoreRule()).around(
new RevertDefaultThreadHandlerRule());
@BeforeClass
public static void startup() throws Exception {
File solrXml = new File(SolrTestCaseJ4.TEST_HOME(), "solr-no-core.xml");
miniCluster = new MiniSolrCloudCluster(NUM_SERVERS, null, createTempDir().toFile(), solrXml, null, null);
}
@Test
public void testBasics() throws Exception {
@AfterClass
public static void shutdown() throws Exception {
if (miniCluster != null) {
File solrXml = new File(SolrTestCaseJ4.TEST_HOME(), "solr-no-core.xml");
MiniSolrCloudCluster miniCluster = new MiniSolrCloudCluster(NUM_SERVERS, null, createTempDir().toFile(), solrXml, null, null);
try {
assertNotNull(miniCluster.getZkServer());
List<JettySolrRunner> jettys = miniCluster.getJettySolrRunners();
assertEquals(NUM_SERVERS, jettys.size());
for (JettySolrRunner jetty : jettys) {
assertTrue(jetty.isRunning());
}
// shut down a server
JettySolrRunner stoppedServer = miniCluster.stopJettySolrRunner(0);
assertTrue(stoppedServer.isStopped());
assertEquals(NUM_SERVERS - 1, miniCluster.getJettySolrRunners().size());
// create a server
JettySolrRunner startedServer = miniCluster.startJettySolrRunner(null, null, null);
assertTrue(startedServer.isRunning());
assertEquals(NUM_SERVERS, miniCluster.getJettySolrRunners().size());
// create collection
String collectionName = "testSolrCloudCollection";
String configName = "solrCloudCollectionConfig";
File configDir = new File(SolrTestCaseJ4.TEST_HOME() + File.separator + "collection1" + File.separator + "conf");
miniCluster.uploadConfigDir(configDir, configName);
Map<String, String> collectionProperties = new HashMap<>();
collectionProperties.put(CoreDescriptor.CORE_CONFIG, "solrconfig-tlog.xml");
collectionProperties.put("solr.tests.maxBufferedDocs", "100000");
collectionProperties.put("solr.tests.maxIndexingThreads", "-1");
collectionProperties.put("solr.tests.ramBufferSizeMB", "100");
// use non-test classes so RandomizedRunner isn't necessary
collectionProperties.put("solr.tests.mergePolicy", "org.apache.lucene.index.TieredMergePolicy");
collectionProperties.put("solr.tests.mergeScheduler", "org.apache.lucene.index.ConcurrentMergeScheduler");
collectionProperties.put("solr.directoryFactory", "solr.RAMDirectoryFactory");
miniCluster.createCollection(collectionName, NUM_SHARDS, REPLICATION_FACTOR, configName, collectionProperties);
try (SolrZkClient zkClient = new SolrZkClient
(miniCluster.getZkServer().getZkAddress(), AbstractZkTestCase.TIMEOUT, 45000, null)) {
ZkStateReader zkStateReader = new ZkStateReader(zkClient);
AbstractDistribZkTestBase.waitForRecoveriesToFinish(collectionName, zkStateReader, true, true, 330);
// modify/query collection
CloudSolrClient cloudSolrClient = miniCluster.getSolrClient();
cloudSolrClient.setDefaultCollection(collectionName);
SolrInputDocument doc = new SolrInputDocument();
doc.setField("id", "1");
cloudSolrClient.add(doc);
cloudSolrClient.commit();
SolrQuery query = new SolrQuery();
query.setQuery("*:*");
QueryResponse rsp = cloudSolrClient.query(query);
assertEquals(1, rsp.getResults().getNumFound());
// remove a server not hosting any replicas
zkStateReader.updateClusterState(true);
ClusterState clusterState = zkStateReader.getClusterState();
HashMap<String, JettySolrRunner> jettyMap = new HashMap<String, JettySolrRunner>();
for (JettySolrRunner jetty : miniCluster.getJettySolrRunners()) {
String key = jetty.getBaseUrl().toString().substring((jetty.getBaseUrl().getProtocol() + "://").length());
jettyMap.put(key, jetty);
}
Collection<Slice> slices = clusterState.getSlices(collectionName);
// track the servers not host repliacs
for (Slice slice : slices) {
jettyMap.remove(slice.getLeader().getNodeName().replace("_solr", "/solr"));
for (Replica replica : slice.getReplicas()) {
jettyMap.remove(replica.getNodeName().replace("_solr", "/solr"));
}
}
assertTrue("Expected to find a node without a replica", jettyMap.size() > 0);
JettySolrRunner jettyToStop = jettyMap.entrySet().iterator().next().getValue();
jettys = miniCluster.getJettySolrRunners();
for (int i = 0; i < jettys.size(); ++i) {
if (jettys.get(i).equals(jettyToStop)) {
miniCluster.stopJettySolrRunner(i);
assertEquals(NUM_SERVERS - 1, miniCluster.getJettySolrRunners().size());
}
}
}
}
finally {
miniCluster.shutdown();
}
miniCluster = null;
}
@Test
public void testBasics() throws Exception {
assertNotNull(miniCluster.getZkServer());
List<JettySolrRunner> jettys = miniCluster.getJettySolrRunners();
assertEquals(NUM_SERVERS, jettys.size());
for (JettySolrRunner jetty : jettys) {
assertTrue(jetty.isRunning());
public void testErrorsInStartup() throws Exception {
File solrXml = new File(SolrTestCaseJ4.TEST_HOME(), "solr-no-core.xml");
AtomicInteger jettyIndex = new AtomicInteger();
MiniSolrCloudCluster cluster = null;
try {
cluster = new MiniSolrCloudCluster(3, createTempDir().toFile(), solrXml, JettyConfig.builder().build()) {
@Override
public JettySolrRunner startJettySolrRunner(JettyConfig config) throws Exception {
if (jettyIndex.incrementAndGet() != 2)
return super.startJettySolrRunner(config);
throw new IOException("Fake exception on startup!");
}
};
fail("Expected an exception to be thrown from MiniSolrCloudCluster");
}
// shut down a server
JettySolrRunner stoppedServer = miniCluster.stopJettySolrRunner(0);
assertTrue(stoppedServer.isStopped());
assertEquals(NUM_SERVERS - 1, miniCluster.getJettySolrRunners().size());
// create a server
JettySolrRunner startedServer = miniCluster.startJettySolrRunner(null, null, null);
assertTrue(startedServer.isRunning());
assertEquals(NUM_SERVERS, miniCluster.getJettySolrRunners().size());
// create collection
String collectionName = "testSolrCloudCollection";
String configName = "solrCloudCollectionConfig";
File configDir = new File(SolrTestCaseJ4.TEST_HOME() + File.separator + "collection1" + File.separator + "conf");
miniCluster.uploadConfigDir(configDir, configName);
Map<String, String> collectionProperties = new HashMap<>();
collectionProperties.put(CoreDescriptor.CORE_CONFIG, "solrconfig-tlog.xml");
collectionProperties.put("solr.tests.maxBufferedDocs", "100000");
collectionProperties.put("solr.tests.maxIndexingThreads", "-1");
collectionProperties.put("solr.tests.ramBufferSizeMB", "100");
// use non-test classes so RandomizedRunner isn't necessary
collectionProperties.put("solr.tests.mergePolicy", "org.apache.lucene.index.TieredMergePolicy");
collectionProperties.put("solr.tests.mergeScheduler", "org.apache.lucene.index.ConcurrentMergeScheduler");
collectionProperties.put("solr.directoryFactory", "solr.RAMDirectoryFactory");
miniCluster.createCollection(collectionName, NUM_SHARDS, REPLICATION_FACTOR, configName, collectionProperties);
try(SolrZkClient zkClient = new SolrZkClient
(miniCluster.getZkServer().getZkAddress(), AbstractZkTestCase.TIMEOUT, 45000, null)) {
ZkStateReader zkStateReader = new ZkStateReader(zkClient);
AbstractDistribZkTestBase.waitForRecoveriesToFinish(collectionName, zkStateReader, true, true, 330);
// modify/query collection
CloudSolrClient cloudSolrClient = miniCluster.getSolrClient();
cloudSolrClient.setDefaultCollection(collectionName);
SolrInputDocument doc = new SolrInputDocument();
doc.setField("id", "1");
cloudSolrClient.add(doc);
cloudSolrClient.commit();
SolrQuery query = new SolrQuery();
query.setQuery("*:*");
QueryResponse rsp = cloudSolrClient.query(query);
assertEquals(1, rsp.getResults().getNumFound());
// remove a server not hosting any replicas
zkStateReader.updateClusterState(true);
ClusterState clusterState = zkStateReader.getClusterState();
HashMap<String, JettySolrRunner> jettyMap = new HashMap<String, JettySolrRunner>();
for (JettySolrRunner jetty : miniCluster.getJettySolrRunners()) {
String key = jetty.getBaseUrl().toString().substring((jetty.getBaseUrl().getProtocol() + "://").length());
jettyMap.put(key, jetty);
}
Collection<Slice> slices = clusterState.getSlices(collectionName);
// track the servers not host repliacs
for (Slice slice : slices) {
jettyMap.remove(slice.getLeader().getNodeName().replace("_solr", "/solr"));
for (Replica replica : slice.getReplicas()) {
jettyMap.remove(replica.getNodeName().replace("_solr", "/solr"));
}
}
assertTrue("Expected to find a node without a replica", jettyMap.size() > 0);
JettySolrRunner jettyToStop = jettyMap.entrySet().iterator().next().getValue();
jettys = miniCluster.getJettySolrRunners();
for (int i = 0; i < jettys.size(); ++i) {
if (jettys.get(i).equals(jettyToStop)) {
miniCluster.stopJettySolrRunner(i);
assertEquals(NUM_SERVERS - 1, miniCluster.getJettySolrRunners().size());
}
}
catch (Exception e) {
assertEquals("Error starting up MiniSolrCloudCluster", e.getMessage());
assertEquals("Expected one suppressed exception", 1, e.getSuppressed().length);
assertEquals("Fake exception on startup!", e.getSuppressed()[0].getMessage());
}
finally {
if (cluster != null)
cluster.shutdown();
}
}
@Test
public void testErrorsInShutdown() throws Exception {
File solrXml = new File(SolrTestCaseJ4.TEST_HOME(), "solr-no-core.xml");
AtomicInteger jettyIndex = new AtomicInteger();
MiniSolrCloudCluster cluster = new MiniSolrCloudCluster(3, createTempDir().toFile(), solrXml, JettyConfig.builder().build()) {
@Override
protected JettySolrRunner stopJettySolrRunner(JettySolrRunner jetty) throws Exception {
JettySolrRunner j = super.stopJettySolrRunner(jetty);
if (jettyIndex.incrementAndGet() == 2)
throw new IOException("Fake IOException on shutdown!");
return j;
}
};
try {
cluster.shutdown();
fail("Expected an exception to be thrown on MiniSolrCloudCluster shutdown");
}
catch (Exception e) {
assertEquals("Error shutting down MiniSolrCloudCluster", e.getMessage());
assertEquals("Expected one suppressed exception", 1, e.getSuppressed().length);
assertEquals("Fake IOException on shutdown!", e.getSuppressed()[0].getMessage());
}
}
}

View File

@ -31,6 +31,7 @@ import org.apache.solr.common.params.CollectionParams.CollectionAction;
import org.apache.solr.common.params.CoreAdminParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.SolrjNamedThreadFactory;
import org.apache.zookeeper.KeeperException;
import org.eclipse.jetty.servlet.ServletHolder;
import org.slf4j.Logger;
@ -39,30 +40,46 @@ import org.slf4j.LoggerFactory;
import javax.servlet.Filter;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
/**
* "Mini" SolrCloud cluster to be used for testing
*/
public class MiniSolrCloudCluster {
private static Logger log = LoggerFactory.getLogger(MiniSolrCloudCluster.class);
private final ZkTestServer zkServer;
private final List<JettySolrRunner> jettys;
private final List<JettySolrRunner> jettys = new LinkedList<>();
private final File testDir;
private final CloudSolrClient solrClient;
private final JettyConfig jettyConfig;
private final ExecutorService executor = Executors.newCachedThreadPool(new SolrjNamedThreadFactory("jetty-launcher"));
/**
* "Mini" SolrCloud cluster to be used for testing
* Create a MiniSolrCloudCluster
*
* @param numServers number of Solr servers to start
* @param hostContext context path of Solr servers used by Jetty
* @param baseDir base directory that the mini cluster should be run from
* @param solrXml solr.xml file to be uploaded to ZooKeeper
* @param extraServlets Extra servlets to be started by Jetty
* @param extraRequestFilters extra filters to be started by Jetty
*
* @throws Exception if there was an error starting the cluster
*/
public MiniSolrCloudCluster(int numServers, String hostContext, File baseDir, File solrXml,
SortedMap<ServletHolder, String> extraServlets,
@ -71,7 +88,8 @@ public class MiniSolrCloudCluster {
}
/**
* "Mini" SolrCloud cluster to be used for testing
* Create a MiniSolrCloudCluster
*
* @param numServers number of Solr servers to start
* @param hostContext context path of Solr servers used by Jetty
* @param baseDir base directory that the mini cluster should be run from
@ -79,6 +97,8 @@ public class MiniSolrCloudCluster {
* @param extraServlets Extra servlets to be started by Jetty
* @param extraRequestFilters extra filters to be started by Jetty
* @param sslConfig SSL configuration
*
* @throws Exception if there was an error starting the cluster
*/
public MiniSolrCloudCluster(int numServers, String hostContext, File baseDir, File solrXml,
SortedMap<ServletHolder, String> extraServlets,
@ -92,6 +112,16 @@ public class MiniSolrCloudCluster {
.build());
}
/**
* Create a MiniSolrCloudCluster
*
* @param numServers number of Solr servers to start
* @param baseDir base directory that the mini cluster should be run from
* @param solrXml solr.xml file to be uploaded to ZooKeeper
* @param jettyConfig Jetty configuration
*
* @throws Exception if there was an error starting the cluster
*/
public MiniSolrCloudCluster(int numServers, File baseDir, File solrXml, JettyConfig jettyConfig) throws Exception {
this.testDir = baseDir;
@ -114,11 +144,28 @@ public class MiniSolrCloudCluster {
System.setProperty("solr.solrxml.location","zookeeper");
System.setProperty("zkHost", zkServer.getZkAddress());
jettys = new LinkedList<>();
List<Callable<JettySolrRunner>> startups = new ArrayList<>(numServers);
for (int i = 0; i < numServers; ++i) {
startJettySolrRunner(jettyConfig);
startups.add(new Callable<JettySolrRunner>() {
@Override
public JettySolrRunner call() throws Exception {
return startJettySolrRunner(jettyConfig);
}
});
}
Collection<Future<JettySolrRunner>> futures = executor.invokeAll(startups);
Exception startupError = checkForExceptions("Error starting up MiniSolrCloudCluster", futures);
if (startupError != null) {
try {
this.shutdown();
}
catch (Throwable t) {
startupError.addSuppressed(t);
}
throw startupError;
}
solrClient = buildSolrClient();
}
@ -220,6 +267,11 @@ public class MiniSolrCloudCluster {
jettys.remove(index);
return jetty;
}
protected JettySolrRunner stopJettySolrRunner(JettySolrRunner jetty) throws Exception {
jetty.stop();
return jetty;
}
public void uploadConfigDir(File configDir, String configName) throws IOException, KeeperException, InterruptedException {
try(SolrZkClient zkClient = new SolrZkClient(zkServer.getZkAddress(),
@ -254,11 +306,26 @@ public class MiniSolrCloudCluster {
*/
public void shutdown() throws Exception {
try {
solrClient.close();
for (int i = jettys.size() - 1; i >= 0; --i) {
stopJettySolrRunner(i);
if (solrClient != null)
solrClient.close();
List<Callable<JettySolrRunner>> shutdowns = new ArrayList<>(jettys.size());
for (final JettySolrRunner jetty : jettys) {
shutdowns.add(new Callable<JettySolrRunner>() {
@Override
public JettySolrRunner call() throws Exception {
return stopJettySolrRunner(jetty);
}
});
}
jettys.clear();
Collection<Future<JettySolrRunner>> futures = executor.invokeAll(shutdowns);
Exception shutdownError = checkForExceptions("Error shutting down MiniSolrCloudCluster", futures);
if (shutdownError != null) {
throw shutdownError;
}
} finally {
executor.shutdown();
executor.awaitTermination(2, TimeUnit.SECONDS);
try {
zkServer.shutdown();
} finally {
@ -282,4 +349,23 @@ public class MiniSolrCloudCluster {
if (!ctx.startsWith("/")) ctx = "/" + ctx;
return ctx;
}
private Exception checkForExceptions(String message, Collection<Future<JettySolrRunner>> futures) throws InterruptedException {
Exception parsed = new Exception(message);
boolean ok = true;
for (Future<JettySolrRunner> future : futures) {
try {
future.get();
}
catch (ExecutionException e) {
parsed.addSuppressed(e.getCause());
ok = false;
}
catch (InterruptedException e) {
Thread.interrupted();
throw e;
}
}
return ok ? null : parsed;
}
}