SOLR-6832: Queries be served locally rather than being forwarded to another replica

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1659748 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Timothy Potter 2015-02-14 02:37:34 +00:00
parent 3dc8f4a437
commit 429588097c
7 changed files with 204 additions and 2 deletions

View File

@ -111,6 +111,9 @@ New Features
* SOLR-7019: Support changing field key when using interval faceting.
(Tomás Fernández Löbbe)
* SOLR-6832: Queries be served locally rather than being forwarded to another replica.
(Sachin Goyal, Timothy Potter)
Bug Fixes
----------------------

View File

@ -40,13 +40,18 @@ import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.StrUtils;
import org.apache.solr.core.CoreDescriptor;
import org.apache.solr.core.SolrCore;
import org.apache.solr.request.SolrQueryRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.ConnectException;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
@ -63,6 +68,7 @@ public class HttpShardHandler extends ShardHandler {
private Map<String,List<String>> shardToURLs;
private HttpClient httpClient;
protected static Logger log = LoggerFactory.getLogger(HttpShardHandler.class);
public HttpShardHandler(HttpShardHandlerFactory httpShardHandlerFactory, HttpClient httpClient) {
this.httpClient = httpClient;
@ -101,20 +107,69 @@ public class HttpShardHandler extends ShardHandler {
// Not thread safe... don't use in Callable.
// Don't modify the returned URL list.
private List<String> getURLs(String shard) {
private List<String> getURLs(ShardRequest sreq, String shard) {
List<String> urls = shardToURLs.get(shard);
if (urls == null) {
urls = httpShardHandlerFactory.makeURLList(shard);
preferCurrentHostForDistributedReq(sreq, urls);
shardToURLs.put(shard, urls);
}
return urls;
}
/**
* A distributed request is made via {@link LBHttpSolrClient} to the first live server in the URL list.
* This means it is just as likely to choose current host as any of the other hosts.
* This function makes sure that the cores of current host are always put first in the URL list.
* If all nodes prefer local-cores then a bad/heavily-loaded node will receive less requests from healthy nodes.
* This will help prevent a distributed deadlock or timeouts in all the healthy nodes due to one bad node.
*/
private void preferCurrentHostForDistributedReq(final ShardRequest sreq, final List<String> urls) {
if (sreq == null || sreq.rb == null || sreq.rb.req == null || urls == null || urls.size() <= 1)
return;
SolrQueryRequest req = sreq.rb.req;
// determine if we should apply the local preference
if (!req.getOriginalParams().getBool(CommonParams.PREFER_LOCAL_SHARDS, false))
return;
// Get this node's base URL from ZK
SolrCore core = req.getCore();
ZkController zkController = (core != null) ? core.getCoreDescriptor().getCoreContainer().getZkController() : null;
String currentHostAddress = (zkController != null) ? zkController.getBaseUrl() : null;
if (currentHostAddress == null) {
log.debug("Couldn't determine current host address to prefer local shards " +
"because either core is null? {} or there is no ZkController? {}",
Boolean.valueOf(core == null), Boolean.valueOf(zkController == null));
return;
}
if (log.isDebugEnabled())
log.debug("Trying to prefer local shard on {} among the urls: {}",
currentHostAddress, Arrays.toString(urls.toArray()));
ListIterator<String> itr = urls.listIterator();
while (itr.hasNext()) {
String url = itr.next();
if (url.startsWith(currentHostAddress)) {
// move current URL to the fore-front
itr.remove();
urls.add(0, url);
if (log.isDebugEnabled())
log.debug("Applied local shard preference for urls: {}",
Arrays.toString(urls.toArray()));
break;
}
}
}
@Override
public void submit(final ShardRequest sreq, final String shard, final ModifiableSolrParams params) {
// do this outside of the callable for thread safety reasons
final List<String> urls = getURLs(shard);
final List<String> urls = getURLs(sreq, shard);
Callable<ShardResponse> task = new Callable<ShardResponse>() {
@Override

View File

@ -147,6 +147,7 @@ public class ResponseBuilder
public void addRequest(SearchComponent me, ShardRequest sreq) {
outgoing.add(sreq);
sreq.rb = this;
if ((sreq.purpose & ShardRequest.PURPOSE_PRIVATE) == 0) {
// if this isn't a private request, let other components modify it.
for (SearchComponent component : components) {

View File

@ -49,6 +49,7 @@ public class ShardRequest {
public ModifiableSolrParams params;
public ResponseBuilder rb;
/** list of responses... filled out by framework */
public List<ShardResponse> responses = new ArrayList<>();

View File

@ -805,6 +805,21 @@
<lst name="defaults">
<str name="echoParams">explicit</str>
<int name="rows">10</int>
<!-- Controls the distribution of a query to shards other than itself.
Consider making 'preferLocalShards' true when:
1) maxShardsPerNode > 1
2) Number of shards > 1
3) CloudSolrClient or LbHttpSolrServer is used by clients.
Without this option, every core broadcasts the distributed query to
a replica of each shard where the replicas are chosen randomly.
This option directs the cores to prefer cores hosted locally, thus
preventing network delays between machines.
This behavior also immunizes a bad/slow machine from slowing down all
the good machines (if those good machines were querying this bad one).
Specify this option=false for clients connecting through HttpSolrServer
-->
<bool name="preferLocalShards">false</bool>
</lst>
<!-- In addition to defaults, "appends" params can be specified
to identify values which should be appended to the list of

View File

@ -220,5 +220,9 @@ public interface CommonParams {
*/
public static final String REQUEST_PURPOSE = "requestPurpose";
/**
* When querying a node, prefer local node's cores for distributed queries.
*/
public static final String PREFER_LOCAL_SHARDS = "preferLocalShards";
}

View File

@ -17,6 +17,12 @@ package org.apache.solr.client.solrj.impl;
* limitations under the License.
*/
import static org.apache.solr.cloud.OverseerCollectionProcessor.CREATE_NODE_SET;
import static org.apache.solr.cloud.OverseerCollectionProcessor.NUM_SLICES;
import static org.apache.solr.common.cloud.ZkNodeProps.makeMap;
import static org.apache.solr.common.cloud.ZkStateReader.MAX_SHARDS_PER_NODE;
import static org.apache.solr.common.cloud.ZkStateReader.REPLICATION_FACTOR;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
@ -44,6 +50,7 @@ import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.ShardParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.SimpleOrderedMap;
import org.apache.zookeeper.KeeperException;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@ -53,7 +60,11 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
@ -115,6 +126,7 @@ public class CloudSolrClientTest extends AbstractFullDistribZkTestBase {
stateVersionParamTest();
customHttpClientTest();
testOverwriteOption();
preferLocalShardsTest();
}
private void testOverwriteOption() throws Exception, SolrServerException,
@ -349,6 +361,117 @@ public class CloudSolrClientTest extends AbstractFullDistribZkTestBase {
cloudClient.close();
}
/**
* Tests if the specification of 'preferLocalShards' in the query-params
* limits the distributed query to locally hosted shards only
*/
private void preferLocalShardsTest() throws Exception {
String collectionName = "localShardsTestColl";
int liveNodes = getCommonCloudSolrClient()
.getZkStateReader().getClusterState().getLiveNodes().size();
// For preferLocalShards to succeed in a test, every shard should have
// all its cores on the same node.
// Hence the below configuration for our collection
Map<String, Object> props = makeMap(
REPLICATION_FACTOR, liveNodes,
MAX_SHARDS_PER_NODE, liveNodes,
NUM_SLICES, liveNodes);
Map<String,List<Integer>> collectionInfos = new HashMap<String,List<Integer>>();
createCollection(collectionInfos, collectionName, props, controlClientCloud);
waitForRecoveriesToFinish(collectionName, false);
CloudSolrClient cloudClient = createCloudClient(collectionName);
assertNotNull(cloudClient);
handle.clear();
handle.put("timestamp", SKIPVAL);
waitForThingsToLevelOut(30);
// Remove any documents from previous test (if any)
controlClient.deleteByQuery("*:*");
cloudClient.deleteByQuery("*:*");
controlClient.commit();
cloudClient.commit();
// Add some new documents
SolrInputDocument doc1 = new SolrInputDocument();
doc1.addField(id, "0");
doc1.addField("a_t", "hello1");
SolrInputDocument doc2 = new SolrInputDocument();
doc2.addField(id, "2");
doc2.addField("a_t", "hello2");
SolrInputDocument doc3 = new SolrInputDocument();
doc3.addField(id, "3");
doc3.addField("a_t", "hello2");
UpdateRequest request = new UpdateRequest();
request.add(doc1);
request.add(doc2);
request.add(doc3);
request.setAction(AbstractUpdateRequest.ACTION.COMMIT, false, false);
// Run the actual test for 'preferLocalShards'
queryWithPreferLocalShards(cloudClient, true, collectionName);
// Cleanup
controlClient.deleteByQuery("*:*");
cloudClient.deleteByQuery("*:*");
controlClient.commit();
cloudClient.commit();
cloudClient.close();
}
private void queryWithPreferLocalShards(CloudSolrClient cloudClient,
boolean preferLocalShards,
String collectionName)
throws Exception
{
SolrQuery qRequest = new SolrQuery();
qRequest.setQuery("*:*");
ModifiableSolrParams qParams = new ModifiableSolrParams();
qParams.add("preferLocalShards", Boolean.toString(preferLocalShards));
qParams.add("shards.info", "true");
qRequest.add(qParams);
// CloudSolrClient sends the request to some node.
// And since all the nodes are hosting cores from all shards, the
// distributed query formed by this node will select cores from the
// local shards only
QueryResponse qResponse = cloudClient.query (qRequest);
Object shardsInfo = qResponse.getResponse().get("shards.info");
assertNotNull("Unable to obtain shards.info", shardsInfo);
// Iterate over shards-info and check what cores responded
SimpleOrderedMap<?> shardsInfoMap = (SimpleOrderedMap<?>)shardsInfo;
Iterator<Map.Entry<String, ?>> itr = shardsInfoMap.asMap(100).entrySet().iterator();
List<String> shardAddresses = new ArrayList<String>();
while (itr.hasNext()) {
Map.Entry<String, ?> e = itr.next();
assertTrue("Did not find map-type value in shards.info", e.getValue() instanceof Map);
String shardAddress = (String)((Map)e.getValue()).get("shardAddress");
assertNotNull("shards.info did not return 'shardAddress' parameter", shardAddress);
shardAddresses.add(shardAddress);
}
log.info("Shards giving the response: " + Arrays.toString(shardAddresses.toArray()));
// Make sure the distributed queries were directed to a single node only
if (preferLocalShards) {
Set<Integer> ports = new HashSet<Integer>();
for (String shardAddr: shardAddresses) {
URL url = new URL (shardAddr);
ports.add(url.getPort());
}
// This assertion would hold true as long as every shard has a core on each node
assertTrue ("Response was not received from shards on a single node",
shardAddresses.size() > 1 && ports.size()==1);
}
}
private Long getNumRequests(String baseUrl, String collectionName) throws
SolrServerException, IOException {