diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/RegisterEndpointTask.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/RegisterEndpointTask.java index 690aa015b14..acd4af978e1 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/RegisterEndpointTask.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/RegisterEndpointTask.java @@ -42,7 +42,7 @@ import java.util.concurrent.Future; /** - * Register a container with SCM. + * Register a datanode with SCM. */ public final class RegisterEndpointTask implements Callable { diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerNodeProtocol.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerNodeProtocol.java index b3c3eb359ec..b5d75ef01cb 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerNodeProtocol.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerNodeProtocol.java @@ -67,4 +67,12 @@ RegisteredCommand register(DatanodeDetails datanodeDetails, */ List processHeartbeat(DatanodeDetails datanodeDetails); + /** + * Check if node is registered or not. + * Return true if Node is registered and false otherwise. + * @param datanodeDetails - Datanode ID. + * @return true if Node is registered, false otherwise + */ + Boolean isNodeRegistered(DatanodeDetails datanodeDetails); + } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java index 585d545c8e0..5f6a2e401d6 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java @@ -196,4 +196,11 @@ void addDatanodeInContainerMap(UUID uuid, Set containerIDs) * @param dnUuid datanode uuid. */ void processDeadNode(UUID dnUuid); + + /** + * Get list of SCMCommands in the Command Queue for a particular Datanode. + * @param dnID - Datanode uuid. + * @return list of commands + */ + List getCommandQueue(UUID dnID); } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java index 4a341365926..acec6aa3fc8 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java @@ -52,7 +52,6 @@ import org.apache.hadoop.ozone.protocol.VersionResponse; import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode; import org.apache.hadoop.ozone.protocol.commands.RegisteredCommand; -import org.apache.hadoop.ozone.protocol.commands.ReregisterCommand; import org.apache.hadoop.ozone.protocol.commands.SCMCommand; import org.slf4j.Logger; @@ -196,7 +195,7 @@ private void updateNodeStat(UUID dnId, NodeReportProto nodeReport) { try { stat = nodeStateManager.getNodeStat(dnId); } catch (NodeNotFoundException e) { - LOG.debug("SCM updateNodeStat based on heartbeat from previous" + + LOG.debug("SCM updateNodeStat based on heartbeat from previous " + "dead datanode {}", dnId); stat = new SCMNodeStat(); } @@ -277,7 +276,7 @@ public RegisteredCommand register( nodeStateManager.setNodeStat(dnId, new SCMNodeStat()); // Updating Node Report, as registration is successful updateNodeStat(datanodeDetails.getUuid(), nodeReport); - LOG.info("Data node with ID: {} Registered.", datanodeDetails.getUuid()); + LOG.info("Registered Data node : {}", datanodeDetails); } catch (NodeAlreadyExistsException e) { LOG.trace("Datanode is already registered. Datanode: {}", datanodeDetails.toString()); @@ -304,14 +303,22 @@ public List processHeartbeat(DatanodeDetails datanodeDetails) { try { nodeStateManager.updateLastHeartbeatTime(datanodeDetails); } catch (NodeNotFoundException e) { - LOG.warn("SCM receive heartbeat from unregistered datanode {}", - datanodeDetails); - commandQueue.addCommand(datanodeDetails.getUuid(), - new ReregisterCommand()); + LOG.error("SCM trying to process heartbeat from an " + + "unregistered node {}. Ignoring the heartbeat.", datanodeDetails); } return commandQueue.getCommand(datanodeDetails.getUuid()); } + @Override + public Boolean isNodeRegistered(DatanodeDetails datanodeDetails) { + try { + nodeStateManager.getNode(datanodeDetails); + return true; + } catch (NodeNotFoundException e) { + return false; + } + } + /** * Process node report. * @@ -487,4 +494,9 @@ public void processDeadNode(UUID dnUuid) { + " doesn't exist or decommissioned already.", dnUuid); } } + + @Override + public List getCommandQueue(UUID dnID) { + return commandQueue.getCommand(dnID); + } } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java index d9a0875385b..7c7df27acd2 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java @@ -35,13 +35,15 @@ .StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto; import org.apache.hadoop.hdds.scm.node.NodeManager; import org.apache.hadoop.hdds.server.events.EventPublisher; +import org.apache.hadoop.ozone.protocol.commands.ReregisterCommand; +import org.apache.hadoop.ozone.protocol.commands.SCMCommand; import com.google.protobuf.GeneratedMessage; -import org.apache.hadoop.ozone.protocol.commands.SCMCommand; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.List; +import java.util.UUID; import static org.apache.hadoop.hdds.scm.events.SCMEvents.CONTAINER_ACTIONS; import static org.apache.hadoop.hdds.scm.events.SCMEvents.CONTAINER_REPORT; @@ -82,51 +84,78 @@ public SCMDatanodeHeartbeatDispatcher(NodeManager nodeManager, public List dispatch(SCMHeartbeatRequestProto heartbeat) { DatanodeDetails datanodeDetails = DatanodeDetails.getFromProtoBuf(heartbeat.getDatanodeDetails()); - // should we dispatch heartbeat through eventPublisher? - List commands = nodeManager.processHeartbeat(datanodeDetails); - if (heartbeat.hasNodeReport()) { - LOG.debug("Dispatching Node Report."); - eventPublisher.fireEvent(NODE_REPORT, - new NodeReportFromDatanode(datanodeDetails, - heartbeat.getNodeReport())); - } + List commands; - if (heartbeat.hasContainerReport()) { - LOG.debug("Dispatching Container Report."); - eventPublisher.fireEvent(CONTAINER_REPORT, - new ContainerReportFromDatanode(datanodeDetails, - heartbeat.getContainerReport())); + // If node is not registered, ask the node to re-register. Do not process + // Heartbeat for unregistered nodes. + if (!nodeManager.isNodeRegistered(datanodeDetails)) { + LOG.info("SCM received heartbeat from an unregistered datanode {}. " + + "Asking datanode to re-register.", datanodeDetails); + UUID dnID = datanodeDetails.getUuid(); + nodeManager.addDatanodeCommand(dnID, new ReregisterCommand()); - } + commands = nodeManager.getCommandQueue(dnID); - if (heartbeat.hasContainerActions()) { - LOG.debug("Dispatching Container Actions."); - eventPublisher.fireEvent(CONTAINER_ACTIONS, - new ContainerActionsFromDatanode(datanodeDetails, - heartbeat.getContainerActions())); - } + } else { - if (heartbeat.hasPipelineReports()) { - LOG.debug("Dispatching Pipeline Report."); - eventPublisher.fireEvent(PIPELINE_REPORT, - new PipelineReportFromDatanode(datanodeDetails, - heartbeat.getPipelineReports())); + // should we dispatch heartbeat through eventPublisher? + commands = nodeManager.processHeartbeat(datanodeDetails); + if (heartbeat.hasNodeReport()) { + LOG.debug("Dispatching Node Report."); + eventPublisher.fireEvent( + NODE_REPORT, + new NodeReportFromDatanode( + datanodeDetails, + heartbeat.getNodeReport())); + } - } + if (heartbeat.hasContainerReport()) { + LOG.debug("Dispatching Container Report."); + eventPublisher.fireEvent( + CONTAINER_REPORT, + new ContainerReportFromDatanode( + datanodeDetails, + heartbeat.getContainerReport())); - if (heartbeat.hasPipelineActions()) { - LOG.debug("Dispatching Pipeline Actions."); - eventPublisher.fireEvent(PIPELINE_ACTIONS, - new PipelineActionsFromDatanode(datanodeDetails, - heartbeat.getPipelineActions())); - } + } - if (heartbeat.getCommandStatusReportsCount() != 0) { - for (CommandStatusReportsProto commandStatusReport : heartbeat - .getCommandStatusReportsList()) { - eventPublisher.fireEvent(CMD_STATUS_REPORT, - new CommandStatusReportFromDatanode(datanodeDetails, - commandStatusReport)); + if (heartbeat.hasContainerActions()) { + LOG.debug("Dispatching Container Actions."); + eventPublisher.fireEvent( + CONTAINER_ACTIONS, + new ContainerActionsFromDatanode( + datanodeDetails, + heartbeat.getContainerActions())); + } + + if (heartbeat.hasPipelineReports()) { + LOG.debug("Dispatching Pipeline Report."); + eventPublisher.fireEvent( + PIPELINE_REPORT, + new PipelineReportFromDatanode( + datanodeDetails, + heartbeat.getPipelineReports())); + + } + + if (heartbeat.hasPipelineActions()) { + LOG.debug("Dispatching Pipeline Actions."); + eventPublisher.fireEvent( + PIPELINE_ACTIONS, + new PipelineActionsFromDatanode( + datanodeDetails, + heartbeat.getPipelineActions())); + } + + if (heartbeat.getCommandStatusReportsCount() != 0) { + for (CommandStatusReportsProto commandStatusReport : heartbeat + .getCommandStatusReportsList()) { + eventPublisher.fireEvent( + CMD_STATUS_REPORT, + new CommandStatusReportFromDatanode( + datanodeDetails, + commandStatusReport)); + } } } diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java index e32f3161958..f4ce1028d01 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java @@ -407,6 +407,12 @@ public List processHeartbeat(DatanodeDetails datanodeDetails) { return null; } + @Override + public Boolean isNodeRegistered( + DatanodeDetails datanodeDetails) { + return null; + } + @Override public Map getNodeCount() { Map nodeCountMap = new HashMap(); @@ -470,6 +476,11 @@ public void processDeadNode(UUID dnUuid) { } } + @Override + public List getCommandQueue(UUID dnID) { + return null; + } + /** * A class to declare some values for the nodes so that our tests * won't fail. diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeManager.java index 32ee371b921..ed95709cf2f 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeManager.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeManager.java @@ -17,11 +17,8 @@ */ package org.apache.hadoop.hdds.scm.node; -import com.google.common.base.Supplier; import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMCommandProto; import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.scm.TestUtils; import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID; @@ -213,56 +210,6 @@ public void testScmShutdown() throws IOException, InterruptedException, //TODO: add assertion } - /** - * Asserts scm informs datanodes to re-register with the nodemanager - * on a restart. - * - * @throws Exception - */ - @Test - public void testScmHeartbeatAfterRestart() throws Exception { - OzoneConfiguration conf = getConf(); - conf.getTimeDuration(ScmConfigKeys.OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL, - 100, TimeUnit.MILLISECONDS); - DatanodeDetails datanodeDetails = TestUtils.randomDatanodeDetails(); - UUID dnId = datanodeDetails.getUuid(); - String storagePath = testDir.getAbsolutePath() + "/" + dnId; - StorageReportProto report = - TestUtils.createStorageReport(dnId, storagePath, 100, 10, 90, null); - try (SCMNodeManager nodemanager = createNodeManager(conf)) { - nodemanager.register(datanodeDetails, - TestUtils.createNodeReport(report), - TestUtils.getRandomPipelineReports()); - List command = nodemanager.processHeartbeat(datanodeDetails); - Assert.assertTrue(nodemanager.getAllNodes().contains(datanodeDetails)); - Assert.assertTrue("On regular HB calls, SCM responses a " - + "datanode with an empty command list", command.isEmpty()); - } - - // Sends heartbeat without registering to SCM. - // This happens when SCM restarts. - try (SCMNodeManager nodemanager = createNodeManager(conf)) { - Assert.assertFalse(nodemanager - .getAllNodes().contains(datanodeDetails)); - try { - // SCM handles heartbeat asynchronously. - // It may need more than one heartbeat processing to - // send the notification. - GenericTestUtils.waitFor(new Supplier() { - @Override public Boolean get() { - List command = - nodemanager.processHeartbeat(datanodeDetails); - return command.size() == 1 && command.get(0).getType() - .equals(SCMCommandProto.Type.reregisterCommand); - } - }, 100, 3 * 1000); - } catch (TimeoutException e) { - Assert.fail("Times out to verify that scm informs " - + "datanode to re-register itself."); - } - } - } - /** * Asserts that we detect as many healthy nodes as we have generated heartbeat * for. diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/TestSCMDatanodeHeartbeatDispatcher.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/TestSCMDatanodeHeartbeatDispatcher.java index f3cd4eaabad..eac8c906034 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/TestSCMDatanodeHeartbeatDispatcher.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/TestSCMDatanodeHeartbeatDispatcher.java @@ -19,6 +19,7 @@ import java.io.IOException; import java.util.concurrent.atomic.AtomicInteger; +import java.util.UUID; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto. @@ -39,6 +40,7 @@ .NodeReportFromDatanode; import org.apache.hadoop.hdds.server.events.Event; import org.apache.hadoop.hdds.server.events.EventPublisher; +import org.apache.hadoop.ozone.protocol.commands.ReregisterCommand; import org.junit.Assert; import org.junit.Test; @@ -61,8 +63,12 @@ public void testNodeReportDispatcher() throws IOException { NodeReportProto nodeReport = NodeReportProto.getDefaultInstance(); + NodeManager mockNodeManager = Mockito.mock(NodeManager.class); + Mockito.when(mockNodeManager.isNodeRegistered(Mockito.any())) + .thenReturn(true); + SCMDatanodeHeartbeatDispatcher dispatcher = - new SCMDatanodeHeartbeatDispatcher(Mockito.mock(NodeManager.class), + new SCMDatanodeHeartbeatDispatcher(mockNodeManager, new EventPublisher() { @Override public > void fireEvent( @@ -99,8 +105,13 @@ public void testContainerReportDispatcher() throws IOException { CommandStatusReportsProto commandStatusReport = CommandStatusReportsProto.getDefaultInstance(); + NodeManager mockNodeManager = Mockito.mock(NodeManager.class); + Mockito.when(mockNodeManager.isNodeRegistered(Mockito.any())) + .thenReturn(true); + SCMDatanodeHeartbeatDispatcher dispatcher = - new SCMDatanodeHeartbeatDispatcher(Mockito.mock(NodeManager.class), + new SCMDatanodeHeartbeatDispatcher( + mockNodeManager, new EventPublisher() { @Override public > void fireEvent( @@ -135,4 +146,30 @@ public > void fireEvent( } + /** + * Asserts scm informs datanodes to re-register on a restart. + * + * @throws Exception + */ + @Test + public void testScmHeartbeatAfterRestart() throws Exception { + + NodeManager mockNodeManager = Mockito.mock(NodeManager.class); + SCMDatanodeHeartbeatDispatcher dispatcher = + new SCMDatanodeHeartbeatDispatcher( + mockNodeManager, Mockito.mock(EventPublisher.class)); + + DatanodeDetails datanodeDetails = TestUtils.randomDatanodeDetails(); + + SCMHeartbeatRequestProto heartbeat = + SCMHeartbeatRequestProto.newBuilder() + .setDatanodeDetails(datanodeDetails.getProtoBufMessage()) + .build(); + + dispatcher.dispatch(heartbeat); + // If SCM receives heartbeat from a node after it restarts and the node + // is not registered, it should send a Re-Register command back to the node. + Mockito.verify(mockNodeManager, Mockito.times(1)).addDatanodeCommand( + Mockito.any(UUID.class), Mockito.any(ReregisterCommand.class)); + } } diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java index 6584aec2852..069f1af1b7b 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java @@ -295,6 +295,12 @@ public List processHeartbeat(DatanodeDetails dd) { return null; } + @Override + public Boolean isNodeRegistered( + DatanodeDetails datanodeDetails) { + return null; + } + /** * Clears all nodes from the node Manager. */ @@ -341,4 +347,9 @@ public void onMessage(CommandForDatanode commandForDatanode, public void processDeadNode(UUID dnUuid) { // do nothing. } + + @Override + public List getCommandQueue(UUID dnID) { + return null; + } }