MinTimeFirehose: Correcter logic

This commit is contained in:
Gian Merlino 2013-04-19 14:51:41 -07:00
parent dbfa045df0
commit 5edbf2b4b9
1 changed files with 37 additions and 5 deletions

View File

@ -4,12 +4,19 @@ import com.metamx.druid.input.InputRow;
import org.joda.time.DateTime; import org.joda.time.DateTime;
import java.io.IOException; import java.io.IOException;
import java.util.NoSuchElementException;
/**
* Provides a view on a firehose that only returns rows at or after a certain minimum timestamp.
* Not thread-safe.
*/
public class MinTimeFirehose implements Firehose public class MinTimeFirehose implements Firehose
{ {
private final Firehose firehose; private final Firehose firehose;
private final DateTime minTime; private final DateTime minTime;
private InputRow savedInputRow = null;
public MinTimeFirehose(Firehose firehose, DateTime minTime) public MinTimeFirehose(Firehose firehose, DateTime minTime)
{ {
this.firehose = firehose; this.firehose = firehose;
@ -19,18 +26,38 @@ public class MinTimeFirehose implements Firehose
@Override @Override
public boolean hasMore() public boolean hasMore()
{ {
return firehose.hasMore(); if (savedInputRow != null) {
return true;
}
while (firehose.hasMore()) {
final InputRow row = firehose.nextRow();
if (acceptable(row)) {
savedInputRow = row;
return true;
}
}
return false;
} }
@Override @Override
public InputRow nextRow() public InputRow nextRow()
{ {
while (true) { if (savedInputRow != null) {
final InputRow row = savedInputRow;
savedInputRow = null;
return row;
} else {
while (firehose.hasMore()) {
final InputRow row = firehose.nextRow(); final InputRow row = firehose.nextRow();
if (row.getTimestampFromEpoch() >= minTime.getMillis()) { if (acceptable(row)) {
return row; return row;
} }
} }
throw new NoSuchElementException("No more rows!");
}
} }
@Override @Override
@ -44,4 +71,9 @@ public class MinTimeFirehose implements Firehose
{ {
firehose.close(); firehose.close();
} }
private boolean acceptable(InputRow row)
{
return row.getTimestampFromEpoch() >= minTime.getMillis();
}
} }