SOLR-13306 Add a request parameter to execute a streaming expression locally

This commit is contained in:
Gus Heck 2019-05-08 12:13:07 -04:00
parent f4399a495e
commit 76b854cb4f
11 changed files with 246 additions and 74 deletions

View File

@ -73,7 +73,9 @@ Jetty 9.4.14.v20181114
New Features
----------------------
* SOLR-13320 : add an update param failOnVersionConflicts=false to updates not fail when there is a version conflict (noble)
* SOLR-13320: add an update param failOnVersionConflicts=false to updates not fail when there is a version conflict (noble)
* SOLR-13306: Add a request parameter to execute a streaming expression locally
================== 8.1.0 ==================

View File

@ -162,6 +162,7 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware,
int worker = params.getInt("workerID", 0);
int numWorkers = params.getInt("numWorkers", 1);
boolean local = params.getBool("streamLocalOnly", false);
StreamContext context = new StreamContext();
context.put("shards", getCollectionShards(params));
context.workerID = worker;
@ -171,6 +172,7 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware,
context.setObjectCache(objectCache);
context.put("core", this.coreName);
context.put("solr-core", req.getCore());
context.setLocal(local);
tupleStream.setStreamContext(context);
// if asking for explanation then go get it

View File

@ -114,6 +114,15 @@ unless the jvm has been started with `-DStreamingExpressionMacros=true` (usually
Because streaming expressions relies on the `/export` handler, many of the field and field type requirements to use `/export` are also requirements for `/stream`, particularly for `sort` and `fl` parameters. Please see the section <<exporting-result-sets.adoc#exporting-result-sets,Exporting Result Sets>> for details.
=== Local Execution
In certain special cases such as joining documents on a value that is 1:1 with the portion of the id used in
composite routing, the relevant data is always co-located on the same node. When this happens, fanning out requests
among many nodes and waiting for a response from all nodes is inefficient. In cases where data co-location holds true
for the entire expression, it may be faster for the client to send the expression to each slice with
`&streamLocalOnly=true` and handle merging of the results (if required) locally. This is an advanced option, relying
on a convenient organization of the index, and should only be considered if normal usage poses a performance issue.
== Types of Streaming Expressions
=== About Stream Sources

View File

@ -381,6 +381,9 @@ public class CloudSolrStream extends TupleStream implements Expressible {
SolrStream solrStream = new SolrStream(shardUrl, mParams);
if(streamContext != null) {
solrStream.setStreamContext(streamContext);
if (streamContext.isLocal()) {
solrStream.setDistrib(false);
}
}
solrStream.setFieldMappings(this.fieldMappings);
solrStreams.add(solrStream);

View File

@ -398,6 +398,9 @@ public class SignificantTermsStream extends TupleStream implements Expressible{
params.add("minTermLength", Integer.toString(minTermLength));
params.add("field", field);
params.add("numTerms", String.valueOf(numTerms*5));
if (streamContext.isLocal()) {
params.add("distrib", "false");
}
QueryRequest request= new QueryRequest(params, SolrRequest.METHOD.POST);
QueryResponse response = request.process(solrClient);

View File

@ -65,6 +65,7 @@ public class SolrStream extends TupleStream {
private String slice;
private long checkpoint = -1;
private CloseableHttpResponse closeableHttpResponse;
private boolean distrib = true;
/**
* @param baseUrl Base URL of the stream.
@ -89,6 +90,7 @@ public class SolrStream extends TupleStream {
}
public void setStreamContext(StreamContext context) {
this.distrib = !context.isLocal();
this.numWorkers = context.numWorkers;
this.workerID = context.workerID;
this.cache = context.getSolrClientCache();
@ -106,7 +108,11 @@ public class SolrStream extends TupleStream {
}
try {
tupleStreamParser = constructParser(client, loadParams(params));
SolrParams requestParams = loadParams(params);
if (!distrib) {
((ModifiableSolrParams) requestParams).add("distrib","false");
}
tupleStreamParser = constructParser(client, requestParams);
} catch (Exception e) {
throw new IOException("params " + params, e);
}
@ -128,7 +134,7 @@ public class SolrStream extends TupleStream {
this.checkpoint = checkpoint;
}
private SolrParams loadParams(SolrParams paramsIn) throws IOException {
private ModifiableSolrParams loadParams(SolrParams paramsIn) throws IOException {
ModifiableSolrParams solrParams = new ModifiableSolrParams(paramsIn);
if (params.get("partitionKeys") != null) {
if(!params.get("partitionKeys").equals("none") && numWorkers > 1) {
@ -219,6 +225,14 @@ public class SolrStream extends TupleStream {
}
}
public void setDistrib(boolean distrib) {
this.distrib = distrib;
}
public boolean getDistrib() {
return distrib;
}
public static class HandledException extends IOException {
public HandledException(String msg) {
super(msg);

View File

@ -200,6 +200,9 @@ public class SqlStream extends TupleStream implements Expressible {
this.tupleStream = new SolrStream(url, mParams);
if(streamContext != null) {
tupleStream.setStreamContext(streamContext);
if(streamContext.isLocal()) {
mParams.add("distrib", "false");
}
}
} catch (Exception e) {
throw new IOException(e);

View File

@ -206,6 +206,9 @@ public class StatsStream extends TupleStream implements Expressible {
addStats(paramsLoc, metrics);
paramsLoc.set("stats", "true");
paramsLoc.set("rows", "0");
if (streamContext.isLocal()) {
paramsLoc.set("distrib", "false");
}
Map<String, List<String>> shardsMap = (Map<String, List<String>>)streamContext.get("shards");
if(shardsMap == null) {

View File

@ -34,7 +34,7 @@ import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
**/
public class StreamContext implements Serializable{
public class StreamContext implements Serializable {
private Map entries = new HashMap();
private Map tupleContext = new HashMap();
@ -45,6 +45,7 @@ public class StreamContext implements Serializable{
private SolrClientCache clientCache;
private ModelCache modelCache;
private StreamFactory streamFactory;
private boolean local;
public ConcurrentMap getObjectCache() {
return this.objectCache;
@ -54,7 +55,7 @@ public class StreamContext implements Serializable{
this.objectCache = objectCache;
}
public Map<String, Object> getLets(){
public Map<String, Object> getLets() {
return lets;
}
@ -101,4 +102,12 @@ public class StreamContext implements Serializable{
public StreamFactory getStreamFactory() {
return this.streamFactory;
}
public void setLocal(boolean local) {
this.local = local;
}
public boolean isLocal() {
return local;
}
}

View File

@ -150,6 +150,10 @@ public abstract class TupleStream implements Closeable, Serializable, MapWriter
shards.add(url);
}
}
Object core = streamContext.get("core");
if (streamContext != null && streamContext.isLocal() && core != null) {
shards.removeIf(shardUrl -> !shardUrl.contains((CharSequence) core));
}
return shards;
}

View File

@ -49,6 +49,9 @@ import org.apache.solr.client.solrj.io.stream.metrics.SumMetric;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.cloud.SolrCloudTestCase;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.SolrParams;
import org.junit.Assume;
@ -2297,6 +2300,122 @@ public void testParallelRankStream() throws Exception {
}
}
/**
* This test verifies that setting a core into the stream context entries and streamContext.local = true causes the
* streaming expression to only consider data found on the local node.
*/
@Test
public void streamLocalTests() throws Exception {
new UpdateRequest()
.add(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "0")
.add(id, "2", "a_s", "hello2", "a_i", "2", "a_f", "0")
.add(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3")
.add(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4")
.add(id, "1", "a_s", "hello1", "a_i", "1", "a_f", "1")
.commit(cluster.getSolrClient(), COLLECTIONORALIAS);
StreamContext streamContext = new StreamContext();
streamContext.setLocal(true);
ZkStateReader zkStateReader = cluster.getSolrClient().getZkStateReader();
List<String> strings = zkStateReader.aliasesManager.getAliases().resolveAliases(COLLECTIONORALIAS);
String collName = strings.size() > 0 ? strings.get(0) : COLLECTIONORALIAS;
zkStateReader.forceUpdateCollection(collName);
DocCollection collection = zkStateReader.getClusterState().getCollectionOrNull(collName);
List<Replica> replicas = collection.getReplicas();
streamContext.getEntries().put("core",replicas.get(random().nextInt(replicas.size())).getCoreName());
SolrClientCache solrClientCache = new SolrClientCache();
streamContext.setSolrClientCache(solrClientCache);
//Basic CloudSolrStream Test with Descending Sort
try {
SolrParams sParams = mapParams("q", "*:*", "fl", "id,a_s,a_i", "sort", "a_i desc");
CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParams);
stream.setStreamContext(streamContext);
List<Tuple> tuples = getTuples(stream);
// note if hashing algo changes this might break
switch (tuples.size()) {
case 5: // 1 shard
assertOrder(tuples, 4, 3, 2, 1, 0);
break;
case 3: // 2 shards case 1 (randomized)
assertOrder(tuples, 4, 1, 0);
break;
case 2: // 2 shards case 2 (randomized)
assertOrder(tuples, 3, 2);
break;
default: // nope, no way, no how, never good.
fail("should have 3, 5 or 2 tuples, has hashing algorithm changed?");
}
//With Ascending Sort
sParams = mapParams("q", "*:*", "fl", "id,a_s,a_i", "sort", "a_i asc");
stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParams);
stream.setStreamContext(streamContext);
tuples = getTuples(stream);
// note if hashing algo changes this might break
switch (tuples.size()) {
case 5: // 1 shard
assertOrder(tuples, 0, 1, 2, 3, 4);
break;
case 3: // 2 shards case 1 (randomized)
assertOrder(tuples, 0, 1, 4);
break;
case 2: // 2 shards case 2 (randomized)
assertOrder(tuples, 2, 3);
break;
default: // nope, no way, no how, never good.
fail("should have 3, 5 or 2 tuples, has hashing algorithm changed?");
}
//Test compound sort
sParams = mapParams("q", "*:*", "fl", "id,a_s,a_i,a_f", "sort", "a_f asc,a_i desc");
stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParams);
stream.setStreamContext(streamContext);
tuples = getTuples(stream);
// note if hashing algo changes this might break
switch (tuples.size()) {
case 5: // 1 shard
assertOrder(tuples, 2, 0, 1, 3, 4);
break;
case 3: // 2 shards case 1 (randomized)
assertOrder(tuples, 0, 1, 4);
break;
case 2: // 2 shards case 2 (randomized)
assertOrder(tuples, 2, 3);
break;
default: // nope, no way, no how, never good.
fail("should have 3, 5 or 2 tuples, has hashing algorithm changed?");
}
sParams = mapParams("q", "*:*", "fl", "id,a_s,a_i,a_f", "sort", "a_f asc,a_i asc");
stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParams);
stream.setStreamContext(streamContext);
tuples = getTuples(stream);
// note if hashing algo changes this might break
switch (tuples.size()) {
case 5: // 1 shard
assertOrder(tuples, 0, 2, 1, 3, 4);
break;
case 3: // 2 shards case 1 (randomized)
assertOrder(tuples, 0, 1, 4);
break;
case 2: // 2 shards case 2 (randomized)
assertOrder(tuples, 2, 3);
break;
default: // nope, no way, no how, never good.
fail("should have 3, 5 or 2 tuples, has hashing algorithm changed?");
}
} finally {
solrClientCache.close();
}
}
@Test
public void testDateBoolSorting() throws Exception {
@ -2463,8 +2582,9 @@ public void testParallelRankStream() throws Exception {
for(int val : ids) {
Tuple t = tuples.get(i);
String tip = (String)t.get("id");
if(!tip.equals(Integer.toString(val))) {
throw new Exception("Found value:"+tip+" expecting:"+val);
String valStr = Integer.toString(val);
if(!tip.equals(valStr)) {
assertEquals("Found value:"+tip+" expecting:"+valStr, val, tip);
}
++i;
}