mirror of https://github.com/apache/druid.git

commit 0ee2323875
Merge pull request #399 from metamx/multithread-costbalancer
Multithread costbalancer
CoordinatorDynamicConfig.java

@@ -31,6 +31,7 @@ public class CoordinatorDynamicConfig
   private final int maxSegmentsToMove;
   private final int replicantLifetime;
   private final int replicationThrottleLimit;
+  private final int balancerComputeThreads;
   private final boolean emitBalancingStats;
 
   @JsonCreator
@@ -41,6 +42,7 @@ public class CoordinatorDynamicConfig
       @JsonProperty("maxSegmentsToMove") int maxSegmentsToMove,
       @JsonProperty("replicantLifetime") int replicantLifetime,
       @JsonProperty("replicationThrottleLimit") int replicationThrottleLimit,
+      @JsonProperty("balancerComputeThreads") int balancerComputeThreads,
      @JsonProperty("emitBalancingStats") boolean emitBalancingStats
   )
   {
@@ -51,6 +53,7 @@ public class CoordinatorDynamicConfig
     this.replicantLifetime = replicantLifetime;
     this.replicationThrottleLimit = replicationThrottleLimit;
     this.emitBalancingStats = emitBalancingStats;
+    this.balancerComputeThreads = Math.min(balancerComputeThreads, Runtime.getRuntime().availableProcessors() - 1);
   }
 
   @JsonProperty
@@ -95,6 +98,12 @@ public class CoordinatorDynamicConfig
     return replicationThrottleLimit;
   }
 
+  @JsonProperty
+  public int getBalancerComputeThreads()
+  {
+    return balancerComputeThreads;
+  }
+
   public static class Builder
   {
     private long millisToWaitBeforeDeleting;
@@ -104,10 +113,11 @@ public class CoordinatorDynamicConfig
     private int replicantLifetime;
     private int replicationThrottleLimit;
     private boolean emitBalancingStats;
+    private int balancerComputeThreads;
 
     public Builder()
     {
-      this(15 * 60 * 1000L, 100000000L, Integer.MAX_VALUE, 5, 15, 10, false);
+      this(15 * 60 * 1000L, 100000000L, Integer.MAX_VALUE, 5, 15, 10, 1, false);
     }
 
     private Builder(
@@ -117,6 +127,7 @@ public class CoordinatorDynamicConfig
         int maxSegmentsToMove,
         int replicantLifetime,
         int replicationThrottleLimit,
+        int balancerComputeThreads,
         boolean emitBalancingStats
     )
     {
@@ -127,6 +138,7 @@ public class CoordinatorDynamicConfig
       this.replicantLifetime = replicantLifetime;
       this.replicationThrottleLimit = replicationThrottleLimit;
       this.emitBalancingStats = emitBalancingStats;
+      this.balancerComputeThreads = balancerComputeThreads;
     }
 
     public Builder withMillisToWaitBeforeDeleting(long millisToWaitBeforeDeleting)
@@ -165,6 +177,12 @@ public class CoordinatorDynamicConfig
       return this;
     }
 
+    public Builder withBalancerComputeThreads(int balancerComputeThreads)
+    {
+      this.balancerComputeThreads = balancerComputeThreads;
+      return this;
+    }
+
     public CoordinatorDynamicConfig build()
     {
       return new CoordinatorDynamicConfig(
@@ -174,6 +192,7 @@ public class CoordinatorDynamicConfig
           maxSegmentsToMove,
           replicantLifetime,
           replicationThrottleLimit,
+          balancerComputeThreads,
           emitBalancingStats
       );
     }
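Note on the config change above: balancerComputeThreads is a new dynamic-config property, defaulting to 1, and the constructor clamps it to Runtime.getRuntime().availableProcessors() - 1 (on a single-core host that clamp yields 0, which Executors.newFixedThreadPool rejects, so the floor is worth double-checking). A minimal sketch of setting it through the new builder method; the value 4 is an arbitrary example, not a recommendation:

    CoordinatorDynamicConfig config = new CoordinatorDynamicConfig.Builder()
        .withBalancerComputeThreads(4)  // clamped to availableProcessors() - 1 in the constructor
        .build();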
CostBalancerStrategy.java

@@ -19,6 +19,11 @@
 
 package io.druid.server.coordinator;
 
+import com.google.api.client.util.Lists;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
 import com.metamx.common.Pair;
 import com.metamx.emitter.EmittingLogger;
 import io.druid.timeline.DataSegment;
@@ -26,6 +31,8 @@ import org.joda.time.DateTime;
 import org.joda.time.Interval;
 
 import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executors;
 
 public class CostBalancerStrategy implements BalancerStrategy
 {
@@ -33,11 +40,13 @@ public class CostBalancerStrategy implements BalancerStrategy
   private static final int DAY_IN_MILLIS = 1000 * 60 * 60 * 24;
   private static final int SEVEN_DAYS_IN_MILLIS = 7 * DAY_IN_MILLIS;
   private static final int THIRTY_DAYS_IN_MILLIS = 30 * DAY_IN_MILLIS;
-  private final DateTime referenceTimestamp;
+  private final long referenceTimestamp;
+  private final int threadCount;
 
-  public CostBalancerStrategy(DateTime referenceTimestamp)
+  public CostBalancerStrategy(DateTime referenceTimestamp, int threadCount)
   {
-    this.referenceTimestamp = referenceTimestamp;
+    this.referenceTimestamp = referenceTimestamp.getMillis();
+    this.threadCount = threadCount;
   }
 
   @Override
@@ -61,55 +70,6 @@ public class CostBalancerStrategy implements BalancerStrategy
     return chooseBestServer(proposalSegment, serverHolders, true).rhs;
   }
 
-
-  /**
-   * For assignment, we want to move to the lowest cost server that isn't already serving the segment.
-   *
-   * @param proposalSegment A DataSegment that we are proposing to move.
-   * @param serverHolders   An iterable of ServerHolders for a particular tier.
-   *
-   * @return A ServerHolder with the new home for a segment.
-   */
-
-  private Pair<Double, ServerHolder> chooseBestServer(
-      final DataSegment proposalSegment,
-      final Iterable<ServerHolder> serverHolders,
-      boolean includeCurrentServer
-  )
-  {
-    Pair<Double, ServerHolder> bestServer = Pair.of(Double.POSITIVE_INFINITY, null);
-    final long proposalSegmentSize = proposalSegment.getSize();
-
-    for (ServerHolder server : serverHolders) {
-      if (includeCurrentServer || !server.isServingSegment(proposalSegment)) {
-        /** Don't calculate cost if the server doesn't have enough space or is loading the segment */
-        if (proposalSegmentSize > server.getAvailableSize() || server.isLoadingSegment(proposalSegment)) {
-          continue;
-        }
-
-        /** The contribution to the total cost of a given server by proposing to move the segment to that server is... */
-        double cost = 0f;
-        /** the sum of the costs of other (exclusive of the proposalSegment) segments on the server */
-        for (DataSegment segment : server.getServer().getSegments().values()) {
-          if (!proposalSegment.equals(segment)) {
-            cost += computeJointSegmentCosts(proposalSegment, segment);
-          }
-        }
-        /** plus the costs of segments that will be loaded */
-        for (DataSegment segment : server.getPeon().getSegmentsToLoad()) {
-          cost += computeJointSegmentCosts(proposalSegment, segment);
-        }
-
-        if (cost < bestServer.lhs) {
-          bestServer = Pair.of(cost, server);
-        }
-      }
-    }
-
-    return bestServer;
-  }
-
   /**
    * This defines the unnormalized cost function between two segments. There is a base cost given by
    * the minimum size of the two segments and additional penalties.
@@ -136,12 +96,8 @@ public class CostBalancerStrategy implements BalancerStrategy
       dataSourcePenalty = 2;
     }
 
-    double maxDiff = Math.max(
-        referenceTimestamp.getMillis() - segment1.getInterval().getEndMillis(),
-        referenceTimestamp.getMillis() - segment2.getInterval().getEndMillis()
-    );
-    double segment1diff = referenceTimestamp.getMillis() - segment1.getInterval().getEndMillis();
-    double segment2diff = referenceTimestamp.getMillis() - segment2.getInterval().getEndMillis();
+    double segment1diff = referenceTimestamp - segment1.getInterval().getEndMillis();
+    double segment2diff = referenceTimestamp - segment2.getInterval().getEndMillis();
     if (segment1diff < SEVEN_DAYS_IN_MILLIS && segment2diff < SEVEN_DAYS_IN_MILLIS) {
       recencyPenalty = (2 - segment1diff / SEVEN_DAYS_IN_MILLIS) * (2 - segment2diff / SEVEN_DAYS_IN_MILLIS);
     }
@@ -230,6 +186,87 @@ public class CostBalancerStrategy implements BalancerStrategy
         normalization,
         normalizedInitialCost
     );
 
   }
-}
+
+  protected double computeCost(
+      final DataSegment proposalSegment, final ServerHolder server, final boolean includeCurrentServer
+  )
+  {
+    final long proposalSegmentSize = proposalSegment.getSize();
+
+    if (includeCurrentServer || !server.isServingSegment(proposalSegment)) {
+      /** Don't calculate cost if the server doesn't have enough space or is loading the segment */
+      if (proposalSegmentSize > server.getAvailableSize() || server.isLoadingSegment(proposalSegment)) {
+        return Double.POSITIVE_INFINITY;
+      }
+
+      /** The contribution to the total cost of a given server by proposing to move the segment to that server is... */
+      double cost = 0f;
+      /** the sum of the costs of other (exclusive of the proposalSegment) segments on the server */
+      for (DataSegment segment : server.getServer().getSegments().values()) {
+        if (!proposalSegment.equals(segment)) {
+          cost += computeJointSegmentCosts(proposalSegment, segment);
+        }
+      }
+      /** plus the costs of segments that will be loaded */
+      for (DataSegment segment : server.getPeon().getSegmentsToLoad()) {
+        cost += computeJointSegmentCosts(proposalSegment, segment);
+      }
+      return cost;
+    }
+    return Double.POSITIVE_INFINITY;
+  }
+
+  /**
+   * For assignment, we want to move to the lowest cost server that isn't already serving the segment.
+   *
+   * @param proposalSegment A DataSegment that we are proposing to move.
+   * @param serverHolders   An iterable of ServerHolders for a particular tier.
+   *
+   * @return A ServerHolder with the new home for a segment.
+   */
+  protected Pair<Double, ServerHolder> chooseBestServer(
+      final DataSegment proposalSegment,
+      final Iterable<ServerHolder> serverHolders,
+      final boolean includeCurrentServer
+  )
+  {
+    Pair<Double, ServerHolder> bestServer = Pair.of(Double.POSITIVE_INFINITY, null);
+
+    ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(threadCount));
+    List<ListenableFuture<Pair<Double, ServerHolder>>> futures = Lists.newArrayList();
+
+    for (final ServerHolder server : serverHolders) {
+      futures.add(
+          service.submit(
+              new Callable<Pair<Double, ServerHolder>>()
+              {
+                @Override
+                public Pair<Double, ServerHolder> call() throws Exception
+                {
+                  return Pair.of(computeCost(proposalSegment, server, includeCurrentServer), server);
+                }
+              }
+          )
+      );
+    }
+
+    final ListenableFuture<List<Pair<Double, ServerHolder>>> resultsFuture = Futures.allAsList(futures);
+
+    try {
+      for (Pair<Double, ServerHolder> server : resultsFuture.get()) {
+        if (server.lhs < bestServer.lhs) {
+          bestServer = server;
+        }
+      }
+    }
+    catch (Exception e) {
+      log.makeAlert(e, "Cost Balancer Multithread strategy wasn't able to complete cost computation.").emit();
+    }
+    service.shutdown();
+    return bestServer;
+  }
+
+}
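The core of the change is the scatter/gather inside the new chooseBestServer: one Callable per candidate server submitted to a fixed pool, joined with Futures.allAsList, then reduced to the minimum-cost pair on the calling thread. A self-contained sketch of the same pattern, with a toy cost function standing in for computeCost (all names in this sketch are illustrative, not part of the patch):

    import com.google.common.util.concurrent.Futures;
    import com.google.common.util.concurrent.ListenableFuture;
    import com.google.common.util.concurrent.ListeningExecutorService;
    import com.google.common.util.concurrent.MoreExecutors;

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.Callable;
    import java.util.concurrent.Executors;

    public class ParallelMinCostSketch
    {
      // Toy stand-in for computeCost(proposalSegment, server, includeCurrentServer).
      private static double cost(int server)
      {
        return (server * 7919) % 101;
      }

      public static void main(String[] args) throws Exception
      {
        final int threadCount = 4;
        final ListeningExecutorService service =
            MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(threadCount));

        // Scatter: one task per candidate server, as the patch does per ServerHolder.
        final List<ListenableFuture<double[]>> futures = new ArrayList<ListenableFuture<double[]>>();
        for (int i = 0; i < 20; i++) {
          final int server = i;
          futures.add(
              service.submit(
                  new Callable<double[]>()
                  {
                    @Override
                    public double[] call()
                    {
                      return new double[]{cost(server), server};
                    }
                  }
              )
          );
        }

        // Gather: join all results, then reduce to the minimum on the calling thread.
        double bestCost = Double.POSITIVE_INFINITY;
        int bestServer = -1;
        for (double[] result : Futures.allAsList(futures).get()) {
          if (result[0] < bestCost) {
            bestCost = result[0];
            bestServer = (int) result[1];
          }
        }
        service.shutdown();

        System.out.println("best server = " + bestServer + " at cost " + bestCost);
      }
    }

Design note on the committed version: the pool is created and shut down inside every chooseBestServer call, so each balancing decision pays thread startup cost, and if the joined future fails it only logs an alert and returns the initial (infinite-cost, null) pair; a long-lived shared pool would amortize the startup cost.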
CostBalancerStrategyFactory.java

@@ -22,10 +22,16 @@ import org.joda.time.DateTime;
 
 public class CostBalancerStrategyFactory implements BalancerStrategyFactory
 {
+  private final int threadCount;
+
+  public CostBalancerStrategyFactory(int costBalancerStrategyThreadCount)
+  {
+    this.threadCount = costBalancerStrategyThreadCount;
+  }
+
   @Override
   public BalancerStrategy createBalancerStrategy(DateTime referenceTimestamp)
   {
-    return new CostBalancerStrategy(referenceTimestamp);
+    return new CostBalancerStrategy(referenceTimestamp, threadCount);
   }
 }
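With this change the factory carries the parallelism, so every strategy it creates shares one thread-count decision. A hypothetical usage sketch, assuming the io.druid.server.coordinator package names imported by the new test below; the thread count 4 is illustrative:

    import io.druid.server.coordinator.BalancerStrategy;
    import io.druid.server.coordinator.CostBalancerStrategyFactory;
    import org.joda.time.DateTime;
    import org.joda.time.DateTimeZone;

    public class StrategyFactorySketch
    {
      public static void main(String[] args)
      {
        // The thread count chosen here flows into every CostBalancerStrategy the factory creates.
        CostBalancerStrategyFactory factory = new CostBalancerStrategyFactory(4);
        BalancerStrategy strategy = factory.createBalancerStrategy(DateTime.now(DateTimeZone.UTC));
        System.out.println("created strategy: " + strategy.getClass().getSimpleName());
      }
    }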
DruidCoordinator.java

@@ -31,10 +31,12 @@ import com.google.common.io.Closeables;
 import com.google.inject.Inject;
 import com.metamx.common.IAE;
 import com.metamx.common.Pair;
+import com.metamx.common.concurrent.ExecutorServices;
 import com.metamx.common.concurrent.ScheduledExecutorFactory;
 import com.metamx.common.concurrent.ScheduledExecutors;
 import com.metamx.common.guava.Comparators;
 import com.metamx.common.guava.FunctionalIterable;
+import com.metamx.common.lifecycle.Lifecycle;
 import com.metamx.common.lifecycle.LifecycleStart;
 import com.metamx.common.lifecycle.LifecycleStop;
 import com.metamx.emitter.EmittingLogger;
@@ -753,6 +755,9 @@ public class DruidCoordinator
       }
     }
 
+    BalancerStrategyFactory factory =
+        new CostBalancerStrategyFactory(getDynamicConfigs().getBalancerComputeThreads());
+
     // Do coordinator stuff.
     DruidCoordinatorRuntimeParams params =
         DruidCoordinatorRuntimeParams.newBuilder()
@@ -760,6 +765,7 @@ public class DruidCoordinator
             .withDatasources(databaseSegmentManager.getInventory())
             .withDynamicConfigs(getDynamicConfigs())
             .withEmitter(emitter)
+            .withBalancerStrategyFactory(factory)
             .build();
 
     for (DruidCoordinatorHelper helper : helpers) {
DruidCoordinatorRuntimeParams.java

@@ -206,7 +206,7 @@ public class DruidCoordinatorRuntimeParams
       this.stats = new CoordinatorStats();
       this.coordinatorDynamicConfig = new CoordinatorDynamicConfig.Builder().build();
       this.balancerReferenceTimestamp = null;
-      this.strategyFactory = new CostBalancerStrategyFactory();
+      this.strategyFactory = new CostBalancerStrategyFactory(1);
     }
 
     Builder(
CostBalancerStrategyTest.java (new file)

@@ -0,0 +1,149 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012, 2013 Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
+package io.druid.server.coordination;
+
+import com.google.api.client.util.Lists;
+import com.google.common.collect.Maps;
+import io.druid.client.DruidServer;
+import io.druid.server.coordinator.BalancerStrategy;
+import io.druid.server.coordinator.CostBalancerStrategy;
+import io.druid.server.coordinator.LoadQueuePeonTester;
+import io.druid.server.coordinator.ServerHolder;
+import io.druid.timeline.DataSegment;
+import junit.framework.Assert;
+import org.easymock.EasyMock;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+import org.joda.time.Interval;
+import org.junit.Test;
+
+import java.util.List;
+import java.util.Map;
+
+public class CostBalancerStrategyTest
+{
+  private final List<ServerHolder> serverHolderList = Lists.newArrayList();
+  private final Interval day = DateTime.now().toDateMidnight().toInterval();
+
+  /**
+   * Create a Druid cluster with 10 servers having 100 segments each, and 1 server with 98 segments.
+   * The cost balancer strategy should assign the next segment to the server with fewer segments.
+   */
+  public void setupDummyCluster(int serverCount, int maxSegments)
+  {
+    // Create 10 servers with current size being 3K & max size being 10K
+    // Each having 100 segments
+    for (int i = 0; i < serverCount; i++) {
+      LoadQueuePeonTester fromPeon = new LoadQueuePeonTester();
+      DruidServer druidServer = EasyMock.createMock(DruidServer.class);
+      EasyMock.expect(druidServer.getName()).andReturn("DruidServer_Name_" + i).anyTimes();
+      EasyMock.expect(druidServer.getCurrSize()).andReturn(3000L).anyTimes();
+      EasyMock.expect(druidServer.getMaxSize()).andReturn(10000000L).anyTimes();
+
+      EasyMock.expect(druidServer.getSegment(EasyMock.<String>anyObject())).andReturn(null).anyTimes();
+      Map<String, DataSegment> segments = Maps.newHashMap();
+      for (int j = 0; j < maxSegments; j++) {
+        DataSegment segment = getSegment(j);
+        segments.put(segment.getIdentifier(), segment);
+        EasyMock.expect(druidServer.getSegment(segment.getIdentifier())).andReturn(segment).anyTimes();
+      }
+
+      EasyMock.expect(druidServer.getSegments()).andReturn(segments).anyTimes();
+
+      EasyMock.replay(druidServer);
+      serverHolderList.add(new ServerHolder(druidServer, fromPeon));
+    }
+
+    // The best server to be available for next segment assignment has only 98 segments
+    LoadQueuePeonTester fromPeon = new LoadQueuePeonTester();
+    DruidServer druidServer = EasyMock.createMock(DruidServer.class);
+    EasyMock.expect(druidServer.getName()).andReturn("BEST_SERVER").anyTimes();
+    EasyMock.expect(druidServer.getCurrSize()).andReturn(3000L).anyTimes();
+    EasyMock.expect(druidServer.getMaxSize()).andReturn(10000000L).anyTimes();
+
+    EasyMock.expect(druidServer.getSegment(EasyMock.<String>anyObject())).andReturn(null).anyTimes();
+    Map<String, DataSegment> segments = Maps.newHashMap();
+    for (int j = 0; j < (maxSegments - 2); j++) {
+      DataSegment segment = getSegment(j);
+      segments.put(segment.getIdentifier(), segment);
+      EasyMock.expect(druidServer.getSegment(segment.getIdentifier())).andReturn(segment).anyTimes();
+    }
+    EasyMock.expect(druidServer.getSegments()).andReturn(segments).anyTimes();
+
+    EasyMock.replay(druidServer);
+    serverHolderList.add(new ServerHolder(druidServer, fromPeon));
+  }
+
+  /**
+   * Returns a segment with a dummy id and size index * 100.
+   *
+   * @param index
+   *
+   * @return segment
+   */
+  private DataSegment getSegment(int index)
+  {
+    // Not using EasyMock as it hampers the performance of multithreaded runs.
+    DataSegment segment = new DataSegment(
+        "DUMMY", day, String.valueOf(index), Maps.<String, Object>newConcurrentMap(),
+        Lists.<String>newArrayList(), Lists.<String>newArrayList(), null, 0, index * 100L
+    );
+    return segment;
+  }
+
+  @Test
+  public void testCostBalancerMultithreadStrategy() throws InterruptedException
+  {
+    setupDummyCluster(10, 20);
+    DataSegment segment = getSegment(1000);
+
+    BalancerStrategy strategy = new CostBalancerStrategy(DateTime.now(DateTimeZone.UTC), 1);
+    ServerHolder holder = strategy.findNewSegmentHomeReplicator(segment, serverHolderList);
+    Assert.assertNotNull("Should be able to find a place for new segment!!", holder);
+    Assert.assertEquals("Best Server should be BEST_SERVER", "BEST_SERVER", holder.getServer().getName());
+  }
+
+  @Test
+  public void testPerf() throws InterruptedException
+  {
+    setupDummyCluster(100, 500);
+    DataSegment segment = getSegment(1000);
+
+    BalancerStrategy singleThreadStrategy = new CostBalancerStrategy(DateTime.now(DateTimeZone.UTC), 1);
+    long start = System.currentTimeMillis();
+    singleThreadStrategy.findNewSegmentHomeReplicator(segment, serverHolderList);
+    long end = System.currentTimeMillis();
+    long latencySingleThread = end - start;
+
+    BalancerStrategy strategy = new CostBalancerStrategy(DateTime.now(DateTimeZone.UTC), 4);
+    start = System.currentTimeMillis();
+    strategy.findNewSegmentHomeReplicator(segment, serverHolderList);
+    end = System.currentTimeMillis();
+    long latencyMultiThread = end - start;
+
+    System.err.println("Latency - Single Threaded (ms): " + latencySingleThread);
+    System.err.println("Latency - Multi Threaded (ms): " + latencyMultiThread);
+
+    Assert.assertTrue("Latency of multi-thread strategy should always be less than single thread.", latencyMultiThread < latencySingleThread);
+  }
+}
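Two notes on the new test class. First, testPerf asserts that the 4-thread run strictly beats the single-thread run in wall-clock latency; on a busy or low-core CI host that ordering is not guaranteed, so the assertion reads more as a benchmark expectation than an invariant. Second, assuming the project's standard Maven/Surefire setup (the module path is not visible in this diff), the class can be run in isolation with something like:

    mvn test -Dtest=CostBalancerStrategyTest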