diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterNamenodeProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterNamenodeProtocol.java index c6b020977d5..278d282fd7e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterNamenodeProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterNamenodeProtocol.java @@ -53,7 +53,7 @@ public class RouterNamenodeProtocol implements NamenodeProtocol { @Override public BlocksWithLocations getBlocks(DatanodeInfo datanode, long size, - long minBlockSize) throws IOException { + long minBlockSize, long hotBlockTimeInterval) throws IOException { rpcServer.checkOperation(OperationCategory.READ); // Get the namespace where the datanode is located @@ -78,9 +78,9 @@ public class RouterNamenodeProtocol implements NamenodeProtocol { // Forward to the proper namenode if (nsId != null) { RemoteMethod method = new RemoteMethod( - NamenodeProtocol.class, "getBlocks", - new Class[] {DatanodeInfo.class, long.class, long.class}, - datanode, size, minBlockSize); + NamenodeProtocol.class, "getBlocks", new Class[] + {DatanodeInfo.class, long.class, long.class, long.class}, + datanode, size, minBlockSize, hotBlockTimeInterval); return rpcClient.invokeSingle(nsId, method, BlocksWithLocations.class); } return null; diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java index 315f864c75a..a8cb5c6ce8f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java @@ -1490,8 +1490,9 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol, @Override // NamenodeProtocol public BlocksWithLocations getBlocks(DatanodeInfo datanode, long size, - long minBlockSize) throws IOException { - return nnProto.getBlocks(datanode, size, minBlockSize); + long minBlockSize, long hotBlockTimeInterval) throws IOException { + return nnProto.getBlocks(datanode, size, minBlockSize, + hotBlockTimeInterval); } @Override // NamenodeProtocol diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java index 09ca0d4582f..4b997ebb5ff 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java @@ -1350,9 +1350,9 @@ public class TestRouterRpc { // Verify that checking that datanode works BlocksWithLocations routerBlockLocations = - routerNamenodeProtocol.getBlocks(dn0, 1024, 0); + routerNamenodeProtocol.getBlocks(dn0, 1024, 0, 0); BlocksWithLocations nnBlockLocations = - nnNamenodeProtocol.getBlocks(dn0, 1024, 0); + nnNamenodeProtocol.getBlocks(dn0, 1024, 0, 0); BlockWithLocations[] routerBlocks = routerBlockLocations.getBlocks(); BlockWithLocations[] nnBlocks = nnBlockLocations.getBlocks(); assertEquals(nnBlocks.length, routerBlocks.length); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index e904f089231..0a5caed0dec 100755 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -713,6 +713,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys { public static final long DFS_BALANCER_GETBLOCKS_SIZE_DEFAULT = 2L*1024*1024*1024; // 2GB public static final String DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY = "dfs.balancer.getBlocks.min-block-size"; public static final long DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_DEFAULT = 10L*1024*1024; // 10MB + public static final String DFS_BALANCER_GETBLOCKS_HOT_TIME_INTERVAL_KEY = + "dfs.balancer.getBlocks.hot-time-interval"; + public static final long DFS_BALANCER_GETBLOCKS_HOT_TIME_INTERVAL_DEFAULT = + 0; public static final String DFS_BALANCER_KEYTAB_ENABLED_KEY = "dfs.balancer.keytab.enabled"; public static final boolean DFS_BALANCER_KEYTAB_ENABLED_DEFAULT = false; public static final String DFS_BALANCER_ADDRESS_KEY = "dfs.balancer.address"; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolServerSideTranslatorPB.java index 49fe99b3081..e89a6b62b50 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolServerSideTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolServerSideTranslatorPB.java @@ -89,7 +89,7 @@ public class NamenodeProtocolServerSideTranslatorPB implements BlocksWithLocations blocks; try { blocks = impl.getBlocks(dnInfo, request.getSize(), - request.getMinBlockSize()); + request.getMinBlockSize(), request.getTimeInterval()); } catch (IOException e) { throw new ServiceException(e); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolTranslatorPB.java index 603e14d264a..201004dc6f5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolTranslatorPB.java @@ -102,11 +102,11 @@ public class NamenodeProtocolTranslatorPB implements NamenodeProtocol, @Override public BlocksWithLocations getBlocks(DatanodeInfo datanode, long size, long - minBlockSize) + minBlockSize, long timeInterval) throws IOException { GetBlocksRequestProto req = GetBlocksRequestProto.newBuilder() .setDatanode(PBHelperClient.convert((DatanodeID)datanode)).setSize(size) - .setMinBlockSize(minBlockSize).build(); + .setMinBlockSize(minBlockSize).setTimeInterval(timeInterval).build(); try { return PBHelper.convert(rpcProxy.getBlocks(NULL_CONTROLLER, req) .getBlocks()); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java index e5f9e8c8061..6734c977d7a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java @@ -203,6 +203,7 @@ public class Balancer { + "on over-utilized machines." + "\n\t[-asService]\tRun as a long running service." + "\n\t[-sortTopNodes]" + + "\n\t[-hotBlockTimeInterval]\tprefer to move cold blocks." + "\tSort datanodes based on the utilization so " + "that highly utilized datanodes get scheduled first."; @@ -315,6 +316,14 @@ public class Balancer { final long maxIterationTime = conf.getLong( DFSConfigKeys.DFS_BALANCER_MAX_ITERATION_TIME_KEY, DFSConfigKeys.DFS_BALANCER_MAX_ITERATION_TIME_DEFAULT); + /** + * Balancer prefer to get blocks which are belong to the cold files + * created before this time period. + */ + final long hotBlockTimeInterval = conf.getTimeDuration( + DFSConfigKeys.DFS_BALANCER_GETBLOCKS_HOT_TIME_INTERVAL_KEY, + DFSConfigKeys.DFS_BALANCER_GETBLOCKS_HOT_TIME_INTERVAL_DEFAULT, + TimeUnit.MILLISECONDS); // DataNode configuration parameters for balancing final int maxConcurrentMovesPerNode = getInt(conf, @@ -329,7 +338,7 @@ public class Balancer { p.getExcludedNodes(), movedWinWidth, moverThreads, dispatcherThreads, maxConcurrentMovesPerNode, getBlocksSize, getBlocksMinBlockSize, blockMoveTimeout, maxNoMoveInterval, - maxIterationTime, conf); + maxIterationTime, hotBlockTimeInterval, conf); this.threshold = p.getThreshold(); this.policy = p.getBalancingPolicy(); this.sourceNodes = p.getSourceNodes(); @@ -990,6 +999,14 @@ public class Balancer { } else if ("-asService".equalsIgnoreCase(args[i])) { b.setRunAsService(true); LOG.info("Balancer will run as a long running service"); + } else if ("-hotBlockTimeInterval".equalsIgnoreCase(args[i])) { + checkArgument(++i < args.length, + "hotBlockTimeInterval value is missing: args = " + + Arrays.toString(args)); + long hotBlockTimeInterval = Long.parseLong(args[i]); + LOG.info("Using a hotBlockTimeInterval of " + + hotBlockTimeInterval); + b.setHotBlockTimeInterval(hotBlockTimeInterval); } else if ("-sortTopNodes".equalsIgnoreCase(args[i])) { b.setSortTopNodes(true); LOG.info("Balancer will sort nodes by" + diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/BalancerParameters.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/BalancerParameters.java index e614327d7ca..a8ce338af1a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/BalancerParameters.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/BalancerParameters.java @@ -27,6 +27,7 @@ final class BalancerParameters { private final BalancingPolicy policy; private final double threshold; private final int maxIdleIteration; + private final long hotBlockTimeInterval; /** Exclude the nodes in this set. */ private final Set excludedNodes; /** If empty, include any node; otherwise, include only these nodes. */ @@ -66,6 +67,7 @@ final class BalancerParameters { this.runDuringUpgrade = builder.runDuringUpgrade; this.runAsService = builder.runAsService; this.sortTopNodes = builder.sortTopNodes; + this.hotBlockTimeInterval = builder.hotBlockTimeInterval; } BalancingPolicy getBalancingPolicy() { @@ -113,12 +115,13 @@ final class BalancerParameters { return String.format("%s.%s [%s," + " threshold = %s," + " max idle iteration = %s," + " #excluded nodes = %s," + " #included nodes = %s," + " #source nodes = %s," - + " #blockpools = %s," + " run during upgrade = %s]" + + " #blockpools = %s," + " run during upgrade = %s," + + " hot block time interval = %s]" + " sort top nodes = %s", Balancer.class.getSimpleName(), getClass().getSimpleName(), policy, threshold, maxIdleIteration, excludedNodes.size(), includedNodes.size(), sourceNodes.size(), blockpools.size(), - runDuringUpgrade, sortTopNodes); + runDuringUpgrade, sortTopNodes, hotBlockTimeInterval); } static class Builder { @@ -134,6 +137,7 @@ final class BalancerParameters { private boolean runDuringUpgrade = false; private boolean runAsService = false; private boolean sortTopNodes = false; + private long hotBlockTimeInterval = 0; Builder() { } @@ -153,6 +157,11 @@ final class BalancerParameters { return this; } + Builder setHotBlockTimeInterval(long t) { + this.hotBlockTimeInterval = t; + return this; + } + Builder setExcludedNodes(Set nodes) { this.excludedNodes = nodes; return this; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java index e19fbeb956f..c34e6a3ca4a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java @@ -128,6 +128,7 @@ public class Dispatcher { private final long getBlocksSize; private final long getBlocksMinBlockSize; private final long blockMoveTimeout; + private final long hotBlockTimeInterval; /** * If no block can be moved out of a {@link Source} after this configured * amount of time, the Source should give up choosing the next possible move. @@ -797,7 +798,8 @@ public class Dispatcher { private long getBlockList() throws IOException { final long size = Math.min(getBlocksSize, blocksToReceive); final BlocksWithLocations newBlksLocs = - nnc.getBlocks(getDatanodeInfo(), size, getBlocksMinBlockSize); + nnc.getBlocks(getDatanodeInfo(), size, getBlocksMinBlockSize, + hotBlockTimeInterval); if (LOG.isTraceEnabled()) { LOG.trace("getBlocks(" + getDatanodeInfo() + ", " @@ -1011,14 +1013,15 @@ public class Dispatcher { int maxNoMoveInterval, Configuration conf) { this(nnc, includedNodes, excludedNodes, movedWinWidth, moverThreads, dispatcherThreads, maxConcurrentMovesPerNode, - 0L, 0L, 0, maxNoMoveInterval, -1, conf); + 0L, 0L, 0, maxNoMoveInterval, -1, 0, conf); } Dispatcher(NameNodeConnector nnc, Set includedNodes, Set excludedNodes, long movedWinWidth, int moverThreads, int dispatcherThreads, int maxConcurrentMovesPerNode, long getBlocksSize, long getBlocksMinBlockSize, int blockMoveTimeout, - int maxNoMoveInterval, long maxIterationTime, Configuration conf) { + int maxNoMoveInterval, long maxIterationTime, long hotBlockTimeInterval, + Configuration conf) { this.nnc = nnc; this.excludedNodes = excludedNodes; this.includedNodes = includedNodes; @@ -1034,6 +1037,7 @@ public class Dispatcher { this.getBlocksSize = getBlocksSize; this.getBlocksMinBlockSize = getBlocksMinBlockSize; + this.hotBlockTimeInterval = hotBlockTimeInterval; this.blockMoveTimeout = blockMoveTimeout; this.maxNoMoveInterval = maxNoMoveInterval; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java index 7f54c63303c..4d0524276e3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java @@ -249,7 +249,7 @@ public class NameNodeConnector implements Closeable { /** @return blocks with locations. */ public BlocksWithLocations getBlocks(DatanodeInfo datanode, long size, long - minBlockSize) throws IOException { + minBlockSize, long timeInterval) throws IOException { if (getBlocksRateLimiter != null) { getBlocksRateLimiter.acquire(); } @@ -284,7 +284,7 @@ public class NameNodeConnector implements Closeable { } else { nnproxy = namenode; } - return nnproxy.getBlocks(datanode, size, minBlockSize); + return nnproxy.getBlocks(datanode, size, minBlockSize, timeInterval); } finally { if (isRequestStandby) { LOG.info("Request #getBlocks to Standby NameNode success."); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java index e42373fd21b..d612fff53fa 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java @@ -89,6 +89,7 @@ import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState; import org.apache.hadoop.hdfs.server.namenode.CachedBlock; import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo; +import org.apache.hadoop.hdfs.server.namenode.INodeFile; import org.apache.hadoop.hdfs.server.namenode.INodesInPath; import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.namenode.Namesystem; @@ -1635,9 +1636,23 @@ public class BlockManager implements BlockStatsMXBean { return liveReplicas >= getDatanodeManager().getNumLiveDataNodes(); } + private boolean isHotBlock(BlockInfo blockInfo, long time) { + INodeFile iFile = (INodeFile)getBlockCollection(blockInfo); + if(iFile == null) { + return false; + } + if(iFile.isUnderConstruction()) { + return true; + } + if (iFile.getAccessTime() > time || iFile.getModificationTime() > time) { + return true; + } + return false; + } + /** Get all blocks with location information from a datanode. */ public BlocksWithLocations getBlocksWithLocations(final DatanodeID datanode, - final long size, final long minBlockSize) throws + final long size, final long minBlockSize, final long timeInterval) throws UnregisteredNodeException { final DatanodeDescriptor node = getDatanodeManager().getDatanode(datanode); if (node == null) { @@ -1655,15 +1670,21 @@ public class BlockManager implements BlockStatsMXBean { int startBlock = ThreadLocalRandom.current().nextInt(numBlocks); Iterator iter = node.getBlockIterator(startBlock); List results = new ArrayList(); + List pending = new ArrayList(); long totalSize = 0; BlockInfo curBlock; + long hotTimePos = Time.now() - timeInterval; while(totalSize 0 && isHotBlock(curBlock, hotTimePos)) { + pending.add(curBlock); + } else { + totalSize += addBlock(curBlock, results); + } } if(totalSize 0 && isHotBlock(curBlock, hotTimePos)) { + pending.add(curBlock); + } else { + totalSize += addBlock(curBlock, results); + } } } - + // if the cold block (access before timeInterval) is less than the + // asked size, it will add the pending hot block in end of return list. + for(int i = 0; i < pending.size() && totalSize < size; i++) { + curBlock = pending.get(i); + totalSize += addBlock(curBlock, results); + } return new BlocksWithLocations( results.toArray(new BlockWithLocations[results.size()])); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java index cc413a8e74d..e48e20b07fb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java @@ -1893,13 +1893,13 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, * @param minimumBlockSize */ public BlocksWithLocations getBlocks(DatanodeID datanode, long size, long - minimumBlockSize) throws IOException { + minimumBlockSize, long timeInterval) throws IOException { checkOperation(OperationCategory.READ); readLock(); try { checkOperation(OperationCategory.READ); return getBlockManager().getBlocksWithLocations(datanode, size, - minimumBlockSize); + minimumBlockSize, timeInterval); } finally { readUnlock("getBlocks"); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java index fde7ece4b21..1d648f20377 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java @@ -651,7 +651,7 @@ public class NameNodeRpcServer implements NamenodeProtocols { ///////////////////////////////////////////////////// @Override // NamenodeProtocol public BlocksWithLocations getBlocks(DatanodeInfo datanode, long size, long - minBlockSize) + minBlockSize, long timeInterval) throws IOException { if(size <= 0) { throw new IllegalArgumentException( @@ -664,7 +664,7 @@ public class NameNodeRpcServer implements NamenodeProtocols { checkNNStartup(); namesystem.checkSuperuserPrivilege(); namesystem.checkNameNodeSafeMode("Cannot execute getBlocks"); - return namesystem.getBlocks(datanode, size, minBlockSize); + return namesystem.getBlocks(datanode, size, minBlockSize, timeInterval); } @Override // NamenodeProtocol diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocol.java index 90c3b2345f2..44ffb85f79e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocol.java @@ -74,6 +74,8 @@ public interface NamenodeProtocol { * @param datanode a data node * @param size requested size * @param minBlockSize each block should be of this minimum Block Size + * @param hotBlockTimeInterval prefer to get blocks which are belong to + * the cold files accessed before the time interval * @return BlocksWithLocations a list of blocks & their locations * @throws IOException if size is less than or equal to 0 or datanode does not exist @@ -81,7 +83,7 @@ public interface NamenodeProtocol { @Idempotent @ReadOnly BlocksWithLocations getBlocks(DatanodeInfo datanode, long size, long - minBlockSize) throws IOException; + minBlockSize, long hotBlockTimeInterval) throws IOException; /** * Get the current block keys diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/NamenodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/NamenodeProtocol.proto index 97f5bcaf61f..88d9fbc2e04 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/NamenodeProtocol.proto +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/NamenodeProtocol.proto @@ -47,6 +47,7 @@ message GetBlocksRequestProto { // cause problem during rolling upgrade, when balancers are upgraded later. // For more info refer HDFS-13356 optional uint64 minBlockSize = 3 [default = 10485760]; + optional uint64 timeInterval = 4 [default = 0]; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml index c2fafb93156..b1a0b1feff1 100755 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml @@ -6068,4 +6068,13 @@ until capacity is balanced out. + + + dfs.balancer.getBlocks.hot-time-interval + 0 + + Balancer prefer moving cold blocks i.e blocks associated with files + accessed or modified before the specified time interval. + + diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md index 4b7a7a75104..175c865f228 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md +++ b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md @@ -301,6 +301,7 @@ Usage: | `-idleiterations` \ | Maximum number of idle iterations before exit. This overwrites the default idleiterations(5). | | `-runDuringUpgrade` | Whether to run the balancer during an ongoing HDFS upgrade. This is usually not desired since it will not affect used space on over-utilized machines. | | `-asService` | Run Balancer as a long running service. | +| `-hotBlockTimeInterval` | Prefer moving cold blocks i.e blocks associated with files accessed or modified before the specified time interval. | | `-h`\|`--help` | Display the tool usage and help information and exit. | Runs a cluster balancing utility. An administrator can simply press Ctrl-C to stop the rebalancing process. See [Balancer](./HdfsUserGuide.html#Balancer) for more details. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestGetBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestGetBlocks.java index e82b990a4e8..1ee166e6cd3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestGetBlocks.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestGetBlocks.java @@ -238,26 +238,26 @@ public class TestGetBlocks { DFSUtilClient.getNNUri(addr), NamenodeProtocol.class).getProxy(); // Should return all 13 blocks, as minBlockSize is not passed - locs = namenode.getBlocks(dataNodes[0], fileLen, 0).getBlocks(); + locs = namenode.getBlocks(dataNodes[0], fileLen, 0, 0).getBlocks(); assertEquals(blkLocsSize, locs.length); assertEquals(locs[0].getStorageIDs().length, replicationFactor); assertEquals(locs[1].getStorageIDs().length, replicationFactor); // Should return 12 blocks, as minBlockSize is blkSize - locs = namenode.getBlocks(dataNodes[0], fileLen, blkSize).getBlocks(); + locs = namenode.getBlocks(dataNodes[0], fileLen, blkSize, 0).getBlocks(); assertEquals(blkLocsSize - 1, locs.length); assertEquals(locs[0].getStorageIDs().length, replicationFactor); assertEquals(locs[1].getStorageIDs().length, replicationFactor); // get blocks of size BlockSize from dataNodes[0] locs = namenode.getBlocks(dataNodes[0], blkSize, - blkSize).getBlocks(); + blkSize, 0).getBlocks(); assertEquals(locs.length, 1); assertEquals(locs[0].getStorageIDs().length, replicationFactor); // get blocks of size 1 from dataNodes[0] - locs = namenode.getBlocks(dataNodes[0], 1, 1).getBlocks(); + locs = namenode.getBlocks(dataNodes[0], 1, 1, 0).getBlocks(); assertEquals(locs.length, 1); assertEquals(locs[0].getStorageIDs().length, replicationFactor); @@ -282,7 +282,7 @@ public class TestGetBlocks { // Namenode should refuse to provide block locations to the balancer // while in safemode. - locs = namenode.getBlocks(dataNodes[0], fileLen, 0).getBlocks(); + locs = namenode.getBlocks(dataNodes[0], fileLen, 0, 0).getBlocks(); assertEquals(blkLocsSize, locs.length); assertFalse(fs.isInSafeMode()); LOG.info("Entering safe mode"); @@ -309,7 +309,7 @@ public class TestGetBlocks { // Namenode should refuse should fail LambdaTestUtils.intercept(exClass, - msg, () -> namenode.getBlocks(datanode, size, minBlkSize)); + msg, () -> namenode.getBlocks(datanode, size, minBlkSize, 0)); } /** @@ -396,4 +396,76 @@ public class TestGetBlocks { } } -} + private boolean belongToFile(BlockWithLocations blockWithLocations, + List blocks) { + for(LocatedBlock block : blocks) { + if (block.getBlock().getLocalBlock().equals( + blockWithLocations.getBlock())) { + return true; + } + } + return false; + } + + /** + * test GetBlocks with dfs.namenode.hot.block.interval. + * Balancer prefer to get blocks which are belong to the cold files + * created before this time period. + */ + @Test + public void testGetBlocksWithHotBlockTimeInterval() throws Exception { + final Configuration conf = new HdfsConfiguration(); + final short repFactor = (short) 1; + final int blockNum = 2; + final int fileLen = BLOCK_SIZE * blockNum; + final long hotInterval = 2000; + + conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE); + MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf). + numDataNodes(repFactor).build(); + try { + cluster.waitActive(); + FileSystem fs = cluster.getFileSystem(); + final DFSClient dfsclient = ((DistributedFileSystem) fs).getClient(); + + String fileOld = "/f.old"; + DFSTestUtil.createFile(fs, new Path(fileOld), fileLen, repFactor, 0); + + List locatedBlocksOld = dfsclient.getNamenode(). + getBlockLocations(fileOld, 0, fileLen).getLocatedBlocks(); + DatanodeInfo[] dataNodes = locatedBlocksOld.get(0).getLocations(); + + InetSocketAddress addr = new InetSocketAddress("localhost", + cluster.getNameNodePort()); + NamenodeProtocol namenode = NameNodeProxies.createProxy(conf, + DFSUtilClient.getNNUri(addr), NamenodeProtocol.class).getProxy(); + + // make the file as old. + dfsclient.getNamenode().setTimes(fileOld, 0, 0); + + String fileNew = "/f.new"; + DFSTestUtil.createFile(fs, new Path(fileNew), fileLen, repFactor, 0); + List locatedBlocksNew = dfsclient.getNamenode() + .getBlockLocations(fileNew, 0, fileLen).getLocatedBlocks(); + + BlockWithLocations[] locsAll = namenode.getBlocks( + dataNodes[0], fileLen*2, 0, hotInterval).getBlocks(); + assertEquals(locsAll.length, 4); + + for(int i = 0; i < blockNum; i++) { + assertTrue(belongToFile(locsAll[i], locatedBlocksOld)); + } + for(int i = blockNum; i < blockNum*2; i++) { + assertTrue(belongToFile(locsAll[i], locatedBlocksNew)); + } + + BlockWithLocations[] locs2 = namenode.getBlocks( + dataNodes[0], fileLen*2, 0, hotInterval).getBlocks(); + for(int i = 0; i < 2; i++) { + assertTrue(belongToFile(locs2[i], locatedBlocksOld)); + } + } finally { + cluster.shutdown(); + } + } +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java index 82d710d790f..9f65ffa1fea 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java @@ -2170,7 +2170,8 @@ public class TestBalancer { endGetBlocksTime.getAndUpdate((curr) -> Math.max(curr, endTime)); numGetBlocksCalls.incrementAndGet(); return blk; - }}).when(fsnSpy).getBlocks(any(DatanodeID.class), anyLong(), anyLong()); + }}).when(fsnSpy).getBlocks(any(DatanodeID.class), + anyLong(), anyLong(), anyLong()); } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithHANameNodes.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithHANameNodes.java index b0ee04e00f1..a74f94f54d9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithHANameNodes.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithHANameNodes.java @@ -228,7 +228,7 @@ public class TestBalancerWithHANameNodes { int expectedObserverIdx = withObserverFailure ? 3 : 2; int expectedCount = (i == expectedObserverIdx) ? 2 : 0; verify(namesystemSpies.get(i), times(expectedCount)) - .getBlocks(any(), anyLong(), anyLong()); + .getBlocks(any(), anyLong(), anyLong(), anyLong()); } } finally { if (qjmhaCluster != null) {