diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt index 27f04402dbe..f85727bbe9e 100644 --- a/solr/CHANGES.txt +++ b/solr/CHANGES.txt @@ -45,6 +45,10 @@ Apache UIMA 2.3.1 Apache ZooKeeper 3.4.10 Jetty 9.3.20.v20170531 +New Features +---------------------- +* SOLR-11448: Implement an option in collection commands to wait for final results. (ab) + Bug Fixes ---------------------- diff --git a/solr/core/src/java/org/apache/solr/cloud/ActiveReplicaWatcher.java b/solr/core/src/java/org/apache/solr/cloud/ActiveReplicaWatcher.java new file mode 100644 index 00000000000..3819aec4185 --- /dev/null +++ b/solr/core/src/java/org/apache/solr/cloud/ActiveReplicaWatcher.java @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.solr.cloud; + +import java.lang.invoke.MethodHandles; +import java.util.ArrayList; +import java.util.List; +import java.util.Set; +import java.util.concurrent.CountDownLatch; + +import org.apache.solr.common.cloud.CollectionStateWatcher; +import org.apache.solr.common.cloud.DocCollection; +import org.apache.solr.common.cloud.Replica; +import org.apache.solr.common.cloud.Slice; +import org.apache.solr.common.cloud.ZkStateReader; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Watch for replicas to become {@link org.apache.solr.common.cloud.Replica.State#ACTIVE}. Watcher is + * terminated (its {@link #onStateChanged(Set, DocCollection)} method returns false) when all listed + * replicas become active. + *

Additionally, the provided {@link CountDownLatch} instance can be used to await + * for all listed replicas to become active.

+ */ +public class ActiveReplicaWatcher implements CollectionStateWatcher { + private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + private final String collection; + private final List replicaIds = new ArrayList<>(); + private final List solrCoreNames = new ArrayList<>(); + private final List activeReplicas = new ArrayList<>(); + + private CountDownLatch countDownLatch; + + /** + * Construct the watcher. At least one replicaId or solrCoreName must be provided. + * @param collection collection name + * @param replicaIds list of replica id-s + * @param solrCoreNames list of SolrCore names + * @param countDownLatch optional latch to await for all provided replicas to become active. This latch will be + * counted down by at most the number of provided replica id-s / SolrCore names. + */ + public ActiveReplicaWatcher(String collection, List replicaIds, List solrCoreNames, CountDownLatch countDownLatch) { + if (replicaIds == null && solrCoreNames == null) { + throw new IllegalArgumentException("Either replicaId or solrCoreName must be provided."); + } + if (replicaIds != null) { + this.replicaIds.addAll(replicaIds); + } + if (solrCoreNames != null) { + this.solrCoreNames.addAll(solrCoreNames); + } + if (this.replicaIds.isEmpty() && this.solrCoreNames.isEmpty()) { + throw new IllegalArgumentException("At least one replicaId or solrCoreName must be provided"); + } + this.collection = collection; + this.countDownLatch = countDownLatch; + } + + /** + * Collection name. + */ + public String getCollection() { + return collection; + } + + /** + * Return the list of active replicas found so far. + */ + public List getActiveReplicas() { + return activeReplicas; + } + + /** + * Return the list of replica id-s that are not active yet (or unverified). + */ + public List getReplicaIds() { + return replicaIds; + } + + /** + * Return a list of SolrCore names that are not active yet (or unverified). + */ + public List getSolrCoreNames() { + return solrCoreNames; + } + + @Override + public String toString() { + return "ActiveReplicaWatcher{" + + "collection='" + collection + '\'' + + ", replicaIds=" + replicaIds + + ", solrCoreNames=" + solrCoreNames + + ", activeReplicas=" + activeReplicas + + '}'; + } + + @Override + public boolean onStateChanged(Set liveNodes, DocCollection collectionState) { + log.debug("-- onStateChanged: " + collectionState); + if (collectionState == null) { // collection has been deleted - don't wait + if (countDownLatch != null) { + for (int i = 0; i < replicaIds.size() + solrCoreNames.size(); i++) { + countDownLatch.countDown(); + } + } + replicaIds.clear(); + solrCoreNames.clear(); + return true; + } + for (Slice slice : collectionState.getSlices()) { + for (Replica replica : slice.getReplicas()) { + if (replicaIds.contains(replica.getName())) { + if (replica.isActive(liveNodes)) { + activeReplicas.add(replica); + replicaIds.remove(replica.getName()); + if (countDownLatch != null) { + countDownLatch.countDown(); + } + } + } else if (solrCoreNames.contains(replica.getStr(ZkStateReader.CORE_NAME_PROP))) { + if (replica.isActive(liveNodes)) { + activeReplicas.add(replica); + solrCoreNames.remove(replica.getStr(ZkStateReader.CORE_NAME_PROP)); + if (countDownLatch != null) { + countDownLatch.countDown(); + } + } + } + } + } + if (replicaIds.isEmpty() && solrCoreNames.isEmpty()) { + return true; + } else { + return false; + } + } +} diff --git a/solr/core/src/java/org/apache/solr/cloud/AddReplicaCmd.java b/solr/core/src/java/org/apache/solr/cloud/AddReplicaCmd.java index d4331f94a4a..f533c5606aa 100644 --- a/solr/core/src/java/org/apache/solr/cloud/AddReplicaCmd.java +++ b/solr/core/src/java/org/apache/solr/cloud/AddReplicaCmd.java @@ -25,6 +25,8 @@ import java.util.Collections; import java.util.HashMap; import java.util.Locale; import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.lang.StringUtils; @@ -54,6 +56,7 @@ import static org.apache.solr.common.cloud.ZkStateReader.CORE_NAME_PROP; import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP; import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDREPLICA; import static org.apache.solr.common.params.CommonAdminParams.ASYNC; +import static org.apache.solr.common.params.CommonAdminParams.WAIT_FOR_FINAL_STATE; public class AddReplicaCmd implements OverseerCollectionMessageHandler.Cmd { private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); @@ -77,8 +80,10 @@ public class AddReplicaCmd implements OverseerCollectionMessageHandler.Cmd { String shard = message.getStr(SHARD_ID_PROP); String coreName = message.getStr(CoreAdminParams.NAME); String coreNodeName = message.getStr(CoreAdminParams.CORE_NODE_NAME); + int timeout = message.getInt("timeout", 10 * 60); // 10 minutes Replica.Type replicaType = Replica.Type.valueOf(message.getStr(ZkStateReader.REPLICA_TYPE, Replica.Type.NRT.name()).toUpperCase(Locale.ROOT)); boolean parallel = message.getBool("parallel", false); + boolean waitForFinalState = message.getBool(WAIT_FOR_FINAL_STATE, false); if (StringUtils.isBlank(coreName)) { coreName = message.getStr(CoreAdminParams.PROPERTY_PREFIX + CoreAdminParams.NAME); } @@ -222,8 +227,22 @@ public class AddReplicaCmd implements OverseerCollectionMessageHandler.Cmd { if (onComplete != null) onComplete.run(); }; - if (!parallel) { - runnable.run(); + if (!parallel || waitForFinalState) { + if (waitForFinalState) { + CountDownLatch countDownLatch = new CountDownLatch(1); + ActiveReplicaWatcher watcher = new ActiveReplicaWatcher(collection, null, Collections.singletonList(coreName), countDownLatch); + try { + zkStateReader.registerCollectionStateWatcher(collection, watcher); + runnable.run(); + if (!countDownLatch.await(timeout, TimeUnit.SECONDS)) { + throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Timeout waiting " + timeout + " seconds for replica to become active."); + } + } finally { + zkStateReader.removeCollectionStateWatcher(collection, watcher); + } + } else { + runnable.run(); + } } else { ocmh.tpe.submit(runnable); } @@ -236,4 +255,5 @@ public class AddReplicaCmd implements OverseerCollectionMessageHandler.Cmd { ZkStateReader.NODE_NAME_PROP, node ); } + } diff --git a/solr/core/src/java/org/apache/solr/cloud/CreateCollectionCmd.java b/solr/core/src/java/org/apache/solr/cloud/CreateCollectionCmd.java index 93bc47a1454..3925a7e9988 100644 --- a/solr/core/src/java/org/apache/solr/cloud/CreateCollectionCmd.java +++ b/solr/core/src/java/org/apache/solr/cloud/CreateCollectionCmd.java @@ -46,6 +46,7 @@ import org.apache.solr.common.cloud.ZkNodeProps; import org.apache.solr.common.cloud.ZkStateReader; import org.apache.solr.common.cloud.ZooKeeperException; import org.apache.solr.common.params.CollectionAdminParams; +import org.apache.solr.common.params.CommonAdminParams; import org.apache.solr.common.params.CoreAdminParams; import org.apache.solr.common.params.ModifiableSolrParams; import org.apache.solr.common.util.NamedList; @@ -73,6 +74,7 @@ import static org.apache.solr.common.cloud.ZkStateReader.SOLR_AUTOSCALING_CONF_P import static org.apache.solr.common.cloud.ZkStateReader.TLOG_REPLICAS; import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDREPLICA; import static org.apache.solr.common.params.CommonAdminParams.ASYNC; +import static org.apache.solr.common.params.CommonAdminParams.WAIT_FOR_FINAL_STATE; import static org.apache.solr.common.params.CommonParams.NAME; import static org.apache.solr.common.util.StrUtils.formatString; @@ -89,6 +91,7 @@ public class CreateCollectionCmd implements Cmd { @Override public void call(ClusterState clusterState, ZkNodeProps message, NamedList results) throws Exception { final String collectionName = message.getStr(NAME); + final boolean waitForFinalState = message.getBool(WAIT_FOR_FINAL_STATE, false); log.info("Create collection {}", collectionName); if (clusterState.hasCollection(collectionName)) { throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "collection already exists: " + collectionName); @@ -242,7 +245,8 @@ public class CreateCollectionCmd implements Cmd { ZkStateReader.CORE_NAME_PROP, coreName, ZkStateReader.STATE_PROP, Replica.State.DOWN.toString(), ZkStateReader.BASE_URL_PROP, baseUrl, - ZkStateReader.REPLICA_TYPE, replicaPosition.type.name()); + ZkStateReader.REPLICA_TYPE, replicaPosition.type.name(), + CommonAdminParams.WAIT_FOR_FINAL_STATE, Boolean.toString(waitForFinalState)); Overseer.getStateUpdateQueue(zkStateReader.getZkClient()).offer(Utils.toJSON(props)); } @@ -317,6 +321,7 @@ public class CreateCollectionCmd implements Cmd { PolicyHelper.clearFlagAndDecref(ocmh.policySessionRef); } } + String getConfigName(String coll, ZkNodeProps message) throws KeeperException, InterruptedException { String configName = message.getStr(COLL_CONF); diff --git a/solr/core/src/java/org/apache/solr/cloud/CreateShardCmd.java b/solr/core/src/java/org/apache/solr/cloud/CreateShardCmd.java index 90cecd774e2..70f964f7de4 100644 --- a/solr/core/src/java/org/apache/solr/cloud/CreateShardCmd.java +++ b/solr/core/src/java/org/apache/solr/cloud/CreateShardCmd.java @@ -39,6 +39,7 @@ import org.apache.solr.common.cloud.Replica; import org.apache.solr.common.cloud.ReplicaPosition; import org.apache.solr.common.cloud.ZkNodeProps; import org.apache.solr.common.cloud.ZkStateReader; +import org.apache.solr.common.params.CommonAdminParams; import org.apache.solr.common.params.CoreAdminParams; import org.apache.solr.common.util.NamedList; import org.apache.solr.common.util.SimpleOrderedMap; @@ -68,6 +69,7 @@ public class CreateShardCmd implements Cmd { public void call(ClusterState clusterState, ZkNodeProps message, NamedList results) throws Exception { String collectionName = message.getStr(COLLECTION_PROP); String sliceName = message.getStr(SHARD_ID_PROP); + boolean waitForFinalState = message.getBool(CommonAdminParams.WAIT_FOR_FINAL_STATE, false); log.info("Create shard invoked: {}", message); if (collectionName == null || sliceName == null) @@ -134,7 +136,8 @@ public class CreateShardCmd implements Cmd { SHARD_ID_PROP, sliceName, ZkStateReader.REPLICA_TYPE, position.type.name(), CoreAdminParams.NODE, nodeName, - CoreAdminParams.NAME, coreName); + CoreAdminParams.NAME, coreName, + CommonAdminParams.WAIT_FOR_FINAL_STATE, Boolean.toString(waitForFinalState)); Map propertyParams = new HashMap<>(); ocmh.addPropertyParams(message, propertyParams); addReplicasProps = addReplicasProps.plus(propertyParams); diff --git a/solr/core/src/java/org/apache/solr/cloud/LeaderRecoveryWatcher.java b/solr/core/src/java/org/apache/solr/cloud/LeaderRecoveryWatcher.java new file mode 100644 index 00000000000..bf9a3fea363 --- /dev/null +++ b/solr/core/src/java/org/apache/solr/cloud/LeaderRecoveryWatcher.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.solr.cloud; + +import java.util.Set; +import java.util.concurrent.CountDownLatch; + +import org.apache.solr.common.cloud.CollectionStateWatcher; +import org.apache.solr.common.cloud.DocCollection; +import org.apache.solr.common.cloud.Replica; +import org.apache.solr.common.cloud.Slice; +import org.apache.solr.common.cloud.ZkStateReader; + +/** + * We use this watcher to wait for any eligible replica in a shard to become active so that it can become a leader. + */ +public class LeaderRecoveryWatcher implements CollectionStateWatcher { + String collectionId; + String shardId; + String replicaId; + String targetCore; + CountDownLatch countDownLatch; + + /** + * Watch for recovery of a replica + * + * @param collectionId collection name + * @param shardId shard id + * @param replicaId source replica name (coreNodeName) + * @param targetCore specific target core name - if null then any active replica will do + * @param countDownLatch countdown when recovered + */ + LeaderRecoveryWatcher(String collectionId, String shardId, String replicaId, String targetCore, CountDownLatch countDownLatch) { + this.collectionId = collectionId; + this.shardId = shardId; + this.replicaId = replicaId; + this.targetCore = targetCore; + this.countDownLatch = countDownLatch; + } + + @Override + public boolean onStateChanged(Set liveNodes, DocCollection collectionState) { + if (collectionState == null) { // collection has been deleted - don't wait + countDownLatch.countDown(); + return true; + } + Slice slice = collectionState.getSlice(shardId); + if (slice == null) { // shard has been removed - don't wait + countDownLatch.countDown(); + return true; + } + for (Replica replica : slice.getReplicas()) { + // check if another replica exists - doesn't have to be the one we're moving + // as long as it's active and can become a leader, in which case we don't have to wait + // for recovery of specifically the one that we've just added + if (!replica.getName().equals(replicaId)) { + if (replica.getType().equals(Replica.Type.PULL)) { // not eligible for leader election + continue; + } + // check its state + String coreName = replica.getStr(ZkStateReader.CORE_NAME_PROP); + if (targetCore != null && !targetCore.equals(coreName)) { + continue; + } + if (replica.isActive(liveNodes)) { // recovered - stop waiting + countDownLatch.countDown(); + return true; + } + } + } + // set the watch again to wait for the new replica to recover + return false; + } +} diff --git a/solr/core/src/java/org/apache/solr/cloud/MoveReplicaCmd.java b/solr/core/src/java/org/apache/solr/cloud/MoveReplicaCmd.java index f94353e1022..18bf9689fe0 100644 --- a/solr/core/src/java/org/apache/solr/cloud/MoveReplicaCmd.java +++ b/solr/core/src/java/org/apache/solr/cloud/MoveReplicaCmd.java @@ -45,6 +45,7 @@ import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP; import static org.apache.solr.common.cloud.ZkStateReader.REPLICA_PROP; import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP; import static org.apache.solr.common.params.CommonAdminParams.ASYNC; +import static org.apache.solr.common.params.CommonAdminParams.WAIT_FOR_FINAL_STATE; public class MoveReplicaCmd implements Cmd{ private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); @@ -65,6 +66,7 @@ public class MoveReplicaCmd implements Cmd{ ocmh.checkRequired(message, COLLECTION_PROP, "targetNode"); String collection = message.getStr(COLLECTION_PROP); String targetNode = message.getStr("targetNode"); + boolean waitForFinalState = message.getBool(WAIT_FOR_FINAL_STATE, false); int timeout = message.getInt("timeout", 10 * 60); // 10 minutes String async = message.getStr(ASYNC); @@ -111,14 +113,14 @@ public class MoveReplicaCmd implements Cmd{ assert slice != null; Object dataDir = replica.get("dataDir"); if (dataDir != null && dataDir.toString().startsWith("hdfs:/")) { - moveHdfsReplica(clusterState, results, dataDir.toString(), targetNode, async, coll, replica, slice, timeout); + moveHdfsReplica(clusterState, results, dataDir.toString(), targetNode, async, coll, replica, slice, timeout, waitForFinalState); } else { - moveNormalReplica(clusterState, results, targetNode, async, coll, replica, slice, timeout); + moveNormalReplica(clusterState, results, targetNode, async, coll, replica, slice, timeout, waitForFinalState); } } private void moveHdfsReplica(ClusterState clusterState, NamedList results, String dataDir, String targetNode, String async, - DocCollection coll, Replica replica, Slice slice, int timeout) throws Exception { + DocCollection coll, Replica replica, Slice slice, int timeout, boolean waitForFinalState) throws Exception { String skipCreateReplicaInClusterState = "true"; if (clusterState.getLiveNodes().contains(replica.getNodeName())) { skipCreateReplicaInClusterState = "false"; @@ -183,7 +185,7 @@ public class MoveReplicaCmd implements Cmd{ } private void moveNormalReplica(ClusterState clusterState, NamedList results, String targetNode, String async, - DocCollection coll, Replica replica, Slice slice, int timeout) throws Exception { + DocCollection coll, Replica replica, Slice slice, int timeout, boolean waitForFinalState) throws Exception { String newCoreName = Assign.buildCoreName(ocmh.overseer.getSolrCloudManager().getDistribStateManager(), coll, slice.getName(), replica.getType()); ZkNodeProps addReplicasProps = new ZkNodeProps( COLLECTION_PROP, coll.getName(), @@ -193,10 +195,9 @@ public class MoveReplicaCmd implements Cmd{ if(async!=null) addReplicasProps.getProperties().put(ASYNC, async); NamedList addResult = new NamedList(); CountDownLatch countDownLatch = new CountDownLatch(1); - ReplaceNodeCmd.RecoveryWatcher watcher = null; - if (replica.equals(slice.getLeader())) { - watcher = new ReplaceNodeCmd.RecoveryWatcher(coll.getName(), slice.getName(), - replica.getName(), null, countDownLatch); + ActiveReplicaWatcher watcher = null; + if (replica.equals(slice.getLeader()) || waitForFinalState) { + watcher = new ActiveReplicaWatcher(coll.getName(), Collections.singletonList(replica.getName()), null, countDownLatch); ocmh.zkStateReader.registerCollectionStateWatcher(coll.getName(), watcher); } ocmh.addReplica(clusterState, addReplicasProps, addResult, null); @@ -221,7 +222,7 @@ public class MoveReplicaCmd implements Cmd{ results.add("failure", errorString); return; } else { - log.debug("Replica " + watcher.getRecoveredReplica() + " is active - deleting the source..."); + log.debug("Replica " + watcher.getActiveReplicas() + " is active - deleting the source..."); } } finally { ocmh.zkStateReader.removeCollectionStateWatcher(coll.getName(), watcher); diff --git a/solr/core/src/java/org/apache/solr/cloud/ReplaceNodeCmd.java b/solr/core/src/java/org/apache/solr/cloud/ReplaceNodeCmd.java index ef3fd89ec4e..98d875f6c6b 100644 --- a/solr/core/src/java/org/apache/solr/cloud/ReplaceNodeCmd.java +++ b/solr/core/src/java/org/apache/solr/cloud/ReplaceNodeCmd.java @@ -20,11 +20,11 @@ package org.apache.solr.cloud; import java.lang.invoke.MethodHandles; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Locale; import java.util.Map; -import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -38,6 +38,7 @@ import org.apache.solr.common.cloud.Slice; import org.apache.solr.common.cloud.ZkNodeProps; import org.apache.solr.common.cloud.ZkStateReader; import org.apache.solr.common.params.CollectionParams; +import org.apache.solr.common.params.CommonAdminParams; import org.apache.solr.common.params.CoreAdminParams; import org.apache.solr.common.util.NamedList; import org.apache.zookeeper.KeeperException; @@ -62,6 +63,7 @@ public class ReplaceNodeCmd implements OverseerCollectionMessageHandler.Cmd { ZkStateReader zkStateReader = ocmh.zkStateReader; String source = message.getStr(CollectionParams.SOURCE_NODE, message.getStr("source")); String target = message.getStr(CollectionParams.TARGET_NODE, message.getStr("target")); + boolean waitForFinalState = message.getBool(CommonAdminParams.WAIT_FOR_FINAL_STATE, false); if (source == null || target == null) { throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "sourceNode and targetNode are required params" ); } @@ -80,14 +82,15 @@ public class ReplaceNodeCmd implements OverseerCollectionMessageHandler.Cmd { // how many leaders are we moving? for these replicas we have to make sure that either: // * another existing replica can become a leader, or // * we wait until the newly created replica completes recovery (and can become the new leader) + // If waitForFinalState=true we wait for all replicas int numLeaders = 0; for (ZkNodeProps props : sourceReplicas) { - if (props.getBool(ZkStateReader.LEADER_PROP, false)) { + if (props.getBool(ZkStateReader.LEADER_PROP, false) || waitForFinalState) { numLeaders++; } } // map of collectionName_coreNodeName to watchers - Map watchers = new HashMap<>(); + Map watchers = new HashMap<>(); List createdReplicas = new ArrayList<>(); AtomicBoolean anyOneFailed = new AtomicBoolean(false); @@ -122,15 +125,24 @@ public class ReplaceNodeCmd implements OverseerCollectionMessageHandler.Cmd { if (addedReplica != null) { createdReplicas.add(addedReplica); - if (sourceReplica.getBool(ZkStateReader.LEADER_PROP, false)) { + if (sourceReplica.getBool(ZkStateReader.LEADER_PROP, false) || waitForFinalState) { String shardName = sourceReplica.getStr(SHARD_ID_PROP); String replicaName = sourceReplica.getStr(ZkStateReader.REPLICA_PROP); String collectionName = sourceReplica.getStr(COLLECTION_PROP); String key = collectionName + "_" + replicaName; - RecoveryWatcher watcher = new RecoveryWatcher(collectionName, shardName, replicaName, - addedReplica.getStr(ZkStateReader.CORE_NAME_PROP), replicasToRecover); + CollectionStateWatcher watcher; + if (waitForFinalState) { + watcher = new ActiveReplicaWatcher(collectionName, null, + Collections.singletonList(addedReplica.getStr(ZkStateReader.CORE_NAME_PROP)), replicasToRecover); + } else { + watcher = new LeaderRecoveryWatcher(collectionName, shardName, replicaName, + addedReplica.getStr(ZkStateReader.CORE_NAME_PROP), replicasToRecover); + } watchers.put(key, watcher); + log.debug("--- adding " + key + ", " + watcher); zkStateReader.registerCollectionStateWatcher(collectionName, watcher); + } else { + log.debug("--- not waiting for " + addedReplica); } } } @@ -152,8 +164,8 @@ public class ReplaceNodeCmd implements OverseerCollectionMessageHandler.Cmd { log.debug("Finished waiting for leader replicas to recover"); } // remove the watchers, we're done either way - for (RecoveryWatcher watcher : watchers.values()) { - zkStateReader.removeCollectionStateWatcher(watcher.collectionId, watcher); + for (Map.Entry e : watchers.entrySet()) { + zkStateReader.removeCollectionStateWatcher(e.getKey(), e.getValue()); } if (anyOneFailed.get()) { log.info("Failed to create some replicas. Cleaning up all replicas on target node"); @@ -211,68 +223,4 @@ public class ReplaceNodeCmd implements OverseerCollectionMessageHandler.Cmd { return sourceReplicas; } - // we use this watcher to wait for replicas to recover - static class RecoveryWatcher implements CollectionStateWatcher { - String collectionId; - String shardId; - String replicaId; - String targetCore; - CountDownLatch countDownLatch; - Replica recovered; - - /** - * Watch for recovery of a replica - * @param collectionId collection name - * @param shardId shard id - * @param replicaId source replica name (coreNodeName) - * @param targetCore specific target core name - if null then any active replica will do - * @param countDownLatch countdown when recovered - */ - RecoveryWatcher(String collectionId, String shardId, String replicaId, String targetCore, CountDownLatch countDownLatch) { - this.collectionId = collectionId; - this.shardId = shardId; - this.replicaId = replicaId; - this.targetCore = targetCore; - this.countDownLatch = countDownLatch; - } - - @Override - public boolean onStateChanged(Set liveNodes, DocCollection collectionState) { - if (collectionState == null) { // collection has been deleted - don't wait - countDownLatch.countDown(); - return true; - } - Slice slice = collectionState.getSlice(shardId); - if (slice == null) { // shard has been removed - don't wait - countDownLatch.countDown(); - return true; - } - for (Replica replica : slice.getReplicas()) { - // check if another replica exists - doesn't have to be the one we're moving - // as long as it's active and can become a leader, in which case we don't have to wait - // for recovery of specifically the one that we've just added - if (!replica.getName().equals(replicaId)) { - if (replica.getType().equals(Replica.Type.PULL)) { // not eligible for leader election - continue; - } - // check its state - String coreName = replica.getStr(ZkStateReader.CORE_NAME_PROP); - if (targetCore != null && !targetCore.equals(coreName)) { - continue; - } - if (replica.isActive(liveNodes)) { // recovered - stop waiting - recovered = replica; - countDownLatch.countDown(); - return true; - } - } - } - // set the watch again to wait for the new replica to recover - return false; - } - - public Replica getRecoveredReplica() { - return recovered; - } - } } diff --git a/solr/core/src/java/org/apache/solr/cloud/SplitShardCmd.java b/solr/core/src/java/org/apache/solr/cloud/SplitShardCmd.java index 10cc7c871af..4a91ba58c85 100644 --- a/solr/core/src/java/org/apache/solr/cloud/SplitShardCmd.java +++ b/solr/core/src/java/org/apache/solr/cloud/SplitShardCmd.java @@ -43,6 +43,7 @@ import org.apache.solr.common.cloud.Replica; import org.apache.solr.common.cloud.Slice; import org.apache.solr.common.cloud.ZkNodeProps; import org.apache.solr.common.cloud.ZkStateReader; +import org.apache.solr.common.params.CommonAdminParams; import org.apache.solr.common.params.CoreAdminParams; import org.apache.solr.common.params.ModifiableSolrParams; import org.apache.solr.common.util.NamedList; @@ -80,6 +81,7 @@ public class SplitShardCmd implements Cmd { public boolean split(ClusterState clusterState, ZkNodeProps message, NamedList results) throws Exception { String collectionName = message.getStr("collection"); String slice = message.getStr(ZkStateReader.SHARD_ID_PROP); + boolean waitForFinalState = message.getBool(CommonAdminParams.WAIT_FOR_FINAL_STATE, false); log.info("Split shard invoked"); ZkStateReader zkStateReader = ocmh.zkStateReader; @@ -282,6 +284,7 @@ public class SplitShardCmd implements Cmd { propMap.put(SHARD_ID_PROP, subSlice); propMap.put("node", nodeName); propMap.put(CoreAdminParams.NAME, subShardName); + propMap.put(CommonAdminParams.WAIT_FOR_FINAL_STATE, Boolean.toString(waitForFinalState)); // copy over property params: for (String key : message.keySet()) { if (key.startsWith(COLL_PROP_PREFIX)) { @@ -406,7 +409,8 @@ public class SplitShardCmd implements Cmd { ZkStateReader.CORE_NAME_PROP, shardName, ZkStateReader.STATE_PROP, Replica.State.DOWN.toString(), ZkStateReader.BASE_URL_PROP, zkStateReader.getBaseUrlForNodeName(subShardNodeName), - ZkStateReader.NODE_NAME_PROP, subShardNodeName); + ZkStateReader.NODE_NAME_PROP, subShardNodeName, + CommonAdminParams.WAIT_FOR_FINAL_STATE, Boolean.toString(waitForFinalState)); Overseer.getStateUpdateQueue(zkStateReader.getZkClient()).offer(Utils.toJSON(props)); HashMap propMap = new HashMap<>(); @@ -428,6 +432,8 @@ public class SplitShardCmd implements Cmd { // special flag param to instruct addReplica not to create the replica in cluster state again propMap.put(SKIP_CREATE_REPLICA_IN_CLUSTER_STATE, "true"); + propMap.put(CommonAdminParams.WAIT_FOR_FINAL_STATE, Boolean.toString(waitForFinalState)); + replicas.add(propMap); } diff --git a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java index 3d0122215d3..95d1a1ce35a 100644 --- a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java +++ b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java @@ -130,6 +130,7 @@ import static org.apache.solr.common.cloud.ZkStateReader.TLOG_REPLICAS; import static org.apache.solr.common.params.CollectionAdminParams.COUNT_PROP; import static org.apache.solr.common.params.CollectionParams.CollectionAction.*; import static org.apache.solr.common.params.CommonAdminParams.ASYNC; +import static org.apache.solr.common.params.CommonAdminParams.WAIT_FOR_FINAL_STATE; import static org.apache.solr.common.params.CommonParams.NAME; import static org.apache.solr.common.params.CommonParams.VALUE_LONG; import static org.apache.solr.common.params.CoreAdminParams.DATA_DIR; @@ -414,7 +415,8 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission PULL_REPLICAS, TLOG_REPLICAS, NRT_REPLICAS, - POLICY); + POLICY, + WAIT_FOR_FINAL_STATE); if (props.get(STATE_FORMAT) == null) { props.put(STATE_FORMAT, "2"); @@ -505,7 +507,8 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission COLLECTION_PROP, SHARD_ID_PROP, "split.key", - CoreAdminParams.RANGES); + CoreAdminParams.RANGES, + WAIT_FOR_FINAL_STATE); return copyPropertiesWithPrefix(req.getParams(), map, COLL_PROP_PREFIX); }), DELETESHARD_OP(DELETESHARD, (req, rsp, h) -> { @@ -532,7 +535,8 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission throw new SolrException(ErrorCode.BAD_REQUEST, "shards can be added only to 'implicit' collections"); req.getParams().getAll(map, REPLICATION_FACTOR, - CREATE_NODE_SET); + CREATE_NODE_SET, + WAIT_FOR_FINAL_STATE); return copyPropertiesWithPrefix(req.getParams(), map, COLL_PROP_PREFIX); }), DELETEREPLICA_OP(DELETEREPLICA, (req, rsp, h) -> { @@ -644,7 +648,8 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission INSTANCE_DIR, DATA_DIR, ULOG_DIR, - REPLICA_TYPE); + REPLICA_TYPE, + WAIT_FOR_FINAL_STATE); return copyPropertiesWithPrefix(req.getParams(), props, COLL_PROP_PREFIX); }), OVERSEERSTATUS_OP(OVERSEERSTATUS, (req, rsp, h) -> (Map) new LinkedHashMap<>()), @@ -901,7 +906,7 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission if (targetNode == null) { throw new SolrException(ErrorCode.BAD_REQUEST, CollectionParams.TARGET_NODE + " is a require parameter"); } - return params.getAll(null, "source", "target", CollectionParams.SOURCE_NODE, CollectionParams.TARGET_NODE); + return params.getAll(null, "source", "target", WAIT_FOR_FINAL_STATE, CollectionParams.SOURCE_NODE, CollectionParams.TARGET_NODE); }), MOVEREPLICA_OP(MOVEREPLICA, (req, rsp, h) -> { Map map = req.getParams().required().getAll(null, @@ -911,6 +916,7 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission CollectionParams.FROM_NODE, CollectionParams.SOURCE_NODE, CollectionParams.TARGET_NODE, + WAIT_FOR_FINAL_STATE, "replica", "shard"); }), diff --git a/solr/core/src/test/org/apache/solr/cloud/AddReplicaTest.java b/solr/core/src/test/org/apache/solr/cloud/AddReplicaTest.java new file mode 100644 index 00000000000..636138087d7 --- /dev/null +++ b/solr/core/src/test/org/apache/solr/cloud/AddReplicaTest.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.solr.cloud; + +import java.lang.invoke.MethodHandles; +import java.util.Collection; + +import org.apache.solr.client.solrj.impl.CloudSolrClient; +import org.apache.solr.client.solrj.request.CollectionAdminRequest; +import org.apache.solr.client.solrj.response.RequestStatusState; +import org.apache.solr.common.cloud.ClusterState; +import org.apache.solr.common.cloud.DocCollection; +import org.apache.solr.common.cloud.Replica; +import org.apache.solr.util.LogLevel; +import org.junit.BeforeClass; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * + */ +@LogLevel("org.apache.solr.cloud=DEBUG") +public class AddReplicaTest extends SolrCloudTestCase { + private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + @BeforeClass + public static void setupCluster() throws Exception { + configureCluster(4) + .addConfig("conf1", TEST_PATH().resolve("configsets").resolve("cloud-minimal").resolve("conf")) + .configure(); + } + + @Test + public void test() throws Exception { + cluster.waitForAllNodes(5000); + String collection = "addreplicatest_coll"; + + CloudSolrClient cloudClient = cluster.getSolrClient(); + + CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(collection, "conf1", 2, 1); + create.setMaxShardsPerNode(2); + cloudClient.request(create); + + ClusterState clusterState = cloudClient.getZkStateReader().getClusterState(); + DocCollection coll = clusterState.getCollection(collection); + String sliceName = coll.getSlices().iterator().next().getName(); + Collection replicas = coll.getSlice(sliceName).getReplicas(); + CollectionAdminRequest.AddReplica addReplica = CollectionAdminRequest.addReplicaToShard(collection, sliceName); + addReplica.processAsync("000", cloudClient); + CollectionAdminRequest.RequestStatus requestStatus = CollectionAdminRequest.requestStatus("000"); + CollectionAdminRequest.RequestStatusResponse rsp = requestStatus.process(cloudClient); + assertTrue(rsp.getRequestStatus() != RequestStatusState.COMPLETED); + // wait for async request success + boolean success = false; + for (int i = 0; i < 200; i++) { + rsp = requestStatus.process(cloudClient); + if (rsp.getRequestStatus() == RequestStatusState.COMPLETED) { + success = true; + break; + } + assertFalse(rsp.getRequestStatus() == RequestStatusState.FAILED); + Thread.sleep(500); + } + assertTrue(success); + Collection replicas2 = cloudClient.getZkStateReader().getClusterState().getCollection(collection).getSlice(sliceName).getReplicas(); + replicas2.removeAll(replicas); + assertEquals(1, replicas2.size()); + Replica r = replicas2.iterator().next(); + assertTrue(r.toString(), r.getState() != Replica.State.ACTIVE); + + // use waitForFinalState + addReplica.setWaitForFinalState(true); + addReplica.processAsync("001", cloudClient); + requestStatus = CollectionAdminRequest.requestStatus("001"); + rsp = requestStatus.process(cloudClient); + assertTrue(rsp.getRequestStatus() != RequestStatusState.COMPLETED); + // wait for async request success + success = false; + for (int i = 0; i < 200; i++) { + rsp = requestStatus.process(cloudClient); + if (rsp.getRequestStatus() == RequestStatusState.COMPLETED) { + success = true; + break; + } + assertFalse(rsp.toString(), rsp.getRequestStatus() == RequestStatusState.FAILED); + Thread.sleep(500); + } + assertTrue(success); + // let the client watch fire + Thread.sleep(1000); + clusterState = cloudClient.getZkStateReader().getClusterState(); + coll = clusterState.getCollection(collection); + Collection replicas3 = coll.getSlice(sliceName).getReplicas(); + replicas3.removeAll(replicas); + String replica2 = replicas2.iterator().next().getName(); + assertEquals(2, replicas3.size()); + for (Replica replica : replicas3) { + if (replica.getName().equals(replica2)) { + continue; // may be still recovering + } + assertTrue(coll.toString() + "\n" + replica.toString(), replica.getState() == Replica.State.ACTIVE); + } + } +} diff --git a/solr/core/src/test/org/apache/solr/cloud/MoveReplicaTest.java b/solr/core/src/test/org/apache/solr/cloud/MoveReplicaTest.java index 4843a2309ec..9ed751f2f98 100644 --- a/solr/core/src/test/org/apache/solr/cloud/MoveReplicaTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/MoveReplicaTest.java @@ -273,12 +273,6 @@ public class MoveReplicaTest extends SolrCloudTestCase { assertFalse(success); Set newWatchers = new HashSet<>(accessor.getStateWatchers(coll)); - for (Iterator it = newWatchers.iterator(); it.hasNext(); ) { - CollectionStateWatcher watcher = it.next(); - if (watcher instanceof ReplaceNodeCmd.RecoveryWatcher) { - it.remove(); - } - } assertEquals(watchers, newWatchers); } diff --git a/solr/core/src/test/org/apache/solr/cloud/ReplaceNodeTest.java b/solr/core/src/test/org/apache/solr/cloud/ReplaceNodeTest.java index db6221224fd..f5ed3109990 100644 --- a/solr/core/src/test/org/apache/solr/cloud/ReplaceNodeTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/ReplaceNodeTest.java @@ -22,6 +22,8 @@ import java.lang.invoke.MethodHandles; import java.util.ArrayList; import java.util.Collections; import java.util.EnumSet; +import java.util.Iterator; +import java.util.List; import java.util.Set; import org.apache.solr.client.solrj.impl.CloudSolrClient; @@ -82,6 +84,8 @@ public class ReplaceNodeTest extends SolrCloudTestCase { ); create.setCreateNodeSet(StrUtils.join(l, ',')).setMaxShardsPerNode(3); cloudClient.request(create); + DocCollection collection = cloudClient.getZkStateReader().getClusterState().getCollection(coll); + log.debug("### Before decommission: " + collection); log.info("excluded_node : {} ", emptyNode); createReplaceNodeRequest(node2bdecommissioned, emptyNode, null).processAsync("000", cloudClient); CollectionAdminRequest.RequestStatus requestStatus = CollectionAdminRequest.requestStatus("000"); @@ -101,8 +105,20 @@ public class ReplaceNodeTest extends SolrCloudTestCase { assertTrue(status.getCoreStatus().size() == 0); } - //let's do it back - createReplaceNodeRequest(emptyNode, node2bdecommissioned, Boolean.TRUE).processAsync("001", cloudClient); + Thread.sleep(5000); + collection = cloudClient.getZkStateReader().getClusterState().getCollection(coll); + log.debug("### After decommission: " + collection); + // check what are replica states on the decommissioned node + List replicas = collection.getReplicas(node2bdecommissioned); + if (replicas == null) { + replicas = Collections.emptyList(); + } + log.debug("### Existing replicas on decommissioned node: " + replicas); + + //let's do it back - this time wait for recoveries + CollectionAdminRequest.AsyncCollectionAdminRequest replaceNodeRequest = createReplaceNodeRequest(emptyNode, node2bdecommissioned, Boolean.TRUE); + replaceNodeRequest.setWaitForFinalState(true); + replaceNodeRequest.processAsync("001", cloudClient); requestStatus = CollectionAdminRequest.requestStatus("001"); for (int i = 0; i < 200; i++) { @@ -119,14 +135,35 @@ public class ReplaceNodeTest extends SolrCloudTestCase { CoreAdminResponse status = CoreAdminRequest.getStatus(null, coreclient); assertEquals("Expecting no cores but found some: " + status.getCoreStatus(), 0, status.getCoreStatus().size()); } - - DocCollection collection = cloudClient.getZkStateReader().getClusterState().getCollection(coll); + + collection = cloudClient.getZkStateReader().getClusterState().getCollection(coll); assertEquals(create.getNumShards().intValue(), collection.getSlices().size()); for (Slice s:collection.getSlices()) { assertEquals(create.getNumNrtReplicas().intValue(), s.getReplicas(EnumSet.of(Replica.Type.NRT)).size()); assertEquals(create.getNumTlogReplicas().intValue(), s.getReplicas(EnumSet.of(Replica.Type.TLOG)).size()); assertEquals(create.getNumPullReplicas().intValue(), s.getReplicas(EnumSet.of(Replica.Type.PULL)).size()); } + // make sure all newly created replicas on node are active + List newReplicas = collection.getReplicas(node2bdecommissioned); + replicas.forEach(r -> { + for (Iterator it = newReplicas.iterator(); it.hasNext(); ) { + Replica nr = it.next(); + if (nr.getName().equals(r.getName())) { + it.remove(); + } + } + }); + assertFalse(newReplicas.isEmpty()); + for (Replica r : newReplicas) { + assertEquals(r.toString(), Replica.State.ACTIVE, r.getState()); + } + // make sure all replicas on emptyNode are not active + replicas = collection.getReplicas(emptyNode); + if (replicas != null) { + for (Replica r : replicas) { + assertFalse(r.toString(), Replica.State.ACTIVE.equals(r.getState())); + } + } } private CollectionAdminRequest.AsyncCollectionAdminRequest createReplaceNodeRequest(String sourceNode, String targetNode, Boolean parallel) { diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java b/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java index 930ff7f3c24..48cea6112ce 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java @@ -120,6 +120,9 @@ public abstract class CollectionAdminRequest */ public abstract static class AsyncCollectionAdminRequest extends CollectionAdminRequest { + protected String asyncId = null; + protected boolean waitForFinalState = false; + public AsyncCollectionAdminRequest(CollectionAction action) { super(action); } @@ -133,12 +136,14 @@ public abstract class CollectionAdminRequest return UUID.randomUUID().toString(); } - protected String asyncId = null; - public String getAsyncId() { return asyncId; } + public void setWaitForFinalState(boolean waitForFinalState) { + this.waitForFinalState = waitForFinalState; + } + public void setAsyncId(String asyncId) { this.asyncId = asyncId; } @@ -200,6 +205,9 @@ public abstract class CollectionAdminRequest if (asyncId != null) { params.set(CommonAdminParams.ASYNC, asyncId); } + if (waitForFinalState) { + params.set(CommonAdminParams.WAIT_FOR_FINAL_STATE, waitForFinalState); + } return params; } } diff --git a/solr/solrj/src/java/org/apache/solr/common/params/CommonAdminParams.java b/solr/solrj/src/java/org/apache/solr/common/params/CommonAdminParams.java index 89895f63edb..f20afa7ca23 100644 --- a/solr/solrj/src/java/org/apache/solr/common/params/CommonAdminParams.java +++ b/solr/solrj/src/java/org/apache/solr/common/params/CommonAdminParams.java @@ -20,6 +20,6 @@ public interface CommonAdminParams { /** async or not? **/ - public static final String ASYNC = "async"; - + String ASYNC = "async"; + String WAIT_FOR_FINAL_STATE = "waitForFinalState"; } diff --git a/solr/solrj/src/resources/apispec/collections.Commands.json b/solr/solrj/src/resources/apispec/collections.Commands.json index 2e3769b9dfb..294b1633bbe 100644 --- a/solr/solrj/src/resources/apispec/collections.Commands.json +++ b/solr/solrj/src/resources/apispec/collections.Commands.json @@ -115,6 +115,11 @@ "async": { "type": "string", "description": "Defines a request ID that can be used to track this action after it's submitted. The action will be processed asynchronously." + }, + "waitForFinalState": { + "type": "boolean", + "description": "If true then request will complete only when all affected replicas become active.", + "default": false } }, "required": [ diff --git a/solr/solrj/src/resources/apispec/collections.collection.shards.Commands.json b/solr/solrj/src/resources/apispec/collections.collection.shards.Commands.json index 5e8e96b3114..25d3e9d0736 100644 --- a/solr/solrj/src/resources/apispec/collections.collection.shards.Commands.json +++ b/solr/solrj/src/resources/apispec/collections.collection.shards.Commands.json @@ -37,6 +37,11 @@ "async": { "type": "string", "description": "Defines a request ID that can be used to track this action after it's submitted. The action will be processed asynchronously when this is defined. This command can be long-running, so running it asynchronously is recommended." + }, + "waitForFinalState": { + "type": "boolean", + "description": "If true then request will complete only when all affected replicas become active.", + "default": false } } }, @@ -63,6 +68,11 @@ "async": { "type": "string", "description": "Defines a request ID that can be used to track this action after it's submitted. The action will be processed asynchronously when this is defined." + }, + "waitForFinalState": { + "type": "boolean", + "description": "If true then request will complete only when all affected replicas become active.", + "default": false } }, "required":["shard"] @@ -106,6 +116,11 @@ "type": "string", "enum":["NRT", "TLOG", "PULL"], "description": "The type of replica to add. NRT (default), TLOG or PULL" + }, + "waitForFinalState": { + "type": "boolean", + "description": "If true then request will complete only when all affected replicas become active.", + "default": false } }, "required":["shard"]