From 5ee90efed385db4bf235816145b30a0f691fc91b Mon Sep 17 00:00:00 2001 From: Anu Engineer Date: Thu, 12 Jul 2018 10:43:24 -0700 Subject: [PATCH] HDDS-228. Add the ReplicaMaps to ContainerStateManager. Contributed by Ajay Kumar. --- .../scm/container/ContainerStateManager.java | 34 ++++++++ .../container/states/ContainerStateMap.java | 86 +++++++++++++++++++ .../container/TestContainerStateManager.java | 79 +++++++++++++++++ 3 files changed, 199 insertions(+) diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java index 870ab1d1e5e..223deacb06d 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hdds.scm.container; import com.google.common.base.Preconditions; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline; import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo; @@ -488,4 +489,37 @@ public class ContainerStateManager implements Closeable { public void close() throws IOException { } + /** + * Returns the latest list of DataNodes where replica for given containerId + * exist. Throws an SCMException if no entry is found for given containerId. + * + * @param containerID + * @return Set + */ + public Set getContainerReplicas(ContainerID containerID) + throws SCMException { + return containers.getContainerReplicas(containerID); + } + + /** + * Add a container Replica for given DataNode. + * + * @param containerID + * @param dn + */ + public void addContainerReplica(ContainerID containerID, DatanodeDetails dn) { + containers.addContainerReplica(containerID, dn); + } + + /** + * Remove a container Replica for given DataNode. + * + * @param containerID + * @param dn + * @return True of dataNode is removed successfully else false. + */ + public boolean removeContainerReplica(ContainerID containerID, + DatanodeDetails dn) throws SCMException { + return containers.removeContainerReplica(containerID, dn); + } } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java index c23b1fd17dc..1c92861c66d 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java @@ -18,13 +18,18 @@ package org.apache.hadoop.hdds.scm.container.states; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import java.util.HashSet; +import java.util.Set; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.scm.container.ContainerID; import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo; import org.apache.hadoop.hdds.scm.exceptions.SCMException; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; +import org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes; import org.apache.hadoop.util.AutoCloseableLock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -83,6 +88,8 @@ public class ContainerStateMap { private final ContainerAttribute typeMap; private final Map containerMap; + // Map to hold replicas of given container. + private final Map> contReplicaMap; private final static NavigableSet EMPTY_SET = Collections.unmodifiableNavigableSet(new TreeSet<>()); @@ -101,6 +108,7 @@ public class ContainerStateMap { typeMap = new ContainerAttribute<>(); containerMap = new HashMap<>(); autoLock = new AutoCloseableLock(); + contReplicaMap = new HashMap<>(); // new InstrumentedLock(getClass().getName(), LOG, // new ReentrantLock(), // 1000, @@ -157,6 +165,84 @@ public class ContainerStateMap { return containerMap.get(id); } + /** + * Returns the latest list of DataNodes where replica for given containerId + * exist. Throws an SCMException if no entry is found for given containerId. + * + * @param containerID + * @return Set + */ + public Set getContainerReplicas(ContainerID containerID) + throws SCMException { + Preconditions.checkNotNull(containerID); + try (AutoCloseableLock lock = autoLock.acquire()) { + if (contReplicaMap.containsKey(containerID)) { + return Collections + .unmodifiableSet(contReplicaMap.get(containerID)); + } + } + throw new SCMException( + "No entry exist for containerId: " + containerID + " in replica map.", + ResultCodes.FAILED_TO_FIND_CONTAINER); + } + + /** + * Adds given datanodes as nodes where replica for given containerId exist. + * Logs a debug entry if a datanode is already added as replica for given + * ContainerId. + * + * @param containerID + * @param dnList + */ + public void addContainerReplica(ContainerID containerID, + DatanodeDetails... dnList) { + Preconditions.checkNotNull(containerID); + // Take lock to avoid race condition around insertion. + try (AutoCloseableLock lock = autoLock.acquire()) { + for (DatanodeDetails dn : dnList) { + Preconditions.checkNotNull(dn); + if (contReplicaMap.containsKey(containerID)) { + if(!contReplicaMap.get(containerID).add(dn)) { + LOG.debug("ReplicaMap already contains entry for container Id: " + + "{},DataNode: {}", containerID, dn); + } + } else { + Set dnSet = new HashSet<>(); + dnSet.add(dn); + contReplicaMap.put(containerID, dnSet); + } + } + } + } + + /** + * Remove a container Replica for given DataNode. + * + * @param containerID + * @param dn + * @return True of dataNode is removed successfully else false. + */ + public boolean removeContainerReplica(ContainerID containerID, + DatanodeDetails dn) throws SCMException { + Preconditions.checkNotNull(containerID); + Preconditions.checkNotNull(dn); + + // Take lock to avoid race condition. + try (AutoCloseableLock lock = autoLock.acquire()) { + if (contReplicaMap.containsKey(containerID)) { + return contReplicaMap.get(containerID).remove(dn); + } + } + throw new SCMException( + "No entry exist for containerId: " + containerID + " in replica map.", + ResultCodes.FAILED_TO_FIND_CONTAINER); + } + + @VisibleForTesting + public static Logger getLOG() { + return LOG; + } + /** * Returns the full container Map. * diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManager.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManager.java index bb8565072cf..9e209af517a 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManager.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManager.java @@ -17,14 +17,22 @@ package org.apache.hadoop.hdds.scm.container; import com.google.common.primitives.Longs; +import java.util.Set; +import java.util.UUID; +import org.apache.commons.lang3.RandomUtils; import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline; +import org.apache.hadoop.hdds.scm.container.states.ContainerStateMap; +import org.apache.hadoop.hdds.scm.exceptions.SCMException; import org.apache.hadoop.ozone.MiniOzoneCluster; import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.scm.server.StorageContainerManager; import org.apache.hadoop.hdds.scm.XceiverClientManager; import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo; +import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.test.LambdaTestUtils; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -35,6 +43,7 @@ import java.util.ArrayList; import java.util.List; import java.util.NavigableSet; import java.util.Random; +import org.slf4j.event.Level; /** * Tests for ContainerStateManager. @@ -333,4 +342,74 @@ public class TestContainerStateManager { Assert.assertEquals(allocatedSize, currentInfo.getAllocatedBytes()); } } + + @Test + public void testReplicaMap() throws Exception { + GenericTestUtils.setLogLevel(ContainerStateMap.getLOG(), Level.DEBUG); + GenericTestUtils.LogCapturer logCapturer = GenericTestUtils.LogCapturer + .captureLogs(ContainerStateMap.getLOG()); + DatanodeDetails dn1 = DatanodeDetails.newBuilder().setHostName("host1") + .setIpAddress("1.1.1.1") + .setUuid(UUID.randomUUID().toString()).build(); + DatanodeDetails dn2 = DatanodeDetails.newBuilder().setHostName("host2") + .setIpAddress("2.2.2.2") + .setUuid(UUID.randomUUID().toString()).build(); + + // Test 1: no replica's exist + ContainerID containerID = ContainerID.valueof(RandomUtils.nextLong()); + Set replicaSet; + LambdaTestUtils.intercept(SCMException.class, "", () -> { + containerStateManager.getContainerReplicas(containerID); + }); + + // Test 2: Add replica nodes and then test + containerStateManager.addContainerReplica(containerID, dn1); + containerStateManager.addContainerReplica(containerID, dn2); + replicaSet = containerStateManager.getContainerReplicas(containerID); + Assert.assertEquals(2, replicaSet.size()); + Assert.assertTrue(replicaSet.contains(dn1)); + Assert.assertTrue(replicaSet.contains(dn2)); + + // Test 3: Remove one replica node and then test + containerStateManager.removeContainerReplica(containerID, dn1); + replicaSet = containerStateManager.getContainerReplicas(containerID); + Assert.assertEquals(1, replicaSet.size()); + Assert.assertFalse(replicaSet.contains(dn1)); + Assert.assertTrue(replicaSet.contains(dn2)); + + // Test 3: Remove second replica node and then test + containerStateManager.removeContainerReplica(containerID, dn2); + replicaSet = containerStateManager.getContainerReplicas(containerID); + Assert.assertEquals(0, replicaSet.size()); + Assert.assertFalse(replicaSet.contains(dn1)); + Assert.assertFalse(replicaSet.contains(dn2)); + + // Test 4: Re-insert dn1 + containerStateManager.addContainerReplica(containerID, dn1); + replicaSet = containerStateManager.getContainerReplicas(containerID); + Assert.assertEquals(1, replicaSet.size()); + Assert.assertTrue(replicaSet.contains(dn1)); + Assert.assertFalse(replicaSet.contains(dn2)); + + // Re-insert dn2 + containerStateManager.addContainerReplica(containerID, dn2); + replicaSet = containerStateManager.getContainerReplicas(containerID); + Assert.assertEquals(2, replicaSet.size()); + Assert.assertTrue(replicaSet.contains(dn1)); + Assert.assertTrue(replicaSet.contains(dn2)); + + Assert.assertFalse(logCapturer.getOutput().contains( + "ReplicaMap already contains entry for container Id: " + containerID + .toString() + ",DataNode: " + dn1.toString())); + // Re-insert dn1 + containerStateManager.addContainerReplica(containerID, dn1); + replicaSet = containerStateManager.getContainerReplicas(containerID); + Assert.assertEquals(2, replicaSet.size()); + Assert.assertTrue(replicaSet.contains(dn1)); + Assert.assertTrue(replicaSet.contains(dn2)); + Assert.assertTrue(logCapturer.getOutput().contains( + "ReplicaMap already contains entry for container Id: " + containerID + .toString() + ",DataNode: " + dn1.toString())); + } + }