> entry : allRequests.entrySet()) {
+ numRequests += entry.getValue().size();
+ }
+ // should still be 6
+ assertEquals("Total number of shard requests do not match expected", 6, numRequests);
+ }
+}
diff --git a/solr/test-framework/src/java/org/apache/solr/cloud/AbstractDistribZkTestBase.java b/solr/test-framework/src/java/org/apache/solr/cloud/AbstractDistribZkTestBase.java
index a7c33f04d65..6fbde774c09 100644
--- a/solr/test-framework/src/java/org/apache/solr/cloud/AbstractDistribZkTestBase.java
+++ b/solr/test-framework/src/java/org/apache/solr/cloud/AbstractDistribZkTestBase.java
@@ -133,7 +133,7 @@ public abstract class AbstractDistribZkTestBase extends BaseDistributedSearchTes
waitForRecoveriesToFinish(collection, zkStateReader, verbose, failOnTimeout, 330);
}
- protected void waitForRecoveriesToFinish(String collection,
+ public static void waitForRecoveriesToFinish(String collection,
ZkStateReader zkStateReader, boolean verbose, boolean failOnTimeout, int timeoutSeconds)
throws Exception {
log.info("Wait for recoveries to finish - collection: " + collection + " failOnTimeout:" + failOnTimeout + " timeout (sec):" + timeoutSeconds);
@@ -170,7 +170,7 @@ public abstract class AbstractDistribZkTestBase extends BaseDistributedSearchTes
if (verbose) System.out.println("Gave up waiting for recovery to finish..");
if (failOnTimeout) {
Diagnostics.logThreadDumps("Gave up waiting for recovery to finish. THREAD DUMP:");
- printLayout();
+ zkStateReader.getZkClient().printLayoutToStdOut();
fail("There are still nodes recoverying - waited for " + timeoutSeconds + " seconds");
// won't get here
return;
diff --git a/solr/test-framework/src/java/org/apache/solr/handler/component/TrackingShardHandlerFactory.java b/solr/test-framework/src/java/org/apache/solr/handler/component/TrackingShardHandlerFactory.java
new file mode 100644
index 00000000000..0ced4cd4bf8
--- /dev/null
+++ b/solr/test-framework/src/java/org/apache/solr/handler/component/TrackingShardHandlerFactory.java
@@ -0,0 +1,288 @@
+package org.apache.solr.handler.component;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+
+import org.apache.solr.client.solrj.embedded.JettySolrRunner;
+import org.apache.solr.cloud.MiniSolrCloudCluster;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.cloud.ZkCoreNodeProps;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.core.CoreContainer;
+import org.apache.solr.servlet.SolrDispatchFilter;
+
+/**
+ * A ShardHandlerFactory that extends HttpShardHandlerFactory and
+ * tracks requests made to nodes/shards such that interested parties
+ * can watch such requests and make assertions inside tests
+ *
+ * This is a test helper only and should *not* be used for production.
+ */
+public class TrackingShardHandlerFactory extends HttpShardHandlerFactory {
+
+ private Queue queue;
+
+ /**
+ * Set the tracking queue for this factory. All the ShardHandler instances
+ * created from this factory will share the queue and call {@link java.util.Queue#offer(Object)}
+ * with a {@link org.apache.solr.handler.component.TrackingShardHandlerFactory.ShardRequestAndParams}
+ * instance whenever
+ * {@link org.apache.solr.handler.component.ShardHandler#submit(ShardRequest, String, org.apache.solr.common.params.ModifiableSolrParams)}
+ * is called before the request is actually submitted to the
+ * wrapped {@link org.apache.solr.handler.component.HttpShardHandlerFactory} instance.
+ *
+ * If a tracking queue is already set then this call will overwrite and replace the
+ * previous queue with this one.
+ *
+ * @param queue the {@link java.util.Queue} to be used for tracking shard requests
+ */
+ public synchronized void setTrackingQueue(Queue queue) {
+ this.queue = queue;
+ }
+
+ /**
+ * @return the {@link java.util.Queue} being used for tracking, null if none
+ * has been set
+ */
+ public synchronized Queue getTrackingQueue() {
+ return queue;
+ }
+
+ /**
+ * @return true if a tracking queue has been set through
+ * {@link #setTrackingQueue(java.util.List, java.util.Queue)}, false otherwise
+ */
+ public synchronized boolean isTracking() {
+ return queue != null;
+ }
+
+ @Override
+ public ShardHandler getShardHandler() {
+ final ShardHandlerFactory factory = this;
+ final ShardHandler wrapped = super.getShardHandler();
+ return new ShardHandler() {
+ @Override
+ public void checkDistributed(ResponseBuilder rb) {
+ wrapped.checkDistributed(rb);
+ }
+
+ @Override
+ public void submit(ShardRequest sreq, String shard, ModifiableSolrParams params) {
+ synchronized (TrackingShardHandlerFactory.this) {
+ if (isTracking()) {
+ queue.offer(new ShardRequestAndParams(sreq, shard, params));
+ }
+ }
+ wrapped.submit(sreq, shard, params);
+ }
+
+ @Override
+ public ShardResponse takeCompletedIncludingErrors() {
+ return wrapped.takeCompletedIncludingErrors();
+ }
+
+ @Override
+ public ShardResponse takeCompletedOrError() {
+ return wrapped.takeCompletedOrError();
+ }
+
+ @Override
+ public void cancelAll() {
+ wrapped.cancelAll();
+ }
+
+ @Override
+ public ShardHandlerFactory getShardHandlerFactory() {
+ return factory;
+ }
+ };
+ }
+
+ @Override
+ public void close() {
+ super.close();
+ }
+
+ /**
+ * Sets the tracking queue for all nodes participating in this cluster. Once this method returns,
+ * all search and core admin requests distributed to shards will be submitted to the given queue.
+ *
+ * This is equivalent to calling:
+ * TrackingShardHandlerFactory.setTrackingQueue(cluster.getJettySolrRunners(), queue)
+ *
+ * @see org.apache.solr.handler.component.TrackingShardHandlerFactory#setTrackingQueue(java.util.List, java.util.Queue)
+ */
+ public static void setTrackingQueue(MiniSolrCloudCluster cluster, Queue queue) {
+ setTrackingQueue(cluster.getJettySolrRunners(), queue);
+ }
+
+ /**
+ * Sets the tracking queue for all nodes participating in this cluster. Once this method returns,
+ * all search and core admin requests distributed to shards will be submitted to the given queue.
+ *
+ * @param runners a list of {@link org.apache.solr.client.solrj.embedded.JettySolrRunner} nodes
+ * @param queue an implementation of {@link java.util.Queue} which
+ * accepts {@link org.apache.solr.handler.component.TrackingShardHandlerFactory.ShardRequestAndParams}
+ * instances
+ */
+ public static void setTrackingQueue(List runners, Queue queue) {
+ for (JettySolrRunner runner : runners) {
+ CoreContainer container = ((SolrDispatchFilter) runner.getDispatchFilter().getFilter()).getCores();
+ ShardHandlerFactory factory = container.getShardHandlerFactory();
+ assert factory instanceof TrackingShardHandlerFactory;
+ TrackingShardHandlerFactory trackingShardHandlerFactory = (TrackingShardHandlerFactory) factory;
+ trackingShardHandlerFactory.setTrackingQueue(queue);
+ }
+ }
+
+ public static class ShardRequestAndParams {
+ public String shard;
+ public ShardRequest sreq;
+ public ModifiableSolrParams params;
+
+ public ShardRequestAndParams(ShardRequest sreq, String shard, ModifiableSolrParams params) {
+ this.sreq = sreq;
+ this.params = params;
+ this.shard = shard;
+ }
+
+ @Override
+ public String toString() {
+ return "ShardRequestAndParams{" +
+ "shard='" + shard + '\'' +
+ ", sreq=" + sreq +
+ ", params=" + params +
+ '}';
+ }
+ }
+
+ /**
+ * A queue having helper methods to select requests by shard and purpose.
+ *
+ * @see org.apache.solr.handler.component.TrackingShardHandlerFactory#setTrackingQueue(java.util.List, java.util.Queue)
+ */
+ public static class RequestTrackingQueue extends LinkedList {
+ private final Map> requests = new HashMap<>();
+
+ @Override
+ public boolean offer(ShardRequestAndParams shardRequestAndParams) {
+ List list = requests.get(shardRequestAndParams.shard);
+ if (list == null) {
+ list = new ArrayList<>();
+ }
+ list.add(shardRequestAndParams);
+ requests.put(shardRequestAndParams.shard, list);
+ return super.offer(shardRequestAndParams);
+ }
+
+ @Override
+ public void clear() {
+ requests.clear();
+ }
+
+ /**
+ * Retrieve request recorded by this queue which were sent to given collection, shard and purpose
+ *
+ * @param zkStateReader the {@link org.apache.solr.common.cloud.ZkStateReader} from which cluster state is read
+ * @param collectionName the given collection name for which requests have to be extracted
+ * @param shardId the given shard name for which requests have to be extracted
+ * @param purpose the shard purpose
+ * @return instance of {@link org.apache.solr.handler.component.TrackingShardHandlerFactory.ShardRequestAndParams}
+ * or null if none is found
+ * @throws java.lang.RuntimeException if more than one request is found to the same shard with the same purpose
+ */
+ public ShardRequestAndParams getShardRequestByPurpose(ZkStateReader zkStateReader, String collectionName, String shardId, int purpose) throws RuntimeException {
+ List shardRequests = getShardRequests(zkStateReader, collectionName, shardId);
+ List result = new ArrayList<>(1);
+ for (TrackingShardHandlerFactory.ShardRequestAndParams request : shardRequests) {
+ if ((request.sreq.purpose & purpose) != 0) {
+ result.add(request);
+ }
+ }
+ if (result.size() > 1) {
+ throw new RuntimeException("Multiple requests to the same shard with the same purpose were found. Requests: " + result);
+ }
+ return result.isEmpty() ? null : result.get(0);
+ }
+
+ /**
+ * Retrieve all requests recorded by this queue which were sent to given collection and shard
+ *
+ * @param zkStateReader the {@link org.apache.solr.common.cloud.ZkStateReader} from which cluster state is read
+ * @param collectionName the given collection name for which requests have to be extracted
+ * @param shardId the given shard name for which requests have to be extracted
+ * @return a list of {@link org.apache.solr.handler.component.TrackingShardHandlerFactory.ShardRequestAndParams}
+ * or empty list if none are found
+ */
+ public List getShardRequests(ZkStateReader zkStateReader, String collectionName, String shardId) {
+ DocCollection collection = zkStateReader.getClusterState().getCollection(collectionName);
+ assert collection != null;
+ Slice slice = collection.getSlice(shardId);
+ assert slice != null;
+
+ List results = new ArrayList<>();
+ for (Map.Entry entry : slice.getReplicasMap().entrySet()) {
+ String coreUrl = new ZkCoreNodeProps(entry.getValue()).getCoreUrl();
+ List list = requests.get(coreUrl);
+ if (list != null) {
+ results.addAll(list);
+ }
+ }
+ return results;
+ }
+
+ /**
+ * Retrieves all core admin requests distributed to nodes by Collection API commands
+ *
+ * @return a list of {@link org.apache.solr.handler.component.TrackingShardHandlerFactory.ShardRequestAndParams}
+ * or empty if none found
+ */
+ public List getCoreAdminRequests() {
+ List results = new ArrayList<>();
+ Map> map = getAllRequests();
+ for (Map.Entry> entry : map.entrySet()) {
+ for (ShardRequestAndParams shardRequestAndParams : entry.getValue()) {
+ if (shardRequestAndParams.sreq.purpose == ShardRequest.PURPOSE_PRIVATE) {
+ results.add(shardRequestAndParams);
+ }
+ }
+ }
+ return results;
+ }
+
+ /**
+ * Retrieves all requests recorded by this collection as a Map of shard address (string url)
+ * to a list of {@link org.apache.solr.handler.component.TrackingShardHandlerFactory.ShardRequestAndParams}
+ *
+ * @return a {@link java.util.Map} of url strings to {@link org.apache.solr.handler.component.TrackingShardHandlerFactory.ShardRequestAndParams} objects
+ * or empty map if none have been recorded
+ */
+ public Map> getAllRequests() {
+ return requests;
+ }
+ }
+}
diff --git a/solr/test-framework/src/java/org/apache/solr/handler/component/package.html b/solr/test-framework/src/java/org/apache/solr/handler/component/package.html
new file mode 100644
index 00000000000..63496af086a
--- /dev/null
+++ b/solr/test-framework/src/java/org/apache/solr/handler/component/package.html
@@ -0,0 +1,24 @@
+
+
+
+
+
+
+Class for tracking shard requests
+
+