Nelson Ray 2013-01-04 11:20:06 -08:00
parent 117cac2795
commit 402ddbf261
5 changed files with 116 additions and 19 deletions

View File

@@ -25,6 +25,8 @@ import com.google.common.collect.Sets;
 import com.metamx.common.Pair;
 import com.metamx.common.logger.Logger;
 import com.metamx.druid.client.DataSegment;
+import com.metamx.druid.client.DruidServer;
+import org.joda.time.DateTime;
 import org.joda.time.Interval;

 import java.util.Comparator;
@@ -36,23 +38,28 @@ import java.util.Set;
 /**
  * The BalancerCostAnalyzer will compute the total initial cost of the cluster, with costs defined in
- * computeJointSegmentCosts. It will then propose to move randomly chosen segments from their respective
- * initial servers to other servers, chosen greedily to minimize the cost of the cluster.
+ * computeJointSegmentCosts. It will then propose to move (pseudo-)randomly chosen segments from their
+ * respective initial servers to other servers, chosen greedily to minimize the cost of the cluster.
  */
 public class BalancerCostAnalyzer
 {
   private static final Logger log = new Logger(BalancerCostAnalyzer.class);
   private static final int MAX_SEGMENTS_TO_MOVE = 5;
   private static final int DAY_IN_MILLIS = 1000 * 60 * 60 * 24;
+  private static final long SEVEN_DAYS_IN_MILLIS = 7L * DAY_IN_MILLIS;
+  private static final long THIRTY_DAYS_IN_MILLIS = 30L * DAY_IN_MILLIS;  // long: 30 * DAY_IN_MILLIS overflows int
   private List<ServerHolder> serverHolderList;
   private Random rand;
+  private DateTime referenceTimestamp;
   private double initialTotalCost;
+  private double normalization;
   private double totalCostChange;

-  public BalancerCostAnalyzer()
+  public BalancerCostAnalyzer(DateTime referenceTimestamp)
   {
+    this.referenceTimestamp = referenceTimestamp;
     rand = new Random(0);
     totalCostChange = 0;
   }
@@ -60,6 +67,7 @@ public class BalancerCostAnalyzer
   public void init(List<ServerHolder> serverHolderList)
   {
     this.initialTotalCost = calculateInitialTotalCost(serverHolderList);
+    this.normalization = calculateNormalization(serverHolderList);
     this.serverHolderList = serverHolderList;
   }
@@ -68,11 +76,38 @@ public class BalancerCostAnalyzer
     return initialTotalCost;
   }

+  public double getNormalization()
+  {
+    return normalization;
+  }
+
+  public double getNormalizedInitialCost()
+  {
+    return initialTotalCost / normalization;
+  }
+
   public double getTotalCostChange()
   {
     return totalCostChange;
   }

+  /*
+   * Calculates the cost normalization. This is such that the normalized cost is lower bounded
+   * by 1 (e.g. when each segment gets its own compute node).
+   */
+  private double calculateNormalization(List<ServerHolder> serverHolderList)
+  {
+    double cost = 0;
+    for (ServerHolder server : serverHolderList) {
+      DataSegment[] segments = server.getServer().getSegments().values().toArray(new DataSegment[]{});
+      for (int i = 0; i < segments.length; ++i) {
+        cost += computeJointSegmentCosts(segments[i], segments[i]);
+      }
+    }
+    return cost;
+  }
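
As a quick sanity check of the lower bound mentioned in the comment above (assuming that calculateInitialTotalCost, which is mostly elided in this diff, also counts each segment paired with itself): the normalization is the sum of every segment's self cost, while the initial total cost adds the joint costs of distinct segments sharing a server on top of those same self costs. So

    normalizedInitialCost = initialTotalCost / normalization
                          = (self costs + same-server pair costs) / self costs
                          >= 1,

with equality exactly when no two segments share a compute node.
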
+  // Calculates the initial cost of the Druid segment configuration.
   private double calculateInitialTotalCost(List<ServerHolder> serverHolderList)
   {
     double cost = 0;
@@ -87,28 +122,56 @@ public class BalancerCostAnalyzer
     return cost;
   }
+  /*
+   * This defines the unnormalized cost function between two segments. There is a base cost given by
+   * the minimum size of the two segments and additional penalties.
+   * recencyPenalty: it is more likely that recent segments will be queried together
+   * dataSourcePenalty: if two segments belong to the same data source, they are more likely to be involved
+   * in the same queries
+   * gapPenalty: it is more likely that segments close together in time will be queried together
+   */
   public double computeJointSegmentCosts(DataSegment segment1, DataSegment segment2)
   {
     double cost = 0;
     Interval gap = segment1.getInterval().gap(segment2.getInterval());
+
+    double baseCost = Math.min(segment1.getSize(), segment2.getSize());
+    double recencyPenalty = 1;
+    double dataSourcePenalty = 1;
+    double gapPenalty = 1;
+
+    if (segment1.getDataSource().equals(segment2.getDataSource())) {
+      dataSourcePenalty = 2;
+    }
+
+    double maxDiff = Math.max(
+        referenceTimestamp.getMillis() - segment1.getInterval().getEndMillis(),
+        referenceTimestamp.getMillis() - segment2.getInterval().getEndMillis()
+    );
+    if (maxDiff < SEVEN_DAYS_IN_MILLIS) {
+      recencyPenalty = 2 - maxDiff / SEVEN_DAYS_IN_MILLIS;
+    }
+
     // gap is null if the two segment intervals overlap or if they're adjacent
     if (gap == null) {
-      cost += 1f;
+      gapPenalty = 2;
     } else {
       long gapMillis = gap.toDurationMillis();
-      if (gapMillis < DAY_IN_MILLIS) {
-        cost += 1f;
+      if (gapMillis < THIRTY_DAYS_IN_MILLIS) {
+        gapPenalty = 2 - (double) gapMillis / THIRTY_DAYS_IN_MILLIS;  // double division, not integer
       }
     }

-    if (segment1.getDataSource().equals(segment2.getDataSource())) {
-      cost += 1f;
-    }
+    cost = baseCost * recencyPenalty * dataSourcePenalty * gapPenalty;

     return cost;
   }
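
As a worked example of the cost function above, take two hypothetical segments from the same data source, each 100 MB, with adjacent intervals (so gap is null) that both ended two days before referenceTimestamp. Then baseCost = 1e8, dataSourcePenalty = 2, gapPenalty = 2, and recencyPenalty = 2 - 2/7 ≈ 1.71, for a joint cost of roughly 6.9e8. The standalone sketch below just replays that arithmetic; it is illustrative only and does not touch the Druid APIs.

    // Hypothetical, self-contained replay of the arithmetic in computeJointSegmentCosts.
    public class JointSegmentCostExample
    {
      public static void main(String[] args)
      {
        double baseCost = 100_000_000;        // min of the two segment sizes, in bytes
        double dataSourcePenalty = 2;         // both segments belong to the same data source
        double gapPenalty = 2;                // adjacent intervals, so gap == null
        double recencyPenalty = 2 - 2.0 / 7;  // both ended 2 days ago, within the 7-day window
        System.out.println(baseCost * recencyPenalty * dataSourcePenalty * gapPenalty);  // ~6.86E8
      }
    }
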
+  /*
+   * These could be anonymous inner classes in BalancerCostAnalyzerHelper. Their purpose is to unify the
+   * balance and assignment code paths, since a segment that has not yet been assigned does not have a
+   * source server. Because NullDruidServer.equals always returns false, the self cost in computeAllCosts
+   * is always counted when a brand-new segment is being assigned.
+   */
   public class NullServerHolder extends ServerHolder
   {
     public NullServerHolder()
@@ -116,10 +179,24 @@ public class BalancerCostAnalyzer
     {
       super(null, null);
     }

-    @Override
-    public boolean equals(Object o)
+    public class NullDruidServer extends DruidServer
     {
-      return false;
+      public NullDruidServer()
+      {
+        super(null, null, 0, null, null);
+      }
+
+      @Override
+      public boolean equals(Object o)
+      {
+        return false;
+      }
+    }
+
+    @Override
+    public DruidServer getServer()
+    {
+      return new NullDruidServer();
     }
   }
@@ -200,18 +277,20 @@ public class BalancerCostAnalyzer
     public void computeAllCosts()
     {
+      // The contribution to the total cost of a given server by proposing to move the segment to that server is...
       for (ServerHolder server : serverHolderList) {
         double cost = 0f;
+        // the sum of the joint costs against every segment already on the server (including the proposal segment itself, if it lives there)
         for (DataSegment segment : server.getServer().getSegments().values()) {
           cost += computeJointSegmentCosts(proposalSegment, segment);
         }

-        // self cost
-        if (!server.getServer().equals(fromServerHolder.getServer())) {
+        // plus the self cost if the proposed new server is different
+        if (!fromServerHolder.getServer().equals(server.getServer())) {
           cost += computeJointSegmentCosts(proposalSegment, proposalSegment);
         }

-        // Take into account costs of segments that will be moved.
+        // plus the costs of segments that will be moved.
         Iterator it = segmentHoldersToMove.iterator();
         while (it.hasNext()) {
           BalancerSegmentHolder segmentToMove = (BalancerSegmentHolder) it.next();
@@ -223,10 +302,12 @@ public class BalancerCostAnalyzer
           }
         }

-        if (server.getServer().equals(fromServerHolder.getServer())) {
+        // currCost keeps track of the current cost for that server (so we can compute the cost change).
+        if (fromServerHolder.getServer().equals(server.getServer())) {
           currCost = cost;
         }

+        // Only enter the queue if the server has enough size.
         if (proposalSegment.getSize() < server.getAvailableSize()) {
           costsServerHolderPairs.add(Pair.of(cost, server));
         }
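
Putting the comments in computeAllCosts together, the score this helper assigns to each candidate server for the proposal segment works out to roughly

    cost(server) = sum over segments s already on the server of computeJointSegmentCosts(proposalSegment, s)
                 + computeJointSegmentCosts(proposalSegment, proposalSegment), if the server is not the source server
                 + the joint costs against segments already queued to move there (the exact condition is elided between the two hunks above),

and the lowest-cost server with enough available size is then taken from costsServerHolderPairs (see the pollFirst call in LoadRule below).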

View File

@@ -52,6 +52,7 @@ import com.metamx.phonebook.PhoneBookPeon;
 import com.netflix.curator.x.discovery.ServiceProvider;
 import org.I0Itec.zkclient.exception.ZkNodeExistsException;
 import org.codehaus.jackson.map.ObjectMapper;
+import org.joda.time.DateTime;
 import org.joda.time.Duration;

 import javax.annotation.Nullable;
@@ -674,7 +675,7 @@ public class DruidMaster
                 },
                 new DruidMasterRuleRunner(DruidMaster.this),
                 new DruidMasterCleanup(DruidMaster.this),
-                new DruidMasterBalancer(DruidMaster.this, new BalancerCostAnalyzer()),
+                new DruidMasterBalancer(DruidMaster.this, new BalancerCostAnalyzer(DateTime.now())),
                 new DruidMasterLogger()
             )
         );

View File

@@ -111,6 +111,19 @@ public class DruidMasterBalancer implements DruidMasterHelper
           "Initial Total Cost: [%s]",
           analyzer.getInitialTotalCost()
       );
+
+      analyzer.init(serverHolderList);
+      log.info(
+          "Normalization: [%s]",
+          analyzer.getNormalization()
+      );
+
+      log.info(
+          "Normalized Initial Cost: [%s]",
+          analyzer.getNormalizedInitialCost()
+      );
+
       moveSegments(analyzer.findSegmentsToMove(), params);

       stats.addToTieredStat("costChange", tier, (long) analyzer.getTotalCostChange());

View File

@@ -31,6 +31,7 @@ import com.metamx.druid.master.LoadPeonCallback;
 import com.metamx.druid.master.MasterStats;
 import com.metamx.druid.master.ServerHolder;
 import com.metamx.emitter.EmittingLogger;
+import org.joda.time.DateTime;

 import java.util.ArrayList;
 import java.util.List;
@@ -77,7 +78,7 @@ public abstract class LoadRule implements Rule
     List<ServerHolder> assignedServers = Lists.newArrayList();

     while (totalReplicants < expectedReplicants) {
-      BalancerCostAnalyzer analyzer = new BalancerCostAnalyzer();
+      BalancerCostAnalyzer analyzer = new BalancerCostAnalyzer(DateTime.now());
       BalancerCostAnalyzer.BalancerCostAnalyzerHelper helper = analyzer.new BalancerCostAnalyzerHelper(serverHolderList, segment);
       helper.computeAllCosts();
       Pair<Double, ServerHolder> minPair = helper.getCostsServerHolderPairs().pollFirst();

View File

@@ -27,6 +27,7 @@ import com.metamx.druid.client.DruidDataSource;
 import com.metamx.druid.client.DruidServer;
 import junit.framework.Assert;
 import org.easymock.EasyMock;
+import org.joda.time.DateTime;
 import org.joda.time.Interval;
 import org.junit.After;
 import org.junit.Before;
@@ -182,7 +183,7 @@ public class DruidMasterBalancerTest
         .withLoadManagementPeons(ImmutableMap.of("from", peon, "to", peon))
         .build();

-    params = new DruidMasterBalancer(master, new BalancerCostAnalyzer()).run(params);
+    params = new DruidMasterBalancer(master, new BalancerCostAnalyzer(new DateTime("2013-01-01"))).run(params);
     Assert.assertTrue(params.getMasterStats().getPerTierStats().get("movedCount").get("normal").get() > 0);
     Assert.assertTrue(params.getMasterStats().getPerTierStats().get("costChange").get("normal").get() > 0);
   }