apache/druid (mirror of https://github.com/apache/druid.git)

commit 47c32a9d92 (parent 650840ddaf)

Skip ALL granularity compaction (#13304)

* Skip autocompaction for datasources with ETERNITY segments
@@ -174,6 +174,8 @@ The following auto-compaction configuration compacts existing `HOUR` segments in
   }
 }
 ```
 
+> Auto-compaction skips datasources containing ALL granularity segments when the target granularity is different.
+
 ### Update partitioning scheme
 
 For your `wikipedia` datasource, you want to optimize segment access when regularly ingesting data without compromising compute time when querying the data. Your ingestion spec for batch append uses [dynamic partitioning](../ingestion/native-batch.md#dynamic-partitioning) to optimize for write-time operations, while your stream ingestion partitioning is configured by the stream service. You want to implement auto-compaction to reorganize the data with a suitable read-time partitioning using [multi-dimension range partitioning](../ingestion/native-batch.md#multi-dimension-range-partitioning). Based on the dimensions frequently accessed in queries, you wish to partition on the following dimensions: `channel`, `countryName`, `namespace`.
@@ -129,6 +129,11 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
       // For example, if the original is interval of 2020-01-28/2020-02-03 with WEEK granularity
       // and the configuredSegmentGranularity is MONTH, the segment will be split to two segments
       // of 2020-01/2020-02 and 2020-02/2020-03.
+      if (Intervals.ETERNITY.equals(segment.getInterval())) {
+        // This is to prevent the coordinator from crashing as raised in https://github.com/apache/druid/issues/13208
+        log.warn("Cannot compact datasource[%s] with ALL granularity", dataSource);
+        return;
+      }
       for (Interval interval : configuredSegmentGranularity.getIterable(segment.getInterval())) {
         intervalToPartitionMap.computeIfAbsent(interval, k -> new HashSet<>()).add(segment);
       }
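For context, a minimal standalone sketch (not part of the commit) of why this guard matters: an ALL-granularity segment covers the ETERNITY interval, and bucketing ETERNITY with a finer segment granularity asks `getIterable()` to enumerate month-sized intervals across the entire representable time range, which the in-code comment above links to the coordinator instability reported in issue 13208. The sketch assumes only druid-core classes the patch itself uses (`Intervals`, `Granularities`, `Granularity.getIterable()`); the class name `EternityGuardSketch` is made up for illustration.

```java
// Illustrative sketch only; requires druid-core (org.apache.druid.java.util.common.*) on the classpath.
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.joda.time.Interval;

public class EternityGuardSketch
{
  public static void main(String[] args)
  {
    // An ALL-granularity segment spans the ETERNITY interval.
    Interval segmentInterval = Intervals.ETERNITY;

    // Same check the patch adds in NewestSegmentFirstIterator: skip instead of bucketing ETERNITY.
    if (Intervals.ETERNITY.equals(segmentInterval)) {
      System.out.println("Skipping: cannot bucket ETERNITY into MONTH intervals.");
      return;
    }

    // Without the guard, this loop would try to walk MONTH-sized buckets across the whole
    // representable time range and would not complete in any reasonable time.
    for (Interval bucket : Granularities.MONTH.getIterable(segmentInterval)) {
      System.out.println(bucket);
    }
  }
}
```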
@@ -627,6 +632,10 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
     final List<Interval> searchIntervals = new ArrayList<>();
 
     for (Interval lookupInterval : filteredInterval) {
+      if (Intervals.ETERNITY.equals(lookupInterval)) {
+        log.warn("Cannot compact datasource[%s] since interval is ETERNITY.", dataSourceName);
+        return Collections.emptyList();
+      }
       final List<DataSegment> segments = timeline
           .findNonOvershadowedObjectsInInterval(lookupInterval, Partitions.ONLY_COMPLETE)
           .stream()
@@ -69,6 +69,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.TimeZone;
@@ -1584,6 +1585,102 @@ public class NewestSegmentFirstPolicyTest
     Assert.assertFalse(iterator.hasNext());
   }
 
+  @Test
+  public void testSkipAllGranularityToDefault()
+  {
+    CompactionSegmentIterator iterator = policy.reset(
+        ImmutableMap.of(DATA_SOURCE,
+                        createCompactionConfig(10000,
+                                               new Period("P0D"),
+                                               null
+                        )
+        ),
+        ImmutableMap.of(
+            DATA_SOURCE,
+            SegmentTimeline.forSegments(ImmutableSet.of(
+                                            new DataSegment(
+                                                DATA_SOURCE,
+                                                Intervals.ETERNITY,
+                                                "0",
+                                                new HashMap<>(),
+                                                new ArrayList<>(),
+                                                new ArrayList<>(),
+                                                new NumberedShardSpec(0, 0),
+                                                0,
+                                                100)
+                                        )
+            )
+        ),
+        Collections.emptyMap()
+    );
+
+    Assert.assertFalse(iterator.hasNext());
+  }
+
+  @Test
+  public void testSkipAllToAllGranularity()
+  {
+    CompactionSegmentIterator iterator = policy.reset(
+        ImmutableMap.of(DATA_SOURCE,
+                        createCompactionConfig(10000,
+                                               new Period("P0D"),
+                                               new UserCompactionTaskGranularityConfig(Granularities.ALL, null, null)
+                        )
+        ),
+        ImmutableMap.of(
+            DATA_SOURCE,
+            SegmentTimeline.forSegments(ImmutableSet.of(
+                                            new DataSegment(
+                                                DATA_SOURCE,
+                                                Intervals.ETERNITY,
+                                                "0",
+                                                new HashMap<>(),
+                                                new ArrayList<>(),
+                                                new ArrayList<>(),
+                                                new NumberedShardSpec(0, 0),
+                                                0,
+                                                100)
+                                        )
+            )
+        ),
+        Collections.emptyMap()
+    );
+
+    Assert.assertFalse(iterator.hasNext());
+  }
+
+  @Test
+  public void testSkipAllToFinerGranularity()
+  {
+    CompactionSegmentIterator iterator = policy.reset(
+        ImmutableMap.of(DATA_SOURCE,
+                        createCompactionConfig(10000,
+                                               new Period("P0D"),
+                                               new UserCompactionTaskGranularityConfig(Granularities.DAY, null, null)
+                        )
+        ),
+        ImmutableMap.of(
+            DATA_SOURCE,
+            SegmentTimeline.forSegments(ImmutableSet.of(
+                                            new DataSegment(
+                                                DATA_SOURCE,
+                                                Intervals.ETERNITY,
+                                                "0",
+                                                new HashMap<>(),
+                                                new ArrayList<>(),
+                                                new ArrayList<>(),
+                                                new NumberedShardSpec(0, 0),
+                                                0,
+                                                100)
+                                        )
+            )
+        ),
+        Collections.emptyMap()
+    );
+
+    Assert.assertFalse(iterator.hasNext());
+  }
+
   private static void assertCompactSegmentIntervals(
       CompactionSegmentIterator iterator,
       Period segmentPeriod,