diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorRuntimeParams.java b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorRuntimeParams.java index 07a7bf1b2ef..5548636b000 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorRuntimeParams.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorRuntimeParams.java @@ -19,7 +19,6 @@ package org.apache.druid.server.coordinator; -import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableMap; import org.apache.druid.client.DataSourcesSnapshot; @@ -256,7 +255,6 @@ public class DruidCoordinatorRuntimeParams public DruidCoordinatorRuntimeParams build() { initStatsIfRequired(); - initSegmentLoadingConfigIfRequired(); initSegmentAssignerIfRequired(); return new DruidCoordinatorRuntimeParams( @@ -282,15 +280,10 @@ public class DruidCoordinatorRuntimeParams stats = stats == null ? new CoordinatorRunStats(debugDimensions) : stats; } - private void initSegmentLoadingConfigIfRequired() - { - if (segmentLoadingConfig == null - && coordinatorDynamicConfig != null - && usedSegments != null) { - segmentLoadingConfig = SegmentLoadingConfig.create(coordinatorDynamicConfig, usedSegments.size()); - } - } - + /** + * Initializes {@link StrategicSegmentAssigner} used by historical management + * duties for segment load/drop/move. + */ private void initSegmentAssignerIfRequired() { if (segmentAssigner != null || loadQueueManager == null) { @@ -299,8 +292,13 @@ public class DruidCoordinatorRuntimeParams Preconditions.checkNotNull(druidCluster); Preconditions.checkNotNull(balancerStrategy); - Preconditions.checkNotNull(segmentLoadingConfig); + Preconditions.checkNotNull(usedSegments); Preconditions.checkNotNull(stats); + + if (segmentLoadingConfig == null) { + segmentLoadingConfig = SegmentLoadingConfig.create(coordinatorDynamicConfig, usedSegments.size()); + } + segmentAssigner = new StrategicSegmentAssigner( loadQueueManager, druidCluster, @@ -339,16 +337,12 @@ public class DruidCoordinatorRuntimeParams return this; } - /** This method must be used in test code only. */ - @VisibleForTesting - public Builder withUsedSegmentsInTest(DataSegment... usedSegments) + public Builder withUsedSegments(DataSegment... usedSegments) { - return withUsedSegmentsInTest(Arrays.asList(usedSegments)); + return withUsedSegments(Arrays.asList(usedSegments)); } - /** This method must be used in test code only. */ - @VisibleForTesting - public Builder withUsedSegmentsInTest(Collection usedSegments) + public Builder withUsedSegments(Collection usedSegments) { this.usedSegments = createUsedSegmentsSet(usedSegments); this.dataSourcesSnapshot = DataSourcesSnapshot.fromUsedSegments(usedSegments, ImmutableMap.of()); @@ -361,6 +355,12 @@ public class DruidCoordinatorRuntimeParams return this; } + public Builder withSegmentLoadingConfig(SegmentLoadingConfig config) + { + this.segmentLoadingConfig = config; + return this; + } + public Builder withCompactionConfig(CoordinatorCompactionConfig config) { this.coordinatorCompactionConfig = config; diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/PrepareBalancerAndLoadQueues.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/PrepareBalancerAndLoadQueues.java index 197adce91e1..f066235efc9 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/duty/PrepareBalancerAndLoadQueues.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/PrepareBalancerAndLoadQueues.java @@ -84,7 +84,8 @@ public class PrepareBalancerAndLoadQueues implements CoordinatorDuty taskMaster.resetPeonsForNewServers(currentServers); final CoordinatorDynamicConfig dynamicConfig = params.getCoordinatorDynamicConfig(); - final SegmentLoadingConfig segmentLoadingConfig = params.getSegmentLoadingConfig(); + final SegmentLoadingConfig segmentLoadingConfig + = SegmentLoadingConfig.create(dynamicConfig, params.getUsedSegments().size()); final DruidCluster cluster = prepareCluster(dynamicConfig, segmentLoadingConfig, currentServers); cancelLoadsOnDecommissioningServers(cluster); @@ -103,6 +104,7 @@ public class PrepareBalancerAndLoadQueues implements CoordinatorDuty return params.buildFromExisting() .withDruidCluster(cluster) .withBalancerStrategy(balancerStrategy) + .withSegmentLoadingConfig(segmentLoadingConfig) .withSegmentAssignerUsing(loadQueueManager) .build(); } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/stats/Stats.java b/server/src/main/java/org/apache/druid/server/coordinator/stats/Stats.java index 2f97972dc77..84f2d471b07 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/stats/Stats.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/stats/Stats.java @@ -28,9 +28,9 @@ public class Stats { // Decisions taken in a run public static final CoordinatorStat ASSIGNED - = CoordinatorStat.toLogAndEmit("assigned", "segment/assigned/count", CoordinatorStat.Level.INFO); + = CoordinatorStat.toDebugAndEmit("assigned", "segment/assigned/count"); public static final CoordinatorStat DROPPED - = CoordinatorStat.toLogAndEmit("dropped", "segment/dropped/count", CoordinatorStat.Level.INFO); + = CoordinatorStat.toDebugAndEmit("dropped", "segment/dropped/count"); public static final CoordinatorStat DELETED = CoordinatorStat.toLogAndEmit("deleted", "segment/deleted/count", CoordinatorStat.Level.INFO); public static final CoordinatorStat MOVED diff --git a/server/src/test/java/org/apache/druid/server/coordinator/BalanceSegmentsProfiler.java b/server/src/test/java/org/apache/druid/server/coordinator/BalanceSegmentsProfiler.java index ba69b1b1eec..3e5bc87f9b3 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/BalanceSegmentsProfiler.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/BalanceSegmentsProfiler.java @@ -130,7 +130,7 @@ public class BalanceSegmentsProfiler DruidCoordinatorRuntimeParams params = DruidCoordinatorRuntimeParams .newBuilder(DateTimes.nowUtc()) .withDruidCluster(druidCluster) - .withUsedSegmentsInTest(segments) + .withUsedSegments(segments) .withDynamicConfigs( CoordinatorDynamicConfig .builder() @@ -185,7 +185,7 @@ public class BalanceSegmentsProfiler ) .build() ) - .withUsedSegmentsInTest(segments) + .withUsedSegments(segments) .withDynamicConfigs(CoordinatorDynamicConfig.builder().withMaxSegmentsToMove(MAX_SEGMENTS_TO_MOVE).build()) .withSegmentAssignerUsing(loadQueueManager) .build(); diff --git a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java index 380650f27ac..5a7c940c8df 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java @@ -43,6 +43,7 @@ import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.concurrent.ScheduledExecutorFactory; +import org.apache.druid.java.util.common.concurrent.ScheduledExecutors; import org.apache.druid.java.util.emitter.core.Event; import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.metadata.MetadataRuleManager; @@ -80,7 +81,6 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicReference; /** @@ -165,7 +165,7 @@ public class DruidCoordinatorTest extends CuratorTestBase ); loadQueuePeon.start(); druidNode = new DruidNode("hey", "what", false, 1234, null, true, false); - scheduledExecutorFactory = (corePoolSize, nameFormat) -> Executors.newSingleThreadScheduledExecutor(); + scheduledExecutorFactory = ScheduledExecutors::fixed; leaderAnnouncerLatch = new CountDownLatch(1); leaderUnannouncerLatch = new CountDownLatch(1); coordinator = new DruidCoordinator( diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/BalanceSegmentsTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/BalanceSegmentsTest.java index db2bcda006d..4122a4c752a 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/duty/BalanceSegmentsTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/BalanceSegmentsTest.java @@ -395,7 +395,7 @@ public class BalanceSegmentsTest return DruidCoordinatorRuntimeParams .newBuilder(DateTimes.nowUtc()) .withDruidCluster(DruidCluster.builder().addTier("normal", servers).build()) - .withUsedSegmentsInTest(allSegments) + .withUsedSegments(allSegments) .withBroadcastDatasources(broadcastDatasources) .withBalancerStrategy(balancerStrategy) .withSegmentAssignerUsing(loadQueueManager); diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/CollectSegmentAndServerStatsTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/CollectSegmentAndServerStatsTest.java index f1e46f70b41..fcbeeebc726 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/duty/CollectSegmentAndServerStatsTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/CollectSegmentAndServerStatsTest.java @@ -48,7 +48,7 @@ public class CollectSegmentAndServerStatsTest DruidCoordinatorRuntimeParams runtimeParams = DruidCoordinatorRuntimeParams.newBuilder(DateTimes.nowUtc()) .withDruidCluster(DruidCluster.EMPTY) - .withUsedSegmentsInTest() + .withUsedSegments() .withBalancerStrategy(new RandomBalancerStrategy()) .withSegmentAssignerUsing(new SegmentLoadQueueManager(null, null)) .build(); diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/RunRulesTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/RunRulesTest.java index 32cd1f66ef8..a10a2731fc5 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/duty/RunRulesTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/RunRulesTest.java @@ -337,7 +337,7 @@ public class RunRulesTest return DruidCoordinatorRuntimeParams .newBuilder(DateTimes.nowUtc().minusDays(1)) .withDruidCluster(druidCluster) - .withUsedSegmentsInTest(dataSegments) + .withUsedSegments(dataSegments) .withDatabaseRuleManager(databaseRuleManager); } @@ -830,7 +830,7 @@ public class RunRulesTest stats = runDutyAndGetStats( createCoordinatorRuntimeParams(druidCluster) - .withUsedSegmentsInTest(overFlowSegment) + .withUsedSegments(overFlowSegment) .withBalancerStrategy(balancerStrategy) .withSegmentAssignerUsing(loadQueueManager) .build() @@ -950,7 +950,7 @@ public class RunRulesTest .build(); DruidCoordinatorRuntimeParams params = createCoordinatorRuntimeParams(druidCluster) - .withUsedSegmentsInTest(longerUsedSegments) + .withUsedSegments(longerUsedSegments) .withBalancerStrategy(new CostBalancerStrategy(balancerExecutor)) .withSegmentAssignerUsing(loadQueueManager) .build(); @@ -1004,7 +1004,7 @@ public class RunRulesTest ).build(); DruidCoordinatorRuntimeParams params = createCoordinatorRuntimeParams(druidCluster) - .withUsedSegmentsInTest(usedSegments) + .withUsedSegments(usedSegments) .withBalancerStrategy(new CostBalancerStrategy(balancerExecutor)) .withDynamicConfigs(CoordinatorDynamicConfig.builder().withMaxSegmentsToMove(5).build()) .withSegmentAssignerUsing(loadQueueManager) diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/UnloadUnusedSegmentsTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/UnloadUnusedSegmentsTest.java index 3199abfc0fd..db0af054e60 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/duty/UnloadUnusedSegmentsTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/UnloadUnusedSegmentsTest.java @@ -259,7 +259,7 @@ public class UnloadUnusedSegmentsTest .addRealtimes(new ServerHolder(indexerServer, indexerPeon, false)) .build() ) - .withUsedSegmentsInTest(usedSegments) + .withUsedSegments(usedSegments) .withBroadcastDatasources(Collections.singleton(broadcastDatasource)) .withDatabaseRuleManager(databaseRuleManager) .build(); diff --git a/server/src/test/java/org/apache/druid/server/coordinator/rules/BroadcastDistributionRuleTest.java b/server/src/test/java/org/apache/druid/server/coordinator/rules/BroadcastDistributionRuleTest.java index a9f1485e064..46e62fe1432 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/rules/BroadcastDistributionRuleTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/rules/BroadcastDistributionRuleTest.java @@ -236,7 +236,7 @@ public class BroadcastDistributionRuleTest return DruidCoordinatorRuntimeParams .newBuilder(DateTimes.nowUtc()) .withDruidCluster(druidCluster) - .withUsedSegmentsInTest(usedSegments) + .withUsedSegments(usedSegments) .withBalancerStrategy(new RandomBalancerStrategy()) .withSegmentAssignerUsing(new SegmentLoadQueueManager(null, null)) .build(); diff --git a/server/src/test/java/org/apache/druid/server/coordinator/rules/LoadRuleTest.java b/server/src/test/java/org/apache/druid/server/coordinator/rules/LoadRuleTest.java index 0f91d096938..801df8ebd76 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/rules/LoadRuleTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/rules/LoadRuleTest.java @@ -145,7 +145,7 @@ public class LoadRuleTest .newBuilder(DateTimes.nowUtc()) .withDruidCluster(druidCluster) .withBalancerStrategy(balancerStrategy) - .withUsedSegmentsInTest(usedSegments) + .withUsedSegments(usedSegments) .withDynamicConfigs( CoordinatorDynamicConfig.builder() .withSmartSegmentLoading(false) @@ -335,7 +335,7 @@ public class LoadRuleTest .newBuilder(DateTimes.nowUtc()) .withDruidCluster(druidCluster) .withBalancerStrategy(balancerStrategy) - .withUsedSegmentsInTest(dataSegment1, dataSegment2, dataSegment3) + .withUsedSegments(dataSegment1, dataSegment2, dataSegment3) .withDynamicConfigs( CoordinatorDynamicConfig.builder() .withSmartSegmentLoading(false)