diff --git a/solr/core/src/test/org/apache/solr/cloud/HttpPartitionTest.java b/solr/core/src/test/org/apache/solr/cloud/HttpPartitionTest.java index 48533459b17..e6e5b7987f9 100644 --- a/solr/core/src/test/org/apache/solr/cloud/HttpPartitionTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/HttpPartitionTest.java @@ -22,26 +22,39 @@ import java.net.ServerSocket; import java.net.URI; import java.net.URL; import java.util.ArrayList; +import java.util.Collection; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.TimeUnit; import org.apache.lucene.util.LuceneTestCase.AwaitsFix; import org.apache.lucene.util.LuceneTestCase.Slow; import org.apache.solr.JSONTestUtil; import org.apache.solr.SolrTestCaseJ4.SuppressSSL; +import org.apache.solr.client.solrj.SolrQuery; import org.apache.solr.client.solrj.embedded.JettySolrRunner; import org.apache.solr.client.solrj.impl.HttpSolrServer; import org.apache.solr.client.solrj.request.CollectionAdminRequest; import org.apache.solr.client.solrj.request.QueryRequest; +import org.apache.solr.client.solrj.request.UpdateRequest; +import org.apache.solr.client.solrj.response.QueryResponse; +import org.apache.solr.client.solrj.response.UpdateResponse; +import org.apache.solr.common.SolrDocumentList; import org.apache.solr.common.SolrInputDocument; import org.apache.solr.common.cloud.ClusterState; import org.apache.solr.common.cloud.Replica; import org.apache.solr.common.cloud.Slice; import org.apache.solr.common.cloud.ZkCoreNodeProps; import org.apache.solr.common.cloud.ZkStateReader; +import org.apache.solr.common.params.ModifiableSolrParams; import org.apache.solr.common.util.NamedList; +import org.apache.solr.common.util.StrUtils; +import org.apache.solr.core.CoreContainer; +import org.apache.solr.core.SolrCore; +import org.apache.solr.servlet.SolrDispatchFilter; import org.junit.After; import org.junit.Before; import org.slf4j.Logger; @@ -53,7 +66,7 @@ import org.slf4j.LoggerFactory; */ @Slow @SuppressSSL(bugUrl = "https://issues.apache.org/jira/browse/SOLR-5776") -@AwaitsFix(bugUrl = "https://issues.apache.org/jira/browse/SOLR-6241") +//@AwaitsFix(bugUrl = "https://issues.apache.org/jira/browse/SOLR-6241") public class HttpPartitionTest extends AbstractFullDistribZkTestBase { private static final transient Logger log = @@ -140,7 +153,7 @@ public class HttpPartitionTest extends AbstractFullDistribZkTestBase { @Override public void doTest() throws Exception { waitForThingsToLevelOut(30000); - + // test a 1x2 collection testRf2(); @@ -149,7 +162,7 @@ public class HttpPartitionTest extends AbstractFullDistribZkTestBase { testRf3(); // kill a leader and make sure recovery occurs as expected - testRf3WithLeaderFailover(); + testRf3WithLeaderFailover(); } protected void testRf2() throws Exception { @@ -349,7 +362,10 @@ public class HttpPartitionTest extends AbstractFullDistribZkTestBase { // indexing during a partition // doc should be on leader and 1 replica sendDoc(5); - + + assertDocExists(getHttpSolrServer(leader, testCollectionName), testCollectionName, "5"); + assertDocExists(getHttpSolrServer(notLeaders.get(1), testCollectionName), testCollectionName, "5"); + Thread.sleep(sleepMsBeforeHealPartition); String shouldNotBeNewLeaderNode = notLeaders.get(0).getNodeName(); @@ -391,16 +407,20 @@ public class HttpPartitionTest extends AbstractFullDistribZkTestBase { Thread.sleep(1000); } - List activeReps = getActiveOrRecoveringReplicas(testCollectionName, "shard1"); + List participatingReplicas = getActiveOrRecoveringReplicas(testCollectionName, "shard1"); assertTrue("Expected 2 of 3 replicas to be active but only found "+ - activeReps.size()+"; "+activeReps+"; clusterState: "+printClusterStateInfo(), - activeReps.size() == 2); + participatingReplicas.size()+"; "+participatingReplicas+"; clusterState: "+printClusterStateInfo(), + participatingReplicas.size() == 2); sendDoc(6); - assertDocsExistInAllReplicas(activeReps, testCollectionName, 1, 6); + Set replicasToCheck = new HashSet<>(); + for (Replica stillUp : participatingReplicas) + replicasToCheck.add(stillUp.getName()); + waitToSeeReplicasActive(testCollectionName, "shard1", replicasToCheck, 20); + assertDocsExistInAllReplicas(participatingReplicas, testCollectionName, 1, 6); } - + protected List getActiveOrRecoveringReplicas(String testCollectionName, String shardId) throws Exception { Map activeReplicas = new HashMap(); ZkStateReader zkr = cloudClient.getZkStateReader(); @@ -472,26 +492,30 @@ public class HttpPartitionTest extends AbstractFullDistribZkTestBase { } protected void sendDoc(int docId) throws Exception { + UpdateRequest up = new UpdateRequest(); + up.setParam(UpdateRequest.MIN_REPFACT, String.valueOf(2)); SolrInputDocument doc = new SolrInputDocument(); doc.addField(id, String.valueOf(docId)); doc.addField("a_t", "hello" + docId); - cloudClient.add(doc); + up.add(doc); + int minAchievedRf = + cloudClient.getMinAchievedReplicationFactor(cloudClient.getDefaultCollection(), cloudClient.request(up)); } /** * Query the real-time get handler for a specific doc by ID to verify it - * exists in the provided server. + * exists in the provided server, using distrib=false so it doesn't route to another replica. */ @SuppressWarnings("rawtypes") protected void assertDocExists(HttpSolrServer solr, String coll, String docId) throws Exception { - QueryRequest qr = new QueryRequest(params("qt", "/get", "id", docId)); + QueryRequest qr = new QueryRequest(params("qt", "/get", "id", docId, "distrib", "false")); NamedList rsp = solr.request(qr); - String match = + String match = JSONTestUtil.matchObj("/id", rsp.get("doc"), new Integer(docId)); assertTrue("Doc with id=" + docId + " not found in " + solr.getBaseURL() - + " due to: " + match, match == null); + + " due to: " + match + "; rsp="+rsp, match == null); } - + protected JettySolrRunner getJettyOnPort(int port) { JettySolrRunner theJetty = null; for (JettySolrRunner jetty : jettys) { @@ -519,5 +543,55 @@ public class HttpPartitionTest extends AbstractFullDistribZkTestBase { if (tmp.indexOf('_') != -1) tmp = tmp.substring(0,tmp.indexOf('_')); return Integer.parseInt(tmp); - } + } + + protected void waitToSeeReplicasActive(String testCollectionName, String shardId, Set replicasToCheck, int maxWaitSecs) throws Exception { + long startMs = System.currentTimeMillis(); + + ZkStateReader zkr = cloudClient.getZkStateReader(); + zkr.updateClusterState(true); // force the state to be fresh + + ClusterState cs = zkr.getClusterState(); + Collection slices = cs.getActiveSlices(testCollectionName); + boolean allReplicasUp = false; + long waitMs = 0L; + long maxWaitMs = maxWaitSecs * 1000L; + while (waitMs < maxWaitMs && !allReplicasUp) { + // refresh state every 2 secs + if (waitMs % 2000 == 0) + cloudClient.getZkStateReader().updateClusterState(true); + + cs = cloudClient.getZkStateReader().getClusterState(); + assertNotNull(cs); + Slice shard = cs.getSlice(testCollectionName, shardId); + assertNotNull("No Slice for "+shardId, shard); + allReplicasUp = true; // assume true + + // wait to see all replicas are "active" + for (Replica replica : shard.getReplicas()) { + if (!replicasToCheck.contains(replica.getName())) + continue; + + String replicaState = replica.getStr(ZkStateReader.STATE_PROP); + if (!ZkStateReader.ACTIVE.equals(replicaState)) { + log.info("Replica " + replica.getName() + " is currently " + replicaState); + allReplicasUp = false; + } + } + + if (!allReplicasUp) { + try { + Thread.sleep(1000L); + } catch (Exception ignoreMe) {} + waitMs += 1000L; + } + } // end while + + if (!allReplicasUp) + fail("Didn't see replicas "+ replicasToCheck + + " come up within " + maxWaitMs + " ms! ClusterState: " + printClusterStateInfo()); + + long diffMs = (System.currentTimeMillis() - startMs); + log.info("Took " + diffMs + " ms to see replicas ["+replicasToCheck+"] become active."); + } }