HDFS-15261. RBF: Add Block Related Metrics. Contributed by Ayush Saxena.

This commit is contained in:
Ayush Saxena 2020-04-09 23:36:53 +05:30
parent 23481ad378
commit 275c478330
12 changed files with 396 additions and 37 deletions

View File

@ -276,4 +276,45 @@ public interface FederationMBean {
*/
@Deprecated
boolean isSecurityEnabled();
/**
 * Get the number of corrupt files.
 *
 * @return the total number of corrupt files.
 */
int getCorruptFilesCount();
/**
 * Get the number of blocks scheduled for replication.
 *
 * @return the number of blocks scheduled for replication.
 */
long getScheduledReplicationBlocks();
/**
 * Get the total number of missing blocks on the cluster that have a
 * replication factor of 1.
 *
 * @return the total number of missing blocks on the cluster with
 * replication factor 1.
 */
long getNumberOfMissingBlocksWithReplicationFactorOne();
/**
 * Get the total number of replicated low redundancy blocks on the cluster
 * with the highest risk of loss.
 *
 * @return the total number of replicated low redundancy blocks on the
 * cluster with the highest risk of loss.
 */
long getHighestPriorityLowRedundancyReplicatedBlocks();
/**
 * Get the total number of erasure coded low redundancy blocks on the
 * cluster with the highest risk of loss.
 *
 * @return the total number of erasure coded low redundancy blocks on the
 * cluster with the highest risk of loss.
 */
long getHighestPriorityLowRedundancyECBlocks();
}

View File

@ -366,21 +366,46 @@ public class NamenodeBeanMetrics
/**
 * Reports the number of blocks scheduled for replication as exposed by the
 * RBF metrics.
 *
 * @return the value returned by the RBF metrics, or 0 when the lookup
 * fails with an {@link IOException}.
 */
@Override
public long getScheduledReplicationBlocks() {
  try {
    return getRBFMetrics().getScheduledReplicationBlocks();
  } catch (IOException e) {
    // Best effort: the failure is only logged and a default is returned.
    // The "{}" placeholder is required, otherwise the message argument is
    // silently discarded by the logger.
    LOG.debug("Failed to get number of scheduled replication blocks. {}",
        e.getMessage());
  }
  return 0;
}
/**
 * Reports the number of missing blocks with replication factor one as
 * exposed by the RBF metrics.
 *
 * @return the value returned by the RBF metrics, or 0 when the lookup
 * fails with an {@link IOException}.
 */
@Override
public long getNumberOfMissingBlocksWithReplicationFactorOne() {
  try {
    return getRBFMetrics().getNumberOfMissingBlocksWithReplicationFactorOne();
  } catch (IOException e) {
    // Best effort: log the reason and fall back to the default below.
    // The "{}" placeholder makes the logger actually emit the message.
    LOG.debug("Failed to get number of missing blocks with replication "
        + "factor one. {}", e.getMessage());
  }
  return 0;
}
/**
 * Reports the number of highest priority low redundancy replicated blocks
 * as exposed by the RBF metrics.
 *
 * @return the value returned by the RBF metrics, or 0 when the lookup
 * fails with an {@link IOException}.
 */
@Override
public long getHighestPriorityLowRedundancyReplicatedBlocks() {
  try {
    return getRBFMetrics().getHighestPriorityLowRedundancyReplicatedBlocks();
  } catch (IOException e) {
    // Best effort: log the reason and fall back to the default below.
    // The "{}" placeholder makes the logger actually emit the message.
    LOG.debug("Failed to get number of highest priority low redundancy "
        + "replicated blocks. {}", e.getMessage());
  }
  return 0;
}
/**
 * Reports the number of highest priority low redundancy erasure coded
 * blocks as exposed by the RBF metrics.
 *
 * @return the value returned by the RBF metrics, or 0 when the lookup
 * fails with an {@link IOException}.
 */
@Override
public long getHighestPriorityLowRedundancyECBlocks() {
  try {
    return getRBFMetrics().getHighestPriorityLowRedundancyECBlocks();
  } catch (IOException e) {
    // Best effort: log the reason and fall back to the default below.
    // The "{}" placeholder makes the logger actually emit the message.
    LOG.debug("Failed to get number of highest priority low redundancy EC "
        + "blocks. {}", e.getMessage());
  }
  return 0;
}
@ -391,6 +416,11 @@ public class NamenodeBeanMetrics
/**
 * Reports the number of corrupt files as exposed by the RBF metrics.
 *
 * @return the value returned by the RBF metrics, or 0 when the lookup
 * fails with an {@link IOException}.
 */
@Override
public int getCorruptFilesCount() {
  try {
    return getRBFMetrics().getCorruptFilesCount();
  } catch (IOException e) {
    // Best effort: log the reason and fall back to the default below.
    // The "{}" placeholder makes the logger actually emit the message.
    LOG.debug("Failed to get number of corrupt files. {}", e.getMessage());
  }
  return 0;
}

View File

@ -654,6 +654,35 @@ public class RBFMetrics implements RouterMBean, FederationMBean {
return UserGroupInformation.isSecurityEnabled();
}
@Override
public int getCorruptFilesCount() {
  // Delegate to the int aggregation helper, reading the corrupt file
  // count from each membership stats record it visits.
  final int corruptFiles =
      getNameserviceAggregatedInt(MembershipStats::getCorruptFilesCount);
  return corruptFiles;
}
@Override
public long getScheduledReplicationBlocks() {
  // Delegate to the long aggregation helper, reading the scheduled
  // replication block count from each membership stats record.
  final long scheduled = getNameserviceAggregatedLong(
      MembershipStats::getScheduledReplicationBlocks);
  return scheduled;
}
@Override
public long getNumberOfMissingBlocksWithReplicationFactorOne() {
  // Delegate to the long aggregation helper, reading the count of missing
  // blocks with replication factor one from each membership stats record.
  final long missing = getNameserviceAggregatedLong(
      MembershipStats::getNumberOfMissingBlocksWithReplicationFactorOne);
  return missing;
}
@Override
public long getHighestPriorityLowRedundancyReplicatedBlocks() {
  // Delegate to the long aggregation helper, reading the highest priority
  // low redundancy replicated block count from each stats record.
  final long replicated = getNameserviceAggregatedLong(
      MembershipStats::getHighestPriorityLowRedundancyReplicatedBlocks);
  return replicated;
}
@Override
public long getHighestPriorityLowRedundancyECBlocks() {
  // Delegate to the long aggregation helper, reading the highest priority
  // low redundancy EC block count from each stats record.
  final long erasureCoded = getNameserviceAggregatedLong(
      MembershipStats::getHighestPriorityLowRedundancyECBlocks);
  return erasureCoded;
}
@Override
public String getSafemode() {
if (this.router.isRouterState(RouterServiceState.SAFEMODE)) {

View File

@ -295,6 +295,15 @@ public class MembershipNamenodeResolver
report.getNumInMaintenanceDeadDataNodes());
stats.setNumOfEnteringMaintenanceDataNodes(
report.getNumEnteringMaintenanceDataNodes());
stats.setCorruptFilesCount(report.getCorruptFilesCount());
stats.setScheduledReplicationBlocks(
report.getScheduledReplicationBlocks());
stats.setNumberOfMissingBlocksWithReplicationFactorOne(
report.getNumberOfMissingBlocksWithReplicationFactorOne());
stats.setHighestPriorityLowRedundancyReplicatedBlocks(
report.getHighestPriorityLowRedundancyReplicatedBlocks());
stats.setHighestPriorityLowRedundancyECBlocks(
report.getHighestPriorityLowRedundancyECBlocks());
record.setStats(stats);
}

View File

@ -70,6 +70,11 @@ public class NamenodeStatusReport {
private long numOfBlocksPendingDeletion = -1;
private long totalSpace = -1;
private long providedSpace = -1;
private int corruptFilesCount = -1;
private long scheduledReplicationBlocks = -1;
private long numberOfMissingBlocksWithReplicationFactorOne = -1;
private long highestPriorityLowRedundancyReplicatedBlocks = -1;
private long highestPriorityLowRedundancyECBlocks = -1;
/** If the fields are valid. */
private boolean registrationValid = false;
@ -251,11 +256,12 @@ public class NamenodeStatusReport {
* @param numInMaintenanceLive Number of in maintenance live nodes.
* @param numInMaintenanceDead Number of in maintenance dead nodes.
* @param numEnteringMaintenance Number of entering maintenance nodes.
* @param numScheduledReplicationBlocks Number of scheduled rep. blocks.
*/
public void setDatanodeInfo(int numLive, int numDead, int numStale,
int numDecom, int numLiveDecom, int numDeadDecom,
int numInMaintenanceLive, int numInMaintenanceDead,
int numEnteringMaintenance) {
int numEnteringMaintenance, long numScheduledReplicationBlocks) {
this.liveDatanodes = numLive;
this.deadDatanodes = numDead;
this.staleDatanodes = numStale;
@ -266,6 +272,7 @@ public class NamenodeStatusReport {
this.inMaintenanceDeadDataNodes = numInMaintenanceDead;
this.enteringMaintenanceDataNodes = numEnteringMaintenance;
this.statsValid = true;
this.scheduledReplicationBlocks = numScheduledReplicationBlocks;
}
/**
@ -378,6 +385,81 @@ public class NamenodeStatusReport {
this.providedSpace = providedSpace;
}
/**
 * Store the block-related counters of the namenode in this report.
 *
 * @param numCorruptFiles number of corrupt files.
 * @param numOfMissingBlocksWithReplicationFactorOne number of missing
 *          blocks with replication factor one.
 * @param highestPriorityLowRedundancyRepBlocks number of highest priority
 *          low redundancy replicated blocks.
 * @param highPriorityLowRedundancyECBlocks number of highest priority
 *          low redundancy EC blocks.
 */
public void setNamenodeInfo(int numCorruptFiles,
    long numOfMissingBlocksWithReplicationFactorOne,
    long highestPriorityLowRedundancyRepBlocks,
    long highPriorityLowRedundancyECBlocks) {
  // Parameter names differ from the field names, so no qualifier is needed.
  corruptFilesCount = numCorruptFiles;
  numberOfMissingBlocksWithReplicationFactorOne =
      numOfMissingBlocksWithReplicationFactorOne;
  highestPriorityLowRedundancyReplicatedBlocks =
      highestPriorityLowRedundancyRepBlocks;
  highestPriorityLowRedundancyECBlocks = highPriorityLowRedundancyECBlocks;
}
/**
 * Get the corrupt file count stored in this report.
 *
 * @return the total number of corrupt files.
 */
public int getCorruptFilesCount() {
  return corruptFilesCount;
}
/**
 * Get the number of blocks scheduled for replication stored in this report.
 *
 * @return the number of blocks scheduled for replication.
 */
public long getScheduledReplicationBlocks() {
  return scheduledReplicationBlocks;
}
/**
 * Get the number of missing blocks with replication factor 1 stored in
 * this report.
 *
 * @return the total number of missing blocks on the cluster with
 * replication factor 1.
 */
public long getNumberOfMissingBlocksWithReplicationFactorOne() {
  return numberOfMissingBlocksWithReplicationFactorOne;
}
/**
 * Get the number of replicated low redundancy blocks with the highest
 * risk of loss stored in this report.
 *
 * @return the total number of replicated low redundancy blocks on the
 * cluster with the highest risk of loss.
 */
public long getHighestPriorityLowRedundancyReplicatedBlocks() {
  return highestPriorityLowRedundancyReplicatedBlocks;
}
/**
 * Get the number of erasure coded low redundancy blocks with the highest
 * risk of loss stored in this report.
 *
 * @return the total number of erasure coded low redundancy blocks on the
 * cluster with the highest risk of loss.
 */
public long getHighestPriorityLowRedundancyECBlocks() {
  return highestPriorityLowRedundancyECBlocks;
}
/**
* Get the number of blocks.
*

View File

@ -42,6 +42,7 @@ import org.apache.hadoop.hdfs.tools.DFSHAAdmin;
import org.apache.hadoop.hdfs.tools.NNHAServiceTarget;
import org.apache.hadoop.hdfs.web.URLConnectionFactory;
import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -344,44 +345,82 @@ public class NamenodeHeartbeatService extends PeriodicService {
String address, NamenodeStatusReport report) {
try {
// TODO part of this should be moved to its own utility
String query = "Hadoop:service=NameNode,name=FSNamesystem*";
JSONArray aux = FederationUtil.getJmx(
query, address, connectionFactory, scheme);
if (aux != null) {
for (int i = 0; i < aux.length(); i++) {
JSONObject jsonObject = aux.getJSONObject(i);
String name = jsonObject.getString("name");
if (name.equals("Hadoop:service=NameNode,name=FSNamesystemState")) {
report.setDatanodeInfo(
jsonObject.getInt("NumLiveDataNodes"),
jsonObject.getInt("NumDeadDataNodes"),
jsonObject.getInt("NumStaleDataNodes"),
jsonObject.getInt("NumDecommissioningDataNodes"),
jsonObject.getInt("NumDecomLiveDataNodes"),
jsonObject.getInt("NumDecomDeadDataNodes"),
jsonObject.optInt("NumInMaintenanceLiveDataNodes"),
jsonObject.optInt("NumInMaintenanceDeadDataNodes"),
jsonObject.optInt("NumEnteringMaintenanceDataNodes"));
} else if (name.equals(
"Hadoop:service=NameNode,name=FSNamesystem")) {
report.setNamesystemInfo(
jsonObject.getLong("CapacityRemaining"),
jsonObject.getLong("CapacityTotal"),
jsonObject.getLong("FilesTotal"),
jsonObject.getLong("BlocksTotal"),
jsonObject.getLong("MissingBlocks"),
jsonObject.getLong("PendingReplicationBlocks"),
jsonObject.getLong("UnderReplicatedBlocks"),
jsonObject.getLong("PendingDeletionBlocks"),
jsonObject.optLong("ProvidedCapacityTotal"));
}
}
}
getFsNamesystemMetrics(address, report);
getNamenodeInfoMetrics(address, report);
} catch (Exception e) {
LOG.error("Cannot get stat from {} using JMX", getNamenodeDesc(), e);
}
}
/**
 * Queries the NameNodeInfo JMX bean of the namenode and copies its block
 * counters into the report. Nothing is written when the query returns no
 * entries or the first entry is not the NameNodeInfo bean.
 *
 * @param address Web interface of the Namenode to monitor.
 * @param report Namenode status report to update with JMX data.
 * @throws JSONException propagated from reading the first JMX entry or
 *           its "name" attribute.
 */
private void getNamenodeInfoMetrics(String address,
    NamenodeStatusReport report) throws JSONException {
  final String beanName = "Hadoop:service=NameNode,name=NameNodeInfo";
  JSONArray beans =
      FederationUtil.getJmx(beanName, address, connectionFactory, scheme);
  if (beans == null || beans.length() <= 0) {
    return;
  }

  // Only the first returned entry is inspected.
  JSONObject info = beans.getJSONObject(0);
  if (!info.getString("name").equals(beanName)) {
    return;
  }

  int corruptFiles = info.optInt("CorruptFilesCount");
  long missingWithReplOne =
      info.optLong("NumberOfMissingBlocksWithReplicationFactorOne");
  long lowRedundancyReplicated =
      info.optLong("HighestPriorityLowRedundancyReplicatedBlocks");
  long lowRedundancyEC =
      info.optLong("HighestPriorityLowRedundancyECBlocks");
  report.setNamenodeInfo(corruptFiles, missingWithReplOne,
      lowRedundancyReplicated, lowRedundancyEC);
}
/**
 * Queries the FSNamesystem* JMX beans of the namenode and copies their
 * values into the report: the FSNamesystemState bean feeds the datanode
 * information and the FSNamesystem bean feeds the namesystem information.
 * Beans with any other name are ignored.
 *
 * @param address Web interface of the Namenode to monitor.
 * @param report Namenode status report to update with JMX data.
 * @throws JSONException propagated from reading a JMX entry or one of its
 *           mandatory attributes.
 */
private void getFsNamesystemMetrics(String address,
    NamenodeStatusReport report) throws JSONException {
  JSONArray beans = FederationUtil.getJmx(
      "Hadoop:service=NameNode,name=FSNamesystem*",
      address, connectionFactory, scheme);
  if (beans == null) {
    return;
  }

  for (int idx = 0; idx < beans.length(); idx++) {
    JSONObject bean = beans.getJSONObject(idx);
    String beanName = bean.getString("name");
    switch (beanName) {
    case "Hadoop:service=NameNode,name=FSNamesystemState":
      // Attributes read with opt* are optional; the others are mandatory.
      report.setDatanodeInfo(
          bean.getInt("NumLiveDataNodes"),
          bean.getInt("NumDeadDataNodes"),
          bean.getInt("NumStaleDataNodes"),
          bean.getInt("NumDecommissioningDataNodes"),
          bean.getInt("NumDecomLiveDataNodes"),
          bean.getInt("NumDecomDeadDataNodes"),
          bean.optInt("NumInMaintenanceLiveDataNodes"),
          bean.optInt("NumInMaintenanceDeadDataNodes"),
          bean.optInt("NumEnteringMaintenanceDataNodes"),
          bean.optLong("ScheduledReplicationBlocks"));
      break;
    case "Hadoop:service=NameNode,name=FSNamesystem":
      report.setNamesystemInfo(
          bean.getLong("CapacityRemaining"),
          bean.getLong("CapacityTotal"),
          bean.getLong("FilesTotal"),
          bean.getLong("BlocksTotal"),
          bean.getLong("MissingBlocks"),
          bean.getLong("PendingReplicationBlocks"),
          bean.getLong("UnderReplicatedBlocks"),
          bean.getLong("PendingDeletionBlocks"),
          bean.optLong("ProvidedCapacityTotal"));
      break;
    default:
      break;
    }
  }
}
@Override
protected void serviceStop() throws Exception {
LOG.info("Stopping NamenodeHeartbeat service for, NS {} NN {} ",

View File

@ -109,6 +109,30 @@ public abstract class MembershipStats extends BaseRecord {
public abstract int getNumOfEnteringMaintenanceDataNodes();
/** Set the number of corrupt files held by these stats. */
public abstract void setCorruptFilesCount(int num);
/** Get the number of corrupt files held by these stats. */
public abstract int getCorruptFilesCount();
/** Set the number of blocks scheduled for replication. */
public abstract void setScheduledReplicationBlocks(long blocks);
/** Get the number of blocks scheduled for replication. */
public abstract long getScheduledReplicationBlocks();
/** Set the number of missing blocks with replication factor one. */
public abstract void setNumberOfMissingBlocksWithReplicationFactorOne(
long blocks);
/** Get the number of missing blocks with replication factor one. */
public abstract long getNumberOfMissingBlocksWithReplicationFactorOne();
/** Set the number of highest priority low redundancy replicated blocks. */
public abstract void setHighestPriorityLowRedundancyReplicatedBlocks(
long blocks);
/** Get the number of highest priority low redundancy replicated blocks. */
public abstract long getHighestPriorityLowRedundancyReplicatedBlocks();
/** Set the number of highest priority low redundancy EC blocks. */
public abstract void setHighestPriorityLowRedundancyECBlocks(
long blocks);
/** Get the number of highest priority low redundancy EC blocks. */
public abstract long getHighestPriorityLowRedundancyECBlocks();
@Override
public SortedMap<String, String> getPrimaryKeys() {
// This record is not stored directly, no key needed

View File

@ -241,4 +241,60 @@ public class MembershipStatsPBImpl extends MembershipStats
return this.translator.getProtoOrBuilder()
.getNumOfEnteringMaintenanceDataNodes();
}
@Override
public void setCorruptFilesCount(int num) {
  // Write the value straight into the protobuf builder.
  translator.getBuilder().setCorruptFilesCount(num);
}
@Override
public int getCorruptFilesCount() {
  // Read the value from the current proto or builder.
  return translator.getProtoOrBuilder().getCorruptFilesCount();
}
@Override
public void setScheduledReplicationBlocks(long blocks) {
  // Write the value straight into the protobuf builder.
  translator.getBuilder().setScheduledReplicationBlocks(blocks);
}
@Override
public long getScheduledReplicationBlocks() {
  // Read the value from the current proto or builder.
  return translator.getProtoOrBuilder().getScheduledReplicationBlocks();
}
@Override
public void setNumberOfMissingBlocksWithReplicationFactorOne(long blocks) {
  // Write the value straight into the protobuf builder.
  translator.getBuilder()
      .setNumberOfMissingBlocksWithReplicationFactorOne(blocks);
}
@Override
public long getNumberOfMissingBlocksWithReplicationFactorOne() {
  // Read the value from the current proto or builder.
  return translator.getProtoOrBuilder()
      .getNumberOfMissingBlocksWithReplicationFactorOne();
}
@Override
public void setHighestPriorityLowRedundancyReplicatedBlocks(long blocks) {
  // Write the value straight into the protobuf builder.
  translator.getBuilder()
      .setHighestPriorityLowRedundancyReplicatedBlocks(blocks);
}
@Override
public long getHighestPriorityLowRedundancyReplicatedBlocks() {
  // Read the value from the current proto or builder.
  return translator.getProtoOrBuilder()
      .getHighestPriorityLowRedundancyReplicatedBlocks();
}
@Override
public void setHighestPriorityLowRedundancyECBlocks(long blocks) {
  // Write the value straight into the protobuf builder.
  translator.getBuilder().setHighestPriorityLowRedundancyECBlocks(blocks);
}
@Override
public long getHighestPriorityLowRedundancyECBlocks() {
  // Read the value from the current proto or builder.
  return translator.getProtoOrBuilder()
      .getHighestPriorityLowRedundancyECBlocks();
}
}

View File

@ -49,6 +49,11 @@ message NamenodeMembershipStatsRecordProto {
optional uint32 numOfInMaintenanceLiveDataNodes = 26;
optional uint32 numOfInMaintenanceDeadDataNodes = 27;
optional uint32 numOfEnteringMaintenanceDataNodes = 28;
optional uint32 corruptFilesCount = 29;
optional uint64 scheduledReplicationBlocks = 30;
optional uint64 numberOfMissingBlocksWithReplicationFactorOne = 31;
optional uint64 highestPriorityLowRedundancyReplicatedBlocks = 32;
optional uint64 HighestPriorityLowRedundancyECBlocks = 33;
}
message NamenodeMembershipRecordProto {

View File

@ -286,6 +286,11 @@ public class TestRBFMetrics extends TestMetricsBase {
long numInMaintenanceLive = 0;
long numInMaintenanceDead = 0;
long numEnteringMaintenance = 0;
int numCorruptsFilesCount = 0;
long scheduledReplicationBlocks = 0;
long numberOfMissingBlocksWithReplicationFactorOne = 0;
long highestPriorityLowRedundancyReplicatedBlocks = 0;
long highestPriorityLowRedundancyECBlocks = 0;
long numFiles = 0;
for (MembershipState mock : getActiveMemberships()) {
MembershipStats stats = mock.getStats();
@ -299,6 +304,14 @@ public class TestRBFMetrics extends TestMetricsBase {
numInMaintenanceLive += stats.getNumOfInMaintenanceLiveDataNodes();
numInMaintenanceDead += stats.getNumOfInMaintenanceLiveDataNodes();
numEnteringMaintenance += stats.getNumOfEnteringMaintenanceDataNodes();
numCorruptsFilesCount += stats.getCorruptFilesCount();
scheduledReplicationBlocks += stats.getScheduledReplicationBlocks();
numberOfMissingBlocksWithReplicationFactorOne +=
stats.getNumberOfMissingBlocksWithReplicationFactorOne();
highestPriorityLowRedundancyReplicatedBlocks +=
stats.getHighestPriorityLowRedundancyReplicatedBlocks();
highestPriorityLowRedundancyECBlocks +=
stats.getHighestPriorityLowRedundancyECBlocks();
}
assertEquals(numBlocks, bean.getNumBlocks());
@ -316,6 +329,15 @@ public class TestRBFMetrics extends TestMetricsBase {
assertEquals(getActiveMemberships().size() + getStandbyMemberships().size(),
bean.getNumNamenodes());
assertEquals(getNameservices().size(), bean.getNumNameservices());
assertEquals(numCorruptsFilesCount, bean.getCorruptFilesCount());
assertEquals(scheduledReplicationBlocks,
bean.getScheduledReplicationBlocks());
assertEquals(numberOfMissingBlocksWithReplicationFactorOne,
bean.getNumberOfMissingBlocksWithReplicationFactorOne());
assertEquals(highestPriorityLowRedundancyReplicatedBlocks,
bean.getHighestPriorityLowRedundancyReplicatedBlocks());
assertEquals(highestPriorityLowRedundancyECBlocks,
bean.getHighestPriorityLowRedundancyECBlocks());
}
private void validateClusterStatsRouterBean(RouterMBean bean) {

View File

@ -320,10 +320,10 @@ public class TestRouterNamenodeMonitoring {
heartbeatService.getNamenodeStatusReport();
}
if (HttpConfig.Policy.HTTPS_ONLY.name().equals(scheme)) {
assertEquals(1, appender.countLinesWithMessage("JMX URL: https://"));
assertEquals(2, appender.countLinesWithMessage("JMX URL: https://"));
assertEquals(0, appender.countLinesWithMessage("JMX URL: http://"));
} else {
assertEquals(1, appender.countLinesWithMessage("JMX URL: http://"));
assertEquals(2, appender.countLinesWithMessage("JMX URL: http://"));
assertEquals(0, appender.countLinesWithMessage("JMX URL: https://"));
}
}

View File

@ -57,6 +57,11 @@ public class TestMembershipState {
private static final int NUM_MAIN_DEAD = 303;
private static final int NUM_ENTER_MAIN = 144;
private static final long NUM_BLOCK_MISSING = 1000;
private static final int CORRUPT_FILES_COUNT = 123;
private static final long SCHEDULED_REPLICATION_BLOCKS = 112;
private static final long MISSING_BLOCK_WITH_REPLICATION_ONE = 221;
private static final long HIGHEST_PRIORITY_LOW_REDUNDANCY_REPL_BLOCK = 212;
private static final long HIGHEST_PRIORITY_LOW_REDUNDANCY_EC_BLOCK = 122;
private static final long TOTAL_SPACE = 1100;
private static final long AVAILABLE_SPACE = 1200;
@ -88,6 +93,14 @@ public class TestMembershipState {
stats.setNumOfBlocksMissing(NUM_BLOCK_MISSING);
stats.setTotalSpace(TOTAL_SPACE);
stats.setAvailableSpace(AVAILABLE_SPACE);
stats.setCorruptFilesCount(CORRUPT_FILES_COUNT);
stats.setScheduledReplicationBlocks(SCHEDULED_REPLICATION_BLOCKS);
stats.setNumberOfMissingBlocksWithReplicationFactorOne(
MISSING_BLOCK_WITH_REPLICATION_ONE);
stats.setHighestPriorityLowRedundancyReplicatedBlocks(
HIGHEST_PRIORITY_LOW_REDUNDANCY_REPL_BLOCK);
stats.setHighestPriorityLowRedundancyECBlocks(
HIGHEST_PRIORITY_LOW_REDUNDANCY_EC_BLOCK);
record.setStats(stats);
return record;
}
@ -120,6 +133,15 @@ public class TestMembershipState {
assertEquals(NUM_ENTER_MAIN, stats.getNumOfEnteringMaintenanceDataNodes());
assertEquals(TOTAL_SPACE, stats.getTotalSpace());
assertEquals(AVAILABLE_SPACE, stats.getAvailableSpace());
assertEquals(CORRUPT_FILES_COUNT, stats.getCorruptFilesCount());
assertEquals(SCHEDULED_REPLICATION_BLOCKS,
stats.getScheduledReplicationBlocks());
assertEquals(MISSING_BLOCK_WITH_REPLICATION_ONE,
stats.getNumberOfMissingBlocksWithReplicationFactorOne());
assertEquals(HIGHEST_PRIORITY_LOW_REDUNDANCY_REPL_BLOCK,
stats.getHighestPriorityLowRedundancyReplicatedBlocks());
assertEquals(HIGHEST_PRIORITY_LOW_REDUNDANCY_EC_BLOCK,
stats.getHighestPriorityLowRedundancyECBlocks());
}
@Test