mirror of https://github.com/apache/lucene.git
SOLR-9438: Shard split can be marked successful and sub-shard states switched to 'active' even when one or more sub-shard replicas do not recover due to the leader crashing or restarting between the time the replicas are created and before they can recover
This commit is contained in:
parent 1a78ab621f
commit f177a660f5
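
The core of the fix: before creating sub-shard replicas, SplitShardCmd records the ephemeralOwner (ZooKeeper session id) of the parent shard leader's live node and re-reads it after the replicas are created. If the live node is gone, or is now owned by a different session, the leader crashed or restarted in between, so the sub-shards are switched to the new RECOVERY_FAILED state and the split is failed instead of being activated. A minimal sketch of that guard, assuming an existing SolrZkClient and a live-node path; the helper class and method names are illustrative, not code from this commit:

    import org.apache.solr.common.cloud.SolrZkClient;
    import org.apache.zookeeper.data.Stat;

    class LeaderSessionGuard {
      /**
       * Returns true if the leader's live node still exists and is still owned by the
       * same ZooKeeper session that was recorded earlier as expectedSessionId.
       */
      static boolean leaderUnchanged(SolrZkClient zkClient, String liveNodePath, long expectedSessionId)
          throws Exception {
        // null watcher, retry on connection loss
        Stat stat = zkClient.exists(liveNodePath, null, true);
        return stat != null && stat.getEphemeralOwner() == expectedSessionId;
      }
    }

If this check fails, updates forwarded by the old leader may never have reached the sub-shard replicas, which is the data-loss window SOLR-9438 closes.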
@@ -122,6 +122,10 @@ Bug Fixes
 * SOLR-9490: Fixed bugs in BoolField that caused it to erroneously return "false" for all docs depending
   on usage (Colvin Cowie, Dan Fox, hossman)
 
+* SOLR-9438: Shard split can be marked successful and sub-shard states switched to 'active' even when
+  one or more sub-shard replicas do not recover due to the leader crashing or restarting between the time
+  the replicas are created and before they can recover. This can cause data loss. (shalin)
+
 Optimizations
 ----------------------

@@ -79,7 +79,7 @@ public class DeleteShardCmd implements Cmd {
     // TODO: Add check for range gaps on Slice deletion
     final Slice.State state = slice.getState();
     if (!(slice.getRange() == null || state == Slice.State.INACTIVE || state == Slice.State.RECOVERY
-        || state == Slice.State.CONSTRUCTION)) {
+        || state == Slice.State.CONSTRUCTION) || state == Slice.State.RECOVERY_FAILED) {
       throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "The slice: " + slice.getName() + " is currently " + state
           + ". Only non-active (or custom-hashed) slices can be deleted.");
     }

@@ -582,7 +582,7 @@ public class RecoveryStrategy extends Thread implements Closeable {
     prepCmd.setCheckLive(true);
     prepCmd.setOnlyIfLeader(true);
     final Slice.State state = slice.getState();
-    if (state != Slice.State.CONSTRUCTION && state != Slice.State.RECOVERY) {
+    if (state != Slice.State.CONSTRUCTION && state != Slice.State.RECOVERY && state != Slice.State.RECOVERY_FAILED) {
       prepCmd.setOnlyIfLeaderActive(true);
     }
     HttpUriRequestResponse mrr = client.httpUriRequest(prepCmd);

@@ -47,6 +47,7 @@ import org.apache.solr.common.util.NamedList;
 import org.apache.solr.common.util.Utils;
 import org.apache.solr.handler.component.ShardHandler;
 import org.apache.solr.util.TestInjection;
+import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -126,6 +127,13 @@ public class SplitShardCmd implements Cmd {
       Thread.currentThread().interrupt();
     }
 
+    // let's record the ephemeralOwner of the parent leader node
+    Stat leaderZnodeStat = zkStateReader.getZkClient().exists(ZkStateReader.LIVE_NODES_ZKNODE + "/" + parentShardLeader.getNodeName(), null, true);
+    if (leaderZnodeStat == null) {
+      // we just got to know the leader but its live node is gone already!
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "The shard leader node: " + parentShardLeader.getNodeName() + " is not live anymore!");
+    }
+
     DocRouter.Range range = parentSlice.getRange();
     if (range == null) {
       range = new PlainIdRouter().fullRange();

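For context on the value being recorded above: ephemeralOwner is the id of the ZooKeeper session that created an ephemeral znode, so it changes whenever the node is re-created by a new session, for example after the Solr node hosting the parent leader restarts and re-registers under /live_nodes. A standalone illustration with the plain ZooKeeper client, assuming a local ZooKeeper at localhost:2181 and an arbitrary demo path (not code from this commit):

    import org.apache.zookeeper.CreateMode;
    import org.apache.zookeeper.ZooDefs;
    import org.apache.zookeeper.ZooKeeper;
    import org.apache.zookeeper.data.Stat;

    public class EphemeralOwnerDemo {
      public static void main(String[] args) throws Exception {
        ZooKeeper zk = new ZooKeeper("localhost:2181", 30000, event -> {});
        // create an ephemeral node owned by this session
        zk.create("/demo-live-node", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        Stat stat = zk.exists("/demo-live-node", false);
        // ephemeralOwner equals this client's session id; a node re-created by a
        // restarted process reports a different owner
        System.out.println("ephemeralOwner=" + stat.getEphemeralOwner() + " session=" + zk.getSessionId());
        zk.close(); // closing the session also removes the ephemeral node
      }
    }
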
@@ -253,6 +261,8 @@
       propMap.put(ZkStateReader.SHARD_RANGE_PROP, subRange.toString());
       propMap.put(ZkStateReader.SHARD_STATE_PROP, Slice.State.CONSTRUCTION.toString());
       propMap.put(ZkStateReader.SHARD_PARENT_PROP, parentSlice.getName());
+      propMap.put("shard_parent_node", parentShardLeader.getNodeName());
+      propMap.put("shard_parent_zk_session", leaderZnodeStat.getEphemeralOwner());
       DistributedQueue inQueue = Overseer.getStateUpdateQueue(zkStateReader.getZkClient());
       inQueue.offer(Utils.toJSON(new ZkNodeProps(propMap)));
 
@@ -420,6 +430,32 @@
 
     assert TestInjection.injectSplitFailureBeforeReplicaCreation();
 
+    long ephemeralOwner = leaderZnodeStat.getEphemeralOwner();
+    // compare against the ephemeralOwner of the parent leader node
+    leaderZnodeStat = zkStateReader.getZkClient().exists(ZkStateReader.LIVE_NODES_ZKNODE + "/" + parentShardLeader.getNodeName(), null, true);
+    if (leaderZnodeStat == null || ephemeralOwner != leaderZnodeStat.getEphemeralOwner()) {
+      // put sub-shards in recovery_failed state
+      DistributedQueue inQueue = Overseer.getStateUpdateQueue(zkStateReader.getZkClient());
+      Map<String, Object> propMap = new HashMap<>();
+      propMap.put(Overseer.QUEUE_OPERATION, OverseerAction.UPDATESHARDSTATE.toLower());
+      for (String subSlice : subSlices) {
+        propMap.put(subSlice, Slice.State.RECOVERY_FAILED.toString());
+      }
+      propMap.put(ZkStateReader.COLLECTION_PROP, collectionName);
+      ZkNodeProps m = new ZkNodeProps(propMap);
+      inQueue.offer(Utils.toJSON(m));
+
+      if (leaderZnodeStat == null) {
+        // the leader is not live anymore, fail the split!
+        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "The shard leader node: " + parentShardLeader.getNodeName() + " is not live anymore!");
+      } else if (ephemeralOwner != leaderZnodeStat.getEphemeralOwner()) {
+        // there's a new leader, fail the split!
+        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
+            "The zk session id for the shard leader node: " + parentShardLeader.getNodeName() + " has changed from "
+                + ephemeralOwner + " to " + leaderZnodeStat.getEphemeralOwner() + ". This can cause data loss so we must abort the split");
+      }
+    }
+
     // we must set the slice state into recovery before actually creating the replica cores
     // this ensures that the logic inside Overseer to update sub-shard state to 'active'
     // always gets a chance to execute. See SOLR-7673

@@ -59,11 +59,19 @@ public class CollectionMutator {
       String shardRange = message.getStr(ZkStateReader.SHARD_RANGE_PROP);
       String shardState = message.getStr(ZkStateReader.SHARD_STATE_PROP);
       String shardParent = message.getStr(ZkStateReader.SHARD_PARENT_PROP);
+      String shardParentZkSession = message.getStr("shard_parent_zk_session");
+      String shardParentNode = message.getStr("shard_parent_node");
       sliceProps.put(Slice.RANGE, shardRange);
       sliceProps.put(ZkStateReader.STATE_PROP, shardState);
       if (shardParent != null) {
         sliceProps.put(Slice.PARENT, shardParent);
       }
+      if (shardParentZkSession != null) {
+        sliceProps.put("shard_parent_zk_session", shardParentZkSession);
+      }
+      if (shardParentNode != null) {
+        sliceProps.put("shard_parent_node", shardParentNode);
+      }
       collection = updateSlice(collectionName, collection, new Slice(shardId, replicas, sliceProps));
       return new ZkWriteCommand(collectionName, collection);
     } else {

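CollectionMutator now carries the two new properties on the sub-shard slice itself, and since Slice extends ZkNodeProps (see the Slice.java hunk further down), they can be read straight back from cluster state. A hedged sketch with placeholder collection and shard names (the helper class is illustrative, not part of this commit):

    import org.apache.solr.common.cloud.DocCollection;
    import org.apache.solr.common.cloud.Slice;
    import org.apache.solr.common.cloud.ZkStateReader;

    class ShardParentInfoReader {
      // Sketch: read the parent-leader info recorded for a sub-shard during a split.
      static void printShardParentInfo(ZkStateReader zkStateReader) {
        DocCollection coll = zkStateReader.getClusterState().getCollection("collection1");
        Slice subShard = coll.getSlice("shard1_0");
        String parentNode = subShard.getStr("shard_parent_node");            // node name of the parent shard leader
        String parentZkSession = subShard.getStr("shard_parent_zk_session"); // its ZK session id at split time
        System.out.println(parentNode + " / " + parentZkSession);
      }
    }

Note that SliceMutator (below) strips these properties again once the sub-shard switches to 'active', so they are only visible while the split is in flight.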
@@ -38,6 +38,7 @@ import org.apache.solr.common.cloud.Slice;
 import org.apache.solr.common.cloud.ZkNodeProps;
 import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.common.util.Utils;
+import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -403,9 +404,35 @@ public class ReplicaMutator {
         }
         if (allActive) {
           // hurray, all sub shard replicas are active
-          log.info("Shard: {} - All replicas across all fellow sub-shards are now ACTIVE. Preparing to switch shard states.", sliceName);
+          log.info("Shard: {} - All replicas across all fellow sub-shards are now ACTIVE.", sliceName);
           String parentSliceName = (String) sliceProps.remove(Slice.PARENT);
+          // now lets see if the parent leader is still the same or else there's a chance of data loss
+          // see SOLR-9438 for details
+          String shardParentZkSession = (String) sliceProps.remove("shard_parent_zk_session");
+          String shardParentNode = (String) sliceProps.remove("shard_parent_node");
+          boolean isLeaderSame = true;
+          if (shardParentNode != null && shardParentZkSession != null) {
+            log.info("Checking whether sub-shard leader node is still the same one at {} with ZK session id {}", shardParentNode, shardParentZkSession);
+            try {
+              Stat leaderZnodeStat = zkStateReader.getZkClient().exists(ZkStateReader.LIVE_NODES_ZKNODE
+                  + "/" + shardParentNode, null, true);
+              if (leaderZnodeStat == null) {
+                log.error("The shard leader node: {} is not live anymore!", shardParentNode);
+                isLeaderSame = false;
+              } else if (leaderZnodeStat.getEphemeralOwner() != Long.parseLong(shardParentZkSession)) {
+                log.error("The zk session id for shard leader node: {} has changed from {} to {}",
+                    shardParentNode, shardParentZkSession, leaderZnodeStat.getEphemeralOwner());
+                isLeaderSame = false;
+              }
+            } catch (Exception e) {
+              log.warn("Error occurred while checking if parent shard node is still live with the same zk session id. " +
+                  "We cannot switch shard states at this time.", e);
+              return collection; // we aren't going to make any changes right now
+            }
+          }
+
+          if (isLeaderSame) {
+            log.info("Sub-shard leader node is still the same one at {} with ZK session id {}. Preparing to switch shard states.", shardParentNode, shardParentZkSession);
           Map<String, Object> propMap = new HashMap<>();
           propMap.put(Overseer.QUEUE_OPERATION, "updateshardstate");
           propMap.put(parentSliceName, Slice.State.INACTIVE.toString());

@@ -416,6 +443,18 @@
           propMap.put(ZkStateReader.COLLECTION_PROP, collection.getName());
           ZkNodeProps m = new ZkNodeProps(propMap);
           return new SliceMutator(zkStateReader).updateShardState(prevState, m).collection;
+          } else {
+            // we must mark the shard split as failed by switching sub-shards to recovery_failed state
+            Map<String, Object> propMap = new HashMap<>();
+            propMap.put(Overseer.QUEUE_OPERATION, "updateshardstate");
+            propMap.put(sliceName, Slice.State.RECOVERY_FAILED.toString());
+            for (Slice subShardSlice : subShardSlices) {
+              propMap.put(subShardSlice.getName(), Slice.State.RECOVERY_FAILED.toString());
+            }
+            propMap.put(ZkStateReader.COLLECTION_PROP, collection.getName());
+            ZkNodeProps m = new ZkNodeProps(propMap);
+            return new SliceMutator(zkStateReader).updateShardState(prevState, m).collection;
+          }
         }
       }
     }

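The state changes in both SplitShardCmd and ReplicaMutator travel through the Overseer as a plain 'updateshardstate' message: the operation name, one entry per slice mapping its name to the target state, and the collection name. A hedged sketch of what such a message looks like when serialized; the literal strings stand in for Overseer.QUEUE_OPERATION and ZkStateReader.COLLECTION_PROP, and the slice and collection names are placeholders:

    import java.nio.charset.StandardCharsets;
    import java.util.HashMap;
    import java.util.Map;

    import org.apache.solr.common.cloud.ZkNodeProps;
    import org.apache.solr.common.util.Utils;

    public class UpdateShardStateMessageDemo {
      public static void main(String[] args) {
        Map<String, Object> propMap = new HashMap<>();
        propMap.put("operation", "updateshardstate");   // Overseer.QUEUE_OPERATION
        propMap.put("shard1_0", "recovery_failed");     // slice name -> target state
        propMap.put("shard1_1", "recovery_failed");
        propMap.put("collection", "collection1");       // ZkStateReader.COLLECTION_PROP
        byte[] json = Utils.toJSON(new ZkNodeProps(propMap));
        System.out.println(new String(json, StandardCharsets.UTF_8));
      }
    }
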
@@ -164,9 +164,10 @@ public class SliceMutator {
       log.info("Update shard state " + key + " to " + message.getStr(key));
       Map<String, Object> props = slice.shallowCopy();
 
-      if (Slice.State.getState((String) props.get(ZkStateReader.STATE_PROP)) == Slice.State.RECOVERY
-          && Slice.State.getState(message.getStr(key)) == Slice.State.ACTIVE) {
+      if (Slice.State.getState(message.getStr(key)) == Slice.State.ACTIVE) {
         props.remove(Slice.PARENT);
+        props.remove("shard_parent_node");
+        props.remove("shard_parent_zk_session");
       }
       props.put(ZkStateReader.STATE_PROP, message.getStr(key));
       Slice newSlice = new Slice(slice.getName(), slice.getReplicasCopy(), props);

@@ -27,6 +27,7 @@ import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.lucene.util.LuceneTestCase.Slow;
 import org.apache.solr.client.solrj.SolrClient;

@@ -37,8 +38,10 @@ import org.apache.solr.client.solrj.embedded.JettySolrRunner;
 import org.apache.solr.client.solrj.impl.CloudSolrClient;
 import org.apache.solr.client.solrj.impl.HttpSolrClient;
 import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.client.solrj.request.CoreAdminRequest;
 import org.apache.solr.client.solrj.request.QueryRequest;
 import org.apache.solr.client.solrj.response.CollectionAdminResponse;
+import org.apache.solr.client.solrj.response.CoreAdminResponse;
 import org.apache.solr.client.solrj.response.QueryResponse;
 import org.apache.solr.client.solrj.response.RequestStatusState;
 import org.apache.solr.common.SolrDocument;

@@ -295,6 +298,218 @@ public class ShardSplitTest extends BasicDistributedZkTest {
       }
     }
   }
 
+  @Test
+  public void testSplitWithChaosMonkey() throws Exception {
+    waitForThingsToLevelOut(15);
+
+    List<StoppableIndexingThread> indexers = new ArrayList<>();
+    try {
+      for (int i = 0; i < 1; i++) {
+        StoppableIndexingThread thread = new StoppableIndexingThread(controlClient, cloudClient, String.valueOf(i), true);
+        indexers.add(thread);
+        thread.start();
+      }
+      Thread.sleep(1000); // give the indexers some time to do their work
+    } catch (Exception e) {
+      log.error("Error in test", e);
+    } finally {
+      for (StoppableIndexingThread indexer : indexers) {
+        indexer.safeStop();
+        indexer.join();
+      }
+    }
+
+    cloudClient.commit();
+    controlClient.commit();
+
+    AtomicBoolean stop = new AtomicBoolean();
+    AtomicBoolean killed = new AtomicBoolean(false);
+    Runnable monkey = new Runnable() {
+      @Override
+      public void run() {
+        ZkStateReader zkStateReader = cloudClient.getZkStateReader();
+        zkStateReader.registerCollectionStateWatcher(AbstractDistribZkTestBase.DEFAULT_COLLECTION, new CollectionStateWatcher() {
+          @Override
+          public boolean onStateChanged(Set<String> liveNodes, DocCollection collectionState) {
+            if (stop.get()) {
+              return true; // abort and remove the watch
+            }
+            Slice slice = collectionState.getSlice(SHARD1_0);
+            if (slice != null && slice.getReplicas().size() > 1) {
+              // ensure that only one watcher invocation thread can kill!
+              if (killed.compareAndSet(false, true)) {
+                log.info("Monkey thread found 2 replicas for {} {}", AbstractDistribZkTestBase.DEFAULT_COLLECTION, SHARD1);
+                CloudJettyRunner cjetty = shardToLeaderJetty.get(SHARD1);
+                try {
+                  Thread.sleep(1000 + random().nextInt(500));
+                  ChaosMonkey.kill(cjetty);
+                  stop.set(true);
+                  return true;
+                } catch (Exception e) {
+                  log.error("Monkey unable to kill jetty at port " + cjetty.jetty.getLocalPort(), e);
+                }
+              }
+            }
+            log.info("Monkey thread found only one replica for {} {}", AbstractDistribZkTestBase.DEFAULT_COLLECTION, SHARD1);
+            return false;
+          }
+        });
+      }
+    };
+
+    Thread monkeyThread = null;
+    /*
+     somehow the cluster state object inside this zk state reader has static copy of the collection which is never updated
+     so any call to waitForRecoveriesToFinish just keeps looping until timeout.
+     We workaround by explicitly registering the collection as an interesting one so that it is watched by ZkStateReader
+     see SOLR-9440. Todo remove this hack after SOLR-9440 is fixed.
+     */
+    cloudClient.getZkStateReader().registerCore(AbstractDistribZkTestBase.DEFAULT_COLLECTION);
+
+    monkeyThread = new Thread(monkey);
+    monkeyThread.start();
+    try {
+      CollectionAdminRequest.SplitShard splitShard = CollectionAdminRequest.splitShard(AbstractDistribZkTestBase.DEFAULT_COLLECTION);
+      splitShard.setShardName(SHARD1);
+      String asyncId = splitShard.processAsync(cloudClient);
+      RequestStatusState splitStatus = null;
+      try {
+        splitStatus = CollectionAdminRequest.requestStatus(asyncId).waitFor(cloudClient, 120);
+      } catch (Exception e) {
+        log.warn("Failed to get request status, maybe because the overseer node was shutdown by monkey", e);
+      }
+
+      // we don't care if the split failed because we are injecting faults and it is likely
+      // that the split has failed but in any case we want to assert that all docs that got
+      // indexed are available in SolrCloud and if the split succeeded then all replicas of the sub-shard
+      // must be consistent (i.e. have same numdocs)
+
+      log.info("Shard split request state is COMPLETED");
+      stop.set(true);
+      monkeyThread.join();
+      Set<String> addFails = new HashSet<>();
+      Set<String> deleteFails = new HashSet<>();
+      for (StoppableIndexingThread indexer : indexers) {
+        addFails.addAll(indexer.getAddFails());
+        deleteFails.addAll(indexer.getDeleteFails());
+      }
+
+      CloudJettyRunner cjetty = shardToLeaderJetty.get(SHARD1);
+      log.info("Starting shard1 leader jetty at port {}", cjetty.jetty.getLocalPort());
+      ChaosMonkey.start(cjetty.jetty);
+      cloudClient.getZkStateReader().forceUpdateCollection(AbstractDistribZkTestBase.DEFAULT_COLLECTION);
+      log.info("Current collection state: {}", printClusterStateInfo(AbstractDistribZkTestBase.DEFAULT_COLLECTION));
+
+      boolean replicaCreationsFailed = false;
+      if (splitStatus == RequestStatusState.FAILED) {
+        // either one or more replica creation failed (because it may have been created on the same parent shard leader node)
+        // or the split may have failed while trying to soft-commit *after* all replicas have been created
+        // the latter counts as a successful switch even if the API doesn't say so
+        // so we must find a way to distinguish between the two
+        // an easy way to do that is to look at the sub-shard replicas and check if the replica core actually exists
+        // instead of existing solely inside the cluster state
+        DocCollection collectionState = cloudClient.getZkStateReader().getClusterState().getCollection(AbstractDistribZkTestBase.DEFAULT_COLLECTION);
+        Slice slice10 = collectionState.getSlice(SHARD1_0);
+        Slice slice11 = collectionState.getSlice(SHARD1_1);
+        if (slice10 != null && slice11 != null) {
+          for (Replica replica : slice10) {
+            if (!doesReplicaCoreExist(replica)) {
+              replicaCreationsFailed = true;
+              break;
+            }
+          }
+          for (Replica replica : slice11) {
+            if (!doesReplicaCoreExist(replica)) {
+              replicaCreationsFailed = true;
+              break;
+            }
+          }
+        }
+      }
+
+      // true if sub-shard states switch to 'active' eventually
+      AtomicBoolean areSubShardsActive = new AtomicBoolean(false);
+
+      if (!replicaCreationsFailed) {
+        // all sub-shard replicas were created successfully so all cores must recover eventually
+        waitForRecoveriesToFinish(AbstractDistribZkTestBase.DEFAULT_COLLECTION, true);
+        // let's wait for the overseer to switch shard states
+        CountDownLatch latch = new CountDownLatch(1);
+        cloudClient.getZkStateReader().registerCollectionStateWatcher(AbstractDistribZkTestBase.DEFAULT_COLLECTION, new CollectionStateWatcher() {
+          @Override
+          public boolean onStateChanged(Set<String> liveNodes, DocCollection collectionState) {
+            Slice parent = collectionState.getSlice(SHARD1);
+            Slice slice10 = collectionState.getSlice(SHARD1_0);
+            Slice slice11 = collectionState.getSlice(SHARD1_1);
+            if (slice10 != null && slice11 != null &&
+                parent.getState() == Slice.State.INACTIVE &&
+                slice10.getState() == Slice.State.ACTIVE &&
+                slice11.getState() == Slice.State.ACTIVE) {
+              areSubShardsActive.set(true);
+              latch.countDown();
+              return true; // removes the watch
+            } else if (slice10 != null && slice11 != null &&
+                parent.getState() == Slice.State.ACTIVE &&
+                slice10.getState() == Slice.State.RECOVERY_FAILED &&
+                slice11.getState() == Slice.State.RECOVERY_FAILED) {
+              areSubShardsActive.set(false);
+              latch.countDown();
+              return true;
+            }
+            return false;
+          }
+        });
+
+        latch.await(2, TimeUnit.MINUTES);
+
+        if (latch.getCount() != 0) {
+          // sanity check
+          fail("We think that split was successful but sub-shard states were not updated even after 2 minutes.");
+        }
+      }
+
+      cloudClient.commit(); // for visibility of results on sub-shards
+
+      checkShardConsistency(true, true, addFails, deleteFails);
+      long ctrlDocs = controlClient.query(new SolrQuery("*:*")).getResults().getNumFound();
+      // ensure we have added more than 0 docs
+      long cloudClientDocs = cloudClient.query(new SolrQuery("*:*")).getResults().getNumFound();
+      assertTrue("Found " + ctrlDocs + " control docs", cloudClientDocs > 0);
+      assertEquals("Found " + ctrlDocs + " control docs and " + cloudClientDocs + " cloud docs", ctrlDocs, cloudClientDocs);
+
+      // check consistency of sub-shard replica explicitly because checkShardConsistency methods doesn't
+      // handle new shards/replica so well.
+      if (areSubShardsActive.get()) {
+        ClusterState clusterState = cloudClient.getZkStateReader().getClusterState();
+        DocCollection collection = clusterState.getCollection(AbstractDistribZkTestBase.DEFAULT_COLLECTION);
+        int numReplicasChecked = assertConsistentReplicas(collection.getSlice(SHARD1_0));
+        assertEquals("We should have checked consistency for exactly 2 replicas of shard1_0", 2, numReplicasChecked);
+        numReplicasChecked = assertConsistentReplicas(collection.getSlice(SHARD1_1));
+        assertEquals("We should have checked consistency for exactly 2 replicas of shard1_1", 2, numReplicasChecked);
+      }
+    } finally {
+      stop.set(true);
+      monkeyThread.join();
+    }
+  }
+
+  private boolean doesReplicaCoreExist(Replica replica) throws IOException {
+    try (HttpSolrClient client = new HttpSolrClient.Builder(replica.getStr(BASE_URL_PROP))
+        .withHttpClient(cloudClient.getLbClient().getHttpClient()).build()) {
+      String coreName = replica.getCoreName();
+      try {
+        CoreAdminResponse status = CoreAdminRequest.getStatus(coreName, client);
+        if (status.getCoreStatus(coreName) == null || status.getCoreStatus(coreName).size() == 0) {
+          return false;
+        }
+      } catch (Exception e) {
+        log.warn("Error gettting core status of replica " + replica + ". Perhaps it does not exist!", e);
+        return false;
+      }
+    }
+    return true;
+  }
+
   @Test
   public void testSplitShardWithRule() throws Exception {
     waitForThingsToLevelOut(15);

@@ -81,7 +81,16 @@ public class Slice extends ZkNodeProps implements Iterable<Replica> {
      * shard in that state still receives update requests from the parent shard
      * leader, however does not participate in distributed search.
      */
-    RECOVERY;
+    RECOVERY,
+
+    /**
+     * Sub-shards of a split shard are put in that state when the split is deemed failed
+     * by the overseer even though all replicas are active because either the leader node is
+     * no longer live or has a different ephemeral owner (zk session id). Such conditions can potentially
+     * lead to data loss. See SOLR-9438 for details. A shard in that state will neither receive
+     * update requests from the parent shard leader, nor participate in distributed search.
+     */
+    RECOVERY_FAILED;
 
     @Override
     public String toString() {
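
With RECOVERY_FAILED in the state enum, tooling and tests can detect a failed split directly from cluster state. A minimal sketch, assuming a DocCollection already fetched from ClusterState; the helper class is illustrative, not part of this commit:

    import org.apache.solr.common.cloud.DocCollection;
    import org.apache.solr.common.cloud.Slice;

    class FailedSubShardScanner {
      // Sketch: list sub-shards whose split was deemed failed.
      static void printFailedSubShards(DocCollection coll) {
        for (Slice slice : coll.getSlices()) {
          if (slice.getState() == Slice.State.RECOVERY_FAILED) {
            System.out.println("Split failed for sub-shard: " + slice.getName());
          }
        }
      }
    }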