HDFS-15075. Remove process command timing from BPServiceActor. Contributed by Xiaoqiao He.
(cherry picked from commitcdcb77a2c5
) (cherry picked from commit5da46e197b
)
This commit is contained in:
parent
cedb8fec9d
commit
79187b5a5e
|
@ -398,6 +398,11 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
|
||||||
public static final boolean DFS_DATANODE_PMEM_CACHE_RECOVERY_DEFAULT =
|
public static final boolean DFS_DATANODE_PMEM_CACHE_RECOVERY_DEFAULT =
|
||||||
true;
|
true;
|
||||||
|
|
||||||
|
public static final String DFS_DATANODE_PROCESS_COMMANDS_THRESHOLD_KEY =
|
||||||
|
"dfs.datanode.processcommands.threshold";
|
||||||
|
public static final long DFS_DATANODE_PROCESS_COMMANDS_THRESHOLD_DEFAULT =
|
||||||
|
TimeUnit.SECONDS.toMillis(2);
|
||||||
|
|
||||||
public static final String DFS_NAMENODE_DATANODE_REGISTRATION_IP_HOSTNAME_CHECK_KEY = "dfs.namenode.datanode.registration.ip-hostname-check";
|
public static final String DFS_NAMENODE_DATANODE_REGISTRATION_IP_HOSTNAME_CHECK_KEY = "dfs.namenode.datanode.registration.ip-hostname-check";
|
||||||
public static final boolean DFS_NAMENODE_DATANODE_REGISTRATION_IP_HOSTNAME_CHECK_DEFAULT = true;
|
public static final boolean DFS_NAMENODE_DATANODE_REGISTRATION_IP_HOSTNAME_CHECK_DEFAULT = true;
|
||||||
|
|
||||||
|
|
|
@ -683,15 +683,7 @@ class BPServiceActor implements Runnable {
|
||||||
if (state == HAServiceState.ACTIVE) {
|
if (state == HAServiceState.ACTIVE) {
|
||||||
handleRollingUpgradeStatus(resp);
|
handleRollingUpgradeStatus(resp);
|
||||||
}
|
}
|
||||||
|
|
||||||
long startProcessCommands = monotonicNow();
|
|
||||||
commandProcessingThread.enqueue(resp.getCommands());
|
commandProcessingThread.enqueue(resp.getCommands());
|
||||||
long endProcessCommands = monotonicNow();
|
|
||||||
if (endProcessCommands - startProcessCommands > 2000) {
|
|
||||||
LOG.info("Took " + (endProcessCommands - startProcessCommands)
|
|
||||||
+ "ms to process " + resp.getCommands().length
|
|
||||||
+ " commands from NN");
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (!dn.areIBRDisabledForTests() &&
|
if (!dn.areIBRDisabledForTests() &&
|
||||||
|
@ -1312,6 +1304,7 @@ class BPServiceActor implements Runnable {
|
||||||
*/
|
*/
|
||||||
private boolean processCommand(DatanodeCommand[] cmds) {
|
private boolean processCommand(DatanodeCommand[] cmds) {
|
||||||
if (cmds != null) {
|
if (cmds != null) {
|
||||||
|
long startProcessCommands = monotonicNow();
|
||||||
for (DatanodeCommand cmd : cmds) {
|
for (DatanodeCommand cmd : cmds) {
|
||||||
try {
|
try {
|
||||||
if (!bpos.processCommandFromActor(cmd, actor)) {
|
if (!bpos.processCommandFromActor(cmd, actor)) {
|
||||||
|
@ -1330,6 +1323,14 @@ class BPServiceActor implements Runnable {
|
||||||
LOG.warn("Error processing datanode Command", ioe);
|
LOG.warn("Error processing datanode Command", ioe);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
long processCommandsMs = monotonicNow() - startProcessCommands;
|
||||||
|
if (cmds.length > 0) {
|
||||||
|
dn.getMetrics().addNumProcessedCommands(processCommandsMs);
|
||||||
|
}
|
||||||
|
if (processCommandsMs > dnConf.getProcessCommandsThresholdMs()) {
|
||||||
|
LOG.info("Took {} ms to process {} commands from NN",
|
||||||
|
processCommandsMs, cmds.length);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
|
@ -35,6 +35,8 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_OUTLIERS_REPORT_
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PMEM_CACHE_DIRS_KEY;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PMEM_CACHE_DIRS_KEY;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PMEM_CACHE_RECOVERY_DEFAULT;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PMEM_CACHE_RECOVERY_DEFAULT;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PMEM_CACHE_RECOVERY_KEY;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PMEM_CACHE_RECOVERY_KEY;
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PROCESS_COMMANDS_THRESHOLD_DEFAULT;
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PROCESS_COMMANDS_THRESHOLD_KEY;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_OVERWRITE_DOWNSTREAM_DERIVED_QOP_DEFAULT;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_OVERWRITE_DOWNSTREAM_DERIVED_QOP_DEFAULT;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_OVERWRITE_DOWNSTREAM_DERIVED_QOP_KEY;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_OVERWRITE_DOWNSTREAM_DERIVED_QOP_KEY;
|
||||||
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
|
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
|
||||||
|
@ -122,6 +124,8 @@ public class DNConf {
|
||||||
final long xceiverStopTimeout;
|
final long xceiverStopTimeout;
|
||||||
final long restartReplicaExpiry;
|
final long restartReplicaExpiry;
|
||||||
|
|
||||||
|
private final long processCommandsThresholdMs;
|
||||||
|
|
||||||
final long maxLockedMemory;
|
final long maxLockedMemory;
|
||||||
private final String[] pmemDirs;
|
private final String[] pmemDirs;
|
||||||
|
|
||||||
|
@ -298,6 +302,12 @@ public class DNConf {
|
||||||
this.pmemCacheRecoveryEnabled = getConf().getBoolean(
|
this.pmemCacheRecoveryEnabled = getConf().getBoolean(
|
||||||
DFS_DATANODE_PMEM_CACHE_RECOVERY_KEY,
|
DFS_DATANODE_PMEM_CACHE_RECOVERY_KEY,
|
||||||
DFS_DATANODE_PMEM_CACHE_RECOVERY_DEFAULT);
|
DFS_DATANODE_PMEM_CACHE_RECOVERY_DEFAULT);
|
||||||
|
|
||||||
|
this.processCommandsThresholdMs = getConf().getTimeDuration(
|
||||||
|
DFS_DATANODE_PROCESS_COMMANDS_THRESHOLD_KEY,
|
||||||
|
DFS_DATANODE_PROCESS_COMMANDS_THRESHOLD_DEFAULT,
|
||||||
|
TimeUnit.MILLISECONDS
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
// We get minimumNameNodeVersion via a method so it can be mocked out in tests.
|
// We get minimumNameNodeVersion via a method so it can be mocked out in tests.
|
||||||
|
@ -460,4 +470,8 @@ public class DNConf {
|
||||||
public boolean getPmemCacheRecoveryEnabled() {
|
public boolean getPmemCacheRecoveryEnabled() {
|
||||||
return pmemCacheRecoveryEnabled;
|
return pmemCacheRecoveryEnabled;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public long getProcessCommandsThresholdMs() {
|
||||||
|
return processCommandsThresholdMs;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -165,6 +165,8 @@ public class DataNodeMetrics {
|
||||||
private MutableCounterLong sumOfActorCommandQueueLength;
|
private MutableCounterLong sumOfActorCommandQueueLength;
|
||||||
@Metric("Num of processed commands of all BPServiceActors")
|
@Metric("Num of processed commands of all BPServiceActors")
|
||||||
private MutableCounterLong numProcessedCommands;
|
private MutableCounterLong numProcessedCommands;
|
||||||
|
@Metric("Rate of processed commands of all BPServiceActors")
|
||||||
|
private MutableRate processedCommandsOp;
|
||||||
|
|
||||||
final MetricsRegistry registry = new MetricsRegistry("datanode");
|
final MetricsRegistry registry = new MetricsRegistry("datanode");
|
||||||
final String name;
|
final String name;
|
||||||
|
@ -553,4 +555,12 @@ public class DataNodeMetrics {
|
||||||
public void incrNumProcessedCommands() {
|
public void incrNumProcessedCommands() {
|
||||||
numProcessedCommands.incr();
|
numProcessedCommands.incr();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Add processedCommandsOp metrics.
|
||||||
|
* @param latency milliseconds of process commands
|
||||||
|
*/
|
||||||
|
public void addNumProcessedCommands(long latency) {
|
||||||
|
processedCommandsOp.add(latency);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -2933,6 +2933,15 @@
|
||||||
</description>
|
</description>
|
||||||
</property>
|
</property>
|
||||||
|
|
||||||
|
<property>
|
||||||
|
<name>dfs.datanode.processcommands.threshold</name>
|
||||||
|
<value>2s</value>
|
||||||
|
<description>The threshold in milliseconds at which we will log a slow
|
||||||
|
command processing in BPServiceActor. By default, this parameter is set
|
||||||
|
to 2 seconds.
|
||||||
|
</description>
|
||||||
|
</property>
|
||||||
|
|
||||||
<property>
|
<property>
|
||||||
<name>dfs.client.refresh.read-block-locations.ms</name>
|
<name>dfs.client.refresh.read-block-locations.ms</name>
|
||||||
<value>0</value>
|
<value>0</value>
|
||||||
|
|
|
@ -25,6 +25,7 @@ import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
|
import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
|
||||||
import static org.apache.hadoop.test.MetricsAsserts.getLongCounter;
|
import static org.apache.hadoop.test.MetricsAsserts.getLongCounter;
|
||||||
import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
|
import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
|
@ -1078,6 +1079,9 @@ public class TestBPOfferService {
|
||||||
assertTrue("Process command nums is not expected.",
|
assertTrue("Process command nums is not expected.",
|
||||||
getLongCounter("NumProcessedCommands", mrb) > 0);
|
getLongCounter("NumProcessedCommands", mrb) > 0);
|
||||||
assertEquals(0, getLongCounter("SumOfActorCommandQueueLength", mrb));
|
assertEquals(0, getLongCounter("SumOfActorCommandQueueLength", mrb));
|
||||||
|
// Check new metric result about processedCommandsOp.
|
||||||
|
// One command send back to DataNode here is #FinalizeCommand.
|
||||||
|
assertCounter("ProcessedCommandsOpNumOps", 1L, mrb);
|
||||||
} finally {
|
} finally {
|
||||||
if (cluster != null) {
|
if (cluster != null) {
|
||||||
cluster.shutdown();
|
cluster.shutdown();
|
||||||
|
|
Loading…
Reference in New Issue