SOLR-8878: Allow the DaemonStream run rate be controlled by the internal stream

This commit is contained in:
jbernste 2016-03-20 22:04:14 -04:00
parent 0b221826a3
commit 5c074dac28
3 changed files with 92 additions and 15 deletions

View File

@ -73,7 +73,7 @@ public class DaemonStream extends TupleStream implements Expressible {
}
if(runExpression == null) {
throw new IOException("Invalid expression runInterval parameter expected");
runInterval = 2000;
} else {
runInterval = Long.parseLong(((StreamExpressionValue) runExpression.getParameter()).getValue());
}
@ -243,7 +243,7 @@ public class DaemonStream extends TupleStream implements Expressible {
OUTER:
while (!getShutdown()) {
long now = new Date().getTime();
if((now-lastRun) > this.runInterval) {
if ((now - lastRun) > this.runInterval) {
lastRun = now;
try {
tupleStream.open();
@ -252,25 +252,31 @@ public class DaemonStream extends TupleStream implements Expressible {
Tuple tuple = tupleStream.read();
if (tuple.EOF) {
errors = 0; // Reset errors on successful run.
if (tuple.fields.containsKey("sleepMillis")) {
this.sleepMillis = tuple.getLong("sleepMillis");
this.runInterval = -1;
}
break INNER;
} else if (!eatTuples) {
try {
queue.put(tuple);
} catch(InterruptedException e) {
} catch (InterruptedException e) {
break OUTER;
}
}
}
} catch (IOException e) {
e.printStackTrace();
exception = e;
logger.error("Error in DaemonStream:"+id, e);
logger.error("Error in DaemonStream:" + id, e);
++errors;
if(errors > 100) {
logger.error("Too many consectutive errors. Stopping DaemonStream:"+id);
if (errors > 100) {
logger.error("Too many consectutive errors. Stopping DaemonStream:" + id);
break OUTER;
}
} catch (Throwable t) {
logger.error("Fatal Error in DaemonStream:"+id, t);
t.printStackTrace();
logger.error("Fatal Error in DaemonStream:" + id, t);
//For anything other then IOException break out of the loop and shutdown the thread.
break OUTER;
} finally {
@ -279,18 +285,21 @@ public class DaemonStream extends TupleStream implements Expressible {
} catch (IOException e1) {
if (exception == null) {
exception = e1;
logger.error("Error in DaemonStream:"+id, e1);
logger.error("Error in DaemonStream:" + id, e1);
break OUTER;
}
}
}
}
incrementIterations();
try {
Thread.sleep(sleepMillis);
} catch (InterruptedException e) {
logger.error("Error in DaemonStream:"+id, e);
break OUTER;
if (sleepMillis > 0) {
try {
Thread.sleep(sleepMillis);
} catch (InterruptedException e) {
logger.error("Error in DaemonStream:" + id, e);
break OUTER;
}
}
}

View File

@ -65,6 +65,7 @@ public class TopicStream extends CloudSolrStream implements Expressible {
private static final long serialVersionUID = 1;
private long count;
private int runCount;
private String id;
protected long checkpointEvery;
@ -98,6 +99,9 @@ public class TopicStream extends CloudSolrStream implements Expressible {
this.checkpointEvery = checkpointEvery;
this.id = id;
this.comp = new FieldComparator("_version_", ComparatorOrder.ASCENDING);
if(!params.containsKey("rows")) {
params.put("rows", "500");
}
}
public TopicStream(StreamExpression expression, StreamFactory factory) throws IOException{
@ -257,6 +261,7 @@ public class TopicStream extends CloudSolrStream implements Expressible {
}
public void close() throws IOException {
runCount = 0;
try {
persistCheckpoints();
} finally {
@ -277,10 +282,17 @@ public class TopicStream extends CloudSolrStream implements Expressible {
Tuple tuple = _read();
if(tuple.EOF) {
if(runCount > 0) {
tuple.put("sleepMillis", 0);
} else {
tuple.put("sleepMillis", 1000);
}
return tuple;
}
++count;
++runCount;
if(checkpointEvery > -1 && (count % checkpointEvery) == 0) {
persistCheckpoints();
}
@ -427,7 +439,9 @@ public class TopicStream extends CloudSolrStream implements Expressible {
params.put("distrib", "false"); // We are the aggregator.
String fl = params.get("fl");
params.put("sort", "_version_ asc");
fl += ",_version_";
if(!fl.contains("_version_")) {
fl += ",_version_";
}
params.put("fl", fl);
Random random = new Random();

View File

@ -27,6 +27,7 @@ import java.util.Map;
import org.apache.lucene.util.LuceneTestCase;
import org.apache.lucene.util.LuceneTestCase.Slow;
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.SolrClientCache;
import org.apache.solr.client.solrj.io.comp.ComparatorOrder;
import org.apache.solr.client.solrj.io.comp.MultipleFieldComparator;
import org.apache.solr.client.solrj.io.comp.FieldComparator;
@ -226,7 +227,7 @@ public class StreamingTest extends AbstractFullDistribZkTestBase {
attachStreamFactory(pstream);
List<Tuple> tuples = getTuples(pstream);
assert(tuples.size() == 5);
assertOrder(tuples, 0,1,3,4,6);
assertOrder(tuples, 0, 1, 3, 4, 6);
//Test the eofTuples
@ -1369,7 +1370,59 @@ public class StreamingTest extends AbstractFullDistribZkTestBase {
}
private void testDaemonTopicStream() throws Exception {
String zkHost = zkServer.getZkAddress();
StreamContext context = new StreamContext();
SolrClientCache cache = new SolrClientCache();
context.setSolrClientCache(cache);
Map params = new HashMap();
params.put("q","a_s:hello0");
params.put("rows", "500");
params.put("fl", "id");
TopicStream topicStream = new TopicStream(zkHost, "collection1", "collection1", "50000000", 1000000, params);
DaemonStream daemonStream = new DaemonStream(topicStream, "daemon1", 1000, 500);
daemonStream.setStreamContext(context);
daemonStream.open();
indexr(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "1");
indexr(id, "2", "a_s", "hello0", "a_i", "2", "a_f", "2");
indexr(id, "3", "a_s", "hello0", "a_i", "3", "a_f", "3");
indexr(id, "4", "a_s", "hello0", "a_i", "4", "a_f", "4");
indexr(id, "1", "a_s", "hello0", "a_i", "1", "a_f", "5");
commit();
for(int i=0; i<5; i++) {
daemonStream.read();
}
indexr(id, "5", "a_s", "hello0", "a_i", "4", "a_f", "4");
indexr(id, "6", "a_s", "hello0", "a_i", "4", "a_f", "4");
commit();
for(int i=0; i<2; i++) {
daemonStream.read();
}
daemonStream.shutdown();
Tuple tuple = daemonStream.read();
assertTrue(tuple.EOF);
daemonStream.close();
cache.close();
del("*:*");
commit();
}
private void testParallelRollupStream() throws Exception {
@ -1799,6 +1852,7 @@ public class StreamingTest extends AbstractFullDistribZkTestBase {
testSubFacetStream();
testStatsStream();
//testExceptionStream();
testDaemonTopicStream();
testParallelEOF();
testParallelUniqueStream();
testParallelRankStream();