SOLR-12674: RollupStream should not use the HashQueryParser for 1 worker

Varun Thacker 2018-08-19 11:58:14 -07:00
parent d6e1d4a4ba
commit 3e4545219e
5 changed files with 67 additions and 6 deletions

solr/CHANGES.txt

@@ -245,6 +245,8 @@ Bug Fixes
 * SOLR-12475: Fix MaxSizeAutoCommitTest failures (Rupa Shankar, Anshum Gupta)
 
+* SOLR-12674: RollupStream should not use the HashQueryParser for 1 worker. (Varun Thacker)
+
 Optimizations
 ----------------------

solr/core/src/java/org/apache/solr/search/HashQParserPlugin.java

@@ -39,6 +39,7 @@ import org.apache.lucene.util.BytesRef;
 import org.apache.lucene.util.CharsRef;
 import org.apache.lucene.util.CharsRefBuilder;
 import org.apache.lucene.util.FixedBitSet;
+import org.apache.solr.common.SolrException;
 import org.apache.solr.common.params.SolrParams;
 import org.apache.solr.request.SolrQueryRequest;
 import org.apache.solr.schema.FieldType;
@@ -68,6 +69,9 @@ public class HashQParserPlugin extends QParserPlugin {
     public Query parse() {
       int workers = localParams.getInt("workers", 0);
+      if (workers < 2) {
+        throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "workers needs to be more than 1");
+      }
       int worker = localParams.getInt("worker", 0);
       String keys = params.get("partitionKeys");
       keys = keys.replace(" ", "");
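
Note (not part of the commit): with this guard, a hash filter declaring fewer than two workers is rejected up front with a 400 instead of silently hashing everything into a single partition. A minimal SolrJ sketch of the request shape the parser expects, mirroring the test below; the base URL, collection name, and field a_s are assumptions for illustration:

import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.impl.HttpSolrClient;

public class HashParserExample {
  public static void main(String[] args) throws Exception {
    // Assumed local node and collection; adjust for your setup.
    try (HttpSolrClient client = new HttpSolrClient.Builder(
        "http://localhost:8983/solr/collection1").build()) {
      SolrQuery q = new SolrQuery("*:*");
      // workers must now be >= 2; workers=1 (or the default 0) yields 400 BAD_REQUEST.
      q.addFilterQuery("{!hash worker=0 workers=2}");
      q.add("partitionKeys", "a_s"); // field(s) the hash is computed over
      System.out.println(client.query(q).getResults().getNumFound());
    }
  }
}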

solr/core/src/test/org/apache/solr/search/TestHashQParserPlugin.java

@@ -66,7 +66,7 @@ public class TestHashQParserPlugin extends SolrTestCaseJ4 {
     //Test with string hash
     ModifiableSolrParams params = new ModifiableSolrParams();
     params.add("q", "*:*");
-    params.add("fq", "{!hash worker=0 workers=1 cost="+getCost(random())+"}");
+    params.add("fq", "{!hash worker=0 workers=2 cost="+getCost(random())+"}");
     params.add("partitionKeys", "a_s");
     params.add("wt", "xml");
     String response = h.query(req(params));
@@ -75,7 +75,7 @@ public class TestHashQParserPlugin extends SolrTestCaseJ4 {
     //Test with int hash
     params = new ModifiableSolrParams();
     params.add("q", "*:*");
-    params.add("fq", "{!hash worker=0 workers=1 cost="+getCost(random())+"}");
+    params.add("fq", "{!hash worker=0 workers=2 cost="+getCost(random())+"}");
     params.add("partitionKeys", "a_i");
     params.add("wt", "xml");
     response = h.query(req(params));

solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SolrStream.java

@@ -131,14 +131,12 @@ public class SolrStream extends TupleStream {
   private SolrParams loadParams(SolrParams paramsIn) throws IOException {
     ModifiableSolrParams solrParams = new ModifiableSolrParams(paramsIn);
     if (params.get("partitionKeys") != null) {
-      if(!params.get("partitionKeys").equals("none")) {
+      if(!params.get("partitionKeys").equals("none") && numWorkers > 1) {
         String partitionFilter = getPartitionFilter();
         solrParams.add("fq", partitionFilter);
       }
-    } else {
-      if(numWorkers > 1) {
+    } else if(numWorkers > 1) {
         throw new IOException("When numWorkers > 1 partitionKeys must be set. Set partitionKeys=none to send the entire stream to each worker.");
-      }
     }
 
     if(checkpoint > 0) {
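
Note (not part of the commit): the rewritten condition means the partition filter, and with it the {!hash} query handled by HashQParserPlugin, is only added when the stream is genuinely split across workers. A self-contained sketch of the resulting decision table; PartitionFilterSketch and partitionFq are hypothetical names, the real logic lives in loadParams() and getPartitionFilter():

import java.io.IOException;

class PartitionFilterSketch {
  // Mirrors the guard in loadParams(): returns the fq to add, or null for none.
  static String partitionFq(String partitionKeys, int numWorkers, int worker) throws IOException {
    if (partitionKeys != null) {
      if (!partitionKeys.equals("none") && numWorkers > 1) {
        // Only a genuinely parallel stream pays for hash partitioning.
        return "{!hash workers=" + numWorkers + " worker=" + worker + "}";
      }
      return null; // single worker: skip the filter entirely (the SOLR-12674 fix)
    } else if (numWorkers > 1) {
      throw new IOException("When numWorkers > 1 partitionKeys must be set. "
          + "Set partitionKeys=none to send the entire stream to each worker.");
    }
    return null;
  }

  public static void main(String[] args) throws IOException {
    System.out.println(partitionFq("a_s", 1, 0)); // null: no hash fq for one worker
    System.out.println(partitionFq("a_s", 2, 0)); // {!hash workers=2 worker=0}
  }
}

This is why the rollup expression in the new test below, which declares partitionKeys but never wraps the search in parallel(), now runs without invoking the hash parser.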

solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java

@@ -1784,6 +1784,63 @@ public void testParallelRankStream() throws Exception {
   }
 
+  @Test
+  public void testRollupWithNoParallel() throws Exception {
+
+    new UpdateRequest()
+        .add(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "1")
+        .add(id, "2", "a_s", "hello0", "a_i", "2", "a_f", "2")
+        .add(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3")
+        .add(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4")
+        .add(id, "1", "a_s", "hello0", "a_i", "1", "a_f", "5")
+        .add(id, "5", "a_s", "hello3", "a_i", "10", "a_f", "6")
+        .add(id, "6", "a_s", "hello4", "a_i", "11", "a_f", "7")
+        .add(id, "7", "a_s", "hello3", "a_i", "12", "a_f", "8")
+        .add(id, "8", "a_s", "hello3", "a_i", "13", "a_f", "9")
+        .add(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10")
+        .commit(cluster.getSolrClient(), COLLECTIONORALIAS);
+
+    StreamContext streamContext = new StreamContext();
+    SolrClientCache solrClientCache = new SolrClientCache();
+    streamContext.setSolrClientCache(solrClientCache);
+
+    try {
+      //Intentionally adding partitionKeys to trigger SOLR-12674
+      SolrParams sParamsA = mapParams("q", "*:*", "fl", "a_s,a_i,a_f", "sort", "a_s asc", "partitionKeys", "a_s");
+      CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsA);
+      Bucket[] buckets = {new Bucket("a_s")};
+      Metric[] metrics = {new SumMetric("a_i"),
+          new SumMetric("a_f"),
+          new MinMetric("a_i"),
+          new MinMetric("a_f"),
+          new MaxMetric("a_i"),
+          new MaxMetric("a_f"),
+          new MeanMetric("a_i"),
+          new MeanMetric("a_f"),
+          new CountMetric()};
+
+      RollupStream rollupStream = new RollupStream(stream, buckets, metrics);
+      rollupStream.setStreamContext(streamContext);
+      List<Tuple> tuples = getTuples(rollupStream);
+      assertEquals(3, tuples.size());
+
+      List<String> shardUrls = TupleStream.getShards(cluster.getZkServer().getZkAddress(), COLLECTIONORALIAS, streamContext);
+
+      ModifiableSolrParams solrParams = new ModifiableSolrParams();
+      solrParams.add("qt", "/stream");
+      solrParams.add("expr", "rollup(search(" + COLLECTIONORALIAS + ",q=\"*:*\",fl=\"a_s,a_i,a_f\",sort=\"a_s desc\",partitionKeys=\"a_s\"),over=\"a_s\")\n");
+      SolrStream solrStream = new SolrStream(shardUrls.get(0), solrParams);
+
+      streamContext = new StreamContext();
+      solrStream.setStreamContext(streamContext);
+      tuples = getTuples(solrStream);
+      assert (tuples.size() == 3);
+    } finally {
+      solrClientCache.close();
+    }
+  }
+
   @Test
   public void testParallelRollupStream() throws Exception {