diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/InstrumentedReadWriteLock.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/InstrumentedReadWriteLock.java index a4105240769..758f1ff87cf 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/InstrumentedReadWriteLock.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/InstrumentedReadWriteLock.java @@ -37,7 +37,7 @@ public class InstrumentedReadWriteLock implements ReadWriteLock { private final Lock readLock; private final Lock writeLock; - InstrumentedReadWriteLock(boolean fair, String name, Logger logger, + public InstrumentedReadWriteLock(boolean fair, String name, Logger logger, long minLoggingGapMs, long lockWarningThresholdMs) { ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock(fair); readLock = new InstrumentedReadLock(name, logger, readWriteLock, diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index e71ed959319..980ce9bd978 100755 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -452,6 +452,13 @@ public class DFSConfigKeys extends CommonConfigurationKeys { "dfs.lock.suppress.warning.interval"; public static final long DFS_LOCK_SUPPRESS_WARNING_INTERVAL_DEFAULT = 10000; //ms + public static final String DFS_DATANODE_LOCK_FAIR_KEY = + "dfs.datanode.lock.fair"; + public static final boolean DFS_DATANODE_LOCK_FAIR_DEFAULT = true; + public static final String DFS_DATANODE_LOCK_REPORTING_THRESHOLD_MS_KEY = + "dfs.datanode.lock-reporting-threshold-ms"; + public static final long + DFS_DATANODE_LOCK_REPORTING_THRESHOLD_MS_DEFAULT = 300L; public static final String DFS_UPGRADE_DOMAIN_FACTOR = "dfs.namenode.upgrade.domain.factor"; public static final int DFS_UPGRADE_DOMAIN_FACTOR_DEFAULT = DFS_REPLICATION_DEFAULT; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java index 7953d45d866..d29d7722618 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java @@ -662,6 +662,12 @@ public interface FsDatasetSpi extends FSDatasetMBean { */ AutoCloseableLock acquireDatasetLock(); + /*** + * Acquire the read lock of the data set. + * @return The AutoClosable read lock instance. + */ + AutoCloseableLock acquireDatasetReadLock(); + /** * Deep copy the replica info belonging to given block pool. * @param bpid Specified block pool id. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java index fca01bb69f3..72bfa9a00df 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java @@ -42,6 +42,7 @@ import java.util.concurrent.ForkJoinPool; import java.util.concurrent.ForkJoinTask; import java.util.concurrent.RecursiveAction; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -66,7 +67,6 @@ import org.apache.hadoop.hdfs.server.datanode.ReplicaWaitingToBeRecovered; import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.MultipleIOException; -import org.apache.hadoop.util.AutoCloseableLock; import org.apache.hadoop.util.DataChecksum; import org.apache.hadoop.util.DataChecksum.Type; import org.apache.hadoop.util.DiskChecker; @@ -849,7 +849,7 @@ class BlockPoolSlice { private boolean readReplicasFromCache(ReplicaMap volumeMap, final RamDiskReplicaTracker lazyWriteReplicaMap) { - ReplicaMap tmpReplicaMap = new ReplicaMap(new AutoCloseableLock()); + ReplicaMap tmpReplicaMap = new ReplicaMap(new ReentrantReadWriteLock()); File replicaFile = new File(currentDir, REPLICA_CACHE_FILE); // Check whether the file exists or not. if (!replicaFile.exists()) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java index a6f95f458e9..9977b96d6b8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java @@ -89,7 +89,7 @@ import org.apache.hadoop.util.Daemon; import org.apache.hadoop.util.DataChecksum; import org.apache.hadoop.util.DiskChecker.DiskErrorException; import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException; -import org.apache.hadoop.util.InstrumentedLock; +import org.apache.hadoop.util.InstrumentedReadWriteLock; import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.Time; import org.apache.hadoop.util.Timer; @@ -125,7 +125,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.ReentrantLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; /************************************************** * FSDataset manages a set of data blocks. Each block @@ -179,7 +179,7 @@ class FsDatasetImpl implements FsDatasetSpi { @Override public FsVolumeImpl getVolume(final ExtendedBlock b) { - try(AutoCloseableLock lock = datasetLock.acquire()) { + try(AutoCloseableLock lock = datasetWriteLock.acquire()) { final ReplicaInfo r = volumeMap.get(b.getBlockPoolId(), b.getLocalBlock()); return r != null ? (FsVolumeImpl) r.getVolume() : null; @@ -189,7 +189,7 @@ class FsDatasetImpl implements FsDatasetSpi { @Override // FsDatasetSpi public Block getStoredBlock(String bpid, long blkid) throws IOException { - try(AutoCloseableLock lock = datasetLock.acquire()) { + try(AutoCloseableLock lock = datasetWriteLock.acquire()) { File blockfile = null; ReplicaInfo info = volumeMap.get(bpid, blkid); @@ -210,7 +210,7 @@ class FsDatasetImpl implements FsDatasetSpi { public Set deepCopyReplica(String bpid) throws IOException { Set replicas = null; - try (AutoCloseableLock lock = datasetLock.acquire()) { + try (AutoCloseableLock lock = datasetWriteLock.acquire()) { replicas = new HashSet<>(volumeMap.replicas(bpid) == null ? Collections. EMPTY_SET : volumeMap.replicas(bpid)); } @@ -250,7 +250,7 @@ class FsDatasetImpl implements FsDatasetSpi { return null; } - try (AutoCloseableLock lock = datasetLock.acquire()) { + try (AutoCloseableLock lock = datasetWriteLock.acquire()) { final ReplicaInfo replicaInfo = getReplicaInfo(b); if (replicaInfo != null) { volume = replicaInfo.getVolume(); @@ -294,8 +294,12 @@ class FsDatasetImpl implements FsDatasetSpi { private final int maxDataLength; @VisibleForTesting - final AutoCloseableLock datasetLock; - private final Condition datasetLockCondition; + final AutoCloseableLock datasetWriteLock; + @VisibleForTesting + final AutoCloseableLock datasetReadLock; + @VisibleForTesting + final InstrumentedReadWriteLock datasetRWLock; + private final Condition datasetWriteLockCondition; /** * An FSDataset has a directory where it loads its data files. @@ -307,15 +311,20 @@ class FsDatasetImpl implements FsDatasetSpi { this.dataStorage = storage; this.conf = conf; this.smallBufferSize = DFSUtilClient.getSmallBufferSize(conf); - this.datasetLock = new AutoCloseableLock( - new InstrumentedLock(getClass().getName(), LOG, - new ReentrantLock(true), - conf.getTimeDuration( - DFSConfigKeys.DFS_LOCK_SUPPRESS_WARNING_INTERVAL_KEY, - DFSConfigKeys.DFS_LOCK_SUPPRESS_WARNING_INTERVAL_DEFAULT, - TimeUnit.MILLISECONDS), - 300)); - this.datasetLockCondition = datasetLock.newCondition(); + this.datasetRWLock = new InstrumentedReadWriteLock( + conf.getBoolean(DFSConfigKeys.DFS_DATANODE_LOCK_FAIR_KEY, + DFSConfigKeys.DFS_DATANODE_LOCK_FAIR_DEFAULT), + "FsDatasetRWLock", LOG, conf.getTimeDuration( + DFSConfigKeys.DFS_LOCK_SUPPRESS_WARNING_INTERVAL_KEY, + DFSConfigKeys.DFS_LOCK_SUPPRESS_WARNING_INTERVAL_DEFAULT, + TimeUnit.MILLISECONDS), + conf.getTimeDuration( + DFSConfigKeys.DFS_DATANODE_LOCK_REPORTING_THRESHOLD_MS_KEY, + DFSConfigKeys.DFS_DATANODE_LOCK_REPORTING_THRESHOLD_MS_DEFAULT, + TimeUnit.MILLISECONDS)); + this.datasetWriteLock = new AutoCloseableLock(datasetRWLock.writeLock()); + this.datasetReadLock = new AutoCloseableLock(datasetRWLock.readLock()); + this.datasetWriteLockCondition = datasetWriteLock.newCondition(); // The number of volumes required for operation is the total number // of volumes minus the number of failed volumes we can tolerate. @@ -354,7 +363,7 @@ class FsDatasetImpl implements FsDatasetSpi { } storageMap = new ConcurrentHashMap(); - volumeMap = new ReplicaMap(datasetLock); + volumeMap = new ReplicaMap(datasetRWLock); ramDiskReplicaTracker = RamDiskReplicaTracker.getInstance(conf, this); @SuppressWarnings("unchecked") @@ -444,7 +453,7 @@ class FsDatasetImpl implements FsDatasetSpi { ReplicaMap replicaMap, Storage.StorageDirectory sd, StorageType storageType, FsVolumeReference ref) throws IOException { - try(AutoCloseableLock lock = datasetLock.acquire()) { + try(AutoCloseableLock lock = datasetWriteLock.acquire()) { DatanodeStorage dnStorage = storageMap.get(sd.getStorageUuid()); if (dnStorage != null) { final String errorMsg = String.format( @@ -475,7 +484,7 @@ class FsDatasetImpl implements FsDatasetSpi { FsVolumeImpl fsVolume = new FsVolumeImpl( this, sd.getStorageUuid(), dir, this.conf, storageType); FsVolumeReference ref = fsVolume.obtainReference(); - ReplicaMap tempVolumeMap = new ReplicaMap(datasetLock); + ReplicaMap tempVolumeMap = new ReplicaMap(datasetRWLock); fsVolume.getVolumeMap(tempVolumeMap, ramDiskReplicaTracker); activateVolume(tempVolumeMap, sd, storageType, ref); @@ -509,7 +518,8 @@ class FsDatasetImpl implements FsDatasetSpi { StorageType storageType = location.getStorageType(); final FsVolumeImpl fsVolume = createFsVolume(sd.getStorageUuid(), sd.getCurrentDir(), storageType); - final ReplicaMap tempVolumeMap = new ReplicaMap(new AutoCloseableLock()); + final ReplicaMap tempVolumeMap = + new ReplicaMap(new ReentrantReadWriteLock()); ArrayList exceptions = Lists.newArrayList(); for (final NamespaceInfo nsInfo : nsInfos) { @@ -542,7 +552,8 @@ class FsDatasetImpl implements FsDatasetSpi { /** * Removes a set of volumes from FsDataset. - * @param volumesToRemove a set of absolute root path of each volume. + * @param storageLocsToRemove a set of + * {@link StorageLocation}s for each volume. * @param clearFailure set true to clear failure information. */ @Override @@ -558,7 +569,7 @@ class FsDatasetImpl implements FsDatasetSpi { Map> blkToInvalidate = new HashMap<>(); List storageToRemove = new ArrayList<>(); - try(AutoCloseableLock lock = datasetLock.acquire()) { + try(AutoCloseableLock lock = datasetWriteLock.acquire()) { for (int idx = 0; idx < dataStorage.getNumStorageDirs(); idx++) { Storage.StorageDirectory sd = dataStorage.getStorageDir(idx); final File absRoot = sd.getRoot().getAbsoluteFile(); @@ -568,7 +579,7 @@ class FsDatasetImpl implements FsDatasetSpi { // Disable the volume from the service. asyncDiskService.removeVolume(sd.getCurrentDir()); volumes.removeVolume(absRoot, clearFailure); - volumes.waitVolumeRemoved(5000, datasetLockCondition); + volumes.waitVolumeRemoved(5000, datasetWriteLockCondition); // Removed all replica information for the blocks on the volume. // Unlike updating the volumeMap in addVolume(), this operation does @@ -616,7 +627,7 @@ class FsDatasetImpl implements FsDatasetSpi { } } - try(AutoCloseableLock lock = datasetLock.acquire()) { + try(AutoCloseableLock lock = datasetWriteLock.acquire()) { for(String storageUuid : storageToRemove) { storageMap.remove(storageUuid); } @@ -814,7 +825,7 @@ class FsDatasetImpl implements FsDatasetSpi { public InputStream getBlockInputStream(ExtendedBlock b, long seekOffset) throws IOException { ReplicaInfo info; - try (AutoCloseableLock lock = datasetLock.acquire()) { + try (AutoCloseableLock lock = datasetWriteLock.acquire()) { info = volumeMap.get(b.getBlockPoolId(), b.getLocalBlock()); } @@ -904,7 +915,7 @@ class FsDatasetImpl implements FsDatasetSpi { @Override // FsDatasetSpi public ReplicaInputStreams getTmpInputStreams(ExtendedBlock b, long blkOffset, long metaOffset) throws IOException { - try(AutoCloseableLock lock = datasetLock.acquire()) { + try(AutoCloseableLock lock = datasetWriteLock.acquire()) { final ReplicaInfo info = getReplicaInfo(b); final FileIoProvider fileIoProvider = datanode.getFileIoProvider(); FsVolumeReference ref = info.getVolume().obtainReference(); @@ -1026,7 +1037,7 @@ class FsDatasetImpl implements FsDatasetSpi { } FsVolumeReference volumeRef = null; - try(AutoCloseableLock lock = datasetLock.acquire()) { + try(AutoCloseableLock lock = datasetWriteLock.acquire()) { volumeRef = volumes.getNextVolume(targetStorageType, block.getNumBytes()); } try { @@ -1045,7 +1056,7 @@ class FsDatasetImpl implements FsDatasetSpi { newReplicaInfo.setNumBytes(blockFiles[1].length()); // Finalize the copied files newReplicaInfo = finalizeReplica(block.getBlockPoolId(), newReplicaInfo); - try(AutoCloseableLock lock = datasetLock.acquire()) { + try(AutoCloseableLock lock = datasetWriteLock.acquire()) { // Increment numBlocks here as this block moved without knowing to BPS FsVolumeImpl volume = (FsVolumeImpl) newReplicaInfo.getVolume(); volume.getBlockPoolSlice(block.getBlockPoolId()).incrNumBlocks(); @@ -1190,7 +1201,7 @@ class FsDatasetImpl implements FsDatasetSpi { @Override // FsDatasetSpi public ReplicaHandler append(ExtendedBlock b, long newGS, long expectedBlockLen) throws IOException { - try(AutoCloseableLock lock = datasetLock.acquire()) { + try(AutoCloseableLock lock = datasetWriteLock.acquire()) { // If the block was successfully finalized because all packets // were successfully processed at the Datanode but the ack for // some of the packets were not received by the client. The client @@ -1242,7 +1253,7 @@ class FsDatasetImpl implements FsDatasetSpi { private ReplicaBeingWritten append(String bpid, FinalizedReplica replicaInfo, long newGS, long estimateBlockLen) throws IOException { - try(AutoCloseableLock lock = datasetLock.acquire()) { + try(AutoCloseableLock lock = datasetWriteLock.acquire()) { // If the block is cached, start uncaching it. cacheManager.uncacheBlock(bpid, replicaInfo.getBlockId()); @@ -1378,7 +1389,7 @@ class FsDatasetImpl implements FsDatasetSpi { while (true) { try { - try(AutoCloseableLock lock = datasetLock.acquire()) { + try(AutoCloseableLock lock = datasetWriteLock.acquire()) { ReplicaInfo replicaInfo = recoverCheck(b, newGS, expectedBlockLen); FsVolumeReference ref = replicaInfo.getVolume().obtainReference(); @@ -1410,7 +1421,7 @@ class FsDatasetImpl implements FsDatasetSpi { LOG.info("Recover failed close " + b); while (true) { try { - try(AutoCloseableLock lock = datasetLock.acquire()) { + try(AutoCloseableLock lock = datasetWriteLock.acquire()) { // check replica's state ReplicaInfo replicaInfo = recoverCheck(b, newGS, expectedBlockLen); // bump the replica's GS @@ -1461,7 +1472,7 @@ class FsDatasetImpl implements FsDatasetSpi { public ReplicaHandler createRbw( StorageType storageType, ExtendedBlock b, boolean allowLazyPersist) throws IOException { - try(AutoCloseableLock lock = datasetLock.acquire()) { + try(AutoCloseableLock lock = datasetWriteLock.acquire()) { ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(), b.getBlockId()); if (replicaInfo != null) { @@ -1527,7 +1538,7 @@ class FsDatasetImpl implements FsDatasetSpi { while (true) { try { - try(AutoCloseableLock lock = datasetLock.acquire()) { + try(AutoCloseableLock lock = datasetWriteLock.acquire()) { ReplicaInfo replicaInfo = getReplicaInfo(b.getBlockPoolId(), b.getBlockId()); // check the replica's state @@ -1551,7 +1562,7 @@ class FsDatasetImpl implements FsDatasetSpi { private ReplicaHandler recoverRbwImpl(ReplicaBeingWritten rbw, ExtendedBlock b, long newGS, long minBytesRcvd, long maxBytesRcvd) throws IOException { - try(AutoCloseableLock lock = datasetLock.acquire()) { + try(AutoCloseableLock lock = datasetWriteLock.acquire()) { // check generation stamp long replicaGenerationStamp = rbw.getGenerationStamp(); if (replicaGenerationStamp < b.getGenerationStamp() || @@ -1613,7 +1624,7 @@ class FsDatasetImpl implements FsDatasetSpi { @Override // FsDatasetSpi public ReplicaInPipeline convertTemporaryToRbw( final ExtendedBlock b) throws IOException { - try(AutoCloseableLock lock = datasetLock.acquire()) { + try(AutoCloseableLock lock = datasetWriteLock.acquire()) { final long blockId = b.getBlockId(); final long expectedGs = b.getGenerationStamp(); final long visible = b.getNumBytes(); @@ -1687,7 +1698,7 @@ class FsDatasetImpl implements FsDatasetSpi { ReplicaInfo lastFoundReplicaInfo = null; boolean isInPipeline = false; do { - try(AutoCloseableLock lock = datasetLock.acquire()) { + try(AutoCloseableLock lock = datasetWriteLock.acquire()) { ReplicaInfo currentReplicaInfo = volumeMap.get(b.getBlockPoolId(), b.getBlockId()); if (currentReplicaInfo == lastFoundReplicaInfo) { @@ -1733,7 +1744,7 @@ class FsDatasetImpl implements FsDatasetSpi { invalidate(b.getBlockPoolId(), new Block[] { lastFoundReplicaInfo }, false); } - try (AutoCloseableLock lock = datasetLock.acquire()) { + try (AutoCloseableLock lock = datasetWriteLock.acquire()) { FsVolumeReference ref = volumes.getNextVolume(storageType, b .getNumBytes()); FsVolumeImpl v = (FsVolumeImpl) ref.getVolume(); @@ -1787,7 +1798,7 @@ class FsDatasetImpl implements FsDatasetSpi { throws IOException { ReplicaInfo replicaInfo = null; ReplicaInfo finalizedReplicaInfo = null; - try(AutoCloseableLock lock = datasetLock.acquire()) { + try(AutoCloseableLock lock = datasetWriteLock.acquire()) { if (Thread.interrupted()) { // Don't allow data modifications from interrupted threads throw new IOException("Cannot finalize block from Interrupted Thread"); @@ -1817,7 +1828,7 @@ class FsDatasetImpl implements FsDatasetSpi { private FinalizedReplica finalizeReplica(String bpid, ReplicaInfo replicaInfo) throws IOException { - try(AutoCloseableLock lock = datasetLock.acquire()) { + try(AutoCloseableLock lock = datasetWriteLock.acquire()) { FinalizedReplica newReplicaInfo = null; if (replicaInfo.getState() == ReplicaState.RUR && ((ReplicaUnderRecovery) replicaInfo).getOriginalReplica().getState() @@ -1871,7 +1882,7 @@ class FsDatasetImpl implements FsDatasetSpi { */ @Override // FsDatasetSpi public void unfinalizeBlock(ExtendedBlock b) throws IOException { - try(AutoCloseableLock lock = datasetLock.acquire()) { + try(AutoCloseableLock lock = datasetWriteLock.acquire()) { ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(), b.getLocalBlock()); if (replicaInfo != null @@ -1926,7 +1937,7 @@ class FsDatasetImpl implements FsDatasetSpi { new HashMap(); List curVolumes = null; - try(AutoCloseableLock lock = datasetLock.acquire()) { + try(AutoCloseableLock lock = datasetWriteLock.acquire()) { curVolumes = volumes.getVolumes(); for (FsVolumeSpi v : curVolumes) { builders.put(v.getStorageID(), BlockListAsLongs.builder(maxDataLength)); @@ -1983,7 +1994,7 @@ class FsDatasetImpl implements FsDatasetSpi { * Gets a list of references to the finalized blocks for the given block pool. *

* Callers of this function should call - * {@link FsDatasetSpi#acquireDatasetLock} to avoid blocks' status being + * {@link FsDatasetSpi#acquireDatasetLock()} to avoid blocks' status being * changed during list iteration. *

* @return a list of references to the finalized blocks for the given block @@ -1991,7 +2002,7 @@ class FsDatasetImpl implements FsDatasetSpi { */ @Override public List getFinalizedBlocks(String bpid) { - try(AutoCloseableLock lock = datasetLock.acquire()) { + try(AutoCloseableLock lock = datasetWriteLock.acquire()) { final ArrayList finalized = new ArrayList(volumeMap.size(bpid)); for (ReplicaInfo b : volumeMap.replicas(bpid)) { @@ -2077,7 +2088,7 @@ class FsDatasetImpl implements FsDatasetSpi { //Should we check for metadata file too? File f = null; ReplicaInfo info; - try(AutoCloseableLock lock = datasetLock.acquire()) { + try(AutoCloseableLock lock = datasetWriteLock.acquire()) { info = volumeMap.get(bpid, blockId); if (info != null) { f = info.getBlockFile(); @@ -2135,7 +2146,7 @@ class FsDatasetImpl implements FsDatasetSpi { for (int i = 0; i < invalidBlks.length; i++) { final File f; final FsVolumeImpl v; - try(AutoCloseableLock lock = datasetLock.acquire()) { + try(AutoCloseableLock lock = datasetWriteLock.acquire()) { final ReplicaInfo info = volumeMap.get(bpid, invalidBlks[i]); if (info == null) { ReplicaInfo infoByBlockId = @@ -2258,7 +2269,7 @@ class FsDatasetImpl implements FsDatasetSpi { long length, genstamp; Executor volumeExecutor; - try(AutoCloseableLock lock = datasetLock.acquire()) { + try(AutoCloseableLock lock = datasetWriteLock.acquire()) { ReplicaInfo info = volumeMap.get(bpid, blockId); boolean success = false; try { @@ -2326,7 +2337,7 @@ class FsDatasetImpl implements FsDatasetSpi { @Override // FsDatasetSpi public boolean contains(final ExtendedBlock block) { - try(AutoCloseableLock lock = datasetLock.acquire()) { + try(AutoCloseableLock lock = datasetWriteLock.acquire()) { final long blockId = block.getLocalBlock().getBlockId(); return getFile(block.getBlockPoolId(), blockId, false) != null; } @@ -2456,7 +2467,7 @@ class FsDatasetImpl implements FsDatasetSpi { File diskMetaFile, FsVolumeSpi vol) throws IOException { Block corruptBlock = null; ReplicaInfo memBlockInfo; - try(AutoCloseableLock lock = datasetLock.acquire()) { + try(AutoCloseableLock lock = datasetWriteLock.acquire()) { memBlockInfo = volumeMap.get(bpid, blockId); if (memBlockInfo != null && memBlockInfo.getState() != ReplicaState.FINALIZED) { // Block is not finalized - ignore the difference @@ -2621,7 +2632,7 @@ class FsDatasetImpl implements FsDatasetSpi { @Override public String getReplicaString(String bpid, long blockId) { - try(AutoCloseableLock lock = datasetLock.acquire()) { + try(AutoCloseableLock lock = datasetWriteLock.acquire()) { final Replica r = volumeMap.get(bpid, blockId); return r == null ? "null" : r.toString(); } @@ -2726,7 +2737,7 @@ class FsDatasetImpl implements FsDatasetSpi { final long recoveryId, final long newBlockId, final long newlength) throws IOException { - try(AutoCloseableLock lock = datasetLock.acquire()) { + try(AutoCloseableLock lock = datasetWriteLock.acquire()) { //get replica final String bpid = oldBlock.getBlockPoolId(); final ReplicaInfo replica = volumeMap.get(bpid, oldBlock.getBlockId()); @@ -2868,7 +2879,7 @@ class FsDatasetImpl implements FsDatasetSpi { @Override // FsDatasetSpi public long getReplicaVisibleLength(final ExtendedBlock block) throws IOException { - try(AutoCloseableLock lock = datasetLock.acquire()) { + try(AutoCloseableLock lock = datasetWriteLock.acquire()) { final Replica replica = getReplicaInfo(block.getBlockPoolId(), block.getBlockId()); if (replica.getGenerationStamp() < block.getGenerationStamp()) { @@ -2884,7 +2895,7 @@ class FsDatasetImpl implements FsDatasetSpi { public void addBlockPool(String bpid, Configuration conf) throws IOException { LOG.info("Adding block pool " + bpid); - try(AutoCloseableLock lock = datasetLock.acquire()) { + try(AutoCloseableLock lock = datasetWriteLock.acquire()) { volumes.addBlockPool(bpid, conf); volumeMap.initBlockPool(bpid); } @@ -2893,7 +2904,7 @@ class FsDatasetImpl implements FsDatasetSpi { @Override public void shutdownBlockPool(String bpid) { - try(AutoCloseableLock lock = datasetLock.acquire()) { + try(AutoCloseableLock lock = datasetWriteLock.acquire()) { LOG.info("Removing block pool " + bpid); Map blocksPerVolume = getBlockReports(bpid); @@ -2967,7 +2978,7 @@ class FsDatasetImpl implements FsDatasetSpi { @Override //FsDatasetSpi public void deleteBlockPool(String bpid, boolean force) throws IOException { - try(AutoCloseableLock lock = datasetLock.acquire()) { + try(AutoCloseableLock lock = datasetWriteLock.acquire()) { List curVolumes = volumes.getVolumes(); if (!force) { for (FsVolumeImpl volume : curVolumes) { @@ -2996,7 +3007,7 @@ class FsDatasetImpl implements FsDatasetSpi { @Override // FsDatasetSpi public BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock block) throws IOException { - try(AutoCloseableLock lock = datasetLock.acquire()) { + try(AutoCloseableLock lock = datasetWriteLock.acquire()) { final Replica replica = volumeMap.get(block.getBlockPoolId(), block.getBlockId()); if (replica == null) { @@ -3089,7 +3100,7 @@ class FsDatasetImpl implements FsDatasetSpi { @Override public void onCompleteLazyPersist(String bpId, long blockId, long creationTime, File[] savedFiles, FsVolumeImpl targetVolume) { - try(AutoCloseableLock lock = datasetLock.acquire()) { + try(AutoCloseableLock lock = datasetWriteLock.acquire()) { ramDiskReplicaTracker.recordEndLazyPersist(bpId, blockId, savedFiles); targetVolume.incDfsUsedAndNumBlocks(bpId, savedFiles[0].length() @@ -3223,7 +3234,7 @@ class FsDatasetImpl implements FsDatasetSpi { try { block = ramDiskReplicaTracker.dequeueNextReplicaToPersist(); if (block != null) { - try(AutoCloseableLock lock = datasetLock.acquire()) { + try(AutoCloseableLock lock = datasetWriteLock.acquire()) { replicaInfo = volumeMap.get(block.getBlockPoolId(), block.getBlockId()); // If replicaInfo is null, the block was either deleted before @@ -3293,7 +3304,7 @@ class FsDatasetImpl implements FsDatasetSpi { long blockFileUsed, metaFileUsed; final String bpid = replicaState.getBlockPoolId(); - try(AutoCloseableLock lock = datasetLock.acquire()) { + try(AutoCloseableLock lock = datasetWriteLock.acquire()) { replicaInfo = getReplicaInfo(replicaState.getBlockPoolId(), replicaState.getBlockId()); Preconditions.checkState(replicaInfo.getVolume().isTransientStorage()); @@ -3405,7 +3416,12 @@ class FsDatasetImpl implements FsDatasetSpi { @Override public AutoCloseableLock acquireDatasetLock() { - return datasetLock.acquire(); + return datasetWriteLock.acquire(); + } + + @Override + public AutoCloseableLock acquireDatasetReadLock() { + return datasetReadLock.acquire(); } public void removeDeletedBlocks(String bpid, Set blockIds) { @@ -3495,7 +3511,7 @@ class FsDatasetImpl implements FsDatasetSpi { } void stopAllDataxceiverThreads(FsVolumeImpl volume) { - try(AutoCloseableLock lock = datasetLock.acquire()) { + try(AutoCloseableLock lock = datasetWriteLock.acquire()) { for (String blockPoolId : volumeMap.getBlockPoolList()) { Collection replicas = volumeMap.replicas(blockPoolId); for (ReplicaInfo replicaInfo : replicas) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ReplicaMap.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ReplicaMap.java index 97100b51a3e..adda9cf1e2d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ReplicaMap.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ReplicaMap.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl; import java.util.Collection; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.locks.ReadWriteLock; import org.apache.hadoop.HadoopIllegalArgumentException; import org.apache.hadoop.hdfs.protocol.Block; @@ -31,23 +32,27 @@ import org.apache.hadoop.util.AutoCloseableLock; * Maintains the replica map. */ class ReplicaMap { + private final ReadWriteLock rwLock; // Lock object to synchronize this instance. - private final AutoCloseableLock lock; + private final AutoCloseableLock readLock; + private final AutoCloseableLock writeLock; // Map of block pool Id to another map of block Id to ReplicaInfo. private final Map> map = new HashMap>(); - ReplicaMap(AutoCloseableLock lock) { + ReplicaMap(ReadWriteLock lock) { if (lock == null) { throw new HadoopIllegalArgumentException( "Lock to synchronize on cannot be null"); } - this.lock = lock; + this.rwLock = lock; + this.readLock = new AutoCloseableLock(rwLock.readLock()); + this.writeLock = new AutoCloseableLock(rwLock.writeLock()); } String[] getBlockPoolList() { - try (AutoCloseableLock l = lock.acquire()) { + try (AutoCloseableLock l = writeLock.acquire()) { return map.keySet().toArray(new String[map.keySet().size()]); } } @@ -92,7 +97,7 @@ class ReplicaMap { */ ReplicaInfo get(String bpid, long blockId) { checkBlockPool(bpid); - try (AutoCloseableLock l = lock.acquire()) { + try (AutoCloseableLock l = writeLock.acquire()) { LightWeightResizableGSet m = map.get(bpid); return m != null ? m.get(new Block(blockId)) : null; } @@ -109,7 +114,7 @@ class ReplicaMap { ReplicaInfo add(String bpid, ReplicaInfo replicaInfo) { checkBlockPool(bpid); checkBlock(replicaInfo); - try (AutoCloseableLock l = lock.acquire()) { + try (AutoCloseableLock l = writeLock.acquire()) { LightWeightResizableGSet m = map.get(bpid); if (m == null) { // Add an entry for block pool if it does not exist already @@ -127,7 +132,7 @@ class ReplicaMap { ReplicaInfo addAndGet(String bpid, ReplicaInfo replicaInfo) { checkBlockPool(bpid); checkBlock(replicaInfo); - try (AutoCloseableLock l = lock.acquire()) { + try (AutoCloseableLock l = writeLock.acquire()) { LightWeightResizableGSet m = map.get(bpid); if (m == null) { // Add an entry for block pool if it does not exist already @@ -174,7 +179,7 @@ class ReplicaMap { ReplicaInfo remove(String bpid, Block block) { checkBlockPool(bpid); checkBlock(block); - try (AutoCloseableLock l = lock.acquire()) { + try (AutoCloseableLock l = writeLock.acquire()) { LightWeightResizableGSet m = map.get(bpid); if (m != null) { ReplicaInfo replicaInfo = m.get(block); @@ -196,7 +201,7 @@ class ReplicaMap { */ ReplicaInfo remove(String bpid, long blockId) { checkBlockPool(bpid); - try (AutoCloseableLock l = lock.acquire()) { + try (AutoCloseableLock l = writeLock.acquire()) { LightWeightResizableGSet m = map.get(bpid); if (m != null) { return m.remove(new Block(blockId)); @@ -212,7 +217,7 @@ class ReplicaMap { */ int size(String bpid) { LightWeightResizableGSet m = null; - try (AutoCloseableLock l = lock.acquire()) { + try (AutoCloseableLock l = writeLock.acquire()) { m = map.get(bpid); return m != null ? m.size() : 0; } @@ -236,7 +241,7 @@ class ReplicaMap { void initBlockPool(String bpid) { checkBlockPool(bpid); - try (AutoCloseableLock l = lock.acquire()) { + try (AutoCloseableLock l = writeLock.acquire()) { LightWeightResizableGSet m = map.get(bpid); if (m == null) { // Add an entry for block pool if it does not exist already @@ -248,7 +253,7 @@ class ReplicaMap { void cleanUpBlockPool(String bpid) { checkBlockPool(bpid); - try (AutoCloseableLock l = lock.acquire()) { + try (AutoCloseableLock l = writeLock.acquire()) { map.remove(bpid); } } @@ -258,6 +263,6 @@ class ReplicaMap { * @return lock object */ AutoCloseableLock getLock() { - return lock; + return writeLock; } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml index c74dddba58e..2cbd48675cd 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml @@ -3036,6 +3036,27 @@ + + dfs.datanode.lock.fair + true + If this is true, the Datanode FsDataset lock will be used in Fair + mode, which will help to prevent writer threads from being starved, but can + lower lock throughput. See java.util.concurrent.locks.ReentrantReadWriteLock + for more information on fair/non-fair locks. + + + + + dfs.datanode.lock-reporting-threshold-ms + 300 + When thread waits to obtain a lock, or a thread holds a lock for + more than the threshold, a log message will be written. Note that + dfs.lock.suppress.warning.interval ensures a single log message is + emitted per interval for waiting threads and a single message for holding + threads to avoid excessive logging. + + + dfs.namenode.startup.delay.block.deletion.sec 0 diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java index 2a5cfe08659..a36a9a2feda 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java @@ -1528,6 +1528,12 @@ public class SimulatedFSDataset implements FsDatasetSpi { return datasetLock.acquire(); } + @Override + public AutoCloseableLock acquireDatasetReadLock() { + // No RW lock implementation in simulated dataset currently. + return datasetLock.acquire(); + } + @Override public Set deepCopyReplica(String bpid) throws IOException { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java index 3635daaea99..1f03f5084bc 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java @@ -450,6 +450,11 @@ public class ExternalDatasetImpl implements FsDatasetSpi { return null; } + @Override + public AutoCloseableLock acquireDatasetReadLock() { + return null; + } + @Override public Set deepCopyReplica(String bpid) throws IOException { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImplTestUtils.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImplTestUtils.java index 9b7ac3d5dd1..09fddbb4f01 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImplTestUtils.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImplTestUtils.java @@ -442,7 +442,7 @@ public class FsDatasetImplTestUtils implements FsDatasetTestUtils { @Override public Iterator getStoredReplicas(String bpid) throws IOException { // Reload replicas from the disk. - ReplicaMap replicaMap = new ReplicaMap(dataset.datasetLock); + ReplicaMap replicaMap = new ReplicaMap(dataset.datasetRWLock); try (FsVolumeReferences refs = dataset.getFsVolumeReferences()) { for (FsVolumeSpi vol : refs) { FsVolumeImpl volume = (FsVolumeImpl) vol; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsVolumeList.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsVolumeList.java index 92a0bd780e6..a78b1c03079 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsVolumeList.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsVolumeList.java @@ -33,7 +33,6 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference; import org.apache.hadoop.hdfs.server.datanode.fsdataset.RoundRobinVolumeChoosingPolicy; import org.apache.hadoop.hdfs.server.datanode.fsdataset.VolumeChoosingPolicy; import org.apache.hadoop.test.GenericTestUtils; -import org.apache.hadoop.util.AutoCloseableLock; import org.apache.hadoop.util.StringUtils; import org.junit.Before; import org.junit.Test; @@ -48,6 +47,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeoutException; +import java.util.concurrent.locks.ReentrantReadWriteLock; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DU_RESERVED_PERCENTAGE_KEY; import static org.junit.Assert.assertEquals; @@ -291,7 +291,7 @@ public class TestFsVolumeList { fs.close(); FsDatasetImpl fsDataset = (FsDatasetImpl) cluster.getDataNodes().get(0) .getFSDataset(); - ReplicaMap volumeMap = new ReplicaMap(new AutoCloseableLock()); + ReplicaMap volumeMap = new ReplicaMap(new ReentrantReadWriteLock()); RamDiskReplicaTracker ramDiskReplicaMap = RamDiskReplicaTracker .getInstance(conf, fsDataset); FsVolumeImpl vol = (FsVolumeImpl) fsDataset.getFsVolumeReferences().get(0); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestInterDatanodeProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestInterDatanodeProtocol.java index 86e9f90cc64..b72b1cd1bb8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestInterDatanodeProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestInterDatanodeProtocol.java @@ -25,6 +25,7 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.net.SocketTimeoutException; import java.util.List; +import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; @@ -59,7 +60,6 @@ import org.apache.hadoop.io.Writable; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.Server; import org.apache.hadoop.net.NetUtils; -import org.apache.hadoop.util.AutoCloseableLock; import org.junit.Assert; import org.junit.Test; @@ -236,7 +236,7 @@ public class TestInterDatanodeProtocol { final long firstblockid = 10000L; final long gs = 7777L; final long length = 22L; - final ReplicaMap map = new ReplicaMap(new AutoCloseableLock()); + final ReplicaMap map = new ReplicaMap(new ReentrantReadWriteLock()); String bpid = "BP-TEST"; final Block[] blocks = new Block[5]; for(int i = 0; i < blocks.length; i++) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestReplicaMap.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestReplicaMap.java index 1059c08079d..4f2472dc2bd 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestReplicaMap.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestReplicaMap.java @@ -17,13 +17,14 @@ */ package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl; +import java.util.concurrent.locks.ReentrantReadWriteLock; + import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.fail; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica; -import org.apache.hadoop.util.AutoCloseableLock; import org.junit.Before; import org.junit.Test; @@ -31,7 +32,7 @@ import org.junit.Test; * Unit test for ReplicasMap class */ public class TestReplicaMap { - private final ReplicaMap map = new ReplicaMap(new AutoCloseableLock()); + private final ReplicaMap map = new ReplicaMap(new ReentrantReadWriteLock()); private final String bpid = "BP-TEST"; private final Block block = new Block(1234, 1234, 1234); @@ -111,7 +112,7 @@ public class TestReplicaMap { @Test public void testMergeAll() { - ReplicaMap temReplicaMap = new ReplicaMap(new AutoCloseableLock()); + ReplicaMap temReplicaMap = new ReplicaMap(new ReentrantReadWriteLock()); Block tmpBlock = new Block(5678, 5678, 5678); temReplicaMap.add(bpid, new FinalizedReplica(tmpBlock, null, null)); @@ -122,7 +123,7 @@ public class TestReplicaMap { @Test public void testAddAll() { - ReplicaMap temReplicaMap = new ReplicaMap(new AutoCloseableLock()); + ReplicaMap temReplicaMap = new ReplicaMap(new ReentrantReadWriteLock()); Block tmpBlock = new Block(5678, 5678, 5678); temReplicaMap.add(bpid, new FinalizedReplica(tmpBlock, null, null)); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java index f6b16881200..d37990366f1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java @@ -26,6 +26,7 @@ import java.io.RandomAccessFile; import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.StorageType; @@ -46,7 +47,6 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.test.GenericTestUtils; -import org.apache.hadoop.util.AutoCloseableLock; import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException; import org.junit.Assert; import org.junit.Test; @@ -545,7 +545,7 @@ public class TestWriteToReplica { bpList.size() == 2); createReplicas(bpList, volumes, cluster.getFsDatasetTestUtils(dn)); - ReplicaMap oldReplicaMap = new ReplicaMap(new AutoCloseableLock()); + ReplicaMap oldReplicaMap = new ReplicaMap(new ReentrantReadWriteLock()); oldReplicaMap.addAll(dataSet.volumeMap); cluster.restartDataNode(0);