diff --git a/server/src/main/java/com/metamx/druid/master/BalancerCostAnalyzer.java b/server/src/main/java/com/metamx/druid/master/BalancerCostAnalyzer.java index 0c68ae8385b..94a55363bc8 100644 --- a/server/src/main/java/com/metamx/druid/master/BalancerCostAnalyzer.java +++ b/server/src/main/java/com/metamx/druid/master/BalancerCostAnalyzer.java @@ -25,6 +25,8 @@ import com.google.common.collect.Sets; import com.metamx.common.Pair; import com.metamx.common.logger.Logger; import com.metamx.druid.client.DataSegment; +import com.metamx.druid.client.DruidServer; +import org.joda.time.DateTime; import org.joda.time.Interval; import java.util.Comparator; @@ -36,23 +38,28 @@ import java.util.Set; /** * The BalancerCostAnalyzer will compute the total initial cost of the cluster, with costs defined in - * computeJointSegmentCosts. It will then propose to move randomly chosen segments from their respective - * initial servers to other servers, chosen greedily to minimize the cost of the cluster. + * computeJointSegmentCosts. It will then propose to move (pseudo-)randomly chosen segments from their + * respective initial servers to other servers, chosen greedily to minimize the cost of the cluster. */ public class BalancerCostAnalyzer { private static final Logger log = new Logger(BalancerCostAnalyzer.class); private static final int MAX_SEGMENTS_TO_MOVE = 5; private static final int DAY_IN_MILLIS = 1000 * 60 * 60 * 24; + private static final int SEVEN_DAYS_IN_MILLIS = 7 * DAY_IN_MILLIS; + private static final int THIRTY_DAYS_IN_MILLIS = 30 * DAY_IN_MILLIS; private List serverHolderList; private Random rand; + private DateTime referenceTimestamp; private double initialTotalCost; + private double normalization; private double totalCostChange; - public BalancerCostAnalyzer() + public BalancerCostAnalyzer(DateTime referenceTimestamp) { + this.referenceTimestamp = referenceTimestamp; rand = new Random(0); totalCostChange = 0; } @@ -60,6 +67,7 @@ public class BalancerCostAnalyzer public void init(List serverHolderList) { this.initialTotalCost = calculateInitialTotalCost(serverHolderList); + this.normalization = calculateNormalization(serverHolderList); this.serverHolderList = serverHolderList; } @@ -68,11 +76,38 @@ public class BalancerCostAnalyzer return initialTotalCost; } + public double getNormalization() + { + return normalization; + } + + public double getNormalizedInitialCost() + { + return initialTotalCost / normalization; + } + public double getTotalCostChange() { return totalCostChange; } + /* + * Calculates the cost normalization. This is such that the normalized cost is lower bounded + * by 1 (e.g. when each segment gets its own compute node). + */ + private double calculateNormalization(List serverHolderList) + { + double cost = 0; + for (ServerHolder server : serverHolderList) { + DataSegment[] segments = server.getServer().getSegments().values().toArray(new DataSegment[]{}); + for (int i = 0; i < segments.length; ++i) { + cost += computeJointSegmentCosts(segments[i], segments[i]); + } + } + return cost; + } + + // Calculates the initial cost of the Druid segment configuration. private double calculateInitialTotalCost(List serverHolderList) { double cost = 0; @@ -87,28 +122,56 @@ public class BalancerCostAnalyzer return cost; } + /* + * This defines the unnormalized cost function between two segments. There is a base cost given by + * the minimum size of the two segments and additional penalties. + * recencyPenalty: it is more likely that recent segments will be queried together + * dataSourcePenalty: if two segments belong to the same data source, they are more likely to be involved + * in the same queries + * gapPenalty: it is more likely that segments close together in time will be queried together + */ public double computeJointSegmentCosts(DataSegment segment1, DataSegment segment2) { double cost = 0; Interval gap = segment1.getInterval().gap(segment2.getInterval()); + double baseCost = Math.min(segment1.getSize(), segment2.getSize()); + double recencyPenalty = 1; + double dataSourcePenalty = 1; + double gapPenalty = 1; + + if (segment1.getDataSource().equals(segment2.getDataSource())) { + dataSourcePenalty = 2; + } + + double maxDiff = Math.max( + referenceTimestamp.getMillis() - segment1.getInterval().getEndMillis(), + referenceTimestamp.getMillis() - segment2.getInterval().getEndMillis() + ); + if (maxDiff < SEVEN_DAYS_IN_MILLIS) { + recencyPenalty = 2 - maxDiff / SEVEN_DAYS_IN_MILLIS; + } + // gap is null if the two segment intervals overlap or if they're adjacent if (gap == null) { - cost += 1f; + gapPenalty = 2; } else { long gapMillis = gap.toDurationMillis(); - if (gapMillis < DAY_IN_MILLIS) { - cost += 1f; + if (gapMillis < THIRTY_DAYS_IN_MILLIS) { + gapPenalty = 2 - gapMillis / THIRTY_DAYS_IN_MILLIS; } } - if (segment1.getDataSource().equals(segment2.getDataSource())) { - cost += 1f; - } + cost = baseCost * recencyPenalty * dataSourcePenalty * gapPenalty; return cost; } + /* + * These could be anonymous in BalancerCostAnalyzerHelper + * Their purpose is to unify the balance/assignment code since a segment that has not yet been assigned + * does not have a source server. + */ public class NullServerHolder extends ServerHolder { public NullServerHolder() @@ -116,10 +179,24 @@ public class BalancerCostAnalyzer super(null, null); } - @Override - public boolean equals(Object o) + public class NullDruidServer extends DruidServer { - return false; + public NullDruidServer() + { + super(null, null, 0, null, null); + } + + @Override + public boolean equals(Object o) + { + return false; + } + } + + @Override + public DruidServer getServer() + { + return new NullDruidServer(); } } @@ -200,18 +277,20 @@ public class BalancerCostAnalyzer public void computeAllCosts() { + // The contribution to the total cost of a given server by proposing to move the segment to that server is... for (ServerHolder server : serverHolderList) { double cost = 0f; + // the sum of the costs of other (inclusive) segments on the server for (DataSegment segment : server.getServer().getSegments().values()) { cost += computeJointSegmentCosts(proposalSegment, segment); } - // self cost - if (!server.getServer().equals(fromServerHolder.getServer())) { + // plus the self cost if the proposed new server is different + if (!fromServerHolder.getServer().equals(server.getServer())) { cost += computeJointSegmentCosts(proposalSegment, proposalSegment); } - // Take into account costs of segments that will be moved. + // plus the costs of segments that will be moved. Iterator it = segmentHoldersToMove.iterator(); while (it.hasNext()) { BalancerSegmentHolder segmentToMove = (BalancerSegmentHolder) it.next(); @@ -223,10 +302,12 @@ public class BalancerCostAnalyzer } } - if (server.getServer().equals(fromServerHolder.getServer())) { + // currCost keeps track of the current cost for that server (so we can compute the cost change). + if (fromServerHolder.getServer().equals(server.getServer())) { currCost = cost; } + // Only enter the queue if the server has enough size. if (proposalSegment.getSize() < server.getAvailableSize()) { costsServerHolderPairs.add(Pair.of(cost, server)); } diff --git a/server/src/main/java/com/metamx/druid/master/DruidMaster.java b/server/src/main/java/com/metamx/druid/master/DruidMaster.java index 4dfbb6018bb..ff01957d102 100644 --- a/server/src/main/java/com/metamx/druid/master/DruidMaster.java +++ b/server/src/main/java/com/metamx/druid/master/DruidMaster.java @@ -52,6 +52,7 @@ import com.metamx.phonebook.PhoneBookPeon; import com.netflix.curator.x.discovery.ServiceProvider; import org.I0Itec.zkclient.exception.ZkNodeExistsException; import org.codehaus.jackson.map.ObjectMapper; +import org.joda.time.DateTime; import org.joda.time.Duration; import javax.annotation.Nullable; @@ -674,7 +675,7 @@ public class DruidMaster }, new DruidMasterRuleRunner(DruidMaster.this), new DruidMasterCleanup(DruidMaster.this), - new DruidMasterBalancer(DruidMaster.this, new BalancerCostAnalyzer()), + new DruidMasterBalancer(DruidMaster.this, new BalancerCostAnalyzer(DateTime.now())), new DruidMasterLogger() ) ); diff --git a/server/src/main/java/com/metamx/druid/master/DruidMasterBalancer.java b/server/src/main/java/com/metamx/druid/master/DruidMasterBalancer.java index 4e0bfdb1db2..9a7445eb03a 100644 --- a/server/src/main/java/com/metamx/druid/master/DruidMasterBalancer.java +++ b/server/src/main/java/com/metamx/druid/master/DruidMasterBalancer.java @@ -111,6 +111,19 @@ public class DruidMasterBalancer implements DruidMasterHelper "Initial Total Cost: [%s]", analyzer.getInitialTotalCost() ); + + analyzer.init(serverHolderList); + log.info( + "Normalization: [%s]", + analyzer.getNormalization() + ); + + analyzer.init(serverHolderList); + log.info( + "Normalized Inital Cost: [%s]", + analyzer.getNormalizedInitialCost() + ); + moveSegments(analyzer.findSegmentsToMove(), params); stats.addToTieredStat("costChange", tier, (long) analyzer.getTotalCostChange()); diff --git a/server/src/main/java/com/metamx/druid/master/rules/LoadRule.java b/server/src/main/java/com/metamx/druid/master/rules/LoadRule.java index 2d26be1745b..748f66f135d 100644 --- a/server/src/main/java/com/metamx/druid/master/rules/LoadRule.java +++ b/server/src/main/java/com/metamx/druid/master/rules/LoadRule.java @@ -31,6 +31,7 @@ import com.metamx.druid.master.LoadPeonCallback; import com.metamx.druid.master.MasterStats; import com.metamx.druid.master.ServerHolder; import com.metamx.emitter.EmittingLogger; +import org.joda.time.DateTime; import java.util.ArrayList; import java.util.List; @@ -77,7 +78,7 @@ public abstract class LoadRule implements Rule List assignedServers = Lists.newArrayList(); while (totalReplicants < expectedReplicants) { - BalancerCostAnalyzer analyzer = new BalancerCostAnalyzer(); + BalancerCostAnalyzer analyzer = new BalancerCostAnalyzer(DateTime.now()); BalancerCostAnalyzer.BalancerCostAnalyzerHelper helper = analyzer.new BalancerCostAnalyzerHelper(serverHolderList, segment); helper.computeAllCosts(); Pair minPair = helper.getCostsServerHolderPairs().pollFirst(); diff --git a/server/src/test/java/com/metamx/druid/master/DruidMasterBalancerTest.java b/server/src/test/java/com/metamx/druid/master/DruidMasterBalancerTest.java index ba1b71c8316..777b4063a10 100644 --- a/server/src/test/java/com/metamx/druid/master/DruidMasterBalancerTest.java +++ b/server/src/test/java/com/metamx/druid/master/DruidMasterBalancerTest.java @@ -27,6 +27,7 @@ import com.metamx.druid.client.DruidDataSource; import com.metamx.druid.client.DruidServer; import junit.framework.Assert; import org.easymock.EasyMock; +import org.joda.time.DateTime; import org.joda.time.Interval; import org.junit.After; import org.junit.Before; @@ -182,7 +183,7 @@ public class DruidMasterBalancerTest .withLoadManagementPeons(ImmutableMap.of("from", peon, "to", peon)) .build(); - params = new DruidMasterBalancer(master, new BalancerCostAnalyzer()).run(params); + params = new DruidMasterBalancer(master, new BalancerCostAnalyzer(new DateTime("2013-01-01"))).run(params); Assert.assertTrue(params.getMasterStats().getPerTierStats().get("movedCount").get("normal").get() > 0); Assert.assertTrue(params.getMasterStats().getPerTierStats().get("costChange").get("normal").get() > 0); }