diff --git a/server/src/main/java/com/metamx/druid/master/BalancerCostAnalyzer.java b/server/src/main/java/com/metamx/druid/master/BalancerCostAnalyzer.java new file mode 100644 index 00000000000..86b220420f9 --- /dev/null +++ b/server/src/main/java/com/metamx/druid/master/BalancerCostAnalyzer.java @@ -0,0 +1,229 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package com.metamx.druid.master; + +import com.google.common.collect.Lists; +import com.google.common.collect.MinMaxPriorityQueue; +import com.google.common.collect.Sets; +import com.metamx.common.Pair; +import com.metamx.common.logger.Logger; +import com.metamx.druid.client.DataSegment; +import com.metamx.druid.client.DruidDataSource; +import com.metamx.druid.client.DruidServer; +import org.joda.time.Interval; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.Iterator; +import java.util.List; +import java.util.Random; +import java.util.Set; + +/** + * The BalancerCostAnalyzer + */ +public class BalancerCostAnalyzer +{ + private static final Logger log = new Logger(BalancerCostAnalyzer.class); + private static final int MAX_SEGMENTS_TO_MOVE = 5; + private static final int DAY_IN_MILLIS = 1000 * 60 * 60 * 24; + private float initialTotalCost; + private float totalCostChange; + + private List serverHolderList; + private Random rand; + + public BalancerCostAnalyzer(){ + rand = new Random(); + totalCostChange = 0f; + } + + public void init(List serverHolderList){ + this.initialTotalCost = calculateInitialTotalCost(serverHolderList); + this.serverHolderList = serverHolderList; + } + + public float getInitialTotalCost() { + return initialTotalCost; + } + + public float getTotalCostChange() { + return totalCostChange; + } + + public float calculateInitialTotalCost(List serverHolderList){ + int cost = 0; + for (ServerHolder server : serverHolderList) { + DataSegment[] segments = server.getServer().getSegments().values().toArray(new DataSegment[]{}); + for (int i = 0; i < segments.length; ++i) { + for (int j = i; j < segments.length; ++j) { + cost += computeJointSegmentCosts(segments[i], segments[j]); + } + } + } + return cost; + } + + public float calculateTotalCostChange(List serverHolderList, Set segmentHoldersToMove){ + int cost = 0; + Iterator it = segmentHoldersToMove.iterator(); + while (it.hasNext()) { + BalancerSegmentHolder2 segmentHolder = (BalancerSegmentHolder2) it.next(); + for (DataSegment fromSegment : segmentHolder.getFromServer().getSegments().values()) { + cost -= computeJointSegmentCosts(segmentHolder.getSegment(), fromSegment); + } + for (DataSegment toSegment : segmentHolder.getToServer().getSegments().values()) { + cost += computeJointSegmentCosts(segmentHolder.getSegment(), toSegment); + } + return cost; + } + + for (ServerHolder server : serverHolderList) { + DataSegment[] segments = server.getServer().getSegments().values().toArray(new DataSegment[]{}); + for (int i = 0; i < segments.length; ++i) { + for (int j = i; j < segments.length; ++j) { + cost += computeJointSegmentCosts(segments[i], segments[j]); + } + } + } + return cost; + } + + public float computeJointSegmentCosts(DataSegment segment1, DataSegment segment2){ + float cost = 0f; + Interval gap = segment1.getInterval().gap(segment2.getInterval()); + + // gap is null if the two segment intervals overlap or if they're adjacent + if (gap == null) { + cost += 1f; + } else { + long gapMillis = gap.toDurationMillis(); + if (gapMillis < DAY_IN_MILLIS) cost += 1f; + } + + if(segment1.getDataSource().equals(segment2.getDataSource())) cost += 1f; + + return cost; + } + + public Set findSegmentsToMove() + { + Set segmentHoldersToMove = Sets.newHashSet(); + Set movingSegments = Sets.newHashSet(); + + int counter = 0; + + while (segmentHoldersToMove.size() < MAX_SEGMENTS_TO_MOVE && counter < 2 * MAX_SEGMENTS_TO_MOVE) { + counter++; + ServerHolder fromServerHolder = serverHolderList.get(rand.nextInt(serverHolderList.size())); + List segments = Lists.newArrayList(fromServerHolder.getServer().getSegments().values()); + if (segments.size() == 0) continue; + DataSegment proposalSegment = segments.get(rand.nextInt(segments.size())); + if (movingSegments.contains(proposalSegment)) continue; + + //Just need a regular priority queue for the min. element. + MinMaxPriorityQueue> pQueue = MinMaxPriorityQueue.orderedBy( + new Comparator>() + { + @Override + public int compare( + Pair o, + Pair o1 + ) + { + return Float.compare(o.lhs, o1.lhs); + } + } + ).create(); + + for (ServerHolder server : serverHolderList) { + float cost = 0f; + for (DataSegment segment : server.getServer().getSegments().values()) { + cost += computeJointSegmentCosts(proposalSegment, segment); + } + + //Take into account costs of segments that will be moved. + Iterator it = segmentHoldersToMove.iterator(); + while (it.hasNext()) { + BalancerSegmentHolder2 segmentToMove = (BalancerSegmentHolder2) it.next(); + if (server.getServer().equals(segmentToMove.getToServer())) { + cost += computeJointSegmentCosts(proposalSegment, segmentToMove.getSegment()); + } + if (server.getServer().equals(segmentToMove.getFromServer())) { + cost -= computeJointSegmentCosts(proposalSegment, segmentToMove.getSegment()); + } + } + + pQueue.add(Pair.of(cost, server)); + } + + Pair minPair = pQueue.peekFirst(); + if (!minPair.rhs.equals(fromServerHolder)) { + movingSegments.add(proposalSegment); + segmentHoldersToMove.add( + new BalancerSegmentHolder2( + fromServerHolder.getServer(), + minPair.rhs.getServer(), + proposalSegment + ) + ); + totalCostChange += minPair.lhs; + } + + } + + totalCostChange = calculateTotalCostChange(serverHolderList, segmentHoldersToMove); + return segmentHoldersToMove; + + /* + double currPercentDiff = getPercentDiff(); + + if (currPercentDiff < PERCENT_THRESHOLD) { + log.info("Cluster usage is balanced."); + return segmentsToMove; + } + + List dataSources = Lists.newArrayList(server.getDataSources()); + Collections.shuffle(dataSources); + + for (DruidDataSource dataSource : dataSources) { + List segments = Lists.newArrayList(dataSource.getSegments()); + Collections.shuffle(segments); + + for (DataSegment segment : segments) { + if (segmentsToMove.size() >= MAX_SEGMENTS_TO_MOVE) { + return segmentsToMove; + } + + if (getLookaheadPercentDiff(highestSizeUsed - segment.getSize(), lowestSizeUsed + segment.getSize()) + < currPercentDiff) { + segmentsToMove.add(new BalancerSegmentHolder(server, segment)); + update(highestSizeUsed - segment.getSize(), lowestSizeUsed + segment.getSize()); + } + } + } + + return segmentsToMove; + */ + } +} + + diff --git a/server/src/main/java/com/metamx/druid/master/BalancerSegmentHolder2.java b/server/src/main/java/com/metamx/druid/master/BalancerSegmentHolder2.java new file mode 100644 index 00000000000..22277285ee0 --- /dev/null +++ b/server/src/main/java/com/metamx/druid/master/BalancerSegmentHolder2.java @@ -0,0 +1,70 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package com.metamx.druid.master; + +import com.metamx.druid.client.DataSegment; +import com.metamx.druid.client.DruidServer; + +/** + */ +public class BalancerSegmentHolder2 +{ + private final DruidServer fromServer; + private final DruidServer toServer; + private final DataSegment segment; + + private volatile int lifetime = 15; + + public BalancerSegmentHolder2( + DruidServer fromServer, + DruidServer toServer, + DataSegment segment + ) + { + this.fromServer = fromServer; + this.toServer = toServer; + this.segment = segment; + } + + public DruidServer getFromServer() + { + return fromServer; + } + + public DruidServer getToServer() + { + return toServer; + } + + public DataSegment getSegment() + { + return segment; + } + + public int getLifetime() + { + return lifetime; + } + + public void reduceLifetime() + { + lifetime--; + } +} diff --git a/server/src/main/java/com/metamx/druid/master/DruidMaster.java b/server/src/main/java/com/metamx/druid/master/DruidMaster.java index 80172c432fa..4dfbb6018bb 100644 --- a/server/src/main/java/com/metamx/druid/master/DruidMaster.java +++ b/server/src/main/java/com/metamx/druid/master/DruidMaster.java @@ -674,7 +674,7 @@ public class DruidMaster }, new DruidMasterRuleRunner(DruidMaster.this), new DruidMasterCleanup(DruidMaster.this), - new DruidMasterBalancer(DruidMaster.this, new BalancerAnalyzer()), + new DruidMasterBalancer(DruidMaster.this, new BalancerCostAnalyzer()), new DruidMasterLogger() ) ); diff --git a/server/src/main/java/com/metamx/druid/master/DruidMasterBalancer.java b/server/src/main/java/com/metamx/druid/master/DruidMasterBalancer.java index 75d3534fef6..ba417f401e4 100644 --- a/server/src/main/java/com/metamx/druid/master/DruidMasterBalancer.java +++ b/server/src/main/java/com/metamx/druid/master/DruidMasterBalancer.java @@ -24,14 +24,19 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.MinMaxPriorityQueue; import com.google.common.collect.Sets; +import com.metamx.common.Pair; import com.metamx.common.guava.Comparators; import com.metamx.druid.client.DataSegment; import com.metamx.druid.client.DruidServer; import com.metamx.emitter.EmittingLogger; +import org.joda.time.Interval; +import java.util.ArrayList; +import java.util.Collections; import java.util.Comparator; import java.util.List; import java.util.Map; +import java.util.Random; import java.util.Set; import java.util.TreeSet; import java.util.concurrent.ConcurrentHashMap; @@ -52,14 +57,14 @@ public class DruidMasterBalancer implements DruidMasterHelper ); private static final EmittingLogger log = new EmittingLogger(DruidMasterBalancer.class); - private final BalancerAnalyzer analyzer; + private final BalancerCostAnalyzer analyzer; private final DruidMaster master; - private final Map> currentlyMovingSegments = Maps.newHashMap(); + private final Map> currentlyMovingSegments = Maps.newHashMap(); public DruidMasterBalancer( DruidMaster master, - BalancerAnalyzer analyzer + BalancerCostAnalyzer analyzer ) { this.master = master; @@ -68,7 +73,7 @@ public class DruidMasterBalancer implements DruidMasterHelper private void reduceLifetimes(String tier) { - for (BalancerSegmentHolder holder : currentlyMovingSegments.get(tier).values()) { + for (BalancerSegmentHolder2 holder : currentlyMovingSegments.get(tier).values()) { holder.reduceLifetime(); if (holder.getLifetime() <= 0) { log.makeAlert( @@ -92,7 +97,7 @@ public class DruidMasterBalancer implements DruidMasterHelper String tier = entry.getKey(); if (currentlyMovingSegments.get(tier) == null) { - currentlyMovingSegments.put(tier, new ConcurrentHashMap()); + currentlyMovingSegments.put(tier, new ConcurrentHashMap()); } if (!currentlyMovingSegments.get(tier).isEmpty()) { @@ -108,6 +113,21 @@ public class DruidMasterBalancer implements DruidMasterHelper TreeSet serversByPercentUsed = Sets.newTreeSet(percentUsedComparator); serversByPercentUsed.addAll(entry.getValue()); + List serverHolderList = new ArrayList(entry.getValue()); + + analyzer.init(serverHolderList); + log.info( + "Initial Total Cost: [%s]", + analyzer.getInitialTotalCost() + ); + moveSegments(analyzer.findSegmentsToMove(), params); + + stats.addToTieredStat("costChange", tier, (long) analyzer.getTotalCostChange()); + log.info( + "Cost Change: [%s]", + analyzer.getTotalCostChange() + ); + if (serversByPercentUsed.size() <= 1) { log.info( "[%s]: No unique values found for highest and lowest percent used servers: nothing to balance", @@ -119,6 +139,7 @@ public class DruidMasterBalancer implements DruidMasterHelper ServerHolder highestPercentUsedServer = serversByPercentUsed.first(); ServerHolder lowestPercentUsedServer = serversByPercentUsed.last(); + /* analyzer.init(highestPercentUsedServer, lowestPercentUsedServer); log.info( @@ -149,6 +170,8 @@ public class DruidMasterBalancer implements DruidMasterHelper analyzer.findSegmentsToMove(highestPercentUsedServer.getServer()), params ); + */ + stats.addToTieredStat("movedCount", tier, currentlyMovingSegments.get(tier).size()); } @@ -159,53 +182,55 @@ public class DruidMasterBalancer implements DruidMasterHelper } private void moveSegments( - final DruidServer server, - final Set segments, + final Set segments, final DruidMasterRuntimeParams params ) { - String toServer = server.getName(); - LoadQueuePeon toPeon = params.getLoadManagementPeons().get(toServer); - for (final BalancerSegmentHolder segment : Sets.newHashSet(segments)) { - String fromServer = segment.getServer().getName(); + for (final BalancerSegmentHolder2 segment : Sets.newHashSet(segments)) { + final DruidServer toServer = segment.getToServer(); + final String toServerName = segment.getToServer().getName(); + LoadQueuePeon toPeon = params.getLoadManagementPeons().get(toServerName); + + String fromServer = segment.getFromServer().getName(); DataSegment segmentToMove = segment.getSegment(); final String segmentName = segmentToMove.getIdentifier(); if (!toPeon.getSegmentsToLoad().contains(segmentToMove) && - (server.getSegment(segmentName) == null) && - new ServerHolder(server, toPeon).getAvailableSize() > segmentToMove.getSize()) { + (toServer.getSegment(segmentName) == null) && + new ServerHolder(toServer, toPeon).getAvailableSize() > segmentToMove.getSize()) { log.info( "Moving [%s] from [%s] to [%s]", segmentName, fromServer, - toServer + toServerName ); try { master.moveSegment( fromServer, - toServer, + toServerName, segmentToMove.getIdentifier(), new LoadPeonCallback() { @Override protected void execute() { - Map movingSegments = currentlyMovingSegments.get(server.getTier()); + Map movingSegments = currentlyMovingSegments.get(toServer.getTier()); if (movingSegments != null) { movingSegments.remove(segmentName); } } } ); - currentlyMovingSegments.get(server.getTier()).put(segmentName, segment); + currentlyMovingSegments.get(toServer.getTier()).put(segmentName, segment); } catch (Exception e) { log.makeAlert(e, String.format("[%s] : Moving exception", segmentName)).emit(); } } else { - currentlyMovingSegments.get(server.getTier()).remove(segment); + currentlyMovingSegments.get(toServer.getTier()).remove(segment); } } + } } \ No newline at end of file diff --git a/server/src/test/java/com/metamx/druid/master/DruidMasterBalancerTest.java b/server/src/test/java/com/metamx/druid/master/DruidMasterBalancerTest.java index 7e1304aba7a..21fb99dc637 100644 --- a/server/src/test/java/com/metamx/druid/master/DruidMasterBalancerTest.java +++ b/server/src/test/java/com/metamx/druid/master/DruidMasterBalancerTest.java @@ -27,11 +27,14 @@ import com.metamx.druid.client.DruidDataSource; import com.metamx.druid.client.DruidServer; import junit.framework.Assert; import org.easymock.EasyMock; +import org.joda.time.Interval; import org.junit.After; import org.junit.Before; import org.junit.Test; import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; /** */ @@ -46,6 +49,8 @@ public class DruidMasterBalancerTest private DataSegment segment4; private LoadQueuePeon peon; private DruidDataSource dataSource; + private Map segments; + private static final int DAY_IN_MILLIS = 1000 * 60 * 60 * 24; @Before public void setUp() throws Exception @@ -78,17 +83,25 @@ public class DruidMasterBalancerTest @Test public void testRun() { + Map segments = new HashMap(); + segments.put("segment1", segment1); + segments.put("segment2", segment2); + segments.put("segment3", segment3); + segments.put("segment4", segment4); + // Mock some servers of different usages EasyMock.expect(druidServerHigh.getName()).andReturn("from").atLeastOnce(); EasyMock.expect(druidServerHigh.getCurrSize()).andReturn(30L).atLeastOnce(); EasyMock.expect(druidServerHigh.getMaxSize()).andReturn(100L).atLeastOnce(); - EasyMock.expect(druidServerHigh.getDataSources()).andReturn(Arrays.asList(dataSource)).atLeastOnce(); + EasyMock.expect(druidServerHigh.getDataSources()).andReturn(Arrays.asList(dataSource)).anyTimes(); + EasyMock.expect(druidServerHigh.getSegments()).andReturn(segments).anyTimes(); EasyMock.replay(druidServerHigh); EasyMock.expect(druidServerLow.getName()).andReturn("to").atLeastOnce(); EasyMock.expect(druidServerLow.getTier()).andReturn("normal").atLeastOnce(); EasyMock.expect(druidServerLow.getCurrSize()).andReturn(0L).atLeastOnce(); EasyMock.expect(druidServerLow.getMaxSize()).andReturn(100L).atLeastOnce(); + EasyMock.expect(druidServerLow.getSegments()).andReturn(new HashMap()).anyTimes(); EasyMock.expect(druidServerLow.getDataSources()).andReturn(Arrays.asList(dataSource)).anyTimes(); EasyMock.expect(druidServerLow.getSegment("segment1")).andReturn(null).anyTimes(); EasyMock.expect(druidServerLow.getSegment("segment2")).andReturn(null).anyTimes(); @@ -104,21 +117,29 @@ public class DruidMasterBalancerTest segment3, segment4 ) - ).atLeastOnce(); + ).anyTimes(); EasyMock.replay(dataSource); // Mock some segments of different sizes EasyMock.expect(segment1.getSize()).andReturn(11L).anyTimes(); EasyMock.expect(segment1.getIdentifier()).andReturn("segment1").anyTimes(); + EasyMock.expect(segment1.getDataSource()).andReturn("datasource1").anyTimes(); + EasyMock.expect(segment1.getInterval()).andReturn(new Interval(0, 1 * DAY_IN_MILLIS)).anyTimes(); EasyMock.replay(segment1); EasyMock.expect(segment2.getSize()).andReturn(7L).anyTimes(); EasyMock.expect(segment2.getIdentifier()).andReturn("segment2").anyTimes(); + EasyMock.expect(segment2.getDataSource()).andReturn("datasource1").anyTimes(); + EasyMock.expect(segment2.getInterval()).andReturn(new Interval(10 * DAY_IN_MILLIS, 11 * DAY_IN_MILLIS)).anyTimes(); EasyMock.replay(segment2); EasyMock.expect(segment3.getSize()).andReturn(4L).anyTimes(); EasyMock.expect(segment3.getIdentifier()).andReturn("segment3").anyTimes(); + EasyMock.expect(segment3.getDataSource()).andReturn("datasource1").anyTimes(); + EasyMock.expect(segment3.getInterval()).andReturn(new Interval(0, 1 * DAY_IN_MILLIS)).anyTimes(); EasyMock.replay(segment3); EasyMock.expect(segment4.getSize()).andReturn(8L).anyTimes(); EasyMock.expect(segment4.getIdentifier()).andReturn("segment4").anyTimes(); + EasyMock.expect(segment4.getDataSource()).andReturn("datasource1").anyTimes(); + EasyMock.expect(segment4.getInterval()).andReturn(new Interval(10 * DAY_IN_MILLIS, 11 * DAY_IN_MILLIS)).anyTimes(); EasyMock.replay(segment4); // Mock stuff that the master needs @@ -154,7 +175,7 @@ public class DruidMasterBalancerTest .withLoadManagementPeons(ImmutableMap.of("from", peon, "to", peon)) .build(); - params = new DruidMasterBalancer(master, new BalancerAnalyzer()).run(params); + params = new DruidMasterBalancer(master, new BalancerCostAnalyzer()).run(params); Assert.assertTrue(params.getMasterStats().getPerTierStats().get("movedCount").get("normal").get() > 0); } }