From 25ec339af53c6e191482f80be6de4e7d47c481e0 Mon Sep 17 00:00:00 2001 From: Hanisha Koneru Date: Fri, 7 Sep 2018 11:20:25 -0700 Subject: [PATCH] HDDS-400. Check global replication state for containers of dead node. Contributed by Elek, Marton. --- .../scm/container/ContainerReportHandler.java | 49 +++++----- .../scm/container/ContainerStateManager.java | 38 +++++++- .../hadoop/hdds/scm/node/DeadNodeHandler.java | 12 +++ .../org/apache/hadoop/hdds/scm/TestUtils.java | 45 +++++++++ .../container/TestContainerReportHandler.java | 18 +--- .../container/TestContainerStateManager.java | 96 +++++++++++++++++++ .../hdds/scm/node/TestDeadNodeHandler.java | 95 ++++++++++++------ ...TestContainerStateManagerIntegration.java} | 2 +- 8 files changed, 279 insertions(+), 76 deletions(-) create mode 100644 hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManager.java rename hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/container/{TestContainerStateManager.java => TestContainerStateManagerIntegration.java} (99%) diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java index 5ca2bcb5554..dcbd49c5ec7 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java @@ -15,7 +15,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.hadoop.hdds.scm.container; import java.io.IOException; @@ -23,18 +22,16 @@ import java.util.Set; import java.util.stream.Collectors; import org.apache.hadoop.hdds.protocol.DatanodeDetails; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.ContainerReportsProto; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsProto; import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo; -import org.apache.hadoop.hdds.scm.container.replication - .ReplicationActivityStatus; +import org.apache.hadoop.hdds.scm.container.replication.ReplicationActivityStatus; import org.apache.hadoop.hdds.scm.container.replication.ReplicationRequest; import org.apache.hadoop.hdds.scm.events.SCMEvents; import org.apache.hadoop.hdds.scm.exceptions.SCMException; import org.apache.hadoop.hdds.scm.node.states.Node2ContainerMap; import org.apache.hadoop.hdds.scm.node.states.ReportResult; -import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher - .ContainerReportFromDatanode; +import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.ContainerReportFromDatanode; import org.apache.hadoop.hdds.server.events.EventHandler; import org.apache.hadoop.hdds.server.events.EventPublisher; @@ -59,22 +56,21 @@ public class ContainerReportHandler implements private ReplicationActivityStatus replicationStatus; - public ContainerReportHandler(Mapping containerMapping, Node2ContainerMap node2ContainerMap, ReplicationActivityStatus replicationActivityStatus) { Preconditions.checkNotNull(containerMapping); Preconditions.checkNotNull(node2ContainerMap); Preconditions.checkNotNull(replicationActivityStatus); + this.containerStateManager = containerMapping.getStateManager(); this.containerMapping = containerMapping; this.node2ContainerMap = node2ContainerMap; - this.containerStateManager = containerMapping.getStateManager(); this.replicationStatus = replicationActivityStatus; } @Override public void onMessage(ContainerReportFromDatanode containerReportFromDatanode, - EventPublisher publisher) { + EventPublisher publisher) { DatanodeDetails datanodeOrigin = containerReportFromDatanode.getDatanodeDetails(); @@ -88,7 +84,8 @@ public class ContainerReportHandler implements .processContainerReports(datanodeOrigin, containerReport, false); Set containerIds = containerReport.getReportsList().stream() - .map(containerProto -> containerProto.getContainerID()) + .map(StorageContainerDatanodeProtocolProtos + .ContainerInfo::getContainerID) .map(ContainerID::new) .collect(Collectors.toSet()); @@ -102,13 +99,12 @@ public class ContainerReportHandler implements for (ContainerID containerID : reportResult.getMissingContainers()) { containerStateManager .removeContainerReplica(containerID, datanodeOrigin); - emitReplicationRequestEvent(containerID, publisher); + checkReplicationState(containerID, publisher); } for (ContainerID containerID : reportResult.getNewContainers()) { containerStateManager.addContainerReplica(containerID, datanodeOrigin); - - emitReplicationRequestEvent(containerID, publisher); + checkReplicationState(containerID, publisher); } } catch (IOException e) { @@ -119,8 +115,9 @@ public class ContainerReportHandler implements } - private void emitReplicationRequestEvent(ContainerID containerID, - EventPublisher publisher) throws SCMException { + private void checkReplicationState(ContainerID containerID, + EventPublisher publisher) + throws SCMException { ContainerInfo container = containerStateManager.getContainer(containerID); if (container == null) { @@ -134,18 +131,18 @@ public class ContainerReportHandler implements if (container.isContainerOpen()) { return; } - if (replicationStatus.isReplicationEnabled()) { - - int existingReplicas = - containerStateManager.getContainerReplicas(containerID).size(); - - int expectedReplicas = container.getReplicationFactor().getNumber(); - - if (existingReplicas != expectedReplicas) { + ReplicationRequest replicationState = + containerStateManager.checkReplicationState(containerID); + if (replicationState != null) { + if (replicationStatus.isReplicationEnabled()) { publisher.fireEvent(SCMEvents.REPLICATE_CONTAINER, - new ReplicationRequest(containerID.getId(), existingReplicas, - container.getReplicationFactor().getNumber())); + replicationState); + } else { + LOG.warn( + "Over/under replicated container but the replication is not " + + "(yet) enabled: " + + replicationState.toString()); } } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java index 421d34e7001..eb8f2e3f0f1 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hdds.scm.container; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.StorageUnit; import org.apache.hadoop.hdds.protocol.DatanodeDetails; @@ -27,6 +28,7 @@ import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo; import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID; +import org.apache.hadoop.hdds.scm.container.replication.ReplicationRequest; import org.apache.hadoop.hdds.scm.container.states.ContainerState; import org.apache.hadoop.hdds.scm.container.states.ContainerStateMap; import org.apache.hadoop.hdds.scm.exceptions.SCMException; @@ -40,6 +42,7 @@ import org.apache.hadoop.ozone.common.statemachine .InvalidStateTransitionException; import org.apache.hadoop.ozone.common.statemachine.StateMachine; import org.apache.hadoop.util.Time; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -148,7 +151,7 @@ public class ContainerStateManager implements Closeable { finalStates); initializeStateMachine(); - this.containerSize =(long)configuration.getStorageSize( + this.containerSize = (long) configuration.getStorageSize( ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE, ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT, StorageUnit.BYTES); @@ -399,7 +402,7 @@ public class ContainerStateManager implements Closeable { // container ID. ContainerState key = new ContainerState(owner, type, factor); ContainerID lastID = lastUsedMap.get(key); - if(lastID == null) { + if (lastID == null) { lastID = matchingSet.first(); } @@ -426,7 +429,7 @@ public class ContainerStateManager implements Closeable { selectedContainer = findContainerWithSpace(size, resultSet, owner); } // Update the allocated Bytes on this container. - if(selectedContainer != null) { + if (selectedContainer != null) { selectedContainer.updateAllocatedBytes(size); } return selectedContainer; @@ -539,9 +542,36 @@ public class ContainerStateManager implements Closeable { DatanodeDetails dn) throws SCMException { return containers.removeContainerReplica(containerID, dn); } - + + /** + * Compare the existing replication number with the expected one. + */ + public ReplicationRequest checkReplicationState(ContainerID containerID) + throws SCMException { + int existingReplicas = getContainerReplicas(containerID).size(); + int expectedReplicas = getContainer(containerID) + .getReplicationFactor().getNumber(); + if (existingReplicas != expectedReplicas) { + return new ReplicationRequest(containerID.getId(), existingReplicas, + expectedReplicas); + } + return null; + } + + /** + * Checks if the container is open. + */ + public boolean isOpen(ContainerID containerID) { + Preconditions.checkNotNull(containerID); + ContainerInfo container = Preconditions + .checkNotNull(getContainer(containerID), + "Container can't be found " + containerID); + return container.isContainerOpen(); + } + @VisibleForTesting public ContainerStateMap getContainerStateMap() { return containers; } + } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DeadNodeHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DeadNodeHandler.java index c853b3b61c5..d694a103a69 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DeadNodeHandler.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DeadNodeHandler.java @@ -23,6 +23,8 @@ import java.util.Set; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.scm.container.ContainerID; import org.apache.hadoop.hdds.scm.container.ContainerStateManager; +import org.apache.hadoop.hdds.scm.container.replication.ReplicationRequest; +import org.apache.hadoop.hdds.scm.events.SCMEvents; import org.apache.hadoop.hdds.scm.exceptions.SCMException; import org.apache.hadoop.hdds.scm.node.states.Node2ContainerMap; import org.apache.hadoop.hdds.server.events.EventHandler; @@ -62,6 +64,16 @@ public class DeadNodeHandler implements EventHandler { try { containerStateManager.removeContainerReplica(container, datanodeDetails); + + if (!containerStateManager.isOpen(container)) { + ReplicationRequest replicationRequest = + containerStateManager.checkReplicationState(container); + + if (replicationRequest != null) { + publisher.fireEvent(SCMEvents.REPLICATE_CONTAINER, + replicationRequest); + } + } } catch (SCMException e) { LOG.error("Can't remove container from containerStateMap {}", container .getId(), e); diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java index d617680dbca..7af9dda4fb0 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java @@ -17,6 +17,11 @@ package org.apache.hadoop.hdds.scm; import com.google.common.base.Preconditions; +import org.mockito.Mockito; +import static org.mockito.Mockito.when; + +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.ContainerInfo; import org.apache.hadoop.hdds.protocol.proto @@ -31,12 +36,18 @@ import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.StorageReportProto; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.StorageTypeProto; +import org.apache.hadoop.hdds.scm.container.ContainerStateManager; +import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; +import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID; +import org.apache.hadoop.hdds.scm.exceptions.SCMException; import org.apache.hadoop.hdds.scm.node.SCMNodeManager; import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.scm.pipelines.PipelineSelector; import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.ozone.protocol.commands.RegisteredCommand; import java.io.File; +import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -376,5 +387,39 @@ public final class TestUtils { return report.build(); } + public static + org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo + allocateContainer(ContainerStateManager containerStateManager) + throws IOException { + PipelineSelector pipelineSelector = Mockito.mock(PipelineSelector.class); + + Pipeline pipeline = new Pipeline("leader", HddsProtos.LifeCycleState.CLOSED, + HddsProtos.ReplicationType.STAND_ALONE, + HddsProtos.ReplicationFactor.THREE, + PipelineID.randomId()); + + when(pipelineSelector + .getReplicationPipeline(HddsProtos.ReplicationType.STAND_ALONE, + HddsProtos.ReplicationFactor.THREE)).thenReturn(pipeline); + + return containerStateManager + .allocateContainer(pipelineSelector, + HddsProtos.ReplicationType.STAND_ALONE, + HddsProtos.ReplicationFactor.THREE, "root").getContainerInfo(); + + } + + public static void closeContainer(ContainerStateManager containerStateManager, + org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo + container) + throws SCMException { + + containerStateManager.getContainerStateMap() + .updateState(container, container.getState(), LifeCycleState.CLOSING); + + containerStateManager.getContainerStateMap() + .updateState(container, container.getState(), LifeCycleState.CLOSED); + + } } diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java index 66f0966497f..d74a32f6110 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java @@ -71,9 +71,7 @@ public class TestContainerReportHandler implements EventPublisher { @Test public void test() throws IOException { - - //given - + //GIVEN OzoneConfiguration conf = new OzoneConfiguration(); Node2ContainerMap node2ContainerMap = new Node2ContainerMap(); Mapping mapping = Mockito.mock(Mapping.class); @@ -133,19 +131,9 @@ public class TestContainerReportHandler implements EventPublisher { long c3 = cont3.getContainerID(); // Close remaining containers - try { - containerStateManager.getContainerStateMap() - .updateState(cont1, cont1.getState(), LifeCycleState.CLOSING); - containerStateManager.getContainerStateMap() - .updateState(cont1, cont1.getState(), LifeCycleState.CLOSED); - containerStateManager.getContainerStateMap() - .updateState(cont2, cont2.getState(), LifeCycleState.CLOSING); - containerStateManager.getContainerStateMap() - .updateState(cont2, cont2.getState(), LifeCycleState.CLOSED); + TestUtils.closeContainer(containerStateManager, cont1); + TestUtils.closeContainer(containerStateManager, cont2); - } catch (IOException e) { - LOG.info("Failed to change state of open containers.", e); - } //when //initial reports before replication is enabled. 2 containers w 3 replicas. diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManager.java new file mode 100644 index 00000000000..fe92ee5f1ef --- /dev/null +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManager.java @@ -0,0 +1,96 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.hdds.scm.container; + +import java.io.IOException; + +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.scm.TestUtils; +import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo; +import org.apache.hadoop.hdds.scm.container.replication.ReplicationRequest; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; + +/** + * Testing ContainerStatemanager. + */ +public class TestContainerStateManager { + + private ContainerStateManager containerStateManager; + + @Before + public void init() throws IOException { + OzoneConfiguration conf = new OzoneConfiguration(); + Mapping mapping = Mockito.mock(Mapping.class); + containerStateManager = new ContainerStateManager(conf, mapping); + + } + + @Test + public void checkReplicationStateOK() throws IOException { + //GIVEN + ContainerInfo c1 = TestUtils.allocateContainer(containerStateManager); + + DatanodeDetails d1 = TestUtils.randomDatanodeDetails(); + DatanodeDetails d2 = TestUtils.randomDatanodeDetails(); + DatanodeDetails d3 = TestUtils.randomDatanodeDetails(); + + addReplica(c1, d1); + addReplica(c1, d2); + addReplica(c1, d3); + + //WHEN + ReplicationRequest replicationRequest = containerStateManager + .checkReplicationState(new ContainerID(c1.getContainerID())); + + //THEN + Assert.assertNull(replicationRequest); + } + + @Test + public void checkReplicationStateMissingReplica() throws IOException { + //GIVEN + + ContainerInfo c1 = TestUtils.allocateContainer(containerStateManager); + + DatanodeDetails d1 = TestUtils.randomDatanodeDetails(); + DatanodeDetails d2 = TestUtils.randomDatanodeDetails(); + + addReplica(c1, d1); + addReplica(c1, d2); + + //WHEN + ReplicationRequest replicationRequest = containerStateManager + .checkReplicationState(new ContainerID(c1.getContainerID())); + + Assert + .assertEquals(c1.getContainerID(), replicationRequest.getContainerId()); + Assert.assertEquals(2, replicationRequest.getReplicationCount()); + Assert.assertEquals(3, replicationRequest.getExpecReplicationCount()); + } + + private void addReplica(ContainerInfo c1, DatanodeDetails d1) { + containerStateManager + .addContainerReplica(new ContainerID(c1.getContainerID()), d1); + } + +} \ No newline at end of file diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java index 4be10e102fb..0b69f5f8f4e 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java @@ -18,76 +18,76 @@ package org.apache.hadoop.hdds.scm.node; -import java.util.HashSet; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; import java.util.Set; +import java.util.stream.Collectors; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.ContainerInfo; import org.apache.hadoop.hdds.scm.TestUtils; import org.apache.hadoop.hdds.scm.container.ContainerID; import org.apache.hadoop.hdds.scm.container.ContainerStateManager; import org.apache.hadoop.hdds.scm.container.Mapping; +import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo; +import org.apache.hadoop.hdds.scm.container.replication.ReplicationRequest; +import org.apache.hadoop.hdds.scm.events.SCMEvents; import org.apache.hadoop.hdds.scm.exceptions.SCMException; import org.apache.hadoop.hdds.scm.node.states.Node2ContainerMap; import org.apache.hadoop.hdds.server.events.EventPublisher; import org.junit.Assert; import org.junit.Test; +import org.mockito.ArgumentCaptor; +import static org.mockito.Matchers.eq; import org.mockito.Mockito; /** * Test DeadNodeHandler. */ public class TestDeadNodeHandler { + + private List sentEvents = new ArrayList<>(); + @Test - public void testOnMessage() throws SCMException { + public void testOnMessage() throws IOException { //GIVEN DatanodeDetails datanode1 = TestUtils.randomDatanodeDetails(); DatanodeDetails datanode2 = TestUtils.randomDatanodeDetails(); - ContainerInfo container1 = TestUtils.getRandomContainerInfo(1); - ContainerInfo container2 = TestUtils.getRandomContainerInfo(2); - ContainerInfo container3 = TestUtils.getRandomContainerInfo(3); - Node2ContainerMap node2ContainerMap = new Node2ContainerMap(); ContainerStateManager containerStateManager = new ContainerStateManager( new OzoneConfiguration(), Mockito.mock(Mapping.class) ); + + ContainerInfo container1 = + TestUtils.allocateContainer(containerStateManager); + ContainerInfo container2 = + TestUtils.allocateContainer(containerStateManager); + ContainerInfo container3 = + TestUtils.allocateContainer(containerStateManager); + DeadNodeHandler handler = new DeadNodeHandler(node2ContainerMap, containerStateManager); - node2ContainerMap - .insertNewDatanode(datanode1.getUuid(), new HashSet() {{ - add(new ContainerID(container1.getContainerID())); - add(new ContainerID(container2.getContainerID())); - }}); + registerReplicas(node2ContainerMap, datanode1, container1, container2); + registerReplicas(node2ContainerMap, datanode2, container1, container3); - node2ContainerMap - .insertNewDatanode(datanode2.getUuid(), new HashSet() {{ - add(new ContainerID(container1.getContainerID())); - add(new ContainerID(container3.getContainerID())); - }}); + registerReplicas(containerStateManager, container1, datanode1, datanode2); + registerReplicas(containerStateManager, container2, datanode1); + registerReplicas(containerStateManager, container3, datanode2); - containerStateManager.getContainerStateMap() - .addContainerReplica(new ContainerID(container1.getContainerID()), - datanode1, datanode2); + TestUtils.closeContainer(containerStateManager, container1); - containerStateManager.getContainerStateMap() - .addContainerReplica(new ContainerID(container2.getContainerID()), - datanode1); - - containerStateManager.getContainerStateMap() - .addContainerReplica(new ContainerID(container3.getContainerID()), - datanode2); + EventPublisher publisher = Mockito.mock(EventPublisher.class); //WHEN datanode1 is dead - handler.onMessage(datanode1, Mockito.mock(EventPublisher.class)); + handler.onMessage(datanode1, publisher); //THEN - //node2ContainerMap has not been changed Assert.assertEquals(2, node2ContainerMap.size()); @@ -108,5 +108,40 @@ public class TestDeadNodeHandler { Assert.assertEquals(1, container3Replicas.size()); Assert.assertEquals(datanode2, container3Replicas.iterator().next()); + ArgumentCaptor replicationRequestParameter = + ArgumentCaptor.forClass(ReplicationRequest.class); + + Mockito.verify(publisher) + .fireEvent(eq(SCMEvents.REPLICATE_CONTAINER), + replicationRequestParameter.capture()); + + Assert + .assertEquals(container1.getContainerID(), + replicationRequestParameter.getValue().getContainerId()); + Assert + .assertEquals(1, + replicationRequestParameter.getValue().getReplicationCount()); + Assert + .assertEquals(3, + replicationRequestParameter.getValue().getExpecReplicationCount()); } + + private void registerReplicas(ContainerStateManager containerStateManager, + ContainerInfo container, DatanodeDetails... datanodes) { + containerStateManager.getContainerStateMap() + .addContainerReplica(new ContainerID(container.getContainerID()), + datanodes); + } + + private void registerReplicas(Node2ContainerMap node2ContainerMap, + DatanodeDetails datanode, + ContainerInfo... containers) + throws SCMException { + node2ContainerMap + .insertNewDatanode(datanode.getUuid(), + Arrays.stream(containers) + .map(container -> new ContainerID(container.getContainerID())) + .collect(Collectors.toSet())); + } + } \ No newline at end of file diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManager.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManagerIntegration.java similarity index 99% rename from hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManager.java rename to hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManagerIntegration.java index 9e209af517a..c6e819bc6e8 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManager.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManagerIntegration.java @@ -48,7 +48,7 @@ import org.slf4j.event.Level; /** * Tests for ContainerStateManager. */ -public class TestContainerStateManager { +public class TestContainerStateManagerIntegration { private OzoneConfiguration conf; private MiniOzoneCluster cluster;