svn merge -c 1417130 from trunk for HDFS-4234. Use generic code for choosing datanode in Balancer.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1488850 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
16995501d3
commit
9b8d329e65
|
@ -128,6 +128,8 @@ Release 2.1.0-beta - UNRELEASED
|
|||
HDFS-4698. Provide client-side metrics for remote reads, local reads, and
|
||||
short-circuit reads. (Colin Patrick McCabe via atm)
|
||||
|
||||
HDFS-4234. Use generic code for choosing datanode in Balancer. (szetszwo)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
BUG FIXES
|
||||
|
|
|
@ -76,6 +76,7 @@ import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocat
|
|||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.apache.hadoop.net.NetUtils;
|
||||
import org.apache.hadoop.net.NetworkTopology;
|
||||
import org.apache.hadoop.net.Node;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
import org.apache.hadoop.util.Time;
|
||||
|
@ -558,7 +559,7 @@ public class Balancer {
|
|||
}
|
||||
|
||||
/** Decide if still need to move more bytes */
|
||||
protected boolean isMoveQuotaFull() {
|
||||
protected boolean hasSpaceForScheduling() {
|
||||
return scheduledSize<maxSize2Move;
|
||||
}
|
||||
|
||||
|
@ -923,23 +924,53 @@ public class Balancer {
|
|||
LOG.info(nodes.size() + " " + name + ": " + nodes);
|
||||
}
|
||||
|
||||
/* Decide all <source, target> pairs and
|
||||
/** A matcher interface for matching nodes. */
|
||||
private interface Matcher {
|
||||
/** Given the cluster topology, does the left node match the right node? */
|
||||
boolean match(NetworkTopology cluster, Node left, Node right);
|
||||
}
|
||||
|
||||
/** Match datanodes in the same node group. */
|
||||
static final Matcher SAME_NODE_GROUP = new Matcher() {
|
||||
@Override
|
||||
public boolean match(NetworkTopology cluster, Node left, Node right) {
|
||||
return cluster.isOnSameNodeGroup(left, right);
|
||||
}
|
||||
};
|
||||
|
||||
/** Match datanodes in the same rack. */
|
||||
static final Matcher SAME_RACK = new Matcher() {
|
||||
@Override
|
||||
public boolean match(NetworkTopology cluster, Node left, Node right) {
|
||||
return cluster.isOnSameRack(left, right);
|
||||
}
|
||||
};
|
||||
|
||||
/** Match any datanode with any other datanode. */
|
||||
static final Matcher ANY_OTHER = new Matcher() {
|
||||
@Override
|
||||
public boolean match(NetworkTopology cluster, Node left, Node right) {
|
||||
return left != right;
|
||||
}
|
||||
};
|
||||
|
||||
/**
|
||||
* Decide all <source, target> pairs and
|
||||
* the number of bytes to move from a source to a target
|
||||
* Maximum bytes to be moved per node is
|
||||
* Min(1 Band worth of bytes, MAX_SIZE_TO_MOVE).
|
||||
* Return total number of bytes to move in this iteration
|
||||
*/
|
||||
private long chooseNodes() {
|
||||
// First, match nodes on the same node group if cluster has nodegroup
|
||||
// awareness
|
||||
// First, match nodes on the same node group if cluster is node group aware
|
||||
if (cluster.isNodeGroupAware()) {
|
||||
chooseNodesOnSameNodeGroup();
|
||||
chooseNodes(SAME_NODE_GROUP);
|
||||
}
|
||||
|
||||
// Then, match nodes on the same rack
|
||||
chooseNodes(true);
|
||||
// At last, match nodes on different racks
|
||||
chooseNodes(false);
|
||||
chooseNodes(SAME_RACK);
|
||||
// At last, match all remaining nodes
|
||||
chooseNodes(ANY_OTHER);
|
||||
|
||||
assert (datanodes.size() >= sources.size()+targets.size())
|
||||
: "Mismatched number of datanodes (" +
|
||||
|
@ -953,57 +984,55 @@ public class Balancer {
|
|||
}
|
||||
return bytesToMove;
|
||||
}
|
||||
|
||||
/**
|
||||
* Decide all <source, target> pairs where source and target are
|
||||
* on the same NodeGroup
|
||||
*/
|
||||
private void chooseNodesOnSameNodeGroup() {
|
||||
|
||||
/** Decide all <source, target> pairs according to the matcher. */
|
||||
private void chooseNodes(final Matcher matcher) {
|
||||
/* first step: match each overUtilized datanode (source) to
|
||||
* one or more underUtilized datanodes within same NodeGroup(targets).
|
||||
* one or more underUtilized datanodes (targets).
|
||||
*/
|
||||
chooseOnSameNodeGroup(overUtilizedDatanodes, underUtilizedDatanodes);
|
||||
|
||||
/* match each remaining overutilized datanode (source) to below average
|
||||
* utilized datanodes within the same NodeGroup(targets).
|
||||
chooseDatanodes(overUtilizedDatanodes, underUtilizedDatanodes, matcher);
|
||||
|
||||
/* match each remaining overutilized datanode (source) to
|
||||
* below average utilized datanodes (targets).
|
||||
* Note only overutilized datanodes that haven't had that max bytes to move
|
||||
* satisfied in step 1 are selected
|
||||
*/
|
||||
chooseOnSameNodeGroup(overUtilizedDatanodes, belowAvgUtilizedDatanodes);
|
||||
chooseDatanodes(overUtilizedDatanodes, belowAvgUtilizedDatanodes, matcher);
|
||||
|
||||
/* match each remaining underutilized datanode to above average utilized
|
||||
* datanodes within the same NodeGroup.
|
||||
/* match each remaining underutilized datanode (target) to
|
||||
* above average utilized datanodes (source).
|
||||
* Note only underutilized datanodes that have not had that max bytes to
|
||||
* move satisfied in step 1 are selected.
|
||||
*/
|
||||
chooseOnSameNodeGroup(underUtilizedDatanodes, aboveAvgUtilizedDatanodes);
|
||||
chooseDatanodes(underUtilizedDatanodes, aboveAvgUtilizedDatanodes, matcher);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Match two sets of nodes within the same NodeGroup, one should be source
|
||||
* nodes (utilization > Avg), and the other should be destination nodes
|
||||
* (utilization < Avg).
|
||||
* @param datanodes
|
||||
* @param candidates
|
||||
* For each datanode, choose matching nodes from the candidates. Either the
|
||||
* datanodes or the candidates are source nodes with (utilization > Avg), and
|
||||
* the others are target nodes with (utilization < Avg).
|
||||
*/
|
||||
private <D extends BalancerDatanode, C extends BalancerDatanode> void
|
||||
chooseOnSameNodeGroup(Collection<D> datanodes, Collection<C> candidates) {
|
||||
chooseDatanodes(Collection<D> datanodes, Collection<C> candidates,
|
||||
Matcher matcher) {
|
||||
for (Iterator<D> i = datanodes.iterator(); i.hasNext();) {
|
||||
final D datanode = i.next();
|
||||
for(; chooseOnSameNodeGroup(datanode, candidates.iterator()); );
|
||||
if (!datanode.isMoveQuotaFull()) {
|
||||
for(; chooseForOneDatanode(datanode, candidates, matcher); );
|
||||
if (!datanode.hasSpaceForScheduling()) {
|
||||
i.remove();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Match one datanode with a set of candidates nodes within the same NodeGroup.
|
||||
* For the given datanode, choose a candidate and then schedule it.
|
||||
* @return true if a candidate is chosen; false if no candidates is chosen.
|
||||
*/
|
||||
private <T extends BalancerDatanode> boolean chooseOnSameNodeGroup(
|
||||
BalancerDatanode dn, Iterator<T> candidates) {
|
||||
final T chosen = chooseCandidateOnSameNodeGroup(dn, candidates);
|
||||
private <C extends BalancerDatanode> boolean chooseForOneDatanode(
|
||||
BalancerDatanode dn, Collection<C> candidates, Matcher matcher) {
|
||||
final Iterator<C> i = candidates.iterator();
|
||||
final C chosen = chooseCandidate(dn, i, matcher);
|
||||
|
||||
if (chosen == null) {
|
||||
return false;
|
||||
}
|
||||
|
@ -1012,8 +1041,8 @@ public class Balancer {
|
|||
} else {
|
||||
matchSourceWithTargetToMove((Source)chosen, dn);
|
||||
}
|
||||
if (!chosen.isMoveQuotaFull()) {
|
||||
candidates.remove();
|
||||
if (!chosen.hasSpaceForScheduling()) {
|
||||
i.remove();
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
@ -1030,19 +1059,15 @@ public class Balancer {
|
|||
+source.datanode.getName() + " to " + target.datanode.getName());
|
||||
}
|
||||
|
||||
/** choose a datanode from <code>candidates</code> within the same NodeGroup
|
||||
* of <code>dn</code>.
|
||||
*/
|
||||
private <T extends BalancerDatanode> T chooseCandidateOnSameNodeGroup(
|
||||
BalancerDatanode dn, Iterator<T> candidates) {
|
||||
if (dn.isMoveQuotaFull()) {
|
||||
/** Choose a candidate for the given datanode. */
|
||||
private <D extends BalancerDatanode, C extends BalancerDatanode>
|
||||
C chooseCandidate(D dn, Iterator<C> candidates, Matcher matcher) {
|
||||
if (dn.hasSpaceForScheduling()) {
|
||||
for(; candidates.hasNext(); ) {
|
||||
final T c = candidates.next();
|
||||
if (!c.isMoveQuotaFull()) {
|
||||
final C c = candidates.next();
|
||||
if (!c.hasSpaceForScheduling()) {
|
||||
candidates.remove();
|
||||
continue;
|
||||
}
|
||||
if (cluster.isOnSameNodeGroup(dn.getDatanode(), c.getDatanode())) {
|
||||
} else if (matcher.match(cluster, dn.getDatanode(), c.getDatanode())) {
|
||||
return c;
|
||||
}
|
||||
}
|
||||
|
@ -1050,148 +1075,6 @@ public class Balancer {
|
|||
return null;
|
||||
}
|
||||
|
||||
/* if onRack is true, decide all <source, target> pairs
|
||||
* where source and target are on the same rack; Otherwise
|
||||
* decide all <source, target> pairs where source and target are
|
||||
* on different racks
|
||||
*/
|
||||
private void chooseNodes(boolean onRack) {
|
||||
/* first step: match each overUtilized datanode (source) to
|
||||
* one or more underUtilized datanodes (targets).
|
||||
*/
|
||||
chooseTargets(underUtilizedDatanodes, onRack);
|
||||
|
||||
/* match each remaining overutilized datanode (source) to
|
||||
* below average utilized datanodes (targets).
|
||||
* Note only overutilized datanodes that haven't had that max bytes to move
|
||||
* satisfied in step 1 are selected
|
||||
*/
|
||||
chooseTargets(belowAvgUtilizedDatanodes, onRack);
|
||||
|
||||
/* match each remaining underutilized datanode (target) to
|
||||
* above average utilized datanodes (source).
|
||||
* Note only underutilized datanodes that have not had that max bytes to
|
||||
* move satisfied in step 1 are selected.
|
||||
*/
|
||||
chooseSources(aboveAvgUtilizedDatanodes, onRack);
|
||||
}
|
||||
|
||||
/* choose targets from the target candidate list for each over utilized
|
||||
* source datanode. OnRackTarget determines if the chosen target
|
||||
* should be on the same rack as the source
|
||||
*/
|
||||
private void chooseTargets(
|
||||
Collection<BalancerDatanode> targetCandidates, boolean onRackTarget ) {
|
||||
for (Iterator<Source> srcIterator = overUtilizedDatanodes.iterator();
|
||||
srcIterator.hasNext();) {
|
||||
Source source = srcIterator.next();
|
||||
while (chooseTarget(source, targetCandidates.iterator(), onRackTarget)) {
|
||||
}
|
||||
if (!source.isMoveQuotaFull()) {
|
||||
srcIterator.remove();
|
||||
}
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
/* choose sources from the source candidate list for each under utilized
|
||||
* target datanode. onRackSource determines if the chosen source
|
||||
* should be on the same rack as the target
|
||||
*/
|
||||
private void chooseSources(
|
||||
Collection<Source> sourceCandidates, boolean onRackSource) {
|
||||
for (Iterator<BalancerDatanode> targetIterator =
|
||||
underUtilizedDatanodes.iterator(); targetIterator.hasNext();) {
|
||||
BalancerDatanode target = targetIterator.next();
|
||||
while (chooseSource(target, sourceCandidates.iterator(), onRackSource)) {
|
||||
}
|
||||
if (!target.isMoveQuotaFull()) {
|
||||
targetIterator.remove();
|
||||
}
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
/* For the given source, choose targets from the target candidate list.
|
||||
* OnRackTarget determines if the chosen target
|
||||
* should be on the same rack as the source
|
||||
*/
|
||||
private boolean chooseTarget(Source source,
|
||||
Iterator<BalancerDatanode> targetCandidates, boolean onRackTarget) {
|
||||
if (!source.isMoveQuotaFull()) {
|
||||
return false;
|
||||
}
|
||||
boolean foundTarget = false;
|
||||
BalancerDatanode target = null;
|
||||
while (!foundTarget && targetCandidates.hasNext()) {
|
||||
target = targetCandidates.next();
|
||||
if (!target.isMoveQuotaFull()) {
|
||||
targetCandidates.remove();
|
||||
continue;
|
||||
}
|
||||
if (onRackTarget) {
|
||||
// choose from on-rack nodes
|
||||
if (cluster.isOnSameRack(source.datanode, target.datanode)) {
|
||||
foundTarget = true;
|
||||
}
|
||||
} else {
|
||||
// choose from off-rack nodes
|
||||
if (!cluster.isOnSameRack(source.datanode, target.datanode)) {
|
||||
foundTarget = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (foundTarget) {
|
||||
assert(target != null):"Choose a null target";
|
||||
matchSourceWithTargetToMove(source, target);
|
||||
if (!target.isMoveQuotaFull()) {
|
||||
targetCandidates.remove();
|
||||
}
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
/* For the given target, choose sources from the source candidate list.
|
||||
* OnRackSource determines if the chosen source
|
||||
* should be on the same rack as the target
|
||||
*/
|
||||
private boolean chooseSource(BalancerDatanode target,
|
||||
Iterator<Source> sourceCandidates, boolean onRackSource) {
|
||||
if (!target.isMoveQuotaFull()) {
|
||||
return false;
|
||||
}
|
||||
boolean foundSource = false;
|
||||
Source source = null;
|
||||
while (!foundSource && sourceCandidates.hasNext()) {
|
||||
source = sourceCandidates.next();
|
||||
if (!source.isMoveQuotaFull()) {
|
||||
sourceCandidates.remove();
|
||||
continue;
|
||||
}
|
||||
if (onRackSource) {
|
||||
// choose from on-rack nodes
|
||||
if ( cluster.isOnSameRack(source.getDatanode(), target.getDatanode())) {
|
||||
foundSource = true;
|
||||
}
|
||||
} else {
|
||||
// choose from off-rack nodes
|
||||
if (!cluster.isOnSameRack(source.datanode, target.datanode)) {
|
||||
foundSource = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (foundSource) {
|
||||
assert(source != null):"Choose a null source";
|
||||
matchSourceWithTargetToMove(source, target);
|
||||
if ( !source.isMoveQuotaFull()) {
|
||||
sourceCandidates.remove();
|
||||
}
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
private static class BytesMoved {
|
||||
private long bytesMoved = 0L;;
|
||||
private synchronized void inc( long bytes ) {
|
||||
|
|
|
@ -17,13 +17,15 @@
|
|||
*/
|
||||
package org.apache.hadoop.hdfs.server.balancer;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import junit.framework.TestCase;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
|
@ -43,7 +45,7 @@ import org.junit.Test;
|
|||
/**
|
||||
* This class tests if a balancer schedules tasks correctly.
|
||||
*/
|
||||
public class TestBalancerWithNodeGroup extends TestCase {
|
||||
public class TestBalancerWithNodeGroup {
|
||||
private static final Log LOG = LogFactory.getLog(
|
||||
"org.apache.hadoop.hdfs.TestBalancerWithNodeGroup");
|
||||
|
||||
|
|
Loading…
Reference in New Issue