mirror of https://github.com/apache/druid.git
added tests for random sampler and changed abstract class to interface
This commit is contained in:
parent
8ee71c8ceb
commit
34a9525966
|
@ -1,19 +1,31 @@
|
||||||
|
/*
|
||||||
|
* Druid - a distributed column store.
|
||||||
|
* Copyright (C) 2012 Metamarkets Group Inc.
|
||||||
|
*
|
||||||
|
* This program is free software; you can redistribute it and/or
|
||||||
|
* modify it under the terms of the GNU General Public License
|
||||||
|
* as published by the Free Software Foundation; either version 2
|
||||||
|
* of the License, or (at your option) any later version.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful,
|
||||||
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
* GNU General Public License for more details.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU General Public License
|
||||||
|
* along with this program; if not, write to the Free Software
|
||||||
|
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||||
|
*/
|
||||||
|
|
||||||
package com.metamx.druid.master;
|
package com.metamx.druid.master;
|
||||||
|
|
||||||
import com.metamx.druid.client.DataSegment;
|
import com.metamx.druid.client.DataSegment;
|
||||||
import org.joda.time.DateTime;
|
|
||||||
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Random;
|
|
||||||
|
|
||||||
public abstract class BalancerStrategy
|
public interface BalancerStrategy
|
||||||
{
|
{
|
||||||
protected final DateTime referenceTimestamp;
|
public ServerHolder findNewSegmentHome(final DataSegment proposalSegment,final Iterable<ServerHolder> serverHolders);
|
||||||
protected final Random rand;
|
public BalancerSegmentHolder pickSegmentToMove(final List<ServerHolder> serverHolders);
|
||||||
public BalancerStrategy(DateTime referenceTimestamp){
|
public void emitStats(String tier, MasterStats stats, List<ServerHolder> serverHolderList);
|
||||||
this.referenceTimestamp=referenceTimestamp;
|
|
||||||
rand=new Random(0);
|
|
||||||
}
|
|
||||||
public abstract ServerHolder findNewSegmentHome(final DataSegment proposalSegment,final Iterable<ServerHolder> serverHolders);
|
|
||||||
public abstract BalancerSegmentHolder pickSegmentToMove(final List<ServerHolder> serverHolders);
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,26 @@
|
||||||
|
/*
|
||||||
|
* Druid - a distributed column store.
|
||||||
|
* Copyright (C) 2012 Metamarkets Group Inc.
|
||||||
|
*
|
||||||
|
* This program is free software; you can redistribute it and/or
|
||||||
|
* modify it under the terms of the GNU General Public License
|
||||||
|
* as published by the Free Software Foundation; either version 2
|
||||||
|
* of the License, or (at your option) any later version.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful,
|
||||||
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
* GNU General Public License for more details.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU General Public License
|
||||||
|
* along with this program; if not, write to the Free Software
|
||||||
|
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||||
|
*/
|
||||||
|
package com.metamx.druid.master;
|
||||||
|
|
||||||
|
import org.joda.time.DateTime;
|
||||||
|
|
||||||
|
public interface BalancerStrategyFactory
|
||||||
|
{
|
||||||
|
public BalancerStrategy getBalancerStrategy(DateTime referenceTimestamp);
|
||||||
|
}
|
|
@ -1,23 +1,45 @@
|
||||||
|
/*
|
||||||
|
* Druid - a distributed column store.
|
||||||
|
* Copyright (C) 2012 Metamarkets Group Inc.
|
||||||
|
*
|
||||||
|
* This program is free software; you can redistribute it and/or
|
||||||
|
* modify it under the terms of the GNU General Public License
|
||||||
|
* as published by the Free Software Foundation; either version 2
|
||||||
|
* of the License, or (at your option) any later version.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful,
|
||||||
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
* GNU General Public License for more details.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU General Public License
|
||||||
|
* along with this program; if not, write to the Free Software
|
||||||
|
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||||
|
*/
|
||||||
|
|
||||||
package com.metamx.druid.master;
|
package com.metamx.druid.master;
|
||||||
|
|
||||||
import com.google.common.collect.MinMaxPriorityQueue;
|
import com.google.common.collect.MinMaxPriorityQueue;
|
||||||
import com.metamx.common.Pair;
|
import com.metamx.common.Pair;
|
||||||
import com.metamx.druid.client.DataSegment;
|
import com.metamx.druid.client.DataSegment;
|
||||||
|
import com.metamx.emitter.EmittingLogger;
|
||||||
import org.joda.time.DateTime;
|
import org.joda.time.DateTime;
|
||||||
import org.joda.time.Interval;
|
import org.joda.time.Interval;
|
||||||
|
|
||||||
import java.util.Comparator;
|
import java.util.Comparator;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
public class CostBalancerStrategy extends BalancerStrategy
|
public class CostBalancerStrategy implements BalancerStrategy
|
||||||
{
|
{
|
||||||
|
private static final EmittingLogger log = new EmittingLogger(CostBalancerStrategy.class);
|
||||||
|
private final DateTime referenceTimestamp;
|
||||||
private static final int DAY_IN_MILLIS = 1000 * 60 * 60 * 24;
|
private static final int DAY_IN_MILLIS = 1000 * 60 * 60 * 24;
|
||||||
private static final int SEVEN_DAYS_IN_MILLIS = 7 * DAY_IN_MILLIS;
|
private static final int SEVEN_DAYS_IN_MILLIS = 7 * DAY_IN_MILLIS;
|
||||||
private static final int THIRTY_DAYS_IN_MILLIS = 30 * DAY_IN_MILLIS;
|
private static final int THIRTY_DAYS_IN_MILLIS = 30 * DAY_IN_MILLIS;
|
||||||
|
|
||||||
public CostBalancerStrategy(DateTime referenceTimeStamp)
|
public CostBalancerStrategy(DateTime referenceTimestamp)
|
||||||
{
|
{
|
||||||
super(referenceTimeStamp);
|
this.referenceTimestamp=referenceTimestamp;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -44,7 +66,7 @@ public class CostBalancerStrategy extends BalancerStrategy
|
||||||
)
|
)
|
||||||
{
|
{
|
||||||
|
|
||||||
Pair<Double,ServerHolder> bestServer = Pair.of(Double.POSITIVE_INFINITY,null);
|
Pair<Double, ServerHolder> bestServer = Pair.of(Double.POSITIVE_INFINITY, null);
|
||||||
MinMaxPriorityQueue<Pair<Double, ServerHolder>> costsAndServers = MinMaxPriorityQueue.orderedBy(
|
MinMaxPriorityQueue<Pair<Double, ServerHolder>> costsAndServers = MinMaxPriorityQueue.orderedBy(
|
||||||
new Comparator<Pair<Double, ServerHolder>>()
|
new Comparator<Pair<Double, ServerHolder>>()
|
||||||
{
|
{
|
||||||
|
@ -80,8 +102,8 @@ public class CostBalancerStrategy extends BalancerStrategy
|
||||||
cost += computeJointSegmentCosts(proposalSegment, segment);
|
cost += computeJointSegmentCosts(proposalSegment, segment);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (cost<bestServer.lhs && !server.isServingSegment(proposalSegment)){
|
if (cost < bestServer.lhs && !server.isServingSegment(proposalSegment)) {
|
||||||
bestServer= Pair.of(cost,server);
|
bestServer = Pair.of(cost, server);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -143,5 +165,70 @@ public class CostBalancerStrategy extends BalancerStrategy
|
||||||
return sampler.getRandomBalancerSegmentHolder(serverHolders);
|
return sampler.getRandomBalancerSegmentHolder(serverHolders);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Calculates the initial cost of the Druid segment configuration.
|
||||||
|
*
|
||||||
|
* @param serverHolders A list of ServerHolders for a particular tier.
|
||||||
|
*
|
||||||
|
* @return The initial cost of the Druid tier.
|
||||||
|
*/
|
||||||
|
public double calculateInitialTotalCost(final List<ServerHolder> serverHolders)
|
||||||
|
{
|
||||||
|
double cost = 0;
|
||||||
|
for (ServerHolder server : serverHolders) {
|
||||||
|
DataSegment[] segments = server.getServer().getSegments().values().toArray(new DataSegment[]{});
|
||||||
|
for (int i = 0; i < segments.length; ++i) {
|
||||||
|
for (int j = i; j < segments.length; ++j) {
|
||||||
|
cost += computeJointSegmentCosts(segments[i], segments[j]);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return cost;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Calculates the cost normalization. This is such that the normalized cost is lower bounded
|
||||||
|
* by 1 (e.g. when each segment gets its own compute node).
|
||||||
|
*
|
||||||
|
* @param serverHolders A list of ServerHolders for a particular tier.
|
||||||
|
*
|
||||||
|
* @return The normalization value (the sum of the diagonal entries in the
|
||||||
|
* pairwise cost matrix). This is the cost of a cluster if each
|
||||||
|
* segment were to get its own compute node.
|
||||||
|
*/
|
||||||
|
public double calculateNormalization(final List<ServerHolder> serverHolders)
|
||||||
|
{
|
||||||
|
double cost = 0;
|
||||||
|
for (ServerHolder server : serverHolders) {
|
||||||
|
for (DataSegment segment : server.getServer().getSegments().values()) {
|
||||||
|
cost += computeJointSegmentCosts(segment, segment);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return cost;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void emitStats( String tier,
|
||||||
|
MasterStats stats, List<ServerHolder> serverHolderList
|
||||||
|
)
|
||||||
|
{
|
||||||
|
final double initialTotalCost = calculateInitialTotalCost(serverHolderList);
|
||||||
|
final double normalization = calculateNormalization(serverHolderList);
|
||||||
|
final double normalizedInitialCost = initialTotalCost / normalization;
|
||||||
|
|
||||||
|
stats.addToTieredStat("initialCost", tier, (long) initialTotalCost);
|
||||||
|
stats.addToTieredStat("normalization", tier, (long) normalization);
|
||||||
|
stats.addToTieredStat("normalizedInitialCostTimesOneThousand", tier, (long) (normalizedInitialCost * 1000));
|
||||||
|
|
||||||
|
log.info(
|
||||||
|
"[%s]: Initial Total Cost: [%f], Normalization: [%f], Initial Normalized Cost: [%f]",
|
||||||
|
tier,
|
||||||
|
initialTotalCost,
|
||||||
|
normalization,
|
||||||
|
normalizedInitialCost
|
||||||
|
);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
|
@ -0,0 +1,31 @@
|
||||||
|
/*
|
||||||
|
* Druid - a distributed column store.
|
||||||
|
* Copyright (C) 2012 Metamarkets Group Inc.
|
||||||
|
*
|
||||||
|
* This program is free software; you can redistribute it and/or
|
||||||
|
* modify it under the terms of the GNU General Public License
|
||||||
|
* as published by the Free Software Foundation; either version 2
|
||||||
|
* of the License, or (at your option) any later version.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful,
|
||||||
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
* GNU General Public License for more details.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU General Public License
|
||||||
|
* along with this program; if not, write to the Free Software
|
||||||
|
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||||
|
*/
|
||||||
|
package com.metamx.druid.master;
|
||||||
|
|
||||||
|
import org.joda.time.DateTime;
|
||||||
|
|
||||||
|
public class CostBalancerStrategyFactory implements BalancerStrategyFactory
|
||||||
|
{
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public BalancerStrategy getBalancerStrategy(DateTime referenceTimestamp)
|
||||||
|
{
|
||||||
|
return new CostBalancerStrategy(referenceTimestamp);
|
||||||
|
}
|
||||||
|
}
|
|
@ -92,7 +92,6 @@ public class DruidMaster
|
||||||
private final IndexingServiceClient indexingServiceClient;
|
private final IndexingServiceClient indexingServiceClient;
|
||||||
private final ScheduledExecutorService exec;
|
private final ScheduledExecutorService exec;
|
||||||
private final LoadQueueTaskMaster taskMaster;
|
private final LoadQueueTaskMaster taskMaster;
|
||||||
|
|
||||||
private final Map<String, LoadQueuePeon> loadManagementPeons;
|
private final Map<String, LoadQueuePeon> loadManagementPeons;
|
||||||
private final AtomicReference<LeaderLatch> leaderLatch;
|
private final AtomicReference<LeaderLatch> leaderLatch;
|
||||||
|
|
||||||
|
@ -660,6 +659,7 @@ public class DruidMaster
|
||||||
.withMergeBytesLimit(config.getMergeBytesLimit())
|
.withMergeBytesLimit(config.getMergeBytesLimit())
|
||||||
.withMergeSegmentsLimit(config.getMergeSegmentsLimit())
|
.withMergeSegmentsLimit(config.getMergeSegmentsLimit())
|
||||||
.withMaxSegmentsToMove(config.getMaxSegmentsToMove())
|
.withMaxSegmentsToMove(config.getMaxSegmentsToMove())
|
||||||
|
.withEmitBalancingCostParams(config.getEmitBalancerCostParams())
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
for (DruidMasterHelper helper : helpers) {
|
for (DruidMasterHelper helper : helpers) {
|
||||||
|
|
|
@ -78,7 +78,7 @@ public class DruidMasterBalancer implements DruidMasterHelper
|
||||||
{
|
{
|
||||||
final MasterStats stats = new MasterStats();
|
final MasterStats stats = new MasterStats();
|
||||||
final DateTime referenceTimestamp = params.getBalancerReferenceTimestamp();
|
final DateTime referenceTimestamp = params.getBalancerReferenceTimestamp();
|
||||||
final BalancerStrategy strategy = params.getBalancerStrategy(referenceTimestamp);
|
final BalancerStrategy strategy = params.getBalancerStrategyFactory().getBalancerStrategy(referenceTimestamp);
|
||||||
final int maxSegmentsToMove = params.getMaxSegmentsToMove();
|
final int maxSegmentsToMove = params.getMaxSegmentsToMove();
|
||||||
|
|
||||||
for (Map.Entry<String, MinMaxPriorityQueue<ServerHolder>> entry :
|
for (Map.Entry<String, MinMaxPriorityQueue<ServerHolder>> entry :
|
||||||
|
@ -123,9 +123,17 @@ public class DruidMasterBalancer implements DruidMasterHelper
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
stats.addToTieredStat("movedCount", tier, currentlyMovingSegments.get(tier).size());
|
stats.addToTieredStat("movedCount", tier, currentlyMovingSegments.get(tier).size());
|
||||||
}
|
if (params.getEmitBalancingCostParams())
|
||||||
|
{
|
||||||
|
strategy.emitStats(tier, stats, serverHolderList);
|
||||||
|
|
||||||
|
}
|
||||||
|
log.info(
|
||||||
|
"[%s]: Segments Moved: [%d]",tier,currentlyMovingSegments.get(tier).size()
|
||||||
|
);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
return params.buildFromExisting()
|
return params.buildFromExisting()
|
||||||
.withMasterStats(stats)
|
.withMasterStats(stats)
|
||||||
|
|
|
@ -34,6 +34,11 @@ public abstract class DruidMasterConfig
|
||||||
@Default("PT600s")
|
@Default("PT600s")
|
||||||
public abstract Duration getMasterStartDelay();
|
public abstract Duration getMasterStartDelay();
|
||||||
|
|
||||||
|
@Config("druid.master.emitBalancerCostParams")
|
||||||
|
public boolean getEmitBalancerCostParams(){
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
@Config("druid.master.period")
|
@Config("druid.master.period")
|
||||||
@Default("PT60s")
|
@Default("PT60s")
|
||||||
public abstract Duration getMasterPeriod();
|
public abstract Duration getMasterPeriod();
|
||||||
|
|
|
@ -52,6 +52,8 @@ public class DruidMasterRuntimeParams
|
||||||
private final int mergeSegmentsLimit;
|
private final int mergeSegmentsLimit;
|
||||||
private final int maxSegmentsToMove;
|
private final int maxSegmentsToMove;
|
||||||
private final DateTime balancerReferenceTimestamp;
|
private final DateTime balancerReferenceTimestamp;
|
||||||
|
private final boolean emitBalancingCostParams;
|
||||||
|
private final BalancerStrategyFactory strategyFactory;
|
||||||
|
|
||||||
public DruidMasterRuntimeParams(
|
public DruidMasterRuntimeParams(
|
||||||
long startTime,
|
long startTime,
|
||||||
|
@ -68,7 +70,9 @@ public class DruidMasterRuntimeParams
|
||||||
long mergeBytesLimit,
|
long mergeBytesLimit,
|
||||||
int mergeSegmentsLimit,
|
int mergeSegmentsLimit,
|
||||||
int maxSegmentsToMove,
|
int maxSegmentsToMove,
|
||||||
DateTime balancerReferenceTimestamp
|
DateTime balancerReferenceTimestamp,
|
||||||
|
boolean emitBalancingCostParams,
|
||||||
|
BalancerStrategyFactory strategyFactory
|
||||||
)
|
)
|
||||||
{
|
{
|
||||||
this.startTime = startTime;
|
this.startTime = startTime;
|
||||||
|
@ -86,6 +90,13 @@ public class DruidMasterRuntimeParams
|
||||||
this.mergeSegmentsLimit = mergeSegmentsLimit;
|
this.mergeSegmentsLimit = mergeSegmentsLimit;
|
||||||
this.maxSegmentsToMove = maxSegmentsToMove;
|
this.maxSegmentsToMove = maxSegmentsToMove;
|
||||||
this.balancerReferenceTimestamp = balancerReferenceTimestamp;
|
this.balancerReferenceTimestamp = balancerReferenceTimestamp;
|
||||||
|
this.emitBalancingCostParams=emitBalancingCostParams;
|
||||||
|
this.strategyFactory=strategyFactory;
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean getEmitBalancingCostParams()
|
||||||
|
{
|
||||||
|
return emitBalancingCostParams;
|
||||||
}
|
}
|
||||||
|
|
||||||
public long getStartTime()
|
public long getStartTime()
|
||||||
|
@ -163,9 +174,9 @@ public class DruidMasterRuntimeParams
|
||||||
return balancerReferenceTimestamp;
|
return balancerReferenceTimestamp;
|
||||||
}
|
}
|
||||||
|
|
||||||
public BalancerStrategy getBalancerStrategy(DateTime referenceTimestamp)
|
public BalancerStrategyFactory getBalancerStrategyFactory()
|
||||||
{
|
{
|
||||||
return new CostBalancerStrategy(referenceTimestamp);
|
return strategyFactory;
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean hasDeletionWaitTimeElapsed()
|
public boolean hasDeletionWaitTimeElapsed()
|
||||||
|
@ -195,7 +206,8 @@ public class DruidMasterRuntimeParams
|
||||||
mergeBytesLimit,
|
mergeBytesLimit,
|
||||||
mergeSegmentsLimit,
|
mergeSegmentsLimit,
|
||||||
maxSegmentsToMove,
|
maxSegmentsToMove,
|
||||||
balancerReferenceTimestamp
|
balancerReferenceTimestamp,
|
||||||
|
emitBalancingCostParams
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -216,6 +228,8 @@ public class DruidMasterRuntimeParams
|
||||||
private int mergeSegmentsLimit;
|
private int mergeSegmentsLimit;
|
||||||
private int maxSegmentsToMove;
|
private int maxSegmentsToMove;
|
||||||
private DateTime balancerReferenceTimestamp;
|
private DateTime balancerReferenceTimestamp;
|
||||||
|
private boolean emitBalancingCostParams;
|
||||||
|
private BalancerStrategyFactory strategyFactory;
|
||||||
|
|
||||||
Builder()
|
Builder()
|
||||||
{
|
{
|
||||||
|
@ -234,6 +248,8 @@ public class DruidMasterRuntimeParams
|
||||||
this.mergeSegmentsLimit = 0;
|
this.mergeSegmentsLimit = 0;
|
||||||
this.maxSegmentsToMove = 0;
|
this.maxSegmentsToMove = 0;
|
||||||
this.balancerReferenceTimestamp = null;
|
this.balancerReferenceTimestamp = null;
|
||||||
|
this.emitBalancingCostParams=false;
|
||||||
|
this.strategyFactory=new CostBalancerStrategyFactory();
|
||||||
}
|
}
|
||||||
|
|
||||||
Builder(
|
Builder(
|
||||||
|
@ -251,7 +267,8 @@ public class DruidMasterRuntimeParams
|
||||||
long mergeBytesLimit,
|
long mergeBytesLimit,
|
||||||
int mergeSegmentsLimit,
|
int mergeSegmentsLimit,
|
||||||
int maxSegmentsToMove,
|
int maxSegmentsToMove,
|
||||||
DateTime balancerReferenceTimestamp
|
DateTime balancerReferenceTimestamp,
|
||||||
|
boolean emitBalancingCostParams
|
||||||
)
|
)
|
||||||
{
|
{
|
||||||
this.startTime = startTime;
|
this.startTime = startTime;
|
||||||
|
@ -269,6 +286,7 @@ public class DruidMasterRuntimeParams
|
||||||
this.mergeSegmentsLimit = mergeSegmentsLimit;
|
this.mergeSegmentsLimit = mergeSegmentsLimit;
|
||||||
this.maxSegmentsToMove = maxSegmentsToMove;
|
this.maxSegmentsToMove = maxSegmentsToMove;
|
||||||
this.balancerReferenceTimestamp = balancerReferenceTimestamp;
|
this.balancerReferenceTimestamp = balancerReferenceTimestamp;
|
||||||
|
this.emitBalancingCostParams=emitBalancingCostParams;
|
||||||
}
|
}
|
||||||
|
|
||||||
public DruidMasterRuntimeParams build()
|
public DruidMasterRuntimeParams build()
|
||||||
|
@ -288,10 +306,21 @@ public class DruidMasterRuntimeParams
|
||||||
mergeBytesLimit,
|
mergeBytesLimit,
|
||||||
mergeSegmentsLimit,
|
mergeSegmentsLimit,
|
||||||
maxSegmentsToMove,
|
maxSegmentsToMove,
|
||||||
balancerReferenceTimestamp
|
balancerReferenceTimestamp,
|
||||||
|
emitBalancingCostParams,
|
||||||
|
strategyFactory
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public Builder withBalancerStrategy(BalancerStrategyFactory strategyFactory){
|
||||||
|
this.strategyFactory=strategyFactory;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Builder withEmitBalancingCostParams(boolean param){
|
||||||
|
emitBalancingCostParams=param;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
public Builder withStartTime(long time)
|
public Builder withStartTime(long time)
|
||||||
{
|
{
|
||||||
startTime = time;
|
startTime = time;
|
||||||
|
|
|
@ -1,26 +1,39 @@
|
||||||
|
/*
|
||||||
|
* Druid - a distributed column store.
|
||||||
|
* Copyright (C) 2012 Metamarkets Group Inc.
|
||||||
|
*
|
||||||
|
* This program is free software; you can redistribute it and/or
|
||||||
|
* modify it under the terms of the GNU General Public License
|
||||||
|
* as published by the Free Software Foundation; either version 2
|
||||||
|
* of the License, or (at your option) any later version.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful,
|
||||||
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
* GNU General Public License for more details.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU General Public License
|
||||||
|
* along with this program; if not, write to the Free Software
|
||||||
|
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||||
|
*/
|
||||||
|
|
||||||
package com.metamx.druid.master;
|
package com.metamx.druid.master;
|
||||||
|
|
||||||
import com.google.common.collect.Lists;
|
import com.google.common.collect.Lists;
|
||||||
import com.metamx.druid.client.DataSegment;
|
import com.metamx.druid.client.DataSegment;
|
||||||
import org.joda.time.DateTime;
|
|
||||||
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
public class RandomBalancerStrategy extends BalancerStrategy
|
public class RandomBalancerStrategy implements BalancerStrategy
|
||||||
{
|
{
|
||||||
private final ReservoirSegmentSampler sampler = new ReservoirSegmentSampler();
|
private final ReservoirSegmentSampler sampler = new ReservoirSegmentSampler();
|
||||||
|
|
||||||
public RandomBalancerStrategy(DateTime referenceTimestamp)
|
|
||||||
{
|
|
||||||
super(referenceTimestamp);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ServerHolder findNewSegmentHome(
|
public ServerHolder findNewSegmentHome(
|
||||||
DataSegment proposalSegment, Iterable<ServerHolder> serverHolders
|
DataSegment proposalSegment, Iterable<ServerHolder> serverHolders
|
||||||
)
|
)
|
||||||
{
|
{
|
||||||
return sampler.getRandomServerHolder(Lists.newArrayList(serverHolders));
|
return sampler.getRandomServerHolder(Lists.newArrayList(serverHolders));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -28,4 +41,12 @@ public class RandomBalancerStrategy extends BalancerStrategy
|
||||||
{
|
{
|
||||||
return sampler.getRandomBalancerSegmentHolder(serverHolders);
|
return sampler.getRandomBalancerSegmentHolder(serverHolders);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void emitStats(
|
||||||
|
String tier, MasterStats stats, List<ServerHolder> serverHolderList
|
||||||
|
)
|
||||||
|
{
|
||||||
|
return;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,30 @@
|
||||||
|
/*
|
||||||
|
* Druid - a distributed column store.
|
||||||
|
* Copyright (C) 2012 Metamarkets Group Inc.
|
||||||
|
*
|
||||||
|
* This program is free software; you can redistribute it and/or
|
||||||
|
* modify it under the terms of the GNU General Public License
|
||||||
|
* as published by the Free Software Foundation; either version 2
|
||||||
|
* of the License, or (at your option) any later version.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful,
|
||||||
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
* GNU General Public License for more details.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU General Public License
|
||||||
|
* along with this program; if not, write to the Free Software
|
||||||
|
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||||
|
*/
|
||||||
|
package com.metamx.druid.master;
|
||||||
|
|
||||||
|
import org.joda.time.DateTime;
|
||||||
|
|
||||||
|
public class RandomBalancerStrategyFactory implements BalancerStrategyFactory
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public BalancerStrategy getBalancerStrategy(DateTime referenceTimestamp)
|
||||||
|
{
|
||||||
|
return new RandomBalancerStrategy();
|
||||||
|
}
|
||||||
|
}
|
|
@ -1,3 +1,22 @@
|
||||||
|
/*
|
||||||
|
* Druid - a distributed column store.
|
||||||
|
* Copyright (C) 2012 Metamarkets Group Inc.
|
||||||
|
*
|
||||||
|
* This program is free software; you can redistribute it and/or
|
||||||
|
* modify it under the terms of the GNU General Public License
|
||||||
|
* as published by the Free Software Foundation; either version 2
|
||||||
|
* of the License, or (at your option) any later version.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful,
|
||||||
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
* GNU General Public License for more details.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU General Public License
|
||||||
|
* along with this program; if not, write to the Free Software
|
||||||
|
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||||
|
*/
|
||||||
|
|
||||||
package com.metamx.druid.master;
|
package com.metamx.druid.master;
|
||||||
|
|
||||||
import com.metamx.druid.client.DataSegment;
|
import com.metamx.druid.client.DataSegment;
|
||||||
|
@ -7,9 +26,10 @@ import java.util.Random;
|
||||||
|
|
||||||
public class ReservoirSegmentSampler
|
public class ReservoirSegmentSampler
|
||||||
{
|
{
|
||||||
|
private final Random rand = new Random();
|
||||||
|
|
||||||
public BalancerSegmentHolder getRandomBalancerSegmentHolder(final List<ServerHolder> serverHolders)
|
public BalancerSegmentHolder getRandomBalancerSegmentHolder(final List<ServerHolder> serverHolders)
|
||||||
{
|
{
|
||||||
Random rand = new Random(0);
|
|
||||||
ServerHolder fromServerHolder = null;
|
ServerHolder fromServerHolder = null;
|
||||||
DataSegment proposalSegment = null;
|
DataSegment proposalSegment = null;
|
||||||
int numSoFar = 0;
|
int numSoFar = 0;
|
||||||
|
@ -31,17 +51,16 @@ public class ReservoirSegmentSampler
|
||||||
|
|
||||||
public ServerHolder getRandomServerHolder(final List<ServerHolder> serverHolders)
|
public ServerHolder getRandomServerHolder(final List<ServerHolder> serverHolders)
|
||||||
{
|
{
|
||||||
ServerHolder fromServerHolder = null;
|
ServerHolder serverHolder = null;
|
||||||
Random rand = new Random(0);
|
|
||||||
int numSoFar = 0;
|
int numSoFar = 0;
|
||||||
for (ServerHolder server : serverHolders) {
|
for (ServerHolder server : serverHolders) {
|
||||||
int randNum = rand.nextInt(numSoFar + 1);
|
int randNum = rand.nextInt(numSoFar + 1);
|
||||||
|
|
||||||
if(randNum==numSoFar) {
|
if (randNum == numSoFar) {
|
||||||
fromServerHolder=server;
|
serverHolder = server;
|
||||||
}
|
}
|
||||||
numSoFar++;
|
numSoFar++;
|
||||||
}
|
}
|
||||||
return fromServerHolder;
|
return serverHolder;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -60,7 +60,7 @@ public abstract class LoadRule implements Rule
|
||||||
|
|
||||||
final List<ServerHolder> serverHolderList = new ArrayList<ServerHolder>(serverQueue);
|
final List<ServerHolder> serverHolderList = new ArrayList<ServerHolder>(serverQueue);
|
||||||
final DateTime referenceTimestamp = params.getBalancerReferenceTimestamp();
|
final DateTime referenceTimestamp = params.getBalancerReferenceTimestamp();
|
||||||
final BalancerStrategy strategy = params.getBalancerStrategy(referenceTimestamp);
|
final BalancerStrategy strategy = params.getBalancerStrategyFactory().getBalancerStrategy(referenceTimestamp);
|
||||||
if (params.getAvailableSegments().contains(segment)) {
|
if (params.getAvailableSegments().contains(segment)) {
|
||||||
stats.accumulate(
|
stats.accumulate(
|
||||||
assign(
|
assign(
|
||||||
|
|
|
@ -194,6 +194,70 @@ public class DruidMasterBalancerTest
|
||||||
Assert.assertTrue(params.getMasterStats().getPerTierStats().get("movedCount").get("normal").get() > 0);
|
Assert.assertTrue(params.getMasterStats().getPerTierStats().get("movedCount").get("normal").get() > 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRun1RandomSampler(){
|
||||||
|
// Mock some servers of different usages
|
||||||
|
|
||||||
|
EasyMock.expect(druidServer1.getName()).andReturn("from").atLeastOnce();
|
||||||
|
EasyMock.expect(druidServer1.getCurrSize()).andReturn(30L).atLeastOnce();
|
||||||
|
EasyMock.expect(druidServer1.getMaxSize()).andReturn(100L).atLeastOnce();
|
||||||
|
EasyMock.expect(druidServer1.getSegments()).andReturn(segments).anyTimes();
|
||||||
|
EasyMock.expect(druidServer1.getSegment(EasyMock.<String>anyObject())).andReturn(null).anyTimes();
|
||||||
|
EasyMock.replay(druidServer1);
|
||||||
|
|
||||||
|
EasyMock.expect(druidServer2.getName()).andReturn("to").atLeastOnce();
|
||||||
|
EasyMock.expect(druidServer2.getTier()).andReturn("normal").anyTimes();
|
||||||
|
EasyMock.expect(druidServer2.getCurrSize()).andReturn(0L).atLeastOnce();
|
||||||
|
EasyMock.expect(druidServer2.getMaxSize()).andReturn(100L).atLeastOnce();
|
||||||
|
EasyMock.expect(druidServer2.getSegments()).andReturn(new HashMap<String, DataSegment>()).anyTimes();
|
||||||
|
EasyMock.expect(druidServer2.getSegment(EasyMock.<String>anyObject())).andReturn(null).anyTimes();
|
||||||
|
EasyMock.replay(druidServer2);
|
||||||
|
|
||||||
|
EasyMock.replay(druidServer3);
|
||||||
|
EasyMock.replay(druidServer4);
|
||||||
|
|
||||||
|
// Mock stuff that the master needs
|
||||||
|
master.moveSegment(
|
||||||
|
EasyMock.<String>anyObject(),
|
||||||
|
EasyMock.<String>anyObject(),
|
||||||
|
EasyMock.<String>anyObject(),
|
||||||
|
EasyMock.<LoadPeonCallback>anyObject()
|
||||||
|
);
|
||||||
|
EasyMock.expectLastCall().anyTimes();
|
||||||
|
EasyMock.replay(master);
|
||||||
|
|
||||||
|
LoadQueuePeonTester fromPeon = new LoadQueuePeonTester();
|
||||||
|
LoadQueuePeonTester toPeon = new LoadQueuePeonTester();
|
||||||
|
|
||||||
|
DruidMasterRuntimeParams params =
|
||||||
|
DruidMasterRuntimeParams.newBuilder()
|
||||||
|
.withDruidCluster(
|
||||||
|
new DruidCluster(
|
||||||
|
ImmutableMap.<String, MinMaxPriorityQueue<ServerHolder>>of(
|
||||||
|
"normal",
|
||||||
|
MinMaxPriorityQueue.orderedBy(DruidMasterBalancerTester.percentUsedComparator)
|
||||||
|
.create(
|
||||||
|
Arrays.asList(
|
||||||
|
new ServerHolder(druidServer1, fromPeon),
|
||||||
|
new ServerHolder(druidServer2, toPeon)
|
||||||
|
)
|
||||||
|
)
|
||||||
|
)
|
||||||
|
)
|
||||||
|
)
|
||||||
|
.withLoadManagementPeons(ImmutableMap.<String, LoadQueuePeon>of("from", fromPeon, "to", toPeon))
|
||||||
|
.withAvailableSegments(segments.values())
|
||||||
|
.withMaxSegmentsToMove(MAX_SEGMENTS_TO_MOVE)
|
||||||
|
.withBalancerReferenceTimestamp(new DateTime("2013-01-01"))
|
||||||
|
.withBalancerStrategy(new RandomBalancerStrategyFactory())
|
||||||
|
.build();
|
||||||
|
|
||||||
|
params = new DruidMasterBalancerTester(master).run(params);
|
||||||
|
Assert.assertTrue(params.getMasterStats().getPerTierStats().get("movedCount").get("normal").get() > 0);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testRun2()
|
public void testRun2()
|
||||||
{
|
{
|
||||||
|
@ -271,4 +335,84 @@ public class DruidMasterBalancerTest
|
||||||
params = new DruidMasterBalancerTester(master).run(params);
|
params = new DruidMasterBalancerTester(master).run(params);
|
||||||
Assert.assertTrue(params.getMasterStats().getPerTierStats().get("movedCount").get("normal").get() > 0);
|
Assert.assertTrue(params.getMasterStats().getPerTierStats().get("movedCount").get("normal").get() > 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRun2RandomSampler()
|
||||||
|
{
|
||||||
|
// Mock some servers of different usages
|
||||||
|
EasyMock.expect(druidServer1.getName()).andReturn("1").atLeastOnce();
|
||||||
|
EasyMock.expect(druidServer1.getCurrSize()).andReturn(30L).atLeastOnce();
|
||||||
|
EasyMock.expect(druidServer1.getMaxSize()).andReturn(100L).atLeastOnce();
|
||||||
|
EasyMock.expect(druidServer1.getSegments()).andReturn(segments).anyTimes();
|
||||||
|
EasyMock.expect(druidServer1.getSegment(EasyMock.<String>anyObject())).andReturn(null).anyTimes();
|
||||||
|
EasyMock.replay(druidServer1);
|
||||||
|
|
||||||
|
EasyMock.expect(druidServer2.getName()).andReturn("2").atLeastOnce();
|
||||||
|
EasyMock.expect(druidServer2.getTier()).andReturn("normal").anyTimes();
|
||||||
|
EasyMock.expect(druidServer2.getCurrSize()).andReturn(0L).atLeastOnce();
|
||||||
|
EasyMock.expect(druidServer2.getMaxSize()).andReturn(100L).atLeastOnce();
|
||||||
|
EasyMock.expect(druidServer2.getSegments()).andReturn(new HashMap<String, DataSegment>()).anyTimes();
|
||||||
|
EasyMock.expect(druidServer2.getSegment(EasyMock.<String>anyObject())).andReturn(null).anyTimes();
|
||||||
|
EasyMock.replay(druidServer2);
|
||||||
|
|
||||||
|
EasyMock.expect(druidServer3.getName()).andReturn("3").atLeastOnce();
|
||||||
|
EasyMock.expect(druidServer3.getTier()).andReturn("normal").anyTimes();
|
||||||
|
EasyMock.expect(druidServer3.getCurrSize()).andReturn(0L).atLeastOnce();
|
||||||
|
EasyMock.expect(druidServer3.getMaxSize()).andReturn(100L).atLeastOnce();
|
||||||
|
EasyMock.expect(druidServer3.getSegments()).andReturn(new HashMap<String, DataSegment>()).anyTimes();
|
||||||
|
EasyMock.expect(druidServer3.getSegment(EasyMock.<String>anyObject())).andReturn(null).anyTimes();
|
||||||
|
EasyMock.replay(druidServer3);
|
||||||
|
|
||||||
|
EasyMock.expect(druidServer4.getName()).andReturn("4").atLeastOnce();
|
||||||
|
EasyMock.expect(druidServer4.getTier()).andReturn("normal").anyTimes();
|
||||||
|
EasyMock.expect(druidServer4.getCurrSize()).andReturn(0L).atLeastOnce();
|
||||||
|
EasyMock.expect(druidServer4.getMaxSize()).andReturn(100L).atLeastOnce();
|
||||||
|
EasyMock.expect(druidServer4.getSegments()).andReturn(new HashMap<String, DataSegment>()).anyTimes();
|
||||||
|
EasyMock.expect(druidServer4.getSegment(EasyMock.<String>anyObject())).andReturn(null).anyTimes();
|
||||||
|
EasyMock.replay(druidServer4);
|
||||||
|
|
||||||
|
// Mock stuff that the master needs
|
||||||
|
master.moveSegment(
|
||||||
|
EasyMock.<String>anyObject(),
|
||||||
|
EasyMock.<String>anyObject(),
|
||||||
|
EasyMock.<String>anyObject(),
|
||||||
|
EasyMock.<LoadPeonCallback>anyObject()
|
||||||
|
);
|
||||||
|
EasyMock.expectLastCall().anyTimes();
|
||||||
|
EasyMock.replay(master);
|
||||||
|
|
||||||
|
LoadQueuePeonTester peon1 = new LoadQueuePeonTester();
|
||||||
|
LoadQueuePeonTester peon2 = new LoadQueuePeonTester();
|
||||||
|
LoadQueuePeonTester peon3 = new LoadQueuePeonTester();
|
||||||
|
LoadQueuePeonTester peon4 = new LoadQueuePeonTester();
|
||||||
|
|
||||||
|
DruidMasterRuntimeParams params =
|
||||||
|
DruidMasterRuntimeParams.newBuilder()
|
||||||
|
.withDruidCluster(
|
||||||
|
new DruidCluster(
|
||||||
|
ImmutableMap.<String, MinMaxPriorityQueue<ServerHolder>>of(
|
||||||
|
"normal",
|
||||||
|
MinMaxPriorityQueue.orderedBy(DruidMasterBalancerTester.percentUsedComparator)
|
||||||
|
.create(
|
||||||
|
Arrays.asList(
|
||||||
|
new ServerHolder(druidServer1, peon1),
|
||||||
|
new ServerHolder(druidServer2, peon2),
|
||||||
|
new ServerHolder(druidServer3, peon3),
|
||||||
|
new ServerHolder(druidServer4, peon4)
|
||||||
|
)
|
||||||
|
)
|
||||||
|
)
|
||||||
|
)
|
||||||
|
)
|
||||||
|
.withLoadManagementPeons(ImmutableMap.<String, LoadQueuePeon>of("1", peon1, "2", peon2, "3", peon3, "4", peon4))
|
||||||
|
.withAvailableSegments(segments.values())
|
||||||
|
.withMaxSegmentsToMove(5000)
|
||||||
|
.withBalancerReferenceTimestamp(new DateTime("2013-01-01"))
|
||||||
|
.withBalancerStrategy(new RandomBalancerStrategyFactory())
|
||||||
|
.build();
|
||||||
|
|
||||||
|
params = new DruidMasterBalancerTester(master).run(params);
|
||||||
|
Assert.assertTrue(params.getMasterStats().getPerTierStats().get("movedCount").get("normal").get() > 0);
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,42 @@
|
||||||
|
/*
|
||||||
|
* Druid - a distributed column store.
|
||||||
|
* Copyright (C) 2012 Metamarkets Group Inc.
|
||||||
|
*
|
||||||
|
* This program is free software; you can redistribute it and/or
|
||||||
|
* modify it under the terms of the GNU General Public License
|
||||||
|
* as published by the Free Software Foundation; either version 2
|
||||||
|
* of the License, or (at your option) any later version.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful,
|
||||||
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
* GNU General Public License for more details.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU General Public License
|
||||||
|
* along with this program; if not, write to the Free Software
|
||||||
|
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||||
|
*/
|
||||||
|
package com.metamx.druid.master;
|
||||||
|
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
public class RandomBalancerStrategyTest
|
||||||
|
{
|
||||||
|
@Test
|
||||||
|
public void testFindNewSegmentHome() throws Exception
|
||||||
|
{
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testPickSegmentToMove() throws Exception
|
||||||
|
{
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testEmitStats() throws Exception
|
||||||
|
{
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue