SOLR-11812: Remove backward compatibility of old LIR implementation in 8.0

Cao Manh Dat 2018-10-09 16:20:49 +07:00
parent 184ed88ecb
commit a37a213975
12 changed files with 14 additions and 1840 deletions

View File

@@ -89,6 +89,8 @@ Other Changes
* LUCENE-8513: SlowCompositeReaderWrapper now uses MultiTerms directly instead of MultiFields (David Smiley)
* SOLR-11812: Remove backward compatibility of old LIR implementation in 8.0 (Cao Manh Dat)
================== 7.6.0 ==================
Consult the LUCENE_CHANGES.txt file for additional, low level, changes in this release.

View File

@@ -455,8 +455,6 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
boolean isLeader = true;
if (!isClosed) {
try {
// we must check LIR before registering as leader
checkLIR(coreName, allReplicasInLine);
if (replicaType == Replica.Type.TLOG) {
// stop replicate from old leader
zkController.stopReplicationFromLeader(coreName);
@@ -509,16 +507,6 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
rejoinLeaderElection(core);
}
}
if (isLeader) {
// check for any replicas in my shard that were set to down by the previous leader
try {
startLeaderInitiatedRecoveryOnReplicas(coreName);
} catch (Exception exc) {
// don't want leader election to fail because of
// an error trying to tell others to recover
}
}
} else {
cancelElection();
}
@@ -595,110 +583,6 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
return docCollection.getReplica(replicaName);
}
@Deprecated
public void checkLIR(String coreName, boolean allReplicasInLine)
throws InterruptedException, KeeperException, IOException {
if (allReplicasInLine) {
log.info("Found all replicas participating in election, clear LIR");
// SOLR-8075: A bug may allow the proper leader to get marked as LIR DOWN and
// if we are marked as DOWN but were able to become the leader, we remove
// the DOWN entry here so that we don't fail publishing ACTIVE due to being in LIR.
// We only do this if all the replicas participated in the election just in case
// this was a valid LIR entry and the proper leader replica is missing.
try (SolrCore core = cc.getCore(coreName)) {
final Replica.State lirState = zkController.getLeaderInitiatedRecoveryState(collection, shardId,
core.getCoreDescriptor().getCloudDescriptor().getCoreNodeName());
if (lirState == Replica.State.DOWN) {
// We can do this before registering as leader because only setting DOWN requires that
// we are already registered as leader, and here we are setting ACTIVE
// The fact that we just won the zk leader election provides a quasi lock on setting this state, but
// we should improve this: see SOLR-8075 discussion
zkController.updateLeaderInitiatedRecoveryState(collection, shardId,
leaderProps.getStr(ZkStateReader.CORE_NODE_NAME_PROP), Replica.State.ACTIVE, core.getCoreDescriptor(), true);
}
}
} else {
try (SolrCore core = cc.getCore(coreName)) {
if (core != null) {
final Replica.State lirState = zkController.getLeaderInitiatedRecoveryState(collection, shardId,
core.getCoreDescriptor().getCloudDescriptor().getCoreNodeName());
if (lirState == Replica.State.DOWN || lirState == Replica.State.RECOVERING) {
log.warn("The previous leader marked me " + core.getName()
+ " as " + lirState.toString() + " and I haven't recovered yet, so I shouldn't be the leader.");
throw new SolrException(ErrorCode.SERVER_ERROR, "Leader Initiated Recovery prevented leadership");
}
}
}
}
}
@Deprecated
private void startLeaderInitiatedRecoveryOnReplicas(String coreName) throws Exception {
try (SolrCore core = cc.getCore(coreName)) {
CloudDescriptor cloudDesc = core.getCoreDescriptor().getCloudDescriptor();
String coll = cloudDesc.getCollectionName();
String shardId = cloudDesc.getShardId();
String coreNodeName = cloudDesc.getCoreNodeName();
if (coll == null || shardId == null) {
log.error("Cannot start leader-initiated recovery on new leader (core="+
coreName+",coreNodeName=" + coreNodeName + ") because collection and/or shard is null!");
return;
}
String znodePath = zkController.getLeaderInitiatedRecoveryZnodePath(coll, shardId);
List<String> replicas = null;
try {
replicas = zkClient.getChildren(znodePath, null, false);
} catch (NoNodeException nne) {
// this can be ignored
}
if (replicas != null && replicas.size() > 0) {
// set of replicas which are running with the new LIR implementation but have lirState=DOWN
Set<String> replicasMustBeInLowerTerm = new HashSet<>();
for (String replicaCoreNodeName : replicas) {
if (coreNodeName.equals(replicaCoreNodeName))
continue; // added safe-guard so we don't mark this core as down
final Replica.State lirState = zkController.getLeaderInitiatedRecoveryState(coll, shardId, replicaCoreNodeName);
if (lirState == Replica.State.DOWN || lirState == Replica.State.RECOVERY_FAILED) {
log.info("After core={} coreNodeName={} was elected leader, a replica coreNodeName={} was found in state: "
+ lirState.toString() + " and needing recovery.", coreName, coreNodeName, replicaCoreNodeName);
List<Replica> replicasProps =
zkController.getZkStateReader().getClusterState().getCollection(collection)
.getSlice(shardId).getReplicas(EnumSet.of(Replica.Type.NRT, Replica.Type.TLOG));
if (replicasProps != null && replicasProps.size() > 0) {
ZkCoreNodeProps coreNodeProps = null;
for (Replica p : replicasProps) {
if (p.getName().equals(replicaCoreNodeName)) {
coreNodeProps = new ZkCoreNodeProps(p);
break;
}
}
if (zkController.getShardTerms(collection, shardId).registered(replicaCoreNodeName)) {
replicasMustBeInLowerTerm.add(replicaCoreNodeName);
} else {
zkController.ensureReplicaInLeaderInitiatedRecovery(cc,
collection, shardId, coreNodeProps, core.getCoreDescriptor(),
false /* forcePublishState */);
}
}
}
}
// these replicas registered their terms, so they are running with the new LIR implementation;
// we can put them into recovery by increasing our term
zkController.getShardTerms(collection, shardId).ensureTermsIsHigher(coreNodeName, replicasMustBeInLowerTerm);
}
} // core gets closed automagically
}
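With this method gone, a newly elected leader no longer sweeps the leader_initiated_recovery znode for stale DOWN entries after winning the election. The assumption behind the removal is that a replica which is genuinely behind already holds a lower term, so the term bookkeeping kept in ZkController (see the hunks below) is enough to drive it back into recovery without any leader-side sweep.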
// returns true if all replicas are found to be up, false if not
private boolean waitForReplicasToComeUp(int timeoutms) throws InterruptedException {
long timeoutAt = System.nanoTime() + TimeUnit.NANOSECONDS.convert(timeoutms, TimeUnit.MILLISECONDS);

View File

@@ -1,366 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.cloud;
import org.apache.http.NoHttpResponseException;
import org.apache.http.conn.ConnectTimeoutException;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.request.CoreAdminRequest.RequestRecovery;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.ZkCoreNodeProps;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.CoreAdminParams.CoreAdminAction;
import org.apache.solr.common.util.Utils;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.core.CoreDescriptor;
import org.apache.zookeeper.KeeperException;
import org.apache.solr.util.RTimer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.invoke.MethodHandles;
import java.net.ConnectException;
import java.net.SocketException;
import java.net.UnknownHostException;
import java.util.List;
/**
* Background daemon thread that tries to send the REQUESTRECOVERY to a downed
* replica; used by a shard leader to nag a replica into recovering after the
* leader experiences an error trying to send an update request to the replica.
*/
@Deprecated
public class LeaderInitiatedRecoveryThread extends Thread {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
protected ZkController zkController;
protected CoreContainer coreContainer;
protected String collection;
protected String shardId;
protected ZkCoreNodeProps nodeProps;
protected int maxTries;
private CoreDescriptor leaderCd;
public LeaderInitiatedRecoveryThread(ZkController zkController,
CoreContainer cc,
String collection,
String shardId,
ZkCoreNodeProps nodeProps,
int maxTries,
CoreDescriptor leaderCd)
{
super("LeaderInitiatedRecoveryThread-"+nodeProps.getCoreName());
this.zkController = zkController;
this.coreContainer = cc;
this.collection = collection;
this.shardId = shardId;
this.nodeProps = nodeProps;
this.maxTries = maxTries;
this.leaderCd = leaderCd;
setDaemon(true);
}
public void run() {
RTimer timer = new RTimer();
String replicaCoreName = nodeProps.getCoreName();
String replicaCoreNodeName = ((Replica) nodeProps.getNodeProps()).getName();
String replicaNodeName = nodeProps.getNodeName();
final String replicaUrl = nodeProps.getCoreUrl();
if (!zkController.isReplicaInRecoveryHandling(replicaUrl)) {
throw new SolrException(ErrorCode.INVALID_STATE, "Replica: " + replicaUrl + " should have been marked under leader initiated recovery in ZkController but wasn't.");
}
if (!CloudUtil.replicaExists(zkController.getClusterState(), collection, shardId, replicaCoreNodeName)) {
log.info("Replica does not exist, skip doing LIR");
}
boolean sendRecoveryCommand = publishDownState(replicaCoreName, replicaCoreNodeName, replicaNodeName, replicaUrl, false);
if (sendRecoveryCommand) {
try {
sendRecoveryCommandWithRetry();
} catch (Exception exc) {
log.error(getName()+" failed due to: "+exc, exc);
if (exc instanceof SolrException) {
throw (SolrException)exc;
} else {
throw new SolrException(ErrorCode.SERVER_ERROR, exc);
}
} finally {
zkController.removeReplicaFromLeaderInitiatedRecoveryHandling(replicaUrl);
}
} else {
// replica is no longer in recovery on this node (may be handled on another node)
zkController.removeReplicaFromLeaderInitiatedRecoveryHandling(replicaUrl);
}
log.info("{} completed successfully after running for {}ms", getName(), timer.getTime());
}
public boolean publishDownState(String replicaCoreName, String replicaCoreNodeName, String replicaNodeName, String replicaUrl, boolean forcePublishState) {
boolean sendRecoveryCommand = true;
boolean publishDownState = false;
if (zkController.getZkStateReader().getClusterState().liveNodesContain(replicaNodeName)) {
try {
// create a znode that the replica needs to "ack" to verify it knows it was out-of-sync
updateLIRState(replicaCoreNodeName);
log.info("Put replica core={} coreNodeName={} on " +
replicaNodeName + " into leader-initiated recovery.", replicaCoreName, replicaCoreNodeName);
publishDownState = true;
} catch (Exception e) {
Throwable setLirZnodeFailedCause = SolrException.getRootCause(e);
log.error("Leader failed to set replica " +
nodeProps.getCoreUrl() + " state to DOWN due to: " + setLirZnodeFailedCause, setLirZnodeFailedCause);
if (setLirZnodeFailedCause instanceof KeeperException.SessionExpiredException
|| setLirZnodeFailedCause instanceof KeeperException.ConnectionLossException
|| setLirZnodeFailedCause instanceof ZkController.NotLeaderException) {
// our session is expired, which means our state is suspect, so don't go
// putting other replicas in recovery (see SOLR-6511)
sendRecoveryCommand = false;
forcePublishState = false; // no need to force publish any state in this case
} // else will go ahead and try to send the recovery command once after this error
}
} else {
log.info("Node " + replicaNodeName +
" is not live, so skipping leader-initiated recovery for replica: core={} coreNodeName={}",
replicaCoreName, replicaCoreNodeName);
// publishDownState will be false to avoid publishing the "down" state too many times
// as many errors can occur together and will each call into this method (SOLR-6189)
forcePublishState = false; // no need to force publish the state because replica is not live
sendRecoveryCommand = false; // no need to send recovery messages as well
}
try {
if (publishDownState || forcePublishState) {
ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, "state",
ZkStateReader.STATE_PROP, Replica.State.DOWN.toString(),
ZkStateReader.BASE_URL_PROP, nodeProps.getBaseUrl(),
ZkStateReader.CORE_NAME_PROP, nodeProps.getCoreName(),
ZkStateReader.CORE_NODE_NAME_PROP, replicaCoreNodeName,
ZkStateReader.NODE_NAME_PROP, nodeProps.getNodeName(),
ZkStateReader.SHARD_ID_PROP, shardId,
ZkStateReader.COLLECTION_PROP, collection,
ZkStateReader.FORCE_SET_STATE_PROP, "false");
log.warn("Leader is publishing core={} coreNodeName ={} state={} on behalf of un-reachable replica {}",
replicaCoreName, replicaCoreNodeName, Replica.State.DOWN.toString(), replicaUrl);
zkController.getOverseerJobQueue().offer(Utils.toJSON(m));
}
} catch (Exception e) {
log.error("Could not publish 'down' state for replicaUrl: {}", replicaUrl, e);
}
return sendRecoveryCommand;
}
private void removeLIRState(String replicaCoreNodeName) {
zkController.updateLeaderInitiatedRecoveryState(collection,
shardId,
replicaCoreNodeName, Replica.State.ACTIVE, leaderCd, true);
}
/*
protected scope for testing purposes
*/
protected void updateLIRState(String replicaCoreNodeName) {
zkController.updateLeaderInitiatedRecoveryState(collection,
shardId,
replicaCoreNodeName, Replica.State.DOWN, leaderCd, true);
}
protected void sendRecoveryCommandWithRetry() throws Exception {
int tries = 0;
long waitBetweenTriesMs = 5000L;
boolean continueTrying = true;
String replicaCoreName = nodeProps.getCoreName();
String recoveryUrl = nodeProps.getBaseUrl();
String replicaNodeName = nodeProps.getNodeName();
String coreNeedingRecovery = nodeProps.getCoreName();
String replicaCoreNodeName = ((Replica) nodeProps.getNodeProps()).getName();
String replicaUrl = nodeProps.getCoreUrl();
log.info(getName()+" started running to send REQUESTRECOVERY command to "+replicaUrl+
"; will try for a max of "+(maxTries * (waitBetweenTriesMs/1000))+" secs");
RequestRecovery recoverRequestCmd = new RequestRecovery();
recoverRequestCmd.setAction(CoreAdminAction.REQUESTRECOVERY);
recoverRequestCmd.setCoreName(coreNeedingRecovery);
while (continueTrying && ++tries <= maxTries) {
if (tries > 1) {
log.warn("Asking core={} coreNodeName={} on " + recoveryUrl +
" to recover; unsuccessful after "+tries+" of "+maxTries+" attempts so far ...", coreNeedingRecovery, replicaCoreNodeName);
} else {
log.info("Asking core={} coreNodeName={} on " + recoveryUrl + " to recover", coreNeedingRecovery, replicaCoreNodeName);
}
try (HttpSolrClient client = new HttpSolrClient.Builder(recoveryUrl)
.withConnectionTimeout(15000)
.withSocketTimeout(60000)
.build()) {
try {
client.request(recoverRequestCmd);
log.info("Successfully sent " + CoreAdminAction.REQUESTRECOVERY +
" command to core={} coreNodeName={} on " + recoveryUrl, coreNeedingRecovery, replicaCoreNodeName);
continueTrying = false; // succeeded, so stop looping
} catch (Exception t) {
Throwable rootCause = SolrException.getRootCause(t);
boolean wasCommError =
(rootCause instanceof ConnectException ||
rootCause instanceof ConnectTimeoutException ||
rootCause instanceof NoHttpResponseException ||
rootCause instanceof SocketException ||
rootCause instanceof UnknownHostException);
if (!wasCommError) {
continueTrying = false;
}
if (rootCause.getMessage().contains("Unable to locate core")) {
log.info("Replica {} is removed, hence remove its lir state", replicaCoreNodeName);
removeLIRState(replicaCoreNodeName);
break;
} else {
SolrException.log(log, recoveryUrl + ": Could not tell a replica to recover, wasCommError:"+wasCommError, t);
}
}
}
// wait a few seconds
if (continueTrying) {
try {
Thread.sleep(waitBetweenTriesMs);
} catch (InterruptedException ignoreMe) {
Thread.currentThread().interrupt();
}
if (coreContainer.isShutDown()) {
log.warn("Stop trying to send recovery command to downed replica core={} coreNodeName={} on "
+ replicaNodeName + " because my core container is closed.", coreNeedingRecovery, replicaCoreNodeName);
continueTrying = false;
break;
}
// see if the replica's node is still live, if not, no need to keep doing this loop
ZkStateReader zkStateReader = zkController.getZkStateReader();
if (!zkStateReader.getClusterState().liveNodesContain(replicaNodeName)) {
log.warn("Node "+replicaNodeName+" hosting core "+coreNeedingRecovery+
" is no longer live. No need to keep trying to tell it to recover!");
continueTrying = false;
break;
}
String leaderCoreNodeName = leaderCd.getCloudDescriptor().getCoreNodeName();
// stop trying if I'm no longer the leader
if (leaderCoreNodeName != null && collection != null) {
String leaderCoreNodeNameFromZk = null;
try {
leaderCoreNodeNameFromZk = zkController.getZkStateReader().getLeaderRetry(collection, shardId, 1000).getName();
} catch (Exception exc) {
log.error("Failed to determine if " + leaderCoreNodeName + " is still the leader for " + collection +
" " + shardId + " before starting leader-initiated recovery thread for " + replicaUrl + " due to: " + exc);
}
if (!leaderCoreNodeName.equals(leaderCoreNodeNameFromZk)) {
log.warn("Stop trying to send recovery command to downed replica core=" + coreNeedingRecovery +
",coreNodeName=" + replicaCoreNodeName + " on " + replicaNodeName + " because " +
leaderCoreNodeName + " is no longer the leader! New leader is " + leaderCoreNodeNameFromZk);
continueTrying = false;
break;
}
if (!leaderCd.getCloudDescriptor().isLeader()) {
log.warn("Stop trying to send recovery command to downed replica core=" + coreNeedingRecovery +
",coreNodeName=" + replicaCoreNodeName + " on " + replicaNodeName + " because " +
leaderCoreNodeName + " is no longer the leader!");
continueTrying = false;
break;
}
}
// additional safeguard against the replica trying to be in the active state
// before acknowledging the leader initiated recovery command
if (collection != null && shardId != null) {
try {
// call out to ZooKeeper to get the leader-initiated recovery state
final Replica.State lirState = zkController.getLeaderInitiatedRecoveryState(collection, shardId, replicaCoreNodeName);
if (lirState == null) {
log.warn("Stop trying to send recovery command to downed replica core="+coreNeedingRecovery+
",coreNodeName=" + replicaCoreNodeName + " on "+replicaNodeName+" because the znode no longer exists.");
continueTrying = false;
break;
}
if (lirState == Replica.State.RECOVERING) {
// replica has ack'd leader initiated recovery and entered the recovering state
// so we don't need to keep looping to send the command
continueTrying = false;
log.info("Replica "+coreNeedingRecovery+
" on node "+replicaNodeName+" ack'd the leader initiated recovery state, "
+ "no need to keep trying to send recovery command");
} else {
String lcnn = zkStateReader.getLeaderRetry(collection, shardId, 5000).getName();
List<ZkCoreNodeProps> replicaProps =
zkStateReader.getReplicaProps(collection, shardId, lcnn);
if (replicaProps != null && replicaProps.size() > 0) {
for (ZkCoreNodeProps prop : replicaProps) {
final Replica replica = (Replica) prop.getNodeProps();
if (replicaCoreNodeName.equals(replica.getName())) {
if (replica.getState() == Replica.State.ACTIVE) {
// replica published its state as "active",
// which is bad if lirState is still "down"
if (lirState == Replica.State.DOWN) {
// OK, so the replica thinks it is active, but it never ack'd the leader initiated recovery
// so its state cannot be trusted and it needs to be told to recover again ... and we keep looping here
log.warn("Replica core={} coreNodeName={} set to active but the leader thinks it should be in recovery;"
+ " forcing it back to down state to re-run the leader-initiated recovery process; props: " + replicaProps.get(0), coreNeedingRecovery, replicaCoreNodeName);
publishDownState(replicaCoreName, replicaCoreNodeName, replicaNodeName, replicaUrl, true);
}
}
break;
}
}
}
}
} catch (Exception ignoreMe) {
log.warn("Failed to determine state of core={} coreNodeName={} due to: "+ignoreMe, coreNeedingRecovery, replicaCoreNodeName);
// eventually this loop will exhaust max tries and stop so we can just log this for now
}
}
}
}
// replica is no longer in recovery on this node (may be handled on another node)
zkController.removeReplicaFromLeaderInitiatedRecoveryHandling(replicaUrl);
if (continueTrying) {
// ugh! this means the loop timed out before the recovery command could be delivered
// how exotic do we want to get here?
log.error("Timed out after waiting for "+(tries * (waitBetweenTriesMs/1000))+
" secs to send the recovery request to: "+replicaUrl+"; not much more we can do here?");
// TODO: need to raise a JMX event to allow monitoring tools to take over from here
}
}
}

View File

@@ -1115,9 +1115,7 @@ public class ZkController {
ZkShardTerms shardTerms = getShardTerms(collection, cloudDesc.getShardId());
// This flag is used for testing rolling updates and should be removed in SOLR-11812
boolean isRunningInNewLIR = "new".equals(desc.getCoreProperty("lirVersion", "new"));
if (isRunningInNewLIR && replica.getType() != Type.PULL) {
if (replica.getType() != Type.PULL) {
shardTerms.registerTerm(coreZkNodeName);
}
@@ -1196,7 +1194,7 @@ public class ZkController {
publish(desc, Replica.State.ACTIVE);
}
if (isRunningInNewLIR && replica.getType() != Type.PULL) {
if (replica.getType() != Type.PULL) {
// the watcher is added to a set, so multiple calls of this method will leave only one watcher
shardTerms.addListener(new RecoveringCoreTermWatcher(core.getCoreDescriptor(), getCoreContainer()));
}
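With the per-replica LIR znodes gone, recovery signalling during core registration rests entirely on ZkShardTerms. A minimal sketch of the flow these two hunks keep, assembled only from calls visible in the patch (surrounding code elided, names as they appear in ZkController):

    ZkShardTerms shardTerms = getShardTerms(collection, cloudDesc.getShardId());
    if (replica.getType() != Type.PULL) {
      // every NRT/TLOG replica registers a per-shard term in ZooKeeper ...
      shardTerms.registerTerm(coreZkNodeName);
      // ... and watches it, so that a raised leader term triggers local recovery
      shardTerms.addListener(new RecoveringCoreTermWatcher(core.getCoreDescriptor(), getCoreContainer()));
    }

On startup, checkRecovery (next hunk) relies on the same data: a registered core whose term can no longer lead runs doRecovery instead of consulting a LIR znode.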
@@ -1406,15 +1404,6 @@ public class ZkController {
return true;
}
// see if the leader told us to recover
final Replica.State lirState = getLeaderInitiatedRecoveryState(collection, shardId,
core.getCoreDescriptor().getCloudDescriptor().getCoreNodeName());
if (lirState == Replica.State.DOWN) {
log.info("Leader marked core " + core.getName() + " down; starting recovery process");
core.getUpdateHandler().getSolrCoreState().doRecovery(cc, core.getCoreDescriptor());
return true;
}
ZkShardTerms zkShardTerms = getShardTerms(collection, shardId);
if (zkShardTerms.registered(coreZkNodeName) && !zkShardTerms.canBecomeLeader(coreZkNodeName)) {
log.info("Leader's term larger than core " + core.getName() + "; starting recovery process");
@@ -1468,29 +1457,6 @@ public class ZkController {
String coreNodeName = cd.getCloudDescriptor().getCoreNodeName();
// If the leader initiated recovery, then verify that this replica has performed
// recovery as requested before becoming active; don't even look at lirState if going down
if (state != Replica.State.DOWN) {
final Replica.State lirState = getLeaderInitiatedRecoveryState(collection, shardId, coreNodeName);
if (lirState != null) {
assert cd.getCloudDescriptor().getReplicaType() != Replica.Type.PULL: "LIR should not happen for pull replicas!";
if (state == Replica.State.ACTIVE) {
// trying to become active, so leader-initiated state must be recovering
if (lirState == Replica.State.RECOVERING) {
updateLeaderInitiatedRecoveryState(collection, shardId, coreNodeName, Replica.State.ACTIVE, cd, true);
} else if (lirState == Replica.State.DOWN) {
throw new SolrException(ErrorCode.INVALID_STATE,
"Cannot publish state of core '" + cd.getName() + "' as active without recovering first!");
}
} else if (state == Replica.State.RECOVERING) {
// if it is currently DOWN, then trying to enter into recovering state is good
if (lirState == Replica.State.DOWN) {
updateLeaderInitiatedRecoveryState(collection, shardId, coreNodeName, Replica.State.RECOVERING, cd, true);
}
}
}
}
Map<String,Object> props = new HashMap<>();
props.put(Overseer.QUEUE_OPERATION, "state");
props.put(ZkStateReader.STATE_PROP, state.toString());
@@ -1529,15 +1495,13 @@ public class ZkController {
log.info("The core '{}' had failed to initialize before.", cd.getName());
}
// This flag is used for testing rolling updates and should be removed in SOLR-11812
boolean isRunningInNewLIR = "new".equals(cd.getCoreProperty("lirVersion", "new"));
// pull replicas are excluded because their terms are not considered
if (state == Replica.State.RECOVERING && isRunningInNewLIR && cd.getCloudDescriptor().getReplicaType() != Type.PULL) {
if (state == Replica.State.RECOVERING && cd.getCloudDescriptor().getReplicaType() != Type.PULL) {
// state is used by the client; a replica's state can change from RECOVERING to DOWN without needing to finish recovery,
// so by calling this we will know whether a replica actually finished recovery or not
getShardTerms(collection, shardId).startRecovering(coreNodeName);
}
if (state == Replica.State.ACTIVE && isRunningInNewLIR && cd.getCloudDescriptor().getReplicaType() != Type.PULL) {
if (state == Replica.State.ACTIVE && cd.getCloudDescriptor().getReplicaType() != Type.PULL) {
getShardTerms(collection, shardId).doneRecovering(coreNodeName);
}
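Publishing a replica state now doubles as the recovery handshake that LIR previously tracked in its own znodes. A rough sketch of one recovery round under the term-based scheme, pieced together from calls visible in this commit (the ordering and the name replicasThatMissedIt are illustrative, not code from the patch):

    // 1. leader, after a failed forwarded update:
    getShardTerms(collection, shardId).ensureTermsIsHigher(leaderCoreNodeName, replicasThatMissedIt);
    // 2. on each lagging replica, RecoveringCoreTermWatcher notices the lower term and starts recovery
    // 3. the replica publishes RECOVERING:
    getShardTerms(collection, shardId).startRecovering(coreNodeName);
    // 4. once caught up, the replica publishes ACTIVE:
    getShardTerms(collection, shardId).doneRecovering(coreNodeName);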
@@ -1857,24 +1821,12 @@ public class ZkController {
boolean isLeader = leaderProps.getCoreUrl().equals(ourUrl);
if (!isLeader && !SKIP_AUTO_RECOVERY) {
// detect if this core is in leader-initiated recovery and if so,
// then we don't need the leader to wait on seeing the down state
Replica.State lirState = null;
try {
lirState = getLeaderInitiatedRecoveryState(collection, shard, myCoreNodeName);
} catch (Exception exc) {
log.error("Failed to determine if replica " + myCoreNodeName +
" is in leader-initiated recovery due to: " + exc, exc);
}
if (lirState != null || !getShardTerms(collection, shard).canBecomeLeader(myCoreNodeName)) {
if (!getShardTerms(collection, shard).canBecomeLeader(myCoreNodeName)) {
log.debug("Term of replica " + myCoreNodeName +
" is already less than leader, so not waiting for leader to see down state.");
} else {
log.info("Replica " + myCoreNodeName +
" NOT in leader-initiated recovery, need to wait for leader to see down state.");
log.info("Replica need to wait for leader to see down state.");
try (HttpSolrClient client = new Builder(leaderBaseUrl)
.withConnectionTimeout(15000)
@@ -2211,291 +2163,6 @@ public class ZkController {
return cc;
}
/**
* When a leader receives a communication error when trying to send a request to a replica,
* it calls this method to ensure the replica enters recovery when connectivity is restored.
* <p>
* returns true if the node hosting the replica is still considered "live" by ZooKeeper;
* false means the node is not live either, so no point in trying to send recovery commands
* to it.
*/
@Deprecated
public boolean ensureReplicaInLeaderInitiatedRecovery(
final CoreContainer container,
final String collection, final String shardId, final ZkCoreNodeProps replicaCoreProps,
CoreDescriptor leaderCd, boolean forcePublishState)
throws KeeperException, InterruptedException {
final String replicaUrl = replicaCoreProps.getCoreUrl();
if (collection == null)
throw new IllegalArgumentException("collection parameter cannot be null for starting leader-initiated recovery for replica: " + replicaUrl);
if (shardId == null)
throw new IllegalArgumentException("shard parameter cannot be null for starting leader-initiated recovery for replica: " + replicaUrl);
if (replicaUrl == null)
throw new IllegalArgumentException("replicaUrl parameter cannot be null for starting leader-initiated recovery");
// First, determine if this replica is already in recovery handling
// which is needed because there can be many concurrent errors flooding in
// about the same replica having trouble and we only need to send the "needs"
// recovery signal once
boolean nodeIsLive = true;
String replicaNodeName = replicaCoreProps.getNodeName();
String replicaCoreNodeName = ((Replica) replicaCoreProps.getNodeProps()).getName();
assert replicaCoreNodeName != null : "No core name for replica " + replicaNodeName;
synchronized (replicasInLeaderInitiatedRecovery) {
if (replicasInLeaderInitiatedRecovery.containsKey(replicaUrl)) {
if (!forcePublishState) {
log.debug("Replica {} already in leader-initiated recovery handling.", replicaUrl);
return false; // already in this recovery process
}
}
// we only really need to try to start the LIR process if the node itself is "live"
if (getZkStateReader().getClusterState().liveNodesContain(replicaNodeName)
&& CloudUtil.replicaExists(getZkStateReader().getClusterState(), collection, shardId, replicaCoreNodeName)) {
LeaderInitiatedRecoveryThread lirThread =
new LeaderInitiatedRecoveryThread(this,
container,
collection,
shardId,
replicaCoreProps,
120,
leaderCd);
ExecutorService executor = container.getUpdateShardHandler().getUpdateExecutor();
try {
MDC.put("DistributedUpdateProcessor.replicaUrlToRecover", replicaCoreProps.getCoreUrl());
executor.execute(lirThread);
} finally {
MDC.remove("DistributedUpdateProcessor.replicaUrlToRecover");
}
// create a znode that the replica needs to "ack" to verify it knows it was out-of-sync
replicasInLeaderInitiatedRecovery.put(replicaUrl,
getLeaderInitiatedRecoveryZnodePath(collection, shardId, replicaCoreNodeName));
log.info("Put replica core={} coreNodeName={} on " +
replicaNodeName + " into leader-initiated recovery.", replicaCoreProps.getCoreName(), replicaCoreNodeName);
} else {
nodeIsLive = false; // we really don't need to send the recovery request if the node is NOT live
log.info("Node {} is not live or replica {} is deleted, so skipping leader-initiated recovery for replica: core={}",
replicaNodeName, replicaCoreNodeName, replicaCoreProps.getCoreName());
// publishDownState will be false to avoid publishing the "down" state too many times
// as many errors can occur together and will each call into this method (SOLR-6189)
}
}
return nodeIsLive;
}
@Deprecated
public boolean isReplicaInRecoveryHandling(String replicaUrl) {
boolean exists = false;
synchronized (replicasInLeaderInitiatedRecovery) {
exists = replicasInLeaderInitiatedRecovery.containsKey(replicaUrl);
}
return exists;
}
@Deprecated
public void removeReplicaFromLeaderInitiatedRecoveryHandling(String replicaUrl) {
synchronized (replicasInLeaderInitiatedRecovery) {
replicasInLeaderInitiatedRecovery.remove(replicaUrl);
}
}
@Deprecated
public Replica.State getLeaderInitiatedRecoveryState(String collection, String shardId, String coreNodeName) {
final Map<String, Object> stateObj = getLeaderInitiatedRecoveryStateObject(collection, shardId, coreNodeName);
if (stateObj == null) {
return null;
}
final String stateStr = (String) stateObj.get(ZkStateReader.STATE_PROP);
return stateStr == null ? null : Replica.State.getState(stateStr);
}
@Deprecated
public Map<String, Object> getLeaderInitiatedRecoveryStateObject(String collection, String shardId, String coreNodeName) {
if (collection == null || shardId == null || coreNodeName == null)
return null; // if we don't have complete data about a core in cloud mode, return null
String znodePath = getLeaderInitiatedRecoveryZnodePath(collection, shardId, coreNodeName);
byte[] stateData = null;
try {
stateData = zkClient.getData(znodePath, null, new Stat(), false);
} catch (NoNodeException ignoreMe) {
// safe to ignore as this znode will only exist if the leader initiated recovery
} catch (ConnectionLossException | SessionExpiredException cle) {
// sort of safe to ignore ??? Usually these are seen when the core is going down
// or there are bigger issues to deal with than reading this znode
log.warn("Unable to read " + znodePath + " due to: " + cle);
} catch (Exception exc) {
log.error("Failed to read data from znode " + znodePath + " due to: " + exc);
if (exc instanceof SolrException) {
throw (SolrException) exc;
} else {
throw new SolrException(ErrorCode.SERVER_ERROR,
"Failed to read data from znodePath: " + znodePath, exc);
}
}
Map<String, Object> stateObj = null;
if (stateData != null && stateData.length > 0) {
// TODO: Remove later ... this is for upgrading from 4.8.x to 4.10.3 (see: SOLR-6732)
if (stateData[0] == (byte) '{') {
Object parsedJson = Utils.fromJSON(stateData);
if (parsedJson instanceof Map) {
stateObj = (Map<String, Object>) parsedJson;
} else {
throw new SolrException(ErrorCode.SERVER_ERROR, "Leader-initiated recovery state data is invalid! " + parsedJson);
}
} else {
// old format still in ZK
stateObj = Utils.makeMap("state", new String(stateData, StandardCharsets.UTF_8));
}
}
return stateObj;
}
@Deprecated
public void updateLeaderInitiatedRecoveryState(String collection, String shardId, String coreNodeName,
Replica.State state, CoreDescriptor leaderCd, boolean retryOnConnLoss) {
if (collection == null || shardId == null || coreNodeName == null) {
log.warn("Cannot set leader-initiated recovery state znode to "
+ state.toString() + " using: collection=" + collection
+ "; shardId=" + shardId + "; coreNodeName=" + coreNodeName);
return; // if we don't have complete data about a core in cloud mode, do nothing
}
assert leaderCd != null;
assert leaderCd.getCloudDescriptor() != null;
String leaderCoreNodeName = leaderCd.getCloudDescriptor().getCoreNodeName();
String znodePath = getLeaderInitiatedRecoveryZnodePath(collection, shardId, coreNodeName);
if (state == Replica.State.ACTIVE) {
// since we're marking it active, we don't need this znode anymore, so delete instead of update
try {
zkClient.delete(znodePath, -1, retryOnConnLoss);
} catch (Exception justLogIt) {
log.warn("Failed to delete znode " + znodePath, justLogIt);
}
return;
}
Map<String, Object> stateObj = null;
try {
stateObj = getLeaderInitiatedRecoveryStateObject(collection, shardId, coreNodeName);
} catch (Exception exc) {
log.warn(exc.getMessage(), exc);
}
if (stateObj == null) {
stateObj = Utils.makeMap();
}
stateObj.put(ZkStateReader.STATE_PROP, state.toString());
// only update the createdBy value if it's not set
if (stateObj.get("createdByNodeName") == null) {
stateObj.put("createdByNodeName", this.nodeName);
}
if (stateObj.get("createdByCoreNodeName") == null && leaderCoreNodeName != null) {
stateObj.put("createdByCoreNodeName", leaderCoreNodeName);
}
byte[] znodeData = Utils.toJSON(stateObj);
try {
if (state == Replica.State.DOWN) {
markShardAsDownIfLeader(collection, shardId, leaderCd, znodePath, znodeData, retryOnConnLoss);
} else {
// must retry on conn loss otherwise future election attempts may assume wrong LIR state
if (zkClient.exists(znodePath, true)) {
zkClient.setData(znodePath, znodeData, retryOnConnLoss);
} else {
zkClient.makePath(znodePath, znodeData, retryOnConnLoss);
}
}
log.debug("Wrote {} to {}", state.toString(), znodePath);
} catch (Exception exc) {
if (exc instanceof SolrException) {
throw (SolrException) exc;
} else {
throw new SolrException(ErrorCode.SERVER_ERROR,
"Failed to update data to " + state.toString() + " for znode: " + znodePath, exc);
}
}
}
/**
* we use ZK's multi-transactional semantics to ensure that we are able to
* publish a replica as 'down' only if our leader election node still exists
* in ZK. This ensures that a long running network partition caused by GC etc
* doesn't let us mark a node as down *after* we've already lost our session
*/
private void markShardAsDownIfLeader(String collection, String shardId, CoreDescriptor leaderCd,
String znodePath, byte[] znodeData,
boolean retryOnConnLoss) throws KeeperException, InterruptedException {
if (!leaderCd.getCloudDescriptor().isLeader()) {
log.info("No longer leader, aborting attempt to mark shard down as part of LIR");
throw new NotLeaderException(ErrorCode.SERVER_ERROR, "Locally, we do not think we are the leader.");
}
ContextKey key = new ContextKey(collection, leaderCd.getCloudDescriptor().getCoreNodeName());
ElectionContext context = electionContexts.get(key);
// we make sure we locally think we are the leader before and after getting the context - then
// we only try zk if we still think we are the leader and have our leader context
if (context == null || !leaderCd.getCloudDescriptor().isLeader()) {
log.info("No longer leader, aborting attempt to mark shard down as part of LIR");
throw new NotLeaderException(ErrorCode.SERVER_ERROR, "Locally, we do not think we are the leader.");
}
// we think we are the leader - get the expected shard leader version
// we use this version and multi to ensure *only* the current zk registered leader
// for a shard can put a replica into LIR
Integer leaderZkNodeParentVersion = ((ShardLeaderElectionContextBase)context).getLeaderZkNodeParentVersion();
// TODO: should we do this optimistically to avoid races?
if (zkClient.exists(znodePath, retryOnConnLoss)) {
List<Op> ops = new ArrayList<>(2);
ops.add(Op.check(new org.apache.hadoop.fs.Path(((ShardLeaderElectionContextBase)context).leaderPath).getParent().toString(), leaderZkNodeParentVersion));
ops.add(Op.setData(znodePath, znodeData, -1));
zkClient.multi(ops, retryOnConnLoss);
} else {
String parentZNodePath = getLeaderInitiatedRecoveryZnodePath(collection, shardId);
try {
// make sure we don't create /collections/{collection} if they do not exist with 2 param
zkClient.makePath(parentZNodePath, (byte[]) null, CreateMode.PERSISTENT, (Watcher) null, true, retryOnConnLoss, 2);
} catch (KeeperException.NodeExistsException nee) {
// if it exists, that's great!
}
// we only create the entry if the context we are using is registered as the current leader in ZK
List<Op> ops = new ArrayList<>(2);
ops.add(Op.check(new org.apache.hadoop.fs.Path(((ShardLeaderElectionContextBase)context).leaderPath).getParent().toString(), leaderZkNodeParentVersion));
ops.add(Op.create(znodePath, znodeData, zkClient.getZkACLProvider().getACLsToAdd(znodePath),
CreateMode.PERSISTENT));
zkClient.multi(ops, retryOnConnLoss);
}
}
@Deprecated
public static String getLeaderInitiatedRecoveryZnodePath(String collection, String shardId) {
return "/collections/" + collection + "/leader_initiated_recovery/" + shardId;
}
@Deprecated
public static String getLeaderInitiatedRecoveryZnodePath(String collection, String shardId, String coreNodeName) {
return getLeaderInitiatedRecoveryZnodePath(collection, shardId) + "/" + coreNodeName;
}
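For reference, the two deprecated helpers above produced znode paths of the form /collections/{collection}/leader_initiated_recovery/{shardId}/{coreNodeName}. After this commit nothing writes under that path any more; the FORCELEADER handler's cleanup of it is likewise removed from CollectionsHandler further down.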
public void throwErrorIfReplicaReplaced(CoreDescriptor desc) {
ClusterState clusterState = getZkStateReader().getClusterState();
if (clusterState != null) {
@@ -2830,15 +2497,6 @@
}
}
/**
* Thrown during leader initiated recovery process if current node is not leader
*/
public static class NotLeaderException extends SolrException {
public NotLeaderException(ErrorCode code, String msg) {
super(code, msg);
}
}
/**
* Thrown during pre register process if the replica is not present in clusterstate
*/

View File

@@ -1193,15 +1193,6 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
"The shard already has an active leader. Force leader is not applicable. State: " + slice);
}
// Clear out any LIR state
String lirPath = handler.coreContainer.getZkController().getLeaderInitiatedRecoveryZnodePath(collectionName, sliceId);
if (handler.coreContainer.getZkController().getZkClient().exists(lirPath, true)) {
StringBuilder sb = new StringBuilder();
handler.coreContainer.getZkController().getZkClient().printLayout(lirPath, 4, sb);
log.info("Cleaning out LIR data, which was: {}", sb);
handler.coreContainer.getZkController().getZkClient().clean(lirPath);
}
final Set<String> liveNodes = clusterState.getLiveNodes();
List<Replica> liveReplicas = slice.getReplicas().stream()
.filter(rep -> liveNodes.contains(rep.getNodeName())).collect(Collectors.toList());

View File

@@ -207,10 +207,6 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
private final boolean cloneRequiredOnLeader;
private final Replica.Type replicaType;
@Deprecated
// this flag, used for testing rolling updates, should be removed by SOLR-11812
private final boolean isOldLIRMode;
public DistributedUpdateProcessor(SolrQueryRequest req, SolrQueryResponse rsp, UpdateRequestProcessor next) {
this(req, rsp, new AtomicUpdateDocumentMerger(req), next);
}
@@ -229,7 +225,6 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
this.ulog = req.getCore().getUpdateHandler().getUpdateLog();
this.vinfo = ulog == null ? null : ulog.getVersionInfo();
this.isOldLIRMode = !"new".equals(req.getCore().getCoreDescriptor().getCoreProperty("lirVersion", "new"));
versionsStored = this.vinfo != null && this.vinfo.getVersionField() != null;
returnVersions = req.getParams().getBool(UpdateParams.VERSIONS ,false);
@@ -381,7 +376,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
String coreNodeName = replica.getName();
if (skipList != null && skipListSet.contains(replica.getCoreUrl())) {
log.info("check url:" + replica.getCoreUrl() + " against:" + skipListSet + " result:true");
} else if(!isOldLIRMode && zkShardTerms.registered(coreNodeName) && zkShardTerms.skipSendingUpdatesTo(coreNodeName)) {
} else if(zkShardTerms.registered(coreNodeName) && zkShardTerms.skipSendingUpdatesTo(coreNodeName)) {
log.debug("skip url:{} cause its term is less than leader", replica.getCoreUrl());
skippedCoreNodeNames.add(replica.getName());
} else if (!clusterState.getLiveNodes().contains(replica.getNodeName()) || replica.getState() == Replica.State.DOWN) {
@@ -769,7 +764,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
// TODO: optionally fail if n replicas are not reached...
private void doFinish() {
boolean shouldUpdateTerms = isLeader && !isOldLIRMode && isIndexChanged;
boolean shouldUpdateTerms = isLeader && isIndexChanged;
if (shouldUpdateTerms) {
ZkShardTerms zkShardTerms = zkController.getShardTerms(cloudDesc.getCollectionName(), cloudDesc.getShardId());
if (skippedCoreNodeNames != null) {
@@ -875,21 +870,8 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
// if false, then the node is probably not "live" anymore
// and we do not need to send a recovery message
Throwable rootCause = SolrException.getRootCause(error.e);
if (!isOldLIRMode && zkController.getShardTerms(collection, shardId).registered(coreNodeName)) {
log.error("Setting up to try to start recovery on replica {} with url {} by increasing leader term", coreNodeName, replicaUrl, rootCause);
replicasShouldBeInLowerTerms.add(coreNodeName);
} else {
// The replica did not register its term, so it must be running with the old LIR implementation
log.error("Setting up to try to start recovery on replica {}", replicaUrl, rootCause);
zkController.ensureReplicaInLeaderInitiatedRecovery(
req.getCore().getCoreContainer(),
collection,
shardId,
stdNode.getNodeProps(),
req.getCore().getCoreDescriptor(),
false /* forcePublishState */
);
}
log.error("Setting up to try to start recovery on replica {} with url {} by increasing leader term", coreNodeName, replicaUrl, rootCause);
replicasShouldBeInLowerTerms.add(coreNodeName);
} catch (Exception exc) {
Throwable setLirZnodeFailedCause = SolrException.getRootCause(exc);
log.error("Leader failed to set replica " +
@@ -913,7 +895,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
}
}
}
if (!isOldLIRMode && !replicasShouldBeInLowerTerms.isEmpty()) {
if (!replicasShouldBeInLowerTerms.isEmpty()) {
zkController.getShardTerms(cloudDesc.getCollectionName(), cloudDesc.getShardId())
.ensureTermsIsHigher(cloudDesc.getCoreNodeName(), replicasShouldBeInLowerTerms);
}
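In DistributedUpdateProcessor the whole replacement for spawning a LeaderInitiatedRecoveryThread is now term arithmetic: during fan-out the leader skips replicas whose term says they are already behind (zkShardTerms.skipSendingUpdatesTo above), and in doFinish() it calls ensureTermsIsHigher to bump its own term above any replica that failed to apply an update. A minimal sketch of that leader-side filter, with the loop header shortened (variable names as in the hunks above; the iterable replicas is illustrative):

    ZkShardTerms zkShardTerms = zkController.getShardTerms(cloudDesc.getCollectionName(), cloudDesc.getShardId());
    for (Replica replica : replicas) {
      String coreNodeName = replica.getName();
      if (zkShardTerms.registered(coreNodeName) && zkShardTerms.skipSendingUpdatesTo(coreNodeName)) {
        // this replica's term is behind the leader's; it will catch up during its own recovery
        skippedCoreNodeNames.add(coreNodeName);
      }
    }

The lagging replicas then recover on their own via the RecoveringCoreTermWatcher registered in ZkController, with no per-replica DOWN publishing and no REQUESTRECOVERY nagging from the leader.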

View File

@@ -392,19 +392,6 @@ public class DeleteReplicaTest extends SolrCloudTestCase {
log.info("Timeout wait for state {}", getCollectionState(collectionName));
throw e;
}
TimeOut timeOut = new TimeOut(20, TimeUnit.SECONDS, TimeSource.NANO_TIME);
timeOut.waitFor("Time out waiting for LIR state get removed", () -> {
String lirPath = ZkController.getLeaderInitiatedRecoveryZnodePath(collectionName, "shard1");
try {
List<String> children = zkClient().getChildren(lirPath, null, true);
return children.size() == 0;
} catch (KeeperException.NoNodeException e) {
return true;
} catch (Exception e) {
throw new AssertionError(e);
}
});
}
}

View File

@@ -18,10 +18,8 @@ package org.apache.solr.cloud;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrServerException;
@@ -33,8 +31,6 @@ import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Replica.State;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.NoNodeException;
import org.junit.Ignore;
import org.junit.Test;
import org.slf4j.Logger;
@@ -204,117 +200,6 @@ public class ForceLeaderTest extends HttpPartitionTest {
}
}
/***
* Tests that FORCELEADER can get an active leader after leader puts all replicas in LIR and itself goes down,
* hence resulting in a leaderless shard.
*/
@Test
@Slow
//TODO remove in SOLR-11812
// 12-Jun-2018 @BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028")
public void testReplicasInLIRNoLeader() throws Exception {
handle.put("maxScore", SKIPVAL);
handle.put("timestamp", SKIPVAL);
String testCollectionName = "forceleader_test_collection";
createOldLirCollection(testCollectionName, 3);
cloudClient.setDefaultCollection(testCollectionName);
try {
List<Replica> notLeaders = ensureAllReplicasAreActive(testCollectionName, SHARD1, 1, 3, maxWaitSecsToSeeAllActive);
assertEquals("Expected 2 replicas for collection " + testCollectionName
+ " but found " + notLeaders.size() + "; clusterState: "
+ printClusterStateInfo(testCollectionName), 2, notLeaders.size());
Replica leader = cloudClient.getZkStateReader().getLeaderRetry(testCollectionName, SHARD1);
JettySolrRunner notLeader0 = getJettyOnPort(getReplicaPort(notLeaders.get(0)));
ZkController zkController = notLeader0.getCoreContainer().getZkController();
putNonLeadersIntoLIR(testCollectionName, SHARD1, zkController, leader, notLeaders);
cloudClient.getZkStateReader().forceUpdateCollection(testCollectionName);
ClusterState clusterState = cloudClient.getZkStateReader().getClusterState();
int numActiveReplicas = getNumberOfActiveReplicas(clusterState, testCollectionName, SHARD1);
assertEquals("Expected only 0 active replica but found " + numActiveReplicas +
"; clusterState: " + printClusterStateInfo(), 0, numActiveReplicas);
int numReplicasOnLiveNodes = 0;
for (Replica rep : clusterState.getCollection(testCollectionName).getSlice(SHARD1).getReplicas()) {
if (clusterState.getLiveNodes().contains(rep.getNodeName())) {
numReplicasOnLiveNodes++;
}
}
assertEquals(2, numReplicasOnLiveNodes);
log.info("Before forcing leader: " + printClusterStateInfo());
// Assert there is no leader yet
assertNull("Expected no leader right now. State: " + clusterState.getCollection(testCollectionName).getSlice(SHARD1),
clusterState.getCollection(testCollectionName).getSlice(SHARD1).getLeader());
assertSendDocFails(3);
doForceLeader(cloudClient, testCollectionName, SHARD1);
// By now we have an active leader. Wait for recoveries to begin
waitForRecoveriesToFinish(testCollectionName, cloudClient.getZkStateReader(), true);
cloudClient.getZkStateReader().forceUpdateCollection(testCollectionName);
clusterState = cloudClient.getZkStateReader().getClusterState();
log.info("After forcing leader: " + clusterState.getCollection(testCollectionName).getSlice(SHARD1));
// we have a leader
Replica newLeader = clusterState.getCollectionOrNull(testCollectionName).getSlice(SHARD1).getLeader();
assertNotNull(newLeader);
// leader is active
assertEquals(State.ACTIVE, newLeader.getState());
numActiveReplicas = getNumberOfActiveReplicas(clusterState, testCollectionName, SHARD1);
assertEquals(2, numActiveReplicas);
// Assert that indexing works again
log.info("Sending doc 4...");
sendDoc(4);
log.info("Committing...");
cloudClient.commit();
log.info("Doc 4 sent and commit issued");
assertDocsExistInAllReplicas(notLeaders, testCollectionName, 1, 1);
assertDocsExistInAllReplicas(notLeaders, testCollectionName, 4, 4);
// Docs 1 and 4 should be here. 2 was lost during the partition, 3 had failed to be indexed.
log.info("Checking doc counts...");
ModifiableSolrParams params = new ModifiableSolrParams();
params.add("q", "*:*");
assertEquals("Expected only 2 documents in the index", 2, cloudClient.query(params).getResults().getNumFound());
bringBackOldLeaderAndSendDoc(testCollectionName, leader, notLeaders, 5);
} finally {
log.info("Cleaning up after the test.");
// try to clean up
attemptCollectionDelete(cloudClient, testCollectionName);
}
}
private void createOldLirCollection(String collection, int numReplicas) throws IOException, SolrServerException {
if (onlyLeaderIndexes) {
CollectionAdminRequest
.createCollection(collection, "conf1", 1, 0, numReplicas, 0)
.setCreateNodeSet("")
.process(cloudClient);
} else {
CollectionAdminRequest.createCollection(collection, "conf1", 1, numReplicas)
.setCreateNodeSet("")
.process(cloudClient);
}
Properties oldLir = new Properties();
oldLir.setProperty("lirVersion", "old");
for (int i = 0; i < numReplicas; i++) {
// this is the only way to create replicas which run with the old LIR implementation
CollectionAdminRequest
.addReplicaToShard(collection, "shard1", onlyLeaderIndexes? Replica.Type.TLOG: Replica.Type.NRT)
.setProperties(oldLir)
.process(cloudClient);
}
}
private void assertSendDocFails(int docId) throws Exception {
// sending a doc in this state fails
expectThrows(SolrException.class,
@@ -322,68 +207,6 @@ public class ForceLeaderTest extends HttpPartitionTest {
() -> sendDoc(docId));
}
private void putNonLeadersIntoLIR(String collectionName, String shard, ZkController zkController, Replica leader, List<Replica> notLeaders) throws Exception {
SocketProxy[] nonLeaderProxies = new SocketProxy[notLeaders.size()];
for (int i = 0; i < notLeaders.size(); i++)
nonLeaderProxies[i] = getProxyForReplica(notLeaders.get(i));
sendDoc(1);
// ok, now introduce a network partition between the leader and both replicas
log.info("Closing proxies for the non-leader replicas...");
for (SocketProxy proxy : nonLeaderProxies)
proxy.close();
// indexing during a partition
log.info("Sending a doc during the network partition...");
sendDoc(2);
// Wait a little
Thread.sleep(2000);
// Kill the leader
log.info("Killing leader for shard1 of " + collectionName + " on node " + leader.getNodeName() + "");
JettySolrRunner leaderJetty = getJettyOnPort(getReplicaPort(leader));
getProxyForReplica(leader).close();
leaderJetty.stop();
// Wait for a steady state, till LIR flags have been set and the shard is leaderless
log.info("Sleep and periodically wake up to check for state...");
for (int i = 0; i < 20; i++) {
Thread.sleep(1000);
State lirStates[] = new State[notLeaders.size()];
for (int j = 0; j < notLeaders.size(); j++)
lirStates[j] = zkController.getLeaderInitiatedRecoveryState(collectionName, shard, notLeaders.get(j).getName());
ClusterState clusterState = zkController.getZkStateReader().getClusterState();
boolean allDown = true;
for (State lirState : lirStates)
if (Replica.State.DOWN.equals(lirState) == false)
allDown = false;
if (allDown && clusterState.getCollection(collectionName).getSlice(shard).getLeader() == null) {
break;
}
log.warn("Attempt " + i + ", waiting on for 1 sec to settle down in the steady state. State: " +
printClusterStateInfo(collectionName));
log.warn("LIR state: " + getLIRState(zkController, collectionName, shard));
}
log.info("Waking up...");
// remove the network partition
log.info("Reopening the proxies for the non-leader replicas...");
for (SocketProxy proxy : nonLeaderProxies)
proxy.reopen();
log.info("LIR state: " + getLIRState(zkController, collectionName, shard));
State lirStates[] = new State[notLeaders.size()];
for (int j = 0; j < notLeaders.size(); j++)
lirStates[j] = zkController.getLeaderInitiatedRecoveryState(collectionName, shard, notLeaders.get(j).getName());
for (State lirState : lirStates)
assertTrue("Expected that the replicas would be in LIR state by now. LIR states: "+Arrays.toString(lirStates),
Replica.State.DOWN == lirState || Replica.State.RECOVERING == lirState);
}
private void bringBackOldLeaderAndSendDoc(String collection, Replica leader, List<Replica> notLeaders, int docid) throws Exception {
// Bring back the leader which was stopped
log.info("Bringing back originally killed leader...");
@@ -405,19 +228,6 @@
assertDocsExistInAllReplicas(Collections.singletonList(leader), collection, docid, docid);
}
private String getLIRState(ZkController zkController, String collection, String shard) throws KeeperException, InterruptedException {
StringBuilder sb = new StringBuilder();
String path = zkController.getLeaderInitiatedRecoveryZnodePath(collection, shard);
if (path == null)
return null;
try {
zkController.getZkClient().printLayout(path, 4, sb);
} catch (NoNodeException ex) {
return null;
}
return sb.toString();
}
@Override
protected int sendDoc(int docId) throws Exception {
SolrInputDocument doc = new SolrInputDocument();

View File

@@ -129,8 +129,6 @@ public class HttpPartitionTest extends AbstractFullDistribZkTestBase {
public void test() throws Exception {
waitForThingsToLevelOut(30000);
testLeaderInitiatedRecoveryCRUD();
testDoRecoveryOnRestart();
// test a 1x2 collection
@@ -152,62 +150,6 @@ public class HttpPartitionTest extends AbstractFullDistribZkTestBase {
log.info("HttpPartitionTest succeeded ... shutting down now!");
}
/**
* Tests handling of different format of lir nodes
*/
//TODO remove in SOLR-11812
protected void testLeaderInitiatedRecoveryCRUD() throws Exception {
String testCollectionName = "c8n_crud_1x2";
String shardId = "shard1";
createCollectionRetry(testCollectionName, "conf1", 1, 2, 1);
cloudClient.setDefaultCollection(testCollectionName);
Replica leader = cloudClient.getZkStateReader().getLeaderRetry(testCollectionName, shardId);
JettySolrRunner leaderJetty = getJettyOnPort(getReplicaPort(leader));
CoreContainer cores = leaderJetty.getCoreContainer();
ZkController zkController = cores.getZkController();
assertNotNull("ZkController is null", zkController);
Replica notLeader =
ensureAllReplicasAreActive(testCollectionName, shardId, 1, 2, maxWaitSecsToSeeAllActive).get(0);
ZkCoreNodeProps replicaCoreNodeProps = new ZkCoreNodeProps(notLeader);
String replicaUrl = replicaCoreNodeProps.getCoreUrl();
MockCoreDescriptor cd = new MockCoreDescriptor() {
public CloudDescriptor getCloudDescriptor() {
return new CloudDescriptor(leader.getStr(ZkStateReader.CORE_NAME_PROP), new Properties(), this) {
@Override
public String getCoreNodeName() {
return leader.getName();
}
@Override
public boolean isLeader() {
return true;
}
};
}
};
zkController.updateLeaderInitiatedRecoveryState(testCollectionName, shardId, notLeader.getName(), DOWN, cd, true);
Map<String,Object> lirStateMap = zkController.getLeaderInitiatedRecoveryStateObject(testCollectionName, shardId, notLeader.getName());
assertNotNull(lirStateMap);
assertSame(DOWN, Replica.State.getState((String) lirStateMap.get(ZkStateReader.STATE_PROP)));
// test old non-json format handling
SolrZkClient zkClient = zkController.getZkClient();
String znodePath = zkController.getLeaderInitiatedRecoveryZnodePath(testCollectionName, shardId, notLeader.getName());
zkClient.setData(znodePath, "down".getBytes(StandardCharsets.UTF_8), true);
lirStateMap = zkController.getLeaderInitiatedRecoveryStateObject(testCollectionName, shardId, notLeader.getName());
assertNotNull(lirStateMap);
assertSame(DOWN, Replica.State.getState((String) lirStateMap.get(ZkStateReader.STATE_PROP)));
zkClient.delete(znodePath, -1, false);
// try to clean up
attemptCollectionDelete(cloudClient, testCollectionName);
}
private void testDoRecoveryOnRestart() throws Exception {
String testCollectionName = "collDoRecoveryOnRestart";
try {

View File

@@ -1,473 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.cloud;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.Reader;
import java.io.Writer;
import java.lang.invoke.MethodHandles;
import java.net.URI;
import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.apache.solr.JSONTestUtil;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkCoreNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.TimeSource;
import org.apache.solr.util.TimeOut;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class LIRRollingUpdatesTest extends SolrCloudTestCase {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private static Map<URI, SocketProxy> proxies;
private static Map<URI, JettySolrRunner> jettys;
@BeforeClass
public static void setupCluster() throws Exception {
configureCluster(3)
.addConfig("conf", configset("cloud-minimal"))
.configure();
// Add proxies
proxies = new HashMap<>(cluster.getJettySolrRunners().size());
jettys = new HashMap<>(cluster.getJettySolrRunners().size());
for (JettySolrRunner jetty:cluster.getJettySolrRunners()) {
SocketProxy proxy = new SocketProxy();
jetty.setProxyPort(proxy.getListenPort());
cluster.stopJettySolrRunner(jetty); // TODO: can we avoid this restart?
cluster.startJettySolrRunner(jetty);
proxy.open(jetty.getBaseUrl().toURI());
log.info("Adding proxy for URL: " + jetty.getBaseUrl() + ". Proxy: " + proxy.getUrl());
proxies.put(proxy.getUrl(), proxy);
jettys.put(proxy.getUrl(), jetty);
}
}
@AfterClass
public static void tearDownCluster() throws Exception {
for (SocketProxy proxy:proxies.values()) {
proxy.close();
}
proxies = null;
jettys = null;
}
@Test
// 12-Jun-2018 @BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028") // 21-May-2018
// commented 15-Sep-2018 @BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028") // added 09-Aug-2018
public void testNewReplicaOldLeader() throws Exception {
String collection = "testNewReplicaOldLeader";
CollectionAdminRequest.createCollection(collection, 1, 2)
.setCreateNodeSet("")
.process(cluster.getSolrClient());
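// the lirVersion=old core property puts a replica into the legacy (pre shard terms) LIR mode,
// as asserted via runInOldLIRMode below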
Properties oldLir = new Properties();
oldLir.setProperty("lirVersion", "old");
CollectionAdminRequest
.addReplicaToShard(collection, "shard1")
.setProperties(oldLir)
.setNode(cluster.getJettySolrRunner(0).getNodeName())
.process(cluster.getSolrClient());
CollectionAdminRequest
.addReplicaToShard(collection, "shard1")
.setProperties(oldLir)
.setNode(cluster.getJettySolrRunner(1).getNodeName())
.process(cluster.getSolrClient());
waitForState("Time waiting for 1x2 collection", collection, clusterShape(1, 2));
addDocs(collection, 2, 0);
Slice shard1 = getCollectionState(collection).getSlice("shard1");
// introduce network partition between leader & replica
Replica notLeader = shard1.getReplicas(x -> x != shard1.getLeader()).get(0);
assertTrue(runInOldLIRMode(collection, "shard1", notLeader));
getProxyForReplica(notLeader).close();
getProxyForReplica(shard1.getLeader()).close();
addDoc(collection, 2, getJettyForReplica(shard1.getLeader()));
waitForState("Replica " + notLeader.getName() + " is not put as DOWN", collection,
(liveNodes, collectionState) ->
collectionState.getSlice("shard1").getReplica(notLeader.getName()).getState() == Replica.State.DOWN);
getProxyForReplica(shard1.getLeader()).reopen();
getProxyForReplica(notLeader).reopen();
// make sure that, when a new replica works with an old leader, it can still recover normally
waitForState("Timeout waiting for recovering", collection, clusterShape(1, 2));
assertDocsExistInAllReplicas(Collections.singletonList(notLeader), collection, 0, 2);
// make sure that, when the new replica restarts during LIR, it can still recover normally (by looking at the LIR node)
getProxyForReplica(notLeader).close();
getProxyForReplica(shard1.getLeader()).close();
addDoc(collection, 3, getJettyForReplica(shard1.getLeader()));
waitForState("Replica " + notLeader.getName() + " is not put as DOWN", collection,
(liveNodes, collectionState) ->
collectionState.getSlice("shard1").getReplica(notLeader.getName()).getState() == Replica.State.DOWN);
JettySolrRunner notLeaderJetty = getJettyForReplica(notLeader);
notLeaderJetty.stop();
waitForState("Node did not leave", collection, (liveNodes, collectionState) -> liveNodes.size() == 2);
upgrade(notLeaderJetty);
notLeaderJetty.start();
getProxyForReplica(shard1.getLeader()).reopen();
getProxyForReplica(notLeader).reopen();
waitForState("Timeout waiting for recovering", collection, clusterShape(1, 2));
assertFalse(runInOldLIRMode(collection, "shard1", notLeader));
assertDocsExistInAllReplicas(Collections.singletonList(notLeader), collection, 0, 3);
CollectionAdminRequest.deleteCollection(collection).process(cluster.getSolrClient());
}
@Test
// 12-Jun-2018 @BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028") // 04-May-2018
// commented 15-Sep-2018 @BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028") // added 09-Aug-2018
public void testNewLeaderOldReplica() throws Exception {
// in case of new leader & old replica, new leader can still put old replica into LIR
String collection = "testNewLeaderOldReplica";
CollectionAdminRequest.createCollection(collection, 1, 2)
.setCreateNodeSet("")
.process(cluster.getSolrClient());
Properties oldLir = new Properties();
oldLir.setProperty("lirVersion", "old");
CollectionAdminRequest
.addReplicaToShard(collection, "shard1")
.setNode(cluster.getJettySolrRunner(0).getNodeName())
.process(cluster.getSolrClient());
waitForState("Timeout waiting for shard1 become active", collection, (liveNodes, collectionState) -> {
Slice shard1 = collectionState.getSlice("shard1");
if (shard1.getReplicas().size() == 1 && shard1.getLeader() != null) return true;
return false;
});
CollectionAdminRequest
.addReplicaToShard(collection, "shard1")
.setProperties(oldLir)
.setNode(cluster.getJettySolrRunner(1).getNodeName())
.process(cluster.getSolrClient());
waitForState("Time waiting for 1x2 collection", collection, clusterShape(1, 2));
Slice shard1 = getCollectionState(collection).getSlice("shard1");
Replica notLeader = shard1.getReplicas(x -> x != shard1.getLeader()).get(0);
Replica leader = shard1.getLeader();
assertTrue(runInOldLIRMode(collection, "shard1", notLeader));
assertFalse(runInOldLIRMode(collection, "shard1", leader));
addDocs(collection, 2, 0);
getProxyForReplica(notLeader).close();
getProxyForReplica(leader).close();
JettySolrRunner leaderJetty = getJettyForReplica(leader);
addDoc(collection, 2, leaderJetty);
waitForState("Replica " + notLeader.getName() + " is not put as DOWN", collection,
(liveNodes, collectionState) ->
collectionState.getSlice("shard1").getReplica(notLeader.getName()).getState() == Replica.State.DOWN);
// wait a little bit
Thread.sleep(500);
getProxyForReplica(notLeader).reopen();
getProxyForReplica(leader).reopen();
waitForState("Timeout waiting for recovering", collection, clusterShape(1, 2));
assertDocsExistInAllReplicas(Collections.singletonList(notLeader), collection, 0, 2);
// ensure that after recovery, the upgraded replica cleans up its LIR status because it is no longer needed
assertFalse(cluster.getSolrClient().getZkStateReader().getZkClient().exists(
ZkController.getLeaderInitiatedRecoveryZnodePath(collection, "shard1", notLeader.getName()), true));
// ensure that the leader does not register the other replica's term
try (ZkShardTerms zkShardTerms = new ZkShardTerms(collection, "shard1", cluster.getZkClient())) {
assertFalse(zkShardTerms.getTerms().containsKey(notLeader.getName()));
}
CollectionAdminRequest.deleteCollection(collection).process(cluster.getSolrClient());
}
public void testLeaderAndMixedReplicas(boolean leaderInOldMode) throws Exception {
// with a mix of old-mode and new-mode replicas, the leader can still put all of them into recovery
// step 1: set up the collection
String collection = "testMixedReplicas-"+leaderInOldMode;
CollectionAdminRequest.createCollection(collection, 1, 2)
.setCreateNodeSet("")
.process(cluster.getSolrClient());
Properties oldLir = new Properties();
oldLir.setProperty("lirVersion", "old");
if (leaderInOldMode) {
CollectionAdminRequest
.addReplicaToShard(collection, "shard1")
.setProperties(oldLir)
.setNode(cluster.getJettySolrRunner(0).getNodeName())
.process(cluster.getSolrClient());
} else {
CollectionAdminRequest
.addReplicaToShard(collection, "shard1")
.setNode(cluster.getJettySolrRunner(0).getNodeName())
.process(cluster.getSolrClient());
}
waitForState("Timeout waiting for shard1 become active", collection, clusterShape(1, 1));
CollectionAdminRequest
.addReplicaToShard(collection, "shard1")
.setProperties(oldLir)
.setNode(cluster.getJettySolrRunner(1).getNodeName())
.process(cluster.getSolrClient());
CollectionAdminRequest
.addReplicaToShard(collection, "shard1")
.setNode(cluster.getJettySolrRunner(2).getNodeName())
.process(cluster.getSolrClient());
waitForState("Timeout waiting for shard1 become active", collection, clusterShape(1, 3));
Slice shard1 = getCollectionState(collection).getSlice("shard1");
Replica replicaInOldMode = shard1.getReplicas(x -> x != shard1.getLeader()).get(0);
Replica replicaInNewMode = shard1.getReplicas(x -> x != shard1.getLeader()).get(1);
Replica leader = shard1.getLeader();
assertEquals(leaderInOldMode, runInOldLIRMode(collection, "shard1", leader));
if (!runInOldLIRMode(collection, "shard1", replicaInOldMode)) {
Replica temp = replicaInOldMode;
replicaInOldMode = replicaInNewMode;
replicaInNewMode = temp;
}
assertTrue(runInOldLIRMode(collection, "shard1", replicaInOldMode));
assertFalse(runInOldLIRMode(collection, "shard1", replicaInNewMode));
addDocs(collection, 2, 0);
// step 2: introduce a network partition, then add a doc; replicas should be put into recovery
getProxyForReplica(replicaInOldMode).close();
getProxyForReplica(replicaInNewMode).close();
getProxyForReplica(leader).close();
JettySolrRunner leaderJetty = getJettyForReplica(leader);
addDoc(collection, 2, leaderJetty);
Replica finalReplicaInOldMode = replicaInOldMode;
waitForState("Replica " + replicaInOldMode.getName() + " is not put as DOWN", collection,
(liveNodes, collectionState) ->
collectionState.getSlice("shard1").getReplica(finalReplicaInOldMode.getName()).getState() == Replica.State.DOWN);
Replica finalReplicaInNewMode = replicaInNewMode;
waitForState("Replica " + finalReplicaInNewMode.getName() + " is not put as DOWN", collection,
(liveNodes, collectionState) ->
collectionState.getSlice("shard1").getReplica(finalReplicaInNewMode.getName()).getState() == Replica.State.DOWN);
// wait a little bit
Thread.sleep(500);
getProxyForReplica(replicaInOldMode).reopen();
getProxyForReplica(replicaInNewMode).reopen();
getProxyForReplica(leader).reopen();
waitForState("Timeout waiting for recovering", collection, clusterShape(1, 3));
assertDocsExistInAllReplicas(Arrays.asList(replicaInNewMode, replicaInOldMode), collection, 0, 2);
addDocs(collection, 3, 3);
// ensure that the leader does not register the other replica's term
try (ZkShardTerms zkShardTerms = new ZkShardTerms(collection, "shard1", cluster.getZkClient())) {
assertFalse(zkShardTerms.getTerms().containsKey(replicaInOldMode.getName()));
}
// step 3: upgrade the replica running in old mode to the new mode
getProxyForReplica(leader).close();
getProxyForReplica(replicaInOldMode).close();
addDoc(collection, 6, leaderJetty);
JettySolrRunner oldJetty = getJettyForReplica(replicaInOldMode);
oldJetty.stop();
waitForState("Node did not leave", collection, (liveNodes, collectionState)
-> liveNodes.size() == 2);
upgrade(oldJetty);
oldJetty.start();
getProxyForReplica(leader).reopen();
getProxyForReplica(replicaInOldMode).reopen();
waitForState("Timeout waiting for recovering", collection, clusterShape(1, 3));
assertDocsExistInAllReplicas(Arrays.asList(replicaInNewMode, replicaInOldMode), collection, 0, 6);
CollectionAdminRequest.deleteCollection(collection).process(cluster.getSolrClient());
}
@Test
// 12-Jun-2018 @BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028") // 04-May-2018
// commented 15-Sep-2018 @BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028") // added 09-Aug-2018
public void testNewLeaderAndMixedReplicas() throws Exception {
testLeaderAndMixedReplicas(false);
}
@Test
// 12-Jun-2018 @BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028") // 04-May-2018
public void testOldLeaderAndMixedReplicas() throws Exception {
testLeaderAndMixedReplicas(true);
}
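/**
 * Removes the lirVersion property from every core.properties under the node's Solr home,
 * so that after restart its cores switch from the old LIR mode to the new (shard terms) mode.
 */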
private void upgrade(JettySolrRunner solrRunner) {
File[] corePaths = new File(solrRunner.getSolrHome()).listFiles();
for (File corePath : corePaths) {
File coreProperties = new File(corePath, "core.properties");
if (!coreProperties.exists()) continue;
Properties properties = new Properties();
try (Reader reader = new InputStreamReader(new FileInputStream(coreProperties), "UTF-8")) {
properties.load(reader);
} catch (Exception e) {
continue;
}
properties.remove("lirVersion");
try (Writer writer = new OutputStreamWriter(new FileOutputStream(coreProperties), "UTF-8")) {
properties.store(writer, "Upgraded");
} catch (Exception e) {
continue;
}
}
}
protected void assertDocsExistInAllReplicas(List<Replica> notLeaders,
String testCollectionName, int firstDocId, int lastDocId)
throws Exception {
Replica leader =
cluster.getSolrClient().getZkStateReader().getLeaderRetry(testCollectionName, "shard1", 10000);
HttpSolrClient leaderSolr = getHttpSolrClient(leader, testCollectionName);
List<HttpSolrClient> replicas =
new ArrayList<HttpSolrClient>(notLeaders.size());
for (Replica r : notLeaders) {
replicas.add(getHttpSolrClient(r, testCollectionName));
}
try {
for (int d = firstDocId; d <= lastDocId; d++) {
String docId = String.valueOf(d);
assertDocExists(leaderSolr, testCollectionName, docId);
for (HttpSolrClient replicaSolr : replicas) {
assertDocExists(replicaSolr, testCollectionName, docId);
}
}
} finally {
if (leaderSolr != null) {
leaderSolr.close();
}
for (HttpSolrClient replicaSolr : replicas) {
replicaSolr.close();
}
}
}
protected void assertDocExists(HttpSolrClient solr, String coll, String docId) throws Exception {
NamedList rsp = realTimeGetDocId(solr, docId);
String match = JSONTestUtil.matchObj("/id", rsp.get("doc"), docId);
assertTrue("Doc with id=" + docId + " not found in " + solr.getBaseURL()
+ " due to: " + match + "; rsp="+rsp, match == null);
}
private NamedList realTimeGetDocId(HttpSolrClient solr, String docId) throws SolrServerException, IOException {
QueryRequest qr = new QueryRequest(params("qt", "/get", "id", docId, "distrib", "false"));
return solr.request(qr);
}
protected HttpSolrClient getHttpSolrClient(Replica replica, String coll) throws Exception {
ZkCoreNodeProps zkProps = new ZkCoreNodeProps(replica);
String url = zkProps.getBaseUrl() + "/" + coll;
return getHttpSolrClient(url);
}
private <T> void waitFor(int waitTimeInSecs, T expected, Supplier<T> supplier) throws InterruptedException {
TimeOut timeOut = new TimeOut(waitTimeInSecs, TimeUnit.SECONDS, new TimeSource.CurrentTimeSource());
while (!timeOut.hasTimedOut()) {
if (Objects.equals(expected, supplier.get())) return;
Thread.sleep(100);
}
assertEquals(expected, supplier.get());
}
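// a replica is considered to run in old LIR mode if it has never registered a term in ZkShardTerms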
private boolean runInOldLIRMode(String collection, String shard, Replica replica) {
try (ZkShardTerms shardTerms = new ZkShardTerms(collection, shard, cluster.getZkClient())) {
return !shardTerms.registered(replica.getName());
}
}
private void addDoc(String collection, int docId, JettySolrRunner solrRunner) throws IOException, SolrServerException {
try (HttpSolrClient solrClient = new HttpSolrClient.Builder(solrRunner.getBaseUrl().toString()).build()) {
solrClient.add(collection, new SolrInputDocument("id", String.valueOf(docId), "fieldName_s", String.valueOf(docId)));
}
}
private void addDocs(String collection, int numDocs, int startId) throws SolrServerException, IOException {
List<SolrInputDocument> docs = new ArrayList<>(numDocs);
for (int i = 0; i < numDocs; i++) {
int id = startId + i;
docs.add(new SolrInputDocument("id", String.valueOf(id), "fieldName_s", String.valueOf(id)));
}
cluster.getSolrClient().add(collection, docs);
cluster.getSolrClient().commit(collection);
}
protected JettySolrRunner getJettyForReplica(Replica replica) throws Exception {
String replicaBaseUrl = replica.getStr(ZkStateReader.BASE_URL_PROP);
assertNotNull(replicaBaseUrl);
URL baseUrl = new URL(replicaBaseUrl);
JettySolrRunner proxy = jettys.get(baseUrl.toURI());
assertNotNull("No proxy found for " + baseUrl + "!", proxy);
return proxy;
}
protected SocketProxy getProxyForReplica(Replica replica) throws Exception {
String replicaBaseUrl = replica.getStr(ZkStateReader.BASE_URL_PROP);
assertNotNull(replicaBaseUrl);
URL baseUrl = new URL(replicaBaseUrl);
SocketProxy proxy = proxies.get(baseUrl.toURI());
if (proxy == null && !baseUrl.toExternalForm().endsWith("/")) {
baseUrl = new URL(baseUrl.toExternalForm() + "/");
proxy = proxies.get(baseUrl.toURI());
}
assertNotNull("No proxy found for " + baseUrl + "!", proxy);
return proxy;
}
}

View File

@ -1,242 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.cloud;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.ZkCoreNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.util.TimeSource;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.util.MockCoreContainer.MockCoreDescriptor;
import org.apache.solr.util.TimeOut;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.data.Stat;
import org.junit.Test;
/**
* Test for {@link LeaderInitiatedRecoveryThread}
*/
@Deprecated
@SolrTestCaseJ4.SuppressSSL
public class TestLeaderInitiatedRecoveryThread extends AbstractFullDistribZkTestBase {
public TestLeaderInitiatedRecoveryThread() {
sliceCount = 1;
fixShardCount(2);
}
@Test
//17-Aug-2018 commented @BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028") // 09-Apr-2018
@BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028") // added 20-Sep-2018
public void testPublishDownState() throws Exception {
waitForRecoveriesToFinish(true);
final String leaderCoreNodeName = shardToLeaderJetty.get(SHARD1).coreNodeName;
final CloudJettyRunner leaderRunner = shardToLeaderJetty.get(SHARD1);
final CoreContainer coreContainer1 = leaderRunner.jetty.getCoreContainer();
final ZkController zkController1 = coreContainer1.getZkController();
CloudJettyRunner notLeader = null;
for (CloudJettyRunner cloudJettyRunner : shardToJetty.get(SHARD1)) {
if (cloudJettyRunner != leaderRunner) {
notLeader = cloudJettyRunner;
break;
}
}
assertNotNull(notLeader);
Replica replica = cloudClient.getZkStateReader().getClusterState().getCollection(DEFAULT_COLLECTION).getReplica(notLeader.coreNodeName);
ZkCoreNodeProps replicaCoreNodeProps = new ZkCoreNodeProps(replica);
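// mock a CoreDescriptor that reports the leader's core node name and isLeader()=true,
// so the LeaderInitiatedRecoveryThread below runs as if started by the shard leader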
MockCoreDescriptor cd = new MockCoreDescriptor() {
public CloudDescriptor getCloudDescriptor() {
return new CloudDescriptor(shardToLeaderJetty.get(SHARD1).info.getStr(ZkStateReader.CORE_NAME_PROP), new Properties(), this) {
@Override
public String getCoreNodeName() {
return shardToLeaderJetty.get(SHARD1).info.getStr(ZkStateReader.CORE_NODE_NAME_PROP);
}
@Override
public boolean isLeader() {
return true;
}
};
}
};
/*
1. Test that publishDownState throws exception when zkController.isReplicaInRecoveryHandling == false
*/
SolrException e = expectThrows(SolrException.class,
"publishDownState should not have succeeded because replica url is not marked in leader initiated recovery in ZkController",
() -> {
LeaderInitiatedRecoveryThread thread = new LeaderInitiatedRecoveryThread(zkController1, coreContainer1,
DEFAULT_COLLECTION, SHARD1, replicaCoreNodeProps, 1, cd);
assertFalse(zkController1.isReplicaInRecoveryHandling(replicaCoreNodeProps.getCoreUrl()));
thread.run();
});
assertEquals(e.code(), SolrException.ErrorCode.INVALID_STATE.code);
/*
2. Test that a non-live replica cannot be put into LIR or down state
*/
LeaderInitiatedRecoveryThread thread = new LeaderInitiatedRecoveryThread(zkController1, coreContainer1,
DEFAULT_COLLECTION, SHARD1, replicaCoreNodeProps, 1, cd);
// kill the replica
int children = cloudClient.getZkStateReader().getZkClient().getChildren("/live_nodes", null, true).size();
ChaosMonkey.stop(notLeader.jetty);
TimeOut timeOut = new TimeOut(60, TimeUnit.SECONDS, TimeSource.NANO_TIME);
while (!timeOut.hasTimedOut()) {
if (children > cloudClient.getZkStateReader().getZkClient().getChildren("/live_nodes", null, true).size()) {
break;
}
Thread.sleep(500);
}
assertTrue(children > cloudClient.getZkStateReader().getZkClient().getChildren("/live_nodes", null, true).size());
int cversion = getOverseerCversion();
// the thread should not publish LIR and down state for a node which is not live, regardless of whether forcePublish is true or false
assertFalse(thread.publishDownState(replicaCoreNodeProps.getCoreName(), replica.getName(), replica.getNodeName(), replicaCoreNodeProps.getCoreUrl(), false));
// let's assert that we did not publish anything to the overseer queue; the simplest way is to check that the cversion of the overseer queue znode is unchanged
assertEquals(cversion, getOverseerCversion());
assertFalse(thread.publishDownState(replicaCoreNodeProps.getCoreName(), replica.getName(), replica.getNodeName(), replicaCoreNodeProps.getCoreUrl(), true));
// let's assert that we did not publish anything to the overseer queue
assertEquals(cversion, getOverseerCversion());
/*
3. Test that on ZK connection loss the thread does not attempt to publish the down state, even if forcePublish=true
*/
ChaosMonkey.start(notLeader.jetty);
waitForRecoveriesToFinish(true);
thread = new LeaderInitiatedRecoveryThread(zkController1, coreContainer1,
DEFAULT_COLLECTION, SHARD1, replicaCoreNodeProps, 1, cd) {
@Override
protected void updateLIRState(String replicaCoreNodeName) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "", new KeeperException.ConnectionLossException());
}
};
assertFalse(thread.publishDownState(replicaCoreNodeProps.getCoreName(), replica.getName(), replica.getNodeName(), replicaCoreNodeProps.getCoreUrl(), false));
assertFalse(thread.publishDownState(replicaCoreNodeProps.getCoreName(), replica.getName(), replica.getNodeName(), replicaCoreNodeProps.getCoreUrl(), true));
assertNull(zkController1.getLeaderInitiatedRecoveryState(DEFAULT_COLLECTION, SHARD1, replica.getName()));
/*
4. Test that on ZK session expiry the thread does not attempt to publish the down state, even if forcePublish=true
*/
thread = new LeaderInitiatedRecoveryThread(zkController1, coreContainer1,
DEFAULT_COLLECTION, SHARD1, replicaCoreNodeProps, 1, cd) {
@Override
protected void updateLIRState(String replicaCoreNodeName) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "", new KeeperException.SessionExpiredException());
}
};
assertFalse(thread.publishDownState(replicaCoreNodeProps.getCoreName(), replica.getName(), replica.getNodeName(), replicaCoreNodeProps.getCoreUrl(), false));
assertFalse(thread.publishDownState(replicaCoreNodeProps.getCoreName(), replica.getName(), replica.getNodeName(), replicaCoreNodeProps.getCoreUrl(), true));
assertNull(zkController1.getLeaderInitiatedRecoveryState(DEFAULT_COLLECTION, SHARD1, replica.getName()));
/*
5. Test that any exception other than ZK connection loss or session expiry publishes the down state only if forcePublish=true
*/
thread = new LeaderInitiatedRecoveryThread(zkController1, coreContainer1,
DEFAULT_COLLECTION, SHARD1, replicaCoreNodeProps, 1, cd) {
@Override
protected void updateLIRState(String replicaCoreNodeName) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "bogus exception");
}
};
// the following should return true because, regardless of the bogus exception while setting LIR state, we still want recovery commands to be sent;
// however, it will not publish a down state
cversion = getOverseerCversion();
assertTrue(thread.publishDownState(replicaCoreNodeProps.getCoreName(), replica.getName(), replica.getNodeName(), replicaCoreNodeProps.getCoreUrl(), false));
// let's assert that we did not publish anything to the overseer queue; the simplest way is to check that the cversion of the overseer queue znode is unchanged
assertEquals(cversion, getOverseerCversion());
assertTrue(thread.publishDownState(replicaCoreNodeProps.getCoreName(), replica.getName(), replica.getNodeName(), replicaCoreNodeProps.getCoreUrl(), true));
// this should have published a down state so assert that cversion has incremented
assertTrue(getOverseerCversion() > cversion);
timeOut = new TimeOut(30, TimeUnit.SECONDS, TimeSource.NANO_TIME);
while (!timeOut.hasTimedOut()) {
Replica r = cloudClient.getZkStateReader().getClusterState().getCollection(DEFAULT_COLLECTION).getReplica(replica.getName());
if (r.getState() == Replica.State.DOWN) {
break;
}
Thread.sleep(500);
}
assertNull(zkController1.getLeaderInitiatedRecoveryState(DEFAULT_COLLECTION, SHARD1, replica.getName()));
assertEquals(Replica.State.DOWN, cloudClient.getZkStateReader().getClusterState().getCollection(DEFAULT_COLLECTION).getReplica(replica.getName()).getState());
/*
6. Test that a non-leader cannot set LIR nodes
*/
final CoreContainer coreContainer2 = notLeader.jetty.getCoreContainer();
final ZkController zkController2 = coreContainer2.getZkController();
thread = new LeaderInitiatedRecoveryThread(zkController2, coreContainer2,
DEFAULT_COLLECTION, SHARD1, replicaCoreNodeProps, 1, coreContainer2.getCores().iterator().next().getCoreDescriptor()) {
@Override
protected void updateLIRState(String replicaCoreNodeName) {
throw expectThrows(ZkController.NotLeaderException.class, () -> super.updateLIRState(replicaCoreNodeName));
}
};
cversion = getOverseerCversion();
assertFalse(thread.publishDownState(replicaCoreNodeProps.getCoreName(), replica.getName(), replica.getNodeName(), replicaCoreNodeProps.getCoreUrl(), false));
assertEquals(cversion, getOverseerCversion());
/*
7. assert that we can write a LIR state if everything else is fine
*/
// reset the ZkController to the one from the leader
final CoreContainer coreContainer3 = leaderRunner.jetty.getCoreContainer();
final ZkController zkController3 = coreContainer3.getZkController();
thread = new LeaderInitiatedRecoveryThread(zkController3, coreContainer3,
DEFAULT_COLLECTION, SHARD1, replicaCoreNodeProps, 1, coreContainer3.getCores().iterator().next().getCoreDescriptor());
thread.publishDownState(replicaCoreNodeProps.getCoreName(), replica.getName(), replica.getNodeName(), replicaCoreNodeProps.getCoreUrl(), false);
timeOut = new TimeOut(30, TimeUnit.SECONDS, TimeSource.NANO_TIME);
while (!timeOut.hasTimedOut()) {
Replica.State state = zkController3.getLeaderInitiatedRecoveryState(DEFAULT_COLLECTION, SHARD1, replica.getName());
if (state == Replica.State.DOWN) {
break;
}
Thread.sleep(500);
}
assertNotNull(zkController3.getLeaderInitiatedRecoveryStateObject(DEFAULT_COLLECTION, SHARD1, replica.getName()));
assertEquals(Replica.State.DOWN, zkController3.getLeaderInitiatedRecoveryState(DEFAULT_COLLECTION, SHARD1, replica.getName()));
/*
7. Test that
*/
}
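// the cversion of /overseer/queue changes whenever a child znode is added or removed,
// so an unchanged value means nothing was published to the overseer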
protected int getOverseerCversion() throws KeeperException, InterruptedException {
Stat stat = new Stat();
cloudClient.getZkStateReader().getZkClient().getData("/overseer/queue", null, stat, true);
return stat.getCversion();
}
}

View File

@ -84,7 +84,7 @@ public class ZkShardTermsTest extends SolrCloudTestCase {
zkShardTerms.registerTerm("replica1");
zkShardTerms.registerTerm("replica2");
// normal case when leader start lir process
// normal case when leader failed to send an update to replica
zkShardTerms.ensureTermsIsHigher("replica1", Collections.singleton("replica2"));
zkShardTerms.startRecovering("replica2");
assertEquals(zkShardTerms.getTerm("replica2"), 1);
@ -95,7 +95,6 @@ public class ZkShardTermsTest extends SolrCloudTestCase {
assertEquals(zkShardTerms.getTerm("replica2"), 1);
assertEquals(zkShardTerms.getTerm("replica2_recovering"), -1);
// stack of lir processes
zkShardTerms.ensureTermsIsHigher("replica1", Collections.singleton("replica2"));
assertEquals(zkShardTerms.getTerm("replica1"), 2);
assertEquals(zkShardTerms.getTerm("replica2"), 1);