HDFS-6837. Merge r1617337 from trunk.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1617338 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Jing Zhao 2014-08-11 18:02:57 +00:00
parent b7958c5df5
commit f805ac93a9
8 changed files with 189 additions and 176 deletions

View File

@ -129,6 +129,9 @@ Release 2.6.0 - UNRELEASED
HDFS-6828. Separate block replica dispatching from Balancer. (szetszwo via HDFS-6828. Separate block replica dispatching from Balancer. (szetszwo via
jing9) jing9)
HDFS-6837. Code cleanup for Balancer and Dispatcher. (szetszwo via
jing9)
OPTIMIZATIONS OPTIMIZATIONS
HDFS-6690. Deduplicate xattr names in memory. (wang) HDFS-6690. Deduplicate xattr names in memory. (wang)

View File

@ -44,7 +44,8 @@
import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.StorageType; import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.server.balancer.Dispatcher.BalancerDatanode; import org.apache.hadoop.hdfs.server.balancer.Dispatcher.DDatanode;
import org.apache.hadoop.hdfs.server.balancer.Dispatcher.DDatanode.StorageGroup;
import org.apache.hadoop.hdfs.server.balancer.Dispatcher.Source; import org.apache.hadoop.hdfs.server.balancer.Dispatcher.Source;
import org.apache.hadoop.hdfs.server.balancer.Dispatcher.Task; import org.apache.hadoop.hdfs.server.balancer.Dispatcher.Task;
import org.apache.hadoop.hdfs.server.balancer.Dispatcher.Util; import org.apache.hadoop.hdfs.server.balancer.Dispatcher.Util;
@ -185,10 +186,10 @@ public class Balancer {
// all data node lists // all data node lists
private final Collection<Source> overUtilized = new LinkedList<Source>(); private final Collection<Source> overUtilized = new LinkedList<Source>();
private final Collection<Source> aboveAvgUtilized = new LinkedList<Source>(); private final Collection<Source> aboveAvgUtilized = new LinkedList<Source>();
private final Collection<BalancerDatanode.StorageGroup> belowAvgUtilized private final Collection<StorageGroup> belowAvgUtilized
= new LinkedList<BalancerDatanode.StorageGroup>(); = new LinkedList<StorageGroup>();
private final Collection<BalancerDatanode.StorageGroup> underUtilized private final Collection<StorageGroup> underUtilized
= new LinkedList<BalancerDatanode.StorageGroup>(); = new LinkedList<StorageGroup>();
/* Check that this Balancer is compatible with the Block Placement Policy /* Check that this Balancer is compatible with the Block Placement Policy
* used by the Namenode. * used by the Namenode.
@ -210,8 +211,22 @@ private static void checkReplicationPolicyCompatibility(Configuration conf
* when connection fails. * when connection fails.
*/ */
Balancer(NameNodeConnector theblockpool, Parameters p, Configuration conf) { Balancer(NameNodeConnector theblockpool, Parameters p, Configuration conf) {
final long movedWinWidth = conf.getLong(
DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_KEY,
DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_DEFAULT);
final int moverThreads = conf.getInt(
DFSConfigKeys.DFS_BALANCER_MOVERTHREADS_KEY,
DFSConfigKeys.DFS_BALANCER_MOVERTHREADS_DEFAULT);
final int dispatcherThreads = conf.getInt(
DFSConfigKeys.DFS_BALANCER_DISPATCHERTHREADS_KEY,
DFSConfigKeys.DFS_BALANCER_DISPATCHERTHREADS_DEFAULT);
final int maxConcurrentMovesPerNode = conf.getInt(
DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY,
DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT);
this.dispatcher = new Dispatcher(theblockpool, p.nodesToBeIncluded, this.dispatcher = new Dispatcher(theblockpool, p.nodesToBeIncluded,
p.nodesToBeExcluded, conf); p.nodesToBeExcluded, movedWinWidth, moverThreads, dispatcherThreads,
maxConcurrentMovesPerNode, conf);
this.threshold = p.threshold; this.threshold = p.threshold;
this.policy = p.policy; this.policy = p.policy;
} }
@ -256,7 +271,7 @@ private long init(List<DatanodeStorageReport> reports) {
// over-utilized, above-average, below-average and under-utilized. // over-utilized, above-average, below-average and under-utilized.
long overLoadedBytes = 0L, underLoadedBytes = 0L; long overLoadedBytes = 0L, underLoadedBytes = 0L;
for(DatanodeStorageReport r : reports) { for(DatanodeStorageReport r : reports) {
final BalancerDatanode dn = dispatcher.newDatanode(r); final DDatanode dn = dispatcher.newDatanode(r);
for(StorageType t : StorageType.asList()) { for(StorageType t : StorageType.asList()) {
final Double utilization = policy.getUtilization(r, t); final Double utilization = policy.getUtilization(r, t);
if (utilization == null) { // datanode does not have such storage type if (utilization == null) { // datanode does not have such storage type
@ -269,9 +284,9 @@ private long init(List<DatanodeStorageReport> reports) {
final long maxSize2Move = computeMaxSize2Move(capacity, final long maxSize2Move = computeMaxSize2Move(capacity,
getRemaining(r, t), utilizationDiff, threshold); getRemaining(r, t), utilizationDiff, threshold);
final BalancerDatanode.StorageGroup g; final StorageGroup g;
if (utilizationDiff > 0) { if (utilizationDiff > 0) {
final Source s = dn.addSource(t, utilization, maxSize2Move, dispatcher); final Source s = dn.addSource(t, maxSize2Move, dispatcher);
if (thresholdDiff <= 0) { // within threshold if (thresholdDiff <= 0) { // within threshold
aboveAvgUtilized.add(s); aboveAvgUtilized.add(s);
} else { } else {
@ -280,7 +295,7 @@ private long init(List<DatanodeStorageReport> reports) {
} }
g = s; g = s;
} else { } else {
g = dn.addStorageGroup(t, utilization, maxSize2Move); g = dn.addStorageGroup(t, maxSize2Move);
if (thresholdDiff <= 0) { // within threshold if (thresholdDiff <= 0) { // within threshold
belowAvgUtilized.add(g); belowAvgUtilized.add(g);
} else { } else {
@ -329,7 +344,7 @@ private void logUtilizationCollections() {
logUtilizationCollection("underutilized", underUtilized); logUtilizationCollection("underutilized", underUtilized);
} }
private static <T extends BalancerDatanode.StorageGroup> private static <T extends StorageGroup>
void logUtilizationCollection(String name, Collection<T> items) { void logUtilizationCollection(String name, Collection<T> items) {
LOG.info(items.size() + " " + name + ": " + items); LOG.info(items.size() + " " + name + ": " + items);
} }
@ -382,8 +397,7 @@ private void chooseStorageGroups(final Matcher matcher) {
* datanodes or the candidates are source nodes with (utilization > Avg), and * datanodes or the candidates are source nodes with (utilization > Avg), and
* the others are target nodes with (utilization < Avg). * the others are target nodes with (utilization < Avg).
*/ */
private <G extends BalancerDatanode.StorageGroup, private <G extends StorageGroup, C extends StorageGroup>
C extends BalancerDatanode.StorageGroup>
void chooseStorageGroups(Collection<G> groups, Collection<C> candidates, void chooseStorageGroups(Collection<G> groups, Collection<C> candidates,
Matcher matcher) { Matcher matcher) {
for(final Iterator<G> i = groups.iterator(); i.hasNext();) { for(final Iterator<G> i = groups.iterator(); i.hasNext();) {
@ -399,9 +413,8 @@ void chooseStorageGroups(Collection<G> groups, Collection<C> candidates,
* For the given datanode, choose a candidate and then schedule it. * For the given datanode, choose a candidate and then schedule it.
* @return true if a candidate is chosen; false if no candidates is chosen. * @return true if a candidate is chosen; false if no candidates is chosen.
*/ */
private <C extends BalancerDatanode.StorageGroup> private <C extends StorageGroup> boolean choose4One(StorageGroup g,
boolean choose4One(BalancerDatanode.StorageGroup g, Collection<C> candidates, Matcher matcher) {
Collection<C> candidates, Matcher matcher) {
final Iterator<C> i = candidates.iterator(); final Iterator<C> i = candidates.iterator();
final C chosen = chooseCandidate(g, i, matcher); final C chosen = chooseCandidate(g, i, matcher);
@ -419,8 +432,7 @@ boolean choose4One(BalancerDatanode.StorageGroup g,
return true; return true;
} }
private void matchSourceWithTargetToMove(Source source, private void matchSourceWithTargetToMove(Source source, StorageGroup target) {
BalancerDatanode.StorageGroup target) {
long size = Math.min(source.availableSizeToMove(), target.availableSizeToMove()); long size = Math.min(source.availableSizeToMove(), target.availableSizeToMove());
final Task task = new Task(target, size); final Task task = new Task(target, size);
source.addTask(task); source.addTask(task);
@ -431,8 +443,7 @@ private void matchSourceWithTargetToMove(Source source,
} }
/** Choose a candidate for the given datanode. */ /** Choose a candidate for the given datanode. */
private <G extends BalancerDatanode.StorageGroup, private <G extends StorageGroup, C extends StorageGroup>
C extends BalancerDatanode.StorageGroup>
C chooseCandidate(G g, Iterator<C> candidates, Matcher matcher) { C chooseCandidate(G g, Iterator<C> candidates, Matcher matcher) {
if (g.hasSpaceForScheduling()) { if (g.hasSpaceForScheduling()) {
for(; candidates.hasNext(); ) { for(; candidates.hasNext(); ) {
@ -440,7 +451,7 @@ C chooseCandidate(G g, Iterator<C> candidates, Matcher matcher) {
if (!c.hasSpaceForScheduling()) { if (!c.hasSpaceForScheduling()) {
candidates.remove(); candidates.remove();
} else if (matcher.match(dispatcher.getCluster(), } else if (matcher.match(dispatcher.getCluster(),
g.getDatanode(), c.getDatanode())) { g.getDatanodeInfo(), c.getDatanodeInfo())) {
return c; return c;
} }
} }
@ -458,34 +469,15 @@ private void resetData(Configuration conf) {
dispatcher.reset(conf);; dispatcher.reset(conf);;
} }
// Exit status
enum ReturnStatus {
// These int values will map directly to the balancer process's exit code.
SUCCESS(0),
IN_PROGRESS(1),
ALREADY_RUNNING(-1),
NO_MOVE_BLOCK(-2),
NO_MOVE_PROGRESS(-3),
IO_EXCEPTION(-4),
ILLEGAL_ARGS(-5),
INTERRUPTED(-6);
final int code;
ReturnStatus(int code) {
this.code = code;
}
}
/** Run an iteration for all datanodes. */ /** Run an iteration for all datanodes. */
private ReturnStatus run(int iteration, Formatter formatter, private ExitStatus run(int iteration, Formatter formatter,
Configuration conf) { Configuration conf) {
try { try {
final List<DatanodeStorageReport> reports = dispatcher.init(); final List<DatanodeStorageReport> reports = dispatcher.init();
final long bytesLeftToMove = init(reports); final long bytesLeftToMove = init(reports);
if (bytesLeftToMove == 0) { if (bytesLeftToMove == 0) {
System.out.println("The cluster is balanced. Exiting..."); System.out.println("The cluster is balanced. Exiting...");
return ReturnStatus.SUCCESS; return ExitStatus.SUCCESS;
} else { } else {
LOG.info( "Need to move "+ StringUtils.byteDesc(bytesLeftToMove) LOG.info( "Need to move "+ StringUtils.byteDesc(bytesLeftToMove)
+ " to make the cluster balanced." ); + " to make the cluster balanced." );
@ -499,7 +491,7 @@ private ReturnStatus run(int iteration, Formatter formatter,
final long bytesToMove = chooseStorageGroups(); final long bytesToMove = chooseStorageGroups();
if (bytesToMove == 0) { if (bytesToMove == 0) {
System.out.println("No block can be moved. Exiting..."); System.out.println("No block can be moved. Exiting...");
return ReturnStatus.NO_MOVE_BLOCK; return ExitStatus.NO_MOVE_BLOCK;
} else { } else {
LOG.info( "Will move " + StringUtils.byteDesc(bytesToMove) + LOG.info( "Will move " + StringUtils.byteDesc(bytesToMove) +
" in this iteration"); " in this iteration");
@ -520,19 +512,19 @@ private ReturnStatus run(int iteration, Formatter formatter,
* Exit no byte has been moved for 5 consecutive iterations. * Exit no byte has been moved for 5 consecutive iterations.
*/ */
if (!dispatcher.dispatchAndCheckContinue()) { if (!dispatcher.dispatchAndCheckContinue()) {
return ReturnStatus.NO_MOVE_PROGRESS; return ExitStatus.NO_MOVE_PROGRESS;
} }
return ReturnStatus.IN_PROGRESS; return ExitStatus.IN_PROGRESS;
} catch (IllegalArgumentException e) { } catch (IllegalArgumentException e) {
System.out.println(e + ". Exiting ..."); System.out.println(e + ". Exiting ...");
return ReturnStatus.ILLEGAL_ARGS; return ExitStatus.ILLEGAL_ARGUMENTS;
} catch (IOException e) { } catch (IOException e) {
System.out.println(e + ". Exiting ..."); System.out.println(e + ". Exiting ...");
return ReturnStatus.IO_EXCEPTION; return ExitStatus.IO_EXCEPTION;
} catch (InterruptedException e) { } catch (InterruptedException e) {
System.out.println(e + ". Exiting ..."); System.out.println(e + ". Exiting ...");
return ReturnStatus.INTERRUPTED; return ExitStatus.INTERRUPTED;
} finally { } finally {
dispatcher.shutdownNow(); dispatcher.shutdownNow();
} }
@ -571,14 +563,14 @@ static int run(Collection<URI> namenodes, final Parameters p,
Collections.shuffle(connectors); Collections.shuffle(connectors);
for(NameNodeConnector nnc : connectors) { for(NameNodeConnector nnc : connectors) {
final Balancer b = new Balancer(nnc, p, conf); final Balancer b = new Balancer(nnc, p, conf);
final ReturnStatus r = b.run(iteration, formatter, conf); final ExitStatus r = b.run(iteration, formatter, conf);
// clean all lists // clean all lists
b.resetData(conf); b.resetData(conf);
if (r == ReturnStatus.IN_PROGRESS) { if (r == ExitStatus.IN_PROGRESS) {
done = false; done = false;
} else if (r != ReturnStatus.SUCCESS) { } else if (r != ExitStatus.SUCCESS) {
//must be an error statue, return. //must be an error statue, return.
return r.code; return r.getExitCode();
} }
} }
@ -591,7 +583,7 @@ static int run(Collection<URI> namenodes, final Parameters p,
nnc.close(); nnc.close();
} }
} }
return ReturnStatus.SUCCESS.code; return ExitStatus.SUCCESS.getExitCode();
} }
/* Given elaspedTime in ms, return a printable string */ /* Given elaspedTime in ms, return a printable string */
@ -662,10 +654,10 @@ public int run(String[] args) {
return Balancer.run(namenodes, parse(args), conf); return Balancer.run(namenodes, parse(args), conf);
} catch (IOException e) { } catch (IOException e) {
System.out.println(e + ". Exiting ..."); System.out.println(e + ". Exiting ...");
return ReturnStatus.IO_EXCEPTION.code; return ExitStatus.IO_EXCEPTION.getExitCode();
} catch (InterruptedException e) { } catch (InterruptedException e) {
System.out.println(e + ". Exiting ..."); System.out.println(e + ". Exiting ...");
return ReturnStatus.INTERRUPTED.code; return ExitStatus.INTERRUPTED.getExitCode();
} finally { } finally {
System.out.format("%-24s ", DateFormat.getDateTimeInstance().format(new Date())); System.out.format("%-24s ", DateFormat.getDateTimeInstance().format(new Date()));
System.out.println("Balancing took " + time2Str(Time.now()-startTime)); System.out.println("Balancing took " + time2Str(Time.now()-startTime));

View File

@ -48,7 +48,6 @@
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.StorageType; import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.Block;
@ -63,6 +62,7 @@
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.balancer.Dispatcher.DDatanode.StorageGroup;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations; import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations; import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
@ -91,7 +91,6 @@ public class Dispatcher {
// minutes // minutes
private final NameNodeConnector nnc; private final NameNodeConnector nnc;
private final KeyManager keyManager;
private final SaslDataTransferClient saslClient; private final SaslDataTransferClient saslClient;
/** Set of datanodes to be excluded. */ /** Set of datanodes to be excluded. */
@ -100,11 +99,10 @@ public class Dispatcher {
private final Set<String> includedNodes; private final Set<String> includedNodes;
private final Collection<Source> sources = new HashSet<Source>(); private final Collection<Source> sources = new HashSet<Source>();
private final Collection<BalancerDatanode.StorageGroup> targets private final Collection<StorageGroup> targets = new HashSet<StorageGroup>();
= new HashSet<BalancerDatanode.StorageGroup>();
private final GlobalBlockMap globalBlocks = new GlobalBlockMap(); private final GlobalBlockMap globalBlocks = new GlobalBlockMap();
private final MovedBlocks<BalancerDatanode.StorageGroup> movedBlocks; private final MovedBlocks<StorageGroup> movedBlocks;
/** Map (datanodeUuid,storageType -> StorageGroup) */ /** Map (datanodeUuid,storageType -> StorageGroup) */
private final StorageGroupMap storageGroupMap = new StorageGroupMap(); private final StorageGroupMap storageGroupMap = new StorageGroupMap();
@ -135,8 +133,7 @@ private DBlock get(Block b) {
} }
/** Remove all blocks except for the moved blocks. */ /** Remove all blocks except for the moved blocks. */
private void removeAllButRetain( private void removeAllButRetain(MovedBlocks<StorageGroup> movedBlocks) {
MovedBlocks<BalancerDatanode.StorageGroup> movedBlocks) {
for (Iterator<Block> i = map.keySet().iterator(); i.hasNext();) { for (Iterator<Block> i = map.keySet().iterator(); i.hasNext();) {
if (!movedBlocks.contains(i.next())) { if (!movedBlocks.contains(i.next())) {
i.remove(); i.remove();
@ -150,17 +147,15 @@ private static String toKey(String datanodeUuid, StorageType storageType) {
return datanodeUuid + ":" + storageType; return datanodeUuid + ":" + storageType;
} }
private final Map<String, BalancerDatanode.StorageGroup> map private final Map<String, StorageGroup> map = new HashMap<String, StorageGroup>();
= new HashMap<String, BalancerDatanode.StorageGroup>();
BalancerDatanode.StorageGroup get(String datanodeUuid, StorageGroup get(String datanodeUuid, StorageType storageType) {
StorageType storageType) {
return map.get(toKey(datanodeUuid, storageType)); return map.get(toKey(datanodeUuid, storageType));
} }
void put(BalancerDatanode.StorageGroup g) { void put(StorageGroup g) {
final String key = toKey(g.getDatanode().getDatanodeUuid(), g.storageType); final String key = toKey(g.getDatanodeInfo().getDatanodeUuid(), g.storageType);
final BalancerDatanode.StorageGroup existing = map.put(key, g); final StorageGroup existing = map.put(key, g);
Preconditions.checkState(existing == null); Preconditions.checkState(existing == null);
} }
@ -177,8 +172,8 @@ void clear() {
private class PendingMove { private class PendingMove {
private DBlock block; private DBlock block;
private Source source; private Source source;
private BalancerDatanode proxySource; private DDatanode proxySource;
private BalancerDatanode.StorageGroup target; private StorageGroup target;
private PendingMove() { private PendingMove() {
} }
@ -235,24 +230,24 @@ private boolean markMovedIfGoodBlock(DBlock block) {
* @return true if a proxy is found; otherwise false * @return true if a proxy is found; otherwise false
*/ */
private boolean chooseProxySource() { private boolean chooseProxySource() {
final DatanodeInfo targetDN = target.getDatanode(); final DatanodeInfo targetDN = target.getDatanodeInfo();
// if node group is supported, first try add nodes in the same node group // if node group is supported, first try add nodes in the same node group
if (cluster.isNodeGroupAware()) { if (cluster.isNodeGroupAware()) {
for (BalancerDatanode.StorageGroup loc : block.getLocations()) { for (StorageGroup loc : block.getLocations()) {
if (cluster.isOnSameNodeGroup(loc.getDatanode(), targetDN) if (cluster.isOnSameNodeGroup(loc.getDatanodeInfo(), targetDN)
&& addTo(loc)) { && addTo(loc)) {
return true; return true;
} }
} }
} }
// check if there is replica which is on the same rack with the target // check if there is replica which is on the same rack with the target
for (BalancerDatanode.StorageGroup loc : block.getLocations()) { for (StorageGroup loc : block.getLocations()) {
if (cluster.isOnSameRack(loc.getDatanode(), targetDN) && addTo(loc)) { if (cluster.isOnSameRack(loc.getDatanodeInfo(), targetDN) && addTo(loc)) {
return true; return true;
} }
} }
// find out a non-busy replica // find out a non-busy replica
for (BalancerDatanode.StorageGroup loc : block.getLocations()) { for (StorageGroup loc : block.getLocations()) {
if (addTo(loc)) { if (addTo(loc)) {
return true; return true;
} }
@ -261,10 +256,10 @@ && addTo(loc)) {
} }
/** add to a proxy source for specific block movement */ /** add to a proxy source for specific block movement */
private boolean addTo(BalancerDatanode.StorageGroup g) { private boolean addTo(StorageGroup g) {
final BalancerDatanode bdn = g.getBalancerDatanode(); final DDatanode dn = g.getDDatanode();
if (bdn.addPendingBlock(this)) { if (dn.addPendingBlock(this)) {
proxySource = bdn; proxySource = dn;
return true; return true;
} }
return false; return false;
@ -281,14 +276,13 @@ private void dispatch() {
DataInputStream in = null; DataInputStream in = null;
try { try {
sock.connect( sock.connect(
NetUtils.createSocketAddr(target.getDatanode().getXferAddr()), NetUtils.createSocketAddr(target.getDatanodeInfo().getXferAddr()),
HdfsServerConstants.READ_TIMEOUT); HdfsServerConstants.READ_TIMEOUT);
/* /*
* Unfortunately we don't have a good way to know if the Datanode is * Unfortunately we don't have a good way to know if the Datanode is
* taking a really long time to move a block, OR something has gone * taking a really long time to move a block, OR something has gone
* wrong and it's never going to finish. To deal with this scenario, we * wrong and it's never going to finish. To deal with this scenario, we
* set a long timeout (20 minutes) to avoid hanging the balancer * set a long timeout (20 minutes) to avoid hanging indefinitely.
* indefinitely.
*/ */
sock.setSoTimeout(BLOCK_MOVE_READ_TIMEOUT); sock.setSoTimeout(BLOCK_MOVE_READ_TIMEOUT);
@ -298,9 +292,10 @@ private void dispatch() {
InputStream unbufIn = sock.getInputStream(); InputStream unbufIn = sock.getInputStream();
ExtendedBlock eb = new ExtendedBlock(nnc.getBlockpoolID(), ExtendedBlock eb = new ExtendedBlock(nnc.getBlockpoolID(),
block.getBlock()); block.getBlock());
Token<BlockTokenIdentifier> accessToken = keyManager.getAccessToken(eb); final KeyManager km = nnc.getKeyManager();
Token<BlockTokenIdentifier> accessToken = km.getAccessToken(eb);
IOStreamPair saslStreams = saslClient.socketSend(sock, unbufOut, IOStreamPair saslStreams = saslClient.socketSend(sock, unbufOut,
unbufIn, keyManager, accessToken, target.getDatanode()); unbufIn, km, accessToken, target.getDatanodeInfo());
unbufOut = saslStreams.out; unbufOut = saslStreams.out;
unbufIn = saslStreams.in; unbufIn = saslStreams.in;
out = new DataOutputStream(new BufferedOutputStream(unbufOut, out = new DataOutputStream(new BufferedOutputStream(unbufOut,
@ -314,21 +309,19 @@ private void dispatch() {
LOG.info("Successfully moved " + this); LOG.info("Successfully moved " + this);
} catch (IOException e) { } catch (IOException e) {
LOG.warn("Failed to move " + this + ": " + e.getMessage()); LOG.warn("Failed to move " + this + ": " + e.getMessage());
/* // Proxy or target may have some issues, delay before using these nodes
* proxy or target may have an issue, insert a small delay before using // further in order to avoid a potential storm of "threads quota
* these nodes further. This avoids a potential storm of // exceeded" warnings when the dispatcher gets out of sync with work
* "threads quota exceeded" Warnings when the balancer gets out of sync // going on in datanodes.
* with work going on in datanode.
*/
proxySource.activateDelay(DELAY_AFTER_ERROR); proxySource.activateDelay(DELAY_AFTER_ERROR);
target.getBalancerDatanode().activateDelay(DELAY_AFTER_ERROR); target.getDDatanode().activateDelay(DELAY_AFTER_ERROR);
} finally { } finally {
IOUtils.closeStream(out); IOUtils.closeStream(out);
IOUtils.closeStream(in); IOUtils.closeStream(in);
IOUtils.closeSocket(sock); IOUtils.closeSocket(sock);
proxySource.removePendingBlock(this); proxySource.removePendingBlock(this);
target.getBalancerDatanode().removePendingBlock(this); target.getDDatanode().removePendingBlock(this);
synchronized (this) { synchronized (this) {
reset(); reset();
@ -342,8 +335,8 @@ private void dispatch() {
/** Send a block replace request to the output stream */ /** Send a block replace request to the output stream */
private void sendRequest(DataOutputStream out, ExtendedBlock eb, private void sendRequest(DataOutputStream out, ExtendedBlock eb,
Token<BlockTokenIdentifier> accessToken) throws IOException { Token<BlockTokenIdentifier> accessToken) throws IOException {
new Sender(out).replaceBlock(eb, target.storageType, accessToken, source new Sender(out).replaceBlock(eb, target.storageType, accessToken,
.getDatanode().getDatanodeUuid(), proxySource.datanode); source.getDatanodeInfo().getDatanodeUuid(), proxySource.datanode);
} }
/** Receive a block copy response from the input stream */ /** Receive a block copy response from the input stream */
@ -368,8 +361,7 @@ private void reset() {
} }
/** A class for keeping track of block locations in the dispatcher. */ /** A class for keeping track of block locations in the dispatcher. */
private static class DBlock extends private static class DBlock extends MovedBlocks.Locations<StorageGroup> {
MovedBlocks.Locations<BalancerDatanode.StorageGroup> {
DBlock(Block block) { DBlock(Block block) {
super(block); super(block);
} }
@ -377,10 +369,10 @@ private static class DBlock extends
/** The class represents a desired move. */ /** The class represents a desired move. */
static class Task { static class Task {
private final BalancerDatanode.StorageGroup target; private final StorageGroup target;
private long size; // bytes scheduled to move private long size; // bytes scheduled to move
Task(BalancerDatanode.StorageGroup target, long size) { Task(StorageGroup target, long size) {
this.target = target; this.target = target;
this.size = size; this.size = size;
} }
@ -391,28 +383,25 @@ long getSize() {
} }
/** A class that keeps track of a datanode. */ /** A class that keeps track of a datanode. */
static class BalancerDatanode { static class DDatanode {
/** A group of storages in a datanode with the same storage type. */ /** A group of storages in a datanode with the same storage type. */
class StorageGroup { class StorageGroup {
final StorageType storageType; final StorageType storageType;
final double utilization;
final long maxSize2Move; final long maxSize2Move;
private long scheduledSize = 0L; private long scheduledSize = 0L;
private StorageGroup(StorageType storageType, double utilization, private StorageGroup(StorageType storageType, long maxSize2Move) {
long maxSize2Move) {
this.storageType = storageType; this.storageType = storageType;
this.utilization = utilization;
this.maxSize2Move = maxSize2Move; this.maxSize2Move = maxSize2Move;
} }
BalancerDatanode getBalancerDatanode() { private DDatanode getDDatanode() {
return BalancerDatanode.this; return DDatanode.this;
} }
DatanodeInfo getDatanode() { DatanodeInfo getDatanodeInfo() {
return BalancerDatanode.this.datanode; return DDatanode.this.datanode;
} }
/** Decide if still need to move more bytes */ /** Decide if still need to move more bytes */
@ -447,7 +436,7 @@ String getDisplayName() {
@Override @Override
public String toString() { public String toString() {
return "" + utilization; return getDisplayName();
} }
} }
@ -461,10 +450,10 @@ public String toString() {
@Override @Override
public String toString() { public String toString() {
return getClass().getSimpleName() + ":" + datanode + ":" + storageMap; return getClass().getSimpleName() + ":" + datanode + ":" + storageMap.values();
} }
private BalancerDatanode(DatanodeStorageReport r, int maxConcurrentMoves) { private DDatanode(DatanodeStorageReport r, int maxConcurrentMoves) {
this.datanode = r.getDatanodeInfo(); this.datanode = r.getDatanodeInfo();
this.maxConcurrentMoves = maxConcurrentMoves; this.maxConcurrentMoves = maxConcurrentMoves;
this.pendings = new ArrayList<PendingMove>(maxConcurrentMoves); this.pendings = new ArrayList<PendingMove>(maxConcurrentMoves);
@ -475,18 +464,14 @@ private void put(StorageType storageType, StorageGroup g) {
Preconditions.checkState(existing == null); Preconditions.checkState(existing == null);
} }
StorageGroup addStorageGroup(StorageType storageType, double utilization, StorageGroup addStorageGroup(StorageType storageType, long maxSize2Move) {
long maxSize2Move) { final StorageGroup g = new StorageGroup(storageType, maxSize2Move);
final StorageGroup g = new StorageGroup(storageType, utilization,
maxSize2Move);
put(storageType, g); put(storageType, g);
return g; return g;
} }
Source addSource(StorageType storageType, double utilization, Source addSource(StorageType storageType, long maxSize2Move, Dispatcher d) {
long maxSize2Move, Dispatcher balancer) { final Source s = d.new Source(storageType, maxSize2Move, this);
final Source s = balancer.new Source(storageType, utilization,
maxSize2Move, this);
put(storageType, s); put(storageType, s);
return s; return s;
} }
@ -528,7 +513,7 @@ synchronized boolean removePendingBlock(PendingMove pendingBlock) {
} }
/** A node that can be the sources of a block move */ /** A node that can be the sources of a block move */
class Source extends BalancerDatanode.StorageGroup { class Source extends DDatanode.StorageGroup {
private final List<Task> tasks = new ArrayList<Task>(2); private final List<Task> tasks = new ArrayList<Task>(2);
private long blocksToReceive = 0L; private long blocksToReceive = 0L;
@ -539,9 +524,8 @@ class Source extends BalancerDatanode.StorageGroup {
*/ */
private final List<DBlock> srcBlocks = new ArrayList<DBlock>(); private final List<DBlock> srcBlocks = new ArrayList<DBlock>();
private Source(StorageType storageType, double utilization, private Source(StorageType storageType, long maxSize2Move, DDatanode dn) {
long maxSize2Move, BalancerDatanode dn) { dn.super(storageType, maxSize2Move);
dn.super(storageType, utilization, maxSize2Move);
} }
/** Add a task */ /** Add a task */
@ -565,7 +549,7 @@ Iterator<DBlock> getBlockIterator() {
*/ */
private long getBlockList() throws IOException { private long getBlockList() throws IOException {
final long size = Math.min(MAX_BLOCKS_SIZE_TO_FETCH, blocksToReceive); final long size = Math.min(MAX_BLOCKS_SIZE_TO_FETCH, blocksToReceive);
final BlocksWithLocations newBlocks = nnc.getBlocks(getDatanode(), size); final BlocksWithLocations newBlocks = nnc.getBlocks(getDatanodeInfo(), size);
long bytesReceived = 0; long bytesReceived = 0;
for (BlockWithLocations blk : newBlocks.getBlocks()) { for (BlockWithLocations blk : newBlocks.getBlocks()) {
@ -579,7 +563,7 @@ private long getBlockList() throws IOException {
final String[] datanodeUuids = blk.getDatanodeUuids(); final String[] datanodeUuids = blk.getDatanodeUuids();
final StorageType[] storageTypes = blk.getStorageTypes(); final StorageType[] storageTypes = blk.getStorageTypes();
for (int i = 0; i < datanodeUuids.length; i++) { for (int i = 0; i < datanodeUuids.length; i++) {
final BalancerDatanode.StorageGroup g = storageGroupMap.get( final StorageGroup g = storageGroupMap.get(
datanodeUuids[i], storageTypes[i]); datanodeUuids[i], storageTypes[i]);
if (g != null) { // not unknown if (g != null) { // not unknown
block.addLocation(g); block.addLocation(g);
@ -617,7 +601,7 @@ private boolean isGoodBlockCandidate(DBlock block) {
private PendingMove chooseNextMove() { private PendingMove chooseNextMove() {
for (Iterator<Task> i = tasks.iterator(); i.hasNext();) { for (Iterator<Task> i = tasks.iterator(); i.hasNext();) {
final Task task = i.next(); final Task task = i.next();
final BalancerDatanode target = task.target.getBalancerDatanode(); final DDatanode target = task.target.getDDatanode();
PendingMove pendingBlock = new PendingMove(); PendingMove pendingBlock = new PendingMove();
if (target.addPendingBlock(pendingBlock)) { if (target.addPendingBlock(pendingBlock)) {
// target is not busy, so do a tentative block allocation // target is not busy, so do a tentative block allocation
@ -670,7 +654,7 @@ private void dispatchBlocks() {
final long startTime = Time.monotonicNow(); final long startTime = Time.monotonicNow();
this.blocksToReceive = 2 * getScheduledSize(); this.blocksToReceive = 2 * getScheduledSize();
boolean isTimeUp = false; boolean isTimeUp = false;
int noPendingBlockIteration = 0; int noPendingMoveIteration = 0;
while (!isTimeUp && getScheduledSize() > 0 while (!isTimeUp && getScheduledSize() > 0
&& (!srcBlocks.isEmpty() || blocksToReceive > 0)) { && (!srcBlocks.isEmpty() || blocksToReceive > 0)) {
final PendingMove p = chooseNextMove(); final PendingMove p = chooseNextMove();
@ -699,11 +683,11 @@ public void run() {
return; return;
} }
} else { } else {
// source node cannot find a pendingBlockToMove, iteration +1 // source node cannot find a pending block to move, iteration +1
noPendingBlockIteration++; noPendingMoveIteration++;
// in case no blocks can be moved for source node's task, // in case no blocks can be moved for source node's task,
// jump out of while-loop after 5 iterations. // jump out of while-loop after 5 iterations.
if (noPendingBlockIteration >= MAX_NO_PENDING_MOVE_ITERATIONS) { if (noPendingMoveIteration >= MAX_NO_PENDING_MOVE_ITERATIONS) {
resetScheduledSize(); resetScheduledSize();
} }
} }
@ -726,29 +710,19 @@ public void run() {
} }
} }
Dispatcher(NameNodeConnector theblockpool, Set<String> includedNodes, public Dispatcher(NameNodeConnector nnc, Set<String> includedNodes,
Set<String> excludedNodes, Configuration conf) { Set<String> excludedNodes, long movedWinWidth, int moverThreads,
this.nnc = theblockpool; int dispatcherThreads, int maxConcurrentMovesPerNode, Configuration conf) {
this.keyManager = nnc.getKeyManager(); this.nnc = nnc;
this.excludedNodes = excludedNodes; this.excludedNodes = excludedNodes;
this.includedNodes = includedNodes; this.includedNodes = includedNodes;
this.movedBlocks = new MovedBlocks<StorageGroup>(movedWinWidth);
final long movedWinWidth = conf.getLong(
DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_KEY,
DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_DEFAULT);
movedBlocks = new MovedBlocks<BalancerDatanode.StorageGroup>(movedWinWidth);
this.cluster = NetworkTopology.getInstance(conf); this.cluster = NetworkTopology.getInstance(conf);
this.moveExecutor = Executors.newFixedThreadPool(conf.getInt( this.moveExecutor = Executors.newFixedThreadPool(moverThreads);
DFSConfigKeys.DFS_BALANCER_MOVERTHREADS_KEY, this.dispatchExecutor = Executors.newFixedThreadPool(dispatcherThreads);
DFSConfigKeys.DFS_BALANCER_MOVERTHREADS_DEFAULT)); this.maxConcurrentMovesPerNode = maxConcurrentMovesPerNode;
this.dispatchExecutor = Executors.newFixedThreadPool(conf.getInt(
DFSConfigKeys.DFS_BALANCER_DISPATCHERTHREADS_KEY,
DFSConfigKeys.DFS_BALANCER_DISPATCHERTHREADS_DEFAULT));
this.maxConcurrentMovesPerNode = conf.getInt(
DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY,
DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT);
final boolean fallbackToSimpleAuthAllowed = conf.getBoolean( final boolean fallbackToSimpleAuthAllowed = conf.getBoolean(
CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY, CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY,
@ -784,7 +758,7 @@ long bytesToMove() {
return b; return b;
} }
void add(Source source, BalancerDatanode.StorageGroup target) { void add(Source source, StorageGroup target) {
sources.add(source); sources.add(source);
targets.add(target); targets.add(target);
} }
@ -826,8 +800,8 @@ List<DatanodeStorageReport> init() throws IOException {
return trimmed; return trimmed;
} }
public BalancerDatanode newDatanode(DatanodeStorageReport r) { public DDatanode newDatanode(DatanodeStorageReport r) {
return new BalancerDatanode(r, maxConcurrentMovesPerNode); return new DDatanode(r, maxConcurrentMovesPerNode);
} }
public boolean dispatchAndCheckContinue() throws InterruptedException { public boolean dispatchAndCheckContinue() throws InterruptedException {
@ -884,8 +858,8 @@ static void setBlockMoveWaitTime(long time) {
private void waitForMoveCompletion() { private void waitForMoveCompletion() {
for(;;) { for(;;) {
boolean empty = true; boolean empty = true;
for (BalancerDatanode.StorageGroup t : targets) { for (StorageGroup t : targets) {
if (!t.getBalancerDatanode().isPendingQEmpty()) { if (!t.getDDatanode().isPendingQEmpty()) {
empty = false; empty = false;
break; break;
} }
@ -907,8 +881,8 @@ private void waitForMoveCompletion() {
* 2. the block does not have a replica on the target; * 2. the block does not have a replica on the target;
* 3. doing the move does not reduce the number of racks that the block has * 3. doing the move does not reduce the number of racks that the block has
*/ */
private boolean isGoodBlockCandidate(Source source, private boolean isGoodBlockCandidate(Source source, StorageGroup target,
BalancerDatanode.StorageGroup target, DBlock block) { DBlock block) {
if (source.storageType != target.storageType) { if (source.storageType != target.storageType) {
return false; return false;
} }
@ -933,17 +907,17 @@ && isOnSameNodeGroupWithReplicas(target, block, source)) {
* Determine whether moving the given block replica from source to target * Determine whether moving the given block replica from source to target
* would reduce the number of racks of the block replicas. * would reduce the number of racks of the block replicas.
*/ */
private boolean reduceNumOfRacks(Source source, private boolean reduceNumOfRacks(Source source, StorageGroup target,
BalancerDatanode.StorageGroup target, DBlock block) { DBlock block) {
final DatanodeInfo sourceDn = source.getDatanode(); final DatanodeInfo sourceDn = source.getDatanodeInfo();
if (cluster.isOnSameRack(sourceDn, target.getDatanode())) { if (cluster.isOnSameRack(sourceDn, target.getDatanodeInfo())) {
// source and target are on the same rack // source and target are on the same rack
return false; return false;
} }
boolean notOnSameRack = true; boolean notOnSameRack = true;
synchronized (block) { synchronized (block) {
for (BalancerDatanode.StorageGroup loc : block.getLocations()) { for (StorageGroup loc : block.getLocations()) {
if (cluster.isOnSameRack(loc.getDatanode(), target.getDatanode())) { if (cluster.isOnSameRack(loc.getDatanodeInfo(), target.getDatanodeInfo())) {
notOnSameRack = false; notOnSameRack = false;
break; break;
} }
@ -953,8 +927,8 @@ private boolean reduceNumOfRacks(Source source,
// target is not on the same rack as any replica // target is not on the same rack as any replica
return false; return false;
} }
for (BalancerDatanode.StorageGroup g : block.getLocations()) { for (StorageGroup g : block.getLocations()) {
if (g != source && cluster.isOnSameRack(g.getDatanode(), sourceDn)) { if (g != source && cluster.isOnSameRack(g.getDatanodeInfo(), sourceDn)) {
// source is on the same rack of another replica // source is on the same rack of another replica
return false; return false;
} }
@ -971,10 +945,10 @@ private boolean reduceNumOfRacks(Source source,
* group with target * group with target
*/ */
private boolean isOnSameNodeGroupWithReplicas( private boolean isOnSameNodeGroupWithReplicas(
BalancerDatanode.StorageGroup target, DBlock block, Source source) { StorageGroup target, DBlock block, Source source) {
final DatanodeInfo targetDn = target.getDatanode(); final DatanodeInfo targetDn = target.getDatanodeInfo();
for (BalancerDatanode.StorageGroup g : block.getLocations()) { for (StorageGroup g : block.getLocations()) {
if (g != source && cluster.isOnSameNodeGroup(g.getDatanode(), targetDn)) { if (g != source && cluster.isOnSameNodeGroup(g.getDatanodeInfo(), targetDn)) {
return true; return true;
} }
} }

View File

@ -0,0 +1,44 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.balancer;
/**
* Exit status - The values associated with each exit status is directly mapped
* to the process's exit code in command line.
*/
public enum ExitStatus {
SUCCESS(0),
IN_PROGRESS(1),
ALREADY_RUNNING(-1),
NO_MOVE_BLOCK(-2),
NO_MOVE_PROGRESS(-3),
IO_EXCEPTION(-4),
ILLEGAL_ARGUMENTS(-5),
INTERRUPTED(-6);
private final int code;
private ExitStatus(int code) {
this.code = code;
}
/** @return the command line exit code. */
public int getExitCode() {
return code;
}
}

View File

@ -570,10 +570,10 @@ private void runBalancer(Configuration conf,
final int r = Balancer.run(namenodes, p, conf); final int r = Balancer.run(namenodes, p, conf);
if (conf.getInt(DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY, if (conf.getInt(DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY,
DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT) ==0) { DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT) ==0) {
assertEquals(Balancer.ReturnStatus.NO_MOVE_PROGRESS.code, r); assertEquals(ExitStatus.NO_MOVE_PROGRESS.getExitCode(), r);
return; return;
} else { } else {
assertEquals(Balancer.ReturnStatus.SUCCESS.code, r); assertEquals(ExitStatus.SUCCESS.getExitCode(), r);
} }
waitForHeartBeat(totalUsedSpace, totalCapacity, client, cluster); waitForHeartBeat(totalUsedSpace, totalCapacity, client, cluster);
LOG.info("Rebalancing with default ctor."); LOG.info("Rebalancing with default ctor.");
@ -717,7 +717,7 @@ public void testUnknownDatanode() throws Exception {
Balancer.Parameters.DEFAULT.threshold, Balancer.Parameters.DEFAULT.threshold,
datanodes, Balancer.Parameters.DEFAULT.nodesToBeIncluded); datanodes, Balancer.Parameters.DEFAULT.nodesToBeIncluded);
final int r = Balancer.run(namenodes, p, conf); final int r = Balancer.run(namenodes, p, conf);
assertEquals(Balancer.ReturnStatus.SUCCESS.code, r); assertEquals(ExitStatus.SUCCESS.getExitCode(), r);
} finally { } finally {
cluster.shutdown(); cluster.shutdown();
} }

View File

@ -98,7 +98,7 @@ public void testBalancerWithHANameNodes() throws Exception {
assertEquals(1, namenodes.size()); assertEquals(1, namenodes.size());
assertTrue(namenodes.contains(HATestUtil.getLogicalUri(cluster))); assertTrue(namenodes.contains(HATestUtil.getLogicalUri(cluster)));
final int r = Balancer.run(namenodes, Balancer.Parameters.DEFAULT, conf); final int r = Balancer.run(namenodes, Balancer.Parameters.DEFAULT, conf);
assertEquals(Balancer.ReturnStatus.SUCCESS.code, r); assertEquals(ExitStatus.SUCCESS.getExitCode(), r);
TestBalancer.waitForBalancer(totalUsedSpace, totalCapacity, client, TestBalancer.waitForBalancer(totalUsedSpace, totalCapacity, client,
cluster, Balancer.Parameters.DEFAULT); cluster, Balancer.Parameters.DEFAULT);
} finally { } finally {

View File

@ -160,7 +160,7 @@ static void runBalancer(Suite s,
// start rebalancing // start rebalancing
final Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(s.conf); final Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(s.conf);
final int r = Balancer.run(namenodes, Balancer.Parameters.DEFAULT, s.conf); final int r = Balancer.run(namenodes, Balancer.Parameters.DEFAULT, s.conf);
Assert.assertEquals(Balancer.ReturnStatus.SUCCESS.code, r); Assert.assertEquals(ExitStatus.SUCCESS.getExitCode(), r);
LOG.info("BALANCER 2"); LOG.info("BALANCER 2");
wait(s.clients, totalUsed, totalCapacity); wait(s.clients, totalUsed, totalCapacity);

View File

@ -176,7 +176,7 @@ private void runBalancer(Configuration conf,
// start rebalancing // start rebalancing
Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf); Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
final int r = Balancer.run(namenodes, Balancer.Parameters.DEFAULT, conf); final int r = Balancer.run(namenodes, Balancer.Parameters.DEFAULT, conf);
assertEquals(Balancer.ReturnStatus.SUCCESS.code, r); assertEquals(ExitStatus.SUCCESS.getExitCode(), r);
waitForHeartBeat(totalUsedSpace, totalCapacity); waitForHeartBeat(totalUsedSpace, totalCapacity);
LOG.info("Rebalancing with default factor."); LOG.info("Rebalancing with default factor.");
@ -190,8 +190,8 @@ private void runBalancerCanFinish(Configuration conf,
// start rebalancing // start rebalancing
Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf); Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
final int r = Balancer.run(namenodes, Balancer.Parameters.DEFAULT, conf); final int r = Balancer.run(namenodes, Balancer.Parameters.DEFAULT, conf);
Assert.assertTrue(r == Balancer.ReturnStatus.SUCCESS.code || Assert.assertTrue(r == ExitStatus.SUCCESS.getExitCode() ||
(r == Balancer.ReturnStatus.NO_MOVE_PROGRESS.code)); (r == ExitStatus.NO_MOVE_PROGRESS.getExitCode()));
waitForHeartBeat(totalUsedSpace, totalCapacity); waitForHeartBeat(totalUsedSpace, totalCapacity);
LOG.info("Rebalancing with default factor."); LOG.info("Rebalancing with default factor.");
} }