diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt index 012b7f5d090..6fc10c6c397 100644 --- a/solr/CHANGES.txt +++ b/solr/CHANGES.txt @@ -210,6 +210,9 @@ Bug Fixes * SOLR-8058: Fix the exclusion filter so that collections that start with js, css, img, tpl can be accessed. (Upayavira, Steve Rowe, Anshum Gupta) +* SOLR-8069: Ensure that only the valid ZooKeeper registered leader can put a replica into Leader + Initiated Recovery. (Mark Miller, Jessica Cheng, Anshum Gupta) + Optimizations ---------------------- diff --git a/solr/core/src/java/org/apache/solr/cloud/CloudDescriptor.java b/solr/core/src/java/org/apache/solr/cloud/CloudDescriptor.java index fa767646967..bf1ef354a0c 100644 --- a/solr/core/src/java/org/apache/solr/cloud/CloudDescriptor.java +++ b/solr/core/src/java/org/apache/solr/cloud/CloudDescriptor.java @@ -43,7 +43,7 @@ public class CloudDescriptor { volatile Slice.State shardState = Slice.State.ACTIVE; volatile String shardParent = null; - volatile boolean isLeader = false; + private volatile boolean isLeader = false; volatile Replica.State lastPublished = Replica.State.ACTIVE; public static final String NUM_SHARDS = "numShards"; diff --git a/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java b/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java index 6532a8b1ce2..0b8ea14c8a3 100644 --- a/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java +++ b/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java @@ -467,7 +467,7 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase { } zkController.ensureReplicaInLeaderInitiatedRecovery(cc, - collection, shardId, coreNodeProps, coreNodeName, + collection, shardId, coreNodeProps, core.getCoreDescriptor(), false /* forcePublishState */); } } diff --git a/solr/core/src/java/org/apache/solr/cloud/LeaderInitiatedRecoveryThread.java b/solr/core/src/java/org/apache/solr/cloud/LeaderInitiatedRecoveryThread.java index a67e8bdeea6..1321adb6d0b 100644 --- a/solr/core/src/java/org/apache/solr/cloud/LeaderInitiatedRecoveryThread.java +++ b/solr/core/src/java/org/apache/solr/cloud/LeaderInitiatedRecoveryThread.java @@ -13,6 +13,7 @@ import org.apache.solr.common.cloud.ZkStateReader; import org.apache.solr.common.params.CoreAdminParams.CoreAdminAction; import org.apache.solr.common.util.Utils; import org.apache.solr.core.CoreContainer; +import org.apache.solr.core.CoreDescriptor; import org.apache.zookeeper.KeeperException; import org.apache.solr.util.RTimer; import org.slf4j.Logger; @@ -54,7 +55,7 @@ public class LeaderInitiatedRecoveryThread extends Thread { protected String shardId; protected ZkCoreNodeProps nodeProps; protected int maxTries; - protected String leaderCoreNodeName; + private CoreDescriptor leaderCd; public LeaderInitiatedRecoveryThread(ZkController zkController, CoreContainer cc, @@ -62,7 +63,7 @@ public class LeaderInitiatedRecoveryThread extends Thread { String shardId, ZkCoreNodeProps nodeProps, int maxTries, - String leaderCoreNodeName) + CoreDescriptor leaderCd) { super("LeaderInitiatedRecoveryThread-"+nodeProps.getCoreName()); this.zkController = zkController; @@ -71,8 +72,7 @@ public class LeaderInitiatedRecoveryThread extends Thread { this.shardId = shardId; this.nodeProps = nodeProps; this.maxTries = maxTries; - this.leaderCoreNodeName = leaderCoreNodeName; - + this.leaderCd = leaderCd; setDaemon(true); } @@ -171,7 +171,7 @@ public class LeaderInitiatedRecoveryThread extends Thread { protected void updateLIRState(String replicaCoreNodeName) { zkController.updateLeaderInitiatedRecoveryState(collection, shardId, - replicaCoreNodeName, Replica.State.DOWN, leaderCoreNodeName, true); + replicaCoreNodeName, Replica.State.DOWN, leaderCd, true); } protected void sendRecoveryCommandWithRetry() throws Exception { @@ -257,6 +257,7 @@ public class LeaderInitiatedRecoveryThread extends Thread { break; } + String leaderCoreNodeName = leaderCd.getCloudDescriptor().getCoreNodeName(); // stop trying if I'm no longer the leader if (leaderCoreNodeName != null && collection != null) { String leaderCoreNodeNameFromZk = null; @@ -273,6 +274,13 @@ public class LeaderInitiatedRecoveryThread extends Thread { continueTrying = false; break; } + if (!leaderCd.getCloudDescriptor().isLeader()) { + log.warn("Stop trying to send recovery command to downed replica core=" + coreNeedingRecovery + + ",coreNodeName=" + replicaCoreNodeName + " on " + replicaNodeName + " because " + + leaderCoreNodeName + " is no longer the leader!"); + continueTrying = false; + break; + } } // additional safeguard against the replica trying to be in the active state @@ -297,9 +305,9 @@ public class LeaderInitiatedRecoveryThread extends Thread { " on node "+replicaNodeName+" ack'd the leader initiated recovery state, " + "no need to keep trying to send recovery command"); } else { - String leaderCoreNodeName = zkStateReader.getLeaderRetry(collection, shardId, 5000).getName(); + String lcnn = zkStateReader.getLeaderRetry(collection, shardId, 5000).getName(); List replicaProps = - zkStateReader.getReplicaProps(collection, shardId, leaderCoreNodeName); + zkStateReader.getReplicaProps(collection, shardId, lcnn); if (replicaProps != null && replicaProps.size() > 0) { for (ZkCoreNodeProps prop : replicaProps) { final Replica replica = (Replica) prop.getNodeProps(); diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java index fa23ce5aaef..178fe02da6b 100644 --- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java +++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java @@ -1205,7 +1205,7 @@ public final class ZkController { if (state == Replica.State.ACTIVE) { // trying to become active, so leader-initiated state must be recovering if (lirState == Replica.State.RECOVERING) { - updateLeaderInitiatedRecoveryState(collection, shardId, coreNodeName, Replica.State.ACTIVE, null, true); + updateLeaderInitiatedRecoveryState(collection, shardId, coreNodeName, Replica.State.ACTIVE, cd, true); } else if (lirState == Replica.State.DOWN) { throw new SolrException(ErrorCode.INVALID_STATE, "Cannot publish state of core '" + cd.getName() + "' as active without recovering first!"); @@ -1213,7 +1213,7 @@ public final class ZkController { } else if (state == Replica.State.RECOVERING) { // if it is currently DOWN, then trying to enter into recovering state is good if (lirState == Replica.State.DOWN) { - updateLeaderInitiatedRecoveryState(collection, shardId, coreNodeName, Replica.State.RECOVERING, null, true); + updateLeaderInitiatedRecoveryState(collection, shardId, coreNodeName, Replica.State.RECOVERING, cd, true); } } } @@ -1981,7 +1981,7 @@ public final class ZkController { public boolean ensureReplicaInLeaderInitiatedRecovery( final CoreContainer container, final String collection, final String shardId, final ZkCoreNodeProps replicaCoreProps, - String leaderCoreNodeName, boolean forcePublishState) + CoreDescriptor leaderCd, boolean forcePublishState) throws KeeperException, InterruptedException { final String replicaUrl = replicaCoreProps.getCoreUrl(); @@ -2020,7 +2020,7 @@ public final class ZkController { shardId, replicaCoreProps, 120, - leaderCoreNodeName); // core node name of current leader + leaderCd); ExecutorService executor = container.getUpdateShardHandler().getUpdateExecutor(); try { MDC.put("DistributedUpdateProcessor.replicaUrlToRecover", replicaCoreProps.getCoreUrl()); @@ -2115,14 +2115,19 @@ public final class ZkController { } public void updateLeaderInitiatedRecoveryState(String collection, String shardId, String coreNodeName, - Replica.State state, String leaderCoreNodeName, boolean retryOnConnLoss) { + Replica.State state, CoreDescriptor leaderCd, boolean retryOnConnLoss) { if (collection == null || shardId == null || coreNodeName == null) { log.warn("Cannot set leader-initiated recovery state znode to " + state.toString() + " using: collection=" + collection + "; shardId=" + shardId + "; coreNodeName=" + coreNodeName); return; // if we don't have complete data about a core in cloud mode, do nothing } + + assert leaderCd != null; + assert leaderCd.getCloudDescriptor() != null; + String leaderCoreNodeName = leaderCd.getCloudDescriptor().getCoreNodeName(); + String znodePath = getLeaderInitiatedRecoveryZnodePath(collection, shardId, coreNodeName); if (state == Replica.State.ACTIVE) { @@ -2158,7 +2163,7 @@ public final class ZkController { try { if (state == Replica.State.DOWN) { - markShardAsDownIfLeader(collection, shardId, leaderCoreNodeName, znodePath, znodeData, retryOnConnLoss); + markShardAsDownIfLeader(collection, shardId, leaderCd, znodePath, znodeData, retryOnConnLoss); } else { // must retry on conn loss otherwise future election attempts may assume wrong LIR state if (zkClient.exists(znodePath, true)) { @@ -2184,18 +2189,36 @@ public final class ZkController { * in ZK. This ensures that a long running network partition caused by GC etc * doesn't let us mark a node as down *after* we've already lost our session */ - private void markShardAsDownIfLeader(String collection, String shardId, String leaderCoreNodeName, + private void markShardAsDownIfLeader(String collection, String shardId, CoreDescriptor leaderCd, String znodePath, byte[] znodeData, boolean retryOnConnLoss) throws KeeperException, InterruptedException { - String leaderSeqPath = getLeaderSeqPath(collection, leaderCoreNodeName); - if (leaderSeqPath == null) { - throw new NotLeaderException(ErrorCode.SERVER_ERROR, - "Failed to update data to 'down' for znode: " + znodePath + - " because the zookeeper leader sequence for leader: " + leaderCoreNodeName + " is null"); + + + if (!leaderCd.getCloudDescriptor().isLeader()) { + log.info("No longer leader, aborting attempt to mark shard down as part of LIR"); + throw new NotLeaderException(ErrorCode.SERVER_ERROR, "Locally, we do not think we are the leader."); } + + ContextKey key = new ContextKey(collection, leaderCd.getCloudDescriptor().getCoreNodeName()); + ElectionContext context = electionContexts.get(key); + + // we make sure we locally think we are the leader before and after getting the context - then + // we only try zk if we still think we are the leader and have our leader context + if (context == null || !leaderCd.getCloudDescriptor().isLeader()) { + log.info("No longer leader, aborting attempt to mark shard down as part of LIR"); + throw new NotLeaderException(ErrorCode.SERVER_ERROR, "Locally, we do not think we are the leader."); + } + + // we think we are the leader - get the expected shard leader version + // we use this version and multi to ensure *only* the current zk registered leader + // for a shard can put a replica into LIR + + Integer leaderZkNodeParentVersion = ((ShardLeaderElectionContextBase)context).leaderZkNodeParentVersion; + + // TODO: should we do this optimistically to avoid races? if (zkClient.exists(znodePath, retryOnConnLoss)) { List ops = new ArrayList<>(2); - ops.add(Op.check(leaderSeqPath, -1)); // version doesn't matter, the seq path is unique + ops.add(Op.check(new org.apache.hadoop.fs.Path(((ShardLeaderElectionContextBase)context).leaderPath).getParent().toString(), leaderZkNodeParentVersion)); ops.add(Op.setData(znodePath, znodeData, -1)); zkClient.multi(ops, retryOnConnLoss); } else { @@ -2205,8 +2228,10 @@ public final class ZkController { } catch (KeeperException.NodeExistsException nee) { // if it exists, that's great! } + + // we only create the entry if the context we are using is registered as the current leader in ZK List ops = new ArrayList<>(2); - ops.add(Op.check(leaderSeqPath, -1)); // version doesn't matter, the seq path is unique + ops.add(Op.check(new org.apache.hadoop.fs.Path(((ShardLeaderElectionContextBase)context).leaderPath).getParent().toString(), leaderZkNodeParentVersion)); ops.add(Op.create(znodePath, znodeData, zkClient.getZkACLProvider().getACLsToAdd(znodePath), CreateMode.PERSISTENT)); zkClient.multi(ops, retryOnConnLoss); diff --git a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java index 46dd5b8d787..3228926d676 100644 --- a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java +++ b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java @@ -886,7 +886,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor { collection, shardId, stdNode.getNodeProps(), - leaderCoreNodeName, + req.getCore().getCoreDescriptor(), false /* forcePublishState */ ); } catch (Exception exc) { diff --git a/solr/core/src/test/org/apache/solr/cloud/HttpPartitionTest.java b/solr/core/src/test/org/apache/solr/cloud/HttpPartitionTest.java index 129d762e9a6..cfe4b5efca2 100644 --- a/solr/core/src/test/org/apache/solr/cloud/HttpPartitionTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/HttpPartitionTest.java @@ -17,6 +17,19 @@ package org.apache.solr.cloud; * limitations under the License. */ +import java.io.File; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.TimeUnit; + import org.apache.lucene.util.LuceneTestCase.Slow; import org.apache.solr.JSONTestUtil; import org.apache.solr.SolrTestCaseJ4.SuppressSSL; @@ -40,23 +53,12 @@ import org.apache.solr.core.CoreContainer; import org.apache.solr.core.SolrCore; import org.apache.solr.servlet.SolrDispatchFilter; import org.apache.solr.update.UpdateLog; +import org.apache.solr.util.MockCoreContainer.MockCoreDescriptor; import org.apache.solr.util.RTimer; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.File; -import java.io.IOException; -import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.TimeUnit; - /** * Simulates HTTP partitions between a leader and replica but the replica does * not lose its ZooKeeper connection. @@ -145,7 +147,22 @@ public class HttpPartitionTest extends AbstractFullDistribZkTestBase { ZkCoreNodeProps replicaCoreNodeProps = new ZkCoreNodeProps(notLeader); String replicaUrl = replicaCoreNodeProps.getCoreUrl(); - zkController.updateLeaderInitiatedRecoveryState(testCollectionName, shardId, notLeader.getName(), Replica.State.DOWN, leader.getName(), true); + MockCoreDescriptor cd = new MockCoreDescriptor() { + public CloudDescriptor getCloudDescriptor() { + return new CloudDescriptor(leader.getStr(ZkStateReader.CORE_NAME_PROP), new Properties(), this) { + @Override + public String getCoreNodeName() { + return leader.getName(); + } + @Override + public boolean isLeader() { + return true; + } + }; + } + }; + + zkController.updateLeaderInitiatedRecoveryState(testCollectionName, shardId, notLeader.getName(), Replica.State.DOWN, cd, true); Map lirStateMap = zkController.getLeaderInitiatedRecoveryStateObject(testCollectionName, shardId, notLeader.getName()); assertNotNull(lirStateMap); assertSame(Replica.State.DOWN, Replica.State.getState((String) lirStateMap.get(ZkStateReader.STATE_PROP))); diff --git a/solr/core/src/test/org/apache/solr/cloud/TestLeaderInitiatedRecoveryThread.java b/solr/core/src/test/org/apache/solr/cloud/TestLeaderInitiatedRecoveryThread.java index b5404eb7227..4124d94ebdf 100644 --- a/solr/core/src/test/org/apache/solr/cloud/TestLeaderInitiatedRecoveryThread.java +++ b/solr/core/src/test/org/apache/solr/cloud/TestLeaderInitiatedRecoveryThread.java @@ -1,5 +1,7 @@ package org.apache.solr.cloud; +import java.util.Properties; + /* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with @@ -23,8 +25,10 @@ import org.apache.solr.SolrTestCaseJ4; import org.apache.solr.common.SolrException; import org.apache.solr.common.cloud.Replica; import org.apache.solr.common.cloud.ZkCoreNodeProps; +import org.apache.solr.common.cloud.ZkStateReader; import org.apache.solr.servlet.SolrDispatchFilter; import org.apache.solr.util.TimeOut; +import org.apache.solr.util.MockCoreContainer.MockCoreDescriptor; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.data.Stat; @@ -57,13 +61,29 @@ public class TestLeaderInitiatedRecoveryThread extends AbstractFullDistribZkTest assertNotNull(notLeader); Replica replica = cloudClient.getZkStateReader().getClusterState().getReplica(DEFAULT_COLLECTION, notLeader.coreNodeName); ZkCoreNodeProps replicaCoreNodeProps = new ZkCoreNodeProps(replica); + + MockCoreDescriptor cd = new MockCoreDescriptor() { + public CloudDescriptor getCloudDescriptor() { + return new CloudDescriptor(shardToLeaderJetty.get(SHARD1).info.getStr(ZkStateReader.CORE_NAME_PROP), new Properties(), this) { + @Override + public String getCoreNodeName() { + return shardToLeaderJetty.get(SHARD1).info.getStr(ZkStateReader.CORE_NODE_NAME_PROP); + } + @Override + public boolean isLeader() { + return true; + } + }; + } + }; /* 1. Test that publishDownState throws exception when zkController.isReplicaInRecoveryHandling == false */ try { + LeaderInitiatedRecoveryThread thread = new LeaderInitiatedRecoveryThread(zkController, filter.getCores(), - DEFAULT_COLLECTION, SHARD1, replicaCoreNodeProps, 1, leaderCoreNodeName); + DEFAULT_COLLECTION, SHARD1, replicaCoreNodeProps, 1, cd); assertFalse(zkController.isReplicaInRecoveryHandling(replicaCoreNodeProps.getCoreUrl())); thread.run(); fail("publishDownState should not have succeeded because replica url is not marked in leader initiated recovery in ZkController"); @@ -76,7 +96,7 @@ public class TestLeaderInitiatedRecoveryThread extends AbstractFullDistribZkTest 2. Test that a non-live replica cannot be put into LIR or down state */ LeaderInitiatedRecoveryThread thread = new LeaderInitiatedRecoveryThread(zkController, filter.getCores(), - DEFAULT_COLLECTION, SHARD1, replicaCoreNodeProps, 1, leaderCoreNodeName); + DEFAULT_COLLECTION, SHARD1, replicaCoreNodeProps, 1, cd); // kill the replica int children = cloudClient.getZkStateReader().getZkClient().getChildren("/live_nodes", null, true).size(); ChaosMonkey.stop(notLeader.jetty); @@ -107,7 +127,7 @@ public class TestLeaderInitiatedRecoveryThread extends AbstractFullDistribZkTest waitForRecoveriesToFinish(true); thread = new LeaderInitiatedRecoveryThread(zkController, filter.getCores(), - DEFAULT_COLLECTION, SHARD1, replicaCoreNodeProps, 1, leaderCoreNodeName) { + DEFAULT_COLLECTION, SHARD1, replicaCoreNodeProps, 1, cd) { @Override protected void updateLIRState(String replicaCoreNodeName) { throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "", new KeeperException.ConnectionLossException()); @@ -122,7 +142,7 @@ public class TestLeaderInitiatedRecoveryThread extends AbstractFullDistribZkTest 4. Test that if ZK connection loss or session expired then thread should not attempt to publish down state even if forcePublish=true */ thread = new LeaderInitiatedRecoveryThread(zkController, filter.getCores(), - DEFAULT_COLLECTION, SHARD1, replicaCoreNodeProps, 1, leaderCoreNodeName) { + DEFAULT_COLLECTION, SHARD1, replicaCoreNodeProps, 1, cd) { @Override protected void updateLIRState(String replicaCoreNodeName) { throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "", new KeeperException.SessionExpiredException()); @@ -137,7 +157,7 @@ public class TestLeaderInitiatedRecoveryThread extends AbstractFullDistribZkTest 5. Test that any exception other then ZK connection loss or session expired should publish down state only if forcePublish=true */ thread = new LeaderInitiatedRecoveryThread(zkController, filter.getCores(), - DEFAULT_COLLECTION, SHARD1, replicaCoreNodeProps, 1, leaderCoreNodeName) { + DEFAULT_COLLECTION, SHARD1, replicaCoreNodeProps, 1, cd) { @Override protected void updateLIRState(String replicaCoreNodeName) { throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "bogus exception"); @@ -171,11 +191,12 @@ public class TestLeaderInitiatedRecoveryThread extends AbstractFullDistribZkTest /* 6. Test that non-leader cannot set LIR nodes */ + filter = (SolrDispatchFilter) notLeader.jetty.getDispatchFilter().getFilter(); zkController = filter.getCores().getZkController(); thread = new LeaderInitiatedRecoveryThread(zkController, filter.getCores(), - DEFAULT_COLLECTION, SHARD1, replicaCoreNodeProps, 1, leaderCoreNodeName) { + DEFAULT_COLLECTION, SHARD1, replicaCoreNodeProps, 1, filter.getCores().getCores().iterator().next().getCoreDescriptor()) { @Override protected void updateLIRState(String replicaCoreNodeName) { try { @@ -197,7 +218,7 @@ public class TestLeaderInitiatedRecoveryThread extends AbstractFullDistribZkTest filter = (SolrDispatchFilter) leaderRunner.jetty.getDispatchFilter().getFilter(); zkController = filter.getCores().getZkController(); thread = new LeaderInitiatedRecoveryThread(zkController, filter.getCores(), - DEFAULT_COLLECTION, SHARD1, replicaCoreNodeProps, 1, leaderCoreNodeName); + DEFAULT_COLLECTION, SHARD1, replicaCoreNodeProps, 1, filter.getCores().getCores().iterator().next().getCoreDescriptor()); thread.publishDownState(replicaCoreNodeProps.getCoreName(), replica.getName(), replica.getNodeName(), replicaCoreNodeProps.getCoreUrl(), false); timeOut = new TimeOut(30, TimeUnit.SECONDS); while (!timeOut.hasTimedOut()) {