Compaction: Block input specs not aligned with segmentGranularity. (#14127)

* Compaction: Block input specs not aligned with segmentGranularity.

When input intervals are not aligned with segmentGranularity, the output
segments cover a wider time range than the input intervals. Existing data
that lies in that extra range, outside the input intervals, may be
overshadowed.

In MSQ REPLACE, this is a validation error. IMO the same behavior makes
sense for compaction tasks. In case anyone was depending on the ability
to compact nonaligned intervals, a configuration parameter
allowNonAlignedInterval is provided. I don't expect it to be used much.

* Remove unused.

* ITCompactionTaskTest uses non-aligned intervals.
This commit is contained in:
Gian Merlino 2023-04-25 17:06:16 -07:00 committed by GitHub
parent 89e7948159
commit a7d4162195
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 154 additions and 18 deletions

View File

@ -184,11 +184,11 @@ The compaction `ioConfig` requires specifying `inputSpec` as follows:
|Field|Description|Default|Required|
|-----|-----------|-------|--------|
|`type`|Task type. Set the value to `compact`.|none|Yes|
|`inputSpec`|Specification of the target [intervals](#interval-inputspec) or [segments](#segments-inputspec).|none|Yes|
|`inputSpec`|Specification of the target [interval](#interval-inputspec) or [segments](#segments-inputspec).|none|Yes|
|`dropExisting`|If `true`, the task replaces all existing segments fully contained by either of the following:<br />- the `interval` in the `interval` type `inputSpec`.<br />- the umbrella interval of the `segments` in the `segments` type `inputSpec`.<br />If compaction fails, Druid does not change any of the existing segments.<br />**WARNING**: `dropExisting` in `ioConfig` is a beta feature. |false|No|
|`allowNonAlignedInterval`|If `true`, the task allows an explicit [`segmentGranularity`](#compaction-granularity-spec) that is not aligned with the provided [interval](#interval-inputspec) or [segments](#segments-inputspec). This parameter is only used if [`segmentGranularity`](#compaction-granularity-spec) is explicitly provided.<br /><br />This parameter is provided for backwards compatibility. In most scenarios it should not be set, as it can lead to data being accidentally overshadowed. This parameter may be removed in a future release.|false|No|
Druid supports two `inputSpec` formats:
The compaction task has two kinds of `inputSpec`:
#### Interval `inputSpec`

View File

@ -20,6 +20,7 @@
package org.apache.druid.indexing.common.task;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import org.apache.druid.segment.indexing.BatchIOConfig;
@ -38,15 +39,18 @@ import java.util.Objects;
public class CompactionIOConfig implements IOConfig
{
private final CompactionInputSpec inputSpec;
private final boolean allowNonAlignedInterval;
private final boolean dropExisting;
/**
 * Creates the ioConfig of a compaction task.
 *
 * @param inputSpec               specification of the input to compact
 * @param allowNonAlignedInterval value of the {@code allowNonAlignedInterval} property, stored as given
 * @param dropExisting            value of the {@code dropExisting} property; when {@code null},
 *                                {@link BatchIOConfig#DEFAULT_DROP_EXISTING} is used instead
 */
@JsonCreator
public CompactionIOConfig(
    @JsonProperty("inputSpec") CompactionInputSpec inputSpec,
    @JsonProperty("allowNonAlignedInterval") boolean allowNonAlignedInterval,
    @JsonProperty("dropExisting") @Nullable Boolean dropExisting
)
{
  this.inputSpec = inputSpec;
  this.allowNonAlignedInterval = allowNonAlignedInterval;

  // dropExisting is optional in the JSON spec; fall back to the batch-ingestion default when absent.
  if (dropExisting != null) {
    this.dropExisting = dropExisting;
  } else {
    this.dropExisting = BatchIOConfig.DEFAULT_DROP_EXISTING;
  }
}
@ -56,6 +60,13 @@ public class CompactionIOConfig implements IOConfig
return inputSpec;
}
/**
 * Returns the {@code allowNonAlignedInterval} flag this ioConfig was created with.
 *
 * The property is annotated with {@link JsonInclude.Include#NON_DEFAULT}, so it is expected to be
 * left out of the serialized JSON while it holds the default value ({@code false}).
 */
@JsonProperty("allowNonAlignedInterval")
@JsonInclude(JsonInclude.Include.NON_DEFAULT)
public boolean isAllowNonAlignedInterval()
{
return allowNonAlignedInterval;
}
@JsonProperty
public boolean isDropExisting()
{
@ -72,14 +83,15 @@ public class CompactionIOConfig implements IOConfig
return false;
}
CompactionIOConfig that = (CompactionIOConfig) o;
return dropExisting == that.dropExisting &&
Objects.equals(inputSpec, that.inputSpec);
return allowNonAlignedInterval == that.allowNonAlignedInterval
&& dropExisting == that.dropExisting
&& Objects.equals(inputSpec, that.inputSpec);
}
@Override
public int hashCode()
{
return Objects.hash(inputSpec, dropExisting);
return Objects.hash(inputSpec, allowNonAlignedInterval, dropExisting);
}
@Override
@ -87,6 +99,7 @@ public class CompactionIOConfig implements IOConfig
{
return "CompactionIOConfig{" +
"inputSpec=" + inputSpec +
", allowNonAlignedInterval=" + allowNonAlignedInterval +
", dropExisting=" + dropExisting +
'}';
}

View File

@ -216,11 +216,11 @@ public class CompactionTask extends AbstractBatchIndexTask
if (ioConfig != null) {
this.ioConfig = ioConfig;
} else if (interval != null) {
this.ioConfig = new CompactionIOConfig(new CompactionIntervalSpec(interval, null), null);
this.ioConfig = new CompactionIOConfig(new CompactionIntervalSpec(interval, null), false, null);
} else {
// We already checked segments is not null or empty above.
//noinspection ConstantConditions
this.ioConfig = new CompactionIOConfig(SpecificSegmentsSpec.fromSegments(segments), null);
this.ioConfig = new CompactionIOConfig(SpecificSegmentsSpec.fromSegments(segments), false, null);
}
this.dimensionsSpec = dimensionsSpec == null ? dimensions : dimensionsSpec;
this.transformSpec = transformSpec;
@ -462,6 +462,7 @@ public class CompactionTask extends AbstractBatchIndexTask
final List<ParallelIndexIngestionSpec> ingestionSpecs = createIngestionSchema(
toolbox,
getTaskLockHelper().getLockGranularityToUse(),
ioConfig,
segmentProvider,
partitionConfigurationManager,
dimensionsSpec,
@ -567,9 +568,10 @@ public class CompactionTask extends AbstractBatchIndexTask
* @return an empty list if input segments don't exist. Otherwise, a generated ingestionSpec.
*/
@VisibleForTesting
static <ObjectType> List<ParallelIndexIngestionSpec> createIngestionSchema(
static List<ParallelIndexIngestionSpec> createIngestionSchema(
final TaskToolbox toolbox,
final LockGranularity lockGranularityInUse,
final CompactionIOConfig ioConfig,
final SegmentProvider segmentProvider,
final PartitionConfigurationManager partitionConfigurationManager,
@Nullable final DimensionsSpec dimensionsSpec,
@ -656,7 +658,7 @@ public class CompactionTask extends AbstractBatchIndexTask
coordinatorClient,
segmentCacheManagerFactory,
retryPolicyFactory,
dropExisting
ioConfig
),
compactionTuningConfig
)
@ -695,7 +697,7 @@ public class CompactionTask extends AbstractBatchIndexTask
coordinatorClient,
segmentCacheManagerFactory,
retryPolicyFactory,
dropExisting
ioConfig
),
compactionTuningConfig
)
@ -710,9 +712,26 @@ public class CompactionTask extends AbstractBatchIndexTask
CoordinatorClient coordinatorClient,
SegmentCacheManagerFactory segmentCacheManagerFactory,
RetryPolicyFactory retryPolicyFactory,
boolean dropExisting
CompactionIOConfig compactionIOConfig
)
{
if (!compactionIOConfig.isAllowNonAlignedInterval()) {
// Validate interval alignment.
final Granularity segmentGranularity = dataSchema.getGranularitySpec().getSegmentGranularity();
final Interval widenedInterval = Intervals.utc(
segmentGranularity.bucketStart(interval.getStart()).getMillis(),
segmentGranularity.bucketEnd(interval.getEnd().minus(1)).getMillis()
);
if (!interval.equals(widenedInterval)) {
throw new IAE(
"Interval[%s] to compact is not aligned with segmentGranularity[%s]",
interval,
segmentGranularity
);
}
}
return new ParallelIndexIOConfig(
null,
new DruidInputSource(
@ -730,7 +749,7 @@ public class CompactionTask extends AbstractBatchIndexTask
),
null,
false,
dropExisting
compactionIOConfig.isDropExisting()
);
}
@ -1226,15 +1245,21 @@ public class CompactionTask extends AbstractBatchIndexTask
return inputSpec(SpecificSegmentsSpec.fromSegments(segments));
}
/**
 * Sets the ioConfig of the task being built, overwriting any ioConfig already held by this builder.
 *
 * @param ioConfig the ioConfig to use as-is
 * @return this builder, to allow call chaining
 */
public Builder ioConfig(CompactionIOConfig ioConfig)
{
this.ioConfig = ioConfig;
return this;
}
public Builder inputSpec(CompactionInputSpec inputSpec)
{
this.ioConfig = new CompactionIOConfig(inputSpec, null);
this.ioConfig = new CompactionIOConfig(inputSpec, false, null);
return this;
}
public Builder inputSpec(CompactionInputSpec inputSpec, Boolean dropExisting)
{
this.ioConfig = new CompactionIOConfig(inputSpec, dropExisting);
this.ioConfig = new CompactionIOConfig(inputSpec, false, dropExisting);
return this;
}

View File

@ -90,12 +90,15 @@ import org.apache.druid.timeline.partition.HashBasedNumberedShardSpec;
import org.apache.druid.timeline.partition.NumberedOverwriteShardSpec;
import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.apache.druid.timeline.partition.PartitionIds;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.joda.time.Interval;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.internal.matchers.ThrowableMessageMatcher;
import org.junit.rules.ExpectedException;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
@ -654,6 +657,83 @@ public class CompactionTaskRunTest extends IngestionTestBase
}
}
/**
 * A compaction whose input interval (one day) is not aligned with the requested WEEK
 * segmentGranularity must be rejected when allowNonAlignedInterval is false.
 */
@Test
public void testWithSegmentGranularityMisalignedInterval() throws Exception
{
  runIndexTask();

  // One-day input interval with allowNonAlignedInterval = false and the default dropExisting.
  final CompactionIOConfig misalignedIoConfig = new CompactionIOConfig(
      new CompactionIntervalSpec(Intervals.of("2014-01-01/2014-01-02"), null),
      false,
      null
  );

  final CompactionTask task = new Builder(DATA_SOURCE, segmentCacheManagerFactory, RETRY_POLICY_FACTORY)
      .ioConfig(misalignedIoConfig)
      .segmentGranularity(Granularities.WEEK)
      .build();

  final IllegalArgumentException thrown = Assert.assertThrows(
      IllegalArgumentException.class,
      () -> runTask(task)
  );

  // Only the prefix is checked; the rendering of the granularity itself is not pinned here.
  final String expectedPrefix =
      "Interval[2014-01-01T00:00:00.000Z/2014-01-02T00:00:00.000Z] to compact is not aligned with segmentGranularity";
  MatcherAssert.assertThat(
      thrown,
      ThrowableMessageMatcher.hasMessage(CoreMatchers.startsWith(expectedPrefix))
  );
}
/**
 * Counterpart of the misaligned-interval rejection test: with allowNonAlignedInterval set to true,
 * compacting a one-day interval with WEEK segmentGranularity is expected to succeed.
 */
@Test
public void testWithSegmentGranularityMisalignedIntervalAllowed() throws Exception
{
runIndexTask();
final Builder builder = new Builder(
DATA_SOURCE,
segmentCacheManagerFactory,
RETRY_POLICY_FACTORY
);
// WEEK segmentGranularity over a one-day input interval; the misalignment is explicitly allowed
// through the second CompactionIOConfig argument (allowNonAlignedInterval = true).
final CompactionTask compactionTask1 = builder
.ioConfig(
new CompactionIOConfig(
new CompactionIntervalSpec(Intervals.of("2014-01-01/2014-01-02"), null),
true,
null
)
)
.segmentGranularity(Granularities.WEEK)
.build();
Pair<TaskStatus, List<DataSegment>> resultPair = runTask(compactionTask1);
Assert.assertTrue(resultPair.lhs.isSuccess());
List<DataSegment> segments = resultPair.rhs;
// A single output segment is expected, spanning the whole week that contains the input day.
Assert.assertEquals(1, segments.size());
Assert.assertEquals(Intervals.of("2013-12-30/2014-01-06"), segments.get(0).getInterval());
Assert.assertEquals(new NumberedShardSpec(0, 1), segments.get(0).getShardSpec());
// NOTE(review): the interval below presumably matches the data written by runIndexTask() - confirm.
Assert.assertEquals(
getDefaultCompactionState(
Granularities.WEEK,
Granularities.MINUTE,
ImmutableList.of(Intervals.of("2014-01-01/2014-01-01T03"))
),
segments.get(0).getLastCompactionState()
);
}
@Test
public void testCompactionWithFilterInTransformSpec() throws Exception
{

View File

@ -919,13 +919,14 @@ public class CompactionTaskTest
);
provider.checkSegments(LockGranularity.TIME_CHUNK, ImmutableList.of());
}
@Test
public void testCreateIngestionSchema() throws IOException
{
final List<ParallelIndexIngestionSpec> ingestionSpecs = CompactionTask.createIngestionSchema(
toolbox,
LockGranularity.TIME_CHUNK,
new CompactionIOConfig(null, false, null),
new SegmentProvider(DATA_SOURCE, new CompactionIntervalSpec(COMPACTION_INTERVAL, null)),
new PartitionConfigurationManager(TUNING_CONFIG),
null,
@ -1000,6 +1001,7 @@ public class CompactionTaskTest
final List<ParallelIndexIngestionSpec> ingestionSpecs = CompactionTask.createIngestionSchema(
toolbox,
LockGranularity.TIME_CHUNK,
new CompactionIOConfig(null, false, null),
new SegmentProvider(DATA_SOURCE, new CompactionIntervalSpec(COMPACTION_INTERVAL, null)),
new PartitionConfigurationManager(tuningConfig),
null,
@ -1075,6 +1077,7 @@ public class CompactionTaskTest
final List<ParallelIndexIngestionSpec> ingestionSpecs = CompactionTask.createIngestionSchema(
toolbox,
LockGranularity.TIME_CHUNK,
new CompactionIOConfig(null, false, null),
new SegmentProvider(DATA_SOURCE, new CompactionIntervalSpec(COMPACTION_INTERVAL, null)),
new PartitionConfigurationManager(tuningConfig),
null,
@ -1150,6 +1153,7 @@ public class CompactionTaskTest
final List<ParallelIndexIngestionSpec> ingestionSpecs = CompactionTask.createIngestionSchema(
toolbox,
LockGranularity.TIME_CHUNK,
new CompactionIOConfig(null, false, null),
new SegmentProvider(DATA_SOURCE, new CompactionIntervalSpec(COMPACTION_INTERVAL, null)),
new PartitionConfigurationManager(tuningConfig),
null,
@ -1215,6 +1219,7 @@ public class CompactionTaskTest
final List<ParallelIndexIngestionSpec> ingestionSpecs = CompactionTask.createIngestionSchema(
toolbox,
LockGranularity.TIME_CHUNK,
new CompactionIOConfig(null, false, null),
new SegmentProvider(DATA_SOURCE, new CompactionIntervalSpec(COMPACTION_INTERVAL, null)),
new PartitionConfigurationManager(TUNING_CONFIG),
customSpec,
@ -1260,6 +1265,7 @@ public class CompactionTaskTest
final List<ParallelIndexIngestionSpec> ingestionSpecs = CompactionTask.createIngestionSchema(
toolbox,
LockGranularity.TIME_CHUNK,
new CompactionIOConfig(null, false, null),
new SegmentProvider(DATA_SOURCE, new CompactionIntervalSpec(COMPACTION_INTERVAL, null)),
new PartitionConfigurationManager(TUNING_CONFIG),
null,
@ -1298,6 +1304,7 @@ public class CompactionTaskTest
final List<ParallelIndexIngestionSpec> ingestionSpecs = CompactionTask.createIngestionSchema(
toolbox,
LockGranularity.TIME_CHUNK,
new CompactionIOConfig(null, false, null),
new SegmentProvider(DATA_SOURCE, SpecificSegmentsSpec.fromSegments(SEGMENTS)),
new PartitionConfigurationManager(TUNING_CONFIG),
null,
@ -1342,6 +1349,7 @@ public class CompactionTaskTest
CompactionTask.createIngestionSchema(
toolbox,
LockGranularity.TIME_CHUNK,
new CompactionIOConfig(null, false, null),
new SegmentProvider(DATA_SOURCE, SpecificSegmentsSpec.fromSegments(segments)),
new PartitionConfigurationManager(TUNING_CONFIG),
null,
@ -1367,6 +1375,7 @@ public class CompactionTaskTest
CompactionTask.createIngestionSchema(
toolbox,
LockGranularity.TIME_CHUNK,
new CompactionIOConfig(null, false, null),
new SegmentProvider(DATA_SOURCE, SpecificSegmentsSpec.fromSegments(segments)),
new PartitionConfigurationManager(TUNING_CONFIG),
null,
@ -1404,6 +1413,7 @@ public class CompactionTaskTest
final List<ParallelIndexIngestionSpec> ingestionSpecs = CompactionTask.createIngestionSchema(
toolbox,
LockGranularity.TIME_CHUNK,
new CompactionIOConfig(null, false, null),
new SegmentProvider(DATA_SOURCE, new CompactionIntervalSpec(COMPACTION_INTERVAL, null)),
new PartitionConfigurationManager(TUNING_CONFIG),
null,
@ -1443,6 +1453,7 @@ public class CompactionTaskTest
final List<ParallelIndexIngestionSpec> ingestionSpecs = CompactionTask.createIngestionSchema(
toolbox,
LockGranularity.TIME_CHUNK,
new CompactionIOConfig(null, false, null),
new SegmentProvider(DATA_SOURCE, new CompactionIntervalSpec(COMPACTION_INTERVAL, null)),
new PartitionConfigurationManager(TUNING_CONFIG),
null,
@ -1480,6 +1491,7 @@ public class CompactionTaskTest
final List<ParallelIndexIngestionSpec> ingestionSpecs = CompactionTask.createIngestionSchema(
toolbox,
LockGranularity.TIME_CHUNK,
new CompactionIOConfig(null, false, null),
new SegmentProvider(DATA_SOURCE, new CompactionIntervalSpec(COMPACTION_INTERVAL, null)),
new PartitionConfigurationManager(TUNING_CONFIG),
null,
@ -1523,6 +1535,7 @@ public class CompactionTaskTest
final List<ParallelIndexIngestionSpec> ingestionSpecs = CompactionTask.createIngestionSchema(
toolbox,
LockGranularity.TIME_CHUNK,
new CompactionIOConfig(null, false, null),
new SegmentProvider(DATA_SOURCE, new CompactionIntervalSpec(COMPACTION_INTERVAL, null)),
new PartitionConfigurationManager(TUNING_CONFIG),
null,
@ -1561,6 +1574,7 @@ public class CompactionTaskTest
final List<ParallelIndexIngestionSpec> ingestionSpecs = CompactionTask.createIngestionSchema(
toolbox,
LockGranularity.TIME_CHUNK,
new CompactionIOConfig(null, false, null),
new SegmentProvider(DATA_SOURCE, new CompactionIntervalSpec(COMPACTION_INTERVAL, null)),
new PartitionConfigurationManager(TUNING_CONFIG),
null,
@ -1599,6 +1613,7 @@ public class CompactionTaskTest
final List<ParallelIndexIngestionSpec> ingestionSpecs = CompactionTask.createIngestionSchema(
toolbox,
LockGranularity.TIME_CHUNK,
new CompactionIOConfig(null, false, null),
new SegmentProvider(DATA_SOURCE, new CompactionIntervalSpec(COMPACTION_INTERVAL, null)),
new PartitionConfigurationManager(TUNING_CONFIG),
null,
@ -1624,6 +1639,7 @@ public class CompactionTaskTest
final List<ParallelIndexIngestionSpec> ingestionSpecs = CompactionTask.createIngestionSchema(
toolbox,
LockGranularity.TIME_CHUNK,
new CompactionIOConfig(null, false, null),
new SegmentProvider(DATA_SOURCE, new CompactionIntervalSpec(COMPACTION_INTERVAL, null)),
new PartitionConfigurationManager(TUNING_CONFIG),
null,

View File

@ -6,7 +6,8 @@
"inputSpec": {
"type": "interval",
"interval": "2013-08-31/2013-09-02"
}
},
"allowNonAlignedInterval": true
},
"granularitySpec": %%GRANULARITY_SPEC%%,
"context" : {

View File

@ -6,7 +6,8 @@
"inputSpec": {
"type": "interval",
"interval": "2013-08-31/2013-09-02"
}
},
"allowNonAlignedInterval": true
},
"segmentGranularity": "%%SEGMENT_GRANULARITY%%",
"context" : {