mirror of https://github.com/apache/druid.git
finish making MAX_SEGMENTS_TO_MOVE a config item
This commit is contained in:
parent
ba82fe746a
commit
6fda5330fd
|
@ -44,7 +44,7 @@ import java.util.Set;
|
|||
public class BalancerCostAnalyzer
|
||||
{
|
||||
private static final Logger log = new Logger(BalancerCostAnalyzer.class);
|
||||
private final int MAX_SEGMENTS_TO_MOVE;
|
||||
private int MAX_SEGMENTS_TO_MOVE;
|
||||
private static final int DAY_IN_MILLIS = 1000 * 60 * 60 * 24;
|
||||
private static final int SEVEN_DAYS_IN_MILLIS = 7 * DAY_IN_MILLIS;
|
||||
private static final int THIRTY_DAYS_IN_MILLIS = 30 * DAY_IN_MILLIS;
|
||||
|
@ -58,20 +58,21 @@ public class BalancerCostAnalyzer
|
|||
private double totalCostChange;
|
||||
private int totalSegments;
|
||||
|
||||
public BalancerCostAnalyzer(DateTime referenceTimestamp, int MAX_SEGMENTS_TO_MOVE)
|
||||
public BalancerCostAnalyzer(DateTime referenceTimestamp)
|
||||
{
|
||||
this.referenceTimestamp = referenceTimestamp;
|
||||
this.MAX_SEGMENTS_TO_MOVE = MAX_SEGMENTS_TO_MOVE;
|
||||
|
||||
rand = new Random(0);
|
||||
totalCostChange = 0;
|
||||
}
|
||||
|
||||
public void init(List<ServerHolder> serverHolderList, DruidMasterRuntimeParams params)
|
||||
{
|
||||
this.serverHolderList = serverHolderList;
|
||||
this.initialTotalCost = calculateInitialTotalCost(serverHolderList);
|
||||
this.normalization = calculateNormalization(serverHolderList);
|
||||
this.serverHolderList = serverHolderList;
|
||||
this.totalSegments = params.getAvailableSegments().size();
|
||||
this.MAX_SEGMENTS_TO_MOVE = params.getMaxSegmentsToMove();
|
||||
}
|
||||
|
||||
public double getInitialTotalCost()
|
||||
|
|
|
@ -575,6 +575,7 @@ public class DruidMaster
|
|||
.withEmitter(emitter)
|
||||
.withMergeBytesLimit(config.getMergeBytesLimit())
|
||||
.withMergeSegmentsLimit(config.getMergeSegmentsLimit())
|
||||
.withMaxSegmentsToMove(config.getMaxSegmentsToMove())
|
||||
.build();
|
||||
|
||||
for (DruidMasterHelper helper : helpers) {
|
||||
|
@ -680,7 +681,7 @@ public class DruidMaster
|
|||
},
|
||||
new DruidMasterRuleRunner(DruidMaster.this),
|
||||
new DruidMasterCleanup(DruidMaster.this),
|
||||
new DruidMasterBalancer(DruidMaster.this, new BalancerCostAnalyzer(DateTime.now(), config.getMaxSegmentsToMove())),
|
||||
new DruidMasterBalancer(DruidMaster.this, new BalancerCostAnalyzer(DateTime.now())),
|
||||
new DruidMasterLogger()
|
||||
)
|
||||
);
|
||||
|
|
|
@ -49,6 +49,7 @@ public class DruidMasterRuntimeParams
|
|||
private final MasterStats stats;
|
||||
private final long mergeBytesLimit;
|
||||
private final int mergeSegmentsLimit;
|
||||
private final int maxSegmentsToMove;
|
||||
|
||||
public DruidMasterRuntimeParams(
|
||||
long startTime,
|
||||
|
@ -62,7 +63,8 @@ public class DruidMasterRuntimeParams
|
|||
long millisToWaitBeforeDeleting,
|
||||
MasterStats stats,
|
||||
long mergeBytesLimit,
|
||||
int mergeSegmentsLimit
|
||||
int mergeSegmentsLimit,
|
||||
int maxSegmentsToMove
|
||||
)
|
||||
{
|
||||
this.startTime = startTime;
|
||||
|
@ -77,6 +79,12 @@ public class DruidMasterRuntimeParams
|
|||
this.stats = stats;
|
||||
this.mergeBytesLimit = mergeBytesLimit;
|
||||
this.mergeSegmentsLimit = mergeSegmentsLimit;
|
||||
this.maxSegmentsToMove = maxSegmentsToMove;
|
||||
}
|
||||
|
||||
public int getMaxSegmentsToMove()
|
||||
{
|
||||
return maxSegmentsToMove;
|
||||
}
|
||||
|
||||
public long getStartTime()
|
||||
|
@ -163,7 +171,8 @@ public class DruidMasterRuntimeParams
|
|||
millisToWaitBeforeDeleting,
|
||||
stats,
|
||||
mergeBytesLimit,
|
||||
mergeSegmentsLimit
|
||||
mergeSegmentsLimit,
|
||||
maxSegmentsToMove
|
||||
);
|
||||
}
|
||||
|
||||
|
@ -181,6 +190,7 @@ public class DruidMasterRuntimeParams
|
|||
private MasterStats stats;
|
||||
private long mergeBytesLimit;
|
||||
private int mergeSegmentsLimit;
|
||||
private int maxSegmentsToMove;
|
||||
|
||||
Builder()
|
||||
{
|
||||
|
@ -196,6 +206,7 @@ public class DruidMasterRuntimeParams
|
|||
this.stats = new MasterStats();
|
||||
this.mergeBytesLimit = 0;
|
||||
this.mergeSegmentsLimit = 0;
|
||||
this.maxSegmentsToMove = 0;
|
||||
}
|
||||
|
||||
Builder(
|
||||
|
@ -210,7 +221,8 @@ public class DruidMasterRuntimeParams
|
|||
long millisToWaitBeforeDeleting,
|
||||
MasterStats stats,
|
||||
long mergeBytesLimit,
|
||||
int mergeSegmentsLimit
|
||||
int mergeSegmentsLimit,
|
||||
int maxSegmentsToMove
|
||||
)
|
||||
{
|
||||
this.startTime = startTime;
|
||||
|
@ -225,6 +237,7 @@ public class DruidMasterRuntimeParams
|
|||
this.stats = stats;
|
||||
this.mergeBytesLimit = mergeBytesLimit;
|
||||
this.mergeSegmentsLimit = mergeSegmentsLimit;
|
||||
this.maxSegmentsToMove = maxSegmentsToMove;
|
||||
}
|
||||
|
||||
public DruidMasterRuntimeParams build()
|
||||
|
@ -241,7 +254,8 @@ public class DruidMasterRuntimeParams
|
|||
millisToWaitBeforeDeleting,
|
||||
stats,
|
||||
mergeBytesLimit,
|
||||
mergeSegmentsLimit
|
||||
mergeSegmentsLimit,
|
||||
maxSegmentsToMove
|
||||
);
|
||||
}
|
||||
|
||||
|
@ -316,5 +330,11 @@ public class DruidMasterRuntimeParams
|
|||
this.mergeSegmentsLimit = mergeSegmentsLimit;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder withMaxSegmentsToMove(int maxSegmentsToMove)
|
||||
{
|
||||
this.maxSegmentsToMove = maxSegmentsToMove;
|
||||
return this;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -65,7 +65,7 @@ public abstract class LoadRule implements Rule
|
|||
totalReplicants,
|
||||
serverQueue,
|
||||
segment,
|
||||
master.getConfig().getMaxSegmentsToMove()
|
||||
params.getMaxSegmentsToMove()
|
||||
)
|
||||
);
|
||||
stats.accumulate(drop(expectedReplicants, clusterReplicants, segment, params));
|
||||
|
@ -87,7 +87,7 @@ public abstract class LoadRule implements Rule
|
|||
|
||||
List<ServerHolder> assignedServers = Lists.newArrayList();
|
||||
while (totalReplicants < expectedReplicants) {
|
||||
BalancerCostAnalyzer analyzer = new BalancerCostAnalyzer(DateTime.now(), MAX_SEGMENTS_TO_MOVE);
|
||||
BalancerCostAnalyzer analyzer = new BalancerCostAnalyzer(DateTime.now());
|
||||
BalancerCostAnalyzer.BalancerCostAnalyzerHelper helper = analyzer.new BalancerCostAnalyzerHelper(serverHolderList, segment);
|
||||
helper.computeAllCosts();
|
||||
Pair<Double, ServerHolder> minPair = helper.getCostsServerHolderPairs().pollFirst();
|
||||
|
|
|
@ -44,6 +44,7 @@ import java.util.Map;
|
|||
*/
|
||||
public class DruidMasterBalancerTest
|
||||
{
|
||||
private static final int MAX_SEGMENTS_TO_MOVE = 5;
|
||||
private DruidMaster master;
|
||||
private DruidServer druidServer1;
|
||||
private DruidServer druidServer2;
|
||||
|
@ -222,6 +223,7 @@ public class DruidMasterBalancerTest
|
|||
)
|
||||
.withLoadManagementPeons(ImmutableMap.of("from", peon, "to", peon))
|
||||
.withAvailableSegments(segments.values())
|
||||
.withMaxSegmentsToMove(MAX_SEGMENTS_TO_MOVE)
|
||||
.build();
|
||||
|
||||
params = new DruidMasterBalancer(master, new BalancerCostAnalyzer(new DateTime("2013-01-01"))).run(params);
|
||||
|
@ -370,6 +372,7 @@ public class DruidMasterBalancerTest
|
|||
)
|
||||
.withLoadManagementPeons(ImmutableMap.of("1", peon, "2", peon, "3", peon, "4", peon))
|
||||
.withAvailableSegments(segments.values())
|
||||
.withMaxSegmentsToMove(MAX_SEGMENTS_TO_MOVE)
|
||||
.build();
|
||||
|
||||
params = new DruidMasterBalancer(master, new BalancerCostAnalyzer(new DateTime("2013-01-01"))).run(params);
|
||||
|
|
|
@ -175,6 +175,7 @@ public class DruidMasterRuleRunnerTest
|
|||
.withAvailableSegments(availableSegments)
|
||||
.withDatabaseRuleManager(databaseRuleManager)
|
||||
.withSegmentReplicantLookup(SegmentReplicantLookup.make(new DruidCluster()))
|
||||
.withMaxSegmentsToMove(5)
|
||||
.build();
|
||||
|
||||
DruidMasterRuntimeParams afterParams = ruleRunner.run(params);
|
||||
|
|
|
@ -124,6 +124,12 @@ public class DruidMasterTest
|
|||
{
|
||||
return "";
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getMaxSegmentsToMove()
|
||||
{
|
||||
return 0;
|
||||
}
|
||||
},
|
||||
null,
|
||||
null,
|
||||
|
|
Loading…
Reference in New Issue