mirror of https://github.com/apache/lucene.git
SOLR-12087: Deleting replicas sometimes fails and causes the replicas to exist in the down state
This commit is contained in:
parent
2c4b78c43f
commit
92f1cdebfa
|
@ -68,6 +68,9 @@ Bug Fixes
|
|||
* SOLR-12108: Fixed the fallback behavior of [raw] and [xml] transformers when an incompatible 'wt' was
|
||||
specified, the field value was lost if documentCache was not used. (hossman)
|
||||
|
||||
* SOLR-12087: Deleting replicas sometimes fails and causes the replicas to exist in the down
|
||||
state (Cao Manh Dat)
|
||||
|
||||
Optimizations
|
||||
----------------------
|
||||
|
||||
|
|
|
@ -29,6 +29,7 @@ import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig;
|
|||
import org.apache.solr.client.solrj.cloud.SolrCloudManager;
|
||||
import org.apache.solr.common.SolrException;
|
||||
import org.apache.solr.common.SolrException.ErrorCode;
|
||||
import org.apache.solr.common.cloud.ClusterState;
|
||||
import org.apache.solr.common.cloud.DocCollection;
|
||||
import org.apache.solr.common.cloud.Replica;
|
||||
import org.apache.solr.common.cloud.Slice;
|
||||
|
@ -94,6 +95,17 @@ public class CloudUtil {
|
|||
}
|
||||
}
|
||||
|
||||
public static boolean replicaExists(ClusterState clusterState, String collection, String shard, String coreNodeName) {
|
||||
DocCollection docCollection = clusterState.getCollectionOrNull(collection);
|
||||
if (docCollection != null) {
|
||||
Slice slice = docCollection.getSlice(shard);
|
||||
if (slice != null) {
|
||||
return slice.getReplica(coreNodeName) != null;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a displayable unified path to the given resource. For non-solrCloud that will be the
|
||||
* same as getConfigDir, but for Cloud it will be getConfigSetZkPath ending in a /
|
||||
|
|
|
@ -38,6 +38,7 @@ import org.slf4j.LoggerFactory;
|
|||
import java.lang.invoke.MethodHandles;
|
||||
import java.net.ConnectException;
|
||||
import java.net.SocketException;
|
||||
import java.net.UnknownHostException;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
|
@ -88,7 +89,9 @@ public class LeaderInitiatedRecoveryThread extends Thread {
|
|||
if (!zkController.isReplicaInRecoveryHandling(replicaUrl)) {
|
||||
throw new SolrException(ErrorCode.INVALID_STATE, "Replica: " + replicaUrl + " should have been marked under leader initiated recovery in ZkController but wasn't.");
|
||||
}
|
||||
|
||||
if (!CloudUtil.replicaExists(zkController.getClusterState(), collection, shardId, replicaCoreNodeName)) {
|
||||
log.info("Replica does not exist, skip doing LIR");
|
||||
}
|
||||
boolean sendRecoveryCommand = publishDownState(replicaCoreName, replicaCoreNodeName, replicaNodeName, replicaUrl, false);
|
||||
|
||||
if (sendRecoveryCommand) {
|
||||
|
@ -152,9 +155,11 @@ public class LeaderInitiatedRecoveryThread extends Thread {
|
|||
ZkStateReader.STATE_PROP, Replica.State.DOWN.toString(),
|
||||
ZkStateReader.BASE_URL_PROP, nodeProps.getBaseUrl(),
|
||||
ZkStateReader.CORE_NAME_PROP, nodeProps.getCoreName(),
|
||||
ZkStateReader.CORE_NODE_NAME_PROP, replicaCoreNodeName,
|
||||
ZkStateReader.NODE_NAME_PROP, nodeProps.getNodeName(),
|
||||
ZkStateReader.SHARD_ID_PROP, shardId,
|
||||
ZkStateReader.COLLECTION_PROP, collection);
|
||||
ZkStateReader.COLLECTION_PROP, collection,
|
||||
ZkStateReader.FORCE_SET_STATE_PROP, "false");
|
||||
log.warn("Leader is publishing core={} coreNodeName ={} state={} on behalf of un-reachable replica {}",
|
||||
replicaCoreName, replicaCoreNodeName, Replica.State.DOWN.toString(), replicaUrl);
|
||||
zkController.getOverseerJobQueue().offer(Utils.toJSON(m));
|
||||
|
@ -166,6 +171,12 @@ public class LeaderInitiatedRecoveryThread extends Thread {
|
|||
return sendRecoveryCommand;
|
||||
}
|
||||
|
||||
private void removeLIRState(String replicaCoreNodeName) {
|
||||
zkController.updateLeaderInitiatedRecoveryState(collection,
|
||||
shardId,
|
||||
replicaCoreNodeName, Replica.State.ACTIVE, leaderCd, true);
|
||||
}
|
||||
|
||||
/*
|
||||
protected scope for testing purposes
|
||||
*/
|
||||
|
@ -219,13 +230,20 @@ public class LeaderInitiatedRecoveryThread extends Thread {
|
|||
(rootCause instanceof ConnectException ||
|
||||
rootCause instanceof ConnectTimeoutException ||
|
||||
rootCause instanceof NoHttpResponseException ||
|
||||
rootCause instanceof SocketException);
|
||||
rootCause instanceof SocketException ||
|
||||
rootCause instanceof UnknownHostException);
|
||||
|
||||
SolrException.log(log, recoveryUrl + ": Could not tell a replica to recover", t);
|
||||
|
||||
if (!wasCommError) {
|
||||
continueTrying = false;
|
||||
}
|
||||
}
|
||||
|
||||
if (CloudUtil.replicaExists(zkController.getClusterState(), collection, shardId, replicaCoreNodeName)) {
|
||||
SolrException.log(log, recoveryUrl + ": Could not tell a replica to recover, wasCommError:"+wasCommError, t);
|
||||
} else {
|
||||
log.info("Replica {} is removed, hence remove its lir state", replicaCoreNodeName);
|
||||
removeLIRState(replicaCoreNodeName);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -30,6 +30,7 @@ import org.apache.commons.lang.StringUtils;
|
|||
import org.apache.solr.client.solrj.cloud.DistribStateManager;
|
||||
import org.apache.solr.client.solrj.cloud.SolrCloudManager;
|
||||
import org.apache.solr.client.solrj.cloud.autoscaling.VersionedData;
|
||||
import org.apache.solr.cloud.CloudUtil;
|
||||
import org.apache.solr.cloud.Overseer;
|
||||
import org.apache.solr.cloud.api.collections.Assign;
|
||||
import org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler;
|
||||
|
@ -233,15 +234,25 @@ public class ReplicaMutator {
|
|||
|
||||
private ZkWriteCommand updateState(final ClusterState prevState, ZkNodeProps message, String collectionName, Integer numShards, boolean collectionExists) {
|
||||
String sliceName = message.getStr(ZkStateReader.SHARD_ID_PROP);
|
||||
|
||||
String coreNodeName = message.getStr(ZkStateReader.CORE_NODE_NAME_PROP);
|
||||
boolean forceSetState = message.getBool(ZkStateReader.FORCE_SET_STATE_PROP, true);
|
||||
|
||||
DocCollection collection = prevState.getCollectionOrNull(collectionName);
|
||||
if (!forceSetState && !CloudUtil.replicaExists(prevState, collectionName, sliceName, coreNodeName)) {
|
||||
log.info("Failed to update state because the replica does not exist, {}", message);
|
||||
return ZkStateWriter.NO_OP;
|
||||
}
|
||||
|
||||
if (coreNodeName == null) {
|
||||
coreNodeName = ClusterStateMutator.getAssignedCoreNodeName(collection,
|
||||
message.getStr(ZkStateReader.NODE_NAME_PROP), message.getStr(ZkStateReader.CORE_NAME_PROP));
|
||||
if (coreNodeName != null) {
|
||||
log.debug("node=" + coreNodeName + " is already registered");
|
||||
} else {
|
||||
if (!forceSetState) {
|
||||
log.info("Failed to update state because the replica does not exist, {}", message);
|
||||
return ZkStateWriter.NO_OP;
|
||||
}
|
||||
// if coreNodeName is null, auto assign one
|
||||
coreNodeName = Assign.assignCoreNodeName(stateManager, collection);
|
||||
}
|
||||
|
|
|
@ -17,6 +17,8 @@
|
|||
package org.apache.solr.util;
|
||||
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
import org.apache.solr.common.util.TimeSource;
|
||||
|
||||
|
@ -48,4 +50,12 @@ public class TimeOut {
|
|||
public long timeElapsed(TimeUnit unit) {
|
||||
return unit.convert(timeSource.getTimeNs() - startTime, NANOSECONDS);
|
||||
}
|
||||
|
||||
public void waitFor(String messageOnTimeOut, Supplier<Boolean> supplier)
|
||||
throws InterruptedException, TimeoutException {
|
||||
while (!supplier.get() && hasTimedOut()) {
|
||||
Thread.sleep(500);
|
||||
}
|
||||
if (hasTimedOut()) throw new TimeoutException(messageOnTimeOut);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -21,8 +21,11 @@ import java.nio.file.Files;
|
|||
import java.nio.file.Path;
|
||||
import java.nio.file.Paths;
|
||||
import java.util.EnumSet;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.Semaphore;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
|
||||
|
@ -30,6 +33,7 @@ import org.apache.solr.client.solrj.request.CollectionAdminRequest;
|
|||
import org.apache.solr.client.solrj.request.CoreStatus;
|
||||
import org.apache.solr.cloud.overseer.OverseerAction;
|
||||
import org.apache.solr.common.SolrException;
|
||||
import org.apache.solr.common.SolrInputDocument;
|
||||
import org.apache.solr.common.cloud.DocCollection;
|
||||
import org.apache.solr.common.cloud.Replica;
|
||||
import org.apache.solr.common.cloud.Slice;
|
||||
|
@ -39,6 +43,7 @@ import org.apache.solr.common.util.TimeSource;
|
|||
import org.apache.solr.common.util.Utils;
|
||||
import org.apache.solr.core.ZkContainer;
|
||||
import org.apache.solr.util.TimeOut;
|
||||
import org.apache.zookeeper.KeeperException;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
import org.slf4j.Logger;
|
||||
|
@ -283,5 +288,57 @@ public class DeleteReplicaTest extends SolrCloudTestCase {
|
|||
if (timeOut.hasTimedOut()) fail("Wait for " + lostNodeName + " to leave failed!");
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void deleteReplicaOnIndexing() throws Exception {
|
||||
final String collectionName = "deleteReplicaOnIndexing";
|
||||
CollectionAdminRequest.createCollection(collectionName, "conf", 1, 2)
|
||||
.process(cluster.getSolrClient());
|
||||
waitForState("", collectionName, clusterShape(1, 2));
|
||||
AtomicBoolean closed = new AtomicBoolean(false);
|
||||
Thread[] threads = new Thread[100];
|
||||
for (int i = 0; i < threads.length; i++) {
|
||||
int finalI = i;
|
||||
threads[i] = new Thread(() -> {
|
||||
int doc = finalI * 10000;
|
||||
while (!closed.get()) {
|
||||
try {
|
||||
cluster.getSolrClient().add(collectionName, new SolrInputDocument("id", String.valueOf(doc++)));
|
||||
} catch (Exception e) {
|
||||
LOG.error("Failed on adding document to {}", collectionName, e);
|
||||
}
|
||||
}
|
||||
});
|
||||
threads[i].start();
|
||||
}
|
||||
|
||||
Slice shard1 = getCollectionState(collectionName).getSlice("shard1");
|
||||
Replica nonLeader = shard1.getReplicas(rep -> !rep.getName().equals(shard1.getLeader().getName())).get(0);
|
||||
CollectionAdminRequest.deleteReplica(collectionName, "shard1", nonLeader.getName()).process(cluster.getSolrClient());
|
||||
closed.set(true);
|
||||
for (int i = 0; i < threads.length; i++) {
|
||||
threads[i].join();
|
||||
}
|
||||
|
||||
try {
|
||||
cluster.getSolrClient().waitForState(collectionName, 20, TimeUnit.SECONDS, (liveNodes, collectionState) -> collectionState.getReplicas().size() == 1);
|
||||
} catch (TimeoutException e) {
|
||||
LOG.info("Timeout wait for state {}", getCollectionState(collectionName));
|
||||
throw e;
|
||||
}
|
||||
|
||||
TimeOut timeOut = new TimeOut(20, TimeUnit.SECONDS, TimeSource.NANO_TIME);
|
||||
timeOut.waitFor("Time out waiting for LIR state get removed", () -> {
|
||||
String lirPath = ZkController.getLeaderInitiatedRecoveryZnodePath(collectionName, "shard1");
|
||||
try {
|
||||
List<String> children = zkClient().getChildren(lirPath, null, true);
|
||||
return children.size() == 0;
|
||||
} catch (KeeperException.NoNodeException e) {
|
||||
return true;
|
||||
} catch (Exception e) {
|
||||
throw new AssertionError(e);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -78,6 +78,8 @@ public class ZkStateReader implements Closeable {
|
|||
public static final String CORE_NODE_NAME_PROP = "core_node_name";
|
||||
public static final String ROLES_PROP = "roles";
|
||||
public static final String STATE_PROP = "state";
|
||||
// If this flag is false and the replica does not exist in the cluster state, the set-state operation becomes a no-op (default is true).
|
||||
public static final String FORCE_SET_STATE_PROP = "force_set_state";
|
||||
/** SolrCore name. */
|
||||
public static final String CORE_NAME_PROP = "core";
|
||||
public static final String COLLECTION_PROP = "collection";
|
||||
|
|
Loading…
Reference in New Issue