HDFS-16511. Improve lock type for ReplicaMap under fine-grain lock mode. (#4085). Contributed by limingxiang.

Signed-off-by: He Xiaoqiao <hexiaoqiao@apache.org>
This commit is contained in:
He Xiaoqiao 2022-03-31 14:00:38 +08:00
parent ab8c360620
commit 2bf78e2416
No known key found for this signature in database
GPG Key ID: A80CC124E9A0FA63
2 changed files with 59 additions and 8 deletions

View File

@ -120,12 +120,12 @@ class ReplicaMap {
ReplicaInfo add(String bpid, ReplicaInfo replicaInfo) {
checkBlockPool(bpid);
checkBlock(replicaInfo);
try (AutoCloseDataSetLock l = lockManager.writeLock(LockLevel.BLOCK_POOl, bpid)) {
try (AutoCloseDataSetLock l = lockManager.readLock(LockLevel.BLOCK_POOl, bpid)) {
LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid);
if (m == null) {
// Add an entry for block pool if it does not exist already
m = new LightWeightResizableGSet<Block, ReplicaInfo>();
map.put(bpid, m);
map.putIfAbsent(bpid, new LightWeightResizableGSet<Block, ReplicaInfo>());
m = map.get(bpid);
}
return m.put(replicaInfo);
}
@ -138,12 +138,12 @@ class ReplicaMap {
ReplicaInfo addAndGet(String bpid, ReplicaInfo replicaInfo) {
checkBlockPool(bpid);
checkBlock(replicaInfo);
try (AutoCloseDataSetLock l = lockManager.writeLock(LockLevel.BLOCK_POOl, bpid)) {
try (AutoCloseDataSetLock l = lockManager.readLock(LockLevel.BLOCK_POOl, bpid)) {
LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid);
if (m == null) {
// Add an entry for block pool if it does not exist already
m = new LightWeightResizableGSet<Block, ReplicaInfo>();
map.put(bpid, m);
map.putIfAbsent(bpid, new LightWeightResizableGSet<Block, ReplicaInfo>());
m = map.get(bpid);
}
ReplicaInfo oldReplicaInfo = m.get(replicaInfo);
if (oldReplicaInfo != null) {
@ -202,7 +202,7 @@ class ReplicaMap {
ReplicaInfo remove(String bpid, Block block) {
checkBlockPool(bpid);
checkBlock(block);
try (AutoCloseDataSetLock l = lockManager.writeLock(LockLevel.BLOCK_POOl, bpid)) {
try (AutoCloseDataSetLock l = lockManager.readLock(LockLevel.BLOCK_POOl, bpid)) {
LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid);
if (m != null) {
ReplicaInfo replicaInfo = m.get(block);
@ -224,7 +224,7 @@ class ReplicaMap {
*/
ReplicaInfo remove(String bpid, long blockId) {
checkBlockPool(bpid);
try (AutoCloseDataSetLock l = lockManager.writeLock(LockLevel.BLOCK_POOl, bpid)) {
try (AutoCloseDataSetLock l = lockManager.readLock(LockLevel.BLOCK_POOl, bpid)) {
LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid);
if (m != null) {
return m.remove(new Block(blockId));

View File

@ -21,6 +21,10 @@ import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Supplier;
import org.apache.hadoop.fs.DF;
@ -602,6 +606,53 @@ public class TestFsDatasetImpl {
+ "volumeMap.", 0, totalNumReplicas);
}
@Test(timeout = 30000)
public void testConcurrentWriteAndDeleteBlock() throws Exception {
// Feed FsDataset with block metadata.
final int numBlocks = 1000;
final int threadCount = 10;
// Generate data blocks.
ExecutorService pool = Executors.newFixedThreadPool(threadCount);
List<Future<?>> futureList = new ArrayList<>();
Random random = new Random();
// Random write block and delete half of them.
for (int i = 0; i < threadCount; i++) {
Thread thread = new Thread() {
@Override
public void run() {
try {
String bpid = BLOCK_POOL_IDS[random.nextInt(BLOCK_POOL_IDS.length)];
for (int blockId = 0; blockId < numBlocks; blockId++) {
ExtendedBlock eb = new ExtendedBlock(bpid, blockId);
ReplicaHandler replica = null;
try {
replica = dataset.createRbw(StorageType.DEFAULT, null, eb,
false);
if (blockId % 2 > 0) {
dataset.invalidate(bpid, new Block[]{eb.getLocalBlock()});
}
} finally {
if (replica != null) {
replica.close();
}
}
}
// Just keep final consistency no need to care exception.
} catch (Exception ignore) {}
}
};
thread.setName("AddBlock" + i);
futureList.add(pool.submit(thread));
}
// Wait for data generation
for (Future<?> f : futureList) {
f.get();
}
for (String bpid : dataset.volumeMap.getBlockPoolList()) {
assertEquals(numBlocks / 2, dataset.volumeMap.size(bpid));
}
}
@Test(timeout = 5000)
public void testRemoveNewlyAddedVolume() throws IOException {
final int numExistingVolumes = getNumVolumes();