diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ReplicaMap.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ReplicaMap.java
index 25d302b86c2..6ecc48a95fd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ReplicaMap.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ReplicaMap.java
@@ -120,12 +120,12 @@ class ReplicaMap {
   ReplicaInfo add(String bpid, ReplicaInfo replicaInfo) {
     checkBlockPool(bpid);
     checkBlock(replicaInfo);
-    try (AutoCloseDataSetLock l = lockManager.writeLock(LockLevel.BLOCK_POOl, bpid)) {
+    try (AutoCloseDataSetLock l = lockManager.readLock(LockLevel.BLOCK_POOl, bpid)) {
       LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid);
       if (m == null) {
         // Add an entry for block pool if it does not exist already
-        m = new LightWeightResizableGSet<Block, ReplicaInfo>();
-        map.put(bpid, m);
+        map.putIfAbsent(bpid, new LightWeightResizableGSet<Block, ReplicaInfo>());
+        m = map.get(bpid);
       }
       return m.put(replicaInfo);
     }
@@ -138,12 +138,12 @@ class ReplicaMap {
   ReplicaInfo addAndGet(String bpid, ReplicaInfo replicaInfo) {
     checkBlockPool(bpid);
     checkBlock(replicaInfo);
-    try (AutoCloseDataSetLock l = lockManager.writeLock(LockLevel.BLOCK_POOl, bpid)) {
+    try (AutoCloseDataSetLock l = lockManager.readLock(LockLevel.BLOCK_POOl, bpid)) {
       LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid);
       if (m == null) {
         // Add an entry for block pool if it does not exist already
-        m = new LightWeightResizableGSet<Block, ReplicaInfo>();
-        map.put(bpid, m);
+        map.putIfAbsent(bpid, new LightWeightResizableGSet<Block, ReplicaInfo>());
+        m = map.get(bpid);
       }
       ReplicaInfo oldReplicaInfo = m.get(replicaInfo);
       if (oldReplicaInfo != null) {
@@ -202,7 +202,7 @@ class ReplicaMap {
   ReplicaInfo remove(String bpid, Block block) {
     checkBlockPool(bpid);
     checkBlock(block);
-    try (AutoCloseDataSetLock l = lockManager.writeLock(LockLevel.BLOCK_POOl, bpid)) {
+    try (AutoCloseDataSetLock l = lockManager.readLock(LockLevel.BLOCK_POOl, bpid)) {
       LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid);
       if (m != null) {
         ReplicaInfo replicaInfo = m.get(block);
@@ -224,7 +224,7 @@ class ReplicaMap {
    */
   ReplicaInfo remove(String bpid, long blockId) {
     checkBlockPool(bpid);
-    try (AutoCloseDataSetLock l = lockManager.writeLock(LockLevel.BLOCK_POOl, bpid)) {
+    try (AutoCloseDataSetLock l = lockManager.readLock(LockLevel.BLOCK_POOl, bpid)) {
       LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid);
       if (m != null) {
         return m.remove(new Block(blockId));
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
index 23a72f9acfa..f250eea2920 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
@@ -21,6 +21,10 @@ import java.io.InputStream;
 import java.io.OutputStream;
 import java.nio.file.Files;
 import java.nio.file.Paths;
+import java.util.Random;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.function.Supplier;
 
 import org.apache.hadoop.fs.DF;
@@ -602,6 +606,53 @@ public class TestFsDatasetImpl {
         + "volumeMap.", 0,
         totalNumReplicas);
   }
+  @Test(timeout = 30000)
+  public void testConcurrentWriteAndDeleteBlock() throws Exception {
+    // Feed FsDataset with block metadata.
+    final int numBlocks = 1000;
+    final int threadCount = 10;
+    // Generate data blocks.
+    ExecutorService pool = Executors.newFixedThreadPool(threadCount);
+    List<Future<?>> futureList = new ArrayList<>();
+    Random random = new Random();
+    // Randomly write blocks and delete half of them.
+    for (int i = 0; i < threadCount; i++) {
+      Thread thread = new Thread() {
+        @Override
+        public void run() {
+          try {
+            String bpid = BLOCK_POOL_IDS[random.nextInt(BLOCK_POOL_IDS.length)];
+            for (int blockId = 0; blockId < numBlocks; blockId++) {
+              ExtendedBlock eb = new ExtendedBlock(bpid, blockId);
+              ReplicaHandler replica = null;
+              try {
+                replica = dataset.createRbw(StorageType.DEFAULT, null, eb,
+                    false);
+                if (blockId % 2 > 0) {
+                  dataset.invalidate(bpid, new Block[]{eb.getLocalBlock()});
+                }
+              } finally {
+                if (replica != null) {
+                  replica.close();
+                }
+              }
+            }
+            // Only final consistency matters here, so exceptions are ignored.
+          } catch (Exception ignore) {}
+        }
+      };
+      thread.setName("AddBlock" + i);
+      futureList.add(pool.submit(thread));
+    }
+    // Wait for data generation.
+    for (Future<?> f : futureList) {
+      f.get();
+    }
+    for (String bpid : dataset.volumeMap.getBlockPoolList()) {
+      assertEquals(numBlocks / 2, dataset.volumeMap.size(bpid));
+    }
+  }
+
   @Test(timeout = 5000)
   public void testRemoveNewlyAddedVolume() throws IOException {
     final int numExistingVolumes = getNumVolumes();
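
A note on why the lock downgrade above can be sound. The read (shared) lock no
longer provides mutual exclusion between concurrent add/remove calls, so the
patch appears to rely on two properties of the surrounding ReplicaMap code:
the outer map must behave like a ConcurrentHashMap, so that putIfAbsent()
atomically resolves the race to create a block pool's entry, and the per-pool
LightWeightResizableGSet must be internally synchronized, so that concurrent
put/remove on the same pool are safe. The read lock then only has to exclude
coarse-grained operations that still take the write lock, such as replacing or
dropping a whole block pool. Below is a minimal self-contained sketch of that
pattern, not HDFS code: PoolMap and Entry are hypothetical stand-ins for
ReplicaMap and the GSet, and a plain ReentrantReadWriteLock stands in for the
DataSetLockManager.

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.locks.ReadWriteLock;
    import java.util.concurrent.locks.ReentrantReadWriteLock;

    class PoolMap {
      // Must be a concurrent map: putIfAbsent() below is what resolves the
      // race between two threads creating the same pool under a shared lock.
      private final Map<String, Entry> map = new ConcurrentHashMap<>();
      private final ReadWriteLock lock = new ReentrantReadWriteLock();

      // Analogue of ReplicaMap.add(): mutates under the *read* lock.
      void add(String bpid) {
        lock.readLock().lock();
        try {
          Entry e = map.get(bpid);
          if (e == null) {
            // putIfAbsent + get rather than put: if another thread won the
            // race, we must use its Entry instance, not our own.
            map.putIfAbsent(bpid, new Entry());
            e = map.get(bpid);
          }
          e.put();  // safe concurrently: Entry synchronizes internally
        } finally {
          lock.readLock().unlock();
        }
      }

      // Coarse operation: still takes the write lock, so it cannot
      // interleave with any reader-locked add() above.
      void removePool(String bpid) {
        lock.writeLock().lock();
        try {
          map.remove(bpid);
        } finally {
          lock.writeLock().unlock();
        }
      }

      // Stand-in for the per-pool replica set, synchronized the way
      // LightWeightResizableGSet's put/remove/get are.
      static class Entry {
        private int size;
        synchronized void put() { size++; }
        synchronized int size() { return size; }
      }
    }

On current JDKs, map.computeIfAbsent(bpid, k -> new Entry()) would express the
same thing in one atomic call; the putIfAbsent-then-get form mirrors the patch.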