From dbb9518f50d87f2a0512e8262ebc85a9f9f0e575 Mon Sep 17 00:00:00 2001
From: Maytas Monsereenusorn
Date: Fri, 18 Mar 2022 12:46:16 -0700
Subject: [PATCH] Fix auto compaction by adjusting compaction task's interval
 to align with segmentGranularity when segmentGranularity is set (#12334)

* add impl

* add ITs

* address comments

* address comments

* address comments

* fix failure

* fix checkstyle

* fix checkstyle
---
 .../druid/java/util/common/JodaUtils.java        |  19 +--
 .../clients/OverlordResourceTestClient.java      |   2 +-
 .../testing/guice/DruidTestModuleFactory.java    |   4 +-
 .../duty/ITAutoCompactionTest.java               | 101 ++++++++++++++++
 .../ClientCompactionIntervalSpec.java            |  37 +++++-
 .../indexing/HttpIndexingServiceClient.java      |   4 +-
 .../ClientCompactionIntervalSpecTest.java        | 114 ++++++++++++++++++
 .../HttpIndexingServiceClientTest.java           |  72 ++++++++++-
 8 files changed, 336 insertions(+), 17 deletions(-)
 create mode 100644 server/src/test/java/org/apache/druid/client/indexing/ClientCompactionIntervalSpecTest.java

diff --git a/core/src/main/java/org/apache/druid/java/util/common/JodaUtils.java b/core/src/main/java/org/apache/druid/java/util/common/JodaUtils.java
index 96c49d9ba0d..5d15bfe717f 100644
--- a/core/src/main/java/org/apache/druid/java/util/common/JodaUtils.java
+++ b/core/src/main/java/org/apache/druid/java/util/common/JodaUtils.java
@@ -24,9 +24,10 @@ import com.google.common.collect.Iterators;
 import com.google.common.collect.PeekingIterator;
 import org.apache.druid.java.util.common.guava.Comparators;
 import org.joda.time.DateTime;
+import org.joda.time.DateTimeComparator;
 import org.joda.time.Interval;
+import org.joda.time.chrono.ISOChronology;
 
-import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
@@ -182,18 +183,18 @@ public class JodaUtils
 
   public static Interval umbrellaInterval(Iterable<Interval> intervals)
   {
-    ArrayList<DateTime> startDates = new ArrayList<>();
-    ArrayList<DateTime> endDates = new ArrayList<>();
+    boolean emptyIntervals = true;
+    DateTimeComparator dateTimeComp = DateTimeComparator.getInstance();
+    DateTime minStart = new DateTime(Long.MAX_VALUE, ISOChronology.getInstanceUTC());
+    DateTime maxEnd = new DateTime(Long.MIN_VALUE, ISOChronology.getInstanceUTC());
 
     for (Interval interval : intervals) {
-      startDates.add(interval.getStart());
-      endDates.add(interval.getEnd());
+      emptyIntervals = false;
+      minStart = Collections.min(ImmutableList.of(minStart, interval.getStart()), dateTimeComp);
+      maxEnd = Collections.max(ImmutableList.of(maxEnd, interval.getEnd()), dateTimeComp);
     }
 
-    DateTime minStart = minDateTime(startDates.toArray(new DateTime[0]));
-    DateTime maxEnd = maxDateTime(endDates.toArray(new DateTime[0]));
-
-    if (minStart == null || maxEnd == null) {
+    if (emptyIntervals) {
       throw new IllegalArgumentException("Empty list of intervals");
     }
     return new Interval(minStart, maxEnd);
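The JodaUtils rewrite above keeps umbrellaInterval's contract (the smallest interval covering every input, and an IllegalArgumentException on empty input) while replacing the intermediate start/end lists with a single pass that tracks the running minimum start and maximum end. A minimal sketch of the preserved behavior, assuming druid-core and Joda-Time on the classpath (the class name here is hypothetical):

import java.util.Arrays;
import java.util.Collections;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.JodaUtils;

public class UmbrellaIntervalExample
{
  public static void main(String[] args)
  {
    // Spans from the earliest start to the latest end across all inputs.
    System.out.println(JodaUtils.umbrellaInterval(Arrays.asList(
        Intervals.of("2015-02-12/2015-03-13"),
        Intervals.of("2015-04-11/2015-04-12")
    ))); // 2015-02-12T00:00:00.000Z/2015-04-12T00:00:00.000Z

    // An empty input still fails fast, now detected via the emptyIntervals flag.
    JodaUtils.umbrellaInterval(Collections.emptyList()); // throws IllegalArgumentException
  }
}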
diff --git a/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java b/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java
index ea9481b3850..3df3d8a49d5 100644
--- a/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java
+++ b/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java
@@ -202,7 +202,7 @@ public class OverlordResourceTestClient
     try {
       StatusResponseHolder response = makeRequest(
           HttpMethod.GET,
-          StringUtils.format("%stask/%s", getIndexerURL(), taskId)
+          StringUtils.format("%stask/%s", getIndexerURL(), StringUtils.urlEncode(taskId))
       );
       LOG.debug("Task %s response %s", taskId, response.getContent());
       return jsonMapper.readValue(
diff --git a/integration-tests/src/main/java/org/apache/druid/testing/guice/DruidTestModuleFactory.java b/integration-tests/src/main/java/org/apache/druid/testing/guice/DruidTestModuleFactory.java
index 2c3b59d9152..d08712dea35 100644
--- a/integration-tests/src/main/java/org/apache/druid/testing/guice/DruidTestModuleFactory.java
+++ b/integration-tests/src/main/java/org/apache/druid/testing/guice/DruidTestModuleFactory.java
@@ -25,6 +25,7 @@ import com.google.inject.Module;
 import org.apache.druid.guice.GuiceInjectors;
 import org.apache.druid.guice.IndexingServiceFirehoseModule;
 import org.apache.druid.guice.IndexingServiceInputSourceModule;
+import org.apache.druid.guice.IndexingServiceTuningConfigModule;
 import org.apache.druid.initialization.Initialization;
 import org.testng.IModuleFactory;
 import org.testng.ITestContext;
@@ -50,7 +51,8 @@ public class DruidTestModuleFactory implements IModuleFactory
     return ImmutableList.of(
         new DruidTestModule(),
         new IndexingServiceFirehoseModule(),
-        new IndexingServiceInputSourceModule()
+        new IndexingServiceInputSourceModule(),
+        new IndexingServiceTuningConfigModule()
     );
   }
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java
index 5081eb03162..5d9681c020e 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java
@@ -30,8 +30,12 @@ import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
 import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
 import org.apache.druid.indexer.partitions.PartitionsSpec;
 import org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec;
+import org.apache.druid.indexing.common.task.CompactionIntervalSpec;
+import org.apache.druid.indexing.common.task.CompactionTask;
+import org.apache.druid.indexing.overlord.http.TaskPayloadResponse;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.granularity.Granularities;
 import org.apache.druid.java.util.common.granularity.Granularity;
@@ -631,6 +635,103 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
     }
   }
 
+  @Test
+  public void testAutoCompactionDutyWithSegmentGranularityFinerAndNotAlignWithSegment() throws Exception
+  {
+    updateCompactionTaskSlot(1, 1, null);
+    final ISOChronology chrono = ISOChronology.getInstance(DateTimes.inferTzFromString("America/Los_Angeles"));
+    Map<String, Object> specs = ImmutableMap.of("%%GRANULARITYSPEC%%", new UniformGranularitySpec(Granularities.MONTH, Granularities.DAY, false, ImmutableList.of(new Interval("2013-08-31/2013-09-02", chrono))));
+    loadData(INDEX_TASK_WITH_GRANULARITY_SPEC, specs);
+    try (final Closeable ignored = unloader(fullDatasourceName)) {
+      Map<String, Object> expectedResult = ImmutableMap.of(
+          "%%FIELD_TO_QUERY%%", "added",
+          "%%EXPECTED_COUNT_RESULT%%", 2,
+          "%%EXPECTED_SCAN_RESULT%%", ImmutableList.of(ImmutableMap.of("events", ImmutableList.of(ImmutableList.of(57.0), ImmutableList.of(459.0))))
+      );
+      verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, expectedResult);
+      submitCompactionConfig(
+          MAX_ROWS_PER_SEGMENT_COMPACTED,
+          NO_SKIP_OFFSET,
+          new UserCompactionTaskGranularityConfig(Granularities.WEEK, null, null),
+          false
+      );
+      // Before compaction, we have segments with the intervals 2013-08-01/2013-09-01 and 2013-09-01/2013-10-01.
+      // We will compact the latest segment, 2013-09-01/2013-10-01, to WEEK.
+      // Since the start of the week does not align with 2013-09-01 or 2013-10-01, we expect the compaction task's
+      // interval to be adjusted so that the compacted WEEK segments do not unintentionally remove data from the
+      // non-compacted 2013-08-01/2013-09-01 segment.
+      // Note that the compacted WEEK segments do not fully cover the original MONTH segment, as the MONTH segment
+      // does not have data in every week of the month.
+      forceTriggerAutoCompaction(3);
+      // Make sure that no data is lost after compaction
+      expectedResult = ImmutableMap.of(
+          "%%FIELD_TO_QUERY%%", "added",
+          "%%EXPECTED_COUNT_RESULT%%", 2,
+          "%%EXPECTED_SCAN_RESULT%%", ImmutableList.of(ImmutableMap.of("events", ImmutableList.of(ImmutableList.of(57.0), ImmutableList.of(459.0))))
+      );
+      verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, expectedResult);
+      verifySegmentsCompacted(1, MAX_ROWS_PER_SEGMENT_COMPACTED);
+      List<TaskResponseObject> tasks = indexer.getCompleteTasksForDataSource(fullDatasourceName);
+      TaskResponseObject compactTask = null;
+      for (TaskResponseObject task : tasks) {
+        if (task.getType().equals("compact")) {
+          compactTask = task;
+        }
+      }
+      Assert.assertNotNull(compactTask);
+      TaskPayloadResponse task = indexer.getTaskPayload(compactTask.getId());
+      // Verify that the compaction task interval is adjusted to align with segmentGranularity
+      Assert.assertEquals(Intervals.of("2013-08-26T00:00:00.000Z/2013-10-07T00:00:00.000Z"), ((CompactionIntervalSpec) ((CompactionTask) task.getPayload()).getIoConfig().getInputSpec()).getInterval());
+    }
+  }
+
+  @Test
+  public void testAutoCompactionDutyWithSegmentGranularityCoarserAndNotAlignWithSegment() throws Exception
+  {
+    updateCompactionTaskSlot(1, 1, null);
+    final ISOChronology chrono = ISOChronology.getInstance(DateTimes.inferTzFromString("America/Los_Angeles"));
+    Map<String, Object> specs = ImmutableMap.of("%%GRANULARITYSPEC%%", new UniformGranularitySpec(Granularities.WEEK, Granularities.DAY, false, ImmutableList.of(new Interval("2013-08-31/2013-09-02", chrono))));
+    loadData(INDEX_TASK_WITH_GRANULARITY_SPEC, specs);
+    try (final Closeable ignored = unloader(fullDatasourceName)) {
+      Map<String, Object> expectedResult = ImmutableMap.of(
+          "%%FIELD_TO_QUERY%%", "added",
+          "%%EXPECTED_COUNT_RESULT%%", 2,
+          "%%EXPECTED_SCAN_RESULT%%", ImmutableList.of(ImmutableMap.of("events", ImmutableList.of(ImmutableList.of(57.0), ImmutableList.of(459.0))))
+      );
+      verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, expectedResult);
+      submitCompactionConfig(
+          MAX_ROWS_PER_SEGMENT_COMPACTED,
+          NO_SKIP_OFFSET,
+          new UserCompactionTaskGranularityConfig(Granularities.MONTH, null, null),
+          false
+      );
+      // Before compaction, we have segments covering the interval 2013-08-26T00:00:00.000Z/2013-09-02T00:00:00.000Z.
+      // We will compact the latest segment to MONTH.
+      // Although the segments before compaction only cover 2013-08-26 to 2013-09-02,
+      // we expect the compaction task's interval to align with the MONTH segmentGranularity (2013-08-01 to 2013-10-01).
+      forceTriggerAutoCompaction(2);
+      // Make sure that no data is lost after compaction
+      expectedResult = ImmutableMap.of(
+          "%%FIELD_TO_QUERY%%", "added",
+          "%%EXPECTED_COUNT_RESULT%%", 2,
+          "%%EXPECTED_SCAN_RESULT%%", ImmutableList.of(ImmutableMap.of("events", ImmutableList.of(ImmutableList.of(57.0), ImmutableList.of(459.0))))
+      );
+      verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, expectedResult);
+      verifySegmentsCompacted(2, MAX_ROWS_PER_SEGMENT_COMPACTED);
+      List<TaskResponseObject> tasks = indexer.getCompleteTasksForDataSource(fullDatasourceName);
+      TaskResponseObject compactTask = null;
+      for (TaskResponseObject task : tasks) {
+        if (task.getType().equals("compact")) {
+          compactTask = task;
+        }
+      }
+      Assert.assertNotNull(compactTask);
+      TaskPayloadResponse task = indexer.getTaskPayload(compactTask.getId());
+      // Verify that the compaction task interval is adjusted to align with segmentGranularity
+      Assert.assertEquals(Intervals.of("2013-08-01T00:00:00.000Z/2013-10-01T00:00:00.000Z"), ((CompactionIntervalSpec) ((CompactionTask) task.getPayload()).getIoConfig().getInputSpec()).getInterval());
+    }
+  }
+
   @Test
   public void testAutoCompactionDutyWithRollup() throws Exception
   {
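The task intervals asserted in the new ITs above fall out of Druid's granularity bucketing. A small illustrative sketch (hypothetical class name; assumes druid-core on the classpath) reproducing the WEEK case, where the month 2013-09-01/2013-10-01 is widened to Monday boundaries:

import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.JodaUtils;
import org.apache.druid.java.util.common.granularity.Granularities;

public class WeekAlignmentExample
{
  public static void main(String[] args)
  {
    // WEEK buckets run Monday to Monday, so the bucket holding 2013-09-01
    // (a Sunday) starts on Monday 2013-08-26.
    System.out.println(Granularities.WEEK.bucket(DateTimes.of("2013-09-01")));
    // -> 2013-08-26T00:00:00.000Z/2013-09-02T00:00:00.000Z

    // The umbrella of every WEEK bucket overlapping the month is the adjusted task
    // interval asserted by testAutoCompactionDutyWithSegmentGranularityFinerAndNotAlignWithSegment.
    System.out.println(JodaUtils.umbrellaInterval(
        Granularities.WEEK.getIterable(Intervals.of("2013-09-01/2013-10-01"))
    )); // -> 2013-08-26T00:00:00.000Z/2013-10-07T00:00:00.000Z
  }
}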
diff --git a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionIntervalSpec.java b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionIntervalSpec.java
index 6aaa197a63b..7ff9ff424fd 100644
--- a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionIntervalSpec.java
+++ b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionIntervalSpec.java
@@ -23,6 +23,8 @@ import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.JodaUtils;
+import org.apache.druid.java.util.common.granularity.Granularity;
+import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.timeline.DataSegment;
 import org.joda.time.Interval;
 
@@ -38,18 +40,45 @@
  */
 public class ClientCompactionIntervalSpec
 {
+  private static final Logger LOGGER = new Logger(ClientCompactionIntervalSpec.class);
+
   private static final String TYPE = "interval";
 
   private final Interval interval;
   @Nullable
   private final String sha256OfSortedSegmentIds;
 
-  public static ClientCompactionIntervalSpec fromSegments(List<DataSegment> segments)
+  public static ClientCompactionIntervalSpec fromSegments(List<DataSegment> segments, @Nullable Granularity segmentGranularity)
   {
+    Interval interval = JodaUtils.umbrellaInterval(segments.stream().map(DataSegment::getInterval).collect(Collectors.toList()));
+    LOGGER.info("Original umbrella interval %s in compaction task for datasource %s", interval, segments.get(0).getDataSource());
+    if (segmentGranularity != null) {
+      // If segmentGranularity is set, the granularity of the segments may not align with the configured segmentGranularity.
+      // We must adjust the interval of the compaction task so that it fully covers and aligns with the segmentGranularity.
+      // For example:
+      // - The umbrella interval of the segments is 2015-04-11/2015-04-12 but the configured segmentGranularity is YEAR.
+      //   If the compaction task's interval were 2015-04-11/2015-04-12, we could run into a race condition where a new
+      //   segment created outside of that interval (e.g. 2015-02-11/2015-02-12) after the compaction task is submitted
+      //   would be lost, as it would be overshadowed by the compacted segment (the compacted segment has the interval
+      //   2015-01-01/2016-01-01). Hence, in this case, we must adjust the compaction task interval to 2015-01-01/2016-01-01.
+      // - The segment to be compacted has MONTH segmentGranularity with the interval 2015-02-01/2015-03-01 but the
+      //   configured segmentGranularity is WEEK. If the compaction task's interval were 2015-02-01/2015-03-01, the compacted
+      //   segments created would be 2015-01-26/2015-02-02, 2015-02-02/2015-02-09, 2015-02-09/2015-02-16, 2015-02-16/2015-02-23
+      //   and 2015-02-23/2015-03-02. This is because Druid's WEEK segments always start and end on a Monday. In the above
+      //   example, 2015-01-26 and 2015-03-02 are Mondays but 2015-02-01 and 2015-03-01 are not, so the WEEK segments have to
+      //   start and end on 2015-01-26 and 2015-03-02. With a task interval of 2015-02-01/2015-03-01, the compacted segments
+      //   would cause existing data from 2015-01-26 to 2015-02-01 and from 2015-03-01 to 2015-03-02 to be lost. Hence, in
+      //   this case, we must adjust the compaction task interval to 2015-01-26/2015-03-02.
+      interval = JodaUtils.umbrellaInterval(segmentGranularity.getIterable(interval));
+      LOGGER.info(
+          "Interval adjusted to %s in compaction task for datasource %s with configured segmentGranularity %s",
+          interval,
+          segments.get(0).getDataSource(),
+          segmentGranularity
+      );
+    }
     return new ClientCompactionIntervalSpec(
-        JodaUtils.umbrellaInterval(
-            segments.stream().map(DataSegment::getInterval).collect(Collectors.toList())
-        ),
+        interval,
         null
     );
   }
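A worked instance of the first scenario described in the comment above, using the same getIterable plus umbrellaInterval combination that fromSegments now applies (hypothetical class name; assumes druid-core on the classpath):

import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.JodaUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.joda.time.Interval;

public class TaskIntervalAdjustmentExample
{
  public static void main(String[] args)
  {
    // Umbrella interval of the segments picked for compaction.
    Interval umbrella = Intervals.of("2015-04-11/2015-04-12");

    // With segmentGranularity = YEAR the task must cover the whole year bucket;
    // otherwise a segment created elsewhere in 2015 while the task runs could be
    // overshadowed by the year-sized compacted segment and lost.
    Interval adjusted = JodaUtils.umbrellaInterval(Granularities.YEAR.getIterable(umbrella));
    System.out.println(adjusted); // -> 2015-01-01T00:00:00.000Z/2016-01-01T00:00:00.000Z
  }
}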
diff --git a/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java b/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java
index b63f3f09887..b811f6a5bd1 100644
--- a/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java
+++ b/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java
@@ -31,6 +31,7 @@ import org.apache.druid.indexer.TaskStatusPlus;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.granularity.Granularity;
 import org.apache.druid.java.util.common.jackson.JacksonUtils;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.java.util.http.client.response.StringFullResponseHolder;
@@ -103,10 +104,11 @@ public class HttpIndexingServiceClient implements IndexingServiceClient
     context.put("priority", compactionTaskPriority);
 
     final String taskId = IdUtils.newTaskId(idPrefix, ClientCompactionTaskQuery.TYPE, dataSource, null);
+    final Granularity segmentGranularity = granularitySpec == null ? null : granularitySpec.getSegmentGranularity();
     final ClientTaskQuery taskQuery = new ClientCompactionTaskQuery(
         taskId,
         dataSource,
-        new ClientCompactionIOConfig(ClientCompactionIntervalSpec.fromSegments(segments), dropExisting),
+        new ClientCompactionIOConfig(ClientCompactionIntervalSpec.fromSegments(segments, segmentGranularity), dropExisting),
         tuningConfig,
         granularitySpec,
         dimensionsSpec,
diff --git a/server/src/test/java/org/apache/druid/client/indexing/ClientCompactionIntervalSpecTest.java b/server/src/test/java/org/apache/druid/client/indexing/ClientCompactionIntervalSpecTest.java
new file mode 100644
index 00000000000..646e798aa66
--- /dev/null
+++ b/server/src/test/java/org/apache/druid/client/indexing/ClientCompactionIntervalSpecTest.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.client.indexing;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.segment.IndexIO;
+import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.partition.NoneShardSpec;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+
+public class ClientCompactionIntervalSpecTest
+{
+  private final DataSegment dataSegment1 = new DataSegment(
+      "test",
+      Intervals.of("2015-04-11/2015-04-12"),
+      DateTimes.nowUtc().toString(),
+      new HashMap<>(),
+      new ArrayList<>(),
+      new ArrayList<>(),
+      NoneShardSpec.instance(),
+      IndexIO.CURRENT_VERSION_ID,
+      11
+  );
+
+  private final DataSegment dataSegment2 = new DataSegment(
+      "test",
+      Intervals.of("2015-04-12/2015-04-14"),
+      DateTimes.nowUtc().toString(),
+      new HashMap<>(),
+      new ArrayList<>(),
+      new ArrayList<>(),
+      NoneShardSpec.instance(),
+      IndexIO.CURRENT_VERSION_ID,
+      11
+  );
+
+  private final DataSegment dataSegment3 = new DataSegment(
+      "test",
+      Intervals.of("2015-02-12/2015-03-13"),
+      DateTimes.nowUtc().toString(),
+      new HashMap<>(),
+      new ArrayList<>(),
+      new ArrayList<>(),
+      NoneShardSpec.instance(),
+      IndexIO.CURRENT_VERSION_ID,
+      11
+  );
+
+  @Test
+  public void testFromSegmentWithNoSegmentGranularity()
+  {
+    // The umbrella interval of the segments is 2015-02-12/2015-04-14
+    ClientCompactionIntervalSpec actual = ClientCompactionIntervalSpec.fromSegments(ImmutableList.of(dataSegment1, dataSegment2, dataSegment3), null);
+    Assert.assertEquals(Intervals.of("2015-02-12/2015-04-14"), actual.getInterval());
+  }
+
+  @Test
+  public void testFromSegmentWithSegmentGranularitySameAsSegment()
+  {
+    // The umbrella interval of the segments is 2015-04-11/2015-04-12
+    ClientCompactionIntervalSpec actual = ClientCompactionIntervalSpec.fromSegments(ImmutableList.of(dataSegment1), Granularities.DAY);
+    Assert.assertEquals(Intervals.of("2015-04-11/2015-04-12"), actual.getInterval());
+  }
+
+  @Test
+  public void testFromSegmentWithCoarserSegmentGranularity()
+  {
+    // The umbrella interval of the segments is 2015-02-12/2015-04-14
+    ClientCompactionIntervalSpec actual = ClientCompactionIntervalSpec.fromSegments(ImmutableList.of(dataSegment1, dataSegment2, dataSegment3), Granularities.YEAR);
+    // The compaction interval should be expanded to the start and end of the year to cover the segmentGranularity
+    Assert.assertEquals(Intervals.of("2015-01-01/2016-01-01"), actual.getInterval());
+  }
+
+  @Test
+  public void testFromSegmentWithFinerSegmentGranularityAndUmbrellaIntervalAlign()
+  {
+    // The umbrella interval of the segments is 2015-02-12/2015-04-14
+    ClientCompactionIntervalSpec actual = ClientCompactionIntervalSpec.fromSegments(ImmutableList.of(dataSegment1, dataSegment2, dataSegment3), Granularities.DAY);
+    // The segmentGranularity of DAY aligns with the umbrella interval (the umbrella interval can be evenly divided into DAY buckets)
+    Assert.assertEquals(Intervals.of("2015-02-12/2015-04-14"), actual.getInterval());
+  }
+
+  @Test
+  public void testFromSegmentWithFinerSegmentGranularityAndUmbrellaIntervalNotAlign()
+  {
+    // The umbrella interval of the segments is 2015-02-12/2015-04-14
+    ClientCompactionIntervalSpec actual = ClientCompactionIntervalSpec.fromSegments(ImmutableList.of(dataSegment1, dataSegment2, dataSegment3), Granularities.WEEK);
+    // The segmentGranularity of WEEK does not align with the umbrella interval (the umbrella interval cannot be evenly
+    // divided into WEEK buckets). Hence the compaction interval is modified to align with the segmentGranularity.
+    Assert.assertEquals(Intervals.of("2015-02-09/2015-04-20"), actual.getInterval());
+  }
+}
diff --git a/server/src/test/java/org/apache/druid/client/indexing/HttpIndexingServiceClientTest.java b/server/src/test/java/org/apache/druid/client/indexing/HttpIndexingServiceClientTest.java
index 445d129a11b..19fa4df0b5e 100644
--- a/server/src/test/java/org/apache/druid/client/indexing/HttpIndexingServiceClientTest.java
+++ b/server/src/test/java/org/apache/druid/client/indexing/HttpIndexingServiceClientTest.java
@@ -25,6 +25,7 @@ import com.google.common.collect.ImmutableMap;
 import org.apache.druid.discovery.DruidLeaderClient;
 import org.apache.druid.jackson.DefaultObjectMapper;
 import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.granularity.Granularities;
 import org.apache.druid.java.util.common.jackson.JacksonUtils;
 import org.apache.druid.java.util.http.client.Request;
 import org.apache.druid.java.util.http.client.response.StringFullResponseHolder;
@@ -338,8 +339,77 @@ public class HttpIndexingServiceClientTest
       // Ignore IllegalStateException as taskId is internally generated and the returned task id will fail the check
       Assert.assertEquals(IllegalStateException.class.getName(), e.getCause().getClass().getName());
     }
-
     ClientCompactionTaskQuery taskQuery = (ClientCompactionTaskQuery) captureTask.getValue();
+    Assert.assertEquals(Intervals.of("2015-04-12/2015-04-13"), taskQuery.getIoConfig().getInputSpec().getInterval());
+    Assert.assertNull(taskQuery.getGranularitySpec());
     Assert.assertNull(taskQuery.getIoConfig().getInputSpec().getSha256OfSortedSegmentIds());
   }
 
+  @Test
+  public void testCompactWithSegmentGranularity() throws Exception
+  {
+    DataSegment segment = new DataSegment(
"test", + Intervals.of("2015-04-12/2015-04-13"), + "1", + ImmutableMap.of("bucket", "bucket", "path", "test/2015-04-12T00:00:00.000Z_2015-04-13T00:00:00.000Z/1/0/index.zip"), + null, + null, + NoneShardSpec.instance(), + 0, + 1 + ); + Capture captureTask = EasyMock.newCapture(); + HttpResponse response = EasyMock.createMock(HttpResponse.class); + EasyMock.expect(response.getStatus()).andReturn(HttpResponseStatus.OK).anyTimes(); + EasyMock.expect(response.getContent()).andReturn(new BigEndianHeapChannelBuffer(0)); + EasyMock.replay(response); + + StringFullResponseHolder responseHolder = new StringFullResponseHolder( + response, + StandardCharsets.UTF_8 + ).addChunk(jsonMapper.writeValueAsString(ImmutableMap.of("task", "aaa"))); + + EasyMock.expect(druidLeaderClient.makeRequest(HttpMethod.POST, "/druid/indexer/v1/task")) + .andReturn(new Request(HttpMethod.POST, new URL("http://localhost:8090/druid/indexer/v1/task"))) + .anyTimes(); + EasyMock.expect(druidLeaderClient.go(EasyMock.anyObject(Request.class))) + .andReturn(responseHolder) + .anyTimes(); + EasyMock.expect(mockMapper.writeValueAsBytes(EasyMock.capture(captureTask))) + .andReturn(new byte[]{1, 2, 3}) + .anyTimes(); + EasyMock.expect(mockMapper.readValue(EasyMock.anyString(), EasyMock.eq(JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT))) + .andReturn(ImmutableMap.of()) + .anyTimes(); + EasyMock.replay(druidLeaderClient, mockMapper); + + HttpIndexingServiceClient httpIndexingServiceClient = new HttpIndexingServiceClient( + mockMapper, + druidLeaderClient + ); + + try { + httpIndexingServiceClient.compactSegments( + "test-compact", + ImmutableList.of(segment), + 50, + null, + new ClientCompactionTaskGranularitySpec(Granularities.YEAR, null, null), + null, + null, + null, + null, + null + ); + } + catch (Exception e) { + // Ignore IllegalStateException as taskId is internally generated and returned task id will failed check + Assert.assertEquals(IllegalStateException.class.getName(), e.getCause().getClass().getName()); + } + ClientCompactionTaskQuery taskQuery = (ClientCompactionTaskQuery) captureTask.getValue(); + Assert.assertEquals(Intervals.of("2015-01-01/2016-01-01"), taskQuery.getIoConfig().getInputSpec().getInterval()); + Assert.assertEquals(Granularities.YEAR, taskQuery.getGranularitySpec().getSegmentGranularity()); Assert.assertNull(taskQuery.getIoConfig().getInputSpec().getSha256OfSortedSegmentIds()); }