From 3f5482bb69e788952afe4b0924d1c7a5a80382a0 Mon Sep 17 00:00:00 2001
From: Arpit Agarwal
Date: Tue, 27 Sep 2016 09:10:21 -0700
Subject: [PATCH] HDFS-10828. Fix usage of FsDatasetImpl object lock in ReplicaMap. (Arpit Agarwal)

---
 .../fsdataset/impl/BlockPoolSlice.java        |  3 +-
 .../fsdataset/impl/FsDatasetImpl.java         | 11 ++---
 .../datanode/fsdataset/impl/ReplicaMap.java   | 41 ++++++++++---------
 .../impl/TestInterDatanodeProtocol.java       |  3 +-
 .../fsdataset/impl/TestReplicaMap.java        |  3 +-
 .../fsdataset/impl/TestWriteToReplica.java    |  3 +-
 6 files changed, 35 insertions(+), 29 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
index f6ad03592f4..c00dfdab565 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
@@ -54,6 +54,7 @@
 import org.apache.hadoop.hdfs.server.datanode.ReplicaWaitingToBeRecovered;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.nativeio.NativeIO;
+import org.apache.hadoop.util.AutoCloseableLock;
 import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.DiskChecker;
 import org.apache.hadoop.util.DiskChecker.DiskErrorException;
@@ -740,7 +741,7 @@ void shutdown(BlockListAsLongs blocksListToPersist) {
 
   private boolean readReplicasFromCache(ReplicaMap volumeMap,
       final RamDiskReplicaTracker lazyWriteReplicaMap) {
-    ReplicaMap tmpReplicaMap = new ReplicaMap(this);
+    ReplicaMap tmpReplicaMap = new ReplicaMap(new AutoCloseableLock());
     File replicaFile = new File(currentDir, REPLICA_CACHE_FILE);
     // Check whether the file exists or not.
     if (!replicaFile.exists()) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index d50f3996017..53c648427e7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -271,7 +271,8 @@ public LengthInputStream getMetaDataInputStream(ExtendedBlock b)
   private boolean blockPinningEnabled;
   private final int maxDataLength;
 
-  private final AutoCloseableLock datasetLock;
+  @VisibleForTesting
+  final AutoCloseableLock datasetLock;
   private final Condition datasetLockCondition;
 
   /**
@@ -313,7 +314,7 @@ public LengthInputStream getMetaDataInputStream(ExtendedBlock b)
     }
 
     storageMap = new ConcurrentHashMap<String, DatanodeStorage>();
-    volumeMap = new ReplicaMap(this);
+    volumeMap = new ReplicaMap(datasetLock);
     ramDiskReplicaTracker = RamDiskReplicaTracker.getInstance(conf, this);
 
     @SuppressWarnings("unchecked")
@@ -434,7 +435,7 @@ private void addVolume(Collection<StorageLocation> dataLocations,
     FsVolumeImpl fsVolume = new FsVolumeImpl(
         this, sd.getStorageUuid(), dir, this.conf, storageType);
     FsVolumeReference ref = fsVolume.obtainReference();
-    ReplicaMap tempVolumeMap = new ReplicaMap(this);
+    ReplicaMap tempVolumeMap = new ReplicaMap(datasetLock);
     fsVolume.getVolumeMap(tempVolumeMap, ramDiskReplicaTracker);
 
     activateVolume(tempVolumeMap, sd, storageType, ref);
@@ -468,7 +469,7 @@ public void addVolume(final StorageLocation location,
     StorageType storageType = location.getStorageType();
     final FsVolumeImpl fsVolume = createFsVolume(sd.getStorageUuid(),
         sd.getCurrentDir(), storageType);
-    final ReplicaMap tempVolumeMap = new ReplicaMap(fsVolume);
+    final ReplicaMap tempVolumeMap = new ReplicaMap(new AutoCloseableLock());
     ArrayList<IOException> exceptions = Lists.newArrayList();
 
     for (final NamespaceInfo nsInfo : nsInfos) {
@@ -2474,7 +2475,7 @@ static ReplicaRecoveryInfo initReplicaRecovery(String bpid, ReplicaMap map,
       Block block, long recoveryId, long xceiverStopTimeout) throws IOException {
     while (true) {
       try {
-        synchronized (map.getMutex()) {
+        try (AutoCloseableLock lock = map.getLock().acquire()) {
           return initReplicaRecoveryImpl(bpid, map, block, recoveryId);
         }
       } catch (MustStopExistingWriter e) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ReplicaMap.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ReplicaMap.java
index 0d1b7876faa..5705792cefb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ReplicaMap.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ReplicaMap.java
@@ -25,28 +25,29 @@
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
 import org.apache.hadoop.util.LightWeightResizableGSet;
+import org.apache.hadoop.util.AutoCloseableLock;
 
 /**
  * Maintains the replica map.
  */
 class ReplicaMap {
-  // Object using which this class is synchronized
-  private final Object mutex;
+  // Lock object to synchronize this instance.
+  private final AutoCloseableLock lock;
 
   // Map of block pool Id to another map of block Id to ReplicaInfo.
   private final Map<String, LightWeightResizableGSet<Block, ReplicaInfo>> map =
       new HashMap<String, LightWeightResizableGSet<Block, ReplicaInfo>>();
 
-  ReplicaMap(Object mutex) {
-    if (mutex == null) {
+  ReplicaMap(AutoCloseableLock lock) {
+    if (lock == null) {
       throw new HadoopIllegalArgumentException(
-          "Object to synchronize on cannot be null");
+          "Lock to synchronize on cannot be null");
     }
-    this.mutex = mutex;
+    this.lock = lock;
   }
 
   String[] getBlockPoolList() {
-    synchronized(mutex) {
+    try (AutoCloseableLock l = lock.acquire()) {
       return map.keySet().toArray(new String[map.keySet().size()]);
     }
   }
@@ -91,7 +92,7 @@ ReplicaInfo get(String bpid, Block block) {
    */
   ReplicaInfo get(String bpid, long blockId) {
     checkBlockPool(bpid);
-    synchronized(mutex) {
+    try (AutoCloseableLock l = lock.acquire()) {
       LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid);
       return m != null ? m.get(new Block(blockId)) : null;
     }
@@ -108,7 +109,7 @@ ReplicaInfo get(String bpid, long blockId) {
   ReplicaInfo add(String bpid, ReplicaInfo replicaInfo) {
     checkBlockPool(bpid);
     checkBlock(replicaInfo);
-    synchronized(mutex) {
+    try (AutoCloseableLock l = lock.acquire()) {
       LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid);
       if (m == null) {
         // Add an entry for block pool if it does not exist already
@@ -137,7 +138,7 @@ void addAll(ReplicaMap other) {
   ReplicaInfo remove(String bpid, Block block) {
     checkBlockPool(bpid);
     checkBlock(block);
-    synchronized(mutex) {
+    try (AutoCloseableLock l = lock.acquire()) {
       LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid);
       if (m != null) {
         ReplicaInfo replicaInfo = m.get(block);
@@ -159,7 +160,7 @@ ReplicaInfo remove(String bpid, Block block) {
    */
   ReplicaInfo remove(String bpid, long blockId) {
     checkBlockPool(bpid);
-    synchronized(mutex) {
+    try (AutoCloseableLock l = lock.acquire()) {
       LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid);
       if (m != null) {
         return m.remove(new Block(blockId));
@@ -175,7 +176,7 @@ ReplicaInfo remove(String bpid, long blockId) {
    */
   int size(String bpid) {
     LightWeightResizableGSet<Block, ReplicaInfo> m = null;
-    synchronized(mutex) {
+    try (AutoCloseableLock l = lock.acquire()) {
       m = map.get(bpid);
       return m != null ? m.size() : 0;
     }
@@ -184,9 +185,9 @@ int size(String bpid) {
   /**
    * Get a collection of the replicas for given block pool
    * This method is not synchronized. It needs to be synchronized
-   * externally using the mutex, both for getting the replicas
+   * externally using the lock, both for getting the replicas
    * values from the map and iterating over it. Mutex can be accessed using
-   * {@link #getMutext()} method.
+   * {@link #getLock()} method.
    *
    * @param bpid block pool id
    * @return a collection of the replicas belonging to the block pool
@@ -199,7 +200,7 @@ Collection<ReplicaInfo> replicas(String bpid) {
 
   void initBlockPool(String bpid) {
     checkBlockPool(bpid);
-    synchronized(mutex) {
+    try (AutoCloseableLock l = lock.acquire()) {
       LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid);
       if (m == null) {
         // Add an entry for block pool if it does not exist already
@@ -211,16 +212,16 @@ void cleanUpBlockPool(String bpid) {
     checkBlockPool(bpid);
-    synchronized(mutex) {
+    try (AutoCloseableLock l = lock.acquire()) {
       map.remove(bpid);
     }
   }
 
   /**
-   * Give access to mutex used for synchronizing ReplicasMap
-   * @return object used as lock
+   * Get the lock object used for synchronizing ReplicasMap
+   * @return lock object
    */
-  Object getMutex() {
-    return mutex;
+  AutoCloseableLock getLock() {
+    return lock;
   }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestInterDatanodeProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestInterDatanodeProtocol.java
index c054641611d..4f6db249466 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestInterDatanodeProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestInterDatanodeProtocol.java
@@ -58,6 +58,7 @@
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.util.AutoCloseableLock;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -234,7 +235,7 @@ public void testInitReplicaRecovery() throws IOException {
     final long firstblockid = 10000L;
     final long gs = 7777L;
     final long length = 22L;
-    final ReplicaMap map = new ReplicaMap(this);
+    final ReplicaMap map = new ReplicaMap(new AutoCloseableLock());
     String bpid = "BP-TEST";
     final Block[] blocks = new Block[5];
     for(int i = 0; i < blocks.length; i++) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestReplicaMap.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestReplicaMap.java
index db1cbbc8cb5..4fa91b040e9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestReplicaMap.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestReplicaMap.java
@@ -23,6 +23,7 @@
 
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica;
+import org.apache.hadoop.util.AutoCloseableLock;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -30,7 +31,7 @@
  * Unit test for ReplicasMap class
  */
 public class TestReplicaMap {
-  private final ReplicaMap map = new ReplicaMap(TestReplicaMap.class);
+  private final ReplicaMap map = new ReplicaMap(new AutoCloseableLock());
   private final String bpid = "BP-TEST";
   private final Block block = new Block(1234, 1234, 1234);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java
index 4ba3d811071..45fcbf22754 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java
@@ -42,6 +42,7 @@
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.util.AutoCloseableLock;
 import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
 import org.junit.Assert;
 import org.junit.Test;
@@ -534,7 +535,7 @@ public void testReplicaMapAfterDatanodeRestart() throws Exception {
         bpList.size() == 2);
 
     createReplicas(bpList, volumes, cluster.getFsDatasetTestUtils(dn));
-    ReplicaMap oldReplicaMap = new ReplicaMap(this);
+    ReplicaMap oldReplicaMap = new ReplicaMap(new AutoCloseableLock());
     oldReplicaMap.addAll(dataSet.volumeMap);
 
     cluster.restartDataNode(0);
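
--
Reviewer note (appended below the diff, not part of the patch): every hunk above makes the same substitution. A synchronized (mutex) block becomes try-with-resources over org.apache.hadoop.util.AutoCloseableLock, so FsDatasetImpl and ReplicaMap can guard their shared state with the single datasetLock instance, while tests and short-lived maps simply pass a private new AutoCloseableLock(). The sketch below is a minimal standalone illustration of that idiom; CloseableLockSketch and Counter are hypothetical names invented here and only approximate what AutoCloseableLock does.

import java.util.concurrent.locks.ReentrantLock;

// Minimal stand-in for org.apache.hadoop.util.AutoCloseableLock
// (hypothetical class, for illustration only).
class CloseableLockSketch implements AutoCloseable {
  private final ReentrantLock lock = new ReentrantLock();

  // Blocks until the lock is held, then returns this so the call can
  // sit directly in a try-with-resources header.
  CloseableLockSketch acquire() {
    lock.lock();
    return this;
  }

  // Runs automatically when the try block exits, releasing the lock on
  // both the normal and the exceptional path.
  @Override
  public void close() {
    lock.unlock();
  }
}

class Counter {
  // A single lock instance can be handed to several collaborating
  // objects, which is what passing datasetLock into ReplicaMap achieves.
  private final CloseableLockSketch lock = new CloseableLockSketch();
  private int value;

  int increment() {
    // Plays the role of the old synchronized (mutex) { ... } block.
    try (CloseableLockSketch l = lock.acquire()) {
      return ++value;
    }
  }
}

Like synchronized, try-with-resources releases the lock automatically, but the lock is now a first-class object: it can be shared across classes, exposed for testing (the @VisibleForTesting change above), and paired with the Condition (datasetLockCondition) that FsDatasetImpl keeps alongside it.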