diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java index 5e2969d3cfc..deb1628a362 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hdds.scm.node; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.NodeReportProto; import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric; import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat; import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException; @@ -138,4 +139,12 @@ public interface NodeManager extends StorageContainerNodeProtocol, * @param command */ void addDatanodeCommand(UUID dnId, SCMCommand command); + + /** + * Process node report. + * + * @param dnUuid + * @param nodeReport + */ + void processNodeReport(UUID dnUuid, NodeReportProto nodeReport); } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeReportHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeReportHandler.java index aa78d53cfd7..331bfed1ab3 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeReportHandler.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeReportHandler.java @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -18,25 +18,38 @@ package org.apache.hadoop.hdds.scm.node; +import com.google.common.base.Preconditions; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher .NodeReportFromDatanode; import org.apache.hadoop.hdds.server.events.EventHandler; import org.apache.hadoop.hdds.server.events.EventPublisher; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Handles Node Reports from datanode. */ public class NodeReportHandler implements EventHandler { + private static final Logger LOGGER = LoggerFactory + .getLogger(NodeReportHandler.class); private final NodeManager nodeManager; public NodeReportHandler(NodeManager nodeManager) { + Preconditions.checkNotNull(nodeManager); this.nodeManager = nodeManager; } @Override public void onMessage(NodeReportFromDatanode nodeReportFromDatanode, - EventPublisher publisher) { - //TODO: process node report. + EventPublisher publisher) { + Preconditions.checkNotNull(nodeReportFromDatanode); + DatanodeDetails dn = nodeReportFromDatanode.getDatanodeDetails(); + Preconditions.checkNotNull(dn, "NodeReport is " + + "missing DatanodeDetails."); + LOGGER.trace("Processing node report for dn: {}", dn); + nodeManager + .processNodeReport(dn.getUuid(), nodeReportFromDatanode.getReport()); } } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java index 2ba8067a05b..7370b077db7 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java @@ -422,6 +422,17 @@ public List processHeartbeat(DatanodeDetails datanodeDetails) { return commandQueue.getCommand(datanodeDetails.getUuid()); } + /** + * Process node report. + * + * @param dnUuid + * @param nodeReport + */ + @Override + public void processNodeReport(UUID dnUuid, NodeReportProto nodeReport) { + this.updateNodeStat(dnUuid, nodeReport); + } + /** * Returns the aggregated node stats. * @return the aggregated node stats. diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java index 5e83c288ed9..593b7803224 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java @@ -295,6 +295,17 @@ public void addDatanodeCommand(UUID dnId, SCMCommand command) { } } + /** + * Empty implementation for processNodeReport. + * + * @param dnUuid + * @param nodeReport + */ + @Override + public void processNodeReport(UUID dnUuid, NodeReportProto nodeReport) { + // do nothing + } + // Returns the number of commands that is queued to this node manager. public int getCommandCount(DatanodeDetails dd) { List list = commandMap.get(dd.getUuid()); diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeReportHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeReportHandler.java new file mode 100644 index 00000000000..3cbde4b6455 --- /dev/null +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeReportHandler.java @@ -0,0 +1,95 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.hdds.scm.node; + +import java.io.IOException; +import java.util.List; +import java.util.UUID; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.NodeReportProto; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.StorageReportProto; +import org.apache.hadoop.hdds.scm.TestUtils; +import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric; +import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.NodeReportFromDatanode; +import org.apache.hadoop.hdds.server.events.Event; +import org.apache.hadoop.hdds.server.events.EventPublisher; +import org.apache.hadoop.hdds.server.events.EventQueue; +import org.apache.hadoop.test.GenericTestUtils; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class TestNodeReportHandler implements EventPublisher { + + private static Logger LOG = LoggerFactory + .getLogger(TestNodeReportHandler.class); + private NodeReportHandler nodeReportHandler; + private SCMNodeManager nodeManager; + private String storagePath = GenericTestUtils.getRandomizedTempPath() + .concat("/" + UUID.randomUUID().toString()); + ; + + @Before + public void resetEventCollector() throws IOException { + OzoneConfiguration conf = new OzoneConfiguration(); + nodeManager = new SCMNodeManager(conf, "cluster1", null, new EventQueue()); + nodeReportHandler = new NodeReportHandler(nodeManager); + } + + @Test + public void testNodeReport() throws IOException { + DatanodeDetails dn = TestUtils.getDatanodeDetails(); + List reports = + TestUtils.createStorageReport(100, 10, 90, storagePath, null, + dn.getUuid().toString(), 1); + + nodeReportHandler.onMessage( + getNodeReport(dn, reports), this); + SCMNodeMetric nodeMetric = nodeManager.getNodeStat(dn); + + Assert.assertTrue(nodeMetric.get().getCapacity().get() == 100); + Assert.assertTrue(nodeMetric.get().getRemaining().get() == 90); + Assert.assertTrue(nodeMetric.get().getScmUsed().get() == 10); + + reports = + TestUtils.createStorageReport(100, 10, 90, storagePath, null, + dn.getUuid().toString(), 2); + nodeReportHandler.onMessage( + getNodeReport(dn, reports), this); + nodeMetric = nodeManager.getNodeStat(dn); + + Assert.assertTrue(nodeMetric.get().getCapacity().get() == 200); + Assert.assertTrue(nodeMetric.get().getRemaining().get() == 180); + Assert.assertTrue(nodeMetric.get().getScmUsed().get() == 20); + + } + + private NodeReportFromDatanode getNodeReport(DatanodeDetails dn, + List reports) { + NodeReportProto nodeReportProto = TestUtils.createNodeReport(reports); + return new NodeReportFromDatanode(dn, nodeReportProto); + } + + @Override + public > void fireEvent( + EVENT_TYPE event, PAYLOAD payload) { + LOG.info("Event is published: {}", payload); + } +} diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java index 2d27d7143ee..a0249aaa0e7 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java @@ -289,6 +289,16 @@ public void addDatanodeCommand(UUID dnId, SCMCommand command) { this.commandQueue.addCommand(dnId, command); } + /** + * Empty implementation for processNodeReport. + * @param dnUuid + * @param nodeReport + */ + @Override + public void processNodeReport(UUID dnUuid, NodeReportProto nodeReport) { + // do nothing. + } + @Override public void onMessage(CommandForDatanode commandForDatanode, EventPublisher publisher) {