Fix auto compaction by adjusting compaction task's interval to align with segmentGranularity when segmentGranularity is set (#12334)

* add impl

* add ITs

* address comments

* address comments

* address comments

* fix failure

* fix checkstyle

* fix checkstyle
Maytas Monsereenusorn 2022-03-18 12:46:16 -07:00 committed by GitHub
parent 6f0e5f25fa
commit dbb9518f50
8 changed files with 336 additions and 17 deletions

View File

@@ -24,9 +24,10 @@ import com.google.common.collect.Iterators;
import com.google.common.collect.PeekingIterator;
import org.apache.druid.java.util.common.guava.Comparators;
import org.joda.time.DateTime;
import org.joda.time.DateTimeComparator;
import org.joda.time.Interval;
import org.joda.time.chrono.ISOChronology;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
@@ -182,18 +183,18 @@ public class JodaUtils
public static Interval umbrellaInterval(Iterable<Interval> intervals)
{
ArrayList<DateTime> startDates = new ArrayList<>();
ArrayList<DateTime> endDates = new ArrayList<>();
boolean emptyIntervals = true;
DateTimeComparator dateTimeComp = DateTimeComparator.getInstance();
DateTime minStart = new DateTime(Long.MAX_VALUE, ISOChronology.getInstanceUTC());
DateTime maxEnd = new DateTime(Long.MIN_VALUE, ISOChronology.getInstanceUTC());
for (Interval interval : intervals) {
startDates.add(interval.getStart());
endDates.add(interval.getEnd());
emptyIntervals = false;
minStart = Collections.min(ImmutableList.of(minStart, interval.getStart()), dateTimeComp);
maxEnd = Collections.max(ImmutableList.of(maxEnd, interval.getEnd()), dateTimeComp);
}
DateTime minStart = minDateTime(startDates.toArray(new DateTime[0]));
DateTime maxEnd = maxDateTime(endDates.toArray(new DateTime[0]));
if (minStart == null || maxEnd == null) {
if (emptyIntervals) {
throw new IllegalArgumentException("Empty list of intervals");
}
return new Interval(minStart, maxEnd);
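For context, here is a minimal usage sketch of the reworked method (not part of the patch; the class name is made up, and it assumes the Druid common utilities shown in this diff are on the classpath):

import com.google.common.collect.ImmutableList;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.JodaUtils;
import org.joda.time.Interval;

public class UmbrellaIntervalExample
{
  public static void main(String[] args)
  {
    // umbrellaInterval returns the smallest interval spanning all inputs; it is now computed
    // with a single-pass min/max over the starts and ends instead of collecting them into lists.
    Interval umbrella = JodaUtils.umbrellaInterval(
        ImmutableList.of(
            Intervals.of("2015-02-12/2015-03-13"),
            Intervals.of("2015-04-11/2015-04-12")
        )
    );
    // Prints 2015-02-12T00:00:00.000Z/2015-04-12T00:00:00.000Z
    System.out.println(umbrella);
  }
}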

View File

@@ -202,7 +202,7 @@ public class OverlordResourceTestClient
try {
StatusResponseHolder response = makeRequest(
HttpMethod.GET,
StringUtils.format("%stask/%s", getIndexerURL(), taskId)
StringUtils.format("%stask/%s", getIndexerURL(), StringUtils.urlEncode(taskId))
);
LOG.debug("Task %s response %s", taskId, response.getContent());
return jsonMapper.readValue(
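The encoding matters because Druid task ids commonly embed ISO-8601 timestamps, whose ':' characters are not safe to splice into a URL path verbatim. A small sketch of the difference (the task id and indexer URL below are made up for illustration):

import org.apache.druid.java.util.common.StringUtils;

public class TaskStatusUrlExample
{
  public static void main(String[] args)
  {
    // Hypothetical compaction task id containing ':' characters from a timestamp.
    String taskId = "coordinator-issued_compact_wikipedia_2022-03-18T12:00:00.000Z";
    String indexerUrl = "http://localhost:8090/druid/indexer/v1/";

    // Before the change: the raw id is interpolated into the path as-is.
    System.out.println(StringUtils.format("%stask/%s", indexerUrl, taskId));

    // After the change: the id is percent-encoded first, so reserved characters
    // survive the HTTP request to the Overlord intact.
    System.out.println(StringUtils.format("%stask/%s", indexerUrl, StringUtils.urlEncode(taskId)));
  }
}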

View File

@@ -25,6 +25,7 @@ import com.google.inject.Module;
import org.apache.druid.guice.GuiceInjectors;
import org.apache.druid.guice.IndexingServiceFirehoseModule;
import org.apache.druid.guice.IndexingServiceInputSourceModule;
import org.apache.druid.guice.IndexingServiceTuningConfigModule;
import org.apache.druid.initialization.Initialization;
import org.testng.IModuleFactory;
import org.testng.ITestContext;
@@ -50,7 +51,8 @@ public class DruidTestModuleFactory implements IModuleFactory
return ImmutableList.of(
new DruidTestModule(),
new IndexingServiceFirehoseModule(),
new IndexingServiceInputSourceModule()
new IndexingServiceInputSourceModule(),
new IndexingServiceTuningConfigModule()
);
}

View File

@@ -30,8 +30,12 @@ import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
import org.apache.druid.indexer.partitions.PartitionsSpec;
import org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec;
import org.apache.druid.indexing.common.task.CompactionIntervalSpec;
import org.apache.druid.indexing.common.task.CompactionTask;
import org.apache.druid.indexing.overlord.http.TaskPayloadResponse;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.Granularity;
@@ -631,6 +635,103 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
}
}
@Test
public void testAutoCompactionDutyWithSegmentGranularityFinerAndNotAlignWithSegment() throws Exception
{
updateCompactionTaskSlot(1, 1, null);
final ISOChronology chrono = ISOChronology.getInstance(DateTimes.inferTzFromString("America/Los_Angeles"));
Map<String, Object> specs = ImmutableMap.of("%%GRANULARITYSPEC%%", new UniformGranularitySpec(Granularities.MONTH, Granularities.DAY, false, ImmutableList.of(new Interval("2013-08-31/2013-09-02", chrono))));
loadData(INDEX_TASK_WITH_GRANULARITY_SPEC, specs);
try (final Closeable ignored = unloader(fullDatasourceName)) {
Map<String, Object> expectedResult = ImmutableMap.of(
"%%FIELD_TO_QUERY%%", "added",
"%%EXPECTED_COUNT_RESULT%%", 2,
"%%EXPECTED_SCAN_RESULT%%", ImmutableList.of(ImmutableMap.of("events", ImmutableList.of(ImmutableList.of(57.0), ImmutableList.of(459.0))))
);
verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, expectedResult);
submitCompactionConfig(
MAX_ROWS_PER_SEGMENT_COMPACTED,
NO_SKIP_OFFSET,
new UserCompactionTaskGranularityConfig(Granularities.WEEK, null, null),
false
);
// Before compaction, we have segments with the intervals 2013-08-01/2013-09-01 and 2013-09-01/2013-10-01.
// We will compact the latest segment, 2013-09-01/2013-10-01, to WEEK.
// Since the start of the week does not align with 2013-09-01 or 2013-10-01, we expect the compaction task's
// interval to be adjusted so that the compacted WEEK segments do not unintentionally remove data from the
// non-compacted 2013-08-01/2013-09-01 segment.
// Note that the compacted WEEK segments do not fully cover the original MONTH segment, as the MONTH segment
// does not have data in every week of the month.
forceTriggerAutoCompaction(3);
// Make sure that no data is lost after compaction
expectedResult = ImmutableMap.of(
"%%FIELD_TO_QUERY%%", "added",
"%%EXPECTED_COUNT_RESULT%%", 2,
"%%EXPECTED_SCAN_RESULT%%", ImmutableList.of(ImmutableMap.of("events", ImmutableList.of(ImmutableList.of(57.0), ImmutableList.of(459.0))))
);
verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, expectedResult);
verifySegmentsCompacted(1, MAX_ROWS_PER_SEGMENT_COMPACTED);
List<TaskResponseObject> tasks = indexer.getCompleteTasksForDataSource(fullDatasourceName);
TaskResponseObject compactTask = null;
for (TaskResponseObject task : tasks) {
if (task.getType().equals("compact")) {
compactTask = task;
}
}
Assert.assertNotNull(compactTask);
TaskPayloadResponse task = indexer.getTaskPayload(compactTask.getId());
// Verify that compaction task interval is adjusted to align with segmentGranularity
Assert.assertEquals(Intervals.of("2013-08-26T00:00:00.000Z/2013-10-07T00:00:00.000Z"), ((CompactionIntervalSpec) ((CompactionTask) task.getPayload()).getIoConfig().getInputSpec()).getInterval());
}
}
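As a sanity check on the asserted interval, the WEEK expansion can be reproduced with the same utilities the fix uses (an illustrative sketch, not part of the patch; the class name is made up):

import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.JodaUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.joda.time.Interval;

public class WeekAlignmentCheck
{
  public static void main(String[] args)
  {
    // Umbrella interval of the MONTH segment selected for compaction in the test above.
    Interval monthInterval = Intervals.of("2013-09-01/2013-10-01");
    // WEEK buckets run Monday to Monday, so expanding to full weeks yields
    // 2013-08-26T00:00:00.000Z/2013-10-07T00:00:00.000Z, matching the assertion above.
    Interval adjusted = JodaUtils.umbrellaInterval(Granularities.WEEK.getIterable(monthInterval));
    System.out.println(adjusted);
  }
}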
@Test
public void testAutoCompactionDutyWithSegmentGranularityCoarserAndNotAlignWithSegment() throws Exception
{
updateCompactionTaskSlot(1, 1, null);
final ISOChronology chrono = ISOChronology.getInstance(DateTimes.inferTzFromString("America/Los_Angeles"));
Map<String, Object> specs = ImmutableMap.of("%%GRANULARITYSPEC%%", new UniformGranularitySpec(Granularities.WEEK, Granularities.DAY, false, ImmutableList.of(new Interval("2013-08-31/2013-09-02", chrono))));
loadData(INDEX_TASK_WITH_GRANULARITY_SPEC, specs);
try (final Closeable ignored = unloader(fullDatasourceName)) {
Map<String, Object> expectedResult = ImmutableMap.of(
"%%FIELD_TO_QUERY%%", "added",
"%%EXPECTED_COUNT_RESULT%%", 2,
"%%EXPECTED_SCAN_RESULT%%", ImmutableList.of(ImmutableMap.of("events", ImmutableList.of(ImmutableList.of(57.0), ImmutableList.of(459.0))))
);
verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, expectedResult);
submitCompactionConfig(
MAX_ROWS_PER_SEGMENT_COMPACTED,
NO_SKIP_OFFSET,
new UserCompactionTaskGranularityConfig(Granularities.MONTH, null, null),
false
);
// Before compaction, we have segments covering the interval 2013-08-26T00:00:00.000Z/2013-09-02T00:00:00.000Z.
// We will compact the latest segment to MONTH.
// Although the segments before compaction only cover 2013-08-26 to 2013-09-02,
// we expect the compaction task's interval to align with the MONTH segmentGranularity (2013-08-01 to 2013-10-01).
forceTriggerAutoCompaction(2);
// Make sure that no data is lost after compaction
expectedResult = ImmutableMap.of(
"%%FIELD_TO_QUERY%%", "added",
"%%EXPECTED_COUNT_RESULT%%", 2,
"%%EXPECTED_SCAN_RESULT%%", ImmutableList.of(ImmutableMap.of("events", ImmutableList.of(ImmutableList.of(57.0), ImmutableList.of(459.0))))
);
verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, expectedResult);
verifySegmentsCompacted(2, MAX_ROWS_PER_SEGMENT_COMPACTED);
List<TaskResponseObject> tasks = indexer.getCompleteTasksForDataSource(fullDatasourceName);
TaskResponseObject compactTask = null;
for (TaskResponseObject task : tasks) {
if (task.getType().equals("compact")) {
compactTask = task;
}
}
Assert.assertNotNull(compactTask);
TaskPayloadResponse task = indexer.getTaskPayload(compactTask.getId());
// Verify that compaction task interval is adjusted to align with segmentGranularity
Assert.assertEquals(Intervals.of("2013-08-01T00:00:00.000Z/2013-10-01T00:00:00.000Z"), ((CompactionIntervalSpec) ((CompactionTask) task.getPayload()).getIoConfig().getInputSpec()).getInterval());
}
}
@Test
public void testAutoCompactionDutyWithRollup() throws Exception
{

View File

@@ -23,6 +23,8 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.JodaUtils;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.timeline.DataSegment;
import org.joda.time.Interval;
@@ -38,18 +40,45 @@ import java.util.stream.Collectors;
*/
public class ClientCompactionIntervalSpec
{
private static final Logger LOGGER = new Logger(ClientCompactionIntervalSpec.class);
private static final String TYPE = "interval";
private final Interval interval;
@Nullable
private final String sha256OfSortedSegmentIds;
public static ClientCompactionIntervalSpec fromSegments(List<DataSegment> segments)
public static ClientCompactionIntervalSpec fromSegments(List<DataSegment> segments, @Nullable Granularity segmentGranularity)
{
Interval interval = JodaUtils.umbrellaInterval(segments.stream().map(DataSegment::getInterval).collect(Collectors.toList()));
LOGGER.info("Original umbrella interval %s in compaction task for datasource %s", interval, segments.get(0).getDataSource());
if (segmentGranularity != null) {
// If segmentGranularity is set, the granularity of the existing segments may not align with the configured segmentGranularity.
// We must adjust the interval of the compaction task so that it fully covers, and aligns with, the segmentGranularity.
// For example,
// - The umbrella interval of the segments is 2015-04-11/2015-04-12 but the configured segmentGranularity is YEAR.
// If the compaction task's interval were 2015-04-11/2015-04-12, we could run into a race condition where a new segment
// outside of that interval (e.g. 2015-02-11/2015-02-12), created after the compaction task is submitted, would be lost,
// as it is overshadowed by the compacted segment (which has the interval 2015-01-01/2016-01-01).
// Hence, in this case, we must adjust the compaction task interval to 2015-01-01/2016-01-01.
// - The segment to be compacted has MONTH segmentGranularity with the interval 2015-02-01/2015-03-01 but the configured
// segmentGranularity is WEEK. If the compaction task's interval were 2015-02-01/2015-03-01, the compacted segments created
// would be 2015-01-26/2015-02-02, 2015-02-02/2015-02-09, 2015-02-09/2015-02-16, 2015-02-16/2015-02-23, 2015-02-23/2015-03-02.
// This is because Druid's WEEK segments always start and end on a Monday. In the above example, 2015-01-26 and 2015-03-02
// are Mondays but 2015-02-01 and 2015-03-01 are not, so the WEEK segments have to start and end on 2015-01-26 and 2015-03-02.
// If the compaction task's interval were 2015-02-01/2015-03-01, the compacted segments would cause existing data
// from 2015-01-26 to 2015-02-01 and from 2015-03-01 to 2015-03-02 to be lost. Hence, in this case,
// we must adjust the compaction task interval to 2015-01-26/2015-03-02.
interval = JodaUtils.umbrellaInterval(segmentGranularity.getIterable(interval));
LOGGER.info(
"Interval adjusted to %s in compaction task for datasource %s with configured segmentGranularity %s",
interval,
segments.get(0).getDataSource(),
segmentGranularity
);
}
return new ClientCompactionIntervalSpec(
JodaUtils.umbrellaInterval(
segments.stream().map(DataSegment::getInterval).collect(Collectors.toList())
),
interval,
null
);
}
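To make the first bullet in the comment above concrete, the coarser-granularity case can be worked out with the same getIterable/umbrellaInterval combination the method now uses (an illustrative sketch, not part of the patch; the class name is made up):

import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.JodaUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.joda.time.Interval;

public class CoarserGranularityAlignmentCheck
{
  public static void main(String[] args)
  {
    // Umbrella interval of the segments to compact: a single day of data.
    Interval umbrella = Intervals.of("2015-04-11/2015-04-12");
    // Expanding to full YEAR buckets yields 2015-01-01T00:00:00.000Z/2016-01-01T00:00:00.000Z,
    // the same interval the compacted YEAR segment will occupy.
    Interval adjusted = JodaUtils.umbrellaInterval(Granularities.YEAR.getIterable(umbrella));
    System.out.println(adjusted);
  }
}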

View File

@@ -31,6 +31,7 @@ import org.apache.druid.indexer.TaskStatusPlus;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.jackson.JacksonUtils;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.http.client.response.StringFullResponseHolder;
@@ -103,10 +104,11 @@ public class HttpIndexingServiceClient implements IndexingServiceClient
context.put("priority", compactionTaskPriority);
final String taskId = IdUtils.newTaskId(idPrefix, ClientCompactionTaskQuery.TYPE, dataSource, null);
final Granularity segmentGranularity = granularitySpec == null ? null : granularitySpec.getSegmentGranularity();
final ClientTaskQuery taskQuery = new ClientCompactionTaskQuery(
taskId,
dataSource,
new ClientCompactionIOConfig(ClientCompactionIntervalSpec.fromSegments(segments), dropExisting),
new ClientCompactionIOConfig(ClientCompactionIntervalSpec.fromSegments(segments, segmentGranularity), dropExisting),
tuningConfig,
granularitySpec,
dimensionsSpec,

View File

@@ -0,0 +1,114 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.client.indexing;
import com.google.common.collect.ImmutableList;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.NoneShardSpec;
import org.junit.Assert;
import org.junit.Test;
import java.util.ArrayList;
import java.util.HashMap;
public class ClientCompactionIntervalSpecTest
{
private final DataSegment dataSegment1 = new DataSegment(
"test",
Intervals.of("2015-04-11/2015-04-12"),
DateTimes.nowUtc().toString(),
new HashMap<>(),
new ArrayList<>(),
new ArrayList<>(),
NoneShardSpec.instance(),
IndexIO.CURRENT_VERSION_ID,
11
);
private final DataSegment dataSegment2 = new DataSegment(
"test",
Intervals.of("2015-04-12/2015-04-14"),
DateTimes.nowUtc().toString(),
new HashMap<>(),
new ArrayList<>(),
new ArrayList<>(),
NoneShardSpec.instance(),
IndexIO.CURRENT_VERSION_ID,
11
);
private final DataSegment dataSegment3 = new DataSegment(
"test",
Intervals.of("2015-02-12/2015-03-13"),
DateTimes.nowUtc().toString(),
new HashMap<>(),
new ArrayList<>(),
new ArrayList<>(),
NoneShardSpec.instance(),
IndexIO.CURRENT_VERSION_ID,
11
);
@Test
public void testFromSegmentWithNoSegmentGranularity()
{
// The umbrella interval of segments is 2015-02-12/2015-04-14
ClientCompactionIntervalSpec actual = ClientCompactionIntervalSpec.fromSegments(ImmutableList.of(dataSegment1, dataSegment2, dataSegment3), null);
Assert.assertEquals(Intervals.of("2015-02-12/2015-04-14"), actual.getInterval());
}
@Test
public void testFromSegmentWithSegmentGranularitySameAsSegment()
{
// The umbrella interval of segments is 2015-04-11/2015-04-12
ClientCompactionIntervalSpec actual = ClientCompactionIntervalSpec.fromSegments(ImmutableList.of(dataSegment1), Granularities.DAY);
Assert.assertEquals(Intervals.of("2015-04-11/2015-04-12"), actual.getInterval());
}
@Test
public void testFromSegmentWithCoarserSegmentGranularity()
{
// The umbrella interval of segments is 2015-02-12/2015-04-14
ClientCompactionIntervalSpec actual = ClientCompactionIntervalSpec.fromSegments(ImmutableList.of(dataSegment1, dataSegment2, dataSegment3), Granularities.YEAR);
// The compaction interval should be expanded to the start and the end of the year to cover the YEAR segmentGranularity
Assert.assertEquals(Intervals.of("2015-01-01/2016-01-01"), actual.getInterval());
}
@Test
public void testFromSegmentWithFinerSegmentGranularityAndUmbrellaIntervalAlign()
{
// The umbrella interval of segments is 2015-02-12/2015-04-14
ClientCompactionIntervalSpec actual = ClientCompactionIntervalSpec.fromSegments(ImmutableList.of(dataSegment1, dataSegment2, dataSegment3), Granularities.DAY);
// The segmentGranularity of DAY aligns with the umbrella interval (the umbrella interval can be evenly divided into DAY buckets)
Assert.assertEquals(Intervals.of("2015-02-12/2015-04-14"), actual.getInterval());
}
@Test
public void testFromSegmentWithFinerSegmentGranularityAndUmbrellaIntervalNotAlign()
{
// The umbrella interval of segments is 2015-02-12/2015-04-14
ClientCompactionIntervalSpec actual = ClientCompactionIntervalSpec.fromSegments(ImmutableList.of(dataSegment1, dataSegment2, dataSegment3), Granularities.WEEK);
// The segmentGranularity of WEEK does not align with the umbrella interval (the umbrella interval cannot be evenly divided into WEEK buckets)
// Hence the compaction interval is adjusted to align with the segmentGranularity
Assert.assertEquals(Intervals.of("2015-02-09/2015-04-20"), actual.getInterval());
}
}

View File

@@ -25,6 +25,7 @@ import com.google.common.collect.ImmutableMap;
import org.apache.druid.discovery.DruidLeaderClient;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.jackson.JacksonUtils;
import org.apache.druid.java.util.http.client.Request;
import org.apache.druid.java.util.http.client.response.StringFullResponseHolder;
@@ -338,8 +339,77 @@ public class HttpIndexingServiceClientTest
// Ignore IllegalStateException, as the taskId is internally generated and the returned task id will fail the check
Assert.assertEquals(IllegalStateException.class.getName(), e.getCause().getClass().getName());
}
ClientCompactionTaskQuery taskQuery = (ClientCompactionTaskQuery) captureTask.getValue();
Assert.assertEquals(Intervals.of("2015-04-12/2015-04-13"), taskQuery.getIoConfig().getInputSpec().getInterval());
Assert.assertNull(taskQuery.getGranularitySpec());
Assert.assertNull(taskQuery.getIoConfig().getInputSpec().getSha256OfSortedSegmentIds());
}
@Test
public void testCompactWithSegmentGranularity() throws Exception
{
DataSegment segment = new DataSegment(
"test",
Intervals.of("2015-04-12/2015-04-13"),
"1",
ImmutableMap.of("bucket", "bucket", "path", "test/2015-04-12T00:00:00.000Z_2015-04-13T00:00:00.000Z/1/0/index.zip"),
null,
null,
NoneShardSpec.instance(),
0,
1
);
Capture captureTask = EasyMock.newCapture();
HttpResponse response = EasyMock.createMock(HttpResponse.class);
EasyMock.expect(response.getStatus()).andReturn(HttpResponseStatus.OK).anyTimes();
EasyMock.expect(response.getContent()).andReturn(new BigEndianHeapChannelBuffer(0));
EasyMock.replay(response);
StringFullResponseHolder responseHolder = new StringFullResponseHolder(
response,
StandardCharsets.UTF_8
).addChunk(jsonMapper.writeValueAsString(ImmutableMap.of("task", "aaa")));
EasyMock.expect(druidLeaderClient.makeRequest(HttpMethod.POST, "/druid/indexer/v1/task"))
.andReturn(new Request(HttpMethod.POST, new URL("http://localhost:8090/druid/indexer/v1/task")))
.anyTimes();
EasyMock.expect(druidLeaderClient.go(EasyMock.anyObject(Request.class)))
.andReturn(responseHolder)
.anyTimes();
EasyMock.expect(mockMapper.writeValueAsBytes(EasyMock.capture(captureTask)))
.andReturn(new byte[]{1, 2, 3})
.anyTimes();
EasyMock.expect(mockMapper.readValue(EasyMock.anyString(), EasyMock.eq(JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT)))
.andReturn(ImmutableMap.of())
.anyTimes();
EasyMock.replay(druidLeaderClient, mockMapper);
HttpIndexingServiceClient httpIndexingServiceClient = new HttpIndexingServiceClient(
mockMapper,
druidLeaderClient
);
try {
httpIndexingServiceClient.compactSegments(
"test-compact",
ImmutableList.of(segment),
50,
null,
new ClientCompactionTaskGranularitySpec(Granularities.YEAR, null, null),
null,
null,
null,
null,
null
);
}
catch (Exception e) {
// Ignore IllegalStateException, as the taskId is internally generated and the returned task id will fail the check
Assert.assertEquals(IllegalStateException.class.getName(), e.getCause().getClass().getName());
}
ClientCompactionTaskQuery taskQuery = (ClientCompactionTaskQuery) captureTask.getValue();
Assert.assertEquals(Intervals.of("2015-01-01/2016-01-01"), taskQuery.getIoConfig().getInputSpec().getInterval());
Assert.assertEquals(Granularities.YEAR, taskQuery.getGranularitySpec().getSegmentGranularity());
Assert.assertNull(taskQuery.getIoConfig().getInputSpec().getSha256OfSortedSegmentIds());
}