diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/SCMMXBean.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/SCMMXBean.java index 9f50f7726c7..336ed4454b4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/SCMMXBean.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/SCMMXBean.java @@ -18,6 +18,8 @@ package org.apache.hadoop.ozone.scm; +import java.util.Map; + import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.jmx.ServiceRuntimeInfo; @@ -39,4 +41,10 @@ public interface SCMMXBean extends ServiceRuntimeInfo { * @return SCM client RPC server port */ String getClientRpcPort(); + + /** + * Get container report info that includes container IO stats of nodes. + * @return The datanodeUUid to report json string mapping + */ + Map getContainerReport(); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/StorageContainerManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/StorageContainerManager.java index a9f93601811..0f1aae8912a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/StorageContainerManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/StorageContainerManager.java @@ -19,6 +19,10 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.RemovalListener; +import com.google.common.cache.RemovalNotification; import com.google.protobuf.BlockingService; import com.google.protobuf.InvalidProtocolBufferException; import org.apache.hadoop.classification.InterfaceAudience; @@ -104,6 +108,8 @@ import java.util.Map; import java.util.Set; import java.util.TreeSet; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; import java.util.Collections; import java.util.stream.Collectors; @@ -204,6 +210,9 @@ public String getName() { /** SCM metrics. */ private static SCMMetrics metrics; + /** Key = DatanodeUuid, value = ContainerStat. */ + private Cache containerReportCache; + private static final String USAGE = "Usage: \n hdfs scm [ " + StartupOption.INIT.getName() + " [ " @@ -225,13 +234,15 @@ private StorageContainerManager(OzoneConfiguration conf) OZONE_SCM_DB_CACHE_SIZE_DEFAULT); StorageContainerManager.initMetrics(); + initContainerReportCache(conf); + scmStorage = new SCMStorage(conf); String clusterId = scmStorage.getClusterID(); if (clusterId == null) { throw new SCMException("clusterId not found", ResultCodes.SCM_NOT_INITIALIZED); } - scmNodeManager = new SCMNodeManager(conf, scmStorage.getClusterID()); + scmNodeManager = new SCMNodeManager(conf, scmStorage.getClusterID(), this); scmContainerManager = new ContainerMapping(conf, scmNodeManager, cacheSize); scmBlockManager = new BlockManagerImpl(conf, scmNodeManager, scmContainerManager, cacheSize); @@ -297,6 +308,31 @@ private StorageContainerManager(OzoneConfiguration conf) registerMXBean(); } + /** + * Initialize container reports cache that sent from datanodes. + * + * @param conf + */ + private void initContainerReportCache(OzoneConfiguration conf) { + containerReportCache = CacheBuilder.newBuilder() + .expireAfterAccess(Long.MAX_VALUE, TimeUnit.MILLISECONDS) + .maximumSize(Integer.MAX_VALUE) + .removalListener(new RemovalListener() { + @Override + public void onRemoval( + RemovalNotification removalNotification) { + synchronized (containerReportCache) { + ContainerStat stat = removalNotification.getValue(); + // remove invalid container report + metrics.decrContainerStat(stat); + LOG.debug( + "Remove expired container stat entry for datanode: {}.", + removalNotification.getKey()); + } + } + }).build(); + } + /** * Builds a message for logging startup information about an RPC server. * @@ -836,7 +872,15 @@ public void stop() { LOG.error("SCM block manager service stop failed.", ex); } - metrics.unRegister(); + if (containerReportCache != null) { + containerReportCache.invalidateAll(); + containerReportCache.cleanUp(); + } + + if (metrics != null) { + metrics.unRegister(); + } + unregisterMXBean(); IOUtils.cleanupWithLogger(LOG, scmContainerManager); IOUtils.cleanupWithLogger(LOG, scmBlockManager); @@ -917,27 +961,7 @@ public SCMHeartbeatResponseProto sendHeartbeat(DatanodeID datanodeID, @Override public ContainerReportsResponseProto sendContainerReport( ContainerReportsRequestProto reports) throws IOException { - // TODO: We should update the logic once incremental container report - // type is supported. - if (reports.getType() == - ContainerReportsRequestProto.reportType.fullReport) { - ContainerStat stat = new ContainerStat(); - for (StorageContainerDatanodeProtocolProtos.ContainerInfo info : reports - .getReportsList()) { - stat.add(new ContainerStat(info.getSize(), info.getUsed(), - info.getKeyCount(), info.getReadBytes(), info.getWriteBytes(), - info.getReadCount(), info.getWriteCount())); - } - - // update container metrics - metrics.setLastContainerReportSize(stat.getSize().get()); - metrics.setLastContainerReportUsed(stat.getUsed().get()); - metrics.setLastContainerReportKeyCount(stat.getKeyCount().get()); - metrics.setLastContainerReportReadBytes(stat.getReadBytes().get()); - metrics.setLastContainerReportWriteBytes(stat.getWriteBytes().get()); - metrics.setLastContainerReportReadCount(stat.getReadCount().get()); - metrics.setLastContainerReportWriteCount(stat.getWriteCount().get()); - } + updateContainerReportMetrics(reports); // should we process container reports async? scmContainerManager.processContainerReports( @@ -946,6 +970,37 @@ public ContainerReportsResponseProto sendContainerReport( return ContainerReportsResponseProto.newBuilder().build(); } + private void updateContainerReportMetrics( + ContainerReportsRequestProto reports) { + ContainerStat newStat = null; + // TODO: We should update the logic once incremental container report + // type is supported. + if (reports + .getType() == ContainerReportsRequestProto.reportType.fullReport) { + newStat = new ContainerStat(); + for (StorageContainerDatanodeProtocolProtos.ContainerInfo info : reports + .getReportsList()) { + newStat.add(new ContainerStat(info.getSize(), info.getUsed(), + info.getKeyCount(), info.getReadBytes(), info.getWriteBytes(), + info.getReadCount(), info.getWriteCount())); + } + + // update container metrics + metrics.setLastContainerStat(newStat); + } + + // Update container stat entry, this will trigger a removal operation if it + // exists in cache. + synchronized (containerReportCache) { + String datanodeUuid = reports.getDatanodeID().getDatanodeUuid(); + if (datanodeUuid != null && newStat != null) { + containerReportCache.put(datanodeUuid, newStat); + // update global view container metrics + metrics.incrContainerStat(newStat); + } + } + } + /** * Handles the block deletion ACKs sent by datanodes. Once ACKs recieved, * SCM considers the blocks are deleted and update the metadata in SCM DB. @@ -1124,4 +1179,53 @@ public static void initMetrics() { public static SCMMetrics getMetrics() { return metrics == null ? SCMMetrics.create() : metrics; } + + /** + * Invalidate container stat entry for given datanode. + * + * @param datanodeUuid + */ + public void removeContainerReport(String datanodeUuid) { + synchronized (containerReportCache) { + containerReportCache.invalidate(datanodeUuid); + } + } + + /** + * Get container stat of specified datanode. + * + * @param datanodeUuid + * @return + */ + public ContainerStat getContainerReport(String datanodeUuid) { + ContainerStat stat = null; + synchronized (containerReportCache) { + stat = containerReportCache.getIfPresent(datanodeUuid); + } + + return stat; + } + + /** + * Returns a view of the container stat entries. Modifications made to the + * map will directly affect the cache. + * + * @return + */ + public ConcurrentMap getContainerReportCache() { + return containerReportCache.asMap(); + } + + @Override + public Map getContainerReport() { + Map id2StatMap = new HashMap<>(); + synchronized (containerReportCache) { + ConcurrentMap map = containerReportCache.asMap(); + for (Map.Entry entry : map.entrySet()) { + id2StatMap.put(entry.getKey(), entry.getValue().toJsonString()); + } + } + + return id2StatMap; + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/placement/metrics/ContainerStat.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/placement/metrics/ContainerStat.java index 65e96c33f19..810b8fd5dc2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/placement/metrics/ContainerStat.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/placement/metrics/ContainerStat.java @@ -17,6 +17,11 @@ */ package org.apache.hadoop.ozone.scm.container.placement.metrics; +import java.io.IOException; + +import org.apache.hadoop.ozone.web.utils.JsonUtils; + +import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; /** @@ -26,36 +31,43 @@ public class ContainerStat { /** * The maximum container size. */ + @JsonProperty("Size") private LongMetric size; /** * The number of bytes used by the container. */ + @JsonProperty("Used") private LongMetric used; /** * The number of keys in the container. */ + @JsonProperty("KeyCount") private LongMetric keyCount; /** * The number of bytes read from the container. */ + @JsonProperty("ReadBytes") private LongMetric readBytes; /** * The number of bytes write into the container. */ + @JsonProperty("WriteBytes") private LongMetric writeBytes; /** * The number of times the container is read. */ + @JsonProperty("ReadCount") private LongMetric readCount; /** - * The number of times the container is written into . + * The number of times the container is written into. */ + @JsonProperty("WriteCount") private LongMetric writeCount; public ContainerStat() { @@ -117,6 +129,10 @@ public LongMetric getWriteCount() { } public void add(ContainerStat stat) { + if (stat == null) { + return; + } + this.size.add(stat.getSize().get()); this.used.add(stat.getUsed().get()); this.keyCount.add(stat.getKeyCount().get()); @@ -125,4 +141,26 @@ public void add(ContainerStat stat) { this.readCount.add(stat.getReadCount().get()); this.writeCount.add(stat.getWriteCount().get()); } -} + + public void subtract(ContainerStat stat) { + if (stat == null) { + return; + } + + this.size.subtract(stat.getSize().get()); + this.used.subtract(stat.getUsed().get()); + this.keyCount.subtract(stat.getKeyCount().get()); + this.readBytes.subtract(stat.getReadBytes().get()); + this.writeBytes.subtract(stat.getWriteBytes().get()); + this.readCount.subtract(stat.getReadCount().get()); + this.writeCount.subtract(stat.getWriteCount().get()); + } + + public String toJsonString() { + try { + return JsonUtils.toJsonString(this); + } catch (IOException ignored) { + return null; + } + } +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/placement/metrics/LongMetric.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/placement/metrics/LongMetric.java index df3f4626016..dbcd9f42264 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/placement/metrics/LongMetric.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/placement/metrics/LongMetric.java @@ -16,9 +16,13 @@ */ package org.apache.hadoop.ozone.scm.container.placement.metrics; +import com.fasterxml.jackson.annotation.JsonAutoDetect; +import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility; + /** * An helper class for all metrics based on Longs. */ +@JsonAutoDetect(fieldVisibility = Visibility.ANY) public class LongMetric implements DatanodeMetric { private Long value; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/placement/metrics/SCMMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/placement/metrics/SCMMetrics.java index 17892dd5bc6..bdfec2d32aa 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/placement/metrics/SCMMetrics.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/placement/metrics/SCMMetrics.java @@ -21,6 +21,7 @@ import org.apache.hadoop.metrics2.annotation.Metric; import org.apache.hadoop.metrics2.annotation.Metrics; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; +import org.apache.hadoop.metrics2.lib.MutableCounterLong; import org.apache.hadoop.metrics2.lib.MutableGaugeLong; /** @@ -43,6 +44,14 @@ public class SCMMetrics { @Metric private MutableGaugeLong lastContainerReportReadCount; @Metric private MutableGaugeLong lastContainerReportWriteCount; + @Metric private MutableCounterLong containerReportSize; + @Metric private MutableCounterLong containerReportUsed; + @Metric private MutableCounterLong containerReportKeyCount; + @Metric private MutableCounterLong containerReportReadBytes; + @Metric private MutableCounterLong containerReportWriteBytes; + @Metric private MutableCounterLong containerReportReadCount; + @Metric private MutableCounterLong containerReportWriteCount; + public SCMMetrics() { } @@ -80,6 +89,64 @@ public void setLastContainerReportWriteCount(long writeCount) { this.lastContainerReportWriteCount.set(writeCount); } + public void incrContainerReportSize(long size) { + this.containerReportSize.incr(size); + } + + public void incrContainerReportUsed(long used) { + this.containerReportUsed.incr(used); + } + + public void incrContainerReportKeyCount(long keyCount) { + this.containerReportKeyCount.incr(keyCount); + } + + public void incrContainerReportReadBytes(long readBytes) { + this.containerReportReadBytes.incr(readBytes); + } + + public void incrContainerReportWriteBytes(long writeBytes) { + this.containerReportWriteBytes.incr(writeBytes); + } + + public void incrContainerReportReadCount(long readCount) { + this.containerReportReadCount.incr(readCount); + } + + public void incrContainerReportWriteCount(long writeCount) { + this.containerReportWriteCount.incr(writeCount); + } + + public void setLastContainerStat(ContainerStat newStat) { + this.lastContainerReportSize.set(newStat.getSize().get()); + this.lastContainerReportUsed.set(newStat.getUsed().get()); + this.lastContainerReportKeyCount.set(newStat.getKeyCount().get()); + this.lastContainerReportReadBytes.set(newStat.getReadBytes().get()); + this.lastContainerReportWriteBytes.set(newStat.getWriteBytes().get()); + this.lastContainerReportReadCount.set(newStat.getReadCount().get()); + this.lastContainerReportWriteCount.set(newStat.getWriteCount().get()); + } + + public void incrContainerStat(ContainerStat deltaStat) { + this.containerReportSize.incr(deltaStat.getSize().get()); + this.containerReportUsed.incr(deltaStat.getUsed().get()); + this.containerReportKeyCount.incr(deltaStat.getKeyCount().get()); + this.containerReportReadBytes.incr(deltaStat.getReadBytes().get()); + this.containerReportWriteBytes.incr(deltaStat.getWriteBytes().get()); + this.containerReportReadCount.incr(deltaStat.getReadCount().get()); + this.containerReportWriteCount.incr(deltaStat.getWriteCount().get()); + } + + public void decrContainerStat(ContainerStat deltaStat) { + this.containerReportSize.incr(-1 * deltaStat.getSize().get()); + this.containerReportUsed.incr(-1 * deltaStat.getUsed().get()); + this.containerReportKeyCount.incr(-1 * deltaStat.getKeyCount().get()); + this.containerReportReadBytes.incr(-1 * deltaStat.getReadBytes().get()); + this.containerReportWriteBytes.incr(-1 * deltaStat.getWriteBytes().get()); + this.containerReportReadCount.incr(-1 * deltaStat.getReadCount().get()); + this.containerReportWriteCount.incr(-1 * deltaStat.getWriteCount().get()); + } + public void unRegister() { MetricsSystem ms = DefaultMetricsSystem.instance(); ms.unregisterSource(SOURCE_NAME); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/SCMNodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/SCMNodeManager.java index 60148ba84f3..ed894cbb571 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/SCMNodeManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/SCMNodeManager.java @@ -43,7 +43,7 @@ .proto.StorageContainerDatanodeProtocolProtos.SCMNodeReport; import org.apache.hadoop.ozone.protocol .proto.StorageContainerDatanodeProtocolProtos.SCMStorageReport; - +import org.apache.hadoop.ozone.scm.StorageContainerManager; import org.apache.hadoop.ozone.scm.VersionInfo; import org.apache.hadoop.ozone.scm.container.placement.metrics.SCMNodeMetric; import org.apache.hadoop.ozone.scm.container.placement.metrics.SCMNodeStat; @@ -121,7 +121,7 @@ public class SCMNodeManager private final AtomicInteger staleNodeCount; private final AtomicInteger deadNodeCount; private final AtomicInteger totalNodes; - private final long staleNodeIntervalMs; + private long staleNodeIntervalMs; private final long deadNodeIntervalMs; private final long heartbeatCheckerIntervalMs; private final long datanodeHBIntervalSeconds; @@ -150,12 +150,13 @@ public class SCMNodeManager // Node pool manager. private final SCMNodePoolManager nodePoolManager; + private final StorageContainerManager scmManager; /** * Constructs SCM machine Manager. */ - public SCMNodeManager(OzoneConfiguration conf, String clusterID) - throws IOException { + public SCMNodeManager(OzoneConfiguration conf, String clusterID, + StorageContainerManager scmManager) throws IOException { heartbeatQueue = new ConcurrentLinkedQueue<>(); healthyNodes = new ConcurrentHashMap<>(); deadNodes = new ConcurrentHashMap<>(); @@ -197,6 +198,7 @@ public SCMNodeManager(OzoneConfiguration conf, String clusterID) registerMXBean(); this.nodePoolManager = new SCMNodePoolManager(conf); + this.scmManager = scmManager; } private void registerMXBean() { @@ -551,6 +553,11 @@ private void moveHealthyNodeToStale(Map.Entry entry) { healthyNodeCount.decrementAndGet(); staleNodes.put(entry.getKey(), entry.getValue()); staleNodeCount.incrementAndGet(); + + if (scmManager != null) { + // remove stale node's container report + scmManager.removeContainerReport(entry.getKey()); + } } /** @@ -863,4 +870,9 @@ public Map getNodeCount() { public void addDatanodeCommand(DatanodeID id, SCMCommand command) { this.commandQueue.addCommand(id, command); } + + @VisibleForTesting + public void setStaleNodeIntervalMs(long interval) { + this.staleNodeIntervalMs = interval; + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/OzoneMetrics.md b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/OzoneMetrics.md index cd153eef6dd..f5eccf6b690 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/OzoneMetrics.md +++ b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/OzoneMetrics.md @@ -110,13 +110,20 @@ Following are the counters for containers: | Name | Description | |:---- |:---- | -| `LastContainerReportSize` | Total size in bytes of all containers | -| `LastContainerReportUsed` | Total number of bytes used by all containers | -| `LastContainerReportKeyCount` | Total number of keys in all containers | -| `LastContainerReportReadBytes` | Total number of bytes have been read from all containers | -| `LastContainerReportWriteBytes` | Total number of bytes have been written into all containers | -| `LastContainerReportReadCount` | Total number of times containers have been read from | -| `LastContainerReportWriteCount` | Total number of times containers have been written to | +| `LastContainerReportSize` | Total size in bytes of all containers in latest container report that SCM received from datanode | +| `LastContainerReportUsed` | Total number of bytes used by all containers in latest container report that SCM received from datanode | +| `LastContainerReportKeyCount` | Total number of keys in all containers in latest container report that SCM received from datanode | +| `LastContainerReportReadBytes` | Total number of bytes have been read from all containers in latest container report that SCM received from datanode | +| `LastContainerReportWriteBytes` | Total number of bytes have been written into all containers in latest container report that SCM received from datanode | +| `LastContainerReportReadCount` | Total number of times containers have been read from in latest container report that SCM received from datanode | +| `LastContainerReportWriteCount` | Total number of times containers have been written to in latest container report that SCM received from datanode | +| `ContainerReportSize` | Total size in bytes of all containers over whole cluster | +| `ContainerReportUsed` | Total number of bytes used by all containers over whole cluster | +| `ContainerReportKeyCount` | Total number of keys in all containers over whole cluster | +| `ContainerReportReadBytes` | Total number of bytes have been read from all containers over whole cluster | +| `ContainerReportWriteBytes` | Total number of bytes have been written into all containers over whole cluster | +| `ContainerReportReadCount` | Total number of times containers have been read from over whole cluster | +| `ContainerReportWriteCount` | Total number of times containers have been written to over whole cluster | ### Key Space Metrics diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/TestSCMMXBean.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/TestSCMMXBean.java index 3808e60eb9d..2ce751b468c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/TestSCMMXBean.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/TestSCMMXBean.java @@ -25,6 +25,7 @@ import org.apache.hadoop.ozone.MiniOzoneCluster; import org.apache.hadoop.conf.OzoneConfiguration; import org.apache.hadoop.ozone.OzoneConsts; +import org.apache.hadoop.ozone.scm.container.placement.metrics.ContainerStat; import org.apache.hadoop.ozone.scm.node.NodeManager; import org.junit.BeforeClass; import org.junit.AfterClass; @@ -40,6 +41,7 @@ import java.lang.management.ManagementFactory; import java.util.Map; import java.util.Iterator; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeoutException; import javax.management.openmbean.CompositeData; @@ -91,6 +93,24 @@ public void testSCMMXBean() throws Exception { String clientRpcPort = (String)mbs.getAttribute(bean, "ClientRpcPort"); assertEquals(scm.getClientRpcPort(), clientRpcPort); + + ConcurrentMap map = scm.getContainerReportCache(); + ContainerStat stat = new ContainerStat(1, 2, 3, 4, 5, 6, 7); + map.put("nodeID", stat); + TabularData data = (TabularData) mbs.getAttribute( + bean, "ContainerReport"); + + // verify report info + assertEquals(1, data.values().size()); + for (Object obj : data.values()) { + assertTrue(obj instanceof CompositeData); + CompositeData d = (CompositeData) obj; + Iterator it = d.values().iterator(); + String key = it.next().toString(); + String value = it.next().toString(); + assertEquals("nodeID", key); + assertEquals(stat.toJsonString(), value); + } } @Test diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/TestSCMMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/TestSCMMetrics.java index 762852bc219..5eeb0aa67d7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/TestSCMMetrics.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/TestSCMMetrics.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.ozone.scm; +import static org.apache.hadoop.test.MetricsAsserts.getLongCounter; import static org.apache.hadoop.test.MetricsAsserts.getLongGauge; import static org.apache.hadoop.test.MetricsAsserts.getMetrics; import static org.junit.Assert.assertEquals; @@ -25,9 +26,10 @@ import org.apache.commons.codec.digest.DigestUtils; import org.apache.hadoop.conf.OzoneConfiguration; +import org.apache.hadoop.hdfs.protocol.DatanodeID; +import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.metrics2.MetricsRecordBuilder; import org.apache.hadoop.ozone.MiniOzoneClassicCluster; -import org.apache.hadoop.ozone.MiniOzoneCluster; import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.ozone.container.common.SCMTestUtils; import org.apache.hadoop.ozone.container.common.helpers.ContainerReport; @@ -35,13 +37,23 @@ import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto; import org.apache.hadoop.ozone.scm.container.placement.metrics.ContainerStat; import org.apache.hadoop.ozone.scm.container.placement.metrics.SCMMetrics; +import org.apache.hadoop.ozone.scm.node.SCMNodeManager; +import org.apache.hadoop.test.GenericTestUtils; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.Timeout; /** * This class tests the metrics of Storage Container Manager. */ public class TestSCMMetrics { - private static MiniOzoneCluster cluster = null; + /** + * Set the timeout for each test. + */ + @Rule + public Timeout testTimeout = new Timeout(90000); + + private static MiniOzoneClassicCluster cluster = null; @Test public void testContainerMetrics() throws Exception { @@ -64,7 +76,11 @@ public void testContainerMetrics() throws Exception { ContainerStat stat = new ContainerStat(size, used, keyCount, readBytes, writeBytes, readCount, writeCount); StorageContainerManager scmManager = cluster.getStorageContainerManager(); - scmManager.sendContainerReport(createContainerReport(numReport, stat)); + + ContainerReportsRequestProto request = createContainerReport(numReport, + stat, null); + String fstDatanodeID = request.getDatanodeID().getDatanodeUuid(); + scmManager.sendContainerReport(request); // verify container stat metrics MetricsRecordBuilder scmMetrics = getMetrics(SCMMetrics.SOURCE_NAME); @@ -83,6 +99,117 @@ public void testContainerMetrics() throws Exception { getLongGauge("LastContainerReportReadCount", scmMetrics)); assertEquals(writeCount * numReport, getLongGauge("LastContainerReportWriteCount", scmMetrics)); + + // add one new report + request = createContainerReport(1, stat, null); + String sndDatanodeID = request.getDatanodeID().getDatanodeUuid(); + scmManager.sendContainerReport(request); + + scmMetrics = getMetrics(SCMMetrics.SOURCE_NAME); + assertEquals(size * (numReport + 1), + getLongCounter("ContainerReportSize", scmMetrics)); + assertEquals(used * (numReport + 1), + getLongCounter("ContainerReportUsed", scmMetrics)); + assertEquals(readBytes * (numReport + 1), + getLongCounter("ContainerReportReadBytes", scmMetrics)); + assertEquals(writeBytes * (numReport + 1), + getLongCounter("ContainerReportWriteBytes", scmMetrics)); + + assertEquals(keyCount * (numReport + 1), + getLongCounter("ContainerReportKeyCount", scmMetrics)); + assertEquals(readCount * (numReport + 1), + getLongCounter("ContainerReportReadCount", scmMetrics)); + assertEquals(writeCount * (numReport + 1), + getLongCounter("ContainerReportWriteCount", scmMetrics)); + + // Re-send reports but with different value for validating + // the aggregation. + stat = new ContainerStat(100, 50, 3, 50, 60, 5, 6); + scmManager.sendContainerReport(createContainerReport(1, stat, + fstDatanodeID)); + + stat = new ContainerStat(1, 1, 1, 1, 1, 1, 1); + scmManager.sendContainerReport(createContainerReport(1, stat, + sndDatanodeID)); + + // the global container metrics value should be updated + scmMetrics = getMetrics(SCMMetrics.SOURCE_NAME); + assertEquals(101, getLongCounter("ContainerReportSize", scmMetrics)); + assertEquals(51, getLongCounter("ContainerReportUsed", scmMetrics)); + assertEquals(51, getLongCounter("ContainerReportReadBytes", scmMetrics)); + assertEquals(61, getLongCounter("ContainerReportWriteBytes", scmMetrics)); + + assertEquals(4, getLongCounter("ContainerReportKeyCount", scmMetrics)); + assertEquals(6, getLongCounter("ContainerReportReadCount", scmMetrics)); + assertEquals(7, getLongCounter("ContainerReportWriteCount", scmMetrics)); + } finally { + if (cluster != null) { + cluster.shutdown(); + } + } + } + + @Test + public void testStaleNodeContainerReport() throws Exception { + int nodeCount = 2; + int numReport = 2; + long size = OzoneConsts.GB * 5; + long used = OzoneConsts.GB * 2; + long readBytes = OzoneConsts.GB * 1; + long writeBytes = OzoneConsts.GB * 2; + int keyCount = 1000; + int readCount = 100; + int writeCount = 50; + OzoneConfiguration conf = new OzoneConfiguration(); + + try { + cluster = new MiniOzoneClassicCluster.Builder(conf) + .setHandlerType(OzoneConsts.OZONE_HANDLER_DISTRIBUTED) + .numDataNodes(nodeCount).build(); + + ContainerStat stat = new ContainerStat(size, used, keyCount, readBytes, + writeBytes, readCount, writeCount); + StorageContainerManager scmManager = cluster.getStorageContainerManager(); + + DataNode dataNode = cluster.getDataNodes().get(0); + String datanodeUuid = dataNode.getDatanodeId().getDatanodeUuid(); + ContainerReportsRequestProto request = createContainerReport(numReport, + stat, datanodeUuid); + scmManager.sendContainerReport(request); + + MetricsRecordBuilder scmMetrics = getMetrics(SCMMetrics.SOURCE_NAME); + assertEquals(size * numReport, + getLongCounter("ContainerReportSize", scmMetrics)); + assertEquals(used * numReport, + getLongCounter("ContainerReportUsed", scmMetrics)); + assertEquals(readBytes * numReport, + getLongCounter("ContainerReportReadBytes", scmMetrics)); + assertEquals(writeBytes * numReport, + getLongCounter("ContainerReportWriteBytes", scmMetrics)); + + assertEquals(keyCount * numReport, + getLongCounter("ContainerReportKeyCount", scmMetrics)); + assertEquals(readCount * numReport, + getLongCounter("ContainerReportReadCount", scmMetrics)); + assertEquals(writeCount * numReport, + getLongCounter("ContainerReportWriteCount", scmMetrics)); + + // reset stale interval time to move node from healthy to stale + SCMNodeManager nodeManager = (SCMNodeManager) cluster + .getStorageContainerManager().getScmNodeManager(); + nodeManager.setStaleNodeIntervalMs(100); + + // verify the metrics when node becomes stale + GenericTestUtils.waitFor(() -> { + MetricsRecordBuilder metrics = getMetrics(SCMMetrics.SOURCE_NAME); + return 0 == getLongCounter("ContainerReportSize", metrics) + && 0 == getLongCounter("ContainerReportUsed", metrics) + && 0 == getLongCounter("ContainerReportReadBytes", metrics) + && 0 == getLongCounter("ContainerReportWriteBytes", metrics) + && 0 == getLongCounter("ContainerReportKeyCount", metrics) + && 0 == getLongCounter("ContainerReportReadCount", metrics) + && 0 == getLongCounter("ContainerReportWriteCount", metrics); + }, 1000, 60000); } finally { if (cluster != null) { cluster.shutdown(); @@ -91,7 +218,7 @@ public void testContainerMetrics() throws Exception { } private ContainerReportsRequestProto createContainerReport(int numReport, - ContainerStat stat) { + ContainerStat stat, String datanodeUuid) { StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto.Builder reportsBuilder = StorageContainerDatanodeProtocolProtos .ContainerReportsRequestProto.newBuilder(); @@ -108,8 +235,15 @@ private ContainerReportsRequestProto createContainerReport(int numReport, report.setWriteBytes(stat.getWriteBytes().get()); reportsBuilder.addReports(report.getProtoBufMessage()); } - reportsBuilder.setDatanodeID(SCMTestUtils.getDatanodeID() - .getProtoBufMessage()); + + DatanodeID datanodeID; + if (datanodeUuid == null) { + datanodeID = SCMTestUtils.getDatanodeID(); + } else { + datanodeID = new DatanodeID("null", "null", datanodeUuid, 0, 0, 0, 0); + } + + reportsBuilder.setDatanodeID(datanodeID.getProtoBufMessage()); reportsBuilder.setType(StorageContainerDatanodeProtocolProtos .ContainerReportsRequestProto.reportType.fullReport); return reportsBuilder.build(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/node/TestContainerPlacement.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/node/TestContainerPlacement.java index 42a56276f14..e08cf273ab5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/node/TestContainerPlacement.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/node/TestContainerPlacement.java @@ -93,7 +93,7 @@ OzoneConfiguration getConf() { SCMNodeManager createNodeManager(OzoneConfiguration config) throws IOException { SCMNodeManager nodeManager = new SCMNodeManager(config, - UUID.randomUUID().toString()); + UUID.randomUUID().toString(), null); assertFalse("Node manager should be in chill mode", nodeManager.isOutOfChillMode()); return nodeManager; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/node/TestNodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/node/TestNodeManager.java index f81e6e6a291..58a6af382a7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/node/TestNodeManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/node/TestNodeManager.java @@ -125,7 +125,7 @@ OzoneConfiguration getConf() { SCMNodeManager createNodeManager(OzoneConfiguration config) throws IOException { SCMNodeManager nodeManager = new SCMNodeManager(config, - UUID.randomUUID().toString()); + UUID.randomUUID().toString(), null); assertFalse("Node manager should be in chill mode", nodeManager.isOutOfChillMode()); return nodeManager;