diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventWatcher.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventWatcher.java index e3fee632e7e..ba5078b402c 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventWatcher.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventWatcher.java @@ -102,13 +102,13 @@ public void start(EventQueue queue) { queue.addHandler(startEvent, this::handleStartEvent); queue.addHandler(completionEvent, (completionPayload, publisher) -> { - long id = completionPayload.getId(); try { - handleCompletion(id, publisher); + handleCompletion(completionPayload, publisher); } catch (LeaseNotFoundException e) { //It's already done. Too late, we already retried it. //Not a real problem. - LOG.warn("Completion event without active lease. Id={}", id); + LOG.warn("Completion event without active lease. Id={}", + completionPayload.getId()); } }); @@ -140,9 +140,11 @@ private synchronized void handleStartEvent(TIMEOUT_PAYLOAD payload, } } - private synchronized void handleCompletion(long id, - EventPublisher publisher) throws LeaseNotFoundException { + protected synchronized void handleCompletion(COMPLETION_PAYLOAD + completionPayload, EventPublisher publisher) throws + LeaseNotFoundException { metrics.incrementCompletedEvents(); + long id = completionPayload.getId(); leaseManager.release(id); TIMEOUT_PAYLOAD payload = trackedEventsByID.remove(id); trackedEvents.remove(payload); @@ -196,4 +198,12 @@ public List getTimeoutEvents( protected EventWatcherMetrics getMetrics() { return metrics; } + + /** + * Returns a tracked event to which the specified id is + * mapped, or {@code null} if there is no mapping for the id. + */ + public TIMEOUT_PAYLOAD getTrackedEventbyId(long id) { + return trackedEventsByID.get(id); + } } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/command/CommandStatusReportHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/command/CommandStatusReportHandler.java index 9413a46021b..054665a1b5a 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/command/CommandStatusReportHandler.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/command/CommandStatusReportHandler.java @@ -103,7 +103,7 @@ public long getId() { * Wrapper event for Replicate Command. */ public static class ReplicationStatus extends CommandStatusEvent { - ReplicationStatus(CommandStatus cmdStatus) { + public ReplicationStatus(CommandStatus cmdStatus) { super(cmdStatus); } } @@ -112,7 +112,7 @@ public static class ReplicationStatus extends CommandStatusEvent { * Wrapper event for CloseContainer Command. */ public static class CloseContainerStatus extends CommandStatusEvent { - CloseContainerStatus(CommandStatus cmdStatus) { + public CloseContainerStatus(CommandStatus cmdStatus) { super(cmdStatus); } } @@ -121,7 +121,7 @@ public static class CloseContainerStatus extends CommandStatusEvent { * Wrapper event for DeleteBlock Command. */ public static class DeleteBlockCommandStatus extends CommandStatusEvent { - DeleteBlockCommandStatus(CommandStatus cmdStatus) { + public DeleteBlockCommandStatus(CommandStatus cmdStatus) { super(cmdStatus); } } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java index 863907eaf7b..b94ce4fcb25 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java @@ -23,12 +23,14 @@ import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline; import org.apache.hadoop.hdds.server.events.EventHandler; import org.apache.hadoop.hdds.server.events.EventPublisher; +import org.apache.hadoop.hdds.server.events.IdentifiableEventPayload; import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand; import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import static org.apache.hadoop.hdds.scm.events.SCMEvents.DATANODE_COMMAND; +import static org.apache.hadoop.hdds.scm.events.SCMEvents.CLOSE_CONTAINER_RETRYABLE_REQ; /** * In case of a node failure, volume failure, volume out of spapce, node @@ -80,6 +82,8 @@ public void onMessage(ContainerID containerID, EventPublisher publisher) { new CloseContainerCommand(containerID.getId(), info.getReplicationType(), info.getPipelineID())); publisher.fireEvent(DATANODE_COMMAND, closeContainerCommand); + publisher.fireEvent(CLOSE_CONTAINER_RETRYABLE_REQ, new + CloseContainerRetryableReq(containerID)); } try { // Finalize event will make sure the state of the container transitions @@ -107,4 +111,26 @@ public void onMessage(ContainerID containerID, EventPublisher publisher) { } } + + /** + * Class to create retryable event. Prevents redundant requests for same + * container Id. + */ + public static class CloseContainerRetryableReq implements + IdentifiableEventPayload { + + private ContainerID containerID; + public CloseContainerRetryableReq(ContainerID containerID) { + this.containerID = containerID; + } + + public ContainerID getContainerID() { + return containerID; + } + + @Override + public long getId() { + return containerID.getId(); + } + } } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerWatcher.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerWatcher.java new file mode 100644 index 00000000000..8e277b9f369 --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerWatcher.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ *

http://www.apache.org/licenses/LICENSE-2.0 + *

+ *

Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.hdds.scm.container; + +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.CommandStatus.Status; +import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler + .CloseContainerStatus; + +import org.apache.hadoop.hdds.scm.events.SCMEvents; +import org.apache.hadoop.hdds.server.events.Event; +import org.apache.hadoop.hdds.server.events.EventPublisher; +import org.apache.hadoop.hdds.server.events.EventWatcher; +import org.apache.hadoop.hdds.scm.container.CloseContainerEventHandler + .CloseContainerRetryableReq; +import org.apache.hadoop.ozone.lease.LeaseManager; +import org.apache.hadoop.ozone.lease.LeaseNotFoundException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +/** + * This watcher will watch for CLOSE_CONTAINER_STATUS events fired from + * CommandStatusReport. If required it will re-trigger CloseContainer command + * for DataNodes to CloseContainerEventHandler. + */ +public class CloseContainerWatcher extends + EventWatcher { + + public static final Logger LOG = + LoggerFactory.getLogger(CloseContainerWatcher.class); + private final Mapping containerManager; + + public CloseContainerWatcher(Event startEvent, + Event completionEvent, + LeaseManager leaseManager, Mapping containerManager) { + super(startEvent, completionEvent, leaseManager); + this.containerManager = containerManager; + } + + @Override + protected void onTimeout(EventPublisher publisher, + CloseContainerRetryableReq payload) { + // Let CloseContainerEventHandler handle this message. + this.resendEventToHandler(payload.getId(), publisher); + } + + @Override + protected void onFinished(EventPublisher publisher, + CloseContainerRetryableReq payload) { + LOG.trace("CloseContainerCommand for containerId: {} executed ", payload + .getContainerID().getId()); + } + + @Override + protected synchronized void handleCompletion(CloseContainerStatus status, + EventPublisher publisher) throws LeaseNotFoundException { + // If status is PENDING then return without doing anything. + if(status.getCmdStatus().getStatus().equals(Status.PENDING)){ + return; + } + + CloseContainerRetryableReq closeCont = getTrackedEventbyId(status.getId()); + super.handleCompletion(status, publisher); + // If status is FAILED then send a msg to Handler to resend the command. + if (status.getCmdStatus().getStatus().equals(Status.FAILED) && closeCont + != null) { + this.resendEventToHandler(closeCont.getId(), publisher); + } + } + + private void resendEventToHandler(long containerID, EventPublisher + publisher) { + try { + // Check if container is still open + if (containerManager.getContainer(containerID).isContainerOpen()) { + publisher.fireEvent(SCMEvents.CLOSE_CONTAINER, + ContainerID.valueof(containerID)); + } + } catch (IOException e) { + LOG.warn("Error in CloseContainerWatcher while processing event " + + "for containerId {} ExceptionMsg: ", containerID, e.getMessage()); + } + } +} diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java index 5911ce206be..9a4f887cd7d 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java @@ -27,6 +27,7 @@ .DeleteBlockCommandStatus; import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler .ReplicationStatus; +import org.apache.hadoop.hdds.scm.container.CloseContainerEventHandler.CloseContainerRetryableReq; import org.apache.hadoop.hdds.scm.container.ContainerID; import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher .ContainerActionsFromDatanode; @@ -103,6 +104,16 @@ public final class SCMEvents { public static final TypedEvent CLOSE_CONTAINER = new TypedEvent<>(ContainerID.class, "Close_Container"); + /** + * A CLOSE_CONTAINER_RETRYABLE_REQ will be triggered by + * CloseContainerEventHandler after sending a SCMCommand to DataNode. + * CloseContainerWatcher will track this event. Watcher will be responsible + * for retrying it in event of failure or timeout. + */ + public static final TypedEvent + CLOSE_CONTAINER_RETRYABLE_REQ = new TypedEvent<>( + CloseContainerRetryableReq.class, "Close_Container_Retryable"); + /** * This event will be triggered whenever a new datanode is registered with * SCM. diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java index 7e2bc23f209..061ff7855ef 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java @@ -37,6 +37,7 @@ import org.apache.hadoop.hdds.scm.block.PendingDeleteHandler; import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler; import org.apache.hadoop.hdds.scm.container.CloseContainerEventHandler; +import org.apache.hadoop.hdds.scm.container.CloseContainerWatcher; import org.apache.hadoop.hdds.scm.container.ContainerActionsHandler; import org.apache.hadoop.hdds.scm.container.ContainerMapping; import org.apache.hadoop.hdds.scm.container.ContainerReportHandler; @@ -257,6 +258,13 @@ private StorageContainerManager(OzoneConfiguration conf) throws IOException { scmContainerManager.getStateManager(), eventQueue, commandWatcherLeaseManager); + // setup CloseContainer watcher + CloseContainerWatcher closeContainerWatcher = + new CloseContainerWatcher(SCMEvents.CLOSE_CONTAINER_RETRYABLE_REQ, + SCMEvents.CLOSE_CONTAINER_STATUS, commandWatcherLeaseManager, + scmContainerManager); + closeContainerWatcher.start(eventQueue); + scmAdminUsernames = conf.getTrimmedStringCollection(OzoneConfigKeys .OZONE_ADMINISTRATORS); scmUsername = UserGroupInformation.getCurrentUser().getUserName(); diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/TestCloseContainerWatcher.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/TestCloseContainerWatcher.java new file mode 100644 index 00000000000..56c3830c9b8 --- /dev/null +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/TestCloseContainerWatcher.java @@ -0,0 +1,287 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.ozone.container; + +import org.apache.hadoop.hdds.HddsIdFactory; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.CommandStatus; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type; +import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler + .CloseContainerStatus; +import org.apache.hadoop.hdds.scm.container.CloseContainerEventHandler + .CloseContainerRetryableReq; +import org.apache.hadoop.hdds.scm.container.CloseContainerWatcher; +import org.apache.hadoop.hdds.scm.container.ContainerID; +import org.apache.hadoop.hdds.scm.container.ContainerMapping; +import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo; +import org.apache.hadoop.hdds.scm.events.SCMEvents; +import org.apache.hadoop.hdds.server.events.EventHandler; +import org.apache.hadoop.hdds.server.events.EventPublisher; +import org.apache.hadoop.hdds.server.events.EventQueue; +import org.apache.hadoop.hdds.server.events.EventWatcher; +import org.apache.hadoop.ozone.lease.LeaseManager; +import org.apache.hadoop.test.GenericTestUtils; +import org.junit.After; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.Timeout; +import org.mockito.Mockito; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.slf4j.event.Level; + +import java.io.IOException; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.when; + +/** + * Test class for {@link CloseContainerWatcher}. + * */ +public class TestCloseContainerWatcher implements EventHandler { + + private static final Logger LOG = LoggerFactory + .getLogger(TestCloseContainerWatcher.class); + private static EventWatcher + watcher; + private static LeaseManager leaseManager; + private static ContainerMapping containerMapping = Mockito + .mock(ContainerMapping.class); + private static EventQueue queue; + @Rule + public Timeout timeout = new Timeout(1000*15); + + @After + public void stop() { + leaseManager.shutdown(); + queue.close(); + } + + /* + * This test will test watcher for Failure status event. + * */ + @Test + public void testWatcherForFailureStatusEvent() throws + InterruptedException, IOException { + setupWatcher(90000L); + long id1 = HddsIdFactory.getLongId(); + long id2 = HddsIdFactory.getLongId(); + queue.addHandler(SCMEvents.CLOSE_CONTAINER, this); + setupMock(id1, id2, true); + GenericTestUtils.LogCapturer testLogger = GenericTestUtils.LogCapturer + .captureLogs(LOG); + GenericTestUtils.LogCapturer watcherLogger = GenericTestUtils.LogCapturer + .captureLogs(CloseContainerWatcher.LOG); + GenericTestUtils.setLogLevel(CloseContainerWatcher.LOG, Level.TRACE); + testLogger.clearOutput(); + watcherLogger.clearOutput(); + + CommandStatus cmdStatus1 = CommandStatus.newBuilder() + .setCmdId(id1) + .setStatus(CommandStatus.Status.FAILED) + .setType(Type.closeContainerCommand).build(); + CommandStatus cmdStatus2 = CommandStatus.newBuilder() + .setCmdId(id2) + .setStatus(CommandStatus.Status.FAILED) + .setType(Type.closeContainerCommand).build(); + + // File events to watcher + queue.fireEvent(SCMEvents.CLOSE_CONTAINER_RETRYABLE_REQ, + new CloseContainerRetryableReq(ContainerID.valueof(id1))); + queue.fireEvent(SCMEvents.CLOSE_CONTAINER_RETRYABLE_REQ, + new CloseContainerRetryableReq(ContainerID.valueof(id2))); + Thread.sleep(10L); + queue.fireEvent(SCMEvents.CLOSE_CONTAINER_STATUS, new + CloseContainerStatus(cmdStatus1)); + queue.fireEvent(SCMEvents.CLOSE_CONTAINER_STATUS, new + CloseContainerStatus(cmdStatus2)); + + Thread.sleep(1000*4L); + // validation + assertTrue(watcherLogger.getOutput().contains("CloseContainerCommand for " + + "containerId: " + id1 + " executed")); + assertTrue(watcherLogger.getOutput().contains("CloseContainerCommand for " + + "containerId: " + id2 + " executed")); + assertTrue( + testLogger.getOutput().contains("Handling closeContainerEvent " + + "for containerId: id=" + id1)); + assertTrue(testLogger.getOutput().contains("Handling closeContainerEvent " + + "for containerId: id=" + id2)); + + } + + @Test + public void testWatcherForPendingStatusEvent() throws + InterruptedException, IOException { + setupWatcher(90000L); + long id1 = HddsIdFactory.getLongId(); + long id2 = HddsIdFactory.getLongId(); + queue.addHandler(SCMEvents.CLOSE_CONTAINER, this); + setupMock(id1, id2, true); + GenericTestUtils.LogCapturer testLogger = GenericTestUtils.LogCapturer + .captureLogs(LOG); + GenericTestUtils.LogCapturer watcherLogger = GenericTestUtils.LogCapturer + .captureLogs(CloseContainerWatcher.LOG); + GenericTestUtils.setLogLevel(CloseContainerWatcher.LOG, Level.TRACE); + testLogger.clearOutput(); + watcherLogger.clearOutput(); + + CommandStatus cmdStatus1 = CommandStatus.newBuilder() + .setCmdId(id1) + .setStatus(CommandStatus.Status.PENDING) + .setType(Type.closeContainerCommand).build(); + CommandStatus cmdStatus2 = CommandStatus.newBuilder() + .setCmdId(id2) + .setStatus(CommandStatus.Status.PENDING) + .setType(Type.closeContainerCommand).build(); + + // File events to watcher + queue.fireEvent(SCMEvents.CLOSE_CONTAINER_RETRYABLE_REQ, + new CloseContainerRetryableReq(ContainerID.valueof(id1))); + queue.fireEvent(SCMEvents.CLOSE_CONTAINER_RETRYABLE_REQ, + new CloseContainerRetryableReq(ContainerID.valueof(id2))); + Thread.sleep(10L); + queue.fireEvent(SCMEvents.CLOSE_CONTAINER_STATUS, new + CloseContainerStatus(cmdStatus1)); + queue.fireEvent(SCMEvents.CLOSE_CONTAINER_STATUS, new + CloseContainerStatus(cmdStatus2)); + + Thread.sleep(1000*2L); + // validation + assertFalse(watcherLogger.getOutput().contains("CloseContainerCommand " + + "for containerId: " + id1 + " executed")); + assertFalse(watcherLogger.getOutput().contains("CloseContainerCommand " + + "for containerId: " + id2 + " executed")); + assertFalse(testLogger.getOutput().contains("Handling " + + "closeContainerEvent for containerId: id=" + id1)); + assertFalse(testLogger.getOutput().contains("Handling " + + "closeContainerEvent for containerId: id=" + id2)); + + } + + @Test + public void testWatcherForExecutedStatusEvent() + throws IOException, InterruptedException { + setupWatcher(90000L); + long id1 = HddsIdFactory.getLongId(); + long id2 = HddsIdFactory.getLongId(); + queue.addHandler(SCMEvents.CLOSE_CONTAINER, this); + setupMock(id1, id2, true); + GenericTestUtils.LogCapturer testLogger = GenericTestUtils.LogCapturer + .captureLogs(LOG); + GenericTestUtils.LogCapturer watcherLogger = GenericTestUtils.LogCapturer + .captureLogs(CloseContainerWatcher.LOG); + GenericTestUtils.setLogLevel(CloseContainerWatcher.LOG, Level.TRACE); + testLogger.clearOutput(); + watcherLogger.clearOutput(); + + // When both of the pending event are executed successfully by DataNode + CommandStatus cmdStatus1 = CommandStatus.newBuilder() + .setCmdId(id1) + .setStatus(CommandStatus.Status.EXECUTED) + .setType(Type.closeContainerCommand).build(); + CommandStatus cmdStatus2 = CommandStatus.newBuilder() + .setCmdId(id2) + .setStatus(CommandStatus.Status.EXECUTED) + .setType(Type.closeContainerCommand).build(); + // File events to watcher + testLogger.clearOutput(); + watcherLogger.clearOutput(); + queue.fireEvent(SCMEvents.CLOSE_CONTAINER_RETRYABLE_REQ, + new CloseContainerRetryableReq(ContainerID.valueof(id1))); + queue.fireEvent(SCMEvents.CLOSE_CONTAINER_RETRYABLE_REQ, + new CloseContainerRetryableReq(ContainerID.valueof(id2))); + Thread.sleep(10L); + queue.fireEvent(SCMEvents.CLOSE_CONTAINER_STATUS, + new CloseContainerStatus(cmdStatus1)); + queue.fireEvent(SCMEvents.CLOSE_CONTAINER_STATUS, + new CloseContainerStatus(cmdStatus2)); + + Thread.sleep(1000*3L); + // validation + assertTrue(watcherLogger.getOutput().contains("CloseContainerCommand " + + "for containerId: " + id1 + " executed")); + assertTrue(watcherLogger.getOutput().contains("CloseContainerCommand " + + "for containerId: " + id2 + " executed")); + assertFalse(testLogger.getOutput().contains("Handling " + + "closeContainerEvent for containerId: id=" + id1)); + assertFalse(testLogger.getOutput().contains("Handling " + + "closeContainerEvent for containerId: id=" + id2)); + } + + private void setupWatcher(long time) { + leaseManager = new LeaseManager<>("TestCloseContainerWatcher#LeaseManager", + time); + leaseManager.start(); + watcher = new CloseContainerWatcher(SCMEvents.CLOSE_CONTAINER_RETRYABLE_REQ, + SCMEvents.CLOSE_CONTAINER_STATUS, leaseManager, containerMapping); + queue = new EventQueue(); + watcher.start(queue); + } + + /* + * This test will fire two retryable closeContainer events. Both will timeout. + * First event container will be open at time of handling so it should be + * sent back to appropriate handler. Second event container will be closed, + * so it should not be retried. + * */ + @Test + public void testWatcherRetryableTimeoutHandling() throws InterruptedException, + IOException { + + long id1 = HddsIdFactory.getLongId(); + long id2 = HddsIdFactory.getLongId(); + setupWatcher(1000L); + queue.addHandler(SCMEvents.CLOSE_CONTAINER, this); + setupMock(id1, id2, false); + GenericTestUtils.LogCapturer testLogger = GenericTestUtils.LogCapturer + .captureLogs(LOG); + testLogger.clearOutput(); + + // File events to watcher + queue.fireEvent(SCMEvents.CLOSE_CONTAINER_RETRYABLE_REQ, + new CloseContainerRetryableReq(ContainerID.valueof(id1))); + queue.fireEvent(SCMEvents.CLOSE_CONTAINER_RETRYABLE_REQ, + new CloseContainerRetryableReq(ContainerID.valueof(id2))); + + Thread.sleep(1000L + 10); + + // validation + assertTrue(testLogger.getOutput().contains("Handling " + + "closeContainerEvent for containerId: id=" + id1)); + assertFalse(testLogger.getOutput().contains("Handling " + + "closeContainerEvent for containerId: id=" + id2)); + } + + + private void setupMock(long id1, long id2, boolean isOpen) + throws IOException { + ContainerInfo containerInfo = Mockito.mock(ContainerInfo.class); + ContainerInfo containerInfo2 = Mockito.mock(ContainerInfo.class); + when(containerMapping.getContainer(id1)).thenReturn(containerInfo); + when(containerMapping.getContainer(id2)).thenReturn(containerInfo2); + when(containerInfo.isContainerOpen()).thenReturn(true); + when(containerInfo2.isContainerOpen()).thenReturn(isOpen); + } + + @Override + public void onMessage(ContainerID containerID, EventPublisher publisher) { + LOG.info("Handling closeContainerEvent for containerId: {}", containerID); + } +}