diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java index 7411055d2d6..d85028bf161 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java @@ -32,7 +32,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.HashSet; import java.util.Set; import java.util.Collections; import java.util.Map; @@ -138,7 +137,7 @@ public void addContainer(final ContainerInfo info) ownerMap.insert(info.getOwner(), id); factorMap.insert(info.getReplicationFactor(), id); typeMap.insert(info.getReplicationType(), id); - replicaMap.put(id, new HashSet<>()); + replicaMap.put(id, ConcurrentHashMap.newKeySet()); // Flush the cache of this container type, will be added later when // get container queries are executed. diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestSCMContainerManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestSCMContainerManager.java index 43aaa85a4d9..91a36b73034 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestSCMContainerManager.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestSCMContainerManager.java @@ -44,12 +44,15 @@ import java.io.File; import java.io.IOException; import java.util.Iterator; +import java.util.Optional; import java.util.Random; import java.util.Set; import java.util.TreeSet; import java.util.UUID; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; +import java.util.stream.IntStream; /** * Tests for Container ContainerManager. @@ -194,6 +197,47 @@ public void testGetContainerWithPipeline() throws Exception { Assert.assertTrue(replicaNodes.contains(dn1)); } + @Test + public void testGetContainerReplicaWithParallelUpdate() throws Exception { + testGetContainerWithPipeline(); + final Optional id = containerManager.getContainerIDs() + .stream().findFirst(); + Assert.assertTrue(id.isPresent()); + final ContainerID cId = id.get(); + final Optional replica = containerManager + .getContainerReplicas(cId).stream().findFirst(); + Assert.assertTrue(replica.isPresent()); + final ContainerReplica cReplica = replica.get(); + final AtomicBoolean runUpdaterThread = + new AtomicBoolean(true); + + Thread updaterThread = new Thread(() -> { + while (runUpdaterThread.get()) { + try { + containerManager.removeContainerReplica(cId, cReplica); + containerManager.updateContainerReplica(cId, cReplica); + } catch (ContainerException e) { + Assert.fail("Container Exception: " + e.getMessage()); + } + } + }); + + updaterThread.setDaemon(true); + updaterThread.start(); + + IntStream.range(0, 100).forEach(i -> { + try { + Assert.assertNotNull(containerManager + .getContainerReplicas(cId) + .stream().map(ContainerReplica::getDatanodeDetails) + .collect(Collectors.toSet())); + } catch (ContainerNotFoundException e) { + Assert.fail("Missing Container " + id); + } + }); + runUpdaterThread.set(false); + } + @Test public void testgetNoneExistentContainer() { try {