mirror of https://github.com/apache/druid.git
Support overlapping segment intervals in auto compaction (#12062)
* add impl
* add impl
* fix more bugs
* add tests
* fix checkstyle
* address comments
* address comments
* fix test
Parent: fe71fc414f
Commit: b53e7f4d12
@@ -25,6 +25,7 @@ import com.google.inject.Inject;
import org.apache.commons.io.IOUtils;
import org.apache.druid.data.input.MaxSizeSplitHintSpec;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
import org.apache.druid.indexer.partitions.PartitionsSpec;
@@ -775,6 +776,59 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
    }
  }

  @Test
  public void testAutoCompactionDutyWithOverlappingInterval() throws Exception
  {
    final ISOChronology chrono = ISOChronology.getInstance(DateTimes.inferTzFromString("America/Los_Angeles"));
    Map<String, Object> specs = ImmutableMap.of("%%GRANULARITYSPEC%%", new UniformGranularitySpec(Granularities.WEEK, Granularities.NONE, false, ImmutableList.of(new Interval("2013-08-31/2013-09-02", chrono))));
    // Create WEEK segment with 2013-08-26 to 2013-09-02
    loadData(INDEX_TASK_WITH_GRANULARITY_SPEC, specs);
    specs = ImmutableMap.of("%%GRANULARITYSPEC%%", new UniformGranularitySpec(Granularities.MONTH, Granularities.NONE, false, ImmutableList.of(new Interval("2013-09-01/2013-09-02", chrono))));
    // Create MONTH segment with 2013-09-01 to 2013-10-01
    loadData(INDEX_TASK_WITH_GRANULARITY_SPEC, specs);

    try (final Closeable ignored = unloader(fullDatasourceName)) {
      verifySegmentsCount(2);

      // Result is not rollup
      // For dim "page", result has values "Gypsy Danger" and "Striker Eureka"
      Map<String, Object> expectedResult = ImmutableMap.of(
          "%%EXPECTED_COUNT_RESULT%%",
          2,
          "%%EXPECTED_SCAN_RESULT%%",
          ImmutableList.of(ImmutableMap.of("events", ImmutableList.of(ImmutableList.of(57.0), ImmutableList.of(459.0))))
      );
      verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, expectedResult);

      submitCompactionConfig(
          MAX_ROWS_PER_SEGMENT_COMPACTED,
          NO_SKIP_OFFSET,
          null,
          null,
          null,
          false
      );
      // Compact the MONTH segment
      forceTriggerAutoCompaction(2);
      verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, expectedResult);

      // Compact the WEEK segment
      forceTriggerAutoCompaction(2);
      verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, expectedResult);

      // Verify all tasks succeed
      List<TaskResponseObject> compactTasksBefore = indexer.getCompleteTasksForDataSource(fullDatasourceName);
      for (TaskResponseObject taskResponseObject : compactTasksBefore) {
        Assert.assertEquals(TaskState.SUCCESS, taskResponseObject.getStatus());
      }

      // Verify compacted segments do not get compacted again
      forceTriggerAutoCompaction(2);
      List<TaskResponseObject> compactTasksAfter = indexer.getCompleteTasksForDataSource(fullDatasourceName);
      Assert.assertEquals(compactTasksAfter.size(), compactTasksBefore.size());
    }
  }

  private void loadData(String indexTask) throws Exception
  {
    loadData(indexTask, ImmutableMap.of());
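The new test above loads a WEEK-granularity segment (2013-08-26 to 2013-09-02) and a MONTH-granularity segment (2013-09-01 to 2013-10-01) into the same datasource, so their intervals overlap, which is exactly the case this patch teaches auto compaction to handle. A minimal standalone sketch of that overlap, assuming druid-core and Joda-Time on the classpath (the class name and the UTC simplification are illustrative, not part of the patch):

// OverlapSketch.java -- illustrative only, not part of this patch.
import org.apache.druid.java.util.common.Intervals;
import org.joda.time.Interval;

public class OverlapSketch
{
  public static void main(String[] args)
  {
    // Intervals the two segments above end up with after WEEK and MONTH segmentGranularity
    // are applied (time zone simplified to UTC here; the test uses America/Los_Angeles)
    Interval weekSegment = Intervals.of("2013-08-26/2013-09-02");
    Interval monthSegment = Intervals.of("2013-09-01/2013-10-01");

    System.out.println(weekSegment.overlaps(monthSegment)); // true
    System.out.println(weekSegment.overlap(monthSegment));  // the shared day, 2013-09-01/2013-09-02
  }
}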
@@ -23,7 +23,6 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.JodaUtils;
import org.apache.druid.segment.SegmentUtils;
import org.apache.druid.timeline.DataSegment;
import org.joda.time.Interval;
@@ -51,7 +50,7 @@ public class ClientCompactionIntervalSpec
        JodaUtils.umbrellaInterval(
            segments.stream().map(DataSegment::getInterval).collect(Collectors.toList())
        ),
        SegmentUtils.hashIds(segments)
        null
    );
  }
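In the hunk above, the compaction interval is still the umbrella interval of the selected segments, while the sha256 of the sorted segment ids is now sent as null instead of SegmentUtils.hashIds(segments). A small sketch of the umbrella-interval computation, assuming druid-core on the classpath (the class name is illustrative, not part of the patch):

// UmbrellaSketch.java -- illustrative only, not part of this patch.
import com.google.common.collect.ImmutableList;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.JodaUtils;
import org.joda.time.Interval;

public class UmbrellaSketch
{
  public static void main(String[] args)
  {
    // Overlapping segment intervals, as in the integration test above
    Interval umbrella = JodaUtils.umbrellaInterval(ImmutableList.of(
        Intervals.of("2013-08-26/2013-09-02"),
        Intervals.of("2013-09-01/2013-10-01")
    ));
    // The umbrella spans 2013-08-26/2013-10-01: the interval the compaction task is asked to cover
    System.out.println(umbrella);
  }
}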
@@ -31,8 +31,10 @@ import org.apache.druid.client.indexing.IndexingServiceClient;
import org.apache.druid.client.indexing.TaskPayloadResponse;
import org.apache.druid.indexer.TaskStatusPlus;
import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.granularity.GranularityType;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.server.coordinator.AutoCompactionSnapshot;
import org.apache.druid.server.coordinator.CompactionStatistics;
@@ -343,18 +345,37 @@ public class CompactSegments implements CoordinatorDuty
        snapshotBuilder.incrementSegmentCountCompacted(segmentsToCompact.size());

        final DataSourceCompactionConfig config = compactionConfigs.get(dataSourceName);

        // Create granularitySpec to send to compaction task
        ClientCompactionTaskGranularitySpec granularitySpec;
        if (config.getGranularitySpec() != null) {
          granularitySpec = new ClientCompactionTaskGranularitySpec(
              config.getGranularitySpec().getSegmentGranularity(),
              config.getGranularitySpec().getQueryGranularity(),
              config.getGranularitySpec().isRollup()

          );
        Granularity segmentGranularityToUse = null;
        if (config.getGranularitySpec() == null || config.getGranularitySpec().getSegmentGranularity() == null) {
          // Determine segmentGranularity from the segmentsToCompact
          // Each batch of segmentsToCompact from CompactionSegmentIterator will contain the same interval as
          // segmentGranularity is not set in the compaction config
          Interval interval = segmentsToCompact.get(0).getInterval();
          if (segmentsToCompact.stream().allMatch(segment -> interval.overlaps(segment.getInterval()))) {
            try {
              segmentGranularityToUse = GranularityType.fromPeriod(interval.toPeriod()).getDefaultGranularity();
            }
            catch (IAE iae) {
              // This case can happen if the existing segment intervals result in complicated periods.
              // Fall back to setting segmentGranularity as null
              LOG.warn("Cannot determine segmentGranularity from interval [%s]", interval);
            }
          } else {
            LOG.warn("segmentsToCompact does not have the same interval. Fallback to not setting segmentGranularity for auto compaction task");
          }
        } else {
          granularitySpec = null;
          segmentGranularityToUse = config.getGranularitySpec().getSegmentGranularity();
        }
        granularitySpec = new ClientCompactionTaskGranularitySpec(
            segmentGranularityToUse,
            config.getGranularitySpec() != null ? config.getGranularitySpec().getQueryGranularity() : null,
            config.getGranularitySpec() != null ? config.getGranularitySpec().isRollup() : null

        );

        // Create dimensionsSpec to send to compaction task
        ClientCompactionTaskDimensionsSpec dimensionsSpec;
        if (config.getDimensionsSpec() != null) {
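With this change, when the compaction config supplies no segmentGranularity, the duty infers one from the batch's interval via GranularityType.fromPeriod(interval.toPeriod()).getDefaultGranularity() and falls back to null when that throws IAE. A minimal sketch of both outcomes, assuming druid-core on the classpath (the class name and the second interval are illustrative, not part of the patch):

// GranularityInferenceSketch.java -- illustrative only, not part of this patch.
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.granularity.GranularityType;
import org.joda.time.Interval;

public class GranularityInferenceSketch
{
  public static void main(String[] args)
  {
    // A one-day segment interval maps cleanly onto DAY granularity,
    // matching testDetermineSegmentGranularityFromSegmentsToCompact below.
    Interval day = Intervals.of("2017-01-01/2017-01-02");
    Granularity inferred = GranularityType.fromPeriod(day.toPeriod()).getDefaultGranularity();
    System.out.println(inferred); // the DAY period granularity

    // An interval whose period mixes units (here roughly a month and a half) is not a
    // standard granularity; fromPeriod should throw IAE and the duty then leaves
    // segmentGranularity null, as in the catch block above.
    Interval odd = Intervals.of("2017-01-01/2017-02-15");
    try {
      GranularityType.fromPeriod(odd.toPeriod());
    }
    catch (IAE e) {
      System.out.println("not a standard granularity: " + odd);
    }
  }
}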
@@ -24,8 +24,13 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.discovery.DruidLeaderClient;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.jackson.JacksonUtils;
import org.apache.druid.java.util.http.client.Request;
import org.apache.druid.java.util.http.client.response.StringFullResponseHolder;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.NoneShardSpec;
import org.easymock.Capture;
import org.easymock.EasyMock;
import org.jboss.netty.buffer.BigEndianHeapChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffer;
@@ -48,6 +53,7 @@ public class HttpIndexingServiceClientTest
  private HttpIndexingServiceClient httpIndexingServiceClient;
  private ObjectMapper jsonMapper;
  private DruidLeaderClient druidLeaderClient;
  private ObjectMapper mockMapper;

  @Rule
  public ExpectedException expectedException = ExpectedException.none();
@@ -57,6 +63,8 @@ public class HttpIndexingServiceClientTest
  {
    jsonMapper = new DefaultObjectMapper();
    druidLeaderClient = EasyMock.createMock(DruidLeaderClient.class);
    mockMapper = EasyMock.createMock(ObjectMapper.class);

    httpIndexingServiceClient = new HttpIndexingServiceClient(
        jsonMapper,
        druidLeaderClient
@@ -268,5 +276,70 @@ public class HttpIndexingServiceClientTest
    EasyMock.verify(druidLeaderClient, response);
  }

  @Test
  public void testCompact() throws Exception
  {
    DataSegment segment = new DataSegment(
        "test",
        Intervals.of("2015-04-12/2015-04-13"),
        "1",
        ImmutableMap.of("bucket", "bucket", "path", "test/2015-04-12T00:00:00.000Z_2015-04-13T00:00:00.000Z/1/0/index.zip"),
        null,
        null,
        NoneShardSpec.instance(),
        0,
        1
    );
    Capture captureTask = EasyMock.newCapture();
    HttpResponse response = EasyMock.createMock(HttpResponse.class);
    EasyMock.expect(response.getStatus()).andReturn(HttpResponseStatus.OK).anyTimes();
    EasyMock.expect(response.getContent()).andReturn(new BigEndianHeapChannelBuffer(0));
    EasyMock.replay(response);

    StringFullResponseHolder responseHolder = new StringFullResponseHolder(
        response,
        StandardCharsets.UTF_8
    ).addChunk(jsonMapper.writeValueAsString(ImmutableMap.of("task", "aaa")));

    EasyMock.expect(druidLeaderClient.makeRequest(HttpMethod.POST, "/druid/indexer/v1/task"))
            .andReturn(new Request(HttpMethod.POST, new URL("http://localhost:8090/druid/indexer/v1/task")))
            .anyTimes();
    EasyMock.expect(druidLeaderClient.go(EasyMock.anyObject(Request.class)))
            .andReturn(responseHolder)
            .anyTimes();
    EasyMock.expect(mockMapper.writeValueAsBytes(EasyMock.capture(captureTask)))
            .andReturn(new byte[]{1, 2, 3})
            .anyTimes();
    EasyMock.expect(mockMapper.readValue(EasyMock.anyString(), EasyMock.eq(JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT)))
            .andReturn(ImmutableMap.of())
            .anyTimes();
    EasyMock.replay(druidLeaderClient, mockMapper);

    HttpIndexingServiceClient httpIndexingServiceClient = new HttpIndexingServiceClient(
        mockMapper,
        druidLeaderClient
    );

    try {
      httpIndexingServiceClient.compactSegments(
          "test-compact",
          ImmutableList.of(segment),
          50,
          null,
          null,
          null,
          null,
          null,
          null
      );
    }
    catch (Exception e) {
      // Ignore IllegalStateException, as taskId is internally generated and the returned task id will fail the check
      Assert.assertEquals(IllegalStateException.class.getName(), e.getCause().getClass().getName());
    }

    ClientCompactionTaskQuery taskQuery = (ClientCompactionTaskQuery) captureTask.getValue();
    Assert.assertNull(taskQuery.getIoConfig().getInputSpec().getSha256OfSortedSegmentIds());
  }
}
@@ -702,7 +702,9 @@ public class CompactSegmentsTest
    );
    // Only the same amount of segments as the original PARTITION_PER_TIME_INTERVAL since segment granularity is the same
    Assert.assertEquals(PARTITION_PER_TIME_INTERVAL, segmentsCaptor.getValue().size());
    Assert.assertNull(granularitySpecArgumentCaptor.getValue());
    Assert.assertNull(granularitySpecArgumentCaptor.getValue().getSegmentGranularity());
    Assert.assertNull(granularitySpecArgumentCaptor.getValue().getQueryGranularity());
    Assert.assertNull(granularitySpecArgumentCaptor.getValue().isRollup());
  }

  @Test
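The assertion change above reflects that the coordinator now always sends a ClientCompactionTaskGranularitySpec to the compaction task, so the test checks each field for null rather than expecting the whole spec to be null. A tiny sketch of such an empty spec, assuming druid-server classes on the classpath (the class name is illustrative, not part of the patch):

// EmptyGranularitySpecSketch.java -- illustrative only, not part of this patch.
import org.apache.druid.client.indexing.ClientCompactionTaskGranularitySpec;

public class EmptyGranularitySpecSketch
{
  public static void main(String[] args)
  {
    // The spec object itself is non-null, but every field is left unset
    ClientCompactionTaskGranularitySpec spec = new ClientCompactionTaskGranularitySpec(null, null, null);
    System.out.println(spec.getSegmentGranularity()); // null
    System.out.println(spec.getQueryGranularity());   // null
    System.out.println(spec.isRollup());              // null
  }
}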
@@ -1390,6 +1392,194 @@ public class CompactSegmentsTest
    );
  }

  @Test
  public void testDetermineSegmentGranularityFromSegmentsToCompact()
  {
    String dataSourceName = DATA_SOURCE_PREFIX + 1;
    List<DataSegment> segments = new ArrayList<>();
    segments.add(
        new DataSegment(
            dataSourceName,
            Intervals.of("2017-01-01T00:00:00/2017-01-02T00:00:00"),
            "1",
            null,
            ImmutableList.of(),
            ImmutableList.of(),
            shardSpecFactory.apply(0, 2),
            0,
            10L
        )
    );
    segments.add(
        new DataSegment(
            dataSourceName,
            Intervals.of("2017-01-01T00:00:00/2017-01-02T00:00:00"),
            "1",
            null,
            ImmutableList.of(),
            ImmutableList.of(),
            shardSpecFactory.apply(1, 2),
            0,
            10L
        )
    );
    dataSources = DataSourcesSnapshot
        .fromUsedSegments(segments, ImmutableMap.of())
        .getUsedSegmentsTimelinesPerDataSource();


    final HttpIndexingServiceClient mockIndexingServiceClient = Mockito.mock(HttpIndexingServiceClient.class);
    final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, JSON_MAPPER, mockIndexingServiceClient);
    final List<DataSourceCompactionConfig> compactionConfigs = new ArrayList<>();
    compactionConfigs.add(
        new DataSourceCompactionConfig(
            dataSourceName,
            0,
            500L,
            null,
            new Period("PT0H"), // smaller than segment interval
            new UserCompactionTaskQueryTuningConfig(
                null,
                null,
                null,
                null,
                partitionsSpec,
                null,
                null,
                null,
                null,
                null,
                3,
                null,
                null,
                null,
                null,
                null,
                null
            ),
            null,
            null,
            null,
            null,
            null
        )
    );
    doCompactSegments(compactSegments, compactionConfigs);
    ArgumentCaptor<List<DataSegment>> segmentsCaptor = ArgumentCaptor.forClass(List.class);
    ArgumentCaptor<ClientCompactionTaskGranularitySpec> granularitySpecArgumentCaptor = ArgumentCaptor.forClass(
        ClientCompactionTaskGranularitySpec.class);
    Mockito.verify(mockIndexingServiceClient).compactSegments(
        ArgumentMatchers.anyString(),
        segmentsCaptor.capture(),
        ArgumentMatchers.anyInt(),
        ArgumentMatchers.any(),
        granularitySpecArgumentCaptor.capture(),
        ArgumentMatchers.any(),
        ArgumentMatchers.any(),
        ArgumentMatchers.any(),
        ArgumentMatchers.any()
    );
    Assert.assertEquals(2, segmentsCaptor.getValue().size());
    ClientCompactionTaskGranularitySpec actual = granularitySpecArgumentCaptor.getValue();
    Assert.assertNotNull(actual);
    ClientCompactionTaskGranularitySpec expected = new ClientCompactionTaskGranularitySpec(Granularities.DAY, null, null);
    Assert.assertEquals(expected, actual);
  }

  @Test
  public void testDetermineSegmentGranularityFromSegmentGranularityInCompactionConfig()
  {
    String dataSourceName = DATA_SOURCE_PREFIX + 1;
    List<DataSegment> segments = new ArrayList<>();
    segments.add(
        new DataSegment(
            dataSourceName,
            Intervals.of("2017-01-01T00:00:00/2017-01-02T00:00:00"),
            "1",
            null,
            ImmutableList.of(),
            ImmutableList.of(),
            shardSpecFactory.apply(0, 2),
            0,
            10L
        )
    );
    segments.add(
        new DataSegment(
            dataSourceName,
            Intervals.of("2017-01-01T00:00:00/2017-01-02T00:00:00"),
            "1",
            null,
            ImmutableList.of(),
            ImmutableList.of(),
            shardSpecFactory.apply(1, 2),
            0,
            10L
        )
    );
    dataSources = DataSourcesSnapshot
        .fromUsedSegments(segments, ImmutableMap.of())
        .getUsedSegmentsTimelinesPerDataSource();


    final HttpIndexingServiceClient mockIndexingServiceClient = Mockito.mock(HttpIndexingServiceClient.class);
    final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, JSON_MAPPER, mockIndexingServiceClient);
    final List<DataSourceCompactionConfig> compactionConfigs = new ArrayList<>();
    compactionConfigs.add(
        new DataSourceCompactionConfig(
            dataSourceName,
            0,
            500L,
            null,
            new Period("PT0H"), // smaller than segment interval
            new UserCompactionTaskQueryTuningConfig(
                null,
                null,
                null,
                null,
                partitionsSpec,
                null,
                null,
                null,
                null,
                null,
                3,
                null,
                null,
                null,
                null,
                null,
                null
            ),
            new UserCompactionTaskGranularityConfig(Granularities.YEAR, null, null),
            null,
            null,
            null,
            null
        )
    );
    doCompactSegments(compactSegments, compactionConfigs);
    ArgumentCaptor<List<DataSegment>> segmentsCaptor = ArgumentCaptor.forClass(List.class);
    ArgumentCaptor<ClientCompactionTaskGranularitySpec> granularitySpecArgumentCaptor = ArgumentCaptor.forClass(
        ClientCompactionTaskGranularitySpec.class);
    Mockito.verify(mockIndexingServiceClient).compactSegments(
        ArgumentMatchers.anyString(),
        segmentsCaptor.capture(),
        ArgumentMatchers.anyInt(),
        ArgumentMatchers.any(),
        granularitySpecArgumentCaptor.capture(),
        ArgumentMatchers.any(),
        ArgumentMatchers.any(),
        ArgumentMatchers.any(),
        ArgumentMatchers.any()
    );
    Assert.assertEquals(2, segmentsCaptor.getValue().size());
    ClientCompactionTaskGranularitySpec actual = granularitySpecArgumentCaptor.getValue();
    Assert.assertNotNull(actual);
    ClientCompactionTaskGranularitySpec expected = new ClientCompactionTaskGranularitySpec(Granularities.YEAR, null, null);
    Assert.assertEquals(expected, actual);
  }

  private void verifySnapshot(
      CompactSegments compactSegments,
      AutoCompactionSnapshot.AutoCompactionScheduleStatus scheduleStatus,