SOLR-8832: Faulty DaemonStream shutdown procedures

This commit is contained in:
jbernste 2016-03-11 15:34:31 -05:00
parent 164d6ccd2d
commit 26f230a474
2 changed files with 11 additions and 3 deletions

View File

@ -171,11 +171,16 @@ public class DaemonStream extends TupleStream implements Expressible {
this.tupleStream.setStreamContext(streamContext); this.tupleStream.setStreamContext(streamContext);
} }
public void shutdown() {
streamRunner.setShutdown(true);
}
public void close() { public void close() {
if(closed) { if(closed) {
return; return;
} }
streamRunner.setShutdown(true); streamRunner.setShutdown(true);
this.closed = true;
} }
public List<TupleStream> children() { public List<TupleStream> children() {
@ -226,7 +231,6 @@ public class DaemonStream extends TupleStream implements Expressible {
public synchronized void setShutdown(boolean shutdown) { public synchronized void setShutdown(boolean shutdown) {
this.shutdown = shutdown; this.shutdown = shutdown;
interrupt(); //We could be blocked on the queue or sleeping
} }
public synchronized boolean getShutdown() { public synchronized boolean getShutdown() {

View File

@ -623,7 +623,7 @@ public class StreamExpressionTest extends AbstractFullDistribZkTestBase {
DaemonStream daemonStream; DaemonStream daemonStream;
expression = StreamExpressionParser.parse("daemon(rollup(" expression = StreamExpressionParser.parse("daemon(rollup("
+ "search(collection1, q=*:*, fl=\"a_i,a_s\", sort=\"a_s asc\")," + "search(collection1, q=\"*:*\", fl=\"a_i,a_s\", sort=\"a_s asc\"),"
+ "over=\"a_s\"," + "over=\"a_s\","
+ "sum(a_i)" + "sum(a_i)"
+ "), id=\"test\", runInterval=\"1000\", queueSize=\"9\")"); + "), id=\"test\", runInterval=\"1000\", queueSize=\"9\")");
@ -2366,13 +2366,17 @@ public class StreamExpressionTest extends AbstractFullDistribZkTestBase {
assertEquals(14, (long) tuple.getLong(id)); assertEquals(14, (long) tuple.getLong(id));
tuple = dstream.read(); // This should trigger a checkpoint as it's the 4th read from the stream. tuple = dstream.read(); // This should trigger a checkpoint as it's the 4th read from the stream.
assertEquals(15, (long) tuple.getLong(id)); assertEquals(15, (long) tuple.getLong(id));
dstream.shutdown();
tuple = dstream.read();
assertTrue(tuple.EOF);
} finally { } finally {
dstream.close(); dstream.close();
} }
} finally { } finally {
cache.close();
del("*:*"); del("*:*");
commit(); commit();
cache.close();
} }
} }