Better handling of ParseExceptions.

Two changes:
- Allow IncrementalIndex to suppress ParseExceptions on "aggregate".
- Add "reportParseExceptions" option to realtime tuning configs. By default this is "false".

Behavior of the counters should now be:

- processed: Number of rows indexed, including rows where some fields could be parsed and some could not.
- thrownAway: Number of rows thrown away due to rejection policy.
- unparseable: Number of rows thrown away due to being completely unparseable (no fields salvageable at all).

If "reportParseExceptions" is true then "unparseable" will always be zero (because a parse error would
cause an exception to be thrown). In addition, "processed" will only include fully parseable rows
(because even partial parse failures will cause exceptions to be thrown).

Fixes #2510.
This commit is contained in:
Gian Merlino 2016-02-19 13:35:06 -08:00
parent 0c984f9e32
commit 3534483433
27 changed files with 448 additions and 104 deletions

View File

@ -153,6 +153,7 @@ The tuningConfig is optional and default parameters will be used if no tuningCon
|buildV9Directly|Boolean|Whether to build v9 index directly instead of building v8 index and convert it to v9 format|no (default = false)|
|persistThreadPriority|int|If `-XX:+UseThreadPriorities` is properly enabled, this will set the thread priority of the persisting thread to `Thread.NORM_PRIORITY` plus this value within the bounds of `Thread.MIN_PRIORITY` and `Thread.MAX_PRIORITY`. A value of 0 indicates to not change the thread priority.|no (default = 0; inherit and do not override)|
|mergeThreadPriority|int|If `-XX:+UseThreadPriorities` is properly enabled, this will set the thread priority of the merging thread to `Thread.NORM_PRIORITY` plus this value within the bounds of `Thread.MIN_PRIORITY` and `Thread.MAX_PRIORITY`. A value of 0 indicates to not change the thread priority.|no (default = 0; inherit and do not override)|
|reportParseExceptions|Boolean|If true, exceptions encountered during parsing will be thrown and will halt ingestion. If false, unparseable rows and fields will be skipped. If an entire row is skipped, the "unparseable" counter will be incremented. If some fields in a row were parseable and some were not, the parseable fields will be indexed and the "unparseable" counter will not be incremented.|false|
Before enabling thread priority settings, users are highly encouraged to read the [original pull request](https://github.com/druid-io/druid/pull/984) and other documentation about proper use of `-XX:+UseThreadPriorities`.

View File

@ -229,6 +229,7 @@ public class IndexGeneratorJob implements Jobby
OnheapIncrementalIndex newIndex = new OnheapIncrementalIndex(
indexSchema,
!tuningConfig.isIgnoreInvalidRows(),
tuningConfig.getRowFlushBoundary()
);

View File

@ -143,7 +143,8 @@ public class IndexTask extends AbstractFixedIntervalTask
indexSpec,
buildV9Directly,
0,
0
0,
true
);
}

View File

@ -28,12 +28,10 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.primitives.Ints;
import com.metamx.common.guava.CloseQuietly;
import com.metamx.common.parsers.ParseException;
import com.metamx.emitter.EmittingLogger;
import io.druid.data.input.Committer;
import io.druid.data.input.Firehose;
import io.druid.data.input.FirehoseFactory;
import io.druid.data.input.InputRow;
import io.druid.indexing.common.TaskLock;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.TaskToolbox;
@ -51,6 +49,7 @@ import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.RealtimeIOConfig;
import io.druid.segment.indexing.RealtimeTuningConfig;
import io.druid.segment.realtime.FireDepartment;
import io.druid.segment.realtime.FireDepartmentMetrics;
import io.druid.segment.realtime.RealtimeMetricsMonitor;
import io.druid.segment.realtime.SegmentPublisher;
import io.druid.segment.realtime.firehose.ClippedFirehoseFactory;
@ -117,6 +116,9 @@ public class RealtimeIndexTask extends AbstractTask
@JsonIgnore
private volatile Firehose firehose = null;
@JsonIgnore
private volatile FireDepartmentMetrics metrics = null;
@JsonIgnore
private volatile boolean gracefullyStopped = false;
@ -285,6 +287,7 @@ public class RealtimeIndexTask extends AbstractTask
realtimeIOConfig,
tuningConfig
);
this.metrics = fireDepartment.getMetrics();
final RealtimeMetricsMonitor metricsMonitor = new RealtimeMetricsMonitor(
ImmutableList.of(fireDepartment),
ImmutableMap.of(
@ -313,7 +316,7 @@ public class RealtimeIndexTask extends AbstractTask
toolbox.getObjectMapper()
);
this.plumber = plumberSchool.findPlumber(dataSchema, tuningConfig, fireDepartment.getMetrics());
this.plumber = plumberSchool.findPlumber(dataSchema, tuningConfig, metrics);
Supplier<Committer> committerSupplier = null;
@ -337,7 +340,13 @@ public class RealtimeIndexTask extends AbstractTask
// Time to read data!
while (firehose != null && (!gracefullyStopped || firehoseDrainableByClosing) && firehose.hasMore()) {
Plumbers.addNextRow(committerSupplier, firehose, plumber, fireDepartment.getMetrics());
Plumbers.addNextRow(
committerSupplier,
firehose,
plumber,
tuningConfig.isReportParseExceptions(),
metrics
);
}
}
catch (Throwable e) {
@ -459,6 +468,15 @@ public class RealtimeIndexTask extends AbstractTask
return firehose;
}
/**
* Public for tests.
*/
@JsonIgnore
public FireDepartmentMetrics getMetrics()
{
return metrics;
}
@JsonProperty("spec")
public FireDepartment getRealtimeIngestionSchema()
{
@ -468,10 +486,12 @@ public class RealtimeIndexTask extends AbstractTask
/**
* Is a firehose from this factory drainable by closing it? If so, we should drain on stopGracefully rather than
* abruptly stopping.
* <p>
*
* This is a hack to get around the fact that the Firehose and FirehoseFactory interfaces do not help us do this.
*
* Protected for tests.
*/
private static boolean isFirehoseDrainableByClosing(FirehoseFactory firehoseFactory)
protected boolean isFirehoseDrainableByClosing(FirehoseFactory firehoseFactory)
{
return firehoseFactory instanceof EventReceiverFirehoseFactory
|| (firehoseFactory instanceof TimedShutoffFirehoseFactory

View File

@ -20,14 +20,15 @@
package io.druid.indexing.common.task;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.api.client.util.Charsets;
import com.google.api.client.util.Sets;
import com.google.common.base.Charsets;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
@ -36,6 +37,7 @@ import com.metamx.common.ISE;
import com.metamx.common.Pair;
import com.metamx.common.guava.Sequences;
import com.metamx.common.logger.Logger;
import com.metamx.common.parsers.ParseException;
import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.core.LoggingEmitter;
import com.metamx.emitter.service.ServiceEmitter;
@ -43,8 +45,11 @@ import com.metamx.metrics.MonitorScheduler;
import io.druid.client.cache.CacheConfig;
import io.druid.client.cache.MapCache;
import io.druid.concurrent.Execs;
import io.druid.data.input.Firehose;
import io.druid.data.input.FirehoseFactory;
import io.druid.data.input.InputRow;
import io.druid.data.input.MapBasedInputRow;
import io.druid.data.input.impl.InputRowParser;
import io.druid.granularity.QueryGranularity;
import io.druid.indexing.common.SegmentLoaderFactory;
import io.druid.indexing.common.TaskStatus;
@ -93,14 +98,12 @@ import io.druid.segment.loading.SegmentLoaderConfig;
import io.druid.segment.loading.SegmentLoaderLocalCacheManager;
import io.druid.segment.loading.StorageLocationConfig;
import io.druid.segment.realtime.FireDepartment;
import io.druid.segment.realtime.firehose.EventReceiverFirehoseFactory;
import io.druid.segment.realtime.firehose.NoopChatHandlerProvider;
import io.druid.segment.realtime.plumber.SegmentHandoffNotifier;
import io.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory;
import io.druid.server.coordination.DruidServerMetadata;
import io.druid.server.metrics.EventReceiverFirehoseRegister;
import io.druid.segment.realtime.plumber.ServerTimeRejectionPolicyFactory;
import io.druid.timeline.DataSegment;
import org.easymock.EasyMock;
import org.hamcrest.CoreMatchers;
import org.joda.time.DateTime;
import org.joda.time.Period;
import org.junit.After;
@ -108,6 +111,8 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.internal.matchers.ThrowableMessageMatcher;
import org.junit.rules.ExpectedException;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@ -116,24 +121,18 @@ import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
@RunWith(Parameterized.class)
public class RealtimeIndexTaskTest
{
private static final Logger log = new Logger(RealtimeIndexTaskTest.class);
private static final DruidServerMetadata dummyServer = new DruidServerMetadata(
"dummy",
"dummy_host",
0,
"historical",
"dummy_tier",
0
);
private static final ObjectMapper jsonMapper = new DefaultObjectMapper();
private static final ServiceEmitter emitter = new ServiceEmitter(
"service",
@ -145,6 +144,89 @@ public class RealtimeIndexTaskTest
)
);
private static final String FAIL_DIM = "__fail__";
private static class TestFirehose implements Firehose
{
private final List<InputRow> queue = Lists.newLinkedList();
private boolean closed = false;
public void addRows(List<InputRow> rows)
{
synchronized (this) {
queue.addAll(rows);
notifyAll();
}
}
@Override
public boolean hasMore()
{
try {
synchronized (this) {
while (queue.isEmpty() && !closed) {
wait();
}
return !queue.isEmpty();
}
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw Throwables.propagate(e);
}
}
@Override
public InputRow nextRow()
{
synchronized (this) {
final InputRow row = queue.remove(0);
if (row != null && row.getDimensions().contains(FAIL_DIM)) {
throw new ParseException(FAIL_DIM);
}
return row;
}
}
@Override
public Runnable commit()
{
return new Runnable()
{
@Override
public void run()
{
// do nothing
}
};
}
@Override
public void close() throws IOException
{
synchronized (this) {
closed = true;
notifyAll();
}
}
}
private static class TestFirehoseFactory implements FirehoseFactory
{
public TestFirehoseFactory()
{
}
@Override
public Firehose connect(InputRowParser parser) throws IOException, ParseException
{
return new TestFirehose();
}
}
@Rule
public final ExpectedException expectedException = ExpectedException.none();
@Rule
public final TemporaryFolder tempFolder = new TemporaryFolder();
@ -206,20 +288,24 @@ public class RealtimeIndexTaskTest
Thread.sleep(50);
}
final EventReceiverFirehoseFactory.EventReceiverFirehose firehose =
(EventReceiverFirehoseFactory.EventReceiverFirehose) task.getFirehose();
final TestFirehose firehose = (TestFirehose) task.getFirehose();
firehose.addRows(
ImmutableList.<InputRow>of(
new MapBasedInputRow(
now,
ImmutableList.of("dim1"),
ImmutableMap.<String, Object>of("dim1", "foo")
ImmutableMap.<String, Object>of("dim1", "foo", "met1", "1")
),
new MapBasedInputRow(
now.minus(new Period("P1D")),
ImmutableList.of("dim1"),
ImmutableMap.<String, Object>of("dim1", "foo", "met1", 2.0)
),
new MapBasedInputRow(
now,
ImmutableList.of("dim2"),
ImmutableMap.<String, Object>of("dim2", "bar")
ImmutableMap.<String, Object>of("dim2", "bar", "met1", 2.0)
)
)
);
@ -234,8 +320,160 @@ public class RealtimeIndexTaskTest
publishedSegment = Iterables.getOnlyElement(mdc.getPublished());
// Do a query.
Assert.assertEquals(2, countEvents(task));
// Check metrics.
Assert.assertEquals(2, task.getMetrics().processed());
Assert.assertEquals(1, task.getMetrics().thrownAway());
Assert.assertEquals(0, task.getMetrics().unparseable());
// Do some queries.
Assert.assertEquals(2, sumMetric(task, "rows"));
Assert.assertEquals(3, sumMetric(task, "met1"));
// Simulate handoff.
for (Map.Entry<SegmentDescriptor, Pair<Executor, Runnable>> entry : handOffCallbacks.entrySet()) {
final Pair<Executor, Runnable> executorRunnablePair = entry.getValue();
Assert.assertEquals(
new SegmentDescriptor(
publishedSegment.getInterval(),
publishedSegment.getVersion(),
publishedSegment.getShardSpec().getPartitionNum()
),
entry.getKey()
);
executorRunnablePair.lhs.execute(executorRunnablePair.rhs);
}
handOffCallbacks.clear();
// Wait for the task to finish.
final TaskStatus taskStatus = statusFuture.get();
Assert.assertEquals(TaskStatus.Status.SUCCESS, taskStatus.getStatusCode());
}
@Test(timeout = 60_000L)
public void testReportParseExceptionsOnBadMetric() throws Exception
{
final TestIndexerMetadataStorageCoordinator mdc = new TestIndexerMetadataStorageCoordinator();
final RealtimeIndexTask task = makeRealtimeTask(null, true);
final TaskToolbox taskToolbox = makeToolbox(task, mdc, tempFolder.newFolder());
final ListenableFuture<TaskStatus> statusFuture = runTask(task, taskToolbox);
// Wait for firehose to show up, it starts off null.
while (task.getFirehose() == null) {
Thread.sleep(50);
}
final TestFirehose firehose = (TestFirehose) task.getFirehose();
firehose.addRows(
ImmutableList.<InputRow>of(
new MapBasedInputRow(
now,
ImmutableList.of("dim1"),
ImmutableMap.<String, Object>of("dim1", "foo", "met1", "1")
),
new MapBasedInputRow(
now,
ImmutableList.of("dim1"),
ImmutableMap.<String, Object>of("dim1", "foo", "met1", "foo")
),
new MapBasedInputRow(
now.minus(new Period("P1D")),
ImmutableList.of("dim1"),
ImmutableMap.<String, Object>of("dim1", "foo", "met1", "foo")
),
new MapBasedInputRow(
now,
ImmutableList.of("dim2"),
ImmutableMap.<String, Object>of("dim2", "bar", "met1", 2.0)
)
)
);
// Stop the firehose, this will drain out existing events.
firehose.close();
// Wait for the task to finish.
expectedException.expect(ExecutionException.class);
expectedException.expectCause(CoreMatchers.<Throwable>instanceOf(ParseException.class));
expectedException.expectCause(
ThrowableMessageMatcher.hasMessage(
CoreMatchers.containsString("Unable to parse metrics[met1], value[foo]")
)
);
statusFuture.get();
}
@Test(timeout = 60_000L)
public void testNoReportParseExceptions() throws Exception
{
final TestIndexerMetadataStorageCoordinator mdc = new TestIndexerMetadataStorageCoordinator();
final RealtimeIndexTask task = makeRealtimeTask(null, false);
final TaskToolbox taskToolbox = makeToolbox(task, mdc, tempFolder.newFolder());
final ListenableFuture<TaskStatus> statusFuture = runTask(task, taskToolbox);
final DataSegment publishedSegment;
// Wait for firehose to show up, it starts off null.
while (task.getFirehose() == null) {
Thread.sleep(50);
}
final TestFirehose firehose = (TestFirehose) task.getFirehose();
firehose.addRows(
Arrays.<InputRow>asList(
// Good row- will be processed.
new MapBasedInputRow(
now,
ImmutableList.of("dim1"),
ImmutableMap.<String, Object>of("dim1", "foo", "met1", "1")
),
// Null row- will be unparseable.
null,
// Bad metric- will count as processed, but that particular metric won't update.
new MapBasedInputRow(
now,
ImmutableList.of("dim1"),
ImmutableMap.<String, Object>of("dim1", "foo", "met1", "foo")
),
// Bad row- will be unparseable.
new MapBasedInputRow(
now,
ImmutableList.of("dim1", FAIL_DIM),
ImmutableMap.<String, Object>of("dim1", "foo", "met1", 2.0)
),
// Old row- will be thrownAway.
new MapBasedInputRow(
now.minus(new Period("P1D")),
ImmutableList.of("dim1"),
ImmutableMap.<String, Object>of("dim1", "foo", "met1", 2.0)
),
// Good row- will be processed.
new MapBasedInputRow(
now,
ImmutableList.of("dim2"),
ImmutableMap.<String, Object>of("dim2", "bar", "met1", 2.0)
)
)
);
// Stop the firehose, this will drain out existing events.
firehose.close();
// Wait for publish.
while (mdc.getPublished().isEmpty()) {
Thread.sleep(50);
}
publishedSegment = Iterables.getOnlyElement(mdc.getPublished());
// Check metrics.
Assert.assertEquals(3, task.getMetrics().processed());
Assert.assertEquals(1, task.getMetrics().thrownAway());
Assert.assertEquals(2, task.getMetrics().unparseable());
// Do some queries.
Assert.assertEquals(3, sumMetric(task, "rows"));
Assert.assertEquals(3, sumMetric(task, "met1"));
// Simulate handoff.
for (Map.Entry<SegmentDescriptor, Pair<Executor, Runnable>> entry : handOffCallbacks.entrySet()) {
@ -275,8 +513,7 @@ public class RealtimeIndexTaskTest
Thread.sleep(50);
}
final EventReceiverFirehoseFactory.EventReceiverFirehose firehose =
(EventReceiverFirehoseFactory.EventReceiverFirehose) task1.getFirehose();
final TestFirehose firehose = (TestFirehose) task1.getFirehose();
firehose.addRows(
ImmutableList.<InputRow>of(
@ -312,10 +549,9 @@ public class RealtimeIndexTaskTest
}
// Do a query, at this point the previous data should be loaded.
Assert.assertEquals(1, countEvents(task2));
Assert.assertEquals(1, sumMetric(task2, "rows"));
final EventReceiverFirehoseFactory.EventReceiverFirehose firehose =
(EventReceiverFirehoseFactory.EventReceiverFirehose) task2.getFirehose();
final TestFirehose firehose = (TestFirehose) task2.getFirehose();
firehose.addRows(
ImmutableList.<InputRow>of(
@ -338,7 +574,7 @@ public class RealtimeIndexTaskTest
publishedSegment = Iterables.getOnlyElement(mdc.getPublished());
// Do a query.
Assert.assertEquals(2, countEvents(task2));
Assert.assertEquals(2, sumMetric(task2, "rows"));
// Simulate handoff.
for (Map.Entry<SegmentDescriptor, Pair<Executor, Runnable>> entry : handOffCallbacks.entrySet()) {
@ -361,7 +597,7 @@ public class RealtimeIndexTaskTest
}
}
@Test(timeout = 10000L)
@Test(timeout = 60_000L)
public void testRestoreAfterHandoffAttemptDuringShutdown() throws Exception
{
final TaskStorage taskStorage = new HeapMemoryTaskStorage(new TaskStorageConfig(null));
@ -380,8 +616,7 @@ public class RealtimeIndexTaskTest
Thread.sleep(50);
}
final EventReceiverFirehoseFactory.EventReceiverFirehose firehose =
(EventReceiverFirehoseFactory.EventReceiverFirehose) task1.getFirehose();
final TestFirehose firehose = (TestFirehose) task1.getFirehose();
firehose.addRows(
ImmutableList.<InputRow>of(
@ -404,7 +639,7 @@ public class RealtimeIndexTaskTest
publishedSegment = Iterables.getOnlyElement(mdc.getPublished());
// Do a query.
Assert.assertEquals(1, countEvents(task1));
Assert.assertEquals(1, sumMetric(task1, "rows"));
// Trigger graceful shutdown.
task1.stopGracefully();
@ -427,8 +662,7 @@ public class RealtimeIndexTaskTest
}
// Stop the firehose again, this will start another handoff.
final EventReceiverFirehoseFactory.EventReceiverFirehose firehose =
(EventReceiverFirehoseFactory.EventReceiverFirehose) task2.getFirehose();
final TestFirehose firehose = (TestFirehose) task2.getFirehose();
// Stop the firehose, this will trigger a finishJob.
firehose.close();
@ -462,7 +696,7 @@ public class RealtimeIndexTaskTest
}
}
@Test(timeout = 10000L)
@Test(timeout = 60_000L)
public void testRestoreCorruptData() throws Exception
{
final File directory = tempFolder.newFolder();
@ -479,8 +713,7 @@ public class RealtimeIndexTaskTest
Thread.sleep(50);
}
final EventReceiverFirehoseFactory.EventReceiverFirehose firehose =
(EventReceiverFirehoseFactory.EventReceiverFirehose) task1.getFirehose();
final TestFirehose firehose = (TestFirehose) task1.getFirehose();
firehose.addRows(
ImmutableList.<InputRow>of(
@ -536,7 +769,7 @@ public class RealtimeIndexTaskTest
}
}
@Test(timeout = 10000L)
@Test(timeout = 60_000L)
public void testStopBeforeStarting() throws Exception
{
final File directory = tempFolder.newFolder();
@ -577,24 +810,22 @@ public class RealtimeIndexTaskTest
}
private RealtimeIndexTask makeRealtimeTask(final String taskId)
{
return makeRealtimeTask(taskId, true);
}
private RealtimeIndexTask makeRealtimeTask(final String taskId, boolean reportParseExceptions)
{
ObjectMapper objectMapper = new DefaultObjectMapper();
DataSchema dataSchema = new DataSchema(
"test_ds",
null,
new AggregatorFactory[]{new CountAggregatorFactory("rows")},
new AggregatorFactory[]{new CountAggregatorFactory("rows"), new LongSumAggregatorFactory("met1", "met1")},
new UniformGranularitySpec(Granularity.DAY, QueryGranularity.NONE, null),
objectMapper
);
RealtimeIOConfig realtimeIOConfig = new RealtimeIOConfig(
new EventReceiverFirehoseFactory(
"foo",
100,
new NoopChatHandlerProvider(),
objectMapper,
null,
new EventReceiverFirehoseRegister()
),
new TestFirehoseFactory(),
null,
null
);
@ -604,19 +835,28 @@ public class RealtimeIndexTaskTest
new Period("PT10M"),
null,
null,
null,
new ServerTimeRejectionPolicyFactory(),
null,
null,
null,
buildV9Directly,
0, 0
0,
0,
reportParseExceptions
);
return new RealtimeIndexTask(
taskId,
null,
new FireDepartment(dataSchema, realtimeIOConfig, realtimeTuningConfig),
null
);
)
{
@Override
protected boolean isFirehoseDrainableByClosing(FirehoseFactory firehoseFactory)
{
return true;
}
};
}
private TaskToolbox makeToolbox(
@ -760,14 +1000,14 @@ public class RealtimeIndexTaskTest
return toolboxFactory.build(task);
}
public long countEvents(final Task task) throws Exception
public long sumMetric(final Task task, final String metric) throws Exception
{
// Do a query.
TimeseriesQuery query = Druids.newTimeseriesQueryBuilder()
.dataSource("test_ds")
.aggregators(
ImmutableList.<AggregatorFactory>of(
new LongSumAggregatorFactory("rows", "rows")
new LongSumAggregatorFactory(metric, metric)
)
).granularity(QueryGranularity.ALL)
.intervals("2000/3000")
@ -777,6 +1017,6 @@ public class RealtimeIndexTaskTest
task.getQueryRunner(query).run(query, ImmutableMap.<String, Object>of()),
Lists.<Result<TimeseriesResultValue>>newArrayList()
);
return results.get(0).getValue().getLongMetric("rows");
return results.isEmpty() ? 0 : results.get(0).getValue().getLongMetric(metric);
}
}

View File

@ -335,7 +335,8 @@ public class TaskSerdeTest
indexSpec,
null,
0,
0
0,
true
)
),
null
@ -358,6 +359,7 @@ public class TaskSerdeTest
Granularity.HOUR,
task.getRealtimeIngestionSchema().getDataSchema().getGranularitySpec().getSegmentGranularity()
);
Assert.assertTrue(task.getRealtimeIngestionSchema().getTuningConfig().isReportParseExceptions());
Assert.assertEquals(task.getId(), task2.getId());
Assert.assertEquals(task.getGroupId(), task2.getGroupId());

View File

@ -144,6 +144,7 @@ public class IngestSegmentFirehoseFactoryTest
.build();
final OnheapIncrementalIndex index = new OnheapIncrementalIndex(
schema,
true,
MAX_ROWS * MAX_SHARD_NUMBER
);

View File

@ -218,7 +218,7 @@ public class IngestSegmentFirehoseFactoryTimelineTest
}
)
.build();
final OnheapIncrementalIndex index = new OnheapIncrementalIndex(schema, rows.length);
final OnheapIncrementalIndex index = new OnheapIncrementalIndex(schema, true, rows.length);
for (InputRow row : rows) {
try {

View File

@ -1179,7 +1179,9 @@ public class TaskLifecycleTest
null,
null,
null,
0, 0
0,
0,
null
);
FireDepartment fireDepartment = new FireDepartment(dataSchema, realtimeIOConfig, realtimeTuningConfig);
return new RealtimeIndexTask(

View File

@ -87,6 +87,7 @@ public class GroupByQueryHelper
gran,
aggs.toArray(new AggregatorFactory[aggs.size()]),
false,
true,
config.getMaxResults(),
bufferPool
);
@ -98,6 +99,7 @@ public class GroupByQueryHelper
gran,
aggs.toArray(new AggregatorFactory[aggs.size()]),
false,
true,
config.getMaxResults()
);
}

View File

@ -51,7 +51,6 @@ import io.druid.segment.column.Column;
import io.druid.segment.column.ColumnCapabilities;
import io.druid.segment.column.ColumnCapabilitiesImpl;
import io.druid.segment.column.ValueType;
import io.druid.segment.data.Indexed;
import io.druid.segment.data.IndexedInts;
import io.druid.segment.serde.ComplexMetricExtractor;
import io.druid.segment.serde.ComplexMetricSerde;
@ -268,6 +267,7 @@ public abstract class IncrementalIndex<AggregatorType> implements Iterable<Row>,
private final AggregatorFactory[] metrics;
private final AggregatorType[] aggs;
private final boolean deserializeComplexMetrics;
private final boolean reportParseExceptions;
private final Metadata metadata;
private final Map<String, MetricDesc> metricDescs;
@ -295,10 +295,13 @@ public abstract class IncrementalIndex<AggregatorType> implements Iterable<Row>,
* @param incrementalIndexSchema the schema to use for incremental index
* @param deserializeComplexMetrics flag whether or not to call ComplexMetricExtractor.extractValue() on the input
* value for aggregators that return metrics other than float.
* @param reportParseExceptions flag whether or not to report ParseExceptions that occur while extracting values
* from input rows
*/
public IncrementalIndex(
final IncrementalIndexSchema incrementalIndexSchema,
final boolean deserializeComplexMetrics
final boolean deserializeComplexMetrics,
final boolean reportParseExceptions
)
{
this.minTimestamp = incrementalIndexSchema.getMinTimestamp();
@ -306,6 +309,7 @@ public abstract class IncrementalIndex<AggregatorType> implements Iterable<Row>,
this.metrics = incrementalIndexSchema.getMetrics();
this.rowTransformers = new CopyOnWriteArrayList<>();
this.deserializeComplexMetrics = deserializeComplexMetrics;
this.reportParseExceptions = reportParseExceptions;
this.metadata = new Metadata().setAggregators(getCombiningAggregators(metrics));
@ -367,6 +371,7 @@ public abstract class IncrementalIndex<AggregatorType> implements Iterable<Row>,
protected abstract Integer addToFacts(
AggregatorFactory[] metrics,
boolean deserializeComplexMetrics,
boolean reportParseExceptions,
InputRow row,
AtomicInteger numEntries,
TimeAndDims key,
@ -416,7 +421,16 @@ public abstract class IncrementalIndex<AggregatorType> implements Iterable<Row>,
*/
public int add(InputRow row) throws IndexSizeExceededException {
TimeAndDims key = toTimeAndDims(row);
final int rv = addToFacts(metrics, deserializeComplexMetrics, row, numEntries, key, in, rowSupplier);
final int rv = addToFacts(
metrics,
deserializeComplexMetrics,
reportParseExceptions,
row,
numEntries,
key,
in,
rowSupplier
);
updateMaxIngestedTime(row.getTimestamp());
return rv;
}

View File

@ -24,6 +24,7 @@ import com.google.common.base.Throwables;
import com.google.common.collect.Maps;
import com.metamx.common.IAE;
import com.metamx.common.ISE;
import com.metamx.common.parsers.ParseException;
import io.druid.collections.ResourceHolder;
import io.druid.collections.StupidPool;
import io.druid.data.input.InputRow;
@ -69,11 +70,12 @@ public class OffheapIncrementalIndex extends IncrementalIndex<BufferAggregator>
public OffheapIncrementalIndex(
IncrementalIndexSchema incrementalIndexSchema,
boolean deserializeComplexMetrics,
boolean reportParseExceptions,
int maxRowCount,
StupidPool<ByteBuffer> bufferPool
)
{
super(incrementalIndexSchema, deserializeComplexMetrics);
super(incrementalIndexSchema, deserializeComplexMetrics, reportParseExceptions);
this.maxRowCount = maxRowCount;
this.bufferPool = bufferPool;
this.facts = new ConcurrentSkipListMap<>(dimsComparator());
@ -97,6 +99,7 @@ public class OffheapIncrementalIndex extends IncrementalIndex<BufferAggregator>
QueryGranularity gran,
final AggregatorFactory[] metrics,
boolean deserializeComplexMetrics,
boolean reportParseExceptions,
int maxRowCount,
StupidPool<ByteBuffer> bufferPool
)
@ -107,6 +110,7 @@ public class OffheapIncrementalIndex extends IncrementalIndex<BufferAggregator>
.withMetrics(metrics)
.build(),
deserializeComplexMetrics,
reportParseExceptions,
maxRowCount,
bufferPool
);
@ -126,6 +130,7 @@ public class OffheapIncrementalIndex extends IncrementalIndex<BufferAggregator>
.withMetrics(metrics)
.build(),
true,
true,
maxRowCount,
bufferPool
);
@ -181,6 +186,7 @@ public class OffheapIncrementalIndex extends IncrementalIndex<BufferAggregator>
protected Integer addToFacts(
AggregatorFactory[] metrics,
boolean deserializeComplexMetrics,
boolean reportParseExceptions,
InputRow row,
AtomicInteger numEntries,
TimeAndDims key,
@ -264,7 +270,14 @@ public class OffheapIncrementalIndex extends IncrementalIndex<BufferAggregator>
final BufferAggregator agg = getAggs()[i];
synchronized (agg) {
agg.aggregate(aggBuffer, bufferOffset + aggOffsetInBuffer[i]);
try {
agg.aggregate(aggBuffer, bufferOffset + aggOffsetInBuffer[i]);
} catch (ParseException e) {
// "aggregate" can throw ParseExceptions if a selector expects something but gets something else.
if (reportParseExceptions) {
throw e;
}
}
}
}
rowContainer.set(null);

View File

@ -22,6 +22,7 @@ package io.druid.segment.incremental;
import com.google.common.base.Supplier;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.metamx.common.parsers.ParseException;
import io.druid.data.input.InputRow;
import io.druid.granularity.QueryGranularity;
import io.druid.query.aggregation.Aggregator;
@ -56,10 +57,11 @@ public class OnheapIncrementalIndex extends IncrementalIndex<Aggregator>
public OnheapIncrementalIndex(
IncrementalIndexSchema incrementalIndexSchema,
boolean deserializeComplexMetrics,
boolean reportParseExceptions,
int maxRowCount
)
{
super(incrementalIndexSchema, deserializeComplexMetrics);
super(incrementalIndexSchema, deserializeComplexMetrics, reportParseExceptions);
this.maxRowCount = maxRowCount;
this.facts = new ConcurrentSkipListMap<>(dimsComparator());
}
@ -69,6 +71,7 @@ public class OnheapIncrementalIndex extends IncrementalIndex<Aggregator>
QueryGranularity gran,
final AggregatorFactory[] metrics,
boolean deserializeComplexMetrics,
boolean reportParseExceptions,
int maxRowCount
)
{
@ -78,6 +81,7 @@ public class OnheapIncrementalIndex extends IncrementalIndex<Aggregator>
.withMetrics(metrics)
.build(),
deserializeComplexMetrics,
reportParseExceptions,
maxRowCount
);
}
@ -95,16 +99,18 @@ public class OnheapIncrementalIndex extends IncrementalIndex<Aggregator>
.withMetrics(metrics)
.build(),
true,
true,
maxRowCount
);
}
public OnheapIncrementalIndex(
IncrementalIndexSchema incrementalIndexSchema,
boolean reportParseExceptions,
int maxRowCount
)
{
this(incrementalIndexSchema, true, maxRowCount);
this(incrementalIndexSchema, true, reportParseExceptions, maxRowCount);
}
@Override
@ -139,6 +145,7 @@ public class OnheapIncrementalIndex extends IncrementalIndex<Aggregator>
protected Integer addToFacts(
AggregatorFactory[] metrics,
boolean deserializeComplexMetrics,
boolean reportParseExceptions,
InputRow row,
AtomicInteger numEntries,
TimeAndDims key,
@ -188,7 +195,14 @@ public class OnheapIncrementalIndex extends IncrementalIndex<Aggregator>
for (Aggregator agg : aggs) {
synchronized (agg) {
agg.aggregate();
try {
agg.aggregate();
} catch (ParseException e) {
// "aggregate" can throw ParseExceptions if a selector expects something but gets something else.
if (reportParseExceptions) {
throw e;
}
}
}
}

View File

@ -98,6 +98,7 @@ public class MultiValuedDimensionTest
new CountAggregatorFactory("count")
},
true,
true,
5000
);

View File

@ -311,7 +311,7 @@ public class AggregationTestHelper
List<File> toMerge = new ArrayList<>();
try {
index = new OnheapIncrementalIndex(minTimestamp, gran, metrics, deserializeComplexMetrics, maxRowCount);
index = new OnheapIncrementalIndex(minTimestamp, gran, metrics, deserializeComplexMetrics, true, maxRowCount);
while (rows.hasNext()) {
Object row = rows.next();
if (!index.canAppendRow()) {
@ -319,7 +319,7 @@ public class AggregationTestHelper
toMerge.add(tmp);
indexMerger.persist(index, tmp, new IndexSpec());
index.close();
index = new OnheapIncrementalIndex(minTimestamp, gran, metrics, deserializeComplexMetrics, maxRowCount);
index = new OnheapIncrementalIndex(minTimestamp, gran, metrics, deserializeComplexMetrics, true, maxRowCount);
}
if (row instanceof String && parser instanceof StringInputRowParser) {
//Note: this is required because StringInputRowParser is InputRowParser<ByteBuffer> as opposed to

View File

@ -882,9 +882,9 @@ public class IndexMergerTest
.build();
IncrementalIndex toPersist1 = new OnheapIncrementalIndex(schema, 1000);
IncrementalIndex toPersist2 = new OnheapIncrementalIndex(schema, 1000);
IncrementalIndex toPersist3 = new OnheapIncrementalIndex(schema, 1000);
IncrementalIndex toPersist1 = new OnheapIncrementalIndex(schema, true, 1000);
IncrementalIndex toPersist2 = new OnheapIncrementalIndex(schema, true, 1000);
IncrementalIndex toPersist3 = new OnheapIncrementalIndex(schema, true, 1000);
addDimValuesToIndex(toPersist1, "dimA", Arrays.asList("1", "2"));
addDimValuesToIndex(toPersist2, "dimA", Arrays.asList("1", "2"));
@ -1626,10 +1626,7 @@ public class IndexMergerTest
new AggregatorFactory[]{new CountAggregatorFactory("count")}
);
return new OnheapIncrementalIndex(
schema,
1000
);
return new OnheapIncrementalIndex(schema, true, 1000);
}
private AggregatorFactory[] getCombiningAggregators(AggregatorFactory[] aggregators)

View File

@ -181,10 +181,7 @@ public class TestIndex
.withQueryGranularity(QueryGranularity.NONE)
.withMetrics(METRIC_AGGS)
.build();
final IncrementalIndex retVal = new OnheapIncrementalIndex(
schema,
10000
);
final IncrementalIndex retVal = new OnheapIncrementalIndex(schema, true, 10000);
final AtomicLong startTime = new AtomicLong();
int lineCount;

View File

@ -32,6 +32,7 @@ import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.metamx.common.guava.Sequences;
import com.metamx.common.parsers.ParseException;
import io.druid.data.input.InputRow;
import io.druid.data.input.MapBasedInputRow;
import io.druid.granularity.QueryGranularity;
@ -135,6 +136,7 @@ public class OnheapIncrementalIndexBenchmark extends AbstractBenchmark
protected Integer addToFacts(
AggregatorFactory[] metrics,
boolean deserializeComplexMetrics,
boolean reportParseExceptions,
InputRow row,
AtomicInteger numEntries,
TimeAndDims key,
@ -185,7 +187,15 @@ public class OnheapIncrementalIndexBenchmark extends AbstractBenchmark
for (Aggregator agg : aggs) {
synchronized (agg) {
agg.aggregate();
try {
agg.aggregate();
}
catch (ParseException e) {
// "aggregate" can throw ParseExceptions if a selector expects something but gets something else.
if (reportParseExceptions) {
throw e;
}
}
}
}
@ -225,7 +235,8 @@ public class OnheapIncrementalIndexBenchmark extends AbstractBenchmark
return new MapBasedInputRow(timestamp, dimensionList, builder.build());
}
@Ignore @Test
@Ignore
@Test
@BenchmarkOptions(callgc = true, clock = Clock.REAL_TIME, warmupRounds = 10, benchmarkRounds = 20)
public void testConcurrentAddRead()
throws InterruptedException, ExecutionException, NoSuchMethodException, IllegalAccessException,

View File

@ -47,6 +47,7 @@ public class RealtimeTuningConfig implements TuningConfig
private static final ShardSpec defaultShardSpec = new NoneShardSpec();
private static final IndexSpec defaultIndexSpec = new IndexSpec();
private static final Boolean defaultBuildV9Directly = Boolean.FALSE;
private static final Boolean defaultReportParseExceptions = Boolean.FALSE;
// Might make sense for this to be a builder
public static RealtimeTuningConfig makeDefaultTuningConfig()
@ -63,7 +64,8 @@ public class RealtimeTuningConfig implements TuningConfig
defaultIndexSpec,
defaultBuildV9Directly,
0,
0
0,
defaultReportParseExceptions
);
}
@ -76,9 +78,10 @@ public class RealtimeTuningConfig implements TuningConfig
private final int maxPendingPersists;
private final ShardSpec shardSpec;
private final IndexSpec indexSpec;
private final Boolean buildV9Directly;
private final boolean buildV9Directly;
private final int persistThreadPriority;
private final int mergeThreadPriority;
private final boolean reportParseExceptions;
@JsonCreator
public RealtimeTuningConfig(
@ -93,7 +96,8 @@ public class RealtimeTuningConfig implements TuningConfig
@JsonProperty("indexSpec") IndexSpec indexSpec,
@JsonProperty("buildV9Directly") Boolean buildV9Directly,
@JsonProperty("persistThreadPriority") int persistThreadPriority,
@JsonProperty("mergeThreadPriority") int mergeThreadPriority
@JsonProperty("mergeThreadPriority") int mergeThreadPriority,
@JsonProperty("reportParseExceptions") Boolean reportParseExceptions
)
{
this.maxRowsInMemory = maxRowsInMemory == null ? defaultMaxRowsInMemory : maxRowsInMemory;
@ -112,6 +116,9 @@ public class RealtimeTuningConfig implements TuningConfig
this.buildV9Directly = buildV9Directly == null ? defaultBuildV9Directly : buildV9Directly;
this.mergeThreadPriority = mergeThreadPriority;
this.persistThreadPriority = persistThreadPriority;
this.reportParseExceptions = reportParseExceptions == null
? defaultReportParseExceptions
: reportParseExceptions;
}
@JsonProperty
@ -174,6 +181,7 @@ public class RealtimeTuningConfig implements TuningConfig
return buildV9Directly;
}
@JsonProperty
public int getPersistThreadPriority()
{
return this.persistThreadPriority;
@ -185,6 +193,12 @@ public class RealtimeTuningConfig implements TuningConfig
return this.mergeThreadPriority;
}
@JsonProperty
public boolean isReportParseExceptions()
{
return reportParseExceptions;
}
public RealtimeTuningConfig withVersioningPolicy(VersioningPolicy policy)
{
return new RealtimeTuningConfig(
@ -199,7 +213,8 @@ public class RealtimeTuningConfig implements TuningConfig
indexSpec,
buildV9Directly,
persistThreadPriority,
mergeThreadPriority
mergeThreadPriority,
reportParseExceptions
);
}
@ -217,7 +232,8 @@ public class RealtimeTuningConfig implements TuningConfig
indexSpec,
buildV9Directly,
persistThreadPriority,
mergeThreadPriority
mergeThreadPriority,
reportParseExceptions
);
}
}

View File

@ -30,7 +30,6 @@ import com.google.inject.Inject;
import com.metamx.common.guava.CloseQuietly;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.common.parsers.ParseException;
import com.metamx.emitter.EmittingLogger;
import io.druid.data.input.Committer;
import io.druid.data.input.Firehose;
@ -45,7 +44,6 @@ import io.druid.query.QueryRunnerFactoryConglomerate;
import io.druid.query.QuerySegmentWalker;
import io.druid.query.QueryToolChest;
import io.druid.query.SegmentDescriptor;
import io.druid.segment.incremental.IndexSizeExceededException;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.RealtimeTuningConfig;
import io.druid.segment.realtime.plumber.Committers;
@ -340,7 +338,7 @@ public class RealtimeManager implements QuerySegmentWalker
{
final Supplier<Committer> committerSupplier = Committers.supplierFromFirehose(firehose);
while (firehose.hasMore()) {
Plumbers.addNextRow(committerSupplier, firehose, plumber, metrics);
Plumbers.addNextRow(committerSupplier, firehose, plumber, config.isReportParseExceptions(), metrics);
}
}

View File

@ -42,6 +42,7 @@ public class Plumbers
final Supplier<Committer> committerSupplier,
final Firehose firehose,
final Plumber plumber,
final boolean reportParseExceptions,
final FireDepartmentMetrics metrics
)
{
@ -49,9 +50,13 @@ public class Plumbers
final InputRow inputRow = firehose.nextRow();
if (inputRow == null) {
log.debug("Discarded null input row, considering unparseable.");
metrics.incrementUnparseable();
return;
if (reportParseExceptions) {
throw new ParseException("null input row");
} else {
log.debug("Discarded null input row, considering unparseable.");
metrics.incrementUnparseable();
return;
}
}
// Included in ParseException try/catch, as additional parsing can be done during indexing.
@ -66,8 +71,12 @@ public class Plumbers
metrics.incrementProcessed();
}
catch (ParseException e) {
log.debug(e, "Discarded row due to exception, considering unparseable.");
metrics.incrementUnparseable();
if (reportParseExceptions) {
throw e;
} else {
log.debug(e, "Discarded row due to exception, considering unparseable.");
metrics.incrementUnparseable();
}
}
catch (IndexSizeExceededException e) {
// Shouldn't happen if this is only being called by a single thread.

View File

@ -199,6 +199,7 @@ public class Sink implements Iterable<FireHydrant>
.build();
final IncrementalIndex newIndex = new OnheapIncrementalIndex(
indexSchema,
config.isReportParseExceptions(),
config.getMaxRowsInMemory()
);

View File

@ -116,9 +116,7 @@ public class FireDepartmentTest
),
null
),
new RealtimeTuningConfig(
null, null, null, null, null, null, null, null, null, null, 0, 0
)
RealtimeTuningConfig.makeDefaultTuningConfig()
);
String json = jsonMapper.writeValueAsString(schema);

View File

@ -153,7 +153,8 @@ public class RealtimeManagerTest
null,
null,
0,
0
0,
null
);
plumber = new TestPlumber(new Sink(new Interval("0/P5000Y"), schema, tuningConfig, new DateTime().toString()));

View File

@ -120,7 +120,7 @@ public class IngestSegmentFirehoseTest
IncrementalIndex index = null;
try {
index = new OnheapIncrementalIndex(0, QueryGranularity.NONE, aggregators, true, 5000);
index = new OnheapIncrementalIndex(0, QueryGranularity.NONE, aggregators, true, true, 5000);
for (String line : rows) {
index.add(parser.parse(line));
}

View File

@ -195,7 +195,9 @@ public class RealtimePlumberSchoolTest
null,
null,
buildV9Directly,
0, 0
0,
0,
false
);
realtimePlumberSchool = new RealtimePlumberSchool(

View File

@ -68,7 +68,9 @@ public class SinkTest
null,
null,
null,
0, 0
0,
0,
null
);
final Sink sink = new Sink(interval, schema, tuningConfig, version);