HDFS-14861. Reset LowRedundancyBlocks Iterator periodically. Contributed by Stephen O'Donnell.
Signed-off-by: Wei-Chiu Chuang <weichiu@apache.org>
(cherry picked from commit 900430b990
)
This commit is contained in:
parent
fb0aba8521
commit
2377649cdb
|
@ -223,6 +223,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
|
||||||
public static final String DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY =
|
public static final String DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY =
|
||||||
HdfsClientConfigKeys.DeprecatedKeys.DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY;
|
HdfsClientConfigKeys.DeprecatedKeys.DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY;
|
||||||
public static final int DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_DEFAULT = 3;
|
public static final int DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_DEFAULT = 3;
|
||||||
|
public static final String DFS_NAMENODE_REDUNDANCY_QUEUE_RESTART_ITERATIONS =
|
||||||
|
"dfs.namenode.redundancy.queue.restart.iterations";
|
||||||
|
public static final int
|
||||||
|
DFS_NAMENODE_REDUNDANCY_QUEUE_RESTART_ITERATIONS_DEFAULT = 2400;
|
||||||
public static final String DFS_NAMENODE_REPLICATION_MIN_KEY =
|
public static final String DFS_NAMENODE_REPLICATION_MIN_KEY =
|
||||||
HdfsClientConfigKeys.DeprecatedKeys.DFS_NAMENODE_REPLICATION_MIN_KEY;
|
HdfsClientConfigKeys.DeprecatedKeys.DFS_NAMENODE_REPLICATION_MIN_KEY;
|
||||||
public static final int DFS_NAMENODE_REPLICATION_MIN_DEFAULT = 1;
|
public static final int DFS_NAMENODE_REPLICATION_MIN_DEFAULT = 1;
|
||||||
|
|
|
@ -301,6 +301,16 @@ public class BlockManager implements BlockStatsMXBean {
|
||||||
*/
|
*/
|
||||||
private final long redundancyRecheckIntervalMs;
|
private final long redundancyRecheckIntervalMs;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Tracks how many calls have been made to chooseLowReduncancyBlocks since
|
||||||
|
* the queue position was last reset to the queue head. If CallsSinceReset
|
||||||
|
* crosses the threshold the next call will reset the iterators. A threshold
|
||||||
|
* of zero means the queue position will only be reset once the next of the
|
||||||
|
* queue has been reached.
|
||||||
|
*/
|
||||||
|
private int replQueueResetToHeadThreshold;
|
||||||
|
private int replQueueCallsSinceReset = 0;
|
||||||
|
|
||||||
/** How often to check and the limit for the storageinfo efficiency. */
|
/** How often to check and the limit for the storageinfo efficiency. */
|
||||||
private final long storageInfoDefragmentInterval;
|
private final long storageInfoDefragmentInterval;
|
||||||
private final long storageInfoDefragmentTimeout;
|
private final long storageInfoDefragmentTimeout;
|
||||||
|
@ -564,6 +574,18 @@ public class BlockManager implements BlockStatsMXBean {
|
||||||
}
|
}
|
||||||
this.minReplicationToBeInMaintenance = (short)minMaintenanceR;
|
this.minReplicationToBeInMaintenance = (short)minMaintenanceR;
|
||||||
|
|
||||||
|
replQueueResetToHeadThreshold = conf.getInt(
|
||||||
|
DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_QUEUE_RESTART_ITERATIONS,
|
||||||
|
DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_QUEUE_RESTART_ITERATIONS_DEFAULT);
|
||||||
|
if (replQueueResetToHeadThreshold < 0) {
|
||||||
|
LOG.warn("{} is set to {} and it must be >= 0. Resetting to default {}",
|
||||||
|
DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_QUEUE_RESTART_ITERATIONS,
|
||||||
|
replQueueResetToHeadThreshold, DFSConfigKeys.
|
||||||
|
DFS_NAMENODE_REDUNDANCY_QUEUE_RESTART_ITERATIONS_DEFAULT);
|
||||||
|
replQueueResetToHeadThreshold = DFSConfigKeys.
|
||||||
|
DFS_NAMENODE_REDUNDANCY_QUEUE_RESTART_ITERATIONS_DEFAULT;
|
||||||
|
}
|
||||||
|
|
||||||
long heartbeatIntervalSecs = conf.getTimeDuration(
|
long heartbeatIntervalSecs = conf.getTimeDuration(
|
||||||
DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY,
|
DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY,
|
||||||
DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT, TimeUnit.SECONDS);
|
DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT, TimeUnit.SECONDS);
|
||||||
|
@ -1904,9 +1926,18 @@ public class BlockManager implements BlockStatsMXBean {
|
||||||
List<List<BlockInfo>> blocksToReconstruct = null;
|
List<List<BlockInfo>> blocksToReconstruct = null;
|
||||||
namesystem.writeLock();
|
namesystem.writeLock();
|
||||||
try {
|
try {
|
||||||
|
boolean reset = false;
|
||||||
|
if (replQueueResetToHeadThreshold > 0) {
|
||||||
|
if (replQueueCallsSinceReset >= replQueueResetToHeadThreshold) {
|
||||||
|
reset = true;
|
||||||
|
replQueueCallsSinceReset = 0;
|
||||||
|
} else {
|
||||||
|
replQueueCallsSinceReset++;
|
||||||
|
}
|
||||||
|
}
|
||||||
// Choose the blocks to be reconstructed
|
// Choose the blocks to be reconstructed
|
||||||
blocksToReconstruct = neededReconstruction
|
blocksToReconstruct = neededReconstruction
|
||||||
.chooseLowRedundancyBlocks(blocksToProcess);
|
.chooseLowRedundancyBlocks(blocksToProcess, reset);
|
||||||
} finally {
|
} finally {
|
||||||
namesystem.writeUnlock();
|
namesystem.writeUnlock();
|
||||||
}
|
}
|
||||||
|
|
|
@ -488,6 +488,28 @@ class LowRedundancyBlocks implements Iterable<BlockInfo> {
|
||||||
*/
|
*/
|
||||||
synchronized List<List<BlockInfo>> chooseLowRedundancyBlocks(
|
synchronized List<List<BlockInfo>> chooseLowRedundancyBlocks(
|
||||||
int blocksToProcess) {
|
int blocksToProcess) {
|
||||||
|
return chooseLowRedundancyBlocks(blocksToProcess, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get a list of block lists without sufficient redundancy. The index of
|
||||||
|
* block lists represents its replication priority. Iterates each block list
|
||||||
|
* in priority order beginning with the highest priority list. Iterators use
|
||||||
|
* a bookmark to resume where the previous iteration stopped. Returns when
|
||||||
|
* the block count is met or iteration reaches the end of the lowest priority
|
||||||
|
* list, in which case bookmarks for each block list are reset to the heads
|
||||||
|
* of their respective lists.
|
||||||
|
*
|
||||||
|
* @param blocksToProcess - number of blocks to fetch from low redundancy
|
||||||
|
* blocks.
|
||||||
|
* @param resetIterators - After gathering the list of blocks reset the
|
||||||
|
* position of all queue iterators to the head of the queue so
|
||||||
|
* subsequent calls will begin at the head of the queue
|
||||||
|
* @return Return a list of block lists to be replicated. The block list
|
||||||
|
* index represents its redundancy priority.
|
||||||
|
*/
|
||||||
|
synchronized List<List<BlockInfo>> chooseLowRedundancyBlocks(
|
||||||
|
int blocksToProcess, boolean resetIterators) {
|
||||||
final List<List<BlockInfo>> blocksToReconstruct = new ArrayList<>(LEVEL);
|
final List<List<BlockInfo>> blocksToReconstruct = new ArrayList<>(LEVEL);
|
||||||
|
|
||||||
int count = 0;
|
int count = 0;
|
||||||
|
@ -509,7 +531,7 @@ class LowRedundancyBlocks implements Iterable<BlockInfo> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (priority == LEVEL) {
|
if (priority == LEVEL || resetIterators) {
|
||||||
// Reset all bookmarks because there were no recently added blocks.
|
// Reset all bookmarks because there were no recently added blocks.
|
||||||
for (LightWeightLinkedSet<BlockInfo> q : priorityQueues) {
|
for (LightWeightLinkedSet<BlockInfo> q : priorityQueues) {
|
||||||
q.resetBookmark();
|
q.resetBookmark();
|
||||||
|
|
|
@ -1074,6 +1074,24 @@
|
||||||
</description>
|
</description>
|
||||||
</property>
|
</property>
|
||||||
|
|
||||||
|
<property>
|
||||||
|
<name>dfs.namenode.redundancy.queue.restart.iterations</name>
|
||||||
|
<value>2400</value>
|
||||||
|
<description>When picking blocks from the low redundancy queues, reset the
|
||||||
|
bookmarked iterator after the set number of iterations to ensure any blocks
|
||||||
|
which were not processed on the first pass are retried before the iterators
|
||||||
|
would naturally reach their end point. This ensures blocks are retried
|
||||||
|
more frequently when there are many pending blocks or blocks are
|
||||||
|
continuously added to the queues preventing the iterator reaching its
|
||||||
|
natural endpoint.
|
||||||
|
The default setting of 2400 combined with the default of
|
||||||
|
dfs.namenode.redundancy.interval.seconds means the iterators will be reset
|
||||||
|
approximately every 2 hours.
|
||||||
|
Setting this parameter to zero disables the feature and the iterators will
|
||||||
|
be reset only when the end of all queues has been reached.
|
||||||
|
</description>
|
||||||
|
</property>
|
||||||
|
|
||||||
<property>
|
<property>
|
||||||
<name>dfs.namenode.accesstime.precision</name>
|
<name>dfs.namenode.accesstime.precision</name>
|
||||||
<value>3600000</value>
|
<value>3600000</value>
|
||||||
|
|
|
@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.blockmanagement;
|
||||||
|
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
import org.apache.hadoop.hdfs.StripedFileTestUtil;
|
import org.apache.hadoop.hdfs.StripedFileTestUtil;
|
||||||
import org.apache.hadoop.hdfs.protocol.Block;
|
import org.apache.hadoop.hdfs.protocol.Block;
|
||||||
|
@ -92,6 +93,32 @@ public class TestLowRedundancyBlockQueues {
|
||||||
queues.getHighestPriorityECBlockCount());
|
queues.getHighestPriorityECBlockCount());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testQueuePositionCanBeReset() throws Throwable {
|
||||||
|
LowRedundancyBlocks queues = new LowRedundancyBlocks();
|
||||||
|
for (int i=0; i< 4; i++) {
|
||||||
|
BlockInfo block = genBlockInfo(i);
|
||||||
|
queues.add(block, 2, 0, 0, 3);
|
||||||
|
}
|
||||||
|
List<List<BlockInfo>> blocks;
|
||||||
|
// Get one block from the queue - should be block ID 0 returned
|
||||||
|
blocks = queues.chooseLowRedundancyBlocks(1, false);
|
||||||
|
assertEquals(1, blocks.get(2).size());
|
||||||
|
assertEquals(0, blocks.get(2).get(0).getBlockId());
|
||||||
|
|
||||||
|
// Get the next blocks - should be ID 1
|
||||||
|
blocks = queues.chooseLowRedundancyBlocks(1, false);
|
||||||
|
assertEquals(1, blocks.get(2).get(0).getBlockId());
|
||||||
|
|
||||||
|
// Get the next block, but also reset this time - should be ID 2 returned
|
||||||
|
blocks = queues.chooseLowRedundancyBlocks(1, true);
|
||||||
|
assertEquals(2, blocks.get(2).get(0).getBlockId());
|
||||||
|
|
||||||
|
// Get one more block and due to resetting the queue it will be block id 0
|
||||||
|
blocks = queues.chooseLowRedundancyBlocks(1, false);
|
||||||
|
assertEquals(0, blocks.get(2).get(0).getBlockId());
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Test that adding blocks with different replication counts puts them
|
* Test that adding blocks with different replication counts puts them
|
||||||
* into different queues.
|
* into different queues.
|
||||||
|
|
Loading…
Reference in New Issue