diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index eca0009be90..8f70bb1df60 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -387,6 +387,9 @@ Release 2.6.0 - UNRELEASED HDFS-6828. Separate block replica dispatching from Balancer. (szetszwo via jing9) + HDFS-6837. Code cleanup for Balancer and Dispatcher. (szetszwo via + jing9) + OPTIMIZATIONS HDFS-6690. Deduplicate xattr names in memory. (wang) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java index 391d7edefc1..7661d25ee7f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java @@ -44,7 +44,8 @@ import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.StorageType; -import org.apache.hadoop.hdfs.server.balancer.Dispatcher.BalancerDatanode; +import org.apache.hadoop.hdfs.server.balancer.Dispatcher.DDatanode; +import org.apache.hadoop.hdfs.server.balancer.Dispatcher.DDatanode.StorageGroup; import org.apache.hadoop.hdfs.server.balancer.Dispatcher.Source; import org.apache.hadoop.hdfs.server.balancer.Dispatcher.Task; import org.apache.hadoop.hdfs.server.balancer.Dispatcher.Util; @@ -184,10 +185,10 @@ public class Balancer { // all data node lists private final Collection overUtilized = new LinkedList(); private final Collection aboveAvgUtilized = new LinkedList(); - private final Collection belowAvgUtilized - = new LinkedList(); - private final Collection underUtilized - = new LinkedList(); + private final Collection belowAvgUtilized + = new LinkedList(); + private final Collection underUtilized + = new LinkedList(); /* Check that this Balancer is compatible with the Block Placement Policy * used by the Namenode. @@ -209,8 +210,22 @@ public class Balancer { * when connection fails. */ Balancer(NameNodeConnector theblockpool, Parameters p, Configuration conf) { + final long movedWinWidth = conf.getLong( + DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_KEY, + DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_DEFAULT); + final int moverThreads = conf.getInt( + DFSConfigKeys.DFS_BALANCER_MOVERTHREADS_KEY, + DFSConfigKeys.DFS_BALANCER_MOVERTHREADS_DEFAULT); + final int dispatcherThreads = conf.getInt( + DFSConfigKeys.DFS_BALANCER_DISPATCHERTHREADS_KEY, + DFSConfigKeys.DFS_BALANCER_DISPATCHERTHREADS_DEFAULT); + final int maxConcurrentMovesPerNode = conf.getInt( + DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY, + DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT); + this.dispatcher = new Dispatcher(theblockpool, p.nodesToBeIncluded, - p.nodesToBeExcluded, conf); + p.nodesToBeExcluded, movedWinWidth, moverThreads, dispatcherThreads, + maxConcurrentMovesPerNode, conf); this.threshold = p.threshold; this.policy = p.policy; } @@ -255,7 +270,7 @@ public class Balancer { // over-utilized, above-average, below-average and under-utilized. long overLoadedBytes = 0L, underLoadedBytes = 0L; for(DatanodeStorageReport r : reports) { - final BalancerDatanode dn = dispatcher.newDatanode(r); + final DDatanode dn = dispatcher.newDatanode(r); for(StorageType t : StorageType.asList()) { final Double utilization = policy.getUtilization(r, t); if (utilization == null) { // datanode does not have such storage type @@ -268,9 +283,9 @@ public class Balancer { final long maxSize2Move = computeMaxSize2Move(capacity, getRemaining(r, t), utilizationDiff, threshold); - final BalancerDatanode.StorageGroup g; + final StorageGroup g; if (utilizationDiff > 0) { - final Source s = dn.addSource(t, utilization, maxSize2Move, dispatcher); + final Source s = dn.addSource(t, maxSize2Move, dispatcher); if (thresholdDiff <= 0) { // within threshold aboveAvgUtilized.add(s); } else { @@ -279,7 +294,7 @@ public class Balancer { } g = s; } else { - g = dn.addStorageGroup(t, utilization, maxSize2Move); + g = dn.addStorageGroup(t, maxSize2Move); if (thresholdDiff <= 0) { // within threshold belowAvgUtilized.add(g); } else { @@ -328,7 +343,7 @@ public class Balancer { logUtilizationCollection("underutilized", underUtilized); } - private static + private static void logUtilizationCollection(String name, Collection items) { LOG.info(items.size() + " " + name + ": " + items); } @@ -381,8 +396,7 @@ public class Balancer { * datanodes or the candidates are source nodes with (utilization > Avg), and * the others are target nodes with (utilization < Avg). */ - private + private void chooseStorageGroups(Collection groups, Collection candidates, Matcher matcher) { for(final Iterator i = groups.iterator(); i.hasNext();) { @@ -398,9 +412,8 @@ public class Balancer { * For the given datanode, choose a candidate and then schedule it. * @return true if a candidate is chosen; false if no candidates is chosen. */ - private - boolean choose4One(BalancerDatanode.StorageGroup g, - Collection candidates, Matcher matcher) { + private boolean choose4One(StorageGroup g, + Collection candidates, Matcher matcher) { final Iterator i = candidates.iterator(); final C chosen = chooseCandidate(g, i, matcher); @@ -418,8 +431,7 @@ public class Balancer { return true; } - private void matchSourceWithTargetToMove(Source source, - BalancerDatanode.StorageGroup target) { + private void matchSourceWithTargetToMove(Source source, StorageGroup target) { long size = Math.min(source.availableSizeToMove(), target.availableSizeToMove()); final Task task = new Task(target, size); source.addTask(task); @@ -430,8 +442,7 @@ public class Balancer { } /** Choose a candidate for the given datanode. */ - private + private C chooseCandidate(G g, Iterator candidates, Matcher matcher) { if (g.hasSpaceForScheduling()) { for(; candidates.hasNext(); ) { @@ -439,7 +450,7 @@ public class Balancer { if (!c.hasSpaceForScheduling()) { candidates.remove(); } else if (matcher.match(dispatcher.getCluster(), - g.getDatanode(), c.getDatanode())) { + g.getDatanodeInfo(), c.getDatanodeInfo())) { return c; } } @@ -457,34 +468,15 @@ public class Balancer { dispatcher.reset(conf);; } - // Exit status - enum ReturnStatus { - // These int values will map directly to the balancer process's exit code. - SUCCESS(0), - IN_PROGRESS(1), - ALREADY_RUNNING(-1), - NO_MOVE_BLOCK(-2), - NO_MOVE_PROGRESS(-3), - IO_EXCEPTION(-4), - ILLEGAL_ARGS(-5), - INTERRUPTED(-6); - - final int code; - - ReturnStatus(int code) { - this.code = code; - } - } - /** Run an iteration for all datanodes. */ - private ReturnStatus run(int iteration, Formatter formatter, + private ExitStatus run(int iteration, Formatter formatter, Configuration conf) { try { final List reports = dispatcher.init(); final long bytesLeftToMove = init(reports); if (bytesLeftToMove == 0) { System.out.println("The cluster is balanced. Exiting..."); - return ReturnStatus.SUCCESS; + return ExitStatus.SUCCESS; } else { LOG.info( "Need to move "+ StringUtils.byteDesc(bytesLeftToMove) + " to make the cluster balanced." ); @@ -498,7 +490,7 @@ public class Balancer { final long bytesToMove = chooseStorageGroups(); if (bytesToMove == 0) { System.out.println("No block can be moved. Exiting..."); - return ReturnStatus.NO_MOVE_BLOCK; + return ExitStatus.NO_MOVE_BLOCK; } else { LOG.info( "Will move " + StringUtils.byteDesc(bytesToMove) + " in this iteration"); @@ -519,19 +511,19 @@ public class Balancer { * Exit no byte has been moved for 5 consecutive iterations. */ if (!dispatcher.dispatchAndCheckContinue()) { - return ReturnStatus.NO_MOVE_PROGRESS; + return ExitStatus.NO_MOVE_PROGRESS; } - return ReturnStatus.IN_PROGRESS; + return ExitStatus.IN_PROGRESS; } catch (IllegalArgumentException e) { System.out.println(e + ". Exiting ..."); - return ReturnStatus.ILLEGAL_ARGS; + return ExitStatus.ILLEGAL_ARGUMENTS; } catch (IOException e) { System.out.println(e + ". Exiting ..."); - return ReturnStatus.IO_EXCEPTION; + return ExitStatus.IO_EXCEPTION; } catch (InterruptedException e) { System.out.println(e + ". Exiting ..."); - return ReturnStatus.INTERRUPTED; + return ExitStatus.INTERRUPTED; } finally { dispatcher.shutdownNow(); } @@ -570,14 +562,14 @@ public class Balancer { Collections.shuffle(connectors); for(NameNodeConnector nnc : connectors) { final Balancer b = new Balancer(nnc, p, conf); - final ReturnStatus r = b.run(iteration, formatter, conf); + final ExitStatus r = b.run(iteration, formatter, conf); // clean all lists b.resetData(conf); - if (r == ReturnStatus.IN_PROGRESS) { + if (r == ExitStatus.IN_PROGRESS) { done = false; - } else if (r != ReturnStatus.SUCCESS) { + } else if (r != ExitStatus.SUCCESS) { //must be an error statue, return. - return r.code; + return r.getExitCode(); } } @@ -590,7 +582,7 @@ public class Balancer { nnc.close(); } } - return ReturnStatus.SUCCESS.code; + return ExitStatus.SUCCESS.getExitCode(); } /* Given elaspedTime in ms, return a printable string */ @@ -661,10 +653,10 @@ public class Balancer { return Balancer.run(namenodes, parse(args), conf); } catch (IOException e) { System.out.println(e + ". Exiting ..."); - return ReturnStatus.IO_EXCEPTION.code; + return ExitStatus.IO_EXCEPTION.getExitCode(); } catch (InterruptedException e) { System.out.println(e + ". Exiting ..."); - return ReturnStatus.INTERRUPTED.code; + return ExitStatus.INTERRUPTED.getExitCode(); } finally { System.out.format("%-24s ", DateFormat.getDateTimeInstance().format(new Date())); System.out.println("Balancing took " + time2Str(Time.now()-startTime)); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java index e8236bbe1c0..4a6f96be7e5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java @@ -48,7 +48,6 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeys; -import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.StorageType; import org.apache.hadoop.hdfs.protocol.Block; @@ -63,6 +62,7 @@ import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; +import org.apache.hadoop.hdfs.server.balancer.Dispatcher.DDatanode.StorageGroup; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations; import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations; @@ -91,7 +91,6 @@ public class Dispatcher { // minutes private final NameNodeConnector nnc; - private final KeyManager keyManager; private final SaslDataTransferClient saslClient; /** Set of datanodes to be excluded. */ @@ -100,11 +99,10 @@ public class Dispatcher { private final Set includedNodes; private final Collection sources = new HashSet(); - private final Collection targets - = new HashSet(); + private final Collection targets = new HashSet(); private final GlobalBlockMap globalBlocks = new GlobalBlockMap(); - private final MovedBlocks movedBlocks; + private final MovedBlocks movedBlocks; /** Map (datanodeUuid,storageType -> StorageGroup) */ private final StorageGroupMap storageGroupMap = new StorageGroupMap(); @@ -135,8 +133,7 @@ public class Dispatcher { } /** Remove all blocks except for the moved blocks. */ - private void removeAllButRetain( - MovedBlocks movedBlocks) { + private void removeAllButRetain(MovedBlocks movedBlocks) { for (Iterator i = map.keySet().iterator(); i.hasNext();) { if (!movedBlocks.contains(i.next())) { i.remove(); @@ -150,17 +147,15 @@ public class Dispatcher { return datanodeUuid + ":" + storageType; } - private final Map map - = new HashMap(); + private final Map map = new HashMap(); - BalancerDatanode.StorageGroup get(String datanodeUuid, - StorageType storageType) { + StorageGroup get(String datanodeUuid, StorageType storageType) { return map.get(toKey(datanodeUuid, storageType)); } - void put(BalancerDatanode.StorageGroup g) { - final String key = toKey(g.getDatanode().getDatanodeUuid(), g.storageType); - final BalancerDatanode.StorageGroup existing = map.put(key, g); + void put(StorageGroup g) { + final String key = toKey(g.getDatanodeInfo().getDatanodeUuid(), g.storageType); + final StorageGroup existing = map.put(key, g); Preconditions.checkState(existing == null); } @@ -177,8 +172,8 @@ public class Dispatcher { private class PendingMove { private DBlock block; private Source source; - private BalancerDatanode proxySource; - private BalancerDatanode.StorageGroup target; + private DDatanode proxySource; + private StorageGroup target; private PendingMove() { } @@ -235,24 +230,24 @@ public class Dispatcher { * @return true if a proxy is found; otherwise false */ private boolean chooseProxySource() { - final DatanodeInfo targetDN = target.getDatanode(); + final DatanodeInfo targetDN = target.getDatanodeInfo(); // if node group is supported, first try add nodes in the same node group if (cluster.isNodeGroupAware()) { - for (BalancerDatanode.StorageGroup loc : block.getLocations()) { - if (cluster.isOnSameNodeGroup(loc.getDatanode(), targetDN) + for (StorageGroup loc : block.getLocations()) { + if (cluster.isOnSameNodeGroup(loc.getDatanodeInfo(), targetDN) && addTo(loc)) { return true; } } } // check if there is replica which is on the same rack with the target - for (BalancerDatanode.StorageGroup loc : block.getLocations()) { - if (cluster.isOnSameRack(loc.getDatanode(), targetDN) && addTo(loc)) { + for (StorageGroup loc : block.getLocations()) { + if (cluster.isOnSameRack(loc.getDatanodeInfo(), targetDN) && addTo(loc)) { return true; } } // find out a non-busy replica - for (BalancerDatanode.StorageGroup loc : block.getLocations()) { + for (StorageGroup loc : block.getLocations()) { if (addTo(loc)) { return true; } @@ -261,10 +256,10 @@ public class Dispatcher { } /** add to a proxy source for specific block movement */ - private boolean addTo(BalancerDatanode.StorageGroup g) { - final BalancerDatanode bdn = g.getBalancerDatanode(); - if (bdn.addPendingBlock(this)) { - proxySource = bdn; + private boolean addTo(StorageGroup g) { + final DDatanode dn = g.getDDatanode(); + if (dn.addPendingBlock(this)) { + proxySource = dn; return true; } return false; @@ -281,14 +276,13 @@ public class Dispatcher { DataInputStream in = null; try { sock.connect( - NetUtils.createSocketAddr(target.getDatanode().getXferAddr()), + NetUtils.createSocketAddr(target.getDatanodeInfo().getXferAddr()), HdfsServerConstants.READ_TIMEOUT); /* * Unfortunately we don't have a good way to know if the Datanode is * taking a really long time to move a block, OR something has gone * wrong and it's never going to finish. To deal with this scenario, we - * set a long timeout (20 minutes) to avoid hanging the balancer - * indefinitely. + * set a long timeout (20 minutes) to avoid hanging indefinitely. */ sock.setSoTimeout(BLOCK_MOVE_READ_TIMEOUT); @@ -298,9 +292,10 @@ public class Dispatcher { InputStream unbufIn = sock.getInputStream(); ExtendedBlock eb = new ExtendedBlock(nnc.getBlockpoolID(), block.getBlock()); - Token accessToken = keyManager.getAccessToken(eb); + final KeyManager km = nnc.getKeyManager(); + Token accessToken = km.getAccessToken(eb); IOStreamPair saslStreams = saslClient.socketSend(sock, unbufOut, - unbufIn, keyManager, accessToken, target.getDatanode()); + unbufIn, km, accessToken, target.getDatanodeInfo()); unbufOut = saslStreams.out; unbufIn = saslStreams.in; out = new DataOutputStream(new BufferedOutputStream(unbufOut, @@ -314,21 +309,19 @@ public class Dispatcher { LOG.info("Successfully moved " + this); } catch (IOException e) { LOG.warn("Failed to move " + this + ": " + e.getMessage()); - /* - * proxy or target may have an issue, insert a small delay before using - * these nodes further. This avoids a potential storm of - * "threads quota exceeded" Warnings when the balancer gets out of sync - * with work going on in datanode. - */ + // Proxy or target may have some issues, delay before using these nodes + // further in order to avoid a potential storm of "threads quota + // exceeded" warnings when the dispatcher gets out of sync with work + // going on in datanodes. proxySource.activateDelay(DELAY_AFTER_ERROR); - target.getBalancerDatanode().activateDelay(DELAY_AFTER_ERROR); + target.getDDatanode().activateDelay(DELAY_AFTER_ERROR); } finally { IOUtils.closeStream(out); IOUtils.closeStream(in); IOUtils.closeSocket(sock); proxySource.removePendingBlock(this); - target.getBalancerDatanode().removePendingBlock(this); + target.getDDatanode().removePendingBlock(this); synchronized (this) { reset(); @@ -342,8 +335,8 @@ public class Dispatcher { /** Send a block replace request to the output stream */ private void sendRequest(DataOutputStream out, ExtendedBlock eb, Token accessToken) throws IOException { - new Sender(out).replaceBlock(eb, target.storageType, accessToken, source - .getDatanode().getDatanodeUuid(), proxySource.datanode); + new Sender(out).replaceBlock(eb, target.storageType, accessToken, + source.getDatanodeInfo().getDatanodeUuid(), proxySource.datanode); } /** Receive a block copy response from the input stream */ @@ -368,8 +361,7 @@ public class Dispatcher { } /** A class for keeping track of block locations in the dispatcher. */ - private static class DBlock extends - MovedBlocks.Locations { + private static class DBlock extends MovedBlocks.Locations { DBlock(Block block) { super(block); } @@ -377,10 +369,10 @@ public class Dispatcher { /** The class represents a desired move. */ static class Task { - private final BalancerDatanode.StorageGroup target; + private final StorageGroup target; private long size; // bytes scheduled to move - Task(BalancerDatanode.StorageGroup target, long size) { + Task(StorageGroup target, long size) { this.target = target; this.size = size; } @@ -391,28 +383,25 @@ public class Dispatcher { } /** A class that keeps track of a datanode. */ - static class BalancerDatanode { + static class DDatanode { /** A group of storages in a datanode with the same storage type. */ class StorageGroup { final StorageType storageType; - final double utilization; final long maxSize2Move; private long scheduledSize = 0L; - private StorageGroup(StorageType storageType, double utilization, - long maxSize2Move) { + private StorageGroup(StorageType storageType, long maxSize2Move) { this.storageType = storageType; - this.utilization = utilization; this.maxSize2Move = maxSize2Move; } - BalancerDatanode getBalancerDatanode() { - return BalancerDatanode.this; + private DDatanode getDDatanode() { + return DDatanode.this; } - DatanodeInfo getDatanode() { - return BalancerDatanode.this.datanode; + DatanodeInfo getDatanodeInfo() { + return DDatanode.this.datanode; } /** Decide if still need to move more bytes */ @@ -447,7 +436,7 @@ public class Dispatcher { @Override public String toString() { - return "" + utilization; + return getDisplayName(); } } @@ -461,10 +450,10 @@ public class Dispatcher { @Override public String toString() { - return getClass().getSimpleName() + ":" + datanode + ":" + storageMap; + return getClass().getSimpleName() + ":" + datanode + ":" + storageMap.values(); } - private BalancerDatanode(DatanodeStorageReport r, int maxConcurrentMoves) { + private DDatanode(DatanodeStorageReport r, int maxConcurrentMoves) { this.datanode = r.getDatanodeInfo(); this.maxConcurrentMoves = maxConcurrentMoves; this.pendings = new ArrayList(maxConcurrentMoves); @@ -475,18 +464,14 @@ public class Dispatcher { Preconditions.checkState(existing == null); } - StorageGroup addStorageGroup(StorageType storageType, double utilization, - long maxSize2Move) { - final StorageGroup g = new StorageGroup(storageType, utilization, - maxSize2Move); + StorageGroup addStorageGroup(StorageType storageType, long maxSize2Move) { + final StorageGroup g = new StorageGroup(storageType, maxSize2Move); put(storageType, g); return g; } - Source addSource(StorageType storageType, double utilization, - long maxSize2Move, Dispatcher balancer) { - final Source s = balancer.new Source(storageType, utilization, - maxSize2Move, this); + Source addSource(StorageType storageType, long maxSize2Move, Dispatcher d) { + final Source s = d.new Source(storageType, maxSize2Move, this); put(storageType, s); return s; } @@ -528,7 +513,7 @@ public class Dispatcher { } /** A node that can be the sources of a block move */ - class Source extends BalancerDatanode.StorageGroup { + class Source extends DDatanode.StorageGroup { private final List tasks = new ArrayList(2); private long blocksToReceive = 0L; @@ -539,9 +524,8 @@ public class Dispatcher { */ private final List srcBlocks = new ArrayList(); - private Source(StorageType storageType, double utilization, - long maxSize2Move, BalancerDatanode dn) { - dn.super(storageType, utilization, maxSize2Move); + private Source(StorageType storageType, long maxSize2Move, DDatanode dn) { + dn.super(storageType, maxSize2Move); } /** Add a task */ @@ -565,7 +549,7 @@ public class Dispatcher { */ private long getBlockList() throws IOException { final long size = Math.min(MAX_BLOCKS_SIZE_TO_FETCH, blocksToReceive); - final BlocksWithLocations newBlocks = nnc.getBlocks(getDatanode(), size); + final BlocksWithLocations newBlocks = nnc.getBlocks(getDatanodeInfo(), size); long bytesReceived = 0; for (BlockWithLocations blk : newBlocks.getBlocks()) { @@ -579,7 +563,7 @@ public class Dispatcher { final String[] datanodeUuids = blk.getDatanodeUuids(); final StorageType[] storageTypes = blk.getStorageTypes(); for (int i = 0; i < datanodeUuids.length; i++) { - final BalancerDatanode.StorageGroup g = storageGroupMap.get( + final StorageGroup g = storageGroupMap.get( datanodeUuids[i], storageTypes[i]); if (g != null) { // not unknown block.addLocation(g); @@ -617,7 +601,7 @@ public class Dispatcher { private PendingMove chooseNextMove() { for (Iterator i = tasks.iterator(); i.hasNext();) { final Task task = i.next(); - final BalancerDatanode target = task.target.getBalancerDatanode(); + final DDatanode target = task.target.getDDatanode(); PendingMove pendingBlock = new PendingMove(); if (target.addPendingBlock(pendingBlock)) { // target is not busy, so do a tentative block allocation @@ -670,7 +654,7 @@ public class Dispatcher { final long startTime = Time.monotonicNow(); this.blocksToReceive = 2 * getScheduledSize(); boolean isTimeUp = false; - int noPendingBlockIteration = 0; + int noPendingMoveIteration = 0; while (!isTimeUp && getScheduledSize() > 0 && (!srcBlocks.isEmpty() || blocksToReceive > 0)) { final PendingMove p = chooseNextMove(); @@ -699,11 +683,11 @@ public class Dispatcher { return; } } else { - // source node cannot find a pendingBlockToMove, iteration +1 - noPendingBlockIteration++; + // source node cannot find a pending block to move, iteration +1 + noPendingMoveIteration++; // in case no blocks can be moved for source node's task, // jump out of while-loop after 5 iterations. - if (noPendingBlockIteration >= MAX_NO_PENDING_MOVE_ITERATIONS) { + if (noPendingMoveIteration >= MAX_NO_PENDING_MOVE_ITERATIONS) { resetScheduledSize(); } } @@ -726,29 +710,19 @@ public class Dispatcher { } } - Dispatcher(NameNodeConnector theblockpool, Set includedNodes, - Set excludedNodes, Configuration conf) { - this.nnc = theblockpool; - this.keyManager = nnc.getKeyManager(); + public Dispatcher(NameNodeConnector nnc, Set includedNodes, + Set excludedNodes, long movedWinWidth, int moverThreads, + int dispatcherThreads, int maxConcurrentMovesPerNode, Configuration conf) { + this.nnc = nnc; this.excludedNodes = excludedNodes; this.includedNodes = includedNodes; - - final long movedWinWidth = conf.getLong( - DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_KEY, - DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_DEFAULT); - movedBlocks = new MovedBlocks(movedWinWidth); + this.movedBlocks = new MovedBlocks(movedWinWidth); this.cluster = NetworkTopology.getInstance(conf); - this.moveExecutor = Executors.newFixedThreadPool(conf.getInt( - DFSConfigKeys.DFS_BALANCER_MOVERTHREADS_KEY, - DFSConfigKeys.DFS_BALANCER_MOVERTHREADS_DEFAULT)); - this.dispatchExecutor = Executors.newFixedThreadPool(conf.getInt( - DFSConfigKeys.DFS_BALANCER_DISPATCHERTHREADS_KEY, - DFSConfigKeys.DFS_BALANCER_DISPATCHERTHREADS_DEFAULT)); - this.maxConcurrentMovesPerNode = conf.getInt( - DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY, - DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT); + this.moveExecutor = Executors.newFixedThreadPool(moverThreads); + this.dispatchExecutor = Executors.newFixedThreadPool(dispatcherThreads); + this.maxConcurrentMovesPerNode = maxConcurrentMovesPerNode; final boolean fallbackToSimpleAuthAllowed = conf.getBoolean( CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY, @@ -784,7 +758,7 @@ public class Dispatcher { return b; } - void add(Source source, BalancerDatanode.StorageGroup target) { + void add(Source source, StorageGroup target) { sources.add(source); targets.add(target); } @@ -826,8 +800,8 @@ public class Dispatcher { return trimmed; } - public BalancerDatanode newDatanode(DatanodeStorageReport r) { - return new BalancerDatanode(r, maxConcurrentMovesPerNode); + public DDatanode newDatanode(DatanodeStorageReport r) { + return new DDatanode(r, maxConcurrentMovesPerNode); } public boolean dispatchAndCheckContinue() throws InterruptedException { @@ -884,8 +858,8 @@ public class Dispatcher { private void waitForMoveCompletion() { for(;;) { boolean empty = true; - for (BalancerDatanode.StorageGroup t : targets) { - if (!t.getBalancerDatanode().isPendingQEmpty()) { + for (StorageGroup t : targets) { + if (!t.getDDatanode().isPendingQEmpty()) { empty = false; break; } @@ -907,8 +881,8 @@ public class Dispatcher { * 2. the block does not have a replica on the target; * 3. doing the move does not reduce the number of racks that the block has */ - private boolean isGoodBlockCandidate(Source source, - BalancerDatanode.StorageGroup target, DBlock block) { + private boolean isGoodBlockCandidate(Source source, StorageGroup target, + DBlock block) { if (source.storageType != target.storageType) { return false; } @@ -933,17 +907,17 @@ public class Dispatcher { * Determine whether moving the given block replica from source to target * would reduce the number of racks of the block replicas. */ - private boolean reduceNumOfRacks(Source source, - BalancerDatanode.StorageGroup target, DBlock block) { - final DatanodeInfo sourceDn = source.getDatanode(); - if (cluster.isOnSameRack(sourceDn, target.getDatanode())) { + private boolean reduceNumOfRacks(Source source, StorageGroup target, + DBlock block) { + final DatanodeInfo sourceDn = source.getDatanodeInfo(); + if (cluster.isOnSameRack(sourceDn, target.getDatanodeInfo())) { // source and target are on the same rack return false; } boolean notOnSameRack = true; synchronized (block) { - for (BalancerDatanode.StorageGroup loc : block.getLocations()) { - if (cluster.isOnSameRack(loc.getDatanode(), target.getDatanode())) { + for (StorageGroup loc : block.getLocations()) { + if (cluster.isOnSameRack(loc.getDatanodeInfo(), target.getDatanodeInfo())) { notOnSameRack = false; break; } @@ -953,8 +927,8 @@ public class Dispatcher { // target is not on the same rack as any replica return false; } - for (BalancerDatanode.StorageGroup g : block.getLocations()) { - if (g != source && cluster.isOnSameRack(g.getDatanode(), sourceDn)) { + for (StorageGroup g : block.getLocations()) { + if (g != source && cluster.isOnSameRack(g.getDatanodeInfo(), sourceDn)) { // source is on the same rack of another replica return false; } @@ -971,10 +945,10 @@ public class Dispatcher { * group with target */ private boolean isOnSameNodeGroupWithReplicas( - BalancerDatanode.StorageGroup target, DBlock block, Source source) { - final DatanodeInfo targetDn = target.getDatanode(); - for (BalancerDatanode.StorageGroup g : block.getLocations()) { - if (g != source && cluster.isOnSameNodeGroup(g.getDatanode(), targetDn)) { + StorageGroup target, DBlock block, Source source) { + final DatanodeInfo targetDn = target.getDatanodeInfo(); + for (StorageGroup g : block.getLocations()) { + if (g != source && cluster.isOnSameNodeGroup(g.getDatanodeInfo(), targetDn)) { return true; } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/ExitStatus.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/ExitStatus.java new file mode 100644 index 00000000000..e36258ffca7 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/ExitStatus.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.balancer; + +/** + * Exit status - The values associated with each exit status is directly mapped + * to the process's exit code in command line. + */ +public enum ExitStatus { + SUCCESS(0), + IN_PROGRESS(1), + ALREADY_RUNNING(-1), + NO_MOVE_BLOCK(-2), + NO_MOVE_PROGRESS(-3), + IO_EXCEPTION(-4), + ILLEGAL_ARGUMENTS(-5), + INTERRUPTED(-6); + + private final int code; + + private ExitStatus(int code) { + this.code = code; + } + + /** @return the command line exit code. */ + public int getExitCode() { + return code; + } +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java index 2e59daf8d08..d509668bdc3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java @@ -570,10 +570,10 @@ public class TestBalancer { final int r = Balancer.run(namenodes, p, conf); if (conf.getInt(DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY, DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT) ==0) { - assertEquals(Balancer.ReturnStatus.NO_MOVE_PROGRESS.code, r); + assertEquals(ExitStatus.NO_MOVE_PROGRESS.getExitCode(), r); return; } else { - assertEquals(Balancer.ReturnStatus.SUCCESS.code, r); + assertEquals(ExitStatus.SUCCESS.getExitCode(), r); } waitForHeartBeat(totalUsedSpace, totalCapacity, client, cluster); LOG.info("Rebalancing with default ctor."); @@ -717,7 +717,7 @@ public class TestBalancer { Balancer.Parameters.DEFAULT.threshold, datanodes, Balancer.Parameters.DEFAULT.nodesToBeIncluded); final int r = Balancer.run(namenodes, p, conf); - assertEquals(Balancer.ReturnStatus.SUCCESS.code, r); + assertEquals(ExitStatus.SUCCESS.getExitCode(), r); } finally { cluster.shutdown(); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithHANameNodes.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithHANameNodes.java index c543f73695b..d9d70d1a3ce 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithHANameNodes.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithHANameNodes.java @@ -98,7 +98,7 @@ public class TestBalancerWithHANameNodes { assertEquals(1, namenodes.size()); assertTrue(namenodes.contains(HATestUtil.getLogicalUri(cluster))); final int r = Balancer.run(namenodes, Balancer.Parameters.DEFAULT, conf); - assertEquals(Balancer.ReturnStatus.SUCCESS.code, r); + assertEquals(ExitStatus.SUCCESS.getExitCode(), r); TestBalancer.waitForBalancer(totalUsedSpace, totalCapacity, client, cluster, Balancer.Parameters.DEFAULT); } finally { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithMultipleNameNodes.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithMultipleNameNodes.java index 9f8bb32a1b0..a16a9791009 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithMultipleNameNodes.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithMultipleNameNodes.java @@ -160,7 +160,7 @@ public class TestBalancerWithMultipleNameNodes { // start rebalancing final Collection namenodes = DFSUtil.getNsServiceRpcUris(s.conf); final int r = Balancer.run(namenodes, Balancer.Parameters.DEFAULT, s.conf); - Assert.assertEquals(Balancer.ReturnStatus.SUCCESS.code, r); + Assert.assertEquals(ExitStatus.SUCCESS.getExitCode(), r); LOG.info("BALANCER 2"); wait(s.clients, totalUsed, totalCapacity); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithNodeGroup.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithNodeGroup.java index ef21a7586d9..9961a2e2704 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithNodeGroup.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithNodeGroup.java @@ -176,7 +176,7 @@ public class TestBalancerWithNodeGroup { // start rebalancing Collection namenodes = DFSUtil.getNsServiceRpcUris(conf); final int r = Balancer.run(namenodes, Balancer.Parameters.DEFAULT, conf); - assertEquals(Balancer.ReturnStatus.SUCCESS.code, r); + assertEquals(ExitStatus.SUCCESS.getExitCode(), r); waitForHeartBeat(totalUsedSpace, totalCapacity); LOG.info("Rebalancing with default factor."); @@ -190,8 +190,8 @@ public class TestBalancerWithNodeGroup { // start rebalancing Collection namenodes = DFSUtil.getNsServiceRpcUris(conf); final int r = Balancer.run(namenodes, Balancer.Parameters.DEFAULT, conf); - Assert.assertTrue(r == Balancer.ReturnStatus.SUCCESS.code || - (r == Balancer.ReturnStatus.NO_MOVE_PROGRESS.code)); + Assert.assertTrue(r == ExitStatus.SUCCESS.getExitCode() || + (r == ExitStatus.NO_MOVE_PROGRESS.getExitCode())); waitForHeartBeat(totalUsedSpace, totalCapacity); LOG.info("Rebalancing with default factor."); }