mirror of https://github.com/apache/druid.git
Create new dynamic config to pause coordinator helpers when needed (#9224)
* Create new dynamic config to pause coordinator helpers when needed * Fix spelling mistakes flagged in Travis build * Add an integration test for coordinator pause dynamic config * Improve documentation for new dynamic coordinator config and remove un-needed info logs in favor of debug * address naming convention of 'deep store' vs 'deep storage' in new configs doc line * Fix newline at end of configuration index.md * Last try to resolve newline issue in configuration readme * fix spell checks from travis build * Fix another flagges spelling error from Travis
This commit is contained in:
parent
98cefc61fa
commit
2e1dbe598c
|
@ -741,7 +741,8 @@ A sample Coordinator dynamic config JSON object is shown below:
|
|||
"emitBalancingStats": false,
|
||||
"killDataSourceWhitelist": ["wikipedia", "testDatasource"],
|
||||
"decommissioningNodes": ["localhost:8182", "localhost:8282"],
|
||||
"decommissioningMaxPercentOfMaxSegmentsToMove": 70
|
||||
"decommissioningMaxPercentOfMaxSegmentsToMove": 70,
|
||||
"pauseCoordinator": false
|
||||
}
|
||||
```
|
||||
|
||||
|
@ -763,6 +764,8 @@ Issuing a GET request at the same URL will return the spec that is currently in
|
|||
|`maxSegmentsInNodeLoadingQueue`|The maximum number of segments that could be queued for loading to any given server. This parameter could be used to speed up segments loading process, especially if there are "slow" nodes in the cluster (with low loading speed) or if too much segments scheduled to be replicated to some particular node (faster loading could be preferred to better segments distribution). Desired value depends on segments loading speed, acceptable replication time and number of nodes. Value 1000 could be a start point for a rather big cluster. Default value is 0 (loading queue is unbounded) |0|
|
||||
|`decommissioningNodes`| List of historical servers to 'decommission'. Coordinator will not assign new segments to 'decommissioning' servers, and segments will be moved away from them to be placed on non-decommissioning servers at the maximum rate specified by `decommissioningMaxPercentOfMaxSegmentsToMove`.|none|
|
||||
|`decommissioningMaxPercentOfMaxSegmentsToMove`| The maximum number of segments that may be moved away from 'decommissioning' servers to non-decommissioning (that is, active) servers during one Coordinator run. This value is relative to the total maximum segment movements allowed during one run which is determined by `maxSegmentsToMove`. If `decommissioningMaxPercentOfMaxSegmentsToMove` is 0, segments will neither be moved from _or to_ 'decommissioning' servers, effectively putting them in a sort of "maintenance" mode that will not participate in balancing or assignment by load rules. Decommissioning can also become stalled if there are no available active servers to place the segments. By leveraging the maximum percent of decommissioning segment movements, an operator can prevent active servers from overload by prioritizing balancing, or decrease decommissioning time instead. The value should be between 0 and 100.|70|
|
||||
|`pauseCoordinator`| Boolean flag for whether or not the coordinator should execute its various duties of coordinating the cluster. Setting this to true essentially pauses all coordination work while allowing the API to remain up. Duties that are paused include all classes that implement the `CoordinatorDuty` Interface. Such duties include: Segment balancing, Segment compaction, Emission of metrics controlled by the dynamic coordinator config `emitBalancingStats`, Submitting kill tasks for unused segments (if enabled), Logging of used segments in the cluster, Marking of newly unused or overshadowed segments, Matching and execution of load/drop rules for used segments, Unloading segments that are no longer marked as used from Historical servers. An example of when an admin may want to pause coordination would be if they are doing deep storage maintenance on HDFS Name Nodes with downtime and don't want the coordinator to be directing Historical Nodes to hit the Name Node with API requests until maintenance is done and the deep store is declared healthy for use again. |false|
|
||||
|
||||
|
||||
To view the audit history of Coordinator dynamic config issue a GET request to the URL -
|
||||
|
||||
|
|
|
@ -32,6 +32,7 @@ import org.apache.druid.java.util.http.client.Request;
|
|||
import org.apache.druid.java.util.http.client.response.StatusResponseHandler;
|
||||
import org.apache.druid.java.util.http.client.response.StatusResponseHolder;
|
||||
import org.apache.druid.query.lookup.LookupsState;
|
||||
import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;
|
||||
import org.apache.druid.server.lookup.cache.LookupExtractorFactoryMapContainer;
|
||||
import org.apache.druid.testing.IntegrationTestingConfig;
|
||||
import org.apache.druid.testing.guice.TestClient;
|
||||
|
@ -295,6 +296,41 @@ public class CoordinatorResourceTestClient
|
|||
return isLoaded;
|
||||
}
|
||||
|
||||
public void postDynamicConfig(CoordinatorDynamicConfig coordinatorDynamicConfig) throws Exception
|
||||
{
|
||||
String url = StringUtils.format("%sconfig", getCoordinatorURL());
|
||||
StatusResponseHolder response = httpClient.go(
|
||||
new Request(HttpMethod.POST, new URL(url)).setContent(
|
||||
"application/json",
|
||||
jsonMapper.writeValueAsBytes(coordinatorDynamicConfig)
|
||||
), responseHandler
|
||||
).get();
|
||||
|
||||
if (!response.getStatus().equals(HttpResponseStatus.OK)) {
|
||||
throw new ISE(
|
||||
"Error while setting dynamic config[%s] status[%s] content[%s]",
|
||||
url,
|
||||
response.getStatus(),
|
||||
response.getContent()
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
public CoordinatorDynamicConfig getDynamicConfig()
|
||||
{
|
||||
String url = StringUtils.format("%sconfig", getCoordinatorURL());
|
||||
CoordinatorDynamicConfig config;
|
||||
|
||||
try {
|
||||
StatusResponseHolder response = makeRequest(HttpMethod.GET, url);
|
||||
config = jsonMapper.readValue(response.getContent(), CoordinatorDynamicConfig.class);
|
||||
}
|
||||
catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
return config;
|
||||
}
|
||||
|
||||
private StatusResponseHolder makeRequest(HttpMethod method, String url)
|
||||
{
|
||||
try {
|
||||
|
|
|
@ -62,10 +62,12 @@ public abstract class AbstractITBatchIndexTest extends AbstractIndexerTest
|
|||
String dataSource,
|
||||
String indexTaskFilePath,
|
||||
String queryFilePath,
|
||||
boolean waitForNewVersion
|
||||
boolean waitForNewVersion,
|
||||
boolean runTestQueries,
|
||||
boolean waitForSegmentsToLoad
|
||||
) throws IOException
|
||||
{
|
||||
doIndexTest(dataSource, indexTaskFilePath, Function.identity(), queryFilePath, waitForNewVersion);
|
||||
doIndexTest(dataSource, indexTaskFilePath, Function.identity(), queryFilePath, waitForNewVersion, runTestQueries, waitForSegmentsToLoad);
|
||||
}
|
||||
|
||||
void doIndexTest(
|
||||
|
@ -73,7 +75,9 @@ public abstract class AbstractITBatchIndexTest extends AbstractIndexerTest
|
|||
String indexTaskFilePath,
|
||||
Function<String, String> taskSpecTransform,
|
||||
String queryFilePath,
|
||||
boolean waitForNewVersion
|
||||
boolean waitForNewVersion,
|
||||
boolean runTestQueries,
|
||||
boolean waitForSegmentsToLoad
|
||||
) throws IOException
|
||||
{
|
||||
final String fullDatasourceName = dataSource + config.getExtraDatasourceNameSuffix();
|
||||
|
@ -85,29 +89,31 @@ public abstract class AbstractITBatchIndexTest extends AbstractIndexerTest
|
|||
)
|
||||
);
|
||||
|
||||
submitTaskAndWait(taskSpec, fullDatasourceName, waitForNewVersion);
|
||||
try {
|
||||
|
||||
String queryResponseTemplate;
|
||||
submitTaskAndWait(taskSpec, fullDatasourceName, waitForNewVersion, waitForSegmentsToLoad);
|
||||
if (runTestQueries) {
|
||||
try {
|
||||
InputStream is = AbstractITBatchIndexTest.class.getResourceAsStream(queryFilePath);
|
||||
queryResponseTemplate = IOUtils.toString(is, StandardCharsets.UTF_8);
|
||||
}
|
||||
catch (IOException e) {
|
||||
throw new ISE(e, "could not read query file: %s", queryFilePath);
|
||||
}
|
||||
|
||||
queryResponseTemplate = StringUtils.replace(
|
||||
queryResponseTemplate,
|
||||
"%%DATASOURCE%%",
|
||||
fullDatasourceName
|
||||
);
|
||||
queryHelper.testQueriesFromString(queryResponseTemplate, 2);
|
||||
String queryResponseTemplate;
|
||||
try {
|
||||
InputStream is = AbstractITBatchIndexTest.class.getResourceAsStream(queryFilePath);
|
||||
queryResponseTemplate = IOUtils.toString(is, StandardCharsets.UTF_8);
|
||||
}
|
||||
catch (IOException e) {
|
||||
throw new ISE(e, "could not read query file: %s", queryFilePath);
|
||||
}
|
||||
|
||||
}
|
||||
catch (Exception e) {
|
||||
LOG.error(e, "Error while testing");
|
||||
throw new RuntimeException(e);
|
||||
queryResponseTemplate = StringUtils.replace(
|
||||
queryResponseTemplate,
|
||||
"%%DATASOURCE%%",
|
||||
fullDatasourceName
|
||||
);
|
||||
queryHelper.testQueriesFromString(queryResponseTemplate, 2);
|
||||
|
||||
}
|
||||
catch (Exception e) {
|
||||
LOG.error(e, "Error while testing");
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -146,7 +152,7 @@ public abstract class AbstractITBatchIndexTest extends AbstractIndexerTest
|
|||
|
||||
taskSpec = taskSpecTransform.apply(taskSpec);
|
||||
|
||||
submitTaskAndWait(taskSpec, fullReindexDatasourceName, false);
|
||||
submitTaskAndWait(taskSpec, fullReindexDatasourceName, false, true);
|
||||
try {
|
||||
String queryResponseTemplate;
|
||||
try {
|
||||
|
@ -190,7 +196,7 @@ public abstract class AbstractITBatchIndexTest extends AbstractIndexerTest
|
|||
fullDatasourceName
|
||||
);
|
||||
|
||||
submitTaskAndWait(taskSpec, fullDatasourceName, false);
|
||||
submitTaskAndWait(taskSpec, fullDatasourceName, false, true);
|
||||
try {
|
||||
sqlQueryHelper.testQueriesFromFile(queryFilePath, 2);
|
||||
}
|
||||
|
@ -200,7 +206,12 @@ public abstract class AbstractITBatchIndexTest extends AbstractIndexerTest
|
|||
}
|
||||
}
|
||||
|
||||
private void submitTaskAndWait(String taskSpec, String dataSourceName, boolean waitForNewVersion)
|
||||
private void submitTaskAndWait(
|
||||
String taskSpec,
|
||||
String dataSourceName,
|
||||
boolean waitForNewVersion,
|
||||
boolean waitForSegmentsToLoad
|
||||
)
|
||||
{
|
||||
final List<DataSegment> oldVersions = waitForNewVersion ? coordinator.getAvailableSegments(dataSourceName) : null;
|
||||
|
||||
|
@ -249,9 +260,11 @@ public abstract class AbstractITBatchIndexTest extends AbstractIndexerTest
|
|||
);
|
||||
}
|
||||
|
||||
ITRetryUtil.retryUntilTrue(
|
||||
() -> coordinator.areSegmentsLoaded(dataSourceName), "Segment Load"
|
||||
);
|
||||
if (waitForSegmentsToLoad) {
|
||||
ITRetryUtil.retryUntilTrue(
|
||||
() -> coordinator.areSegmentsLoaded(dataSourceName), "Segment Load"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
private long countCompleteSubTasks(final String dataSource, final boolean perfectRollup)
|
||||
|
|
|
@ -89,7 +89,9 @@ public class ITBestEffortRollupParallelIndexTest extends AbstractITBatchIndexTes
|
|||
INDEX_TASK,
|
||||
rollupTransform,
|
||||
INDEX_QUERIES_RESOURCE,
|
||||
false
|
||||
false,
|
||||
true,
|
||||
true
|
||||
);
|
||||
|
||||
// Index again, this time only choosing the second data file, and without explicit intervals chosen.
|
||||
|
@ -99,6 +101,8 @@ public class ITBestEffortRollupParallelIndexTest extends AbstractITBatchIndexTes
|
|||
REINDEX_TASK,
|
||||
rollupTransform,
|
||||
REINDEX_QUERIES_RESOURCE,
|
||||
true,
|
||||
true,
|
||||
true
|
||||
);
|
||||
|
||||
|
|
|
@ -49,7 +49,9 @@ public class ITIndexerTest extends AbstractITBatchIndexTest
|
|||
INDEX_DATASOURCE,
|
||||
INDEX_TASK,
|
||||
INDEX_QUERIES_RESOURCE,
|
||||
false
|
||||
false,
|
||||
true,
|
||||
true
|
||||
);
|
||||
doReindexTest(
|
||||
INDEX_DATASOURCE,
|
||||
|
|
|
@ -89,7 +89,9 @@ public class ITPerfectRollupParallelIndexTest extends AbstractITBatchIndexTest
|
|||
INDEX_TASK,
|
||||
rollupTransform,
|
||||
INDEX_QUERIES_RESOURCE,
|
||||
false
|
||||
false,
|
||||
true,
|
||||
true
|
||||
);
|
||||
|
||||
doReindexTest(
|
||||
|
|
|
@ -0,0 +1,74 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.druid.tests.indexer;
|
||||
|
||||
import com.google.inject.Inject;
|
||||
import org.apache.druid.java.util.common.logger.Logger;
|
||||
import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;
|
||||
import org.apache.druid.testing.clients.CoordinatorResourceTestClient;
|
||||
import org.apache.druid.testing.guice.DruidTestModuleFactory;
|
||||
import org.apache.druid.testing.utils.ITRetryUtil;
|
||||
import org.testng.annotations.Guice;
|
||||
import org.testng.annotations.Test;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
@Guice(moduleFactory = DruidTestModuleFactory.class)
|
||||
public class ITTestCoordinatorPausedTest extends AbstractITBatchIndexTest
|
||||
{
|
||||
private static final Logger LOG = new Logger(ITUnionQueryTest.class);
|
||||
private static final String INDEX_DATASOURCE = "wikipedia_index_test";
|
||||
private static final String INDEX_TASK = "/indexer/wikipedia_index_task.json";
|
||||
private static final String INDEX_QUERIES_RESOURCE = "/indexer/wikipedia_index_queries.json";
|
||||
private static final CoordinatorDynamicConfig DYNAMIC_CONFIG_PAUSED =
|
||||
CoordinatorDynamicConfig.builder().withPauseCoordination(true).build();
|
||||
private static final CoordinatorDynamicConfig DYNAMIC_CONFIG_DEFAULT =
|
||||
CoordinatorDynamicConfig.builder().build();
|
||||
|
||||
@Inject
|
||||
CoordinatorResourceTestClient coordinatorClient;
|
||||
|
||||
@Test
|
||||
public void testCoordinatorPause() throws Exception
|
||||
{
|
||||
try (
|
||||
final Closeable ignored1 = unloader(INDEX_DATASOURCE + config.getExtraDatasourceNameSuffix())
|
||||
) {
|
||||
coordinatorClient.postDynamicConfig(DYNAMIC_CONFIG_PAUSED);
|
||||
doIndexTest(
|
||||
INDEX_DATASOURCE,
|
||||
INDEX_TASK,
|
||||
INDEX_QUERIES_RESOURCE,
|
||||
false,
|
||||
false,
|
||||
false
|
||||
);
|
||||
TimeUnit.MINUTES.sleep(3);
|
||||
if (coordinatorClient.areSegmentsLoaded(INDEX_DATASOURCE)) {
|
||||
throw new IllegalStateException("Segments Were Loaded Early!");
|
||||
}
|
||||
coordinatorClient.postDynamicConfig(DYNAMIC_CONFIG_DEFAULT);
|
||||
ITRetryUtil.retryUntilTrue(
|
||||
() -> coordinator.areSegmentsLoaded(INDEX_DATASOURCE + config.getExtraDatasourceNameSuffix()), "Segment Load"
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -84,6 +84,7 @@ public class CoordinatorDynamicConfig
|
|||
* See {@link LoadQueuePeon}, {@link org.apache.druid.server.coordinator.rules.LoadRule#run}
|
||||
*/
|
||||
private final int maxSegmentsInNodeLoadingQueue;
|
||||
private final boolean pauseCoordination;
|
||||
|
||||
@JsonCreator
|
||||
public CoordinatorDynamicConfig(
|
||||
|
@ -113,7 +114,8 @@ public class CoordinatorDynamicConfig
|
|||
@JsonProperty("killPendingSegmentsSkipList") Object dataSourcesToNotKillStalePendingSegmentsIn,
|
||||
@JsonProperty("maxSegmentsInNodeLoadingQueue") int maxSegmentsInNodeLoadingQueue,
|
||||
@JsonProperty("decommissioningNodes") Object decommissioningNodes,
|
||||
@JsonProperty("decommissioningMaxPercentOfMaxSegmentsToMove") int decommissioningMaxPercentOfMaxSegmentsToMove
|
||||
@JsonProperty("decommissioningMaxPercentOfMaxSegmentsToMove") int decommissioningMaxPercentOfMaxSegmentsToMove,
|
||||
@JsonProperty("pauseCoordination") boolean pauseCoordination
|
||||
)
|
||||
{
|
||||
this.leadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments =
|
||||
|
@ -142,6 +144,7 @@ public class CoordinatorDynamicConfig
|
|||
"can't have killUnusedSegmentsInAllDataSources and non-empty specificDataSourcesToKillUnusedSegmentsIn"
|
||||
);
|
||||
}
|
||||
this.pauseCoordination = pauseCoordination;
|
||||
}
|
||||
|
||||
private static Set<String> parseJsonStringOrArray(Object jsonStringOrArray)
|
||||
|
@ -284,6 +287,12 @@ public class CoordinatorDynamicConfig
|
|||
return decommissioningMaxPercentOfMaxSegmentsToMove;
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
public boolean getPauseCoordination()
|
||||
{
|
||||
return pauseCoordination;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString()
|
||||
{
|
||||
|
@ -303,6 +312,7 @@ public class CoordinatorDynamicConfig
|
|||
", maxSegmentsInNodeLoadingQueue=" + maxSegmentsInNodeLoadingQueue +
|
||||
", decommissioningNodes=" + decommissioningNodes +
|
||||
", decommissioningMaxPercentOfMaxSegmentsToMove=" + decommissioningMaxPercentOfMaxSegmentsToMove +
|
||||
", pauseCoordination=" + pauseCoordination +
|
||||
'}';
|
||||
}
|
||||
|
||||
|
@ -358,6 +368,9 @@ public class CoordinatorDynamicConfig
|
|||
if (!Objects.equals(decommissioningNodes, that.decommissioningNodes)) {
|
||||
return false;
|
||||
}
|
||||
if (pauseCoordination != that.pauseCoordination) {
|
||||
return false;
|
||||
}
|
||||
return decommissioningMaxPercentOfMaxSegmentsToMove == that.decommissioningMaxPercentOfMaxSegmentsToMove;
|
||||
}
|
||||
|
||||
|
@ -378,7 +391,8 @@ public class CoordinatorDynamicConfig
|
|||
specificDataSourcesToKillUnusedSegmentsIn,
|
||||
dataSourcesToNotKillStalePendingSegmentsIn,
|
||||
decommissioningNodes,
|
||||
decommissioningMaxPercentOfMaxSegmentsToMove
|
||||
decommissioningMaxPercentOfMaxSegmentsToMove,
|
||||
pauseCoordination
|
||||
);
|
||||
}
|
||||
|
||||
|
@ -401,6 +415,7 @@ public class CoordinatorDynamicConfig
|
|||
private static final boolean DEFAULT_KILL_UNUSED_SEGMENTS_IN_ALL_DATA_SOURCES = false;
|
||||
private static final int DEFAULT_MAX_SEGMENTS_IN_NODE_LOADING_QUEUE = 0;
|
||||
private static final int DEFAULT_DECOMMISSIONING_MAX_SEGMENTS_TO_MOVE_PERCENT = 70;
|
||||
private static final boolean DEFAULT_PAUSE_COORDINATION = false;
|
||||
|
||||
private Long leadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments;
|
||||
private Long mergeBytesLimit;
|
||||
|
@ -416,6 +431,7 @@ public class CoordinatorDynamicConfig
|
|||
private Integer maxSegmentsInNodeLoadingQueue;
|
||||
private Object decommissioningNodes;
|
||||
private Integer decommissioningMaxPercentOfMaxSegmentsToMove;
|
||||
private Boolean pauseCoordination;
|
||||
|
||||
public Builder()
|
||||
{
|
||||
|
@ -438,7 +454,8 @@ public class CoordinatorDynamicConfig
|
|||
@JsonProperty("maxSegmentsInNodeLoadingQueue") @Nullable Integer maxSegmentsInNodeLoadingQueue,
|
||||
@JsonProperty("decommissioningNodes") @Nullable Object decommissioningNodes,
|
||||
@JsonProperty("decommissioningMaxPercentOfMaxSegmentsToMove")
|
||||
@Nullable Integer decommissioningMaxPercentOfMaxSegmentsToMove
|
||||
@Nullable Integer decommissioningMaxPercentOfMaxSegmentsToMove,
|
||||
@JsonProperty("pauseCoordination") @Nullable Boolean pauseCoordination
|
||||
)
|
||||
{
|
||||
this.leadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments =
|
||||
|
@ -456,6 +473,7 @@ public class CoordinatorDynamicConfig
|
|||
this.maxSegmentsInNodeLoadingQueue = maxSegmentsInNodeLoadingQueue;
|
||||
this.decommissioningNodes = decommissioningNodes;
|
||||
this.decommissioningMaxPercentOfMaxSegmentsToMove = decommissioningMaxPercentOfMaxSegmentsToMove;
|
||||
this.pauseCoordination = pauseCoordination;
|
||||
}
|
||||
|
||||
public Builder withLeadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments(long leadingTimeMillis)
|
||||
|
@ -536,6 +554,12 @@ public class CoordinatorDynamicConfig
|
|||
return this;
|
||||
}
|
||||
|
||||
public Builder withPauseCoordination(boolean pauseCoordination)
|
||||
{
|
||||
this.pauseCoordination = pauseCoordination;
|
||||
return this;
|
||||
}
|
||||
|
||||
public CoordinatorDynamicConfig build()
|
||||
{
|
||||
return new CoordinatorDynamicConfig(
|
||||
|
@ -560,7 +584,8 @@ public class CoordinatorDynamicConfig
|
|||
decommissioningNodes,
|
||||
decommissioningMaxPercentOfMaxSegmentsToMove == null
|
||||
? DEFAULT_DECOMMISSIONING_MAX_SEGMENTS_TO_MOVE_PERCENT
|
||||
: decommissioningMaxPercentOfMaxSegmentsToMove
|
||||
: decommissioningMaxPercentOfMaxSegmentsToMove,
|
||||
pauseCoordination == null ? DEFAULT_PAUSE_COORDINATION : pauseCoordination
|
||||
);
|
||||
}
|
||||
|
||||
|
@ -592,7 +617,8 @@ public class CoordinatorDynamicConfig
|
|||
decommissioningNodes == null ? defaults.getDecommissioningNodes() : decommissioningNodes,
|
||||
decommissioningMaxPercentOfMaxSegmentsToMove == null
|
||||
? defaults.getDecommissioningMaxPercentOfMaxSegmentsToMove()
|
||||
: decommissioningMaxPercentOfMaxSegmentsToMove
|
||||
: decommissioningMaxPercentOfMaxSegmentsToMove,
|
||||
pauseCoordination == null ? defaults.getPauseCoordination() : pauseCoordination
|
||||
);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -678,9 +678,23 @@ public class DruidCoordinator
|
|||
.withEmitter(emitter)
|
||||
.withBalancerStrategy(balancerStrategy)
|
||||
.build();
|
||||
|
||||
boolean coordinationPaused = getDynamicConfigs().getPauseCoordination();
|
||||
if (coordinationPaused
|
||||
&& coordLeaderSelector.isLeader()
|
||||
&& startingLeaderCounter == coordLeaderSelector.localTerm()) {
|
||||
|
||||
log.debug(
|
||||
"Coordination is paused via dynamic configs! I will not be running Coordination Duties at this time"
|
||||
);
|
||||
}
|
||||
|
||||
for (CoordinatorDuty duty : duties) {
|
||||
// Don't read state and run state in the same duty otherwise racy conditions may exist
|
||||
if (coordLeaderSelector.isLeader() && startingLeaderCounter == coordLeaderSelector.localTerm()) {
|
||||
if (!coordinationPaused
|
||||
&& coordLeaderSelector.isLeader()
|
||||
&& startingLeaderCounter == coordLeaderSelector.localTerm()) {
|
||||
|
||||
params = duty.run(params);
|
||||
|
||||
if (params == null) {
|
||||
|
|
|
@ -1098,6 +1098,7 @@ public class RunRulesTest
|
|||
.withSpecificDataSourcesToKillUnusedSegmentsIn(null)
|
||||
.withKillUnusedSegmentsInAllDataSources(false)
|
||||
.withMaxSegmentsInNodeLoadingQueue(1000)
|
||||
.withPauseCoordination(false)
|
||||
.build();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -51,7 +51,8 @@ public class CoordinatorDynamicConfigTest
|
|||
+ " \"killDataSourceWhitelist\": [\"test1\",\"test2\"],\n"
|
||||
+ " \"maxSegmentsInNodeLoadingQueue\": 1,\n"
|
||||
+ " \"decommissioningNodes\": [\"host1\", \"host2\"],\n"
|
||||
+ " \"decommissioningMaxPercentOfMaxSegmentsToMove\": 9\n"
|
||||
+ " \"decommissioningMaxPercentOfMaxSegmentsToMove\": 9,\n"
|
||||
+ " \"pauseCoordination\": false\n"
|
||||
+ "}\n";
|
||||
|
||||
CoordinatorDynamicConfig actual = mapper.readValue(
|
||||
|
@ -65,13 +66,16 @@ public class CoordinatorDynamicConfigTest
|
|||
);
|
||||
ImmutableSet<String> decommissioning = ImmutableSet.of("host1", "host2");
|
||||
ImmutableSet<String> whitelist = ImmutableSet.of("test1", "test2");
|
||||
assertConfig(actual, 1, 1, 1, 1, 1, 1, 2, true, whitelist, false, 1, decommissioning, 9);
|
||||
assertConfig(actual, 1, 1, 1, 1, 1, 1, 2, true, whitelist, false, 1, decommissioning, 9, false);
|
||||
|
||||
actual = CoordinatorDynamicConfig.builder().withDecommissioningNodes(ImmutableSet.of("host1")).build(actual);
|
||||
assertConfig(actual, 1, 1, 1, 1, 1, 1, 2, true, whitelist, false, 1, ImmutableSet.of("host1"), 9);
|
||||
assertConfig(actual, 1, 1, 1, 1, 1, 1, 2, true, whitelist, false, 1, ImmutableSet.of("host1"), 9, false);
|
||||
|
||||
actual = CoordinatorDynamicConfig.builder().withDecommissioningMaxPercentOfMaxSegmentsToMove(5).build(actual);
|
||||
assertConfig(actual, 1, 1, 1, 1, 1, 1, 2, true, whitelist, false, 1, ImmutableSet.of("host1"), 5);
|
||||
assertConfig(actual, 1, 1, 1, 1, 1, 1, 2, true, whitelist, false, 1, ImmutableSet.of("host1"), 5, false);
|
||||
|
||||
actual = CoordinatorDynamicConfig.builder().withPauseCoordination(true).build(actual);
|
||||
assertConfig(actual, 1, 1, 1, 1, 1, 1, 2, true, whitelist, false, 1, ImmutableSet.of("host1"), 5, true);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -101,13 +105,13 @@ public class CoordinatorDynamicConfigTest
|
|||
);
|
||||
ImmutableSet<String> decommissioning = ImmutableSet.of();
|
||||
ImmutableSet<String> whitelist = ImmutableSet.of("test1", "test2");
|
||||
assertConfig(actual, 1, 1, 1, 1, 1, 1, 2, true, whitelist, false, 1, decommissioning, 0);
|
||||
assertConfig(actual, 1, 1, 1, 1, 1, 1, 2, true, whitelist, false, 1, decommissioning, 0, false);
|
||||
|
||||
actual = CoordinatorDynamicConfig.builder().withDecommissioningNodes(ImmutableSet.of("host1")).build(actual);
|
||||
assertConfig(actual, 1, 1, 1, 1, 1, 1, 2, true, whitelist, false, 1, ImmutableSet.of("host1"), 0);
|
||||
assertConfig(actual, 1, 1, 1, 1, 1, 1, 2, true, whitelist, false, 1, ImmutableSet.of("host1"), 0, false);
|
||||
|
||||
actual = CoordinatorDynamicConfig.builder().withDecommissioningMaxPercentOfMaxSegmentsToMove(5).build(actual);
|
||||
assertConfig(actual, 1, 1, 1, 1, 1, 1, 2, true, whitelist, false, 1, ImmutableSet.of("host1"), 5);
|
||||
assertConfig(actual, 1, 1, 1, 1, 1, 1, 2, true, whitelist, false, 1, ImmutableSet.of("host1"), 5, false);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -135,7 +139,7 @@ public class CoordinatorDynamicConfigTest
|
|||
),
|
||||
CoordinatorDynamicConfig.class
|
||||
);
|
||||
assertConfig(actual, 1, 1, 1, 1, 1, 1, 2, true, ImmutableSet.of("test1", "test2"), false, 1, ImmutableSet.of(), 0);
|
||||
assertConfig(actual, 1, 1, 1, 1, 1, 1, 2, true, ImmutableSet.of("test1", "test2"), false, 1, ImmutableSet.of(), 0, false);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -164,7 +168,7 @@ public class CoordinatorDynamicConfigTest
|
|||
CoordinatorDynamicConfig.class
|
||||
);
|
||||
|
||||
assertConfig(actual, 1, 1, 1, 1, 1, 1, 2, true, ImmutableSet.of(), true, 1, ImmutableSet.of(), 0);
|
||||
assertConfig(actual, 1, 1, 1, 1, 1, 1, 2, true, ImmutableSet.of(), true, 1, ImmutableSet.of(), 0, false);
|
||||
|
||||
//ensure whitelist is empty when killAllDataSources is true
|
||||
try {
|
||||
|
@ -209,7 +213,7 @@ public class CoordinatorDynamicConfigTest
|
|||
CoordinatorDynamicConfig.class
|
||||
);
|
||||
|
||||
assertConfig(actual, 1, 1, 1, 1, 1, 1, 2, true, ImmutableSet.of(), true, 0, ImmutableSet.of(), 0);
|
||||
assertConfig(actual, 1, 1, 1, 1, 1, 1, 2, true, ImmutableSet.of(), true, 0, ImmutableSet.of(), 0, false);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -217,7 +221,7 @@ public class CoordinatorDynamicConfigTest
|
|||
{
|
||||
CoordinatorDynamicConfig defaultConfig = CoordinatorDynamicConfig.builder().build();
|
||||
ImmutableSet<String> emptyList = ImmutableSet.of();
|
||||
assertConfig(defaultConfig, 900000, 524288000, 100, 5, 15, 10, 1, false, emptyList, false, 0, emptyList, 70);
|
||||
assertConfig(defaultConfig, 900000, 524288000, 100, 5, 15, 10, 1, false, emptyList, false, 0, emptyList, 70, false);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -231,7 +235,7 @@ public class CoordinatorDynamicConfigTest
|
|||
Assert.assertEquals(
|
||||
current,
|
||||
new CoordinatorDynamicConfig
|
||||
.Builder(null, null, null, null, null, null, null, null, null, null, null, null, null, null)
|
||||
.Builder(null, null, null, null, null, null, null, null, null, null, null, null, null, null, null)
|
||||
.build(current)
|
||||
);
|
||||
}
|
||||
|
@ -259,7 +263,8 @@ public class CoordinatorDynamicConfigTest
|
|||
boolean expectedKillUnusedSegmentsInAllDataSources,
|
||||
int expectedMaxSegmentsInNodeLoadingQueue,
|
||||
Set<String> decommissioningNodes,
|
||||
int decommissioningMaxPercentOfMaxSegmentsToMove
|
||||
int decommissioningMaxPercentOfMaxSegmentsToMove,
|
||||
boolean pauseCoordination
|
||||
)
|
||||
{
|
||||
Assert.assertEquals(
|
||||
|
@ -284,5 +289,6 @@ public class CoordinatorDynamicConfigTest
|
|||
decommissioningMaxPercentOfMaxSegmentsToMove,
|
||||
config.getDecommissioningMaxPercentOfMaxSegmentsToMove()
|
||||
);
|
||||
Assert.assertEquals(pauseCoordination, config.getPauseCoordination());
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue