diff --git a/solr/core/src/java/org/apache/solr/handler/StreamHandler.java b/solr/core/src/java/org/apache/solr/handler/StreamHandler.java index 661704f401e..e0f118654a3 100644 --- a/solr/core/src/java/org/apache/solr/handler/StreamHandler.java +++ b/solr/core/src/java/org/apache/solr/handler/StreamHandler.java @@ -31,9 +31,17 @@ import org.apache.solr.client.solrj.io.Tuple; import org.apache.solr.client.solrj.io.comp.StreamComparator; import org.apache.solr.client.solrj.io.graph.GatherNodesStream; import org.apache.solr.client.solrj.io.graph.ShortestPathStream; +import org.apache.solr.client.solrj.io.ops.AndOperation; import org.apache.solr.client.solrj.io.ops.ConcatOperation; import org.apache.solr.client.solrj.io.ops.DistinctOperation; +import org.apache.solr.client.solrj.io.ops.EqualsOperation; +import org.apache.solr.client.solrj.io.ops.GreaterThanEqualToOperation; +import org.apache.solr.client.solrj.io.ops.GreaterThanOperation; import org.apache.solr.client.solrj.io.ops.GroupOperation; +import org.apache.solr.client.solrj.io.ops.LessThanEqualToOperation; +import org.apache.solr.client.solrj.io.ops.LessThanOperation; +import org.apache.solr.client.solrj.io.ops.NotOperation; +import org.apache.solr.client.solrj.io.ops.OrOperation; import org.apache.solr.client.solrj.io.ops.ReplaceOperation; import org.apache.solr.client.solrj.io.stream.*; import org.apache.solr.client.solrj.io.stream.expr.Explanation; @@ -154,7 +162,16 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware, // stream reduction operations .withFunctionName("group", GroupOperation.class) - .withFunctionName("distinct", DistinctOperation.class); + .withFunctionName("distinct", DistinctOperation.class) + .withFunctionName("having", HavingStream.class) + .withFunctionName("and", AndOperation.class) + .withFunctionName("or", OrOperation.class) + .withFunctionName("not", NotOperation.class) + .withFunctionName("gt", GreaterThanOperation.class) + .withFunctionName("lt", LessThanOperation.class) + .withFunctionName("eq", EqualsOperation.class) + .withFunctionName("lteq", LessThanEqualToOperation.class) + .withFunctionName("gteq", GreaterThanEqualToOperation.class); // This pulls all the overrides and additions from the config List pluginInfos = core.getSolrConfig().getPluginInfos(Expressible.class.getName()); diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java index 1316af4fc53..fd088f11f85 100644 --- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java +++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java @@ -33,8 +33,16 @@ import org.apache.solr.client.solrj.io.SolrClientCache; import org.apache.solr.client.solrj.io.Tuple; import org.apache.solr.client.solrj.io.comp.ComparatorOrder; import org.apache.solr.client.solrj.io.comp.FieldComparator; +import org.apache.solr.client.solrj.io.ops.AndOperation; import org.apache.solr.client.solrj.io.ops.ConcatOperation; +import org.apache.solr.client.solrj.io.ops.EqualsOperation; +import org.apache.solr.client.solrj.io.ops.GreaterThanEqualToOperation; +import org.apache.solr.client.solrj.io.ops.GreaterThanOperation; import org.apache.solr.client.solrj.io.ops.GroupOperation; +import org.apache.solr.client.solrj.io.ops.LessThanEqualToOperation; +import org.apache.solr.client.solrj.io.ops.LessThanOperation; +import org.apache.solr.client.solrj.io.ops.NotOperation; +import org.apache.solr.client.solrj.io.ops.OrOperation; import org.apache.solr.client.solrj.io.ops.ReplaceOperation; import org.apache.solr.client.solrj.io.stream.expr.StreamExpression; import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParser; @@ -803,6 +811,199 @@ public class StreamExpressionTest extends SolrCloudTestCase { } + + @Test + public void testHavingStream() throws Exception { + + SolrClientCache solrClientCache = new SolrClientCache(); + + new UpdateRequest() + .add(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "1", "subject", "blah blah blah 0") + .add(id, "2", "a_s", "hello0", "a_i", "2", "a_f", "2", "subject", "blah blah blah 2") + .add(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3", "subject", "blah blah blah 3") + .add(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4", "subject", "blah blah blah 4") + .add(id, "1", "a_s", "hello0", "a_i", "1", "a_f", "5", "subject", "blah blah blah 1") + .add(id, "5", "a_s", "hello3", "a_i", "5", "a_f", "6", "subject", "blah blah blah 5") + .add(id, "6", "a_s", "hello4", "a_i", "6", "a_f", "7", "subject", "blah blah blah 6") + .add(id, "7", "a_s", "hello3", "a_i", "7", "a_f", "8", "subject", "blah blah blah 7") + .add(id, "8", "a_s", "hello3", "a_i", "8", "a_f", "9", "subject", "blah blah blah 8") + .add(id, "9", "a_s", "hello0", "a_i", "9", "a_f", "10", "subject", "blah blah blah 9") + .commit(cluster.getSolrClient(), COLLECTIONORALIAS); + + TupleStream stream; + List tuples; + + StreamFactory factory = new StreamFactory() + .withCollectionZkHost(COLLECTIONORALIAS, cluster.getZkServer().getZkAddress()) + .withFunctionName("search", CloudSolrStream.class) + .withFunctionName("having", HavingStream.class) + .withFunctionName("and", AndOperation.class) + .withFunctionName("or", OrOperation.class) + .withFunctionName("not", NotOperation.class) + .withFunctionName("gt", GreaterThanOperation.class) + .withFunctionName("lt", LessThanOperation.class) + .withFunctionName("eq", EqualsOperation.class) + .withFunctionName("lteq", LessThanEqualToOperation.class) + .withFunctionName("gteq", GreaterThanEqualToOperation.class); + + stream = factory.constructStream("having(search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\"), eq(a_i, 9))"); + StreamContext context = new StreamContext(); + context.setSolrClientCache(solrClientCache); + stream.setStreamContext(context); + tuples = getTuples(stream); + + assert(tuples.size() == 1); + Tuple t = tuples.get(0); + assertTrue(t.getString("id").equals("9")); + + stream = factory.constructStream("having(search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\"), and(eq(a_i, 9),lt(a_i, 10)))"); + context = new StreamContext(); + context.setSolrClientCache(solrClientCache); + stream.setStreamContext(context); + tuples = getTuples(stream); + + assert(tuples.size() == 1); + t = tuples.get(0); + assertTrue(t.getString("id").equals("9")); + + stream = factory.constructStream("having(search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\"), or(eq(a_i, 9),eq(a_i, 8)))"); + context = new StreamContext(); + context.setSolrClientCache(solrClientCache); + stream.setStreamContext(context); + tuples = getTuples(stream); + + assert(tuples.size() == 2); + t = tuples.get(0); + assertTrue(t.getString("id").equals("8")); + + t = tuples.get(1); + assertTrue(t.getString("id").equals("9")); + + + stream = factory.constructStream("having(search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\"), and(eq(a_i, 9),not(eq(a_i, 9))))"); + context = new StreamContext(); + context.setSolrClientCache(solrClientCache); + stream.setStreamContext(context); + tuples = getTuples(stream); + + assert(tuples.size() == 0); + + + stream = factory.constructStream("having(search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\"), and(lteq(a_i, 9), gteq(a_i, 8)))"); + context = new StreamContext(); + context.setSolrClientCache(solrClientCache); + stream.setStreamContext(context); + tuples = getTuples(stream); + + System.out.println("####Tuples:"+tuples.size()); + assert(tuples.size() == 2); + + t = tuples.get(0); + assertTrue(t.getString("id").equals("8")); + + t = tuples.get(1); + assertTrue(t.getString("id").equals("9")); + + solrClientCache.close(); + } + + + @Test + public void testParallelHavingStream() throws Exception { + + SolrClientCache solrClientCache = new SolrClientCache(); + + new UpdateRequest() + .add(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "1", "subject", "blah blah blah 0") + .add(id, "2", "a_s", "hello0", "a_i", "2", "a_f", "2", "subject", "blah blah blah 2") + .add(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3", "subject", "blah blah blah 3") + .add(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4", "subject", "blah blah blah 4") + .add(id, "1", "a_s", "hello0", "a_i", "1", "a_f", "5", "subject", "blah blah blah 1") + .add(id, "5", "a_s", "hello3", "a_i", "5", "a_f", "6", "subject", "blah blah blah 5") + .add(id, "6", "a_s", "hello4", "a_i", "6", "a_f", "7", "subject", "blah blah blah 6") + .add(id, "7", "a_s", "hello3", "a_i", "7", "a_f", "8", "subject", "blah blah blah 7") + .add(id, "8", "a_s", "hello3", "a_i", "8", "a_f", "9", "subject", "blah blah blah 8") + .add(id, "9", "a_s", "hello0", "a_i", "9", "a_f", "10", "subject", "blah blah blah 9") + .commit(cluster.getSolrClient(), COLLECTIONORALIAS); + + TupleStream stream; + List tuples; + + StreamFactory factory = new StreamFactory() + .withCollectionZkHost(COLLECTIONORALIAS, cluster.getZkServer().getZkAddress()) + .withFunctionName("search", CloudSolrStream.class) + .withFunctionName("having", HavingStream.class) + .withFunctionName("and", AndOperation.class) + .withFunctionName("or", OrOperation.class) + .withFunctionName("not", NotOperation.class) + .withFunctionName("gt", GreaterThanOperation.class) + .withFunctionName("lt", LessThanOperation.class) + .withFunctionName("eq", EqualsOperation.class) + .withFunctionName("lteq", LessThanEqualToOperation.class) + .withFunctionName("gteq", GreaterThanEqualToOperation.class) + .withFunctionName("parallel", ParallelStream.class); + + stream = factory.constructStream("parallel(" + COLLECTIONORALIAS + ", workers=2, sort=\"a_f asc\", having(search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\", partitionKeys=id), eq(a_i, 9)))"); + StreamContext context = new StreamContext(); + context.setSolrClientCache(solrClientCache); + stream.setStreamContext(context); + tuples = getTuples(stream); + + assert(tuples.size() == 1); + Tuple t = tuples.get(0); + assertTrue(t.getString("id").equals("9")); + + stream = factory.constructStream("parallel(" + COLLECTIONORALIAS + ", workers=2, sort=\"a_f asc\", having(search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\", partitionKeys=id), and(eq(a_i, 9),lt(a_i, 10))))"); + context = new StreamContext(); + context.setSolrClientCache(solrClientCache); + stream.setStreamContext(context); + tuples = getTuples(stream); + + assert(tuples.size() == 1); + t = tuples.get(0); + assertTrue(t.getString("id").equals("9")); + + stream = factory.constructStream("parallel(" + COLLECTIONORALIAS + ", workers=2, sort=\"a_f asc\",having(search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\", partitionKeys=id), or(eq(a_i, 9),eq(a_i, 8))))"); + context = new StreamContext(); + context.setSolrClientCache(solrClientCache); + stream.setStreamContext(context); + tuples = getTuples(stream); + + assert(tuples.size() == 2); + t = tuples.get(0); + assertTrue(t.getString("id").equals("8")); + + t = tuples.get(1); + assertTrue(t.getString("id").equals("9")); + + + stream = factory.constructStream("parallel(" + COLLECTIONORALIAS + ", workers=2, sort=\"a_f asc\", having(search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\", partitionKeys=id), and(eq(a_i, 9),not(eq(a_i, 9)))))"); + context = new StreamContext(); + context.setSolrClientCache(solrClientCache); + stream.setStreamContext(context); + tuples = getTuples(stream); + + assert(tuples.size() == 0); + + + stream = factory.constructStream("parallel(" + COLLECTIONORALIAS + ", workers=2, sort=\"a_f asc\",having(search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\", partitionKeys=id), and(lteq(a_i, 9), gteq(a_i, 8))))"); + context = new StreamContext(); + context.setSolrClientCache(solrClientCache); + stream.setStreamContext(context); + tuples = getTuples(stream); + + System.out.println("####Tuples:"+tuples.size()); + assert(tuples.size() == 2); + + t = tuples.get(0); + assertTrue(t.getString("id").equals("8")); + + t = tuples.get(1); + assertTrue(t.getString("id").equals("9")); + + solrClientCache.close(); + } + @Test public void testFetchStream() throws Exception {