SOLR-7819: ZK connection loss or session timeout no longer stalls indexing threads; LIR activity is moved to a background thread

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1702067 13f79535-47bb-0310-9956-ffa450edef68
Shalin Shekhar Mangar 2015-09-09 18:07:45 +00:00
parent cae3b6574a
commit f538ed4e57
8 changed files with 420 additions and 196 deletions

View File

@@ -182,6 +182,10 @@ Bug Fixes
* SOLR-8001: Fixed bugs in field(foo,min) and field(foo,max) when some docs have no values
(David Smiley, hossman)
* SOLR-7819: ZK connection loss or session timeout no longer stalls indexing threads. All activity
related to leader-initiated recovery is performed by a dedicated LIR thread in the background.
(Ramkumar Aiyengar, shalin)
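In pattern form, the fix is a hand-off: instead of blocking an indexing thread on ZooKeeper while LIR state is written, the leader submits the whole LIR sequence to a dedicated thread on the update executor. A minimal editorial sketch, assembled from the hunks below (class and method names are the real ones from this commit; the surrounding caller context is assumed):

    // All ZK I/O for leader-initiated recovery happens off the indexing thread.
    LeaderInitiatedRecoveryThread lirThread =
        new LeaderInitiatedRecoveryThread(zkController, coreContainer,
            collection, shardId, replicaCoreProps,
            120 /* maxTries */, leaderCoreNodeName);
    ExecutorService executor = coreContainer.getUpdateShardHandler().getUpdateExecutor();
    executor.execute(lirThread); // returns immediately; retries run in the background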
Optimizations
----------------------

View File

@@ -4,7 +4,6 @@ import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.fs.Path;
@@ -467,20 +466,9 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
}
}
LeaderInitiatedRecoveryThread lirThread =
new LeaderInitiatedRecoveryThread(zkController,
cc,
collection,
shardId,
coreNodeProps,
120,
coreNodeName);
zkController.ensureReplicaInLeaderInitiatedRecovery(
zkController.ensureReplicaInLeaderInitiatedRecovery(cc,
collection, shardId, coreNodeProps, coreNodeName,
false /* forcePublishState */, true /* retryOnConnLoss */);
ExecutorService executor = cc.getUpdateShardHandler().getUpdateExecutor();
executor.execute(lirThread);
false /* forcePublishState */);
}
}
}
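Net effect of this hunk: the election context no longer builds the recovery thread or publishes LIR state itself. It makes one call, and ZkController now owns both marking the replica as under recovery handling and scheduling the background thread. The new call shape, restated as a sketch (taken from the added lines above; surrounding error handling assumed):

    // One non-blocking call replaces znode write + thread construction + executor
    // submission. Note the new CoreContainer argument and the dropped
    // retryOnConnLoss flag: retry policy is now decided inside ZkController.
    zkController.ensureReplicaInLeaderInitiatedRecovery(cc,
        collection, shardId, coreNodeProps, coreNodeName,
        false /* forcePublishState */);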

View File

@@ -8,9 +8,12 @@ import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.ZkCoreNodeProps;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.CoreAdminParams.CoreAdminAction;
import org.apache.solr.common.util.Utils;
import org.apache.solr.core.CoreContainer;
import org.apache.zookeeper.KeeperException;
import org.apache.solr.util.RTimer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -75,24 +78,108 @@ public class LeaderInitiatedRecoveryThread extends Thread {
public void run() {
RTimer timer = new RTimer();
try {
sendRecoveryCommandWithRetry();
} catch (Exception exc) {
log.error(getName()+" failed due to: "+exc, exc);
if (exc instanceof SolrException) {
throw (SolrException)exc;
} else {
throw new SolrException(ErrorCode.SERVER_ERROR, exc);
String replicaCoreName = nodeProps.getCoreName();
String replicaCoreNodeName = ((Replica) nodeProps.getNodeProps()).getName();
String replicaNodeName = nodeProps.getNodeName();
final String replicaUrl = nodeProps.getCoreUrl();
if (!zkController.isReplicaInRecoveryHandling(replicaUrl)) {
throw new SolrException(ErrorCode.INVALID_STATE, "Replica: " + replicaUrl + " should have been marked under leader initiated recovery in ZkController but wasn't.");
}
boolean sendRecoveryCommand = publishDownState(replicaCoreName, replicaCoreNodeName, replicaNodeName, replicaUrl, false);
if (sendRecoveryCommand) {
try {
sendRecoveryCommandWithRetry();
} catch (Exception exc) {
log.error(getName()+" failed due to: "+exc, exc);
if (exc instanceof SolrException) {
throw (SolrException)exc;
} else {
throw new SolrException(ErrorCode.SERVER_ERROR, exc);
}
} finally {
zkController.removeReplicaFromLeaderInitiatedRecoveryHandling(replicaUrl);
}
} else {
// replica is no longer in recovery on this node (may be handled on another node)
zkController.removeReplicaFromLeaderInitiatedRecoveryHandling(replicaUrl);
}
log.info("{} completed successfully after running for {}ms", getName(), timer.getTime());
}
public boolean publishDownState(String replicaCoreName, String replicaCoreNodeName, String replicaNodeName, String replicaUrl, boolean forcePublishState) {
boolean sendRecoveryCommand = true;
boolean publishDownState = false;
if (zkController.getZkStateReader().getClusterState().liveNodesContain(replicaNodeName)) {
try {
// create a znode that the replica needs to "ack", to verify it knows it was out-of-sync
updateLIRState(replicaCoreNodeName);
log.info("Put replica core={} coreNodeName={} on " +
replicaNodeName + " into leader-initiated recovery.", replicaCoreName, replicaCoreNodeName);
publishDownState = true;
} catch (Exception e) {
Throwable setLirZnodeFailedCause = SolrException.getRootCause(e);
log.error("Leader failed to set replica " +
nodeProps.getCoreUrl() + " state to DOWN due to: " + setLirZnodeFailedCause, setLirZnodeFailedCause);
if (setLirZnodeFailedCause instanceof KeeperException.SessionExpiredException
|| setLirZnodeFailedCause instanceof KeeperException.ConnectionLossException
|| setLirZnodeFailedCause instanceof ZkController.NotLeaderException) {
// our session is expired, which means our state is suspect, so don't go
// putting other replicas in recovery (see SOLR-6511)
sendRecoveryCommand = false;
forcePublishState = false; // no need to force publish any state in this case
} // else will go ahead and try to send the recovery command once after this error
}
} else {
log.info("Node " + replicaNodeName +
" is not live, so skipping leader-initiated recovery for replica: core={} coreNodeName={}",
replicaCoreName, replicaCoreNodeName);
// publishDownState will be false to avoid publishing the "down" state too many times
// as many errors can occur together and will each call into this method (SOLR-6189)
forcePublishState = false; // no need to force publish the state because replica is not live
sendRecoveryCommand = false; // no need to send recovery messages as well
}
try {
if (publishDownState || forcePublishState) {
ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, "state",
ZkStateReader.STATE_PROP, Replica.State.DOWN.toString(),
ZkStateReader.BASE_URL_PROP, nodeProps.getBaseUrl(),
ZkStateReader.CORE_NAME_PROP, nodeProps.getCoreName(),
ZkStateReader.NODE_NAME_PROP, nodeProps.getNodeName(),
ZkStateReader.SHARD_ID_PROP, shardId,
ZkStateReader.COLLECTION_PROP, collection);
log.warn("Leader is publishing core={} coreNodeName ={} state={} on behalf of un-reachable replica {}",
replicaCoreName, replicaCoreNodeName, Replica.State.DOWN.toString(), replicaUrl);
zkController.getOverseerJobQueue().offer(Utils.toJSON(m));
}
} catch (Exception e) {
log.error("Could not publish 'down' state for replicaUrl: {}", replicaUrl, e);
}
return sendRecoveryCommand;
}
/*
protected scope for testing purposes
*/
protected void updateLIRState(String replicaCoreNodeName) {
zkController.updateLeaderInitiatedRecoveryState(collection,
shardId,
replicaCoreNodeName, Replica.State.DOWN, leaderCoreNodeName, true);
}
protected void sendRecoveryCommandWithRetry() throws Exception {
int tries = 0;
long waitBetweenTriesMs = 5000L;
boolean continueTrying = true;
String replicaCoreName = nodeProps.getCoreName();
String recoveryUrl = nodeProps.getBaseUrl();
String replicaNodeName = nodeProps.getNodeName();
String coreNeedingRecovery = nodeProps.getCoreName();
@@ -224,11 +311,8 @@ public class LeaderInitiatedRecoveryThread extends Thread {
// OK, so the replica thinks it is active, but it never ack'd the leader initiated recovery
// so its state cannot be trusted and it needs to be told to recover again ... and we keep looping here
log.warn("Replica core={} coreNodeName={} set to active but the leader thinks it should be in recovery;"
+ " forcing it back to down state to re-run the leader-initiated recovery process; props: "+replicaProps.get(0), coreNeedingRecovery, replicaCoreNodeName);
zkController.ensureReplicaInLeaderInitiatedRecovery(
collection, shardId, nodeProps, leaderCoreNodeName,
true /* forcePublishState */, true /* retryOnConnLoss */
);
+ " forcing it back to down state to re-run the leader-initiated recovery process; props: " + replicaProps.get(0), coreNeedingRecovery, replicaCoreNodeName);
publishDownState(replicaCoreName, replicaCoreNodeName, replicaNodeName, replicaUrl, true);
}
}
break;
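Taken together, run() now has a claim-check-then-work shape: the caller must already have marked the replica as under recovery handling, publishDownState() decides whether a recovery command is still warranted, and the claim is always released. A condensed, hedged sketch of the control flow (logging elided; not the verbatim method body):

    public void run() {
      final String replicaUrl = nodeProps.getCoreUrl();
      // the caller (ZkController) must have registered the replica first
      if (!zkController.isReplicaInRecoveryHandling(replicaUrl)) {
        throw new SolrException(ErrorCode.INVALID_STATE,
            "Replica " + replicaUrl + " was not marked as under LIR handling");
      }
      boolean sendRecoveryCommand = publishDownState(nodeProps.getCoreName(),
          ((Replica) nodeProps.getNodeProps()).getName(), nodeProps.getNodeName(),
          replicaUrl, false /* forcePublishState */);
      try {
        if (sendRecoveryCommand) {
          sendRecoveryCommandWithRetry(); // loops, waiting 5s between tries
        }
      } catch (Exception exc) {
        throw (exc instanceof SolrException) ? (SolrException) exc
            : new SolrException(ErrorCode.SERVER_ERROR, exc);
      } finally {
        // release the claim whether or not a recovery command was sent
        zkController.removeReplicaFromLeaderInitiatedRecoveryHandling(replicaUrl);
      }
    }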

View File

@@ -17,8 +17,6 @@ package org.apache.solr.cloud;
* limitations under the License.
*/
import static org.apache.solr.common.cloud.ZkStateReader.*;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.InetAddress;
@@ -46,6 +44,7 @@ import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import com.google.common.base.Strings;
import org.apache.commons.lang.StringUtils;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.request.CoreAdminRequest.WaitForState;
@@ -98,8 +97,16 @@ import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import com.google.common.base.Strings;
import static org.apache.solr.common.cloud.ZkStateReader.BASE_URL_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.CORE_NAME_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.ELECTION_NODE_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.NODE_NAME_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.REJOIN_AT_HEAD_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.CORE_NODE_NAME_PROP;
/**
* Handle ZooKeeper interactions.
@@ -1198,7 +1205,7 @@ public final class ZkController {
if (state == Replica.State.ACTIVE) {
// trying to become active, so leader-initiated state must be recovering
if (lirState == Replica.State.RECOVERING) {
updateLeaderInitiatedRecoveryState(collection, shardId, coreNodeName, Replica.State.ACTIVE, null);
updateLeaderInitiatedRecoveryState(collection, shardId, coreNodeName, Replica.State.ACTIVE, null, true);
} else if (lirState == Replica.State.DOWN) {
throw new SolrException(ErrorCode.INVALID_STATE,
"Cannot publish state of core '" + cd.getName() + "' as active without recovering first!");
@@ -1206,7 +1213,7 @@
} else if (state == Replica.State.RECOVERING) {
// if it is currently DOWN, then trying to enter into recovering state is good
if (lirState == Replica.State.DOWN) {
updateLeaderInitiatedRecoveryState(collection, shardId, coreNodeName, Replica.State.RECOVERING, null);
updateLeaderInitiatedRecoveryState(collection, shardId, coreNodeName, Replica.State.RECOVERING, null, true);
}
}
}
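These two call sites are the replica-side acknowledgement of LIR; the trailing `true` added here is the new retryOnConnLoss flag. Summarized as an editorial table (not code from the commit):

    // publish request | LIR znode state | outcome
    // ACTIVE          | RECOVERING      | LIR flipped to ACTIVE (znode deleted): recovery ack'd
    // ACTIVE          | DOWN            | rejected: the core must recover first
    // RECOVERING      | DOWN            | LIR flipped to RECOVERING: replica saw the down marker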
@@ -1972,8 +1979,9 @@ public final class ZkController {
* to it.
*/
public boolean ensureReplicaInLeaderInitiatedRecovery(
final CoreContainer container,
final String collection, final String shardId, final ZkCoreNodeProps replicaCoreProps,
String leaderCoreNodeName, boolean forcePublishState, boolean retryOnConnLoss)
String leaderCoreNodeName, boolean forcePublishState)
throws KeeperException, InterruptedException {
final String replicaUrl = replicaCoreProps.getCoreUrl();
@@ -1991,7 +1999,6 @@
// about the same replica having trouble and we only need to send the "needs"
// recovery signal once
boolean nodeIsLive = true;
boolean publishDownState = false;
String replicaNodeName = replicaCoreProps.getNodeName();
String replicaCoreNodeName = ((Replica) replicaCoreProps.getNodeProps()).getName();
assert replicaCoreNodeName != null : "No core name for replica " + replicaNodeName;
@@ -2003,16 +2010,30 @@
}
}
// if the replica's state is not DOWN right now, make it so ...
// we only really need to try to send the recovery command if the node itself is "live"
// we only really need to try to start the LIR process if the node itself is "live"
if (getZkStateReader().getClusterState().liveNodesContain(replicaNodeName)) {
LeaderInitiatedRecoveryThread lirThread =
new LeaderInitiatedRecoveryThread(this,
container,
collection,
shardId,
replicaCoreProps,
120,
leaderCoreNodeName); // core node name of current leader
ExecutorService executor = container.getUpdateShardHandler().getUpdateExecutor();
try {
MDC.put("DistributedUpdateProcessor.replicaUrlToRecover", replicaCoreProps.getCoreUrl());
executor.execute(lirThread);
} finally {
MDC.remove("DistributedUpdateProcessor.replicaUrlToRecover");
}
// create a znode that the replica needs to "ack", to verify it knows it was out-of-sync
updateLeaderInitiatedRecoveryState(collection, shardId, replicaCoreNodeName, Replica.State.DOWN, leaderCoreNodeName);
replicasInLeaderInitiatedRecovery.put(replicaUrl,
getLeaderInitiatedRecoveryZnodePath(collection, shardId, replicaCoreNodeName));
log.info("Put replica core={} coreNodeName={} on " +
replicaNodeName + " into leader-initiated recovery.", replicaCoreProps.getCoreName(), replicaCoreNodeName);
publishDownState = true;
} else {
nodeIsLive = false; // we really don't need to send the recovery request if the node is NOT live
log.info("Node " + replicaNodeName +
@@ -2023,20 +2044,6 @@
}
}
if (publishDownState || forcePublishState) {
String replicaCoreName = replicaCoreProps.getCoreName();
ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, "state",
ZkStateReader.STATE_PROP, Replica.State.DOWN.toString(),
ZkStateReader.BASE_URL_PROP, replicaCoreProps.getBaseUrl(),
ZkStateReader.CORE_NAME_PROP, replicaCoreProps.getCoreName(),
ZkStateReader.NODE_NAME_PROP, replicaCoreProps.getNodeName(),
ZkStateReader.SHARD_ID_PROP, shardId,
ZkStateReader.COLLECTION_PROP, collection);
log.warn("Leader is publishing core={} coreNodeName ={} state={} on behalf of un-reachable replica {}; forcePublishState? " + forcePublishState,
replicaCoreName, replicaCoreNodeName, Replica.State.DOWN.toString(), replicaUrl);
overseerJobQueue.offer(Utils.toJSON(m));
}
return nodeIsLive;
}
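The reworked ensureReplicaInLeaderInitiatedRecovery therefore does three things: records the replica URL as under recovery handling, schedules the LeaderInitiatedRecoveryThread on the update executor (the MDC put/remove pair tags log lines from the submitting thread), and reports whether the node was live. A hedged usage sketch (caller variables assumed):

    // Returns quickly: the LIR znode write and recovery requests now happen on
    // the background thread, not on the caller's (indexing) thread.
    boolean nodeWasLive = zkController.ensureReplicaInLeaderInitiatedRecovery(
        coreContainer, collection, shardId, replicaCoreProps,
        leaderCoreNodeName, false /* forcePublishState */);
    if (!nodeWasLive) {
      // the replica's node is down; it will recover normally when it restarts
    }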
@@ -2107,8 +2114,8 @@
return stateObj;
}
private void updateLeaderInitiatedRecoveryState(String collection, String shardId, String coreNodeName,
Replica.State state, String leaderCoreNodeName) {
public void updateLeaderInitiatedRecoveryState(String collection, String shardId, String coreNodeName,
Replica.State state, String leaderCoreNodeName, boolean retryOnConnLoss) {
if (collection == null || shardId == null || coreNodeName == null) {
log.warn("Cannot set leader-initiated recovery state znode to "
+ state.toString() + " using: collection=" + collection
@@ -2121,7 +2128,7 @@
if (state == Replica.State.ACTIVE) {
// since we're marking it active, we don't need this znode anymore, so delete instead of update
try {
zkClient.delete(znodePath, -1, false);
zkClient.delete(znodePath, -1, retryOnConnLoss);
} catch (Exception justLogIt) {
log.warn("Failed to delete znode " + znodePath, justLogIt);
}
@@ -2134,24 +2141,30 @@
} catch (Exception exc) {
log.warn(exc.getMessage(), exc);
}
if (stateObj == null)
if (stateObj == null) {
stateObj = Utils.makeMap();
}
stateObj.put(ZkStateReader.STATE_PROP, state.toString());
// only update the createdBy value if it's not set
if (stateObj.get("createdByNodeName") == null)
stateObj.put("createdByNodeName", String.valueOf(this.nodeName));
if (stateObj.get("createdByNodeName") == null) {
stateObj.put("createdByNodeName", this.nodeName);
}
if (stateObj.get("createdByCoreNodeName") == null && leaderCoreNodeName != null) {
stateObj.put("createdByCoreNodeName", leaderCoreNodeName);
}
byte[] znodeData = Utils.toJSON(stateObj);
try {
if (state == Replica.State.DOWN) {
markShardAsDownIfLeader(collection, shardId, leaderCoreNodeName, znodePath, znodeData);
markShardAsDownIfLeader(collection, shardId, leaderCoreNodeName, znodePath, znodeData, retryOnConnLoss);
} else {
// must retry on conn loss otherwise future election attempts may assume wrong LIR state
if (zkClient.exists(znodePath, true)) {
zkClient.setData(znodePath, znodeData, true);
zkClient.setData(znodePath, znodeData, retryOnConnLoss);
} else {
zkClient.makePath(znodePath, znodeData, true);
zkClient.makePath(znodePath, znodeData, retryOnConnLoss);
}
}
log.info("Wrote {} to {}", state.toString(), znodePath);
@@ -2172,22 +2185,23 @@
* doesn't let us mark a node as down *after* we've already lost our session
*/
private void markShardAsDownIfLeader(String collection, String shardId, String leaderCoreNodeName,
String znodePath, byte[] znodeData) throws KeeperException, InterruptedException {
String znodePath, byte[] znodeData,
boolean retryOnConnLoss) throws KeeperException, InterruptedException {
String leaderSeqPath = getLeaderSeqPath(collection, leaderCoreNodeName);
if (leaderSeqPath == null) {
throw new SolrException(ErrorCode.SERVER_ERROR,
throw new NotLeaderException(ErrorCode.SERVER_ERROR,
"Failed to update data to 'down' for znode: " + znodePath +
" because the zookeeper leader sequence for leader: " + leaderCoreNodeName + " is null");
}
if (zkClient.exists(znodePath, true)) {
if (zkClient.exists(znodePath, retryOnConnLoss)) {
List<Op> ops = new ArrayList<>(2);
ops.add(Op.check(leaderSeqPath, -1)); // version doesn't matter, the seq path is unique
ops.add(Op.setData(znodePath, znodeData, -1));
zkClient.multi(ops, true);
zkClient.multi(ops, retryOnConnLoss);
} else {
String parentZNodePath = getLeaderInitiatedRecoveryZnodePath(collection, shardId);
try {
zkClient.makePath(parentZNodePath, true);
zkClient.makePath(parentZNodePath, retryOnConnLoss);
} catch (KeeperException.NodeExistsException nee) {
// if it exists, that's great!
}
@@ -2195,7 +2209,7 @@
ops.add(Op.check(leaderSeqPath, -1)); // version doesn't matter, the seq path is unique
ops.add(Op.create(znodePath, znodeData, zkClient.getZkACLProvider().getACLsToAdd(znodePath),
CreateMode.PERSISTENT));
zkClient.multi(ops, true);
zkClient.multi(ops, retryOnConnLoss);
}
}
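markShardAsDownIfLeader makes "only the leader may set LIR to down" atomic at the ZooKeeper level: the write is bundled into a multi() with a check on the leader's election znode, so if leadership (or the session) was lost in the meantime the whole transaction fails instead of landing a stale down marker. The guard, restated from the ops above:

    List<Op> ops = new ArrayList<>(2);
    ops.add(Op.check(leaderSeqPath, -1));          // aborts the multi if our election node is gone
    ops.add(Op.setData(znodePath, znodeData, -1)); // the actual LIR "down" write
    zkClient.multi(ops, retryOnConnLoss);          // atomic: both succeed or neither does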
@@ -2473,4 +2487,13 @@
ElectionContext context = electionContexts.get(key);
return context != null ? context.leaderSeqPath : null;
}
/**
* Thrown during the leader-initiated recovery process if the current node is not the leader
*/
public static class NotLeaderException extends SolrException {
public NotLeaderException(ErrorCode code, String msg) {
super(code, msg);
}
}
}
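NotLeaderException lets the background thread distinguish "I am no longer the leader" from ordinary ZooKeeper errors; publishDownState treats it like session expiry and declines to push other replicas into recovery. The relevant root-cause check, from the LeaderInitiatedRecoveryThread hunk earlier in this commit:

    Throwable cause = SolrException.getRootCause(e);
    if (cause instanceof KeeperException.SessionExpiredException
        || cause instanceof KeeperException.ConnectionLossException
        || cause instanceof ZkController.NotLeaderException) {
      // our own view of the cluster is suspect: don't put other
      // replicas into recovery (see SOLR-6511)
      sendRecoveryCommand = false;
    }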

View File

@@ -17,6 +17,21 @@ package org.apache.solr.update.processor;
* limitations under the License.
*/
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.CharsRefBuilder;
import org.apache.solr.client.solrj.request.UpdateRequest;
@@ -49,7 +64,6 @@ import org.apache.solr.common.params.UpdateParams;
import org.apache.solr.common.util.Hash;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.Utils;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.core.CoreDescriptor;
import org.apache.solr.handler.component.RealTimeGetComponent;
import org.apache.solr.request.SolrQueryRequest;
@@ -72,23 +86,6 @@ import org.apache.solr.update.VersionInfo;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import static org.apache.solr.update.processor.DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM;
@@ -836,8 +833,6 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
break;
}
int maxTries = 1;
boolean sendRecoveryCommand = true;
String collection = null;
String shardId = null;
@@ -878,33 +873,24 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
if (cloudDesc.getCoreNodeName().equals(leaderCoreNodeName) && foundErrorNodeInReplicaList) {
try {
// if false, then the node is probably not "live" anymore
sendRecoveryCommand =
zkController.ensureReplicaInLeaderInitiatedRecovery(collection,
shardId,
stdNode.getNodeProps(),
leaderCoreNodeName,
false /* forcePublishState */,
false /* retryOnConnLoss */
);
// we want to try more than once, ~10 minutes
if (sendRecoveryCommand) {
maxTries = 120;
} // else the node is no longer "live" so no need to send any recovery command
// and we do not need to send a recovery message
Throwable rootCause = SolrException.getRootCause(error.e);
log.error("Setting up to try to start recovery on replica {}", replicaUrl, rootCause);
zkController.ensureReplicaInLeaderInitiatedRecovery(
req.getCore().getCoreDescriptor().getCoreContainer(),
collection,
shardId,
stdNode.getNodeProps(),
leaderCoreNodeName,
false /* forcePublishState */
);
} catch (Exception exc) {
Throwable setLirZnodeFailedCause = SolrException.getRootCause(exc);
log.error("Leader failed to set replica " +
error.req.node.getUrl() + " state to DOWN due to: " + setLirZnodeFailedCause, setLirZnodeFailedCause);
if (setLirZnodeFailedCause instanceof KeeperException.SessionExpiredException ||
setLirZnodeFailedCause instanceof KeeperException.ConnectionLossException) {
// our session is expired, which means our state is suspect, so don't go
// putting other replicas in recovery (see SOLR-6511)
sendRecoveryCommand = false;
} // else will go ahead and try to send the recovery command once after this error
}
} else {
// maybe we are not the leader anymore, or the error'd node is not my replica?
sendRecoveryCommand = false;
if (!foundErrorNodeInReplicaList) {
log.warn("Core "+cloudDesc.getCoreNodeName()+" belonging to "+collection+" "+
shardId+", does not have error'd node " + stdNode.getNodeProps().getCoreUrl() + " as a replica. " +
@@ -914,30 +900,6 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
shardId+", no request recovery command will be sent!");
}
}
} // else not a StdNode, recovery command still gets sent once
if (!sendRecoveryCommand)
continue; // the replica is already in recovery handling or is not live
Throwable rootCause = SolrException.getRootCause(error.e);
log.error("Setting up to try to start recovery on replica {}", replicaUrl, rootCause);
// try to send the recovery command to the downed replica in a background thread
CoreContainer coreContainer = req.getCore().getCoreDescriptor().getCoreContainer();
LeaderInitiatedRecoveryThread lirThread =
new LeaderInitiatedRecoveryThread(zkController,
coreContainer,
collection,
shardId,
error.req.node.getNodeProps(),
maxTries,
cloudDesc.getCoreNodeName()); // core node name of current leader
ExecutorService executor = coreContainer.getUpdateShardHandler().getUpdateExecutor();
try {
MDC.put("DistributedUpdateProcessor.replicaUrlToRecover", error.req.node.getNodeProps().getCoreUrl());
executor.execute(lirThread);
} finally {
MDC.remove("DistributedUpdateProcessor.replicaUrlToRecover");
}
}
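On the update path the change is a pure removal of responsibility: the processor no longer builds the LIR thread, sizes maxTries, or touches the executor (all deleted above). What survives is a single non-blocking call; everything after it runs in the background. Condensed from the added lines:

    Throwable rootCause = SolrException.getRootCause(error.e);
    log.error("Setting up to try to start recovery on replica {}", replicaUrl, rootCause);
    // no executor, MDC bookkeeping, or retry accounting here anymore
    zkController.ensureReplicaInLeaderInitiatedRecovery(
        req.getCore().getCoreDescriptor().getCoreContainer(),
        collection, shardId, stdNode.getNodeProps(), leaderCoreNodeName,
        false /* forcePublishState */);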

View File

@@ -140,14 +140,10 @@ public class HttpPartitionTest extends AbstractFullDistribZkTestBase {
ZkCoreNodeProps replicaCoreNodeProps = new ZkCoreNodeProps(notLeader);
String replicaUrl = replicaCoreNodeProps.getCoreUrl();
assertTrue(!zkController.isReplicaInRecoveryHandling(replicaUrl));
assertTrue(zkController.ensureReplicaInLeaderInitiatedRecovery(testCollectionName, shardId, replicaCoreNodeProps, leader.getName(), false, true));
assertTrue(zkController.isReplicaInRecoveryHandling(replicaUrl));
zkController.updateLeaderInitiatedRecoveryState(testCollectionName, shardId, notLeader.getName(), Replica.State.DOWN, leader.getName(), true);
Map<String,Object> lirStateMap = zkController.getLeaderInitiatedRecoveryStateObject(testCollectionName, shardId, notLeader.getName());
assertNotNull(lirStateMap);
assertSame(Replica.State.DOWN, Replica.State.getState((String) lirStateMap.get(ZkStateReader.STATE_PROP)));
zkController.removeReplicaFromLeaderInitiatedRecoveryHandling(replicaUrl);
assertTrue(!zkController.isReplicaInRecoveryHandling(replicaUrl));
// test old non-json format handling
SolrZkClient zkClient = zkController.getZkClient();
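The swap at the top of this hunk follows from the API change: ensureReplicaInLeaderInitiatedRecovery is now asynchronous (it schedules a background thread), so the old synchronous assertions against it no longer hold. Writing the LIR znode directly keeps the test deterministic; a restatement of the new line with that rationale spelled out:

    // Direct, synchronous LIR write: safe to assert on immediately afterwards.
    zkController.updateLeaderInitiatedRecoveryState(testCollectionName, shardId,
        notLeader.getName(), Replica.State.DOWN, leader.getName(), true /* retryOnConnLoss */);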

View File

@@ -0,0 +1,224 @@
package org.apache.solr.cloud;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.util.concurrent.TimeUnit;
import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.ZkCoreNodeProps;
import org.apache.solr.servlet.SolrDispatchFilter;
import org.apache.solr.util.TimeOut;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.data.Stat;
/**
* Test for {@link LeaderInitiatedRecoveryThread}
*/
@SolrTestCaseJ4.SuppressSSL
public class TestLeaderInitiatedRecoveryThread extends AbstractFullDistribZkTestBase {
public TestLeaderInitiatedRecoveryThread() {
sliceCount = 1;
fixShardCount(2);
}
public void testPublishDownState() throws Exception {
waitForRecoveriesToFinish(true);
final String leaderCoreNodeName = shardToLeaderJetty.get(SHARD1).coreNodeName;
final CloudJettyRunner leaderRunner = shardToLeaderJetty.get(SHARD1);
SolrDispatchFilter filter = (SolrDispatchFilter) leaderRunner.jetty.getDispatchFilter().getFilter();
ZkController zkController = filter.getCores().getZkController();
CloudJettyRunner notLeader = null;
for (CloudJettyRunner cloudJettyRunner : shardToJetty.get(SHARD1)) {
if (cloudJettyRunner != leaderRunner) {
notLeader = cloudJettyRunner;
break;
}
}
assertNotNull(notLeader);
Replica replica = cloudClient.getZkStateReader().getClusterState().getReplica(DEFAULT_COLLECTION, notLeader.coreNodeName);
ZkCoreNodeProps replicaCoreNodeProps = new ZkCoreNodeProps(replica);
/*
1. Test that publishDownState throws an exception when zkController.isReplicaInRecoveryHandling == false
*/
try {
LeaderInitiatedRecoveryThread thread = new LeaderInitiatedRecoveryThread(zkController, filter.getCores(),
DEFAULT_COLLECTION, SHARD1, replicaCoreNodeProps, 1, leaderCoreNodeName);
assertFalse(zkController.isReplicaInRecoveryHandling(replicaCoreNodeProps.getCoreUrl()));
thread.run();
fail("publishDownState should not have succeeded because replica url is not marked in leader initiated recovery in ZkController");
} catch (SolrException e) {
assertTrue(e.code() == SolrException.ErrorCode.INVALID_STATE.code);
}
/*
2. Test that a non-live replica cannot be put into LIR or down state
*/
LeaderInitiatedRecoveryThread thread = new LeaderInitiatedRecoveryThread(zkController, filter.getCores(),
DEFAULT_COLLECTION, SHARD1, replicaCoreNodeProps, 1, leaderCoreNodeName);
// kill the replica
int children = cloudClient.getZkStateReader().getZkClient().getChildren("/live_nodes", null, true).size();
ChaosMonkey.stop(notLeader.jetty);
TimeOut timeOut = new TimeOut(60, TimeUnit.SECONDS);
while (!timeOut.hasTimedOut()) {
if (children > cloudClient.getZkStateReader().getZkClient().getChildren("/live_nodes", null, true).size()) {
break;
}
Thread.sleep(500);
}
assertTrue(children > cloudClient.getZkStateReader().getZkClient().getChildren("/live_nodes", null, true).size());
int cversion = getOverseerCversion();
// The thread should not publish LIR or down state for a node which is not live, regardless of whether forcePublish is true or false
assertFalse(thread.publishDownState(replicaCoreNodeProps.getCoreName(), replica.getName(), replica.getNodeName(), replicaCoreNodeProps.getCoreUrl(), false));
// let's assert that we did not publish anything to the overseer queue; the simplest way is to check that the cversion of the overseer queue znode is unchanged
assertEquals(cversion, getOverseerCversion());
assertFalse(thread.publishDownState(replicaCoreNodeProps.getCoreName(), replica.getName(), replica.getNodeName(), replicaCoreNodeProps.getCoreUrl(), true));
// let's assert that we did not publish anything to the overseer queue
assertEquals(cversion, getOverseerCversion());
/*
3. Test that on ZK connection loss the thread does not attempt to publish the down state, even if forcePublish=true
*/
ChaosMonkey.start(notLeader.jetty);
waitForRecoveriesToFinish(true);
thread = new LeaderInitiatedRecoveryThread(zkController, filter.getCores(),
DEFAULT_COLLECTION, SHARD1, replicaCoreNodeProps, 1, leaderCoreNodeName) {
@Override
protected void updateLIRState(String replicaCoreNodeName) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "", new KeeperException.ConnectionLossException());
}
};
assertFalse(thread.publishDownState(replicaCoreNodeProps.getCoreName(), replica.getName(), replica.getNodeName(), replicaCoreNodeProps.getCoreUrl(), false));
assertFalse(thread.publishDownState(replicaCoreNodeProps.getCoreName(), replica.getName(), replica.getNodeName(), replicaCoreNodeProps.getCoreUrl(), true));
assertNull(zkController.getLeaderInitiatedRecoveryState(DEFAULT_COLLECTION, SHARD1, replica.getName()));
/*
4. Test that on ZK session expiry the thread does not attempt to publish the down state, even if forcePublish=true
*/
thread = new LeaderInitiatedRecoveryThread(zkController, filter.getCores(),
DEFAULT_COLLECTION, SHARD1, replicaCoreNodeProps, 1, leaderCoreNodeName) {
@Override
protected void updateLIRState(String replicaCoreNodeName) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "", new KeeperException.SessionExpiredException());
}
};
assertFalse(thread.publishDownState(replicaCoreNodeProps.getCoreName(), replica.getName(), replica.getNodeName(), replicaCoreNodeProps.getCoreUrl(), false));
assertFalse(thread.publishDownState(replicaCoreNodeProps.getCoreName(), replica.getName(), replica.getNodeName(), replicaCoreNodeProps.getCoreUrl(), true));
assertNull(zkController.getLeaderInitiatedRecoveryState(DEFAULT_COLLECTION, SHARD1, replica.getName()));
/*
5. Test that with any exception other than ZK connection loss or session expiry, the down state is published only if forcePublish=true
*/
thread = new LeaderInitiatedRecoveryThread(zkController, filter.getCores(),
DEFAULT_COLLECTION, SHARD1, replicaCoreNodeProps, 1, leaderCoreNodeName) {
@Override
protected void updateLIRState(String replicaCoreNodeName) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "bogus exception");
}
};
// the following should return true: regardless of the bogus exception while setting LIR state, we still want recovery commands to be sent;
// however, this call will not publish a down state because forcePublish=false
cversion = getOverseerCversion();
assertTrue(thread.publishDownState(replicaCoreNodeProps.getCoreName(), replica.getName(), replica.getNodeName(), replicaCoreNodeProps.getCoreUrl(), false));
// let's assert that we did not publish anything to the overseer queue; the simplest way is to check that the cversion of the overseer queue znode is unchanged
assertEquals(cversion, getOverseerCversion());
assertTrue(thread.publishDownState(replicaCoreNodeProps.getCoreName(), replica.getName(), replica.getNodeName(), replicaCoreNodeProps.getCoreUrl(), true));
// this should have published a down state so assert that cversion has incremented
assertTrue(getOverseerCversion() > cversion);
timeOut = new TimeOut(30, TimeUnit.SECONDS);
while (!timeOut.hasTimedOut()) {
cloudClient.getZkStateReader().updateClusterState();
Replica r = cloudClient.getZkStateReader().getClusterState().getReplica(DEFAULT_COLLECTION, replica.getName());
if (r.getState() == Replica.State.DOWN) {
break;
}
Thread.sleep(500);
}
assertNull(zkController.getLeaderInitiatedRecoveryState(DEFAULT_COLLECTION, SHARD1, replica.getName()));
assertEquals(Replica.State.DOWN, cloudClient.getZkStateReader().getClusterState().getReplica(DEFAULT_COLLECTION, replica.getName()).getState());
/*
6. Test that a non-leader cannot set LIR nodes
*/
filter = (SolrDispatchFilter) notLeader.jetty.getDispatchFilter().getFilter();
zkController = filter.getCores().getZkController();
thread = new LeaderInitiatedRecoveryThread(zkController, filter.getCores(),
DEFAULT_COLLECTION, SHARD1, replicaCoreNodeProps, 1, leaderCoreNodeName) {
@Override
protected void updateLIRState(String replicaCoreNodeName) {
try {
super.updateLIRState(replicaCoreNodeName);
} catch (Exception e) {
assertTrue(e instanceof ZkController.NotLeaderException);
throw e;
}
}
};
cversion = getOverseerCversion();
assertFalse(thread.publishDownState(replicaCoreNodeProps.getCoreName(), replica.getName(), replica.getNodeName(), replicaCoreNodeProps.getCoreUrl(), false));
assertEquals(cversion, getOverseerCversion());
/*
7. Assert that we can write a LIR state if everything else is fine
*/
// reset the ZkController to the one from the leader
filter = (SolrDispatchFilter) leaderRunner.jetty.getDispatchFilter().getFilter();
zkController = filter.getCores().getZkController();
thread = new LeaderInitiatedRecoveryThread(zkController, filter.getCores(),
DEFAULT_COLLECTION, SHARD1, replicaCoreNodeProps, 1, leaderCoreNodeName);
thread.publishDownState(replicaCoreNodeProps.getCoreName(), replica.getName(), replica.getNodeName(), replicaCoreNodeProps.getCoreUrl(), false);
timeOut = new TimeOut(30, TimeUnit.SECONDS);
while (!timeOut.hasTimedOut()) {
Replica.State state = zkController.getLeaderInitiatedRecoveryState(DEFAULT_COLLECTION, SHARD1, replica.getName());
if (state == Replica.State.DOWN) {
break;
}
Thread.sleep(500);
}
assertNotNull(zkController.getLeaderInitiatedRecoveryStateObject(DEFAULT_COLLECTION, SHARD1, replica.getName()));
assertEquals(Replica.State.DOWN, zkController.getLeaderInitiatedRecoveryState(DEFAULT_COLLECTION, SHARD1, replica.getName()));
/*
8. Test that
*/
}
protected int getOverseerCversion() throws KeeperException, InterruptedException {
Stat stat = new Stat();
cloudClient.getZkStateReader().getZkClient().getData("/overseer/queue", null, stat, true);
return stat.getCversion();
}
}
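getOverseerCversion leans on a ZooKeeper detail: a znode's cversion counts child creations and deletions, and every offer to /overseer/queue creates a child znode. An unchanged cversion therefore proves nothing was enqueued, without draining or inspecting the queue. The usage pattern, as the test applies it (variable names assumed):

    int before = getOverseerCversion();
    thread.publishDownState(coreName, coreNodeName, nodeName, replicaUrl, false);
    // no child was created under /overseer/queue => nothing was published
    assertEquals(before, getOverseerCversion());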

View File

@@ -49,6 +49,7 @@ import java.util.Map;
import java.util.concurrent.TimeUnit;
@Slow
@SolrTestCaseJ4.SuppressSSL
public class ZkControllerTest extends SolrTestCaseJ4 {
private static final String COLLECTION_NAME = "collection1";
@@ -252,64 +253,6 @@
}
}
/*
Test that:
1) LIR state to 'down' is not set unless publishing node is a leader
1a) Test that leader can publish when LIR node already exists in zk
1b) Test that leader can publish when LIR node does not exist - TODO
2) LIR state to 'active' or 'recovery' can be set regardless of whether publishing
node is leader or not - TODO
*/
public void testEnsureReplicaInLeaderInitiatedRecovery() throws Exception {
String zkDir = createTempDir("testEnsureReplicaInLeaderInitiatedRecovery").toFile().getAbsolutePath();
CoreContainer cc = null;
ZkTestServer server = new ZkTestServer(zkDir);
try {
server.run();
AbstractZkTestCase.tryCleanSolrZkNode(server.getZkHost());
AbstractZkTestCase.makeSolrZkNode(server.getZkHost());
cc = getCoreContainer();
ZkController zkController = null;
try {
CloudConfig cloudConfig = new CloudConfig.CloudConfigBuilder("127.0.0.1", 8983, "solr").build();
zkController = new ZkController(cc, server.getZkAddress(), TIMEOUT, cloudConfig, new CurrentCoreDescriptorProvider() {
@Override
public List<CoreDescriptor> getCurrentDescriptors() {
// do nothing
return null;
}
});
HashMap<String, Object> propMap = new HashMap<>();
propMap.put(ZkStateReader.BASE_URL_PROP, "http://127.0.0.1:8983/solr");
propMap.put(ZkStateReader.CORE_NAME_PROP, "replica1");
propMap.put(ZkStateReader.NODE_NAME_PROP, "127.0.0.1:8983_solr");
Replica replica = new Replica("replica1", propMap);
try {
// this method doesn't throw exception when node isn't leader
zkController.ensureReplicaInLeaderInitiatedRecovery("c1", "shard1",
new ZkCoreNodeProps(replica), "non_existent_leader", false, false);
fail("ZkController should not write LIR state for node which is not leader");
} catch (Exception e) {
assertNull("ZkController should not write LIR state for node which is not leader",
zkController.getLeaderInitiatedRecoveryState("c1", "shard1", "replica1"));
}
} finally {
if (zkController != null)
zkController.close();
}
} finally {
if (cc != null) {
cc.shutdown();
}
server.shutdown();
}
}
@AwaitsFix(bugUrl = "https://issues.apache.org/jira/browse/SOLR-7736")
public void testPublishAndWaitForDownStates() throws Exception {
String zkDir = createTempDir("testPublishAndWaitForDownStates").toFile().getAbsolutePath();