diff --git a/docs/content/configuration/coordinator.md b/docs/content/configuration/coordinator.md index a655506172c..cde94abe7e1 100644 --- a/docs/content/configuration/coordinator.md +++ b/docs/content/configuration/coordinator.md @@ -93,6 +93,7 @@ Issuing a GET request at the same URL will return the spec that is currently in |`emitBalancingStats`|Boolean flag for whether or not we should emit balancing stats. This is an expensive operation.|false| |`killDataSourceWhitelist`|List of dataSources for which kill tasks are sent if property `druid.coordinator.kill.on` is true.|none| |`killAllDataSources`|Send kill tasks for ALL dataSources if property `druid.coordinator.kill.on` is true. If this is set to true then `killDataSourceWhitelist` must not be specified or be empty list.|false| +|`maxSegmentsInNodeLoadingQueue`|The maximum number of segments that could be queued for loading to any given server. This parameter could be used to speed up segments loading process, especially if there are "slow" nodes in the cluster (with low loading speed) or if too much segments scheduled to be replicated to some particular node (faster loading could be preferred to better segments distribution). Desired value depends on segments loading speed, acceptable replication time and number of nodes. Value 1000 could be a start point for a rather big cluster. Default value is 0 (loading queue is unbounded) |0| To view the audit history of coordinator dynamic config issue a GET request to the URL - diff --git a/server/src/main/java/io/druid/server/coordinator/CoordinatorDynamicConfig.java b/server/src/main/java/io/druid/server/coordinator/CoordinatorDynamicConfig.java index b509d259d90..b75d42055ed 100644 --- a/server/src/main/java/io/druid/server/coordinator/CoordinatorDynamicConfig.java +++ b/server/src/main/java/io/druid/server/coordinator/CoordinatorDynamicConfig.java @@ -43,6 +43,13 @@ public class CoordinatorDynamicConfig private final boolean emitBalancingStats; private final boolean killAllDataSources; private final Set killDataSourceWhitelist; + /** + * The maximum number of segments that could be queued for loading to any given server. + * Default values is 0 with the meaning of "unbounded" (any number of + * segments could be added to the loading queue for any server). + * See {@link LoadQueuePeon}, {@link io.druid.server.coordinator.rules.LoadRule#run} + */ + private final int maxSegmentsInNodeLoadingQueue; @JsonCreator public CoordinatorDynamicConfig( @@ -60,7 +67,8 @@ public class CoordinatorDynamicConfig // coordinator console can not send array of strings in the update request. // See https://github.com/druid-io/druid/issues/3055 @JsonProperty("killDataSourceWhitelist") Object killDataSourceWhitelist, - @JsonProperty("killAllDataSources") Boolean killAllDataSources + @JsonProperty("killAllDataSources") Boolean killAllDataSources, + @JsonProperty("maxSegmentsInNodeLoadingQueue") Integer maxSegmentsInNodeLoadingQueue ) { CoordinatorDynamicConfig current = configManager.watch( @@ -94,12 +102,14 @@ public class CoordinatorDynamicConfig ? current.getKillDataSourceWhitelist() : parseKillDataSourceWhitelist(killDataSourceWhitelist); + this.maxSegmentsInNodeLoadingQueue = maxSegmentsInNodeLoadingQueue == null ? current.getMaxSegmentsInNodeLoadingQueue() : maxSegmentsInNodeLoadingQueue; + if (this.killAllDataSources && !this.killDataSourceWhitelist.isEmpty()) { throw new IAE("can't have killAllDataSources and non-empty killDataSourceWhitelist"); } } - public CoordinatorDynamicConfig( + private CoordinatorDynamicConfig( long millisToWaitBeforeDeleting, long mergeBytesLimit, int mergeSegmentsLimit, @@ -109,7 +119,8 @@ public class CoordinatorDynamicConfig int balancerComputeThreads, boolean emitBalancingStats, Object killDataSourceWhitelist, - boolean killAllDataSources + boolean killAllDataSources, + int maxSegmentsInNodeLoadingQueue ) { this.maxSegmentsToMove = maxSegmentsToMove; @@ -122,6 +133,7 @@ public class CoordinatorDynamicConfig this.balancerComputeThreads = Math.max(balancerComputeThreads, 1); this.killAllDataSources = killAllDataSources; this.killDataSourceWhitelist = parseKillDataSourceWhitelist(killDataSourceWhitelist); + this.maxSegmentsInNodeLoadingQueue = maxSegmentsInNodeLoadingQueue; if (this.killAllDataSources && !this.killDataSourceWhitelist.isEmpty()) { throw new IAE("can't have killAllDataSources and non-empty killDataSourceWhitelist"); @@ -207,6 +219,12 @@ public class CoordinatorDynamicConfig return killAllDataSources; } + @JsonProperty + public int getMaxSegmentsInNodeLoadingQueue() + { + return maxSegmentsInNodeLoadingQueue; + } + @Override public String toString() { @@ -221,6 +239,7 @@ public class CoordinatorDynamicConfig ", emitBalancingStats=" + emitBalancingStats + ", killDataSourceWhitelist=" + killDataSourceWhitelist + ", killAllDataSources=" + killAllDataSources + + ", maxSegmentsInNodeLoadingQueue=" + maxSegmentsInNodeLoadingQueue + '}'; } @@ -263,6 +282,9 @@ public class CoordinatorDynamicConfig if (killAllDataSources != that.killAllDataSources) { return false; } + if (maxSegmentsInNodeLoadingQueue != that.maxSegmentsInNodeLoadingQueue) { + return false; + } return !(killDataSourceWhitelist != null ? !killDataSourceWhitelist.equals(that.killDataSourceWhitelist) : that.killDataSourceWhitelist != null); @@ -282,9 +304,15 @@ public class CoordinatorDynamicConfig result = 31 * result + (emitBalancingStats ? 1 : 0); result = 31 * result + (killAllDataSources ? 1 : 0); result = 31 * result + (killDataSourceWhitelist != null ? killDataSourceWhitelist.hashCode() : 0); + result = 31 * result + maxSegmentsInNodeLoadingQueue; return result; } + public static Builder builder() + { + return new Builder(); + } + public static class Builder { private long millisToWaitBeforeDeleting; @@ -297,10 +325,11 @@ public class CoordinatorDynamicConfig private int balancerComputeThreads; private Set killDataSourceWhitelist; private boolean killAllDataSources; + private int maxSegmentsInNodeLoadingQueue; public Builder() { - this(15 * 60 * 1000L, 524288000L, 100, 5, 15, 10, 1, false, null, false); + this(15 * 60 * 1000L, 524288000L, 100, 5, 15, 10, 1, false, null, false, 0); } private Builder( @@ -313,7 +342,8 @@ public class CoordinatorDynamicConfig int balancerComputeThreads, boolean emitBalancingStats, Set killDataSourceWhitelist, - boolean killAllDataSources + boolean killAllDataSources, + int maxSegmentsInNodeLoadingQueue ) { this.millisToWaitBeforeDeleting = millisToWaitBeforeDeleting; @@ -326,6 +356,7 @@ public class CoordinatorDynamicConfig this.balancerComputeThreads = balancerComputeThreads; this.killDataSourceWhitelist = killDataSourceWhitelist; this.killAllDataSources = killAllDataSources; + this.maxSegmentsInNodeLoadingQueue = maxSegmentsInNodeLoadingQueue; } public Builder withMillisToWaitBeforeDeleting(long millisToWaitBeforeDeleting) @@ -370,12 +401,30 @@ public class CoordinatorDynamicConfig return this; } + public Builder withEmitBalancingStats(boolean emitBalancingStats) + { + this.emitBalancingStats = emitBalancingStats; + return this; + } + public Builder withKillDataSourceWhitelist(Set killDataSourceWhitelist) { this.killDataSourceWhitelist = killDataSourceWhitelist; return this; } + public Builder withKillAllDataSources(boolean killAllDataSources) + { + this.killAllDataSources = killAllDataSources; + return this; + } + + public Builder withMaxSegmentsInNodeLoadingQueue(int maxSegmentsInNodeLoadingQueue) + { + this.maxSegmentsInNodeLoadingQueue = maxSegmentsInNodeLoadingQueue; + return this; + } + public CoordinatorDynamicConfig build() { return new CoordinatorDynamicConfig( @@ -388,7 +437,8 @@ public class CoordinatorDynamicConfig balancerComputeThreads, emitBalancingStats, killDataSourceWhitelist, - killAllDataSources + killAllDataSources, + maxSegmentsInNodeLoadingQueue ); } } diff --git a/server/src/main/java/io/druid/server/coordinator/LoadQueuePeon.java b/server/src/main/java/io/druid/server/coordinator/LoadQueuePeon.java index 12dbf358d25..3ee0cdea538 100644 --- a/server/src/main/java/io/druid/server/coordinator/LoadQueuePeon.java +++ b/server/src/main/java/io/druid/server/coordinator/LoadQueuePeon.java @@ -127,6 +127,11 @@ public class LoadQueuePeon return failedAssignCount.getAndSet(0); } + public int getNumberOfSegmentsInQueue() + { + return segmentsToLoad.size(); + } + public void loadSegment( final DataSegment segment, final LoadPeonCallback callback diff --git a/server/src/main/java/io/druid/server/coordinator/ServerHolder.java b/server/src/main/java/io/druid/server/coordinator/ServerHolder.java index 12b14fc3cba..ad4fd7379dc 100644 --- a/server/src/main/java/io/druid/server/coordinator/ServerHolder.java +++ b/server/src/main/java/io/druid/server/coordinator/ServerHolder.java @@ -106,6 +106,11 @@ public class ServerHolder implements Comparable return peon.getSegmentsToLoad().contains(segment); } + public int getNumberOfSegmentsInQueue() + { + return peon.getNumberOfSegmentsInQueue(); + } + @Override public int compareTo(ServerHolder serverHolder) { diff --git a/server/src/main/java/io/druid/server/coordinator/rules/LoadRule.java b/server/src/main/java/io/druid/server/coordinator/rules/LoadRule.java index 3ce2dcb7593..3275e7a4793 100644 --- a/server/src/main/java/io/druid/server/coordinator/rules/LoadRule.java +++ b/server/src/main/java/io/druid/server/coordinator/rules/LoadRule.java @@ -35,7 +35,10 @@ import io.druid.timeline.DataSegment; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Set; +import java.util.function.Predicate; +import java.util.stream.Collectors; /** * LoadRules indicate the number of replicants a segment should have in a given tier. @@ -61,15 +64,29 @@ public abstract class LoadRule implements Rule final int totalReplicantsInTier = params.getSegmentReplicantLookup() .getTotalReplicants(segment.getIdentifier(), tier); final int loadedReplicantsInTier = params.getSegmentReplicantLookup() - .getLoadedReplicants(segment.getIdentifier(), tier); + .getLoadedReplicants(segment.getIdentifier(), tier); final MinMaxPriorityQueue serverQueue = params.getDruidCluster().getHistoricalsByTier(tier); + if (serverQueue == null) { log.makeAlert("Tier[%s] has no servers! Check your cluster configuration!", tier).emit(); continue; } - final List serverHolderList = Lists.newArrayList(serverQueue); + final int maxSegmentsInNodeLoadingQueue = params.getCoordinatorDynamicConfig() + .getMaxSegmentsInNodeLoadingQueue(); + + Predicate serverHolderPredicate; + if (maxSegmentsInNodeLoadingQueue > 0) { + serverHolderPredicate = s -> (s != null && s.getNumberOfSegmentsInQueue() < maxSegmentsInNodeLoadingQueue); + } else { + serverHolderPredicate = Objects::nonNull; + } + + final List serverHolderList = serverQueue.stream() + .filter(serverHolderPredicate) + .collect(Collectors.toList()); + final BalancerStrategy strategy = params.getBalancerStrategy(); if (availableSegments.contains(segment)) { CoordinatorStats assignStats = assign( @@ -215,11 +232,12 @@ public abstract class LoadRule implements Rule return stats; } - protected void validateTieredReplicants(Map tieredReplicants){ - if(tieredReplicants.size() == 0) { + protected void validateTieredReplicants(Map tieredReplicants) + { + if (tieredReplicants.size() == 0) { throw new IAE("A rule with empty tiered replicants is invalid"); } - for (Map.Entry entry: tieredReplicants.entrySet()) { + for (Map.Entry entry : tieredReplicants.entrySet()) { if (entry.getValue() == null) { throw new IAE("Replicant value cannot be empty"); } diff --git a/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorRuleRunnerTest.java b/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorRuleRunnerTest.java index f59fbdcb246..f426d11a1de 100644 --- a/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorRuleRunnerTest.java +++ b/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorRuleRunnerTest.java @@ -118,15 +118,22 @@ public class DruidCoordinatorRuleRunnerTest mockCoordinator(); mockPeon.loadSegment(EasyMock.anyObject(), EasyMock.anyObject()); EasyMock.expectLastCall().atLeastOnce(); - EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.newHashSet()).atLeastOnce(); - EasyMock.expect(mockPeon.getLoadQueueSize()).andReturn(0L).atLeastOnce(); - EasyMock.replay(mockPeon); + mockEmptyPeon(); EasyMock.expect(databaseRuleManager.getRulesWithDefault(EasyMock.anyObject())).andReturn( Lists.newArrayList( - new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-01T06:00:00.000Z"), ImmutableMap.of("hot", 1)), - new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-01T12:00:00.000Z"), ImmutableMap.of("normal", 1)), - new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-02T00:00:00.000Z"), ImmutableMap.of("cold", 1)) + new IntervalLoadRule( + new Interval("2012-01-01T00:00:00.000Z/2012-01-01T06:00:00.000Z"), + ImmutableMap.of("hot", 1) + ), + new IntervalLoadRule( + new Interval("2012-01-01T00:00:00.000Z/2012-01-01T12:00:00.000Z"), + ImmutableMap.of("normal", 1) + ), + new IntervalLoadRule( + new Interval("2012-01-01T00:00:00.000Z/2012-01-02T00:00:00.000Z"), + ImmutableMap.of("cold", 1) + ) )).atLeastOnce(); EasyMock.replay(databaseRuleManager); @@ -185,9 +192,9 @@ public class DruidCoordinatorRuleRunnerTest ); ListeningExecutorService exec = MoreExecutors.listeningDecorator( - Executors.newFixedThreadPool(1)); + Executors.newFixedThreadPool(1)); BalancerStrategy balancerStrategy = - new CostBalancerStrategyFactory().createBalancerStrategy(exec); + new CostBalancerStrategyFactory().createBalancerStrategy(exec); DruidCoordinatorRuntimeParams params = new DruidCoordinatorRuntimeParams.Builder() @@ -226,14 +233,18 @@ public class DruidCoordinatorRuleRunnerTest mockCoordinator(); mockPeon.loadSegment(EasyMock.anyObject(), EasyMock.anyObject()); EasyMock.expectLastCall().atLeastOnce(); - EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.newHashSet()).atLeastOnce(); - EasyMock.expect(mockPeon.getLoadQueueSize()).andReturn(0L).atLeastOnce(); - EasyMock.replay(mockPeon); + mockEmptyPeon(); EasyMock.expect(databaseRuleManager.getRulesWithDefault(EasyMock.anyObject())).andReturn( Lists.newArrayList( - new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-01T06:00:00.000Z"), ImmutableMap.of("hot", 2)), - new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-02T00:00:00.000Z"), ImmutableMap.of("cold", 1)) + new IntervalLoadRule( + new Interval("2012-01-01T00:00:00.000Z/2012-01-01T06:00:00.000Z"), + ImmutableMap.of("hot", 2) + ), + new IntervalLoadRule( + new Interval("2012-01-01T00:00:00.000Z/2012-01-02T00:00:00.000Z"), + ImmutableMap.of("cold", 1) + ) ) ).atLeastOnce(); EasyMock.replay(databaseRuleManager); @@ -288,9 +299,9 @@ public class DruidCoordinatorRuleRunnerTest ); ListeningExecutorService exec = MoreExecutors.listeningDecorator( - Executors.newFixedThreadPool(1)); + Executors.newFixedThreadPool(1)); BalancerStrategy balancerStrategy = - new CostBalancerStrategyFactory().createBalancerStrategy(exec); + new CostBalancerStrategyFactory().createBalancerStrategy(exec); DruidCoordinatorRuntimeParams params = new DruidCoordinatorRuntimeParams.Builder() @@ -327,14 +338,18 @@ public class DruidCoordinatorRuleRunnerTest mockCoordinator(); mockPeon.loadSegment(EasyMock.anyObject(), EasyMock.anyObject()); EasyMock.expectLastCall().atLeastOnce(); - EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.newHashSet()).atLeastOnce(); - EasyMock.expect(mockPeon.getLoadQueueSize()).andReturn(0L).atLeastOnce(); - EasyMock.replay(mockPeon); + mockEmptyPeon(); EasyMock.expect(databaseRuleManager.getRulesWithDefault(EasyMock.anyObject())).andReturn( Lists.newArrayList( - new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-01T12:00:00.000Z"), ImmutableMap.of("hot", 1)), - new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-02T00:00:00.000Z"), ImmutableMap.of("normal", 1)) + new IntervalLoadRule( + new Interval("2012-01-01T00:00:00.000Z/2012-01-01T12:00:00.000Z"), + ImmutableMap.of("hot", 1) + ), + new IntervalLoadRule( + new Interval("2012-01-01T00:00:00.000Z/2012-01-02T00:00:00.000Z"), + ImmutableMap.of("normal", 1) + ) ) ).atLeastOnce(); EasyMock.replay(databaseRuleManager); @@ -385,9 +400,9 @@ public class DruidCoordinatorRuleRunnerTest SegmentReplicantLookup segmentReplicantLookup = SegmentReplicantLookup.make(druidCluster); ListeningExecutorService exec = MoreExecutors.listeningDecorator( - Executors.newFixedThreadPool(1)); + Executors.newFixedThreadPool(1)); BalancerStrategy balancerStrategy = - new CostBalancerStrategyFactory().createBalancerStrategy(exec); + new CostBalancerStrategyFactory().createBalancerStrategy(exec); DruidCoordinatorRuntimeParams params = new DruidCoordinatorRuntimeParams.Builder() @@ -417,9 +432,7 @@ public class DruidCoordinatorRuleRunnerTest mockCoordinator(); mockPeon.loadSegment(EasyMock.anyObject(), EasyMock.anyObject()); EasyMock.expectLastCall().atLeastOnce(); - EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.newHashSet()).atLeastOnce(); - EasyMock.expect(mockPeon.getLoadQueueSize()).andReturn(0L).atLeastOnce(); - EasyMock.replay(mockPeon); + mockEmptyPeon(); emitter.emit(EasyMock.anyObject()); EasyMock.expectLastCall().times(12); @@ -427,8 +440,14 @@ public class DruidCoordinatorRuleRunnerTest EasyMock.expect(databaseRuleManager.getRulesWithDefault(EasyMock.anyObject())).andReturn( Lists.newArrayList( - new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-01T12:00:00.000Z"), ImmutableMap.of("hot",1)), - new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-02T00:00:00.000Z"), ImmutableMap.of("normal", 1)) + new IntervalLoadRule( + new Interval("2012-01-01T00:00:00.000Z/2012-01-01T12:00:00.000Z"), + ImmutableMap.of("hot", 1) + ), + new IntervalLoadRule( + new Interval("2012-01-01T00:00:00.000Z/2012-01-02T00:00:00.000Z"), + ImmutableMap.of("normal", 1) + ) ) ).atLeastOnce(); EasyMock.replay(databaseRuleManager); @@ -456,9 +475,9 @@ public class DruidCoordinatorRuleRunnerTest ); ListeningExecutorService exec = MoreExecutors.listeningDecorator( - Executors.newFixedThreadPool(1)); + Executors.newFixedThreadPool(1)); BalancerStrategy balancerStrategy = - new CostBalancerStrategyFactory().createBalancerStrategy(exec); + new CostBalancerStrategyFactory().createBalancerStrategy(exec); DruidCoordinatorRuntimeParams params = new DruidCoordinatorRuntimeParams.Builder() @@ -488,7 +507,10 @@ public class DruidCoordinatorRuleRunnerTest EasyMock.expect(databaseRuleManager.getRulesWithDefault(EasyMock.anyObject())).andReturn( Lists.newArrayList( - new IntervalLoadRule(new Interval("2012-01-02T00:00:00.000Z/2012-01-03T00:00:00.000Z"), ImmutableMap.of("normal", 1)) + new IntervalLoadRule( + new Interval("2012-01-02T00:00:00.000Z/2012-01-03T00:00:00.000Z"), + ImmutableMap.of("normal", 1) + ) ) ).atLeastOnce(); EasyMock.replay(databaseRuleManager); @@ -534,14 +556,10 @@ public class DruidCoordinatorRuleRunnerTest { mockPeon.dropSegment(EasyMock.anyObject(), EasyMock.anyObject()); EasyMock.expectLastCall().atLeastOnce(); - EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.newHashSet()).atLeastOnce(); - EasyMock.expect(mockPeon.getLoadQueueSize()).andReturn(0L).atLeastOnce(); - EasyMock.replay(mockPeon); + mockEmptyPeon(); EasyMock.expect(coordinator.getDynamicConfigs()).andReturn( - new CoordinatorDynamicConfig( - 0, 0, 0, 0, 1, 24, 0, false, null, false - ) + createCoordinatorDynamicConfig() ).anyTimes(); coordinator.removeSegment(EasyMock.anyObject()); EasyMock.expectLastCall().atLeastOnce(); @@ -549,7 +567,10 @@ public class DruidCoordinatorRuleRunnerTest EasyMock.expect(databaseRuleManager.getRulesWithDefault(EasyMock.anyObject())).andReturn( Lists.newArrayList( - new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-01T12:00:00.000Z"), ImmutableMap.of("normal", 1)), + new IntervalLoadRule( + new Interval("2012-01-01T00:00:00.000Z/2012-01-01T12:00:00.000Z"), + ImmutableMap.of("normal", 1) + ), new IntervalDropRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-02T00:00:00.000Z")) ) ).atLeastOnce(); @@ -585,9 +606,9 @@ public class DruidCoordinatorRuleRunnerTest SegmentReplicantLookup segmentReplicantLookup = SegmentReplicantLookup.make(druidCluster); ListeningExecutorService exec = MoreExecutors.listeningDecorator( - Executors.newFixedThreadPool(1)); + Executors.newFixedThreadPool(1)); BalancerStrategy balancerStrategy = - new CostBalancerStrategyFactory().createBalancerStrategy(exec); + new CostBalancerStrategyFactory().createBalancerStrategy(exec); DruidCoordinatorRuntimeParams params = new DruidCoordinatorRuntimeParams.Builder() .withDruidCluster(druidCluster) @@ -614,13 +635,14 @@ public class DruidCoordinatorRuleRunnerTest mockCoordinator(); mockPeon.dropSegment(EasyMock.anyObject(), EasyMock.anyObject()); EasyMock.expectLastCall().atLeastOnce(); - EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.newHashSet()).atLeastOnce(); - EasyMock.expect(mockPeon.getLoadQueueSize()).andReturn(0L).atLeastOnce(); - EasyMock.replay(mockPeon); + mockEmptyPeon(); EasyMock.expect(databaseRuleManager.getRulesWithDefault(EasyMock.anyObject())).andReturn( Lists.newArrayList( - new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-01T12:00:00.000Z"), ImmutableMap.of("normal", 1)), + new IntervalLoadRule( + new Interval("2012-01-01T00:00:00.000Z/2012-01-01T12:00:00.000Z"), + ImmutableMap.of("normal", 1) + ), new IntervalDropRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-02T00:00:00.000Z")) ) ).atLeastOnce(); @@ -670,9 +692,9 @@ public class DruidCoordinatorRuleRunnerTest SegmentReplicantLookup segmentReplicantLookup = SegmentReplicantLookup.make(druidCluster); ListeningExecutorService exec = MoreExecutors.listeningDecorator( - Executors.newFixedThreadPool(1)); + Executors.newFixedThreadPool(1)); BalancerStrategy balancerStrategy = - new CostBalancerStrategyFactory().createBalancerStrategy(exec); + new CostBalancerStrategyFactory().createBalancerStrategy(exec); DruidCoordinatorRuntimeParams params = new DruidCoordinatorRuntimeParams.Builder() .withDruidCluster(druidCluster) @@ -702,13 +724,14 @@ public class DruidCoordinatorRuleRunnerTest EasyMock.expectLastCall().atLeastOnce(); mockPeon.dropSegment(EasyMock.anyObject(), EasyMock.anyObject()); EasyMock.expectLastCall().atLeastOnce(); - EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.newHashSet()).atLeastOnce(); - EasyMock.expect(mockPeon.getLoadQueueSize()).andReturn(0L).atLeastOnce(); - EasyMock.replay(mockPeon); + mockEmptyPeon(); EasyMock.expect(databaseRuleManager.getRulesWithDefault(EasyMock.anyObject())).andReturn( Lists.newArrayList( - new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-01T12:00:00.000Z"), ImmutableMap.of("hot", 1)), + new IntervalLoadRule( + new Interval("2012-01-01T00:00:00.000Z/2012-01-01T12:00:00.000Z"), + ImmutableMap.of("hot", 1) + ), new IntervalDropRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-02T00:00:00.000Z")) ) ).atLeastOnce(); @@ -762,9 +785,9 @@ public class DruidCoordinatorRuleRunnerTest SegmentReplicantLookup segmentReplicantLookup = SegmentReplicantLookup.make(druidCluster); ListeningExecutorService exec = MoreExecutors.listeningDecorator( - Executors.newFixedThreadPool(1)); + Executors.newFixedThreadPool(1)); BalancerStrategy balancerStrategy = - new CostBalancerStrategyFactory().createBalancerStrategy(exec); + new CostBalancerStrategyFactory().createBalancerStrategy(exec); DruidCoordinatorRuntimeParams params = new DruidCoordinatorRuntimeParams.Builder() .withDruidCluster(druidCluster) @@ -792,13 +815,14 @@ public class DruidCoordinatorRuleRunnerTest mockCoordinator(); mockPeon.loadSegment(EasyMock.anyObject(), EasyMock.anyObject()); EasyMock.expectLastCall().atLeastOnce(); - EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.newHashSet()).atLeastOnce(); - EasyMock.expect(mockPeon.getLoadQueueSize()).andReturn(0L).atLeastOnce(); - EasyMock.replay(mockPeon); + mockEmptyPeon(); EasyMock.expect(databaseRuleManager.getRulesWithDefault(EasyMock.anyObject())).andReturn( Lists.newArrayList( - new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-01T12:00:00.000Z"), ImmutableMap.of("hot", 1)), + new IntervalLoadRule( + new Interval("2012-01-01T00:00:00.000Z/2012-01-01T12:00:00.000Z"), + ImmutableMap.of("hot", 1) + ), new IntervalDropRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-02T00:00:00.000Z")) ) ).atLeastOnce(); @@ -850,9 +874,9 @@ public class DruidCoordinatorRuleRunnerTest SegmentReplicantLookup segmentReplicantLookup = SegmentReplicantLookup.make(druidCluster); ListeningExecutorService exec = MoreExecutors.listeningDecorator( - Executors.newFixedThreadPool(1)); + Executors.newFixedThreadPool(1)); BalancerStrategy balancerStrategy = - new CostBalancerStrategyFactory().createBalancerStrategy(exec); + new CostBalancerStrategyFactory().createBalancerStrategy(exec); DruidCoordinatorRuntimeParams params = new DruidCoordinatorRuntimeParams.Builder() .withDruidCluster(druidCluster) @@ -880,7 +904,10 @@ public class DruidCoordinatorRuleRunnerTest mockCoordinator(); EasyMock.expect(databaseRuleManager.getRulesWithDefault(EasyMock.anyObject())).andReturn( Lists.newArrayList( - new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-01T01:00:00.000Z"), ImmutableMap.of("normal", 0)) + new IntervalLoadRule( + new Interval("2012-01-01T00:00:00.000Z/2012-01-01T01:00:00.000Z"), + ImmutableMap.of("normal", 0) + ) ) ).atLeastOnce(); EasyMock.replay(databaseRuleManager); @@ -916,9 +943,7 @@ public class DruidCoordinatorRuleRunnerTest mockPeon.dropSegment(EasyMock.anyObject(), EasyMock.anyObject()); EasyMock.expectLastCall().atLeastOnce(); - EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.newHashSet()).atLeastOnce(); - EasyMock.expect(mockPeon.getLoadQueueSize()).andReturn(0L).atLeastOnce(); - EasyMock.replay(mockPeon); + mockEmptyPeon(); LoadQueuePeon anotherMockPeon = EasyMock.createMock(LoadQueuePeon.class); EasyMock.expect(anotherMockPeon.getSegmentsToLoad()).andReturn(Sets.newHashSet()).atLeastOnce(); @@ -951,9 +976,9 @@ public class DruidCoordinatorRuleRunnerTest SegmentReplicantLookup segmentReplicantLookup = SegmentReplicantLookup.make(druidCluster); ListeningExecutorService exec = MoreExecutors.listeningDecorator( - Executors.newFixedThreadPool(1)); + Executors.newFixedThreadPool(1)); BalancerStrategy balancerStrategy = - new CostBalancerStrategyFactory().createBalancerStrategy(exec); + new CostBalancerStrategyFactory().createBalancerStrategy(exec); DruidCoordinatorRuntimeParams params = new DruidCoordinatorRuntimeParams.Builder() .withDruidCluster(druidCluster) @@ -987,13 +1012,14 @@ public class DruidCoordinatorRuleRunnerTest mockCoordinator(); mockPeon.loadSegment(EasyMock.anyObject(), EasyMock.anyObject()); EasyMock.expectLastCall().atLeastOnce(); - EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.newHashSet()).atLeastOnce(); - EasyMock.expect(mockPeon.getLoadQueueSize()).andReturn(0L).atLeastOnce(); - EasyMock.replay(mockPeon); + mockEmptyPeon(); EasyMock.expect(databaseRuleManager.getRulesWithDefault(EasyMock.anyObject())).andReturn( Lists.newArrayList( - new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2013-01-01T00:00:00.000Z"), ImmutableMap.of("hot", 2)) + new IntervalLoadRule( + new Interval("2012-01-01T00:00:00.000Z/2013-01-01T00:00:00.000Z"), + ImmutableMap.of("hot", 2) + ) ) ).atLeastOnce(); EasyMock.replay(databaseRuleManager); @@ -1032,9 +1058,9 @@ public class DruidCoordinatorRuleRunnerTest ); ListeningExecutorService exec = MoreExecutors.listeningDecorator( - Executors.newFixedThreadPool(1)); + Executors.newFixedThreadPool(1)); BalancerStrategy balancerStrategy = - new CostBalancerStrategyFactory().createBalancerStrategy(exec); + new CostBalancerStrategyFactory().createBalancerStrategy(exec); DruidCoordinatorRuntimeParams params = new DruidCoordinatorRuntimeParams.Builder() @@ -1097,18 +1123,19 @@ public class DruidCoordinatorRuleRunnerTest public void testReplicantThrottleAcrossTiers() throws Exception { EasyMock.expect(coordinator.getDynamicConfigs()).andReturn( - new CoordinatorDynamicConfig( - 0, 0, 0, 0, 1, 7, 0, false, null, false - ) + CoordinatorDynamicConfig.builder() + .withReplicationThrottleLimit(7) + .withReplicantLifetime(1) + .withMaxSegmentsInNodeLoadingQueue(1000) + .build() + ).atLeastOnce(); coordinator.removeSegment(EasyMock.anyObject()); EasyMock.expectLastCall().anyTimes(); EasyMock.replay(coordinator); mockPeon.loadSegment(EasyMock.anyObject(), EasyMock.anyObject()); EasyMock.expectLastCall().atLeastOnce(); - EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.newHashSet()).atLeastOnce(); - EasyMock.expect(mockPeon.getLoadQueueSize()).andReturn(0L).atLeastOnce(); - EasyMock.replay(mockPeon); + mockEmptyPeon(); EasyMock.expect(databaseRuleManager.getRulesWithDefault(EasyMock.anyObject())).andReturn( Lists.newArrayList( @@ -1162,9 +1189,9 @@ public class DruidCoordinatorRuleRunnerTest ); ListeningExecutorService exec = MoreExecutors.listeningDecorator( - Executors.newFixedThreadPool(1)); + Executors.newFixedThreadPool(1)); BalancerStrategy balancerStrategy = - new CostBalancerStrategyFactory().createBalancerStrategy(exec); + new CostBalancerStrategyFactory().createBalancerStrategy(exec); DruidCoordinatorRuntimeParams params = new DruidCoordinatorRuntimeParams.Builder() @@ -1195,13 +1222,14 @@ public class DruidCoordinatorRuleRunnerTest mockCoordinator(); mockPeon.dropSegment(EasyMock.anyObject(), EasyMock.anyObject()); EasyMock.expectLastCall().atLeastOnce(); - EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.newHashSet()).atLeastOnce(); - EasyMock.expect(mockPeon.getLoadQueueSize()).andReturn(0L).atLeastOnce(); - EasyMock.replay(mockPeon); + mockEmptyPeon(); EasyMock.expect(databaseRuleManager.getRulesWithDefault(EasyMock.anyObject())).andReturn( Lists.newArrayList( - new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2013-01-02T00:00:00.000Z"), ImmutableMap.of("normal", 1)) + new IntervalLoadRule( + new Interval("2012-01-01T00:00:00.000Z/2013-01-02T00:00:00.000Z"), + ImmutableMap.of("normal", 1) + ) ) ).atLeastOnce(); EasyMock.replay(databaseRuleManager); @@ -1265,9 +1293,9 @@ public class DruidCoordinatorRuleRunnerTest SegmentReplicantLookup segmentReplicantLookup = SegmentReplicantLookup.make(druidCluster); ListeningExecutorService exec = MoreExecutors.listeningDecorator( - Executors.newFixedThreadPool(1)); + Executors.newFixedThreadPool(1)); BalancerStrategy balancerStrategy = - new CostBalancerStrategyFactory().createBalancerStrategy(exec); + new CostBalancerStrategyFactory().createBalancerStrategy(exec); DruidCoordinatorRuntimeParams params = new DruidCoordinatorRuntimeParams.Builder() .withDruidCluster(druidCluster) @@ -1320,9 +1348,7 @@ public class DruidCoordinatorRuleRunnerTest mockCoordinator(); mockPeon.loadSegment(EasyMock.eq(v2), EasyMock.anyObject()); EasyMock.expectLastCall().once(); - EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.newHashSet()).atLeastOnce(); - EasyMock.expect(mockPeon.getLoadQueueSize()).andReturn(0L).atLeastOnce(); - EasyMock.replay(mockPeon); + mockEmptyPeon(); EasyMock.expect(databaseRuleManager.getRulesWithDefault(EasyMock.anyObject())).andReturn( Lists.newArrayList( @@ -1353,9 +1379,9 @@ public class DruidCoordinatorRuleRunnerTest ); ListeningExecutorService exec = MoreExecutors.listeningDecorator( - Executors.newFixedThreadPool(1)); + Executors.newFixedThreadPool(1)); BalancerStrategy balancerStrategy = - new CostBalancerStrategyFactory().createBalancerStrategy(exec); + new CostBalancerStrategyFactory().createBalancerStrategy(exec); DruidCoordinatorRuntimeParams params = new DruidCoordinatorRuntimeParams.Builder() @@ -1387,12 +1413,35 @@ public class DruidCoordinatorRuleRunnerTest private void mockCoordinator() { EasyMock.expect(coordinator.getDynamicConfigs()).andReturn( - new CoordinatorDynamicConfig( - 0, 0, 0, 0, 1, 24, 0, false, null, false - ) + createCoordinatorDynamicConfig() ).anyTimes(); coordinator.removeSegment(EasyMock.anyObject()); EasyMock.expectLastCall().anyTimes(); EasyMock.replay(coordinator); } + + private void mockEmptyPeon() + { + EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.newHashSet()).atLeastOnce(); + EasyMock.expect(mockPeon.getLoadQueueSize()).andReturn(0L).atLeastOnce(); + EasyMock.expect(mockPeon.getNumberOfSegmentsInQueue()).andReturn(0).anyTimes(); + EasyMock.replay(mockPeon); + } + + private CoordinatorDynamicConfig createCoordinatorDynamicConfig() + { + return CoordinatorDynamicConfig.builder() + .withMillisToWaitBeforeDeleting(0) + .withMergeBytesLimit(0) + .withMergeSegmentsLimit(0) + .withMaxSegmentsToMove(0) + .withReplicantLifetime(1) + .withReplicationThrottleLimit(24) + .withBalancerComputeThreads(0) + .withEmitBalancingStats(false) + .withKillDataSourceWhitelist(null) + .withKillAllDataSources(false) + .withMaxSegmentsInNodeLoadingQueue(1000) + .build(); + } } diff --git a/server/src/test/java/io/druid/server/coordinator/LoadQueuePeonTester.java b/server/src/test/java/io/druid/server/coordinator/LoadQueuePeonTester.java index a6e2e5673d4..f394ad04b2e 100644 --- a/server/src/test/java/io/druid/server/coordinator/LoadQueuePeonTester.java +++ b/server/src/test/java/io/druid/server/coordinator/LoadQueuePeonTester.java @@ -46,4 +46,10 @@ public class LoadQueuePeonTester extends LoadQueuePeon { return segmentsToLoad; } + + @Override + public int getNumberOfSegmentsInQueue() + { + return segmentsToLoad.size(); + } } diff --git a/server/src/test/java/io/druid/server/coordinator/rules/LoadRuleTest.java b/server/src/test/java/io/druid/server/coordinator/rules/LoadRuleTest.java index 64d91e7c124..2a584ce61c5 100644 --- a/server/src/test/java/io/druid/server/coordinator/rules/LoadRuleTest.java +++ b/server/src/test/java/io/druid/server/coordinator/rules/LoadRuleTest.java @@ -36,12 +36,14 @@ import io.druid.client.DruidServer; import io.druid.jackson.DefaultObjectMapper; import io.druid.server.coordination.ServerType; import io.druid.server.coordinator.BalancerStrategy; +import io.druid.server.coordinator.CoordinatorDynamicConfig; import io.druid.server.coordinator.CoordinatorStats; import io.druid.server.coordinator.CostBalancerStrategyFactory; import io.druid.server.coordinator.DruidCluster; import io.druid.server.coordinator.DruidCoordinatorRuntimeParams; import io.druid.server.coordinator.LoadPeonCallback; import io.druid.server.coordinator.LoadQueuePeon; +import io.druid.server.coordinator.LoadQueuePeonTester; import io.druid.server.coordinator.ReplicationThrottler; import io.druid.server.coordinator.SegmentReplicantLookup; import io.druid.server.coordinator.ServerHolder; @@ -91,17 +93,7 @@ public class LoadRuleTest for (String tier : Arrays.asList("hot", DruidServer.DEFAULT_TIER)) { throttler.updateReplicationState(tier); } - segment = new DataSegment( - "foo", - new Interval("0/3000"), - new DateTime().toString(), - Maps.newHashMap(), - Lists.newArrayList(), - Lists.newArrayList(), - NoneShardSpec.instance(), - 0, - 0 - ); + segment = createDataSegment("foo"); } @After @@ -118,6 +110,7 @@ public class LoadRuleTest EasyMock.expectLastCall().atLeastOnce(); EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.newHashSet()).atLeastOnce(); EasyMock.expect(mockPeon.getLoadQueueSize()).andReturn(0L).atLeastOnce(); + EasyMock.expect(mockPeon.getNumberOfSegmentsInQueue()).andReturn(0).anyTimes(); EasyMock.replay(mockPeon); LoadRule rule = new LoadRule() @@ -197,9 +190,9 @@ public class LoadRuleTest ); ListeningExecutorService exec = MoreExecutors.listeningDecorator( - Executors.newFixedThreadPool(1)); + Executors.newFixedThreadPool(1)); BalancerStrategy balancerStrategy = - new CostBalancerStrategyFactory().createBalancerStrategy(exec); + new CostBalancerStrategyFactory().createBalancerStrategy(exec); CoordinatorStats stats = rule.run( null, @@ -225,6 +218,7 @@ public class LoadRuleTest EasyMock.expectLastCall().atLeastOnce(); EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.newHashSet()).atLeastOnce(); EasyMock.expect(mockPeon.getLoadQueueSize()).andReturn(0L).anyTimes(); + EasyMock.expect(mockPeon.getNumberOfSegmentsInQueue()).andReturn(0).anyTimes(); EasyMock.replay(mockPeon); LoadRule rule = new LoadRule() @@ -308,9 +302,9 @@ public class LoadRuleTest ); ListeningExecutorService exec = MoreExecutors.listeningDecorator( - Executors.newFixedThreadPool(1)); + Executors.newFixedThreadPool(1)); BalancerStrategy balancerStrategy = - new CostBalancerStrategyFactory().createBalancerStrategy(exec); + new CostBalancerStrategyFactory().createBalancerStrategy(exec); CoordinatorStats stats = rule.run( null, @@ -336,6 +330,7 @@ public class LoadRuleTest EasyMock.expectLastCall().atLeastOnce(); EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.newHashSet()).atLeastOnce(); EasyMock.expect(mockPeon.getLoadQueueSize()).andReturn(0L).atLeastOnce(); + EasyMock.expect(mockPeon.getNumberOfSegmentsInQueue()).andReturn(0).anyTimes(); EasyMock.replay(mockPeon); LoadRule rule = new LoadRule() @@ -399,9 +394,9 @@ public class LoadRuleTest ); ListeningExecutorService exec = MoreExecutors.listeningDecorator( - Executors.newFixedThreadPool(1)); + Executors.newFixedThreadPool(1)); BalancerStrategy balancerStrategy = - new CostBalancerStrategyFactory().createBalancerStrategy(exec); + new CostBalancerStrategyFactory().createBalancerStrategy(exec); CoordinatorStats stats = rule.run( null, @@ -426,6 +421,7 @@ public class LoadRuleTest EasyMock.expectLastCall().atLeastOnce(); EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.newHashSet()).atLeastOnce(); EasyMock.expect(mockPeon.getLoadQueueSize()).andReturn(0L).anyTimes(); + EasyMock.expect(mockPeon.getNumberOfSegmentsInQueue()).andReturn(0).anyTimes(); EasyMock.replay(mockPeon); LoadRule rule = new LoadRule() @@ -505,9 +501,9 @@ public class LoadRuleTest ); ListeningExecutorService exec = MoreExecutors.listeningDecorator( - Executors.newFixedThreadPool(1)); + Executors.newFixedThreadPool(1)); BalancerStrategy balancerStrategy = - new CostBalancerStrategyFactory().createBalancerStrategy(exec); + new CostBalancerStrategyFactory().createBalancerStrategy(exec); CoordinatorStats stats = rule.run( null, @@ -524,4 +520,115 @@ public class LoadRuleTest Assert.assertEquals(1L, stats.getTieredStat("droppedCount", "hot")); exec.shutdown(); } + + @Test + public void testMaxLoadingQueueSize() throws Exception + { + EasyMock.replay(mockPeon); + LoadQueuePeonTester peon = new LoadQueuePeonTester(); + + LoadRule rule = new LoadRule() + { + private final Map tiers = ImmutableMap.of( + "hot", 1 + ); + + @Override + public Map getTieredReplicants() + { + return tiers; + } + + @Override + public int getNumReplicants(String tier) + { + return tiers.get(tier); + } + + @Override + public String getType() + { + return "test"; + } + + @Override + public boolean appliesTo(DataSegment segment, DateTime referenceTimestamp) + { + return true; + } + + @Override + public boolean appliesTo(Interval interval, DateTime referenceTimestamp) + { + return true; + } + }; + + DruidCluster druidCluster = new DruidCluster( + null, + ImmutableMap.of( + "hot", + MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create( + Arrays.asList( + new ServerHolder( + new DruidServer( + "serverHot", + "hostHot", + 1000, + ServerType.HISTORICAL, + "hot", + 0 + ).toImmutableDruidServer(), + peon + ) + ) + ) + ) + ); + + ListeningExecutorService exec = MoreExecutors.listeningDecorator( + Executors.newFixedThreadPool(1)); + BalancerStrategy balancerStrategy = + new CostBalancerStrategyFactory().createBalancerStrategy(exec); + + DataSegment dataSegment1 = createDataSegment("ds1"); + DataSegment dataSegment2 = createDataSegment("ds2"); + DataSegment dataSegment3 = createDataSegment("ds3"); + + DruidCoordinatorRuntimeParams params = + DruidCoordinatorRuntimeParams + .newBuilder() + .withDruidCluster(druidCluster) + .withSegmentReplicantLookup(SegmentReplicantLookup.make(druidCluster)) + .withReplicationManager(throttler) + .withBalancerStrategy(balancerStrategy) + .withBalancerReferenceTimestamp(new DateTime("2013-01-01")) + .withAvailableSegments(Arrays.asList(dataSegment1, dataSegment2, dataSegment3)) + .withDynamicConfigs(new CoordinatorDynamicConfig.Builder().withMaxSegmentsInNodeLoadingQueue(2).build()) + .build(); + + CoordinatorStats stats1 = rule.run(null, params, dataSegment1); + CoordinatorStats stats2 = rule.run(null, params, dataSegment2); + CoordinatorStats stats3 = rule.run(null, params, dataSegment3); + + Assert.assertEquals(1L, stats1.getTieredStat(LoadRule.ASSIGNED_COUNT, "hot")); + Assert.assertEquals(1L, stats2.getTieredStat(LoadRule.ASSIGNED_COUNT, "hot")); + Assert.assertEquals(0L, stats3.getTieredStat(LoadRule.ASSIGNED_COUNT, "hot")); + exec.shutdown(); + } + + private DataSegment createDataSegment(String dataSource) + { + return new DataSegment( + dataSource, + new Interval("0/3000"), + new DateTime().toString(), + Maps.newHashMap(), + Lists.newArrayList(), + Lists.newArrayList(), + NoneShardSpec.instance(), + 0, + 0 + ); + } } diff --git a/server/src/test/java/io/druid/server/http/CoordinatorDynamicConfigTest.java b/server/src/test/java/io/druid/server/http/CoordinatorDynamicConfigTest.java index cf6f022c2aa..cab658aa463 100644 --- a/server/src/test/java/io/druid/server/http/CoordinatorDynamicConfigTest.java +++ b/server/src/test/java/io/druid/server/http/CoordinatorDynamicConfigTest.java @@ -32,6 +32,7 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import java.util.Set; import java.util.concurrent.atomic.AtomicReference; /** @@ -69,7 +70,8 @@ public class CoordinatorDynamicConfigTest + " \"replicationThrottleLimit\": 1,\n" + " \"balancerComputeThreads\": 2, \n" + " \"emitBalancingStats\": true,\n" - + " \"killDataSourceWhitelist\": [\"test1\",\"test2\"]\n" + + " \"killDataSourceWhitelist\": [\"test1\",\"test2\"],\n" + + " \"maxSegmentsInNodeLoadingQueue\": 1\n" + "}\n"; CoordinatorDynamicConfig actual = mapper.readValue( @@ -81,11 +83,7 @@ public class CoordinatorDynamicConfigTest ), CoordinatorDynamicConfig.class ); - - Assert.assertEquals( - new CoordinatorDynamicConfig(1, 1, 1, 1, 1, 1, 2, true, ImmutableSet.of("test1", "test2"), false), - actual - ); + assertConfig(actual, 1, 1, 1, 1, 1, 1, 2, true, ImmutableSet.of("test1", "test2"), false, 1); } @Test @@ -100,7 +98,36 @@ public class CoordinatorDynamicConfigTest + " \"replicationThrottleLimit\": 1,\n" + " \"balancerComputeThreads\": 2, \n" + " \"emitBalancingStats\": true,\n" - + " \"killDataSourceWhitelist\": \" test1 ,test2 \"\n" + + " \"killDataSourceWhitelist\": \" test1 ,test2 \", \n" + + " \"maxSegmentsInNodeLoadingQueue\": 1\n" + + "}\n"; + + CoordinatorDynamicConfig actual = mapper.readValue( + mapper.writeValueAsString( + mapper.readValue( + jsonStr, + CoordinatorDynamicConfig.class + ) + ), + CoordinatorDynamicConfig.class + ); + assertConfig(actual, 1, 1, 1, 1, 1, 1, 2, true, ImmutableSet.of("test1", "test2"), false, 1); + } + + @Test + public void testSerdeWithKillAllDataSources() throws Exception + { + String jsonStr = "{\n" + + " \"millisToWaitBeforeDeleting\": 1,\n" + + " \"mergeBytesLimit\": 1,\n" + + " \"mergeSegmentsLimit\" : 1,\n" + + " \"maxSegmentsToMove\": 1,\n" + + " \"replicantLifetime\": 1,\n" + + " \"replicationThrottleLimit\": 1,\n" + + " \"balancerComputeThreads\": 2, \n" + + " \"emitBalancingStats\": true,\n" + + " \"killAllDataSources\": true,\n" + + " \"maxSegmentsInNodeLoadingQueue\": 1\n" + "}\n"; CoordinatorDynamicConfig actual = mapper.readValue( @@ -113,14 +140,27 @@ public class CoordinatorDynamicConfigTest CoordinatorDynamicConfig.class ); - Assert.assertEquals( - new CoordinatorDynamicConfig(1, 1, 1, 1, 1, 1, 2, true, ImmutableSet.of("test1", "test2"), false), - actual - ); + assertConfig(actual, 1, 1, 1, 1, 1, 1, 2, true, ImmutableSet.of(), true, 1); + + //ensure whitelist is empty when killAllDataSources is true + try { + jsonStr = "{\n" + + " \"killDataSourceWhitelist\": [\"test1\",\"test2\"],\n" + + " \"killAllDataSources\": true\n" + + "}\n"; + mapper.readValue( + jsonStr, + CoordinatorDynamicConfig.class + ); + + Assert.fail("deserialization should fail."); + } catch (JsonMappingException e) { + Assert.assertTrue(e.getCause() instanceof IAE); + } } @Test - public void testSerdeWithKillAllDataSources() throws Exception + public void testDeserializeWithoutMaxSegmentsInNodeLoadingQueue() throws Exception { String jsonStr = "{\n" + " \"millisToWaitBeforeDeleting\": 1,\n" @@ -144,43 +184,23 @@ public class CoordinatorDynamicConfigTest CoordinatorDynamicConfig.class ); - Assert.assertEquals( - new CoordinatorDynamicConfig(1, 1, 1, 1, 1, 1, 2, true, ImmutableSet.of(), true), - actual - ); - - - - //ensure whitelist is empty when killAllDataSources is true - try { - jsonStr = "{\n" - + " \"killDataSourceWhitelist\": [\"test1\",\"test2\"],\n" - + " \"killAllDataSources\": true\n" - + "}\n"; - mapper.readValue( - jsonStr, - CoordinatorDynamicConfig.class - ); - - Assert.fail("deserialization should fail."); - } catch (JsonMappingException e) { - Assert.assertTrue(e.getCause() instanceof IAE); - } + assertConfig(actual, 1, 1, 1, 1, 1, 1, 2, true, ImmutableSet.of(), true, 0); } @Test public void testBuilderDefaults() { - Assert.assertEquals( - new CoordinatorDynamicConfig(900000, 524288000, 100, 5, 15, 10, 1, false, null, false), - new CoordinatorDynamicConfig.Builder().build() - ); + + CoordinatorDynamicConfig defaultConfig = CoordinatorDynamicConfig.builder().build(); + assertConfig(defaultConfig, 900000, 524288000, 100, 5, 15, 10, 1, false, ImmutableSet.of(), false, 0); } @Test public void testUpdate() { - CoordinatorDynamicConfig current = new CoordinatorDynamicConfig(99, 99, 99, 99, 99, 99, 99, true, ImmutableSet.of("x"), false); + CoordinatorDynamicConfig current = CoordinatorDynamicConfig.builder() + .withKillDataSourceWhitelist(ImmutableSet.of("x")) + .build(); JacksonConfigManager mock = EasyMock.mock(JacksonConfigManager.class); EasyMock.expect(mock.watch(CoordinatorDynamicConfig.CONFIG_KEY, CoordinatorDynamicConfig.class)).andReturn( new AtomicReference<>(current) @@ -188,17 +208,42 @@ public class CoordinatorDynamicConfigTest EasyMock.replay(mock); Assert.assertEquals( current, - new CoordinatorDynamicConfig(mock, null, null, null, null, null, null, null, null, null, null) + new CoordinatorDynamicConfig(mock, null, null, null, null, null, null, null, null, null, null, null) ); } @Test public void testEqualsAndHashCodeSanity() { - CoordinatorDynamicConfig config1 = new CoordinatorDynamicConfig(900000, 524288000, 100, 5, 15, 10, 1, false, null, false); - CoordinatorDynamicConfig config2 = new CoordinatorDynamicConfig(900000, 524288000, 100, 5, 15, 10, 1, false, null, false); - + CoordinatorDynamicConfig config1 = CoordinatorDynamicConfig.builder().build(); + CoordinatorDynamicConfig config2 = CoordinatorDynamicConfig.builder().build(); Assert.assertEquals(config1, config2); Assert.assertEquals(config1.hashCode(), config2.hashCode()); } + + private void assertConfig(CoordinatorDynamicConfig config, + long expectedMillisToWaitBeforeDeleting, + long expectedMergeBytesLimit, + int expectedMergeSegmentsLimit, + int expectedMaxSegmentsToMove, + int expectedReplicantLifetime, + int expectedReplicationThrottleLimit, + int expectedBalancerComputeThreads, + boolean expectedEmitingBalancingStats, + Set expectedKillDataSourceWhitelist, + boolean expectedKillAllDataSources, + int expectedMaxSegmentsInNodeLoadingQueue) + { + Assert.assertEquals(expectedMillisToWaitBeforeDeleting, config.getMillisToWaitBeforeDeleting()); + Assert.assertEquals(expectedMergeBytesLimit, config.getMergeBytesLimit()); + Assert.assertEquals(expectedMergeSegmentsLimit, config.getMergeSegmentsLimit()); + Assert.assertEquals(expectedMaxSegmentsToMove, config.getMaxSegmentsToMove()); + Assert.assertEquals(expectedReplicantLifetime, config.getReplicantLifetime()); + Assert.assertEquals(expectedReplicationThrottleLimit, config.getReplicationThrottleLimit()); + Assert.assertEquals(expectedBalancerComputeThreads, config.getBalancerComputeThreads()); + Assert.assertEquals(expectedEmitingBalancingStats, config.emitBalancingStats()); + Assert.assertEquals(expectedKillDataSourceWhitelist, config.getKillDataSourceWhitelist()); + Assert.assertEquals(expectedKillAllDataSources, config.isKillAllDataSources()); + Assert.assertEquals(expectedMaxSegmentsInNodeLoadingQueue, config.getMaxSegmentsInNodeLoadingQueue()); + } }