Make batched segment sampling the default, minor cleanup of coordinator config (#13391)

Batched segment sampling performs significantly better than the older sampling method
when there is a large number of used segments. It also avoids picking duplicate segments.

Changes:
- Make batched segment sampling the default
- Deprecate the property `useBatchedSegmentSampler`
- Remove unused coordinator config `druid.coordinator.loadqueuepeon.repeatDelay`
- Cleanup `KillUnusedSegments`
- Simplify `KillUnusedSegmentsTest`, add better tests, remove redundant tests
Kashif Faraz 2022-11-21 20:31:46 +05:30 committed by GitHub
parent bfffbabb56
commit 133054bf27
17 changed files with 433 additions and 519 deletions
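
As a sketch of the operational impact (editorial, not part of the commit): a cluster that needs the old behavior can still opt out through the deprecated flag in the coordinator dynamic config. The builder methods below are the ones touched in this diff; the values are arbitrary.

// Opt back into the old single-segment sampler via the deprecated flag.
CoordinatorDynamicConfig dynamicConfig = CoordinatorDynamicConfig
    .builder()
    .withMaxSegmentsToMove(100)
    .withUseBatchedSegmentSampler(false)        // deprecated; now defaults to true
    .withPercentOfSegmentsToConsiderPerMove(40) // consulted only by the old sampler
    .build();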

View File

@ -105,9 +105,7 @@ public interface BalancerStrategy
}
/**
* Pick the best segments to move from one of the supplied set of servers according to the balancing strategy. This
* is the deprecated way of picking a segment to move. pickSegmentsToMove(List<ServerHoler>, Set<String>, int) uses
* a more performant bathced sampling method that will become the default picking mode in the future.
* Pick the best segments to move from one of the supplied set of servers according to the balancing strategy.
*
* @param serverHolders set of historicals to consider for moving segments
* @param broadcastDatasources Datasources that contain segments which were loaded via broadcast rules.
@ -122,6 +120,10 @@ public interface BalancerStrategy
* for implementations of this method.
* @return Iterator for set of {@link BalancerSegmentHolder} containing segment to move and server they currently
* reside on, or empty if there are no segments to pick from (i. e. all provided serverHolders are empty).
*
* @deprecated Use {@link #pickSegmentsToMove(List, Set, int)} instead as it is
* a much more performant sampling method which does not allow duplicates. This
* method will be removed in future releases.
*/
@Deprecated
default Iterator<BalancerSegmentHolder> pickSegmentsToMove(
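
For context, a minimal sketch of the batched sampling that the deprecation note points to: one reservoir-sampling pass (Algorithm R) over every served segment picks up to k segments uniformly at random, and since each segment is visited exactly once, the sample cannot contain duplicates. This is illustrative only, not the actual ReservoirSegmentSampler code; it assumes java.util.concurrent.ThreadLocalRandom and the ImmutableDruidServer#iterateAllSegments accessor.

// Single pass over all served segments; the reservoir holds at most k entries.
List<BalancerSegmentHolder> reservoir = new ArrayList<>();
int seen = 0;
for (ServerHolder server : serverHolders) {
  for (DataSegment segment : server.getServer().iterateAllSegments()) {
    if (reservoir.size() < k) {
      reservoir.add(new BalancerSegmentHolder(server.getServer(), segment));
    } else {
      // The current segment replaces a random reservoir entry with probability k / (seen + 1).
      int j = ThreadLocalRandom.current().nextInt(seen + 1);
      if (j < k) {
        reservoir.set(j, new BalancerSegmentHolder(server.getServer(), segment));
      }
    }
    seen++;
  }
}

The deprecated path instead drew one segment per full pass over the segment set, so repeated draws could return the same segment.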

View File

@ -56,6 +56,7 @@ public class CoordinatorDynamicConfig
private final int maxSegmentsToMove;
@Deprecated
private final double percentOfSegmentsToConsiderPerMove;
@Deprecated
private final boolean useBatchedSegmentSampler;
private final int replicantLifetime;
private final int replicationThrottleLimit;
@ -115,7 +116,7 @@ public class CoordinatorDynamicConfig
@JsonProperty("mergeSegmentsLimit") int mergeSegmentsLimit,
@JsonProperty("maxSegmentsToMove") int maxSegmentsToMove,
@Deprecated @JsonProperty("percentOfSegmentsToConsiderPerMove") @Nullable Double percentOfSegmentsToConsiderPerMove,
@JsonProperty("useBatchedSegmentSampler") boolean useBatchedSegmentSampler,
@Deprecated @JsonProperty("useBatchedSegmentSampler") Boolean useBatchedSegmentSampler,
@JsonProperty("replicantLifetime") int replicantLifetime,
@JsonProperty("replicationThrottleLimit") int replicationThrottleLimit,
@JsonProperty("balancerComputeThreads") int balancerComputeThreads,
@ -161,7 +162,12 @@ public class CoordinatorDynamicConfig
);
this.percentOfSegmentsToConsiderPerMove = percentOfSegmentsToConsiderPerMove;
this.useBatchedSegmentSampler = useBatchedSegmentSampler;
if (useBatchedSegmentSampler == null) {
this.useBatchedSegmentSampler = Builder.DEFAULT_USE_BATCHED_SEGMENT_SAMPLER;
} else {
this.useBatchedSegmentSampler = useBatchedSegmentSampler;
}
this.replicantLifetime = replicantLifetime;
this.replicationThrottleLimit = replicationThrottleLimit;
this.balancerComputeThreads = Math.max(balancerComputeThreads, 1);
@ -276,6 +282,7 @@ public class CoordinatorDynamicConfig
return percentOfSegmentsToConsiderPerMove;
}
@Deprecated
@JsonProperty
public boolean useBatchedSegmentSampler()
{
@ -517,7 +524,7 @@ public class CoordinatorDynamicConfig
private static final int DEFAULT_REPLICATION_THROTTLE_LIMIT = 10;
private static final int DEFAULT_BALANCER_COMPUTE_THREADS = 1;
private static final boolean DEFAULT_EMIT_BALANCING_STATS = false;
private static final boolean DEFAULT_USE_BATCHED_SEGMENT_SAMPLER = false;
private static final boolean DEFAULT_USE_BATCHED_SEGMENT_SAMPLER = true;
private static final int DEFAULT_MAX_SEGMENTS_IN_NODE_LOADING_QUEUE = 100;
private static final int DEFAULT_DECOMMISSIONING_MAX_SEGMENTS_TO_MOVE_PERCENT = 70;
private static final boolean DEFAULT_PAUSE_COORDINATION = false;
@ -557,7 +564,7 @@ public class CoordinatorDynamicConfig
@JsonProperty("mergeSegmentsLimit") @Nullable Integer mergeSegmentsLimit,
@JsonProperty("maxSegmentsToMove") @Nullable Integer maxSegmentsToMove,
@Deprecated @JsonProperty("percentOfSegmentsToConsiderPerMove") @Nullable Double percentOfSegmentsToConsiderPerMove,
@JsonProperty("useBatchedSegmentSampler") Boolean useBatchedSegmentSampler,
@Deprecated @JsonProperty("useBatchedSegmentSampler") Boolean useBatchedSegmentSampler,
@JsonProperty("replicantLifetime") @Nullable Integer replicantLifetime,
@JsonProperty("replicationThrottleLimit") @Nullable Integer replicationThrottleLimit,
@JsonProperty("balancerComputeThreads") @Nullable Integer balancerComputeThreads,
@ -627,6 +634,7 @@ public class CoordinatorDynamicConfig
return this;
}
@Deprecated
public Builder withUseBatchedSegmentSampler(boolean useBatchedSegmentSampler)
{
this.useBatchedSegmentSampler = useBatchedSegmentSampler;

View File

@ -101,12 +101,6 @@ public abstract class DruidCoordinatorConfig
return new Duration(15 * 60 * 1000);
}
@Config("druid.coordinator.loadqueuepeon.repeatDelay")
public Duration getLoadQueuePeonRepeatDelay()
{
return Duration.millis(50);
}
@Config("druid.coordinator.loadqueuepeon.type")
public String getLoadQueuePeonType()
{

View File

@ -19,7 +19,6 @@
package org.apache.druid.server.coordinator.duty;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.inject.Inject;
import org.apache.druid.client.indexing.IndexingServiceClient;
@ -33,7 +32,6 @@ import org.apache.druid.utils.CollectionUtils;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.util.Collection;
import java.util.List;
@ -43,7 +41,7 @@ import java.util.List;
* negative meaning the interval end target will be in the future. Also, retainDuration can be ignored,
* meaning that there is no upper bound to the end interval of segments that will be killed. This action is called
* "to kill a segment".
*
* <p>
* See org.apache.druid.indexing.common.task.KillUnusedSegmentsTask.
*/
public class KillUnusedSegments implements CoordinatorDuty
@ -102,74 +100,69 @@ public class KillUnusedSegments implements CoordinatorDuty
Collection<String> dataSourcesToKill =
params.getCoordinatorDynamicConfig().getSpecificDataSourcesToKillUnusedSegmentsIn();
// If no datasource has been specified, all are eligible for killing unused segments
if (CollectionUtils.isNullOrEmpty(dataSourcesToKill)) {
dataSourcesToKill = segmentsMetadataManager.retrieveAllDataSourceNames();
}
if (dataSourcesToKill != null &&
dataSourcesToKill.size() > 0 &&
(lastKillTime + period) < System.currentTimeMillis()) {
lastKillTime = System.currentTimeMillis();
for (String dataSource : dataSourcesToKill) {
final Interval intervalToKill = findIntervalForKill(dataSource, maxSegmentsToKill);
if (intervalToKill != null) {
try {
indexingServiceClient.killUnusedSegments("coordinator-issued", dataSource, intervalToKill);
}
catch (Exception ex) {
log.error(ex, "Failed to submit kill task for dataSource [%s]", dataSource);
if (Thread.currentThread().isInterrupted()) {
log.warn("skipping kill task scheduling because thread is interrupted.");
break;
}
}
}
}
final long currentTimeMillis = System.currentTimeMillis();
if (CollectionUtils.isNullOrEmpty(dataSourcesToKill)) {
log.debug("No eligible datasource to kill unused segments.");
} else if (lastKillTime + period > currentTimeMillis) {
log.debug("Skipping kill of unused segments as kill period has not elapsed yet.");
} else {
log.debug("Killing unused segments in datasources: %s", dataSourcesToKill);
lastKillTime = currentTimeMillis;
killUnusedSegments(dataSourcesToKill);
}
return params;
}
/**
* For a given datasource and limit of segments that can be killed in one task, determine the interval to be
* submitted with the kill task.
*
* @param dataSource dataSource whose unused segments are being killed.
* @param limit the maximum number of segments that can be included in the kill task.
* @return {@link Interval} to be used in the kill task.
*/
@VisibleForTesting
@Nullable
Interval findIntervalForKill(String dataSource, int limit)
private void killUnusedSegments(Collection<String> dataSourcesToKill)
{
List<Interval> unusedSegmentIntervals =
segmentsMetadataManager.getUnusedSegmentIntervals(dataSource, getEndTimeUpperLimit(), limit);
int submittedTasks = 0;
for (String dataSource : dataSourcesToKill) {
final Interval intervalToKill = findIntervalForKill(dataSource);
if (intervalToKill == null) {
continue;
}
if (unusedSegmentIntervals != null && unusedSegmentIntervals.size() > 0) {
return JodaUtils.umbrellaInterval(unusedSegmentIntervals);
} else {
try {
indexingServiceClient.killUnusedSegments("coordinator-issued", dataSource, intervalToKill);
++submittedTasks;
}
catch (Exception ex) {
log.error(ex, "Failed to submit kill task for dataSource [%s]", dataSource);
if (Thread.currentThread().isInterrupted()) {
log.warn("skipping kill task scheduling because thread is interrupted.");
break;
}
}
}
log.debug("Submitted kill tasks for [%d] datasources.", submittedTasks);
}
/**
* Calculates the interval for which segments are to be killed in a datasource.
*/
private Interval findIntervalForKill(String dataSource)
{
final DateTime maxEndTime = ignoreRetainDuration
? DateTimes.COMPARE_DATE_AS_STRING_MAX
: DateTimes.nowUtc().minus(retainDuration);
List<Interval> unusedSegmentIntervals = segmentsMetadataManager
.getUnusedSegmentIntervals(dataSource, maxEndTime, maxSegmentsToKill);
if (CollectionUtils.isNullOrEmpty(unusedSegmentIntervals)) {
return null;
} else if (unusedSegmentIntervals.size() == 1) {
return unusedSegmentIntervals.get(0);
} else {
return JodaUtils.umbrellaInterval(unusedSegmentIntervals);
}
}
/**
* Calculate the {@link DateTime} that wil form the upper bound when looking for segments that are
* eligible to be killed. If ignoreDurationToRetain is true, we have no upper bound and return a DateTime object
* for "max" time that works when comparing date strings.
*
* @return {@link DateTime} representing the upper bound time used when looking for segments to kill.
*/
@VisibleForTesting
DateTime getEndTimeUpperLimit()
{
return ignoreRetainDuration
? DateTimes.COMPARE_DATE_AS_STRING_MAX
: DateTimes.nowUtc().minus(retainDuration);
}
@VisibleForTesting
Long getRetainDuration()
{
return retainDuration;
}
}
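
An editorial note on the rewritten duty above: the kill interval is bounded by maxEndTime = now minus retainDuration (or the string-comparable max date when ignoreDurationToRetain is set), so a negative durationToRetain moves the bound into the future. The interval actually submitted is the umbrella of all eligible unused-segment intervals and may span gaps, as the old test expectations later in this commit show:

// JodaUtils.umbrellaInterval returns the smallest interval covering all inputs.
Interval killInterval = JodaUtils.umbrellaInterval(
    ImmutableList.of(Intervals.of("2014/2015"), Intervals.of("2016/2017"))
);
// killInterval is 2014/2017, even though 2015/2016 holds no unused segments.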

View File

@ -238,7 +238,7 @@ public class BalanceSegmentsTest
new ServerHolder(druidServer2, peon2, true)
),
broadcastDatasources,
100.0
2
)
).andReturn(
ImmutableList.of(
@ -247,7 +247,7 @@ public class BalanceSegmentsTest
).iterator()
);
EasyMock.expect(strategy.pickSegmentsToMove(EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyDouble()))
EasyMock.expect(strategy.pickSegmentsToMove(EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyInt()))
.andReturn(
ImmutableList.of(
new BalancerSegmentHolder(druidServer1, segment1),
@ -297,7 +297,7 @@ public class BalanceSegmentsTest
DruidCoordinatorRuntimeParams params = setupParamsForDecommissioningMaxPercentOfMaxSegmentsToMove(10);
params = new BalanceSegmentsTester(coordinator).run(params);
Assert.assertEquals(1L, params.getCoordinatorStats().getTieredStat("movedCount", "normal"));
Assert.assertEquals(ImmutableSet.of(segment2), peon3.getSegmentsToLoad());
Assert.assertEquals(ImmutableSet.of(segment1), peon3.getSegmentsToLoad());
}
/**
@ -316,7 +316,7 @@ public class BalanceSegmentsTest
BalancerStrategy strategy = EasyMock.createMock(BalancerStrategy.class);
EasyMock.expect(
strategy.pickSegmentsToMove(EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyDouble()))
strategy.pickSegmentsToMove(EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyInt()))
.andReturn(
ImmutableList.of(
new BalancerSegmentHolder(druidServer1, segment2),
@ -367,7 +367,7 @@ public class BalanceSegmentsTest
mockCoordinator(coordinator);
BalancerStrategy strategy = EasyMock.createMock(BalancerStrategy.class);
EasyMock.expect(strategy.pickSegmentsToMove(EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyDouble()))
EasyMock.expect(strategy.pickSegmentsToMove(EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyInt()))
.andReturn(ImmutableList.of(new BalancerSegmentHolder(druidServer1, segment1)).iterator())
.anyTimes();
EasyMock.expect(strategy.findNewSegmentHomeBalancer(EasyMock.anyObject(), EasyMock.anyObject())).andAnswer(() -> {
@ -403,7 +403,7 @@ public class BalanceSegmentsTest
ServerHolder holder2 = new ServerHolder(druidServer2, peon2, false);
BalancerStrategy strategy = EasyMock.createMock(BalancerStrategy.class);
EasyMock.expect(strategy.pickSegmentsToMove(EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyDouble()))
EasyMock.expect(strategy.pickSegmentsToMove(EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyInt()))
.andReturn(ImmutableList.of(new BalancerSegmentHolder(druidServer1, segment1)).iterator())
.once();
EasyMock.expect(strategy.findNewSegmentHomeBalancer(EasyMock.anyObject(), EasyMock.anyObject()))
@ -592,6 +592,7 @@ public class BalanceSegmentsTest
.withDynamicConfigs(
CoordinatorDynamicConfig.builder()
.withMaxSegmentsToMove(1)
.withUseBatchedSegmentSampler(false)
.withPercentOfSegmentsToConsiderPerMove(40)
.build()
)
@ -788,7 +789,7 @@ public class BalanceSegmentsTest
).andReturn(
ImmutableList.of(new BalancerSegmentHolder(druidServer2, segment2)).iterator()
);
EasyMock.expect(strategy.pickSegmentsToMove(EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyDouble()))
EasyMock.expect(strategy.pickSegmentsToMove(EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyInt()))
.andReturn(ImmutableList.of(new BalancerSegmentHolder(druidServer1, segment1)).iterator());
EasyMock.expect(strategy.findNewSegmentHomeBalancer(EasyMock.anyObject(), EasyMock.anyObject()))
.andReturn(new ServerHolder(druidServer3, peon3))

View File

@ -164,7 +164,6 @@ public class CuratorDruidCoordinatorTest extends CuratorTestBase
.withCoordinatorPeriod(new Duration(COORDINATOR_PERIOD))
.withCoordinatorKillPeriod(new Duration(COORDINATOR_PERIOD))
.withCoordinatorKillMaxSegments(10)
.withLoadQueuePeonRepeatDelay(new Duration("PT0s"))
.withCoordinatorKillIgnoreDurationToRetain(false)
.build();
sourceLoadQueueChildrenCache = new PathChildrenCache(

View File

@ -46,7 +46,6 @@ public class DruidCoordinatorConfigTest
Assert.assertEquals(7776000000L, config.getCoordinatorKillDurationToRetain().getMillis());
Assert.assertEquals(100, config.getCoordinatorKillMaxSegments());
Assert.assertEquals(new Duration(15 * 60 * 1000), config.getLoadTimeoutDelay());
Assert.assertEquals(Duration.millis(50), config.getLoadQueuePeonRepeatDelay());
Assert.assertTrue(config.getCompactionSkipLockedIntervals());
Assert.assertFalse(config.getCoordinatorKillIgnoreDurationToRetain());
Assert.assertEquals("http", config.getLoadQueuePeonType());
@ -76,7 +75,6 @@ public class DruidCoordinatorConfigTest
Assert.assertEquals(new Duration("PT1s"), config.getCoordinatorKillDurationToRetain());
Assert.assertEquals(10000, config.getCoordinatorKillMaxSegments());
Assert.assertEquals(new Duration("PT1s"), config.getLoadTimeoutDelay());
Assert.assertEquals(Duration.millis(100), config.getLoadQueuePeonRepeatDelay());
Assert.assertFalse(config.getCompactionSkipLockedIntervals());
Assert.assertTrue(config.getCoordinatorKillIgnoreDurationToRetain());

View File

@ -148,7 +148,6 @@ public class DruidCoordinatorTest extends CuratorTestBase
.withCoordinatorStartDelay(new Duration(COORDINATOR_START_DELAY))
.withCoordinatorPeriod(new Duration(COORDINATOR_PERIOD))
.withCoordinatorKillPeriod(new Duration(COORDINATOR_PERIOD))
.withLoadQueuePeonRepeatDelay(new Duration("PT0s"))
.withCoordinatorKillIgnoreDurationToRetain(false)
.build();
pathChildrenCache = new PathChildrenCache(
@ -833,7 +832,6 @@ public class DruidCoordinatorTest extends CuratorTestBase
.withCoordinatorPeriod(new Duration(COORDINATOR_PERIOD))
.withCoordinatorKillPeriod(new Duration(COORDINATOR_PERIOD))
.withCoordinatorKillMaxSegments(10)
.withLoadQueuePeonRepeatDelay(new Duration("PT0s"))
.withCompactionSkippedLockedIntervals(false)
.withCoordinatorKillIgnoreDurationToRetain(false)
.build();

View File

@ -75,7 +75,6 @@ public class HttpLoadQueuePeonTest
final TestDruidCoordinatorConfig config = new TestDruidCoordinatorConfig.Builder()
.withCoordinatorKillMaxSegments(10)
.withLoadQueuePeonRepeatDelay(Duration.ZERO)
.withCoordinatorKillIgnoreDurationToRetain(false)
.withHttpLoadQueuePeonBatchSize(2)
.build();

View File

@ -90,7 +90,6 @@ public class LoadQueuePeonTest extends CuratorTestBase
Execs.singleThreaded("test_load_queue_peon-%d"),
new TestDruidCoordinatorConfig.Builder()
.withCoordinatorKillMaxSegments(10)
.withLoadQueuePeonRepeatDelay(Duration.millis(0))
.withCoordinatorKillIgnoreDurationToRetain(false)
.build()
);
@ -281,7 +280,6 @@ public class LoadQueuePeonTest extends CuratorTestBase
new TestDruidCoordinatorConfig.Builder()
.withLoadTimeoutDelay(new Duration(1))
.withCoordinatorKillMaxSegments(10)
.withLoadQueuePeonRepeatDelay(new Duration("PT1s"))
.withCoordinatorKillIgnoreDurationToRetain(false)
.build()
);
@ -322,7 +320,6 @@ public class LoadQueuePeonTest extends CuratorTestBase
new TestDruidCoordinatorConfig.Builder()
.withLoadTimeoutDelay(new Duration(1))
.withCoordinatorKillMaxSegments(10)
.withLoadQueuePeonRepeatDelay(new Duration("PT1s"))
.withCoordinatorKillIgnoreDurationToRetain(false)
.build()
);

View File

@ -40,7 +40,6 @@ public class LoadQueuePeonTester extends CuratorLoadQueuePeon
new TestDruidCoordinatorConfig.Builder()
.withLoadTimeoutDelay(new Duration(1))
.withCoordinatorKillMaxSegments(10)
.withLoadQueuePeonRepeatDelay(new Duration("PT1s"))
.withCoordinatorKillIgnoreDurationToRetain(false)
.build()
);

View File

@ -19,173 +19,65 @@
package org.apache.druid.server.coordinator;
import com.google.common.collect.Lists;
import org.apache.druid.client.ImmutableDruidServer;
import org.apache.druid.client.ImmutableDruidServerTests;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.client.DruidServer;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.NoneShardSpec;
import org.easymock.EasyMock;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
public class ReservoirSegmentSamplerTest
{
private ImmutableDruidServer druidServer1;
private ImmutableDruidServer druidServer2;
private ImmutableDruidServer druidServer3;
private ImmutableDruidServer druidServer4;
private ServerHolder holder1;
private ServerHolder holder2;
private ServerHolder holder3;
private ServerHolder holder4;
private DataSegment segment1;
private DataSegment segment2;
private DataSegment segment3;
private DataSegment segment4;
List<DataSegment> segments1;
List<DataSegment> segments2;
List<DataSegment> segments3;
List<DataSegment> segments4;
List<DataSegment> segments;
/**
* num segments = 10 x 100 days
*/
private final List<DataSegment> segments =
CreateDataSegments.ofDatasource("wiki")
.forIntervals(100, Granularities.DAY)
.startingAt("2022-01-01")
.withNumPartitions(10)
.eachOfSizeInMb(100);
@Before
public void setUp()
{
druidServer1 = EasyMock.createMock(ImmutableDruidServer.class);
druidServer2 = EasyMock.createMock(ImmutableDruidServer.class);
druidServer3 = EasyMock.createMock(ImmutableDruidServer.class);
druidServer4 = EasyMock.createMock(ImmutableDruidServer.class);
holder1 = EasyMock.createMock(ServerHolder.class);
holder2 = EasyMock.createMock(ServerHolder.class);
holder3 = EasyMock.createMock(ServerHolder.class);
holder4 = EasyMock.createMock(ServerHolder.class);
segment1 = EasyMock.createMock(DataSegment.class);
segment2 = EasyMock.createMock(DataSegment.class);
segment3 = EasyMock.createMock(DataSegment.class);
segment4 = EasyMock.createMock(DataSegment.class);
DateTime start1 = DateTimes.of("2012-01-01");
DateTime start2 = DateTimes.of("2012-02-01");
DateTime version = DateTimes.of("2012-03-01");
segment1 = new DataSegment(
"datasource1",
new Interval(start1, start1.plusHours(1)),
version.toString(),
new HashMap<>(),
new ArrayList<>(),
new ArrayList<>(),
NoneShardSpec.instance(),
0,
11L
);
segment2 = new DataSegment(
"datasource1",
new Interval(start2, start2.plusHours(1)),
version.toString(),
new HashMap<>(),
new ArrayList<>(),
new ArrayList<>(),
NoneShardSpec.instance(),
0,
7L
);
segment3 = new DataSegment(
"datasource2",
new Interval(start1, start1.plusHours(1)),
version.toString(),
new HashMap<>(),
new ArrayList<>(),
new ArrayList<>(),
NoneShardSpec.instance(),
0,
4L
);
segment4 = new DataSegment(
"datasource2",
new Interval(start2, start2.plusHours(1)),
version.toString(),
new HashMap<>(),
new ArrayList<>(),
new ArrayList<>(),
NoneShardSpec.instance(),
0,
8L
);
segments = Lists.newArrayList(segment1, segment2, segment3, segment4);
segments1 = Collections.singletonList(segment1);
segments2 = Collections.singletonList(segment2);
segments3 = Collections.singletonList(segment3);
segments4 = Collections.singletonList(segment4);
}
//checks if every segment is selected at least once out of 5000 trials
//checks if every segment is selected at least once out of 50 trials
@Test
public void getRandomBalancerSegmentHolderTest()
public void testEverySegmentGetsPickedAtleastOnce()
{
int iterations = 5000;
EasyMock.expect(druidServer1.getType()).andReturn(ServerType.HISTORICAL).times(iterations);
ImmutableDruidServerTests.expectSegments(druidServer1, segments1);
EasyMock.replay(druidServer1);
EasyMock.expect(druidServer2.getType()).andReturn(ServerType.HISTORICAL).times(iterations);
ImmutableDruidServerTests.expectSegments(druidServer2, segments2);
EasyMock.replay(druidServer2);
EasyMock.expect(druidServer3.getType()).andReturn(ServerType.HISTORICAL).times(iterations);
ImmutableDruidServerTests.expectSegments(druidServer3, segments3);
EasyMock.replay(druidServer3);
EasyMock.expect(druidServer4.getType()).andReturn(ServerType.HISTORICAL).times(iterations);
ImmutableDruidServerTests.expectSegments(druidServer4, segments4);
EasyMock.replay(druidServer4);
// Have to use anyTimes() because the number of times a segment on a given server is chosen is indetermistic.
EasyMock.expect(holder1.getServer()).andReturn(druidServer1).anyTimes();
EasyMock.replay(holder1);
EasyMock.expect(holder2.getServer()).andReturn(druidServer2).anyTimes();
EasyMock.replay(holder2);
EasyMock.expect(holder3.getServer()).andReturn(druidServer3).anyTimes();
EasyMock.replay(holder3);
EasyMock.expect(holder4.getServer()).andReturn(druidServer4).anyTimes();
EasyMock.replay(holder4);
List<ServerHolder> holderList = new ArrayList<>();
holderList.add(holder1);
holderList.add(holder2);
holderList.add(holder3);
holderList.add(holder4);
int iterations = 50;
final List<ServerHolder> servers = Arrays.asList(
createHistorical("server1", segments.get(0)),
createHistorical("server2", segments.get(1)),
createHistorical("server3", segments.get(2)),
createHistorical("server4", segments.get(3))
);
Map<DataSegment, Integer> segmentCountMap = new HashMap<>();
for (int i = 0; i < iterations; i++) {
// due to the pseudo-randomness of this method, we may not select a segment every single time no matter what.
segmentCountMap.put(
ReservoirSegmentSampler.getRandomBalancerSegmentHolders(holderList, Collections.emptySet(), 1).get(0).getSegment(),
1
segmentCountMap.compute(
ReservoirSegmentSampler
.getRandomBalancerSegmentHolders(servers, Collections.emptySet(), 1)
.get(0).getSegment(),
(segment, count) -> count == null ? 1 : count + 1
);
}
for (DataSegment segment : segments) {
Assert.assertEquals(new Integer(1), segmentCountMap.get(segment));
}
EasyMock.verify(druidServer1, druidServer2, druidServer3, druidServer4);
EasyMock.verify(holder1, holder2, holder3, holder4);
// Verify that each segment has been chosen at least once
Assert.assertEquals(4, segmentCountMap.size());
}
/**
@ -195,56 +87,149 @@ public class ReservoirSegmentSamplerTest
@Test
public void getRandomBalancerSegmentHolderTestSegmentsToConsiderLimit()
{
int iterations = 5000;
int iterations = 50;
EasyMock.expect(druidServer1.getType()).andReturn(ServerType.HISTORICAL).times(iterations);
ImmutableDruidServerTests.expectSegments(druidServer1, segments1);
EasyMock.replay(druidServer1);
EasyMock.expect(druidServer2.getType()).andReturn(ServerType.HISTORICAL).times(iterations);
ImmutableDruidServerTests.expectSegments(druidServer2, segments2);
EasyMock.replay(druidServer2);
EasyMock.expect(druidServer3.getType()).andReturn(ServerType.HISTORICAL).times(iterations);
ImmutableDruidServerTests.expectSegments(druidServer3, segments3);
EasyMock.replay(druidServer3);
ImmutableDruidServerTests.expectSegments(druidServer4, segments4);
EasyMock.replay(druidServer4);
// Have to use anyTimes() because the number of times a segment on a given server is chosen is indetermistic.
EasyMock.expect(holder1.getServer()).andReturn(druidServer1).anyTimes();
EasyMock.replay(holder1);
EasyMock.expect(holder2.getServer()).andReturn(druidServer2).anyTimes();
EasyMock.replay(holder2);
EasyMock.expect(holder3.getServer()).andReturn(druidServer3).anyTimes();
EasyMock.replay(holder3);
// We only run getServer() each time we calculate the limit on segments to consider. Always 5k
EasyMock.expect(holder4.getServer()).andReturn(druidServer4).times(5000);
EasyMock.replay(holder4);
List<ServerHolder> holderList = new ArrayList<>();
holderList.add(holder1);
holderList.add(holder2);
holderList.add(holder3);
holderList.add(holder4);
final DataSegment excludedSegment = segments.get(3);
final List<ServerHolder> servers = Arrays.asList(
createHistorical("server1", segments.get(0)),
createHistorical("server2", segments.get(1)),
createHistorical("server3", segments.get(2)),
createHistorical("server4", excludedSegment)
);
Map<DataSegment, Integer> segmentCountMap = new HashMap<>();
final double percentOfSegmentsToConsider = 75.0;
for (int i = 0; i < iterations; i++) {
segmentCountMap.put(
ReservoirSegmentSampler.getRandomBalancerSegmentHolder(holderList, Collections.emptySet(), 75).getSegment(), 1
segmentCountMap.compute(
ReservoirSegmentSampler
.getRandomBalancerSegmentHolder(servers, Collections.emptySet(), percentOfSegmentsToConsider)
.getSegment(),
(segment, count) -> count == null ? 1 : count + 1
);
}
for (DataSegment segment : segments) {
if (!segment.equals(segment4)) {
Assert.assertEquals(new Integer(1), segmentCountMap.get(segment));
} else {
Assert.assertNull(segmentCountMap.get(segment));
// Verify that the segment on server4 is never chosen because of limit
Assert.assertFalse(segmentCountMap.containsKey(excludedSegment));
Assert.assertEquals(3, segmentCountMap.size());
}
@Test
public void testSegmentsOnBrokersAreIgnored()
{
final ServerHolder historical = createHistorical("hist1", segments.get(0), segments.get(1));
final ServerHolder broker = new ServerHolder(
new DruidServer("broker1", "broker1", null, 1000, ServerType.BROKER, null, 1)
.addDataSegment(segments.get(2))
.addDataSegment(segments.get(3))
.toImmutableDruidServer(),
new LoadQueuePeonTester()
);
// Try to pick all the segments on the servers
List<BalancerSegmentHolder> pickedSegments = ReservoirSegmentSampler.getRandomBalancerSegmentHolders(
Arrays.asList(historical, broker),
Collections.emptySet(),
10
);
// Verify that only the segments on the historical are picked
Assert.assertEquals(2, pickedSegments.size());
for (BalancerSegmentHolder holder : pickedSegments) {
Assert.assertEquals(historical.getServer(), holder.getFromServer());
}
}
@Test
public void testBroadcastSegmentsAreIgnored()
{
// num segments = 1 x 4 days
final String broadcastDatasource = "ds_broadcast";
final List<DataSegment> broadcastSegments
= CreateDataSegments.ofDatasource(broadcastDatasource)
.forIntervals(4, Granularities.DAY)
.startingAt("2022-01-01")
.withNumPartitions(1)
.eachOfSizeInMb(100);
final List<ServerHolder> servers = Arrays.asList(
createHistorical("server1", broadcastSegments.toArray(new DataSegment[0])),
createHistorical("server2", segments.get(0), segments.get(1))
);
// Try to pick all the segments on the servers
List<BalancerSegmentHolder> pickedSegments = ReservoirSegmentSampler
.getRandomBalancerSegmentHolders(servers, Collections.singleton(broadcastDatasource), 10);
// Verify that none of the broadcast segments are picked
Assert.assertEquals(2, pickedSegments.size());
for (BalancerSegmentHolder holder : pickedSegments) {
Assert.assertNotEquals(broadcastDatasource, holder.getSegment().getDataSource());
}
}
@Test(timeout = 60_000)
public void testNumberOfIterationsToCycleThroughAllSegments()
{
// The number of runs required for each sample percentage
// remains more or less fixed, even with a larger number of segments
final int[] samplePercentages = {100, 50, 10, 5, 1};
final int[] expectedIterations = {1, 20, 100, 200, 1000};
final int[] totalObservedIterations = new int[5];
for (int i = 0; i < 50; ++i) {
for (int j = 0; j < samplePercentages.length; ++j) {
totalObservedIterations[j] += countMinRunsWithSamplePercent(samplePercentages[j]);
}
}
EasyMock.verify(druidServer1, druidServer2, druidServer3, druidServer4);
EasyMock.verify(holder1, holder2, holder3, holder4);
for (int j = 0; j < samplePercentages.length; ++j) {
double avgObservedIterations = totalObservedIterations[j] / 50.0;
Assert.assertTrue(avgObservedIterations <= expectedIterations[j]);
}
}
/**
* Returns the minimum number of iterations of the reservoir sampling required
* to pick each segment at least once.
* <p>
* {@code k = sampleSize = totalNumSegments * samplePercentage}
*/
private int countMinRunsWithSamplePercent(int samplePercentage)
{
final int numSegments = segments.size();
final List<ServerHolder> servers = Arrays.asList(
createHistorical("server1", segments.subList(0, numSegments / 2).toArray(new DataSegment[0])),
createHistorical("server2", segments.subList(numSegments / 2, numSegments).toArray(new DataSegment[0]))
);
final Set<DataSegment> pickedSegments = new HashSet<>();
int sampleSize = (int) (numSegments * samplePercentage / 100.0);
int numIterations = 1;
for (; numIterations < 10000; ++numIterations) {
ReservoirSegmentSampler
.getRandomBalancerSegmentHolders(servers, Collections.emptySet(), sampleSize)
.forEach(holder -> pickedSegments.add(holder.getSegment()));
if (pickedSegments.size() >= numSegments) {
break;
}
}
return numIterations;
}
private ServerHolder createHistorical(String serverName, DataSegment... loadedSegments)
{
final DruidServer server =
new DruidServer(serverName, serverName, null, 100000, ServerType.HISTORICAL, "normal", 1);
for (DataSegment segment : loadedSegments) {
server.addDataSegment(segment);
}
return new ServerHolder(server.toImmutableDruidServer(), new LoadQueuePeonTester());
}
}
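
The bounds asserted in testNumberOfIterationsToCycleThroughAllSegments above match a simple coupon-collector estimate (editorial note; n = 1000 segments here, sample fraction p): each run draws k = n * p distinct segments, so a fixed segment is missed by one run with probability (1 - p), and after t independent runs about n * (1 - p)^t segments remain unseen. Setting n * (1 - p)^t = 1 gives

t = ln(n) / -ln(1 - p), roughly ln(n) / p for small p.

With ln(1000) ~ 6.9 this predicts about 10, 66, 135, and 690 runs for sample percentages 50, 10, 5, and 1, comfortably under the asserted 20, 100, 200, and 1000 (and a 100% sample trivially needs a single run).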

View File

@ -39,7 +39,6 @@ public class TestDruidCoordinatorConfig extends DruidCoordinatorConfig
private final Duration coordinatorRuleKillDurationToRetain;
private final Duration coordinatorDatasourceKillPeriod;
private final Duration coordinatorDatasourceKillDurationToRetain;
private final Duration loadQueuePeonRepeatDelay;
private final int coordinatorKillMaxSegments;
private final boolean compactionSkipLockedIntervals;
private final boolean coordinatorKillIgnoreDurationToRetain;
@ -67,7 +66,6 @@ public class TestDruidCoordinatorConfig extends DruidCoordinatorConfig
Duration coordinatorDatasourceKillPeriod,
Duration coordinatorDatasourceKillDurationToRetain,
int coordinatorKillMaxSegments,
Duration loadQueuePeonRepeatDelay,
boolean compactionSkipLockedIntervals,
boolean coordinatorKillIgnoreDurationToRetain,
String loadQueuePeonType,
@ -94,7 +92,6 @@ public class TestDruidCoordinatorConfig extends DruidCoordinatorConfig
this.coordinatorDatasourceKillPeriod = coordinatorDatasourceKillPeriod;
this.coordinatorDatasourceKillDurationToRetain = coordinatorDatasourceKillDurationToRetain;
this.coordinatorKillMaxSegments = coordinatorKillMaxSegments;
this.loadQueuePeonRepeatDelay = loadQueuePeonRepeatDelay;
this.compactionSkipLockedIntervals = compactionSkipLockedIntervals;
this.coordinatorKillIgnoreDurationToRetain = coordinatorKillIgnoreDurationToRetain;
this.loadQueuePeonType = loadQueuePeonType;
@ -206,12 +203,6 @@ public class TestDruidCoordinatorConfig extends DruidCoordinatorConfig
return loadTimeoutDelay == null ? super.getLoadTimeoutDelay() : loadTimeoutDelay;
}
@Override
public Duration getLoadQueuePeonRepeatDelay()
{
return loadQueuePeonRepeatDelay;
}
@Override
public boolean getCompactionSkipLockedIntervals()
{
@ -272,7 +263,6 @@ public class TestDruidCoordinatorConfig extends DruidCoordinatorConfig
private static final Duration DEFAULT_COORDINATOR_DATASOURCE_KILL_PERIOD = new Duration("PT86400s");
private static final Duration DEFAULT_COORDINATOR_DATASOURCE_KILL_DURATION_TO_RETAIN = new Duration("PT7776000s");
private static final Duration DEFAULT_LOAD_TIMEOUT_DELAY = new Duration(15 * 60 * 1000);
private static final Duration DEFAULT_LOAD_QUEUE_PEON_REPEAT_DELAY = Duration.millis(50);
private static final String DEFAULT_LOAD_QUEUE_PEON_TYPE = "curator";
private static final int DEFAULT_CURATOR_LOAD_QUEUE_PEON_NUM_CALLBACK_THREADS = 2;
private static final Duration DEFAULT_HTTP_LOAD_QUEUE_PEON_REPEAT_DELAY = Duration.millis(60000);
@ -299,7 +289,6 @@ public class TestDruidCoordinatorConfig extends DruidCoordinatorConfig
private Duration coordinatorDatasourceKillPeriod;
private Duration coordinatorDatasourceKillDurationToRetain;
private Duration loadTimeoutDelay;
private Duration loadQueuePeonRepeatDelay;
private String loadQueuePeonType;
private Duration httpLoadQueuePeonRepeatDelay;
private Integer curatorLoadQueuePeonNumCallbackThreads;
@ -409,12 +398,6 @@ public class TestDruidCoordinatorConfig extends DruidCoordinatorConfig
return this;
}
public Builder withLoadQueuePeonRepeatDelay(Duration loadQueuePeonRepeatDelay)
{
this.loadQueuePeonRepeatDelay = loadQueuePeonRepeatDelay;
return this;
}
public Builder withLoadQueuePeonType(String loadQueuePeonType)
{
this.loadQueuePeonType = loadQueuePeonType;
@ -483,7 +466,6 @@ public class TestDruidCoordinatorConfig extends DruidCoordinatorConfig
coordinatorDatasourceKillPeriod == null ? DEFAULT_COORDINATOR_DATASOURCE_KILL_PERIOD : coordinatorDatasourceKillPeriod,
coordinatorDatasourceKillDurationToRetain == null ? DEFAULT_COORDINATOR_DATASOURCE_KILL_DURATION_TO_RETAIN : coordinatorDatasourceKillDurationToRetain,
coordinatorKillMaxSegments == null ? DEFAULT_COORDINATOR_KILL_MAX_SEGMENTS : coordinatorKillMaxSegments,
loadQueuePeonRepeatDelay == null ? DEFAULT_LOAD_QUEUE_PEON_REPEAT_DELAY : loadQueuePeonRepeatDelay,
compactionSkippedLockedIntervals == null ? DEFAULT_COMPACTION_SKIP_LOCKED_INTERVALS : compactionSkippedLockedIntervals,
coordinatorKillIgnoreDurationToRetain == null ? DEFAULT_COORDINATOR_KILL_IGNORE_DURATION_TO_RETAIN : coordinatorKillIgnoreDurationToRetain,
loadQueuePeonType == null ? DEFAULT_LOAD_QUEUE_PEON_TYPE : loadQueuePeonType,

View File

@ -20,281 +20,221 @@
package org.apache.druid.server.coordinator.duty;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.client.indexing.IndexingServiceClient;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.metadata.SegmentsMetadataManager;
import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;
import org.apache.druid.server.coordinator.DruidCoordinatorConfig;
import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
import org.apache.druid.server.coordinator.TestDruidCoordinatorConfig;
import org.easymock.EasyMock;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.NoneShardSpec;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.joda.time.Interval;
import org.junit.Assert;
import org.joda.time.Period;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.runners.Enclosed;
import org.junit.runner.RunWith;
import org.mockito.Answers;
import org.mockito.ArgumentMatchers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
/**
*
*/
@RunWith(Enclosed.class)
@RunWith(MockitoJUnitRunner.class)
public class KillUnusedSegmentsTest
{
/**
* Standing up new tests with mocks was easier than trying to move the existing tests to use mocks for consistency.
* In the future, if all tests are moved to use the same structure, this inner static class can be gotten rid of.
*/
@RunWith(MockitoJUnitRunner.class)
public static class MockedTest
private static final int MAX_SEGMENTS_TO_KILL = 10;
private static final Duration COORDINATOR_KILL_PERIOD = Duration.standardMinutes(2);
private static final Duration DURATION_TO_RETAIN = Duration.standardDays(1);
private static final Duration INDEXING_PERIOD = Duration.standardMinutes(1);
@Mock
private SegmentsMetadataManager segmentsMetadataManager;
@Mock
private IndexingServiceClient indexingServiceClient;
@Mock(answer = Answers.RETURNS_DEEP_STUBS)
private DruidCoordinatorConfig config;
@Mock
private DruidCoordinatorRuntimeParams params;
@Mock
private CoordinatorDynamicConfig coordinatorDynamicConfig;
private DataSegment yearOldSegment;
private DataSegment monthOldSegment;
private DataSegment dayOldSegment;
private DataSegment hourOldSegment;
private DataSegment nextDaySegment;
private DataSegment nextMonthSegment;
private KillUnusedSegments target;
@Before
public void setup()
{
private static final Set<String> ALL_DATASOURCES = ImmutableSet.of("DS1", "DS2", "DS3");
private static final int MAX_SEGMENTS_TO_KILL = 10;
private static final Duration COORDINATOR_KILL_PERIOD = Duration.standardMinutes(2);
private static final Duration DURATION_TO_RETAIN = Duration.standardDays(1);
private static final Duration INDEXING_PERIOD = Duration.standardMinutes(1);
Mockito.doReturn(coordinatorDynamicConfig).when(params).getCoordinatorDynamicConfig();
Mockito.doReturn(COORDINATOR_KILL_PERIOD).when(config).getCoordinatorKillPeriod();
Mockito.doReturn(DURATION_TO_RETAIN).when(config).getCoordinatorKillDurationToRetain();
Mockito.doReturn(INDEXING_PERIOD).when(config).getCoordinatorIndexingPeriod();
Mockito.doReturn(MAX_SEGMENTS_TO_KILL).when(config).getCoordinatorKillMaxSegments();
@Mock
private SegmentsMetadataManager segmentsMetadataManager;
@Mock
private IndexingServiceClient indexingServiceClient;
@Mock(answer = Answers.RETURNS_DEEP_STUBS)
private DruidCoordinatorConfig config;
Mockito.doReturn(Collections.singleton("DS1"))
.when(coordinatorDynamicConfig).getSpecificDataSourcesToKillUnusedSegmentsIn();
@Mock
private DruidCoordinatorRuntimeParams params;
@Mock
private CoordinatorDynamicConfig coordinatorDynamicConfig;
private KillUnusedSegments target;
final DateTime now = DateTimes.nowUtc();
@Before
public void setup()
{
Mockito.doReturn(coordinatorDynamicConfig).when(params).getCoordinatorDynamicConfig();
Mockito.doReturn(ALL_DATASOURCES).when(segmentsMetadataManager).retrieveAllDataSourceNames();
Mockito.doReturn(COORDINATOR_KILL_PERIOD).when(config).getCoordinatorKillPeriod();
Mockito.doReturn(DURATION_TO_RETAIN).when(config).getCoordinatorKillDurationToRetain();
Mockito.doReturn(INDEXING_PERIOD).when(config).getCoordinatorIndexingPeriod();
Mockito.doReturn(MAX_SEGMENTS_TO_KILL).when(config).getCoordinatorKillMaxSegments();
target = new KillUnusedSegments(segmentsMetadataManager, indexingServiceClient, config);
}
@Test
public void testRunWihNoIntervalShouldNotKillAnySegments()
{
target.run(params);
Mockito.verify(indexingServiceClient, Mockito.never())
.killUnusedSegments(anyString(), anyString(), any(Interval.class));
}
yearOldSegment = createSegmentWithEnd(now.minusDays(365));
monthOldSegment = createSegmentWithEnd(now.minusDays(30));
dayOldSegment = createSegmentWithEnd(now.minusDays(1));
hourOldSegment = createSegmentWithEnd(now.minusHours(1));
nextDaySegment = createSegmentWithEnd(now.plusDays(1));
nextMonthSegment = createSegmentWithEnd(now.plusDays(30));
@Test
public void testRunWihSpecificDatasourceAndNoIntervalShouldNotKillAnySegments()
{
Mockito.when(coordinatorDynamicConfig.getSpecificDataSourcesToKillUnusedSegmentsIn()).thenReturn(Collections.singleton("DS1"));
target.run(params);
Mockito.verify(indexingServiceClient, Mockito.never())
.killUnusedSegments(anyString(), anyString(), any(Interval.class));
}
final List<DataSegment> unusedSegments = ImmutableList.of(
yearOldSegment,
monthOldSegment,
dayOldSegment,
hourOldSegment,
nextDaySegment,
nextMonthSegment
);
Mockito.when(
segmentsMetadataManager.getUnusedSegmentIntervals(
ArgumentMatchers.anyString(),
ArgumentMatchers.any(),
ArgumentMatchers.anyInt()
)
).thenAnswer(invocation -> {
DateTime maxEndTime = invocation.getArgument(1);
List<Interval> unusedIntervals =
unusedSegments.stream()
.map(DataSegment::getInterval)
.filter(i -> i.getEnd().isBefore(maxEndTime))
.collect(Collectors.toList());
int limit = invocation.getArgument(2);
return unusedIntervals.size() <= limit ? unusedIntervals : unusedIntervals.subList(0, limit);
});
target = new KillUnusedSegments(segmentsMetadataManager, indexingServiceClient, config);
}
public static class FindIntervalsTest
@Test
public void testRunWithNoIntervalShouldNotKillAnySegments()
{
@Test
public void testFindIntervalForKill()
{
testFindIntervalForKill(null, null);
testFindIntervalForKill(ImmutableList.of(), null);
Mockito.doReturn(null).when(segmentsMetadataManager).getUnusedSegmentIntervals(
ArgumentMatchers.anyString(),
ArgumentMatchers.any(),
ArgumentMatchers.anyInt()
);
testFindIntervalForKill(ImmutableList.of(Intervals.of("2014/2015")), Intervals.of("2014/2015"));
target.run(params);
Mockito.verify(indexingServiceClient, Mockito.never())
.killUnusedSegments(anyString(), anyString(), any(Interval.class));
}
testFindIntervalForKill(
ImmutableList.of(Intervals.of("2014/2015"), Intervals.of("2016/2017")),
Intervals.of("2014/2017")
);
@Test
public void testRunWithSpecificDatasourceAndNoIntervalShouldNotKillAnySegments()
{
Mockito.doReturn(Duration.standardDays(400))
.when(config).getCoordinatorKillDurationToRetain();
target = new KillUnusedSegments(segmentsMetadataManager, indexingServiceClient, config);
testFindIntervalForKill(
ImmutableList.of(Intervals.of("2014/2015"), Intervals.of("2015/2016")),
Intervals.of("2014/2016")
);
// No unused segment is older than the retention period
target.run(params);
Mockito.verify(indexingServiceClient, Mockito.never())
.killUnusedSegments(anyString(), anyString(), any(Interval.class));
}
testFindIntervalForKill(
ImmutableList.of(Intervals.of("2015/2016"), Intervals.of("2014/2015")),
Intervals.of("2014/2016")
);
@Test
public void testDurationToRetain()
{
// Only segments more than a day old are killed
Interval expectedKillInterval = new Interval(
yearOldSegment.getInterval().getStart(),
dayOldSegment.getInterval().getEnd()
);
runAndVerifyKillInterval(expectedKillInterval);
}
testFindIntervalForKill(
ImmutableList.of(Intervals.of("2015/2017"), Intervals.of("2014/2016")),
Intervals.of("2014/2017")
);
@Test
public void testNegativeDurationToRetain()
{
// Duration to retain = -1 day, reinit target for config to take effect
Mockito.doReturn(DURATION_TO_RETAIN.negated())
.when(config).getCoordinatorKillDurationToRetain();
target = new KillUnusedSegments(segmentsMetadataManager, indexingServiceClient, config);
testFindIntervalForKill(
ImmutableList.of(
Intervals.of("2015/2019"),
Intervals.of("2014/2016"),
Intervals.of("2018/2020")
),
Intervals.of("2014/2020")
);
// Segments up to 1 day in the future are killed
Interval expectedKillInterval = new Interval(
yearOldSegment.getInterval().getStart(),
nextDaySegment.getInterval().getEnd()
);
runAndVerifyKillInterval(expectedKillInterval);
}
testFindIntervalForKill(
ImmutableList.of(
Intervals.of("2015/2019"),
Intervals.of("2014/2016"),
Intervals.of("2018/2020"),
Intervals.of("2021/2022")
),
Intervals.of("2014/2022")
);
}
@Test
public void testIgnoreDurationToRetain()
{
Mockito.doReturn(true)
.when(config).getCoordinatorKillIgnoreDurationToRetain();
target = new KillUnusedSegments(segmentsMetadataManager, indexingServiceClient, config);
private void testFindIntervalForKill(List<Interval> segmentIntervals, Interval expected)
{
SegmentsMetadataManager segmentsMetadataManager = EasyMock.createMock(SegmentsMetadataManager.class);
EasyMock.expect(
segmentsMetadataManager.getUnusedSegmentIntervals(
EasyMock.anyString(),
EasyMock.anyObject(DateTime.class),
EasyMock.anyInt()
)
).andReturn(segmentIntervals);
EasyMock.replay(segmentsMetadataManager);
IndexingServiceClient indexingServiceClient = EasyMock.createMock(IndexingServiceClient.class);
// All future and past unused segments are killed
Interval expectedKillInterval = new Interval(
yearOldSegment.getInterval().getStart(),
nextMonthSegment.getInterval().getEnd()
);
runAndVerifyKillInterval(expectedKillInterval);
}
KillUnusedSegments unusedSegmentsKiller = new KillUnusedSegments(
segmentsMetadataManager,
indexingServiceClient,
new TestDruidCoordinatorConfig.Builder()
.withCoordinatorIndexingPeriod(Duration.parse("PT76400S"))
.withLoadTimeoutDelay(new Duration(1))
.withCoordinatorKillPeriod(Duration.parse("PT86400S"))
.withCoordinatorKillDurationToRetain(Duration.parse("PT86400S"))
.withCoordinatorKillMaxSegments(1000)
.withLoadQueuePeonRepeatDelay(Duration.ZERO)
.withCoordinatorKillIgnoreDurationToRetain(false)
.build()
);
@Test
public void testMaxSegmentsToKill()
{
Mockito.doReturn(1)
.when(config).getCoordinatorKillMaxSegments();
target = new KillUnusedSegments(segmentsMetadataManager, indexingServiceClient, config);
Assert.assertEquals(
expected,
unusedSegmentsKiller.findIntervalForKill("test", 10000)
);
}
// Only 1 unused segment is killed
runAndVerifyKillInterval(yearOldSegment.getInterval());
}
/**
* Test that retainDuration is properly set based on the value available in the
* Coordinator config. Positive and Negative durations should work as well as
* null, if and only if ignoreDurationToRetain is true.
*/
@Test
public void testRetainDurationValues()
{
// Positive duration to retain
KillUnusedSegments unusedSegmentsKiller = new KillUnusedSegments(
null,
null,
new TestDruidCoordinatorConfig.Builder()
.withCoordinatorIndexingPeriod(Duration.parse("PT76400S"))
.withLoadTimeoutDelay(new Duration(1))
.withCoordinatorKillPeriod(Duration.parse("PT86400S"))
.withCoordinatorKillDurationToRetain(Duration.parse("PT86400S"))
.withCoordinatorKillMaxSegments(1000)
.withLoadQueuePeonRepeatDelay(Duration.ZERO)
.withCoordinatorKillIgnoreDurationToRetain(false)
.build()
);
Assert.assertEquals((Long) Duration.parse("PT86400S").getMillis(), unusedSegmentsKiller.getRetainDuration());
private void runAndVerifyKillInterval(Interval expectedKillInterval)
{
target.run(params);
Mockito.verify(indexingServiceClient, Mockito.times(1)).killUnusedSegments(
ArgumentMatchers.anyString(),
ArgumentMatchers.eq("DS1"),
ArgumentMatchers.eq(expectedKillInterval)
);
}
// Negative duration to retain
unusedSegmentsKiller = new KillUnusedSegments(
null,
null,
new TestDruidCoordinatorConfig.Builder()
.withCoordinatorIndexingPeriod(Duration.parse("PT76400S"))
.withLoadTimeoutDelay(new Duration(1))
.withCoordinatorKillPeriod(Duration.parse("PT86400S"))
.withCoordinatorKillDurationToRetain(Duration.parse("PT-86400S"))
.withCoordinatorKillMaxSegments(1000)
.withLoadQueuePeonRepeatDelay(Duration.ZERO)
.withCoordinatorKillIgnoreDurationToRetain(false)
.build()
);
Assert.assertEquals((Long) Duration.parse("PT-86400S").getMillis(), unusedSegmentsKiller.getRetainDuration());
}
/**
* Test that the end time upper limit is properly computated for both positive and
* negative durations. Also ensure that if durationToRetain is to be ignored, that
* the upper limit is {@link DateTime} max time.
*/
@Test
public void testGetEndTimeUpperLimit()
{
// If ignoreDurationToRetain is true, ignore the value configured for durationToRetain and return 9999-12-31T23:59
KillUnusedSegments unusedSegmentsKiller = new KillUnusedSegments(
null,
null,
new TestDruidCoordinatorConfig.Builder()
.withCoordinatorIndexingPeriod(Duration.parse("PT76400S"))
.withLoadTimeoutDelay(new Duration(1))
.withCoordinatorKillPeriod(Duration.parse("PT86400S"))
.withCoordinatorKillDurationToRetain(Duration.parse("PT86400S"))
.withCoordinatorKillMaxSegments(1000)
.withLoadQueuePeonRepeatDelay(Duration.ZERO)
.withCoordinatorKillIgnoreDurationToRetain(true)
.build()
);
Assert.assertEquals(
DateTimes.COMPARE_DATE_AS_STRING_MAX,
unusedSegmentsKiller.getEndTimeUpperLimit()
);
// Testing a negative durationToRetain period returns proper date in future
unusedSegmentsKiller = new KillUnusedSegments(
null,
null,
new TestDruidCoordinatorConfig.Builder()
.withCoordinatorIndexingPeriod(Duration.parse("PT76400S"))
.withLoadTimeoutDelay(new Duration(1))
.withCoordinatorKillPeriod(Duration.parse("PT86400S"))
.withCoordinatorKillDurationToRetain(Duration.parse("PT-86400S"))
.withCoordinatorKillMaxSegments(1000)
.withLoadQueuePeonRepeatDelay(Duration.ZERO)
.withCoordinatorKillIgnoreDurationToRetain(false)
.build()
);
DateTime expectedTime = DateTimes.nowUtc().minus(Duration.parse("PT-86400S").getMillis());
Assert.assertEquals(expectedTime, unusedSegmentsKiller.getEndTimeUpperLimit());
// Testing a positive durationToRetain period returns expected value in the past
unusedSegmentsKiller = new KillUnusedSegments(
null,
null,
new TestDruidCoordinatorConfig.Builder()
.withCoordinatorIndexingPeriod(Duration.parse("PT76400S"))
.withLoadTimeoutDelay(new Duration(1))
.withCoordinatorKillPeriod(Duration.parse("PT86400S"))
.withCoordinatorKillDurationToRetain(Duration.parse("PT86400S"))
.withCoordinatorKillMaxSegments(1000)
.withLoadQueuePeonRepeatDelay(Duration.ZERO)
.withCoordinatorKillIgnoreDurationToRetain(false)
.build()
);
expectedTime = DateTimes.nowUtc().minus(Duration.parse("PT86400S").getMillis());
Assert.assertEquals(expectedTime, unusedSegmentsKiller.getEndTimeUpperLimit());
}
private DataSegment createSegmentWithEnd(DateTime endTime)
{
return new DataSegment(
"DS1",
new Interval(Period.days(1), endTime),
DateTimes.nowUtc().toString(),
new HashMap<>(),
new ArrayList<>(),
new ArrayList<>(),
NoneShardSpec.instance(),
1,
0
);
}
}

View File

@ -453,7 +453,6 @@ public class CoordinatorSimulationBuilder
.withCoordinatorStartDelay(new Duration(1L))
.withCoordinatorPeriod(new Duration(DEFAULT_COORDINATOR_PERIOD))
.withCoordinatorKillPeriod(new Duration(DEFAULT_COORDINATOR_PERIOD))
.withLoadQueuePeonRepeatDelay(new Duration("PT0S"))
.withLoadQueuePeonType("http")
.withCoordinatorKillIgnoreDurationToRetain(false)
.build();

View File

@ -78,6 +78,7 @@ public class CoordinatorDynamicConfigTest
1,
1,
1,
true,
1,
1,
2,
@ -100,6 +101,7 @@ public class CoordinatorDynamicConfigTest
1,
1,
1,
true,
1,
1,
2,
@ -122,6 +124,7 @@ public class CoordinatorDynamicConfigTest
1,
1,
1,
true,
1,
1,
2,
@ -144,6 +147,7 @@ public class CoordinatorDynamicConfigTest
1,
1,
1,
true,
1,
1,
2,
@ -158,7 +162,10 @@ public class CoordinatorDynamicConfigTest
Integer.MAX_VALUE
);
actual = CoordinatorDynamicConfig.builder().withPercentOfSegmentsToConsiderPerMove(10).build(actual);
actual = CoordinatorDynamicConfig.builder()
.withPercentOfSegmentsToConsiderPerMove(10)
.withUseBatchedSegmentSampler(false)
.build(actual);
assertConfig(
actual,
1,
@ -166,6 +173,7 @@ public class CoordinatorDynamicConfigTest
1,
1,
10,
false,
1,
1,
2,
@ -188,6 +196,7 @@ public class CoordinatorDynamicConfigTest
1,
1,
10,
false,
1,
1,
2,
@ -210,6 +219,7 @@ public class CoordinatorDynamicConfigTest
1,
1,
10,
false,
1,
1,
2,
@ -315,7 +325,7 @@ public class CoordinatorDynamicConfigTest
1,
1,
100,
1,
true, 1,
1,
2,
true,
@ -337,6 +347,7 @@ public class CoordinatorDynamicConfigTest
1,
1,
100,
true,
1,
1,
2,
@ -359,6 +370,7 @@ public class CoordinatorDynamicConfigTest
1,
1,
100,
true,
1,
1,
2,
@ -407,6 +419,7 @@ public class CoordinatorDynamicConfigTest
1,
1,
1,
true,
1,
1,
2,
@ -525,6 +538,7 @@ public class CoordinatorDynamicConfigTest
1,
1,
100,
true,
1,
1,
2,
@ -574,6 +588,7 @@ public class CoordinatorDynamicConfigTest
1,
1,
1,
true,
1,
1,
2,
@ -638,6 +653,7 @@ public class CoordinatorDynamicConfigTest
1,
1,
1,
true,
1,
1,
2,
@ -665,6 +681,7 @@ public class CoordinatorDynamicConfigTest
100,
5,
100,
true,
15,
10,
1,
@ -695,6 +712,7 @@ public class CoordinatorDynamicConfigTest
100,
5,
100,
true,
15,
10,
1,
@ -785,6 +803,7 @@ public class CoordinatorDynamicConfigTest
int expectedMergeSegmentsLimit,
int expectedMaxSegmentsToMove,
int expectedPercentOfSegmentsToConsiderPerMove,
boolean expectedUseBatchedSegmentSampler,
int expectedReplicantLifetime,
int expectedReplicationThrottleLimit,
int expectedBalancerComputeThreads,
@ -807,6 +826,7 @@ public class CoordinatorDynamicConfigTest
Assert.assertEquals(expectedMergeSegmentsLimit, config.getMergeSegmentsLimit());
Assert.assertEquals(expectedMaxSegmentsToMove, config.getMaxSegmentsToMove());
Assert.assertEquals(expectedPercentOfSegmentsToConsiderPerMove, config.getPercentOfSegmentsToConsiderPerMove(), 0);
Assert.assertEquals(expectedUseBatchedSegmentSampler, config.useBatchedSegmentSampler());
Assert.assertEquals(expectedReplicantLifetime, config.getReplicantLifetime());
Assert.assertEquals(expectedReplicationThrottleLimit, config.getReplicationThrottleLimit());
Assert.assertEquals(expectedBalancerComputeThreads, config.getBalancerComputeThreads());

View File

@ -197,7 +197,7 @@ export const COORDINATOR_DYNAMIC_CONFIG_FIELDS: Field<CoordinatorDynamicConfig>[
{
name: 'useBatchedSegmentSampler',
type: 'boolean',
defaultValue: false,
defaultValue: true,
info: (
<>
Boolean flag for whether or not we should use the Reservoir Sampling with a reservoir of