SOLR-6235: Leader initiated recovery should use coreNodeName instead of coreName to avoid marking all replicas that share a common core name as down
git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1610028 13f79535-47bb-0310-9956-ffa450edef68
commit 31d361d6ec
parent 5ff6d55b77
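For context, a minimal, self-contained sketch of why the znode key matters (not part of the patch; the collection, shard, and core_node values are invented for the example). The leader-initiated recovery (LIR) znode was previously keyed by the core name, which is usually identical across nodes, so marking one replica down collided with every replica sharing that name; keying by coreNodeName gives each replica its own znode. The helper below mirrors the path shape of getLeaderInitiatedRecoveryZnodePath shown at the end of this diff.

// Illustrative only; "collection1", "shard1", "core_node1", "core_node2" are made-up values.
public class LirZnodeKeyExample {

  // same path shape as ZkController.getLeaderInitiatedRecoveryZnodePath in this patch
  static String lirPath(String collection, String shardId, String key) {
    return "/collections/" + collection + "/leader_initiated_recovery/" + shardId + "/" + key;
  }

  public static void main(String[] args) {
    // Before the patch: keyed by core name, so two replicas named "collection1" collide on one znode.
    System.out.println(lirPath("collection1", "shard1", "collection1"));
    System.out.println(lirPath("collection1", "shard1", "collection1"));

    // After the patch: keyed by coreNodeName, so each replica gets a distinct znode.
    System.out.println(lirPath("collection1", "shard1", "core_node1"));
    System.out.println(lirPath("collection1", "shard1", "core_node2"));
  }
}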
@@ -168,6 +168,9 @@ Bug Fixes
 * SOLR-6229: Make SuggestComponent return 400 instead of 500 for bad dictionary selected in request.
   (Tomás Fernández Löbbe via shalin)
 
+* SOLR-6235: Leader initiated recovery should use coreNodeName instead of coreName to avoid marking
+  all replicas having common core name as down. (shalin)
+
 Optimizations
 ---------------------
 
@@ -330,10 +330,11 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
       CloudDescriptor cloudDesc = core.getCoreDescriptor().getCloudDescriptor();
       String coll = cloudDesc.getCollectionName();
       String shardId = cloudDesc.getShardId();
+      String coreNodeName = cloudDesc.getCoreNodeName();
       
       if (coll == null || shardId == null) {
         log.error("Cannot start leader-initiated recovery on new leader (core="+
-            coreName+") because collection and/or shard is null!");
+            coreName+",coreNodeName=" + coreNodeName + ") because collection and/or shard is null!");
         return;
       }
       
@@ -346,24 +347,22 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
       }
       
       if (replicas != null && replicas.size() > 0) {
-        for (String replicaCore : replicas) {
+        for (String replicaCoreNodeName : replicas) {
           
-          if (coreName.equals(replicaCore))
+          if (coreNodeName.equals(replicaCoreNodeName))
             continue; // added safe-guard so we don't mark this core as down
           
-          String lirState = zkController.getLeaderInitiatedRecoveryState(coll, shardId, replicaCore);
+          String lirState = zkController.getLeaderInitiatedRecoveryState(coll, shardId, replicaCoreNodeName);
           if (ZkStateReader.DOWN.equals(lirState) || ZkStateReader.RECOVERY_FAILED.equals(lirState)) {
-            log.info("After "+coreName+" was elected leader, found "+
-                replicaCore+" as "+lirState+" and needing recovery.");
-            
+            log.info("After core={} coreNodeName={} was elected leader, it was found in state: "
+                + lirState + " and needing recovery.", coreName, coreNodeName);
             List<ZkCoreNodeProps> replicaProps = 
-                zkController.getZkStateReader().getReplicaProps(
-                    collection, shardId, coreName, replicaCore, null, null);
+                zkController.getZkStateReader().getReplicaProps(collection, shardId, coreNodeName, null);
             
             if (replicaProps != null && replicaProps.size() > 0) {
               ZkCoreNodeProps coreNodeProps = null;
               for (ZkCoreNodeProps p : replicaProps) {
-                if (p.getCoreName().equals(replicaCore)) {
+                if (p.getCoreName().equals(replicaCoreNodeName)) {
                   coreNodeProps = p;
                   break;
                 }
@@ -377,7 +376,7 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
                       coreNodeProps,
                       120);
               zkController.ensureReplicaInLeaderInitiatedRecovery(
-                  collection, shardId, replicaCore, coreNodeProps, false);
+                  collection, shardId, coreNodeProps.getCoreUrl(), coreNodeProps, false);
               
               ExecutorService executor = cc.getUpdateShardHandler().getUpdateExecutor();
               executor.execute(lirThread);
@@ -453,7 +452,8 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
   }
   
   private boolean shouldIBeLeader(ZkNodeProps leaderProps, SolrCore core, boolean weAreReplacement) {
-    log.info("Checking if I ("+core.getName()+") should try and be the leader.");
+    log.info("Checking if I (core={},coreNodeName={}) should try and be the leader.", core.getName(),
+        core.getCoreDescriptor().getCloudDescriptor().getCoreNodeName());
     
     if (isClosed) {
       log.info("Bailing on leader process because we have been closed");
@@ -470,7 +470,8 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
     
     // maybe active but if the previous leader marked us as down and
     // we haven't recovered, then can't be leader
-    String lirState = zkController.getLeaderInitiatedRecoveryState(collection, shardId, core.getName());
+    String lirState = zkController.getLeaderInitiatedRecoveryState(collection, shardId,
+        core.getCoreDescriptor().getCloudDescriptor().getCoreNodeName());
     if (ZkStateReader.DOWN.equals(lirState) || ZkStateReader.RECOVERING.equals(lirState)) {
       log.warn("Although my last published state is Active, the previous leader marked me "+core.getName()
           + " as " + lirState
 
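A minimal sketch of the loop shape changed above (assumptions: the list of znode children and the leader's own coreNodeName are hard-coded here, whereas the real code reads them from ZooKeeper and the core's CloudDescriptor). The point is that the children of the per-shard LIR node are now coreNodeNames, so the new leader skips itself by coreNodeName rather than by core name:

import java.util.Arrays;
import java.util.List;

// Sketch only; in the patch these values come from ZooKeeper and CloudDescriptor.
public class LirReplicaScanSketch {
  public static void main(String[] args) {
    String coreNodeName = "core_node1";                       // the newly elected leader
    List<String> replicas = Arrays.asList("core_node1", "core_node2", "core_node3");

    for (String replicaCoreNodeName : replicas) {
      if (coreNodeName.equals(replicaCoreNodeName)) {
        continue; // same safeguard as above: never mark the new leader itself as down
      }
      // the real code checks this replica's LIR state here and, if DOWN or RECOVERY_FAILED,
      // starts a LeaderInitiatedRecoveryThread for it
      System.out.println("would check LIR state for " + replicaCoreNodeName);
    }
  }
}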
@@ -10,6 +10,7 @@ import org.apache.solr.client.solrj.impl.HttpSolrServer;
 import org.apache.solr.client.solrj.request.CoreAdminRequest.RequestRecovery;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrException.ErrorCode;
+import org.apache.solr.common.cloud.Replica;
 import org.apache.solr.common.cloud.ZkCoreNodeProps;
 import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.common.params.CoreAdminParams.CoreAdminAction;
@@ -92,6 +93,7 @@ public class LeaderInitiatedRecoveryThread extends Thread {
     String recoveryUrl = nodeProps.getBaseUrl();
     String replicaNodeName = nodeProps.getNodeName();
     String coreNeedingRecovery = nodeProps.getCoreName();
+    String replicaCoreNodeName = ((Replica) nodeProps.getNodeProps()).getName();
     String replicaUrl = nodeProps.getCoreUrl();
     
     log.info(getName()+" started running to send REQUESTRECOVERY command to "+replicaUrl+
@@ -103,10 +105,10 @@ public class LeaderInitiatedRecoveryThread extends Thread {
     
     while (continueTrying && ++tries < maxTries) {
       if (tries > 1) {
-        log.warn("Asking core "+coreNeedingRecovery+" on " + recoveryUrl +
-            " to recover; unsuccessful after "+tries+" of "+maxTries+" attempts so far ...");
+        log.warn("Asking core={} coreNodeName={} on " + recoveryUrl +
+            " to recover; unsuccessful after "+tries+" of "+maxTries+" attempts so far ...", coreNeedingRecovery, replicaCoreNodeName);
       } else {
-        log.info("Asking core "+coreNeedingRecovery+" on " + recoveryUrl + " to recover");
+        log.info("Asking core={} coreNodeName={} on " + recoveryUrl + " to recover", coreNeedingRecovery, replicaCoreNodeName);
       }
       
       HttpSolrServer server = new HttpSolrServer(recoveryUrl);
@@ -116,8 +118,8 @@ public class LeaderInitiatedRecoveryThread extends Thread {
       try {
         server.request(recoverRequestCmd);
         
-        log.info("Successfully sent "+CoreAdminAction.REQUESTRECOVERY+
-            " command to core "+coreNeedingRecovery+" on "+recoveryUrl);
+        log.info("Successfully sent " + CoreAdminAction.REQUESTRECOVERY +
+            " command to core={} coreNodeName={} on " + recoveryUrl, coreNeedingRecovery, replicaCoreNodeName);
         
         continueTrying = false; // succeeded, so stop looping
       } catch (Throwable t) {
@@ -147,8 +149,8 @@ public class LeaderInitiatedRecoveryThread extends Thread {
       }
       
       if (coreContainer.isShutDown()) {
-        log.warn("Stop trying to send recovery command to downed replica "+coreNeedingRecovery+
-            " on "+replicaNodeName+" because my core container is shutdown.");
+        log.warn("Stop trying to send recovery command to downed replica core={} coreNodeName={} on "
+            + replicaNodeName + " because my core container is shutdown.", coreNeedingRecovery, replicaCoreNodeName);
         continueTrying = false;
         break;
       }
@@ -174,11 +176,11 @@ public class LeaderInitiatedRecoveryThread extends Thread {
         try {
           // call out to ZooKeeper to get the leader-initiated recovery state
           String lirState = 
-              zkController.getLeaderInitiatedRecoveryState(collection, shardId, coreNeedingRecovery);
+              zkController.getLeaderInitiatedRecoveryState(collection, shardId, replicaCoreNodeName);
           
           if (lirState == null) {
-            log.warn("Stop trying to send recovery command to downed replica "+coreNeedingRecovery+
-                " on "+replicaNodeName+" because the znode no longer exists.");
+            log.warn("Stop trying to send recovery command to downed replica core="+coreNeedingRecovery+
+                ",coreNodeName=" + replicaCoreNodeName + " on "+replicaNodeName+" because the znode no longer exists.");
             continueTrying = false;
             break;
           }
@@ -202,8 +204,8 @@ public class LeaderInitiatedRecoveryThread extends Thread {
             if (ZkStateReader.DOWN.equals(lirState)) {
               // OK, so the replica thinks it is active, but it never ack'd the leader initiated recovery
               // so its state cannot be trusted and it needs to be told to recover again ... and we keep looping here
-              log.warn("Replica "+coreNeedingRecovery+" set to active but the leader thinks it should be in recovery;"
-                  + " forcing it back to down state to re-run the leader-initiated recovery process; props: "+replicaProps.get(0));
+              log.warn("Replica core={} coreNodeName={} set to active but the leader thinks it should be in recovery;"
+                  + " forcing it back to down state to re-run the leader-initiated recovery process; props: "+replicaProps.get(0), coreNeedingRecovery, replicaCoreNodeName);
               zkController.ensureReplicaInLeaderInitiatedRecovery(collection,
                   shardId, replicaUrl, nodeProps, true); // force republish state to "down"
             }
@@ -211,7 +213,7 @@ public class LeaderInitiatedRecoveryThread extends Thread {
             }
           }
         } catch (Exception ignoreMe) {
-          log.warn("Failed to determine state of "+coreNeedingRecovery+" due to: "+ignoreMe);
+          log.warn("Failed to determine state of core={} coreNodeName={} due to: "+ignoreMe, coreNeedingRecovery, replicaCoreNodeName);
           // eventually this loop will exhaust max tries and stop so we can just log this for now
         }
       }
 
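A hedged sketch of the retry shape used in the thread above (the maxTries value and the state lookup below are stand-ins, not the real implementation): every ZooKeeper lookup is keyed by the replica's coreNodeName, while the human-readable core name now appears only in log messages.

// Sketch only; fakeLirState stands in for zkController.getLeaderInitiatedRecoveryState(...).
public class LirRetrySketch {
  public static void main(String[] args) {
    String coreNeedingRecovery = "collection1"; // core name, used for logging only
    String replicaCoreNodeName = "core_node2";  // unique key used for the ZooKeeper lookups
    int maxTries = 5;
    int tries = 0;
    boolean continueTrying = true;

    while (continueTrying && ++tries < maxTries) {
      System.out.printf("Asking core=%s coreNodeName=%s to recover (attempt %d)%n",
          coreNeedingRecovery, replicaCoreNodeName, tries);
      String lirState = fakeLirState(replicaCoreNodeName, tries);
      if (lirState == null) {
        continueTrying = false; // znode gone: the replica recovered or was removed, stop looping
      }
    }
  }

  // stand-in for the LIR state lookup, keyed by coreNodeName
  static String fakeLirState(String coreNodeName, int attempt) {
    return attempt < 3 ? "down" : null; // pretend the replica acks after a couple of attempts
  }
}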
@@ -988,7 +988,8 @@ public final class ZkController {
       }
       
       // see if the leader told us to recover
-      String lirState = getLeaderInitiatedRecoveryState(collection, shardId, coreName);
+      String lirState = getLeaderInitiatedRecoveryState(collection, shardId,
+          core.getCoreDescriptor().getCloudDescriptor().getCoreNodeName());
       if (ZkStateReader.DOWN.equals(lirState)) {
         log.info("Leader marked core "+core.getName()+" down; starting recovery process");
         core.getUpdateHandler().getSolrCoreState().doRecovery(cc, core.getCoreDescriptor());
@@ -1041,12 +1042,12 @@ public final class ZkController {
       // If the leader initiated recovery, then verify that this replica has performed
       // recovery as requested before becoming active; don't even look at lirState if going down
       if (!ZkStateReader.DOWN.equals(state)) {
-        String lirState = getLeaderInitiatedRecoveryState(collection, shardId, cd.getName());
+        String lirState = getLeaderInitiatedRecoveryState(collection, shardId, coreNodeName);
         if (lirState != null) {
           if ("active".equals(state)) {
             // trying to become active, so leader-initiated state must be recovering
             if (ZkStateReader.RECOVERING.equals(lirState)) {
-              updateLeaderInitiatedRecoveryState(collection, shardId, cd.getName(), ZkStateReader.ACTIVE);
+              updateLeaderInitiatedRecoveryState(collection, shardId, coreNodeName, ZkStateReader.ACTIVE);
             } else if (ZkStateReader.DOWN.equals(lirState)) {
               throw new SolrException(ErrorCode.INVALID_STATE,
                   "Cannot publish state of core '"+cd.getName()+"' as active without recovering first!");
@@ -1054,7 +1055,7 @@ public final class ZkController {
           } else if (ZkStateReader.RECOVERING.equals(state)) {
             // if it is currently DOWN, then trying to enter into recovering state is good
             if (ZkStateReader.DOWN.equals(lirState)) {
-              updateLeaderInitiatedRecoveryState(collection, shardId, cd.getName(), ZkStateReader.RECOVERING);
+              updateLeaderInitiatedRecoveryState(collection, shardId, coreNodeName, ZkStateReader.RECOVERING);
             }
           }
         }
@@ -1486,6 +1487,7 @@ public final class ZkController {
     String leaderBaseUrl = leaderProps.getBaseUrl();
     String leaderCoreName = leaderProps.getCoreName();
     
+    String myCoreNodeName = cloudDesc.getCoreNodeName();
     String myCoreName = descriptor.getName();
     String ourUrl = ZkCoreNodeProps.getCoreUrl(getBaseUrl(), myCoreName);
     
@@ -1496,18 +1498,18 @@ public final class ZkController {
     // then we don't need the leader to wait on seeing the down state
     String lirState = null;
     try {
-      lirState = getLeaderInitiatedRecoveryState(collection, shard, myCoreName);
+      lirState = getLeaderInitiatedRecoveryState(collection, shard, myCoreNodeName);
     } catch (Exception exc) {
-      log.error("Failed to determine if replica "+myCoreName+
+      log.error("Failed to determine if replica "+myCoreNodeName+
          " is in leader-initiated recovery due to: "+exc, exc);
     }
     
     if (lirState != null) {
-      log.info("Replica "+myCoreName+
+      log.info("Replica "+myCoreNodeName+
          " is already in leader-initiated recovery, so not waiting for leader to see down state.");
     } else {
       
-      log.info("Replica "+myCoreName+
+      log.info("Replica "+myCoreNodeName+
          " NOT in leader-initiated recovery, need to wait for leader to see down state.");
       
       HttpSolrServer server = null;
@@ -1777,6 +1779,9 @@ public final class ZkController {
     // recovery signal once
     boolean nodeIsLive = true;
     boolean publishDownState = false;
+    String replicaNodeName = replicaCoreProps.getNodeName();
+    String replicaCoreNodeName = ((Replica)replicaCoreProps.getNodeProps()).getName();
+    assert replicaCoreNodeName != null : "No core name for replica "+replicaNodeName;
     synchronized (replicasInLeaderInitiatedRecovery) {
       if (replicasInLeaderInitiatedRecovery.containsKey(replicaUrl)) {
         if (!forcePublishState) {
@@ -1786,24 +1791,20 @@ public final class ZkController {
       }
       
       // if the replica's state is not DOWN right now, make it so ...
-      String replicaNodeName = replicaCoreProps.getNodeName();
-      String replicaCoreName = replicaCoreProps.getCoreName();
-      assert replicaCoreName != null : "No core name for replica "+replicaNodeName;
-      
       // we only really need to try to send the recovery command if the node itself is "live"
       if (getZkStateReader().getClusterState().liveNodesContain(replicaNodeName)) {
         replicasInLeaderInitiatedRecovery.put(replicaUrl,
-            getLeaderInitiatedRecoveryZnodePath(collection, shardId, replicaCoreName));
+            getLeaderInitiatedRecoveryZnodePath(collection, shardId, replicaCoreNodeName));
         // create a znode that requires the replica needs to "ack" to verify it knows it was out-of-sync
-        updateLeaderInitiatedRecoveryState(collection, shardId, replicaCoreName, ZkStateReader.DOWN);
-        log.info("Put replica "+replicaCoreName+" on "+
-            replicaNodeName+" into leader-initiated recovery.");
+        updateLeaderInitiatedRecoveryState(collection, shardId, replicaCoreNodeName, ZkStateReader.DOWN);
+        log.info("Put replica core={} coreNodeName={} on "+
+            replicaNodeName+" into leader-initiated recovery.", replicaCoreProps.getCoreName(), replicaCoreNodeName);
         publishDownState = true;
       } else {
         nodeIsLive = false; // we really don't need to send the recovery request if the node is NOT live
-        log.info("Node "+replicaNodeName+
-            " is not live, so skipping leader-initiated recovery for replica: "+
-            replicaCoreName);
+        log.info("Node " + replicaNodeName +
+            " is not live, so skipping leader-initiated recovery for replica: core={} coreNodeName={}",
+            replicaCoreProps.getCoreName(), replicaCoreNodeName);
         // publishDownState will be false to avoid publishing the "down" state too many times
         // as many errors can occur together and will each call into this method (SOLR-6189)
       }
@@ -1818,8 +1819,8 @@ public final class ZkController {
           ZkStateReader.NODE_NAME_PROP, replicaCoreProps.getNodeName(),
           ZkStateReader.SHARD_ID_PROP, shardId,
           ZkStateReader.COLLECTION_PROP, collection);
-      log.warn("Leader is publishing core={} state={} on behalf of un-reachable replica {}; forcePublishState? "+forcePublishState,
-          replicaCoreName, ZkStateReader.DOWN, replicaUrl);
+      log.warn("Leader is publishing core={} coreNodeName ={} state={} on behalf of un-reachable replica {}; forcePublishState? "+forcePublishState,
+          replicaCoreName, replicaCoreNodeName, ZkStateReader.DOWN, replicaUrl);
       overseerJobQueue.offer(ZkStateReader.toJSON(m));
     }
     
@@ -1840,12 +1841,12 @@ public final class ZkController {
     }
   }
   
-  public String getLeaderInitiatedRecoveryState(String collection, String shardId, String coreName) {
+  public String getLeaderInitiatedRecoveryState(String collection, String shardId, String coreNodeName) {
     
-    if (collection == null || shardId == null || coreName == null)
+    if (collection == null || shardId == null || coreNodeName == null)
       return null; // if we don't have complete data about a core in cloud mode, return null
     
-    String znodePath = getLeaderInitiatedRecoveryZnodePath(collection, shardId, coreName);
+    String znodePath = getLeaderInitiatedRecoveryZnodePath(collection, shardId, coreNodeName);
     String state = null;
     try {
       byte[] data = zkClient.getData(znodePath, null, new Stat(), false);
@@ -1875,14 +1876,14 @@ public final class ZkController {
     return state;
   }
   
-  private void updateLeaderInitiatedRecoveryState(String collection, String shardId, String coreName, String state) {
-    if (collection == null || shardId == null || coreName == null) {
+  private void updateLeaderInitiatedRecoveryState(String collection, String shardId, String coreNodeName, String state) {
+    if (collection == null || shardId == null || coreNodeName == null) {
       log.warn("Cannot set leader-initiated recovery state znode to "+state+" using: collection="+collection+
-          "; shardId="+shardId+"; coreName="+coreName);
+          "; shardId="+shardId+"; coreNodeName="+coreNodeName);
       return; // if we don't have complete data about a core in cloud mode, do nothing
     }
     
-    String znodePath = getLeaderInitiatedRecoveryZnodePath(collection, shardId, coreName);
+    String znodePath = getLeaderInitiatedRecoveryZnodePath(collection, shardId, coreNodeName);
     
     if (ZkStateReader.ACTIVE.equals(state)) {
       // since we're marking it active, we don't need this znode anymore, so delete instead of update
@@ -1923,7 +1924,7 @@ public final class ZkController {
     return "/collections/"+collection+"/leader_initiated_recovery/"+shardId;
   }
   
-  public String getLeaderInitiatedRecoveryZnodePath(String collection, String shardId, String coreName) {
-    return getLeaderInitiatedRecoveryZnodePath(collection, shardId)+"/"+coreName;
+  public String getLeaderInitiatedRecoveryZnodePath(String collection, String shardId, String coreNodeName) {
+    return getLeaderInitiatedRecoveryZnodePath(collection, shardId)+"/"+coreNodeName;
   }
 }
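To summarize the publish-time check changed above, a hedged, stand-alone sketch (state names are plain strings here; the real code uses ZkStateReader constants and throws SolrException): a replica whose LIR znode, now keyed by its coreNodeName, still says "down" may not publish itself as active until it has gone through recovery.

// Sketch of the gate in ZkController.publish after this patch; not the actual implementation.
public class LirPublishGateSketch {

  static String publish(String requestedState, String lirState) {
    if (lirState == null) return requestedState;              // no LIR znode for this coreNodeName
    if ("active".equals(requestedState)) {
      if ("recovering".equals(lirState)) return "active";     // recovered as requested
      if ("down".equals(lirState)) {
        throw new IllegalStateException("Cannot publish state as active without recovering first!");
      }
    }
    return requestedState;
  }

  public static void main(String[] args) {
    System.out.println(publish("active", "recovering"));      // allowed
    System.out.println(publish("recovering", "down"));        // allowed: entering recovery is fine
    try {
      publish("active", "down");
    } catch (IllegalStateException e) {
      System.out.println("rejected: " + e.getMessage());
    }
  }
}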