mirror of https://github.com/apache/lucene.git
SOLR-13811: Refactor AutoAddReplicasIntegrationTest to isolate problematic situation into an AwaitsFix test method
This commit is contained in:
parent
302cd09b4c
commit
a57ec148e5
|
@ -24,6 +24,7 @@ import java.util.ArrayList;
|
|||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
import org.apache.solr.client.solrj.SolrRequest;
|
||||
import org.apache.solr.client.solrj.SolrServerException;
|
||||
|
@ -31,7 +32,9 @@ import org.apache.solr.client.solrj.embedded.JettySolrRunner;
|
|||
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
|
||||
import org.apache.solr.client.solrj.request.QueryRequest;
|
||||
import org.apache.solr.client.solrj.request.V2Request;
|
||||
import org.apache.solr.cloud.MiniSolrCloudCluster;
|
||||
import org.apache.solr.cloud.SolrCloudTestCase;
|
||||
import org.apache.solr.common.cloud.CollectionStatePredicate;
|
||||
import org.apache.solr.common.cloud.ClusterStateUtil;
|
||||
import org.apache.solr.common.cloud.DocCollection;
|
||||
import org.apache.solr.common.cloud.Replica;
|
||||
|
@ -49,11 +52,10 @@ import org.junit.Test;
|
|||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
@org.apache.solr.util.LogLevel("org.apache.solr.cloud.autoscaling=DEBUG;org.apache.solr.cloud.autoscaling.NodeLostTrigger=TRACE;org.apache.solr.cloud.Overseer=DEBUG;org.apache.solr.cloud.overseer=DEBUG")
|
||||
public class AutoAddReplicasIntegrationTest extends SolrCloudTestCase {
|
||||
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
|
||||
|
||||
private static final String COLLECTION1 = "testSimple1";
|
||||
private static final String COLLECTION2 = "testSimple2";
|
||||
|
||||
protected String getConfigSet() {
|
||||
return "cloud-minimal";
|
||||
|
@ -82,100 +84,265 @@ public class AutoAddReplicasIntegrationTest extends SolrCloudTestCase {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Test that basic autoAddReplicaLogic kicks in when a node is lost
|
||||
*/
|
||||
@Test
|
||||
public void testSimple() throws Exception {
|
||||
JettySolrRunner jetty1 = cluster.getJettySolrRunner(0);
|
||||
JettySolrRunner jetty2 = cluster.getJettySolrRunner(1);
|
||||
JettySolrRunner jetty3 = cluster.getJettySolrRunner(2);
|
||||
CollectionAdminRequest.createCollection(COLLECTION1, "conf", 2, 2)
|
||||
.setCreateNodeSet(jetty1.getNodeName()+","+jetty2.getNodeName())
|
||||
.setAutoAddReplicas(true)
|
||||
.setMaxShardsPerNode(2)
|
||||
.process(cluster.getSolrClient());
|
||||
final String COLLECTION = "test_simple";
|
||||
final ZkStateReader zkStateReader = cluster.getSolrClient().getZkStateReader();
|
||||
final JettySolrRunner jetty1 = cluster.getJettySolrRunner(1);
|
||||
final JettySolrRunner jetty2 = cluster.getJettySolrRunner(2);
|
||||
log.info("Creating {} using jetty1:{}/{} and jetty2:{}/{}", COLLECTION,
|
||||
jetty1.getNodeName(), jetty1.getLocalPort(),
|
||||
jetty2.getNodeName(), jetty2.getLocalPort());
|
||||
|
||||
cluster.waitForActiveCollection(COLLECTION1, 2, 4);
|
||||
CollectionAdminRequest.createCollection(COLLECTION, "conf", 2, 2)
|
||||
.setCreateNodeSet(jetty1.getNodeName()+","+jetty2.getNodeName())
|
||||
.setAutoAddReplicas(true)
|
||||
.setMaxShardsPerNode(2)
|
||||
.process(cluster.getSolrClient());
|
||||
|
||||
CollectionAdminRequest.createCollection(COLLECTION2, "conf", 2, 2)
|
||||
.setCreateNodeSet(jetty2.getNodeName()+","+jetty3.getNodeName())
|
||||
.setAutoAddReplicas(false)
|
||||
.setMaxShardsPerNode(2)
|
||||
.process(cluster.getSolrClient());
|
||||
|
||||
cluster.waitForActiveCollection(COLLECTION2, 2, 4);
|
||||
|
||||
// the number of cores in jetty1 (5) will be larger than jetty3 (1)
|
||||
CollectionAdminRequest.createCollection("testSimple3", "conf", 3, 1)
|
||||
.setCreateNodeSet(jetty1.getNodeName())
|
||||
.setAutoAddReplicas(false)
|
||||
.setMaxShardsPerNode(3)
|
||||
.process(cluster.getSolrClient());
|
||||
|
||||
cluster.waitForActiveCollection("testSimple3", 3, 3);
|
||||
|
||||
ZkStateReader zkStateReader = cluster.getSolrClient().getZkStateReader();
|
||||
cluster.waitForActiveCollection(COLLECTION, 2, 4);
|
||||
|
||||
// start the tests
|
||||
JettySolrRunner lostJetty = random().nextBoolean() ? cluster.getJettySolrRunner(0) : cluster.getJettySolrRunner(1);
|
||||
JettySolrRunner lostJetty = random().nextBoolean() ? jetty1 : jetty2;
|
||||
String lostNodeName = lostJetty.getNodeName();
|
||||
List<Replica> replacedHdfsReplicas = getReplacedSharedFsReplicas(COLLECTION1, zkStateReader, lostNodeName);
|
||||
List<Replica> replacedHdfsReplicas = getReplacedSharedFsReplicas(COLLECTION, zkStateReader, lostNodeName);
|
||||
log.info("Stopping random node: {} / {}", lostNodeName, lostJetty.getLocalPort());
|
||||
lostJetty.stop();
|
||||
|
||||
cluster.waitForJettyToStop(lostJetty);
|
||||
waitForNodeLeave(lostNodeName);
|
||||
|
||||
waitForState(COLLECTION + "=(2,4) w/o down replicas",
|
||||
COLLECTION, clusterShapeNoDownReplicas(2,4), 90, TimeUnit.SECONDS);
|
||||
|
||||
checkSharedFsReplicasMovedCorrectly(replacedHdfsReplicas, zkStateReader, COLLECTION);
|
||||
|
||||
log.info("Re-starting (same) random node: {} / {}", lostNodeName, lostJetty.getLocalPort());
|
||||
lostJetty.start();
|
||||
|
||||
waitForNodeLive(lostJetty);
|
||||
|
||||
assertTrue("Timeout waiting for all live and active",
|
||||
ClusterStateUtil.waitForAllActiveAndLiveReplicas(zkStateReader, 90000));
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Test that basic autoAddReplicaLogic logic is <b>not</b> used if the cluster prop for it is disabled
|
||||
* (even if sys prop is set after collection is created)
|
||||
*/
|
||||
@Test
|
||||
public void testClusterPropOverridesCollecitonProp() throws Exception {
|
||||
final String COLLECTION = "test_clusterprop";
|
||||
final ZkStateReader zkStateReader = cluster.getSolrClient().getZkStateReader();
|
||||
final JettySolrRunner jetty1 = cluster.getJettySolrRunner(1);
|
||||
final JettySolrRunner jetty2 = cluster.getJettySolrRunner(2);
|
||||
|
||||
log.info("Creating {} using jetty1:{}/{} and jetty2:{}/{}", COLLECTION,
|
||||
jetty1.getNodeName(), jetty1.getLocalPort(),
|
||||
jetty2.getNodeName(), jetty2.getLocalPort());
|
||||
|
||||
CollectionAdminRequest.createCollection(COLLECTION, "conf", 2, 2)
|
||||
.setCreateNodeSet(jetty1.getNodeName()+","+jetty2.getNodeName())
|
||||
.setAutoAddReplicas(true)
|
||||
.setMaxShardsPerNode(2)
|
||||
.process(cluster.getSolrClient());
|
||||
|
||||
cluster.waitForActiveCollection(COLLECTION, 2, 4);
|
||||
|
||||
// check cluster property is considered
|
||||
disableAutoAddReplicasInCluster();
|
||||
|
||||
JettySolrRunner lostJetty = random().nextBoolean() ? jetty1 : jetty2;
|
||||
String lostNodeName = lostJetty.getNodeName();
|
||||
List<Replica> replacedHdfsReplicas = getReplacedSharedFsReplicas(COLLECTION, zkStateReader, lostNodeName);
|
||||
|
||||
log.info("Stopping random node: {} / {}", lostNodeName, lostJetty.getLocalPort());
|
||||
lostJetty.stop();
|
||||
|
||||
cluster.waitForJettyToStop(lostJetty);
|
||||
|
||||
waitForNodeLeave(lostNodeName);
|
||||
|
||||
// ensure that 2 shards have 2 active replicas and only 4 replicas in total
|
||||
// i.e. old replicas have been deleted.
|
||||
// todo remove the condition for total replicas == 4 after SOLR-11591 is fixed
|
||||
waitForState("Waiting for collection " + COLLECTION1, COLLECTION1, (liveNodes, collectionState) -> clusterShape(2, 4).matches(liveNodes, collectionState)
|
||||
&& collectionState.getReplicas().size() == 4, 90, TimeUnit.SECONDS);
|
||||
checkSharedFsReplicasMovedCorrectly(replacedHdfsReplicas, zkStateReader, COLLECTION1);
|
||||
waitForState(COLLECTION + "=(2,2)", COLLECTION,
|
||||
clusterShape(2, 2), 90, TimeUnit.SECONDS);
|
||||
|
||||
|
||||
log.info("Re-starting (same) random node: {} / {}", lostNodeName, lostJetty.getLocalPort());
|
||||
lostJetty.start();
|
||||
|
||||
cluster.waitForAllNodes(30);
|
||||
waitForNodeLive(lostJetty);
|
||||
|
||||
assertTrue("Timeout waiting for all live and active", ClusterStateUtil.waitForAllActiveAndLiveReplicas(cluster.getSolrClient().getZkStateReader(), 90000));
|
||||
assertTrue("Timeout waiting for all live and active",
|
||||
ClusterStateUtil.waitForAllActiveAndLiveReplicas(zkStateReader, 90000));
|
||||
|
||||
// check cluster property is considered
|
||||
disableAutoAddReplicasInCluster();
|
||||
lostNodeName = jetty3.getNodeName();
|
||||
jetty3.stop();
|
||||
waitForState(COLLECTION + "=(2,4) w/o down replicas",
|
||||
COLLECTION, clusterShapeNoDownReplicas(2,4), 90, TimeUnit.SECONDS);
|
||||
|
||||
cluster.waitForJettyToStop(jetty3);
|
||||
}
|
||||
|
||||
waitForNodeLeave(lostNodeName);
|
||||
/**
|
||||
* Test that we can modify a collection after creation to add autoAddReplicas.
|
||||
*/
|
||||
@Test
|
||||
public void testAddCollectionPropAfterCreation() throws Exception {
|
||||
final String COLLECTION = "test_addprop";
|
||||
final ZkStateReader zkStateReader = cluster.getSolrClient().getZkStateReader();
|
||||
final JettySolrRunner jetty1 = cluster.getJettySolrRunner(1);
|
||||
final JettySolrRunner jetty2 = cluster.getJettySolrRunner(2);
|
||||
|
||||
waitForState("Waiting for collection " + COLLECTION1, COLLECTION1, clusterShape(2, 2));
|
||||
jetty3.start();
|
||||
waitForState("Waiting for collection " + COLLECTION1, COLLECTION1, clusterShape(2, 4));
|
||||
waitForState("Waiting for collection " + COLLECTION2, COLLECTION2, clusterShape(2, 4));
|
||||
enableAutoAddReplicasInCluster();
|
||||
log.info("Creating {} using jetty1:{}/{} and jetty2:{}/{}", COLLECTION,
|
||||
jetty1.getNodeName(), jetty1.getLocalPort(),
|
||||
jetty2.getNodeName(), jetty2.getLocalPort());
|
||||
|
||||
CollectionAdminRequest.createCollection(COLLECTION, "conf", 2, 2)
|
||||
.setCreateNodeSet(jetty1.getNodeName()+","+jetty2.getNodeName())
|
||||
.setAutoAddReplicas(false) // NOTE: false
|
||||
.setMaxShardsPerNode(2)
|
||||
.process(cluster.getSolrClient());
|
||||
|
||||
// test for multiple collections
|
||||
cluster.waitForActiveCollection(COLLECTION, 2, 4);
|
||||
|
||||
log.info("Modifying {} to use autoAddReplicas", COLLECTION);
|
||||
new CollectionAdminRequest.AsyncCollectionAdminRequest(CollectionParams.CollectionAction.MODIFYCOLLECTION) {
|
||||
@Override
|
||||
public SolrParams getParams() {
|
||||
ModifiableSolrParams params = (ModifiableSolrParams) super.getParams();
|
||||
params.set("collection", COLLECTION2);
|
||||
params.set("collection", COLLECTION);
|
||||
params.set("autoAddReplicas", true);
|
||||
return params;
|
||||
}
|
||||
}.process(cluster.getSolrClient());
|
||||
|
||||
lostNodeName = jetty2.getNodeName();
|
||||
replacedHdfsReplicas = getReplacedSharedFsReplicas(COLLECTION2, zkStateReader, lostNodeName);
|
||||
JettySolrRunner lostJetty = random().nextBoolean() ? jetty1 : jetty2;
|
||||
String lostNodeName = lostJetty.getNodeName();
|
||||
List<Replica> replacedHdfsReplicas = getReplacedSharedFsReplicas(COLLECTION, zkStateReader, lostNodeName);
|
||||
|
||||
jetty2.stop();
|
||||
log.info("Stopping random node: {} / {}", lostNodeName, lostJetty.getLocalPort());
|
||||
lostJetty.stop();
|
||||
|
||||
cluster.waitForJettyToStop(jetty2);
|
||||
cluster.waitForJettyToStop(lostJetty);
|
||||
|
||||
waitForNodeLeave(lostNodeName);
|
||||
waitForState("Waiting for collection " + COLLECTION1, COLLECTION1, clusterShape(2, 4), 45, TimeUnit.SECONDS);
|
||||
waitForState("Waiting for collection " + COLLECTION2, COLLECTION2, clusterShape(2, 4), 45, TimeUnit.SECONDS);
|
||||
checkSharedFsReplicasMovedCorrectly(replacedHdfsReplicas, zkStateReader, COLLECTION2);
|
||||
|
||||
// overseer failover test..
|
||||
waitForState(COLLECTION + "=(2,4) w/o down replicas",
|
||||
COLLECTION, clusterShapeNoDownReplicas(2,4), 90, TimeUnit.SECONDS);
|
||||
checkSharedFsReplicasMovedCorrectly(replacedHdfsReplicas, zkStateReader, COLLECTION);
|
||||
|
||||
log.info("Re-starting (same) random node: {} / {}", lostNodeName, lostJetty.getLocalPort());
|
||||
lostJetty.start();
|
||||
|
||||
waitForNodeLive(lostJetty);
|
||||
|
||||
assertTrue("Timeout waiting for all live and active",
|
||||
ClusterStateUtil.waitForAllActiveAndLiveReplicas(zkStateReader, 90000));
|
||||
}
|
||||
|
||||
/**
|
||||
* Test a specific sequence of problematic events:
|
||||
* <ul>
|
||||
* <li>create a collection with autoAddReplicas=<b>false</b></li>
|
||||
* <li>stop a nodeX in use by the collection</li>
|
||||
* <li>re-start nodeX</li>
|
||||
* <li>set autoAddReplicas=<b>true</b></li>
|
||||
* <li>re-stop nodeX</li>
|
||||
* </ul>
|
||||
*/
|
||||
@Test
|
||||
@AwaitsFix(bugUrl="https://issues.apache.org/jira/browse/SOLR-13811")
|
||||
public void testRapidStopStartStopWithPropChange() throws Exception {
|
||||
|
||||
// This is the collection we'll be focused on in our testing...
|
||||
final String COLLECTION = "test_stoptwice";
|
||||
// This is a collection we'll use as a "marker" to ensure we "wait" for the
|
||||
// autoAddReplicas logic (via NodeLostTrigger) to kick in at least once before proceeding...
|
||||
final String ALT_COLLECTION = "test_dummy";
|
||||
|
||||
final ZkStateReader zkStateReader = cluster.getSolrClient().getZkStateReader();
|
||||
final JettySolrRunner jetty1 = cluster.getJettySolrRunner(1);
|
||||
final JettySolrRunner jetty2 = cluster.getJettySolrRunner(2);
|
||||
|
||||
log.info("Creating {} using jetty1:{}/{} and jetty2:{}/{}", COLLECTION,
|
||||
jetty1.getNodeName(), jetty1.getLocalPort(),
|
||||
jetty2.getNodeName(), jetty2.getLocalPort());
|
||||
|
||||
CollectionAdminRequest.createCollection(COLLECTION, "conf", 2, 2)
|
||||
.setCreateNodeSet(jetty1.getNodeName()+","+jetty2.getNodeName())
|
||||
.setAutoAddReplicas(false) // NOTE: false
|
||||
.setMaxShardsPerNode(2)
|
||||
.process(cluster.getSolrClient());
|
||||
|
||||
log.info("Creating {} using jetty1:{}/{} and jetty2:{}/{}", ALT_COLLECTION,
|
||||
jetty1.getNodeName(), jetty1.getLocalPort(),
|
||||
jetty2.getNodeName(), jetty2.getLocalPort());
|
||||
|
||||
CollectionAdminRequest.createCollection(ALT_COLLECTION, "conf", 2, 2)
|
||||
.setCreateNodeSet(jetty1.getNodeName()+","+jetty2.getNodeName())
|
||||
.setAutoAddReplicas(true) // NOTE: true
|
||||
.setMaxShardsPerNode(2)
|
||||
.process(cluster.getSolrClient());
|
||||
|
||||
cluster.waitForActiveCollection(COLLECTION, 2, 4);
|
||||
cluster.waitForActiveCollection(ALT_COLLECTION, 2, 4);
|
||||
|
||||
JettySolrRunner lostJetty = random().nextBoolean() ? jetty1 : jetty2;
|
||||
String lostNodeName = lostJetty.getNodeName();
|
||||
List<Replica> replacedHdfsReplicas = getReplacedSharedFsReplicas(COLLECTION, zkStateReader, lostNodeName);
|
||||
|
||||
log.info("Stopping random node: {} / {}", lostNodeName, lostJetty.getLocalPort());
|
||||
lostJetty.stop();
|
||||
|
||||
cluster.waitForJettyToStop(lostJetty);
|
||||
waitForNodeLeave(lostNodeName);
|
||||
|
||||
// ensure that our marker collection indicates that the autoAddReplicas logic
|
||||
// has detected the down node and done some processing
|
||||
waitForState(ALT_COLLECTION + "=(2,4) w/o down replicas",
|
||||
ALT_COLLECTION, clusterShapeNoDownReplicas(2,4), 90, TimeUnit.SECONDS);
|
||||
|
||||
waitForState(COLLECTION + "=(2,2)", COLLECTION, clusterShape(2, 2));
|
||||
|
||||
log.info("Re-starting (same) random node: {} / {}", lostNodeName, lostJetty.getLocalPort());
|
||||
lostJetty.start();
|
||||
// save time, don't bother waiting for lostJetty to start until after updating collection prop...
|
||||
|
||||
log.info("Modifying {} to use autoAddReplicas", COLLECTION);
|
||||
new CollectionAdminRequest.AsyncCollectionAdminRequest(CollectionParams.CollectionAction.MODIFYCOLLECTION) {
|
||||
@Override
|
||||
public SolrParams getParams() {
|
||||
ModifiableSolrParams params = (ModifiableSolrParams) super.getParams();
|
||||
params.set("collection", COLLECTION);
|
||||
params.set("autoAddReplicas", true);
|
||||
return params;
|
||||
}
|
||||
}.process(cluster.getSolrClient());
|
||||
|
||||
// make sure lostJetty is fully up before stopping again...
|
||||
waitForNodeLive(lostJetty);
|
||||
|
||||
log.info("Re-Stopping (same) random node: {} / {}", lostNodeName, lostJetty.getLocalPort());
|
||||
lostJetty.stop();
|
||||
|
||||
cluster.waitForJettyToStop(lostJetty);
|
||||
waitForNodeLeave(lostNodeName);
|
||||
|
||||
// TODO: this is the problematic situation...
|
||||
// wether or not NodeLostTrigger noticed that lostJetty was re-started and shutdown *again*
|
||||
// and that the new auoAddReplicas=true since the last time lostJetty was shutdown is respected
|
||||
waitForState(COLLECTION + "=(2,4) w/o down replicas",
|
||||
COLLECTION, clusterShapeNoDownReplicas(2,4), 90, TimeUnit.SECONDS);
|
||||
checkSharedFsReplicasMovedCorrectly(replacedHdfsReplicas, zkStateReader, COLLECTION);
|
||||
|
||||
log.info("Re-Re-starting (same) random node: {} / {}", lostNodeName, lostJetty.getLocalPort());
|
||||
lostJetty.start();
|
||||
|
||||
waitForNodeLive(lostJetty);
|
||||
|
||||
assertTrue("Timeout waiting for all live and active",
|
||||
ClusterStateUtil.waitForAllActiveAndLiveReplicas(zkStateReader, 90000));
|
||||
}
|
||||
|
||||
private void disableAutoAddReplicasInCluster() throws SolrServerException, IOException {
|
||||
|
@ -225,13 +392,44 @@ public class AutoAddReplicasIntegrationTest extends SolrCloudTestCase {
|
|||
return replacedHdfsReplicas;
|
||||
}
|
||||
|
||||
private void waitForNodeLeave(String lostNodeName) throws InterruptedException {
|
||||
/**
|
||||
* {@link MiniSolrCloudCluster#waitForNode} Doesn't check isRunning first, and we don't want to
|
||||
* use {@link MiniSolrCloudCluster#waitForAllNodes} because we don't want to waste cycles checking
|
||||
* nodes we aren't messing with
|
||||
*/
|
||||
private void waitForNodeLive(final JettySolrRunner jetty)
|
||||
throws InterruptedException, TimeoutException, IOException {
|
||||
log.info("waitForNodeLive: {}/{}", jetty.getNodeName(), jetty.getLocalPort());
|
||||
|
||||
TimeOut timeout = new TimeOut(30, TimeUnit.SECONDS, TimeSource.NANO_TIME);
|
||||
while(!timeout.hasTimedOut()) {
|
||||
if (jetty.isRunning()) {
|
||||
break;
|
||||
}
|
||||
try {
|
||||
Thread.sleep(100);
|
||||
} catch (InterruptedException e) {
|
||||
// ignore
|
||||
}
|
||||
}
|
||||
if (timeout.hasTimedOut()) {
|
||||
throw new TimeoutException("Waiting for Jetty to stop timed out");
|
||||
}
|
||||
cluster.waitForNode(jetty, 30);
|
||||
}
|
||||
|
||||
private void waitForNodeLeave(String lostNodeName) throws InterruptedException, TimeoutException {
|
||||
log.info("waitForNodeLeave: {}", lostNodeName);
|
||||
ZkStateReader reader = cluster.getSolrClient().getZkStateReader();
|
||||
TimeOut timeOut = new TimeOut(20, TimeUnit.SECONDS, TimeSource.NANO_TIME);
|
||||
while (reader.getClusterState().getLiveNodes().contains(lostNodeName)) {
|
||||
Thread.sleep(100);
|
||||
if (timeOut.hasTimedOut()) fail("Wait for " + lostNodeName + " to leave failed!");
|
||||
}
|
||||
reader.waitForLiveNodes(30, TimeUnit.SECONDS, (o, n) -> !n.contains(lostNodeName));
|
||||
}
|
||||
|
||||
|
||||
private static CollectionStatePredicate clusterShapeNoDownReplicas(final int expectedShards,
|
||||
final int expectedReplicas) {
|
||||
return (liveNodes, collectionState)
|
||||
-> (clusterShape(expectedShards, expectedReplicas).matches(liveNodes, collectionState)
|
||||
&& collectionState.getReplicas().size() == expectedReplicas);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue