diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/DaemonStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/DaemonStream.java index 358eea4a509..2f6539480d1 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/DaemonStream.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/DaemonStream.java @@ -151,7 +151,7 @@ public class DaemonStream extends TupleStream implements Expressible { } public void open() { - this.streamRunner = new StreamRunner(runInterval); + this.streamRunner = new StreamRunner(runInterval, id); this.streamRunner.start(); } @@ -215,11 +215,13 @@ public class DaemonStream extends TupleStream implements Expressible { private long sleepMillis = 1000; private long runInterval; private long lastRun; + private String id; private boolean shutdown; - public StreamRunner(long runInterval) { + public StreamRunner(long runInterval, String id) { this.runInterval = runInterval; + this.id = id; } public synchronized void setShutdown(boolean shutdown) { @@ -232,6 +234,7 @@ public class DaemonStream extends TupleStream implements Expressible { } public void run() { + int errors = 0; setStartTime(new Date().getTime()); OUTER: while (!getShutdown()) { @@ -244,6 +247,7 @@ public class DaemonStream extends TupleStream implements Expressible { while (true) { Tuple tuple = tupleStream.read(); if (tuple.EOF) { + errors = 0; // Reset errors on successful run. break INNER; } else if (!eatTuples) { try { @@ -255,7 +259,15 @@ public class DaemonStream extends TupleStream implements Expressible { } } catch (IOException e) { exception = e; - logger.error("Error in DaemonStream", e); + logger.error("Error in DaemonStream:"+id, e); + ++errors; + if(errors > 100) { + logger.error("Too many consectutive errors. Stopping DaemonStream:"+id); + break OUTER; + } + } catch (Throwable t) { + logger.error("Fatal Error in DaemonStream:"+id, t); + //For anything other then IOException break out of the loop and shutdown the thread. break OUTER; } finally { try { @@ -263,7 +275,7 @@ public class DaemonStream extends TupleStream implements Expressible { } catch (IOException e1) { if (exception == null) { exception = e1; - logger.error("Error in DaemonStream", e1); + logger.error("Error in DaemonStream:"+id, e1); break OUTER; } } @@ -273,7 +285,7 @@ public class DaemonStream extends TupleStream implements Expressible { try { Thread.sleep(sleepMillis); } catch (InterruptedException e) { - logger.error("Error in DaemonStream", e); + logger.error("Error in DaemonStream:"+id, e); break OUTER; } } @@ -285,7 +297,7 @@ public class DaemonStream extends TupleStream implements Expressible { try { queue.put(tuple); } catch (InterruptedException e) { - logger.error("Error in DaemonStream", e); + logger.error("Error in DaemonStream:"+id, e); } } setStopTime(new Date().getTime());