diff --git a/docs/content/ingestion/tasks.md b/docs/content/ingestion/tasks.md index 7d63884e2ee..dace2978dd5 100644 --- a/docs/content/ingestion/tasks.md +++ b/docs/content/ingestion/tasks.md @@ -279,13 +279,16 @@ An example of compaction task is This compaction task reads _all segments_ of the interval `2017-01-01/2018-01-01` and results in new segments. Note that intervals of the input segments are merged into a single interval of `2017-01-01/2018-01-01` no matter what the segmentGranularity was. -To controll the number of result segments, you can set `targetPartitionSize` or `numShards`. See [indexTuningConfig](#tuningconfig) for more details. +To control the number of result segments, you can set `targetPartitionSize` or `numShards`. See [indexTuningConfig](#tuningconfig) for more details. To merge each day's worth of data into separate segments, you can submit multiple `compact` tasks, one for each day. They will run in parallel. A compaction task internally generates an `index` task spec for performing compaction work with some fixed parameters. For example, its `firehose` is always the [ingestSegmentSpec](./firehose.html), and `dimensionsSpec` and `metricsSpec` include all dimensions and metrics of the input segments by default. +Compaction tasks will exit with a failure status code, without doing anything, if the interval you specify has no +data segments loaded in it (or if the interval you specify is empty). + The output segment can have different metadata from the input segments unless all input segments have the same metadata. - Dimensions: since Druid supports schema change, the dimensions can be different across segments even if they are a part of the same dataSource. diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/CompactionTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/CompactionTask.java index 2ffd5be8a90..05c9c578a0a 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/CompactionTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/CompactionTask.java @@ -49,6 +49,7 @@ import io.druid.indexing.common.task.IndexTask.IndexIOConfig; import io.druid.indexing.common.task.IndexTask.IndexIngestionSpec; import io.druid.indexing.common.task.IndexTask.IndexTuningConfig; import io.druid.indexing.firehose.IngestSegmentFirehoseFactory; +import io.druid.java.util.common.IAE; import io.druid.java.util.common.ISE; import io.druid.java.util.common.JodaUtils; import io.druid.java.util.common.Pair; @@ -136,6 +137,10 @@ public class CompactionTask extends AbstractTask Preconditions.checkArgument(interval != null || segments != null, "interval or segments should be specified"); Preconditions.checkArgument(interval == null || segments == null, "one of interval and segments should be null"); + if (interval != null && interval.toDurationMillis() == 0) { + throw new IAE("Interval[%s] is empty, must specify a nonempty interval", interval); + } + this.interval = interval; this.segments = segments; this.dimensionsSpec = dimensionsSpec; @@ -225,7 +230,7 @@ public class CompactionTask extends AbstractTask } if (indexTaskSpec == null) { - log.warn("Failed to generate compaction spec"); + log.warn("Interval[%s] has no segments, nothing to do.", interval); return TaskStatus.failure(getId()); } else { final String json = jsonMapper.writerWithDefaultPrettyPrinter().writeValueAsString(indexTaskSpec); @@ -237,7 +242,7 @@ public class CompactionTask extends AbstractTask /** * Generate {@link IndexIngestionSpec} from input segments. - + * * @return null if input segments don't exist. Otherwise, a generated ingestionSpec. */ @Nullable diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java index 0baafbdb81a..3efc39be9e8 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java @@ -190,7 +190,7 @@ public class IndexTask extends AbstractTask implements ChatHandler @JacksonInject RowIngestionMetersFactory rowIngestionMetersFactory ) { - this( + this( id, makeGroupId(ingestionSchema), taskResource, @@ -267,6 +267,13 @@ public class IndexTask extends AbstractTask implements ChatHandler static boolean isReady(TaskActionClient actionClient, SortedSet intervals) throws IOException { + // Sanity check preventing empty intervals (which cannot be locked, and don't make sense anyway). + for (Interval interval : intervals) { + if (interval.toDurationMillis() == 0) { + throw new ISE("Cannot run with empty interval[%s]", interval); + } + } + final List locks = getTaskLocks(actionClient); if (locks.size() == 0) { try { @@ -394,7 +401,7 @@ public class IndexTask extends AbstractTask implements ChatHandler } @Override - public TaskStatus run(final TaskToolbox toolbox) throws Exception + public TaskStatus run(final TaskToolbox toolbox) { try { if (chatHandlerProvider.isPresent()) { @@ -777,7 +784,7 @@ public class IndexTask extends AbstractTask implements ChatHandler determinePartitionsMeters.incrementUnparseable(); if (determinePartitionsMeters.getUnparseable() > ingestionSchema.getTuningConfig() - .getMaxParseExceptions()) { + .getMaxParseExceptions()) { throw new RuntimeException("Max parse exceptions exceeded, terminating task..."); } } @@ -917,7 +924,12 @@ public class IndexTask extends AbstractTask implements ChatHandler }; try ( - final Appenderator appenderator = newAppenderator(buildSegmentsFireDepartmentMetrics, toolbox, dataSchema, tuningConfig); + final Appenderator appenderator = newAppenderator( + buildSegmentsFireDepartmentMetrics, + toolbox, + dataSchema, + tuningConfig + ); final BatchAppenderatorDriver driver = newDriver(appenderator, toolbox, segmentAllocator); final Firehose firehose = firehoseFactory.connect(dataSchema.getParser(), firehoseTempDir) ) { @@ -1288,7 +1300,8 @@ public class IndexTask extends AbstractTask implements ChatHandler @Deprecated @JsonProperty("reportParseExceptions") @Nullable Boolean reportParseExceptions, @JsonProperty("publishTimeout") @Nullable Long publishTimeout, // deprecated @JsonProperty("pushTimeout") @Nullable Long pushTimeout, - @JsonProperty("segmentWriteOutMediumFactory") @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory, + @JsonProperty("segmentWriteOutMediumFactory") @Nullable + SegmentWriteOutMediumFactory segmentWriteOutMediumFactory, @JsonProperty("logParseExceptions") @Nullable Boolean logParseExceptions, @JsonProperty("maxParseExceptions") @Nullable Integer maxParseExceptions, @JsonProperty("maxSavedParseExceptions") @Nullable Integer maxSavedParseExceptions @@ -1368,12 +1381,16 @@ public class IndexTask extends AbstractTask implements ChatHandler this.maxParseExceptions = 0; this.maxSavedParseExceptions = maxSavedParseExceptions == null ? 0 : Math.min(1, maxSavedParseExceptions); } else { - this.maxParseExceptions = maxParseExceptions == null ? TuningConfig.DEFAULT_MAX_PARSE_EXCEPTIONS : maxParseExceptions; + this.maxParseExceptions = maxParseExceptions == null + ? TuningConfig.DEFAULT_MAX_PARSE_EXCEPTIONS + : maxParseExceptions; this.maxSavedParseExceptions = maxSavedParseExceptions == null - ? TuningConfig.DEFAULT_MAX_SAVED_PARSE_EXCEPTIONS - : maxSavedParseExceptions; + ? TuningConfig.DEFAULT_MAX_SAVED_PARSE_EXCEPTIONS + : maxSavedParseExceptions; } - this.logParseExceptions = logParseExceptions == null ? TuningConfig.DEFAULT_LOG_PARSE_EXCEPTIONS : logParseExceptions; + this.logParseExceptions = logParseExceptions == null + ? TuningConfig.DEFAULT_LOG_PARSE_EXCEPTIONS + : logParseExceptions; } private static Integer initializeTargetPartitionSize(Integer numShards, Integer targetPartitionSize) diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/CompactionTaskTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/CompactionTaskTest.java index 846e14dbb73..99adee034b3 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/task/CompactionTaskTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/task/CompactionTaskTest.java @@ -453,6 +453,28 @@ public class CompactionTaskTest ); } + @Test + public void testEmptyInterval() throws Exception + { + expectedException.expect(IllegalArgumentException.class); + expectedException.expectMessage(CoreMatchers.containsString("must specify a nonempty interval")); + + final CompactionTask task = new CompactionTask( + null, + null, + "foo", + Intervals.of("2000-01-01/2000-01-01"), + null, + null, + null, + null, + objectMapper, + AuthTestUtils.TEST_AUTHORIZER_MAPPER, + new NoopChatHandlerProvider(), + null + ); + } + private static DimensionsSpec getExpectedDimensionsSpecForAutoGeneration() { return new DimensionsSpec(