From fcbac9619391d7ba51e088a7b41e218dd59ebe20 Mon Sep 17 00:00:00 2001 From: Nelson Ray Date: Tue, 12 Feb 2013 10:47:30 -0800 Subject: [PATCH] revert change to findNewSegmentHome: now return single ServerHolder --- .../druid/master/BalancerCostAnalyzer.java | 47 +++++++------------ .../druid/master/DruidMasterBalancer.java | 11 +++-- .../metamx/druid/master/rules/LoadRule.java | 17 +++---- .../druid/master/DruidMasterBalancerTest.java | 6 +++ 4 files changed, 39 insertions(+), 42 deletions(-) diff --git a/server/src/main/java/com/metamx/druid/master/BalancerCostAnalyzer.java b/server/src/main/java/com/metamx/druid/master/BalancerCostAnalyzer.java index cfd99c142c8..3a08c4a05f3 100644 --- a/server/src/main/java/com/metamx/druid/master/BalancerCostAnalyzer.java +++ b/server/src/main/java/com/metamx/druid/master/BalancerCostAnalyzer.java @@ -20,14 +20,11 @@ package com.metamx.druid.master; import com.google.common.collect.Lists; -import com.google.common.collect.MinMaxPriorityQueue; -import com.metamx.common.Pair; import com.metamx.common.logger.Logger; import com.metamx.druid.client.DataSegment; import org.joda.time.DateTime; import org.joda.time.Interval; -import java.util.Comparator; import java.util.List; import java.util.Random; @@ -61,7 +58,7 @@ public class BalancerCostAnalyzer * pairwise cost matrix). This is the cost of a cluster if each * segment were to get its own compute node. */ - public double calculateNormalization(List serverHolders) + public double calculateNormalization(final List serverHolders) { double cost = 0; for (ServerHolder server : serverHolders) { @@ -78,7 +75,7 @@ public class BalancerCostAnalyzer * A list of ServerHolders for a particular tier. * @return The initial cost of the Druid tier. */ - public double calculateInitialTotalCost(List serverHolders) + public double calculateInitialTotalCost(final List serverHolders) { double cost = 0; for (ServerHolder server : serverHolders) { @@ -105,7 +102,7 @@ public class BalancerCostAnalyzer * The second DataSegment. * @return The joint cost of placing the two DataSegments together on one node. */ - public double computeJointSegmentCosts(DataSegment segment1, DataSegment segment2) + public double computeJointSegmentCosts(final DataSegment segment1, final DataSegment segment2) { final Interval gap = segment1.getInterval().gap(segment2.getInterval()); @@ -150,7 +147,7 @@ public class BalancerCostAnalyzer * @return A ServerHolder sampled with probability proportional to the * number of segments on that server */ - private ServerHolder sampleServer(List serverHolders, int numSegments) + private ServerHolder sampleServer(final List serverHolders, final int numSegments) { final int num = rand.nextInt(numSegments); int cumulativeSegments = 0; @@ -172,7 +169,7 @@ public class BalancerCostAnalyzer * The total number of segments on a particular tier. * @return A BalancerSegmentHolder sampled uniformly at random. */ - public BalancerSegmentHolder pickSegmentToMove(List serverHolders, int numSegments) + public BalancerSegmentHolder pickSegmentToMove(final List serverHolders, final int numSegments) { /** We want to sample from each server w.p. numSegmentsOnServer / totalSegments */ ServerHolder fromServerHolder = sampleServer(serverHolders, numSegments); @@ -191,30 +188,19 @@ public class BalancerCostAnalyzer * A DataSegment that we are proposing to move. * @param serverHolders * An iterable of ServerHolders for a particular tier. - * @return A MinMaxPriorityQueue of costs of putting the proposalSegment on the server and ServerHolders. + * @return A ServerHolder with the new home for a segment. */ - public MinMaxPriorityQueue> findNewSegmentHome(DataSegment proposalSegment, Iterable serverHolders) + public ServerHolder findNewSegmentHome(final DataSegment proposalSegment, final Iterable serverHolders) { - // Just need a regular priority queue for the min. element. - final MinMaxPriorityQueue> costServerPairs = MinMaxPriorityQueue.orderedBy( - new Comparator>() - { - @Override - public int compare( - Pair o, - Pair o1 - ) - { - return Double.compare(o.lhs, o1.lhs); - } - } - ).create(); - final long proposalSegmentSize = proposalSegment.getSize(); + double minCost = Double.MAX_VALUE; + ServerHolder toServer = null; for (ServerHolder server : serverHolders) { - /** Only calculate costs if the server has enough space. */ - if (proposalSegmentSize > server.getAvailableSize()) { + /** Don't calculate cost if the server doesn't have enough space or is serving/loading the segment. */ + if (proposalSegmentSize > server.getAvailableSize() + || server.isServingSegment(proposalSegment) + || server.isLoadingSegment(proposalSegment)) { break; } @@ -231,10 +217,13 @@ public class BalancerCostAnalyzer cost += computeJointSegmentCosts(proposalSegment, segment); } - costServerPairs.add(Pair.of(cost, server)); + if (cost < minCost) { + minCost = cost; + toServer = server; + } } - return costServerPairs; + return toServer; } } diff --git a/server/src/main/java/com/metamx/druid/master/DruidMasterBalancer.java b/server/src/main/java/com/metamx/druid/master/DruidMasterBalancer.java index d766f2d663f..d82ada0b207 100644 --- a/server/src/main/java/com/metamx/druid/master/DruidMasterBalancer.java +++ b/server/src/main/java/com/metamx/druid/master/DruidMasterBalancer.java @@ -109,12 +109,17 @@ public class DruidMasterBalancer implements DruidMasterHelper int iter = 0; while (iter < maxSegmentsToMove) { - BalancerSegmentHolder segmentToMove = analyzer.pickSegmentToMove(serverHolderList, numSegments); - DruidServer toServer = analyzer.findNewSegmentHome(segmentToMove.getSegment(), serverHolderList).pollFirst().rhs.getServer(); + iter++; + final BalancerSegmentHolder segmentToMove = analyzer.pickSegmentToMove(serverHolderList, numSegments); + final ServerHolder holder = analyzer.findNewSegmentHome(segmentToMove.getSegment(), serverHolderList); + if (holder == null) { + continue; + } + final DruidServer toServer = holder.getServer(); + if (!currentlyMovingSegments.get(tier).containsKey(segmentToMove.getSegment())) { moveSegment(segmentToMove, toServer, params); } - iter++; } final double initialTotalCost = analyzer.calculateInitialTotalCost(serverHolderList); diff --git a/server/src/main/java/com/metamx/druid/master/rules/LoadRule.java b/server/src/main/java/com/metamx/druid/master/rules/LoadRule.java index 7c26c8b2d6d..0dadf86d9da 100644 --- a/server/src/main/java/com/metamx/druid/master/rules/LoadRule.java +++ b/server/src/main/java/com/metamx/druid/master/rules/LoadRule.java @@ -62,14 +62,14 @@ public abstract class LoadRule implements Rule final List serverHolderList = new ArrayList(serverQueue); final DateTime referenceTimestamp = params.getBalancerReferenceTimestamp(); final BalancerCostAnalyzer analyzer = params.getBalancerCostAnalyzer(referenceTimestamp); - MinMaxPriorityQueue> serverCostQueue = analyzer.findNewSegmentHome(segment, serverHolderList); stats.accumulate( assign( params.getReplicationManager(), expectedReplicants, totalReplicants, - serverCostQueue, + analyzer, + serverHolderList, segment ) ); @@ -81,16 +81,17 @@ public abstract class LoadRule implements Rule private MasterStats assign( final ReplicationThrottler replicationManager, - int expectedReplicants, + final int expectedReplicants, int totalReplicants, - MinMaxPriorityQueue> serverQueue, + final BalancerCostAnalyzer analyzer, + final List serverHolderList, final DataSegment segment ) { - MasterStats stats = new MasterStats(); + final MasterStats stats = new MasterStats(); while (totalReplicants < expectedReplicants) { - ServerHolder holder = serverQueue.pollFirst().rhs; + ServerHolder holder = analyzer.findNewSegmentHome(segment, serverHolderList); if (holder == null) { log.warn( @@ -102,10 +103,6 @@ public abstract class LoadRule implements Rule break; } - if (holder.isServingSegment(segment) || holder.isLoadingSegment(segment)) { - continue; - } - if (totalReplicants > 0) { // don't throttle if there's only 1 copy of this segment in the cluster if (!replicationManager.canAddReplicant(getTier()) || !replicationManager.registerReplicantCreation(getTier(), segment.getIdentifier())) { diff --git a/server/src/test/java/com/metamx/druid/master/DruidMasterBalancerTest.java b/server/src/test/java/com/metamx/druid/master/DruidMasterBalancerTest.java index 504c4f4cfef..2ff1dade115 100644 --- a/server/src/test/java/com/metamx/druid/master/DruidMasterBalancerTest.java +++ b/server/src/test/java/com/metamx/druid/master/DruidMasterBalancerTest.java @@ -143,6 +143,7 @@ public class DruidMasterBalancerTest EasyMock.expect(druidServer1.getCurrSize()).andReturn(30L).atLeastOnce(); EasyMock.expect(druidServer1.getMaxSize()).andReturn(100L).atLeastOnce(); EasyMock.expect(druidServer1.getSegments()).andReturn(segments).anyTimes(); + EasyMock.expect(druidServer1.getSegment(EasyMock.anyObject())).andReturn(null).anyTimes(); EasyMock.replay(druidServer1); EasyMock.expect(druidServer2.getName()).andReturn("to").atLeastOnce(); @@ -150,6 +151,7 @@ public class DruidMasterBalancerTest EasyMock.expect(druidServer2.getCurrSize()).andReturn(0L).atLeastOnce(); EasyMock.expect(druidServer2.getMaxSize()).andReturn(100L).atLeastOnce(); EasyMock.expect(druidServer2.getSegments()).andReturn(new HashMap()).anyTimes(); + EasyMock.expect(druidServer2.getSegment(EasyMock.anyObject())).andReturn(null).anyTimes(); EasyMock.replay(druidServer2); EasyMock.replay(druidServer3); @@ -202,6 +204,7 @@ public class DruidMasterBalancerTest EasyMock.expect(druidServer1.getCurrSize()).andReturn(30L).atLeastOnce(); EasyMock.expect(druidServer1.getMaxSize()).andReturn(100L).atLeastOnce(); EasyMock.expect(druidServer1.getSegments()).andReturn(segments).anyTimes(); + EasyMock.expect(druidServer1.getSegment(EasyMock.anyObject())).andReturn(null).anyTimes(); EasyMock.replay(druidServer1); EasyMock.expect(druidServer2.getName()).andReturn("2").atLeastOnce(); @@ -209,6 +212,7 @@ public class DruidMasterBalancerTest EasyMock.expect(druidServer2.getCurrSize()).andReturn(0L).atLeastOnce(); EasyMock.expect(druidServer2.getMaxSize()).andReturn(100L).atLeastOnce(); EasyMock.expect(druidServer2.getSegments()).andReturn(new HashMap()).anyTimes(); + EasyMock.expect(druidServer2.getSegment(EasyMock.anyObject())).andReturn(null).anyTimes(); EasyMock.replay(druidServer2); EasyMock.expect(druidServer3.getName()).andReturn("3").atLeastOnce(); @@ -216,6 +220,7 @@ public class DruidMasterBalancerTest EasyMock.expect(druidServer3.getCurrSize()).andReturn(0L).atLeastOnce(); EasyMock.expect(druidServer3.getMaxSize()).andReturn(100L).atLeastOnce(); EasyMock.expect(druidServer3.getSegments()).andReturn(new HashMap()).anyTimes(); + EasyMock.expect(druidServer3.getSegment(EasyMock.anyObject())).andReturn(null).anyTimes(); EasyMock.replay(druidServer3); EasyMock.expect(druidServer4.getName()).andReturn("4").atLeastOnce(); @@ -223,6 +228,7 @@ public class DruidMasterBalancerTest EasyMock.expect(druidServer4.getCurrSize()).andReturn(0L).atLeastOnce(); EasyMock.expect(druidServer4.getMaxSize()).andReturn(100L).atLeastOnce(); EasyMock.expect(druidServer4.getSegments()).andReturn(new HashMap()).anyTimes(); + EasyMock.expect(druidServer4.getSegment(EasyMock.anyObject())).andReturn(null).anyTimes(); EasyMock.replay(druidServer4); // Mock stuff that the master needs