diff --git a/benchmarks/src/main/java/org/apache/druid/benchmark/query/CachingClusteredClientBenchmark.java b/benchmarks/src/main/java/org/apache/druid/benchmark/query/CachingClusteredClientBenchmark.java index 3ddd8245f8e..c581ccc2b12 100644 --- a/benchmarks/src/main/java/org/apache/druid/benchmark/query/CachingClusteredClientBenchmark.java +++ b/benchmarks/src/main/java/org/apache/druid/benchmark/query/CachingClusteredClientBenchmark.java @@ -103,6 +103,7 @@ import org.apache.druid.segment.QueryableIndex; import org.apache.druid.segment.QueryableIndexSegment; import org.apache.druid.server.coordination.ServerType; import org.apache.druid.timeline.DataSegment; +import org.apache.druid.timeline.DataSegment.PruneSpecsHolder; import org.apache.druid.timeline.SegmentId; import org.apache.druid.timeline.TimelineLookup; import org.apache.druid.timeline.VersionedIntervalTimeline; @@ -177,7 +178,7 @@ public class CachingClusteredClientBenchmark new Std() .addValue(ExprMacroTable.class.getName(), TestExprMacroTable.INSTANCE) .addValue(ObjectMapper.class.getName(), JSON_MAPPER) - .addValue(DataSegment.PruneLoadSpecHolder.class, DataSegment.PruneLoadSpecHolder.DEFAULT) + .addValue(PruneSpecsHolder.class, PruneSpecsHolder.DEFAULT) ); } diff --git a/benchmarks/src/main/java/org/apache/druid/server/coordinator/NewestSegmentFirstPolicyBenchmark.java b/benchmarks/src/main/java/org/apache/druid/server/coordinator/NewestSegmentFirstPolicyBenchmark.java index 40fa0c31a7b..af1f1fe57d6 100644 --- a/benchmarks/src/main/java/org/apache/druid/server/coordinator/NewestSegmentFirstPolicyBenchmark.java +++ b/benchmarks/src/main/java/org/apache/druid/server/coordinator/NewestSegmentFirstPolicyBenchmark.java @@ -22,6 +22,7 @@ package org.apache.druid.server.coordinator; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import org.apache.druid.client.DataSourcesSnapshot; +import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.server.coordinator.helper.CompactionSegmentIterator; import org.apache.druid.server.coordinator.helper.CompactionSegmentSearchPolicy; @@ -60,7 +61,7 @@ public class NewestSegmentFirstPolicyBenchmark { private static final String DATA_SOURCE_PREFIX = "dataSource_"; - private final CompactionSegmentSearchPolicy policy = new NewestSegmentFirstPolicy(); + private final CompactionSegmentSearchPolicy policy = new NewestSegmentFirstPolicy(new DefaultObjectMapper()); @Param("100") private int numDataSources; @@ -72,7 +73,7 @@ public class NewestSegmentFirstPolicyBenchmark private int numPartitionsPerDayInterval; @Param("800000000") - private long targetCompactionSizeBytes; + private long inputSegmentSizeBytes; @Param("1000000") private long segmentSizeBytes; @@ -94,8 +95,7 @@ public class NewestSegmentFirstPolicyBenchmark new DataSourceCompactionConfig( dataSource, 0, - targetCompactionSizeBytes, - targetCompactionSizeBytes, + inputSegmentSizeBytes, null, null, null, diff --git a/core/src/main/java/org/apache/druid/indexer/partitions/DynamicPartitionsSpec.java b/core/src/main/java/org/apache/druid/indexer/partitions/DynamicPartitionsSpec.java index 242c0918c67..141af59d1c6 100644 --- a/core/src/main/java/org/apache/druid/indexer/partitions/DynamicPartitionsSpec.java +++ b/core/src/main/java/org/apache/druid/indexer/partitions/DynamicPartitionsSpec.java @@ -30,11 +30,15 @@ import java.util.Objects; */ public class DynamicPartitionsSpec implements PartitionsSpec { + /** + * Default 
maxTotalRows for most task types except compaction task. + */ public static final long DEFAULT_MAX_TOTAL_ROWS = 20_000_000; static final String NAME = "dynamic"; private final int maxRowsPerSegment; - private final long maxTotalRows; + @Nullable + private final Long maxTotalRows; @JsonCreator public DynamicPartitionsSpec( @@ -45,7 +49,7 @@ public class DynamicPartitionsSpec implements PartitionsSpec this.maxRowsPerSegment = PartitionsSpec.isEffectivelyNull(maxRowsPerSegment) ? DEFAULT_MAX_ROWS_PER_SEGMENT : maxRowsPerSegment; - this.maxTotalRows = PartitionsSpec.isEffectivelyNull(maxTotalRows) ? DEFAULT_MAX_TOTAL_ROWS : maxTotalRows; + this.maxTotalRows = maxTotalRows; } @Override @@ -55,12 +59,22 @@ public class DynamicPartitionsSpec implements PartitionsSpec return maxRowsPerSegment; } + @Nullable @JsonProperty - public long getMaxTotalRows() + public Long getMaxTotalRows() { return maxTotalRows; } + /** + * Get the given maxTotalRows or the default. + * The default can be different depending on the caller. + */ + public long getMaxTotalRowsOr(long defaultMaxTotalRows) + { + return PartitionsSpec.isEffectivelyNull(maxTotalRows) ? defaultMaxTotalRows : maxTotalRows; + } + @Override public boolean needsDeterminePartitions(boolean useForHadoopTask) { @@ -78,7 +92,7 @@ public class DynamicPartitionsSpec implements PartitionsSpec } DynamicPartitionsSpec that = (DynamicPartitionsSpec) o; return maxRowsPerSegment == that.maxRowsPerSegment && - maxTotalRows == that.maxTotalRows; + Objects.equals(maxTotalRows, that.maxTotalRows); } @Override diff --git a/core/src/main/java/org/apache/druid/timeline/CompactionState.java b/core/src/main/java/org/apache/druid/timeline/CompactionState.java new file mode 100644 index 00000000000..c30f427c19c --- /dev/null +++ b/core/src/main/java/org/apache/druid/timeline/CompactionState.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.timeline; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.druid.indexer.partitions.PartitionsSpec; + +import java.util.Map; +import java.util.Objects; + +/** + * This class describes what compaction task spec was used to create a given segment. + * The compaction task is a task that reads Druid segments and overwrites them with new ones. Since this task always + * reads segments in the same order, the same task spec will always create the same set of segments + * (not same segment ID, but same content). + * + * Note that this class doesn't include all fields in the compaction task spec. Only the configurations that can + * affect the content of segment should be included. 
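A minimal sketch of how the new `DynamicPartitionsSpec#getMaxTotalRowsOr` and `CompactionState` fit together, assuming only the constructor shapes shown in the hunks above; the `indexSpec` map contents here are illustrative, not the real serialized form:

```java
// Sketch only: class names and constructors are taken from the DynamicPartitionsSpec and CompactionState hunks above.
import com.google.common.collect.ImmutableMap;
import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
import org.apache.druid.timeline.CompactionState;

import java.util.Map;

class CompactionStateSketch
{
  public static void main(String[] args)
  {
    // maxTotalRows is now nullable; each caller supplies its own default.
    final DynamicPartitionsSpec spec = new DynamicPartitionsSpec(5_000_000, null);
    System.out.println(spec.getMaxTotalRows());                 // null
    System.out.println(spec.getMaxTotalRowsOr(20_000_000L));    // 20000000, the default for most tasks
    System.out.println(spec.getMaxTotalRowsOr(Long.MAX_VALUE)); // the default used by the compaction task

    // Illustrative indexSpec map; the real map is derived from org.apache.druid.segment.IndexSpec,
    // which cannot be referenced directly from the 'core' module.
    final Map<String, Object> indexSpec = ImmutableMap.of("bitmap", ImmutableMap.of("type", "roaring"));

    // Segments written with the same spec carry equal CompactionState values,
    // which is how auto compaction can tell they do not need to be compacted again.
    final CompactionState a = new CompactionState(spec, indexSpec);
    final CompactionState b = new CompactionState(spec, indexSpec);
    System.out.println(a.equals(b)); // true
  }
}
```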
+ * + * @see DataSegment#lastCompactionState + */ +public class CompactionState +{ + private final PartitionsSpec partitionsSpec; + // org.apache.druid.segment.IndexSpec cannot be used here because it's in the 'processing' module which + // has a dependency on the 'core' module where this class is. + private final Map indexSpec; + + @JsonCreator + public CompactionState( + @JsonProperty("partitionsSpec") PartitionsSpec partitionsSpec, + @JsonProperty("indexSpec") Map indexSpec + ) + { + this.partitionsSpec = partitionsSpec; + this.indexSpec = indexSpec; + } + + @JsonProperty + public PartitionsSpec getPartitionsSpec() + { + return partitionsSpec; + } + + @JsonProperty + public Map getIndexSpec() + { + return indexSpec; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + CompactionState that = (CompactionState) o; + return Objects.equals(partitionsSpec, that.partitionsSpec) && + Objects.equals(indexSpec, that.indexSpec); + } + + @Override + public int hashCode() + { + return Objects.hash(partitionsSpec, indexSpec); + } + + @Override + public String toString() + { + return "CompactionState{" + + "partitionsSpec=" + partitionsSpec + + ", indexSpec=" + indexSpec + + '}'; + } +} diff --git a/core/src/main/java/org/apache/druid/timeline/DataSegment.java b/core/src/main/java/org/apache/druid/timeline/DataSegment.java index c427f6a04ea..6475dc49c3f 100644 --- a/core/src/main/java/org/apache/druid/timeline/DataSegment.java +++ b/core/src/main/java/org/apache/druid/timeline/DataSegment.java @@ -36,15 +36,12 @@ import it.unimi.dsi.fastutil.objects.Object2ObjectArrayMap; import org.apache.druid.guice.annotations.PublicApi; import org.apache.druid.jackson.CommaListJoinDeserializer; import org.apache.druid.jackson.CommaListJoinSerializer; -import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.query.SegmentDescriptor; import org.apache.druid.timeline.partition.NumberedShardSpec; import org.apache.druid.timeline.partition.ShardSpec; -import org.joda.time.DateTime; import org.joda.time.Interval; import javax.annotation.Nullable; -import java.util.Comparator; import java.util.List; import java.util.Map; import java.util.stream.Collectors; @@ -64,21 +61,23 @@ public class DataSegment implements Comparable, Overshadowable STRING_INTERNER = Interners.newWeakInterner(); private static final Interner> DIMENSIONS_INTERNER = Interners.newWeakInterner(); private static final Interner> METRICS_INTERNER = Interners.newWeakInterner(); + private static final Interner COMPACTION_STATE_INTERNER = Interners.newWeakInterner(); private static final Map PRUNED_LOAD_SPEC = ImmutableMap.of( "load spec is pruned, because it's not needed on Brokers, but eats a lot of heap space", "" @@ -91,6 +90,16 @@ public class DataSegment implements Comparable, Overshadowable dimensions; private final List metrics; private final ShardSpec shardSpec; + + /** + * Stores some configurations of the compaction task which created this segment. + * This field is filled in the metadata store only when "storeCompactionState" is set true in the context of the + * compaction task which is false by default. + * Also, this field can be pruned in many Druid modules when this class is loaded from the metadata store. + * See {@link PruneLastCompactionState} for details. 
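As a quick illustration of the renamed holder, a sketch of the `InjectableValues` wiring used throughout the updated tests, assuming `PruneSpecsHolder.DEFAULT` keeps the non-pruning behavior that `PruneLoadSpecHolder.DEFAULT` had:

```java
// Sketch of the ObjectMapper wiring the updated tests use for the renamed holder.
import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.timeline.DataSegment.PruneSpecsHolder;

class PruneSpecsHolderSketch
{
  static ObjectMapper segmentMapper()
  {
    final ObjectMapper mapper = new DefaultObjectMapper();
    // DEFAULT prunes nothing, so deserialized segments keep loadSpec and lastCompactionState.
    // A Broker-style mapper would instead inject a holder that prunes them to save heap.
    mapper.setInjectableValues(
        new InjectableValues.Std().addValue(PruneSpecsHolder.class, PruneSpecsHolder.DEFAULT)
    );
    return mapper;
  }
}
```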
+ */ + @Nullable + private final CompactionState lastCompactionState; private final long size; public DataSegment( @@ -99,6 +108,7 @@ public class DataSegment implements Comparable, Overshadowable dimensions, List metrics, ShardSpec shardSpec, + CompactionState lastCompactionState, Integer binaryVersion, long size ) @@ -111,6 +121,7 @@ public class DataSegment implements Comparable, Overshadowable, Overshadowable loadSpec, + List dimensions, + List metrics, + ShardSpec shardSpec, + CompactionState lastCompactionState, + Integer binaryVersion, + long size + ) + { + this( + dataSource, + interval, + version, + loadSpec, + dimensions, + metrics, + shardSpec, + lastCompactionState, binaryVersion, size, - PruneLoadSpecHolder.DEFAULT + PruneSpecsHolder.DEFAULT ); } @@ -158,18 +197,22 @@ public class DataSegment implements Comparable, Overshadowable metrics, @JsonProperty("shardSpec") @Nullable ShardSpec shardSpec, + @JsonProperty("lastCompactionState") @Nullable CompactionState lastCompactionState, @JsonProperty("binaryVersion") Integer binaryVersion, @JsonProperty("size") long size, - @JacksonInject PruneLoadSpecHolder pruneLoadSpecHolder + @JacksonInject PruneSpecsHolder pruneSpecsHolder ) { this.id = SegmentId.of(dataSource, interval, version, shardSpec); - this.loadSpec = pruneLoadSpecHolder.pruneLoadSpec ? PRUNED_LOAD_SPEC : prepareLoadSpec(loadSpec); + this.loadSpec = pruneSpecsHolder.pruneLoadSpec ? PRUNED_LOAD_SPEC : prepareLoadSpec(loadSpec); // Deduplicating dimensions and metrics lists as a whole because they are very likely the same for the same // dataSource this.dimensions = prepareDimensionsOrMetrics(dimensions, DIMENSIONS_INTERNER); this.metrics = prepareDimensionsOrMetrics(metrics, METRICS_INTERNER); this.shardSpec = (shardSpec == null) ? new NumberedShardSpec(0, 1) : shardSpec; + this.lastCompactionState = pruneSpecsHolder.pruneLastCompactionState + ? 
null + : prepareCompactionState(lastCompactionState); this.binaryVersion = binaryVersion; this.size = size; } @@ -188,6 +231,15 @@ public class DataSegment implements Comparable, Overshadowable prepareDimensionsOrMetrics(@Nullable List list, Interner> interner) { if (list == null) { @@ -256,6 +308,13 @@ public class DataSegment implements Comparable, Overshadowable, Overshadowable shuffled = new ArrayList<>(Arrays.asList(sortedOrder)); - Collections.shuffle(shuffled); - - Set theSet = new TreeSet<>(DataSegment.bucketMonthComparator()); - theSet.addAll(shuffled); - - int index = 0; - for (DataSegment dataSegment : theSet) { - Assert.assertEquals(sortedOrder[index], dataSegment); - ++index; - } - } - private DataSegment makeDataSegment(String dataSource, String interval, String version) { return DataSegment.builder() diff --git a/core/src/test/java/org/apache/druid/timeline/SegmentWithOvershadowedStatusTest.java b/core/src/test/java/org/apache/druid/timeline/SegmentWithOvershadowedStatusTest.java index 4e69b4e8fe2..e5cb4faafcf 100644 --- a/core/src/test/java/org/apache/druid/timeline/SegmentWithOvershadowedStatusTest.java +++ b/core/src/test/java/org/apache/druid/timeline/SegmentWithOvershadowedStatusTest.java @@ -29,6 +29,7 @@ import org.apache.druid.TestObjectMapper; import org.apache.druid.jackson.CommaListJoinDeserializer; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.jackson.JacksonUtils; +import org.apache.druid.timeline.DataSegment.PruneSpecsHolder; import org.apache.druid.timeline.partition.NoneShardSpec; import org.apache.druid.timeline.partition.ShardSpec; import org.joda.time.Interval; @@ -50,7 +51,7 @@ public class SegmentWithOvershadowedStatusTest public void setUp() { InjectableValues.Std injectableValues = new InjectableValues.Std(); - injectableValues.addValue(DataSegment.PruneLoadSpecHolder.class, DataSegment.PruneLoadSpecHolder.DEFAULT); + injectableValues.addValue(PruneSpecsHolder.class, PruneSpecsHolder.DEFAULT); MAPPER.setInjectableValues(injectableValues); } @@ -68,6 +69,7 @@ public class SegmentWithOvershadowedStatusTest Arrays.asList("dim1", "dim2"), Arrays.asList("met1", "met2"), NoneShardSpec.instance(), + null, TEST_VERSION, 1 ); @@ -79,7 +81,7 @@ public class SegmentWithOvershadowedStatusTest JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT ); - Assert.assertEquals(11, objectMap.size()); + Assert.assertEquals(12, objectMap.size()); Assert.assertEquals("something", objectMap.get("dataSource")); Assert.assertEquals(interval.toString(), objectMap.get("interval")); Assert.assertEquals("1", objectMap.get("version")); @@ -133,6 +135,7 @@ class TestSegmentWithOvershadowedStatus extends DataSegment @Nullable List metrics, @JsonProperty("shardSpec") @Nullable ShardSpec shardSpec, + @JsonProperty("lasCompactionState") @Nullable CompactionState lastCompactionState, @JsonProperty("binaryVersion") Integer binaryVersion, @JsonProperty("size") long size, @JsonProperty("overshadowed") boolean overshadowed @@ -146,6 +149,7 @@ class TestSegmentWithOvershadowedStatus extends DataSegment dimensions, metrics, shardSpec, + lastCompactionState, binaryVersion, size ); diff --git a/docs/configuration/index.md b/docs/configuration/index.md index 3d0e7f6a90a..56a94ee41e9 100644 --- a/docs/configuration/index.md +++ b/docs/configuration/index.md @@ -786,8 +786,7 @@ A description of the compaction config is: |`dataSource`|dataSource name to be compacted.|yes| |`taskPriority`|[Priority](../ingestion/tasks.html#priority) of compaction task.|no 
(default = 25)| |`inputSegmentSizeBytes`|Maximum number of total segment bytes processed per compaction task. Since a time chunk must be processed in its entirety, if the segments for a particular time chunk have a total size in bytes greater than this parameter, compaction will not run for that time chunk. Because each compaction task runs with a single thread, setting this value too far above 1–2GB will result in compaction tasks taking an excessive amount of time.|no (default = 419430400)| -|`targetCompactionSizeBytes`|The target segment size, for each segment, after compaction. The actual sizes of compacted segments might be slightly larger or smaller than this value. Each compaction task may generate more than one output segment, and it will try to keep each output segment close to this configured size. This configuration cannot be used together with `maxRowsPerSegment`.|no (default = 419430400)| -|`maxRowsPerSegment`|Max number of rows per segment after compaction. This configuration cannot be used together with `targetCompactionSizeBytes`.|no| +|`maxRowsPerSegment`|Max number of rows per segment after compaction.|no| |`skipOffsetFromLatest`|The offset for searching segments to be compacted. Strongly recommended to set for realtime dataSources. |no (default = "P1D")| |`tuningConfig`|Tuning config for compaction tasks. See below [Compaction Task TuningConfig](#compaction-tuningconfig).|no| |`taskContext`|[Task context](../ingestion/tasks.html#context) for compaction tasks.|no| diff --git a/docs/design/coordinator.md b/docs/design/coordinator.md index 8c2483b7191..80721bcace8 100644 --- a/docs/design/coordinator.md +++ b/docs/design/coordinator.md @@ -63,11 +63,11 @@ To ensure an even distribution of segments across Historical processes in the cl ### Compacting Segments -Each run, the Druid Coordinator compacts small segments abutting each other. This is useful when you have a lot of small -segments which may degrade query performance as well as increase disk space usage. See [Segment Size Optimization](../operations/segment-optimization.md) for details. +Each run, the Druid Coordinator compacts segments by merging small segments or splitting a large one. This is useful when your segments are not optimized +in terms of segment size which may degrade query performance. See [Segment Size Optimization](../operations/segment-optimization.md) for details. -The Coordinator first finds the segments to compact together based on the [segment search policy](#segment-search-policy). -Once some segments are found, it launches a [compaction task](../ingestion/tasks.md#compact) to compact those segments. +The Coordinator first finds the segments to compact based on the [segment search policy](#segment-search-policy). +Once some segments are found, it issues a [compaction task](../ingestion/tasks.md#compact) to compact those segments. The maximum number of running compaction tasks is `min(sum of worker capacity * slotRatio, maxSlots)`. Note that even though `min(sum of worker capacity * slotRatio, maxSlots)` = 0, at least one compaction task is always submitted if the compaction is enabled for a dataSource. @@ -76,30 +76,41 @@ See [Compaction Configuration API](../operations/api-reference.html#compaction-c Compaction tasks might fail due to the following reasons. - If the input segments of a compaction task are removed or overshadowed before it starts, that compaction task fails immediately. 
-- If a task of a higher priority acquires a lock for an interval overlapping with the interval of a compaction task, the compaction task fails. +- If a task of a higher priority acquires a [time chunk lock](../ingestion/tasks.html#locking) for an interval overlapping with the interval of a compaction task, the compaction task fails. -Once a compaction task fails, the Coordinator simply finds the segments for the interval of the failed task again, and launches a new compaction task in the next run. +Once a compaction task fails, the Coordinator simply checks the segments in the interval of the failed task again, and issues another compaction task in the next run. ### Segment search policy -#### Newest segment first policy +#### Recent segment first policy -At every coordinator run, this policy searches for segments to compact by iterating segments from the latest to the oldest. -Once it finds the latest segment among all dataSources, it checks if the segment is _compactable_ with other segments of the same dataSource which have the same or abutting intervals. -Note that segments are compactable if their total size is smaller than or equal to the configured `inputSegmentSizeBytes`. +At every coordinator run, this policy looks up time chunks in order of newest-to-oldest and checks whether the segments in those time chunks +need compaction or not. +A set of segments need compaction if all conditions below are satisfied. -Here are some details with an example. Let us assume we have two dataSources (`foo`, `bar`) -and 5 segments (`foo_2017-10-01T00:00:00.000Z_2017-11-01T00:00:00.000Z_VERSION`, `foo_2017-11-01T00:00:00.000Z_2017-12-01T00:00:00.000Z_VERSION`, `bar_2017-08-01T00:00:00.000Z_2017-09-01T00:00:00.000Z_VERSION`, `bar_2017-09-01T00:00:00.000Z_2017-10-01T00:00:00.000Z_VERSION`, `bar_2017-10-01T00:00:00.000Z_2017-11-01T00:00:00.000Z_VERSION`). -When each segment has the same size of 10 MB and `inputSegmentSizeBytes` is 20 MB, this policy first returns two segments (`foo_2017-10-01T00:00:00.000Z_2017-11-01T00:00:00.000Z_VERSION` and `foo_2017-11-01T00:00:00.000Z_2017-12-01T00:00:00.000Z_VERSION`) to compact together because -`foo_2017-11-01T00:00:00.000Z_2017-12-01T00:00:00.000Z_VERSION` is the latest segment and `foo_2017-10-01T00:00:00.000Z_2017-11-01T00:00:00.000Z_VERSION` abuts to it. +1) Total size of segments in the time chunk is smaller than or equal to the configured `inputSegmentSizeBytes`. +2) Segments have never been compacted yet or compaction spec has been updated since the last compaction, especially `maxRowsPerSegment`, `maxTotalRows`, and `indexSpec`. -If the coordinator has enough task slots for compaction, this policy would continue searching for the next segments and return -`bar_2017-10-01T00:00:00.000Z_2017-11-01T00:00:00.000Z_VERSION` and `bar_2017-09-01T00:00:00.000Z_2017-10-01T00:00:00.000Z_VERSION`. -Note that `bar_2017-08-01T00:00:00.000Z_2017-09-01T00:00:00.000Z_VERSION` is not compacted together even though it abuts to `bar_2017-09-01T00:00:00.000Z_2017-10-01T00:00:00.000Z_VERSION`. -This is because the total segment size to compact would be greater than `inputSegmentSizeBytes` if it's included. +Here are some details with an example. 
Suppose we have two dataSources (`foo`, `bar`) as seen below: + +- `foo` + - `foo_2017-11-01T00:00:00.000Z_2017-12-01T00:00:00.000Z_VERSION` + - `foo_2017-11-01T00:00:00.000Z_2017-12-01T00:00:00.000Z_VERSION_1` + - `foo_2017-09-01T00:00:00.000Z_2017-10-01T00:00:00.000Z_VERSION` +- `bar` + - `bar_2017-10-01T00:00:00.000Z_2017-11-01T00:00:00.000Z_VERSION` + - `bar_2017-10-01T00:00:00.000Z_2017-11-01T00:00:00.000Z_VERSION_1` + +Assuming that each segment is 10 MB and haven't been compacted yet, this policy first returns two segments of +`foo_2017-11-01T00:00:00.000Z_2017-12-01T00:00:00.000Z_VERSION` and `foo_2017-11-01T00:00:00.000Z_2017-12-01T00:00:00.000Z_VERSION_1` to compact together because +`2017-11-01T00:00:00.000Z/2017-12-01T00:00:00.000Z` is the most recent time chunk. + +If the coordinator has enough task slots for compaction, this policy will continue searching for the next segments and return +`bar_2017-10-01T00:00:00.000Z_2017-11-01T00:00:00.000Z_VERSION` and `bar_2017-10-01T00:00:00.000Z_2017-11-01T00:00:00.000Z_VERSION_1`. +Finally, `foo_2017-09-01T00:00:00.000Z_2017-10-01T00:00:00.000Z_VERSION` will be picked up even though there is only one segment in the time chunk of `2017-09-01T00:00:00.000Z/2017-10-01T00:00:00.000Z`. The search start point can be changed by setting [skipOffsetFromLatest](../configuration/index.html#compaction-dynamic-configuration). -If this is set, this policy will ignore the segments falling into the interval of (the end time of the very latest segment - `skipOffsetFromLatest`). +If this is set, this policy will ignore the segments falling into the time chunk of (the end time of the most recent segment - `skipOffsetFromLatest`). This is to avoid conflicts between compaction tasks and realtime tasks. Note that realtime tasks have a higher priority than compaction tasks by default. Realtime tasks will revoke the locks of compaction tasks if their intervals overlap, resulting in the termination of the compaction task. diff --git a/docs/ingestion/data-management.md b/docs/ingestion/data-management.md index f0cc822ca4c..893c186a0a2 100644 --- a/docs/ingestion/data-management.md +++ b/docs/ingestion/data-management.md @@ -102,7 +102,6 @@ Compaction tasks merge all segments of the given interval. The syntax is: "ioConfig": , "dimensions" , "segmentGranularity": , - "targetCompactionSizeBytes": "tuningConfig" , "context": } @@ -117,7 +116,6 @@ Compaction tasks merge all segments of the given interval. The syntax is: |`dimensionsSpec`|Custom dimensionsSpec. Compaction task will use this dimensionsSpec if exist instead of generating one. See below for more details.|No| |`metricsSpec`|Custom metricsSpec. Compaction task will use this metricsSpec if specified rather than generating one.|No| |`segmentGranularity`|If this is set, compactionTask will change the segment granularity for the given interval. See `segmentGranularity` of [`granularitySpec`](index.md#granularityspec) for more details. See the below table for the behavior.|No| -|`targetCompactionSizeBytes`|Target segment size after compaction. 
Cannot be used with `maxRowsPerSegment`, `maxTotalRows`, and `numShards` in tuningConfig.|No| |`tuningConfig`|[Index task tuningConfig](../ingestion/native-batch.md#tuningconfig)|No| |`context`|[Task context](../ingestion/tasks.md#context)|No| diff --git a/extensions-core/hdfs-storage/src/test/java/org/apache/druid/storage/hdfs/HdfsDataSegmentPusherTest.java b/extensions-core/hdfs-storage/src/test/java/org/apache/druid/storage/hdfs/HdfsDataSegmentPusherTest.java index a14668b9be3..7cf7df535b5 100644 --- a/extensions-core/hdfs-storage/src/test/java/org/apache/druid/storage/hdfs/HdfsDataSegmentPusherTest.java +++ b/extensions-core/hdfs-storage/src/test/java/org/apache/druid/storage/hdfs/HdfsDataSegmentPusherTest.java @@ -43,6 +43,7 @@ import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.segment.loading.LocalDataSegmentPusher; import org.apache.druid.segment.loading.LocalDataSegmentPusherConfig; import org.apache.druid.timeline.DataSegment; +import org.apache.druid.timeline.DataSegment.PruneSpecsHolder; import org.apache.druid.timeline.partition.NoneShardSpec; import org.apache.druid.timeline.partition.NumberedShardSpec; import org.apache.hadoop.conf.Configuration; @@ -79,7 +80,7 @@ public class HdfsDataSegmentPusherTest objectMapper = new TestObjectMapper(); InjectableValues.Std injectableValues = new InjectableValues.Std(); injectableValues.addValue(ObjectMapper.class, objectMapper); - injectableValues.addValue(DataSegment.PruneLoadSpecHolder.class, DataSegment.PruneLoadSpecHolder.DEFAULT); + injectableValues.addValue(PruneSpecsHolder.class, PruneSpecsHolder.DEFAULT); objectMapper.setInjectableValues(injectableValues); } diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfigTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfigTest.java index 93884933dd3..3a930dbeead 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfigTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfigTest.java @@ -21,7 +21,6 @@ package org.apache.druid.indexing.kafka; import com.fasterxml.jackson.databind.Module; import com.fasterxml.jackson.databind.ObjectMapper; -import org.apache.druid.indexer.partitions.DynamicPartitionsSpec; import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorTuningConfig; import org.apache.druid.indexing.kafka.test.TestModifiedKafkaIndexTaskTuningConfig; import org.apache.druid.jackson.DefaultObjectMapper; @@ -63,7 +62,7 @@ public class KafkaIndexTaskTuningConfigTest Assert.assertNotNull(config.getBasePersistDirectory()); Assert.assertEquals(1000000, config.getMaxRowsInMemory()); Assert.assertEquals(5_000_000, config.getMaxRowsPerSegment().intValue()); - Assert.assertEquals(DynamicPartitionsSpec.DEFAULT_MAX_TOTAL_ROWS, config.getMaxTotalRows().longValue()); + Assert.assertNull(config.getMaxTotalRows()); Assert.assertEquals(new Period("PT10M"), config.getIntermediatePersistPeriod()); Assert.assertEquals(0, config.getMaxPendingPersists()); Assert.assertEquals(new IndexSpec(), config.getIndexSpec()); @@ -250,28 +249,4 @@ public class KafkaIndexTaskTuningConfigTest Assert.assertEquals(base.getMaxParseExceptions(), deserialized.getMaxParseExceptions()); Assert.assertEquals(base.getMaxSavedParseExceptions(), deserialized.getMaxSavedParseExceptions()); } - - private static 
KafkaIndexTaskTuningConfig copy(KafkaIndexTaskTuningConfig config) - { - return new KafkaIndexTaskTuningConfig( - config.getMaxRowsInMemory(), - config.getMaxBytesInMemory(), - config.getMaxRowsPerSegment(), - config.getMaxTotalRows(), - config.getIntermediatePersistPeriod(), - config.getBasePersistDirectory(), - 0, - config.getIndexSpec(), - config.getIndexSpecForIntermediatePersists(), - true, - config.isReportParseExceptions(), - config.getHandoffConditionTimeout(), - config.isResetOffsetAutomatically(), - config.getSegmentWriteOutMediumFactory(), - config.getIntermediateHandoffPeriod(), - config.isLogParseExceptions(), - config.getMaxParseExceptions(), - config.getMaxSavedParseExceptions() - ); - } } diff --git a/indexing-hadoop/src/test/java/org/apache/druid/indexer/BatchDeltaIngestionTest.java b/indexing-hadoop/src/test/java/org/apache/druid/indexer/BatchDeltaIngestionTest.java index d0d7180a6e4..091033b3096 100644 --- a/indexing-hadoop/src/test/java/org/apache/druid/indexer/BatchDeltaIngestionTest.java +++ b/indexing-hadoop/src/test/java/org/apache/druid/indexer/BatchDeltaIngestionTest.java @@ -53,6 +53,7 @@ import org.apache.druid.segment.realtime.firehose.IngestSegmentFirehose; import org.apache.druid.segment.realtime.firehose.WindowedStorageAdapter; import org.apache.druid.segment.transform.TransformSpec; import org.apache.druid.timeline.DataSegment; +import org.apache.druid.timeline.DataSegment.PruneSpecsHolder; import org.apache.druid.timeline.partition.HashBasedNumberedShardSpec; import org.joda.time.Interval; import org.junit.Assert; @@ -82,7 +83,7 @@ public class BatchDeltaIngestionTest MAPPER.registerSubtypes(new NamedType(HashBasedNumberedShardSpec.class, "hashed")); InjectableValues inject = new InjectableValues.Std() .addValue(ObjectMapper.class, MAPPER) - .addValue(DataSegment.PruneLoadSpecHolder.class, DataSegment.PruneLoadSpecHolder.DEFAULT); + .addValue(PruneSpecsHolder.class, PruneSpecsHolder.DEFAULT); MAPPER.setInjectableValues(inject); INDEX_IO = HadoopDruidIndexerConfig.INDEX_IO; diff --git a/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopIngestionSpecUpdateDatasourcePathSpecSegmentsTest.java b/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopIngestionSpecUpdateDatasourcePathSpecSegmentsTest.java index 844e26659b6..174d2817491 100644 --- a/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopIngestionSpecUpdateDatasourcePathSpecSegmentsTest.java +++ b/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopIngestionSpecUpdateDatasourcePathSpecSegmentsTest.java @@ -37,6 +37,7 @@ import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.segment.indexing.DataSchema; import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec; import org.apache.druid.timeline.DataSegment; +import org.apache.druid.timeline.DataSegment.PruneSpecsHolder; import org.apache.druid.timeline.partition.NoneShardSpec; import org.easymock.EasyMock; import org.joda.time.Interval; @@ -65,7 +66,7 @@ public class HadoopIngestionSpecUpdateDatasourcePathSpecSegmentsTest jsonMapper.setInjectableValues( new InjectableValues.Std() .addValue(ObjectMapper.class, jsonMapper) - .addValue(DataSegment.PruneLoadSpecHolder.class, DataSegment.PruneLoadSpecHolder.DEFAULT) + .addValue(PruneSpecsHolder.class, PruneSpecsHolder.DEFAULT) ); } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/index/RealtimeAppenderatorTuningConfig.java 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/index/RealtimeAppenderatorTuningConfig.java index cbb2dcebc02..627945853f4 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/index/RealtimeAppenderatorTuningConfig.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/index/RealtimeAppenderatorTuningConfig.java @@ -163,6 +163,7 @@ public class RealtimeAppenderatorTuningConfig implements TuningConfig, Appendera return partitionsSpec.getMaxTotalRows(); } + @Override public DynamicPartitionsSpec getPartitionsSpec() { return partitionsSpec; diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java index 564c965723f..c48acea51b9 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java @@ -40,6 +40,7 @@ import org.apache.druid.discovery.LookupNodeService; import org.apache.druid.discovery.NodeType; import org.apache.druid.indexer.IngestionState; import org.apache.druid.indexer.TaskStatus; +import org.apache.druid.indexer.partitions.DynamicPartitionsSpec; import org.apache.druid.indexing.appenderator.ActionBasedSegmentAllocator; import org.apache.druid.indexing.appenderator.ActionBasedUsedSegmentChecker; import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport; @@ -370,7 +371,7 @@ public class AppenderatorDriverRealtimeIndexTask extends AbstractTask implements if (addResult.isOk()) { final boolean isPushRequired = addResult.isPushRequired( tuningConfig.getPartitionsSpec().getMaxRowsPerSegment(), - tuningConfig.getPartitionsSpec().getMaxTotalRows() + tuningConfig.getPartitionsSpec().getMaxTotalRowsOr(DynamicPartitionsSpec.DEFAULT_MAX_TOTAL_ROWS) ); if (isPushRequired) { publishSegments(driver, publisher, committerSupplier, sequenceName); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/BatchAppenderators.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/BatchAppenderators.java index 13b9701e8a0..827053e8fb7 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/BatchAppenderators.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/BatchAppenderators.java @@ -38,7 +38,8 @@ public final class BatchAppenderators FireDepartmentMetrics metrics, TaskToolbox toolbox, DataSchema dataSchema, - AppenderatorConfig appenderatorConfig + AppenderatorConfig appenderatorConfig, + boolean storeCompactionState ) { return newAppenderator( @@ -48,7 +49,8 @@ public final class BatchAppenderators toolbox, dataSchema, appenderatorConfig, - toolbox.getSegmentPusher() + toolbox.getSegmentPusher(), + storeCompactionState ); } @@ -59,13 +61,15 @@ public final class BatchAppenderators TaskToolbox toolbox, DataSchema dataSchema, AppenderatorConfig appenderatorConfig, - DataSegmentPusher segmentPusher + DataSegmentPusher segmentPusher, + boolean storeCompactionState ) { return appenderatorsManager.createOfflineAppenderatorForTask( taskId, dataSchema, appenderatorConfig.withBasePersistDirectory(toolbox.getPersistDir()), + storeCompactionState, metrics, segmentPusher, toolbox.getObjectMapper(), diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java index 4161a35f17f..492c7125cd1 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java @@ -45,7 +45,7 @@ import org.apache.druid.indexer.Checks; import org.apache.druid.indexer.Property; import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexer.partitions.DynamicPartitionsSpec; -import org.apache.druid.indexer.partitions.HashedPartitionsSpec; +import org.apache.druid.indexer.partitions.PartitionsSpec; import org.apache.druid.indexing.common.RetryPolicyFactory; import org.apache.druid.indexing.common.SegmentLoaderFactory; import org.apache.druid.indexing.common.TaskToolbox; @@ -58,7 +58,6 @@ import org.apache.druid.indexing.common.task.IndexTask.IndexTuningConfig; import org.apache.druid.indexing.firehose.IngestSegmentFirehoseFactory; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.JodaUtils; -import org.apache.druid.java.util.common.Numbers; import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.RE; import org.apache.druid.java.util.common.StringUtils; @@ -80,7 +79,6 @@ import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec; import org.apache.druid.segment.loading.SegmentLoadingException; import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager; import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider; -import org.apache.druid.server.coordinator.DataSourceCompactionConfig; import org.apache.druid.server.security.AuthorizerMapper; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.TimelineLookup; @@ -131,8 +129,6 @@ public class CompactionTask extends AbstractBatchIndexTask @Nullable private final Granularity segmentGranularity; @Nullable - private final Long targetCompactionSizeBytes; - @Nullable private final IndexTuningConfig tuningConfig; private final ObjectMapper jsonMapper; @JsonIgnore @@ -184,7 +180,6 @@ public class CompactionTask extends AbstractBatchIndexTask @JsonProperty("dimensionsSpec") @Nullable final DimensionsSpec dimensionsSpec, @JsonProperty("metricsSpec") @Nullable final AggregatorFactory[] metricsSpec, @JsonProperty("segmentGranularity") @Nullable final Granularity segmentGranularity, - @JsonProperty("targetCompactionSizeBytes") @Nullable final Long targetCompactionSizeBytes, @JsonProperty("tuningConfig") @Nullable final IndexTuningConfig tuningConfig, @JsonProperty("context") @Nullable final Map context, @JacksonInject ObjectMapper jsonMapper, @@ -220,11 +215,10 @@ public class CompactionTask extends AbstractBatchIndexTask this.dimensionsSpec = dimensionsSpec == null ? 
dimensions : dimensionsSpec; this.metricsSpec = metricsSpec; this.segmentGranularity = segmentGranularity; - this.targetCompactionSizeBytes = targetCompactionSizeBytes; this.tuningConfig = tuningConfig; this.jsonMapper = jsonMapper; this.segmentProvider = new SegmentProvider(dataSource, this.ioConfig.getInputSpec()); - this.partitionConfigurationManager = new PartitionConfigurationManager(targetCompactionSizeBytes, tuningConfig); + this.partitionConfigurationManager = new PartitionConfigurationManager(tuningConfig); this.authorizerMapper = authorizerMapper; this.chatHandlerProvider = chatHandlerProvider; this.rowIngestionMetersFactory = rowIngestionMetersFactory; @@ -262,13 +256,6 @@ public class CompactionTask extends AbstractBatchIndexTask return segmentGranularity; } - @Nullable - @JsonProperty - public Long getTargetCompactionSizeBytes() - { - return targetCompactionSizeBytes; - } - @Nullable @JsonProperty public IndexTuningConfig getTuningConfig() @@ -437,9 +424,7 @@ public class CompactionTask extends AbstractBatchIndexTask toolbox.getIndexIO() ); - final IndexTuningConfig compactionTuningConfig = partitionConfigurationManager.computeTuningConfig( - queryableIndexAndSegments - ); + final IndexTuningConfig compactionTuningConfig = partitionConfigurationManager.computeTuningConfig(); if (segmentGranularity == null) { // original granularity @@ -801,113 +786,32 @@ public class CompactionTask extends AbstractBatchIndexTask @VisibleForTesting static class PartitionConfigurationManager { - @Nullable - private final Long targetCompactionSizeBytes; @Nullable private final IndexTuningConfig tuningConfig; - PartitionConfigurationManager(@Nullable Long targetCompactionSizeBytes, @Nullable IndexTuningConfig tuningConfig) + PartitionConfigurationManager(@Nullable IndexTuningConfig tuningConfig) { - this.targetCompactionSizeBytes = getValidTargetCompactionSizeBytes(targetCompactionSizeBytes, tuningConfig); this.tuningConfig = tuningConfig; } @Nullable - IndexTuningConfig computeTuningConfig(List> queryableIndexAndSegments) + IndexTuningConfig computeTuningConfig() { - if (!hasPartitionConfig(tuningConfig)) { - final long nonNullTargetCompactionSizeBytes = Preconditions.checkNotNull( - targetCompactionSizeBytes, - "targetCompactionSizeBytes" + IndexTuningConfig newTuningConfig = tuningConfig == null + ? IndexTuningConfig.createDefault() + : tuningConfig; + PartitionsSpec partitionsSpec = newTuningConfig.getGivenOrDefaultPartitionsSpec(); + if (partitionsSpec instanceof DynamicPartitionsSpec) { + final DynamicPartitionsSpec dynamicPartitionsSpec = (DynamicPartitionsSpec) partitionsSpec; + partitionsSpec = new DynamicPartitionsSpec( + dynamicPartitionsSpec.getMaxRowsPerSegment(), + // Setting maxTotalRows to Long.MAX_VALUE to respect the computed maxRowsPerSegment. + // If this is set to something too small, compactionTask can generate small segments + // which need to be compacted again, which in turn making auto compaction stuck in the same interval. + dynamicPartitionsSpec.getMaxTotalRowsOr(Long.MAX_VALUE) ); - // Find IndexTuningConfig.maxRowsPerSegment which is the number of rows per segment. - // Assume that the segment size is proportional to the number of rows. We can improve this later. 
- final long totalNumRows = queryableIndexAndSegments - .stream() - .mapToLong(queryableIndexAndDataSegment -> queryableIndexAndDataSegment.lhs.getNumRows()) - .sum(); - final long totalSizeBytes = queryableIndexAndSegments - .stream() - .mapToLong(queryableIndexAndDataSegment -> queryableIndexAndDataSegment.rhs.getSize()) - .sum(); - - if (totalSizeBytes == 0L) { - throw new ISE("Total input segment size is 0 byte"); - } - - final double avgRowsPerByte = totalNumRows / (double) totalSizeBytes; - final long maxRowsPerSegmentLong = Math.round(avgRowsPerByte * nonNullTargetCompactionSizeBytes); - final int maxRowsPerSegment = Numbers.toIntExact( - maxRowsPerSegmentLong, - StringUtils.format( - "Estimated maxRowsPerSegment[%s] is out of integer value range. " - + "Please consider reducing targetCompactionSizeBytes[%s].", - maxRowsPerSegmentLong, - targetCompactionSizeBytes - ) - ); - Preconditions.checkState(maxRowsPerSegment > 0, "Negative maxRowsPerSegment[%s]", maxRowsPerSegment); - - log.info( - "Estimated maxRowsPerSegment[%d] = avgRowsPerByte[%f] * targetCompactionSizeBytes[%d]", - maxRowsPerSegment, - avgRowsPerByte, - nonNullTargetCompactionSizeBytes - ); - // Setting maxTotalRows to Long.MAX_VALUE to respect the computed maxRowsPerSegment. - // If this is set to something too small, compactionTask can generate small segments - // which need to be compacted again, which in turn making auto compaction stuck in the same interval. - final IndexTuningConfig newTuningConfig = tuningConfig == null - ? IndexTuningConfig.createDefault() - : tuningConfig; - if (newTuningConfig.isForceGuaranteedRollup()) { - return newTuningConfig.withPartitionsSpec(new HashedPartitionsSpec(maxRowsPerSegment, null, null)); - } else { - return newTuningConfig.withPartitionsSpec(new DynamicPartitionsSpec(maxRowsPerSegment, Long.MAX_VALUE)); - } - } else { - return tuningConfig; - } - } - - /** - * Check the validity of {@link #targetCompactionSizeBytes} and return a valid value. Note that - * targetCompactionSizeBytes cannot be used with {@link IndexTuningConfig#getPartitionsSpec} together. - * {@link #hasPartitionConfig} checks one of those configs is set. - *

- * This throws an {@link IllegalArgumentException} if targetCompactionSizeBytes is set and hasPartitionConfig - * returns true. If targetCompactionSizeBytes is not set, this returns null or - * {@link DataSourceCompactionConfig#DEFAULT_TARGET_COMPACTION_SIZE_BYTES} according to the result of - * hasPartitionConfig. - */ - @Nullable - private static Long getValidTargetCompactionSizeBytes( - @Nullable Long targetCompactionSizeBytes, - @Nullable IndexTuningConfig tuningConfig - ) - { - if (targetCompactionSizeBytes != null && tuningConfig != null) { - Preconditions.checkArgument( - !hasPartitionConfig(tuningConfig), - "targetCompactionSizeBytes[%s] cannot be used with partitionsSpec[%s]", - targetCompactionSizeBytes, - tuningConfig.getPartitionsSpec() - ); - return targetCompactionSizeBytes; - } else { - return hasPartitionConfig(tuningConfig) - ? null - : DataSourceCompactionConfig.DEFAULT_TARGET_COMPACTION_SIZE_BYTES; - } - } - - private static boolean hasPartitionConfig(@Nullable IndexTuningConfig tuningConfig) - { - if (tuningConfig != null) { - return tuningConfig.getPartitionsSpec() != null; - } else { - return false; } + return newTuningConfig.withPartitionsSpec(partitionsSpec); } } @@ -931,8 +835,6 @@ public class CompactionTask extends AbstractBatchIndexTask @Nullable private Granularity segmentGranularity; @Nullable - private Long targetCompactionSizeBytes; - @Nullable private IndexTuningConfig tuningConfig; @Nullable private Map context; @@ -994,12 +896,6 @@ public class CompactionTask extends AbstractBatchIndexTask return this; } - public Builder targetCompactionSizeBytes(long targetCompactionSizeBytes) - { - this.targetCompactionSizeBytes = targetCompactionSizeBytes; - return this; - } - public Builder tuningConfig(IndexTuningConfig tuningConfig) { this.tuningConfig = tuningConfig; @@ -1025,7 +921,6 @@ public class CompactionTask extends AbstractBatchIndexTask dimensionsSpec, metricsSpec, segmentGranularity, - targetCompactionSizeBytes, tuningConfig, context, jsonMapper, diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/FiniteFirehoseProcessor.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/FiniteFirehoseProcessor.java index 98c447100fe..a3463f41ecc 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/FiniteFirehoseProcessor.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/FiniteFirehoseProcessor.java @@ -128,7 +128,7 @@ public class FiniteFirehoseProcessor if (dynamicPartitionsSpec != null) { final boolean isPushRequired = addResult.isPushRequired( dynamicPartitionsSpec.getMaxRowsPerSegment(), - dynamicPartitionsSpec.getMaxTotalRows() + dynamicPartitionsSpec.getMaxTotalRowsOr(DynamicPartitionsSpec.DEFAULT_MAX_TOTAL_ROWS) ); if (isPushRequired) { // There can be some segments waiting for being pushed even though no more rows will be added to them diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java index ae9593ee779..51b44f18d11 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java @@ -873,7 +873,8 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler buildSegmentsFireDepartmentMetrics, toolbox, dataSchema, - tuningConfig + tuningConfig, + 
getContextValue(Tasks.STORE_COMPACTION_STATE_KEY, Tasks.DEFAULT_STORE_COMPACTION_STATE) ); boolean exceptionOccurred = false; try (final BatchAppenderatorDriver driver = BatchAppenderators.newDriver(appenderator, toolbox, segmentAllocator)) { @@ -1321,6 +1322,7 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler @JsonProperty @Nullable + @Override public PartitionsSpec getPartitionsSpec() { return partitionsSpec; diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Tasks.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Tasks.java index 3aa617d16fa..ada53eedd1d 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Tasks.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Tasks.java @@ -37,10 +37,15 @@ public class Tasks public static final int DEFAULT_TASK_PRIORITY = 0; public static final long DEFAULT_LOCK_TIMEOUT_MILLIS = TimeUnit.MINUTES.toMillis(5); public static final boolean DEFAULT_FORCE_TIME_CHUNK_LOCK = true; + public static final boolean DEFAULT_STORE_COMPACTION_STATE = false; public static final String PRIORITY_KEY = "priority"; public static final String LOCK_TIMEOUT_KEY = "taskLockTimeout"; public static final String FORCE_TIME_CHUNK_LOCK_KEY = "forceTimeChunkLock"; + // This context is used in auto compaction. When it is set in the context, the segments created by the task + // will fill 'lastCompactionState' in its metadata. This will be used to track what segments are compacted or not. + // See DataSegment and NewestSegmentFirstIterator for more details. + public static final String STORE_COMPACTION_STATE_KEY = "storeCompactionState"; public static SortedSet computeCompactIntervals(SortedSet intervals) { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentGenerateTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentGenerateTask.java index 34a93599b7e..dd35a43b500 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentGenerateTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentGenerateTask.java @@ -288,7 +288,8 @@ public class PartialSegmentGenerateTask extends AbstractBatchIndexTask toolbox, dataSchema, tuningConfig, - new ShuffleDataSegmentPusher(supervisorTaskId, getId(), toolbox.getIntermediaryDataManager()) + new ShuffleDataSegmentPusher(supervisorTaskId, getId(), toolbox.getIntermediaryDataManager()), + getContextValue(Tasks.STORE_COMPACTION_STATE_KEY, Tasks.DEFAULT_STORE_COMPACTION_STATE) ); boolean exceptionOccurred = false; try (final BatchAppenderatorDriver driver = BatchAppenderators.newDriver(appenderator, toolbox, segmentAllocator)) { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java index 9b2d43d041c..7f9590a11cd 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java @@ -417,7 +417,8 @@ public class SinglePhaseSubTask extends AbstractBatchIndexTask fireDepartmentMetrics, toolbox, dataSchema, - tuningConfig + tuningConfig, + 
getContextValue(Tasks.STORE_COMPACTION_STATE_KEY, Tasks.DEFAULT_STORE_COMPACTION_STATE) ); boolean exceptionOccurred = false; try ( @@ -461,7 +462,7 @@ public class SinglePhaseSubTask extends AbstractBatchIndexTask if (addResult.isOk()) { final boolean isPushRequired = addResult.isPushRequired( partitionsSpec.getMaxRowsPerSegment(), - partitionsSpec.getMaxTotalRows() + partitionsSpec.getMaxTotalRowsOr(DynamicPartitionsSpec.DEFAULT_MAX_TOTAL_ROWS) ); if (isPushRequired) { // There can be some segments waiting for being published even though any rows won't be added to them. diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java index 8e85c9e6697..c35830a48de 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java @@ -44,6 +44,7 @@ import org.apache.druid.discovery.LookupNodeService; import org.apache.druid.discovery.NodeType; import org.apache.druid.indexer.IngestionState; import org.apache.druid.indexer.TaskStatus; +import org.apache.druid.indexer.partitions.DynamicPartitionsSpec; import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport; import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData; import org.apache.druid.indexing.common.LockGranularity; @@ -638,7 +639,7 @@ public abstract class SeekableStreamIndexTaskRunner constructorFeeder() @@ -191,6 +207,7 @@ public class CompactionTaskRunTest extends IngestionTestBase Intervals.of("2014-01-01T0%d:00:00/2014-01-01T0%d:00:00", i, i + 1), segments.get(i).getInterval() ); + Assert.assertEquals(DEFAULT_COMPACTION_STATE, segments.get(i).getLastCompactionState()); if (lockGranularity == LockGranularity.SEGMENT) { Assert.assertEquals( new NumberedOverwriteShardSpec(32768, 0, 2, (short) 1, (short) 1), @@ -235,6 +252,7 @@ public class CompactionTaskRunTest extends IngestionTestBase Intervals.of("2014-01-01T0%d:00:00/2014-01-01T0%d:00:00", i, i + 1), segments.get(i).getInterval() ); + Assert.assertEquals(DEFAULT_COMPACTION_STATE, segments.get(i).getLastCompactionState()); if (lockGranularity == LockGranularity.SEGMENT) { Assert.assertEquals( new NumberedOverwriteShardSpec(PartitionIds.NON_ROOT_GEN_START_PARTITION_ID, 0, 2, (short) 1, (short) 1), @@ -261,6 +279,7 @@ public class CompactionTaskRunTest extends IngestionTestBase Intervals.of("2014-01-01T0%d:00:00/2014-01-01T0%d:00:00", i, i + 1), segments.get(i).getInterval() ); + Assert.assertEquals(DEFAULT_COMPACTION_STATE, segments.get(i).getLastCompactionState()); if (lockGranularity == LockGranularity.SEGMENT) { Assert.assertEquals( new NumberedOverwriteShardSpec( @@ -364,6 +383,7 @@ public class CompactionTaskRunTest extends IngestionTestBase Intervals.of("2014-01-01T0%d:00:00/2014-01-01T0%d:00:00", i, i + 1), segments.get(i).getInterval() ); + Assert.assertEquals(DEFAULT_COMPACTION_STATE, segments.get(i).getLastCompactionState()); if (lockGranularity == LockGranularity.SEGMENT) { Assert.assertEquals( new NumberedOverwriteShardSpec(PartitionIds.NON_ROOT_GEN_START_PARTITION_ID, 0, 2, (short) 1, (short) 1), @@ -408,6 +428,7 @@ public class CompactionTaskRunTest extends IngestionTestBase Assert.assertEquals(Intervals.of("2014-01-01/2014-01-02"), segments.get(0).getInterval()); Assert.assertEquals(new 
NumberedShardSpec(0, 0), segments.get(0).getShardSpec()); + Assert.assertEquals(DEFAULT_COMPACTION_STATE, segments.get(0).getLastCompactionState()); // hour segmentGranularity final CompactionTask compactionTask2 = builder @@ -425,6 +446,7 @@ public class CompactionTaskRunTest extends IngestionTestBase for (int i = 0; i < 3; i++) { Assert.assertEquals(Intervals.of("2014-01-01T0%d:00:00/2014-01-01T0%d:00:00", i, i + 1), segments.get(i).getInterval()); Assert.assertEquals(new NumberedShardSpec(0, 0), segments.get(i).getShardSpec()); + Assert.assertEquals(DEFAULT_COMPACTION_STATE, segments.get(i).getLastCompactionState()); } } @@ -727,6 +749,7 @@ public class CompactionTaskRunTest extends IngestionTestBase ); task.addToContext(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, lockGranularity == LockGranularity.TIME_CHUNK); + task.addToContext(Tasks.STORE_COMPACTION_STATE_KEY, true); if (task.isReady(box.getTaskActionClient())) { if (readyLatchToCountDown != null) { readyLatchToCountDown.countDown(); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java index fd201d933c6..f8fe11a53a9 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java @@ -62,7 +62,6 @@ import org.apache.druid.indexing.firehose.IngestSegmentFirehoseFactory; import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.Intervals; -import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.common.granularity.Granularity; @@ -418,7 +417,6 @@ public class CompactionTaskTest Assert.assertEquals(expected.getIoConfig(), actual.getIoConfig()); Assert.assertEquals(expected.getDimensionsSpec(), actual.getDimensionsSpec()); Assert.assertArrayEquals(expected.getMetricsSpec(), actual.getMetricsSpec()); - Assert.assertEquals(expected.getTargetCompactionSizeBytes(), actual.getTargetCompactionSizeBytes()); Assert.assertEquals(expected.getTuningConfig(), actual.getTuningConfig()); Assert.assertEquals(expected.getContext(), actual.getContext()); } @@ -429,7 +427,7 @@ public class CompactionTaskTest final List ingestionSpecs = CompactionTask.createIngestionSchema( toolbox, new SegmentProvider(DATA_SOURCE, new CompactionIntervalSpec(COMPACTION_INTERVAL, null)), - new PartitionConfigurationManager(null, TUNING_CONFIG), + new PartitionConfigurationManager(TUNING_CONFIG), null, null, null, @@ -489,7 +487,7 @@ public class CompactionTaskTest final List ingestionSpecs = CompactionTask.createIngestionSchema( toolbox, new SegmentProvider(DATA_SOURCE, new CompactionIntervalSpec(COMPACTION_INTERVAL, null)), - new PartitionConfigurationManager(null, tuningConfig), + new PartitionConfigurationManager(tuningConfig), null, null, null, @@ -550,7 +548,7 @@ public class CompactionTaskTest final List ingestionSpecs = CompactionTask.createIngestionSchema( toolbox, new SegmentProvider(DATA_SOURCE, new CompactionIntervalSpec(COMPACTION_INTERVAL, null)), - new PartitionConfigurationManager(null, tuningConfig), + new PartitionConfigurationManager(tuningConfig), null, null, null, @@ -611,7 +609,7 @@ public class CompactionTaskTest final List ingestionSpecs = 
CompactionTask.createIngestionSchema( toolbox, new SegmentProvider(DATA_SOURCE, new CompactionIntervalSpec(COMPACTION_INTERVAL, null)), - new PartitionConfigurationManager(null, tuningConfig), + new PartitionConfigurationManager(tuningConfig), null, null, null, @@ -672,7 +670,7 @@ public class CompactionTaskTest final List ingestionSpecs = CompactionTask.createIngestionSchema( toolbox, new SegmentProvider(DATA_SOURCE, new CompactionIntervalSpec(COMPACTION_INTERVAL, null)), - new PartitionConfigurationManager(null, TUNING_CONFIG), + new PartitionConfigurationManager(TUNING_CONFIG), customSpec, null, null, @@ -713,7 +711,7 @@ public class CompactionTaskTest final List ingestionSpecs = CompactionTask.createIngestionSchema( toolbox, new SegmentProvider(DATA_SOURCE, new CompactionIntervalSpec(COMPACTION_INTERVAL, null)), - new PartitionConfigurationManager(null, TUNING_CONFIG), + new PartitionConfigurationManager(TUNING_CONFIG), null, customMetricsSpec, null, @@ -747,7 +745,7 @@ public class CompactionTaskTest final List ingestionSpecs = CompactionTask.createIngestionSchema( toolbox, new SegmentProvider(DATA_SOURCE, SpecificSegmentsSpec.fromSegments(SEGMENTS)), - new PartitionConfigurationManager(null, TUNING_CONFIG), + new PartitionConfigurationManager(TUNING_CONFIG), null, null, null, @@ -787,7 +785,7 @@ public class CompactionTaskTest CompactionTask.createIngestionSchema( toolbox, new SegmentProvider(DATA_SOURCE, SpecificSegmentsSpec.fromSegments(segments)), - new PartitionConfigurationManager(null, TUNING_CONFIG), + new PartitionConfigurationManager(TUNING_CONFIG), null, null, null, @@ -810,7 +808,7 @@ public class CompactionTaskTest CompactionTask.createIngestionSchema( toolbox, new SegmentProvider(DATA_SOURCE, SpecificSegmentsSpec.fromSegments(segments)), - new PartitionConfigurationManager(null, TUNING_CONFIG), + new PartitionConfigurationManager(TUNING_CONFIG), null, null, null, @@ -844,59 +842,13 @@ public class CompactionTaskTest .build(); } - @Test - public void testTargetPartitionSizeWithPartitionConfig() throws IOException, SegmentLoadingException - { - final IndexTuningConfig tuningConfig = new IndexTuningConfig( - null, - null, - 500000, - 1000000L, - null, - null, - null, - null, - new HashedPartitionsSpec(6, null, null), - new IndexSpec( - new RoaringBitmapSerdeFactory(true), - CompressionStrategy.LZ4, - CompressionStrategy.LZF, - LongEncodingStrategy.LONGS - ), - null, - 5000, - true, - false, - null, - 100L, - null, - null, - null, - null - ); - expectedException.expect(IllegalArgumentException.class); - expectedException.expectMessage("targetCompactionSizeBytes[6] cannot be used with"); - final List ingestionSpecs = CompactionTask.createIngestionSchema( - toolbox, - new SegmentProvider(DATA_SOURCE, new CompactionIntervalSpec(COMPACTION_INTERVAL, null)), - new PartitionConfigurationManager(6L, tuningConfig), - null, - null, - null, - OBJECT_MAPPER, - COORDINATOR_CLIENT, - segmentLoaderFactory, - RETRY_POLICY_FACTORY - ); - } - @Test public void testSegmentGranularity() throws IOException, SegmentLoadingException { final List ingestionSpecs = CompactionTask.createIngestionSchema( toolbox, new SegmentProvider(DATA_SOURCE, new CompactionIntervalSpec(COMPACTION_INTERVAL, null)), - new PartitionConfigurationManager(null, TUNING_CONFIG), + new PartitionConfigurationManager(TUNING_CONFIG), null, null, new PeriodGranularity(Period.months(3), null, null), @@ -931,7 +883,7 @@ public class CompactionTaskTest final List ingestionSpecs = CompactionTask.createIngestionSchema( toolbox, new 
SegmentProvider(DATA_SOURCE, new CompactionIntervalSpec(COMPACTION_INTERVAL, null)), - new PartitionConfigurationManager(null, TUNING_CONFIG), + new PartitionConfigurationManager(TUNING_CONFIG), null, null, null, @@ -958,27 +910,6 @@ public class CompactionTaskTest ); } - @Test - public void testHugeTargetCompactionSize() - { - final PartitionConfigurationManager manager = new PartitionConfigurationManager(Long.MAX_VALUE, TUNING_CONFIG); - final TestIndexIO indexIO = (TestIndexIO) toolbox.getIndexIO(); - final Map queryableIndexMap = indexIO.getQueryableIndexMap(); - final List> segments = new ArrayList<>(); - - for (Entry entry : SEGMENT_MAP.entrySet()) { - final DataSegment segment = entry.getKey(); - final File file = entry.getValue(); - segments.add(Pair.of(Preconditions.checkNotNull(queryableIndexMap.get(file)), segment)); - } - - expectedException.expect(ArithmeticException.class); - expectedException.expectMessage( - CoreMatchers.startsWith("Estimated maxRowsPerSegment[922337203685477632] is out of integer value range.") - ); - manager.computeTuningConfig(segments); - } - private static List getExpectedDimensionsSpecForAutoGeneration() { return ImmutableList.of( @@ -1045,7 +976,7 @@ public class CompactionTaskTest null, null, null, - new HashedPartitionsSpec(41943040, null, null), // automatically computed targetPartitionSize + new HashedPartitionsSpec(null, null, null), // automatically computed targetPartitionSize new IndexSpec( new RoaringBitmapSerdeFactory(true), CompressionStrategy.LZ4, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TestAppenderatorsManager.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TestAppenderatorsManager.java index 47d2bcd49db..9ab63139859 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TestAppenderatorsManager.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TestAppenderatorsManager.java @@ -89,6 +89,7 @@ public class TestAppenderatorsManager implements AppenderatorsManager String taskId, DataSchema schema, AppenderatorConfig config, + boolean storeCompactionState, FireDepartmentMetrics metrics, DataSegmentPusher dataSegmentPusher, ObjectMapper objectMapper, @@ -99,6 +100,7 @@ public class TestAppenderatorsManager implements AppenderatorsManager return Appenderators.createOffline( schema, config, + storeCompactionState, metrics, dataSegmentPusher, objectMapper, diff --git a/integration-tests/src/test/resources/results/auth_test_sys_schema_segments.json b/integration-tests/src/test/resources/results/auth_test_sys_schema_segments.json index 4437e725e28..a0efc0f8566 100644 --- a/integration-tests/src/test/resources/results/auth_test_sys_schema_segments.json +++ b/integration-tests/src/test/resources/results/auth_test_sys_schema_segments.json @@ -13,6 +13,6 @@ "is_available": 1, "is_realtime": 0, "is_overshadowed": 0, - "payload": "{\"dataSource\":\"auth_test\",\"interval\":\"2012-12-29T00:00:00.000Z/2013-01-10T08:00:00.000Z\",\"version\":\"2013-01-10T08:13:47.830Z_v9\",\"loadSpec\":{\"load spec is pruned, because it's not needed on Brokers, but eats a lot of heap 
space\":\"\"},\"dimensions\":\"anonymous,area_code,city,continent_code,country_name,dma_code,geo,language,namespace,network,newpage,page,postal_code,region_lookup,robot,unpatrolled,user\",\"metrics\":\"added,count,deleted,delta,delta_hist,unique_users,variation\",\"shardSpec\":{\"type\":\"none\"},\"binaryVersion\":9,\"size\":446027801,\"identifier\":\"auth_test_2012-12-29T00:00:00.000Z_2013-01-10T08:00:00.000Z_2013-01-10T08:13:47.830Z_v9\",\"overshadowed\":false}" + "payload": "{\"dataSource\":\"auth_test\",\"interval\":\"2012-12-29T00:00:00.000Z/2013-01-10T08:00:00.000Z\",\"version\":\"2013-01-10T08:13:47.830Z_v9\",\"loadSpec\":{\"load spec is pruned, because it's not needed on Brokers, but eats a lot of heap space\":\"\"},\"dimensions\":\"anonymous,area_code,city,continent_code,country_name,dma_code,geo,language,namespace,network,newpage,page,postal_code,region_lookup,robot,unpatrolled,user\",\"metrics\":\"added,count,deleted,delta,delta_hist,unique_users,variation\",\"shardSpec\":{\"type\":\"none\"},\"lastCompactionState\":null,\"binaryVersion\":9,\"size\":446027801,\"identifier\":\"auth_test_2012-12-29T00:00:00.000Z_2013-01-10T08:00:00.000Z_2013-01-10T08:13:47.830Z_v9\",\"overshadowed\":false}" } ] diff --git a/processing/src/test/java/org/apache/druid/segment/TestHelper.java b/processing/src/test/java/org/apache/druid/segment/TestHelper.java index 36d6115dae0..a5dd96c4591 100644 --- a/processing/src/test/java/org/apache/druid/segment/TestHelper.java +++ b/processing/src/test/java/org/apache/druid/segment/TestHelper.java @@ -36,7 +36,7 @@ import org.apache.druid.query.timeseries.TimeseriesResultValue; import org.apache.druid.query.topn.TopNResultValue; import org.apache.druid.segment.column.ColumnConfig; import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory; -import org.apache.druid.timeline.DataSegment; +import org.apache.druid.timeline.DataSegment.PruneSpecsHolder; import org.junit.Assert; import java.util.HashMap; @@ -80,7 +80,7 @@ public class TestHelper new InjectableValues.Std() .addValue(ExprMacroTable.class.getName(), TestExprMacroTable.INSTANCE) .addValue(ObjectMapper.class.getName(), mapper) - .addValue(DataSegment.PruneLoadSpecHolder.class, DataSegment.PruneLoadSpecHolder.DEFAULT) + .addValue(PruneSpecsHolder.class, PruneSpecsHolder.DEFAULT) ); return mapper; } diff --git a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactQuery.java b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactQuery.java index 887d4b86e58..ffb283cfe2c 100644 --- a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactQuery.java +++ b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactQuery.java @@ -22,7 +22,6 @@ package org.apache.druid.client.indexing; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; -import javax.annotation.Nullable; import java.util.Map; import java.util.Objects; @@ -34,8 +33,6 @@ public class ClientCompactQuery implements ClientQuery { private final String dataSource; private final ClientCompactionIOConfig ioConfig; - @Nullable - private final Long targetCompactionSizeBytes; private final ClientCompactQueryTuningConfig tuningConfig; private final Map context; @@ -43,14 +40,12 @@ public class ClientCompactQuery implements ClientQuery public ClientCompactQuery( @JsonProperty("dataSource") String dataSource, @JsonProperty("ioConfig") ClientCompactionIOConfig ioConfig, - @JsonProperty("targetCompactionSizeBytes") @Nullable Long 
targetCompactionSizeBytes, @JsonProperty("tuningConfig") ClientCompactQueryTuningConfig tuningConfig, @JsonProperty("context") Map context ) { this.dataSource = dataSource; this.ioConfig = ioConfig; - this.targetCompactionSizeBytes = targetCompactionSizeBytes; this.tuningConfig = tuningConfig; this.context = context; } @@ -75,13 +70,6 @@ public class ClientCompactQuery implements ClientQuery return ioConfig; } - @JsonProperty - @Nullable - public Long getTargetCompactionSizeBytes() - { - return targetCompactionSizeBytes; - } - @JsonProperty public ClientCompactQueryTuningConfig getTuningConfig() { @@ -106,7 +94,6 @@ public class ClientCompactQuery implements ClientQuery ClientCompactQuery that = (ClientCompactQuery) o; return Objects.equals(dataSource, that.dataSource) && Objects.equals(ioConfig, that.ioConfig) && - Objects.equals(targetCompactionSizeBytes, that.targetCompactionSizeBytes) && Objects.equals(tuningConfig, that.tuningConfig) && Objects.equals(context, that.context); } @@ -114,7 +101,7 @@ public class ClientCompactQuery implements ClientQuery @Override public int hashCode() { - return Objects.hash(dataSource, ioConfig, targetCompactionSizeBytes, tuningConfig, context); + return Objects.hash(dataSource, ioConfig, tuningConfig, context); } @Override @@ -123,7 +110,6 @@ public class ClientCompactQuery implements ClientQuery return "ClientCompactQuery{" + "dataSource='" + dataSource + '\'' + ", ioConfig=" + ioConfig + - ", targetCompactionSizeBytes=" + targetCompactionSizeBytes + ", tuningConfig=" + tuningConfig + ", context=" + context + '}'; diff --git a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactQueryTuningConfig.java b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactQueryTuningConfig.java index 343bf49e445..cedacc8498d 100644 --- a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactQueryTuningConfig.java +++ b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactQueryTuningConfig.java @@ -116,6 +116,11 @@ public class ClientCompactQueryTuningConfig return maxTotalRows; } + public long getMaxTotalRowsOr(long defaultMaxTotalRows) + { + return maxTotalRows == null ? 
defaultMaxTotalRows : maxTotalRows; + } + @JsonProperty @Nullable public IndexSpec getIndexSpec() diff --git a/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java b/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java index 0efb9c97c84..38e2501a4ad 100644 --- a/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java +++ b/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java @@ -73,13 +73,12 @@ public class HttpIndexingServiceClient implements IndexingServiceClient @Override public String compactSegments( List segments, - @Nullable Long targetCompactionSizeBytes, int compactionTaskPriority, ClientCompactQueryTuningConfig tuningConfig, @Nullable Map context ) { - Preconditions.checkArgument(segments.size() > 1, "Expect two or more segments to compact"); + Preconditions.checkArgument(!segments.isEmpty(), "Expect non-empty segments to compact"); final String dataSource = segments.get(0).getDataSource(); Preconditions.checkArgument( @@ -94,7 +93,6 @@ public class HttpIndexingServiceClient implements IndexingServiceClient new ClientCompactQuery( dataSource, new ClientCompactionIOConfig(ClientCompactionIntervalSpec.fromSegments(segments)), - targetCompactionSizeBytes, tuningConfig, context ) diff --git a/server/src/main/java/org/apache/druid/client/indexing/IndexingServiceClient.java b/server/src/main/java/org/apache/druid/client/indexing/IndexingServiceClient.java index 39fd93fa772..4843d9ff706 100644 --- a/server/src/main/java/org/apache/druid/client/indexing/IndexingServiceClient.java +++ b/server/src/main/java/org/apache/druid/client/indexing/IndexingServiceClient.java @@ -37,7 +37,6 @@ public interface IndexingServiceClient String compactSegments( List segments, - @Nullable Long targetCompactionSizeBytes, int compactionTaskPriority, @Nullable ClientCompactQueryTuningConfig tuningConfig, @Nullable Map context diff --git a/server/src/main/java/org/apache/druid/segment/indexing/RealtimeTuningConfig.java b/server/src/main/java/org/apache/druid/segment/indexing/RealtimeTuningConfig.java index c62c52bff5e..827a030d38c 100644 --- a/server/src/main/java/org/apache/druid/segment/indexing/RealtimeTuningConfig.java +++ b/server/src/main/java/org/apache/druid/segment/indexing/RealtimeTuningConfig.java @@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; import com.google.common.io.Files; +import org.apache.druid.indexer.partitions.PartitionsSpec; import org.apache.druid.java.util.common.ISE; import org.apache.druid.segment.IndexSpec; import org.apache.druid.segment.realtime.appenderator.AppenderatorConfig; @@ -225,6 +226,12 @@ public class RealtimeTuningConfig implements TuningConfig, AppenderatorConfig return maxPendingPersists; } + @Override + public PartitionsSpec getPartitionsSpec() + { + throw new UnsupportedOperationException(); + } + @JsonProperty public ShardSpec getShardSpec() { diff --git a/server/src/main/java/org/apache/druid/segment/realtime/FireDepartment.java b/server/src/main/java/org/apache/druid/segment/realtime/FireDepartment.java index f448ce1df0d..2edf910c003 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/FireDepartment.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/FireDepartment.java @@ -27,7 +27,6 @@ import org.apache.druid.segment.indexing.DataSchema; import 
org.apache.druid.segment.indexing.IngestionSpec; import org.apache.druid.segment.indexing.RealtimeIOConfig; import org.apache.druid.segment.indexing.RealtimeTuningConfig; -import org.apache.druid.segment.realtime.plumber.Plumber; import java.io.IOException; @@ -88,11 +87,6 @@ public class FireDepartment extends IngestionSpec( - String.CASE_INSENSITIVE_ORDER - ), - objectMapper, - emitter, - conglomerate, - queryExecutorService, - Preconditions.checkNotNull(cache, "cache"), - cacheConfig, - cachePopulatorStats - ), - indexIO, - indexMerger, - cache - ); - log.info("Created Appenderator for dataSource[%s].", schema.getDataSource()); - } - /** * This constructor allows the caller to provide its own SinkQuerySegmentWalker. * @@ -210,11 +164,12 @@ public class AppenderatorImpl implements Appenderator AppenderatorImpl( DataSchema schema, AppenderatorConfig tuningConfig, + boolean storeCompactionState, FireDepartmentMetrics metrics, DataSegmentPusher dataSegmentPusher, ObjectMapper objectMapper, DataSegmentAnnouncer segmentAnnouncer, - SinkQuerySegmentWalker sinkQuerySegmentWalker, + @Nullable SinkQuerySegmentWalker sinkQuerySegmentWalker, IndexIO indexIO, IndexMerger indexMerger, Cache cache @@ -222,6 +177,7 @@ public class AppenderatorImpl implements Appenderator { this.schema = Preconditions.checkNotNull(schema, "schema"); this.tuningConfig = Preconditions.checkNotNull(tuningConfig, "tuningConfig"); + this.storeCompactionState = storeCompactionState; this.metrics = Preconditions.checkNotNull(metrics, "metrics"); this.dataSegmentPusher = Preconditions.checkNotNull(dataSegmentPusher, "dataSegmentPusher"); this.objectMapper = Preconditions.checkNotNull(objectMapper, "objectMapper"); @@ -243,7 +199,6 @@ public class AppenderatorImpl implements Appenderator log.info("Created Appenderator for dataSource[%s].", schema.getDataSource()); } - @Override public String getDataSource() { @@ -429,10 +384,15 @@ public class AppenderatorImpl implements Appenderator Sink retVal = sinks.get(identifier); if (retVal == null) { + final Map indexSpecMap = objectMapper.convertValue( + tuningConfig.getIndexSpec(), + new TypeReference>() {} + ); retVal = new Sink( identifier.getInterval(), schema, identifier.getShardSpec(), + storeCompactionState ? new CompactionState(tuningConfig.getPartitionsSpec(), indexSpecMap) : null, identifier.getVersion(), tuningConfig.getMaxRowsInMemory(), maxBytesTuningConfig, @@ -796,8 +756,7 @@ public class AppenderatorImpl implements Appenderator // semantics. 
() -> dataSegmentPusher.push( mergedFile, - sink.getSegment() - .withDimensions(IndexMerger.getMergedDimensionsFromQueryableIndexes(indexes)), + sink.getSegment().withDimensions(IndexMerger.getMergedDimensionsFromQueryableIndexes(indexes)), useUniquePath ), exception -> exception instanceof Exception, @@ -1104,6 +1063,7 @@ public class AppenderatorImpl implements Appenderator identifier.getInterval(), schema, identifier.getShardSpec(), + null, identifier.getVersion(), tuningConfig.getMaxRowsInMemory(), maxBytesTuningConfig, diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderators.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderators.java index d58772ade6a..f0ad1401379 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderators.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderators.java @@ -20,6 +20,7 @@ package org.apache.druid.segment.realtime.appenderator; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Preconditions; import org.apache.druid.client.cache.Cache; import org.apache.druid.client.cache.CacheConfig; import org.apache.druid.client.cache.CachePopulatorStats; @@ -32,6 +33,7 @@ import org.apache.druid.segment.loading.DataSegmentPusher; import org.apache.druid.segment.realtime.FireDepartmentMetrics; import org.apache.druid.server.coordination.DataSegmentAnnouncer; import org.apache.druid.timeline.DataSegment; +import org.apache.druid.timeline.VersionedIntervalTimeline; import java.util.concurrent.ExecutorService; @@ -57,24 +59,34 @@ public class Appenderators return new AppenderatorImpl( schema, config, + false, metrics, dataSegmentPusher, objectMapper, - conglomerate, segmentAnnouncer, - emitter, - queryExecutorService, + new SinkQuerySegmentWalker( + schema.getDataSource(), + new VersionedIntervalTimeline<>( + String.CASE_INSENSITIVE_ORDER + ), + objectMapper, + emitter, + conglomerate, + queryExecutorService, + Preconditions.checkNotNull(cache, "cache"), + cacheConfig, + cachePopulatorStats + ), indexIO, indexMerger, - cache, - cacheConfig, - cachePopulatorStats + cache ); } public static Appenderator createOffline( DataSchema schema, AppenderatorConfig config, + boolean storeCompactionState, FireDepartmentMetrics metrics, DataSegmentPusher dataSegmentPusher, ObjectMapper objectMapper, @@ -85,10 +97,10 @@ public class Appenderators return new AppenderatorImpl( schema, config, + storeCompactionState, metrics, dataSegmentPusher, objectMapper, - null, new DataSegmentAnnouncer() { @Override @@ -116,11 +128,8 @@ public class Appenderators } }, null, - null, indexIO, indexMerger, - null, - null, null ); } diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorsManager.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorsManager.java index 100bbe39d34..830f4b8c99a 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorsManager.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorsManager.java @@ -86,6 +86,7 @@ public interface AppenderatorsManager String taskId, DataSchema schema, AppenderatorConfig config, + boolean storeCompactionState, FireDepartmentMetrics metrics, DataSegmentPusher dataSegmentPusher, ObjectMapper objectMapper, diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/DefaultOfflineAppenderatorFactory.java 
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/DefaultOfflineAppenderatorFactory.java index be85e2c9d27..2658fd1dc3c 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/DefaultOfflineAppenderatorFactory.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/DefaultOfflineAppenderatorFactory.java @@ -54,6 +54,15 @@ public class DefaultOfflineAppenderatorFactory implements AppenderatorFactory @Override public Appenderator build(DataSchema schema, RealtimeTuningConfig config, FireDepartmentMetrics metrics) { - return Appenderators.createOffline(schema, config, metrics, dataSegmentPusher, objectMapper, indexIO, indexMerger); + return Appenderators.createOffline( + schema, + config, + false, + metrics, + dataSegmentPusher, + objectMapper, + indexIO, + indexMerger + ); } } diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/DummyForInjectionAppenderatorsManager.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/DummyForInjectionAppenderatorsManager.java index 06556524c3c..c1be5ff2e00 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/DummyForInjectionAppenderatorsManager.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/DummyForInjectionAppenderatorsManager.java @@ -78,6 +78,7 @@ public class DummyForInjectionAppenderatorsManager implements AppenderatorsManag String taskId, DataSchema schema, AppenderatorConfig config, + boolean storeCompactionState, FireDepartmentMetrics metrics, DataSegmentPusher dataSegmentPusher, ObjectMapper objectMapper, diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/PeonAppenderatorsManager.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/PeonAppenderatorsManager.java index a4ba3a05b01..2a18936b86d 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/PeonAppenderatorsManager.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/PeonAppenderatorsManager.java @@ -106,6 +106,7 @@ public class PeonAppenderatorsManager implements AppenderatorsManager String taskId, DataSchema schema, AppenderatorConfig config, + boolean storeCompactionState, FireDepartmentMetrics metrics, DataSegmentPusher dataSegmentPusher, ObjectMapper objectMapper, @@ -120,6 +121,7 @@ public class PeonAppenderatorsManager implements AppenderatorsManager batchAppenderator = Appenderators.createOffline( schema, config, + storeCompactionState, metrics, dataSegmentPusher, objectMapper, diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java index ccbc0da03ef..5327a7537c0 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java @@ -31,6 +31,7 @@ import org.apache.druid.client.cache.Cache; import org.apache.druid.client.cache.CacheConfig; import org.apache.druid.client.cache.CachePopulatorStats; import org.apache.druid.guice.annotations.Processing; +import org.apache.druid.indexer.partitions.PartitionsSpec; import org.apache.druid.indexing.worker.config.WorkerConfig; import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.UOE; @@ -164,6 
+165,7 @@ public class UnifiedIndexerAppenderatorsManager implements AppenderatorsManager Appenderator appenderator = new AppenderatorImpl( schema, rewriteAppenderatorConfigMemoryLimits(config), + false, metrics, dataSegmentPusher, objectMapper, @@ -184,6 +186,7 @@ public class UnifiedIndexerAppenderatorsManager implements AppenderatorsManager String taskId, DataSchema schema, AppenderatorConfig config, + boolean storeCompactionState, FireDepartmentMetrics metrics, DataSegmentPusher dataSegmentPusher, ObjectMapper objectMapper, @@ -194,14 +197,13 @@ public class UnifiedIndexerAppenderatorsManager implements AppenderatorsManager synchronized (this) { DatasourceBundle datasourceBundle = datasourceBundles.computeIfAbsent( schema.getDataSource(), - (datasource) -> { - return new DatasourceBundle(datasource); - } + DatasourceBundle::new ); Appenderator appenderator = Appenderators.createOffline( schema, rewriteAppenderatorConfigMemoryLimits(config), + storeCompactionState, metrics, dataSegmentPusher, objectMapper, @@ -397,6 +399,12 @@ public class UnifiedIndexerAppenderatorsManager implements AppenderatorsManager return baseConfig.getMaxTotalRows(); } + @Override + public PartitionsSpec getPartitionsSpec() + { + return baseConfig.getPartitionsSpec(); + } + @Override public Period getIntermediatePersistPeriod() { @@ -471,7 +479,7 @@ public class UnifiedIndexerAppenderatorsManager implements AppenderatorsManager File outDir, IndexSpec indexSpec, @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory - ) throws IOException + ) { ListenableFuture mergeFuture = mergeExecutor.submit( new Callable() @@ -511,7 +519,7 @@ public class UnifiedIndexerAppenderatorsManager implements AppenderatorsManager File outDir, IndexSpec indexSpec, @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory - ) throws IOException + ) { ListenableFuture mergeFuture = mergeExecutor.submit( new Callable() @@ -550,7 +558,7 @@ public class UnifiedIndexerAppenderatorsManager implements AppenderatorsManager AggregatorFactory[] metricAggs, File outDir, IndexSpec indexSpec - ) throws IOException + ) { throw new UOE(ERROR_MSG); } @@ -560,7 +568,7 @@ public class UnifiedIndexerAppenderatorsManager implements AppenderatorsManager File inDir, File outDir, IndexSpec indexSpec - ) throws IOException + ) { throw new UOE(ERROR_MSG); } @@ -572,7 +580,7 @@ public class UnifiedIndexerAppenderatorsManager implements AppenderatorsManager File outDir, IndexSpec indexSpec, @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory - ) throws IOException + ) { throw new UOE(ERROR_MSG); } @@ -583,7 +591,7 @@ public class UnifiedIndexerAppenderatorsManager implements AppenderatorsManager File outDir, IndexSpec indexSpec, @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory - ) throws IOException + ) { throw new UOE(ERROR_MSG); } @@ -596,7 +604,7 @@ public class UnifiedIndexerAppenderatorsManager implements AppenderatorsManager IndexSpec indexSpec, ProgressIndicator progress, @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory - ) throws IOException + ) { throw new UOE(ERROR_MSG); } @@ -610,7 +618,7 @@ public class UnifiedIndexerAppenderatorsManager implements AppenderatorsManager IndexSpec indexSpec, ProgressIndicator progress, @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory - ) throws IOException + ) { throw new UOE(ERROR_MSG); } diff --git a/server/src/main/java/org/apache/druid/segment/realtime/plumber/RealtimePlumber.java 
b/server/src/main/java/org/apache/druid/segment/realtime/plumber/RealtimePlumber.java index 98dcdfa8895..6db28ce87ce 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/plumber/RealtimePlumber.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/plumber/RealtimePlumber.java @@ -717,6 +717,7 @@ public class RealtimePlumber implements Plumber sinkInterval, schema, config.getShardSpec(), + null, versioningPolicy.getVersion(sinkInterval), config.getMaxRowsInMemory(), TuningConfigs.getMaxBytesInMemoryOrDefault(config.getMaxBytesInMemory()), diff --git a/server/src/main/java/org/apache/druid/segment/realtime/plumber/Sink.java b/server/src/main/java/org/apache/druid/segment/realtime/plumber/Sink.java index d3218fbc024..c62533cfce8 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/plumber/Sink.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/plumber/Sink.java @@ -36,11 +36,13 @@ import org.apache.druid.segment.incremental.IncrementalIndexSchema; import org.apache.druid.segment.incremental.IndexSizeExceededException; import org.apache.druid.segment.indexing.DataSchema; import org.apache.druid.segment.realtime.FireHydrant; +import org.apache.druid.timeline.CompactionState; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.Overshadowable; import org.apache.druid.timeline.partition.ShardSpec; import org.joda.time.Interval; +import javax.annotation.Nullable; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; @@ -62,6 +64,8 @@ public class Sink implements Iterable, Overshadowable private final Interval interval; private final DataSchema schema; private final ShardSpec shardSpec; + @Nullable + private final CompactionState compactionState; private final String version; private final int maxRowsInMemory; private final long maxBytesInMemory; @@ -85,22 +89,51 @@ public class Sink implements Iterable, Overshadowable String dedupColumn ) { - this.schema = schema; - this.shardSpec = shardSpec; - this.interval = interval; - this.version = version; - this.maxRowsInMemory = maxRowsInMemory; - this.maxBytesInMemory = maxBytesInMemory; - this.reportParseExceptions = reportParseExceptions; - this.dedupColumn = dedupColumn; - - makeNewCurrIndex(interval.getStartMillis(), schema); + this( + interval, + schema, + shardSpec, + null, + version, + maxRowsInMemory, + maxBytesInMemory, + reportParseExceptions, + dedupColumn, + Collections.emptyList() + ); } public Sink( Interval interval, DataSchema schema, ShardSpec shardSpec, + @Nullable CompactionState compactionState, + String version, + int maxRowsInMemory, + long maxBytesInMemory, + boolean reportParseExceptions, + String dedupColumn + ) + { + this( + interval, + schema, + shardSpec, + compactionState, + version, + maxRowsInMemory, + maxBytesInMemory, + reportParseExceptions, + dedupColumn, + Collections.emptyList() + ); + } + + public Sink( + Interval interval, + DataSchema schema, + ShardSpec shardSpec, + @Nullable CompactionState compactionState, String version, int maxRowsInMemory, long maxBytesInMemory, @@ -111,6 +144,7 @@ public class Sink implements Iterable, Overshadowable { this.schema = schema; this.shardSpec = shardSpec; + this.compactionState = compactionState; this.interval = interval; this.version = version; this.maxRowsInMemory = maxRowsInMemory; @@ -244,6 +278,7 @@ public class Sink implements Iterable, Overshadowable Collections.emptyList(), Lists.transform(Arrays.asList(schema.getAggregators()), AggregatorFactory::getName), 
shardSpec, + compactionState, null, 0 ); diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DataSourceCompactionConfig.java b/server/src/main/java/org/apache/druid/server/coordinator/DataSourceCompactionConfig.java index 9c42a11b073..da5cde019a3 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/DataSourceCompactionConfig.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/DataSourceCompactionConfig.java @@ -43,8 +43,6 @@ public class DataSourceCompactionConfig private final String dataSource; private final int taskPriority; private final long inputSegmentSizeBytes; - @Nullable - private final Long targetCompactionSizeBytes; // The number of input segments is limited because the byte size of a serialized task spec is limited by // RemoteTaskRunnerConfig.maxZnodeBytes. @Nullable @@ -58,7 +56,6 @@ public class DataSourceCompactionConfig @JsonProperty("dataSource") String dataSource, @JsonProperty("taskPriority") @Nullable Integer taskPriority, @JsonProperty("inputSegmentSizeBytes") @Nullable Long inputSegmentSizeBytes, - @JsonProperty("targetCompactionSizeBytes") @Nullable Long targetCompactionSizeBytes, @JsonProperty("maxRowsPerSegment") @Nullable Integer maxRowsPerSegment, @JsonProperty("skipOffsetFromLatest") @Nullable Period skipOffsetFromLatest, @JsonProperty("tuningConfig") @Nullable UserCompactTuningConfig tuningConfig, @@ -72,62 +69,12 @@ public class DataSourceCompactionConfig this.inputSegmentSizeBytes = inputSegmentSizeBytes == null ? DEFAULT_INPUT_SEGMENT_SIZE_BYTES : inputSegmentSizeBytes; - this.targetCompactionSizeBytes = getValidTargetCompactionSizeBytes( - targetCompactionSizeBytes, - maxRowsPerSegment, - tuningConfig - ); this.maxRowsPerSegment = maxRowsPerSegment; this.skipOffsetFromLatest = skipOffsetFromLatest == null ? DEFAULT_SKIP_OFFSET_FROM_LATEST : skipOffsetFromLatest; this.tuningConfig = tuningConfig; this.taskContext = taskContext; } - /** - * This method is copied from {@code CompactionTask#getValidTargetCompactionSizeBytes}. The only difference is this - * method doesn't check 'numShards' which is not supported by {@link UserCompactTuningConfig}. - * - * Currently, we can't use the same method here because it's in a different module. Until we figure out how to reuse - * the same method, this method must be synced with {@code CompactionTask#getValidTargetCompactionSizeBytes}. - */ - @Nullable - private static Long getValidTargetCompactionSizeBytes( - @Nullable Long targetCompactionSizeBytes, - @Nullable Integer maxRowsPerSegment, - @Nullable UserCompactTuningConfig tuningConfig - ) - { - if (targetCompactionSizeBytes != null) { - Preconditions.checkArgument( - !hasPartitionConfig(maxRowsPerSegment, tuningConfig), - "targetCompactionSizeBytes[%s] cannot be used with maxRowsPerSegment[%s] and maxTotalRows[%s]", - targetCompactionSizeBytes, - maxRowsPerSegment, - tuningConfig == null ? null : tuningConfig.getMaxTotalRows() - ); - return targetCompactionSizeBytes; - } else { - return hasPartitionConfig(maxRowsPerSegment, tuningConfig) ? null : DEFAULT_TARGET_COMPACTION_SIZE_BYTES; - } - } - - /** - * his method is copied from {@code CompactionTask#hasPartitionConfig}. The two differences are - * 1) this method doesn't check 'numShards' which is not supported by {@link UserCompactTuningConfig}, and - * 2) this method accepts an additional 'maxRowsPerSegment' parameter since it's not supported by - * {@link UserCompactTuningConfig}. 
- * - * Currently, we can't use the same method here because it's in a different module. Until we figure out how to reuse - * the same method, this method must be synced with {@code CompactionTask#hasPartitionConfig}. - */ - private static boolean hasPartitionConfig( - @Nullable Integer maxRowsPerSegment, - @Nullable UserCompactTuningConfig tuningConfig - ) - { - return maxRowsPerSegment != null || (tuningConfig != null && tuningConfig.getMaxTotalRows() != null); - } - @JsonProperty public String getDataSource() { @@ -146,13 +93,6 @@ public class DataSourceCompactionConfig return inputSegmentSizeBytes; } - @JsonProperty - @Nullable - public Long getTargetCompactionSizeBytes() - { - return targetCompactionSizeBytes; - } - @JsonProperty @Nullable public Integer getMaxRowsPerSegment() @@ -193,7 +133,6 @@ public class DataSourceCompactionConfig return taskPriority == that.taskPriority && inputSegmentSizeBytes == that.inputSegmentSizeBytes && Objects.equals(dataSource, that.dataSource) && - Objects.equals(targetCompactionSizeBytes, that.targetCompactionSizeBytes) && Objects.equals(skipOffsetFromLatest, that.skipOffsetFromLatest) && Objects.equals(tuningConfig, that.tuningConfig) && Objects.equals(taskContext, that.taskContext); @@ -206,7 +145,6 @@ public class DataSourceCompactionConfig dataSource, taskPriority, inputSegmentSizeBytes, - targetCompactionSizeBytes, skipOffsetFromLatest, tuningConfig, taskContext diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java index 765cb30343c..e98e8d38e9c 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java @@ -167,7 +167,8 @@ public class DruidCoordinator @CoordinatorIndexingServiceHelper Set indexingServiceHelpers, BalancerStrategyFactory factory, LookupCoordinatorManager lookupCoordinatorManager, - @Coordinator DruidLeaderSelector coordLeaderSelector + @Coordinator DruidLeaderSelector coordLeaderSelector, + DruidCoordinatorSegmentCompactor segmentCompactor ) { this( @@ -188,7 +189,8 @@ public class DruidCoordinator indexingServiceHelpers, factory, lookupCoordinatorManager, - coordLeaderSelector + coordLeaderSelector, + segmentCompactor ); } @@ -210,7 +212,8 @@ public class DruidCoordinator Set indexingServiceHelpers, BalancerStrategyFactory factory, LookupCoordinatorManager lookupCoordinatorManager, - DruidLeaderSelector coordLeaderSelector + DruidLeaderSelector coordLeaderSelector, + DruidCoordinatorSegmentCompactor segmentCompactor ) { this.config = config; @@ -235,7 +238,7 @@ public class DruidCoordinator this.lookupCoordinatorManager = lookupCoordinatorManager; this.coordLeaderSelector = coordLeaderSelector; - this.segmentCompactor = new DruidCoordinatorSegmentCompactor(indexingServiceClient); + this.segmentCompactor = segmentCompactor; } public boolean isLeader() diff --git a/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactor.java b/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactor.java index 04ae6576a51..e6a6705385b 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactor.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactor.java @@ -19,6 +19,7 @@ package org.apache.druid.server.coordinator.helper; +import 
com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.Iterables; import com.google.inject.Inject; import it.unimi.dsi.fastutil.objects.Object2LongMap; @@ -37,6 +38,7 @@ import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.VersionedIntervalTimeline; import org.joda.time.Interval; +import javax.annotation.Nullable; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -51,16 +53,22 @@ public class DruidCoordinatorSegmentCompactor implements DruidCoordinatorHelper // Should be synced with CompactionTask.TYPE private static final String COMPACT_TASK_TYPE = "compact"; + // Should be synced with Tasks.STORE_COMPACTION_STATE_KEY + private static final String STORE_COMPACTION_STATE_KEY = "storeCompactionState"; private static final Logger LOG = new Logger(DruidCoordinatorSegmentCompactor.class); - private final CompactionSegmentSearchPolicy policy = new NewestSegmentFirstPolicy(); + private final CompactionSegmentSearchPolicy policy; private final IndexingServiceClient indexingServiceClient; private Object2LongMap remainingSegmentSizeBytes; @Inject - public DruidCoordinatorSegmentCompactor(IndexingServiceClient indexingServiceClient) + public DruidCoordinatorSegmentCompactor( + ObjectMapper objectMapper, + IndexingServiceClient indexingServiceClient + ) { + this.policy = new NewestSegmentFirstPolicy(objectMapper); this.indexingServiceClient = indexingServiceClient; } @@ -158,33 +166,39 @@ public class DruidCoordinatorSegmentCompactor implements DruidCoordinatorHelper for (; iterator.hasNext() && numSubmittedTasks < numAvailableCompactionTaskSlots; numSubmittedTasks++) { final List segmentsToCompact = iterator.next(); - final String dataSourceName = segmentsToCompact.get(0).getDataSource(); - if (segmentsToCompact.size() > 1) { + if (!segmentsToCompact.isEmpty()) { + final String dataSourceName = segmentsToCompact.get(0).getDataSource(); final DataSourceCompactionConfig config = compactionConfigs.get(dataSourceName); // make tuningConfig final String taskId = indexingServiceClient.compactSegments( segmentsToCompact, - config.getTargetCompactionSizeBytes(), config.getTaskPriority(), ClientCompactQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment()), - config.getTaskContext() + newAutoCompactionContext(config.getTaskContext()) ); LOG.info( "Submitted a compactTask[%s] for segments %s", taskId, Iterables.transform(segmentsToCompact, DataSegment::getId) ); - } else if (segmentsToCompact.size() == 1) { - throw new ISE("Found one segments[%s] to compact", segmentsToCompact); } else { - throw new ISE("Failed to find segments for dataSource[%s]", dataSourceName); + throw new ISE("segmentsToCompact is empty?"); } } return makeStats(numSubmittedTasks, iterator); } + private Map newAutoCompactionContext(@Nullable Map configuredContext) + { + final Map newContext = configuredContext == null + ? 
new HashMap<>() + : new HashMap<>(configuredContext); + newContext.put(STORE_COMPACTION_STATE_KEY, true); + return newContext; + } + private CoordinatorStats makeStats(int numCompactionTasks, CompactionSegmentIterator iterator) { final CoordinatorStats stats = new CoordinatorStats(); diff --git a/server/src/main/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstIterator.java b/server/src/main/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstIterator.java index 06a5d6a60db..1ade7403668 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstIterator.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstIterator.java @@ -19,15 +19,20 @@ package org.apache.druid.server.coordinator.helper; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import it.unimi.dsi.fastutil.objects.Object2LongOpenHashMap; +import org.apache.druid.indexer.partitions.DynamicPartitionsSpec; +import org.apache.druid.indexer.partitions.PartitionsSpec; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.JodaUtils; import org.apache.druid.java.util.common.guava.Comparators; import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.segment.IndexSpec; import org.apache.druid.server.coordinator.DataSourceCompactionConfig; +import org.apache.druid.timeline.CompactionState; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.TimelineObjectHolder; import org.apache.druid.timeline.VersionedIntervalTimeline; @@ -45,6 +50,7 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.NoSuchElementException; +import java.util.Objects; import java.util.PriorityQueue; import java.util.stream.Collectors; import java.util.stream.StreamSupport; @@ -56,6 +62,7 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator { private static final Logger log = new Logger(NewestSegmentFirstIterator.class); + private final ObjectMapper objectMapper; private final Map compactionConfigs; private final Map> dataSources; @@ -69,11 +76,13 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator ); NewestSegmentFirstIterator( + ObjectMapper objectMapper, Map compactionConfigs, Map> dataSources, Map> skipIntervals ) { + this.objectMapper = objectMapper; this.compactionConfigs = compactionConfigs; this.dataSources = dataSources; this.timelineIterators = new HashMap<>(dataSources.size()); @@ -84,7 +93,11 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator final DataSourceCompactionConfig config = compactionConfigs.get(dataSource); if (config != null && !timeline.isEmpty()) { - final List searchIntervals = findInitialSearchInterval(timeline, config.getSkipOffsetFromLatest(), skipIntervals.get(dataSource)); + final List searchIntervals = findInitialSearchInterval( + timeline, + config.getSkipOffsetFromLatest(), + skipIntervals.get(dataSource) + ); if (!searchIntervals.isEmpty()) { timelineIterators.put(dataSource, new CompactibleTimelineObjectHolderCursor(timeline, searchIntervals)); } @@ -175,14 +188,14 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator config ); - if (segmentsToCompact.getNumSegments() > 1) { + if (!segmentsToCompact.isEmpty()) { queue.add(new 
QueueEntry(segmentsToCompact.segments)); } } /** * Iterates the given {@link VersionedIntervalTimeline}. Only compactible {@link TimelineObjectHolder}s are returned, - * which means the holder always has at least two {@link DataSegment}s. + * which means the holder always has at least one {@link DataSegment}. */ private static class CompactibleTimelineObjectHolderCursor implements Iterator> { @@ -201,7 +214,7 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator .filter(holder -> { final List> chunks = Lists.newArrayList(holder.getObject().iterator()); final long partitionBytes = chunks.stream().mapToLong(chunk -> chunk.getObject().getSize()).sum(); - return chunks.size() > 1 + return !chunks.isEmpty() && partitionBytes > 0 && interval.contains(chunks.get(0).getObject().getInterval()); }) @@ -229,61 +242,115 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator } } + private boolean needsCompaction(DataSourceCompactionConfig config, SegmentsToCompact candidates) + { + Preconditions.checkState(!candidates.isEmpty(), "Empty candidates"); + final int maxRowsPerSegment = config.getMaxRowsPerSegment() == null + ? PartitionsSpec.DEFAULT_MAX_ROWS_PER_SEGMENT + : config.getMaxRowsPerSegment(); + @Nullable Long maxTotalRows = config.getTuningConfig() == null + ? null + : config.getTuningConfig().getMaxTotalRows(); + maxTotalRows = maxTotalRows == null ? Long.MAX_VALUE : maxTotalRows; + + final CompactionState lastCompactionState = candidates.segments.get(0).getLastCompactionState(); + if (lastCompactionState == null) { + log.info("Candidate segment[%s] is not compacted yet. Needs compaction.", candidates.segments.get(0)); + return true; + } + + final boolean allCandidatesHaveSameLastCompactionState = candidates + .segments + .stream() + .allMatch(segment -> lastCompactionState.equals(segment.getLastCompactionState())); + + if (!allCandidatesHaveSameLastCompactionState) { + log.info("Candidates[%s] were compacted with different partitions specs. Needs compaction.", candidates.segments); + return true; + } + + final PartitionsSpec segmentPartitionsSpec = lastCompactionState.getPartitionsSpec(); + if (!(segmentPartitionsSpec instanceof DynamicPartitionsSpec)) { + log.info( + "Candidate segment[%s] was compacted with a non-dynamic partitions spec. Needs compaction.", + candidates.segments.get(0) + ); + return true; + } + final DynamicPartitionsSpec dynamicPartitionsSpec = (DynamicPartitionsSpec) segmentPartitionsSpec; + final IndexSpec segmentIndexSpec = objectMapper.convertValue(lastCompactionState.getIndexSpec(), IndexSpec.class); + final IndexSpec configuredIndexSpec; + if (config.getTuningConfig() == null || config.getTuningConfig().getIndexSpec() == null) { + configuredIndexSpec = new IndexSpec(); + } else { + configuredIndexSpec = config.getTuningConfig().getIndexSpec(); + } + boolean needsCompaction = false; + if (!Objects.equals(maxRowsPerSegment, dynamicPartitionsSpec.getMaxRowsPerSegment()) + || !Objects.equals(maxTotalRows, dynamicPartitionsSpec.getMaxTotalRows())) { + log.info( + "Configured maxRowsPerSegment[%s] and maxTotalRows[%s] are different from " + + "the partitionsSpec[%s] of segments. Needs compaction.", + maxRowsPerSegment, + maxTotalRows, + dynamicPartitionsSpec + ); + needsCompaction = true; + } + // segmentIndexSpec cannot be null. + if (!segmentIndexSpec.equals(configuredIndexSpec)) { + log.info( + "Configured indexSpec[%s] is different from the one[%s] of segments. 
Needs compaction", + configuredIndexSpec, + segmentIndexSpec + ); + needsCompaction = true; + } + + return needsCompaction; + } + /** * Find segments to compact together for the given intervalToSearch. It progressively searches the given * intervalToSearch in time order (latest first). The timeline lookup duration is one day. It means, the timeline is * looked up for the last one day of the given intervalToSearch, and the next day is searched again if the size of * found segments are not enough to compact. This is repeated until enough amount of segments are found. * - * @param compactibleTimelineObjectHolderCursor timeline iterator - * @param config compaction config - * * @return segments to compact */ - private static SegmentsToCompact findSegmentsToCompact( + private SegmentsToCompact findSegmentsToCompact( final CompactibleTimelineObjectHolderCursor compactibleTimelineObjectHolderCursor, final DataSourceCompactionConfig config ) { final long inputSegmentSize = config.getInputSegmentSizeBytes(); - final @Nullable Long targetCompactionSizeBytes = config.getTargetCompactionSizeBytes(); - // Finds segments to compact together while iterating timeline from latest to oldest while (compactibleTimelineObjectHolderCursor.hasNext()) { final SegmentsToCompact candidates = new SegmentsToCompact(compactibleTimelineObjectHolderCursor.next()); - final boolean isCompactibleSize = candidates.getTotalSize() <= inputSegmentSize; - final boolean needsCompaction = SegmentCompactorUtil.needsCompaction( - targetCompactionSizeBytes, - candidates.segments - ); - if (isCompactibleSize && needsCompaction) { - return candidates; + if (!candidates.isEmpty()) { + final boolean isCompactibleSize = candidates.getTotalSize() <= inputSegmentSize; + final boolean needsCompaction = needsCompaction(config, candidates); + + if (isCompactibleSize && needsCompaction) { + return candidates; + } else { + if (!isCompactibleSize) { + log.warn( + "total segment size[%d] for datasource[%s] and interval[%s] is larger than inputSegmentSize[%d]." + + " Continue to the next interval.", + candidates.getTotalSize(), + candidates.segments.get(0).getDataSource(), + candidates.segments.get(0).getInterval(), + inputSegmentSize + ); + } + } } else { - if (!isCompactibleSize) { - log.warn( - "total segment size[%d] for datasource[%s] and interval[%s] is larger than inputSegmentSize[%d]." - + " Continue to the next interval.", - candidates.getTotalSize(), - candidates.segments.get(0).getDataSource(), - candidates.segments.get(0).getInterval(), - inputSegmentSize - ); - } - if (!needsCompaction) { - log.warn( - "Size of most of segments[%s] is larger than targetCompactionSizeBytes[%s] " - + "for datasource[%s] and interval[%s]. Skipping compaction for this interval.", - candidates.segments.stream().map(DataSegment::getSize).collect(Collectors.toList()), - targetCompactionSizeBytes, - candidates.segments.get(0).getDataSource(), - candidates.segments.get(0).getInterval() - ); - } + throw new ISE("No segment is found?"); } } - - // Return an empty set if nothing is found + log.info("All segments look good! 
Nothing to compact"); return new SegmentsToCompact(); } @@ -458,6 +525,11 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator this.totalSize = segments.stream().mapToLong(DataSegment::getSize).sum(); } + private boolean isEmpty() + { + return segments.isEmpty(); + } + private int getNumSegments() { return segments.size(); diff --git a/server/src/main/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstPolicy.java b/server/src/main/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstPolicy.java index 8a4118221d0..f5f74a20d19 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstPolicy.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstPolicy.java @@ -19,6 +19,7 @@ package org.apache.druid.server.coordinator.helper; +import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.druid.server.coordinator.DataSourceCompactionConfig; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.VersionedIntervalTimeline; @@ -32,6 +33,13 @@ import java.util.Map; */ public class NewestSegmentFirstPolicy implements CompactionSegmentSearchPolicy { + private final ObjectMapper objectMapper; + + public NewestSegmentFirstPolicy(ObjectMapper objectMapper) + { + this.objectMapper = objectMapper; + } + @Override public CompactionSegmentIterator reset( Map compactionConfigs, @@ -39,6 +47,6 @@ public class NewestSegmentFirstPolicy implements CompactionSegmentSearchPolicy Map> skipIntervals ) { - return new NewestSegmentFirstIterator(compactionConfigs, dataSources, skipIntervals); + return new NewestSegmentFirstIterator(objectMapper, compactionConfigs, dataSources, skipIntervals); } } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/helper/SegmentCompactorUtil.java b/server/src/main/java/org/apache/druid/server/coordinator/helper/SegmentCompactorUtil.java index d68c3a0d40b..3473b35abb3 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/helper/SegmentCompactorUtil.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/helper/SegmentCompactorUtil.java @@ -20,38 +20,13 @@ package org.apache.druid.server.coordinator.helper; import com.google.common.base.Preconditions; -import org.apache.druid.timeline.DataSegment; import org.joda.time.Interval; -import javax.annotation.Nullable; -import java.util.List; - /** * Util class used by {@link DruidCoordinatorSegmentCompactor} and {@link CompactionSegmentSearchPolicy}. */ class SegmentCompactorUtil { - /** - * The allowed error rate of the segment size after compaction. - * Its value is determined experimentally. - */ - private static final double ALLOWED_ERROR_OF_SEGMENT_SIZE = .2; - - static boolean needsCompaction(@Nullable Long targetCompactionSizeBytes, List candidates) - { - if (targetCompactionSizeBytes == null) { - // If targetCompactionSizeBytes is null, we have no way to check that the given segments need compaction or not. - return true; - } - final double minTargetThreshold = targetCompactionSizeBytes * (1 - ALLOWED_ERROR_OF_SEGMENT_SIZE); - final double maxTargetThreshold = targetCompactionSizeBytes * (1 + ALLOWED_ERROR_OF_SEGMENT_SIZE); - - return candidates - .stream() - .filter(segment -> segment.getSize() < minTargetThreshold || segment.getSize() > maxTargetThreshold) - .count() > 1; - } - /** * Removes {@code smallInterval} from {@code largeInterval}. The end of both intervals should be same. 
* diff --git a/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java b/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java index 2b8e2f52b81..25d1d7efbb2 100644 --- a/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java +++ b/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java @@ -1761,6 +1761,7 @@ public class CachingClusteredClientTest null, null, new SingleDimensionShardSpec(dimension, start, end, partitionNum), + null, 9, 0L ); diff --git a/server/src/test/java/org/apache/druid/client/ImmutableDruidDataSourceTest.java b/server/src/test/java/org/apache/druid/client/ImmutableDruidDataSourceTest.java index bf3d47f4ca2..bf8e4544e53 100644 --- a/server/src/test/java/org/apache/druid/client/ImmutableDruidDataSourceTest.java +++ b/server/src/test/java/org/apache/druid/client/ImmutableDruidDataSourceTest.java @@ -28,7 +28,7 @@ import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.test.utils.ImmutableDruidDataSourceTestUtils; import org.apache.druid.timeline.DataSegment; -import org.apache.druid.timeline.DataSegment.PruneLoadSpecHolder; +import org.apache.druid.timeline.DataSegment.PruneSpecsHolder; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; @@ -48,7 +48,7 @@ public class ImmutableDruidDataSourceTest final ImmutableDruidDataSource dataSource = getImmutableDruidDataSource(segment); final ObjectMapper objectMapper = new DefaultObjectMapper() - .setInjectableValues(new Std().addValue(PruneLoadSpecHolder.class, PruneLoadSpecHolder.DEFAULT)); + .setInjectableValues(new Std().addValue(PruneSpecsHolder.class, PruneSpecsHolder.DEFAULT)); final String json = objectMapper.writeValueAsString(dataSource); ImmutableDruidDataSourceTestUtils.assertEquals(dataSource, objectMapper.readValue(json, @@ -84,16 +84,17 @@ public class ImmutableDruidDataSourceTest private DataSegment getTestSegment() { return new DataSegment( - "test", - Intervals.of("2017/2018"), - "version", - null, - ImmutableList.of("dim1", "dim2"), - ImmutableList.of("met1", "met2"), - null, - 1, - 100L, - PruneLoadSpecHolder.DEFAULT + "test", + Intervals.of("2017/2018"), + "version", + null, + ImmutableList.of("dim1", "dim2"), + ImmutableList.of("met1", "met2"), + null, + null, + 1, + 100L, + PruneSpecsHolder.DEFAULT ); } diff --git a/server/src/test/java/org/apache/druid/client/indexing/NoopIndexingServiceClient.java b/server/src/test/java/org/apache/druid/client/indexing/NoopIndexingServiceClient.java index 794ea08ad05..172a211604c 100644 --- a/server/src/test/java/org/apache/druid/client/indexing/NoopIndexingServiceClient.java +++ b/server/src/test/java/org/apache/druid/client/indexing/NoopIndexingServiceClient.java @@ -47,7 +47,6 @@ public class NoopIndexingServiceClient implements IndexingServiceClient @Override public String compactSegments( List segments, - @Nullable Long targetCompactionSizeBytes, int compactionTaskPriority, @Nullable ClientCompactQueryTuningConfig tuningConfig, @Nullable Map context diff --git a/server/src/test/java/org/apache/druid/indexing/overlord/SegmentPublishResultTest.java b/server/src/test/java/org/apache/druid/indexing/overlord/SegmentPublishResultTest.java index 1772a9d9c9f..5d4ec61a617 100644 --- a/server/src/test/java/org/apache/druid/indexing/overlord/SegmentPublishResultTest.java +++ b/server/src/test/java/org/apache/druid/indexing/overlord/SegmentPublishResultTest.java @@ -25,7 +25,7 @@ 
import com.google.common.collect.ImmutableSet; import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.timeline.DataSegment; -import org.apache.druid.timeline.DataSegment.PruneLoadSpecHolder; +import org.apache.druid.timeline.DataSegment.PruneSpecsHolder; import org.joda.time.Interval; import org.junit.Assert; import org.junit.Test; @@ -35,7 +35,7 @@ import java.io.IOException; public class SegmentPublishResultTest { private final ObjectMapper objectMapper = new DefaultObjectMapper() - .setInjectableValues(new Std().addValue(PruneLoadSpecHolder.class, PruneLoadSpecHolder.DEFAULT)); + .setInjectableValues(new Std().addValue(PruneSpecsHolder.class, PruneSpecsHolder.DEFAULT)); @Test public void testSerdeOkResult() throws IOException diff --git a/server/src/test/java/org/apache/druid/server/ServerTestHelper.java b/server/src/test/java/org/apache/druid/server/ServerTestHelper.java index 7713c7a7272..784b79a7665 100644 --- a/server/src/test/java/org/apache/druid/server/ServerTestHelper.java +++ b/server/src/test/java/org/apache/druid/server/ServerTestHelper.java @@ -22,7 +22,7 @@ package org.apache.druid.server; import com.fasterxml.jackson.databind.InjectableValues; import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.druid.jackson.DefaultObjectMapper; -import org.apache.druid.timeline.DataSegment; +import org.apache.druid.timeline.DataSegment.PruneSpecsHolder; public class ServerTestHelper { @@ -32,7 +32,7 @@ public class ServerTestHelper MAPPER.setInjectableValues( new InjectableValues.Std() .addValue(ObjectMapper.class.getName(), MAPPER) - .addValue(DataSegment.PruneLoadSpecHolder.class, DataSegment.PruneLoadSpecHolder.DEFAULT) + .addValue(PruneSpecsHolder.class, PruneSpecsHolder.DEFAULT) ); } } diff --git a/server/src/test/java/org/apache/druid/server/coordination/SegmentChangeRequestDropTest.java b/server/src/test/java/org/apache/druid/server/coordination/SegmentChangeRequestDropTest.java index fb1cd69a144..85c8838b3c6 100644 --- a/server/src/test/java/org/apache/druid/server/coordination/SegmentChangeRequestDropTest.java +++ b/server/src/test/java/org/apache/druid/server/coordination/SegmentChangeRequestDropTest.java @@ -65,7 +65,7 @@ public class SegmentChangeRequestDropTest JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT ); - Assert.assertEquals(11, objectMap.size()); + Assert.assertEquals(12, objectMap.size()); Assert.assertEquals("drop", objectMap.get("action")); Assert.assertEquals("something", objectMap.get("dataSource")); Assert.assertEquals(interval.toString(), objectMap.get("interval")); diff --git a/server/src/test/java/org/apache/druid/server/coordination/SegmentChangeRequestLoadTest.java b/server/src/test/java/org/apache/druid/server/coordination/SegmentChangeRequestLoadTest.java index d165e3480f5..5f80d836646 100644 --- a/server/src/test/java/org/apache/druid/server/coordination/SegmentChangeRequestLoadTest.java +++ b/server/src/test/java/org/apache/druid/server/coordination/SegmentChangeRequestLoadTest.java @@ -64,7 +64,7 @@ public class SegmentChangeRequestLoadTest mapper.writeValueAsString(segmentDrop), JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT ); - Assert.assertEquals(11, objectMap.size()); + Assert.assertEquals(12, objectMap.size()); Assert.assertEquals("load", objectMap.get("action")); Assert.assertEquals("something", objectMap.get("dataSource")); Assert.assertEquals(interval.toString(), objectMap.get("interval")); diff --git 
a/server/src/test/java/org/apache/druid/server/coordination/coordination/BatchDataSegmentAnnouncerTest.java b/server/src/test/java/org/apache/druid/server/coordination/coordination/BatchDataSegmentAnnouncerTest.java index c38f0ad3400..effc42f30bf 100644 --- a/server/src/test/java/org/apache/druid/server/coordination/coordination/BatchDataSegmentAnnouncerTest.java +++ b/server/src/test/java/org/apache/druid/server/coordination/coordination/BatchDataSegmentAnnouncerTest.java @@ -293,7 +293,7 @@ public class BatchDataSegmentAnnouncerTest } List zNodes = cf.getChildren().forPath(TEST_SEGMENTS_PATH); - Assert.assertEquals(20, zNodes.size()); + Assert.assertEquals(25, zNodes.size()); Set segments = Sets.newHashSet(testSegments); for (String zNode : zNodes) { diff --git a/server/src/test/java/org/apache/druid/server/coordinator/CuratorDruidCoordinatorTest.java b/server/src/test/java/org/apache/druid/server/coordinator/CuratorDruidCoordinatorTest.java index 7e9659e2060..361355226bd 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/CuratorDruidCoordinatorTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/CuratorDruidCoordinatorTest.java @@ -249,7 +249,8 @@ public class CuratorDruidCoordinatorTest extends CuratorTestBase null, new CostBalancerStrategyFactory(), EasyMock.createNiceMock(LookupCoordinatorManager.class), - new TestDruidLeaderSelector() + new TestDruidLeaderSelector(), + null ); } @@ -546,7 +547,8 @@ public class CuratorDruidCoordinatorTest extends CuratorTestBase null, new CostBalancerStrategyFactory(), EasyMock.createNiceMock(LookupCoordinatorManager.class), - new TestDruidLeaderSelector() + new TestDruidLeaderSelector(), + null ); } diff --git a/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigTest.java b/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigTest.java index 3081a0bffd0..5d5b9df3a09 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigTest.java @@ -49,7 +49,6 @@ public class DataSourceCompactionConfigTest "dataSource", null, 500L, - 100L, null, new Period(3600), null, @@ -61,7 +60,6 @@ public class DataSourceCompactionConfigTest Assert.assertEquals(config.getDataSource(), fromJson.getDataSource()); Assert.assertEquals(25, fromJson.getTaskPriority()); Assert.assertEquals(config.getInputSegmentSizeBytes(), fromJson.getInputSegmentSizeBytes()); - Assert.assertEquals(config.getTargetCompactionSizeBytes(), fromJson.getTargetCompactionSizeBytes()); Assert.assertEquals(config.getMaxRowsPerSegment(), fromJson.getMaxRowsPerSegment()); Assert.assertEquals(config.getSkipOffsetFromLatest(), fromJson.getSkipOffsetFromLatest()); Assert.assertEquals(config.getTuningConfig(), fromJson.getTuningConfig()); @@ -75,7 +73,6 @@ public class DataSourceCompactionConfigTest "dataSource", null, 500L, - null, 30, new Period(3600), null, @@ -87,7 +84,6 @@ public class DataSourceCompactionConfigTest Assert.assertEquals(config.getDataSource(), fromJson.getDataSource()); Assert.assertEquals(25, fromJson.getTaskPriority()); Assert.assertEquals(config.getInputSegmentSizeBytes(), fromJson.getInputSegmentSizeBytes()); - Assert.assertNull(fromJson.getTargetCompactionSizeBytes()); Assert.assertEquals(config.getMaxRowsPerSegment(), fromJson.getMaxRowsPerSegment()); Assert.assertEquals(config.getSkipOffsetFromLatest(), fromJson.getSkipOffsetFromLatest()); 
Assert.assertEquals(config.getTuningConfig(), fromJson.getTuningConfig()); @@ -113,7 +109,6 @@ public class DataSourceCompactionConfigTest null, 500L, null, - null, new Period(3600), new UserCompactTuningConfig( null, @@ -131,58 +126,12 @@ public class DataSourceCompactionConfigTest Assert.assertEquals(config.getDataSource(), fromJson.getDataSource()); Assert.assertEquals(25, fromJson.getTaskPriority()); Assert.assertEquals(config.getInputSegmentSizeBytes(), fromJson.getInputSegmentSizeBytes()); - Assert.assertNull(fromJson.getTargetCompactionSizeBytes()); Assert.assertEquals(config.getMaxRowsPerSegment(), fromJson.getMaxRowsPerSegment()); Assert.assertEquals(config.getSkipOffsetFromLatest(), fromJson.getSkipOffsetFromLatest()); Assert.assertEquals(config.getTuningConfig(), fromJson.getTuningConfig()); Assert.assertEquals(config.getTaskContext(), fromJson.getTaskContext()); } - @Test - public void testSerdeTargetCompactionSizeBytesWithMaxRowsPerSegment() - { - expectedException.expect(IllegalArgumentException.class); - expectedException.expectMessage( - "targetCompactionSizeBytes[10000] cannot be used with maxRowsPerSegment[1000] and maxTotalRows[null]" - ); - final DataSourceCompactionConfig config = new DataSourceCompactionConfig( - "dataSource", - null, - 500L, - 10000L, - 1000, - new Period(3600), - null, - ImmutableMap.of("key", "val") - ); - } - - @Test - public void testSerdeTargetCompactionSizeBytesWithMaxTotalRows() - { - expectedException.expect(IllegalArgumentException.class); - expectedException.expectMessage( - "targetCompactionSizeBytes[10000] cannot be used with maxRowsPerSegment[null] and maxTotalRows[10000]" - ); - final DataSourceCompactionConfig config = new DataSourceCompactionConfig( - "dataSource", - null, - 500L, - 10000L, - null, - new Period(3600), - new UserCompactTuningConfig( - null, - null, - 10000L, - null, - null, - null - ), - ImmutableMap.of("key", "val") - ); - } - @Test public void testSerdeMaxTotalRowsWithMaxRowsPerSegment() throws IOException { @@ -190,7 +139,6 @@ public class DataSourceCompactionConfigTest "dataSource", null, 500L, - null, 10000, new Period(3600), new UserCompactTuningConfig( @@ -210,7 +158,6 @@ public class DataSourceCompactionConfigTest Assert.assertEquals(config.getDataSource(), fromJson.getDataSource()); Assert.assertEquals(25, fromJson.getTaskPriority()); Assert.assertEquals(config.getInputSegmentSizeBytes(), fromJson.getInputSegmentSizeBytes()); - Assert.assertNull(fromJson.getTargetCompactionSizeBytes()); Assert.assertEquals(config.getMaxRowsPerSegment(), fromJson.getMaxRowsPerSegment()); Assert.assertEquals(config.getSkipOffsetFromLatest(), fromJson.getSkipOffsetFromLatest()); Assert.assertEquals(config.getTuningConfig(), fromJson.getTuningConfig()); diff --git a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java index ee5185b68a8..06419789b4a 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java @@ -214,7 +214,8 @@ public class DruidCoordinatorTest extends CuratorTestBase null, new CostBalancerStrategyFactory(), EasyMock.createNiceMock(LookupCoordinatorManager.class), - new TestDruidLeaderSelector() + new TestDruidLeaderSelector(), + null ); } diff --git a/server/src/test/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactorTest.java 
b/server/src/test/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactorTest.java index 36e6a695745..1398a39aa91 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactorTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactorTest.java @@ -28,6 +28,8 @@ import org.apache.druid.client.indexing.ClientCompactQueryTuningConfig; import org.apache.druid.client.indexing.IndexingServiceClient; import org.apache.druid.client.indexing.NoopIndexingServiceClient; import org.apache.druid.indexer.TaskStatusPlus; +import org.apache.druid.indexer.partitions.DynamicPartitionsSpec; +import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.server.coordinator.CoordinatorCompactionConfig; @@ -35,6 +37,7 @@ import org.apache.druid.server.coordinator.CoordinatorRuntimeParamsTestHelpers; import org.apache.druid.server.coordinator.CoordinatorStats; import org.apache.druid.server.coordinator.DataSourceCompactionConfig; import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams; +import org.apache.druid.timeline.CompactionState; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.TimelineObjectHolder; import org.apache.druid.timeline.VersionedIntervalTimeline; @@ -47,7 +50,6 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; -import javax.annotation.Nullable; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -66,7 +68,6 @@ public class DruidCoordinatorSegmentCompactorTest @Override public String compactSegments( List segments, - @Nullable Long targetCompactionSizeBytes, int compactionTaskPriority, ClientCompactQueryTuningConfig tuningConfig, Map context @@ -97,6 +98,22 @@ public class DruidCoordinatorSegmentCompactorTest segments.get(0).getDimensions(), segments.get(0).getMetrics(), new NumberedShardSpec(i, 0), + new CompactionState( + new DynamicPartitionsSpec( + tuningConfig.getMaxRowsPerSegment(), + tuningConfig.getMaxTotalRowsOr(Long.MAX_VALUE) + ), + ImmutableMap.of( + "bitmap", + ImmutableMap.of("type", "concise"), + "dimensionCompression", + "lz4", + "metricCompression", + "lz4", + "longEncoding", + "longs" + ) + ), 1, segmentSize ); @@ -178,7 +195,10 @@ public class DruidCoordinatorSegmentCompactorTest @Test public void testRun() { - final DruidCoordinatorSegmentCompactor compactor = new DruidCoordinatorSegmentCompactor(indexingServiceClient); + final DruidCoordinatorSegmentCompactor compactor = new DruidCoordinatorSegmentCompactor( + new DefaultObjectMapper(), + indexingServiceClient + ); final Supplier expectedVersionSupplier = new Supplier() { @@ -375,7 +395,6 @@ public class DruidCoordinatorSegmentCompactorTest dataSource, 0, 50L, - 20L, null, new Period("PT1H"), // smaller than segment interval null, diff --git a/server/src/test/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstPolicyTest.java b/server/src/test/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstPolicyTest.java index 5e3e8b96e7d..ce681f31870 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstPolicyTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstPolicyTest.java @@ -22,6 +22,7 @@ package org.apache.druid.server.coordinator.helper; import 
com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.guava.Comparators; @@ -49,7 +50,7 @@ public class NewestSegmentFirstPolicyTest private static final long DEFAULT_SEGMENT_SIZE = 1000; private static final int DEFAULT_NUM_SEGMENTS_PER_SHARD = 4; - private final NewestSegmentFirstPolicy policy = new NewestSegmentFirstPolicy(); + private final NewestSegmentFirstPolicy policy = new NewestSegmentFirstPolicy(new DefaultObjectMapper()); @Test public void testLargeOffsetAndSmallSegmentInterval() @@ -280,73 +281,60 @@ public class NewestSegmentFirstPolicyTest ); } - @Test - public void testIgnoreSingleSegmentToCompact() - { - final CompactionSegmentIterator iterator = policy.reset( - ImmutableMap.of(DATA_SOURCE, createCompactionConfig(800000, new Period("P1D"))), - ImmutableMap.of( - DATA_SOURCE, - createTimeline( - new SegmentGenerateSpec( - Intervals.of("2017-12-02T00:00:00/2017-12-03T00:00:00"), - new Period("P1D"), - 200, - 1 - ), - new SegmentGenerateSpec( - Intervals.of("2017-12-01T00:00:00/2017-12-02T00:00:00"), - new Period("P1D"), - 200, - 1 - ) - ) - ), - Collections.emptyMap() - ); - - Assert.assertFalse(iterator.hasNext()); - } - @Test public void testClearSegmentsToCompactWhenSkippingSegments() { - final long maxSizeOfSegmentsToCompact = 800000; + final long inputSegmentSizeBytes = 800000; final VersionedIntervalTimeline timeline = createTimeline( new SegmentGenerateSpec( Intervals.of("2017-12-03T00:00:00/2017-12-04T00:00:00"), new Period("P1D"), - maxSizeOfSegmentsToCompact / 2 + 10, + inputSegmentSizeBytes / 2 + 10, 1 ), new SegmentGenerateSpec( Intervals.of("2017-12-02T00:00:00/2017-12-03T00:00:00"), new Period("P1D"), - maxSizeOfSegmentsToCompact + 10, // large segment + inputSegmentSizeBytes + 10, // large segment 1 ), new SegmentGenerateSpec( Intervals.of("2017-12-01T00:00:00/2017-12-02T00:00:00"), new Period("P1D"), - maxSizeOfSegmentsToCompact / 3 + 10, + inputSegmentSizeBytes / 3 + 10, 2 ) ); final CompactionSegmentIterator iterator = policy.reset( - ImmutableMap.of(DATA_SOURCE, createCompactionConfig(maxSizeOfSegmentsToCompact, new Period("P0D"))), + ImmutableMap.of(DATA_SOURCE, createCompactionConfig(inputSegmentSizeBytes, new Period("P0D"))), ImmutableMap.of(DATA_SOURCE, timeline), Collections.emptyMap() ); - final List expectedSegmentsToCompact = timeline - .lookup(Intervals.of("2017-12-01/2017-12-02")) - .stream() - .flatMap(holder -> StreamSupport.stream(holder.getObject().spliterator(), false)) - .map(PartitionChunk::getObject) - .collect(Collectors.toList()); - + final List expectedSegmentsToCompact = new ArrayList<>(); + expectedSegmentsToCompact.addAll( + timeline + .lookup(Intervals.of("2017-12-03/2017-12-04")) + .stream() + .flatMap(holder -> StreamSupport.stream(holder.getObject().spliterator(), false)) + .map(PartitionChunk::getObject) + .collect(Collectors.toList()) + ); Assert.assertTrue(iterator.hasNext()); Assert.assertEquals(expectedSegmentsToCompact, iterator.next()); + + expectedSegmentsToCompact.clear(); + expectedSegmentsToCompact.addAll( + timeline + .lookup(Intervals.of("2017-12-01/2017-12-02")) + .stream() + .flatMap(holder -> StreamSupport.stream(holder.getObject().spliterator(), false)) + .map(PartitionChunk::getObject) + .collect(Collectors.toList()) + ); + 
Assert.assertTrue(iterator.hasNext()); + Assert.assertEquals(expectedSegmentsToCompact, iterator.next()); + Assert.assertFalse(iterator.hasNext()); } @@ -569,15 +557,14 @@ public class NewestSegmentFirstPolicyTest } private DataSourceCompactionConfig createCompactionConfig( - long targetCompactionSizeBytes, + long inputSegmentSizeBytes, Period skipOffsetFromLatest ) { return new DataSourceCompactionConfig( DATA_SOURCE, 0, - targetCompactionSizeBytes, - targetCompactionSizeBytes, + inputSegmentSizeBytes, null, skipOffsetFromLatest, null, diff --git a/server/src/test/java/org/apache/druid/server/http/ServersResourceTest.java b/server/src/test/java/org/apache/druid/server/http/ServersResourceTest.java index f96d2a3e74c..8f94ac4c7a3 100644 --- a/server/src/test/java/org/apache/druid/server/http/ServersResourceTest.java +++ b/server/src/test/java/org/apache/druid/server/http/ServersResourceTest.java @@ -73,7 +73,7 @@ public class ServersResourceTest + "\"priority\":0," + "\"segments\":{\"dataSource_2016-03-22T14:00:00.000Z_2016-03-22T15:00:00.000Z_v0\":" + "{\"dataSource\":\"dataSource\",\"interval\":\"2016-03-22T14:00:00.000Z/2016-03-22T15:00:00.000Z\",\"version\":\"v0\",\"loadSpec\":{},\"dimensions\":\"\",\"metrics\":\"\"," - + "\"shardSpec\":{\"type\":\"numbered\",\"partitionNum\":0,\"partitions\":1},\"binaryVersion\":null,\"size\":1,\"identifier\":\"dataSource_2016-03-22T14:00:00.000Z_2016-03-22T15:00:00.000Z_v0\"}}," + + "\"shardSpec\":{\"type\":\"numbered\",\"partitionNum\":0,\"partitions\":1},\"lastCompactionState\":null,\"binaryVersion\":null,\"size\":1,\"identifier\":\"dataSource_2016-03-22T14:00:00.000Z_2016-03-22T15:00:00.000Z_v0\"}}," + "\"currSize\":1}]"; Assert.assertEquals(expected, result); } @@ -99,7 +99,7 @@ public class ServersResourceTest + "\"priority\":0," + "\"segments\":{\"dataSource_2016-03-22T14:00:00.000Z_2016-03-22T15:00:00.000Z_v0\":" + "{\"dataSource\":\"dataSource\",\"interval\":\"2016-03-22T14:00:00.000Z/2016-03-22T15:00:00.000Z\",\"version\":\"v0\",\"loadSpec\":{},\"dimensions\":\"\",\"metrics\":\"\"," - + "\"shardSpec\":{\"type\":\"numbered\",\"partitionNum\":0,\"partitions\":1},\"binaryVersion\":null,\"size\":1,\"identifier\":\"dataSource_2016-03-22T14:00:00.000Z_2016-03-22T15:00:00.000Z_v0\"}}," + + "\"shardSpec\":{\"type\":\"numbered\",\"partitionNum\":0,\"partitions\":1},\"lastCompactionState\":null,\"binaryVersion\":null,\"size\":1,\"identifier\":\"dataSource_2016-03-22T14:00:00.000Z_2016-03-22T15:00:00.000Z_v0\"}}," + "\"currSize\":1}"; Assert.assertEquals(expected, result); } diff --git a/services/src/main/java/org/apache/druid/cli/CliBroker.java b/services/src/main/java/org/apache/druid/cli/CliBroker.java index cf1ea3ca1bf..b585c71756c 100644 --- a/services/src/main/java/org/apache/druid/cli/CliBroker.java +++ b/services/src/main/java/org/apache/druid/cli/CliBroker.java @@ -54,6 +54,7 @@ import org.apache.druid.server.initialization.jetty.JettyServerInitializer; import org.apache.druid.server.metrics.QueryCountStatsProvider; import org.apache.druid.server.router.TieredBrokerConfig; import org.apache.druid.sql.guice.SqlModule; +import org.apache.druid.timeline.PruneLastCompactionState; import org.apache.druid.timeline.PruneLoadSpec; import org.eclipse.jetty.server.Server; @@ -88,6 +89,7 @@ public class CliBroker extends ServerRunnable binder.bindConstant().annotatedWith(Names.named("servicePort")).to(8082); binder.bindConstant().annotatedWith(Names.named("tlsServicePort")).to(8282); binder.bindConstant().annotatedWith(PruneLoadSpec.class).to(true); + 
binder.bindConstant().annotatedWith(PruneLastCompactionState.class).to(true); binder.bind(CachingClusteredClient.class).in(LazySingleton.class); LifecycleModule.register(binder, BrokerServerView.class); diff --git a/services/src/main/java/org/apache/druid/cli/CliCoordinator.java b/services/src/main/java/org/apache/druid/cli/CliCoordinator.java index 118b911586b..a313e6bc586 100644 --- a/services/src/main/java/org/apache/druid/cli/CliCoordinator.java +++ b/services/src/main/java/org/apache/druid/cli/CliCoordinator.java @@ -146,8 +146,7 @@ public class CliCoordinator extends ServerRunnable ConfigProvider.bind(binder, DruidCoordinatorConfig.class); - binder.bind(MetadataStorage.class) - .toProvider(MetadataStorageProvider.class); + binder.bind(MetadataStorage.class).toProvider(MetadataStorageProvider.class); JsonConfigProvider.bind(binder, "druid.manager.segments", MetadataSegmentManagerConfig.class); JsonConfigProvider.bind(binder, "druid.manager.rules", MetadataRuleManagerConfig.class); diff --git a/services/src/main/java/org/apache/druid/cli/CliHistorical.java b/services/src/main/java/org/apache/druid/cli/CliHistorical.java index cf81439d53f..ba55ab7be21 100644 --- a/services/src/main/java/org/apache/druid/cli/CliHistorical.java +++ b/services/src/main/java/org/apache/druid/cli/CliHistorical.java @@ -49,6 +49,7 @@ import org.apache.druid.server.http.HistoricalResource; import org.apache.druid.server.http.SegmentListerResource; import org.apache.druid.server.initialization.jetty.JettyServerInitializer; import org.apache.druid.server.metrics.QueryCountStatsProvider; +import org.apache.druid.timeline.PruneLastCompactionState; import org.eclipse.jetty.server.Server; import java.util.List; @@ -79,6 +80,7 @@ public class CliHistorical extends ServerRunnable binder.bindConstant().annotatedWith(Names.named("serviceName")).to("druid/historical"); binder.bindConstant().annotatedWith(Names.named("servicePort")).to(8083); binder.bindConstant().annotatedWith(Names.named("tlsServicePort")).to(8283); + binder.bindConstant().annotatedWith(PruneLastCompactionState.class).to(true); // register Server before binding ZkCoordinator to ensure HTTP endpoints are available immediately LifecycleModule.register(binder, Server.class); diff --git a/services/src/main/java/org/apache/druid/cli/CliMiddleManager.java b/services/src/main/java/org/apache/druid/cli/CliMiddleManager.java index d3e30968f28..2ee7e314e3b 100644 --- a/services/src/main/java/org/apache/druid/cli/CliMiddleManager.java +++ b/services/src/main/java/org/apache/druid/cli/CliMiddleManager.java @@ -64,6 +64,7 @@ import org.apache.druid.segment.realtime.appenderator.DummyForInjectionAppendera import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider; import org.apache.druid.server.DruidNode; import org.apache.druid.server.initialization.jetty.JettyServerInitializer; +import org.apache.druid.timeline.PruneLastCompactionState; import org.eclipse.jetty.server.Server; import java.util.List; @@ -96,6 +97,7 @@ public class CliMiddleManager extends ServerRunnable binder.bindConstant().annotatedWith(Names.named("serviceName")).to("druid/middlemanager"); binder.bindConstant().annotatedWith(Names.named("servicePort")).to(8091); binder.bindConstant().annotatedWith(Names.named("tlsServicePort")).to(8291); + binder.bindConstant().annotatedWith(PruneLastCompactionState.class).to(true); IndexingServiceModuleHelper.configureTaskRunnerConfigs(binder); diff --git a/services/src/main/java/org/apache/druid/cli/ExportMetadata.java 
b/services/src/main/java/org/apache/druid/cli/ExportMetadata.java index 56457c300fa..c3202959bea 100644 --- a/services/src/main/java/org/apache/druid/cli/ExportMetadata.java +++ b/services/src/main/java/org/apache/druid/cli/ExportMetadata.java @@ -43,6 +43,7 @@ import org.apache.druid.metadata.SQLMetadataConnector; import org.apache.druid.segment.loading.DataSegmentPusher; import org.apache.druid.server.DruidNode; import org.apache.druid.timeline.DataSegment; +import org.apache.druid.timeline.DataSegment.PruneSpecsHolder; import javax.annotation.Nullable; import javax.xml.bind.DatatypeConverter; @@ -190,7 +191,7 @@ public class ExportMetadata extends GuiceRunnable { InjectableValues.Std injectableValues = new InjectableValues.Std(); injectableValues.addValue(ObjectMapper.class, JSON_MAPPER); - injectableValues.addValue(DataSegment.PruneLoadSpecHolder.class, DataSegment.PruneLoadSpecHolder.DEFAULT); + injectableValues.addValue(PruneSpecsHolder.class, PruneSpecsHolder.DEFAULT); JSON_MAPPER.setInjectableValues(injectableValues); if (hadoopStorageDirectory != null && newLocalPath != null) { diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaTest.java index 1b2573e8583..133cce05820 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaTest.java @@ -53,6 +53,7 @@ import org.apache.druid.sql.calcite.util.SpecificSegmentsQuerySegmentWalker; import org.apache.druid.sql.calcite.util.TestServerInventoryView; import org.apache.druid.sql.calcite.view.NoopViewManager; import org.apache.druid.timeline.DataSegment; +import org.apache.druid.timeline.DataSegment.PruneSpecsHolder; import org.apache.druid.timeline.SegmentId; import org.apache.druid.timeline.partition.LinearShardSpec; import org.apache.druid.timeline.partition.NumberedShardSpec; @@ -178,9 +179,10 @@ public class DruidSchemaTest extends CalciteTestBase ImmutableList.of("dim1", "dim2"), ImmutableList.of("met1", "met2"), new NumberedShardSpec(2, 3), + null, 1, 100L, - DataSegment.PruneLoadSpecHolder.DEFAULT + PruneSpecsHolder.DEFAULT ); final List realtimeSegments = ImmutableList.of(segment1); final TimelineServerView serverView = new TestServerInventoryView(walker.getSegments(), realtimeSegments); diff --git a/web-console/src/dialogs/compaction-dialog/__snapshots__/compaction-dialog.spec.tsx.snap b/web-console/src/dialogs/compaction-dialog/__snapshots__/compaction-dialog.spec.tsx.snap index dd96dc149d2..aba536c9cda 100644 --- a/web-console/src/dialogs/compaction-dialog/__snapshots__/compaction-dialog.spec.tsx.snap +++ b/web-console/src/dialogs/compaction-dialog/__snapshots__/compaction-dialog.spec.tsx.snap @@ -228,7 +228,7 @@ exports[`compaction dialog matches snapshot 1`] = `
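
A quick aside before the web-console changes: the Java test updates above all converge on the same Jackson setup, with PruneSpecsHolder replacing PruneLoadSpecHolder as the injectable value for DataSegment deserialization. A minimal sketch of that pattern, mirroring ServerTestHelper and ExportMetadata above; the helper class itself is hypothetical.

import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.DataSegment.PruneSpecsHolder;

import java.io.IOException;

// Hypothetical helper; only calls visible in the hunks above are used.
class SegmentMapperSketch
{
  static DataSegment roundTrip(String segmentJson) throws IOException
  {
    final ObjectMapper mapper = new DefaultObjectMapper();
    mapper.setInjectableValues(
        new InjectableValues.Std()
            .addValue(ObjectMapper.class.getName(), mapper)
            .addValue(PruneSpecsHolder.class, PruneSpecsHolder.DEFAULT)
    );
    // The tests above inject PruneSpecsHolder.DEFAULT and then assert a lossless round trip,
    // so fields such as loadSpec and lastCompactionState survive deserialization here.
    return mapper.readValue(segmentJson, DataSegment.class);
  }
}
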

{ - static DEFAULT_TARGET_COMPACTION_SIZE_BYTES = 419430400; + static DEFAULT_MAX_ROWS_PER_SEGMENT = 5000000; constructor(props: CompactionDialogProps) { super(props); @@ -108,9 +108,9 @@ export class CompactionDialog extends React.PureComponent< ), }, { - name: 'targetCompactionSizeBytes', + name: 'maxRowsPerSegment', type: 'number', - defaultValue: CompactionDialog.DEFAULT_TARGET_COMPACTION_SIZE_BYTES, + defaultValue: CompactionDialog.DEFAULT_MAX_ROWS_PER_SEGMENT, info: (

The target segment size, for each segment, after compaction. The actual sizes of diff --git a/web-console/src/views/datasource-view/datasource-view.tsx b/web-console/src/views/datasource-view/datasource-view.tsx index 7f03f1ce91c..92d86484b2b 100644 --- a/web-console/src/views/datasource-view/datasource-view.tsx +++ b/web-console/src/views/datasource-view/datasource-view.tsx @@ -874,12 +874,12 @@ GROUP BY 1`; const { compaction } = row.original; let text: string; if (compaction) { - if (compaction.targetCompactionSizeBytes == null) { - text = `Target: Default (${formatBytes( - CompactionDialog.DEFAULT_TARGET_COMPACTION_SIZE_BYTES, + if (compaction.maxRowsPerSegment == null) { + text = `Target: Default (${formatNumber( + CompactionDialog.DEFAULT_MAX_ROWS_PER_SEGMENT, )})`; } else { - text = `Target: ${formatBytes(compaction.targetCompactionSizeBytes)}`; + text = `Target: ${formatNumber(compaction.maxRowsPerSegment)}`; } } else { text = 'None';
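
To close out the coordinator-side picture, stepping away from the console code for a moment: the pieces above are wired together roughly as sketched below. This is not the Druid implementation; the wrapper class is invented, and the generic parameters are assumptions because the flattened hunks drop their angle brackets, though the calls themselves match the signatures visible above.

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
import org.apache.druid.server.coordinator.helper.CompactionSegmentIterator;
import org.apache.druid.server.coordinator.helper.CompactionSegmentSearchPolicy;
import org.apache.druid.server.coordinator.helper.NewestSegmentFirstPolicy;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.VersionedIntervalTimeline;
import org.joda.time.Interval;

import java.util.Collections;
import java.util.List;
import java.util.Map;

// Invented wrapper class; CompactionSegmentIterator is assumed to yield List<DataSegment>
// batches, matching the assertions in the tests above.
class CompactionPolicyWiringSketch
{
  static void iterateCandidates(
      Map<String, DataSourceCompactionConfig> compactionConfigs,               // assumed generics
      Map<String, VersionedIntervalTimeline<String, DataSegment>> dataSources  // assumed generics
  )
  {
    // The mapper handed to the policy ends up in NewestSegmentFirstIterator.
    final ObjectMapper mapper = new DefaultObjectMapper();
    final CompactionSegmentSearchPolicy policy = new NewestSegmentFirstPolicy(mapper);

    final Map<String, List<Interval>> skipIntervals = Collections.emptyMap(); // assumed generics
    final CompactionSegmentIterator iterator = policy.reset(compactionConfigs, dataSources, skipIntervals);
    while (iterator.hasNext()) {
      final List<DataSegment> segmentsToCompact = iterator.next();
      // A coordinator duty would submit segmentsToCompact as one compaction task here.
    }
  }
}

Injecting the mapper through the constructor, rather than creating one inside the iterator, is also what lets the tests and benchmarks above pass a DefaultObjectMapper explicitly.
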