diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 4e0edb88e0d..dbba75da469 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -429,6 +429,9 @@ Release 2.8.0 - UNRELEASED
     HDFS-8772. Fix TestStandbyIsHot#testDatanodeRestarts which occasionally
     fails. (Walter Su via wang).
 
+    HDFS-8818. Changes the global moveExecutor to per datanode executors and
+    changes MAX_SIZE_TO_MOVE to be configurable. (szetszwo)
+
   OPTIMIZATIONS
 
     HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 6f9ee9e93e4..f44f0d718db 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -347,6 +347,9 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final int DFS_BALANCER_MOVERTHREADS_DEFAULT = 1000;
   public static final String DFS_BALANCER_DISPATCHERTHREADS_KEY = "dfs.balancer.dispatcherThreads";
   public static final int DFS_BALANCER_DISPATCHERTHREADS_DEFAULT = 200;
+  public static final String DFS_BALANCER_MAX_SIZE_TO_MOVE_KEY = "dfs.balancer.max-size-to-move";
+  public static final long DFS_BALANCER_MAX_SIZE_TO_MOVE_DEFAULT = 10L*1024*1024*1024;
+
   public static final String DFS_MOVER_MOVEDWINWIDTH_KEY = "dfs.mover.movedWinWidth";
   public static final long DFS_MOVER_MOVEDWINWIDTH_DEFAULT = 5400*1000L;
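
The two keys added above introduce dfs.balancer.max-size-to-move, which caps how many bytes the Balancer may schedule for a single storage group in one iteration (10 GB by default, previously a hard-coded constant). A minimal sketch of overriding the cap programmatically, assuming only the standard Configuration APIs; the 100 GB figure is purely illustrative:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;
    import org.apache.hadoop.hdfs.HdfsConfiguration;

    // Raise the per-iteration cap from the 10 GB default to 100 GB (illustrative value).
    Configuration conf = new HdfsConfiguration();
    conf.setLong(DFSConfigKeys.DFS_BALANCER_MAX_SIZE_TO_MOVE_KEY,
        100L * 1024 * 1024 * 1024);
    // The same effect in hdfs-site.xml: set dfs.balancer.max-size-to-move to 107374182400.
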
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
index a1425e6bfdf..e96664f44bf 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
@@ -34,6 +34,7 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
@@ -168,9 +169,6 @@ public class Balancer {
 
   static final Path BALANCER_ID_PATH = new Path("/system/balancer.id");
 
-  private static final long GB = 1L << 30; //1GB
-  private static final long MAX_SIZE_TO_MOVE = 10*GB;
-
   private static final String USAGE = "Usage: hdfs balancer"
       + "\n\t[-policy ]\tthe balancing policy: "
       + BalancingPolicy.Node.INSTANCE.getName() + " or "
@@ -193,6 +191,7 @@ public class Balancer {
   private final BalancingPolicy policy;
   private final boolean runDuringUpgrade;
   private final double threshold;
+  private final long maxSizeToMove;
 
   // all data node lists
   private final Collection overUtilized = new LinkedList();
@@ -214,6 +213,24 @@ private static void checkReplicationPolicyCompatibility(Configuration conf
     }
   }
 
+  static long getLong(Configuration conf, String key, long defaultValue) {
+    final long v = conf.getLong(key, defaultValue);
+    LOG.info(key + " = " + v + " (default=" + defaultValue + ")");
+    if (v <= 0) {
+      throw new HadoopIllegalArgumentException(key + " = " + v + " <= " + 0);
+    }
+    return v;
+  }
+
+  static int getInt(Configuration conf, String key, int defaultValue) {
+    final int v = conf.getInt(key, defaultValue);
+    LOG.info(key + " = " + v + " (default=" + defaultValue + ")");
+    if (v <= 0) {
+      throw new HadoopIllegalArgumentException(key + " = " + v + " <= " + 0);
+    }
+    return v;
+  }
+
   /**
    * Construct a balancer.
    * Initialize balancer. It sets the value of the threshold, and
@@ -222,16 +239,16 @@ private static void checkReplicationPolicyCompatibility(Configuration conf
    * when connection fails.
    */
   Balancer(NameNodeConnector theblockpool, Parameters p, Configuration conf) {
-    final long movedWinWidth = conf.getLong(
+    final long movedWinWidth = getLong(conf,
         DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_KEY,
         DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_DEFAULT);
-    final int moverThreads = conf.getInt(
+    final int moverThreads = getInt(conf,
         DFSConfigKeys.DFS_BALANCER_MOVERTHREADS_KEY,
         DFSConfigKeys.DFS_BALANCER_MOVERTHREADS_DEFAULT);
-    final int dispatcherThreads = conf.getInt(
+    final int dispatcherThreads = getInt(conf,
         DFSConfigKeys.DFS_BALANCER_DISPATCHERTHREADS_KEY,
         DFSConfigKeys.DFS_BALANCER_DISPATCHERTHREADS_DEFAULT);
-    final int maxConcurrentMovesPerNode = conf.getInt(
+    final int maxConcurrentMovesPerNode = getInt(conf,
         DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY,
         DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT);
 
@@ -242,6 +259,10 @@ private static void checkReplicationPolicyCompatibility(Configuration conf
     this.threshold = p.threshold;
     this.policy = p.policy;
     this.runDuringUpgrade = p.runDuringUpgrade;
+
+    this.maxSizeToMove = getLong(conf,
+        DFSConfigKeys.DFS_BALANCER_MAX_SIZE_TO_MOVE_KEY,
+        DFSConfigKeys.DFS_BALANCER_MAX_SIZE_TO_MOVE_DEFAULT);
   }
 
   private static long getCapacity(DatanodeStorageReport report, StorageType t) {
@@ -295,7 +316,7 @@ private long init(List reports) {
       final double utilizationDiff = utilization - policy.getAvgUtilization(t);
       final double thresholdDiff = Math.abs(utilizationDiff) - threshold;
       final long maxSize2Move = computeMaxSize2Move(capacity,
-          getRemaining(r, t), utilizationDiff, threshold);
+          getRemaining(r, t), utilizationDiff, threshold, maxSizeToMove);
 
       final StorageGroup g;
       if (utilizationDiff > 0) {
@@ -332,13 +353,13 @@ private long init(List reports) {
   }
 
   private static long computeMaxSize2Move(final long capacity, final long remaining,
-      final double utilizationDiff, final double threshold) {
+      final double utilizationDiff, final double threshold, final long max) {
     final double diff = Math.min(threshold, Math.abs(utilizationDiff));
     long maxSizeToMove = percentage2bytes(diff, capacity);
     if (utilizationDiff < 0) {
       maxSizeToMove = Math.min(remaining, maxSizeToMove);
     }
-    return Math.min(MAX_SIZE_TO_MOVE, maxSizeToMove);
+    return Math.min(max, maxSizeToMove);
   }
 
   private static long percentage2bytes(double percentage, long capacity) {
@@ -388,6 +409,7 @@ private void chooseStorageGroups(final Matcher matcher) {
     /* first step: match each overUtilized datanode (source) to
      * one or more underUtilized datanodes (targets).
      */
+    LOG.info("chooseStorageGroups for " + matcher + ": overUtilized => underUtilized");
     chooseStorageGroups(overUtilized, underUtilized, matcher);
 
     /* match each remaining overutilized datanode (source) to
@@ -395,6 +417,7 @@ private void chooseStorageGroups(final Matcher matcher) {
      * Note only overutilized datanodes that haven't had that max bytes to move
      * satisfied in step 1 are selected
      */
+    LOG.info("chooseStorageGroups for " + matcher + ": overUtilized => belowAvgUtilized");
     chooseStorageGroups(overUtilized, belowAvgUtilized, matcher);
 
     /* match each remaining underutilized datanode (target) to
@@ -402,6 +425,7 @@ private void chooseStorageGroups(final Matcher matcher) {
      * Note only underutilized datanodes that have not had that max bytes to
      * move satisfied in step 1 are selected.
      */
+    LOG.info("chooseStorageGroups for " + matcher + ": underUtilized => aboveAvgUtilized");
     chooseStorageGroups(underUtilized, aboveAvgUtilized, matcher);
   }
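
The getLong/getInt helpers above log every balancer setting and reject non-positive values up front. To make the capping in computeMaxSize2Move concrete, here is a hedged, standalone sketch of the same min() chain with illustrative numbers (not the production code path):

    // Capacity 100 TB, threshold = 10, utilizationDiff = 25: the percentage-based
    // limit is 10 TB, but the result is still clamped to the configured
    // dfs.balancer.max-size-to-move (10 GB by default).
    long capacity = 100L * 1024 * 1024 * 1024 * 1024;           // 100 TB
    long configuredMax = 10L * 1024 * 1024 * 1024;              // default cap, 10 GB
    double diff = Math.min(10.0, Math.abs(25.0));               // threshold wins: 10 percent
    long byPercentage = (long) (diff * capacity / 100.0);       // 10 TB
    long maxSize2Move = Math.min(configuredMax, byPercentage);  // 10 GB
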
*/ + LOG.info("chooseStorageGroups for " + matcher + ": overUtilized => underUtilized"); chooseStorageGroups(overUtilized, underUtilized, matcher); /* match each remaining overutilized datanode (source) to @@ -395,6 +417,7 @@ private void chooseStorageGroups(final Matcher matcher) { * Note only overutilized datanodes that haven't had that max bytes to move * satisfied in step 1 are selected */ + LOG.info("chooseStorageGroups for " + matcher + ": overUtilized => belowAvgUtilized"); chooseStorageGroups(overUtilized, belowAvgUtilized, matcher); /* match each remaining underutilized datanode (target) to @@ -402,6 +425,7 @@ private void chooseStorageGroups(final Matcher matcher) { * Note only underutilized datanodes that have not had that max bytes to * move satisfied in step 1 are selected. */ + LOG.info("chooseStorageGroups for " + matcher + ": underUtilized => aboveAvgUtilized"); chooseStorageGroups(underUtilized, aboveAvgUtilized, matcher); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java index b0fac35eaa8..a4adf7facb2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java @@ -114,14 +114,39 @@ public class Dispatcher { private NetworkTopology cluster; - private final ExecutorService moveExecutor; private final ExecutorService dispatchExecutor; + private final Allocator moverThreadAllocator; + /** The maximum number of concurrent blocks moves at a datanode */ private final int maxConcurrentMovesPerNode; private final int ioFileBufferSize; + static class Allocator { + private final int max; + private int count = 0; + + Allocator(int max) { + this.max = max; + } + + synchronized int allocate(int n) { + final int remaining = max - count; + if (remaining <= 0) { + return 0; + } else { + final int allocated = remaining < n? remaining: n; + count += allocated; + return allocated; + } + } + + synchronized void reset() { + count = 0; + } + } + private static class GlobalBlockMap { private final Map map = new HashMap(); @@ -287,9 +312,7 @@ private boolean addTo(StorageGroup g) { /** Dispatch the move to the proxy source & wait for the response. 
@@ -287,9 +312,7 @@ private boolean addTo(StorageGroup g) {
 
     /** Dispatch the move to the proxy source & wait for the response. */
     private void dispatch() {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Start moving " + this);
-      }
+      LOG.info("Start moving " + this);
 
       Socket sock = new Socket();
       DataOutputStream out = null;
@@ -504,7 +527,7 @@ public boolean equals(Object obj) {
     private final List pendings;
     private volatile boolean hasFailure = false;
     private volatile boolean hasSuccess = false;
-    private final int maxConcurrentMoves;
+    private ExecutorService moveExecutor;
 
     @Override
     public String toString() {
@@ -513,7 +536,6 @@ public String toString() {
 
     private DDatanode(DatanodeInfo datanode, int maxConcurrentMoves) {
       this.datanode = datanode;
-      this.maxConcurrentMoves = maxConcurrentMoves;
       this.pendings = new ArrayList(maxConcurrentMoves);
     }
 
@@ -521,6 +543,21 @@ public DatanodeInfo getDatanodeInfo() {
       return datanode;
     }
 
+    synchronized ExecutorService initMoveExecutor(int poolSize) {
+      return moveExecutor = Executors.newFixedThreadPool(poolSize);
+    }
+
+    synchronized ExecutorService getMoveExecutor() {
+      return moveExecutor;
+    }
+
+    synchronized void shutdownMoveExecutor() {
+      if (moveExecutor != null) {
+        moveExecutor.shutdown();
+        moveExecutor = null;
+      }
+    }
+
     private static void put(StorageType storageType, G g,
         EnumMap map) {
       final StorageGroup existing = map.put(storageType, g);
@@ -541,6 +578,7 @@ public Source addSource(StorageType storageType, long maxSize2Move, Dispatcher d
 
     synchronized private void activateDelay(long delta) {
       delayUntil = Time.monotonicNow() + delta;
+      LOG.info(this + " activateDelay " + delta/1000.0 + " seconds");
     }
 
     synchronized private boolean isDelayActive() {
@@ -551,11 +589,6 @@ synchronized private boolean isDelayActive() {
       return true;
     }
 
-    /** Check if the node can schedule more blocks to move */
-    synchronized boolean isPendingQNotFull() {
-      return pendings.size() < maxConcurrentMoves;
-    }
-
     /** Check if all the dispatched moves are done */
     synchronized boolean isPendingQEmpty() {
       return pendings.isEmpty();
@@ -563,7 +596,7 @@ synchronized boolean isPendingQEmpty() {
 
     /** Add a scheduled block move to the node */
     synchronized boolean addPendingBlock(PendingMove pendingBlock) {
-      if (!isDelayActive() && isPendingQNotFull()) {
+      if (!isDelayActive()) {
         return pendings.add(pendingBlock);
       }
       return false;
@@ -621,6 +654,11 @@ Iterator getBlockIterator() {
     private long getBlockList() throws IOException {
       final long size = Math.min(MAX_BLOCKS_SIZE_TO_FETCH, blocksToReceive);
       final BlocksWithLocations newBlocks = nnc.getBlocks(getDatanodeInfo(), size);
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("getBlocks(" + getDatanodeInfo() + ", "
+            + StringUtils.TraditionalBinaryPrefix.long2String(size, "B", 2)
+            + ") returns " + newBlocks.getBlocks().length + " blocks.");
+      }
 
       long bytesReceived = 0;
       for (BlockWithLocations blk : newBlocks.getBlocks()) {
@@ -642,7 +680,9 @@ private long getBlockList() throws IOException {
           }
         }
         if (!srcBlocks.contains(block) && isGoodBlockCandidate(block)) {
-          // filter bad candidates
+          if (LOG.isTraceEnabled()) {
+            LOG.trace("Add " + block + " to " + this);
+          }
           srcBlocks.add(block);
         }
       }
@@ -710,11 +750,9 @@ private void removeMovedBlocks() {
       }
     }
 
-    private static final int SOURCE_BLOCKS_MIN_SIZE = 5;
-
     /** @return if should fetch more blocks from namenode */
     private boolean shouldFetchMoreBlocks() {
-      return srcBlocks.size() < SOURCE_BLOCKS_MIN_SIZE && blocksToReceive > 0;
+      return blocksToReceive > 0;
     }
 
     private static final long MAX_ITERATION_TIME = 20 * 60 * 1000L; // 20 mins
@@ -734,6 +772,11 @@ private void dispatchBlocks() {
       int noPendingMoveIteration = 0;
       while (!isTimeUp && getScheduledSize() > 0
           && (!srcBlocks.isEmpty() || blocksToReceive > 0)) {
+        if (LOG.isTraceEnabled()) {
+          LOG.trace(this + " blocksToReceive=" + blocksToReceive
+              + ", scheduledSize=" + getScheduledSize()
+              + ", srcBlocks#=" + srcBlocks.size());
+        }
         final PendingMove p = chooseNextMove();
         if (p != null) {
           // Reset no pending move counter
@@ -761,12 +804,16 @@ private void dispatchBlocks() {
           // in case no blocks can be moved for source node's task,
          // jump out of while-loop after 5 iterations.
           if (noPendingMoveIteration >= MAX_NO_PENDING_MOVE_ITERATIONS) {
+            LOG.info("Failed to find a pending move " + noPendingMoveIteration
+                + " times. Skipping " + this);
             resetScheduledSize();
           }
         }
 
         // check if time is up or not
         if (Time.monotonicNow() - startTime > MAX_ITERATION_TIME) {
+          LOG.info("Time up (max time=" + MAX_ITERATION_TIME/1000
+              + " seconds). Skipping " + this);
           isTimeUp = true;
           continue;
         }
@@ -803,9 +850,9 @@ public Dispatcher(NameNodeConnector nnc, Set includedNodes,
 
     this.cluster = NetworkTopology.getInstance(conf);
 
-    this.moveExecutor = Executors.newFixedThreadPool(moverThreads);
     this.dispatchExecutor = dispatcherThreads == 0? null
         : Executors.newFixedThreadPool(dispatcherThreads);
+    this.moverThreadAllocator = new Allocator(moverThreads);
     this.maxConcurrentMovesPerNode = maxConcurrentMovesPerNode;
 
     this.saslClient = new SaslDataTransferClient(conf,
@@ -890,8 +937,22 @@ public DDatanode newDatanode(DatanodeInfo datanode) {
     return new DDatanode(datanode, maxConcurrentMovesPerNode);
   }
 
   public void executePendingMove(final PendingMove p) {
     // move the block
+    final DDatanode targetDn = p.target.getDDatanode();
+    ExecutorService moveExecutor = targetDn.getMoveExecutor();
+    if (moveExecutor == null) {
+      final int nThreads = moverThreadAllocator.allocate(maxConcurrentMovesPerNode);
+      if (nThreads > 0) {
+        moveExecutor = targetDn.initMoveExecutor(nThreads);
+      }
+    }
+    if (moveExecutor == null) {
+      LOG.warn("No mover threads available: skip moving " + p);
+      return;
+    }
+
     moveExecutor.execute(new Runnable() {
       @Override
       public void run() {
@@ -1083,6 +1144,11 @@ void reset(Configuration conf) {
     cluster = NetworkTopology.getInstance(conf);
     storageGroupMap.clear();
     sources.clear();
+
+    moverThreadAllocator.reset();
+    for(StorageGroup t : targets) {
+      t.getDDatanode().shutdownMoveExecutor();
+    }
     targets.clear();
     globalBlocks.removeAllButRetain(movedBlocks);
     movedBlocks.cleanup();
@@ -1104,7 +1170,6 @@ public void shutdownNow() {
     if (dispatchExecutor != null) {
       dispatchExecutor.shutdownNow();
     }
-    moveExecutor.shutdownNow();
   }
 
   static class Util {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/MovedBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/MovedBlocks.java
index 18b9cd8ecf5..4906226f106 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/MovedBlocks.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/MovedBlocks.java
@@ -77,6 +77,11 @@ public Block getBlock() {
     public long getNumBytes() {
       return block.getNumBytes();
     }
+
+    @Override
+    public String toString() {
+      return block + " size=" + getNumBytes();
+    }
   }
 
   private static final int CUR_WIN = 0;
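
With the global moveExecutor gone, each target datanode now gets its own fixed pool, created lazily in executePendingMove above and torn down in reset(). A hedged sketch of why a fixed pool preserves the per-node bound that the removed isPendingQNotFull() check used to enforce (the pool size of 5 stands in for maxConcurrentMovesPerNode):

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    ExecutorService perTargetPool = Executors.newFixedThreadPool(5);
    for (int i = 0; i < 20; i++) {
      final int moveId = i;
      perTargetPool.execute(() -> {
        // at most 5 of these run concurrently against this target; the rest queue up,
        // while the Allocator keeps the total threads across targets within moverThreads
        System.out.println("dispatch block move #" + moveId);
      });
    }
    perTargetPool.shutdown();
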
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
index 65427e9e821..bb8a45b1907 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
@@ -19,7 +19,13 @@
 
 import static org.apache.hadoop.fs.StorageType.DEFAULT;
 import static org.apache.hadoop.fs.StorageType.RAM_DISK;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_BLOCK_PINNING_ENABLED;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LAZY_PERSIST_FILE_SCRUB_INTERVAL_SEC;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -29,8 +35,8 @@
 import java.io.IOException;
 import java.io.PrintWriter;
 import java.net.InetAddress;
-import java.net.URI;
 import java.net.InetSocketAddress;
+import java.net.URI;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -45,6 +51,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
@@ -59,12 +66,16 @@
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.NameNodeProxies;
-import org.apache.hadoop.hdfs.protocol.*;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.server.balancer.Balancer.Cli;
 import org.apache.hadoop.hdfs.server.balancer.Balancer.Parameters;
 import org.apache.hadoop.hdfs.server.balancer.Balancer.Result;
-import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.LazyPersistTestCase;
@@ -927,7 +938,7 @@ void testBalancer1Internal(Configuration conf) throws Exception {
         new String[] {RACK0, RACK1});
   }
 
-  @Test(timeout=100000)
+  @Test(expected=HadoopIllegalArgumentException.class)
   public void testBalancerWithZeroThreadsForMove() throws Exception {
     Configuration conf = new HdfsConfiguration();
     conf.setInt(DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY, 0);
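
The test above was previously guarded only by a timeout; with the fail-fast validation added in Balancer.getInt it now simply expects HadoopIllegalArgumentException when the concurrent-move count is zero. A hedged sketch of the path it exercises (getInt is package-private, so this only compiles from the same package, e.g. a test):

    Configuration conf = new HdfsConfiguration();
    conf.setInt(DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY, 0);
    // throws HadoopIllegalArgumentException: "dfs.datanode.balance.max.concurrent.moves = 0 <= 0"
    Balancer.getInt(conf,
        DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY,
        DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT);
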