SOLR-7109: Indexing threads stuck during network partition can put leader into down state

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1666825 13f79535-47bb-0310-9956-ffa450edef68
Author: Shalin Shekhar Mangar
Date:   2015-03-15 18:40:50 +00:00
Parent: c7a69e7acc
Commit: 5caf937f26
8 changed files with 153 additions and 24 deletions

solr/CHANGES.txt

@@ -236,6 +236,9 @@ Bug Fixes
 * SOLR-6892: Improve the way update processors are used and make it simpler (Noble Paul)
 
+* SOLR-7109: Indexing threads stuck during network partition can put leader into down state.
+  (Mark Miller, Anshum Gupta, Ramkumar Aiyengar, yonik, shalin)
+
 Optimizations
 ----------------------

solr/core/src/java/org/apache/solr/cloud/ElectionContext.java

@@ -402,7 +402,7 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
             120,
             coreNodeName);
         zkController.ensureReplicaInLeaderInitiatedRecovery(
-            collection, shardId, coreNodeProps.getCoreUrl(), coreNodeProps, false);
+            collection, shardId, coreNodeProps, false, coreNodeName);
         ExecutorService executor = cc.getUpdateShardHandler().getUpdateExecutor();
         executor.execute(lirThread);

solr/core/src/java/org/apache/solr/cloud/LeaderInitiatedRecoveryThread.java

@@ -226,8 +226,8 @@ public class LeaderInitiatedRecoveryThread extends Thread {
           // so its state cannot be trusted and it needs to be told to recover again ... and we keep looping here
           log.warn("Replica core={} coreNodeName={} set to active but the leader thinks it should be in recovery;"
              + " forcing it back to down state to re-run the leader-initiated recovery process; props: "+replicaProps.get(0), coreNeedingRecovery, replicaCoreNodeName);
-          zkController.ensureReplicaInLeaderInitiatedRecovery(collection,
-              shardId, replicaUrl, nodeProps, true); // force republish state to "down"
+          // force republish state to "down"
+          zkController.ensureReplicaInLeaderInitiatedRecovery(collection, shardId, nodeProps, true, leaderCoreNodeName);
         }
       }
       break;

solr/core/src/java/org/apache/solr/cloud/ZkController.java

@@ -59,6 +59,7 @@ import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.KeeperException.ConnectionLossException;
 import org.apache.zookeeper.KeeperException.NoNodeException;
 import org.apache.zookeeper.KeeperException.SessionExpiredException;
+import org.apache.zookeeper.Op;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.data.Stat;
@@ -160,7 +161,7 @@ public final class ZkController {
       return true;
     }
   }
 
-  private final Map<ContextKey, ElectionContext> electionContexts = Collections.synchronizedMap(new HashMap<ContextKey, ElectionContext>());
+  private final Map<ContextKey, ElectionContext> electionContexts = Collections.synchronizedMap(new HashMap<>());
 
   private final SolrZkClient zkClient;
   private final ZkCmdExecutor cmdExecutor;
@@ -1125,10 +1126,10 @@ public final class ZkController {
       if (!ZkStateReader.DOWN.equals(state)) {
         String lirState = getLeaderInitiatedRecoveryState(collection, shardId, coreNodeName);
         if (lirState != null) {
-          if ("active".equals(state)) {
+          if (ZkStateReader.ACTIVE.equals(state)) {
             // trying to become active, so leader-initiated state must be recovering
             if (ZkStateReader.RECOVERING.equals(lirState)) {
-              updateLeaderInitiatedRecoveryState(collection, shardId, coreNodeName, ZkStateReader.ACTIVE);
+              updateLeaderInitiatedRecoveryState(collection, shardId, coreNodeName, ZkStateReader.ACTIVE, null);
             } else if (ZkStateReader.DOWN.equals(lirState)) {
               throw new SolrException(ErrorCode.INVALID_STATE,
                   "Cannot publish state of core '"+cd.getName()+"' as active without recovering first!");
@@ -1136,13 +1137,13 @@ public final class ZkController {
           } else if (ZkStateReader.RECOVERING.equals(state)) {
             // if it is currently DOWN, then trying to enter into recovering state is good
             if (ZkStateReader.DOWN.equals(lirState)) {
-              updateLeaderInitiatedRecoveryState(collection, shardId, coreNodeName, ZkStateReader.RECOVERING);
+              updateLeaderInitiatedRecoveryState(collection, shardId, coreNodeName, ZkStateReader.RECOVERING, null);
             }
           }
         }
       }
 
-    Map<String, Object> props = new HashMap<String, Object>();
+    Map<String, Object> props = new HashMap<>();
     props.put(Overseer.QUEUE_OPERATION, "state");
     props.put(ZkStateReader.STATE_PROP, state);
     props.put(ZkStateReader.BASE_URL_PROP, getBaseUrl());
@@ -1881,9 +1882,11 @@ public final class ZkController {
    * to it.
    */
   public boolean ensureReplicaInLeaderInitiatedRecovery(final String collection,
-      final String shardId, final String replicaUrl, final ZkCoreNodeProps replicaCoreProps, boolean forcePublishState)
+      final String shardId, final ZkCoreNodeProps replicaCoreProps, boolean forcePublishState, String leaderCoreNodeName)
       throws KeeperException, InterruptedException
   {
+    final String replicaUrl = replicaCoreProps.getCoreUrl();
+
     if (collection == null)
       throw new IllegalArgumentException("collection parameter cannot be null for starting leader-initiated recovery for replica: "+replicaUrl);
@@ -1914,7 +1917,7 @@ public final class ZkController {
         // we only really need to try to send the recovery command if the node itself is "live"
         if (getZkStateReader().getClusterState().liveNodesContain(replicaNodeName)) {
           // create a znode that requires the replica needs to "ack" to verify it knows it was out-of-sync
-          updateLeaderInitiatedRecoveryState(collection, shardId, replicaCoreNodeName, ZkStateReader.DOWN);
+          updateLeaderInitiatedRecoveryState(collection, shardId, replicaCoreNodeName, ZkStateReader.DOWN, leaderCoreNodeName);
           replicasInLeaderInitiatedRecovery.put(replicaUrl,
               getLeaderInitiatedRecoveryZnodePath(collection, shardId, replicaCoreNodeName));
           log.info("Put replica core={} coreNodeName={} on "+
@@ -2010,7 +2013,8 @@ public final class ZkController {
     return stateObj;
   }
 
-  private void updateLeaderInitiatedRecoveryState(String collection, String shardId, String coreNodeName, String state) {
+  private void updateLeaderInitiatedRecoveryState(String collection, String shardId, String coreNodeName, String state,
+      String leaderCoreNodeName) {
     if (collection == null || shardId == null || coreNodeName == null) {
       log.warn("Cannot set leader-initiated recovery state znode to "+state+" using: collection="+collection+
           "; shardId="+shardId+"; coreNodeName="+coreNodeName);
@@ -2024,7 +2028,7 @@ public final class ZkController {
       try {
         zkClient.delete(znodePath, -1, false);
       } catch (Exception justLogIt) {
-        log.warn("Failed to delete znode "+znodePath+" due to: "+justLogIt);
+        log.warn("Failed to delete znode " + znodePath, justLogIt);
       }
       return;
     }
@@ -2044,24 +2048,62 @@ public final class ZkController {
       stateObj.put("createdByNodeName", String.valueOf(this.nodeName));
 
       byte[] znodeData = ZkStateReader.toJSON(stateObj);
 
-      boolean retryOnConnLoss = true; // be a little more robust when trying to write data
       try {
-        if (zkClient.exists(znodePath, retryOnConnLoss)) {
-          zkClient.setData(znodePath, znodeData, retryOnConnLoss);
-        } else {
-          zkClient.makePath(znodePath, znodeData, retryOnConnLoss);
+        if (ZkStateReader.DOWN.equals(state)) {
+          markShardAsDownIfLeader(collection, shardId, leaderCoreNodeName, znodePath, znodeData);
+        } else {
+          if (zkClient.exists(znodePath, true)) {
+            zkClient.setData(znodePath, znodeData, true);
+          } else {
+            zkClient.makePath(znodePath, znodeData, true);
+          }
         }
         log.info("Wrote "+state+" to "+znodePath);
       } catch (Exception exc) {
         if (exc instanceof SolrException) {
           throw (SolrException)exc;
         } else {
-          throw new SolrException(ErrorCode.SERVER_ERROR,
-            "Failed to update data to "+state+" for znode: "+znodePath, exc);
+          throw new SolrException(ErrorCode.SERVER_ERROR,
+              "Failed to update data to "+state+" for znode: "+znodePath, exc);
         }
       }
     }
 
+  /**
+   * we use ZK's multi-transactional semantics to ensure that we are able to
+   * publish a replica as 'down' only if our leader election node still exists
+   * in ZK. This ensures that a long running network partition caused by GC etc
+   * doesn't let us mark a node as down *after* we've already lost our session
+   */
+  private void markShardAsDownIfLeader(String collection, String shardId, String leaderCoreNodeName,
+                                       String znodePath, byte[] znodeData) throws KeeperException, InterruptedException {
+    String leaderSeqPath = getLeaderSeqPath(collection, leaderCoreNodeName);
+    if (leaderSeqPath == null) {
+      throw new SolrException(ErrorCode.SERVER_ERROR,
+          "Failed to update data to 'down' for znode: " + znodePath +
+          " because the zookeeper leader sequence for leader: " + leaderCoreNodeName + " is null");
+    }
+    if (zkClient.exists(znodePath, true)) {
+      List<Op> ops = new ArrayList<>(2);
+      ops.add(Op.check(leaderSeqPath, -1)); // version doesn't matter, the seq path is unique
+      ops.add(Op.setData(znodePath, znodeData, -1));
+      zkClient.multi(ops, true);
+    } else {
+      String parentZNodePath = getLeaderInitiatedRecoveryZnodePath(collection, shardId);
+      try {
+        zkClient.makePath(parentZNodePath, true);
+      } catch (KeeperException.NodeExistsException nee) {
+        // if it exists, that's great!
+      }
+      List<Op> ops = new ArrayList<>(2);
+      ops.add(Op.check(leaderSeqPath, -1)); // version doesn't matter, the seq path is unique
+      ops.add(Op.create(znodePath, znodeData, zkClient.getZkACLProvider().getACLsToAdd(znodePath),
+          CreateMode.PERSISTENT));
+      zkClient.multi(ops, true);
+    }
+  }
+
   public String getLeaderInitiatedRecoveryZnodePath(String collection, String shardId) {
     return "/collections/"+collection+"/leader_initiated_recovery/"+shardId;
   }
@@ -2299,4 +2341,9 @@ public final class ZkController {
     };
   }
 
+  public String getLeaderSeqPath(String collection, String coreNodeName) {
+    ContextKey key = new ContextKey(collection, coreNodeName);
+    ElectionContext context = electionContexts.get(key);
+    return context != null ? context.leaderSeqPath : null;
+  }
 }
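Editor's note: the markShardAsDownIfLeader() change above is the heart of this fix. The 'down' write is bundled into a ZooKeeper multi together with an Op.check on the leader's election znode, so a leader whose session expired during a network partition can no longer demote its replicas. Below is a minimal standalone sketch of that pattern against a raw ZooKeeper client; the connection string and znode paths are hypothetical illustrations, not code from this commit:

    import java.nio.charset.StandardCharsets;
    import java.util.Arrays;
    import org.apache.zookeeper.KeeperException;
    import org.apache.zookeeper.Op;
    import org.apache.zookeeper.ZooKeeper;

    public class ConditionalDownPublishSketch {
      public static void main(String[] args) throws Exception {
        // Hypothetical paths: in ZkController the check path is the leader's
        // ephemeral election sequence node and the write path is the LIR znode.
        String electionNode = "/demo/leader_elect/shard1/election/node-n_0000000001";
        String lirNode = "/demo/leader_initiated_recovery/shard1/core_node2";
        byte[] down = "{\"state\":\"down\"}".getBytes(StandardCharsets.UTF_8);

        // Assumes a ZooKeeper server is reachable at 127.0.0.1:2181.
        ZooKeeper zk = new ZooKeeper("127.0.0.1:2181", 15000, null);
        try {
          // Both ops commit atomically or neither does: if the election node
          // vanished (session expired during a partition), the check fails and
          // "down" is never written.
          zk.multi(Arrays.asList(
              Op.check(electionNode, -1), // -1 = any version, i.e. existence check
              Op.setData(lirNode, down, -1)));
        } catch (KeeperException e) {
          // the check failed: we no longer hold the election node, so we must
          // not mark other replicas down
        } finally {
          zk.close();
        }
      }
    }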

solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java

@@ -872,9 +872,9 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
             sendRecoveryCommand =
                 zkController.ensureReplicaInLeaderInitiatedRecovery(collection,
                     shardId,
-                    replicaUrl,
                     stdNode.getNodeProps(),
-                    false);
+                    false,
+                    leaderCoreNodeName);
 
             // we want to try more than once, ~10 minutes
             if (sendRecoveryCommand) {

solr/core/src/test/org/apache/solr/cloud/HttpPartitionTest.java

@@ -146,7 +146,7 @@ public class HttpPartitionTest extends AbstractFullDistribZkTestBase {
     String replicaUrl = replicaCoreNodeProps.getCoreUrl();
 
     assertTrue(!zkController.isReplicaInRecoveryHandling(replicaUrl));
-    assertTrue(zkController.ensureReplicaInLeaderInitiatedRecovery(testCollectionName, shardId, replicaUrl, replicaCoreNodeProps, false));
+    assertTrue(zkController.ensureReplicaInLeaderInitiatedRecovery(testCollectionName, shardId, replicaCoreNodeProps, false, leader.getName()));
     assertTrue(zkController.isReplicaInRecoveryHandling(replicaUrl));
     Map<String,Object> lirStateMap = zkController.getLeaderInitiatedRecoveryStateObject(testCollectionName, shardId, notLeader.getName());
     assertNotNull(lirStateMap);

solr/core/src/test/org/apache/solr/cloud/ZkControllerTest.java

@@ -19,8 +19,10 @@ package org.apache.solr.cloud;
 import org.apache.lucene.util.LuceneTestCase.Slow;
 import org.apache.solr.SolrTestCaseJ4;
 import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.SolrZkClient;
 import org.apache.solr.common.cloud.ZkConfigManager;
+import org.apache.solr.common.cloud.ZkCoreNodeProps;
 import org.apache.solr.common.cloud.ZkNodeProps;
 import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.core.CloudConfig;
@@ -244,6 +246,65 @@ public class ZkControllerTest extends SolrTestCaseJ4 {
     }
   }
 
+  public void testEnsureReplicaInLeaderInitiatedRecovery() throws Exception {
+    String zkDir = createTempDir("testEnsureReplicaInLeaderInitiatedRecovery").toFile().getAbsolutePath();
+    CoreContainer cc = null;
+
+    ZkTestServer server = new ZkTestServer(zkDir);
+    try {
+      server.run();
+
+      AbstractZkTestCase.tryCleanSolrZkNode(server.getZkHost());
+      AbstractZkTestCase.makeSolrZkNode(server.getZkHost());
+
+      cc = getCoreContainer();
+      ZkController zkController = null;
+
+      try {
+        CloudConfig cloudConfig = new CloudConfig.CloudConfigBuilder("127.0.0.1", 8983, "solr").build();
+        zkController = new ZkController(cc, server.getZkAddress(), TIMEOUT, cloudConfig, new CurrentCoreDescriptorProvider() {
+          @Override
+          public List<CoreDescriptor> getCurrentDescriptors() {
+            // do nothing
+            return null;
+          }
+        });
+        HashMap<String, Object> propMap = new HashMap<>();
+        propMap.put(ZkStateReader.BASE_URL_PROP, "http://127.0.0.1:8983/solr");
+        propMap.put(ZkStateReader.CORE_NAME_PROP, "replica1");
+        propMap.put(ZkStateReader.NODE_NAME_PROP, "127.0.0.1:8983_solr");
+        Replica replica = new Replica("replica1", propMap);
+        try {
+          // this method doesn't throw exception when node isn't leader
+          zkController.ensureReplicaInLeaderInitiatedRecovery("c1", "shard1",
+              new ZkCoreNodeProps(replica), false, "non_existent_leader");
+          fail("ZkController should not write LIR state for node which is not leader");
+        } catch (Exception e) {
+          assertNull("ZkController should not write LIR state for node which is not leader",
+              zkController.getLeaderInitiatedRecoveryState("c1", "shard1", "replica1"));
+        }
+      } finally {
+        if (zkController != null)
+          zkController.close();
+      }
+    } finally {
+      if (cc != null) {
+        cc.shutdown();
+      }
+      server.shutdown();
+    }
+  }
+
+  /*
+  Test that:
+  1) LIR state to 'down' is not set unless publishing node is a leader
+    1a) Test that leader can publish when LIR node already exists in zk
+    1b) Test that leader can publish when LIR node does not exist
+  2) LIR state to 'active' or 'recovery' can be set regardless of whether publishing
+    node is leader or not
+  */
+
   private CoreContainer getCoreContainer() {
     return new MockCoreContainer();
   }

solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java

@@ -29,6 +29,8 @@ import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.KeeperException.NoNodeException;
 import org.apache.zookeeper.KeeperException.NodeExistsException;
 import org.apache.zookeeper.KeeperException.NotEmptyException;
+import org.apache.zookeeper.Op;
+import org.apache.zookeeper.OpResult;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.ZooKeeper;
@@ -560,6 +562,19 @@ public class SolrZkClient implements Closeable {
     return setData(path, data, retryOnConnLoss);
   }
 
+  public List<OpResult> multi(final Iterable<Op> ops, boolean retryOnConnLoss) throws InterruptedException, KeeperException {
+    if (retryOnConnLoss) {
+      return zkCmdExecutor.retryOperation(new ZkOperation() {
+        @Override
+        public List<OpResult> execute() throws KeeperException, InterruptedException {
+          return keeper.multi(ops);
+        }
+      });
+    } else {
+      return keeper.multi(ops);
+    }
+  }
+
   /**
    * Fills string with printout of current ZooKeeper layout.
    */
@@ -740,4 +755,7 @@ public class SolrZkClient implements Closeable {
     return zkServerAddress;
   }
 
+  public ZkACLProvider getZkACLProvider() {
+    return zkACLProvider;
+  }
 }
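Editor's note: for reference, a hedged usage sketch of the new SolrZkClient.multi(...) helper added above. The wrapper method name (publishDownIfStillLeader), the znode paths, and the already-connected zkClient instance are illustrative assumptions, not code from this commit; only the multi(ops, retryOnConnLoss) call itself is:

    import java.nio.charset.StandardCharsets;
    import java.util.ArrayList;
    import java.util.List;
    import org.apache.solr.common.cloud.SolrZkClient;
    import org.apache.zookeeper.Op;
    import org.apache.zookeeper.OpResult;

    public class MultiUsageSketch {
      // zkClient is assumed to be an already-connected SolrZkClient.
      static List<OpResult> publishDownIfStillLeader(SolrZkClient zkClient,
          String leaderSeqPath, String lirPath) throws Exception {
        List<Op> ops = new ArrayList<>(2);
        // The whole transaction fails if our election node is gone,
        // i.e. our ZooKeeper session was lost during a partition.
        ops.add(Op.check(leaderSeqPath, -1));
        ops.add(Op.setData(lirPath,
            "{\"state\":\"down\"}".getBytes(StandardCharsets.UTF_8), -1));
        return zkClient.multi(ops, true); // true = retry on connection loss
      }
    }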