From 39ff3052c383f6275fb647f7f5b641ecaf46c639 Mon Sep 17 00:00:00 2001 From: Mikhail Khludnev Date: Sun, 28 Apr 2019 11:19:15 +0300 Subject: [PATCH] SOLR-12291: fixing premature completion of async tasks * extract async tracking methods from OverseerCollectionMessageHandler into the separate class * replacing hashmap to named list to avoid entry loss --- solr/CHANGES.txt | 3 + .../cloud/api/collections/AddReplicaCmd.java | 39 ++- .../solr/cloud/api/collections/BackupCmd.java | 19 +- .../api/collections/CreateCollectionCmd.java | 10 +- .../api/collections/CreateSnapshotCmd.java | 7 +- .../api/collections/DeleteCollectionCmd.java | 19 +- .../api/collections/DeleteReplicaCmd.java | 23 +- .../api/collections/DeleteSnapshotCmd.java | 11 +- .../cloud/api/collections/MigrateCmd.java | 70 +++-- .../OverseerCollectionMessageHandler.java | 263 ++++++++++-------- .../cloud/api/collections/RestoreCmd.java | 62 +++-- .../cloud/api/collections/SplitShardCmd.java | 90 +++--- .../AsyncCallRequestStatusResponseTest.java | 49 +++- .../TestRequestStatusCollectionAPI.java | 65 +++-- .../cloud/AbstractFullDistribZkTestBase.java | 2 +- 15 files changed, 418 insertions(+), 314 deletions(-) rename solr/core/src/test/org/apache/solr/cloud/{ => api/collections}/AsyncCallRequestStatusResponseTest.java (51%) diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt index b20bee381d1..cf4a2a92a9a 100644 --- a/solr/CHANGES.txt +++ b/solr/CHANGES.txt @@ -198,6 +198,9 @@ Bug Fixes * SOLR-5970: Return correct status upon collection creation failure (Abraham Elmahrek, Ishan Chattopadhyaya, Jason Gerlowski, Kesharee Nandan Vishwakarma) +* SOLR-12291: prematurely reporting not yet finished async Collections API call as completed + when collection's replicas are collocated at least at one node (Varun Thacker, Mikhail Khludnev) + Improvements ---------------------- diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/AddReplicaCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/AddReplicaCmd.java index 5b7f8134f0a..08dc12a54ff 100644 --- a/solr/core/src/java/org/apache/solr/cloud/api/collections/AddReplicaCmd.java +++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/AddReplicaCmd.java @@ -18,13 +18,27 @@ package org.apache.solr.cloud.api.collections; +import static org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler.CREATE_NODE_SET; +import static org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler.SKIP_CREATE_REPLICA_IN_CLUSTER_STATE; +import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP; +import static org.apache.solr.common.cloud.ZkStateReader.CORE_NAME_PROP; +import static org.apache.solr.common.cloud.ZkStateReader.NRT_REPLICAS; +import static org.apache.solr.common.cloud.ZkStateReader.PULL_REPLICAS; +import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP; +import static org.apache.solr.common.cloud.ZkStateReader.TLOG_REPLICAS; +import static org.apache.solr.common.params.CollectionAdminParams.COLL_CONF; +import static org.apache.solr.common.params.CollectionAdminParams.WITH_COLLECTION; +import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDREPLICA; +import static org.apache.solr.common.params.CommonAdminParams.ASYNC; +import static org.apache.solr.common.params.CommonAdminParams.TIMEOUT; +import static org.apache.solr.common.params.CommonAdminParams.WAIT_FOR_FINAL_STATE; + import java.io.IOException; import java.lang.invoke.MethodHandles; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.EnumMap; -import java.util.HashMap; import java.util.List; import java.util.Locale; import java.util.Map; @@ -37,6 +51,7 @@ import org.apache.solr.client.solrj.cloud.SolrCloudManager; import org.apache.solr.client.solrj.cloud.autoscaling.PolicyHelper; import org.apache.solr.cloud.ActiveReplicaWatcher; import org.apache.solr.cloud.Overseer; +import org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler.ShardRequestTracker; import org.apache.solr.common.SolrCloseableLatch; import org.apache.solr.common.SolrException; import org.apache.solr.common.cloud.ClusterState; @@ -56,21 +71,6 @@ import org.apache.solr.handler.component.ShardHandler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler.CREATE_NODE_SET; -import static org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler.SKIP_CREATE_REPLICA_IN_CLUSTER_STATE; -import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP; -import static org.apache.solr.common.cloud.ZkStateReader.CORE_NAME_PROP; -import static org.apache.solr.common.cloud.ZkStateReader.NRT_REPLICAS; -import static org.apache.solr.common.cloud.ZkStateReader.PULL_REPLICAS; -import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP; -import static org.apache.solr.common.cloud.ZkStateReader.TLOG_REPLICAS; -import static org.apache.solr.common.params.CollectionAdminParams.COLL_CONF; -import static org.apache.solr.common.params.CollectionAdminParams.WITH_COLLECTION; -import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDREPLICA; -import static org.apache.solr.common.params.CommonAdminParams.ASYNC; -import static org.apache.solr.common.params.CommonAdminParams.TIMEOUT; -import static org.apache.solr.common.params.CommonAdminParams.WAIT_FOR_FINAL_STATE; - public class AddReplicaCmd implements OverseerCollectionMessageHandler.Cmd { private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); @@ -157,17 +157,16 @@ public class AddReplicaCmd implements OverseerCollectionMessageHandler.Cmd { ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler(); ZkStateReader zkStateReader = ocmh.zkStateReader; - // For tracking async calls. - Map requestMap = new HashMap<>(); + final ShardRequestTracker shardRequestTracker = ocmh.asyncRequestTracker(asyncId); for (CreateReplica createReplica : createReplicas) { assert createReplica.coreName != null; ModifiableSolrParams params = getReplicaParams(clusterState, message, results, collectionName, coll, skipCreateReplicaInClusterState, asyncId, shardHandler, createReplica); - ocmh.sendShardRequest(createReplica.node, params, shardHandler, asyncId, requestMap); + shardRequestTracker.sendShardRequest(createReplica.node, params, shardHandler); } Runnable runnable = () -> { - ocmh.processResponses(results, shardHandler, true, "ADDREPLICA failed to create replica", asyncId, requestMap); + shardRequestTracker.processResponses(results, shardHandler, true, "ADDREPLICA failed to create replica"); for (CreateReplica replica : createReplicas) { ocmh.waitForCoreNodeName(collectionName, replica.node, replica.coreName); } diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/BackupCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/BackupCmd.java index b9900cc75b4..bd3bd3be885 100644 --- a/solr/core/src/java/org/apache/solr/cloud/api/collections/BackupCmd.java +++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/BackupCmd.java @@ -16,17 +16,21 @@ */ package org.apache.solr.cloud.api.collections; +import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP; +import static org.apache.solr.common.cloud.ZkStateReader.CORE_NAME_PROP; +import static org.apache.solr.common.params.CommonAdminParams.ASYNC; +import static org.apache.solr.common.params.CommonParams.NAME; + import java.lang.invoke.MethodHandles; import java.net.URI; import java.time.Instant; import java.util.Collection; import java.util.Collections; -import java.util.HashMap; -import java.util.Map; import java.util.Optional; import java.util.Properties; import org.apache.lucene.util.Version; +import org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler.ShardRequestTracker; import org.apache.solr.common.SolrException; import org.apache.solr.common.SolrException.ErrorCode; import org.apache.solr.common.cloud.ClusterState; @@ -51,11 +55,6 @@ import org.apache.solr.handler.component.ShardHandler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP; -import static org.apache.solr.common.cloud.ZkStateReader.CORE_NAME_PROP; -import static org.apache.solr.common.params.CommonAdminParams.ASYNC; -import static org.apache.solr.common.params.CommonParams.NAME; - public class BackupCmd implements OverseerCollectionMessageHandler.Cmd { private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); @@ -162,7 +161,6 @@ public class BackupCmd implements OverseerCollectionMessageHandler.Cmd { String asyncId = request.getStr(ASYNC); String repoName = request.getStr(CoreAdminParams.BACKUP_REPOSITORY); ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler(ocmh.overseer.getCoreContainer().getUpdateShardHandler().getDefaultHttpClient()); - Map requestMap = new HashMap<>(); String commitName = request.getStr(CoreAdminParams.COMMIT_NAME); Optional snapshotMeta = Optional.empty(); @@ -187,6 +185,7 @@ public class BackupCmd implements OverseerCollectionMessageHandler.Cmd { shardsToConsider = snapshotMeta.get().getShards(); } + final ShardRequestTracker shardRequestTracker = ocmh.asyncRequestTracker(asyncId); for (Slice slice : ocmh.zkStateReader.getClusterState().getCollection(collectionName).getActiveSlices()) { Replica replica = null; @@ -217,11 +216,11 @@ public class BackupCmd implements OverseerCollectionMessageHandler.Cmd { params.set(CoreAdminParams.COMMIT_NAME, snapshotMeta.get().getName()); } - ocmh.sendShardRequest(replica.getNodeName(), params, shardHandler, asyncId, requestMap); + shardRequestTracker.sendShardRequest(replica.getNodeName(), params, shardHandler); log.debug("Sent backup request to core={} for backupName={}", coreName, backupName); } log.debug("Sent backup requests to all shard leaders for backupName={}", backupName); - ocmh.processResponses(results, shardHandler, true, "Could not backup all shards", asyncId, requestMap); + shardRequestTracker.processResponses(results, shardHandler, true, "Could not backup all shards"); } } diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java index dbbaa26c304..a71cb5370d1 100644 --- a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java +++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java @@ -42,6 +42,7 @@ import org.apache.solr.client.solrj.cloud.autoscaling.PolicyHelper; import org.apache.solr.client.solrj.cloud.autoscaling.VersionedData; import org.apache.solr.cloud.Overseer; import org.apache.solr.cloud.ZkController; +import org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler.ShardRequestTracker; import org.apache.solr.cloud.overseer.ClusterStateMutator; import org.apache.solr.common.SolrException; import org.apache.solr.common.SolrException.ErrorCode; @@ -199,10 +200,7 @@ public class CreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd return; } - // For tracking async calls. - Map requestMap = new HashMap<>(); - - + final ShardRequestTracker shardRequestTracker = ocmh.asyncRequestTracker(async); log.debug(formatString("Creating SolrCores for new collection {0}, shardNames {1} , message : {2}", collectionName, shardNames, message)); Map coresToCreate = new LinkedHashMap<>(); @@ -264,7 +262,7 @@ public class CreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd if (async != null) { String coreAdminAsyncId = async + Math.abs(System.nanoTime()); params.add(ASYNC, coreAdminAsyncId); - requestMap.put(nodeName, coreAdminAsyncId); + shardRequestTracker.track(nodeName, coreAdminAsyncId); } ocmh.addPropertyParams(message, params); @@ -293,7 +291,7 @@ public class CreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd } } - ocmh.processResponses(results, shardHandler, false, null, async, requestMap, Collections.emptySet()); + shardRequestTracker.processResponses(results, shardHandler, false, null, Collections.emptySet()); boolean failure = results.get("failure") != null && ((SimpleOrderedMap)results.get("failure")).size() > 0; if (failure) { // Let's cleanup as we hit an exception diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateSnapshotCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateSnapshotCmd.java index 4203b921965..d761a88c3db 100644 --- a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateSnapshotCmd.java +++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateSnapshotCmd.java @@ -30,6 +30,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler.ShardRequestTracker; import org.apache.solr.common.SolrException; import org.apache.solr.common.SolrException.ErrorCode; import org.apache.solr.common.cloud.ClusterState; @@ -83,11 +84,11 @@ public class CreateSnapshotCmd implements OverseerCollectionMessageHandler.Cmd { SolrSnapshotManager.createCollectionLevelSnapshot(zkClient, collectionName, new CollectionSnapshotMetaData(commitName)); log.info("Created a ZK path to store snapshot information for collection={} with commitName={}", collectionName, commitName); - Map requestMap = new HashMap<>(); NamedList shardRequestResults = new NamedList(); Map shardByCoreName = new HashMap<>(); ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler(ocmh.overseer.getCoreContainer().getUpdateShardHandler().getDefaultHttpClient()); + final ShardRequestTracker shardRequestTracker = ocmh.asyncRequestTracker(asyncId); for (Slice slice : ocmh.zkStateReader.getClusterState().getCollection(collectionName).getSlices()) { for (Replica replica : slice.getReplicas()) { if (replica.getState() != State.ACTIVE) { @@ -103,7 +104,7 @@ public class CreateSnapshotCmd implements OverseerCollectionMessageHandler.Cmd { params.set(CORE_NAME_PROP, coreName); params.set(CoreAdminParams.COMMIT_NAME, commitName); - ocmh.sendShardRequest(replica.getNodeName(), params, shardHandler, asyncId, requestMap); + shardRequestTracker.sendShardRequest(replica.getNodeName(), params, shardHandler); log.debug("Sent createsnapshot request to core={} with commitName={}", coreName, commitName); shardByCoreName.put(coreName, slice); @@ -115,7 +116,7 @@ public class CreateSnapshotCmd implements OverseerCollectionMessageHandler.Cmd { // This is to take care of the situation where e.g. entire shard is unavailable. Set failedShards = new HashSet<>(); - ocmh.processResponses(shardRequestResults, shardHandler, false, null, asyncId, requestMap); + shardRequestTracker.processResponses(shardRequestResults, shardHandler, false, null); NamedList success = (NamedList) shardRequestResults.get("success"); List replicas = new ArrayList<>(); if (success != null) { diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteCollectionCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteCollectionCmd.java index 0f0dfbd8a7f..d5215f7b9ca 100644 --- a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteCollectionCmd.java +++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteCollectionCmd.java @@ -18,8 +18,13 @@ package org.apache.solr.cloud.api.collections; +import static org.apache.solr.common.params.CollectionAdminParams.COLOCATED_WITH; +import static org.apache.solr.common.params.CollectionAdminParams.WITH_COLLECTION; +import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETE; +import static org.apache.solr.common.params.CommonAdminParams.ASYNC; +import static org.apache.solr.common.params.CommonParams.NAME; + import java.lang.invoke.MethodHandles; -import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -51,12 +56,6 @@ import org.apache.zookeeper.KeeperException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static org.apache.solr.common.params.CollectionAdminParams.COLOCATED_WITH; -import static org.apache.solr.common.params.CollectionAdminParams.WITH_COLLECTION; -import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETE; -import static org.apache.solr.common.params.CommonAdminParams.ASYNC; -import static org.apache.solr.common.params.CommonParams.NAME; - public class DeleteCollectionCmd implements OverseerCollectionMessageHandler.Cmd { private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); private final OverseerCollectionMessageHandler ocmh; @@ -115,15 +114,11 @@ public class DeleteCollectionCmd implements OverseerCollectionMessageHandler.Cmd params.set(CoreAdminParams.DELETE_METRICS_HISTORY, deleteHistory); String asyncId = message.getStr(ASYNC); - Map requestMap = null; - if (asyncId != null) { - requestMap = new HashMap<>(); - } Set okayExceptions = new HashSet<>(1); okayExceptions.add(NonExistentCoreException.class.getName()); - List failedReplicas = ocmh.collectionCmd(message, params, results, null, asyncId, requestMap, okayExceptions); + List failedReplicas = ocmh.collectionCmd(message, params, results, null, asyncId, okayExceptions); for (Replica failedReplica : failedReplicas) { boolean isSharedFS = failedReplica.getBool(ZkStateReader.SHARED_STORAGE_PROP, false) && failedReplica.get("dataDir") != null; if (isSharedFS) { diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteReplicaCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteReplicaCmd.java index 2ea163f1673..d999a6ff3c4 100644 --- a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteReplicaCmd.java +++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteReplicaCmd.java @@ -16,6 +16,12 @@ */ package org.apache.solr.cloud.api.collections; +import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP; +import static org.apache.solr.common.cloud.ZkStateReader.REPLICA_PROP; +import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP; +import static org.apache.solr.common.params.CollectionAdminParams.COUNT_PROP; +import static org.apache.solr.common.params.CommonAdminParams.ASYNC; + import java.lang.invoke.MethodHandles; import java.util.ArrayList; import java.util.Collection; @@ -24,9 +30,9 @@ import java.util.HashSet; import java.util.Map; import java.util.Set; import java.util.concurrent.Callable; -import java.util.concurrent.atomic.AtomicReference; import org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler.Cmd; +import org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler.ShardRequestTracker; import org.apache.solr.common.SolrException; import org.apache.solr.common.cloud.ClusterState; import org.apache.solr.common.cloud.DocCollection; @@ -44,12 +50,6 @@ import org.apache.zookeeper.KeeperException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP; -import static org.apache.solr.common.cloud.ZkStateReader.REPLICA_PROP; -import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP; -import static org.apache.solr.common.params.CollectionAdminParams.COUNT_PROP; -import static org.apache.solr.common.params.CommonAdminParams.ASYNC; - public class DeleteReplicaCmd implements Cmd { private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); @@ -223,10 +223,6 @@ public class DeleteReplicaCmd implements Cmd { ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler(ocmh.overseer.getCoreContainer().getUpdateShardHandler().getDefaultHttpClient()); String core = replica.getStr(ZkStateReader.CORE_NAME_PROP); String asyncId = message.getStr(ASYNC); - AtomicReference> requestMap = new AtomicReference<>(null); - if (asyncId != null) { - requestMap.set(new HashMap<>(1, 1.0f)); - } ModifiableSolrParams params = new ModifiableSolrParams(); params.add(CoreAdminParams.ACTION, CoreAdminParams.CoreAdminAction.UNLOAD.toString()); @@ -238,14 +234,15 @@ public class DeleteReplicaCmd implements Cmd { params.set(CoreAdminParams.DELETE_METRICS_HISTORY, message.getBool(CoreAdminParams.DELETE_METRICS_HISTORY, true)); boolean isLive = ocmh.zkStateReader.getClusterState().getLiveNodes().contains(replica.getNodeName()); + final ShardRequestTracker shardRequestTracker = ocmh.asyncRequestTracker(asyncId); if (isLive) { - ocmh.sendShardRequest(replica.getNodeName(), params, shardHandler, asyncId, requestMap.get()); + shardRequestTracker.sendShardRequest(replica.getNodeName(), params, shardHandler); } Callable callable = () -> { try { if (isLive) { - ocmh.processResponses(results, shardHandler, false, null, asyncId, requestMap.get()); + shardRequestTracker.processResponses(results, shardHandler, false, null); //check if the core unload removed the corenode zk entry if (ocmh.waitForCoreNodeGone(collectionName, shard, replicaName, 30000)) return Boolean.TRUE; diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteSnapshotCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteSnapshotCmd.java index 8e8c577d15f..88881cf21d6 100644 --- a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteSnapshotCmd.java +++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteSnapshotCmd.java @@ -23,21 +23,20 @@ import static org.apache.solr.common.params.CommonParams.NAME; import java.lang.invoke.MethodHandles; import java.util.ArrayList; -import java.util.HashMap; import java.util.HashSet; import java.util.List; -import java.util.Map; import java.util.Optional; import java.util.Set; +import org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler.ShardRequestTracker; import org.apache.solr.common.SolrException; import org.apache.solr.common.SolrException.ErrorCode; import org.apache.solr.common.cloud.ClusterState; import org.apache.solr.common.cloud.Replica; +import org.apache.solr.common.cloud.Replica.State; import org.apache.solr.common.cloud.Slice; import org.apache.solr.common.cloud.SolrZkClient; import org.apache.solr.common.cloud.ZkNodeProps; -import org.apache.solr.common.cloud.Replica.State; import org.apache.solr.common.params.CoreAdminParams; import org.apache.solr.common.params.CoreAdminParams.CoreAdminAction; import org.apache.solr.common.params.ModifiableSolrParams; @@ -68,7 +67,6 @@ public class DeleteSnapshotCmd implements OverseerCollectionMessageHandler.Cmd { String collectionName = ocmh.zkStateReader.getAliases().resolveSimpleAlias(extCollectionName); String commitName = message.getStr(CoreAdminParams.COMMIT_NAME); String asyncId = message.getStr(ASYNC); - Map requestMap = new HashMap<>(); NamedList shardRequestResults = new NamedList(); ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler(ocmh.overseer.getCoreContainer().getUpdateShardHandler().getDefaultHttpClient()); SolrZkClient zkClient = ocmh.zkStateReader.getZkClient(); @@ -94,6 +92,7 @@ public class DeleteSnapshotCmd implements OverseerCollectionMessageHandler.Cmd { } } + final ShardRequestTracker shardRequestTracker = ocmh.asyncRequestTracker(asyncId); log.info("Existing cores with snapshot for collection={} are {}", collectionName, existingCores); for (Slice slice : ocmh.zkStateReader.getClusterState().getCollection(collectionName).getSlices()) { for (Replica replica : slice.getReplicas()) { @@ -114,12 +113,12 @@ public class DeleteSnapshotCmd implements OverseerCollectionMessageHandler.Cmd { params.set(CoreAdminParams.COMMIT_NAME, commitName); log.info("Sending deletesnapshot request to core={} with commitName={}", coreName, commitName); - ocmh.sendShardRequest(replica.getNodeName(), params, shardHandler, asyncId, requestMap); + shardRequestTracker.sendShardRequest(replica.getNodeName(), params, shardHandler); } } } - ocmh.processResponses(shardRequestResults, shardHandler, false, null, asyncId, requestMap); + shardRequestTracker.processResponses(shardRequestResults, shardHandler, false, null); NamedList success = (NamedList) shardRequestResults.get("success"); List replicas = new ArrayList<>(); if (success != null) { diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/MigrateCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/MigrateCmd.java index 236d46f262c..55210f1fecd 100644 --- a/solr/core/src/java/org/apache/solr/cloud/api/collections/MigrateCmd.java +++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/MigrateCmd.java @@ -25,6 +25,7 @@ import java.util.concurrent.TimeUnit; import org.apache.solr.client.solrj.request.CoreAdminRequest; import org.apache.solr.cloud.Overseer; +import org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler.ShardRequestTracker; import org.apache.solr.cloud.overseer.OverseerAction; import org.apache.solr.common.SolrException; import org.apache.solr.common.cloud.ClusterState; @@ -163,8 +164,6 @@ public class MigrateCmd implements OverseerCollectionMessageHandler.Cmd { log.info("Common hash range between source shard: {} and target shard: {} = " + splitRange, sourceSlice.getName(), targetSlice.getName()); Replica targetLeader = zkStateReader.getLeaderRetry(targetCollection.getName(), targetSlice.getName(), 10000); - // For tracking async calls. - Map requestMap = new HashMap<>(); log.info("Asking target leader node: " + targetLeader.getNodeName() + " core: " + targetLeader.getStr("core") + " to buffer updates"); @@ -172,10 +171,12 @@ public class MigrateCmd implements OverseerCollectionMessageHandler.Cmd { params.set(CoreAdminParams.ACTION, CoreAdminParams.CoreAdminAction.REQUESTBUFFERUPDATES.toString()); params.set(CoreAdminParams.NAME, targetLeader.getStr("core")); - ocmh.sendShardRequest(targetLeader.getNodeName(), params, shardHandler, asyncId, requestMap); - - ocmh.processResponses(results, shardHandler, true, "MIGRATE failed to request node to buffer updates", asyncId, requestMap); + { + final ShardRequestTracker shardRequestTracker = ocmh.asyncRequestTracker(asyncId); + shardRequestTracker.sendShardRequest(targetLeader.getNodeName(), params, shardHandler); + shardRequestTracker.processResponses(results, shardHandler, true, "MIGRATE failed to request node to buffer updates"); + } ZkNodeProps m = new ZkNodeProps( Overseer.QUEUE_OPERATION, OverseerAction.ADDROUTINGRULE.toLower(), COLLECTION_PROP, sourceCollection.getName(), @@ -246,12 +247,17 @@ public class MigrateCmd implements OverseerCollectionMessageHandler.Cmd { cmd.setState(Replica.State.ACTIVE); cmd.setCheckLive(true); cmd.setOnlyIfLeader(true); - // we don't want this to happen asynchronously - ocmh.sendShardRequest(tempSourceLeader.getNodeName(), new ModifiableSolrParams(cmd.getParams()), shardHandler, null, null); - - ocmh.processResponses(results, shardHandler, true, "MIGRATE failed to create temp collection leader" + - " or timed out waiting for it to come up", asyncId, requestMap); + { + final ShardRequestTracker syncRequestTracker = ocmh.syncRequestTracker(); + // we don't want this to happen asynchronously + syncRequestTracker.sendShardRequest(tempSourceLeader.getNodeName(), new ModifiableSolrParams(cmd.getParams()), + shardHandler); + syncRequestTracker.processResponses(results, shardHandler, true, + "MIGRATE failed to create temp collection leader" + + " or timed out waiting for it to come up"); + } + log.info("Asking source leader to split index"); params = new ModifiableSolrParams(); params.set(CoreAdminParams.ACTION, CoreAdminParams.CoreAdminAction.SPLIT.toString()); @@ -262,9 +268,11 @@ public class MigrateCmd implements OverseerCollectionMessageHandler.Cmd { String tempNodeName = sourceLeader.getNodeName(); - ocmh.sendShardRequest(tempNodeName, params, shardHandler, asyncId, requestMap); - ocmh.processResponses(results, shardHandler, true, "MIGRATE failed to invoke SPLIT core admin command", asyncId, requestMap); - + { + final ShardRequestTracker shardRequestTracker = ocmh.asyncRequestTracker(asyncId); + shardRequestTracker.sendShardRequest(tempNodeName, params, shardHandler); + shardRequestTracker.processResponses(results, shardHandler, true, "MIGRATE failed to invoke SPLIT core admin command"); + } log.info("Creating a replica of temporary collection: {} on the target leader node: {}", tempSourceCollectionName, targetLeader.getNodeName()); String tempCollectionReplica2 = Assign.buildSolrCoreName(ocmh.overseer.getSolrCloudManager().getDistribStateManager(), @@ -287,9 +295,11 @@ public class MigrateCmd implements OverseerCollectionMessageHandler.Cmd { } ((AddReplicaCmd)ocmh.commandMap.get(ADDREPLICA)).addReplica(clusterState, new ZkNodeProps(props), results, null); - ocmh.processResponses(results, shardHandler, true, "MIGRATE failed to create replica of " + - "temporary collection in target leader node.", asyncId, requestMap); - + { + final ShardRequestTracker syncRequestTracker = ocmh.syncRequestTracker(); + syncRequestTracker.processResponses(results, shardHandler, true, "MIGRATE failed to create replica of " + + "temporary collection in target leader node."); + } coreNodeName = ocmh.waitForCoreNodeName(tempSourceCollectionName, targetLeader.getNodeName(), tempCollectionReplica2); // wait for the replicas to be seen as active on temp source leader @@ -303,11 +313,13 @@ public class MigrateCmd implements OverseerCollectionMessageHandler.Cmd { cmd.setOnlyIfLeader(true); params = new ModifiableSolrParams(cmd.getParams()); - ocmh.sendShardRequest(tempSourceLeader.getNodeName(), params, shardHandler, asyncId, requestMap); - - ocmh.processResponses(results, shardHandler, true, "MIGRATE failed to create temp collection" + - " replica or timed out waiting for them to come up", asyncId, requestMap); + { + final ShardRequestTracker shardRequestTracker = ocmh.asyncRequestTracker(asyncId); + shardRequestTracker.sendShardRequest(tempSourceLeader.getNodeName(), params, shardHandler); + shardRequestTracker.processResponses(results, shardHandler, true, "MIGRATE failed to create temp collection" + + " replica or timed out waiting for them to come up"); + } log.info("Successfully created replica of temp source collection on target leader node"); log.info("Requesting merge of temp source collection replica to target leader"); @@ -316,20 +328,24 @@ public class MigrateCmd implements OverseerCollectionMessageHandler.Cmd { params.set(CoreAdminParams.CORE, targetLeader.getStr("core")); params.set(CoreAdminParams.SRC_CORE, tempCollectionReplica2); - ocmh.sendShardRequest(targetLeader.getNodeName(), params, shardHandler, asyncId, requestMap); + { + final ShardRequestTracker shardRequestTracker = ocmh.asyncRequestTracker(asyncId); + + shardRequestTracker.sendShardRequest(targetLeader.getNodeName(), params, shardHandler); String msg = "MIGRATE failed to merge " + tempCollectionReplica2 + " to " + targetLeader.getStr("core") + " on node: " + targetLeader.getNodeName(); - ocmh.processResponses(results, shardHandler, true, msg, asyncId, requestMap); - + shardRequestTracker.processResponses(results, shardHandler, true, msg); + } log.info("Asking target leader to apply buffered updates"); params = new ModifiableSolrParams(); params.set(CoreAdminParams.ACTION, CoreAdminParams.CoreAdminAction.REQUESTAPPLYUPDATES.toString()); params.set(CoreAdminParams.NAME, targetLeader.getStr("core")); - ocmh.sendShardRequest(targetLeader.getNodeName(), params, shardHandler, asyncId, requestMap); - ocmh.processResponses(results, shardHandler, true, "MIGRATE failed to request node to apply buffered updates", - asyncId, requestMap); - + { + final ShardRequestTracker shardRequestTracker = ocmh.asyncRequestTracker(asyncId); + shardRequestTracker.sendShardRequest(targetLeader.getNodeName(), params, shardHandler); + shardRequestTracker.processResponses(results, shardHandler, true, "MIGRATE failed to request node to apply buffered updates"); + } try { log.info("Deleting temporary collection: " + tempSourceCollectionName); props = makeMap( diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java index 19cbee71aae..81407c66156 100644 --- a/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java +++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java @@ -307,11 +307,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler, params.set(CoreAdminParams.ACTION, CoreAdminAction.RELOAD.toString()); String asyncId = message.getStr(ASYNC); - Map requestMap = null; - if (asyncId != null) { - requestMap = new HashMap<>(); - } - collectionCmd(message, params, results, Replica.State.ACTIVE, asyncId, requestMap); + collectionCmd(message, params, results, Replica.State.ACTIVE, asyncId); } @SuppressWarnings("unchecked") @@ -596,34 +592,6 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler, } } - void sendShardRequest(String nodeName, ModifiableSolrParams params, - ShardHandler shardHandler, String asyncId, - Map requestMap) { - sendShardRequest(nodeName, params, shardHandler, asyncId, requestMap, adminPath, zkStateReader); - - } - - public void sendShardRequest(String nodeName, ModifiableSolrParams params, ShardHandler shardHandler, - String asyncId, Map requestMap, String adminPath, - ZkStateReader zkStateReader) { - if (asyncId != null) { - String coreAdminAsyncId = asyncId + Math.abs(System.nanoTime()); - params.set(ASYNC, coreAdminAsyncId); - requestMap.put(nodeName, coreAdminAsyncId); - } - - ShardRequest sreq = new ShardRequest(); - params.set("qt", adminPath); - sreq.purpose = 1; - String replica = zkStateReader.getBaseUrlForNodeName(nodeName); - sreq.shards = new String[]{replica}; - sreq.actualShards = sreq.shards; - sreq.nodeName = nodeName; - sreq.params = params; - - shardHandler.submit(sreq, replica, sreq.params); - } - void addPropertyParams(ZkNodeProps message, ModifiableSolrParams params) { // Now add the property.key=value pairs for (String key : message.keySet()) { @@ -740,38 +708,6 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler, return ((AddReplicaCmd) commandMap.get(ADDREPLICA)).addReplica(clusterState, message, results, onComplete); } - void processResponses(NamedList results, ShardHandler shardHandler, boolean abortOnError, String msgOnError, - String asyncId, Map requestMap) { - processResponses(results, shardHandler, abortOnError, msgOnError, asyncId, requestMap, Collections.emptySet()); - } - - void processResponses(NamedList results, ShardHandler shardHandler, boolean abortOnError, String msgOnError, - String asyncId, Map requestMap, Set okayExceptions) { - //Processes all shard responses - ShardResponse srsp; - do { - srsp = shardHandler.takeCompletedOrError(); - if (srsp != null) { - processResponse(results, srsp, okayExceptions); - Throwable exception = srsp.getException(); - if (abortOnError && exception != null) { - // drain pending requests - while (srsp != null) { - srsp = shardHandler.takeCompletedOrError(); - } - throw new SolrException(ErrorCode.SERVER_ERROR, msgOnError, exception); - } - } - } while (srsp != null); - - //If request is async wait for the core admin to complete before returning - if (asyncId != null) { - waitForAsyncCallsToComplete(requestMap, results); - requestMap.clear(); - } - } - - void validateConfigOrThrowSolrException(String configName) throws IOException, KeeperException, InterruptedException { boolean isValid = cloudManager.getDistribStateManager().hasData(ZkConfigManager.CONFIGS_ZKNODE + "/" + configName); if(!isValid) { @@ -804,8 +740,8 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler, } private List collectionCmd(ZkNodeProps message, ModifiableSolrParams params, - NamedList results, Replica.State stateMatcher, String asyncId, Map requestMap) { - return collectionCmd( message, params, results, stateMatcher, asyncId, requestMap, Collections.emptySet()); + NamedList results, Replica.State stateMatcher, String asyncId) { + return collectionCmd( message, params, results, stateMatcher, asyncId, Collections.emptySet()); } /** @@ -813,47 +749,25 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler, * @return List of replicas which is not live for receiving the request */ List collectionCmd(ZkNodeProps message, ModifiableSolrParams params, - NamedList results, Replica.State stateMatcher, String asyncId, Map requestMap, Set okayExceptions) { + NamedList results, Replica.State stateMatcher, String asyncId, Set okayExceptions) { log.info("Executing Collection Cmd={}, asyncId={}", params, asyncId); String collectionName = message.getStr(NAME); + @SuppressWarnings("deprecation") ShardHandler shardHandler = shardHandlerFactory.getShardHandler(overseer.getCoreContainer().getUpdateShardHandler().getDefaultHttpClient()); ClusterState clusterState = zkStateReader.getClusterState(); DocCollection coll = clusterState.getCollection(collectionName); List notLivesReplicas = new ArrayList<>(); + final ShardRequestTracker shardRequestTracker = new ShardRequestTracker(asyncId); for (Slice slice : coll.getSlices()) { - notLivesReplicas.addAll(sliceCmd(clusterState, params, stateMatcher, slice, shardHandler, asyncId, requestMap)); + notLivesReplicas.addAll(shardRequestTracker.sliceCmd(clusterState, params, stateMatcher, slice, shardHandler)); } - processResponses(results, shardHandler, false, null, asyncId, requestMap, okayExceptions); + shardRequestTracker.processResponses(results, shardHandler, false, null, okayExceptions); return notLivesReplicas; } - /** - * Send request to all replicas of a slice - * @return List of replicas which is not live for receiving the request - */ - List sliceCmd(ClusterState clusterState, ModifiableSolrParams params, Replica.State stateMatcher, - Slice slice, ShardHandler shardHandler, String asyncId, Map requestMap) { - List notLiveReplicas = new ArrayList<>(); - for (Replica replica : slice.getReplicas()) { - if ((stateMatcher == null || Replica.State.getState(replica.getStr(ZkStateReader.STATE_PROP)) == stateMatcher)) { - if (clusterState.liveNodesContain(replica.getStr(ZkStateReader.NODE_NAME_PROP))) { - // For thread safety, only simple clone the ModifiableSolrParams - ModifiableSolrParams cloneParams = new ModifiableSolrParams(); - cloneParams.add(params); - cloneParams.set(CoreAdminParams.CORE, replica.getStr(ZkStateReader.CORE_NAME_PROP)); - - sendShardRequest(replica.getStr(ZkStateReader.NODE_NAME_PROP), cloneParams, shardHandler, asyncId, requestMap); - } else { - notLiveReplicas.add(replica); - } - } - } - return notLiveReplicas; - } - - private void processResponse(NamedList results, ShardResponse srsp, Set okayExceptions) { + private void processResponse(NamedList results, ShardResponse srsp, Set okayExceptions) { Throwable e = srsp.getException(); String nodeName = srsp.getNodeName(); SolrResponse solrResponse = srsp.getSolrResponse(); @@ -862,8 +776,8 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler, processResponse(results, e, nodeName, solrResponse, shard, okayExceptions); } - @SuppressWarnings("unchecked") - private void processResponse(NamedList results, Throwable e, String nodeName, SolrResponse solrResponse, String shard, Set okayExceptions) { + @SuppressWarnings("deprecation") + private void processResponse(NamedList results, Throwable e, String nodeName, SolrResponse solrResponse, String shard, Set okayExceptions) { String rootThrowable = null; if (e instanceof RemoteSolrException) { rootThrowable = ((RemoteSolrException) e).getRootThrowable(); @@ -897,30 +811,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler, success.add(key, value); } - /* - * backward compatibility reasons, add the response with the async ID as top level. - * This can be removed in Solr 9 - */ - @Deprecated - public final static boolean INCLUDE_TOP_LEVEL_RESPONSE = true; - @SuppressWarnings("unchecked") - private void waitForAsyncCallsToComplete(Map requestMap, NamedList results) { - for (String k:requestMap.keySet()) { - log.debug("I am Waiting for :{}/{}", k, requestMap.get(k)); - NamedList reqResult = waitForCoreAdminAsyncCallToComplete(k, requestMap.get(k)); - if (INCLUDE_TOP_LEVEL_RESPONSE) { - results.add(requestMap.get(k), reqResult); - } - if ("failed".equalsIgnoreCase(((String)reqResult.get("STATUS")))) { - log.error("Error from shard {}: {}", k, reqResult); - addFailure(results, k, reqResult); - } else { - addSuccess(results, k, reqResult); - } - } - } - - private NamedList waitForCoreAdminAsyncCallToComplete(String nodeName, String requestId) { + private NamedList waitForCoreAdminAsyncCallToComplete(String nodeName, String requestId) { ShardHandler shardHandler = shardHandlerFactory.getShardHandler(); ModifiableSolrParams params = new ModifiableSolrParams(); params.set(CoreAdminParams.ACTION, CoreAdminAction.REQUESTSTATUS.toString()); @@ -942,10 +833,10 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler, do { srsp = shardHandler.takeCompletedOrError(); if (srsp != null) { - NamedList results = new NamedList(); + NamedList results = new NamedList<>(); processResponse(results, srsp, Collections.emptySet()); if (srsp.getSolrResponse().getResponse() == null) { - NamedList response = new NamedList(); + NamedList response = new NamedList<>(); response.add("STATUS", "failed"); return response; } @@ -1045,4 +936,130 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler, protected interface Cmd { void call(ClusterState state, ZkNodeProps message, NamedList results) throws Exception; } + + /* + * backward compatibility reasons, add the response with the async ID as top level. + * This can be removed in Solr 9 + */ + @Deprecated + static boolean INCLUDE_TOP_LEVEL_RESPONSE = true; + + public ShardRequestTracker syncRequestTracker() { + return new ShardRequestTracker(null); + } + + public ShardRequestTracker asyncRequestTracker(String asyncId) { + return new ShardRequestTracker(asyncId); + } + + public class ShardRequestTracker{ + private final String asyncId; + private final NamedList shardAsyncIdByNode = new NamedList(); + + private ShardRequestTracker(String asyncId) { + this.asyncId = asyncId; + } + + /** + * Send request to all replicas of a slice + * @return List of replicas which is not live for receiving the request + */ + public List sliceCmd(ClusterState clusterState, ModifiableSolrParams params, Replica.State stateMatcher, + Slice slice, ShardHandler shardHandler) { + List notLiveReplicas = new ArrayList<>(); + for (Replica replica : slice.getReplicas()) { + if ((stateMatcher == null || Replica.State.getState(replica.getStr(ZkStateReader.STATE_PROP)) == stateMatcher)) { + if (clusterState.liveNodesContain(replica.getStr(ZkStateReader.NODE_NAME_PROP))) { + // For thread safety, only simple clone the ModifiableSolrParams + ModifiableSolrParams cloneParams = new ModifiableSolrParams(); + cloneParams.add(params); + cloneParams.set(CoreAdminParams.CORE, replica.getStr(ZkStateReader.CORE_NAME_PROP)); + + sendShardRequest(replica.getStr(ZkStateReader.NODE_NAME_PROP), cloneParams, shardHandler); + } else { + notLiveReplicas.add(replica); + } + } + } + return notLiveReplicas; + } + + public void sendShardRequest(String nodeName, ModifiableSolrParams params, + ShardHandler shardHandler) { + sendShardRequest(nodeName, params, shardHandler, adminPath, zkStateReader); + } + + public void sendShardRequest(String nodeName, ModifiableSolrParams params, ShardHandler shardHandler, + String adminPath, ZkStateReader zkStateReader) { + if (asyncId != null) { + String coreAdminAsyncId = asyncId + Math.abs(System.nanoTime()); + params.set(ASYNC, coreAdminAsyncId); + track(nodeName, coreAdminAsyncId); + } + + ShardRequest sreq = new ShardRequest(); + params.set("qt", adminPath); + sreq.purpose = 1; + String replica = zkStateReader.getBaseUrlForNodeName(nodeName); + sreq.shards = new String[] {replica}; + sreq.actualShards = sreq.shards; + sreq.nodeName = nodeName; + sreq.params = params; + + shardHandler.submit(sreq, replica, sreq.params); + } + + void processResponses(NamedList results, ShardHandler shardHandler, boolean abortOnError, String msgOnError) { + processResponses(results, shardHandler, abortOnError, msgOnError, Collections.emptySet()); + } + + void processResponses(NamedList results, ShardHandler shardHandler, boolean abortOnError, String msgOnError, + Set okayExceptions) { + // Processes all shard responses + ShardResponse srsp; + do { + srsp = shardHandler.takeCompletedOrError(); + if (srsp != null) { + processResponse(results, srsp, okayExceptions); + Throwable exception = srsp.getException(); + if (abortOnError && exception != null) { + // drain pending requests + while (srsp != null) { + srsp = shardHandler.takeCompletedOrError(); + } + throw new SolrException(ErrorCode.SERVER_ERROR, msgOnError, exception); + } + } + } while (srsp != null); + + // If request is async wait for the core admin to complete before returning + if (asyncId != null) { + waitForAsyncCallsToComplete(results); + shardAsyncIdByNode.clear(); + } + } + + private void waitForAsyncCallsToComplete(NamedList results) { + for (Map.Entry nodeToAsync:shardAsyncIdByNode) { + final String node = nodeToAsync.getKey(); + final String shardAsyncId = nodeToAsync.getValue(); + log.debug("I am Waiting for :{}/{}", node, shardAsyncId); + NamedList reqResult = waitForCoreAdminAsyncCallToComplete(node, shardAsyncId); + if (INCLUDE_TOP_LEVEL_RESPONSE) { + results.add(shardAsyncId, reqResult); + } + if ("failed".equalsIgnoreCase(((String)reqResult.get("STATUS")))) { + log.error("Error from shard {}: {}", node, reqResult); + addFailure(results, node, reqResult); + } else { + addSuccess(results, node, reqResult); + } + } + } + + /** @deprecated consider to make it private after {@link CreateCollectionCmd} refactoring*/ + @Deprecated void track(String nodeName, String coreAdminAsyncId) { + shardAsyncIdByNode.add(nodeName, coreAdminAsyncId); + } + } } diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/RestoreCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/RestoreCmd.java index 198356220b3..20dfc6d1399 100644 --- a/solr/core/src/java/org/apache/solr/cloud/api/collections/RestoreCmd.java +++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/RestoreCmd.java @@ -52,6 +52,7 @@ import java.util.concurrent.TimeoutException; import org.apache.solr.client.solrj.cloud.autoscaling.PolicyHelper; import org.apache.solr.cloud.Overseer; +import org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler.ShardRequestTracker; import org.apache.solr.cloud.overseer.OverseerAction; import org.apache.solr.common.SolrException; import org.apache.solr.common.SolrException.ErrorCode; @@ -95,7 +96,6 @@ public class RestoreCmd implements OverseerCollectionMessageHandler.Cmd { ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler(ocmh.overseer.getCoreContainer().getUpdateShardHandler().getDefaultHttpClient()); String asyncId = message.getStr(ASYNC); String repo = message.getStr(CoreAdminParams.BACKUP_REPOSITORY); - Map requestMap = new HashMap<>(); CoreContainer cc = ocmh.overseer.getCoreContainer(); BackupRepository repository = cc.newBackupRepository(Optional.ofNullable(repo)); @@ -318,36 +318,42 @@ public class RestoreCmd implements OverseerCollectionMessageHandler.Cmd { //refresh the location copy of collection state restoreCollection = zkStateReader.getClusterState().getCollection(restoreCollectionName); - //Copy data from backed up index to each replica - for (Slice slice : restoreCollection.getSlices()) { - ModifiableSolrParams params = new ModifiableSolrParams(); - params.set(CoreAdminParams.ACTION, CoreAdminParams.CoreAdminAction.RESTORECORE.toString()); - params.set(NAME, "snapshot." + slice.getName()); - params.set(CoreAdminParams.BACKUP_LOCATION, backupPath.toASCIIString()); - params.set(CoreAdminParams.BACKUP_REPOSITORY, repo); - ocmh.sliceCmd(clusterState, params, null, slice, shardHandler, asyncId, requestMap); - } - ocmh.processResponses(new NamedList(), shardHandler, true, "Could not restore core", asyncId, requestMap); - - - for (Slice s: restoreCollection.getSlices()) { - for (Replica r : s.getReplicas()) { - String nodeName = r.getNodeName(); - String coreNodeName = r.getCoreName(); - Replica.State stateRep = r.getState(); - - log.debug("Calling REQUESTAPPLYUPDATES on: nodeName={}, coreNodeName={}, state={}" - , nodeName, coreNodeName, stateRep.name()); - + { + ShardRequestTracker shardRequestTracker = ocmh.asyncRequestTracker(asyncId); + // Copy data from backed up index to each replica + for (Slice slice : restoreCollection.getSlices()) { ModifiableSolrParams params = new ModifiableSolrParams(); - params.set(CoreAdminParams.ACTION, CoreAdminParams.CoreAdminAction.REQUESTAPPLYUPDATES.toString()); - params.set(CoreAdminParams.NAME, coreNodeName); - - ocmh.sendShardRequest(nodeName, params, shardHandler, asyncId, requestMap); + params.set(CoreAdminParams.ACTION, CoreAdminParams.CoreAdminAction.RESTORECORE.toString()); + params.set(NAME, "snapshot." + slice.getName()); + params.set(CoreAdminParams.BACKUP_LOCATION, backupPath.toASCIIString()); + params.set(CoreAdminParams.BACKUP_REPOSITORY, repo); + shardRequestTracker.sliceCmd(clusterState, params, null, slice, shardHandler); } + shardRequestTracker.processResponses(new NamedList(), shardHandler, true, "Could not restore core"); + } - ocmh.processResponses(new NamedList(), shardHandler, true, "REQUESTAPPLYUPDATES calls did not succeed", asyncId, requestMap); + { + ShardRequestTracker shardRequestTracker = ocmh.asyncRequestTracker(asyncId); + for (Slice s : restoreCollection.getSlices()) { + for (Replica r : s.getReplicas()) { + String nodeName = r.getNodeName(); + String coreNodeName = r.getCoreName(); + Replica.State stateRep = r.getState(); + + log.debug("Calling REQUESTAPPLYUPDATES on: nodeName={}, coreNodeName={}, state={}", nodeName, coreNodeName, + stateRep.name()); + + ModifiableSolrParams params = new ModifiableSolrParams(); + params.set(CoreAdminParams.ACTION, CoreAdminParams.CoreAdminAction.REQUESTAPPLYUPDATES.toString()); + params.set(CoreAdminParams.NAME, coreNodeName); + + shardRequestTracker.sendShardRequest(nodeName, params, shardHandler); + } + + shardRequestTracker.processResponses(new NamedList(), shardHandler, true, + "REQUESTAPPLYUPDATES calls did not succeed"); + } } //Mark all shards in ACTIVE STATE @@ -361,7 +367,7 @@ public class RestoreCmd implements OverseerCollectionMessageHandler.Cmd { ocmh.overseer.offerStateUpdate((Utils.toJSON(new ZkNodeProps(propMap)))); } - if (totalReplicasPerShard > 1) { + if (totalReplicasPerShard > 1) { log.info("Adding replicas to restored collection={}", restoreCollection.getName()); for (Slice slice : restoreCollection.getSlices()) { diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/SplitShardCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/SplitShardCmd.java index 46587338565..e36c76fe05a 100644 --- a/solr/core/src/java/org/apache/solr/cloud/api/collections/SplitShardCmd.java +++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/SplitShardCmd.java @@ -40,6 +40,7 @@ import org.apache.solr.client.solrj.cloud.autoscaling.Variable.Type; import org.apache.solr.client.solrj.cloud.autoscaling.VersionedData; import org.apache.solr.client.solrj.request.CoreAdminRequest; import org.apache.solr.cloud.Overseer; +import org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler.ShardRequestTracker; import org.apache.solr.cloud.overseer.OverseerAction; import org.apache.solr.common.SolrException; import org.apache.solr.common.cloud.ClusterState; @@ -92,12 +93,13 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd { this.ocmh = ocmh; } + @SuppressWarnings("unchecked") @Override public void call(ClusterState state, ZkNodeProps message, NamedList results) throws Exception { - split(state, message, results); + split(state, message,(NamedList) results); } - public boolean split(ClusterState clusterState, ZkNodeProps message, NamedList results) throws Exception { + public boolean split(ClusterState clusterState, ZkNodeProps message, NamedList results) throws Exception { boolean waitForFinalState = message.getBool(CommonAdminParams.WAIT_FOR_FINAL_STATE, false); String methodStr = message.getStr(CommonAdminParams.SPLIT_METHOD, SolrIndexSplitter.SplitMethod.REWRITE.toLower()); SolrIndexSplitter.SplitMethod splitMethod = SolrIndexSplitter.SplitMethod.get(methodStr); @@ -233,7 +235,6 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd { } final String asyncId = message.getStr(ASYNC); - Map requestMap = new HashMap<>(); String nodeName = parentShardLeader.getNodeName(); t = timings.sub("createSubSlicesAndLeadersInState"); @@ -285,30 +286,34 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd { ocmh.addReplica(clusterState, new ZkNodeProps(propMap), results, null); } + @SuppressWarnings("deprecation") ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler(ocmh.overseer.getCoreContainer().getUpdateShardHandler().getDefaultHttpClient()); - - ocmh.processResponses(results, shardHandler, true, "SPLITSHARD failed to create subshard leaders", asyncId, requestMap); - + { + final ShardRequestTracker syncRequestTracker = ocmh.syncRequestTracker(); + syncRequestTracker.processResponses(results, shardHandler, true, "SPLITSHARD failed to create subshard leaders"); + } t.stop(); t = timings.sub("waitForSubSliceLeadersAlive"); - for (String subShardName : subShardNames) { - // wait for parent leader to acknowledge the sub-shard core - log.debug("Asking parent leader to wait for: " + subShardName + " to be alive on: " + nodeName); - String coreNodeName = ocmh.waitForCoreNodeName(collectionName, nodeName, subShardName); - CoreAdminRequest.WaitForState cmd = new CoreAdminRequest.WaitForState(); - cmd.setCoreName(subShardName); - cmd.setNodeName(nodeName); - cmd.setCoreNodeName(coreNodeName); - cmd.setState(Replica.State.ACTIVE); - cmd.setCheckLive(true); - cmd.setOnlyIfLeader(true); + { + final ShardRequestTracker shardRequestTracker = ocmh.asyncRequestTracker(asyncId); + for (String subShardName : subShardNames) { + // wait for parent leader to acknowledge the sub-shard core + log.debug("Asking parent leader to wait for: " + subShardName + " to be alive on: " + nodeName); + String coreNodeName = ocmh.waitForCoreNodeName(collectionName, nodeName, subShardName); + CoreAdminRequest.WaitForState cmd = new CoreAdminRequest.WaitForState(); + cmd.setCoreName(subShardName); + cmd.setNodeName(nodeName); + cmd.setCoreNodeName(coreNodeName); + cmd.setState(Replica.State.ACTIVE); + cmd.setCheckLive(true); + cmd.setOnlyIfLeader(true); - ModifiableSolrParams p = new ModifiableSolrParams(cmd.getParams()); - ocmh.sendShardRequest(nodeName, p, shardHandler, asyncId, requestMap); + ModifiableSolrParams p = new ModifiableSolrParams(cmd.getParams()); + shardRequestTracker.sendShardRequest(nodeName, p, shardHandler); + } + + shardRequestTracker.processResponses(results, shardHandler, true, "SPLITSHARD timed out waiting for subshard leaders to come up"); } - - ocmh.processResponses(results, shardHandler, true, "SPLITSHARD timed out waiting for subshard leaders to come up", - asyncId, requestMap); t.stop(); log.debug("Successfully created all sub-shards for collection " + collectionName + " parent shard: " + slice @@ -328,31 +333,37 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd { params.set(CoreAdminParams.RANGES, rangesStr); t = timings.sub("splitParentCore"); + { + final ShardRequestTracker shardRequestTracker = ocmh.asyncRequestTracker(asyncId); + shardRequestTracker.sendShardRequest(parentShardLeader.getNodeName(), params, shardHandler); - ocmh.sendShardRequest(parentShardLeader.getNodeName(), params, shardHandler, asyncId, requestMap); - - ocmh.processResponses(results, shardHandler, true, "SPLITSHARD failed to invoke SPLIT core admin command", asyncId, - requestMap); + shardRequestTracker.processResponses(results, shardHandler, true, "SPLITSHARD failed to invoke SPLIT core admin command"); + } t.stop(); log.debug("Index on shard: {} split into {} successfully", nodeName, subShardNames.size()); t = timings.sub("applyBufferedUpdates"); // apply buffered updates on sub-shards - for (int i = 0; i < subShardNames.size(); i++) { - String subShardName = subShardNames.get(i); + { + final ShardRequestTracker shardRequestTracker = ocmh.asyncRequestTracker(asyncId); - log.debug("Applying buffered updates on : " + subShardName); + for (int i = 0; i < subShardNames.size(); i++) { + String subShardName = subShardNames.get(i); - params = new ModifiableSolrParams(); - params.set(CoreAdminParams.ACTION, CoreAdminParams.CoreAdminAction.REQUESTAPPLYUPDATES.toString()); - params.set(CoreAdminParams.NAME, subShardName); + log.debug("Applying buffered updates on : " + subShardName); - ocmh.sendShardRequest(nodeName, params, shardHandler, asyncId, requestMap); + params = new ModifiableSolrParams(); + params.set(CoreAdminParams.ACTION, CoreAdminParams.CoreAdminAction.REQUESTAPPLYUPDATES.toString()); + params.set(CoreAdminParams.NAME, subShardName); + + shardRequestTracker.sendShardRequest(nodeName, params, shardHandler); + } + + shardRequestTracker.processResponses(results, shardHandler, true, + "SPLITSHARD failed while asking sub shard leaders" + + " to apply buffered updates"); } - - ocmh.processResponses(results, shardHandler, true, "SPLITSHARD failed while asking sub shard leaders" + - " to apply buffered updates", asyncId, requestMap); t.stop(); log.debug("Successfully applied buffered updates on : " + subShardNames); @@ -507,7 +518,10 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd { assert TestInjection.injectSplitFailureAfterReplicaCreation(); - ocmh.processResponses(results, shardHandler, true, "SPLITSHARD failed to create subshard replicas", asyncId, requestMap); + { + final ShardRequestTracker syncRequestTracker = ocmh.syncRequestTracker(); + syncRequestTracker.processResponses(results, shardHandler, true, "SPLITSHARD failed to create subshard replicas"); + } t.stop(); log.info("Successfully created all replica shards for all sub-slices " + subSlices); @@ -639,7 +653,7 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd { props.put(SHARD_ID_PROP, subSlice); ZkNodeProps m = new ZkNodeProps(props); try { - ocmh.commandMap.get(DELETESHARD).call(clusterState, m, new NamedList()); + ocmh.commandMap.get(DELETESHARD).call(clusterState, m, new NamedList()); } catch (Exception e) { log.warn("Cleanup failed after failed split of " + collectionName + "/" + parentShard + ": (deleting existing sub shard " + subSlice + ")", e); } diff --git a/solr/core/src/test/org/apache/solr/cloud/AsyncCallRequestStatusResponseTest.java b/solr/core/src/test/org/apache/solr/cloud/api/collections/AsyncCallRequestStatusResponseTest.java similarity index 51% rename from solr/core/src/test/org/apache/solr/cloud/AsyncCallRequestStatusResponseTest.java rename to solr/core/src/test/org/apache/solr/cloud/api/collections/AsyncCallRequestStatusResponseTest.java index 31159b68f0c..d00dd679f5d 100644 --- a/solr/core/src/test/org/apache/solr/cloud/AsyncCallRequestStatusResponseTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/api/collections/AsyncCallRequestStatusResponseTest.java @@ -14,32 +14,51 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.solr.cloud; +package org.apache.solr.cloud.api.collections; import org.apache.solr.client.solrj.request.CollectionAdminRequest; +import org.apache.solr.client.solrj.request.CollectionAdminRequest.Create; import org.apache.solr.client.solrj.response.CollectionAdminResponse; import org.apache.solr.client.solrj.response.RequestStatusState; -import org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler; +import org.apache.solr.cloud.AbstractFullDistribZkTestBase; +import org.apache.solr.cloud.SolrCloudTestCase; import org.apache.solr.common.util.NamedList; +import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; public class AsyncCallRequestStatusResponseTest extends SolrCloudTestCase { + private static boolean oldResponseEntries; + + @SuppressWarnings("deprecation") @BeforeClass public static void setupCluster() throws Exception { + oldResponseEntries = OverseerCollectionMessageHandler.INCLUDE_TOP_LEVEL_RESPONSE; + OverseerCollectionMessageHandler.INCLUDE_TOP_LEVEL_RESPONSE = random().nextBoolean(); configureCluster(2) .addConfig("conf", configset("cloud-minimal")) .configure(); } + + @SuppressWarnings("deprecation") + @AfterClass + public static void restoreFlag() throws Exception { + OverseerCollectionMessageHandler.INCLUDE_TOP_LEVEL_RESPONSE = oldResponseEntries; + } + @SuppressWarnings("deprecation") @Test public void testAsyncCallStatusResponse() throws Exception { - + int numShards = 4; + int numReplicas = 1; + Create createCollection = CollectionAdminRequest.createCollection("asynccall", "conf", numShards, numReplicas); + createCollection.setMaxShardsPerNode(100); String asyncId = - CollectionAdminRequest.createCollection("asynccall", "conf", 2, 1).processAsync(cluster.getSolrClient()); + createCollection.processAsync(cluster.getSolrClient()); - waitForState("Expected collection 'asynccall' to have 2 shards and 1 replica", "asynccall", clusterShape(2, 2)); + waitForState("Expected collection 'asynccall' to have "+numShards+" shards and "+ + numShards*numReplicas+" replica", "asynccall", clusterShape(numShards, numShards*numReplicas)); RequestStatusState state = AbstractFullDistribZkTestBase.getRequestStateAfterCompletion(asyncId, 30, cluster.getSolrClient()); assertEquals("Unexpected request status: " + state, "completed", state.getKey()); @@ -48,13 +67,25 @@ public class AsyncCallRequestStatusResponseTest extends SolrCloudTestCase { CollectionAdminResponse rsp = requestStatus.process(cluster.getSolrClient()); NamedList r = rsp.getResponse(); if (OverseerCollectionMessageHandler.INCLUDE_TOP_LEVEL_RESPONSE) { - assertEquals("Expected 5 elements in the response" + r, 5, r.size()); + final int actualNumOfElems = 3+(numShards*numReplicas); + // responseHeader, success, status, + old responses per every replica + assertEquals("Expected "+actualNumOfElems+" elements in the response" + r.jsonStr(), + actualNumOfElems, r.size()); } else { - assertEquals("Expected 3 elements in the response" + r, 3, r.size()); + // responseHeader, success, status + assertEquals("Expected 3 elements in the response" + r.jsonStr(), 3, r.size()); } assertNotNull("Expected 'responseHeader' response" + r, r.get("responseHeader")); - assertNotNull("Expected 'success' response" + r, r.get("success")); assertNotNull("Expected 'status' response" + r, r.get("status")); - assertEquals("Expected 4 elements in the success element" + r.get("success"), 4, ((NamedList)r.get("success")).size()); + { + final NamedList success = (NamedList)r.get("success"); + assertNotNull("Expected 'success' response" + r, success); + + final int actualSuccessElems = 2*(numShards*numReplicas); + // every replica responds once on submit and once on complete + assertEquals("Expected "+actualSuccessElems+ + " elements in the success element" + success.jsonStr(), + actualSuccessElems, success.size()); + } } } diff --git a/solr/core/src/test/org/apache/solr/cloud/api/collections/TestRequestStatusCollectionAPI.java b/solr/core/src/test/org/apache/solr/cloud/api/collections/TestRequestStatusCollectionAPI.java index 3d32d6c0622..a0fa70c2b37 100644 --- a/solr/core/src/test/org/apache/solr/cloud/api/collections/TestRequestStatusCollectionAPI.java +++ b/solr/core/src/test/org/apache/solr/cloud/api/collections/TestRequestStatusCollectionAPI.java @@ -17,8 +17,9 @@ package org.apache.solr.cloud.api.collections; import java.io.IOException; +import java.util.Arrays; +import java.util.Map; -import org.apache.solr.client.solrj.SolrRequest; import org.apache.solr.client.solrj.SolrServerException; import org.apache.solr.client.solrj.impl.HttpSolrClient; import org.apache.solr.client.solrj.request.QueryRequest; @@ -44,8 +45,10 @@ public class TestRequestStatusCollectionAPI extends BasicDistributedZkTest { params.set(CollectionParams.ACTION, CollectionParams.CollectionAction.CREATE.toString()); params.set("name", "collection2"); - params.set("numShards", 2); - params.set("replicationFactor", 1); + int numShards = 2; + params.set("numShards", numShards); + int replicationFactor = 1; + params.set("replicationFactor", replicationFactor); params.set("maxShardsPerNode", 100); params.set("collection.configName", "conf1"); params.set(CommonAdminParams.ASYNC, "1000"); @@ -66,14 +69,18 @@ public class TestRequestStatusCollectionAPI extends BasicDistributedZkTest { params.set("action", CollectionParams.CollectionAction.REQUESTSTATUS.toString()); params.set(OverseerCollectionMessageHandler.REQUESTID, "1000"); + NamedList createResponse =null; try { - message = sendStatusRequestWithRetry(params, MAX_WAIT_TIMEOUT_SECONDS); + createResponse = sendStatusRequestWithRetry(params, MAX_WAIT_TIMEOUT_SECONDS); + message = (String) createResponse.findRecursive("status","msg"); } catch (SolrServerException | IOException e) { e.printStackTrace(); } - assertEquals("found [1000] in completed tasks", message); - + assertEquals("found [1000] in completed tasks", message); + assertEquals("expecting "+numShards+" shard responses at "+createResponse, + numShards, numResponsesCompleted(createResponse)); + // Check for a random (hopefully non-existent request id params = new ModifiableSolrParams(); params.set(CollectionParams.ACTION, CollectionParams.CollectionAction.REQUESTSTATUS.toString()); @@ -103,13 +110,18 @@ public class TestRequestStatusCollectionAPI extends BasicDistributedZkTest { params = new ModifiableSolrParams(); params.set("action", CollectionParams.CollectionAction.REQUESTSTATUS.toString()); params.set(OverseerCollectionMessageHandler.REQUESTID, "1001"); + NamedList splitResponse=null; try { - message = sendStatusRequestWithRetry(params, MAX_WAIT_TIMEOUT_SECONDS); + splitResponse = sendStatusRequestWithRetry(params, MAX_WAIT_TIMEOUT_SECONDS); + message = (String) splitResponse.findRecursive("status","msg"); } catch (SolrServerException | IOException e) { e.printStackTrace(); } assertEquals("found [1001] in completed tasks", message); + // create * 2 + preprecovery *2 + split + req_apply_upd * 2 =7 + assertEquals("expecting "+(2+2+1+2)+" shard responses at "+splitResponse, + (2+2+1+2), numResponsesCompleted(splitResponse)); params = new ModifiableSolrParams(); params.set(CollectionParams.ACTION, CollectionParams.CollectionAction.CREATE.toString()); @@ -131,12 +143,12 @@ public class TestRequestStatusCollectionAPI extends BasicDistributedZkTest { params.set(OverseerCollectionMessageHandler.REQUESTID, "1002"); try { - message = sendStatusRequestWithRetry(params, MAX_WAIT_TIMEOUT_SECONDS); + NamedList response = sendStatusRequestWithRetry(params, MAX_WAIT_TIMEOUT_SECONDS); + message = (String) response.findRecursive("status","msg"); } catch (SolrServerException | IOException e) { e.printStackTrace(); } - assertEquals("found [1002] in failed tasks", message); params = new ModifiableSolrParams(); @@ -156,21 +168,38 @@ public class TestRequestStatusCollectionAPI extends BasicDistributedZkTest { assertEquals("Task with the same requestid already exists.", r.get("error")); } + @SuppressWarnings("unchecked") + private int numResponsesCompleted(NamedList response) { + int sum=0; + for (String key: Arrays.asList("success","failure")) { + NamedList allStatuses = (NamedList)response.get(key); + if (allStatuses!=null) { + for (Map.Entry tuple: allStatuses) { + NamedList statusResponse = (NamedList) tuple.getValue(); + if (statusResponse.indexOf("STATUS",0)>=0) { + sum+=1; + } + } + } + } + return sum; + } + /** * Helper method to send a status request with specific retry limit and return * the message/null from the success response. */ - private String sendStatusRequestWithRetry(ModifiableSolrParams params, int maxCounter) + private NamedList sendStatusRequestWithRetry(ModifiableSolrParams params, int maxCounter) throws SolrServerException, IOException{ - String message = null; + NamedList r = null; while (maxCounter-- > 0) { - final NamedList r = sendRequest(params); - final NamedList status = (NamedList) r.get("status"); + r = sendRequest(params); + @SuppressWarnings("unchecked") + final NamedList status = (NamedList) r.get("status"); final RequestStatusState state = RequestStatusState.fromKey((String) status.get("state")); - message = (String) status.get("msg"); if (state == RequestStatusState.COMPLETED || state == RequestStatusState.FAILED) { - return message; + return r; } try { @@ -180,11 +209,11 @@ public class TestRequestStatusCollectionAPI extends BasicDistributedZkTest { } // Return last state? - return message; + return r; } - protected NamedList sendRequest(ModifiableSolrParams params) throws SolrServerException, IOException { - SolrRequest request = new QueryRequest(params); + protected NamedList sendRequest(ModifiableSolrParams params) throws SolrServerException, IOException { + QueryRequest request = new QueryRequest(params); request.setPath("/admin/collections"); String baseUrl = ((HttpSolrClient) shardToJetty.get(SHARD1).get(0).client.getSolrClient()).getBaseURL(); diff --git a/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java b/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java index 10e80ac0433..71658cc70a1 100644 --- a/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java +++ b/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java @@ -2341,7 +2341,7 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes } } - static RequestStatusState getRequestStateAfterCompletion(String requestId, int waitForSeconds, SolrClient client) + public static RequestStatusState getRequestStateAfterCompletion(String requestId, int waitForSeconds, SolrClient client) throws IOException, SolrServerException { RequestStatusState state = null; final TimeOut timeout = new TimeOut(waitForSeconds, TimeUnit.SECONDS, TimeSource.NANO_TIME);