From ff806cbfc7f5d1bdccd51a1802b6d69f4777219e Mon Sep 17 00:00:00 2001 From: Zhe Zhang Date: Tue, 25 Oct 2016 10:18:57 -0700 Subject: [PATCH] HDFS-11015. Enforce timeout in balancer. Contributed by Kihwal Lee. (cherry picked from commit f6367c5f44a88cb5eb7edffb015b10b657504a61) --- .../org/apache/hadoop/hdfs/DFSConfigKeys.java | 2 + .../hadoop/hdfs/server/balancer/Balancer.java | 5 +- .../hdfs/server/balancer/Dispatcher.java | 51 ++++++++++++++----- .../src/main/resources/hdfs-default.xml | 15 ++++++ 4 files changed, 59 insertions(+), 14 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index 6b6a4e09914..70486219a10 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -472,6 +472,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys { public static final String DFS_BALANCER_ADDRESS_DEFAULT= "0.0.0.0:0"; public static final String DFS_BALANCER_KEYTAB_FILE_KEY = "dfs.balancer.keytab.file"; public static final String DFS_BALANCER_KERBEROS_PRINCIPAL_KEY = "dfs.balancer.kerberos.principal"; + public static final String DFS_BALANCER_BLOCK_MOVE_TIMEOUT = "dfs.balancer.block-move.timeout"; + public static final int DFS_BALANCER_BLOCK_MOVE_TIMEOUT_DEFAULT = 0; public static final String DFS_MOVER_MOVEDWINWIDTH_KEY = "dfs.mover.movedWinWidth"; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java index fcd46695781..68c27b684d1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java @@ -280,13 +280,16 @@ public class Balancer { final long getBlocksMinBlockSize = getLongBytes(conf, DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_DEFAULT); + final int blockMoveTimeout = conf.getInt( + DFSConfigKeys.DFS_BALANCER_BLOCK_MOVE_TIMEOUT, + DFSConfigKeys.DFS_BALANCER_BLOCK_MOVE_TIMEOUT_DEFAULT); this.nnc = theblockpool; this.dispatcher = new Dispatcher(theblockpool, p.getIncludedNodes(), p.getExcludedNodes(), movedWinWidth, moverThreads, dispatcherThreads, maxConcurrentMovesPerNode, getBlocksSize, - getBlocksMinBlockSize, conf); + getBlocksMinBlockSize, blockMoveTimeout, conf); this.threshold = p.getThreshold(); this.policy = p.getBalancingPolicy(); this.sourceNodes = p.getSourceNodes(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java index 1dbe85bba26..9b4a5e363e5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java @@ -121,6 +121,7 @@ public class Dispatcher { private final long getBlocksSize; private final long getBlocksMinBlockSize; + private final long blockMoveTimeout; private final int ioFileBufferSize; @@ -327,6 +328,11 @@ public class Dispatcher { getXferAddr(Dispatcher.this.connectToDnViaHostname)), HdfsConstants.READ_TIMEOUT); + // Set read timeout so that it doesn't hang forever against + // unresponsive nodes. Datanode normally sends IN_PROGRESS response + // twice within the client read timeout period (every 30 seconds by + // default). Here, we make it give up after 5 minutes of no response. + sock.setSoTimeout(HdfsConstants.READ_TIMEOUT * 5); sock.setKeepAlive(true); OutputStream unbufOut = sock.getOutputStream(); @@ -382,13 +388,26 @@ public class Dispatcher { source.getDatanodeInfo().getDatanodeUuid(), proxySource.datanode); } - /** Receive a block copy response from the input stream */ + /** Check whether to continue waiting for response */ + private boolean stopWaitingForResponse(long startTime) { + return source.isIterationOver() || + (blockMoveTimeout > 0 && + (Time.monotonicNow() - startTime > blockMoveTimeout)); + } + + /** Receive a reportedBlock copy response from the input stream */ private void receiveResponse(DataInputStream in) throws IOException { + long startTime = Time.monotonicNow(); BlockOpResponseProto response = BlockOpResponseProto.parseFrom(vintPrefixed(in)); while (response.getStatus() == Status.IN_PROGRESS) { // read intermediate responses response = BlockOpResponseProto.parseFrom(vintPrefixed(in)); + // Stop waiting for slow block moves. Even if it stops waiting, + // the actual move may continue. + if (stopWaitingForResponse(startTime)) { + throw new IOException("Block move timed out"); + } } String logInfo = "block move is failed"; DataTransferProtoUtil.checkBlockOpStatus(response, logInfo); @@ -626,6 +645,7 @@ public class Dispatcher { private final List tasks = new ArrayList(2); private long blocksToReceive = 0L; + private final long startTime = Time.monotonicNow(); /** * Source blocks point to the objects in {@link Dispatcher#globalBlocks} * because we want to keep one copy of a block and be aware that the @@ -637,6 +657,13 @@ public class Dispatcher { dn.super(storageType, maxSize2Move); } + /** + * Check if the iteration is over + */ + public boolean isIterationOver() { + return (Time.monotonicNow()-startTime > MAX_ITERATION_TIME); + } + /** Add a task */ void addTask(Task task) { Preconditions.checkState(task.target != this, @@ -777,24 +804,15 @@ public class Dispatcher { * elapsed time of the iteration has exceeded the max time limit. */ private void dispatchBlocks() { - final long startTime = Time.monotonicNow(); this.blocksToReceive = 2 * getScheduledSize(); - boolean isTimeUp = false; int noPendingMoveIteration = 0; - while (!isTimeUp && getScheduledSize() > 0 + while (getScheduledSize() > 0 && !isIterationOver() && (!srcBlocks.isEmpty() || blocksToReceive > 0)) { if (LOG.isTraceEnabled()) { LOG.trace(this + " blocksToReceive=" + blocksToReceive + ", scheduledSize=" + getScheduledSize() + ", srcBlocks#=" + srcBlocks.size()); } - // check if time is up or not - if (Time.monotonicNow() - startTime > MAX_ITERATION_TIME) { - LOG.info("Time up (max time=" + MAX_ITERATION_TIME/1000 - + " seconds). Skipping " + this); - isTimeUp = true; - continue; - } final PendingMove p = chooseNextMove(); if (p != null) { // Reset no pending move counter @@ -841,6 +859,11 @@ public class Dispatcher { } catch (InterruptedException ignored) { } } + + if (isIterationOver()) { + LOG.info("The maximum iteration time (" + MAX_ITERATION_TIME/1000 + + " seconds) has been reached. Stopping " + this); + } } @Override @@ -860,13 +883,14 @@ public class Dispatcher { int dispatcherThreads, int maxConcurrentMovesPerNode, Configuration conf) { this(nnc, includedNodes, excludedNodes, movedWinWidth, moverThreads, dispatcherThreads, maxConcurrentMovesPerNode, - 0L, 0L, conf); + 0L, 0L, 0, conf); } Dispatcher(NameNodeConnector nnc, Set includedNodes, Set excludedNodes, long movedWinWidth, int moverThreads, int dispatcherThreads, int maxConcurrentMovesPerNode, - long getBlocksSize, long getBlocksMinBlockSize, Configuration conf) { + long getBlocksSize, long getBlocksMinBlockSize, + int blockMoveTimeout, Configuration conf) { this.nnc = nnc; this.excludedNodes = excludedNodes; this.includedNodes = includedNodes; @@ -881,6 +905,7 @@ public class Dispatcher { this.getBlocksSize = getBlocksSize; this.getBlocksMinBlockSize = getBlocksMinBlockSize; + this.blockMoveTimeout = blockMoveTimeout; this.saslClient = new SaslDataTransferClient(conf, DataTransferSaslUtil.getSaslPropertiesResolver(conf), diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml index 7fca68bf997..56c200adc51 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml @@ -3117,6 +3117,21 @@ + + dfs.balancer.block-move.timeout + 0 + + Maximum amount of time in milliseconds for a block to move. If this is set + greater than 0, Balancer will stop waiting for a block move completion + after this time. In typical clusters, a 3 to 5 minute timeout is reasonable. + If timeout happens to a large proportion of block moves, this needs to be + increased. It could also be that too much work is dispatched and many nodes + are constantly exceeding the bandwidth limit as a result. In that case, + other balancer parameters might need to be adjusted. + It is disabled (0) by default. + + + dfs.block.invalidate.limit 1000