mirror of https://github.com/apache/lucene.git
SOLR-6444: Use distrib=false to ensure a real-time get request only hits the replica we're testing for proper recovery.
git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1621181 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent ac4f5bbabe
commit f20a9e70bb
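For context, the crux of the fix: the test's assertDocExists helper queries the real-time get handler of one specific replica, but without distrib=false that handler is free to route the lookup to any replica of the shard, so a replica that missed an update can still appear to have the doc. A minimal standalone sketch of the technique, not part of the commit; the base URL and doc id are hypothetical placeholders:

// Sketch only: real-time get against ONE replica, SolrJ 4.x-era API.
import org.apache.solr.client.solrj.impl.HttpSolrServer;
import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.util.NamedList;

public class RtgCheck {
  public static void main(String[] args) throws Exception {
    // Hypothetical replica core URL; point this at the exact core under test.
    HttpSolrServer replica = new HttpSolrServer("http://127.0.0.1:8983/solr/collection1");
    ModifiableSolrParams params = new ModifiableSolrParams();
    params.set("qt", "/get");        // real-time get handler
    params.set("id", "5");           // hypothetical doc id to look up
    params.set("distrib", "false");  // answer from THIS core only; no routing to other replicas
    NamedList<Object> rsp = replica.request(new QueryRequest(params));
    System.out.println("doc on this replica: " + rsp.get("doc")); // null if the doc is absent here
    replica.shutdown();
  }
}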
@@ -22,26 +22,39 @@ import java.net.ServerSocket;
 import java.net.URI;
 import java.net.URL;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.lucene.util.LuceneTestCase.AwaitsFix;
 import org.apache.lucene.util.LuceneTestCase.Slow;
 import org.apache.solr.JSONTestUtil;
 import org.apache.solr.SolrTestCaseJ4.SuppressSSL;
 import org.apache.solr.client.solrj.SolrQuery;
 import org.apache.solr.client.solrj.embedded.JettySolrRunner;
 import org.apache.solr.client.solrj.impl.HttpSolrServer;
 import org.apache.solr.client.solrj.request.CollectionAdminRequest;
 import org.apache.solr.client.solrj.request.QueryRequest;
 import org.apache.solr.client.solrj.request.UpdateRequest;
 import org.apache.solr.client.solrj.response.QueryResponse;
 import org.apache.solr.client.solrj.response.UpdateResponse;
 import org.apache.solr.common.SolrDocumentList;
 import org.apache.solr.common.SolrInputDocument;
 import org.apache.solr.common.cloud.ClusterState;
 import org.apache.solr.common.cloud.Replica;
 import org.apache.solr.common.cloud.Slice;
 import org.apache.solr.common.cloud.ZkCoreNodeProps;
 import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.common.params.ModifiableSolrParams;
 import org.apache.solr.common.util.NamedList;
 import org.apache.solr.common.util.StrUtils;
 import org.apache.solr.core.CoreContainer;
 import org.apache.solr.core.SolrCore;
 import org.apache.solr.servlet.SolrDispatchFilter;
 import org.junit.After;
 import org.junit.Before;
 import org.slf4j.Logger;
@@ -53,7 +66,7 @@ import org.slf4j.LoggerFactory;
  */
 @Slow
 @SuppressSSL(bugUrl = "https://issues.apache.org/jira/browse/SOLR-5776")
-@AwaitsFix(bugUrl = "https://issues.apache.org/jira/browse/SOLR-6241")
+//@AwaitsFix(bugUrl = "https://issues.apache.org/jira/browse/SOLR-6241")
 public class HttpPartitionTest extends AbstractFullDistribZkTestBase {
 
   private static final transient Logger log =
@@ -140,7 +153,7 @@ public class HttpPartitionTest extends AbstractFullDistribZkTestBase {
   @Override
   public void doTest() throws Exception {
     waitForThingsToLevelOut(30000);
 
     // test a 1x2 collection
     testRf2();
 
@@ -149,7 +162,7 @@ public class HttpPartitionTest extends AbstractFullDistribZkTestBase {
     testRf3();
 
     // kill a leader and make sure recovery occurs as expected
-    testRf3WithLeaderFailover();
+    testRf3WithLeaderFailover();
   }
 
   protected void testRf2() throws Exception {
@@ -349,7 +362,10 @@ public class HttpPartitionTest extends AbstractFullDistribZkTestBase {
     // indexing during a partition
     // doc should be on leader and 1 replica
     sendDoc(5);
 
+    assertDocExists(getHttpSolrServer(leader, testCollectionName), testCollectionName, "5");
+    assertDocExists(getHttpSolrServer(notLeaders.get(1), testCollectionName), testCollectionName, "5");
+
     Thread.sleep(sleepMsBeforeHealPartition);
 
     String shouldNotBeNewLeaderNode = notLeaders.get(0).getNodeName();
@@ -391,16 +407,20 @@ public class HttpPartitionTest extends AbstractFullDistribZkTestBase {
       Thread.sleep(1000);
     }
 
-    List<Replica> activeReps = getActiveOrRecoveringReplicas(testCollectionName, "shard1");
+    List<Replica> participatingReplicas = getActiveOrRecoveringReplicas(testCollectionName, "shard1");
     assertTrue("Expected 2 of 3 replicas to be active but only found "+
-        activeReps.size()+"; "+activeReps+"; clusterState: "+printClusterStateInfo(),
-        activeReps.size() == 2);
+        participatingReplicas.size()+"; "+participatingReplicas+"; clusterState: "+printClusterStateInfo(),
+        participatingReplicas.size() == 2);
 
     sendDoc(6);
 
-    assertDocsExistInAllReplicas(activeReps, testCollectionName, 1, 6);
+    Set<String> replicasToCheck = new HashSet<>();
+    for (Replica stillUp : participatingReplicas)
+      replicasToCheck.add(stillUp.getName());
+    waitToSeeReplicasActive(testCollectionName, "shard1", replicasToCheck, 20);
+    assertDocsExistInAllReplicas(participatingReplicas, testCollectionName, 1, 6);
   }
 
   protected List<Replica> getActiveOrRecoveringReplicas(String testCollectionName, String shardId) throws Exception {
     Map<String,Replica> activeReplicas = new HashMap<String,Replica>();
     ZkStateReader zkr = cloudClient.getZkStateReader();
@@ -472,26 +492,30 @@ public class HttpPartitionTest extends AbstractFullDistribZkTestBase {
   }
 
   protected void sendDoc(int docId) throws Exception {
+    UpdateRequest up = new UpdateRequest();
+    up.setParam(UpdateRequest.MIN_REPFACT, String.valueOf(2));
     SolrInputDocument doc = new SolrInputDocument();
     doc.addField(id, String.valueOf(docId));
     doc.addField("a_t", "hello" + docId);
-    cloudClient.add(doc);
+    up.add(doc);
+    int minAchievedRf =
+        cloudClient.getMinAchievedReplicationFactor(cloudClient.getDefaultCollection(), cloudClient.request(up));
   }
 
   /**
    * Query the real-time get handler for a specific doc by ID to verify it
-   * exists in the provided server.
+   * exists in the provided server, using distrib=false so it doesn't route to another replica.
    */
   @SuppressWarnings("rawtypes")
   protected void assertDocExists(HttpSolrServer solr, String coll, String docId) throws Exception {
-    QueryRequest qr = new QueryRequest(params("qt", "/get", "id", docId));
+    QueryRequest qr = new QueryRequest(params("qt", "/get", "id", docId, "distrib", "false"));
     NamedList rsp = solr.request(qr);
     String match =
         JSONTestUtil.matchObj("/id", rsp.get("doc"), new Integer(docId));
     assertTrue("Doc with id=" + docId + " not found in " + solr.getBaseURL()
-        + " due to: " + match, match == null);
+        + " due to: " + match + "; rsp="+rsp, match == null);
   }
 
   protected JettySolrRunner getJettyOnPort(int port) {
     JettySolrRunner theJetty = null;
     for (JettySolrRunner jetty : jettys) {
@@ -519,5 +543,55 @@ public class HttpPartitionTest extends AbstractFullDistribZkTestBase {
     if (tmp.indexOf('_') != -1)
       tmp = tmp.substring(0,tmp.indexOf('_'));
     return Integer.parseInt(tmp);
   }
-}
+
+  protected void waitToSeeReplicasActive(String testCollectionName, String shardId, Set<String> replicasToCheck, int maxWaitSecs) throws Exception {
+    long startMs = System.currentTimeMillis();
+
+    ZkStateReader zkr = cloudClient.getZkStateReader();
+    zkr.updateClusterState(true); // force the state to be fresh
+
+    ClusterState cs = zkr.getClusterState();
+    Collection<Slice> slices = cs.getActiveSlices(testCollectionName);
+    boolean allReplicasUp = false;
+    long waitMs = 0L;
+    long maxWaitMs = maxWaitSecs * 1000L;
+    while (waitMs < maxWaitMs && !allReplicasUp) {
+      // refresh state every 2 secs
+      if (waitMs % 2000 == 0)
+        cloudClient.getZkStateReader().updateClusterState(true);
+
+      cs = cloudClient.getZkStateReader().getClusterState();
+      assertNotNull(cs);
+      Slice shard = cs.getSlice(testCollectionName, shardId);
+      assertNotNull("No Slice for "+shardId, shard);
+      allReplicasUp = true; // assume true
+
+      // wait to see all replicas are "active"
+      for (Replica replica : shard.getReplicas()) {
+        if (!replicasToCheck.contains(replica.getName()))
+          continue;
+
+        String replicaState = replica.getStr(ZkStateReader.STATE_PROP);
+        if (!ZkStateReader.ACTIVE.equals(replicaState)) {
+          log.info("Replica " + replica.getName() + " is currently " + replicaState);
+          allReplicasUp = false;
+        }
+      }
+
+      if (!allReplicasUp) {
+        try {
+          Thread.sleep(1000L);
+        } catch (Exception ignoreMe) {}
+        waitMs += 1000L;
+      }
+    } // end while
+
+    if (!allReplicasUp)
+      fail("Didn't see replicas "+ replicasToCheck +
+           " come up within " + maxWaitMs + " ms! ClusterState: " + printClusterStateInfo());
+
+    long diffMs = (System.currentTimeMillis() - startMs);
+    log.info("Took " + diffMs + " ms to see replicas ["+replicasToCheck+"] become active.");
+  }
+}
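The reworked sendDoc above also leans on the min_rf machinery (SOLR-5468) that the commit's UpdateRequest.MIN_REPFACT and getMinAchievedReplicationFactor calls come from: the client asks Solr to report the replication factor actually achieved for an update. A minimal standalone sketch of that pattern, not part of the commit; the ZooKeeper address, collection name, and field values are hypothetical placeholders:

// Sketch only: requesting and reading back the achieved replication factor, SolrJ 4.x-era API.
import org.apache.solr.client.solrj.impl.CloudSolrServer;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.util.NamedList;

public class MinRfCheck {
  public static void main(String[] args) throws Exception {
    CloudSolrServer cloud = new CloudSolrServer("127.0.0.1:2181"); // hypothetical ZK address
    cloud.setDefaultCollection("collection1");                     // hypothetical collection

    UpdateRequest up = new UpdateRequest();
    up.setParam(UpdateRequest.MIN_REPFACT, "2"); // ask Solr to track rf for this update
    SolrInputDocument doc = new SolrInputDocument();
    doc.addField("id", "6");
    doc.addField("a_t", "hello6");
    up.add(doc);

    NamedList<Object> rsp = cloud.request(up);
    // rf achieved for this update; a value below 2 suggests a replica missed the doc
    int achieved = cloud.getMinAchievedReplicationFactor("collection1", rsp);
    System.out.println("achieved rf: " + achieved);
    cloud.shutdown();
  }
}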