HDFS-12787. Ozone: SCM: Aggregate the metrics from all the container reports. Contributed by Yiqun Lin.
This commit is contained in:
parent
cbe4f314b3
commit
c8d8270f72
|
@ -18,6 +18,8 @@
|
|||
|
||||
package org.apache.hadoop.ozone.scm;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.jmx.ServiceRuntimeInfo;
|
||||
|
||||
|
@ -39,4 +41,10 @@ public interface SCMMXBean extends ServiceRuntimeInfo {
|
|||
* @return SCM client RPC server port
|
||||
*/
|
||||
String getClientRpcPort();
|
||||
|
||||
/**
|
||||
* Get container report info that includes container IO stats of nodes.
|
||||
* @return The datanodeUUid to report json string mapping
|
||||
*/
|
||||
Map<String, String> getContainerReport();
|
||||
}
|
||||
|
|
|
@ -19,6 +19,10 @@ package org.apache.hadoop.ozone.scm;
|
|||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.cache.Cache;
|
||||
import com.google.common.cache.CacheBuilder;
|
||||
import com.google.common.cache.RemovalListener;
|
||||
import com.google.common.cache.RemovalNotification;
|
||||
import com.google.protobuf.BlockingService;
|
||||
import com.google.protobuf.InvalidProtocolBufferException;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
|
@ -104,6 +108,8 @@ import java.util.List;
|
|||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.TreeSet;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.Collections;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
|
@ -204,6 +210,9 @@ public class StorageContainerManager extends ServiceRuntimeInfoImpl
|
|||
|
||||
/** SCM metrics. */
|
||||
private static SCMMetrics metrics;
|
||||
/** Key = DatanodeUuid, value = ContainerStat. */
|
||||
private Cache<String, ContainerStat> containerReportCache;
|
||||
|
||||
|
||||
private static final String USAGE =
|
||||
"Usage: \n hdfs scm [ " + StartupOption.INIT.getName() + " [ "
|
||||
|
@ -225,13 +234,15 @@ public class StorageContainerManager extends ServiceRuntimeInfoImpl
|
|||
OZONE_SCM_DB_CACHE_SIZE_DEFAULT);
|
||||
|
||||
StorageContainerManager.initMetrics();
|
||||
initContainerReportCache(conf);
|
||||
|
||||
scmStorage = new SCMStorage(conf);
|
||||
String clusterId = scmStorage.getClusterID();
|
||||
if (clusterId == null) {
|
||||
throw new SCMException("clusterId not found",
|
||||
ResultCodes.SCM_NOT_INITIALIZED);
|
||||
}
|
||||
scmNodeManager = new SCMNodeManager(conf, scmStorage.getClusterID());
|
||||
scmNodeManager = new SCMNodeManager(conf, scmStorage.getClusterID(), this);
|
||||
scmContainerManager = new ContainerMapping(conf, scmNodeManager, cacheSize);
|
||||
scmBlockManager = new BlockManagerImpl(conf, scmNodeManager,
|
||||
scmContainerManager, cacheSize);
|
||||
|
@ -297,6 +308,31 @@ public class StorageContainerManager extends ServiceRuntimeInfoImpl
|
|||
registerMXBean();
|
||||
}
|
||||
|
||||
/**
|
||||
* Initialize container reports cache that sent from datanodes.
|
||||
*
|
||||
* @param conf
|
||||
*/
|
||||
private void initContainerReportCache(OzoneConfiguration conf) {
|
||||
containerReportCache = CacheBuilder.newBuilder()
|
||||
.expireAfterAccess(Long.MAX_VALUE, TimeUnit.MILLISECONDS)
|
||||
.maximumSize(Integer.MAX_VALUE)
|
||||
.removalListener(new RemovalListener<String, ContainerStat>() {
|
||||
@Override
|
||||
public void onRemoval(
|
||||
RemovalNotification<String, ContainerStat> removalNotification) {
|
||||
synchronized (containerReportCache) {
|
||||
ContainerStat stat = removalNotification.getValue();
|
||||
// remove invalid container report
|
||||
metrics.decrContainerStat(stat);
|
||||
LOG.debug(
|
||||
"Remove expired container stat entry for datanode: {}.",
|
||||
removalNotification.getKey());
|
||||
}
|
||||
}
|
||||
}).build();
|
||||
}
|
||||
|
||||
/**
|
||||
* Builds a message for logging startup information about an RPC server.
|
||||
*
|
||||
|
@ -836,7 +872,15 @@ public class StorageContainerManager extends ServiceRuntimeInfoImpl
|
|||
LOG.error("SCM block manager service stop failed.", ex);
|
||||
}
|
||||
|
||||
metrics.unRegister();
|
||||
if (containerReportCache != null) {
|
||||
containerReportCache.invalidateAll();
|
||||
containerReportCache.cleanUp();
|
||||
}
|
||||
|
||||
if (metrics != null) {
|
||||
metrics.unRegister();
|
||||
}
|
||||
|
||||
unregisterMXBean();
|
||||
IOUtils.cleanupWithLogger(LOG, scmContainerManager);
|
||||
IOUtils.cleanupWithLogger(LOG, scmBlockManager);
|
||||
|
@ -917,27 +961,7 @@ public class StorageContainerManager extends ServiceRuntimeInfoImpl
|
|||
@Override
|
||||
public ContainerReportsResponseProto sendContainerReport(
|
||||
ContainerReportsRequestProto reports) throws IOException {
|
||||
// TODO: We should update the logic once incremental container report
|
||||
// type is supported.
|
||||
if (reports.getType() ==
|
||||
ContainerReportsRequestProto.reportType.fullReport) {
|
||||
ContainerStat stat = new ContainerStat();
|
||||
for (StorageContainerDatanodeProtocolProtos.ContainerInfo info : reports
|
||||
.getReportsList()) {
|
||||
stat.add(new ContainerStat(info.getSize(), info.getUsed(),
|
||||
info.getKeyCount(), info.getReadBytes(), info.getWriteBytes(),
|
||||
info.getReadCount(), info.getWriteCount()));
|
||||
}
|
||||
|
||||
// update container metrics
|
||||
metrics.setLastContainerReportSize(stat.getSize().get());
|
||||
metrics.setLastContainerReportUsed(stat.getUsed().get());
|
||||
metrics.setLastContainerReportKeyCount(stat.getKeyCount().get());
|
||||
metrics.setLastContainerReportReadBytes(stat.getReadBytes().get());
|
||||
metrics.setLastContainerReportWriteBytes(stat.getWriteBytes().get());
|
||||
metrics.setLastContainerReportReadCount(stat.getReadCount().get());
|
||||
metrics.setLastContainerReportWriteCount(stat.getWriteCount().get());
|
||||
}
|
||||
updateContainerReportMetrics(reports);
|
||||
|
||||
// should we process container reports async?
|
||||
scmContainerManager.processContainerReports(
|
||||
|
@ -946,6 +970,37 @@ public class StorageContainerManager extends ServiceRuntimeInfoImpl
|
|||
return ContainerReportsResponseProto.newBuilder().build();
|
||||
}
|
||||
|
||||
private void updateContainerReportMetrics(
|
||||
ContainerReportsRequestProto reports) {
|
||||
ContainerStat newStat = null;
|
||||
// TODO: We should update the logic once incremental container report
|
||||
// type is supported.
|
||||
if (reports
|
||||
.getType() == ContainerReportsRequestProto.reportType.fullReport) {
|
||||
newStat = new ContainerStat();
|
||||
for (StorageContainerDatanodeProtocolProtos.ContainerInfo info : reports
|
||||
.getReportsList()) {
|
||||
newStat.add(new ContainerStat(info.getSize(), info.getUsed(),
|
||||
info.getKeyCount(), info.getReadBytes(), info.getWriteBytes(),
|
||||
info.getReadCount(), info.getWriteCount()));
|
||||
}
|
||||
|
||||
// update container metrics
|
||||
metrics.setLastContainerStat(newStat);
|
||||
}
|
||||
|
||||
// Update container stat entry, this will trigger a removal operation if it
|
||||
// exists in cache.
|
||||
synchronized (containerReportCache) {
|
||||
String datanodeUuid = reports.getDatanodeID().getDatanodeUuid();
|
||||
if (datanodeUuid != null && newStat != null) {
|
||||
containerReportCache.put(datanodeUuid, newStat);
|
||||
// update global view container metrics
|
||||
metrics.incrContainerStat(newStat);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Handles the block deletion ACKs sent by datanodes. Once ACKs recieved,
|
||||
* SCM considers the blocks are deleted and update the metadata in SCM DB.
|
||||
|
@ -1124,4 +1179,53 @@ public class StorageContainerManager extends ServiceRuntimeInfoImpl
|
|||
public static SCMMetrics getMetrics() {
|
||||
return metrics == null ? SCMMetrics.create() : metrics;
|
||||
}
|
||||
|
||||
/**
|
||||
* Invalidate container stat entry for given datanode.
|
||||
*
|
||||
* @param datanodeUuid
|
||||
*/
|
||||
public void removeContainerReport(String datanodeUuid) {
|
||||
synchronized (containerReportCache) {
|
||||
containerReportCache.invalidate(datanodeUuid);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get container stat of specified datanode.
|
||||
*
|
||||
* @param datanodeUuid
|
||||
* @return
|
||||
*/
|
||||
public ContainerStat getContainerReport(String datanodeUuid) {
|
||||
ContainerStat stat = null;
|
||||
synchronized (containerReportCache) {
|
||||
stat = containerReportCache.getIfPresent(datanodeUuid);
|
||||
}
|
||||
|
||||
return stat;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a view of the container stat entries. Modifications made to the
|
||||
* map will directly affect the cache.
|
||||
*
|
||||
* @return
|
||||
*/
|
||||
public ConcurrentMap<String, ContainerStat> getContainerReportCache() {
|
||||
return containerReportCache.asMap();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, String> getContainerReport() {
|
||||
Map<String, String> id2StatMap = new HashMap<>();
|
||||
synchronized (containerReportCache) {
|
||||
ConcurrentMap<String, ContainerStat> map = containerReportCache.asMap();
|
||||
for (Map.Entry<String, ContainerStat> entry : map.entrySet()) {
|
||||
id2StatMap.put(entry.getKey(), entry.getValue().toJsonString());
|
||||
}
|
||||
}
|
||||
|
||||
return id2StatMap;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -17,6 +17,11 @@
|
|||
*/
|
||||
package org.apache.hadoop.ozone.scm.container.placement.metrics;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.hadoop.ozone.web.utils.JsonUtils;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import com.google.common.base.Preconditions;
|
||||
|
||||
/**
|
||||
|
@ -26,36 +31,43 @@ public class ContainerStat {
|
|||
/**
|
||||
* The maximum container size.
|
||||
*/
|
||||
@JsonProperty("Size")
|
||||
private LongMetric size;
|
||||
|
||||
/**
|
||||
* The number of bytes used by the container.
|
||||
*/
|
||||
@JsonProperty("Used")
|
||||
private LongMetric used;
|
||||
|
||||
/**
|
||||
* The number of keys in the container.
|
||||
*/
|
||||
@JsonProperty("KeyCount")
|
||||
private LongMetric keyCount;
|
||||
|
||||
/**
|
||||
* The number of bytes read from the container.
|
||||
*/
|
||||
@JsonProperty("ReadBytes")
|
||||
private LongMetric readBytes;
|
||||
|
||||
/**
|
||||
* The number of bytes write into the container.
|
||||
*/
|
||||
@JsonProperty("WriteBytes")
|
||||
private LongMetric writeBytes;
|
||||
|
||||
/**
|
||||
* The number of times the container is read.
|
||||
*/
|
||||
@JsonProperty("ReadCount")
|
||||
private LongMetric readCount;
|
||||
|
||||
/**
|
||||
* The number of times the container is written into .
|
||||
* The number of times the container is written into.
|
||||
*/
|
||||
@JsonProperty("WriteCount")
|
||||
private LongMetric writeCount;
|
||||
|
||||
public ContainerStat() {
|
||||
|
@ -117,6 +129,10 @@ public class ContainerStat {
|
|||
}
|
||||
|
||||
public void add(ContainerStat stat) {
|
||||
if (stat == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
this.size.add(stat.getSize().get());
|
||||
this.used.add(stat.getUsed().get());
|
||||
this.keyCount.add(stat.getKeyCount().get());
|
||||
|
@ -125,4 +141,26 @@ public class ContainerStat {
|
|||
this.readCount.add(stat.getReadCount().get());
|
||||
this.writeCount.add(stat.getWriteCount().get());
|
||||
}
|
||||
}
|
||||
|
||||
public void subtract(ContainerStat stat) {
|
||||
if (stat == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
this.size.subtract(stat.getSize().get());
|
||||
this.used.subtract(stat.getUsed().get());
|
||||
this.keyCount.subtract(stat.getKeyCount().get());
|
||||
this.readBytes.subtract(stat.getReadBytes().get());
|
||||
this.writeBytes.subtract(stat.getWriteBytes().get());
|
||||
this.readCount.subtract(stat.getReadCount().get());
|
||||
this.writeCount.subtract(stat.getWriteCount().get());
|
||||
}
|
||||
|
||||
public String toJsonString() {
|
||||
try {
|
||||
return JsonUtils.toJsonString(this);
|
||||
} catch (IOException ignored) {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
}
|
|
@ -16,9 +16,13 @@
|
|||
*/
|
||||
package org.apache.hadoop.ozone.scm.container.placement.metrics;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonAutoDetect;
|
||||
import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
|
||||
|
||||
/**
|
||||
* An helper class for all metrics based on Longs.
|
||||
*/
|
||||
@JsonAutoDetect(fieldVisibility = Visibility.ANY)
|
||||
public class LongMetric implements DatanodeMetric<Long, Long> {
|
||||
private Long value;
|
||||
|
||||
|
|
|
@ -21,6 +21,7 @@ import org.apache.hadoop.metrics2.MetricsSystem;
|
|||
import org.apache.hadoop.metrics2.annotation.Metric;
|
||||
import org.apache.hadoop.metrics2.annotation.Metrics;
|
||||
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
||||
import org.apache.hadoop.metrics2.lib.MutableCounterLong;
|
||||
import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
|
||||
|
||||
/**
|
||||
|
@ -43,6 +44,14 @@ public class SCMMetrics {
|
|||
@Metric private MutableGaugeLong lastContainerReportReadCount;
|
||||
@Metric private MutableGaugeLong lastContainerReportWriteCount;
|
||||
|
||||
@Metric private MutableCounterLong containerReportSize;
|
||||
@Metric private MutableCounterLong containerReportUsed;
|
||||
@Metric private MutableCounterLong containerReportKeyCount;
|
||||
@Metric private MutableCounterLong containerReportReadBytes;
|
||||
@Metric private MutableCounterLong containerReportWriteBytes;
|
||||
@Metric private MutableCounterLong containerReportReadCount;
|
||||
@Metric private MutableCounterLong containerReportWriteCount;
|
||||
|
||||
public SCMMetrics() {
|
||||
}
|
||||
|
||||
|
@ -80,6 +89,64 @@ public class SCMMetrics {
|
|||
this.lastContainerReportWriteCount.set(writeCount);
|
||||
}
|
||||
|
||||
public void incrContainerReportSize(long size) {
|
||||
this.containerReportSize.incr(size);
|
||||
}
|
||||
|
||||
public void incrContainerReportUsed(long used) {
|
||||
this.containerReportUsed.incr(used);
|
||||
}
|
||||
|
||||
public void incrContainerReportKeyCount(long keyCount) {
|
||||
this.containerReportKeyCount.incr(keyCount);
|
||||
}
|
||||
|
||||
public void incrContainerReportReadBytes(long readBytes) {
|
||||
this.containerReportReadBytes.incr(readBytes);
|
||||
}
|
||||
|
||||
public void incrContainerReportWriteBytes(long writeBytes) {
|
||||
this.containerReportWriteBytes.incr(writeBytes);
|
||||
}
|
||||
|
||||
public void incrContainerReportReadCount(long readCount) {
|
||||
this.containerReportReadCount.incr(readCount);
|
||||
}
|
||||
|
||||
public void incrContainerReportWriteCount(long writeCount) {
|
||||
this.containerReportWriteCount.incr(writeCount);
|
||||
}
|
||||
|
||||
public void setLastContainerStat(ContainerStat newStat) {
|
||||
this.lastContainerReportSize.set(newStat.getSize().get());
|
||||
this.lastContainerReportUsed.set(newStat.getUsed().get());
|
||||
this.lastContainerReportKeyCount.set(newStat.getKeyCount().get());
|
||||
this.lastContainerReportReadBytes.set(newStat.getReadBytes().get());
|
||||
this.lastContainerReportWriteBytes.set(newStat.getWriteBytes().get());
|
||||
this.lastContainerReportReadCount.set(newStat.getReadCount().get());
|
||||
this.lastContainerReportWriteCount.set(newStat.getWriteCount().get());
|
||||
}
|
||||
|
||||
public void incrContainerStat(ContainerStat deltaStat) {
|
||||
this.containerReportSize.incr(deltaStat.getSize().get());
|
||||
this.containerReportUsed.incr(deltaStat.getUsed().get());
|
||||
this.containerReportKeyCount.incr(deltaStat.getKeyCount().get());
|
||||
this.containerReportReadBytes.incr(deltaStat.getReadBytes().get());
|
||||
this.containerReportWriteBytes.incr(deltaStat.getWriteBytes().get());
|
||||
this.containerReportReadCount.incr(deltaStat.getReadCount().get());
|
||||
this.containerReportWriteCount.incr(deltaStat.getWriteCount().get());
|
||||
}
|
||||
|
||||
public void decrContainerStat(ContainerStat deltaStat) {
|
||||
this.containerReportSize.incr(-1 * deltaStat.getSize().get());
|
||||
this.containerReportUsed.incr(-1 * deltaStat.getUsed().get());
|
||||
this.containerReportKeyCount.incr(-1 * deltaStat.getKeyCount().get());
|
||||
this.containerReportReadBytes.incr(-1 * deltaStat.getReadBytes().get());
|
||||
this.containerReportWriteBytes.incr(-1 * deltaStat.getWriteBytes().get());
|
||||
this.containerReportReadCount.incr(-1 * deltaStat.getReadCount().get());
|
||||
this.containerReportWriteCount.incr(-1 * deltaStat.getWriteCount().get());
|
||||
}
|
||||
|
||||
public void unRegister() {
|
||||
MetricsSystem ms = DefaultMetricsSystem.instance();
|
||||
ms.unregisterSource(SOURCE_NAME);
|
||||
|
|
|
@ -43,7 +43,7 @@ import org.apache.hadoop.ozone.protocol
|
|||
.proto.StorageContainerDatanodeProtocolProtos.SCMNodeReport;
|
||||
import org.apache.hadoop.ozone.protocol
|
||||
.proto.StorageContainerDatanodeProtocolProtos.SCMStorageReport;
|
||||
|
||||
import org.apache.hadoop.ozone.scm.StorageContainerManager;
|
||||
import org.apache.hadoop.ozone.scm.VersionInfo;
|
||||
import org.apache.hadoop.ozone.scm.container.placement.metrics.SCMNodeMetric;
|
||||
import org.apache.hadoop.ozone.scm.container.placement.metrics.SCMNodeStat;
|
||||
|
@ -121,7 +121,7 @@ public class SCMNodeManager
|
|||
private final AtomicInteger staleNodeCount;
|
||||
private final AtomicInteger deadNodeCount;
|
||||
private final AtomicInteger totalNodes;
|
||||
private final long staleNodeIntervalMs;
|
||||
private long staleNodeIntervalMs;
|
||||
private final long deadNodeIntervalMs;
|
||||
private final long heartbeatCheckerIntervalMs;
|
||||
private final long datanodeHBIntervalSeconds;
|
||||
|
@ -150,12 +150,13 @@ public class SCMNodeManager
|
|||
|
||||
// Node pool manager.
|
||||
private final SCMNodePoolManager nodePoolManager;
|
||||
private final StorageContainerManager scmManager;
|
||||
|
||||
/**
|
||||
* Constructs SCM machine Manager.
|
||||
*/
|
||||
public SCMNodeManager(OzoneConfiguration conf, String clusterID)
|
||||
throws IOException {
|
||||
public SCMNodeManager(OzoneConfiguration conf, String clusterID,
|
||||
StorageContainerManager scmManager) throws IOException {
|
||||
heartbeatQueue = new ConcurrentLinkedQueue<>();
|
||||
healthyNodes = new ConcurrentHashMap<>();
|
||||
deadNodes = new ConcurrentHashMap<>();
|
||||
|
@ -197,6 +198,7 @@ public class SCMNodeManager
|
|||
registerMXBean();
|
||||
|
||||
this.nodePoolManager = new SCMNodePoolManager(conf);
|
||||
this.scmManager = scmManager;
|
||||
}
|
||||
|
||||
private void registerMXBean() {
|
||||
|
@ -551,6 +553,11 @@ public class SCMNodeManager
|
|||
healthyNodeCount.decrementAndGet();
|
||||
staleNodes.put(entry.getKey(), entry.getValue());
|
||||
staleNodeCount.incrementAndGet();
|
||||
|
||||
if (scmManager != null) {
|
||||
// remove stale node's container report
|
||||
scmManager.removeContainerReport(entry.getKey());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -863,4 +870,9 @@ public class SCMNodeManager
|
|||
public void addDatanodeCommand(DatanodeID id, SCMCommand command) {
|
||||
this.commandQueue.addCommand(id, command);
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public void setStaleNodeIntervalMs(long interval) {
|
||||
this.staleNodeIntervalMs = interval;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -110,13 +110,20 @@ Following are the counters for containers:
|
|||
|
||||
| Name | Description |
|
||||
|:---- |:---- |
|
||||
| `LastContainerReportSize` | Total size in bytes of all containers |
|
||||
| `LastContainerReportUsed` | Total number of bytes used by all containers |
|
||||
| `LastContainerReportKeyCount` | Total number of keys in all containers |
|
||||
| `LastContainerReportReadBytes` | Total number of bytes have been read from all containers |
|
||||
| `LastContainerReportWriteBytes` | Total number of bytes have been written into all containers |
|
||||
| `LastContainerReportReadCount` | Total number of times containers have been read from |
|
||||
| `LastContainerReportWriteCount` | Total number of times containers have been written to |
|
||||
| `LastContainerReportSize` | Total size in bytes of all containers in latest container report that SCM received from datanode |
|
||||
| `LastContainerReportUsed` | Total number of bytes used by all containers in latest container report that SCM received from datanode |
|
||||
| `LastContainerReportKeyCount` | Total number of keys in all containers in latest container report that SCM received from datanode |
|
||||
| `LastContainerReportReadBytes` | Total number of bytes have been read from all containers in latest container report that SCM received from datanode |
|
||||
| `LastContainerReportWriteBytes` | Total number of bytes have been written into all containers in latest container report that SCM received from datanode |
|
||||
| `LastContainerReportReadCount` | Total number of times containers have been read from in latest container report that SCM received from datanode |
|
||||
| `LastContainerReportWriteCount` | Total number of times containers have been written to in latest container report that SCM received from datanode |
|
||||
| `ContainerReportSize` | Total size in bytes of all containers over whole cluster |
|
||||
| `ContainerReportUsed` | Total number of bytes used by all containers over whole cluster |
|
||||
| `ContainerReportKeyCount` | Total number of keys in all containers over whole cluster |
|
||||
| `ContainerReportReadBytes` | Total number of bytes have been read from all containers over whole cluster |
|
||||
| `ContainerReportWriteBytes` | Total number of bytes have been written into all containers over whole cluster |
|
||||
| `ContainerReportReadCount` | Total number of times containers have been read from over whole cluster |
|
||||
| `ContainerReportWriteCount` | Total number of times containers have been written to over whole cluster |
|
||||
|
||||
### Key Space Metrics
|
||||
|
||||
|
|
|
@ -25,6 +25,7 @@ import org.apache.hadoop.ozone.MiniOzoneClassicCluster;
|
|||
import org.apache.hadoop.ozone.MiniOzoneCluster;
|
||||
import org.apache.hadoop.conf.OzoneConfiguration;
|
||||
import org.apache.hadoop.ozone.OzoneConsts;
|
||||
import org.apache.hadoop.ozone.scm.container.placement.metrics.ContainerStat;
|
||||
import org.apache.hadoop.ozone.scm.node.NodeManager;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.AfterClass;
|
||||
|
@ -40,6 +41,7 @@ import java.io.IOException;
|
|||
import java.lang.management.ManagementFactory;
|
||||
import java.util.Map;
|
||||
import java.util.Iterator;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
import javax.management.openmbean.CompositeData;
|
||||
|
@ -91,6 +93,24 @@ public class TestSCMMXBean {
|
|||
String clientRpcPort = (String)mbs.getAttribute(bean,
|
||||
"ClientRpcPort");
|
||||
assertEquals(scm.getClientRpcPort(), clientRpcPort);
|
||||
|
||||
ConcurrentMap<String, ContainerStat> map = scm.getContainerReportCache();
|
||||
ContainerStat stat = new ContainerStat(1, 2, 3, 4, 5, 6, 7);
|
||||
map.put("nodeID", stat);
|
||||
TabularData data = (TabularData) mbs.getAttribute(
|
||||
bean, "ContainerReport");
|
||||
|
||||
// verify report info
|
||||
assertEquals(1, data.values().size());
|
||||
for (Object obj : data.values()) {
|
||||
assertTrue(obj instanceof CompositeData);
|
||||
CompositeData d = (CompositeData) obj;
|
||||
Iterator<?> it = d.values().iterator();
|
||||
String key = it.next().toString();
|
||||
String value = it.next().toString();
|
||||
assertEquals("nodeID", key);
|
||||
assertEquals(stat.toJsonString(), value);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
|
@ -17,6 +17,7 @@
|
|||
*/
|
||||
package org.apache.hadoop.ozone.scm;
|
||||
|
||||
import static org.apache.hadoop.test.MetricsAsserts.getLongCounter;
|
||||
import static org.apache.hadoop.test.MetricsAsserts.getLongGauge;
|
||||
import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
@ -25,9 +26,10 @@ import java.util.UUID;
|
|||
|
||||
import org.apache.commons.codec.digest.DigestUtils;
|
||||
import org.apache.hadoop.conf.OzoneConfiguration;
|
||||
import org.apache.hadoop.hdfs.protocol.DatanodeID;
|
||||
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
||||
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
|
||||
import org.apache.hadoop.ozone.MiniOzoneClassicCluster;
|
||||
import org.apache.hadoop.ozone.MiniOzoneCluster;
|
||||
import org.apache.hadoop.ozone.OzoneConsts;
|
||||
import org.apache.hadoop.ozone.container.common.SCMTestUtils;
|
||||
import org.apache.hadoop.ozone.container.common.helpers.ContainerReport;
|
||||
|
@ -35,13 +37,23 @@ import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolPr
|
|||
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto;
|
||||
import org.apache.hadoop.ozone.scm.container.placement.metrics.ContainerStat;
|
||||
import org.apache.hadoop.ozone.scm.container.placement.metrics.SCMMetrics;
|
||||
import org.apache.hadoop.ozone.scm.node.SCMNodeManager;
|
||||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.rules.Timeout;
|
||||
|
||||
/**
|
||||
* This class tests the metrics of Storage Container Manager.
|
||||
*/
|
||||
public class TestSCMMetrics {
|
||||
private static MiniOzoneCluster cluster = null;
|
||||
/**
|
||||
* Set the timeout for each test.
|
||||
*/
|
||||
@Rule
|
||||
public Timeout testTimeout = new Timeout(90000);
|
||||
|
||||
private static MiniOzoneClassicCluster cluster = null;
|
||||
|
||||
@Test
|
||||
public void testContainerMetrics() throws Exception {
|
||||
|
@ -64,7 +76,11 @@ public class TestSCMMetrics {
|
|||
ContainerStat stat = new ContainerStat(size, used, keyCount, readBytes,
|
||||
writeBytes, readCount, writeCount);
|
||||
StorageContainerManager scmManager = cluster.getStorageContainerManager();
|
||||
scmManager.sendContainerReport(createContainerReport(numReport, stat));
|
||||
|
||||
ContainerReportsRequestProto request = createContainerReport(numReport,
|
||||
stat, null);
|
||||
String fstDatanodeID = request.getDatanodeID().getDatanodeUuid();
|
||||
scmManager.sendContainerReport(request);
|
||||
|
||||
// verify container stat metrics
|
||||
MetricsRecordBuilder scmMetrics = getMetrics(SCMMetrics.SOURCE_NAME);
|
||||
|
@ -83,6 +99,117 @@ public class TestSCMMetrics {
|
|||
getLongGauge("LastContainerReportReadCount", scmMetrics));
|
||||
assertEquals(writeCount * numReport,
|
||||
getLongGauge("LastContainerReportWriteCount", scmMetrics));
|
||||
|
||||
// add one new report
|
||||
request = createContainerReport(1, stat, null);
|
||||
String sndDatanodeID = request.getDatanodeID().getDatanodeUuid();
|
||||
scmManager.sendContainerReport(request);
|
||||
|
||||
scmMetrics = getMetrics(SCMMetrics.SOURCE_NAME);
|
||||
assertEquals(size * (numReport + 1),
|
||||
getLongCounter("ContainerReportSize", scmMetrics));
|
||||
assertEquals(used * (numReport + 1),
|
||||
getLongCounter("ContainerReportUsed", scmMetrics));
|
||||
assertEquals(readBytes * (numReport + 1),
|
||||
getLongCounter("ContainerReportReadBytes", scmMetrics));
|
||||
assertEquals(writeBytes * (numReport + 1),
|
||||
getLongCounter("ContainerReportWriteBytes", scmMetrics));
|
||||
|
||||
assertEquals(keyCount * (numReport + 1),
|
||||
getLongCounter("ContainerReportKeyCount", scmMetrics));
|
||||
assertEquals(readCount * (numReport + 1),
|
||||
getLongCounter("ContainerReportReadCount", scmMetrics));
|
||||
assertEquals(writeCount * (numReport + 1),
|
||||
getLongCounter("ContainerReportWriteCount", scmMetrics));
|
||||
|
||||
// Re-send reports but with different value for validating
|
||||
// the aggregation.
|
||||
stat = new ContainerStat(100, 50, 3, 50, 60, 5, 6);
|
||||
scmManager.sendContainerReport(createContainerReport(1, stat,
|
||||
fstDatanodeID));
|
||||
|
||||
stat = new ContainerStat(1, 1, 1, 1, 1, 1, 1);
|
||||
scmManager.sendContainerReport(createContainerReport(1, stat,
|
||||
sndDatanodeID));
|
||||
|
||||
// the global container metrics value should be updated
|
||||
scmMetrics = getMetrics(SCMMetrics.SOURCE_NAME);
|
||||
assertEquals(101, getLongCounter("ContainerReportSize", scmMetrics));
|
||||
assertEquals(51, getLongCounter("ContainerReportUsed", scmMetrics));
|
||||
assertEquals(51, getLongCounter("ContainerReportReadBytes", scmMetrics));
|
||||
assertEquals(61, getLongCounter("ContainerReportWriteBytes", scmMetrics));
|
||||
|
||||
assertEquals(4, getLongCounter("ContainerReportKeyCount", scmMetrics));
|
||||
assertEquals(6, getLongCounter("ContainerReportReadCount", scmMetrics));
|
||||
assertEquals(7, getLongCounter("ContainerReportWriteCount", scmMetrics));
|
||||
} finally {
|
||||
if (cluster != null) {
|
||||
cluster.shutdown();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testStaleNodeContainerReport() throws Exception {
|
||||
int nodeCount = 2;
|
||||
int numReport = 2;
|
||||
long size = OzoneConsts.GB * 5;
|
||||
long used = OzoneConsts.GB * 2;
|
||||
long readBytes = OzoneConsts.GB * 1;
|
||||
long writeBytes = OzoneConsts.GB * 2;
|
||||
int keyCount = 1000;
|
||||
int readCount = 100;
|
||||
int writeCount = 50;
|
||||
OzoneConfiguration conf = new OzoneConfiguration();
|
||||
|
||||
try {
|
||||
cluster = new MiniOzoneClassicCluster.Builder(conf)
|
||||
.setHandlerType(OzoneConsts.OZONE_HANDLER_DISTRIBUTED)
|
||||
.numDataNodes(nodeCount).build();
|
||||
|
||||
ContainerStat stat = new ContainerStat(size, used, keyCount, readBytes,
|
||||
writeBytes, readCount, writeCount);
|
||||
StorageContainerManager scmManager = cluster.getStorageContainerManager();
|
||||
|
||||
DataNode dataNode = cluster.getDataNodes().get(0);
|
||||
String datanodeUuid = dataNode.getDatanodeId().getDatanodeUuid();
|
||||
ContainerReportsRequestProto request = createContainerReport(numReport,
|
||||
stat, datanodeUuid);
|
||||
scmManager.sendContainerReport(request);
|
||||
|
||||
MetricsRecordBuilder scmMetrics = getMetrics(SCMMetrics.SOURCE_NAME);
|
||||
assertEquals(size * numReport,
|
||||
getLongCounter("ContainerReportSize", scmMetrics));
|
||||
assertEquals(used * numReport,
|
||||
getLongCounter("ContainerReportUsed", scmMetrics));
|
||||
assertEquals(readBytes * numReport,
|
||||
getLongCounter("ContainerReportReadBytes", scmMetrics));
|
||||
assertEquals(writeBytes * numReport,
|
||||
getLongCounter("ContainerReportWriteBytes", scmMetrics));
|
||||
|
||||
assertEquals(keyCount * numReport,
|
||||
getLongCounter("ContainerReportKeyCount", scmMetrics));
|
||||
assertEquals(readCount * numReport,
|
||||
getLongCounter("ContainerReportReadCount", scmMetrics));
|
||||
assertEquals(writeCount * numReport,
|
||||
getLongCounter("ContainerReportWriteCount", scmMetrics));
|
||||
|
||||
// reset stale interval time to move node from healthy to stale
|
||||
SCMNodeManager nodeManager = (SCMNodeManager) cluster
|
||||
.getStorageContainerManager().getScmNodeManager();
|
||||
nodeManager.setStaleNodeIntervalMs(100);
|
||||
|
||||
// verify the metrics when node becomes stale
|
||||
GenericTestUtils.waitFor(() -> {
|
||||
MetricsRecordBuilder metrics = getMetrics(SCMMetrics.SOURCE_NAME);
|
||||
return 0 == getLongCounter("ContainerReportSize", metrics)
|
||||
&& 0 == getLongCounter("ContainerReportUsed", metrics)
|
||||
&& 0 == getLongCounter("ContainerReportReadBytes", metrics)
|
||||
&& 0 == getLongCounter("ContainerReportWriteBytes", metrics)
|
||||
&& 0 == getLongCounter("ContainerReportKeyCount", metrics)
|
||||
&& 0 == getLongCounter("ContainerReportReadCount", metrics)
|
||||
&& 0 == getLongCounter("ContainerReportWriteCount", metrics);
|
||||
}, 1000, 60000);
|
||||
} finally {
|
||||
if (cluster != null) {
|
||||
cluster.shutdown();
|
||||
|
@ -91,7 +218,7 @@ public class TestSCMMetrics {
|
|||
}
|
||||
|
||||
private ContainerReportsRequestProto createContainerReport(int numReport,
|
||||
ContainerStat stat) {
|
||||
ContainerStat stat, String datanodeUuid) {
|
||||
StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto.Builder
|
||||
reportsBuilder = StorageContainerDatanodeProtocolProtos
|
||||
.ContainerReportsRequestProto.newBuilder();
|
||||
|
@ -108,8 +235,15 @@ public class TestSCMMetrics {
|
|||
report.setWriteBytes(stat.getWriteBytes().get());
|
||||
reportsBuilder.addReports(report.getProtoBufMessage());
|
||||
}
|
||||
reportsBuilder.setDatanodeID(SCMTestUtils.getDatanodeID()
|
||||
.getProtoBufMessage());
|
||||
|
||||
DatanodeID datanodeID;
|
||||
if (datanodeUuid == null) {
|
||||
datanodeID = SCMTestUtils.getDatanodeID();
|
||||
} else {
|
||||
datanodeID = new DatanodeID("null", "null", datanodeUuid, 0, 0, 0, 0);
|
||||
}
|
||||
|
||||
reportsBuilder.setDatanodeID(datanodeID.getProtoBufMessage());
|
||||
reportsBuilder.setType(StorageContainerDatanodeProtocolProtos
|
||||
.ContainerReportsRequestProto.reportType.fullReport);
|
||||
return reportsBuilder.build();
|
||||
|
|
|
@ -93,7 +93,7 @@ public class TestContainerPlacement {
|
|||
SCMNodeManager createNodeManager(OzoneConfiguration config)
|
||||
throws IOException {
|
||||
SCMNodeManager nodeManager = new SCMNodeManager(config,
|
||||
UUID.randomUUID().toString());
|
||||
UUID.randomUUID().toString(), null);
|
||||
assertFalse("Node manager should be in chill mode",
|
||||
nodeManager.isOutOfChillMode());
|
||||
return nodeManager;
|
||||
|
|
|
@ -125,7 +125,7 @@ public class TestNodeManager {
|
|||
SCMNodeManager createNodeManager(OzoneConfiguration config)
|
||||
throws IOException {
|
||||
SCMNodeManager nodeManager = new SCMNodeManager(config,
|
||||
UUID.randomUUID().toString());
|
||||
UUID.randomUUID().toString(), null);
|
||||
assertFalse("Node manager should be in chill mode",
|
||||
nodeManager.isOutOfChillMode());
|
||||
return nodeManager;
|
||||
|
|
Loading…
Reference in New Issue