Do not remove segments from currentlyMovingSegments in DruidBalancer if move is impossible or not needed (#4472)

* Do not remove segment that should not be moved from currentlyMovingSegments (segments are removed by callbacks or not inserted)

* Replace putIfAbsent with computeIfAbsent in DruidBalancer

* Refactoring
This commit is contained in:
dgolitsyn 2017-08-08 17:22:59 +03:00 committed by Roman Leventov
parent bbe7fb8c46
commit 4dd1e2b59e
3 changed files with 281 additions and 324 deletions

View File

@ -20,12 +20,10 @@
package io.druid.server.coordinator.helper;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.MinMaxPriorityQueue;
import com.metamx.emitter.EmittingLogger;
import io.druid.client.ImmutableDruidServer;
import io.druid.java.util.common.StringUtils;
import io.druid.java.util.common.guava.Comparators;
import io.druid.server.coordinator.BalancerSegmentHolder;
import io.druid.server.coordinator.BalancerStrategy;
import io.druid.server.coordinator.CoordinatorStats;
@ -37,6 +35,7 @@ import io.druid.server.coordinator.ServerHolder;
import io.druid.timeline.DataSegment;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@ -45,21 +44,15 @@ import java.util.concurrent.ConcurrentHashMap;
*/
public class DruidCoordinatorBalancer implements DruidCoordinatorHelper
{
public static final Comparator<ServerHolder> percentUsedComparator = Comparators.inverse(
new Comparator<ServerHolder>()
{
@Override
public int compare(ServerHolder lhs, ServerHolder rhs)
{
return lhs.getPercentUsed().compareTo(rhs.getPercentUsed());
}
}
);
public static final Comparator<ServerHolder> percentUsedComparator =
Comparator.comparing(ServerHolder::getPercentUsed).reversed();
protected static final EmittingLogger log = new EmittingLogger(DruidCoordinatorBalancer.class);
protected final DruidCoordinator coordinator;
protected final Map<String, ConcurrentHashMap<String, BalancerSegmentHolder>> currentlyMovingSegments = Maps.newHashMap();
protected final Map<String, ConcurrentHashMap<String, BalancerSegmentHolder>> currentlyMovingSegments =
new HashMap<>();
public DruidCoordinatorBalancer(
DruidCoordinator coordinator
@ -85,74 +78,76 @@ public class DruidCoordinatorBalancer implements DruidCoordinatorHelper
public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
{
final CoordinatorStats stats = new CoordinatorStats();
params.getDruidCluster().getHistoricals().forEach((String tier, MinMaxPriorityQueue<ServerHolder> servers) -> {
balanceTier(params, tier, servers, stats);
});
return params.buildFromExisting().withCoordinatorStats(stats).build();
}
private void balanceTier(
DruidCoordinatorRuntimeParams params,
String tier,
MinMaxPriorityQueue<ServerHolder> servers,
CoordinatorStats stats
)
{
final BalancerStrategy strategy = params.getBalancerStrategy();
final int maxSegmentsToMove = params.getCoordinatorDynamicConfig().getMaxSegmentsToMove();
for (Map.Entry<String, MinMaxPriorityQueue<ServerHolder>> entry :
params.getDruidCluster().getHistoricals().entrySet()) {
String tier = entry.getKey();
if (currentlyMovingSegments.get(tier) == null) {
currentlyMovingSegments.put(tier, new ConcurrentHashMap<String, BalancerSegmentHolder>());
}
if (!currentlyMovingSegments.get(tier).isEmpty()) {
reduceLifetimes(tier);
log.info("[%s]: Still waiting on %,d segments to be moved", tier, currentlyMovingSegments.size());
continue;
}
final List<ServerHolder> serverHolderList = Lists.newArrayList(entry.getValue());
if (serverHolderList.size() <= 1) {
log.info("[%s]: One or fewer servers found. Cannot balance.", tier);
continue;
}
int numSegments = 0;
for (ServerHolder server : serverHolderList) {
numSegments += server.getServer().getSegments().size();
}
if (numSegments == 0) {
log.info("No segments found. Cannot balance.");
continue;
}
long unmoved = 0L;
for (int iter = 0; iter < maxSegmentsToMove; iter++) {
final BalancerSegmentHolder segmentToMove = strategy.pickSegmentToMove(serverHolderList);
if (segmentToMove != null && params.getAvailableSegments().contains(segmentToMove.getSegment())) {
final ServerHolder holder = strategy.findNewSegmentHomeBalancer(segmentToMove.getSegment(), serverHolderList);
if (holder != null) {
moveSegment(segmentToMove, holder.getServer(), params);
} else {
++unmoved;
}
}
}
if (unmoved == maxSegmentsToMove) {
// Cluster should be alive and constantly adjusting
log.info("No good moves found in tier [%s]", tier);
}
stats.addToTieredStat("unmovedCount", tier, unmoved);
stats.addToTieredStat("movedCount", tier, currentlyMovingSegments.get(tier).size());
if (params.getCoordinatorDynamicConfig().emitBalancingStats()) {
strategy.emitStats(tier, stats, serverHolderList);
}
log.info(
"[%s]: Segments Moved: [%d] Segments Let Alone: [%d]",
tier,
currentlyMovingSegments.get(tier).size(),
unmoved
);
currentlyMovingSegments.computeIfAbsent(tier, t -> new ConcurrentHashMap<>());
if (!currentlyMovingSegments.get(tier).isEmpty()) {
reduceLifetimes(tier);
log.info("[%s]: Still waiting on %,d segments to be moved", tier, currentlyMovingSegments.size());
return;
}
return params.buildFromExisting()
.withCoordinatorStats(stats)
.build();
final List<ServerHolder> serverHolderList = Lists.newArrayList(servers);
if (serverHolderList.size() <= 1) {
log.info("[%s]: One or fewer servers found. Cannot balance.", tier);
return;
}
int numSegments = 0;
for (ServerHolder server : serverHolderList) {
numSegments += server.getServer().getSegments().size();
}
if (numSegments == 0) {
log.info("No segments found. Cannot balance.");
return;
}
long unmoved = 0L;
for (int iter = 0; iter < maxSegmentsToMove; iter++) {
final BalancerSegmentHolder segmentToMove = strategy.pickSegmentToMove(serverHolderList);
if (segmentToMove != null && params.getAvailableSegments().contains(segmentToMove.getSegment())) {
final ServerHolder holder = strategy.findNewSegmentHomeBalancer(segmentToMove.getSegment(), serverHolderList);
if (holder != null) {
moveSegment(segmentToMove, holder.getServer(), params);
} else {
++unmoved;
}
}
}
if (unmoved == maxSegmentsToMove) {
// Cluster should be alive and constantly adjusting
log.info("No good moves found in tier [%s]", tier);
}
stats.addToTieredStat("unmovedCount", tier, unmoved);
stats.addToTieredStat("movedCount", tier, currentlyMovingSegments.get(tier).size());
if (params.getCoordinatorDynamicConfig().emitBalancingStats()) {
strategy.emitStats(tier, stats, serverHolderList);
}
log.info(
"[%s]: Segments Moved: [%d] Segments Let Alone: [%d]",
tier,
currentlyMovingSegments.get(tier).size(),
unmoved
);
}
protected void moveSegment(
@ -174,18 +169,9 @@ public class DruidCoordinatorBalancer implements DruidCoordinatorHelper
LoadPeonCallback callback = null;
try {
currentlyMovingSegments.get(toServer.getTier()).put(segmentName, segment);
callback = new LoadPeonCallback()
{
@Override
public void execute()
{
Map<String, BalancerSegmentHolder> movingSegments = currentlyMovingSegments.get(toServer.getTier());
if (movingSegments != null) {
movingSegments.remove(segmentName);
}
}
};
Map<String, BalancerSegmentHolder> movingSegments = currentlyMovingSegments.get(toServer.getTier());
movingSegments.put(segmentName, segment);
callback = () -> movingSegments.remove(segmentName);
coordinator.moveSegment(
fromServer,
toServer,
@ -199,9 +185,6 @@ public class DruidCoordinatorBalancer implements DruidCoordinatorHelper
callback.execute();
}
}
} else {
currentlyMovingSegments.get(toServer.getTier()).remove(segmentName);
}
}
}

View File

@ -19,6 +19,7 @@
package io.druid.server.coordinator;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@ -37,10 +38,14 @@ import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
/**
*/
@ -52,11 +57,19 @@ public class DruidCoordinatorBalancerTest
private ImmutableDruidServer druidServer2;
private ImmutableDruidServer druidServer3;
private ImmutableDruidServer druidServer4;
private List<ImmutableDruidServer> druidServers;
private LoadQueuePeonTester peon1;
private LoadQueuePeonTester peon2;
private LoadQueuePeonTester peon3;
private LoadQueuePeonTester peon4;
private List<LoadQueuePeon> peons;
private DataSegment segment1;
private DataSegment segment2;
private DataSegment segment3;
private DataSegment segment4;
Map<String, DataSegment> segments;
private Map<String, DataSegment> segments;
private ListeningExecutorService balancerStrategyExecutor;
private BalancerStrategy balancerStrategy;
@Before
public void setUp() throws Exception
@ -78,9 +91,9 @@ public class DruidCoordinatorBalancerTest
"datasource1",
new Interval(start1, start1.plusHours(1)),
version.toString(),
Maps.<String, Object>newHashMap(),
Lists.<String>newArrayList(),
Lists.<String>newArrayList(),
Maps.newHashMap(),
Lists.newArrayList(),
Lists.newArrayList(),
NoneShardSpec.instance(),
0,
11L
@ -89,9 +102,9 @@ public class DruidCoordinatorBalancerTest
"datasource1",
new Interval(start2, start2.plusHours(1)),
version.toString(),
Maps.<String, Object>newHashMap(),
Lists.<String>newArrayList(),
Lists.<String>newArrayList(),
Maps.newHashMap(),
Lists.newArrayList(),
Lists.newArrayList(),
NoneShardSpec.instance(),
0,
7L
@ -100,9 +113,9 @@ public class DruidCoordinatorBalancerTest
"datasource2",
new Interval(start1, start1.plusHours(1)),
version.toString(),
Maps.<String, Object>newHashMap(),
Lists.<String>newArrayList(),
Lists.<String>newArrayList(),
Maps.newHashMap(),
Lists.newArrayList(),
Lists.newArrayList(),
NoneShardSpec.instance(),
0,
4L
@ -111,19 +124,30 @@ public class DruidCoordinatorBalancerTest
"datasource2",
new Interval(start2, start2.plusHours(1)),
version.toString(),
Maps.<String, Object>newHashMap(),
Lists.<String>newArrayList(),
Lists.<String>newArrayList(),
Maps.newHashMap(),
Lists.newArrayList(),
Lists.newArrayList(),
NoneShardSpec.instance(),
0,
8L
);
segments = new HashMap<String, DataSegment>();
segments = new HashMap<>();
segments.put("datasource1_2012-01-01T00:00:00.000Z_2012-01-01T01:00:00.000Z_2012-03-01T00:00:00.000Z", segment1);
segments.put("datasource1_2012-02-01T00:00:00.000Z_2012-02-01T01:00:00.000Z_2012-03-01T00:00:00.000Z", segment2);
segments.put("datasource2_2012-01-01T00:00:00.000Z_2012-01-01T01:00:00.000Z_2012-03-01T00:00:00.000Z", segment3);
segments.put("datasource2_2012-02-01T00:00:00.000Z_2012-02-01T01:00:00.000Z_2012-03-01T00:00:00.000Z", segment4);
peon1 = new LoadQueuePeonTester();
peon2 = new LoadQueuePeonTester();
peon3 = new LoadQueuePeonTester();
peon4 = new LoadQueuePeonTester();
druidServers = ImmutableList.of(druidServer1, druidServer2, druidServer3, druidServer4);
peons = ImmutableList.of(peon1, peon2, peon3, peon4);
balancerStrategyExecutor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1));
balancerStrategy = new CostBalancerStrategyFactory().createBalancerStrategy(balancerStrategyExecutor);
}
@After
@ -134,166 +158,87 @@ public class DruidCoordinatorBalancerTest
EasyMock.verify(druidServer2);
EasyMock.verify(druidServer3);
EasyMock.verify(druidServer4);
balancerStrategyExecutor.shutdownNow();
}
@Test
public void testMoveToEmptyServerBalancer() throws IOException
{
EasyMock.expect(druidServer1.getName()).andReturn("from").atLeastOnce();
EasyMock.expect(druidServer1.getCurrSize()).andReturn(30L).atLeastOnce();
EasyMock.expect(druidServer1.getMaxSize()).andReturn(100L).atLeastOnce();
EasyMock.expect(druidServer1.getSegments()).andReturn(segments).anyTimes();
EasyMock.expect(druidServer1.getSegment(EasyMock.<String>anyObject())).andReturn(null).anyTimes();
EasyMock.replay(druidServer1);
EasyMock.expect(druidServer2.getName()).andReturn("to").atLeastOnce();
EasyMock.expect(druidServer2.getTier()).andReturn("normal").anyTimes();
EasyMock.expect(druidServer2.getCurrSize()).andReturn(0L).atLeastOnce();
EasyMock.expect(druidServer2.getMaxSize()).andReturn(100L).atLeastOnce();
EasyMock.expect(druidServer2.getSegments()).andReturn(new HashMap<String, DataSegment>()).anyTimes();
EasyMock.expect(druidServer2.getSegment(EasyMock.<String>anyObject())).andReturn(null).anyTimes();
EasyMock.replay(druidServer2);
mockDruidServer(druidServer1, "1", "normal", 30L, 100L, segments);
mockDruidServer(druidServer2, "2", "normal", 0L, 100L, Collections.emptyMap());
EasyMock.replay(druidServer3);
EasyMock.replay(druidServer4);
// Mock stuff that the coordinator needs
coordinator.moveSegment(
EasyMock.<ImmutableDruidServer>anyObject(),
EasyMock.<ImmutableDruidServer>anyObject(),
EasyMock.<DataSegment>anyObject(),
EasyMock.<LoadPeonCallback>anyObject()
);
EasyMock.expectLastCall().anyTimes();
EasyMock.replay(coordinator);
mockCoordinator(coordinator);
LoadQueuePeonTester fromPeon = new LoadQueuePeonTester();
LoadQueuePeonTester toPeon = new LoadQueuePeonTester();
ListeningExecutorService exec = MoreExecutors.listeningDecorator(
Executors.newFixedThreadPool(1));
BalancerStrategy balancerStrategy =
new CostBalancerStrategyFactory().createBalancerStrategy(exec);
DruidCoordinatorRuntimeParams params =
DruidCoordinatorRuntimeParams.newBuilder()
.withDruidCluster(
new DruidCluster(
null,
ImmutableMap.<String, MinMaxPriorityQueue<ServerHolder>>of(
"normal",
MinMaxPriorityQueue.orderedBy(DruidCoordinatorBalancerTester.percentUsedComparator)
.create(
Arrays.asList(
new ServerHolder(druidServer1, fromPeon),
new ServerHolder(druidServer2, toPeon)
)
)
)
)
)
.withLoadManagementPeons(
ImmutableMap.<String, LoadQueuePeon>of(
"from",
fromPeon,
"to",
toPeon
)
)
.withAvailableSegments(segments.values())
.withDynamicConfigs(
new CoordinatorDynamicConfig.Builder().withMaxSegmentsToMove(
MAX_SEGMENTS_TO_MOVE
).build()
)
.withBalancerStrategy(balancerStrategy)
.withBalancerReferenceTimestamp(new DateTime("2013-01-01"))
.build();
DruidCoordinatorRuntimeParams params = defaullRuntimeParamsBuilder(
ImmutableList.of(druidServer1, druidServer2),
ImmutableList.of(peon1, peon2)
).build();
params = new DruidCoordinatorBalancerTester(coordinator).run(params);
Assert.assertTrue(params.getCoordinatorStats().getTieredStat("movedCount", "normal") > 0);
Assert.assertTrue(params.getCoordinatorStats().getTieredStat("movedCount", "normal") < segments.size());
exec.shutdown();
}
@Test
public void testMoveSameSegmentTwice() throws Exception
{
mockDruidServer(druidServer1, "1", "normal", 30L, 100L, segments);
mockDruidServer(druidServer2, "2", "normal", 0L, 100L, Collections.emptyMap());
EasyMock.replay(druidServer3);
EasyMock.replay(druidServer4);
// Mock stuff that the coordinator needs
mockCoordinator(coordinator);
BalancerStrategy predefinedPickOrderStrategy = new PredefinedPickOrderBalancerStrategy(
balancerStrategy,
ImmutableList.of(
new BalancerSegmentHolder(druidServer1, segment1)
)
);
DruidCoordinatorRuntimeParams params = defaullRuntimeParamsBuilder(
ImmutableList.of(druidServer1, druidServer2),
ImmutableList.of(peon1, peon2)
)
.withBalancerStrategy(predefinedPickOrderStrategy)
.withDynamicConfigs(
new CoordinatorDynamicConfig.Builder().withMaxSegmentsToMove(
2
).build()
)
.build();
params = new DruidCoordinatorBalancerTester(coordinator).run(params);
Assert.assertEquals(1, params.getCoordinatorStats().getTieredStat("movedCount", "normal"));
}
@Test
public void testRun1() throws IOException
{
// Mock some servers of different usages
EasyMock.expect(druidServer1.getName()).andReturn("from").atLeastOnce();
EasyMock.expect(druidServer1.getCurrSize()).andReturn(30L).atLeastOnce();
EasyMock.expect(druidServer1.getMaxSize()).andReturn(100L).atLeastOnce();
EasyMock.expect(druidServer1.getSegments()).andReturn(segments).anyTimes();
EasyMock.expect(druidServer1.getSegment(EasyMock.<String>anyObject())).andReturn(null).anyTimes();
EasyMock.replay(druidServer1);
EasyMock.expect(druidServer2.getName()).andReturn("to").atLeastOnce();
EasyMock.expect(druidServer2.getTier()).andReturn("normal").anyTimes();
EasyMock.expect(druidServer2.getCurrSize()).andReturn(0L).atLeastOnce();
EasyMock.expect(druidServer2.getMaxSize()).andReturn(100L).atLeastOnce();
EasyMock.expect(druidServer2.getSegments()).andReturn(new HashMap<String, DataSegment>()).anyTimes();
EasyMock.expect(druidServer2.getSegment(EasyMock.<String>anyObject())).andReturn(null).anyTimes();
EasyMock.replay(druidServer2);
mockDruidServer(druidServer1, "1", "normal", 30L, 100L, segments);
mockDruidServer(druidServer2, "2", "normal", 0L, 100L, Collections.emptyMap());
EasyMock.replay(druidServer3);
EasyMock.replay(druidServer4);
// Mock stuff that the coordinator needs
coordinator.moveSegment(
EasyMock.<ImmutableDruidServer>anyObject(),
EasyMock.<ImmutableDruidServer>anyObject(),
EasyMock.<DataSegment>anyObject(),
EasyMock.<LoadPeonCallback>anyObject()
);
EasyMock.expectLastCall().anyTimes();
EasyMock.replay(coordinator);
mockCoordinator(coordinator);
ListeningExecutorService exec = MoreExecutors.listeningDecorator(
Executors.newFixedThreadPool(1));
BalancerStrategy balancerStrategy =
new CostBalancerStrategyFactory().createBalancerStrategy(exec);
LoadQueuePeonTester fromPeon = new LoadQueuePeonTester();
LoadQueuePeonTester toPeon = new LoadQueuePeonTester();
DruidCoordinatorRuntimeParams params =
DruidCoordinatorRuntimeParams.newBuilder()
.withDruidCluster(
new DruidCluster(
null,
ImmutableMap.<String, MinMaxPriorityQueue<ServerHolder>>of(
"normal",
MinMaxPriorityQueue.orderedBy(DruidCoordinatorBalancerTester.percentUsedComparator)
.create(
Arrays.asList(
new ServerHolder(druidServer1, fromPeon),
new ServerHolder(druidServer2, toPeon)
)
)
)
)
)
.withLoadManagementPeons(
ImmutableMap.<String, LoadQueuePeon>of(
"from",
fromPeon,
"to",
toPeon
)
)
.withAvailableSegments(segments.values())
.withDynamicConfigs(
new CoordinatorDynamicConfig.Builder().withMaxSegmentsToMove(MAX_SEGMENTS_TO_MOVE)
.build()
)
.withBalancerStrategy(balancerStrategy)
.withBalancerReferenceTimestamp(new DateTime("2013-01-01"))
.build();
DruidCoordinatorRuntimeParams params = defaullRuntimeParamsBuilder(
ImmutableList.of(druidServer1, druidServer2),
ImmutableList.of(peon1, peon2)
).build();
params = new DruidCoordinatorBalancerTester(coordinator).run(params);
Assert.assertTrue(params.getCoordinatorStats().getTieredStat("movedCount", "normal") > 0);
exec.shutdown();
}
@ -301,101 +246,132 @@ public class DruidCoordinatorBalancerTest
public void testRun2() throws IOException
{
// Mock some servers of different usages
EasyMock.expect(druidServer1.getName()).andReturn("1").atLeastOnce();
EasyMock.expect(druidServer1.getCurrSize()).andReturn(30L).atLeastOnce();
EasyMock.expect(druidServer1.getMaxSize()).andReturn(100L).atLeastOnce();
EasyMock.expect(druidServer1.getSegments()).andReturn(segments).anyTimes();
EasyMock.expect(druidServer1.getSegment(EasyMock.<String>anyObject())).andReturn(null).anyTimes();
EasyMock.replay(druidServer1);
EasyMock.expect(druidServer2.getName()).andReturn("2").atLeastOnce();
EasyMock.expect(druidServer2.getTier()).andReturn("normal").anyTimes();
EasyMock.expect(druidServer2.getCurrSize()).andReturn(0L).atLeastOnce();
EasyMock.expect(druidServer2.getMaxSize()).andReturn(100L).atLeastOnce();
EasyMock.expect(druidServer2.getSegments()).andReturn(new HashMap<String, DataSegment>()).anyTimes();
EasyMock.expect(druidServer2.getSegment(EasyMock.<String>anyObject())).andReturn(null).anyTimes();
EasyMock.replay(druidServer2);
EasyMock.expect(druidServer3.getName()).andReturn("3").atLeastOnce();
EasyMock.expect(druidServer3.getTier()).andReturn("normal").anyTimes();
EasyMock.expect(druidServer3.getCurrSize()).andReturn(0L).atLeastOnce();
EasyMock.expect(druidServer3.getMaxSize()).andReturn(100L).atLeastOnce();
EasyMock.expect(druidServer3.getSegments()).andReturn(new HashMap<String, DataSegment>()).anyTimes();
EasyMock.expect(druidServer3.getSegment(EasyMock.<String>anyObject())).andReturn(null).anyTimes();
EasyMock.replay(druidServer3);
EasyMock.expect(druidServer4.getName()).andReturn("4").atLeastOnce();
EasyMock.expect(druidServer4.getTier()).andReturn("normal").anyTimes();
EasyMock.expect(druidServer4.getCurrSize()).andReturn(0L).atLeastOnce();
EasyMock.expect(druidServer4.getMaxSize()).andReturn(100L).atLeastOnce();
EasyMock.expect(druidServer4.getSegments()).andReturn(new HashMap<String, DataSegment>()).anyTimes();
EasyMock.expect(druidServer4.getSegment(EasyMock.<String>anyObject())).andReturn(null).anyTimes();
EasyMock.replay(druidServer4);
mockDruidServer(druidServer1, "1", "normal", 30L, 100L, segments);
mockDruidServer(druidServer2, "2", "normal", 0L, 100L, Collections.emptyMap());
mockDruidServer(druidServer3, "3", "normal", 0L, 100L, Collections.emptyMap());
mockDruidServer(druidServer4, "4", "normal", 0L, 100L, Collections.emptyMap());
// Mock stuff that the coordinator needs
coordinator.moveSegment(
EasyMock.<ImmutableDruidServer>anyObject(),
EasyMock.<ImmutableDruidServer>anyObject(),
EasyMock.<DataSegment>anyObject(),
EasyMock.<LoadPeonCallback>anyObject()
);
EasyMock.expectLastCall().anyTimes();
EasyMock.replay(coordinator);
mockCoordinator(coordinator);
LoadQueuePeonTester peon1 = new LoadQueuePeonTester();
LoadQueuePeonTester peon2 = new LoadQueuePeonTester();
LoadQueuePeonTester peon3 = new LoadQueuePeonTester();
LoadQueuePeonTester peon4 = new LoadQueuePeonTester();
ListeningExecutorService exec = MoreExecutors.listeningDecorator(
Executors.newFixedThreadPool(1));
BalancerStrategy balancerStrategy =
new CostBalancerStrategyFactory().createBalancerStrategy(exec);
DruidCoordinatorRuntimeParams params =
DruidCoordinatorRuntimeParams.newBuilder()
.withDruidCluster(
new DruidCluster(
null,
ImmutableMap.<String, MinMaxPriorityQueue<ServerHolder>>of(
"normal",
MinMaxPriorityQueue.orderedBy(DruidCoordinatorBalancerTester.percentUsedComparator)
.create(
Arrays.asList(
new ServerHolder(druidServer1, peon1),
new ServerHolder(druidServer2, peon2),
new ServerHolder(druidServer3, peon3),
new ServerHolder(druidServer4, peon4)
)
)
)
)
)
.withLoadManagementPeons(
ImmutableMap.<String, LoadQueuePeon>of(
"1",
peon1,
"2",
peon2,
"3",
peon3,
"4",
peon4
)
)
.withAvailableSegments(segments.values())
.withDynamicConfigs(
new CoordinatorDynamicConfig.Builder().withMaxSegmentsToMove(
MAX_SEGMENTS_TO_MOVE
).build()
)
.withBalancerStrategy(balancerStrategy)
.withBalancerReferenceTimestamp(new DateTime("2013-01-01"))
.build();
DruidCoordinatorRuntimeParams params = defaullRuntimeParamsBuilder(druidServers, peons).build();
params = new DruidCoordinatorBalancerTester(coordinator).run(params);
Assert.assertTrue(params.getCoordinatorStats().getTieredStat("movedCount", "normal") > 0);
exec.shutdown();
}
private DruidCoordinatorRuntimeParams.Builder defaullRuntimeParamsBuilder(
List<ImmutableDruidServer> druidServers,
List<LoadQueuePeon> peons
)
{
return DruidCoordinatorRuntimeParams
.newBuilder()
.withDruidCluster(
new DruidCluster(
null,
ImmutableMap.of(
"normal",
MinMaxPriorityQueue.orderedBy(DruidCoordinatorBalancerTester.percentUsedComparator)
.create(
IntStream
.range(0, druidServers.size())
.mapToObj(i -> new ServerHolder(druidServers.get(i), peons.get(i)))
.collect(Collectors.toList())
)
)
)
)
.withLoadManagementPeons(
IntStream
.range(0, peons.size())
.boxed()
.collect(Collectors.toMap(i -> String.valueOf(i + 1), peons::get))
)
.withAvailableSegments(segments.values())
.withDynamicConfigs(
new CoordinatorDynamicConfig.Builder().withMaxSegmentsToMove(
MAX_SEGMENTS_TO_MOVE
).build()
)
.withBalancerStrategy(balancerStrategy)
.withBalancerReferenceTimestamp(new DateTime("2013-01-01"));
}
private void mockDruidServer(
ImmutableDruidServer druidServer,
String name,
String tier,
long currentSize,
long maxSize,
Map<String, DataSegment> segments
)
{
EasyMock.expect(druidServer.getName()).andReturn(name).atLeastOnce();
EasyMock.expect(druidServer.getTier()).andReturn(tier).anyTimes();
EasyMock.expect(druidServer.getCurrSize()).andReturn(currentSize).atLeastOnce();
EasyMock.expect(druidServer.getMaxSize()).andReturn(maxSize).atLeastOnce();
EasyMock.expect(druidServer.getSegments()).andReturn(segments).anyTimes();
EasyMock.expect(druidServer.getSegment(EasyMock.anyObject())).andReturn(null).anyTimes();
EasyMock.replay(druidServer);
}
private void mockCoordinator(DruidCoordinator coordinator)
{
coordinator.moveSegment(
EasyMock.anyObject(),
EasyMock.anyObject(),
EasyMock.anyObject(),
EasyMock.anyObject()
);
EasyMock.expectLastCall().anyTimes();
EasyMock.replay(coordinator);
}
private static class PredefinedPickOrderBalancerStrategy implements BalancerStrategy
{
private final BalancerStrategy delegate;
private final List<BalancerSegmentHolder> pickOrder;
private final AtomicInteger pickCounter = new AtomicInteger(0);
public PredefinedPickOrderBalancerStrategy(
BalancerStrategy delegate,
List<BalancerSegmentHolder> pickOrder
)
{
this.delegate = delegate;
this.pickOrder = pickOrder;
}
@Override
public ServerHolder findNewSegmentHomeBalancer(
DataSegment proposalSegment, List<ServerHolder> serverHolders
)
{
return delegate.findNewSegmentHomeBalancer(proposalSegment, serverHolders);
}
@Override
public ServerHolder findNewSegmentHomeReplicator(
DataSegment proposalSegment, List<ServerHolder> serverHolders
)
{
return delegate.findNewSegmentHomeReplicator(proposalSegment, serverHolders);
}
@Override
public BalancerSegmentHolder pickSegmentToMove(List<ServerHolder> serverHolders)
{
return pickOrder.get(pickCounter.getAndIncrement() % pickOrder.size());
}
@Override
public void emitStats(
String tier, CoordinatorStats stats, List<ServerHolder> serverHolderList
)
{
delegate.emitStats(tier, stats, serverHolderList);
}
}
}

View File

@ -70,8 +70,6 @@ public class DruidCoordinatorBalancerTester extends DruidCoordinatorBalancer
catch (Exception e) {
log.info(e, StringUtils.format("[%s] : Moving exception", segmentName));
}
} else {
currentlyMovingSegments.get("normal").remove(segmentName);
}
}
}