diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 565e469ee64..15c9df5c1e8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -772,6 +772,9 @@ Release 2.8.0 - UNRELEASED
     HDFS-8772. Fix TestStandbyIsHot#testDatanodeRestarts which occasionally
     fails. (Walter Su via wang)
 
+    HDFS-8818. Changes the global moveExecutor to per datanode executors and
+    changes MAX_SIZE_TO_MOVE to be configurable. (szetszwo)
+
   OPTIMIZATIONS
 
     HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 1e5bf0deb8e..4ef7a4d09f5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -351,6 +351,9 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final int DFS_BALANCER_MOVERTHREADS_DEFAULT = 1000;
   public static final String DFS_BALANCER_DISPATCHERTHREADS_KEY = "dfs.balancer.dispatcherThreads";
   public static final int DFS_BALANCER_DISPATCHERTHREADS_DEFAULT = 200;
+  public static final String DFS_BALANCER_MAX_SIZE_TO_MOVE_KEY = "dfs.balancer.max-size-to-move";
+  public static final long DFS_BALANCER_MAX_SIZE_TO_MOVE_DEFAULT = 10L*1024*1024*1024;
+
 
   public static final String DFS_MOVER_MOVEDWINWIDTH_KEY = "dfs.mover.movedWinWidth";
   public static final long DFS_MOVER_MOVEDWINWIDTH_DEFAULT = 5400*1000L;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
index 8b7d802de6d..742a30072af 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
@@ -34,6 +34,7 @@ import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
@@ -167,9 +168,6 @@ public class Balancer {
 
   static final Path BALANCER_ID_PATH = new Path("/system/balancer.id");
 
-  private static final long GB = 1L << 30; //1GB
-  private static final long MAX_SIZE_TO_MOVE = 10*GB;
-
   private static final String USAGE = "Usage: hdfs balancer"
       + "\n\t[-policy <policy>]\tthe balancing policy: "
       + BalancingPolicy.Node.INSTANCE.getName() + " or "
@@ -192,6 +190,7 @@ public class Balancer {
   private final BalancingPolicy policy;
   private final boolean runDuringUpgrade;
   private final double threshold;
+  private final long maxSizeToMove;
 
   // all data node lists
   private final Collection<Source> overUtilized = new LinkedList<Source>();
@@ -213,6 +212,24 @@ public class Balancer {
     }
   }
 
+  static long getLong(Configuration conf, String key, long defaultValue) {
+    final long v = conf.getLong(key, defaultValue);
+    LOG.info(key + " = " + v + " (default=" + defaultValue + ")");
+    if (v <= 0) {
+      throw new HadoopIllegalArgumentException(key + " = " + v + " <= " + 0);
+    }
+    return v;
+  }
+
+  static int getInt(Configuration conf, String key, int defaultValue) {
+    final int v = conf.getInt(key, defaultValue);
+    LOG.info(key + " = " + v + " (default=" + defaultValue + ")");
+    if (v <= 0) {
+      throw new HadoopIllegalArgumentException(key + " = " + v + " <= " + 0);
+    }
+    return v;
+  }
+
   /**
    * Construct a balancer.
    * Initialize balancer. It sets the value of the threshold, and
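The two helpers above give every balancer tuning knob the same fail-fast treatment: log the effective value and reject anything non-positive. The sketch below is illustrative only (it is not part of the patch); it mimics that pattern with plain java.util.Properties instead of Hadoop's Configuration so it runs without HDFS on the classpath, and the class and method names are invented for the example.

```java
// Illustrative sketch, not patch code: the validation pattern of
// Balancer.getLong/getInt, reproduced with java.util.Properties.
import java.util.Properties;

public class ValidatedConfDemo {
  static final String MAX_SIZE_KEY = "dfs.balancer.max-size-to-move";
  static final long MAX_SIZE_DEFAULT = 10L * 1024 * 1024 * 1024; // 10GB, as in the patch

  static long getPositiveLong(Properties conf, String key, long defaultValue) {
    final long v = Long.parseLong(conf.getProperty(key, String.valueOf(defaultValue)));
    System.out.println(key + " = " + v + " (default=" + defaultValue + ")");
    if (v <= 0) {
      // The patch throws HadoopIllegalArgumentException here; plain
      // IllegalArgumentException keeps this sketch dependency-free.
      throw new IllegalArgumentException(key + " = " + v + " <= 0");
    }
    return v;
  }

  public static void main(String[] args) {
    Properties conf = new Properties();
    getPositiveLong(conf, MAX_SIZE_KEY, MAX_SIZE_DEFAULT); // uses the 10GB default
    conf.setProperty(MAX_SIZE_KEY, "0");
    getPositiveLong(conf, MAX_SIZE_KEY, MAX_SIZE_DEFAULT); // throws: zero is rejected
  }
}
```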
@@ -221,16 +238,16 @@ public class Balancer {
    * when connection fails.
    */
   Balancer(NameNodeConnector theblockpool, Parameters p, Configuration conf) {
-    final long movedWinWidth = conf.getLong(
+    final long movedWinWidth = getLong(conf,
         DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_KEY,
         DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_DEFAULT);
-    final int moverThreads = conf.getInt(
+    final int moverThreads = getInt(conf,
         DFSConfigKeys.DFS_BALANCER_MOVERTHREADS_KEY,
         DFSConfigKeys.DFS_BALANCER_MOVERTHREADS_DEFAULT);
-    final int dispatcherThreads = conf.getInt(
+    final int dispatcherThreads = getInt(conf,
         DFSConfigKeys.DFS_BALANCER_DISPATCHERTHREADS_KEY,
         DFSConfigKeys.DFS_BALANCER_DISPATCHERTHREADS_DEFAULT);
-    final int maxConcurrentMovesPerNode = conf.getInt(
+    final int maxConcurrentMovesPerNode = getInt(conf,
         DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY,
         DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT);
@@ -241,6 +258,10 @@ public class Balancer {
     this.threshold = p.threshold;
     this.policy = p.policy;
     this.runDuringUpgrade = p.runDuringUpgrade;
+
+    this.maxSizeToMove = getLong(conf,
+        DFSConfigKeys.DFS_BALANCER_MAX_SIZE_TO_MOVE_KEY,
+        DFSConfigKeys.DFS_BALANCER_MAX_SIZE_TO_MOVE_DEFAULT);
   }
 
   private static long getCapacity(DatanodeStorageReport report, StorageType t) {
@@ -294,7 +315,7 @@ public class Balancer {
     final double utilizationDiff = utilization - policy.getAvgUtilization(t);
     final double thresholdDiff = Math.abs(utilizationDiff) - threshold;
     final long maxSize2Move = computeMaxSize2Move(capacity,
-        getRemaining(r, t), utilizationDiff, threshold);
+        getRemaining(r, t), utilizationDiff, threshold, maxSizeToMove);
     final StorageGroup g;
     if (utilizationDiff > 0) {
@@ -331,13 +352,13 @@ public class Balancer {
   }
 
   private static long computeMaxSize2Move(final long capacity, final long remaining,
-      final double utilizationDiff, final double threshold) {
+      final double utilizationDiff, final double threshold, final long max) {
     final double diff = Math.min(threshold, Math.abs(utilizationDiff));
     long maxSizeToMove = percentage2bytes(diff, capacity);
     if (utilizationDiff < 0) {
       maxSizeToMove = Math.min(remaining, maxSizeToMove);
     }
-    return Math.min(MAX_SIZE_TO_MOVE, maxSizeToMove);
+    return Math.min(max, maxSizeToMove);
   }
 
   private static long percentage2bytes(double percentage, long capacity) {
@@ -387,6 +408,7 @@ public class Balancer {
     /* first step: match each overUtilized datanode (source) to
      * one or more underUtilized datanodes (targets).
      */
+    LOG.info("chooseStorageGroups for " + matcher + ": overUtilized => underUtilized");
     chooseStorageGroups(overUtilized, underUtilized, matcher);
 
     /* match each remaining overutilized datanode (source) to
@@ -394,6 +416,7 @@ public class Balancer {
      * Note only overutilized datanodes that haven't had that max bytes to move
      * satisfied in step 1 are selected
      */
+    LOG.info("chooseStorageGroups for " + matcher + ": overUtilized => belowAvgUtilized");
     chooseStorageGroups(overUtilized, belowAvgUtilized, matcher);
 
     /* match each remaining underutilized datanode (target) to
@@ -401,6 +424,7 @@ public class Balancer {
      * Note only underutilized datanodes that have not had that max bytes to
      * move satisfied in step 1 are selected.
      */
+    LOG.info("chooseStorageGroups for " + matcher + ": underUtilized => aboveAvgUtilized");
     chooseStorageGroups(underUtilized, aboveAvgUtilized, matcher);
   }
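With the hard-coded 10GB ceiling gone, the cap flows from dfs.balancer.max-size-to-move into computeMaxSize2Move as the new max parameter. The standalone sketch below is illustrative only; it reproduces the arithmetic of the patched method so the effect of the cap can be seen in isolation, and the 100GB setting, the rounding in percentage2bytes, and the class name are assumptions of the example, not of the patch.

```java
// Illustrative sketch, not patch code: how the configurable cap bounds the
// bytes scheduled to move for one storage group.
public class MaxSize2MoveDemo {
  static long percentage2bytes(double percentage, long capacity) {
    return (long) (percentage * capacity / 100.0);
  }

  static long computeMaxSize2Move(long capacity, long remaining,
      double utilizationDiff, double threshold, long max) {
    final double diff = Math.min(threshold, Math.abs(utilizationDiff));
    long maxSizeToMove = percentage2bytes(diff, capacity);
    if (utilizationDiff < 0) {
      maxSizeToMove = Math.min(remaining, maxSizeToMove); // under-utilized target: bounded by free space
    }
    return Math.min(max, maxSizeToMove); // "max" now comes from dfs.balancer.max-size-to-move
  }

  public static void main(String[] args) {
    final long TB = 1L << 40, GB = 1L << 30;
    // A node 12 percentage points over the average with a 10% threshold and
    // 4TB capacity could schedule ~410GB, but a hypothetical 100GB cap wins:
    // prints 107374182400.
    System.out.println(computeMaxSize2Move(4 * TB, 500 * GB, 12.0, 10.0, 100 * GB));
  }
}
```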
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
index 298b86df100..b4b06eed3e5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
@@ -112,14 +112,39 @@ public class Dispatcher {
 
   private NetworkTopology cluster;
 
-  private final ExecutorService moveExecutor;
   private final ExecutorService dispatchExecutor;
 
+  private final Allocator moverThreadAllocator;
+
   /** The maximum number of concurrent blocks moves at a datanode */
   private final int maxConcurrentMovesPerNode;
   private final int ioFileBufferSize;
 
+  static class Allocator {
+    private final int max;
+    private int count = 0;
+
+    Allocator(int max) {
+      this.max = max;
+    }
+
+    synchronized int allocate(int n) {
+      final int remaining = max - count;
+      if (remaining <= 0) {
+        return 0;
+      } else {
+        final int allocated = remaining < n? remaining: n;
+        count += allocated;
+        return allocated;
+      }
+    }
+
+    synchronized void reset() {
+      count = 0;
+    }
+  }
+
   private static class GlobalBlockMap {
     private final Map<Block, DBlock> map = new HashMap<Block, DBlock>();
 
@@ -285,9 +310,7 @@
 
     /** Dispatch the move to the proxy source & wait for the response. */
     private void dispatch() {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Start moving " + this);
-      }
+      LOG.info("Start moving " + this);
 
       Socket sock = new Socket();
       DataOutputStream out = null;
@@ -502,7 +525,7 @@
     private final List<PendingMove> pendings;
     private volatile boolean hasFailure = false;
     private volatile boolean hasSuccess = false;
-    private final int maxConcurrentMoves;
+    private ExecutorService moveExecutor;
 
     @Override
     public String toString() {
@@ -511,7 +534,6 @@
 
     private DDatanode(DatanodeInfo datanode, int maxConcurrentMoves) {
       this.datanode = datanode;
-      this.maxConcurrentMoves = maxConcurrentMoves;
       this.pendings = new ArrayList<PendingMove>(maxConcurrentMoves);
     }
 
@@ -519,6 +541,21 @@
       return datanode;
     }
 
+    synchronized ExecutorService initMoveExecutor(int poolSize) {
+      return moveExecutor = Executors.newFixedThreadPool(poolSize);
+    }
+
+    synchronized ExecutorService getMoveExecutor() {
+      return moveExecutor;
+    }
+
+    synchronized void shutdownMoveExecutor() {
+      if (moveExecutor != null) {
+        moveExecutor.shutdown();
+        moveExecutor = null;
+      }
+    }
+
     private static <G extends StorageGroup> void put(StorageType storageType,
         G g, EnumMap<StorageType, G> map) {
       final StorageGroup existing = map.put(storageType, g);
@@ -539,6 +576,7 @@
 
     synchronized private void activateDelay(long delta) {
       delayUntil = Time.monotonicNow() + delta;
+      LOG.info(this + " activateDelay " + delta/1000.0 + " seconds");
     }
 
     synchronized private boolean isDelayActive() {
@@ -549,11 +587,6 @@
       return true;
     }
 
-    /** Check if the node can schedule more blocks to move */
-    synchronized boolean isPendingQNotFull() {
-      return pendings.size() < maxConcurrentMoves;
-    }
-
     /** Check if all the dispatched moves are done */
     synchronized boolean isPendingQEmpty() {
       return pendings.isEmpty();
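The Allocator added above is the budget-keeper for the change described in HDFS-8818: instead of one global moveExecutor, each datanode lazily gets its own small pool carved out of the dfs.balancer.moverThreads budget, tracked on its DDatanode. The sketch below is illustrative only; keying pools by a plain string and clamping the pool size to 1 are simplifications of the example, not behavior of the patch.

```java
// Illustrative sketch, not patch code: a shared thread budget sliced into
// lazily created per-node executors.
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class PerNodeExecutorDemo {
  static class Allocator {
    private final int max;
    private int count = 0;
    Allocator(int max) { this.max = max; }

    synchronized int allocate(int n) {          // grant up to n threads, never exceeding max
      final int remaining = max - count;
      if (remaining <= 0) {
        return 0;
      }
      final int allocated = Math.min(remaining, n);
      count += allocated;
      return allocated;
    }
    synchronized void reset() { count = 0; }    // called between balancing iterations
  }

  private final Allocator allocator = new Allocator(1000);          // dfs.balancer.moverThreads default
  private final Map<String, ExecutorService> perNode = new ConcurrentHashMap<>();

  ExecutorService executorFor(String datanode, int maxConcurrentMovesPerNode) {
    return perNode.computeIfAbsent(datanode, dn -> {
      final int nThreads = allocator.allocate(maxConcurrentMovesPerNode);
      // The real dispatcher skips the move when the budget is exhausted
      // (nThreads == 0); this demo clamps to 1 to stay simple.
      return Executors.newFixedThreadPool(Math.max(1, nThreads));
    });
  }

  public static void main(String[] args) {
    PerNodeExecutorDemo demo = new PerNodeExecutorDemo();
    demo.executorFor("dn1:50010", 5).execute(() -> System.out.println("move to dn1"));
    demo.executorFor("dn2:50010", 5).execute(() -> System.out.println("move to dn2"));
    demo.perNode.values().forEach(ExecutorService::shutdown);
  }
}
```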
@@ -561,7 +594,7 @@
 
     /** Add a scheduled block move to the node */
     synchronized boolean addPendingBlock(PendingMove pendingBlock) {
-      if (!isDelayActive() && isPendingQNotFull()) {
+      if (!isDelayActive()) {
        return pendings.add(pendingBlock);
       }
       return false;
@@ -619,6 +652,11 @@
     private long getBlockList() throws IOException {
       final long size = Math.min(MAX_BLOCKS_SIZE_TO_FETCH, blocksToReceive);
       final BlocksWithLocations newBlocks = nnc.getBlocks(getDatanodeInfo(), size);
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("getBlocks(" + getDatanodeInfo() + ", "
+            + StringUtils.TraditionalBinaryPrefix.long2String(size, "B", 2)
+            + ") returns " + newBlocks.getBlocks().length + " blocks.");
+      }
 
       long bytesReceived = 0;
       for (BlockWithLocations blk : newBlocks.getBlocks()) {
@@ -640,7 +678,9 @@
           }
         }
         if (!srcBlocks.contains(block) && isGoodBlockCandidate(block)) {
-          // filter bad candidates
+          if (LOG.isTraceEnabled()) {
+            LOG.trace("Add " + block + " to " + this);
+          }
           srcBlocks.add(block);
         }
       }
@@ -708,11 +748,9 @@
       }
     }
 
-    private static final int SOURCE_BLOCKS_MIN_SIZE = 5;
-
     /** @return if should fetch more blocks from namenode */
     private boolean shouldFetchMoreBlocks() {
-      return srcBlocks.size() < SOURCE_BLOCKS_MIN_SIZE && blocksToReceive > 0;
+      return blocksToReceive > 0;
     }
 
     private static final long MAX_ITERATION_TIME = 20 * 60 * 1000L; // 20 mins
@@ -732,6 +770,11 @@
       int noPendingMoveIteration = 0;
       while (!isTimeUp && getScheduledSize() > 0
           && (!srcBlocks.isEmpty() || blocksToReceive > 0)) {
+        if (LOG.isTraceEnabled()) {
+          LOG.trace(this + " blocksToReceive=" + blocksToReceive
+              + ", scheduledSize=" + getScheduledSize()
+              + ", srcBlocks#=" + srcBlocks.size());
+        }
         final PendingMove p = chooseNextMove();
         if (p != null) {
           // Reset no pending move counter
@@ -759,12 +802,16 @@
           // in case no blocks can be moved for source node's task,
           // jump out of while-loop after 5 iterations.
           if (noPendingMoveIteration >= MAX_NO_PENDING_MOVE_ITERATIONS) {
+            LOG.info("Failed to find a pending move " + noPendingMoveIteration
+                + " times. Skipping " + this);
             resetScheduledSize();
           }
         }
 
         // check if time is up or not
         if (Time.monotonicNow() - startTime > MAX_ITERATION_TIME) {
+          LOG.info("Time up (max time=" + MAX_ITERATION_TIME/1000
+              + " seconds). Skipping " + this);
           isTimeUp = true;
           continue;
         }
@@ -801,9 +848,9 @@
 
     this.cluster = NetworkTopology.getInstance(conf);
 
-    this.moveExecutor = Executors.newFixedThreadPool(moverThreads);
     this.dispatchExecutor = dispatcherThreads == 0? null
         : Executors.newFixedThreadPool(dispatcherThreads);
+    this.moverThreadAllocator = new Allocator(moverThreads);
     this.maxConcurrentMovesPerNode = maxConcurrentMovesPerNode;
 
     this.saslClient = new SaslDataTransferClient(conf,
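The extra logging above makes the two existing bail-out paths of the dispatch loop visible: a source is skipped after MAX_NO_PENDING_MOVE_ITERATIONS fruitless passes, or once the 20-minute iteration budget is spent. The sketch below is illustrative only; it stubs out the actual block selection and exits by returning instead of resetting the scheduled size, so it shows just the control flow being logged, not the Dispatcher code.

```java
// Illustrative sketch, not patch code: the two bail-out conditions of a
// bounded dispatch loop. Constants mirror Dispatcher's values.
public class DispatchLoopDemo {
  static final long MAX_ITERATION_TIME = 20 * 60 * 1000L; // 20 mins
  static final int MAX_NO_PENDING_MOVE_ITERATIONS = 5;

  interface MoveChooser { boolean tryScheduleOneMove(); }

  static void dispatchBlocks(MoveChooser chooser) {
    final long startTime = System.currentTimeMillis();
    int noPendingMoveIteration = 0;
    while (true) {
      if (chooser.tryScheduleOneMove()) {
        noPendingMoveIteration = 0;              // progress: reset the counter
      } else if (++noPendingMoveIteration >= MAX_NO_PENDING_MOVE_ITERATIONS) {
        System.out.println("Failed to find a pending move "
            + noPendingMoveIteration + " times. Skipping this source.");
        return;
      }
      if (System.currentTimeMillis() - startTime > MAX_ITERATION_TIME) {
        System.out.println("Time up. Skipping this source.");
        return;
      }
    }
  }

  public static void main(String[] args) {
    dispatchBlocks(() -> false); // never finds a move: exits after 5 passes
  }
}
```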
@@ -888,8 +935,22 @@
     return new DDatanode(datanode, maxConcurrentMovesPerNode);
   }
 
   public void executePendingMove(final PendingMove p) {
     // move the block
+    final DDatanode targetDn = p.target.getDDatanode();
+    ExecutorService moveExecutor = targetDn.getMoveExecutor();
+    if (moveExecutor == null) {
+      final int nThreads = moverThreadAllocator.allocate(maxConcurrentMovesPerNode);
+      if (nThreads > 0) {
+        moveExecutor = targetDn.initMoveExecutor(nThreads);
+      }
+    }
+    if (moveExecutor == null) {
+      LOG.warn("No mover threads available: skip moving " + p);
+      return;
+    }
+
     moveExecutor.execute(new Runnable() {
       @Override
       public void run() {
@@ -1081,6 +1142,11 @@
     cluster = NetworkTopology.getInstance(conf);
     storageGroupMap.clear();
     sources.clear();
+
+    moverThreadAllocator.reset();
+    for(StorageGroup t : targets) {
+      t.getDDatanode().shutdownMoveExecutor();
+    }
     targets.clear();
     globalBlocks.removeAllButRetain(movedBlocks);
     movedBlocks.cleanup();
@@ -1102,7 +1168,6 @@
     if (dispatchExecutor != null) {
       dispatchExecutor.shutdownNow();
     }
-    moveExecutor.shutdownNow();
   }
 
   static class Util {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/MovedBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/MovedBlocks.java
index 18b9cd8ecf5..4906226f106 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/MovedBlocks.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/MovedBlocks.java
@@ -77,6 +77,11 @@ public class MovedBlocks {
     public long getNumBytes() {
       return block.getNumBytes();
     }
+
+    @Override
+    public String toString() {
+      return block + " size=" + getNumBytes();
+    }
   }
 
   private static final int CUR_WIN = 0;
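Between iterations, reset() above returns the whole thread budget to the Allocator and shuts down each target's executor. The sketch below shows the same per-iteration cleanup idea in isolation; it is illustrative only, and the awaitTermination call and the string-keyed map are additions of the example, not of the patch (the patch's shutdownMoveExecutor only calls shutdown()).

```java
// Illustrative sketch, not patch code: shut down every per-node pool so the
// next balancing iteration starts from a clean slate.
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class IterationResetDemo {
  private final Map<String, ExecutorService> perNodePools = new ConcurrentHashMap<>();

  void submit(String node, Runnable move) {
    perNodePools.computeIfAbsent(node, n -> Executors.newFixedThreadPool(5)).execute(move);
  }

  /** Per-iteration cleanup, analogous to what reset() does for each target. */
  void reset() throws InterruptedException {
    for (ExecutorService pool : perNodePools.values()) {
      pool.shutdown();                              // stop accepting new moves
      pool.awaitTermination(1, TimeUnit.MINUTES);   // let in-flight moves finish (example-only)
    }
    perNodePools.clear();
  }

  public static void main(String[] args) throws InterruptedException {
    IterationResetDemo demo = new IterationResetDemo();
    demo.submit("dn1:50010", () -> System.out.println("moving a block to dn1"));
    demo.reset();
  }
}
```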
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
index e1ce1b3d6e3..194aa0f8307 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
@@ -19,7 +19,13 @@ package org.apache.hadoop.hdfs.server.balancer;
 
 import static org.apache.hadoop.fs.StorageType.DEFAULT;
 import static org.apache.hadoop.fs.StorageType.RAM_DISK;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_BLOCK_PINNING_ENABLED;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LAZY_PERSIST_FILE_SCRUB_INTERVAL_SEC;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -29,8 +35,8 @@ import java.io.File;
 import java.io.IOException;
 import java.io.PrintWriter;
 import java.net.InetAddress;
-import java.net.URI;
 import java.net.InetSocketAddress;
+import java.net.URI;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -45,6 +51,7 @@ import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
@@ -59,12 +66,16 @@ import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.NameNodeProxies;
-import org.apache.hadoop.hdfs.protocol.*;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.server.balancer.Balancer.Cli;
 import org.apache.hadoop.hdfs.server.balancer.Balancer.Parameters;
 import org.apache.hadoop.hdfs.server.balancer.Balancer.Result;
-import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.LazyPersistTestCase;
@@ -927,7 +938,7 @@ public class TestBalancer {
         new String[] {RACK0, RACK1});
   }
 
-  @Test(timeout=100000)
+  @Test(expected=HadoopIllegalArgumentException.class)
   public void testBalancerWithZeroThreadsForMove() throws Exception {
     Configuration conf = new HdfsConfiguration();
     conf.setInt(DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY, 0);
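Because the Balancer now validates its configuration up front, a zero value for dfs.datanode.balance.max.concurrent.moves surfaces as a HadoopIllegalArgumentException instead of a silently idle run, which is why the test switches to JUnit's expected attribute. The sketch below is illustrative only; it exercises a stand-in validator rather than the real Balancer, so it needs no Hadoop classes.

```java
// Illustrative sketch, not patch code: the JUnit 4 "expected" style used by
// the modified test, applied to a stand-in for Balancer.getInt().
import static org.junit.Assert.assertEquals;
import org.junit.Test;

public class ExpectedExceptionStyleTest {
  static int getPositiveInt(String key, int value) {
    if (value <= 0) {
      throw new IllegalArgumentException(key + " = " + value + " <= 0");
    }
    return value;
  }

  @Test
  public void acceptsPositiveValues() {
    assertEquals(5, getPositiveInt("dfs.datanode.balance.max.concurrent.moves", 5));
  }

  @Test(expected = IllegalArgumentException.class)
  public void rejectsZero() {
    // Mirrors testBalancerWithZeroThreadsForMove: zero concurrent moves is
    // rejected during construction instead of hanging the run.
    getPositiveInt("dfs.datanode.balance.max.concurrent.moves", 0);
  }
}
```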