SOLR-8708: DaemonStream should catch InterruptedException when reading underlying stream.

This commit is contained in:
jbernste 2016-02-20 22:33:13 -05:00
parent 2fd90cd489
commit 2b3529c3b8
1 changed files with 18 additions and 6 deletions

View File

@ -151,7 +151,7 @@ public class DaemonStream extends TupleStream implements Expressible {
} }
public void open() { public void open() {
this.streamRunner = new StreamRunner(runInterval); this.streamRunner = new StreamRunner(runInterval, id);
this.streamRunner.start(); this.streamRunner.start();
} }
@ -215,11 +215,13 @@ public class DaemonStream extends TupleStream implements Expressible {
private long sleepMillis = 1000; private long sleepMillis = 1000;
private long runInterval; private long runInterval;
private long lastRun; private long lastRun;
private String id;
private boolean shutdown; private boolean shutdown;
public StreamRunner(long runInterval) { public StreamRunner(long runInterval, String id) {
this.runInterval = runInterval; this.runInterval = runInterval;
this.id = id;
} }
public synchronized void setShutdown(boolean shutdown) { public synchronized void setShutdown(boolean shutdown) {
@ -232,6 +234,7 @@ public class DaemonStream extends TupleStream implements Expressible {
} }
public void run() { public void run() {
int errors = 0;
setStartTime(new Date().getTime()); setStartTime(new Date().getTime());
OUTER: OUTER:
while (!getShutdown()) { while (!getShutdown()) {
@ -244,6 +247,7 @@ public class DaemonStream extends TupleStream implements Expressible {
while (true) { while (true) {
Tuple tuple = tupleStream.read(); Tuple tuple = tupleStream.read();
if (tuple.EOF) { if (tuple.EOF) {
errors = 0; // Reset errors on successful run.
break INNER; break INNER;
} else if (!eatTuples) { } else if (!eatTuples) {
try { try {
@ -255,7 +259,15 @@ public class DaemonStream extends TupleStream implements Expressible {
} }
} catch (IOException e) { } catch (IOException e) {
exception = e; exception = e;
logger.error("Error in DaemonStream", e); logger.error("Error in DaemonStream:"+id, e);
++errors;
if(errors > 100) {
logger.error("Too many consectutive errors. Stopping DaemonStream:"+id);
break OUTER;
}
} catch (Throwable t) {
logger.error("Fatal Error in DaemonStream:"+id, t);
//For anything other then IOException break out of the loop and shutdown the thread.
break OUTER; break OUTER;
} finally { } finally {
try { try {
@ -263,7 +275,7 @@ public class DaemonStream extends TupleStream implements Expressible {
} catch (IOException e1) { } catch (IOException e1) {
if (exception == null) { if (exception == null) {
exception = e1; exception = e1;
logger.error("Error in DaemonStream", e1); logger.error("Error in DaemonStream:"+id, e1);
break OUTER; break OUTER;
} }
} }
@ -273,7 +285,7 @@ public class DaemonStream extends TupleStream implements Expressible {
try { try {
Thread.sleep(sleepMillis); Thread.sleep(sleepMillis);
} catch (InterruptedException e) { } catch (InterruptedException e) {
logger.error("Error in DaemonStream", e); logger.error("Error in DaemonStream:"+id, e);
break OUTER; break OUTER;
} }
} }
@ -285,7 +297,7 @@ public class DaemonStream extends TupleStream implements Expressible {
try { try {
queue.put(tuple); queue.put(tuple);
} catch (InterruptedException e) { } catch (InterruptedException e) {
logger.error("Error in DaemonStream", e); logger.error("Error in DaemonStream:"+id, e);
} }
} }
setStopTime(new Date().getTime()); setStopTime(new Date().getTime());