HDFS-8824. Do not use small blocks for balancing the cluster.
This commit is contained in:
parent
1569228ec9
commit
2bc0a4f299
|
@ -786,6 +786,8 @@ Release 2.8.0 - UNRELEASED
|
||||||
HDFS-7649. Multihoming docs should emphasize using hostnames in
|
HDFS-7649. Multihoming docs should emphasize using hostnames in
|
||||||
configurations. (Brahma Reddy Battula via Arpit Agarwal)
|
configurations. (Brahma Reddy Battula via Arpit Agarwal)
|
||||||
|
|
||||||
|
HDFS-8824. Do not use small blocks for balancing the cluster. (szetszwo)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than
|
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than
|
||||||
|
|
|
@ -353,6 +353,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
|
||||||
public static final int DFS_BALANCER_DISPATCHERTHREADS_DEFAULT = 200;
|
public static final int DFS_BALANCER_DISPATCHERTHREADS_DEFAULT = 200;
|
||||||
public static final String DFS_BALANCER_MAX_SIZE_TO_MOVE_KEY = "dfs.balancer.max-size-to-move";
|
public static final String DFS_BALANCER_MAX_SIZE_TO_MOVE_KEY = "dfs.balancer.max-size-to-move";
|
||||||
public static final long DFS_BALANCER_MAX_SIZE_TO_MOVE_DEFAULT = 10L*1024*1024*1024;
|
public static final long DFS_BALANCER_MAX_SIZE_TO_MOVE_DEFAULT = 10L*1024*1024*1024;
|
||||||
|
public static final String DFS_BALANCER_GETBLOCKS_SIZE_KEY = "dfs.balancer.getBlocks.size";
|
||||||
|
public static final long DFS_BALANCER_GETBLOCKS_SIZE_DEFAULT = 2L*1024*1024*1024; // 2GB
|
||||||
|
public static final String DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY = "dfs.balancer.getBlocks.min-block-size";
|
||||||
|
public static final long DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_DEFAULT = 10L*1024*1024; // 10MB
|
||||||
|
|
||||||
|
|
||||||
public static final String DFS_MOVER_MOVEDWINWIDTH_KEY = "dfs.mover.movedWinWidth";
|
public static final String DFS_MOVER_MOVEDWINWIDTH_KEY = "dfs.mover.movedWinWidth";
|
||||||
|
|
|
@ -251,10 +251,17 @@ public class Balancer {
|
||||||
DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY,
|
DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY,
|
||||||
DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT);
|
DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT);
|
||||||
|
|
||||||
|
final long getBlocksSize = getLong(conf,
|
||||||
|
DFSConfigKeys.DFS_BALANCER_GETBLOCKS_SIZE_KEY,
|
||||||
|
DFSConfigKeys.DFS_BALANCER_GETBLOCKS_SIZE_DEFAULT);
|
||||||
|
final long getBlocksMinBlockSize = getLong(conf,
|
||||||
|
DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY,
|
||||||
|
DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_DEFAULT);
|
||||||
|
|
||||||
this.nnc = theblockpool;
|
this.nnc = theblockpool;
|
||||||
this.dispatcher = new Dispatcher(theblockpool, p.nodesToBeIncluded,
|
this.dispatcher = new Dispatcher(theblockpool, p.nodesToBeIncluded,
|
||||||
p.nodesToBeExcluded, movedWinWidth, moverThreads, dispatcherThreads,
|
p.nodesToBeExcluded, movedWinWidth, moverThreads, dispatcherThreads,
|
||||||
maxConcurrentMovesPerNode, conf);
|
maxConcurrentMovesPerNode, getBlocksSize, getBlocksMinBlockSize, conf);
|
||||||
this.threshold = p.threshold;
|
this.threshold = p.threshold;
|
||||||
this.policy = p.policy;
|
this.policy = p.policy;
|
||||||
this.runDuringUpgrade = p.runDuringUpgrade;
|
this.runDuringUpgrade = p.runDuringUpgrade;
|
||||||
|
|
|
@ -82,9 +82,6 @@ import com.google.common.base.Preconditions;
|
||||||
public class Dispatcher {
|
public class Dispatcher {
|
||||||
static final Log LOG = LogFactory.getLog(Dispatcher.class);
|
static final Log LOG = LogFactory.getLog(Dispatcher.class);
|
||||||
|
|
||||||
private static final long GB = 1L << 30; // 1GB
|
|
||||||
private static final long MAX_BLOCKS_SIZE_TO_FETCH = 2 * GB;
|
|
||||||
|
|
||||||
private static final int MAX_NO_PENDING_MOVE_ITERATIONS = 5;
|
private static final int MAX_NO_PENDING_MOVE_ITERATIONS = 5;
|
||||||
/**
|
/**
|
||||||
* the period of time to delay the usage of a DataNode after hitting
|
* the period of time to delay the usage of a DataNode after hitting
|
||||||
|
@ -119,6 +116,9 @@ public class Dispatcher {
|
||||||
/** The maximum number of concurrent blocks moves at a datanode */
|
/** The maximum number of concurrent blocks moves at a datanode */
|
||||||
private final int maxConcurrentMovesPerNode;
|
private final int maxConcurrentMovesPerNode;
|
||||||
|
|
||||||
|
private final long getBlocksSize;
|
||||||
|
private final long getBlocksMinBlockSize;
|
||||||
|
|
||||||
private final int ioFileBufferSize;
|
private final int ioFileBufferSize;
|
||||||
|
|
||||||
static class Allocator {
|
static class Allocator {
|
||||||
|
@ -650,8 +650,9 @@ public class Dispatcher {
|
||||||
* @return the total size of the received blocks in the number of bytes.
|
* @return the total size of the received blocks in the number of bytes.
|
||||||
*/
|
*/
|
||||||
private long getBlockList() throws IOException {
|
private long getBlockList() throws IOException {
|
||||||
final long size = Math.min(MAX_BLOCKS_SIZE_TO_FETCH, blocksToReceive);
|
final long size = Math.min(getBlocksSize, blocksToReceive);
|
||||||
final BlocksWithLocations newBlocks = nnc.getBlocks(getDatanodeInfo(), size);
|
final BlocksWithLocations newBlocks = nnc.getBlocks(getDatanodeInfo(), size);
|
||||||
|
|
||||||
if (LOG.isTraceEnabled()) {
|
if (LOG.isTraceEnabled()) {
|
||||||
LOG.trace("getBlocks(" + getDatanodeInfo() + ", "
|
LOG.trace("getBlocks(" + getDatanodeInfo() + ", "
|
||||||
+ StringUtils.TraditionalBinaryPrefix.long2String(size, "B", 2)
|
+ StringUtils.TraditionalBinaryPrefix.long2String(size, "B", 2)
|
||||||
|
@ -660,6 +661,11 @@ public class Dispatcher {
|
||||||
|
|
||||||
long bytesReceived = 0;
|
long bytesReceived = 0;
|
||||||
for (BlockWithLocations blk : newBlocks.getBlocks()) {
|
for (BlockWithLocations blk : newBlocks.getBlocks()) {
|
||||||
|
// Skip small blocks.
|
||||||
|
if (blk.getBlock().getNumBytes() < getBlocksMinBlockSize) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
bytesReceived += blk.getBlock().getNumBytes();
|
bytesReceived += blk.getBlock().getNumBytes();
|
||||||
synchronized (globalBlocks) {
|
synchronized (globalBlocks) {
|
||||||
final DBlock block = globalBlocks.get(blk.getBlock());
|
final DBlock block = globalBlocks.get(blk.getBlock());
|
||||||
|
@ -838,9 +844,19 @@ public class Dispatcher {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/** Constructor called by Mover. */
|
||||||
public Dispatcher(NameNodeConnector nnc, Set<String> includedNodes,
|
public Dispatcher(NameNodeConnector nnc, Set<String> includedNodes,
|
||||||
Set<String> excludedNodes, long movedWinWidth, int moverThreads,
|
Set<String> excludedNodes, long movedWinWidth, int moverThreads,
|
||||||
int dispatcherThreads, int maxConcurrentMovesPerNode, Configuration conf) {
|
int dispatcherThreads, int maxConcurrentMovesPerNode, Configuration conf) {
|
||||||
|
this(nnc, includedNodes, excludedNodes, movedWinWidth,
|
||||||
|
moverThreads, dispatcherThreads, maxConcurrentMovesPerNode,
|
||||||
|
0L, 0L, conf);
|
||||||
|
}
|
||||||
|
|
||||||
|
Dispatcher(NameNodeConnector nnc, Set<String> includedNodes,
|
||||||
|
Set<String> excludedNodes, long movedWinWidth, int moverThreads,
|
||||||
|
int dispatcherThreads, int maxConcurrentMovesPerNode,
|
||||||
|
long getBlocksSize, long getBlocksMinBlockSize, Configuration conf) {
|
||||||
this.nnc = nnc;
|
this.nnc = nnc;
|
||||||
this.excludedNodes = excludedNodes;
|
this.excludedNodes = excludedNodes;
|
||||||
this.includedNodes = includedNodes;
|
this.includedNodes = includedNodes;
|
||||||
|
@ -853,6 +869,9 @@ public class Dispatcher {
|
||||||
this.moverThreadAllocator = new Allocator(moverThreads);
|
this.moverThreadAllocator = new Allocator(moverThreads);
|
||||||
this.maxConcurrentMovesPerNode = maxConcurrentMovesPerNode;
|
this.maxConcurrentMovesPerNode = maxConcurrentMovesPerNode;
|
||||||
|
|
||||||
|
this.getBlocksSize = getBlocksSize;
|
||||||
|
this.getBlocksMinBlockSize = getBlocksMinBlockSize;
|
||||||
|
|
||||||
this.saslClient = new SaslDataTransferClient(conf,
|
this.saslClient = new SaslDataTransferClient(conf,
|
||||||
DataTransferSaslUtil.getSaslPropertiesResolver(conf),
|
DataTransferSaslUtil.getSaslPropertiesResolver(conf),
|
||||||
TrustedChannelResolver.getInstance(conf), nnc.fallbackToSimpleAuth);
|
TrustedChannelResolver.getInstance(conf), nnc.fallbackToSimpleAuth);
|
||||||
|
@ -1001,9 +1020,6 @@ public class Dispatcher {
|
||||||
return getBytesMoved() - bytesLastMoved;
|
return getBytesMoved() - bytesLastMoved;
|
||||||
}
|
}
|
||||||
|
|
||||||
/** The sleeping period before checking if block move is completed again */
|
|
||||||
static private long blockMoveWaitTime = 30000L;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Wait for all block move confirmations.
|
* Wait for all block move confirmations.
|
||||||
* @return true if there is failed move execution
|
* @return true if there is failed move execution
|
||||||
|
@ -1025,7 +1041,7 @@ public class Dispatcher {
|
||||||
return hasFailure; // all pending queues are empty
|
return hasFailure; // all pending queues are empty
|
||||||
}
|
}
|
||||||
try {
|
try {
|
||||||
Thread.sleep(blockMoveWaitTime);
|
Thread.sleep(1000);
|
||||||
} catch (InterruptedException ignored) {
|
} catch (InterruptedException ignored) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1152,12 +1168,6 @@ public class Dispatcher {
|
||||||
movedBlocks.cleanup();
|
movedBlocks.cleanup();
|
||||||
}
|
}
|
||||||
|
|
||||||
/** set the sleeping period for block move completion check */
|
|
||||||
@VisibleForTesting
|
|
||||||
public static void setBlockMoveWaitTime(long time) {
|
|
||||||
blockMoveWaitTime = time;
|
|
||||||
}
|
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
public static void setDelayAfterErrors(long time) {
|
public static void setDelayAfterErrors(long time) {
|
||||||
delayAfterErrors = time;
|
delayAfterErrors = time;
|
||||||
|
|
|
@ -118,8 +118,6 @@ public class TestBalancer {
|
||||||
}
|
}
|
||||||
|
|
||||||
public static void initTestSetup() {
|
public static void initTestSetup() {
|
||||||
Dispatcher.setBlockMoveWaitTime(1000L) ;
|
|
||||||
|
|
||||||
// do not create id file since it occupies the disk space
|
// do not create id file since it occupies the disk space
|
||||||
NameNodeConnector.setWrite2IdFile(false);
|
NameNodeConnector.setWrite2IdFile(false);
|
||||||
}
|
}
|
||||||
|
@ -128,9 +126,12 @@ public class TestBalancer {
|
||||||
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DEFAULT_BLOCK_SIZE);
|
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DEFAULT_BLOCK_SIZE);
|
||||||
conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, DEFAULT_BLOCK_SIZE);
|
conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, DEFAULT_BLOCK_SIZE);
|
||||||
conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1L);
|
conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1L);
|
||||||
|
conf.setInt(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 500);
|
||||||
conf.setLong(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 1L);
|
conf.setLong(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 1L);
|
||||||
SimulatedFSDataset.setFactory(conf);
|
SimulatedFSDataset.setFactory(conf);
|
||||||
|
|
||||||
conf.setLong(DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_KEY, 2000L);
|
conf.setLong(DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_KEY, 2000L);
|
||||||
|
conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 1L);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void initConfWithRamDisk(Configuration conf,
|
static void initConfWithRamDisk(Configuration conf,
|
||||||
|
@ -142,6 +143,8 @@ public class TestBalancer {
|
||||||
conf.setInt(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 500);
|
conf.setInt(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 500);
|
||||||
conf.setInt(DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC, 1);
|
conf.setInt(DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC, 1);
|
||||||
LazyPersistTestCase.initCacheManipulator();
|
LazyPersistTestCase.initCacheManipulator();
|
||||||
|
|
||||||
|
conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 1L);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* create a file with a length of <code>fileLen</code> */
|
/* create a file with a length of <code>fileLen</code> */
|
||||||
|
@ -1334,6 +1337,8 @@ public class TestBalancer {
|
||||||
conf.setInt(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 500);
|
conf.setInt(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 500);
|
||||||
conf.setLong(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 1);
|
conf.setLong(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 1);
|
||||||
|
|
||||||
|
conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 1L);
|
||||||
|
|
||||||
final int BLOCK_SIZE = 1024*1024;
|
final int BLOCK_SIZE = 1024*1024;
|
||||||
cluster = new MiniDFSCluster
|
cluster = new MiniDFSCluster
|
||||||
.Builder(conf)
|
.Builder(conf)
|
||||||
|
@ -1408,6 +1413,8 @@ public class TestBalancer {
|
||||||
conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1L);
|
conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1L);
|
||||||
conf.setLong(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 1L);
|
conf.setLong(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 1L);
|
||||||
|
|
||||||
|
conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 1L);
|
||||||
|
|
||||||
int numOfDatanodes =2;
|
int numOfDatanodes =2;
|
||||||
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
|
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
|
||||||
.numDataNodes(2)
|
.numDataNodes(2)
|
||||||
|
|
Loading…
Reference in New Issue