Consider null inputRows and parse errors as unparseable during realtime ingestion.

Also, harmonize exception handling between the RealtimeIndexTask and the RealtimeManager.
Conditions other than null inputRows and parse errors bubble up in both.
This commit is contained in:
Gian Merlino 2015-07-09 12:11:38 -07:00
parent b3d360f990
commit 9068bcd062
3 changed files with 125 additions and 80 deletions

View File

@ -58,7 +58,6 @@ import org.joda.time.Period;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Random;
public class RealtimeIndexTask extends AbstractTask
@ -293,35 +292,40 @@ public class RealtimeIndexTask extends AbstractTask
long nextFlush = new DateTime().plus(intermediatePersistPeriod).getMillis();
while (firehose.hasMore()) {
final InputRow inputRow;
try {
inputRow = firehose.nextRow();
if (inputRow == null) {
log.debug("thrown away null input row, considering unparseable");
fireDepartment.getMetrics().incrementUnparseable();
continue;
}
int currCount = plumber.add(inputRow);
if (currCount == -1) {
fireDepartment.getMetrics().incrementThrownAway();
log.debug("Throwing away event[%s]", inputRow);
if (System.currentTimeMillis() > nextFlush) {
plumber.persist(firehose.commit());
nextFlush = new DateTime().plus(intermediatePersistPeriod).getMillis();
}
continue;
}
fireDepartment.getMetrics().incrementProcessed();
final Sink sink = plumber.getSink(inputRow.getTimestampFromEpoch());
if ((sink != null && !sink.canAppendRow()) || System.currentTimeMillis() > nextFlush) {
plumber.persist(firehose.commit());
nextFlush = new DateTime().plus(intermediatePersistPeriod).getMillis();
}
}
catch (ParseException e) {
log.warn(e, "unparseable line");
log.debug(e, "thrown away line due to exception, considering unparseable");
fireDepartment.getMetrics().incrementUnparseable();
continue;
}
int currCount = plumber.add(inputRow);
if (currCount == -1) {
fireDepartment.getMetrics().incrementThrownAway();
log.debug("Throwing away event[%s]", inputRow);
if (System.currentTimeMillis() > nextFlush) {
plumber.persist(firehose.commit());
nextFlush = new DateTime().plus(intermediatePersistPeriod).getMillis();
}
continue;
}
fireDepartment.getMetrics().incrementProcessed();
final Sink sink = plumber.getSink(inputRow.getTimestampFromEpoch());
if ((sink != null && !sink.canAppendRow()) || System.currentTimeMillis() > nextFlush) {
plumber.persist(firehose.commit());
nextFlush = new DateTime().plus(intermediatePersistPeriod).getMillis();
}
}
}

View File

@ -252,8 +252,14 @@ public class RealtimeManager implements QuerySegmentWalker
try {
try {
inputRow = firehose.nextRow();
if (inputRow == null) {
log.debug("thrown away null input row, considering unparseable");
metrics.incrementUnparseable();
continue;
}
}
catch (Exception e) {
catch (ParseException e) {
log.debug(e, "thrown away line due to exception, considering unparseable");
metrics.incrementUnparseable();
continue;

View File

@ -21,6 +21,7 @@ import com.google.common.base.Stopwatch;
import com.google.common.collect.Lists;
import com.metamx.common.Granularity;
import com.metamx.common.ISE;
import com.metamx.common.parsers.ParseException;
import io.druid.data.input.Firehose;
import io.druid.data.input.FirehoseFactory;
import io.druid.data.input.InputRow;
@ -40,10 +41,10 @@ import io.druid.segment.realtime.plumber.Plumber;
import io.druid.segment.realtime.plumber.PlumberSchool;
import io.druid.segment.realtime.plumber.Sink;
import io.druid.utils.Runnables;
import junit.framework.Assert;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.joda.time.Period;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@ -64,8 +65,11 @@ public class RealtimeManagerTest
@Before
public void setUp() throws Exception
{
final List<InputRow> rows = Arrays.asList(
makeRow(new DateTime("9000-01-01").getMillis()), makeRow(new DateTime().getMillis())
final List<TestInputRowHolder> rows = Arrays.asList(
makeRow(new DateTime("9000-01-01").getMillis()),
makeRow(new ParseException("parse error")),
null,
makeRow(new DateTime().getMillis())
);
schema = new DataSchema(
@ -137,71 +141,97 @@ public class RealtimeManagerTest
Assert.assertEquals(1, realtimeManager.getMetrics("test").processed());
Assert.assertEquals(1, realtimeManager.getMetrics("test").thrownAway());
Assert.assertEquals(2, realtimeManager.getMetrics("test").unparseable());
Assert.assertTrue(plumber.isStartedJob());
Assert.assertTrue(plumber.isFinishedJob());
Assert.assertEquals(1, plumber.getPersistCount());
}
private InputRow makeRow(final long timestamp)
private TestInputRowHolder makeRow(final long timestamp)
{
return new InputRow()
{
@Override
public List<String> getDimensions()
{
return Arrays.asList("testDim");
}
@Override
public long getTimestampFromEpoch()
{
return timestamp;
}
@Override
public DateTime getTimestamp()
{
return new DateTime(timestamp);
}
@Override
public List<String> getDimension(String dimension)
{
return Lists.newArrayList();
}
@Override
public float getFloatMetric(String metric)
{
return 0;
}
@Override
public long getLongMetric(String metric)
{
return 0L;
}
@Override
public Object getRaw(String dimension)
{
return null;
}
@Override
public int compareTo(Row o)
{
return 0;
}
};
return new TestInputRowHolder(timestamp, null);
}
private TestInputRowHolder makeRow(final RuntimeException e)
{
return new TestInputRowHolder(0, e);
}
private static class TestInputRowHolder
{
private long timestamp;
private RuntimeException exception;
public TestInputRowHolder(long timestamp, RuntimeException exception)
{
this.timestamp = timestamp;
this.exception = exception;
}
public InputRow getRow()
{
if (exception != null) {
throw exception;
}
return new InputRow()
{
@Override
public List<String> getDimensions()
{
return Arrays.asList("testDim");
}
@Override
public long getTimestampFromEpoch()
{
return timestamp;
}
@Override
public DateTime getTimestamp()
{
return new DateTime(timestamp);
}
@Override
public List<String> getDimension(String dimension)
{
return Lists.newArrayList();
}
@Override
public float getFloatMetric(String metric)
{
return 0;
}
@Override
public long getLongMetric(String metric)
{
return 0L;
}
@Override
public Object getRaw(String dimension)
{
return null;
}
@Override
public int compareTo(Row o)
{
return 0;
}
};
}
}
private static class TestFirehose implements Firehose
{
private final Iterator<InputRow> rows;
private final Iterator<TestInputRowHolder> rows;
private TestFirehose(Iterator<InputRow> rows)
private TestFirehose(Iterator<TestInputRowHolder> rows)
{
this.rows = rows;
}
@ -215,7 +245,12 @@ public class RealtimeManagerTest
@Override
public InputRow nextRow()
{
return rows.next();
final TestInputRowHolder holder = rows.next();
if (holder == null) {
return null;
} else {
return holder.getRow();
}
}
@Override