Create dynamic config that can limit number of non-primary replicants loaded per coordination cycle (#11135)

* lay the groundwork for throttling replicant loads per RunRules execution

* Add dynamic coordinator config to control new replicant threshold.

* remove redundant line

* add some unit tests

* fix checkstyle error

* add documentation for new dynamic config

* improve docs and logs

* Alter how null is handled for new config. If null, manually set as default
Lucas Capistrant 2021-05-05 07:39:36 -05:00 committed by GitHub
parent 6f0b4d90d8
commit bb3c810b36
11 changed files with 299 additions and 32 deletions


@ -809,7 +809,8 @@ A sample Coordinator dynamic config JSON object is shown below:
"decommissioningNodes": ["localhost:8182", "localhost:8282"], "decommissioningNodes": ["localhost:8182", "localhost:8282"],
"decommissioningMaxPercentOfMaxSegmentsToMove": 70, "decommissioningMaxPercentOfMaxSegmentsToMove": 70,
"pauseCoordination": false, "pauseCoordination": false,
"replicateAfterLoadTimeout": false "replicateAfterLoadTimeout": false,
"maxNonPrimaryReplicantsToLoad": 2147483647
} }
``` ```
@ -834,6 +835,7 @@ Issuing a GET request at the same URL will return the spec that is currently in
|`decommissioningMaxPercentOfMaxSegmentsToMove`| The maximum number of segments that may be moved away from 'decommissioning' servers to non-decommissioning (that is, active) servers during one Coordinator run. This value is relative to the total maximum segment movements allowed during one run which is determined by `maxSegmentsToMove`. If `decommissioningMaxPercentOfMaxSegmentsToMove` is 0, segments will neither be moved from _or to_ 'decommissioning' servers, effectively putting them in a sort of "maintenance" mode that will not participate in balancing or assignment by load rules. Decommissioning can also become stalled if there are no available active servers to place the segments. By leveraging the maximum percent of decommissioning segment movements, an operator can prevent active servers from overload by prioritizing balancing, or decrease decommissioning time instead. The value should be between 0 and 100.|70| |`decommissioningMaxPercentOfMaxSegmentsToMove`| The maximum number of segments that may be moved away from 'decommissioning' servers to non-decommissioning (that is, active) servers during one Coordinator run. This value is relative to the total maximum segment movements allowed during one run which is determined by `maxSegmentsToMove`. If `decommissioningMaxPercentOfMaxSegmentsToMove` is 0, segments will neither be moved from _or to_ 'decommissioning' servers, effectively putting them in a sort of "maintenance" mode that will not participate in balancing or assignment by load rules. Decommissioning can also become stalled if there are no available active servers to place the segments. By leveraging the maximum percent of decommissioning segment movements, an operator can prevent active servers from overload by prioritizing balancing, or decrease decommissioning time instead. The value should be between 0 and 100.|70|
|`pauseCoordination`| Boolean flag for whether or not the coordinator should execute its various duties of coordinating the cluster. Setting this to true essentially pauses all coordination work while allowing the API to remain up. Duties that are paused include all classes that implement the `CoordinatorDuty` Interface. Such duties include: Segment balancing, Segment compaction, Emission of metrics controlled by the dynamic coordinator config `emitBalancingStats`, Submitting kill tasks for unused segments (if enabled), Logging of used segments in the cluster, Marking of newly unused or overshadowed segments, Matching and execution of load/drop rules for used segments, Unloading segments that are no longer marked as used from Historical servers. An example of when an admin may want to pause coordination would be if they are doing deep storage maintenance on HDFS Name Nodes with downtime and don't want the coordinator to be directing Historical Nodes to hit the Name Node with API requests until maintenance is done and the deep store is declared healthy for use again. |false| |`pauseCoordination`| Boolean flag for whether or not the coordinator should execute its various duties of coordinating the cluster. Setting this to true essentially pauses all coordination work while allowing the API to remain up. Duties that are paused include all classes that implement the `CoordinatorDuty` Interface. Such duties include: Segment balancing, Segment compaction, Emission of metrics controlled by the dynamic coordinator config `emitBalancingStats`, Submitting kill tasks for unused segments (if enabled), Logging of used segments in the cluster, Marking of newly unused or overshadowed segments, Matching and execution of load/drop rules for used segments, Unloading segments that are no longer marked as used from Historical servers. An example of when an admin may want to pause coordination would be if they are doing deep storage maintenance on HDFS Name Nodes with downtime and don't want the coordinator to be directing Historical Nodes to hit the Name Node with API requests until maintenance is done and the deep store is declared healthy for use again. |false|
|`replicateAfterLoadTimeout`| Boolean flag for whether or not additional replication is needed for segments that have failed to load due to the expiry of `druid.coordinator.load.timeout`. If this is set to true, the coordinator will attempt to replicate the failed segment on a different historical server. This helps improve the segment availability if there are a few slow historicals in the cluster. However, the slow historical may still load the segment later and the coordinator may issue drop requests if the segment is over-replicated.|false| |`replicateAfterLoadTimeout`| Boolean flag for whether or not additional replication is needed for segments that have failed to load due to the expiry of `druid.coordinator.load.timeout`. If this is set to true, the coordinator will attempt to replicate the failed segment on a different historical server. This helps improve the segment availability if there are a few slow historicals in the cluster. However, the slow historical may still load the segment later and the coordinator may issue drop requests if the segment is over-replicated.|false|
|`maxNonPrimaryReplicantsToLoad`|The maximum number of non-primary segment replicants to load per Coordinator run. Set this value to put a hard upper limit on the number of replicants loaded in a single run. It can help prevent long delays before new data becomes available for query after events that require the cluster to load many non-primary replicants, such as a Historical node disconnecting from the cluster. The default value effectively places no limit on the number of replicants loaded per coordination cycle. If you want to use a non-default value, consider starting with roughly 20% of the number of segments on the Historical server that holds the most segments. Use the Druid metric `coordinator/time` with the filter `duty=org.apache.druid.server.coordinator.duty.RunRules` to see how different values of this config affect Coordinator execution time.|`Integer.MAX_VALUE`|
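
The "start at about 20%" guidance for `maxNonPrimaryReplicantsToLoad` can be worked through as simple arithmetic. The sketch below is illustrative only: the class `ReplicantCapSuggestion`, the method `suggestStartingCap`, and the server names and segment counts are all hypothetical and are not part of Druid.

```java
import java.util.Map;

// Hypothetical helper (not a Druid class): derives a starting value for
// maxNonPrimaryReplicantsToLoad from per-Historical segment counts.
final class ReplicantCapSuggestion {

    // Returns roughly 20% of the largest per-server segment count.
    // Integer division by 5 keeps the arithmetic exact and rounds down.
    static int suggestStartingCap(Map<String, Integer> segmentCountByHistorical) {
        int largest = 0;
        for (int count : segmentCountByHistorical.values()) {
            largest = Math.max(largest, count);
        }
        return largest / 5;
    }

    public static void main(String[] args) {
        // Made-up counts for two Historical servers.
        Map<String, Integer> counts = Map.of("historical-a", 12_000, "historical-b", 15_000);
        System.out.println(suggestStartingCap(counts)); // prints 3000
    }
}
```

The resulting number is only a starting point; the `coordinator/time` metric for the `RunRules` duty is what should guide further tuning.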
To view the audit history of Coordinator dynamic config issue a GET request to the URL - To view the audit history of Coordinator dynamic config issue a GET request to the URL -


@ -96,6 +96,13 @@ public class CoordinatorDynamicConfig
*/ */
private final boolean replicateAfterLoadTimeout; private final boolean replicateAfterLoadTimeout;
/**
* This is the maximum number of non-primary segment replicants to load per Coordination run. This number can
* be set to put a hard upper limit on the number of replicants loaded. It is a tool that can help prevent
* long delays in new data loads after events such as a Historical server leaving the cluster.
*/
private final int maxNonPrimaryReplicantsToLoad;
private static final Logger log = new Logger(CoordinatorDynamicConfig.class); private static final Logger log = new Logger(CoordinatorDynamicConfig.class);
@JsonCreator @JsonCreator
@ -129,7 +136,8 @@ public class CoordinatorDynamicConfig
@JsonProperty("decommissioningNodes") Object decommissioningNodes, @JsonProperty("decommissioningNodes") Object decommissioningNodes,
@JsonProperty("decommissioningMaxPercentOfMaxSegmentsToMove") int decommissioningMaxPercentOfMaxSegmentsToMove, @JsonProperty("decommissioningMaxPercentOfMaxSegmentsToMove") int decommissioningMaxPercentOfMaxSegmentsToMove,
@JsonProperty("pauseCoordination") boolean pauseCoordination, @JsonProperty("pauseCoordination") boolean pauseCoordination,
@JsonProperty("replicateAfterLoadTimeout") boolean replicateAfterLoadTimeout @JsonProperty("replicateAfterLoadTimeout") boolean replicateAfterLoadTimeout,
@JsonProperty("maxNonPrimaryReplicantsToLoad") @Nullable Integer maxNonPrimaryReplicantsToLoad
) )
{ {
this.leadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments = this.leadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments =
@ -176,6 +184,22 @@ public class CoordinatorDynamicConfig
} }
this.pauseCoordination = pauseCoordination; this.pauseCoordination = pauseCoordination;
this.replicateAfterLoadTimeout = replicateAfterLoadTimeout; this.replicateAfterLoadTimeout = replicateAfterLoadTimeout;
if (maxNonPrimaryReplicantsToLoad == null) {
log.debug(
"maxNonPrimaryReplicantsToLoad was null! This is likely because your metastore does not "
+ "reflect this configuration being added to Druid in a recent release. Druid is defaulting the value "
+ "to the Druid default of %d. It is recommended that you re-submit your dynamic config with your "
+ "desired value for maxNonPrimaryReplicantsToLoad",
Builder.DEFAULT_MAX_NON_PRIMARY_REPLICANTS_TO_LOAD
);
maxNonPrimaryReplicantsToLoad = Builder.DEFAULT_MAX_NON_PRIMARY_REPLICANTS_TO_LOAD;
}
Preconditions.checkArgument(
maxNonPrimaryReplicantsToLoad >= 0,
"maxNonPrimaryReplicantsToLoad must be greater than or equal to 0."
);
this.maxNonPrimaryReplicantsToLoad = maxNonPrimaryReplicantsToLoad;
} }
private static Set<String> parseJsonStringOrArray(Object jsonStringOrArray) private static Set<String> parseJsonStringOrArray(Object jsonStringOrArray)
@ -336,6 +360,13 @@ public class CoordinatorDynamicConfig
return replicateAfterLoadTimeout; return replicateAfterLoadTimeout;
} }
@Min(0)
@JsonProperty
public int getMaxNonPrimaryReplicantsToLoad()
{
return maxNonPrimaryReplicantsToLoad;
}
@Override @Override
public String toString() public String toString()
{ {
@ -358,6 +389,7 @@ public class CoordinatorDynamicConfig
", decommissioningMaxPercentOfMaxSegmentsToMove=" + decommissioningMaxPercentOfMaxSegmentsToMove + ", decommissioningMaxPercentOfMaxSegmentsToMove=" + decommissioningMaxPercentOfMaxSegmentsToMove +
", pauseCoordination=" + pauseCoordination + ", pauseCoordination=" + pauseCoordination +
", replicateAfterLoadTimeout=" + replicateAfterLoadTimeout + ", replicateAfterLoadTimeout=" + replicateAfterLoadTimeout +
", maxNonPrimaryReplicantsToLoad=" + maxNonPrimaryReplicantsToLoad +
'}'; '}';
} }
@ -422,6 +454,9 @@ public class CoordinatorDynamicConfig
if (replicateAfterLoadTimeout != that.replicateAfterLoadTimeout) { if (replicateAfterLoadTimeout != that.replicateAfterLoadTimeout) {
return false; return false;
} }
if (maxNonPrimaryReplicantsToLoad != that.maxNonPrimaryReplicantsToLoad) {
return false;
}
return decommissioningMaxPercentOfMaxSegmentsToMove == that.decommissioningMaxPercentOfMaxSegmentsToMove; return decommissioningMaxPercentOfMaxSegmentsToMove == that.decommissioningMaxPercentOfMaxSegmentsToMove;
} }
@ -444,7 +479,8 @@ public class CoordinatorDynamicConfig
dataSourcesToNotKillStalePendingSegmentsIn, dataSourcesToNotKillStalePendingSegmentsIn,
decommissioningNodes, decommissioningNodes,
decommissioningMaxPercentOfMaxSegmentsToMove, decommissioningMaxPercentOfMaxSegmentsToMove,
pauseCoordination pauseCoordination,
maxNonPrimaryReplicantsToLoad
); );
} }
@ -470,6 +506,7 @@ public class CoordinatorDynamicConfig
private static final int DEFAULT_DECOMMISSIONING_MAX_SEGMENTS_TO_MOVE_PERCENT = 70; private static final int DEFAULT_DECOMMISSIONING_MAX_SEGMENTS_TO_MOVE_PERCENT = 70;
private static final boolean DEFAULT_PAUSE_COORDINATION = false; private static final boolean DEFAULT_PAUSE_COORDINATION = false;
private static final boolean DEFAULT_REPLICATE_AFTER_LOAD_TIMEOUT = false; private static final boolean DEFAULT_REPLICATE_AFTER_LOAD_TIMEOUT = false;
private static final int DEFAULT_MAX_NON_PRIMARY_REPLICANTS_TO_LOAD = Integer.MAX_VALUE;
private Long leadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments; private Long leadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments;
private Long mergeBytesLimit; private Long mergeBytesLimit;
@ -488,6 +525,7 @@ public class CoordinatorDynamicConfig
private Integer decommissioningMaxPercentOfMaxSegmentsToMove; private Integer decommissioningMaxPercentOfMaxSegmentsToMove;
private Boolean pauseCoordination; private Boolean pauseCoordination;
private Boolean replicateAfterLoadTimeout; private Boolean replicateAfterLoadTimeout;
private Integer maxNonPrimaryReplicantsToLoad;
public Builder() public Builder()
{ {
@ -513,7 +551,8 @@ public class CoordinatorDynamicConfig
@JsonProperty("decommissioningMaxPercentOfMaxSegmentsToMove") @JsonProperty("decommissioningMaxPercentOfMaxSegmentsToMove")
@Nullable Integer decommissioningMaxPercentOfMaxSegmentsToMove, @Nullable Integer decommissioningMaxPercentOfMaxSegmentsToMove,
@JsonProperty("pauseCoordination") @Nullable Boolean pauseCoordination, @JsonProperty("pauseCoordination") @Nullable Boolean pauseCoordination,
@JsonProperty("replicateAfterLoadTimeout") @Nullable Boolean replicateAfterLoadTimeout @JsonProperty("replicateAfterLoadTimeout") @Nullable Boolean replicateAfterLoadTimeout,
@JsonProperty("maxNonPrimaryReplicantsToLoad") @Nullable Integer maxNonPrimaryReplicantsToLoad
) )
{ {
this.leadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments = this.leadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments =
@ -534,6 +573,7 @@ public class CoordinatorDynamicConfig
this.decommissioningMaxPercentOfMaxSegmentsToMove = decommissioningMaxPercentOfMaxSegmentsToMove; this.decommissioningMaxPercentOfMaxSegmentsToMove = decommissioningMaxPercentOfMaxSegmentsToMove;
this.pauseCoordination = pauseCoordination; this.pauseCoordination = pauseCoordination;
this.replicateAfterLoadTimeout = replicateAfterLoadTimeout; this.replicateAfterLoadTimeout = replicateAfterLoadTimeout;
this.maxNonPrimaryReplicantsToLoad = maxNonPrimaryReplicantsToLoad;
} }
public Builder withLeadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments(long leadingTimeMillis) public Builder withLeadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments(long leadingTimeMillis)
@ -632,6 +672,12 @@ public class CoordinatorDynamicConfig
return this; return this;
} }
public Builder withMaxNonPrimaryReplicantsToLoad(int maxNonPrimaryReplicantsToLoad)
{
this.maxNonPrimaryReplicantsToLoad = maxNonPrimaryReplicantsToLoad;
return this;
}
public CoordinatorDynamicConfig build() public CoordinatorDynamicConfig build()
{ {
return new CoordinatorDynamicConfig( return new CoordinatorDynamicConfig(
@ -660,7 +706,8 @@ public class CoordinatorDynamicConfig
? DEFAULT_DECOMMISSIONING_MAX_SEGMENTS_TO_MOVE_PERCENT ? DEFAULT_DECOMMISSIONING_MAX_SEGMENTS_TO_MOVE_PERCENT
: decommissioningMaxPercentOfMaxSegmentsToMove, : decommissioningMaxPercentOfMaxSegmentsToMove,
pauseCoordination == null ? DEFAULT_PAUSE_COORDINATION : pauseCoordination, pauseCoordination == null ? DEFAULT_PAUSE_COORDINATION : pauseCoordination,
replicateAfterLoadTimeout == null ? DEFAULT_REPLICATE_AFTER_LOAD_TIMEOUT : replicateAfterLoadTimeout replicateAfterLoadTimeout == null ? DEFAULT_REPLICATE_AFTER_LOAD_TIMEOUT : replicateAfterLoadTimeout,
maxNonPrimaryReplicantsToLoad == null ? DEFAULT_MAX_NON_PRIMARY_REPLICANTS_TO_LOAD : maxNonPrimaryReplicantsToLoad
); );
} }
@ -695,7 +742,8 @@ public class CoordinatorDynamicConfig
? defaults.getDecommissioningMaxPercentOfMaxSegmentsToMove() ? defaults.getDecommissioningMaxPercentOfMaxSegmentsToMove()
: decommissioningMaxPercentOfMaxSegmentsToMove, : decommissioningMaxPercentOfMaxSegmentsToMove,
pauseCoordination == null ? defaults.getPauseCoordination() : pauseCoordination, pauseCoordination == null ? defaults.getPauseCoordination() : pauseCoordination,
replicateAfterLoadTimeout == null ? defaults.getReplicateAfterLoadTimeout() : replicateAfterLoadTimeout replicateAfterLoadTimeout == null ? defaults.getReplicateAfterLoadTimeout() : replicateAfterLoadTimeout,
maxNonPrimaryReplicantsToLoad == null ? defaults.getMaxNonPrimaryReplicantsToLoad() : maxNonPrimaryReplicantsToLoad
); );
} }
} }
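
The null handling added to the `CoordinatorDynamicConfig` constructor can be looked at in isolation. This is a minimal sketch, assuming only what the diff shows: a null value falls back to `Integer.MAX_VALUE` and a negative value is rejected. The class `ReplicantLimitResolver` and its `resolve` method are hypothetical, and a plain `IllegalArgumentException` stands in for Guava's `Preconditions.checkArgument` so the example has no dependencies.

```java
// Hypothetical stand-in for the constructor logic (not a Druid class).
final class ReplicantLimitResolver {

    // Same default as in the diff: effectively "no limit".
    static final int DEFAULT_MAX_NON_PRIMARY_REPLICANTS_TO_LOAD = Integer.MAX_VALUE;

    // A null input models a config stored in the metastore before the field existed.
    static int resolve(Integer configured) {
        int value = (configured == null)
            ? DEFAULT_MAX_NON_PRIMARY_REPLICANTS_TO_LOAD
            : configured;
        if (value < 0) {
            throw new IllegalArgumentException(
                "maxNonPrimaryReplicantsToLoad must be greater than or equal to 0."
            );
        }
        return value;
    }

    public static void main(String[] args) {
        System.out.println(resolve(null)); // prints 2147483647
        System.out.println(resolve(10));   // prints 10
    }
}
```

Taking a boxed `Integer` rather than an `int` is what makes the "field missing from stored JSON" case distinguishable from an explicit `0`, which is a valid setting that disables non-primary loads.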


@ -41,16 +41,18 @@ public class ReplicationThrottler
private volatile int maxReplicants; private volatile int maxReplicants;
private volatile int maxLifetime; private volatile int maxLifetime;
private volatile boolean loadPrimaryReplicantsOnly;
public ReplicationThrottler(int maxReplicants, int maxLifetime) public ReplicationThrottler(int maxReplicants, int maxLifetime, boolean loadPrimaryReplicantsOnly)
{ {
updateParams(maxReplicants, maxLifetime); updateParams(maxReplicants, maxLifetime, loadPrimaryReplicantsOnly);
} }
public void updateParams(int maxReplicants, int maxLifetime) public void updateParams(int maxReplicants, int maxLifetime, boolean loadPrimaryReplicantsOnly)
{ {
this.maxReplicants = maxReplicants; this.maxReplicants = maxReplicants;
this.maxLifetime = maxLifetime; this.maxLifetime = maxLifetime;
this.loadPrimaryReplicantsOnly = loadPrimaryReplicantsOnly;
} }
public void updateReplicationState(String tier) public void updateReplicationState(String tier)
@ -58,6 +60,16 @@ public class ReplicationThrottler
update(tier, currentlyReplicating, replicatingLookup, "create"); update(tier, currentlyReplicating, replicatingLookup, "create");
} }
public boolean isLoadPrimaryReplicantsOnly()
{
return loadPrimaryReplicantsOnly;
}
public void setLoadPrimaryReplicantsOnly(boolean loadPrimaryReplicantsOnly)
{
this.loadPrimaryReplicantsOnly = loadPrimaryReplicantsOnly;
}
private void update(String tier, ReplicatorSegmentHolder holder, Map<String, Boolean> lookup, String type) private void update(String tier, ReplicatorSegmentHolder holder, Map<String, Boolean> lookup, String type)
{ {
int size = holder.getNumProcessing(tier); int size = holder.getNumProcessing(tier);
@ -87,7 +99,7 @@ public class ReplicationThrottler
public boolean canCreateReplicant(String tier) public boolean canCreateReplicant(String tier)
{ {
return replicatingLookup.get(tier) && !currentlyReplicating.isAtMaxReplicants(tier); return !loadPrimaryReplicantsOnly && replicatingLookup.get(tier) && !currentlyReplicating.isAtMaxReplicants(tier);
} }
public void registerReplicantCreation(String tier, SegmentId segmentId, String serverId) public void registerReplicantCreation(String tier, SegmentId segmentId, String serverId)


@ -55,7 +55,8 @@ public class RunRules implements CoordinatorDuty
this( this(
new ReplicationThrottler( new ReplicationThrottler(
coordinator.getDynamicConfigs().getReplicationThrottleLimit(), coordinator.getDynamicConfigs().getReplicationThrottleLimit(),
coordinator.getDynamicConfigs().getReplicantLifetime() coordinator.getDynamicConfigs().getReplicantLifetime(),
false
), ),
coordinator coordinator
); );
@ -72,7 +73,8 @@ public class RunRules implements CoordinatorDuty
{ {
replicatorThrottler.updateParams( replicatorThrottler.updateParams(
coordinator.getDynamicConfigs().getReplicationThrottleLimit(), coordinator.getDynamicConfigs().getReplicationThrottleLimit(),
coordinator.getDynamicConfigs().getReplicantLifetime() coordinator.getDynamicConfigs().getReplicantLifetime(),
false
); );
CoordinatorStats stats = new CoordinatorStats(); CoordinatorStats stats = new CoordinatorStats();
@ -128,6 +130,18 @@ public class RunRules implements CoordinatorDuty
boolean foundMatchingRule = false; boolean foundMatchingRule = false;
for (Rule rule : rules) { for (Rule rule : rules) {
if (rule.appliesTo(segment, now)) { if (rule.appliesTo(segment, now)) {
if (
stats.getGlobalStat(
"totalNonPrimaryReplicantsLoaded") >= paramsWithReplicationManager.getCoordinatorDynamicConfig()
.getMaxNonPrimaryReplicantsToLoad()
&& !paramsWithReplicationManager.getReplicationManager().isLoadPrimaryReplicantsOnly()
) {
log.info(
"Maximum number of non-primary replicants [%d] has been loaded for the current RunRules execution. Only loading primary replicants from here on for this coordinator run cycle.",
paramsWithReplicationManager.getCoordinatorDynamicConfig().getMaxNonPrimaryReplicantsToLoad()
);
paramsWithReplicationManager.getReplicationManager().setLoadPrimaryReplicantsOnly(true);
}
stats.accumulate(rule.run(coordinator, paramsWithReplicationManager, segment)); stats.accumulate(rule.run(coordinator, paramsWithReplicationManager, segment));
foundMatchingRule = true; foundMatchingRule = true;
break; break;
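
The cap check added to `RunRules` can be modeled with a small simulation. This is a sketch under stated assumptions, not the Druid implementation: every segment needs one primary load plus a fixed number of non-primary loads, and nothing else (load queue limits, the existing replication throttle) interferes. The class `NonPrimaryCapSimulation` and its `simulate` method are hypothetical.

```java
// Hypothetical model of the per-run cap (not a Druid class).
final class NonPrimaryCapSimulation {

    // Returns {totalAssigned, nonPrimaryLoaded} for one simulated run.
    static int[] simulate(int segments, int nonPrimaryPerSegment, int cap) {
        int totalAssigned = 0;
        int nonPrimaryLoaded = 0;
        boolean loadPrimaryReplicantsOnly = false;

        for (int i = 0; i < segments; i++) {
            // The check happens before the rule runs for this segment.
            if (nonPrimaryLoaded >= cap && !loadPrimaryReplicantsOnly) {
                loadPrimaryReplicantsOnly = true;
            }
            // The primary replicant is always assigned.
            totalAssigned += 1;
            // Non-primary replicants are assigned only while the flag is off.
            if (!loadPrimaryReplicantsOnly) {
                totalAssigned += nonPrimaryPerSegment;
                nonPrimaryLoaded += nonPrimaryPerSegment;
            }
        }
        return new int[] {totalAssigned, nonPrimaryLoaded};
    }

    public static void main(String[] args) {
        int[] oneTier = simulate(24, 1, 10);
        System.out.println(oneTier[0] + " " + oneTier[1]);   // prints 34 10
        int[] twoTiers = simulate(24, 3, 48);
        System.out.println(twoTiers[0] + " " + twoTiers[1]); // prints 72 48
    }
}
```

Under these assumptions the two calls reproduce the counts asserted in the new `RunRulesTest` cases (34 and 10, 72 and 48). Because the check precedes each segment's rule in this model, the count can pass the cap by up to one segment's worth of replicants when the cap is not a multiple of the per-segment count: `simulate(24, 3, 10)` loads 12 non-primary replicants.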


@ -57,6 +57,7 @@ public abstract class LoadRule implements Rule
private static final EmittingLogger log = new EmittingLogger(LoadRule.class); private static final EmittingLogger log = new EmittingLogger(LoadRule.class);
static final String ASSIGNED_COUNT = "assignedCount"; static final String ASSIGNED_COUNT = "assignedCount";
static final String DROPPED_COUNT = "droppedCount"; static final String DROPPED_COUNT = "droppedCount";
public static final String NON_PRIMARY_ASSIGNED_COUNT = "totalNonPrimaryReplicantsLoaded";
public static final String REQUIRED_CAPACITY = "requiredCapacity"; public static final String REQUIRED_CAPACITY = "requiredCapacity";
private final Object2IntMap<String> targetReplicants = new Object2IntOpenHashMap<>(); private final Object2IntMap<String> targetReplicants = new Object2IntOpenHashMap<>();
@ -180,6 +181,10 @@ public abstract class LoadRule implements Rule
createLoadQueueSizeLimitingPredicate(params).and(holder -> !holder.equals(primaryHolderToLoad)), createLoadQueueSizeLimitingPredicate(params).and(holder -> !holder.equals(primaryHolderToLoad)),
segment segment
); );
// numAssigned - 1 because we don't want to count the primary assignment
stats.addToGlobalStat(NON_PRIMARY_ASSIGNED_COUNT, numAssigned - 1);
stats.addToTieredStat(ASSIGNED_COUNT, tier, numAssigned); stats.addToTieredStat(ASSIGNED_COUNT, tier, numAssigned);
// do assign replicas for the other tiers. // do assign replicas for the other tiers.
@ -305,6 +310,7 @@ public abstract class LoadRule implements Rule
createLoadQueueSizeLimitingPredicate(params), createLoadQueueSizeLimitingPredicate(params),
segment segment
); );
stats.addToGlobalStat(NON_PRIMARY_ASSIGNED_COUNT, numAssigned);
stats.addToTieredStat(ASSIGNED_COUNT, tier, numAssigned); stats.addToTieredStat(ASSIGNED_COUNT, tier, numAssigned);
} }
} }


@ -148,7 +148,7 @@ public class BalanceSegmentsProfiler
) )
.withEmitter(emitter) .withEmitter(emitter)
.withDatabaseRuleManager(manager) .withDatabaseRuleManager(manager)
.withReplicationManager(new ReplicationThrottler(2, 500)) .withReplicationManager(new ReplicationThrottler(2, 500, false))
.build(); .build();
BalanceSegmentsTester tester = new BalanceSegmentsTester(coordinator); BalanceSegmentsTester tester = new BalanceSegmentsTester(coordinator);


@ -100,7 +100,7 @@ public class RunRulesTest
start = start.plusHours(1); start = start.plusHours(1);
} }
ruleRunner = new RunRules(new ReplicationThrottler(24, 1), coordinator); ruleRunner = new RunRules(new ReplicationThrottler(24, 1, false), coordinator);
} }
@After @After
@ -110,6 +110,134 @@ public class RunRulesTest
EasyMock.verify(databaseRuleManager); EasyMock.verify(databaseRuleManager);
} }
/**
* Nodes:
* normal - 2 replicants
* maxNonPrimaryReplicantsToLoad - 10
* Expect only 34 segments to be loaded despite there being 48 primary + non-primary replicants to load!
*/
@Test
public void testOneTierTwoReplicantsWithStrictReplicantLimit()
{
mockCoordinator();
mockPeon.loadSegment(EasyMock.anyObject(), EasyMock.anyObject());
EasyMock.expectLastCall().atLeastOnce();
mockEmptyPeon();
EasyMock.expect(databaseRuleManager.getRulesWithDefault(EasyMock.anyObject())).andReturn(
Collections.singletonList(
new IntervalLoadRule(
Intervals.of("2012-01-01T00:00:00.000Z/2012-01-02T00:00:00.000Z"),
ImmutableMap.of("normal", 2)
)
)).atLeastOnce();
EasyMock.replay(databaseRuleManager);
DruidCluster druidCluster = DruidClusterBuilder
.newBuilder()
.addTier(
"normal",
new ServerHolder(
new DruidServer("serverNorm", "hostNorm", null, 1000, ServerType.HISTORICAL, "normal", 0)
.toImmutableDruidServer(),
mockPeon
),
new ServerHolder(
new DruidServer("serverNorm2", "hostNorm2", null, 1000, ServerType.HISTORICAL, "normal", 0)
.toImmutableDruidServer(),
mockPeon
)
)
.build();
ListeningExecutorService exec = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1));
BalancerStrategy balancerStrategy = new CostBalancerStrategyFactory().createBalancerStrategy(exec);
DruidCoordinatorRuntimeParams params = makeCoordinatorRuntimeParams(druidCluster, balancerStrategy)
.withDynamicConfigs(CoordinatorDynamicConfig.builder().withMaxSegmentsToMove(5).withMaxNonPrimaryReplicantsToLoad(10).build())
.build();
DruidCoordinatorRuntimeParams afterParams = ruleRunner.run(params);
CoordinatorStats stats = afterParams.getCoordinatorStats();
Assert.assertEquals(34L, stats.getTieredStat("assignedCount", "normal"));
Assert.assertEquals(10L, stats.getGlobalStat("totalNonPrimaryReplicantsLoaded"));
exec.shutdown();
EasyMock.verify(mockPeon);
}
/**
* Nodes:
* normal - 2 replicants
* hot - 2 replicants
* maxNonPrimaryReplicantsToLoad - 48
* Expect only 72 segments to be loaded despite there being 96 primary + non-primary replicants to load!
*/
@Test
public void testTwoTiersTwoReplicantsWithStrictReplicantLimit()
{
mockCoordinator();
mockPeon.loadSegment(EasyMock.anyObject(), EasyMock.anyObject());
EasyMock.expectLastCall().atLeastOnce();
mockEmptyPeon();
EasyMock.expect(databaseRuleManager.getRulesWithDefault(EasyMock.anyObject())).andReturn(
Collections.singletonList(
new IntervalLoadRule(
Intervals.of("2012-01-01T00:00:00.000Z/2012-01-02T00:00:00.000Z"),
ImmutableMap.of("hot", 2, "normal", 2)
)
)).atLeastOnce();
EasyMock.replay(databaseRuleManager);
DruidCluster druidCluster = DruidClusterBuilder
.newBuilder()
.addTier(
"hot",
new ServerHolder(
new DruidServer("serverHot", "hostHot", null, 1000, ServerType.HISTORICAL, "hot", 0)
.toImmutableDruidServer(),
mockPeon
),
new ServerHolder(
new DruidServer("serverHot2", "hostHot2", null, 1000, ServerType.HISTORICAL, "hot", 0)
.toImmutableDruidServer(),
mockPeon
)
)
.addTier(
"normal",
new ServerHolder(
new DruidServer("serverNorm", "hostNorm", null, 1000, ServerType.HISTORICAL, "normal", 0)
.toImmutableDruidServer(),
mockPeon
),
new ServerHolder(
new DruidServer("serverNorm2", "hostNorm2", null, 1000, ServerType.HISTORICAL, "normal", 0)
.toImmutableDruidServer(),
mockPeon
)
)
.build();
ListeningExecutorService exec = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1));
BalancerStrategy balancerStrategy = new CostBalancerStrategyFactory().createBalancerStrategy(exec);
DruidCoordinatorRuntimeParams params = makeCoordinatorRuntimeParams(druidCluster, balancerStrategy)
.withDynamicConfigs(CoordinatorDynamicConfig.builder().withMaxSegmentsToMove(5).withMaxNonPrimaryReplicantsToLoad(48).build())
.build();
DruidCoordinatorRuntimeParams afterParams = ruleRunner.run(params);
CoordinatorStats stats = afterParams.getCoordinatorStats();
Assert.assertEquals(72L, stats.getTieredStat("assignedCount", "hot") + stats.getTieredStat("assignedCount", "normal"));
Assert.assertEquals(48L, stats.getGlobalStat("totalNonPrimaryReplicantsLoaded"));
exec.shutdown();
EasyMock.verify(mockPeon);
}
/** /**
* Nodes: * Nodes:
* hot - 1 replicant * hot - 1 replicant
@ -906,7 +1034,7 @@ public class RunRulesTest
DruidCoordinatorRuntimeParams params = makeCoordinatorRuntimeParams(druidCluster, balancerStrategy).build(); DruidCoordinatorRuntimeParams params = makeCoordinatorRuntimeParams(druidCluster, balancerStrategy).build();
RunRules runner = new RunRules(new ReplicationThrottler(7, 1), coordinator); RunRules runner = new RunRules(new ReplicationThrottler(7, 1, false), coordinator);
DruidCoordinatorRuntimeParams afterParams = runner.run(params); DruidCoordinatorRuntimeParams afterParams = runner.run(params);
CoordinatorStats stats = afterParams.getCoordinatorStats(); CoordinatorStats stats = afterParams.getCoordinatorStats();


@ -54,7 +54,8 @@ public class CoordinatorDynamicConfigTest
+ " \"decommissioningNodes\": [\"host1\", \"host2\"],\n" + " \"decommissioningNodes\": [\"host1\", \"host2\"],\n"
+ " \"decommissioningMaxPercentOfMaxSegmentsToMove\": 9,\n" + " \"decommissioningMaxPercentOfMaxSegmentsToMove\": 9,\n"
+ " \"pauseCoordination\": false,\n" + " \"pauseCoordination\": false,\n"
+ " \"replicateAfterLoadTimeout\": false\n" + " \"replicateAfterLoadTimeout\": false,\n"
+ " \"maxNonPrimaryReplicantsToLoad\": 2147483647\n"
+ "}\n"; + "}\n";
CoordinatorDynamicConfig actual = mapper.readValue( CoordinatorDynamicConfig actual = mapper.readValue(
@ -68,22 +69,25 @@ public class CoordinatorDynamicConfigTest
); );
ImmutableSet<String> decommissioning = ImmutableSet.of("host1", "host2"); ImmutableSet<String> decommissioning = ImmutableSet.of("host1", "host2");
ImmutableSet<String> whitelist = ImmutableSet.of("test1", "test2"); ImmutableSet<String> whitelist = ImmutableSet.of("test1", "test2");
assertConfig(actual, 1, 1, 1, 1, 1, 1, 1, 2, true, whitelist, false, 1, decommissioning, 9, false, false); assertConfig(actual, 1, 1, 1, 1, 1, 1, 1, 2, true, whitelist, false, 1, decommissioning, 9, false, false, Integer.MAX_VALUE);
actual = CoordinatorDynamicConfig.builder().withDecommissioningNodes(ImmutableSet.of("host1")).build(actual); actual = CoordinatorDynamicConfig.builder().withDecommissioningNodes(ImmutableSet.of("host1")).build(actual);
assertConfig(actual, 1, 1, 1, 1, 1, 1, 1, 2, true, whitelist, false, 1, ImmutableSet.of("host1"), 9, false, false); assertConfig(actual, 1, 1, 1, 1, 1, 1, 1, 2, true, whitelist, false, 1, ImmutableSet.of("host1"), 9, false, false, Integer.MAX_VALUE);
actual = CoordinatorDynamicConfig.builder().withDecommissioningMaxPercentOfMaxSegmentsToMove(5).build(actual); actual = CoordinatorDynamicConfig.builder().withDecommissioningMaxPercentOfMaxSegmentsToMove(5).build(actual);
assertConfig(actual, 1, 1, 1, 1, 1, 1, 1, 2, true, whitelist, false, 1, ImmutableSet.of("host1"), 5, false, false); assertConfig(actual, 1, 1, 1, 1, 1, 1, 1, 2, true, whitelist, false, 1, ImmutableSet.of("host1"), 5, false, false, Integer.MAX_VALUE);
actual = CoordinatorDynamicConfig.builder().withPauseCoordination(true).build(actual); actual = CoordinatorDynamicConfig.builder().withPauseCoordination(true).build(actual);
assertConfig(actual, 1, 1, 1, 1, 1, 1, 1, 2, true, whitelist, false, 1, ImmutableSet.of("host1"), 5, true, false); assertConfig(actual, 1, 1, 1, 1, 1, 1, 1, 2, true, whitelist, false, 1, ImmutableSet.of("host1"), 5, true, false, Integer.MAX_VALUE);
actual = CoordinatorDynamicConfig.builder().withPercentOfSegmentsToConsiderPerMove(10).build(actual); actual = CoordinatorDynamicConfig.builder().withPercentOfSegmentsToConsiderPerMove(10).build(actual);
assertConfig(actual, 1, 1, 1, 1, 10, 1, 1, 2, true, whitelist, false, 1, ImmutableSet.of("host1"), 5, true, false); assertConfig(actual, 1, 1, 1, 1, 10, 1, 1, 2, true, whitelist, false, 1, ImmutableSet.of("host1"), 5, true, false, Integer.MAX_VALUE);
actual = CoordinatorDynamicConfig.builder().withReplicateAfterLoadTimeout(true).build(actual); actual = CoordinatorDynamicConfig.builder().withReplicateAfterLoadTimeout(true).build(actual);
assertConfig(actual, 1, 1, 1, 1, 10, 1, 1, 2, true, whitelist, false, 1, ImmutableSet.of("host1"), 5, true, true); assertConfig(actual, 1, 1, 1, 1, 10, 1, 1, 2, true, whitelist, false, 1, ImmutableSet.of("host1"), 5, true, true, Integer.MAX_VALUE);
actual = CoordinatorDynamicConfig.builder().withMaxNonPrimaryReplicantsToLoad(10).build(actual);
assertConfig(actual, 1, 1, 1, 1, 10, 1, 1, 2, true, whitelist, false, 1, ImmutableSet.of("host1"), 5, true, true, 10);
} }
@ -114,13 +118,13 @@ public class CoordinatorDynamicConfigTest
); );
ImmutableSet<String> decommissioning = ImmutableSet.of(); ImmutableSet<String> decommissioning = ImmutableSet.of();
ImmutableSet<String> whitelist = ImmutableSet.of("test1", "test2"); ImmutableSet<String> whitelist = ImmutableSet.of("test1", "test2");
assertConfig(actual, 1, 1, 1, 1, 100, 1, 1, 2, true, whitelist, false, 1, decommissioning, 0, false, false); assertConfig(actual, 1, 1, 1, 1, 100, 1, 1, 2, true, whitelist, false, 1, decommissioning, 0, false, false, Integer.MAX_VALUE);
actual = CoordinatorDynamicConfig.builder().withDecommissioningNodes(ImmutableSet.of("host1")).build(actual); actual = CoordinatorDynamicConfig.builder().withDecommissioningNodes(ImmutableSet.of("host1")).build(actual);
assertConfig(actual, 1, 1, 1, 1, 100, 1, 1, 2, true, whitelist, false, 1, ImmutableSet.of("host1"), 0, false, false); assertConfig(actual, 1, 1, 1, 1, 100, 1, 1, 2, true, whitelist, false, 1, ImmutableSet.of("host1"), 0, false, false, Integer.MAX_VALUE);
actual = CoordinatorDynamicConfig.builder().withDecommissioningMaxPercentOfMaxSegmentsToMove(5).build(actual); actual = CoordinatorDynamicConfig.builder().withDecommissioningMaxPercentOfMaxSegmentsToMove(5).build(actual);
assertConfig(actual, 1, 1, 1, 1, 100, 1, 1, 2, true, whitelist, false, 1, ImmutableSet.of("host1"), 5, false, false); assertConfig(actual, 1, 1, 1, 1, 100, 1, 1, 2, true, whitelist, false, 1, ImmutableSet.of("host1"), 5, false, false, Integer.MAX_VALUE);
} }
@Test @Test
@ -166,7 +170,8 @@ public class CoordinatorDynamicConfigTest
ImmutableSet.of(), ImmutableSet.of(),
0, 0,
false, false,
false false,
Integer.MAX_VALUE
); );
} }
@ -266,7 +271,7 @@ public class CoordinatorDynamicConfigTest
); );
ImmutableSet<String> decommissioning = ImmutableSet.of("host1", "host2"); ImmutableSet<String> decommissioning = ImmutableSet.of("host1", "host2");
ImmutableSet<String> whitelist = ImmutableSet.of("test1", "test2"); ImmutableSet<String> whitelist = ImmutableSet.of("test1", "test2");
assertConfig(actual, 1, 1, 1, 1, 100, 1, 1, 2, true, whitelist, false, 1, decommissioning, 9, false, false); assertConfig(actual, 1, 1, 1, 1, 100, 1, 1, 2, true, whitelist, false, 1, decommissioning, 9, false, false, Integer.MAX_VALUE);
} }
@Test @Test
@ -296,7 +301,7 @@ public class CoordinatorDynamicConfigTest
CoordinatorDynamicConfig.class CoordinatorDynamicConfig.class
); );
assertConfig(actual, 1, 1, 1, 1, 1, 1, 1, 2, true, ImmutableSet.of(), true, 1, ImmutableSet.of(), 0, false, false); assertConfig(actual, 1, 1, 1, 1, 1, 1, 1, 2, true, ImmutableSet.of(), true, 1, ImmutableSet.of(), 0, false, false, Integer.MAX_VALUE);
//ensure whitelist is empty when killAllDataSources is true //ensure whitelist is empty when killAllDataSources is true
try { try {
@ -343,7 +348,7 @@ public class CoordinatorDynamicConfigTest
CoordinatorDynamicConfig.class CoordinatorDynamicConfig.class
); );
assertConfig(actual, 1, 1, 1, 1, 1, 1, 1, 2, true, ImmutableSet.of(), true, 0, ImmutableSet.of(), 0, false, false); assertConfig(actual, 1, 1, 1, 1, 1, 1, 1, 2, true, ImmutableSet.of(), true, 0, ImmutableSet.of(), 0, false, false, Integer.MAX_VALUE);
} }
@Test @Test
@ -368,7 +373,8 @@ public class CoordinatorDynamicConfigTest
emptyList, emptyList,
70, 70,
false, false,
false false,
Integer.MAX_VALUE
); );
} }
@ -383,11 +389,36 @@ public class CoordinatorDynamicConfigTest
Assert.assertEquals( Assert.assertEquals(
current, current,
new CoordinatorDynamicConfig new CoordinatorDynamicConfig
.Builder(null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null) .Builder(null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null)
.build(current) .build(current)
); );
} }
@Test
public void testSerdeHandleInvalidMaxNonPrimaryReplicantsToLoad() throws Exception
{
try {
String jsonStr = "{\n"
+ " \"maxNonPrimaryReplicantsToLoad\": -1\n"
+ "}\n";
mapper.readValue(
mapper.writeValueAsString(
mapper.readValue(
jsonStr,
CoordinatorDynamicConfig.class
)
),
CoordinatorDynamicConfig.class
);
Assert.fail("deserialization should fail.");
}
catch (JsonMappingException e) {
Assert.assertTrue(e.getCause() instanceof IllegalArgumentException);
}
}
@Test @Test
public void testEqualsAndHashCodeSanity() public void testEqualsAndHashCodeSanity()
{ {
@ -414,7 +445,8 @@ public class CoordinatorDynamicConfigTest
Set<String> decommissioningNodes, Set<String> decommissioningNodes,
int decommissioningMaxPercentOfMaxSegmentsToMove, int decommissioningMaxPercentOfMaxSegmentsToMove,
boolean pauseCoordination, boolean pauseCoordination,
boolean replicateAfterLoadTimeout boolean replicateAfterLoadTimeout,
int maxNonPrimaryReplicantsToLoad
) )
{ {
Assert.assertEquals( Assert.assertEquals(
@ -442,5 +474,6 @@ public class CoordinatorDynamicConfigTest
); );
Assert.assertEquals(pauseCoordination, config.getPauseCoordination()); Assert.assertEquals(pauseCoordination, config.getPauseCoordination());
Assert.assertEquals(replicateAfterLoadTimeout, config.getReplicateAfterLoadTimeout()); Assert.assertEquals(replicateAfterLoadTimeout, config.getReplicateAfterLoadTimeout());
Assert.assertEquals(maxNonPrimaryReplicantsToLoad, config.getMaxNonPrimaryReplicantsToLoad());
} }
} }
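
The tests above imply two behaviors for `maxNonPrimaryReplicantsToLoad`: a missing (null) value resolves to `Integer.MAX_VALUE`, and `-1` is rejected with an `IllegalArgumentException`. A minimal sketch of that handling might look like the following. The class name, method name, and error message are hypothetical and are not taken from `CoordinatorDynamicConfig`. Treating `0` as valid is also an assumption, since the tests only cover `-1`.

```java
// Hypothetical sketch; not the actual CoordinatorDynamicConfig implementation.
final class MaxNonPrimaryReplicantsSketch
{
  // Default used when the config value is absent, matching the Integer.MAX_VALUE seen in the tests.
  static final int DEFAULT_MAX_NON_PRIMARY_REPLICANTS_TO_LOAD = Integer.MAX_VALUE;

  // Resolves the raw (possibly null) configured value to the effective limit.
  static int resolve(Integer configured)
  {
    if (configured == null) {
      // Null is replaced with the default rather than treated as an error.
      return DEFAULT_MAX_NON_PRIMARY_REPLICANTS_TO_LOAD;
    }
    if (configured < 0) {
      // Assumption: any negative value is invalid; the tests only exercise -1.
      throw new IllegalArgumentException(
          "maxNonPrimaryReplicantsToLoad must be >= 0, got " + configured
      );
    }
    return configured;
  }
}
```

In the real code path the exception would surface through Jackson as the cause of a `JsonMappingException`, which is what `testSerdeHandleInvalidMaxNonPrimaryReplicantsToLoad` asserts.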


@ -191,6 +191,14 @@ exports[`coordinator dynamic config matches snapshot 1`] = `
"name": "replicateAfterLoadTimeout", "name": "replicateAfterLoadTimeout",
"type": "boolean", "type": "boolean",
}, },
Object {
"defaultValue": 2147483647,
"info": <React.Fragment>
The maximum number of non-primary replicants to load in a single Coordinator cycle. Once this limit is hit, only primary replicants will be loaded for the remainder of the cycle. Tuning this value lower can help reduce the delay in loading primary segments when the cluster has a very large number of non-primary replicants to load (such as when a single historical drops out of the cluster leaving many under-replicated segments).
</React.Fragment>,
"name": "maxNonPrimaryReplicantsToLoad",
"type": "number",
},
] ]
} }
model={Object {}} model={Object {}}


@ -37,6 +37,7 @@ export interface CoordinatorDynamicConfig {
decommissioningNodes?: string[]; decommissioningNodes?: string[];
decommissioningMaxPercentOfMaxSegmentsToMove?: number; decommissioningMaxPercentOfMaxSegmentsToMove?: number;
pauseCoordination?: boolean; pauseCoordination?: boolean;
maxNonPrimaryReplicantsToLoad?: number;
} }
export const COORDINATOR_DYNAMIC_CONFIG_FIELDS: Field<CoordinatorDynamicConfig>[] = [ export const COORDINATOR_DYNAMIC_CONFIG_FIELDS: Field<CoordinatorDynamicConfig>[] = [
@ -234,4 +235,18 @@ export const COORDINATOR_DYNAMIC_CONFIG_FIELDS: Field<CoordinatorDynamicConfig>[
</> </>
), ),
}, },
{
name: 'maxNonPrimaryReplicantsToLoad',
type: 'number',
defaultValue: 2147483647,
info: (
<>
The maximum number of non-primary replicants to load in a single Coordinator cycle. Once
this limit is hit, only primary replicants will be loaded for the remainder of the cycle.
Tuning this value lower can help reduce the delay in loading primary segments when the
cluster has a very large number of non-primary replicants to load (such as when a single
historical drops out of the cluster leaving many under-replicated segments).
</>
),
},
]; ];
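
The field description above says that once the limit is hit, only primary replicants are loaded for the remainder of the Coordinator cycle. One way to picture that behavior is a per-cycle counter, sketched below in Java. This is an illustration of the described semantics only: the class and method names are hypothetical, and the actual Coordinator code is not shown in this part of the diff.

```java
// Hypothetical sketch of the per-cycle throttle described by the config text.
final class ReplicantThrottleSketch
{
  private final int maxNonPrimaryReplicantsToLoad;
  private int nonPrimaryAssignedThisCycle = 0;

  ReplicantThrottleSketch(int maxNonPrimaryReplicantsToLoad)
  {
    this.maxNonPrimaryReplicantsToLoad = maxNonPrimaryReplicantsToLoad;
  }

  // Returns true if a replicant of the given kind may be assigned in the current cycle.
  boolean tryAssign(boolean isPrimary)
  {
    if (isPrimary) {
      // Primary replicants are never throttled by this limit.
      return true;
    }
    if (nonPrimaryAssignedThisCycle >= maxNonPrimaryReplicantsToLoad) {
      // Limit reached: skip non-primary replicants until the next cycle.
      return false;
    }
    nonPrimaryAssignedThisCycle++;
    return true;
  }

  // Resets the counter at the start of each coordination cycle.
  void startNewCycle()
  {
    nonPrimaryAssignedThisCycle = 0;
  }
}
```

With the default of `Integer.MAX_VALUE` the counter effectively never reaches the limit, so behavior is unchanged unless an operator lowers the value.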


@ -1787,6 +1787,7 @@ preloaded
queryType queryType
remoteTaskRunnerConfig remoteTaskRunnerConfig
rendezvousHash rendezvousHash
replicants
resultsets resultsets
roundRobin roundRobin
runtime.properties runtime.properties