diff --git a/lucene/CHANGES.txt b/lucene/CHANGES.txt
index 6d83c534b32..7105330440d 100644
--- a/lucene/CHANGES.txt
+++ b/lucene/CHANGES.txt
@@ -40,6 +40,11 @@ Optimizations
   in the sets of SHOULD and FILTER clauses, or both in MUST/FILTER and MUST_NOT
   clauses. (Spyros Kapnissis via Adrien Grand, Uwe Schindler)
 
+* LUCENE-7506: FastTaxonomyFacetCounts should use CPU in proportion to
+  the size of the intersected set of hits from the query and documents
+  that have a facet value, so sparse faceting works as expected
+  (Adrien Grand via Mike McCandless)
+
 Other
 
 * LUCENE-7328: Remove LegacyNumericEncoding from GeoPointField. (Nick Knize)
@@ -92,6 +97,10 @@ Bug Fixes
 * LUCENE-7493: FacetsCollector.search threw an unexpected exception if you
   asked for zero hits but wanted facets (Mahesh via Mike McCandless)
 
+* LUCENE-7505: AnalyzingInfixSuggester returned invalid results when
+  allTermsRequired is false and context filters are specified (Mike
+  McCandless)
+
 Improvements
 
 * LUCENE-7439: FuzzyQuery now matches all terms within the specified
diff --git a/lucene/facet/src/java/org/apache/lucene/facet/DrillSideways.java b/lucene/facet/src/java/org/apache/lucene/facet/DrillSideways.java
index cc5647ef03e..57f0a32742b 100644
--- a/lucene/facet/src/java/org/apache/lucene/facet/DrillSideways.java
+++ b/lucene/facet/src/java/org/apache/lucene/facet/DrillSideways.java
@@ -157,7 +157,7 @@ public class DrillSideways {
     DrillSidewaysQuery dsq = new DrillSidewaysQuery(baseQuery, drillDownCollector, drillSidewaysCollectors,
                                                     drillDownQueries, scoreSubDocsAtOnce());
     if (hitCollector.needsScores() == false) {
-      // this is a borrible hack in order to make sure IndexSearcher will not
+      // this is a horrible hack in order to make sure IndexSearcher will not
       // attempt to cache the DrillSidewaysQuery
       hitCollector = new FilterCollector(hitCollector) {
         @Override
diff --git a/lucene/facet/src/java/org/apache/lucene/facet/taxonomy/FastTaxonomyFacetCounts.java b/lucene/facet/src/java/org/apache/lucene/facet/taxonomy/FastTaxonomyFacetCounts.java
index 7ad5430bac6..ef96073759a 100644
--- a/lucene/facet/src/java/org/apache/lucene/facet/taxonomy/FastTaxonomyFacetCounts.java
+++ b/lucene/facet/src/java/org/apache/lucene/facet/taxonomy/FastTaxonomyFacetCounts.java
@@ -17,12 +17,14 @@ package org.apache.lucene.facet.taxonomy;
 
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.List;
 
 import org.apache.lucene.facet.FacetsCollector.MatchingDocs;
 import org.apache.lucene.facet.FacetsCollector;
 import org.apache.lucene.facet.FacetsConfig;
 import org.apache.lucene.index.BinaryDocValues;
+import org.apache.lucene.search.ConjunctionDISI;
 import org.apache.lucene.search.DocIdSetIterator;
 import org.apache.lucene.util.BytesRef;
 
@@ -55,29 +57,24 @@ public class FastTaxonomyFacetCounts extends IntTaxonomyFacets {
         continue;
       }
 
-      DocIdSetIterator docs = hits.bits.iterator();
+      DocIdSetIterator it = ConjunctionDISI.intersectIterators(Arrays.asList(
+          hits.bits.iterator(), dv));
 
-      int doc;
-      while ((doc = docs.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) {
-        if (dv.docID() < doc) {
-          dv.advance(doc);
-        }
-        if (dv.docID() == doc) {
-          final BytesRef bytesRef = dv.binaryValue();
-          byte[] bytes = bytesRef.bytes;
-          int end = bytesRef.offset + bytesRef.length;
-          int ord = 0;
-          int offset = bytesRef.offset;
-          int prev = 0;
-          while (offset < end) {
-            byte b = bytes[offset++];
-            if (b >= 0) {
-              prev = ord = ((ord << 7) | b) + prev;
-              ++values[ord];
-              ord = 0;
-            } else {
-              ord = (ord << 7) | (b & 0x7F);
-            }
+      for (int doc = it.nextDoc();
+           doc != DocIdSetIterator.NO_MORE_DOCS; doc = it.nextDoc()) {
+        final BytesRef bytesRef = dv.binaryValue();
+        byte[] bytes = bytesRef.bytes;
+        int end = bytesRef.offset + bytesRef.length;
+        int ord = 0;
+        int offset = bytesRef.offset;
+        int prev = 0;
+        while (offset < end) {
+          byte b = bytes[offset++];
+          if (b >= 0) {
+            prev = ord = ((ord << 7) | b) + prev;
+            ++values[ord];
+            ord = 0;
+          } else {
+            ord = (ord << 7) | (b & 0x7F);
-            }
           }
         }
       }
diff --git a/lucene/suggest/src/java/org/apache/lucene/search/suggest/analyzing/AnalyzingInfixSuggester.java b/lucene/suggest/src/java/org/apache/lucene/search/suggest/analyzing/AnalyzingInfixSuggester.java
index d05c39fbe24..aa602373f5a 100644
--- a/lucene/suggest/src/java/org/apache/lucene/search/suggest/analyzing/AnalyzingInfixSuggester.java
+++ b/lucene/suggest/src/java/org/apache/lucene/search/suggest/analyzing/AnalyzingInfixSuggester.java
@@ -560,12 +560,18 @@ public class AnalyzingInfixSuggester extends Lookup implements Closeable {
       }
 
       if (allMustNot) {
-        //all are MUST_NOT: add the contextQuery to the main query instead (not as sub-query)
+        // All are MUST_NOT: add the contextQuery to the main query instead (not as sub-query)
         for (BooleanClause clause : contextQuery.clauses()) {
          query.add(clause);
         }
+      } else if (allTermsRequired == false) {
+        // We must carefully upgrade the query clauses to MUST:
+        BooleanQuery.Builder newQuery = new BooleanQuery.Builder();
+        newQuery.add(query.build(), BooleanClause.Occur.MUST);
+        newQuery.add(contextQuery, BooleanClause.Occur.MUST);
+        query = newQuery;
       } else {
-        //Add contextQuery as sub-query
+        // Add contextQuery as sub-query
         query.add(contextQuery, BooleanClause.Occur.MUST);
       }
     }
@@ -577,7 +583,7 @@ public class AnalyzingInfixSuggester extends Lookup implements Closeable {
 
     Query finalQuery = finishQuery(query, allTermsRequired);
 
-    //System.out.println("finalQuery=" + query);
+    //System.out.println("finalQuery=" + finalQuery);
 
     // Sort by weight, descending:
     TopFieldCollector c = TopFieldCollector.create(SORT, num, true, false, false);
diff --git a/lucene/suggest/src/test/org/apache/lucene/search/suggest/analyzing/AnalyzingInfixSuggesterTest.java b/lucene/suggest/src/test/org/apache/lucene/search/suggest/analyzing/AnalyzingInfixSuggesterTest.java
index 69d3ed6a121..d98d0523f56 100644
--- a/lucene/suggest/src/test/org/apache/lucene/search/suggest/analyzing/AnalyzingInfixSuggesterTest.java
+++ b/lucene/suggest/src/test/org/apache/lucene/search/suggest/analyzing/AnalyzingInfixSuggesterTest.java
@@ -1258,4 +1258,80 @@ public class AnalyzingInfixSuggesterTest extends LuceneTestCase {
       a.close();
     }
   }
+
+  public void testContextNotAllTermsRequired() throws Exception {
+
+    Input keys[] = new Input[] {
+      new Input("lend me your ear", 8, new BytesRef("foobar"), asSet("foo", "bar")),
+      new Input("a penny saved is a penny earned", 10, new BytesRef("foobaz"), asSet("foo", "baz"))
+    };
+    Path tempDir = createTempDir("analyzingInfixContext");
+
+    Analyzer a = new MockAnalyzer(random(), MockTokenizer.WHITESPACE, false);
+    AnalyzingInfixSuggester suggester = new AnalyzingInfixSuggester(newFSDirectory(tempDir), a, a, 3, false);
+    suggester.build(new InputArrayIterator(keys));
+
+    // No context provided, all results returned
+    List<LookupResult> results = suggester.lookup(TestUtil.stringToCharSequence("ear", random()), 10, false, true);
+    assertEquals(2, results.size());
+    LookupResult result = results.get(0);
+    assertEquals("a penny saved is a penny earned", result.key);
+    assertEquals("a penny saved is a penny earned", result.highlightKey);
+    assertEquals(10, result.value);
+    assertEquals(new BytesRef("foobaz"), result.payload);
+    assertNotNull(result.contexts);
+    assertEquals(2, result.contexts.size());
+    assertTrue(result.contexts.contains(new BytesRef("foo")));
+    assertTrue(result.contexts.contains(new BytesRef("baz")));
+
+    result = results.get(1);
+    assertEquals("lend me your ear", result.key);
+    assertEquals("lend me your ear", result.highlightKey);
+    assertEquals(8, result.value);
+    assertEquals(new BytesRef("foobar"), result.payload);
+    assertNotNull(result.contexts);
+    assertEquals(2, result.contexts.size());
+    assertTrue(result.contexts.contains(new BytesRef("foo")));
+    assertTrue(result.contexts.contains(new BytesRef("bar")));
+
+    // Both have "foo" context:
+    results = suggester.lookup(TestUtil.stringToCharSequence("ear", random()), asSet("foo"), 10, false, true);
+    assertEquals(2, results.size());
+
+    result = results.get(0);
+    assertEquals("a penny saved is a penny earned", result.key);
+    assertEquals("a penny saved is a penny earned", result.highlightKey);
+    assertEquals(10, result.value);
+    assertEquals(new BytesRef("foobaz"), result.payload);
+    assertNotNull(result.contexts);
+    assertEquals(2, result.contexts.size());
+    assertTrue(result.contexts.contains(new BytesRef("foo")));
+    assertTrue(result.contexts.contains(new BytesRef("baz")));
+
+    result = results.get(1);
+    assertEquals("lend me your ear", result.key);
+    assertEquals("lend me your ear", result.highlightKey);
+    assertEquals(8, result.value);
+    assertEquals(new BytesRef("foobar"), result.payload);
+    assertNotNull(result.contexts);
+    assertEquals(2, result.contexts.size());
+    assertTrue(result.contexts.contains(new BytesRef("foo")));
+    assertTrue(result.contexts.contains(new BytesRef("bar")));
+
+    // Only one has "foo" context and len
+    results = suggester.lookup(TestUtil.stringToCharSequence("len", random()), asSet("foo"), 10, false, true);
+    assertEquals(1, results.size());
+
+    result = results.get(0);
+    assertEquals("lend me your ear", result.key);
+    assertEquals("lend me your ear", result.highlightKey);
+    assertEquals(8, result.value);
+    assertEquals(new BytesRef("foobar"), result.payload);
+    assertNotNull(result.contexts);
+    assertEquals(2, result.contexts.size());
+    assertTrue(result.contexts.contains(new BytesRef("foo")));
+    assertTrue(result.contexts.contains(new BytesRef("bar")));
+
+    suggester.close();
+  }
 }
diff --git a/solr/core/src/java/org/apache/solr/handler/StreamHandler.java b/solr/core/src/java/org/apache/solr/handler/StreamHandler.java
index b9f30bc271b..3e841bdc4ee 100644
--- a/solr/core/src/java/org/apache/solr/handler/StreamHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/StreamHandler.java
@@ -19,6 +19,7 @@ package org.apache.solr.handler;
 import java.io.IOException;
 import java.lang.invoke.MethodHandles;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
@@ -97,7 +98,7 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware,
   private StreamFactory streamFactory = new StreamFactory();
   private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
   private String coreName;
-  private Map<String, DaemonStream> daemons = new HashMap<>();
+  private Map<String, DaemonStream> daemons = Collections.synchronizedMap(new HashMap());
 
   @Override
   public PermissionNameProvider.Name getPermissionName(AuthorizationContext request) {
@@ -245,6 +246,7 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware,
     if(daemons.containsKey(daemonStream.getId())) {
       daemons.remove(daemonStream.getId()).close();
     }
+    daemonStream.setDaemons(daemons);
     daemonStream.open(); //This will start the daemonStream
     daemons.put(daemonStream.getId(), daemonStream);
     rsp.add("result-set", new DaemonResponseStream("Daemon:"+daemonStream.getId()+" started on "+coreName));
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/DaemonStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/DaemonStream.java
index 77648dfd738..8214f9a3805 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/DaemonStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/DaemonStream.java
@@ -52,6 +52,8 @@ public class DaemonStream extends TupleStream implements Expressible {
   private Exception exception;
   private long runInterval;
   private String id;
+  private Map<String, DaemonStream> daemons;
+  private boolean terminate;
   private boolean closed = false;
   private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
@@ -64,10 +66,13 @@ public class DaemonStream extends TupleStream implements Expressible {
     StreamExpressionNamedParameter idExpression = factory.getNamedOperand(expression, "id");
     StreamExpressionNamedParameter runExpression = factory.getNamedOperand(expression, "runInterval");
     StreamExpressionNamedParameter queueExpression = factory.getNamedOperand(expression, "queueSize");
+    StreamExpressionNamedParameter terminateExpression = factory.getNamedOperand(expression, "terminate");
+
     String id = null;
     long runInterval = 0L;
     int queueSize = 0;
+    boolean terminate = false;
 
     if(idExpression == null) {
       throw new IOException("Invalid expression id parameter expected");
@@ -82,24 +87,26 @@ public class DaemonStream extends TupleStream implements Expressible {
     }
 
     if(queueExpression != null) {
-      queueSize= Integer.parseInt(((StreamExpressionValue)queueExpression.getParameter()).getValue());
+      queueSize= Integer.parseInt(((StreamExpressionValue) queueExpression.getParameter()).getValue());
     }
 
-    // validate expression contains only what we want.
-    if(expression.getParameters().size() != streamExpressions.size() + 2 &&
-       expression.getParameters().size() != streamExpressions.size() + 3) {
-      throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - unknown operands found", expression));
+    if(terminateExpression != null) {
+      terminate = Boolean.parseBoolean(((StreamExpressionValue) terminateExpression.getParameter()).getValue());
     }
 
     if(1 != streamExpressions.size()){
       throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expecting a single stream but found %d",expression, streamExpressions.size()));
     }
 
-    init(tupleStream, id, runInterval, queueSize);
+    init(tupleStream, id, runInterval, queueSize, terminate);
+  }
+
+  public DaemonStream(TupleStream tupleStream, String id, long runInterval, int queueSize, boolean terminate) {
+    init(tupleStream, id, runInterval, queueSize, terminate);
   }
 
   public DaemonStream(TupleStream tupleStream, String id, long runInterval, int queueSize) {
-    init(tupleStream, id, runInterval, queueSize);
+    this(tupleStream, id, runInterval, queueSize, false);
   }
 
   @Override
@@ -126,6 +133,7 @@ public class DaemonStream extends TupleStream implements Expressible {
     expression.addParameter(new StreamExpressionNamedParameter("id", id));
     expression.addParameter(new StreamExpressionNamedParameter("runInterval", Long.toString(runInterval)));
     expression.addParameter(new StreamExpressionNamedParameter("queueSize", Integer.toString(queueSize)));
+    expression.addParameter(new StreamExpressionNamedParameter("terminate", Boolean.toString(terminate)));
 
     return expression;
   }
@@ -148,10 +156,16 @@ public class DaemonStream extends TupleStream implements Expressible {
   }
 
   public void init(TupleStream tupleStream, String id, long runInterval, int queueSize) {
+    init(tupleStream, id, runInterval, queueSize, false);
+  }
+
+  public void init(TupleStream tupleStream, String id, long runInterval, int queueSize, boolean terminate) {
     this.tupleStream = tupleStream;
     this.id = id;
     this.runInterval = runInterval;
     this.queueSize = queueSize;
+    this.terminate = terminate;
+
     if(queueSize > 0) {
       queue = new ArrayBlockingQueue(queueSize);
       eatTuples = false;
@@ -228,6 +242,10 @@ public class DaemonStream extends TupleStream implements Expressible {
     return tuple;
   }
 
+  public void setDaemons(Map<String, DaemonStream> daemons) {
+    this.daemons = daemons;
+  }
+
   private synchronized void incrementIterations() {
     ++iterations;
   }
@@ -279,6 +297,18 @@ public class DaemonStream extends TupleStream implements Expressible {
             errors = 0; // Reset errors on successful run.
             if (tuple.fields.containsKey("sleepMillis")) {
               this.sleepMillis = tuple.getLong("sleepMillis");
+
+              if(terminate && sleepMillis > 0) {
+                //TopicStream provides sleepMillis > 0 if the last run had no Tuples.
+                //This means the topic queue is empty. Time to terminate.
+                //Remove ourselves from the daemons map.
+                if(daemons != null) {
+                  daemons.remove(id);
+                }
+                //Break out of the thread loop and end the run.
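+                //(The labeled break exits the OUTER run loop, not just the INNER
+                //tuple-reading loop, so the daemon thread finishes for good.)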
+                break OUTER;
+              }
+
               this.runInterval = -1;
             }
             break INNER;
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java
index 842f6a66338..7b5777d2314 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java
@@ -533,24 +533,24 @@ public class StreamExpressionTest extends SolrCloudTestCase {
 
     // Basic test desc
     expression = StreamExpressionParser.parse("top("
-        + "n=2,"
-        + "unique("
-        + "search(" + COLLECTION + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f desc\"),"
-        + "over=\"a_f\"),"
-        + "sort=\"a_f desc\")");
+                                              + "n=2,"
+                                              + "unique("
+                                              + "search(" + COLLECTION + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f desc\"),"
+                                              + "over=\"a_f\"),"
+                                              + "sort=\"a_f desc\")");
     stream = new RankStream(expression, factory);
     tuples = getTuples(stream);
 
     assert(tuples.size() == 2);
-    assertOrder(tuples, 4,3);
+    assertOrder(tuples, 4, 3);
 
     // full factory
     stream = factory.constructStream("top("
-        + "n=4,"
-        + "unique("
-        + "search(" + COLLECTION + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\"),"
-        + "over=\"a_f\"),"
-        + "sort=\"a_f asc\")");
+                                     + "n=4,"
+                                     + "unique("
+                                     + "search(" + COLLECTION + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\"),"
+                                     + "over=\"a_f\"),"
+                                     + "sort=\"a_f asc\")");
     tuples = getTuples(stream);
 
     assert(tuples.size() == 4);
@@ -827,7 +827,7 @@ public class StreamExpressionTest extends SolrCloudTestCase {
       .withFunctionName("parallel", ParallelStream.class)
       .withFunctionName("fetch", FetchStream.class);
 
-    stream = factory.constructStream("parallel("+COLLECTION+", workers=2, sort=\"a_f asc\", fetch("+COLLECTION+", search(" + COLLECTION + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\", partitionKeys=\"id\"), on=\"id=a_i\", batchSize=\"2\", fl=\"subject\"))");
+    stream = factory.constructStream("parallel(" + COLLECTION + ", workers=2, sort=\"a_f asc\", fetch(" + COLLECTION + ", search(" + COLLECTION + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\", partitionKeys=\"id\"), on=\"id=a_i\", batchSize=\"2\", fl=\"subject\"))");
     tuples = getTuples(stream);
 
     assert(tuples.size() == 10);
@@ -853,7 +853,7 @@ public class StreamExpressionTest extends SolrCloudTestCase {
 
     assertTrue("blah blah blah 9".equals(t.getString("subject")));
 
-    stream = factory.constructStream("parallel("+COLLECTION+", workers=2, sort=\"a_f asc\", fetch("+COLLECTION+", search(" + COLLECTION + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\", partitionKeys=\"id\"), on=\"id=a_i\", batchSize=\"3\", fl=\"subject\"))");
+    stream = factory.constructStream("parallel(" + COLLECTION + ", workers=2, sort=\"a_f asc\", fetch(" + COLLECTION + ", search(" + COLLECTION + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\", partitionKeys=\"id\"), on=\"id=a_i\", batchSize=\"3\", fl=\"subject\"))");
     tuples = getTuples(stream);
 
     assert(tuples.size() == 10);
@@ -1003,6 +1003,45 @@ public class StreamExpressionTest extends SolrCloudTestCase {
 
   }
 
+
+  @Test
+  public void testTerminatingDaemonStream() throws Exception {
+
+    new UpdateRequest()
+        .add(id, "0", "a_s", "hello", "a_i", "0", "a_f", "1")
+        .add(id, "2", "a_s", "hello", "a_i", "2", "a_f", "2")
+        .add(id, "3", "a_s", "hello", "a_i", "3", "a_f", "3")
+        .add(id, "4", "a_s", "hello", "a_i", "4", "a_f", "4")
+        .add(id, "1", "a_s", "hello", "a_i", "1", "a_f", "5")
+        .add(id, "5", "a_s", "hello", "a_i", "10", "a_f", "6")
+        .add(id, "6", "a_s", "hello", "a_i", "11", "a_f", "7")
+        .add(id, "7", "a_s", "hello", "a_i", "12", "a_f", "8")
+        .add(id, "8", "a_s", "hello", "a_i", "13", "a_f", "9")
+        .add(id, "9", "a_s", "hello", "a_i", "14", "a_f", "10")
+        .commit(cluster.getSolrClient(), COLLECTION);
+
+    StreamFactory factory = new StreamFactory()
+        .withCollectionZkHost(COLLECTION, cluster.getZkServer().getZkAddress())
+        .withFunctionName("topic", TopicStream.class)
+        .withFunctionName("daemon", DaemonStream.class);
+
+    StreamExpression expression;
+    DaemonStream daemonStream;
+
+    SolrClientCache cache = new SolrClientCache();
+    StreamContext context = new StreamContext();
+    context.setSolrClientCache(cache);
+    expression = StreamExpressionParser.parse("daemon(topic("+COLLECTION+","+COLLECTION+", q=\"a_s:hello\", initialCheckpoint=0, id=\"topic1\", rows=2, fl=\"id\""
+        + "), id=test, runInterval=1000, terminate=true, queueSize=50)");
+    daemonStream = (DaemonStream)factory.constructStream(expression);
+    daemonStream.setStreamContext(context);
+
+    List<Tuple> tuples = getTuples(daemonStream);
+    assertTrue(tuples.size() == 10);
+    cache.close();
+  }
+
+
   @Test
   public void testRollupStream() throws Exception {
 
@@ -1367,7 +1406,7 @@ public class StreamExpressionTest extends SolrCloudTestCase {
 
     assert(tuples.size() == 9);
-    assertOrder(tuples, 0,1,2,3,4,7,6,8,9);
+    assertOrder(tuples, 0, 1, 2, 3, 4, 7, 6, 8, 9);
 
     //Test descending
 
@@ -1376,7 +1415,7 @@ public class StreamExpressionTest extends SolrCloudTestCase {
     tuples = getTuples(pstream);
 
     assert(tuples.size() == 8);
-    assertOrder(tuples, 9,8,6,4,3,2,1,0);
+    assertOrder(tuples, 9, 8, 6, 4, 3, 2, 1, 0);
 
   }
 
@@ -1627,7 +1666,7 @@ public class StreamExpressionTest extends SolrCloudTestCase {
     stream = new LeftOuterJoinStream(expression, factory);
     tuples = getTuples(stream);
     assert(tuples.size() == 10);
-    assertOrder(tuples, 7,6,3,4,5,1,1,15,15,2);
+    assertOrder(tuples, 7, 6, 3, 4, 5, 1, 1, 15, 15, 2);
 
     // Results in both searches, no join matches
     expression = StreamExpressionParser.parse("leftOuterJoin("
@@ -1637,7 +1676,7 @@ public class StreamExpressionTest extends SolrCloudTestCase {
     stream = new LeftOuterJoinStream(expression, factory);
     tuples = getTuples(stream);
     assert(tuples.size() == 8);
-    assertOrder(tuples, 1,15,2,3,4,5,6,7);
+    assertOrder(tuples, 1, 15, 2, 3, 4, 5, 6, 7);
 
     // Differing field names
     expression = StreamExpressionParser.parse("leftOuterJoin("
@@ -1647,7 +1686,7 @@ public class StreamExpressionTest extends SolrCloudTestCase {
     stream = new LeftOuterJoinStream(expression, factory);
     tuples = getTuples(stream);
     assert(tuples.size() == 10);
-    assertOrder(tuples, 1,1,15,15,2,3,4,5,6,7);
+    assertOrder(tuples, 1, 1, 15, 15, 2, 3, 4, 5, 6, 7);
 
   }
 
@@ -1764,7 +1803,7 @@ public class StreamExpressionTest extends SolrCloudTestCase {
     stream = new OuterHashJoinStream(expression, factory);
     tuples = getTuples(stream);
     assert(tuples.size() == 10);
-    assertOrder(tuples, 1,1,15,15,2,3,4,5,6,7);
+    assertOrder(tuples, 1, 1, 15, 15, 2, 3, 4, 5, 6, 7);
 
     // Basic desc
     expression = StreamExpressionParser.parse("outerHashJoin("
@@ -1794,7 +1833,7 @@ public class StreamExpressionTest extends SolrCloudTestCase {
     stream = new OuterHashJoinStream(expression, factory);
     tuples = getTuples(stream);
     assert(tuples.size() == 10);
-    assertOrder(tuples, 1,1,15,15,2,3,4,5,6,7);
+    assertOrder(tuples, 1, 1, 15, 15, 2, 3, 4, 5, 6, 7);
   }
 
   @Test
@@ -3202,6 +3241,120 @@ public class StreamExpressionTest extends SolrCloudTestCase {
     CollectionAdminRequest.deleteCollection("parallelDestinationCollection1").process(cluster.getSolrClient());
   }
 
+
+  @Test
+  public void testParallelTerminatingDaemonUpdateStream() throws Exception {
+
+    CollectionAdminRequest.createCollection("parallelDestinationCollection1", "conf", 2, 1).process(cluster.getSolrClient());
+    AbstractDistribZkTestBase.waitForRecoveriesToFinish("parallelDestinationCollection1", cluster.getSolrClient().getZkStateReader(),
+        false, true, TIMEOUT);
+
+    new UpdateRequest()
+        .add(id, "0", "a_s", "hello", "a_i", "0", "a_f", "0", "s_multi", "aaaa", "s_multi", "bbbb", "i_multi", "4", "i_multi", "7")
+        .add(id, "2", "a_s", "hello", "a_i", "2", "a_f", "0", "s_multi", "aaaa1", "s_multi", "bbbb1", "i_multi", "44", "i_multi", "77")
+        .add(id, "3", "a_s", "hello", "a_i", "3", "a_f", "3", "s_multi", "aaaa2", "s_multi", "bbbb2", "i_multi", "444", "i_multi", "777")
+        .add(id, "4", "a_s", "hello", "a_i", "4", "a_f", "4", "s_multi", "aaaa3", "s_multi", "bbbb3", "i_multi", "4444", "i_multi", "7777")
+        .add(id, "1", "a_s", "hello", "a_i", "1", "a_f", "1", "s_multi", "aaaa4", "s_multi", "bbbb4", "i_multi", "44444", "i_multi", "77777")
+        .commit(cluster.getSolrClient(), "collection1");
+
+    StreamExpression expression;
+    TupleStream stream;
+    Tuple t;
+
+    String zkHost = cluster.getZkServer().getZkAddress();
+    StreamFactory factory = new StreamFactory()
+        .withCollectionZkHost("collection1", cluster.getZkServer().getZkAddress())
+        .withCollectionZkHost("parallelDestinationCollection1", cluster.getZkServer().getZkAddress())
+        .withFunctionName("topic", TopicStream.class)
+        .withFunctionName("update", UpdateStream.class)
+        .withFunctionName("parallel", ParallelStream.class)
+        .withFunctionName("daemon", DaemonStream.class);
+
+    //Copy all docs to destinationCollection
+    String updateExpression = "daemon(update(parallelDestinationCollection1, batchSize=2, topic(collection1, collection1, q=\"a_s:hello\", fl=\"id,a_s,a_i,a_f,s_multi,i_multi\", partitionKeys=\"a_f\", initialCheckpoint=0, id=\"topic1\")), terminate=true, runInterval=\"1000\", id=\"test\")";
+    TupleStream parallelUpdateStream = factory.constructStream("parallel(collection1, " + updateExpression + ", workers=\"2\", zkHost=\""+zkHost+"\", sort=\"batchNumber asc\")");
+    List<Tuple> tuples = getTuples(parallelUpdateStream);
+    assert(tuples.size() == 2);
+
+
+    ModifiableSolrParams sParams = new ModifiableSolrParams(StreamingTest.mapParams(CommonParams.QT, "/stream", "action", "list"));
+
+    int workersComplete = 0;
+
+    //Daemons should terminate after the topic is completed
+    //Loop through all shards and wait for the daemons to be gone from the listing.
+    for(JettySolrRunner jetty : cluster.getJettySolrRunners()) {
+      INNER:
+      while(true) {
+        SolrStream solrStream = new SolrStream(jetty.getBaseUrl().toString() + "/collection1", sParams);
+        solrStream.open();
+        Tuple tupleResponse = solrStream.read();
+        if (tupleResponse.EOF) {
+          solrStream.close();
+          ++workersComplete;
+          break INNER;
+        } else {
+          solrStream.close();
+          Thread.sleep(1000);
+        }
+      }
+    }
+
+    assertEquals(cluster.getJettySolrRunners().size(), workersComplete);
+
+    cluster.getSolrClient().commit("parallelDestinationCollection1");
+
+    //Ensure that destinationCollection actually has the new docs.
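+    //(All five source docs should now be present, returned below in a_i order.)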
+    expression = StreamExpressionParser.parse("search(parallelDestinationCollection1, q=*:*, fl=\"id,a_s,a_i,a_f,s_multi,i_multi\", sort=\"a_i asc\")");
+    stream = new CloudSolrStream(expression, factory);
+    tuples = getTuples(stream);
+    assertEquals(5, tuples.size());
+
+    Tuple tuple = tuples.get(0);
+    assert(tuple.getLong("id") == 0);
+    assert(tuple.get("a_s").equals("hello"));
+    assert(tuple.getLong("a_i") == 0);
+    assert(tuple.getDouble("a_f") == 0.0);
+    assertList(tuple.getStrings("s_multi"), "aaaa", "bbbb");
+    assertList(tuple.getLongs("i_multi"), Long.parseLong("4"), Long.parseLong("7"));
+
+    tuple = tuples.get(1);
+    assert(tuple.getLong("id") == 1);
+    assert(tuple.get("a_s").equals("hello"));
+    assert(tuple.getLong("a_i") == 1);
+    assert(tuple.getDouble("a_f") == 1.0);
+    assertList(tuple.getStrings("s_multi"), "aaaa4", "bbbb4");
+    assertList(tuple.getLongs("i_multi"), Long.parseLong("44444"), Long.parseLong("77777"));
+
+    tuple = tuples.get(2);
+    assert(tuple.getLong("id") == 2);
+    assert(tuple.get("a_s").equals("hello"));
+    assert(tuple.getLong("a_i") == 2);
+    assert(tuple.getDouble("a_f") == 0.0);
+    assertList(tuple.getStrings("s_multi"), "aaaa1", "bbbb1");
+    assertList(tuple.getLongs("i_multi"), Long.parseLong("44"), Long.parseLong("77"));
+
+    tuple = tuples.get(3);
+    assert(tuple.getLong("id") == 3);
+    assert(tuple.get("a_s").equals("hello"));
+    assert(tuple.getLong("a_i") == 3);
+    assert(tuple.getDouble("a_f") == 3.0);
+    assertList(tuple.getStrings("s_multi"), "aaaa2", "bbbb2");
+    assertList(tuple.getLongs("i_multi"), Long.parseLong("444"), Long.parseLong("777"));
+
+    tuple = tuples.get(4);
+    assert(tuple.getLong("id") == 4);
+    assert(tuple.get("a_s").equals("hello"));
+    assert(tuple.getLong("a_i") == 4);
+    assert(tuple.getDouble("a_f") == 4.0);
+    assertList(tuple.getStrings("s_multi"), "aaaa3", "bbbb3");
+    assertList(tuple.getLongs("i_multi"), Long.parseLong("4444"), Long.parseLong("7777"));
+
+    CollectionAdminRequest.deleteCollection("parallelDestinationCollection1").process(cluster.getSolrClient());
+  }
+
+
+  ////////////////////////////////////////////
   @Test
   public void testCommitStream() throws Exception {
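A note on the FastTaxonomyFacetCounts hunk above: ConjunctionDISI.intersectIterators(...) leapfrogs the hits iterator and the BinaryDocValues iterator, so per-segment work scales with the smaller of the two sets instead of with every hit, which is what the LUCENE-7506 CHANGES entry means by "CPU in proportion to the size of the intersected set". The decoding loop the patch keeps is dense, so here is a standalone Java sketch of the byte scheme it reads; this is illustration only, not code from the patch, and the class name and its encode() helper are invented for the example. Each document's facet ordinals are stored sorted and delta-encoded, with every delta written 7 bits per byte and the high bit set on all bytes except the last.

import java.util.ArrayList;
import java.util.List;

/** Standalone sketch (hypothetical) of the delta + vInt ordinal encoding decoded above. */
public class OrdDecodeSketch {

  /** Encodes sorted ordinals as deltas, 7 bits per byte, high bit = "more bytes follow". */
  static byte[] encode(int[] sortedOrds) {
    List<Byte> out = new ArrayList<>();
    int prev = 0;
    for (int ord : sortedOrds) {
      int delta = ord - prev;
      prev = ord;
      int shift = 28;
      while (shift > 0 && (delta >>> shift) == 0) {
        shift -= 7;  // skip leading all-zero 7-bit groups
      }
      while (shift > 0) {
        out.add((byte) (0x80 | ((delta >>> shift) & 0x7F)));  // continuation byte
        shift -= 7;
      }
      out.add((byte) (delta & 0x7F));  // final byte of this delta: high bit clear
    }
    byte[] bytes = new byte[out.size()];
    for (int i = 0; i < bytes.length; i++) {
      bytes[i] = out.get(i);
    }
    return bytes;
  }

  public static void main(String[] args) {
    byte[] bytes = encode(new int[] {3, 10, 200, 100000});
    // Decode exactly as the patched loop in FastTaxonomyFacetCounts does:
    int ord = 0;
    int prev = 0;
    int offset = 0;
    while (offset < bytes.length) {
      byte b = bytes[offset++];
      if (b >= 0) {  // high bit clear: last byte of this delta
        prev = ord = ((ord << 7) | b) + prev;
        System.out.println("ord=" + ord);  // prints 3, 10, 200, 100000
        ord = 0;
      } else {       // continuation byte: accumulate 7 more bits
        ord = (ord << 7) | (b & 0x7F);
      }
    }
  }
}

Because the per-document payload carries no length prefix per ordinal, the decoder is branch-light and allocation-free, which matters since it runs once per matching document.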