From c2604657edd7999666034e2b0c29ca9e9a8a3fb6 Mon Sep 17 00:00:00 2001 From: Himadri Singh Date: Thu, 13 Feb 2014 19:04:54 +0530 Subject: [PATCH] Extracted common methods --- .../AbstractCostBalancerStrategy.java | 202 ++++++++++++++++++ .../CostBalancerMultithreadStrategy.java | 160 +------------- ...ostBalancerMultithreadStrategyFactory.java | 2 +- .../coordinator/CostBalancerStrategy.java | 164 +------------- 4 files changed, 209 insertions(+), 319 deletions(-) create mode 100644 server/src/main/java/io/druid/server/coordinator/AbstractCostBalancerStrategy.java diff --git a/server/src/main/java/io/druid/server/coordinator/AbstractCostBalancerStrategy.java b/server/src/main/java/io/druid/server/coordinator/AbstractCostBalancerStrategy.java new file mode 100644 index 00000000000..25c0f63238c --- /dev/null +++ b/server/src/main/java/io/druid/server/coordinator/AbstractCostBalancerStrategy.java @@ -0,0 +1,202 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package io.druid.server.coordinator; + +import com.metamx.common.Pair; +import com.metamx.emitter.EmittingLogger; +import io.druid.timeline.DataSegment; +import org.joda.time.DateTime; +import org.joda.time.Interval; + +import java.util.List; + +public abstract class AbstractCostBalancerStrategy implements BalancerStrategy +{ + private static final EmittingLogger log = new EmittingLogger(AbstractCostBalancerStrategy.class); + private static final int DAY_IN_MILLIS = 1000 * 60 * 60 * 24; + private static final int SEVEN_DAYS_IN_MILLIS = 7 * DAY_IN_MILLIS; + private static final int THIRTY_DAYS_IN_MILLIS = 30 * DAY_IN_MILLIS; + private final DateTime referenceTimestamp; + + public AbstractCostBalancerStrategy(DateTime referenceTimestamp) + { + this.referenceTimestamp = referenceTimestamp; + } + + @Override + public ServerHolder findNewSegmentHomeReplicator( + DataSegment proposalSegment, List serverHolders + ) + { + ServerHolder holder = chooseBestServer(proposalSegment, serverHolders, false).rhs; + if (holder != null && !holder.isServingSegment(proposalSegment)) { + return holder; + } + return null; + } + + + @Override + public ServerHolder findNewSegmentHomeBalancer( + DataSegment proposalSegment, List serverHolders + ) + { + return chooseBestServer(proposalSegment, serverHolders, true).rhs; + } + + + /** + * For assignment, we want to move to the lowest cost server that isn't already serving the segment. + * + * @param proposalSegment A DataSegment that we are proposing to move. + * @param serverHolders An iterable of ServerHolders for a particular tier. + * + * @return A ServerHolder with the new home for a segment. + */ + + protected abstract Pair chooseBestServer( + final DataSegment proposalSegment, + final Iterable serverHolders, + boolean includeCurrentServer + ); + + /** + * This defines the unnormalized cost function between two segments. There is a base cost given by + * the minimum size of the two segments and additional penalties. + * recencyPenalty: it is more likely that recent segments will be queried together + * dataSourcePenalty: if two segments belong to the same data source, they are more likely to be involved + * in the same queries + * gapPenalty: it is more likely that segments close together in time will be queried together + * + * @param segment1 The first DataSegment. + * @param segment2 The second DataSegment. + * + * @return The joint cost of placing the two DataSegments together on one node. + */ + public double computeJointSegmentCosts(final DataSegment segment1, final DataSegment segment2) + { + final Interval gap = segment1.getInterval().gap(segment2.getInterval()); + + final double baseCost = Math.min(segment1.getSize(), segment2.getSize()); + double recencyPenalty = 1; + double dataSourcePenalty = 1; + double gapPenalty = 1; + + if (segment1.getDataSource().equals(segment2.getDataSource())) { + dataSourcePenalty = 2; + } + + double maxDiff = Math.max( + referenceTimestamp.getMillis() - segment1.getInterval().getEndMillis(), + referenceTimestamp.getMillis() - segment2.getInterval().getEndMillis() + ); + double segment1diff = referenceTimestamp.getMillis() - segment1.getInterval().getEndMillis(); + double segment2diff = referenceTimestamp.getMillis() - segment2.getInterval().getEndMillis(); + if (segment1diff < SEVEN_DAYS_IN_MILLIS && segment2diff < SEVEN_DAYS_IN_MILLIS) { + recencyPenalty = (2 - segment1diff / SEVEN_DAYS_IN_MILLIS) * (2 - segment2diff / SEVEN_DAYS_IN_MILLIS); + } + + /** gap is null if the two segment intervals overlap or if they're adjacent */ + if (gap == null) { + gapPenalty = 2; + } else { + long gapMillis = gap.toDurationMillis(); + if (gapMillis < THIRTY_DAYS_IN_MILLIS) { + gapPenalty = 2 - gapMillis / THIRTY_DAYS_IN_MILLIS; + } + } + + final double cost = baseCost * recencyPenalty * dataSourcePenalty * gapPenalty; + + return cost; + } + + public BalancerSegmentHolder pickSegmentToMove(final List serverHolders) + { + ReservoirSegmentSampler sampler = new ReservoirSegmentSampler(); + return sampler.getRandomBalancerSegmentHolder(serverHolders); + } + + /** + * Calculates the initial cost of the Druid segment configuration. + * + * @param serverHolders A list of ServerHolders for a particular tier. + * + * @return The initial cost of the Druid tier. + */ + public double calculateInitialTotalCost(final List serverHolders) + { + double cost = 0; + for (ServerHolder server : serverHolders) { + DataSegment[] segments = server.getServer().getSegments().values().toArray(new DataSegment[]{}); + for (int i = 0; i < segments.length; ++i) { + for (int j = i; j < segments.length; ++j) { + cost += computeJointSegmentCosts(segments[i], segments[j]); + } + } + } + return cost; + } + + /** + * Calculates the cost normalization. This is such that the normalized cost is lower bounded + * by 1 (e.g. when each segment gets its own historical node). + * + * @param serverHolders A list of ServerHolders for a particular tier. + * + * @return The normalization value (the sum of the diagonal entries in the + * pairwise cost matrix). This is the cost of a cluster if each + * segment were to get its own historical node. + */ + public double calculateNormalization(final List serverHolders) + { + double cost = 0; + for (ServerHolder server : serverHolders) { + for (DataSegment segment : server.getServer().getSegments().values()) { + cost += computeJointSegmentCosts(segment, segment); + } + } + return cost; + } + + @Override + public void emitStats( + String tier, + CoordinatorStats stats, List serverHolderList + ) + { + final double initialTotalCost = calculateInitialTotalCost(serverHolderList); + final double normalization = calculateNormalization(serverHolderList); + final double normalizedInitialCost = initialTotalCost / normalization; + + stats.addToTieredStat("initialCost", tier, (long) initialTotalCost); + stats.addToTieredStat("normalization", tier, (long) normalization); + stats.addToTieredStat("normalizedInitialCostTimesOneThousand", tier, (long) (normalizedInitialCost * 1000)); + + log.info( + "[%s]: Initial Total Cost: [%f], Normalization: [%f], Initial Normalized Cost: [%f]", + tier, + initialTotalCost, + normalization, + normalizedInitialCost + ); + + } +} \ No newline at end of file diff --git a/server/src/main/java/io/druid/server/coordinator/CostBalancerMultithreadStrategy.java b/server/src/main/java/io/druid/server/coordinator/CostBalancerMultithreadStrategy.java index c4e8f6351c2..168bd5a221b 100644 --- a/server/src/main/java/io/druid/server/coordinator/CostBalancerMultithreadStrategy.java +++ b/server/src/main/java/io/druid/server/coordinator/CostBalancerMultithreadStrategy.java @@ -33,51 +33,16 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; -public class CostBalancerMultithreadStrategy implements BalancerStrategy +public class CostBalancerMultithreadStrategy extends AbstractCostBalancerStrategy { private static final EmittingLogger log = new EmittingLogger(CostBalancerMultithreadStrategy.class); - private static final int DAY_IN_MILLIS = 1000 * 60 * 60 * 24; - private static final int SEVEN_DAYS_IN_MILLIS = 7 * DAY_IN_MILLIS; - private static final int THIRTY_DAYS_IN_MILLIS = 30 * DAY_IN_MILLIS; - private final long referenceTimestampInMillis; public CostBalancerMultithreadStrategy(DateTime referenceTimestamp) { - this.referenceTimestampInMillis = referenceTimestamp.getMillis(); + super(referenceTimestamp); } - @Override - public ServerHolder findNewSegmentHomeReplicator( - DataSegment proposalSegment, List serverHolders - ) - { - ServerHolder holder = chooseBestServer(proposalSegment, serverHolders, false).rhs; - if (holder != null && !holder.isServingSegment(proposalSegment)) { - return holder; - } - return null; - } - - - @Override - public ServerHolder findNewSegmentHomeBalancer( - DataSegment proposalSegment, List serverHolders - ) - { - return chooseBestServer(proposalSegment, serverHolders, true).rhs; - } - - - /** - * For assignment, we want to move to the lowest cost server that isn't already serving the segment. - * - * @param proposalSegment A DataSegment that we are proposing to move. - * @param serverHolders An iterable of ServerHolders for a particular tier. - * - * @return A ServerHolder with the new home for a segment. - */ - - private Pair chooseBestServer( + protected Pair chooseBestServer( final DataSegment proposalSegment, final Iterable serverHolders, final boolean includeCurrentServer @@ -151,123 +116,4 @@ public class CostBalancerMultithreadStrategy implements BalancerStrategy return Pair.of(Double.POSITIVE_INFINITY, null); } } - - /** - * This defines the unnormalized cost function between two segments. There is a base cost given by - * the minimum size of the two segments and additional penalties. - * recencyPenalty: it is more likely that recent segments will be queried together - * dataSourcePenalty: if two segments belong to the same data source, they are more likely to be involved - * in the same queries - * gapPenalty: it is more likely that segments close together in time will be queried together - * - * @param segment1 The first DataSegment. - * @param segment2 The second DataSegment. - * - * @return The joint cost of placing the two DataSegments together on one node. - */ - public double computeJointSegmentCosts(final DataSegment segment1, final DataSegment segment2) - { - final Interval gap = segment1.getInterval().gap(segment2.getInterval()); - - final double baseCost = Math.min(segment1.getSize(), segment2.getSize()); - double recencyPenalty = 1; - double dataSourcePenalty = 1; - double gapPenalty = 1; - - if (segment1.getDataSource().equals(segment2.getDataSource())) { - dataSourcePenalty = 2; - } - - double segment1diff = referenceTimestampInMillis - segment1.getInterval().getEndMillis(); - double segment2diff = referenceTimestampInMillis - segment2.getInterval().getEndMillis(); - if (segment1diff < SEVEN_DAYS_IN_MILLIS && segment2diff < SEVEN_DAYS_IN_MILLIS) { - recencyPenalty = (2 - segment1diff / SEVEN_DAYS_IN_MILLIS) * (2 - segment2diff / SEVEN_DAYS_IN_MILLIS); - } - - /** gap is null if the two segment intervals overlap or if they're adjacent */ - if (gap == null) { - gapPenalty = 2; - } else { - long gapMillis = gap.toDurationMillis(); - if (gapMillis < THIRTY_DAYS_IN_MILLIS) { - gapPenalty = 2 - gapMillis / THIRTY_DAYS_IN_MILLIS; - } - } - - final double cost = baseCost * recencyPenalty * dataSourcePenalty * gapPenalty; - - return cost; - } - - public BalancerSegmentHolder pickSegmentToMove(final List serverHolders) - { - ReservoirSegmentSampler sampler = new ReservoirSegmentSampler(); - return sampler.getRandomBalancerSegmentHolder(serverHolders); - } - - /** - * Calculates the initial cost of the Druid segment configuration. - * - * @param serverHolders A list of ServerHolders for a particular tier. - * - * @return The initial cost of the Druid tier. - */ - public double calculateInitialTotalCost(final List serverHolders) - { - double cost = 0; - for (ServerHolder server : serverHolders) { - DataSegment[] segments = server.getServer().getSegments().values().toArray(new DataSegment[]{}); - for (int i = 0; i < segments.length; ++i) { - for (int j = i; j < segments.length; ++j) { - cost += computeJointSegmentCosts(segments[i], segments[j]); - } - } - } - return cost; - } - - /** - * Calculates the cost normalization. This is such that the normalized cost is lower bounded - * by 1 (e.g. when each segment gets its own historical node). - * - * @param serverHolders A list of ServerHolders for a particular tier. - * - * @return The normalization value (the sum of the diagonal entries in the - * pairwise cost matrix). This is the cost of a cluster if each - * segment were to get its own historical node. - */ - public double calculateNormalization(final List serverHolders) - { - double cost = 0; - for (ServerHolder server : serverHolders) { - for (DataSegment segment : server.getServer().getSegments().values()) { - cost += computeJointSegmentCosts(segment, segment); - } - } - return cost; - } - - @Override - public void emitStats( - String tier, - CoordinatorStats stats, List serverHolderList - ) - { - final double initialTotalCost = calculateInitialTotalCost(serverHolderList); - final double normalization = calculateNormalization(serverHolderList); - final double normalizedInitialCost = initialTotalCost / normalization; - - stats.addToTieredStat("initialCost", tier, (long) initialTotalCost); - stats.addToTieredStat("normalization", tier, (long) normalization); - stats.addToTieredStat("normalizedInitialCostTimesOneThousand", tier, (long) (normalizedInitialCost * 1000)); - - log.info( - "[%s]: Initial Total Cost: [%f], Normalization: [%f], Initial Normalized Cost: [%f]", - tier, - initialTotalCost, - normalization, - normalizedInitialCost - ); - - } } \ No newline at end of file diff --git a/server/src/main/java/io/druid/server/coordinator/CostBalancerMultithreadStrategyFactory.java b/server/src/main/java/io/druid/server/coordinator/CostBalancerMultithreadStrategyFactory.java index 0bd02f21c3d..d1373f49989 100644 --- a/server/src/main/java/io/druid/server/coordinator/CostBalancerMultithreadStrategyFactory.java +++ b/server/src/main/java/io/druid/server/coordinator/CostBalancerMultithreadStrategyFactory.java @@ -26,6 +26,6 @@ public class CostBalancerMultithreadStrategyFactory implements BalancerStrategyF @Override public BalancerStrategy createBalancerStrategy(DateTime referenceTimestamp) { - return new CostBalancerStrategy(referenceTimestamp); + return new CostBalancerMultithreadStrategy(referenceTimestamp); } } diff --git a/server/src/main/java/io/druid/server/coordinator/CostBalancerStrategy.java b/server/src/main/java/io/druid/server/coordinator/CostBalancerStrategy.java index a5cb45e7e7f..82f60261a2e 100644 --- a/server/src/main/java/io/druid/server/coordinator/CostBalancerStrategy.java +++ b/server/src/main/java/io/druid/server/coordinator/CostBalancerStrategy.java @@ -27,51 +27,16 @@ import org.joda.time.Interval; import java.util.List; -public class CostBalancerStrategy implements BalancerStrategy +public class CostBalancerStrategy extends AbstractCostBalancerStrategy { private static final EmittingLogger log = new EmittingLogger(CostBalancerStrategy.class); - private static final int DAY_IN_MILLIS = 1000 * 60 * 60 * 24; - private static final int SEVEN_DAYS_IN_MILLIS = 7 * DAY_IN_MILLIS; - private static final int THIRTY_DAYS_IN_MILLIS = 30 * DAY_IN_MILLIS; - private final DateTime referenceTimestamp; public CostBalancerStrategy(DateTime referenceTimestamp) { - this.referenceTimestamp = referenceTimestamp; + super(referenceTimestamp); } - @Override - public ServerHolder findNewSegmentHomeReplicator( - DataSegment proposalSegment, List serverHolders - ) - { - ServerHolder holder = chooseBestServer(proposalSegment, serverHolders, false).rhs; - if (holder != null && !holder.isServingSegment(proposalSegment)) { - return holder; - } - return null; - } - - - @Override - public ServerHolder findNewSegmentHomeBalancer( - DataSegment proposalSegment, List serverHolders - ) - { - return chooseBestServer(proposalSegment, serverHolders, true).rhs; - } - - - /** - * For assignment, we want to move to the lowest cost server that isn't already serving the segment. - * - * @param proposalSegment A DataSegment that we are proposing to move. - * @param serverHolders An iterable of ServerHolders for a particular tier. - * - * @return A ServerHolder with the new home for a segment. - */ - - private Pair chooseBestServer( + protected Pair chooseBestServer( final DataSegment proposalSegment, final Iterable serverHolders, boolean includeCurrentServer @@ -109,127 +74,4 @@ public class CostBalancerStrategy implements BalancerStrategy return bestServer; } - - /** - * This defines the unnormalized cost function between two segments. There is a base cost given by - * the minimum size of the two segments and additional penalties. - * recencyPenalty: it is more likely that recent segments will be queried together - * dataSourcePenalty: if two segments belong to the same data source, they are more likely to be involved - * in the same queries - * gapPenalty: it is more likely that segments close together in time will be queried together - * - * @param segment1 The first DataSegment. - * @param segment2 The second DataSegment. - * - * @return The joint cost of placing the two DataSegments together on one node. - */ - public double computeJointSegmentCosts(final DataSegment segment1, final DataSegment segment2) - { - final Interval gap = segment1.getInterval().gap(segment2.getInterval()); - - final double baseCost = Math.min(segment1.getSize(), segment2.getSize()); - double recencyPenalty = 1; - double dataSourcePenalty = 1; - double gapPenalty = 1; - - if (segment1.getDataSource().equals(segment2.getDataSource())) { - dataSourcePenalty = 2; - } - - double maxDiff = Math.max( - referenceTimestamp.getMillis() - segment1.getInterval().getEndMillis(), - referenceTimestamp.getMillis() - segment2.getInterval().getEndMillis() - ); - double segment1diff = referenceTimestamp.getMillis() - segment1.getInterval().getEndMillis(); - double segment2diff = referenceTimestamp.getMillis() - segment2.getInterval().getEndMillis(); - if (segment1diff < SEVEN_DAYS_IN_MILLIS && segment2diff < SEVEN_DAYS_IN_MILLIS) { - recencyPenalty = (2 - segment1diff / SEVEN_DAYS_IN_MILLIS) * (2 - segment2diff / SEVEN_DAYS_IN_MILLIS); - } - - /** gap is null if the two segment intervals overlap or if they're adjacent */ - if (gap == null) { - gapPenalty = 2; - } else { - long gapMillis = gap.toDurationMillis(); - if (gapMillis < THIRTY_DAYS_IN_MILLIS) { - gapPenalty = 2 - gapMillis / THIRTY_DAYS_IN_MILLIS; - } - } - - final double cost = baseCost * recencyPenalty * dataSourcePenalty * gapPenalty; - - return cost; - } - - public BalancerSegmentHolder pickSegmentToMove(final List serverHolders) - { - ReservoirSegmentSampler sampler = new ReservoirSegmentSampler(); - return sampler.getRandomBalancerSegmentHolder(serverHolders); - } - - /** - * Calculates the initial cost of the Druid segment configuration. - * - * @param serverHolders A list of ServerHolders for a particular tier. - * - * @return The initial cost of the Druid tier. - */ - public double calculateInitialTotalCost(final List serverHolders) - { - double cost = 0; - for (ServerHolder server : serverHolders) { - DataSegment[] segments = server.getServer().getSegments().values().toArray(new DataSegment[]{}); - for (int i = 0; i < segments.length; ++i) { - for (int j = i; j < segments.length; ++j) { - cost += computeJointSegmentCosts(segments[i], segments[j]); - } - } - } - return cost; - } - - /** - * Calculates the cost normalization. This is such that the normalized cost is lower bounded - * by 1 (e.g. when each segment gets its own historical node). - * - * @param serverHolders A list of ServerHolders for a particular tier. - * - * @return The normalization value (the sum of the diagonal entries in the - * pairwise cost matrix). This is the cost of a cluster if each - * segment were to get its own historical node. - */ - public double calculateNormalization(final List serverHolders) - { - double cost = 0; - for (ServerHolder server : serverHolders) { - for (DataSegment segment : server.getServer().getSegments().values()) { - cost += computeJointSegmentCosts(segment, segment); - } - } - return cost; - } - - @Override - public void emitStats( - String tier, - CoordinatorStats stats, List serverHolderList - ) - { - final double initialTotalCost = calculateInitialTotalCost(serverHolderList); - final double normalization = calculateNormalization(serverHolderList); - final double normalizedInitialCost = initialTotalCost / normalization; - - stats.addToTieredStat("initialCost", tier, (long) initialTotalCost); - stats.addToTieredStat("normalization", tier, (long) normalization); - stats.addToTieredStat("normalizedInitialCostTimesOneThousand", tier, (long) (normalizedInitialCost * 1000)); - - log.info( - "[%s]: Initial Total Cost: [%f], Normalization: [%f], Initial Normalized Cost: [%f]", - tier, - initialTotalCost, - normalization, - normalizedInitialCost - ); - - } } \ No newline at end of file