diff --git a/server/src/main/java/org/apache/druid/server/coordinator/BalancerStrategy.java b/server/src/main/java/org/apache/druid/server/coordinator/BalancerStrategy.java index 6478614ba57..4a48137a888 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/BalancerStrategy.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/BalancerStrategy.java @@ -105,9 +105,7 @@ public interface BalancerStrategy } /** - * Pick the best segments to move from one of the supplied set of servers according to the balancing strategy. This - * is the deprecated way of picking a segment to move. pickSegmentsToMove(List, Set, int) uses - * a more performant bathced sampling method that will become the default picking mode in the future. + * Pick the best segments to move from one of the supplied set of servers according to the balancing strategy. * * @param serverHolders set of historicals to consider for moving segments * @param broadcastDatasources Datasources that contain segments which were loaded via broadcast rules. @@ -122,6 +120,10 @@ public interface BalancerStrategy * for implementations of this method. * @return Iterator for set of {@link BalancerSegmentHolder} containing segment to move and server they currently * reside on, or empty if there are no segments to pick from (i. e. all provided serverHolders are empty). + * + * @deprecated Use {@link #pickSegmentsToMove(List, Set, int)} instead as it is + * a much more performant sampling method which does not allow duplicates. This + * method will be removed in future releases. */ @Deprecated default Iterator<BalancerSegmentHolder> pickSegmentsToMove( diff --git a/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java b/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java index 475a8f88cde..b9f0d490a3d 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java @@ -56,6 +56,7 @@ public class CoordinatorDynamicConfig private final int maxSegmentsToMove; @Deprecated private final double percentOfSegmentsToConsiderPerMove; + @Deprecated private final boolean useBatchedSegmentSampler; private final int replicantLifetime; private final int replicationThrottleLimit; @@ -115,7 +116,7 @@ public class CoordinatorDynamicConfig @JsonProperty("mergeSegmentsLimit") int mergeSegmentsLimit, @JsonProperty("maxSegmentsToMove") int maxSegmentsToMove, @Deprecated @JsonProperty("percentOfSegmentsToConsiderPerMove") @Nullable Double percentOfSegmentsToConsiderPerMove, - @JsonProperty("useBatchedSegmentSampler") boolean useBatchedSegmentSampler, + @Deprecated @JsonProperty("useBatchedSegmentSampler") Boolean useBatchedSegmentSampler, @JsonProperty("replicantLifetime") int replicantLifetime, @JsonProperty("replicationThrottleLimit") int replicationThrottleLimit, @JsonProperty("balancerComputeThreads") int balancerComputeThreads, @@ -161,7 +162,12 @@ public class CoordinatorDynamicConfig ); this.percentOfSegmentsToConsiderPerMove = percentOfSegmentsToConsiderPerMove; - this.useBatchedSegmentSampler = useBatchedSegmentSampler; + if (useBatchedSegmentSampler == null) { + this.useBatchedSegmentSampler = Builder.DEFAULT_USE_BATCHED_SEGMENT_SAMPLER; + } else { + this.useBatchedSegmentSampler = useBatchedSegmentSampler; + } + this.replicantLifetime = replicantLifetime; this.replicationThrottleLimit = replicationThrottleLimit; this.balancerComputeThreads =
Math.max(balancerComputeThreads, 1); @@ -276,6 +282,7 @@ public class CoordinatorDynamicConfig return percentOfSegmentsToConsiderPerMove; } + @Deprecated @JsonProperty public boolean useBatchedSegmentSampler() { @@ -517,7 +524,7 @@ public class CoordinatorDynamicConfig private static final int DEFAULT_REPLICATION_THROTTLE_LIMIT = 10; private static final int DEFAULT_BALANCER_COMPUTE_THREADS = 1; private static final boolean DEFAULT_EMIT_BALANCING_STATS = false; - private static final boolean DEFAULT_USE_BATCHED_SEGMENT_SAMPLER = false; + private static final boolean DEFAULT_USE_BATCHED_SEGMENT_SAMPLER = true; private static final int DEFAULT_MAX_SEGMENTS_IN_NODE_LOADING_QUEUE = 100; private static final int DEFAULT_DECOMMISSIONING_MAX_SEGMENTS_TO_MOVE_PERCENT = 70; private static final boolean DEFAULT_PAUSE_COORDINATION = false; @@ -557,7 +564,7 @@ public class CoordinatorDynamicConfig @JsonProperty("mergeSegmentsLimit") @Nullable Integer mergeSegmentsLimit, @JsonProperty("maxSegmentsToMove") @Nullable Integer maxSegmentsToMove, @Deprecated @JsonProperty("percentOfSegmentsToConsiderPerMove") @Nullable Double percentOfSegmentsToConsiderPerMove, - @JsonProperty("useBatchedSegmentSampler") Boolean useBatchedSegmentSampler, + @Deprecated @JsonProperty("useBatchedSegmentSampler") Boolean useBatchedSegmentSampler, @JsonProperty("replicantLifetime") @Nullable Integer replicantLifetime, @JsonProperty("replicationThrottleLimit") @Nullable Integer replicationThrottleLimit, @JsonProperty("balancerComputeThreads") @Nullable Integer balancerComputeThreads, @@ -627,6 +634,7 @@ public class CoordinatorDynamicConfig return this; } + @Deprecated public Builder withUseBatchedSegmentSampler(boolean useBatchedSegmentSampler) { this.useBatchedSegmentSampler = useBatchedSegmentSampler; diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorConfig.java b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorConfig.java index cdd708ea9d3..9495f30f2c8 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorConfig.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorConfig.java @@ -101,12 +101,6 @@ public abstract class DruidCoordinatorConfig return new Duration(15 * 60 * 1000); } - @Config("druid.coordinator.loadqueuepeon.repeatDelay") - public Duration getLoadQueuePeonRepeatDelay() - { - return Duration.millis(50); - } - @Config("druid.coordinator.loadqueuepeon.type") public String getLoadQueuePeonType() { diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java index 2ab3762ba92..f1659b33350 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java @@ -19,7 +19,6 @@ package org.apache.druid.server.coordinator.duty; -import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.inject.Inject; import org.apache.druid.client.indexing.IndexingServiceClient; @@ -33,7 +32,6 @@ import org.apache.druid.utils.CollectionUtils; import org.joda.time.DateTime; import org.joda.time.Interval; -import javax.annotation.Nullable; import java.util.Collection; import java.util.List; @@ -43,7 +41,7 @@ import java.util.List; * negative meaning the interval end target will be in the future. 
Also, retainDuration can be ignored, * meaning that there is no upper bound to the end interval of segments that will be killed. This action is called * "to kill a segment". - * + *

* See org.apache.druid.indexing.common.task.KillUnusedSegmentsTask. */ public class KillUnusedSegments implements CoordinatorDuty @@ -102,74 +100,69 @@ public class KillUnusedSegments implements CoordinatorDuty Collection<String> dataSourcesToKill = params.getCoordinatorDynamicConfig().getSpecificDataSourcesToKillUnusedSegmentsIn(); + // If no datasource has been specified, all are eligible for killing unused segments if (CollectionUtils.isNullOrEmpty(dataSourcesToKill)) { dataSourcesToKill = segmentsMetadataManager.retrieveAllDataSourceNames(); } - if (dataSourcesToKill != null && - dataSourcesToKill.size() > 0 && - (lastKillTime + period) < System.currentTimeMillis()) { - lastKillTime = System.currentTimeMillis(); - - for (String dataSource : dataSourcesToKill) { - final Interval intervalToKill = findIntervalForKill(dataSource, maxSegmentsToKill); - if (intervalToKill != null) { - try { - indexingServiceClient.killUnusedSegments("coordinator-issued", dataSource, intervalToKill); - } - catch (Exception ex) { - log.error(ex, "Failed to submit kill task for dataSource [%s]", dataSource); - if (Thread.currentThread().isInterrupted()) { - log.warn("skipping kill task scheduling because thread is interrupted."); - break; - } - } - } - } + final long currentTimeMillis = System.currentTimeMillis(); + if (CollectionUtils.isNullOrEmpty(dataSourcesToKill)) { + log.debug("No eligible datasource to kill unused segments."); + } else if (lastKillTime + period > currentTimeMillis) { + log.debug("Skipping kill of unused segments as kill period has not elapsed yet."); + } else { + log.debug("Killing unused segments in datasources: %s", dataSourcesToKill); + lastKillTime = currentTimeMillis; + killUnusedSegments(dataSourcesToKill); } + return params; } - /** - * For a given datasource and limit of segments that can be killed in one task, determine the interval to be - * submitted with the kill task. - * - * @param dataSource dataSource whose unused segments are being killed. - * @param limit the maximum number of segments that can be included in the kill task. - * @return {@link Interval} to be used in the kill task. - */ - @VisibleForTesting - @Nullable - Interval findIntervalForKill(String dataSource, int limit) + private void killUnusedSegments(Collection<String> dataSourcesToKill) { - List<Interval> unusedSegmentIntervals = - segmentsMetadataManager.getUnusedSegmentIntervals(dataSource, getEndTimeUpperLimit(), limit); + int submittedTasks = 0; + for (String dataSource : dataSourcesToKill) { + final Interval intervalToKill = findIntervalForKill(dataSource); + if (intervalToKill == null) { + continue; + } - if (unusedSegmentIntervals != null && unusedSegmentIntervals.size() > 0) { - return JodaUtils.umbrellaInterval(unusedSegmentIntervals); - } else { + try { + indexingServiceClient.killUnusedSegments("coordinator-issued", dataSource, intervalToKill); + ++submittedTasks; + } + catch (Exception ex) { + log.error(ex, "Failed to submit kill task for dataSource [%s]", dataSource); + if (Thread.currentThread().isInterrupted()) { + log.warn("skipping kill task scheduling because thread is interrupted."); + break; + } + } + } + + log.debug("Submitted kill tasks for [%d] datasources.", submittedTasks); + } + + /** + * Calculates the interval for which segments are to be killed in a datasource. + */ + private Interval findIntervalForKill(String dataSource) + { + final DateTime maxEndTime = ignoreRetainDuration + ? 
DateTimes.COMPARE_DATE_AS_STRING_MAX + : DateTimes.nowUtc().minus(retainDuration); + + List<Interval> unusedSegmentIntervals = segmentsMetadataManager + .getUnusedSegmentIntervals(dataSource, maxEndTime, maxSegmentsToKill); + + if (CollectionUtils.isNullOrEmpty(unusedSegmentIntervals)) { return null; + } else if (unusedSegmentIntervals.size() == 1) { + return unusedSegmentIntervals.get(0); + } else { + return JodaUtils.umbrellaInterval(unusedSegmentIntervals); } } - /** - * Calculate the {@link DateTime} that wil form the upper bound when looking for segments that are - * eligible to be killed. If ignoreDurationToRetain is true, we have no upper bound and return a DateTime object - * for "max" time that works when comparing date strings. - * - * @return {@link DateTime} representing the upper bound time used when looking for segments to kill. - */ - @VisibleForTesting - DateTime getEndTimeUpperLimit() - { - return ignoreRetainDuration - ? DateTimes.COMPARE_DATE_AS_STRING_MAX - : DateTimes.nowUtc().minus(retainDuration); - } - - @VisibleForTesting - Long getRetainDuration() - { - return retainDuration; - } } diff --git a/server/src/test/java/org/apache/druid/server/coordinator/BalanceSegmentsTest.java b/server/src/test/java/org/apache/druid/server/coordinator/BalanceSegmentsTest.java index a7c594fe094..e2bb7a816ba 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/BalanceSegmentsTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/BalanceSegmentsTest.java @@ -238,7 +238,7 @@ public class BalanceSegmentsTest new ServerHolder(druidServer2, peon2, true) ), broadcastDatasources, - 100.0 + 2 ) ).andReturn( ImmutableList.of( ).iterator() ); - EasyMock.expect(strategy.pickSegmentsToMove(EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyDouble())) + EasyMock.expect(strategy.pickSegmentsToMove(EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyInt())) .andReturn( ImmutableList.of( new BalancerSegmentHolder(druidServer1, segment1), new BalancerSegmentHolder(druidServer1, segment2) ).iterator() ); @@ -297,7 +297,7 @@ public class BalanceSegmentsTest DruidCoordinatorRuntimeParams params = setupParamsForDecommissioningMaxPercentOfMaxSegmentsToMove(10); params = new BalanceSegmentsTester(coordinator).run(params); Assert.assertEquals(1L, params.getCoordinatorStats().getTieredStat("movedCount", "normal")); - Assert.assertEquals(ImmutableSet.of(segment2), peon3.getSegmentsToLoad()); + Assert.assertEquals(ImmutableSet.of(segment1), peon3.getSegmentsToLoad()); } /** @@ -316,7 +316,7 @@ public class BalanceSegmentsTest BalancerStrategy strategy = EasyMock.createMock(BalancerStrategy.class); EasyMock.expect( - strategy.pickSegmentsToMove(EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyDouble())) + strategy.pickSegmentsToMove(EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyInt())) .andReturn( ImmutableList.of( new BalancerSegmentHolder(druidServer1, segment2), @@ -367,7 +367,7 @@ public class BalanceSegmentsTest mockCoordinator(coordinator); BalancerStrategy strategy = EasyMock.createMock(BalancerStrategy.class); - EasyMock.expect(strategy.pickSegmentsToMove(EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyDouble())) + EasyMock.expect(strategy.pickSegmentsToMove(EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyInt())) .andReturn(ImmutableList.of(new BalancerSegmentHolder(druidServer1, segment1)).iterator()) .anyTimes(); EasyMock.expect(strategy.findNewSegmentHomeBalancer(EasyMock.anyObject(), EasyMock.anyObject())).andAnswer(() -> { @@ -403,7 +403,7 @@
public class BalanceSegmentsTest ServerHolder holder2 = new ServerHolder(druidServer2, peon2, false); BalancerStrategy strategy = EasyMock.createMock(BalancerStrategy.class); - EasyMock.expect(strategy.pickSegmentsToMove(EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyDouble())) + EasyMock.expect(strategy.pickSegmentsToMove(EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyInt())) .andReturn(ImmutableList.of(new BalancerSegmentHolder(druidServer1, segment1)).iterator()) .once(); EasyMock.expect(strategy.findNewSegmentHomeBalancer(EasyMock.anyObject(), EasyMock.anyObject())) @@ -592,6 +592,7 @@ public class BalanceSegmentsTest .withDynamicConfigs( CoordinatorDynamicConfig.builder() .withMaxSegmentsToMove(1) + .withUseBatchedSegmentSampler(false) .withPercentOfSegmentsToConsiderPerMove(40) .build() ) @@ -788,7 +789,7 @@ public class BalanceSegmentsTest ).andReturn( ImmutableList.of(new BalancerSegmentHolder(druidServer2, segment2)).iterator() ); - EasyMock.expect(strategy.pickSegmentsToMove(EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyDouble())) + EasyMock.expect(strategy.pickSegmentsToMove(EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyInt())) .andReturn(ImmutableList.of(new BalancerSegmentHolder(druidServer1, segment1)).iterator()); EasyMock.expect(strategy.findNewSegmentHomeBalancer(EasyMock.anyObject(), EasyMock.anyObject())) .andReturn(new ServerHolder(druidServer3, peon3)) diff --git a/server/src/test/java/org/apache/druid/server/coordinator/CuratorDruidCoordinatorTest.java b/server/src/test/java/org/apache/druid/server/coordinator/CuratorDruidCoordinatorTest.java index 926bccc53e1..4538bc867b7 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/CuratorDruidCoordinatorTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/CuratorDruidCoordinatorTest.java @@ -164,7 +164,6 @@ public class CuratorDruidCoordinatorTest extends CuratorTestBase .withCoordinatorPeriod(new Duration(COORDINATOR_PERIOD)) .withCoordinatorKillPeriod(new Duration(COORDINATOR_PERIOD)) .withCoordinatorKillMaxSegments(10) - .withLoadQueuePeonRepeatDelay(new Duration("PT0s")) .withCoordinatorKillIgnoreDurationToRetain(false) .build(); sourceLoadQueueChildrenCache = new PathChildrenCache( diff --git a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorConfigTest.java b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorConfigTest.java index cfbe2b7de97..db264e55e02 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorConfigTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorConfigTest.java @@ -46,7 +46,6 @@ public class DruidCoordinatorConfigTest Assert.assertEquals(7776000000L, config.getCoordinatorKillDurationToRetain().getMillis()); Assert.assertEquals(100, config.getCoordinatorKillMaxSegments()); Assert.assertEquals(new Duration(15 * 60 * 1000), config.getLoadTimeoutDelay()); - Assert.assertEquals(Duration.millis(50), config.getLoadQueuePeonRepeatDelay()); Assert.assertTrue(config.getCompactionSkipLockedIntervals()); Assert.assertFalse(config.getCoordinatorKillIgnoreDurationToRetain()); Assert.assertEquals("http", config.getLoadQueuePeonType()); @@ -76,7 +75,6 @@ public class DruidCoordinatorConfigTest Assert.assertEquals(new Duration("PT1s"), config.getCoordinatorKillDurationToRetain()); Assert.assertEquals(10000, config.getCoordinatorKillMaxSegments()); Assert.assertEquals(new Duration("PT1s"), config.getLoadTimeoutDelay()); - 
Assert.assertEquals(Duration.millis(100), config.getLoadQueuePeonRepeatDelay()); Assert.assertFalse(config.getCompactionSkipLockedIntervals()); Assert.assertTrue(config.getCoordinatorKillIgnoreDurationToRetain()); diff --git a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java index 980c336aa2d..a0502d95448 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java @@ -148,7 +148,6 @@ public class DruidCoordinatorTest extends CuratorTestBase .withCoordinatorStartDelay(new Duration(COORDINATOR_START_DELAY)) .withCoordinatorPeriod(new Duration(COORDINATOR_PERIOD)) .withCoordinatorKillPeriod(new Duration(COORDINATOR_PERIOD)) - .withLoadQueuePeonRepeatDelay(new Duration("PT0s")) .withCoordinatorKillIgnoreDurationToRetain(false) .build(); pathChildrenCache = new PathChildrenCache( @@ -833,7 +832,6 @@ public class DruidCoordinatorTest extends CuratorTestBase .withCoordinatorPeriod(new Duration(COORDINATOR_PERIOD)) .withCoordinatorKillPeriod(new Duration(COORDINATOR_PERIOD)) .withCoordinatorKillMaxSegments(10) - .withLoadQueuePeonRepeatDelay(new Duration("PT0s")) .withCompactionSkippedLockedIntervals(false) .withCoordinatorKillIgnoreDurationToRetain(false) .build(); diff --git a/server/src/test/java/org/apache/druid/server/coordinator/HttpLoadQueuePeonTest.java b/server/src/test/java/org/apache/druid/server/coordinator/HttpLoadQueuePeonTest.java index a1a1f06e05b..aed032b196d 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/HttpLoadQueuePeonTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/HttpLoadQueuePeonTest.java @@ -75,7 +75,6 @@ public class HttpLoadQueuePeonTest final TestDruidCoordinatorConfig config = new TestDruidCoordinatorConfig.Builder() .withCoordinatorKillMaxSegments(10) - .withLoadQueuePeonRepeatDelay(Duration.ZERO) .withCoordinatorKillIgnoreDurationToRetain(false) .withHttpLoadQueuePeonBatchSize(2) .build(); diff --git a/server/src/test/java/org/apache/druid/server/coordinator/LoadQueuePeonTest.java b/server/src/test/java/org/apache/druid/server/coordinator/LoadQueuePeonTest.java index 6410f9e5ef4..3929aa64d59 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/LoadQueuePeonTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/LoadQueuePeonTest.java @@ -90,7 +90,6 @@ public class LoadQueuePeonTest extends CuratorTestBase Execs.singleThreaded("test_load_queue_peon-%d"), new TestDruidCoordinatorConfig.Builder() .withCoordinatorKillMaxSegments(10) - .withLoadQueuePeonRepeatDelay(Duration.millis(0)) .withCoordinatorKillIgnoreDurationToRetain(false) .build() ); @@ -281,7 +280,6 @@ public class LoadQueuePeonTest extends CuratorTestBase new TestDruidCoordinatorConfig.Builder() .withLoadTimeoutDelay(new Duration(1)) .withCoordinatorKillMaxSegments(10) - .withLoadQueuePeonRepeatDelay(new Duration("PT1s")) .withCoordinatorKillIgnoreDurationToRetain(false) .build() ); @@ -322,7 +320,6 @@ public class LoadQueuePeonTest extends CuratorTestBase new TestDruidCoordinatorConfig.Builder() .withLoadTimeoutDelay(new Duration(1)) .withCoordinatorKillMaxSegments(10) - .withLoadQueuePeonRepeatDelay(new Duration("PT1s")) .withCoordinatorKillIgnoreDurationToRetain(false) .build() ); diff --git a/server/src/test/java/org/apache/druid/server/coordinator/LoadQueuePeonTester.java 
b/server/src/test/java/org/apache/druid/server/coordinator/LoadQueuePeonTester.java index 9abcb105c39..a7b92eaf8d0 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/LoadQueuePeonTester.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/LoadQueuePeonTester.java @@ -40,7 +40,6 @@ public class LoadQueuePeonTester extends CuratorLoadQueuePeon new TestDruidCoordinatorConfig.Builder() .withLoadTimeoutDelay(new Duration(1)) .withCoordinatorKillMaxSegments(10) - .withLoadQueuePeonRepeatDelay(new Duration("PT1s")) .withCoordinatorKillIgnoreDurationToRetain(false) .build() ); diff --git a/server/src/test/java/org/apache/druid/server/coordinator/ReservoirSegmentSamplerTest.java b/server/src/test/java/org/apache/druid/server/coordinator/ReservoirSegmentSamplerTest.java index 70086abd772..4181ba2bbe7 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/ReservoirSegmentSamplerTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/ReservoirSegmentSamplerTest.java @@ -19,173 +19,65 @@ package org.apache.druid.server.coordinator; -import com.google.common.collect.Lists; -import org.apache.druid.client.ImmutableDruidServer; -import org.apache.druid.client.ImmutableDruidServerTests; -import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.client.DruidServer; +import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.server.coordination.ServerType; import org.apache.druid.timeline.DataSegment; -import org.apache.druid.timeline.partition.NoneShardSpec; -import org.easymock.EasyMock; -import org.joda.time.DateTime; -import org.joda.time.Interval; import org.junit.Assert; import org.junit.Before; import org.junit.Test; -import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; public class ReservoirSegmentSamplerTest { - private ImmutableDruidServer druidServer1; - private ImmutableDruidServer druidServer2; - private ImmutableDruidServer druidServer3; - private ImmutableDruidServer druidServer4; - private ServerHolder holder1; - private ServerHolder holder2; - private ServerHolder holder3; - private ServerHolder holder4; - - private DataSegment segment1; - private DataSegment segment2; - private DataSegment segment3; - private DataSegment segment4; List<DataSegment> segments1; List<DataSegment> segments2; List<DataSegment> segments3; List<DataSegment> segments4; List<DataSegment> segments; + /** + * num segments = 10 x 100 days + */ + private final List<DataSegment> segments = + CreateDataSegments.ofDatasource("wiki") + .forIntervals(100, Granularities.DAY) + .startingAt("2022-01-01") + .withNumPartitions(10) + .eachOfSizeInMb(100); @Before public void setUp() { - druidServer1 = EasyMock.createMock(ImmutableDruidServer.class); - druidServer2 = EasyMock.createMock(ImmutableDruidServer.class); - druidServer3 = EasyMock.createMock(ImmutableDruidServer.class); - druidServer4 = EasyMock.createMock(ImmutableDruidServer.class); - holder1 = EasyMock.createMock(ServerHolder.class); - holder2 = EasyMock.createMock(ServerHolder.class); - holder3 = EasyMock.createMock(ServerHolder.class); - holder4 = EasyMock.createMock(ServerHolder.class); - segment1 = EasyMock.createMock(DataSegment.class); - segment2 = EasyMock.createMock(DataSegment.class); - segment3 = EasyMock.createMock(DataSegment.class); - segment4 = EasyMock.createMock(DataSegment.class); - - DateTime start1 = DateTimes.of("2012-01-01"); - DateTime start2 = 
DateTimes.of("2012-02-01"); - DateTime version = DateTimes.of("2012-03-01"); - segment1 = new DataSegment( - "datasource1", - new Interval(start1, start1.plusHours(1)), - version.toString(), - new HashMap<>(), - new ArrayList<>(), - new ArrayList<>(), - NoneShardSpec.instance(), - 0, - 11L - ); - segment2 = new DataSegment( - "datasource1", - new Interval(start2, start2.plusHours(1)), - version.toString(), - new HashMap<>(), - new ArrayList<>(), - new ArrayList<>(), - NoneShardSpec.instance(), - 0, - 7L - ); - segment3 = new DataSegment( - "datasource2", - new Interval(start1, start1.plusHours(1)), - version.toString(), - new HashMap<>(), - new ArrayList<>(), - new ArrayList<>(), - NoneShardSpec.instance(), - 0, - 4L - ); - segment4 = new DataSegment( - "datasource2", - new Interval(start2, start2.plusHours(1)), - version.toString(), - new HashMap<>(), - new ArrayList<>(), - new ArrayList<>(), - NoneShardSpec.instance(), - 0, - 8L - ); - - segments = Lists.newArrayList(segment1, segment2, segment3, segment4); - - segments1 = Collections.singletonList(segment1); - segments2 = Collections.singletonList(segment2); - segments3 = Collections.singletonList(segment3); - segments4 = Collections.singletonList(segment4); } - //checks if every segment is selected at least once out of 5000 trials + //checks if every segment is selected at least once out of 50 trials @Test - public void getRandomBalancerSegmentHolderTest() + public void testEverySegmentGetsPickedAtleastOnce() { - int iterations = 5000; - - EasyMock.expect(druidServer1.getType()).andReturn(ServerType.HISTORICAL).times(iterations); - ImmutableDruidServerTests.expectSegments(druidServer1, segments1); - EasyMock.replay(druidServer1); - - EasyMock.expect(druidServer2.getType()).andReturn(ServerType.HISTORICAL).times(iterations); - ImmutableDruidServerTests.expectSegments(druidServer2, segments2); - EasyMock.replay(druidServer2); - - EasyMock.expect(druidServer3.getType()).andReturn(ServerType.HISTORICAL).times(iterations); - ImmutableDruidServerTests.expectSegments(druidServer3, segments3); - EasyMock.replay(druidServer3); - - EasyMock.expect(druidServer4.getType()).andReturn(ServerType.HISTORICAL).times(iterations); - ImmutableDruidServerTests.expectSegments(druidServer4, segments4); - EasyMock.replay(druidServer4); - - // Have to use anyTimes() because the number of times a segment on a given server is chosen is indetermistic. - EasyMock.expect(holder1.getServer()).andReturn(druidServer1).anyTimes(); - EasyMock.replay(holder1); - EasyMock.expect(holder2.getServer()).andReturn(druidServer2).anyTimes(); - EasyMock.replay(holder2); - EasyMock.expect(holder3.getServer()).andReturn(druidServer3).anyTimes(); - EasyMock.replay(holder3); - EasyMock.expect(holder4.getServer()).andReturn(druidServer4).anyTimes(); - EasyMock.replay(holder4); - - List<ServerHolder> holderList = new ArrayList<>(); - holderList.add(holder1); - holderList.add(holder2); - holderList.add(holder3); - holderList.add(holder4); + int iterations = 50; + final List<ServerHolder> servers = Arrays.asList( + createHistorical("server1", segments.get(0)), + createHistorical("server2", segments.get(1)), + createHistorical("server3", segments.get(2)), + createHistorical("server4", segments.get(3)) + ); Map<DataSegment, Integer> segmentCountMap = new HashMap<>(); for (int i = 0; i < iterations; i++) { // due to the pseudo-randomness of this method, we may not select a segment every single time no matter what. 
- segmentCountMap.put( - ReservoirSegmentSampler.getRandomBalancerSegmentHolders(holderList, Collections.emptySet(), 1).get(0).getSegment(), - 1 + segmentCountMap.compute( + ReservoirSegmentSampler + .getRandomBalancerSegmentHolders(servers, Collections.emptySet(), 1) + .get(0).getSegment(), + (segment, count) -> count == null ? 1 : count + 1 ); } - for (DataSegment segment : segments) { - Assert.assertEquals(new Integer(1), segmentCountMap.get(segment)); - } - - EasyMock.verify(druidServer1, druidServer2, druidServer3, druidServer4); - EasyMock.verify(holder1, holder2, holder3, holder4); + // Verify that each segment has been chosen at least once + Assert.assertEquals(4, segmentCountMap.size()); } /** @@ -195,56 +87,149 @@ public class ReservoirSegmentSamplerTest @Test public void getRandomBalancerSegmentHolderTestSegmentsToConsiderLimit() { - int iterations = 5000; + int iterations = 50; - EasyMock.expect(druidServer1.getType()).andReturn(ServerType.HISTORICAL).times(iterations); - ImmutableDruidServerTests.expectSegments(druidServer1, segments1); - EasyMock.replay(druidServer1); - - EasyMock.expect(druidServer2.getType()).andReturn(ServerType.HISTORICAL).times(iterations); - ImmutableDruidServerTests.expectSegments(druidServer2, segments2); - EasyMock.replay(druidServer2); - - EasyMock.expect(druidServer3.getType()).andReturn(ServerType.HISTORICAL).times(iterations); - ImmutableDruidServerTests.expectSegments(druidServer3, segments3); - EasyMock.replay(druidServer3); - - ImmutableDruidServerTests.expectSegments(druidServer4, segments4); - EasyMock.replay(druidServer4); - - // Have to use anyTimes() because the number of times a segment on a given server is chosen is indetermistic. - EasyMock.expect(holder1.getServer()).andReturn(druidServer1).anyTimes(); - EasyMock.replay(holder1); - EasyMock.expect(holder2.getServer()).andReturn(druidServer2).anyTimes(); - EasyMock.replay(holder2); - EasyMock.expect(holder3.getServer()).andReturn(druidServer3).anyTimes(); - EasyMock.replay(holder3); - // We only run getServer() each time we calculate the limit on segments to consider. Always 5k - EasyMock.expect(holder4.getServer()).andReturn(druidServer4).times(5000); - EasyMock.replay(holder4); - - List<ServerHolder> holderList = new ArrayList<>(); - holderList.add(holder1); - holderList.add(holder2); - holderList.add(holder3); - holderList.add(holder4); + final DataSegment excludedSegment = segments.get(3); + final List<ServerHolder> servers = Arrays.asList( + createHistorical("server1", segments.get(0)), + createHistorical("server2", segments.get(1)), + createHistorical("server3", segments.get(2)), + createHistorical("server4", excludedSegment) + ); Map<DataSegment, Integer> segmentCountMap = new HashMap<>(); + + final double percentOfSegmentsToConsider = 75.0; for (int i = 0; i < iterations; i++) { - segmentCountMap.put( - ReservoirSegmentSampler.getRandomBalancerSegmentHolder(holderList, Collections.emptySet(), 75).getSegment(), 1 + segmentCountMap.compute( + ReservoirSegmentSampler + .getRandomBalancerSegmentHolder(servers, Collections.emptySet(), percentOfSegmentsToConsider) + .getSegment(), + (segment, count) -> count == null ? 
1 : count + 1 ); } - for (DataSegment segment : segments) { - if (!segment.equals(segment4)) { - Assert.assertEquals(new Integer(1), segmentCountMap.get(segment)); - } else { - Assert.assertNull(segmentCountMap.get(segment)); + // Verify that the segment on server4 is never chosen because of limit + Assert.assertFalse(segmentCountMap.containsKey(excludedSegment)); + Assert.assertEquals(3, segmentCountMap.size()); + } + + @Test + public void testSegmentsOnBrokersAreIgnored() + { + final ServerHolder historical = createHistorical("hist1", segments.get(0), segments.get(1)); + + final ServerHolder broker = new ServerHolder( + new DruidServer("broker1", "broker1", null, 1000, ServerType.BROKER, null, 1) + .addDataSegment(segments.get(2)) + .addDataSegment(segments.get(3)) + .toImmutableDruidServer(), + new LoadQueuePeonTester() + ); + + // Try to pick all the segments on the servers + List<BalancerSegmentHolder> pickedSegments = ReservoirSegmentSampler.getRandomBalancerSegmentHolders( + Arrays.asList(historical, broker), + Collections.emptySet(), + 10 + ); + + // Verify that only the segments on the historical are picked + Assert.assertEquals(2, pickedSegments.size()); + for (BalancerSegmentHolder holder : pickedSegments) { + Assert.assertEquals(historical.getServer(), holder.getFromServer()); + } + } + + @Test + public void testBroadcastSegmentsAreIgnored() + { + // num segments = 1 x 4 days + final String broadcastDatasource = "ds_broadcast"; + final List<DataSegment> broadcastSegments + = CreateDataSegments.ofDatasource(broadcastDatasource) + .forIntervals(4, Granularities.DAY) + .startingAt("2022-01-01") + .withNumPartitions(1) + .eachOfSizeInMb(100); + + final List<ServerHolder> servers = Arrays.asList( + createHistorical("server1", broadcastSegments.toArray(new DataSegment[0])), + createHistorical("server2", segments.get(0), segments.get(1)) + ); + + // Try to pick all the segments on the servers + List<BalancerSegmentHolder> pickedSegments = ReservoirSegmentSampler + .getRandomBalancerSegmentHolders(servers, Collections.singleton(broadcastDatasource), 10); + + // Verify that none of the broadcast segments are picked + Assert.assertEquals(2, pickedSegments.size()); + for (BalancerSegmentHolder holder : pickedSegments) { + Assert.assertNotEquals(broadcastDatasource, holder.getSegment().getDataSource()); + } + } + + @Test(timeout = 60_000) + public void testNumberOfIterationsToCycleThroughAllSegments() + { + // The number of runs required for each sample percentage + // remains more or less fixed, even with a larger number of segments + final int[] samplePercentages = {100, 50, 10, 5, 1}; + final int[] expectedIterations = {1, 20, 100, 200, 1000}; + + final int[] totalObservedIterations = new int[5]; + for (int i = 0; i < 50; ++i) { + for (int j = 0; j < samplePercentages.length; ++j) { + totalObservedIterations[j] += countMinRunsWithSamplePercent(samplePercentages[j]); } } - EasyMock.verify(druidServer1, druidServer2, druidServer3, druidServer4); - EasyMock.verify(holder1, holder2, holder3, holder4); + for (int j = 0; j < samplePercentages.length; ++j) { + double avgObservedIterations = totalObservedIterations[j] / 50.0; + Assert.assertTrue(avgObservedIterations <= expectedIterations[j]); + } + + } + + /** + * Returns the minimum number of iterations of the reservoir sampling required + * to pick each segment at least once. + *

+ * {@code k = sampleSize = totalNumSegments * samplePercentage} + */ + private int countMinRunsWithSamplePercent(int samplePercentage) + { + final int numSegments = segments.size(); + final List<ServerHolder> servers = Arrays.asList( + createHistorical("server1", segments.subList(0, numSegments / 2).toArray(new DataSegment[0])), + createHistorical("server2", segments.subList(numSegments / 2, numSegments).toArray(new DataSegment[0])) + ); + + final Set<DataSegment> pickedSegments = new HashSet<>(); + + int sampleSize = (int) (numSegments * samplePercentage / 100.0); + + int numIterations = 1; + for (; numIterations < 10000; ++numIterations) { + ReservoirSegmentSampler + .getRandomBalancerSegmentHolders(servers, Collections.emptySet(), sampleSize) + .forEach(holder -> pickedSegments.add(holder.getSegment())); + + if (pickedSegments.size() >= numSegments) { + break; + } + } + + return numIterations; + } + + private ServerHolder createHistorical(String serverName, DataSegment... loadedSegments) + { + final DruidServer server = + new DruidServer(serverName, serverName, null, 100000, ServerType.HISTORICAL, "normal", 1); + for (DataSegment segment : loadedSegments) { + server.addDataSegment(segment); + } + return new ServerHolder(server.toImmutableDruidServer(), new LoadQueuePeonTester()); } } diff --git a/server/src/test/java/org/apache/druid/server/coordinator/TestDruidCoordinatorConfig.java b/server/src/test/java/org/apache/druid/server/coordinator/TestDruidCoordinatorConfig.java index 8153dd79428..271bda11314 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/TestDruidCoordinatorConfig.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/TestDruidCoordinatorConfig.java @@ -39,7 +39,6 @@ public class TestDruidCoordinatorConfig extends DruidCoordinatorConfig private final Duration coordinatorRuleKillDurationToRetain; private final Duration coordinatorDatasourceKillPeriod; private final Duration coordinatorDatasourceKillDurationToRetain; - private final Duration loadQueuePeonRepeatDelay; private final int coordinatorKillMaxSegments; private final boolean compactionSkipLockedIntervals; private final boolean coordinatorKillIgnoreDurationToRetain; @@ -67,7 +66,6 @@ public class TestDruidCoordinatorConfig extends DruidCoordinatorConfig Duration coordinatorDatasourceKillPeriod, Duration coordinatorDatasourceKillDurationToRetain, int coordinatorKillMaxSegments, - Duration loadQueuePeonRepeatDelay, boolean compactionSkipLockedIntervals, boolean coordinatorKillIgnoreDurationToRetain, String loadQueuePeonType, @@ -94,7 +92,6 @@ public class TestDruidCoordinatorConfig extends DruidCoordinatorConfig this.coordinatorDatasourceKillPeriod = coordinatorDatasourceKillPeriod; this.coordinatorDatasourceKillDurationToRetain = coordinatorDatasourceKillDurationToRetain; this.coordinatorKillMaxSegments = coordinatorKillMaxSegments; - this.loadQueuePeonRepeatDelay = loadQueuePeonRepeatDelay; this.compactionSkipLockedIntervals = compactionSkipLockedIntervals; this.coordinatorKillIgnoreDurationToRetain = coordinatorKillIgnoreDurationToRetain; this.loadQueuePeonType = loadQueuePeonType; @@ -206,12 +203,6 @@ public class TestDruidCoordinatorConfig extends DruidCoordinatorConfig return loadTimeoutDelay == null ? 
super.getLoadTimeoutDelay() : loadTimeoutDelay; } - @Override - public Duration getLoadQueuePeonRepeatDelay() - { - return loadQueuePeonRepeatDelay; - } - @Override public boolean getCompactionSkipLockedIntervals() { @@ -272,7 +263,6 @@ public class TestDruidCoordinatorConfig extends DruidCoordinatorConfig private static final Duration DEFAULT_COORDINATOR_DATASOURCE_KILL_PERIOD = new Duration("PT86400s"); private static final Duration DEFAULT_COORDINATOR_DATASOURCE_KILL_DURATION_TO_RETAIN = new Duration("PT7776000s"); private static final Duration DEFAULT_LOAD_TIMEOUT_DELAY = new Duration(15 * 60 * 1000); - private static final Duration DEFAULT_LOAD_QUEUE_PEON_REPEAT_DELAY = Duration.millis(50); private static final String DEFAULT_LOAD_QUEUE_PEON_TYPE = "curator"; private static final int DEFAULT_CURATOR_LOAD_QUEUE_PEON_NUM_CALLBACK_THREADS = 2; private static final Duration DEFAULT_HTTP_LOAD_QUEUE_PEON_REPEAT_DELAY = Duration.millis(60000); @@ -299,7 +289,6 @@ public class TestDruidCoordinatorConfig extends DruidCoordinatorConfig private Duration coordinatorDatasourceKillPeriod; private Duration coordinatorDatasourceKillDurationToRetain; private Duration loadTimeoutDelay; - private Duration loadQueuePeonRepeatDelay; private String loadQueuePeonType; private Duration httpLoadQueuePeonRepeatDelay; private Integer curatorLoadQueuePeonNumCallbackThreads; @@ -409,12 +398,6 @@ public class TestDruidCoordinatorConfig extends DruidCoordinatorConfig return this; } - public Builder withLoadQueuePeonRepeatDelay(Duration loadQueuePeonRepeatDelay) - { - this.loadQueuePeonRepeatDelay = loadQueuePeonRepeatDelay; - return this; - } - public Builder withLoadQueuePeonType(String loadQueuePeonType) { this.loadQueuePeonType = loadQueuePeonType; @@ -483,7 +466,6 @@ public class TestDruidCoordinatorConfig extends DruidCoordinatorConfig coordinatorDatasourceKillPeriod == null ? DEFAULT_COORDINATOR_DATASOURCE_KILL_PERIOD : coordinatorDatasourceKillPeriod, coordinatorDatasourceKillDurationToRetain == null ? DEFAULT_COORDINATOR_DATASOURCE_KILL_DURATION_TO_RETAIN : coordinatorDatasourceKillDurationToRetain, coordinatorKillMaxSegments == null ? DEFAULT_COORDINATOR_KILL_MAX_SEGMENTS : coordinatorKillMaxSegments, - loadQueuePeonRepeatDelay == null ? DEFAULT_LOAD_QUEUE_PEON_REPEAT_DELAY : loadQueuePeonRepeatDelay, compactionSkippedLockedIntervals == null ? DEFAULT_COMPACTION_SKIP_LOCKED_INTERVALS : compactionSkippedLockedIntervals, coordinatorKillIgnoreDurationToRetain == null ? DEFAULT_COORDINATOR_KILL_IGNORE_DURATION_TO_RETAIN : coordinatorKillIgnoreDurationToRetain, loadQueuePeonType == null ? 
DEFAULT_LOAD_QUEUE_PEON_TYPE : loadQueuePeonType, diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java index 48d730cd7c7..ed7fea5aaf3 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java @@ -20,281 +20,221 @@ package org.apache.druid.server.coordinator.duty; import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableSet; import org.apache.druid.client.indexing.IndexingServiceClient; import org.apache.druid.java.util.common.DateTimes; -import org.apache.druid.java.util.common.Intervals; import org.apache.druid.metadata.SegmentsMetadataManager; import org.apache.druid.server.coordinator.CoordinatorDynamicConfig; import org.apache.druid.server.coordinator.DruidCoordinatorConfig; import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams; -import org.apache.druid.server.coordinator.TestDruidCoordinatorConfig; -import org.easymock.EasyMock; +import org.apache.druid.timeline.DataSegment; +import org.apache.druid.timeline.partition.NoneShardSpec; import org.joda.time.DateTime; import org.joda.time.Duration; import org.joda.time.Interval; -import org.junit.Assert; +import org.joda.time.Period; import org.junit.Before; import org.junit.Test; -import org.junit.experimental.runners.Enclosed; import org.junit.runner.RunWith; import org.mockito.Answers; +import org.mockito.ArgumentMatchers; import org.mockito.Mock; import org.mockito.Mockito; import org.mockito.junit.MockitoJUnitRunner; +import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.List; -import java.util.Set; +import java.util.stream.Collectors; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyString; /** + * */ -@RunWith(Enclosed.class) +@RunWith(MockitoJUnitRunner.class) public class KillUnusedSegmentsTest { - /** - * Standing up new tests with mocks was easier than trying to move the existing tests to use mocks for consistency. - * In the future, if all tests are moved to use the same structure, this inner static class can be gotten rid of. 
- */ - @RunWith(MockitoJUnitRunner.class) - public static class MockedTest + private static final int MAX_SEGMENTS_TO_KILL = 10; + private static final Duration COORDINATOR_KILL_PERIOD = Duration.standardMinutes(2); + private static final Duration DURATION_TO_RETAIN = Duration.standardDays(1); + private static final Duration INDEXING_PERIOD = Duration.standardMinutes(1); + + @Mock + private SegmentsMetadataManager segmentsMetadataManager; + @Mock + private IndexingServiceClient indexingServiceClient; + @Mock(answer = Answers.RETURNS_DEEP_STUBS) + private DruidCoordinatorConfig config; + + @Mock + private DruidCoordinatorRuntimeParams params; + @Mock + private CoordinatorDynamicConfig coordinatorDynamicConfig; + + private DataSegment yearOldSegment; + private DataSegment monthOldSegment; + private DataSegment dayOldSegment; + private DataSegment hourOldSegment; + private DataSegment nextDaySegment; + private DataSegment nextMonthSegment; + + private KillUnusedSegments target; + + @Before + public void setup() { - private static final Set<String> ALL_DATASOURCES = ImmutableSet.of("DS1", "DS2", "DS3"); - private static final int MAX_SEGMENTS_TO_KILL = 10; - private static final Duration COORDINATOR_KILL_PERIOD = Duration.standardMinutes(2); - private static final Duration DURATION_TO_RETAIN = Duration.standardDays(1); - private static final Duration INDEXING_PERIOD = Duration.standardMinutes(1); + Mockito.doReturn(coordinatorDynamicConfig).when(params).getCoordinatorDynamicConfig(); + Mockito.doReturn(COORDINATOR_KILL_PERIOD).when(config).getCoordinatorKillPeriod(); + Mockito.doReturn(DURATION_TO_RETAIN).when(config).getCoordinatorKillDurationToRetain(); + Mockito.doReturn(INDEXING_PERIOD).when(config).getCoordinatorIndexingPeriod(); + Mockito.doReturn(MAX_SEGMENTS_TO_KILL).when(config).getCoordinatorKillMaxSegments(); - @Mock - private SegmentsMetadataManager segmentsMetadataManager; - @Mock - private IndexingServiceClient indexingServiceClient; - @Mock(answer = Answers.RETURNS_DEEP_STUBS) - private DruidCoordinatorConfig config; + Mockito.doReturn(Collections.singleton("DS1")) + .when(coordinatorDynamicConfig).getSpecificDataSourcesToKillUnusedSegmentsIn(); - @Mock - private DruidCoordinatorRuntimeParams params; - @Mock - private CoordinatorDynamicConfig coordinatorDynamicConfig; - private KillUnusedSegments target; + final DateTime now = DateTimes.nowUtc(); - @Before - public void setup() - { - Mockito.doReturn(coordinatorDynamicConfig).when(params).getCoordinatorDynamicConfig(); - Mockito.doReturn(ALL_DATASOURCES).when(segmentsMetadataManager).retrieveAllDataSourceNames(); - Mockito.doReturn(COORDINATOR_KILL_PERIOD).when(config).getCoordinatorKillPeriod(); - Mockito.doReturn(DURATION_TO_RETAIN).when(config).getCoordinatorKillDurationToRetain(); - Mockito.doReturn(INDEXING_PERIOD).when(config).getCoordinatorIndexingPeriod(); - Mockito.doReturn(MAX_SEGMENTS_TO_KILL).when(config).getCoordinatorKillMaxSegments(); - target = new KillUnusedSegments(segmentsMetadataManager, indexingServiceClient, config); - } - @Test - public void testRunWihNoIntervalShouldNotKillAnySegments() - { - target.run(params); - Mockito.verify(indexingServiceClient, Mockito.never()) - .killUnusedSegments(anyString(), anyString(), any(Interval.class)); - } + yearOldSegment = createSegmentWithEnd(now.minusDays(365)); + monthOldSegment = createSegmentWithEnd(now.minusDays(30)); + dayOldSegment = createSegmentWithEnd(now.minusDays(1)); + hourOldSegment = createSegmentWithEnd(now.minusHours(1)); + nextDaySegment = 
createSegmentWithEnd(now.plusDays(1)); + nextMonthSegment = createSegmentWithEnd(now.plusDays(30)); - @Test - public void testRunWihSpecificDatasourceAndNoIntervalShouldNotKillAnySegments() - { - Mockito.when(coordinatorDynamicConfig.getSpecificDataSourcesToKillUnusedSegmentsIn()).thenReturn(Collections.singleton("DS1")); - target.run(params); - Mockito.verify(indexingServiceClient, Mockito.never()) - .killUnusedSegments(anyString(), anyString(), any(Interval.class)); - } + final List<DataSegment> unusedSegments = ImmutableList.of( + yearOldSegment, + monthOldSegment, + dayOldSegment, + hourOldSegment, + nextDaySegment, + nextMonthSegment + ); + + Mockito.when( + segmentsMetadataManager.getUnusedSegmentIntervals( + ArgumentMatchers.anyString(), + ArgumentMatchers.any(), + ArgumentMatchers.anyInt() + ) + ).thenAnswer(invocation -> { + DateTime maxEndTime = invocation.getArgument(1); + List<Interval> unusedIntervals = + unusedSegments.stream() + .map(DataSegment::getInterval) + .filter(i -> i.getEnd().isBefore(maxEndTime)) + .collect(Collectors.toList()); + + int limit = invocation.getArgument(2); + return unusedIntervals.size() <= limit ? unusedIntervals : unusedIntervals.subList(0, limit); + }); + + target = new KillUnusedSegments(segmentsMetadataManager, indexingServiceClient, config); } - public static class FindIntervalsTest + @Test + public void testRunWithNoIntervalShouldNotKillAnySegments() { - @Test - public void testFindIntervalForKill() - { - testFindIntervalForKill(null, null); - testFindIntervalForKill(ImmutableList.of(), null); + Mockito.doReturn(null).when(segmentsMetadataManager).getUnusedSegmentIntervals( + ArgumentMatchers.anyString(), + ArgumentMatchers.any(), + ArgumentMatchers.anyInt() + ); - testFindIntervalForKill(ImmutableList.of(Intervals.of("2014/2015")), Intervals.of("2014/2015")); + target.run(params); + Mockito.verify(indexingServiceClient, Mockito.never()) + .killUnusedSegments(anyString(), anyString(), any(Interval.class)); + } - testFindIntervalForKill( - ImmutableList.of(Intervals.of("2014/2015"), Intervals.of("2016/2017")), - Intervals.of("2014/2017") - ); + @Test + public void testRunWithSpecificDatasourceAndNoIntervalShouldNotKillAnySegments() + { + Mockito.doReturn(Duration.standardDays(400)) + .when(config).getCoordinatorKillDurationToRetain(); + target = new KillUnusedSegments(segmentsMetadataManager, indexingServiceClient, config); - testFindIntervalForKill( - ImmutableList.of(Intervals.of("2014/2015"), Intervals.of("2015/2016")), - Intervals.of("2014/2016") - ); + // No unused segment is older than the retention period + target.run(params); + Mockito.verify(indexingServiceClient, Mockito.never()) + .killUnusedSegments(anyString(), anyString(), any(Interval.class)); + } - testFindIntervalForKill( - ImmutableList.of(Intervals.of("2015/2016"), Intervals.of("2014/2015")), - Intervals.of("2014/2016") - ); + @Test + public void testDurationToRetain() + { + // Only segments more than a day old are killed + Interval expectedKillInterval = new Interval( + yearOldSegment.getInterval().getStart(), + dayOldSegment.getInterval().getEnd() + ); + runAndVerifyKillInterval(expectedKillInterval); + } - testFindIntervalForKill( - ImmutableList.of(Intervals.of("2015/2017"), Intervals.of("2014/2016")), - Intervals.of("2014/2017") - ); + @Test + public void testNegativeDurationToRetain() + { + // Duration to retain = -1 day, reinit target for config to take effect + Mockito.doReturn(DURATION_TO_RETAIN.negated()) + .when(config).getCoordinatorKillDurationToRetain(); + target = new 
KillUnusedSegments(segmentsMetadataManager, indexingServiceClient, config); - testFindIntervalForKill( - ImmutableList.of( - Intervals.of("2015/2019"), - Intervals.of("2014/2016"), - Intervals.of("2018/2020") - ), - Intervals.of("2014/2020") - ); + // Segments up to 1 day in the future are killed + Interval expectedKillInterval = new Interval( + yearOldSegment.getInterval().getStart(), + nextDaySegment.getInterval().getEnd() + ); + runAndVerifyKillInterval(expectedKillInterval); + } - testFindIntervalForKill( - ImmutableList.of( - Intervals.of("2015/2019"), - Intervals.of("2014/2016"), - Intervals.of("2018/2020"), - Intervals.of("2021/2022") - ), - Intervals.of("2014/2022") - ); - } + @Test + public void testIgnoreDurationToRetain() + { + Mockito.doReturn(true) + .when(config).getCoordinatorKillIgnoreDurationToRetain(); + target = new KillUnusedSegments(segmentsMetadataManager, indexingServiceClient, config); - private void testFindIntervalForKill(List<Interval> segmentIntervals, Interval expected) - { - SegmentsMetadataManager segmentsMetadataManager = EasyMock.createMock(SegmentsMetadataManager.class); - EasyMock.expect( - segmentsMetadataManager.getUnusedSegmentIntervals( - EasyMock.anyString(), - EasyMock.anyObject(DateTime.class), - EasyMock.anyInt() - ) - ).andReturn(segmentIntervals); - EasyMock.replay(segmentsMetadataManager); - IndexingServiceClient indexingServiceClient = EasyMock.createMock(IndexingServiceClient.class); + // All future and past unused segments are killed + Interval expectedKillInterval = new Interval( + yearOldSegment.getInterval().getStart(), + nextMonthSegment.getInterval().getEnd() + ); + runAndVerifyKillInterval(expectedKillInterval); + } - KillUnusedSegments unusedSegmentsKiller = new KillUnusedSegments( - segmentsMetadataManager, - indexingServiceClient, - new TestDruidCoordinatorConfig.Builder() - .withCoordinatorIndexingPeriod(Duration.parse("PT76400S")) - .withLoadTimeoutDelay(new Duration(1)) - .withCoordinatorKillPeriod(Duration.parse("PT86400S")) - .withCoordinatorKillDurationToRetain(Duration.parse("PT86400S")) - .withCoordinatorKillMaxSegments(1000) - .withLoadQueuePeonRepeatDelay(Duration.ZERO) - .withCoordinatorKillIgnoreDurationToRetain(false) - .build() - ); + @Test + public void testMaxSegmentsToKill() + { + Mockito.doReturn(1) + .when(config).getCoordinatorKillMaxSegments(); + target = new KillUnusedSegments(segmentsMetadataManager, indexingServiceClient, config); - Assert.assertEquals( - expected, - unusedSegmentsKiller.findIntervalForKill("test", 10000) - ); - } + // Only 1 unused segment is killed + runAndVerifyKillInterval(yearOldSegment.getInterval()); + } - /** - * Test that retainDuration is properly set based on the value available in the - * Coordinator config. Positive and Negative durations should work as well as - * null, if and only if ignoreDurationToRetain is true. 
- */ - @Test - public void testRetainDurationValues() - { - // Positive duration to retain - KillUnusedSegments unusedSegmentsKiller = new KillUnusedSegments( - null, - null, - new TestDruidCoordinatorConfig.Builder() - .withCoordinatorIndexingPeriod(Duration.parse("PT76400S")) - .withLoadTimeoutDelay(new Duration(1)) - .withCoordinatorKillPeriod(Duration.parse("PT86400S")) - .withCoordinatorKillDurationToRetain(Duration.parse("PT86400S")) - .withCoordinatorKillMaxSegments(1000) - .withLoadQueuePeonRepeatDelay(Duration.ZERO) - .withCoordinatorKillIgnoreDurationToRetain(false) - .build() - ); - Assert.assertEquals((Long) Duration.parse("PT86400S").getMillis(), unusedSegmentsKiller.getRetainDuration()); + private void runAndVerifyKillInterval(Interval expectedKillInterval) + { + target.run(params); + Mockito.verify(indexingServiceClient, Mockito.times(1)).killUnusedSegments( + ArgumentMatchers.anyString(), + ArgumentMatchers.eq("DS1"), + ArgumentMatchers.eq(expectedKillInterval) + ); + } - // Negative duration to retain - unusedSegmentsKiller = new KillUnusedSegments( - null, - null, - new TestDruidCoordinatorConfig.Builder() - .withCoordinatorIndexingPeriod(Duration.parse("PT76400S")) - .withLoadTimeoutDelay(new Duration(1)) - .withCoordinatorKillPeriod(Duration.parse("PT86400S")) - .withCoordinatorKillDurationToRetain(Duration.parse("PT-86400S")) - .withCoordinatorKillMaxSegments(1000) - .withLoadQueuePeonRepeatDelay(Duration.ZERO) - .withCoordinatorKillIgnoreDurationToRetain(false) - .build() - ); - Assert.assertEquals((Long) Duration.parse("PT-86400S").getMillis(), unusedSegmentsKiller.getRetainDuration()); - } - - /** - * Test that the end time upper limit is properly computated for both positive and - * negative durations. Also ensure that if durationToRetain is to be ignored, that - * the upper limit is {@link DateTime} max time. 
- */ - @Test - public void testGetEndTimeUpperLimit() - { - // If ignoreDurationToRetain is true, ignore the value configured for durationToRetain and return 9999-12-31T23:59 - KillUnusedSegments unusedSegmentsKiller = new KillUnusedSegments( - null, - null, - new TestDruidCoordinatorConfig.Builder() - .withCoordinatorIndexingPeriod(Duration.parse("PT76400S")) - .withLoadTimeoutDelay(new Duration(1)) - .withCoordinatorKillPeriod(Duration.parse("PT86400S")) - .withCoordinatorKillDurationToRetain(Duration.parse("PT86400S")) - .withCoordinatorKillMaxSegments(1000) - .withLoadQueuePeonRepeatDelay(Duration.ZERO) - .withCoordinatorKillIgnoreDurationToRetain(true) - .build() - ); - Assert.assertEquals( - DateTimes.COMPARE_DATE_AS_STRING_MAX, - unusedSegmentsKiller.getEndTimeUpperLimit() - ); - - // Testing a negative durationToRetain period returns proper date in future - unusedSegmentsKiller = new KillUnusedSegments( - null, - null, - new TestDruidCoordinatorConfig.Builder() - .withCoordinatorIndexingPeriod(Duration.parse("PT76400S")) - .withLoadTimeoutDelay(new Duration(1)) - .withCoordinatorKillPeriod(Duration.parse("PT86400S")) - .withCoordinatorKillDurationToRetain(Duration.parse("PT-86400S")) - .withCoordinatorKillMaxSegments(1000) - .withLoadQueuePeonRepeatDelay(Duration.ZERO) - .withCoordinatorKillIgnoreDurationToRetain(false) - .build() - ); - - DateTime expectedTime = DateTimes.nowUtc().minus(Duration.parse("PT-86400S").getMillis()); - Assert.assertEquals(expectedTime, unusedSegmentsKiller.getEndTimeUpperLimit()); - - // Testing a positive durationToRetain period returns expected value in the past - unusedSegmentsKiller = new KillUnusedSegments( - null, - null, - new TestDruidCoordinatorConfig.Builder() - .withCoordinatorIndexingPeriod(Duration.parse("PT76400S")) - .withLoadTimeoutDelay(new Duration(1)) - .withCoordinatorKillPeriod(Duration.parse("PT86400S")) - .withCoordinatorKillDurationToRetain(Duration.parse("PT86400S")) - .withCoordinatorKillMaxSegments(1000) - .withLoadQueuePeonRepeatDelay(Duration.ZERO) - .withCoordinatorKillIgnoreDurationToRetain(false) - .build() - ); - expectedTime = DateTimes.nowUtc().minus(Duration.parse("PT86400S").getMillis()); - Assert.assertEquals(expectedTime, unusedSegmentsKiller.getEndTimeUpperLimit()); - } + private DataSegment createSegmentWithEnd(DateTime endTime) + { + return new DataSegment( + "DS1", + new Interval(Period.days(1), endTime), + DateTimes.nowUtc().toString(), + new HashMap<>(), + new ArrayList<>(), + new ArrayList<>(), + NoneShardSpec.instance(), + 1, + 0 + ); } } diff --git a/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBuilder.java b/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBuilder.java index 28b84f62e9d..01308d82e73 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBuilder.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBuilder.java @@ -453,7 +453,6 @@ public class CoordinatorSimulationBuilder .withCoordinatorStartDelay(new Duration(1L)) .withCoordinatorPeriod(new Duration(DEFAULT_COORDINATOR_PERIOD)) .withCoordinatorKillPeriod(new Duration(DEFAULT_COORDINATOR_PERIOD)) - .withLoadQueuePeonRepeatDelay(new Duration("PT0S")) .withLoadQueuePeonType("http") .withCoordinatorKillIgnoreDurationToRetain(false) .build(); diff --git a/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java 
b/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java index 72489036198..707ea1d0f90 100644 --- a/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java +++ b/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java @@ -78,6 +78,7 @@ public class CoordinatorDynamicConfigTest 1, 1, 1, + true, 1, 1, 2, @@ -100,6 +101,7 @@ public class CoordinatorDynamicConfigTest 1, 1, 1, + true, 1, 1, 2, @@ -122,6 +124,7 @@ public class CoordinatorDynamicConfigTest 1, 1, 1, + true, 1, 1, 2, @@ -144,6 +147,7 @@ public class CoordinatorDynamicConfigTest 1, 1, 1, + true, 1, 1, 2, @@ -158,7 +162,10 @@ public class CoordinatorDynamicConfigTest Integer.MAX_VALUE ); - actual = CoordinatorDynamicConfig.builder().withPercentOfSegmentsToConsiderPerMove(10).build(actual); + actual = CoordinatorDynamicConfig.builder() + .withPercentOfSegmentsToConsiderPerMove(10) + .withUseBatchedSegmentSampler(false) + .build(actual); assertConfig( actual, 1, @@ -166,6 +173,7 @@ public class CoordinatorDynamicConfigTest 1, 1, 10, + false, 1, 1, 2, @@ -188,6 +196,7 @@ public class CoordinatorDynamicConfigTest 1, 1, 10, + false, 1, 1, 2, @@ -210,6 +219,7 @@ public class CoordinatorDynamicConfigTest 1, 1, 10, + false, 1, 1, 2, @@ -315,7 +325,7 @@ public class CoordinatorDynamicConfigTest 1, 1, 100, - 1, + true, 1, 1, 2, true, @@ -337,6 +347,7 @@ public class CoordinatorDynamicConfigTest 1, 1, 100, + true, 1, 1, 2, @@ -359,6 +370,7 @@ public class CoordinatorDynamicConfigTest 1, 1, 100, + true, 1, 1, 2, @@ -407,6 +419,7 @@ public class CoordinatorDynamicConfigTest 1, 1, 1, + true, 1, 1, 2, @@ -525,6 +538,7 @@ public class CoordinatorDynamicConfigTest 1, 1, 100, + true, 1, 1, 2, @@ -574,6 +588,7 @@ public class CoordinatorDynamicConfigTest 1, 1, 1, + true, 1, 1, 2, @@ -638,6 +653,7 @@ public class CoordinatorDynamicConfigTest 1, 1, 1, + true, 1, 1, 2, @@ -665,6 +681,7 @@ public class CoordinatorDynamicConfigTest 100, 5, 100, + true, 15, 10, 1, @@ -695,6 +712,7 @@ public class CoordinatorDynamicConfigTest 100, 5, 100, + true, 15, 10, 1, @@ -785,6 +803,7 @@ public class CoordinatorDynamicConfigTest int expectedMergeSegmentsLimit, int expectedMaxSegmentsToMove, int expectedPercentOfSegmentsToConsiderPerMove, + boolean expectedUseBatchedSegmentSampler, int expectedReplicantLifetime, int expectedReplicationThrottleLimit, int expectedBalancerComputeThreads, @@ -807,6 +826,7 @@ public class CoordinatorDynamicConfigTest Assert.assertEquals(expectedMergeSegmentsLimit, config.getMergeSegmentsLimit()); Assert.assertEquals(expectedMaxSegmentsToMove, config.getMaxSegmentsToMove()); Assert.assertEquals(expectedPercentOfSegmentsToConsiderPerMove, config.getPercentOfSegmentsToConsiderPerMove(), 0); + Assert.assertEquals(expectedUseBatchedSegmentSampler, config.useBatchedSegmentSampler()); Assert.assertEquals(expectedReplicantLifetime, config.getReplicantLifetime()); Assert.assertEquals(expectedReplicationThrottleLimit, config.getReplicationThrottleLimit()); Assert.assertEquals(expectedBalancerComputeThreads, config.getBalancerComputeThreads()); diff --git a/web-console/src/druid-models/coordinator-dynamic-config/coordinator-dynamic-config.tsx b/web-console/src/druid-models/coordinator-dynamic-config/coordinator-dynamic-config.tsx index 1d148e6b187..eeb25db09c4 100644 --- a/web-console/src/druid-models/coordinator-dynamic-config/coordinator-dynamic-config.tsx +++ b/web-console/src/druid-models/coordinator-dynamic-config/coordinator-dynamic-config.tsx @@ -197,7 +197,7 @@ 
export const COORDINATOR_DYNAMIC_CONFIG_FIELDS: Field<CoordinatorDynamicConfig>[ { name: 'useBatchedSegmentSampler', type: 'boolean', - defaultValue: false, + defaultValue: true, info: ( <> Boolean flag for whether or not we should use the Reservoir Sampling with a reservoir of
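
Reviewer note on the sampling change (illustrative addition, not part of the patch): the new default path, pickSegmentsToMove(List, Set, int), is described in the @deprecated javadoc above as a batched sampler that picks up to maxSegmentsToMove segments in one pass and "does not allow duplicates", unlike the deprecated getRandomBalancerSegmentHolder path, which samples one segment per call and can therefore pick the same segment again across calls. A minimal, self-contained sketch of the underlying k-element reservoir sampling technique (Algorithm R) follows; the class and method names are hypothetical and do not appear in Druid.

import java.util.ArrayList;
import java.util.List;
import java.util.Random;

public class ReservoirSamplingSketch
{
  // Picks k distinct items from the stream in a single pass.
  // Each item ends up in the sample with probability k / n.
  public static <T> List<T> sampleK(Iterable<T> items, int k, Random random)
  {
    final List<T> reservoir = new ArrayList<>(k);
    int seen = 0;
    for (T item : items) {
      ++seen;
      if (reservoir.size() < k) {
        // Fill the reservoir with the first k items
        reservoir.add(item);
      } else {
        // Replace a random slot with probability k / seen
        final int slot = random.nextInt(seen);
        if (slot < k) {
          reservoir.set(slot, item);
        }
      }
    }
    return reservoir;
  }
}

Because every candidate is considered exactly once per pass, a single call can never return the same segment twice, matching the no-duplicates guarantee cited in the deprecation note; it also explains the test changes above from EasyMock.anyDouble() (percent of segments to consider per move) to EasyMock.anyInt() (number of segments to sample).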