HDDS-217. Move all SCMEvents to a package.

Contributed by Anu Engineer.
This commit is contained in:
Anu Engineer 2018-07-08 11:11:21 -07:00
parent 936e0df0d3
commit 2f51cd60ef
9 changed files with 147 additions and 49 deletions

View File

@ -24,15 +24,14 @@
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.server.events.EventHandler;
import org.apache.hadoop.hdds.server.events.EventPublisher;
import org.apache.hadoop.hdds.server.events.TypedEvent;
import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* In case of a node failure, volume failure, volume out of spapce, node
* out of space etc, CLOSE_CONTAINER_EVENT will be triggered.
* CloseContainerEventHandler is the handler for CLOSE_CONTAINER_EVENT.
* out of space etc, CLOSE_CONTAINER will be triggered.
* CloseContainerEventHandler is the handler for CLOSE_CONTAINER.
* When a close container event is fired, a close command for the container
* should be sent to all the datanodes in the pipeline and containerStateManager
* needs to update the container state to Closing.
@ -42,8 +41,6 @@ public class CloseContainerEventHandler implements EventHandler<ContainerID> {
public static final Logger LOG =
LoggerFactory.getLogger(CloseContainerEventHandler.class);
public static final TypedEvent<ContainerID> CLOSE_CONTAINER_EVENT =
new TypedEvent<>(ContainerID.class);
private final Mapping containerManager;
@ -59,7 +56,8 @@ public void onMessage(ContainerID containerID, EventPublisher publisher) {
ContainerWithPipeline containerWithPipeline = null;
ContainerInfo info;
try {
containerWithPipeline = containerManager.getContainerWithPipeline(containerID.getId());
containerWithPipeline =
containerManager.getContainerWithPipeline(containerID.getId());
info = containerWithPipeline.getContainerInfo();
if (info == null) {
LOG.info("Failed to update the container state. Container with id : {} "
@ -73,7 +71,8 @@ public void onMessage(ContainerID containerID, EventPublisher publisher) {
}
if (info.getState() == HddsProtos.LifeCycleState.OPEN) {
for (DatanodeDetails datanode : containerWithPipeline.getPipeline().getMachines()) {
for (DatanodeDetails datanode :
containerWithPipeline.getPipeline().getMachines()) {
containerManager.getNodeManager().addDatanodeCommand(datanode.getUuid(),
new CloseContainerCommand(containerID.getId(),
info.getReplicationType()));

View File

@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.hadoop.hdds.scm.events;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.ContainerReportFromDatanode;
import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.NodeReportFromDatanode;
import org.apache.hadoop.hdds.server.events.Event;
import org.apache.hadoop.hdds.server.events.TypedEvent;
import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
/**
* Class that acts as the namespace for all SCM Events.
*/
public final class SCMEvents {
/**
* NodeReports are sent out by Datanodes. This report is
* received by SCMDatanodeHeartbeatDispatcher and NodeReport Event is
* generated.
*/
public static final TypedEvent<NodeReportFromDatanode> NODE_REPORT =
new TypedEvent<>(NodeReportFromDatanode.class, "Node_Report");
/**
* ContainerReports are send out by Datanodes. This report
* is received by SCMDatanodeHeartbeatDispatcher and Container_Report Event
* i generated.
*/
public static final TypedEvent<ContainerReportFromDatanode> CONTAINER_REPORT =
new TypedEvent<>(ContainerReportFromDatanode.class, "Container_Report");
/**
* When ever a command for the Datanode needs to be issued by any component
* inside SCM, a Datanode_Command event is generated. NodeManager listens
* to these events and dispatches them to Datanode for further processing.
*/
public static final Event<CommandForDatanode> DATANODE_COMMAND =
new TypedEvent<>(CommandForDatanode.class, "Datanode_Command");
/**
* A Close Container Event can be triggered under many condition.
* Some of them are:
* 1. A Container is full, then we stop writing further information to
* that container. DN's let SCM know that current state and sends a
* informational message that allows SCM to close the container.
*
* 2. If a pipeline is open; for example Ratis; if a single node fails,
* we will proactively close these containers.
*
* Once a command is dispatched to DN, we will also listen to updates from
* the datanode which lets us know that this command completed or timed out.
*/
public static final TypedEvent<ContainerID> CLOSE_CONTAINER =
new TypedEvent<>(ContainerID.class, "Close_Container");
/**
* Private Ctor. Never Constructed.
*/
private SCMEvents() {
}
}

View File

@ -0,0 +1,23 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
/**
* Events Package contains all the Events used by SCM internally to
* communicate between different sub-systems that make up SCM.
*/
package org.apache.hadoop.hdds.scm.events;

View File

@ -25,10 +25,8 @@
import org.apache.hadoop.hdds.scm.VersionInfo;
import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric;
import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat;
import org.apache.hadoop.hdds.server.events.Event;
import org.apache.hadoop.hdds.server.events.EventHandler;
import org.apache.hadoop.hdds.server.events.EventPublisher;
import org.apache.hadoop.hdds.server.events.TypedEvent;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
@ -118,8 +116,7 @@ public class SCMNodeManager
// Node pool manager.
private final StorageContainerManager scmManager;
public static final Event<CommandForDatanode> DATANODE_COMMAND =
new TypedEvent<>(CommandForDatanode.class, "DATANODE_COMMAND");
/**
* Constructs SCM machine Manager.

View File

@ -25,12 +25,14 @@
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto;
import org.apache.hadoop.hdds.server.events.EventPublisher;
import org.apache.hadoop.hdds.server.events.TypedEvent;
import com.google.protobuf.GeneratedMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.hadoop.hdds.scm.events.SCMEvents.CONTAINER_REPORT;
import static org.apache.hadoop.hdds.scm.events.SCMEvents.NODE_REPORT;
/**
* This class is responsible for dispatching heartbeat from datanode to
* appropriate EventHandler at SCM.
@ -42,11 +44,6 @@ public final class SCMDatanodeHeartbeatDispatcher {
private EventPublisher eventPublisher;
public static final TypedEvent<NodeReportFromDatanode> NODE_REPORT =
new TypedEvent<>(NodeReportFromDatanode.class);
public static final TypedEvent<ContainerReportFromDatanode> CONTAINER_REPORT =
new TypedEvent<ContainerReportFromDatanode>(ContainerReportFromDatanode.class);
public SCMDatanodeHeartbeatDispatcher(EventPublisher eventPublisher) {
this.eventPublisher = eventPublisher;
@ -63,12 +60,14 @@ public void dispatch(SCMHeartbeatRequestProto heartbeat) {
DatanodeDetails.getFromProtoBuf(heartbeat.getDatanodeDetails());
// should we dispatch heartbeat through eventPublisher?
if (heartbeat.hasNodeReport()) {
LOG.debug("Dispatching Node Report.");
eventPublisher.fireEvent(NODE_REPORT,
new NodeReportFromDatanode(datanodeDetails,
heartbeat.getNodeReport()));
}
if (heartbeat.hasContainerReport()) {
LOG.debug("Dispatching Container Report.");
eventPublisher.fireEvent(CONTAINER_REPORT,
new ContainerReportFromDatanode(datanodeDetails,
heartbeat.getContainerReport()));

View File

@ -70,6 +70,8 @@
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DB_CACHE_SIZE_DEFAULT;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DB_CACHE_SIZE_MB;
import static org.apache.hadoop.hdds.scm.events.SCMEvents.DATANODE_COMMAND;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ENABLED;
import static org.apache.hadoop.util.ExitUtil.terminate;
@ -164,9 +166,10 @@ private StorageContainerManager(OzoneConfiguration conf) throws IOException {
}
EventQueue eventQueue = new EventQueue();
SCMNodeManager nm = new SCMNodeManager(conf, scmStorage.getClusterID(), this);
SCMNodeManager nm =
new SCMNodeManager(conf, scmStorage.getClusterID(), this);
scmNodeManager = nm;
eventQueue.addHandler(SCMNodeManager.DATANODE_COMMAND, nm);
eventQueue.addHandler(DATANODE_COMMAND, nm);
scmContainerManager = new ContainerMapping(conf, getScmNodeManager(),
cacheSize);

View File

@ -17,11 +17,13 @@
package org.apache.hadoop.hdds.scm.container;
import org.apache.commons.lang3.RandomUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import org.apache.hadoop.hdds.scm.container.common.helpers
.ContainerWithPipeline;
import org.apache.hadoop.hdds.server.events.EventQueue;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.container.common.SCMTestUtils;
@ -33,12 +35,12 @@
import java.io.File;
import java.io.IOException;
import java.util.Random;
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent.CREATE;
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent.CREATED;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_GB;
import static org.apache.hadoop.hdds.scm.events.SCMEvents.CLOSE_CONTAINER;
/**
* Tests the closeContainerEventHandler class.
@ -65,7 +67,7 @@ public static void setUp() throws Exception {
nodeManager = new MockNodeManager(true, 10);
mapping = new ContainerMapping(configuration, nodeManager, 128);
eventQueue = new EventQueue();
eventQueue.addHandler(CloseContainerEventHandler.CLOSE_CONTAINER_EVENT,
eventQueue.addHandler(CLOSE_CONTAINER,
new CloseContainerEventHandler(mapping));
}
@ -81,8 +83,8 @@ public static void tearDown() throws Exception {
public void testIfCloseContainerEventHadnlerInvoked() {
GenericTestUtils.LogCapturer logCapturer = GenericTestUtils.LogCapturer
.captureLogs(CloseContainerEventHandler.LOG);
eventQueue.fireEvent(CloseContainerEventHandler.CLOSE_CONTAINER_EVENT,
new ContainerID(Math.abs(new Random().nextLong())));
eventQueue.fireEvent(CLOSE_CONTAINER,
new ContainerID(Math.abs(RandomUtils.nextInt())));
eventQueue.processAll(1000);
Assert.assertTrue(logCapturer.getOutput()
.contains("Close container Event triggered for container"));
@ -90,10 +92,10 @@ public void testIfCloseContainerEventHadnlerInvoked() {
@Test
public void testCloseContainerEventWithInvalidContainer() {
long id = Math.abs(new Random().nextLong());
long id = Math.abs(RandomUtils.nextInt());
GenericTestUtils.LogCapturer logCapturer = GenericTestUtils.LogCapturer
.captureLogs(CloseContainerEventHandler.LOG);
eventQueue.fireEvent(CloseContainerEventHandler.CLOSE_CONTAINER_EVENT,
eventQueue.fireEvent(CLOSE_CONTAINER,
new ContainerID(id));
eventQueue.processAll(1000);
Assert.assertTrue(logCapturer.getOutput()
@ -112,7 +114,7 @@ public void testCloseContainerEventWithValidContainers() throws IOException {
containerWithPipeline.getContainerInfo().getContainerID());
DatanodeDetails datanode = containerWithPipeline.getPipeline().getLeader();
int closeCount = nodeManager.getCommandCount(datanode);
eventQueue.fireEvent(CloseContainerEventHandler.CLOSE_CONTAINER_EVENT, id);
eventQueue.fireEvent(CLOSE_CONTAINER, id);
eventQueue.processAll(1000);
// At this point of time, the allocated container is not in open
// state, so firing close container event should not queue CLOSE
@ -125,11 +127,12 @@ public void testCloseContainerEventWithValidContainers() throws IOException {
//Execute these state transitions so that we can close the container.
mapping.updateContainerState(id.getId(), CREATE);
mapping.updateContainerState(id.getId(), CREATED);
eventQueue.fireEvent(CloseContainerEventHandler.CLOSE_CONTAINER_EVENT,
eventQueue.fireEvent(CLOSE_CONTAINER,
new ContainerID(
containerWithPipeline.getContainerInfo().getContainerID()));
eventQueue.processAll(1000);
Assert.assertEquals(closeCount + 1, nodeManager.getCommandCount(datanode));
Assert.assertEquals(closeCount + 1,
nodeManager.getCommandCount(datanode));
Assert.assertEquals(HddsProtos.LifeCycleState.CLOSING,
mapping.getStateManager().getContainer(id).getState());
}
@ -145,7 +148,7 @@ public void testCloseContainerEventWithRatis() throws IOException {
ContainerID id = new ContainerID(
containerWithPipeline.getContainerInfo().getContainerID());
int[] closeCount = new int[3];
eventQueue.fireEvent(CloseContainerEventHandler.CLOSE_CONTAINER_EVENT, id);
eventQueue.fireEvent(CLOSE_CONTAINER, id);
eventQueue.processAll(1000);
int i = 0;
for (DatanodeDetails details : containerWithPipeline.getPipeline()
@ -166,7 +169,7 @@ public void testCloseContainerEventWithRatis() throws IOException {
//Execute these state transitions so that we can close the container.
mapping.updateContainerState(id.getId(), CREATE);
mapping.updateContainerState(id.getId(), CREATED);
eventQueue.fireEvent(CloseContainerEventHandler.CLOSE_CONTAINER_EVENT, id);
eventQueue.fireEvent(CLOSE_CONTAINER, id);
eventQueue.processAll(1000);
i = 0;
// Make sure close is queued for each datanode on the pipeline

View File

@ -68,6 +68,7 @@
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState
.HEALTHY;
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.STALE;
import static org.apache.hadoop.hdds.scm.events.SCMEvents.DATANODE_COMMAND;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.core.StringStartsWith.startsWith;
import static org.junit.Assert.assertEquals;
@ -1068,11 +1069,6 @@ public void testScmNodeReportUpdate() throws IOException,
foundRemaining = nodeManager.getStats().getRemaining().get();
assertEquals(0, foundRemaining);
// Send a new report to bring the dead node back to healthy
String storagePath = testDir.getAbsolutePath() + "/" + dnId;
List<StorageReportProto> reports = TestUtils
.createStorageReport(capacity, expectedScmUsed, expectedRemaining,
storagePath, null, dnId, 1);
nodeManager.processHeartbeat(datanodeDetails);
// Wait up to 5 seconds so that the dead node becomes healthy
@ -1111,11 +1107,11 @@ public void testHandlingSCMCommandEvent() {
EventQueue eq = new EventQueue();
try (SCMNodeManager nodemanager = createNodeManager(conf)) {
eq.addHandler(SCMNodeManager.DATANODE_COMMAND, nodemanager);
eq.addHandler(DATANODE_COMMAND, nodemanager);
nodemanager
.register(datanodeDetails, TestUtils.createNodeReport(reports));
eq.fireEvent(SCMNodeManager.DATANODE_COMMAND,
eq.fireEvent(DATANODE_COMMAND,
new CommandForDatanode(datanodeDetails.getUuid(),
new CloseContainerCommand(1L, ReplicationType.STAND_ALONE)));

View File

@ -20,8 +20,6 @@
import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
@ -40,6 +38,9 @@
import org.junit.Assert;
import org.junit.Test;
import static org.apache.hadoop.hdds.scm.events.SCMEvents.CONTAINER_REPORT;
import static org.apache.hadoop.hdds.scm.events.SCMEvents.NODE_REPORT;
/**
* This class tests the behavior of SCMDatanodeHeartbeatDispatcher.
*/
@ -49,8 +50,6 @@ public class TestSCMDatanodeHeartbeatDispatcher {
@Test
public void testNodeReportDispatcher() throws IOException {
Configuration conf = new OzoneConfiguration();
AtomicInteger eventReceived = new AtomicInteger();
NodeReportProto nodeReport = NodeReportProto.getDefaultInstance();
@ -60,10 +59,10 @@ public void testNodeReportDispatcher() throws IOException {
@Override
public <PAYLOAD, EVENT_TYPE extends Event<PAYLOAD>> void fireEvent(
EVENT_TYPE event, PAYLOAD payload) {
Assert.assertEquals(event,
SCMDatanodeHeartbeatDispatcher.NODE_REPORT);
Assert.assertEquals(event, NODE_REPORT);
eventReceived.incrementAndGet();
Assert.assertEquals(nodeReport, ((NodeReportFromDatanode)payload).getReport());
Assert.assertEquals(nodeReport,
((NodeReportFromDatanode)payload).getReport());
}
});
@ -84,7 +83,6 @@ public <PAYLOAD, EVENT_TYPE extends Event<PAYLOAD>> void fireEvent(
@Test
public void testContainerReportDispatcher() throws IOException {
Configuration conf = new OzoneConfiguration();
AtomicInteger eventReceived = new AtomicInteger();
@ -96,9 +94,9 @@ public void testContainerReportDispatcher() throws IOException {
@Override
public <PAYLOAD, EVENT_TYPE extends Event<PAYLOAD>> void fireEvent(
EVENT_TYPE event, PAYLOAD payload) {
Assert.assertEquals(event,
SCMDatanodeHeartbeatDispatcher.CONTAINER_REPORT);
Assert.assertEquals(containerReport, ((ContainerReportFromDatanode)payload).getReport());
Assert.assertEquals(event, CONTAINER_REPORT);
Assert.assertEquals(containerReport,
((ContainerReportFromDatanode)payload).getReport());
eventReceived.incrementAndGet();
}
});