diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 0565b170f00..b812c21ee0d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -443,6 +443,8 @@ Release 2.8.0 - UNRELEASED HDFS-7649. Multihoming docs should emphasize using hostnames in configurations. (Brahma Reddy Battula via Arpit Agarwal) + HDFS-8824. Do not use small blocks for balancing the cluster. (szetszwo) + OPTIMIZATIONS HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index f44f0d718db..eedebad917b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -349,6 +349,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys { public static final int DFS_BALANCER_DISPATCHERTHREADS_DEFAULT = 200; public static final String DFS_BALANCER_MAX_SIZE_TO_MOVE_KEY = "dfs.balancer.max-size-to-move"; public static final long DFS_BALANCER_MAX_SIZE_TO_MOVE_DEFAULT = 10L*1024*1024*1024; + public static final String DFS_BALANCER_GETBLOCKS_SIZE_KEY = "dfs.balancer.getBlocks.size"; + public static final long DFS_BALANCER_GETBLOCKS_SIZE_DEFAULT = 2L*1024*1024*1024; // 2GB + public static final String DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY = "dfs.balancer.getBlocks.min-block-size"; + public static final long DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_DEFAULT = 10L*1024*1024; // 10MB public static final String DFS_MOVER_MOVEDWINWIDTH_KEY = "dfs.mover.movedWinWidth"; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java index e96664f44bf..4f3f18e28be 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java @@ -252,10 +252,17 @@ public class Balancer { DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY, DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT); + final long getBlocksSize = getLong(conf, + DFSConfigKeys.DFS_BALANCER_GETBLOCKS_SIZE_KEY, + DFSConfigKeys.DFS_BALANCER_GETBLOCKS_SIZE_DEFAULT); + final long getBlocksMinBlockSize = getLong(conf, + DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, + DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_DEFAULT); + this.nnc = theblockpool; this.dispatcher = new Dispatcher(theblockpool, p.nodesToBeIncluded, p.nodesToBeExcluded, movedWinWidth, moverThreads, dispatcherThreads, - maxConcurrentMovesPerNode, conf); + maxConcurrentMovesPerNode, getBlocksSize, getBlocksMinBlockSize, conf); this.threshold = p.threshold; this.policy = p.policy; this.runDuringUpgrade = p.runDuringUpgrade; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java index a4adf7facb2..b06219c9a01 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java @@ -84,9 +84,6 @@ import com.google.common.base.Preconditions; public class Dispatcher { static final Log LOG = LogFactory.getLog(Dispatcher.class); - private static final long GB = 1L << 30; // 1GB - private static final long MAX_BLOCKS_SIZE_TO_FETCH = 2 * GB; - private static final int MAX_NO_PENDING_MOVE_ITERATIONS = 5; /** * the period of time to delay the usage of a DataNode after hitting @@ -121,6 +118,9 @@ public class Dispatcher { /** The maximum number of concurrent blocks moves at a datanode */ private final int maxConcurrentMovesPerNode; + private final long getBlocksSize; + private final long getBlocksMinBlockSize; + private final int ioFileBufferSize; static class Allocator { @@ -652,8 +652,9 @@ public class Dispatcher { * @return the total size of the received blocks in the number of bytes. */ private long getBlockList() throws IOException { - final long size = Math.min(MAX_BLOCKS_SIZE_TO_FETCH, blocksToReceive); + final long size = Math.min(getBlocksSize, blocksToReceive); final BlocksWithLocations newBlocks = nnc.getBlocks(getDatanodeInfo(), size); + if (LOG.isTraceEnabled()) { LOG.trace("getBlocks(" + getDatanodeInfo() + ", " + StringUtils.TraditionalBinaryPrefix.long2String(size, "B", 2) @@ -662,6 +663,11 @@ public class Dispatcher { long bytesReceived = 0; for (BlockWithLocations blk : newBlocks.getBlocks()) { + // Skip small blocks. + if (blk.getBlock().getNumBytes() < getBlocksMinBlockSize) { + continue; + } + bytesReceived += blk.getBlock().getNumBytes(); synchronized (globalBlocks) { final DBlock block = globalBlocks.get(blk.getBlock()); @@ -840,9 +846,19 @@ public class Dispatcher { } } + /** Constructor called by Mover. */ public Dispatcher(NameNodeConnector nnc, Set includedNodes, Set excludedNodes, long movedWinWidth, int moverThreads, int dispatcherThreads, int maxConcurrentMovesPerNode, Configuration conf) { + this(nnc, includedNodes, excludedNodes, movedWinWidth, + moverThreads, dispatcherThreads, maxConcurrentMovesPerNode, + 0L, 0L, conf); + } + + Dispatcher(NameNodeConnector nnc, Set includedNodes, + Set excludedNodes, long movedWinWidth, int moverThreads, + int dispatcherThreads, int maxConcurrentMovesPerNode, + long getBlocksSize, long getBlocksMinBlockSize, Configuration conf) { this.nnc = nnc; this.excludedNodes = excludedNodes; this.includedNodes = includedNodes; @@ -855,6 +871,9 @@ public class Dispatcher { this.moverThreadAllocator = new Allocator(moverThreads); this.maxConcurrentMovesPerNode = maxConcurrentMovesPerNode; + this.getBlocksSize = getBlocksSize; + this.getBlocksMinBlockSize = getBlocksMinBlockSize; + this.saslClient = new SaslDataTransferClient(conf, DataTransferSaslUtil.getSaslPropertiesResolver(conf), TrustedChannelResolver.getInstance(conf), nnc.fallbackToSimpleAuth); @@ -1003,9 +1022,6 @@ public class Dispatcher { return getBytesMoved() - bytesLastMoved; } - /** The sleeping period before checking if block move is completed again */ - static private long blockMoveWaitTime = 30000L; - /** * Wait for all block move confirmations. * @return true if there is failed move execution @@ -1027,7 +1043,7 @@ public class Dispatcher { return hasFailure; // all pending queues are empty } try { - Thread.sleep(blockMoveWaitTime); + Thread.sleep(1000); } catch (InterruptedException ignored) { } } @@ -1154,12 +1170,6 @@ public class Dispatcher { movedBlocks.cleanup(); } - /** set the sleeping period for block move completion check */ - @VisibleForTesting - public static void setBlockMoveWaitTime(long time) { - blockMoveWaitTime = time; - } - @VisibleForTesting public static void setDelayAfterErrors(long time) { delayAfterErrors = time; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java index bb8a45b1907..5e1b45b337d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java @@ -118,8 +118,6 @@ public class TestBalancer { } public static void initTestSetup() { - Dispatcher.setBlockMoveWaitTime(1000L) ; - // do not create id file since it occupies the disk space NameNodeConnector.setWrite2IdFile(false); } @@ -128,9 +126,12 @@ public class TestBalancer { conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DEFAULT_BLOCK_SIZE); conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, DEFAULT_BLOCK_SIZE); conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1L); + conf.setInt(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 500); conf.setLong(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 1L); SimulatedFSDataset.setFactory(conf); + conf.setLong(DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_KEY, 2000L); + conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 1L); } static void initConfWithRamDisk(Configuration conf, @@ -142,6 +143,8 @@ public class TestBalancer { conf.setInt(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 500); conf.setInt(DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC, 1); LazyPersistTestCase.initCacheManipulator(); + + conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 1L); } /* create a file with a length of fileLen */ @@ -1336,6 +1339,8 @@ public class TestBalancer { conf.setInt(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 500); conf.setLong(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 1); + conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 1L); + final int BLOCK_SIZE = 1024*1024; cluster = new MiniDFSCluster .Builder(conf) @@ -1410,6 +1415,8 @@ public class TestBalancer { conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1L); conf.setLong(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 1L); + conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 1L); + int numOfDatanodes =2; final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf) .numDataNodes(2)