From e423e99997e0faf54856fb6b525bc0c03471e78a Mon Sep 17 00:00:00 2001
From: Suneet Saldanha
Date: Thu, 5 Aug 2021 14:26:58 -0400
Subject: [PATCH] Update default maxSegmentsInNodeLoadingQueue (#11540)

* Update default maxSegmentsInNodeLoadingQueue

Update the default maxSegmentsInNodeLoadingQueue from 0 (unbounded) to 100.
An unbounded maxSegmentsInNodeLoadingQueue can cause cluster instability.
Since this is the default, Druid operators have to run into this instability
and then look through the docs to see that the recommended value for a large
cluster is 1000. This change makes the default prevent clusters from falling
over as they grow over time.

* update tests

* codestyle
---
 docs/configuration/index.md                   |   2 +-
 .../coordinator/CoordinatorDynamicConfig.java |  36 ++-
 .../http/CoordinatorDynamicConfigTest.java    | 302 +++++++++++++++++-
 3 files changed, 309 insertions(+), 31 deletions(-)

diff --git a/docs/configuration/index.md b/docs/configuration/index.md
index 8caaf1cc6bc..09b336d337a 100644
--- a/docs/configuration/index.md
+++ b/docs/configuration/index.md
@@ -840,7 +840,7 @@ Issuing a GET request at the same URL will return the spec that is currently in
 |`killDataSourceWhitelist`|List of specific data sources for which kill tasks are sent if property `druid.coordinator.kill.on` is true. This can be a list of comma-separated data source names or a JSON array.|none|
 |`killAllDataSources`|Send kill tasks for ALL dataSources if property `druid.coordinator.kill.on` is true. If this is set to true then `killDataSourceWhitelist` must not be specified or be empty list.|false|
 |`killPendingSegmentsSkipList`|List of data sources for which pendingSegments are _NOT_ cleaned up if property `druid.coordinator.kill.pendingSegments.on` is true. This can be a list of comma-separated data sources or a JSON array.|none|
-|`maxSegmentsInNodeLoadingQueue`|The maximum number of segments that could be queued for loading to any given server. This parameter could be used to speed up segments loading process, especially if there are "slow" nodes in the cluster (with low loading speed) or if too much segments scheduled to be replicated to some particular node (faster loading could be preferred to better segments distribution). Desired value depends on segments loading speed, acceptable replication time and number of nodes. Value 1000 could be a start point for a rather big cluster. Default value is 0 (loading queue is unbounded) |0|
+|`maxSegmentsInNodeLoadingQueue`|The maximum number of segments that can be queued for loading to any given server. This parameter can be used to speed up the segment loading process, especially if there are "slow" nodes in the cluster (with low loading speed) or if too many segments are scheduled to be replicated to some particular node (when faster loading is preferred to better segment distribution). The desired value depends on segment loading speed, acceptable replication time, and the number of nodes. A value of 1000 can be a starting point for a rather big cluster. Default value is 100.|100|
 |`decommissioningNodes`| List of historical servers to 'decommission'. Coordinator will not assign new segments to 'decommissioning' servers, and segments will be moved away from them to be placed on non-decommissioning servers at the maximum rate specified by `decommissioningMaxPercentOfMaxSegmentsToMove`.|none|
 |`decommissioningMaxPercentOfMaxSegmentsToMove`| The maximum number of segments that may be moved away from 'decommissioning' servers to non-decommissioning (that is, active) servers during one Coordinator run. This value is relative to the total maximum segment movements allowed during one run which is determined by `maxSegmentsToMove`. If `decommissioningMaxPercentOfMaxSegmentsToMove` is 0, segments will neither be moved from _or to_ 'decommissioning' servers, effectively putting them in a sort of "maintenance" mode that will not participate in balancing or assignment by load rules. Decommissioning can also become stalled if there are no available active servers to place the segments. By leveraging the maximum percent of decommissioning segment movements, an operator can prevent active servers from overload by prioritizing balancing, or decrease decommissioning time instead. The value should be between 0 and 100.|70|
 |`pauseCoordination`| Boolean flag for whether or not the coordinator should execute its various duties of coordinating the cluster. Setting this to true essentially pauses all coordination work while allowing the API to remain up. Duties that are paused include all classes that implement the `CoordinatorDuty` Interface. Such duties include: Segment balancing, Segment compaction, Emission of metrics controlled by the dynamic coordinator config `emitBalancingStats`, Submitting kill tasks for unused segments (if enabled), Logging of used segments in the cluster, Marking of newly unused or overshadowed segments, Matching and execution of load/drop rules for used segments, Unloading segments that are no longer marked as used from Historical servers. An example of when an admin may want to pause coordination would be if they are doing deep storage maintenance on HDFS Name Nodes with downtime and don't want the coordinator to be directing Historical Nodes to hit the Name Node with API requests until maintenance is done and the deep store is declared healthy for use again. |false|

diff --git a/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java b/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java
index d4bf3e8b05d..29d8052e967 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java
@@ -61,7 +61,9 @@ public class CoordinatorDynamicConfig
   private final int balancerComputeThreads;
   private final boolean emitBalancingStats;
 
-  /** If true {@link KillUnusedSegments} sends kill tasks for unused segments in all data sources. */
+  /**
+   * If true {@link KillUnusedSegments} sends kill tasks for unused segments in all data sources.
+   */
   private final boolean killUnusedSegmentsInAllDataSources;
 
   /**
@@ -74,7 +76,7 @@
   /**
    * Stale pending segments belonging to the data sources in this list are not killed by {@link
    * KillStalePendingSegments}. In other words, segments in these data sources are "protected".
-   *
+   * <p>
    * Pending segments are considered "stale" when their created_time is older than {@link
    * KillStalePendingSegments#KEEP_PENDING_SEGMENTS_OFFSET} from now.
    */
@@ -134,7 +136,7 @@
       // Keeping the legacy 'killPendingSegmentsSkipList' property name for backward compatibility. When the project is
       // updated to Jackson 2.9 it could be changed, see https://github.com/apache/druid/issues/7152
       @JsonProperty("killPendingSegmentsSkipList") Object dataSourcesToNotKillStalePendingSegmentsIn,
-      @JsonProperty("maxSegmentsInNodeLoadingQueue") int maxSegmentsInNodeLoadingQueue,
+      @JsonProperty("maxSegmentsInNodeLoadingQueue") @Nullable Integer maxSegmentsInNodeLoadingQueue,
       @JsonProperty("decommissioningNodes") Object decommissioningNodes,
       @JsonProperty("decommissioningMaxPercentOfMaxSegmentsToMove") int decommissioningMaxPercentOfMaxSegmentsToMove,
       @JsonProperty("pauseCoordination") boolean pauseCoordination,
@@ -149,11 +151,12 @@
     this.maxSegmentsToMove = maxSegmentsToMove;
 
     if (percentOfSegmentsToConsiderPerMove == null) {
-      log.debug("percentOfSegmentsToConsiderPerMove was null! This is likely because your metastore does not "
-          + "reflect this configuration being added to Druid in a recent release. Druid is defaulting the value "
-          + "to the Druid default of %f. It is recommended that you re-submit your dynamic config with your "
-          + "desired value for percentOfSegmentsToConsideredPerMove",
-          Builder.DEFAULT_PERCENT_OF_SEGMENTS_TO_CONSIDER_PER_MOVE
+      log.debug(
+          "percentOfSegmentsToConsiderPerMove was null! This is likely because your metastore does not "
+          + "reflect this configuration being added to Druid in a recent release. Druid is defaulting the value "
+          + "to the Druid default of %f. It is recommended that you re-submit your dynamic config with your "
+          + "desired value for percentOfSegmentsToConsideredPerMove",
+          Builder.DEFAULT_PERCENT_OF_SEGMENTS_TO_CONSIDER_PER_MOVE
       );
       percentOfSegmentsToConsiderPerMove = Builder.DEFAULT_PERCENT_OF_SEGMENTS_TO_CONSIDER_PER_MOVE;
     }
@@ -172,7 +175,9 @@
     this.specificDataSourcesToKillUnusedSegmentsIn = parseJsonStringOrArray(specificDataSourcesToKillUnusedSegmentsIn);
     this.dataSourcesToNotKillStalePendingSegmentsIn = parseJsonStringOrArray(dataSourcesToNotKillStalePendingSegmentsIn);
-    this.maxSegmentsInNodeLoadingQueue = maxSegmentsInNodeLoadingQueue;
+    this.maxSegmentsInNodeLoadingQueue = maxSegmentsInNodeLoadingQueue == null
+                                         ? Builder.DEFAULT_MAX_SEGMENTS_IN_NODE_LOADING_QUEUE
+                                         : maxSegmentsInNodeLoadingQueue;
     this.decommissioningNodes = parseJsonStringOrArray(decommissioningNodes);
     Preconditions.checkArgument(
         decommissioningMaxPercentOfMaxSegmentsToMove >= 0 && decommissioningMaxPercentOfMaxSegmentsToMove <= 100,
@@ -517,7 +522,7 @@
     private static final boolean DEFAULT_EMIT_BALANCING_STATS = false;
     private static final boolean DEFAULT_USE_BATCHED_SEGMENT_SAMPLER = false;
     private static final boolean DEFAULT_KILL_UNUSED_SEGMENTS_IN_ALL_DATA_SOURCES = false;
-    private static final int DEFAULT_MAX_SEGMENTS_IN_NODE_LOADING_QUEUE = 0;
+    private static final int DEFAULT_MAX_SEGMENTS_IN_NODE_LOADING_QUEUE = 100;
     private static final int DEFAULT_DECOMMISSIONING_MAX_SEGMENTS_TO_MOVE_PERCENT = 70;
     private static final boolean DEFAULT_PAUSE_COORDINATION = false;
     private static final boolean DEFAULT_REPLICATE_AFTER_LOAD_TIMEOUT = false;
@@ -732,7 +737,8 @@
               : decommissioningMaxPercentOfMaxSegmentsToMove,
           pauseCoordination == null ? DEFAULT_PAUSE_COORDINATION : pauseCoordination,
           replicateAfterLoadTimeout == null ? DEFAULT_REPLICATE_AFTER_LOAD_TIMEOUT : replicateAfterLoadTimeout,
-          maxNonPrimaryReplicantsToLoad == null ? DEFAULT_MAX_NON_PRIMARY_REPLICANTS_TO_LOAD : maxNonPrimaryReplicantsToLoad
+          maxNonPrimaryReplicantsToLoad == null ? DEFAULT_MAX_NON_PRIMARY_REPLICANTS_TO_LOAD
+                                                : maxNonPrimaryReplicantsToLoad
       );
     }
 
@@ -745,7 +751,9 @@
           mergeBytesLimit == null ? defaults.getMergeBytesLimit() : mergeBytesLimit,
           mergeSegmentsLimit == null ? defaults.getMergeSegmentsLimit() : mergeSegmentsLimit,
           maxSegmentsToMove == null ? defaults.getMaxSegmentsToMove() : maxSegmentsToMove,
-          percentOfSegmentsToConsiderPerMove == null ? defaults.getPercentOfSegmentsToConsiderPerMove() : percentOfSegmentsToConsiderPerMove,
+          percentOfSegmentsToConsiderPerMove == null
+          ? defaults.getPercentOfSegmentsToConsiderPerMove()
+          : percentOfSegmentsToConsiderPerMove,
           useBatchedSegmentSampler == null ? defaults.useBatchedSegmentSampler() : useBatchedSegmentSampler,
           replicantLifetime == null ? defaults.getReplicantLifetime() : replicantLifetime,
           replicationThrottleLimit == null ? defaults.getReplicationThrottleLimit() : replicationThrottleLimit,
@@ -769,7 +777,9 @@
               : decommissioningMaxPercentOfMaxSegmentsToMove,
           pauseCoordination == null ? defaults.getPauseCoordination() : pauseCoordination,
           replicateAfterLoadTimeout == null ? defaults.getReplicateAfterLoadTimeout() : replicateAfterLoadTimeout,
-          maxNonPrimaryReplicantsToLoad == null ? defaults.getMaxNonPrimaryReplicantsToLoad() : maxNonPrimaryReplicantsToLoad
+          maxNonPrimaryReplicantsToLoad == null
+          ? defaults.getMaxNonPrimaryReplicantsToLoad()
+          : maxNonPrimaryReplicantsToLoad
       );
     }
   }
diff --git a/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java b/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java
index b6776a8d578..b96591a1664 100644
--- a/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java
+++ b/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java
@@ -31,9 +31,12 @@ import org.junit.Test;
 import java.util.Set;
 
 /**
+ *
  */
 public class CoordinatorDynamicConfigTest
 {
+  private static final int EXPECTED_DEFAULT_MAX_SEGMENTS_IN_NODE_LOADING_QUEUE = 100;
+
   private final ObjectMapper mapper = TestHelper.makeJsonMapper();
 
   @Test
@@ -69,25 +72,158 @@ public class CoordinatorDynamicConfigTest
     );
     ImmutableSet<String> decommissioning = ImmutableSet.of("host1", "host2");
     ImmutableSet<String> whitelist = ImmutableSet.of("test1", "test2");
-    assertConfig(actual, 1, 1, 1, 1, 1, 1, 1, 2, true, whitelist, false, 1, decommissioning, 9, false, false, Integer.MAX_VALUE);
+    assertConfig(
+        actual,
+        1,
+        1,
+        1,
+        1,
+        1,
+        1,
+        1,
+        2,
+        true,
+        whitelist,
+        false,
+        1,
+        decommissioning,
+        9,
+        false,
+        false,
+        Integer.MAX_VALUE
+    );
 
     actual = CoordinatorDynamicConfig.builder().withDecommissioningNodes(ImmutableSet.of("host1")).build(actual);
-    assertConfig(actual, 1, 1, 1, 1, 1, 1, 1, 2, true, whitelist, false, 1, ImmutableSet.of("host1"), 9, false, false, Integer.MAX_VALUE);
+    assertConfig(
+        actual,
+        1,
+        1,
+        1,
+        1,
+        1,
+        1,
+        1,
+        2,
+        true,
+        whitelist,
+        false,
+        1,
+        ImmutableSet.of("host1"),
+        9,
+        false,
+        false,
+        Integer.MAX_VALUE
+    );
 
     actual = CoordinatorDynamicConfig.builder().withDecommissioningMaxPercentOfMaxSegmentsToMove(5).build(actual);
-    assertConfig(actual, 1, 1, 1, 1, 1, 1, 1, 2, true, whitelist, false, 1, ImmutableSet.of("host1"), 5, false, false, Integer.MAX_VALUE);
+    assertConfig(
+        actual,
+        1,
+        1,
+        1,
+        1,
+        1,
+        1,
+        1,
+        2,
+        true,
+        whitelist,
+        false,
+        1,
+        ImmutableSet.of("host1"),
+        5,
+        false,
+        false,
+        Integer.MAX_VALUE
+    );
 
     actual = CoordinatorDynamicConfig.builder().withPauseCoordination(true).build(actual);
-    assertConfig(actual, 1, 1, 1, 1, 1, 1, 1, 2, true, whitelist, false, 1, ImmutableSet.of("host1"), 5, true, false, Integer.MAX_VALUE);
+    assertConfig(
+        actual,
+        1,
+        1,
+        1,
+        1,
+        1,
+        1,
+        1,
+        2,
+        true,
+        whitelist,
+        false,
+        1,
+        ImmutableSet.of("host1"),
+        5,
+        true,
+        false,
+        Integer.MAX_VALUE
+    );
 
     actual = CoordinatorDynamicConfig.builder().withPercentOfSegmentsToConsiderPerMove(10).build(actual);
-    assertConfig(actual, 1, 1, 1, 1, 10, 1, 1, 2, true, whitelist, false, 1, ImmutableSet.of("host1"), 5, true, false, Integer.MAX_VALUE);
+    assertConfig(
+        actual,
+        1,
+        1,
+        1,
+        1,
+        10,
+        1,
+        1,
+        2,
+        true,
+        whitelist,
+        false,
+        1,
+        ImmutableSet.of("host1"),
+        5,
+        true,
+        false,
+        Integer.MAX_VALUE
+    );
 
     actual = CoordinatorDynamicConfig.builder().withReplicateAfterLoadTimeout(true).build(actual);
-    assertConfig(actual, 1, 1, 1, 1, 10, 1, 1, 2, true, whitelist, false, 1, ImmutableSet.of("host1"), 5, true, true, Integer.MAX_VALUE);
+    assertConfig(
+        actual,
+        1,
+        1,
+        1,
+        1,
+        10,
+        1,
+        1,
+        2,
+        true,
+        whitelist,
+        false,
+        1,
+        ImmutableSet.of("host1"),
+        5,
+        true,
+        true,
+        Integer.MAX_VALUE
+    );
 
     actual = CoordinatorDynamicConfig.builder().withMaxNonPrimaryReplicantsToLoad(10).build(actual);
-    assertConfig(actual, 1, 1, 1, 1, 10, 1, 1, 2, true, whitelist, false, 1, ImmutableSet.of("host1"), 5, true, true, 10);
+    assertConfig(
+        actual,
+        1,
+        1,
+        1,
+        1,
+        10,
+        1,
+        1,
+        2,
+        true,
+        whitelist,
+        false,
+        1,
+        ImmutableSet.of("host1"),
+        5,
+        true,
+        true,
+        10
+    );
   }
 
@@ -118,13 +254,70 @@ public class CoordinatorDynamicConfigTest
     );
     ImmutableSet<String> decommissioning = ImmutableSet.of();
     ImmutableSet<String> whitelist = ImmutableSet.of("test1", "test2");
-    assertConfig(actual, 1, 1, 1, 1, 100, 1, 1, 2, true, whitelist, false, 1, decommissioning, 0, false, false, Integer.MAX_VALUE);
+    assertConfig(
+        actual,
+        1,
+        1,
+        1,
+        1,
+        100,
+        1,
+        1,
+        2,
+        true,
+        whitelist,
+        false,
+        1,
+        decommissioning,
+        0,
+        false,
+        false,
+        Integer.MAX_VALUE
+    );
 
     actual = CoordinatorDynamicConfig.builder().withDecommissioningNodes(ImmutableSet.of("host1")).build(actual);
-    assertConfig(actual, 1, 1, 1, 1, 100, 1, 1, 2, true, whitelist, false, 1, ImmutableSet.of("host1"), 0, false, false, Integer.MAX_VALUE);
+    assertConfig(
+        actual,
+        1,
+        1,
+        1,
+        1,
+        100,
+        1,
+        1,
+        2,
+        true,
+        whitelist,
+        false,
+        1,
+        ImmutableSet.of("host1"),
+        0,
+        false,
+        false,
+        Integer.MAX_VALUE
+    );
 
     actual = CoordinatorDynamicConfig.builder().withDecommissioningMaxPercentOfMaxSegmentsToMove(5).build(actual);
-    assertConfig(actual, 1, 1, 1, 1, 100, 1, 1, 2, true, whitelist, false, 1, ImmutableSet.of("host1"), 5, false, false, Integer.MAX_VALUE);
+    assertConfig(
+        actual,
+        1,
+        1,
+        1,
+        1,
+        100,
+        1,
+        1,
+        2,
+        true,
+        whitelist,
+        false,
+        1,
+        ImmutableSet.of("host1"),
+        5,
+        false,
+        false,
+        Integer.MAX_VALUE
+    );
   }
 
   @Test
@@ -271,7 +464,26 @@ public class CoordinatorDynamicConfigTest
     );
     ImmutableSet<String> decommissioning = ImmutableSet.of("host1", "host2");
     ImmutableSet<String> whitelist = ImmutableSet.of("test1", "test2");
-    assertConfig(actual, 1, 1, 1, 1, 100, 1, 1, 2, true, whitelist, false, 1, decommissioning, 9, false, false, Integer.MAX_VALUE);
+    assertConfig(
+        actual,
+        1,
+        1,
+        1,
+        1,
+        100,
+        1,
+        1,
+        2,
+        true,
+        whitelist,
+        false,
+        1,
+        decommissioning,
+        9,
+        false,
+        false,
+        Integer.MAX_VALUE
+    );
   }
 
   @Test
@@ -301,7 +513,26 @@ public class CoordinatorDynamicConfigTest
         CoordinatorDynamicConfig.class
     );
 
-    assertConfig(actual, 1, 1, 1, 1, 1, 1, 1, 2, true, ImmutableSet.of(), true, 1, ImmutableSet.of(), 0, false, false, Integer.MAX_VALUE);
+    assertConfig(
+        actual,
+        1,
+        1,
+        1,
+        1,
+        1,
+        1,
+        1,
+        2,
+        true,
+        ImmutableSet.of(),
+        true,
+        1,
+        ImmutableSet.of(),
+        0,
+        false,
+        false,
+        Integer.MAX_VALUE
+    );
 
     //ensure whitelist is empty when killAllDataSources is true
     try {
@@ -348,7 +579,26 @@ public class CoordinatorDynamicConfigTest
         CoordinatorDynamicConfig.class
     );
 
-    assertConfig(actual, 1, 1, 1, 1, 1, 1, 1, 2, true, ImmutableSet.of(), true, 0, ImmutableSet.of(), 0, false, false, Integer.MAX_VALUE);
+    assertConfig(
+        actual,
+        1,
+        1,
+        1,
+        1,
+        1,
+        1,
+        1,
+        2,
+        true,
+        ImmutableSet.of(),
+        true,
+        EXPECTED_DEFAULT_MAX_SEGMENTS_IN_NODE_LOADING_QUEUE,
+        ImmutableSet.of(),
+        0,
+        false,
+        false,
+        Integer.MAX_VALUE
+    );
   }
 
   @Test
@@ -369,7 +619,7 @@ public class CoordinatorDynamicConfigTest
         false,
         emptyList,
         false,
-        0,
+        EXPECTED_DEFAULT_MAX_SEGMENTS_IN_NODE_LOADING_QUEUE,
         emptyList,
         70,
         false,
@@ -388,9 +638,27 @@ public class CoordinatorDynamicConfigTest
 
     Assert.assertEquals(
         current,
-        new CoordinatorDynamicConfig
-            .Builder(null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null)
-            .build(current)
+        new CoordinatorDynamicConfig.Builder(
+            null,
+            null,
+            null,
+            null,
+            null,
+            null,
+            null,
+            null,
+            null,
+            null,
+            null,
+            null,
+            null,
+            null,
+            null,
+            null,
+            null,
+            null,
+            null
+        ).build(current)
     );
   }
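
Illustrative note (not part of the patch): the @Nullable Integer change in the
@JsonCreator is what makes the new default apply to previously stored dynamic
configs that simply omit the key. A minimal sketch of that behavior, using the
same TestHelper the test file imports; the wrapper class MaxSegmentsDefaultDemo
is hypothetical and only exists to make the snippet runnable:

    import com.fasterxml.jackson.databind.ObjectMapper;
    import org.apache.druid.segment.TestHelper;
    import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;

    public class MaxSegmentsDefaultDemo
    {
      public static void main(String[] args) throws Exception
      {
        // A dynamic config stored before this change typically has no
        // maxSegmentsInNodeLoadingQueue key; an empty JSON object stands in
        // for that here.
        ObjectMapper mapper = TestHelper.makeJsonMapper();
        CoordinatorDynamicConfig config =
            mapper.readValue("{}", CoordinatorDynamicConfig.class);

        // The constructor now maps the missing (null) Integer to
        // Builder.DEFAULT_MAX_SEGMENTS_IN_NODE_LOADING_QUEUE, so this prints
        // 100 rather than the old unbounded default of 0.
        System.out.println(config.getMaxSegmentsInNodeLoadingQueue());
      }
    }

Operators who relied on the old unbounded queue can still restore it by
posting an explicit {"maxSegmentsInNodeLoadingQueue": 0} to the Coordinator's
dynamic config endpoint (/druid/coordinator/v1/config); only a missing value
falls back to the new default, as exercised by the Builder.build(current)
calls in the tests above.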