diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
index 625cdd1f6c8..bbba2b7f650 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
@@ -243,6 +243,7 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
           .setPipeline(pipeline);
       LOG.trace("New block allocated : {} Container ID: {}", localID,
           containerID);
+      pipelineManager.incNumBlocksAllocatedMetric(pipeline.getId());
       return abb.build();
     } catch (PipelineNotFoundException ex) {
       LOG.error("Pipeline Machine count is zero.", ex);
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
index 2793647b7f4..70bd64c8a48 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
@@ -75,4 +75,6 @@ public interface PipelineManager extends Closeable, PipelineManagerMXBean {
   void startPipelineCreator();
 
   void triggerPipelineCreation();
+
+  void incNumBlocksAllocatedMetric(PipelineID id);
 }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
index bce396b6a56..6660f47dbfa 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
@@ -152,6 +152,7 @@ public class SCMPipelineManager implements PipelineManager {
       stateManager.addPipeline(pipeline);
       nodeManager.addPipeline(pipeline);
       metrics.incNumPipelineCreated();
+      metrics.createPerPipelineMetrics(pipeline);
       return pipeline;
     } catch (InsufficientDatanodesException idEx) {
       throw idEx;
@@ -285,7 +286,8 @@ public class SCMPipelineManager implements PipelineManager {
   public void openPipeline(PipelineID pipelineId) throws IOException {
     lock.writeLock().lock();
     try {
-      stateManager.openPipeline(pipelineId);
+      Pipeline pipeline = stateManager.openPipeline(pipelineId);
+      metrics.createPerPipelineMetrics(pipeline);
     } finally {
       lock.writeLock().unlock();
     }
@@ -362,6 +364,7 @@ public class SCMPipelineManager implements PipelineManager {
       for (ContainerID containerID : containerIDs) {
        eventPublisher.fireEvent(SCMEvents.CLOSE_CONTAINER, containerID);
       }
+      metrics.removePipelineMetrics(pipelineId);
     } finally {
       lock.writeLock().unlock();
     }
@@ -402,6 +405,11 @@ public class SCMPipelineManager implements PipelineManager {
     }
   }
 
+  @Override
+  public void incNumBlocksAllocatedMetric(PipelineID id) {
+    metrics.incNumBlocksAllocated(id);
+  }
+
   @Override
   public void close() throws IOException {
     if (scheduler != null) {
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineMetrics.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineMetrics.java
index d1ae90e3e41..d0f7f6ef3be 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineMetrics.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineMetrics.java
@@ -19,31 +19,46 @@
 package org.apache.hadoop.hdds.scm.pipeline;
 
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.metrics2.MetricsCollector;
+import org.apache.hadoop.metrics2.MetricsRecordBuilder;
+import org.apache.hadoop.metrics2.MetricsSource;
 import org.apache.hadoop.metrics2.MetricsSystem;
 import org.apache.hadoop.metrics2.annotation.Metric;
 import org.apache.hadoop.metrics2.annotation.Metrics;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.lib.Interns;
+import org.apache.hadoop.metrics2.lib.MetricsRegistry;
 import org.apache.hadoop.metrics2.lib.MutableCounterLong;
 
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+
 /**
  * This class maintains Pipeline related metrics.
  */
 @InterfaceAudience.Private
 @Metrics(about = "SCM PipelineManager Metrics", context = "ozone")
-public final class SCMPipelineMetrics {
+public final class SCMPipelineMetrics implements MetricsSource {
 
   private static final String SOURCE_NAME =
       SCMPipelineMetrics.class.getSimpleName();
 
+  private MetricsRegistry registry;
+
   private @Metric MutableCounterLong numPipelineCreated;
   private @Metric MutableCounterLong numPipelineCreationFailed;
   private @Metric MutableCounterLong numPipelineDestroyed;
   private @Metric MutableCounterLong numPipelineDestroyFailed;
   private @Metric MutableCounterLong numPipelineReportProcessed;
   private @Metric MutableCounterLong numPipelineReportProcessingFailed;
+  private Map<PipelineID, MutableCounterLong> numBlocksAllocated;
 
   /** Private constructor. */
-  private SCMPipelineMetrics() { }
+  private SCMPipelineMetrics() {
+    this.registry = new MetricsRegistry(SOURCE_NAME);
+    numBlocksAllocated = new ConcurrentHashMap<>();
+  }
 
   /**
    * Create and returns SCMPipelineMetrics instance.
@@ -64,6 +79,43 @@ public final class SCMPipelineMetrics {
     ms.unregisterSource(SOURCE_NAME);
   }
 
+  @Override
+  @SuppressWarnings("SuspiciousMethodCalls")
+  public void getMetrics(MetricsCollector collector, boolean all) {
+    MetricsRecordBuilder recordBuilder = collector.addRecord(SOURCE_NAME);
+    numPipelineCreated.snapshot(recordBuilder, true);
+    numPipelineCreationFailed.snapshot(recordBuilder, true);
+    numPipelineDestroyed.snapshot(recordBuilder, true);
+    numPipelineDestroyFailed.snapshot(recordBuilder, true);
+    numPipelineReportProcessed.snapshot(recordBuilder, true);
+    numPipelineReportProcessingFailed.snapshot(recordBuilder, true);
+    numBlocksAllocated
+        .forEach((pid, metric) -> metric.snapshot(recordBuilder, true));
+  }
+
+  void createPerPipelineMetrics(Pipeline pipeline) {
+    numBlocksAllocated.put(pipeline.getId(), new MutableCounterLong(Interns
+        .info(getBlockAllocationMetricName(pipeline),
+            "Number of blocks allocated in pipeline " + pipeline.getId()), 0L));
+  }
+
+  public static String getBlockAllocationMetricName(Pipeline pipeline) {
+    return "NumBlocksAllocated-" + pipeline.getType() + "-" + pipeline
+        .getFactor() + "-" + pipeline.getId().getId();
+  }
+
+  void removePipelineMetrics(PipelineID pipelineID) {
+    numBlocksAllocated.remove(pipelineID);
+  }
+
+  /**
+   * Increments number of blocks allocated for the pipeline.
+   */
+  void incNumBlocksAllocated(PipelineID pipelineID) {
+    Optional.ofNullable(numBlocksAllocated.get(pipelineID)).ifPresent(
+        MutableCounterLong::incr);
+  }
+
   /**
    * Increments number of successful pipeline creation count.
 */
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/pipeline/TestSCMPipelineMetrics.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/pipeline/TestSCMPipelineMetrics.java
index 1583952be5a..2f1ec66d694 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/pipeline/TestSCMPipelineMetrics.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/pipeline/TestSCMPipelineMetrics.java
@@ -19,6 +19,9 @@
 package org.apache.hadoop.ozone.scm.pipeline;
 
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
 import org.apache.hadoop.hdds.scm.pipeline.SCMPipelineMetrics;
@@ -90,6 +93,38 @@ public class TestSCMPipelineMetrics {
     assertCounter("NumPipelineDestroyed", 1L, metrics);
   }
 
+  @Test
+  public void testNumBlocksAllocated() throws IOException {
+    AllocatedBlock block =
+        cluster.getStorageContainerManager().getScmBlockManager()
+            .allocateBlock(5, HddsProtos.ReplicationType.RATIS,
+                HddsProtos.ReplicationFactor.ONE, "Test", new ExcludeList());
+    MetricsRecordBuilder metrics =
+        getMetrics(SCMPipelineMetrics.class.getSimpleName());
+    Pipeline pipeline = block.getPipeline();
+    long numBlocksAllocated = getLongCounter(
+        SCMPipelineMetrics.getBlockAllocationMetricName(pipeline), metrics);
+    Assert.assertEquals(1, numBlocksAllocated);
+
+    // destroy the pipeline
+    try {
+      cluster.getStorageContainerManager().getClientProtocolServer()
+          .closePipeline(pipeline.getId().getProtobuf());
+    } catch (IOException e) {
+      e.printStackTrace();
+      Assert.fail();
+    }
+    metrics = getMetrics(SCMPipelineMetrics.class.getSimpleName());
+    try {
+      getLongCounter(SCMPipelineMetrics.getBlockAllocationMetricName(pipeline),
+          metrics);
+      Assert.fail("Metric should not be present for closed pipeline.");
+    } catch (AssertionError e) {
+      Assert.assertTrue(e.getMessage().contains(
+          "Expected exactly one metric for name " + SCMPipelineMetrics
+              .getBlockAllocationMetricName(block.getPipeline())));
+    }
+  }
 
   @After
   public void teardown() {
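
Reviewer note (not part of the patch): the per-pipeline counters cannot be declared with static @Metric annotations because pipelines are created and destroyed at runtime. That is why SCMPipelineMetrics now implements MetricsSource and snapshots a ConcurrentHashMap of MutableCounterLong on every collection, using Optional.ofNullable so an increment for a pipeline without a registered counter (e.g. one not yet opened) is a no-op rather than an NPE. Below is a minimal, self-contained sketch of that dynamic-counter pattern; the class name PerKeyCounterSource and its metric names are hypothetical, only the org.apache.hadoop.metrics2 APIs are real.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.hadoop.metrics2.MetricsCollector;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.metrics2.MetricsSource;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.lib.Interns;
import org.apache.hadoop.metrics2.lib.MutableCounterLong;

/** Hypothetical example of a MetricsSource with per-key dynamic counters. */
public final class PerKeyCounterSource implements MetricsSource {

  private static final String SOURCE_NAME = "PerKeyCounterSource";

  // Concurrent map so counters can be added/removed while snapshots run,
  // mirroring numBlocksAllocated in SCMPipelineMetrics.
  private final Map<String, MutableCounterLong> counters =
      new ConcurrentHashMap<>();

  /** Register a counter for a key, mirroring createPerPipelineMetrics(). */
  void create(String key) {
    counters.put(key, new MutableCounterLong(
        Interns.info("Count-" + key, "Events observed for " + key), 0L));
  }

  /** Increment if present; ofNullable makes unknown keys a safe no-op. */
  void inc(String key) {
    Optional.ofNullable(counters.get(key)).ifPresent(MutableCounterLong::incr);
  }

  /** Drop a key's counter, mirroring removePipelineMetrics(). */
  void remove(String key) {
    counters.remove(key);
  }

  @Override
  public void getMetrics(MetricsCollector collector, boolean all) {
    // Each collection emits a snapshot of whatever counters currently exist.
    MetricsRecordBuilder rb = collector.addRecord(SOURCE_NAME);
    counters.forEach((key, counter) -> counter.snapshot(rb, true));
  }

  /** Register this source with the default metrics system. */
  public static PerKeyCounterSource register() {
    return DefaultMetricsSystem.instance().register(
        SOURCE_NAME, "Per-key counter example", new PerKeyCounterSource());
  }
}
```

With this pattern, a counter such as NumBlocksAllocated-RATIS-ONE-&lt;uuid&gt; appears while the pipeline exists and disappears once removePipelineMetrics() runs, which is exactly what testNumBlocksAllocated asserts above.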