diff --git a/docs/content/ingestion/stream-pull.md b/docs/content/ingestion/stream-pull.md index 18937d88344..e43b15a14af 100644 --- a/docs/content/ingestion/stream-pull.md +++ b/docs/content/ingestion/stream-pull.md @@ -153,6 +153,7 @@ The tuningConfig is optional and default parameters will be used if no tuningCon |buildV9Directly|Boolean|Whether to build v9 index directly instead of building v8 index and convert it to v9 format|no (default = false)| |persistThreadPriority|int|If `-XX:+UseThreadPriorities` is properly enabled, this will set the thread priority of the persisting thread to `Thread.NORM_PRIORITY` plus this value within the bounds of `Thread.MIN_PRIORITY` and `Thread.MAX_PRIORITY`. A value of 0 indicates to not change the thread priority.|no (default = 0; inherit and do not override)| |mergeThreadPriority|int|If `-XX:+UseThreadPriorities` is properly enabled, this will set the thread priority of the merging thread to `Thread.NORM_PRIORITY` plus this value within the bounds of `Thread.MIN_PRIORITY` and `Thread.MAX_PRIORITY`. A value of 0 indicates to not change the thread priority.|no (default = 0; inherit and do not override)| +|reportParseExceptions|Boolean|If true, exceptions encountered during parsing are thrown and halt ingestion. If false, unparseable rows and fields are skipped. If an entire row is skipped, the "unparseable" counter is incremented. If some fields in a row are parseable and some are not, the parseable fields are indexed and the "unparseable" counter is not incremented.|no (default = false)| Before enabling thread priority settings, users are highly encouraged to read the [original pull request](https://github.com/druid-io/druid/pull/984) and other documentation about proper use of `-XX:+UseThreadPriorities`.
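To make the two modes documented above concrete, here is a minimal, self-contained Java sketch of the skip-vs-throw behavior that `reportParseExceptions` selects. Only the control flow is modeled on the patched `Plumbers.addNextRow` further down in this diff (condensed so the null-row case shares the catch block); the `ParseException` stub, the counters, and the sample rows are invented for illustration.

```java
import java.util.Arrays;
import java.util.List;

public class ReportParseExceptionsSketch
{
  // Stand-in for com.metamx.common.parsers.ParseException.
  static class ParseException extends RuntimeException
  {
    ParseException(String message)
    {
      super(message);
    }
  }

  static int processed = 0;
  static int unparseable = 0;

  // Condensed model of the patched Plumbers.addNextRow control flow.
  static void addNextRow(String rawRow, boolean reportParseExceptions)
  {
    try {
      if (rawRow == null) {
        // The patch treats a null row as a ParseException in strict mode.
        throw new ParseException("null input row");
      }
      if (rawRow.contains("__fail__")) {
        throw new ParseException("Unparseable row: " + rawRow);
      }
      processed++;
    }
    catch (ParseException e) {
      if (reportParseExceptions) {
        throw e; // strict mode: propagate and halt ingestion
      }
      unparseable++; // lenient mode: count the row and keep going
    }
  }

  public static void main(String[] args)
  {
    List<String> rows = Arrays.asList("good", null, "__fail__", "also good");
    for (String row : rows) {
      addNextRow(row, false); // lenient mode, the default
    }
    System.out.println("processed=" + processed + ", unparseable=" + unparseable);
    // prints: processed=2, unparseable=2
  }
}
```

Because `defaultReportParseExceptions` is `Boolean.FALSE` in `RealtimeTuningConfig` (see that file's hunk below), existing specs keep the lenient behavior unless they opt in.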
diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java index c69c705cff8..ca1db6bcc1d 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java @@ -229,6 +229,7 @@ public class IndexGeneratorJob implements Jobby OnheapIncrementalIndex newIndex = new OnheapIncrementalIndex( indexSchema, + !tuningConfig.isIgnoreInvalidRows(), tuningConfig.getRowFlushBoundary() ); diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java index f237f0f26e0..8d23b54a214 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java @@ -143,7 +143,8 @@ public class IndexTask extends AbstractFixedIntervalTask indexSpec, buildV9Directly, 0, - 0 + 0, + true ); } diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java index 568b11926b2..192d79685c9 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java @@ -28,12 +28,10 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.primitives.Ints; import com.metamx.common.guava.CloseQuietly; -import com.metamx.common.parsers.ParseException; import com.metamx.emitter.EmittingLogger; import io.druid.data.input.Committer; import io.druid.data.input.Firehose; import io.druid.data.input.FirehoseFactory; -import io.druid.data.input.InputRow; import io.druid.indexing.common.TaskLock; import io.druid.indexing.common.TaskStatus; import io.druid.indexing.common.TaskToolbox; @@ -51,6 +49,7 @@ import io.druid.segment.indexing.DataSchema; import io.druid.segment.indexing.RealtimeIOConfig; import io.druid.segment.indexing.RealtimeTuningConfig; import io.druid.segment.realtime.FireDepartment; +import io.druid.segment.realtime.FireDepartmentMetrics; import io.druid.segment.realtime.RealtimeMetricsMonitor; import io.druid.segment.realtime.SegmentPublisher; import io.druid.segment.realtime.firehose.ClippedFirehoseFactory; @@ -117,6 +116,9 @@ public class RealtimeIndexTask extends AbstractTask @JsonIgnore private volatile Firehose firehose = null; + @JsonIgnore + private volatile FireDepartmentMetrics metrics = null; + @JsonIgnore private volatile boolean gracefullyStopped = false; @@ -285,6 +287,7 @@ public class RealtimeIndexTask extends AbstractTask realtimeIOConfig, tuningConfig ); + this.metrics = fireDepartment.getMetrics(); final RealtimeMetricsMonitor metricsMonitor = new RealtimeMetricsMonitor( ImmutableList.of(fireDepartment), ImmutableMap.of( @@ -313,7 +316,7 @@ public class RealtimeIndexTask extends AbstractTask toolbox.getObjectMapper() ); - this.plumber = plumberSchool.findPlumber(dataSchema, tuningConfig, fireDepartment.getMetrics()); + this.plumber = plumberSchool.findPlumber(dataSchema, tuningConfig, metrics); Supplier committerSupplier = null; @@ -337,7 +340,13 @@ public class RealtimeIndexTask extends AbstractTask // Time to read data! 
while (firehose != null && (!gracefullyStopped || firehoseDrainableByClosing) && firehose.hasMore()) { - Plumbers.addNextRow(committerSupplier, firehose, plumber, fireDepartment.getMetrics()); + Plumbers.addNextRow( + committerSupplier, + firehose, + plumber, + tuningConfig.isReportParseExceptions(), + metrics + ); } } catch (Throwable e) { @@ -459,6 +468,15 @@ public class RealtimeIndexTask extends AbstractTask return firehose; } + /** + * Public for tests. + */ + @JsonIgnore + public FireDepartmentMetrics getMetrics() + { + return metrics; + } + @JsonProperty("spec") public FireDepartment getRealtimeIngestionSchema() { @@ -468,10 +486,12 @@ public class RealtimeIndexTask extends AbstractTask /** * Is a firehose from this factory drainable by closing it? If so, we should drain on stopGracefully rather than * abruptly stopping. - *

+ * * This is a hack to get around the fact that the Firehose and FirehoseFactory interfaces do not help us do this. + * + * Protected for tests. */ - private static boolean isFirehoseDrainableByClosing(FirehoseFactory firehoseFactory) + protected boolean isFirehoseDrainableByClosing(FirehoseFactory firehoseFactory) { return firehoseFactory instanceof EventReceiverFirehoseFactory || (firehoseFactory instanceof TimedShutoffFirehoseFactory diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java index 37914a8df42..269dd8099da 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java @@ -20,14 +20,15 @@ package io.druid.indexing.common.task; import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.api.client.util.Charsets; -import com.google.api.client.util.Sets; +import com.google.common.base.Charsets; +import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import com.google.common.collect.Sets; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; @@ -36,6 +37,7 @@ import com.metamx.common.ISE; import com.metamx.common.Pair; import com.metamx.common.guava.Sequences; import com.metamx.common.logger.Logger; +import com.metamx.common.parsers.ParseException; import com.metamx.emitter.EmittingLogger; import com.metamx.emitter.core.LoggingEmitter; import com.metamx.emitter.service.ServiceEmitter; @@ -43,8 +45,11 @@ import com.metamx.metrics.MonitorScheduler; import io.druid.client.cache.CacheConfig; import io.druid.client.cache.MapCache; import io.druid.concurrent.Execs; +import io.druid.data.input.Firehose; +import io.druid.data.input.FirehoseFactory; import io.druid.data.input.InputRow; import io.druid.data.input.MapBasedInputRow; +import io.druid.data.input.impl.InputRowParser; import io.druid.granularity.QueryGranularity; import io.druid.indexing.common.SegmentLoaderFactory; import io.druid.indexing.common.TaskStatus; @@ -93,14 +98,12 @@ import io.druid.segment.loading.SegmentLoaderConfig; import io.druid.segment.loading.SegmentLoaderLocalCacheManager; import io.druid.segment.loading.StorageLocationConfig; import io.druid.segment.realtime.FireDepartment; -import io.druid.segment.realtime.firehose.EventReceiverFirehoseFactory; -import io.druid.segment.realtime.firehose.NoopChatHandlerProvider; import io.druid.segment.realtime.plumber.SegmentHandoffNotifier; import io.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory; -import io.druid.server.coordination.DruidServerMetadata; -import io.druid.server.metrics.EventReceiverFirehoseRegister; +import io.druid.segment.realtime.plumber.ServerTimeRejectionPolicyFactory; import io.druid.timeline.DataSegment; import org.easymock.EasyMock; +import org.hamcrest.CoreMatchers; import org.joda.time.DateTime; import org.joda.time.Period; import org.junit.After; @@ -108,6 +111,8 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Rule; import org.junit.Test; +import 
org.junit.internal.matchers.ThrowableMessageMatcher; +import org.junit.rules.ExpectedException; import org.junit.rules.TemporaryFolder; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -116,24 +121,18 @@ import java.io.File; import java.io.IOException; import java.nio.file.Files; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.List; import java.util.Map; import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; @RunWith(Parameterized.class) public class RealtimeIndexTaskTest { private static final Logger log = new Logger(RealtimeIndexTaskTest.class); - private static final DruidServerMetadata dummyServer = new DruidServerMetadata( - "dummy", - "dummy_host", - 0, - "historical", - "dummy_tier", - 0 - ); private static final ObjectMapper jsonMapper = new DefaultObjectMapper(); private static final ServiceEmitter emitter = new ServiceEmitter( "service", @@ -145,6 +144,89 @@ public class RealtimeIndexTaskTest ) ); + private static final String FAIL_DIM = "__fail__"; + + private static class TestFirehose implements Firehose + { + private final List queue = Lists.newLinkedList(); + private boolean closed = false; + + public void addRows(List rows) + { + synchronized (this) { + queue.addAll(rows); + notifyAll(); + } + } + + @Override + public boolean hasMore() + { + try { + synchronized (this) { + while (queue.isEmpty() && !closed) { + wait(); + } + return !queue.isEmpty(); + } + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw Throwables.propagate(e); + } + } + + @Override + public InputRow nextRow() + { + synchronized (this) { + final InputRow row = queue.remove(0); + if (row != null && row.getDimensions().contains(FAIL_DIM)) { + throw new ParseException(FAIL_DIM); + } + return row; + } + } + + @Override + public Runnable commit() + { + return new Runnable() + { + @Override + public void run() + { + // do nothing + } + }; + } + + @Override + public void close() throws IOException + { + synchronized (this) { + closed = true; + notifyAll(); + } + } + } + + private static class TestFirehoseFactory implements FirehoseFactory + { + public TestFirehoseFactory() + { + } + + @Override + public Firehose connect(InputRowParser parser) throws IOException, ParseException + { + return new TestFirehose(); + } + } + + @Rule + public final ExpectedException expectedException = ExpectedException.none(); + @Rule public final TemporaryFolder tempFolder = new TemporaryFolder(); @@ -206,20 +288,24 @@ public class RealtimeIndexTaskTest Thread.sleep(50); } - final EventReceiverFirehoseFactory.EventReceiverFirehose firehose = - (EventReceiverFirehoseFactory.EventReceiverFirehose) task.getFirehose(); + final TestFirehose firehose = (TestFirehose) task.getFirehose(); firehose.addRows( ImmutableList.of( new MapBasedInputRow( now, ImmutableList.of("dim1"), - ImmutableMap.of("dim1", "foo") + ImmutableMap.of("dim1", "foo", "met1", "1") + ), + new MapBasedInputRow( + now.minus(new Period("P1D")), + ImmutableList.of("dim1"), + ImmutableMap.of("dim1", "foo", "met1", 2.0) ), new MapBasedInputRow( now, ImmutableList.of("dim2"), - ImmutableMap.of("dim2", "bar") + ImmutableMap.of("dim2", "bar", "met1", 2.0) ) ) ); @@ -234,8 +320,160 @@ public class RealtimeIndexTaskTest publishedSegment = Iterables.getOnlyElement(mdc.getPublished()); - // Do a query. - Assert.assertEquals(2, countEvents(task)); + // Check metrics. 
+ Assert.assertEquals(2, task.getMetrics().processed()); + Assert.assertEquals(1, task.getMetrics().thrownAway()); + Assert.assertEquals(0, task.getMetrics().unparseable()); + + // Do some queries. + Assert.assertEquals(2, sumMetric(task, "rows")); + Assert.assertEquals(3, sumMetric(task, "met1")); + + // Simulate handoff. + for (Map.Entry> entry : handOffCallbacks.entrySet()) { + final Pair executorRunnablePair = entry.getValue(); + Assert.assertEquals( + new SegmentDescriptor( + publishedSegment.getInterval(), + publishedSegment.getVersion(), + publishedSegment.getShardSpec().getPartitionNum() + ), + entry.getKey() + ); + executorRunnablePair.lhs.execute(executorRunnablePair.rhs); + } + handOffCallbacks.clear(); + + // Wait for the task to finish. + final TaskStatus taskStatus = statusFuture.get(); + Assert.assertEquals(TaskStatus.Status.SUCCESS, taskStatus.getStatusCode()); + } + + @Test(timeout = 60_000L) + public void testReportParseExceptionsOnBadMetric() throws Exception + { + final TestIndexerMetadataStorageCoordinator mdc = new TestIndexerMetadataStorageCoordinator(); + final RealtimeIndexTask task = makeRealtimeTask(null, true); + final TaskToolbox taskToolbox = makeToolbox(task, mdc, tempFolder.newFolder()); + final ListenableFuture statusFuture = runTask(task, taskToolbox); + + // Wait for firehose to show up, it starts off null. + while (task.getFirehose() == null) { + Thread.sleep(50); + } + + final TestFirehose firehose = (TestFirehose) task.getFirehose(); + + firehose.addRows( + ImmutableList.of( + new MapBasedInputRow( + now, + ImmutableList.of("dim1"), + ImmutableMap.of("dim1", "foo", "met1", "1") + ), + new MapBasedInputRow( + now, + ImmutableList.of("dim1"), + ImmutableMap.of("dim1", "foo", "met1", "foo") + ), + new MapBasedInputRow( + now.minus(new Period("P1D")), + ImmutableList.of("dim1"), + ImmutableMap.of("dim1", "foo", "met1", "foo") + ), + new MapBasedInputRow( + now, + ImmutableList.of("dim2"), + ImmutableMap.of("dim2", "bar", "met1", 2.0) + ) + ) + ); + + // Stop the firehose, this will drain out existing events. + firehose.close(); + + // Wait for the task to finish. + expectedException.expect(ExecutionException.class); + expectedException.expectCause(CoreMatchers.instanceOf(ParseException.class)); + expectedException.expectCause( + ThrowableMessageMatcher.hasMessage( + CoreMatchers.containsString("Unable to parse metrics[met1], value[foo]") + ) + ); + statusFuture.get(); + } + + @Test(timeout = 60_000L) + public void testNoReportParseExceptions() throws Exception + { + final TestIndexerMetadataStorageCoordinator mdc = new TestIndexerMetadataStorageCoordinator(); + final RealtimeIndexTask task = makeRealtimeTask(null, false); + final TaskToolbox taskToolbox = makeToolbox(task, mdc, tempFolder.newFolder()); + final ListenableFuture statusFuture = runTask(task, taskToolbox); + final DataSegment publishedSegment; + + // Wait for firehose to show up, it starts off null. + while (task.getFirehose() == null) { + Thread.sleep(50); + } + + final TestFirehose firehose = (TestFirehose) task.getFirehose(); + + firehose.addRows( + Arrays.asList( + // Good row- will be processed. + new MapBasedInputRow( + now, + ImmutableList.of("dim1"), + ImmutableMap.of("dim1", "foo", "met1", "1") + ), + // Null row- will be unparseable. + null, + // Bad metric- will count as processed, but that particular metric won't update. + new MapBasedInputRow( + now, + ImmutableList.of("dim1"), + ImmutableMap.of("dim1", "foo", "met1", "foo") + ), + // Bad row- will be unparseable. 
+ new MapBasedInputRow( + now, + ImmutableList.of("dim1", FAIL_DIM), + ImmutableMap.of("dim1", "foo", "met1", 2.0) + ), + // Old row- will be thrownAway. + new MapBasedInputRow( + now.minus(new Period("P1D")), + ImmutableList.of("dim1"), + ImmutableMap.of("dim1", "foo", "met1", 2.0) + ), + // Good row- will be processed. + new MapBasedInputRow( + now, + ImmutableList.of("dim2"), + ImmutableMap.of("dim2", "bar", "met1", 2.0) + ) + ) + ); + + // Stop the firehose, this will drain out existing events. + firehose.close(); + + // Wait for publish. + while (mdc.getPublished().isEmpty()) { + Thread.sleep(50); + } + + publishedSegment = Iterables.getOnlyElement(mdc.getPublished()); + + // Check metrics. + Assert.assertEquals(3, task.getMetrics().processed()); + Assert.assertEquals(1, task.getMetrics().thrownAway()); + Assert.assertEquals(2, task.getMetrics().unparseable()); + + // Do some queries. + Assert.assertEquals(3, sumMetric(task, "rows")); + Assert.assertEquals(3, sumMetric(task, "met1")); // Simulate handoff. for (Map.Entry> entry : handOffCallbacks.entrySet()) { @@ -275,8 +513,7 @@ public class RealtimeIndexTaskTest Thread.sleep(50); } - final EventReceiverFirehoseFactory.EventReceiverFirehose firehose = - (EventReceiverFirehoseFactory.EventReceiverFirehose) task1.getFirehose(); + final TestFirehose firehose = (TestFirehose) task1.getFirehose(); firehose.addRows( ImmutableList.of( @@ -312,10 +549,9 @@ public class RealtimeIndexTaskTest } // Do a query, at this point the previous data should be loaded. - Assert.assertEquals(1, countEvents(task2)); + Assert.assertEquals(1, sumMetric(task2, "rows")); - final EventReceiverFirehoseFactory.EventReceiverFirehose firehose = - (EventReceiverFirehoseFactory.EventReceiverFirehose) task2.getFirehose(); + final TestFirehose firehose = (TestFirehose) task2.getFirehose(); firehose.addRows( ImmutableList.of( @@ -338,7 +574,7 @@ public class RealtimeIndexTaskTest publishedSegment = Iterables.getOnlyElement(mdc.getPublished()); // Do a query. - Assert.assertEquals(2, countEvents(task2)); + Assert.assertEquals(2, sumMetric(task2, "rows")); // Simulate handoff. for (Map.Entry> entry : handOffCallbacks.entrySet()) { @@ -361,7 +597,7 @@ public class RealtimeIndexTaskTest } } - @Test(timeout = 10000L) + @Test(timeout = 60_000L) public void testRestoreAfterHandoffAttemptDuringShutdown() throws Exception { final TaskStorage taskStorage = new HeapMemoryTaskStorage(new TaskStorageConfig(null)); @@ -380,8 +616,7 @@ public class RealtimeIndexTaskTest Thread.sleep(50); } - final EventReceiverFirehoseFactory.EventReceiverFirehose firehose = - (EventReceiverFirehoseFactory.EventReceiverFirehose) task1.getFirehose(); + final TestFirehose firehose = (TestFirehose) task1.getFirehose(); firehose.addRows( ImmutableList.of( @@ -404,7 +639,7 @@ public class RealtimeIndexTaskTest publishedSegment = Iterables.getOnlyElement(mdc.getPublished()); // Do a query. - Assert.assertEquals(1, countEvents(task1)); + Assert.assertEquals(1, sumMetric(task1, "rows")); // Trigger graceful shutdown. task1.stopGracefully(); @@ -427,8 +662,7 @@ public class RealtimeIndexTaskTest } // Stop the firehose again, this will start another handoff. - final EventReceiverFirehoseFactory.EventReceiverFirehose firehose = - (EventReceiverFirehoseFactory.EventReceiverFirehose) task2.getFirehose(); + final TestFirehose firehose = (TestFirehose) task2.getFirehose(); // Stop the firehose, this will trigger a finishJob. 
firehose.close(); @@ -462,7 +696,7 @@ public class RealtimeIndexTaskTest } } - @Test(timeout = 10000L) + @Test(timeout = 60_000L) public void testRestoreCorruptData() throws Exception { final File directory = tempFolder.newFolder(); @@ -479,8 +713,7 @@ public class RealtimeIndexTaskTest Thread.sleep(50); } - final EventReceiverFirehoseFactory.EventReceiverFirehose firehose = - (EventReceiverFirehoseFactory.EventReceiverFirehose) task1.getFirehose(); + final TestFirehose firehose = (TestFirehose) task1.getFirehose(); firehose.addRows( ImmutableList.of( @@ -536,7 +769,7 @@ public class RealtimeIndexTaskTest } } - @Test(timeout = 10000L) + @Test(timeout = 60_000L) public void testStopBeforeStarting() throws Exception { final File directory = tempFolder.newFolder(); @@ -577,24 +810,22 @@ public class RealtimeIndexTaskTest } private RealtimeIndexTask makeRealtimeTask(final String taskId) + { + return makeRealtimeTask(taskId, true); + } + + private RealtimeIndexTask makeRealtimeTask(final String taskId, boolean reportParseExceptions) { ObjectMapper objectMapper = new DefaultObjectMapper(); DataSchema dataSchema = new DataSchema( "test_ds", null, - new AggregatorFactory[]{new CountAggregatorFactory("rows")}, + new AggregatorFactory[]{new CountAggregatorFactory("rows"), new LongSumAggregatorFactory("met1", "met1")}, new UniformGranularitySpec(Granularity.DAY, QueryGranularity.NONE, null), objectMapper ); RealtimeIOConfig realtimeIOConfig = new RealtimeIOConfig( - new EventReceiverFirehoseFactory( - "foo", - 100, - new NoopChatHandlerProvider(), - objectMapper, - null, - new EventReceiverFirehoseRegister() - ), + new TestFirehoseFactory(), null, null ); @@ -604,19 +835,28 @@ public class RealtimeIndexTaskTest new Period("PT10M"), null, null, - null, + new ServerTimeRejectionPolicyFactory(), null, null, null, buildV9Directly, - 0, 0 + 0, + 0, + reportParseExceptions ); return new RealtimeIndexTask( taskId, null, new FireDepartment(dataSchema, realtimeIOConfig, realtimeTuningConfig), null - ); + ) + { + @Override + protected boolean isFirehoseDrainableByClosing(FirehoseFactory firehoseFactory) + { + return true; + } + }; } private TaskToolbox makeToolbox( @@ -760,14 +1000,14 @@ public class RealtimeIndexTaskTest return toolboxFactory.build(task); } - public long countEvents(final Task task) throws Exception + public long sumMetric(final Task task, final String metric) throws Exception { // Do a query. TimeseriesQuery query = Druids.newTimeseriesQueryBuilder() .dataSource("test_ds") .aggregators( ImmutableList.of( - new LongSumAggregatorFactory("rows", "rows") + new LongSumAggregatorFactory(metric, metric) ) ).granularity(QueryGranularity.ALL) .intervals("2000/3000") @@ -777,6 +1017,6 @@ public class RealtimeIndexTaskTest task.getQueryRunner(query).run(query, ImmutableMap.of()), Lists.>newArrayList() ); - return results.get(0).getValue().getLongMetric("rows"); + return results.isEmpty() ? 
0 : results.get(0).getValue().getLongMetric(metric); } } diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java index 54825cf63cf..0c31d3df2ee 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java @@ -335,7 +335,8 @@ public class TaskSerdeTest indexSpec, null, 0, - 0 + 0, + true ) ), null @@ -358,6 +359,7 @@ public class TaskSerdeTest Granularity.HOUR, task.getRealtimeIngestionSchema().getDataSchema().getGranularitySpec().getSegmentGranularity() ); + Assert.assertTrue(task.getRealtimeIngestionSchema().getTuningConfig().isReportParseExceptions()); Assert.assertEquals(task.getId(), task2.getId()); Assert.assertEquals(task.getGroupId(), task2.getGroupId()); diff --git a/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java b/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java index cb85e37e5bb..38ea377dd14 100644 --- a/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java @@ -144,6 +144,7 @@ public class IngestSegmentFirehoseFactoryTest .build(); final OnheapIncrementalIndex index = new OnheapIncrementalIndex( schema, + true, MAX_ROWS * MAX_SHARD_NUMBER ); diff --git a/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java b/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java index 4baa7accc11..2bec7885455 100644 --- a/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java @@ -218,7 +218,7 @@ public class IngestSegmentFirehoseFactoryTimelineTest } ) .build(); - final OnheapIncrementalIndex index = new OnheapIncrementalIndex(schema, rows.length); + final OnheapIncrementalIndex index = new OnheapIncrementalIndex(schema, true, rows.length); for (InputRow row : rows) { try { diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java index dbb3055bc74..1ed7caf47d9 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java @@ -1179,7 +1179,9 @@ public class TaskLifecycleTest null, null, null, - 0, 0 + 0, + 0, + null ); FireDepartment fireDepartment = new FireDepartment(dataSchema, realtimeIOConfig, realtimeTuningConfig); return new RealtimeIndexTask( diff --git a/processing/src/main/java/io/druid/query/groupby/GroupByQueryHelper.java b/processing/src/main/java/io/druid/query/groupby/GroupByQueryHelper.java index 49caa8013bd..f47f151903e 100644 --- a/processing/src/main/java/io/druid/query/groupby/GroupByQueryHelper.java +++ b/processing/src/main/java/io/druid/query/groupby/GroupByQueryHelper.java @@ -87,6 +87,7 @@ public class GroupByQueryHelper gran, aggs.toArray(new AggregatorFactory[aggs.size()]), false, + true, config.getMaxResults(), bufferPool ); @@ -98,6 +99,7 @@ public class GroupByQueryHelper gran, aggs.toArray(new 
AggregatorFactory[aggs.size()]), false, + true, config.getMaxResults() ); } diff --git a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java index 6d45e547987..c690f201d40 100644 --- a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java +++ b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java @@ -51,7 +51,6 @@ import io.druid.segment.column.Column; import io.druid.segment.column.ColumnCapabilities; import io.druid.segment.column.ColumnCapabilitiesImpl; import io.druid.segment.column.ValueType; -import io.druid.segment.data.Indexed; import io.druid.segment.data.IndexedInts; import io.druid.segment.serde.ComplexMetricExtractor; import io.druid.segment.serde.ComplexMetricSerde; @@ -268,6 +267,7 @@ public abstract class IncrementalIndex implements Iterable, private final AggregatorFactory[] metrics; private final AggregatorType[] aggs; private final boolean deserializeComplexMetrics; + private final boolean reportParseExceptions; private final Metadata metadata; private final Map metricDescs; @@ -295,10 +295,13 @@ public abstract class IncrementalIndex implements Iterable, * @param incrementalIndexSchema the schema to use for incremental index * @param deserializeComplexMetrics flag whether or not to call ComplexMetricExtractor.extractValue() on the input * value for aggregators that return metrics other than float. + * @param reportParseExceptions flag whether or not to report ParseExceptions that occur while extracting values + * from input rows */ public IncrementalIndex( final IncrementalIndexSchema incrementalIndexSchema, - final boolean deserializeComplexMetrics + final boolean deserializeComplexMetrics, + final boolean reportParseExceptions ) { this.minTimestamp = incrementalIndexSchema.getMinTimestamp(); @@ -306,6 +309,7 @@ public abstract class IncrementalIndex implements Iterable, this.metrics = incrementalIndexSchema.getMetrics(); this.rowTransformers = new CopyOnWriteArrayList<>(); this.deserializeComplexMetrics = deserializeComplexMetrics; + this.reportParseExceptions = reportParseExceptions; this.metadata = new Metadata().setAggregators(getCombiningAggregators(metrics)); @@ -367,6 +371,7 @@ public abstract class IncrementalIndex implements Iterable, protected abstract Integer addToFacts( AggregatorFactory[] metrics, boolean deserializeComplexMetrics, + boolean reportParseExceptions, InputRow row, AtomicInteger numEntries, TimeAndDims key, @@ -416,7 +421,16 @@ public abstract class IncrementalIndex implements Iterable, */ public int add(InputRow row) throws IndexSizeExceededException { TimeAndDims key = toTimeAndDims(row); - final int rv = addToFacts(metrics, deserializeComplexMetrics, row, numEntries, key, in, rowSupplier); + final int rv = addToFacts( + metrics, + deserializeComplexMetrics, + reportParseExceptions, + row, + numEntries, + key, + in, + rowSupplier + ); updateMaxIngestedTime(row.getTimestamp()); return rv; } diff --git a/processing/src/main/java/io/druid/segment/incremental/OffheapIncrementalIndex.java b/processing/src/main/java/io/druid/segment/incremental/OffheapIncrementalIndex.java index fdbb74ce579..47f4c5ca4f1 100644 --- a/processing/src/main/java/io/druid/segment/incremental/OffheapIncrementalIndex.java +++ b/processing/src/main/java/io/druid/segment/incremental/OffheapIncrementalIndex.java @@ -24,6 +24,7 @@ import com.google.common.base.Throwables; import com.google.common.collect.Maps; import 
com.metamx.common.IAE; import com.metamx.common.ISE; +import com.metamx.common.parsers.ParseException; import io.druid.collections.ResourceHolder; import io.druid.collections.StupidPool; import io.druid.data.input.InputRow; @@ -69,11 +70,12 @@ public class OffheapIncrementalIndex extends IncrementalIndex public OffheapIncrementalIndex( IncrementalIndexSchema incrementalIndexSchema, boolean deserializeComplexMetrics, + boolean reportParseExceptions, int maxRowCount, StupidPool bufferPool ) { - super(incrementalIndexSchema, deserializeComplexMetrics); + super(incrementalIndexSchema, deserializeComplexMetrics, reportParseExceptions); this.maxRowCount = maxRowCount; this.bufferPool = bufferPool; this.facts = new ConcurrentSkipListMap<>(dimsComparator()); @@ -97,6 +99,7 @@ public class OffheapIncrementalIndex extends IncrementalIndex QueryGranularity gran, final AggregatorFactory[] metrics, boolean deserializeComplexMetrics, + boolean reportParseExceptions, int maxRowCount, StupidPool bufferPool ) @@ -107,6 +110,7 @@ public class OffheapIncrementalIndex extends IncrementalIndex .withMetrics(metrics) .build(), deserializeComplexMetrics, + reportParseExceptions, maxRowCount, bufferPool ); @@ -126,6 +130,7 @@ public class OffheapIncrementalIndex extends IncrementalIndex .withMetrics(metrics) .build(), true, + true, maxRowCount, bufferPool ); @@ -181,6 +186,7 @@ public class OffheapIncrementalIndex extends IncrementalIndex protected Integer addToFacts( AggregatorFactory[] metrics, boolean deserializeComplexMetrics, + boolean reportParseExceptions, InputRow row, AtomicInteger numEntries, TimeAndDims key, @@ -264,7 +270,14 @@ public class OffheapIncrementalIndex extends IncrementalIndex final BufferAggregator agg = getAggs()[i]; synchronized (agg) { - agg.aggregate(aggBuffer, bufferOffset + aggOffsetInBuffer[i]); + try { + agg.aggregate(aggBuffer, bufferOffset + aggOffsetInBuffer[i]); + } catch (ParseException e) { + // "aggregate" can throw ParseExceptions if a selector expects something but gets something else. 
+ if (reportParseExceptions) { + throw e; + } + } } } rowContainer.set(null); diff --git a/processing/src/main/java/io/druid/segment/incremental/OnheapIncrementalIndex.java b/processing/src/main/java/io/druid/segment/incremental/OnheapIncrementalIndex.java index 6b4e6b39214..67252400dd5 100644 --- a/processing/src/main/java/io/druid/segment/incremental/OnheapIncrementalIndex.java +++ b/processing/src/main/java/io/druid/segment/incremental/OnheapIncrementalIndex.java @@ -22,6 +22,7 @@ package io.druid.segment.incremental; import com.google.common.base.Supplier; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import com.metamx.common.parsers.ParseException; import io.druid.data.input.InputRow; import io.druid.granularity.QueryGranularity; import io.druid.query.aggregation.Aggregator; @@ -56,10 +57,11 @@ public class OnheapIncrementalIndex extends IncrementalIndex public OnheapIncrementalIndex( IncrementalIndexSchema incrementalIndexSchema, boolean deserializeComplexMetrics, + boolean reportParseExceptions, int maxRowCount ) { - super(incrementalIndexSchema, deserializeComplexMetrics); + super(incrementalIndexSchema, deserializeComplexMetrics, reportParseExceptions); this.maxRowCount = maxRowCount; this.facts = new ConcurrentSkipListMap<>(dimsComparator()); } @@ -69,6 +71,7 @@ public class OnheapIncrementalIndex extends IncrementalIndex QueryGranularity gran, final AggregatorFactory[] metrics, boolean deserializeComplexMetrics, + boolean reportParseExceptions, int maxRowCount ) { @@ -78,6 +81,7 @@ public class OnheapIncrementalIndex extends IncrementalIndex .withMetrics(metrics) .build(), deserializeComplexMetrics, + reportParseExceptions, maxRowCount ); } @@ -95,16 +99,18 @@ public class OnheapIncrementalIndex extends IncrementalIndex .withMetrics(metrics) .build(), true, + true, maxRowCount ); } public OnheapIncrementalIndex( IncrementalIndexSchema incrementalIndexSchema, + boolean reportParseExceptions, int maxRowCount ) { - this(incrementalIndexSchema, true, maxRowCount); + this(incrementalIndexSchema, true, reportParseExceptions, maxRowCount); } @Override @@ -139,6 +145,7 @@ public class OnheapIncrementalIndex extends IncrementalIndex protected Integer addToFacts( AggregatorFactory[] metrics, boolean deserializeComplexMetrics, + boolean reportParseExceptions, InputRow row, AtomicInteger numEntries, TimeAndDims key, @@ -188,7 +195,14 @@ public class OnheapIncrementalIndex extends IncrementalIndex for (Aggregator agg : aggs) { synchronized (agg) { - agg.aggregate(); + try { + agg.aggregate(); + } catch (ParseException e) { + // "aggregate" can throw ParseExceptions if a selector expects something but gets something else. 
+ if (reportParseExceptions) { + throw e; + } + } } } diff --git a/processing/src/test/java/io/druid/query/MultiValuedDimensionTest.java b/processing/src/test/java/io/druid/query/MultiValuedDimensionTest.java index e3d8f947ee2..c56b4521bfc 100644 --- a/processing/src/test/java/io/druid/query/MultiValuedDimensionTest.java +++ b/processing/src/test/java/io/druid/query/MultiValuedDimensionTest.java @@ -98,6 +98,7 @@ public class MultiValuedDimensionTest new CountAggregatorFactory("count") }, true, + true, 5000 ); diff --git a/processing/src/test/java/io/druid/query/aggregation/AggregationTestHelper.java b/processing/src/test/java/io/druid/query/aggregation/AggregationTestHelper.java index 3f11d467566..163a4a3b886 100644 --- a/processing/src/test/java/io/druid/query/aggregation/AggregationTestHelper.java +++ b/processing/src/test/java/io/druid/query/aggregation/AggregationTestHelper.java @@ -311,7 +311,7 @@ public class AggregationTestHelper List toMerge = new ArrayList<>(); try { - index = new OnheapIncrementalIndex(minTimestamp, gran, metrics, deserializeComplexMetrics, maxRowCount); + index = new OnheapIncrementalIndex(minTimestamp, gran, metrics, deserializeComplexMetrics, true, maxRowCount); while (rows.hasNext()) { Object row = rows.next(); if (!index.canAppendRow()) { @@ -319,7 +319,7 @@ public class AggregationTestHelper toMerge.add(tmp); indexMerger.persist(index, tmp, new IndexSpec()); index.close(); - index = new OnheapIncrementalIndex(minTimestamp, gran, metrics, deserializeComplexMetrics, maxRowCount); + index = new OnheapIncrementalIndex(minTimestamp, gran, metrics, deserializeComplexMetrics, true, maxRowCount); } if (row instanceof String && parser instanceof StringInputRowParser) { //Note: this is required because StringInputRowParser is InputRowParser as opposed to diff --git a/processing/src/test/java/io/druid/segment/IndexMergerTest.java b/processing/src/test/java/io/druid/segment/IndexMergerTest.java index ab1dc5af47d..9ba83d69a22 100644 --- a/processing/src/test/java/io/druid/segment/IndexMergerTest.java +++ b/processing/src/test/java/io/druid/segment/IndexMergerTest.java @@ -882,9 +882,9 @@ public class IndexMergerTest .build(); - IncrementalIndex toPersist1 = new OnheapIncrementalIndex(schema, 1000); - IncrementalIndex toPersist2 = new OnheapIncrementalIndex(schema, 1000); - IncrementalIndex toPersist3 = new OnheapIncrementalIndex(schema, 1000); + IncrementalIndex toPersist1 = new OnheapIncrementalIndex(schema, true, 1000); + IncrementalIndex toPersist2 = new OnheapIncrementalIndex(schema, true, 1000); + IncrementalIndex toPersist3 = new OnheapIncrementalIndex(schema, true, 1000); addDimValuesToIndex(toPersist1, "dimA", Arrays.asList("1", "2")); addDimValuesToIndex(toPersist2, "dimA", Arrays.asList("1", "2")); @@ -1626,10 +1626,7 @@ public class IndexMergerTest new AggregatorFactory[]{new CountAggregatorFactory("count")} ); - return new OnheapIncrementalIndex( - schema, - 1000 - ); + return new OnheapIncrementalIndex(schema, true, 1000); } private AggregatorFactory[] getCombiningAggregators(AggregatorFactory[] aggregators) diff --git a/processing/src/test/java/io/druid/segment/TestIndex.java b/processing/src/test/java/io/druid/segment/TestIndex.java index 8014e999430..c6ecc351c29 100644 --- a/processing/src/test/java/io/druid/segment/TestIndex.java +++ b/processing/src/test/java/io/druid/segment/TestIndex.java @@ -181,10 +181,7 @@ public class TestIndex .withQueryGranularity(QueryGranularity.NONE) .withMetrics(METRIC_AGGS) .build(); - final IncrementalIndex retVal = new 
OnheapIncrementalIndex( - schema, - 10000 - ); + final IncrementalIndex retVal = new OnheapIncrementalIndex(schema, true, 10000); final AtomicLong startTime = new AtomicLong(); int lineCount; diff --git a/processing/src/test/java/io/druid/segment/incremental/OnheapIncrementalIndexBenchmark.java b/processing/src/test/java/io/druid/segment/incremental/OnheapIncrementalIndexBenchmark.java index da1707e8e9f..89a1195e3a8 100644 --- a/processing/src/test/java/io/druid/segment/incremental/OnheapIncrementalIndexBenchmark.java +++ b/processing/src/test/java/io/druid/segment/incremental/OnheapIncrementalIndexBenchmark.java @@ -32,6 +32,7 @@ import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.metamx.common.guava.Sequences; +import com.metamx.common.parsers.ParseException; import io.druid.data.input.InputRow; import io.druid.data.input.MapBasedInputRow; import io.druid.granularity.QueryGranularity; @@ -135,6 +136,7 @@ public class OnheapIncrementalIndexBenchmark extends AbstractBenchmark protected Integer addToFacts( AggregatorFactory[] metrics, boolean deserializeComplexMetrics, + boolean reportParseExceptions, InputRow row, AtomicInteger numEntries, TimeAndDims key, @@ -185,7 +187,15 @@ public class OnheapIncrementalIndexBenchmark extends AbstractBenchmark for (Aggregator agg : aggs) { synchronized (agg) { - agg.aggregate(); + try { + agg.aggregate(); + } + catch (ParseException e) { + // "aggregate" can throw ParseExceptions if a selector expects something but gets something else. + if (reportParseExceptions) { + throw e; + } + } } } @@ -225,7 +235,8 @@ public class OnheapIncrementalIndexBenchmark extends AbstractBenchmark return new MapBasedInputRow(timestamp, dimensionList, builder.build()); } - @Ignore @Test + @Ignore + @Test @BenchmarkOptions(callgc = true, clock = Clock.REAL_TIME, warmupRounds = 10, benchmarkRounds = 20) public void testConcurrentAddRead() throws InterruptedException, ExecutionException, NoSuchMethodException, IllegalAccessException, diff --git a/server/src/main/java/io/druid/segment/indexing/RealtimeTuningConfig.java b/server/src/main/java/io/druid/segment/indexing/RealtimeTuningConfig.java index b2925a8e47e..07e42126230 100644 --- a/server/src/main/java/io/druid/segment/indexing/RealtimeTuningConfig.java +++ b/server/src/main/java/io/druid/segment/indexing/RealtimeTuningConfig.java @@ -47,6 +47,7 @@ public class RealtimeTuningConfig implements TuningConfig private static final ShardSpec defaultShardSpec = new NoneShardSpec(); private static final IndexSpec defaultIndexSpec = new IndexSpec(); private static final Boolean defaultBuildV9Directly = Boolean.FALSE; + private static final Boolean defaultReportParseExceptions = Boolean.FALSE; // Might make sense for this to be a builder public static RealtimeTuningConfig makeDefaultTuningConfig() @@ -63,7 +64,8 @@ public class RealtimeTuningConfig implements TuningConfig defaultIndexSpec, defaultBuildV9Directly, 0, - 0 + 0, + defaultReportParseExceptions ); } @@ -76,9 +78,10 @@ public class RealtimeTuningConfig implements TuningConfig private final int maxPendingPersists; private final ShardSpec shardSpec; private final IndexSpec indexSpec; - private final Boolean buildV9Directly; + private final boolean buildV9Directly; private final int persistThreadPriority; private final int mergeThreadPriority; + private final boolean reportParseExceptions; @JsonCreator public RealtimeTuningConfig( @@ 
-93,7 +96,8 @@ public class RealtimeTuningConfig implements TuningConfig @JsonProperty("indexSpec") IndexSpec indexSpec, @JsonProperty("buildV9Directly") Boolean buildV9Directly, @JsonProperty("persistThreadPriority") int persistThreadPriority, - @JsonProperty("mergeThreadPriority") int mergeThreadPriority + @JsonProperty("mergeThreadPriority") int mergeThreadPriority, + @JsonProperty("reportParseExceptions") Boolean reportParseExceptions ) { this.maxRowsInMemory = maxRowsInMemory == null ? defaultMaxRowsInMemory : maxRowsInMemory; @@ -112,6 +116,9 @@ public class RealtimeTuningConfig implements TuningConfig this.buildV9Directly = buildV9Directly == null ? defaultBuildV9Directly : buildV9Directly; this.mergeThreadPriority = mergeThreadPriority; this.persistThreadPriority = persistThreadPriority; + this.reportParseExceptions = reportParseExceptions == null + ? defaultReportParseExceptions + : reportParseExceptions; } @JsonProperty @@ -174,6 +181,7 @@ public class RealtimeTuningConfig implements TuningConfig return buildV9Directly; } + @JsonProperty public int getPersistThreadPriority() { return this.persistThreadPriority; @@ -185,6 +193,12 @@ public class RealtimeTuningConfig implements TuningConfig return this.mergeThreadPriority; } + @JsonProperty + public boolean isReportParseExceptions() + { + return reportParseExceptions; + } + public RealtimeTuningConfig withVersioningPolicy(VersioningPolicy policy) { return new RealtimeTuningConfig( @@ -199,7 +213,8 @@ public class RealtimeTuningConfig implements TuningConfig indexSpec, buildV9Directly, persistThreadPriority, - mergeThreadPriority + mergeThreadPriority, + reportParseExceptions ); } @@ -217,7 +232,8 @@ public class RealtimeTuningConfig implements TuningConfig indexSpec, buildV9Directly, persistThreadPriority, - mergeThreadPriority + mergeThreadPriority, + reportParseExceptions ); } } diff --git a/server/src/main/java/io/druid/segment/realtime/RealtimeManager.java b/server/src/main/java/io/druid/segment/realtime/RealtimeManager.java index fe00fec7d92..6f1080b8ea3 100644 --- a/server/src/main/java/io/druid/segment/realtime/RealtimeManager.java +++ b/server/src/main/java/io/druid/segment/realtime/RealtimeManager.java @@ -30,7 +30,6 @@ import com.google.inject.Inject; import com.metamx.common.guava.CloseQuietly; import com.metamx.common.lifecycle.LifecycleStart; import com.metamx.common.lifecycle.LifecycleStop; -import com.metamx.common.parsers.ParseException; import com.metamx.emitter.EmittingLogger; import io.druid.data.input.Committer; import io.druid.data.input.Firehose; @@ -45,7 +44,6 @@ import io.druid.query.QueryRunnerFactoryConglomerate; import io.druid.query.QuerySegmentWalker; import io.druid.query.QueryToolChest; import io.druid.query.SegmentDescriptor; -import io.druid.segment.incremental.IndexSizeExceededException; import io.druid.segment.indexing.DataSchema; import io.druid.segment.indexing.RealtimeTuningConfig; import io.druid.segment.realtime.plumber.Committers; @@ -340,7 +338,7 @@ public class RealtimeManager implements QuerySegmentWalker { final Supplier committerSupplier = Committers.supplierFromFirehose(firehose); while (firehose.hasMore()) { - Plumbers.addNextRow(committerSupplier, firehose, plumber, metrics); + Plumbers.addNextRow(committerSupplier, firehose, plumber, config.isReportParseExceptions(), metrics); } } diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/Plumbers.java b/server/src/main/java/io/druid/segment/realtime/plumber/Plumbers.java index c3338f81c80..53511b0a10e 100644 --- 
a/server/src/main/java/io/druid/segment/realtime/plumber/Plumbers.java +++ b/server/src/main/java/io/druid/segment/realtime/plumber/Plumbers.java @@ -42,6 +42,7 @@ public class Plumbers final Supplier committerSupplier, final Firehose firehose, final Plumber plumber, + final boolean reportParseExceptions, final FireDepartmentMetrics metrics ) { @@ -49,9 +50,13 @@ public class Plumbers final InputRow inputRow = firehose.nextRow(); if (inputRow == null) { - log.debug("Discarded null input row, considering unparseable."); - metrics.incrementUnparseable(); - return; + if (reportParseExceptions) { + throw new ParseException("null input row"); + } else { + log.debug("Discarded null input row, considering unparseable."); + metrics.incrementUnparseable(); + return; + } } // Included in ParseException try/catch, as additional parsing can be done during indexing. @@ -66,8 +71,12 @@ public class Plumbers metrics.incrementProcessed(); } catch (ParseException e) { - log.debug(e, "Discarded row due to exception, considering unparseable."); - metrics.incrementUnparseable(); + if (reportParseExceptions) { + throw e; + } else { + log.debug(e, "Discarded row due to exception, considering unparseable."); + metrics.incrementUnparseable(); + } } catch (IndexSizeExceededException e) { // Shouldn't happen if this is only being called by a single thread. diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/Sink.java b/server/src/main/java/io/druid/segment/realtime/plumber/Sink.java index b84217bcb75..c6e4bb71265 100644 --- a/server/src/main/java/io/druid/segment/realtime/plumber/Sink.java +++ b/server/src/main/java/io/druid/segment/realtime/plumber/Sink.java @@ -199,6 +199,7 @@ public class Sink implements Iterable .build(); final IncrementalIndex newIndex = new OnheapIncrementalIndex( indexSchema, + config.isReportParseExceptions(), config.getMaxRowsInMemory() ); diff --git a/server/src/test/java/io/druid/segment/realtime/FireDepartmentTest.java b/server/src/test/java/io/druid/segment/realtime/FireDepartmentTest.java index db2694a10df..2cdb8c09be6 100644 --- a/server/src/test/java/io/druid/segment/realtime/FireDepartmentTest.java +++ b/server/src/test/java/io/druid/segment/realtime/FireDepartmentTest.java @@ -116,9 +116,7 @@ public class FireDepartmentTest ), null ), - new RealtimeTuningConfig( - null, null, null, null, null, null, null, null, null, null, 0, 0 - ) + RealtimeTuningConfig.makeDefaultTuningConfig() ); String json = jsonMapper.writeValueAsString(schema); diff --git a/server/src/test/java/io/druid/segment/realtime/RealtimeManagerTest.java b/server/src/test/java/io/druid/segment/realtime/RealtimeManagerTest.java index c5884ebf6ac..0f1f31571fb 100644 --- a/server/src/test/java/io/druid/segment/realtime/RealtimeManagerTest.java +++ b/server/src/test/java/io/druid/segment/realtime/RealtimeManagerTest.java @@ -153,7 +153,8 @@ public class RealtimeManagerTest null, null, 0, - 0 + 0, + null ); plumber = new TestPlumber(new Sink(new Interval("0/P5000Y"), schema, tuningConfig, new DateTime().toString())); diff --git a/server/src/test/java/io/druid/segment/realtime/firehose/IngestSegmentFirehoseTest.java b/server/src/test/java/io/druid/segment/realtime/firehose/IngestSegmentFirehoseTest.java index beeada2b51b..fb219618077 100644 --- a/server/src/test/java/io/druid/segment/realtime/firehose/IngestSegmentFirehoseTest.java +++ b/server/src/test/java/io/druid/segment/realtime/firehose/IngestSegmentFirehoseTest.java @@ -120,7 +120,7 @@ public class IngestSegmentFirehoseTest IncrementalIndex index = 
null; try { - index = new OnheapIncrementalIndex(0, QueryGranularity.NONE, aggregators, true, 5000); + index = new OnheapIncrementalIndex(0, QueryGranularity.NONE, aggregators, true, true, 5000); for (String line : rows) { index.add(parser.parse(line)); } diff --git a/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java b/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java index 297d3940d58..e528c2571c9 100644 --- a/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java +++ b/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java @@ -195,7 +195,9 @@ public class RealtimePlumberSchoolTest null, null, buildV9Directly, - 0, 0 + 0, + 0, + false ); realtimePlumberSchool = new RealtimePlumberSchool( diff --git a/server/src/test/java/io/druid/segment/realtime/plumber/SinkTest.java b/server/src/test/java/io/druid/segment/realtime/plumber/SinkTest.java index 34c85923f90..980cbf77d99 100644 --- a/server/src/test/java/io/druid/segment/realtime/plumber/SinkTest.java +++ b/server/src/test/java/io/druid/segment/realtime/plumber/SinkTest.java @@ -68,7 +68,9 @@ public class SinkTest null, null, null, - 0, 0 + 0, + 0, + null ); final Sink sink = new Sink(interval, schema, tuningConfig, version);
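The try/catch this patch adds around `agg.aggregate()` in both `OnheapIncrementalIndex` and `OffheapIncrementalIndex` is worth isolating, since it explains the test expectations above: a bad metric value leaves the row counted as processed while that one metric simply does not advance. The following self-contained sketch shows the guard's shape; the `LongSumAggregator` and `ParseException` stubs are hypothetical stand-ins for Druid's real selector and exception types.

```java
public class AggregateGuardSketch
{
  // Stand-in for com.metamx.common.parsers.ParseException.
  static class ParseException extends RuntimeException
  {
    ParseException(String message)
    {
      super(message);
    }
  }

  // Hypothetical stand-in for a Druid metric selector/aggregator pair.
  static class LongSumAggregator
  {
    private long sum = 0;

    void aggregate(Object value)
    {
      if (value instanceof Number) {
        sum += ((Number) value).longValue();
      } else {
        // Real selectors throw ParseException when a metric value cannot
        // be coerced, e.g. met1 = "foo".
        throw new ParseException("Unable to parse value[" + value + "]");
      }
    }

    long get()
    {
      return sum;
    }
  }

  // The guard the patch adds inside addToFacts in both incremental indexes.
  static void guardedAggregate(
      LongSumAggregator agg,
      Object value,
      boolean reportParseExceptions
  )
  {
    try {
      agg.aggregate(value);
    }
    catch (ParseException e) {
      // "aggregate" can throw ParseExceptions if a selector expects something
      // but gets something else; only strict mode propagates them. In lenient
      // mode the row still counts as processed, but this metric stays put.
      if (reportParseExceptions) {
        throw e;
      }
    }
  }

  public static void main(String[] args)
  {
    LongSumAggregator met1 = new LongSumAggregator();
    guardedAggregate(met1, 1, false);
    guardedAggregate(met1, "foo", false); // swallowed in lenient mode
    guardedAggregate(met1, 2.0, false);
    System.out.println("met1 = " + met1.get()); // prints: met1 = 3
  }
}
```

This matches the `testNoReportParseExceptions` assertions in `RealtimeIndexTaskTest`: three rows processed, yet `sumMetric(task, "met1")` is 3 because the "foo" value contributed nothing.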