diff --git a/docs/configuration/index.md b/docs/configuration/index.md index ac439660214..1e952b8ee7f 100644 --- a/docs/configuration/index.md +++ b/docs/configuration/index.md @@ -741,7 +741,8 @@ A sample Coordinator dynamic config JSON object is shown below: "emitBalancingStats": false, "killDataSourceWhitelist": ["wikipedia", "testDatasource"], "decommissioningNodes": ["localhost:8182", "localhost:8282"], - "decommissioningMaxPercentOfMaxSegmentsToMove": 70 + "decommissioningMaxPercentOfMaxSegmentsToMove": 70, + "pauseCoordinator": false } ``` @@ -763,6 +764,8 @@ Issuing a GET request at the same URL will return the spec that is currently in |`maxSegmentsInNodeLoadingQueue`|The maximum number of segments that could be queued for loading to any given server. This parameter could be used to speed up segments loading process, especially if there are "slow" nodes in the cluster (with low loading speed) or if too much segments scheduled to be replicated to some particular node (faster loading could be preferred to better segments distribution). Desired value depends on segments loading speed, acceptable replication time and number of nodes. Value 1000 could be a start point for a rather big cluster. Default value is 0 (loading queue is unbounded) |0| |`decommissioningNodes`| List of historical servers to 'decommission'. Coordinator will not assign new segments to 'decommissioning' servers, and segments will be moved away from them to be placed on non-decommissioning servers at the maximum rate specified by `decommissioningMaxPercentOfMaxSegmentsToMove`.|none| |`decommissioningMaxPercentOfMaxSegmentsToMove`| The maximum number of segments that may be moved away from 'decommissioning' servers to non-decommissioning (that is, active) servers during one Coordinator run. This value is relative to the total maximum segment movements allowed during one run which is determined by `maxSegmentsToMove`. If `decommissioningMaxPercentOfMaxSegmentsToMove` is 0, segments will neither be moved from _or to_ 'decommissioning' servers, effectively putting them in a sort of "maintenance" mode that will not participate in balancing or assignment by load rules. Decommissioning can also become stalled if there are no available active servers to place the segments. By leveraging the maximum percent of decommissioning segment movements, an operator can prevent active servers from overload by prioritizing balancing, or decrease decommissioning time instead. The value should be between 0 and 100.|70| +|`pauseCoordinator`| Boolean flag for whether or not the coordinator should execute its various duties of coordinating the cluster. Setting this to true essentially pauses all coordination work while allowing the API to remain up. Duties that are paused include all classes that implement the `CoordinatorDuty` Interface. Such duties include: Segment balancing, Segment compaction, Emission of metrics controlled by the dynamic coordinator config `emitBalancingStats`, Submitting kill tasks for unused segments (if enabled), Logging of used segments in the cluster, Marking of newly unused or overshadowed segments, Matching and execution of load/drop rules for used segments, Unloading segments that are no longer marked as used from Historical servers. An example of when an admin may want to pause coordination would be if they are doing deep storage maintenance on HDFS Name Nodes with downtime and don't want the coordinator to be directing Historical Nodes to hit the Name Node with API requests until maintenance is done and the deep store is declared healthy for use again. |false| + To view the audit history of Coordinator dynamic config issue a GET request to the URL - @@ -1839,4 +1842,4 @@ Supported query contexts: |`druid.router.http.readTimeout`|The timeout for data reads from Broker processes.|`PT15M`| |`druid.router.http.numMaxThreads`|Maximum number of worker threads to handle HTTP requests and responses|`max(10, ((number of cores * 17) / 16 + 2) + 30)`| |`druid.router.http.numRequestsQueued`|Maximum number of requests that may be queued to a destination|`1024`| -|`druid.router.http.requestBuffersize`|Size of the content buffer for receiving requests. These buffers are only used for active connections that have requests with bodies that will not fit within the header buffer|`8 * 1024`| \ No newline at end of file +|`druid.router.http.requestBuffersize`|Size of the content buffer for receiving requests. These buffers are only used for active connections that have requests with bodies that will not fit within the header buffer|`8 * 1024`| diff --git a/integration-tests/src/main/java/org/apache/druid/testing/clients/CoordinatorResourceTestClient.java b/integration-tests/src/main/java/org/apache/druid/testing/clients/CoordinatorResourceTestClient.java index 7e0adff6255..b6d5e28801c 100644 --- a/integration-tests/src/main/java/org/apache/druid/testing/clients/CoordinatorResourceTestClient.java +++ b/integration-tests/src/main/java/org/apache/druid/testing/clients/CoordinatorResourceTestClient.java @@ -32,6 +32,7 @@ import org.apache.druid.java.util.http.client.Request; import org.apache.druid.java.util.http.client.response.StatusResponseHandler; import org.apache.druid.java.util.http.client.response.StatusResponseHolder; import org.apache.druid.query.lookup.LookupsState; +import org.apache.druid.server.coordinator.CoordinatorDynamicConfig; import org.apache.druid.server.lookup.cache.LookupExtractorFactoryMapContainer; import org.apache.druid.testing.IntegrationTestingConfig; import org.apache.druid.testing.guice.TestClient; @@ -295,6 +296,41 @@ public class CoordinatorResourceTestClient return isLoaded; } + public void postDynamicConfig(CoordinatorDynamicConfig coordinatorDynamicConfig) throws Exception + { + String url = StringUtils.format("%sconfig", getCoordinatorURL()); + StatusResponseHolder response = httpClient.go( + new Request(HttpMethod.POST, new URL(url)).setContent( + "application/json", + jsonMapper.writeValueAsBytes(coordinatorDynamicConfig) + ), responseHandler + ).get(); + + if (!response.getStatus().equals(HttpResponseStatus.OK)) { + throw new ISE( + "Error while setting dynamic config[%s] status[%s] content[%s]", + url, + response.getStatus(), + response.getContent() + ); + } + } + + public CoordinatorDynamicConfig getDynamicConfig() + { + String url = StringUtils.format("%sconfig", getCoordinatorURL()); + CoordinatorDynamicConfig config; + + try { + StatusResponseHolder response = makeRequest(HttpMethod.GET, url); + config = jsonMapper.readValue(response.getContent(), CoordinatorDynamicConfig.class); + } + catch (Exception e) { + throw new RuntimeException(e); + } + return config; + } + private StatusResponseHolder makeRequest(HttpMethod method, String url) { try { diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITBatchIndexTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITBatchIndexTest.java index 9fc01a7451d..e8cf277a075 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITBatchIndexTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITBatchIndexTest.java @@ -62,10 +62,12 @@ public abstract class AbstractITBatchIndexTest extends AbstractIndexerTest String dataSource, String indexTaskFilePath, String queryFilePath, - boolean waitForNewVersion + boolean waitForNewVersion, + boolean runTestQueries, + boolean waitForSegmentsToLoad ) throws IOException { - doIndexTest(dataSource, indexTaskFilePath, Function.identity(), queryFilePath, waitForNewVersion); + doIndexTest(dataSource, indexTaskFilePath, Function.identity(), queryFilePath, waitForNewVersion, runTestQueries, waitForSegmentsToLoad); } void doIndexTest( @@ -73,7 +75,9 @@ public abstract class AbstractITBatchIndexTest extends AbstractIndexerTest String indexTaskFilePath, Function taskSpecTransform, String queryFilePath, - boolean waitForNewVersion + boolean waitForNewVersion, + boolean runTestQueries, + boolean waitForSegmentsToLoad ) throws IOException { final String fullDatasourceName = dataSource + config.getExtraDatasourceNameSuffix(); @@ -85,29 +89,31 @@ public abstract class AbstractITBatchIndexTest extends AbstractIndexerTest ) ); - submitTaskAndWait(taskSpec, fullDatasourceName, waitForNewVersion); - try { - - String queryResponseTemplate; + submitTaskAndWait(taskSpec, fullDatasourceName, waitForNewVersion, waitForSegmentsToLoad); + if (runTestQueries) { try { - InputStream is = AbstractITBatchIndexTest.class.getResourceAsStream(queryFilePath); - queryResponseTemplate = IOUtils.toString(is, StandardCharsets.UTF_8); - } - catch (IOException e) { - throw new ISE(e, "could not read query file: %s", queryFilePath); - } - queryResponseTemplate = StringUtils.replace( - queryResponseTemplate, - "%%DATASOURCE%%", - fullDatasourceName - ); - queryHelper.testQueriesFromString(queryResponseTemplate, 2); + String queryResponseTemplate; + try { + InputStream is = AbstractITBatchIndexTest.class.getResourceAsStream(queryFilePath); + queryResponseTemplate = IOUtils.toString(is, StandardCharsets.UTF_8); + } + catch (IOException e) { + throw new ISE(e, "could not read query file: %s", queryFilePath); + } - } - catch (Exception e) { - LOG.error(e, "Error while testing"); - throw new RuntimeException(e); + queryResponseTemplate = StringUtils.replace( + queryResponseTemplate, + "%%DATASOURCE%%", + fullDatasourceName + ); + queryHelper.testQueriesFromString(queryResponseTemplate, 2); + + } + catch (Exception e) { + LOG.error(e, "Error while testing"); + throw new RuntimeException(e); + } } } @@ -146,7 +152,7 @@ public abstract class AbstractITBatchIndexTest extends AbstractIndexerTest taskSpec = taskSpecTransform.apply(taskSpec); - submitTaskAndWait(taskSpec, fullReindexDatasourceName, false); + submitTaskAndWait(taskSpec, fullReindexDatasourceName, false, true); try { String queryResponseTemplate; try { @@ -190,7 +196,7 @@ public abstract class AbstractITBatchIndexTest extends AbstractIndexerTest fullDatasourceName ); - submitTaskAndWait(taskSpec, fullDatasourceName, false); + submitTaskAndWait(taskSpec, fullDatasourceName, false, true); try { sqlQueryHelper.testQueriesFromFile(queryFilePath, 2); } @@ -200,7 +206,12 @@ public abstract class AbstractITBatchIndexTest extends AbstractIndexerTest } } - private void submitTaskAndWait(String taskSpec, String dataSourceName, boolean waitForNewVersion) + private void submitTaskAndWait( + String taskSpec, + String dataSourceName, + boolean waitForNewVersion, + boolean waitForSegmentsToLoad + ) { final List oldVersions = waitForNewVersion ? coordinator.getAvailableSegments(dataSourceName) : null; @@ -249,9 +260,11 @@ public abstract class AbstractITBatchIndexTest extends AbstractIndexerTest ); } - ITRetryUtil.retryUntilTrue( - () -> coordinator.areSegmentsLoaded(dataSourceName), "Segment Load" - ); + if (waitForSegmentsToLoad) { + ITRetryUtil.retryUntilTrue( + () -> coordinator.areSegmentsLoaded(dataSourceName), "Segment Load" + ); + } } private long countCompleteSubTasks(final String dataSource, final boolean perfectRollup) diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITBestEffortRollupParallelIndexTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITBestEffortRollupParallelIndexTest.java index 0c975b208e6..0b14f92dc56 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITBestEffortRollupParallelIndexTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITBestEffortRollupParallelIndexTest.java @@ -89,7 +89,9 @@ public class ITBestEffortRollupParallelIndexTest extends AbstractITBatchIndexTes INDEX_TASK, rollupTransform, INDEX_QUERIES_RESOURCE, - false + false, + true, + true ); // Index again, this time only choosing the second data file, and without explicit intervals chosen. @@ -99,6 +101,8 @@ public class ITBestEffortRollupParallelIndexTest extends AbstractITBatchIndexTes REINDEX_TASK, rollupTransform, REINDEX_QUERIES_RESOURCE, + true, + true, true ); diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITIndexerTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITIndexerTest.java index 8feae3ac1b8..6ff804ea0e2 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITIndexerTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITIndexerTest.java @@ -49,7 +49,9 @@ public class ITIndexerTest extends AbstractITBatchIndexTest INDEX_DATASOURCE, INDEX_TASK, INDEX_QUERIES_RESOURCE, - false + false, + true, + true ); doReindexTest( INDEX_DATASOURCE, diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITPerfectRollupParallelIndexTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITPerfectRollupParallelIndexTest.java index 03442032de0..067c2245568 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITPerfectRollupParallelIndexTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITPerfectRollupParallelIndexTest.java @@ -89,7 +89,9 @@ public class ITPerfectRollupParallelIndexTest extends AbstractITBatchIndexTest INDEX_TASK, rollupTransform, INDEX_QUERIES_RESOURCE, - false + false, + true, + true ); doReindexTest( diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITTestCoordinatorPausedTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITTestCoordinatorPausedTest.java new file mode 100644 index 00000000000..7d34b47d706 --- /dev/null +++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITTestCoordinatorPausedTest.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.tests.indexer; + +import com.google.inject.Inject; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.server.coordinator.CoordinatorDynamicConfig; +import org.apache.druid.testing.clients.CoordinatorResourceTestClient; +import org.apache.druid.testing.guice.DruidTestModuleFactory; +import org.apache.druid.testing.utils.ITRetryUtil; +import org.testng.annotations.Guice; +import org.testng.annotations.Test; + +import java.io.Closeable; +import java.util.concurrent.TimeUnit; + +@Guice(moduleFactory = DruidTestModuleFactory.class) +public class ITTestCoordinatorPausedTest extends AbstractITBatchIndexTest +{ + private static final Logger LOG = new Logger(ITUnionQueryTest.class); + private static final String INDEX_DATASOURCE = "wikipedia_index_test"; + private static final String INDEX_TASK = "/indexer/wikipedia_index_task.json"; + private static final String INDEX_QUERIES_RESOURCE = "/indexer/wikipedia_index_queries.json"; + private static final CoordinatorDynamicConfig DYNAMIC_CONFIG_PAUSED = + CoordinatorDynamicConfig.builder().withPauseCoordination(true).build(); + private static final CoordinatorDynamicConfig DYNAMIC_CONFIG_DEFAULT = + CoordinatorDynamicConfig.builder().build(); + + @Inject + CoordinatorResourceTestClient coordinatorClient; + + @Test + public void testCoordinatorPause() throws Exception + { + try ( + final Closeable ignored1 = unloader(INDEX_DATASOURCE + config.getExtraDatasourceNameSuffix()) + ) { + coordinatorClient.postDynamicConfig(DYNAMIC_CONFIG_PAUSED); + doIndexTest( + INDEX_DATASOURCE, + INDEX_TASK, + INDEX_QUERIES_RESOURCE, + false, + false, + false + ); + TimeUnit.MINUTES.sleep(3); + if (coordinatorClient.areSegmentsLoaded(INDEX_DATASOURCE)) { + throw new IllegalStateException("Segments Were Loaded Early!"); + } + coordinatorClient.postDynamicConfig(DYNAMIC_CONFIG_DEFAULT); + ITRetryUtil.retryUntilTrue( + () -> coordinator.areSegmentsLoaded(INDEX_DATASOURCE + config.getExtraDatasourceNameSuffix()), "Segment Load" + ); + } + } +} diff --git a/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java b/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java index 7f24a53840a..bafeb73c508 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java @@ -84,6 +84,7 @@ public class CoordinatorDynamicConfig * See {@link LoadQueuePeon}, {@link org.apache.druid.server.coordinator.rules.LoadRule#run} */ private final int maxSegmentsInNodeLoadingQueue; + private final boolean pauseCoordination; @JsonCreator public CoordinatorDynamicConfig( @@ -113,7 +114,8 @@ public class CoordinatorDynamicConfig @JsonProperty("killPendingSegmentsSkipList") Object dataSourcesToNotKillStalePendingSegmentsIn, @JsonProperty("maxSegmentsInNodeLoadingQueue") int maxSegmentsInNodeLoadingQueue, @JsonProperty("decommissioningNodes") Object decommissioningNodes, - @JsonProperty("decommissioningMaxPercentOfMaxSegmentsToMove") int decommissioningMaxPercentOfMaxSegmentsToMove + @JsonProperty("decommissioningMaxPercentOfMaxSegmentsToMove") int decommissioningMaxPercentOfMaxSegmentsToMove, + @JsonProperty("pauseCoordination") boolean pauseCoordination ) { this.leadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments = @@ -142,6 +144,7 @@ public class CoordinatorDynamicConfig "can't have killUnusedSegmentsInAllDataSources and non-empty specificDataSourcesToKillUnusedSegmentsIn" ); } + this.pauseCoordination = pauseCoordination; } private static Set parseJsonStringOrArray(Object jsonStringOrArray) @@ -284,6 +287,12 @@ public class CoordinatorDynamicConfig return decommissioningMaxPercentOfMaxSegmentsToMove; } + @JsonProperty + public boolean getPauseCoordination() + { + return pauseCoordination; + } + @Override public String toString() { @@ -303,6 +312,7 @@ public class CoordinatorDynamicConfig ", maxSegmentsInNodeLoadingQueue=" + maxSegmentsInNodeLoadingQueue + ", decommissioningNodes=" + decommissioningNodes + ", decommissioningMaxPercentOfMaxSegmentsToMove=" + decommissioningMaxPercentOfMaxSegmentsToMove + + ", pauseCoordination=" + pauseCoordination + '}'; } @@ -358,6 +368,9 @@ public class CoordinatorDynamicConfig if (!Objects.equals(decommissioningNodes, that.decommissioningNodes)) { return false; } + if (pauseCoordination != that.pauseCoordination) { + return false; + } return decommissioningMaxPercentOfMaxSegmentsToMove == that.decommissioningMaxPercentOfMaxSegmentsToMove; } @@ -378,7 +391,8 @@ public class CoordinatorDynamicConfig specificDataSourcesToKillUnusedSegmentsIn, dataSourcesToNotKillStalePendingSegmentsIn, decommissioningNodes, - decommissioningMaxPercentOfMaxSegmentsToMove + decommissioningMaxPercentOfMaxSegmentsToMove, + pauseCoordination ); } @@ -401,6 +415,7 @@ public class CoordinatorDynamicConfig private static final boolean DEFAULT_KILL_UNUSED_SEGMENTS_IN_ALL_DATA_SOURCES = false; private static final int DEFAULT_MAX_SEGMENTS_IN_NODE_LOADING_QUEUE = 0; private static final int DEFAULT_DECOMMISSIONING_MAX_SEGMENTS_TO_MOVE_PERCENT = 70; + private static final boolean DEFAULT_PAUSE_COORDINATION = false; private Long leadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments; private Long mergeBytesLimit; @@ -416,6 +431,7 @@ public class CoordinatorDynamicConfig private Integer maxSegmentsInNodeLoadingQueue; private Object decommissioningNodes; private Integer decommissioningMaxPercentOfMaxSegmentsToMove; + private Boolean pauseCoordination; public Builder() { @@ -438,7 +454,8 @@ public class CoordinatorDynamicConfig @JsonProperty("maxSegmentsInNodeLoadingQueue") @Nullable Integer maxSegmentsInNodeLoadingQueue, @JsonProperty("decommissioningNodes") @Nullable Object decommissioningNodes, @JsonProperty("decommissioningMaxPercentOfMaxSegmentsToMove") - @Nullable Integer decommissioningMaxPercentOfMaxSegmentsToMove + @Nullable Integer decommissioningMaxPercentOfMaxSegmentsToMove, + @JsonProperty("pauseCoordination") @Nullable Boolean pauseCoordination ) { this.leadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments = @@ -456,6 +473,7 @@ public class CoordinatorDynamicConfig this.maxSegmentsInNodeLoadingQueue = maxSegmentsInNodeLoadingQueue; this.decommissioningNodes = decommissioningNodes; this.decommissioningMaxPercentOfMaxSegmentsToMove = decommissioningMaxPercentOfMaxSegmentsToMove; + this.pauseCoordination = pauseCoordination; } public Builder withLeadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments(long leadingTimeMillis) @@ -536,6 +554,12 @@ public class CoordinatorDynamicConfig return this; } + public Builder withPauseCoordination(boolean pauseCoordination) + { + this.pauseCoordination = pauseCoordination; + return this; + } + public CoordinatorDynamicConfig build() { return new CoordinatorDynamicConfig( @@ -560,7 +584,8 @@ public class CoordinatorDynamicConfig decommissioningNodes, decommissioningMaxPercentOfMaxSegmentsToMove == null ? DEFAULT_DECOMMISSIONING_MAX_SEGMENTS_TO_MOVE_PERCENT - : decommissioningMaxPercentOfMaxSegmentsToMove + : decommissioningMaxPercentOfMaxSegmentsToMove, + pauseCoordination == null ? DEFAULT_PAUSE_COORDINATION : pauseCoordination ); } @@ -592,7 +617,8 @@ public class CoordinatorDynamicConfig decommissioningNodes == null ? defaults.getDecommissioningNodes() : decommissioningNodes, decommissioningMaxPercentOfMaxSegmentsToMove == null ? defaults.getDecommissioningMaxPercentOfMaxSegmentsToMove() - : decommissioningMaxPercentOfMaxSegmentsToMove + : decommissioningMaxPercentOfMaxSegmentsToMove, + pauseCoordination == null ? defaults.getPauseCoordination() : pauseCoordination ); } } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java index 539ba869ffb..72c648d8488 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java @@ -678,9 +678,23 @@ public class DruidCoordinator .withEmitter(emitter) .withBalancerStrategy(balancerStrategy) .build(); + + boolean coordinationPaused = getDynamicConfigs().getPauseCoordination(); + if (coordinationPaused + && coordLeaderSelector.isLeader() + && startingLeaderCounter == coordLeaderSelector.localTerm()) { + + log.debug( + "Coordination is paused via dynamic configs! I will not be running Coordination Duties at this time" + ); + } + for (CoordinatorDuty duty : duties) { // Don't read state and run state in the same duty otherwise racy conditions may exist - if (coordLeaderSelector.isLeader() && startingLeaderCounter == coordLeaderSelector.localTerm()) { + if (!coordinationPaused + && coordLeaderSelector.isLeader() + && startingLeaderCounter == coordLeaderSelector.localTerm()) { + params = duty.run(params); if (params == null) { diff --git a/server/src/test/java/org/apache/druid/server/coordinator/RunRulesTest.java b/server/src/test/java/org/apache/druid/server/coordinator/RunRulesTest.java index f5582550d8e..96f38c3a852 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/RunRulesTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/RunRulesTest.java @@ -1098,6 +1098,7 @@ public class RunRulesTest .withSpecificDataSourcesToKillUnusedSegmentsIn(null) .withKillUnusedSegmentsInAllDataSources(false) .withMaxSegmentsInNodeLoadingQueue(1000) + .withPauseCoordination(false) .build(); } } diff --git a/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java b/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java index f678364b38b..0050c0e0739 100644 --- a/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java +++ b/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java @@ -51,7 +51,8 @@ public class CoordinatorDynamicConfigTest + " \"killDataSourceWhitelist\": [\"test1\",\"test2\"],\n" + " \"maxSegmentsInNodeLoadingQueue\": 1,\n" + " \"decommissioningNodes\": [\"host1\", \"host2\"],\n" - + " \"decommissioningMaxPercentOfMaxSegmentsToMove\": 9\n" + + " \"decommissioningMaxPercentOfMaxSegmentsToMove\": 9,\n" + + " \"pauseCoordination\": false\n" + "}\n"; CoordinatorDynamicConfig actual = mapper.readValue( @@ -65,13 +66,16 @@ public class CoordinatorDynamicConfigTest ); ImmutableSet decommissioning = ImmutableSet.of("host1", "host2"); ImmutableSet whitelist = ImmutableSet.of("test1", "test2"); - assertConfig(actual, 1, 1, 1, 1, 1, 1, 2, true, whitelist, false, 1, decommissioning, 9); + assertConfig(actual, 1, 1, 1, 1, 1, 1, 2, true, whitelist, false, 1, decommissioning, 9, false); actual = CoordinatorDynamicConfig.builder().withDecommissioningNodes(ImmutableSet.of("host1")).build(actual); - assertConfig(actual, 1, 1, 1, 1, 1, 1, 2, true, whitelist, false, 1, ImmutableSet.of("host1"), 9); + assertConfig(actual, 1, 1, 1, 1, 1, 1, 2, true, whitelist, false, 1, ImmutableSet.of("host1"), 9, false); actual = CoordinatorDynamicConfig.builder().withDecommissioningMaxPercentOfMaxSegmentsToMove(5).build(actual); - assertConfig(actual, 1, 1, 1, 1, 1, 1, 2, true, whitelist, false, 1, ImmutableSet.of("host1"), 5); + assertConfig(actual, 1, 1, 1, 1, 1, 1, 2, true, whitelist, false, 1, ImmutableSet.of("host1"), 5, false); + + actual = CoordinatorDynamicConfig.builder().withPauseCoordination(true).build(actual); + assertConfig(actual, 1, 1, 1, 1, 1, 1, 2, true, whitelist, false, 1, ImmutableSet.of("host1"), 5, true); } @Test @@ -101,13 +105,13 @@ public class CoordinatorDynamicConfigTest ); ImmutableSet decommissioning = ImmutableSet.of(); ImmutableSet whitelist = ImmutableSet.of("test1", "test2"); - assertConfig(actual, 1, 1, 1, 1, 1, 1, 2, true, whitelist, false, 1, decommissioning, 0); + assertConfig(actual, 1, 1, 1, 1, 1, 1, 2, true, whitelist, false, 1, decommissioning, 0, false); actual = CoordinatorDynamicConfig.builder().withDecommissioningNodes(ImmutableSet.of("host1")).build(actual); - assertConfig(actual, 1, 1, 1, 1, 1, 1, 2, true, whitelist, false, 1, ImmutableSet.of("host1"), 0); + assertConfig(actual, 1, 1, 1, 1, 1, 1, 2, true, whitelist, false, 1, ImmutableSet.of("host1"), 0, false); actual = CoordinatorDynamicConfig.builder().withDecommissioningMaxPercentOfMaxSegmentsToMove(5).build(actual); - assertConfig(actual, 1, 1, 1, 1, 1, 1, 2, true, whitelist, false, 1, ImmutableSet.of("host1"), 5); + assertConfig(actual, 1, 1, 1, 1, 1, 1, 2, true, whitelist, false, 1, ImmutableSet.of("host1"), 5, false); } @Test @@ -135,7 +139,7 @@ public class CoordinatorDynamicConfigTest ), CoordinatorDynamicConfig.class ); - assertConfig(actual, 1, 1, 1, 1, 1, 1, 2, true, ImmutableSet.of("test1", "test2"), false, 1, ImmutableSet.of(), 0); + assertConfig(actual, 1, 1, 1, 1, 1, 1, 2, true, ImmutableSet.of("test1", "test2"), false, 1, ImmutableSet.of(), 0, false); } @Test @@ -164,7 +168,7 @@ public class CoordinatorDynamicConfigTest CoordinatorDynamicConfig.class ); - assertConfig(actual, 1, 1, 1, 1, 1, 1, 2, true, ImmutableSet.of(), true, 1, ImmutableSet.of(), 0); + assertConfig(actual, 1, 1, 1, 1, 1, 1, 2, true, ImmutableSet.of(), true, 1, ImmutableSet.of(), 0, false); //ensure whitelist is empty when killAllDataSources is true try { @@ -209,7 +213,7 @@ public class CoordinatorDynamicConfigTest CoordinatorDynamicConfig.class ); - assertConfig(actual, 1, 1, 1, 1, 1, 1, 2, true, ImmutableSet.of(), true, 0, ImmutableSet.of(), 0); + assertConfig(actual, 1, 1, 1, 1, 1, 1, 2, true, ImmutableSet.of(), true, 0, ImmutableSet.of(), 0, false); } @Test @@ -217,7 +221,7 @@ public class CoordinatorDynamicConfigTest { CoordinatorDynamicConfig defaultConfig = CoordinatorDynamicConfig.builder().build(); ImmutableSet emptyList = ImmutableSet.of(); - assertConfig(defaultConfig, 900000, 524288000, 100, 5, 15, 10, 1, false, emptyList, false, 0, emptyList, 70); + assertConfig(defaultConfig, 900000, 524288000, 100, 5, 15, 10, 1, false, emptyList, false, 0, emptyList, 70, false); } @Test @@ -231,7 +235,7 @@ public class CoordinatorDynamicConfigTest Assert.assertEquals( current, new CoordinatorDynamicConfig - .Builder(null, null, null, null, null, null, null, null, null, null, null, null, null, null) + .Builder(null, null, null, null, null, null, null, null, null, null, null, null, null, null, null) .build(current) ); } @@ -259,7 +263,8 @@ public class CoordinatorDynamicConfigTest boolean expectedKillUnusedSegmentsInAllDataSources, int expectedMaxSegmentsInNodeLoadingQueue, Set decommissioningNodes, - int decommissioningMaxPercentOfMaxSegmentsToMove + int decommissioningMaxPercentOfMaxSegmentsToMove, + boolean pauseCoordination ) { Assert.assertEquals( @@ -284,5 +289,6 @@ public class CoordinatorDynamicConfigTest decommissioningMaxPercentOfMaxSegmentsToMove, config.getDecommissioningMaxPercentOfMaxSegmentsToMove() ); + Assert.assertEquals(pauseCoordination, config.getPauseCoordination()); } }