SOLR-12291: fixing premature completion of async tasks

* extract async tracking methods from OverseerCollectionMessageHandler into the separate class
* replacing hashmap to named list to avoid entry loss
This commit is contained in:
Mikhail Khludnev 2019-04-28 11:19:15 +03:00
parent 44efae15a9
commit 39ff3052c3
15 changed files with 418 additions and 314 deletions

View File

@ -198,6 +198,9 @@ Bug Fixes
* SOLR-5970: Return correct status upon collection creation failure (Abraham Elmahrek, Ishan Chattopadhyaya, * SOLR-5970: Return correct status upon collection creation failure (Abraham Elmahrek, Ishan Chattopadhyaya,
Jason Gerlowski, Kesharee Nandan Vishwakarma) Jason Gerlowski, Kesharee Nandan Vishwakarma)
* SOLR-12291: prematurely reporting not yet finished async Collections API call as completed
when collection's replicas are collocated at least at one node (Varun Thacker, Mikhail Khludnev)
Improvements Improvements
---------------------- ----------------------

View File

@ -18,13 +18,27 @@
package org.apache.solr.cloud.api.collections; package org.apache.solr.cloud.api.collections;
import static org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler.CREATE_NODE_SET;
import static org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler.SKIP_CREATE_REPLICA_IN_CLUSTER_STATE;
import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.CORE_NAME_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.NRT_REPLICAS;
import static org.apache.solr.common.cloud.ZkStateReader.PULL_REPLICAS;
import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.TLOG_REPLICAS;
import static org.apache.solr.common.params.CollectionAdminParams.COLL_CONF;
import static org.apache.solr.common.params.CollectionAdminParams.WITH_COLLECTION;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDREPLICA;
import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
import static org.apache.solr.common.params.CommonAdminParams.TIMEOUT;
import static org.apache.solr.common.params.CommonAdminParams.WAIT_FOR_FINAL_STATE;
import java.io.IOException; import java.io.IOException;
import java.lang.invoke.MethodHandles; import java.lang.invoke.MethodHandles;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.EnumMap; import java.util.EnumMap;
import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Locale; import java.util.Locale;
import java.util.Map; import java.util.Map;
@ -37,6 +51,7 @@ import org.apache.solr.client.solrj.cloud.SolrCloudManager;
import org.apache.solr.client.solrj.cloud.autoscaling.PolicyHelper; import org.apache.solr.client.solrj.cloud.autoscaling.PolicyHelper;
import org.apache.solr.cloud.ActiveReplicaWatcher; import org.apache.solr.cloud.ActiveReplicaWatcher;
import org.apache.solr.cloud.Overseer; import org.apache.solr.cloud.Overseer;
import org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler.ShardRequestTracker;
import org.apache.solr.common.SolrCloseableLatch; import org.apache.solr.common.SolrCloseableLatch;
import org.apache.solr.common.SolrException; import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.ClusterState; import org.apache.solr.common.cloud.ClusterState;
@ -56,21 +71,6 @@ import org.apache.solr.handler.component.ShardHandler;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import static org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler.CREATE_NODE_SET;
import static org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler.SKIP_CREATE_REPLICA_IN_CLUSTER_STATE;
import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.CORE_NAME_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.NRT_REPLICAS;
import static org.apache.solr.common.cloud.ZkStateReader.PULL_REPLICAS;
import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.TLOG_REPLICAS;
import static org.apache.solr.common.params.CollectionAdminParams.COLL_CONF;
import static org.apache.solr.common.params.CollectionAdminParams.WITH_COLLECTION;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDREPLICA;
import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
import static org.apache.solr.common.params.CommonAdminParams.TIMEOUT;
import static org.apache.solr.common.params.CommonAdminParams.WAIT_FOR_FINAL_STATE;
public class AddReplicaCmd implements OverseerCollectionMessageHandler.Cmd { public class AddReplicaCmd implements OverseerCollectionMessageHandler.Cmd {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@ -157,17 +157,16 @@ public class AddReplicaCmd implements OverseerCollectionMessageHandler.Cmd {
ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler(); ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler();
ZkStateReader zkStateReader = ocmh.zkStateReader; ZkStateReader zkStateReader = ocmh.zkStateReader;
// For tracking async calls.
Map<String,String> requestMap = new HashMap<>();
final ShardRequestTracker shardRequestTracker = ocmh.asyncRequestTracker(asyncId);
for (CreateReplica createReplica : createReplicas) { for (CreateReplica createReplica : createReplicas) {
assert createReplica.coreName != null; assert createReplica.coreName != null;
ModifiableSolrParams params = getReplicaParams(clusterState, message, results, collectionName, coll, skipCreateReplicaInClusterState, asyncId, shardHandler, createReplica); ModifiableSolrParams params = getReplicaParams(clusterState, message, results, collectionName, coll, skipCreateReplicaInClusterState, asyncId, shardHandler, createReplica);
ocmh.sendShardRequest(createReplica.node, params, shardHandler, asyncId, requestMap); shardRequestTracker.sendShardRequest(createReplica.node, params, shardHandler);
} }
Runnable runnable = () -> { Runnable runnable = () -> {
ocmh.processResponses(results, shardHandler, true, "ADDREPLICA failed to create replica", asyncId, requestMap); shardRequestTracker.processResponses(results, shardHandler, true, "ADDREPLICA failed to create replica");
for (CreateReplica replica : createReplicas) { for (CreateReplica replica : createReplicas) {
ocmh.waitForCoreNodeName(collectionName, replica.node, replica.coreName); ocmh.waitForCoreNodeName(collectionName, replica.node, replica.coreName);
} }

View File

@ -16,17 +16,21 @@
*/ */
package org.apache.solr.cloud.api.collections; package org.apache.solr.cloud.api.collections;
import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.CORE_NAME_PROP;
import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
import static org.apache.solr.common.params.CommonParams.NAME;
import java.lang.invoke.MethodHandles; import java.lang.invoke.MethodHandles;
import java.net.URI; import java.net.URI;
import java.time.Instant; import java.time.Instant;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional; import java.util.Optional;
import java.util.Properties; import java.util.Properties;
import org.apache.lucene.util.Version; import org.apache.lucene.util.Version;
import org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler.ShardRequestTracker;
import org.apache.solr.common.SolrException; import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode; import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.cloud.ClusterState; import org.apache.solr.common.cloud.ClusterState;
@ -51,11 +55,6 @@ import org.apache.solr.handler.component.ShardHandler;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.CORE_NAME_PROP;
import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
import static org.apache.solr.common.params.CommonParams.NAME;
public class BackupCmd implements OverseerCollectionMessageHandler.Cmd { public class BackupCmd implements OverseerCollectionMessageHandler.Cmd {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@ -162,7 +161,6 @@ public class BackupCmd implements OverseerCollectionMessageHandler.Cmd {
String asyncId = request.getStr(ASYNC); String asyncId = request.getStr(ASYNC);
String repoName = request.getStr(CoreAdminParams.BACKUP_REPOSITORY); String repoName = request.getStr(CoreAdminParams.BACKUP_REPOSITORY);
ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler(ocmh.overseer.getCoreContainer().getUpdateShardHandler().getDefaultHttpClient()); ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler(ocmh.overseer.getCoreContainer().getUpdateShardHandler().getDefaultHttpClient());
Map<String, String> requestMap = new HashMap<>();
String commitName = request.getStr(CoreAdminParams.COMMIT_NAME); String commitName = request.getStr(CoreAdminParams.COMMIT_NAME);
Optional<CollectionSnapshotMetaData> snapshotMeta = Optional.empty(); Optional<CollectionSnapshotMetaData> snapshotMeta = Optional.empty();
@ -187,6 +185,7 @@ public class BackupCmd implements OverseerCollectionMessageHandler.Cmd {
shardsToConsider = snapshotMeta.get().getShards(); shardsToConsider = snapshotMeta.get().getShards();
} }
final ShardRequestTracker shardRequestTracker = ocmh.asyncRequestTracker(asyncId);
for (Slice slice : ocmh.zkStateReader.getClusterState().getCollection(collectionName).getActiveSlices()) { for (Slice slice : ocmh.zkStateReader.getClusterState().getCollection(collectionName).getActiveSlices()) {
Replica replica = null; Replica replica = null;
@ -217,11 +216,11 @@ public class BackupCmd implements OverseerCollectionMessageHandler.Cmd {
params.set(CoreAdminParams.COMMIT_NAME, snapshotMeta.get().getName()); params.set(CoreAdminParams.COMMIT_NAME, snapshotMeta.get().getName());
} }
ocmh.sendShardRequest(replica.getNodeName(), params, shardHandler, asyncId, requestMap); shardRequestTracker.sendShardRequest(replica.getNodeName(), params, shardHandler);
log.debug("Sent backup request to core={} for backupName={}", coreName, backupName); log.debug("Sent backup request to core={} for backupName={}", coreName, backupName);
} }
log.debug("Sent backup requests to all shard leaders for backupName={}", backupName); log.debug("Sent backup requests to all shard leaders for backupName={}", backupName);
ocmh.processResponses(results, shardHandler, true, "Could not backup all shards", asyncId, requestMap); shardRequestTracker.processResponses(results, shardHandler, true, "Could not backup all shards");
} }
} }

View File

@ -42,6 +42,7 @@ import org.apache.solr.client.solrj.cloud.autoscaling.PolicyHelper;
import org.apache.solr.client.solrj.cloud.autoscaling.VersionedData; import org.apache.solr.client.solrj.cloud.autoscaling.VersionedData;
import org.apache.solr.cloud.Overseer; import org.apache.solr.cloud.Overseer;
import org.apache.solr.cloud.ZkController; import org.apache.solr.cloud.ZkController;
import org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler.ShardRequestTracker;
import org.apache.solr.cloud.overseer.ClusterStateMutator; import org.apache.solr.cloud.overseer.ClusterStateMutator;
import org.apache.solr.common.SolrException; import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode; import org.apache.solr.common.SolrException.ErrorCode;
@ -199,10 +200,7 @@ public class CreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd
return; return;
} }
// For tracking async calls. final ShardRequestTracker shardRequestTracker = ocmh.asyncRequestTracker(async);
Map<String, String> requestMap = new HashMap<>();
log.debug(formatString("Creating SolrCores for new collection {0}, shardNames {1} , message : {2}", log.debug(formatString("Creating SolrCores for new collection {0}, shardNames {1} , message : {2}",
collectionName, shardNames, message)); collectionName, shardNames, message));
Map<String,ShardRequest> coresToCreate = new LinkedHashMap<>(); Map<String,ShardRequest> coresToCreate = new LinkedHashMap<>();
@ -264,7 +262,7 @@ public class CreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd
if (async != null) { if (async != null) {
String coreAdminAsyncId = async + Math.abs(System.nanoTime()); String coreAdminAsyncId = async + Math.abs(System.nanoTime());
params.add(ASYNC, coreAdminAsyncId); params.add(ASYNC, coreAdminAsyncId);
requestMap.put(nodeName, coreAdminAsyncId); shardRequestTracker.track(nodeName, coreAdminAsyncId);
} }
ocmh.addPropertyParams(message, params); ocmh.addPropertyParams(message, params);
@ -293,7 +291,7 @@ public class CreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd
} }
} }
ocmh.processResponses(results, shardHandler, false, null, async, requestMap, Collections.emptySet()); shardRequestTracker.processResponses(results, shardHandler, false, null, Collections.emptySet());
boolean failure = results.get("failure") != null && ((SimpleOrderedMap)results.get("failure")).size() > 0; boolean failure = results.get("failure") != null && ((SimpleOrderedMap)results.get("failure")).size() > 0;
if (failure) { if (failure) {
// Let's cleanup as we hit an exception // Let's cleanup as we hit an exception

View File

@ -30,6 +30,7 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler.ShardRequestTracker;
import org.apache.solr.common.SolrException; import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode; import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.cloud.ClusterState; import org.apache.solr.common.cloud.ClusterState;
@ -83,11 +84,11 @@ public class CreateSnapshotCmd implements OverseerCollectionMessageHandler.Cmd {
SolrSnapshotManager.createCollectionLevelSnapshot(zkClient, collectionName, new CollectionSnapshotMetaData(commitName)); SolrSnapshotManager.createCollectionLevelSnapshot(zkClient, collectionName, new CollectionSnapshotMetaData(commitName));
log.info("Created a ZK path to store snapshot information for collection={} with commitName={}", collectionName, commitName); log.info("Created a ZK path to store snapshot information for collection={} with commitName={}", collectionName, commitName);
Map<String, String> requestMap = new HashMap<>();
NamedList shardRequestResults = new NamedList(); NamedList shardRequestResults = new NamedList();
Map<String, Slice> shardByCoreName = new HashMap<>(); Map<String, Slice> shardByCoreName = new HashMap<>();
ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler(ocmh.overseer.getCoreContainer().getUpdateShardHandler().getDefaultHttpClient()); ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler(ocmh.overseer.getCoreContainer().getUpdateShardHandler().getDefaultHttpClient());
final ShardRequestTracker shardRequestTracker = ocmh.asyncRequestTracker(asyncId);
for (Slice slice : ocmh.zkStateReader.getClusterState().getCollection(collectionName).getSlices()) { for (Slice slice : ocmh.zkStateReader.getClusterState().getCollection(collectionName).getSlices()) {
for (Replica replica : slice.getReplicas()) { for (Replica replica : slice.getReplicas()) {
if (replica.getState() != State.ACTIVE) { if (replica.getState() != State.ACTIVE) {
@ -103,7 +104,7 @@ public class CreateSnapshotCmd implements OverseerCollectionMessageHandler.Cmd {
params.set(CORE_NAME_PROP, coreName); params.set(CORE_NAME_PROP, coreName);
params.set(CoreAdminParams.COMMIT_NAME, commitName); params.set(CoreAdminParams.COMMIT_NAME, commitName);
ocmh.sendShardRequest(replica.getNodeName(), params, shardHandler, asyncId, requestMap); shardRequestTracker.sendShardRequest(replica.getNodeName(), params, shardHandler);
log.debug("Sent createsnapshot request to core={} with commitName={}", coreName, commitName); log.debug("Sent createsnapshot request to core={} with commitName={}", coreName, commitName);
shardByCoreName.put(coreName, slice); shardByCoreName.put(coreName, slice);
@ -115,7 +116,7 @@ public class CreateSnapshotCmd implements OverseerCollectionMessageHandler.Cmd {
// This is to take care of the situation where e.g. entire shard is unavailable. // This is to take care of the situation where e.g. entire shard is unavailable.
Set<String> failedShards = new HashSet<>(); Set<String> failedShards = new HashSet<>();
ocmh.processResponses(shardRequestResults, shardHandler, false, null, asyncId, requestMap); shardRequestTracker.processResponses(shardRequestResults, shardHandler, false, null);
NamedList success = (NamedList) shardRequestResults.get("success"); NamedList success = (NamedList) shardRequestResults.get("success");
List<CoreSnapshotMetaData> replicas = new ArrayList<>(); List<CoreSnapshotMetaData> replicas = new ArrayList<>();
if (success != null) { if (success != null) {

View File

@ -18,8 +18,13 @@
package org.apache.solr.cloud.api.collections; package org.apache.solr.cloud.api.collections;
import static org.apache.solr.common.params.CollectionAdminParams.COLOCATED_WITH;
import static org.apache.solr.common.params.CollectionAdminParams.WITH_COLLECTION;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETE;
import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
import static org.apache.solr.common.params.CommonParams.NAME;
import java.lang.invoke.MethodHandles; import java.lang.invoke.MethodHandles;
import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -51,12 +56,6 @@ import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import static org.apache.solr.common.params.CollectionAdminParams.COLOCATED_WITH;
import static org.apache.solr.common.params.CollectionAdminParams.WITH_COLLECTION;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETE;
import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
import static org.apache.solr.common.params.CommonParams.NAME;
public class DeleteCollectionCmd implements OverseerCollectionMessageHandler.Cmd { public class DeleteCollectionCmd implements OverseerCollectionMessageHandler.Cmd {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private final OverseerCollectionMessageHandler ocmh; private final OverseerCollectionMessageHandler ocmh;
@ -115,15 +114,11 @@ public class DeleteCollectionCmd implements OverseerCollectionMessageHandler.Cmd
params.set(CoreAdminParams.DELETE_METRICS_HISTORY, deleteHistory); params.set(CoreAdminParams.DELETE_METRICS_HISTORY, deleteHistory);
String asyncId = message.getStr(ASYNC); String asyncId = message.getStr(ASYNC);
Map<String, String> requestMap = null;
if (asyncId != null) {
requestMap = new HashMap<>();
}
Set<String> okayExceptions = new HashSet<>(1); Set<String> okayExceptions = new HashSet<>(1);
okayExceptions.add(NonExistentCoreException.class.getName()); okayExceptions.add(NonExistentCoreException.class.getName());
List<Replica> failedReplicas = ocmh.collectionCmd(message, params, results, null, asyncId, requestMap, okayExceptions); List<Replica> failedReplicas = ocmh.collectionCmd(message, params, results, null, asyncId, okayExceptions);
for (Replica failedReplica : failedReplicas) { for (Replica failedReplica : failedReplicas) {
boolean isSharedFS = failedReplica.getBool(ZkStateReader.SHARED_STORAGE_PROP, false) && failedReplica.get("dataDir") != null; boolean isSharedFS = failedReplica.getBool(ZkStateReader.SHARED_STORAGE_PROP, false) && failedReplica.get("dataDir") != null;
if (isSharedFS) { if (isSharedFS) {

View File

@ -16,6 +16,12 @@
*/ */
package org.apache.solr.cloud.api.collections; package org.apache.solr.cloud.api.collections;
import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.REPLICA_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
import static org.apache.solr.common.params.CollectionAdminParams.COUNT_PROP;
import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
import java.lang.invoke.MethodHandles; import java.lang.invoke.MethodHandles;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
@ -24,9 +30,9 @@ import java.util.HashSet;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler.Cmd; import org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler.Cmd;
import org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler.ShardRequestTracker;
import org.apache.solr.common.SolrException; import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.ClusterState; import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection; import org.apache.solr.common.cloud.DocCollection;
@ -44,12 +50,6 @@ import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.REPLICA_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
import static org.apache.solr.common.params.CollectionAdminParams.COUNT_PROP;
import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
public class DeleteReplicaCmd implements Cmd { public class DeleteReplicaCmd implements Cmd {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@ -223,10 +223,6 @@ public class DeleteReplicaCmd implements Cmd {
ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler(ocmh.overseer.getCoreContainer().getUpdateShardHandler().getDefaultHttpClient()); ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler(ocmh.overseer.getCoreContainer().getUpdateShardHandler().getDefaultHttpClient());
String core = replica.getStr(ZkStateReader.CORE_NAME_PROP); String core = replica.getStr(ZkStateReader.CORE_NAME_PROP);
String asyncId = message.getStr(ASYNC); String asyncId = message.getStr(ASYNC);
AtomicReference<Map<String, String>> requestMap = new AtomicReference<>(null);
if (asyncId != null) {
requestMap.set(new HashMap<>(1, 1.0f));
}
ModifiableSolrParams params = new ModifiableSolrParams(); ModifiableSolrParams params = new ModifiableSolrParams();
params.add(CoreAdminParams.ACTION, CoreAdminParams.CoreAdminAction.UNLOAD.toString()); params.add(CoreAdminParams.ACTION, CoreAdminParams.CoreAdminAction.UNLOAD.toString());
@ -238,14 +234,15 @@ public class DeleteReplicaCmd implements Cmd {
params.set(CoreAdminParams.DELETE_METRICS_HISTORY, message.getBool(CoreAdminParams.DELETE_METRICS_HISTORY, true)); params.set(CoreAdminParams.DELETE_METRICS_HISTORY, message.getBool(CoreAdminParams.DELETE_METRICS_HISTORY, true));
boolean isLive = ocmh.zkStateReader.getClusterState().getLiveNodes().contains(replica.getNodeName()); boolean isLive = ocmh.zkStateReader.getClusterState().getLiveNodes().contains(replica.getNodeName());
final ShardRequestTracker shardRequestTracker = ocmh.asyncRequestTracker(asyncId);
if (isLive) { if (isLive) {
ocmh.sendShardRequest(replica.getNodeName(), params, shardHandler, asyncId, requestMap.get()); shardRequestTracker.sendShardRequest(replica.getNodeName(), params, shardHandler);
} }
Callable<Boolean> callable = () -> { Callable<Boolean> callable = () -> {
try { try {
if (isLive) { if (isLive) {
ocmh.processResponses(results, shardHandler, false, null, asyncId, requestMap.get()); shardRequestTracker.processResponses(results, shardHandler, false, null);
//check if the core unload removed the corenode zk entry //check if the core unload removed the corenode zk entry
if (ocmh.waitForCoreNodeGone(collectionName, shard, replicaName, 30000)) return Boolean.TRUE; if (ocmh.waitForCoreNodeGone(collectionName, shard, replicaName, 30000)) return Boolean.TRUE;

View File

@ -23,21 +23,20 @@ import static org.apache.solr.common.params.CommonParams.NAME;
import java.lang.invoke.MethodHandles; import java.lang.invoke.MethodHandles;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.Optional; import java.util.Optional;
import java.util.Set; import java.util.Set;
import org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler.ShardRequestTracker;
import org.apache.solr.common.SolrException; import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode; import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.cloud.ClusterState; import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.Replica; import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Replica.State;
import org.apache.solr.common.cloud.Slice; import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.SolrZkClient; import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkNodeProps; import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.Replica.State;
import org.apache.solr.common.params.CoreAdminParams; import org.apache.solr.common.params.CoreAdminParams;
import org.apache.solr.common.params.CoreAdminParams.CoreAdminAction; import org.apache.solr.common.params.CoreAdminParams.CoreAdminAction;
import org.apache.solr.common.params.ModifiableSolrParams; import org.apache.solr.common.params.ModifiableSolrParams;
@ -68,7 +67,6 @@ public class DeleteSnapshotCmd implements OverseerCollectionMessageHandler.Cmd {
String collectionName = ocmh.zkStateReader.getAliases().resolveSimpleAlias(extCollectionName); String collectionName = ocmh.zkStateReader.getAliases().resolveSimpleAlias(extCollectionName);
String commitName = message.getStr(CoreAdminParams.COMMIT_NAME); String commitName = message.getStr(CoreAdminParams.COMMIT_NAME);
String asyncId = message.getStr(ASYNC); String asyncId = message.getStr(ASYNC);
Map<String, String> requestMap = new HashMap<>();
NamedList shardRequestResults = new NamedList(); NamedList shardRequestResults = new NamedList();
ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler(ocmh.overseer.getCoreContainer().getUpdateShardHandler().getDefaultHttpClient()); ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler(ocmh.overseer.getCoreContainer().getUpdateShardHandler().getDefaultHttpClient());
SolrZkClient zkClient = ocmh.zkStateReader.getZkClient(); SolrZkClient zkClient = ocmh.zkStateReader.getZkClient();
@ -94,6 +92,7 @@ public class DeleteSnapshotCmd implements OverseerCollectionMessageHandler.Cmd {
} }
} }
final ShardRequestTracker shardRequestTracker = ocmh.asyncRequestTracker(asyncId);
log.info("Existing cores with snapshot for collection={} are {}", collectionName, existingCores); log.info("Existing cores with snapshot for collection={} are {}", collectionName, existingCores);
for (Slice slice : ocmh.zkStateReader.getClusterState().getCollection(collectionName).getSlices()) { for (Slice slice : ocmh.zkStateReader.getClusterState().getCollection(collectionName).getSlices()) {
for (Replica replica : slice.getReplicas()) { for (Replica replica : slice.getReplicas()) {
@ -114,12 +113,12 @@ public class DeleteSnapshotCmd implements OverseerCollectionMessageHandler.Cmd {
params.set(CoreAdminParams.COMMIT_NAME, commitName); params.set(CoreAdminParams.COMMIT_NAME, commitName);
log.info("Sending deletesnapshot request to core={} with commitName={}", coreName, commitName); log.info("Sending deletesnapshot request to core={} with commitName={}", coreName, commitName);
ocmh.sendShardRequest(replica.getNodeName(), params, shardHandler, asyncId, requestMap); shardRequestTracker.sendShardRequest(replica.getNodeName(), params, shardHandler);
} }
} }
} }
ocmh.processResponses(shardRequestResults, shardHandler, false, null, asyncId, requestMap); shardRequestTracker.processResponses(shardRequestResults, shardHandler, false, null);
NamedList success = (NamedList) shardRequestResults.get("success"); NamedList success = (NamedList) shardRequestResults.get("success");
List<CoreSnapshotMetaData> replicas = new ArrayList<>(); List<CoreSnapshotMetaData> replicas = new ArrayList<>();
if (success != null) { if (success != null) {

View File

@ -25,6 +25,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.solr.client.solrj.request.CoreAdminRequest; import org.apache.solr.client.solrj.request.CoreAdminRequest;
import org.apache.solr.cloud.Overseer; import org.apache.solr.cloud.Overseer;
import org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler.ShardRequestTracker;
import org.apache.solr.cloud.overseer.OverseerAction; import org.apache.solr.cloud.overseer.OverseerAction;
import org.apache.solr.common.SolrException; import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.ClusterState; import org.apache.solr.common.cloud.ClusterState;
@ -163,8 +164,6 @@ public class MigrateCmd implements OverseerCollectionMessageHandler.Cmd {
log.info("Common hash range between source shard: {} and target shard: {} = " + splitRange, sourceSlice.getName(), targetSlice.getName()); log.info("Common hash range between source shard: {} and target shard: {} = " + splitRange, sourceSlice.getName(), targetSlice.getName());
Replica targetLeader = zkStateReader.getLeaderRetry(targetCollection.getName(), targetSlice.getName(), 10000); Replica targetLeader = zkStateReader.getLeaderRetry(targetCollection.getName(), targetSlice.getName(), 10000);
// For tracking async calls.
Map<String, String> requestMap = new HashMap<>();
log.info("Asking target leader node: " + targetLeader.getNodeName() + " core: " log.info("Asking target leader node: " + targetLeader.getNodeName() + " core: "
+ targetLeader.getStr("core") + " to buffer updates"); + targetLeader.getStr("core") + " to buffer updates");
@ -172,10 +171,12 @@ public class MigrateCmd implements OverseerCollectionMessageHandler.Cmd {
params.set(CoreAdminParams.ACTION, CoreAdminParams.CoreAdminAction.REQUESTBUFFERUPDATES.toString()); params.set(CoreAdminParams.ACTION, CoreAdminParams.CoreAdminAction.REQUESTBUFFERUPDATES.toString());
params.set(CoreAdminParams.NAME, targetLeader.getStr("core")); params.set(CoreAdminParams.NAME, targetLeader.getStr("core"));
ocmh.sendShardRequest(targetLeader.getNodeName(), params, shardHandler, asyncId, requestMap); {
final ShardRequestTracker shardRequestTracker = ocmh.asyncRequestTracker(asyncId);
ocmh.processResponses(results, shardHandler, true, "MIGRATE failed to request node to buffer updates", asyncId, requestMap); shardRequestTracker.sendShardRequest(targetLeader.getNodeName(), params, shardHandler);
shardRequestTracker.processResponses(results, shardHandler, true, "MIGRATE failed to request node to buffer updates");
}
ZkNodeProps m = new ZkNodeProps( ZkNodeProps m = new ZkNodeProps(
Overseer.QUEUE_OPERATION, OverseerAction.ADDROUTINGRULE.toLower(), Overseer.QUEUE_OPERATION, OverseerAction.ADDROUTINGRULE.toLower(),
COLLECTION_PROP, sourceCollection.getName(), COLLECTION_PROP, sourceCollection.getName(),
@ -246,11 +247,16 @@ public class MigrateCmd implements OverseerCollectionMessageHandler.Cmd {
cmd.setState(Replica.State.ACTIVE); cmd.setState(Replica.State.ACTIVE);
cmd.setCheckLive(true); cmd.setCheckLive(true);
cmd.setOnlyIfLeader(true); cmd.setOnlyIfLeader(true);
{
final ShardRequestTracker syncRequestTracker = ocmh.syncRequestTracker();
// we don't want this to happen asynchronously // we don't want this to happen asynchronously
ocmh.sendShardRequest(tempSourceLeader.getNodeName(), new ModifiableSolrParams(cmd.getParams()), shardHandler, null, null); syncRequestTracker.sendShardRequest(tempSourceLeader.getNodeName(), new ModifiableSolrParams(cmd.getParams()),
shardHandler);
ocmh.processResponses(results, shardHandler, true, "MIGRATE failed to create temp collection leader" + syncRequestTracker.processResponses(results, shardHandler, true,
" or timed out waiting for it to come up", asyncId, requestMap); "MIGRATE failed to create temp collection leader" +
" or timed out waiting for it to come up");
}
log.info("Asking source leader to split index"); log.info("Asking source leader to split index");
params = new ModifiableSolrParams(); params = new ModifiableSolrParams();
@ -262,9 +268,11 @@ public class MigrateCmd implements OverseerCollectionMessageHandler.Cmd {
String tempNodeName = sourceLeader.getNodeName(); String tempNodeName = sourceLeader.getNodeName();
ocmh.sendShardRequest(tempNodeName, params, shardHandler, asyncId, requestMap); {
ocmh.processResponses(results, shardHandler, true, "MIGRATE failed to invoke SPLIT core admin command", asyncId, requestMap); final ShardRequestTracker shardRequestTracker = ocmh.asyncRequestTracker(asyncId);
shardRequestTracker.sendShardRequest(tempNodeName, params, shardHandler);
shardRequestTracker.processResponses(results, shardHandler, true, "MIGRATE failed to invoke SPLIT core admin command");
}
log.info("Creating a replica of temporary collection: {} on the target leader node: {}", log.info("Creating a replica of temporary collection: {} on the target leader node: {}",
tempSourceCollectionName, targetLeader.getNodeName()); tempSourceCollectionName, targetLeader.getNodeName());
String tempCollectionReplica2 = Assign.buildSolrCoreName(ocmh.overseer.getSolrCloudManager().getDistribStateManager(), String tempCollectionReplica2 = Assign.buildSolrCoreName(ocmh.overseer.getSolrCloudManager().getDistribStateManager(),
@ -287,9 +295,11 @@ public class MigrateCmd implements OverseerCollectionMessageHandler.Cmd {
} }
((AddReplicaCmd)ocmh.commandMap.get(ADDREPLICA)).addReplica(clusterState, new ZkNodeProps(props), results, null); ((AddReplicaCmd)ocmh.commandMap.get(ADDREPLICA)).addReplica(clusterState, new ZkNodeProps(props), results, null);
ocmh.processResponses(results, shardHandler, true, "MIGRATE failed to create replica of " + {
"temporary collection in target leader node.", asyncId, requestMap); final ShardRequestTracker syncRequestTracker = ocmh.syncRequestTracker();
syncRequestTracker.processResponses(results, shardHandler, true, "MIGRATE failed to create replica of " +
"temporary collection in target leader node.");
}
coreNodeName = ocmh.waitForCoreNodeName(tempSourceCollectionName, coreNodeName = ocmh.waitForCoreNodeName(tempSourceCollectionName,
targetLeader.getNodeName(), tempCollectionReplica2); targetLeader.getNodeName(), tempCollectionReplica2);
// wait for the replicas to be seen as active on temp source leader // wait for the replicas to be seen as active on temp source leader
@ -303,11 +313,13 @@ public class MigrateCmd implements OverseerCollectionMessageHandler.Cmd {
cmd.setOnlyIfLeader(true); cmd.setOnlyIfLeader(true);
params = new ModifiableSolrParams(cmd.getParams()); params = new ModifiableSolrParams(cmd.getParams());
ocmh.sendShardRequest(tempSourceLeader.getNodeName(), params, shardHandler, asyncId, requestMap); {
final ShardRequestTracker shardRequestTracker = ocmh.asyncRequestTracker(asyncId);
ocmh.processResponses(results, shardHandler, true, "MIGRATE failed to create temp collection" + shardRequestTracker.sendShardRequest(tempSourceLeader.getNodeName(), params, shardHandler);
" replica or timed out waiting for them to come up", asyncId, requestMap);
shardRequestTracker.processResponses(results, shardHandler, true, "MIGRATE failed to create temp collection" +
" replica or timed out waiting for them to come up");
}
log.info("Successfully created replica of temp source collection on target leader node"); log.info("Successfully created replica of temp source collection on target leader node");
log.info("Requesting merge of temp source collection replica to target leader"); log.info("Requesting merge of temp source collection replica to target leader");
@ -316,20 +328,24 @@ public class MigrateCmd implements OverseerCollectionMessageHandler.Cmd {
params.set(CoreAdminParams.CORE, targetLeader.getStr("core")); params.set(CoreAdminParams.CORE, targetLeader.getStr("core"));
params.set(CoreAdminParams.SRC_CORE, tempCollectionReplica2); params.set(CoreAdminParams.SRC_CORE, tempCollectionReplica2);
ocmh.sendShardRequest(targetLeader.getNodeName(), params, shardHandler, asyncId, requestMap); {
final ShardRequestTracker shardRequestTracker = ocmh.asyncRequestTracker(asyncId);
shardRequestTracker.sendShardRequest(targetLeader.getNodeName(), params, shardHandler);
String msg = "MIGRATE failed to merge " + tempCollectionReplica2 + " to " String msg = "MIGRATE failed to merge " + tempCollectionReplica2 + " to "
+ targetLeader.getStr("core") + " on node: " + targetLeader.getNodeName(); + targetLeader.getStr("core") + " on node: " + targetLeader.getNodeName();
ocmh.processResponses(results, shardHandler, true, msg, asyncId, requestMap); shardRequestTracker.processResponses(results, shardHandler, true, msg);
}
log.info("Asking target leader to apply buffered updates"); log.info("Asking target leader to apply buffered updates");
params = new ModifiableSolrParams(); params = new ModifiableSolrParams();
params.set(CoreAdminParams.ACTION, CoreAdminParams.CoreAdminAction.REQUESTAPPLYUPDATES.toString()); params.set(CoreAdminParams.ACTION, CoreAdminParams.CoreAdminAction.REQUESTAPPLYUPDATES.toString());
params.set(CoreAdminParams.NAME, targetLeader.getStr("core")); params.set(CoreAdminParams.NAME, targetLeader.getStr("core"));
ocmh.sendShardRequest(targetLeader.getNodeName(), params, shardHandler, asyncId, requestMap); {
ocmh.processResponses(results, shardHandler, true, "MIGRATE failed to request node to apply buffered updates", final ShardRequestTracker shardRequestTracker = ocmh.asyncRequestTracker(asyncId);
asyncId, requestMap); shardRequestTracker.sendShardRequest(targetLeader.getNodeName(), params, shardHandler);
shardRequestTracker.processResponses(results, shardHandler, true, "MIGRATE failed to request node to apply buffered updates");
}
try { try {
log.info("Deleting temporary collection: " + tempSourceCollectionName); log.info("Deleting temporary collection: " + tempSourceCollectionName);
props = makeMap( props = makeMap(

View File

@ -307,11 +307,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
params.set(CoreAdminParams.ACTION, CoreAdminAction.RELOAD.toString()); params.set(CoreAdminParams.ACTION, CoreAdminAction.RELOAD.toString());
String asyncId = message.getStr(ASYNC); String asyncId = message.getStr(ASYNC);
Map<String, String> requestMap = null; collectionCmd(message, params, results, Replica.State.ACTIVE, asyncId);
if (asyncId != null) {
requestMap = new HashMap<>();
}
collectionCmd(message, params, results, Replica.State.ACTIVE, asyncId, requestMap);
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@ -596,34 +592,6 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
} }
} }
void sendShardRequest(String nodeName, ModifiableSolrParams params,
ShardHandler shardHandler, String asyncId,
Map<String, String> requestMap) {
sendShardRequest(nodeName, params, shardHandler, asyncId, requestMap, adminPath, zkStateReader);
}
public void sendShardRequest(String nodeName, ModifiableSolrParams params, ShardHandler shardHandler,
String asyncId, Map<String, String> requestMap, String adminPath,
ZkStateReader zkStateReader) {
if (asyncId != null) {
String coreAdminAsyncId = asyncId + Math.abs(System.nanoTime());
params.set(ASYNC, coreAdminAsyncId);
requestMap.put(nodeName, coreAdminAsyncId);
}
ShardRequest sreq = new ShardRequest();
params.set("qt", adminPath);
sreq.purpose = 1;
String replica = zkStateReader.getBaseUrlForNodeName(nodeName);
sreq.shards = new String[]{replica};
sreq.actualShards = sreq.shards;
sreq.nodeName = nodeName;
sreq.params = params;
shardHandler.submit(sreq, replica, sreq.params);
}
void addPropertyParams(ZkNodeProps message, ModifiableSolrParams params) { void addPropertyParams(ZkNodeProps message, ModifiableSolrParams params) {
// Now add the property.key=value pairs // Now add the property.key=value pairs
for (String key : message.keySet()) { for (String key : message.keySet()) {
@ -740,38 +708,6 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
return ((AddReplicaCmd) commandMap.get(ADDREPLICA)).addReplica(clusterState, message, results, onComplete); return ((AddReplicaCmd) commandMap.get(ADDREPLICA)).addReplica(clusterState, message, results, onComplete);
} }
void processResponses(NamedList results, ShardHandler shardHandler, boolean abortOnError, String msgOnError,
String asyncId, Map<String, String> requestMap) {
processResponses(results, shardHandler, abortOnError, msgOnError, asyncId, requestMap, Collections.emptySet());
}
void processResponses(NamedList results, ShardHandler shardHandler, boolean abortOnError, String msgOnError,
String asyncId, Map<String, String> requestMap, Set<String> okayExceptions) {
//Processes all shard responses
ShardResponse srsp;
do {
srsp = shardHandler.takeCompletedOrError();
if (srsp != null) {
processResponse(results, srsp, okayExceptions);
Throwable exception = srsp.getException();
if (abortOnError && exception != null) {
// drain pending requests
while (srsp != null) {
srsp = shardHandler.takeCompletedOrError();
}
throw new SolrException(ErrorCode.SERVER_ERROR, msgOnError, exception);
}
}
} while (srsp != null);
//If request is async wait for the core admin to complete before returning
if (asyncId != null) {
waitForAsyncCallsToComplete(requestMap, results);
requestMap.clear();
}
}
void validateConfigOrThrowSolrException(String configName) throws IOException, KeeperException, InterruptedException { void validateConfigOrThrowSolrException(String configName) throws IOException, KeeperException, InterruptedException {
boolean isValid = cloudManager.getDistribStateManager().hasData(ZkConfigManager.CONFIGS_ZKNODE + "/" + configName); boolean isValid = cloudManager.getDistribStateManager().hasData(ZkConfigManager.CONFIGS_ZKNODE + "/" + configName);
if(!isValid) { if(!isValid) {
@ -804,8 +740,8 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
} }
private List<Replica> collectionCmd(ZkNodeProps message, ModifiableSolrParams params, private List<Replica> collectionCmd(ZkNodeProps message, ModifiableSolrParams params,
NamedList results, Replica.State stateMatcher, String asyncId, Map<String, String> requestMap) { NamedList<Object> results, Replica.State stateMatcher, String asyncId) {
return collectionCmd( message, params, results, stateMatcher, asyncId, requestMap, Collections.emptySet()); return collectionCmd( message, params, results, stateMatcher, asyncId, Collections.emptySet());
} }
/** /**
@ -813,47 +749,25 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
* @return List of replicas which is not live for receiving the request * @return List of replicas which is not live for receiving the request
*/ */
List<Replica> collectionCmd(ZkNodeProps message, ModifiableSolrParams params, List<Replica> collectionCmd(ZkNodeProps message, ModifiableSolrParams params,
NamedList results, Replica.State stateMatcher, String asyncId, Map<String, String> requestMap, Set<String> okayExceptions) { NamedList<Object> results, Replica.State stateMatcher, String asyncId, Set<String> okayExceptions) {
log.info("Executing Collection Cmd={}, asyncId={}", params, asyncId); log.info("Executing Collection Cmd={}, asyncId={}", params, asyncId);
String collectionName = message.getStr(NAME); String collectionName = message.getStr(NAME);
@SuppressWarnings("deprecation")
ShardHandler shardHandler = shardHandlerFactory.getShardHandler(overseer.getCoreContainer().getUpdateShardHandler().getDefaultHttpClient()); ShardHandler shardHandler = shardHandlerFactory.getShardHandler(overseer.getCoreContainer().getUpdateShardHandler().getDefaultHttpClient());
ClusterState clusterState = zkStateReader.getClusterState(); ClusterState clusterState = zkStateReader.getClusterState();
DocCollection coll = clusterState.getCollection(collectionName); DocCollection coll = clusterState.getCollection(collectionName);
List<Replica> notLivesReplicas = new ArrayList<>(); List<Replica> notLivesReplicas = new ArrayList<>();
final ShardRequestTracker shardRequestTracker = new ShardRequestTracker(asyncId);
for (Slice slice : coll.getSlices()) { for (Slice slice : coll.getSlices()) {
notLivesReplicas.addAll(sliceCmd(clusterState, params, stateMatcher, slice, shardHandler, asyncId, requestMap)); notLivesReplicas.addAll(shardRequestTracker.sliceCmd(clusterState, params, stateMatcher, slice, shardHandler));
} }
processResponses(results, shardHandler, false, null, asyncId, requestMap, okayExceptions); shardRequestTracker.processResponses(results, shardHandler, false, null, okayExceptions);
return notLivesReplicas; return notLivesReplicas;
} }
/** private void processResponse(NamedList<Object> results, ShardResponse srsp, Set<String> okayExceptions) {
* Send request to all replicas of a slice
* @return List of replicas which is not live for receiving the request
*/
List<Replica> sliceCmd(ClusterState clusterState, ModifiableSolrParams params, Replica.State stateMatcher,
Slice slice, ShardHandler shardHandler, String asyncId, Map<String, String> requestMap) {
List<Replica> notLiveReplicas = new ArrayList<>();
for (Replica replica : slice.getReplicas()) {
if ((stateMatcher == null || Replica.State.getState(replica.getStr(ZkStateReader.STATE_PROP)) == stateMatcher)) {
if (clusterState.liveNodesContain(replica.getStr(ZkStateReader.NODE_NAME_PROP))) {
// For thread safety, only simple clone the ModifiableSolrParams
ModifiableSolrParams cloneParams = new ModifiableSolrParams();
cloneParams.add(params);
cloneParams.set(CoreAdminParams.CORE, replica.getStr(ZkStateReader.CORE_NAME_PROP));
sendShardRequest(replica.getStr(ZkStateReader.NODE_NAME_PROP), cloneParams, shardHandler, asyncId, requestMap);
} else {
notLiveReplicas.add(replica);
}
}
}
return notLiveReplicas;
}
private void processResponse(NamedList results, ShardResponse srsp, Set<String> okayExceptions) {
Throwable e = srsp.getException(); Throwable e = srsp.getException();
String nodeName = srsp.getNodeName(); String nodeName = srsp.getNodeName();
SolrResponse solrResponse = srsp.getSolrResponse(); SolrResponse solrResponse = srsp.getSolrResponse();
@ -862,8 +776,8 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
processResponse(results, e, nodeName, solrResponse, shard, okayExceptions); processResponse(results, e, nodeName, solrResponse, shard, okayExceptions);
} }
@SuppressWarnings("unchecked") @SuppressWarnings("deprecation")
private void processResponse(NamedList results, Throwable e, String nodeName, SolrResponse solrResponse, String shard, Set<String> okayExceptions) { private void processResponse(NamedList<Object> results, Throwable e, String nodeName, SolrResponse solrResponse, String shard, Set<String> okayExceptions) {
String rootThrowable = null; String rootThrowable = null;
if (e instanceof RemoteSolrException) { if (e instanceof RemoteSolrException) {
rootThrowable = ((RemoteSolrException) e).getRootThrowable(); rootThrowable = ((RemoteSolrException) e).getRootThrowable();
@ -897,30 +811,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
success.add(key, value); success.add(key, value);
} }
/* private NamedList<Object> waitForCoreAdminAsyncCallToComplete(String nodeName, String requestId) {
* backward compatibility reasons, add the response with the async ID as top level.
* This can be removed in Solr 9
*/
@Deprecated
public final static boolean INCLUDE_TOP_LEVEL_RESPONSE = true;
@SuppressWarnings("unchecked")
private void waitForAsyncCallsToComplete(Map<String, String> requestMap, NamedList results) {
for (String k:requestMap.keySet()) {
log.debug("I am Waiting for :{}/{}", k, requestMap.get(k));
NamedList reqResult = waitForCoreAdminAsyncCallToComplete(k, requestMap.get(k));
if (INCLUDE_TOP_LEVEL_RESPONSE) {
results.add(requestMap.get(k), reqResult);
}
if ("failed".equalsIgnoreCase(((String)reqResult.get("STATUS")))) {
log.error("Error from shard {}: {}", k, reqResult);
addFailure(results, k, reqResult);
} else {
addSuccess(results, k, reqResult);
}
}
}
private NamedList waitForCoreAdminAsyncCallToComplete(String nodeName, String requestId) {
ShardHandler shardHandler = shardHandlerFactory.getShardHandler(); ShardHandler shardHandler = shardHandlerFactory.getShardHandler();
ModifiableSolrParams params = new ModifiableSolrParams(); ModifiableSolrParams params = new ModifiableSolrParams();
params.set(CoreAdminParams.ACTION, CoreAdminAction.REQUESTSTATUS.toString()); params.set(CoreAdminParams.ACTION, CoreAdminAction.REQUESTSTATUS.toString());
@ -942,10 +833,10 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
do { do {
srsp = shardHandler.takeCompletedOrError(); srsp = shardHandler.takeCompletedOrError();
if (srsp != null) { if (srsp != null) {
NamedList results = new NamedList(); NamedList<Object> results = new NamedList<>();
processResponse(results, srsp, Collections.emptySet()); processResponse(results, srsp, Collections.emptySet());
if (srsp.getSolrResponse().getResponse() == null) { if (srsp.getSolrResponse().getResponse() == null) {
NamedList response = new NamedList(); NamedList<Object> response = new NamedList<>();
response.add("STATUS", "failed"); response.add("STATUS", "failed");
return response; return response;
} }
@ -1045,4 +936,130 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
protected interface Cmd { protected interface Cmd {
void call(ClusterState state, ZkNodeProps message, NamedList results) throws Exception; void call(ClusterState state, ZkNodeProps message, NamedList results) throws Exception;
} }
/*
* backward compatibility reasons, add the response with the async ID as top level.
* This can be removed in Solr 9
*/
@Deprecated
static boolean INCLUDE_TOP_LEVEL_RESPONSE = true;
public ShardRequestTracker syncRequestTracker() {
return new ShardRequestTracker(null);
}
public ShardRequestTracker asyncRequestTracker(String asyncId) {
return new ShardRequestTracker(asyncId);
}
public class ShardRequestTracker{
private final String asyncId;
private final NamedList<String> shardAsyncIdByNode = new NamedList<String>();
private ShardRequestTracker(String asyncId) {
this.asyncId = asyncId;
}
/**
* Send request to all replicas of a slice
* @return List of replicas which is not live for receiving the request
*/
public List<Replica> sliceCmd(ClusterState clusterState, ModifiableSolrParams params, Replica.State stateMatcher,
Slice slice, ShardHandler shardHandler) {
List<Replica> notLiveReplicas = new ArrayList<>();
for (Replica replica : slice.getReplicas()) {
if ((stateMatcher == null || Replica.State.getState(replica.getStr(ZkStateReader.STATE_PROP)) == stateMatcher)) {
if (clusterState.liveNodesContain(replica.getStr(ZkStateReader.NODE_NAME_PROP))) {
// For thread safety, only simple clone the ModifiableSolrParams
ModifiableSolrParams cloneParams = new ModifiableSolrParams();
cloneParams.add(params);
cloneParams.set(CoreAdminParams.CORE, replica.getStr(ZkStateReader.CORE_NAME_PROP));
sendShardRequest(replica.getStr(ZkStateReader.NODE_NAME_PROP), cloneParams, shardHandler);
} else {
notLiveReplicas.add(replica);
}
}
}
return notLiveReplicas;
}
public void sendShardRequest(String nodeName, ModifiableSolrParams params,
ShardHandler shardHandler) {
sendShardRequest(nodeName, params, shardHandler, adminPath, zkStateReader);
}
public void sendShardRequest(String nodeName, ModifiableSolrParams params, ShardHandler shardHandler,
String adminPath, ZkStateReader zkStateReader) {
if (asyncId != null) {
String coreAdminAsyncId = asyncId + Math.abs(System.nanoTime());
params.set(ASYNC, coreAdminAsyncId);
track(nodeName, coreAdminAsyncId);
}
ShardRequest sreq = new ShardRequest();
params.set("qt", adminPath);
sreq.purpose = 1;
String replica = zkStateReader.getBaseUrlForNodeName(nodeName);
sreq.shards = new String[] {replica};
sreq.actualShards = sreq.shards;
sreq.nodeName = nodeName;
sreq.params = params;
shardHandler.submit(sreq, replica, sreq.params);
}
void processResponses(NamedList<Object> results, ShardHandler shardHandler, boolean abortOnError, String msgOnError) {
processResponses(results, shardHandler, abortOnError, msgOnError, Collections.emptySet());
}
void processResponses(NamedList<Object> results, ShardHandler shardHandler, boolean abortOnError, String msgOnError,
Set<String> okayExceptions) {
// Processes all shard responses
ShardResponse srsp;
do {
srsp = shardHandler.takeCompletedOrError();
if (srsp != null) {
processResponse(results, srsp, okayExceptions);
Throwable exception = srsp.getException();
if (abortOnError && exception != null) {
// drain pending requests
while (srsp != null) {
srsp = shardHandler.takeCompletedOrError();
}
throw new SolrException(ErrorCode.SERVER_ERROR, msgOnError, exception);
}
}
} while (srsp != null);
// If request is async wait for the core admin to complete before returning
if (asyncId != null) {
waitForAsyncCallsToComplete(results);
shardAsyncIdByNode.clear();
}
}
private void waitForAsyncCallsToComplete(NamedList<Object> results) {
for (Map.Entry<String,String> nodeToAsync:shardAsyncIdByNode) {
final String node = nodeToAsync.getKey();
final String shardAsyncId = nodeToAsync.getValue();
log.debug("I am Waiting for :{}/{}", node, shardAsyncId);
NamedList<Object> reqResult = waitForCoreAdminAsyncCallToComplete(node, shardAsyncId);
if (INCLUDE_TOP_LEVEL_RESPONSE) {
results.add(shardAsyncId, reqResult);
}
if ("failed".equalsIgnoreCase(((String)reqResult.get("STATUS")))) {
log.error("Error from shard {}: {}", node, reqResult);
addFailure(results, node, reqResult);
} else {
addSuccess(results, node, reqResult);
}
}
}
/** @deprecated consider to make it private after {@link CreateCollectionCmd} refactoring*/
@Deprecated void track(String nodeName, String coreAdminAsyncId) {
shardAsyncIdByNode.add(nodeName, coreAdminAsyncId);
}
}
} }

View File

@ -52,6 +52,7 @@ import java.util.concurrent.TimeoutException;
import org.apache.solr.client.solrj.cloud.autoscaling.PolicyHelper; import org.apache.solr.client.solrj.cloud.autoscaling.PolicyHelper;
import org.apache.solr.cloud.Overseer; import org.apache.solr.cloud.Overseer;
import org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler.ShardRequestTracker;
import org.apache.solr.cloud.overseer.OverseerAction; import org.apache.solr.cloud.overseer.OverseerAction;
import org.apache.solr.common.SolrException; import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode; import org.apache.solr.common.SolrException.ErrorCode;
@ -95,7 +96,6 @@ public class RestoreCmd implements OverseerCollectionMessageHandler.Cmd {
ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler(ocmh.overseer.getCoreContainer().getUpdateShardHandler().getDefaultHttpClient()); ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler(ocmh.overseer.getCoreContainer().getUpdateShardHandler().getDefaultHttpClient());
String asyncId = message.getStr(ASYNC); String asyncId = message.getStr(ASYNC);
String repo = message.getStr(CoreAdminParams.BACKUP_REPOSITORY); String repo = message.getStr(CoreAdminParams.BACKUP_REPOSITORY);
Map<String, String> requestMap = new HashMap<>();
CoreContainer cc = ocmh.overseer.getCoreContainer(); CoreContainer cc = ocmh.overseer.getCoreContainer();
BackupRepository repository = cc.newBackupRepository(Optional.ofNullable(repo)); BackupRepository repository = cc.newBackupRepository(Optional.ofNullable(repo));
@ -318,36 +318,42 @@ public class RestoreCmd implements OverseerCollectionMessageHandler.Cmd {
//refresh the location copy of collection state //refresh the location copy of collection state
restoreCollection = zkStateReader.getClusterState().getCollection(restoreCollectionName); restoreCollection = zkStateReader.getClusterState().getCollection(restoreCollectionName);
//Copy data from backed up index to each replica {
ShardRequestTracker shardRequestTracker = ocmh.asyncRequestTracker(asyncId);
// Copy data from backed up index to each replica
for (Slice slice : restoreCollection.getSlices()) { for (Slice slice : restoreCollection.getSlices()) {
ModifiableSolrParams params = new ModifiableSolrParams(); ModifiableSolrParams params = new ModifiableSolrParams();
params.set(CoreAdminParams.ACTION, CoreAdminParams.CoreAdminAction.RESTORECORE.toString()); params.set(CoreAdminParams.ACTION, CoreAdminParams.CoreAdminAction.RESTORECORE.toString());
params.set(NAME, "snapshot." + slice.getName()); params.set(NAME, "snapshot." + slice.getName());
params.set(CoreAdminParams.BACKUP_LOCATION, backupPath.toASCIIString()); params.set(CoreAdminParams.BACKUP_LOCATION, backupPath.toASCIIString());
params.set(CoreAdminParams.BACKUP_REPOSITORY, repo); params.set(CoreAdminParams.BACKUP_REPOSITORY, repo);
ocmh.sliceCmd(clusterState, params, null, slice, shardHandler, asyncId, requestMap); shardRequestTracker.sliceCmd(clusterState, params, null, slice, shardHandler);
}
shardRequestTracker.processResponses(new NamedList(), shardHandler, true, "Could not restore core");
} }
ocmh.processResponses(new NamedList(), shardHandler, true, "Could not restore core", asyncId, requestMap);
{
ShardRequestTracker shardRequestTracker = ocmh.asyncRequestTracker(asyncId);
for (Slice s: restoreCollection.getSlices()) { for (Slice s : restoreCollection.getSlices()) {
for (Replica r : s.getReplicas()) { for (Replica r : s.getReplicas()) {
String nodeName = r.getNodeName(); String nodeName = r.getNodeName();
String coreNodeName = r.getCoreName(); String coreNodeName = r.getCoreName();
Replica.State stateRep = r.getState(); Replica.State stateRep = r.getState();
log.debug("Calling REQUESTAPPLYUPDATES on: nodeName={}, coreNodeName={}, state={}" log.debug("Calling REQUESTAPPLYUPDATES on: nodeName={}, coreNodeName={}, state={}", nodeName, coreNodeName,
, nodeName, coreNodeName, stateRep.name()); stateRep.name());
ModifiableSolrParams params = new ModifiableSolrParams(); ModifiableSolrParams params = new ModifiableSolrParams();
params.set(CoreAdminParams.ACTION, CoreAdminParams.CoreAdminAction.REQUESTAPPLYUPDATES.toString()); params.set(CoreAdminParams.ACTION, CoreAdminParams.CoreAdminAction.REQUESTAPPLYUPDATES.toString());
params.set(CoreAdminParams.NAME, coreNodeName); params.set(CoreAdminParams.NAME, coreNodeName);
ocmh.sendShardRequest(nodeName, params, shardHandler, asyncId, requestMap); shardRequestTracker.sendShardRequest(nodeName, params, shardHandler);
} }
ocmh.processResponses(new NamedList(), shardHandler, true, "REQUESTAPPLYUPDATES calls did not succeed", asyncId, requestMap); shardRequestTracker.processResponses(new NamedList(), shardHandler, true,
"REQUESTAPPLYUPDATES calls did not succeed");
}
} }
//Mark all shards in ACTIVE STATE //Mark all shards in ACTIVE STATE

View File

@ -40,6 +40,7 @@ import org.apache.solr.client.solrj.cloud.autoscaling.Variable.Type;
import org.apache.solr.client.solrj.cloud.autoscaling.VersionedData; import org.apache.solr.client.solrj.cloud.autoscaling.VersionedData;
import org.apache.solr.client.solrj.request.CoreAdminRequest; import org.apache.solr.client.solrj.request.CoreAdminRequest;
import org.apache.solr.cloud.Overseer; import org.apache.solr.cloud.Overseer;
import org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler.ShardRequestTracker;
import org.apache.solr.cloud.overseer.OverseerAction; import org.apache.solr.cloud.overseer.OverseerAction;
import org.apache.solr.common.SolrException; import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.ClusterState; import org.apache.solr.common.cloud.ClusterState;
@ -92,12 +93,13 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd {
this.ocmh = ocmh; this.ocmh = ocmh;
} }
@SuppressWarnings("unchecked")
@Override @Override
public void call(ClusterState state, ZkNodeProps message, NamedList results) throws Exception { public void call(ClusterState state, ZkNodeProps message, NamedList results) throws Exception {
split(state, message, results); split(state, message,(NamedList<Object>) results);
} }
public boolean split(ClusterState clusterState, ZkNodeProps message, NamedList results) throws Exception { public boolean split(ClusterState clusterState, ZkNodeProps message, NamedList<Object> results) throws Exception {
boolean waitForFinalState = message.getBool(CommonAdminParams.WAIT_FOR_FINAL_STATE, false); boolean waitForFinalState = message.getBool(CommonAdminParams.WAIT_FOR_FINAL_STATE, false);
String methodStr = message.getStr(CommonAdminParams.SPLIT_METHOD, SolrIndexSplitter.SplitMethod.REWRITE.toLower()); String methodStr = message.getStr(CommonAdminParams.SPLIT_METHOD, SolrIndexSplitter.SplitMethod.REWRITE.toLower());
SolrIndexSplitter.SplitMethod splitMethod = SolrIndexSplitter.SplitMethod.get(methodStr); SolrIndexSplitter.SplitMethod splitMethod = SolrIndexSplitter.SplitMethod.get(methodStr);
@ -233,7 +235,6 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd {
} }
final String asyncId = message.getStr(ASYNC); final String asyncId = message.getStr(ASYNC);
Map<String, String> requestMap = new HashMap<>();
String nodeName = parentShardLeader.getNodeName(); String nodeName = parentShardLeader.getNodeName();
t = timings.sub("createSubSlicesAndLeadersInState"); t = timings.sub("createSubSlicesAndLeadersInState");
@ -285,12 +286,16 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd {
ocmh.addReplica(clusterState, new ZkNodeProps(propMap), results, null); ocmh.addReplica(clusterState, new ZkNodeProps(propMap), results, null);
} }
@SuppressWarnings("deprecation")
ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler(ocmh.overseer.getCoreContainer().getUpdateShardHandler().getDefaultHttpClient()); ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler(ocmh.overseer.getCoreContainer().getUpdateShardHandler().getDefaultHttpClient());
{
ocmh.processResponses(results, shardHandler, true, "SPLITSHARD failed to create subshard leaders", asyncId, requestMap); final ShardRequestTracker syncRequestTracker = ocmh.syncRequestTracker();
syncRequestTracker.processResponses(results, shardHandler, true, "SPLITSHARD failed to create subshard leaders");
}
t.stop(); t.stop();
t = timings.sub("waitForSubSliceLeadersAlive"); t = timings.sub("waitForSubSliceLeadersAlive");
{
final ShardRequestTracker shardRequestTracker = ocmh.asyncRequestTracker(asyncId);
for (String subShardName : subShardNames) { for (String subShardName : subShardNames) {
// wait for parent leader to acknowledge the sub-shard core // wait for parent leader to acknowledge the sub-shard core
log.debug("Asking parent leader to wait for: " + subShardName + " to be alive on: " + nodeName); log.debug("Asking parent leader to wait for: " + subShardName + " to be alive on: " + nodeName);
@ -304,11 +309,11 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd {
cmd.setOnlyIfLeader(true); cmd.setOnlyIfLeader(true);
ModifiableSolrParams p = new ModifiableSolrParams(cmd.getParams()); ModifiableSolrParams p = new ModifiableSolrParams(cmd.getParams());
ocmh.sendShardRequest(nodeName, p, shardHandler, asyncId, requestMap); shardRequestTracker.sendShardRequest(nodeName, p, shardHandler);
} }
ocmh.processResponses(results, shardHandler, true, "SPLITSHARD timed out waiting for subshard leaders to come up", shardRequestTracker.processResponses(results, shardHandler, true, "SPLITSHARD timed out waiting for subshard leaders to come up");
asyncId, requestMap); }
t.stop(); t.stop();
log.debug("Successfully created all sub-shards for collection " + collectionName + " parent shard: " + slice log.debug("Successfully created all sub-shards for collection " + collectionName + " parent shard: " + slice
@ -328,17 +333,21 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd {
params.set(CoreAdminParams.RANGES, rangesStr); params.set(CoreAdminParams.RANGES, rangesStr);
t = timings.sub("splitParentCore"); t = timings.sub("splitParentCore");
{
final ShardRequestTracker shardRequestTracker = ocmh.asyncRequestTracker(asyncId);
shardRequestTracker.sendShardRequest(parentShardLeader.getNodeName(), params, shardHandler);
ocmh.sendShardRequest(parentShardLeader.getNodeName(), params, shardHandler, asyncId, requestMap); shardRequestTracker.processResponses(results, shardHandler, true, "SPLITSHARD failed to invoke SPLIT core admin command");
}
ocmh.processResponses(results, shardHandler, true, "SPLITSHARD failed to invoke SPLIT core admin command", asyncId,
requestMap);
t.stop(); t.stop();
log.debug("Index on shard: {} split into {} successfully", nodeName, subShardNames.size()); log.debug("Index on shard: {} split into {} successfully", nodeName, subShardNames.size());
t = timings.sub("applyBufferedUpdates"); t = timings.sub("applyBufferedUpdates");
// apply buffered updates on sub-shards // apply buffered updates on sub-shards
{
final ShardRequestTracker shardRequestTracker = ocmh.asyncRequestTracker(asyncId);
for (int i = 0; i < subShardNames.size(); i++) { for (int i = 0; i < subShardNames.size(); i++) {
String subShardName = subShardNames.get(i); String subShardName = subShardNames.get(i);
@ -348,11 +357,13 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd {
params.set(CoreAdminParams.ACTION, CoreAdminParams.CoreAdminAction.REQUESTAPPLYUPDATES.toString()); params.set(CoreAdminParams.ACTION, CoreAdminParams.CoreAdminAction.REQUESTAPPLYUPDATES.toString());
params.set(CoreAdminParams.NAME, subShardName); params.set(CoreAdminParams.NAME, subShardName);
ocmh.sendShardRequest(nodeName, params, shardHandler, asyncId, requestMap); shardRequestTracker.sendShardRequest(nodeName, params, shardHandler);
} }
ocmh.processResponses(results, shardHandler, true, "SPLITSHARD failed while asking sub shard leaders" + shardRequestTracker.processResponses(results, shardHandler, true,
" to apply buffered updates", asyncId, requestMap); "SPLITSHARD failed while asking sub shard leaders" +
" to apply buffered updates");
}
t.stop(); t.stop();
log.debug("Successfully applied buffered updates on : " + subShardNames); log.debug("Successfully applied buffered updates on : " + subShardNames);
@ -507,7 +518,10 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd {
assert TestInjection.injectSplitFailureAfterReplicaCreation(); assert TestInjection.injectSplitFailureAfterReplicaCreation();
ocmh.processResponses(results, shardHandler, true, "SPLITSHARD failed to create subshard replicas", asyncId, requestMap); {
final ShardRequestTracker syncRequestTracker = ocmh.syncRequestTracker();
syncRequestTracker.processResponses(results, shardHandler, true, "SPLITSHARD failed to create subshard replicas");
}
t.stop(); t.stop();
log.info("Successfully created all replica shards for all sub-slices " + subSlices); log.info("Successfully created all replica shards for all sub-slices " + subSlices);
@ -639,7 +653,7 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd {
props.put(SHARD_ID_PROP, subSlice); props.put(SHARD_ID_PROP, subSlice);
ZkNodeProps m = new ZkNodeProps(props); ZkNodeProps m = new ZkNodeProps(props);
try { try {
ocmh.commandMap.get(DELETESHARD).call(clusterState, m, new NamedList()); ocmh.commandMap.get(DELETESHARD).call(clusterState, m, new NamedList<Object>());
} catch (Exception e) { } catch (Exception e) {
log.warn("Cleanup failed after failed split of " + collectionName + "/" + parentShard + ": (deleting existing sub shard " + subSlice + ")", e); log.warn("Cleanup failed after failed split of " + collectionName + "/" + parentShard + ": (deleting existing sub shard " + subSlice + ")", e);
} }

View File

@ -14,32 +14,51 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.solr.cloud; package org.apache.solr.cloud.api.collections;
import org.apache.solr.client.solrj.request.CollectionAdminRequest; import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.request.CollectionAdminRequest.Create;
import org.apache.solr.client.solrj.response.CollectionAdminResponse; import org.apache.solr.client.solrj.response.CollectionAdminResponse;
import org.apache.solr.client.solrj.response.RequestStatusState; import org.apache.solr.client.solrj.response.RequestStatusState;
import org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler; import org.apache.solr.cloud.AbstractFullDistribZkTestBase;
import org.apache.solr.cloud.SolrCloudTestCase;
import org.apache.solr.common.util.NamedList; import org.apache.solr.common.util.NamedList;
import org.junit.AfterClass;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import org.junit.Test; import org.junit.Test;
public class AsyncCallRequestStatusResponseTest extends SolrCloudTestCase { public class AsyncCallRequestStatusResponseTest extends SolrCloudTestCase {
private static boolean oldResponseEntries;
@SuppressWarnings("deprecation")
@BeforeClass @BeforeClass
public static void setupCluster() throws Exception { public static void setupCluster() throws Exception {
oldResponseEntries = OverseerCollectionMessageHandler.INCLUDE_TOP_LEVEL_RESPONSE;
OverseerCollectionMessageHandler.INCLUDE_TOP_LEVEL_RESPONSE = random().nextBoolean();
configureCluster(2) configureCluster(2)
.addConfig("conf", configset("cloud-minimal")) .addConfig("conf", configset("cloud-minimal"))
.configure(); .configure();
} }
@SuppressWarnings("deprecation")
@AfterClass
public static void restoreFlag() throws Exception {
OverseerCollectionMessageHandler.INCLUDE_TOP_LEVEL_RESPONSE = oldResponseEntries;
}
@SuppressWarnings("deprecation")
@Test @Test
public void testAsyncCallStatusResponse() throws Exception { public void testAsyncCallStatusResponse() throws Exception {
int numShards = 4;
int numReplicas = 1;
Create createCollection = CollectionAdminRequest.createCollection("asynccall", "conf", numShards, numReplicas);
createCollection.setMaxShardsPerNode(100);
String asyncId = String asyncId =
CollectionAdminRequest.createCollection("asynccall", "conf", 2, 1).processAsync(cluster.getSolrClient()); createCollection.processAsync(cluster.getSolrClient());
waitForState("Expected collection 'asynccall' to have 2 shards and 1 replica", "asynccall", clusterShape(2, 2)); waitForState("Expected collection 'asynccall' to have "+numShards+" shards and "+
numShards*numReplicas+" replica", "asynccall", clusterShape(numShards, numShards*numReplicas));
RequestStatusState state = AbstractFullDistribZkTestBase.getRequestStateAfterCompletion(asyncId, 30, cluster.getSolrClient()); RequestStatusState state = AbstractFullDistribZkTestBase.getRequestStateAfterCompletion(asyncId, 30, cluster.getSolrClient());
assertEquals("Unexpected request status: " + state, "completed", state.getKey()); assertEquals("Unexpected request status: " + state, "completed", state.getKey());
@ -48,13 +67,25 @@ public class AsyncCallRequestStatusResponseTest extends SolrCloudTestCase {
CollectionAdminResponse rsp = requestStatus.process(cluster.getSolrClient()); CollectionAdminResponse rsp = requestStatus.process(cluster.getSolrClient());
NamedList<?> r = rsp.getResponse(); NamedList<?> r = rsp.getResponse();
if (OverseerCollectionMessageHandler.INCLUDE_TOP_LEVEL_RESPONSE) { if (OverseerCollectionMessageHandler.INCLUDE_TOP_LEVEL_RESPONSE) {
assertEquals("Expected 5 elements in the response" + r, 5, r.size()); final int actualNumOfElems = 3+(numShards*numReplicas);
// responseHeader, success, status, + old responses per every replica
assertEquals("Expected "+actualNumOfElems+" elements in the response" + r.jsonStr(),
actualNumOfElems, r.size());
} else { } else {
assertEquals("Expected 3 elements in the response" + r, 3, r.size()); // responseHeader, success, status
assertEquals("Expected 3 elements in the response" + r.jsonStr(), 3, r.size());
} }
assertNotNull("Expected 'responseHeader' response" + r, r.get("responseHeader")); assertNotNull("Expected 'responseHeader' response" + r, r.get("responseHeader"));
assertNotNull("Expected 'success' response" + r, r.get("success"));
assertNotNull("Expected 'status' response" + r, r.get("status")); assertNotNull("Expected 'status' response" + r, r.get("status"));
assertEquals("Expected 4 elements in the success element" + r.get("success"), 4, ((NamedList<?>)r.get("success")).size()); {
final NamedList<?> success = (NamedList<?>)r.get("success");
assertNotNull("Expected 'success' response" + r, success);
final int actualSuccessElems = 2*(numShards*numReplicas);
// every replica responds once on submit and once on complete
assertEquals("Expected "+actualSuccessElems+
" elements in the success element" + success.jsonStr(),
actualSuccessElems, success.size());
}
} }
} }

View File

@ -17,8 +17,9 @@
package org.apache.solr.cloud.api.collections; package org.apache.solr.cloud.api.collections;
import java.io.IOException; import java.io.IOException;
import java.util.Arrays;
import java.util.Map;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.SolrServerException; import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.HttpSolrClient; import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.request.QueryRequest; import org.apache.solr.client.solrj.request.QueryRequest;
@ -44,8 +45,10 @@ public class TestRequestStatusCollectionAPI extends BasicDistributedZkTest {
params.set(CollectionParams.ACTION, CollectionParams.CollectionAction.CREATE.toString()); params.set(CollectionParams.ACTION, CollectionParams.CollectionAction.CREATE.toString());
params.set("name", "collection2"); params.set("name", "collection2");
params.set("numShards", 2); int numShards = 2;
params.set("replicationFactor", 1); params.set("numShards", numShards);
int replicationFactor = 1;
params.set("replicationFactor", replicationFactor);
params.set("maxShardsPerNode", 100); params.set("maxShardsPerNode", 100);
params.set("collection.configName", "conf1"); params.set("collection.configName", "conf1");
params.set(CommonAdminParams.ASYNC, "1000"); params.set(CommonAdminParams.ASYNC, "1000");
@ -66,13 +69,17 @@ public class TestRequestStatusCollectionAPI extends BasicDistributedZkTest {
params.set("action", CollectionParams.CollectionAction.REQUESTSTATUS.toString()); params.set("action", CollectionParams.CollectionAction.REQUESTSTATUS.toString());
params.set(OverseerCollectionMessageHandler.REQUESTID, "1000"); params.set(OverseerCollectionMessageHandler.REQUESTID, "1000");
NamedList<Object> createResponse =null;
try { try {
message = sendStatusRequestWithRetry(params, MAX_WAIT_TIMEOUT_SECONDS); createResponse = sendStatusRequestWithRetry(params, MAX_WAIT_TIMEOUT_SECONDS);
message = (String) createResponse.findRecursive("status","msg");
} catch (SolrServerException | IOException e) { } catch (SolrServerException | IOException e) {
e.printStackTrace(); e.printStackTrace();
} }
assertEquals("found [1000] in completed tasks", message); assertEquals("found [1000] in completed tasks", message);
assertEquals("expecting "+numShards+" shard responses at "+createResponse,
numShards, numResponsesCompleted(createResponse));
// Check for a random (hopefully non-existent request id // Check for a random (hopefully non-existent request id
params = new ModifiableSolrParams(); params = new ModifiableSolrParams();
@ -103,13 +110,18 @@ public class TestRequestStatusCollectionAPI extends BasicDistributedZkTest {
params = new ModifiableSolrParams(); params = new ModifiableSolrParams();
params.set("action", CollectionParams.CollectionAction.REQUESTSTATUS.toString()); params.set("action", CollectionParams.CollectionAction.REQUESTSTATUS.toString());
params.set(OverseerCollectionMessageHandler.REQUESTID, "1001"); params.set(OverseerCollectionMessageHandler.REQUESTID, "1001");
NamedList<Object> splitResponse=null;
try { try {
message = sendStatusRequestWithRetry(params, MAX_WAIT_TIMEOUT_SECONDS); splitResponse = sendStatusRequestWithRetry(params, MAX_WAIT_TIMEOUT_SECONDS);
message = (String) splitResponse.findRecursive("status","msg");
} catch (SolrServerException | IOException e) { } catch (SolrServerException | IOException e) {
e.printStackTrace(); e.printStackTrace();
} }
assertEquals("found [1001] in completed tasks", message); assertEquals("found [1001] in completed tasks", message);
// create * 2 + preprecovery *2 + split + req_apply_upd * 2 =7
assertEquals("expecting "+(2+2+1+2)+" shard responses at "+splitResponse,
(2+2+1+2), numResponsesCompleted(splitResponse));
params = new ModifiableSolrParams(); params = new ModifiableSolrParams();
params.set(CollectionParams.ACTION, CollectionParams.CollectionAction.CREATE.toString()); params.set(CollectionParams.ACTION, CollectionParams.CollectionAction.CREATE.toString());
@ -131,12 +143,12 @@ public class TestRequestStatusCollectionAPI extends BasicDistributedZkTest {
params.set(OverseerCollectionMessageHandler.REQUESTID, "1002"); params.set(OverseerCollectionMessageHandler.REQUESTID, "1002");
try { try {
message = sendStatusRequestWithRetry(params, MAX_WAIT_TIMEOUT_SECONDS); NamedList<Object> response = sendStatusRequestWithRetry(params, MAX_WAIT_TIMEOUT_SECONDS);
message = (String) response.findRecursive("status","msg");
} catch (SolrServerException | IOException e) { } catch (SolrServerException | IOException e) {
e.printStackTrace(); e.printStackTrace();
} }
assertEquals("found [1002] in failed tasks", message); assertEquals("found [1002] in failed tasks", message);
params = new ModifiableSolrParams(); params = new ModifiableSolrParams();
@ -156,21 +168,38 @@ public class TestRequestStatusCollectionAPI extends BasicDistributedZkTest {
assertEquals("Task with the same requestid already exists.", r.get("error")); assertEquals("Task with the same requestid already exists.", r.get("error"));
} }
@SuppressWarnings("unchecked")
private int numResponsesCompleted(NamedList<Object> response) {
int sum=0;
for (String key: Arrays.asList("success","failure")) {
NamedList<Object> allStatuses = (NamedList<Object>)response.get(key);
if (allStatuses!=null) {
for (Map.Entry<String, Object> tuple: allStatuses) {
NamedList<Object> statusResponse = (NamedList<Object>) tuple.getValue();
if (statusResponse.indexOf("STATUS",0)>=0) {
sum+=1;
}
}
}
}
return sum;
}
/** /**
* Helper method to send a status request with specific retry limit and return * Helper method to send a status request with specific retry limit and return
* the message/null from the success response. * the message/null from the success response.
*/ */
private String sendStatusRequestWithRetry(ModifiableSolrParams params, int maxCounter) private NamedList<Object> sendStatusRequestWithRetry(ModifiableSolrParams params, int maxCounter)
throws SolrServerException, IOException{ throws SolrServerException, IOException{
String message = null; NamedList<Object> r = null;
while (maxCounter-- > 0) { while (maxCounter-- > 0) {
final NamedList r = sendRequest(params); r = sendRequest(params);
final NamedList status = (NamedList) r.get("status"); @SuppressWarnings("unchecked")
final NamedList<Object> status = (NamedList<Object>) r.get("status");
final RequestStatusState state = RequestStatusState.fromKey((String) status.get("state")); final RequestStatusState state = RequestStatusState.fromKey((String) status.get("state"));
message = (String) status.get("msg");
if (state == RequestStatusState.COMPLETED || state == RequestStatusState.FAILED) { if (state == RequestStatusState.COMPLETED || state == RequestStatusState.FAILED) {
return message; return r;
} }
try { try {
@ -180,11 +209,11 @@ public class TestRequestStatusCollectionAPI extends BasicDistributedZkTest {
} }
// Return last state? // Return last state?
return message; return r;
} }
protected NamedList sendRequest(ModifiableSolrParams params) throws SolrServerException, IOException { protected NamedList<Object> sendRequest(ModifiableSolrParams params) throws SolrServerException, IOException {
SolrRequest request = new QueryRequest(params); QueryRequest request = new QueryRequest(params);
request.setPath("/admin/collections"); request.setPath("/admin/collections");
String baseUrl = ((HttpSolrClient) shardToJetty.get(SHARD1).get(0).client.getSolrClient()).getBaseURL(); String baseUrl = ((HttpSolrClient) shardToJetty.get(SHARD1).get(0).client.getSolrClient()).getBaseURL();

View File

@ -2341,7 +2341,7 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes
} }
} }
static RequestStatusState getRequestStateAfterCompletion(String requestId, int waitForSeconds, SolrClient client) public static RequestStatusState getRequestStateAfterCompletion(String requestId, int waitForSeconds, SolrClient client)
throws IOException, SolrServerException { throws IOException, SolrServerException {
RequestStatusState state = null; RequestStatusState state = null;
final TimeOut timeout = new TimeOut(waitForSeconds, TimeUnit.SECONDS, TimeSource.NANO_TIME); final TimeOut timeout = new TimeOut(waitForSeconds, TimeUnit.SECONDS, TimeSource.NANO_TIME);