HDFS-11015. Enforce timeout in balancer. Contributed by Kihwal Lee.
commit f6367c5f44
parent 09ef97dccb

@@ -496,6 +496,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final String DFS_BALANCER_ADDRESS_DEFAULT= "0.0.0.0:0";
   public static final String DFS_BALANCER_KEYTAB_FILE_KEY = "dfs.balancer.keytab.file";
   public static final String DFS_BALANCER_KERBEROS_PRINCIPAL_KEY = "dfs.balancer.kerberos.principal";
+  public static final String DFS_BALANCER_BLOCK_MOVE_TIMEOUT = "dfs.balancer.block-move.timeout";
+  public static final int DFS_BALANCER_BLOCK_MOVE_TIMEOUT_DEFAULT = 0;
 
 
   public static final String DFS_MOVER_MOVEDWINWIDTH_KEY = "dfs.mover.movedWinWidth";

@@ -282,13 +282,16 @@ public class Balancer {
     final long getBlocksMinBlockSize = getLongBytes(conf,
         DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY,
         DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_DEFAULT);
+    final int blockMoveTimeout = conf.getInt(
+        DFSConfigKeys.DFS_BALANCER_BLOCK_MOVE_TIMEOUT,
+        DFSConfigKeys.DFS_BALANCER_BLOCK_MOVE_TIMEOUT_DEFAULT);
 
     this.nnc = theblockpool;
     this.dispatcher =
         new Dispatcher(theblockpool, p.getIncludedNodes(),
             p.getExcludedNodes(), movedWinWidth, moverThreads,
             dispatcherThreads, maxConcurrentMovesPerNode, getBlocksSize,
-            getBlocksMinBlockSize, conf);
+            getBlocksMinBlockSize, blockMoveTimeout, conf);
     this.threshold = p.getThreshold();
     this.policy = p.getBalancingPolicy();
     this.sourceNodes = p.getSourceNodes();

@@ -121,6 +121,7 @@ public class Dispatcher {
 
   private final long getBlocksSize;
   private final long getBlocksMinBlockSize;
+  private final long blockMoveTimeout;
 
   private final int ioFileBufferSize;
 

@@ -331,6 +332,11 @@ public class Dispatcher {
               getXferAddr(Dispatcher.this.connectToDnViaHostname)),
           HdfsConstants.READ_TIMEOUT);
 
+      // Set read timeout so that it doesn't hang forever against
+      // unresponsive nodes. Datanode normally sends IN_PROGRESS response
+      // twice within the client read timeout period (every 30 seconds by
+      // default). Here, we make it give up after 5 minutes of no response.
+      sock.setSoTimeout(HdfsConstants.READ_TIMEOUT * 5);
       sock.setKeepAlive(true);
 
       OutputStream unbufOut = sock.getOutputStream();

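For reference, HdfsConstants.READ_TIMEOUT is 60 seconds, so READ_TIMEOUT * 5 gives the 5-minute socket read timeout described in the comment. This guards against a datanode that stops responding entirely; the new dfs.balancer.block-move.timeout introduced below additionally bounds moves that keep reporting IN_PROGRESS but never finish.
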
@@ -386,13 +392,26 @@ public class Dispatcher {
             source.getDatanodeInfo().getDatanodeUuid(), proxySource.datanode);
       }
 
+    /** Check whether to continue waiting for response */
+    private boolean stopWaitingForResponse(long startTime) {
+      return source.isIterationOver() ||
+          (blockMoveTimeout > 0 &&
+          (Time.monotonicNow() - startTime > blockMoveTimeout));
+    }
+
     /** Receive a reportedBlock copy response from the input stream */
     private void receiveResponse(DataInputStream in) throws IOException {
+      long startTime = Time.monotonicNow();
       BlockOpResponseProto response =
           BlockOpResponseProto.parseFrom(vintPrefixed(in));
       while (response.getStatus() == Status.IN_PROGRESS) {
         // read intermediate responses
         response = BlockOpResponseProto.parseFrom(vintPrefixed(in));
+        // Stop waiting for slow block moves. Even if it stops waiting,
+        // the actual move may continue.
+        if (stopWaitingForResponse(startTime)) {
+          throw new IOException("Block move timed out");
+        }
       }
       String logInfo = "reportedBlock move is failed";
       DataTransferProtoUtil.checkBlockOpStatus(response, logInfo);

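In other words, stopWaitingForResponse() gives up on a pending move for either of two reasons: the whole iteration for this Source has run past MAX_ITERATION_TIME, or, when dfs.balancer.block-move.timeout is set above 0, this particular move has taken longer than that limit. As the inline comment notes, giving up only stops the Balancer from waiting; the datanode may still complete the transfer on its own.
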
@@ -671,6 +690,7 @@ public class Dispatcher {
 
     private final List<Task> tasks = new ArrayList<Task>(2);
     private long blocksToReceive = 0L;
+    private final long startTime = Time.monotonicNow();
     /**
      * Source blocks point to the objects in {@link Dispatcher#globalBlocks}
      * because we want to keep one copy of a block and be aware that the

@@ -682,6 +702,13 @@ public class Dispatcher {
       dn.super(storageType, maxSize2Move);
     }
 
+    /**
+     * Check if the iteration is over
+     */
+    public boolean isIterationOver() {
+      return (Time.monotonicNow()-startTime > MAX_ITERATION_TIME);
+    }
+
     /** Add a task */
     void addTask(Task task) {
       Preconditions.checkState(task.target != this,

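MAX_ITERATION_TIME is a pre-existing Dispatcher constant, 20 minutes at the time of writing, so isIterationOver() becomes true once this Source has been dispatching for that long. The dispatchBlocks() change below reuses it in place of the old isTimeUp flag.
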
@@ -838,24 +865,15 @@ public class Dispatcher {
      * elapsed time of the iteration has exceeded the max time limit.
      */
     private void dispatchBlocks() {
-      final long startTime = Time.monotonicNow();
       this.blocksToReceive = 2 * getScheduledSize();
-      boolean isTimeUp = false;
       int noPendingMoveIteration = 0;
-      while (!isTimeUp && getScheduledSize() > 0
+      while (getScheduledSize() > 0 && !isIterationOver()
           && (!srcBlocks.isEmpty() || blocksToReceive > 0)) {
         if (LOG.isTraceEnabled()) {
           LOG.trace(this + " blocksToReceive=" + blocksToReceive
               + ", scheduledSize=" + getScheduledSize()
               + ", srcBlocks#=" + srcBlocks.size());
         }
-        // check if time is up or not
-        if (Time.monotonicNow() - startTime > MAX_ITERATION_TIME) {
-          LOG.info("Time up (max time=" + MAX_ITERATION_TIME/1000
-              + " seconds). Skipping " + this);
-          isTimeUp = true;
-          continue;
-        }
         final PendingMove p = chooseNextMove();
         if (p != null) {
           // Reset no pending move counter

@@ -902,6 +920,11 @@ public class Dispatcher {
         } catch (InterruptedException ignored) {
         }
       }
+
+      if (isIterationOver()) {
+        LOG.info("The maximum iteration time (" + MAX_ITERATION_TIME/1000
+            + " seconds) has been reached. Stopping " + this);
+      }
     }
 
     @Override

@@ -921,13 +944,14 @@ public class Dispatcher {
       int dispatcherThreads, int maxConcurrentMovesPerNode, Configuration conf) {
     this(nnc, includedNodes, excludedNodes, movedWinWidth,
         moverThreads, dispatcherThreads, maxConcurrentMovesPerNode,
-        0L, 0L, conf);
+        0L, 0L, 0, conf);
   }
 
   Dispatcher(NameNodeConnector nnc, Set<String> includedNodes,
       Set<String> excludedNodes, long movedWinWidth, int moverThreads,
       int dispatcherThreads, int maxConcurrentMovesPerNode,
-      long getBlocksSize, long getBlocksMinBlockSize, Configuration conf) {
+      long getBlocksSize, long getBlocksMinBlockSize,
+      int blockMoveTimeout, Configuration conf) {
     this.nnc = nnc;
     this.excludedNodes = excludedNodes;
     this.includedNodes = includedNodes;

@@ -942,6 +966,7 @@ public class Dispatcher {
 
     this.getBlocksSize = getBlocksSize;
     this.getBlocksMinBlockSize = getBlocksMinBlockSize;
+    this.blockMoveTimeout = blockMoveTimeout;
 
     this.saslClient = new SaslDataTransferClient(conf,
         DataTransferSaslUtil.getSaslPropertiesResolver(conf),

@@ -3227,6 +3227,21 @@
   </description>
 </property>
 
+<property>
+  <name>dfs.balancer.block-move.timeout</name>
+  <value>0</value>
+  <description>
+    Maximum amount of time in milliseconds for a block to move. If this is set
+    greater than 0, Balancer will stop waiting for a block move completion
+    after this time. In typical clusters, a 3 to 5 minute timeout is reasonable.
+    If timeout happens to a large proportion of block moves, this needs to be
+    increased. It could also be that too much work is dispatched and many nodes
+    are constantly exceeding the bandwidth limit as a result. In that case,
+    other balancer parameters might need to be adjusted.
+    It is disabled (0) by default.
+  </description>
+</property>
+
 <property>
   <name>dfs.block.invalidate.limit</name>
   <value>1000</value>

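For operators, a plausible way to use the new setting is to add the property to hdfs-site.xml on the host that runs the Balancer; the 300000 ms value below is only an example in line with the 3 to 5 minute guidance above, not something this patch prescribes:

<!-- Example hdfs-site.xml override; 300000 ms = 5 minutes -->
<property>
  <name>dfs.balancer.block-move.timeout</name>
  <value>300000</value>
</property>

Because the Balancer is launched through the standard Hadoop ToolRunner, a one-off override such as hdfs balancer -Ddfs.balancer.block-move.timeout=300000 should also work without editing configuration files.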