From 1fe08c919a6e12359b15203e3774bc94f27013b1 Mon Sep 17 00:00:00 2001
From: Arpit Agarwal
Date: Wed, 17 Aug 2016 16:22:00 -0700
Subject: [PATCH] HDFS-10682. Replace FsDatasetImpl object lock with a
 separate lock object. (Contributed by Chen Liang)

---
 .../hdfs/server/datanode/BlockSender.java     |   3 +-
 .../hadoop/hdfs/server/datanode/DataNode.java |   3 +-
 .../server/datanode/DirectoryScanner.java     |   3 +-
 .../datanode/fsdataset/FsDatasetSpi.java      |   6 +
 .../fsdataset/impl/FsDatasetImpl.java         | 931 ++++++++++--------
 .../datanode/fsdataset/impl/FsVolumeImpl.java |   9 +-
 .../server/datanode/SimulatedFSDataset.java   |   9 +
 .../server/datanode/TestBlockRecovery.java    |   3 +-
 .../server/datanode/TestDirectoryScanner.java |   9 +-
 .../extdataset/ExternalDatasetImpl.java       |   6 +
 10 files changed, 535 insertions(+), 447 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
index 6a35fcedcbb..7171e73c900 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
@@ -46,6 +46,7 @@ import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.ReadaheadPool.ReadaheadRequest;
 import org.apache.hadoop.io.nativeio.NativeIO;
 import org.apache.hadoop.net.SocketOutputStream;
+import org.apache.hadoop.util.AutoCloseableLock;
 import org.apache.hadoop.util.DataChecksum;
 import org.apache.htrace.core.Sampler;
 import org.apache.htrace.core.TraceScope;
@@ -240,7 +241,7 @@ class BlockSender implements java.io.Closeable {
     final Replica replica;
     final long replicaVisibleLength;
-    synchronized(datanode.data) {
+    try(AutoCloseableLock lock = datanode.data.acquireDatasetLock()) {
       replica = getReplica(block, datanode);
       replicaVisibleLength = replica.getVisibleLength();
     }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index 9aef56e75d1..7501b7ea16c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -196,6 +196,7 @@ import org.apache.hadoop.tracing.TraceAdminProtocolPB;
 import org.apache.hadoop.tracing.TraceAdminProtocolServerSideTranslatorPB;
 import org.apache.hadoop.tracing.TraceUtils;
 import org.apache.hadoop.tracing.TracerConfigurationManager;
+import org.apache.hadoop.util.AutoCloseableLock;
 import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.DiskChecker;
 import org.apache.hadoop.util.DiskChecker.DiskErrorException;
@@ -2817,7 +2818,7 @@ public class DataNode extends ReconfigurableBase
     final BlockConstructionStage stage;

     //get replica information
-    synchronized(data) {
+    try(AutoCloseableLock lock = data.acquireDatasetLock()) {
       Block storedBlock = data.getStoredBlock(b.getBlockPoolId(),
           b.getBlockId());
       if (null == storedBlock) {
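The two hunks above show the transformation this patch applies at every call site: a synchronized block on the dataset object becomes a try-with-resources block around an explicit AutoCloseableLock. A minimal sketch of the before/after shape follows; the class and method names in it are illustrative, not taken from the patch.

import org.apache.hadoop.util.AutoCloseableLock;

class DatasetGuardSketch {
  private final AutoCloseableLock datasetLock = new AutoCloseableLock();

  // Before: mutual exclusion through the object monitor.
  synchronized void monitorStyle() {
    // ... critical section over shared dataset state ...
  }

  // After: the same critical section guarded by a dedicated lock object.
  // try-with-resources releases the lock on every exit path, including
  // exceptions, matching the guarantee a synchronized block provides.
  void explicitLockStyle() {
    try (AutoCloseableLock lock = datasetLock.acquire()) {
      // ... critical section over shared dataset state ...
    }
  }
}

Decoupling the critical sections from the object monitor is what lets later work substitute a different lock implementation (for example a fair or instrumented lock) without touching every call site again.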
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
index 5da0eafd1b5..6fb3a198749 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
@@ -43,6 +43,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.util.AutoCloseableLock;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
@@ -581,7 +582,7 @@ public class DirectoryScanner implements Runnable {
     Map<String, ScanInfo[]> diskReport = getDiskReport();

     // Hold FSDataset lock to prevent further changes to the block map
-    synchronized(dataset) {
+    try(AutoCloseableLock lock = dataset.acquireDatasetLock()) {
       for (Entry<String, ScanInfo[]> entry : diskReport.entrySet()) {
         String bpid = entry.getKey();
         ScanInfo[] blockpoolReport = entry.getValue();
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
index 788b75b5f5f..ac3c5b4c56a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
@@ -60,6 +60,7 @@ import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
 import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
+import org.apache.hadoop.util.AutoCloseableLock;
 import org.apache.hadoop.util.ReflectionUtils;

 /**
@@ -641,4 +642,9 @@
    * Confirm whether the block is deleting
    */
   boolean isDeletingBlock(String bpid, long blockId);
+
+  /**
+   * Acquire the lock of the dataset.
+   */
+  AutoCloseableLock acquireDatasetLock();
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index aa415e54653..de5b603e796 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -63,6 +63,7 @@ import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtilClient;
 import org.apache.hadoop.hdfs.ExtendedBlockId;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.util.AutoCloseableLock;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
@@ -178,21 +179,26 @@
   }

   @Override
-  public synchronized FsVolumeImpl getVolume(final ExtendedBlock b) {
-    final ReplicaInfo r = volumeMap.get(b.getBlockPoolId(), b.getLocalBlock());
-    return r != null? (FsVolumeImpl)r.getVolume(): null;
+  public FsVolumeImpl getVolume(final ExtendedBlock b) {
+    try(AutoCloseableLock lock = datasetLock.acquire()) {
+      final ReplicaInfo r =
+          volumeMap.get(b.getBlockPoolId(), b.getLocalBlock());
+      return r != null ? (FsVolumeImpl) r.getVolume() : null;
+    }
   }

   @Override // FsDatasetSpi
-  public synchronized Block getStoredBlock(String bpid, long blkid)
+  public Block getStoredBlock(String bpid, long blkid)
       throws IOException {
-    File blockfile = getFile(bpid, blkid, false);
-    if (blockfile == null) {
-      return null;
+    try(AutoCloseableLock lock = datasetLock.acquire()) {
+      File blockfile = getFile(bpid, blkid, false);
+      if (blockfile == null) {
+        return null;
+      }
+      final File metafile = FsDatasetUtil.findMetaFile(blockfile);
+      final long gs = FsDatasetUtil.parseGenerationStamp(blockfile, metafile);
+      return new Block(blkid, blockfile.length(), gs);
     }
-    final File metafile = FsDatasetUtil.findMetaFile(blockfile);
-    final long gs = FsDatasetUtil.parseGenerationStamp(blockfile, metafile);
-    return new Block(blkid, blockfile.length(), gs);
   }
@@ -261,7 +267,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   private boolean blockPinningEnabled;
   private final int maxDataLength;
-
+
+  private final AutoCloseableLock datasetLock;
   /**
    * An FSDataset has a directory where it loads its data files.
    */
@@ -272,6 +279,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     this.dataStorage = storage;
     this.conf = conf;
     this.smallBufferSize = DFSUtilClient.getSmallBufferSize(conf);
+    this.datasetLock = new AutoCloseableLock();
     // The number of volumes required for operation is the total number
     // of volumes minus the number of failed volumes we can tolerate.
     volFailuresTolerated = datanode.getDnConf().getVolFailuresTolerated();
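FsDatasetImpl now constructs the lock it will later hand out through acquireDatasetLock(). The patch does not define org.apache.hadoop.util.AutoCloseableLock itself; from the way it is used here (acquire() called inside try-with-resources, with close() implicitly releasing), it must look roughly like the sketch below, presumably wrapping a java.util.concurrent lock. This is an inferred shape, not the actual Hadoop source.

import java.util.concurrent.locks.ReentrantLock;

// Inferred shape of org.apache.hadoop.util.AutoCloseableLock, based only on
// how this patch uses it; the real class may differ in detail.
public class AutoCloseableLock implements AutoCloseable {
  private final ReentrantLock lock = new ReentrantLock();

  // Blocks until the lock is held, then returns this so that
  // try-with-resources can release it via close().
  public AutoCloseableLock acquire() {
    lock.lock();
    return this;
  }

  @Override
  public void close() {
    lock.unlock();
  }
}

A ReentrantLock-backed implementation also preserves a key semantic of the replaced synchronized blocks: the same thread may re-acquire a lock it already holds.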
@@ -378,25 +386,27 @@
    * Activate a volume to serve requests.
    * @throws IOException if the storage UUID already exists.
    */
-  private synchronized void activateVolume(
+  private void activateVolume(
       ReplicaMap replicaMap,
       Storage.StorageDirectory sd, StorageType storageType,
       FsVolumeReference ref) throws IOException {
-    DatanodeStorage dnStorage = storageMap.get(sd.getStorageUuid());
-    if (dnStorage != null) {
-      final String errorMsg = String.format(
-          "Found duplicated storage UUID: %s in %s.",
-          sd.getStorageUuid(), sd.getVersionFile());
-      LOG.error(errorMsg);
-      throw new IOException(errorMsg);
+    try(AutoCloseableLock lock = datasetLock.acquire()) {
+      DatanodeStorage dnStorage = storageMap.get(sd.getStorageUuid());
+      if (dnStorage != null) {
+        final String errorMsg = String.format(
+            "Found duplicated storage UUID: %s in %s.",
+            sd.getStorageUuid(), sd.getVersionFile());
+        LOG.error(errorMsg);
+        throw new IOException(errorMsg);
+      }
+      volumeMap.addAll(replicaMap);
+      storageMap.put(sd.getStorageUuid(),
+          new DatanodeStorage(sd.getStorageUuid(),
+              DatanodeStorage.State.NORMAL,
+              storageType));
+      asyncDiskService.addVolume(sd.getCurrentDir());
+      volumes.addVolume(ref);
     }
-    volumeMap.addAll(replicaMap);
-    storageMap.put(sd.getStorageUuid(),
-        new DatanodeStorage(sd.getStorageUuid(),
-            DatanodeStorage.State.NORMAL,
-            storageType));
-    asyncDiskService.addVolume(sd.getCurrentDir());
-    volumes.addVolume(ref);
   }

   private void addVolume(Collection<StorageLocation> dataLocations,
@@ -491,7 +501,7 @@
     Map<String, List<ReplicaInfo>> blkToInvalidate = new HashMap<>();
     List<String> storageToRemove = new ArrayList<>();
-    synchronized (this) {
+    try(AutoCloseableLock lock = datasetLock.acquire()) {
       for (int idx = 0; idx < dataStorage.getNumStorageDirs(); idx++) {
         Storage.StorageDirectory sd = dataStorage.getStorageDir(idx);
         final File absRoot = sd.getRoot().getAbsoluteFile();
@@ -537,7 +547,7 @@
       }
     }

-    synchronized (this) {
+    try(AutoCloseableLock lock = datasetLock.acquire()) {
       for(String storageUuid : storageToRemove) {
         storageMap.remove(storageUuid);
       }
@@ -746,7 +756,7 @@
       boolean touch)
       throws IOException {
     final File f;
-    synchronized(this) {
+    try(AutoCloseableLock lock = datasetLock.acquire()) {
       f = getFile(b.getBlockPoolId(), b.getLocalBlock().getBlockId(), touch);
     }
     if (f == null) {
@@ -812,22 +822,25 @@
    * Returns handles to the block file and its metadata file
    */
   @Override // FsDatasetSpi
-  public synchronized ReplicaInputStreams getTmpInputStreams(ExtendedBlock b,
+  public ReplicaInputStreams getTmpInputStreams(ExtendedBlock b,
       long blkOffset, long metaOffset) throws IOException {
-    ReplicaInfo info = getReplicaInfo(b);
-    FsVolumeReference ref = info.getVolume().obtainReference();
-    try {
-      InputStream blockInStream = openAndSeek(info.getBlockFile(), blkOffset);
+    try(AutoCloseableLock lock = datasetLock.acquire()) {
+      ReplicaInfo info = getReplicaInfo(b);
+      FsVolumeReference ref = info.getVolume().obtainReference();
       try {
-        InputStream metaInStream = openAndSeek(info.getMetaFile(), metaOffset);
-        return new ReplicaInputStreams(blockInStream, metaInStream, ref);
+        InputStream blockInStream = openAndSeek(info.getBlockFile(), blkOffset);
+        try {
+          InputStream metaInStream =
+              openAndSeek(info.getMetaFile(), metaOffset);
+          return new ReplicaInputStreams(blockInStream, metaInStream, ref);
+        } catch (IOException e) {
+          IOUtils.cleanup(null, blockInStream);
+          throw e;
+        }
       } catch (IOException e) {
-        IOUtils.cleanup(null, blockInStream);
+        IOUtils.cleanup(null, ref);
         throw e;
       }
-    } catch (IOException e) {
-      IOUtils.cleanup(null, ref);
-      throw e;
     }
   }
@@ -946,7 +959,7 @@
     }

     FsVolumeReference volumeRef = null;
-    synchronized (this) {
+    try(AutoCloseableLock lock = datasetLock.acquire()) {
       volumeRef = volumes.getNextVolume(targetStorageType, block.getNumBytes());
     }
     try {
@@ -965,7 +978,7 @@
       newReplicaInfo.setNumBytes(blockFiles[1].length());
       // Finalize the copied files
       newReplicaInfo = finalizeReplica(block.getBlockPoolId(), newReplicaInfo);
-      synchronized (this) {
+      try(AutoCloseableLock lock = datasetLock.acquire()) {
         // Increment numBlocks here as this block moved without knowing to BPS
         FsVolumeImpl volume = (FsVolumeImpl) newReplicaInfo.getVolume();
         volume.getBlockPoolSlice(block.getBlockPoolId()).incrNumBlocks();
@@ -1097,41 +1110,43 @@

   @Override // FsDatasetSpi
-  public synchronized ReplicaHandler append(ExtendedBlock b,
+  public ReplicaHandler append(ExtendedBlock b,
       long newGS, long expectedBlockLen) throws IOException {
-    // If the block was successfully finalized because all packets
-    // were successfully processed at the Datanode but the ack for
-    // some of the packets were not received by the client. The client
-    // re-opens the connection and retries sending those packets.
-    // The other reason is that an "append" is occurring to this block.
-
-    // check the validity of the parameter
-    if (newGS < b.getGenerationStamp()) {
-      throw new IOException("The new generation stamp " + newGS +
-          " should be greater than the replica " + b + "'s generation stamp");
-    }
-    ReplicaInfo replicaInfo = getReplicaInfo(b);
-    LOG.info("Appending to " + replicaInfo);
-    if (replicaInfo.getState() != ReplicaState.FINALIZED) {
-      throw new ReplicaNotFoundException(
-          ReplicaNotFoundException.UNFINALIZED_REPLICA + b);
-    }
-    if (replicaInfo.getNumBytes() != expectedBlockLen) {
-      throw new IOException("Corrupted replica " + replicaInfo +
-          " with a length of " + replicaInfo.getNumBytes() +
-          " expected length is " + expectedBlockLen);
-    }
+    try(AutoCloseableLock lock = datasetLock.acquire()) {
+      // If the block was successfully finalized because all packets
+      // were successfully processed at the Datanode but the ack for
+      // some of the packets were not received by the client. The client
+      // re-opens the connection and retries sending those packets.
+      // The other reason is that an "append" is occurring to this block.

-    FsVolumeReference ref = replicaInfo.getVolume().obtainReference();
-    ReplicaBeingWritten replica = null;
-    try {
-      replica = append(b.getBlockPoolId(), (FinalizedReplica)replicaInfo, newGS,
-          b.getNumBytes());
-    } catch (IOException e) {
-      IOUtils.cleanup(null, ref);
-      throw e;
+      // check the validity of the parameter
+      if (newGS < b.getGenerationStamp()) {
+        throw new IOException("The new generation stamp " + newGS +
+            " should be greater than the replica " + b + "'s generation stamp");
+      }
+      ReplicaInfo replicaInfo = getReplicaInfo(b);
+      LOG.info("Appending to " + replicaInfo);
+      if (replicaInfo.getState() != ReplicaState.FINALIZED) {
+        throw new ReplicaNotFoundException(
+            ReplicaNotFoundException.UNFINALIZED_REPLICA + b);
+      }
+      if (replicaInfo.getNumBytes() != expectedBlockLen) {
+        throw new IOException("Corrupted replica " + replicaInfo +
+            " with a length of " + replicaInfo.getNumBytes() +
+            " expected length is " + expectedBlockLen);
+      }
+
+      FsVolumeReference ref = replicaInfo.getVolume().obtainReference();
+      ReplicaBeingWritten replica = null;
+      try {
+        replica = append(b.getBlockPoolId(), (FinalizedReplica) replicaInfo,
+            newGS, b.getNumBytes());
+      } catch (IOException e) {
+        IOUtils.cleanup(null, ref);
+        throw e;
+      }
+      return new ReplicaHandler(replica, ref);
     }
-    return new ReplicaHandler(replica, ref);
   }
@@ -1146,66 +1161,68 @@
   /** Append to a finalized replica
    * @param bpid block pool Id
    * @param replicaInfo a finalized replica
    * @param newGS new generation stamp
    * @param estimateBlockLen estimate generation stamp
    * @return a RBW replica
    * @throws IOException if moving the replica from finalized directory
    *         to rbw directory fails
    */
-  private synchronized ReplicaBeingWritten append(String bpid,
+  private ReplicaBeingWritten append(String bpid,
       FinalizedReplica replicaInfo, long newGS, long estimateBlockLen)
       throws IOException {
-    // If the block is cached, start uncaching it.
-    cacheManager.uncacheBlock(bpid, replicaInfo.getBlockId());
+    try(AutoCloseableLock lock = datasetLock.acquire()) {
+      // If the block is cached, start uncaching it.
+      cacheManager.uncacheBlock(bpid, replicaInfo.getBlockId());

-    // If there are any hardlinks to the block, break them. This ensures we are
-    // not appending to a file that is part of a previous/ directory.
-    replicaInfo.breakHardLinksIfNeeded();
-
-    // construct a RBW replica with the new GS
-    File blkfile = replicaInfo.getBlockFile();
-    FsVolumeImpl v = (FsVolumeImpl)replicaInfo.getVolume();
-    long bytesReserved = estimateBlockLen - replicaInfo.getNumBytes();
-    if (v.getAvailable() < bytesReserved) {
-      throw new DiskOutOfSpaceException("Insufficient space for appending to "
-          + replicaInfo);
-    }
-    File newBlkFile = new File(v.getRbwDir(bpid), replicaInfo.getBlockName());
-    File oldmeta = replicaInfo.getMetaFile();
-    ReplicaBeingWritten newReplicaInfo = new ReplicaBeingWritten(
-        replicaInfo.getBlockId(), replicaInfo.getNumBytes(), newGS,
-        v, newBlkFile.getParentFile(), Thread.currentThread(), bytesReserved);
-    File newmeta = newReplicaInfo.getMetaFile();
+      // If there are any hardlinks to the block, break them. This ensures we
+      // are not appending to a file that is part of a previous/ directory.
+      replicaInfo.breakHardLinksIfNeeded();

-    // rename meta file to rbw directory
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Renaming " + oldmeta + " to " + newmeta);
-    }
-    try {
-      NativeIO.renameTo(oldmeta, newmeta);
-    } catch (IOException e) {
-      throw new IOException("Block " + replicaInfo + " reopen failed. " +
" + - " Unable to move meta file " + oldmeta + - " to rbw dir " + newmeta, e); - } - - // rename block file to rbw directory - if (LOG.isDebugEnabled()) { - LOG.debug("Renaming " + blkfile + " to " + newBlkFile - + ", file length=" + blkfile.length()); - } - try { - NativeIO.renameTo(blkfile, newBlkFile); - } catch (IOException e) { - try { - NativeIO.renameTo(newmeta, oldmeta); - } catch (IOException ex) { - LOG.warn("Cannot move meta file " + newmeta + - "back to the finalized directory " + oldmeta, ex); + // construct a RBW replica with the new GS + File blkfile = replicaInfo.getBlockFile(); + FsVolumeImpl v = (FsVolumeImpl) replicaInfo.getVolume(); + long bytesReserved = estimateBlockLen - replicaInfo.getNumBytes(); + if (v.getAvailable() < bytesReserved) { + throw new DiskOutOfSpaceException("Insufficient space for appending to " + + replicaInfo); } - throw new IOException("Block " + replicaInfo + " reopen failed. " + - " Unable to move block file " + blkfile + - " to rbw dir " + newBlkFile, e); + File newBlkFile = new File(v.getRbwDir(bpid), replicaInfo.getBlockName()); + File oldmeta = replicaInfo.getMetaFile(); + ReplicaBeingWritten newReplicaInfo = new ReplicaBeingWritten( + replicaInfo.getBlockId(), replicaInfo.getNumBytes(), newGS, + v, newBlkFile.getParentFile(), Thread.currentThread(), bytesReserved); + File newmeta = newReplicaInfo.getMetaFile(); + + // rename meta file to rbw directory + if (LOG.isDebugEnabled()) { + LOG.debug("Renaming " + oldmeta + " to " + newmeta); + } + try { + NativeIO.renameTo(oldmeta, newmeta); + } catch (IOException e) { + throw new IOException("Block " + replicaInfo + " reopen failed. " + + " Unable to move meta file " + oldmeta + + " to rbw dir " + newmeta, e); + } + + // rename block file to rbw directory + if (LOG.isDebugEnabled()) { + LOG.debug("Renaming " + blkfile + " to " + newBlkFile + + ", file length=" + blkfile.length()); + } + try { + NativeIO.renameTo(blkfile, newBlkFile); + } catch (IOException e) { + try { + NativeIO.renameTo(newmeta, oldmeta); + } catch (IOException ex) { + LOG.warn("Cannot move meta file " + newmeta + + "back to the finalized directory " + oldmeta, ex); + } + throw new IOException("Block " + replicaInfo + " reopen failed. 
" + + " Unable to move block file " + blkfile + + " to rbw dir " + newBlkFile, e); + } + + // Replace finalized replica by a RBW replica in replicas map + volumeMap.add(bpid, newReplicaInfo); + v.reserveSpaceForReplica(bytesReserved); + return newReplicaInfo; } - - // Replace finalized replica by a RBW replica in replicas map - volumeMap.add(bpid, newReplicaInfo); - v.reserveSpaceForReplica(bytesReserved); - return newReplicaInfo; } private static class MustStopExistingWriter extends Exception { @@ -1275,7 +1292,7 @@ class FsDatasetImpl implements FsDatasetSpi { while (true) { try { - synchronized (this) { + try(AutoCloseableLock lock = datasetLock.acquire()) { ReplicaInfo replicaInfo = recoverCheck(b, newGS, expectedBlockLen); FsVolumeReference ref = replicaInfo.getVolume().obtainReference(); @@ -1307,7 +1324,7 @@ class FsDatasetImpl implements FsDatasetSpi { LOG.info("Recover failed close " + b); while (true) { try { - synchronized (this) { + try(AutoCloseableLock lock = datasetLock.acquire()) { // check replica's state ReplicaInfo replicaInfo = recoverCheck(b, newGS, expectedBlockLen); // bump the replica's GS @@ -1354,62 +1371,65 @@ class FsDatasetImpl implements FsDatasetSpi { } @Override // FsDatasetSpi - public synchronized ReplicaHandler createRbw( + public ReplicaHandler createRbw( StorageType storageType, ExtendedBlock b, boolean allowLazyPersist) throws IOException { - ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(), - b.getBlockId()); - if (replicaInfo != null) { - throw new ReplicaAlreadyExistsException("Block " + b + - " already exists in state " + replicaInfo.getState() + - " and thus cannot be created."); - } - // create a new block - FsVolumeReference ref = null; + try(AutoCloseableLock lock = datasetLock.acquire()) { + ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(), + b.getBlockId()); + if (replicaInfo != null) { + throw new ReplicaAlreadyExistsException("Block " + b + + " already exists in state " + replicaInfo.getState() + + " and thus cannot be created."); + } + // create a new block + FsVolumeReference ref = null; - // Use ramdisk only if block size is a multiple of OS page size. - // This simplifies reservation for partially used replicas - // significantly. - if (allowLazyPersist && - lazyWriter != null && - b.getNumBytes() % cacheManager.getOsPageSize() == 0 && - reserveLockedMemory(b.getNumBytes())) { - try { - // First try to place the block on a transient volume. - ref = volumes.getNextTransientVolume(b.getNumBytes()); - datanode.getMetrics().incrRamDiskBlocksWrite(); - } catch(DiskOutOfSpaceException de) { - // Ignore the exception since we just fall back to persistent storage. - } finally { - if (ref == null) { - cacheManager.release(b.getNumBytes()); + // Use ramdisk only if block size is a multiple of OS page size. + // This simplifies reservation for partially used replicas + // significantly. + if (allowLazyPersist && + lazyWriter != null && + b.getNumBytes() % cacheManager.getOsPageSize() == 0 && + reserveLockedMemory(b.getNumBytes())) { + try { + // First try to place the block on a transient volume. + ref = volumes.getNextTransientVolume(b.getNumBytes()); + datanode.getMetrics().incrRamDiskBlocksWrite(); + } catch (DiskOutOfSpaceException de) { + // Ignore the exception since we just fall back to persistent storage. 
+        } finally {
+          if (ref == null) {
+            cacheManager.release(b.getNumBytes());
+          }
         }
       }
+
+      if (ref == null) {
+        ref = volumes.getNextVolume(storageType, b.getNumBytes());
+      }
+
+      FsVolumeImpl v = (FsVolumeImpl) ref.getVolume();
+      // create an rbw file to hold block in the designated volume
+
+      if (allowLazyPersist && !v.isTransientStorage()) {
+        datanode.getMetrics().incrRamDiskBlocksWriteFallback();
+      }
+
+      File f;
+      try {
+        f = v.createRbwFile(b.getBlockPoolId(), b.getLocalBlock());
+      } catch (IOException e) {
+        IOUtils.cleanup(null, ref);
+        throw e;
+      }
+
+      ReplicaBeingWritten newReplicaInfo =
+          new ReplicaBeingWritten(b.getBlockId(),
+          b.getGenerationStamp(), v, f.getParentFile(), b.getNumBytes());
+      volumeMap.add(b.getBlockPoolId(), newReplicaInfo);
+      return new ReplicaHandler(newReplicaInfo, ref);
     }
-
-    if (ref == null) {
-      ref = volumes.getNextVolume(storageType, b.getNumBytes());
-    }
-
-    FsVolumeImpl v = (FsVolumeImpl) ref.getVolume();
-    // create an rbw file to hold block in the designated volume
-
-    if (allowLazyPersist && !v.isTransientStorage()) {
-      datanode.getMetrics().incrRamDiskBlocksWriteFallback();
-    }
-
-    File f;
-    try {
-      f = v.createRbwFile(b.getBlockPoolId(), b.getLocalBlock());
-    } catch (IOException e) {
-      IOUtils.cleanup(null, ref);
-      throw e;
-    }
-
-    ReplicaBeingWritten newReplicaInfo = new ReplicaBeingWritten(b.getBlockId(),
-        b.getGenerationStamp(), v, f.getParentFile(), b.getNumBytes());
-    volumeMap.add(b.getBlockPoolId(), newReplicaInfo);
-    return new ReplicaHandler(newReplicaInfo, ref);
   }

   @Override // FsDatasetSpi
@@ -1420,7 +1440,7 @@

     while (true) {
       try {
-        synchronized (this) {
+        try(AutoCloseableLock lock = datasetLock.acquire()) {
           ReplicaInfo replicaInfo = getReplicaInfo(b.getBlockPoolId(),
               b.getBlockId());
           // check the replica's state
@@ -1441,61 +1461,64 @@
     }
   }

-  private synchronized ReplicaHandler recoverRbwImpl(ReplicaBeingWritten rbw,
+  private ReplicaHandler recoverRbwImpl(ReplicaBeingWritten rbw,
       ExtendedBlock b, long newGS, long minBytesRcvd, long maxBytesRcvd)
       throws IOException {
-    // check generation stamp
-    long replicaGenerationStamp = rbw.getGenerationStamp();
-    if (replicaGenerationStamp < b.getGenerationStamp() ||
-        replicaGenerationStamp > newGS) {
-      throw new ReplicaNotFoundException(
-          ReplicaNotFoundException.UNEXPECTED_GS_REPLICA + b +
-          ". Expected GS range is [" + b.getGenerationStamp() + ", " +
-          newGS + "].");
-    }
-
-    // check replica length
-    long bytesAcked = rbw.getBytesAcked();
-    long numBytes = rbw.getNumBytes();
-    if (bytesAcked < minBytesRcvd || numBytes > maxBytesRcvd){
-      throw new ReplicaNotFoundException("Unmatched length replica " +
-          rbw + ": BytesAcked = " + bytesAcked +
-          " BytesRcvd = " + numBytes + " are not in the range of [" +
-          minBytesRcvd + ", " + maxBytesRcvd + "].");
-    }
-
-    FsVolumeReference ref = rbw.getVolume().obtainReference();
-    try {
-      // Truncate the potentially corrupt portion.
-      // If the source was client and the last node in the pipeline was lost,
-      // any corrupt data written after the acked length can go unnoticed.
-    if (numBytes > bytesAcked) {
-      final File replicafile = rbw.getBlockFile();
-      truncateBlock(replicafile, rbw.getMetaFile(), numBytes, bytesAcked);
-      rbw.setNumBytes(bytesAcked);
-      rbw.setLastChecksumAndDataLen(bytesAcked, null);
+    try(AutoCloseableLock lock = datasetLock.acquire()) {
+      // check generation stamp
+      long replicaGenerationStamp = rbw.getGenerationStamp();
+      if (replicaGenerationStamp < b.getGenerationStamp() ||
+          replicaGenerationStamp > newGS) {
+        throw new ReplicaNotFoundException(
+            ReplicaNotFoundException.UNEXPECTED_GS_REPLICA + b +
+            ". Expected GS range is [" + b.getGenerationStamp() + ", " +
+            newGS + "].");
       }

-      // bump the replica's generation stamp to newGS
-      bumpReplicaGS(rbw, newGS);
-    } catch (IOException e) {
-      IOUtils.cleanup(null, ref);
-      throw e;
+      // check replica length
+      long bytesAcked = rbw.getBytesAcked();
+      long numBytes = rbw.getNumBytes();
+      if (bytesAcked < minBytesRcvd || numBytes > maxBytesRcvd) {
+        throw new ReplicaNotFoundException("Unmatched length replica " +
+            rbw + ": BytesAcked = " + bytesAcked +
+            " BytesRcvd = " + numBytes + " are not in the range of [" +
+            minBytesRcvd + ", " + maxBytesRcvd + "].");
+      }
+
+      FsVolumeReference ref = rbw.getVolume().obtainReference();
+      try {
+        // Truncate the potentially corrupt portion.
+        // If the source was client and the last node in the pipeline was lost,
+        // any corrupt data written after the acked length can go unnoticed.
+        if (numBytes > bytesAcked) {
+          final File replicafile = rbw.getBlockFile();
+          truncateBlock(replicafile, rbw.getMetaFile(), numBytes, bytesAcked);
+          rbw.setNumBytes(bytesAcked);
+          rbw.setLastChecksumAndDataLen(bytesAcked, null);
+        }
+
+        // bump the replica's generation stamp to newGS
+        bumpReplicaGS(rbw, newGS);
+      } catch (IOException e) {
+        IOUtils.cleanup(null, ref);
+        throw e;
+      }
+      return new ReplicaHandler(rbw, ref);
     }
-    return new ReplicaHandler(rbw, ref);
   }

   @Override // FsDatasetSpi
-  public synchronized ReplicaInPipeline convertTemporaryToRbw(
+  public ReplicaInPipeline convertTemporaryToRbw(
       final ExtendedBlock b) throws IOException {
-    final long blockId = b.getBlockId();
-    final long expectedGs = b.getGenerationStamp();
-    final long visible = b.getNumBytes();
-    LOG.info("Convert " + b + " from Temporary to RBW, visible length="
-        + visible);
+    try(AutoCloseableLock lock = datasetLock.acquire()) {
+      final long blockId = b.getBlockId();
+      final long expectedGs = b.getGenerationStamp();
+      final long visible = b.getNumBytes();
+      LOG.info("Convert " + b + " from Temporary to RBW, visible length="
+          + visible);
+
+      final ReplicaInPipeline temp;

-    final ReplicaInPipeline temp;
-    {
       // get replica
       final ReplicaInfo r = volumeMap.get(b.getBlockPoolId(), blockId);
       if (r == null) {
@@ -1507,43 +1530,44 @@
         throw new ReplicaAlreadyExistsException(
             "r.getState() != ReplicaState.TEMPORARY, r=" + r);
       }
-      temp = (ReplicaInPipeline)r;
-    }
-    // check generation stamp
-    if (temp.getGenerationStamp() != expectedGs) {
-      throw new ReplicaAlreadyExistsException(
-          "temp.getGenerationStamp() != expectedGs = " + expectedGs
-          + ", temp=" + temp);
-    }
+      temp = (ReplicaInPipeline) r;

-    // TODO: check writer?
-    // set writer to the current thread
-    // temp.setWriter(Thread.currentThread());
+      // check generation stamp
+      if (temp.getGenerationStamp() != expectedGs) {
+        throw new ReplicaAlreadyExistsException(
+            "temp.getGenerationStamp() != expectedGs = " + expectedGs
+            + ", temp=" + temp);
+      }

-    // check length
-    final long numBytes = temp.getNumBytes();
-    if (numBytes < visible) {
-      throw new IOException(numBytes + " = numBytes < visible = "
-          + visible + ", temp=" + temp);
+      // TODO: check writer?
+      // set writer to the current thread
+      // temp.setWriter(Thread.currentThread());
+
+      // check length
+      final long numBytes = temp.getNumBytes();
+      if (numBytes < visible) {
+        throw new IOException(numBytes + " = numBytes < visible = "
+            + visible + ", temp=" + temp);
+      }
+      // check volume
+      final FsVolumeImpl v = (FsVolumeImpl) temp.getVolume();
+      if (v == null) {
+        throw new IOException("r.getVolume() = null, temp=" + temp);
+      }
+
+      // move block files to the rbw directory
+      BlockPoolSlice bpslice = v.getBlockPoolSlice(b.getBlockPoolId());
+      final File dest = moveBlockFiles(b.getLocalBlock(), temp.getBlockFile(),
+          bpslice.getRbwDir());
+      // create RBW
+      final ReplicaBeingWritten rbw = new ReplicaBeingWritten(
+          blockId, numBytes, expectedGs,
+          v, dest.getParentFile(), Thread.currentThread(), 0);
+      rbw.setBytesAcked(visible);
+      // overwrite the RBW in the volume map
+      volumeMap.add(b.getBlockPoolId(), rbw);
+      return rbw;
     }
-    // check volume
-    final FsVolumeImpl v = (FsVolumeImpl)temp.getVolume();
-    if (v == null) {
-      throw new IOException("r.getVolume() = null, temp=" + temp);
-    }
-
-    // move block files to the rbw directory
-    BlockPoolSlice bpslice = v.getBlockPoolSlice(b.getBlockPoolId());
-    final File dest = moveBlockFiles(b.getLocalBlock(), temp.getBlockFile(),
-        bpslice.getRbwDir());
-    // create RBW
-    final ReplicaBeingWritten rbw = new ReplicaBeingWritten(
-        blockId, numBytes, expectedGs,
-        v, dest.getParentFile(), Thread.currentThread(), 0);
-    rbw.setBytesAcked(visible);
-    // overwrite the RBW in the volume map
-    volumeMap.add(b.getBlockPoolId(), rbw);
-    return rbw;
   }

   @Override // FsDatasetSpi
@@ -1553,7 +1577,7 @@
     long writerStopTimeoutMs = datanode.getDnConf().getXceiverStopTimeout();
     ReplicaInfo lastFoundReplicaInfo = null;
     do {
-      synchronized (this) {
+      try(AutoCloseableLock lock = datasetLock.acquire()) {
         ReplicaInfo currentReplicaInfo =
             volumeMap.get(b.getBlockPoolId(), b.getBlockId());
         if (currentReplicaInfo == lastFoundReplicaInfo) {
@@ -1632,72 +1656,82 @@
    * Complete the block write!
    */
   @Override // FsDatasetSpi
-  public synchronized void finalizeBlock(ExtendedBlock b) throws IOException {
-    if (Thread.interrupted()) {
-      // Don't allow data modifications from interrupted threads
-      throw new IOException("Cannot finalize block from Interrupted Thread");
+  public void finalizeBlock(ExtendedBlock b) throws IOException {
+    try(AutoCloseableLock lock = datasetLock.acquire()) {
+      if (Thread.interrupted()) {
+        // Don't allow data modifications from interrupted threads
+        throw new IOException("Cannot finalize block from Interrupted Thread");
+      }
+      ReplicaInfo replicaInfo = getReplicaInfo(b);
+      if (replicaInfo.getState() == ReplicaState.FINALIZED) {
+        // this is legal, when recovery happens on a file that has
+        // been opened for append but never modified
+        return;
+      }
+      finalizeReplica(b.getBlockPoolId(), replicaInfo);
     }
-    ReplicaInfo replicaInfo = getReplicaInfo(b);
-    if (replicaInfo.getState() == ReplicaState.FINALIZED) {
-      // this is legal, when recovery happens on a file that has
-      // been opened for append but never modified
-      return;
-    }
-    finalizeReplica(b.getBlockPoolId(), replicaInfo);
   }

-  private synchronized FinalizedReplica finalizeReplica(String bpid,
+  private FinalizedReplica finalizeReplica(String bpid,
       ReplicaInfo replicaInfo) throws IOException {
-    FinalizedReplica newReplicaInfo = null;
-    if (replicaInfo.getState() == ReplicaState.RUR &&
-        ((ReplicaUnderRecovery)replicaInfo).getOriginalReplica().getState() ==
-        ReplicaState.FINALIZED) {
-      newReplicaInfo = (FinalizedReplica)
-          ((ReplicaUnderRecovery)replicaInfo).getOriginalReplica();
-    } else {
-      FsVolumeImpl v = (FsVolumeImpl)replicaInfo.getVolume();
-      File f = replicaInfo.getBlockFile();
-      if (v == null) {
-        throw new IOException("No volume for temporary file " + f +
-            " for block " + replicaInfo);
-      }
+    try(AutoCloseableLock lock = datasetLock.acquire()) {
+      FinalizedReplica newReplicaInfo = null;
+      if (replicaInfo.getState() == ReplicaState.RUR &&
+          ((ReplicaUnderRecovery) replicaInfo).getOriginalReplica().getState()
+          == ReplicaState.FINALIZED) {
+        newReplicaInfo = (FinalizedReplica)
+            ((ReplicaUnderRecovery) replicaInfo).getOriginalReplica();
+      } else {
+        FsVolumeImpl v = (FsVolumeImpl) replicaInfo.getVolume();
+        File f = replicaInfo.getBlockFile();
+        if (v == null) {
+          throw new IOException("No volume for temporary file " + f +
+              " for block " + replicaInfo);
+        }

-      File dest = v.addFinalizedBlock(
-          bpid, replicaInfo, f, replicaInfo.getBytesReserved());
-      newReplicaInfo = new FinalizedReplica(replicaInfo, v, dest.getParentFile());
+        File dest = v.addFinalizedBlock(
+            bpid, replicaInfo, f, replicaInfo.getBytesReserved());
+        newReplicaInfo =
+            new FinalizedReplica(replicaInfo, v, dest.getParentFile());

-      if (v.isTransientStorage()) {
-        releaseLockedMemory(
-            replicaInfo.getOriginalBytesReserved() - replicaInfo.getNumBytes(),
-            false);
-        ramDiskReplicaTracker.addReplica(
-            bpid, replicaInfo.getBlockId(), v, replicaInfo.getNumBytes());
-        datanode.getMetrics().addRamDiskBytesWrite(replicaInfo.getNumBytes());
+        if (v.isTransientStorage()) {
+          releaseLockedMemory(
+              replicaInfo.getOriginalBytesReserved()
+                  - replicaInfo.getNumBytes(),
+              false);
+          ramDiskReplicaTracker.addReplica(
+              bpid, replicaInfo.getBlockId(), v, replicaInfo.getNumBytes());
+          datanode.getMetrics().addRamDiskBytesWrite(replicaInfo.getNumBytes());
+        }
       }
+      volumeMap.add(bpid, newReplicaInfo);
+
+      return newReplicaInfo;
     }
-    volumeMap.add(bpid, newReplicaInfo);
-
-    return newReplicaInfo;
   }

   /**
    * Remove the temporary block file (if any)
    */
   @Override // FsDatasetSpi
-  public synchronized void unfinalizeBlock(ExtendedBlock b) throws IOException {
-    ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(),
-        b.getLocalBlock());
-    if (replicaInfo != null && replicaInfo.getState() == ReplicaState.TEMPORARY) {
-      // remove from volumeMap
-      volumeMap.remove(b.getBlockPoolId(), b.getLocalBlock());
+  public void unfinalizeBlock(ExtendedBlock b) throws IOException {
+    try(AutoCloseableLock lock = datasetLock.acquire()) {
+      ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(),
+          b.getLocalBlock());
+      if (replicaInfo != null
+          && replicaInfo.getState() == ReplicaState.TEMPORARY) {
+        // remove from volumeMap
+        volumeMap.remove(b.getBlockPoolId(), b.getLocalBlock());

-      // delete the on-disk temp file
-      if (delBlockFromDisk(replicaInfo.getBlockFile(),
-          replicaInfo.getMetaFile(), b.getLocalBlock())) {
-        LOG.warn("Block " + b + " unfinalized and removed. " );
-      }
-      if (replicaInfo.getVolume().isTransientStorage()) {
-        ramDiskReplicaTracker.discardReplica(b.getBlockPoolId(), b.getBlockId(), true);
+        // delete the on-disk temp file
+        if (delBlockFromDisk(replicaInfo.getBlockFile(),
+            replicaInfo.getMetaFile(), b.getLocalBlock())) {
+          LOG.warn("Block " + b + " unfinalized and removed. ");
+        }
+        if (replicaInfo.getVolume().isTransientStorage()) {
+          ramDiskReplicaTracker.discardReplica(b.getBlockPoolId(),
+              b.getBlockId(), true);
+        }
       }
     }
   }
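Reentrancy matters in the hunk above: finalizeBlock() takes the dataset lock and then calls finalizeReplica(), which takes it again on the same thread. With nested synchronized(this) blocks this is trivially safe; with the new lock it is safe only if the lock is reentrant, as the standalone sketch below illustrates. It assumes a ReentrantLock-backed AutoCloseableLock, which the patch itself does not spell out.

import org.apache.hadoop.util.AutoCloseableLock;

// Nested acquire on one thread, mirroring finalizeBlock() calling
// finalizeReplica() while already holding the dataset lock. This only
// terminates if AutoCloseableLock is reentrant -- an assumption carried
// over from the earlier sketch, not something this patch guarantees.
public class NestedAcquireSketch {
  private static final AutoCloseableLock LOCK = new AutoCloseableLock();

  static void outer() {      // stands in for finalizeBlock()
    try (AutoCloseableLock l = LOCK.acquire()) {
      inner();               // second acquire on the same thread
    }
  }

  static void inner() {      // stands in for finalizeReplica()
    try (AutoCloseableLock l = LOCK.acquire()) {
      System.out.println("nested critical section");
    }
  }

  public static void main(String[] args) {
    outer();
  }
}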
@@ -1740,7 +1774,7 @@
       builders.put(v.getStorageID(), BlockListAsLongs.builder(maxDataLength));
     }

-    synchronized(this) {
+    try(AutoCloseableLock lock = datasetLock.acquire()) {
       for (ReplicaInfo b : volumeMap.replicas(bpid)) {
         switch(b.getState()) {
           case FINALIZED:
@@ -1778,31 +1812,36 @@
    * Get the list of finalized blocks from in-memory blockmap for a block pool.
    */
   @Override
-  public synchronized List<FinalizedReplica> getFinalizedBlocks(String bpid) {
-    ArrayList<FinalizedReplica> finalized =
-        new ArrayList<FinalizedReplica>(volumeMap.size(bpid));
-    for (ReplicaInfo b : volumeMap.replicas(bpid)) {
-      if(b.getState() == ReplicaState.FINALIZED) {
-        finalized.add(new FinalizedReplica((FinalizedReplica)b));
+  public List<FinalizedReplica> getFinalizedBlocks(String bpid) {
+    try(AutoCloseableLock lock = datasetLock.acquire()) {
+      ArrayList<FinalizedReplica> finalized =
+          new ArrayList<FinalizedReplica>(volumeMap.size(bpid));
+      for (ReplicaInfo b : volumeMap.replicas(bpid)) {
+        if (b.getState() == ReplicaState.FINALIZED) {
+          finalized.add(new FinalizedReplica((FinalizedReplica) b));
+        }
       }
+      return finalized;
     }
-    return finalized;
   }

   /**
    * Get the list of finalized blocks from in-memory blockmap for a block pool.
    */
   @Override
-  public synchronized List<FinalizedReplica> getFinalizedBlocksOnPersistentStorage(String bpid) {
-    ArrayList<FinalizedReplica> finalized =
-        new ArrayList<FinalizedReplica>(volumeMap.size(bpid));
-    for (ReplicaInfo b : volumeMap.replicas(bpid)) {
-      if(!b.getVolume().isTransientStorage() &&
-          b.getState() == ReplicaState.FINALIZED) {
-        finalized.add(new FinalizedReplica((FinalizedReplica)b));
+  public List<FinalizedReplica>
+      getFinalizedBlocksOnPersistentStorage(String bpid) {
+    try(AutoCloseableLock lock = datasetLock.acquire()) {
+      ArrayList<FinalizedReplica> finalized =
+          new ArrayList<FinalizedReplica>(volumeMap.size(bpid));
+      for (ReplicaInfo b : volumeMap.replicas(bpid)) {
+        if (!b.getVolume().isTransientStorage() &&
+            b.getState() == ReplicaState.FINALIZED) {
+          finalized.add(new FinalizedReplica((FinalizedReplica) b));
+        }
      }
+      return finalized;
    }
-    return finalized;
   }

   /**
@@ -1878,7 +1917,7 @@
   File validateBlockFile(String bpid, long blockId) {
     //Should we check for metadata file too?
     final File f;
-    synchronized(this) {
+    try(AutoCloseableLock lock = datasetLock.acquire()) {
       f = getFile(bpid, blockId, false);
     }

@@ -1927,7 +1966,7 @@
     for (int i = 0; i < invalidBlks.length; i++) {
       final File f;
       final FsVolumeImpl v;
-      synchronized (this) {
+      try(AutoCloseableLock lock = datasetLock.acquire()) {
         final ReplicaInfo info = volumeMap.get(bpid, invalidBlks[i]);
         if (info == null) {
           // It is okay if the block is not found -- it may be deleted earlier.
@@ -2038,7 +2077,7 @@
     long length, genstamp;
     Executor volumeExecutor;

-    synchronized (this) {
+    try(AutoCloseableLock lock = datasetLock.acquire()) {
       ReplicaInfo info = volumeMap.get(bpid, blockId);
       boolean success = false;
       try {
@@ -2105,9 +2144,11 @@
   }

   @Override // FsDatasetSpi
-  public synchronized boolean contains(final ExtendedBlock block) {
-    final long blockId = block.getLocalBlock().getBlockId();
-    return getFile(block.getBlockPoolId(), blockId, false) != null;
+  public boolean contains(final ExtendedBlock block) {
+    try(AutoCloseableLock lock = datasetLock.acquire()) {
+      final long blockId = block.getLocalBlock().getBlockId();
+      return getFile(block.getBlockPoolId(), blockId, false) != null;
+    }
   }

   /**
@@ -2233,7 +2274,7 @@
       File diskMetaFile, FsVolumeSpi vol) throws IOException {
     Block corruptBlock = null;
     ReplicaInfo memBlockInfo;
-    synchronized (this) {
+    try(AutoCloseableLock lock = datasetLock.acquire()) {
       memBlockInfo = volumeMap.get(bpid, blockId);
       if (memBlockInfo != null &&
           memBlockInfo.getState() != ReplicaState.FINALIZED) {
         // Block is not finalized - ignore the difference
@@ -2389,9 +2430,11 @@
   }

   @Override
-  public synchronized String getReplicaString(String bpid, long blockId) {
-    final Replica r = volumeMap.get(bpid, blockId);
-    return r == null? "null": r.toString();
+  public String getReplicaString(String bpid, long blockId) {
+    try(AutoCloseableLock lock = datasetLock.acquire()) {
+      final Replica r = volumeMap.get(bpid, blockId);
+      return r == null ? "null" : r.toString();
+    }
   }

   @Override // FsDatasetSpi
"null" : r.toString(); + } } @Override // FsDatasetSpi @@ -2484,67 +2527,69 @@ class FsDatasetImpl implements FsDatasetSpi { } @Override // FsDatasetSpi - public synchronized Replica updateReplicaUnderRecovery( + public Replica updateReplicaUnderRecovery( final ExtendedBlock oldBlock, final long recoveryId, final long newBlockId, final long newlength) throws IOException { - //get replica - final String bpid = oldBlock.getBlockPoolId(); - final ReplicaInfo replica = volumeMap.get(bpid, oldBlock.getBlockId()); - LOG.info("updateReplica: " + oldBlock - + ", recoveryId=" + recoveryId - + ", length=" + newlength - + ", replica=" + replica); - - //check replica - if (replica == null) { - throw new ReplicaNotFoundException(oldBlock); - } - - //check replica state - if (replica.getState() != ReplicaState.RUR) { - throw new IOException("replica.getState() != " + ReplicaState.RUR + try(AutoCloseableLock lock = datasetLock.acquire()) { + //get replica + final String bpid = oldBlock.getBlockPoolId(); + final ReplicaInfo replica = volumeMap.get(bpid, oldBlock.getBlockId()); + LOG.info("updateReplica: " + oldBlock + + ", recoveryId=" + recoveryId + + ", length=" + newlength + ", replica=" + replica); + + //check replica + if (replica == null) { + throw new ReplicaNotFoundException(oldBlock); + } + + //check replica state + if (replica.getState() != ReplicaState.RUR) { + throw new IOException("replica.getState() != " + ReplicaState.RUR + + ", replica=" + replica); + } + + //check replica's byte on disk + if (replica.getBytesOnDisk() != oldBlock.getNumBytes()) { + throw new IOException("THIS IS NOT SUPPOSED TO HAPPEN:" + + " replica.getBytesOnDisk() != block.getNumBytes(), block=" + + oldBlock + ", replica=" + replica); + } + + //check replica files before update + checkReplicaFiles(replica); + + //update replica + final FinalizedReplica finalized = updateReplicaUnderRecovery(oldBlock + .getBlockPoolId(), (ReplicaUnderRecovery) replica, recoveryId, + newBlockId, newlength); + + boolean copyTruncate = newBlockId != oldBlock.getBlockId(); + if (!copyTruncate) { + assert finalized.getBlockId() == oldBlock.getBlockId() + && finalized.getGenerationStamp() == recoveryId + && finalized.getNumBytes() == newlength + : "Replica information mismatched: oldBlock=" + oldBlock + + ", recoveryId=" + recoveryId + ", newlength=" + newlength + + ", newBlockId=" + newBlockId + ", finalized=" + finalized; + } else { + assert finalized.getBlockId() == oldBlock.getBlockId() + && finalized.getGenerationStamp() == oldBlock.getGenerationStamp() + && finalized.getNumBytes() == oldBlock.getNumBytes() + : "Finalized and old information mismatched: oldBlock=" + oldBlock + + ", genStamp=" + oldBlock.getGenerationStamp() + + ", len=" + oldBlock.getNumBytes() + + ", finalized=" + finalized; + } + + //check replica files after update + checkReplicaFiles(finalized); + + return finalized; } - - //check replica's byte on disk - if (replica.getBytesOnDisk() != oldBlock.getNumBytes()) { - throw new IOException("THIS IS NOT SUPPOSED TO HAPPEN:" - + " replica.getBytesOnDisk() != block.getNumBytes(), block=" - + oldBlock + ", replica=" + replica); - } - - //check replica files before update - checkReplicaFiles(replica); - - //update replica - final FinalizedReplica finalized = updateReplicaUnderRecovery(oldBlock - .getBlockPoolId(), (ReplicaUnderRecovery) replica, recoveryId, - newBlockId, newlength); - - boolean copyTruncate = newBlockId != oldBlock.getBlockId(); - if(!copyTruncate) { - assert finalized.getBlockId() == oldBlock.getBlockId() - 
&& finalized.getGenerationStamp() == recoveryId - && finalized.getNumBytes() == newlength - : "Replica information mismatched: oldBlock=" + oldBlock - + ", recoveryId=" + recoveryId + ", newlength=" + newlength - + ", newBlockId=" + newBlockId + ", finalized=" + finalized; - } else { - assert finalized.getBlockId() == oldBlock.getBlockId() - && finalized.getGenerationStamp() == oldBlock.getGenerationStamp() - && finalized.getNumBytes() == oldBlock.getNumBytes() - : "Finalized and old information mismatched: oldBlock=" + oldBlock - + ", genStamp=" + oldBlock.getGenerationStamp() - + ", len=" + oldBlock.getNumBytes() - + ", finalized=" + finalized; - } - - //check replica files after update - checkReplicaFiles(finalized); - - return finalized; } private FinalizedReplica updateReplicaUnderRecovery( @@ -2622,23 +2667,25 @@ class FsDatasetImpl implements FsDatasetSpi { } @Override // FsDatasetSpi - public synchronized long getReplicaVisibleLength(final ExtendedBlock block) + public long getReplicaVisibleLength(final ExtendedBlock block) throws IOException { - final Replica replica = getReplicaInfo(block.getBlockPoolId(), - block.getBlockId()); - if (replica.getGenerationStamp() < block.getGenerationStamp()) { - throw new IOException( - "replica.getGenerationStamp() < block.getGenerationStamp(), block=" - + block + ", replica=" + replica); + try(AutoCloseableLock lock = datasetLock.acquire()) { + final Replica replica = getReplicaInfo(block.getBlockPoolId(), + block.getBlockId()); + if (replica.getGenerationStamp() < block.getGenerationStamp()) { + throw new IOException( + "replica.getGenerationStamp() < block.getGenerationStamp(), block=" + + block + ", replica=" + replica); + } + return replica.getVisibleLength(); } - return replica.getVisibleLength(); } @Override public void addBlockPool(String bpid, Configuration conf) throws IOException { LOG.info("Adding block pool " + bpid); - synchronized(this) { + try(AutoCloseableLock lock = datasetLock.acquire()) { volumes.addBlockPool(bpid, conf); volumeMap.initBlockPool(bpid); } @@ -2646,11 +2693,14 @@ class FsDatasetImpl implements FsDatasetSpi { } @Override - public synchronized void shutdownBlockPool(String bpid) { - LOG.info("Removing block pool " + bpid); - Map blocksPerVolume = getBlockReports(bpid); - volumeMap.cleanUpBlockPool(bpid); - volumes.removeBlockPool(bpid, blocksPerVolume); + public void shutdownBlockPool(String bpid) { + try(AutoCloseableLock lock = datasetLock.acquire()) { + LOG.info("Removing block pool " + bpid); + Map blocksPerVolume = + getBlockReports(bpid); + volumeMap.cleanUpBlockPool(bpid); + volumes.removeBlockPool(bpid, blocksPerVolume); + } } /** @@ -2713,35 +2763,38 @@ class FsDatasetImpl implements FsDatasetSpi { } @Override //FsDatasetSpi - public synchronized void deleteBlockPool(String bpid, boolean force) + public void deleteBlockPool(String bpid, boolean force) throws IOException { - List curVolumes = volumes.getVolumes(); - if (!force) { + try(AutoCloseableLock lock = datasetLock.acquire()) { + List curVolumes = volumes.getVolumes(); + if (!force) { + for (FsVolumeImpl volume : curVolumes) { + try (FsVolumeReference ref = volume.obtainReference()) { + if (!volume.isBPDirEmpty(bpid)) { + LOG.warn(bpid + + " has some block files, cannot delete unless forced"); + throw new IOException("Cannot delete block pool, " + + "it contains some block files"); + } + } catch (ClosedChannelException e) { + // ignore. 
+ } + } + } for (FsVolumeImpl volume : curVolumes) { try (FsVolumeReference ref = volume.obtainReference()) { - if (!volume.isBPDirEmpty(bpid)) { - LOG.warn(bpid + " has some block files, cannot delete unless forced"); - throw new IOException("Cannot delete block pool, " - + "it contains some block files"); - } + volume.deleteBPDirectories(bpid, force); } catch (ClosedChannelException e) { // ignore. } } } - for (FsVolumeImpl volume : curVolumes) { - try (FsVolumeReference ref = volume.obtainReference()) { - volume.deleteBPDirectories(bpid, force); - } catch (ClosedChannelException e) { - // ignore. - } - } } @Override // FsDatasetSpi public BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock block) throws IOException { - synchronized(this) { + try(AutoCloseableLock lock = datasetLock.acquire()) { final Replica replica = volumeMap.get(block.getBlockPoolId(), block.getBlockId()); if (replica == null) { @@ -2834,7 +2887,7 @@ class FsDatasetImpl implements FsDatasetSpi { @Override public void onCompleteLazyPersist(String bpId, long blockId, long creationTime, File[] savedFiles, FsVolumeImpl targetVolume) { - synchronized (FsDatasetImpl.this) { + try(AutoCloseableLock lock = datasetLock.acquire()) { ramDiskReplicaTracker.recordEndLazyPersist(bpId, blockId, savedFiles); targetVolume.incDfsUsedAndNumBlocks(bpId, savedFiles[0].length() @@ -2968,7 +3021,7 @@ class FsDatasetImpl implements FsDatasetSpi { try { block = ramDiskReplicaTracker.dequeueNextReplicaToPersist(); if (block != null) { - synchronized (FsDatasetImpl.this) { + try(AutoCloseableLock lock = datasetLock.acquire()) { replicaInfo = volumeMap.get(block.getBlockPoolId(), block.getBlockId()); // If replicaInfo is null, the block was either deleted before @@ -3038,7 +3091,7 @@ class FsDatasetImpl implements FsDatasetSpi { long blockFileUsed, metaFileUsed; final String bpid = replicaState.getBlockPoolId(); - synchronized (FsDatasetImpl.this) { + try(AutoCloseableLock lock = datasetLock.acquire()) { replicaInfo = getReplicaInfo(replicaState.getBlockPoolId(), replicaState.getBlockId()); Preconditions.checkState(replicaInfo.getVolume().isTransientStorage()); @@ -3147,7 +3200,12 @@ class FsDatasetImpl implements FsDatasetSpi { return s != null ? 
@@ -3147,7 +3200,12 @@
       return s != null ? s.contains(blockId) : false;
     }
   }
-
+
+  @Override
+  public AutoCloseableLock acquireDatasetLock() {
+    return datasetLock.acquire();
+  }
+
   public void removeDeletedBlocks(String bpid, Set<Long> blockIds) {
     synchronized (deletingBlock) {
       Set<Long> s = deletingBlock.get(bpid);
@@ -3210,14 +3268,17 @@
     return cacheManager.reserve(bytesNeeded) > 0;
   }

-  synchronized void stopAllDataxceiverThreads(FsVolumeImpl volume) {
-    for (String blockPoolId : volumeMap.getBlockPoolList()) {
-      Collection<ReplicaInfo> replicas = volumeMap.replicas(blockPoolId);
-      for (ReplicaInfo replicaInfo : replicas) {
-        if (replicaInfo instanceof ReplicaInPipeline
-            && replicaInfo.getVolume().equals(volume)) {
-          ReplicaInPipeline replicaInPipeline = (ReplicaInPipeline) replicaInfo;
-          replicaInPipeline.interruptThread();
+  void stopAllDataxceiverThreads(FsVolumeImpl volume) {
+    try(AutoCloseableLock lock = datasetLock.acquire()) {
+      for (String blockPoolId : volumeMap.getBlockPoolList()) {
+        Collection<ReplicaInfo> replicas = volumeMap.replicas(blockPoolId);
+        for (ReplicaInfo replicaInfo : replicas) {
+          if (replicaInfo instanceof ReplicaInPipeline
+              && replicaInfo.getVolume().equals(volume)) {
+            ReplicaInPipeline replicaInPipeline =
+                (ReplicaInPipeline) replicaInfo;
+            replicaInPipeline.interruptThread();
+          }
         }
       }
     }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
index 07d91acd6e5..361263d9939 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
@@ -56,6 +56,7 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.util.AutoCloseableLock;
 import org.apache.hadoop.util.CloseableReferenceCount;
 import org.apache.hadoop.util.DiskChecker.DiskErrorException;
 import org.apache.hadoop.util.StringUtils;
@@ -303,7 +304,7 @@ public class FsVolumeImpl implements FsVolumeSpi {

   private void decDfsUsedAndNumBlocks(String bpid, long value,
                                       boolean blockFileDeleted) {
-    synchronized(dataset) {
+    try(AutoCloseableLock lock = dataset.acquireDatasetLock()) {
       BlockPoolSlice bp = bpSlices.get(bpid);
       if (bp != null) {
         bp.decDfsUsed(value);
@@ -315,7 +316,7 @@ public class FsVolumeImpl implements FsVolumeSpi {
   }

   void incDfsUsedAndNumBlocks(String bpid, long value) {
-    synchronized (dataset) {
+    try(AutoCloseableLock lock = dataset.acquireDatasetLock()) {
       BlockPoolSlice bp = bpSlices.get(bpid);
       if (bp != null) {
         bp.incDfsUsed(value);
@@ -325,7 +326,7 @@ public class FsVolumeImpl implements FsVolumeSpi {
   }

   void incDfsUsed(String bpid, long value) {
-    synchronized(dataset) {
+    try(AutoCloseableLock lock = dataset.acquireDatasetLock()) {
       BlockPoolSlice bp = bpSlices.get(bpid);
       if (bp != null) {
         bp.incDfsUsed(value);
@@ -336,7 +337,7 @@ public class FsVolumeImpl implements FsVolumeSpi {
   @VisibleForTesting
   public long getDfsUsed() throws IOException {
     long dfsUsed = 0;
-    synchronized(dataset) {
+    try(AutoCloseableLock lock = dataset.acquireDatasetLock()) {
       for(BlockPoolSlice s : bpSlices.values()) {
         dfsUsed += s.getDfsUsed();
       }
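FsVolumeImpl used to synchronize on the dataset object itself; it now funnels through dataset.acquireDatasetLock(), so volume-side counter updates and dataset-side block-map operations still exclude each other, because both paths converge on the single lock owned by FsDatasetImpl. The standalone sketch below illustrates that property with two code paths sharing one lock instance; all names in it are illustrative.

import org.apache.hadoop.util.AutoCloseableLock;

// Two code paths sharing one AutoCloseableLock serialize their critical
// sections, mirroring how FsVolumeImpl and FsDatasetImpl now both go
// through the dataset's single lock instance.
public class SharedLockSketch {
  private static final AutoCloseableLock DATASET_LOCK = new AutoCloseableLock();
  private static long dfsUsed = 0;

  // Volume-side update, analogous to FsVolumeImpl#incDfsUsed.
  static void incDfsUsed(long value) {
    try (AutoCloseableLock lock = DATASET_LOCK.acquire()) {
      dfsUsed += value;
    }
  }

  // Dataset-side read, analogous to an FsDatasetImpl accessor.
  static long readDfsUsed() {
    try (AutoCloseableLock lock = DATASET_LOCK.acquire()) {
      return dfsUsed;
    }
  }

  public static void main(String[] args) throws InterruptedException {
    Thread writer = new Thread(() -> {
      for (int i = 0; i < 100000; i++) {
        incDfsUsed(1);
      }
    });
    writer.start();
    writer.join();
    System.out.println(readDfsUsed()); // 100000: no lost updates
  }
}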
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
index a2ae209adf3..a13dcaa0480 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
@@ -62,6 +62,7 @@ import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.metrics2.MetricsCollector;
 import org.apache.hadoop.metrics2.util.MBeans;
+import org.apache.hadoop.util.AutoCloseableLock;
 import org.apache.hadoop.util.DataChecksum;

 /**
@@ -113,6 +114,8 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
       "dfs.datanode.simulateddatastorage.state";
   private static final DatanodeStorage.State DEFAULT_STATE =
       DatanodeStorage.State.NORMAL;
+
+  private final AutoCloseableLock datasetLock;

   static final byte[] nullCrcFileData;
   static {
@@ -550,6 +553,7 @@
         conf.getLong(CONFIG_PROPERTY_CAPACITY, DEFAULT_CAPACITY),
         conf.getEnum(CONFIG_PROPERTY_STATE, DEFAULT_STATE));
     this.volume = new SimulatedVolume(this.storage);
+    this.datasetLock = new AutoCloseableLock();
   }

   public synchronized void injectBlocks(String bpid,
@@ -1365,5 +1369,10 @@
   public boolean isDeletingBlock(String bpid, long blockId) {
     throw new UnsupportedOperationException();
   }
+
+  @Override
+  public AutoCloseableLock acquireDatasetLock() {
+    return datasetLock.acquire();
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
index 513e9a8ca99..a11cfb9cfc0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
@@ -85,6 +85,7 @@ import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
 import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
 import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.util.AutoCloseableLock;
 import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.Time;
 import org.apache.log4j.Level;
@@ -693,7 +694,7 @@ public class TestBlockRecovery {
     final RecoveringBlock recoveringBlock = new RecoveringBlock(
         block.getBlock(), locations, block.getBlock()
             .getGenerationStamp() + 1);
-    synchronized (dataNode.data) {
+    try(AutoCloseableLock lock = dataNode.data.acquireDatasetLock()) {
       Thread.sleep(2000);
       dataNode.initReplicaRecovery(recoveringBlock);
     }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
index 8e8405de089..3da7ad422bb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
@@ -60,6 +60,7 @@
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetTestUtil;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.LazyPersistTestCase;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.util.AutoCloseableLock;
 import org.apache.hadoop.util.Time;
 import org.junit.Before;
 import org.junit.Test;
@@ -109,7 +110,7 @@ public class TestDirectoryScanner {

   /** Truncate a block file */
   private long truncateBlockFile() throws IOException {
-    synchronized (fds) {
+    try(AutoCloseableLock lock = fds.acquireDatasetLock()) {
       for (ReplicaInfo b : FsDatasetTestUtil.getReplicas(fds, bpid)) {
         File f = b.getBlockFile();
         File mf = b.getMetaFile();
@@ -134,7 +135,7 @@ public class TestDirectoryScanner {

   /** Delete a block file */
   private long deleteBlockFile() {
-    synchronized(fds) {
+    try(AutoCloseableLock lock = fds.acquireDatasetLock()) {
       for (ReplicaInfo b : FsDatasetTestUtil.getReplicas(fds, bpid)) {
         File f = b.getBlockFile();
         File mf = b.getMetaFile();
@@ -150,7 +151,7 @@ public class TestDirectoryScanner {

   /** Delete block meta file */
   private long deleteMetaFile() {
-    synchronized(fds) {
+    try(AutoCloseableLock lock = fds.acquireDatasetLock()) {
       for (ReplicaInfo b : FsDatasetTestUtil.getReplicas(fds, bpid)) {
         File file = b.getMetaFile();
         // Delete a metadata file
@@ -169,7 +170,7 @@ public class TestDirectoryScanner {
    * @throws IOException
    */
   private void duplicateBlock(long blockId) throws IOException {
-    synchronized (fds) {
+    try(AutoCloseableLock lock = fds.acquireDatasetLock()) {
       ReplicaInfo b = FsDatasetTestUtil.fetchReplicaInfo(fds, bpid, blockId);
       try (FsDatasetSpi.FsVolumeReferences volumes =
           fds.getFsVolumeReferences()) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
index 155202b7c58..0f69ebfc6f9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
@@ -43,6 +43,7 @@ import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
 import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
 import org.apache.hadoop.metrics2.MetricsCollector;
+import org.apache.hadoop.util.AutoCloseableLock;

 public class ExternalDatasetImpl implements FsDatasetSpi<ExternalVolumeImpl> {

@@ -448,4 +449,9 @@
   public boolean isDeletingBlock(String bpid, long blockId) {
     return false;
   }
+
+  @Override
+  public AutoCloseableLock acquireDatasetLock() {
+    return null;
+  }
 }
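One behavioral wrinkle in the stubs: ExternalDatasetImpl.acquireDatasetLock() returns null. That compiles and even runs under try-with-resources, because Java only calls close() on a non-null resource, but it means callers get no mutual exclusion at all from this implementation. That is tolerable only because it is a test double; SimulatedFSDataset, by contrast, allocates a real lock. A minimal demonstration of the null-resource behavior, with illustrative names:

import org.apache.hadoop.util.AutoCloseableLock;

// A null resource is legal in try-with-resources: close() is skipped for
// null, so no NullPointerException is thrown here. The body simply runs
// without any lock held, which is what happens with the stub above.
public class NullLockSketch {
  static AutoCloseableLock acquireDatasetLock() {
    return null; // mirrors ExternalDatasetImpl's stub
  }

  public static void main(String[] args) {
    try (AutoCloseableLock lock = acquireDatasetLock()) {
      System.out.println("ran without a lock and without an NPE");
    }
  }
}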