mirror of https://github.com/apache/lucene.git
SOLR-8298: small preferLocalShards implementation refactor
git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1715208 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
b7195573ec
commit
491bb5ba90
|
@ -535,6 +535,8 @@ Other Changes
|
||||||
* SOLR-8283: factor out StrParser from QueryParsing.StrParser and SortSpecParsing[Test]
|
* SOLR-8283: factor out StrParser from QueryParsing.StrParser and SortSpecParsing[Test]
|
||||||
from QueryParsing[Test] (Christine Poerschke)
|
from QueryParsing[Test] (Christine Poerschke)
|
||||||
|
|
||||||
|
* SOLR-8298: small preferLocalShards implementation refactor (Christine Poerschke)
|
||||||
|
|
||||||
================== 5.3.1 ==================
|
================== 5.3.1 ==================
|
||||||
|
|
||||||
Bug Fixes
|
Bug Fixes
|
||||||
|
|
|
@ -114,11 +114,13 @@ public class HttpShardHandler extends ShardHandler {
|
||||||
|
|
||||||
// Not thread safe... don't use in Callable.
|
// Not thread safe... don't use in Callable.
|
||||||
// Don't modify the returned URL list.
|
// Don't modify the returned URL list.
|
||||||
private List<String> getURLs(ShardRequest sreq, String shard) {
|
private List<String> getURLs(String shard, String preferredHostAddress) {
|
||||||
List<String> urls = shardToURLs.get(shard);
|
List<String> urls = shardToURLs.get(shard);
|
||||||
if (urls == null) {
|
if (urls == null) {
|
||||||
urls = httpShardHandlerFactory.makeURLList(shard);
|
urls = httpShardHandlerFactory.makeURLList(shard);
|
||||||
preferCurrentHostForDistributedReq(sreq, urls);
|
if (preferredHostAddress != null && urls.size() > 1) {
|
||||||
|
preferCurrentHostForDistributedReq(preferredHostAddress, urls);
|
||||||
|
}
|
||||||
shardToURLs.put(shard, urls);
|
shardToURLs.put(shard, urls);
|
||||||
}
|
}
|
||||||
return urls;
|
return urls;
|
||||||
|
@ -131,27 +133,7 @@ public class HttpShardHandler extends ShardHandler {
|
||||||
* If all nodes prefer local-cores then a bad/heavily-loaded node will receive less requests from healthy nodes.
|
* If all nodes prefer local-cores then a bad/heavily-loaded node will receive less requests from healthy nodes.
|
||||||
* This will help prevent a distributed deadlock or timeouts in all the healthy nodes due to one bad node.
|
* This will help prevent a distributed deadlock or timeouts in all the healthy nodes due to one bad node.
|
||||||
*/
|
*/
|
||||||
private void preferCurrentHostForDistributedReq(final ShardRequest sreq, final List<String> urls) {
|
private void preferCurrentHostForDistributedReq(final String currentHostAddress, final List<String> urls) {
|
||||||
if (sreq == null || sreq.rb == null || sreq.rb.req == null || urls == null || urls.size() <= 1)
|
|
||||||
return;
|
|
||||||
|
|
||||||
SolrQueryRequest req = sreq.rb.req;
|
|
||||||
|
|
||||||
// determine if we should apply the local preference
|
|
||||||
if (!req.getOriginalParams().getBool(CommonParams.PREFER_LOCAL_SHARDS, false))
|
|
||||||
return;
|
|
||||||
|
|
||||||
// Get this node's base URL from ZK
|
|
||||||
SolrCore core = req.getCore();
|
|
||||||
ZkController zkController = (core != null) ? core.getCoreDescriptor().getCoreContainer().getZkController() : null;
|
|
||||||
String currentHostAddress = (zkController != null) ? zkController.getBaseUrl() : null;
|
|
||||||
if (currentHostAddress == null) {
|
|
||||||
log.debug("Couldn't determine current host address to prefer local shards " +
|
|
||||||
"because either core is null? {} or there is no ZkController? {}",
|
|
||||||
Boolean.valueOf(core == null), Boolean.valueOf(zkController == null));
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (log.isDebugEnabled())
|
if (log.isDebugEnabled())
|
||||||
log.debug("Trying to prefer local shard on {} among the urls: {}",
|
log.debug("Trying to prefer local shard on {} among the urls: {}",
|
||||||
currentHostAddress, Arrays.toString(urls.toArray()));
|
currentHostAddress, Arrays.toString(urls.toArray()));
|
||||||
|
@ -174,9 +156,9 @@ public class HttpShardHandler extends ShardHandler {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void submit(final ShardRequest sreq, final String shard, final ModifiableSolrParams params) {
|
public void submit(final ShardRequest sreq, final String shard, final ModifiableSolrParams params, String preferredHostAddress) {
|
||||||
// do this outside of the callable for thread safety reasons
|
// do this outside of the callable for thread safety reasons
|
||||||
final List<String> urls = getURLs(sreq, shard);
|
final List<String> urls = getURLs(shard, preferredHostAddress);
|
||||||
|
|
||||||
Callable<ShardResponse> task = new Callable<ShardResponse>() {
|
Callable<ShardResponse> task = new Callable<ShardResponse>() {
|
||||||
@Override
|
@Override
|
||||||
|
@ -335,6 +317,12 @@ public class HttpShardHandler extends ShardHandler {
|
||||||
CloudDescriptor cloudDescriptor = coreDescriptor.getCloudDescriptor();
|
CloudDescriptor cloudDescriptor = coreDescriptor.getCloudDescriptor();
|
||||||
ZkController zkController = coreDescriptor.getCoreContainer().getZkController();
|
ZkController zkController = coreDescriptor.getCoreContainer().getZkController();
|
||||||
|
|
||||||
|
if (params.getBool(CommonParams.PREFER_LOCAL_SHARDS, false)) {
|
||||||
|
rb.preferredHostAddress = (zkController != null) ? zkController.getBaseUrl() : null;
|
||||||
|
if (rb.preferredHostAddress == null) {
|
||||||
|
log.warn("Couldn't determine current host address to prefer local shards");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if (shards != null) {
|
if (shards != null) {
|
||||||
List<String> lst = StrUtils.splitSmart(shards, ",", true);
|
List<String> lst = StrUtils.splitSmart(shards, ",", true);
|
||||||
|
|
|
@ -136,6 +136,7 @@ public class ResponseBuilder
|
||||||
public int shards_start = -1;
|
public int shards_start = -1;
|
||||||
public List<ShardRequest> outgoing; // requests to be sent
|
public List<ShardRequest> outgoing; // requests to be sent
|
||||||
public List<ShardRequest> finished; // requests that have received responses from all shards
|
public List<ShardRequest> finished; // requests that have received responses from all shards
|
||||||
|
public String preferredHostAddress = null;
|
||||||
public String shortCircuitedURL;
|
public String shortCircuitedURL;
|
||||||
|
|
||||||
public int getShardNum(String shard) {
|
public int getShardNum(String shard) {
|
||||||
|
@ -147,7 +148,6 @@ public class ResponseBuilder
|
||||||
|
|
||||||
public void addRequest(SearchComponent me, ShardRequest sreq) {
|
public void addRequest(SearchComponent me, ShardRequest sreq) {
|
||||||
outgoing.add(sreq);
|
outgoing.add(sreq);
|
||||||
sreq.rb = this;
|
|
||||||
if ((sreq.purpose & ShardRequest.PURPOSE_PRIVATE) == 0) {
|
if ((sreq.purpose & ShardRequest.PURPOSE_PRIVATE) == 0) {
|
||||||
// if this isn't a private request, let other components modify it.
|
// if this isn't a private request, let other components modify it.
|
||||||
for (SearchComponent component : components) {
|
for (SearchComponent component : components) {
|
||||||
|
|
|
@ -370,7 +370,7 @@ public class SearchHandler extends RequestHandlerBase implements SolrCoreAware ,
|
||||||
params.remove(CommonParams.QT);
|
params.remove(CommonParams.QT);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
shardHandler1.submit(sreq, shard, params);
|
shardHandler1.submit(sreq, shard, params, rb.preferredHostAddress);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -21,7 +21,10 @@ import org.apache.solr.common.params.ModifiableSolrParams;
|
||||||
|
|
||||||
public abstract class ShardHandler {
|
public abstract class ShardHandler {
|
||||||
public abstract void prepDistributed(ResponseBuilder rb);
|
public abstract void prepDistributed(ResponseBuilder rb);
|
||||||
public abstract void submit(ShardRequest sreq, String shard, ModifiableSolrParams params) ;
|
public void submit(ShardRequest sreq, String shard, ModifiableSolrParams params) {
|
||||||
|
submit(sreq, shard, params, null);
|
||||||
|
}
|
||||||
|
public abstract void submit(ShardRequest sreq, String shard, ModifiableSolrParams params, String preferredHostAddress);
|
||||||
public abstract ShardResponse takeCompletedIncludingErrors();
|
public abstract ShardResponse takeCompletedIncludingErrors();
|
||||||
public abstract ShardResponse takeCompletedOrError();
|
public abstract ShardResponse takeCompletedOrError();
|
||||||
public abstract void cancelAll();
|
public abstract void cancelAll();
|
||||||
|
|
|
@ -49,7 +49,6 @@ public class ShardRequest {
|
||||||
|
|
||||||
public ModifiableSolrParams params;
|
public ModifiableSolrParams params;
|
||||||
|
|
||||||
public ResponseBuilder rb;
|
|
||||||
|
|
||||||
/** list of responses... filled out by framework */
|
/** list of responses... filled out by framework */
|
||||||
public List<ShardResponse> responses = new ArrayList<>();
|
public List<ShardResponse> responses = new ArrayList<>();
|
||||||
|
|
|
@ -43,7 +43,7 @@ public class MockShardHandlerFactory extends ShardHandlerFactory implements Plug
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void submit(ShardRequest sreq, String shard,
|
public void submit(ShardRequest sreq, String shard,
|
||||||
ModifiableSolrParams params) {}
|
ModifiableSolrParams params, String preferredHostAddress) {}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ShardResponse takeCompletedIncludingErrors() {
|
public ShardResponse takeCompletedIncludingErrors() {
|
||||||
|
|
|
@ -92,7 +92,7 @@ public class TrackingShardHandlerFactory extends HttpShardHandlerFactory {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void submit(ShardRequest sreq, String shard, ModifiableSolrParams params) {
|
public void submit(ShardRequest sreq, String shard, ModifiableSolrParams params, String preferredHostAddress) {
|
||||||
synchronized (TrackingShardHandlerFactory.this) {
|
synchronized (TrackingShardHandlerFactory.this) {
|
||||||
if (isTracking()) {
|
if (isTracking()) {
|
||||||
queue.offer(new ShardRequestAndParams(sreq, shard, params));
|
queue.offer(new ShardRequestAndParams(sreq, shard, params));
|
||||||
|
|
Loading…
Reference in New Issue