SOLR-9758: refactor preferLocalShards implementation

This commit is contained in:
Christine Poerschke 2016-12-21 19:21:50 +00:00
parent 0d3c64ab09
commit de0a046b21
10 changed files with 92 additions and 53 deletions

View File

@ -328,6 +328,8 @@ Other Changes
* SOLR-9878: fixing lazy logic for retrieving ReversedWildcardFilterFactory in SolrQueryParserBase (Mikhail Khludnev)
* SOLR-9758: refactor preferLocalShards implementation (Christine Poerschke)
================== 6.3.0 ==================
Consult the LUCENE_CHANGES.txt file for additional, low level, changes in this release.

View File

@ -15,15 +15,14 @@
* limitations under the License.
*/
package org.apache.solr.handler.component;
import java.lang.invoke.MethodHandles;
import java.net.ConnectException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
@ -114,51 +113,19 @@ public class HttpShardHandler extends ShardHandler {
// Not thread safe... don't use in Callable.
// Don't modify the returned URL list.
private List<String> getURLs(String shard, String preferredHostAddress) {
private List<String> getURLs(String shard) {
List<String> urls = shardToURLs.get(shard);
if (urls == null) {
urls = httpShardHandlerFactory.buildURLList(shard);
if (preferredHostAddress != null && urls.size() > 1) {
preferCurrentHostForDistributedReq(preferredHostAddress, urls);
}
shardToURLs.put(shard, urls);
}
return urls;
}
/**
* A distributed request is made via {@link LBHttpSolrClient} to the first live server in the URL list.
* This means it is just as likely to choose current host as any of the other hosts.
* This function makes sure that the cores of current host are always put first in the URL list.
* If all nodes prefer local-cores then a bad/heavily-loaded node will receive less requests from healthy nodes.
* This will help prevent a distributed deadlock or timeouts in all the healthy nodes due to one bad node.
*/
private void preferCurrentHostForDistributedReq(final String currentHostAddress, final List<String> urls) {
if (log.isDebugEnabled())
log.debug("Trying to prefer local shard on {} among the urls: {}",
currentHostAddress, Arrays.toString(urls.toArray()));
ListIterator<String> itr = urls.listIterator();
while (itr.hasNext()) {
String url = itr.next();
if (url.startsWith(currentHostAddress)) {
// move current URL to the fore-front
itr.remove();
urls.add(0, url);
if (log.isDebugEnabled())
log.debug("Applied local shard preference for urls: {}",
Arrays.toString(urls.toArray()));
break;
}
}
}
@Override
public void submit(final ShardRequest sreq, final String shard, final ModifiableSolrParams params, String preferredHostAddress) {
public void submit(final ShardRequest sreq, final String shard, final ModifiableSolrParams params) {
// do this outside of the callable for thread safety reasons
final List<String> urls = getURLs(shard, preferredHostAddress);
final List<String> urls = getURLs(shard);
Callable<ShardResponse> task = () -> {
@ -314,13 +281,6 @@ public class HttpShardHandler extends ShardHandler {
CloudDescriptor cloudDescriptor = coreDescriptor.getCloudDescriptor();
ZkController zkController = coreDescriptor.getCoreContainer().getZkController();
if (params.getBool(CommonParams.PREFER_LOCAL_SHARDS, false)) {
rb.preferredHostAddress = (zkController != null) ? zkController.getBaseUrl() : null;
if (rb.preferredHostAddress == null) {
log.warn("Couldn't determine current host address to prefer local shards");
}
}
final ReplicaListTransformer replicaListTransformer = httpShardHandlerFactory.getReplicaListTransformer(req);
if (shards != null) {

View File

@ -24,11 +24,16 @@ import org.apache.solr.client.solrj.impl.HttpClientUtil;
import org.apache.solr.client.solrj.impl.LBHttpSolrClient;
import org.apache.solr.client.solrj.impl.LBHttpSolrClient.Builder;
import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.cloud.ZkController;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.StrUtils;
import org.apache.solr.common.util.URLUtil;
import org.apache.solr.core.CoreDescriptor;
import org.apache.solr.core.PluginInfo;
import org.apache.solr.update.UpdateShardHandlerConfig;
import org.apache.solr.request.SolrQueryRequest;
@ -38,6 +43,8 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
@ -245,8 +252,80 @@ public class HttpShardHandlerFactory extends ShardHandlerFactory implements org.
return urls;
}
/**
* A distributed request is made via {@link LBHttpSolrClient} to the first live server in the URL list.
* This means it is just as likely to choose current host as any of the other hosts.
* This function makes sure that the cores of current host are always put first in the URL list.
* If all nodes prefer local-cores then a bad/heavily-loaded node will receive less requests from healthy nodes.
* This will help prevent a distributed deadlock or timeouts in all the healthy nodes due to one bad node.
*/
private class IsOnPreferredHostComparator implements Comparator<Object> {
final private String preferredHostAddress;
public IsOnPreferredHostComparator(String preferredHostAddress) {
this.preferredHostAddress = preferredHostAddress;
}
@Override
public int compare(Object left, Object right) {
final boolean lhs = hasPrefix(objectToString(left));
final boolean rhs = hasPrefix(objectToString(right));
if (lhs != rhs) {
if (lhs) {
return -1;
} else {
return +1;
}
} else {
return 0;
}
}
private String objectToString(Object o) {
final String s;
if (o instanceof String) {
s = (String)o;
}
else if (o instanceof Replica) {
s = ((Replica)o).getCoreUrl();
} else {
s = null;
}
return s;
}
private boolean hasPrefix(String s) {
return s != null && s.startsWith(preferredHostAddress);
}
}
ReplicaListTransformer getReplicaListTransformer(final SolrQueryRequest req)
{
final SolrParams params = req.getParams();
if (params.getBool(CommonParams.PREFER_LOCAL_SHARDS, false)) {
final CoreDescriptor coreDescriptor = req.getCore().getCoreDescriptor();
final ZkController zkController = coreDescriptor.getCoreContainer().getZkController();
final String preferredHostAddress = (zkController != null) ? zkController.getBaseUrl() : null;
if (preferredHostAddress == null) {
log.warn("Couldn't determine current host address to prefer local shards");
} else {
return new ShufflingReplicaListTransformer(r) {
@Override
public void transform(List<?> choices)
{
if (choices.size() > 1) {
super.transform(choices);
if (log.isDebugEnabled()) {
log.debug("Trying to prefer local shard on {} among the choices: {}",
preferredHostAddress, Arrays.toString(choices.toArray()));
}
choices.sort(new IsOnPreferredHostComparator(preferredHostAddress));
if (log.isDebugEnabled()) {
log.debug("Applied local shard preference for choices: {}",
Arrays.toString(choices.toArray()));
}
}
}
};
}
}
return shufflingReplicaListTransformer;
}

View File

@ -136,7 +136,6 @@ public class ResponseBuilder
public int shards_start = -1;
public List<ShardRequest> outgoing; // requests to be sent
public List<ShardRequest> finished; // requests that have received responses from all shards
public String preferredHostAddress = null;
public String shortCircuitedURL;
public int getShardNum(String shard) {

View File

@ -383,7 +383,7 @@ public class SearchHandler extends RequestHandlerBase implements SolrCoreAware ,
params.set(CommonParams.QT, reqPath);
} // else if path is /select, then the qt gets passed thru if set
}
shardHandler1.submit(sreq, shard, params, rb.preferredHostAddress);
shardHandler1.submit(sreq, shard, params);
}
}

View File

@ -19,10 +19,7 @@ import org.apache.solr.common.params.ModifiableSolrParams;
public abstract class ShardHandler {
public abstract void prepDistributed(ResponseBuilder rb);
public void submit(ShardRequest sreq, String shard, ModifiableSolrParams params) {
submit(sreq, shard, params, null);
}
public abstract void submit(ShardRequest sreq, String shard, ModifiableSolrParams params, String preferredHostAddress);
public abstract void submit(ShardRequest sreq, String shard, ModifiableSolrParams params);
public abstract ShardResponse takeCompletedIncludingErrors();
public abstract ShardResponse takeCompletedOrError();
public abstract void cancelAll();

View File

@ -42,7 +42,7 @@ public class MockShardHandlerFactory extends ShardHandlerFactory implements Plug
@Override
public void submit(ShardRequest sreq, String shard,
ModifiableSolrParams params, String preferredHostAddress) {}
ModifiableSolrParams params) {}
@Override
public ShardResponse takeCompletedIncludingErrors() {

View File

@ -349,7 +349,7 @@ public class CloudSolrClientTest extends SolrCloudTestCase {
SolrQuery qRequest = new SolrQuery("*:*");
ModifiableSolrParams qParams = new ModifiableSolrParams();
qParams.add("preferLocalShards", Boolean.toString(preferLocalShards));
qParams.add(CommonParams.PREFER_LOCAL_SHARDS, Boolean.toString(preferLocalShards));
qParams.add(ShardParams.SHARDS_INFO, "true");
qRequest.add(qParams);

View File

@ -31,4 +31,6 @@ public class CommonParamsTest extends LuceneTestCase
public void testRows() { assertEquals("rows", CommonParams.ROWS); }
public void testRowsDefault() { assertEquals(10, CommonParams.ROWS_DEFAULT); }
public void testPreferLocalShards() { assertEquals("preferLocalShards", CommonParams.PREFER_LOCAL_SHARDS); }
}

View File

@ -91,7 +91,7 @@ public class TrackingShardHandlerFactory extends HttpShardHandlerFactory {
}
@Override
public void submit(ShardRequest sreq, String shard, ModifiableSolrParams params, String preferredHostAddress) {
public void submit(ShardRequest sreq, String shard, ModifiableSolrParams params) {
synchronized (TrackingShardHandlerFactory.this) {
if (isTracking()) {
queue.offer(new ShardRequestAndParams(sreq, shard, params));