From 21fc30c1aa7b447d366d8394baca58ec7357e690 Mon Sep 17 00:00:00 2001 From: Hexiaoqiao Date: Tue, 14 Sep 2021 12:36:32 +0800 Subject: [PATCH] Revert " HDFS-15160. ReplicaMap, Disk Balancer, Directory Scanner and various FsDatasetImpl methods should use datanode readlock. (#3200)" This reverts commit 318bc5ff69bc578db1ce95198aa7fa7bc7199320. --- .../util/InstrumentedReadWriteLock.java | 2 +- .../org/apache/hadoop/hdfs/DFSConfigKeys.java | 11 -- .../hdfs/server/datanode/BlockSender.java | 2 +- .../hadoop/hdfs/server/datanode/DataNode.java | 2 +- .../hdfs/server/datanode/DiskBalancer.java | 2 +- .../datanode/fsdataset/FsDatasetSpi.java | 12 +- .../fsdataset/impl/BlockPoolSlice.java | 4 +- .../fsdataset/impl/FsDatasetImpl.java | 172 +++++++----------- .../fsdataset/impl/ProvidedVolumeImpl.java | 10 +- .../datanode/fsdataset/impl/ReplicaMap.java | 46 ++--- .../src/main/resources/hdfs-default.xml | 34 ---- .../server/datanode/SimulatedFSDataset.java | 6 - .../extdataset/ExternalDatasetImpl.java | 5 - .../impl/FsDatasetImplTestUtils.java | 2 +- .../fsdataset/impl/TestFsDatasetImpl.java | 120 +----------- .../fsdataset/impl/TestFsVolumeList.java | 4 +- .../impl/TestInterDatanodeProtocol.java | 4 +- .../fsdataset/impl/TestProvidedImpl.java | 8 +- .../fsdataset/impl/TestReplicaMap.java | 9 +- .../fsdataset/impl/TestWriteToReplica.java | 4 +- 20 files changed, 114 insertions(+), 345 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/InstrumentedReadWriteLock.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/InstrumentedReadWriteLock.java index 758f1ff87cf..a4105240769 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/InstrumentedReadWriteLock.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/InstrumentedReadWriteLock.java @@ -37,7 +37,7 @@ public class InstrumentedReadWriteLock implements ReadWriteLock { private final Lock readLock; private final Lock writeLock; - public InstrumentedReadWriteLock(boolean fair, String name, Logger logger, + InstrumentedReadWriteLock(boolean fair, String name, Logger logger, long minLoggingGapMs, long lockWarningThresholdMs) { ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock(fair); readLock = new InstrumentedReadLock(name, logger, readWriteLock, diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index b02b5f4f554..563b64c2c09 100755 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -549,17 +549,6 @@ public class DFSConfigKeys extends CommonConfigurationKeys { "dfs.lock.suppress.warning.interval"; public static final long DFS_LOCK_SUPPRESS_WARNING_INTERVAL_DEFAULT = 10000; //ms - public static final String DFS_DATANODE_LOCK_FAIR_KEY = - "dfs.datanode.lock.fair"; - public static final boolean DFS_DATANODE_LOCK_FAIR_DEFAULT = true; - public static final String DFS_DATANODE_LOCK_READ_WRITE_ENABLED_KEY = - "dfs.datanode.lock.read.write.enabled"; - public static final Boolean DFS_DATANODE_LOCK_READ_WRITE_ENABLED_DEFAULT = - true; - public static final String DFS_DATANODE_LOCK_REPORTING_THRESHOLD_MS_KEY = - "dfs.datanode.lock-reporting-threshold-ms"; - public static final long - DFS_DATANODE_LOCK_REPORTING_THRESHOLD_MS_DEFAULT = 300L; 
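// --- Editor's illustrative sketch, not part of the patch. The three keys
// deleted in the hunk above (dfs.datanode.lock.fair,
// dfs.datanode.lock.read.write.enabled, dfs.datanode.lock-reporting-threshold-ms)
// were consumed roughly as follows before this revert (see the FsDatasetImpl
// constructor hunk further down). Class and method names here are hypothetical
// stand-ins; only the key names and defaults come from the hunk above.
import org.apache.hadoop.conf.Configuration;

class RemovedLockKeysSketch {
  static final String FAIR_KEY = "dfs.datanode.lock.fair";
  static final String READ_WRITE_ENABLED_KEY = "dfs.datanode.lock.read.write.enabled";
  static final String REPORTING_THRESHOLD_MS_KEY = "dfs.datanode.lock-reporting-threshold-ms";

  static void printLockSettings(Configuration conf) {
    boolean fair = conf.getBoolean(FAIR_KEY, true);                       // default true
    boolean readWriteLock = conf.getBoolean(READ_WRITE_ENABLED_KEY, true); // default true
    long reportThresholdMs = conf.getLong(REPORTING_THRESHOLD_MS_KEY, 300L); // default 300 ms
    System.out.println("fair=" + fair + ", readWrite=" + readWriteLock
        + ", reportThresholdMs=" + reportThresholdMs);
  }
}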
public static final String DFS_UPGRADE_DOMAIN_FACTOR = "dfs.namenode.upgrade.domain.factor"; public static final int DFS_UPGRADE_DOMAIN_FACTOR_DEFAULT = DFS_REPLICATION_DEFAULT; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java index ee7cdb1f90b..ad9be88c087 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java @@ -254,7 +254,7 @@ class BlockSender implements java.io.Closeable { // the append write. ChunkChecksum chunkChecksum = null; final long replicaVisibleLength; - try(AutoCloseableLock lock = datanode.data.acquireDatasetReadLock()) { + try(AutoCloseableLock lock = datanode.data.acquireDatasetLock()) { replica = getReplica(block, datanode); replicaVisibleLength = replica.getVisibleLength(); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java index cda8096a70a..bc9fb13d8be 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java @@ -3010,7 +3010,7 @@ public class DataNode extends ReconfigurableBase final BlockConstructionStage stage; //get replica information - try(AutoCloseableLock lock = data.acquireDatasetReadLock()) { + try(AutoCloseableLock lock = data.acquireDatasetLock()) { Block storedBlock = data.getStoredBlock(b.getBlockPoolId(), b.getBlockId()); if (null == storedBlock) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java index b99ca3b66d4..63f20e84d4d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java @@ -504,7 +504,7 @@ public class DiskBalancer { Map storageIDToVolBasePathMap = new HashMap<>(); FsDatasetSpi.FsVolumeReferences references; try { - try(AutoCloseableLock lock = this.dataset.acquireDatasetReadLock()) { + try(AutoCloseableLock lock = this.dataset.acquireDatasetLock()) { references = this.dataset.getFsVolumeReferences(); for (int ndx = 0; ndx < references.size(); ndx++) { FsVolumeSpi vol = references.get(ndx); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java index 499901dc6a9..78a5cfc9676 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java @@ -657,19 +657,9 @@ public interface FsDatasetSpi extends FSDatasetMBean { FsVolumeSpi destination) throws IOException; /** - * Acquire the lock of the data set. 
This prevents other threads from - * modifying the volume map structure inside the datanode, but other changes - * are still possible. For example modifying the genStamp of a block instance. + * Acquire the lock of the data set. */ AutoCloseableLock acquireDatasetLock(); - /*** - * Acquire the read lock of the data set. This prevents other threads from - * modifying the volume map structure inside the datanode, but other changes - * are still possible. For example modifying the genStamp of a block instance. - * @return The AutoClosable read lock instance. - */ - AutoCloseableLock acquireDatasetReadLock(); - Set deepCopyReplica(String bpid) throws IOException; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java index ea775054469..9656b9d80f4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java @@ -42,7 +42,6 @@ import java.util.concurrent.ForkJoinPool; import java.util.concurrent.ForkJoinTask; import java.util.concurrent.RecursiveAction; import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.hadoop.hdfs.server.datanode.FSCachingGetSpaceUsed; import org.slf4j.Logger; @@ -67,6 +66,7 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.RamDiskReplicaTrack import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.MultipleIOException; +import org.apache.hadoop.util.AutoCloseableLock; import org.apache.hadoop.util.DataChecksum; import org.apache.hadoop.util.DataChecksum.Type; import org.apache.hadoop.util.DiskChecker; @@ -874,7 +874,7 @@ class BlockPoolSlice { private boolean readReplicasFromCache(ReplicaMap volumeMap, final RamDiskReplicaTracker lazyWriteReplicaMap) { - ReplicaMap tmpReplicaMap = new ReplicaMap(new ReentrantReadWriteLock()); + ReplicaMap tmpReplicaMap = new ReplicaMap(new AutoCloseableLock()); File replicaFile = new File(currentDir, REPLICA_CACHE_FILE); // Check whether the file exists or not. 
if (!replicaFile.exists()) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java index 045c8cd2eec..3576671392a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java @@ -40,6 +40,7 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executor; import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.TimeUnit; import javax.management.NotCompliantMBeanException; @@ -111,7 +112,7 @@ import org.apache.hadoop.util.Daemon; import org.apache.hadoop.util.DataChecksum; import org.apache.hadoop.util.DiskChecker.DiskErrorException; import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException; -import org.apache.hadoop.util.InstrumentedReadWriteLock; +import org.apache.hadoop.util.InstrumentedLock; import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.Time; import org.apache.hadoop.util.Timer; @@ -178,7 +179,7 @@ class FsDatasetImpl implements FsDatasetSpi { @Override public FsVolumeImpl getVolume(final ExtendedBlock b) { - try (AutoCloseableLock lock = datasetReadLock.acquire()) { + try (AutoCloseableLock lock = datasetLock.acquire()) { final ReplicaInfo r = volumeMap.get(b.getBlockPoolId(), b.getLocalBlock()); return r != null ? (FsVolumeImpl) r.getVolume() : null; @@ -188,7 +189,7 @@ class FsDatasetImpl implements FsDatasetSpi { @Override // FsDatasetSpi public Block getStoredBlock(String bpid, long blkid) throws IOException { - try (AutoCloseableLock lock = datasetReadLock.acquire()) { + try (AutoCloseableLock lock = datasetLock.acquire()) { ReplicaInfo r = volumeMap.get(bpid, blkid); if (r == null) { return null; @@ -201,16 +202,12 @@ class FsDatasetImpl implements FsDatasetSpi { * The deepCopyReplica call doesn't use the datasetock since it will lead the * potential deadlock with the {@link FsVolumeList#addBlockPool} call. */ - @SuppressWarnings("unchecked") @Override public Set deepCopyReplica(String bpid) throws IOException { - Set replicas; - try (AutoCloseableLock lock = datasetReadLock.acquire()) { - replicas = - new HashSet<>(volumeMap.replicas(bpid) == null ? Collections.EMPTY_SET - : volumeMap.replicas(bpid)); - } + Set replicas = + new HashSet<>(volumeMap.replicas(bpid) == null ? 
Collections.EMPTY_SET + : volumeMap.replicas(bpid)); return Collections.unmodifiableSet(replicas); } @@ -271,12 +268,8 @@ class FsDatasetImpl implements FsDatasetSpi { private final int maxDataLength; @VisibleForTesting - final AutoCloseableLock datasetWriteLock; - @VisibleForTesting - final AutoCloseableLock datasetReadLock; - @VisibleForTesting - final InstrumentedReadWriteLock datasetRWLock; - private final Condition datasetWriteLockCondition; + final AutoCloseableLock datasetLock; + private final Condition datasetLockCondition; private static String blockPoolId = ""; /** @@ -289,33 +282,15 @@ class FsDatasetImpl implements FsDatasetSpi { this.dataStorage = storage; this.conf = conf; this.smallBufferSize = DFSUtilClient.getSmallBufferSize(conf); - this.datasetRWLock = new InstrumentedReadWriteLock( - conf.getBoolean(DFSConfigKeys.DFS_DATANODE_LOCK_FAIR_KEY, - DFSConfigKeys.DFS_DATANODE_LOCK_FAIR_DEFAULT), - "FsDatasetRWLock", LOG, conf.getTimeDuration( - DFSConfigKeys.DFS_LOCK_SUPPRESS_WARNING_INTERVAL_KEY, - DFSConfigKeys.DFS_LOCK_SUPPRESS_WARNING_INTERVAL_DEFAULT, - TimeUnit.MILLISECONDS), - conf.getTimeDuration( - DFSConfigKeys.DFS_DATANODE_LOCK_REPORTING_THRESHOLD_MS_KEY, - DFSConfigKeys.DFS_DATANODE_LOCK_REPORTING_THRESHOLD_MS_DEFAULT, - TimeUnit.MILLISECONDS)); - this.datasetWriteLock = new AutoCloseableLock(datasetRWLock.writeLock()); - boolean enableRL = conf.getBoolean( - DFSConfigKeys.DFS_DATANODE_LOCK_READ_WRITE_ENABLED_KEY, - DFSConfigKeys.DFS_DATANODE_LOCK_READ_WRITE_ENABLED_DEFAULT); - // The read lock can be disabled by the above config key. If it is disabled - // then we simply make the both the read and write lock variables hold - // the write lock. All accesses to the lock are via these variables, so that - // effectively disables the read lock. - if (enableRL) { - LOG.info("The datanode lock is a read write lock"); - this.datasetReadLock = new AutoCloseableLock(datasetRWLock.readLock()); - } else { - LOG.info("The datanode lock is an exclusive write lock"); - this.datasetReadLock = this.datasetWriteLock; - } - this.datasetWriteLockCondition = datasetWriteLock.newCondition(); + this.datasetLock = new AutoCloseableLock( + new InstrumentedLock(getClass().getName(), LOG, + new ReentrantLock(true), + conf.getTimeDuration( + DFSConfigKeys.DFS_LOCK_SUPPRESS_WARNING_INTERVAL_KEY, + DFSConfigKeys.DFS_LOCK_SUPPRESS_WARNING_INTERVAL_DEFAULT, + TimeUnit.MILLISECONDS), + 300)); + this.datasetLockCondition = datasetLock.newCondition(); // The number of volumes required for operation is the total number // of volumes minus the number of failed volumes we can tolerate. 
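// --- Editor's illustrative sketch, not part of the patch. This condenses the
// constructor logic removed in the FsDatasetImpl hunk above: a fair (or
// non-fair) ReentrantReadWriteLock wrapped in two AutoCloseableLock handles,
// with the read handle aliased to the write handle when the read/write mode is
// disabled -- i.e. the exclusive locking behaviour this revert returns to.
// Class and method names are hypothetical; AutoCloseableLock is the Hadoop
// utility already used throughout the hunks in this patch.
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.hadoop.util.AutoCloseableLock;

class DatasetLockSketch {
  private final AutoCloseableLock readLock;
  private final AutoCloseableLock writeLock;

  DatasetLockSketch(boolean fair, boolean readWriteEnabled) {
    ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock(fair);
    this.writeLock = new AutoCloseableLock(rwLock.writeLock());
    // Disabling the read/write mode makes both handles share the write lock,
    // so every acquisition is exclusive.
    this.readLock = readWriteEnabled
        ? new AutoCloseableLock(rwLock.readLock())
        : this.writeLock;
  }

  AutoCloseableLock acquireRead() {
    return readLock.acquire();   // callers release via try-with-resources
  }

  AutoCloseableLock acquireWrite() {
    return writeLock.acquire();
  }
}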
@@ -354,7 +329,7 @@ class FsDatasetImpl implements FsDatasetSpi { } storageMap = new ConcurrentHashMap(); - volumeMap = new ReplicaMap(datasetReadLock, datasetWriteLock); + volumeMap = new ReplicaMap(datasetLock); ramDiskReplicaTracker = RamDiskReplicaTracker.getInstance(conf, this); @SuppressWarnings("unchecked") @@ -408,12 +383,7 @@ class FsDatasetImpl implements FsDatasetSpi { @Override public AutoCloseableLock acquireDatasetLock() { - return datasetWriteLock.acquire(); - } - - @Override - public AutoCloseableLock acquireDatasetReadLock() { - return datasetReadLock.acquire(); + return datasetLock.acquire(); } /** @@ -454,7 +424,7 @@ class FsDatasetImpl implements FsDatasetSpi { ReplicaMap replicaMap, Storage.StorageDirectory sd, StorageType storageType, FsVolumeReference ref) throws IOException { - try (AutoCloseableLock lock = datasetWriteLock.acquire()) { + try (AutoCloseableLock lock = datasetLock.acquire()) { DatanodeStorage dnStorage = storageMap.get(sd.getStorageUuid()); if (dnStorage != null) { final String errorMsg = String.format( @@ -487,8 +457,7 @@ class FsDatasetImpl implements FsDatasetSpi { .setConf(this.conf) .build(); FsVolumeReference ref = fsVolume.obtainReference(); - ReplicaMap tempVolumeMap = - new ReplicaMap(datasetReadLock, datasetWriteLock); + ReplicaMap tempVolumeMap = new ReplicaMap(datasetLock); fsVolume.getVolumeMap(tempVolumeMap, ramDiskReplicaTracker); activateVolume(tempVolumeMap, sd, storageLocation.getStorageType(), ref); @@ -527,8 +496,7 @@ class FsDatasetImpl implements FsDatasetSpi { StorageType storageType = location.getStorageType(); final FsVolumeImpl fsVolume = createFsVolume(sd.getStorageUuid(), sd, location); - final ReplicaMap tempVolumeMap = - new ReplicaMap(datasetReadLock, datasetWriteLock); + final ReplicaMap tempVolumeMap = new ReplicaMap(new AutoCloseableLock()); ArrayList exceptions = Lists.newArrayList(); for (final NamespaceInfo nsInfo : nsInfos) { @@ -573,7 +541,7 @@ class FsDatasetImpl implements FsDatasetSpi { new ArrayList<>(storageLocsToRemove); Map> blkToInvalidate = new HashMap<>(); List storageToRemove = new ArrayList<>(); - try (AutoCloseableLock lock = datasetWriteLock.acquire()) { + try (AutoCloseableLock lock = datasetLock.acquire()) { for (int idx = 0; idx < dataStorage.getNumStorageDirs(); idx++) { Storage.StorageDirectory sd = dataStorage.getStorageDir(idx); final StorageLocation sdLocation = sd.getStorageLocation(); @@ -585,7 +553,7 @@ class FsDatasetImpl implements FsDatasetSpi { // Disable the volume from the service. asyncDiskService.removeVolume(sd.getStorageUuid()); volumes.removeVolume(sdLocation, clearFailure); - volumes.waitVolumeRemoved(5000, datasetWriteLockCondition); + volumes.waitVolumeRemoved(5000, datasetLockCondition); // Removed all replica information for the blocks on the volume. 
// Unlike updating the volumeMap in addVolume(), this operation does @@ -632,7 +600,7 @@ class FsDatasetImpl implements FsDatasetSpi { } } - try (AutoCloseableLock lock = datasetWriteLock.acquire()) { + try (AutoCloseableLock lock = datasetLock.acquire()) { for(String storageUuid : storageToRemove) { storageMap.remove(storageUuid); } @@ -823,7 +791,7 @@ class FsDatasetImpl implements FsDatasetSpi { long seekOffset) throws IOException { ReplicaInfo info; - try (AutoCloseableLock lock = datasetReadLock.acquire()) { + try (AutoCloseableLock lock = datasetLock.acquire()) { info = volumeMap.get(b.getBlockPoolId(), b.getLocalBlock()); } @@ -911,7 +879,7 @@ class FsDatasetImpl implements FsDatasetSpi { @Override // FsDatasetSpi public ReplicaInputStreams getTmpInputStreams(ExtendedBlock b, long blkOffset, long metaOffset) throws IOException { - try (AutoCloseableLock lock = datasetReadLock.acquire()) { + try (AutoCloseableLock lock = datasetLock.acquire()) { ReplicaInfo info = getReplicaInfo(b); FsVolumeReference ref = info.getVolume().obtainReference(); try { @@ -1036,7 +1004,7 @@ class FsDatasetImpl implements FsDatasetSpi { } FsVolumeReference volumeRef = null; - try (AutoCloseableLock lock = datasetReadLock.acquire()) { + try (AutoCloseableLock lock = datasetLock.acquire()) { volumeRef = volumes.getNextVolume(targetStorageType, targetStorageId, block.getNumBytes()); } @@ -1150,7 +1118,7 @@ class FsDatasetImpl implements FsDatasetSpi { FsVolumeReference volumeRef = null; - try (AutoCloseableLock lock = datasetReadLock.acquire()) { + try (AutoCloseableLock lock = datasetLock.acquire()) { volumeRef = destination.obtainReference(); } @@ -1238,7 +1206,7 @@ class FsDatasetImpl implements FsDatasetSpi { @Override // FsDatasetSpi public ReplicaHandler append(ExtendedBlock b, long newGS, long expectedBlockLen) throws IOException { - try (AutoCloseableLock lock = datasetWriteLock.acquire()) { + try (AutoCloseableLock lock = datasetLock.acquire()) { // If the block was successfully finalized because all packets // were successfully processed at the Datanode but the ack for // some of the packets were not received by the client. The client @@ -1290,7 +1258,7 @@ class FsDatasetImpl implements FsDatasetSpi { private ReplicaInPipeline append(String bpid, ReplicaInfo replicaInfo, long newGS, long estimateBlockLen) throws IOException { - try (AutoCloseableLock lock = datasetWriteLock.acquire()) { + try (AutoCloseableLock lock = datasetLock.acquire()) { // If the block is cached, start uncaching it. 
if (replicaInfo.getState() != ReplicaState.FINALIZED) { throw new IOException("Only a Finalized replica can be appended to; " @@ -1386,7 +1354,7 @@ class FsDatasetImpl implements FsDatasetSpi { while (true) { try { - try (AutoCloseableLock lock = datasetWriteLock.acquire()) { + try (AutoCloseableLock lock = datasetLock.acquire()) { ReplicaInfo replicaInfo = recoverCheck(b, newGS, expectedBlockLen); FsVolumeReference ref = replicaInfo.getVolume().obtainReference(); ReplicaInPipeline replica; @@ -1418,7 +1386,7 @@ class FsDatasetImpl implements FsDatasetSpi { LOG.info("Recover failed close " + b); while (true) { try { - try (AutoCloseableLock lock = datasetWriteLock.acquire()) { + try (AutoCloseableLock lock = datasetLock.acquire()) { // check replica's state ReplicaInfo replicaInfo = recoverCheck(b, newGS, expectedBlockLen); // bump the replica's GS @@ -1440,7 +1408,7 @@ class FsDatasetImpl implements FsDatasetSpi { public ReplicaHandler createRbw( StorageType storageType, String storageId, ExtendedBlock b, boolean allowLazyPersist) throws IOException { - try (AutoCloseableLock lock = datasetWriteLock.acquire()) { + try (AutoCloseableLock lock = datasetLock.acquire()) { ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(), b.getBlockId()); if (replicaInfo != null) { @@ -1511,7 +1479,7 @@ class FsDatasetImpl implements FsDatasetSpi { while (true) { try { - try (AutoCloseableLock lock = datasetWriteLock.acquire()) { + try (AutoCloseableLock lock = datasetLock.acquire()) { ReplicaInfo replicaInfo = getReplicaInfo(b.getBlockPoolId(), b.getBlockId()); // check the replica's state @@ -1536,7 +1504,7 @@ class FsDatasetImpl implements FsDatasetSpi { private ReplicaHandler recoverRbwImpl(ReplicaInPipeline rbw, ExtendedBlock b, long newGS, long minBytesRcvd, long maxBytesRcvd) throws IOException { - try (AutoCloseableLock lock = datasetWriteLock.acquire()) { + try (AutoCloseableLock lock = datasetLock.acquire()) { // check generation stamp long replicaGenerationStamp = rbw.getGenerationStamp(); if (replicaGenerationStamp < b.getGenerationStamp() || @@ -1597,7 +1565,7 @@ class FsDatasetImpl implements FsDatasetSpi { public ReplicaInPipeline convertTemporaryToRbw( final ExtendedBlock b) throws IOException { - try (AutoCloseableLock lock = datasetWriteLock.acquire()) { + try (AutoCloseableLock lock = datasetLock.acquire()) { final long blockId = b.getBlockId(); final long expectedGs = b.getGenerationStamp(); final long visible = b.getNumBytes(); @@ -1671,7 +1639,7 @@ class FsDatasetImpl implements FsDatasetSpi { ReplicaInfo lastFoundReplicaInfo = null; boolean isInPipeline = false; do { - try (AutoCloseableLock lock = datasetWriteLock.acquire()) { + try (AutoCloseableLock lock = datasetLock.acquire()) { ReplicaInfo currentReplicaInfo = volumeMap.get(b.getBlockPoolId(), b.getBlockId()); if (currentReplicaInfo == lastFoundReplicaInfo) { @@ -1724,7 +1692,7 @@ class FsDatasetImpl implements FsDatasetSpi { invalidate(b.getBlockPoolId(), new Block[] { lastFoundReplicaInfo }, false); } - try (AutoCloseableLock lock = datasetWriteLock.acquire()) { + try (AutoCloseableLock lock = datasetLock.acquire()) { FsVolumeReference ref = volumes.getNextVolume(storageType, storageId, b .getNumBytes()); FsVolumeImpl v = (FsVolumeImpl) ref.getVolume(); @@ -1775,7 +1743,7 @@ class FsDatasetImpl implements FsDatasetSpi { throws IOException { ReplicaInfo replicaInfo = null; ReplicaInfo finalizedReplicaInfo = null; - try (AutoCloseableLock lock = datasetWriteLock.acquire()) { + try (AutoCloseableLock lock = 
datasetLock.acquire()) { if (Thread.interrupted()) { // Don't allow data modifications from interrupted threads throw new IOException("Cannot finalize block from Interrupted Thread"); @@ -1806,7 +1774,7 @@ class FsDatasetImpl implements FsDatasetSpi { private ReplicaInfo finalizeReplica(String bpid, ReplicaInfo replicaInfo) throws IOException { - try (AutoCloseableLock lock = datasetWriteLock.acquire()) { + try (AutoCloseableLock lock = datasetLock.acquire()) { // Compare generation stamp of old and new replica before finalizing if (volumeMap.get(bpid, replicaInfo.getBlockId()).getGenerationStamp() > replicaInfo.getGenerationStamp()) { @@ -1851,7 +1819,7 @@ class FsDatasetImpl implements FsDatasetSpi { */ @Override // FsDatasetSpi public void unfinalizeBlock(ExtendedBlock b) throws IOException { - try (AutoCloseableLock lock = datasetWriteLock.acquire()) { + try (AutoCloseableLock lock = datasetLock.acquire()) { ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(), b.getLocalBlock()); if (replicaInfo != null && @@ -1904,7 +1872,7 @@ class FsDatasetImpl implements FsDatasetSpi { new HashMap(); List curVolumes = null; - try (AutoCloseableLock lock = datasetReadLock.acquire()) { + try (AutoCloseableLock lock = datasetLock.acquire()) { curVolumes = volumes.getVolumes(); for (FsVolumeSpi v : curVolumes) { builders.put(v.getStorageID(), BlockListAsLongs.builder(maxDataLength)); @@ -1959,7 +1927,7 @@ class FsDatasetImpl implements FsDatasetSpi { * Gets a list of references to the finalized blocks for the given block pool. *

* Callers of this function should call - * {@link FsDatasetSpi#acquireDatasetLock()} to avoid blocks' status being + * {@link FsDatasetSpi#acquireDatasetLock} to avoid blocks' status being * changed during list iteration. *

* @return a list of references to the finalized blocks for the given block @@ -1967,7 +1935,7 @@ class FsDatasetImpl implements FsDatasetSpi { */ @Override public List getFinalizedBlocks(String bpid) { - try (AutoCloseableLock lock = datasetReadLock.acquire()) { + try (AutoCloseableLock lock = datasetLock.acquire()) { final List finalized = new ArrayList( volumeMap.size(bpid)); for (ReplicaInfo b : volumeMap.replicas(bpid)) { @@ -2060,7 +2028,9 @@ class FsDatasetImpl implements FsDatasetSpi { ReplicaInfo validateBlockFile(String bpid, long blockId) { //Should we check for metadata file too? final ReplicaInfo r; - r = volumeMap.get(bpid, blockId); + try (AutoCloseableLock lock = datasetLock.acquire()) { + r = volumeMap.get(bpid, blockId); + } if (r != null) { if (r.blockDataExists()) { return r; @@ -2109,7 +2079,7 @@ class FsDatasetImpl implements FsDatasetSpi { for (int i = 0; i < invalidBlks.length; i++) { final ReplicaInfo removing; final FsVolumeImpl v; - try (AutoCloseableLock lock = datasetWriteLock.acquire()) { + try (AutoCloseableLock lock = datasetLock.acquire()) { final ReplicaInfo info = volumeMap.get(bpid, invalidBlks[i]); if (info == null) { ReplicaInfo infoByBlockId = @@ -2235,7 +2205,7 @@ class FsDatasetImpl implements FsDatasetSpi { long length, genstamp; Executor volumeExecutor; - try (AutoCloseableLock lock = datasetWriteLock.acquire()) { + try (AutoCloseableLock lock = datasetLock.acquire()) { ReplicaInfo info = volumeMap.get(bpid, blockId); boolean success = false; try { @@ -2303,7 +2273,7 @@ class FsDatasetImpl implements FsDatasetSpi { @Override // FsDatasetSpi public boolean contains(final ExtendedBlock block) { - try (AutoCloseableLock lock = datasetReadLock.acquire()) { + try (AutoCloseableLock lock = datasetLock.acquire()) { final long blockId = block.getLocalBlock().getBlockId(); final String bpid = block.getBlockPoolId(); final ReplicaInfo r = volumeMap.get(bpid, blockId); @@ -2423,7 +2393,7 @@ class FsDatasetImpl implements FsDatasetSpi { Block corruptBlock = null; ReplicaInfo memBlockInfo; - try (AutoCloseableLock lock = datasetWriteLock.acquire()) { + try (AutoCloseableLock lock = datasetLock.acquire()) { memBlockInfo = volumeMap.get(bpid, blockId); if (memBlockInfo != null && memBlockInfo.getState() != ReplicaState.FINALIZED) { @@ -2624,7 +2594,7 @@ class FsDatasetImpl implements FsDatasetSpi { @Override public String getReplicaString(String bpid, long blockId) { - try (AutoCloseableLock lock = datasetReadLock.acquire()) { + try (AutoCloseableLock lock = datasetLock.acquire()) { final Replica r = volumeMap.get(bpid, blockId); return r == null ? 
"null" : r.toString(); } @@ -2731,7 +2701,7 @@ class FsDatasetImpl implements FsDatasetSpi { final long recoveryId, final long newBlockId, final long newlength) throws IOException { - try (AutoCloseableLock lock = datasetWriteLock.acquire()) { + try (AutoCloseableLock lock = datasetLock.acquire()) { //get replica final String bpid = oldBlock.getBlockPoolId(); final ReplicaInfo replica = volumeMap.get(bpid, oldBlock.getBlockId()); @@ -2844,7 +2814,7 @@ class FsDatasetImpl implements FsDatasetSpi { @Override // FsDatasetSpi public long getReplicaVisibleLength(final ExtendedBlock block) throws IOException { - try (AutoCloseableLock lock = datasetReadLock.acquire()) { + try (AutoCloseableLock lock = datasetLock.acquire()) { final Replica replica = getReplicaInfo(block.getBlockPoolId(), block.getBlockId()); if (replica.getGenerationStamp() < block.getGenerationStamp()) { @@ -2861,7 +2831,7 @@ class FsDatasetImpl implements FsDatasetSpi { throws IOException { LOG.info("Adding block pool " + bpid); AddBlockPoolException volumeExceptions = new AddBlockPoolException(); - try (AutoCloseableLock lock = datasetWriteLock.acquire()) { + try (AutoCloseableLock lock = datasetLock.acquire()) { try { volumes.addBlockPool(bpid, conf); } catch (AddBlockPoolException e) { @@ -2891,7 +2861,7 @@ class FsDatasetImpl implements FsDatasetSpi { @Override public void shutdownBlockPool(String bpid) { - try (AutoCloseableLock lock = datasetWriteLock.acquire()) { + try (AutoCloseableLock lock = datasetLock.acquire()) { LOG.info("Removing block pool " + bpid); Map blocksPerVolume = getBlockReports(bpid); @@ -2965,7 +2935,7 @@ class FsDatasetImpl implements FsDatasetSpi { @Override //FsDatasetSpi public void deleteBlockPool(String bpid, boolean force) throws IOException { - try (AutoCloseableLock lock = datasetWriteLock.acquire()) { + try (AutoCloseableLock lock = datasetLock.acquire()) { List curVolumes = volumes.getVolumes(); if (!force) { for (FsVolumeImpl volume : curVolumes) { @@ -2994,20 +2964,18 @@ class FsDatasetImpl implements FsDatasetSpi { @Override // FsDatasetSpi public BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock block) throws IOException { - try (AutoCloseableLock lock = datasetReadLock.acquire()) { + try (AutoCloseableLock lock = datasetLock.acquire()) { final Replica replica = volumeMap.get(block.getBlockPoolId(), block.getBlockId()); if (replica == null) { throw new ReplicaNotFoundException(block); } - synchronized(replica) { - if (replica.getGenerationStamp() < block.getGenerationStamp()) { - throw new IOException( - "Replica generation stamp < block generation stamp, block=" - + block + ", replica=" + replica); - } else if (replica.getGenerationStamp() > block.getGenerationStamp()) { - block.setGenerationStamp(replica.getGenerationStamp()); - } + if (replica.getGenerationStamp() < block.getGenerationStamp()) { + throw new IOException( + "Replica generation stamp < block generation stamp, block=" + + block + ", replica=" + replica); + } else if (replica.getGenerationStamp() > block.getGenerationStamp()) { + block.setGenerationStamp(replica.getGenerationStamp()); } } @@ -3048,7 +3016,7 @@ class FsDatasetImpl implements FsDatasetSpi { @Override public void onCompleteLazyPersist(String bpId, long blockId, long creationTime, File[] savedFiles, FsVolumeImpl targetVolume) { - try (AutoCloseableLock lock = datasetWriteLock.acquire()) { + try (AutoCloseableLock lock = datasetLock.acquire()) { ramDiskReplicaTracker.recordEndLazyPersist(bpId, blockId, savedFiles); 
targetVolume.incDfsUsedAndNumBlocks(bpId, savedFiles[0].length() @@ -3182,7 +3150,7 @@ class FsDatasetImpl implements FsDatasetSpi { try { block = ramDiskReplicaTracker.dequeueNextReplicaToPersist(); if (block != null) { - try (AutoCloseableLock lock = datasetWriteLock.acquire()) { + try (AutoCloseableLock lock = datasetLock.acquire()) { replicaInfo = volumeMap.get(block.getBlockPoolId(), block.getBlockId()); // If replicaInfo is null, the block was either deleted before @@ -3249,7 +3217,7 @@ class FsDatasetImpl implements FsDatasetSpi { ReplicaInfo replicaInfo, newReplicaInfo; final String bpid = replicaState.getBlockPoolId(); - try (AutoCloseableLock lock = datasetWriteLock.acquire()) { + try (AutoCloseableLock lock = datasetLock.acquire()) { replicaInfo = getReplicaInfo(replicaState.getBlockPoolId(), replicaState.getBlockId()); Preconditions.checkState(replicaInfo.getVolume().isTransientStorage()); @@ -3422,7 +3390,7 @@ class FsDatasetImpl implements FsDatasetSpi { } void stopAllDataxceiverThreads(FsVolumeImpl volume) { - try (AutoCloseableLock lock = datasetWriteLock.acquire()) { + try (AutoCloseableLock lock = datasetLock.acquire()) { for (String bpid : volumeMap.getBlockPoolList()) { Collection replicas = volumeMap.replicas(bpid); for (ReplicaInfo replicaInfo : replicas) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ProvidedVolumeImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ProvidedVolumeImpl.java index 341a2f01173..e2d8681e013 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ProvidedVolumeImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ProvidedVolumeImpl.java @@ -28,7 +28,6 @@ import java.util.Set; import java.util.Map.Entry; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; @@ -54,10 +53,9 @@ import org.apache.hadoop.hdfs.server.datanode.checker.VolumeCheckResult; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; import org.apache.hadoop.hdfs.server.datanode.FileIoProvider; import org.apache.hadoop.hdfs.server.datanode.ReplicaBuilder; -import org.apache.hadoop.util.DiskChecker.DiskErrorException; -import org.apache.hadoop.util.ReflectionUtils; -import org.apache.hadoop.util.Time; import org.apache.hadoop.util.Timer; +import org.apache.hadoop.util.DiskChecker.DiskErrorException; +import org.apache.hadoop.util.AutoCloseableLock; import org.codehaus.jackson.annotate.JsonProperty; import org.codehaus.jackson.map.ObjectMapper; import org.codehaus.jackson.map.ObjectReader; @@ -65,6 +63,8 @@ import org.codehaus.jackson.map.ObjectWriter; import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hadoop.util.Time; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PROVIDED_ALIASMAP_LOAD_RETRIES; @@ -135,7 +135,7 @@ class ProvidedVolumeImpl extends FsVolumeImpl { ProvidedBlockPoolSlice(String bpid, ProvidedVolumeImpl volume, Configuration conf) { this.providedVolume = volume; - bpVolumeMap = new ReplicaMap(new ReentrantReadWriteLock()); + bpVolumeMap = new ReplicaMap(new AutoCloseableLock()); Class fmt = 
conf.getClass(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_CLASS, TextFileRegionAliasMap.class, BlockAliasMap.class); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ReplicaMap.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ReplicaMap.java index 13c55d36888..53a238c6946 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ReplicaMap.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ReplicaMap.java @@ -20,7 +20,6 @@ package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl; import java.util.Collection; import java.util.HashMap; import java.util.Map; -import java.util.concurrent.locks.ReadWriteLock; import org.apache.hadoop.HadoopIllegalArgumentException; import org.apache.hadoop.hdfs.protocol.Block; @@ -33,29 +32,22 @@ import org.apache.hadoop.util.AutoCloseableLock; */ class ReplicaMap { // Lock object to synchronize this instance. - private final AutoCloseableLock readLock; - private final AutoCloseableLock writeLock; + private final AutoCloseableLock lock; // Map of block pool Id to another map of block Id to ReplicaInfo. private final Map> map = new HashMap<>(); - ReplicaMap(AutoCloseableLock rLock, AutoCloseableLock wLock) { - if (rLock == null || wLock == null) { + ReplicaMap(AutoCloseableLock lock) { + if (lock == null) { throw new HadoopIllegalArgumentException( "Lock to synchronize on cannot be null"); } - this.readLock = rLock; - this.writeLock = wLock; - } - - ReplicaMap(ReadWriteLock lock) { - this(new AutoCloseableLock(lock.readLock()), - new AutoCloseableLock(lock.writeLock())); + this.lock = lock; } String[] getBlockPoolList() { - try (AutoCloseableLock l = readLock.acquire()) { + try (AutoCloseableLock l = lock.acquire()) { return map.keySet().toArray(new String[map.keySet().size()]); } } @@ -100,7 +92,7 @@ class ReplicaMap { */ ReplicaInfo get(String bpid, long blockId) { checkBlockPool(bpid); - try (AutoCloseableLock l = readLock.acquire()) { + try (AutoCloseableLock l = lock.acquire()) { LightWeightResizableGSet m = map.get(bpid); return m != null ? 
m.get(new Block(blockId)) : null; } @@ -117,7 +109,7 @@ class ReplicaMap { ReplicaInfo add(String bpid, ReplicaInfo replicaInfo) { checkBlockPool(bpid); checkBlock(replicaInfo); - try (AutoCloseableLock l = writeLock.acquire()) { + try (AutoCloseableLock l = lock.acquire()) { LightWeightResizableGSet m = map.get(bpid); if (m == null) { // Add an entry for block pool if it does not exist already @@ -135,7 +127,7 @@ class ReplicaMap { ReplicaInfo addAndGet(String bpid, ReplicaInfo replicaInfo) { checkBlockPool(bpid); checkBlock(replicaInfo); - try (AutoCloseableLock l = writeLock.acquire()) { + try (AutoCloseableLock l = lock.acquire()) { LightWeightResizableGSet m = map.get(bpid); if (m == null) { // Add an entry for block pool if it does not exist already @@ -184,7 +176,7 @@ class ReplicaMap { ReplicaInfo remove(String bpid, Block block) { checkBlockPool(bpid); checkBlock(block); - try (AutoCloseableLock l = writeLock.acquire()) { + try (AutoCloseableLock l = lock.acquire()) { LightWeightResizableGSet m = map.get(bpid); if (m != null) { ReplicaInfo replicaInfo = m.get(block); @@ -206,7 +198,7 @@ class ReplicaMap { */ ReplicaInfo remove(String bpid, long blockId) { checkBlockPool(bpid); - try (AutoCloseableLock l = writeLock.acquire()) { + try (AutoCloseableLock l = lock.acquire()) { LightWeightResizableGSet m = map.get(bpid); if (m != null) { return m.remove(new Block(blockId)); @@ -221,7 +213,7 @@ class ReplicaMap { * @return the number of replicas in the map */ int size(String bpid) { - try (AutoCloseableLock l = readLock.acquire()) { + try (AutoCloseableLock l = lock.acquire()) { LightWeightResizableGSet m = map.get(bpid); return m != null ? m.size() : 0; } @@ -245,7 +237,7 @@ class ReplicaMap { void initBlockPool(String bpid) { checkBlockPool(bpid); - try (AutoCloseableLock l = writeLock.acquire()) { + try (AutoCloseableLock l = lock.acquire()) { LightWeightResizableGSet m = map.get(bpid); if (m == null) { // Add an entry for block pool if it does not exist already @@ -257,7 +249,7 @@ class ReplicaMap { void cleanUpBlockPool(String bpid) { checkBlockPool(bpid); - try (AutoCloseableLock l = writeLock.acquire()) { + try (AutoCloseableLock l = lock.acquire()) { map.remove(bpid); } } @@ -267,16 +259,6 @@ class ReplicaMap { * @return lock object */ AutoCloseableLock getLock() { - return writeLock; + return lock; } - - /** - * Get the lock object used for synchronizing the ReplicasMap for read only - * operations. - * @return The read lock object - */ - AutoCloseableLock getReadLock() { - return readLock; - } - } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml index c36b333270d..33917400153 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml @@ -2999,40 +2999,6 @@ - - dfs.datanode.lock.fair - true - If this is true, the Datanode FsDataset lock will be used in Fair - mode, which will help to prevent writer threads from being starved, but can - lower lock throughput. See java.util.concurrent.locks.ReentrantReadWriteLock - for more information on fair/non-fair locks. - - - - - dfs.datanode.lock.read.write.enabled - true - If this is true, the FsDataset lock will be a read write lock. If - it is false, all locks will be a write lock. 
- Enabling this should give better datanode throughput, as many read only - functions can run concurrently under the read lock, when they would - previously have required the exclusive write lock. As the feature is - experimental, this switch can be used to disable the shared read lock, and - cause all lock acquisitions to use the exclusive write lock. - - - - - dfs.datanode.lock-reporting-threshold-ms - 300 - When thread waits to obtain a lock, or a thread holds a lock for - more than the threshold, a log message will be written. Note that - dfs.lock.suppress.warning.interval ensures a single log message is - emitted per interval for waiting threads and a single message for holding - threads to avoid excessive logging. - - - dfs.namenode.startup.delay.block.deletion.sec 0 diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java index cb7bbdb5455..8f4e29b6e89 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java @@ -1572,12 +1572,6 @@ public class SimulatedFSDataset implements FsDatasetSpi { return datasetLock.acquire(); } - @Override - public AutoCloseableLock acquireDatasetReadLock() { - // No RW lock implementation in simulated dataset currently. - return datasetLock.acquire(); - } - @Override public Set deepCopyReplica(String bpid) throws IOException { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java index caaa89c25e0..8fe515f8b75 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java @@ -455,11 +455,6 @@ public class ExternalDatasetImpl implements FsDatasetSpi { return null; } - @Override - public AutoCloseableLock acquireDatasetReadLock() { - return null; - } - @Override public Set deepCopyReplica(String bpid) throws IOException { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImplTestUtils.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImplTestUtils.java index e383e07246b..2d939fad261 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImplTestUtils.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImplTestUtils.java @@ -434,7 +434,7 @@ public class FsDatasetImplTestUtils implements FsDatasetTestUtils { @Override public Iterator getStoredReplicas(String bpid) throws IOException { // Reload replicas from the disk. 
- ReplicaMap replicaMap = new ReplicaMap(dataset.datasetRWLock); + ReplicaMap replicaMap = new ReplicaMap(dataset.datasetLock); try (FsVolumeReferences refs = dataset.getFsVolumeReferences()) { for (FsVolumeSpi vol : refs) { FsVolumeImpl volume = (FsVolumeImpl) vol; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java index 7809a2df355..13ffb96f465 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java @@ -17,10 +17,10 @@ */ package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl; -import java.util.concurrent.TimeoutException; import java.util.function.Supplier; import com.google.common.collect.Lists; +import java.io.OutputStream; import java.nio.file.Files; import java.nio.file.Paths; import org.apache.commons.io.FileUtils; @@ -65,7 +65,6 @@ import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; import org.apache.hadoop.io.MultipleIOException; import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.test.LambdaTestUtils; -import org.apache.hadoop.util.AutoCloseableLock; import org.apache.hadoop.util.FakeTimer; import org.apache.hadoop.util.StringUtils; import org.junit.Assert; @@ -86,7 +85,6 @@ import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DN_CACHED_DFSUSED_CHECK_INTERVAL_MS; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY; @@ -198,118 +196,6 @@ public class TestFsDatasetImpl { assertEquals(0, dataset.getNumFailedVolumes()); } - @Test(timeout=10000) - public void testReadLockEnabledByDefault() - throws Exception { - final FsDatasetSpi ds = dataset; - AtomicBoolean accessed = new AtomicBoolean(false); - CountDownLatch latch = new CountDownLatch(1); - CountDownLatch waiterLatch = new CountDownLatch(1); - - Thread holder = new Thread() { - public void run() { - try (AutoCloseableLock l = ds.acquireDatasetReadLock()) { - latch.countDown(); - // wait for the waiter thread to access the lock. - waiterLatch.await(); - } catch (Exception e) { - } - } - }; - - Thread waiter = new Thread() { - public void run() { - try { - latch.await(); - } catch (InterruptedException e) { - waiterLatch.countDown(); - return; - } - try (AutoCloseableLock l = ds.acquireDatasetReadLock()) { - accessed.getAndSet(true); - // signal the holder thread. - waiterLatch.countDown(); - } catch (Exception e) { - } - } - }; - waiter.start(); - holder.start(); - holder.join(); - waiter.join(); - // The holder thread is still holding the lock, but the waiter can still - // run as the lock is a shared read lock. - // Otherwise test will timeout with deadlock. 
- assertEquals(true, accessed.get()); - holder.interrupt(); - } - - @Test(timeout=20000) - public void testReadLockCanBeDisabledByConfig() - throws Exception { - HdfsConfiguration conf = new HdfsConfiguration(); - conf.setBoolean( - DFSConfigKeys.DFS_DATANODE_LOCK_READ_WRITE_ENABLED_KEY, false); - MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf) - .numDataNodes(1).build(); - try { - AtomicBoolean accessed = new AtomicBoolean(false); - cluster.waitActive(); - DataNode dn = cluster.getDataNodes().get(0); - final FsDatasetSpi ds = DataNodeTestUtils.getFSDataset(dn); - - CountDownLatch latch = new CountDownLatch(1); - CountDownLatch waiterLatch = new CountDownLatch(1); - Thread holder = new Thread() { - public void run() { - try (AutoCloseableLock l = ds.acquireDatasetReadLock()) { - latch.countDown(); - // wait for the waiter thread to access the lock. - waiterLatch.await(); - } catch (Exception e) { - } - } - }; - - Thread waiter = new Thread() { - public void run() { - try { - // Wait for holder to get ds read lock. - latch.await(); - } catch (InterruptedException e) { - waiterLatch.countDown(); - return; - } - try (AutoCloseableLock l = ds.acquireDatasetReadLock()) { - accessed.getAndSet(true); - // signal the holder thread. - waiterLatch.countDown(); - } catch (Exception e) { - } - } - }; - waiter.start(); - holder.start(); - // Wait for sometime to make sure we are in deadlock, - try { - GenericTestUtils.waitFor(() -> - accessed.get(), - 100, 10000); - fail("Waiter thread should not execute."); - } catch (TimeoutException e) { - } - // Release waiterLatch to exit deadlock. - waiterLatch.countDown(); - holder.join(); - waiter.join(); - // After releasing waiterLatch water - // thread will be able to execute. - assertTrue(accessed.get()); - } finally { - cluster.shutdown(); - } - } - @Test public void testAddVolumes() throws IOException { final int numNewVolumes = 3; @@ -356,8 +242,8 @@ public class TestFsDatasetImpl { @Test public void testAddVolumeWithSameStorageUuid() throws IOException { - HdfsConfiguration config = new HdfsConfiguration(); - MiniDFSCluster cluster = new MiniDFSCluster.Builder(config) + HdfsConfiguration conf = new HdfsConfiguration(); + MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf) .numDataNodes(1).build(); try { cluster.waitActive(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsVolumeList.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsVolumeList.java index 9371a514c57..9db9c0ccd39 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsVolumeList.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsVolumeList.java @@ -37,6 +37,7 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference; import org.apache.hadoop.hdfs.server.datanode.fsdataset.RoundRobinVolumeChoosingPolicy; import org.apache.hadoop.hdfs.server.datanode.fsdataset.VolumeChoosingPolicy; import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.util.AutoCloseableLock; import org.apache.hadoop.util.StringUtils; import org.junit.Before; import org.junit.Test; @@ -52,7 +53,6 @@ import java.util.concurrent.Executors; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.Future; import java.util.concurrent.TimeoutException; -import 
java.util.concurrent.locks.ReentrantReadWriteLock; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DU_RESERVED_PERCENTAGE_KEY; import static org.junit.Assert.assertEquals; @@ -368,7 +368,7 @@ public class TestFsVolumeList { fs.close(); FsDatasetImpl fsDataset = (FsDatasetImpl) cluster.getDataNodes().get(0) .getFSDataset(); - ReplicaMap volumeMap = new ReplicaMap(new ReentrantReadWriteLock()); + ReplicaMap volumeMap = new ReplicaMap(new AutoCloseableLock()); RamDiskReplicaTracker ramDiskReplicaMap = RamDiskReplicaTracker .getInstance(conf, fsDataset); FsVolumeImpl vol = (FsVolumeImpl) fsDataset.getFsVolumeReferences().get(0); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestInterDatanodeProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestInterDatanodeProtocol.java index b72b1cd1bb8..86e9f90cc64 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestInterDatanodeProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestInterDatanodeProtocol.java @@ -25,7 +25,6 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.net.SocketTimeoutException; import java.util.List; -import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; @@ -60,6 +59,7 @@ import org.apache.hadoop.io.Writable; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.Server; import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.util.AutoCloseableLock; import org.junit.Assert; import org.junit.Test; @@ -236,7 +236,7 @@ public class TestInterDatanodeProtocol { final long firstblockid = 10000L; final long gs = 7777L; final long length = 22L; - final ReplicaMap map = new ReplicaMap(new ReentrantReadWriteLock()); + final ReplicaMap map = new ReplicaMap(new AutoCloseableLock()); String bpid = "BP-TEST"; final Block[] blocks = new Block[5]; for(int i = 0; i < blocks.length; i++) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestProvidedImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestProvidedImpl.java index ef0a119f9c6..d7935b5cf1a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestProvidedImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestProvidedImpl.java @@ -43,7 +43,6 @@ import java.util.Map; import java.util.Optional; import java.util.Random; import java.util.Set; -import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.commons.io.FileUtils; import org.apache.hadoop.conf.Configuration; @@ -78,6 +77,7 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi.BlockIterator; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi.FsVolumeReferences; import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.util.AutoCloseableLock; import org.apache.hadoop.util.StringUtils; import org.junit.Before; import org.junit.Test; @@ -399,7 +399,7 @@ public class TestProvidedImpl { public void testBlockLoad() throws IOException { for (int i = 0; i < 
providedVolumes.size(); i++) { FsVolumeImpl vol = providedVolumes.get(i); - ReplicaMap volumeMap = new ReplicaMap(new ReentrantReadWriteLock()); + ReplicaMap volumeMap = new ReplicaMap(new AutoCloseableLock()); vol.getVolumeMap(volumeMap, null); assertEquals(vol.getBlockPoolList().length, BLOCK_POOL_IDS.length); @@ -475,7 +475,7 @@ public class TestProvidedImpl { vol.setFileRegionProvider(BLOCK_POOL_IDS[CHOSEN_BP_ID], new TestFileRegionBlockAliasMap(fileRegionIterator, minBlockId, numBlocks)); - ReplicaMap volumeMap = new ReplicaMap(new ReentrantReadWriteLock()); + ReplicaMap volumeMap = new ReplicaMap(new AutoCloseableLock()); vol.getVolumeMap(BLOCK_POOL_IDS[CHOSEN_BP_ID], volumeMap, null); totalBlocks += volumeMap.size(BLOCK_POOL_IDS[CHOSEN_BP_ID]); } @@ -585,7 +585,7 @@ public class TestProvidedImpl { public void testProvidedReplicaPrefix() throws Exception { for (int i = 0; i < providedVolumes.size(); i++) { FsVolumeImpl vol = providedVolumes.get(i); - ReplicaMap volumeMap = new ReplicaMap(new ReentrantReadWriteLock()); + ReplicaMap volumeMap = new ReplicaMap(new AutoCloseableLock()); vol.getVolumeMap(volumeMap, null); Path expectedPrefix = new Path( diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestReplicaMap.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestReplicaMap.java index 59203bb7d34..1059c08079d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestReplicaMap.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestReplicaMap.java @@ -23,16 +23,15 @@ import static org.junit.Assert.fail; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica; +import org.apache.hadoop.util.AutoCloseableLock; import org.junit.Before; import org.junit.Test; -import java.util.concurrent.locks.ReentrantReadWriteLock; - /** * Unit test for ReplicasMap class */ public class TestReplicaMap { - private final ReplicaMap map = new ReplicaMap(new ReentrantReadWriteLock()); + private final ReplicaMap map = new ReplicaMap(new AutoCloseableLock()); private final String bpid = "BP-TEST"; private final Block block = new Block(1234, 1234, 1234); @@ -112,7 +111,7 @@ public class TestReplicaMap { @Test public void testMergeAll() { - ReplicaMap temReplicaMap = new ReplicaMap(new ReentrantReadWriteLock()); + ReplicaMap temReplicaMap = new ReplicaMap(new AutoCloseableLock()); Block tmpBlock = new Block(5678, 5678, 5678); temReplicaMap.add(bpid, new FinalizedReplica(tmpBlock, null, null)); @@ -123,7 +122,7 @@ public class TestReplicaMap { @Test public void testAddAll() { - ReplicaMap temReplicaMap = new ReplicaMap(new ReentrantReadWriteLock()); + ReplicaMap temReplicaMap = new ReplicaMap(new AutoCloseableLock()); Block tmpBlock = new Block(5678, 5678, 5678); temReplicaMap.add(bpid, new FinalizedReplica(tmpBlock, null, null)); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java index e9393896241..2c5df28e5c3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java +++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java @@ -27,7 +27,6 @@ import java.io.RandomAccessFile; import java.util.ArrayList; import java.util.Arrays; import java.util.List; -import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.StorageType; @@ -48,6 +47,7 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.util.AutoCloseableLock; import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException; import org.junit.Assert; import org.junit.Test; @@ -550,7 +550,7 @@ public class TestWriteToReplica { bpList.size() == 2); createReplicas(bpList, volumes, cluster.getFsDatasetTestUtils(dn)); - ReplicaMap oldReplicaMap = new ReplicaMap(new ReentrantReadWriteLock()); + ReplicaMap oldReplicaMap = new ReplicaMap(new AutoCloseableLock()); oldReplicaMap.addAll(dataSet.volumeMap); cluster.restartDataNode(0);
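For readers skimming the revert, the locking discipline the DataNode returns to is the single shared lock visible throughout the ReplicaMap hunks above: one AutoCloseableLock guards every read and write of the map. The following is a minimal, self-contained sketch of that pattern under the same assumption; class and field names are illustrative, and only AutoCloseableLock is Hadoop's.

import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.util.AutoCloseableLock;

// Illustrative sketch of the single-lock discipline in the reverted ReplicaMap.
class SingleLockReplicaMapSketch {
  // One exclusive lock for both lookups and mutations.
  private final AutoCloseableLock lock = new AutoCloseableLock();
  private final Map<Long, String> replicas = new HashMap<>();

  String get(long blockId) {
    try (AutoCloseableLock l = lock.acquire()) {
      return replicas.get(blockId);
    }
  }

  String add(long blockId, String replicaInfo) {
    try (AutoCloseableLock l = lock.acquire()) {
      return replicas.put(blockId, replicaInfo);
    }
  }
}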