Mirror of https://github.com/apache/druid.git, synced 2025-02-17 07:25:02 +00:00
Coordinator Dynamic Config changes to ease upgrading with new config value (#10724)
* Coordinator Dynamic Config changes to ease upgrading with new config value
* change a log to debug level following review
* changes based on review feedback
* fix checkstyle
This commit is contained in:
parent 118b50195e
commit fe0511b16a
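For context: the upgrade pain this commit addresses is that a coordinator dynamic config persisted before percentOfSegmentsToConsiderPerMove existed deserializes without that field, so the old primitive double constructor parameter would come back as 0 and trip the existing 1-100 precondition. The diff below makes the parameter a @Nullable Double and substitutes the default of 100 when the stored value is absent. The sketch that follows is not Druid code; it only illustrates that Jackson pattern, with hypothetical names (NullableDefaultExample, DynamicConfig) standing in for the real classes.

// Minimal, self-contained sketch of the nullable-with-default deserialization pattern.
// Names are illustrative only; this is not the Druid implementation.
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;

public class NullableDefaultExample
{
  public static class DynamicConfig
  {
    static final double DEFAULT_PERCENT_OF_SEGMENTS_TO_CONSIDER_PER_MOVE = 100;

    private final double percentOfSegmentsToConsiderPerMove;

    @JsonCreator
    public DynamicConfig(
        // Boxed Double: a field missing from an old payload arrives as null, not 0.0.
        @JsonProperty("percentOfSegmentsToConsiderPerMove") Double percentOfSegmentsToConsiderPerMove
    )
    {
      // Fall back to the class default when the stored config predates the field.
      this.percentOfSegmentsToConsiderPerMove = percentOfSegmentsToConsiderPerMove == null
          ? DEFAULT_PERCENT_OF_SEGMENTS_TO_CONSIDER_PER_MOVE
          : percentOfSegmentsToConsiderPerMove;
    }

    public double getPercentOfSegmentsToConsiderPerMove()
    {
      return percentOfSegmentsToConsiderPerMove;
    }
  }

  public static void main(String[] args) throws Exception
  {
    ObjectMapper mapper = new ObjectMapper();
    // A config persisted by an older version does not contain the new field at all.
    DynamicConfig upgraded = mapper.readValue("{}", DynamicConfig.class);
    System.out.println(upgraded.getPercentOfSegmentsToConsiderPerMove()); // 100.0
  }
}

In CoordinatorDynamicConfig itself the same null check additionally logs at debug level and the substituted value still passes through the existing range precondition, as the hunks below show.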
@@ -25,6 +25,7 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableSet;
 import org.apache.druid.common.config.JacksonConfigManager;
 import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.server.coordinator.duty.KillUnusedSegments;
 
 import javax.annotation.Nonnull;
@@ -87,6 +88,8 @@ public class CoordinatorDynamicConfig
   private final int maxSegmentsInNodeLoadingQueue;
   private final boolean pauseCoordination;
 
+  private static final Logger log = new Logger(CoordinatorDynamicConfig.class);
+
   @JsonCreator
   public CoordinatorDynamicConfig(
       // Keeping the legacy 'millisToWaitBeforeDeleting' property name for backward compatibility. When the project is
@@ -96,7 +99,7 @@ public class CoordinatorDynamicConfig
       @JsonProperty("mergeBytesLimit") long mergeBytesLimit,
       @JsonProperty("mergeSegmentsLimit") int mergeSegmentsLimit,
       @JsonProperty("maxSegmentsToMove") int maxSegmentsToMove,
-      @JsonProperty("percentOfSegmentsToConsiderPerMove") double percentOfSegmentsToConsiderPerMove,
+      @JsonProperty("percentOfSegmentsToConsiderPerMove") @Nullable Double percentOfSegmentsToConsiderPerMove,
       @JsonProperty("replicantLifetime") int replicantLifetime,
       @JsonProperty("replicationThrottleLimit") int replicationThrottleLimit,
       @JsonProperty("balancerComputeThreads") int balancerComputeThreads,
@@ -126,6 +129,15 @@ public class CoordinatorDynamicConfig
     this.mergeSegmentsLimit = mergeSegmentsLimit;
     this.maxSegmentsToMove = maxSegmentsToMove;
 
+    if (percentOfSegmentsToConsiderPerMove == null) {
+      log.debug("percentOfSegmentsToConsiderPerMove was null! This is likely because your metastore does not "
+                + "reflect this configuration being added to Druid in a recent release. Druid is defaulting the value "
+                + "to the Druid default of %f. It is recommended that you re-submit your dynamic config with your "
+                + "desired value for percentOfSegmentsToConsideredPerMove",
+                Builder.DEFAULT_PERCENT_OF_SEGMENTS_TO_CONSIDER_PER_MOVE
+      );
+      percentOfSegmentsToConsiderPerMove = Builder.DEFAULT_PERCENT_OF_SEGMENTS_TO_CONSIDER_PER_MOVE;
+    }
     Preconditions.checkArgument(
         percentOfSegmentsToConsiderPerMove > 0 && percentOfSegmentsToConsiderPerMove <= 100,
         "percentOfSegmentsToConsiderPerMove should be between 1 and 100!"
@@ -428,7 +440,7 @@ public class CoordinatorDynamicConfig
     private static final long DEFAULT_MERGE_BYTES_LIMIT = 524_288_000L;
     private static final int DEFAULT_MERGE_SEGMENTS_LIMIT = 100;
     private static final int DEFAULT_MAX_SEGMENTS_TO_MOVE = 5;
-    private static final int DEFAULT_PERCENT_OF_SEGMENTS_TO_CONSIDER_PER_MOVE = 100;
+    private static final double DEFAULT_PERCENT_OF_SEGMENTS_TO_CONSIDER_PER_MOVE = 100;
     private static final int DEFAULT_REPLICANT_LIFETIME = 15;
     private static final int DEFAULT_REPLICATION_THROTTLE_LIMIT = 10;
     private static final int DEFAULT_BALANCER_COMPUTE_THREADS = 1;

@@ -90,7 +90,6 @@ public class CoordinatorDynamicConfigTest
                      + " \"mergeBytesLimit\": 1,\n"
                      + " \"mergeSegmentsLimit\" : 1,\n"
                      + " \"maxSegmentsToMove\": 1,\n"
-                     + " \"percentOfSegmentsToConsiderPerMove\": 1,\n"
                      + " \"replicantLifetime\": 1,\n"
                      + " \"replicationThrottleLimit\": 1,\n"
                      + " \"balancerComputeThreads\": 2, \n"
@@ -110,13 +109,13 @@ public class CoordinatorDynamicConfigTest
     );
     ImmutableSet<String> decommissioning = ImmutableSet.of();
     ImmutableSet<String> whitelist = ImmutableSet.of("test1", "test2");
-    assertConfig(actual, 1, 1, 1, 1, 1, 1, 1, 2, true, whitelist, false, 1, decommissioning, 0, false);
+    assertConfig(actual, 1, 1, 1, 1, 100, 1, 1, 2, true, whitelist, false, 1, decommissioning, 0, false);
 
     actual = CoordinatorDynamicConfig.builder().withDecommissioningNodes(ImmutableSet.of("host1")).build(actual);
-    assertConfig(actual, 1, 1, 1, 1, 1, 1, 1, 2, true, whitelist, false, 1, ImmutableSet.of("host1"), 0, false);
+    assertConfig(actual, 1, 1, 1, 1, 100, 1, 1, 2, true, whitelist, false, 1, ImmutableSet.of("host1"), 0, false);
 
     actual = CoordinatorDynamicConfig.builder().withDecommissioningMaxPercentOfMaxSegmentsToMove(5).build(actual);
-    assertConfig(actual, 1, 1, 1, 1, 1, 1, 1, 2, true, whitelist, false, 1, ImmutableSet.of("host1"), 5, false);
+    assertConfig(actual, 1, 1, 1, 1, 100, 1, 1, 2, true, whitelist, false, 1, ImmutableSet.of("host1"), 5, false);
   }
 
   @Test
@@ -166,7 +165,7 @@ public class CoordinatorDynamicConfigTest
   }
 
   @Test
-  public void testSerdeCorrectsInvalidBadMaxPercentOfSegmentsToConsiderPerMove() throws Exception
+  public void testSerdeHandleInvalidPercentOfSegmentsToConsiderPerMove() throws Exception
   {
     try {
       String jsonStr = "{\n"
@@ -232,6 +231,38 @@ public class CoordinatorDynamicConfigTest
     }
   }
 
+  @Test
+  public void testHandleMissingPercentOfSegmentsToConsiderPerMove() throws Exception
+  {
+    String jsonStr = "{\n"
+                     + " \"millisToWaitBeforeDeleting\": 1,\n"
+                     + " \"mergeBytesLimit\": 1,\n"
+                     + " \"mergeSegmentsLimit\" : 1,\n"
+                     + " \"maxSegmentsToMove\": 1,\n"
+                     + " \"replicantLifetime\": 1,\n"
+                     + " \"replicationThrottleLimit\": 1,\n"
+                     + " \"balancerComputeThreads\": 2, \n"
+                     + " \"emitBalancingStats\": true,\n"
+                     + " \"killDataSourceWhitelist\": [\"test1\",\"test2\"],\n"
+                     + " \"maxSegmentsInNodeLoadingQueue\": 1,\n"
+                     + " \"decommissioningNodes\": [\"host1\", \"host2\"],\n"
+                     + " \"decommissioningMaxPercentOfMaxSegmentsToMove\": 9,\n"
+                     + " \"pauseCoordination\": false\n"
+                     + "}\n";
+    CoordinatorDynamicConfig actual = mapper.readValue(
+        mapper.writeValueAsString(
+            mapper.readValue(
+                jsonStr,
+                CoordinatorDynamicConfig.class
+            )
+        ),
+        CoordinatorDynamicConfig.class
+    );
+    ImmutableSet<String> decommissioning = ImmutableSet.of("host1", "host2");
+    ImmutableSet<String> whitelist = ImmutableSet.of("test1", "test2");
+    assertConfig(actual, 1, 1, 1, 1, 100, 1, 1, 2, true, whitelist, false, 1, decommissioning, 9, false);
+  }
+
   @Test
   public void testSerdeWithKillAllDataSources() throws Exception
   {