From 84e16adab33960725d42c84981829fe8bc4b9d46 Mon Sep 17 00:00:00 2001
From: Stephen O'Donnell
Date: Tue, 30 Jun 2020 07:09:26 -0700
Subject: [PATCH] HDFS-15160. ReplicaMap, Disk Balancer, Directory Scanner and
 various FsDatasetImpl methods should use datanode readlock. Contributed by
 Stephen O'Donnell.

Signed-off-by: Wei-Chiu Chuang
(cherry picked from commit 2a67e2b1a0e3a5f91056f5b977ef9c4c07ba6718)
---
 .../org/apache/hadoop/hdfs/DFSConfigKeys.java |   4 +
 .../hdfs/server/datanode/BlockSender.java     |   2 +-
 .../hadoop/hdfs/server/datanode/DataNode.java |   2 +-
 .../hdfs/server/datanode/DiskBalancer.java    |   2 +-
 .../datanode/fsdataset/FsDatasetSpi.java      |   8 +-
 .../fsdataset/impl/FsDatasetImpl.java         |  67 +++++++-----
 .../datanode/fsdataset/impl/ReplicaMap.java   |  31 ++++--
 .../src/main/resources/hdfs-default.xml       |  13 +++
 .../fsdataset/impl/TestFsDatasetImpl.java     | 102 +++++++++++++++++-
 9 files changed, 187 insertions(+), 44 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 76784e790a5..b02b5f4f554 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -552,6 +552,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final String DFS_DATANODE_LOCK_FAIR_KEY =
       "dfs.datanode.lock.fair";
   public static final boolean DFS_DATANODE_LOCK_FAIR_DEFAULT = true;
+  public static final String DFS_DATANODE_LOCK_READ_WRITE_ENABLED_KEY =
+      "dfs.datanode.lock.read.write.enabled";
+  public static final Boolean DFS_DATANODE_LOCK_READ_WRITE_ENABLED_DEFAULT =
+      true;
   public static final String DFS_DATANODE_LOCK_REPORTING_THRESHOLD_MS_KEY =
       "dfs.datanode.lock-reporting-threshold-ms";
   public static final long
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
index ad9be88c087..ee7cdb1f90b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
@@ -254,7 +254,7 @@ class BlockSender implements java.io.Closeable {
       // the append write.
       ChunkChecksum chunkChecksum = null;
       final long replicaVisibleLength;
-      try(AutoCloseableLock lock = datanode.data.acquireDatasetLock()) {
+      try(AutoCloseableLock lock = datanode.data.acquireDatasetReadLock()) {
         replica = getReplica(block, datanode);
         replicaVisibleLength = replica.getVisibleLength();
       }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index bc9fb13d8be..cda8096a70a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -3010,7 +3010,7 @@ public class DataNode extends ReconfigurableBase
     final BlockConstructionStage stage;
 
     //get replica information
-    try(AutoCloseableLock lock = data.acquireDatasetLock()) {
+    try(AutoCloseableLock lock = data.acquireDatasetReadLock()) {
       Block storedBlock = data.getStoredBlock(b.getBlockPoolId(),
           b.getBlockId());
       if (null == storedBlock) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java
index 63f20e84d4d..b99ca3b66d4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java
@@ -504,7 +504,7 @@ public class DiskBalancer {
     Map<String, String> storageIDToVolBasePathMap = new HashMap<>();
     FsDatasetSpi.FsVolumeReferences references;
     try {
-      try(AutoCloseableLock lock = this.dataset.acquireDatasetLock()) {
+      try(AutoCloseableLock lock = this.dataset.acquireDatasetReadLock()) {
         references = this.dataset.getFsVolumeReferences();
         for (int ndx = 0; ndx < references.size(); ndx++) {
           FsVolumeSpi vol = references.get(ndx);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
index 4de91696fa2..499901dc6a9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
@@ -657,12 +657,16 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
       FsVolumeSpi destination) throws IOException;
 
   /**
-   * Acquire the lock of the data set.
+   * Acquire the lock of the data set. This prevents other threads from
+   * modifying the volume map structure inside the datanode, but other changes
+   * are still possible. For example modifying the genStamp of a block instance.
    */
   AutoCloseableLock acquireDatasetLock();
 
   /***
-   * Acquire the read lock of the data set.
+   * Acquire the read lock of the data set. This prevents other threads from
+   * modifying the volume map structure inside the datanode, but other changes
+   * are still possible. For example modifying the genStamp of a block instance.
    * @return The AutoClosable read lock instance.
    */
   AutoCloseableLock acquireDatasetReadLock();
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index 32e58f4eb22..7adcd8e2f14 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -41,7 +41,6 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executor;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
 import javax.management.NotCompliantMBeanException;
 import javax.management.ObjectName;
 
@@ -179,7 +178,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
 
   @Override
   public FsVolumeImpl getVolume(final ExtendedBlock b) {
-    try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
+    try (AutoCloseableLock lock = datasetReadLock.acquire()) {
       final ReplicaInfo r =
           volumeMap.get(b.getBlockPoolId(), b.getLocalBlock());
       return r != null ? (FsVolumeImpl) r.getVolume() : null;
@@ -189,7 +188,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   @Override // FsDatasetSpi
   public Block getStoredBlock(String bpid, long blkid)
       throws IOException {
-    try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
+    try (AutoCloseableLock lock = datasetReadLock.acquire()) {
       ReplicaInfo r = volumeMap.get(bpid, blkid);
       if (r == null) {
         return null;
@@ -206,7 +205,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   public Set<? extends Replica> deepCopyReplica(String bpid)
       throws IOException {
     Set<? extends Replica> replicas = null;
-    try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
+    try (AutoCloseableLock lock = datasetReadLock.acquire()) {
       replicas = new HashSet<>(volumeMap.replicas(bpid) == null
           ? Collections.EMPTY_SET
           : volumeMap.replicas(bpid));
@@ -302,7 +301,20 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
         DFSConfigKeys.DFS_DATANODE_LOCK_REPORTING_THRESHOLD_MS_DEFAULT,
         TimeUnit.MILLISECONDS));
     this.datasetWriteLock = new AutoCloseableLock(datasetRWLock.writeLock());
-    this.datasetReadLock = new AutoCloseableLock(datasetRWLock.readLock());
+    boolean enableRL = conf.getBoolean(
+        DFSConfigKeys.DFS_DATANODE_LOCK_READ_WRITE_ENABLED_KEY,
+        DFSConfigKeys.DFS_DATANODE_LOCK_READ_WRITE_ENABLED_DEFAULT);
+    // The read lock can be disabled by the above config key. If it is disabled
+    // then we simply make both the read and write lock variables hold
+    // the write lock. All accesses to the lock are via these variables, so
+    // this effectively disables the read lock.
+    if (enableRL) {
+      LOG.info("The datanode lock is a read write lock");
+      this.datasetReadLock = new AutoCloseableLock(datasetRWLock.readLock());
+    } else {
+      LOG.info("The datanode lock is an exclusive write lock");
+      this.datasetReadLock = this.datasetWriteLock;
+    }
     this.datasetWriteLockCondition = datasetWriteLock.newCondition();
 
     // The number of volumes required for operation is the total number
@@ -342,7 +354,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     }
 
     storageMap = new ConcurrentHashMap<String, DatanodeStorage>();
-    volumeMap = new ReplicaMap(datasetRWLock);
+    volumeMap = new ReplicaMap(datasetReadLock, datasetWriteLock);
     ramDiskReplicaTracker = RamDiskReplicaTracker.getInstance(conf, this);
 
     @SuppressWarnings("unchecked")
@@ -475,7 +487,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
         .setConf(this.conf)
         .build();
     FsVolumeReference ref = fsVolume.obtainReference();
-    ReplicaMap tempVolumeMap = new ReplicaMap(datasetRWLock);
+    ReplicaMap tempVolumeMap =
+        new ReplicaMap(datasetReadLock, datasetWriteLock);
     fsVolume.getVolumeMap(tempVolumeMap, ramDiskReplicaTracker);
 
     activateVolume(tempVolumeMap, sd, storageLocation.getStorageType(), ref);
@@ -515,7 +528,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     final FsVolumeImpl fsVolume =
         createFsVolume(sd.getStorageUuid(), sd, location);
     final ReplicaMap tempVolumeMap =
-        new ReplicaMap(new ReentrantReadWriteLock());
+        new ReplicaMap(datasetReadLock, datasetWriteLock);
     ArrayList<IOException> exceptions = Lists.newArrayList();
 
     for (final NamespaceInfo nsInfo : nsInfos) {
@@ -810,7 +823,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       long seekOffset) throws IOException {
 
     ReplicaInfo info;
-    try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
+    try (AutoCloseableLock lock = datasetReadLock.acquire()) {
       info = volumeMap.get(b.getBlockPoolId(), b.getLocalBlock());
     }
 
@@ -898,7 +911,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   @Override // FsDatasetSpi
   public ReplicaInputStreams getTmpInputStreams(ExtendedBlock b,
       long blkOffset, long metaOffset) throws IOException {
-    try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
+    try (AutoCloseableLock lock = datasetReadLock.acquire()) {
       ReplicaInfo info = getReplicaInfo(b);
       FsVolumeReference ref = info.getVolume().obtainReference();
       try {
@@ -1023,7 +1036,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     }
 
     FsVolumeReference volumeRef = null;
-    try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
+    try (AutoCloseableLock lock = datasetReadLock.acquire()) {
       volumeRef = volumes.getNextVolume(targetStorageType, targetStorageId,
           block.getNumBytes());
     }
@@ -1137,7 +1150,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
 
     FsVolumeReference volumeRef = null;
 
-    try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
+    try (AutoCloseableLock lock = datasetReadLock.acquire()) {
       volumeRef = destination.obtainReference();
     }
 
@@ -1891,7 +1904,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
         new HashMap<String, BlockListAsLongs.Builder>();
 
     List<FsVolumeImpl> curVolumes = null;
-    try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
+    try (AutoCloseableLock lock = datasetReadLock.acquire()) {
       curVolumes = volumes.getVolumes();
       for (FsVolumeSpi v : curVolumes) {
         builders.put(v.getStorageID(), BlockListAsLongs.builder(maxDataLength));
@@ -1954,7 +1967,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
    */
   @Override
   public List<ReplicaInfo> getFinalizedBlocks(String bpid) {
-    try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
+    try (AutoCloseableLock lock = datasetReadLock.acquire()) {
      final List<ReplicaInfo> finalized = new ArrayList<ReplicaInfo>(
          volumeMap.size(bpid));
      for (ReplicaInfo b : volumeMap.replicas(bpid)) {
@@ -2047,9 +2060,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   ReplicaInfo validateBlockFile(String bpid, long blockId) {
     //Should we check for metadata file too?
     final ReplicaInfo r;
-    try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
-      r = volumeMap.get(bpid, blockId);
-    }
+    r = volumeMap.get(bpid, blockId);
     if (r != null) {
       if (r.blockDataExists()) {
         return r;
@@ -2292,7 +2303,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
 
   @Override // FsDatasetSpi
   public boolean contains(final ExtendedBlock block) {
-    try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
+    try (AutoCloseableLock lock = datasetReadLock.acquire()) {
       final long blockId = block.getLocalBlock().getBlockId();
       final String bpid = block.getBlockPoolId();
       final ReplicaInfo r = volumeMap.get(bpid, blockId);
@@ -2613,7 +2624,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
 
   @Override
   public String getReplicaString(String bpid, long blockId) {
-    try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
+    try (AutoCloseableLock lock = datasetReadLock.acquire()) {
       final Replica r = volumeMap.get(bpid, blockId);
       return r == null ? "null" : r.toString();
     }
@@ -2833,7 +2844,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   @Override // FsDatasetSpi
   public long getReplicaVisibleLength(final ExtendedBlock block)
       throws IOException {
-    try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
+    try (AutoCloseableLock lock = datasetReadLock.acquire()) {
       final Replica replica = getReplicaInfo(block.getBlockPoolId(),
           block.getBlockId());
       if (replica.getGenerationStamp() < block.getGenerationStamp()) {
@@ -2983,18 +2994,20 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   @Override // FsDatasetSpi
   public BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock block)
       throws IOException {
-    try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
+    try (AutoCloseableLock lock = datasetReadLock.acquire()) {
       final Replica replica = volumeMap.get(block.getBlockPoolId(),
           block.getBlockId());
       if (replica == null) {
         throw new ReplicaNotFoundException(block);
       }
-      if (replica.getGenerationStamp() < block.getGenerationStamp()) {
-        throw new IOException(
-            "Replica generation stamp < block generation stamp, block="
-            + block + ", replica=" + replica);
-      } else if (replica.getGenerationStamp() > block.getGenerationStamp()) {
-        block.setGenerationStamp(replica.getGenerationStamp());
+      synchronized(replica) {
+        if (replica.getGenerationStamp() < block.getGenerationStamp()) {
+          throw new IOException(
+              "Replica generation stamp < block generation stamp, block="
+              + block + ", replica=" + replica);
+        } else if (replica.getGenerationStamp() > block.getGenerationStamp()) {
+          block.setGenerationStamp(replica.getGenerationStamp());
+        }
       }
     }
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ReplicaMap.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ReplicaMap.java
index 25141b847e0..13c55d36888 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ReplicaMap.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ReplicaMap.java
@@ -32,7 +32,6 @@ import org.apache.hadoop.util.AutoCloseableLock;
  * Maintains the replica map.
  */
 class ReplicaMap {
-  private final ReadWriteLock rwLock;
   // Lock object to synchronize this instance.
   private final AutoCloseableLock readLock;
   private final AutoCloseableLock writeLock;
@@ -41,18 +40,22 @@ class ReplicaMap {
   private final Map<String, LightWeightResizableGSet<Block, ReplicaInfo>> map
       = new HashMap<>();
 
-  ReplicaMap(ReadWriteLock lock) {
-    if (lock == null) {
+  ReplicaMap(AutoCloseableLock rLock, AutoCloseableLock wLock) {
+    if (rLock == null || wLock == null) {
       throw new HadoopIllegalArgumentException(
           "Lock to synchronize on cannot be null");
     }
-    this.rwLock = lock;
-    this.readLock = new AutoCloseableLock(rwLock.readLock());
-    this.writeLock = new AutoCloseableLock(rwLock.writeLock());
+    this.readLock = rLock;
+    this.writeLock = wLock;
+  }
+
+  ReplicaMap(ReadWriteLock lock) {
+    this(new AutoCloseableLock(lock.readLock()),
+        new AutoCloseableLock(lock.writeLock()));
   }
 
   String[] getBlockPoolList() {
-    try (AutoCloseableLock l = writeLock.acquire()) {
+    try (AutoCloseableLock l = readLock.acquire()) {
       return map.keySet().toArray(new String[map.keySet().size()]);
     }
   }
@@ -97,7 +100,7 @@ class ReplicaMap {
    */
   ReplicaInfo get(String bpid, long blockId) {
     checkBlockPool(bpid);
-    try (AutoCloseableLock l = writeLock.acquire()) {
+    try (AutoCloseableLock l = readLock.acquire()) {
       LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid);
       return m != null ? m.get(new Block(blockId)) : null;
     }
@@ -218,7 +221,7 @@ class ReplicaMap {
    * @return the number of replicas in the map
    */
   int size(String bpid) {
-    try (AutoCloseableLock l = writeLock.acquire()) {
+    try (AutoCloseableLock l = readLock.acquire()) {
       LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid);
       return m != null ? m.size() : 0;
     }
@@ -266,4 +269,14 @@ class ReplicaMap {
   AutoCloseableLock getLock() {
     return writeLock;
   }
+
+  /**
+   * Get the lock object used for synchronizing the ReplicasMap for read only
+   * operations.
+   * @return The read lock object
+   */
+  AutoCloseableLock getReadLock() {
+    return readLock;
+  }
+
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 2e1dbf6f31c..c36b333270d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -3009,6 +3009,19 @@
   </description>
 </property>
 
+<property>
+  <name>dfs.datanode.lock.read.write.enabled</name>
+  <value>true</value>
+  <description>If this is true, the FsDataset lock will be a read write lock. If
+    it is false, all locks will be a write lock.
+    Enabling this should give better datanode throughput, as many read only
+    functions can run concurrently under the read lock, when they would
+    previously have required the exclusive write lock. As the feature is
+    experimental, this switch can be used to disable the shared read lock, and
+    cause all lock acquisitions to use the exclusive write lock.
+  </description>
+</property>
+
 <property>
   <name>dfs.datanode.lock-reporting-threshold-ms</name>
   <value>300</value>
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
index 13ffb96f465..d4c4b14ac9b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
 
 import java.util.function.Supplier;
 import com.google.common.collect.Lists;
-import java.io.OutputStream;
 import java.nio.file.Files;
 import java.nio.file.Paths;
 import org.apache.commons.io.FileUtils;
@@ -65,6 +64,7 @@ import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.io.MultipleIOException;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.test.LambdaTestUtils;
+import org.apache.hadoop.util.AutoCloseableLock;
 import org.apache.hadoop.util.FakeTimer;
 import org.apache.hadoop.util.StringUtils;
 import org.junit.Assert;
@@ -85,6 +85,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DN_CACHED_DFSUSED_CHECK_INTERVAL_MS;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY;
@@ -196,6 +197,101 @@ public class TestFsDatasetImpl {
     assertEquals(0, dataset.getNumFailedVolumes());
   }
 
+  @Test
+  public void testReadLockEnabledByDefault()
+      throws IOException, InterruptedException {
+    final FsDatasetSpi<?> ds = dataset;
+    AtomicBoolean accessed = new AtomicBoolean(false);
+    CountDownLatch latch = new CountDownLatch(1);
+    CountDownLatch waiterLatch = new CountDownLatch(1);
+
+    Thread holder = new Thread() {
+      public void run() {
+        try (AutoCloseableLock l = ds.acquireDatasetReadLock()) {
+          latch.countDown();
+          sleep(10000);
+        } catch (Exception e) {
+        }
+      }
+    };
+
+    Thread waiter = new Thread() {
+      public void run() {
+        try (AutoCloseableLock l = ds.acquireDatasetReadLock()) {
+          waiterLatch.countDown();
+          accessed.getAndSet(true);
+        } catch (Exception e) {
+        }
+      }
+    };
+
+    holder.start();
+    latch.await();
+    waiter.start();
+    waiterLatch.await();
+    // The holder thread is still holding the lock, but the waiter can still
+    // run as the lock is a shared read lock.
+    assertEquals(true, accessed.get());
+    holder.interrupt();
+    holder.join();
+    waiter.join();
+  }
+
+  @Test(timeout=10000)
+  public void testReadLockCanBeDisabledByConfig()
+      throws IOException, InterruptedException {
+    HdfsConfiguration conf = new HdfsConfiguration();
+    conf.setBoolean(
+        DFSConfigKeys.DFS_DATANODE_LOCK_READ_WRITE_ENABLED_KEY, false);
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(1).build();
+    try {
+      cluster.waitActive();
+      DataNode dn = cluster.getDataNodes().get(0);
+      final FsDatasetSpi<?> ds = DataNodeTestUtils.getFSDataset(dn);
+
+      CountDownLatch latch = new CountDownLatch(1);
+      CountDownLatch waiterLatch = new CountDownLatch(1);
+      AtomicBoolean accessed = new AtomicBoolean(false);
+
+      Thread holder = new Thread() {
+        public void run() {
+          try (AutoCloseableLock l = ds.acquireDatasetReadLock()) {
+            latch.countDown();
+            sleep(10000);
+          } catch (Exception e) {
+          }
+        }
+      };
+
+      Thread waiter = new Thread() {
+        public void run() {
+          try (AutoCloseableLock l = ds.acquireDatasetReadLock()) {
+            accessed.getAndSet(true);
+            waiterLatch.countDown();
+          } catch (Exception e) {
+          }
+        }
+      };
+
+      holder.start();
+      latch.await();
+      waiter.start();
+      Thread.sleep(200);
+      // Waiting thread should not have been able to update the variable
+      // as the read lock is disabled and hence an exclusive lock.
+      assertEquals(false, accessed.get());
+      holder.interrupt();
+      holder.join();
+      waiterLatch.await();
+      // After the holder thread exits, the variable is updated.
+      assertEquals(true, accessed.get());
+      waiter.join();
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
   @Test
   public void testAddVolumes() throws IOException {
     final int numNewVolumes = 3;
@@ -242,8 +338,8 @@ public class TestFsDatasetImpl {
 
   @Test
   public void testAddVolumeWithSameStorageUuid() throws IOException {
-    HdfsConfiguration conf = new HdfsConfiguration();
-    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+    HdfsConfiguration config = new HdfsConfiguration();
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(config)
         .numDataNodes(1).build();
     try {
       cluster.waitActive();
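
Editor's usage note (not part of the patch): the sketch below is a minimal, hypothetical
illustration of the two things this change introduces, namely the shared read-lock access
pattern used by BlockSender, DataNode and the tests above, and the new
dfs.datanode.lock.read.write.enabled switch. It assumes an FsDatasetSpi instance is
already available (for example via DataNodeTestUtils.getFSDataset(dn), as in the test);
the class name and helper methods are invented for illustration only.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.util.AutoCloseableLock;

public class DatasetReadLockSketch {

  // Read-only access pattern used throughout this patch: take the shared read
  // lock with try-with-resources so it is always released, then query the
  // replica map while other readers may proceed concurrently.
  static Block findStoredBlock(FsDatasetSpi<?> dataset, String bpid,
      long blockId) throws IOException {
    try (AutoCloseableLock lock = dataset.acquireDatasetReadLock()) {
      return dataset.getStoredBlock(bpid, blockId);
    }
  }

  // The feature is experimental: with the new key set to false,
  // acquireDatasetReadLock() hands back the exclusive write lock, restoring
  // the old single-lock behaviour without any code changes in callers.
  static void disableSharedReadLock(Configuration conf) {
    conf.setBoolean(
        DFSConfigKeys.DFS_DATANODE_LOCK_READ_WRITE_ENABLED_KEY, false);
  }
}

In both cases the try-with-resources block guarantees the lock is released even if the
query throws, which is the same pattern the patch applies inside FsDatasetImpl.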