SOLR-11982: Add support for indicating preferred replica types for queries

This commit is contained in:
Tomas Fernandez Lobbe 2018-04-11 16:23:00 -07:00
parent 5bd7b03e71
commit 8927d469cb
8 changed files with 426 additions and 63 deletions

View File

@ -90,6 +90,9 @@ New Features
* SOLR-12181: Add index size autoscaling trigger, based on document count or size in bytes. (ab) * SOLR-12181: Add index size autoscaling trigger, based on document count or size in bytes. (ab)
* SOLR-11982: Add possibility to define replica order with the shards.preference parameter to e.g. prefer PULL replicas
for distributed queries. (Ere Maijala, Tomás Fernández Löbbe)
Bug Fixes Bug Fixes
---------------------- ----------------------

View File

@ -29,12 +29,12 @@ import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.Replica; import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.params.CommonParams; import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.ModifiableSolrParams; import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.ShardParams;
import org.apache.solr.common.params.SolrParams; import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.ExecutorUtil; import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.common.util.NamedList; import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.StrUtils; import org.apache.solr.common.util.StrUtils;
import org.apache.solr.common.util.URLUtil; import org.apache.solr.common.util.URLUtil;
import org.apache.solr.core.CoreDescriptor;
import org.apache.solr.core.PluginInfo; import org.apache.solr.core.PluginInfo;
import org.apache.solr.core.SolrInfoBean; import org.apache.solr.core.SolrInfoBean;
import org.apache.solr.metrics.SolrMetricManager; import org.apache.solr.metrics.SolrMetricManager;
@ -51,6 +51,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException; import java.io.IOException;
import java.lang.invoke.MethodHandles; import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Comparator; import java.util.Comparator;
import java.util.List; import java.util.List;
@ -303,30 +304,61 @@ public class HttpShardHandlerFactory extends ShardHandlerFactory implements org.
/** /**
* A distributed request is made via {@link LBHttpSolrClient} to the first live server in the URL list. * A distributed request is made via {@link LBHttpSolrClient} to the first live server in the URL list.
* This means it is just as likely to choose current host as any of the other hosts. * This means it is just as likely to choose current host as any of the other hosts.
* This function makes sure that the cores of current host are always put first in the URL list. * This function makes sure that the cores are sorted according to the given list of preferences.
* If all nodes prefer local-cores then a bad/heavily-loaded node will receive less requests from healthy nodes. * E.g. If all nodes prefer local cores then a bad/heavily-loaded node will receive less requests from
* This will help prevent a distributed deadlock or timeouts in all the healthy nodes due to one bad node. * healthy nodes. This will help prevent a distributed deadlock or timeouts in all the healthy nodes due
* to one bad node.
*/ */
private static class IsOnPreferredHostComparator implements Comparator<Object> { static class NodePreferenceRulesComparator implements Comparator<Object> {
final private String preferredHostAddress; private static class PreferenceRule {
public IsOnPreferredHostComparator(String preferredHostAddress) { public final String name;
this.preferredHostAddress = preferredHostAddress; public final String value;
public PreferenceRule(String name, String value) {
this.name = name;
this.value = value;
}
}
private final SolrQueryRequest request;
private List<PreferenceRule> preferenceRules;
private String localHostAddress = null;
public NodePreferenceRulesComparator(final List<String> sortRules, final SolrQueryRequest request) {
this.request = request;
this.preferenceRules = new ArrayList<PreferenceRule>(sortRules.size());
sortRules.forEach(rule -> {
String[] parts = rule.split(":", 2);
if (parts.length != 2) {
throw new IllegalArgumentException("Invalid " + ShardParams.SHARDS_PREFERENCE + " rule: " + rule);
}
this.preferenceRules.add(new PreferenceRule(parts[0], parts[1]));
});
} }
@Override @Override
public int compare(Object left, Object right) { public int compare(Object left, Object right) {
final boolean lhs = hasPrefix(objectToString(left)); for (PreferenceRule preferenceRule: this.preferenceRules) {
final boolean rhs = hasPrefix(objectToString(right)); final boolean lhs;
if (lhs != rhs) { final boolean rhs;
if (lhs) { switch (preferenceRule.name) {
return -1; case ShardParams.SHARDS_PREFERENCE_REPLICA_TYPE:
} else { lhs = hasReplicaType(left, preferenceRule.value);
return +1; rhs = hasReplicaType(right, preferenceRule.value);
break;
case ShardParams.SHARDS_PREFERENCE_REPLICA_LOCATION:
lhs = hasCoreUrlPrefix(left, preferenceRule.value);
rhs = hasCoreUrlPrefix(right, preferenceRule.value);
break;
default:
throw new IllegalArgumentException("Invalid " + ShardParams.SHARDS_PREFERENCE + " type: " + preferenceRule.name);
}
if (lhs != rhs) {
return lhs ? -1 : +1;
} }
} else {
return 0;
} }
return 0;
} }
private String objectToString(Object o) { private boolean hasCoreUrlPrefix(Object o, String prefix) {
final String s; final String s;
if (o instanceof String) { if (o instanceof String) {
s = (String)o; s = (String)o;
@ -334,44 +366,80 @@ public class HttpShardHandlerFactory extends ShardHandlerFactory implements org.
else if (o instanceof Replica) { else if (o instanceof Replica) {
s = ((Replica)o).getCoreUrl(); s = ((Replica)o).getCoreUrl();
} else { } else {
s = null; return false;
} }
return s; if (prefix.equals(ShardParams.REPLICA_LOCAL)) {
if (null == localHostAddress) {
final ZkController zkController = this.request.getCore().getCoreContainer().getZkController();
localHostAddress = zkController != null ? zkController.getBaseUrl() : "";
if (localHostAddress.isEmpty()) {
log.warn("Couldn't determine current host address for sorting of local replicas");
}
}
if (!localHostAddress.isEmpty()) {
if (s.startsWith(localHostAddress)) {
return true;
}
}
} else {
if (s.startsWith(prefix)) {
return true;
}
}
return false;
} }
private boolean hasPrefix(String s) { private static boolean hasReplicaType(Object o, String preferred) {
return s != null && s.startsWith(preferredHostAddress); if (!(o instanceof Replica)) {
return false;
}
final String s = ((Replica)o).getType().toString();
return s.equals(preferred);
} }
} }
protected ReplicaListTransformer getReplicaListTransformer(final SolrQueryRequest req)
{
final SolrParams params = req.getParams();
if (params.getBool(CommonParams.PREFER_LOCAL_SHARDS, false)) { protected ReplicaListTransformer getReplicaListTransformer(final SolrQueryRequest req) {
final CoreDescriptor coreDescriptor = req.getCore().getCoreDescriptor(); final SolrParams params = req.getParams();
final ZkController zkController = req.getCore().getCoreContainer().getZkController(); @SuppressWarnings("deprecation")
final String preferredHostAddress = (zkController != null) ? zkController.getBaseUrl() : null; final boolean preferLocalShards = params.getBool(CommonParams.PREFER_LOCAL_SHARDS, false);
if (preferredHostAddress == null) { final String shardsPreferenceSpec = params.get(ShardParams.SHARDS_PREFERENCE, "");
log.warn("Couldn't determine current host address to prefer local shards");
} else { if (preferLocalShards || !shardsPreferenceSpec.isEmpty()) {
return new ShufflingReplicaListTransformer(r) { if (preferLocalShards && !shardsPreferenceSpec.isEmpty()) {
@Override throw new SolrException(
public void transform(List<?> choices) SolrException.ErrorCode.BAD_REQUEST,
{ "preferLocalShards is deprecated and must not be used with shards.preference"
if (choices.size() > 1) { );
super.transform(choices); }
if (log.isDebugEnabled()) { List<String> preferenceRules = StrUtils.splitSmart(shardsPreferenceSpec, ',');
log.debug("Trying to prefer local shard on {} among the choices: {}", if (preferLocalShards) {
preferredHostAddress, Arrays.toString(choices.toArray())); preferenceRules.add(ShardParams.SHARDS_PREFERENCE_REPLICA_LOCATION + ":" + ShardParams.REPLICA_LOCAL);
} }
choices.sort(new IsOnPreferredHostComparator(preferredHostAddress));
if (log.isDebugEnabled()) { return new ShufflingReplicaListTransformer(r) {
log.debug("Applied local shard preference for choices: {}", @Override
Arrays.toString(choices.toArray())); public void transform(List<?> choices)
} {
if (choices.size() > 1) {
super.transform(choices);
if (log.isDebugEnabled()) {
log.debug("Applying the following sorting preferences to replicas: {}",
Arrays.toString(preferenceRules.toArray()));
}
try {
choices.sort(new NodePreferenceRulesComparator(preferenceRules, req));
} catch (IllegalArgumentException iae) {
throw new SolrException(
SolrException.ErrorCode.BAD_REQUEST,
iae.getMessage()
);
}
if (log.isDebugEnabled()) {
log.debug("Applied sorting preferences to replica list: {}",
Arrays.toString(choices.toArray()));
} }
} }
}; }
} };
} }
return shufflingReplicaListTransformer; return shufflingReplicaListTransformer;
@ -409,4 +477,5 @@ public class HttpShardHandlerFactory extends ShardHandlerFactory implements org.
manager.registry(registry), manager.registry(registry),
SolrMetricManager.mkName("httpShardExecutor", expandedScope, "threadPool")); SolrMetricManager.mkName("httpShardExecutor", expandedScope, "threadPool"));
} }
} }

View File

@ -24,6 +24,10 @@ import java.util.List;
import org.apache.solr.SolrTestCaseJ4; import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.client.solrj.request.QueryRequest; import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.client.solrj.impl.LBHttpSolrClient; import org.apache.solr.client.solrj.impl.LBHttpSolrClient;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.ShardParams;
import org.apache.solr.common.util.StrUtils;
import org.apache.solr.core.CoreContainer; import org.apache.solr.core.CoreContainer;
import org.apache.solr.handler.component.HttpShardHandlerFactory; import org.apache.solr.handler.component.HttpShardHandlerFactory;
import org.apache.solr.handler.component.ShardHandlerFactory; import org.apache.solr.handler.component.ShardHandlerFactory;
@ -99,4 +103,119 @@ public class TestHttpShardHandlerFactory extends SolrTestCaseJ4 {
} }
} }
/**
 * Sorts a small in-memory replica list with {@code NodePreferenceRulesComparator} under
 * several {@code shards.preference} rule sets and checks the resulting order, then checks
 * that a malformed rule and an unknown rule type are both rejected with an
 * {@link IllegalArgumentException}.
 * <p>
 * The list is re-sorted in place at every step, so each step starts from the order the
 * previous step left behind; the steps must therefore stay in this sequence.
 */
@SuppressWarnings("unchecked")
public void testNodePreferenceRulesComparator() throws Exception {
  final String typeRule = ShardParams.SHARDS_PREFERENCE_REPLICA_TYPE;
  final String locationRule = ShardParams.SHARDS_PREFERENCE_REPLICA_LOCATION;

  // { replica/node name, base URL, replica type }
  final String[][] initialReplicas = {
      {"node1", "http://host1:8983/solr", "NRT"},
      {"node2", "http://host2:8983/solr", "TLOG"},
      {"node3", "http://host2_2:8983/solr", "PULL"},
  };
  final List<Replica> replicas = new ArrayList<>();
  for (String[] spec : initialReplicas) {
    replicas.add(new Replica(spec[0], map(
        ZkStateReader.BASE_URL_PROP, spec[1],
        ZkStateReader.NODE_NAME_PROP, spec[0],
        ZkStateReader.CORE_NAME_PROP, "collection1",
        ZkStateReader.REPLICA_TYPE, spec[2])));
  }

  // Simple replica type rule
  List<String> rules = StrUtils.splitSmart(typeRule + ":NRT," + typeRule + ":TLOG", ',');
  replicas.sort(new HttpShardHandlerFactory.NodePreferenceRulesComparator(rules, null));
  assertEquals("node1", replicas.get(0).getNodeName());
  assertEquals("node2", replicas.get(1).getNodeName());

  // Another simple replica type rule
  rules = StrUtils.splitSmart(typeRule + ":TLOG," + typeRule + ":NRT", ',');
  replicas.sort(new HttpShardHandlerFactory.NodePreferenceRulesComparator(rules, null));
  assertEquals("node2", replicas.get(0).getNodeName());
  assertEquals("node1", replicas.get(1).getNodeName());

  // replicaLocation rule
  rules = StrUtils.splitSmart(locationRule + ":http://host2:8983", ',');
  replicas.sort(new HttpShardHandlerFactory.NodePreferenceRulesComparator(rules, null));
  assertEquals("node2", replicas.get(0).getNodeName());
  assertEquals("node1", replicas.get(1).getNodeName());

  // Add a replica so that sorting by replicaType:TLOG can cause a tie
  replicas.add(new Replica("node4", map(
      ZkStateReader.BASE_URL_PROP, "http://host2_2:8983/solr",
      ZkStateReader.NODE_NAME_PROP, "node4",
      ZkStateReader.CORE_NAME_PROP, "collection1",
      ZkStateReader.REPLICA_TYPE, "TLOG")));

  // replicaType and replicaLocation combined rule
  rules = StrUtils.splitSmart(
      typeRule + ":NRT," + typeRule + ":TLOG," + locationRule + ":http://host2_2",
      ',');
  replicas.sort(new HttpShardHandlerFactory.NodePreferenceRulesComparator(rules, null));
  final String[] expectedOrder = {"node1", "node4", "node2", "node3"};
  for (int i = 0; i < expectedOrder.length; i++) {
    assertEquals(expectedOrder[i], replicas.get(i).getNodeName());
  }

  // Bad rule: a rule without a ':' separated value
  rules = StrUtils.splitSmart(typeRule, ',');
  try {
    replicas.sort(new HttpShardHandlerFactory.NodePreferenceRulesComparator(rules, null));
    fail();
  } catch (IllegalArgumentException e) {
    assertEquals("Invalid shards.preference rule: " + typeRule, e.getMessage());
  }

  // Unknown rule: well-formed, but the rule name is not a supported preference type
  rules = StrUtils.splitSmart("badRule:test", ',');
  try {
    replicas.sort(new HttpShardHandlerFactory.NodePreferenceRulesComparator(rules, null));
    fail();
  } catch (IllegalArgumentException e) {
    assertEquals("Invalid shards.preference type: badRule", e.getMessage());
  }
}
} }

View File

@ -138,6 +138,46 @@ For example, a deadlock might occur in the case of two shards, each with just a
== preferLocalShards Parameter == preferLocalShards Parameter
Deprecated, use `shards.preference=replica.location:local` instead (see below).
Solr allows you to pass an optional boolean parameter named `preferLocalShards` to indicate that a distributed query should prefer local replicas of a shard when available. In other words, if a query includes `preferLocalShards=true`, then the query controller will look for local replicas to service the query instead of selecting replicas at random from across the cluster. This is useful when a query requests many fields or large fields to be returned per document because it avoids moving large amounts of data over the network when it is available locally. In addition, this feature can be useful for minimizing the impact of a problematic replica with degraded performance, as it reduces the likelihood that the degraded replica will be hit by other healthy replicas. Solr allows you to pass an optional boolean parameter named `preferLocalShards` to indicate that a distributed query should prefer local replicas of a shard when available. In other words, if a query includes `preferLocalShards=true`, then the query controller will look for local replicas to service the query instead of selecting replicas at random from across the cluster. This is useful when a query requests many fields or large fields to be returned per document because it avoids moving large amounts of data over the network when it is available locally. In addition, this feature can be useful for minimizing the impact of a problematic replica with degraded performance, as it reduces the likelihood that the degraded replica will be hit by other healthy replicas.
Lastly, it follows that the value of this feature diminishes as the number of shards in a collection increases because the query controller will have to direct the query to non-local replicas for most of the shards. In other words, this feature is mostly useful for optimizing queries directed towards collections with a small number of shards and many replicas. Also, this option should only be used if you are load balancing requests across all nodes that host replicas for the collection you are querying, as Solr's CloudSolrClient will do. If not load-balancing, this feature can introduce a hotspot in the cluster since queries won't be evenly distributed across the cluster. Lastly, it follows that the value of this feature diminishes as the number of shards in a collection increases because the query controller will have to direct the query to non-local replicas for most of the shards. In other words, this feature is mostly useful for optimizing queries directed towards collections with a small number of shards and many replicas. Also, this option should only be used if you are load balancing requests across all nodes that host replicas for the collection you are querying, as Solr's CloudSolrClient will do. If not load-balancing, this feature can introduce a hotspot in the cluster since queries won't be evenly distributed across the cluster.
== shards.preference Parameter
Solr allows you to pass an optional string parameter named `shards.preference` to indicate that a distributed query should sort the available replicas in the given order of precedence within each shard. The syntax is: `shards.preference=property:value`; several `property:value` pairs can be given, separated by commas. The order of the properties and values is significant: the first pair is the primary sort criterion, the second pair is the secondary one, and so on.
IMPORTANT: `shards.preference` only works for distributed queries, i.e., queries targeting multiple shards. It is not yet implemented for single-shard scenarios.
The properties that can be specified are as follows:
`replica.type`::
One or more replica types that are preferred. Any combination of PULL, TLOG and NRT is allowed.
`replica.location`::
One or more replica locations that are preferred. A location starts with `http://hostname:port`. Matching is done for the given string as a prefix, so it's possible to e.g. leave out the port. `local` may be used as a special value to denote any local replica running on the same Solr instance as the one handling the query. This is useful when a query requests many fields or large fields to be returned per document because it avoids moving large amounts of data over the network when it is available locally. In addition, this feature can be useful for minimizing the impact of a problematic replica with degraded performance, as it reduces the likelihood that the degraded replica will be hit by other healthy replicas.
The value of `replica.location:local` diminishes as the number of shards (that have no locally-available replicas) in a collection increases because the query controller will have to direct the query to non-local replicas for most of the shards. In other words, this feature is mostly useful for optimizing queries directed towards collections with a small number of shards and many replicas. Also, this option should only be used if you are load balancing requests across all nodes that host replicas for the collection you are querying, as Solr's CloudSolrClient will do. If not load-balancing, this feature can introduce a hotspot in the cluster since queries won't be evenly distributed across the cluster.
Examples:
* Prefer PULL replicas:
`shards.preference=replica.type:PULL`
* Prefer PULL replicas, or TLOG replicas if PULL replicas are not available:
`shards.preference=replica.type:PULL,replica.type:TLOG`
* Prefer any local replicas:
`shards.preference=replica.location:local`
* Prefer any replicas on a host called "server1" with "server2" as the secondary option:
`shards.preference=replica.location:http://server1,replica.location:http://server2`
* Prefer PULL replicas if available, otherwise TLOG replicas, and local ones among those:
`shards.preference=replica.type:PULL,replica.type:TLOG,replica.location:local`
* Prefer local replicas, and among them PULL replicas when available, TLOG otherwise:
`shards.preference=replica.location:local,replica.type:PULL,replica.type:TLOG`
Note that if you provide the settings in a query string, they need to be properly URL-encoded.

View File

@ -86,6 +86,10 @@ If the PULL replica cannot connect to ZooKeeper, it would be removed from the cl
If the PULL replica dies or is unreachable for any other reason, it won't be query-able. When it rejoins the cluster, it would replicate from the leader and when that is complete, it would be ready to serve queries again. If the PULL replica dies or is unreachable for any other reason, it won't be query-able. When it rejoins the cluster, it would replicate from the leader and when that is complete, it would be ready to serve queries again.
=== Queries with Preferred Replica Types
By default all replicas serve queries. See the section <<distributed-requests.adoc#shards-preference-parameter,shards.preference Parameter>> for details on how to indicate preferred replica types for queries.
== Document Routing == Document Routing
Solr offers the ability to specify the router implementation used by a collection by specifying the `router.name` parameter when <<collections-api.adoc#create,creating your collection>>. Solr offers the ability to specify the router implementation used by a collection by specifying the `router.name` parameter when <<collections-api.adoc#create,creating your collection>>.

View File

@ -269,6 +269,7 @@ public interface CommonParams {
/** /**
* When querying a node, prefer local node's cores for distributed queries. * When querying a node, prefer local node's cores for distributed queries.
* @deprecated Use {@code ShardParams.SHARDS_PREFERENCE}
*/ */
String PREFER_LOCAL_SHARDS = "preferLocalShards"; String PREFER_LOCAL_SHARDS = "preferLocalShards";

View File

@ -52,6 +52,18 @@ public interface ShardParams {
/** query purpose for shard requests */ /** query purpose for shard requests */
String SHARDS_PURPOSE = "shards.purpose"; String SHARDS_PURPOSE = "shards.purpose";
/** Shards sorting rules */
String SHARDS_PREFERENCE = "shards.preference";
/** Replica type sort rule */
String SHARDS_PREFERENCE_REPLICA_TYPE = "replica.type";
/** Replica location sort rule */
String SHARDS_PREFERENCE_REPLICA_LOCATION = "replica.location";
/** Value denoting local replicas */
String REPLICA_LOCAL = "local";
String _ROUTE_ = "_route_"; String _ROUTE_ = "_route_";
/** Force a single-pass distributed query? (true/false) */ /** Force a single-pass distributed query? (true/false) */

View File

@ -416,18 +416,24 @@ public class CloudSolrClientTest extends SolrCloudTestCase {
.commit(getRandomClient(), collectionName); .commit(getRandomClient(), collectionName);
// Run the actual test for 'preferLocalShards' // Run the actual test for 'preferLocalShards'
queryWithPreferLocalShards(getRandomClient(), true, collectionName); queryWithShardsPreferenceRules(getRandomClient(), false, collectionName);
queryWithShardsPreferenceRules(getRandomClient(), true, collectionName);
} }
private void queryWithPreferLocalShards(CloudSolrClient cloudClient, @SuppressWarnings("deprecation")
boolean preferLocalShards, private void queryWithShardsPreferenceRules(CloudSolrClient cloudClient,
boolean useShardsPreference,
String collectionName) String collectionName)
throws Exception throws Exception
{ {
SolrQuery qRequest = new SolrQuery("*:*"); SolrQuery qRequest = new SolrQuery("*:*");
ModifiableSolrParams qParams = new ModifiableSolrParams(); ModifiableSolrParams qParams = new ModifiableSolrParams();
qParams.add(CommonParams.PREFER_LOCAL_SHARDS, Boolean.toString(preferLocalShards)); if (useShardsPreference) {
qParams.add(ShardParams.SHARDS_PREFERENCE, ShardParams.SHARDS_PREFERENCE_REPLICA_LOCATION + ":" + ShardParams.REPLICA_LOCAL);
} else {
qParams.add(CommonParams.PREFER_LOCAL_SHARDS, "true");
}
qParams.add(ShardParams.SHARDS_INFO, "true"); qParams.add(ShardParams.SHARDS_INFO, "true");
qRequest.add(qParams); qRequest.add(qParams);
@ -454,17 +460,15 @@ public class CloudSolrClientTest extends SolrCloudTestCase {
log.info("Shards giving the response: " + Arrays.toString(shardAddresses.toArray())); log.info("Shards giving the response: " + Arrays.toString(shardAddresses.toArray()));
// Make sure the distributed queries were directed to a single node only // Make sure the distributed queries were directed to a single node only
if (preferLocalShards) { Set<Integer> ports = new HashSet<Integer>();
Set<Integer> ports = new HashSet<Integer>(); for (String shardAddr: shardAddresses) {
for (String shardAddr: shardAddresses) { URL url = new URL (shardAddr);
URL url = new URL (shardAddr); ports.add(url.getPort());
ports.add(url.getPort());
}
// This assertion would hold true as long as every shard has a core on each node
assertTrue ("Response was not received from shards on a single node",
shardAddresses.size() > 1 && ports.size()==1);
} }
// This assertion would hold true as long as every shard has a core on each node
assertTrue ("Response was not received from shards on a single node",
shardAddresses.size() > 1 && ports.size()==1);
} }
private Long getNumRequests(String baseUrl, String collectionName) throws private Long getNumRequests(String baseUrl, String collectionName) throws
@ -844,4 +848,115 @@ public class CloudSolrClientTest extends SolrCloudTestCase {
} }
} }
/**
 * Tests that replica type rules given through the 'shards.preference' query parameter
 * make the distributed query use replicas of the most preferred type, both on their own
 * and when a 'replica.location:local' rule is appended after them.
 */
@Test
public void preferReplicaTypesTest() throws Exception {

  String collectionName = "replicaTypesTestColl";

  int liveNodes = cluster.getJettySolrRunners().size();

  // For these tests we need to have multiple replica types.
  // Hence the below configuration for our collection
  CollectionAdminRequest.createCollection(collectionName, "conf", liveNodes, 1, 1, Math.max(1, liveNodes - 2))
      .setMaxShardsPerNode(liveNodes)
      .processAndWait(cluster.getSolrClient(), TIMEOUT);
  AbstractDistribZkTestBase.waitForRecoveriesToFinish(collectionName, cluster.getSolrClient().getZkStateReader(), false, true, TIMEOUT);

  // Add some new documents
  new UpdateRequest()
      .add(id, "0", "a_t", "hello1")
      .add(id, "2", "a_t", "hello2")
      .add(id, "3", "a_t", "hello2")
      .commit(getRandomClient(), collectionName);

  // Run the actual tests for 'shards.preference=replica.type:*'
  queryWithPreferReplicaTypes(getRandomClient(), "PULL", false, collectionName);
  queryWithPreferReplicaTypes(getRandomClient(), "PULL|TLOG", false, collectionName);
  queryWithPreferReplicaTypes(getRandomClient(), "TLOG", false, collectionName);
  queryWithPreferReplicaTypes(getRandomClient(), "TLOG|PULL", false, collectionName);
  queryWithPreferReplicaTypes(getRandomClient(), "NRT", false, collectionName);
  queryWithPreferReplicaTypes(getRandomClient(), "NRT|PULL", false, collectionName);

  // Test to verify that additionally preferring local replicas doesn't break this.
  // Every type combination above is repeated with the local preference enabled; the
  // "NRT" case previously passed false here, which only repeated the run above.
  queryWithPreferReplicaTypes(getRandomClient(), "PULL", true, collectionName);
  queryWithPreferReplicaTypes(getRandomClient(), "PULL|TLOG", true, collectionName);
  queryWithPreferReplicaTypes(getRandomClient(), "TLOG", true, collectionName);
  queryWithPreferReplicaTypes(getRandomClient(), "TLOG|PULL", true, collectionName);
  queryWithPreferReplicaTypes(getRandomClient(), "NRT", true, collectionName);
  queryWithPreferReplicaTypes(getRandomClient(), "NRT|PULL", true, collectionName);
}
/**
 * Runs a match-all query carrying a 'shards.preference' rule built from the given replica
 * types and asserts that every shard reported in the shards info was answered by a replica
 * of the first (most preferred) type.
 *
 * @param cloudClient client used to send the query
 * @param preferReplicaTypes preferred replica types in priority order, separated by '|'
 * @param preferLocalShards whether to append a 'replica.location:local' rule after the type rules
 * @param collectionName collection to query
 */
private void queryWithPreferReplicaTypes(CloudSolrClient cloudClient,
                                         String preferReplicaTypes,
                                         boolean preferLocalShards,
                                         String collectionName)
    throws Exception
{
  SolrQuery qRequest = new SolrQuery("*:*");
  ModifiableSolrParams qParams = new ModifiableSolrParams();

  // Build "replica.type:A,replica.type:B[,replica.location:local]"
  final List<String> preferredTypes = Arrays.asList(preferReplicaTypes.split("\\|"));
  StringBuilder rule = new StringBuilder();
  for (String type : preferredTypes) {
    if (rule.length() != 0) {
      rule.append(',');
    }
    rule.append(ShardParams.SHARDS_PREFERENCE_REPLICA_TYPE);
    rule.append(':');
    rule.append(type);
  }
  if (preferLocalShards) {
    if (rule.length() != 0) {
      rule.append(',');
    }
    rule.append(ShardParams.SHARDS_PREFERENCE_REPLICA_LOCATION);
    rule.append(':');
    rule.append(ShardParams.REPLICA_LOCAL);
  }
  qParams.add(ShardParams.SHARDS_PREFERENCE, rule.toString());
  qParams.add(ShardParams.SHARDS_INFO, "true");
  qRequest.add(qParams);

  // CloudSolrClient sends the request to some node. That node forms the distributed
  // query, and the shards info in the response tells us which replica answered for
  // each shard.
  QueryResponse qResponse = cloudClient.query(collectionName, qRequest);

  Object shardsInfo = qResponse.getResponse().get(ShardParams.SHARDS_INFO);
  assertNotNull("Unable to obtain "+ShardParams.SHARDS_INFO, shardsInfo);

  // Map every replica's core URL to its replica type
  Map<String, String> replicaTypeMap = new HashMap<String, String>();
  DocCollection collection = getCollectionState(collectionName);
  for (Slice slice : collection.getSlices()) {
    for (Replica replica : slice.getReplicas()) {
      String coreUrl = replica.getCoreUrl();
      // It seems replica reports its core URL with a trailing slash while shard
      // info returned from the query doesn't. Oh well.
      if (coreUrl.endsWith("/")) {
        coreUrl = coreUrl.substring(0, coreUrl.length() - 1);
      }
      replicaTypeMap.put(coreUrl, replica.getType().toString());
    }
  }

  // Iterate over shards-info and check that replicas of correct type responded
  SimpleOrderedMap<?> shardsInfoMap = (SimpleOrderedMap<?>)shardsInfo;
  Iterator<Map.Entry<String, ?>> itr = shardsInfoMap.asMap(100).entrySet().iterator();
  List<String> shardAddresses = new ArrayList<String>();
  while (itr.hasNext()) {
    Map.Entry<String, ?> e = itr.next();
    assertTrue("Did not find map-type value in "+ShardParams.SHARDS_INFO, e.getValue() instanceof Map);
    String shardAddress = (String)((Map<?, ?>)e.getValue()).get("shardAddress");
    assertNotNull(ShardParams.SHARDS_INFO+" did not return 'shardAddress' parameter", shardAddress);
    assertTrue("No replica of " + collectionName + " matches shard address " + shardAddress,
        replicaTypeMap.containsKey(shardAddress));
    // Only the first listed type is accepted: index 0 of the preference list
    assertTrue("Replica at " + shardAddress + " has type " + replicaTypeMap.get(shardAddress)
            + " but preferred types were " + preferredTypes,
        preferredTypes.indexOf(replicaTypeMap.get(shardAddress)) == 0);
    shardAddresses.add(shardAddress);
  }
  assertTrue("No responses", shardAddresses.size() > 0);
  log.info("Shards giving the response: " + Arrays.toString(shardAddresses.toArray()));
}
} }