diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 3f504098449..d81ba357149 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -201,6 +201,8 @@ Bug Fixes
 
 * SOLR-6509: Solr start scripts interactive mode doesn't honor -z argument (Timothy Potter)
 
+* SOLR-6511: Fencepost error in LeaderInitiatedRecoveryThread (Timothy Potter)
+
 Other Changes
 ----------------------
 
diff --git a/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java b/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
index 18c30cc5a98..2cde87a8645 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
@@ -387,7 +387,8 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
               collection,
               shardId,
               coreNodeProps,
-              120);
+              120,
+              coreNodeName);
 
       zkController.ensureReplicaInLeaderInitiatedRecovery(
           collection, shardId, coreNodeProps.getCoreUrl(), coreNodeProps, false);
diff --git a/solr/core/src/java/org/apache/solr/cloud/LeaderInitiatedRecoveryThread.java b/solr/core/src/java/org/apache/solr/cloud/LeaderInitiatedRecoveryThread.java
index cd6b736afd1..76fc18556d3 100644
--- a/solr/core/src/java/org/apache/solr/cloud/LeaderInitiatedRecoveryThread.java
+++ b/solr/core/src/java/org/apache/solr/cloud/LeaderInitiatedRecoveryThread.java
@@ -50,13 +50,15 @@ public class LeaderInitiatedRecoveryThread extends Thread {
   protected String shardId;
   protected ZkCoreNodeProps nodeProps;
   protected int maxTries;
+  protected String leaderCoreNodeName;
 
   public LeaderInitiatedRecoveryThread(ZkController zkController,
                                        CoreContainer cc,
                                        String collection,
                                        String shardId,
                                        ZkCoreNodeProps nodeProps,
-                                       int maxTries)
+                                       int maxTries,
+                                       String leaderCoreNodeName)
   {
     super("LeaderInitiatedRecoveryThread-"+nodeProps.getCoreName());
     this.zkController = zkController;
@@ -65,6 +67,7 @@ public class LeaderInitiatedRecoveryThread extends Thread {
     this.shardId = shardId;
     this.nodeProps = nodeProps;
     this.maxTries = maxTries;
+    this.leaderCoreNodeName = leaderCoreNodeName;
     setDaemon(true);
   }
 
@@ -103,7 +106,7 @@ public class LeaderInitiatedRecoveryThread extends Thread {
     recoverRequestCmd.setAction(CoreAdminAction.REQUESTRECOVERY);
     recoverRequestCmd.setCoreName(coreNeedingRecovery);
 
-    while (continueTrying && ++tries < maxTries) {
+    while (continueTrying && ++tries <= maxTries) {
       if (tries > 1) {
         log.warn("Asking core={} coreNodeName={} on " + recoveryUrl +
             " to recover; unsuccessful after "+tries+" of "+maxTries+" attempts so far ...", coreNeedingRecovery, replicaCoreNodeName);
@@ -150,7 +153,7 @@ public class LeaderInitiatedRecoveryThread extends Thread {
 
       if (coreContainer.isShutDown()) {
         log.warn("Stop trying to send recovery command to downed replica core={} coreNodeName={} on "
-            + replicaNodeName + " because my core container is close.", coreNeedingRecovery, replicaCoreNodeName);
+            + replicaNodeName + " because my core container is closed.", coreNeedingRecovery, replicaCoreNodeName);
         continueTrying = false;
         break;
       }
@@ -170,6 +173,24 @@ public class LeaderInitiatedRecoveryThread extends Thread {
         break;
       }
 
+      // stop trying if I'm no longer the leader
+      if (leaderCoreNodeName != null && collection != null) {
+        String leaderCoreNodeNameFromZk = null;
+        try {
+          leaderCoreNodeNameFromZk = zkController.getZkStateReader().getLeaderRetry(collection, shardId, 1000).getName();
+        } catch (Exception exc) {
+          log.error("Failed to determine if " + leaderCoreNodeName + " is still the leader for " + collection +
+              " " + shardId + " before starting leader-initiated recovery thread for " + replicaUrl + " due to: " + exc);
+        }
+        if (!leaderCoreNodeName.equals(leaderCoreNodeNameFromZk)) {
+          log.warn("Stop trying to send recovery command to downed replica core=" + coreNeedingRecovery +
+              ",coreNodeName=" + replicaCoreNodeName + " on " + replicaNodeName + " because " +
+              leaderCoreNodeName + " is no longer the leader! New leader is " + leaderCoreNodeNameFromZk);
+          continueTrying = false;
+          break;
+        }
+      }
+
       // additional safeguard against the replica trying to be in the active state
       // before acknowledging the leader initiated recovery command
       if (continueTrying && collection != null && shardId != null) {
diff --git a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
index 363000244a1..4d261c1edba 100644
--- a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
+++ b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
@@ -595,7 +595,9 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
         String fromCollection = req.getParams().get(DISTRIB_FROM_COLLECTION); // is it because of a routing rule?
         if (fromCollection == null) {
           log.error("Request says it is coming from leader, but we are the leader: " + req.getParamString());
-          throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE, "Request says it is coming from leader, but we are the leader");
+          SolrException solrExc = new SolrException(ErrorCode.SERVICE_UNAVAILABLE, "Request says it is coming from leader, but we are the leader");
+          solrExc.setMetadata("cause", "LeaderChanged");
+          throw solrExc;
         }
       }
     }
@@ -805,57 +807,92 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
             DistribPhase.parseParam(error.req.uReq.getParams().get(DISTRIB_UPDATE_PARAM));
         if (phase != DistribPhase.FROMLEADER)
           continue; // don't have non-leaders try to recovery other nodes
-
-        final String replicaUrl = error.req.node.getUrl();
+
+        final String replicaUrl = error.req.node.getUrl();
+
+        // if the remote replica failed the request because of leader change (SOLR-6511), then fail the request
+        String cause = (error.e instanceof SolrException) ? ((SolrException)error.e).getMetadata("cause") : null;
+        if ("LeaderChanged".equals(cause)) {
+          // let's just fail this request and let the client retry? or just call processAdd again?
+          log.error("On "+cloudDesc.getCoreNodeName()+", replica "+replicaUrl+
+              " now thinks it is the leader! Failing the request to let the client retry! "+error.e);
+          rsp.setException(error.e);
+          break;
+        }
 
         int maxTries = 1;
         boolean sendRecoveryCommand = true;
         String collection = null;
         String shardId = null;
-
+
         if (error.req.node instanceof StdNode) {
           StdNode stdNode = (StdNode)error.req.node;
           collection = stdNode.getCollection();
           shardId = stdNode.getShardId();
+
+          // before we go setting other replicas to down, make sure we're still the leader!
+          String leaderCoreNodeName = null;
           try {
-            // if false, then the node is probably not "live" anymore
-            sendRecoveryCommand =
-                zkController.ensureReplicaInLeaderInitiatedRecovery(collection,
-                    shardId,
-                    replicaUrl,
-                    stdNode.getNodeProps(),
-                    false);
-
-            // we want to try more than once, ~10 minutes
-            if (sendRecoveryCommand) {
-              maxTries = 120;
-            } // else the node is no longer "live" so no need to send any recovery command
-
-          } catch (Exception e) {
-            log.error("Leader failed to set replica "+
-                error.req.node.getUrl()+" state to DOWN due to: "+e, e);
+            leaderCoreNodeName = zkController.getZkStateReader().getLeaderRetry(collection, shardId).getName();
+          } catch (Exception exc) {
+            log.error("Failed to determine if " + cloudDesc.getCoreNodeName() + " is still the leader for " + collection +
+                " " + shardId + " before putting " + replicaUrl + " into leader-initiated recovery due to: " + exc);
+          }
+
+          if (cloudDesc.getCoreNodeName().equals(leaderCoreNodeName)) {
+            try {
+              // if false, then the node is probably not "live" anymore
+              sendRecoveryCommand =
+                  zkController.ensureReplicaInLeaderInitiatedRecovery(collection,
+                      shardId,
+                      replicaUrl,
+                      stdNode.getNodeProps(),
+                      false);
+
+              // we want to try more than once, ~10 minutes
+              if (sendRecoveryCommand) {
+                maxTries = 120;
+              } // else the node is no longer "live" so no need to send any recovery command
+
+            } catch (KeeperException.SessionExpiredException see) {
+              log.error("Leader failed to set replica " +
+                  error.req.node.getUrl() + " state to DOWN due to: " + see, see);
+              // our session is expired, which means our state is suspect, so don't go
+              // putting other replicas in recovery (see SOLR-6511)
+              sendRecoveryCommand = false;
+            } catch (Exception e) {
+              log.error("Leader failed to set replica " +
+                  error.req.node.getUrl() + " state to DOWN due to: " + e, e);
+              // will go ahead and try to send the recovery command once after this error
+            }
+          } else {
+            // not the leader anymore maybe?
+            sendRecoveryCommand = false;
+            log.warn("Core "+cloudDesc.getCoreNodeName()+" is no longer the leader for "+collection+" "+
+                shardId+", no request recovery command will be sent!");
           }
         } // else not a StdNode, recovery command still gets sent once
 
         if (!sendRecoveryCommand)
           continue; // the replica is already in recovery handling or is not live
-
-        Throwable rootCause = SolrException.getRootCause(error.e);
-        log.error("Setting up to try to start recovery on replica "+replicaUrl+" after: "+rootCause);
-
+
+        Throwable rootCause = SolrException.getRootCause(error.e);
+        log.error("Setting up to try to start recovery on replica " + replicaUrl + " after: " + rootCause);
+
         // try to send the recovery command to the downed replica in a background thread
-        CoreContainer coreContainer = req.getCore().getCoreDescriptor().getCoreContainer();
-        LeaderInitiatedRecoveryThread lirThread =
+        CoreContainer coreContainer = req.getCore().getCoreDescriptor().getCoreContainer();
+        LeaderInitiatedRecoveryThread lirThread =
             new LeaderInitiatedRecoveryThread(zkController,
-                coreContainer,
-                collection,
-                shardId,
-                error.req.node.getNodeProps(),
-                maxTries);
+                coreContainer,
+                collection,
+                shardId,
+                error.req.node.getNodeProps(),
+                maxTries,
+                cloudDesc.getCoreNodeName()); // core node name of current leader
         ExecutorService executor = coreContainer.getUpdateShardHandler().getUpdateExecutor();
-        executor.execute(lirThread);
+        executor.execute(lirThread);
       }
-
+
       if (replicationTracker != null) {
         rsp.getResponseHeader().add(UpdateRequest.REPFACT, replicationTracker.getAchievedRf());
         rsp.getResponseHeader().add(UpdateRequest.MIN_REPFACT, replicationTracker.minRf);
diff --git a/solr/core/src/test/org/apache/solr/cloud/HttpPartitionTest.java b/solr/core/src/test/org/apache/solr/cloud/HttpPartitionTest.java
index 42f8d2568ab..155cd31d307 100644
--- a/solr/core/src/test/org/apache/solr/cloud/HttpPartitionTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/HttpPartitionTest.java
@@ -18,9 +18,6 @@ package org.apache.solr.cloud;
  */
 
 import java.io.File;
-import java.net.ServerSocket;
-import java.net.URI;
-import java.net.URL;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
@@ -30,7 +27,6 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.lucene.util.LuceneTestCase.AwaitsFix;
 import org.apache.lucene.util.LuceneTestCase.Slow;
 import org.apache.solr.JSONTestUtil;
 import org.apache.solr.SolrTestCaseJ4.SuppressSSL;
@@ -39,6 +35,7 @@ import org.apache.solr.client.solrj.impl.HttpSolrServer;
 import org.apache.solr.client.solrj.request.CollectionAdminRequest;
 import org.apache.solr.client.solrj.request.QueryRequest;
 import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrInputDocument;
 import org.apache.solr.common.cloud.ClusterState;
 import org.apache.solr.common.cloud.Replica;
@@ -55,26 +52,24 @@ import org.slf4j.LoggerFactory;
  * Simulates HTTP partitions between a leader and replica but the replica does
  * not lose its ZooKeeper connection.
  */
+
 @Slow
 @SuppressSSL(bugUrl = "https://issues.apache.org/jira/browse/SOLR-5776")
-@AwaitsFix(bugUrl = "https://issues.apache.org/jira/browse/SOLR-6241")
 public class HttpPartitionTest extends AbstractFullDistribZkTestBase {
-  private static final transient Logger log =
+  protected static final transient Logger log =
       LoggerFactory.getLogger(HttpPartitionTest.class);
 
   // To prevent the test assertions firing too fast before cluster state
   // recognizes (and propagates) partitions
-  private static final long sleepMsBeforeHealPartition = 2000L;
-
-  private static final int maxWaitSecsToSeeAllActive = 30;
-
-  private Map proxies = new HashMap();
-
+  protected static final long sleepMsBeforeHealPartition = 2000L;
+
+  protected static final int maxWaitSecsToSeeAllActive = 30;
+
   public HttpPartitionTest() {
     super();
     sliceCount = 2;
-    shardCount = 2;
+    shardCount = 3;
   }
 
   @Before
@@ -87,60 +82,24 @@ public class HttpPartitionTest extends AbstractFullDistribZkTestBase {
   @Override
   @After
   public void tearDown() throws Exception {
-    System.clearProperty("numShards");
-
     try {
       super.tearDown();
     } catch (Exception exc) {}
 
     resetExceptionIgnores();
-
-    // close socket proxies after super.tearDown
-    if (!proxies.isEmpty()) {
-      for (SocketProxy proxy : proxies.values()) {
-        proxy.close();
-      }
-    }
   }
 
   /**
-   * Overrides the parent implementation so that we can configure a socket proxy
-   * to sit infront of each Jetty server, which gives us the ability to simulate
-   * network partitions without having to fuss with IPTables (which is not very
-   * cross platform friendly).
+   * Overrides the parent implementation to install a SocketProxy in-front of the Jetty server.
    */
   @Override
   public JettySolrRunner createJetty(File solrHome, String dataDir,
       String shardList, String solrConfigOverride, String schemaOverride)
-      throws Exception {
-
-    JettySolrRunner jetty = new JettySolrRunner(solrHome.getPath(), context,
-        0, solrConfigOverride, schemaOverride, false,
-        getExtraServlets(), sslConfig, getExtraRequestFilters());
-    jetty.setShards(shardList);
-    jetty.setDataDir(getDataDir(dataDir));
-
-    // setup to proxy Http requests to this server unless it is the control
-    // server
-    int proxyPort = getNextAvailablePort();
-    jetty.setProxyPort(proxyPort);
-    jetty.start();
-
-    // create a socket proxy for the jetty server ...
-    SocketProxy proxy = new SocketProxy(proxyPort, jetty.getBaseUrl().toURI());
-    proxies.put(proxy.getUrl(), proxy);
-
-    return jetty;
+      throws Exception
+  {
+    return createProxiedJetty(solrHome, dataDir, shardList, solrConfigOverride, schemaOverride);
   }
-
-  protected int getNextAvailablePort() throws Exception {
-    int port = -1;
-    try (ServerSocket s = new ServerSocket(0)) {
-      port = s.getLocalPort();
-    }
-    return port;
-  }
-
+
   @Override
   public void doTest() throws Exception {
     waitForThingsToLevelOut(30000);
@@ -148,12 +107,16 @@ public class HttpPartitionTest extends AbstractFullDistribZkTestBase {
     // test a 1x2 collection
     testRf2();
 
+    waitForThingsToLevelOut(30000);
+
     // now do similar for a 1x3 collection while taking 2 replicas on-and-off
     // each time
    testRf3();
 
-    // kill a leader and make sure recovery occurs as expected
-    testRf3WithLeaderFailover();
+    waitForThingsToLevelOut(30000);
+
+    // have the leader lose its Zk session temporarily
+    testLeaderZkSessionLoss();
   }
 
   protected void testRf2() throws Exception {
@@ -247,11 +210,9 @@ public class HttpPartitionTest extends AbstractFullDistribZkTestBase {
         ensureAllReplicasAreActive(testCollectionName, "shard1", 1, 3, maxWaitSecsToSeeAllActive);
     assertTrue("Expected 2 replicas for collection " + testCollectionName
         + " but found " + notLeaders.size() + "; clusterState: "
-        + printClusterStateInfo(),
+        + printClusterStateInfo(testCollectionName),
         notLeaders.size() == 2);
-
-    sendDoc(1);
-
+
     // ok, now introduce a network partition between the leader and the replica
     SocketProxy proxy0 = getProxyForReplica(notLeaders.get(0));
@@ -289,127 +250,112 @@ public class HttpPartitionTest extends AbstractFullDistribZkTestBase {
       log.warn("Could not delete collection {} after test completed", testCollectionName);
     }
   }
-
-  protected void testRf3WithLeaderFailover() throws Exception {
-    // now let's create a partition in one of the replicas and outright
-    // kill the leader ... see what happens
-    // create a collection that has 1 shard but 3 replicas
-    String testCollectionName = "c8n_1x3_lf"; // _lf is leader fails
-    createCollection(testCollectionName, 1, 3, 1);
+
+  // test inspired by SOLR-6511
+  protected void testLeaderZkSessionLoss() throws Exception {
+
+    String testCollectionName = "c8n_1x2_leader_session_loss";
+    createCollection(testCollectionName, 1, 2, 1);
     cloudClient.setDefaultCollection(testCollectionName);
-
+
     sendDoc(1);
-
-    List notLeaders =
-        ensureAllReplicasAreActive(testCollectionName, "shard1", 1, 3, maxWaitSecsToSeeAllActive);
-    assertTrue("Expected 2 replicas for collection " + testCollectionName
-        + " but found " + notLeaders.size() + "; clusterState: "
-        + printClusterStateInfo(),
-        notLeaders.size() == 2);
-
-    sendDoc(1);
-
-    // ok, now introduce a network partition between the leader and the replica
-    SocketProxy proxy0 = null;
-    proxy0 = getProxyForReplica(notLeaders.get(0));
-
-    proxy0.close();
-
-    // indexing during a partition
-    sendDoc(2);
-
-    Thread.sleep(sleepMsBeforeHealPartition);
-
-    proxy0.reopen();
-
-    SocketProxy proxy1 = getProxyForReplica(notLeaders.get(1));
-
-    proxy1.close();
-
-    sendDoc(3);
-
-    Thread.sleep(sleepMsBeforeHealPartition);
-    proxy1.reopen();
-
-    // sent 4 docs in so far, verify they are on the leader and replica
-    notLeaders = ensureAllReplicasAreActive(testCollectionName, "shard1", 1, 3, maxWaitSecsToSeeAllActive);
-
-    sendDoc(4);
-
-    assertDocsExistInAllReplicas(notLeaders, testCollectionName, 1, 4);
-
-    Replica leader =
+
+    List notLeaders =
+        ensureAllReplicasAreActive(testCollectionName, "shard1", 1, 2, maxWaitSecsToSeeAllActive);
+    assertTrue("Expected 1 replicas for collection " + testCollectionName
+        + " but found " + notLeaders.size() + "; clusterState: "
+        + printClusterStateInfo(testCollectionName),
+        notLeaders.size() == 1);
+
+    Replica leader =
         cloudClient.getZkStateReader().getLeaderRetry(testCollectionName, "shard1");
     String leaderNode = leader.getNodeName();
     assertNotNull("Could not find leader for shard1 of "+
-        testCollectionName+"; clusterState: "+printClusterStateInfo(), leader);
+        testCollectionName+"; clusterState: "+printClusterStateInfo(testCollectionName), leader);
     JettySolrRunner leaderJetty = getJettyOnPort(getReplicaPort(leader));
-
-    // since maxShardsPerNode is 1, we're safe to kill the leader
-    notLeaders = ensureAllReplicasAreActive(testCollectionName, "shard1", 1, 3, maxWaitSecsToSeeAllActive);
-    proxy0 = getProxyForReplica(notLeaders.get(0));
-    proxy0.close();
-
-    // indexing during a partition
-    // doc should be on leader and 1 replica
-    sendDoc(5);
-    assertDocExists(getHttpSolrServer(leader, testCollectionName), testCollectionName, "5");
-    assertDocExists(getHttpSolrServer(notLeaders.get(1), testCollectionName), testCollectionName, "5");
+    HttpSolrServer leaderSolr = getHttpSolrServer(leader, testCollectionName);
+    SolrInputDocument doc = new SolrInputDocument();
+    doc.addField(id, String.valueOf(2));
+    doc.addField("a_t", "hello" + 2);
 
-    Thread.sleep(sleepMsBeforeHealPartition);
-
-    String shouldNotBeNewLeaderNode = notLeaders.get(0).getNodeName();
+    // cause leader migration by expiring the current leader's zk session
+    chaosMonkey.expireSession(leaderJetty);
 
-    // kill the leader
-    leaderJetty.stop();
-
-    if (leaderJetty.isRunning())
-      fail("Failed to stop the leader on "+leaderNode);
-
-    SocketProxy oldLeaderProxy = getProxyForReplica(leader);
-    if (oldLeaderProxy != null) {
-      oldLeaderProxy.close();
-    } else {
-      log.warn("No SocketProxy found for old leader node "+leaderNode);
-    }
-
-    Thread.sleep(10000); // give chance for new leader to be elected.
-
-    Replica newLeader =
-        cloudClient.getZkStateReader().getLeaderRetry(testCollectionName, "shard1", 60000);
-
-    assertNotNull("No new leader was elected after 60 seconds; clusterState: "+
-        printClusterStateInfo(),newLeader);
-
-    assertTrue("Expected node "+shouldNotBeNewLeaderNode+
-        " to NOT be the new leader b/c it was out-of-sync with the old leader! ClusterState: "+
-        printClusterStateInfo(),
-        !shouldNotBeNewLeaderNode.equals(newLeader.getNodeName()));
-
-    proxy0.reopen();
-
+    String expectedNewLeaderCoreNodeName = notLeaders.get(0).getName();
     long timeout = System.nanoTime() + TimeUnit.NANOSECONDS.convert(60, TimeUnit.SECONDS);
     while (System.nanoTime() < timeout) {
-      cloudClient.getZkStateReader().updateClusterState(true);
+      String currentLeaderName = null;
+      try {
+        Replica currentLeader =
+            cloudClient.getZkStateReader().getLeaderRetry(testCollectionName, "shard1");
+        currentLeaderName = currentLeader.getName();
+      } catch (Exception exc) {}
 
-      List activeReps = getActiveOrRecoveringReplicas(testCollectionName, "shard1");
-      if (activeReps.size() == 2) break;
-      Thread.sleep(1000);
+      if (expectedNewLeaderCoreNodeName.equals(currentLeaderName))
+        break; // new leader was elected after zk session expiration
+
+      Thread.sleep(500);
+    }
+
+    Replica currentLeader =
+        cloudClient.getZkStateReader().getLeaderRetry(testCollectionName, "shard1");
+    assertEquals(expectedNewLeaderCoreNodeName, currentLeader.getName());
+
+    log.info("Sending doc 2 to old leader "+leader.getName());
+    try {
+      leaderSolr.add(doc);
+      leaderSolr.shutdown();
+
+      Replica oldLeaderInRecovery = null;
+      for (Replica next : getActiveOrRecoveringReplicas(testCollectionName, "shard1")) {
+        if (next.getName().equals(leader.getName()) &&
+            ZkStateReader.RECOVERING.equals(next.getStr(ZkStateReader.STATE_PROP)))
+        {
+          oldLeaderInRecovery = next;
+          break;
+        }
+      }
+
+      // if the old leader is not active or recovering, the add should have failed
+      if (oldLeaderInRecovery != null) {
+        HttpSolrServer oldLeaderSolr = getHttpSolrServer(oldLeaderInRecovery, testCollectionName);
+        try {
+          assertDocExists(oldLeaderSolr, testCollectionName, "2");
+        } finally {
+          oldLeaderSolr.shutdown();
+        }
+      } else {
+        fail("Send doc 2 to old leader " + leader.getName() +
+            " should have failed! ClusterState: " + printClusterStateInfo(testCollectionName));
+      }
+
+    } catch (SolrException exc) {
+      // this is expected ..
+      leaderSolr = getHttpSolrServer(currentLeader, testCollectionName);
+      try {
+        leaderSolr.add(doc); // this should work
+      } finally {
+        leaderSolr.shutdown();
+      }
     }
 
     List participatingReplicas = getActiveOrRecoveringReplicas(testCollectionName, "shard1");
-    assertTrue("Expected 2 of 3 replicas to be active but only found "+
-        participatingReplicas.size()+"; "+participatingReplicas+"; clusterState: "+printClusterStateInfo(),
-        participatingReplicas.size() == 2);
-
-    sendDoc(6);
-
     Set replicasToCheck = new HashSet<>();
     for (Replica stillUp : participatingReplicas)
       replicasToCheck.add(stillUp.getName());
     waitToSeeReplicasActive(testCollectionName, "shard1", replicasToCheck, 20);
-    assertDocsExistInAllReplicas(participatingReplicas, testCollectionName, 1, 6);
+    assertDocsExistInAllReplicas(participatingReplicas, testCollectionName, 1, 2);
+
+    // try to clean up
+    try {
+      CollectionAdminRequest req = new CollectionAdminRequest.Delete();
+      req.setCollectionName(testCollectionName);
+      req.process(cloudClient);
+    } catch (Exception e) {
+      // don't fail the test
+      log.warn("Could not delete collection {} after test completed", testCollectionName);
+    }
   }
 
   protected List getActiveOrRecoveringReplicas(String testCollectionName, String shardId) throws Exception {
@@ -431,21 +377,7 @@ public class HttpPartitionTest extends AbstractFullDistribZkTestBase {
     replicas.addAll(activeReplicas.values());
     return replicas;
   }
-
-  protected SocketProxy getProxyForReplica(Replica replica) throws Exception {
-    String replicaBaseUrl = replica.getStr(ZkStateReader.BASE_URL_PROP);
-    assertNotNull(replicaBaseUrl);
-    URL baseUrl = new URL(replicaBaseUrl);
-
-    SocketProxy proxy = proxies.get(baseUrl.toURI());
-    if (proxy == null && !baseUrl.toExternalForm().endsWith("/")) {
-      baseUrl = new URL(baseUrl.toExternalForm() + "/");
-      proxy = proxies.get(baseUrl.toURI());
-    }
-    assertNotNull("No proxy found for " + baseUrl + "!", proxy);
-    return proxy;
-  }
-
+
   protected void assertDocsExistInAllReplicas(List notLeaders,
       String testCollectionName, int firstDocId, int lastDocId) throws Exception {
@@ -501,33 +433,11 @@ public class HttpPartitionTest extends AbstractFullDistribZkTestBase {
   protected void assertDocExists(HttpSolrServer solr, String coll, String docId) throws Exception {
     QueryRequest qr = new QueryRequest(params("qt", "/get", "id", docId, "distrib", "false"));
     NamedList rsp = solr.request(qr);
-    String match =
-        JSONTestUtil.matchObj("/id", rsp.get("doc"), new Integer(docId));
+    String match = JSONTestUtil.matchObj("/id", rsp.get("doc"), new Integer(docId));
     assertTrue("Doc with id=" + docId + " not found in " + solr.getBaseURL()
         + " due to: " + match + "; rsp="+rsp, match == null);
   }
 
-  protected JettySolrRunner getJettyOnPort(int port) {
-    JettySolrRunner theJetty = null;
-    for (JettySolrRunner jetty : jettys) {
-      if (port == jetty.getLocalPort()) {
-        theJetty = jetty;
-        break;
-      }
-    }
-
-    if (theJetty == null) {
-      if (controlJetty.getLocalPort() == port) {
-        theJetty = controlJetty;
-      }
-    }
-
-    if (theJetty == null)
-      fail("Not able to find JettySolrRunner for port: "+port);
-
-    return theJetty;
-  }
-
   protected int getReplicaPort(Replica replica) {
     String replicaNode = replica.getNodeName();
     String tmp = replicaNode.substring(replicaNode.indexOf(':')+1);
@@ -580,7 +490,7 @@ public class HttpPartitionTest extends AbstractFullDistribZkTestBase {
 
     if (!allReplicasUp)
       fail("Didn't see replicas "+ replicasToCheck +
-          " come up within " + maxWaitMs + " ms! ClusterState: " + printClusterStateInfo());
+          " come up within " + maxWaitMs + " ms! ClusterState: " + printClusterStateInfo(testCollectionName));
 
     long diffMs = (System.currentTimeMillis() - startMs);
     log.info("Took " + diffMs + " ms to see replicas ["+replicasToCheck+"] become active.");
diff --git a/solr/core/src/test/org/apache/solr/cloud/LeaderFailoverAfterPartitionTest.java b/solr/core/src/test/org/apache/solr/cloud/LeaderFailoverAfterPartitionTest.java
new file mode 100644
index 00000000000..bfc4c239421
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/cloud/LeaderFailoverAfterPartitionTest.java
@@ -0,0 +1,181 @@
+package org.apache.solr.cloud;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.lucene.util.LuceneTestCase.Slow;
+import org.apache.solr.SolrTestCaseJ4.SuppressSSL;
+import org.apache.solr.client.solrj.embedded.JettySolrRunner;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.common.cloud.Replica;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Tests leader-initiated recovery scenarios after a leader node fails
+ * and one of the replicas is out-of-sync.
+ */
+@Slow
+@SuppressSSL(bugUrl = "https://issues.apache.org/jira/browse/SOLR-5776")
+public class LeaderFailoverAfterPartitionTest extends HttpPartitionTest {
+
+  public LeaderFailoverAfterPartitionTest() {
+    super();
+  }
+
+
+  @Override
+  public void doTest() throws Exception {
+    waitForThingsToLevelOut(30000);
+
+    // kill a leader and make sure recovery occurs as expected
+    testRf3WithLeaderFailover();
+  }
+
+  protected void testRf3WithLeaderFailover() throws Exception {
+    // now let's create a partition in one of the replicas and outright
+    // kill the leader ... see what happens
+    // create a collection that has 1 shard but 3 replicas
+    String testCollectionName = "c8n_1x3_lf"; // _lf is leader fails
+    createCollection(testCollectionName, 1, 3, 1);
+    cloudClient.setDefaultCollection(testCollectionName);
+
+    sendDoc(1);
+
+    List notLeaders =
+        ensureAllReplicasAreActive(testCollectionName, "shard1", 1, 3, maxWaitSecsToSeeAllActive);
+    assertTrue("Expected 2 replicas for collection " + testCollectionName
+        + " but found " + notLeaders.size() + "; clusterState: "
+        + printClusterStateInfo(testCollectionName),
+        notLeaders.size() == 2);
+
+    // ok, now introduce a network partition between the leader and the replica
+    SocketProxy proxy0 = null;
+    proxy0 = getProxyForReplica(notLeaders.get(0));
+
+    proxy0.close();
+
+    // indexing during a partition
+    sendDoc(2);
+
+    Thread.sleep(sleepMsBeforeHealPartition);
+
+    proxy0.reopen();
+
+    SocketProxy proxy1 = getProxyForReplica(notLeaders.get(1));
+
+    proxy1.close();
+
+    sendDoc(3);
+
+    Thread.sleep(sleepMsBeforeHealPartition);
+    proxy1.reopen();
+
+    // sent 4 docs in so far, verify they are on the leader and replica
+    notLeaders = ensureAllReplicasAreActive(testCollectionName, "shard1", 1, 3, maxWaitSecsToSeeAllActive);
+
+    sendDoc(4);
+
+    assertDocsExistInAllReplicas(notLeaders, testCollectionName, 1, 4);
+
+    Replica leader =
+        cloudClient.getZkStateReader().getLeaderRetry(testCollectionName, "shard1");
+    String leaderNode = leader.getNodeName();
+    assertNotNull("Could not find leader for shard1 of "+
+        testCollectionName+"; clusterState: "+printClusterStateInfo(testCollectionName), leader);
+    JettySolrRunner leaderJetty = getJettyOnPort(getReplicaPort(leader));
+
+    // since maxShardsPerNode is 1, we're safe to kill the leader
+    notLeaders = ensureAllReplicasAreActive(testCollectionName, "shard1", 1, 3, maxWaitSecsToSeeAllActive);
+    proxy0 = getProxyForReplica(notLeaders.get(0));
+    proxy0.close();
+
+    // indexing during a partition
+    // doc should be on leader and 1 replica
+    sendDoc(5);
+
+    assertDocExists(getHttpSolrServer(leader, testCollectionName), testCollectionName, "5");
+    assertDocExists(getHttpSolrServer(notLeaders.get(1), testCollectionName), testCollectionName, "5");
+
+    Thread.sleep(sleepMsBeforeHealPartition);
+
+    String shouldNotBeNewLeaderNode = notLeaders.get(0).getNodeName();
+
+    //chaosMonkey.expireSession(leaderJetty);
+    // kill the leader
+    leaderJetty.stop();
+    if (leaderJetty.isRunning())
+      fail("Failed to stop the leader on "+leaderNode);
+
+    SocketProxy oldLeaderProxy = getProxyForReplica(leader);
+    if (oldLeaderProxy != null) {
+      oldLeaderProxy.close();
+    } else {
+      log.warn("No SocketProxy found for old leader node "+leaderNode);
+    }
+
+    Thread.sleep(10000); // give chance for new leader to be elected.
+
+    Replica newLeader =
+        cloudClient.getZkStateReader().getLeaderRetry(testCollectionName, "shard1", 60000);
+
+    assertNotNull("No new leader was elected after 60 seconds; clusterState: "+
+        printClusterStateInfo(testCollectionName),newLeader);
+
+    assertTrue("Expected node "+shouldNotBeNewLeaderNode+
+        " to NOT be the new leader b/c it was out-of-sync with the old leader! ClusterState: "+
+        printClusterStateInfo(testCollectionName),
+        !shouldNotBeNewLeaderNode.equals(newLeader.getNodeName()));
+
+    proxy0.reopen();
+
+    long timeout = System.nanoTime() + TimeUnit.NANOSECONDS.convert(60, TimeUnit.SECONDS);
+    while (System.nanoTime() < timeout) {
+      cloudClient.getZkStateReader().updateClusterState(true);
+
+      List activeReps = getActiveOrRecoveringReplicas(testCollectionName, "shard1");
+      if (activeReps.size() >= 2) break;
+      Thread.sleep(1000);
+    }
+
+    List participatingReplicas = getActiveOrRecoveringReplicas(testCollectionName, "shard1");
+    assertTrue("Expected 2 of 3 replicas to be active but only found "+
+        participatingReplicas.size()+"; "+participatingReplicas+"; clusterState: "+
+        printClusterStateInfo(testCollectionName),
+        participatingReplicas.size() >= 2);
+
+    sendDoc(6);
+
+    Set replicasToCheck = new HashSet<>();
+    for (Replica stillUp : participatingReplicas)
+      replicasToCheck.add(stillUp.getName());
+    waitToSeeReplicasActive(testCollectionName, "shard1", replicasToCheck, 20);
+    assertDocsExistInAllReplicas(participatingReplicas, testCollectionName, 1, 6);
+
+    // try to clean up
+    try {
+      CollectionAdminRequest req = new CollectionAdminRequest.Delete();
+      req.setCollectionName(testCollectionName);
+      req.process(cloudClient);
+    } catch (Exception e) {
+      // don't fail the test
+      log.warn("Could not delete collection {} after test completed", testCollectionName);
+    }
+  }
+}
diff --git a/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java b/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java
index f5c01754fa7..fd6e6775b5e 100644
--- a/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java
+++ b/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java
@@ -26,7 +26,9 @@ import static org.apache.solr.common.cloud.ZkStateReader.MAX_SHARDS_PER_NODE;
 
 import java.io.File;
 import java.io.IOException;
+import java.net.ServerSocket;
 import java.net.URI;
+import java.net.URL;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -81,6 +83,8 @@ import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
+import org.noggit.CharArr;
+import org.noggit.JSONWriter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -123,6 +127,8 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes
   private boolean cloudInit;
   protected boolean checkCreatedVsState;
   protected boolean useJettyDataDir = true;
+
+  protected Map proxies = new HashMap();
 
   public static class CloudJettyRunner {
     public JettySolrRunner jetty;
@@ -515,6 +521,77 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes
     return jetty;
   }
 
+  /**
+   * Creates a JettySolrRunner with a socket proxy sitting infront of the Jetty server,
+   * which gives us the ability to simulate network partitions without having to fuss
+   * with IPTables.
+   */
+  public JettySolrRunner createProxiedJetty(File solrHome, String dataDir,
+      String shardList, String solrConfigOverride, String schemaOverride)
+      throws Exception {
+
+    JettySolrRunner jetty = new JettySolrRunner(solrHome.getPath(), context,
+        0, solrConfigOverride, schemaOverride, false,
+        getExtraServlets(), sslConfig, getExtraRequestFilters());
+    jetty.setShards(shardList);
+    jetty.setDataDir(getDataDir(dataDir));
+
+    // setup to proxy Http requests to this server unless it is the control
+    // server
+    int proxyPort = getNextAvailablePort();
+    jetty.setProxyPort(proxyPort);
+    jetty.start();
+
+    // create a socket proxy for the jetty server ...
+    SocketProxy proxy = new SocketProxy(proxyPort, jetty.getBaseUrl().toURI());
+    proxies.put(proxy.getUrl(), proxy);
+
+    return jetty;
+  }
+
+  protected JettySolrRunner getJettyOnPort(int port) {
+    JettySolrRunner theJetty = null;
+    for (JettySolrRunner jetty : jettys) {
+      if (port == jetty.getLocalPort()) {
+        theJetty = jetty;
+        break;
+      }
+    }
+
+    if (theJetty == null) {
+      if (controlJetty.getLocalPort() == port) {
+        theJetty = controlJetty;
+      }
+    }
+
+    if (theJetty == null)
+      fail("Not able to find JettySolrRunner for port: "+port);
+
+    return theJetty;
+  }
+
+  protected SocketProxy getProxyForReplica(Replica replica) throws Exception {
+    String replicaBaseUrl = replica.getStr(ZkStateReader.BASE_URL_PROP);
+    assertNotNull(replicaBaseUrl);
+    URL baseUrl = new URL(replicaBaseUrl);
+
+    SocketProxy proxy = proxies.get(baseUrl.toURI());
+    if (proxy == null && !baseUrl.toExternalForm().endsWith("/")) {
+      baseUrl = new URL(baseUrl.toExternalForm() + "/");
+      proxy = proxies.get(baseUrl.toURI());
+    }
+    assertNotNull("No proxy found for " + baseUrl + "!", proxy);
+    return proxy;
+  }
+
+  protected int getNextAvailablePort() throws Exception {
+    int port = -1;
+    try (ServerSocket s = new ServerSocket(0)) {
+      port = s.getLocalPort();
+    }
+    return port;
+  }
+
   private File getRelativeSolrHomePath(File solrHome) {
     String path = SolrResourceLoader.normalizeDir(new File(".").getAbsolutePath());
     String base = new File(solrHome.getPath()).getAbsolutePath();
@@ -1467,6 +1544,13 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes
     System.clearProperty("zkHost");
     System.clearProperty("numShards");
+
+    // close socket proxies after super.tearDown
+    if (!proxies.isEmpty()) {
+      for (SocketProxy proxy : proxies.values()) {
+        proxy.close();
+      }
+    }
   }
 
   @Override
@@ -1860,7 +1944,23 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes
   }
 
   protected String printClusterStateInfo() throws Exception {
+    return printClusterStateInfo(null);
+  }
+
+  protected String printClusterStateInfo(String collection) throws Exception {
     cloudClient.getZkStateReader().updateClusterState(true);
-    return String.valueOf(cloudClient.getZkStateReader().getClusterState());
-  }
+    String cs = null;
+    ClusterState clusterState = cloudClient.getZkStateReader().getClusterState();
+    if (collection != null) {
+      cs = clusterState.getCollection(collection).toString();
+    } else {
+      Map map = new HashMap();
+      for (String coll : clusterState.getCollections())
+        map.put(coll, clusterState.getCollection(coll));
+      CharArr out = new CharArr();
+      new JSONWriter(out, 2).write(map);
+      cs = out.toString();
+    }
+    return cs;
+  }
 }
diff --git a/solr/core/src/test/org/apache/solr/cloud/SocketProxy.java b/solr/test-framework/src/java/org/apache/solr/cloud/SocketProxy.java
similarity index 100%
rename from solr/core/src/test/org/apache/solr/cloud/SocketProxy.java
rename to solr/test-framework/src/java/org/apache/solr/cloud/SocketProxy.java