Make dropExisting flag for Compaction configurable and add warning documentations (#11070)

* Make dropExisting flag for Compaction configurable

* fix checkstyle

* fix checkstyle

* fix test

* add tests

* fix spelling

* fix docs

* add IT

* fix test

* fix doc

* fix doc
This commit is contained in:
Maytas Monsereenusorn 2021-04-09 00:12:28 -07:00 committed by GitHub
parent 8264203cee
commit 4576152e4a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
25 changed files with 707 additions and 76 deletions

View File

@ -100,6 +100,7 @@ public class NewestSegmentFirstPolicyBenchmark
null, null,
null, null,
null, null,
null,
null null
) )
); );

View File

@ -867,6 +867,7 @@ A description of the compaction config is:
|`tuningConfig`|Tuning config for compaction tasks. See below [Compaction Task TuningConfig](#automatic-compaction-tuningconfig).|no| |`tuningConfig`|Tuning config for compaction tasks. See below [Compaction Task TuningConfig](#automatic-compaction-tuningconfig).|no|
|`taskContext`|[Task context](../ingestion/tasks.md#context) for compaction tasks.|no| |`taskContext`|[Task context](../ingestion/tasks.md#context) for compaction tasks.|no|
|`granularitySpec`|Custom `granularitySpec` to describe the `segmentGranularity` for the compacted segments. See [Automatic compaction granularitySpec](#automatic-compaction-granularityspec)|No| |`granularitySpec`|Custom `granularitySpec` to describe the `segmentGranularity` for the compacted segments. See [Automatic compaction granularitySpec](#automatic-compaction-granularityspec)|No|
|`ioConfig`|IO config for compaction tasks. See below [Compaction Task IOConfig](#automatic-compaction-ioconfig).|no|
An example of compaction config is: An example of compaction config is:
@ -918,6 +919,15 @@ You can optionally use the `granularitySpec` object to configure the segment gra
> Unlike manual compaction, automatic compaction does not support query granularity. > Unlike manual compaction, automatic compaction does not support query granularity.
###### Automatic compaction IOConfig
Auto compaction supports a subset of the [IOConfig for Parallel task](../ingestion/native-batch.md).
The below is a list of the supported configurations for auto compaction.
|Property|Description|Default|Required|
|--------|-----------|-------|--------|
|`dropExisting`|If `true`, then the generated compaction task drops (mark unused) all existing segments fully contained by the umbrella interval of the compacted segments when the task publishes new segments. If compaction fails, Druid does not drop or mark unused any segments. WARNING: this functionality is still in beta and can result in temporary data unavailability for data within the compacted `interval`. Note that changing this config does not cause intervals to be compacted again.|false|no|
### Overlord ### Overlord
For general Overlord Process information, see [here](../design/overlord.md). For general Overlord Process information, see [here](../design/overlord.md).

View File

@ -54,7 +54,7 @@ See [Setting up a manual compaction task](#setting-up-manual-compaction) for mor
## Data handling with compaction ## Data handling with compaction
During compaction, Druid overwrites the original set of segments with the compacted set. Druid also locks the segments for the time interval being compacted to ensure data consistency. By default, compaction tasks do not modify the underlying data. You can configure the compaction task to change the query granularity or add or remove dimensions in the compaction task. This means that the only changes to query results should be the result of intentional, not automatic, changes. During compaction, Druid overwrites the original set of segments with the compacted set. Druid also locks the segments for the time interval being compacted to ensure data consistency. By default, compaction tasks do not modify the underlying data. You can configure the compaction task to change the query granularity or add or remove dimensions in the compaction task. This means that the only changes to query results should be the result of intentional, not automatic, changes.
For compaction tasks, `dropExisting` for underlying ingestion tasks is "true". This means that Druid can drop (mark unused) all the existing segments fully within interval for the compaction task. For an example of why this is important, see the suggestion for reindexing with finer granularity under [Implementation considerations](native-batch.md#implementation-considerations). For compaction tasks, `dropExisting` in `ioConfig` can be set to "true" for Druid to drop (mark unused) all existing segments fully contained by the interval of the compaction task. For an example of why this is important, see the suggestion for reindexing with finer granularity under [Implementation considerations](native-batch.md#implementation-considerations). WARNING: this functionality is still in beta and can result in temporary data unavailability for data within the compaction task interval.
If an ingestion task needs to write data to a segment for a time interval locked for compaction, by default the ingestion task supersedes the compaction task and the compaction task fails without finishing. For manual compaction tasks you can adjust the input spec interval to avoid conflicts between ingestion and compaction. For automatic compaction, you can set the `skipOffsetFromLatest` key to adjustment the auto compaction starting point from the current time to reduce the chance of conflicts between ingestion and compaction. See [Compaction dynamic configuration](../configuration/index.md#compaction-dynamic-configuration) for more information. Another option is to set the compaction task to higher priority than the ingestion task. If an ingestion task needs to write data to a segment for a time interval locked for compaction, by default the ingestion task supersedes the compaction task and the compaction task fails without finishing. For manual compaction tasks you can adjust the input spec interval to avoid conflicts between ingestion and compaction. For automatic compaction, you can set the `skipOffsetFromLatest` key to adjustment the auto compaction starting point from the current time to reduce the chance of conflicts between ingestion and compaction. See [Compaction dynamic configuration](../configuration/index.md#compaction-dynamic-configuration) for more information. Another option is to set the compaction task to higher priority than the ingestion task.
@ -158,10 +158,12 @@ This task doesn't specify a `granularitySpec` so Druid retains the original segm
The compaction `ioConfig` requires specifying `inputSpec` as follows: The compaction `ioConfig` requires specifying `inputSpec` as follows:
|Field|Description|Required| |Field|Description|Default|Required?|
|-----|-----------|--------| |-----|-----------|-------|--------|
|`type`|Task type. Should be `compact`|Yes| |`type`|Task type. Should be `compact`|none|Yes|
|`inputSpec`|Input specification|Yes| |`inputSpec`|Input specification|none|Yes|
|`dropExisting`|If `true`, then the compaction task drops (mark unused) all existing segments fully contained by either the `interval` in the `interval` type `inputSpec` or the umbrella interval of the `segments` in the `segment` type `inputSpec` when the task publishes new compacted segments. If compaction fails, Druid does not drop or mark unused any segments. WARNING: this functionality is still in beta and can result in temporary data unavailability for data within the compaction task interval.|false|no|
There are two supported `inputSpec`s for now. There are two supported `inputSpec`s for now.

View File

@ -91,7 +91,8 @@ The supported compression formats for native batch ingestion are `bz2`, `gz`, `x
`granularitySpec`'s intervals, the portion of those segments outside the new segments' intervals will still be visible. `granularitySpec`'s intervals, the portion of those segments outside the new segments' intervals will still be visible.
- You can set `dropExisting` flag in the `ioConfig` to true if you want the ingestion task to drop all existing segments that - You can set `dropExisting` flag in the `ioConfig` to true if you want the ingestion task to drop all existing segments that
start and end within your `granularitySpec`'s intervals. This applies whether or not the new data covers all existing segments. start and end within your `granularitySpec`'s intervals. This applies whether or not the new data covers all existing segments.
`dropExisting` only applies when `appendToExisting` is false and the `granularitySpec` contains an `interval`. `dropExisting` only applies when `appendToExisting` is false and the `granularitySpec` contains an `interval`. WARNING: this
functionality is still in beta and can result in temporary data unavailability for data within the specified `interval`
The following examples demonstrate when to set the `dropExisting` property to true in the `ioConfig`: The following examples demonstrate when to set the `dropExisting` property to true in the `ioConfig`:
@ -220,7 +221,7 @@ that range if there's some stray data with unexpected timestamps.
|type|The task type, this should always be `index_parallel`.|none|yes| |type|The task type, this should always be `index_parallel`.|none|yes|
|inputFormat|[`inputFormat`](./data-formats.md#input-format) to specify how to parse input data.|none|yes| |inputFormat|[`inputFormat`](./data-formats.md#input-format) to specify how to parse input data.|none|yes|
|appendToExisting|Creates segments as additional shards of the latest version, effectively appending to the segment set instead of replacing it. This means that you can append new segments to any datasource regardless of its original partitioning scheme. You must use the `dynamic` partitioning type for the appended segments. If you specify a different partitioning type, the task fails with an error.|false|no| |appendToExisting|Creates segments as additional shards of the latest version, effectively appending to the segment set instead of replacing it. This means that you can append new segments to any datasource regardless of its original partitioning scheme. You must use the `dynamic` partitioning type for the appended segments. If you specify a different partitioning type, the task fails with an error.|false|no|
|dropExisting|If `true` and `appendToExisting` is `false` and the `granularitySpec` contains an`interval`, then the ingestion task drops (mark unused) all existing segments fully contained by the specified `interval` when the task publishes new segments. If ingestion fails, Druid does not drop or mark unused any segments. In the case of misconfiguration where either `appendToExisting` is `true` or `interval` is not specified in `granularitySpec`, Druid does not drop any segments even if `dropExisting` is `true`.|false|no| |dropExisting|If `true` and `appendToExisting` is `false` and the `granularitySpec` contains an`interval`, then the ingestion task drops (mark unused) all existing segments fully contained by the specified `interval` when the task publishes new segments. If ingestion fails, Druid does not drop or mark unused any segments. In the case of misconfiguration where either `appendToExisting` is `true` or `interval` is not specified in `granularitySpec`, Druid does not drop any segments even if `dropExisting` is `true`. WARNING: this functionality is still in beta and can result in temporary data unavailability for data within the specified `interval`.|false|no|
### `tuningConfig` ### `tuningConfig`
@ -749,7 +750,7 @@ that range if there's some stray data with unexpected timestamps.
|type|The task type, this should always be "index".|none|yes| |type|The task type, this should always be "index".|none|yes|
|inputFormat|[`inputFormat`](./data-formats.md#input-format) to specify how to parse input data.|none|yes| |inputFormat|[`inputFormat`](./data-formats.md#input-format) to specify how to parse input data.|none|yes|
|appendToExisting|Creates segments as additional shards of the latest version, effectively appending to the segment set instead of replacing it. This means that you can append new segments to any datasource regardless of its original partitioning scheme. You must use the `dynamic` partitioning type for the appended segments. If you specify a different partitioning type, the task fails with an error.|false|no| |appendToExisting|Creates segments as additional shards of the latest version, effectively appending to the segment set instead of replacing it. This means that you can append new segments to any datasource regardless of its original partitioning scheme. You must use the `dynamic` partitioning type for the appended segments. If you specify a different partitioning type, the task fails with an error.|false|no|
|dropExisting|If `true` and `appendToExisting` is `false` and the `granularitySpec` contains an`interval`, then the ingestion task drops (mark unused) all existing segments fully contained by the specified `interval` when the task publishes new segments. If ingestion fails, Druid does not drop or mark unused any segments. In the case of misconfiguration where either `appendToExisting` is `true` or `interval` is not specified in `granularitySpec`, Druid does not drop any segments even if `dropExisting` is `true`.|false|no| |dropExisting|If `true` and `appendToExisting` is `false` and the `granularitySpec` contains an`interval`, then the ingestion task drops (mark unused) all existing segments fully contained by the specified `interval` when the task publishes new segments. If ingestion fails, Druid does not drop or mark unused any segments. In the case of misconfiguration where either `appendToExisting` is `true` or `interval` is not specified in `granularitySpec`, Druid does not drop any segments even if `dropExisting` is `true`. WARNING: this functionality is still in beta and can result in temporary data unavailability for data within the specified `interval`.|false|no|
### `tuningConfig` ### `tuningConfig`

View File

@ -24,6 +24,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName; import com.fasterxml.jackson.annotation.JsonTypeName;
import org.apache.druid.segment.indexing.IOConfig; import org.apache.druid.segment.indexing.IOConfig;
import javax.annotation.Nullable;
import java.util.Objects; import java.util.Objects;
/** /**
@ -36,11 +37,16 @@ import java.util.Objects;
public class CompactionIOConfig implements IOConfig public class CompactionIOConfig implements IOConfig
{ {
private final CompactionInputSpec inputSpec; private final CompactionInputSpec inputSpec;
private final boolean dropExisting;
@JsonCreator @JsonCreator
public CompactionIOConfig(@JsonProperty("inputSpec") CompactionInputSpec inputSpec) public CompactionIOConfig(
@JsonProperty("inputSpec") CompactionInputSpec inputSpec,
@JsonProperty("dropExisting") @Nullable Boolean dropExisting
)
{ {
this.inputSpec = inputSpec; this.inputSpec = inputSpec;
this.dropExisting = dropExisting == null ? DEFAULT_DROP_EXISTING : dropExisting;
} }
@JsonProperty @JsonProperty
@ -49,6 +55,12 @@ public class CompactionIOConfig implements IOConfig
return inputSpec; return inputSpec;
} }
@JsonProperty
public boolean isDropExisting()
{
return dropExisting;
}
@Override @Override
public boolean equals(Object o) public boolean equals(Object o)
{ {
@ -59,13 +71,14 @@ public class CompactionIOConfig implements IOConfig
return false; return false;
} }
CompactionIOConfig that = (CompactionIOConfig) o; CompactionIOConfig that = (CompactionIOConfig) o;
return Objects.equals(inputSpec, that.inputSpec); return dropExisting == that.dropExisting &&
Objects.equals(inputSpec, that.inputSpec);
} }
@Override @Override
public int hashCode() public int hashCode()
{ {
return Objects.hash(inputSpec); return Objects.hash(inputSpec, dropExisting);
} }
@Override @Override
@ -73,6 +86,7 @@ public class CompactionIOConfig implements IOConfig
{ {
return "CompactionIOConfig{" + return "CompactionIOConfig{" +
"inputSpec=" + inputSpec + "inputSpec=" + inputSpec +
", dropExisting=" + dropExisting +
'}'; '}';
} }
} }

View File

@ -203,11 +203,11 @@ public class CompactionTask extends AbstractBatchIndexTask
if (ioConfig != null) { if (ioConfig != null) {
this.ioConfig = ioConfig; this.ioConfig = ioConfig;
} else if (interval != null) { } else if (interval != null) {
this.ioConfig = new CompactionIOConfig(new CompactionIntervalSpec(interval, null)); this.ioConfig = new CompactionIOConfig(new CompactionIntervalSpec(interval, null), null);
} else { } else {
// We already checked segments is not null or empty above. // We already checked segments is not null or empty above.
//noinspection ConstantConditions //noinspection ConstantConditions
this.ioConfig = new CompactionIOConfig(SpecificSegmentsSpec.fromSegments(segments)); this.ioConfig = new CompactionIOConfig(SpecificSegmentsSpec.fromSegments(segments), null);
} }
this.dimensionsSpec = dimensionsSpec == null ? dimensions : dimensionsSpec; this.dimensionsSpec = dimensionsSpec == null ? dimensions : dimensionsSpec;
@ -424,7 +424,8 @@ public class CompactionTask extends AbstractBatchIndexTask
granularitySpec, granularitySpec,
toolbox.getCoordinatorClient(), toolbox.getCoordinatorClient(),
segmentLoaderFactory, segmentLoaderFactory,
retryPolicyFactory retryPolicyFactory,
ioConfig.isDropExisting()
); );
final List<ParallelIndexSupervisorTask> indexTaskSpecs = IntStream final List<ParallelIndexSupervisorTask> indexTaskSpecs = IntStream
.range(0, ingestionSpecs.size()) .range(0, ingestionSpecs.size())
@ -532,7 +533,8 @@ public class CompactionTask extends AbstractBatchIndexTask
@Nullable final ClientCompactionTaskGranularitySpec granularitySpec, @Nullable final ClientCompactionTaskGranularitySpec granularitySpec,
final CoordinatorClient coordinatorClient, final CoordinatorClient coordinatorClient,
final SegmentLoaderFactory segmentLoaderFactory, final SegmentLoaderFactory segmentLoaderFactory,
final RetryPolicyFactory retryPolicyFactory final RetryPolicyFactory retryPolicyFactory,
final boolean dropExisting
) throws IOException, SegmentLoadingException ) throws IOException, SegmentLoadingException
{ {
NonnullPair<Map<DataSegment, File>, List<TimelineObjectHolder<String, DataSegment>>> pair = prepareSegments( NonnullPair<Map<DataSegment, File>, List<TimelineObjectHolder<String, DataSegment>>> pair = prepareSegments(
@ -614,7 +616,8 @@ public class CompactionTask extends AbstractBatchIndexTask
interval, interval,
coordinatorClient, coordinatorClient,
segmentLoaderFactory, segmentLoaderFactory,
retryPolicyFactory retryPolicyFactory,
dropExisting
), ),
compactionTuningConfig compactionTuningConfig
) )
@ -641,7 +644,8 @@ public class CompactionTask extends AbstractBatchIndexTask
segmentProvider.interval, segmentProvider.interval,
coordinatorClient, coordinatorClient,
segmentLoaderFactory, segmentLoaderFactory,
retryPolicyFactory retryPolicyFactory,
dropExisting
), ),
compactionTuningConfig compactionTuningConfig
) )
@ -655,7 +659,8 @@ public class CompactionTask extends AbstractBatchIndexTask
Interval interval, Interval interval,
CoordinatorClient coordinatorClient, CoordinatorClient coordinatorClient,
SegmentLoaderFactory segmentLoaderFactory, SegmentLoaderFactory segmentLoaderFactory,
RetryPolicyFactory retryPolicyFactory RetryPolicyFactory retryPolicyFactory,
boolean dropExisting
) )
{ {
return new ParallelIndexIOConfig( return new ParallelIndexIOConfig(
@ -675,7 +680,7 @@ public class CompactionTask extends AbstractBatchIndexTask
), ),
null, null,
false, false,
true dropExisting
); );
} }
@ -1062,7 +1067,13 @@ public class CompactionTask extends AbstractBatchIndexTask
public Builder inputSpec(CompactionInputSpec inputSpec) public Builder inputSpec(CompactionInputSpec inputSpec)
{ {
this.ioConfig = new CompactionIOConfig(inputSpec); this.ioConfig = new CompactionIOConfig(inputSpec, null);
return this;
}
public Builder inputSpec(CompactionInputSpec inputSpec, Boolean dropExisting)
{
this.ioConfig = new CompactionIOConfig(inputSpec, dropExisting);
return this; return this;
} }

View File

@ -1046,7 +1046,6 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
public static class IndexIOConfig implements BatchIOConfig public static class IndexIOConfig implements BatchIOConfig
{ {
private static final boolean DEFAULT_APPEND_TO_EXISTING = false; private static final boolean DEFAULT_APPEND_TO_EXISTING = false;
private static final boolean DEFAULT_DROP_EXISTING = false;
private final FirehoseFactory firehoseFactory; private final FirehoseFactory firehoseFactory;
private final InputSource inputSource; private final InputSource inputSource;

View File

@ -83,7 +83,8 @@ public class ClientCompactionTaskQuerySerdeTest
new ClientCompactionIntervalSpec( new ClientCompactionIntervalSpec(
Intervals.of("2019/2020"), Intervals.of("2019/2020"),
"testSha256OfSortedSegmentIds" "testSha256OfSortedSegmentIds"
) ),
true
), ),
new ClientCompactionTaskQueryTuningConfig( new ClientCompactionTaskQueryTuningConfig(
null, null,
@ -201,6 +202,10 @@ public class ClientCompactionTaskQuerySerdeTest
query.getGranularitySpec().getSegmentGranularity(), query.getGranularitySpec().getSegmentGranularity(),
task.getGranularitySpec().getSegmentGranularity() task.getGranularitySpec().getSegmentGranularity()
); );
Assert.assertEquals(
query.getIoConfig().isDropExisting(),
task.getIoConfig().isDropExisting()
);
Assert.assertEquals(query.getContext(), task.getContext()); Assert.assertEquals(query.getContext(), task.getContext());
} }
@ -214,7 +219,7 @@ public class ClientCompactionTaskQuerySerdeTest
new RetryPolicyFactory(new RetryPolicyConfig()) new RetryPolicyFactory(new RetryPolicyConfig())
); );
final CompactionTask task = builder final CompactionTask task = builder
.inputSpec(new CompactionIntervalSpec(Intervals.of("2019/2020"), "testSha256OfSortedSegmentIds")) .inputSpec(new CompactionIntervalSpec(Intervals.of("2019/2020"), "testSha256OfSortedSegmentIds"), true)
.tuningConfig( .tuningConfig(
new ParallelIndexTuningConfig( new ParallelIndexTuningConfig(
null, null,
@ -269,7 +274,8 @@ public class ClientCompactionTaskQuerySerdeTest
new ClientCompactionIntervalSpec( new ClientCompactionIntervalSpec(
Intervals.of("2019/2020"), Intervals.of("2019/2020"),
"testSha256OfSortedSegmentIds" "testSha256OfSortedSegmentIds"
) ),
true
), ),
new ClientCompactionTaskQueryTuningConfig( new ClientCompactionTaskQueryTuningConfig(
100, 100,

View File

@ -443,7 +443,41 @@ public class CompactionTaskParallelRunTest extends AbstractParallelIndexSupervis
} }
@Test @Test
public void testCompactionDropSegmentsOfInputInterval() public void testCompactionDropSegmentsOfInputIntervalIfDropFlagIsSet()
{
runIndexTask(null, true);
Collection<DataSegment> usedSegments = getCoordinatorClient().fetchUsedSegmentsInDataSourceForIntervals(DATA_SOURCE, ImmutableList.of(INTERVAL_TO_INDEX));
Assert.assertEquals(3, usedSegments.size());
for (DataSegment segment : usedSegments) {
Assert.assertTrue(Granularities.HOUR.isAligned(segment.getInterval()));
}
final Builder builder = new Builder(
DATA_SOURCE,
getSegmentLoaderFactory(),
RETRY_POLICY_FACTORY
);
final CompactionTask compactionTask = builder
// Set the dropExisting flag to true in the IOConfig of the compaction task
.inputSpec(new CompactionIntervalSpec(INTERVAL_TO_INDEX, null), true)
.tuningConfig(AbstractParallelIndexSupervisorTaskTest.DEFAULT_TUNING_CONFIG_FOR_PARALLEL_INDEXING)
.granularitySpec(new ClientCompactionTaskGranularitySpec(Granularities.MINUTE, null))
.build();
final Set<DataSegment> compactedSegments = runTask(compactionTask);
usedSegments = getCoordinatorClient().fetchUsedSegmentsInDataSourceForIntervals(DATA_SOURCE, ImmutableList.of(INTERVAL_TO_INDEX));
// All the HOUR segments got dropped even if we do not have all MINUTES segments fully covering the 3 HOURS interval.
// In fact, we only have 3 minutes of data out of the 3 hours interval.
Assert.assertEquals(3, usedSegments.size());
for (DataSegment segment : usedSegments) {
Assert.assertTrue(Granularities.MINUTE.isAligned(segment.getInterval()));
}
}
@Test
public void testCompactionDoesNotDropSegmentsIfDropFlagNotSet()
{ {
runIndexTask(null, true); runIndexTask(null, true);
@ -467,12 +501,20 @@ public class CompactionTaskParallelRunTest extends AbstractParallelIndexSupervis
final Set<DataSegment> compactedSegments = runTask(compactionTask); final Set<DataSegment> compactedSegments = runTask(compactionTask);
usedSegments = getCoordinatorClient().fetchUsedSegmentsInDataSourceForIntervals(DATA_SOURCE, ImmutableList.of(INTERVAL_TO_INDEX)); usedSegments = getCoordinatorClient().fetchUsedSegmentsInDataSourceForIntervals(DATA_SOURCE, ImmutableList.of(INTERVAL_TO_INDEX));
// All the HOUR segments got dropped even if we do not have all MINUTES segments fully covering the 3 HOURS interval. // All the HOUR segments did not get dropped since MINUTES segments did not fully covering the 3 HOURS interval.
// In fact, we only have 3 minutes of data out of the 3 hours interval. Assert.assertEquals(6, usedSegments.size());
Assert.assertEquals(3, usedSegments.size()); int hourSegmentCount = 0;
int minuteSegmentCount = 0;
for (DataSegment segment : usedSegments) { for (DataSegment segment : usedSegments) {
Assert.assertTrue(Granularities.MINUTE.isAligned(segment.getInterval())); if (Granularities.MINUTE.isAligned(segment.getInterval())) {
minuteSegmentCount++;
} }
if (Granularities.MINUTE.isAligned(segment.getInterval())) {
hourSegmentCount++;
}
}
Assert.assertEquals(3, hourSegmentCount);
Assert.assertEquals(3, minuteSegmentCount);
} }
private void runIndexTask(@Nullable PartitionsSpec partitionsSpec, boolean appendToExisting) private void runIndexTask(@Nullable PartitionsSpec partitionsSpec, boolean appendToExisting)

View File

@ -803,7 +803,7 @@ public class CompactionTaskRunTest extends IngestionTestBase
} }
@Test @Test
public void testPartialIntervalCompactWithFinerSegmentGranularityThenFullIntervalCompact() throws Exception public void testPartialIntervalCompactWithFinerSegmentGranularityThenFullIntervalCompactWithDropExistingTrue() throws Exception
{ {
// This test fails with segment lock because of the bug reported in https://github.com/apache/druid/issues/10911. // This test fails with segment lock because of the bug reported in https://github.com/apache/druid/issues/10911.
if (lockGranularity == LockGranularity.SEGMENT) { if (lockGranularity == LockGranularity.SEGMENT) {
@ -842,8 +842,9 @@ public class CompactionTaskRunTest extends IngestionTestBase
); );
final CompactionTask partialCompactionTask = builder final CompactionTask partialCompactionTask = builder
.interval(compactionPartialInterval)
.segmentGranularity(Granularities.MINUTE) .segmentGranularity(Granularities.MINUTE)
// Set dropExisting to true
.inputSpec(new CompactionIntervalSpec(compactionPartialInterval, null), true)
.build(); .build();
final Pair<TaskStatus, List<DataSegment>> partialCompactionResult = runTask(partialCompactionTask); final Pair<TaskStatus, List<DataSegment>> partialCompactionResult = runTask(partialCompactionTask);
@ -865,8 +866,9 @@ public class CompactionTaskRunTest extends IngestionTestBase
Assert.assertEquals(expectedSegments, segmentsAfterPartialCompaction); Assert.assertEquals(expectedSegments, segmentsAfterPartialCompaction);
final CompactionTask fullCompactionTask = builder final CompactionTask fullCompactionTask = builder
.interval(Intervals.of("2014-01-01/2014-01-02"))
.segmentGranularity(null) .segmentGranularity(null)
// Set dropExisting to true
.inputSpec(new CompactionIntervalSpec(Intervals.of("2014-01-01/2014-01-02"), null), true)
.build(); .build();
final Pair<TaskStatus, List<DataSegment>> fullCompactionResult = runTask(fullCompactionTask); final Pair<TaskStatus, List<DataSegment>> fullCompactionResult = runTask(fullCompactionTask);
@ -903,6 +905,81 @@ public class CompactionTaskRunTest extends IngestionTestBase
); );
} }
@Test
public void testPartialIntervalCompactWithFinerSegmentGranularityThenFullIntervalCompactWithDropExistingFalse() throws Exception
{
// This test fails with segment lock because of the bug reported in https://github.com/apache/druid/issues/10911.
if (lockGranularity == LockGranularity.SEGMENT) {
return;
}
runIndexTask();
final Set<DataSegment> expectedSegments = new HashSet<>(
getStorageCoordinator().retrieveUsedSegmentsForIntervals(
DATA_SOURCE,
Collections.singletonList(Intervals.of("2014-01-01/2014-01-02")),
Segments.ONLY_VISIBLE
)
);
final Builder builder = new Builder(
DATA_SOURCE,
segmentLoaderFactory,
RETRY_POLICY_FACTORY
);
final Interval partialInterval = Intervals.of("2014-01-01T01:00:00/2014-01-01T02:00:00");
final CompactionTask partialCompactionTask = builder
.segmentGranularity(Granularities.MINUTE)
// Set dropExisting to false
.inputSpec(new CompactionIntervalSpec(partialInterval, null), false)
.build();
final Pair<TaskStatus, List<DataSegment>> partialCompactionResult = runTask(partialCompactionTask);
Assert.assertTrue(partialCompactionResult.lhs.isSuccess());
// All segments in the previous expectedSegments should still appear as they have larger segment granularity.
expectedSegments.addAll(partialCompactionResult.rhs);
final Set<DataSegment> segmentsAfterPartialCompaction = new HashSet<>(
getStorageCoordinator().retrieveUsedSegmentsForIntervals(
DATA_SOURCE,
Collections.singletonList(Intervals.of("2014-01-01/2014-01-02")),
Segments.ONLY_VISIBLE
)
);
Assert.assertEquals(expectedSegments, segmentsAfterPartialCompaction);
final CompactionTask fullCompactionTask = builder
.segmentGranularity(null)
// Set dropExisting to false
.inputSpec(new CompactionIntervalSpec(Intervals.of("2014-01-01/2014-01-02"), null), false)
.build();
final Pair<TaskStatus, List<DataSegment>> fullCompactionResult = runTask(fullCompactionTask);
Assert.assertTrue(fullCompactionResult.lhs.isSuccess());
final List<DataSegment> segmentsAfterFullCompaction = new ArrayList<>(
getStorageCoordinator().retrieveUsedSegmentsForIntervals(
DATA_SOURCE,
Collections.singletonList(Intervals.of("2014-01-01/2014-01-02")),
Segments.ONLY_VISIBLE
)
);
segmentsAfterFullCompaction.sort(
(s1, s2) -> Comparators.intervalsByStartThenEnd().compare(s1.getInterval(), s2.getInterval())
);
Assert.assertEquals(3, segmentsAfterFullCompaction.size());
for (int i = 0; i < segmentsAfterFullCompaction.size(); i++) {
Assert.assertEquals(
Intervals.of(StringUtils.format("2014-01-01T%02d/2014-01-01T%02d", i, i + 1)),
segmentsAfterFullCompaction.get(i).getInterval()
);
}
}
@Test @Test
public void testRunIndexAndCompactForSameSegmentAtTheSameTime() throws Exception public void testRunIndexAndCompactForSameSegmentAtTheSameTime() throws Exception
{ {

View File

@ -109,6 +109,7 @@ import org.apache.druid.segment.data.ListIndexed;
import org.apache.druid.segment.data.RoaringBitmapSerdeFactory; import org.apache.druid.segment.data.RoaringBitmapSerdeFactory;
import org.apache.druid.segment.incremental.RowIngestionMetersFactory; import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
import org.apache.druid.segment.indexing.DataSchema; import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.IOConfig;
import org.apache.druid.segment.indexing.RealtimeTuningConfig; import org.apache.druid.segment.indexing.RealtimeTuningConfig;
import org.apache.druid.segment.indexing.TuningConfig; import org.apache.druid.segment.indexing.TuningConfig;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec; import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
@ -848,7 +849,8 @@ public class CompactionTaskTest
null, null,
COORDINATOR_CLIENT, COORDINATOR_CLIENT,
segmentLoaderFactory, segmentLoaderFactory,
RETRY_POLICY_FACTORY RETRY_POLICY_FACTORY,
IOConfig.DEFAULT_DROP_EXISTING
); );
final List<DimensionsSpec> expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration(); final List<DimensionsSpec> expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration();
@ -865,7 +867,8 @@ public class CompactionTaskTest
AGGREGATORS, AGGREGATORS,
SEGMENT_INTERVALS, SEGMENT_INTERVALS,
Granularities.MONTH, Granularities.MONTH,
Granularities.NONE Granularities.NONE,
IOConfig.DEFAULT_DROP_EXISTING
); );
} }
@ -919,7 +922,8 @@ public class CompactionTaskTest
null, null,
COORDINATOR_CLIENT, COORDINATOR_CLIENT,
segmentLoaderFactory, segmentLoaderFactory,
RETRY_POLICY_FACTORY RETRY_POLICY_FACTORY,
IOConfig.DEFAULT_DROP_EXISTING
); );
final List<DimensionsSpec> expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration(); final List<DimensionsSpec> expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration();
@ -937,7 +941,8 @@ public class CompactionTaskTest
SEGMENT_INTERVALS, SEGMENT_INTERVALS,
tuningConfig, tuningConfig,
Granularities.MONTH, Granularities.MONTH,
Granularities.NONE Granularities.NONE,
IOConfig.DEFAULT_DROP_EXISTING
); );
} }
@ -991,7 +996,8 @@ public class CompactionTaskTest
null, null,
COORDINATOR_CLIENT, COORDINATOR_CLIENT,
segmentLoaderFactory, segmentLoaderFactory,
RETRY_POLICY_FACTORY RETRY_POLICY_FACTORY,
IOConfig.DEFAULT_DROP_EXISTING
); );
final List<DimensionsSpec> expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration(); final List<DimensionsSpec> expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration();
@ -1009,7 +1015,8 @@ public class CompactionTaskTest
SEGMENT_INTERVALS, SEGMENT_INTERVALS,
tuningConfig, tuningConfig,
Granularities.MONTH, Granularities.MONTH,
Granularities.NONE Granularities.NONE,
IOConfig.DEFAULT_DROP_EXISTING
); );
} }
@ -1063,7 +1070,8 @@ public class CompactionTaskTest
null, null,
COORDINATOR_CLIENT, COORDINATOR_CLIENT,
segmentLoaderFactory, segmentLoaderFactory,
RETRY_POLICY_FACTORY RETRY_POLICY_FACTORY,
IOConfig.DEFAULT_DROP_EXISTING
); );
final List<DimensionsSpec> expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration(); final List<DimensionsSpec> expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration();
@ -1081,7 +1089,8 @@ public class CompactionTaskTest
SEGMENT_INTERVALS, SEGMENT_INTERVALS,
tuningConfig, tuningConfig,
Granularities.MONTH, Granularities.MONTH,
Granularities.NONE Granularities.NONE,
IOConfig.DEFAULT_DROP_EXISTING
); );
} }
@ -1125,7 +1134,8 @@ public class CompactionTaskTest
null, null,
COORDINATOR_CLIENT, COORDINATOR_CLIENT,
segmentLoaderFactory, segmentLoaderFactory,
RETRY_POLICY_FACTORY RETRY_POLICY_FACTORY,
IOConfig.DEFAULT_DROP_EXISTING
); );
ingestionSpecs.sort( ingestionSpecs.sort(
@ -1143,7 +1153,8 @@ public class CompactionTaskTest
AGGREGATORS, AGGREGATORS,
SEGMENT_INTERVALS, SEGMENT_INTERVALS,
Granularities.MONTH, Granularities.MONTH,
Granularities.NONE Granularities.NONE,
IOConfig.DEFAULT_DROP_EXISTING
); );
} }
@ -1167,7 +1178,8 @@ public class CompactionTaskTest
null, null,
COORDINATOR_CLIENT, COORDINATOR_CLIENT,
segmentLoaderFactory, segmentLoaderFactory,
RETRY_POLICY_FACTORY RETRY_POLICY_FACTORY,
IOConfig.DEFAULT_DROP_EXISTING
); );
final List<DimensionsSpec> expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration(); final List<DimensionsSpec> expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration();
@ -1185,7 +1197,8 @@ public class CompactionTaskTest
Arrays.asList(customMetricsSpec), Arrays.asList(customMetricsSpec),
SEGMENT_INTERVALS, SEGMENT_INTERVALS,
Granularities.MONTH, Granularities.MONTH,
Granularities.NONE Granularities.NONE,
IOConfig.DEFAULT_DROP_EXISTING
); );
} }
@ -1202,7 +1215,8 @@ public class CompactionTaskTest
null, null,
COORDINATOR_CLIENT, COORDINATOR_CLIENT,
segmentLoaderFactory, segmentLoaderFactory,
RETRY_POLICY_FACTORY RETRY_POLICY_FACTORY,
IOConfig.DEFAULT_DROP_EXISTING
); );
final List<DimensionsSpec> expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration(); final List<DimensionsSpec> expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration();
@ -1219,7 +1233,8 @@ public class CompactionTaskTest
AGGREGATORS, AGGREGATORS,
SEGMENT_INTERVALS, SEGMENT_INTERVALS,
Granularities.MONTH, Granularities.MONTH,
Granularities.NONE Granularities.NONE,
IOConfig.DEFAULT_DROP_EXISTING
); );
} }
@ -1243,7 +1258,8 @@ public class CompactionTaskTest
null, null,
COORDINATOR_CLIENT, COORDINATOR_CLIENT,
segmentLoaderFactory, segmentLoaderFactory,
RETRY_POLICY_FACTORY RETRY_POLICY_FACTORY,
IOConfig.DEFAULT_DROP_EXISTING
); );
} }
@ -1266,7 +1282,8 @@ public class CompactionTaskTest
null, null,
COORDINATOR_CLIENT, COORDINATOR_CLIENT,
segmentLoaderFactory, segmentLoaderFactory,
RETRY_POLICY_FACTORY RETRY_POLICY_FACTORY,
IOConfig.DEFAULT_DROP_EXISTING
); );
} }
@ -1300,7 +1317,8 @@ public class CompactionTaskTest
new ClientCompactionTaskGranularitySpec(new PeriodGranularity(Period.months(3), null, null), null), new ClientCompactionTaskGranularitySpec(new PeriodGranularity(Period.months(3), null, null), null),
COORDINATOR_CLIENT, COORDINATOR_CLIENT,
segmentLoaderFactory, segmentLoaderFactory,
RETRY_POLICY_FACTORY RETRY_POLICY_FACTORY,
IOConfig.DEFAULT_DROP_EXISTING
); );
final List<DimensionsSpec> expectedDimensionsSpec = ImmutableList.of( final List<DimensionsSpec> expectedDimensionsSpec = ImmutableList.of(
new DimensionsSpec(getDimensionSchema(new DoubleDimensionSchema("string_to_double"))) new DimensionsSpec(getDimensionSchema(new DoubleDimensionSchema("string_to_double")))
@ -1319,7 +1337,8 @@ public class CompactionTaskTest
AGGREGATORS, AGGREGATORS,
Collections.singletonList(COMPACTION_INTERVAL), Collections.singletonList(COMPACTION_INTERVAL),
new PeriodGranularity(Period.months(3), null, null), new PeriodGranularity(Period.months(3), null, null),
Granularities.NONE Granularities.NONE,
IOConfig.DEFAULT_DROP_EXISTING
); );
} }
@ -1336,7 +1355,8 @@ public class CompactionTaskTest
new ClientCompactionTaskGranularitySpec(null, new PeriodGranularity(Period.months(3), null, null)), new ClientCompactionTaskGranularitySpec(null, new PeriodGranularity(Period.months(3), null, null)),
COORDINATOR_CLIENT, COORDINATOR_CLIENT,
segmentLoaderFactory, segmentLoaderFactory,
RETRY_POLICY_FACTORY RETRY_POLICY_FACTORY,
IOConfig.DEFAULT_DROP_EXISTING
); );
final List<DimensionsSpec> expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration(); final List<DimensionsSpec> expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration();
@ -1353,7 +1373,8 @@ public class CompactionTaskTest
AGGREGATORS, AGGREGATORS,
SEGMENT_INTERVALS, SEGMENT_INTERVALS,
Granularities.MONTH, Granularities.MONTH,
new PeriodGranularity(Period.months(3), null, null) new PeriodGranularity(Period.months(3), null, null),
IOConfig.DEFAULT_DROP_EXISTING
); );
} }
@ -1373,7 +1394,8 @@ public class CompactionTaskTest
), ),
COORDINATOR_CLIENT, COORDINATOR_CLIENT,
segmentLoaderFactory, segmentLoaderFactory,
RETRY_POLICY_FACTORY RETRY_POLICY_FACTORY,
IOConfig.DEFAULT_DROP_EXISTING
); );
final List<DimensionsSpec> expectedDimensionsSpec = ImmutableList.of( final List<DimensionsSpec> expectedDimensionsSpec = ImmutableList.of(
new DimensionsSpec(getDimensionSchema(new DoubleDimensionSchema("string_to_double"))) new DimensionsSpec(getDimensionSchema(new DoubleDimensionSchema("string_to_double")))
@ -1392,7 +1414,8 @@ public class CompactionTaskTest
AGGREGATORS, AGGREGATORS,
Collections.singletonList(COMPACTION_INTERVAL), Collections.singletonList(COMPACTION_INTERVAL),
new PeriodGranularity(Period.months(3), null, null), new PeriodGranularity(Period.months(3), null, null),
new PeriodGranularity(Period.months(3), null, null) new PeriodGranularity(Period.months(3), null, null),
IOConfig.DEFAULT_DROP_EXISTING
); );
} }
@ -1409,7 +1432,8 @@ public class CompactionTaskTest
null, null,
COORDINATOR_CLIENT, COORDINATOR_CLIENT,
segmentLoaderFactory, segmentLoaderFactory,
RETRY_POLICY_FACTORY RETRY_POLICY_FACTORY,
IOConfig.DEFAULT_DROP_EXISTING
); );
final List<DimensionsSpec> expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration(); final List<DimensionsSpec> expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration();
@ -1426,7 +1450,8 @@ public class CompactionTaskTest
AGGREGATORS, AGGREGATORS,
SEGMENT_INTERVALS, SEGMENT_INTERVALS,
Granularities.MONTH, Granularities.MONTH,
Granularities.NONE Granularities.NONE,
IOConfig.DEFAULT_DROP_EXISTING
); );
} }
@ -1444,7 +1469,8 @@ public class CompactionTaskTest
new ClientCompactionTaskGranularitySpec(null, null), new ClientCompactionTaskGranularitySpec(null, null),
COORDINATOR_CLIENT, COORDINATOR_CLIENT,
segmentLoaderFactory, segmentLoaderFactory,
RETRY_POLICY_FACTORY RETRY_POLICY_FACTORY,
IOConfig.DEFAULT_DROP_EXISTING
); );
final List<DimensionsSpec> expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration(); final List<DimensionsSpec> expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration();
@ -1461,7 +1487,8 @@ public class CompactionTaskTest
AGGREGATORS, AGGREGATORS,
SEGMENT_INTERVALS, SEGMENT_INTERVALS,
Granularities.MONTH, Granularities.MONTH,
Granularities.NONE Granularities.NONE,
IOConfig.DEFAULT_DROP_EXISTING
); );
} }
@ -1569,7 +1596,8 @@ public class CompactionTaskTest
List<AggregatorFactory> expectedMetricsSpec, List<AggregatorFactory> expectedMetricsSpec,
List<Interval> expectedSegmentIntervals, List<Interval> expectedSegmentIntervals,
Granularity expectedSegmentGranularity, Granularity expectedSegmentGranularity,
Granularity expectedQueryGranularity Granularity expectedQueryGranularity,
boolean expectedDropExisting
) )
{ {
assertIngestionSchema( assertIngestionSchema(
@ -1615,7 +1643,8 @@ public class CompactionTaskTest
null null
), ),
expectedSegmentGranularity, expectedSegmentGranularity,
expectedQueryGranularity expectedQueryGranularity,
expectedDropExisting
); );
} }
@ -1626,7 +1655,8 @@ public class CompactionTaskTest
List<Interval> expectedSegmentIntervals, List<Interval> expectedSegmentIntervals,
CompactionTask.CompactionTuningConfig expectedTuningConfig, CompactionTask.CompactionTuningConfig expectedTuningConfig,
Granularity expectedSegmentGranularity, Granularity expectedSegmentGranularity,
Granularity expectedQueryGranularity Granularity expectedQueryGranularity,
boolean expectedDropExisting
) )
{ {
Preconditions.checkArgument( Preconditions.checkArgument(
@ -1673,6 +1703,7 @@ public class CompactionTaskTest
// assert ioConfig // assert ioConfig
final ParallelIndexIOConfig ioConfig = ingestionSchema.getIOConfig(); final ParallelIndexIOConfig ioConfig = ingestionSchema.getIOConfig();
Assert.assertFalse(ioConfig.isAppendToExisting()); Assert.assertFalse(ioConfig.isAppendToExisting());
Assert.assertEquals(expectedDropExisting, ioConfig.isDropExisting());
final InputSource inputSource = ioConfig.getInputSource(); final InputSource inputSource = ioConfig.getInputSource();
Assert.assertTrue(inputSource instanceof DruidInputSource); Assert.assertTrue(inputSource instanceof DruidInputSource);
final DruidInputSource druidInputSource = (DruidInputSource) inputSource; final DruidInputSource druidInputSource = (DruidInputSource) inputSource;

View File

@ -35,6 +35,7 @@ import org.apache.druid.server.coordinator.AutoCompactionSnapshot;
import org.apache.druid.server.coordinator.CoordinatorCompactionConfig; import org.apache.druid.server.coordinator.CoordinatorCompactionConfig;
import org.apache.druid.server.coordinator.DataSourceCompactionConfig; import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
import org.apache.druid.server.coordinator.UserCompactionTaskGranularityConfig; import org.apache.druid.server.coordinator.UserCompactionTaskGranularityConfig;
import org.apache.druid.server.coordinator.UserCompactionTaskIOConfig;
import org.apache.druid.server.coordinator.UserCompactionTaskQueryTuningConfig; import org.apache.druid.server.coordinator.UserCompactionTaskQueryTuningConfig;
import org.apache.druid.testing.IntegrationTestingConfig; import org.apache.druid.testing.IntegrationTestingConfig;
import org.apache.druid.testing.clients.CompactionResourceTestClient; import org.apache.druid.testing.clients.CompactionResourceTestClient;
@ -168,7 +169,7 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
LOG.info("Auto compaction test with hash partitioning"); LOG.info("Auto compaction test with hash partitioning");
final HashedPartitionsSpec hashedPartitionsSpec = new HashedPartitionsSpec(null, 3, null); final HashedPartitionsSpec hashedPartitionsSpec = new HashedPartitionsSpec(null, 3, null);
submitCompactionConfig(hashedPartitionsSpec, NO_SKIP_OFFSET, 1, null); submitCompactionConfig(hashedPartitionsSpec, NO_SKIP_OFFSET, 1, null, false);
// 2 segments published per day after compaction. // 2 segments published per day after compaction.
forceTriggerAutoCompaction(4); forceTriggerAutoCompaction(4);
verifyQuery(INDEX_QUERIES_RESOURCE); verifyQuery(INDEX_QUERIES_RESOURCE);
@ -183,7 +184,7 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
"city", "city",
false false
); );
submitCompactionConfig(rangePartitionsSpec, NO_SKIP_OFFSET, 1, null); submitCompactionConfig(rangePartitionsSpec, NO_SKIP_OFFSET, 1, null, false);
forceTriggerAutoCompaction(2); forceTriggerAutoCompaction(2);
verifyQuery(INDEX_QUERIES_RESOURCE); verifyQuery(INDEX_QUERIES_RESOURCE);
verifySegmentsCompacted(rangePartitionsSpec, 2); verifySegmentsCompacted(rangePartitionsSpec, 2);
@ -287,7 +288,7 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
} }
@Test @Test
public void testAutoCompactionDutyWithSegmentGranularity() throws Exception public void testAutoCompactionDutyWithSegmentGranularityAndWithDropExistingTrue() throws Exception
{ {
loadData(INDEX_TASK); loadData(INDEX_TASK);
try (final Closeable ignored = unloader(fullDatasourceName)) { try (final Closeable ignored = unloader(fullDatasourceName)) {
@ -298,7 +299,8 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
verifyQuery(INDEX_QUERIES_RESOURCE); verifyQuery(INDEX_QUERIES_RESOURCE);
Granularity newGranularity = Granularities.YEAR; Granularity newGranularity = Granularities.YEAR;
submitCompactionConfig(1000, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity, null)); // Set dropExisting to true
submitCompactionConfig(1000, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity, null), true);
LOG.info("Auto compaction test with YEAR segment granularity"); LOG.info("Auto compaction test with YEAR segment granularity");
@ -314,10 +316,12 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
checkCompactionIntervals(expectedIntervalAfterCompaction); checkCompactionIntervals(expectedIntervalAfterCompaction);
newGranularity = Granularities.DAY; newGranularity = Granularities.DAY;
submitCompactionConfig(1000, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity, null)); // Set dropExisting to true
submitCompactionConfig(1000, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity, null), true);
LOG.info("Auto compaction test with DAY segment granularity"); LOG.info("Auto compaction test with DAY segment granularity");
// Since dropExisting is set to true...
// The earlier segment with YEAR granularity will be dropped post-compaction // The earlier segment with YEAR granularity will be dropped post-compaction
// Hence, we will only have 2013-08-31 to 2013-09-01 and 2013-09-01 to 2013-09-02. // Hence, we will only have 2013-08-31 to 2013-09-01 and 2013-09-01 to 2013-09-02.
expectedIntervalAfterCompaction = new ArrayList<>(); expectedIntervalAfterCompaction = new ArrayList<>();
@ -333,6 +337,58 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
} }
} }
@Test
public void testAutoCompactionDutyWithSegmentGranularityAndWithDropExistingFalse() throws Exception
{
loadData(INDEX_TASK);
try (final Closeable ignored = unloader(fullDatasourceName)) {
final List<String> intervalsBeforeCompaction = coordinator.getSegmentIntervals(fullDatasourceName);
intervalsBeforeCompaction.sort(null);
// 4 segments across 2 days (4 total)...
verifySegmentsCount(4);
verifyQuery(INDEX_QUERIES_RESOURCE);
Granularity newGranularity = Granularities.YEAR;
// Set dropExisting to false
submitCompactionConfig(1000, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity, null), false);
LOG.info("Auto compaction test with YEAR segment granularity");
List<String> expectedIntervalAfterCompaction = new ArrayList<>();
for (String interval : intervalsBeforeCompaction) {
for (Interval newinterval : newGranularity.getIterable(new Interval(interval, ISOChronology.getInstanceUTC()))) {
expectedIntervalAfterCompaction.add(newinterval.toString());
}
}
forceTriggerAutoCompaction(1);
verifyQuery(INDEX_QUERIES_RESOURCE);
verifySegmentsCompacted(1, 1000);
checkCompactionIntervals(expectedIntervalAfterCompaction);
newGranularity = Granularities.DAY;
// Set dropExisting to false
submitCompactionConfig(1000, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity, null), false);
LOG.info("Auto compaction test with DAY segment granularity");
// Since dropExisting is set to false...
// The earlier segment with YEAR granularity is still 'used' as its not fully overshaowed.
// This is because we only have newer version on 2013-08-31 to 2013-09-01 and 2013-09-01 to 2013-09-02.
// The version for the YEAR segment is still the latest for 2013-01-01 to 2013-08-31 and 2013-09-02 to 2014-01-01.
// Hence, all three segments are available and the expected intervals are combined from the DAY and YEAR segment granularities
// (which are 2013-08-31 to 2013-09-01, 2013-09-01 to 2013-09-02 and 2013-01-01 to 2014-01-01)
for (String interval : intervalsBeforeCompaction) {
for (Interval newinterval : newGranularity.getIterable(new Interval(interval, ISOChronology.getInstanceUTC()))) {
expectedIntervalAfterCompaction.add(newinterval.toString());
}
}
forceTriggerAutoCompaction(3);
verifyQuery(INDEX_QUERIES_RESOURCE);
verifySegmentsCompacted(3, 1000);
checkCompactionIntervals(expectedIntervalAfterCompaction);
}
}
@Test @Test
public void testAutoCompactionDutyWithSegmentGranularityAndMixedVersion() throws Exception public void testAutoCompactionDutyWithSegmentGranularityAndMixedVersion() throws Exception
{ {
@ -437,7 +493,7 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
} }
@Test @Test
public void testAutoCompactionDutyWithSegmentGranularityAndSmallerSegmentGranularityCoveringMultipleSegmentsInTimeline() throws Exception public void testAutoCompactionDutyWithSegmentGranularityAndSmallerSegmentGranularityCoveringMultipleSegmentsInTimelineAndDropExistingTrue() throws Exception
{ {
loadData(INDEX_TASK); loadData(INDEX_TASK);
try (final Closeable ignored = unloader(fullDatasourceName)) { try (final Closeable ignored = unloader(fullDatasourceName)) {
@ -448,7 +504,8 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
verifyQuery(INDEX_QUERIES_RESOURCE); verifyQuery(INDEX_QUERIES_RESOURCE);
Granularity newGranularity = Granularities.YEAR; Granularity newGranularity = Granularities.YEAR;
submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity, null)); // Set dropExisting to true
submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity, null), true);
List<String> expectedIntervalAfterCompaction = new ArrayList<>(); List<String> expectedIntervalAfterCompaction = new ArrayList<>();
// We wil have one segment with interval of 2013-01-01/2014-01-01 (compacted with YEAR) // We wil have one segment with interval of 2013-01-01/2014-01-01 (compacted with YEAR)
@ -473,7 +530,9 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
checkCompactionIntervals(expectedIntervalAfterCompaction); checkCompactionIntervals(expectedIntervalAfterCompaction);
newGranularity = Granularities.MONTH; newGranularity = Granularities.MONTH;
submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity, null)); // Set dropExisting to true
submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity, null), true);
// Since dropExisting is set to true...
// This will submit a single compaction task for interval of 2013-01-01/2014-01-01 with MONTH granularity // This will submit a single compaction task for interval of 2013-01-01/2014-01-01 with MONTH granularity
expectedIntervalAfterCompaction = new ArrayList<>(); expectedIntervalAfterCompaction = new ArrayList<>();
// The previous segment with interval of 2013-01-01/2014-01-01 (compacted with YEAR) will be dropped // The previous segment with interval of 2013-01-01/2014-01-01 (compacted with YEAR) will be dropped
@ -491,6 +550,71 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
} }
} }
@Test
public void testAutoCompactionDutyWithSegmentGranularityAndSmallerSegmentGranularityCoveringMultipleSegmentsInTimelineAndDropExistingFalse() throws Exception
{
loadData(INDEX_TASK);
try (final Closeable ignored = unloader(fullDatasourceName)) {
final List<String> intervalsBeforeCompaction = coordinator.getSegmentIntervals(fullDatasourceName);
intervalsBeforeCompaction.sort(null);
// 4 segments across 2 days (4 total)...
verifySegmentsCount(4);
verifyQuery(INDEX_QUERIES_RESOURCE);
Granularity newGranularity = Granularities.YEAR;
// Set dropExisting to false
submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity, null), false);
List<String> expectedIntervalAfterCompaction = new ArrayList<>();
// We wil have one segment with interval of 2013-01-01/2014-01-01 (compacted with YEAR)
for (String interval : intervalsBeforeCompaction) {
for (Interval newinterval : newGranularity.getIterable(new Interval(interval, ISOChronology.getInstanceUTC()))) {
expectedIntervalAfterCompaction.add(newinterval.toString());
}
}
forceTriggerAutoCompaction(1);
verifyQuery(INDEX_QUERIES_RESOURCE);
verifySegmentsCompacted(1, MAX_ROWS_PER_SEGMENT_COMPACTED);
checkCompactionIntervals(expectedIntervalAfterCompaction);
loadData(INDEX_TASK);
verifySegmentsCount(5);
verifyQuery(INDEX_QUERIES_RESOURCE);
// 5 segments. 1 compacted YEAR segment and 4 newly ingested DAY segments across 2 days
// We wil have one segment with interval of 2013-01-01/2014-01-01 (compacted with YEAR) from the compaction earlier
// two segments with interval of 2013-08-31/2013-09-01 (newly ingested with DAY)
// and two segments with interval of 2013-09-01/2013-09-02 (newly ingested with DAY)
expectedIntervalAfterCompaction.addAll(intervalsBeforeCompaction);
checkCompactionIntervals(expectedIntervalAfterCompaction);
newGranularity = Granularities.MONTH;
// Set dropExisting to false
submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity, null), false);
// Since dropExisting is set to true...
// This will submit a single compaction task for interval of 2013-01-01/2014-01-01 with MONTH granularity
expectedIntervalAfterCompaction = new ArrayList<>();
// Since dropExisting is set to false...
// We wil have one segment with interval of 2013-01-01/2014-01-01 (compacted with YEAR) from before the compaction
for (String interval : intervalsBeforeCompaction) {
for (Interval newinterval : Granularities.YEAR.getIterable(new Interval(interval, ISOChronology.getInstanceUTC()))) {
expectedIntervalAfterCompaction.add(newinterval.toString());
}
}
// one segments with interval of 2013-09-01/2013-10-01 (compacted with MONTH)
// and one segments with interval of 2013-10-01/2013-11-01 (compacted with MONTH)
for (String interval : intervalsBeforeCompaction) {
for (Interval newinterval : Granularities.MONTH.getIterable(new Interval(interval, ISOChronology.getInstanceUTC()))) {
expectedIntervalAfterCompaction.add(newinterval.toString());
}
}
forceTriggerAutoCompaction(3);
verifyQuery(INDEX_QUERIES_RESOURCE);
verifySegmentsCompacted(3, MAX_ROWS_PER_SEGMENT_COMPACTED);
checkCompactionIntervals(expectedIntervalAfterCompaction);
}
}
private void loadData(String indexTask) throws Exception private void loadData(String indexTask) throws Exception
{ {
String taskSpec = getResourceAsString(indexTask); String taskSpec = getResourceAsString(indexTask);
@ -537,14 +661,20 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
private void submitCompactionConfig(Integer maxRowsPerSegment, Period skipOffsetFromLatest, UserCompactionTaskGranularityConfig granularitySpec) throws Exception private void submitCompactionConfig(Integer maxRowsPerSegment, Period skipOffsetFromLatest, UserCompactionTaskGranularityConfig granularitySpec) throws Exception
{ {
submitCompactionConfig(new DynamicPartitionsSpec(maxRowsPerSegment, null), skipOffsetFromLatest, 1, granularitySpec); submitCompactionConfig(maxRowsPerSegment, skipOffsetFromLatest, granularitySpec, false);
}
private void submitCompactionConfig(Integer maxRowsPerSegment, Period skipOffsetFromLatest, UserCompactionTaskGranularityConfig granularitySpec, boolean dropExisting) throws Exception
{
submitCompactionConfig(new DynamicPartitionsSpec(maxRowsPerSegment, null), skipOffsetFromLatest, 1, granularitySpec, dropExisting);
} }
private void submitCompactionConfig( private void submitCompactionConfig(
PartitionsSpec partitionsSpec, PartitionsSpec partitionsSpec,
Period skipOffsetFromLatest, Period skipOffsetFromLatest,
int maxNumConcurrentSubTasks, int maxNumConcurrentSubTasks,
UserCompactionTaskGranularityConfig granularitySpec UserCompactionTaskGranularityConfig granularitySpec,
boolean dropExisting
) throws Exception ) throws Exception
{ {
DataSourceCompactionConfig compactionConfig = new DataSourceCompactionConfig( DataSourceCompactionConfig compactionConfig = new DataSourceCompactionConfig(
@ -573,6 +703,7 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
1 1
), ),
granularitySpec, granularitySpec,
!dropExisting ? null : new UserCompactionTaskIOConfig(true),
null null
); );
compactionResource.submitCompactionConfig(compactionConfig); compactionResource.submitCompactionConfig(compactionConfig);

View File

@ -21,7 +21,9 @@ package org.apache.druid.client.indexing;
import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.segment.indexing.IOConfig;
import javax.annotation.Nullable;
import java.util.Objects; import java.util.Objects;
/** /**
@ -34,11 +36,16 @@ public class ClientCompactionIOConfig
private static final String TYPE = "compact"; private static final String TYPE = "compact";
private final ClientCompactionIntervalSpec inputSpec; private final ClientCompactionIntervalSpec inputSpec;
private final boolean dropExisting;
@JsonCreator @JsonCreator
public ClientCompactionIOConfig(@JsonProperty("inputSpec") ClientCompactionIntervalSpec inputSpec) public ClientCompactionIOConfig(
@JsonProperty("inputSpec") ClientCompactionIntervalSpec inputSpec,
@JsonProperty("dropExisting") @Nullable Boolean dropExisting
)
{ {
this.inputSpec = inputSpec; this.inputSpec = inputSpec;
this.dropExisting = dropExisting == null ? IOConfig.DEFAULT_DROP_EXISTING : dropExisting;
} }
@JsonProperty @JsonProperty
@ -53,6 +60,12 @@ public class ClientCompactionIOConfig
return inputSpec; return inputSpec;
} }
@JsonProperty
public boolean isDropExisting()
{
return dropExisting;
}
@Override @Override
public boolean equals(Object o) public boolean equals(Object o)
{ {
@ -63,13 +76,14 @@ public class ClientCompactionIOConfig
return false; return false;
} }
ClientCompactionIOConfig that = (ClientCompactionIOConfig) o; ClientCompactionIOConfig that = (ClientCompactionIOConfig) o;
return Objects.equals(inputSpec, that.inputSpec); return dropExisting == that.dropExisting &&
Objects.equals(inputSpec, that.inputSpec);
} }
@Override @Override
public int hashCode() public int hashCode()
{ {
return Objects.hash(inputSpec); return Objects.hash(inputSpec, dropExisting);
} }
@Override @Override
@ -77,6 +91,7 @@ public class ClientCompactionIOConfig
{ {
return "ClientCompactionIOConfig{" + return "ClientCompactionIOConfig{" +
"inputSpec=" + inputSpec + "inputSpec=" + inputSpec +
", dropExisting=" + dropExisting +
'}'; '}';
} }
} }

View File

@ -80,6 +80,7 @@ public class HttpIndexingServiceClient implements IndexingServiceClient
int compactionTaskPriority, int compactionTaskPriority,
@Nullable ClientCompactionTaskQueryTuningConfig tuningConfig, @Nullable ClientCompactionTaskQueryTuningConfig tuningConfig,
@Nullable ClientCompactionTaskGranularitySpec granularitySpec, @Nullable ClientCompactionTaskGranularitySpec granularitySpec,
@Nullable Boolean dropExisting,
@Nullable Map<String, Object> context @Nullable Map<String, Object> context
) )
{ {
@ -98,7 +99,7 @@ public class HttpIndexingServiceClient implements IndexingServiceClient
final ClientTaskQuery taskQuery = new ClientCompactionTaskQuery( final ClientTaskQuery taskQuery = new ClientCompactionTaskQuery(
taskId, taskId,
dataSource, dataSource,
new ClientCompactionIOConfig(ClientCompactionIntervalSpec.fromSegments(segments)), new ClientCompactionIOConfig(ClientCompactionIntervalSpec.fromSegments(segments), dropExisting),
tuningConfig, tuningConfig,
granularitySpec, granularitySpec,
context context

View File

@ -41,6 +41,7 @@ public interface IndexingServiceClient
int compactionTaskPriority, int compactionTaskPriority,
@Nullable ClientCompactionTaskQueryTuningConfig tuningConfig, @Nullable ClientCompactionTaskQueryTuningConfig tuningConfig,
@Nullable ClientCompactionTaskGranularitySpec granularitySpec, @Nullable ClientCompactionTaskGranularitySpec granularitySpec,
@Nullable Boolean dropExisting,
@Nullable Map<String, Object> context @Nullable Map<String, Object> context
); );

View File

@ -30,4 +30,5 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo;
}) })
public interface IOConfig public interface IOConfig
{ {
boolean DEFAULT_DROP_EXISTING = false;
} }

View File

@ -47,6 +47,7 @@ public class DataSourceCompactionConfig
private final Period skipOffsetFromLatest; private final Period skipOffsetFromLatest;
private final UserCompactionTaskQueryTuningConfig tuningConfig; private final UserCompactionTaskQueryTuningConfig tuningConfig;
private final UserCompactionTaskGranularityConfig granularitySpec; private final UserCompactionTaskGranularityConfig granularitySpec;
private final UserCompactionTaskIOConfig ioConfig;
private final Map<String, Object> taskContext; private final Map<String, Object> taskContext;
@JsonCreator @JsonCreator
@ -58,6 +59,7 @@ public class DataSourceCompactionConfig
@JsonProperty("skipOffsetFromLatest") @Nullable Period skipOffsetFromLatest, @JsonProperty("skipOffsetFromLatest") @Nullable Period skipOffsetFromLatest,
@JsonProperty("tuningConfig") @Nullable UserCompactionTaskQueryTuningConfig tuningConfig, @JsonProperty("tuningConfig") @Nullable UserCompactionTaskQueryTuningConfig tuningConfig,
@JsonProperty("granularitySpec") @Nullable UserCompactionTaskGranularityConfig granularitySpec, @JsonProperty("granularitySpec") @Nullable UserCompactionTaskGranularityConfig granularitySpec,
@JsonProperty("ioConfig") @Nullable UserCompactionTaskIOConfig ioConfig,
@JsonProperty("taskContext") @Nullable Map<String, Object> taskContext @JsonProperty("taskContext") @Nullable Map<String, Object> taskContext
) )
{ {
@ -71,6 +73,7 @@ public class DataSourceCompactionConfig
this.maxRowsPerSegment = maxRowsPerSegment; this.maxRowsPerSegment = maxRowsPerSegment;
this.skipOffsetFromLatest = skipOffsetFromLatest == null ? DEFAULT_SKIP_OFFSET_FROM_LATEST : skipOffsetFromLatest; this.skipOffsetFromLatest = skipOffsetFromLatest == null ? DEFAULT_SKIP_OFFSET_FROM_LATEST : skipOffsetFromLatest;
this.tuningConfig = tuningConfig; this.tuningConfig = tuningConfig;
this.ioConfig = ioConfig;
if (granularitySpec != null) { if (granularitySpec != null) {
Preconditions.checkArgument( Preconditions.checkArgument(
granularitySpec.getQueryGranularity() == null, granularitySpec.getQueryGranularity() == null,
@ -119,6 +122,13 @@ public class DataSourceCompactionConfig
return tuningConfig; return tuningConfig;
} }
@JsonProperty
@Nullable
public UserCompactionTaskIOConfig getIoConfig()
{
return ioConfig;
}
@JsonProperty @JsonProperty
@Nullable @Nullable
public UserCompactionTaskGranularityConfig getGranularitySpec() public UserCompactionTaskGranularityConfig getGranularitySpec()
@ -150,6 +160,7 @@ public class DataSourceCompactionConfig
Objects.equals(skipOffsetFromLatest, that.skipOffsetFromLatest) && Objects.equals(skipOffsetFromLatest, that.skipOffsetFromLatest) &&
Objects.equals(tuningConfig, that.tuningConfig) && Objects.equals(tuningConfig, that.tuningConfig) &&
Objects.equals(granularitySpec, that.granularitySpec) && Objects.equals(granularitySpec, that.granularitySpec) &&
Objects.equals(ioConfig, that.ioConfig) &&
Objects.equals(taskContext, that.taskContext); Objects.equals(taskContext, that.taskContext);
} }
@ -164,6 +175,7 @@ public class DataSourceCompactionConfig
skipOffsetFromLatest, skipOffsetFromLatest,
tuningConfig, tuningConfig,
granularitySpec, granularitySpec,
ioConfig,
taskContext taskContext
); );
} }

View File

@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.server.coordinator;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.segment.indexing.IOConfig;
import javax.annotation.Nullable;
import java.util.Objects;
/**
* Spec containing IO configs for Auto Compaction.
* This class mimics JSON field names for fields supported in auto compaction with
* the corresponding fields in {@link IOConfig}.
* This is done for end-user ease of use. Basically, end-user will use the same syntax / JSON structure to set
* IO configs for Auto Compaction as they would for any other ingestion task.
* Note that this class simply holds IO configs and pass it to compaction task spec.
*/
public class UserCompactionTaskIOConfig
{
private final boolean dropExisting;
@JsonCreator
public UserCompactionTaskIOConfig(
@JsonProperty("dropExisting") @Nullable Boolean dropExisting
)
{
this.dropExisting = dropExisting == null ? IOConfig.DEFAULT_DROP_EXISTING : dropExisting;
}
@JsonProperty
public boolean isDropExisting()
{
return dropExisting;
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
UserCompactionTaskIOConfig that = (UserCompactionTaskIOConfig) o;
return dropExisting == that.dropExisting;
}
@Override
public int hashCode()
{
return Objects.hash(dropExisting);
}
@Override
public String toString()
{
return "UserCompactionTaskIOConfig{" +
"dropExisting=" + dropExisting +
'}';
}
}

View File

@ -320,6 +320,11 @@ public class CompactSegments implements CoordinatorDuty
queryGranularitySpec = null; queryGranularitySpec = null;
} }
Boolean dropExisting = null;
if (config.getIoConfig() != null) {
dropExisting = config.getIoConfig().isDropExisting();
}
// make tuningConfig // make tuningConfig
final String taskId = indexingServiceClient.compactSegments( final String taskId = indexingServiceClient.compactSegments(
"coordinator-issued", "coordinator-issued",
@ -327,6 +332,7 @@ public class CompactSegments implements CoordinatorDuty
config.getTaskPriority(), config.getTaskPriority(),
ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment()), ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment()),
queryGranularitySpec, queryGranularitySpec,
dropExisting,
newAutoCompactionContext(config.getTaskContext()) newAutoCompactionContext(config.getTaskContext())
); );

View File

@ -51,6 +51,7 @@ public class NoopIndexingServiceClient implements IndexingServiceClient
int compactionTaskPriority, int compactionTaskPriority,
@Nullable ClientCompactionTaskQueryTuningConfig tuningConfig, @Nullable ClientCompactionTaskQueryTuningConfig tuningConfig,
@Nullable ClientCompactionTaskGranularitySpec granularitySpec, @Nullable ClientCompactionTaskGranularitySpec granularitySpec,
@Nullable Boolean dropExisting,
@Nullable Map<String, Object> context @Nullable Map<String, Object> context
) )
{ {

View File

@ -58,6 +58,7 @@ public class DataSourceCompactionConfigTest
new Period(3600), new Period(3600),
null, null,
null, null,
null,
ImmutableMap.of("key", "val") ImmutableMap.of("key", "val")
); );
final String json = OBJECT_MAPPER.writeValueAsString(config); final String json = OBJECT_MAPPER.writeValueAsString(config);
@ -84,6 +85,7 @@ public class DataSourceCompactionConfigTest
new Period(3600), new Period(3600),
null, null,
null, null,
null,
ImmutableMap.of("key", "val") ImmutableMap.of("key", "val")
); );
final String json = OBJECT_MAPPER.writeValueAsString(config); final String json = OBJECT_MAPPER.writeValueAsString(config);
@ -127,6 +129,7 @@ public class DataSourceCompactionConfigTest
null null
), ),
null, null,
null,
ImmutableMap.of("key", "val") ImmutableMap.of("key", "val")
); );
final String json = OBJECT_MAPPER.writeValueAsString(config); final String json = OBJECT_MAPPER.writeValueAsString(config);
@ -170,6 +173,7 @@ public class DataSourceCompactionConfigTest
null null
), ),
null, null,
null,
ImmutableMap.of("key", "val") ImmutableMap.of("key", "val")
); );
@ -235,6 +239,7 @@ public class DataSourceCompactionConfigTest
new Period(3600), new Period(3600),
null, null,
new UserCompactionTaskGranularityConfig(Granularities.HOUR, null), new UserCompactionTaskGranularityConfig(Granularities.HOUR, null),
null,
ImmutableMap.of("key", "val") ImmutableMap.of("key", "val")
); );
final String json = OBJECT_MAPPER.writeValueAsString(config); final String json = OBJECT_MAPPER.writeValueAsString(config);
@ -261,6 +266,7 @@ public class DataSourceCompactionConfigTest
new Period(3600), new Period(3600),
null, null,
new UserCompactionTaskGranularityConfig(Granularities.HOUR, Granularities.MONTH), new UserCompactionTaskGranularityConfig(Granularities.HOUR, Granularities.MONTH),
null,
ImmutableMap.of("key", "val") ImmutableMap.of("key", "val")
); );
} }
@ -276,6 +282,7 @@ public class DataSourceCompactionConfigTest
new Period(3600), new Period(3600),
null, null,
null, null,
null,
ImmutableMap.of("key", "val") ImmutableMap.of("key", "val")
); );
final String json = OBJECT_MAPPER.writeValueAsString(config); final String json = OBJECT_MAPPER.writeValueAsString(config);
@ -302,6 +309,7 @@ public class DataSourceCompactionConfigTest
new Period(3600), new Period(3600),
null, null,
new UserCompactionTaskGranularityConfig(null, null), new UserCompactionTaskGranularityConfig(null, null),
null,
ImmutableMap.of("key", "val") ImmutableMap.of("key", "val")
); );
final String json = OBJECT_MAPPER.writeValueAsString(config); final String json = OBJECT_MAPPER.writeValueAsString(config);
@ -316,4 +324,60 @@ public class DataSourceCompactionConfigTest
Assert.assertEquals(config.getTaskContext(), fromJson.getTaskContext()); Assert.assertEquals(config.getTaskContext(), fromJson.getTaskContext());
Assert.assertEquals(config.getGranularitySpec(), fromJson.getGranularitySpec()); Assert.assertEquals(config.getGranularitySpec(), fromJson.getGranularitySpec());
} }
@Test
public void testSerdeIOConfigWithNonNullDropExisting() throws IOException
{
final DataSourceCompactionConfig config = new DataSourceCompactionConfig(
"dataSource",
null,
500L,
null,
new Period(3600),
null,
new UserCompactionTaskGranularityConfig(Granularities.HOUR, null),
new UserCompactionTaskIOConfig(true),
ImmutableMap.of("key", "val")
);
final String json = OBJECT_MAPPER.writeValueAsString(config);
final DataSourceCompactionConfig fromJson = OBJECT_MAPPER.readValue(json, DataSourceCompactionConfig.class);
Assert.assertEquals(config.getDataSource(), fromJson.getDataSource());
Assert.assertEquals(25, fromJson.getTaskPriority());
Assert.assertEquals(config.getInputSegmentSizeBytes(), fromJson.getInputSegmentSizeBytes());
Assert.assertEquals(config.getMaxRowsPerSegment(), fromJson.getMaxRowsPerSegment());
Assert.assertEquals(config.getSkipOffsetFromLatest(), fromJson.getSkipOffsetFromLatest());
Assert.assertEquals(config.getTuningConfig(), fromJson.getTuningConfig());
Assert.assertEquals(config.getTaskContext(), fromJson.getTaskContext());
Assert.assertEquals(config.getGranularitySpec(), fromJson.getGranularitySpec());
Assert.assertEquals(config.getIoConfig(), fromJson.getIoConfig());
}
@Test
public void testSerdeIOConfigWithNullDropExisting() throws IOException
{
final DataSourceCompactionConfig config = new DataSourceCompactionConfig(
"dataSource",
null,
500L,
null,
new Period(3600),
null,
new UserCompactionTaskGranularityConfig(Granularities.HOUR, null),
new UserCompactionTaskIOConfig(null),
ImmutableMap.of("key", "val")
);
final String json = OBJECT_MAPPER.writeValueAsString(config);
final DataSourceCompactionConfig fromJson = OBJECT_MAPPER.readValue(json, DataSourceCompactionConfig.class);
Assert.assertEquals(config.getDataSource(), fromJson.getDataSource());
Assert.assertEquals(25, fromJson.getTaskPriority());
Assert.assertEquals(config.getInputSegmentSizeBytes(), fromJson.getInputSegmentSizeBytes());
Assert.assertEquals(config.getMaxRowsPerSegment(), fromJson.getMaxRowsPerSegment());
Assert.assertEquals(config.getSkipOffsetFromLatest(), fromJson.getSkipOffsetFromLatest());
Assert.assertEquals(config.getTuningConfig(), fromJson.getTuningConfig());
Assert.assertEquals(config.getTaskContext(), fromJson.getTaskContext());
Assert.assertEquals(config.getGranularitySpec(), fromJson.getGranularitySpec());
Assert.assertEquals(config.getIoConfig(), fromJson.getIoConfig());
}
} }

View File

@ -66,6 +66,7 @@ import org.apache.druid.server.coordinator.CoordinatorStats;
import org.apache.druid.server.coordinator.DataSourceCompactionConfig; import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams; import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
import org.apache.druid.server.coordinator.UserCompactionTaskGranularityConfig; import org.apache.druid.server.coordinator.UserCompactionTaskGranularityConfig;
import org.apache.druid.server.coordinator.UserCompactionTaskIOConfig;
import org.apache.druid.server.coordinator.UserCompactionTaskQueryTuningConfig; import org.apache.druid.server.coordinator.UserCompactionTaskQueryTuningConfig;
import org.apache.druid.timeline.CompactionState; import org.apache.druid.timeline.CompactionState;
import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.DataSegment;
@ -592,6 +593,7 @@ public class CompactSegmentsTest
null null
), ),
null, null,
null,
null null
) )
); );
@ -605,6 +607,7 @@ public class CompactSegmentsTest
ArgumentMatchers.anyInt(), ArgumentMatchers.anyInt(),
ArgumentMatchers.any(), ArgumentMatchers.any(),
granularitySpecArgumentCaptor.capture(), granularitySpecArgumentCaptor.capture(),
ArgumentMatchers.any(),
ArgumentMatchers.any() ArgumentMatchers.any()
); );
// Only the same amount of segments as the original PARTITION_PER_TIME_INTERVAL since segment granulartity is the same // Only the same amount of segments as the original PARTITION_PER_TIME_INTERVAL since segment granulartity is the same
@ -612,6 +615,110 @@ public class CompactSegmentsTest
Assert.assertNull(granularitySpecArgumentCaptor.getValue()); Assert.assertNull(granularitySpecArgumentCaptor.getValue());
} }
@Test
public void testCompactWithNotNullIOConfig()
{
final HttpIndexingServiceClient mockIndexingServiceClient = Mockito.mock(HttpIndexingServiceClient.class);
final CompactSegments compactSegments = new CompactSegments(JSON_MAPPER, mockIndexingServiceClient);
final List<DataSourceCompactionConfig> compactionConfigs = new ArrayList<>();
final String dataSource = DATA_SOURCE_PREFIX + 0;
compactionConfigs.add(
new DataSourceCompactionConfig(
dataSource,
0,
500L,
null,
new Period("PT0H"), // smaller than segment interval
new UserCompactionTaskQueryTuningConfig(
null,
null,
null,
null,
partitionsSpec,
null,
null,
null,
null,
null,
3,
null,
null,
null,
null,
null,
null
),
null,
new UserCompactionTaskIOConfig(true),
null
)
);
doCompactSegments(compactSegments, compactionConfigs);
ArgumentCaptor<Boolean> dropExistingCapture = ArgumentCaptor.forClass(Boolean.class);
Mockito.verify(mockIndexingServiceClient).compactSegments(
ArgumentMatchers.anyString(),
ArgumentMatchers.any(),
ArgumentMatchers.anyInt(),
ArgumentMatchers.any(),
ArgumentMatchers.any(),
dropExistingCapture.capture(),
ArgumentMatchers.any()
);
Assert.assertEquals(true, dropExistingCapture.getValue());
}
@Test
public void testCompactWithNullIOConfig()
{
final HttpIndexingServiceClient mockIndexingServiceClient = Mockito.mock(HttpIndexingServiceClient.class);
final CompactSegments compactSegments = new CompactSegments(JSON_MAPPER, mockIndexingServiceClient);
final List<DataSourceCompactionConfig> compactionConfigs = new ArrayList<>();
final String dataSource = DATA_SOURCE_PREFIX + 0;
compactionConfigs.add(
new DataSourceCompactionConfig(
dataSource,
0,
500L,
null,
new Period("PT0H"), // smaller than segment interval
new UserCompactionTaskQueryTuningConfig(
null,
null,
null,
null,
partitionsSpec,
null,
null,
null,
null,
null,
3,
null,
null,
null,
null,
null,
null
),
null,
null,
null
)
);
doCompactSegments(compactSegments, compactionConfigs);
ArgumentCaptor<Boolean> dropExistingCapture = ArgumentCaptor.forClass(Boolean.class);
Mockito.verify(mockIndexingServiceClient).compactSegments(
ArgumentMatchers.anyString(),
ArgumentMatchers.any(),
ArgumentMatchers.anyInt(),
ArgumentMatchers.any(),
ArgumentMatchers.any(),
dropExistingCapture.capture(),
ArgumentMatchers.any()
);
Assert.assertNull(dropExistingCapture.getValue());
}
@Test @Test
public void testCompactWithGranularitySpec() public void testCompactWithGranularitySpec()
{ {
@ -646,6 +753,7 @@ public class CompactSegmentsTest
null null
), ),
new UserCompactionTaskGranularityConfig(Granularities.YEAR, null), new UserCompactionTaskGranularityConfig(Granularities.YEAR, null),
null,
null null
) )
); );
@ -659,6 +767,7 @@ public class CompactSegmentsTest
ArgumentMatchers.anyInt(), ArgumentMatchers.anyInt(),
ArgumentMatchers.any(), ArgumentMatchers.any(),
granularitySpecArgumentCaptor.capture(), granularitySpecArgumentCaptor.capture(),
ArgumentMatchers.any(),
ArgumentMatchers.any() ArgumentMatchers.any()
); );
// All segments is compact at the same time since we changed the segment granularity to YEAR and all segment // All segments is compact at the same time since we changed the segment granularity to YEAR and all segment
@ -698,7 +807,8 @@ public class CompactSegmentsTest
new ClientCompactionIntervalSpec( new ClientCompactionIntervalSpec(
Intervals.of("2000/2099"), Intervals.of("2000/2099"),
"testSha256OfSortedSegmentIds" "testSha256OfSortedSegmentIds"
) ),
null
), ),
null, null,
new ClientCompactionTaskGranularitySpec(Granularities.DAY, null), new ClientCompactionTaskGranularitySpec(Granularities.DAY, null),
@ -737,6 +847,7 @@ public class CompactSegmentsTest
null null
), ),
new UserCompactionTaskGranularityConfig(Granularities.YEAR, null), new UserCompactionTaskGranularityConfig(Granularities.YEAR, null),
null,
null null
) )
); );
@ -755,6 +866,7 @@ public class CompactSegmentsTest
ArgumentMatchers.anyInt(), ArgumentMatchers.anyInt(),
ArgumentMatchers.any(), ArgumentMatchers.any(),
granularitySpecArgumentCaptor.capture(), granularitySpecArgumentCaptor.capture(),
ArgumentMatchers.any(),
ArgumentMatchers.any() ArgumentMatchers.any()
); );
// All segments is compact at the same time since we changed the segment granularity to YEAR and all segment // All segments is compact at the same time since we changed the segment granularity to YEAR and all segment
@ -1059,6 +1171,7 @@ public class CompactSegmentsTest
null null
), ),
null, null,
null,
null null
) )
); );

View File

@ -92,6 +92,7 @@ public class NewestSegmentFirstIteratorTest
null, null,
null, null,
null, null,
null,
null null
); );
Assert.assertEquals( Assert.assertEquals(
@ -131,6 +132,7 @@ public class NewestSegmentFirstIteratorTest
null null
), ),
null, null,
null,
null null
); );
Assert.assertEquals( Assert.assertEquals(
@ -170,6 +172,7 @@ public class NewestSegmentFirstIteratorTest
null null
), ),
null, null,
null,
null null
); );
Assert.assertEquals( Assert.assertEquals(
@ -209,6 +212,7 @@ public class NewestSegmentFirstIteratorTest
null null
), ),
null, null,
null,
null null
); );
Assert.assertEquals( Assert.assertEquals(
@ -248,6 +252,7 @@ public class NewestSegmentFirstIteratorTest
null null
), ),
null, null,
null,
null null
); );
Assert.assertEquals( Assert.assertEquals(
@ -287,6 +292,7 @@ public class NewestSegmentFirstIteratorTest
null null
), ),
null, null,
null,
null null
); );
Assert.assertEquals( Assert.assertEquals(
@ -326,6 +332,7 @@ public class NewestSegmentFirstIteratorTest
null null
), ),
null, null,
null,
null null
); );
Assert.assertEquals( Assert.assertEquals(
@ -365,6 +372,7 @@ public class NewestSegmentFirstIteratorTest
null null
), ),
null, null,
null,
null null
); );
Assert.assertEquals( Assert.assertEquals(
@ -404,6 +412,7 @@ public class NewestSegmentFirstIteratorTest
null null
), ),
null, null,
null,
null null
); );
Assert.assertEquals( Assert.assertEquals(

View File

@ -1068,6 +1068,7 @@ public class NewestSegmentFirstPolicyTest
skipOffsetFromLatest, skipOffsetFromLatest,
null, null,
granularitySpec, granularitySpec,
null,
null null
); );
} }

View File

@ -1656,6 +1656,7 @@ HadoopIndexTasks
HttpEmitter HttpEmitter
HttpPostEmitter HttpPostEmitter
InetAddress.getLocalHost InetAddress.getLocalHost
IOConfig
JRE8u60 JRE8u60
KeyManager KeyManager
L1 L1