Normalized Cost Balancer (#3632)

* Normalized Cost Balancer

* Adding documentation and renaming to use diskNormalizedCostBalancer

* Remove balancer from the strings

* Update docs and include random cost balancer

* Fix checkstyle issues
Niketh Sabbineni 2016-12-05 17:18:20 -08:00 committed by cheddar
parent c74d267f50
commit d904c79081
17 changed files with 460 additions and 130 deletions

View File

@ -32,6 +32,7 @@ The coordinator node uses several of the global configs in [Configuration](../co
|`druid.coordinator.kill.period`|How often to send kill tasks to the indexing service. Value must be greater than `druid.coordinator.period.indexingPeriod`. Only applies if kill is turned on.|P1D (1 Day)|
|`druid.coordinator.kill.durationToRetain`| Do not kill segments in last `durationToRetain`, must be greater or equal to 0. Only applies and MUST be specified if kill is turned on. Note that default value is invalid.|PT-1S (-1 seconds)|
|`druid.coordinator.kill.maxSegments`|Kill at most n segments per kill task submission, must be greater than 0. Only applies and MUST be specified if kill is turned on. Note that default value is invalid.|0|
|`druid.coordinator.balancer.strategy`|The balancing strategy the coordinator uses to distribute segments among the historical nodes. Use `diskNormalized` to distribute segments so that disks fill up at a uniform rate across nodes, or `random` to pick target nodes at random.|`cost`|
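For example, opting into the new strategy only requires this one property, typically set in the coordinator's runtime.properties (a minimal sketch; the property name comes from the row above and the value is one of `cost`, `diskNormalized`, or `random`):
# pick how the coordinator balances segments across historical nodes
druid.coordinator.balancer.strategy=diskNormalized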
### Metadata Retrieval

View File

@ -18,11 +18,17 @@
*/
package io.druid.server.coordinator;
import org.joda.time.DateTime;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.google.common.util.concurrent.ListeningExecutorService;
import java.io.Closeable;
public interface BalancerStrategyFactory extends Closeable
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "strategy", defaultImpl = CostBalancerStrategyFactory.class)
@JsonSubTypes(value = {
@JsonSubTypes.Type(name = "diskNormalized", value = DiskNormalizedCostBalancerStrategyFactory.class),
@JsonSubTypes.Type(name = "cost", value = CostBalancerStrategyFactory.class),
@JsonSubTypes.Type(name = "random", value = RandomBalancerStrategyFactory.class),
})
public interface BalancerStrategyFactory
{
public BalancerStrategy createBalancerStrategy(DateTime referenceTimestamp);
public BalancerStrategy createBalancerStrategy(ListeningExecutorService exec);
}
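To make the effect of these annotations concrete, here is a minimal standalone sketch of how the `strategy` property selects a factory. The plain `ObjectMapper` and the example class are assumptions for illustration only; Druid itself wires this through `JsonConfigProvider`, as the `CliCoordinator` change at the end of this diff shows.
import com.fasterxml.jackson.databind.ObjectMapper;

public class BalancerStrategySelectionSketch
{
  public static void main(String[] args) throws Exception
  {
    ObjectMapper mapper = new ObjectMapper();

    // "strategy" is the type id property declared by @JsonTypeInfo above
    BalancerStrategyFactory diskNormalized =
        mapper.readValue("{\"strategy\": \"diskNormalized\"}", BalancerStrategyFactory.class);

    // with no "strategy" key, Jackson falls back to defaultImpl, i.e. the cost factory
    BalancerStrategyFactory fallback =
        mapper.readValue("{}", BalancerStrategyFactory.class);

    System.out.println(diskNormalized.getClass().getSimpleName()); // DiskNormalizedCostBalancerStrategyFactory
    System.out.println(fallback.getClass().getSimpleName());       // CostBalancerStrategyFactory
  }
}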

View File

@ -19,30 +19,12 @@
package io.druid.server.coordinator;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import org.joda.time.DateTime;
import java.io.IOException;
import java.util.concurrent.Executors;
public class CostBalancerStrategyFactory implements BalancerStrategyFactory
{
private final ListeningExecutorService exec;
public CostBalancerStrategyFactory(int costBalancerStrategyThreadCount)
{
this.exec = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(costBalancerStrategyThreadCount));
}
@Override
public CostBalancerStrategy createBalancerStrategy(DateTime referenceTimestamp)
public CostBalancerStrategy createBalancerStrategy(ListeningExecutorService exec)
{
return new CostBalancerStrategy(exec);
}
@Override
public void close() throws IOException
{
exec.shutdownNow();
}
}

View File

@ -0,0 +1,59 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.server.coordinator;
import com.google.common.util.concurrent.ListeningExecutorService;
import io.druid.timeline.DataSegment;
public class DiskNormalizedCostBalancerStrategy extends CostBalancerStrategy
{
public DiskNormalizedCostBalancerStrategy(ListeningExecutorService exec)
{
super(exec);
}
/**
* Normalizes the cost from CostBalancerStrategy by the number of segments on the server and then
* weights it by the server's disk usage ratio, so that all hosts converge toward the same
* percentage of disk utilization.
*/
@Override
protected double computeCost(
final DataSegment proposalSegment, final ServerHolder server, final boolean includeCurrentServer
)
{
double cost = super.computeCost(proposalSegment, server, includeCurrentServer);
if (cost == Double.POSITIVE_INFINITY) {
return cost;
}
int nSegments = 1;
if (server.getServer().getSegments().size() > 0) {
nSegments = server.getServer().getSegments().size();
}
double normalizedCost = cost / nSegments;
double usageRatio = (double) server.getServer().getCurrSize() / (double) server.getServer().getMaxSize();
return normalizedCost * usageRatio;
}
}
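As a rough worked example of the formula above (the numbers are invented for illustration): if two servers both report a raw cost of 120 for a proposal segment and both hold 60 segments, each gets a normalized cost of 120 / 60 = 2.0; a server at 80% disk usage then scores 2.0 * 0.8 = 1.6, while one at 20% scores 2.0 * 0.2 = 0.4, so the segment is steered toward the emptier disk.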

View File

@ -0,0 +1,30 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.server.coordinator;
import com.google.common.util.concurrent.ListeningExecutorService;
public class DiskNormalizedCostBalancerStrategyFactory implements BalancerStrategyFactory
{
@Override
public BalancerStrategy createBalancerStrategy(ListeningExecutorService exec)
{
return new DiskNormalizedCostBalancerStrategy(exec);
}
}
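Since the factories no longer implement Closeable or own a thread pool, the caller now creates the executor and is responsible for shutting it down. A minimal standalone sketch of that contract follows; the thread count and the example class name are assumptions, and in the coordinator the pool size comes from the dynamic config, as the DruidCoordinator change below shows.
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;

import java.util.concurrent.Executors;

public class DiskNormalizedStrategyUsageSketch
{
  public static void main(String[] args)
  {
    ListeningExecutorService exec =
        MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1));
    try {
      BalancerStrategy strategy =
          new DiskNormalizedCostBalancerStrategyFactory().createBalancerStrategy(exec);
      // strategy.findNewSegmentHomeReplicator(segment, serverHolders) would be called here
    }
    finally {
      // the executor is owned by the caller, not the factory, so shut it down here
      exec.shutdownNow();
    }
  }
}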

View File

@ -28,6 +28,8 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Ordering;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.inject.Inject;
import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.service.ServiceEmitter;
@ -84,6 +86,7 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicReference;
@ -129,7 +132,7 @@ public class DruidCoordinator
private volatile int leaderCounter = 0;
private volatile boolean leader = false;
private volatile SegmentReplicantLookup segmentReplicantLookup = null;
private final BalancerStrategyFactory factory;
@Inject
public DruidCoordinator(
@ -146,7 +149,8 @@ public class DruidCoordinator
LoadQueueTaskMaster taskMaster,
ServiceAnnouncer serviceAnnouncer,
@Self DruidNode self,
@CoordinatorIndexingServiceHelper Set<DruidCoordinatorHelper> indexingServiceHelpers
@CoordinatorIndexingServiceHelper Set<DruidCoordinatorHelper> indexingServiceHelpers,
BalancerStrategyFactory factory
)
{
this(
@ -164,7 +168,8 @@ public class DruidCoordinator
serviceAnnouncer,
self,
Maps.<String, LoadQueuePeon>newConcurrentMap(),
indexingServiceHelpers
indexingServiceHelpers,
factory
);
}
@ -183,7 +188,8 @@ public class DruidCoordinator
ServiceAnnouncer serviceAnnouncer,
DruidNode self,
ConcurrentMap<String, LoadQueuePeon> loadQueuePeonMap,
Set<DruidCoordinatorHelper> indexingServiceHelpers
Set<DruidCoordinatorHelper> indexingServiceHelpers,
BalancerStrategyFactory factory
)
{
this.config = config;
@ -205,6 +211,7 @@ public class DruidCoordinator
this.leaderLatch = new AtomicReference<>(null);
this.loadManagementPeons = loadQueuePeonMap;
this.factory = factory;
}
public boolean isLeader()
@ -664,6 +671,7 @@ public class DruidCoordinator
@Override
public void run()
{
ListeningExecutorService balancerExec = null;
try {
synchronized (lock) {
final LeaderLatch latch = leaderLatch.get();
@ -686,27 +694,32 @@ public class DruidCoordinator
}
}
try (BalancerStrategyFactory factory =
new CostBalancerStrategyFactory(getDynamicConfigs().getBalancerComputeThreads())) {
// Do coordinator stuff.
DruidCoordinatorRuntimeParams params =
DruidCoordinatorRuntimeParams.newBuilder()
.withStartTime(startTime)
.withDatasources(metadataSegmentManager.getInventory())
.withDynamicConfigs(getDynamicConfigs())
.withEmitter(emitter)
.withBalancerStrategyFactory(factory)
.build();
for (DruidCoordinatorHelper helper : helpers) {
// Don't read state and run state in the same helper otherwise racy conditions may exist
if (leader && startingLeaderCounter == leaderCounter) {
params = helper.run(params);
}
balancerExec = MoreExecutors.listeningDecorator(
Executors.newFixedThreadPool(getDynamicConfigs().getBalancerComputeThreads()));
BalancerStrategy balancerStrategy = factory.createBalancerStrategy(balancerExec);
// Do coordinator stuff.
DruidCoordinatorRuntimeParams params =
DruidCoordinatorRuntimeParams.newBuilder()
.withStartTime(startTime)
.withDatasources(metadataSegmentManager.getInventory())
.withDynamicConfigs(getDynamicConfigs())
.withEmitter(emitter)
.withBalancerStrategy(balancerStrategy)
.build();
for (DruidCoordinatorHelper helper : helpers) {
// Don't read state and run state in the same helper otherwise racy conditions may exist
if (leader && startingLeaderCounter == leaderCounter) {
params = helper.run(params);
}
}
}
catch (Exception e) {
log.makeAlert(e, "Caught exception, ignoring so that schedule keeps going.").emit();
} finally {
if (balancerExec != null) {
balancerExec.shutdownNow();
}
}
}
}

View File

@ -48,7 +48,7 @@ public class DruidCoordinatorRuntimeParams
private final CoordinatorDynamicConfig coordinatorDynamicConfig;
private final CoordinatorStats stats;
private final DateTime balancerReferenceTimestamp;
private final BalancerStrategyFactory strategyFactory;
private final BalancerStrategy balancerStrategy;
public DruidCoordinatorRuntimeParams(
long startTime,
@ -63,7 +63,7 @@ public class DruidCoordinatorRuntimeParams
CoordinatorDynamicConfig coordinatorDynamicConfig,
CoordinatorStats stats,
DateTime balancerReferenceTimestamp,
BalancerStrategyFactory strategyFactory
BalancerStrategy balancerStrategy
)
{
this.startTime = startTime;
@ -78,7 +78,7 @@ public class DruidCoordinatorRuntimeParams
this.coordinatorDynamicConfig = coordinatorDynamicConfig;
this.stats = stats;
this.balancerReferenceTimestamp = balancerReferenceTimestamp;
this.strategyFactory = strategyFactory;
this.balancerStrategy = balancerStrategy;
}
public long getStartTime()
@ -141,9 +141,9 @@ public class DruidCoordinatorRuntimeParams
return balancerReferenceTimestamp;
}
public BalancerStrategyFactory getBalancerStrategyFactory()
public BalancerStrategy getBalancerStrategy()
{
return strategyFactory;
return balancerStrategy;
}
public boolean hasDeletionWaitTimeElapsed()
@ -171,7 +171,7 @@ public class DruidCoordinatorRuntimeParams
coordinatorDynamicConfig,
stats,
balancerReferenceTimestamp,
strategyFactory
balancerStrategy
);
}
@ -190,7 +190,7 @@ public class DruidCoordinatorRuntimeParams
coordinatorDynamicConfig,
stats,
balancerReferenceTimestamp,
strategyFactory
balancerStrategy
);
}
@ -208,7 +208,7 @@ public class DruidCoordinatorRuntimeParams
private CoordinatorDynamicConfig coordinatorDynamicConfig;
private CoordinatorStats stats;
private DateTime balancerReferenceTimestamp;
private BalancerStrategyFactory strategyFactory;
private BalancerStrategy balancerStrategy;
Builder()
{
@ -239,7 +239,7 @@ public class DruidCoordinatorRuntimeParams
CoordinatorDynamicConfig coordinatorDynamicConfig,
CoordinatorStats stats,
DateTime balancerReferenceTimestamp,
BalancerStrategyFactory strategyFactory
BalancerStrategy balancerStrategy
)
{
this.startTime = startTime;
@ -254,7 +254,7 @@ public class DruidCoordinatorRuntimeParams
this.coordinatorDynamicConfig = coordinatorDynamicConfig;
this.stats = stats;
this.balancerReferenceTimestamp = balancerReferenceTimestamp;
this.strategyFactory=strategyFactory;
this.balancerStrategy=balancerStrategy;
}
public DruidCoordinatorRuntimeParams build()
@ -272,7 +272,7 @@ public class DruidCoordinatorRuntimeParams
coordinatorDynamicConfig,
stats,
balancerReferenceTimestamp,
strategyFactory
balancerStrategy
);
}
@ -348,9 +348,9 @@ public class DruidCoordinatorRuntimeParams
return this;
}
public Builder withBalancerStrategyFactory(BalancerStrategyFactory strategyFactory)
public Builder withBalancerStrategy(BalancerStrategy balancerStrategy)
{
this.strategyFactory=strategyFactory;
this.balancerStrategy=balancerStrategy;
return this;
}
}

View File

@ -18,21 +18,13 @@
*/
package io.druid.server.coordinator;
import org.joda.time.DateTime;
import java.io.IOException;
import com.google.common.util.concurrent.ListeningExecutorService;
public class RandomBalancerStrategyFactory implements BalancerStrategyFactory
{
@Override
public BalancerStrategy createBalancerStrategy(DateTime referenceTimestamp)
public BalancerStrategy createBalancerStrategy(ListeningExecutorService exec)
{
return new RandomBalancerStrategy();
}
@Override
public void close() throws IOException
{
// No-op
}
}

View File

@ -34,7 +34,6 @@ import io.druid.server.coordinator.LoadPeonCallback;
import io.druid.server.coordinator.LoadQueuePeon;
import io.druid.server.coordinator.ServerHolder;
import io.druid.timeline.DataSegment;
import org.joda.time.DateTime;
import java.util.Comparator;
import java.util.List;
@ -85,8 +84,7 @@ public class DruidCoordinatorBalancer implements DruidCoordinatorHelper
public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
{
final CoordinatorStats stats = new CoordinatorStats();
final DateTime referenceTimestamp = params.getBalancerReferenceTimestamp();
final BalancerStrategy strategy = params.getBalancerStrategyFactory().createBalancerStrategy(referenceTimestamp);
final BalancerStrategy strategy = params.getBalancerStrategy();
final int maxSegmentsToMove = params.getCoordinatorDynamicConfig().getMaxSegmentsToMove();
for (Map.Entry<String, MinMaxPriorityQueue<ServerHolder>> entry :

View File

@ -33,7 +33,6 @@ import io.druid.server.coordinator.LoadPeonCallback;
import io.druid.server.coordinator.ReplicationThrottler;
import io.druid.server.coordinator.ServerHolder;
import io.druid.timeline.DataSegment;
import org.joda.time.DateTime;
import java.util.List;
import java.util.Map;
@ -72,8 +71,7 @@ public abstract class LoadRule implements Rule
}
final List<ServerHolder> serverHolderList = Lists.newArrayList(serverQueue);
final DateTime referenceTimestamp = params.getBalancerReferenceTimestamp();
final BalancerStrategy strategy = params.getBalancerStrategyFactory().createBalancerStrategy(referenceTimestamp);
final BalancerStrategy strategy = params.getBalancerStrategy();
if (availableSegments.contains(segment)) {
CoordinatorStats assignStats = assign(
params.getReplicationManager(),

View File

@ -21,8 +21,8 @@ package io.druid.server.coordinator;
import com.carrotsearch.junitbenchmarks.AbstractBenchmark;
import com.carrotsearch.junitbenchmarks.BenchmarkOptions;
import com.google.common.util.concurrent.MoreExecutors;
import io.druid.timeline.DataSegment;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@ -33,29 +33,32 @@ import org.junit.runners.Parameterized;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Executors;
@Ignore
@RunWith(Parameterized.class)
public class CostBalancerStrategyBenchmark extends AbstractBenchmark
{
@Parameterized.Parameters
public static List<CostBalancerStrategyFactory[]> factoryClasses()
public static List<CostBalancerStrategy[]> factoryClasses()
{
return Arrays.asList(
(CostBalancerStrategyFactory[]) Arrays.asList(
new CostBalancerStrategyFactory(1)
(CostBalancerStrategy[]) Arrays.asList(
new CostBalancerStrategy(MoreExecutors.listeningDecorator(
Executors.newFixedThreadPool(1)))
).toArray(),
(CostBalancerStrategyFactory[]) Arrays.asList(
new CostBalancerStrategyFactory(4)
(CostBalancerStrategy[]) Arrays.asList(
new CostBalancerStrategy(MoreExecutors.listeningDecorator(
Executors.newFixedThreadPool(4)))
).toArray()
);
}
private final CostBalancerStrategy strategy;
public CostBalancerStrategyBenchmark(CostBalancerStrategyFactory factory)
public CostBalancerStrategyBenchmark(CostBalancerStrategy costBalancerStrategy)
{
this.strategy = factory.createBalancerStrategy(DateTime.now());
this.strategy = costBalancerStrategy;
}
private static List<ServerHolder> serverHolderList;

View File

@ -0,0 +1,142 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.server.coordinator;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.MoreExecutors;
import io.druid.client.ImmutableDruidDataSource;
import io.druid.client.ImmutableDruidServer;
import io.druid.server.coordination.DruidServerMetadata;
import io.druid.timeline.DataSegment;
import org.easymock.EasyMock;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Test;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
public class DiskNormalizedCostBalancerStrategyTest
{
private static final Interval day = new Interval("2015-01-01T00/2015-01-01T01");
/**
* Creates a Druid cluster with serverCount servers holding maxSegments segments each, plus one server
* with the same segment count but a much larger disk. The disk-normalized strategy should assign the
* next segment to the server with the lowest disk usage ratio.
*/
public static List<ServerHolder> setupDummyCluster(int serverCount, int maxSegments)
{
List<ServerHolder> serverHolderList = Lists.newArrayList();
// Create serverCount servers, each with current size 3K and max size 10M,
// each holding maxSegments segments
for (int i = 0; i < serverCount; i++) {
LoadQueuePeonTester fromPeon = new LoadQueuePeonTester();
Map<String, DataSegment> segments = Maps.newHashMap();
for (int j = 0; j < maxSegments; j++) {
DataSegment segment = getSegment(j);
segments.put(segment.getIdentifier(), segment);
}
serverHolderList.add(
new ServerHolder(
new ImmutableDruidServer(
new DruidServerMetadata("DruidServer_Name_" + i, "localhost", 10000000L, "hot", "hot", 1),
3000L,
ImmutableMap.of("DUMMY", EasyMock.createMock(ImmutableDruidDataSource.class)),
ImmutableMap.copyOf(segments)
),
fromPeon
));
}
// The best candidate for the next segment assignment is the server with the much larger max size
LoadQueuePeonTester fromPeon = new LoadQueuePeonTester();
ImmutableDruidServer druidServer = EasyMock.createMock(ImmutableDruidServer.class);
EasyMock.expect(druidServer.getName()).andReturn("BEST_SERVER").anyTimes();
EasyMock.expect(druidServer.getCurrSize()).andReturn(3000L).anyTimes();
EasyMock.expect(druidServer.getMaxSize()).andReturn(100000000L).anyTimes();
EasyMock.expect(druidServer.getSegment(EasyMock.<String>anyObject())).andReturn(null).anyTimes();
Map<String, DataSegment> segments = Maps.newHashMap();
for (int j = 0; j < maxSegments; j++) {
DataSegment segment = getSegment(j);
segments.put(segment.getIdentifier(), segment);
EasyMock.expect(druidServer.getSegment(segment.getIdentifier())).andReturn(segment).anyTimes();
}
EasyMock.expect(druidServer.getSegments()).andReturn(segments).anyTimes();
EasyMock.replay(druidServer);
serverHolderList.add(new ServerHolder(druidServer, fromPeon));
return serverHolderList;
}
/**
* Returns a segment with a dummy identifier and a size proportional to its index.
*
* @param index segment index, also used to derive the segment size
*
* @return the dummy segment
*/
public static DataSegment getSegment(int index)
{
return getSegment(index, "DUMMY", day);
}
public static DataSegment getSegment(int index, String dataSource, Interval interval)
{
// Not using EasyMock as it hampers performance in multi-threaded runs.
DataSegment segment = new DataSegment(
dataSource, interval, String.valueOf(index), Maps.<String, Object>newConcurrentMap(),
Lists.<String>newArrayList(), Lists.<String>newArrayList(), null, 0, index * 100L
);
return segment;
}
@Test
public void testNormalizedCostBalancerMultiThreadedStrategy() throws InterruptedException
{
List<ServerHolder> serverHolderList = setupDummyCluster(10, 20);
DataSegment segment = getSegment(1000);
BalancerStrategy strategy = new DiskNormalizedCostBalancerStrategy(
MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(4))
);
ServerHolder holder = strategy.findNewSegmentHomeReplicator(segment, serverHolderList);
Assert.assertNotNull("Should be able to find a place for new segment!!", holder);
Assert.assertEquals("Best Server should be BEST_SERVER", "BEST_SERVER", holder.getServer().getName());
}
@Test
public void testNormalizedCostBalancerSingleThreadStrategy() throws InterruptedException
{
List<ServerHolder> serverHolderList = setupDummyCluster(10, 20);
DataSegment segment = getSegment(1000);
BalancerStrategy strategy = new DiskNormalizedCostBalancerStrategy(
MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1))
);
ServerHolder holder = strategy.findNewSegmentHomeReplicator(segment, serverHolderList);
Assert.assertNotNull("Should be able to find a place for new segment!!", holder);
Assert.assertEquals("Best Server should be BEST_SERVER", "BEST_SERVER", holder.getServer().getName());
}
}

View File

@ -23,6 +23,8 @@ import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.MinMaxPriorityQueue;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import io.druid.client.ImmutableDruidServer;
import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.NoneShardSpec;
@ -38,6 +40,7 @@ import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executors;
/**
*/
@ -167,6 +170,10 @@ public class DruidCoordinatorBalancerTest
LoadQueuePeonTester fromPeon = new LoadQueuePeonTester();
LoadQueuePeonTester toPeon = new LoadQueuePeonTester();
ListeningExecutorService exec = MoreExecutors.listeningDecorator(
Executors.newFixedThreadPool(1));
BalancerStrategy balancerStrategy =
new CostBalancerStrategyFactory().createBalancerStrategy(exec);
DruidCoordinatorRuntimeParams params =
DruidCoordinatorRuntimeParams.newBuilder()
@ -198,19 +205,17 @@ public class DruidCoordinatorBalancerTest
MAX_SEGMENTS_TO_MOVE
).build()
)
.withBalancerStrategyFactory(new CostBalancerStrategyFactory(1))
.withBalancerStrategy(balancerStrategy)
.withBalancerReferenceTimestamp(new DateTime("2013-01-01"))
.build();
params = new DruidCoordinatorBalancerTester(coordinator).run(params);
Assert.assertTrue(params.getCoordinatorStats().getPerTierStats().get("movedCount").get("normal").get() > 0);
Assert.assertTrue(params.getCoordinatorStats().getPerTierStats().get("movedCount").get("normal").get() < segments.size());
params.getBalancerStrategyFactory().close();
exec.shutdown();
}
@Test
public void testRun1() throws IOException
{
@ -244,6 +249,11 @@ public class DruidCoordinatorBalancerTest
EasyMock.expectLastCall().anyTimes();
EasyMock.replay(coordinator);
ListeningExecutorService exec = MoreExecutors.listeningDecorator(
Executors.newFixedThreadPool(1));
BalancerStrategy balancerStrategy =
new CostBalancerStrategyFactory().createBalancerStrategy(exec);
LoadQueuePeonTester fromPeon = new LoadQueuePeonTester();
LoadQueuePeonTester toPeon = new LoadQueuePeonTester();
DruidCoordinatorRuntimeParams params =
@ -275,13 +285,13 @@ public class DruidCoordinatorBalancerTest
new CoordinatorDynamicConfig.Builder().withMaxSegmentsToMove(MAX_SEGMENTS_TO_MOVE)
.build()
)
.withBalancerStrategyFactory(new CostBalancerStrategyFactory(1))
.withBalancerStrategy(balancerStrategy)
.withBalancerReferenceTimestamp(new DateTime("2013-01-01"))
.build();
params = new DruidCoordinatorBalancerTester(coordinator).run(params);
Assert.assertTrue(params.getCoordinatorStats().getPerTierStats().get("movedCount").get("normal").get() > 0);
params.getBalancerStrategyFactory().close();
exec.shutdown();
}
@ -335,6 +345,11 @@ public class DruidCoordinatorBalancerTest
LoadQueuePeonTester peon3 = new LoadQueuePeonTester();
LoadQueuePeonTester peon4 = new LoadQueuePeonTester();
ListeningExecutorService exec = MoreExecutors.listeningDecorator(
Executors.newFixedThreadPool(1));
BalancerStrategy balancerStrategy =
new CostBalancerStrategyFactory().createBalancerStrategy(exec);
DruidCoordinatorRuntimeParams params =
DruidCoordinatorRuntimeParams.newBuilder()
.withDruidCluster(
@ -371,13 +386,13 @@ public class DruidCoordinatorBalancerTest
MAX_SEGMENTS_TO_MOVE
).build()
)
.withBalancerStrategyFactory(new CostBalancerStrategyFactory(1))
.withBalancerStrategy(balancerStrategy)
.withBalancerReferenceTimestamp(new DateTime("2013-01-01"))
.build();
params = new DruidCoordinatorBalancerTester(coordinator).run(params);
Assert.assertTrue(params.getCoordinatorStats().getPerTierStats().get("movedCount").get("normal").get() > 0);
params.getBalancerStrategyFactory().close();
exec.shutdown();
}
}

View File

@ -25,6 +25,8 @@ import com.google.common.collect.Maps;
import com.google.common.collect.MinMaxPriorityQueue;
import com.google.common.collect.Ordering;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.emitter.service.ServiceEventBuilder;
@ -50,6 +52,7 @@ import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Executors;
/**
*/
@ -179,13 +182,18 @@ public class DruidCoordinatorRuleRunnerTest
)
);
ListeningExecutorService exec = MoreExecutors.listeningDecorator(
Executors.newFixedThreadPool(1));
BalancerStrategy balancerStrategy =
new CostBalancerStrategyFactory().createBalancerStrategy(exec);
DruidCoordinatorRuntimeParams params =
new DruidCoordinatorRuntimeParams.Builder()
.withDruidCluster(druidCluster)
.withAvailableSegments(availableSegments)
.withDatabaseRuleManager(databaseRuleManager)
.withSegmentReplicantLookup(SegmentReplicantLookup.make(new DruidCluster()))
.withBalancerStrategyFactory(new CostBalancerStrategyFactory(1))
.withBalancerStrategy(balancerStrategy)
.withBalancerReferenceTimestamp(new DateTime("2013-01-01"))
.withDynamicConfigs(new CoordinatorDynamicConfig.Builder().withMaxSegmentsToMove(5).build())
.build();
@ -199,8 +207,8 @@ public class DruidCoordinatorRuleRunnerTest
Assert.assertTrue(stats.getPerTierStats().get("unassignedCount") == null);
Assert.assertTrue(stats.getPerTierStats().get("unassignedSize") == null);
exec.shutdown();
EasyMock.verify(mockPeon);
params.getBalancerStrategyFactory().close();
}
/**
@ -276,13 +284,18 @@ public class DruidCoordinatorRuleRunnerTest
)
);
ListeningExecutorService exec = MoreExecutors.listeningDecorator(
Executors.newFixedThreadPool(1));
BalancerStrategy balancerStrategy =
new CostBalancerStrategyFactory().createBalancerStrategy(exec);
DruidCoordinatorRuntimeParams params =
new DruidCoordinatorRuntimeParams.Builder()
.withDruidCluster(druidCluster)
.withAvailableSegments(availableSegments)
.withDatabaseRuleManager(databaseRuleManager)
.withSegmentReplicantLookup(SegmentReplicantLookup.make(new DruidCluster()))
.withBalancerStrategyFactory(new CostBalancerStrategyFactory(1))
.withBalancerStrategy(balancerStrategy)
.withBalancerReferenceTimestamp(new DateTime("2013-01-01"))
.build();
@ -294,8 +307,8 @@ public class DruidCoordinatorRuleRunnerTest
Assert.assertTrue(stats.getPerTierStats().get("unassignedCount") == null);
Assert.assertTrue(stats.getPerTierStats().get("unassignedSize") == null);
exec.shutdown();
EasyMock.verify(mockPeon);
params.getBalancerStrategyFactory().close();
}
/**
@ -367,13 +380,18 @@ public class DruidCoordinatorRuleRunnerTest
SegmentReplicantLookup segmentReplicantLookup = SegmentReplicantLookup.make(druidCluster);
ListeningExecutorService exec = MoreExecutors.listeningDecorator(
Executors.newFixedThreadPool(1));
BalancerStrategy balancerStrategy =
new CostBalancerStrategyFactory().createBalancerStrategy(exec);
DruidCoordinatorRuntimeParams params =
new DruidCoordinatorRuntimeParams.Builder()
.withDruidCluster(druidCluster)
.withAvailableSegments(availableSegments)
.withDatabaseRuleManager(databaseRuleManager)
.withSegmentReplicantLookup(segmentReplicantLookup)
.withBalancerStrategyFactory(new CostBalancerStrategyFactory(1))
.withBalancerStrategy(balancerStrategy)
.withBalancerReferenceTimestamp(new DateTime("2013-01-01"))
.build();
@ -385,8 +403,8 @@ public class DruidCoordinatorRuleRunnerTest
Assert.assertTrue(stats.getPerTierStats().get("unassignedCount") == null);
Assert.assertTrue(stats.getPerTierStats().get("unassignedSize") == null);
exec.shutdown();
EasyMock.verify(mockPeon);
params.getBalancerStrategyFactory().close();
}
@Test
@ -432,6 +450,11 @@ public class DruidCoordinatorRuleRunnerTest
)
);
ListeningExecutorService exec = MoreExecutors.listeningDecorator(
Executors.newFixedThreadPool(1));
BalancerStrategy balancerStrategy =
new CostBalancerStrategyFactory().createBalancerStrategy(exec);
DruidCoordinatorRuntimeParams params =
new DruidCoordinatorRuntimeParams.Builder()
.withEmitter(emitter)
@ -439,15 +462,15 @@ public class DruidCoordinatorRuleRunnerTest
.withAvailableSegments(availableSegments)
.withDatabaseRuleManager(databaseRuleManager)
.withSegmentReplicantLookup(SegmentReplicantLookup.make(new DruidCluster()))
.withBalancerStrategyFactory(new CostBalancerStrategyFactory(1))
.withBalancerStrategy(balancerStrategy)
.withBalancerReferenceTimestamp(new DateTime("2013-01-01"))
.build();
ruleRunner.run(params);
exec.shutdown();
EasyMock.verify(emitter);
EasyMock.verify(mockPeon);
params.getBalancerStrategyFactory().close();
}
@Test
@ -554,13 +577,18 @@ public class DruidCoordinatorRuleRunnerTest
SegmentReplicantLookup segmentReplicantLookup = SegmentReplicantLookup.make(druidCluster);
ListeningExecutorService exec = MoreExecutors.listeningDecorator(
Executors.newFixedThreadPool(1));
BalancerStrategy balancerStrategy =
new CostBalancerStrategyFactory().createBalancerStrategy(exec);
DruidCoordinatorRuntimeParams params = new DruidCoordinatorRuntimeParams.Builder()
.withDruidCluster(druidCluster)
.withDynamicConfigs(new CoordinatorDynamicConfig.Builder().withMillisToWaitBeforeDeleting(0L).build())
.withAvailableSegments(availableSegments)
.withDatabaseRuleManager(databaseRuleManager)
.withSegmentReplicantLookup(segmentReplicantLookup)
.withBalancerStrategyFactory(new CostBalancerStrategyFactory(1))
.withBalancerStrategy(balancerStrategy)
.withBalancerReferenceTimestamp(new DateTime("2013-01-01"))
.build();
@ -569,8 +597,8 @@ public class DruidCoordinatorRuleRunnerTest
Assert.assertTrue(stats.getGlobalStats().get("deletedCount").get() == 12);
exec.shutdown();
EasyMock.verify(coordinator);
params.getBalancerStrategyFactory().close();
}
@Test
@ -633,13 +661,18 @@ public class DruidCoordinatorRuleRunnerTest
SegmentReplicantLookup segmentReplicantLookup = SegmentReplicantLookup.make(druidCluster);
ListeningExecutorService exec = MoreExecutors.listeningDecorator(
Executors.newFixedThreadPool(1));
BalancerStrategy balancerStrategy =
new CostBalancerStrategyFactory().createBalancerStrategy(exec);
DruidCoordinatorRuntimeParams params = new DruidCoordinatorRuntimeParams.Builder()
.withDruidCluster(druidCluster)
.withDynamicConfigs(new CoordinatorDynamicConfig.Builder().withMillisToWaitBeforeDeleting(0L).build())
.withAvailableSegments(availableSegments)
.withDatabaseRuleManager(databaseRuleManager)
.withSegmentReplicantLookup(segmentReplicantLookup)
.withBalancerStrategyFactory(new CostBalancerStrategyFactory(1))
.withBalancerStrategy(balancerStrategy)
.withBalancerReferenceTimestamp(new DateTime("2013-01-01"))
.build();
@ -649,8 +682,8 @@ public class DruidCoordinatorRuleRunnerTest
Assert.assertTrue(stats.getPerTierStats().get("droppedCount").get("normal").get() == 1);
Assert.assertTrue(stats.getGlobalStats().get("deletedCount").get() == 12);
exec.shutdown();
EasyMock.verify(mockPeon);
params.getBalancerStrategyFactory().close();
}
@Test
@ -719,13 +752,18 @@ public class DruidCoordinatorRuleRunnerTest
SegmentReplicantLookup segmentReplicantLookup = SegmentReplicantLookup.make(druidCluster);
ListeningExecutorService exec = MoreExecutors.listeningDecorator(
Executors.newFixedThreadPool(1));
BalancerStrategy balancerStrategy =
new CostBalancerStrategyFactory().createBalancerStrategy(exec);
DruidCoordinatorRuntimeParams params = new DruidCoordinatorRuntimeParams.Builder()
.withDruidCluster(druidCluster)
.withDynamicConfigs(new CoordinatorDynamicConfig.Builder().withMillisToWaitBeforeDeleting(0L).build())
.withAvailableSegments(availableSegments)
.withDatabaseRuleManager(databaseRuleManager)
.withSegmentReplicantLookup(segmentReplicantLookup)
.withBalancerStrategyFactory(new CostBalancerStrategyFactory(1))
.withBalancerStrategy(balancerStrategy)
.withBalancerReferenceTimestamp(new DateTime("2013-01-01"))
.build();
@ -735,8 +773,8 @@ public class DruidCoordinatorRuleRunnerTest
Assert.assertTrue(stats.getPerTierStats().get("droppedCount").get("normal").get() == 1);
Assert.assertTrue(stats.getGlobalStats().get("deletedCount").get() == 12);
exec.shutdown();
EasyMock.verify(mockPeon);
params.getBalancerStrategyFactory().close();
}
@Test
@ -801,13 +839,18 @@ public class DruidCoordinatorRuleRunnerTest
SegmentReplicantLookup segmentReplicantLookup = SegmentReplicantLookup.make(druidCluster);
ListeningExecutorService exec = MoreExecutors.listeningDecorator(
Executors.newFixedThreadPool(1));
BalancerStrategy balancerStrategy =
new CostBalancerStrategyFactory().createBalancerStrategy(exec);
DruidCoordinatorRuntimeParams params = new DruidCoordinatorRuntimeParams.Builder()
.withDruidCluster(druidCluster)
.withDynamicConfigs(new CoordinatorDynamicConfig.Builder().withMillisToWaitBeforeDeleting(0L).build())
.withAvailableSegments(availableSegments)
.withDatabaseRuleManager(databaseRuleManager)
.withSegmentReplicantLookup(segmentReplicantLookup)
.withBalancerStrategyFactory(new CostBalancerStrategyFactory(1))
.withBalancerStrategy(balancerStrategy)
.withBalancerReferenceTimestamp(new DateTime("2013-01-01"))
.build();
@ -817,8 +860,8 @@ public class DruidCoordinatorRuleRunnerTest
Assert.assertTrue(stats.getPerTierStats().get("droppedCount") == null);
Assert.assertTrue(stats.getGlobalStats().get("deletedCount").get() == 12);
exec.shutdown();
EasyMock.verify(mockPeon);
params.getBalancerStrategyFactory().close();
}
@Test
@ -896,13 +939,18 @@ public class DruidCoordinatorRuleRunnerTest
SegmentReplicantLookup segmentReplicantLookup = SegmentReplicantLookup.make(druidCluster);
ListeningExecutorService exec = MoreExecutors.listeningDecorator(
Executors.newFixedThreadPool(1));
BalancerStrategy balancerStrategy =
new CostBalancerStrategyFactory().createBalancerStrategy(exec);
DruidCoordinatorRuntimeParams params = new DruidCoordinatorRuntimeParams.Builder()
.withDruidCluster(druidCluster)
.withDynamicConfigs(new CoordinatorDynamicConfig.Builder().withMillisToWaitBeforeDeleting(0L).build())
.withAvailableSegments(availableSegments)
.withDatabaseRuleManager(databaseRuleManager)
.withSegmentReplicantLookup(segmentReplicantLookup)
.withBalancerStrategyFactory(new CostBalancerStrategyFactory(1))
.withBalancerStrategy(balancerStrategy)
.withBalancerReferenceTimestamp(new DateTime("2013-01-01"))
.build();
@ -911,9 +959,9 @@ public class DruidCoordinatorRuleRunnerTest
Assert.assertTrue(stats.getPerTierStats().get("droppedCount").get("normal").get() == 1);
exec.shutdown();
EasyMock.verify(mockPeon);
EasyMock.verify(anotherMockPeon);
params.getBalancerStrategyFactory().close();
}
/**
@ -971,13 +1019,18 @@ public class DruidCoordinatorRuleRunnerTest
)
);
ListeningExecutorService exec = MoreExecutors.listeningDecorator(
Executors.newFixedThreadPool(1));
BalancerStrategy balancerStrategy =
new CostBalancerStrategyFactory().createBalancerStrategy(exec);
DruidCoordinatorRuntimeParams params =
new DruidCoordinatorRuntimeParams.Builder()
.withDruidCluster(druidCluster)
.withAvailableSegments(availableSegments)
.withDatabaseRuleManager(databaseRuleManager)
.withSegmentReplicantLookup(SegmentReplicantLookup.make(new DruidCluster()))
.withBalancerStrategyFactory(new CostBalancerStrategyFactory(1))
.withBalancerStrategy(balancerStrategy)
.withBalancerReferenceTimestamp(new DateTime("2013-01-01"))
.build();
@ -999,7 +1052,6 @@ public class DruidCoordinatorRuleRunnerTest
1,
0
);
afterParams.getBalancerStrategyFactory().close();
afterParams = ruleRunner.run(
new DruidCoordinatorRuntimeParams.Builder()
@ -1007,7 +1059,7 @@ public class DruidCoordinatorRuleRunnerTest
.withEmitter(emitter)
.withAvailableSegments(Arrays.asList(overFlowSegment))
.withDatabaseRuleManager(databaseRuleManager)
.withBalancerStrategyFactory(new CostBalancerStrategyFactory(1))
.withBalancerStrategy(balancerStrategy)
.withBalancerReferenceTimestamp(new DateTime("2013-01-01"))
.withSegmentReplicantLookup(SegmentReplicantLookup.make(new DruidCluster()))
.build()
@ -1019,7 +1071,7 @@ public class DruidCoordinatorRuleRunnerTest
Assert.assertTrue(stats.getPerTierStats().get("unassignedSize") == null);
EasyMock.verify(mockPeon);
afterParams.getBalancerStrategyFactory().close();
exec.shutdown();
}
/**
@ -1096,12 +1148,17 @@ public class DruidCoordinatorRuleRunnerTest
)
);
ListeningExecutorService exec = MoreExecutors.listeningDecorator(
Executors.newFixedThreadPool(1));
BalancerStrategy balancerStrategy =
new CostBalancerStrategyFactory().createBalancerStrategy(exec);
DruidCoordinatorRuntimeParams params =
new DruidCoordinatorRuntimeParams.Builder()
.withDruidCluster(druidCluster)
.withAvailableSegments(availableSegments)
.withDatabaseRuleManager(databaseRuleManager)
.withBalancerStrategyFactory(new CostBalancerStrategyFactory(1))
.withBalancerStrategy(balancerStrategy)
.withBalancerReferenceTimestamp(new DateTime("2013-01-01"))
.withSegmentReplicantLookup(SegmentReplicantLookup.make(new DruidCluster()))
.build();
@ -1116,7 +1173,7 @@ public class DruidCoordinatorRuleRunnerTest
Assert.assertTrue(stats.getPerTierStats().get("unassignedSize") == null);
EasyMock.verify(mockPeon);
params.getBalancerStrategyFactory().close();
exec.shutdown();
}
@Test
@ -1193,13 +1250,18 @@ public class DruidCoordinatorRuleRunnerTest
SegmentReplicantLookup segmentReplicantLookup = SegmentReplicantLookup.make(druidCluster);
ListeningExecutorService exec = MoreExecutors.listeningDecorator(
Executors.newFixedThreadPool(1));
BalancerStrategy balancerStrategy =
new CostBalancerStrategyFactory().createBalancerStrategy(exec);
DruidCoordinatorRuntimeParams params = new DruidCoordinatorRuntimeParams.Builder()
.withDruidCluster(druidCluster)
.withDynamicConfigs(new CoordinatorDynamicConfig.Builder().withMillisToWaitBeforeDeleting(0L).build())
.withAvailableSegments(longerAvailableSegments)
.withDatabaseRuleManager(databaseRuleManager)
.withSegmentReplicantLookup(segmentReplicantLookup)
.withBalancerStrategyFactory(new CostBalancerStrategyFactory(1))
.withBalancerStrategy(balancerStrategy)
.withBalancerReferenceTimestamp(new DateTime("2013-01-01"))
.build();
@ -1208,7 +1270,7 @@ public class DruidCoordinatorRuleRunnerTest
Assert.assertTrue(stats.getPerTierStats().get("droppedCount").get("normal").get() == 24);
EasyMock.verify(mockPeon);
params.getBalancerStrategyFactory().close();
exec.shutdown();
}
@Test
@ -1274,13 +1336,18 @@ public class DruidCoordinatorRuleRunnerTest
)
);
ListeningExecutorService exec = MoreExecutors.listeningDecorator(
Executors.newFixedThreadPool(1));
BalancerStrategy balancerStrategy =
new CostBalancerStrategyFactory().createBalancerStrategy(exec);
DruidCoordinatorRuntimeParams params =
new DruidCoordinatorRuntimeParams.Builder()
.withDruidCluster(druidCluster)
.withAvailableSegments(availableSegments)
.withDatabaseRuleManager(databaseRuleManager)
.withSegmentReplicantLookup(SegmentReplicantLookup.make(new DruidCluster()))
.withBalancerStrategyFactory(new CostBalancerStrategyFactory(1))
.withBalancerStrategy(balancerStrategy)
.withBalancerReferenceTimestamp(new DateTime("2013-01-01"))
.withDynamicConfigs(new CoordinatorDynamicConfig.Builder().withMaxSegmentsToMove(5).build())
.build();
@ -1298,7 +1365,7 @@ public class DruidCoordinatorRuleRunnerTest
Assert.assertEquals(availableSegments, afterParams.getAvailableSegments());
EasyMock.verify(mockPeon);
params.getBalancerStrategyFactory().close();
exec.shutdown();
}
private void mockCoordinator()

View File

@ -189,7 +189,8 @@ public class DruidCoordinatorTest extends CuratorTestBase
},
druidNode,
loadManagementPeons,
null
null,
new CostBalancerStrategyFactory()
);
}

View File

@ -26,12 +26,15 @@ import com.google.common.collect.Maps;
import com.google.common.collect.MinMaxPriorityQueue;
import com.google.common.collect.Ordering;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.metamx.common.logger.Logger;
import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.core.LoggingEmitter;
import com.metamx.emitter.service.ServiceEmitter;
import io.druid.client.DruidServer;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.server.coordinator.BalancerStrategy;
import io.druid.server.coordinator.CoordinatorStats;
import io.druid.server.coordinator.CostBalancerStrategyFactory;
import io.druid.server.coordinator.DruidCluster;
@ -53,6 +56,7 @@ import org.junit.Test;
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.Executors;
/**
*/
@ -190,14 +194,18 @@ public class LoadRuleTest
)
);
CostBalancerStrategyFactory costBalancerStrategyFactory = new CostBalancerStrategyFactory(1);
ListeningExecutorService exec = MoreExecutors.listeningDecorator(
Executors.newFixedThreadPool(1));
BalancerStrategy balancerStrategy =
new CostBalancerStrategyFactory().createBalancerStrategy(exec);
CoordinatorStats stats = rule.run(
null,
DruidCoordinatorRuntimeParams.newBuilder()
.withDruidCluster(druidCluster)
.withSegmentReplicantLookup(SegmentReplicantLookup.make(druidCluster))
.withReplicationManager(throttler)
.withBalancerStrategyFactory(costBalancerStrategyFactory)
.withBalancerStrategy(balancerStrategy)
.withBalancerReferenceTimestamp(new DateTime("2013-01-01"))
.withAvailableSegments(Arrays.asList(segment)).build(),
segment
@ -205,7 +213,7 @@ public class LoadRuleTest
Assert.assertTrue(stats.getPerTierStats().get("assignedCount").get("hot").get() == 1);
Assert.assertTrue(stats.getPerTierStats().get("assignedCount").get(DruidServer.DEFAULT_TIER).get() == 2);
costBalancerStrategyFactory.close();
exec.shutdown();
}
@Test
@ -296,14 +304,18 @@ public class LoadRuleTest
)
);
CostBalancerStrategyFactory costBalancerStrategyFactory = new CostBalancerStrategyFactory(1);
ListeningExecutorService exec = MoreExecutors.listeningDecorator(
Executors.newFixedThreadPool(1));
BalancerStrategy balancerStrategy =
new CostBalancerStrategyFactory().createBalancerStrategy(exec);
CoordinatorStats stats = rule.run(
null,
DruidCoordinatorRuntimeParams.newBuilder()
.withDruidCluster(druidCluster)
.withSegmentReplicantLookup(SegmentReplicantLookup.make(druidCluster))
.withReplicationManager(throttler)
.withBalancerStrategyFactory(costBalancerStrategyFactory)
.withBalancerStrategy(balancerStrategy)
.withBalancerReferenceTimestamp(new DateTime("2013-01-01"))
.withAvailableSegments(Arrays.asList(segment)).build(),
segment
@ -311,7 +323,7 @@ public class LoadRuleTest
Assert.assertTrue(stats.getPerTierStats().get("droppedCount").get("hot").get() == 1);
Assert.assertTrue(stats.getPerTierStats().get("droppedCount").get(DruidServer.DEFAULT_TIER).get() == 1);
costBalancerStrategyFactory.close();
exec.shutdown();
}
@Test
@ -381,21 +393,26 @@ public class LoadRuleTest
)
)
);
CostBalancerStrategyFactory costBalancerStrategyFactory = new CostBalancerStrategyFactory(1);
ListeningExecutorService exec = MoreExecutors.listeningDecorator(
Executors.newFixedThreadPool(1));
BalancerStrategy balancerStrategy =
new CostBalancerStrategyFactory().createBalancerStrategy(exec);
CoordinatorStats stats = rule.run(
null,
DruidCoordinatorRuntimeParams.newBuilder()
.withDruidCluster(druidCluster)
.withSegmentReplicantLookup(SegmentReplicantLookup.make(new DruidCluster()))
.withReplicationManager(throttler)
.withBalancerStrategyFactory(costBalancerStrategyFactory)
.withBalancerStrategy(balancerStrategy)
.withBalancerReferenceTimestamp(new DateTime("2013-01-01"))
.withAvailableSegments(Arrays.asList(segment)).build(),
segment
);
Assert.assertTrue(stats.getPerTierStats().get("assignedCount").get("hot").get() == 1);
costBalancerStrategyFactory.close();
exec.shutdown();
}
@Test
@ -481,7 +498,11 @@ CostBalancerStrategyFactory costBalancerStrategyFactory = new CostBalancerStrate
)
)
);
CostBalancerStrategyFactory costBalancerStrategyFactory = new CostBalancerStrategyFactory(1);
ListeningExecutorService exec = MoreExecutors.listeningDecorator(
Executors.newFixedThreadPool(1));
BalancerStrategy balancerStrategy =
new CostBalancerStrategyFactory().createBalancerStrategy(exec);
CoordinatorStats stats = rule.run(
null,
@ -489,13 +510,13 @@ CostBalancerStrategyFactory costBalancerStrategyFactory = new CostBalancerStrate
.withDruidCluster(druidCluster)
.withSegmentReplicantLookup(SegmentReplicantLookup.make(druidCluster))
.withReplicationManager(throttler)
.withBalancerStrategyFactory(costBalancerStrategyFactory)
.withBalancerStrategy(balancerStrategy)
.withBalancerReferenceTimestamp(new DateTime("2013-01-01"))
.withAvailableSegments(Arrays.asList(segment)).build(),
segment
);
Assert.assertTrue(stats.getPerTierStats().get("droppedCount").get("hot").get() == 1);
costBalancerStrategyFactory.close();
exec.shutdown();
}
}

View File

@ -51,6 +51,7 @@ import io.druid.metadata.MetadataSegmentManagerProvider;
import io.druid.metadata.MetadataStorage;
import io.druid.metadata.MetadataStorageProvider;
import io.druid.server.audit.AuditManagerProvider;
import io.druid.server.coordinator.BalancerStrategyFactory;
import io.druid.server.coordinator.DruidCoordinator;
import io.druid.server.coordinator.DruidCoordinatorConfig;
import io.druid.server.coordinator.LoadQueueTaskMaster;
@ -128,6 +129,7 @@ public class CliCoordinator extends ServerRunnable
JsonConfigProvider.bind(binder, "druid.manager.segments", MetadataSegmentManagerConfig.class);
JsonConfigProvider.bind(binder, "druid.manager.rules", MetadataRuleManagerConfig.class);
JsonConfigProvider.bind(binder, "druid.manager.lookups", LookupCoordinatorManagerConfig.class);
JsonConfigProvider.bind(binder, "druid.coordinator.balancer", BalancerStrategyFactory.class);
binder.bind(RedirectFilter.class).in(LazySingleton.class);
binder.bind(RedirectInfo.class).to(CoordinatorRedirectInfo.class).in(LazySingleton.class);