mirror of https://github.com/apache/druid.git
update
parent bb31fa7b68
commit 117cac2795
BalancerAnalyzer.java
@@ -1,131 +0,0 @@
/*
 * Druid - a distributed column store.
 * Copyright (C) 2012 Metamarkets Group Inc.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
 */

package com.metamx.druid.master;

import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.metamx.common.logger.Logger;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.client.DruidDataSource;
import com.metamx.druid.client.DruidServer;

import java.util.Collections;
import java.util.List;
import java.util.Set;

/**
 * The BalancerAnalyzer keeps the state of the highest and lowest percent used servers. It will update
 * these states and perform lookaheads to make sure the updated states result in a more balanced cluster.
 */
public class BalancerAnalyzer
{
  private static final Logger log = new Logger(BalancerAnalyzer.class);
  private static final int PERCENT_THRESHOLD = 3;
  private static final int MAX_SEGMENTS_TO_MOVE = 5;

  private volatile Long highestSizeUsed;
  private volatile double highestPercentUsed;
  private volatile Long highestPercentUsedServerMaxSize;
  private volatile Long lowestSizeUsed;
  private volatile double lowestPercentUsed;
  private volatile Long lowestPercentUsedServerMaxSize;

  public BalancerAnalyzer()
  {
    this.highestSizeUsed = 0L;
    this.highestPercentUsed = 0;
    this.highestPercentUsedServerMaxSize = 0L;
    this.lowestSizeUsed = 0L;
    this.lowestPercentUsed = 0;
    this.lowestPercentUsedServerMaxSize = 0L;
  }

  public void init(ServerHolder highestPercentUsedServer, ServerHolder lowestPercentUsedServer)
  {
    highestSizeUsed = highestPercentUsedServer.getSizeUsed();
    highestPercentUsed = highestPercentUsedServer.getPercentUsed();
    highestPercentUsedServerMaxSize = highestPercentUsedServer.getMaxSize();
    lowestSizeUsed = lowestPercentUsedServer.getSizeUsed();
    lowestPercentUsed = lowestPercentUsedServer.getPercentUsed();
    lowestPercentUsedServerMaxSize = lowestPercentUsedServer.getMaxSize();
  }

  public void update(long newHighestSizeUsed, long newLowestSizedUsed)
  {
    highestSizeUsed = newHighestSizeUsed;
    highestPercentUsed = highestSizeUsed.doubleValue() / highestPercentUsedServerMaxSize;
    lowestSizeUsed = newLowestSizedUsed;
    lowestPercentUsed = lowestSizeUsed.doubleValue() / lowestPercentUsedServerMaxSize;
  }

  public double getPercentDiff()
  {
    return Math.abs(
        100 * ((highestPercentUsed - lowestPercentUsed)
               / ((highestPercentUsed + lowestPercentUsed) / 2))
    );
  }

  public double getLookaheadPercentDiff(Long newHighestSizeUsed, Long newLowestSizedUsed)
  {
    double newHighestPercentUsed = 100 * (newHighestSizeUsed.doubleValue() / highestPercentUsedServerMaxSize);
    double newLowestPercentUsed = 100 * (newLowestSizedUsed.doubleValue() / lowestPercentUsedServerMaxSize);

    return Math.abs(
        100 * ((newHighestPercentUsed - newLowestPercentUsed)
               / ((newHighestPercentUsed + newLowestPercentUsed) / 2))
    );
  }

  public Set<BalancerSegmentHolder> findSegmentsToMove(DruidServer server)
  {
    Set<BalancerSegmentHolder> segmentsToMove = Sets.newHashSet();
    double currPercentDiff = getPercentDiff();

    if (currPercentDiff < PERCENT_THRESHOLD) {
      log.info("Cluster usage is balanced.");
      return segmentsToMove;
    }

    List<DruidDataSource> dataSources = Lists.newArrayList(server.getDataSources());
    Collections.shuffle(dataSources);

    for (DruidDataSource dataSource : dataSources) {
      List<DataSegment> segments = Lists.newArrayList(dataSource.getSegments());
      Collections.shuffle(segments);

      for (DataSegment segment : segments) {
        if (segmentsToMove.size() >= MAX_SEGMENTS_TO_MOVE) {
          return segmentsToMove;
        }

        if (getLookaheadPercentDiff(highestSizeUsed - segment.getSize(), lowestSizeUsed + segment.getSize())
            < currPercentDiff) {
          segmentsToMove.add(new BalancerSegmentHolder(server, segment));
          update(highestSizeUsed - segment.getSize(), lowestSizeUsed + segment.getSize());
        }
      }
    }

    return segmentsToMove;
  }
}
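
getPercentDiff() above is the symmetric relative difference between the two servers' utilizations, expressed as a percentage. A minimal standalone sketch of the same arithmetic (class and variable names here are illustrative only), using the 60% vs. 20% utilizations that BalancerAnalyzerTest exercises further down:

// Illustrative sketch, not part of this commit: the percent-difference formula in isolation.
public class PercentDiffSketch
{
  public static void main(String[] args)
  {
    double highestPercentUsed = 100.0 * 6 / 10; // 6 of 10 bytes used, as in the test
    double lowestPercentUsed = 100.0 * 2 / 10;  // 2 of 10 bytes used
    double percentDiff = Math.abs(
        100 * ((highestPercentUsed - lowestPercentUsed)
               / ((highestPercentUsed + lowestPercentUsed) / 2))
    );
    System.out.println(percentDiff); // prints 100.0, matching the assertion in the test
  }
}
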
BalancerCostAnalyzer.java
@@ -25,20 +25,19 @@ import com.google.common.collect.Sets;
import com.metamx.common.Pair;
import com.metamx.common.logger.Logger;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.client.DruidDataSource;
import com.metamx.druid.client.DruidServer;
import org.joda.time.Interval;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.Set;


/**
 * The BalancerCostAnalyzer
 * The BalancerCostAnalyzer will compute the total initial cost of the cluster, with costs defined in
 * computeJointSegmentCosts. It will then propose to move randomly chosen segments from their respective
 * initial servers to other servers, chosen greedily to minimize the cost of the cluster.
 */
public class BalancerCostAnalyzer
{
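
The Javadoc above suggests a two-step call pattern: init() to capture the cluster's starting cost, then findSegmentsToMove() to collect greedy proposals. A rough usage sketch under that reading; "SketchDriver" and "proposeMoves" are made-up names, and serverHolderList is assumed to hold one tier's ServerHolders, as DruidMasterBalancer builds later in this diff:

// Illustrative sketch only, not part of this commit.
package com.metamx.druid.master;

import java.util.List;
import java.util.Set;

class SketchDriver
{
  Set<BalancerSegmentHolder> proposeMoves(List<ServerHolder> serverHolderList)
  {
    BalancerCostAnalyzer analyzer = new BalancerCostAnalyzer();
    analyzer.init(serverHolderList);         // records serverHolderList and the initial total cost
    Set<BalancerSegmentHolder> toMove = analyzer.findSegmentsToMove(); // random, greedy proposals
    // getInitialTotalCost() reports the starting cost; getTotalCostChange() accumulates
    // (cost on the source server - cost on the cheapest destination) over accepted proposals.
    return toMove;
  }
}
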
@@ -52,25 +51,30 @@ public class BalancerCostAnalyzer
  private double initialTotalCost;
  private double totalCostChange;

  public BalancerCostAnalyzer(){
  public BalancerCostAnalyzer()
  {
    rand = new Random(0);
    totalCostChange = 0;
  }

  public void init(List<ServerHolder> serverHolderList){
  public void init(List<ServerHolder> serverHolderList)
  {
    this.initialTotalCost = calculateInitialTotalCost(serverHolderList);
    this.serverHolderList = serverHolderList;
  }

  public double getInitialTotalCost() {
  public double getInitialTotalCost()
  {
    return initialTotalCost;
  }

  public double getTotalCostChange() {
  public double getTotalCostChange()
  {
    return totalCostChange;
  }

  private double calculateInitialTotalCost(List<ServerHolder> serverHolderList){
  private double calculateInitialTotalCost(List<ServerHolder> serverHolderList)
  {
    double cost = 0;
    for (ServerHolder server : serverHolderList) {
      DataSegment[] segments = server.getServer().getSegments().values().toArray(new DataSegment[]{});

@@ -83,7 +87,8 @@ public class BalancerCostAnalyzer
    return cost;
  }

  public double computeJointSegmentCosts(DataSegment segment1, DataSegment segment2){
  public double computeJointSegmentCosts(DataSegment segment1, DataSegment segment2)
  {
    double cost = 0;
    Interval gap = segment1.getInterval().gap(segment2.getInterval());

@@ -92,32 +97,88 @@ public class BalancerCostAnalyzer
      cost += 1f;
    } else {
      long gapMillis = gap.toDurationMillis();
      if (gapMillis < DAY_IN_MILLIS) cost += 1f;
      if (gapMillis < DAY_IN_MILLIS) {
        cost += 1f;
      }
    }

    if(segment1.getDataSource().equals(segment2.getDataSource())) cost += 1f;
    if (segment1.getDataSource().equals(segment2.getDataSource())) {
      cost += 1f;
    }

    return cost;
  }

  public Set<BalancerSegmentHolder2> findSegmentsToMove()
  public class NullServerHolder extends ServerHolder
  {
    Set<BalancerSegmentHolder2> segmentHoldersToMove = Sets.newHashSet();
    Set<DataSegment> movingSegments = Sets.newHashSet();
    public NullServerHolder()
    {
      super(null, null);
    }

    int counter = 0;
    double currCost = 0;
    @Override
    public boolean equals(Object o)
    {
      return false;
    }
  }

    while (segmentHoldersToMove.size() < MAX_SEGMENTS_TO_MOVE && counter < 3 * MAX_SEGMENTS_TO_MOVE) {
      counter++;
      ServerHolder fromServerHolder = serverHolderList.get(rand.nextInt(serverHolderList.size()));
      List<DataSegment> segments = Lists.newArrayList(fromServerHolder.getServer().getSegments().values());
      if (segments.size() == 0) continue;
      DataSegment proposalSegment = segments.get(rand.nextInt(segments.size()));
      if (movingSegments.contains(proposalSegment)) continue;
  public class BalancerCostAnalyzerHelper
  {
    private MinMaxPriorityQueue<Pair<Double, ServerHolder>> costsServerHolderPairs;
    private List<ServerHolder> serverHolderList;
    private DataSegment proposalSegment;
    private ServerHolder fromServerHolder;
    private Set<BalancerSegmentHolder> segmentHoldersToMove;
    private double currCost;

    public MinMaxPriorityQueue<Pair<Double, ServerHolder>> getCostsServerHolderPairs()
    {
      return costsServerHolderPairs;
    }

    public List<ServerHolder> getServerHolderList()
    {
      return serverHolderList;
    }

    public DataSegment getProposalSegment()
    {
      return proposalSegment;
    }

    public ServerHolder getFromServerHolder()
    {
      return fromServerHolder;
    }

    public Set<BalancerSegmentHolder> getSegmentHoldersToMove()
    {
      return segmentHoldersToMove;
    }

    public double getCurrCost()
    {
      return currCost;
    }

    public BalancerCostAnalyzerHelper(
        List<ServerHolder> serverHolderList,
        DataSegment proposalSegment
    )
    {
      this(serverHolderList, proposalSegment, new NullServerHolder(), Sets.<BalancerSegmentHolder>newHashSet());
    }

    public BalancerCostAnalyzerHelper(
        List<ServerHolder> serverHolderList,
        DataSegment proposalSegment,
        ServerHolder fromServerHolder,
        Set<BalancerSegmentHolder> segmentHoldersToMove
    )
    {
      // Just need a regular priority queue for the min. element.
      MinMaxPriorityQueue<Pair<Double, ServerHolder>> pQueue = MinMaxPriorityQueue.orderedBy(
      this.costsServerHolderPairs = MinMaxPriorityQueue.orderedBy(
          new Comparator<Pair<Double, ServerHolder>>()
          {
            @Override

@@ -130,7 +191,15 @@ public class BalancerCostAnalyzer
            }
          }
      ).create();
      this.serverHolderList = serverHolderList;
      this.proposalSegment = proposalSegment;
      this.fromServerHolder = fromServerHolder;
      this.segmentHoldersToMove = segmentHoldersToMove;
      this.currCost = 0;
    }

    public void computeAllCosts()
    {
      for (ServerHolder server : serverHolderList) {
        double cost = 0f;
        for (DataSegment segment : server.getServer().getSegments().values()) {

@@ -138,12 +207,14 @@ public class BalancerCostAnalyzer
        }

        // self cost
        if (!server.getServer().equals(fromServerHolder.getServer())) cost += computeJointSegmentCosts(proposalSegment, proposalSegment);
        if (!server.getServer().equals(fromServerHolder.getServer())) {
          cost += computeJointSegmentCosts(proposalSegment, proposalSegment);
        }

        // Take into account costs of segments that will be moved.
        Iterator it = segmentHoldersToMove.iterator();
        while (it.hasNext()) {
          BalancerSegmentHolder2 segmentToMove = (BalancerSegmentHolder2) it.next();
          BalancerSegmentHolder segmentToMove = (BalancerSegmentHolder) it.next();
          if (server.getServer().equals(segmentToMove.getToServer())) {
            cost += computeJointSegmentCosts(proposalSegment, segmentToMove.getSegment());
          }

@@ -152,24 +223,58 @@ public class BalancerCostAnalyzer
        }
      }

      if (server.getServer().equals(fromServerHolder.getServer())){
      if (server.getServer().equals(fromServerHolder.getServer())) {
        currCost = cost;
      }

      pQueue.add(Pair.of(cost, server));
      if (proposalSegment.getSize() < server.getAvailableSize()) {
        costsServerHolderPairs.add(Pair.of(cost, server));
      }

      }
    }

  }

  public Set<BalancerSegmentHolder> findSegmentsToMove()
  {
    Set<BalancerSegmentHolder> segmentHoldersToMove = Sets.newHashSet();
    Set<DataSegment> movingSegments = Sets.newHashSet();

    int counter = 0;

    while (segmentHoldersToMove.size() < MAX_SEGMENTS_TO_MOVE && counter < 3 * MAX_SEGMENTS_TO_MOVE) {
      counter++;
      ServerHolder fromServerHolder = serverHolderList.get(rand.nextInt(serverHolderList.size()));
      List<DataSegment> segments = Lists.newArrayList(fromServerHolder.getServer().getSegments().values());
      if (segments.size() == 0) {
        continue;
      }
      DataSegment proposalSegment = segments.get(rand.nextInt(segments.size()));
      if (movingSegments.contains(proposalSegment)) {
        continue;
      }

      Pair<Double, ServerHolder> minPair = pQueue.peekFirst();
      if (!minPair.rhs.equals(fromServerHolder)) {
      BalancerCostAnalyzerHelper helper = new BalancerCostAnalyzerHelper(
          serverHolderList,
          proposalSegment,
          fromServerHolder,
          segmentHoldersToMove
      );
      helper.computeAllCosts();

      Pair<Double, ServerHolder> minPair = helper.getCostsServerHolderPairs().pollFirst();

      if (minPair.rhs != null && !minPair.rhs.equals(fromServerHolder)) {
        movingSegments.add(proposalSegment);
        segmentHoldersToMove.add(
            new BalancerSegmentHolder2(
            new BalancerSegmentHolder(
                fromServerHolder.getServer(),
                minPair.rhs.getServer(),
                proposalSegment
            )
        );
        totalCostChange += currCost - minPair.lhs;
        totalCostChange += helper.getCurrCost() - minPair.lhs;
      }
    }
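
computeJointSegmentCosts() above charges one unit for temporal proximity (overlapping intervals, or a gap shorter than a day) and one more when both segments belong to the same data source. A standalone sketch of that rule; the gap == null branch is an assumption, since the opening condition of the if/else sits above the hunk shown here, and the DAY_IN_MILLIS value is assumed from the constant's name:

// Illustrative sketch of the joint-cost rule, not part of this commit.
import org.joda.time.Interval;

class JointCostSketch
{
  private static final long DAY_IN_MILLIS = 24 * 60 * 60 * 1000L;  // assumed: one day in millis

  static double jointCost(Interval interval1, Interval interval2, String dataSource1, String dataSource2)
  {
    double cost = 0;
    Interval gap = interval1.gap(interval2);
    if (gap == null) {                           // joda-time: null gap means overlapping or abutting
      cost += 1f;
    } else if (gap.toDurationMillis() < DAY_IN_MILLIS) {
      cost += 1f;
    }
    if (dataSource1.equals(dataSource2)) {       // same data source
      cost += 1f;
    }
    return cost;                                 // 0, 1, or 2
  }
}
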
BalancerSegmentHolder.java
@@ -26,23 +26,31 @@ import com.metamx.druid.client.DruidServer;
 */
public class BalancerSegmentHolder
{
  private final DruidServer server;
  private final DruidServer fromServer;
  private final DruidServer toServer;
  private final DataSegment segment;

  private volatile int lifetime = 15;

  public BalancerSegmentHolder(
      DruidServer server,
      DruidServer fromServer,
      DruidServer toServer,
      DataSegment segment
  )
  {
    this.server = server;
    this.fromServer = fromServer;
    this.toServer = toServer;
    this.segment = segment;
  }

  public DruidServer getServer()
  public DruidServer getFromServer()
  {
    return server;
    return fromServer;
  }

  public DruidServer getToServer()
  {
    return toServer;
  }

  public DataSegment getSegment()

BalancerSegmentHolder2.java
@@ -1,70 +0,0 @@
/*
 * Druid - a distributed column store.
 * Copyright (C) 2012 Metamarkets Group Inc.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
 */

package com.metamx.druid.master;

import com.metamx.druid.client.DataSegment;
import com.metamx.druid.client.DruidServer;

/**
 */
public class BalancerSegmentHolder2
{
  private final DruidServer fromServer;
  private final DruidServer toServer;
  private final DataSegment segment;

  private volatile int lifetime = 15;

  public BalancerSegmentHolder2(
      DruidServer fromServer,
      DruidServer toServer,
      DataSegment segment
  )
  {
    this.fromServer = fromServer;
    this.toServer = toServer;
    this.segment = segment;
  }

  public DruidServer getFromServer()
  {
    return fromServer;
  }

  public DruidServer getToServer()
  {
    return toServer;
  }

  public DataSegment getSegment()
  {
    return segment;
  }

  public int getLifetime()
  {
    return lifetime;
  }

  public void reduceLifetime()
  {
    lifetime--;
  }
}

DruidMasterBalancer.java
@@ -20,25 +20,19 @@
package com.metamx.druid.master;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.MinMaxPriorityQueue;
import com.google.common.collect.Sets;
import com.metamx.common.Pair;
import com.metamx.common.guava.Comparators;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.client.DruidServer;
import com.metamx.emitter.EmittingLogger;
import org.joda.time.Interval;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;

/**

@@ -60,7 +54,7 @@ public class DruidMasterBalancer implements DruidMasterHelper
  private final BalancerCostAnalyzer analyzer;
  private final DruidMaster master;

  private final Map<String, ConcurrentHashMap<String, BalancerSegmentHolder2>> currentlyMovingSegments = Maps.newHashMap();
  private final Map<String, ConcurrentHashMap<String, BalancerSegmentHolder>> currentlyMovingSegments = Maps.newHashMap();

  public DruidMasterBalancer(
      DruidMaster master,

@@ -73,7 +67,7 @@ public class DruidMasterBalancer implements DruidMasterHelper

  private void reduceLifetimes(String tier)
  {
    for (BalancerSegmentHolder2 holder : currentlyMovingSegments.get(tier).values()) {
    for (BalancerSegmentHolder holder : currentlyMovingSegments.get(tier).values()) {
      holder.reduceLifetime();
      if (holder.getLifetime() <= 0) {
        log.makeAlert(

@@ -97,7 +91,7 @@ public class DruidMasterBalancer implements DruidMasterHelper
      String tier = entry.getKey();

      if (currentlyMovingSegments.get(tier) == null) {
        currentlyMovingSegments.put(tier, new ConcurrentHashMap<String, BalancerSegmentHolder2>());
        currentlyMovingSegments.put(tier, new ConcurrentHashMap<String, BalancerSegmentHolder>());
      }

      if (!currentlyMovingSegments.get(tier).isEmpty()) {

@@ -110,9 +104,6 @@ public class DruidMasterBalancer implements DruidMasterHelper
        continue;
      }

      //TreeSet<ServerHolder> serversByPercentUsed = Sets.newTreeSet(percentUsedComparator);
      //serversByPercentUsed.addAll(entry.getValue());

      List<ServerHolder> serverHolderList = new ArrayList<ServerHolder>(entry.getValue());

      analyzer.init(serverHolderList);

@@ -136,44 +127,6 @@ public class DruidMasterBalancer implements DruidMasterHelper
        continue;
      }

      /*
      ServerHolder highestPercentUsedServer = serversByPercentUsed.first();
      ServerHolder lowestPercentUsedServer = serversByPercentUsed.last();


      analyzer.init(highestPercentUsedServer, lowestPercentUsedServer);

      log.info(
          "[%s]: Percent difference in percent size used between highest/lowest servers: %s%%",
          tier,
          analyzer.getPercentDiff()
      );

      log.info(
          "[%s]: Highest percent used [%s]: size used[%s], percent used[%s%%]",
          tier,
          highestPercentUsedServer.getServer().getName(),
          highestPercentUsedServer.getSizeUsed(),
          highestPercentUsedServer.getPercentUsed()
      );

      log.info(
          "[%s]: Lowest percent used [%s]: size used[%s], percent used[%s%%]",
          tier,
          lowestPercentUsedServer.getServer().getName(),
          lowestPercentUsedServer.getSizeUsed(),
          lowestPercentUsedServer.getPercentUsed()
      );

      // Use the analyzer to find segments to move and then move them
      moveSegments(
          lowestPercentUsedServer.getServer(),
          analyzer.findSegmentsToMove(highestPercentUsedServer.getServer()),
          params
      );
      */


      stats.addToTieredStat("movedCount", tier, currentlyMovingSegments.get(tier).size());
    }

@@ -183,12 +136,12 @@ public class DruidMasterBalancer implements DruidMasterHelper
  }

  private void moveSegments(
      final Set<BalancerSegmentHolder2> segments,
      final Set<BalancerSegmentHolder> segments,
      final DruidMasterRuntimeParams params
  )
  {

    for (final BalancerSegmentHolder2 segment : Sets.newHashSet(segments)) {
    for (final BalancerSegmentHolder segment : Sets.newHashSet(segments)) {
      final DruidServer toServer = segment.getToServer();
      final String toServerName = segment.getToServer().getName();
      LoadQueuePeon toPeon = params.getLoadManagementPeons().get(toServerName);

@@ -216,7 +169,7 @@ public class DruidMasterBalancer implements DruidMasterHelper
              @Override
              protected void execute()
              {
                Map<String, BalancerSegmentHolder2> movingSegments = currentlyMovingSegments.get(toServer.getTier());
                Map<String, BalancerSegmentHolder> movingSegments = currentlyMovingSegments.get(toServer.getTier());
                if (movingSegments != null) {
                  movingSegments.remove(segmentName);
                }

LoadRule.java
@@ -22,7 +22,9 @@ package com.metamx.druid.master.rules;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.MinMaxPriorityQueue;
import com.metamx.common.Pair;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.master.BalancerCostAnalyzer;
import com.metamx.druid.master.DruidMaster;
import com.metamx.druid.master.DruidMasterRuntimeParams;
import com.metamx.druid.master.LoadPeonCallback;

@@ -75,7 +77,12 @@ public abstract class LoadRule implements Rule

    List<ServerHolder> assignedServers = Lists.newArrayList();
    while (totalReplicants < expectedReplicants) {
      ServerHolder holder = serverQueue.pollFirst();
      BalancerCostAnalyzer analyzer = new BalancerCostAnalyzer();
      BalancerCostAnalyzer.BalancerCostAnalyzerHelper helper = analyzer.new BalancerCostAnalyzerHelper(serverHolderList, segment);
      helper.computeAllCosts();
      Pair<Double, ServerHolder> minPair = helper.getCostsServerHolderPairs().pollFirst();

      ServerHolder holder = minPair.rhs;
      if (holder == null) {
        log.warn(
            "Not enough %s servers[%d] to assign segment[%s]! Expected Replicants[%d]",
BalancerAnalyzerTest.java
@@ -1,131 +0,0 @@
/*
 * Druid - a distributed column store.
 * Copyright (C) 2012 Metamarkets Group Inc.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
 */

package com.metamx.druid.master;

import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.client.DruidDataSource;
import com.metamx.druid.client.DruidServer;
import junit.framework.Assert;
import org.easymock.EasyMock;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

/**
 */
public class BalancerAnalyzerTest
{
  private ServerHolder high;
  private ServerHolder low;
  private DruidServer server;
  private DruidDataSource dataSource;
  private DataSegment segment;

  @Before
  public void setUp() throws Exception
  {
    high = EasyMock.createMock(ServerHolder.class);
    low = EasyMock.createMock(ServerHolder.class);
    server = EasyMock.createMock(DruidServer.class);
    dataSource = EasyMock.createMock(DruidDataSource.class);
    segment = EasyMock.createMock(DataSegment.class);
  }

  @After
  public void tearDown() throws Exception
  {
    EasyMock.verify(high);
    EasyMock.verify(low);
  }

  @Test
  public void testGetPercentDifference()
  {
    EasyMock.expect(high.getSizeUsed()).andReturn(6L);
    EasyMock.expect(high.getPercentUsed()).andReturn(60.0);
    EasyMock.expect(high.getMaxSize()).andReturn(10L);
    EasyMock.replay(high);

    EasyMock.expect(low.getSizeUsed()).andReturn(2L);
    EasyMock.expect(low.getPercentUsed()).andReturn(20.0);
    EasyMock.expect(low.getMaxSize()).andReturn(10L);
    EasyMock.replay(low);

    BalancerAnalyzer analyzer = new BalancerAnalyzer();

    analyzer.init(high, low);
    Assert.assertEquals(100.0, analyzer.getPercentDiff());
  }

  @Test
  public void testGetLookaheadPercentDifference()
  {
    EasyMock.expect(high.getSizeUsed()).andReturn(2L);
    EasyMock.expect(high.getPercentUsed()).andReturn(20.0);
    EasyMock.expect(high.getMaxSize()).andReturn(10L);
    EasyMock.replay(high);

    EasyMock.expect(low.getSizeUsed()).andReturn(1L);
    EasyMock.expect(low.getPercentUsed()).andReturn(10.0);
    EasyMock.expect(low.getMaxSize()).andReturn(10L);
    EasyMock.replay(low);

    BalancerAnalyzer analyzer = new BalancerAnalyzer();

    analyzer.init(high, low);
    Assert.assertEquals(100.0, analyzer.getLookaheadPercentDiff(2L, 6L));
  }

  @Test
  public void testFindSegmentsToMove()
  {
    EasyMock.expect(high.getSizeUsed()).andReturn(6L);
    EasyMock.expect(high.getPercentUsed()).andReturn(60.0);
    EasyMock.expect(high.getMaxSize()).andReturn(10L);
    EasyMock.replay(high);

    EasyMock.expect(low.getSizeUsed()).andReturn(2L);
    EasyMock.expect(low.getPercentUsed()).andReturn(20.0);
    EasyMock.expect(low.getMaxSize()).andReturn(10L);
    EasyMock.replay(low);

    EasyMock.expect(segment.getSize()).andReturn(1L).atLeastOnce();
    EasyMock.replay(segment);

    EasyMock.expect(dataSource.getSegments()).andReturn(Sets.newHashSet(segment));
    EasyMock.replay(dataSource);

    EasyMock.expect(server.getDataSources()).andReturn(Lists.newArrayList(dataSource));
    EasyMock.replay(server);

    BalancerAnalyzer analyzer = new BalancerAnalyzer();

    analyzer.init(high, low);

    Assert.assertEquals(analyzer.findSegmentsToMove(server).iterator().next().getSegment(), segment);

    EasyMock.verify(server);
    EasyMock.verify(dataSource);
    EasyMock.verify(segment);
  }

}