diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt index 50eca9c4d89..0831a92a6e9 100644 --- a/solr/CHANGES.txt +++ b/solr/CHANGES.txt @@ -200,6 +200,9 @@ Bug Fixes * SOLR-6511: Fencepost error in LeaderInitiatedRecoveryThread (Timothy Potter) +* SOLR-6530: Commits under network partitions can put any node in down state. + (Ramkumar Aiyengar, Alan Woodward, Mark Miller, shalin) + Other Changes ---------------------- diff --git a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java index 4d261c1edba..e5909d33ee1 100644 --- a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java +++ b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java @@ -808,6 +808,11 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor { if (phase != DistribPhase.FROMLEADER) continue; // don't have non-leaders try to recovery other nodes + // commits are special -- they can run on any node irrespective of whether it is a leader or not + // we don't want to run recovery on a node which missed a commit command + if (error.req.uReq.getParams().get(COMMIT_END_POINT) != null) + continue; + final String replicaUrl = error.req.node.getUrl(); // if the remote replica failed the request because of leader change (SOLR-6511), then fail the request @@ -839,7 +844,17 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor { " " + shardId + " before putting " + replicaUrl + " into leader-initiated recovery due to: " + exc); } - if (cloudDesc.getCoreNodeName().equals(leaderCoreNodeName)) { + List myReplicas = zkController.getZkStateReader().getReplicaProps(collection, + cloudDesc.getShardId(), cloudDesc.getCoreNodeName()); + boolean foundErrorNodeInReplicaList = false; + for (ZkCoreNodeProps replicaProp : myReplicas) { + if (((Replica) replicaProp.getNodeProps()).getName().equals(((Replica)stdNode.getNodeProps().getNodeProps()).getName())) { + foundErrorNodeInReplicaList = true; + break; + } + } + + if (cloudDesc.getCoreNodeName().equals(leaderCoreNodeName) && foundErrorNodeInReplicaList) { try { // if false, then the node is probably not "live" anymore sendRecoveryCommand = @@ -866,10 +881,16 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor { // will go ahead and try to send the recovery command once after this error } } else { - // not the leader anymore maybe? + // not the leader anymore maybe or the error'd node is not my replica? sendRecoveryCommand = false; - log.warn("Core "+cloudDesc.getCoreNodeName()+" is no longer the leader for "+collection+" "+ - shardId+", no request recovery command will be sent!"); + if (!foundErrorNodeInReplicaList) { + log.warn("Core "+cloudDesc.getCoreNodeName()+" belonging to "+collection+" "+ + shardId+", does not have error'd node " + stdNode.getNodeProps().getCoreUrl() + " as a replica. " + + "No request recovery command will be sent!"); + } else { + log.warn("Core "+cloudDesc.getCoreNodeName()+" is no longer the leader for "+collection+" "+ + shardId+", no request recovery command will be sent!"); + } } } // else not a StdNode, recovery command still gets sent once diff --git a/solr/core/src/test/org/apache/solr/cloud/LeaderInitiatedRecoveryOnCommitTest.java b/solr/core/src/test/org/apache/solr/cloud/LeaderInitiatedRecoveryOnCommitTest.java new file mode 100644 index 00000000000..aee65732ae0 --- /dev/null +++ b/solr/core/src/test/org/apache/solr/cloud/LeaderInitiatedRecoveryOnCommitTest.java @@ -0,0 +1,163 @@ +package org.apache.solr.cloud; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import java.io.File; +import java.util.List; + +import org.apache.solr.client.solrj.embedded.JettySolrRunner; +import org.apache.solr.client.solrj.impl.HttpSolrServer; +import org.apache.solr.client.solrj.request.CollectionAdminRequest; +import org.apache.solr.common.cloud.Replica; +import org.apache.solr.common.cloud.ZkCoreNodeProps; +import org.junit.After; +import org.junit.Before; + +public class LeaderInitiatedRecoveryOnCommitTest extends BasicDistributedZkTest { + + private static final long sleepMsBeforeHealPartition = 2000L; + + public LeaderInitiatedRecoveryOnCommitTest() { + super(); + sliceCount = 1; + shardCount = 4; + } + + @Before + @Override + public void setUp() throws Exception { + super.setUp(); + System.setProperty("numShards", Integer.toString(sliceCount)); + } + + @Override + @After + public void tearDown() throws Exception { + System.clearProperty("numShards"); + + try { + super.tearDown(); + } catch (Exception exc) { + } + + resetExceptionIgnores(); + + // close socket proxies after super.tearDown + if (!proxies.isEmpty()) { + for (SocketProxy proxy : proxies.values()) { + proxy.close(); + } + } + } + + @Override + public void doTest() throws Exception { + oneShardTest(); + multiShardTest(); + } + + private void multiShardTest() throws Exception { + // create a collection that has 1 shard and 3 replicas + String testCollectionName = "c8n_2x2_commits"; + createCollection(testCollectionName, 2, 2, 1); + cloudClient.setDefaultCollection(testCollectionName); + + List notLeaders = + ensureAllReplicasAreActive(testCollectionName, "shard1", 2, 2, 30); + assertTrue("Expected 1 replicas for collection " + testCollectionName + + " but found " + notLeaders.size() + "; clusterState: " + + printClusterStateInfo(), + notLeaders.size() == 1); + + // let's put the leader in it's own partition, no replicas can contact it now + Replica leader = cloudClient.getZkStateReader().getLeaderRetry(testCollectionName, "shard1"); + SocketProxy leaderProxy = getProxyForReplica(leader); + leaderProxy.close(); + + // let's find the leader of shard2 and ask him to commit + Replica shard2Leader = cloudClient.getZkStateReader().getLeaderRetry(testCollectionName, "shard2"); + HttpSolrServer server = new HttpSolrServer(ZkCoreNodeProps.getCoreUrl(shard2Leader.getStr("base_url"), shard2Leader.getStr("core"))); + server.commit(); + + Thread.sleep(sleepMsBeforeHealPartition); + + cloudClient.getZkStateReader().updateClusterState(true); // get the latest state + leader = cloudClient.getZkStateReader().getLeaderRetry(testCollectionName, "shard1"); + assertEquals("Leader was not active", "active", leader.getStr("state")); + + // try to clean up + try { + CollectionAdminRequest req = new CollectionAdminRequest.Delete(); + req.setCollectionName(testCollectionName); + req.process(cloudClient); + } catch (Exception e) { + // don't fail the test + log.warn("Could not delete collection {} after test completed", testCollectionName); + } + } + + private void oneShardTest() throws Exception { + // create a collection that has 1 shard and 3 replicas + String testCollectionName = "c8n_1x3_commits"; + createCollection(testCollectionName, 1, 3, 1); + cloudClient.setDefaultCollection(testCollectionName); + + List notLeaders = + ensureAllReplicasAreActive(testCollectionName, "shard1", 1, 3, 30); + assertTrue("Expected 2 replicas for collection " + testCollectionName + + " but found " + notLeaders.size() + "; clusterState: " + + printClusterStateInfo(), + notLeaders.size() == 2); + + // let's put the leader in it's own partition, no replicas can contact it now + Replica leader = cloudClient.getZkStateReader().getLeaderRetry(testCollectionName, "shard1"); + SocketProxy leaderProxy = getProxyForReplica(leader); + leaderProxy.close(); + + Replica replica = notLeaders.get(0); + HttpSolrServer server = new HttpSolrServer(ZkCoreNodeProps.getCoreUrl(replica.getStr("base_url"), replica.getStr("core"))); + server.commit(); + + Thread.sleep(sleepMsBeforeHealPartition); + + cloudClient.getZkStateReader().updateClusterState(true); // get the latest state + leader = cloudClient.getZkStateReader().getLeaderRetry(testCollectionName, "shard1"); + assertEquals("Leader was not active", "active", leader.getStr("state")); + + // try to clean up + try { + CollectionAdminRequest req = new CollectionAdminRequest.Delete(); + req.setCollectionName(testCollectionName); + req.process(cloudClient); + } catch (Exception e) { + // don't fail the test + log.warn("Could not delete collection {} after test completed", testCollectionName); + } + } + + /** + * Overrides the parent implementation to install a SocketProxy in-front of the Jetty server. + */ + @Override + public JettySolrRunner createJetty(File solrHome, String dataDir, + String shardList, String solrConfigOverride, String schemaOverride) + throws Exception { + return createProxiedJetty(solrHome, dataDir, shardList, solrConfigOverride, schemaOverride); + } + +} \ No newline at end of file