SOLR-8832: Faulty DaemonStream shutdown procedures

This commit is contained in:
jbernste 2016-03-11 15:34:31 -05:00
parent 50c413e865
commit 007d41c9f5
2 changed files with 11 additions and 3 deletions

View File

@ -171,11 +171,16 @@ public class DaemonStream extends TupleStream implements Expressible {
this.tupleStream.setStreamContext(streamContext);
}
public void shutdown() {
streamRunner.setShutdown(true);
}
public void close() {
if(closed) {
return;
}
streamRunner.setShutdown(true);
this.closed = true;
}
public List<TupleStream> children() {
@ -226,7 +231,6 @@ public class DaemonStream extends TupleStream implements Expressible {
public synchronized void setShutdown(boolean shutdown) {
this.shutdown = shutdown;
interrupt(); //We could be blocked on the queue or sleeping
}
public synchronized boolean getShutdown() {

View File

@ -623,7 +623,7 @@ public class StreamExpressionTest extends AbstractFullDistribZkTestBase {
DaemonStream daemonStream;
expression = StreamExpressionParser.parse("daemon(rollup("
+ "search(collection1, q=*:*, fl=\"a_i,a_s\", sort=\"a_s asc\"),"
+ "search(collection1, q=\"*:*\", fl=\"a_i,a_s\", sort=\"a_s asc\"),"
+ "over=\"a_s\","
+ "sum(a_i)"
+ "), id=\"test\", runInterval=\"1000\", queueSize=\"9\")");
@ -2366,13 +2366,17 @@ public class StreamExpressionTest extends AbstractFullDistribZkTestBase {
assertEquals(14, (long) tuple.getLong(id));
tuple = dstream.read(); // This should trigger a checkpoint as it's the 4th read from the stream.
assertEquals(15, (long) tuple.getLong(id));
dstream.shutdown();
tuple = dstream.read();
assertTrue(tuple.EOF);
} finally {
dstream.close();
}
} finally {
cache.close();
del("*:*");
commit();
cache.close();
}
}