SOLR-8069: Ensure that only the valid ZooKeeper registered leader can put a replica into Leader Initiated Recovery.

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1704836 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Mark Robert Miller 2015-09-23 12:36:02 +00:00
parent 69e5f9bdf4
commit bd464ee9ed
8 changed files with 118 additions and 44 deletions

View File

@ -210,6 +210,9 @@ Bug Fixes
* SOLR-8058: Fix the exclusion filter so that collections that start with js, css, img, tpl
can be accessed. (Upayavira, Steve Rowe, Anshum Gupta)
* SOLR-8069: Ensure that only the valid ZooKeeper registered leader can put a replica into Leader
Initiated Recovery. (Mark Miller, Jessica Cheng, Anshum Gupta)
Optimizations
----------------------

View File

@ -43,7 +43,7 @@ public class CloudDescriptor {
volatile Slice.State shardState = Slice.State.ACTIVE;
volatile String shardParent = null;
volatile boolean isLeader = false;
private volatile boolean isLeader = false;
volatile Replica.State lastPublished = Replica.State.ACTIVE;
public static final String NUM_SHARDS = "numShards";

View File

@ -467,7 +467,7 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
}
zkController.ensureReplicaInLeaderInitiatedRecovery(cc,
collection, shardId, coreNodeProps, coreNodeName,
collection, shardId, coreNodeProps, core.getCoreDescriptor(),
false /* forcePublishState */);
}
}

View File

@ -13,6 +13,7 @@ import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.CoreAdminParams.CoreAdminAction;
import org.apache.solr.common.util.Utils;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.core.CoreDescriptor;
import org.apache.zookeeper.KeeperException;
import org.apache.solr.util.RTimer;
import org.slf4j.Logger;
@ -54,7 +55,7 @@ public class LeaderInitiatedRecoveryThread extends Thread {
protected String shardId;
protected ZkCoreNodeProps nodeProps;
protected int maxTries;
protected String leaderCoreNodeName;
private CoreDescriptor leaderCd;
public LeaderInitiatedRecoveryThread(ZkController zkController,
CoreContainer cc,
@ -62,7 +63,7 @@ public class LeaderInitiatedRecoveryThread extends Thread {
String shardId,
ZkCoreNodeProps nodeProps,
int maxTries,
String leaderCoreNodeName)
CoreDescriptor leaderCd)
{
super("LeaderInitiatedRecoveryThread-"+nodeProps.getCoreName());
this.zkController = zkController;
@ -71,8 +72,7 @@ public class LeaderInitiatedRecoveryThread extends Thread {
this.shardId = shardId;
this.nodeProps = nodeProps;
this.maxTries = maxTries;
this.leaderCoreNodeName = leaderCoreNodeName;
this.leaderCd = leaderCd;
setDaemon(true);
}
@ -171,7 +171,7 @@ public class LeaderInitiatedRecoveryThread extends Thread {
protected void updateLIRState(String replicaCoreNodeName) {
zkController.updateLeaderInitiatedRecoveryState(collection,
shardId,
replicaCoreNodeName, Replica.State.DOWN, leaderCoreNodeName, true);
replicaCoreNodeName, Replica.State.DOWN, leaderCd, true);
}
protected void sendRecoveryCommandWithRetry() throws Exception {
@ -257,6 +257,7 @@ public class LeaderInitiatedRecoveryThread extends Thread {
break;
}
String leaderCoreNodeName = leaderCd.getCloudDescriptor().getCoreNodeName();
// stop trying if I'm no longer the leader
if (leaderCoreNodeName != null && collection != null) {
String leaderCoreNodeNameFromZk = null;
@ -273,6 +274,13 @@ public class LeaderInitiatedRecoveryThread extends Thread {
continueTrying = false;
break;
}
if (!leaderCd.getCloudDescriptor().isLeader()) {
log.warn("Stop trying to send recovery command to downed replica core=" + coreNeedingRecovery +
",coreNodeName=" + replicaCoreNodeName + " on " + replicaNodeName + " because " +
leaderCoreNodeName + " is no longer the leader!");
continueTrying = false;
break;
}
}
// additional safeguard against the replica trying to be in the active state
@ -297,9 +305,9 @@ public class LeaderInitiatedRecoveryThread extends Thread {
" on node "+replicaNodeName+" ack'd the leader initiated recovery state, "
+ "no need to keep trying to send recovery command");
} else {
String leaderCoreNodeName = zkStateReader.getLeaderRetry(collection, shardId, 5000).getName();
String lcnn = zkStateReader.getLeaderRetry(collection, shardId, 5000).getName();
List<ZkCoreNodeProps> replicaProps =
zkStateReader.getReplicaProps(collection, shardId, leaderCoreNodeName);
zkStateReader.getReplicaProps(collection, shardId, lcnn);
if (replicaProps != null && replicaProps.size() > 0) {
for (ZkCoreNodeProps prop : replicaProps) {
final Replica replica = (Replica) prop.getNodeProps();

View File

@ -1205,7 +1205,7 @@ public final class ZkController {
if (state == Replica.State.ACTIVE) {
// trying to become active, so leader-initiated state must be recovering
if (lirState == Replica.State.RECOVERING) {
updateLeaderInitiatedRecoveryState(collection, shardId, coreNodeName, Replica.State.ACTIVE, null, true);
updateLeaderInitiatedRecoveryState(collection, shardId, coreNodeName, Replica.State.ACTIVE, cd, true);
} else if (lirState == Replica.State.DOWN) {
throw new SolrException(ErrorCode.INVALID_STATE,
"Cannot publish state of core '" + cd.getName() + "' as active without recovering first!");
@ -1213,7 +1213,7 @@ public final class ZkController {
} else if (state == Replica.State.RECOVERING) {
// if it is currently DOWN, then trying to enter into recovering state is good
if (lirState == Replica.State.DOWN) {
updateLeaderInitiatedRecoveryState(collection, shardId, coreNodeName, Replica.State.RECOVERING, null, true);
updateLeaderInitiatedRecoveryState(collection, shardId, coreNodeName, Replica.State.RECOVERING, cd, true);
}
}
}
@ -1981,7 +1981,7 @@ public final class ZkController {
public boolean ensureReplicaInLeaderInitiatedRecovery(
final CoreContainer container,
final String collection, final String shardId, final ZkCoreNodeProps replicaCoreProps,
String leaderCoreNodeName, boolean forcePublishState)
CoreDescriptor leaderCd, boolean forcePublishState)
throws KeeperException, InterruptedException {
final String replicaUrl = replicaCoreProps.getCoreUrl();
@ -2020,7 +2020,7 @@ public final class ZkController {
shardId,
replicaCoreProps,
120,
leaderCoreNodeName); // core node name of current leader
leaderCd);
ExecutorService executor = container.getUpdateShardHandler().getUpdateExecutor();
try {
MDC.put("DistributedUpdateProcessor.replicaUrlToRecover", replicaCoreProps.getCoreUrl());
@ -2115,7 +2115,7 @@ public final class ZkController {
}
public void updateLeaderInitiatedRecoveryState(String collection, String shardId, String coreNodeName,
Replica.State state, String leaderCoreNodeName, boolean retryOnConnLoss) {
Replica.State state, CoreDescriptor leaderCd, boolean retryOnConnLoss) {
if (collection == null || shardId == null || coreNodeName == null) {
log.warn("Cannot set leader-initiated recovery state znode to "
+ state.toString() + " using: collection=" + collection
@ -2123,6 +2123,11 @@ public final class ZkController {
return; // if we don't have complete data about a core in cloud mode, do nothing
}
assert leaderCd != null;
assert leaderCd.getCloudDescriptor() != null;
String leaderCoreNodeName = leaderCd.getCloudDescriptor().getCoreNodeName();
String znodePath = getLeaderInitiatedRecoveryZnodePath(collection, shardId, coreNodeName);
if (state == Replica.State.ACTIVE) {
@ -2158,7 +2163,7 @@ public final class ZkController {
try {
if (state == Replica.State.DOWN) {
markShardAsDownIfLeader(collection, shardId, leaderCoreNodeName, znodePath, znodeData, retryOnConnLoss);
markShardAsDownIfLeader(collection, shardId, leaderCd, znodePath, znodeData, retryOnConnLoss);
} else {
// must retry on conn loss otherwise future election attempts may assume wrong LIR state
if (zkClient.exists(znodePath, true)) {
@ -2184,18 +2189,36 @@ public final class ZkController {
* in ZK. This ensures that a long running network partition caused by GC etc
* doesn't let us mark a node as down *after* we've already lost our session
*/
private void markShardAsDownIfLeader(String collection, String shardId, String leaderCoreNodeName,
private void markShardAsDownIfLeader(String collection, String shardId, CoreDescriptor leaderCd,
String znodePath, byte[] znodeData,
boolean retryOnConnLoss) throws KeeperException, InterruptedException {
String leaderSeqPath = getLeaderSeqPath(collection, leaderCoreNodeName);
if (leaderSeqPath == null) {
throw new NotLeaderException(ErrorCode.SERVER_ERROR,
"Failed to update data to 'down' for znode: " + znodePath +
" because the zookeeper leader sequence for leader: " + leaderCoreNodeName + " is null");
if (!leaderCd.getCloudDescriptor().isLeader()) {
log.info("No longer leader, aborting attempt to mark shard down as part of LIR");
throw new NotLeaderException(ErrorCode.SERVER_ERROR, "Locally, we do not think we are the leader.");
}
ContextKey key = new ContextKey(collection, leaderCd.getCloudDescriptor().getCoreNodeName());
ElectionContext context = electionContexts.get(key);
// we make sure we locally think we are the leader before and after getting the context - then
// we only try zk if we still think we are the leader and have our leader context
if (context == null || !leaderCd.getCloudDescriptor().isLeader()) {
log.info("No longer leader, aborting attempt to mark shard down as part of LIR");
throw new NotLeaderException(ErrorCode.SERVER_ERROR, "Locally, we do not think we are the leader.");
}
// we think we are the leader - get the expected shard leader version
// we use this version and multi to ensure *only* the current zk registered leader
// for a shard can put a replica into LIR
Integer leaderZkNodeParentVersion = ((ShardLeaderElectionContextBase)context).leaderZkNodeParentVersion;
// TODO: should we do this optimistically to avoid races?
if (zkClient.exists(znodePath, retryOnConnLoss)) {
List<Op> ops = new ArrayList<>(2);
ops.add(Op.check(leaderSeqPath, -1)); // version doesn't matter, the seq path is unique
ops.add(Op.check(new org.apache.hadoop.fs.Path(((ShardLeaderElectionContextBase)context).leaderPath).getParent().toString(), leaderZkNodeParentVersion));
ops.add(Op.setData(znodePath, znodeData, -1));
zkClient.multi(ops, retryOnConnLoss);
} else {
@ -2205,8 +2228,10 @@ public final class ZkController {
} catch (KeeperException.NodeExistsException nee) {
// if it exists, that's great!
}
// we only create the entry if the context we are using is registered as the current leader in ZK
List<Op> ops = new ArrayList<>(2);
ops.add(Op.check(leaderSeqPath, -1)); // version doesn't matter, the seq path is unique
ops.add(Op.check(new org.apache.hadoop.fs.Path(((ShardLeaderElectionContextBase)context).leaderPath).getParent().toString(), leaderZkNodeParentVersion));
ops.add(Op.create(znodePath, znodeData, zkClient.getZkACLProvider().getACLsToAdd(znodePath),
CreateMode.PERSISTENT));
zkClient.multi(ops, retryOnConnLoss);

View File

@ -886,7 +886,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
collection,
shardId,
stdNode.getNodeProps(),
leaderCoreNodeName,
req.getCore().getCoreDescriptor(),
false /* forcePublishState */
);
} catch (Exception exc) {

View File

@ -17,6 +17,19 @@ package org.apache.solr.cloud;
* limitations under the License.
*/
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.lucene.util.LuceneTestCase.Slow;
import org.apache.solr.JSONTestUtil;
import org.apache.solr.SolrTestCaseJ4.SuppressSSL;
@ -40,23 +53,12 @@ import org.apache.solr.core.CoreContainer;
import org.apache.solr.core.SolrCore;
import org.apache.solr.servlet.SolrDispatchFilter;
import org.apache.solr.update.UpdateLog;
import org.apache.solr.util.MockCoreContainer.MockCoreDescriptor;
import org.apache.solr.util.RTimer;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
/**
* Simulates HTTP partitions between a leader and replica but the replica does
* not lose its ZooKeeper connection.
@ -145,7 +147,22 @@ public class HttpPartitionTest extends AbstractFullDistribZkTestBase {
ZkCoreNodeProps replicaCoreNodeProps = new ZkCoreNodeProps(notLeader);
String replicaUrl = replicaCoreNodeProps.getCoreUrl();
zkController.updateLeaderInitiatedRecoveryState(testCollectionName, shardId, notLeader.getName(), Replica.State.DOWN, leader.getName(), true);
MockCoreDescriptor cd = new MockCoreDescriptor() {
public CloudDescriptor getCloudDescriptor() {
return new CloudDescriptor(leader.getStr(ZkStateReader.CORE_NAME_PROP), new Properties(), this) {
@Override
public String getCoreNodeName() {
return leader.getName();
}
@Override
public boolean isLeader() {
return true;
}
};
}
};
zkController.updateLeaderInitiatedRecoveryState(testCollectionName, shardId, notLeader.getName(), Replica.State.DOWN, cd, true);
Map<String,Object> lirStateMap = zkController.getLeaderInitiatedRecoveryStateObject(testCollectionName, shardId, notLeader.getName());
assertNotNull(lirStateMap);
assertSame(Replica.State.DOWN, Replica.State.getState((String) lirStateMap.get(ZkStateReader.STATE_PROP)));

View File

@ -1,5 +1,7 @@
package org.apache.solr.cloud;
import java.util.Properties;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
@ -23,8 +25,10 @@ import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.ZkCoreNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.servlet.SolrDispatchFilter;
import org.apache.solr.util.TimeOut;
import org.apache.solr.util.MockCoreContainer.MockCoreDescriptor;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.data.Stat;
@ -58,12 +62,28 @@ public class TestLeaderInitiatedRecoveryThread extends AbstractFullDistribZkTest
Replica replica = cloudClient.getZkStateReader().getClusterState().getReplica(DEFAULT_COLLECTION, notLeader.coreNodeName);
ZkCoreNodeProps replicaCoreNodeProps = new ZkCoreNodeProps(replica);
MockCoreDescriptor cd = new MockCoreDescriptor() {
public CloudDescriptor getCloudDescriptor() {
return new CloudDescriptor(shardToLeaderJetty.get(SHARD1).info.getStr(ZkStateReader.CORE_NAME_PROP), new Properties(), this) {
@Override
public String getCoreNodeName() {
return shardToLeaderJetty.get(SHARD1).info.getStr(ZkStateReader.CORE_NODE_NAME_PROP);
}
@Override
public boolean isLeader() {
return true;
}
};
}
};
/*
1. Test that publishDownState throws exception when zkController.isReplicaInRecoveryHandling == false
*/
try {
LeaderInitiatedRecoveryThread thread = new LeaderInitiatedRecoveryThread(zkController, filter.getCores(),
DEFAULT_COLLECTION, SHARD1, replicaCoreNodeProps, 1, leaderCoreNodeName);
DEFAULT_COLLECTION, SHARD1, replicaCoreNodeProps, 1, cd);
assertFalse(zkController.isReplicaInRecoveryHandling(replicaCoreNodeProps.getCoreUrl()));
thread.run();
fail("publishDownState should not have succeeded because replica url is not marked in leader initiated recovery in ZkController");
@ -76,7 +96,7 @@ public class TestLeaderInitiatedRecoveryThread extends AbstractFullDistribZkTest
2. Test that a non-live replica cannot be put into LIR or down state
*/
LeaderInitiatedRecoveryThread thread = new LeaderInitiatedRecoveryThread(zkController, filter.getCores(),
DEFAULT_COLLECTION, SHARD1, replicaCoreNodeProps, 1, leaderCoreNodeName);
DEFAULT_COLLECTION, SHARD1, replicaCoreNodeProps, 1, cd);
// kill the replica
int children = cloudClient.getZkStateReader().getZkClient().getChildren("/live_nodes", null, true).size();
ChaosMonkey.stop(notLeader.jetty);
@ -107,7 +127,7 @@ public class TestLeaderInitiatedRecoveryThread extends AbstractFullDistribZkTest
waitForRecoveriesToFinish(true);
thread = new LeaderInitiatedRecoveryThread(zkController, filter.getCores(),
DEFAULT_COLLECTION, SHARD1, replicaCoreNodeProps, 1, leaderCoreNodeName) {
DEFAULT_COLLECTION, SHARD1, replicaCoreNodeProps, 1, cd) {
@Override
protected void updateLIRState(String replicaCoreNodeName) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "", new KeeperException.ConnectionLossException());
@ -122,7 +142,7 @@ public class TestLeaderInitiatedRecoveryThread extends AbstractFullDistribZkTest
4. Test that if ZK connection loss or session expired then thread should not attempt to publish down state even if forcePublish=true
*/
thread = new LeaderInitiatedRecoveryThread(zkController, filter.getCores(),
DEFAULT_COLLECTION, SHARD1, replicaCoreNodeProps, 1, leaderCoreNodeName) {
DEFAULT_COLLECTION, SHARD1, replicaCoreNodeProps, 1, cd) {
@Override
protected void updateLIRState(String replicaCoreNodeName) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "", new KeeperException.SessionExpiredException());
@ -137,7 +157,7 @@ public class TestLeaderInitiatedRecoveryThread extends AbstractFullDistribZkTest
5. Test that any exception other then ZK connection loss or session expired should publish down state only if forcePublish=true
*/
thread = new LeaderInitiatedRecoveryThread(zkController, filter.getCores(),
DEFAULT_COLLECTION, SHARD1, replicaCoreNodeProps, 1, leaderCoreNodeName) {
DEFAULT_COLLECTION, SHARD1, replicaCoreNodeProps, 1, cd) {
@Override
protected void updateLIRState(String replicaCoreNodeName) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "bogus exception");
@ -171,11 +191,12 @@ public class TestLeaderInitiatedRecoveryThread extends AbstractFullDistribZkTest
/*
6. Test that non-leader cannot set LIR nodes
*/
filter = (SolrDispatchFilter) notLeader.jetty.getDispatchFilter().getFilter();
zkController = filter.getCores().getZkController();
thread = new LeaderInitiatedRecoveryThread(zkController, filter.getCores(),
DEFAULT_COLLECTION, SHARD1, replicaCoreNodeProps, 1, leaderCoreNodeName) {
DEFAULT_COLLECTION, SHARD1, replicaCoreNodeProps, 1, filter.getCores().getCores().iterator().next().getCoreDescriptor()) {
@Override
protected void updateLIRState(String replicaCoreNodeName) {
try {
@ -197,7 +218,7 @@ public class TestLeaderInitiatedRecoveryThread extends AbstractFullDistribZkTest
filter = (SolrDispatchFilter) leaderRunner.jetty.getDispatchFilter().getFilter();
zkController = filter.getCores().getZkController();
thread = new LeaderInitiatedRecoveryThread(zkController, filter.getCores(),
DEFAULT_COLLECTION, SHARD1, replicaCoreNodeProps, 1, leaderCoreNodeName);
DEFAULT_COLLECTION, SHARD1, replicaCoreNodeProps, 1, filter.getCores().getCores().iterator().next().getCoreDescriptor());
thread.publishDownState(replicaCoreNodeProps.getCoreName(), replica.getName(), replica.getNodeName(), replicaCoreNodeProps.getCoreUrl(), false);
timeOut = new TimeOut(30, TimeUnit.SECONDS);
while (!timeOut.hasTimedOut()) {