HDFS-11015. Enforce timeout in balancer. Contributed by Kihwal Lee.

(cherry picked from commit f6367c5f44)
(cherry picked from commit ff806cbfc7)
(cherry picked from commit 262518fa5b)
This commit is contained in:
Zhe Zhang 2016-10-25 10:18:57 -07:00
parent d05e737b84
commit c05d701161
5 changed files with 62 additions and 15 deletions

View File

@ -175,6 +175,8 @@ Release 2.7.4 - UNRELEASED
HDFS-10627. Volume Scanner marks a block as "suspect" even if HDFS-10627. Volume Scanner marks a block as "suspect" even if
the exception is network-related. (Rushabh S Shah via kihwal) the exception is network-related. (Rushabh S Shah via kihwal)
HDFS-11015. Enforce timeout in balancer. (kihwal via zhz)
Release 2.7.3 - 2016-08-25 Release 2.7.3 - 2016-08-25
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -454,6 +454,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final long DFS_BALANCER_GETBLOCKS_SIZE_DEFAULT = 2L*1024*1024*1024; // 2GB public static final long DFS_BALANCER_GETBLOCKS_SIZE_DEFAULT = 2L*1024*1024*1024; // 2GB
public static final String DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY = "dfs.balancer.getBlocks.min-block-size"; public static final String DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY = "dfs.balancer.getBlocks.min-block-size";
public static final long DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_DEFAULT = 10L*1024*1024; // 10MB public static final long DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_DEFAULT = 10L*1024*1024; // 10MB
public static final String DFS_BALANCER_BLOCK_MOVE_TIMEOUT = "dfs.balancer.block-move.timeout";
public static final int DFS_BALANCER_BLOCK_MOVE_TIMEOUT_DEFAULT = 0;
public static final String DFS_MOVER_MOVEDWINWIDTH_KEY = "dfs.mover.movedWinWidth"; public static final String DFS_MOVER_MOVEDWINWIDTH_KEY = "dfs.mover.movedWinWidth";
public static final long DFS_MOVER_MOVEDWINWIDTH_DEFAULT = 5400*1000L; public static final long DFS_MOVER_MOVEDWINWIDTH_DEFAULT = 5400*1000L;

View File

@ -255,10 +255,14 @@ public class Balancer {
final long getBlocksMinBlockSize = getLong(conf, final long getBlocksMinBlockSize = getLong(conf,
DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY,
DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_DEFAULT); DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_DEFAULT);
final int blockMoveTimeout = conf.getInt(
DFSConfigKeys.DFS_BALANCER_BLOCK_MOVE_TIMEOUT,
DFSConfigKeys.DFS_BALANCER_BLOCK_MOVE_TIMEOUT_DEFAULT);
this.dispatcher = new Dispatcher(theblockpool, p.nodesToBeIncluded, this.dispatcher = new Dispatcher(theblockpool, p.nodesToBeIncluded,
p.nodesToBeExcluded, movedWinWidth, moverThreads, dispatcherThreads, p.nodesToBeExcluded, movedWinWidth, moverThreads, dispatcherThreads,
maxConcurrentMovesPerNode, getBlocksSize, getBlocksMinBlockSize, conf); maxConcurrentMovesPerNode, getBlocksSize, getBlocksMinBlockSize,
blockMoveTimeout, conf);
this.threshold = p.threshold; this.threshold = p.threshold;
this.policy = p.policy; this.policy = p.policy;

View File

@ -121,6 +121,7 @@ public class Dispatcher {
private final long getBlocksSize; private final long getBlocksSize;
private final long getBlocksMinBlockSize; private final long getBlocksMinBlockSize;
private final long blockMoveTimeout;
static class Allocator { static class Allocator {
private final int max; private final int max;
@ -321,6 +322,11 @@ public class Dispatcher {
NetUtils.createSocketAddr(target.getDatanodeInfo().getXferAddr()), NetUtils.createSocketAddr(target.getDatanodeInfo().getXferAddr()),
HdfsServerConstants.READ_TIMEOUT); HdfsServerConstants.READ_TIMEOUT);
// Set read timeout so that it doesn't hang forever against
// unresponsive nodes. Datanode normally sends IN_PROGRESS response
// twice within the client read timeout period (every 30 seconds by
// default). Here, we make it give up after 5 minutes of no response.
sock.setSoTimeout(HdfsServerConstants.READ_TIMEOUT * 5);
sock.setKeepAlive(true); sock.setKeepAlive(true);
OutputStream unbufOut = sock.getOutputStream(); OutputStream unbufOut = sock.getOutputStream();
@ -375,13 +381,26 @@ public class Dispatcher {
source.getDatanodeInfo().getDatanodeUuid(), proxySource.datanode); source.getDatanodeInfo().getDatanodeUuid(), proxySource.datanode);
} }
/** Receive a block copy response from the input stream */ /** Check whether to continue waiting for response */
private boolean stopWaitingForResponse(long startTime) {
// Returns true when the caller should STOP waiting for the response.
// First reason to stop: the source reports that its iteration is over.
if (source.isIterationOver()) {
return true;
}
// A non-positive blockMoveTimeout means no per-move time limit is applied.
if (blockMoveTimeout <= 0) {
return false;
}
// Second reason to stop: more than blockMoveTimeout has elapsed since
// startTime, measured on the monotonic clock.
final long elapsed = Time.monotonicNow() - startTime;
return elapsed > blockMoveTimeout;
}
/** Receive a block copy response from the input stream */
private void receiveResponse(DataInputStream in) throws IOException { private void receiveResponse(DataInputStream in) throws IOException {
long startTime = Time.monotonicNow();
BlockOpResponseProto response = BlockOpResponseProto response =
BlockOpResponseProto.parseFrom(vintPrefixed(in)); BlockOpResponseProto.parseFrom(vintPrefixed(in));
while (response.getStatus() == Status.IN_PROGRESS) { while (response.getStatus() == Status.IN_PROGRESS) {
// read intermediate responses // read intermediate responses
response = BlockOpResponseProto.parseFrom(vintPrefixed(in)); response = BlockOpResponseProto.parseFrom(vintPrefixed(in));
// Stop waiting for slow block moves. Even if it stops waiting,
// the actual move may continue.
if (stopWaitingForResponse(startTime)) {
throw new IOException("Block move timed out");
}
} }
String logInfo = "block move is failed"; String logInfo = "block move is failed";
DataTransferProtoUtil.checkBlockOpStatus(response, logInfo); DataTransferProtoUtil.checkBlockOpStatus(response, logInfo);
@ -614,6 +633,7 @@ public class Dispatcher {
private final List<Task> tasks = new ArrayList<Task>(2); private final List<Task> tasks = new ArrayList<Task>(2);
private long blocksToReceive = 0L; private long blocksToReceive = 0L;
private final long startTime = Time.monotonicNow();
/** /**
* Source blocks point to the objects in {@link Dispatcher#globalBlocks} * Source blocks point to the objects in {@link Dispatcher#globalBlocks}
* because we want to keep one copy of a block and be aware that the * because we want to keep one copy of a block and be aware that the
@ -625,6 +645,13 @@ public class Dispatcher {
dn.super(storageType, maxSize2Move); dn.super(storageType, maxSize2Move);
} }
/**
* Check if the iteration is over.
*
* @return true if the time elapsed since {@code startTime}, measured with
*         {@link Time#monotonicNow()}, is strictly greater than
*         {@code MAX_ITERATION_TIME}; false otherwise.
*/
public boolean isIterationOver() {
final long elapsed = Time.monotonicNow() - startTime;
return elapsed > MAX_ITERATION_TIME;
}
/** Add a task */ /** Add a task */
void addTask(Task task) { void addTask(Task task) {
Preconditions.checkState(task.target != this, Preconditions.checkState(task.target != this,
@ -765,11 +792,9 @@ public class Dispatcher {
* elapsed time of the iteration has exceeded the max time limit. * elapsed time of the iteration has exceeded the max time limit.
*/ */
private void dispatchBlocks() { private void dispatchBlocks() {
final long startTime = Time.monotonicNow();
this.blocksToReceive = 2 * getScheduledSize(); this.blocksToReceive = 2 * getScheduledSize();
boolean isTimeUp = false;
int noPendingMoveIteration = 0; int noPendingMoveIteration = 0;
while (!isTimeUp && getScheduledSize() > 0 while (getScheduledSize() > 0 && !isIterationOver()
&& (!srcBlocks.isEmpty() || blocksToReceive > 0)) { && (!srcBlocks.isEmpty() || blocksToReceive > 0)) {
if (LOG.isTraceEnabled()) { if (LOG.isTraceEnabled()) {
LOG.trace(this + " blocksToReceive=" + blocksToReceive LOG.trace(this + " blocksToReceive=" + blocksToReceive
@ -809,14 +834,6 @@ public class Dispatcher {
} }
} }
// check if time is up or not
if (Time.monotonicNow() - startTime > MAX_ITERATION_TIME) {
LOG.info("Time up (max time=" + MAX_ITERATION_TIME/1000
+ " seconds). Skipping " + this);
isTimeUp = true;
continue;
}
// Now we can not schedule any block to move and there are // Now we can not schedule any block to move and there are
// no new blocks added to the source block list, so we wait. // no new blocks added to the source block list, so we wait.
try { try {
@ -826,6 +843,11 @@ public class Dispatcher {
} catch (InterruptedException ignored) { } catch (InterruptedException ignored) {
} }
} }
if (isIterationOver()) {
LOG.info("The maximum iteration time (" + MAX_ITERATION_TIME/1000
+ " seconds) has been reached. Stopping " + this);
}
} }
@Override @Override
@ -845,13 +867,14 @@ public class Dispatcher {
int dispatcherThreads, int maxConcurrentMovesPerNode, Configuration conf) { int dispatcherThreads, int maxConcurrentMovesPerNode, Configuration conf) {
this(nnc, includedNodes, excludedNodes, movedWinWidth, this(nnc, includedNodes, excludedNodes, movedWinWidth,
moverThreads, dispatcherThreads, maxConcurrentMovesPerNode, moverThreads, dispatcherThreads, maxConcurrentMovesPerNode,
0L, 0L, conf); 0L, 0L, 0, conf);
} }
Dispatcher(NameNodeConnector nnc, Set<String> includedNodes, Dispatcher(NameNodeConnector nnc, Set<String> includedNodes,
Set<String> excludedNodes, long movedWinWidth, int moverThreads, Set<String> excludedNodes, long movedWinWidth, int moverThreads,
int dispatcherThreads, int maxConcurrentMovesPerNode, int dispatcherThreads, int maxConcurrentMovesPerNode,
long getBlocksSize, long getBlocksMinBlockSize, Configuration conf) { long getBlocksSize, long getBlocksMinBlockSize,
int blockMoveTimeout, Configuration conf) {
this.nnc = nnc; this.nnc = nnc;
this.excludedNodes = excludedNodes; this.excludedNodes = excludedNodes;
this.includedNodes = includedNodes; this.includedNodes = includedNodes;
@ -866,6 +889,7 @@ public class Dispatcher {
this.getBlocksSize = getBlocksSize; this.getBlocksSize = getBlocksSize;
this.getBlocksMinBlockSize = getBlocksMinBlockSize; this.getBlocksMinBlockSize = getBlocksMinBlockSize;
this.blockMoveTimeout = blockMoveTimeout;
this.saslClient = new SaslDataTransferClient(conf, this.saslClient = new SaslDataTransferClient(conf,
DataTransferSaslUtil.getSaslPropertiesResolver(conf), DataTransferSaslUtil.getSaslPropertiesResolver(conf),

View File

@ -2531,6 +2531,21 @@
</description> </description>
</property> </property>
<property>
<name>dfs.balancer.block-move.timeout</name>
<value>0</value>
<description>
Maximum amount of time in milliseconds for a block to move. If this is set
to a value greater than 0, the Balancer will stop waiting for a block move
to complete after this time. In typical clusters, a 3 to 5 minute timeout is
reasonable. If a large proportion of block moves time out, this value needs
to be increased. Frequent timeouts can also mean that too much work is being
dispatched and many nodes are constantly exceeding the bandwidth limit as a
result. In that case, other balancer parameters might need to be adjusted.
It is disabled (0) by default.
</description>
</property>
<property> <property>
<name>dfs.lock.suppress.warning.interval</name> <name>dfs.lock.suppress.warning.interval</name>
<value>10s</value> <value>10s</value>