diff --git a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java
index 6440f56b588..a3fa80f86d0 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java
@@ -25,6 +25,7 @@ import com.google.inject.Inject;
 import org.apache.commons.io.IOUtils;
 import org.apache.druid.data.input.MaxSizeSplitHintSpec;
 import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.indexer.TaskState;
 import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
 import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
 import org.apache.druid.indexer.partitions.PartitionsSpec;
@@ -775,6 +776,59 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
     }
   }
 
+  @Test
+  public void testAutoCompactionDutyWithOverlappingInterval() throws Exception
+  {
+    final ISOChronology chrono = ISOChronology.getInstance(DateTimes.inferTzFromString("America/Los_Angeles"));
+    Map<String, Object> specs = ImmutableMap.of("%%GRANULARITYSPEC%%", new UniformGranularitySpec(Granularities.WEEK, Granularities.NONE, false, ImmutableList.of(new Interval("2013-08-31/2013-09-02", chrono))));
+    // Create a WEEK segment spanning 2013-08-26 to 2013-09-02
+    loadData(INDEX_TASK_WITH_GRANULARITY_SPEC, specs);
+    specs = ImmutableMap.of("%%GRANULARITYSPEC%%", new UniformGranularitySpec(Granularities.MONTH, Granularities.NONE, false, ImmutableList.of(new Interval("2013-09-01/2013-09-02", chrono))));
+    // Create a MONTH segment spanning 2013-09-01 to 2013-10-01
+    loadData(INDEX_TASK_WITH_GRANULARITY_SPEC, specs);
+
+    try (final Closeable ignored = unloader(fullDatasourceName)) {
+      verifySegmentsCount(2);
+
+      // The result is not rolled up.
+      // For the dim "page", the result has the values "Gypsy Danger" and "Striker Eureka".
+      Map<String, Object> expectedResult = ImmutableMap.of(
+          "%%EXPECTED_COUNT_RESULT%%",
+          2,
+          "%%EXPECTED_SCAN_RESULT%%",
+          ImmutableList.of(ImmutableMap.of("events", ImmutableList.of(ImmutableList.of(57.0), ImmutableList.of(459.0))))
+      );
+      verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, expectedResult);
+
+      submitCompactionConfig(
+          MAX_ROWS_PER_SEGMENT_COMPACTED,
+          NO_SKIP_OFFSET,
+          null,
+          null,
+          null,
+          false
+      );
+      // Compact the MONTH segment
+      forceTriggerAutoCompaction(2);
+      verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, expectedResult);
+
+      // Compact the WEEK segment
+      forceTriggerAutoCompaction(2);
+      verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, expectedResult);
+
+      // Verify that all compaction tasks succeeded
+      List<TaskResponseObject> compactTasksBefore = indexer.getCompleteTasksForDataSource(fullDatasourceName);
+      for (TaskResponseObject taskResponseObject : compactTasksBefore) {
+        Assert.assertEquals(TaskState.SUCCESS, taskResponseObject.getStatus());
+      }
+
+      // Verify that already-compacted segments do not get compacted again
+      forceTriggerAutoCompaction(2);
+      List<TaskResponseObject> compactTasksAfter = indexer.getCompleteTasksForDataSource(fullDatasourceName);
+      Assert.assertEquals(compactTasksBefore.size(), compactTasksAfter.size());
+    }
+  }
+
   private void loadData(String indexTask) throws Exception
   {
     loadData(indexTask, ImmutableMap.of());
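The overlap the new integration test exercises falls straight out of Joda-Time bucketing, which the sketch below reproduces (illustrative only, and not part of the patch; it uses UTC rather than the America/Los_Angeles chronology the test uses, and the class and variable names are hypothetical):

    import org.joda.time.DateTime;
    import org.joda.time.DateTimeZone;
    import org.joda.time.Interval;

    class OverlapSketch
    {
      public static void main(String[] args)
      {
        // The WEEK bucket containing 2013-09-01 starts on the preceding Monday,
        // while the MONTH bucket starts on the first of the month, so the two
        // segment intervals overlap on 2013-09-01/2013-09-02.
        DateTime t = new DateTime("2013-09-01T12:00:00", DateTimeZone.UTC);
        Interval week = new Interval(t.weekOfWeekyear().roundFloorCopy(), t.weekOfWeekyear().roundFloorCopy().plusWeeks(1));
        Interval month = new Interval(t.monthOfYear().roundFloorCopy(), t.monthOfYear().roundFloorCopy().plusMonths(1));
        System.out.println(week);                 // 2013-08-26T00:00/2013-09-02T00:00
        System.out.println(month);                // 2013-09-01T00:00/2013-10-01T00:00
        System.out.println(week.overlaps(month)); // true
      }
    }

The test ingests exactly this WEEK/MONTH pair and then verifies that auto compaction handles both segments and issues no further tasks once they are compacted.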
diff --git a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionIntervalSpec.java b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionIntervalSpec.java
index ddec6356834..6aaa197a63b 100644
--- a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionIntervalSpec.java
+++ b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionIntervalSpec.java
@@ -23,7 +23,6 @@ import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.JodaUtils;
-import org.apache.druid.segment.SegmentUtils;
 import org.apache.druid.timeline.DataSegment;
 import org.joda.time.Interval;
@@ -51,7 +50,7 @@ public class ClientCompactionIntervalSpec
         JodaUtils.umbrellaInterval(
             segments.stream().map(DataSegment::getInterval).collect(Collectors.toList())
         ),
-        SegmentUtils.hashIds(segments)
+        null
     );
   }

diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
index 634caf302a4..f5c8b3a4250 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
@@ -31,8 +31,10 @@ import org.apache.druid.client.indexing.IndexingServiceClient;
 import org.apache.druid.client.indexing.TaskPayloadResponse;
 import org.apache.druid.indexer.TaskStatusPlus;
 import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec;
+import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.granularity.Granularity;
+import org.apache.druid.java.util.common.granularity.GranularityType;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.server.coordinator.AutoCompactionSnapshot;
 import org.apache.druid.server.coordinator.CompactionStatistics;
@@ -343,18 +345,37 @@ public class CompactSegments implements CoordinatorDuty
         snapshotBuilder.incrementSegmentCountCompacted(segmentsToCompact.size());

         final DataSourceCompactionConfig config = compactionConfigs.get(dataSourceName);
+        // Create the granularitySpec to send to the compaction task
         ClientCompactionTaskGranularitySpec granularitySpec;
-        if (config.getGranularitySpec() != null) {
-          granularitySpec = new ClientCompactionTaskGranularitySpec(
-              config.getGranularitySpec().getSegmentGranularity(),
-              config.getGranularitySpec().getQueryGranularity(),
-              config.getGranularitySpec().isRollup()
-          );
+        Granularity segmentGranularityToUse = null;
+        if (config.getGranularitySpec() == null || config.getGranularitySpec().getSegmentGranularity() == null) {
+          // Determine segmentGranularity from segmentsToCompact. Since segmentGranularity is not set in the
+          // compaction config, each batch of segmentsToCompact from the CompactionSegmentIterator covers the
+          // same interval.
+          Interval interval = segmentsToCompact.get(0).getInterval();
+          if (segmentsToCompact.stream().allMatch(segment -> interval.overlaps(segment.getInterval()))) {
+            try {
+              segmentGranularityToUse = GranularityType.fromPeriod(interval.toPeriod()).getDefaultGranularity();
+            }
+            catch (IAE iae) {
+              // This case can happen if the existing segment interval maps to a complicated period.
+              // Fall back to leaving segmentGranularity as null.
+              LOG.warn("Cannot determine segmentGranularity from interval [%s]", interval);
+            }
+          } else {
+            LOG.warn("segmentsToCompact do not all share the same interval. Falling back to not setting segmentGranularity for the auto compaction task");
+          }
         } else {
-          granularitySpec = null;
+          segmentGranularityToUse = config.getGranularitySpec().getSegmentGranularity();
         }
+        granularitySpec = new ClientCompactionTaskGranularitySpec(
+            segmentGranularityToUse,
+            config.getGranularitySpec() != null ? config.getGranularitySpec().getQueryGranularity() : null,
+            config.getGranularitySpec() != null ? config.getGranularitySpec().isRollup() : null
+        );
+
+        // Create the dimensionsSpec to send to the compaction task
         ClientCompactionTaskDimensionsSpec dimensionsSpec;
         if (config.getDimensionsSpec() != null) {
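The inference in the hunk above hinges on GranularityType.fromPeriod, which only accepts periods that match a standard Druid granularity; anything else raises IAE and the code falls back to a null segmentGranularity. A small self-contained sketch of both outcomes (not part of the patch; class and variable names are hypothetical):

    import org.apache.druid.java.util.common.IAE;
    import org.apache.druid.java.util.common.Intervals;
    import org.apache.druid.java.util.common.granularity.Granularities;
    import org.apache.druid.java.util.common.granularity.Granularity;
    import org.apache.druid.java.util.common.granularity.GranularityType;
    import org.joda.time.Interval;

    class GranularityInferenceSketch
    {
      public static void main(String[] args)
      {
        // A one-day interval maps cleanly: toPeriod() gives P1D, matching GranularityType.DAY.
        Interval day = Intervals.of("2017-01-01/2017-01-02");
        Granularity inferred = GranularityType.fromPeriod(day.toPeriod()).getDefaultGranularity();
        System.out.println(inferred.equals(Granularities.DAY)); // true

        // A two-day interval has no standard granularity: toPeriod() gives P2D and fromPeriod
        // throws IAE, the case the new catch block handles by leaving segmentGranularity null.
        Interval odd = Intervals.of("2017-01-01/2017-01-03");
        try {
          GranularityType.fromPeriod(odd.toPeriod());
        }
        catch (IAE expected) {
          System.out.println("no standard granularity for " + odd.toPeriod());
        }
      }
    }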
diff --git a/server/src/test/java/org/apache/druid/client/indexing/HttpIndexingServiceClientTest.java b/server/src/test/java/org/apache/druid/client/indexing/HttpIndexingServiceClientTest.java
index b2fed8528e3..cf1a84f7487 100644
--- a/server/src/test/java/org/apache/druid/client/indexing/HttpIndexingServiceClientTest.java
+++ b/server/src/test/java/org/apache/druid/client/indexing/HttpIndexingServiceClientTest.java
@@ -24,8 +24,13 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import org.apache.druid.discovery.DruidLeaderClient;
 import org.apache.druid.jackson.DefaultObjectMapper;
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.jackson.JacksonUtils;
 import org.apache.druid.java.util.http.client.Request;
 import org.apache.druid.java.util.http.client.response.StringFullResponseHolder;
+import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.partition.NoneShardSpec;
+import org.easymock.Capture;
 import org.easymock.EasyMock;
 import org.jboss.netty.buffer.BigEndianHeapChannelBuffer;
 import org.jboss.netty.buffer.ChannelBuffer;
@@ -48,6 +53,7 @@ public class HttpIndexingServiceClientTest
   private HttpIndexingServiceClient httpIndexingServiceClient;
   private ObjectMapper jsonMapper;
   private DruidLeaderClient druidLeaderClient;
+  private ObjectMapper mockMapper;

   @Rule
   public ExpectedException expectedException = ExpectedException.none();
@@ -57,6 +63,8 @@ public class HttpIndexingServiceClientTest
   {
     jsonMapper = new DefaultObjectMapper();
     druidLeaderClient = EasyMock.createMock(DruidLeaderClient.class);
+    mockMapper = EasyMock.createMock(ObjectMapper.class);
+
     httpIndexingServiceClient = new HttpIndexingServiceClient(
         jsonMapper,
         druidLeaderClient
@@ -268,5 +276,70 @@ public class HttpIndexingServiceClientTest
     EasyMock.verify(druidLeaderClient, response);
   }

+  @Test
+  public void testCompact() throws Exception
+  {
+    DataSegment segment = new DataSegment(
+        "test",
+        Intervals.of("2015-04-12/2015-04-13"),
+        "1",
+        ImmutableMap.of("bucket", "bucket", "path", "test/2015-04-12T00:00:00.000Z_2015-04-13T00:00:00.000Z/1/0/index.zip"),
+        null,
+        null,
+        NoneShardSpec.instance(),
+        0,
+        1
+    );
+    Capture<Object> captureTask = EasyMock.newCapture();
+    HttpResponse response = EasyMock.createMock(HttpResponse.class);
+    EasyMock.expect(response.getStatus()).andReturn(HttpResponseStatus.OK).anyTimes();
+    EasyMock.expect(response.getContent()).andReturn(new BigEndianHeapChannelBuffer(0));
+    EasyMock.replay(response);
+
+    StringFullResponseHolder responseHolder = new StringFullResponseHolder(
+        response,
+        StandardCharsets.UTF_8
+    ).addChunk(jsonMapper.writeValueAsString(ImmutableMap.of("task", "aaa")));
+
+    EasyMock.expect(druidLeaderClient.makeRequest(HttpMethod.POST, "/druid/indexer/v1/task"))
+            .andReturn(new Request(HttpMethod.POST, new URL("http://localhost:8090/druid/indexer/v1/task")))
+            .anyTimes();
+    EasyMock.expect(druidLeaderClient.go(EasyMock.anyObject(Request.class)))
+            .andReturn(responseHolder)
+            .anyTimes();
+    EasyMock.expect(mockMapper.writeValueAsBytes(EasyMock.capture(captureTask)))
+            .andReturn(new byte[]{1, 2, 3})
+            .anyTimes();
+    EasyMock.expect(mockMapper.readValue(EasyMock.anyString(), EasyMock.eq(JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT)))
+            .andReturn(ImmutableMap.of())
+            .anyTimes();
+    EasyMock.replay(druidLeaderClient, mockMapper);
+
+    HttpIndexingServiceClient httpIndexingServiceClient = new HttpIndexingServiceClient(
+        mockMapper,
+        druidLeaderClient
+    );
+
+    try {
+      httpIndexingServiceClient.compactSegments(
+          "test-compact",
+          ImmutableList.of(segment),
+          50,
+          null,
+          null,
+          null,
+          null,
+          null,
+          null
+      );
+    }
+    catch (Exception e) {
+      // Ignore the IllegalStateException: the taskId is generated internally, so the returned task id will fail the check
+      Assert.assertEquals(IllegalStateException.class.getName(), e.getCause().getClass().getName());
+    }
+
+    ClientCompactionTaskQuery taskQuery = (ClientCompactionTaskQuery) captureTask.getValue();
+    Assert.assertNull(taskQuery.getIoConfig().getInputSpec().getSha256OfSortedSegmentIds());
+  }
 }
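The final assertion in testCompact is the client-side echo of the ClientCompactionIntervalSpec change earlier in this patch: fromSegments still derives the umbrella interval but no longer pins the SHA-256 of the sorted segment ids. A hedged sketch of the resulting behavior, assuming the single-argument fromSegments factory shown above (class name hypothetical, not part of the patch):

    import com.google.common.collect.ImmutableList;
    import com.google.common.collect.ImmutableMap;
    import org.apache.druid.client.indexing.ClientCompactionIntervalSpec;
    import org.apache.druid.java.util.common.Intervals;
    import org.apache.druid.timeline.DataSegment;
    import org.apache.druid.timeline.partition.NoneShardSpec;

    class IntervalSpecSketch
    {
      public static void main(String[] args)
      {
        DataSegment segment = new DataSegment(
            "test",
            Intervals.of("2015-04-12/2015-04-13"),
            "1",
            ImmutableMap.of(),
            null,
            null,
            NoneShardSpec.instance(),
            0,
            1
        );
        ClientCompactionIntervalSpec spec = ClientCompactionIntervalSpec.fromSegments(ImmutableList.of(segment));
        System.out.println(spec.getInterval());                 // umbrella of the segment intervals
        System.out.println(spec.getSha256OfSortedSegmentIds()); // null after this patch
      }
    }

Without the pinned hash, the compaction task resolves the segment set for the interval at run time rather than failing when the set differs from what the coordinator saw at scheduling time.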
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java
index 68b9eb2ca73..8fb46a442ff 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java
@@ -702,7 +702,9 @@ public class CompactSegmentsTest
     );
     // Only the same number of segments as the original PARTITION_PER_TIME_INTERVAL since the segment granularity is the same
     Assert.assertEquals(PARTITION_PER_TIME_INTERVAL, segmentsCaptor.getValue().size());
-    Assert.assertNull(granularitySpecArgumentCaptor.getValue());
+    Assert.assertNull(granularitySpecArgumentCaptor.getValue().getSegmentGranularity());
+    Assert.assertNull(granularitySpecArgumentCaptor.getValue().getQueryGranularity());
+    Assert.assertNull(granularitySpecArgumentCaptor.getValue().isRollup());
   }

@@ -1390,6 +1392,194 @@ public class CompactSegmentsTest
     );
   }

+  @Test
+  public void testDetermineSegmentGranularityFromSegmentsToCompact()
+  {
+    String dataSourceName = DATA_SOURCE_PREFIX + 1;
+    List<DataSegment> segments = new ArrayList<>();
+    segments.add(
+        new DataSegment(
+            dataSourceName,
+            Intervals.of("2017-01-01T00:00:00/2017-01-02T00:00:00"),
+            "1",
+            null,
+            ImmutableList.of(),
+            ImmutableList.of(),
+            shardSpecFactory.apply(0, 2),
+            0,
+            10L
+        )
+    );
+    segments.add(
+        new DataSegment(
+            dataSourceName,
+            Intervals.of("2017-01-01T00:00:00/2017-01-02T00:00:00"),
+            "1",
+            null,
+            ImmutableList.of(),
+            ImmutableList.of(),
+            shardSpecFactory.apply(1, 2),
+            0,
+            10L
+        )
+    );
+    dataSources = DataSourcesSnapshot
+        .fromUsedSegments(segments, ImmutableMap.of())
+        .getUsedSegmentsTimelinesPerDataSource();
+
+    final HttpIndexingServiceClient mockIndexingServiceClient = Mockito.mock(HttpIndexingServiceClient.class);
+    final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, JSON_MAPPER, mockIndexingServiceClient);
+    final List<DataSourceCompactionConfig> compactionConfigs = new ArrayList<>();
+    compactionConfigs.add(
+        new DataSourceCompactionConfig(
+            dataSourceName,
+            0,
+            500L,
+            null,
+            new Period("PT0H"), // smaller than segment interval
+            new UserCompactionTaskQueryTuningConfig(
+                null,
+                null,
+                null,
+                null,
+                partitionsSpec,
+                null,
+                null,
+                null,
+                null,
+                null,
+                3,
+                null,
+                null,
+                null,
+                null,
+                null,
+                null
+            ),
+            null,
+            null,
+            null,
+            null,
+            null
+        )
+    );
+    doCompactSegments(compactSegments, compactionConfigs);
+    ArgumentCaptor<List<DataSegment>> segmentsCaptor = ArgumentCaptor.forClass(List.class);
+    ArgumentCaptor<ClientCompactionTaskGranularitySpec> granularitySpecArgumentCaptor = ArgumentCaptor.forClass(
+        ClientCompactionTaskGranularitySpec.class);
+    Mockito.verify(mockIndexingServiceClient).compactSegments(
+        ArgumentMatchers.anyString(),
+        segmentsCaptor.capture(),
+        ArgumentMatchers.anyInt(),
+        ArgumentMatchers.any(),
+        granularitySpecArgumentCaptor.capture(),
+        ArgumentMatchers.any(),
+        ArgumentMatchers.any(),
+        ArgumentMatchers.any(),
+        ArgumentMatchers.any()
+    );
+    Assert.assertEquals(2, segmentsCaptor.getValue().size());
+    ClientCompactionTaskGranularitySpec actual = granularitySpecArgumentCaptor.getValue();
+    Assert.assertNotNull(actual);
+    ClientCompactionTaskGranularitySpec expected = new ClientCompactionTaskGranularitySpec(Granularities.DAY, null, null);
+    Assert.assertEquals(expected, actual);
+  }
+
+  @Test
+  public void testDetermineSegmentGranularityFromSegmentGranularityInCompactionConfig()
+  {
+    String dataSourceName = DATA_SOURCE_PREFIX + 1;
+    List<DataSegment> segments = new ArrayList<>();
+    segments.add(
+        new DataSegment(
+            dataSourceName,
+            Intervals.of("2017-01-01T00:00:00/2017-01-02T00:00:00"),
+            "1",
+            null,
+            ImmutableList.of(),
+            ImmutableList.of(),
+            shardSpecFactory.apply(0, 2),
+            0,
+            10L
+        )
+    );
+    segments.add(
+        new DataSegment(
+            dataSourceName,
+            Intervals.of("2017-01-01T00:00:00/2017-01-02T00:00:00"),
+            "1",
+            null,
+            ImmutableList.of(),
+            ImmutableList.of(),
+            shardSpecFactory.apply(1, 2),
+            0,
+            10L
+        )
+    );
+    dataSources = DataSourcesSnapshot
+        .fromUsedSegments(segments, ImmutableMap.of())
+        .getUsedSegmentsTimelinesPerDataSource();
+
+    final HttpIndexingServiceClient mockIndexingServiceClient = Mockito.mock(HttpIndexingServiceClient.class);
+    final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, JSON_MAPPER, mockIndexingServiceClient);
+    final List<DataSourceCompactionConfig> compactionConfigs = new ArrayList<>();
+    compactionConfigs.add(
+        new DataSourceCompactionConfig(
+            dataSourceName,
+            0,
+            500L,
+            null,
+            new Period("PT0H"), // smaller than segment interval
+            new UserCompactionTaskQueryTuningConfig(
+                null,
+                null,
+                null,
+                null,
+                partitionsSpec,
+                null,
+                null,
+                null,
+                null,
+                null,
+                3,
+                null,
+                null,
+                null,
+                null,
+                null,
+                null
+            ),
+            new UserCompactionTaskGranularityConfig(Granularities.YEAR, null, null),
+            null,
+            null,
+            null,
+            null
+        )
+    );
+    doCompactSegments(compactSegments, compactionConfigs);
+    ArgumentCaptor<List<DataSegment>> segmentsCaptor = ArgumentCaptor.forClass(List.class);
+    ArgumentCaptor<ClientCompactionTaskGranularitySpec> granularitySpecArgumentCaptor = ArgumentCaptor.forClass(
+        ClientCompactionTaskGranularitySpec.class);
+    Mockito.verify(mockIndexingServiceClient).compactSegments(
+        ArgumentMatchers.anyString(),
+        segmentsCaptor.capture(),
+        ArgumentMatchers.anyInt(),
+        ArgumentMatchers.any(),
+        granularitySpecArgumentCaptor.capture(),
+        ArgumentMatchers.any(),
+        ArgumentMatchers.any(),
+        ArgumentMatchers.any(),
+        ArgumentMatchers.any()
+    );
+    Assert.assertEquals(2, segmentsCaptor.getValue().size());
+    ClientCompactionTaskGranularitySpec actual = granularitySpecArgumentCaptor.getValue();
+    Assert.assertNotNull(actual);
+    ClientCompactionTaskGranularitySpec expected = new ClientCompactionTaskGranularitySpec(Granularities.YEAR, null, null);
+    Assert.assertEquals(expected, actual);
+  }
+
   private void verifySnapshot(
       CompactSegments compactSegments,
       AutoCompactionSnapshot.AutoCompactionScheduleStatus scheduleStatus,
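One detail the updated assertions in the @@ -702 hunk of CompactSegmentsTest encode: the coordinator now always sends a granularitySpec object, even when the compaction config supplies none, so only its individual fields can be null. A minimal sketch of that shape (illustrative only; the class name is hypothetical):

    import org.apache.druid.client.indexing.ClientCompactionTaskGranularitySpec;

    class GranularitySpecSketch
    {
      public static void main(String[] args)
      {
        // Before the patch the coordinator sent granularitySpec = null when the config
        // had none; now it sends a spec whose fields may each be null.
        ClientCompactionTaskGranularitySpec spec = new ClientCompactionTaskGranularitySpec(null, null, null);
        System.out.println(spec.getSegmentGranularity()); // null
        System.out.println(spec.getQueryGranularity());   // null
        System.out.println(spec.isRollup());              // null
      }
    }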