SOLR-7147: Introduce new TrackingShardHandlerFactory for monitoring what requests are sent to shards during tests

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1662209 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Shalin Shekhar Mangar 2015-02-25 13:33:55 +00:00
parent 8e0ce8dd0b
commit 6ae36bc6e9
7 changed files with 501 additions and 53 deletions

View File

@ -194,6 +194,9 @@ Other Changes
* SOLR-7156: Fix test failures due to resource leaks on windows.
(Ishan Chattopadhyaya via shalin)
* SOLR-7147: Introduce new TrackingShardHandlerFactory for monitoring what requests
are sent to shards during tests. (hossman, shalin)
================== 5.0.0 ==================
Consult the LUCENE_CHANGES.txt file for additional, low level, changes in this release.

View File

@ -0,0 +1,48 @@
<?xml version="1.0" encoding="UTF-8" ?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<!--
solr.xml specifying a custom shardHandlerFactory
-->
<solr>
<str name="shareSchema">${shareSchema:false}</str>
<str name="configSetBaseDir">${configSetBaseDir:configsets}</str>
<str name="coreRootDirectory">${coreRootDirectory:.}</str>
<solrcloud>
<str name="host">127.0.0.1</str>
<str name="hostContext">${hostContext:solr}</str>
<int name="hostPort">${hostPort:8983}</int>
<int name="zkClientTimeout">${solr.zkclienttimeout:30000}</int>
<bool name="genericCoreNodeNames">${genericCoreNodeNames:true}</bool>
<int name="distribUpdateConnTimeout">${distribUpdateConnTimeout:45000}</int>
<int name="distribUpdateSoTimeout">${distribUpdateSoTimeout:340000}</int>
<int name="autoReplicaFailoverWaitAfterExpiration">${autoReplicaFailoverWaitAfterExpiration:10000}</int>
<int name="autoReplicaFailoverWorkLoopDelay">${autoReplicaFailoverWorkLoopDelay:10000}</int>
<int name="autoReplicaFailoverBadNodeExpiration">${autoReplicaFailoverBadNodeExpiration:60000}</int>
</solrcloud>
<shardHandlerFactory name="shardHandlerFactory"
class="org.apache.solr.handler.component.TrackingShardHandlerFactory">
<str name="urlScheme">${urlScheme:}</str>
<int name="socketTimeout">${socketTimeout:90000}</int>
<int name="connTimeout">${connTimeout:15000}</int>
</shardHandlerFactory>
</solr>

View File

@ -126,7 +126,7 @@ public class TestMiniSolrCloudCluster extends LuceneTestCase {
try(SolrZkClient zkClient = new SolrZkClient
(miniCluster.getZkServer().getZkAddress(), AbstractZkTestCase.TIMEOUT, 45000, null)) {
ZkStateReader zkStateReader = new ZkStateReader(zkClient);
waitForRecoveriesToFinish(collectionName, zkStateReader, true, true, 330);
AbstractDistribZkTestBase.waitForRecoveriesToFinish(collectionName, zkStateReader, true, true, 330);
// modify/query collection
CloudSolrClient cloudSolrClient = miniCluster.getSolrClient();
@ -168,54 +168,4 @@ public class TestMiniSolrCloudCluster extends LuceneTestCase {
}
}
protected void waitForRecoveriesToFinish(String collection,
ZkStateReader zkStateReader, boolean verbose, boolean failOnTimeout, int timeoutSeconds)
throws Exception {
log.info("Wait for recoveries to finish - collection: " + collection + " failOnTimeout:" + failOnTimeout + " timeout (sec):" + timeoutSeconds);
boolean cont = true;
int cnt = 0;
while (cont) {
if (verbose) System.out.println("-");
boolean sawLiveRecovering = false;
zkStateReader.updateClusterState(true);
ClusterState clusterState = zkStateReader.getClusterState();
Map<String,Slice> slices = clusterState.getSlicesMap(collection);
assertNotNull("Could not find collection:" + collection, slices);
for (Map.Entry<String,Slice> entry : slices.entrySet()) {
Map<String,Replica> shards = entry.getValue().getReplicasMap();
for (Map.Entry<String,Replica> shard : shards.entrySet()) {
if (verbose) System.out.println("rstate:"
+ shard.getValue().getStr(ZkStateReader.STATE_PROP)
+ " live:"
+ clusterState.liveNodesContain(shard.getValue().getNodeName()));
String state = shard.getValue().getStr(ZkStateReader.STATE_PROP);
if ((state.equals(ZkStateReader.RECOVERING) || state
.equals(ZkStateReader.SYNC) || state.equals(ZkStateReader.DOWN))
&& clusterState.liveNodesContain(shard.getValue().getStr(
ZkStateReader.NODE_NAME_PROP))) {
sawLiveRecovering = true;
}
}
}
if (!sawLiveRecovering || cnt == timeoutSeconds) {
if (!sawLiveRecovering) {
if (verbose) System.out.println("no one is recoverying");
} else {
if (verbose) System.out.println("Gave up waiting for recovery to finish..");
if (failOnTimeout) {
fail("There are still nodes recoverying - waited for " + timeoutSeconds + " seconds");
// won't get here
return;
}
}
cont = false;
} else {
Thread.sleep(1000);
}
cnt++;
}
log.info("Recoveries finished - collection: " + collection);
}
}

View File

@ -0,0 +1,135 @@
package org.apache.solr.handler.component;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.solr.BaseDistributedSearchTestCase;
import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.cloud.AbstractFullDistribZkTestBase;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.servlet.SolrDispatchFilter;
import org.junit.Test;
/**
* Test for {@link org.apache.solr.handler.component.TrackingShardHandlerFactory}
* See SOLR-7147 for more information
*/
@SolrTestCaseJ4.SuppressSSL
public class TestTrackingShardHandlerFactory extends AbstractFullDistribZkTestBase {
public TestTrackingShardHandlerFactory() {
schemaString = "schema15.xml"; // we need a string id
}
@Override
protected String getSolrXml() {
return "solr-trackingshardhandler.xml";
}
@Test
@BaseDistributedSearchTestCase.ShardsFixed(num = 2)
public void testRequestTracking() throws Exception {
String collectionName = "testTwoPhase";
List<JettySolrRunner> runners = new ArrayList<>(jettys);
runners.add(controlJetty);
TrackingShardHandlerFactory.RequestTrackingQueue trackingQueue = new TrackingShardHandlerFactory.RequestTrackingQueue();
TrackingShardHandlerFactory.setTrackingQueue(runners, trackingQueue);
for (JettySolrRunner runner : runners) {
CoreContainer container = ((SolrDispatchFilter) runner.getDispatchFilter().getFilter()).getCores();
ShardHandlerFactory factory = container.getShardHandlerFactory();
assert factory instanceof TrackingShardHandlerFactory;
TrackingShardHandlerFactory trackingShardHandlerFactory = (TrackingShardHandlerFactory) factory;
assertSame(trackingQueue, trackingShardHandlerFactory.getTrackingQueue());
}
createCollection(collectionName, 2, 1, 1);
waitForRecoveriesToFinish(collectionName, true);
List<TrackingShardHandlerFactory.ShardRequestAndParams> coreAdminRequests = trackingQueue.getCoreAdminRequests();
assertNotNull(coreAdminRequests);
assertEquals("Unexpected number of core admin requests were found", 2, coreAdminRequests.size());
CloudSolrClient client = cloudClient;
client.setDefaultCollection(collectionName);
/*
hash of b is 95de7e03 high bits=2 shard=shard1
hash of e is 656c4367 high bits=1 shard=shard2
*/
for (int i = 0; i < 10; i++) {
SolrInputDocument doc = new SolrInputDocument();
doc.addField("id", (i % 2 == 0 ? "b!" : "e!") + i);
doc.addField("a_i", i);
doc.addField("a_t", "text_" + i);
client.add(doc);
}
client.commit();
client.query(new SolrQuery("*:*"));
TrackingShardHandlerFactory.ShardRequestAndParams getTopIdsRequest = trackingQueue.getShardRequestByPurpose(client.getZkStateReader(), collectionName, "shard1", ShardRequest.PURPOSE_GET_TOP_IDS);
assertNotNull(getTopIdsRequest);
getTopIdsRequest = trackingQueue.getShardRequestByPurpose(client.getZkStateReader(), collectionName, "shard2", ShardRequest.PURPOSE_GET_TOP_IDS);
assertNotNull(getTopIdsRequest);
TrackingShardHandlerFactory.ShardRequestAndParams getFieldsRequest = trackingQueue.getShardRequestByPurpose(client.getZkStateReader(), collectionName, "shard1", ShardRequest.PURPOSE_GET_FIELDS);
assertNotNull(getFieldsRequest);
getFieldsRequest = trackingQueue.getShardRequestByPurpose(client.getZkStateReader(), collectionName, "shard2", ShardRequest.PURPOSE_GET_FIELDS);
assertNotNull(getFieldsRequest);
int numRequests = 0;
Map<String, List<TrackingShardHandlerFactory.ShardRequestAndParams>> allRequests = trackingQueue.getAllRequests();
for (Map.Entry<String, List<TrackingShardHandlerFactory.ShardRequestAndParams>> entry : allRequests.entrySet()) {
numRequests += entry.getValue().size();
}
// 4 shard requests + 2 core admin requests (invoked by create collection API)
assertEquals("Total number of requests do not match expected", 6, numRequests);
// reset
TrackingShardHandlerFactory.setTrackingQueue(runners, null);
for (JettySolrRunner runner : runners) {
CoreContainer container = ((SolrDispatchFilter) runner.getDispatchFilter().getFilter()).getCores();
ShardHandlerFactory factory = container.getShardHandlerFactory();
assert factory instanceof TrackingShardHandlerFactory;
TrackingShardHandlerFactory trackingShardHandlerFactory = (TrackingShardHandlerFactory) factory;
assertFalse(trackingShardHandlerFactory.isTracking());
}
// make another request and verify
client.query(new SolrQuery("*:*"));
numRequests = 0;
allRequests = trackingQueue.getAllRequests();
for (Map.Entry<String, List<TrackingShardHandlerFactory.ShardRequestAndParams>> entry : allRequests.entrySet()) {
numRequests += entry.getValue().size();
}
// should still be 6
assertEquals("Total number of shard requests do not match expected", 6, numRequests);
}
}

View File

@ -133,7 +133,7 @@ public abstract class AbstractDistribZkTestBase extends BaseDistributedSearchTes
waitForRecoveriesToFinish(collection, zkStateReader, verbose, failOnTimeout, 330);
}
protected void waitForRecoveriesToFinish(String collection,
public static void waitForRecoveriesToFinish(String collection,
ZkStateReader zkStateReader, boolean verbose, boolean failOnTimeout, int timeoutSeconds)
throws Exception {
log.info("Wait for recoveries to finish - collection: " + collection + " failOnTimeout:" + failOnTimeout + " timeout (sec):" + timeoutSeconds);
@ -170,7 +170,7 @@ public abstract class AbstractDistribZkTestBase extends BaseDistributedSearchTes
if (verbose) System.out.println("Gave up waiting for recovery to finish..");
if (failOnTimeout) {
Diagnostics.logThreadDumps("Gave up waiting for recovery to finish. THREAD DUMP:");
printLayout();
zkStateReader.getZkClient().printLayoutToStdOut();
fail("There are still nodes recoverying - waited for " + timeoutSeconds + " seconds");
// won't get here
return;

View File

@ -0,0 +1,288 @@
package org.apache.solr.handler.component;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
import org.apache.solr.cloud.MiniSolrCloudCluster;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkCoreNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.servlet.SolrDispatchFilter;
/**
* A ShardHandlerFactory that extends HttpShardHandlerFactory and
* tracks requests made to nodes/shards such that interested parties
* can watch such requests and make assertions inside tests
* <p>
* This is a test helper only and should *not* be used for production.
*/
public class TrackingShardHandlerFactory extends HttpShardHandlerFactory {
private Queue<ShardRequestAndParams> queue;
/**
* Set the tracking queue for this factory. All the ShardHandler instances
* created from this factory will share the queue and call {@link java.util.Queue#offer(Object)}
* with a {@link org.apache.solr.handler.component.TrackingShardHandlerFactory.ShardRequestAndParams}
* instance whenever
* {@link org.apache.solr.handler.component.ShardHandler#submit(ShardRequest, String, org.apache.solr.common.params.ModifiableSolrParams)}
* is called before the request is actually submitted to the
* wrapped {@link org.apache.solr.handler.component.HttpShardHandlerFactory} instance.
* <p>
* If a tracking queue is already set then this call will overwrite and replace the
* previous queue with this one.
*
* @param queue the {@link java.util.Queue} to be used for tracking shard requests
*/
public synchronized void setTrackingQueue(Queue<ShardRequestAndParams> queue) {
this.queue = queue;
}
/**
* @return the {@link java.util.Queue} being used for tracking, null if none
* has been set
*/
public synchronized Queue<ShardRequestAndParams> getTrackingQueue() {
return queue;
}
/**
* @return true if a tracking queue has been set through
* {@link #setTrackingQueue(java.util.List, java.util.Queue)}, false otherwise
*/
public synchronized boolean isTracking() {
return queue != null;
}
@Override
public ShardHandler getShardHandler() {
final ShardHandlerFactory factory = this;
final ShardHandler wrapped = super.getShardHandler();
return new ShardHandler() {
@Override
public void checkDistributed(ResponseBuilder rb) {
wrapped.checkDistributed(rb);
}
@Override
public void submit(ShardRequest sreq, String shard, ModifiableSolrParams params) {
synchronized (TrackingShardHandlerFactory.this) {
if (isTracking()) {
queue.offer(new ShardRequestAndParams(sreq, shard, params));
}
}
wrapped.submit(sreq, shard, params);
}
@Override
public ShardResponse takeCompletedIncludingErrors() {
return wrapped.takeCompletedIncludingErrors();
}
@Override
public ShardResponse takeCompletedOrError() {
return wrapped.takeCompletedOrError();
}
@Override
public void cancelAll() {
wrapped.cancelAll();
}
@Override
public ShardHandlerFactory getShardHandlerFactory() {
return factory;
}
};
}
@Override
public void close() {
super.close();
}
/**
* Sets the tracking queue for all nodes participating in this cluster. Once this method returns,
* all search and core admin requests distributed to shards will be submitted to the given queue.
* <p>
* This is equivalent to calling:
* <code>TrackingShardHandlerFactory.setTrackingQueue(cluster.getJettySolrRunners(), queue)</code>
*
* @see org.apache.solr.handler.component.TrackingShardHandlerFactory#setTrackingQueue(java.util.List, java.util.Queue)
*/
public static void setTrackingQueue(MiniSolrCloudCluster cluster, Queue<ShardRequestAndParams> queue) {
setTrackingQueue(cluster.getJettySolrRunners(), queue);
}
/**
* Sets the tracking queue for all nodes participating in this cluster. Once this method returns,
* all search and core admin requests distributed to shards will be submitted to the given queue.
*
* @param runners a list of {@link org.apache.solr.client.solrj.embedded.JettySolrRunner} nodes
* @param queue an implementation of {@link java.util.Queue} which
* accepts {@link org.apache.solr.handler.component.TrackingShardHandlerFactory.ShardRequestAndParams}
* instances
*/
public static void setTrackingQueue(List<JettySolrRunner> runners, Queue<ShardRequestAndParams> queue) {
for (JettySolrRunner runner : runners) {
CoreContainer container = ((SolrDispatchFilter) runner.getDispatchFilter().getFilter()).getCores();
ShardHandlerFactory factory = container.getShardHandlerFactory();
assert factory instanceof TrackingShardHandlerFactory;
TrackingShardHandlerFactory trackingShardHandlerFactory = (TrackingShardHandlerFactory) factory;
trackingShardHandlerFactory.setTrackingQueue(queue);
}
}
public static class ShardRequestAndParams {
public String shard;
public ShardRequest sreq;
public ModifiableSolrParams params;
public ShardRequestAndParams(ShardRequest sreq, String shard, ModifiableSolrParams params) {
this.sreq = sreq;
this.params = params;
this.shard = shard;
}
@Override
public String toString() {
return "ShardRequestAndParams{" +
"shard='" + shard + '\'' +
", sreq=" + sreq +
", params=" + params +
'}';
}
}
/**
* A queue having helper methods to select requests by shard and purpose.
*
* @see org.apache.solr.handler.component.TrackingShardHandlerFactory#setTrackingQueue(java.util.List, java.util.Queue)
*/
public static class RequestTrackingQueue extends LinkedList<ShardRequestAndParams> {
private final Map<String, List<ShardRequestAndParams>> requests = new HashMap<>();
@Override
public boolean offer(ShardRequestAndParams shardRequestAndParams) {
List<ShardRequestAndParams> list = requests.get(shardRequestAndParams.shard);
if (list == null) {
list = new ArrayList<>();
}
list.add(shardRequestAndParams);
requests.put(shardRequestAndParams.shard, list);
return super.offer(shardRequestAndParams);
}
@Override
public void clear() {
requests.clear();
}
/**
* Retrieve request recorded by this queue which were sent to given collection, shard and purpose
*
* @param zkStateReader the {@link org.apache.solr.common.cloud.ZkStateReader} from which cluster state is read
* @param collectionName the given collection name for which requests have to be extracted
* @param shardId the given shard name for which requests have to be extracted
* @param purpose the shard purpose
* @return instance of {@link org.apache.solr.handler.component.TrackingShardHandlerFactory.ShardRequestAndParams}
* or null if none is found
* @throws java.lang.RuntimeException if more than one request is found to the same shard with the same purpose
*/
public ShardRequestAndParams getShardRequestByPurpose(ZkStateReader zkStateReader, String collectionName, String shardId, int purpose) throws RuntimeException {
List<TrackingShardHandlerFactory.ShardRequestAndParams> shardRequests = getShardRequests(zkStateReader, collectionName, shardId);
List<TrackingShardHandlerFactory.ShardRequestAndParams> result = new ArrayList<>(1);
for (TrackingShardHandlerFactory.ShardRequestAndParams request : shardRequests) {
if ((request.sreq.purpose & purpose) != 0) {
result.add(request);
}
}
if (result.size() > 1) {
throw new RuntimeException("Multiple requests to the same shard with the same purpose were found. Requests: " + result);
}
return result.isEmpty() ? null : result.get(0);
}
/**
* Retrieve all requests recorded by this queue which were sent to given collection and shard
*
* @param zkStateReader the {@link org.apache.solr.common.cloud.ZkStateReader} from which cluster state is read
* @param collectionName the given collection name for which requests have to be extracted
* @param shardId the given shard name for which requests have to be extracted
* @return a list of {@link org.apache.solr.handler.component.TrackingShardHandlerFactory.ShardRequestAndParams}
* or empty list if none are found
*/
public List<ShardRequestAndParams> getShardRequests(ZkStateReader zkStateReader, String collectionName, String shardId) {
DocCollection collection = zkStateReader.getClusterState().getCollection(collectionName);
assert collection != null;
Slice slice = collection.getSlice(shardId);
assert slice != null;
List<TrackingShardHandlerFactory.ShardRequestAndParams> results = new ArrayList<>();
for (Map.Entry<String, Replica> entry : slice.getReplicasMap().entrySet()) {
String coreUrl = new ZkCoreNodeProps(entry.getValue()).getCoreUrl();
List<TrackingShardHandlerFactory.ShardRequestAndParams> list = requests.get(coreUrl);
if (list != null) {
results.addAll(list);
}
}
return results;
}
/**
* Retrieves all core admin requests distributed to nodes by Collection API commands
*
* @return a list of {@link org.apache.solr.handler.component.TrackingShardHandlerFactory.ShardRequestAndParams}
* or empty if none found
*/
public List<ShardRequestAndParams> getCoreAdminRequests() {
List<ShardRequestAndParams> results = new ArrayList<>();
Map<String, List<ShardRequestAndParams>> map = getAllRequests();
for (Map.Entry<String, List<ShardRequestAndParams>> entry : map.entrySet()) {
for (ShardRequestAndParams shardRequestAndParams : entry.getValue()) {
if (shardRequestAndParams.sreq.purpose == ShardRequest.PURPOSE_PRIVATE) {
results.add(shardRequestAndParams);
}
}
}
return results;
}
/**
* Retrieves all requests recorded by this collection as a Map of shard address (string url)
* to a list of {@link org.apache.solr.handler.component.TrackingShardHandlerFactory.ShardRequestAndParams}
*
* @return a {@link java.util.Map} of url strings to {@link org.apache.solr.handler.component.TrackingShardHandlerFactory.ShardRequestAndParams} objects
* or empty map if none have been recorded
*/
public Map<String, List<ShardRequestAndParams>> getAllRequests() {
return requests;
}
}
}

View File

@ -0,0 +1,24 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<!doctype html public "-//w3c//dtd html 4.0 transitional//en">
<!-- not a package-info.java, because we already defined this package in core/ -->
<html>
<body>
Class for tracking shard requests
</body>
</html>