CompactionTask: Reject empty intervals on construction. (#6059)

* CompactionTask: Reject empty intervals on construction.

They don't make sense anyway, and it's better to fail fast.

* Switch API.
This commit is contained in:
Gian Merlino 2018-07-30 08:52:50 -07:00 committed by GitHub
parent c57f4a5db0
commit 63be028cee
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 59 additions and 12 deletions

View File

@ -279,13 +279,16 @@ An example of compaction task is
This compaction task reads _all segments_ of the interval `2017-01-01/2018-01-01` and results in new segments.
Note that intervals of the input segments are merged into a single interval of `2017-01-01/2018-01-01` no matter what the segmentGranularity was.
To controll the number of result segments, you can set `targetPartitionSize` or `numShards`. See [indexTuningConfig](#tuningconfig) for more details.
To control the number of result segments, you can set `targetPartitionSize` or `numShards`. See [indexTuningConfig](#tuningconfig) for more details.
To merge each day's worth of data into separate segments, you can submit multiple `compact` tasks, one for each day. They will run in parallel.
A compaction task internally generates an `index` task spec for performing compaction work with some fixed parameters.
For example, its `firehose` is always the [ingestSegmentSpec](./firehose.html), and `dimensionsSpec` and `metricsSpec`
include all dimensions and metrics of the input segments by default.
Compaction tasks will exit with a failure status code, without doing anything, if the interval you specify has no
data segments loaded in it (or if the interval you specify is empty).
The output segment can have different metadata from the input segments unless all input segments have the same metadata.
- Dimensions: since Druid supports schema change, the dimensions can be different across segments even if they are a part of the same dataSource.

View File

@ -49,6 +49,7 @@ import io.druid.indexing.common.task.IndexTask.IndexIOConfig;
import io.druid.indexing.common.task.IndexTask.IndexIngestionSpec;
import io.druid.indexing.common.task.IndexTask.IndexTuningConfig;
import io.druid.indexing.firehose.IngestSegmentFirehoseFactory;
import io.druid.java.util.common.IAE;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.JodaUtils;
import io.druid.java.util.common.Pair;
@ -136,6 +137,10 @@ public class CompactionTask extends AbstractTask
Preconditions.checkArgument(interval != null || segments != null, "interval or segments should be specified");
Preconditions.checkArgument(interval == null || segments == null, "one of interval and segments should be null");
if (interval != null && interval.toDurationMillis() == 0) {
throw new IAE("Interval[%s] is empty, must specify a nonempty interval", interval);
}
this.interval = interval;
this.segments = segments;
this.dimensionsSpec = dimensionsSpec;
@ -225,7 +230,7 @@ public class CompactionTask extends AbstractTask
}
if (indexTaskSpec == null) {
log.warn("Failed to generate compaction spec");
log.warn("Interval[%s] has no segments, nothing to do.", interval);
return TaskStatus.failure(getId());
} else {
final String json = jsonMapper.writerWithDefaultPrettyPrinter().writeValueAsString(indexTaskSpec);
@ -237,7 +242,7 @@ public class CompactionTask extends AbstractTask
/**
* Generate {@link IndexIngestionSpec} from input segments.
*
* @return null if input segments don't exist. Otherwise, a generated ingestionSpec.
*/
@Nullable

View File

@ -190,7 +190,7 @@ public class IndexTask extends AbstractTask implements ChatHandler
@JacksonInject RowIngestionMetersFactory rowIngestionMetersFactory
)
{
this(
this(
id,
makeGroupId(ingestionSchema),
taskResource,
@ -267,6 +267,13 @@ public class IndexTask extends AbstractTask implements ChatHandler
static boolean isReady(TaskActionClient actionClient, SortedSet<Interval> intervals) throws IOException
{
// Sanity check preventing empty intervals (which cannot be locked, and don't make sense anyway).
for (Interval interval : intervals) {
if (interval.toDurationMillis() == 0) {
throw new ISE("Cannot run with empty interval[%s]", interval);
}
}
final List<TaskLock> locks = getTaskLocks(actionClient);
if (locks.size() == 0) {
try {
@ -394,7 +401,7 @@ public class IndexTask extends AbstractTask implements ChatHandler
}
@Override
public TaskStatus run(final TaskToolbox toolbox) throws Exception
public TaskStatus run(final TaskToolbox toolbox)
{
try {
if (chatHandlerProvider.isPresent()) {
@ -777,7 +784,7 @@ public class IndexTask extends AbstractTask implements ChatHandler
determinePartitionsMeters.incrementUnparseable();
if (determinePartitionsMeters.getUnparseable() > ingestionSchema.getTuningConfig()
.getMaxParseExceptions()) {
.getMaxParseExceptions()) {
throw new RuntimeException("Max parse exceptions exceeded, terminating task...");
}
}
@ -917,7 +924,12 @@ public class IndexTask extends AbstractTask implements ChatHandler
};
try (
final Appenderator appenderator = newAppenderator(buildSegmentsFireDepartmentMetrics, toolbox, dataSchema, tuningConfig);
final Appenderator appenderator = newAppenderator(
buildSegmentsFireDepartmentMetrics,
toolbox,
dataSchema,
tuningConfig
);
final BatchAppenderatorDriver driver = newDriver(appenderator, toolbox, segmentAllocator);
final Firehose firehose = firehoseFactory.connect(dataSchema.getParser(), firehoseTempDir)
) {
@ -1288,7 +1300,8 @@ public class IndexTask extends AbstractTask implements ChatHandler
@Deprecated @JsonProperty("reportParseExceptions") @Nullable Boolean reportParseExceptions,
@JsonProperty("publishTimeout") @Nullable Long publishTimeout, // deprecated
@JsonProperty("pushTimeout") @Nullable Long pushTimeout,
@JsonProperty("segmentWriteOutMediumFactory") @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory,
@JsonProperty("segmentWriteOutMediumFactory") @Nullable
SegmentWriteOutMediumFactory segmentWriteOutMediumFactory,
@JsonProperty("logParseExceptions") @Nullable Boolean logParseExceptions,
@JsonProperty("maxParseExceptions") @Nullable Integer maxParseExceptions,
@JsonProperty("maxSavedParseExceptions") @Nullable Integer maxSavedParseExceptions
@ -1368,12 +1381,16 @@ public class IndexTask extends AbstractTask implements ChatHandler
this.maxParseExceptions = 0;
this.maxSavedParseExceptions = maxSavedParseExceptions == null ? 0 : Math.min(1, maxSavedParseExceptions);
} else {
this.maxParseExceptions = maxParseExceptions == null ? TuningConfig.DEFAULT_MAX_PARSE_EXCEPTIONS : maxParseExceptions;
this.maxParseExceptions = maxParseExceptions == null
? TuningConfig.DEFAULT_MAX_PARSE_EXCEPTIONS
: maxParseExceptions;
this.maxSavedParseExceptions = maxSavedParseExceptions == null
? TuningConfig.DEFAULT_MAX_SAVED_PARSE_EXCEPTIONS
: maxSavedParseExceptions;
? TuningConfig.DEFAULT_MAX_SAVED_PARSE_EXCEPTIONS
: maxSavedParseExceptions;
}
this.logParseExceptions = logParseExceptions == null ? TuningConfig.DEFAULT_LOG_PARSE_EXCEPTIONS : logParseExceptions;
this.logParseExceptions = logParseExceptions == null
? TuningConfig.DEFAULT_LOG_PARSE_EXCEPTIONS
: logParseExceptions;
}
private static Integer initializeTargetPartitionSize(Integer numShards, Integer targetPartitionSize)

View File

@ -453,6 +453,28 @@ public class CompactionTaskTest
);
}
@Test
public void testEmptyInterval() throws Exception
{
expectedException.expect(IllegalArgumentException.class);
expectedException.expectMessage(CoreMatchers.containsString("must specify a nonempty interval"));
final CompactionTask task = new CompactionTask(
null,
null,
"foo",
Intervals.of("2000-01-01/2000-01-01"),
null,
null,
null,
null,
objectMapper,
AuthTestUtils.TEST_AUTHORIZER_MAPPER,
new NoopChatHandlerProvider(),
null
);
}
private static DimensionsSpec getExpectedDimensionsSpecForAutoGeneration()
{
return new DimensionsSpec(