Merge pull request #1506 from gianm/realtime-plumber-nulls

Consider null inputRows and parse errors as unparseable during realtime ingestion.
This commit is contained in:
Himanshu 2015-07-13 10:12:12 -05:00
commit 725086cc89
3 changed files with 125 additions and 80 deletions

View File

@ -58,7 +58,6 @@ import org.joda.time.Period;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Random;
public class RealtimeIndexTask extends AbstractTask
@ -293,9 +292,19 @@ public class RealtimeIndexTask extends AbstractTask
long nextFlush = new DateTime().plus(intermediatePersistPeriod).getMillis();
while (firehose.hasMore()) {
final InputRow inputRow;
try {
inputRow = firehose.nextRow();
if (inputRow == null) {
log.debug("thrown away null input row, considering unparseable");
fireDepartment.getMetrics().incrementUnparseable();
continue;
}
}
catch (ParseException e) {
log.debug(e, "thrown away line due to exception, considering unparseable");
fireDepartment.getMetrics().incrementUnparseable();
continue;
}
@ -319,11 +328,6 @@ public class RealtimeIndexTask extends AbstractTask
nextFlush = new DateTime().plus(intermediatePersistPeriod).getMillis();
}
}
catch (ParseException e) {
log.warn(e, "unparseable line");
fireDepartment.getMetrics().incrementUnparseable();
}
}
}
catch (Throwable e) {
normalExit = false;

View File

@ -252,8 +252,14 @@ public class RealtimeManager implements QuerySegmentWalker
try {
try {
inputRow = firehose.nextRow();
if (inputRow == null) {
log.debug("thrown away null input row, considering unparseable");
metrics.incrementUnparseable();
continue;
}
catch (Exception e) {
}
catch (ParseException e) {
log.debug(e, "thrown away line due to exception, considering unparseable");
metrics.incrementUnparseable();
continue;

View File

@ -21,6 +21,7 @@ import com.google.common.base.Stopwatch;
import com.google.common.collect.Lists;
import com.metamx.common.Granularity;
import com.metamx.common.ISE;
import com.metamx.common.parsers.ParseException;
import io.druid.data.input.Firehose;
import io.druid.data.input.FirehoseFactory;
import io.druid.data.input.InputRow;
@ -40,10 +41,10 @@ import io.druid.segment.realtime.plumber.Plumber;
import io.druid.segment.realtime.plumber.PlumberSchool;
import io.druid.segment.realtime.plumber.Sink;
import io.druid.utils.Runnables;
import junit.framework.Assert;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.joda.time.Period;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@ -64,8 +65,11 @@ public class RealtimeManagerTest
@Before
public void setUp() throws Exception
{
final List<InputRow> rows = Arrays.asList(
makeRow(new DateTime("9000-01-01").getMillis()), makeRow(new DateTime().getMillis())
final List<TestInputRowHolder> rows = Arrays.asList(
makeRow(new DateTime("9000-01-01").getMillis()),
makeRow(new ParseException("parse error")),
null,
makeRow(new DateTime().getMillis())
);
schema = new DataSchema(
@ -137,13 +141,39 @@ public class RealtimeManagerTest
Assert.assertEquals(1, realtimeManager.getMetrics("test").processed());
Assert.assertEquals(1, realtimeManager.getMetrics("test").thrownAway());
Assert.assertEquals(2, realtimeManager.getMetrics("test").unparseable());
Assert.assertTrue(plumber.isStartedJob());
Assert.assertTrue(plumber.isFinishedJob());
Assert.assertEquals(1, plumber.getPersistCount());
}
private InputRow makeRow(final long timestamp)
/**
 * Builds a row holder for the given timestamp with no exception attached.
 *
 * @param timestamp value passed through to the holder
 * @return a holder constructed with {@code timestamp} and a null exception
 */
private TestInputRowHolder makeRow(final long timestamp)
{
  final RuntimeException noFailure = null;
  return new TestInputRowHolder(timestamp, noFailure);
}
/**
 * Builds a row holder that carries the given exception; the holder's getRow()
 * throws it when it is non-null. The timestamp is a zero placeholder.
 *
 * @param e exception handed to the holder
 * @return a holder constructed with timestamp 0 and {@code e}
 */
private TestInputRowHolder makeRow(final RuntimeException e)
{
  final long placeholderTimestamp = 0;
  return new TestInputRowHolder(placeholderTimestamp, e);
}
private static class TestInputRowHolder
{
// Fixture state. Both fields are assigned once in the constructor and nothing
// visible in this class reassigns them, so they are declared final.
private final long timestamp;
private final RuntimeException exception;

/**
 * Creates a holder that either yields a row or fails when asked for one.
 *
 * @param timestamp stored for later use; presumably reported by the row that
 *                  getRow() builds -- TODO confirm against the row implementation
 * @param exception if non-null, getRow() throws it instead of returning a row
 */
public TestInputRowHolder(long timestamp, RuntimeException exception)
{
  this.timestamp = timestamp;
  this.exception = exception;
}
public InputRow getRow()
{
if (exception != null) {
throw exception;
}
return new InputRow()
{
@Override
@ -195,13 +225,13 @@ public class RealtimeManagerTest
}
};
}
}
private static class TestFirehose implements Firehose
{
private final Iterator<InputRow> rows;
private final Iterator<TestInputRowHolder> rows;
private TestFirehose(Iterator<InputRow> rows)
// Stores the iterator of row holders; nextRow() draws its rows from it.
// The iterator may yield null entries as well as holders.
private TestFirehose(Iterator<TestInputRowHolder> rows)
{
this.rows = rows;
}
@ -215,7 +245,12 @@ public class RealtimeManagerTest
@Override
public InputRow nextRow()
{
return rows.next();
final TestInputRowHolder holder = rows.next();
if (holder == null) {
return null;
} else {
return holder.getRow();
}
}
@Override