diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DeadNodeHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DeadNodeHandler.java index 43f016701dd..8e713998b0d 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DeadNodeHandler.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DeadNodeHandler.java @@ -21,6 +21,7 @@ package org.apache.hadoop.hdds.scm.node; import java.util.Set; import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.scm.container.ContainerException; import org.apache.hadoop.hdds.scm.container.ContainerID; import org.apache.hadoop.hdds.scm.container.ContainerInfo; @@ -81,8 +82,6 @@ public class DeadNodeHandler implements EventHandler { try { final ContainerInfo container = containerManager.getContainer(id); // TODO: For open containers, trigger close on other nodes - // TODO: Check replica count and call replication manager - // on these containers. if (!container.isOpen()) { Set replicas = containerManager .getContainerReplicas(id); @@ -92,6 +91,9 @@ public class DeadNodeHandler implements EventHandler { .ifPresent(replica -> { try { containerManager.removeContainerReplica(id, replica); + ContainerInfo containerInfo = + containerManager.getContainer(id); + replicateIfNeeded(containerInfo, publisher); } catch (ContainerException ex) { LOG.warn("Exception while removing container replica #{} " + "for container #{}.", replica, container, ex); @@ -109,13 +111,21 @@ public class DeadNodeHandler implements EventHandler { */ private void replicateIfNeeded(ContainerInfo container, EventPublisher publisher) throws ContainerNotFoundException { - final int existingReplicas = containerManager - .getContainerReplicas(container.containerID()).size(); - final int expectedReplicas = container.getReplicationFactor().getNumber(); - if (existingReplicas != expectedReplicas) { - publisher.fireEvent(SCMEvents.REPLICATE_CONTAINER, - new ReplicationRequest( - container.getContainerID(), existingReplicas, expectedReplicas)); + // Replicate only closed and Quasi closed containers + if (container.getState() == HddsProtos.LifeCycleState.CLOSED || + container.getState() == HddsProtos.LifeCycleState.QUASI_CLOSED) { + final int existingReplicas = containerManager + .getContainerReplicas(container.containerID()).size(); + final int expectedReplicas = container.getReplicationFactor().getNumber(); + if (existingReplicas != expectedReplicas) { + LOG.debug("Replicate Request fired for container {}, exisiting " + + "replica count {}, expected replica count {}", + container.getContainerID(), existingReplicas, expectedReplicas); + publisher.fireEvent(SCMEvents.REPLICATE_CONTAINER, + new ReplicationRequest( + container.getContainerID(), existingReplicas, + expectedReplicas)); + } } } diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java index e99739cf58d..0f9a5e4c2d5 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java @@ -446,4 +446,19 @@ public final class TestUtils { id, HddsProtos.LifeCycleEvent.CLOSE); } + + /** + * Move the container to Quaise close state. + * @param containerManager + * @param id + * @throws IOException + */ + public static void quasiCloseContainer(ContainerManager containerManager, + ContainerID id) throws IOException { + containerManager.updateContainerState( + id, HddsProtos.LifeCycleEvent.FINALIZE); + containerManager.updateContainerState( + id, HddsProtos.LifeCycleEvent.QUASI_CLOSE); + + } } diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java index 9fbefcb3f19..ec216d8ad0c 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java @@ -59,6 +59,7 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.mockito.Mockito; +import org.slf4j.event.Level; /** * Test DeadNodeHandler. @@ -140,17 +141,25 @@ public class TestDeadNodeHandler { TestUtils.allocateContainer(containerManager); ContainerInfo container3 = TestUtils.allocateContainer(containerManager); + ContainerInfo container4 = + TestUtils.allocateContainer(containerManager); - registerReplicas(datanode1, container1, container2); - registerReplicas(datanode2, container1, container3); + registerContainers(datanode1, container1, container2, container4); + registerContainers(datanode2, container1, container2); + registerContainers(datanode3, container3); registerReplicas(containerManager, container1, datanode1, datanode2); - registerReplicas(containerManager, container2, datanode1); - registerReplicas(containerManager, container3, datanode2); + registerReplicas(containerManager, container2, datanode1, datanode2); + registerReplicas(containerManager, container3, datanode3); + registerReplicas(containerManager, container4, datanode1); TestUtils.closeContainer(containerManager, container1.containerID()); TestUtils.closeContainer(containerManager, container2.containerID()); - TestUtils.closeContainer(containerManager, container3.containerID()); + TestUtils.quasiCloseContainer(containerManager, container3.containerID()); + + GenericTestUtils.setLogLevel(DeadNodeHandler.getLogger(), Level.DEBUG); + GenericTestUtils.LogCapturer logCapturer = GenericTestUtils.LogCapturer + .captureLogs(DeadNodeHandler.getLogger()); deadNodeHandler.onMessage(datanode1, publisher); @@ -162,13 +171,33 @@ public class TestDeadNodeHandler { Set container2Replicas = containerManager .getContainerReplicas(new ContainerID(container2.getContainerID())); - Assert.assertEquals(0, container2Replicas.size()); + Assert.assertEquals(1, container2Replicas.size()); + Assert.assertEquals(datanode2, + container2Replicas.iterator().next().getDatanodeDetails()); Set container3Replicas = containerManager .getContainerReplicas(new ContainerID(container3.getContainerID())); Assert.assertEquals(1, container3Replicas.size()); - Assert.assertEquals(datanode2, + Assert.assertEquals(datanode3, container3Replicas.iterator().next().getDatanodeDetails()); + + // Replicate should be fired for container 1 and container 2 as now + // datanode 1 is dead, these 2 will not match with expected replica count + // and their state is one of CLOSED/QUASI_CLOSE. + Assert.assertTrue(logCapturer.getOutput().contains( + "Replicate Request fired for container " + + container1.getContainerID())); + Assert.assertTrue(logCapturer.getOutput().contains( + "Replicate Request fired for container " + + container2.getContainerID())); + + // as container4 is still in open state, replicate event should not have + // fired for this. + Assert.assertFalse(logCapturer.getOutput().contains( + "Replicate Request fired for container " + + container4.getContainerID())); + + } @Test @@ -272,7 +301,13 @@ public class TestDeadNodeHandler { } } - private void registerReplicas(DatanodeDetails datanode, + /** + * Update containers available on the datanode. + * @param datanode + * @param containers + * @throws NodeNotFoundException + */ + private void registerContainers(DatanodeDetails datanode, ContainerInfo... containers) throws NodeNotFoundException { nodeManager