SOLR-8550: Add asynchronous DaemonStreams to the Streaming API

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1726291 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Joel Bernstein 2016-01-22 19:07:15 +00:00
parent f4cdb556e4
commit 077e78f417
6 changed files with 735 additions and 21 deletions

View File

@ -140,6 +140,8 @@ New Features
* SOLR-8556: Add ConcatOperation to be used with the SelectStream (Joel Bernstein, Dennis Gove) * SOLR-8556: Add ConcatOperation to be used with the SelectStream (Joel Bernstein, Dennis Gove)
* SOLR-8550: Add asynchronous DaemonStreams to the Streaming API (Joel Bernstein)
Bug Fixes Bug Fixes
---------------------- ----------------------
* SOLR-8386: Add field option in the new admin UI schema page loads up even when no schemaFactory has been * SOLR-8386: Add field option in the new admin UI schema page loads up even when no schemaFactory has been

View File

@ -19,7 +19,9 @@ package org.apache.solr.handler;
import java.io.IOException; import java.io.IOException;
import java.lang.invoke.MethodHandles; import java.lang.invoke.MethodHandles;
import java.util.Collection;
import java.util.HashMap; import java.util.HashMap;
import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
@ -31,26 +33,7 @@ import org.apache.solr.client.solrj.io.ops.ConcatOperation;
import org.apache.solr.client.solrj.io.ops.DistinctOperation; import org.apache.solr.client.solrj.io.ops.DistinctOperation;
import org.apache.solr.client.solrj.io.ops.GroupOperation; import org.apache.solr.client.solrj.io.ops.GroupOperation;
import org.apache.solr.client.solrj.io.ops.ReplaceOperation; import org.apache.solr.client.solrj.io.ops.ReplaceOperation;
import org.apache.solr.client.solrj.io.stream.CloudSolrStream; import org.apache.solr.client.solrj.io.stream.*;
import org.apache.solr.client.solrj.io.stream.ComplementStream;
import org.apache.solr.client.solrj.io.stream.ExceptionStream;
import org.apache.solr.client.solrj.io.stream.FacetStream;
import org.apache.solr.client.solrj.io.stream.HashJoinStream;
import org.apache.solr.client.solrj.io.stream.InnerJoinStream;
import org.apache.solr.client.solrj.io.stream.IntersectStream;
import org.apache.solr.client.solrj.io.stream.JDBCStream;
import org.apache.solr.client.solrj.io.stream.LeftOuterJoinStream;
import org.apache.solr.client.solrj.io.stream.MergeStream;
import org.apache.solr.client.solrj.io.stream.OuterHashJoinStream;
import org.apache.solr.client.solrj.io.stream.ParallelStream;
import org.apache.solr.client.solrj.io.stream.RankStream;
import org.apache.solr.client.solrj.io.stream.ReducerStream;
import org.apache.solr.client.solrj.io.stream.RollupStream;
import org.apache.solr.client.solrj.io.stream.StatsStream;
import org.apache.solr.client.solrj.io.stream.StreamContext;
import org.apache.solr.client.solrj.io.stream.TupleStream;
import org.apache.solr.client.solrj.io.stream.UniqueStream;
import org.apache.solr.client.solrj.io.stream.UpdateStream;
import org.apache.solr.client.solrj.io.stream.expr.Expressible; import org.apache.solr.client.solrj.io.stream.expr.Expressible;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory; import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
import org.apache.solr.client.solrj.io.stream.metrics.CountMetric; import org.apache.solr.client.solrj.io.stream.metrics.CountMetric;
@ -78,6 +61,7 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware {
private StreamFactory streamFactory = new StreamFactory(); private StreamFactory streamFactory = new StreamFactory();
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private String coreName; private String coreName;
private Map<String, DaemonStream> daemons = new HashMap();
public void inform(SolrCore core) { public void inform(SolrCore core) {
@ -123,6 +107,7 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware {
.withFunctionName("jdbc", JDBCStream.class) .withFunctionName("jdbc", JDBCStream.class)
.withFunctionName("intersect", IntersectStream.class) .withFunctionName("intersect", IntersectStream.class)
.withFunctionName("complement", ComplementStream.class) .withFunctionName("complement", ComplementStream.class)
.withFunctionName("daemon", DaemonStream.class)
// metrics // metrics
.withFunctionName("min", MinMetric.class) .withFunctionName("min", MinMetric.class)
@ -167,6 +152,12 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware {
SolrParams params = req.getParams(); SolrParams params = req.getParams();
params = adjustParams(params); params = adjustParams(params);
req.setParams(params); req.setParams(params);
if(params.get("action") != null) {
handleAdmin(req, rsp, params);
return;
}
TupleStream tupleStream = null; TupleStream tupleStream = null;
try { try {
@ -187,7 +178,46 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware {
context.setSolrClientCache(clientCache); context.setSolrClientCache(clientCache);
context.put("core", this.coreName); context.put("core", this.coreName);
tupleStream.setStreamContext(context); tupleStream.setStreamContext(context);
rsp.add("result-set", new TimerStream(new ExceptionStream(tupleStream))); if(tupleStream instanceof DaemonStream) {
DaemonStream daemonStream = (DaemonStream)tupleStream;
if(daemons.containsKey(daemonStream.getId())) {
daemons.remove(daemonStream.getId()).close();
}
daemonStream.open(); //This will start the deamonStream
daemons.put(daemonStream.getId(), daemonStream);
rsp.add("result-set", new DaemonResponseStream("Deamon:"+daemonStream.getId()+" started on "+coreName));
} else {
rsp.add("result-set", new TimerStream(new ExceptionStream(tupleStream)));
}
}
private void handleAdmin(SolrQueryRequest req, SolrQueryResponse rsp, SolrParams params) {
String action = params.get("action");
if("stop".equalsIgnoreCase(action)) {
String id = params.get("id");
DaemonStream d = daemons.get(id);
if(d != null) {
d.close();
rsp.add("result-set", new DaemonResponseStream("Deamon:" + id + " stopped on " + coreName));
} else {
rsp.add("result-set", new DaemonResponseStream("Deamon:" + id + " not found on " + coreName));
}
} else if("start".equalsIgnoreCase(action)) {
String id = params.get("id");
DaemonStream d = daemons.get(id);
d.open();
rsp.add("result-set", new DaemonResponseStream("Deamon:" + id + " started on " + coreName));
} else if("list".equalsIgnoreCase(action)) {
Collection<DaemonStream> vals = daemons.values();
rsp.add("result-set", new DaemonCollectionStream(vals));
} else if("kill".equalsIgnoreCase(action)) {
String id = params.get("id");
DaemonStream d = daemons.remove(id);
if (d != null) {
d.close();
}
rsp.add("result-set", new DaemonResponseStream("Deamon:" + id + " killed on " + coreName));
}
} }
private SolrParams adjustParams(SolrParams params) { private SolrParams adjustParams(SolrParams params) {
@ -238,6 +268,78 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware {
} }
} }
public static class DaemonCollectionStream extends TupleStream {
private Iterator<DaemonStream> it;
public DaemonCollectionStream(Collection<DaemonStream> col) {
this.it = col.iterator();
}
public StreamComparator getStreamSort() {
return null;
}
public void close() {
}
public void open() {
}
public void setStreamContext(StreamContext context) {
}
public List<TupleStream> children() {
return null;
}
public Tuple read() {
if(it.hasNext()) {
return it.next().getInfo();
} else {
Map m = new HashMap();
m.put("EOF", true);
return new Tuple(m);
}
}
}
public static class DaemonResponseStream extends TupleStream {
private String message;
private boolean sendEOF = false;
public DaemonResponseStream(String message) {
this.message = message;
}
public StreamComparator getStreamSort() {
return null;
}
public void close() {
}
public void open() {
}
public void setStreamContext(StreamContext context) {
}
public List<TupleStream> children() {
return null;
}
public Tuple read() {
if (sendEOF) {
Map m = new HashMap();
m.put("EOF", true);
return new Tuple(m);
} else {
sendEOF = true;
Map m = new HashMap();
m.put("DaemonOp",message);
return new Tuple(m);
}
}
}
public static class TimerStream extends TupleStream { public static class TimerStream extends TupleStream {
private long begin; private long begin;

View File

@ -0,0 +1,292 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.client.solrj.io.stream;
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.comp.StreamComparator;
import org.apache.solr.client.solrj.io.stream.expr.Expressible;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.lang.invoke.MethodHandles;
import java.util.Locale;
import java.util.Map;
import java.util.HashMap;
import java.util.Date;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class DaemonStream extends TupleStream implements Expressible {
private TupleStream tupleStream;
private StreamRunner streamRunner;
private ArrayBlockingQueue<Tuple> queue;
private int queueSize;
private boolean eatTuples;
private long iterations;
private long startTime;
private long stopTime;
private Exception exception;
private long runInterval;
private String id;
private boolean closed = false;
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
public DaemonStream(StreamExpression expression, StreamFactory factory) throws IOException{
List<StreamExpression> streamExpressions = factory.getExpressionOperandsRepresentingTypes(expression, Expressible.class, TupleStream.class);
TupleStream tupleStream = factory.constructStream(streamExpressions.get(0));
StreamExpressionNamedParameter idExpression = factory.getNamedOperand(expression, "id");
StreamExpressionNamedParameter runExpression = factory.getNamedOperand(expression, "runInterval");
StreamExpressionNamedParameter queueExpression = factory.getNamedOperand(expression, "queueSize");
String id = null;
long runInterval = 0L;
int queueSize = 0;
if(idExpression == null) {
throw new IOException("Invalid expression id parameter expected");
} else {
id = ((StreamExpressionValue) idExpression.getParameter()).getValue();
}
if(runExpression == null) {
throw new IOException("Invalid expression runInterval parameter expected");
} else {
runInterval = Long.parseLong(((StreamExpressionValue) runExpression.getParameter()).getValue());
}
if(queueExpression != null) {
queueSize= Integer.parseInt(((StreamExpressionValue)queueExpression.getParameter()).getValue());
}
// validate expression contains only what we want.
if(expression.getParameters().size() != streamExpressions.size() + 2 &&
expression.getParameters().size() != streamExpressions.size() + 3) {
throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - unknown operands found", expression));
}
if(1 != streamExpressions.size()){
throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expecting a single stream but found %d",expression, streamExpressions.size()));
}
init(tupleStream, id, runInterval, queueSize);
}
public DaemonStream(TupleStream tupleStream, String id, long runInterval, int queueSize) {
init(tupleStream, id, runInterval, queueSize);
}
@Override
public StreamExpression toExpression(StreamFactory factory) throws IOException {
// function name
StreamExpression expression = new StreamExpression(factory.getFunctionName(this.getClass()));
// streams
if(tupleStream instanceof Expressible){
expression.addParameter(((Expressible)tupleStream).toExpression(factory));
} else {
throw new IOException("This UniqueStream contains a non-expressible TupleStream - it cannot be converted to an expression");
}
expression.addParameter(new StreamExpressionNamedParameter("id", id));
expression.addParameter(new StreamExpressionNamedParameter("runInterval", Long.toString(runInterval)));
expression.addParameter(new StreamExpressionNamedParameter("queueSize", Integer.toString(queueSize)));
return expression;
}
public int remainingCapacity() {
return this.queue.remainingCapacity();
}
public void init(TupleStream tupleStream, String id, long runInterval, int queueSize) {
this.tupleStream = tupleStream;
this.id = id;
this.runInterval = runInterval;
this.queueSize = queueSize;
if(queueSize > 0) {
queue = new ArrayBlockingQueue(queueSize);
eatTuples = false;
} else {
eatTuples = true;
}
}
public int hashCode() {
return id.hashCode();
}
public boolean equals(Object o) {
if(o instanceof DaemonStream) {
return id.equals(((DaemonStream)o).id);
}
return false;
}
public String getId() {
return id;
}
public void open() {
this.streamRunner = new StreamRunner(runInterval);
this.streamRunner.start();
}
public Tuple read() throws IOException {
try {
return queue.take();
} catch (Exception e) {
throw new IOException(e);
}
}
public StreamComparator getStreamSort() {
return tupleStream.getStreamSort();
}
public void setStreamContext(StreamContext streamContext) {
this.tupleStream.setStreamContext(streamContext);
}
public void close() {
if(closed) {
return;
}
streamRunner.setShutdown(true);
}
public List<TupleStream> children() {
List<TupleStream> children = new ArrayList();
children.add(tupleStream);
return children;
}
public synchronized Tuple getInfo() {
Tuple tuple = new Tuple(new HashMap());
tuple.put("id", id);
tuple.put("startTime", startTime);
tuple.put("stopTime", stopTime);
tuple.put("iterations", iterations);
tuple.put("state", streamRunner.getState().toString());
if(exception != null) {
tuple.put("exception", exception.getMessage());
}
return tuple;
}
private synchronized void incrementIterations() {
++iterations;
}
private synchronized void setStartTime(long startTime) {
this.startTime = startTime;
}
private synchronized void setStopTime(long stopTime) {
this.stopTime = stopTime;
}
private class StreamRunner extends Thread {
private long sleepMillis = 1000;
private long runInterval;
private long lastRun;
private boolean shutdown;
public StreamRunner(long runInterval) {
this.runInterval = runInterval;
}
public synchronized void setShutdown(boolean shutdown) {
this.shutdown = shutdown;
interrupt(); //We could be blocked on the queue or sleeping
}
public synchronized boolean getShutdown() {
return shutdown;
}
public void run() {
setStartTime(new Date().getTime());
OUTER:
while (!getShutdown()) {
long now = new Date().getTime();
if((now-lastRun) > this.runInterval) {
lastRun = now;
try {
tupleStream.open();
INNER:
while (true) {
Tuple tuple = tupleStream.read();
if (tuple.EOF) {
break INNER;
} else if (!eatTuples) {
try {
queue.put(tuple);
} catch(InterruptedException e) {
break OUTER;
}
}
}
} catch (IOException e) {
exception = e;
logger.error("Error in DaemonStream", e);
break OUTER;
} finally {
try {
tupleStream.close();
} catch (IOException e1) {
if (exception == null) {
exception = e1;
logger.error("Error in DaemonStream", e1);
break OUTER;
}
}
}
}
incrementIterations();
try {
Thread.sleep(sleepMillis);
} catch (InterruptedException e) {
logger.error("Error in DaemonStream", e);
break OUTER;
}
}
if(!eatTuples) {
Map m = new HashMap();
m.put("EOF", true);
Tuple tuple = new Tuple(m);
queue.offer(tuple);
}
setStopTime(new Date().getTime());
}
}
}

View File

@ -162,6 +162,9 @@ public class RollupStream extends TupleStream implements Expressible {
public void close() throws IOException { public void close() throws IOException {
tupleStream.close(); tupleStream.close();
this.currentMetrics = null;
this.currentKey = new HashKey("-");
this.finished = false;
} }
public Tuple read() throws IOException { public Tuple read() throws IOException {

View File

@ -20,6 +20,7 @@ package org.apache.solr.client.solrj.io.stream;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Locale; import java.util.Locale;
import java.util.Map; import java.util.Map;
@ -46,6 +47,7 @@ import org.apache.solr.client.solrj.io.stream.metrics.SumMetric;
import org.apache.solr.cloud.AbstractFullDistribZkTestBase; import org.apache.solr.cloud.AbstractFullDistribZkTestBase;
import org.apache.solr.cloud.AbstractZkTestCase; import org.apache.solr.cloud.AbstractZkTestCase;
import org.apache.solr.common.SolrInputDocument; import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.params.CommonParams;
import org.junit.After; import org.junit.After;
import org.junit.AfterClass; import org.junit.AfterClass;
import org.junit.Before; import org.junit.Before;
@ -134,6 +136,7 @@ public class StreamExpressionTest extends AbstractFullDistribZkTestBase {
testUniqueStream(); testUniqueStream();
testRollupStream(); testRollupStream();
testStatsStream(); testStatsStream();
testDaemonStream();
testParallelUniqueStream(); testParallelUniqueStream();
testParallelReducerStream(); testParallelReducerStream();
testParallelRankStream(); testParallelRankStream();
@ -148,6 +151,7 @@ public class StreamExpressionTest extends AbstractFullDistribZkTestBase {
testSubFacetStream(); testSubFacetStream();
testUpdateStream(); testUpdateStream();
testParallelUpdateStream(); testParallelUpdateStream();
testParallelDaemonUpdateStream();
testIntersectStream(); testIntersectStream();
testParallelIntersectStream(); testParallelIntersectStream();
testComplementStream(); testComplementStream();
@ -525,6 +529,130 @@ public class StreamExpressionTest extends AbstractFullDistribZkTestBase {
commit(); commit();
} }
private void testDaemonStream() throws Exception {
indexr(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "1");
indexr(id, "2", "a_s", "hello0", "a_i", "2", "a_f", "2");
indexr(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3");
indexr(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4");
indexr(id, "1", "a_s", "hello0", "a_i", "1", "a_f", "5");
indexr(id, "5", "a_s", "hello3", "a_i", "10", "a_f", "6");
indexr(id, "6", "a_s", "hello4", "a_i", "11", "a_f", "7");
indexr(id, "7", "a_s", "hello3", "a_i", "12", "a_f", "8");
indexr(id, "8", "a_s", "hello3", "a_i", "13", "a_f", "9");
indexr(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10");
commit();
StreamFactory factory = new StreamFactory()
.withCollectionZkHost("collection1", zkServer.getZkAddress())
.withFunctionName("search", CloudSolrStream.class)
.withFunctionName("rollup", RollupStream.class)
.withFunctionName("sum", SumMetric.class)
.withFunctionName("min", MinMetric.class)
.withFunctionName("max", MaxMetric.class)
.withFunctionName("avg", MeanMetric.class)
.withFunctionName("count", CountMetric.class)
.withFunctionName("daemon", DaemonStream.class);
StreamExpression expression;
DaemonStream daemonStream;
expression = StreamExpressionParser.parse("daemon(rollup("
+ "search(collection1, q=*:*, fl=\"a_i,a_s\", sort=\"a_s asc\"),"
+ "over=\"a_s\","
+ "sum(a_i)"
+ "), id=\"test\", runInterval=\"1000\", queueSize=\"9\")");
daemonStream = (DaemonStream)factory.constructStream(expression);
//Test Long and Double Sums
daemonStream.open(); // This will start the daemon thread
for(int i=0; i<4; i++) {
Tuple tuple = daemonStream.read(); // Reads from the queue
String bucket = tuple.getString("a_s");
Double sumi = tuple.getDouble("sum(a_i)");
//System.out.println("#################################### Bucket 1:"+bucket);
assertTrue(bucket.equals("hello0"));
assertTrue(sumi.doubleValue() == 17.0D);
tuple = daemonStream.read();
bucket = tuple.getString("a_s");
sumi = tuple.getDouble("sum(a_i)");
//System.out.println("#################################### Bucket 2:"+bucket);
assertTrue(bucket.equals("hello3"));
assertTrue(sumi.doubleValue() == 38.0D);
tuple = daemonStream.read();
bucket = tuple.getString("a_s");
sumi = tuple.getDouble("sum(a_i)");
//System.out.println("#################################### Bucket 3:"+bucket);
assertTrue(bucket.equals("hello4"));
assertTrue(sumi.longValue() == 15);
}
//Now lets wait until the internal queue fills up
while(daemonStream.remainingCapacity() > 0) {
try {
Thread.sleep(1000);
} catch (Exception e) {
}
}
//OK capacity is full, let's index a new doc
indexr(id, "10", "a_s", "hello0", "a_i", "1", "a_f", "10");
commit();
//Now lets clear the existing docs in the queue 9, plus 3 more to get passed the run that was blocked. The next run should
//have the tuples with the updated count.
for(int i=0; i<12;i++) {
daemonStream.read();
}
//And rerun the loop. It should have a new count for hello0
for(int i=0; i<4; i++) {
Tuple tuple = daemonStream.read(); // Reads from the queue
String bucket = tuple.getString("a_s");
Double sumi = tuple.getDouble("sum(a_i)");
//System.out.println("#################################### Bucket 1:"+bucket);
assertTrue(bucket.equals("hello0"));
assertTrue(sumi.doubleValue() == 18.0D);
tuple = daemonStream.read();
bucket = tuple.getString("a_s");
sumi = tuple.getDouble("sum(a_i)");
//System.out.println("#################################### Bucket 2:"+bucket);
assertTrue(bucket.equals("hello3"));
assertTrue(sumi.doubleValue() == 38.0D);
tuple = daemonStream.read();
bucket = tuple.getString("a_s");
sumi = tuple.getDouble("sum(a_i)");
//System.out.println("#################################### Bucket 3:"+bucket);
assertTrue(bucket.equals("hello4"));
assertTrue(sumi.longValue() == 15);
}
daemonStream.close(); //This should stop the daemon thread
del("*:*");
commit();
}
private void testRollupStream() throws Exception { private void testRollupStream() throws Exception {
indexr(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "1"); indexr(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "1");
@ -2206,6 +2334,174 @@ public class StreamExpressionTest extends AbstractFullDistribZkTestBase {
del("*:*"); del("*:*");
commit(); commit();
} }
private void testParallelDaemonUpdateStream() throws Exception {
CloudSolrClient destinationCollectionClient = createCloudClient("parallelDestinationCollection1");
createCollection("parallelDestinationCollection1", destinationCollectionClient, 2, 2);
indexr(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "0", "s_multi", "aaaa", "s_multi", "bbbb", "i_multi", "4", "i_multi", "7");
indexr(id, "2", "a_s", "hello2", "a_i", "2", "a_f", "0", "s_multi", "aaaa1", "s_multi", "bbbb1", "i_multi", "44", "i_multi", "77");
indexr(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3", "s_multi", "aaaa2", "s_multi", "bbbb2", "i_multi", "444", "i_multi", "777");
indexr(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4", "s_multi", "aaaa3", "s_multi", "bbbb3", "i_multi", "4444", "i_multi", "7777");
indexr(id, "1", "a_s", "hello1", "a_i", "1", "a_f", "1", "s_multi", "aaaa4", "s_multi", "bbbb4", "i_multi", "44444", "i_multi", "77777");
commit();
waitForRecoveriesToFinish("parallelDestinationCollection1", false);
StreamExpression expression;
TupleStream stream;
Tuple t;
String zkHost = zkServer.getZkAddress();
StreamFactory factory = new StreamFactory()
.withCollectionZkHost("collection1", zkServer.getZkAddress())
.withCollectionZkHost("parallelDestinationCollection1", zkServer.getZkAddress())
.withFunctionName("search", CloudSolrStream.class)
.withFunctionName("update", UpdateStream.class)
.withFunctionName("parallel", ParallelStream.class)
.withFunctionName("daemon", DaemonStream.class);
//Copy all docs to destinationCollection
String updateExpression = "daemon(update(parallelDestinationCollection1, batchSize=2, search(collection1, q=*:*, fl=\"id,a_s,a_i,a_f,s_multi,i_multi\", sort=\"a_f asc, a_i asc\", partitionKeys=\"a_f\")), runInterval=\"1000\", id=\"test\")";
TupleStream parallelUpdateStream = factory.constructStream("parallel(collection1, " + updateExpression + ", workers=\"2\", zkHost=\""+zkHost+"\", sort=\"batchNumber asc\")");
List<Tuple> tuples = getTuples(parallelUpdateStream);
assert(tuples.size() == 2);
//Lets sleep long enough for daemon updates to run.
//Lets stop the daemons
Map params = new HashMap();
params.put(CommonParams.QT,"/stream");
params.put("action","list");
int workersComplete = 0;
for(CloudJettyRunner jetty : this.cloudJettys) {
int iterations = 0;
INNER:
while(iterations == 0) {
SolrStream solrStream = new SolrStream(jetty.url, params);
solrStream.open();
Tuple tupleResponse = solrStream.read();
if (tupleResponse.EOF) {
solrStream.close();
break INNER;
} else {
long l = tupleResponse.getLong("iterations");
if(l > 0) {
++workersComplete;
} else {
try {
Thread.sleep(1000);
} catch(Exception e) {
}
}
iterations = (int) l;
solrStream.close();
}
}
}
assert(workersComplete == 2);
destinationCollectionClient.commit();
//Lets stop the daemons
params = new HashMap();
params.put(CommonParams.QT,"/stream");
params.put("action", "stop");
params.put("id", "test");
for(CloudJettyRunner jetty : this.cloudJettys) {
SolrStream solrStream = new SolrStream(jetty.url, params);
solrStream.open();
Tuple tupleResponse = solrStream.read();
solrStream.close();
}
params = new HashMap();
params.put(CommonParams.QT,"/stream");
params.put("action","list");
workersComplete = 0;
for(CloudJettyRunner jetty : this.cloudJettys) {
long stopTime = 0;
INNER:
while(stopTime == 0) {
SolrStream solrStream = new SolrStream(jetty.url, params);
solrStream.open();
Tuple tupleResponse = solrStream.read();
if (tupleResponse.EOF) {
solrStream.close();
break INNER;
} else {
stopTime = tupleResponse.getLong("stopTime");
if (stopTime > 0) {
++workersComplete;
} else {
try {
Thread.sleep(1000);
} catch(Exception e) {
}
}
solrStream.close();
}
}
}
assertTrue(workersComplete == 2);
//Ensure that destinationCollection actually has the new docs.
expression = StreamExpressionParser.parse("search(parallelDestinationCollection1, q=*:*, fl=\"id,a_s,a_i,a_f,s_multi,i_multi\", sort=\"a_i asc\")");
stream = new CloudSolrStream(expression, factory);
tuples = getTuples(stream);
assertEquals(5, tuples.size());
Tuple tuple = tuples.get(0);
assert(tuple.getLong("id") == 0);
assert(tuple.get("a_s").equals("hello0"));
assert(tuple.getLong("a_i") == 0);
assert(tuple.getDouble("a_f") == 0.0);
assertList(tuple.getStrings("s_multi"), "aaaa", "bbbb");
assertList(tuple.getLongs("i_multi"), Long.parseLong("4"), Long.parseLong("7"));
tuple = tuples.get(1);
assert(tuple.getLong("id") == 1);
assert(tuple.get("a_s").equals("hello1"));
assert(tuple.getLong("a_i") == 1);
assert(tuple.getDouble("a_f") == 1.0);
assertList(tuple.getStrings("s_multi"), "aaaa4", "bbbb4");
assertList(tuple.getLongs("i_multi"), Long.parseLong("44444"), Long.parseLong("77777"));
tuple = tuples.get(2);
assert(tuple.getLong("id") == 2);
assert(tuple.get("a_s").equals("hello2"));
assert(tuple.getLong("a_i") == 2);
assert(tuple.getDouble("a_f") == 0.0);
assertList(tuple.getStrings("s_multi"), "aaaa1", "bbbb1");
assertList(tuple.getLongs("i_multi"), Long.parseLong("44"), Long.parseLong("77"));
tuple = tuples.get(3);
assert(tuple.getLong("id") == 3);
assert(tuple.get("a_s").equals("hello3"));
assert(tuple.getLong("a_i") == 3);
assert(tuple.getDouble("a_f") == 3.0);
assertList(tuple.getStrings("s_multi"), "aaaa2", "bbbb2");
assertList(tuple.getLongs("i_multi"), Long.parseLong("444"), Long.parseLong("777"));
tuple = tuples.get(4);
assert(tuple.getLong("id") == 4);
assert(tuple.get("a_s").equals("hello4"));
assert(tuple.getLong("a_i") == 4);
assert(tuple.getDouble("a_f") == 4.0);
assertList(tuple.getStrings("s_multi"), "aaaa3", "bbbb3");
assertList(tuple.getLongs("i_multi"), Long.parseLong("4444"), Long.parseLong("7777"));
destinationCollectionClient.deleteByQuery("*:*");
destinationCollectionClient.commit();
destinationCollectionClient.close();
del("*:*");
commit();
}
private void testIntersectStream() throws Exception{ private void testIntersectStream() throws Exception{
indexr(id, "0", "a_s", "setA", "a_i", "0"); indexr(id, "0", "a_s", "setA", "a_i", "0");

View File

@ -61,6 +61,7 @@ public class StreamExpressionToExpessionTest extends LuceneTestCase {
.withFunctionName("min", MinMetric.class) .withFunctionName("min", MinMetric.class)
.withFunctionName("max", MaxMetric.class) .withFunctionName("max", MaxMetric.class)
.withFunctionName("avg", MeanMetric.class) .withFunctionName("avg", MeanMetric.class)
.withFunctionName("daemon", DaemonStream.class)
; ;
} }
@ -102,6 +103,24 @@ public class StreamExpressionToExpessionTest extends LuceneTestCase {
assertTrue(expressionString.contains("a_s as fieldA")); assertTrue(expressionString.contains("a_s as fieldA"));
} }
@Test
public void testDaemonStream() throws Exception {
DaemonStream stream;
String expressionString;
// Basic test
stream = new DaemonStream(StreamExpressionParser.parse("daemon(search(collection1, q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\"), id=\"blah\", runInterval=\"1000\", queueSize=\"100\")"), factory);
expressionString = stream.toExpression(factory).toString();
assertTrue(expressionString.contains("daemon(search(collection1,"));
assertTrue(expressionString.contains("q=\"*:*\""));
assertTrue(expressionString.contains("fl=\"id,a_s,a_i,a_f\""));
assertTrue(expressionString.contains("sort=\"a_f asc, a_i asc\""));
assertTrue(expressionString.contains("id=blah"));
assertTrue(expressionString.contains("queueSize=100"));
assertTrue(expressionString.contains("runInterval=1000"));
}
@Test @Test
public void testStatsStream() throws Exception { public void testStatsStream() throws Exception {