HDFS-10966. Enhance Dispatcher logic on deciding when to give up a source DataNode. Contributed by Mark Wagner and Zhe Zhang.

(cherry picked from commit 774b0cd845)

Conflicts:
	hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
This commit is contained in:
Kihwal Lee 2016-11-21 10:52:22 -06:00
parent 74503ed6b4
commit f5410f0814
6 changed files with 55 additions and 15 deletions

View File

@ -469,6 +469,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final String DFS_BALANCER_KERBEROS_PRINCIPAL_KEY = "dfs.balancer.kerberos.principal"; public static final String DFS_BALANCER_KERBEROS_PRINCIPAL_KEY = "dfs.balancer.kerberos.principal";
public static final String DFS_BALANCER_BLOCK_MOVE_TIMEOUT = "dfs.balancer.block-move.timeout"; public static final String DFS_BALANCER_BLOCK_MOVE_TIMEOUT = "dfs.balancer.block-move.timeout";
public static final int DFS_BALANCER_BLOCK_MOVE_TIMEOUT_DEFAULT = 0; public static final int DFS_BALANCER_BLOCK_MOVE_TIMEOUT_DEFAULT = 0;
public static final String DFS_BALANCER_MAX_NO_MOVE_INTERVAL_KEY = "dfs.balancer.max-no-move-interval";
public static final int DFS_BALANCER_MAX_NO_MOVE_INTERVAL_DEFAULT = 60*1000; // One minute
public static final String DFS_MOVER_MOVEDWINWIDTH_KEY = "dfs.mover.movedWinWidth"; public static final String DFS_MOVER_MOVEDWINWIDTH_KEY = "dfs.mover.movedWinWidth";
@ -477,6 +479,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final int DFS_MOVER_MOVERTHREADS_DEFAULT = 1000; public static final int DFS_MOVER_MOVERTHREADS_DEFAULT = 1000;
public static final String DFS_MOVER_RETRY_MAX_ATTEMPTS_KEY = "dfs.mover.retry.max.attempts"; public static final String DFS_MOVER_RETRY_MAX_ATTEMPTS_KEY = "dfs.mover.retry.max.attempts";
public static final int DFS_MOVER_RETRY_MAX_ATTEMPTS_DEFAULT = 10; public static final int DFS_MOVER_RETRY_MAX_ATTEMPTS_DEFAULT = 10;
public static final String DFS_MOVER_MAX_NO_MOVE_INTERVAL_KEY = "dfs.mover.max-no-move-interval";
public static final int DFS_MOVER_MAX_NO_MOVE_INTERVAL_DEFAULT = 60*1000; // One minute
public static final String DFS_DATANODE_ADDRESS_KEY = "dfs.datanode.address"; public static final String DFS_DATANODE_ADDRESS_KEY = "dfs.datanode.address";
public static final int DFS_DATANODE_DEFAULT_PORT = 50010; public static final int DFS_DATANODE_DEFAULT_PORT = 50010;

View File

@ -283,13 +283,16 @@ public class Balancer {
final int blockMoveTimeout = conf.getInt( final int blockMoveTimeout = conf.getInt(
DFSConfigKeys.DFS_BALANCER_BLOCK_MOVE_TIMEOUT, DFSConfigKeys.DFS_BALANCER_BLOCK_MOVE_TIMEOUT,
DFSConfigKeys.DFS_BALANCER_BLOCK_MOVE_TIMEOUT_DEFAULT); DFSConfigKeys.DFS_BALANCER_BLOCK_MOVE_TIMEOUT_DEFAULT);
final int maxNoMoveInterval = conf.getInt(
DFSConfigKeys.DFS_BALANCER_MAX_NO_MOVE_INTERVAL_KEY,
DFSConfigKeys.DFS_BALANCER_MAX_NO_MOVE_INTERVAL_DEFAULT);
this.nnc = theblockpool; this.nnc = theblockpool;
this.dispatcher = this.dispatcher =
new Dispatcher(theblockpool, p.getIncludedNodes(), new Dispatcher(theblockpool, p.getIncludedNodes(),
p.getExcludedNodes(), movedWinWidth, moverThreads, p.getExcludedNodes(), movedWinWidth, moverThreads,
dispatcherThreads, maxConcurrentMovesPerNode, getBlocksSize, dispatcherThreads, maxConcurrentMovesPerNode, getBlocksSize,
getBlocksMinBlockSize, blockMoveTimeout, conf); getBlocksMinBlockSize, blockMoveTimeout, maxNoMoveInterval, conf);
this.threshold = p.getThreshold(); this.threshold = p.getThreshold();
this.policy = p.getBalancingPolicy(); this.policy = p.getBalancingPolicy();
this.sourceNodes = p.getSourceNodes(); this.sourceNodes = p.getSourceNodes();

View File

@ -122,6 +122,11 @@ public class Dispatcher {
private final long getBlocksSize; private final long getBlocksSize;
private final long getBlocksMinBlockSize; private final long getBlocksMinBlockSize;
private final long blockMoveTimeout; private final long blockMoveTimeout;
/**
* If no block can be moved out of a {@link Source} after this configured
* amount of time, the Source should give up choosing the next possible move.
*/
private final int maxNoMoveInterval;
private final int ioFileBufferSize; private final int ioFileBufferSize;
@ -805,7 +810,7 @@ public class Dispatcher {
*/ */
private void dispatchBlocks() { private void dispatchBlocks() {
this.blocksToReceive = 2 * getScheduledSize(); this.blocksToReceive = 2 * getScheduledSize();
int noPendingMoveIteration = 0; long previousMoveTimestamp = Time.monotonicNow();
while (getScheduledSize() > 0 && !isIterationOver() while (getScheduledSize() > 0 && !isIterationOver()
&& (!srcBlocks.isEmpty() || blocksToReceive > 0)) { && (!srcBlocks.isEmpty() || blocksToReceive > 0)) {
if (LOG.isTraceEnabled()) { if (LOG.isTraceEnabled()) {
@ -815,8 +820,8 @@ public class Dispatcher {
} }
final PendingMove p = chooseNextMove(); final PendingMove p = chooseNextMove();
if (p != null) { if (p != null) {
// Reset no pending move counter // Reset previous move timestamp
noPendingMoveIteration=0; previousMoveTimestamp = Time.monotonicNow();
executePendingMove(p); executePendingMove(p);
continue; continue;
} }
@ -839,13 +844,11 @@ public class Dispatcher {
return; return;
} }
} else { } else {
// source node cannot find a pending block to move, iteration +1 // jump out of while-loop after the configured timeout.
noPendingMoveIteration++; long noMoveInterval = Time.monotonicNow() - previousMoveTimestamp;
// in case no blocks can be moved for source node's task, if (noMoveInterval > maxNoMoveInterval) {
// jump out of while-loop after 5 iterations. LOG.info("Failed to find a pending move for " + noMoveInterval
if (noPendingMoveIteration >= MAX_NO_PENDING_MOVE_ITERATIONS) { + " ms. Skipping " + this);
LOG.info("Failed to find a pending move " + noPendingMoveIteration
+ " times. Skipping " + this);
resetScheduledSize(); resetScheduledSize();
} }
} }
@ -856,6 +859,9 @@ public class Dispatcher {
synchronized (Dispatcher.this) { synchronized (Dispatcher.this) {
Dispatcher.this.wait(1000); // wait for targets/sources to be idle Dispatcher.this.wait(1000); // wait for targets/sources to be idle
} }
// Didn't find a possible move in this iteration of the while loop,
// adding a small delay before choosing next move again.
Thread.sleep(100);
} catch (InterruptedException ignored) { } catch (InterruptedException ignored) {
} }
} }
@ -880,17 +886,18 @@ public class Dispatcher {
/** Constructor called by Mover. */ /** Constructor called by Mover. */
public Dispatcher(NameNodeConnector nnc, Set<String> includedNodes, public Dispatcher(NameNodeConnector nnc, Set<String> includedNodes,
Set<String> excludedNodes, long movedWinWidth, int moverThreads, Set<String> excludedNodes, long movedWinWidth, int moverThreads,
int dispatcherThreads, int maxConcurrentMovesPerNode, Configuration conf) { int dispatcherThreads, int maxConcurrentMovesPerNode,
int maxNoMoveInterval, Configuration conf) {
this(nnc, includedNodes, excludedNodes, movedWinWidth, this(nnc, includedNodes, excludedNodes, movedWinWidth,
moverThreads, dispatcherThreads, maxConcurrentMovesPerNode, moverThreads, dispatcherThreads, maxConcurrentMovesPerNode,
0L, 0L, 0, conf); 0L, 0L, 0, maxNoMoveInterval, conf);
} }
Dispatcher(NameNodeConnector nnc, Set<String> includedNodes, Dispatcher(NameNodeConnector nnc, Set<String> includedNodes,
Set<String> excludedNodes, long movedWinWidth, int moverThreads, Set<String> excludedNodes, long movedWinWidth, int moverThreads,
int dispatcherThreads, int maxConcurrentMovesPerNode, int dispatcherThreads, int maxConcurrentMovesPerNode,
long getBlocksSize, long getBlocksMinBlockSize, long getBlocksSize, long getBlocksMinBlockSize,
int blockMoveTimeout, Configuration conf) { int blockMoveTimeout, int maxNoMoveInterval, Configuration conf) {
this.nnc = nnc; this.nnc = nnc;
this.excludedNodes = excludedNodes; this.excludedNodes = excludedNodes;
this.includedNodes = includedNodes; this.includedNodes = includedNodes;
@ -906,6 +913,7 @@ public class Dispatcher {
this.getBlocksSize = getBlocksSize; this.getBlocksSize = getBlocksSize;
this.getBlocksMinBlockSize = getBlocksMinBlockSize; this.getBlocksMinBlockSize = getBlocksMinBlockSize;
this.blockMoveTimeout = blockMoveTimeout; this.blockMoveTimeout = blockMoveTimeout;
this.maxNoMoveInterval = maxNoMoveInterval;
this.saslClient = new SaslDataTransferClient(conf, this.saslClient = new SaslDataTransferClient(conf,
DataTransferSaslUtil.getSaslPropertiesResolver(conf), DataTransferSaslUtil.getSaslPropertiesResolver(conf),

View File

@ -123,13 +123,16 @@ public class Mover {
final int maxConcurrentMovesPerNode = conf.getInt( final int maxConcurrentMovesPerNode = conf.getInt(
DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY, DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY,
DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT); DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT);
final int maxNoMoveInterval = conf.getInt(
DFSConfigKeys.DFS_MOVER_MAX_NO_MOVE_INTERVAL_KEY,
DFSConfigKeys.DFS_MOVER_MAX_NO_MOVE_INTERVAL_DEFAULT);
this.retryMaxAttempts = conf.getInt( this.retryMaxAttempts = conf.getInt(
DFSConfigKeys.DFS_MOVER_RETRY_MAX_ATTEMPTS_KEY, DFSConfigKeys.DFS_MOVER_RETRY_MAX_ATTEMPTS_KEY,
DFSConfigKeys.DFS_MOVER_RETRY_MAX_ATTEMPTS_DEFAULT); DFSConfigKeys.DFS_MOVER_RETRY_MAX_ATTEMPTS_DEFAULT);
this.retryCount = retryCount; this.retryCount = retryCount;
this.dispatcher = new Dispatcher(nnc, Collections.<String> emptySet(), this.dispatcher = new Dispatcher(nnc, Collections.<String> emptySet(),
Collections.<String> emptySet(), movedWinWidth, moverThreads, 0, Collections.<String> emptySet(), movedWinWidth, moverThreads, 0,
maxConcurrentMovesPerNode, conf); maxConcurrentMovesPerNode, maxNoMoveInterval, conf);
this.storages = new StorageMap(); this.storages = new StorageMap();
this.targetPaths = nnc.getTargetPaths(); this.targetPaths = nnc.getTargetPaths();
this.blockStoragePolicies = new BlockStoragePolicy[1 << this.blockStoragePolicies = new BlockStoragePolicy[1 <<

View File

@ -873,6 +873,16 @@
</description> </description>
</property> </property>
<property>
<name>dfs.mover.max-no-move-interval</name>
<value>60000</value>
<description>
If this specified amount of time has elapsed and no block has been moved
out of a source DataNode, on more effort will be made to move blocks out of
this DataNode in the current Mover iteration.
</description>
</property>
<property> <property>
<name>dfs.hosts</name> <name>dfs.hosts</name>
<value></value> <value></value>
@ -3062,6 +3072,16 @@
</description> </description>
</property> </property>
<property>
<name>dfs.balancer.max-no-move-interval</name>
<value>60000</value>
<description>
If this specified amount of time has elapsed and no block has been moved
out of a source DataNode, on more effort will be made to move blocks out of
this DataNode in the current Balancer iteration.
</description>
</property>
<property> <property>
<name>dfs.lock.suppress.warning.interval</name> <name>dfs.lock.suppress.warning.interval</name>
<value>10s</value> <value>10s</value>

View File

@ -233,6 +233,7 @@ public class TestBalancer {
conf.setLong(DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_KEY, 2000L); conf.setLong(DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_KEY, 2000L);
conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 1L); conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 1L);
conf.setInt(DFSConfigKeys.DFS_BALANCER_MAX_NO_MOVE_INTERVAL_KEY, 5*1000);
} }
static void initConfWithRamDisk(Configuration conf, static void initConfWithRamDisk(Configuration conf,
@ -243,6 +244,7 @@ public class TestBalancer {
conf.setLong(DFS_HEARTBEAT_INTERVAL_KEY, 1); conf.setLong(DFS_HEARTBEAT_INTERVAL_KEY, 1);
conf.setInt(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 500); conf.setInt(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 500);
conf.setInt(DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC, 1); conf.setInt(DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC, 1);
conf.setInt(DFSConfigKeys.DFS_BALANCER_MAX_NO_MOVE_INTERVAL_KEY, 5*1000);
LazyPersistTestCase.initCacheManipulator(); LazyPersistTestCase.initCacheManipulator();
conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 1L); conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 1L);