Remove config `druid.coordinator.compaction.skipLockedIntervals` (#14807)

The value of `druid.coordinator.compaction.skipLockedIntervals` should always be `true`.
This commit is contained in:
Kashif Faraz 2023-08-14 12:31:15 +05:30 committed by GitHub
parent 0dc305f9e4
commit 786e772d26
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 32 additions and 137 deletions

View File

@ -612,7 +612,7 @@ public class DruidCoordinator
{
List<CompactSegments> compactSegmentsDutyFromCustomGroups = getCompactSegmentsDutyFromCustomGroups();
if (compactSegmentsDutyFromCustomGroups.isEmpty()) {
return new CompactSegments(config, compactionSegmentSearchPolicy, overlordClient);
return new CompactSegments(compactionSegmentSearchPolicy, overlordClient);
} else {
if (compactSegmentsDutyFromCustomGroups.size() > 1) {
log.warn(

View File

@ -131,10 +131,4 @@ public abstract class DruidCoordinatorConfig
return 1;
}
@Config("druid.coordinator.compaction.skipLockedIntervals")
public boolean getCompactionSkipLockedIntervals()
{
return true;
}
}

View File

@ -48,7 +48,6 @@ import org.apache.druid.server.coordinator.AutoCompactionSnapshot;
import org.apache.druid.server.coordinator.CompactionStatistics;
import org.apache.druid.server.coordinator.CoordinatorCompactionConfig;
import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
import org.apache.druid.server.coordinator.DruidCoordinatorConfig;
import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
import org.apache.druid.server.coordinator.stats.Dimension;
@ -85,7 +84,6 @@ public class CompactSegments implements CoordinatorCustomDuty
status -> null != status && COMPACTION_TASK_TYPE.equals(status.getType());
private final CompactionSegmentSearchPolicy policy;
private final boolean skipLockedIntervals;
private final OverlordClient overlordClient;
// This variable is updated by the Coordinator thread executing duties and
@ -95,23 +93,13 @@ public class CompactSegments implements CoordinatorCustomDuty
@Inject
@JsonCreator
public CompactSegments(
@JacksonInject DruidCoordinatorConfig config,
@JacksonInject CompactionSegmentSearchPolicy policy,
@JacksonInject OverlordClient overlordClient
)
{
this.policy = policy;
this.overlordClient = overlordClient;
this.skipLockedIntervals = config.getCompactionSkipLockedIntervals();
resetCompactionSnapshot();
LOG.info("Scheduling compaction with skipLockedIntervals [%s]", skipLockedIntervals);
}
@VisibleForTesting
public boolean isSkipLockedIntervals()
{
return skipLockedIntervals;
}
@VisibleForTesting
@ -272,11 +260,6 @@ public class CompactSegments implements CoordinatorCustomDuty
List<DataSourceCompactionConfig> compactionConfigs
)
{
if (!skipLockedIntervals) {
LOG.info("Not skipping any locked interval for Compaction");
return new HashMap<>();
}
final Map<String, Integer> minTaskPriority = compactionConfigs
.stream()
.collect(

View File

@ -46,7 +46,6 @@ public class DruidCoordinatorConfigTest
Assert.assertEquals(7776000000L, config.getCoordinatorKillDurationToRetain().getMillis());
Assert.assertEquals(100, config.getCoordinatorKillMaxSegments());
Assert.assertEquals(new Duration(15 * 60 * 1000), config.getLoadTimeoutDelay());
Assert.assertTrue(config.getCompactionSkipLockedIntervals());
Assert.assertFalse(config.getCoordinatorKillIgnoreDurationToRetain());
Assert.assertEquals("http", config.getLoadQueuePeonType());
@ -62,7 +61,6 @@ public class DruidCoordinatorConfigTest
props.setProperty("druid.coordinator.kill.pendingSegments.on", "true");
props.setProperty("druid.coordinator.load.timeout", "PT1s");
props.setProperty("druid.coordinator.loadqueuepeon.repeatDelay", "PT0.100s");
props.setProperty("druid.coordinator.compaction.skipLockedIntervals", "false");
props.setProperty("druid.coordinator.kill.ignoreDurationToRetain", "true");
factory = Config.createFactory(props);
@ -75,7 +73,6 @@ public class DruidCoordinatorConfigTest
Assert.assertEquals(new Duration("PT1s"), config.getCoordinatorKillDurationToRetain());
Assert.assertEquals(10000, config.getCoordinatorKillMaxSegments());
Assert.assertEquals(new Duration("PT1s"), config.getLoadTimeoutDelay());
Assert.assertFalse(config.getCompactionSkipLockedIntervals());
Assert.assertTrue(config.getCoordinatorKillIgnoreDurationToRetain());
// Test negative druid.coordinator.kill.durationToRetain now that it is valid.

View File

@ -733,15 +733,11 @@ public class DruidCoordinatorTest extends CuratorTestBase
@Test
public void testInitializeCompactSegmentsDutyWhenCustomDutyGroupContainsCompactSegments()
{
DruidCoordinatorConfig differentConfigUsedInCustomGroup = new TestDruidCoordinatorConfig.Builder()
.withCoordinatorStartDelay(new Duration(COORDINATOR_START_DELAY))
.withCoordinatorPeriod(new Duration(COORDINATOR_PERIOD))
.withCoordinatorKillPeriod(new Duration(COORDINATOR_PERIOD))
.withCoordinatorKillMaxSegments(10)
.withCompactionSkippedLockedIntervals(false)
.withCoordinatorKillIgnoreDurationToRetain(false)
.build();
CoordinatorCustomDutyGroup compactSegmentCustomGroup = new CoordinatorCustomDutyGroup("group1", Duration.standardSeconds(1), ImmutableList.of(new CompactSegments(differentConfigUsedInCustomGroup, null, null)));
CoordinatorCustomDutyGroup compactSegmentCustomGroup = new CoordinatorCustomDutyGroup(
"group1",
Duration.standardSeconds(1),
ImmutableList.of(new CompactSegments(null, null))
);
CoordinatorCustomDutyGroups customDutyGroups = new CoordinatorCustomDutyGroups(ImmutableSet.of(compactSegmentCustomGroup));
coordinator = new DruidCoordinator(
druidCoordinatorConfig,
@ -777,9 +773,6 @@ public class DruidCoordinatorTest extends CuratorTestBase
// CompactSegments returned by this method should be from the Custom Duty Group
CompactSegments duty = coordinator.initializeCompactSegmentsDuty(newestSegmentFirstPolicy);
Assert.assertNotNull(duty);
Assert.assertNotEquals(druidCoordinatorConfig.getCompactionSkipLockedIntervals(), duty.isSkipLockedIntervals());
// We should get the CompactSegment from the custom duty group which was created with a different config than the config in DruidCoordinator
Assert.assertEquals(differentConfigUsedInCustomGroup.getCompactionSkipLockedIntervals(), duty.isSkipLockedIntervals());
}
@Test(timeout = 3000)

View File

@ -40,7 +40,6 @@ public class TestDruidCoordinatorConfig extends DruidCoordinatorConfig
private final Duration coordinatorDatasourceKillPeriod;
private final Duration coordinatorDatasourceKillDurationToRetain;
private final int coordinatorKillMaxSegments;
private final boolean compactionSkipLockedIntervals;
private final boolean coordinatorKillIgnoreDurationToRetain;
private final String loadQueuePeonType;
private final Duration httpLoadQueuePeonRepeatDelay;
@ -66,7 +65,6 @@ public class TestDruidCoordinatorConfig extends DruidCoordinatorConfig
Duration coordinatorDatasourceKillPeriod,
Duration coordinatorDatasourceKillDurationToRetain,
int coordinatorKillMaxSegments,
boolean compactionSkipLockedIntervals,
boolean coordinatorKillIgnoreDurationToRetain,
String loadQueuePeonType,
Duration httpLoadQueuePeonRepeatDelay,
@ -92,7 +90,6 @@ public class TestDruidCoordinatorConfig extends DruidCoordinatorConfig
this.coordinatorDatasourceKillPeriod = coordinatorDatasourceKillPeriod;
this.coordinatorDatasourceKillDurationToRetain = coordinatorDatasourceKillDurationToRetain;
this.coordinatorKillMaxSegments = coordinatorKillMaxSegments;
this.compactionSkipLockedIntervals = compactionSkipLockedIntervals;
this.coordinatorKillIgnoreDurationToRetain = coordinatorKillIgnoreDurationToRetain;
this.loadQueuePeonType = loadQueuePeonType;
this.httpLoadQueuePeonRepeatDelay = httpLoadQueuePeonRepeatDelay;
@ -203,12 +200,6 @@ public class TestDruidCoordinatorConfig extends DruidCoordinatorConfig
return loadTimeoutDelay == null ? super.getLoadTimeoutDelay() : loadTimeoutDelay;
}
@Override
public boolean getCompactionSkipLockedIntervals()
{
return compactionSkipLockedIntervals;
}
@Override
public boolean getCoordinatorKillIgnoreDurationToRetain()
{
@ -268,7 +259,6 @@ public class TestDruidCoordinatorConfig extends DruidCoordinatorConfig
private static final Duration DEFAULT_HTTP_LOAD_QUEUE_PEON_REPEAT_DELAY = Duration.millis(60000);
private static final Duration DEFAULT_HTTP_LOAD_QUEUE_PEON_HOST_TIMEOUT = Duration.millis(300000);
private static final int DEFAULT_HTTP_LOAD_QUEUE_PEON_BATCH_SIZE = 1;
private static final boolean DEFAULT_COMPACTION_SKIP_LOCKED_INTERVALS = true;
private static final Duration DEFAULT_COORDINATOR_AUDIT_KILL_PERIOD = new Duration("PT86400s");
private static final Duration DEFAULT_COORDINATOR_AUTIT_KILL_DURATION_TO_RETAIN = new Duration("PT7776000s");
@ -294,7 +284,6 @@ public class TestDruidCoordinatorConfig extends DruidCoordinatorConfig
private Integer curatorLoadQueuePeonNumCallbackThreads;
private Duration httpLoadQueuePeonHostTimeout;
private Integer httpLoadQueuePeonBatchSize;
private Boolean compactionSkippedLockedIntervals;
private Duration coordinatorAuditKillPeriod;
private Duration coordinatorAuditKillDurationToRetain;
@ -428,12 +417,6 @@ public class TestDruidCoordinatorConfig extends DruidCoordinatorConfig
return this;
}
public Builder withCompactionSkippedLockedIntervals(boolean compactionSkippedLockedIntervals)
{
this.compactionSkippedLockedIntervals = compactionSkippedLockedIntervals;
return this;
}
public Builder withCoordianatorAuditKillPeriod(Duration coordinatorAuditKillPeriod)
{
this.coordinatorAuditKillPeriod = coordinatorAuditKillPeriod;
@ -466,7 +449,6 @@ public class TestDruidCoordinatorConfig extends DruidCoordinatorConfig
coordinatorDatasourceKillPeriod == null ? DEFAULT_COORDINATOR_DATASOURCE_KILL_PERIOD : coordinatorDatasourceKillPeriod,
coordinatorDatasourceKillDurationToRetain == null ? DEFAULT_COORDINATOR_DATASOURCE_KILL_DURATION_TO_RETAIN : coordinatorDatasourceKillDurationToRetain,
coordinatorKillMaxSegments == null ? DEFAULT_COORDINATOR_KILL_MAX_SEGMENTS : coordinatorKillMaxSegments,
compactionSkippedLockedIntervals == null ? DEFAULT_COMPACTION_SKIP_LOCKED_INTERVALS : compactionSkippedLockedIntervals,
coordinatorKillIgnoreDurationToRetain == null ? DEFAULT_COORDINATOR_KILL_IGNORE_DURATION_TO_RETAIN : coordinatorKillIgnoreDurationToRetain,
loadQueuePeonType == null ? DEFAULT_LOAD_QUEUE_PEON_TYPE : loadQueuePeonType,
httpLoadQueuePeonRepeatDelay == null ? DEFAULT_HTTP_LOAD_QUEUE_PEON_REPEAT_DELAY : httpLoadQueuePeonRepeatDelay,

View File

@ -192,7 +192,6 @@ public class CompactSegmentsTest
}
}
dataSources = DataSourcesSnapshot.fromUsedSegments(allSegments, ImmutableMap.of());
Mockito.when(COORDINATOR_CONFIG.getCompactionSkipLockedIntervals()).thenReturn(true);
}
private DataSegment createSegment(String dataSource, int startDay, boolean beforeNoon, int partition)
@ -238,12 +237,11 @@ public class CompactSegmentsTest
.addValue(CompactionSegmentSearchPolicy.class, SEARCH_POLICY)
);
final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, SEARCH_POLICY, overlordClient);
final CompactSegments compactSegments = new CompactSegments(SEARCH_POLICY, overlordClient);
String compactSegmentString = JSON_MAPPER.writeValueAsString(compactSegments);
CompactSegments serdeCompactSegments = JSON_MAPPER.readValue(compactSegmentString, CompactSegments.class);
Assert.assertNotNull(serdeCompactSegments);
Assert.assertEquals(COORDINATOR_CONFIG.getCompactionSkipLockedIntervals(), serdeCompactSegments.isSkipLockedIntervals());
Assert.assertSame(overlordClient, serdeCompactSegments.getOverlordClient());
}
@ -251,7 +249,7 @@ public class CompactSegmentsTest
public void testRun()
{
final TestOverlordClient overlordClient = new TestOverlordClient(JSON_MAPPER);
final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, SEARCH_POLICY, overlordClient);
final CompactSegments compactSegments = new CompactSegments(SEARCH_POLICY, overlordClient);
final Supplier<String> expectedVersionSupplier = new Supplier<String>()
{
@ -327,7 +325,7 @@ public class CompactSegmentsTest
public void testMakeStats()
{
final TestOverlordClient overlordClient = new TestOverlordClient(JSON_MAPPER);
final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, SEARCH_POLICY, overlordClient);
final CompactSegments compactSegments = new CompactSegments(SEARCH_POLICY, overlordClient);
// Before any compaction, we do not have any snapshot of compactions
Map<String, AutoCompactionSnapshot> autoCompactionSnapshots = compactSegments.getAutoCompactionSnapshot();
@ -421,7 +419,7 @@ public class CompactSegmentsTest
dataSources = DataSourcesSnapshot.fromUsedSegments(segments, ImmutableMap.of());
final TestOverlordClient overlordClient = new TestOverlordClient(JSON_MAPPER);
final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, SEARCH_POLICY, overlordClient);
final CompactSegments compactSegments = new CompactSegments(SEARCH_POLICY, overlordClient);
// Before any compaction, we do not have any snapshot of compactions
Map<String, AutoCompactionSnapshot> autoCompactionSnapshots = compactSegments.getAutoCompactionSnapshot();
@ -483,7 +481,7 @@ public class CompactSegmentsTest
public void testMakeStatsWithDeactivatedDatasource()
{
final TestOverlordClient overlordClient = new TestOverlordClient(JSON_MAPPER);
final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, SEARCH_POLICY, overlordClient);
final CompactSegments compactSegments = new CompactSegments(SEARCH_POLICY, overlordClient);
// Before any compaction, we do not have any snapshot of compactions
Map<String, AutoCompactionSnapshot> autoCompactionSnapshots = compactSegments.getAutoCompactionSnapshot();
@ -575,7 +573,7 @@ public class CompactSegmentsTest
dataSources = DataSourcesSnapshot.fromUsedSegments(segments, ImmutableMap.of());
final TestOverlordClient overlordClient = new TestOverlordClient(JSON_MAPPER);
final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, SEARCH_POLICY, overlordClient);
final CompactSegments compactSegments = new CompactSegments(SEARCH_POLICY, overlordClient);
// Before any compaction, we do not have any snapshot of compactions
Map<String, AutoCompactionSnapshot> autoCompactionSnapshots = compactSegments.getAutoCompactionSnapshot();
@ -634,7 +632,7 @@ public class CompactSegmentsTest
public void testRunMultipleCompactionTaskSlots()
{
final TestOverlordClient overlordClient = new TestOverlordClient(JSON_MAPPER);
final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, SEARCH_POLICY, overlordClient);
final CompactSegments compactSegments = new CompactSegments(SEARCH_POLICY, overlordClient);
final CoordinatorRunStats stats = doCompactSegments(compactSegments, 3);
Assert.assertEquals(3, stats.get(Stats.Compaction.AVAILABLE_SLOTS));
@ -648,7 +646,7 @@ public class CompactSegmentsTest
int maxCompactionSlot = 3;
Assert.assertTrue(maxCompactionSlot < MAXIMUM_CAPACITY_WITH_AUTO_SCALE);
final TestOverlordClient overlordClient = new TestOverlordClient(JSON_MAPPER);
final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, SEARCH_POLICY, overlordClient);
final CompactSegments compactSegments = new CompactSegments(SEARCH_POLICY, overlordClient);
final CoordinatorRunStats stats =
doCompactSegments(compactSegments, createCompactionConfigs(), maxCompactionSlot, true);
Assert.assertEquals(maxCompactionSlot, stats.get(Stats.Compaction.AVAILABLE_SLOTS));
@ -662,7 +660,7 @@ public class CompactSegmentsTest
int maxCompactionSlot = 100;
Assert.assertFalse(maxCompactionSlot < MAXIMUM_CAPACITY_WITH_AUTO_SCALE);
final TestOverlordClient overlordClient = new TestOverlordClient(JSON_MAPPER);
final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, SEARCH_POLICY, overlordClient);
final CompactSegments compactSegments = new CompactSegments(SEARCH_POLICY, overlordClient);
final CoordinatorRunStats stats =
doCompactSegments(compactSegments, createCompactionConfigs(), maxCompactionSlot, true);
Assert.assertEquals(MAXIMUM_CAPACITY_WITH_AUTO_SCALE, stats.get(Stats.Compaction.AVAILABLE_SLOTS));
@ -675,7 +673,7 @@ public class CompactSegmentsTest
{
final OverlordClient mockClient = Mockito.mock(OverlordClient.class);
final ArgumentCaptor<Object> payloadCaptor = setUpMockClient(mockClient);
final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, SEARCH_POLICY, mockClient);
final CompactSegments compactSegments = new CompactSegments(SEARCH_POLICY, mockClient);
final List<DataSourceCompactionConfig> compactionConfigs = new ArrayList<>();
final String dataSource = DATA_SOURCE_PREFIX + 0;
compactionConfigs.add(
@ -732,7 +730,7 @@ public class CompactSegmentsTest
{
final OverlordClient mockClient = Mockito.mock(OverlordClient.class);
final ArgumentCaptor<Object> payloadCaptor = setUpMockClient(mockClient);
final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, SEARCH_POLICY, mockClient);
final CompactSegments compactSegments = new CompactSegments(SEARCH_POLICY, mockClient);
final List<DataSourceCompactionConfig> compactionConfigs = new ArrayList<>();
final String dataSource = DATA_SOURCE_PREFIX + 0;
compactionConfigs.add(
@ -781,7 +779,7 @@ public class CompactSegmentsTest
{
final OverlordClient mockClient = Mockito.mock(OverlordClient.class);
final ArgumentCaptor<Object> payloadCaptor = setUpMockClient(mockClient);
final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, SEARCH_POLICY, mockClient);
final CompactSegments compactSegments = new CompactSegments(SEARCH_POLICY, mockClient);
final List<DataSourceCompactionConfig> compactionConfigs = new ArrayList<>();
final String dataSource = DATA_SOURCE_PREFIX + 0;
compactionConfigs.add(
@ -830,7 +828,7 @@ public class CompactSegmentsTest
{
final OverlordClient mockClient = Mockito.mock(OverlordClient.class);
final ArgumentCaptor<Object> payloadCaptor = setUpMockClient(mockClient);
final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, SEARCH_POLICY, mockClient);
final CompactSegments compactSegments = new CompactSegments(SEARCH_POLICY, mockClient);
final List<DataSourceCompactionConfig> compactionConfigs = new ArrayList<>();
final String dataSource = DATA_SOURCE_PREFIX + 0;
compactionConfigs.add(
@ -890,7 +888,7 @@ public class CompactSegmentsTest
{
final OverlordClient mockClient = Mockito.mock(OverlordClient.class);
final ArgumentCaptor<Object> payloadCaptor = setUpMockClient(mockClient);
final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, SEARCH_POLICY, mockClient);
final CompactSegments compactSegments = new CompactSegments(SEARCH_POLICY, mockClient);
final List<DataSourceCompactionConfig> compactionConfigs = new ArrayList<>();
final String dataSource = DATA_SOURCE_PREFIX + 0;
compactionConfigs.add(
@ -942,7 +940,7 @@ public class CompactSegmentsTest
{
final OverlordClient mockClient = Mockito.mock(OverlordClient.class);
final ArgumentCaptor<Object> payloadCaptor = setUpMockClient(mockClient);
final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, SEARCH_POLICY, mockClient);
final CompactSegments compactSegments = new CompactSegments(SEARCH_POLICY, mockClient);
final List<DataSourceCompactionConfig> compactionConfigs = new ArrayList<>();
final String dataSource = DATA_SOURCE_PREFIX + 0;
compactionConfigs.add(
@ -991,7 +989,7 @@ public class CompactSegmentsTest
{
final OverlordClient mockClient = Mockito.mock(OverlordClient.class);
final ArgumentCaptor<Object> payloadCaptor = setUpMockClient(mockClient);
final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, SEARCH_POLICY, mockClient);
final CompactSegments compactSegments = new CompactSegments(SEARCH_POLICY, mockClient);
final List<DataSourceCompactionConfig> compactionConfigs = new ArrayList<>();
final String dataSource = DATA_SOURCE_PREFIX + 0;
compactionConfigs.add(
@ -1101,7 +1099,7 @@ public class CompactSegmentsTest
Mockito.when(mockClient.taskPayload(ArgumentMatchers.eq(conflictTaskId)))
.thenReturn(Futures.immediateFuture(runningConflictCompactionTaskPayload));
final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, SEARCH_POLICY, mockClient);
final CompactSegments compactSegments = new CompactSegments(SEARCH_POLICY, mockClient);
final List<DataSourceCompactionConfig> compactionConfigs = new ArrayList<>();
compactionConfigs.add(
new DataSourceCompactionConfig(
@ -1163,7 +1161,7 @@ public class CompactSegmentsTest
public void testRunParallelCompactionMultipleCompactionTaskSlots()
{
final TestOverlordClient overlordClient = new TestOverlordClient(JSON_MAPPER);
final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, SEARCH_POLICY, overlordClient);
final CompactSegments compactSegments = new CompactSegments(SEARCH_POLICY, overlordClient);
final CoordinatorRunStats stats = doCompactSegments(compactSegments, createCompactionConfigs(2), 4);
Assert.assertEquals(4, stats.get(Stats.Compaction.AVAILABLE_SLOTS));
@ -1195,7 +1193,7 @@ public class CompactSegmentsTest
// Verify that locked intervals are skipped and only one compaction task
// is submitted for dataSource_0
CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, SEARCH_POLICY, overlordClient);
CompactSegments compactSegments = new CompactSegments(SEARCH_POLICY, overlordClient);
final CoordinatorRunStats stats =
doCompactSegments(compactSegments, createCompactionConfigs(2), 4);
Assert.assertEquals(1, stats.get(Stats.Compaction.SUBMITTED_TASKS));
@ -1215,7 +1213,7 @@ public class CompactSegmentsTest
NullHandling.initializeForTests();
final OverlordClient mockClient = Mockito.mock(OverlordClient.class);
final ArgumentCaptor<Object> payloadCaptor = setUpMockClient(mockClient);
final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, SEARCH_POLICY, mockClient);
final CompactSegments compactSegments = new CompactSegments(SEARCH_POLICY, mockClient);
final List<DataSourceCompactionConfig> compactionConfigs = new ArrayList<>();
final String dataSource = DATA_SOURCE_PREFIX + 0;
compactionConfigs.add(
@ -1265,7 +1263,7 @@ public class CompactSegmentsTest
{
final OverlordClient mockClient = Mockito.mock(OverlordClient.class);
final ArgumentCaptor<Object> payloadCaptor = setUpMockClient(mockClient);
final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, SEARCH_POLICY, mockClient);
final CompactSegments compactSegments = new CompactSegments(SEARCH_POLICY, mockClient);
final List<DataSourceCompactionConfig> compactionConfigs = new ArrayList<>();
final String dataSource = DATA_SOURCE_PREFIX + 0;
compactionConfigs.add(
@ -1317,7 +1315,7 @@ public class CompactSegmentsTest
AggregatorFactory[] aggregatorFactories = new AggregatorFactory[] {new CountAggregatorFactory("cnt")};
final OverlordClient mockClient = Mockito.mock(OverlordClient.class);
final ArgumentCaptor<Object> payloadCaptor = setUpMockClient(mockClient);
final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, SEARCH_POLICY, mockClient);
final CompactSegments compactSegments = new CompactSegments(SEARCH_POLICY, mockClient);
final List<DataSourceCompactionConfig> compactionConfigs = new ArrayList<>();
final String dataSource = DATA_SOURCE_PREFIX + 0;
compactionConfigs.add(
@ -1363,58 +1361,6 @@ public class CompactSegmentsTest
Assert.assertArrayEquals(aggregatorFactories, actual);
}
@Test
public void testRunWithLockedIntervalsNoSkip()
{
Mockito.when(COORDINATOR_CONFIG.getCompactionSkipLockedIntervals()).thenReturn(false);
final TestOverlordClient overlordClient = new TestOverlordClient(JSON_MAPPER);
// Lock all intervals for all the dataSources
final String datasource0 = DATA_SOURCE_PREFIX + 0;
overlordClient.lockedIntervals
.computeIfAbsent(datasource0, k -> new ArrayList<>())
.add(Intervals.of("2017/2018"));
final String datasource1 = DATA_SOURCE_PREFIX + 1;
overlordClient.lockedIntervals
.computeIfAbsent(datasource1, k -> new ArrayList<>())
.add(Intervals.of("2017/2018"));
final String datasource2 = DATA_SOURCE_PREFIX + 2;
overlordClient.lockedIntervals
.computeIfAbsent(datasource2, k -> new ArrayList<>())
.add(Intervals.of("2017/2018"));
// Verify that no locked intervals are skipped
CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, SEARCH_POLICY, overlordClient);
int maxTaskSlots = partitionsSpec instanceof SingleDimensionPartitionsSpec ? 5 : 3;
final CoordinatorRunStats stats = doCompactSegments(compactSegments, createCompactionConfigs(1), maxTaskSlots);
Assert.assertEquals(3, stats.get(Stats.Compaction.SUBMITTED_TASKS));
Assert.assertEquals(3, overlordClient.submittedCompactionTasks.size());
overlordClient.submittedCompactionTasks.forEach(task -> {
System.out.println(task.getDataSource() + " : " + task.getIoConfig().getInputSpec().getInterval());
});
// Verify that tasks are submitted for the latest interval of each dataSource
final Map<String, Interval> datasourceToInterval = new HashMap<>();
overlordClient.submittedCompactionTasks.forEach(
task -> datasourceToInterval.put(
task.getDataSource(), task.getIoConfig().getInputSpec().getInterval()));
Assert.assertEquals(
Intervals.of("2017-01-09T00:00:00Z/2017-01-09T12:00:00Z"),
datasourceToInterval.get(datasource0)
);
Assert.assertEquals(
Intervals.of("2017-01-09T00:00:00Z/2017-01-09T12:00:00Z"),
datasourceToInterval.get(datasource1)
);
Assert.assertEquals(
Intervals.of("2017-01-09T00:00:00Z/2017-01-09T12:00:00Z"),
datasourceToInterval.get(datasource2)
);
}
@Test
public void testDetermineSegmentGranularityFromSegmentsToCompact()
{
@ -1450,7 +1396,7 @@ public class CompactSegmentsTest
final OverlordClient mockClient = Mockito.mock(OverlordClient.class);
final ArgumentCaptor<Object> payloadCaptor = setUpMockClient(mockClient);
final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, SEARCH_POLICY, mockClient);
final CompactSegments compactSegments = new CompactSegments(SEARCH_POLICY, mockClient);
final List<DataSourceCompactionConfig> compactionConfigs = new ArrayList<>();
compactionConfigs.add(
new DataSourceCompactionConfig(
@ -1536,7 +1482,7 @@ public class CompactSegmentsTest
final OverlordClient mockClient = Mockito.mock(OverlordClient.class);
final ArgumentCaptor<Object> payloadCaptor = setUpMockClient(mockClient);
final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, SEARCH_POLICY, mockClient);
final CompactSegments compactSegments = new CompactSegments(SEARCH_POLICY, mockClient);
final List<DataSourceCompactionConfig> compactionConfigs = new ArrayList<>();
compactionConfigs.add(
new DataSourceCompactionConfig(
@ -1592,7 +1538,7 @@ public class CompactSegmentsTest
{
final OverlordClient mockClient = Mockito.mock(OverlordClient.class);
final ArgumentCaptor<Object> payloadCaptor = setUpMockClient(mockClient);
final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, SEARCH_POLICY, mockClient);
final CompactSegments compactSegments = new CompactSegments(SEARCH_POLICY, mockClient);
final List<DataSourceCompactionConfig> compactionConfigs = new ArrayList<>();
final String dataSource = DATA_SOURCE_PREFIX + 0;
compactionConfigs.add(
@ -1644,7 +1590,7 @@ public class CompactSegmentsTest
{
final OverlordClient mockClient = Mockito.mock(OverlordClient.class);
final ArgumentCaptor<Object> payloadCaptor = setUpMockClient(mockClient);
final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, SEARCH_POLICY, mockClient);
final CompactSegments compactSegments = new CompactSegments(SEARCH_POLICY, mockClient);
final List<DataSourceCompactionConfig> compactionConfigs = new ArrayList<>();
final String dataSource = DATA_SOURCE_PREFIX + 0;
compactionConfigs.add(