HDFS-8818. Changes the global moveExecutor to per datanode executors and changes MAX_SIZE_TO_MOVE to be configurable.

(cherry picked from commit ac8d153046)
This commit is contained in:
Tsz-Wo Nicholas Sze 2015-08-10 16:52:02 -07:00 committed by Zhe Zhang
parent d7da70332f
commit 32d810133c
5 changed files with 134 additions and 32 deletions

View File

@ -436,6 +436,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final int DFS_BALANCER_MOVERTHREADS_DEFAULT = 1000;
public static final String DFS_BALANCER_DISPATCHERTHREADS_KEY = "dfs.balancer.dispatcherThreads";
public static final int DFS_BALANCER_DISPATCHERTHREADS_DEFAULT = 200;
public static final String DFS_BALANCER_MAX_SIZE_TO_MOVE_KEY = "dfs.balancer.max-size-to-move";
public static final long DFS_BALANCER_MAX_SIZE_TO_MOVE_DEFAULT = 10L*1024*1024*1024;
public static final String DFS_BALANCER_KEYTAB_ENABLED_KEY = "dfs.balancer.keytab.enabled";
public static final boolean DFS_BALANCER_KEYTAB_ENABLED_DEFAULT = false;
public static final String DFS_BALANCER_ADDRESS_KEY = "dfs.balancer.address";

View File

@ -35,6 +35,7 @@ import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
@ -172,9 +173,6 @@ public class Balancer {
static final Path BALANCER_ID_PATH = new Path("/system/balancer.id");
private static final long GB = 1L << 30; //1GB
private static final long MAX_SIZE_TO_MOVE = 10*GB;
private static final String USAGE = "Usage: hdfs balancer"
+ "\n\t[-policy <policy>]\tthe balancing policy: "
+ BalancingPolicy.Node.INSTANCE.getName() + " or "
@ -190,7 +188,8 @@ public class Balancer {
private final Dispatcher dispatcher;
private final BalancingPolicy policy;
private final double threshold;
private final long maxSizeToMove;
// all data node lists
private final Collection<Source> overUtilized = new LinkedList<Source>();
private final Collection<Source> aboveAvgUtilized = new LinkedList<Source>();
@ -211,6 +210,24 @@ public class Balancer {
}
}
static long getLong(Configuration conf, String key, long defaultValue) {
final long v = conf.getLong(key, defaultValue);
LOG.info(key + " = " + v + " (default=" + defaultValue + ")");
if (v <= 0) {
throw new HadoopIllegalArgumentException(key + " = " + v + " <= " + 0);
}
return v;
}
static int getInt(Configuration conf, String key, int defaultValue) {
final int v = conf.getInt(key, defaultValue);
LOG.info(key + " = " + v + " (default=" + defaultValue + ")");
if (v <= 0) {
throw new HadoopIllegalArgumentException(key + " = " + v + " <= " + 0);
}
return v;
}
/**
* Construct a balancer.
* Initialize balancer. It sets the value of the threshold, and
@ -219,16 +236,16 @@ public class Balancer {
* when connection fails.
*/
Balancer(NameNodeConnector theblockpool, Parameters p, Configuration conf) {
final long movedWinWidth = conf.getLong(
final long movedWinWidth = getLong(conf,
DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_KEY,
DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_DEFAULT);
final int moverThreads = conf.getInt(
final int moverThreads = getInt(conf,
DFSConfigKeys.DFS_BALANCER_MOVERTHREADS_KEY,
DFSConfigKeys.DFS_BALANCER_MOVERTHREADS_DEFAULT);
final int dispatcherThreads = conf.getInt(
final int dispatcherThreads = getInt(conf,
DFSConfigKeys.DFS_BALANCER_DISPATCHERTHREADS_KEY,
DFSConfigKeys.DFS_BALANCER_DISPATCHERTHREADS_DEFAULT);
final int maxConcurrentMovesPerNode = conf.getInt(
final int maxConcurrentMovesPerNode = getInt(conf,
DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY,
DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT);
@ -237,6 +254,10 @@ public class Balancer {
maxConcurrentMovesPerNode, conf);
this.threshold = p.threshold;
this.policy = p.policy;
this.maxSizeToMove = getLong(conf,
DFSConfigKeys.DFS_BALANCER_MAX_SIZE_TO_MOVE_KEY,
DFSConfigKeys.DFS_BALANCER_MAX_SIZE_TO_MOVE_DEFAULT);
}
private static long getCapacity(DatanodeStorageReport report, StorageType t) {
@ -290,7 +311,7 @@ public class Balancer {
final double utilizationDiff = utilization - policy.getAvgUtilization(t);
final double thresholdDiff = Math.abs(utilizationDiff) - threshold;
final long maxSize2Move = computeMaxSize2Move(capacity,
getRemaining(r, t), utilizationDiff, threshold);
getRemaining(r, t), utilizationDiff, threshold, maxSizeToMove);
final StorageGroup g;
if (utilizationDiff > 0) {
@ -327,13 +348,13 @@ public class Balancer {
}
private static long computeMaxSize2Move(final long capacity, final long remaining,
final double utilizationDiff, final double threshold) {
final double utilizationDiff, final double threshold, final long max) {
final double diff = Math.min(threshold, Math.abs(utilizationDiff));
long maxSizeToMove = precentage2bytes(diff, capacity);
if (utilizationDiff < 0) {
maxSizeToMove = Math.min(remaining, maxSizeToMove);
}
return Math.min(MAX_SIZE_TO_MOVE, maxSizeToMove);
return Math.min(max, maxSizeToMove);
}
private static long precentage2bytes(double precentage, long capacity) {
@ -383,6 +404,7 @@ public class Balancer {
/* first step: match each overUtilized datanode (source) to
* one or more underUtilized datanodes (targets).
*/
LOG.info("chooseStorageGroups for " + matcher + ": overUtilized => underUtilized");
chooseStorageGroups(overUtilized, underUtilized, matcher);
/* match each remaining overutilized datanode (source) to
@ -390,6 +412,7 @@ public class Balancer {
* Note only overutilized datanodes that haven't had that max bytes to move
* satisfied in step 1 are selected
*/
LOG.info("chooseStorageGroups for " + matcher + ": overUtilized => belowAvgUtilized");
chooseStorageGroups(overUtilized, belowAvgUtilized, matcher);
/* match each remaining underutilized datanode (target) to
@ -397,6 +420,7 @@ public class Balancer {
* Note only underutilized datanodes that have not had that max bytes to
* move satisfied in step 1 are selected.
*/
LOG.info("chooseStorageGroups for " + matcher + ": underUtilized => aboveAvgUtilized");
chooseStorageGroups(underUtilized, aboveAvgUtilized, matcher);
}

View File

@ -115,12 +115,37 @@ public class Dispatcher {
private NetworkTopology cluster;
private final ExecutorService moveExecutor;
private final ExecutorService dispatchExecutor;
private final Allocator moverThreadAllocator;
/** The maximum number of concurrent blocks moves at a datanode */
private final int maxConcurrentMovesPerNode;
static class Allocator {
private final int max;
private int count = 0;
Allocator(int max) {
this.max = max;
}
synchronized int allocate(int n) {
final int remaining = max - count;
if (remaining <= 0) {
return 0;
} else {
final int allocated = remaining < n? remaining: n;
count += allocated;
return allocated;
}
}
synchronized void reset() {
count = 0;
}
}
private static class GlobalBlockMap {
private final Map<Block, DBlock> map = new HashMap<Block, DBlock>();
@ -286,9 +311,7 @@ public class Dispatcher {
/** Dispatch the move to the proxy source & wait for the response. */
private void dispatch() {
if (LOG.isDebugEnabled()) {
LOG.debug("Start moving " + this);
}
LOG.info("Start moving " + this);
Socket sock = new Socket();
DataOutputStream out = null;
@ -501,7 +524,7 @@ public class Dispatcher {
/** blocks being moved but not confirmed yet */
private final List<PendingMove> pendings;
private volatile boolean hasFailure = false;
private final int maxConcurrentMoves;
private ExecutorService moveExecutor;
@Override
public String toString() {
@ -510,7 +533,6 @@ public class Dispatcher {
private DDatanode(DatanodeInfo datanode, int maxConcurrentMoves) {
this.datanode = datanode;
this.maxConcurrentMoves = maxConcurrentMoves;
this.pendings = new ArrayList<PendingMove>(maxConcurrentMoves);
}
@ -518,6 +540,21 @@ public class Dispatcher {
return datanode;
}
synchronized ExecutorService initMoveExecutor(int poolSize) {
return moveExecutor = Executors.newFixedThreadPool(poolSize);
}
synchronized ExecutorService getMoveExecutor() {
return moveExecutor;
}
synchronized void shutdownMoveExecutor() {
if (moveExecutor != null) {
moveExecutor.shutdown();
moveExecutor = null;
}
}
private static <G extends StorageGroup> void put(StorageType storageType,
G g, EnumMap<StorageType, G> map) {
final StorageGroup existing = map.put(storageType, g);
@ -538,6 +575,7 @@ public class Dispatcher {
synchronized private void activateDelay(long delta) {
delayUntil = Time.monotonicNow() + delta;
LOG.info(this + " activateDelay " + delta/1000.0 + " seconds");
}
synchronized private boolean isDelayActive() {
@ -548,11 +586,6 @@ public class Dispatcher {
return true;
}
/** Check if the node can schedule more blocks to move */
synchronized boolean isPendingQNotFull() {
return pendings.size() < maxConcurrentMoves;
}
/** Check if all the dispatched moves are done */
synchronized boolean isPendingQEmpty() {
return pendings.isEmpty();
@ -560,7 +593,7 @@ public class Dispatcher {
/** Add a scheduled block move to the node */
synchronized boolean addPendingBlock(PendingMove pendingBlock) {
if (!isDelayActive() && isPendingQNotFull()) {
if (!isDelayActive()) {
return pendings.add(pendingBlock);
}
return false;
@ -614,6 +647,11 @@ public class Dispatcher {
private long getBlockList() throws IOException {
final long size = Math.min(MAX_BLOCKS_SIZE_TO_FETCH, blocksToReceive);
final BlocksWithLocations newBlocks = nnc.getBlocks(getDatanodeInfo(), size);
if (LOG.isTraceEnabled()) {
LOG.trace("getBlocks(" + getDatanodeInfo() + ", "
+ StringUtils.TraditionalBinaryPrefix.long2String(size, "B", 2)
+ ") returns " + newBlocks.getBlocks().length + " blocks.");
}
long bytesReceived = 0;
for (BlockWithLocations blk : newBlocks.getBlocks()) {
@ -635,7 +673,9 @@ public class Dispatcher {
}
}
if (!srcBlocks.contains(block) && isGoodBlockCandidate(block)) {
// filter bad candidates
if (LOG.isTraceEnabled()) {
LOG.trace("Add " + block + " to " + this);
}
srcBlocks.add(block);
}
}
@ -703,11 +743,9 @@ public class Dispatcher {
}
}
private static final int SOURCE_BLOCKS_MIN_SIZE = 5;
/** @return if should fetch more blocks from namenode */
private boolean shouldFetchMoreBlocks() {
return srcBlocks.size() < SOURCE_BLOCKS_MIN_SIZE && blocksToReceive > 0;
return blocksToReceive > 0;
}
private static final long MAX_ITERATION_TIME = 20 * 60 * 1000L; // 20 mins
@ -727,6 +765,11 @@ public class Dispatcher {
int noPendingMoveIteration = 0;
while (!isTimeUp && getScheduledSize() > 0
&& (!srcBlocks.isEmpty() || blocksToReceive > 0)) {
if (LOG.isTraceEnabled()) {
LOG.trace(this + " blocksToReceive=" + blocksToReceive
+ ", scheduledSize=" + getScheduledSize()
+ ", srcBlocks#=" + srcBlocks.size());
}
final PendingMove p = chooseNextMove();
if (p != null) {
// Reset no pending move counter
@ -754,12 +797,16 @@ public class Dispatcher {
// in case no blocks can be moved for source node's task,
// jump out of while-loop after 5 iterations.
if (noPendingMoveIteration >= MAX_NO_PENDING_MOVE_ITERATIONS) {
LOG.info("Failed to find a pending move " + noPendingMoveIteration
+ " times. Skipping " + this);
resetScheduledSize();
}
}
// check if time is up or not
if (Time.monotonicNow() - startTime > MAX_ITERATION_TIME) {
LOG.info("Time up (max time=" + MAX_ITERATION_TIME/1000
+ " seconds). Skipping " + this);
isTimeUp = true;
continue;
}
@ -796,9 +843,9 @@ public class Dispatcher {
this.cluster = NetworkTopology.getInstance(conf);
this.moveExecutor = Executors.newFixedThreadPool(moverThreads);
this.dispatchExecutor = dispatcherThreads == 0? null
: Executors.newFixedThreadPool(dispatcherThreads);
this.moverThreadAllocator = new Allocator(moverThreads);
this.maxConcurrentMovesPerNode = maxConcurrentMovesPerNode;
this.saslClient = new SaslDataTransferClient(conf,
@ -882,8 +929,22 @@ public class Dispatcher {
return new DDatanode(datanode, maxConcurrentMovesPerNode);
}
public void executePendingMove(final PendingMove p) {
// move the block
final DDatanode targetDn = p.target.getDDatanode();
ExecutorService moveExecutor = targetDn.getMoveExecutor();
if (moveExecutor == null) {
final int nThreads = moverThreadAllocator.allocate(maxConcurrentMovesPerNode);
if (nThreads > 0) {
moveExecutor = targetDn.initMoveExecutor(nThreads);
}
}
if (moveExecutor == null) {
LOG.warn("No mover threads available: skip moving " + p);
return;
}
moveExecutor.execute(new Runnable() {
@Override
public void run() {
@ -1063,6 +1124,11 @@ public class Dispatcher {
cluster = NetworkTopology.getInstance(conf);
storageGroupMap.clear();
sources.clear();
moverThreadAllocator.reset();
for(StorageGroup t : targets) {
t.getDDatanode().shutdownMoveExecutor();
}
targets.clear();
globalBlocks.removeAllButRetain(movedBlocks);
movedBlocks.cleanup();
@ -1084,7 +1150,6 @@ public class Dispatcher {
if (dispatchExecutor != null) {
dispatchExecutor.shutdownNow();
}
moveExecutor.shutdownNow();
}
static class Util {

View File

@ -77,6 +77,11 @@ public class MovedBlocks<L> {
public long getNumBytes() {
return block.getNumBytes();
}
@Override
public String toString() {
return block + " size=" + getNumBytes();
}
}
private static final int CUR_WIN = 0;

View File

@ -50,9 +50,9 @@ import java.io.File;
import java.io.IOException;
import java.io.PrintWriter;
import java.net.InetAddress;
import java.net.URI;
import java.net.InetSocketAddress;
import java.security.PrivilegedExceptionAction;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@ -68,6 +68,7 @@ import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
@ -83,8 +84,13 @@ import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.NameNodeProxies;
import org.apache.hadoop.hdfs.protocol.*;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.server.balancer.Balancer.Cli;
import org.apache.hadoop.hdfs.server.balancer.Balancer.Parameters;
import org.apache.hadoop.hdfs.server.balancer.Balancer.Result;
@ -1023,7 +1029,7 @@ public class TestBalancer {
new String[] {RACK0, RACK1});
}
@Test(timeout=100000)
@Test(expected=HadoopIllegalArgumentException.class)
public void testBalancerWithZeroThreadsForMove() throws Exception {
Configuration conf = new HdfsConfiguration();
conf.setInt(DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY, 0);