SOLR-10704 Wait until all leader replicas are recovered before deleting the originals.
Andrzej Bialecki 2017-06-13 17:36:51 +02:00
parent f29e2d1c7c
commit 232eff0893
3 changed files with 102 additions and 6 deletions
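
For context, the ReplaceNodeCmd changed below is driven by the Collections API REPLACENODE action. The following is a minimal SolrJ sketch of such a call; it is not part of this diff, the node names are placeholders, and it assumes the "source", "target" and "timeout" request parameters (the names read from the Overseer message in the code below) are forwarded verbatim by the Collections API. It uses org.apache.solr.common.params.ModifiableSolrParams, org.apache.solr.client.solrj.SolrRequest and org.apache.solr.client.solrj.request.GenericSolrRequest against an existing CloudSolrClient.

    // Hypothetical invocation sketch, not part of this commit.
    ModifiableSolrParams params = new ModifiableSolrParams();
    params.set("action", "REPLACENODE");
    params.set("source", "host1:8983_solr");  // node being decommissioned (placeholder)
    params.set("target", "host2:8983_solr");  // node that receives the copied replicas (placeholder)
    params.set("timeout", "600");             // overall wait in seconds (assumed to reach the Overseer message)
    GenericSolrRequest req = new GenericSolrRequest(SolrRequest.METHOD.GET, "/admin/collections", params);
    cloudClient.request(req);                 // cloudClient: an existing CloudSolrClient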


@@ -364,6 +364,9 @@ Bug Fixes
* SOLR-10835: Add support for point fields in Export Handler (Tomás Fernández Löbbe)

* SOLR-10704: REPLACENODE may cause data loss when replicationFactor is 1. (ab, shalin)

Optimizations
----------------------

* SOLR-10634: JSON Facet API: When a field/terms facet will retrieve all buckets (i.e. limit:-1)


@@ -20,15 +20,18 @@ package org.apache.solr.cloud;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.CollectionStateWatcher;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
@@ -60,6 +63,7 @@ public class ReplaceNodeCmd implements OverseerCollectionMessageHandler.Cmd {
    String source = message.getStr("source");
    String target = message.getStr("target");
    String async = message.getStr("async");
    int timeout = message.getInt("timeout", 10 * 60); // 10 minutes
    boolean parallel = message.getBool("parallel", false);
    ClusterState clusterState = zkStateReader.getClusterState();
@@ -70,13 +74,34 @@ public class ReplaceNodeCmd implements OverseerCollectionMessageHandler.Cmd {
      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Target Node: " + target + " is not live");
    }
    List<ZkNodeProps> sourceReplicas = getReplicasOfNode(source, clusterState);
    // how many leaders are we moving? for these replicas we have to make sure that either:
    // * another existing replica can become a leader, or
    // * we wait until the newly created replica completes recovery (and can become the new leader)
    int numLeaders = 0;
    for (ZkNodeProps props : sourceReplicas) {
      if (props.getBool(ZkStateReader.LEADER_PROP, false)) {
        numLeaders++;
      }
    }
    // map of collectionName_coreNodeName to watchers
    Map<String, RecoveryWatcher> watchers = new HashMap<>();
    List<ZkNodeProps> createdReplicas = new ArrayList<>();
    AtomicBoolean anyOneFailed = new AtomicBoolean(false);
    CountDownLatch countDownLatch = new CountDownLatch(sourceReplicas.size());
    CountDownLatch replicasToRecover = new CountDownLatch(numLeaders);
    for (ZkNodeProps sourceReplica : sourceReplicas) {
      if (sourceReplica.getBool(ZkStateReader.LEADER_PROP, false)) {
        String shardName = sourceReplica.getStr(SHARD_ID_PROP);
        String replicaName = sourceReplica.getStr(ZkStateReader.REPLICA_PROP);
        String collectionName = sourceReplica.getStr(COLLECTION_PROP);
        String key = collectionName + "_" + replicaName;
        RecoveryWatcher watcher = new RecoveryWatcher(collectionName, shardName, replicaName, replicasToRecover);
        watchers.put(key, watcher);
        zkStateReader.registerCollectionStateWatcher(collectionName, watcher);
      }
      NamedList nl = new NamedList();
      log.info("Going to create replica for collection={} shard={} on node={}", sourceReplica.getStr(COLLECTION_PROP), sourceReplica.getStr(SHARD_ID_PROP), target);
      ZkNodeProps msg = sourceReplica.plus("parallel", String.valueOf(parallel)).plus(CoreAdminParams.NODE, target);
@@ -106,10 +131,26 @@ public class ReplaceNodeCmd implements OverseerCollectionMessageHandler.Cmd {
      }
    }
    log.debug("Waiting for replace node action to complete");
    countDownLatch.await(5, TimeUnit.MINUTES);
    log.debug("Finished waiting for replace node action to complete");
    log.debug("Waiting for replicas to be added");
    if (!countDownLatch.await(timeout, TimeUnit.SECONDS)) {
      log.info("Timed out waiting for replicas to be added");
      anyOneFailed.set(true);
    } else {
      log.debug("Finished waiting for replicas to be added");
    }
    // now wait for leader replicas to recover
    log.debug("Waiting for " + numLeaders + " leader replicas to recover");
    if (!replicasToRecover.await(timeout, TimeUnit.SECONDS)) {
      log.info("Timed out waiting for " + replicasToRecover.getCount() + " leader replicas to recover");
      anyOneFailed.set(true);
    } else {
      log.debug("Finished waiting for leader replicas to recover");
    }
    // remove the watchers, we're done either way
    for (RecoveryWatcher watcher : watchers.values()) {
      zkStateReader.removeCollectionStateWatcher(watcher.collectionId, watcher);
    }
    if (anyOneFailed.get()) {
      log.info("Failed to create some replicas. Cleaning up all replicas on target node");
      CountDownLatch cleanupLatch = new CountDownLatch(createdReplicas.size());
@@ -134,6 +175,7 @@ public class ReplaceNodeCmd implements OverseerCollectionMessageHandler.Cmd {
        }
      }
      cleanupLatch.await(5, TimeUnit.MINUTES);
      return;
    }
@@ -155,6 +197,7 @@ public class ReplaceNodeCmd implements OverseerCollectionMessageHandler.Cmd {
                ZkStateReader.CORE_NAME_PROP, replica.getCoreName(),
                ZkStateReader.REPLICA_PROP, replica.getName(),
                ZkStateReader.REPLICA_TYPE, replica.getType().name(),
                ZkStateReader.LEADER_PROP, String.valueOf(replica.equals(slice.getLeader())),
                CoreAdminParams.NODE, source);
            sourceReplicas.add(props);
          }
@@ -163,4 +206,49 @@ public class ReplaceNodeCmd implements OverseerCollectionMessageHandler.Cmd {
    }
    return sourceReplicas;
  }

  // we use this watcher to wait for replicas to recover
  private static class RecoveryWatcher implements CollectionStateWatcher {
    String collectionId;
    String shardId;
    String replicaId;
    CountDownLatch countDownLatch;

    RecoveryWatcher(String collectionId, String shardId, String replicaId, CountDownLatch countDownLatch) {
      this.collectionId = collectionId;
      this.shardId = shardId;
      this.replicaId = replicaId;
      this.countDownLatch = countDownLatch;
    }

    @Override
    public boolean onStateChanged(Set<String> liveNodes, DocCollection collectionState) {
      if (collectionState == null) { // collection has been deleted - don't wait
        countDownLatch.countDown();
        return true;
      }
      Slice slice = collectionState.getSlice(shardId);
      if (slice == null) { // shard has been removed - don't wait
        countDownLatch.countDown();
        return true;
      }
      for (Replica replica : slice.getReplicas()) {
        // check if another replica exists - doesn't have to be the one we're moving
        // as long as it's active and can become a leader, in which case we don't have to wait
        // for recovery of specifically the one that we've just added
        if (!replica.getName().equals(replicaId)) {
          if (replica.getType().equals(Replica.Type.PULL)) { // not eligible for leader election
            continue;
          }
          // check its state
          if (replica.isActive(liveNodes)) { // recovered - stop waiting
            countDownLatch.countDown();
            return true;
          }
        }
      }
      // set the watch again to wait for the new replica to recover
      return false;
    }
  }
}
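
As an aside, the condition RecoveryWatcher checks for can also be expressed against the blocking waitForState variant of the same ZkStateReader watcher API; ReplaceNodeCmd uses explicitly registered watchers plus a shared latch instead because it may have to wait on shards from several collections at once. The following is a rough sketch under that assumption and is not part of this commit: the helper name and its arguments are hypothetical, and it needs java.util.concurrent.TimeoutException in addition to the imports shown above.

  // Hypothetical helper, not in this commit: block until the shard has an active,
  // leader-eligible replica other than the one being moved (mirrors RecoveryWatcher's check).
  static void waitForLeaderCandidate(ZkStateReader zkStateReader, String collection, String shardId,
                                     String movedReplicaName, int timeoutSeconds)
      throws InterruptedException, TimeoutException {
    zkStateReader.waitForState(collection, timeoutSeconds, TimeUnit.SECONDS, (liveNodes, collectionState) -> {
      if (collectionState == null) {
        return true;                                  // collection deleted - nothing to wait for
      }
      Slice slice = collectionState.getSlice(shardId);
      if (slice == null) {
        return true;                                  // shard removed - nothing to wait for
      }
      for (Replica replica : slice.getReplicas()) {
        if (!replica.getName().equals(movedReplicaName)
            && replica.getType() != Replica.Type.PULL // PULL replicas cannot become leader
            && replica.isActive(liveNodes)) {
          return true;                                // an eligible replica is active - stop waiting
        }
      }
      return false;                                   // keep waiting for the next state change
    });
  }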


@@ -67,11 +67,16 @@ public class ReplaceNodeTest extends SolrCloudTestCase {
    CollectionAdminRequest.Create create;
    // NOTE: always using the createCollection that takes in 'int' for all types of replicas, so we never
    // have to worry about null checking when comparing the Create command with the final Slices
    create = pickRandom(CollectionAdminRequest.createCollection(coll, "conf1", 5, 2,0,0),
    create = pickRandom(
        CollectionAdminRequest.createCollection(coll, "conf1", 5, 2,0,0),
        CollectionAdminRequest.createCollection(coll, "conf1", 5, 1,1,0),
        CollectionAdminRequest.createCollection(coll, "conf1", 5, 0,1,1),
        CollectionAdminRequest.createCollection(coll, "conf1", 5, 1,0,1),
        CollectionAdminRequest.createCollection(coll, "conf1", 5, 0,2,0));
        CollectionAdminRequest.createCollection(coll, "conf1", 5, 0,2,0),
        // check also replicationFactor 1
        CollectionAdminRequest.createCollection(coll, "conf1", 5, 1,0,0),
        CollectionAdminRequest.createCollection(coll, "conf1", 5, 0,1,0)
    );
    create.setCreateNodeSet(StrUtils.join(l, ',')).setMaxShardsPerNode(3);
    cloudClient.request(create);
    log.info("excluded_node : {} ", emptyNode);