diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index bb8039c2d93..51900a484cf 100755 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -244,6 +244,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys { public static final String DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY = HdfsClientConfigKeys.DeprecatedKeys.DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY; public static final int DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_DEFAULT = 3; + public static final String DFS_NAMENODE_REDUNDANCY_QUEUE_RESTART_ITERATIONS = + "dfs.namenode.redundancy.queue.restart.iterations"; + public static final int + DFS_NAMENODE_REDUNDANCY_QUEUE_RESTART_ITERATIONS_DEFAULT = 2400; public static final String DFS_NAMENODE_REPLICATION_MIN_KEY = HdfsClientConfigKeys.DeprecatedKeys.DFS_NAMENODE_REPLICATION_MIN_KEY; public static final int DFS_NAMENODE_REPLICATION_MIN_DEFAULT = 1; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java index 626048f1cf7..e2b22d39b50 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java @@ -300,6 +300,16 @@ public class BlockManager implements BlockStatsMXBean { */ private final long redundancyRecheckIntervalMs; + /** + * Tracks how many calls have been made to chooseLowRedundancyBlocks since + * the queue position was last reset to the queue head. 
If CallsSinceReset + * crosses the threshold the next call will reset the iterators. A threshold + * of zero means the queue position will only be reset once the end of the + * queue has been reached. + */ + private int replQueueResetToHeadThreshold; + private int replQueueCallsSinceReset = 0; + /** How often to check and the limit for the storageinfo efficiency. */ private final long storageInfoDefragmentInterval; private final long storageInfoDefragmentTimeout; @@ -572,6 +582,18 @@ public class BlockManager implements BlockStatsMXBean { } this.minReplicationToBeInMaintenance = (short)minMaintenanceR; + replQueueResetToHeadThreshold = conf.getInt( + DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_QUEUE_RESTART_ITERATIONS, + DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_QUEUE_RESTART_ITERATIONS_DEFAULT); + if (replQueueResetToHeadThreshold < 0) { + LOG.warn("{} is set to {} and it must be >= 0. Resetting to default {}", + DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_QUEUE_RESTART_ITERATIONS, + replQueueResetToHeadThreshold, DFSConfigKeys. + DFS_NAMENODE_REDUNDANCY_QUEUE_RESTART_ITERATIONS_DEFAULT); + replQueueResetToHeadThreshold = DFSConfigKeys. 
+ DFS_NAMENODE_REDUNDANCY_QUEUE_RESTART_ITERATIONS_DEFAULT; + } + long heartbeatIntervalSecs = conf.getTimeDuration( DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT, TimeUnit.SECONDS); @@ -1912,9 +1934,18 @@ public class BlockManager implements BlockStatsMXBean { List> blocksToReconstruct = null; namesystem.writeLock(); try { - // Choose the blocks to be reconstructed + boolean reset = false; + if (replQueueResetToHeadThreshold > 0) { + if (replQueueCallsSinceReset >= replQueueResetToHeadThreshold) { + reset = true; + replQueueCallsSinceReset = 0; + } else { + replQueueCallsSinceReset++; + } + } + // Choose the blocks to be reconstructed blocksToReconstruct = neededReconstruction - .chooseLowRedundancyBlocks(blocksToProcess); + .chooseLowRedundancyBlocks(blocksToProcess, reset); } finally { namesystem.writeUnlock(); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/LowRedundancyBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/LowRedundancyBlocks.java index 40ea98053fa..8cf9dd40ca6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/LowRedundancyBlocks.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/LowRedundancyBlocks.java @@ -488,6 +488,28 @@ class LowRedundancyBlocks implements Iterable { */ synchronized List> chooseLowRedundancyBlocks( int blocksToProcess) { + return chooseLowRedundancyBlocks(blocksToProcess, false); + } + + /** + * Get a list of block lists without sufficient redundancy. The index of + * block lists represents its replication priority. Iterates each block list + * in priority order beginning with the highest priority list. Iterators use + * a bookmark to resume where the previous iteration stopped. 
Returns when + * the block count is met or iteration reaches the end of the lowest priority + * list, in which case bookmarks for each block list are reset to the heads + * of their respective lists. + * + * @param blocksToProcess - number of blocks to fetch from low redundancy + * blocks. + * @param resetIterators - After gathering the list of blocks reset the + * position of all queue iterators to the head of the queue so + * subsequent calls will begin at the head of the queue + * @return Return a list of block lists to be replicated. The block list + * index represents its redundancy priority. + */ + synchronized List> chooseLowRedundancyBlocks( + int blocksToProcess, boolean resetIterators) { final List> blocksToReconstruct = new ArrayList<>(LEVEL); int count = 0; @@ -509,7 +531,7 @@ class LowRedundancyBlocks implements Iterable { } } - if (priority == LEVEL) { + if (priority == LEVEL || resetIterators) { // Reset all bookmarks because there were no recently added blocks. for (LightWeightLinkedSet q : priorityQueues) { q.resetBookmark(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml index ad556c60b6e..0b7d13af725 100755 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml @@ -1136,6 +1136,24 @@ + + dfs.namenode.redundancy.queue.restart.iterations + 2400 + When picking blocks from the low redundancy queues, reset the + bookmarked iterator after the set number of iterations to ensure any blocks + which were not processed on the first pass are retried before the iterators + would naturally reach their end point. This ensures blocks are retried + more frequently when there are many pending blocks or blocks are + continuously added to the queues preventing the iterator reaching its + natural endpoint. 
The default setting of 2400 combined with the default of + dfs.namenode.redundancy.interval.seconds means the iterators will be reset + approximately every 2 hours. + Setting this parameter to zero disables the feature and the iterators will + be reset only when the end of all queues has been reached. + + + dfs.namenode.accesstime.precision 3600000 diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestLowRedundancyBlockQueues.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestLowRedundancyBlockQueues.java index 785f3bec05c..000c636716c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestLowRedundancyBlockQueues.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestLowRedundancyBlockQueues.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.blockmanagement; import java.util.Collection; import java.util.Iterator; +import java.util.List; import org.apache.hadoop.hdfs.StripedFileTestUtil; import org.apache.hadoop.hdfs.protocol.Block; @@ -92,6 +93,32 @@ public class TestLowRedundancyBlockQueues { queues.getHighestPriorityECBlockCount()); } + @Test + public void testQueuePositionCanBeReset() throws Throwable { + LowRedundancyBlocks queues = new LowRedundancyBlocks(); + for (int i=0; i< 4; i++) { + BlockInfo block = genBlockInfo(i); + queues.add(block, 2, 0, 0, 3); + } + List> blocks; + // Get one block from the queue - should be block ID 0 returned + blocks = queues.chooseLowRedundancyBlocks(1, false); + assertEquals(1, blocks.get(2).size()); + assertEquals(0, blocks.get(2).get(0).getBlockId()); + + // Get the next block - should be ID 1 + blocks = queues.chooseLowRedundancyBlocks(1, false); + assertEquals(1, blocks.get(2).get(0).getBlockId()); + + // Get the next block, but also reset this time - should be ID 2 returned + blocks = 
queues.chooseLowRedundancyBlocks(1, true); + assertEquals(2, blocks.get(2).get(0).getBlockId()); + + // Get one more block and due to resetting the queue it will be block id 0 + blocks = queues.chooseLowRedundancyBlocks(1, false); + assertEquals(0, blocks.get(2).get(0).getBlockId()); + } + /** * Test that adding blocks with different replication counts puts them * into different queues.