HDFS-8824. Do not use small blocks for balancing the cluster.

This commit is contained in:
Tsz-Wo Nicholas Sze 2015-08-14 13:03:19 -07:00
parent b094f8b589
commit 61b9e5f7ff
5 changed files with 47 additions and 17 deletions

View File

@@ -443,6 +443,8 @@ Release 2.8.0 - UNRELEASED
    HDFS-7649. Multihoming docs should emphasize using hostnames in
    configurations. (Brahma Reddy Battula via Arpit Agarwal)
+   HDFS-8824. Do not use small blocks for balancing the cluster. (szetszwo)
  OPTIMIZATIONS
    HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than

View File

@ -349,6 +349,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final int DFS_BALANCER_DISPATCHERTHREADS_DEFAULT = 200; public static final int DFS_BALANCER_DISPATCHERTHREADS_DEFAULT = 200;
public static final String DFS_BALANCER_MAX_SIZE_TO_MOVE_KEY = "dfs.balancer.max-size-to-move"; public static final String DFS_BALANCER_MAX_SIZE_TO_MOVE_KEY = "dfs.balancer.max-size-to-move";
public static final long DFS_BALANCER_MAX_SIZE_TO_MOVE_DEFAULT = 10L*1024*1024*1024; public static final long DFS_BALANCER_MAX_SIZE_TO_MOVE_DEFAULT = 10L*1024*1024*1024;
public static final String DFS_BALANCER_GETBLOCKS_SIZE_KEY = "dfs.balancer.getBlocks.size";
public static final long DFS_BALANCER_GETBLOCKS_SIZE_DEFAULT = 2L*1024*1024*1024; // 2GB
public static final String DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY = "dfs.balancer.getBlocks.min-block-size";
public static final long DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_DEFAULT = 10L*1024*1024; // 10MB
public static final String DFS_MOVER_MOVEDWINWIDTH_KEY = "dfs.mover.movedWinWidth"; public static final String DFS_MOVER_MOVEDWINWIDTH_KEY = "dfs.mover.movedWinWidth";

View File

@ -252,10 +252,17 @@ static int getInt(Configuration conf, String key, int defaultValue) {
DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY, DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY,
DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT); DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT);
final long getBlocksSize = getLong(conf,
DFSConfigKeys.DFS_BALANCER_GETBLOCKS_SIZE_KEY,
DFSConfigKeys.DFS_BALANCER_GETBLOCKS_SIZE_DEFAULT);
final long getBlocksMinBlockSize = getLong(conf,
DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY,
DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_DEFAULT);
this.nnc = theblockpool; this.nnc = theblockpool;
this.dispatcher = new Dispatcher(theblockpool, p.nodesToBeIncluded, this.dispatcher = new Dispatcher(theblockpool, p.nodesToBeIncluded,
p.nodesToBeExcluded, movedWinWidth, moverThreads, dispatcherThreads, p.nodesToBeExcluded, movedWinWidth, moverThreads, dispatcherThreads,
maxConcurrentMovesPerNode, conf); maxConcurrentMovesPerNode, getBlocksSize, getBlocksMinBlockSize, conf);
this.threshold = p.threshold; this.threshold = p.threshold;
this.policy = p.policy; this.policy = p.policy;
this.runDuringUpgrade = p.runDuringUpgrade; this.runDuringUpgrade = p.runDuringUpgrade;

View File

@ -84,9 +84,6 @@
public class Dispatcher { public class Dispatcher {
static final Log LOG = LogFactory.getLog(Dispatcher.class); static final Log LOG = LogFactory.getLog(Dispatcher.class);
private static final long GB = 1L << 30; // 1GB
private static final long MAX_BLOCKS_SIZE_TO_FETCH = 2 * GB;
private static final int MAX_NO_PENDING_MOVE_ITERATIONS = 5; private static final int MAX_NO_PENDING_MOVE_ITERATIONS = 5;
/** /**
* the period of time to delay the usage of a DataNode after hitting * the period of time to delay the usage of a DataNode after hitting
@ -121,6 +118,9 @@ public class Dispatcher {
/** The maximum number of concurrent blocks moves at a datanode */ /** The maximum number of concurrent blocks moves at a datanode */
private final int maxConcurrentMovesPerNode; private final int maxConcurrentMovesPerNode;
private final long getBlocksSize;
private final long getBlocksMinBlockSize;
private final int ioFileBufferSize; private final int ioFileBufferSize;
static class Allocator { static class Allocator {
@ -652,8 +652,9 @@ Iterator<DBlock> getBlockIterator() {
* @return the total size of the received blocks in the number of bytes. * @return the total size of the received blocks in the number of bytes.
*/ */
private long getBlockList() throws IOException { private long getBlockList() throws IOException {
final long size = Math.min(MAX_BLOCKS_SIZE_TO_FETCH, blocksToReceive); final long size = Math.min(getBlocksSize, blocksToReceive);
final BlocksWithLocations newBlocks = nnc.getBlocks(getDatanodeInfo(), size); final BlocksWithLocations newBlocks = nnc.getBlocks(getDatanodeInfo(), size);
if (LOG.isTraceEnabled()) { if (LOG.isTraceEnabled()) {
LOG.trace("getBlocks(" + getDatanodeInfo() + ", " LOG.trace("getBlocks(" + getDatanodeInfo() + ", "
+ StringUtils.TraditionalBinaryPrefix.long2String(size, "B", 2) + StringUtils.TraditionalBinaryPrefix.long2String(size, "B", 2)
@ -662,6 +663,11 @@ private long getBlockList() throws IOException {
long bytesReceived = 0; long bytesReceived = 0;
for (BlockWithLocations blk : newBlocks.getBlocks()) { for (BlockWithLocations blk : newBlocks.getBlocks()) {
// Skip small blocks.
if (blk.getBlock().getNumBytes() < getBlocksMinBlockSize) {
continue;
}
bytesReceived += blk.getBlock().getNumBytes(); bytesReceived += blk.getBlock().getNumBytes();
synchronized (globalBlocks) { synchronized (globalBlocks) {
final DBlock block = globalBlocks.get(blk.getBlock()); final DBlock block = globalBlocks.get(blk.getBlock());
@ -840,9 +846,19 @@ public boolean equals(Object obj) {
} }
} }
/** Constructor called by Mover. */
public Dispatcher(NameNodeConnector nnc, Set<String> includedNodes, public Dispatcher(NameNodeConnector nnc, Set<String> includedNodes,
Set<String> excludedNodes, long movedWinWidth, int moverThreads, Set<String> excludedNodes, long movedWinWidth, int moverThreads,
int dispatcherThreads, int maxConcurrentMovesPerNode, Configuration conf) { int dispatcherThreads, int maxConcurrentMovesPerNode, Configuration conf) {
this(nnc, includedNodes, excludedNodes, movedWinWidth,
moverThreads, dispatcherThreads, maxConcurrentMovesPerNode,
0L, 0L, conf);
}
Dispatcher(NameNodeConnector nnc, Set<String> includedNodes,
Set<String> excludedNodes, long movedWinWidth, int moverThreads,
int dispatcherThreads, int maxConcurrentMovesPerNode,
long getBlocksSize, long getBlocksMinBlockSize, Configuration conf) {
this.nnc = nnc; this.nnc = nnc;
this.excludedNodes = excludedNodes; this.excludedNodes = excludedNodes;
this.includedNodes = includedNodes; this.includedNodes = includedNodes;
@ -855,6 +871,9 @@ public Dispatcher(NameNodeConnector nnc, Set<String> includedNodes,
this.moverThreadAllocator = new Allocator(moverThreads); this.moverThreadAllocator = new Allocator(moverThreads);
this.maxConcurrentMovesPerNode = maxConcurrentMovesPerNode; this.maxConcurrentMovesPerNode = maxConcurrentMovesPerNode;
this.getBlocksSize = getBlocksSize;
this.getBlocksMinBlockSize = getBlocksMinBlockSize;
this.saslClient = new SaslDataTransferClient(conf, this.saslClient = new SaslDataTransferClient(conf,
DataTransferSaslUtil.getSaslPropertiesResolver(conf), DataTransferSaslUtil.getSaslPropertiesResolver(conf),
TrustedChannelResolver.getInstance(conf), nnc.fallbackToSimpleAuth); TrustedChannelResolver.getInstance(conf), nnc.fallbackToSimpleAuth);
@ -1003,9 +1022,6 @@ public void run() {
return getBytesMoved() - bytesLastMoved; return getBytesMoved() - bytesLastMoved;
} }
/** The sleeping period before checking if block move is completed again */
static private long blockMoveWaitTime = 30000L;
/** /**
* Wait for all block move confirmations. * Wait for all block move confirmations.
* @return true if there is failed move execution * @return true if there is failed move execution
@ -1027,7 +1043,7 @@ public static boolean waitForMoveCompletion(
return hasFailure; // all pending queues are empty return hasFailure; // all pending queues are empty
} }
try { try {
Thread.sleep(blockMoveWaitTime); Thread.sleep(1000);
} catch (InterruptedException ignored) { } catch (InterruptedException ignored) {
} }
} }
@ -1154,12 +1170,6 @@ void reset(Configuration conf) {
movedBlocks.cleanup(); movedBlocks.cleanup();
} }
/** set the sleeping period for block move completion check */
@VisibleForTesting
public static void setBlockMoveWaitTime(long time) {
blockMoveWaitTime = time;
}
@VisibleForTesting @VisibleForTesting
public static void setDelayAfterErrors(long time) { public static void setDelayAfterErrors(long time) {
delayAfterErrors = time; delayAfterErrors = time;

View File

@ -118,8 +118,6 @@ public class TestBalancer {
} }
public static void initTestSetup() { public static void initTestSetup() {
Dispatcher.setBlockMoveWaitTime(1000L) ;
// do not create id file since it occupies the disk space // do not create id file since it occupies the disk space
NameNodeConnector.setWrite2IdFile(false); NameNodeConnector.setWrite2IdFile(false);
} }
@ -128,9 +126,12 @@ static void initConf(Configuration conf) {
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DEFAULT_BLOCK_SIZE); conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DEFAULT_BLOCK_SIZE);
conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, DEFAULT_BLOCK_SIZE); conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, DEFAULT_BLOCK_SIZE);
conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1L); conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1L);
conf.setInt(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 500);
conf.setLong(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 1L); conf.setLong(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 1L);
SimulatedFSDataset.setFactory(conf); SimulatedFSDataset.setFactory(conf);
conf.setLong(DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_KEY, 2000L); conf.setLong(DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_KEY, 2000L);
conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 1L);
} }
static void initConfWithRamDisk(Configuration conf, static void initConfWithRamDisk(Configuration conf,
@ -142,6 +143,8 @@ static void initConfWithRamDisk(Configuration conf,
conf.setInt(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 500); conf.setInt(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 500);
conf.setInt(DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC, 1); conf.setInt(DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC, 1);
LazyPersistTestCase.initCacheManipulator(); LazyPersistTestCase.initCacheManipulator();
conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 1L);
} }
/* create a file with a length of <code>fileLen</code> */ /* create a file with a length of <code>fileLen</code> */
@ -1336,6 +1339,8 @@ public void testBalancerDuringUpgrade() throws Exception {
conf.setInt(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 500); conf.setInt(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 500);
conf.setLong(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 1); conf.setLong(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 1);
conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 1L);
final int BLOCK_SIZE = 1024*1024; final int BLOCK_SIZE = 1024*1024;
cluster = new MiniDFSCluster cluster = new MiniDFSCluster
.Builder(conf) .Builder(conf)
@ -1410,6 +1415,8 @@ public void testTwoReplicaShouldNotInSameDN() throws Exception {
conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1L); conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1L);
conf.setLong(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 1L); conf.setLong(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 1L);
conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 1L);
int numOfDatanodes =2; int numOfDatanodes =2;
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf) final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(2) .numDataNodes(2)