mirror of https://github.com/apache/lucene.git
SOLR-7714: Reduce SearchHandler's use of ShardHandler objects across shards in a search
git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1687143 13f79535-47bb-0310-9956-ffa450edef68
parent: ac84d8abf4
commit: b255b716c3
@@ -186,8 +186,13 @@ Bug Fixes
 
 Optimizations
 ----------------------
 
 * SOLR-7660: Avoid redundant 'exists' calls made to ZK while fetching cluster state updates. (shalin)
 
+* SOLR-7714: Reduce SearchHandler's use of ShardHandler objects across shards in a search,
+  from one for each shard and the federator, to just one for the federator.
+  (Christine Poerschke via Ramkumar Aiyengar)
+
 Other Changes
 ----------------------
 
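For readers skimming the changelog, the shape of the change is easy to model outside Solr. Below is a minimal sketch (hypothetical names, not Solr's API) of the before/after: the federating node now allocates a single handler and reuses it across the whole shard fan-out, rather than one per shard plus one for itself.

// Hypothetical model of the object-count change described above -- not Solr's API.
import java.util.ArrayList;
import java.util.List;

class Federator {
  static class Handler {}  // stands in for a ShardHandler-like helper

  // Before (conceptually): a new Handler inside the per-shard loop, plus one
  // for the federator itself -- N+1 allocations per request.
  // After: one Handler owned by the federator, reused for every shard.
  List<String> search(List<String> shardUrls, String q) {
    Handler handler = new Handler();           // just one, for the federator
    List<String> responses = new ArrayList<>(shardUrls.size());
    for (String url : shardUrls) {
      responses.add(submit(handler, url, q));  // reuse the same handler per shard
    }
    return responses;
  }

  private String submit(Handler h, String url, String q) {
    return "response(" + url + ", " + q + ")";
  }
}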
@@ -324,160 +324,150 @@ public class HttpShardHandler extends ShardHandler {
   }
 
   @Override
-  public void checkDistributed(ResponseBuilder rb) {
-    SolrQueryRequest req = rb.req;
-    SolrParams params = req.getParams();
-
-    rb.isDistrib = params.getBool("distrib", req.getCore().getCoreDescriptor()
-        .getCoreContainer().isZooKeeperAware());
-    String shards = params.get(ShardParams.SHARDS);
-
-    // for back compat, a shards param with URLs like localhost:8983/solr will mean that this
-    // search is distributed.
-    boolean hasShardURL = shards != null && shards.indexOf('/') > 0;
-    rb.isDistrib = hasShardURL | rb.isDistrib;
-
-    if (rb.isDistrib) {
+  public void prepDistributed(ResponseBuilder rb) {
+    final SolrQueryRequest req = rb.req;
+    final SolrParams params = req.getParams();
+    final String shards = params.get(ShardParams.SHARDS);
+
     // since the cost of grabbing cloud state is still up in the air, we grab it only
     // if we need it.
     ClusterState clusterState = null;
     Map<String,Slice> slices = null;
     CoreDescriptor coreDescriptor = req.getCore().getCoreDescriptor();
     CloudDescriptor cloudDescriptor = coreDescriptor.getCloudDescriptor();
     ZkController zkController = coreDescriptor.getCoreContainer().getZkController();
 
     if (shards != null) {
       List<String> lst = StrUtils.splitSmart(shards, ",", true);
       rb.shards = lst.toArray(new String[lst.size()]);
       rb.slices = new String[rb.shards.length];
 
       if (zkController != null) {
         // figure out which shards are slices
         for (int i=0; i<rb.shards.length; i++) {
           if (rb.shards[i].indexOf('/') < 0) {
             // this is a logical shard
             rb.slices[i] = rb.shards[i];
             rb.shards[i] = null;
           }
         }
       }
     } else if (zkController != null) {
       // we weren't provided with an explicit list of slices to query via "shards", so use the cluster state
 
       clusterState = zkController.getClusterState();
       String shardKeys = params.get(ShardParams._ROUTE_);
 
       // This will be the complete list of slices we need to query for this request.
       slices = new HashMap<>();
 
       // we need to find out what collections this request is for.
 
       // A comma-separated list of specified collections.
       // Eg: "collection1,collection2,collection3"
       String collections = params.get("collection");
       if (collections != null) {
         // If there were one or more collections specified in the query, split
         // each parameter and store as a separate member of a List.
         List<String> collectionList = StrUtils.splitSmart(collections, ",",
             true);
         // In turn, retrieve the slices that cover each collection from the
         // cloud state and add them to the Map 'slices'.
         for (String collectionName : collectionList) {
           // The original code produced <collection-name>_<shard-name> when the collections
           // parameter was specified (see ClientUtils.appendMap)
           // Is this necessary if only one collection is specified?
           // i.e. should we change multiCollection to collectionList.size() > 1?
           addSlices(slices, clusterState, params, collectionName, shardKeys, true);
         }
       } else {
         // just this collection
         String collectionName = cloudDescriptor.getCollectionName();
         addSlices(slices, clusterState, params, collectionName, shardKeys, false);
       }
 
       // Store the logical slices in the ResponseBuilder and create a new
       // String array to hold the physical shards (which will be mapped
       // later).
       rb.slices = slices.keySet().toArray(new String[slices.size()]);
       rb.shards = new String[rb.slices.length];
     }
 
     //
     // Map slices to shards
     //
     if (zkController != null) {
 
       // Are we hosting the shard that this request is for, and are we active? If so, then handle it ourselves
       // and make it a non-distributed request.
       String ourSlice = cloudDescriptor.getShardId();
       String ourCollection = cloudDescriptor.getCollectionName();
       if (rb.slices.length == 1 && rb.slices[0] != null
           && ( rb.slices[0].equals(ourSlice) || rb.slices[0].equals(ourCollection + "_" + ourSlice) )  // handle the <collection>_<slice> format
           && cloudDescriptor.getLastPublished() == Replica.State.ACTIVE) {
         boolean shortCircuit = params.getBool("shortCircuit", true);  // currently just a debugging parameter to check distrib search on a single node
 
         String targetHandler = params.get(ShardParams.SHARDS_QT);
         shortCircuit = shortCircuit && targetHandler == null;  // if a different handler is specified, don't short-circuit
 
         if (shortCircuit) {
           rb.isDistrib = false;
           rb.shortCircuitedURL = ZkCoreNodeProps.getCoreUrl(zkController.getBaseUrl(), coreDescriptor.getName());
           return;
         }
         // We shouldn't need to do anything to handle "shard.rows" since it was previously meant to be an optimization?
       }
 
       for (int i=0; i<rb.shards.length; i++) {
         if (rb.shards[i] == null) {
           if (clusterState == null) {
             clusterState = zkController.getClusterState();
             slices = clusterState.getSlicesMap(cloudDescriptor.getCollectionName());
           }
           String sliceName = rb.slices[i];
 
           Slice slice = slices.get(sliceName);
 
           if (slice==null) {
             // Treat this the same as "all servers down" for a slice, and let things continue
             // if partial results are acceptable
             rb.shards[i] = "";
             continue;
             // throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "no such shard: " + sliceName);
           }
 
           Map<String, Replica> sliceShards = slice.getReplicasMap();
 
           // For now, recreate the | delimited list of equivalent servers
           StringBuilder sliceShardsStr = new StringBuilder();
           boolean first = true;
           for (Replica replica : sliceShards.values()) {
             if (!clusterState.liveNodesContain(replica.getNodeName())
                 || replica.getState() != Replica.State.ACTIVE) {
               continue;
             }
             if (first) {
               first = false;
             } else {
               sliceShardsStr.append('|');
             }
             String url = ZkCoreNodeProps.getCoreUrl(replica);
             sliceShardsStr.append(url);
           }
 
           if (sliceShardsStr.length() == 0) {
             boolean tolerant = rb.req.getParams().getBool(ShardParams.SHARDS_TOLERANT, false);
             if (!tolerant) {
               // stop the check when there are no replicas available for a shard
               throw new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE,
                   "no servers hosting shard: " + rb.slices[i]);
             }
           }
 
           rb.shards[i] = sliceShardsStr.toString();
         }
       }
     }
-    }
   }
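The heart of the slice-to-shard mapping above is the loop that joins the URLs of live, active replicas with '|'. Below is a self-contained sketch of just that joining step, using simplified stand-ins for Solr's Replica and live-nodes set (all names here are illustrative, not Solr's types).

// Simplified stand-ins for Solr's Replica/ClusterState, to show the
// "|"-delimited equivalent-server list that prepDistributed builds per slice.
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

class SliceUrlBuilder {
  record Replica(String nodeName, String coreUrl, boolean active) {}

  // Returns "" when no replica qualifies -- the caller then either tolerates
  // the gap (shards.tolerant=true) or fails the whole request.
  static String equivalentServers(List<Replica> replicas, Set<String> liveNodes) {
    return replicas.stream()
        .filter(r -> liveNodes.contains(r.nodeName()) && r.active())
        .map(Replica::coreUrl)
        .collect(Collectors.joining("|"));
  }
}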
@@ -203,6 +203,29 @@ public class SearchHandler extends RequestHandlerBase implements SolrCoreAware ,
     return result;
   }
 
+  private ShardHandler getAndPrepShardHandler(SolrQueryRequest req, ResponseBuilder rb) {
+    ShardHandler shardHandler = null;
+
+    rb.isDistrib = req.getParams().getBool("distrib", req.getCore().getCoreDescriptor()
+        .getCoreContainer().isZooKeeperAware());
+    if (!rb.isDistrib) {
+      // for back compat, a shards param with URLs like localhost:8983/solr will mean that this
+      // search is distributed.
+      final String shards = req.getParams().get(ShardParams.SHARDS);
+      rb.isDistrib = ((shards != null) && (shards.indexOf('/') > 0));
+    }
+
+    if (rb.isDistrib) {
+      shardHandler = shardHandlerFactory.getShardHandler();
+      shardHandler.prepDistributed(rb);
+      if (!rb.isDistrib) {
+        shardHandler = null; // request is not distributed after all and so the shard handler is not needed
+      }
+    }
+
+    return shardHandler;
+  }
+
   @Override
   public void handleRequestBody(SolrQueryRequest req, SolrQueryResponse rsp) throws Exception
   {
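One subtlety in the new getAndPrepShardHandler: prepDistributed itself may flip rb.isDistrib back to false (the short-circuit path in HttpShardHandler above), so the method re-checks the flag and discards the handler it just built. A stripped-down sketch of that create-then-re-check pattern, with hypothetical types:

// Hypothetical types modeling the helper's control flow, not Solr's API.
class RequestState {
  boolean distributed;
}

class Handler {
  void prep(RequestState st) {
    // preparation may discover the request can be served locally
    // (e.g. a one-slice, locally hosted request) and unset the flag
  }
}

class HandlerSource {
  Handler getAndPrep(RequestState st) {
    if (!st.distributed) {
      return null;                    // cheap path: no handler created at all
    }
    Handler h = new Handler();        // built only when the request needs it
    h.prep(st);                       // prep may flip st.distributed off
    return st.distributed ? h : null; // discard if the request was downgraded
  }
}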
@@ -220,8 +243,7 @@ public class SearchHandler extends RequestHandlerBase implements SolrCoreAware ,
 
     final RTimer timer = rb.isDebug() ? req.getRequestTimer() : null;
 
-    ShardHandler shardHandler1 = shardHandlerFactory.getShardHandler();
-    shardHandler1.checkDistributed(rb);
+    final ShardHandler shardHandler1 = getAndPrepShardHandler(req, rb); // creates a ShardHandler object only if it's needed
 
     if (timer == null) {
       // non-debugging prepare phase
@@ -316,7 +338,7 @@ public class SearchHandler extends RequestHandlerBase implements SolrCoreAware ,
     if (sreq.actualShards==ShardRequest.ALL_SHARDS) {
       sreq.actualShards = rb.shards;
     }
-    sreq.responses = new ArrayList<>();
+    sreq.responses = new ArrayList<>(sreq.actualShards.length); // presume we'll get a response from each shard we send to
 
     // TODO: map from shard to address[]
     for (String shard : sreq.actualShards) {
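The one-line change above is a small allocation win: sizing the list to the fan-out width up front means the backing array is allocated once instead of growing from the default capacity. A quick illustration of what the capacity hint buys (standard java.util behavior):

import java.util.ArrayList;
import java.util.List;

class Presize {
  static List<String> collect(String[] shards) {
    // With the capacity hint the backing array is allocated once; a default
    // ArrayList starts at capacity 10 and grows by ~1.5x as elements arrive.
    List<String> responses = new ArrayList<>(shards.length);
    for (String shard : shards) {
      responses.add("response from " + shard);
    }
    return responses;
  }
}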
@@ -20,7 +20,7 @@ package org.apache.solr.handler.component;
 import org.apache.solr.common.params.ModifiableSolrParams;
 
 public abstract class ShardHandler {
-  public abstract void checkDistributed(ResponseBuilder rb);
+  public abstract void prepDistributed(ResponseBuilder rb);
   public abstract void submit(ShardRequest sreq, String shard, ModifiableSolrParams params) ;
   public abstract ShardResponse takeCompletedIncludingErrors();
   public abstract ShardResponse takeCompletedOrError();
@@ -39,7 +39,7 @@ public class MockShardHandlerFactory extends ShardHandlerFactory implements Plug
   public ShardHandler getShardHandler() {
     return new ShardHandler() {
       @Override
-      public void checkDistributed(ResponseBuilder rb) {}
+      public void prepDistributed(ResponseBuilder rb) {}
 
       @Override
       public void submit(ShardRequest sreq, String shard,
@@ -88,8 +88,8 @@ public class TrackingShardHandlerFactory extends HttpShardHandlerFactory {
     final ShardHandler wrapped = super.getShardHandler();
     return new ShardHandler() {
       @Override
-      public void checkDistributed(ResponseBuilder rb) {
-        wrapped.checkDistributed(rb);
+      public void prepDistributed(ResponseBuilder rb) {
+        wrapped.prepDistributed(rb);
       }
 
       @Override
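Because the test helper above is a pure delegating wrapper, the rename only touches the forwarding method. A minimal sketch of the same wrap-and-record pattern, with a hypothetical interface standing in for Solr's abstract ShardHandler:

// Hypothetical interface, not Solr's -- illustrates the delegation style
// used by TrackingShardHandlerFactory.
import java.util.ArrayList;
import java.util.List;

interface MiniShardHandler {
  void prepDistributed(Object responseBuilder);
  void submit(Object shardRequest, String shard);
}

class TrackingHandler implements MiniShardHandler {
  private final MiniShardHandler wrapped;
  private final List<String> submittedShards = new ArrayList<>();

  TrackingHandler(MiniShardHandler wrapped) {
    this.wrapped = wrapped;
  }

  @Override
  public void prepDistributed(Object rb) {
    wrapped.prepDistributed(rb);  // delegate unchanged
  }

  @Override
  public void submit(Object sreq, String shard) {
    submittedShards.add(shard);   // record the call, then delegate
    wrapped.submit(sreq, shard);
  }
}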