maxSegmentsInQueue in CoordinatorDinamicConfig (#4445)

* Add maxSegmentsInQueue parameter to CoordinatorDinamicConfig and use it in LoadRule to improve segments loading and replication time

* Rename maxSegmentsInQueue to maxSegmentsInNodeLoadingQueue

* Make CoordinatorDynamicConfig constructor private; add/fix tests; set default maxSegmentsInNodeLoadingQueue to 0 (unbounded)

* Docs added for maxSegmentsInNodeLoadingQueue parameter in CoordinatorDynamicConfig

* More docs for maxSegmentsInNodeLoadingQueue and style fixes
This commit is contained in:
dgolitsyn 2017-06-28 07:58:36 +04:00 committed by Roman Leventov
parent 79fd5338e3
commit e04b8be52e
9 changed files with 450 additions and 164 deletions

View File

@ -93,6 +93,7 @@ Issuing a GET request at the same URL will return the spec that is currently in
|`emitBalancingStats`|Boolean flag for whether or not we should emit balancing stats. This is an expensive operation.|false|
|`killDataSourceWhitelist`|List of dataSources for which kill tasks are sent if property `druid.coordinator.kill.on` is true.|none|
|`killAllDataSources`|Send kill tasks for ALL dataSources if property `druid.coordinator.kill.on` is true. If this is set to true then `killDataSourceWhitelist` must not be specified or be empty list.|false|
|`maxSegmentsInNodeLoadingQueue`|The maximum number of segments that could be queued for loading to any given server. This parameter could be used to speed up segments loading process, especially if there are "slow" nodes in the cluster (with low loading speed) or if too much segments scheduled to be replicated to some particular node (faster loading could be preferred to better segments distribution). Desired value depends on segments loading speed, acceptable replication time and number of nodes. Value 1000 could be a start point for a rather big cluster. Default value is 0 (loading queue is unbounded) |0|
To view the audit history of coordinator dynamic config issue a GET request to the URL -

View File

@ -43,6 +43,13 @@ public class CoordinatorDynamicConfig
private final boolean emitBalancingStats;
private final boolean killAllDataSources;
private final Set<String> killDataSourceWhitelist;
/**
* The maximum number of segments that could be queued for loading to any given server.
* Default values is 0 with the meaning of "unbounded" (any number of
* segments could be added to the loading queue for any server).
* See {@link LoadQueuePeon}, {@link io.druid.server.coordinator.rules.LoadRule#run}
*/
private final int maxSegmentsInNodeLoadingQueue;
@JsonCreator
public CoordinatorDynamicConfig(
@ -60,7 +67,8 @@ public class CoordinatorDynamicConfig
// coordinator console can not send array of strings in the update request.
// See https://github.com/druid-io/druid/issues/3055
@JsonProperty("killDataSourceWhitelist") Object killDataSourceWhitelist,
@JsonProperty("killAllDataSources") Boolean killAllDataSources
@JsonProperty("killAllDataSources") Boolean killAllDataSources,
@JsonProperty("maxSegmentsInNodeLoadingQueue") Integer maxSegmentsInNodeLoadingQueue
)
{
CoordinatorDynamicConfig current = configManager.watch(
@ -94,12 +102,14 @@ public class CoordinatorDynamicConfig
? current.getKillDataSourceWhitelist()
: parseKillDataSourceWhitelist(killDataSourceWhitelist);
this.maxSegmentsInNodeLoadingQueue = maxSegmentsInNodeLoadingQueue == null ? current.getMaxSegmentsInNodeLoadingQueue() : maxSegmentsInNodeLoadingQueue;
if (this.killAllDataSources && !this.killDataSourceWhitelist.isEmpty()) {
throw new IAE("can't have killAllDataSources and non-empty killDataSourceWhitelist");
}
}
public CoordinatorDynamicConfig(
private CoordinatorDynamicConfig(
long millisToWaitBeforeDeleting,
long mergeBytesLimit,
int mergeSegmentsLimit,
@ -109,7 +119,8 @@ public class CoordinatorDynamicConfig
int balancerComputeThreads,
boolean emitBalancingStats,
Object killDataSourceWhitelist,
boolean killAllDataSources
boolean killAllDataSources,
int maxSegmentsInNodeLoadingQueue
)
{
this.maxSegmentsToMove = maxSegmentsToMove;
@ -122,6 +133,7 @@ public class CoordinatorDynamicConfig
this.balancerComputeThreads = Math.max(balancerComputeThreads, 1);
this.killAllDataSources = killAllDataSources;
this.killDataSourceWhitelist = parseKillDataSourceWhitelist(killDataSourceWhitelist);
this.maxSegmentsInNodeLoadingQueue = maxSegmentsInNodeLoadingQueue;
if (this.killAllDataSources && !this.killDataSourceWhitelist.isEmpty()) {
throw new IAE("can't have killAllDataSources and non-empty killDataSourceWhitelist");
@ -207,6 +219,12 @@ public class CoordinatorDynamicConfig
return killAllDataSources;
}
@JsonProperty
public int getMaxSegmentsInNodeLoadingQueue()
{
return maxSegmentsInNodeLoadingQueue;
}
@Override
public String toString()
{
@ -221,6 +239,7 @@ public class CoordinatorDynamicConfig
", emitBalancingStats=" + emitBalancingStats +
", killDataSourceWhitelist=" + killDataSourceWhitelist +
", killAllDataSources=" + killAllDataSources +
", maxSegmentsInNodeLoadingQueue=" + maxSegmentsInNodeLoadingQueue +
'}';
}
@ -263,6 +282,9 @@ public class CoordinatorDynamicConfig
if (killAllDataSources != that.killAllDataSources) {
return false;
}
if (maxSegmentsInNodeLoadingQueue != that.maxSegmentsInNodeLoadingQueue) {
return false;
}
return !(killDataSourceWhitelist != null
? !killDataSourceWhitelist.equals(that.killDataSourceWhitelist)
: that.killDataSourceWhitelist != null);
@ -282,9 +304,15 @@ public class CoordinatorDynamicConfig
result = 31 * result + (emitBalancingStats ? 1 : 0);
result = 31 * result + (killAllDataSources ? 1 : 0);
result = 31 * result + (killDataSourceWhitelist != null ? killDataSourceWhitelist.hashCode() : 0);
result = 31 * result + maxSegmentsInNodeLoadingQueue;
return result;
}
public static Builder builder()
{
return new Builder();
}
public static class Builder
{
private long millisToWaitBeforeDeleting;
@ -297,10 +325,11 @@ public class CoordinatorDynamicConfig
private int balancerComputeThreads;
private Set<String> killDataSourceWhitelist;
private boolean killAllDataSources;
private int maxSegmentsInNodeLoadingQueue;
public Builder()
{
this(15 * 60 * 1000L, 524288000L, 100, 5, 15, 10, 1, false, null, false);
this(15 * 60 * 1000L, 524288000L, 100, 5, 15, 10, 1, false, null, false, 0);
}
private Builder(
@ -313,7 +342,8 @@ public class CoordinatorDynamicConfig
int balancerComputeThreads,
boolean emitBalancingStats,
Set<String> killDataSourceWhitelist,
boolean killAllDataSources
boolean killAllDataSources,
int maxSegmentsInNodeLoadingQueue
)
{
this.millisToWaitBeforeDeleting = millisToWaitBeforeDeleting;
@ -326,6 +356,7 @@ public class CoordinatorDynamicConfig
this.balancerComputeThreads = balancerComputeThreads;
this.killDataSourceWhitelist = killDataSourceWhitelist;
this.killAllDataSources = killAllDataSources;
this.maxSegmentsInNodeLoadingQueue = maxSegmentsInNodeLoadingQueue;
}
public Builder withMillisToWaitBeforeDeleting(long millisToWaitBeforeDeleting)
@ -370,12 +401,30 @@ public class CoordinatorDynamicConfig
return this;
}
public Builder withEmitBalancingStats(boolean emitBalancingStats)
{
this.emitBalancingStats = emitBalancingStats;
return this;
}
public Builder withKillDataSourceWhitelist(Set<String> killDataSourceWhitelist)
{
this.killDataSourceWhitelist = killDataSourceWhitelist;
return this;
}
public Builder withKillAllDataSources(boolean killAllDataSources)
{
this.killAllDataSources = killAllDataSources;
return this;
}
public Builder withMaxSegmentsInNodeLoadingQueue(int maxSegmentsInNodeLoadingQueue)
{
this.maxSegmentsInNodeLoadingQueue = maxSegmentsInNodeLoadingQueue;
return this;
}
public CoordinatorDynamicConfig build()
{
return new CoordinatorDynamicConfig(
@ -388,7 +437,8 @@ public class CoordinatorDynamicConfig
balancerComputeThreads,
emitBalancingStats,
killDataSourceWhitelist,
killAllDataSources
killAllDataSources,
maxSegmentsInNodeLoadingQueue
);
}
}

View File

@ -127,6 +127,11 @@ public class LoadQueuePeon
return failedAssignCount.getAndSet(0);
}
public int getNumberOfSegmentsInQueue()
{
return segmentsToLoad.size();
}
public void loadSegment(
final DataSegment segment,
final LoadPeonCallback callback

View File

@ -106,6 +106,11 @@ public class ServerHolder implements Comparable<ServerHolder>
return peon.getSegmentsToLoad().contains(segment);
}
public int getNumberOfSegmentsInQueue()
{
return peon.getNumberOfSegmentsInQueue();
}
@Override
public int compareTo(ServerHolder serverHolder)
{

View File

@ -35,7 +35,10 @@ import io.druid.timeline.DataSegment;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;
/**
* LoadRules indicate the number of replicants a segment should have in a given tier.
@ -61,15 +64,29 @@ public abstract class LoadRule implements Rule
final int totalReplicantsInTier = params.getSegmentReplicantLookup()
.getTotalReplicants(segment.getIdentifier(), tier);
final int loadedReplicantsInTier = params.getSegmentReplicantLookup()
.getLoadedReplicants(segment.getIdentifier(), tier);
.getLoadedReplicants(segment.getIdentifier(), tier);
final MinMaxPriorityQueue<ServerHolder> serverQueue = params.getDruidCluster().getHistoricalsByTier(tier);
if (serverQueue == null) {
log.makeAlert("Tier[%s] has no servers! Check your cluster configuration!", tier).emit();
continue;
}
final List<ServerHolder> serverHolderList = Lists.newArrayList(serverQueue);
final int maxSegmentsInNodeLoadingQueue = params.getCoordinatorDynamicConfig()
.getMaxSegmentsInNodeLoadingQueue();
Predicate<ServerHolder> serverHolderPredicate;
if (maxSegmentsInNodeLoadingQueue > 0) {
serverHolderPredicate = s -> (s != null && s.getNumberOfSegmentsInQueue() < maxSegmentsInNodeLoadingQueue);
} else {
serverHolderPredicate = Objects::nonNull;
}
final List<ServerHolder> serverHolderList = serverQueue.stream()
.filter(serverHolderPredicate)
.collect(Collectors.toList());
final BalancerStrategy strategy = params.getBalancerStrategy();
if (availableSegments.contains(segment)) {
CoordinatorStats assignStats = assign(
@ -215,11 +232,12 @@ public abstract class LoadRule implements Rule
return stats;
}
protected void validateTieredReplicants(Map<String, Integer> tieredReplicants){
if(tieredReplicants.size() == 0) {
protected void validateTieredReplicants(Map<String, Integer> tieredReplicants)
{
if (tieredReplicants.size() == 0) {
throw new IAE("A rule with empty tiered replicants is invalid");
}
for (Map.Entry<String, Integer> entry: tieredReplicants.entrySet()) {
for (Map.Entry<String, Integer> entry : tieredReplicants.entrySet()) {
if (entry.getValue() == null) {
throw new IAE("Replicant value cannot be empty");
}

View File

@ -118,15 +118,22 @@ public class DruidCoordinatorRuleRunnerTest
mockCoordinator();
mockPeon.loadSegment(EasyMock.<DataSegment>anyObject(), EasyMock.<LoadPeonCallback>anyObject());
EasyMock.expectLastCall().atLeastOnce();
EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.<DataSegment>newHashSet()).atLeastOnce();
EasyMock.expect(mockPeon.getLoadQueueSize()).andReturn(0L).atLeastOnce();
EasyMock.replay(mockPeon);
mockEmptyPeon();
EasyMock.expect(databaseRuleManager.getRulesWithDefault(EasyMock.<String>anyObject())).andReturn(
Lists.<Rule>newArrayList(
new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-01T06:00:00.000Z"), ImmutableMap.<String, Integer>of("hot", 1)),
new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-01T12:00:00.000Z"), ImmutableMap.<String, Integer>of("normal", 1)),
new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-02T00:00:00.000Z"), ImmutableMap.<String, Integer>of("cold", 1))
new IntervalLoadRule(
new Interval("2012-01-01T00:00:00.000Z/2012-01-01T06:00:00.000Z"),
ImmutableMap.<String, Integer>of("hot", 1)
),
new IntervalLoadRule(
new Interval("2012-01-01T00:00:00.000Z/2012-01-01T12:00:00.000Z"),
ImmutableMap.<String, Integer>of("normal", 1)
),
new IntervalLoadRule(
new Interval("2012-01-01T00:00:00.000Z/2012-01-02T00:00:00.000Z"),
ImmutableMap.<String, Integer>of("cold", 1)
)
)).atLeastOnce();
EasyMock.replay(databaseRuleManager);
@ -185,9 +192,9 @@ public class DruidCoordinatorRuleRunnerTest
);
ListeningExecutorService exec = MoreExecutors.listeningDecorator(
Executors.newFixedThreadPool(1));
Executors.newFixedThreadPool(1));
BalancerStrategy balancerStrategy =
new CostBalancerStrategyFactory().createBalancerStrategy(exec);
new CostBalancerStrategyFactory().createBalancerStrategy(exec);
DruidCoordinatorRuntimeParams params =
new DruidCoordinatorRuntimeParams.Builder()
@ -226,14 +233,18 @@ public class DruidCoordinatorRuleRunnerTest
mockCoordinator();
mockPeon.loadSegment(EasyMock.<DataSegment>anyObject(), EasyMock.<LoadPeonCallback>anyObject());
EasyMock.expectLastCall().atLeastOnce();
EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.<DataSegment>newHashSet()).atLeastOnce();
EasyMock.expect(mockPeon.getLoadQueueSize()).andReturn(0L).atLeastOnce();
EasyMock.replay(mockPeon);
mockEmptyPeon();
EasyMock.expect(databaseRuleManager.getRulesWithDefault(EasyMock.<String>anyObject())).andReturn(
Lists.<Rule>newArrayList(
new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-01T06:00:00.000Z"), ImmutableMap.<String, Integer>of("hot", 2)),
new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-02T00:00:00.000Z"), ImmutableMap.<String, Integer>of("cold", 1))
new IntervalLoadRule(
new Interval("2012-01-01T00:00:00.000Z/2012-01-01T06:00:00.000Z"),
ImmutableMap.<String, Integer>of("hot", 2)
),
new IntervalLoadRule(
new Interval("2012-01-01T00:00:00.000Z/2012-01-02T00:00:00.000Z"),
ImmutableMap.<String, Integer>of("cold", 1)
)
)
).atLeastOnce();
EasyMock.replay(databaseRuleManager);
@ -288,9 +299,9 @@ public class DruidCoordinatorRuleRunnerTest
);
ListeningExecutorService exec = MoreExecutors.listeningDecorator(
Executors.newFixedThreadPool(1));
Executors.newFixedThreadPool(1));
BalancerStrategy balancerStrategy =
new CostBalancerStrategyFactory().createBalancerStrategy(exec);
new CostBalancerStrategyFactory().createBalancerStrategy(exec);
DruidCoordinatorRuntimeParams params =
new DruidCoordinatorRuntimeParams.Builder()
@ -327,14 +338,18 @@ public class DruidCoordinatorRuleRunnerTest
mockCoordinator();
mockPeon.loadSegment(EasyMock.<DataSegment>anyObject(), EasyMock.<LoadPeonCallback>anyObject());
EasyMock.expectLastCall().atLeastOnce();
EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.<DataSegment>newHashSet()).atLeastOnce();
EasyMock.expect(mockPeon.getLoadQueueSize()).andReturn(0L).atLeastOnce();
EasyMock.replay(mockPeon);
mockEmptyPeon();
EasyMock.expect(databaseRuleManager.getRulesWithDefault(EasyMock.<String>anyObject())).andReturn(
Lists.<Rule>newArrayList(
new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-01T12:00:00.000Z"), ImmutableMap.<String, Integer>of("hot", 1)),
new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-02T00:00:00.000Z"), ImmutableMap.<String, Integer>of("normal", 1))
new IntervalLoadRule(
new Interval("2012-01-01T00:00:00.000Z/2012-01-01T12:00:00.000Z"),
ImmutableMap.<String, Integer>of("hot", 1)
),
new IntervalLoadRule(
new Interval("2012-01-01T00:00:00.000Z/2012-01-02T00:00:00.000Z"),
ImmutableMap.<String, Integer>of("normal", 1)
)
)
).atLeastOnce();
EasyMock.replay(databaseRuleManager);
@ -385,9 +400,9 @@ public class DruidCoordinatorRuleRunnerTest
SegmentReplicantLookup segmentReplicantLookup = SegmentReplicantLookup.make(druidCluster);
ListeningExecutorService exec = MoreExecutors.listeningDecorator(
Executors.newFixedThreadPool(1));
Executors.newFixedThreadPool(1));
BalancerStrategy balancerStrategy =
new CostBalancerStrategyFactory().createBalancerStrategy(exec);
new CostBalancerStrategyFactory().createBalancerStrategy(exec);
DruidCoordinatorRuntimeParams params =
new DruidCoordinatorRuntimeParams.Builder()
@ -417,9 +432,7 @@ public class DruidCoordinatorRuleRunnerTest
mockCoordinator();
mockPeon.loadSegment(EasyMock.<DataSegment>anyObject(), EasyMock.<LoadPeonCallback>anyObject());
EasyMock.expectLastCall().atLeastOnce();
EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.<DataSegment>newHashSet()).atLeastOnce();
EasyMock.expect(mockPeon.getLoadQueueSize()).andReturn(0L).atLeastOnce();
EasyMock.replay(mockPeon);
mockEmptyPeon();
emitter.emit(EasyMock.<ServiceEventBuilder>anyObject());
EasyMock.expectLastCall().times(12);
@ -427,8 +440,14 @@ public class DruidCoordinatorRuleRunnerTest
EasyMock.expect(databaseRuleManager.getRulesWithDefault(EasyMock.<String>anyObject())).andReturn(
Lists.<Rule>newArrayList(
new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-01T12:00:00.000Z"), ImmutableMap.<String, Integer>of("hot",1)),
new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-02T00:00:00.000Z"), ImmutableMap.<String, Integer>of("normal", 1))
new IntervalLoadRule(
new Interval("2012-01-01T00:00:00.000Z/2012-01-01T12:00:00.000Z"),
ImmutableMap.<String, Integer>of("hot", 1)
),
new IntervalLoadRule(
new Interval("2012-01-01T00:00:00.000Z/2012-01-02T00:00:00.000Z"),
ImmutableMap.<String, Integer>of("normal", 1)
)
)
).atLeastOnce();
EasyMock.replay(databaseRuleManager);
@ -456,9 +475,9 @@ public class DruidCoordinatorRuleRunnerTest
);
ListeningExecutorService exec = MoreExecutors.listeningDecorator(
Executors.newFixedThreadPool(1));
Executors.newFixedThreadPool(1));
BalancerStrategy balancerStrategy =
new CostBalancerStrategyFactory().createBalancerStrategy(exec);
new CostBalancerStrategyFactory().createBalancerStrategy(exec);
DruidCoordinatorRuntimeParams params =
new DruidCoordinatorRuntimeParams.Builder()
@ -488,7 +507,10 @@ public class DruidCoordinatorRuleRunnerTest
EasyMock.expect(databaseRuleManager.getRulesWithDefault(EasyMock.<String>anyObject())).andReturn(
Lists.<Rule>newArrayList(
new IntervalLoadRule(new Interval("2012-01-02T00:00:00.000Z/2012-01-03T00:00:00.000Z"), ImmutableMap.<String, Integer>of("normal", 1))
new IntervalLoadRule(
new Interval("2012-01-02T00:00:00.000Z/2012-01-03T00:00:00.000Z"),
ImmutableMap.<String, Integer>of("normal", 1)
)
)
).atLeastOnce();
EasyMock.replay(databaseRuleManager);
@ -534,14 +556,10 @@ public class DruidCoordinatorRuleRunnerTest
{
mockPeon.dropSegment(EasyMock.<DataSegment>anyObject(), EasyMock.<LoadPeonCallback>anyObject());
EasyMock.expectLastCall().atLeastOnce();
EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.<DataSegment>newHashSet()).atLeastOnce();
EasyMock.expect(mockPeon.getLoadQueueSize()).andReturn(0L).atLeastOnce();
EasyMock.replay(mockPeon);
mockEmptyPeon();
EasyMock.expect(coordinator.getDynamicConfigs()).andReturn(
new CoordinatorDynamicConfig(
0, 0, 0, 0, 1, 24, 0, false, null, false
)
createCoordinatorDynamicConfig()
).anyTimes();
coordinator.removeSegment(EasyMock.<DataSegment>anyObject());
EasyMock.expectLastCall().atLeastOnce();
@ -549,7 +567,10 @@ public class DruidCoordinatorRuleRunnerTest
EasyMock.expect(databaseRuleManager.getRulesWithDefault(EasyMock.<String>anyObject())).andReturn(
Lists.<Rule>newArrayList(
new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-01T12:00:00.000Z"), ImmutableMap.<String, Integer>of("normal", 1)),
new IntervalLoadRule(
new Interval("2012-01-01T00:00:00.000Z/2012-01-01T12:00:00.000Z"),
ImmutableMap.<String, Integer>of("normal", 1)
),
new IntervalDropRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-02T00:00:00.000Z"))
)
).atLeastOnce();
@ -585,9 +606,9 @@ public class DruidCoordinatorRuleRunnerTest
SegmentReplicantLookup segmentReplicantLookup = SegmentReplicantLookup.make(druidCluster);
ListeningExecutorService exec = MoreExecutors.listeningDecorator(
Executors.newFixedThreadPool(1));
Executors.newFixedThreadPool(1));
BalancerStrategy balancerStrategy =
new CostBalancerStrategyFactory().createBalancerStrategy(exec);
new CostBalancerStrategyFactory().createBalancerStrategy(exec);
DruidCoordinatorRuntimeParams params = new DruidCoordinatorRuntimeParams.Builder()
.withDruidCluster(druidCluster)
@ -614,13 +635,14 @@ public class DruidCoordinatorRuleRunnerTest
mockCoordinator();
mockPeon.dropSegment(EasyMock.<DataSegment>anyObject(), EasyMock.<LoadPeonCallback>anyObject());
EasyMock.expectLastCall().atLeastOnce();
EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.<DataSegment>newHashSet()).atLeastOnce();
EasyMock.expect(mockPeon.getLoadQueueSize()).andReturn(0L).atLeastOnce();
EasyMock.replay(mockPeon);
mockEmptyPeon();
EasyMock.expect(databaseRuleManager.getRulesWithDefault(EasyMock.<String>anyObject())).andReturn(
Lists.<Rule>newArrayList(
new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-01T12:00:00.000Z"), ImmutableMap.<String, Integer>of("normal", 1)),
new IntervalLoadRule(
new Interval("2012-01-01T00:00:00.000Z/2012-01-01T12:00:00.000Z"),
ImmutableMap.<String, Integer>of("normal", 1)
),
new IntervalDropRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-02T00:00:00.000Z"))
)
).atLeastOnce();
@ -670,9 +692,9 @@ public class DruidCoordinatorRuleRunnerTest
SegmentReplicantLookup segmentReplicantLookup = SegmentReplicantLookup.make(druidCluster);
ListeningExecutorService exec = MoreExecutors.listeningDecorator(
Executors.newFixedThreadPool(1));
Executors.newFixedThreadPool(1));
BalancerStrategy balancerStrategy =
new CostBalancerStrategyFactory().createBalancerStrategy(exec);
new CostBalancerStrategyFactory().createBalancerStrategy(exec);
DruidCoordinatorRuntimeParams params = new DruidCoordinatorRuntimeParams.Builder()
.withDruidCluster(druidCluster)
@ -702,13 +724,14 @@ public class DruidCoordinatorRuleRunnerTest
EasyMock.expectLastCall().atLeastOnce();
mockPeon.dropSegment(EasyMock.<DataSegment>anyObject(), EasyMock.<LoadPeonCallback>anyObject());
EasyMock.expectLastCall().atLeastOnce();
EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.<DataSegment>newHashSet()).atLeastOnce();
EasyMock.expect(mockPeon.getLoadQueueSize()).andReturn(0L).atLeastOnce();
EasyMock.replay(mockPeon);
mockEmptyPeon();
EasyMock.expect(databaseRuleManager.getRulesWithDefault(EasyMock.<String>anyObject())).andReturn(
Lists.<Rule>newArrayList(
new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-01T12:00:00.000Z"), ImmutableMap.<String, Integer>of("hot", 1)),
new IntervalLoadRule(
new Interval("2012-01-01T00:00:00.000Z/2012-01-01T12:00:00.000Z"),
ImmutableMap.<String, Integer>of("hot", 1)
),
new IntervalDropRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-02T00:00:00.000Z"))
)
).atLeastOnce();
@ -762,9 +785,9 @@ public class DruidCoordinatorRuleRunnerTest
SegmentReplicantLookup segmentReplicantLookup = SegmentReplicantLookup.make(druidCluster);
ListeningExecutorService exec = MoreExecutors.listeningDecorator(
Executors.newFixedThreadPool(1));
Executors.newFixedThreadPool(1));
BalancerStrategy balancerStrategy =
new CostBalancerStrategyFactory().createBalancerStrategy(exec);
new CostBalancerStrategyFactory().createBalancerStrategy(exec);
DruidCoordinatorRuntimeParams params = new DruidCoordinatorRuntimeParams.Builder()
.withDruidCluster(druidCluster)
@ -792,13 +815,14 @@ public class DruidCoordinatorRuleRunnerTest
mockCoordinator();
mockPeon.loadSegment(EasyMock.<DataSegment>anyObject(), EasyMock.<LoadPeonCallback>anyObject());
EasyMock.expectLastCall().atLeastOnce();
EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.<DataSegment>newHashSet()).atLeastOnce();
EasyMock.expect(mockPeon.getLoadQueueSize()).andReturn(0L).atLeastOnce();
EasyMock.replay(mockPeon);
mockEmptyPeon();
EasyMock.expect(databaseRuleManager.getRulesWithDefault(EasyMock.<String>anyObject())).andReturn(
Lists.<Rule>newArrayList(
new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-01T12:00:00.000Z"), ImmutableMap.<String, Integer>of("hot", 1)),
new IntervalLoadRule(
new Interval("2012-01-01T00:00:00.000Z/2012-01-01T12:00:00.000Z"),
ImmutableMap.<String, Integer>of("hot", 1)
),
new IntervalDropRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-02T00:00:00.000Z"))
)
).atLeastOnce();
@ -850,9 +874,9 @@ public class DruidCoordinatorRuleRunnerTest
SegmentReplicantLookup segmentReplicantLookup = SegmentReplicantLookup.make(druidCluster);
ListeningExecutorService exec = MoreExecutors.listeningDecorator(
Executors.newFixedThreadPool(1));
Executors.newFixedThreadPool(1));
BalancerStrategy balancerStrategy =
new CostBalancerStrategyFactory().createBalancerStrategy(exec);
new CostBalancerStrategyFactory().createBalancerStrategy(exec);
DruidCoordinatorRuntimeParams params = new DruidCoordinatorRuntimeParams.Builder()
.withDruidCluster(druidCluster)
@ -880,7 +904,10 @@ public class DruidCoordinatorRuleRunnerTest
mockCoordinator();
EasyMock.expect(databaseRuleManager.getRulesWithDefault(EasyMock.<String>anyObject())).andReturn(
Lists.<Rule>newArrayList(
new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-01T01:00:00.000Z"), ImmutableMap.<String, Integer>of("normal", 0))
new IntervalLoadRule(
new Interval("2012-01-01T00:00:00.000Z/2012-01-01T01:00:00.000Z"),
ImmutableMap.<String, Integer>of("normal", 0)
)
)
).atLeastOnce();
EasyMock.replay(databaseRuleManager);
@ -916,9 +943,7 @@ public class DruidCoordinatorRuleRunnerTest
mockPeon.dropSegment(EasyMock.<DataSegment>anyObject(), EasyMock.<LoadPeonCallback>anyObject());
EasyMock.expectLastCall().atLeastOnce();
EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.<DataSegment>newHashSet()).atLeastOnce();
EasyMock.expect(mockPeon.getLoadQueueSize()).andReturn(0L).atLeastOnce();
EasyMock.replay(mockPeon);
mockEmptyPeon();
LoadQueuePeon anotherMockPeon = EasyMock.createMock(LoadQueuePeon.class);
EasyMock.expect(anotherMockPeon.getSegmentsToLoad()).andReturn(Sets.<DataSegment>newHashSet()).atLeastOnce();
@ -951,9 +976,9 @@ public class DruidCoordinatorRuleRunnerTest
SegmentReplicantLookup segmentReplicantLookup = SegmentReplicantLookup.make(druidCluster);
ListeningExecutorService exec = MoreExecutors.listeningDecorator(
Executors.newFixedThreadPool(1));
Executors.newFixedThreadPool(1));
BalancerStrategy balancerStrategy =
new CostBalancerStrategyFactory().createBalancerStrategy(exec);
new CostBalancerStrategyFactory().createBalancerStrategy(exec);
DruidCoordinatorRuntimeParams params = new DruidCoordinatorRuntimeParams.Builder()
.withDruidCluster(druidCluster)
@ -987,13 +1012,14 @@ public class DruidCoordinatorRuleRunnerTest
mockCoordinator();
mockPeon.loadSegment(EasyMock.<DataSegment>anyObject(), EasyMock.<LoadPeonCallback>anyObject());
EasyMock.expectLastCall().atLeastOnce();
EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.<DataSegment>newHashSet()).atLeastOnce();
EasyMock.expect(mockPeon.getLoadQueueSize()).andReturn(0L).atLeastOnce();
EasyMock.replay(mockPeon);
mockEmptyPeon();
EasyMock.expect(databaseRuleManager.getRulesWithDefault(EasyMock.<String>anyObject())).andReturn(
Lists.<Rule>newArrayList(
new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2013-01-01T00:00:00.000Z"), ImmutableMap.<String, Integer>of("hot", 2))
new IntervalLoadRule(
new Interval("2012-01-01T00:00:00.000Z/2013-01-01T00:00:00.000Z"),
ImmutableMap.<String, Integer>of("hot", 2)
)
)
).atLeastOnce();
EasyMock.replay(databaseRuleManager);
@ -1032,9 +1058,9 @@ public class DruidCoordinatorRuleRunnerTest
);
ListeningExecutorService exec = MoreExecutors.listeningDecorator(
Executors.newFixedThreadPool(1));
Executors.newFixedThreadPool(1));
BalancerStrategy balancerStrategy =
new CostBalancerStrategyFactory().createBalancerStrategy(exec);
new CostBalancerStrategyFactory().createBalancerStrategy(exec);
DruidCoordinatorRuntimeParams params =
new DruidCoordinatorRuntimeParams.Builder()
@ -1097,18 +1123,19 @@ public class DruidCoordinatorRuleRunnerTest
public void testReplicantThrottleAcrossTiers() throws Exception
{
EasyMock.expect(coordinator.getDynamicConfigs()).andReturn(
new CoordinatorDynamicConfig(
0, 0, 0, 0, 1, 7, 0, false, null, false
)
CoordinatorDynamicConfig.builder()
.withReplicationThrottleLimit(7)
.withReplicantLifetime(1)
.withMaxSegmentsInNodeLoadingQueue(1000)
.build()
).atLeastOnce();
coordinator.removeSegment(EasyMock.<DataSegment>anyObject());
EasyMock.expectLastCall().anyTimes();
EasyMock.replay(coordinator);
mockPeon.loadSegment(EasyMock.<DataSegment>anyObject(), EasyMock.<LoadPeonCallback>anyObject());
EasyMock.expectLastCall().atLeastOnce();
EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.<DataSegment>newHashSet()).atLeastOnce();
EasyMock.expect(mockPeon.getLoadQueueSize()).andReturn(0L).atLeastOnce();
EasyMock.replay(mockPeon);
mockEmptyPeon();
EasyMock.expect(databaseRuleManager.getRulesWithDefault(EasyMock.<String>anyObject())).andReturn(
Lists.<Rule>newArrayList(
@ -1162,9 +1189,9 @@ public class DruidCoordinatorRuleRunnerTest
);
ListeningExecutorService exec = MoreExecutors.listeningDecorator(
Executors.newFixedThreadPool(1));
Executors.newFixedThreadPool(1));
BalancerStrategy balancerStrategy =
new CostBalancerStrategyFactory().createBalancerStrategy(exec);
new CostBalancerStrategyFactory().createBalancerStrategy(exec);
DruidCoordinatorRuntimeParams params =
new DruidCoordinatorRuntimeParams.Builder()
@ -1195,13 +1222,14 @@ public class DruidCoordinatorRuleRunnerTest
mockCoordinator();
mockPeon.dropSegment(EasyMock.<DataSegment>anyObject(), EasyMock.<LoadPeonCallback>anyObject());
EasyMock.expectLastCall().atLeastOnce();
EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.<DataSegment>newHashSet()).atLeastOnce();
EasyMock.expect(mockPeon.getLoadQueueSize()).andReturn(0L).atLeastOnce();
EasyMock.replay(mockPeon);
mockEmptyPeon();
EasyMock.expect(databaseRuleManager.getRulesWithDefault(EasyMock.<String>anyObject())).andReturn(
Lists.<Rule>newArrayList(
new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2013-01-02T00:00:00.000Z"), ImmutableMap.<String, Integer>of("normal", 1))
new IntervalLoadRule(
new Interval("2012-01-01T00:00:00.000Z/2013-01-02T00:00:00.000Z"),
ImmutableMap.<String, Integer>of("normal", 1)
)
)
).atLeastOnce();
EasyMock.replay(databaseRuleManager);
@ -1265,9 +1293,9 @@ public class DruidCoordinatorRuleRunnerTest
SegmentReplicantLookup segmentReplicantLookup = SegmentReplicantLookup.make(druidCluster);
ListeningExecutorService exec = MoreExecutors.listeningDecorator(
Executors.newFixedThreadPool(1));
Executors.newFixedThreadPool(1));
BalancerStrategy balancerStrategy =
new CostBalancerStrategyFactory().createBalancerStrategy(exec);
new CostBalancerStrategyFactory().createBalancerStrategy(exec);
DruidCoordinatorRuntimeParams params = new DruidCoordinatorRuntimeParams.Builder()
.withDruidCluster(druidCluster)
@ -1320,9 +1348,7 @@ public class DruidCoordinatorRuleRunnerTest
mockCoordinator();
mockPeon.loadSegment(EasyMock.eq(v2), EasyMock.<LoadPeonCallback>anyObject());
EasyMock.expectLastCall().once();
EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.<DataSegment>newHashSet()).atLeastOnce();
EasyMock.expect(mockPeon.getLoadQueueSize()).andReturn(0L).atLeastOnce();
EasyMock.replay(mockPeon);
mockEmptyPeon();
EasyMock.expect(databaseRuleManager.getRulesWithDefault(EasyMock.<String>anyObject())).andReturn(
Lists.<Rule>newArrayList(
@ -1353,9 +1379,9 @@ public class DruidCoordinatorRuleRunnerTest
);
ListeningExecutorService exec = MoreExecutors.listeningDecorator(
Executors.newFixedThreadPool(1));
Executors.newFixedThreadPool(1));
BalancerStrategy balancerStrategy =
new CostBalancerStrategyFactory().createBalancerStrategy(exec);
new CostBalancerStrategyFactory().createBalancerStrategy(exec);
DruidCoordinatorRuntimeParams params =
new DruidCoordinatorRuntimeParams.Builder()
@ -1387,12 +1413,35 @@ public class DruidCoordinatorRuleRunnerTest
private void mockCoordinator()
{
EasyMock.expect(coordinator.getDynamicConfigs()).andReturn(
new CoordinatorDynamicConfig(
0, 0, 0, 0, 1, 24, 0, false, null, false
)
createCoordinatorDynamicConfig()
).anyTimes();
coordinator.removeSegment(EasyMock.<DataSegment>anyObject());
EasyMock.expectLastCall().anyTimes();
EasyMock.replay(coordinator);
}
private void mockEmptyPeon()
{
EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.<DataSegment>newHashSet()).atLeastOnce();
EasyMock.expect(mockPeon.getLoadQueueSize()).andReturn(0L).atLeastOnce();
EasyMock.expect(mockPeon.getNumberOfSegmentsInQueue()).andReturn(0).anyTimes();
EasyMock.replay(mockPeon);
}
private CoordinatorDynamicConfig createCoordinatorDynamicConfig()
{
return CoordinatorDynamicConfig.builder()
.withMillisToWaitBeforeDeleting(0)
.withMergeBytesLimit(0)
.withMergeSegmentsLimit(0)
.withMaxSegmentsToMove(0)
.withReplicantLifetime(1)
.withReplicationThrottleLimit(24)
.withBalancerComputeThreads(0)
.withEmitBalancingStats(false)
.withKillDataSourceWhitelist(null)
.withKillAllDataSources(false)
.withMaxSegmentsInNodeLoadingQueue(1000)
.build();
}
}

View File

@ -46,4 +46,10 @@ public class LoadQueuePeonTester extends LoadQueuePeon
{
return segmentsToLoad;
}
@Override
public int getNumberOfSegmentsInQueue()
{
return segmentsToLoad.size();
}
}

View File

@ -36,12 +36,14 @@ import io.druid.client.DruidServer;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.server.coordination.ServerType;
import io.druid.server.coordinator.BalancerStrategy;
import io.druid.server.coordinator.CoordinatorDynamicConfig;
import io.druid.server.coordinator.CoordinatorStats;
import io.druid.server.coordinator.CostBalancerStrategyFactory;
import io.druid.server.coordinator.DruidCluster;
import io.druid.server.coordinator.DruidCoordinatorRuntimeParams;
import io.druid.server.coordinator.LoadPeonCallback;
import io.druid.server.coordinator.LoadQueuePeon;
import io.druid.server.coordinator.LoadQueuePeonTester;
import io.druid.server.coordinator.ReplicationThrottler;
import io.druid.server.coordinator.SegmentReplicantLookup;
import io.druid.server.coordinator.ServerHolder;
@ -91,17 +93,7 @@ public class LoadRuleTest
for (String tier : Arrays.asList("hot", DruidServer.DEFAULT_TIER)) {
throttler.updateReplicationState(tier);
}
segment = new DataSegment(
"foo",
new Interval("0/3000"),
new DateTime().toString(),
Maps.<String, Object>newHashMap(),
Lists.<String>newArrayList(),
Lists.<String>newArrayList(),
NoneShardSpec.instance(),
0,
0
);
segment = createDataSegment("foo");
}
@After
@ -118,6 +110,7 @@ public class LoadRuleTest
EasyMock.expectLastCall().atLeastOnce();
EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.<DataSegment>newHashSet()).atLeastOnce();
EasyMock.expect(mockPeon.getLoadQueueSize()).andReturn(0L).atLeastOnce();
EasyMock.expect(mockPeon.getNumberOfSegmentsInQueue()).andReturn(0).anyTimes();
EasyMock.replay(mockPeon);
LoadRule rule = new LoadRule()
@ -197,9 +190,9 @@ public class LoadRuleTest
);
ListeningExecutorService exec = MoreExecutors.listeningDecorator(
Executors.newFixedThreadPool(1));
Executors.newFixedThreadPool(1));
BalancerStrategy balancerStrategy =
new CostBalancerStrategyFactory().createBalancerStrategy(exec);
new CostBalancerStrategyFactory().createBalancerStrategy(exec);
CoordinatorStats stats = rule.run(
null,
@ -225,6 +218,7 @@ public class LoadRuleTest
EasyMock.expectLastCall().atLeastOnce();
EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.<DataSegment>newHashSet()).atLeastOnce();
EasyMock.expect(mockPeon.getLoadQueueSize()).andReturn(0L).anyTimes();
EasyMock.expect(mockPeon.getNumberOfSegmentsInQueue()).andReturn(0).anyTimes();
EasyMock.replay(mockPeon);
LoadRule rule = new LoadRule()
@ -308,9 +302,9 @@ public class LoadRuleTest
);
ListeningExecutorService exec = MoreExecutors.listeningDecorator(
Executors.newFixedThreadPool(1));
Executors.newFixedThreadPool(1));
BalancerStrategy balancerStrategy =
new CostBalancerStrategyFactory().createBalancerStrategy(exec);
new CostBalancerStrategyFactory().createBalancerStrategy(exec);
CoordinatorStats stats = rule.run(
null,
@ -336,6 +330,7 @@ public class LoadRuleTest
EasyMock.expectLastCall().atLeastOnce();
EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.<DataSegment>newHashSet()).atLeastOnce();
EasyMock.expect(mockPeon.getLoadQueueSize()).andReturn(0L).atLeastOnce();
EasyMock.expect(mockPeon.getNumberOfSegmentsInQueue()).andReturn(0).anyTimes();
EasyMock.replay(mockPeon);
LoadRule rule = new LoadRule()
@ -399,9 +394,9 @@ public class LoadRuleTest
);
ListeningExecutorService exec = MoreExecutors.listeningDecorator(
Executors.newFixedThreadPool(1));
Executors.newFixedThreadPool(1));
BalancerStrategy balancerStrategy =
new CostBalancerStrategyFactory().createBalancerStrategy(exec);
new CostBalancerStrategyFactory().createBalancerStrategy(exec);
CoordinatorStats stats = rule.run(
null,
@ -426,6 +421,7 @@ public class LoadRuleTest
EasyMock.expectLastCall().atLeastOnce();
EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.<DataSegment>newHashSet()).atLeastOnce();
EasyMock.expect(mockPeon.getLoadQueueSize()).andReturn(0L).anyTimes();
EasyMock.expect(mockPeon.getNumberOfSegmentsInQueue()).andReturn(0).anyTimes();
EasyMock.replay(mockPeon);
LoadRule rule = new LoadRule()
@ -505,9 +501,9 @@ public class LoadRuleTest
);
ListeningExecutorService exec = MoreExecutors.listeningDecorator(
Executors.newFixedThreadPool(1));
Executors.newFixedThreadPool(1));
BalancerStrategy balancerStrategy =
new CostBalancerStrategyFactory().createBalancerStrategy(exec);
new CostBalancerStrategyFactory().createBalancerStrategy(exec);
CoordinatorStats stats = rule.run(
null,
@ -524,4 +520,115 @@ public class LoadRuleTest
Assert.assertEquals(1L, stats.getTieredStat("droppedCount", "hot"));
exec.shutdown();
}
@Test
public void testMaxLoadingQueueSize() throws Exception
{
EasyMock.replay(mockPeon);
LoadQueuePeonTester peon = new LoadQueuePeonTester();
LoadRule rule = new LoadRule()
{
private final Map<String, Integer> tiers = ImmutableMap.of(
"hot", 1
);
@Override
public Map<String, Integer> getTieredReplicants()
{
return tiers;
}
@Override
public int getNumReplicants(String tier)
{
return tiers.get(tier);
}
@Override
public String getType()
{
return "test";
}
@Override
public boolean appliesTo(DataSegment segment, DateTime referenceTimestamp)
{
return true;
}
@Override
public boolean appliesTo(Interval interval, DateTime referenceTimestamp)
{
return true;
}
};
DruidCluster druidCluster = new DruidCluster(
null,
ImmutableMap.of(
"hot",
MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create(
Arrays.asList(
new ServerHolder(
new DruidServer(
"serverHot",
"hostHot",
1000,
ServerType.HISTORICAL,
"hot",
0
).toImmutableDruidServer(),
peon
)
)
)
)
);
ListeningExecutorService exec = MoreExecutors.listeningDecorator(
Executors.newFixedThreadPool(1));
BalancerStrategy balancerStrategy =
new CostBalancerStrategyFactory().createBalancerStrategy(exec);
DataSegment dataSegment1 = createDataSegment("ds1");
DataSegment dataSegment2 = createDataSegment("ds2");
DataSegment dataSegment3 = createDataSegment("ds3");
DruidCoordinatorRuntimeParams params =
DruidCoordinatorRuntimeParams
.newBuilder()
.withDruidCluster(druidCluster)
.withSegmentReplicantLookup(SegmentReplicantLookup.make(druidCluster))
.withReplicationManager(throttler)
.withBalancerStrategy(balancerStrategy)
.withBalancerReferenceTimestamp(new DateTime("2013-01-01"))
.withAvailableSegments(Arrays.asList(dataSegment1, dataSegment2, dataSegment3))
.withDynamicConfigs(new CoordinatorDynamicConfig.Builder().withMaxSegmentsInNodeLoadingQueue(2).build())
.build();
CoordinatorStats stats1 = rule.run(null, params, dataSegment1);
CoordinatorStats stats2 = rule.run(null, params, dataSegment2);
CoordinatorStats stats3 = rule.run(null, params, dataSegment3);
Assert.assertEquals(1L, stats1.getTieredStat(LoadRule.ASSIGNED_COUNT, "hot"));
Assert.assertEquals(1L, stats2.getTieredStat(LoadRule.ASSIGNED_COUNT, "hot"));
Assert.assertEquals(0L, stats3.getTieredStat(LoadRule.ASSIGNED_COUNT, "hot"));
exec.shutdown();
}
private DataSegment createDataSegment(String dataSource)
{
return new DataSegment(
dataSource,
new Interval("0/3000"),
new DateTime().toString(),
Maps.newHashMap(),
Lists.newArrayList(),
Lists.newArrayList(),
NoneShardSpec.instance(),
0,
0
);
}
}

View File

@ -32,6 +32,7 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
/**
@ -69,7 +70,8 @@ public class CoordinatorDynamicConfigTest
+ " \"replicationThrottleLimit\": 1,\n"
+ " \"balancerComputeThreads\": 2, \n"
+ " \"emitBalancingStats\": true,\n"
+ " \"killDataSourceWhitelist\": [\"test1\",\"test2\"]\n"
+ " \"killDataSourceWhitelist\": [\"test1\",\"test2\"],\n"
+ " \"maxSegmentsInNodeLoadingQueue\": 1\n"
+ "}\n";
CoordinatorDynamicConfig actual = mapper.readValue(
@ -81,11 +83,7 @@ public class CoordinatorDynamicConfigTest
),
CoordinatorDynamicConfig.class
);
Assert.assertEquals(
new CoordinatorDynamicConfig(1, 1, 1, 1, 1, 1, 2, true, ImmutableSet.of("test1", "test2"), false),
actual
);
assertConfig(actual, 1, 1, 1, 1, 1, 1, 2, true, ImmutableSet.of("test1", "test2"), false, 1);
}
@Test
@ -100,7 +98,36 @@ public class CoordinatorDynamicConfigTest
+ " \"replicationThrottleLimit\": 1,\n"
+ " \"balancerComputeThreads\": 2, \n"
+ " \"emitBalancingStats\": true,\n"
+ " \"killDataSourceWhitelist\": \" test1 ,test2 \"\n"
+ " \"killDataSourceWhitelist\": \" test1 ,test2 \", \n"
+ " \"maxSegmentsInNodeLoadingQueue\": 1\n"
+ "}\n";
CoordinatorDynamicConfig actual = mapper.readValue(
mapper.writeValueAsString(
mapper.readValue(
jsonStr,
CoordinatorDynamicConfig.class
)
),
CoordinatorDynamicConfig.class
);
assertConfig(actual, 1, 1, 1, 1, 1, 1, 2, true, ImmutableSet.of("test1", "test2"), false, 1);
}
@Test
public void testSerdeWithKillAllDataSources() throws Exception
{
String jsonStr = "{\n"
+ " \"millisToWaitBeforeDeleting\": 1,\n"
+ " \"mergeBytesLimit\": 1,\n"
+ " \"mergeSegmentsLimit\" : 1,\n"
+ " \"maxSegmentsToMove\": 1,\n"
+ " \"replicantLifetime\": 1,\n"
+ " \"replicationThrottleLimit\": 1,\n"
+ " \"balancerComputeThreads\": 2, \n"
+ " \"emitBalancingStats\": true,\n"
+ " \"killAllDataSources\": true,\n"
+ " \"maxSegmentsInNodeLoadingQueue\": 1\n"
+ "}\n";
CoordinatorDynamicConfig actual = mapper.readValue(
@ -113,14 +140,27 @@ public class CoordinatorDynamicConfigTest
CoordinatorDynamicConfig.class
);
Assert.assertEquals(
new CoordinatorDynamicConfig(1, 1, 1, 1, 1, 1, 2, true, ImmutableSet.of("test1", "test2"), false),
actual
);
assertConfig(actual, 1, 1, 1, 1, 1, 1, 2, true, ImmutableSet.of(), true, 1);
//ensure whitelist is empty when killAllDataSources is true
try {
jsonStr = "{\n"
+ " \"killDataSourceWhitelist\": [\"test1\",\"test2\"],\n"
+ " \"killAllDataSources\": true\n"
+ "}\n";
mapper.readValue(
jsonStr,
CoordinatorDynamicConfig.class
);
Assert.fail("deserialization should fail.");
} catch (JsonMappingException e) {
Assert.assertTrue(e.getCause() instanceof IAE);
}
}
@Test
public void testSerdeWithKillAllDataSources() throws Exception
public void testDeserializeWithoutMaxSegmentsInNodeLoadingQueue() throws Exception
{
String jsonStr = "{\n"
+ " \"millisToWaitBeforeDeleting\": 1,\n"
@ -144,43 +184,23 @@ public class CoordinatorDynamicConfigTest
CoordinatorDynamicConfig.class
);
Assert.assertEquals(
new CoordinatorDynamicConfig(1, 1, 1, 1, 1, 1, 2, true, ImmutableSet.of(), true),
actual
);
//ensure whitelist is empty when killAllDataSources is true
try {
jsonStr = "{\n"
+ " \"killDataSourceWhitelist\": [\"test1\",\"test2\"],\n"
+ " \"killAllDataSources\": true\n"
+ "}\n";
mapper.readValue(
jsonStr,
CoordinatorDynamicConfig.class
);
Assert.fail("deserialization should fail.");
} catch (JsonMappingException e) {
Assert.assertTrue(e.getCause() instanceof IAE);
}
assertConfig(actual, 1, 1, 1, 1, 1, 1, 2, true, ImmutableSet.of(), true, 0);
}
@Test
public void testBuilderDefaults()
{
Assert.assertEquals(
new CoordinatorDynamicConfig(900000, 524288000, 100, 5, 15, 10, 1, false, null, false),
new CoordinatorDynamicConfig.Builder().build()
);
CoordinatorDynamicConfig defaultConfig = CoordinatorDynamicConfig.builder().build();
assertConfig(defaultConfig, 900000, 524288000, 100, 5, 15, 10, 1, false, ImmutableSet.of(), false, 0);
}
@Test
public void testUpdate()
{
CoordinatorDynamicConfig current = new CoordinatorDynamicConfig(99, 99, 99, 99, 99, 99, 99, true, ImmutableSet.of("x"), false);
CoordinatorDynamicConfig current = CoordinatorDynamicConfig.builder()
.withKillDataSourceWhitelist(ImmutableSet.of("x"))
.build();
JacksonConfigManager mock = EasyMock.mock(JacksonConfigManager.class);
EasyMock.expect(mock.watch(CoordinatorDynamicConfig.CONFIG_KEY, CoordinatorDynamicConfig.class)).andReturn(
new AtomicReference<>(current)
@ -188,17 +208,42 @@ public class CoordinatorDynamicConfigTest
EasyMock.replay(mock);
Assert.assertEquals(
current,
new CoordinatorDynamicConfig(mock, null, null, null, null, null, null, null, null, null, null)
new CoordinatorDynamicConfig(mock, null, null, null, null, null, null, null, null, null, null, null)
);
}
@Test
public void testEqualsAndHashCodeSanity()
{
CoordinatorDynamicConfig config1 = new CoordinatorDynamicConfig(900000, 524288000, 100, 5, 15, 10, 1, false, null, false);
CoordinatorDynamicConfig config2 = new CoordinatorDynamicConfig(900000, 524288000, 100, 5, 15, 10, 1, false, null, false);
CoordinatorDynamicConfig config1 = CoordinatorDynamicConfig.builder().build();
CoordinatorDynamicConfig config2 = CoordinatorDynamicConfig.builder().build();
Assert.assertEquals(config1, config2);
Assert.assertEquals(config1.hashCode(), config2.hashCode());
}
private void assertConfig(CoordinatorDynamicConfig config,
long expectedMillisToWaitBeforeDeleting,
long expectedMergeBytesLimit,
int expectedMergeSegmentsLimit,
int expectedMaxSegmentsToMove,
int expectedReplicantLifetime,
int expectedReplicationThrottleLimit,
int expectedBalancerComputeThreads,
boolean expectedEmitingBalancingStats,
Set<String> expectedKillDataSourceWhitelist,
boolean expectedKillAllDataSources,
int expectedMaxSegmentsInNodeLoadingQueue)
{
Assert.assertEquals(expectedMillisToWaitBeforeDeleting, config.getMillisToWaitBeforeDeleting());
Assert.assertEquals(expectedMergeBytesLimit, config.getMergeBytesLimit());
Assert.assertEquals(expectedMergeSegmentsLimit, config.getMergeSegmentsLimit());
Assert.assertEquals(expectedMaxSegmentsToMove, config.getMaxSegmentsToMove());
Assert.assertEquals(expectedReplicantLifetime, config.getReplicantLifetime());
Assert.assertEquals(expectedReplicationThrottleLimit, config.getReplicationThrottleLimit());
Assert.assertEquals(expectedBalancerComputeThreads, config.getBalancerComputeThreads());
Assert.assertEquals(expectedEmitingBalancingStats, config.emitBalancingStats());
Assert.assertEquals(expectedKillDataSourceWhitelist, config.getKillDataSourceWhitelist());
Assert.assertEquals(expectedKillAllDataSources, config.isKillAllDataSources());
Assert.assertEquals(expectedMaxSegmentsInNodeLoadingQueue, config.getMaxSegmentsInNodeLoadingQueue());
}
}