mirror of https://github.com/apache/lucene.git
SOLR-6511: Fencepost error in LeaderInitiatedRecoveryThread; refactor HttpPartitionTest to resolve jenkins failures.
git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1627347 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent 16c4a3cd91
commit 3f31f26d3d
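For readers skimming the diff below: the fencepost fix is the change from "++tries < maxTries" to "++tries <= maxTries" in LeaderInitiatedRecoveryThread's retry loop. With a pre-increment and a strict comparison the loop issued one fewer recovery request than maxTries allows, so a maxTries of 1 (the starting value used in DistributedUpdateProcessor below) sent no request at all. A minimal standalone sketch of the off-by-one, illustrative only and not the Solr code itself:

public class FencepostDemo {
  public static void main(String[] args) {
    System.out.println("old bound:   " + countAttempts(1, false)); // prints 0 attempts
    System.out.println("fixed bound: " + countAttempts(1, true));  // prints 1 attempt
  }

  // Counts how many attempts a retry loop makes for a given maxTries and bound style.
  static int countAttempts(int maxTries, boolean inclusiveBound) {
    int tries = 0;
    int attempts = 0;
    boolean continueTrying = true;
    while (continueTrying && (inclusiveBound ? ++tries <= maxTries : ++tries < maxTries)) {
      attempts++; // in the real thread this is where the recovery request is sent
    }
    return attempts;
  }
}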
@@ -201,6 +201,8 @@ Bug Fixes

* SOLR-6509: Solr start scripts interactive mode doesn't honor -z argument (Timothy Potter)

* SOLR-6511: Fencepost error in LeaderInitiatedRecoveryThread (Timothy Potter)

Other Changes
----------------------
@@ -387,7 +387,8 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
collection,
shardId,
coreNodeProps,
120);
120,
coreNodeName);
zkController.ensureReplicaInLeaderInitiatedRecovery(
collection, shardId, coreNodeProps.getCoreUrl(), coreNodeProps, false);
@@ -50,13 +50,15 @@ public class LeaderInitiatedRecoveryThread extends Thread {
protected String shardId;
protected ZkCoreNodeProps nodeProps;
protected int maxTries;
protected String leaderCoreNodeName;

public LeaderInitiatedRecoveryThread(ZkController zkController,
CoreContainer cc,
String collection,
String shardId,
ZkCoreNodeProps nodeProps,
int maxTries)
int maxTries,
String leaderCoreNodeName)
{
super("LeaderInitiatedRecoveryThread-"+nodeProps.getCoreName());
this.zkController = zkController;

@@ -65,6 +67,7 @@ public class LeaderInitiatedRecoveryThread extends Thread {
this.shardId = shardId;
this.nodeProps = nodeProps;
this.maxTries = maxTries;
this.leaderCoreNodeName = leaderCoreNodeName;

setDaemon(true);
}

@@ -103,7 +106,7 @@ public class LeaderInitiatedRecoveryThread extends Thread {
recoverRequestCmd.setAction(CoreAdminAction.REQUESTRECOVERY);
recoverRequestCmd.setCoreName(coreNeedingRecovery);

while (continueTrying && ++tries < maxTries) {
while (continueTrying && ++tries <= maxTries) {
if (tries > 1) {
log.warn("Asking core={} coreNodeName={} on " + recoveryUrl +
" to recover; unsuccessful after "+tries+" of "+maxTries+" attempts so far ...", coreNeedingRecovery, replicaCoreNodeName);

@@ -150,7 +153,7 @@ public class LeaderInitiatedRecoveryThread extends Thread {

if (coreContainer.isShutDown()) {
log.warn("Stop trying to send recovery command to downed replica core={} coreNodeName={} on "
+ replicaNodeName + " because my core container is close.", coreNeedingRecovery, replicaCoreNodeName);
+ replicaNodeName + " because my core container is closed.", coreNeedingRecovery, replicaCoreNodeName);
continueTrying = false;
break;
}

@@ -170,6 +173,24 @@ public class LeaderInitiatedRecoveryThread extends Thread {
break;
}

// stop trying if I'm no longer the leader
if (leaderCoreNodeName != null && collection != null) {
String leaderCoreNodeNameFromZk = null;
try {
leaderCoreNodeNameFromZk = zkController.getZkStateReader().getLeaderRetry(collection, shardId, 1000).getName();
} catch (Exception exc) {
log.error("Failed to determine if " + leaderCoreNodeName + " is still the leader for " + collection +
" " + shardId + " before starting leader-initiated recovery thread for " + replicaUrl + " due to: " + exc);
}
if (!leaderCoreNodeName.equals(leaderCoreNodeNameFromZk)) {
log.warn("Stop trying to send recovery command to downed replica core=" + coreNeedingRecovery +
",coreNodeName=" + replicaCoreNodeName + " on " + replicaNodeName + " because " +
leaderCoreNodeName + " is no longer the leader! New leader is " + leaderCoreNodeNameFromZk);
continueTrying = false;
break;
}
}

// additional safeguard against the replica trying to be in the active state
// before acknowledging the leader initiated recovery command
if (continueTrying && collection != null && shardId != null) {
@@ -595,7 +595,9 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
String fromCollection = req.getParams().get(DISTRIB_FROM_COLLECTION); // is it because of a routing rule?
if (fromCollection == null) {
log.error("Request says it is coming from leader, but we are the leader: " + req.getParamString());
throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE, "Request says it is coming from leader, but we are the leader");
SolrException solrExc = new SolrException(ErrorCode.SERVICE_UNAVAILABLE, "Request says it is coming from leader, but we are the leader");
solrExc.setMetadata("cause", "LeaderChanged");
throw solrExc;
}
}
}

@@ -808,6 +810,16 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {

final String replicaUrl = error.req.node.getUrl();

// if the remote replica failed the request because of leader change (SOLR-6511), then fail the request
String cause = (error.e instanceof SolrException) ? ((SolrException)error.e).getMetadata("cause") : null;
if ("LeaderChanged".equals(cause)) {
// let's just fail this request and let the client retry? or just call processAdd again?
log.error("On "+cloudDesc.getCoreNodeName()+", replica "+replicaUrl+
" now thinks it is the leader! Failing the request to let the client retry! "+error.e);
rsp.setException(error.e);
break;
}

int maxTries = 1;
boolean sendRecoveryCommand = true;
String collection = null;

@@ -817,6 +829,17 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
StdNode stdNode = (StdNode)error.req.node;
collection = stdNode.getCollection();
shardId = stdNode.getShardId();

// before we go setting other replicas to down, make sure we're still the leader!
String leaderCoreNodeName = null;
try {
leaderCoreNodeName = zkController.getZkStateReader().getLeaderRetry(collection, shardId).getName();
} catch (Exception exc) {
log.error("Failed to determine if " + cloudDesc.getCoreNodeName() + " is still the leader for " + collection +
" " + shardId + " before putting " + replicaUrl + " into leader-initiated recovery due to: " + exc);
}

if (cloudDesc.getCoreNodeName().equals(leaderCoreNodeName)) {
try {
// if false, then the node is probably not "live" anymore
sendRecoveryCommand =

@@ -831,9 +854,22 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
maxTries = 120;
} // else the node is no longer "live" so no need to send any recovery command

} catch (KeeperException.SessionExpiredException see) {
log.error("Leader failed to set replica " +
error.req.node.getUrl() + " state to DOWN due to: " + see, see);
// our session is expired, which means our state is suspect, so don't go
// putting other replicas in recovery (see SOLR-6511)
sendRecoveryCommand = false;
} catch (Exception e) {
log.error("Leader failed to set replica "+
error.req.node.getUrl()+" state to DOWN due to: "+e, e);
log.error("Leader failed to set replica " +
error.req.node.getUrl() + " state to DOWN due to: " + e, e);
// will go ahead and try to send the recovery command once after this error
}
} else {
// not the leader anymore maybe?
sendRecoveryCommand = false;
log.warn("Core "+cloudDesc.getCoreNodeName()+" is no longer the leader for "+collection+" "+
shardId+", no request recovery command will be sent!");
}
} // else not a StdNode, recovery command still gets sent once

@@ -841,7 +877,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
continue; // the replica is already in recovery handling or is not live

Throwable rootCause = SolrException.getRootCause(error.e);
log.error("Setting up to try to start recovery on replica "+replicaUrl+" after: "+rootCause);
log.error("Setting up to try to start recovery on replica " + replicaUrl + " after: " + rootCause);

// try to send the recovery command to the downed replica in a background thread
CoreContainer coreContainer = req.getCore().getCoreDescriptor().getCoreContainer();

@@ -851,7 +887,8 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
collection,
shardId,
error.req.node.getNodeProps(),
maxTries);
maxTries,
cloudDesc.getCoreNodeName()); // core node name of current leader
ExecutorService executor = coreContainer.getUpdateShardHandler().getUpdateExecutor();
executor.execute(lirThread);
}
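Taken together, the DistributedUpdateProcessor changes above form a small handshake: a replica that rejects a forwarded update because it now believes it is the leader tags its SolrException with cause=LeaderChanged metadata, and the node that forwarded the update checks that metadata and fails the request back to the client instead of putting the replica into leader-initiated recovery. A condensed, illustrative sketch of the two sides, reusing only the calls visible in the diff rather than the full surrounding methods:

// Receiving replica: refuse the update and say why.
SolrException solrExc = new SolrException(ErrorCode.SERVICE_UNAVAILABLE,
    "Request says it is coming from leader, but we are the leader");
solrExc.setMetadata("cause", "LeaderChanged");
throw solrExc;

// Forwarding node: read the reason before starting leader-initiated recovery.
String cause = (error.e instanceof SolrException)
    ? ((SolrException) error.e).getMetadata("cause") : null;
if ("LeaderChanged".equals(cause)) {
  rsp.setException(error.e); // fail the request so the client can retry against the new leader
} else {
  // fall through to the recovery-command path (LeaderInitiatedRecoveryThread)
}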
@@ -18,9 +18,6 @@ package org.apache.solr.cloud;
*/

import java.io.File;
import java.net.ServerSocket;
import java.net.URI;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;

@@ -30,7 +27,6 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import org.apache.lucene.util.LuceneTestCase.AwaitsFix;
import org.apache.lucene.util.LuceneTestCase.Slow;
import org.apache.solr.JSONTestUtil;
import org.apache.solr.SolrTestCaseJ4.SuppressSSL;

@@ -39,6 +35,7 @@ import org.apache.solr.client.solrj.impl.HttpSolrServer;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.Replica;

@@ -55,26 +52,24 @@ import org.slf4j.LoggerFactory;
* Simulates HTTP partitions between a leader and replica but the replica does
* not lose its ZooKeeper connection.
*/

@Slow
@SuppressSSL(bugUrl = "https://issues.apache.org/jira/browse/SOLR-5776")
@AwaitsFix(bugUrl = "https://issues.apache.org/jira/browse/SOLR-6241")
public class HttpPartitionTest extends AbstractFullDistribZkTestBase {

private static final transient Logger log =
protected static final transient Logger log =
LoggerFactory.getLogger(HttpPartitionTest.class);

// To prevent the test assertions firing too fast before cluster state
// recognizes (and propagates) partitions
private static final long sleepMsBeforeHealPartition = 2000L;
protected static final long sleepMsBeforeHealPartition = 2000L;

private static final int maxWaitSecsToSeeAllActive = 30;

private Map<URI,SocketProxy> proxies = new HashMap<URI,SocketProxy>();
protected static final int maxWaitSecsToSeeAllActive = 30;

public HttpPartitionTest() {
super();
sliceCount = 2;
shardCount = 2;
shardCount = 3;
}

@Before
@@ -87,58 +82,22 @@ public class HttpPartitionTest extends AbstractFullDistribZkTestBase {
@Override
@After
public void tearDown() throws Exception {
System.clearProperty("numShards");

try {
super.tearDown();
} catch (Exception exc) {}

resetExceptionIgnores();

// close socket proxies after super.tearDown
if (!proxies.isEmpty()) {
for (SocketProxy proxy : proxies.values()) {
proxy.close();
}
}
}

/**
* Overrides the parent implementation so that we can configure a socket proxy
* to sit infront of each Jetty server, which gives us the ability to simulate
* network partitions without having to fuss with IPTables (which is not very
* cross platform friendly).
* Overrides the parent implementation to install a SocketProxy in-front of the Jetty server.
*/
@Override
public JettySolrRunner createJetty(File solrHome, String dataDir,
String shardList, String solrConfigOverride, String schemaOverride)
throws Exception {

JettySolrRunner jetty = new JettySolrRunner(solrHome.getPath(), context,
0, solrConfigOverride, schemaOverride, false,
getExtraServlets(), sslConfig, getExtraRequestFilters());
jetty.setShards(shardList);
jetty.setDataDir(getDataDir(dataDir));

// setup to proxy Http requests to this server unless it is the control
// server
int proxyPort = getNextAvailablePort();
jetty.setProxyPort(proxyPort);
jetty.start();

// create a socket proxy for the jetty server ...
SocketProxy proxy = new SocketProxy(proxyPort, jetty.getBaseUrl().toURI());
proxies.put(proxy.getUrl(), proxy);

return jetty;
}

protected int getNextAvailablePort() throws Exception {
int port = -1;
try (ServerSocket s = new ServerSocket(0)) {
port = s.getLocalPort();
}
return port;
throws Exception
{
return createProxiedJetty(solrHome, dataDir, shardList, solrConfigOverride, schemaOverride);
}

@Override

@@ -148,12 +107,16 @@ public class HttpPartitionTest extends AbstractFullDistribZkTestBase {
// test a 1x2 collection
testRf2();

waitForThingsToLevelOut(30000);

// now do similar for a 1x3 collection while taking 2 replicas on-and-off
// each time
testRf3();

// kill a leader and make sure recovery occurs as expected
testRf3WithLeaderFailover();
waitForThingsToLevelOut(30000);

// have the leader lose its Zk session temporarily
testLeaderZkSessionLoss();
}

protected void testRf2() throws Exception {
@@ -247,11 +210,9 @@ public class HttpPartitionTest extends AbstractFullDistribZkTestBase {
ensureAllReplicasAreActive(testCollectionName, "shard1", 1, 3, maxWaitSecsToSeeAllActive);
assertTrue("Expected 2 replicas for collection " + testCollectionName
+ " but found " + notLeaders.size() + "; clusterState: "
+ printClusterStateInfo(),
+ printClusterStateInfo(testCollectionName),
notLeaders.size() == 2);

sendDoc(1);

// ok, now introduce a network partition between the leader and the replica
SocketProxy proxy0 = getProxyForReplica(notLeaders.get(0));

@@ -290,126 +251,111 @@ public class HttpPartitionTest extends AbstractFullDistribZkTestBase {
}
}

protected void testRf3WithLeaderFailover() throws Exception {
// now let's create a partition in one of the replicas and outright
// kill the leader ... see what happens
// create a collection that has 1 shard but 3 replicas
String testCollectionName = "c8n_1x3_lf"; // _lf is leader fails
createCollection(testCollectionName, 1, 3, 1);
// test inspired by SOLR-6511
protected void testLeaderZkSessionLoss() throws Exception {

String testCollectionName = "c8n_1x2_leader_session_loss";
createCollection(testCollectionName, 1, 2, 1);
cloudClient.setDefaultCollection(testCollectionName);

sendDoc(1);

List<Replica> notLeaders =
ensureAllReplicasAreActive(testCollectionName, "shard1", 1, 3, maxWaitSecsToSeeAllActive);
assertTrue("Expected 2 replicas for collection " + testCollectionName
ensureAllReplicasAreActive(testCollectionName, "shard1", 1, 2, maxWaitSecsToSeeAllActive);
assertTrue("Expected 1 replicas for collection " + testCollectionName
+ " but found " + notLeaders.size() + "; clusterState: "
+ printClusterStateInfo(),
notLeaders.size() == 2);

sendDoc(1);

// ok, now introduce a network partition between the leader and the replica
SocketProxy proxy0 = null;
proxy0 = getProxyForReplica(notLeaders.get(0));

proxy0.close();

// indexing during a partition
sendDoc(2);

Thread.sleep(sleepMsBeforeHealPartition);

proxy0.reopen();

SocketProxy proxy1 = getProxyForReplica(notLeaders.get(1));

proxy1.close();

sendDoc(3);

Thread.sleep(sleepMsBeforeHealPartition);
proxy1.reopen();

// sent 4 docs in so far, verify they are on the leader and replica
notLeaders = ensureAllReplicasAreActive(testCollectionName, "shard1", 1, 3, maxWaitSecsToSeeAllActive);

sendDoc(4);

assertDocsExistInAllReplicas(notLeaders, testCollectionName, 1, 4);
+ printClusterStateInfo(testCollectionName),
notLeaders.size() == 1);

Replica leader =
cloudClient.getZkStateReader().getLeaderRetry(testCollectionName, "shard1");
String leaderNode = leader.getNodeName();
assertNotNull("Could not find leader for shard1 of "+
testCollectionName+"; clusterState: "+printClusterStateInfo(), leader);
testCollectionName+"; clusterState: "+printClusterStateInfo(testCollectionName), leader);
JettySolrRunner leaderJetty = getJettyOnPort(getReplicaPort(leader));

// since maxShardsPerNode is 1, we're safe to kill the leader
notLeaders = ensureAllReplicasAreActive(testCollectionName, "shard1", 1, 3, maxWaitSecsToSeeAllActive);
proxy0 = getProxyForReplica(notLeaders.get(0));
proxy0.close();
HttpSolrServer leaderSolr = getHttpSolrServer(leader, testCollectionName);
SolrInputDocument doc = new SolrInputDocument();
doc.addField(id, String.valueOf(2));
doc.addField("a_t", "hello" + 2);

// indexing during a partition
// doc should be on leader and 1 replica
sendDoc(5);

assertDocExists(getHttpSolrServer(leader, testCollectionName), testCollectionName, "5");
assertDocExists(getHttpSolrServer(notLeaders.get(1), testCollectionName), testCollectionName, "5");

Thread.sleep(sleepMsBeforeHealPartition);

String shouldNotBeNewLeaderNode = notLeaders.get(0).getNodeName();

// kill the leader
leaderJetty.stop();

if (leaderJetty.isRunning())
fail("Failed to stop the leader on "+leaderNode);

SocketProxy oldLeaderProxy = getProxyForReplica(leader);
if (oldLeaderProxy != null) {
oldLeaderProxy.close();
} else {
log.warn("No SocketProxy found for old leader node "+leaderNode);
}

Thread.sleep(10000); // give chance for new leader to be elected.

Replica newLeader =
cloudClient.getZkStateReader().getLeaderRetry(testCollectionName, "shard1", 60000);

assertNotNull("No new leader was elected after 60 seconds; clusterState: "+
printClusterStateInfo(),newLeader);

assertTrue("Expected node "+shouldNotBeNewLeaderNode+
" to NOT be the new leader b/c it was out-of-sync with the old leader! ClusterState: "+
printClusterStateInfo(),
!shouldNotBeNewLeaderNode.equals(newLeader.getNodeName()));

proxy0.reopen();
// cause leader migration by expiring the current leader's zk session
chaosMonkey.expireSession(leaderJetty);

String expectedNewLeaderCoreNodeName = notLeaders.get(0).getName();
long timeout = System.nanoTime() + TimeUnit.NANOSECONDS.convert(60, TimeUnit.SECONDS);
while (System.nanoTime() < timeout) {
cloudClient.getZkStateReader().updateClusterState(true);
String currentLeaderName = null;
try {
Replica currentLeader =
cloudClient.getZkStateReader().getLeaderRetry(testCollectionName, "shard1");
currentLeaderName = currentLeader.getName();
} catch (Exception exc) {}

List<Replica> activeReps = getActiveOrRecoveringReplicas(testCollectionName, "shard1");
if (activeReps.size() == 2) break;
Thread.sleep(1000);
if (expectedNewLeaderCoreNodeName.equals(currentLeaderName))
break; // new leader was elected after zk session expiration

Thread.sleep(500);
}

Replica currentLeader =
cloudClient.getZkStateReader().getLeaderRetry(testCollectionName, "shard1");
assertEquals(expectedNewLeaderCoreNodeName, currentLeader.getName());

log.info("Sending doc 2 to old leader "+leader.getName());
try {
leaderSolr.add(doc);
leaderSolr.shutdown();

Replica oldLeaderInRecovery = null;
for (Replica next : getActiveOrRecoveringReplicas(testCollectionName, "shard1")) {
if (next.getName().equals(leader.getName()) &&
ZkStateReader.RECOVERING.equals(next.getStr(ZkStateReader.STATE_PROP)))
{
oldLeaderInRecovery = next;
break;
}
}

// if the old leader is not active or recovering, the add should have failed
if (oldLeaderInRecovery != null) {
HttpSolrServer oldLeaderSolr = getHttpSolrServer(oldLeaderInRecovery, testCollectionName);
try {
assertDocExists(oldLeaderSolr, testCollectionName, "2");
} finally {
oldLeaderSolr.shutdown();
}
} else {
fail("Send doc 2 to old leader " + leader.getName() +
" should have failed! ClusterState: " + printClusterStateInfo(testCollectionName));
}

} catch (SolrException exc) {
// this is expected ..
leaderSolr = getHttpSolrServer(currentLeader, testCollectionName);
try {
leaderSolr.add(doc); // this should work
} finally {
leaderSolr.shutdown();
}
}

List<Replica> participatingReplicas = getActiveOrRecoveringReplicas(testCollectionName, "shard1");
assertTrue("Expected 2 of 3 replicas to be active but only found "+
participatingReplicas.size()+"; "+participatingReplicas+"; clusterState: "+printClusterStateInfo(),
participatingReplicas.size() == 2);

sendDoc(6);

Set<String> replicasToCheck = new HashSet<>();
for (Replica stillUp : participatingReplicas)
replicasToCheck.add(stillUp.getName());
waitToSeeReplicasActive(testCollectionName, "shard1", replicasToCheck, 20);
assertDocsExistInAllReplicas(participatingReplicas, testCollectionName, 1, 6);
assertDocsExistInAllReplicas(participatingReplicas, testCollectionName, 1, 2);

// try to clean up
try {
CollectionAdminRequest req = new CollectionAdminRequest.Delete();
req.setCollectionName(testCollectionName);
req.process(cloudClient);
} catch (Exception e) {
// don't fail the test
log.warn("Could not delete collection {} after test completed", testCollectionName);
}
}

protected List<Replica> getActiveOrRecoveringReplicas(String testCollectionName, String shardId) throws Exception {
@@ -432,20 +378,6 @@ public class HttpPartitionTest extends AbstractFullDistribZkTestBase {
return replicas;
}

protected SocketProxy getProxyForReplica(Replica replica) throws Exception {
String replicaBaseUrl = replica.getStr(ZkStateReader.BASE_URL_PROP);
assertNotNull(replicaBaseUrl);
URL baseUrl = new URL(replicaBaseUrl);

SocketProxy proxy = proxies.get(baseUrl.toURI());
if (proxy == null && !baseUrl.toExternalForm().endsWith("/")) {
baseUrl = new URL(baseUrl.toExternalForm() + "/");
proxy = proxies.get(baseUrl.toURI());
}
assertNotNull("No proxy found for " + baseUrl + "!", proxy);
return proxy;
}

protected void assertDocsExistInAllReplicas(List<Replica> notLeaders,
String testCollectionName, int firstDocId, int lastDocId)
throws Exception {

@@ -501,33 +433,11 @@ public class HttpPartitionTest extends AbstractFullDistribZkTestBase {
protected void assertDocExists(HttpSolrServer solr, String coll, String docId) throws Exception {
QueryRequest qr = new QueryRequest(params("qt", "/get", "id", docId, "distrib", "false"));
NamedList rsp = solr.request(qr);
String match =
JSONTestUtil.matchObj("/id", rsp.get("doc"), new Integer(docId));
String match = JSONTestUtil.matchObj("/id", rsp.get("doc"), new Integer(docId));
assertTrue("Doc with id=" + docId + " not found in " + solr.getBaseURL()
+ " due to: " + match + "; rsp="+rsp, match == null);
}

protected JettySolrRunner getJettyOnPort(int port) {
JettySolrRunner theJetty = null;
for (JettySolrRunner jetty : jettys) {
if (port == jetty.getLocalPort()) {
theJetty = jetty;
break;
}
}

if (theJetty == null) {
if (controlJetty.getLocalPort() == port) {
theJetty = controlJetty;
}
}

if (theJetty == null)
fail("Not able to find JettySolrRunner for port: "+port);

return theJetty;
}

protected int getReplicaPort(Replica replica) {
String replicaNode = replica.getNodeName();
String tmp = replicaNode.substring(replicaNode.indexOf(':')+1);

@@ -580,7 +490,7 @@ public class HttpPartitionTest extends AbstractFullDistribZkTestBase {

if (!allReplicasUp)
fail("Didn't see replicas "+ replicasToCheck +
" come up within " + maxWaitMs + " ms! ClusterState: " + printClusterStateInfo());
" come up within " + maxWaitMs + " ms! ClusterState: " + printClusterStateInfo(testCollectionName));

long diffMs = (System.currentTimeMillis() - startMs);
log.info("Took " + diffMs + " ms to see replicas ["+replicasToCheck+"] become active.");
@@ -0,0 +1,181 @@
package org.apache.solr.cloud;

/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import org.apache.lucene.util.LuceneTestCase.Slow;
import org.apache.solr.SolrTestCaseJ4.SuppressSSL;
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.common.cloud.Replica;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;

/**
* Tests leader-initiated recovery scenarios after a leader node fails
* and one of the replicas is out-of-sync.
*/
@Slow
@SuppressSSL(bugUrl = "https://issues.apache.org/jira/browse/SOLR-5776")
public class LeaderFailoverAfterPartitionTest extends HttpPartitionTest {

public LeaderFailoverAfterPartitionTest() {
super();
}

@Override
public void doTest() throws Exception {
waitForThingsToLevelOut(30000);

// kill a leader and make sure recovery occurs as expected
testRf3WithLeaderFailover();
}

protected void testRf3WithLeaderFailover() throws Exception {
// now let's create a partition in one of the replicas and outright
// kill the leader ... see what happens
// create a collection that has 1 shard but 3 replicas
String testCollectionName = "c8n_1x3_lf"; // _lf is leader fails
createCollection(testCollectionName, 1, 3, 1);
cloudClient.setDefaultCollection(testCollectionName);

sendDoc(1);

List<Replica> notLeaders =
ensureAllReplicasAreActive(testCollectionName, "shard1", 1, 3, maxWaitSecsToSeeAllActive);
assertTrue("Expected 2 replicas for collection " + testCollectionName
+ " but found " + notLeaders.size() + "; clusterState: "
+ printClusterStateInfo(testCollectionName),
notLeaders.size() == 2);

// ok, now introduce a network partition between the leader and the replica
SocketProxy proxy0 = null;
proxy0 = getProxyForReplica(notLeaders.get(0));

proxy0.close();

// indexing during a partition
sendDoc(2);

Thread.sleep(sleepMsBeforeHealPartition);

proxy0.reopen();

SocketProxy proxy1 = getProxyForReplica(notLeaders.get(1));

proxy1.close();

sendDoc(3);

Thread.sleep(sleepMsBeforeHealPartition);
proxy1.reopen();

// sent 4 docs in so far, verify they are on the leader and replica
notLeaders = ensureAllReplicasAreActive(testCollectionName, "shard1", 1, 3, maxWaitSecsToSeeAllActive);

sendDoc(4);

assertDocsExistInAllReplicas(notLeaders, testCollectionName, 1, 4);

Replica leader =
cloudClient.getZkStateReader().getLeaderRetry(testCollectionName, "shard1");
String leaderNode = leader.getNodeName();
assertNotNull("Could not find leader for shard1 of "+
testCollectionName+"; clusterState: "+printClusterStateInfo(testCollectionName), leader);
JettySolrRunner leaderJetty = getJettyOnPort(getReplicaPort(leader));

// since maxShardsPerNode is 1, we're safe to kill the leader
notLeaders = ensureAllReplicasAreActive(testCollectionName, "shard1", 1, 3, maxWaitSecsToSeeAllActive);
proxy0 = getProxyForReplica(notLeaders.get(0));
proxy0.close();

// indexing during a partition
// doc should be on leader and 1 replica
sendDoc(5);

assertDocExists(getHttpSolrServer(leader, testCollectionName), testCollectionName, "5");
assertDocExists(getHttpSolrServer(notLeaders.get(1), testCollectionName), testCollectionName, "5");

Thread.sleep(sleepMsBeforeHealPartition);

String shouldNotBeNewLeaderNode = notLeaders.get(0).getNodeName();

//chaosMonkey.expireSession(leaderJetty);
// kill the leader
leaderJetty.stop();
if (leaderJetty.isRunning())
fail("Failed to stop the leader on "+leaderNode);

SocketProxy oldLeaderProxy = getProxyForReplica(leader);
if (oldLeaderProxy != null) {
oldLeaderProxy.close();
} else {
log.warn("No SocketProxy found for old leader node "+leaderNode);
}

Thread.sleep(10000); // give chance for new leader to be elected.

Replica newLeader =
cloudClient.getZkStateReader().getLeaderRetry(testCollectionName, "shard1", 60000);

assertNotNull("No new leader was elected after 60 seconds; clusterState: "+
printClusterStateInfo(testCollectionName),newLeader);

assertTrue("Expected node "+shouldNotBeNewLeaderNode+
" to NOT be the new leader b/c it was out-of-sync with the old leader! ClusterState: "+
printClusterStateInfo(testCollectionName),
!shouldNotBeNewLeaderNode.equals(newLeader.getNodeName()));

proxy0.reopen();

long timeout = System.nanoTime() + TimeUnit.NANOSECONDS.convert(60, TimeUnit.SECONDS);
while (System.nanoTime() < timeout) {
cloudClient.getZkStateReader().updateClusterState(true);

List<Replica> activeReps = getActiveOrRecoveringReplicas(testCollectionName, "shard1");
if (activeReps.size() >= 2) break;
Thread.sleep(1000);
}

List<Replica> participatingReplicas = getActiveOrRecoveringReplicas(testCollectionName, "shard1");
assertTrue("Expected 2 of 3 replicas to be active but only found "+
participatingReplicas.size()+"; "+participatingReplicas+"; clusterState: "+
printClusterStateInfo(testCollectionName),
participatingReplicas.size() >= 2);

sendDoc(6);

Set<String> replicasToCheck = new HashSet<>();
for (Replica stillUp : participatingReplicas)
replicasToCheck.add(stillUp.getName());
waitToSeeReplicasActive(testCollectionName, "shard1", replicasToCheck, 20);
assertDocsExistInAllReplicas(participatingReplicas, testCollectionName, 1, 6);

// try to clean up
try {
CollectionAdminRequest req = new CollectionAdminRequest.Delete();
req.setCollectionName(testCollectionName);
req.process(cloudClient);
} catch (Exception e) {
// don't fail the test
log.warn("Could not delete collection {} after test completed", testCollectionName);
}
}
}
@@ -26,7 +26,9 @@ import static org.apache.solr.common.cloud.ZkStateReader.MAX_SHARDS_PER_NODE;

import java.io.File;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.URI;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;

@@ -81,6 +83,8 @@ import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.noggit.CharArr;
import org.noggit.JSONWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -124,6 +128,8 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes
protected boolean checkCreatedVsState;
protected boolean useJettyDataDir = true;

protected Map<URI,SocketProxy> proxies = new HashMap<URI,SocketProxy>();

public static class CloudJettyRunner {
public JettySolrRunner jetty;
public String nodeName;

@@ -515,6 +521,77 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes
return jetty;
}

/**
* Creates a JettySolrRunner with a socket proxy sitting infront of the Jetty server,
* which gives us the ability to simulate network partitions without having to fuss
* with IPTables.
*/
public JettySolrRunner createProxiedJetty(File solrHome, String dataDir,
String shardList, String solrConfigOverride, String schemaOverride)
throws Exception {

JettySolrRunner jetty = new JettySolrRunner(solrHome.getPath(), context,
0, solrConfigOverride, schemaOverride, false,
getExtraServlets(), sslConfig, getExtraRequestFilters());
jetty.setShards(shardList);
jetty.setDataDir(getDataDir(dataDir));

// setup to proxy Http requests to this server unless it is the control
// server
int proxyPort = getNextAvailablePort();
jetty.setProxyPort(proxyPort);
jetty.start();

// create a socket proxy for the jetty server ...
SocketProxy proxy = new SocketProxy(proxyPort, jetty.getBaseUrl().toURI());
proxies.put(proxy.getUrl(), proxy);

return jetty;
}

protected JettySolrRunner getJettyOnPort(int port) {
JettySolrRunner theJetty = null;
for (JettySolrRunner jetty : jettys) {
if (port == jetty.getLocalPort()) {
theJetty = jetty;
break;
}
}

if (theJetty == null) {
if (controlJetty.getLocalPort() == port) {
theJetty = controlJetty;
}
}

if (theJetty == null)
fail("Not able to find JettySolrRunner for port: "+port);

return theJetty;
}

protected SocketProxy getProxyForReplica(Replica replica) throws Exception {
String replicaBaseUrl = replica.getStr(ZkStateReader.BASE_URL_PROP);
assertNotNull(replicaBaseUrl);
URL baseUrl = new URL(replicaBaseUrl);

SocketProxy proxy = proxies.get(baseUrl.toURI());
if (proxy == null && !baseUrl.toExternalForm().endsWith("/")) {
baseUrl = new URL(baseUrl.toExternalForm() + "/");
proxy = proxies.get(baseUrl.toURI());
}
assertNotNull("No proxy found for " + baseUrl + "!", proxy);
return proxy;
}

protected int getNextAvailablePort() throws Exception {
int port = -1;
try (ServerSocket s = new ServerSocket(0)) {
port = s.getLocalPort();
}
return port;
}

private File getRelativeSolrHomePath(File solrHome) {
String path = SolrResourceLoader.normalizeDir(new File(".").getAbsolutePath());
String base = new File(solrHome.getPath()).getAbsolutePath();
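The test pattern enabled by the createProxiedJetty/getProxyForReplica helpers above is: route all HTTP traffic for a replica through a SocketProxy, close the proxy to simulate a partition, index while partitioned, then reopen it and assert the replica recovers. A hedged sketch of that flow inside a subclass of AbstractFullDistribZkTestBase (the method and field names are the ones visible in this diff; the variables replica, collectionName and notLeaders are placeholders, and the exact assertions are illustrative):

// Inside a test extending AbstractFullDistribZkTestBase (as HttpPartitionTest does):
SocketProxy proxy = getProxyForReplica(replica); // replica: a non-leader Replica from cluster state

proxy.close();                          // simulate a network partition for this replica only
sendDoc(2);                             // index while the replica is unreachable
Thread.sleep(sleepMsBeforeHealPartition);

proxy.reopen();                         // heal the partition
ensureAllReplicasAreActive(collectionName, "shard1", 1, 2, maxWaitSecsToSeeAllActive);
assertDocsExistInAllReplicas(notLeaders, collectionName, 1, 2); // replica caught up via recovery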
@@ -1467,6 +1544,13 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes

System.clearProperty("zkHost");
System.clearProperty("numShards");

// close socket proxies after super.tearDown
if (!proxies.isEmpty()) {
for (SocketProxy proxy : proxies.values()) {
proxy.close();
}
}
}

@Override

@@ -1860,7 +1944,23 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes
}

protected String printClusterStateInfo() throws Exception {
return printClusterStateInfo(null);
}

protected String printClusterStateInfo(String collection) throws Exception {
cloudClient.getZkStateReader().updateClusterState(true);
return String.valueOf(cloudClient.getZkStateReader().getClusterState());
String cs = null;
ClusterState clusterState = cloudClient.getZkStateReader().getClusterState();
if (collection != null) {
cs = clusterState.getCollection(collection).toString();
} else {
Map<String,DocCollection> map = new HashMap<String,DocCollection>();
for (String coll : clusterState.getCollections())
map.put(coll, clusterState.getCollection(coll));
CharArr out = new CharArr();
new JSONWriter(out, 2).write(map);
cs = out.toString();
}
return cs;
}
}