diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/AutoAddReplicasIntegrationTest.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/AutoAddReplicasIntegrationTest.java index a5dedc3a8a6..68898fb4a6a 100644 --- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/AutoAddReplicasIntegrationTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/AutoAddReplicasIntegrationTest.java @@ -24,6 +24,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import org.apache.solr.client.solrj.SolrRequest; import org.apache.solr.client.solrj.SolrServerException; @@ -31,7 +32,9 @@ import org.apache.solr.client.solrj.embedded.JettySolrRunner; import org.apache.solr.client.solrj.request.CollectionAdminRequest; import org.apache.solr.client.solrj.request.QueryRequest; import org.apache.solr.client.solrj.request.V2Request; +import org.apache.solr.cloud.MiniSolrCloudCluster; import org.apache.solr.cloud.SolrCloudTestCase; +import org.apache.solr.common.cloud.CollectionStatePredicate; import org.apache.solr.common.cloud.ClusterStateUtil; import org.apache.solr.common.cloud.DocCollection; import org.apache.solr.common.cloud.Replica; @@ -49,16 +52,15 @@ import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +@org.apache.solr.util.LogLevel("org.apache.solr.cloud.autoscaling=DEBUG;org.apache.solr.cloud.autoscaling.NodeLostTrigger=TRACE;org.apache.solr.cloud.Overseer=DEBUG;org.apache.solr.cloud.overseer=DEBUG") public class AutoAddReplicasIntegrationTest extends SolrCloudTestCase { private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); - - private static final String COLLECTION1 = "testSimple1"; - private static final String COLLECTION2 = "testSimple2"; + protected String getConfigSet() { return "cloud-minimal"; } - + @Before public void setupCluster() throws Exception { configureCluster(3) @@ -82,102 +84,267 @@ public class AutoAddReplicasIntegrationTest extends SolrCloudTestCase { } } + /** + * Test that basic autoAddReplicaLogic kicks in when a node is lost + */ @Test public void testSimple() throws Exception { - JettySolrRunner jetty1 = cluster.getJettySolrRunner(0); - JettySolrRunner jetty2 = cluster.getJettySolrRunner(1); - JettySolrRunner jetty3 = cluster.getJettySolrRunner(2); - CollectionAdminRequest.createCollection(COLLECTION1, "conf", 2, 2) - .setCreateNodeSet(jetty1.getNodeName()+","+jetty2.getNodeName()) - .setAutoAddReplicas(true) - .setMaxShardsPerNode(2) - .process(cluster.getSolrClient()); + final String COLLECTION = "test_simple"; + final ZkStateReader zkStateReader = cluster.getSolrClient().getZkStateReader(); + final JettySolrRunner jetty1 = cluster.getJettySolrRunner(1); + final JettySolrRunner jetty2 = cluster.getJettySolrRunner(2); + log.info("Creating {} using jetty1:{}/{} and jetty2:{}/{}", COLLECTION, + jetty1.getNodeName(), jetty1.getLocalPort(), + jetty2.getNodeName(), jetty2.getLocalPort()); + + CollectionAdminRequest.createCollection(COLLECTION, "conf", 2, 2) + .setCreateNodeSet(jetty1.getNodeName()+","+jetty2.getNodeName()) + .setAutoAddReplicas(true) + .setMaxShardsPerNode(2) + .process(cluster.getSolrClient()); - cluster.waitForActiveCollection(COLLECTION1, 2, 4); + cluster.waitForActiveCollection(COLLECTION, 2, 4); - CollectionAdminRequest.createCollection(COLLECTION2, "conf", 2, 2) - .setCreateNodeSet(jetty2.getNodeName()+","+jetty3.getNodeName()) - .setAutoAddReplicas(false) - .setMaxShardsPerNode(2) - .process(cluster.getSolrClient()); - - cluster.waitForActiveCollection(COLLECTION2, 2, 4); - - // the number of cores in jetty1 (5) will be larger than jetty3 (1) - CollectionAdminRequest.createCollection("testSimple3", "conf", 3, 1) - .setCreateNodeSet(jetty1.getNodeName()) - .setAutoAddReplicas(false) - .setMaxShardsPerNode(3) - .process(cluster.getSolrClient()); - - cluster.waitForActiveCollection("testSimple3", 3, 3); - - ZkStateReader zkStateReader = cluster.getSolrClient().getZkStateReader(); - // start the tests - JettySolrRunner lostJetty = random().nextBoolean() ? cluster.getJettySolrRunner(0) : cluster.getJettySolrRunner(1); + JettySolrRunner lostJetty = random().nextBoolean() ? jetty1 : jetty2; String lostNodeName = lostJetty.getNodeName(); - List replacedHdfsReplicas = getReplacedSharedFsReplicas(COLLECTION1, zkStateReader, lostNodeName); + List replacedHdfsReplicas = getReplacedSharedFsReplicas(COLLECTION, zkStateReader, lostNodeName); + log.info("Stopping random node: {} / {}", lostNodeName, lostJetty.getLocalPort()); + lostJetty.stop(); + + cluster.waitForJettyToStop(lostJetty); + waitForNodeLeave(lostNodeName); + + waitForState(COLLECTION + "=(2,4) w/o down replicas", + COLLECTION, clusterShapeNoDownReplicas(2,4), 90, TimeUnit.SECONDS); + + checkSharedFsReplicasMovedCorrectly(replacedHdfsReplicas, zkStateReader, COLLECTION); + + log.info("Re-starting (same) random node: {} / {}", lostNodeName, lostJetty.getLocalPort()); + lostJetty.start(); + + waitForNodeLive(lostJetty); + + assertTrue("Timeout waiting for all live and active", + ClusterStateUtil.waitForAllActiveAndLiveReplicas(zkStateReader, 90000)); + + } + + /** + * Test that basic autoAddReplicaLogic logic is not used if the cluster prop for it is disabled + * (even if sys prop is set after collection is created) + */ + @Test + public void testClusterPropOverridesCollecitonProp() throws Exception { + final String COLLECTION = "test_clusterprop"; + final ZkStateReader zkStateReader = cluster.getSolrClient().getZkStateReader(); + final JettySolrRunner jetty1 = cluster.getJettySolrRunner(1); + final JettySolrRunner jetty2 = cluster.getJettySolrRunner(2); + + log.info("Creating {} using jetty1:{}/{} and jetty2:{}/{}", COLLECTION, + jetty1.getNodeName(), jetty1.getLocalPort(), + jetty2.getNodeName(), jetty2.getLocalPort()); + + CollectionAdminRequest.createCollection(COLLECTION, "conf", 2, 2) + .setCreateNodeSet(jetty1.getNodeName()+","+jetty2.getNodeName()) + .setAutoAddReplicas(true) + .setMaxShardsPerNode(2) + .process(cluster.getSolrClient()); + + cluster.waitForActiveCollection(COLLECTION, 2, 4); + + // check cluster property is considered + disableAutoAddReplicasInCluster(); + + JettySolrRunner lostJetty = random().nextBoolean() ? jetty1 : jetty2; + String lostNodeName = lostJetty.getNodeName(); + List replacedHdfsReplicas = getReplacedSharedFsReplicas(COLLECTION, zkStateReader, lostNodeName); + + log.info("Stopping random node: {} / {}", lostNodeName, lostJetty.getLocalPort()); lostJetty.stop(); cluster.waitForJettyToStop(lostJetty); waitForNodeLeave(lostNodeName); - // ensure that 2 shards have 2 active replicas and only 4 replicas in total - // i.e. old replicas have been deleted. - // todo remove the condition for total replicas == 4 after SOLR-11591 is fixed - waitForState("Waiting for collection " + COLLECTION1, COLLECTION1, (liveNodes, collectionState) -> clusterShape(2, 4).matches(liveNodes, collectionState) - && collectionState.getReplicas().size() == 4, 90, TimeUnit.SECONDS); - checkSharedFsReplicasMovedCorrectly(replacedHdfsReplicas, zkStateReader, COLLECTION1); + waitForState(COLLECTION + "=(2,2)", COLLECTION, + clusterShape(2, 2), 90, TimeUnit.SECONDS); + + + log.info("Re-starting (same) random node: {} / {}", lostNodeName, lostJetty.getLocalPort()); lostJetty.start(); - cluster.waitForAllNodes(30); + waitForNodeLive(lostJetty); - assertTrue("Timeout waiting for all live and active", ClusterStateUtil.waitForAllActiveAndLiveReplicas(cluster.getSolrClient().getZkStateReader(), 90000)); + assertTrue("Timeout waiting for all live and active", + ClusterStateUtil.waitForAllActiveAndLiveReplicas(zkStateReader, 90000)); + + waitForState(COLLECTION + "=(2,4) w/o down replicas", + COLLECTION, clusterShapeNoDownReplicas(2,4), 90, TimeUnit.SECONDS); - // check cluster property is considered - disableAutoAddReplicasInCluster(); - lostNodeName = jetty3.getNodeName(); - jetty3.stop(); - - cluster.waitForJettyToStop(jetty3); - - waitForNodeLeave(lostNodeName); - - waitForState("Waiting for collection " + COLLECTION1, COLLECTION1, clusterShape(2, 2)); - jetty3.start(); - waitForState("Waiting for collection " + COLLECTION1, COLLECTION1, clusterShape(2, 4)); - waitForState("Waiting for collection " + COLLECTION2, COLLECTION2, clusterShape(2, 4)); - enableAutoAddReplicasInCluster(); + } + /** + * Test that we can modify a collection after creation to add autoAddReplicas. + */ + @Test + public void testAddCollectionPropAfterCreation() throws Exception { + final String COLLECTION = "test_addprop"; + final ZkStateReader zkStateReader = cluster.getSolrClient().getZkStateReader(); + final JettySolrRunner jetty1 = cluster.getJettySolrRunner(1); + final JettySolrRunner jetty2 = cluster.getJettySolrRunner(2); - // test for multiple collections + log.info("Creating {} using jetty1:{}/{} and jetty2:{}/{}", COLLECTION, + jetty1.getNodeName(), jetty1.getLocalPort(), + jetty2.getNodeName(), jetty2.getLocalPort()); + + CollectionAdminRequest.createCollection(COLLECTION, "conf", 2, 2) + .setCreateNodeSet(jetty1.getNodeName()+","+jetty2.getNodeName()) + .setAutoAddReplicas(false) // NOTE: false + .setMaxShardsPerNode(2) + .process(cluster.getSolrClient()); + + cluster.waitForActiveCollection(COLLECTION, 2, 4); + + log.info("Modifying {} to use autoAddReplicas", COLLECTION); new CollectionAdminRequest.AsyncCollectionAdminRequest(CollectionParams.CollectionAction.MODIFYCOLLECTION) { @Override public SolrParams getParams() { ModifiableSolrParams params = (ModifiableSolrParams) super.getParams(); - params.set("collection", COLLECTION2); + params.set("collection", COLLECTION); params.set("autoAddReplicas", true); return params; } }.process(cluster.getSolrClient()); - lostNodeName = jetty2.getNodeName(); - replacedHdfsReplicas = getReplacedSharedFsReplicas(COLLECTION2, zkStateReader, lostNodeName); + JettySolrRunner lostJetty = random().nextBoolean() ? jetty1 : jetty2; + String lostNodeName = lostJetty.getNodeName(); + List replacedHdfsReplicas = getReplacedSharedFsReplicas(COLLECTION, zkStateReader, lostNodeName); + + log.info("Stopping random node: {} / {}", lostNodeName, lostJetty.getLocalPort()); + lostJetty.stop(); - jetty2.stop(); - - cluster.waitForJettyToStop(jetty2); + cluster.waitForJettyToStop(lostJetty); waitForNodeLeave(lostNodeName); - waitForState("Waiting for collection " + COLLECTION1, COLLECTION1, clusterShape(2, 4), 45, TimeUnit.SECONDS); - waitForState("Waiting for collection " + COLLECTION2, COLLECTION2, clusterShape(2, 4), 45, TimeUnit.SECONDS); - checkSharedFsReplicasMovedCorrectly(replacedHdfsReplicas, zkStateReader, COLLECTION2); - // overseer failover test.. + waitForState(COLLECTION + "=(2,4) w/o down replicas", + COLLECTION, clusterShapeNoDownReplicas(2,4), 90, TimeUnit.SECONDS); + checkSharedFsReplicasMovedCorrectly(replacedHdfsReplicas, zkStateReader, COLLECTION); + + log.info("Re-starting (same) random node: {} / {}", lostNodeName, lostJetty.getLocalPort()); + lostJetty.start(); + + waitForNodeLive(lostJetty); + + assertTrue("Timeout waiting for all live and active", + ClusterStateUtil.waitForAllActiveAndLiveReplicas(zkStateReader, 90000)); } + /** + * Test a specific sequence of problematic events: + *
    + *
  • create a collection with autoAddReplicas=false
  • + *
  • stop a nodeX in use by the collection
  • + *
  • re-start nodeX
  • + *
  • set autoAddReplicas=true
  • + *
  • re-stop nodeX
  • + *
+ */ + @Test + @AwaitsFix(bugUrl="https://issues.apache.org/jira/browse/SOLR-13811") + public void testRapidStopStartStopWithPropChange() throws Exception { + + // This is the collection we'll be focused on in our testing... + final String COLLECTION = "test_stoptwice"; + // This is a collection we'll use as a "marker" to ensure we "wait" for the + // autoAddReplicas logic (via NodeLostTrigger) to kick in at least once before proceeding... + final String ALT_COLLECTION = "test_dummy"; + + final ZkStateReader zkStateReader = cluster.getSolrClient().getZkStateReader(); + final JettySolrRunner jetty1 = cluster.getJettySolrRunner(1); + final JettySolrRunner jetty2 = cluster.getJettySolrRunner(2); + + log.info("Creating {} using jetty1:{}/{} and jetty2:{}/{}", COLLECTION, + jetty1.getNodeName(), jetty1.getLocalPort(), + jetty2.getNodeName(), jetty2.getLocalPort()); + + CollectionAdminRequest.createCollection(COLLECTION, "conf", 2, 2) + .setCreateNodeSet(jetty1.getNodeName()+","+jetty2.getNodeName()) + .setAutoAddReplicas(false) // NOTE: false + .setMaxShardsPerNode(2) + .process(cluster.getSolrClient()); + + log.info("Creating {} using jetty1:{}/{} and jetty2:{}/{}", ALT_COLLECTION, + jetty1.getNodeName(), jetty1.getLocalPort(), + jetty2.getNodeName(), jetty2.getLocalPort()); + + CollectionAdminRequest.createCollection(ALT_COLLECTION, "conf", 2, 2) + .setCreateNodeSet(jetty1.getNodeName()+","+jetty2.getNodeName()) + .setAutoAddReplicas(true) // NOTE: true + .setMaxShardsPerNode(2) + .process(cluster.getSolrClient()); + + cluster.waitForActiveCollection(COLLECTION, 2, 4); + cluster.waitForActiveCollection(ALT_COLLECTION, 2, 4); + + JettySolrRunner lostJetty = random().nextBoolean() ? jetty1 : jetty2; + String lostNodeName = lostJetty.getNodeName(); + List replacedHdfsReplicas = getReplacedSharedFsReplicas(COLLECTION, zkStateReader, lostNodeName); + + log.info("Stopping random node: {} / {}", lostNodeName, lostJetty.getLocalPort()); + lostJetty.stop(); + + cluster.waitForJettyToStop(lostJetty); + waitForNodeLeave(lostNodeName); + + // ensure that our marker collection indicates that the autoAddReplicas logic + // has detected the down node and done some processing + waitForState(ALT_COLLECTION + "=(2,4) w/o down replicas", + ALT_COLLECTION, clusterShapeNoDownReplicas(2,4), 90, TimeUnit.SECONDS); + + waitForState(COLLECTION + "=(2,2)", COLLECTION, clusterShape(2, 2)); + + log.info("Re-starting (same) random node: {} / {}", lostNodeName, lostJetty.getLocalPort()); + lostJetty.start(); + // save time, don't bother waiting for lostJetty to start until after updating collection prop... + + log.info("Modifying {} to use autoAddReplicas", COLLECTION); + new CollectionAdminRequest.AsyncCollectionAdminRequest(CollectionParams.CollectionAction.MODIFYCOLLECTION) { + @Override + public SolrParams getParams() { + ModifiableSolrParams params = (ModifiableSolrParams) super.getParams(); + params.set("collection", COLLECTION); + params.set("autoAddReplicas", true); + return params; + } + }.process(cluster.getSolrClient()); + + // make sure lostJetty is fully up before stopping again... + waitForNodeLive(lostJetty); + + log.info("Re-Stopping (same) random node: {} / {}", lostNodeName, lostJetty.getLocalPort()); + lostJetty.stop(); + + cluster.waitForJettyToStop(lostJetty); + waitForNodeLeave(lostNodeName); + + // TODO: this is the problematic situation... + // wether or not NodeLostTrigger noticed that lostJetty was re-started and shutdown *again* + // and that the new auoAddReplicas=true since the last time lostJetty was shutdown is respected + waitForState(COLLECTION + "=(2,4) w/o down replicas", + COLLECTION, clusterShapeNoDownReplicas(2,4), 90, TimeUnit.SECONDS); + checkSharedFsReplicasMovedCorrectly(replacedHdfsReplicas, zkStateReader, COLLECTION); + + log.info("Re-Re-starting (same) random node: {} / {}", lostNodeName, lostJetty.getLocalPort()); + lostJetty.start(); + + waitForNodeLive(lostJetty); + + assertTrue("Timeout waiting for all live and active", + ClusterStateUtil.waitForAllActiveAndLiveReplicas(zkStateReader, 90000)); + } + private void disableAutoAddReplicasInCluster() throws SolrServerException, IOException { Map m = makeMap( "action", CollectionParams.CollectionAction.CLUSTERPROP.toLower(), @@ -225,13 +392,44 @@ public class AutoAddReplicasIntegrationTest extends SolrCloudTestCase { return replacedHdfsReplicas; } - private void waitForNodeLeave(String lostNodeName) throws InterruptedException { + /** + * {@link MiniSolrCloudCluster#waitForNode} Doesn't check isRunning first, and we don't want to + * use {@link MiniSolrCloudCluster#waitForAllNodes} because we don't want to waste cycles checking + * nodes we aren't messing with + */ + private void waitForNodeLive(final JettySolrRunner jetty) + throws InterruptedException, TimeoutException, IOException { + log.info("waitForNodeLive: {}/{}", jetty.getNodeName(), jetty.getLocalPort()); + + TimeOut timeout = new TimeOut(30, TimeUnit.SECONDS, TimeSource.NANO_TIME); + while(!timeout.hasTimedOut()) { + if (jetty.isRunning()) { + break; + } + try { + Thread.sleep(100); + } catch (InterruptedException e) { + // ignore + } + } + if (timeout.hasTimedOut()) { + throw new TimeoutException("Waiting for Jetty to stop timed out"); + } + cluster.waitForNode(jetty, 30); + } + + private void waitForNodeLeave(String lostNodeName) throws InterruptedException, TimeoutException { log.info("waitForNodeLeave: {}", lostNodeName); ZkStateReader reader = cluster.getSolrClient().getZkStateReader(); - TimeOut timeOut = new TimeOut(20, TimeUnit.SECONDS, TimeSource.NANO_TIME); - while (reader.getClusterState().getLiveNodes().contains(lostNodeName)) { - Thread.sleep(100); - if (timeOut.hasTimedOut()) fail("Wait for " + lostNodeName + " to leave failed!"); - } + reader.waitForLiveNodes(30, TimeUnit.SECONDS, (o, n) -> !n.contains(lostNodeName)); } + + + private static CollectionStatePredicate clusterShapeNoDownReplicas(final int expectedShards, + final int expectedReplicas) { + return (liveNodes, collectionState) + -> (clusterShape(expectedShards, expectedReplicas).matches(liveNodes, collectionState) + && collectionState.getReplicas().size() == expectedReplicas); + } + }