HDDS-304. Process ContainerAction from datanode heartbeat in SCM. Contributed by Nanda Kumar.

This commit is contained in:
Mukul Kumar Singh 2018-08-02 17:34:17 +05:30
parent 97870ec1f6
commit 7c368575a3
5 changed files with 168 additions and 1 deletions

View File

@ -0,0 +1,60 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdds.scm.container;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerAction;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
.ContainerActionsFromDatanode;
import org.apache.hadoop.hdds.server.events.EventHandler;
import org.apache.hadoop.hdds.server.events.EventPublisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Handles container reports from datanode.
*/
public class ContainerActionsHandler implements
EventHandler<ContainerActionsFromDatanode> {
private static final Logger LOG = LoggerFactory.getLogger(
ContainerActionsHandler.class);
@Override
public void onMessage(
ContainerActionsFromDatanode containerReportFromDatanode,
EventPublisher publisher) {
DatanodeDetails dd = containerReportFromDatanode.getDatanodeDetails();
for (ContainerAction action : containerReportFromDatanode.getReport()
.getContainerActionsList()) {
ContainerID containerId = ContainerID.valueof(action.getContainerID());
switch (action.getAction()) {
case CLOSE:
LOG.debug("Closing container {} in datanode {} because the" +
" container is {}.", containerId, dd, action.getReason());
publisher.fireEvent(SCMEvents.CLOSE_CONTAINER, containerId);
break;
default:
LOG.warn("Invalid action {} with reason {}, from datanode {}. ",
action.getAction(), action.getReason(), dd); }
}
}
}

View File

@ -20,8 +20,15 @@
package org.apache.hadoop.hdds.scm.events; package org.apache.hadoop.hdds.scm.events;
import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler.*; import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler
.CloseContainerStatus;
import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler
.DeleteBlockCommandStatus;
import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler
.ReplicationStatus;
import org.apache.hadoop.hdds.scm.container.ContainerID; import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
.ContainerActionsFromDatanode;
import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
.CommandStatusReportFromDatanode; .CommandStatusReportFromDatanode;
import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
@ -56,6 +63,13 @@ public final class SCMEvents {
public static final TypedEvent<ContainerReportFromDatanode> CONTAINER_REPORT = public static final TypedEvent<ContainerReportFromDatanode> CONTAINER_REPORT =
new TypedEvent<>(ContainerReportFromDatanode.class, "Container_Report"); new TypedEvent<>(ContainerReportFromDatanode.class, "Container_Report");
/**
* ContainerActions are sent by Datanode. This event is received by
* SCMDatanodeHeartbeatDispatcher and CONTAINER_ACTIONS event is generated.
*/
public static final TypedEvent<ContainerActionsFromDatanode>
CONTAINER_ACTIONS = new TypedEvent<>(ContainerActionsFromDatanode.class,
"Container_Actions");
/** /**
* A Command status report will be sent by datanodes. This repoort is received * A Command status report will be sent by datanodes. This repoort is received
* by SCMDatanodeHeartbeatDispatcher and CommandReport event is generated. * by SCMDatanodeHeartbeatDispatcher and CommandReport event is generated.

View File

@ -19,6 +19,8 @@ package org.apache.hadoop.hdds.scm.server;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerActionsProto;
import org.apache.hadoop.hdds.protocol.proto. import org.apache.hadoop.hdds.protocol.proto.
StorageContainerDatanodeProtocolProtos.CommandStatusReportsProto; StorageContainerDatanodeProtocolProtos.CommandStatusReportsProto;
import org.apache.hadoop.hdds.protocol.proto import org.apache.hadoop.hdds.protocol.proto
@ -37,6 +39,7 @@ import org.slf4j.LoggerFactory;
import java.util.List; import java.util.List;
import static org.apache.hadoop.hdds.scm.events.SCMEvents.CONTAINER_ACTIONS;
import static org.apache.hadoop.hdds.scm.events.SCMEvents.CONTAINER_REPORT; import static org.apache.hadoop.hdds.scm.events.SCMEvents.CONTAINER_REPORT;
import static org.apache.hadoop.hdds.scm.events.SCMEvents.NODE_REPORT; import static org.apache.hadoop.hdds.scm.events.SCMEvents.NODE_REPORT;
import static org.apache.hadoop.hdds.scm.events.SCMEvents.CMD_STATUS_REPORT; import static org.apache.hadoop.hdds.scm.events.SCMEvents.CMD_STATUS_REPORT;
@ -89,6 +92,13 @@ public final class SCMDatanodeHeartbeatDispatcher {
} }
if (heartbeat.hasContainerActions()) {
LOG.debug("Dispatching Container Actions.");
eventPublisher.fireEvent(CONTAINER_ACTIONS,
new ContainerActionsFromDatanode(datanodeDetails,
heartbeat.getContainerActions()));
}
if (heartbeat.hasCommandStatusReport()) { if (heartbeat.hasCommandStatusReport()) {
eventPublisher.fireEvent(CMD_STATUS_REPORT, eventPublisher.fireEvent(CMD_STATUS_REPORT,
new CommandStatusReportFromDatanode(datanodeDetails, new CommandStatusReportFromDatanode(datanodeDetails,
@ -145,6 +155,18 @@ public final class SCMDatanodeHeartbeatDispatcher {
} }
} }
/**
* Container action event payload with origin.
*/
public static class ContainerActionsFromDatanode
extends ReportFromDatanode<ContainerActionsProto> {
public ContainerActionsFromDatanode(DatanodeDetails datanodeDetails,
ContainerActionsProto actions) {
super(datanodeDetails, actions);
}
}
/** /**
* Container report event payload with origin. * Container report event payload with origin.
*/ */

View File

@ -36,6 +36,7 @@ import org.apache.hadoop.hdds.scm.block.BlockManager;
import org.apache.hadoop.hdds.scm.block.BlockManagerImpl; import org.apache.hadoop.hdds.scm.block.BlockManagerImpl;
import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler; import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler;
import org.apache.hadoop.hdds.scm.container.CloseContainerEventHandler; import org.apache.hadoop.hdds.scm.container.CloseContainerEventHandler;
import org.apache.hadoop.hdds.scm.container.ContainerActionsHandler;
import org.apache.hadoop.hdds.scm.container.ContainerMapping; import org.apache.hadoop.hdds.scm.container.ContainerMapping;
import org.apache.hadoop.hdds.scm.container.ContainerReportHandler; import org.apache.hadoop.hdds.scm.container.ContainerReportHandler;
import org.apache.hadoop.hdds.scm.container.Mapping; import org.apache.hadoop.hdds.scm.container.Mapping;
@ -209,10 +210,12 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
NewNodeHandler newNodeHandler = new NewNodeHandler(node2ContainerMap); NewNodeHandler newNodeHandler = new NewNodeHandler(node2ContainerMap);
StaleNodeHandler staleNodeHandler = new StaleNodeHandler(node2ContainerMap); StaleNodeHandler staleNodeHandler = new StaleNodeHandler(node2ContainerMap);
DeadNodeHandler deadNodeHandler = new DeadNodeHandler(node2ContainerMap); DeadNodeHandler deadNodeHandler = new DeadNodeHandler(node2ContainerMap);
ContainerActionsHandler actionsHandler = new ContainerActionsHandler();
eventQueue.addHandler(SCMEvents.DATANODE_COMMAND, scmNodeManager); eventQueue.addHandler(SCMEvents.DATANODE_COMMAND, scmNodeManager);
eventQueue.addHandler(SCMEvents.NODE_REPORT, nodeReportHandler); eventQueue.addHandler(SCMEvents.NODE_REPORT, nodeReportHandler);
eventQueue.addHandler(SCMEvents.CONTAINER_REPORT, containerReportHandler); eventQueue.addHandler(SCMEvents.CONTAINER_REPORT, containerReportHandler);
eventQueue.addHandler(SCMEvents.CONTAINER_ACTIONS, actionsHandler);
eventQueue.addHandler(SCMEvents.CLOSE_CONTAINER, closeContainerHandler); eventQueue.addHandler(SCMEvents.CLOSE_CONTAINER, closeContainerHandler);
eventQueue.addHandler(SCMEvents.NEW_NODE, newNodeHandler); eventQueue.addHandler(SCMEvents.NEW_NODE, newNodeHandler);
eventQueue.addHandler(SCMEvents.STALE_NODE, staleNodeHandler); eventQueue.addHandler(SCMEvents.STALE_NODE, staleNodeHandler);

View File

@ -0,0 +1,68 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdds.scm.container;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerActionsProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerAction;
import org.apache.hadoop.hdds.scm.TestUtils;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.ContainerActionsFromDatanode;
import org.apache.hadoop.hdds.server.events.EventQueue;
import org.junit.Test;
import org.mockito.Mockito;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
/**
* Tests ContainerActionsHandler.
*/
public class TestContainerActionsHandler {
@Test
public void testCloseContainerAction() {
EventQueue queue = new EventQueue();
ContainerActionsHandler actionsHandler = new ContainerActionsHandler();
CloseContainerEventHandler closeContainerEventHandler = Mockito.mock(
CloseContainerEventHandler.class);
queue.addHandler(SCMEvents.CLOSE_CONTAINER, closeContainerEventHandler);
queue.addHandler(SCMEvents.CONTAINER_ACTIONS, actionsHandler);
ContainerAction action = ContainerAction.newBuilder()
.setContainerID(1L)
.setAction(ContainerAction.Action.CLOSE)
.setReason(ContainerAction.Reason.CONTAINER_FULL)
.build();
ContainerActionsProto cap = ContainerActionsProto.newBuilder()
.addContainerActions(action)
.build();
ContainerActionsFromDatanode containerActions =
new ContainerActionsFromDatanode(
TestUtils.randomDatanodeDetails(), cap);
queue.fireEvent(SCMEvents.CONTAINER_ACTIONS, containerActions);
verify(closeContainerEventHandler, times(1))
.onMessage(ContainerID.valueof(1L), queue);
}
}