mirror of https://github.com/apache/lucene.git
commit 20eaaa012d
Merge remote-tracking branch 'origin/master' into gradle-master
@@ -36,7 +36,6 @@ Please review the following and check all that apply:

- [ ] I have reviewed the guidelines for [How to Contribute](https://wiki.apache.org/solr/HowToContribute) and my code conforms to the standards described there to the best of my ability.
- [ ] I have created a Jira issue and added the issue ID to my pull request title.
- [ ] I am authorized to contribute this code to the ASF and have removed any code I do not have a license to distribute.
- [ ] I have given Solr maintainers [access](https://help.github.com/en/articles/allowing-changes-to-a-pull-request-branch-created-from-a-fork) to contribute to my PR branch. (optional but recommended)
- [ ] I have developed this patch against the `master` branch.
- [ ] I have run `ant precommit` and the appropriate test suite.

@@ -134,6 +134,8 @@ New Features

* SOLR-13912: Add 'countvals' aggregation in JSON FacetModule (hossman, Munendra S N)

+* SOLR-12217: Support shards.preference in SolrJ for single shard collections. The parameter is now used by the CloudSolrClient and Streaming Expressions. (Houston Putman, Tomas Fernandez-Lobbe)
+
Improvements
---------------------

@@ -216,6 +218,11 @@ Bug Fixes

* SOLR-13977: solr create -c not working under Windows 10 (janhoy)

+* SOLR-13966: LatLonPointSpatialField wasn't working correctly with atomic/partial updates or RTG.
+  (Thomas Wöckinger via David Smiley)
+
+* SOLR-13563: SPLITSHARD using LINK method fails on disk usage checks. (Andrew Kettmann, ab)
+
Other Changes
---------------------

@@ -153,7 +153,7 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd {
      }

      RTimerTree t = timings.sub("checkDiskSpace");
-     checkDiskSpace(collectionName, slice.get(), parentShardLeader);
+     checkDiskSpace(collectionName, slice.get(), parentShardLeader, splitMethod, ocmh.cloudManager);
      t.stop();

      // let's record the ephemeralOwner of the parent leader node

@@ -617,10 +617,12 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd {
        throw new SolrException(ErrorCode.SERVER_ERROR, msgOnError);
      }
    }
-  private void checkDiskSpace(String collection, String shard, Replica parentShardLeader) throws SolrException {

+  // public and static to facilitate reuse in the simulation framework and in tests
+  public static void checkDiskSpace(String collection, String shard, Replica parentShardLeader, SolrIndexSplitter.SplitMethod method, SolrCloudManager cloudManager) throws SolrException {
    // check that enough disk space is available on the parent leader node
    // otherwise the actual index splitting will always fail
-   NodeStateProvider nodeStateProvider = ocmh.cloudManager.getNodeStateProvider();
+   NodeStateProvider nodeStateProvider = cloudManager.getNodeStateProvider();
    Map<String, Object> nodeValues = nodeStateProvider.getNodeValues(parentShardLeader.getNodeName(),
        Collections.singletonList(ImplicitSnitch.DISK));
    Map<String, Map<String, List<ReplicaInfo>>> infos = nodeStateProvider.getReplicaInfo(parentShardLeader.getNodeName(),

@@ -648,9 +650,11 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd {
    if (freeSize == null) {
      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "missing node disk space information for parent shard leader");
    }
-   if (freeSize.doubleValue() < 2.0 * indexSize) {
+   // 100% more for REWRITE, 5% more for LINK
+   double neededSpace = method == SolrIndexSplitter.SplitMethod.REWRITE ? 2.0 * indexSize : 1.05 * indexSize;
+   if (freeSize.doubleValue() < neededSpace) {
      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "not enough free disk space to perform index split on node " +
-         parentShardLeader.getNodeName() + ", required: " + (2 * indexSize) + ", available: " + freeSize);
+         parentShardLeader.getNodeName() + ", required: " + neededSpace + ", available: " + freeSize);
    }
  }

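To make the new headroom rule concrete, here is a minimal standalone sketch (not the project's code; the class name and sizes are invented) of the check introduced above: a REWRITE split needs 100% headroom over the parent index size, a LINK split only 5%.

```java
// Standalone sketch of the disk-space rule above; names and sizes are invented.
enum SplitMethod { REWRITE, LINK }

public class DiskSpaceRuleSketch {
  static double neededSpace(SplitMethod method, double indexSize) {
    // 100% more for REWRITE, 5% more for LINK (as in the patch)
    return method == SplitMethod.REWRITE ? 2.0 * indexSize : 1.05 * indexSize;
  }

  public static void main(String[] args) {
    double indexSize = 1.0e9; // hypothetical parent index: 1 GB
    double free = 1.5e9;      // hypothetical free disk on the leader: 1.5 GB
    for (SplitMethod m : SplitMethod.values()) {
      System.out.println(m + " needs " + neededSpace(m, indexSize) + " bytes -> "
          + (free < neededSpace(m, indexSize) ? "reject" : "allow"));
    }
  }
}
```

With these numbers a REWRITE split is rejected (needs 2 GB) while a LINK split is allowed (needs 1.05 GB), which is exactly the situation the TestSimScenario test further down sets up.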
@@ -96,6 +96,7 @@ import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.Utils;
import org.apache.solr.core.SolrInfoBean;
import org.apache.solr.metrics.SolrMetricManager;
+import org.apache.solr.update.SolrIndexSplitter;
import org.apache.zookeeper.CreateMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -1275,6 +1276,12 @@ public class SimClusterStateProvider implements ClusterStateProvider {
    AtomicReference<String> sliceName = new AtomicReference<>();
    sliceName.set(message.getStr(SHARD_ID_PROP));
    String splitKey = message.getStr("split.key");
+   String methodStr = message.getStr(CommonAdminParams.SPLIT_METHOD, SolrIndexSplitter.SplitMethod.REWRITE.toLower());
+   SolrIndexSplitter.SplitMethod splitMethod = SolrIndexSplitter.SplitMethod.get(methodStr);
+   if (splitMethod == null) {
+     throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Unknown value '" + CommonAdminParams.SPLIT_METHOD +
+         ": " + methodStr);
+   }

    ClusterState clusterState = getClusterState();
    DocCollection collection = clusterState.getCollection(collectionName);

@@ -1285,6 +1292,7 @@ public class SimClusterStateProvider implements ClusterStateProvider {
      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Shard " + collectionName +
          " / " + sliceName.get() + " has no leader and can't be split");
    }
+   SplitShardCmd.checkDiskSpace(collectionName, sliceName.get(), leader, splitMethod, cloudManager);
    SplitShardCmd.lockForSplit(cloudManager, collectionName, sliceName.get());
    // start counting buffered updates
    Map<String, Object> props = sliceProperties.computeIfAbsent(collectionName, c -> new ConcurrentHashMap<>())

@@ -788,7 +788,19 @@ public class SimScenario implements AutoCloseable {
      Map<String, Object> values = new HashMap<>();
      params.remove(Clause.NODESET);
      for (String key : params.getParameterNames()) {
-       values.put(key, params.get(key));
+       String strVal = params.get(key);
+       Object val;
+       // try auto-converting to a number
+       try {
+         val = Long.parseLong(strVal);
+       } catch (NumberFormatException nfe) {
+         try {
+           val = Double.parseDouble(strVal);
+         } catch (NumberFormatException nfe1) {
+           val = strVal;
+         }
+       }
+       values.put(key, val);
      }
      for (String node : nodes) {
        scenario.cluster.getSimNodeStateProvider().simSetNodeValues(node, values);

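The coercion order above (try Long, then Double, then fall back to the raw String) can be isolated into a small standalone sketch; the class and method names here are illustrative, not from the patch:

```java
// Illustrative sketch of the Long -> Double -> String coercion above.
public class CoerceSketch {
  static Object coerce(String strVal) {
    try {
      return Long.parseLong(strVal);          // "42"   -> Long 42
    } catch (NumberFormatException nfe) {
      try {
        return Double.parseDouble(strVal);    // "1.5"  -> Double 1.5
      } catch (NumberFormatException nfe1) {
        return strVal;                        // "#ANY" -> left as a String
      }
    }
  }

  public static void main(String[] args) {
    for (String s : new String[] {"42", "1.5", "#ANY"}) {
      System.out.println(s + " -> " + coerce(s).getClass().getSimpleName());
    }
  }
}
```

This matters for scenario metrics such as `freedisk=1.5`, which must arrive as a number rather than a string for the simulated disk-space check to work.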
@@ -812,11 +824,16 @@ public class SimScenario implements AutoCloseable {
      for (String key : params.getParameterNames()) {
        // try guessing if it's a number
        try {
-         Double d = Double.valueOf(params.get(key));
-         values.put(key, d);
+         Integer i = Integer.valueOf(params.get(key));
+         values.put(key, i);
        } catch (NumberFormatException nfe) {
-         // not a number
-         values.put(key, params.get(key));
+         try {
+           Double d = Double.valueOf(params.get(key));
+           values.put(key, d);
+         } catch (NumberFormatException nfe1) {
+           // not a number
+           values.put(key, params.get(key));
+         }
        }
      }
      values.forEach((k, v) -> {

@@ -42,7 +42,10 @@ import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParser;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
+import org.apache.solr.client.solrj.routing.RequestReplicaListTransformerGenerator;
+import org.apache.solr.cloud.ZkController;
import org.apache.solr.common.SolrException;
+import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.SolrParams;

@@ -162,10 +165,29 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware,
      return;
    }

+   final SolrCore core = req.getCore(); // explicit check for null core (temporary?, for tests)
+   ZkController zkController = core == null ? null : core.getCoreContainer().getZkController();
+   RequestReplicaListTransformerGenerator requestReplicaListTransformerGenerator;
+   if (zkController != null) {
+     requestReplicaListTransformerGenerator = new RequestReplicaListTransformerGenerator(
+         zkController.getZkStateReader().getClusterProperties()
+             .getOrDefault(ZkStateReader.DEFAULT_SHARD_PREFERENCES, "")
+             .toString(),
+         zkController.getNodeName(),
+         zkController.getBaseUrl(),
+         zkController.getSysPropsCacher()
+     );
+   } else {
+     requestReplicaListTransformerGenerator = new RequestReplicaListTransformerGenerator();
+   }
+
    int worker = params.getInt("workerID", 0);
    int numWorkers = params.getInt("numWorkers", 1);
    boolean local = params.getBool("streamLocalOnly", false);
    StreamContext context = new StreamContext();
+   context.setRequestParams(params);
+   context.setRequestReplicaListTransformerGenerator(requestReplicaListTransformerGenerator);
    context.put("shards", getCollectionShards(params));
    context.workerID = worker;
    context.numWorkers = numWorkers;

@@ -27,6 +27,7 @@ import org.apache.lucene.document.LatLonDocValuesField;
import org.apache.lucene.document.LatLonPoint;
import org.apache.lucene.geo.GeoEncodingUtils;
import org.apache.lucene.index.DocValues;
+import org.apache.lucene.index.IndexableField;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.queries.function.ValueSource;
import org.apache.lucene.search.DoubleValues;

@@ -76,7 +77,15 @@ public class LatLonPointSpatialField extends AbstractSpatialFieldType implements
    SchemaField schemaField = schema.getField(fieldName); // TODO change AbstractSpatialFieldType so we get schemaField?
    return new LatLonPointSpatialStrategy(ctx, fieldName, schemaField.indexed(), schemaField.hasDocValues());
  }

+ @Override
+ public String toExternal(IndexableField f) {
+   if (f.numericValue() != null) {
+     return decodeDocValueToString(f.numericValue().longValue());
+   }
+   return super.toExternal(f);
+ }
+
  /**
   * Decodes the docValues number into latitude and longitude components, formatting as "lat,lon".
   * The encoding is governed by {@code LatLonDocValuesField}. The decimal output representation is reflective

@@ -54,6 +54,10 @@
  <field name="signatureField" type="string" indexed="true" stored="false"/>
  <dynamicField name="*_sS" type="string" indexed="true" stored="true" multiValued="true"/>

+ <!-- Fields required for LatLonDocValuesField test -->
+ <fieldType name="latlon" class="solr.LatLonPointSpatialField" docValues="true"/>
+ <field name="latlon" type="latlon" indexed="true" stored="false"/>
+
  <uniqueKey>id</uniqueKey>

</schema>

@@ -138,4 +138,29 @@ public class TestSimScenario extends SimSolrCloudTestCase {
      scenario.run();
    }
  }

+ String splitShardScenario =
+     "create_cluster numNodes=2\n" +
+     "solr_request /admin/collections?action=CREATE&name=testCollection&numShards=2&replicationFactor=2&maxShardsPerNode=5\n" +
+     "wait_collection collection=testCollection&shards=2&replicas=2\n" +
+     "set_shard_metrics collection=testCollection&shard=shard1&INDEX.sizeInBytes=1000000000\n" +
+     "set_node_metrics nodeset=#ANY&freedisk=1.5\n" +
+     "solr_request /admin/collection?action=SPLITSHARD&collection=testCollection&shard=shard1&splitMethod=${method}\n" +
+     "wait_collection collection=testCollection&shards=4&&withInactive=true&replicas=2&requireLeaders=true\n"
+     ;
+ @Test
+ public void testSplitShard() throws Exception {
+   try (SimScenario scenario = SimScenario.load(splitShardScenario)) {
+     scenario.context.put("method", "REWRITE");
+     scenario.run();
+   } catch (Exception e) {
+     assertTrue(e.toString(), e.toString().contains("not enough free disk"));
+   }
+   try (SimScenario scenario = SimScenario.load(splitShardScenario)) {
+     scenario.context.put("method", "LINK");
+     scenario.run();
+   } catch (Exception e) {
+     fail("should have succeeded with method LINK, but failed: " + e.toString());
+   }
+ }
}

@@ -192,7 +192,7 @@ public class SolrCoreTest extends SolrTestCaseJ4 {
    final CoreContainer cores = h.getCoreContainer();
    for (int i = 0; i < MT; ++i) {
      Callable<Integer> call = new Callable<Integer>() {
-       void yield(int n) {
+       void yieldInt(int n) {
          try {
            Thread.sleep(0, (n % 13 + 1) * 10);
          } catch (InterruptedException xint) {

@@ -208,16 +208,16 @@ public class SolrCoreTest extends SolrTestCaseJ4 {
            r += 1;
            core = cores.getCore(SolrTestCaseJ4.DEFAULT_TEST_CORENAME);
            // sprinkle concurrency hinting...
-           yield(l);
+           yieldInt(l);
            assertTrue("Refcount < 1", core.getOpenCount() >= 1);
-           yield(l);
+           yieldInt(l);
            assertTrue("Refcount > 17", core.getOpenCount() <= 17);
-           yield(l);
+           yieldInt(l);
            assertTrue("Handler is closed", handler1.closed == false);
-           yield(l);
+           yieldInt(l);
            core.close();
            core = null;
-           yield(l);
+           yieldInt(l);
          }
          return r;
        } finally {

@@ -653,7 +653,9 @@ public class NestedAtomicUpdateTest extends SolrTestCaseJ4 {
  }

  private void testBlockAtomicSetToNullOrEmpty(boolean empty) throws Exception {
-   SolrInputDocument doc = sdoc("id", "1",
+   // latlon field is included to ensure reading from LatLonDocValuesField is working due to atomic update.
+   // See SOLR-13966 for further details.
+   SolrInputDocument doc = sdoc("id", "1", "latlon", "0,0",
        "cat_ss", new String[] {"aaa", "ccc"},
        "child1", sdocs(sdoc("id", "2", "cat_ss", "child"), sdoc("id", "3", "cat_ss", "child")));
    assertU(adoc(doc));

@@ -679,32 +681,35 @@

    assertJQ(req("q", "id:1"), "/response/numFound==1");

-   assertJQ(req("qt", "/get", "id", "1", "fl", "id, cat_ss, child1, [child]"), "=={\"doc\":{'id':\"1\"" +
-       ", cat_ss:[\"aaa\",\"ccc\"], child1:[{\"id\":\"2\",\"cat_ss\":[\"child\"]}, {\"id\":\"3\",\"cat_ss\":[\"child\"]}]}}");
+   assertJQ(req("qt", "/get", "id", "1", "fl", "id, latlon, cat_ss, child1, [child]"),
+       "=={\"doc\":{'id':\"1\", \"latlon\":\"0,0\"" +
+       ", cat_ss:[\"aaa\",\"ccc\"], child1:[{\"id\":\"2\",\"cat_ss\":[\"child\"]}, {\"id\":\"3\",\"cat_ss\":[\"child\"]}]}}");

    assertU(commit());

-   assertJQ(req("qt", "/get", "id", "1", "fl", "id, cat_ss, child1, [child]"), "=={\"doc\":{'id':\"1\"" +
-       ", cat_ss:[\"aaa\",\"ccc\"], child1:[{\"id\":\"2\",\"cat_ss\":[\"child\"]}, {\"id\":\"3\",\"cat_ss\":[\"child\"]}]}}");
+   assertJQ(req("qt", "/get", "id", "1", "fl", "id, latlon, cat_ss, child1, [child]"),
+       "=={\"doc\":{'id':\"1\", \"latlon\":\"0,0\"" +
+       ", cat_ss:[\"aaa\",\"ccc\"], child1:[{\"id\":\"2\",\"cat_ss\":[\"child\"]}, {\"id\":\"3\",\"cat_ss\":[\"child\"]}]}}");

-   doc = sdoc("id", "1", "child1", Collections.singletonMap("set", empty ? new ArrayList<>() : null));
+   doc = sdoc("id", "1", "child1", Collections.singletonMap("set", null));
    addAndGetVersion(doc, params("wt", "json"));

-   assertJQ(req("qt", "/get", "id", "1", "fl", "id, cat_ss, child1, [child]"),
-       "=={\"doc\":{'id':\"1\", cat_ss:[\"aaa\",\"ccc\"]}}");
+   assertJQ(req("qt", "/get", "id", "1", "fl", "id, latlon, cat_ss, child1, [child]"),
+       "=={\"doc\":{'id':\"1\", \"latlon\":\"0,0\", cat_ss:[\"aaa\",\"ccc\"]}}");

    assertU(commit());

    // a cut-n-paste of the first big query, but this time it will be retrieved from the index rather than the
    // transaction log
    // this requires ChildDocTransformer to get the whole block, since the document is retrieved using an index lookup
-   assertJQ(req("qt", "/get", "id", "1", "fl", "id, cat_ss, child1, [child]"),
-       "=={'doc':{'id':'1', cat_ss:[\"aaa\",\"ccc\"]}}");
+   assertJQ(req("qt", "/get", "id", "1", "fl", "id, latlon, cat_ss, child1, [child]"),
+       "=={\"doc\":{'id':\"1\", \"latlon\":\"0,0\", cat_ss:[\"aaa\",\"ccc\"]}}");

+   // ensure the whole block has been committed correctly to the index.
+   assertJQ(req("q", "id:1", "fl", "*, [child]"),
+       "/response/numFound==1",
+       "/response/docs/[0]/id=='1'",
+       "/response/docs/[0]/latlon=='0,0'",
+       "/response/docs/[0]/cat_ss/[0]==\"aaa\"",
+       "/response/docs/[0]/cat_ss/[1]==\"ccc\"");
  }

@@ -95,19 +95,7 @@
        <New class="org.eclipse.jetty.rewrite.handler.HeaderPatternRule">
          <Set name="pattern">*</Set>
          <Set name="name">Content-Security-Policy</Set>
-         <Set name="value">
-           default-src 'none';
-           base-uri 'none';
-           connect-src 'self';
-           form-action 'self';
-           font-src 'self';
-           frame-ancestors 'none';
-           img-src 'self';
-           media-src 'self';
-           style-src 'self' 'unsafe-inline';
-           script-src 'self';
-           worker-src 'self';
-         </Set>
+         <Set name="value">default-src 'none'; base-uri 'none'; connect-src 'self'; form-action 'self'; font-src 'self'; frame-ancestors 'none'; img-src 'self'; media-src 'self'; style-src 'self' 'unsafe-inline'; script-src 'self'; worker-src 'self';</Set>
        </New>
      </Arg>
    </Call>

@@ -160,7 +160,7 @@ Solr allows you to pass an optional string parameter named `shards.preference` t

The syntax is: `shards.preference=_property_:__value__`. The order of the properties and the values is significant: the first one is the primary sort, the second is secondary, etc.

-IMPORTANT: `shards.preference` only works for distributed queries, i.e., queries targeting multiple shards. Single shard scenarios are not supported.
+IMPORTANT: `shards.preference` is supported for single shard scenarios when using the SolrJ clients.

The properties that can be specified are as follows:

@@ -204,24 +204,38 @@ A list of queries that *must* appear in matching documents. However, unlike `mus

== Boost Query Parser

-`BoostQParser` extends the `QParserPlugin` and creates a boosted query from the input value. The main value is the query to be boosted. Parameter `b` is the function query to use as the boost. The query to be boosted may be of any type.
+`BoostQParser` extends the `QParserPlugin` and creates a boosted query from the input value. The main value is any query to be "wrapped" and "boosted" -- only documents which match that query will match the final query produced by this parser. Parameter `b` is a <<function-queries.adoc#available-functions,function>> to be evaluated against each document that matches the original query, and the result of the function will be multiplied into the final score for that document.

=== Boost Query Parser Examples

-Creates a query "foo" which is boosted (scores are multiplied) by the function query `log(popularity)`:
+Creates a query `name:foo` which is boosted (scores are multiplied) by the function query `log(popularity)`:

[source,text]
----
-{!boost b=log(popularity)}foo
+q={!boost b=log(popularity)}name:foo
----

-Creates a query "foo" which is boosted by the date boosting function referenced in `ReciprocalFloatFunction`:
+Creates a query `name:foo` which has its scores multiplied by the _inverse_ of the numeric `price` field -- effectively "demoting" documents which have a high `price` by lowering their final score:

[source,text]
----
-{!boost b=recip(ms(NOW,mydatefield),3.16e-11,1,1)}foo
+// NOTE: we "add 1" to the denominator to prevent divide by zero
+q={!boost b=div(1,add(1,price))}name:foo
----

+The `<<function-queries.adoc#query-function,query(...)>>` function is particularly useful for situations where you want to multiply (or divide) the score for each document matching your main query by the score that document would have from another query.
+
+This example uses <<local-parameters-in-queries.adoc#parameter-dereferencing,local parameter variables>> to create a query for `name:foo` which is boosted by the scores from the independently specified query `category:electronics`:
+
+[source,text]
+----
+q={!boost b=query($my_boost)}name:foo
+my_boost=category:electronics
+----

[[other-collapsing]]
== Collapsing Query Parser

@@ -27,7 +27,7 @@ Included are parameters for defining if it should handle `/select` urls (for Sol
`handleSelect` is for legacy back-compatibility; those new to Solr do not need to change anything about the way this is configured by default.
====

-The first configurable item is the `handleSelect` attribute on the `<requestDispatcher>` element itself. This attribute can be set to one of two values, either "true" or "false". It governs how Solr responds to requests such as `/select?qt=XXX`. The default value "false" will ignore requests to `/select` if a requestHandler is not explicitly registered with the name `/select`. A value of "true" will route query requests to the parser defined with the `qt` value.
+The first configurable item is the `handleSelect` attribute on the `<requestDispatcher>` element itself. This attribute can be set to one of two values, either "true" or "false". It governs how Solr responds to requests such as `/select?qt=XXX`. The default value "false" will ignore requests to `/select` if a requestHandler is not explicitly registered with the name `/select`. A value of "true" will route query requests to the parser defined with the `qt` value if a requestHandler is not explicitly registered with the name `/select`.

In recent versions of Solr, a `/select` requestHandler is defined by default, so a value of "false" will work fine. See the section <<requesthandlers-and-searchcomponents-in-solrconfig.adoc#requesthandlers-and-searchcomponents-in-solrconfig,RequestHandlers and SearchComponents in SolrConfig>> for more information.

@@ -123,6 +123,15 @@ for the entire expression, it may be faster for the client to send the expressio
`&streamLocalOnly=true` and handle merging of the results (if required) locally. This is an advanced option, relying
on a convenient organization of the index, and should only be considered if normal usage poses a performance issue.

+=== Request Routing
+
+Streaming Expressions respect the <<distributed-requests.adoc#shards-preference-parameter,shards.preference parameter>> for any call to Solr.
+
+The value of `shards.preference` that is used to route requests is determined in the following order. The first option available is used.
+
+- Provided as a parameter in the streaming expression (e.g. `search(...., shards.preference="replica.type:PULL")`)
+- Provided in the URL Params of the streaming expression (e.g. `http://solr_url:8983/solr/stream?expr=....&shards.preference=replica.type:PULL`)
+- Set as a default in the Cluster properties.
+
=== Adding Custom Expressions

Creating your own custom expressions can be easily done by implementing the {solr-javadocs}/solr-solrj/org/apache/solr/client/solrj/io/stream/expr/Expressible.html[Expressible] interface. To add a custom expression to the

@@ -132,7 +141,6 @@ list of known mappings for the `/stream` handler, you just need to declare it as
<expressible name="custom" class="org.example.CustomStreamingExpression"/>


== Types of Streaming Expressions

=== About Stream Sources

@@ -115,35 +115,68 @@ A value of "0.0" - the default - makes the query a pure "disjunction max query":

=== bq (Boost Query) Parameter

-The `bq` parameter specifies an additional, optional, query clause that will be added to the user's main query to influence the score. For example, if you wanted to add a relevancy boost for recent documents:
+The `bq` parameter specifies an additional, optional, query clause that will be _added_ to the user's main query as optional clauses that will influence the score. For example, if you wanted to add a boost for documents that are in a particular category you could use:

[source,text]
----
q=cheese
-bq=date:[NOW/DAY-1YEAR TO NOW/DAY]
+bq=category:food^10
----

-You can specify multiple `bq` parameters. If you want your query to be parsed as separate clauses with separate boosts, use multiple `bq` parameters.
+You can specify multiple `bq` parameters, which will each be added as separate clauses with separate boosts.
+
+[source,text]
+----
+q=cheese
+bq=category:food^10
+bq=category:deli^5
+----
+
+Using the `bq` parameter in this way is functionally equivalent to combining your `q` and `bq` params into a single larger boolean query, where the (original) `q` param is "mandatory" and the other clauses are optional:
+
+[source,text]
+----
+q=(+cheese category:food^10 category:deli^5)
+----
+
+The only difference between the above examples is that using the `bq` param allows you to specify these extra clauses independently (i.e., as configuration defaults) from the main query.
+
+[TIP]
+[[bq-bf-shortcomings]]
+.Additive Boosts vs Multiplicative Boosts
+====
+Generally speaking, using `bq` (or `bf`, below) is considered a poor way to "boost" documents by a secondary query because it has an "Additive" effect on the final score. The overall impact a particular `bq` param will have on a given document can vary a lot depending on the _absolute_ values of the scores from the original query as well as the `bq` query, which in turn depends on the complexity of the original query, and various scoring factors (TF, IDF, average field length, etc.)
+
+"Multiplicative Boosting" is generally considered to be a more predictable method of influencing document score, because it acts as a "scaling factor" -- increasing (or decreasing) the scores of each document by a _relative_ amount.
+
+The <<other-parsers.adoc#boost-query-parser,`{!boost}` QParser>> provides a convenient wrapper for implementing multiplicative boosting, and the <<the-extended-dismax-query-parser.adoc#extended-dismax-parameters,`{!edismax}` QParser>> offers a `boost` query param short cut for using it.
+====
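To make the additive-vs-multiplicative contrast in the new TIP concrete, here is a small standalone sketch with invented scores (not from the guide; the class name is illustrative):

```java
// Two documents with invented main-query scores, plus a boost factor of 5.
public class BoostContrastSketch {
  public static void main(String[] args) {
    double[] mainScores = {0.2, 4.0};
    double boost = 5.0;
    for (double s : mainScores) {
      System.out.printf("main=%.1f  additive=%.1f  multiplicative=%.1f%n",
          s, s + boost, s * boost);
    }
    // additive: 5.2 vs 9.0 -> the weak match nearly catches up (the addend dominates)
    // multiplicative: 1.0 vs 20.0 -> the relative gap between documents is preserved
  }
}
```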

=== bf (Boost Functions) Parameter

-The `bf` parameter specifies functions (with optional boosts) that will be used to construct FunctionQueries which will be added to the user's main query as optional clauses that will influence the score. Any function supported natively by Solr can be used, along with a boost value. For example:
+The `bf` parameter specifies functions (with optional <<the-standard-query-parser.adoc#boosting-a-term-with,query boost>>) that will be used to construct FunctionQueries which will be added to the user's main query as optional clauses that will influence the score. Any <<function-queries.adoc#available-functions,function supported natively by Solr>> can be used, along with a boost value. For example:

[source,text]
----
-recip(rord(myfield),1,2,3)^1.5
+q=cheese
+bf=div(1,sum(1,price))^1.5
----

-Specifying functions with the bf parameter is essentially just shorthand for using the `bq` parameter combined with the `{!func}` parser.
+Specifying functions with the bf parameter is essentially just shorthand for using the `bq` parameter (<<#bq-bf-shortcomings,with the same shortcomings>>) combined with the `{!func}` parser -- with the addition of the simplified "query boost" syntax.

-For example, if you want to show the most recent documents first, you could use either of the following:
+For example, the two `bf` params listed below are completely equivalent to the two `bq` params below:

[source,text]
----
-bf=recip(rord(creationDate),1,1000,1000)
-  ...or...
-bq={!func}recip(rord(creationDate),1,1000,1000)
+bf=div(sales_rank,ms(NOW,release_date))
+bf=div(1,sum(1,price))^1.5
----
+[source,text]
+----
+bq={!func}div(sales_rank,ms(NOW,release_date))
+bq={!lucene}( {!func v='div(1,sum(1,price))'} )^1.5
+----

@@ -27,7 +27,7 @@ In addition to supporting all the DisMax query parser parameters, Extended Disma
* includes improved smart partial escaping in the case of syntax errors; fielded queries, +/-, and phrase queries are still supported in this mode.
* improves proximity boosting by using word shingles; you do not need the query to match all words in the document before proximity boosting is applied.
* includes advanced stopword handling: stopwords are not required in the mandatory part of the query but are still used in the proximity boosting part. If a query consists of all stopwords, such as "to be or not to be", then all words are required.
-* includes improved boost function: in Extended DisMax, the `boost` function is a multiplier rather than an addend, improving your boost results; the additive boost functions of DisMax (`bf` and `bq`) are also supported.
+* includes improved boost function: in Extended DisMax, the `boost` function is a multiplier <<the-dismax-query-parser.adoc#bq-bf-shortcomings,rather than an addend>>, improving your boost results; the additive boost functions of DisMax (`bf` and `bq`) are also supported.
* supports pure negative nested queries: queries such as `+foo (-foo)` will match all documents.
* lets you specify which fields the end user is allowed to query, and to disallow direct fielded searches.

@@ -52,7 +52,20 @@ If `true`, the number of clauses required (<<the-dismax-query-parser.adoc#mm-min
Note that relaxing `mm` may cause undesired side effects, such as hurting the precision of the search, depending on the nature of your index content.

`boost`::
-A multivalued list of strings parsed as queries with scores multiplied by the score from the main query for all matching documents. This parameter is shorthand for wrapping the query produced by eDisMax using the `BoostQParserPlugin`.
+A multivalued list of strings parsed as <<function-queries.adoc#available-functions,functions>> whose results will be multiplied into the score from the main query for all matching documents. This parameter is shorthand for wrapping the query produced by eDisMax using the <<other-parsers.adoc#boost-query-parser,`BoostQParserPlugin`>>.
+
+These two examples are equivalent:
+[source,text]
+----
+q={!edismax qf=name}ipod
+boost=div(1,sum(1,price))
+----
+[source,text]
+----
+q={!boost b=div(1,sum(1,price)) v=$qq}
+qq={!edismax qf=name}ipod
+----

`lowercaseOperators`::
A Boolean parameter indicating if lowercase "and" and "or" should be treated the same as operators "AND" and "OR".

@@ -150,11 +163,6 @@ qf=title text last_name first_name
f.name.qf=last_name first_name
----

-== Using Negative Boost
-
-Negative query boosts have been supported at the "Query" object level for a long time (resulting in negative scores for matching documents). Now the QueryParsers have been updated to handle this too.
-
-
== Using 'Slop'

`Dismax` and `Edismax` can run queries against all query fields, and also run a query in the form of a phrase against the phrase fields. (This will work only for boosting documents, not actually for matching.) However, that phrase query can have a 'slop,' which is the distance between the terms of the query while still considering it a phrase match. For example:

@@ -120,6 +120,14 @@ include::{example-source-dir}UsingSolrJRefGuideExamplesTest.java[tag=solrj-solrc

When these values are not explicitly provided, SolrJ falls back to using the defaults for the OS/environment it is running on.

+=== Cloud Request Routing
+
+The SolrJ `CloudSolrClient` implementations (`CloudSolrClient` and `CloudHttp2SolrClient`) respect the <<distributed-requests.adoc#shards-preference-parameter,shards.preference parameter>>.
+Therefore requests sent to single-sharded collections, using either of the above clients, will route requests the same way that distributed requests are routed to individual shards.
+If no `shards.preference` parameter is provided, the clients will default to sorting replicas randomly.
+
+For update requests, while the replicas are sorted in the order defined by the request, leader replicas will always be sorted first.
+
== Querying in SolrJ
`SolrClient` has a number of `query()` methods for fetching results from Solr. Each of these methods takes in a `SolrParams`, an object encapsulating arbitrary query-parameters. And each method outputs a `QueryResponse`, a wrapper which can be used to access the result documents and other related metadata.

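As an illustration of the routing described in the new doc section, here is a hedged SolrJ sketch (the collection name, ZooKeeper address, and class name are placeholders; the builder signature is the SolrJ 8.x `List`/`Optional` form):

```java
import java.util.Collections;
import java.util.Optional;

import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.response.QueryResponse;

public class ShardsPreferenceSketch {
  public static void main(String[] args) throws Exception {
    // Placeholder ZooKeeper address; a real cluster address goes here.
    try (CloudSolrClient client = new CloudSolrClient.Builder(
        Collections.singletonList("localhost:9983"), Optional.empty()).build()) {
      SolrQuery query = new SolrQuery("*:*");
      // Prefer PULL replicas; after SOLR-12217 this also applies to
      // single-shard collections, not just distributed queries.
      query.set("shards.preference", "replica.type:PULL");
      QueryResponse rsp = client.query("myCollection", query);
      System.out.println("hits: " + rsp.getResults().getNumFound());
    }
  }
}
```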
@@ -44,6 +44,7 @@ import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Predicate;
import java.util.function.Supplier;
+import java.util.stream.Collectors;

import org.apache.solr.client.solrj.ResponseParser;
import org.apache.solr.client.solrj.SolrClient;

@@ -55,6 +56,8 @@ import org.apache.solr.client.solrj.request.IsUpdateRequest;
import org.apache.solr.client.solrj.request.RequestWriter;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.client.solrj.request.V2Request;
+import org.apache.solr.client.solrj.routing.ReplicaListTransformer;
+import org.apache.solr.client.solrj.routing.RequestReplicaListTransformerGenerator;
import org.apache.solr.client.solrj.util.ClientUtils;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrInputDocument;

@@ -69,7 +72,6 @@ import org.apache.solr.common.cloud.ImplicitDocRouter;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkCoreNodeProps;
-import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.ShardParams;

@@ -100,6 +102,7 @@ public abstract class BaseCloudSolrClient extends SolrClient {

  private final boolean updatesToLeaders;
  private final boolean directUpdatesToLeadersOnly;
+ private final RequestReplicaListTransformerGenerator requestRLTGenerator;
  boolean parallelUpdates; //TODO final
  private ExecutorService threadPool = ExecutorUtil
      .newMDCAwareCachedThreadPool(new SolrjNamedThreadFactory(

@@ -221,6 +224,7 @@ public abstract class BaseCloudSolrClient extends SolrClient {
    this.updatesToLeaders = updatesToLeaders;
    this.parallelUpdates = parallelUpdates;
    this.directUpdatesToLeadersOnly = directUpdatesToLeadersOnly;
+   this.requestRLTGenerator = new RequestReplicaListTransformerGenerator();
  }

  /** Sets the cache ttl for DocCollection Objects cached . This is only applicable for collections which are persisted outside of clusterstate.json

@@ -467,6 +471,8 @@ public abstract class BaseCloudSolrClient extends SolrClient {
      for(String param : NON_ROUTABLE_PARAMS) {
        routableParams.remove(param);
      }
+   } else {
+     params = new ModifiableSolrParams();
    }

    if (collection == null) {

@@ -492,10 +498,12 @@ public abstract class BaseCloudSolrClient extends SolrClient {
      return null;
    }

+   ReplicaListTransformer replicaListTransformer = requestRLTGenerator.getReplicaListTransformer(params);
+
    //Create the URL map, which is keyed on slice name.
    //The value is a list of URLs for each replica in the slice.
    //The first value in the list is the leader for the slice.
-   final Map<String,List<String>> urlMap = buildUrlMap(col);
+   final Map<String,List<String>> urlMap = buildUrlMap(col, replicaListTransformer);
    final Map<String, ? extends LBSolrClient.Req> routes = createRoutes(updateRequest, routableParams, col, router, urlMap, idField);
    if (routes == null) {
      if (directUpdatesToLeadersOnly && hasInfoToFindLeaders(updateRequest, idField)) {

@@ -616,12 +624,12 @@ public abstract class BaseCloudSolrClient extends SolrClient {
    return urlMap == null ? null : updateRequest.getRoutesToCollection(router, col, urlMap, routableParams, idField);
  }

- private Map<String,List<String>> buildUrlMap(DocCollection col) {
+ private Map<String,List<String>> buildUrlMap(DocCollection col, ReplicaListTransformer replicaListTransformer) {
    Map<String, List<String>> urlMap = new HashMap<>();
    Slice[] slices = col.getActiveSlicesArr();
    for (Slice slice : slices) {
      String name = slice.getName();
-     List<String> urls = new ArrayList<>();
+     List<Replica> sortedReplicas = new ArrayList<>();
      Replica leader = slice.getLeader();
      if (directUpdatesToLeadersOnly && leader == null) {
        for (Replica replica : slice.getReplicas(

@@ -638,20 +646,22 @@ public abstract class BaseCloudSolrClient extends SolrClient {
        // take unoptimized general path - we cannot find a leader yet
        return null;
      }
-     ZkCoreNodeProps zkProps = new ZkCoreNodeProps(leader);
-     String url = zkProps.getCoreUrl();
-     urls.add(url);

      if (!directUpdatesToLeadersOnly) {
        for (Replica replica : slice.getReplicas()) {
-         if (!replica.getNodeName().equals(leader.getNodeName()) &&
-             !replica.getName().equals(leader.getName())) {
-           ZkCoreNodeProps zkProps1 = new ZkCoreNodeProps(replica);
-           String url1 = zkProps1.getCoreUrl();
-           urls.add(url1);
+         if (!replica.equals(leader)) {
+           sortedReplicas.add(replica);
          }
        }
      }
-     urlMap.put(name, urls);
+
+     // Sort the non-leader replicas according to the request parameters
+     replicaListTransformer.transform(sortedReplicas);
+
+     // put the leaderUrl first.
+     sortedReplicas.add(0, leader);
+
+     urlMap.put(name, sortedReplicas.stream().map(Replica::getCoreUrl).collect(Collectors.toList()));
    }
    return urlMap;
  }

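The ordering rule above can be distilled into a hedged standalone sketch (plain strings stand in for `Replica` objects, and alphabetical order stands in for the request's `ReplicaListTransformer`):

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class LeaderFirstSketch {
  public static void main(String[] args) {
    String leader = "http://node1/coll_shard1_replica_n1";
    List<String> nonLeaders = new ArrayList<>(List.of(
        "http://node3/coll_shard1_replica_n3",
        "http://node2/coll_shard1_replica_n2"));
    nonLeaders.sort(Comparator.naturalOrder()); // stand-in for the transformer
    nonLeaders.add(0, leader);                  // leader is always first for updates
    System.out.println(nonLeaders);
  }
}
```

The design point is that only the non-leader tail of each slice's URL list is reordered by `shards.preference`; update traffic still reaches the leader first.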
@@ -1046,6 +1056,8 @@ public abstract class BaseCloudSolrClient extends SolrClient {
      reqParams = new ModifiableSolrParams();
    }

+   ReplicaListTransformer replicaListTransformer = requestRLTGenerator.getReplicaListTransformer(reqParams);
+
    final Set<String> liveNodes = getClusterStateProvider().getLiveNodes();

    final List<String> theUrlList = new ArrayList<>(); // we populate this as follows...

@@ -1087,34 +1099,38 @@ public abstract class BaseCloudSolrClient extends SolrClient {
      }

      // Gather URLs, grouped by leader or replica
      // TODO: allow filtering by group, role, etc
-     Set<String> seenNodes = new HashSet<>();
-     List<String> replicas = new ArrayList<>();
-     String joinedInputCollections = StrUtils.join(inputCollections, ',');
+     List<Replica> sortedReplicas = new ArrayList<>();
+     List<Replica> replicas = new ArrayList<>();
      for (Slice slice : slices.values()) {
-       for (ZkNodeProps nodeProps : slice.getReplicasMap().values()) {
-         ZkCoreNodeProps coreNodeProps = new ZkCoreNodeProps(nodeProps);
-         String node = coreNodeProps.getNodeName();
+       Replica leader = slice.getLeader();
+       for (Replica replica : slice.getReplicas()) {
+         String node = replica.getNodeName();
          if (!liveNodes.contains(node) // Must be a live node to continue
-             || Replica.State.getState(coreNodeProps.getState()) != Replica.State.ACTIVE) // Must be an ACTIVE replica to continue
+             || replica.getState() != Replica.State.ACTIVE) // Must be an ACTIVE replica to continue
            continue;
-         if (seenNodes.add(node)) { // if we haven't yet collected a URL to this node...
-           String url = ZkCoreNodeProps.getCoreUrl(nodeProps.getStr(ZkStateReader.BASE_URL_PROP), joinedInputCollections);
-           if (sendToLeaders && coreNodeProps.isLeader()) {
-             theUrlList.add(url); // put leaders here eagerly (if sendToLeader mode)
-           } else {
-             replicas.add(url); // replicas here
-           }
+         if (sendToLeaders && replica.equals(leader)) {
+           sortedReplicas.add(replica); // put leaders here eagerly (if sendToLeader mode)
+         } else {
+           replicas.add(replica); // replicas here
          }
        }
      }

-     // Shuffle the leaders, if any (none if !sendToLeaders)
-     Collections.shuffle(theUrlList, rand);
+     // Sort the leader replicas, if any, according to the request preferences (none if !sendToLeaders)
+     replicaListTransformer.transform(sortedReplicas);

-     // Shuffle the replicas, if any, and append to our list
-     Collections.shuffle(replicas, rand);
-     theUrlList.addAll(replicas);
+     // Sort the replicas, if any, according to the request preferences and append to our list
+     replicaListTransformer.transform(replicas);
+
+     sortedReplicas.addAll(replicas);
+
+     String joinedInputCollections = StrUtils.join(inputCollections, ',');
+     Set<String> seenNodes = new HashSet<>();
+     sortedReplicas.forEach( replica -> {
+       if (seenNodes.add(replica.getNodeName())) {
+         theUrlList.add(ZkCoreNodeProps.getCoreUrl(replica.getBaseUrl(), joinedInputCollections));
+       }
+     });

      if (theUrlList.isEmpty()) {
        collectionStateCache.keySet().removeAll(collectionNames);

@@ -371,12 +371,13 @@ public class CloudSolrStream extends TupleStream implements Expressible {
  protected void constructStreams() throws IOException {
    try {

-     List<String> shardUrls = getShards(this.zkHost, this.collection, this.streamContext);

      ModifiableSolrParams mParams = new ModifiableSolrParams(params);
      mParams = adjustParams(mParams);
      mParams.set(DISTRIB, "false"); // We are the aggregator.

+     List<String> shardUrls = getShards(this.zkHost, this.collection, this.streamContext, mParams);
+
      for(String shardUrl : shardUrls) {
        SolrStream solrStream = new SolrStream(shardUrl, mParams);
        if(streamContext != null) {

@@ -309,12 +309,12 @@ public class DeepRandomStream extends TupleStream implements Expressible {

  protected void constructStreams() throws IOException {
    try {

-     List<String> shardUrls = getShards(this.zkHost, this.collection, this.streamContext);

      ModifiableSolrParams mParams = new ModifiableSolrParams(params);
      mParams = adjustParams(mParams);
      mParams.set(DISTRIB, "false"); // We are the aggregator.

+     List<String> shardUrls = getShards(this.zkHost, this.collection, this.streamContext, mParams);

      String rows = mParams.get(ROWS);
      int r = Integer.parseInt(rows);
      int newRows = r/shardUrls.size();

@@ -24,6 +24,8 @@ import java.util.concurrent.ConcurrentMap;
import org.apache.solr.client.solrj.io.ModelCache;
import org.apache.solr.client.solrj.io.SolrClientCache;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
+import org.apache.solr.client.solrj.routing.RequestReplicaListTransformerGenerator;
+import org.apache.solr.common.params.SolrParams;

/**
 * The StreamContext is passed to TupleStreams using the TupleStream.setStreamContext() method.

@@ -46,6 +48,8 @@ public class StreamContext implements Serializable {
  private ModelCache modelCache;
  private StreamFactory streamFactory;
  private boolean local;
+ private SolrParams requestParams;
+ private RequestReplicaListTransformerGenerator requestReplicaListTransformerGenerator;

  public ConcurrentMap getObjectCache() {
    return this.objectCache;

@@ -110,4 +114,20 @@ public class StreamContext implements Serializable {
  public boolean isLocal() {
    return local;
  }

+ public void setRequestParams(SolrParams requestParams) {
+   this.requestParams = requestParams;
+ }
+
+ public SolrParams getRequestParams() {
+   return requestParams;
+ }
+
+ public void setRequestReplicaListTransformerGenerator(RequestReplicaListTransformerGenerator requestReplicaListTransformerGenerator) {
+   this.requestReplicaListTransformerGenerator = requestReplicaListTransformerGenerator;
+ }
+
+ public RequestReplicaListTransformerGenerator getRequestReplicaListTransformerGenerator() {
+   return requestReplicaListTransformerGenerator;
+ }
}

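A hedged usage sketch of the new accessors, mirroring how StreamHandler wires them above (the class name and the preference value are examples, not from the patch):

```java
import org.apache.solr.client.solrj.io.stream.StreamContext;
import org.apache.solr.client.solrj.routing.RequestReplicaListTransformerGenerator;
import org.apache.solr.common.params.ModifiableSolrParams;

public class StreamContextWiringSketch {
  public static void main(String[] args) {
    // Carry the request's params on the context so TupleStream.getShards(...)
    // can honor a shards.preference default when picking per-slice replicas.
    StreamContext context = new StreamContext();
    ModifiableSolrParams params = new ModifiableSolrParams();
    params.set("shards.preference", "replica.type:PULL"); // example preference
    context.setRequestParams(params);
    context.setRequestReplicaListTransformerGenerator(new RequestReplicaListTransformerGenerator());
  }
}
```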
@@ -21,26 +21,28 @@ import java.io.IOException;
import java.io.PrintWriter;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
-import java.util.Random;
+import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.Map;

import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.io.SolrClientCache;
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.comp.StreamComparator;
import org.apache.solr.client.solrj.io.stream.expr.Explanation;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
+import org.apache.solr.client.solrj.routing.ReplicaListTransformer;
+import org.apache.solr.client.solrj.routing.RequestReplicaListTransformerGenerator;
import org.apache.solr.common.IteratorWriter;
import org.apache.solr.common.MapWriter;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkCoreNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.params.SolrParams;


/**

@@ -118,6 +120,14 @@ public abstract class TupleStream implements Closeable, Serializable, MapWriter
                                       String collection,
                                       StreamContext streamContext)
      throws IOException {
+   return getShards(zkHost, collection, streamContext, new ModifiableSolrParams());
+ }
+
+ public static List<String> getShards(String zkHost,
+                                      String collection,
+                                      StreamContext streamContext,
+                                      SolrParams requestParams)
+     throws IOException {
    Map<String, List<String>> shardsMap = null;
    List<String> shards = new ArrayList();

@@ -130,24 +140,34 @@ public abstract class TupleStream implements Closeable, Serializable, MapWriter
      shards = shardsMap.get(collection);
    } else {
      //SolrCloud Sharding
-     CloudSolrClient cloudSolrClient = streamContext.getSolrClientCache().getCloudSolrClient(zkHost);
+     CloudSolrClient cloudSolrClient =
+         Optional.ofNullable(streamContext.getSolrClientCache()).orElseGet(SolrClientCache::new).getCloudSolrClient(zkHost);
      ZkStateReader zkStateReader = cloudSolrClient.getZkStateReader();
      ClusterState clusterState = zkStateReader.getClusterState();
      Slice[] slices = CloudSolrStream.getSlices(collection, zkStateReader, true);
      Set<String> liveNodes = clusterState.getLiveNodes();

+     ModifiableSolrParams solrParams = new ModifiableSolrParams(streamContext.getRequestParams());
+     solrParams.add(requestParams);
+
+     RequestReplicaListTransformerGenerator requestReplicaListTransformerGenerator =
+         Optional.ofNullable(streamContext.getRequestReplicaListTransformerGenerator()).orElseGet(RequestReplicaListTransformerGenerator::new);
+
+     ReplicaListTransformer replicaListTransformer = requestReplicaListTransformerGenerator.getReplicaListTransformer(solrParams);
+
      for(Slice slice : slices) {
-       Collection<Replica> replicas = slice.getReplicas();
-       List<Replica> shuffler = new ArrayList<>();
-       for(Replica replica : replicas) {
-         if(replica.getState() == Replica.State.ACTIVE && liveNodes.contains(replica.getNodeName()))
-           shuffler.add(replica);
+       List<Replica> sortedReplicas = new ArrayList<>();
+       for(Replica replica : slice.getReplicas()) {
+         if(replica.getState() == Replica.State.ACTIVE && liveNodes.contains(replica.getNodeName())) {
+           sortedReplicas.add(replica);
+         }
        }

-       Collections.shuffle(shuffler, new Random());
-       Replica rep = shuffler.get(0);
-       ZkCoreNodeProps zkProps = new ZkCoreNodeProps(rep);
-       String url = zkProps.getCoreUrl();
-       shards.add(url);
+       replicaListTransformer.transform(sortedReplicas);
+       if (sortedReplicas.size() > 0) {
+         shards.add(sortedReplicas.get(0).getCoreUrl());
+       }
      }
    }
    Object core = streamContext.get("core");

@@ -166,7 +166,7 @@ public class NodePreferenceRulesComparator implements Comparator<Object> {
      return false;
    }
    final String s = ((Replica)o).getType().toString();
-   return s.equals(preferred);
+   return s.equalsIgnoreCase(preferred);
  }

  public List<PreferenceRule> getSortRules() {

@@ -20,6 +20,7 @@ import java.lang.invoke.MethodHandles;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
+import java.util.Optional;
import java.util.Random;

import org.apache.solr.common.SolrException;

@@ -41,9 +42,13 @@ public class RequestReplicaListTransformerGenerator {
      (String configSpec, SolrParams requestParams, ReplicaListTransformerFactory fallback) -> shufflingReplicaListTransformer;
  private final ReplicaListTransformerFactory stableRltFactory;
  private final ReplicaListTransformerFactory defaultRltFactory;
+ private final String defaultShardPreferences;
+ private final String nodeName;
+ private final String localHostAddress;
+ private final NodesSysPropsCacher sysPropsCacher;

  public RequestReplicaListTransformerGenerator() {
-   this(RANDOM_RLTF);
+   this(null);
  }

  public RequestReplicaListTransformerGenerator(ReplicaListTransformerFactory defaultRltFactory) {

@@ -51,16 +56,24 @@ public class RequestReplicaListTransformerGenerator {
  }

  public RequestReplicaListTransformerGenerator(ReplicaListTransformerFactory defaultRltFactory, ReplicaListTransformerFactory stableRltFactory) {
-   this.defaultRltFactory = defaultRltFactory;
-   if (stableRltFactory == null) {
-     this.stableRltFactory = new AffinityReplicaListTransformerFactory();
-   } else {
-     this.stableRltFactory = stableRltFactory;
-   }
+   this(defaultRltFactory, stableRltFactory, null, null, null, null);
  }

+ public RequestReplicaListTransformerGenerator(String defaultShardPreferences, String nodeName, String localHostAddress, NodesSysPropsCacher sysPropsCacher) {
+   this(null, null, defaultShardPreferences, nodeName, localHostAddress, sysPropsCacher);
+ }
+
+ public RequestReplicaListTransformerGenerator(ReplicaListTransformerFactory defaultRltFactory, ReplicaListTransformerFactory stableRltFactory, String defaultShardPreferences, String nodeName, String localHostAddress, NodesSysPropsCacher sysPropsCacher) {
+   this.defaultRltFactory = Optional.ofNullable(defaultRltFactory).orElse(RANDOM_RLTF);
+   this.stableRltFactory = Optional.ofNullable(stableRltFactory).orElseGet(AffinityReplicaListTransformerFactory::new);
+   this.defaultShardPreferences = Optional.ofNullable(defaultShardPreferences).orElse("");
+   this.nodeName = nodeName;
+   this.localHostAddress = localHostAddress;
+   this.sysPropsCacher = sysPropsCacher;
+ }
+
  public ReplicaListTransformer getReplicaListTransformer(final SolrParams requestParams) {
-   return getReplicaListTransformer(requestParams, "");
+   return getReplicaListTransformer(requestParams, null);
  }

  public ReplicaListTransformer getReplicaListTransformer(final SolrParams requestParams, String defaultShardPreferences) {

@@ -70,6 +83,7 @@ public class RequestReplicaListTransformerGenerator {
  public ReplicaListTransformer getReplicaListTransformer(final SolrParams requestParams, String defaultShardPreferences, String nodeName, String localHostAddress, NodesSysPropsCacher sysPropsCacher) {
    @SuppressWarnings("deprecation")
    final boolean preferLocalShards = requestParams.getBool(CommonParams.PREFER_LOCAL_SHARDS, false);
+   defaultShardPreferences = Optional.ofNullable(defaultShardPreferences).orElse(this.defaultShardPreferences);
    final String shardsPreferenceSpec = requestParams.get(ShardParams.SHARDS_PREFERENCE, defaultShardPreferences);

    if (preferLocalShards || !shardsPreferenceSpec.isEmpty()) {

@@ -84,7 +98,15 @@ public class RequestReplicaListTransformerGenerator {
        preferenceRules.add(new PreferenceRule(ShardParams.SHARDS_PREFERENCE_REPLICA_LOCATION, ShardParams.REPLICA_LOCAL));
      }

-     NodePreferenceRulesComparator replicaComp = new NodePreferenceRulesComparator(preferenceRules, requestParams, nodeName, localHostAddress, sysPropsCacher, defaultRltFactory, stableRltFactory);
+     NodePreferenceRulesComparator replicaComp =
+         new NodePreferenceRulesComparator(
+             preferenceRules,
+             requestParams,
+             Optional.ofNullable(nodeName).orElse(this.nodeName),
+             Optional.ofNullable(localHostAddress).orElse(this.localHostAddress),
+             Optional.ofNullable(sysPropsCacher).orElse(this.sysPropsCacher),
+             defaultRltFactory,
+             stableRltFactory);
      ReplicaListTransformer baseReplicaListTransformer = replicaComp.getBaseReplicaListTransformer();
      if (replicaComp.getSortRules() == null) {
        // only applying base transformation

@@ -102,7 +102,7 @@ public class Replica extends ZkNodeProps {
    PULL;

    public static Type get(String name){
-     return name == null ? Type.NRT : Type.valueOf(name);
+     return name == null ? Type.NRT : Type.valueOf(name.toUpperCase(Locale.ROOT));
    }
  }

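Together with the `equalsIgnoreCase` change in NodePreferenceRulesComparator above, this makes replica-type handling case-insensitive end to end. A small standalone sketch of the lookup (the enum here is a stand-in for `Replica.Type`):

```java
import java.util.Locale;

public class TypeLookupSketch {
  enum Type {
    NRT, TLOG, PULL;

    static Type get(String name) {
      // null defaults to NRT; otherwise match regardless of case
      return name == null ? NRT : valueOf(name.toUpperCase(Locale.ROOT));
    }
  }

  public static void main(String[] args) {
    System.out.println(Type.get("pull")); // PULL (plain valueOf("pull") would throw)
    System.out.println(Type.get(null));   // NRT
  }
}
```

This lets request parameters like `shards.preference=replica.type:pull` work without requiring the exact upper-case enum spelling.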
@@ -27,6 +27,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
+import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

@@ -488,6 +489,74 @@ public class CloudHttp2SolrClientTest extends SolrCloudTestCase {
        shardAddresses.size() > 1 && ports.size()==1);
  }

  /**
   * Tests if the 'shards.preference' parameter works with single-sharded collections.
   */
  @Test
  public void singleShardedPreferenceRules() throws Exception {
    String collectionName = "singleShardPreferenceTestColl";

    int liveNodes = cluster.getJettySolrRunners().size();

    // For testing replica.type, we want to have all replica types available for the collection
    CollectionAdminRequest.createCollection(collectionName, "conf", 1, liveNodes/3, liveNodes/3, liveNodes/3)
        .setMaxShardsPerNode(liveNodes)
        .processAndWait(cluster.getSolrClient(), TIMEOUT);
    cluster.waitForActiveCollection(collectionName, 1, liveNodes);

    // Add some new documents
    new UpdateRequest()
        .add(id, "0", "a_t", "hello1")
        .add(id, "2", "a_t", "hello2")
        .add(id, "3", "a_t", "hello2")
        .commit(getRandomClient(), collectionName);

    // Run the actual test for 'queryReplicaType'
    queryReplicaType(getRandomClient(), Replica.Type.PULL, collectionName);
    queryReplicaType(getRandomClient(), Replica.Type.TLOG, collectionName);
    queryReplicaType(getRandomClient(), Replica.Type.NRT, collectionName);
  }

  private void queryReplicaType(CloudHttp2SolrClient cloudClient,
                                Replica.Type typeToQuery,
                                String collectionName)
      throws Exception {
    SolrQuery qRequest = new SolrQuery("*:*");

    ModifiableSolrParams qParams = new ModifiableSolrParams();
    qParams.add(ShardParams.SHARDS_PREFERENCE, ShardParams.SHARDS_PREFERENCE_REPLICA_TYPE + ":" + typeToQuery.toString());
    qParams.add(ShardParams.SHARDS_INFO, "true");
    qRequest.add(qParams);

    Map<String, String> replicaTypeToReplicas = mapReplicasToReplicaType(getCollectionState(collectionName));

    QueryResponse qResponse = cloudClient.query(collectionName, qRequest);

    Object shardsInfo = qResponse.getResponse().get(ShardParams.SHARDS_INFO);
    assertNotNull("Unable to obtain "+ShardParams.SHARDS_INFO, shardsInfo);

    // Iterate over shards-info and check what cores responded
    SimpleOrderedMap<?> shardsInfoMap = (SimpleOrderedMap<?>)shardsInfo;
    Iterator<Map.Entry<String, ?>> itr = shardsInfoMap.asMap(100).entrySet().iterator();
    List<String> shardAddresses = new ArrayList<String>();
    while (itr.hasNext()) {
      Map.Entry<String, ?> e = itr.next();
      assertTrue("Did not find map-type value in "+ShardParams.SHARDS_INFO, e.getValue() instanceof Map);
      String shardAddress = (String)((Map)e.getValue()).get("shardAddress");
      assertNotNull(ShardParams.SHARDS_INFO+" did not return 'shardAddress' parameter", shardAddress);
      if (shardAddress.endsWith("/")) {
        shardAddress = shardAddress.substring(0, shardAddress.length() - 1);
      }
      shardAddresses.add(shardAddress);
    }
    assertEquals("Shard addresses must be of size 1, since there is only 1 shard in the collection", 1, shardAddresses.size());

    assertEquals("Make sure that the replica queried was the replicaType desired", typeToQuery.toString().toUpperCase(Locale.ROOT), replicaTypeToReplicas.get(shardAddresses.get(0)).toUpperCase(Locale.ROOT));
  }

  private Long getNumRequests(String baseUrl, String collectionName) throws
      SolrServerException, IOException {
    return getNumRequests(baseUrl, collectionName, "QUERY", "/select", null, false);

@@ -27,6 +27,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

@@ -427,8 +428,8 @@ public class CloudSolrClientTest extends SolrCloudTestCase {

  @SuppressWarnings("deprecation")
  private void queryWithShardsPreferenceRules(CloudSolrClient cloudClient,
                                              boolean useShardsPreference,
                                              String collectionName)
      throws Exception {
    SolrQuery qRequest = new SolrQuery("*:*");

@@ -476,6 +477,72 @@ public class CloudSolrClientTest extends SolrCloudTestCase {
        shardAddresses.size() > 1 && ports.size()==1);
  }

  /**
   * Tests if the 'shards.preference' parameter works with single-sharded collections.
   */
  @Test
  public void singleShardedPreferenceRules() throws Exception {
    String collectionName = "singleShardPreferenceTestColl";

    int liveNodes = cluster.getJettySolrRunners().size();

    // For testing replica.type, we want to have all replica types available for the collection
    CollectionAdminRequest.createCollection(collectionName, "conf", 1, liveNodes/3, liveNodes/3, liveNodes/3)
        .setMaxShardsPerNode(liveNodes)
        .processAndWait(cluster.getSolrClient(), TIMEOUT);
    cluster.waitForActiveCollection(collectionName, 1, liveNodes);

    // Add some new documents
    new UpdateRequest()
        .add(id, "0", "a_t", "hello1")
        .add(id, "2", "a_t", "hello2")
        .add(id, "3", "a_t", "hello2")
        .commit(getRandomClient(), collectionName);

    // Run the actual test for 'queryReplicaType'
    queryReplicaType(getRandomClient(), Replica.Type.PULL, collectionName);
    queryReplicaType(getRandomClient(), Replica.Type.TLOG, collectionName);
    queryReplicaType(getRandomClient(), Replica.Type.NRT, collectionName);
  }

  private void queryReplicaType(CloudSolrClient cloudClient,
                                Replica.Type typeToQuery,
                                String collectionName)
      throws Exception {
    SolrQuery qRequest = new SolrQuery("*:*");

    ModifiableSolrParams qParams = new ModifiableSolrParams();
    qParams.add(ShardParams.SHARDS_PREFERENCE, ShardParams.SHARDS_PREFERENCE_REPLICA_TYPE + ":" + typeToQuery.toString());
    qParams.add(ShardParams.SHARDS_INFO, "true");
    qRequest.add(qParams);

    Map<String, String> replicaTypeToReplicas = mapReplicasToReplicaType(getCollectionState(collectionName));

    QueryResponse qResponse = cloudClient.query(collectionName, qRequest);

    Object shardsInfo = qResponse.getResponse().get(ShardParams.SHARDS_INFO);
    assertNotNull("Unable to obtain "+ShardParams.SHARDS_INFO, shardsInfo);

    // Iterate over shards-info and check what cores responded
    SimpleOrderedMap<?> shardsInfoMap = (SimpleOrderedMap<?>)shardsInfo;
    Iterator<Map.Entry<String, ?>> itr = shardsInfoMap.asMap(100).entrySet().iterator();
    List<String> shardAddresses = new ArrayList<String>();
    while (itr.hasNext()) {
      Map.Entry<String, ?> e = itr.next();
      assertTrue("Did not find map-type value in "+ShardParams.SHARDS_INFO, e.getValue() instanceof Map);
      String shardAddress = (String)((Map)e.getValue()).get("shardAddress");
      assertNotNull(ShardParams.SHARDS_INFO+" did not return 'shardAddress' parameter", shardAddress);
      if (shardAddress.endsWith("/")) {
        shardAddress = shardAddress.substring(0, shardAddress.length() - 1);
      }
      shardAddresses.add(shardAddress);
    }
    assertEquals("Shard addresses must be of size 1, since there is only 1 shard in the collection", 1, shardAddresses.size());

    assertEquals("Make sure that the replica queried was the replicaType desired", typeToQuery.toString().toUpperCase(Locale.ROOT), replicaTypeToReplicas.get(shardAddresses.get(0)).toUpperCase(Locale.ROOT));
  }

  private Long getNumRequests(String baseUrl, String collectionName) throws
      SolrServerException, IOException {
    return getNumRequests(baseUrl, collectionName, "QUERY", "/select", null, false);

@@ -48,11 +48,13 @@ import org.apache.solr.client.solrj.io.stream.metrics.MinMetric;
import org.apache.solr.client.solrj.io.stream.metrics.SumMetric;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.client.solrj.routing.RequestReplicaListTransformerGenerator;
import org.apache.solr.cloud.SolrCloudTestCase;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.ShardParams;
import org.apache.solr.common.params.SolrParams;
import org.junit.Assume;
import org.junit.Before;

@@ -71,6 +73,7 @@ import org.junit.Test;
public class StreamingTest extends SolrCloudTestCase {

  public static final String COLLECTIONORALIAS = "streams";
  public static final String MULTI_REPLICA_COLLECTIONORALIAS = "streams-multi-replica";

  private static final StreamFactory streamFactory = new StreamFactory()
      .withFunctionName("search", CloudSolrStream.class)

@@ -103,7 +106,8 @@ public static void configureCluster() throws Exception {
    } else {
      collection = COLLECTIONORALIAS;
    }
    CollectionAdminRequest.createCollection(collection, "conf", numShards, 1).process(cluster.getSolrClient());
    CollectionAdminRequest.createCollection(collection, "conf", numShards, 1)
        .process(cluster.getSolrClient());
    cluster.waitForActiveCollection(collection, numShards, numShards);
    if (useAlias) {
      CollectionAdminRequest.createAlias(COLLECTIONORALIAS, collection).process(cluster.getSolrClient());

@@ -111,6 +115,20 @@ public static void configureCluster() throws Exception {

    zkHost = cluster.getZkServer().getZkAddress();
    streamFactory.withCollectionZkHost(COLLECTIONORALIAS, zkHost);

    // Set up multi-replica collection
    if (useAlias) {
      collection = MULTI_REPLICA_COLLECTIONORALIAS + "_collection";
    } else {
      collection = MULTI_REPLICA_COLLECTIONORALIAS;
    }
    CollectionAdminRequest.createCollection(collection, "conf", numShards, 1, 1, 1)
        .setMaxShardsPerNode(numShards * 3)
        .process(cluster.getSolrClient());
    cluster.waitForActiveCollection(collection, numShards, numShards * 3);
    if (useAlias) {
      CollectionAdminRequest.createAlias(MULTI_REPLICA_COLLECTIONORALIAS, collection).process(cluster.getSolrClient());
    }
  }

  private static final String id = "id";

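For readers unfamiliar with the six-argument overload used above (an assumption about the CollectionAdminRequest API, not stated in this patch): the three trailing counts are taken to be nrtReplicas, tlogReplicas, and pullReplicas, which is what gives the multi-replica collection one replica of each type per shard.

// Assumed signature: createCollection(name, config, numShards, nrtReplicas, tlogReplicas, pullReplicas)
CollectionAdminRequest.createCollection(collection, "conf", numShards, 1, 1, 1);
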
@@ -2554,6 +2572,43 @@ public void testParallelRankStream() throws Exception {
    }

  }

  @Test
  public void testTupleStreamGetShardsPreference() throws Exception {
    StreamContext streamContext = new StreamContext();
    streamContext.setSolrClientCache(new SolrClientCache());
    streamContext.setRequestReplicaListTransformerGenerator(new RequestReplicaListTransformerGenerator(ShardParams.SHARDS_PREFERENCE_REPLICA_TYPE + ":TLOG", null, null, null));

    streamContext.setRequestParams(mapParams(ShardParams.SHARDS_PREFERENCE, ShardParams.SHARDS_PREFERENCE_REPLICA_TYPE + ":nrt"));

    try {
      ZkStateReader zkStateReader = cluster.getSolrClient().getZkStateReader();
      List<String> strings = zkStateReader.aliasesManager.getAliases().resolveAliases(MULTI_REPLICA_COLLECTIONORALIAS);
      String collName = strings.size() > 0 ? strings.get(0) : MULTI_REPLICA_COLLECTIONORALIAS;
      Map<String, String> replicaTypeMap = mapReplicasToReplicaType(zkStateReader.getClusterState().getCollectionOrNull(collName));

      // Test from extra params
      SolrParams sParams = mapParams("q", "*:*", ShardParams.SHARDS_PREFERENCE, ShardParams.SHARDS_PREFERENCE_REPLICA_TYPE + ":pull");
      testTupleStreamSorting(streamContext, sParams, "PULL", replicaTypeMap);

      // Test defaults from streamContext.getParams()
      testTupleStreamSorting(streamContext, new ModifiableSolrParams(), "NRT", replicaTypeMap);

      // Test defaults from the RLTG
      streamContext.setRequestParams(new ModifiableSolrParams());
      testTupleStreamSorting(streamContext, new ModifiableSolrParams(), "TLOG", replicaTypeMap);
    } finally {
      streamContext.getSolrClientCache().close();
    }
  }

  public void testTupleStreamSorting(StreamContext streamContext, SolrParams solrParams, String replicaType, Map<String, String> replicaTypeMap) throws Exception {
    List<String> shards = TupleStream.getShards(cluster.getZkClient().getZkServerAddress(), MULTI_REPLICA_COLLECTIONORALIAS, streamContext, solrParams);
    for (String shard : shards) {
      assertEquals(shard, replicaType.toUpperCase(Locale.ROOT), replicaTypeMap.getOrDefault(shard, "").toUpperCase(Locale.ROOT));
    }
  }

  protected List<Tuple> getTuples(TupleStream tupleStream) throws IOException {
    tupleStream.open();
    List<Tuple> tuples = new ArrayList();

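A minimal sketch (assumptions: a live cluster reachable at a hypothetical zkHost, plus only the StreamContext, RequestReplicaListTransformerGenerator, and TupleStream.getShards calls exercised by the test above) of how client code can pick shard URLs by replica type when no per-request preference is given:

// Sketch only; mirrors the test above with hypothetical zkHost/collection values.
StreamContext ctx = new StreamContext();
ctx.setSolrClientCache(new SolrClientCache());
ctx.setRequestReplicaListTransformerGenerator(new RequestReplicaListTransformerGenerator(
    ShardParams.SHARDS_PREFERENCE_REPLICA_TYPE + ":PULL", null, null, null));

// With no shards.preference in the params or the context, the PULL default applies.
List<String> shards = TupleStream.getShards(zkHost, "streams-multi-replica", ctx, new ModifiableSolrParams());
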
@@ -490,4 +490,21 @@ public class SolrCloudTestCase extends SolrTestCaseJ4 {
    cluster.waitForAllNodes(timeoutSeconds);
  }

  public static Map<String, String> mapReplicasToReplicaType(DocCollection collection) {
    Map<String, String> replicaTypeMap = new HashMap<>();
    for (Slice slice : collection.getSlices()) {
      for (Replica replica : slice.getReplicas()) {
        String coreUrl = replica.getCoreUrl();
        // A replica reports its core URL with a trailing slash while the shard
        // info returned from a query doesn't, so map both forms to be safe.
        replicaTypeMap.put(coreUrl, replica.getType().toString());
        if (coreUrl.endsWith("/")) {
          replicaTypeMap.put(coreUrl.substring(0, coreUrl.length() - 1), replica.getType().toString());
        } else {
          replicaTypeMap.put(coreUrl + "/", replica.getType().toString());
        }
      }
    }
    return replicaTypeMap;
  }
}