HDFS-11015. Enforce timeout in balancer. Contributed by Kihwal Lee.

(cherry picked from commit f6367c5f44)
Zhe Zhang 2016-10-25 10:18:57 -07:00
parent 4c5c6c8c44
commit ff806cbfc7
4 changed files with 59 additions and 14 deletions

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java

@@ -472,6 +472,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final String DFS_BALANCER_ADDRESS_DEFAULT= "0.0.0.0:0";
public static final String DFS_BALANCER_KEYTAB_FILE_KEY = "dfs.balancer.keytab.file";
public static final String DFS_BALANCER_KERBEROS_PRINCIPAL_KEY = "dfs.balancer.kerberos.principal";
public static final String DFS_BALANCER_BLOCK_MOVE_TIMEOUT = "dfs.balancer.block-move.timeout";
public static final int DFS_BALANCER_BLOCK_MOVE_TIMEOUT_DEFAULT = 0;
public static final String DFS_MOVER_MOVEDWINWIDTH_KEY = "dfs.mover.movedWinWidth";

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java

@@ -280,13 +280,16 @@ static int getInt(Configuration conf, String key, int defaultValue) {
final long getBlocksMinBlockSize = getLongBytes(conf,
DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY,
DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_DEFAULT);
final int blockMoveTimeout = conf.getInt(
DFSConfigKeys.DFS_BALANCER_BLOCK_MOVE_TIMEOUT,
DFSConfigKeys.DFS_BALANCER_BLOCK_MOVE_TIMEOUT_DEFAULT);
this.nnc = theblockpool;
this.dispatcher =
new Dispatcher(theblockpool, p.getIncludedNodes(),
p.getExcludedNodes(), movedWinWidth, moverThreads,
dispatcherThreads, maxConcurrentMovesPerNode, getBlocksSize,
getBlocksMinBlockSize, conf);
getBlocksMinBlockSize, blockMoveTimeout, conf);
this.threshold = p.getThreshold();
this.policy = p.getBalancingPolicy();
this.sourceNodes = p.getSourceNodes();

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java

@@ -121,6 +121,7 @@ public class Dispatcher {
private final long getBlocksSize;
private final long getBlocksMinBlockSize;
private final long blockMoveTimeout;
private final int ioFileBufferSize;
@@ -327,6 +328,11 @@ private void dispatch() {
getXferAddr(Dispatcher.this.connectToDnViaHostname)),
HdfsConstants.READ_TIMEOUT);
// Set read timeout so that it doesn't hang forever against
// unresponsive nodes. Datanode normally sends IN_PROGRESS response
// twice within the client read timeout period (every 30 seconds by
// default). Here, we make it give up after 5 minutes of no response.
sock.setSoTimeout(HdfsConstants.READ_TIMEOUT * 5);
sock.setKeepAlive(true);
OutputStream unbufOut = sock.getOutputStream();
@@ -382,13 +388,26 @@ private void sendRequest(DataOutputStream out, ExtendedBlock eb,
source.getDatanodeInfo().getDatanodeUuid(), proxySource.datanode);
}
/** Receive a block copy response from the input stream */
/** Check whether to continue waiting for response */
private boolean stopWaitingForResponse(long startTime) {
return source.isIterationOver() ||
(blockMoveTimeout > 0 &&
(Time.monotonicNow() - startTime > blockMoveTimeout));
}
/** Receive a reportedBlock copy response from the input stream */
private void receiveResponse(DataInputStream in) throws IOException {
long startTime = Time.monotonicNow();
BlockOpResponseProto response =
BlockOpResponseProto.parseFrom(vintPrefixed(in));
while (response.getStatus() == Status.IN_PROGRESS) {
// read intermediate responses
response = BlockOpResponseProto.parseFrom(vintPrefixed(in));
// Stop waiting for slow block moves. Even if it stops waiting,
// the actual move may continue.
if (stopWaitingForResponse(startTime)) {
throw new IOException("Block move timed out");
}
}
String logInfo = "block move is failed";
DataTransferProtoUtil.checkBlockOpStatus(response, logInfo);
@@ -626,6 +645,7 @@ public class Source extends DDatanode.StorageGroup {
private final List<Task> tasks = new ArrayList<Task>(2);
private long blocksToReceive = 0L;
private final long startTime = Time.monotonicNow();
/**
* Source blocks point to the objects in {@link Dispatcher#globalBlocks}
* because we want to keep one copy of a block and be aware that the
@@ -637,6 +657,13 @@ private Source(StorageType storageType, long maxSize2Move, DDatanode dn) {
dn.super(storageType, maxSize2Move);
}
/**
* Check if the iteration is over
*/
public boolean isIterationOver() {
return (Time.monotonicNow()-startTime > MAX_ITERATION_TIME);
}
/** Add a task */
void addTask(Task task) {
Preconditions.checkState(task.target != this,
@@ -777,24 +804,15 @@ private boolean shouldFetchMoreBlocks() {
* elapsed time of the iteration has exceeded the max time limit.
*/
private void dispatchBlocks() {
final long startTime = Time.monotonicNow();
this.blocksToReceive = 2 * getScheduledSize();
boolean isTimeUp = false;
int noPendingMoveIteration = 0;
while (!isTimeUp && getScheduledSize() > 0
while (getScheduledSize() > 0 && !isIterationOver()
&& (!srcBlocks.isEmpty() || blocksToReceive > 0)) {
if (LOG.isTraceEnabled()) {
LOG.trace(this + " blocksToReceive=" + blocksToReceive
+ ", scheduledSize=" + getScheduledSize()
+ ", srcBlocks#=" + srcBlocks.size());
}
// check if time is up or not
if (Time.monotonicNow() - startTime > MAX_ITERATION_TIME) {
LOG.info("Time up (max time=" + MAX_ITERATION_TIME/1000
+ " seconds). Skipping " + this);
isTimeUp = true;
continue;
}
final PendingMove p = chooseNextMove();
if (p != null) {
// Reset no pending move counter
@@ -841,6 +859,11 @@ private void dispatchBlocks() {
} catch (InterruptedException ignored) {
}
}
if (isIterationOver()) {
LOG.info("The maximum iteration time (" + MAX_ITERATION_TIME/1000
+ " seconds) has been reached. Stopping " + this);
}
}
@Override
@@ -860,13 +883,14 @@ public Dispatcher(NameNodeConnector nnc, Set<String> includedNodes,
int dispatcherThreads, int maxConcurrentMovesPerNode, Configuration conf) {
this(nnc, includedNodes, excludedNodes, movedWinWidth,
moverThreads, dispatcherThreads, maxConcurrentMovesPerNode,
0L, 0L, conf);
0L, 0L, 0, conf);
}
Dispatcher(NameNodeConnector nnc, Set<String> includedNodes,
Set<String> excludedNodes, long movedWinWidth, int moverThreads,
int dispatcherThreads, int maxConcurrentMovesPerNode,
long getBlocksSize, long getBlocksMinBlockSize, Configuration conf) {
long getBlocksSize, long getBlocksMinBlockSize,
int blockMoveTimeout, Configuration conf) {
this.nnc = nnc;
this.excludedNodes = excludedNodes;
this.includedNodes = includedNodes;
@@ -881,6 +905,7 @@ public Dispatcher(NameNodeConnector nnc, Set<String> includedNodes,
this.getBlocksSize = getBlocksSize;
this.getBlocksMinBlockSize = getBlocksMinBlockSize;
this.blockMoveTimeout = blockMoveTimeout;
this.saslClient = new SaslDataTransferClient(conf,
DataTransferSaslUtil.getSaslPropertiesResolver(conf),

hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml

@@ -3117,6 +3117,21 @@
</description>
</property>
<property>
<name>dfs.balancer.block-move.timeout</name>
<value>0</value>
<description>
Maximum amount of time, in milliseconds, that the Balancer waits for a single
block move to complete. If this is set greater than 0, the Balancer stops
waiting for a block move after this time. In typical clusters, a 3 to 5 minute
timeout is reasonable. If a large proportion of block moves time out, this
value needs to be increased. It could also be that too much work is being
dispatched and many nodes are constantly exceeding their bandwidth limit as a
result; in that case, other balancer parameters might need to be adjusted.
It is disabled (0) by default.
</description>
</property>
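For illustration only: the 3 to 5 minute range suggested above works out to 180000-300000 ms. A minimal sketch of overriding the default, assuming the setting is placed in hdfs-site.xml on the host that runs the Balancer (the value below is an example, not part of the patch):
<property>
  <name>dfs.balancer.block-move.timeout</name>
  <!-- illustrative value: 5 minutes = 5 * 60 * 1000 ms; tune per cluster -->
  <value>300000</value>
</property>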
<property>
<name>dfs.block.invalidate.limit</name>
<value>1000</value>