From 848d49ca87b72935f7c4f72923bec52e9d8aea60 Mon Sep 17 00:00:00 2001 From: Fangjin Yang Date: Mon, 4 Mar 2013 13:03:15 -0800 Subject: [PATCH] fix bugs in balancing logic and removing auto-scaled node --- .../merger/coordinator/RemoteTaskRunner.java | 2 - .../SimpleResourceManagementStrategy.java | 2 +- .../druid/master/BalancerCostAnalyzer.java | 74 +++++++++---------- .../druid/master/DruidMasterBalancer.java | 38 ++++------ .../metamx/druid/master/rules/LoadRule.java | 2 +- 5 files changed, 54 insertions(+), 64 deletions(-) diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/RemoteTaskRunner.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/RemoteTaskRunner.java index 739a910473f..e4a13136152 100644 --- a/merger/src/main/java/com/metamx/druid/merger/coordinator/RemoteTaskRunner.java +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/RemoteTaskRunner.java @@ -457,8 +457,6 @@ public class RemoteTaskRunner implements TaskRunner if (runningTasks.containsKey(taskId)) { log.info("Task %s just disappeared!", taskId); retryTask(runningTasks.get(taskId), worker.getHost()); - } else { - log.info("A task disappeared I didn't know about: %s", taskId); } } } diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/SimpleResourceManagementStrategy.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/SimpleResourceManagementStrategy.java index 1cd3f8e46e6..74d34d718d2 100644 --- a/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/SimpleResourceManagementStrategy.java +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/SimpleResourceManagementStrategy.java @@ -167,7 +167,7 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat ) ); - if (thoseLazyWorkers.isEmpty()) { + if (thoseLazyWorkers.size() <= minNumWorkers) { return false; } diff --git a/server/src/main/java/com/metamx/druid/master/BalancerCostAnalyzer.java b/server/src/main/java/com/metamx/druid/master/BalancerCostAnalyzer.java index 799bccca581..6519785e76b 100644 --- a/server/src/main/java/com/metamx/druid/master/BalancerCostAnalyzer.java +++ b/server/src/main/java/com/metamx/druid/master/BalancerCostAnalyzer.java @@ -52,11 +52,12 @@ public class BalancerCostAnalyzer /** * Calculates the cost normalization. This is such that the normalized cost is lower bounded * by 1 (e.g. when each segment gets its own compute node). - * @param serverHolders - * A list of ServerHolders for a particular tier. - * @return The normalization value (the sum of the diagonal entries in the - * pairwise cost matrix). This is the cost of a cluster if each - * segment were to get its own compute node. + * + * @param serverHolders A list of ServerHolders for a particular tier. + * + * @return The normalization value (the sum of the diagonal entries in the + * pairwise cost matrix). This is the cost of a cluster if each + * segment were to get its own compute node. */ public double calculateNormalization(final List serverHolders) { @@ -71,9 +72,10 @@ public class BalancerCostAnalyzer /** * Calculates the initial cost of the Druid segment configuration. - * @param serverHolders - * A list of ServerHolders for a particular tier. - * @return The initial cost of the Druid tier. + * + * @param serverHolders A list of ServerHolders for a particular tier. + * + * @return The initial cost of the Druid tier. */ public double calculateInitialTotalCost(final List serverHolders) { @@ -96,11 +98,11 @@ public class BalancerCostAnalyzer * dataSourcePenalty: if two segments belong to the same data source, they are more likely to be involved * in the same queries * gapPenalty: it is more likely that segments close together in time will be queried together - * @param segment1 - * The first DataSegment. - * @param segment2 - * The second DataSegment. - * @return The joint cost of placing the two DataSegments together on one node. + * + * @param segment1 The first DataSegment. + * @param segment2 The second DataSegment. + * + * @return The joint cost of placing the two DataSegments together on one node. */ public double computeJointSegmentCosts(final DataSegment segment1, final DataSegment segment2) { @@ -140,12 +142,12 @@ public class BalancerCostAnalyzer /** * Sample from each server with probability proportional to the number of segments on that server. - * @param serverHolders - * A list of ServerHolders for a particular tier. - * @param numSegments - - * @return A ServerHolder sampled with probability proportional to the - * number of segments on that server + * + * @param serverHolders A list of ServerHolders for a particular tier. + * @param numSegments + * + * @return A ServerHolder sampled with probability proportional to the + * number of segments on that server */ private ServerHolder sampleServer(final List serverHolders, final int numSegments) { @@ -163,11 +165,11 @@ public class BalancerCostAnalyzer /** * The balancing application requires us to pick a proposal segment. - * @param serverHolders - * A list of ServerHolders for a particular tier. - * @param numSegments - * The total number of segments on a particular tier. - * @return A BalancerSegmentHolder sampled uniformly at random. + * + * @param serverHolders A list of ServerHolders for a particular tier. + * @param numSegments The total number of segments on a particular tier. + * + * @return A BalancerSegmentHolder sampled uniformly at random. */ public BalancerSegmentHolder pickSegmentToMove(final List serverHolders, final int numSegments) { @@ -175,7 +177,7 @@ public class BalancerCostAnalyzer ServerHolder fromServerHolder = sampleServer(serverHolders, numSegments); /** and actually pick that segment uniformly at random w.p. 1 / numSegmentsOnServer - so that the probability of picking a segment is 1 / totalSegments. */ + so that the probability of picking a segment is 1 / totalSegments. */ List segments = Lists.newArrayList(fromServerHolder.getServer().getSegments().values()); DataSegment proposalSegment = segments.get(rand.nextInt(segments.size())); @@ -184,18 +186,15 @@ public class BalancerCostAnalyzer /** * The assignment application requires us to supply a proposal segment. - * @param proposalSegment - * A DataSegment that we are proposing to move. - * @param serverHolders - * An iterable of ServerHolders for a particular tier. - * @param assign - * A boolean that is true if used in assignment else false in balancing. - * @return A ServerHolder with the new home for a segment. + * + * @param proposalSegment A DataSegment that we are proposing to move. + * @param serverHolders An iterable of ServerHolders for a particular tier. + * + * @return A ServerHolder with the new home for a segment. */ public ServerHolder findNewSegmentHome( final DataSegment proposalSegment, - final Iterable serverHolders, - final boolean assign + final Iterable serverHolders ) { final long proposalSegmentSize = proposalSegment.getSize(); @@ -204,10 +203,9 @@ public class BalancerCostAnalyzer for (ServerHolder server : serverHolders) { /** Don't calculate cost if the server doesn't have enough space or is loading the segment */ - if (proposalSegmentSize > server.getAvailableSize() - || server.isLoadingSegment(proposalSegment) - /** or if the ask is assignment and the server is serving the segment. */ - || (assign && server.isServingSegment(proposalSegment)) ) { + if (proposalSegmentSize > server.getAvailableSize() || + server.isLoadingSegment(proposalSegment) || + server.isServingSegment(proposalSegment)) { continue; } diff --git a/server/src/main/java/com/metamx/druid/master/DruidMasterBalancer.java b/server/src/main/java/com/metamx/druid/master/DruidMasterBalancer.java index 3d66a21f9d7..7678e85624d 100644 --- a/server/src/main/java/com/metamx/druid/master/DruidMasterBalancer.java +++ b/server/src/main/java/com/metamx/druid/master/DruidMasterBalancer.java @@ -19,6 +19,7 @@ package com.metamx.druid.master; +import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.MinMaxPriorityQueue; import com.metamx.common.guava.Comparators; @@ -66,9 +67,9 @@ public class DruidMasterBalancer implements DruidMasterHelper holder.reduceLifetime(); if (holder.getLifetime() <= 0) { log.makeAlert("[%s]: Balancer move segments queue has a segment stuck", tier) - .addData("segment", holder.getSegment().getIdentifier()) - .addData("server", holder.getFromServer().getStringProps()) - .emit(); + .addData("segment", holder.getSegment().getIdentifier()) + .addData("server", holder.getFromServer().getStringProps()) + .emit(); } } } @@ -95,7 +96,13 @@ public class DruidMasterBalancer implements DruidMasterHelper continue; } - final List serverHolderList = new ArrayList(entry.getValue()); + final List serverHolderList = Lists.newArrayList(entry.getValue()); + + if (serverHolderList.size() <= 1) { + log.info("[%s]: One or fewer servers found. Cannot balance.", tier); + continue; + } + int numSegments = 0; for (ServerHolder server : serverHolderList) { numSegments += server.getServer().getSegments().size(); @@ -107,19 +114,14 @@ public class DruidMasterBalancer implements DruidMasterHelper } int iter = 0; - while (iter < maxSegmentsToMove) { iter++; final BalancerSegmentHolder segmentToMove = analyzer.pickSegmentToMove(serverHolderList, numSegments); - final ServerHolder holder = analyzer.findNewSegmentHome(segmentToMove.getSegment(), serverHolderList, false); + final ServerHolder holder = analyzer.findNewSegmentHome(segmentToMove.getSegment(), serverHolderList); if (holder == null) { continue; } - final DruidServer toServer = holder.getServer(); - - if (!currentlyMovingSegments.get(tier).containsKey(segmentToMove.getSegment())) { - moveSegment(segmentToMove, toServer, params); - } + moveSegment(segmentToMove, holder.getServer(), params); } final double initialTotalCost = analyzer.calculateInitialTotalCost(serverHolderList); @@ -132,21 +134,13 @@ public class DruidMasterBalancer implements DruidMasterHelper stats.addToTieredStat("movedCount", tier, currentlyMovingSegments.get(tier).size()); log.info( - "Initial Total Cost: [%f], Normalization: [%f], Initial Normalized Cost: [%f], Segments Moved: [%d]", + "[%s]: Initial Total Cost: [%f], Normalization: [%f], Initial Normalized Cost: [%f], Segments Moved: [%d]", + tier, initialTotalCost, normalization, normalizedInitialCost, currentlyMovingSegments.get(tier).size() ); - - if (serverHolderList.size() <= 1) { - log.info( - "[%s]: One or fewer servers found. Cannot balance.", - tier - ); - continue; - } - } return params.buildFromExisting() @@ -194,7 +188,7 @@ public class DruidMasterBalancer implements DruidMasterHelper log.makeAlert(e, String.format("[%s] : Moving exception", segmentName)).emit(); } } else { - currentlyMovingSegments.get(toServer.getTier()).remove(segment); + currentlyMovingSegments.get(toServer.getTier()).remove(segmentName); } } diff --git a/server/src/main/java/com/metamx/druid/master/rules/LoadRule.java b/server/src/main/java/com/metamx/druid/master/rules/LoadRule.java index a59d9c795b5..4cd6acae640 100644 --- a/server/src/main/java/com/metamx/druid/master/rules/LoadRule.java +++ b/server/src/main/java/com/metamx/druid/master/rules/LoadRule.java @@ -91,7 +91,7 @@ public abstract class LoadRule implements Rule final MasterStats stats = new MasterStats(); while (totalReplicants < expectedReplicants) { - final ServerHolder holder = analyzer.findNewSegmentHome(segment, serverHolderList, true); + final ServerHolder holder = analyzer.findNewSegmentHome(segment, serverHolderList); if (holder == null) { log.warn(