SOLR-11448: Implement an option in collection commands to wait for final results.

This commit is contained in:
Andrzej Bialecki 2017-10-18 13:06:19 +02:00
parent adbf6805bf
commit 8b4ab2644d
17 changed files with 516 additions and 105 deletions

View File

@ -45,6 +45,10 @@ Apache UIMA 2.3.1
Apache ZooKeeper 3.4.10
Jetty 9.3.20.v20170531
New Features
----------------------
* SOLR-11448: Implement an option in collection commands to wait for final results. (ab)
Bug Fixes
----------------------

View File

@ -0,0 +1,152 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.cloud;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import org.apache.solr.common.cloud.CollectionStateWatcher;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkStateReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Watch for replicas to become {@link org.apache.solr.common.cloud.Replica.State#ACTIVE}. Watcher is
* terminated (its {@link #onStateChanged(Set, DocCollection)} method returns false) when all listed
* replicas become active.
* <p>Additionally, the provided {@link CountDownLatch} instance can be used to await
* for all listed replicas to become active.</p>
*/
public class ActiveReplicaWatcher implements CollectionStateWatcher {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private final String collection;
private final List<String> replicaIds = new ArrayList<>();
private final List<String> solrCoreNames = new ArrayList<>();
private final List<Replica> activeReplicas = new ArrayList<>();
private CountDownLatch countDownLatch;
/**
* Construct the watcher. At least one replicaId or solrCoreName must be provided.
* @param collection collection name
* @param replicaIds list of replica id-s
* @param solrCoreNames list of SolrCore names
* @param countDownLatch optional latch to await for all provided replicas to become active. This latch will be
* counted down by at most the number of provided replica id-s / SolrCore names.
*/
public ActiveReplicaWatcher(String collection, List<String> replicaIds, List<String> solrCoreNames, CountDownLatch countDownLatch) {
if (replicaIds == null && solrCoreNames == null) {
throw new IllegalArgumentException("Either replicaId or solrCoreName must be provided.");
}
if (replicaIds != null) {
this.replicaIds.addAll(replicaIds);
}
if (solrCoreNames != null) {
this.solrCoreNames.addAll(solrCoreNames);
}
if (this.replicaIds.isEmpty() && this.solrCoreNames.isEmpty()) {
throw new IllegalArgumentException("At least one replicaId or solrCoreName must be provided");
}
this.collection = collection;
this.countDownLatch = countDownLatch;
}
/**
* Collection name.
*/
public String getCollection() {
return collection;
}
/**
* Return the list of active replicas found so far.
*/
public List<Replica> getActiveReplicas() {
return activeReplicas;
}
/**
* Return the list of replica id-s that are not active yet (or unverified).
*/
public List<String> getReplicaIds() {
return replicaIds;
}
/**
* Return a list of SolrCore names that are not active yet (or unverified).
*/
public List<String> getSolrCoreNames() {
return solrCoreNames;
}
@Override
public String toString() {
return "ActiveReplicaWatcher{" +
"collection='" + collection + '\'' +
", replicaIds=" + replicaIds +
", solrCoreNames=" + solrCoreNames +
", activeReplicas=" + activeReplicas +
'}';
}
@Override
public boolean onStateChanged(Set<String> liveNodes, DocCollection collectionState) {
log.debug("-- onStateChanged: " + collectionState);
if (collectionState == null) { // collection has been deleted - don't wait
if (countDownLatch != null) {
for (int i = 0; i < replicaIds.size() + solrCoreNames.size(); i++) {
countDownLatch.countDown();
}
}
replicaIds.clear();
solrCoreNames.clear();
return true;
}
for (Slice slice : collectionState.getSlices()) {
for (Replica replica : slice.getReplicas()) {
if (replicaIds.contains(replica.getName())) {
if (replica.isActive(liveNodes)) {
activeReplicas.add(replica);
replicaIds.remove(replica.getName());
if (countDownLatch != null) {
countDownLatch.countDown();
}
}
} else if (solrCoreNames.contains(replica.getStr(ZkStateReader.CORE_NAME_PROP))) {
if (replica.isActive(liveNodes)) {
activeReplicas.add(replica);
solrCoreNames.remove(replica.getStr(ZkStateReader.CORE_NAME_PROP));
if (countDownLatch != null) {
countDownLatch.countDown();
}
}
}
}
}
if (replicaIds.isEmpty() && solrCoreNames.isEmpty()) {
return true;
} else {
return false;
}
}
}

View File

@ -25,6 +25,8 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang.StringUtils;
@ -54,6 +56,7 @@ import static org.apache.solr.common.cloud.ZkStateReader.CORE_NAME_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDREPLICA;
import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
import static org.apache.solr.common.params.CommonAdminParams.WAIT_FOR_FINAL_STATE;
public class AddReplicaCmd implements OverseerCollectionMessageHandler.Cmd {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@ -77,8 +80,10 @@ public class AddReplicaCmd implements OverseerCollectionMessageHandler.Cmd {
String shard = message.getStr(SHARD_ID_PROP);
String coreName = message.getStr(CoreAdminParams.NAME);
String coreNodeName = message.getStr(CoreAdminParams.CORE_NODE_NAME);
int timeout = message.getInt("timeout", 10 * 60); // 10 minutes
Replica.Type replicaType = Replica.Type.valueOf(message.getStr(ZkStateReader.REPLICA_TYPE, Replica.Type.NRT.name()).toUpperCase(Locale.ROOT));
boolean parallel = message.getBool("parallel", false);
boolean waitForFinalState = message.getBool(WAIT_FOR_FINAL_STATE, false);
if (StringUtils.isBlank(coreName)) {
coreName = message.getStr(CoreAdminParams.PROPERTY_PREFIX + CoreAdminParams.NAME);
}
@ -222,8 +227,22 @@ public class AddReplicaCmd implements OverseerCollectionMessageHandler.Cmd {
if (onComplete != null) onComplete.run();
};
if (!parallel) {
runnable.run();
if (!parallel || waitForFinalState) {
if (waitForFinalState) {
CountDownLatch countDownLatch = new CountDownLatch(1);
ActiveReplicaWatcher watcher = new ActiveReplicaWatcher(collection, null, Collections.singletonList(coreName), countDownLatch);
try {
zkStateReader.registerCollectionStateWatcher(collection, watcher);
runnable.run();
if (!countDownLatch.await(timeout, TimeUnit.SECONDS)) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Timeout waiting " + timeout + " seconds for replica to become active.");
}
} finally {
zkStateReader.removeCollectionStateWatcher(collection, watcher);
}
} else {
runnable.run();
}
} else {
ocmh.tpe.submit(runnable);
}
@ -236,4 +255,5 @@ public class AddReplicaCmd implements OverseerCollectionMessageHandler.Cmd {
ZkStateReader.NODE_NAME_PROP, node
);
}
}

View File

@ -46,6 +46,7 @@ import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.cloud.ZooKeeperException;
import org.apache.solr.common.params.CollectionAdminParams;
import org.apache.solr.common.params.CommonAdminParams;
import org.apache.solr.common.params.CoreAdminParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.util.NamedList;
@ -73,6 +74,7 @@ import static org.apache.solr.common.cloud.ZkStateReader.SOLR_AUTOSCALING_CONF_P
import static org.apache.solr.common.cloud.ZkStateReader.TLOG_REPLICAS;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDREPLICA;
import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
import static org.apache.solr.common.params.CommonAdminParams.WAIT_FOR_FINAL_STATE;
import static org.apache.solr.common.params.CommonParams.NAME;
import static org.apache.solr.common.util.StrUtils.formatString;
@ -89,6 +91,7 @@ public class CreateCollectionCmd implements Cmd {
@Override
public void call(ClusterState clusterState, ZkNodeProps message, NamedList results) throws Exception {
final String collectionName = message.getStr(NAME);
final boolean waitForFinalState = message.getBool(WAIT_FOR_FINAL_STATE, false);
log.info("Create collection {}", collectionName);
if (clusterState.hasCollection(collectionName)) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "collection already exists: " + collectionName);
@ -242,7 +245,8 @@ public class CreateCollectionCmd implements Cmd {
ZkStateReader.CORE_NAME_PROP, coreName,
ZkStateReader.STATE_PROP, Replica.State.DOWN.toString(),
ZkStateReader.BASE_URL_PROP, baseUrl,
ZkStateReader.REPLICA_TYPE, replicaPosition.type.name());
ZkStateReader.REPLICA_TYPE, replicaPosition.type.name(),
CommonAdminParams.WAIT_FOR_FINAL_STATE, Boolean.toString(waitForFinalState));
Overseer.getStateUpdateQueue(zkStateReader.getZkClient()).offer(Utils.toJSON(props));
}
@ -317,6 +321,7 @@ public class CreateCollectionCmd implements Cmd {
PolicyHelper.clearFlagAndDecref(ocmh.policySessionRef);
}
}
String getConfigName(String coll, ZkNodeProps message) throws KeeperException, InterruptedException {
String configName = message.getStr(COLL_CONF);

View File

@ -39,6 +39,7 @@ import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.ReplicaPosition;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.CommonAdminParams;
import org.apache.solr.common.params.CoreAdminParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.SimpleOrderedMap;
@ -68,6 +69,7 @@ public class CreateShardCmd implements Cmd {
public void call(ClusterState clusterState, ZkNodeProps message, NamedList results) throws Exception {
String collectionName = message.getStr(COLLECTION_PROP);
String sliceName = message.getStr(SHARD_ID_PROP);
boolean waitForFinalState = message.getBool(CommonAdminParams.WAIT_FOR_FINAL_STATE, false);
log.info("Create shard invoked: {}", message);
if (collectionName == null || sliceName == null)
@ -134,7 +136,8 @@ public class CreateShardCmd implements Cmd {
SHARD_ID_PROP, sliceName,
ZkStateReader.REPLICA_TYPE, position.type.name(),
CoreAdminParams.NODE, nodeName,
CoreAdminParams.NAME, coreName);
CoreAdminParams.NAME, coreName,
CommonAdminParams.WAIT_FOR_FINAL_STATE, Boolean.toString(waitForFinalState));
Map<String, Object> propertyParams = new HashMap<>();
ocmh.addPropertyParams(message, propertyParams);
addReplicasProps = addReplicasProps.plus(propertyParams);

View File

@ -0,0 +1,88 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.cloud;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import org.apache.solr.common.cloud.CollectionStateWatcher;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkStateReader;
/**
* We use this watcher to wait for any eligible replica in a shard to become active so that it can become a leader.
*/
public class LeaderRecoveryWatcher implements CollectionStateWatcher {
String collectionId;
String shardId;
String replicaId;
String targetCore;
CountDownLatch countDownLatch;
/**
* Watch for recovery of a replica
*
* @param collectionId collection name
* @param shardId shard id
* @param replicaId source replica name (coreNodeName)
* @param targetCore specific target core name - if null then any active replica will do
* @param countDownLatch countdown when recovered
*/
LeaderRecoveryWatcher(String collectionId, String shardId, String replicaId, String targetCore, CountDownLatch countDownLatch) {
this.collectionId = collectionId;
this.shardId = shardId;
this.replicaId = replicaId;
this.targetCore = targetCore;
this.countDownLatch = countDownLatch;
}
@Override
public boolean onStateChanged(Set<String> liveNodes, DocCollection collectionState) {
if (collectionState == null) { // collection has been deleted - don't wait
countDownLatch.countDown();
return true;
}
Slice slice = collectionState.getSlice(shardId);
if (slice == null) { // shard has been removed - don't wait
countDownLatch.countDown();
return true;
}
for (Replica replica : slice.getReplicas()) {
// check if another replica exists - doesn't have to be the one we're moving
// as long as it's active and can become a leader, in which case we don't have to wait
// for recovery of specifically the one that we've just added
if (!replica.getName().equals(replicaId)) {
if (replica.getType().equals(Replica.Type.PULL)) { // not eligible for leader election
continue;
}
// check its state
String coreName = replica.getStr(ZkStateReader.CORE_NAME_PROP);
if (targetCore != null && !targetCore.equals(coreName)) {
continue;
}
if (replica.isActive(liveNodes)) { // recovered - stop waiting
countDownLatch.countDown();
return true;
}
}
}
// set the watch again to wait for the new replica to recover
return false;
}
}

View File

@ -45,6 +45,7 @@ import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.REPLICA_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
import static org.apache.solr.common.params.CommonAdminParams.WAIT_FOR_FINAL_STATE;
public class MoveReplicaCmd implements Cmd{
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@ -65,6 +66,7 @@ public class MoveReplicaCmd implements Cmd{
ocmh.checkRequired(message, COLLECTION_PROP, "targetNode");
String collection = message.getStr(COLLECTION_PROP);
String targetNode = message.getStr("targetNode");
boolean waitForFinalState = message.getBool(WAIT_FOR_FINAL_STATE, false);
int timeout = message.getInt("timeout", 10 * 60); // 10 minutes
String async = message.getStr(ASYNC);
@ -111,14 +113,14 @@ public class MoveReplicaCmd implements Cmd{
assert slice != null;
Object dataDir = replica.get("dataDir");
if (dataDir != null && dataDir.toString().startsWith("hdfs:/")) {
moveHdfsReplica(clusterState, results, dataDir.toString(), targetNode, async, coll, replica, slice, timeout);
moveHdfsReplica(clusterState, results, dataDir.toString(), targetNode, async, coll, replica, slice, timeout, waitForFinalState);
} else {
moveNormalReplica(clusterState, results, targetNode, async, coll, replica, slice, timeout);
moveNormalReplica(clusterState, results, targetNode, async, coll, replica, slice, timeout, waitForFinalState);
}
}
private void moveHdfsReplica(ClusterState clusterState, NamedList results, String dataDir, String targetNode, String async,
DocCollection coll, Replica replica, Slice slice, int timeout) throws Exception {
DocCollection coll, Replica replica, Slice slice, int timeout, boolean waitForFinalState) throws Exception {
String skipCreateReplicaInClusterState = "true";
if (clusterState.getLiveNodes().contains(replica.getNodeName())) {
skipCreateReplicaInClusterState = "false";
@ -183,7 +185,7 @@ public class MoveReplicaCmd implements Cmd{
}
private void moveNormalReplica(ClusterState clusterState, NamedList results, String targetNode, String async,
DocCollection coll, Replica replica, Slice slice, int timeout) throws Exception {
DocCollection coll, Replica replica, Slice slice, int timeout, boolean waitForFinalState) throws Exception {
String newCoreName = Assign.buildCoreName(ocmh.overseer.getSolrCloudManager().getDistribStateManager(), coll, slice.getName(), replica.getType());
ZkNodeProps addReplicasProps = new ZkNodeProps(
COLLECTION_PROP, coll.getName(),
@ -193,10 +195,9 @@ public class MoveReplicaCmd implements Cmd{
if(async!=null) addReplicasProps.getProperties().put(ASYNC, async);
NamedList addResult = new NamedList();
CountDownLatch countDownLatch = new CountDownLatch(1);
ReplaceNodeCmd.RecoveryWatcher watcher = null;
if (replica.equals(slice.getLeader())) {
watcher = new ReplaceNodeCmd.RecoveryWatcher(coll.getName(), slice.getName(),
replica.getName(), null, countDownLatch);
ActiveReplicaWatcher watcher = null;
if (replica.equals(slice.getLeader()) || waitForFinalState) {
watcher = new ActiveReplicaWatcher(coll.getName(), Collections.singletonList(replica.getName()), null, countDownLatch);
ocmh.zkStateReader.registerCollectionStateWatcher(coll.getName(), watcher);
}
ocmh.addReplica(clusterState, addReplicasProps, addResult, null);
@ -221,7 +222,7 @@ public class MoveReplicaCmd implements Cmd{
results.add("failure", errorString);
return;
} else {
log.debug("Replica " + watcher.getRecoveredReplica() + " is active - deleting the source...");
log.debug("Replica " + watcher.getActiveReplicas() + " is active - deleting the source...");
}
} finally {
ocmh.zkStateReader.removeCollectionStateWatcher(coll.getName(), watcher);

View File

@ -20,11 +20,11 @@ package org.apache.solr.cloud;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@ -38,6 +38,7 @@ import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.CollectionParams;
import org.apache.solr.common.params.CommonAdminParams;
import org.apache.solr.common.params.CoreAdminParams;
import org.apache.solr.common.util.NamedList;
import org.apache.zookeeper.KeeperException;
@ -62,6 +63,7 @@ public class ReplaceNodeCmd implements OverseerCollectionMessageHandler.Cmd {
ZkStateReader zkStateReader = ocmh.zkStateReader;
String source = message.getStr(CollectionParams.SOURCE_NODE, message.getStr("source"));
String target = message.getStr(CollectionParams.TARGET_NODE, message.getStr("target"));
boolean waitForFinalState = message.getBool(CommonAdminParams.WAIT_FOR_FINAL_STATE, false);
if (source == null || target == null) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "sourceNode and targetNode are required params" );
}
@ -80,14 +82,15 @@ public class ReplaceNodeCmd implements OverseerCollectionMessageHandler.Cmd {
// how many leaders are we moving? for these replicas we have to make sure that either:
// * another existing replica can become a leader, or
// * we wait until the newly created replica completes recovery (and can become the new leader)
// If waitForFinalState=true we wait for all replicas
int numLeaders = 0;
for (ZkNodeProps props : sourceReplicas) {
if (props.getBool(ZkStateReader.LEADER_PROP, false)) {
if (props.getBool(ZkStateReader.LEADER_PROP, false) || waitForFinalState) {
numLeaders++;
}
}
// map of collectionName_coreNodeName to watchers
Map<String, RecoveryWatcher> watchers = new HashMap<>();
Map<String, CollectionStateWatcher> watchers = new HashMap<>();
List<ZkNodeProps> createdReplicas = new ArrayList<>();
AtomicBoolean anyOneFailed = new AtomicBoolean(false);
@ -122,15 +125,24 @@ public class ReplaceNodeCmd implements OverseerCollectionMessageHandler.Cmd {
if (addedReplica != null) {
createdReplicas.add(addedReplica);
if (sourceReplica.getBool(ZkStateReader.LEADER_PROP, false)) {
if (sourceReplica.getBool(ZkStateReader.LEADER_PROP, false) || waitForFinalState) {
String shardName = sourceReplica.getStr(SHARD_ID_PROP);
String replicaName = sourceReplica.getStr(ZkStateReader.REPLICA_PROP);
String collectionName = sourceReplica.getStr(COLLECTION_PROP);
String key = collectionName + "_" + replicaName;
RecoveryWatcher watcher = new RecoveryWatcher(collectionName, shardName, replicaName,
addedReplica.getStr(ZkStateReader.CORE_NAME_PROP), replicasToRecover);
CollectionStateWatcher watcher;
if (waitForFinalState) {
watcher = new ActiveReplicaWatcher(collectionName, null,
Collections.singletonList(addedReplica.getStr(ZkStateReader.CORE_NAME_PROP)), replicasToRecover);
} else {
watcher = new LeaderRecoveryWatcher(collectionName, shardName, replicaName,
addedReplica.getStr(ZkStateReader.CORE_NAME_PROP), replicasToRecover);
}
watchers.put(key, watcher);
log.debug("--- adding " + key + ", " + watcher);
zkStateReader.registerCollectionStateWatcher(collectionName, watcher);
} else {
log.debug("--- not waiting for " + addedReplica);
}
}
}
@ -152,8 +164,8 @@ public class ReplaceNodeCmd implements OverseerCollectionMessageHandler.Cmd {
log.debug("Finished waiting for leader replicas to recover");
}
// remove the watchers, we're done either way
for (RecoveryWatcher watcher : watchers.values()) {
zkStateReader.removeCollectionStateWatcher(watcher.collectionId, watcher);
for (Map.Entry<String, CollectionStateWatcher> e : watchers.entrySet()) {
zkStateReader.removeCollectionStateWatcher(e.getKey(), e.getValue());
}
if (anyOneFailed.get()) {
log.info("Failed to create some replicas. Cleaning up all replicas on target node");
@ -211,68 +223,4 @@ public class ReplaceNodeCmd implements OverseerCollectionMessageHandler.Cmd {
return sourceReplicas;
}
// we use this watcher to wait for replicas to recover
static class RecoveryWatcher implements CollectionStateWatcher {
String collectionId;
String shardId;
String replicaId;
String targetCore;
CountDownLatch countDownLatch;
Replica recovered;
/**
* Watch for recovery of a replica
* @param collectionId collection name
* @param shardId shard id
* @param replicaId source replica name (coreNodeName)
* @param targetCore specific target core name - if null then any active replica will do
* @param countDownLatch countdown when recovered
*/
RecoveryWatcher(String collectionId, String shardId, String replicaId, String targetCore, CountDownLatch countDownLatch) {
this.collectionId = collectionId;
this.shardId = shardId;
this.replicaId = replicaId;
this.targetCore = targetCore;
this.countDownLatch = countDownLatch;
}
@Override
public boolean onStateChanged(Set<String> liveNodes, DocCollection collectionState) {
if (collectionState == null) { // collection has been deleted - don't wait
countDownLatch.countDown();
return true;
}
Slice slice = collectionState.getSlice(shardId);
if (slice == null) { // shard has been removed - don't wait
countDownLatch.countDown();
return true;
}
for (Replica replica : slice.getReplicas()) {
// check if another replica exists - doesn't have to be the one we're moving
// as long as it's active and can become a leader, in which case we don't have to wait
// for recovery of specifically the one that we've just added
if (!replica.getName().equals(replicaId)) {
if (replica.getType().equals(Replica.Type.PULL)) { // not eligible for leader election
continue;
}
// check its state
String coreName = replica.getStr(ZkStateReader.CORE_NAME_PROP);
if (targetCore != null && !targetCore.equals(coreName)) {
continue;
}
if (replica.isActive(liveNodes)) { // recovered - stop waiting
recovered = replica;
countDownLatch.countDown();
return true;
}
}
}
// set the watch again to wait for the new replica to recover
return false;
}
public Replica getRecoveredReplica() {
return recovered;
}
}
}

View File

@ -43,6 +43,7 @@ import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.CommonAdminParams;
import org.apache.solr.common.params.CoreAdminParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.util.NamedList;
@ -80,6 +81,7 @@ public class SplitShardCmd implements Cmd {
public boolean split(ClusterState clusterState, ZkNodeProps message, NamedList results) throws Exception {
String collectionName = message.getStr("collection");
String slice = message.getStr(ZkStateReader.SHARD_ID_PROP);
boolean waitForFinalState = message.getBool(CommonAdminParams.WAIT_FOR_FINAL_STATE, false);
log.info("Split shard invoked");
ZkStateReader zkStateReader = ocmh.zkStateReader;
@ -282,6 +284,7 @@ public class SplitShardCmd implements Cmd {
propMap.put(SHARD_ID_PROP, subSlice);
propMap.put("node", nodeName);
propMap.put(CoreAdminParams.NAME, subShardName);
propMap.put(CommonAdminParams.WAIT_FOR_FINAL_STATE, Boolean.toString(waitForFinalState));
// copy over property params:
for (String key : message.keySet()) {
if (key.startsWith(COLL_PROP_PREFIX)) {
@ -406,7 +409,8 @@ public class SplitShardCmd implements Cmd {
ZkStateReader.CORE_NAME_PROP, shardName,
ZkStateReader.STATE_PROP, Replica.State.DOWN.toString(),
ZkStateReader.BASE_URL_PROP, zkStateReader.getBaseUrlForNodeName(subShardNodeName),
ZkStateReader.NODE_NAME_PROP, subShardNodeName);
ZkStateReader.NODE_NAME_PROP, subShardNodeName,
CommonAdminParams.WAIT_FOR_FINAL_STATE, Boolean.toString(waitForFinalState));
Overseer.getStateUpdateQueue(zkStateReader.getZkClient()).offer(Utils.toJSON(props));
HashMap<String, Object> propMap = new HashMap<>();
@ -428,6 +432,8 @@ public class SplitShardCmd implements Cmd {
// special flag param to instruct addReplica not to create the replica in cluster state again
propMap.put(SKIP_CREATE_REPLICA_IN_CLUSTER_STATE, "true");
propMap.put(CommonAdminParams.WAIT_FOR_FINAL_STATE, Boolean.toString(waitForFinalState));
replicas.add(propMap);
}

View File

@ -130,6 +130,7 @@ import static org.apache.solr.common.cloud.ZkStateReader.TLOG_REPLICAS;
import static org.apache.solr.common.params.CollectionAdminParams.COUNT_PROP;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.*;
import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
import static org.apache.solr.common.params.CommonAdminParams.WAIT_FOR_FINAL_STATE;
import static org.apache.solr.common.params.CommonParams.NAME;
import static org.apache.solr.common.params.CommonParams.VALUE_LONG;
import static org.apache.solr.common.params.CoreAdminParams.DATA_DIR;
@ -414,7 +415,8 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
PULL_REPLICAS,
TLOG_REPLICAS,
NRT_REPLICAS,
POLICY);
POLICY,
WAIT_FOR_FINAL_STATE);
if (props.get(STATE_FORMAT) == null) {
props.put(STATE_FORMAT, "2");
@ -505,7 +507,8 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
COLLECTION_PROP,
SHARD_ID_PROP,
"split.key",
CoreAdminParams.RANGES);
CoreAdminParams.RANGES,
WAIT_FOR_FINAL_STATE);
return copyPropertiesWithPrefix(req.getParams(), map, COLL_PROP_PREFIX);
}),
DELETESHARD_OP(DELETESHARD, (req, rsp, h) -> {
@ -532,7 +535,8 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
throw new SolrException(ErrorCode.BAD_REQUEST, "shards can be added only to 'implicit' collections");
req.getParams().getAll(map,
REPLICATION_FACTOR,
CREATE_NODE_SET);
CREATE_NODE_SET,
WAIT_FOR_FINAL_STATE);
return copyPropertiesWithPrefix(req.getParams(), map, COLL_PROP_PREFIX);
}),
DELETEREPLICA_OP(DELETEREPLICA, (req, rsp, h) -> {
@ -644,7 +648,8 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
INSTANCE_DIR,
DATA_DIR,
ULOG_DIR,
REPLICA_TYPE);
REPLICA_TYPE,
WAIT_FOR_FINAL_STATE);
return copyPropertiesWithPrefix(req.getParams(), props, COLL_PROP_PREFIX);
}),
OVERSEERSTATUS_OP(OVERSEERSTATUS, (req, rsp, h) -> (Map) new LinkedHashMap<>()),
@ -901,7 +906,7 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
if (targetNode == null) {
throw new SolrException(ErrorCode.BAD_REQUEST, CollectionParams.TARGET_NODE + " is a require parameter");
}
return params.getAll(null, "source", "target", CollectionParams.SOURCE_NODE, CollectionParams.TARGET_NODE);
return params.getAll(null, "source", "target", WAIT_FOR_FINAL_STATE, CollectionParams.SOURCE_NODE, CollectionParams.TARGET_NODE);
}),
MOVEREPLICA_OP(MOVEREPLICA, (req, rsp, h) -> {
Map<String, Object> map = req.getParams().required().getAll(null,
@ -911,6 +916,7 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
CollectionParams.FROM_NODE,
CollectionParams.SOURCE_NODE,
CollectionParams.TARGET_NODE,
WAIT_FOR_FINAL_STATE,
"replica",
"shard");
}),

View File

@ -0,0 +1,119 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.cloud;
import java.lang.invoke.MethodHandles;
import java.util.Collection;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.response.RequestStatusState;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.util.LogLevel;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
*
*/
@LogLevel("org.apache.solr.cloud=DEBUG")
public class AddReplicaTest extends SolrCloudTestCase {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@BeforeClass
public static void setupCluster() throws Exception {
configureCluster(4)
.addConfig("conf1", TEST_PATH().resolve("configsets").resolve("cloud-minimal").resolve("conf"))
.configure();
}
@Test
public void test() throws Exception {
cluster.waitForAllNodes(5000);
String collection = "addreplicatest_coll";
CloudSolrClient cloudClient = cluster.getSolrClient();
CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(collection, "conf1", 2, 1);
create.setMaxShardsPerNode(2);
cloudClient.request(create);
ClusterState clusterState = cloudClient.getZkStateReader().getClusterState();
DocCollection coll = clusterState.getCollection(collection);
String sliceName = coll.getSlices().iterator().next().getName();
Collection<Replica> replicas = coll.getSlice(sliceName).getReplicas();
CollectionAdminRequest.AddReplica addReplica = CollectionAdminRequest.addReplicaToShard(collection, sliceName);
addReplica.processAsync("000", cloudClient);
CollectionAdminRequest.RequestStatus requestStatus = CollectionAdminRequest.requestStatus("000");
CollectionAdminRequest.RequestStatusResponse rsp = requestStatus.process(cloudClient);
assertTrue(rsp.getRequestStatus() != RequestStatusState.COMPLETED);
// wait for async request success
boolean success = false;
for (int i = 0; i < 200; i++) {
rsp = requestStatus.process(cloudClient);
if (rsp.getRequestStatus() == RequestStatusState.COMPLETED) {
success = true;
break;
}
assertFalse(rsp.getRequestStatus() == RequestStatusState.FAILED);
Thread.sleep(500);
}
assertTrue(success);
Collection<Replica> replicas2 = cloudClient.getZkStateReader().getClusterState().getCollection(collection).getSlice(sliceName).getReplicas();
replicas2.removeAll(replicas);
assertEquals(1, replicas2.size());
Replica r = replicas2.iterator().next();
assertTrue(r.toString(), r.getState() != Replica.State.ACTIVE);
// use waitForFinalState
addReplica.setWaitForFinalState(true);
addReplica.processAsync("001", cloudClient);
requestStatus = CollectionAdminRequest.requestStatus("001");
rsp = requestStatus.process(cloudClient);
assertTrue(rsp.getRequestStatus() != RequestStatusState.COMPLETED);
// wait for async request success
success = false;
for (int i = 0; i < 200; i++) {
rsp = requestStatus.process(cloudClient);
if (rsp.getRequestStatus() == RequestStatusState.COMPLETED) {
success = true;
break;
}
assertFalse(rsp.toString(), rsp.getRequestStatus() == RequestStatusState.FAILED);
Thread.sleep(500);
}
assertTrue(success);
// let the client watch fire
Thread.sleep(1000);
clusterState = cloudClient.getZkStateReader().getClusterState();
coll = clusterState.getCollection(collection);
Collection<Replica> replicas3 = coll.getSlice(sliceName).getReplicas();
replicas3.removeAll(replicas);
String replica2 = replicas2.iterator().next().getName();
assertEquals(2, replicas3.size());
for (Replica replica : replicas3) {
if (replica.getName().equals(replica2)) {
continue; // may be still recovering
}
assertTrue(coll.toString() + "\n" + replica.toString(), replica.getState() == Replica.State.ACTIVE);
}
}
}

View File

@ -273,12 +273,6 @@ public class MoveReplicaTest extends SolrCloudTestCase {
assertFalse(success);
Set<CollectionStateWatcher> newWatchers = new HashSet<>(accessor.getStateWatchers(coll));
for (Iterator<CollectionStateWatcher> it = newWatchers.iterator(); it.hasNext(); ) {
CollectionStateWatcher watcher = it.next();
if (watcher instanceof ReplaceNodeCmd.RecoveryWatcher) {
it.remove();
}
}
assertEquals(watchers, newWatchers);
}

View File

@ -22,6 +22,8 @@ import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
@ -82,6 +84,8 @@ public class ReplaceNodeTest extends SolrCloudTestCase {
);
create.setCreateNodeSet(StrUtils.join(l, ',')).setMaxShardsPerNode(3);
cloudClient.request(create);
DocCollection collection = cloudClient.getZkStateReader().getClusterState().getCollection(coll);
log.debug("### Before decommission: " + collection);
log.info("excluded_node : {} ", emptyNode);
createReplaceNodeRequest(node2bdecommissioned, emptyNode, null).processAsync("000", cloudClient);
CollectionAdminRequest.RequestStatus requestStatus = CollectionAdminRequest.requestStatus("000");
@ -101,8 +105,20 @@ public class ReplaceNodeTest extends SolrCloudTestCase {
assertTrue(status.getCoreStatus().size() == 0);
}
//let's do it back
createReplaceNodeRequest(emptyNode, node2bdecommissioned, Boolean.TRUE).processAsync("001", cloudClient);
Thread.sleep(5000);
collection = cloudClient.getZkStateReader().getClusterState().getCollection(coll);
log.debug("### After decommission: " + collection);
// check what are replica states on the decommissioned node
List<Replica> replicas = collection.getReplicas(node2bdecommissioned);
if (replicas == null) {
replicas = Collections.emptyList();
}
log.debug("### Existing replicas on decommissioned node: " + replicas);
//let's do it back - this time wait for recoveries
CollectionAdminRequest.AsyncCollectionAdminRequest replaceNodeRequest = createReplaceNodeRequest(emptyNode, node2bdecommissioned, Boolean.TRUE);
replaceNodeRequest.setWaitForFinalState(true);
replaceNodeRequest.processAsync("001", cloudClient);
requestStatus = CollectionAdminRequest.requestStatus("001");
for (int i = 0; i < 200; i++) {
@ -119,14 +135,35 @@ public class ReplaceNodeTest extends SolrCloudTestCase {
CoreAdminResponse status = CoreAdminRequest.getStatus(null, coreclient);
assertEquals("Expecting no cores but found some: " + status.getCoreStatus(), 0, status.getCoreStatus().size());
}
DocCollection collection = cloudClient.getZkStateReader().getClusterState().getCollection(coll);
collection = cloudClient.getZkStateReader().getClusterState().getCollection(coll);
assertEquals(create.getNumShards().intValue(), collection.getSlices().size());
for (Slice s:collection.getSlices()) {
assertEquals(create.getNumNrtReplicas().intValue(), s.getReplicas(EnumSet.of(Replica.Type.NRT)).size());
assertEquals(create.getNumTlogReplicas().intValue(), s.getReplicas(EnumSet.of(Replica.Type.TLOG)).size());
assertEquals(create.getNumPullReplicas().intValue(), s.getReplicas(EnumSet.of(Replica.Type.PULL)).size());
}
// make sure all newly created replicas on node are active
List<Replica> newReplicas = collection.getReplicas(node2bdecommissioned);
replicas.forEach(r -> {
for (Iterator<Replica> it = newReplicas.iterator(); it.hasNext(); ) {
Replica nr = it.next();
if (nr.getName().equals(r.getName())) {
it.remove();
}
}
});
assertFalse(newReplicas.isEmpty());
for (Replica r : newReplicas) {
assertEquals(r.toString(), Replica.State.ACTIVE, r.getState());
}
// make sure all replicas on emptyNode are not active
replicas = collection.getReplicas(emptyNode);
if (replicas != null) {
for (Replica r : replicas) {
assertFalse(r.toString(), Replica.State.ACTIVE.equals(r.getState()));
}
}
}
private CollectionAdminRequest.AsyncCollectionAdminRequest createReplaceNodeRequest(String sourceNode, String targetNode, Boolean parallel) {

View File

@ -120,6 +120,9 @@ public abstract class CollectionAdminRequest<T extends CollectionAdminResponse>
*/
public abstract static class AsyncCollectionAdminRequest extends CollectionAdminRequest<CollectionAdminResponse> {
protected String asyncId = null;
protected boolean waitForFinalState = false;
public AsyncCollectionAdminRequest(CollectionAction action) {
super(action);
}
@ -133,12 +136,14 @@ public abstract class CollectionAdminRequest<T extends CollectionAdminResponse>
return UUID.randomUUID().toString();
}
protected String asyncId = null;
public String getAsyncId() {
return asyncId;
}
public void setWaitForFinalState(boolean waitForFinalState) {
this.waitForFinalState = waitForFinalState;
}
public void setAsyncId(String asyncId) {
this.asyncId = asyncId;
}
@ -200,6 +205,9 @@ public abstract class CollectionAdminRequest<T extends CollectionAdminResponse>
if (asyncId != null) {
params.set(CommonAdminParams.ASYNC, asyncId);
}
if (waitForFinalState) {
params.set(CommonAdminParams.WAIT_FOR_FINAL_STATE, waitForFinalState);
}
return params;
}
}

View File

@ -20,6 +20,6 @@ public interface CommonAdminParams
{
/** async or not? **/
public static final String ASYNC = "async";
String ASYNC = "async";
String WAIT_FOR_FINAL_STATE = "waitForFinalState";
}

View File

@ -115,6 +115,11 @@
"async": {
"type": "string",
"description": "Defines a request ID that can be used to track this action after it's submitted. The action will be processed asynchronously."
},
"waitForFinalState": {
"type": "boolean",
"description": "If true then request will complete only when all affected replicas become active.",
"default": false
}
},
"required": [

View File

@ -37,6 +37,11 @@
"async": {
"type": "string",
"description": "Defines a request ID that can be used to track this action after it's submitted. The action will be processed asynchronously when this is defined. This command can be long-running, so running it asynchronously is recommended."
},
"waitForFinalState": {
"type": "boolean",
"description": "If true then request will complete only when all affected replicas become active.",
"default": false
}
}
},
@ -63,6 +68,11 @@
"async": {
"type": "string",
"description": "Defines a request ID that can be used to track this action after it's submitted. The action will be processed asynchronously when this is defined."
},
"waitForFinalState": {
"type": "boolean",
"description": "If true then request will complete only when all affected replicas become active.",
"default": false
}
},
"required":["shard"]
@ -106,6 +116,11 @@
"type": "string",
"enum":["NRT", "TLOG", "PULL"],
"description": "The type of replica to add. NRT (default), TLOG or PULL"
},
"waitForFinalState": {
"type": "boolean",
"description": "If true then request will complete only when all affected replicas become active.",
"default": false
}
},
"required":["shard"]