mirror of https://github.com/apache/druid.git
Plumbers: Move plumber.add out of try/catch for ParseException.
The incremental indexes handle that now so it's not necessary. Also, add debug logging and more detailed exceptions to the incremental indexes for the case where there are parse exceptions during aggregation.
This commit is contained in:
parent
1e49092ce7
commit
8a11161b20
|
@ -111,6 +111,7 @@ import org.junit.Assert;
|
|||
import org.junit.Before;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.internal.matchers.ThrowableCauseMatcher;
|
||||
import org.junit.internal.matchers.ThrowableMessageMatcher;
|
||||
import org.junit.rules.ExpectedException;
|
||||
import org.junit.rules.TemporaryFolder;
|
||||
|
@ -446,7 +447,19 @@ public class RealtimeIndexTaskTest
|
|||
expectedException.expectCause(CoreMatchers.<Throwable>instanceOf(ParseException.class));
|
||||
expectedException.expectCause(
|
||||
ThrowableMessageMatcher.hasMessage(
|
||||
CoreMatchers.containsString("Unable to parse metrics[met1], value[foo]")
|
||||
CoreMatchers.containsString("Encountered parse error for aggregator[met1]")
|
||||
)
|
||||
);
|
||||
expectedException.expect(
|
||||
ThrowableCauseMatcher.hasCause(
|
||||
ThrowableCauseMatcher.hasCause(
|
||||
CoreMatchers.allOf(
|
||||
CoreMatchers.<Throwable>instanceOf(ParseException.class),
|
||||
ThrowableMessageMatcher.hasMessage(
|
||||
CoreMatchers.containsString("Unable to parse metrics[met1], value[foo]")
|
||||
)
|
||||
)
|
||||
)
|
||||
)
|
||||
);
|
||||
statusFuture.get();
|
||||
|
|
|
@ -24,6 +24,7 @@ import com.google.common.base.Throwables;
|
|||
import com.google.common.collect.Maps;
|
||||
import com.metamx.common.IAE;
|
||||
import com.metamx.common.ISE;
|
||||
import com.metamx.common.logger.Logger;
|
||||
import com.metamx.common.parsers.ParseException;
|
||||
import io.druid.collections.ResourceHolder;
|
||||
import io.druid.collections.StupidPool;
|
||||
|
@ -46,6 +47,8 @@ import java.util.concurrent.atomic.AtomicInteger;
|
|||
*/
|
||||
public class OffheapIncrementalIndex extends IncrementalIndex<BufferAggregator>
|
||||
{
|
||||
private static final Logger log = new Logger(OffheapIncrementalIndex.class);
|
||||
|
||||
private final StupidPool<ByteBuffer> bufferPool;
|
||||
|
||||
private final List<ResourceHolder<ByteBuffer>> aggBuffers = new ArrayList<>();
|
||||
|
@ -275,7 +278,9 @@ public class OffheapIncrementalIndex extends IncrementalIndex<BufferAggregator>
|
|||
} catch (ParseException e) {
|
||||
// "aggregate" can throw ParseExceptions if a selector expects something but gets something else.
|
||||
if (reportParseExceptions) {
|
||||
throw e;
|
||||
throw new ParseException(e, "Encountered parse error for aggregator[%s]", getMetricAggs()[i].getName());
|
||||
} else {
|
||||
log.debug(e, "Encountered parse error, skipping aggregator[%s].", getMetricAggs()[i].getName());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -22,6 +22,7 @@ package io.druid.segment.incremental;
|
|||
import com.google.common.base.Supplier;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
import com.metamx.common.logger.Logger;
|
||||
import com.metamx.common.parsers.ParseException;
|
||||
import io.druid.data.input.InputRow;
|
||||
import io.druid.granularity.QueryGranularity;
|
||||
|
@ -33,7 +34,6 @@ import io.druid.segment.DimensionSelector;
|
|||
import io.druid.segment.FloatColumnSelector;
|
||||
import io.druid.segment.LongColumnSelector;
|
||||
import io.druid.segment.ObjectColumnSelector;
|
||||
import io.druid.segment.column.ValueType;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
@ -47,6 +47,8 @@ import java.util.concurrent.atomic.AtomicInteger;
|
|||
*/
|
||||
public class OnheapIncrementalIndex extends IncrementalIndex<Aggregator>
|
||||
{
|
||||
private static final Logger log = new Logger(OnheapIncrementalIndex.class);
|
||||
|
||||
private final ConcurrentHashMap<Integer, Aggregator[]> aggregators = new ConcurrentHashMap<>();
|
||||
private final ConcurrentNavigableMap<TimeAndDims, Integer> facts;
|
||||
private final AtomicInteger indexIncrement = new AtomicInteger(0);
|
||||
|
@ -198,10 +200,13 @@ public class OnheapIncrementalIndex extends IncrementalIndex<Aggregator>
|
|||
synchronized (agg) {
|
||||
try {
|
||||
agg.aggregate();
|
||||
} catch (ParseException e) {
|
||||
}
|
||||
catch (ParseException e) {
|
||||
// "aggregate" can throw ParseExceptions if a selector expects something but gets something else.
|
||||
if (reportParseExceptions) {
|
||||
throw e;
|
||||
throw new ParseException(e, "Encountered parse error for aggregator[%s]", agg.getName());
|
||||
} else {
|
||||
log.debug(e, "Encountered parse error, skipping aggregator[%s].", agg.getName());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -46,29 +46,9 @@ public class Plumbers
|
|||
final FireDepartmentMetrics metrics
|
||||
)
|
||||
{
|
||||
final InputRow inputRow;
|
||||
try {
|
||||
final InputRow inputRow = firehose.nextRow();
|
||||
|
||||
if (inputRow == null) {
|
||||
if (reportParseExceptions) {
|
||||
throw new ParseException("null input row");
|
||||
} else {
|
||||
log.debug("Discarded null input row, considering unparseable.");
|
||||
metrics.incrementUnparseable();
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
// Included in ParseException try/catch, as additional parsing can be done during indexing.
|
||||
int numRows = plumber.add(inputRow, committerSupplier);
|
||||
|
||||
if (numRows == -1) {
|
||||
metrics.incrementThrownAway();
|
||||
log.debug("Discarded row[%s], considering thrownAway.", inputRow);
|
||||
return;
|
||||
}
|
||||
|
||||
metrics.incrementProcessed();
|
||||
inputRow = firehose.nextRow();
|
||||
}
|
||||
catch (ParseException e) {
|
||||
if (reportParseExceptions) {
|
||||
|
@ -76,12 +56,36 @@ public class Plumbers
|
|||
} else {
|
||||
log.debug(e, "Discarded row due to exception, considering unparseable.");
|
||||
metrics.incrementUnparseable();
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
if (inputRow == null) {
|
||||
if (reportParseExceptions) {
|
||||
throw new ParseException("null input row");
|
||||
} else {
|
||||
log.debug("Discarded null input row, considering unparseable.");
|
||||
metrics.incrementUnparseable();
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
final int numRows;
|
||||
try {
|
||||
numRows = plumber.add(inputRow, committerSupplier);
|
||||
}
|
||||
catch (IndexSizeExceededException e) {
|
||||
// Shouldn't happen if this is only being called by a single thread.
|
||||
// plumber.add should be swapping out indexes before they fill up.
|
||||
throw new ISE(e, "WTF?! Index size exceeded, this shouldn't happen. Bad Plumber!");
|
||||
}
|
||||
|
||||
if (numRows == -1) {
|
||||
metrics.incrementThrownAway();
|
||||
log.debug("Discarded row[%s], considering thrownAway.", inputRow);
|
||||
return;
|
||||
}
|
||||
|
||||
metrics.incrementProcessed();
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue