CircularList round-robin iterator for the KillUnusedSegments duty (#16719)

* Round-robin iterator for datasources to kill.

Currently there's a fairness problem in the KillUnusedSegments duty
where the duty consistently selects the same set of datasources as discovered
from the metadata store or dynamic config params. This is a problem especially
when there are multiple unused. In a medium to large cluster, while we can increase
the task slots to increase the likelihood of broader coverage. This patch adds a simple
round-robin iterator to select datasources and has the following properties:

1. Starts with an initial random cursor position in an ordered list of candidates.
2. Consecutive {@code next()} iterations from {@link #getIterator()} are guaranteed to be deterministic
   unless the set of candidates change when {@link #updateCandidates(Set)} is called.
3. Guarantees that no duplicate candidates are returned in two consecutive {@code next()} iterations.

* Renames in RoundRobinIteratorTest.

* Address review comments.

1. Clarify javadocs on the ordered list. Also flesh out the details a bit more.
2. Rename the test hooks to make intent clearer and fix typo.
3. Add NotThreadSafe annotation.
4. Remove one potentially noisy log that's in the path of iteration.

* Add null check to input candidates.

* More commentary.

* Addres review feedback: downgrade some new info logs to debug; invert condition.

Remove redundant comments.
Remove rendundant variable tracking.

* CircularList adjustments.

* Updates to CircularList and cleanup RoundRobinInterator.

* One more case and add more tests.

* Make advanceCursor private for now.

* Review comments.
This commit is contained in:
Abhishek Radhakrishnan 2024-07-26 12:20:49 -07:00 committed by GitHub
parent 9b76d13ff8
commit 3c493dc3ed
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 477 additions and 37 deletions

View File

@ -0,0 +1,89 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.collections;
import javax.annotation.concurrent.NotThreadSafe;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Set;
/**
* A circular list that is backed by an ordered list of elements containing no duplicates. The list is ordered by the
* supplied comparator. The iterator keeps track of the current position, so iterating the list multiple times will
* resume from the last location and continue until a caller explicitly terminates it.
* <p>
* This class is not thread-safe and must be used from a single thread.
*/
@NotThreadSafe
public class CircularList<T> implements Iterable<T>
{
private final List<T> elements = new ArrayList<>();
private int currentPosition;
public CircularList(final Set<T> elements, final Comparator<? super T> comparator)
{
this.elements.addAll(elements);
this.elements.sort(comparator);
this.currentPosition = -1;
}
@Override
public Iterator<T> iterator()
{
return new Iterator<T>()
{
@Override
public boolean hasNext()
{
return elements.size() > 0;
}
@Override
public T next()
{
if (!hasNext()) {
throw new NoSuchElementException();
}
advanceCursor();
return elements.get(currentPosition);
}
private void advanceCursor()
{
if (++currentPosition >= elements.size()) {
currentPosition = 0;
}
}
};
}
/**
* @return true if the supplied set is equal to the set used to instantiate this circular list, otherwise false.
*/
public boolean equalsSet(final Set<T> inputSet)
{
return new HashSet<>(elements).equals(inputSet);
}
}

View File

@ -0,0 +1,135 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.collections;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import org.junit.Assert;
import org.junit.Test;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Set;
public class CircularListTest
{
@Test
public void testIterateInNaturalOrder()
{
final Set<String> input = ImmutableSet.of("b", "a", "c");
final CircularList<String> circularList = new CircularList<>(input, Comparator.naturalOrder());
final List<String> observedElements = new ArrayList<>();
int cnt = 0;
for (String x : circularList) {
observedElements.add(x);
if (++cnt >= input.size()) {
break;
}
}
Assert.assertEquals(ImmutableList.of("a", "b", "c"), observedElements);
}
@Test
public void testIterateInReverseOrder()
{
final Set<Integer> input = ImmutableSet.of(-1, 100, 0, -4);
final CircularList<Integer> circularList = new CircularList<>(input, Comparator.reverseOrder());
final List<Integer> observedElements = new ArrayList<>();
int cnt = 0;
for (Integer x : circularList) {
observedElements.add(x);
if (++cnt >= 2 * input.size()) {
break;
}
}
Assert.assertEquals(ImmutableList.of(100, 0, -1, -4, 100, 0, -1, -4), observedElements);
}
@Test
public void testIteratorResumesFromLastPosition()
{
final Set<String> input = ImmutableSet.of("a", "b", "c", "d", "e", "f");
final CircularList<String> circularList = new CircularList<>(input, Comparator.naturalOrder());
List<String> observedElements = new ArrayList<>();
int cnt = 0;
for (String element : circularList) {
observedElements.add(element);
if (++cnt >= input.size() / 2) {
break;
}
}
Assert.assertEquals(ImmutableList.of("a", "b", "c"), observedElements);
observedElements = new ArrayList<>();
for (String element : circularList) {
observedElements.add(element);
// Resume and go till the end, and add two more elements looping around
if (++cnt == input.size() + 2) {
break;
}
}
Assert.assertEquals(ImmutableList.of("d", "e", "f", "a", "b"), observedElements);
}
@Test
public void testEqualsSet()
{
final Set<String> input = ImmutableSet.of("a", "b", "c");
final CircularList<String> circularList = new CircularList<>(input, Comparator.naturalOrder());
Assert.assertTrue(circularList.equalsSet(ImmutableSet.of("b", "a", "c")));
Assert.assertFalse(circularList.equalsSet(ImmutableSet.of("c")));
Assert.assertFalse(circularList.equalsSet(ImmutableSet.of("a", "c")));
}
@Test
public void testEmptyIterator()
{
final Set<String> input = ImmutableSet.of();
final CircularList<String> circularList = new CircularList<>(input, Comparator.naturalOrder());
final List<String> observedElements = new ArrayList<>();
int cnt = 0;
for (String x : circularList) {
observedElements.add(x);
if (++cnt >= input.size()) {
break;
}
}
Assert.assertEquals(ImmutableList.of(), observedElements);
}
@Test
public void testNextOnEmptyIteratorThrowsException()
{
final Set<String> input = ImmutableSet.of();
final CircularList<String> circularList = new CircularList<>(input, Comparator.naturalOrder());
final Iterator<String> iterator = circularList.iterator();
Assert.assertFalse(iterator.hasNext());
Assert.assertThrows(NoSuchElementException.class, iterator::next);
}
}

View File

@ -201,7 +201,7 @@ public interface SegmentsMetadataManager
*/ */
List<Interval> getUnusedSegmentIntervals( List<Interval> getUnusedSegmentIntervals(
String dataSource, String dataSource,
DateTime minStartTime, @Nullable DateTime minStartTime,
DateTime maxEndTime, DateTime maxEndTime,
int limit, int limit,
DateTime maxUsedStatusLastUpdatedTime DateTime maxUsedStatusLastUpdatedTime

View File

@ -20,6 +20,8 @@
package org.apache.druid.server.coordinator.duty; package org.apache.druid.server.coordinator.duty;
import com.google.common.base.Predicate; import com.google.common.base.Predicate;
import com.google.common.collect.Sets;
import org.apache.druid.collections.CircularList;
import org.apache.druid.common.guava.FutureUtils; import org.apache.druid.common.guava.FutureUtils;
import org.apache.druid.indexer.TaskStatusPlus; import org.apache.druid.indexer.TaskStatusPlus;
import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.DateTimes;
@ -40,10 +42,11 @@ import org.joda.time.Duration;
import org.joda.time.Interval; import org.joda.time.Interval;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import java.util.ArrayList; import java.util.Comparator;
import java.util.Collection; import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
/** /**
@ -58,6 +61,11 @@ import java.util.concurrent.ConcurrentHashMap;
* as there can be multiple unused segments with different {@code used_status_last_updated} time. * as there can be multiple unused segments with different {@code used_status_last_updated} time.
* </p> * </p>
* <p> * <p>
* The datasources to be killed during each cycle are selected from {@link #datasourceCircularKillList}. This state is
* refreshed in a run if the set of datasources to be killed changes. Consecutive duplicate datasources are avoided
* across runs, provided there are other datasources to be killed.
* </p>
* <p>
* See {@link org.apache.druid.indexing.common.task.KillUnusedSegmentsTask}. * See {@link org.apache.druid.indexing.common.task.KillUnusedSegmentsTask}.
* </p> * </p>
*/ */
@ -75,18 +83,22 @@ public class KillUnusedSegments implements CoordinatorDuty
private final Duration durationToRetain; private final Duration durationToRetain;
private final boolean ignoreDurationToRetain; private final boolean ignoreDurationToRetain;
private final int maxSegmentsToKill; private final int maxSegmentsToKill;
private final Duration bufferPeriod;
/** /**
* Used to keep track of the last interval end time that was killed for each * Used to keep track of the last interval end time that was killed for each
* datasource. * datasource.
*/ */
private final Map<String, DateTime> datasourceToLastKillIntervalEnd; private final Map<String, DateTime> datasourceToLastKillIntervalEnd;
private DateTime lastKillTime; private DateTime lastKillTime;
private final Duration bufferPeriod;
private final SegmentsMetadataManager segmentsMetadataManager; private final SegmentsMetadataManager segmentsMetadataManager;
private final OverlordClient overlordClient; private final OverlordClient overlordClient;
private String prevDatasourceKilled;
private CircularList<String> datasourceCircularKillList;
public KillUnusedSegments( public KillUnusedSegments(
SegmentsMetadataManager segmentsMetadataManager, SegmentsMetadataManager segmentsMetadataManager,
OverlordClient overlordClient, OverlordClient overlordClient,
@ -94,7 +106,6 @@ public class KillUnusedSegments implements CoordinatorDuty
) )
{ {
this.period = killConfig.getCleanupPeriod(); this.period = killConfig.getCleanupPeriod();
this.maxSegmentsToKill = killConfig.getMaxSegments(); this.maxSegmentsToKill = killConfig.getMaxSegments();
this.ignoreDurationToRetain = killConfig.isIgnoreDurationToRetain(); this.ignoreDurationToRetain = killConfig.isIgnoreDurationToRetain();
this.durationToRetain = killConfig.getDurationToRetain(); this.durationToRetain = killConfig.getDurationToRetain();
@ -107,8 +118,6 @@ public class KillUnusedSegments implements CoordinatorDuty
} }
this.bufferPeriod = killConfig.getBufferPeriod(); this.bufferPeriod = killConfig.getBufferPeriod();
datasourceToLastKillIntervalEnd = new ConcurrentHashMap<>();
log.info( log.info(
"Kill task scheduling enabled with period[%s], durationToRetain[%s], bufferPeriod[%s], maxSegmentsToKill[%s]", "Kill task scheduling enabled with period[%s], durationToRetain[%s], bufferPeriod[%s], maxSegmentsToKill[%s]",
this.period, this.period,
@ -119,6 +128,7 @@ public class KillUnusedSegments implements CoordinatorDuty
this.segmentsMetadataManager = segmentsMetadataManager; this.segmentsMetadataManager = segmentsMetadataManager;
this.overlordClient = overlordClient; this.overlordClient = overlordClient;
this.datasourceToLastKillIntervalEnd = new ConcurrentHashMap<>();
} }
@Override @Override
@ -141,18 +151,27 @@ public class KillUnusedSegments implements CoordinatorDuty
final CoordinatorRunStats stats = params.getCoordinatorStats(); final CoordinatorRunStats stats = params.getCoordinatorStats();
final int availableKillTaskSlots = getAvailableKillTaskSlots(dynamicConfig, stats); final int availableKillTaskSlots = getAvailableKillTaskSlots(dynamicConfig, stats);
Collection<String> dataSourcesToKill = dynamicConfig.getSpecificDataSourcesToKillUnusedSegmentsIn(); if (availableKillTaskSlots <= 0) {
log.debug("Skipping KillUnusedSegments because there are no available kill task slots.");
if (availableKillTaskSlots > 0) { return params;
// If no datasource has been specified, all are eligible for killing unused segments
if (CollectionUtils.isNullOrEmpty(dataSourcesToKill)) {
dataSourcesToKill = segmentsMetadataManager.retrieveAllDataSourceNames();
}
lastKillTime = DateTimes.nowUtc();
killUnusedSegments(dataSourcesToKill, availableKillTaskSlots, stats);
} }
final Set<String> dataSourcesToKill;
if (CollectionUtils.isNullOrEmpty(dynamicConfig.getSpecificDataSourcesToKillUnusedSegmentsIn())) {
dataSourcesToKill = segmentsMetadataManager.retrieveAllDataSourceNames();
} else {
dataSourcesToKill = dynamicConfig.getSpecificDataSourcesToKillUnusedSegmentsIn();
}
if (datasourceCircularKillList == null ||
!datasourceCircularKillList.equalsSet(dataSourcesToKill)) {
datasourceCircularKillList = new CircularList<>(dataSourcesToKill, Comparator.naturalOrder());
}
lastKillTime = DateTimes.nowUtc();
killUnusedSegments(dataSourcesToKill, availableKillTaskSlots, stats);
// any datasources that are no longer being considered for kill should have their // any datasources that are no longer being considered for kill should have their
// last kill interval removed from map. // last kill interval removed from map.
datasourceToLastKillIntervalEnd.keySet().retainAll(dataSourcesToKill); datasourceToLastKillIntervalEnd.keySet().retainAll(dataSourcesToKill);
@ -163,30 +182,37 @@ public class KillUnusedSegments implements CoordinatorDuty
* Spawn kill tasks for each datasource in {@code dataSourcesToKill} upto {@code availableKillTaskSlots}. * Spawn kill tasks for each datasource in {@code dataSourcesToKill} upto {@code availableKillTaskSlots}.
*/ */
private void killUnusedSegments( private void killUnusedSegments(
@Nullable final Collection<String> dataSourcesToKill, final Set<String> dataSourcesToKill,
final int availableKillTaskSlots, final int availableKillTaskSlots,
final CoordinatorRunStats stats final CoordinatorRunStats stats
) )
{ {
if (CollectionUtils.isNullOrEmpty(dataSourcesToKill) || availableKillTaskSlots <= 0) { if (CollectionUtils.isNullOrEmpty(dataSourcesToKill)) {
log.debug("Skipping KillUnusedSegments because there are no datasources to kill.");
stats.add(Stats.Kill.SUBMITTED_TASKS, 0); stats.add(Stats.Kill.SUBMITTED_TASKS, 0);
return; return;
} }
final Collection<String> remainingDatasourcesToKill = new ArrayList<>(dataSourcesToKill); final Set<String> remainingDatasourcesToKill = new HashSet<>(dataSourcesToKill);
int submittedTasks = 0; int submittedTasks = 0;
for (String dataSource : dataSourcesToKill) { for (String dataSource : datasourceCircularKillList) {
if (submittedTasks >= availableKillTaskSlots) { if (dataSource.equals(prevDatasourceKilled) && remainingDatasourcesToKill.size() > 1) {
log.info( // Skip this dataSource if it's the same as the previous one and there are remaining datasources to kill.
"Submitted [%d] kill tasks and reached kill task slot limit [%d].", continue;
submittedTasks, availableKillTaskSlots } else {
); prevDatasourceKilled = dataSource;
break; remainingDatasourcesToKill.remove(dataSource);
} }
final DateTime maxUsedStatusLastUpdatedTime = DateTimes.nowUtc().minus(bufferPeriod); final DateTime maxUsedStatusLastUpdatedTime = DateTimes.nowUtc().minus(bufferPeriod);
final Interval intervalToKill = findIntervalForKill(dataSource, maxUsedStatusLastUpdatedTime, stats); final Interval intervalToKill = findIntervalForKill(dataSource, maxUsedStatusLastUpdatedTime, stats);
if (intervalToKill == null) { if (intervalToKill == null) {
datasourceToLastKillIntervalEnd.remove(dataSource); datasourceToLastKillIntervalEnd.remove(dataSource);
// If no interval is found for this datasource, either terminate or continue based on remaining datasources to kill.
if (remainingDatasourcesToKill.isEmpty()) {
break;
}
continue; continue;
} }
@ -204,7 +230,11 @@ public class KillUnusedSegments implements CoordinatorDuty
); );
++submittedTasks; ++submittedTasks;
datasourceToLastKillIntervalEnd.put(dataSource, intervalToKill.getEnd()); datasourceToLastKillIntervalEnd.put(dataSource, intervalToKill.getEnd());
remainingDatasourcesToKill.remove(dataSource);
// Termination conditions.
if (remainingDatasourcesToKill.isEmpty() || submittedTasks >= availableKillTaskSlots) {
break;
}
} }
catch (Exception ex) { catch (Exception ex) {
log.error(ex, "Failed to submit kill task for dataSource[%s] in interval[%s]", dataSource, intervalToKill); log.error(ex, "Failed to submit kill task for dataSource[%s] in interval[%s]", dataSource, intervalToKill);
@ -216,8 +246,12 @@ public class KillUnusedSegments implements CoordinatorDuty
} }
log.info( log.info(
"Submitted [%d] kill tasks for [%d] datasources. Remaining datasources to kill: %s", "Submitted [%d] kill tasks for [%d] datasources: [%s]. Remaining [%d] datasources to kill: [%s].",
submittedTasks, dataSourcesToKill.size() - remainingDatasourcesToKill.size(), remainingDatasourcesToKill submittedTasks,
dataSourcesToKill.size() - remainingDatasourcesToKill.size(),
Sets.difference(dataSourcesToKill, remainingDatasourcesToKill),
remainingDatasourcesToKill.size(),
remainingDatasourcesToKill
); );
stats.add(Stats.Kill.SUBMITTED_TASKS, submittedTasks); stats.add(Stats.Kill.SUBMITTED_TASKS, submittedTasks);
@ -230,13 +264,14 @@ public class KillUnusedSegments implements CoordinatorDuty
final CoordinatorRunStats stats final CoordinatorRunStats stats
) )
{ {
final DateTime minStartTime = datasourceToLastKillIntervalEnd.get(dataSource);
final DateTime maxEndTime = ignoreDurationToRetain final DateTime maxEndTime = ignoreDurationToRetain
? DateTimes.COMPARE_DATE_AS_STRING_MAX ? DateTimes.COMPARE_DATE_AS_STRING_MAX
: DateTimes.nowUtc().minus(durationToRetain); : DateTimes.nowUtc().minus(durationToRetain);
final List<Interval> unusedSegmentIntervals = segmentsMetadataManager.getUnusedSegmentIntervals( final List<Interval> unusedSegmentIntervals = segmentsMetadataManager.getUnusedSegmentIntervals(
dataSource, dataSource,
datasourceToLastKillIntervalEnd.get(dataSource), minStartTime,
maxEndTime, maxEndTime,
maxSegmentsToKill, maxSegmentsToKill,
maxUsedStatusLastUpdatedTime maxUsedStatusLastUpdatedTime

View File

@ -225,6 +225,151 @@ public class KillUnusedSegmentsTest
validateLastKillStateAndReset(DS2, null); validateLastKillStateAndReset(DS2, null);
} }
/**
* Set up multiple datasources {@link #DS1}, {@link #DS2} and {@link #DS3} with unused segments with 2 kill task
* slots. Running the kill duty each time should pick at least one unique datasource in a round-robin manner.
*/
@Test
public void testRoundRobinKillMultipleDatasources()
{
configBuilder.withIgnoreDurationToRetain(true)
.withMaxSegmentsToKill(2);
dynamicConfigBuilder.withMaxKillTaskSlots(2);
createAndAddUnusedSegment(DS1, YEAR_OLD, VERSION, NOW.minusDays(1));
createAndAddUnusedSegment(DS1, MONTH_OLD, VERSION, NOW.minusDays(1));
createAndAddUnusedSegment(DS1, DAY_OLD, VERSION, NOW.minusDays(1));
createAndAddUnusedSegment(DS1, NEXT_DAY, VERSION, NOW.minusDays(1));
createAndAddUnusedSegment(DS1, NEXT_MONTH, VERSION, NOW.minusDays(1));
createAndAddUnusedSegment(DS2, YEAR_OLD, VERSION, NOW.minusDays(1));
createAndAddUnusedSegment(DS2, DAY_OLD, VERSION, NOW.minusDays(1));
createAndAddUnusedSegment(DS2, NEXT_DAY, VERSION, NOW.minusDays(1));
createAndAddUnusedSegment(DS3, YEAR_OLD, VERSION, NOW.minusDays(1));
createAndAddUnusedSegment(DS3, DAY_OLD, VERSION, NOW.minusDays(1));
createAndAddUnusedSegment(DS3, NEXT_DAY, VERSION, NOW.minusDays(1));
initDuty();
CoordinatorRunStats stats = runDutyAndGetStats();
Assert.assertEquals(2, stats.get(Stats.Kill.AVAILABLE_SLOTS));
Assert.assertEquals(2, stats.get(Stats.Kill.SUBMITTED_TASKS));
Assert.assertEquals(2, stats.get(Stats.Kill.MAX_SLOTS));
Assert.assertEquals(2, stats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS1_STAT_KEY));
Assert.assertEquals(2, stats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS2_STAT_KEY));
stats = runDutyAndGetStats();
Assert.assertEquals(4, stats.get(Stats.Kill.AVAILABLE_SLOTS));
Assert.assertEquals(4, stats.get(Stats.Kill.SUBMITTED_TASKS));
Assert.assertEquals(4, stats.get(Stats.Kill.MAX_SLOTS));
Assert.assertEquals(2, stats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS3_STAT_KEY));
Assert.assertEquals(4, stats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS1_STAT_KEY));
stats = runDutyAndGetStats();
Assert.assertEquals(6, stats.get(Stats.Kill.AVAILABLE_SLOTS));
Assert.assertEquals(6, stats.get(Stats.Kill.SUBMITTED_TASKS));
Assert.assertEquals(6, stats.get(Stats.Kill.MAX_SLOTS));
Assert.assertEquals(3, stats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS2_STAT_KEY));
Assert.assertEquals(3, stats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS3_STAT_KEY));
stats = runDutyAndGetStats();
Assert.assertEquals(8, stats.get(Stats.Kill.AVAILABLE_SLOTS));
Assert.assertEquals(7, stats.get(Stats.Kill.SUBMITTED_TASKS));
Assert.assertEquals(8, stats.get(Stats.Kill.MAX_SLOTS));
Assert.assertEquals(5, stats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS1_STAT_KEY));
Assert.assertEquals(3, stats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS2_STAT_KEY));
}
/**
* The set of datasources to kill change in consecutive runs. The kill duty should avoid selecting two
* consecutive datasources across runs as long as there are other datasources to kill.
*/
@Test
public void testRoundRobinKillWhenDatasourcesChange()
{
configBuilder.withIgnoreDurationToRetain(true)
.withMaxSegmentsToKill(2);
dynamicConfigBuilder.withMaxKillTaskSlots(1);
createAndAddUnusedSegment(DS1, YEAR_OLD, VERSION, NOW.minusDays(1));
createAndAddUnusedSegment(DS1, MONTH_OLD, VERSION, NOW.minusDays(1));
createAndAddUnusedSegment(DS1, DAY_OLD, VERSION, NOW.minusDays(1));
initDuty();
CoordinatorRunStats stats = runDutyAndGetStats();
Assert.assertEquals(1, stats.get(Stats.Kill.AVAILABLE_SLOTS));
Assert.assertEquals(1, stats.get(Stats.Kill.SUBMITTED_TASKS));
Assert.assertEquals(1, stats.get(Stats.Kill.MAX_SLOTS));
Assert.assertEquals(2, stats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS1_STAT_KEY));
validateLastKillStateAndReset(DS1, new Interval(YEAR_OLD.getStart(), MONTH_OLD.getEnd()));
createAndAddUnusedSegment(DS2, YEAR_OLD, VERSION, NOW.minusDays(1));
createAndAddUnusedSegment(DS2, DAY_OLD, VERSION, NOW.minusDays(1));
createAndAddUnusedSegment(DS2, NEXT_DAY, VERSION, NOW.minusDays(1));
stats = runDutyAndGetStats();
Assert.assertEquals(2, stats.get(Stats.Kill.AVAILABLE_SLOTS));
Assert.assertEquals(2, stats.get(Stats.Kill.SUBMITTED_TASKS));
Assert.assertEquals(2, stats.get(Stats.Kill.MAX_SLOTS));
Assert.assertEquals(2, stats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS2_STAT_KEY));
stats = runDutyAndGetStats();
Assert.assertEquals(3, stats.get(Stats.Kill.AVAILABLE_SLOTS));
Assert.assertEquals(3, stats.get(Stats.Kill.SUBMITTED_TASKS));
Assert.assertEquals(3, stats.get(Stats.Kill.MAX_SLOTS));
Assert.assertEquals(3, stats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS1_STAT_KEY));
stats = runDutyAndGetStats();
Assert.assertEquals(4, stats.get(Stats.Kill.AVAILABLE_SLOTS));
Assert.assertEquals(4, stats.get(Stats.Kill.SUBMITTED_TASKS));
Assert.assertEquals(4, stats.get(Stats.Kill.MAX_SLOTS));
Assert.assertEquals(3, stats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS2_STAT_KEY));
}
@Test
public void testKillSingleDatasourceMultipleRuns()
{
configBuilder.withIgnoreDurationToRetain(true)
.withMaxSegmentsToKill(2);
dynamicConfigBuilder.withMaxKillTaskSlots(2);
createAndAddUnusedSegment(DS1, YEAR_OLD, VERSION, NOW.minusDays(1));
createAndAddUnusedSegment(DS1, MONTH_OLD, VERSION, NOW.minusDays(1));
createAndAddUnusedSegment(DS1, DAY_OLD, VERSION, NOW.minusDays(1));
initDuty();
CoordinatorRunStats stats = runDutyAndGetStats();
Assert.assertEquals(2, stats.get(Stats.Kill.AVAILABLE_SLOTS));
Assert.assertEquals(1, stats.get(Stats.Kill.SUBMITTED_TASKS));
Assert.assertEquals(2, stats.get(Stats.Kill.MAX_SLOTS));
Assert.assertEquals(2, stats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS1_STAT_KEY));
stats = runDutyAndGetStats();
Assert.assertEquals(4, stats.get(Stats.Kill.AVAILABLE_SLOTS));
Assert.assertEquals(2, stats.get(Stats.Kill.SUBMITTED_TASKS));
Assert.assertEquals(4, stats.get(Stats.Kill.MAX_SLOTS));
Assert.assertEquals(3, stats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS1_STAT_KEY));
stats = runDutyAndGetStats();
Assert.assertEquals(6, stats.get(Stats.Kill.AVAILABLE_SLOTS));
Assert.assertEquals(2, stats.get(Stats.Kill.SUBMITTED_TASKS));
Assert.assertEquals(6, stats.get(Stats.Kill.MAX_SLOTS));
Assert.assertEquals(3, stats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS1_STAT_KEY));
}
/** /**
* The {@code DAY_OLD} and {@code HOUR_OLD} segments are "more recent" in terms of last updated time. * The {@code DAY_OLD} and {@code HOUR_OLD} segments are "more recent" in terms of last updated time.
* Even though they fall within the umbrella kill interval computed by the duty, the kill task will narrow down to * Even though they fall within the umbrella kill interval computed by the duty, the kill task will narrow down to
@ -407,6 +552,36 @@ public class KillUnusedSegmentsTest
validateLastKillStateAndReset(DS1, YEAR_OLD); validateLastKillStateAndReset(DS1, YEAR_OLD);
} }
@Test
public void testKillDatasourceWithNoUnusedSegmentsInInitialRun()
{
configBuilder.withMaxSegmentsToKill(1);
// create a datasource but no unused segments yet.
createAndAddUsedSegment(DS1, YEAR_OLD, VERSION);
initDuty();
CoordinatorRunStats stats = runDutyAndGetStats();
Assert.assertEquals(10, stats.get(Stats.Kill.AVAILABLE_SLOTS));
Assert.assertEquals(0, stats.get(Stats.Kill.SUBMITTED_TASKS));
Assert.assertEquals(10, stats.get(Stats.Kill.MAX_SLOTS));
Assert.assertEquals(0, stats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS1_STAT_KEY));
createAndAddUnusedSegment(DS1, YEAR_OLD, VERSION, NOW.minusDays(10));
createAndAddUnusedSegment(DS1, MONTH_OLD, VERSION, NOW.minusDays(10));
createAndAddUnusedSegment(DS1, DAY_OLD, VERSION, NOW.minusDays(2));
stats = runDutyAndGetStats();
Assert.assertEquals(20, stats.get(Stats.Kill.AVAILABLE_SLOTS));
Assert.assertEquals(1, stats.get(Stats.Kill.SUBMITTED_TASKS));
Assert.assertEquals(20, stats.get(Stats.Kill.MAX_SLOTS));
Assert.assertEquals(1, stats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS1_STAT_KEY));
validateLastKillStateAndReset(DS1, YEAR_OLD);
}
/** /**
* The kill period is honored after the first indexing run. * The kill period is honored after the first indexing run.
*/ */
@ -723,12 +898,7 @@ public class KillUnusedSegmentsTest
overlordClient.deleteLastKillInterval(dataSource); overlordClient.deleteLastKillInterval(dataSource);
} }
private void createAndAddUnusedSegment( private DataSegment createAndAddUsedSegment(final String dataSource, final Interval interval, final String version)
final String dataSource,
final Interval interval,
final String version,
final DateTime lastUpdatedTime
)
{ {
final DataSegment segment = createSegment(dataSource, interval, version); final DataSegment segment = createSegment(dataSource, interval, version);
try { try {
@ -737,6 +907,17 @@ public class KillUnusedSegmentsTest
catch (IOException e) { catch (IOException e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
return segment;
}
private void createAndAddUnusedSegment(
final String dataSource,
final Interval interval,
final String version,
final DateTime lastUpdatedTime
)
{
final DataSegment segment = createAndAddUsedSegment(dataSource, interval, version);
sqlSegmentsMetadataManager.markSegmentsAsUnused(ImmutableSet.of(segment.getId())); sqlSegmentsMetadataManager.markSegmentsAsUnused(ImmutableSet.of(segment.getId()));
derbyConnectorRule.segments().updateUsedStatusLastUpdated(segment.getId().toString(), lastUpdatedTime); derbyConnectorRule.segments().updateUsedStatusLastUpdated(segment.getId().toString(), lastUpdatedTime);
} }