HDFS-16287. Support to make dfs.namenode.avoid.read.slow.datanode reconfigurable (#3596)
(cherry picked from commit db89a9411e
)
This commit is contained in:
parent
19b99c1ecc
commit
069426f5eb
|
@ -18,8 +18,6 @@
|
||||||
package org.apache.hadoop.hdfs.server.blockmanagement;
|
package org.apache.hadoop.hdfs.server.blockmanagement;
|
||||||
|
|
||||||
import static org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol.DNA_ERASURE_CODING_RECONSTRUCTION;
|
import static org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol.DNA_ERASURE_CODING_RECONSTRUCTION;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_BLOCKPLACEMENTPOLICY_EXCLUDE_SLOW_NODES_ENABLED_KEY;
|
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_BLOCKPLACEMENTPOLICY_EXCLUDE_SLOW_NODES_ENABLED_DEFAULT;
|
|
||||||
import static org.apache.hadoop.util.Time.monotonicNow;
|
import static org.apache.hadoop.util.Time.monotonicNow;
|
||||||
|
|
||||||
import org.apache.hadoop.thirdparty.com.google.common.collect.Sets;
|
import org.apache.hadoop.thirdparty.com.google.common.collect.Sets;
|
||||||
|
@ -141,7 +139,7 @@ public class DatanodeManager {
|
||||||
private final boolean avoidStaleDataNodesForRead;
|
private final boolean avoidStaleDataNodesForRead;
|
||||||
|
|
||||||
/** Whether or not to avoid using slow DataNodes for reading. */
|
/** Whether or not to avoid using slow DataNodes for reading. */
|
||||||
private final boolean avoidSlowDataNodesForRead;
|
private volatile boolean avoidSlowDataNodesForRead;
|
||||||
|
|
||||||
/** Whether or not to consider lad for reading. */
|
/** Whether or not to consider lad for reading. */
|
||||||
private final boolean readConsiderLoad;
|
private final boolean readConsiderLoad;
|
||||||
|
@ -217,7 +215,6 @@ public class DatanodeManager {
|
||||||
private Daemon slowPeerCollectorDaemon;
|
private Daemon slowPeerCollectorDaemon;
|
||||||
private final long slowPeerCollectionInterval;
|
private final long slowPeerCollectionInterval;
|
||||||
private final int maxSlowPeerReportNodes;
|
private final int maxSlowPeerReportNodes;
|
||||||
private boolean excludeSlowNodesEnabled;
|
|
||||||
|
|
||||||
@Nullable
|
@Nullable
|
||||||
private final SlowDiskTracker slowDiskTracker;
|
private final SlowDiskTracker slowDiskTracker;
|
||||||
|
@ -260,9 +257,6 @@ public class DatanodeManager {
|
||||||
final Timer timer = new Timer();
|
final Timer timer = new Timer();
|
||||||
this.slowPeerTracker = dataNodePeerStatsEnabled ?
|
this.slowPeerTracker = dataNodePeerStatsEnabled ?
|
||||||
new SlowPeerTracker(conf, timer) : null;
|
new SlowPeerTracker(conf, timer) : null;
|
||||||
this.excludeSlowNodesEnabled = conf.getBoolean(
|
|
||||||
DFS_NAMENODE_BLOCKPLACEMENTPOLICY_EXCLUDE_SLOW_NODES_ENABLED_KEY,
|
|
||||||
DFS_NAMENODE_BLOCKPLACEMENTPOLICY_EXCLUDE_SLOW_NODES_ENABLED_DEFAULT);
|
|
||||||
this.maxSlowPeerReportNodes = conf.getInt(
|
this.maxSlowPeerReportNodes = conf.getInt(
|
||||||
DFSConfigKeys.DFS_NAMENODE_MAX_SLOWPEER_COLLECT_NODES_KEY,
|
DFSConfigKeys.DFS_NAMENODE_MAX_SLOWPEER_COLLECT_NODES_KEY,
|
||||||
DFSConfigKeys.DFS_NAMENODE_MAX_SLOWPEER_COLLECT_NODES_DEFAULT);
|
DFSConfigKeys.DFS_NAMENODE_MAX_SLOWPEER_COLLECT_NODES_DEFAULT);
|
||||||
|
@ -270,7 +264,7 @@ public class DatanodeManager {
|
||||||
DFSConfigKeys.DFS_NAMENODE_SLOWPEER_COLLECT_INTERVAL_KEY,
|
DFSConfigKeys.DFS_NAMENODE_SLOWPEER_COLLECT_INTERVAL_KEY,
|
||||||
DFSConfigKeys.DFS_NAMENODE_SLOWPEER_COLLECT_INTERVAL_DEFAULT,
|
DFSConfigKeys.DFS_NAMENODE_SLOWPEER_COLLECT_INTERVAL_DEFAULT,
|
||||||
TimeUnit.MILLISECONDS);
|
TimeUnit.MILLISECONDS);
|
||||||
if (slowPeerTracker != null && excludeSlowNodesEnabled) {
|
if (slowPeerTracker != null) {
|
||||||
startSlowPeerCollector();
|
startSlowPeerCollector();
|
||||||
}
|
}
|
||||||
this.slowDiskTracker = dataNodeDiskStatsEnabled ?
|
this.slowDiskTracker = dataNodeDiskStatsEnabled ?
|
||||||
|
@ -511,7 +505,16 @@ public class DatanodeManager {
|
||||||
private boolean isSlowNode(String dnUuid) {
|
private boolean isSlowNode(String dnUuid) {
|
||||||
return avoidSlowDataNodesForRead && slowNodesUuidSet.contains(dnUuid);
|
return avoidSlowDataNodesForRead && slowNodesUuidSet.contains(dnUuid);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void setAvoidSlowDataNodesForReadEnabled(boolean enable) {
|
||||||
|
this.avoidSlowDataNodesForRead = enable;
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
public boolean getEnableAvoidSlowDataNodesForRead() {
|
||||||
|
return this.avoidSlowDataNodesForRead;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Sort the non-striped located blocks by the distance to the target host.
|
* Sort the non-striped located blocks by the distance to the target host.
|
||||||
*
|
*
|
||||||
|
|
|
@ -186,6 +186,8 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_GC_TIME_MONITOR_
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_GC_TIME_MONITOR_ENABLE_DEFAULT;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_GC_TIME_MONITOR_ENABLE_DEFAULT;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_PLACEMENT_EC_CLASSNAME_KEY;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_PLACEMENT_EC_CLASSNAME_KEY;
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_AVOID_SLOW_DATANODE_FOR_READ_KEY;
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_AVOID_SLOW_DATANODE_FOR_READ_DEFAULT;
|
||||||
|
|
||||||
import static org.apache.hadoop.util.ExitUtil.terminate;
|
import static org.apache.hadoop.util.ExitUtil.terminate;
|
||||||
import static org.apache.hadoop.util.ToolRunner.confirmPrompt;
|
import static org.apache.hadoop.util.ToolRunner.confirmPrompt;
|
||||||
|
@ -327,7 +329,8 @@ public class NameNode extends ReconfigurableBase implements
|
||||||
DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION,
|
DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION,
|
||||||
DFS_BLOCK_REPLICATOR_CLASSNAME_KEY,
|
DFS_BLOCK_REPLICATOR_CLASSNAME_KEY,
|
||||||
DFS_BLOCK_PLACEMENT_EC_CLASSNAME_KEY,
|
DFS_BLOCK_PLACEMENT_EC_CLASSNAME_KEY,
|
||||||
DFS_IMAGE_PARALLEL_LOAD_KEY));
|
DFS_IMAGE_PARALLEL_LOAD_KEY,
|
||||||
|
DFS_NAMENODE_AVOID_SLOW_DATANODE_FOR_READ_KEY));
|
||||||
|
|
||||||
private static final String USAGE = "Usage: hdfs namenode ["
|
private static final String USAGE = "Usage: hdfs namenode ["
|
||||||
+ StartupOption.BACKUP.getName() + "] | \n\t["
|
+ StartupOption.BACKUP.getName() + "] | \n\t["
|
||||||
|
@ -2193,6 +2196,8 @@ public class NameNode extends ReconfigurableBase implements
|
||||||
return newVal;
|
return newVal;
|
||||||
} else if (property.equals(DFS_IMAGE_PARALLEL_LOAD_KEY)) {
|
} else if (property.equals(DFS_IMAGE_PARALLEL_LOAD_KEY)) {
|
||||||
return reconfigureParallelLoad(newVal);
|
return reconfigureParallelLoad(newVal);
|
||||||
|
} else if (property.equals(DFS_NAMENODE_AVOID_SLOW_DATANODE_FOR_READ_KEY)) {
|
||||||
|
return reconfigureSlowNodesParameters(datanodeManager, property, newVal);
|
||||||
} else {
|
} else {
|
||||||
throw new ReconfigurationException(property, newVal, getConf().get(
|
throw new ReconfigurationException(property, newVal, getConf().get(
|
||||||
property));
|
property));
|
||||||
|
@ -2379,6 +2384,27 @@ public class NameNode extends ReconfigurableBase implements
|
||||||
return Boolean.toString(enableParallelLoad);
|
return Boolean.toString(enableParallelLoad);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
String reconfigureSlowNodesParameters(final DatanodeManager datanodeManager,
|
||||||
|
final String property, final String newVal) throws ReconfigurationException {
|
||||||
|
namesystem.writeLock();
|
||||||
|
boolean enable;
|
||||||
|
try {
|
||||||
|
if (newVal == null) {
|
||||||
|
enable = DFS_NAMENODE_AVOID_SLOW_DATANODE_FOR_READ_DEFAULT;
|
||||||
|
} else {
|
||||||
|
enable = Boolean.parseBoolean(newVal);
|
||||||
|
}
|
||||||
|
datanodeManager.setAvoidSlowDataNodesForReadEnabled(enable);
|
||||||
|
LOG.info("RECONFIGURE* changed {} to {}", property, newVal);
|
||||||
|
return Boolean.toString(enable);
|
||||||
|
} catch (IllegalArgumentException e) {
|
||||||
|
throw new ReconfigurationException(property, newVal, getConf().get(
|
||||||
|
property), e);
|
||||||
|
} finally {
|
||||||
|
namesystem.writeUnlock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Override // ReconfigurableBase
|
@Override // ReconfigurableBase
|
||||||
protected Configuration getNewConf() {
|
protected Configuration getNewConf() {
|
||||||
return new HdfsConfiguration();
|
return new HdfsConfiguration();
|
||||||
|
|
|
@ -51,6 +51,7 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHEC
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MODE_KEY;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MODE_KEY;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MODE_DEFAULT;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MODE_DEFAULT;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_INVALIDATE_LIMIT_KEY;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_INVALIDATE_LIMIT_KEY;
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_AVOID_SLOW_DATANODE_FOR_READ_KEY;
|
||||||
import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_BACKOFF_ENABLE_DEFAULT;
|
import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_BACKOFF_ENABLE_DEFAULT;
|
||||||
|
|
||||||
public class TestNameNodeReconfigure {
|
public class TestNameNodeReconfigure {
|
||||||
|
@ -394,6 +395,23 @@ public class TestNameNodeReconfigure {
|
||||||
assertEquals(true, FSImageFormatProtobuf.getEnableParallelLoad());
|
assertEquals(true, FSImageFormatProtobuf.getEnableParallelLoad());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testEnableSlowNodesParametersAfterReconfigured()
|
||||||
|
throws ReconfigurationException {
|
||||||
|
final NameNode nameNode = cluster.getNameNode();
|
||||||
|
final DatanodeManager datanodeManager = nameNode.namesystem
|
||||||
|
.getBlockManager().getDatanodeManager();
|
||||||
|
|
||||||
|
// By default, avoidSlowDataNodesForRead is false.
|
||||||
|
assertEquals(false, datanodeManager.getEnableAvoidSlowDataNodesForRead());
|
||||||
|
|
||||||
|
nameNode.reconfigureProperty(
|
||||||
|
DFS_NAMENODE_AVOID_SLOW_DATANODE_FOR_READ_KEY, Boolean.toString(true));
|
||||||
|
|
||||||
|
// After reconfigured, avoidSlowDataNodesForRead is true.
|
||||||
|
assertEquals(true, datanodeManager.getEnableAvoidSlowDataNodesForRead());
|
||||||
|
}
|
||||||
|
|
||||||
@After
|
@After
|
||||||
public void shutDown() throws IOException {
|
public void shutDown() throws IOException {
|
||||||
if (cluster != null) {
|
if (cluster != null) {
|
||||||
|
|
|
@ -25,6 +25,7 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_IMAGE_PARALLEL_LOAD_KEY;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_PLACEMENT_EC_CLASSNAME_KEY;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_PLACEMENT_EC_CLASSNAME_KEY;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY;
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_AVOID_SLOW_DATANODE_FOR_READ_KEY;
|
||||||
|
|
||||||
import java.util.function.Supplier;
|
import java.util.function.Supplier;
|
||||||
import org.apache.hadoop.thirdparty.com.google.common.collect.Lists;
|
import org.apache.hadoop.thirdparty.com.google.common.collect.Lists;
|
||||||
|
@ -422,12 +423,13 @@ public class TestDFSAdmin {
|
||||||
final List<String> outs = Lists.newArrayList();
|
final List<String> outs = Lists.newArrayList();
|
||||||
final List<String> errs = Lists.newArrayList();
|
final List<String> errs = Lists.newArrayList();
|
||||||
getReconfigurableProperties("namenode", address, outs, errs);
|
getReconfigurableProperties("namenode", address, outs, errs);
|
||||||
assertEquals(13, outs.size());
|
assertEquals(14, outs.size());
|
||||||
assertEquals(DFS_BLOCK_PLACEMENT_EC_CLASSNAME_KEY, outs.get(1));
|
assertEquals(DFS_BLOCK_PLACEMENT_EC_CLASSNAME_KEY, outs.get(1));
|
||||||
assertEquals(DFS_BLOCK_REPLICATOR_CLASSNAME_KEY, outs.get(2));
|
assertEquals(DFS_BLOCK_REPLICATOR_CLASSNAME_KEY, outs.get(2));
|
||||||
assertEquals(DFS_HEARTBEAT_INTERVAL_KEY, outs.get(3));
|
assertEquals(DFS_HEARTBEAT_INTERVAL_KEY, outs.get(3));
|
||||||
assertEquals(DFS_IMAGE_PARALLEL_LOAD_KEY, outs.get(4));
|
assertEquals(DFS_IMAGE_PARALLEL_LOAD_KEY, outs.get(4));
|
||||||
assertEquals(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, outs.get(5));
|
assertEquals(DFS_NAMENODE_AVOID_SLOW_DATANODE_FOR_READ_KEY, outs.get(5));
|
||||||
|
assertEquals(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, outs.get(6));
|
||||||
assertEquals(errs.size(), 0);
|
assertEquals(errs.size(), 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue