Optimize CostBalancerStrategy (#2910)

* Optimize CostBalancerStrategy

Ignore benchmark test in normal run

fix test

review comments

fix compilation

fix test

* review comments

* review comment
This commit is contained in:
Nishant 2016-05-05 20:59:08 +05:30 committed by Fangjin Yang
parent b489f63698
commit a2dd57cf65
11 changed files with 272 additions and 91 deletions

View File

@ -20,7 +20,9 @@ package io.druid.server.coordinator;
import org.joda.time.DateTime;
public interface BalancerStrategyFactory
import java.io.Closeable;
public interface BalancerStrategyFactory extends Closeable
{
public BalancerStrategy createBalancerStrategy(DateTime referenceTimestamp);
}

View File

@ -32,7 +32,7 @@ import org.joda.time.Interval;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;
public class CostBalancerStrategy implements BalancerStrategy
{
@ -41,12 +41,23 @@ public class CostBalancerStrategy implements BalancerStrategy
private static final long SEVEN_DAYS_IN_MILLIS = 7 * DAY_IN_MILLIS;
private static final long THIRTY_DAYS_IN_MILLIS = 30 * DAY_IN_MILLIS;
private final long referenceTimestamp;
private final int threadCount;
private final ListeningExecutorService exec;
public CostBalancerStrategy(DateTime referenceTimestamp, int threadCount)
public static long gapMillis(Interval interval1, Interval interval2)
{
if (interval1.getStartMillis() > interval2.getEndMillis()) {
return interval1.getStartMillis() - interval2.getEndMillis();
} else if (interval2.getStartMillis() > interval1.getEndMillis()) {
return interval2.getStartMillis() - interval1.getEndMillis();
} else {
return 0;
}
}
public CostBalancerStrategy(DateTime referenceTimestamp, ListeningExecutorService exec)
{
this.referenceTimestamp = referenceTimestamp.getMillis();
this.threadCount = threadCount;
this.exec = exec;
}
@Override
@ -85,7 +96,7 @@ public class CostBalancerStrategy implements BalancerStrategy
*/
public double computeJointSegmentCosts(final DataSegment segment1, final DataSegment segment2)
{
final Interval gap = segment1.getInterval().gap(segment2.getInterval());
final long gapMillis = gapMillis(segment1.getInterval(), segment2.getInterval());
final double baseCost = Math.min(segment1.getSize(), segment2.getSize());
double recencyPenalty = 1;
@ -103,10 +114,9 @@ public class CostBalancerStrategy implements BalancerStrategy
}
/** gap is null if the two segment intervals overlap or if they're adjacent */
if (gap == null) {
if (gapMillis == 0) {
gapPenalty = 2;
} else {
long gapMillis = gap.toDurationMillis();
if (gapMillis < THIRTY_DAYS_IN_MILLIS) {
gapPenalty = 2 - gapMillis / THIRTY_DAYS_IN_MILLIS;
}
@ -151,8 +161,8 @@ public class CostBalancerStrategy implements BalancerStrategy
* @param serverHolders A list of ServerHolders for a particular tier.
*
* @return The normalization value (the sum of the diagonal entries in the
* pairwise cost matrix). This is the cost of a cluster if each
* segment were to get its own historical node.
* pairwise cost matrix). This is the cost of a cluster if each
* segment were to get its own historical node.
*/
public double calculateNormalization(final List<ServerHolder> serverHolders)
{
@ -234,12 +244,11 @@ public class CostBalancerStrategy implements BalancerStrategy
{
Pair<Double, ServerHolder> bestServer = Pair.of(Double.POSITIVE_INFINITY, null);
ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(threadCount));
List<ListenableFuture<Pair<Double, ServerHolder>>> futures = Lists.newArrayList();
for (final ServerHolder server : serverHolders) {
futures.add(
service.submit(
exec.submit(
new Callable<Pair<Double, ServerHolder>>()
{
@Override
@ -264,9 +273,7 @@ public class CostBalancerStrategy implements BalancerStrategy
catch (Exception e) {
log.makeAlert(e, "Cost Balancer Multithread strategy wasn't able to complete cost computation.").emit();
}
service.shutdown();
return bestServer;
}
}

View File

@ -18,20 +18,31 @@
*/
package io.druid.server.coordinator;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import org.joda.time.DateTime;
import java.io.IOException;
import java.util.concurrent.Executors;
public class CostBalancerStrategyFactory implements BalancerStrategyFactory
{
private final int threadCount;
private final ListeningExecutorService exec;
public CostBalancerStrategyFactory(int costBalancerStrategyThreadCount)
{
this.threadCount = costBalancerStrategyThreadCount;
this.exec = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(costBalancerStrategyThreadCount));
}
@Override
public BalancerStrategy createBalancerStrategy(DateTime referenceTimestamp)
{
return new CostBalancerStrategy(referenceTimestamp, threadCount);
return new CostBalancerStrategy(referenceTimestamp, exec);
}
@Override
public void close() throws IOException
{
exec.shutdownNow();
}
}

View File

@ -686,23 +686,22 @@ public class DruidCoordinator
}
}
BalancerStrategyFactory factory =
new CostBalancerStrategyFactory(getDynamicConfigs().getBalancerComputeThreads());
// Do coordinator stuff.
DruidCoordinatorRuntimeParams params =
DruidCoordinatorRuntimeParams.newBuilder()
.withStartTime(startTime)
.withDatasources(metadataSegmentManager.getInventory())
.withDynamicConfigs(getDynamicConfigs())
.withEmitter(emitter)
.withBalancerStrategyFactory(factory)
.build();
for (DruidCoordinatorHelper helper : helpers) {
// Don't read state and run state in the same helper otherwise racy conditions may exist
if (leader && startingLeaderCounter == leaderCounter) {
params = helper.run(params);
try (BalancerStrategyFactory factory =
new CostBalancerStrategyFactory(getDynamicConfigs().getBalancerComputeThreads())) {
// Do coordinator stuff.
DruidCoordinatorRuntimeParams params =
DruidCoordinatorRuntimeParams.newBuilder()
.withStartTime(startTime)
.withDatasources(metadataSegmentManager.getInventory())
.withDynamicConfigs(getDynamicConfigs())
.withEmitter(emitter)
.withBalancerStrategyFactory(factory)
.build();
for (DruidCoordinatorHelper helper : helpers) {
// Don't read state and run state in the same helper otherwise racy conditions may exist
if (leader && startingLeaderCounter == leaderCounter) {
params = helper.run(params);
}
}
}
}

View File

@ -21,7 +21,6 @@ package io.druid.server.coordinator;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.metamx.common.guava.Comparators;
import com.metamx.emitter.service.ServiceEmitter;
import io.druid.client.DruidDataSource;
import io.druid.metadata.MetadataRuleManager;
@ -206,7 +205,6 @@ public class DruidCoordinatorRuntimeParams
this.stats = new CoordinatorStats();
this.coordinatorDynamicConfig = new CoordinatorDynamicConfig.Builder().build();
this.balancerReferenceTimestamp = DateTime.now();
this.strategyFactory = new CostBalancerStrategyFactory(1);
}
Builder(

View File

@ -20,6 +20,8 @@ package io.druid.server.coordinator;
import org.joda.time.DateTime;
import java.io.IOException;
public class RandomBalancerStrategyFactory implements BalancerStrategyFactory
{
@Override
@ -27,4 +29,10 @@ public class RandomBalancerStrategyFactory implements BalancerStrategyFactory
{
return new RandomBalancerStrategy();
}
@Override
public void close() throws IOException
{
// No-op
}
}

View File

@ -0,0 +1,114 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.server.coordination;
import com.carrotsearch.junitbenchmarks.AbstractBenchmark;
import com.carrotsearch.junitbenchmarks.BenchmarkOptions;
import io.druid.server.coordinator.CostBalancerStrategy;
import io.druid.server.coordinator.CostBalancerStrategyFactory;
import io.druid.server.coordinator.ServerHolder;
import io.druid.timeline.DataSegment;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.util.Arrays;
import java.util.List;
@Ignore
@RunWith(Parameterized.class)
public class CostBalancerStrategyBenchmark extends AbstractBenchmark
{
@Parameterized.Parameters
public static List<CostBalancerStrategyFactory[]> factoryClasses()
{
return Arrays.asList(
(CostBalancerStrategyFactory[]) Arrays.asList(
new CostBalancerStrategyFactory(1)
).toArray(),
(CostBalancerStrategyFactory[]) Arrays.asList(
new CostBalancerStrategyFactory(4)
).toArray()
);
}
private final CostBalancerStrategy strategy;
public CostBalancerStrategyBenchmark(CostBalancerStrategyFactory factory)
{
this.strategy = (CostBalancerStrategy) factory.createBalancerStrategy(DateTime.now());
}
private static List<ServerHolder> serverHolderList;
volatile ServerHolder selected;
@BeforeClass
public static void setup()
{
serverHolderList = CostBalancerStrategyTest.setupDummyCluster(5, 20000);
}
@AfterClass
public static void tearDown(){
serverHolderList = null;
}
@Test
@BenchmarkOptions(warmupRounds = 10, benchmarkRounds = 1000)
public void testBenchmark() throws InterruptedException
{
DataSegment segment = CostBalancerStrategyTest.getSegment(1000, "testds", interval1);
selected = strategy.findNewSegmentHomeReplicator(segment, serverHolderList);
}
// Benchmark Joda Interval Gap impl vs CostBalancer.gapMillis
private final Interval interval1 = new Interval("2015-01-01T01:00:00Z/2015-01-01T02:00:00Z");
private final Interval interval2 = new Interval("2015-02-01T01:00:00Z/2015-02-01T02:00:00Z");
volatile Long sum;
@BenchmarkOptions(warmupRounds = 1000, benchmarkRounds = 1000000)
@Test
public void testJodaGap()
{
long diff = 0;
for (int i = 0; i < 1000; i++) {
diff = diff + interval1.gap(interval2).toDurationMillis();
}
sum = diff;
}
@BenchmarkOptions(warmupRounds = 1000, benchmarkRounds = 1000000)
@Test
public void testBalancerGapMillis()
{
long diff = 0;
for (int i = 0; i < 1000; i++) {
diff = diff + CostBalancerStrategy.gapMillis(interval1, interval2);
}
sum = diff;
}
}

View File

@ -19,9 +19,13 @@
package io.druid.server.coordination;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.MoreExecutors;
import io.druid.client.ImmutableDruidDataSource;
import io.druid.client.ImmutableDruidServer;
import io.druid.concurrent.Execs;
import io.druid.server.coordinator.BalancerStrategy;
import io.druid.server.coordinator.CostBalancerStrategy;
import io.druid.server.coordinator.LoadQueuePeonTester;
@ -37,39 +41,39 @@ import org.junit.Test;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
public class CostBalancerStrategyTest
{
private final List<ServerHolder> serverHolderList = Lists.newArrayList();
private final Interval day = DateTime.now().toDateMidnight().toInterval();
private static final Interval day = DateTime.now().toDateMidnight().toInterval();
/**
* Create Druid cluster with 10 servers having 100 segments each, and 1 server with 98 segment
* Create Druid cluster with serverCount servers having maxSegments segments each, and 1 server with 98 segment
* Cost Balancer Strategy should assign the next segment to the server with less segments.
*/
public void setupDummyCluster(int serverCount, int maxSegments)
public static List<ServerHolder> setupDummyCluster(int serverCount, int maxSegments)
{
List<ServerHolder> serverHolderList = Lists.newArrayList();
// Create 10 servers with current size being 3K & max size being 10K
// Each having having 100 segments
for (int i = 0; i < serverCount; i++) {
LoadQueuePeonTester fromPeon = new LoadQueuePeonTester();
ImmutableDruidServer druidServer = EasyMock.createMock(ImmutableDruidServer.class);
EasyMock.expect(druidServer.getName()).andReturn("DruidServer_Name_" + i).anyTimes();
EasyMock.expect(druidServer.getCurrSize()).andReturn(3000L).anyTimes();
EasyMock.expect(druidServer.getMaxSize()).andReturn(10000000L).anyTimes();
EasyMock.expect(druidServer.getSegment(EasyMock.<String>anyObject())).andReturn(null).anyTimes();
Map<String, DataSegment> segments = Maps.newHashMap();
for (int j = 0; j < maxSegments; j++) {
DataSegment segment = getSegment(j);
segments.put(segment.getIdentifier(), segment);
EasyMock.expect(druidServer.getSegment(segment.getIdentifier())).andReturn(segment).anyTimes();
}
EasyMock.expect(druidServer.getSegments()).andReturn(segments).anyTimes();
EasyMock.replay(druidServer);
serverHolderList.add(new ServerHolder(druidServer, fromPeon));
serverHolderList.add(
new ServerHolder(
new ImmutableDruidServer(
new DruidServerMetadata("DruidServer_Name_" + i, "localhost", 10000000L, "hot", "hot", 1),
3000L,
ImmutableMap.of("DUMMY", EasyMock.createMock(ImmutableDruidDataSource.class)),
ImmutableMap.copyOf(segments)
),
fromPeon
));
}
// The best server to be available for next segment assignment has only 98 Segments
@ -90,6 +94,7 @@ public class CostBalancerStrategyTest
EasyMock.replay(druidServer);
serverHolderList.add(new ServerHolder(druidServer, fromPeon));
return serverHolderList;
}
/**
@ -99,12 +104,12 @@ public class CostBalancerStrategyTest
*
* @return segment
*/
private DataSegment getSegment(int index)
public static DataSegment getSegment(int index)
{
return getSegment(index, "DUMMY", day);
}
private DataSegment getSegment(int index, String dataSource, Interval interval)
public static DataSegment getSegment(int index, String dataSource, Interval interval)
{
// Not using EasyMock as it hampers the performance of multithreads.
DataSegment segment = new DataSegment(
@ -117,11 +122,14 @@ public class CostBalancerStrategyTest
@Test
public void testCostBalancerMultiThreadedStrategy() throws InterruptedException
{
setupDummyCluster(10, 20);
List<ServerHolder> serverHolderList = setupDummyCluster(10, 20);
DataSegment segment = getSegment(1000);
final DateTime referenceTimestamp = new DateTime("2014-01-01");
BalancerStrategy strategy = new CostBalancerStrategy(referenceTimestamp, 4);
BalancerStrategy strategy = new CostBalancerStrategy(
referenceTimestamp,
MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(4))
);
ServerHolder holder = strategy.findNewSegmentHomeReplicator(segment, serverHolderList);
Assert.assertNotNull("Should be able to find a place for new segment!!", holder);
Assert.assertEquals("Best Server should be BEST_SERVER", "BEST_SERVER", holder.getServer().getName());
@ -130,44 +138,27 @@ public class CostBalancerStrategyTest
@Test
public void testCostBalancerSingleThreadStrategy() throws InterruptedException
{
setupDummyCluster(10, 20);
List<ServerHolder> serverHolderList = setupDummyCluster(10, 20);
DataSegment segment = getSegment(1000);
final DateTime referenceTimestamp = new DateTime("2014-01-01");
BalancerStrategy strategy = new CostBalancerStrategy(referenceTimestamp, 1);
BalancerStrategy strategy = new CostBalancerStrategy(
referenceTimestamp,
MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1))
);
ServerHolder holder = strategy.findNewSegmentHomeReplicator(segment, serverHolderList);
Assert.assertNotNull("Should be able to find a place for new segment!!", holder);
Assert.assertEquals("Best Server should be BEST_SERVER", "BEST_SERVER", holder.getServer().getName());
}
@Test
@Ignore
public void testBenchmark() throws InterruptedException
{
setupDummyCluster(100, 500);
DataSegment segment = getSegment(1000);
BalancerStrategy singleThreadStrategy = new CostBalancerStrategy(DateTime.now(DateTimeZone.UTC), 1);
long start = System.currentTimeMillis();
singleThreadStrategy.findNewSegmentHomeReplicator(segment, serverHolderList);
long end = System.currentTimeMillis();
long latencySingleThread = end - start;
BalancerStrategy strategy = new CostBalancerStrategy(DateTime.now(DateTimeZone.UTC), 4);
start = System.currentTimeMillis();
strategy.findNewSegmentHomeReplicator(segment, serverHolderList);
end = System.currentTimeMillis();
long latencyMultiThread = end - start;
System.err.println("Latency - Single Threaded (ms): " + latencySingleThread);
System.err.println("Latency - Multi Threaded (ms): " + latencyMultiThread);
}
@Test
public void testComputeJointSegmentCost()
{
DateTime referenceTime = new DateTime("2014-01-01T00:00:00");
CostBalancerStrategy strategy = new CostBalancerStrategy(referenceTime, 4);
CostBalancerStrategy strategy = new CostBalancerStrategy(
referenceTime,
MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(4))
);
double segmentCost = strategy.computeJointSegmentCosts(
getSegment(
100,
@ -187,7 +178,6 @@ public class CostBalancerStrategyTest
)
);
Assert.assertEquals(138028.62811791385d, segmentCost, 0);
}
}

View File

@ -34,6 +34,7 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
@ -134,7 +135,7 @@ public class DruidCoordinatorBalancerTest
@Test
public void testMoveToEmptyServerBalancer()
public void testMoveToEmptyServerBalancer() throws IOException
{
EasyMock.expect(druidServer1.getName()).andReturn("from").atLeastOnce();
EasyMock.expect(druidServer1.getCurrSize()).andReturn(30L).atLeastOnce();
@ -197,19 +198,21 @@ public class DruidCoordinatorBalancerTest
MAX_SEGMENTS_TO_MOVE
).build()
)
.withBalancerReferenceTimestamp(new DateTime("2013-01-01"))
.withBalancerStrategyFactory(new CostBalancerStrategyFactory(1))
.withBalancerReferenceTimestamp(new DateTime("2013-01-01"))
.build();
params = new DruidCoordinatorBalancerTester(coordinator).run(params);
Assert.assertTrue(params.getCoordinatorStats().getPerTierStats().get("movedCount").get("normal").get() > 0);
Assert.assertTrue(params.getCoordinatorStats().getPerTierStats().get("movedCount").get("normal").get() < segments.size());
params.getBalancerStrategyFactory().close();
}
@Test
public void testRun1()
public void testRun1() throws IOException
{
// Mock some servers of different usages
@ -272,16 +275,18 @@ public class DruidCoordinatorBalancerTest
new CoordinatorDynamicConfig.Builder().withMaxSegmentsToMove(MAX_SEGMENTS_TO_MOVE)
.build()
)
.withBalancerReferenceTimestamp(new DateTime("2013-01-01"))
.withBalancerStrategyFactory(new CostBalancerStrategyFactory(1))
.withBalancerReferenceTimestamp(new DateTime("2013-01-01"))
.build();
params = new DruidCoordinatorBalancerTester(coordinator).run(params);
Assert.assertTrue(params.getCoordinatorStats().getPerTierStats().get("movedCount").get("normal").get() > 0);
params.getBalancerStrategyFactory().close();
}
@Test
public void testRun2()
public void testRun2() throws IOException
{
// Mock some servers of different usages
EasyMock.expect(druidServer1.getName()).andReturn("1").atLeastOnce();
@ -366,11 +371,13 @@ public class DruidCoordinatorBalancerTest
MAX_SEGMENTS_TO_MOVE
).build()
)
.withBalancerReferenceTimestamp(new DateTime("2013-01-01"))
.withBalancerStrategyFactory(new CostBalancerStrategyFactory(1))
.withBalancerReferenceTimestamp(new DateTime("2013-01-01"))
.build();
params = new DruidCoordinatorBalancerTester(coordinator).run(params);
Assert.assertTrue(params.getCoordinatorStats().getPerTierStats().get("movedCount").get("normal").get() > 0);
params.getBalancerStrategyFactory().close();
}
}

View File

@ -182,6 +182,8 @@ public class DruidCoordinatorRuleRunnerTest
.withAvailableSegments(availableSegments)
.withDatabaseRuleManager(databaseRuleManager)
.withSegmentReplicantLookup(SegmentReplicantLookup.make(new DruidCluster()))
.withBalancerStrategyFactory(new CostBalancerStrategyFactory(1))
.withBalancerReferenceTimestamp(new DateTime("2013-01-01"))
.withDynamicConfigs(new CoordinatorDynamicConfig.Builder().withMaxSegmentsToMove(5).build())
.build();
@ -195,6 +197,7 @@ public class DruidCoordinatorRuleRunnerTest
Assert.assertTrue(stats.getPerTierStats().get("unassignedSize") == null);
EasyMock.verify(mockPeon);
params.getBalancerStrategyFactory().close();
}
/**
@ -276,6 +279,7 @@ public class DruidCoordinatorRuleRunnerTest
.withAvailableSegments(availableSegments)
.withDatabaseRuleManager(databaseRuleManager)
.withSegmentReplicantLookup(SegmentReplicantLookup.make(new DruidCluster()))
.withBalancerStrategyFactory(new CostBalancerStrategyFactory(1))
.withBalancerReferenceTimestamp(new DateTime("2013-01-01"))
.build();
@ -288,6 +292,7 @@ public class DruidCoordinatorRuleRunnerTest
Assert.assertTrue(stats.getPerTierStats().get("unassignedSize") == null);
EasyMock.verify(mockPeon);
params.getBalancerStrategyFactory().close();
}
/**
@ -365,6 +370,7 @@ public class DruidCoordinatorRuleRunnerTest
.withAvailableSegments(availableSegments)
.withDatabaseRuleManager(databaseRuleManager)
.withSegmentReplicantLookup(segmentReplicantLookup)
.withBalancerStrategyFactory(new CostBalancerStrategyFactory(1))
.withBalancerReferenceTimestamp(new DateTime("2013-01-01"))
.build();
@ -377,6 +383,7 @@ public class DruidCoordinatorRuleRunnerTest
Assert.assertTrue(stats.getPerTierStats().get("unassignedSize") == null);
EasyMock.verify(mockPeon);
params.getBalancerStrategyFactory().close();
}
@Test
@ -429,6 +436,7 @@ public class DruidCoordinatorRuleRunnerTest
.withAvailableSegments(availableSegments)
.withDatabaseRuleManager(databaseRuleManager)
.withSegmentReplicantLookup(SegmentReplicantLookup.make(new DruidCluster()))
.withBalancerStrategyFactory(new CostBalancerStrategyFactory(1))
.withBalancerReferenceTimestamp(new DateTime("2013-01-01"))
.build();
@ -436,6 +444,7 @@ public class DruidCoordinatorRuleRunnerTest
EasyMock.verify(emitter);
EasyMock.verify(mockPeon);
params.getBalancerStrategyFactory().close();
}
@Test
@ -548,6 +557,7 @@ public class DruidCoordinatorRuleRunnerTest
.withAvailableSegments(availableSegments)
.withDatabaseRuleManager(databaseRuleManager)
.withSegmentReplicantLookup(segmentReplicantLookup)
.withBalancerStrategyFactory(new CostBalancerStrategyFactory(1))
.withBalancerReferenceTimestamp(new DateTime("2013-01-01"))
.build();
@ -557,6 +567,7 @@ public class DruidCoordinatorRuleRunnerTest
Assert.assertTrue(stats.getGlobalStats().get("deletedCount").get() == 12);
EasyMock.verify(coordinator);
params.getBalancerStrategyFactory().close();
}
@Test
@ -625,6 +636,7 @@ public class DruidCoordinatorRuleRunnerTest
.withAvailableSegments(availableSegments)
.withDatabaseRuleManager(databaseRuleManager)
.withSegmentReplicantLookup(segmentReplicantLookup)
.withBalancerStrategyFactory(new CostBalancerStrategyFactory(1))
.withBalancerReferenceTimestamp(new DateTime("2013-01-01"))
.build();
@ -635,6 +647,7 @@ public class DruidCoordinatorRuleRunnerTest
Assert.assertTrue(stats.getGlobalStats().get("deletedCount").get() == 12);
EasyMock.verify(mockPeon);
params.getBalancerStrategyFactory().close();
}
@Test
@ -709,6 +722,7 @@ public class DruidCoordinatorRuleRunnerTest
.withAvailableSegments(availableSegments)
.withDatabaseRuleManager(databaseRuleManager)
.withSegmentReplicantLookup(segmentReplicantLookup)
.withBalancerStrategyFactory(new CostBalancerStrategyFactory(1))
.withBalancerReferenceTimestamp(new DateTime("2013-01-01"))
.build();
@ -719,6 +733,7 @@ public class DruidCoordinatorRuleRunnerTest
Assert.assertTrue(stats.getGlobalStats().get("deletedCount").get() == 12);
EasyMock.verify(mockPeon);
params.getBalancerStrategyFactory().close();
}
@Test
@ -789,6 +804,7 @@ public class DruidCoordinatorRuleRunnerTest
.withAvailableSegments(availableSegments)
.withDatabaseRuleManager(databaseRuleManager)
.withSegmentReplicantLookup(segmentReplicantLookup)
.withBalancerStrategyFactory(new CostBalancerStrategyFactory(1))
.withBalancerReferenceTimestamp(new DateTime("2013-01-01"))
.build();
@ -799,6 +815,7 @@ public class DruidCoordinatorRuleRunnerTest
Assert.assertTrue(stats.getGlobalStats().get("deletedCount").get() == 12);
EasyMock.verify(mockPeon);
params.getBalancerStrategyFactory().close();
}
@Test
@ -882,6 +899,7 @@ public class DruidCoordinatorRuleRunnerTest
.withAvailableSegments(availableSegments)
.withDatabaseRuleManager(databaseRuleManager)
.withSegmentReplicantLookup(segmentReplicantLookup)
.withBalancerStrategyFactory(new CostBalancerStrategyFactory(1))
.withBalancerReferenceTimestamp(new DateTime("2013-01-01"))
.build();
@ -892,6 +910,7 @@ public class DruidCoordinatorRuleRunnerTest
EasyMock.verify(mockPeon);
EasyMock.verify(anotherMockPeon);
params.getBalancerStrategyFactory().close();
}
/**
@ -955,6 +974,8 @@ public class DruidCoordinatorRuleRunnerTest
.withAvailableSegments(availableSegments)
.withDatabaseRuleManager(databaseRuleManager)
.withSegmentReplicantLookup(SegmentReplicantLookup.make(new DruidCluster()))
.withBalancerStrategyFactory(new CostBalancerStrategyFactory(1))
.withBalancerReferenceTimestamp(new DateTime("2013-01-01"))
.build();
DruidCoordinatorRuntimeParams afterParams = ruleRunner.run(params);
@ -975,6 +996,7 @@ public class DruidCoordinatorRuleRunnerTest
1,
0
);
afterParams.getBalancerStrategyFactory().close();
afterParams = ruleRunner.run(
new DruidCoordinatorRuntimeParams.Builder()
@ -982,6 +1004,8 @@ public class DruidCoordinatorRuleRunnerTest
.withEmitter(emitter)
.withAvailableSegments(Arrays.asList(overFlowSegment))
.withDatabaseRuleManager(databaseRuleManager)
.withBalancerStrategyFactory(new CostBalancerStrategyFactory(1))
.withBalancerReferenceTimestamp(new DateTime("2013-01-01"))
.withSegmentReplicantLookup(SegmentReplicantLookup.make(new DruidCluster()))
.build()
);
@ -992,6 +1016,7 @@ public class DruidCoordinatorRuleRunnerTest
Assert.assertTrue(stats.getPerTierStats().get("unassignedSize") == null);
EasyMock.verify(mockPeon);
afterParams.getBalancerStrategyFactory().close();
}
/**
@ -1073,6 +1098,8 @@ public class DruidCoordinatorRuleRunnerTest
.withDruidCluster(druidCluster)
.withAvailableSegments(availableSegments)
.withDatabaseRuleManager(databaseRuleManager)
.withBalancerStrategyFactory(new CostBalancerStrategyFactory(1))
.withBalancerReferenceTimestamp(new DateTime("2013-01-01"))
.withSegmentReplicantLookup(SegmentReplicantLookup.make(new DruidCluster()))
.build();
@ -1086,6 +1113,7 @@ public class DruidCoordinatorRuleRunnerTest
Assert.assertTrue(stats.getPerTierStats().get("unassignedSize") == null);
EasyMock.verify(mockPeon);
params.getBalancerStrategyFactory().close();
}
@Test
@ -1168,6 +1196,7 @@ public class DruidCoordinatorRuleRunnerTest
.withAvailableSegments(longerAvailableSegments)
.withDatabaseRuleManager(databaseRuleManager)
.withSegmentReplicantLookup(segmentReplicantLookup)
.withBalancerStrategyFactory(new CostBalancerStrategyFactory(1))
.withBalancerReferenceTimestamp(new DateTime("2013-01-01"))
.build();
@ -1176,6 +1205,7 @@ public class DruidCoordinatorRuleRunnerTest
Assert.assertTrue(stats.getPerTierStats().get("droppedCount").get("normal").get() == 24);
EasyMock.verify(mockPeon);
params.getBalancerStrategyFactory().close();
}
private void mockCoordinator()

View File

@ -33,6 +33,7 @@ import com.metamx.emitter.service.ServiceEmitter;
import io.druid.client.DruidServer;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.server.coordinator.CoordinatorStats;
import io.druid.server.coordinator.CostBalancerStrategyFactory;
import io.druid.server.coordinator.DruidCluster;
import io.druid.server.coordinator.DruidCoordinatorRuntimeParams;
import io.druid.server.coordinator.LoadPeonCallback;
@ -189,18 +190,22 @@ public class LoadRuleTest
)
);
CostBalancerStrategyFactory costBalancerStrategyFactory = new CostBalancerStrategyFactory(1);
CoordinatorStats stats = rule.run(
null,
DruidCoordinatorRuntimeParams.newBuilder()
.withDruidCluster(druidCluster)
.withSegmentReplicantLookup(SegmentReplicantLookup.make(new DruidCluster()))
.withSegmentReplicantLookup(SegmentReplicantLookup.make(druidCluster))
.withReplicationManager(throttler)
.withBalancerStrategyFactory(costBalancerStrategyFactory)
.withBalancerReferenceTimestamp(new DateTime("2013-01-01"))
.withAvailableSegments(Arrays.asList(segment)).build(),
segment
);
Assert.assertTrue(stats.getPerTierStats().get("assignedCount").get("hot").get() == 1);
Assert.assertTrue(stats.getPerTierStats().get("assignedCount").get(DruidServer.DEFAULT_TIER).get() == 2);
costBalancerStrategyFactory.close();
}
@Test
@ -291,19 +296,22 @@ public class LoadRuleTest
)
);
CostBalancerStrategyFactory costBalancerStrategyFactory = new CostBalancerStrategyFactory(1);
CoordinatorStats stats = rule.run(
null,
DruidCoordinatorRuntimeParams.newBuilder()
.withDruidCluster(druidCluster)
.withSegmentReplicantLookup(SegmentReplicantLookup.make(druidCluster))
.withReplicationManager(throttler)
.withBalancerStrategyFactory(costBalancerStrategyFactory)
.withBalancerReferenceTimestamp(new DateTime("2013-01-01"))
.withAvailableSegments(Arrays.asList(segment)).build(),
segment
);
Assert.assertTrue(stats.getPerTierStats().get("droppedCount").get("hot").get() == 1);
Assert.assertTrue(stats.getPerTierStats().get("droppedCount").get(DruidServer.DEFAULT_TIER).get() == 1);
costBalancerStrategyFactory.close();
}
@Test
@ -373,18 +381,21 @@ public class LoadRuleTest
)
)
);
CostBalancerStrategyFactory costBalancerStrategyFactory = new CostBalancerStrategyFactory(1);
CoordinatorStats stats = rule.run(
null,
DruidCoordinatorRuntimeParams.newBuilder()
.withDruidCluster(druidCluster)
.withSegmentReplicantLookup(SegmentReplicantLookup.make(new DruidCluster()))
.withReplicationManager(throttler)
.withBalancerStrategyFactory(costBalancerStrategyFactory)
.withBalancerReferenceTimestamp(new DateTime("2013-01-01"))
.withAvailableSegments(Arrays.asList(segment)).build(),
segment
);
Assert.assertTrue(stats.getPerTierStats().get("assignedCount").get("hot").get() == 1);
costBalancerStrategyFactory.close();
}
@Test
@ -470,6 +481,7 @@ public class LoadRuleTest
)
)
);
CostBalancerStrategyFactory costBalancerStrategyFactory = new CostBalancerStrategyFactory(1);
CoordinatorStats stats = rule.run(
null,
@ -477,10 +489,13 @@ public class LoadRuleTest
.withDruidCluster(druidCluster)
.withSegmentReplicantLookup(SegmentReplicantLookup.make(druidCluster))
.withReplicationManager(throttler)
.withBalancerStrategyFactory(costBalancerStrategyFactory)
.withBalancerReferenceTimestamp(new DateTime("2013-01-01"))
.withAvailableSegments(Arrays.asList(segment)).build(),
segment
);
Assert.assertTrue(stats.getPerTierStats().get("droppedCount").get("hot").get() == 1);
costBalancerStrategyFactory.close();
}
}