diff --git a/realtime/src/main/java/com/metamx/druid/realtime/MinTimeFirehose.java b/realtime/src/main/java/com/metamx/druid/realtime/MinTimeFirehose.java index c9a66dbebad..86d01e4f3c2 100644 --- a/realtime/src/main/java/com/metamx/druid/realtime/MinTimeFirehose.java +++ b/realtime/src/main/java/com/metamx/druid/realtime/MinTimeFirehose.java @@ -4,12 +4,19 @@ import com.metamx.druid.input.InputRow; import org.joda.time.DateTime; import java.io.IOException; +import java.util.NoSuchElementException; +/** + * Provides a view on a firehose that only returns rows at or after a certain minimum timestamp. + * Not thread-safe. + */ public class MinTimeFirehose implements Firehose { private final Firehose firehose; private final DateTime minTime; + private InputRow savedInputRow = null; + public MinTimeFirehose(Firehose firehose, DateTime minTime) { this.firehose = firehose; @@ -19,17 +26,37 @@ public class MinTimeFirehose implements Firehose @Override public boolean hasMore() { - return firehose.hasMore(); + if (savedInputRow != null) { + return true; + } + + while (firehose.hasMore()) { + final InputRow row = firehose.nextRow(); + if (acceptable(row)) { + savedInputRow = row; + return true; + } + } + + return false; } @Override public InputRow nextRow() { - while (true) { - final InputRow row = firehose.nextRow(); - if (row.getTimestampFromEpoch() >= minTime.getMillis()) { - return row; + if (savedInputRow != null) { + final InputRow row = savedInputRow; + savedInputRow = null; + return row; + } else { + while (firehose.hasMore()) { + final InputRow row = firehose.nextRow(); + if (acceptable(row)) { + return row; + } } + + throw new NoSuchElementException("No more rows!"); } } @@ -44,4 +71,9 @@ public class MinTimeFirehose implements Firehose { firehose.close(); } + + private boolean acceptable(InputRow row) + { + return row.getTimestampFromEpoch() >= minTime.getMillis(); + } }