diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java index 85eb0a90418..f0c85f0c6db 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java @@ -111,6 +111,7 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Rule; import org.junit.Test; +import org.junit.internal.matchers.ThrowableCauseMatcher; import org.junit.internal.matchers.ThrowableMessageMatcher; import org.junit.rules.ExpectedException; import org.junit.rules.TemporaryFolder; @@ -446,7 +447,19 @@ public class RealtimeIndexTaskTest expectedException.expectCause(CoreMatchers.instanceOf(ParseException.class)); expectedException.expectCause( ThrowableMessageMatcher.hasMessage( - CoreMatchers.containsString("Unable to parse metrics[met1], value[foo]") + CoreMatchers.containsString("Encountered parse error for aggregator[met1]") + ) + ); + expectedException.expect( + ThrowableCauseMatcher.hasCause( + ThrowableCauseMatcher.hasCause( + CoreMatchers.allOf( + CoreMatchers.instanceOf(ParseException.class), + ThrowableMessageMatcher.hasMessage( + CoreMatchers.containsString("Unable to parse metrics[met1], value[foo]") + ) + ) + ) ) ); statusFuture.get(); diff --git a/processing/src/main/java/io/druid/segment/incremental/OffheapIncrementalIndex.java b/processing/src/main/java/io/druid/segment/incremental/OffheapIncrementalIndex.java index 47f4c5ca4f1..195b8089db9 100644 --- a/processing/src/main/java/io/druid/segment/incremental/OffheapIncrementalIndex.java +++ b/processing/src/main/java/io/druid/segment/incremental/OffheapIncrementalIndex.java @@ -24,6 +24,7 @@ import com.google.common.base.Throwables; import com.google.common.collect.Maps; import com.metamx.common.IAE; import com.metamx.common.ISE; +import com.metamx.common.logger.Logger; import com.metamx.common.parsers.ParseException; import io.druid.collections.ResourceHolder; import io.druid.collections.StupidPool; @@ -46,6 +47,8 @@ import java.util.concurrent.atomic.AtomicInteger; */ public class OffheapIncrementalIndex extends IncrementalIndex { + private static final Logger log = new Logger(OffheapIncrementalIndex.class); + private final StupidPool bufferPool; private final List> aggBuffers = new ArrayList<>(); @@ -275,7 +278,9 @@ public class OffheapIncrementalIndex extends IncrementalIndex } catch (ParseException e) { // "aggregate" can throw ParseExceptions if a selector expects something but gets something else. if (reportParseExceptions) { - throw e; + throw new ParseException(e, "Encountered parse error for aggregator[%s]", getMetricAggs()[i].getName()); + } else { + log.debug(e, "Encountered parse error, skipping aggregator[%s].", getMetricAggs()[i].getName()); } } } diff --git a/processing/src/main/java/io/druid/segment/incremental/OnheapIncrementalIndex.java b/processing/src/main/java/io/druid/segment/incremental/OnheapIncrementalIndex.java index 5f4f58e0e18..62db1a689a0 100644 --- a/processing/src/main/java/io/druid/segment/incremental/OnheapIncrementalIndex.java +++ b/processing/src/main/java/io/druid/segment/incremental/OnheapIncrementalIndex.java @@ -22,6 +22,7 @@ package io.druid.segment.incremental; import com.google.common.base.Supplier; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import com.metamx.common.logger.Logger; import com.metamx.common.parsers.ParseException; import io.druid.data.input.InputRow; import io.druid.granularity.QueryGranularity; @@ -33,7 +34,6 @@ import io.druid.segment.DimensionSelector; import io.druid.segment.FloatColumnSelector; import io.druid.segment.LongColumnSelector; import io.druid.segment.ObjectColumnSelector; -import io.druid.segment.column.ValueType; import java.util.List; import java.util.Map; @@ -47,6 +47,8 @@ import java.util.concurrent.atomic.AtomicInteger; */ public class OnheapIncrementalIndex extends IncrementalIndex { + private static final Logger log = new Logger(OnheapIncrementalIndex.class); + private final ConcurrentHashMap aggregators = new ConcurrentHashMap<>(); private final ConcurrentNavigableMap facts; private final AtomicInteger indexIncrement = new AtomicInteger(0); @@ -198,10 +200,13 @@ public class OnheapIncrementalIndex extends IncrementalIndex synchronized (agg) { try { agg.aggregate(); - } catch (ParseException e) { + } + catch (ParseException e) { // "aggregate" can throw ParseExceptions if a selector expects something but gets something else. if (reportParseExceptions) { - throw e; + throw new ParseException(e, "Encountered parse error for aggregator[%s]", agg.getName()); + } else { + log.debug(e, "Encountered parse error, skipping aggregator[%s].", agg.getName()); } } } diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/Plumbers.java b/server/src/main/java/io/druid/segment/realtime/plumber/Plumbers.java index 53511b0a10e..5f54d4d5a2b 100644 --- a/server/src/main/java/io/druid/segment/realtime/plumber/Plumbers.java +++ b/server/src/main/java/io/druid/segment/realtime/plumber/Plumbers.java @@ -46,29 +46,9 @@ public class Plumbers final FireDepartmentMetrics metrics ) { + final InputRow inputRow; try { - final InputRow inputRow = firehose.nextRow(); - - if (inputRow == null) { - if (reportParseExceptions) { - throw new ParseException("null input row"); - } else { - log.debug("Discarded null input row, considering unparseable."); - metrics.incrementUnparseable(); - return; - } - } - - // Included in ParseException try/catch, as additional parsing can be done during indexing. - int numRows = plumber.add(inputRow, committerSupplier); - - if (numRows == -1) { - metrics.incrementThrownAway(); - log.debug("Discarded row[%s], considering thrownAway.", inputRow); - return; - } - - metrics.incrementProcessed(); + inputRow = firehose.nextRow(); } catch (ParseException e) { if (reportParseExceptions) { @@ -76,12 +56,36 @@ public class Plumbers } else { log.debug(e, "Discarded row due to exception, considering unparseable."); metrics.incrementUnparseable(); + return; } } + + if (inputRow == null) { + if (reportParseExceptions) { + throw new ParseException("null input row"); + } else { + log.debug("Discarded null input row, considering unparseable."); + metrics.incrementUnparseable(); + return; + } + } + + final int numRows; + try { + numRows = plumber.add(inputRow, committerSupplier); + } catch (IndexSizeExceededException e) { // Shouldn't happen if this is only being called by a single thread. // plumber.add should be swapping out indexes before they fill up. throw new ISE(e, "WTF?! Index size exceeded, this shouldn't happen. Bad Plumber!"); } + + if (numRows == -1) { + metrics.incrementThrownAway(); + log.debug("Discarded row[%s], considering thrownAway.", inputRow); + return; + } + + metrics.incrementProcessed(); } }