Allow block reconstruction pending timeout to be refreshable (#4567)
Reviewed-by: Hiroyuki Adachi <hadachi@yahoo-corp.jp> Signed-off-by: Takanobu Asanuma <tasanuma@apache.org>
This commit is contained in:
parent
21bae31d58
commit
1923096adb
|
@ -1067,6 +1067,26 @@ public class BlockManager implements BlockStatsMXBean {
|
||||||
blocksReplWorkMultiplier = newVal;
|
blocksReplWorkMultiplier = newVal;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Updates the value used for pendingReconstruction timeout, which is set by
|
||||||
|
* {@code DFSConfigKeys.
|
||||||
|
* DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_KEY} initially.
|
||||||
|
*
|
||||||
|
* @param newVal - Must be a positive non-zero integer.
|
||||||
|
*/
|
||||||
|
public void setReconstructionPendingTimeout(int newVal) {
|
||||||
|
ensurePositiveInt(newVal,
|
||||||
|
DFSConfigKeys.DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_KEY);
|
||||||
|
pendingReconstruction.setTimeout(newVal * 1000L);
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Returns the current setting for pendingReconstruction timeout, set by
|
||||||
|
* {@code DFSConfigKeys.DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_KEY}.
|
||||||
|
*/
|
||||||
|
public int getReconstructionPendingTimeout() {
|
||||||
|
return (int)(pendingReconstruction.getTimeout() / 1000L);
|
||||||
|
}
|
||||||
|
|
||||||
public int getDefaultStorageNum(BlockInfo block) {
|
public int getDefaultStorageNum(BlockInfo block) {
|
||||||
switch (block.getBlockType()) {
|
switch (block.getBlockType()) {
|
||||||
case STRIPED: return ((BlockInfoStriped) block).getRealTotalBlockNum();
|
case STRIPED: return ((BlockInfoStriped) block).getRealTotalBlockNum();
|
||||||
|
|
|
@ -76,6 +76,14 @@ class PendingReconstructionBlocks {
|
||||||
timerThread.start();
|
timerThread.start();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void setTimeout(long timeoutPeriod) {
|
||||||
|
this.timeout = timeoutPeriod;
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getTimeout() {
|
||||||
|
return this.timeout;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Add a block to the list of pending reconstructions
|
* Add a block to the list of pending reconstructions
|
||||||
* @param block The corresponding block
|
* @param block The corresponding block
|
||||||
|
|
|
@ -203,6 +203,8 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_BLOCKPLACEMENTPO
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_BLOCKPLACEMENTPOLICY_EXCLUDE_SLOW_NODES_ENABLED_DEFAULT;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_BLOCKPLACEMENTPOLICY_EXCLUDE_SLOW_NODES_ENABLED_DEFAULT;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_MAX_SLOWPEER_COLLECT_NODES_KEY;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_MAX_SLOWPEER_COLLECT_NODES_KEY;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_MAX_SLOWPEER_COLLECT_NODES_DEFAULT;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_MAX_SLOWPEER_COLLECT_NODES_DEFAULT;
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_KEY;
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_DEFAULT;
|
||||||
|
|
||||||
import static org.apache.hadoop.util.ExitUtil.terminate;
|
import static org.apache.hadoop.util.ExitUtil.terminate;
|
||||||
import static org.apache.hadoop.util.ToolRunner.confirmPrompt;
|
import static org.apache.hadoop.util.ToolRunner.confirmPrompt;
|
||||||
|
@ -350,7 +352,8 @@ public class NameNode extends ReconfigurableBase implements
|
||||||
DFS_NAMENODE_MAX_SLOWPEER_COLLECT_NODES_KEY,
|
DFS_NAMENODE_MAX_SLOWPEER_COLLECT_NODES_KEY,
|
||||||
DFS_BLOCK_INVALIDATE_LIMIT_KEY,
|
DFS_BLOCK_INVALIDATE_LIMIT_KEY,
|
||||||
DFS_DATANODE_PEER_STATS_ENABLED_KEY,
|
DFS_DATANODE_PEER_STATS_ENABLED_KEY,
|
||||||
DFS_DATANODE_MAX_NODES_TO_REPORT_KEY));
|
DFS_DATANODE_MAX_NODES_TO_REPORT_KEY,
|
||||||
|
DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_KEY));
|
||||||
|
|
||||||
private static final String USAGE = "Usage: hdfs namenode ["
|
private static final String USAGE = "Usage: hdfs namenode ["
|
||||||
+ StartupOption.BACKUP.getName() + "] | \n\t["
|
+ StartupOption.BACKUP.getName() + "] | \n\t["
|
||||||
|
@ -2301,7 +2304,8 @@ public class NameNode extends ReconfigurableBase implements
|
||||||
} else if (property.equals(DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY)
|
} else if (property.equals(DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY)
|
||||||
|| property.equals(DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_KEY)
|
|| property.equals(DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_KEY)
|
||||||
|| property.equals(
|
|| property.equals(
|
||||||
DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION)) {
|
DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION)
|
||||||
|
|| property.equals(DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_KEY)) {
|
||||||
return reconfReplicationParameters(newVal, property);
|
return reconfReplicationParameters(newVal, property);
|
||||||
} else if (property.equals(DFS_BLOCK_REPLICATOR_CLASSNAME_KEY) || property
|
} else if (property.equals(DFS_BLOCK_REPLICATOR_CLASSNAME_KEY) || property
|
||||||
.equals(DFS_BLOCK_PLACEMENT_EC_CLASSNAME_KEY)) {
|
.equals(DFS_BLOCK_PLACEMENT_EC_CLASSNAME_KEY)) {
|
||||||
|
@ -2347,6 +2351,14 @@ public class NameNode extends ReconfigurableBase implements
|
||||||
DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION_DEFAULT,
|
DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION_DEFAULT,
|
||||||
newVal));
|
newVal));
|
||||||
newSetting = bm.getBlocksReplWorkMultiplier();
|
newSetting = bm.getBlocksReplWorkMultiplier();
|
||||||
|
} else if (
|
||||||
|
property.equals(
|
||||||
|
DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_KEY)) {
|
||||||
|
bm.setReconstructionPendingTimeout(
|
||||||
|
adjustNewVal(
|
||||||
|
DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_DEFAULT,
|
||||||
|
newVal));
|
||||||
|
newSetting = bm.getReconstructionPendingTimeout();
|
||||||
} else {
|
} else {
|
||||||
throw new IllegalArgumentException("Unexpected property " +
|
throw new IllegalArgumentException("Unexpected property " +
|
||||||
property + " in reconfReplicationParameters");
|
property + " in reconfReplicationParameters");
|
||||||
|
|
|
@ -49,6 +49,9 @@ public class TestRefreshNamenodeReplicationConfig {
|
||||||
config.setInt(
|
config.setInt(
|
||||||
DFSConfigKeys.DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION,
|
DFSConfigKeys.DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION,
|
||||||
12);
|
12);
|
||||||
|
config.setInt(
|
||||||
|
DFSConfigKeys.DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_KEY,
|
||||||
|
300);
|
||||||
|
|
||||||
cluster = new MiniDFSCluster.Builder(config)
|
cluster = new MiniDFSCluster.Builder(config)
|
||||||
.nnTopology(MiniDFSNNTopology.simpleSingleNN(0, 0))
|
.nnTopology(MiniDFSNNTopology.simpleSingleNN(0, 0))
|
||||||
|
@ -72,6 +75,7 @@ public class TestRefreshNamenodeReplicationConfig {
|
||||||
assertEquals(8, bm.getMaxReplicationStreams());
|
assertEquals(8, bm.getMaxReplicationStreams());
|
||||||
assertEquals(10, bm.getReplicationStreamsHardLimit());
|
assertEquals(10, bm.getReplicationStreamsHardLimit());
|
||||||
assertEquals(12, bm.getBlocksReplWorkMultiplier());
|
assertEquals(12, bm.getBlocksReplWorkMultiplier());
|
||||||
|
assertEquals(300, bm.getReconstructionPendingTimeout());
|
||||||
|
|
||||||
cluster.getNameNode().reconfigurePropertyImpl(
|
cluster.getNameNode().reconfigurePropertyImpl(
|
||||||
DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY, "20");
|
DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY, "20");
|
||||||
|
@ -81,10 +85,14 @@ public class TestRefreshNamenodeReplicationConfig {
|
||||||
cluster.getNameNode().reconfigurePropertyImpl(
|
cluster.getNameNode().reconfigurePropertyImpl(
|
||||||
DFSConfigKeys.DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION,
|
DFSConfigKeys.DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION,
|
||||||
"24");
|
"24");
|
||||||
|
cluster.getNameNode().reconfigurePropertyImpl(
|
||||||
|
DFSConfigKeys.DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_KEY,
|
||||||
|
"180");
|
||||||
|
|
||||||
assertEquals(20, bm.getMaxReplicationStreams());
|
assertEquals(20, bm.getMaxReplicationStreams());
|
||||||
assertEquals(22, bm.getReplicationStreamsHardLimit());
|
assertEquals(22, bm.getReplicationStreamsHardLimit());
|
||||||
assertEquals(24, bm.getBlocksReplWorkMultiplier());
|
assertEquals(24, bm.getBlocksReplWorkMultiplier());
|
||||||
|
assertEquals(180, bm.getReconstructionPendingTimeout());
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -96,7 +104,8 @@ public class TestRefreshNamenodeReplicationConfig {
|
||||||
String[] keys = new String[]{
|
String[] keys = new String[]{
|
||||||
DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY,
|
DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY,
|
||||||
DFSConfigKeys.DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_KEY,
|
DFSConfigKeys.DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_KEY,
|
||||||
DFSConfigKeys.DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION
|
DFSConfigKeys.DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION,
|
||||||
|
DFSConfigKeys.DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_KEY
|
||||||
};
|
};
|
||||||
|
|
||||||
// Ensure we cannot set any of the parameters negative
|
// Ensure we cannot set any of the parameters negative
|
||||||
|
@ -112,6 +121,7 @@ public class TestRefreshNamenodeReplicationConfig {
|
||||||
assertEquals(8, bm.getMaxReplicationStreams());
|
assertEquals(8, bm.getMaxReplicationStreams());
|
||||||
assertEquals(10, bm.getReplicationStreamsHardLimit());
|
assertEquals(10, bm.getReplicationStreamsHardLimit());
|
||||||
assertEquals(12, bm.getBlocksReplWorkMultiplier());
|
assertEquals(12, bm.getBlocksReplWorkMultiplier());
|
||||||
|
assertEquals(300, bm.getReconstructionPendingTimeout());
|
||||||
|
|
||||||
for (String key : keys) {
|
for (String key : keys) {
|
||||||
ReconfigurationException e =
|
ReconfigurationException e =
|
||||||
|
@ -126,6 +136,7 @@ public class TestRefreshNamenodeReplicationConfig {
|
||||||
assertEquals(8, bm.getMaxReplicationStreams());
|
assertEquals(8, bm.getMaxReplicationStreams());
|
||||||
assertEquals(10, bm.getReplicationStreamsHardLimit());
|
assertEquals(10, bm.getReplicationStreamsHardLimit());
|
||||||
assertEquals(12, bm.getBlocksReplWorkMultiplier());
|
assertEquals(12, bm.getBlocksReplWorkMultiplier());
|
||||||
|
assertEquals(300, bm.getReconstructionPendingTimeout());
|
||||||
|
|
||||||
// Ensure none of the parameters can be set to a string value
|
// Ensure none of the parameters can be set to a string value
|
||||||
for (String key : keys) {
|
for (String key : keys) {
|
||||||
|
@ -139,5 +150,6 @@ public class TestRefreshNamenodeReplicationConfig {
|
||||||
assertEquals(8, bm.getMaxReplicationStreams());
|
assertEquals(8, bm.getMaxReplicationStreams());
|
||||||
assertEquals(10, bm.getReplicationStreamsHardLimit());
|
assertEquals(10, bm.getReplicationStreamsHardLimit());
|
||||||
assertEquals(12, bm.getBlocksReplWorkMultiplier());
|
assertEquals(12, bm.getBlocksReplWorkMultiplier());
|
||||||
|
assertEquals(300, bm.getReconstructionPendingTimeout());
|
||||||
}
|
}
|
||||||
}
|
}
|
|
@ -438,7 +438,7 @@ public class TestDFSAdmin {
|
||||||
final List<String> outs = Lists.newArrayList();
|
final List<String> outs = Lists.newArrayList();
|
||||||
final List<String> errs = Lists.newArrayList();
|
final List<String> errs = Lists.newArrayList();
|
||||||
getReconfigurableProperties("namenode", address, outs, errs);
|
getReconfigurableProperties("namenode", address, outs, errs);
|
||||||
assertEquals(19, outs.size());
|
assertEquals(20, outs.size());
|
||||||
assertTrue(outs.get(0).contains("Reconfigurable properties:"));
|
assertTrue(outs.get(0).contains("Reconfigurable properties:"));
|
||||||
assertEquals(DFS_BLOCK_INVALIDATE_LIMIT_KEY, outs.get(1));
|
assertEquals(DFS_BLOCK_INVALIDATE_LIMIT_KEY, outs.get(1));
|
||||||
assertEquals(DFS_BLOCK_PLACEMENT_EC_CLASSNAME_KEY, outs.get(2));
|
assertEquals(DFS_BLOCK_PLACEMENT_EC_CLASSNAME_KEY, outs.get(2));
|
||||||
|
|
Loading…
Reference in New Issue