mirror of https://github.com/apache/druid.git

initial cost analyzer commit
parent 5137af0750
commit 366216f9f3

BalancerCostAnalyzer.java (new file)
@@ -0,0 +1,229 @@
/*
 * Druid - a distributed column store.
 * Copyright (C) 2012 Metamarkets Group Inc.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
 */

package com.metamx.druid.master;

import com.google.common.collect.Lists;
import com.google.common.collect.MinMaxPriorityQueue;
import com.google.common.collect.Sets;
import com.metamx.common.Pair;
import com.metamx.common.logger.Logger;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.client.DruidDataSource;
import com.metamx.druid.client.DruidServer;
import org.joda.time.Interval;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.Set;
/**
 * The BalancerCostAnalyzer proposes segment moves using a simple joint-placement cost:
 * co-locating segments that belong to the same data source, or whose intervals overlap or
 * lie within a day of each other, is treated as expensive, and candidate moves are scored
 * by how much total cost they add on the destination server.
 */
public class BalancerCostAnalyzer
{
  private static final Logger log = new Logger(BalancerCostAnalyzer.class);
  private static final int MAX_SEGMENTS_TO_MOVE = 5;
  private static final int DAY_IN_MILLIS = 1000 * 60 * 60 * 24;

  private float initialTotalCost;
  private float totalCostChange;

  private List<ServerHolder> serverHolderList;
  private Random rand;

  public BalancerCostAnalyzer()
  {
    rand = new Random();
    totalCostChange = 0f;
  }

  public void init(List<ServerHolder> serverHolderList)
  {
    this.initialTotalCost = calculateInitialTotalCost(serverHolderList);
    this.serverHolderList = serverHolderList;
  }

  public float getInitialTotalCost()
  {
    return initialTotalCost;
  }

  public float getTotalCostChange()
  {
    return totalCostChange;
  }

  public float calculateInitialTotalCost(List<ServerHolder> serverHolderList)
  {
    // Accumulate in a float: computeJointSegmentCosts returns float, so an int would truncate.
    float cost = 0f;
    for (ServerHolder server : serverHolderList) {
      DataSegment[] segments = server.getServer().getSegments().values().toArray(new DataSegment[]{});
      for (int i = 0; i < segments.length; ++i) {
        for (int j = i; j < segments.length; ++j) {
          cost += computeJointSegmentCosts(segments[i], segments[j]);
        }
      }
    }
    return cost;
  }

  public float calculateTotalCostChange(List<ServerHolder> serverHolderList, Set<BalancerSegmentHolder2> segmentHoldersToMove)
  {
    // Accumulate in a float: computeJointSegmentCosts returns float, so an int would truncate.
    float cost = 0f;
    Iterator it = segmentHoldersToMove.iterator();
    while (it.hasNext()) {
      BalancerSegmentHolder2 segmentHolder = (BalancerSegmentHolder2) it.next();
      for (DataSegment fromSegment : segmentHolder.getFromServer().getSegments().values()) {
        cost -= computeJointSegmentCosts(segmentHolder.getSegment(), fromSegment);
      }
      for (DataSegment toSegment : segmentHolder.getToServer().getSegments().values()) {
        cost += computeJointSegmentCosts(segmentHolder.getSegment(), toSegment);
      }
      // Note: this returns after scoring only the first pending move; the loop below is
      // reached only when there are no pending moves at all.
      return cost;
    }

    for (ServerHolder server : serverHolderList) {
      DataSegment[] segments = server.getServer().getSegments().values().toArray(new DataSegment[]{});
      for (int i = 0; i < segments.length; ++i) {
        for (int j = i; j < segments.length; ++j) {
          cost += computeJointSegmentCosts(segments[i], segments[j]);
        }
      }
    }
    return cost;
  }

  public float computeJointSegmentCosts(DataSegment segment1, DataSegment segment2)
  {
    float cost = 0f;
    Interval gap = segment1.getInterval().gap(segment2.getInterval());

    // gap is null if the two segment intervals overlap or if they're adjacent
    if (gap == null) {
      cost += 1f;
    } else {
      long gapMillis = gap.toDurationMillis();
      if (gapMillis < DAY_IN_MILLIS) cost += 1f;
    }

    if (segment1.getDataSource().equals(segment2.getDataSource())) cost += 1f;

    return cost;
  }
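
  // A rough worked example of the cost heuristic above (the numbers follow directly from the
  // rules in computeJointSegmentCosts; the scenarios themselves are only illustrative):
  //   - same data source, overlapping or near-adjacent intervals:   1 (proximity) + 1 (data source) = 2
  //   - same data source, intervals more than a day apart:          0 + 1 = 1
  //   - different data sources, intervals more than a day apart:    0 + 0 = 0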

  public Set<BalancerSegmentHolder2> findSegmentsToMove()
  {
    Set<BalancerSegmentHolder2> segmentHoldersToMove = Sets.newHashSet();
    Set<DataSegment> movingSegments = Sets.newHashSet();

    int counter = 0;

    while (segmentHoldersToMove.size() < MAX_SEGMENTS_TO_MOVE && counter < 2 * MAX_SEGMENTS_TO_MOVE) {
      counter++;

      // Pick a random segment from a random server as the move candidate.
      ServerHolder fromServerHolder = serverHolderList.get(rand.nextInt(serverHolderList.size()));
      List<DataSegment> segments = Lists.newArrayList(fromServerHolder.getServer().getSegments().values());
      if (segments.size() == 0) continue;
      DataSegment proposalSegment = segments.get(rand.nextInt(segments.size()));
      if (movingSegments.contains(proposalSegment)) continue;

      // Just need a regular priority queue for the min. element.
      MinMaxPriorityQueue<Pair<Float, ServerHolder>> pQueue = MinMaxPriorityQueue.orderedBy(
          new Comparator<Pair<Float, ServerHolder>>()
          {
            @Override
            public int compare(
                Pair<Float, ServerHolder> o,
                Pair<Float, ServerHolder> o1
            )
            {
              return Float.compare(o.lhs, o1.lhs);
            }
          }
      ).create();

      // Score every server as a potential destination for the candidate segment.
      for (ServerHolder server : serverHolderList) {
        float cost = 0f;
        for (DataSegment segment : server.getServer().getSegments().values()) {
          cost += computeJointSegmentCosts(proposalSegment, segment);
        }

        // Take into account costs of segments that will be moved.
        Iterator it = segmentHoldersToMove.iterator();
        while (it.hasNext()) {
          BalancerSegmentHolder2 segmentToMove = (BalancerSegmentHolder2) it.next();
          if (server.getServer().equals(segmentToMove.getToServer())) {
            cost += computeJointSegmentCosts(proposalSegment, segmentToMove.getSegment());
          }
          if (server.getServer().equals(segmentToMove.getFromServer())) {
            cost -= computeJointSegmentCosts(proposalSegment, segmentToMove.getSegment());
          }
        }

        pQueue.add(Pair.of(cost, server));
      }

      // Move the candidate to the cheapest server, unless that is the server it already lives on.
      Pair<Float, ServerHolder> minPair = pQueue.peekFirst();
      if (!minPair.rhs.equals(fromServerHolder)) {
        movingSegments.add(proposalSegment);
        segmentHoldersToMove.add(
            new BalancerSegmentHolder2(
                fromServerHolder.getServer(),
                minPair.rhs.getServer(),
                proposalSegment
            )
        );
        totalCostChange += minPair.lhs;
      }
    }

    totalCostChange = calculateTotalCostChange(serverHolderList, segmentHoldersToMove);
    return segmentHoldersToMove;
    /*
    double currPercentDiff = getPercentDiff();

    if (currPercentDiff < PERCENT_THRESHOLD) {
      log.info("Cluster usage is balanced.");
      return segmentsToMove;
    }

    List<DruidDataSource> dataSources = Lists.newArrayList(server.getDataSources());
    Collections.shuffle(dataSources);

    for (DruidDataSource dataSource : dataSources) {
      List<DataSegment> segments = Lists.newArrayList(dataSource.getSegments());
      Collections.shuffle(segments);

      for (DataSegment segment : segments) {
        if (segmentsToMove.size() >= MAX_SEGMENTS_TO_MOVE) {
          return segmentsToMove;
        }

        if (getLookaheadPercentDiff(highestSizeUsed - segment.getSize(), lowestSizeUsed + segment.getSize())
            < currPercentDiff) {
          segmentsToMove.add(new BalancerSegmentHolder(server, segment));
          update(highestSizeUsed - segment.getSize(), lowestSizeUsed + segment.getSize());
        }
      }
    }

    return segmentsToMove;
    */
  }
}
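
For orientation, this is how the DruidMasterBalancer changes later in this commit drive the
analyzer, condensed into one place (the analyzer is actually constructed in DruidMaster and
injected into DruidMasterBalancer; serverHolderList, params, stats and tier are the variables
used in that hunk, nothing here is additional API):

    BalancerCostAnalyzer analyzer = new BalancerCostAnalyzer();  // constructed in DruidMaster
    analyzer.init(serverHolderList);
    log.info("Initial Total Cost: [%s]", analyzer.getInitialTotalCost());
    moveSegments(analyzer.findSegmentsToMove(), params);
    stats.addToTieredStat("costChange", tier, (long) analyzer.getTotalCostChange());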

BalancerSegmentHolder2.java (new file)
@@ -0,0 +1,70 @@
/*
 * Druid - a distributed column store.
 * Copyright (C) 2012 Metamarkets Group Inc.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
 */

package com.metamx.druid.master;

import com.metamx.druid.client.DataSegment;
import com.metamx.druid.client.DruidServer;

/**
 * Holds a segment selected for a balancing move, together with its source and destination
 * servers and a countdown lifetime used to flag moves that never complete.
 */
public class BalancerSegmentHolder2
{
  private final DruidServer fromServer;
  private final DruidServer toServer;
  private final DataSegment segment;

  private volatile int lifetime = 15;

  public BalancerSegmentHolder2(
      DruidServer fromServer,
      DruidServer toServer,
      DataSegment segment
  )
  {
    this.fromServer = fromServer;
    this.toServer = toServer;
    this.segment = segment;
  }

  public DruidServer getFromServer()
  {
    return fromServer;
  }

  public DruidServer getToServer()
  {
    return toServer;
  }

  public DataSegment getSegment()
  {
    return segment;
  }

  public int getLifetime()
  {
    return lifetime;
  }

  public void reduceLifetime()
  {
    lifetime--;
  }
}
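
For context, the lifetime field feeds the existing reduceLifetimes logic in DruidMasterBalancer,
which this commit only retypes from BalancerSegmentHolder to BalancerSegmentHolder2 (a condensed
reading of that hunk below, not new behavior):

    for (BalancerSegmentHolder2 holder : currentlyMovingSegments.get(tier).values()) {
      holder.reduceLifetime();
      if (holder.getLifetime() <= 0) {
        // the move has been pending for too many balancer runs; an alert is emitted
      }
    }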

DruidMaster.java
@@ -674,7 +674,7 @@ public class DruidMaster
 },
 new DruidMasterRuleRunner(DruidMaster.this),
 new DruidMasterCleanup(DruidMaster.this),
-new DruidMasterBalancer(DruidMaster.this, new BalancerAnalyzer()),
+new DruidMasterBalancer(DruidMaster.this, new BalancerCostAnalyzer()),
 new DruidMasterLogger()
 )
 );

DruidMasterBalancer.java
@@ -24,14 +24,19 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.MinMaxPriorityQueue;
 import com.google.common.collect.Sets;
+import com.metamx.common.Pair;
 import com.metamx.common.guava.Comparators;
 import com.metamx.druid.client.DataSegment;
 import com.metamx.druid.client.DruidServer;
 import com.metamx.emitter.EmittingLogger;
+import org.joda.time.Interval;

+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
+import java.util.Random;
 import java.util.Set;
 import java.util.TreeSet;
 import java.util.concurrent.ConcurrentHashMap;
@@ -52,14 +57,14 @@ public class DruidMasterBalancer implements DruidMasterHelper
 );
 private static final EmittingLogger log = new EmittingLogger(DruidMasterBalancer.class);

-private final BalancerAnalyzer analyzer;
+private final BalancerCostAnalyzer analyzer;
 private final DruidMaster master;

-private final Map<String, ConcurrentHashMap<String, BalancerSegmentHolder>> currentlyMovingSegments = Maps.newHashMap();
+private final Map<String, ConcurrentHashMap<String, BalancerSegmentHolder2>> currentlyMovingSegments = Maps.newHashMap();

 public DruidMasterBalancer(
 DruidMaster master,
-BalancerAnalyzer analyzer
+BalancerCostAnalyzer analyzer
 )
 {
 this.master = master;
@@ -68,7 +73,7 @@ public class DruidMasterBalancer implements DruidMasterHelper

 private void reduceLifetimes(String tier)
 {
-for (BalancerSegmentHolder holder : currentlyMovingSegments.get(tier).values()) {
+for (BalancerSegmentHolder2 holder : currentlyMovingSegments.get(tier).values()) {
 holder.reduceLifetime();
 if (holder.getLifetime() <= 0) {
 log.makeAlert(
@@ -92,7 +97,7 @@ public class DruidMasterBalancer implements DruidMasterHelper
 String tier = entry.getKey();

 if (currentlyMovingSegments.get(tier) == null) {
-currentlyMovingSegments.put(tier, new ConcurrentHashMap<String, BalancerSegmentHolder>());
+currentlyMovingSegments.put(tier, new ConcurrentHashMap<String, BalancerSegmentHolder2>());
 }

 if (!currentlyMovingSegments.get(tier).isEmpty()) {
@@ -108,6 +113,21 @@ public class DruidMasterBalancer implements DruidMasterHelper
 TreeSet<ServerHolder> serversByPercentUsed = Sets.newTreeSet(percentUsedComparator);
 serversByPercentUsed.addAll(entry.getValue());

+List<ServerHolder> serverHolderList = new ArrayList<ServerHolder>(entry.getValue());
+
+analyzer.init(serverHolderList);
+log.info(
+    "Initial Total Cost: [%s]",
+    analyzer.getInitialTotalCost()
+);
+moveSegments(analyzer.findSegmentsToMove(), params);
+
+stats.addToTieredStat("costChange", tier, (long) analyzer.getTotalCostChange());
+log.info(
+    "Cost Change: [%s]",
+    analyzer.getTotalCostChange()
+);
+
 if (serversByPercentUsed.size() <= 1) {
 log.info(
 "[%s]: No unique values found for highest and lowest percent used servers: nothing to balance",
@@ -119,6 +139,7 @@ public class DruidMasterBalancer implements DruidMasterHelper
 ServerHolder highestPercentUsedServer = serversByPercentUsed.first();
 ServerHolder lowestPercentUsedServer = serversByPercentUsed.last();

+/*
 analyzer.init(highestPercentUsedServer, lowestPercentUsedServer);

 log.info(
@@ -149,6 +170,8 @@ public class DruidMasterBalancer implements DruidMasterHelper
 analyzer.findSegmentsToMove(highestPercentUsedServer.getServer()),
 params
 );
+*/
+

 stats.addToTieredStat("movedCount", tier, currentlyMovingSegments.get(tier).size());
 }
@@ -159,53 +182,55 @@ public class DruidMasterBalancer implements DruidMasterHelper
 }

 private void moveSegments(
-final DruidServer server,
-final Set<BalancerSegmentHolder> segments,
+final Set<BalancerSegmentHolder2> segments,
 final DruidMasterRuntimeParams params
 )
 {
-String toServer = server.getName();
-LoadQueuePeon toPeon = params.getLoadManagementPeons().get(toServer);
-
-for (final BalancerSegmentHolder segment : Sets.newHashSet(segments)) {
-String fromServer = segment.getServer().getName();
+for (final BalancerSegmentHolder2 segment : Sets.newHashSet(segments)) {
+final DruidServer toServer = segment.getToServer();
+final String toServerName = segment.getToServer().getName();
+LoadQueuePeon toPeon = params.getLoadManagementPeons().get(toServerName);
+
+String fromServer = segment.getFromServer().getName();
 DataSegment segmentToMove = segment.getSegment();
 final String segmentName = segmentToMove.getIdentifier();

 if (!toPeon.getSegmentsToLoad().contains(segmentToMove) &&
-(server.getSegment(segmentName) == null) &&
-new ServerHolder(server, toPeon).getAvailableSize() > segmentToMove.getSize()) {
+(toServer.getSegment(segmentName) == null) &&
+new ServerHolder(toServer, toPeon).getAvailableSize() > segmentToMove.getSize()) {
 log.info(
 "Moving [%s] from [%s] to [%s]",
 segmentName,
 fromServer,
-toServer
+toServerName
 );
 try {
 master.moveSegment(
 fromServer,
-toServer,
+toServerName,
 segmentToMove.getIdentifier(),
 new LoadPeonCallback()
 {
 @Override
 protected void execute()
 {
-Map<String, BalancerSegmentHolder> movingSegments = currentlyMovingSegments.get(server.getTier());
+Map<String, BalancerSegmentHolder2> movingSegments = currentlyMovingSegments.get(toServer.getTier());
 if (movingSegments != null) {
 movingSegments.remove(segmentName);
 }
 }
 }
 );
-currentlyMovingSegments.get(server.getTier()).put(segmentName, segment);
+currentlyMovingSegments.get(toServer.getTier()).put(segmentName, segment);
 }
 catch (Exception e) {
 log.makeAlert(e, String.format("[%s] : Moving exception", segmentName)).emit();
 }
 } else {
-currentlyMovingSegments.get(server.getTier()).remove(segment);
+currentlyMovingSegments.get(toServer.getTier()).remove(segment);
 }
 }

 }
 }

DruidMasterBalancerTest.java
@@ -27,11 +27,14 @@ import com.metamx.druid.client.DruidDataSource;
 import com.metamx.druid.client.DruidServer;
 import junit.framework.Assert;
 import org.easymock.EasyMock;
+import org.joda.time.Interval;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;

 import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;

 /**
 */
@@ -46,6 +49,8 @@ public class DruidMasterBalancerTest
 private DataSegment segment4;
 private LoadQueuePeon peon;
 private DruidDataSource dataSource;
+private Map<String, DataSegment> segments;
+private static final int DAY_IN_MILLIS = 1000 * 60 * 60 * 24;

 @Before
 public void setUp() throws Exception
@@ -78,17 +83,25 @@ public class DruidMasterBalancerTest
 @Test
 public void testRun()
 {
+Map<String, DataSegment> segments = new HashMap<String, DataSegment>();
+segments.put("segment1", segment1);
+segments.put("segment2", segment2);
+segments.put("segment3", segment3);
+segments.put("segment4", segment4);
+
 // Mock some servers of different usages
 EasyMock.expect(druidServerHigh.getName()).andReturn("from").atLeastOnce();
 EasyMock.expect(druidServerHigh.getCurrSize()).andReturn(30L).atLeastOnce();
 EasyMock.expect(druidServerHigh.getMaxSize()).andReturn(100L).atLeastOnce();
-EasyMock.expect(druidServerHigh.getDataSources()).andReturn(Arrays.asList(dataSource)).atLeastOnce();
+EasyMock.expect(druidServerHigh.getDataSources()).andReturn(Arrays.asList(dataSource)).anyTimes();
+EasyMock.expect(druidServerHigh.getSegments()).andReturn(segments).anyTimes();
 EasyMock.replay(druidServerHigh);

 EasyMock.expect(druidServerLow.getName()).andReturn("to").atLeastOnce();
 EasyMock.expect(druidServerLow.getTier()).andReturn("normal").atLeastOnce();
 EasyMock.expect(druidServerLow.getCurrSize()).andReturn(0L).atLeastOnce();
 EasyMock.expect(druidServerLow.getMaxSize()).andReturn(100L).atLeastOnce();
+EasyMock.expect(druidServerLow.getSegments()).andReturn(new HashMap<String, DataSegment>()).anyTimes();
 EasyMock.expect(druidServerLow.getDataSources()).andReturn(Arrays.asList(dataSource)).anyTimes();
 EasyMock.expect(druidServerLow.getSegment("segment1")).andReturn(null).anyTimes();
 EasyMock.expect(druidServerLow.getSegment("segment2")).andReturn(null).anyTimes();
@@ -104,21 +117,29 @@ public class DruidMasterBalancerTest
 segment3,
 segment4
 )
-).atLeastOnce();
+).anyTimes();
 EasyMock.replay(dataSource);

 // Mock some segments of different sizes
 EasyMock.expect(segment1.getSize()).andReturn(11L).anyTimes();
 EasyMock.expect(segment1.getIdentifier()).andReturn("segment1").anyTimes();
+EasyMock.expect(segment1.getDataSource()).andReturn("datasource1").anyTimes();
+EasyMock.expect(segment1.getInterval()).andReturn(new Interval(0, 1 * DAY_IN_MILLIS)).anyTimes();
 EasyMock.replay(segment1);
 EasyMock.expect(segment2.getSize()).andReturn(7L).anyTimes();
 EasyMock.expect(segment2.getIdentifier()).andReturn("segment2").anyTimes();
+EasyMock.expect(segment2.getDataSource()).andReturn("datasource1").anyTimes();
+EasyMock.expect(segment2.getInterval()).andReturn(new Interval(10 * DAY_IN_MILLIS, 11 * DAY_IN_MILLIS)).anyTimes();
 EasyMock.replay(segment2);
 EasyMock.expect(segment3.getSize()).andReturn(4L).anyTimes();
 EasyMock.expect(segment3.getIdentifier()).andReturn("segment3").anyTimes();
+EasyMock.expect(segment3.getDataSource()).andReturn("datasource1").anyTimes();
+EasyMock.expect(segment3.getInterval()).andReturn(new Interval(0, 1 * DAY_IN_MILLIS)).anyTimes();
 EasyMock.replay(segment3);
 EasyMock.expect(segment4.getSize()).andReturn(8L).anyTimes();
 EasyMock.expect(segment4.getIdentifier()).andReturn("segment4").anyTimes();
+EasyMock.expect(segment4.getDataSource()).andReturn("datasource1").anyTimes();
+EasyMock.expect(segment4.getInterval()).andReturn(new Interval(10 * DAY_IN_MILLIS, 11 * DAY_IN_MILLIS)).anyTimes();
 EasyMock.replay(segment4);

 // Mock stuff that the master needs
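
A quick reading of the mocked intervals and data sources above in terms of computeJointSegmentCosts:
segment1 and segment3 both cover the first day, so pairs among them incur the interval-proximity
cost; segment2 and segment4 cover days 10 to 11, roughly nine days away from segment1 and segment3,
so those cross pairs skip the proximity cost; and since all four report "datasource1", every pair
still incurs the shared-data-source cost. The mocks therefore exercise both branches of the gap
check as well as the data-source check.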
@@ -154,7 +175,7 @@ public class DruidMasterBalancerTest
 .withLoadManagementPeons(ImmutableMap.of("from", peon, "to", peon))
 .build();

-params = new DruidMasterBalancer(master, new BalancerAnalyzer()).run(params);
+params = new DruidMasterBalancer(master, new BalancerCostAnalyzer()).run(params);
 Assert.assertTrue(params.getMasterStats().getPerTierStats().get("movedCount").get("normal").get() > 0);
 }
 }