SOLR-12217: Support shards.preference in SolrJ for individual shard requests (#984)

This commit is contained in:
Houston Putman 2019-12-09 17:11:58 -05:00 committed by GitHub
parent 1eaa5b5ac2
commit 1c78d2c637
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 395 additions and 68 deletions

View File

@ -134,6 +134,8 @@ New Features
* SOLR-13912: Add 'countvals' aggregation in JSON FacetModule (hossman, Munendra S N)
* SOLR-12217: Support shards.preference in SolrJ for single shard collections. The parameter is now used by the CloudSolrClient and Streaming Expressions. (Houston Putman, Tomas Fernandez-Lobbe)
Improvements
---------------------

View File

@ -42,7 +42,10 @@ import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParser;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
import org.apache.solr.client.solrj.routing.RequestReplicaListTransformerGenerator;
import org.apache.solr.cloud.ZkController;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.SolrParams;
@ -162,10 +165,29 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware,
return;
}
final SolrCore core = req.getCore(); // explicit check for null core (temporary?, for tests)
ZkController zkController = core == null ? null : core.getCoreContainer().getZkController();
RequestReplicaListTransformerGenerator requestReplicaListTransformerGenerator;
if (zkController != null) {
requestReplicaListTransformerGenerator = new RequestReplicaListTransformerGenerator(
zkController.getZkStateReader().getClusterProperties()
.getOrDefault(ZkStateReader.DEFAULT_SHARD_PREFERENCES, "")
.toString(),
zkController.getNodeName(),
zkController.getBaseUrl(),
zkController.getSysPropsCacher()
);
} else {
requestReplicaListTransformerGenerator = new RequestReplicaListTransformerGenerator();
}
int worker = params.getInt("workerID", 0);
int numWorkers = params.getInt("numWorkers", 1);
boolean local = params.getBool("streamLocalOnly", false);
StreamContext context = new StreamContext();
context.setRequestParams(params);
context.setRequestReplicaListTransformerGenerator(requestReplicaListTransformerGenerator);
context.put("shards", getCollectionShards(params));
context.workerID = worker;
context.numWorkers = numWorkers;

View File

@ -160,7 +160,7 @@ Solr allows you to pass an optional string parameter named `shards.preference` t
The syntax is: `shards.preference=_property_:__value__`. The order of the properties and the values is significant: the first one is the primary sort, the second is secondary, etc.
IMPORTANT: `shards.preference` only works for distributed queries, i.e., queries targeting multiple shards. Single shard scenarios are not supported.
IMPORTANT: `shards.preference` is supported for single shard scenarios when using the SolrJ clients.
The properties that can be specified are as follows:

View File

@ -123,6 +123,15 @@ for the entire expression, it may be faster for the client to send the expressio
`&streamLocalOnly=true` and handle merging of the results (if required) locally. This is an advanced option, relying
on a convenient organization of the index, and should only be considered if normal usage poses a performance issue.
=== Request Routing
Streaming Expressions respect the <<distributed-requests.adoc#shards-preference-parameter,shards.preference parameter>> for any call to Solr.
The value of `shards.preference` that is used to route requests is determined in the following order. The first option available is used.
- Provided as a parameter in the streaming expression (e.g. `search(...., shards.preference="replica.type:PULL")`)
- Provided in the URL Params of the streaming expression (e.g. `http://solr_url:8983/solr/stream?expr=....&shards.preference=replica.type:PULL`)
- Set as a default in the Cluster properties.
=== Adding Custom Expressions
Creating your own custom expressions can be easily done by implementing the {solr-javadocs}/solr-solrj/org/apache/solr/client/solrj/io/stream/expr/Expressible.html[Expressible] interface. To add a custom expression to the
@ -132,7 +141,6 @@ list of known mappings for the `/stream` handler, you just need to declare it as
<expressible name="custom" class="org.example.CustomStreamingExpression"/>
== Types of Streaming Expressions
=== About Stream Sources

View File

@ -120,6 +120,14 @@ include::{example-source-dir}UsingSolrJRefGuideExamplesTest.java[tag=solrj-solrc
When these values are not explicitly provided, SolrJ falls back to using the defaults for the OS/environment it is running on.
=== Cloud Request Routing
The SolrJ `CloudSolrClient` implementations (`CloudSolrClient` and `CloudHttp2SolrClient`) respect the <<distributed-requests.adoc#shards-preference-parameter,shards.preference parameter>>.
Therefore, requests sent to single-sharded collections using either of the above clients are routed the same way that distributed requests are routed to individual shards.
If no `shards.preference` parameter is provided, the clients will default to sorting replicas randomly.
For update requests, while the replicas are sorted in the order defined by the request, leader replicas will always be sorted first.
== Querying in SolrJ
`SolrClient` has a number of `query()` methods for fetching results from Solr. Each of these methods takes in a `SolrParams`, an object encapsulating arbitrary query-parameters. And each method outputs a `QueryResponse`, a wrapper which can be used to access the result documents and other related metadata.

View File

@ -44,6 +44,7 @@ import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.solr.client.solrj.ResponseParser;
import org.apache.solr.client.solrj.SolrClient;
@ -55,6 +56,8 @@ import org.apache.solr.client.solrj.request.IsUpdateRequest;
import org.apache.solr.client.solrj.request.RequestWriter;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.client.solrj.request.V2Request;
import org.apache.solr.client.solrj.routing.ReplicaListTransformer;
import org.apache.solr.client.solrj.routing.RequestReplicaListTransformerGenerator;
import org.apache.solr.client.solrj.util.ClientUtils;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrInputDocument;
@ -69,7 +72,6 @@ import org.apache.solr.common.cloud.ImplicitDocRouter;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkCoreNodeProps;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.ShardParams;
@ -100,6 +102,7 @@ public abstract class BaseCloudSolrClient extends SolrClient {
private final boolean updatesToLeaders;
private final boolean directUpdatesToLeadersOnly;
private final RequestReplicaListTransformerGenerator requestRLTGenerator;
boolean parallelUpdates; //TODO final
private ExecutorService threadPool = ExecutorUtil
.newMDCAwareCachedThreadPool(new SolrjNamedThreadFactory(
@ -221,6 +224,7 @@ public abstract class BaseCloudSolrClient extends SolrClient {
this.updatesToLeaders = updatesToLeaders;
this.parallelUpdates = parallelUpdates;
this.directUpdatesToLeadersOnly = directUpdatesToLeadersOnly;
this.requestRLTGenerator = new RequestReplicaListTransformerGenerator();
}
/** Sets the cache ttl for DocCollection Objects cached . This is only applicable for collections which are persisted outside of clusterstate.json
@ -467,6 +471,8 @@ public abstract class BaseCloudSolrClient extends SolrClient {
for(String param : NON_ROUTABLE_PARAMS) {
routableParams.remove(param);
}
} else {
params = new ModifiableSolrParams();
}
if (collection == null) {
@ -492,10 +498,12 @@ public abstract class BaseCloudSolrClient extends SolrClient {
return null;
}
ReplicaListTransformer replicaListTransformer = requestRLTGenerator.getReplicaListTransformer(params);
//Create the URL map, which is keyed on slice name.
//The value is a list of URLs for each replica in the slice.
//The first value in the list is the leader for the slice.
final Map<String,List<String>> urlMap = buildUrlMap(col);
final Map<String,List<String>> urlMap = buildUrlMap(col, replicaListTransformer);
final Map<String, ? extends LBSolrClient.Req> routes = createRoutes(updateRequest, routableParams, col, router, urlMap, idField);
if (routes == null) {
if (directUpdatesToLeadersOnly && hasInfoToFindLeaders(updateRequest, idField)) {
@ -616,12 +624,12 @@ public abstract class BaseCloudSolrClient extends SolrClient {
return urlMap == null ? null : updateRequest.getRoutesToCollection(router, col, urlMap, routableParams, idField);
}
private Map<String,List<String>> buildUrlMap(DocCollection col) {
private Map<String,List<String>> buildUrlMap(DocCollection col, ReplicaListTransformer replicaListTransformer) {
Map<String, List<String>> urlMap = new HashMap<>();
Slice[] slices = col.getActiveSlicesArr();
for (Slice slice : slices) {
String name = slice.getName();
List<String> urls = new ArrayList<>();
List<Replica> sortedReplicas = new ArrayList<>();
Replica leader = slice.getLeader();
if (directUpdatesToLeadersOnly && leader == null) {
for (Replica replica : slice.getReplicas(
@ -638,20 +646,22 @@ public abstract class BaseCloudSolrClient extends SolrClient {
// take unoptimized general path - we cannot find a leader yet
return null;
}
ZkCoreNodeProps zkProps = new ZkCoreNodeProps(leader);
String url = zkProps.getCoreUrl();
urls.add(url);
if (!directUpdatesToLeadersOnly) {
for (Replica replica : slice.getReplicas()) {
if (!replica.getNodeName().equals(leader.getNodeName()) &&
!replica.getName().equals(leader.getName())) {
ZkCoreNodeProps zkProps1 = new ZkCoreNodeProps(replica);
String url1 = zkProps1.getCoreUrl();
urls.add(url1);
if (!replica.equals(leader)) {
sortedReplicas.add(replica);
}
}
}
urlMap.put(name, urls);
// Sort the non-leader replicas according to the request parameters
replicaListTransformer.transform(sortedReplicas);
// put the leaderUrl first.
sortedReplicas.add(0, leader);
urlMap.put(name, sortedReplicas.stream().map(Replica::getCoreUrl).collect(Collectors.toList()));
}
return urlMap;
}
@ -1046,6 +1056,8 @@ public abstract class BaseCloudSolrClient extends SolrClient {
reqParams = new ModifiableSolrParams();
}
ReplicaListTransformer replicaListTransformer = requestRLTGenerator.getReplicaListTransformer(reqParams);
final Set<String> liveNodes = getClusterStateProvider().getLiveNodes();
final List<String> theUrlList = new ArrayList<>(); // we populate this as follows...
@ -1087,34 +1099,38 @@ public abstract class BaseCloudSolrClient extends SolrClient {
}
// Gather URLs, grouped by leader or replica
// TODO: allow filtering by group, role, etc
Set<String> seenNodes = new HashSet<>();
List<String> replicas = new ArrayList<>();
String joinedInputCollections = StrUtils.join(inputCollections, ',');
List<Replica> sortedReplicas = new ArrayList<>();
List<Replica> replicas = new ArrayList<>();
for (Slice slice : slices.values()) {
for (ZkNodeProps nodeProps : slice.getReplicasMap().values()) {
ZkCoreNodeProps coreNodeProps = new ZkCoreNodeProps(nodeProps);
String node = coreNodeProps.getNodeName();
Replica leader = slice.getLeader();
for (Replica replica : slice.getReplicas()) {
String node = replica.getNodeName();
if (!liveNodes.contains(node) // Must be a live node to continue
|| Replica.State.getState(coreNodeProps.getState()) != Replica.State.ACTIVE) // Must be an ACTIVE replica to continue
|| replica.getState() != Replica.State.ACTIVE) // Must be an ACTIVE replica to continue
continue;
if (seenNodes.add(node)) { // if we haven't yet collected a URL to this node...
String url = ZkCoreNodeProps.getCoreUrl(nodeProps.getStr(ZkStateReader.BASE_URL_PROP), joinedInputCollections);
if (sendToLeaders && coreNodeProps.isLeader()) {
theUrlList.add(url); // put leaders here eagerly (if sendToLeader mode)
} else {
replicas.add(url); // replicas here
}
if (sendToLeaders && replica.equals(leader)) {
sortedReplicas.add(replica); // put leaders here eagerly (if sendToLeader mode)
} else {
replicas.add(replica); // replicas here
}
}
}
// Shuffle the leaders, if any (none if !sendToLeaders)
Collections.shuffle(theUrlList, rand);
// Sort the leader replicas, if any, according to the request preferences (none if !sendToLeaders)
replicaListTransformer.transform(sortedReplicas);
// Shuffle the replicas, if any, and append to our list
Collections.shuffle(replicas, rand);
theUrlList.addAll(replicas);
// Sort the replicas, if any, according to the request preferences and append to our list
replicaListTransformer.transform(replicas);
sortedReplicas.addAll(replicas);
String joinedInputCollections = StrUtils.join(inputCollections, ',');
Set<String> seenNodes = new HashSet<>();
sortedReplicas.forEach( replica -> {
if (seenNodes.add(replica.getNodeName())) {
theUrlList.add(ZkCoreNodeProps.getCoreUrl(replica.getBaseUrl(), joinedInputCollections));
}
});
if (theUrlList.isEmpty()) {
collectionStateCache.keySet().removeAll(collectionNames);

View File

@ -371,12 +371,13 @@ public class CloudSolrStream extends TupleStream implements Expressible {
protected void constructStreams() throws IOException {
try {
List<String> shardUrls = getShards(this.zkHost, this.collection, this.streamContext);
ModifiableSolrParams mParams = new ModifiableSolrParams(params);
mParams = adjustParams(mParams);
mParams.set(DISTRIB, "false"); // We are the aggregator.
List<String> shardUrls = getShards(this.zkHost, this.collection, this.streamContext, mParams);
for(String shardUrl : shardUrls) {
SolrStream solrStream = new SolrStream(shardUrl, mParams);
if(streamContext != null) {

View File

@ -309,12 +309,12 @@ public class DeepRandomStream extends TupleStream implements Expressible {
protected void constructStreams() throws IOException {
try {
List<String> shardUrls = getShards(this.zkHost, this.collection, this.streamContext);
ModifiableSolrParams mParams = new ModifiableSolrParams(params);
mParams = adjustParams(mParams);
mParams.set(DISTRIB, "false"); // We are the aggregator.
List<String> shardUrls = getShards(this.zkHost, this.collection, this.streamContext, mParams);
String rows = mParams.get(ROWS);
int r = Integer.parseInt(rows);
int newRows = r/shardUrls.size();

View File

@ -24,6 +24,8 @@ import java.util.concurrent.ConcurrentMap;
import org.apache.solr.client.solrj.io.ModelCache;
import org.apache.solr.client.solrj.io.SolrClientCache;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
import org.apache.solr.client.solrj.routing.RequestReplicaListTransformerGenerator;
import org.apache.solr.common.params.SolrParams;
/**
* The StreamContext is passed to TupleStreams using the TupleStream.setStreamContext() method.
@ -46,6 +48,8 @@ public class StreamContext implements Serializable {
private ModelCache modelCache;
private StreamFactory streamFactory;
private boolean local;
private SolrParams requestParams;
private RequestReplicaListTransformerGenerator requestReplicaListTransformerGenerator;
public ConcurrentMap getObjectCache() {
return this.objectCache;
@ -110,4 +114,20 @@ public class StreamContext implements Serializable {
public boolean isLocal() {
return local;
}
/**
 * Sets the request parameters held by this context.
 *
 * @param requestParams the parameters to store; kept by reference, not copied
 */
public void setRequestParams(SolrParams requestParams) {
this.requestParams = requestParams;
}
/**
 * Returns the request parameters held by this context.
 *
 * @return the stored parameters, or {@code null} if none have been set
 */
public SolrParams getRequestParams() {
return requestParams;
}
/**
 * Sets the generator held by this context for producing replica list transformers.
 *
 * @param requestReplicaListTransformerGenerator the generator to store; kept by reference
 */
public void setRequestReplicaListTransformerGenerator(RequestReplicaListTransformerGenerator requestReplicaListTransformerGenerator) {
this.requestReplicaListTransformerGenerator = requestReplicaListTransformerGenerator;
}
/**
 * Returns the replica list transformer generator held by this context.
 *
 * @return the stored generator, or {@code null} if none has been set
 */
public RequestReplicaListTransformerGenerator getRequestReplicaListTransformerGenerator() {
return requestReplicaListTransformerGenerator;
}
}

View File

@ -21,26 +21,28 @@ import java.io.IOException;
import java.io.PrintWriter;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.Map;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.io.SolrClientCache;
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.comp.StreamComparator;
import org.apache.solr.client.solrj.io.stream.expr.Explanation;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
import org.apache.solr.client.solrj.routing.ReplicaListTransformer;
import org.apache.solr.client.solrj.routing.RequestReplicaListTransformerGenerator;
import org.apache.solr.common.IteratorWriter;
import org.apache.solr.common.MapWriter;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkCoreNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.SolrParams;
/**
@ -118,6 +120,14 @@ public abstract class TupleStream implements Closeable, Serializable, MapWriter
String collection,
StreamContext streamContext)
throws IOException {
return getShards(zkHost, collection, streamContext, new ModifiableSolrParams());
}
public static List<String> getShards(String zkHost,
String collection,
StreamContext streamContext,
SolrParams requestParams)
throws IOException {
Map<String, List<String>> shardsMap = null;
List<String> shards = new ArrayList();
@ -130,24 +140,34 @@ public abstract class TupleStream implements Closeable, Serializable, MapWriter
shards = shardsMap.get(collection);
} else {
//SolrCloud Sharding
CloudSolrClient cloudSolrClient = streamContext.getSolrClientCache().getCloudSolrClient(zkHost);
CloudSolrClient cloudSolrClient =
Optional.ofNullable(streamContext.getSolrClientCache()).orElseGet(SolrClientCache::new).getCloudSolrClient(zkHost);
ZkStateReader zkStateReader = cloudSolrClient.getZkStateReader();
ClusterState clusterState = zkStateReader.getClusterState();
Slice[] slices = CloudSolrStream.getSlices(collection, zkStateReader, true);
Set<String> liveNodes = clusterState.getLiveNodes();
ModifiableSolrParams solrParams = new ModifiableSolrParams(streamContext.getRequestParams());
solrParams.add(requestParams);
RequestReplicaListTransformerGenerator requestReplicaListTransformerGenerator =
Optional.ofNullable(streamContext.getRequestReplicaListTransformerGenerator()).orElseGet(RequestReplicaListTransformerGenerator::new);
ReplicaListTransformer replicaListTransformer = requestReplicaListTransformerGenerator.getReplicaListTransformer(solrParams);
for(Slice slice : slices) {
Collection<Replica> replicas = slice.getReplicas();
List<Replica> shuffler = new ArrayList<>();
for(Replica replica : replicas) {
if(replica.getState() == Replica.State.ACTIVE && liveNodes.contains(replica.getNodeName()))
shuffler.add(replica);
List<Replica> sortedReplicas = new ArrayList<>();
for(Replica replica : slice.getReplicas()) {
if(replica.getState() == Replica.State.ACTIVE && liveNodes.contains(replica.getNodeName())) {
sortedReplicas.add(replica);
}
}
Collections.shuffle(shuffler, new Random());
Replica rep = shuffler.get(0);
ZkCoreNodeProps zkProps = new ZkCoreNodeProps(rep);
String url = zkProps.getCoreUrl();
shards.add(url);
replicaListTransformer.transform(sortedReplicas);
if (sortedReplicas.size() > 0) {
shards.add(sortedReplicas.get(0).getCoreUrl());
}
}
}
Object core = streamContext.get("core");

View File

@ -166,7 +166,7 @@ public class NodePreferenceRulesComparator implements Comparator<Object> {
return false;
}
final String s = ((Replica)o).getType().toString();
return s.equals(preferred);
return s.equalsIgnoreCase(preferred);
}
public List<PreferenceRule> getSortRules() {

View File

@ -20,6 +20,7 @@ import java.lang.invoke.MethodHandles;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.Random;
import org.apache.solr.common.SolrException;
@ -41,9 +42,13 @@ public class RequestReplicaListTransformerGenerator {
(String configSpec, SolrParams requestParams, ReplicaListTransformerFactory fallback) -> shufflingReplicaListTransformer;
private final ReplicaListTransformerFactory stableRltFactory;
private final ReplicaListTransformerFactory defaultRltFactory;
private final String defaultShardPreferences;
private final String nodeName;
private final String localHostAddress;
private final NodesSysPropsCacher sysPropsCacher;
public RequestReplicaListTransformerGenerator() {
this(RANDOM_RLTF);
this(null);
}
public RequestReplicaListTransformerGenerator(ReplicaListTransformerFactory defaultRltFactory) {
@ -51,16 +56,24 @@ public class RequestReplicaListTransformerGenerator {
}
public RequestReplicaListTransformerGenerator(ReplicaListTransformerFactory defaultRltFactory, ReplicaListTransformerFactory stableRltFactory) {
this.defaultRltFactory = defaultRltFactory;
if (stableRltFactory == null) {
this.stableRltFactory = new AffinityReplicaListTransformerFactory();
} else {
this.stableRltFactory = stableRltFactory;
}
this(defaultRltFactory, stableRltFactory, null, null, null, null);
}
/**
 * Creates a generator with the given default shard preferences and node details, passing
 * {@code null} for both transformer factories so the six-argument constructor chooses them.
 *
 * @param defaultShardPreferences default {@code shards.preference} specification; may be {@code null}
 * @param nodeName node name forwarded to the six-argument constructor
 * @param localHostAddress local host address forwarded to the six-argument constructor
 * @param sysPropsCacher system-properties cacher forwarded to the six-argument constructor
 */
public RequestReplicaListTransformerGenerator(String defaultShardPreferences, String nodeName, String localHostAddress, NodesSysPropsCacher sysPropsCacher) {
this(null, null, defaultShardPreferences, nodeName, localHostAddress, sysPropsCacher);
}
/**
 * Creates a generator, substituting a default for each {@code null} factory or preference argument.
 *
 * @param defaultRltFactory default transformer factory; {@code RANDOM_RLTF} is used when {@code null}
 * @param stableRltFactory stable transformer factory; a new
 *        {@code AffinityReplicaListTransformerFactory} is created when {@code null}
 * @param defaultShardPreferences default shard preference specification; the empty string is used when {@code null}
 * @param nodeName node name, stored as given (may be {@code null})
 * @param localHostAddress local host address, stored as given (may be {@code null})
 * @param sysPropsCacher system-properties cacher, stored as given (may be {@code null})
 */
public RequestReplicaListTransformerGenerator(ReplicaListTransformerFactory defaultRltFactory, ReplicaListTransformerFactory stableRltFactory, String defaultShardPreferences, String nodeName, String localHostAddress, NodesSysPropsCacher sysPropsCacher) {
  // Node details are kept exactly as supplied; no defaulting happens for them here.
  this.nodeName = nodeName;
  this.localHostAddress = localHostAddress;
  this.sysPropsCacher = sysPropsCacher;

  if (defaultShardPreferences == null) {
    this.defaultShardPreferences = "";
  } else {
    this.defaultShardPreferences = defaultShardPreferences;
  }

  if (defaultRltFactory == null) {
    this.defaultRltFactory = RANDOM_RLTF;
  } else {
    this.defaultRltFactory = defaultRltFactory;
  }

  // The affinity factory is only instantiated when the caller did not supply one.
  if (stableRltFactory == null) {
    this.stableRltFactory = new AffinityReplicaListTransformerFactory();
  } else {
    this.stableRltFactory = stableRltFactory;
  }
}
public ReplicaListTransformer getReplicaListTransformer(final SolrParams requestParams) {
return getReplicaListTransformer(requestParams, "");
return getReplicaListTransformer(requestParams, null);
}
public ReplicaListTransformer getReplicaListTransformer(final SolrParams requestParams, String defaultShardPreferences) {
@ -70,6 +83,7 @@ public class RequestReplicaListTransformerGenerator {
public ReplicaListTransformer getReplicaListTransformer(final SolrParams requestParams, String defaultShardPreferences, String nodeName, String localHostAddress, NodesSysPropsCacher sysPropsCacher) {
@SuppressWarnings("deprecation")
final boolean preferLocalShards = requestParams.getBool(CommonParams.PREFER_LOCAL_SHARDS, false);
defaultShardPreferences = Optional.ofNullable(defaultShardPreferences).orElse(this.defaultShardPreferences);
final String shardsPreferenceSpec = requestParams.get(ShardParams.SHARDS_PREFERENCE, defaultShardPreferences);
if (preferLocalShards || !shardsPreferenceSpec.isEmpty()) {
@ -84,7 +98,15 @@ public class RequestReplicaListTransformerGenerator {
preferenceRules.add(new PreferenceRule(ShardParams.SHARDS_PREFERENCE_REPLICA_LOCATION, ShardParams.REPLICA_LOCAL));
}
NodePreferenceRulesComparator replicaComp = new NodePreferenceRulesComparator(preferenceRules, requestParams, nodeName, localHostAddress, sysPropsCacher, defaultRltFactory, stableRltFactory);
NodePreferenceRulesComparator replicaComp =
new NodePreferenceRulesComparator(
preferenceRules,
requestParams,
Optional.ofNullable(nodeName).orElse(this.nodeName),
Optional.ofNullable(localHostAddress).orElse(this.localHostAddress),
Optional.ofNullable(sysPropsCacher).orElse(this.sysPropsCacher),
defaultRltFactory,
stableRltFactory);
ReplicaListTransformer baseReplicaListTransformer = replicaComp.getBaseReplicaListTransformer();
if (replicaComp.getSortRules() == null) {
// only applying base transformation

View File

@ -102,7 +102,7 @@ public class Replica extends ZkNodeProps {
PULL;
public static Type get(String name){
return name == null ? Type.NRT : Type.valueOf(name);
return name == null ? Type.NRT : Type.valueOf(name.toUpperCase(Locale.ROOT));
}
}

View File

@ -27,6 +27,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
@ -488,6 +489,74 @@ public class CloudHttp2SolrClientTest extends SolrCloudTestCase {
shardAddresses.size() > 1 && ports.size()==1);
}
/**
 * Tests if the 'shards.preference' parameter works with single-sharded collections.
 *
 * <p>Creates a one-shard collection with the same number of NRT, TLOG and PULL replicas,
 * indexes a few documents, and then checks via {@code queryReplicaType} that a query
 * preferring each replica type is answered by a replica of that type.
 */
@Test
public void singleShardedPreferenceRules() throws Exception {
  String collectionName = "singleShardPreferenceTestColl";

  int liveNodes = cluster.getJettySolrRunners().size();
  // Integer division: when the node count is not a multiple of 3, fewer than
  // 'liveNodes' replicas are created, so the wait below must use the real total.
  int replicasPerType = liveNodes / 3;

  // For testing replica.type, we want to have all replica types available for the collection
  CollectionAdminRequest.createCollection(collectionName, "conf", 1, replicasPerType, replicasPerType, replicasPerType)
      .setMaxShardsPerNode(liveNodes)
      .processAndWait(cluster.getSolrClient(), TIMEOUT);
  cluster.waitForActiveCollection(collectionName, 1, 3 * replicasPerType);

  // Add some new documents
  new UpdateRequest()
      .add(id, "0", "a_t", "hello1")
      .add(id, "2", "a_t", "hello2")
      .add(id, "3", "a_t", "hello2")
      .commit(getRandomClient(), collectionName);

  // Run the actual test for 'queryReplicaType'
  queryReplicaType(getRandomClient(), Replica.Type.PULL, collectionName);
  queryReplicaType(getRandomClient(), Replica.Type.TLOG, collectionName);
  queryReplicaType(getRandomClient(), Replica.Type.NRT, collectionName);
}
/**
 * Queries the collection with a {@code shards.preference} on the given replica type and asserts
 * that the single shard reported in the shards info was served by a replica of that type.
 *
 * @param cloudClient client used to send the query
 * @param typeToQuery replica type requested through {@code shards.preference}
 * @param collectionName collection to query; expected to have exactly one shard
 * @throws Exception if the query fails
 */
private void queryReplicaType(CloudHttp2SolrClient cloudClient,
                              Replica.Type typeToQuery,
                              String collectionName)
    throws Exception
{
  SolrQuery qRequest = new SolrQuery("*:*");

  ModifiableSolrParams qParams = new ModifiableSolrParams();
  qParams.add(ShardParams.SHARDS_PREFERENCE, ShardParams.SHARDS_PREFERENCE_REPLICA_TYPE + ":" + typeToQuery.toString());
  qParams.add(ShardParams.SHARDS_INFO, "true");
  qRequest.add(qParams);

  // Looked up by shard address at the end of this method to find the type of the responding replica.
  Map<String, String> replicaTypeToReplicas = mapReplicasToReplicaType(getCollectionState(collectionName));

  QueryResponse qResponse = cloudClient.query(collectionName, qRequest);

  Object shardsInfo = qResponse.getResponse().get(ShardParams.SHARDS_INFO);
  assertNotNull("Unable to obtain "+ShardParams.SHARDS_INFO, shardsInfo);

  // Iterate over shards-info and check what cores responded
  SimpleOrderedMap<?> shardsInfoMap = (SimpleOrderedMap<?>)shardsInfo;
  Iterator<Map.Entry<String, ?>> itr = shardsInfoMap.asMap(100).entrySet().iterator();
  List<String> shardAddresses = new ArrayList<>();
  while (itr.hasNext()) {
    Map.Entry<String, ?> e = itr.next();
    assertTrue("Did not find map-type value in "+ShardParams.SHARDS_INFO, e.getValue() instanceof Map);
    String shardAddress = (String)((Map<?, ?>)e.getValue()).get("shardAddress");
    // Check for null BEFORE dereferencing, so a missing address fails with the intended message.
    assertNotNull(ShardParams.SHARDS_INFO+" did not return 'shardAddress' parameter", shardAddress);
    if (shardAddress.endsWith("/")) {
      shardAddress = shardAddress.substring(0, shardAddress.length() - 1);
    }
    shardAddresses.add(shardAddress);
  }
  assertEquals("Shard addresses must be of size 1, since there is only 1 shard in the collection", 1, shardAddresses.size());

  // Guard the lookup so an unknown address fails as an assertion instead of a NullPointerException.
  String respondingReplicaType = replicaTypeToReplicas.get(shardAddresses.get(0));
  assertNotNull("No replica type known for shard address " + shardAddresses.get(0), respondingReplicaType);
  assertEquals("Make sure that the replica queried was the replicaType desired", typeToQuery.toString().toUpperCase(Locale.ROOT), respondingReplicaType.toUpperCase(Locale.ROOT));
}
private Long getNumRequests(String baseUrl, String collectionName) throws
SolrServerException, IOException {
return getNumRequests(baseUrl, collectionName, "QUERY", "/select", null, false);

View File

@ -27,6 +27,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
@ -427,8 +428,8 @@ public class CloudSolrClientTest extends SolrCloudTestCase {
@SuppressWarnings("deprecation")
private void queryWithShardsPreferenceRules(CloudSolrClient cloudClient,
boolean useShardsPreference,
String collectionName)
boolean useShardsPreference,
String collectionName)
throws Exception
{
SolrQuery qRequest = new SolrQuery("*:*");
@ -476,6 +477,72 @@ public class CloudSolrClientTest extends SolrCloudTestCase {
shardAddresses.size() > 1 && ports.size()==1);
}
/**
 * Tests if the 'shards.preference' parameter works with single-sharded collections.
 *
 * <p>Creates a one-shard collection with the same number of NRT, TLOG and PULL replicas,
 * indexes a few documents, and then checks via {@code queryReplicaType} that a query
 * preferring each replica type is answered by a replica of that type.
 */
@Test
public void singleShardedPreferenceRules() throws Exception {
  String collectionName = "singleShardPreferenceTestColl";

  int liveNodes = cluster.getJettySolrRunners().size();
  // Integer division: when the node count is not a multiple of 3, fewer than
  // 'liveNodes' replicas are created, so the wait below must use the real total.
  int replicasPerType = liveNodes / 3;

  // For testing replica.type, we want to have all replica types available for the collection
  CollectionAdminRequest.createCollection(collectionName, "conf", 1, replicasPerType, replicasPerType, replicasPerType)
      .setMaxShardsPerNode(liveNodes)
      .processAndWait(cluster.getSolrClient(), TIMEOUT);
  cluster.waitForActiveCollection(collectionName, 1, 3 * replicasPerType);

  // Add some new documents
  new UpdateRequest()
      .add(id, "0", "a_t", "hello1")
      .add(id, "2", "a_t", "hello2")
      .add(id, "3", "a_t", "hello2")
      .commit(getRandomClient(), collectionName);

  // Run the actual test for 'queryReplicaType'
  queryReplicaType(getRandomClient(), Replica.Type.PULL, collectionName);
  queryReplicaType(getRandomClient(), Replica.Type.TLOG, collectionName);
  queryReplicaType(getRandomClient(), Replica.Type.NRT, collectionName);
}
/**
 * Queries the collection with a {@code shards.preference} on the given replica type and asserts
 * that the single shard reported in the shards info was served by a replica of that type.
 *
 * @param cloudClient client used to send the query
 * @param typeToQuery replica type requested through {@code shards.preference}
 * @param collectionName collection to query; expected to have exactly one shard
 * @throws Exception if the query fails
 */
private void queryReplicaType(CloudSolrClient cloudClient,
                              Replica.Type typeToQuery,
                              String collectionName)
    throws Exception
{
  SolrQuery qRequest = new SolrQuery("*:*");

  ModifiableSolrParams qParams = new ModifiableSolrParams();
  qParams.add(ShardParams.SHARDS_PREFERENCE, ShardParams.SHARDS_PREFERENCE_REPLICA_TYPE + ":" + typeToQuery.toString());
  qParams.add(ShardParams.SHARDS_INFO, "true");
  qRequest.add(qParams);

  // Looked up by shard address at the end of this method to find the type of the responding replica.
  Map<String, String> replicaTypeToReplicas = mapReplicasToReplicaType(getCollectionState(collectionName));

  QueryResponse qResponse = cloudClient.query(collectionName, qRequest);

  Object shardsInfo = qResponse.getResponse().get(ShardParams.SHARDS_INFO);
  assertNotNull("Unable to obtain "+ShardParams.SHARDS_INFO, shardsInfo);

  // Iterate over shards-info and check what cores responded
  SimpleOrderedMap<?> shardsInfoMap = (SimpleOrderedMap<?>)shardsInfo;
  Iterator<Map.Entry<String, ?>> itr = shardsInfoMap.asMap(100).entrySet().iterator();
  List<String> shardAddresses = new ArrayList<>();
  while (itr.hasNext()) {
    Map.Entry<String, ?> e = itr.next();
    assertTrue("Did not find map-type value in "+ShardParams.SHARDS_INFO, e.getValue() instanceof Map);
    String shardAddress = (String)((Map<?, ?>)e.getValue()).get("shardAddress");
    // Check for null BEFORE dereferencing, so a missing address fails with the intended message.
    assertNotNull(ShardParams.SHARDS_INFO+" did not return 'shardAddress' parameter", shardAddress);
    if (shardAddress.endsWith("/")) {
      shardAddress = shardAddress.substring(0, shardAddress.length() - 1);
    }
    shardAddresses.add(shardAddress);
  }
  assertEquals("Shard addresses must be of size 1, since there is only 1 shard in the collection", 1, shardAddresses.size());

  // Guard the lookup so an unknown address fails as an assertion instead of a NullPointerException.
  String respondingReplicaType = replicaTypeToReplicas.get(shardAddresses.get(0));
  assertNotNull("No replica type known for shard address " + shardAddresses.get(0), respondingReplicaType);
  assertEquals("Make sure that the replica queried was the replicaType desired", typeToQuery.toString().toUpperCase(Locale.ROOT), respondingReplicaType.toUpperCase(Locale.ROOT));
}
private Long getNumRequests(String baseUrl, String collectionName) throws
SolrServerException, IOException {
return getNumRequests(baseUrl, collectionName, "QUERY", "/select", null, false);

View File

@ -48,11 +48,13 @@ import org.apache.solr.client.solrj.io.stream.metrics.MinMetric;
import org.apache.solr.client.solrj.io.stream.metrics.SumMetric;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.client.solrj.routing.RequestReplicaListTransformerGenerator;
import org.apache.solr.cloud.SolrCloudTestCase;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.ShardParams;
import org.apache.solr.common.params.SolrParams;
import org.junit.Assume;
import org.junit.Before;
@ -71,6 +73,7 @@ import org.junit.Test;
public class StreamingTest extends SolrCloudTestCase {
public static final String COLLECTIONORALIAS = "streams";
public static final String MULTI_REPLICA_COLLECTIONORALIAS = "streams-multi-replica";
private static final StreamFactory streamFactory = new StreamFactory()
.withFunctionName("search", CloudSolrStream.class)
@ -103,7 +106,8 @@ public static void configureCluster() throws Exception {
} else {
collection = COLLECTIONORALIAS;
}
CollectionAdminRequest.createCollection(collection, "conf", numShards, 1).process(cluster.getSolrClient());
CollectionAdminRequest.createCollection(collection, "conf", numShards, 1)
.process(cluster.getSolrClient());
cluster.waitForActiveCollection(collection, numShards, numShards);
if (useAlias) {
CollectionAdminRequest.createAlias(COLLECTIONORALIAS, collection).process(cluster.getSolrClient());
@ -111,6 +115,20 @@ public static void configureCluster() throws Exception {
zkHost = cluster.getZkServer().getZkAddress();
streamFactory.withCollectionZkHost(COLLECTIONORALIAS, zkHost);
// Set up multi-replica collection
if (useAlias) {
collection = MULTI_REPLICA_COLLECTIONORALIAS + "_collection";
} else {
collection = MULTI_REPLICA_COLLECTIONORALIAS;
}
CollectionAdminRequest.createCollection(collection, "conf", numShards, 1, 1, 1)
.setMaxShardsPerNode(numShards * 3)
.process(cluster.getSolrClient());
cluster.waitForActiveCollection(collection, numShards, numShards * 3);
if (useAlias) {
CollectionAdminRequest.createAlias(MULTI_REPLICA_COLLECTIONORALIAS, collection).process(cluster.getSolrClient());
}
}
private static final String id = "id";
@ -2554,6 +2572,43 @@ public void testParallelRankStream() throws Exception {
}
}
/**
 * Checks which source of {@code shards.preference} wins when shards are resolved
 * for the multi-replica collection. The expectations exercised here are:
 * <ol>
 *   <li>a preference in the params handed to the shard lookup (PULL),</li>
 *   <li>otherwise the preference in the stream context's request params (NRT),</li>
 *   <li>otherwise the default carried by the context's
 *       {@code RequestReplicaListTransformerGenerator} (TLOG).</li>
 * </ol>
 */
@Test
public void testTupleStreamGetShardsPreference() throws Exception {
  StreamContext context = new StreamContext();
  context.setSolrClientCache(new SolrClientCache());
  // Generator-level default: prefer TLOG replicas.
  context.setRequestReplicaListTransformerGenerator(new RequestReplicaListTransformerGenerator(ShardParams.SHARDS_PREFERENCE_REPLICA_TYPE + ":TLOG", null, null, null));
  // Request-level default: prefer NRT replicas.
  context.setRequestParams(mapParams(ShardParams.SHARDS_PREFERENCE, ShardParams.SHARDS_PREFERENCE_REPLICA_TYPE + ":nrt"));

  try {
    ZkStateReader stateReader = cluster.getSolrClient().getZkStateReader();

    // The constant may name an alias; fall back to the name itself when nothing resolves.
    List<String> resolvedNames = stateReader.aliasesManager.getAliases().resolveAliases(MULTI_REPLICA_COLLECTIONORALIAS);
    String concreteCollection = resolvedNames.isEmpty() ? MULTI_REPLICA_COLLECTIONORALIAS : resolvedNames.get(0);
    Map<String, String> typeByReplicaUrl = mapReplicasToReplicaType(stateReader.getClusterState().getCollectionOrNull(concreteCollection));

    // 1) Preference supplied directly in the params passed to the lookup.
    SolrParams explicitPullParams = mapParams("q", "*:*", ShardParams.SHARDS_PREFERENCE, ShardParams.SHARDS_PREFERENCE_REPLICA_TYPE + ":pull");
    testTupleStreamSorting(context, explicitPullParams, "PULL", typeByReplicaUrl);

    // 2) Nothing in the passed params: the context's request params are expected to apply.
    testTupleStreamSorting(context, new ModifiableSolrParams(), "NRT", typeByReplicaUrl);

    // 3) Request params emptied as well: the generator's default is expected to apply.
    context.setRequestParams(new ModifiableSolrParams());
    testTupleStreamSorting(context, new ModifiableSolrParams(), "TLOG", typeByReplicaUrl);
  } finally {
    context.getSolrClientCache().close();
  }
}
/**
 * Resolves the shards of the multi-replica collection through
 * {@code TupleStream.getShards} and asserts that every returned shard URL maps,
 * in {@code replicaTypeMap}, to the expected replica type (compared case-insensitively).
 * A URL absent from the map is compared as the empty string and therefore fails.
 *
 * @param streamContext  context passed to the shard lookup
 * @param solrParams     params passed to the shard lookup
 * @param replicaType    replica type every selected shard is expected to have
 * @param replicaTypeMap lookup from replica core URL to replica type name
 */
public void testTupleStreamSorting(StreamContext streamContext, SolrParams solrParams, String replicaType, Map<String, String> replicaTypeMap) throws Exception {
  String zkAddress = cluster.getZkClient().getZkServerAddress();
  List<String> selectedShardUrls = TupleStream.getShards(zkAddress, MULTI_REPLICA_COLLECTIONORALIAS, streamContext, solrParams);
  for (String shardUrl : selectedShardUrls) {
    String expectedType = replicaType.toUpperCase(Locale.ROOT);
    String actualType = replicaTypeMap.getOrDefault(shardUrl, "").toUpperCase(Locale.ROOT);
    // The shard URL doubles as the failure message so the offending replica is identifiable.
    assertEquals(shardUrl, expectedType, actualType);
  }
}
protected List<Tuple> getTuples(TupleStream tupleStream) throws IOException {
tupleStream.open();
List<Tuple> tuples = new ArrayList();

View File

@ -490,4 +490,21 @@ public class SolrCloudTestCase extends SolrTestCaseJ4 {
cluster.waitForAllNodes(timeoutSeconds);
}
/**
 * Builds a lookup from replica core URL to replica type name, covering every
 * replica of every slice in the given collection.
 *
 * <p>Each replica is registered under two keys: its core URL as reported, and
 * the same URL with the trailing slash toggled (removed if present, appended if
 * absent). The original author observed that a replica seems to report its core
 * URL with a trailing slash while the shard info returned from a query does not,
 * so both spellings are stored to make lookups work either way.
 *
 * @param collection the collection whose replicas are indexed
 * @return a map from core URL (both slash variants) to the replica type's string form
 */
public static Map<String, String> mapReplicasToReplicaType(DocCollection collection) {
  Map<String, String> typeByCoreUrl = new HashMap<>();
  for (Slice shard : collection.getSlices()) {
    for (Replica member : shard.getReplicas()) {
      String reportedUrl = member.getCoreUrl();
      String typeName = member.getType().toString();
      // Same URL with the trailing slash flipped.
      String toggledUrl = reportedUrl.endsWith("/")
          ? reportedUrl.substring(0, reportedUrl.length() - 1)
          : reportedUrl + "/";
      typeByCoreUrl.put(reportedUrl, typeName);
      typeByCoreUrl.put(toggledUrl, typeName);
    }
  }
  return typeByCoreUrl;
}
}