mirror of https://github.com/apache/druid.git
added strategy pattern and RandomBalancerStrategy
This commit is contained in:
parent
a03dcc6429
commit
8ee71c8ceb
|
@ -0,0 +1,19 @@
|
|||
package com.metamx.druid.master;
|
||||
|
||||
import com.metamx.druid.client.DataSegment;
|
||||
import org.joda.time.DateTime;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Random;
|
||||
|
||||
public abstract class BalancerStrategy
|
||||
{
|
||||
protected final DateTime referenceTimestamp;
|
||||
protected final Random rand;
|
||||
public BalancerStrategy(DateTime referenceTimestamp){
|
||||
this.referenceTimestamp=referenceTimestamp;
|
||||
rand=new Random(0);
|
||||
}
|
||||
public abstract ServerHolder findNewSegmentHome(final DataSegment proposalSegment,final Iterable<ServerHolder> serverHolders);
|
||||
public abstract BalancerSegmentHolder pickSegmentToMove(final List<ServerHolder> serverHolders);
|
||||
}
|
|
@ -1,95 +1,91 @@
|
|||
/*
|
||||
* Druid - a distributed column store.
|
||||
* Copyright (C) 2012 Metamarkets Group Inc.
|
||||
*
|
||||
* This program is free software; you can redistribute it and/or
|
||||
* modify it under the terms of the GNU General Public License
|
||||
* as published by the Free Software Foundation; either version 2
|
||||
* of the License, or (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with this program; if not, write to the Free Software
|
||||
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||
*/
|
||||
|
||||
package com.metamx.druid.master;
|
||||
|
||||
import com.google.common.collect.MinMaxPriorityQueue;
|
||||
import com.metamx.common.Pair;
|
||||
import com.metamx.common.logger.Logger;
|
||||
import com.metamx.druid.client.DataSegment;
|
||||
import org.joda.time.DateTime;
|
||||
import org.joda.time.Interval;
|
||||
|
||||
import java.util.Comparator;
|
||||
import java.util.List;
|
||||
import java.util.Random;
|
||||
|
||||
/**
|
||||
* The BalancerCostAnalyzer will compute the total initial cost of the cluster, with costs defined in
|
||||
* computeJointSegmentCosts. It will then propose to move (pseudo-)randomly chosen segments from their
|
||||
* respective initial servers to other servers, chosen greedily to minimize the cost of the cluster.
|
||||
*/
|
||||
public class BalancerCostAnalyzer
|
||||
public class CostBalancerStrategy extends BalancerStrategy
|
||||
{
|
||||
private static final Logger log = new Logger(BalancerCostAnalyzer.class);
|
||||
private static final int DAY_IN_MILLIS = 1000 * 60 * 60 * 24;
|
||||
private static final int SEVEN_DAYS_IN_MILLIS = 7 * DAY_IN_MILLIS;
|
||||
private static final int THIRTY_DAYS_IN_MILLIS = 30 * DAY_IN_MILLIS;
|
||||
private final Random rand;
|
||||
private final DateTime referenceTimestamp;
|
||||
|
||||
public BalancerCostAnalyzer(DateTime referenceTimestamp)
|
||||
public CostBalancerStrategy(DateTime referenceTimeStamp)
|
||||
{
|
||||
this.referenceTimestamp = referenceTimestamp;
|
||||
rand = new Random(0);
|
||||
super(referenceTimeStamp);
|
||||
}
|
||||
|
||||
/**
|
||||
* Calculates the cost normalization. This is such that the normalized cost is lower bounded
|
||||
* by 1 (e.g. when each segment gets its own compute node).
|
||||
*
|
||||
* @param serverHolders A list of ServerHolders for a particular tier.
|
||||
*
|
||||
* @return The normalization value (the sum of the diagonal entries in the
|
||||
* pairwise cost matrix). This is the cost of a cluster if each
|
||||
* segment were to get its own compute node.
|
||||
*/
|
||||
public double calculateNormalization(final List<ServerHolder> serverHolders)
|
||||
@Override
|
||||
public ServerHolder findNewSegmentHome(
|
||||
DataSegment proposalSegment, Iterable<ServerHolder> serverHolders
|
||||
)
|
||||
{
|
||||
double cost = 0;
|
||||
return computeCosts(proposalSegment, serverHolders).rhs;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* For assignment, we want to move to the lowest cost server that isn't already serving the segment.
|
||||
*
|
||||
* @param proposalSegment A DataSegment that we are proposing to move.
|
||||
* @param serverHolders An iterable of ServerHolders for a particular tier.
|
||||
*
|
||||
* @return A ServerHolder with the new home for a segment.
|
||||
*/
|
||||
|
||||
private Pair<Double, ServerHolder> computeCosts(
|
||||
final DataSegment proposalSegment,
|
||||
final Iterable<ServerHolder> serverHolders
|
||||
)
|
||||
{
|
||||
|
||||
Pair<Double,ServerHolder> bestServer = Pair.of(Double.POSITIVE_INFINITY,null);
|
||||
MinMaxPriorityQueue<Pair<Double, ServerHolder>> costsAndServers = MinMaxPriorityQueue.orderedBy(
|
||||
new Comparator<Pair<Double, ServerHolder>>()
|
||||
{
|
||||
@Override
|
||||
public int compare(
|
||||
Pair<Double, ServerHolder> o,
|
||||
Pair<Double, ServerHolder> o1
|
||||
)
|
||||
{
|
||||
return Double.compare(o.lhs, o1.lhs);
|
||||
}
|
||||
}
|
||||
).create();
|
||||
|
||||
final long proposalSegmentSize = proposalSegment.getSize();
|
||||
|
||||
for (ServerHolder server : serverHolders) {
|
||||
for (DataSegment segment : server.getServer().getSegments().values()) {
|
||||
cost += computeJointSegmentCosts(segment, segment);
|
||||
/** Don't calculate cost if the server doesn't have enough space or is loading the segment */
|
||||
if (proposalSegmentSize > server.getAvailableSize() || server.isLoadingSegment(proposalSegment)) {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
return cost;
|
||||
}
|
||||
|
||||
/**
|
||||
* Calculates the initial cost of the Druid segment configuration.
|
||||
*
|
||||
* @param serverHolders A list of ServerHolders for a particular tier.
|
||||
*
|
||||
* @return The initial cost of the Druid tier.
|
||||
*/
|
||||
public double calculateInitialTotalCost(final List<ServerHolder> serverHolders)
|
||||
{
|
||||
double cost = 0;
|
||||
for (ServerHolder server : serverHolders) {
|
||||
DataSegment[] segments = server.getServer().getSegments().values().toArray(new DataSegment[]{});
|
||||
for (int i = 0; i < segments.length; ++i) {
|
||||
for (int j = i; j < segments.length; ++j) {
|
||||
cost += computeJointSegmentCosts(segments[i], segments[j]);
|
||||
/** The contribution to the total cost of a given server by proposing to move the segment to that server is... */
|
||||
double cost = 0f;
|
||||
/** the sum of the costs of other (exclusive of the proposalSegment) segments on the server */
|
||||
for (DataSegment segment : server.getServer().getSegments().values()) {
|
||||
if (!proposalSegment.equals(segment)) {
|
||||
cost += computeJointSegmentCosts(proposalSegment, segment);
|
||||
}
|
||||
}
|
||||
/** plus the costs of segments that will be loaded */
|
||||
for (DataSegment segment : server.getPeon().getSegmentsToLoad()) {
|
||||
cost += computeJointSegmentCosts(proposalSegment, segment);
|
||||
}
|
||||
|
||||
if (cost<bestServer.lhs && !server.isServingSegment(proposalSegment)){
|
||||
bestServer= Pair.of(cost,server);
|
||||
}
|
||||
}
|
||||
return cost;
|
||||
|
||||
return bestServer;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -141,107 +137,11 @@ public class BalancerCostAnalyzer
|
|||
return cost;
|
||||
}
|
||||
|
||||
/**
|
||||
* The balancing application requires us to pick a proposal segment uniformly at random from the set of
|
||||
* all servers. We use reservoir sampling to do this.
|
||||
*
|
||||
* @param serverHolders A list of ServerHolders for a particular tier.
|
||||
*
|
||||
* @return A BalancerSegmentHolder sampled uniformly at random.
|
||||
*/
|
||||
public BalancerSegmentHolder pickSegmentToMove(final List<ServerHolder> serverHolders)
|
||||
{
|
||||
ServerHolder fromServerHolder = null;
|
||||
DataSegment proposalSegment = null;
|
||||
int numSoFar = 0;
|
||||
|
||||
for (ServerHolder server : serverHolders) {
|
||||
for (DataSegment segment : server.getServer().getSegments().values()) {
|
||||
int randNum = rand.nextInt(numSoFar + 1);
|
||||
// w.p. 1 / (numSoFar + 1), swap out the server and segment
|
||||
if (randNum == numSoFar) {
|
||||
fromServerHolder = server;
|
||||
proposalSegment = segment;
|
||||
numSoFar++;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return new BalancerSegmentHolder(fromServerHolder.getServer(), proposalSegment);
|
||||
ReservoirSegmentSampler sampler = new ReservoirSegmentSampler();
|
||||
return sampler.getRandomBalancerSegmentHolder(serverHolders);
|
||||
}
|
||||
|
||||
/**
|
||||
* For balancing, we want to only make a move if the minimum cost server is not already serving the segment.
|
||||
*
|
||||
* @param proposalSegment A DataSegment that we are proposing to move.
|
||||
* @param serverHolders An iterable of ServerHolders for a particular tier.
|
||||
*
|
||||
* @return A ServerHolder with the new home for a segment.
|
||||
*/
|
||||
public ServerHolder findNewSegmentHome(
|
||||
final DataSegment proposalSegment,
|
||||
final Iterable<ServerHolder> serverHolders
|
||||
)
|
||||
{
|
||||
return computeCosts(proposalSegment, serverHolders).rhs;
|
||||
}
|
||||
|
||||
/**
|
||||
* For assignment, we want to move to the lowest cost server that isn't already serving the segment.
|
||||
*
|
||||
* @param proposalSegment A DataSegment that we are proposing to move.
|
||||
* @param serverHolders An iterable of ServerHolders for a particular tier.
|
||||
*
|
||||
* @return A ServerHolder with the new home for a segment.
|
||||
*/
|
||||
|
||||
private Pair<Double, ServerHolder> computeCosts(
|
||||
final DataSegment proposalSegment,
|
||||
final Iterable<ServerHolder> serverHolders
|
||||
)
|
||||
{
|
||||
Pair<Double,ServerHolder> bestServer = Pair.of(Double.POSITIVE_INFINITY,null);
|
||||
MinMaxPriorityQueue<Pair<Double, ServerHolder>> costsAndServers = MinMaxPriorityQueue.orderedBy(
|
||||
new Comparator<Pair<Double, ServerHolder>>()
|
||||
{
|
||||
@Override
|
||||
public int compare(
|
||||
Pair<Double, ServerHolder> o,
|
||||
Pair<Double, ServerHolder> o1
|
||||
)
|
||||
{
|
||||
return Double.compare(o.lhs, o1.lhs);
|
||||
}
|
||||
}
|
||||
).create();
|
||||
|
||||
final long proposalSegmentSize = proposalSegment.getSize();
|
||||
|
||||
for (ServerHolder server : serverHolders) {
|
||||
/** Don't calculate cost if the server doesn't have enough space or is loading the segment */
|
||||
if (proposalSegmentSize > server.getAvailableSize() || server.isLoadingSegment(proposalSegment)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
/** The contribution to the total cost of a given server by proposing to move the segment to that server is... */
|
||||
double cost = 0f;
|
||||
/** the sum of the costs of other (exclusive of the proposalSegment) segments on the server */
|
||||
for (DataSegment segment : server.getServer().getSegments().values()) {
|
||||
if (!proposalSegment.equals(segment)) {
|
||||
cost += computeJointSegmentCosts(proposalSegment, segment);
|
||||
}
|
||||
}
|
||||
/** plus the costs of segments that will be loaded */
|
||||
for (DataSegment segment : server.getPeon().getSegmentsToLoad()) {
|
||||
cost += computeJointSegmentCosts(proposalSegment, segment);
|
||||
}
|
||||
|
||||
if (cost<bestServer.lhs && !server.isServingSegment(proposalSegment)){
|
||||
bestServer= Pair.of(cost,server);
|
||||
}
|
||||
}
|
||||
|
||||
return bestServer;
|
||||
}
|
||||
|
||||
}
|
||||
}
|
|
@ -78,7 +78,7 @@ public class DruidMasterBalancer implements DruidMasterHelper
|
|||
{
|
||||
final MasterStats stats = new MasterStats();
|
||||
final DateTime referenceTimestamp = params.getBalancerReferenceTimestamp();
|
||||
final BalancerCostAnalyzer analyzer = params.getBalancerCostAnalyzer(referenceTimestamp);
|
||||
final BalancerStrategy strategy = params.getBalancerStrategy(referenceTimestamp);
|
||||
final int maxSegmentsToMove = params.getMaxSegmentsToMove();
|
||||
|
||||
for (Map.Entry<String, MinMaxPriorityQueue<ServerHolder>> entry :
|
||||
|
@ -113,10 +113,10 @@ public class DruidMasterBalancer implements DruidMasterHelper
|
|||
}
|
||||
|
||||
for (int iter = 0; iter < maxSegmentsToMove; iter++) {
|
||||
final BalancerSegmentHolder segmentToMove = analyzer.pickSegmentToMove(serverHolderList);
|
||||
final BalancerSegmentHolder segmentToMove = strategy.pickSegmentToMove(serverHolderList);
|
||||
|
||||
if (params.getAvailableSegments().contains(segmentToMove.getSegment())) {
|
||||
final ServerHolder holder = analyzer.findNewSegmentHome(segmentToMove.getSegment(), serverHolderList);
|
||||
final ServerHolder holder = strategy.findNewSegmentHome(segmentToMove.getSegment(), serverHolderList);
|
||||
|
||||
if (holder != null) {
|
||||
moveSegment(segmentToMove, holder.getServer(), params);
|
||||
|
|
|
@ -163,9 +163,9 @@ public class DruidMasterRuntimeParams
|
|||
return balancerReferenceTimestamp;
|
||||
}
|
||||
|
||||
public BalancerCostAnalyzer getBalancerCostAnalyzer(DateTime referenceTimestamp)
|
||||
public BalancerStrategy getBalancerStrategy(DateTime referenceTimestamp)
|
||||
{
|
||||
return new BalancerCostAnalyzer(referenceTimestamp);
|
||||
return new CostBalancerStrategy(referenceTimestamp);
|
||||
}
|
||||
|
||||
public boolean hasDeletionWaitTimeElapsed()
|
||||
|
|
|
@ -0,0 +1,31 @@
|
|||
package com.metamx.druid.master;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
import com.metamx.druid.client.DataSegment;
|
||||
import org.joda.time.DateTime;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
public class RandomBalancerStrategy extends BalancerStrategy
|
||||
{
|
||||
private final ReservoirSegmentSampler sampler = new ReservoirSegmentSampler();
|
||||
|
||||
public RandomBalancerStrategy(DateTime referenceTimestamp)
|
||||
{
|
||||
super(referenceTimestamp);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ServerHolder findNewSegmentHome(
|
||||
DataSegment proposalSegment, Iterable<ServerHolder> serverHolders
|
||||
)
|
||||
{
|
||||
return sampler.getRandomServerHolder(Lists.newArrayList(serverHolders));
|
||||
}
|
||||
|
||||
@Override
|
||||
public BalancerSegmentHolder pickSegmentToMove(List<ServerHolder> serverHolders)
|
||||
{
|
||||
return sampler.getRandomBalancerSegmentHolder(serverHolders);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,47 @@
|
|||
package com.metamx.druid.master;
|
||||
|
||||
import com.metamx.druid.client.DataSegment;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Random;
|
||||
|
||||
public class ReservoirSegmentSampler
|
||||
{
|
||||
public BalancerSegmentHolder getRandomBalancerSegmentHolder(final List<ServerHolder> serverHolders)
|
||||
{
|
||||
Random rand = new Random(0);
|
||||
ServerHolder fromServerHolder = null;
|
||||
DataSegment proposalSegment = null;
|
||||
int numSoFar = 0;
|
||||
|
||||
for (ServerHolder server : serverHolders) {
|
||||
for (DataSegment segment : server.getServer().getSegments().values()) {
|
||||
int randNum = rand.nextInt(numSoFar + 1);
|
||||
// w.p. 1 / (numSoFar+1), swap out the server and segment
|
||||
if (randNum == numSoFar) {
|
||||
fromServerHolder = server;
|
||||
proposalSegment = segment;
|
||||
}
|
||||
numSoFar++;
|
||||
}
|
||||
}
|
||||
return new BalancerSegmentHolder(fromServerHolder.getServer(), proposalSegment);
|
||||
}
|
||||
|
||||
|
||||
public ServerHolder getRandomServerHolder(final List<ServerHolder> serverHolders)
|
||||
{
|
||||
ServerHolder fromServerHolder = null;
|
||||
Random rand = new Random(0);
|
||||
int numSoFar = 0;
|
||||
for (ServerHolder server : serverHolders) {
|
||||
int randNum = rand.nextInt(numSoFar + 1);
|
||||
|
||||
if(randNum==numSoFar) {
|
||||
fromServerHolder=server;
|
||||
}
|
||||
numSoFar++;
|
||||
}
|
||||
return fromServerHolder;
|
||||
}
|
||||
}
|
|
@ -22,7 +22,7 @@ package com.metamx.druid.master.rules;
|
|||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.MinMaxPriorityQueue;
|
||||
import com.metamx.druid.client.DataSegment;
|
||||
import com.metamx.druid.master.BalancerCostAnalyzer;
|
||||
import com.metamx.druid.master.BalancerStrategy;
|
||||
import com.metamx.druid.master.DruidMaster;
|
||||
import com.metamx.druid.master.DruidMasterRuntimeParams;
|
||||
import com.metamx.druid.master.LoadPeonCallback;
|
||||
|
@ -60,15 +60,14 @@ public abstract class LoadRule implements Rule
|
|||
|
||||
final List<ServerHolder> serverHolderList = new ArrayList<ServerHolder>(serverQueue);
|
||||
final DateTime referenceTimestamp = params.getBalancerReferenceTimestamp();
|
||||
final BalancerCostAnalyzer analyzer = params.getBalancerCostAnalyzer(referenceTimestamp);
|
||||
|
||||
final BalancerStrategy strategy = params.getBalancerStrategy(referenceTimestamp);
|
||||
if (params.getAvailableSegments().contains(segment)) {
|
||||
stats.accumulate(
|
||||
assign(
|
||||
params.getReplicationManager(),
|
||||
expectedReplicants,
|
||||
totalReplicants,
|
||||
analyzer,
|
||||
strategy,
|
||||
serverHolderList,
|
||||
segment
|
||||
)
|
||||
|
@ -84,7 +83,7 @@ public abstract class LoadRule implements Rule
|
|||
final ReplicationThrottler replicationManager,
|
||||
final int expectedReplicants,
|
||||
int totalReplicants,
|
||||
final BalancerCostAnalyzer analyzer,
|
||||
final BalancerStrategy strategy,
|
||||
final List<ServerHolder> serverHolderList,
|
||||
final DataSegment segment
|
||||
)
|
||||
|
@ -98,7 +97,7 @@ public abstract class LoadRule implements Rule
|
|||
break;
|
||||
}
|
||||
|
||||
final ServerHolder holder = analyzer.findNewSegmentHome(segment, serverHolderList);
|
||||
final ServerHolder holder = strategy.findNewSegmentHome(segment, serverHolderList);
|
||||
|
||||
if (holder == null) {
|
||||
log.warn(
|
||||
|
|
Loading…
Reference in New Issue