mirror of https://github.com/apache/druid.git
put BalancerCostAnalyzer into params
This commit is contained in:
parent
617907d85b
commit
5f40dc6d9c
|
@ -676,12 +676,13 @@ public class DruidMaster
|
|||
.withDruidCluster(cluster)
|
||||
.withDatabaseRuleManager(databaseRuleManager)
|
||||
.withSegmentReplicantLookup(segmentReplicantLookup)
|
||||
.withBalancerCostAnalyzer(new BalancerCostAnalyzer(DateTime.now()))
|
||||
.build();
|
||||
}
|
||||
},
|
||||
new DruidMasterRuleRunner(DruidMaster.this),
|
||||
new DruidMasterCleanup(DruidMaster.this),
|
||||
new DruidMasterBalancer(DruidMaster.this, new BalancerCostAnalyzer(DateTime.now())),
|
||||
new DruidMasterBalancer(DruidMaster.this),
|
||||
new DruidMasterLogger()
|
||||
)
|
||||
);
|
||||
|
|
|
@ -51,18 +51,15 @@ public class DruidMasterBalancer implements DruidMasterHelper
|
|||
);
|
||||
private static final EmittingLogger log = new EmittingLogger(DruidMasterBalancer.class);
|
||||
|
||||
private final BalancerCostAnalyzer analyzer;
|
||||
private final DruidMaster master;
|
||||
|
||||
private final Map<String, ConcurrentHashMap<String, BalancerSegmentHolder>> currentlyMovingSegments = Maps.newHashMap();
|
||||
|
||||
public DruidMasterBalancer(
|
||||
DruidMaster master,
|
||||
BalancerCostAnalyzer analyzer
|
||||
DruidMaster master
|
||||
)
|
||||
{
|
||||
this.master = master;
|
||||
this.analyzer = analyzer;
|
||||
}
|
||||
|
||||
private void reduceLifetimes(String tier)
|
||||
|
@ -106,6 +103,7 @@ public class DruidMasterBalancer implements DruidMasterHelper
|
|||
|
||||
List<ServerHolder> serverHolderList = new ArrayList<ServerHolder>(entry.getValue());
|
||||
|
||||
BalancerCostAnalyzer analyzer = params.getBalancerCostAnalyzer();
|
||||
analyzer.init(serverHolderList, params);
|
||||
moveSegments(analyzer.findSegmentsToMove(), params);
|
||||
|
||||
|
|
|
@ -50,6 +50,7 @@ public class DruidMasterRuntimeParams
|
|||
private final long mergeBytesLimit;
|
||||
private final int mergeSegmentsLimit;
|
||||
private final int maxSegmentsToMove;
|
||||
private final BalancerCostAnalyzer balancerCostAnalyzer;
|
||||
|
||||
public DruidMasterRuntimeParams(
|
||||
long startTime,
|
||||
|
@ -64,7 +65,8 @@ public class DruidMasterRuntimeParams
|
|||
MasterStats stats,
|
||||
long mergeBytesLimit,
|
||||
int mergeSegmentsLimit,
|
||||
int maxSegmentsToMove
|
||||
int maxSegmentsToMove,
|
||||
BalancerCostAnalyzer balancerCostAnalyzer
|
||||
)
|
||||
{
|
||||
this.startTime = startTime;
|
||||
|
@ -80,11 +82,7 @@ public class DruidMasterRuntimeParams
|
|||
this.mergeBytesLimit = mergeBytesLimit;
|
||||
this.mergeSegmentsLimit = mergeSegmentsLimit;
|
||||
this.maxSegmentsToMove = maxSegmentsToMove;
|
||||
}
|
||||
|
||||
public int getMaxSegmentsToMove()
|
||||
{
|
||||
return maxSegmentsToMove;
|
||||
this.balancerCostAnalyzer = balancerCostAnalyzer;
|
||||
}
|
||||
|
||||
public long getStartTime()
|
||||
|
@ -147,6 +145,16 @@ public class DruidMasterRuntimeParams
|
|||
return mergeSegmentsLimit;
|
||||
}
|
||||
|
||||
public int getMaxSegmentsToMove()
|
||||
{
|
||||
return maxSegmentsToMove;
|
||||
}
|
||||
|
||||
public BalancerCostAnalyzer getBalancerCostAnalyzer()
|
||||
{
|
||||
return balancerCostAnalyzer;
|
||||
}
|
||||
|
||||
public boolean hasDeletionWaitTimeElapsed()
|
||||
{
|
||||
return (System.currentTimeMillis() - getStartTime() > getMillisToWaitBeforeDeleting());
|
||||
|
@ -172,7 +180,8 @@ public class DruidMasterRuntimeParams
|
|||
stats,
|
||||
mergeBytesLimit,
|
||||
mergeSegmentsLimit,
|
||||
maxSegmentsToMove
|
||||
maxSegmentsToMove,
|
||||
balancerCostAnalyzer
|
||||
);
|
||||
}
|
||||
|
||||
|
@ -191,6 +200,7 @@ public class DruidMasterRuntimeParams
|
|||
private long mergeBytesLimit;
|
||||
private int mergeSegmentsLimit;
|
||||
private int maxSegmentsToMove;
|
||||
private BalancerCostAnalyzer balancerCostAnalyzer;
|
||||
|
||||
Builder()
|
||||
{
|
||||
|
@ -207,6 +217,7 @@ public class DruidMasterRuntimeParams
|
|||
this.mergeBytesLimit = 0;
|
||||
this.mergeSegmentsLimit = 0;
|
||||
this.maxSegmentsToMove = 0;
|
||||
this.balancerCostAnalyzer = null;
|
||||
}
|
||||
|
||||
Builder(
|
||||
|
@ -222,7 +233,8 @@ public class DruidMasterRuntimeParams
|
|||
MasterStats stats,
|
||||
long mergeBytesLimit,
|
||||
int mergeSegmentsLimit,
|
||||
int maxSegmentsToMove
|
||||
int maxSegmentsToMove,
|
||||
BalancerCostAnalyzer balancerCostAnalyzer
|
||||
)
|
||||
{
|
||||
this.startTime = startTime;
|
||||
|
@ -238,6 +250,7 @@ public class DruidMasterRuntimeParams
|
|||
this.mergeBytesLimit = mergeBytesLimit;
|
||||
this.mergeSegmentsLimit = mergeSegmentsLimit;
|
||||
this.maxSegmentsToMove = maxSegmentsToMove;
|
||||
this.balancerCostAnalyzer = balancerCostAnalyzer;
|
||||
}
|
||||
|
||||
public DruidMasterRuntimeParams build()
|
||||
|
@ -255,7 +268,8 @@ public class DruidMasterRuntimeParams
|
|||
stats,
|
||||
mergeBytesLimit,
|
||||
mergeSegmentsLimit,
|
||||
maxSegmentsToMove
|
||||
maxSegmentsToMove,
|
||||
balancerCostAnalyzer
|
||||
);
|
||||
}
|
||||
|
||||
|
@ -336,5 +350,11 @@ public class DruidMasterRuntimeParams
|
|||
this.maxSegmentsToMove = maxSegmentsToMove;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder withBalancerCostAnalyzer(BalancerCostAnalyzer balancerCostAnalyzer)
|
||||
{
|
||||
this.balancerCostAnalyzer = balancerCostAnalyzer;
|
||||
return this;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -31,7 +31,6 @@ import com.metamx.druid.master.LoadPeonCallback;
|
|||
import com.metamx.druid.master.MasterStats;
|
||||
import com.metamx.druid.master.ServerHolder;
|
||||
import com.metamx.emitter.EmittingLogger;
|
||||
import org.joda.time.DateTime;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
@ -65,7 +64,7 @@ public abstract class LoadRule implements Rule
|
|||
totalReplicants,
|
||||
serverQueue,
|
||||
segment,
|
||||
params.getMaxSegmentsToMove()
|
||||
params
|
||||
)
|
||||
);
|
||||
stats.accumulate(drop(expectedReplicants, clusterReplicants, segment, params));
|
||||
|
@ -78,7 +77,7 @@ public abstract class LoadRule implements Rule
|
|||
int totalReplicants,
|
||||
MinMaxPriorityQueue<ServerHolder> serverQueue,
|
||||
DataSegment segment,
|
||||
int MAX_SEGMENTS_TO_MOVE
|
||||
DruidMasterRuntimeParams params
|
||||
)
|
||||
{
|
||||
MasterStats stats = new MasterStats();
|
||||
|
@ -87,7 +86,7 @@ public abstract class LoadRule implements Rule
|
|||
|
||||
List<ServerHolder> assignedServers = Lists.newArrayList();
|
||||
while (totalReplicants < expectedReplicants) {
|
||||
BalancerCostAnalyzer analyzer = new BalancerCostAnalyzer(DateTime.now());
|
||||
BalancerCostAnalyzer analyzer = params.getBalancerCostAnalyzer();
|
||||
BalancerCostAnalyzer.BalancerCostComputer helper = analyzer.new BalancerCostComputer(serverHolderList, segment);
|
||||
|
||||
Pair<Double, ServerHolder> minPair = helper.getMinPair();
|
||||
|
|
|
@ -224,9 +224,10 @@ public class DruidMasterBalancerTest
|
|||
.withLoadManagementPeons(ImmutableMap.of("from", peon, "to", peon))
|
||||
.withAvailableSegments(segments.values())
|
||||
.withMaxSegmentsToMove(MAX_SEGMENTS_TO_MOVE)
|
||||
.withBalancerCostAnalyzer(new BalancerCostAnalyzer(new DateTime("2013-01-01")))
|
||||
.build();
|
||||
|
||||
params = new DruidMasterBalancer(master, new BalancerCostAnalyzer(new DateTime("2013-01-01"))).run(params);
|
||||
params = new DruidMasterBalancer(master).run(params);
|
||||
Assert.assertTrue(params.getMasterStats().getPerTierStats().get("movedCount").get("normal").get() > 0);
|
||||
Assert.assertTrue(params.getMasterStats().getPerTierStats().get("costChange").get("normal").get() > 0);
|
||||
}
|
||||
|
@ -373,9 +374,10 @@ public class DruidMasterBalancerTest
|
|||
.withLoadManagementPeons(ImmutableMap.of("1", peon, "2", peon, "3", peon, "4", peon))
|
||||
.withAvailableSegments(segments.values())
|
||||
.withMaxSegmentsToMove(MAX_SEGMENTS_TO_MOVE)
|
||||
.withBalancerCostAnalyzer(new BalancerCostAnalyzer(new DateTime("2013-01-01")))
|
||||
.build();
|
||||
|
||||
params = new DruidMasterBalancer(master, new BalancerCostAnalyzer(new DateTime("2013-01-01"))).run(params);
|
||||
params = new DruidMasterBalancer(master).run(params);
|
||||
Assert.assertTrue(params.getMasterStats().getPerTierStats().get("movedCount").get("normal").get() > 0);
|
||||
Assert.assertTrue(params.getMasterStats().getPerTierStats().get("costChange").get("normal").get() > 0);
|
||||
}
|
||||
|
|
|
@ -176,6 +176,7 @@ public class DruidMasterRuleRunnerTest
|
|||
.withDatabaseRuleManager(databaseRuleManager)
|
||||
.withSegmentReplicantLookup(SegmentReplicantLookup.make(new DruidCluster()))
|
||||
.withMaxSegmentsToMove(5)
|
||||
.withBalancerCostAnalyzer(new BalancerCostAnalyzer(new DateTime("2013-01-01")))
|
||||
.build();
|
||||
|
||||
DruidMasterRuntimeParams afterParams = ruleRunner.run(params);
|
||||
|
@ -265,6 +266,7 @@ public class DruidMasterRuleRunnerTest
|
|||
.withAvailableSegments(availableSegments)
|
||||
.withDatabaseRuleManager(databaseRuleManager)
|
||||
.withSegmentReplicantLookup(SegmentReplicantLookup.make(new DruidCluster()))
|
||||
.withBalancerCostAnalyzer(new BalancerCostAnalyzer(new DateTime("2013-01-01")))
|
||||
.build();
|
||||
|
||||
DruidMasterRuntimeParams afterParams = ruleRunner.run(params);
|
||||
|
@ -350,6 +352,7 @@ public class DruidMasterRuleRunnerTest
|
|||
.withAvailableSegments(availableSegments)
|
||||
.withDatabaseRuleManager(databaseRuleManager)
|
||||
.withSegmentReplicantLookup(segmentReplicantLookup)
|
||||
.withBalancerCostAnalyzer(new BalancerCostAnalyzer(new DateTime("2013-01-01")))
|
||||
.build();
|
||||
|
||||
DruidMasterRuntimeParams afterParams = ruleRunner.run(params);
|
||||
|
@ -411,6 +414,7 @@ public class DruidMasterRuleRunnerTest
|
|||
.withAvailableSegments(availableSegments)
|
||||
.withDatabaseRuleManager(databaseRuleManager)
|
||||
.withSegmentReplicantLookup(SegmentReplicantLookup.make(new DruidCluster()))
|
||||
.withBalancerCostAnalyzer(new BalancerCostAnalyzer(new DateTime("2013-01-01")))
|
||||
.build();
|
||||
|
||||
|
||||
|
@ -675,6 +679,7 @@ public class DruidMasterRuleRunnerTest
|
|||
.withAvailableSegments(availableSegments)
|
||||
.withDatabaseRuleManager(databaseRuleManager)
|
||||
.withSegmentReplicantLookup(segmentReplicantLookup)
|
||||
.withBalancerCostAnalyzer(new BalancerCostAnalyzer(new DateTime("2013-01-01")))
|
||||
.build();
|
||||
|
||||
DruidMasterRuntimeParams afterParams = ruleRunner.run(params);
|
||||
|
@ -751,6 +756,7 @@ public class DruidMasterRuleRunnerTest
|
|||
.withAvailableSegments(availableSegments)
|
||||
.withDatabaseRuleManager(databaseRuleManager)
|
||||
.withSegmentReplicantLookup(segmentReplicantLookup)
|
||||
.withBalancerCostAnalyzer(new BalancerCostAnalyzer(new DateTime("2013-01-01")))
|
||||
.build();
|
||||
|
||||
DruidMasterRuntimeParams afterParams = ruleRunner.run(params);
|
||||
|
|
Loading…
Reference in New Issue