mirror of https://github.com/apache/lucene.git
Merge remote-tracking branch 'origin/master'
This commit is contained in:
commit
67ba19a810
|
@ -40,6 +40,11 @@ Optimizations
|
||||||
in the sets of SHOULD and FILTER clauses, or both in MUST/FILTER and MUST_NOT
|
in the sets of SHOULD and FILTER clauses, or both in MUST/FILTER and MUST_NOT
|
||||||
clauses. (Spyros Kapnissis via Adrien Grand, Uwe Schindler)
|
clauses. (Spyros Kapnissis via Adrien Grand, Uwe Schindler)
|
||||||
|
|
||||||
|
* LUCENE-7506: FastTaxonomyFacetCounts should use CPU in proportion to
|
||||||
|
the size of the intersected set of hits from the query and documents
|
||||||
|
that have a facet value, so sparse faceting works as expected
|
||||||
|
(Adrien Grand via Mike McCandless)
|
||||||
|
|
||||||
Other
|
Other
|
||||||
|
|
||||||
* LUCENE-7328: Remove LegacyNumericEncoding from GeoPointField. (Nick Knize)
|
* LUCENE-7328: Remove LegacyNumericEncoding from GeoPointField. (Nick Knize)
|
||||||
|
@ -92,6 +97,10 @@ Bug Fixes
|
||||||
* LUCENE-7493: FacetCollector.search threw an unexpected exception if
|
* LUCENE-7493: FacetCollector.search threw an unexpected exception if
|
||||||
you asked for zero hits but wanted facets (Mahesh via Mike McCandless)
|
you asked for zero hits but wanted facets (Mahesh via Mike McCandless)
|
||||||
|
|
||||||
|
* LUCENE-7505: AnalyzingInfixSuggester returned invalid results when
|
||||||
|
allTermsRequired is false and context filters are specified (Mike
|
||||||
|
McCandless)
|
||||||
|
|
||||||
Improvements
|
Improvements
|
||||||
|
|
||||||
* LUCENE-7439: FuzzyQuery now matches all terms within the specified
|
* LUCENE-7439: FuzzyQuery now matches all terms within the specified
|
||||||
|
|
|
@ -157,7 +157,7 @@ public class DrillSideways {
|
||||||
|
|
||||||
DrillSidewaysQuery dsq = new DrillSidewaysQuery(baseQuery, drillDownCollector, drillSidewaysCollectors, drillDownQueries, scoreSubDocsAtOnce());
|
DrillSidewaysQuery dsq = new DrillSidewaysQuery(baseQuery, drillDownCollector, drillSidewaysCollectors, drillDownQueries, scoreSubDocsAtOnce());
|
||||||
if (hitCollector.needsScores() == false) {
|
if (hitCollector.needsScores() == false) {
|
||||||
// this is a borrible hack in order to make sure IndexSearcher will not
|
// this is a horrible hack in order to make sure IndexSearcher will not
|
||||||
// attempt to cache the DrillSidewaysQuery
|
// attempt to cache the DrillSidewaysQuery
|
||||||
hitCollector = new FilterCollector(hitCollector) {
|
hitCollector = new FilterCollector(hitCollector) {
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -17,12 +17,14 @@
|
||||||
package org.apache.lucene.facet.taxonomy;
|
package org.apache.lucene.facet.taxonomy;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.util.Arrays;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
import org.apache.lucene.facet.FacetsCollector.MatchingDocs;
|
import org.apache.lucene.facet.FacetsCollector.MatchingDocs;
|
||||||
import org.apache.lucene.facet.FacetsCollector;
|
import org.apache.lucene.facet.FacetsCollector;
|
||||||
import org.apache.lucene.facet.FacetsConfig;
|
import org.apache.lucene.facet.FacetsConfig;
|
||||||
import org.apache.lucene.index.BinaryDocValues;
|
import org.apache.lucene.index.BinaryDocValues;
|
||||||
|
import org.apache.lucene.search.ConjunctionDISI;
|
||||||
import org.apache.lucene.search.DocIdSetIterator;
|
import org.apache.lucene.search.DocIdSetIterator;
|
||||||
import org.apache.lucene.util.BytesRef;
|
import org.apache.lucene.util.BytesRef;
|
||||||
|
|
||||||
|
@ -55,29 +57,24 @@ public class FastTaxonomyFacetCounts extends IntTaxonomyFacets {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
DocIdSetIterator docs = hits.bits.iterator();
|
DocIdSetIterator it = ConjunctionDISI.intersectIterators(Arrays.asList(
|
||||||
|
hits.bits.iterator(), dv));
|
||||||
|
|
||||||
int doc;
|
for (int doc = it.nextDoc(); doc != DocIdSetIterator.NO_MORE_DOCS; doc = it.nextDoc()) {
|
||||||
while ((doc = docs.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) {
|
final BytesRef bytesRef = dv.binaryValue();
|
||||||
if (dv.docID() < doc) {
|
byte[] bytes = bytesRef.bytes;
|
||||||
dv.advance(doc);
|
int end = bytesRef.offset + bytesRef.length;
|
||||||
}
|
int ord = 0;
|
||||||
if (dv.docID() == doc) {
|
int offset = bytesRef.offset;
|
||||||
final BytesRef bytesRef = dv.binaryValue();
|
int prev = 0;
|
||||||
byte[] bytes = bytesRef.bytes;
|
while (offset < end) {
|
||||||
int end = bytesRef.offset + bytesRef.length;
|
byte b = bytes[offset++];
|
||||||
int ord = 0;
|
if (b >= 0) {
|
||||||
int offset = bytesRef.offset;
|
prev = ord = ((ord << 7) | b) + prev;
|
||||||
int prev = 0;
|
++values[ord];
|
||||||
while (offset < end) {
|
ord = 0;
|
||||||
byte b = bytes[offset++];
|
} else {
|
||||||
if (b >= 0) {
|
ord = (ord << 7) | (b & 0x7F);
|
||||||
prev = ord = ((ord << 7) | b) + prev;
|
|
||||||
++values[ord];
|
|
||||||
ord = 0;
|
|
||||||
} else {
|
|
||||||
ord = (ord << 7) | (b & 0x7F);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -560,12 +560,18 @@ public class AnalyzingInfixSuggester extends Lookup implements Closeable {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (allMustNot) {
|
if (allMustNot) {
|
||||||
//all are MUST_NOT: add the contextQuery to the main query instead (not as sub-query)
|
// All are MUST_NOT: add the contextQuery to the main query instead (not as sub-query)
|
||||||
for (BooleanClause clause : contextQuery.clauses()) {
|
for (BooleanClause clause : contextQuery.clauses()) {
|
||||||
query.add(clause);
|
query.add(clause);
|
||||||
}
|
}
|
||||||
|
} else if (allTermsRequired == false) {
|
||||||
|
// We must carefully upgrade the query clauses to MUST:
|
||||||
|
BooleanQuery.Builder newQuery = new BooleanQuery.Builder();
|
||||||
|
newQuery.add(query.build(), BooleanClause.Occur.MUST);
|
||||||
|
newQuery.add(contextQuery, BooleanClause.Occur.MUST);
|
||||||
|
query = newQuery;
|
||||||
} else {
|
} else {
|
||||||
//Add contextQuery as sub-query
|
// Add contextQuery as sub-query
|
||||||
query.add(contextQuery, BooleanClause.Occur.MUST);
|
query.add(contextQuery, BooleanClause.Occur.MUST);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -577,7 +583,7 @@ public class AnalyzingInfixSuggester extends Lookup implements Closeable {
|
||||||
|
|
||||||
Query finalQuery = finishQuery(query, allTermsRequired);
|
Query finalQuery = finishQuery(query, allTermsRequired);
|
||||||
|
|
||||||
//System.out.println("finalQuery=" + query);
|
//System.out.println("finalQuery=" + finalQuery);
|
||||||
|
|
||||||
// Sort by weight, descending:
|
// Sort by weight, descending:
|
||||||
TopFieldCollector c = TopFieldCollector.create(SORT, num, true, false, false);
|
TopFieldCollector c = TopFieldCollector.create(SORT, num, true, false, false);
|
||||||
|
|
|
@ -1258,4 +1258,80 @@ public class AnalyzingInfixSuggesterTest extends LuceneTestCase {
|
||||||
a.close();
|
a.close();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testContextNotAllTermsRequired() throws Exception {
|
||||||
|
|
||||||
|
Input keys[] = new Input[] {
|
||||||
|
new Input("lend me your ear", 8, new BytesRef("foobar"), asSet("foo", "bar")),
|
||||||
|
new Input("a penny saved is a penny earned", 10, new BytesRef("foobaz"), asSet("foo", "baz"))
|
||||||
|
};
|
||||||
|
Path tempDir = createTempDir("analyzingInfixContext");
|
||||||
|
|
||||||
|
Analyzer a = new MockAnalyzer(random(), MockTokenizer.WHITESPACE, false);
|
||||||
|
AnalyzingInfixSuggester suggester = new AnalyzingInfixSuggester(newFSDirectory(tempDir), a, a, 3, false);
|
||||||
|
suggester.build(new InputArrayIterator(keys));
|
||||||
|
|
||||||
|
// No context provided, all results returned
|
||||||
|
List<LookupResult> results = suggester.lookup(TestUtil.stringToCharSequence("ear", random()), 10, false, true);
|
||||||
|
assertEquals(2, results.size());
|
||||||
|
LookupResult result = results.get(0);
|
||||||
|
assertEquals("a penny saved is a penny earned", result.key);
|
||||||
|
assertEquals("a penny saved is a penny <b>ear</b>ned", result.highlightKey);
|
||||||
|
assertEquals(10, result.value);
|
||||||
|
assertEquals(new BytesRef("foobaz"), result.payload);
|
||||||
|
assertNotNull(result.contexts);
|
||||||
|
assertEquals(2, result.contexts.size());
|
||||||
|
assertTrue(result.contexts.contains(new BytesRef("foo")));
|
||||||
|
assertTrue(result.contexts.contains(new BytesRef("baz")));
|
||||||
|
|
||||||
|
result = results.get(1);
|
||||||
|
assertEquals("lend me your ear", result.key);
|
||||||
|
assertEquals("lend me your <b>ear</b>", result.highlightKey);
|
||||||
|
assertEquals(8, result.value);
|
||||||
|
assertEquals(new BytesRef("foobar"), result.payload);
|
||||||
|
assertNotNull(result.contexts);
|
||||||
|
assertEquals(2, result.contexts.size());
|
||||||
|
assertTrue(result.contexts.contains(new BytesRef("foo")));
|
||||||
|
assertTrue(result.contexts.contains(new BytesRef("bar")));
|
||||||
|
|
||||||
|
// Both have "foo" context:
|
||||||
|
results = suggester.lookup(TestUtil.stringToCharSequence("ear", random()), asSet("foo"), 10, false, true);
|
||||||
|
assertEquals(2, results.size());
|
||||||
|
|
||||||
|
result = results.get(0);
|
||||||
|
assertEquals("a penny saved is a penny earned", result.key);
|
||||||
|
assertEquals("a penny saved is a penny <b>ear</b>ned", result.highlightKey);
|
||||||
|
assertEquals(10, result.value);
|
||||||
|
assertEquals(new BytesRef("foobaz"), result.payload);
|
||||||
|
assertNotNull(result.contexts);
|
||||||
|
assertEquals(2, result.contexts.size());
|
||||||
|
assertTrue(result.contexts.contains(new BytesRef("foo")));
|
||||||
|
assertTrue(result.contexts.contains(new BytesRef("baz")));
|
||||||
|
|
||||||
|
result = results.get(1);
|
||||||
|
assertEquals("lend me your ear", result.key);
|
||||||
|
assertEquals("lend me your <b>ear</b>", result.highlightKey);
|
||||||
|
assertEquals(8, result.value);
|
||||||
|
assertEquals(new BytesRef("foobar"), result.payload);
|
||||||
|
assertNotNull(result.contexts);
|
||||||
|
assertEquals(2, result.contexts.size());
|
||||||
|
assertTrue(result.contexts.contains(new BytesRef("foo")));
|
||||||
|
assertTrue(result.contexts.contains(new BytesRef("bar")));
|
||||||
|
|
||||||
|
// Only one has "foo" context and len
|
||||||
|
results = suggester.lookup(TestUtil.stringToCharSequence("len", random()), asSet("foo"), 10, false, true);
|
||||||
|
assertEquals(1, results.size());
|
||||||
|
|
||||||
|
result = results.get(0);
|
||||||
|
assertEquals("lend me your ear", result.key);
|
||||||
|
assertEquals("<b>len</b>d me your ear", result.highlightKey);
|
||||||
|
assertEquals(8, result.value);
|
||||||
|
assertEquals(new BytesRef("foobar"), result.payload);
|
||||||
|
assertNotNull(result.contexts);
|
||||||
|
assertEquals(2, result.contexts.size());
|
||||||
|
assertTrue(result.contexts.contains(new BytesRef("foo")));
|
||||||
|
assertTrue(result.contexts.contains(new BytesRef("bar")));
|
||||||
|
|
||||||
|
suggester.close();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,6 +19,7 @@ package org.apache.solr.handler;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.lang.invoke.MethodHandles;
|
import java.lang.invoke.MethodHandles;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
|
import java.util.Collections;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
@ -97,7 +98,7 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware,
|
||||||
private StreamFactory streamFactory = new StreamFactory();
|
private StreamFactory streamFactory = new StreamFactory();
|
||||||
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
|
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
|
||||||
private String coreName;
|
private String coreName;
|
||||||
private Map<String, DaemonStream> daemons = new HashMap<>();
|
private Map<String, DaemonStream> daemons = Collections.synchronizedMap(new HashMap());
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public PermissionNameProvider.Name getPermissionName(AuthorizationContext request) {
|
public PermissionNameProvider.Name getPermissionName(AuthorizationContext request) {
|
||||||
|
@ -245,6 +246,7 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware,
|
||||||
if(daemons.containsKey(daemonStream.getId())) {
|
if(daemons.containsKey(daemonStream.getId())) {
|
||||||
daemons.remove(daemonStream.getId()).close();
|
daemons.remove(daemonStream.getId()).close();
|
||||||
}
|
}
|
||||||
|
daemonStream.setDaemons(daemons);
|
||||||
daemonStream.open(); //This will start the deamonStream
|
daemonStream.open(); //This will start the deamonStream
|
||||||
daemons.put(daemonStream.getId(), daemonStream);
|
daemons.put(daemonStream.getId(), daemonStream);
|
||||||
rsp.add("result-set", new DaemonResponseStream("Deamon:"+daemonStream.getId()+" started on "+coreName));
|
rsp.add("result-set", new DaemonResponseStream("Deamon:"+daemonStream.getId()+" started on "+coreName));
|
||||||
|
|
|
@ -52,6 +52,8 @@ public class DaemonStream extends TupleStream implements Expressible {
|
||||||
private Exception exception;
|
private Exception exception;
|
||||||
private long runInterval;
|
private long runInterval;
|
||||||
private String id;
|
private String id;
|
||||||
|
private Map<String, DaemonStream> daemons;
|
||||||
|
private boolean terminate;
|
||||||
private boolean closed = false;
|
private boolean closed = false;
|
||||||
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
|
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
|
||||||
|
|
||||||
|
@ -64,10 +66,13 @@ public class DaemonStream extends TupleStream implements Expressible {
|
||||||
StreamExpressionNamedParameter idExpression = factory.getNamedOperand(expression, "id");
|
StreamExpressionNamedParameter idExpression = factory.getNamedOperand(expression, "id");
|
||||||
StreamExpressionNamedParameter runExpression = factory.getNamedOperand(expression, "runInterval");
|
StreamExpressionNamedParameter runExpression = factory.getNamedOperand(expression, "runInterval");
|
||||||
StreamExpressionNamedParameter queueExpression = factory.getNamedOperand(expression, "queueSize");
|
StreamExpressionNamedParameter queueExpression = factory.getNamedOperand(expression, "queueSize");
|
||||||
|
StreamExpressionNamedParameter terminateExpression = factory.getNamedOperand(expression, "terminate");
|
||||||
|
|
||||||
|
|
||||||
String id = null;
|
String id = null;
|
||||||
long runInterval = 0L;
|
long runInterval = 0L;
|
||||||
int queueSize = 0;
|
int queueSize = 0;
|
||||||
|
boolean terminate = false;
|
||||||
|
|
||||||
if(idExpression == null) {
|
if(idExpression == null) {
|
||||||
throw new IOException("Invalid expression id parameter expected");
|
throw new IOException("Invalid expression id parameter expected");
|
||||||
|
@ -82,24 +87,26 @@ public class DaemonStream extends TupleStream implements Expressible {
|
||||||
}
|
}
|
||||||
|
|
||||||
if(queueExpression != null) {
|
if(queueExpression != null) {
|
||||||
queueSize= Integer.parseInt(((StreamExpressionValue)queueExpression.getParameter()).getValue());
|
queueSize= Integer.parseInt(((StreamExpressionValue) queueExpression.getParameter()).getValue());
|
||||||
}
|
}
|
||||||
|
|
||||||
// validate expression contains only what we want.
|
if(terminateExpression != null) {
|
||||||
if(expression.getParameters().size() != streamExpressions.size() + 2 &&
|
terminate = Boolean.parseBoolean(((StreamExpressionValue) terminateExpression.getParameter()).getValue());
|
||||||
expression.getParameters().size() != streamExpressions.size() + 3) {
|
|
||||||
throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - unknown operands found", expression));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if(1 != streamExpressions.size()){
|
if(1 != streamExpressions.size()){
|
||||||
throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expecting a single stream but found %d",expression, streamExpressions.size()));
|
throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expecting a single stream but found %d",expression, streamExpressions.size()));
|
||||||
}
|
}
|
||||||
|
|
||||||
init(tupleStream, id, runInterval, queueSize);
|
init(tupleStream, id, runInterval, queueSize, terminate);
|
||||||
|
}
|
||||||
|
|
||||||
|
public DaemonStream(TupleStream tupleStream, String id, long runInterval, int queueSize, boolean terminate) {
|
||||||
|
init(tupleStream, id, runInterval, queueSize, terminate);
|
||||||
}
|
}
|
||||||
|
|
||||||
public DaemonStream(TupleStream tupleStream, String id, long runInterval, int queueSize) {
|
public DaemonStream(TupleStream tupleStream, String id, long runInterval, int queueSize) {
|
||||||
init(tupleStream, id, runInterval, queueSize);
|
this(tupleStream, id, runInterval, queueSize, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -126,6 +133,7 @@ public class DaemonStream extends TupleStream implements Expressible {
|
||||||
expression.addParameter(new StreamExpressionNamedParameter("id", id));
|
expression.addParameter(new StreamExpressionNamedParameter("id", id));
|
||||||
expression.addParameter(new StreamExpressionNamedParameter("runInterval", Long.toString(runInterval)));
|
expression.addParameter(new StreamExpressionNamedParameter("runInterval", Long.toString(runInterval)));
|
||||||
expression.addParameter(new StreamExpressionNamedParameter("queueSize", Integer.toString(queueSize)));
|
expression.addParameter(new StreamExpressionNamedParameter("queueSize", Integer.toString(queueSize)));
|
||||||
|
expression.addParameter(new StreamExpressionNamedParameter("terminate", Boolean.toString(terminate)));
|
||||||
|
|
||||||
return expression;
|
return expression;
|
||||||
}
|
}
|
||||||
|
@ -148,10 +156,16 @@ public class DaemonStream extends TupleStream implements Expressible {
|
||||||
}
|
}
|
||||||
|
|
||||||
public void init(TupleStream tupleStream, String id, long runInterval, int queueSize) {
|
public void init(TupleStream tupleStream, String id, long runInterval, int queueSize) {
|
||||||
|
init(tupleStream, id, runInterval, queueSize, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void init(TupleStream tupleStream, String id, long runInterval, int queueSize, boolean terminate) {
|
||||||
this.tupleStream = tupleStream;
|
this.tupleStream = tupleStream;
|
||||||
this.id = id;
|
this.id = id;
|
||||||
this.runInterval = runInterval;
|
this.runInterval = runInterval;
|
||||||
this.queueSize = queueSize;
|
this.queueSize = queueSize;
|
||||||
|
this.terminate = terminate;
|
||||||
|
|
||||||
if(queueSize > 0) {
|
if(queueSize > 0) {
|
||||||
queue = new ArrayBlockingQueue(queueSize);
|
queue = new ArrayBlockingQueue(queueSize);
|
||||||
eatTuples = false;
|
eatTuples = false;
|
||||||
|
@ -228,6 +242,10 @@ public class DaemonStream extends TupleStream implements Expressible {
|
||||||
return tuple;
|
return tuple;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void setDaemons(Map<String, DaemonStream> daemons) {
|
||||||
|
this.daemons = daemons;
|
||||||
|
}
|
||||||
|
|
||||||
private synchronized void incrementIterations() {
|
private synchronized void incrementIterations() {
|
||||||
++iterations;
|
++iterations;
|
||||||
}
|
}
|
||||||
|
@ -279,6 +297,18 @@ public class DaemonStream extends TupleStream implements Expressible {
|
||||||
errors = 0; // Reset errors on successful run.
|
errors = 0; // Reset errors on successful run.
|
||||||
if (tuple.fields.containsKey("sleepMillis")) {
|
if (tuple.fields.containsKey("sleepMillis")) {
|
||||||
this.sleepMillis = tuple.getLong("sleepMillis");
|
this.sleepMillis = tuple.getLong("sleepMillis");
|
||||||
|
|
||||||
|
if(terminate && sleepMillis > 0) {
|
||||||
|
//TopicStream provides sleepMillis > 0 if the last run had no Tuples.
|
||||||
|
//This means the topic queue is empty. Time to terminate.
|
||||||
|
//Remove ourselves from the daemons map.
|
||||||
|
if(daemons != null) {
|
||||||
|
daemons.remove(id);
|
||||||
|
}
|
||||||
|
//Break out of the thread loop and end the run.
|
||||||
|
break OUTER;
|
||||||
|
}
|
||||||
|
|
||||||
this.runInterval = -1;
|
this.runInterval = -1;
|
||||||
}
|
}
|
||||||
break INNER;
|
break INNER;
|
||||||
|
|
|
@ -533,24 +533,24 @@ public class StreamExpressionTest extends SolrCloudTestCase {
|
||||||
|
|
||||||
// Basic test desc
|
// Basic test desc
|
||||||
expression = StreamExpressionParser.parse("top("
|
expression = StreamExpressionParser.parse("top("
|
||||||
+ "n=2,"
|
+ "n=2,"
|
||||||
+ "unique("
|
+ "unique("
|
||||||
+ "search(" + COLLECTION + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f desc\"),"
|
+ "search(" + COLLECTION + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f desc\"),"
|
||||||
+ "over=\"a_f\"),"
|
+ "over=\"a_f\"),"
|
||||||
+ "sort=\"a_f desc\")");
|
+ "sort=\"a_f desc\")");
|
||||||
stream = new RankStream(expression, factory);
|
stream = new RankStream(expression, factory);
|
||||||
tuples = getTuples(stream);
|
tuples = getTuples(stream);
|
||||||
|
|
||||||
assert(tuples.size() == 2);
|
assert(tuples.size() == 2);
|
||||||
assertOrder(tuples, 4,3);
|
assertOrder(tuples, 4, 3);
|
||||||
|
|
||||||
// full factory
|
// full factory
|
||||||
stream = factory.constructStream("top("
|
stream = factory.constructStream("top("
|
||||||
+ "n=4,"
|
+ "n=4,"
|
||||||
+ "unique("
|
+ "unique("
|
||||||
+ "search(" + COLLECTION + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\"),"
|
+ "search(" + COLLECTION + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\"),"
|
||||||
+ "over=\"a_f\"),"
|
+ "over=\"a_f\"),"
|
||||||
+ "sort=\"a_f asc\")");
|
+ "sort=\"a_f asc\")");
|
||||||
tuples = getTuples(stream);
|
tuples = getTuples(stream);
|
||||||
|
|
||||||
assert(tuples.size() == 4);
|
assert(tuples.size() == 4);
|
||||||
|
@ -827,7 +827,7 @@ public class StreamExpressionTest extends SolrCloudTestCase {
|
||||||
.withFunctionName("parallel", ParallelStream.class)
|
.withFunctionName("parallel", ParallelStream.class)
|
||||||
.withFunctionName("fetch", FetchStream.class);
|
.withFunctionName("fetch", FetchStream.class);
|
||||||
|
|
||||||
stream = factory.constructStream("parallel("+COLLECTION+", workers=2, sort=\"a_f asc\", fetch("+COLLECTION+", search(" + COLLECTION + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\", partitionKeys=\"id\"), on=\"id=a_i\", batchSize=\"2\", fl=\"subject\"))");
|
stream = factory.constructStream("parallel(" + COLLECTION + ", workers=2, sort=\"a_f asc\", fetch(" + COLLECTION + ", search(" + COLLECTION + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\", partitionKeys=\"id\"), on=\"id=a_i\", batchSize=\"2\", fl=\"subject\"))");
|
||||||
tuples = getTuples(stream);
|
tuples = getTuples(stream);
|
||||||
|
|
||||||
assert(tuples.size() == 10);
|
assert(tuples.size() == 10);
|
||||||
|
@ -853,7 +853,7 @@ public class StreamExpressionTest extends SolrCloudTestCase {
|
||||||
assertTrue("blah blah blah 9".equals(t.getString("subject")));
|
assertTrue("blah blah blah 9".equals(t.getString("subject")));
|
||||||
|
|
||||||
|
|
||||||
stream = factory.constructStream("parallel("+COLLECTION+", workers=2, sort=\"a_f asc\", fetch("+COLLECTION+", search(" + COLLECTION + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\", partitionKeys=\"id\"), on=\"id=a_i\", batchSize=\"3\", fl=\"subject\"))");
|
stream = factory.constructStream("parallel(" + COLLECTION + ", workers=2, sort=\"a_f asc\", fetch(" + COLLECTION + ", search(" + COLLECTION + ", q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc\", partitionKeys=\"id\"), on=\"id=a_i\", batchSize=\"3\", fl=\"subject\"))");
|
||||||
tuples = getTuples(stream);
|
tuples = getTuples(stream);
|
||||||
|
|
||||||
assert(tuples.size() == 10);
|
assert(tuples.size() == 10);
|
||||||
|
@ -1003,6 +1003,45 @@ public class StreamExpressionTest extends SolrCloudTestCase {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testTerminatingDaemonStream() throws Exception {
|
||||||
|
|
||||||
|
new UpdateRequest()
|
||||||
|
.add(id, "0", "a_s", "hello", "a_i", "0", "a_f", "1")
|
||||||
|
.add(id, "2", "a_s", "hello", "a_i", "2", "a_f", "2")
|
||||||
|
.add(id, "3", "a_s", "hello", "a_i", "3", "a_f", "3")
|
||||||
|
.add(id, "4", "a_s", "hello", "a_i", "4", "a_f", "4")
|
||||||
|
.add(id, "1", "a_s", "hello", "a_i", "1", "a_f", "5")
|
||||||
|
.add(id, "5", "a_s", "hello", "a_i", "10", "a_f", "6")
|
||||||
|
.add(id, "6", "a_s", "hello", "a_i", "11", "a_f", "7")
|
||||||
|
.add(id, "7", "a_s", "hello", "a_i", "12", "a_f", "8")
|
||||||
|
.add(id, "8", "a_s", "hello", "a_i", "13", "a_f", "9")
|
||||||
|
.add(id, "9", "a_s", "hello", "a_i", "14", "a_f", "10")
|
||||||
|
.commit(cluster.getSolrClient(), COLLECTION);
|
||||||
|
|
||||||
|
StreamFactory factory = new StreamFactory()
|
||||||
|
.withCollectionZkHost(COLLECTION, cluster.getZkServer().getZkAddress())
|
||||||
|
.withFunctionName("topic", TopicStream.class)
|
||||||
|
.withFunctionName("daemon", DaemonStream.class);
|
||||||
|
|
||||||
|
StreamExpression expression;
|
||||||
|
DaemonStream daemonStream;
|
||||||
|
|
||||||
|
SolrClientCache cache = new SolrClientCache();
|
||||||
|
StreamContext context = new StreamContext();
|
||||||
|
context.setSolrClientCache(cache);
|
||||||
|
expression = StreamExpressionParser.parse("daemon(topic("+COLLECTION+","+COLLECTION+", q=\"a_s:hello\", initialCheckpoint=0, id=\"topic1\", rows=2, fl=\"id\""
|
||||||
|
+ "), id=test, runInterval=1000, terminate=true, queueSize=50)");
|
||||||
|
daemonStream = (DaemonStream)factory.constructStream(expression);
|
||||||
|
daemonStream.setStreamContext(context);
|
||||||
|
|
||||||
|
List<Tuple> tuples = getTuples(daemonStream);
|
||||||
|
assertTrue(tuples.size() == 10);
|
||||||
|
cache.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testRollupStream() throws Exception {
|
public void testRollupStream() throws Exception {
|
||||||
|
|
||||||
|
@ -1367,7 +1406,7 @@ public class StreamExpressionTest extends SolrCloudTestCase {
|
||||||
|
|
||||||
|
|
||||||
assert(tuples.size() == 9);
|
assert(tuples.size() == 9);
|
||||||
assertOrder(tuples, 0,1,2,3,4,7,6,8,9);
|
assertOrder(tuples, 0, 1, 2, 3, 4, 7, 6, 8, 9);
|
||||||
|
|
||||||
//Test descending
|
//Test descending
|
||||||
|
|
||||||
|
@ -1376,7 +1415,7 @@ public class StreamExpressionTest extends SolrCloudTestCase {
|
||||||
tuples = getTuples(pstream);
|
tuples = getTuples(pstream);
|
||||||
|
|
||||||
assert(tuples.size() == 8);
|
assert(tuples.size() == 8);
|
||||||
assertOrder(tuples, 9,8,6,4,3,2,1,0);
|
assertOrder(tuples, 9, 8, 6, 4, 3, 2, 1, 0);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1627,7 +1666,7 @@ public class StreamExpressionTest extends SolrCloudTestCase {
|
||||||
stream = new LeftOuterJoinStream(expression, factory);
|
stream = new LeftOuterJoinStream(expression, factory);
|
||||||
tuples = getTuples(stream);
|
tuples = getTuples(stream);
|
||||||
assert(tuples.size() == 10);
|
assert(tuples.size() == 10);
|
||||||
assertOrder(tuples, 7,6,3,4,5,1,1,15,15,2);
|
assertOrder(tuples, 7, 6, 3, 4, 5, 1, 1, 15, 15, 2);
|
||||||
|
|
||||||
// Results in both searches, no join matches
|
// Results in both searches, no join matches
|
||||||
expression = StreamExpressionParser.parse("leftOuterJoin("
|
expression = StreamExpressionParser.parse("leftOuterJoin("
|
||||||
|
@ -1637,7 +1676,7 @@ public class StreamExpressionTest extends SolrCloudTestCase {
|
||||||
stream = new LeftOuterJoinStream(expression, factory);
|
stream = new LeftOuterJoinStream(expression, factory);
|
||||||
tuples = getTuples(stream);
|
tuples = getTuples(stream);
|
||||||
assert(tuples.size() == 8);
|
assert(tuples.size() == 8);
|
||||||
assertOrder(tuples, 1,15,2,3,4,5,6,7);
|
assertOrder(tuples, 1, 15, 2, 3, 4, 5, 6, 7);
|
||||||
|
|
||||||
// Differing field names
|
// Differing field names
|
||||||
expression = StreamExpressionParser.parse("leftOuterJoin("
|
expression = StreamExpressionParser.parse("leftOuterJoin("
|
||||||
|
@ -1647,7 +1686,7 @@ public class StreamExpressionTest extends SolrCloudTestCase {
|
||||||
stream = new LeftOuterJoinStream(expression, factory);
|
stream = new LeftOuterJoinStream(expression, factory);
|
||||||
tuples = getTuples(stream);
|
tuples = getTuples(stream);
|
||||||
assert(tuples.size() == 10);
|
assert(tuples.size() == 10);
|
||||||
assertOrder(tuples, 1,1,15,15,2,3,4,5,6,7);
|
assertOrder(tuples, 1, 1, 15, 15, 2, 3, 4, 5, 6, 7);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1764,7 +1803,7 @@ public class StreamExpressionTest extends SolrCloudTestCase {
|
||||||
stream = new OuterHashJoinStream(expression, factory);
|
stream = new OuterHashJoinStream(expression, factory);
|
||||||
tuples = getTuples(stream);
|
tuples = getTuples(stream);
|
||||||
assert(tuples.size() == 10);
|
assert(tuples.size() == 10);
|
||||||
assertOrder(tuples, 1,1,15,15,2,3,4,5,6,7);
|
assertOrder(tuples, 1, 1, 15, 15, 2, 3, 4, 5, 6, 7);
|
||||||
|
|
||||||
// Basic desc
|
// Basic desc
|
||||||
expression = StreamExpressionParser.parse("outerHashJoin("
|
expression = StreamExpressionParser.parse("outerHashJoin("
|
||||||
|
@ -1794,7 +1833,7 @@ public class StreamExpressionTest extends SolrCloudTestCase {
|
||||||
stream = new OuterHashJoinStream(expression, factory);
|
stream = new OuterHashJoinStream(expression, factory);
|
||||||
tuples = getTuples(stream);
|
tuples = getTuples(stream);
|
||||||
assert(tuples.size() == 10);
|
assert(tuples.size() == 10);
|
||||||
assertOrder(tuples, 1,1,15,15,2,3,4,5,6,7);
|
assertOrder(tuples, 1, 1, 15, 15, 2, 3, 4, 5, 6, 7);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -3202,6 +3241,120 @@ public class StreamExpressionTest extends SolrCloudTestCase {
|
||||||
CollectionAdminRequest.deleteCollection("parallelDestinationCollection1").process(cluster.getSolrClient());
|
CollectionAdminRequest.deleteCollection("parallelDestinationCollection1").process(cluster.getSolrClient());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testParallelTerminatingDaemonUpdateStream() throws Exception {
|
||||||
|
|
||||||
|
CollectionAdminRequest.createCollection("parallelDestinationCollection1", "conf", 2, 1).process(cluster.getSolrClient());
|
||||||
|
AbstractDistribZkTestBase.waitForRecoveriesToFinish("parallelDestinationCollection1", cluster.getSolrClient().getZkStateReader(),
|
||||||
|
false, true, TIMEOUT);
|
||||||
|
|
||||||
|
new UpdateRequest()
|
||||||
|
.add(id, "0", "a_s", "hello", "a_i", "0", "a_f", "0", "s_multi", "aaaa", "s_multi", "bbbb", "i_multi", "4", "i_multi", "7")
|
||||||
|
.add(id, "2", "a_s", "hello", "a_i", "2", "a_f", "0", "s_multi", "aaaa1", "s_multi", "bbbb1", "i_multi", "44", "i_multi", "77")
|
||||||
|
.add(id, "3", "a_s", "hello", "a_i", "3", "a_f", "3", "s_multi", "aaaa2", "s_multi", "bbbb2", "i_multi", "444", "i_multi", "777")
|
||||||
|
.add(id, "4", "a_s", "hello", "a_i", "4", "a_f", "4", "s_multi", "aaaa3", "s_multi", "bbbb3", "i_multi", "4444", "i_multi", "7777")
|
||||||
|
.add(id, "1", "a_s", "hello", "a_i", "1", "a_f", "1", "s_multi", "aaaa4", "s_multi", "bbbb4", "i_multi", "44444", "i_multi", "77777")
|
||||||
|
.commit(cluster.getSolrClient(), "collection1");
|
||||||
|
|
||||||
|
StreamExpression expression;
|
||||||
|
TupleStream stream;
|
||||||
|
Tuple t;
|
||||||
|
|
||||||
|
String zkHost = cluster.getZkServer().getZkAddress();
|
||||||
|
StreamFactory factory = new StreamFactory()
|
||||||
|
.withCollectionZkHost("collection1", cluster.getZkServer().getZkAddress())
|
||||||
|
.withCollectionZkHost("parallelDestinationCollection1", cluster.getZkServer().getZkAddress())
|
||||||
|
.withFunctionName("topic", TopicStream.class)
|
||||||
|
.withFunctionName("update", UpdateStream.class)
|
||||||
|
.withFunctionName("parallel", ParallelStream.class)
|
||||||
|
.withFunctionName("daemon", DaemonStream.class);
|
||||||
|
|
||||||
|
//Copy all docs to destinationCollection
|
||||||
|
String updateExpression = "daemon(update(parallelDestinationCollection1, batchSize=2, topic(collection1, collection1, q=\"a_s:hello\", fl=\"id,a_s,a_i,a_f,s_multi,i_multi\", partitionKeys=\"a_f\", initialCheckpoint=0, id=\"topic1\")), terminate=true, runInterval=\"1000\", id=\"test\")";
|
||||||
|
TupleStream parallelUpdateStream = factory.constructStream("parallel(collection1, " + updateExpression + ", workers=\"2\", zkHost=\""+zkHost+"\", sort=\"batchNumber asc\")");
|
||||||
|
List<Tuple> tuples = getTuples(parallelUpdateStream);
|
||||||
|
assert(tuples.size() == 2);
|
||||||
|
|
||||||
|
|
||||||
|
ModifiableSolrParams sParams = new ModifiableSolrParams(StreamingTest.mapParams(CommonParams.QT, "/stream", "action", "list"));
|
||||||
|
|
||||||
|
int workersComplete = 0;
|
||||||
|
|
||||||
|
//Daemons should terminate after the topic is completed
|
||||||
|
//Loop through all shards and wait for the daemons to be gone from the listing.
|
||||||
|
for(JettySolrRunner jetty : cluster.getJettySolrRunners()) {
|
||||||
|
INNER:
|
||||||
|
while(true) {
|
||||||
|
SolrStream solrStream = new SolrStream(jetty.getBaseUrl().toString() + "/collection1", sParams);
|
||||||
|
solrStream.open();
|
||||||
|
Tuple tupleResponse = solrStream.read();
|
||||||
|
if (tupleResponse.EOF) {
|
||||||
|
solrStream.close();
|
||||||
|
++workersComplete;
|
||||||
|
break INNER;
|
||||||
|
} else {
|
||||||
|
solrStream.close();
|
||||||
|
Thread.sleep(1000);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
assertEquals(cluster.getJettySolrRunners().size(), workersComplete);
|
||||||
|
|
||||||
|
cluster.getSolrClient().commit("parallelDestinationCollection1");
|
||||||
|
|
||||||
|
//Ensure that destinationCollection actually has the new docs.
|
||||||
|
expression = StreamExpressionParser.parse("search(parallelDestinationCollection1, q=*:*, fl=\"id,a_s,a_i,a_f,s_multi,i_multi\", sort=\"a_i asc\")");
|
||||||
|
stream = new CloudSolrStream(expression, factory);
|
||||||
|
tuples = getTuples(stream);
|
||||||
|
assertEquals(5, tuples.size());
|
||||||
|
|
||||||
|
Tuple tuple = tuples.get(0);
|
||||||
|
assert(tuple.getLong("id") == 0);
|
||||||
|
assert(tuple.get("a_s").equals("hello"));
|
||||||
|
assert(tuple.getLong("a_i") == 0);
|
||||||
|
assert(tuple.getDouble("a_f") == 0.0);
|
||||||
|
assertList(tuple.getStrings("s_multi"), "aaaa", "bbbb");
|
||||||
|
assertList(tuple.getLongs("i_multi"), Long.parseLong("4"), Long.parseLong("7"));
|
||||||
|
|
||||||
|
tuple = tuples.get(1);
|
||||||
|
assert(tuple.getLong("id") == 1);
|
||||||
|
assert(tuple.get("a_s").equals("hello"));
|
||||||
|
assert(tuple.getLong("a_i") == 1);
|
||||||
|
assert(tuple.getDouble("a_f") == 1.0);
|
||||||
|
assertList(tuple.getStrings("s_multi"), "aaaa4", "bbbb4");
|
||||||
|
assertList(tuple.getLongs("i_multi"), Long.parseLong("44444"), Long.parseLong("77777"));
|
||||||
|
|
||||||
|
tuple = tuples.get(2);
|
||||||
|
assert(tuple.getLong("id") == 2);
|
||||||
|
assert(tuple.get("a_s").equals("hello"));
|
||||||
|
assert(tuple.getLong("a_i") == 2);
|
||||||
|
assert(tuple.getDouble("a_f") == 0.0);
|
||||||
|
assertList(tuple.getStrings("s_multi"), "aaaa1", "bbbb1");
|
||||||
|
assertList(tuple.getLongs("i_multi"), Long.parseLong("44"), Long.parseLong("77"));
|
||||||
|
|
||||||
|
tuple = tuples.get(3);
|
||||||
|
assert(tuple.getLong("id") == 3);
|
||||||
|
assert(tuple.get("a_s").equals("hello"));
|
||||||
|
assert(tuple.getLong("a_i") == 3);
|
||||||
|
assert(tuple.getDouble("a_f") == 3.0);
|
||||||
|
assertList(tuple.getStrings("s_multi"), "aaaa2", "bbbb2");
|
||||||
|
assertList(tuple.getLongs("i_multi"), Long.parseLong("444"), Long.parseLong("777"));
|
||||||
|
|
||||||
|
tuple = tuples.get(4);
|
||||||
|
assert(tuple.getLong("id") == 4);
|
||||||
|
assert(tuple.get("a_s").equals("hello"));
|
||||||
|
assert(tuple.getLong("a_i") == 4);
|
||||||
|
assert(tuple.getDouble("a_f") == 4.0);
|
||||||
|
assertList(tuple.getStrings("s_multi"), "aaaa3", "bbbb3");
|
||||||
|
assertList(tuple.getLongs("i_multi"), Long.parseLong("4444"), Long.parseLong("7777"));
|
||||||
|
|
||||||
|
CollectionAdminRequest.deleteCollection("parallelDestinationCollection1").process(cluster.getSolrClient());
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
////////////////////////////////////////////
|
////////////////////////////////////////////
|
||||||
@Test
|
@Test
|
||||||
public void testCommitStream() throws Exception {
|
public void testCommitStream() throws Exception {
|
||||||
|
|
Loading…
Reference in New Issue