diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 3e3bab34b79..5df6200dd64 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -227,6 +227,9 @@ Release 2.8.0 - UNRELEASED HDFS-8394. Move getAdditionalBlock() and related functionalities into a separate class. (wheat9) + HDFS-8157. Writes to RAM DISK reserve locked memory for block files. + (Arpit Agarwal) + OPTIMIZATIONS HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java index cc55f85c2ae..0eb143a4a52 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java @@ -51,7 +51,8 @@ public class ReplicaInPipeline extends ReplicaInfo * the bytes already written to this block. */ private long bytesReserved; - + private final long originalBytesReserved; + /** * Constructor for a zero length replica * @param blockId block id @@ -97,6 +98,7 @@ public ReplicaInPipeline(long blockId, long genStamp, this.bytesOnDisk = len; this.writer = writer; this.bytesReserved = bytesToReserve; + this.originalBytesReserved = bytesToReserve; } /** @@ -109,6 +111,7 @@ public ReplicaInPipeline(ReplicaInPipeline from) { this.bytesOnDisk = from.getBytesOnDisk(); this.writer = from.writer; this.bytesReserved = from.bytesReserved; + this.originalBytesReserved = from.originalBytesReserved; } @Override @@ -148,9 +151,15 @@ public long getBytesReserved() { return bytesReserved; } + @Override + public long getOriginalBytesReserved() { + return originalBytesReserved; + } + @Override public void releaseAllBytesReserved() { // ReplicaInPipelineInterface getVolume().releaseReservedSpace(bytesReserved); + getVolume().releaseLockedMemory(bytesReserved); bytesReserved = 0; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java index 940d3eb5164..136d8a93bce 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java @@ -218,7 +218,17 @@ public void setUnlinked() { public long getBytesReserved() { return 0; } - + + /** + * Number of bytes originally reserved for this replica. The actual + * reservation is adjusted as data is written to disk. + * + * @return the number of bytes originally reserved for this replica. + */ + public long getOriginalBytesReserved() { + return 0; + } + /** * Copy specified file into a temporary file. Then rename the * temporary file to the original name. 
This will cause any diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java index e6ace44a05c..d34022d6b77 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java @@ -72,6 +72,14 @@ public interface FsVolumeSpi { /** Returns true if the volume is NOT backed by persistent storage. */ public boolean isTransientStorage(); + /** + * Release reserved memory for an RBW block written to transient storage + * i.e. RAM. + * bytesToRelease will be rounded down to the OS page size since locked + * memory reservation must always be a multiple of the page size. + */ + public void releaseLockedMemory(long bytesToRelease); + /** * BlockIterator will return ExtendedBlock entries from a block pool in * this volume. The entries will be returned in sorted order.
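The releaseLockedMemory contract above, and the PageRounder change in the FsDatasetCache hunk further down, both turn on the same rule: locked-memory reservations are rounded up to a multiple of the operating-system page size, while releases for partially written replicas are rounded down so the datanode never gives back more than it actually reserved. Below is a minimal standalone sketch of that arithmetic, not part of the patch; the class name PageRounderSketch and the 4096-byte page used in main() are illustrative, the bitmask form assumes a power-of-two page size, and the real value comes from the datanode's native IO layer rather than a constructor argument.

// Sketch only: mirrors the roundUp/roundDown bitmask trick introduced in
// FsDatasetCache.PageRounder below. Valid only when the page size is a
// power of two, which is why the constructor checks it.
public class PageRounderSketch {
  private final long osPageSize;

  public PageRounderSketch(long osPageSize) {
    if (Long.bitCount(osPageSize) != 1) {
      throw new IllegalArgumentException("page size must be a power of two");
    }
    this.osPageSize = osPageSize;
  }

  // Round up to the next page boundary, e.g. 1 -> 4096 for 4 KB pages.
  public long roundUp(long count) {
    return (count + osPageSize - 1) & ~(osPageSize - 1);
  }

  // Round down to the previous page boundary, e.g. 8191 -> 4096.
  public long roundDown(long count) {
    return count & ~(osPageSize - 1);
  }

  public static void main(String[] args) {
    PageRounderSketch r = new PageRounderSketch(4096);
    System.out.println(r.roundUp(1));      // 4096
    System.out.println(r.roundDown(8191)); // 4096
    System.out.println(r.roundUp(0));      // 0
  }
}

The power-of-two assumption is what makes the mask form equivalent to rounding to a page boundary; the divide-and-multiply version that roundUp replaces in the hunk below works for any positive page size but is slightly slower.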

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java index 5bc9c680810..b3546d11511 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java @@ -481,7 +481,7 @@ private void addReplicaToReplicasMap(Block block, ReplicaMap volumeMap, // eventually. if (newReplica.getVolume().isTransientStorage()) { lazyWriteReplicaMap.addReplica(bpid, blockId, - (FsVolumeImpl) newReplica.getVolume()); + (FsVolumeImpl) newReplica.getVolume(), 0); } else { lazyWriteReplicaMap.discardReplica(bpid, blockId, false); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java index c1d3990e22b..fdc9f83d52d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java @@ -20,7 +20,6 @@ import java.io.File; import java.io.FileDescriptor; -import java.io.IOException; import java.util.HashMap; import java.util.HashSet; import java.util.Map; @@ -277,7 +276,8 @@ private boolean moveFiles() { @Override public void run() { - long dfsBytes = blockFile.length() + metaFile.length(); + final long blockLength = blockFile.length(); + final long metaLength = metaFile.length(); boolean result; result = (trashDirectory == null) ? deleteFiles() : moveFiles(); @@ -291,7 +291,8 @@ public void run() { if(block.getLocalBlock().getNumBytes() != BlockCommand.NO_ACK){ datanode.notifyNamenodeDeletedBlock(block, volume.getStorageID()); } - volume.decDfsUsed(block.getBlockPoolId(), dfsBytes); + volume.onBlockFileDeletion(block.getBlockPoolId(), blockLength); + volume.onMetaFileDeletion(block.getBlockPoolId(), metaLength); LOG.info("Deleted " + block.getBlockPoolId() + " " + block.getLocalBlock() + " file " + blockFile); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java index e0df0f2e96f..6f524b28907 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java @@ -151,10 +151,15 @@ public static class PageRounder { /** * Round up a number to the operating system page size. */ - public long round(long count) { - long newCount = - (count + (osPageSize - 1)) / osPageSize; - return newCount * osPageSize; + public long roundUp(long count) { + return (count + osPageSize - 1) & (~(osPageSize - 1)); + } + + /** + * Round down a number to the operating system page size. + */ + public long roundDown(long count) { + return count & (~(osPageSize - 1)); } } @@ -173,7 +178,7 @@ private class UsedBytesCount { * -1 if we failed. 
*/ long reserve(long count) { - count = rounder.round(count); + count = rounder.roundUp(count); while (true) { long cur = usedBytes.get(); long next = cur + count; @@ -195,10 +200,23 @@ long reserve(long count) { * @return The new number of usedBytes. */ long release(long count) { - count = rounder.round(count); + count = rounder.roundUp(count); return usedBytes.addAndGet(-count); } - + + /** + * Release some bytes that we're using rounded down to the page size. + * + * @param count The number of bytes to release. We will round this + * down to the page size. + * + * @return The new number of usedBytes. + */ + long releaseRoundDown(long count) { + count = rounder.roundDown(count); + return usedBytes.addAndGet(-count); + } + long get() { return usedBytes.get(); } @@ -340,6 +358,52 @@ synchronized void uncacheBlock(String bpid, long blockId) { } } + /** + * Try to reserve more bytes. + * + * @param count The number of bytes to add. We will round this + * up to the page size. + * + * @return The new number of usedBytes if we succeeded; + * -1 if we failed. + */ + long reserve(long count) { + return usedBytesCount.reserve(count); + } + + /** + * Release some bytes that we're using. + * + * @param count The number of bytes to release. We will round this + * up to the page size. + * + * @return The new number of usedBytes. + */ + long release(long count) { + return usedBytesCount.release(count); + } + + /** + * Release some bytes that we're using rounded down to the page size. + * + * @param count The number of bytes to release. We will round this + * down to the page size. + * + * @return The new number of usedBytes. + */ + long releaseRoundDown(long count) { + return usedBytesCount.releaseRoundDown(count); + } + + /** + * Get the OS page size. + * + * @return the OS page size. + */ + long getOsPageSize() { + return usedBytesCount.rounder.osPageSize; + } + /** * Background worker that mmaps, mlocks, and checksums a block */ @@ -363,7 +427,7 @@ public void run() { MappableBlock mappableBlock = null; ExtendedBlock extBlk = new ExtendedBlock(key.getBlockPoolId(), key.getBlockId(), length, genstamp); - long newUsedBytes = usedBytesCount.reserve(length); + long newUsedBytes = reserve(length); boolean reservedBytes = false; try { if (newUsedBytes < 0) { @@ -423,7 +487,7 @@ public void run() { IOUtils.closeQuietly(metaIn); if (!success) { if (reservedBytes) { - usedBytesCount.release(length); + release(length); } LOG.debug("Caching of {} was aborted. 
We are now caching only {} " + "bytes in total.", key, usedBytesCount.get()); @@ -502,8 +566,7 @@ public void run() { synchronized (FsDatasetCache.this) { mappableBlockMap.remove(key); } - long newUsedBytes = - usedBytesCount.release(value.mappableBlock.getLength()); + long newUsedBytes = release(value.mappableBlock.getLength()); numBlocksCached.addAndGet(-1); dataset.datanode.getMetrics().incrBlocksUncached(1); if (revocationTimeMs != 0) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java index 46c488fd3ea..167f0d76975 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java @@ -321,8 +321,18 @@ public LengthInputStream getMetaDataInputStream(ExtendedBlock b) cacheManager = new FsDatasetCache(this); // Start the lazy writer once we have built the replica maps. - lazyWriter = new Daemon(new LazyWriter(conf)); - lazyWriter.start(); + // We need to start the lazy writer even if MaxLockedMemory is set to + // zero because we may have un-persisted replicas in memory from before + // the process restart. To minimize the chances of data loss we'll + // ensure they get written to disk now. + if (ramDiskReplicaTracker.numReplicasNotPersisted() > 0 || + datanode.getDnConf().getMaxLockedMemory() > 0) { + lazyWriter = new Daemon(new LazyWriter(conf)); + lazyWriter.start(); + } else { + lazyWriter = null; + } + registerMBean(datanode.getDatanodeUuid()); // Add a Metrics2 Source Interface. This is same @@ -1286,26 +1296,33 @@ public synchronized ReplicaHandler createRbw( " and thus cannot be created."); } // create a new block - FsVolumeReference ref; - while (true) { + FsVolumeReference ref = null; + + // Use ramdisk only if block size is a multiple of OS page size. + // This simplifies reservation for partially used replicas + // significantly. + if (allowLazyPersist && + lazyWriter != null && + b.getNumBytes() % cacheManager.getOsPageSize() == 0 && + (cacheManager.reserve(b.getNumBytes())) > 0) { try { - if (allowLazyPersist) { - // First try to place the block on a transient volume. - ref = volumes.getNextTransientVolume(b.getNumBytes()); - datanode.getMetrics().incrRamDiskBlocksWrite(); - } else { - ref = volumes.getNextVolume(storageType, b.getNumBytes()); + // First try to place the block on a transient volume. + ref = volumes.getNextTransientVolume(b.getNumBytes()); + datanode.getMetrics().incrRamDiskBlocksWrite(); + } catch(DiskOutOfSpaceException de) { + // Ignore the exception since we just fall back to persistent storage. 
+ datanode.getMetrics().incrRamDiskBlocksWriteFallback(); + } finally { + if (ref == null) { + cacheManager.release(b.getNumBytes()); } - } catch (DiskOutOfSpaceException de) { - if (allowLazyPersist) { - datanode.getMetrics().incrRamDiskBlocksWriteFallback(); - allowLazyPersist = false; - continue; - } - throw de; } - break; } + + if (ref == null) { + ref = volumes.getNextVolume(storageType, b.getNumBytes()); + } + FsVolumeImpl v = (FsVolumeImpl) ref.getVolume(); // create an rbw file to hold block in the designated volume File f; @@ -1566,7 +1583,11 @@ private synchronized FinalizedReplica finalizeReplica(String bpid, newReplicaInfo = new FinalizedReplica(replicaInfo, v, dest.getParentFile()); if (v.isTransientStorage()) { - ramDiskReplicaTracker.addReplica(bpid, replicaInfo.getBlockId(), v); + releaseLockedMemory( + replicaInfo.getOriginalBytesReserved() - replicaInfo.getNumBytes(), + false); + ramDiskReplicaTracker.addReplica( + bpid, replicaInfo.getBlockId(), v, replicaInfo.getNumBytes()); datanode.getMetrics().addRamDiskBytesWrite(replicaInfo.getNumBytes()); } } @@ -1813,9 +1834,7 @@ static void checkReplicaFiles(final ReplicaInfo r) throws IOException { } /** - * We're informed that a block is no longer valid. We - * could lazily garbage-collect the block, but why bother? - * just get rid of it. + * We're informed that a block is no longer valid. Delete it. */ @Override // FsDatasetSpi public void invalidate(String bpid, Block invalidBlks[]) throws IOException { @@ -2066,8 +2085,10 @@ void registerMBean(final String datanodeUuid) { public void shutdown() { fsRunning = false; - ((LazyWriter) lazyWriter.getRunnable()).stop(); - lazyWriter.interrupt(); + if (lazyWriter != null) { + ((LazyWriter) lazyWriter.getRunnable()).stop(); + lazyWriter.interrupt(); + } if (mbeanName != null) { MBeans.unregister(mbeanName); @@ -2085,11 +2106,13 @@ public void shutdown() { volumes.shutdown(); } - try { - lazyWriter.join(); - } catch (InterruptedException ie) { - LOG.warn("FsDatasetImpl.shutdown ignoring InterruptedException " + - "from LazyWriter.join"); + if (lazyWriter != null) { + try { + lazyWriter.join(); + } catch (InterruptedException ie) { + LOG.warn("FsDatasetImpl.shutdown ignoring InterruptedException " + + "from LazyWriter.join"); + } } } @@ -2175,7 +2198,11 @@ public void checkAndUpdate(String bpid, long blockId, File diskFile, diskFile.length(), diskGS, vol, diskFile.getParentFile()); volumeMap.add(bpid, diskBlockInfo); if (vol.isTransientStorage()) { - ramDiskReplicaTracker.addReplica(bpid, blockId, (FsVolumeImpl) vol); + long lockedBytesReserved = + cacheManager.reserve(diskBlockInfo.getNumBytes()) > 0 ? 
+ diskBlockInfo.getNumBytes() : 0; + ramDiskReplicaTracker.addReplica( + bpid, blockId, (FsVolumeImpl) vol, lockedBytesReserved); } LOG.warn("Added missing block to memory " + diskBlockInfo); return; @@ -2761,12 +2788,14 @@ private void setupAsyncLazyPersistThread(final FsVolumeImpl v) { boolean ramDiskConfigured = ramDiskConfigured(); // Add thread for DISK volume if RamDisk is configured if (ramDiskConfigured && + asyncLazyPersistService != null && !asyncLazyPersistService.queryVolume(v.getCurrentDir())) { asyncLazyPersistService.addVolume(v.getCurrentDir()); } // Remove thread for DISK volume if RamDisk is not configured if (!ramDiskConfigured && + asyncLazyPersistService != null && asyncLazyPersistService.queryVolume(v.getCurrentDir())) { asyncLazyPersistService.removeVolume(v.getCurrentDir()); } @@ -2791,9 +2820,10 @@ private void removeOldReplica(ReplicaInfo replicaInfo, // Remove the old replicas if (blockFile.delete() || !blockFile.exists()) { - ((FsVolumeImpl) replicaInfo.getVolume()).decDfsUsed(bpid, blockFileUsed); + FsVolumeImpl volume = (FsVolumeImpl) replicaInfo.getVolume(); + volume.onBlockFileDeletion(bpid, blockFileUsed); if (metaFile.delete() || !metaFile.exists()) { - ((FsVolumeImpl) replicaInfo.getVolume()).decDfsUsed(bpid, metaFileUsed); + volume.onMetaFileDeletion(bpid, metaFileUsed); } } @@ -2906,8 +2936,8 @@ private boolean transientFreeSpaceBelowThreshold() throws IOException { } /** - * Attempt to evict one or more transient block replicas we have at least - * spaceNeeded bytes free. + * Attempt to evict one or more transient block replicas until we + * have at least spaceNeeded bytes free. */ private void evictBlocks() throws IOException { int iterations = 0; @@ -3057,5 +3087,13 @@ private void addDeletingBlock(String bpid, Long blockId) { s.add(blockId); } } + + void releaseLockedMemory(long count, boolean roundup) { + if (roundup) { + cacheManager.release(count); + } else { + cacheManager.releaseRoundDown(count); + } + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java index 34bfa007529..2f6b1c8046f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java @@ -274,7 +274,18 @@ File getTmpDir(String bpid) throws IOException { return getBlockPoolSlice(bpid).getTmpDir(); } - void decDfsUsed(String bpid, long value) { + void onBlockFileDeletion(String bpid, long value) { + decDfsUsed(bpid, value); + if (isTransientStorage()) { + dataset.releaseLockedMemory(value, true); + } + } + + void onMetaFileDeletion(String bpid, long value) { + decDfsUsed(bpid, value); + } + + private void decDfsUsed(String bpid, long value) { synchronized(dataset) { BlockPoolSlice bp = bpSlices.get(bpid); if (bp != null) { @@ -428,6 +439,13 @@ public void releaseReservedSpace(long bytesToRelease) { } } + @Override + public void releaseLockedMemory(long bytesToRelease) { + if (isTransientStorage()) { + dataset.releaseLockedMemory(bytesToRelease, false); + } + } + private enum SubdirFilter implements FilenameFilter { INSTANCE; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskReplicaLruTracker.java 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskReplicaLruTracker.java index c01a6cf3772..b940736ccfd 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskReplicaLruTracker.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskReplicaLruTracker.java @@ -38,8 +38,10 @@ public class RamDiskReplicaLruTracker extends RamDiskReplicaTracker { private class RamDiskReplicaLru extends RamDiskReplica { long lastUsedTime; - private RamDiskReplicaLru(String bpid, long blockId, FsVolumeImpl ramDiskVolume) { - super(bpid, blockId, ramDiskVolume); + private RamDiskReplicaLru(String bpid, long blockId, + FsVolumeImpl ramDiskVolume, + long lockedBytesReserved) { + super(bpid, blockId, ramDiskVolume, lockedBytesReserved); } @Override @@ -70,20 +72,23 @@ public boolean equals(Object other) { TreeMultimap<Long, RamDiskReplicaLru> replicasPersisted; RamDiskReplicaLruTracker() { - replicaMaps = new HashMap<String, Map<Long, RamDiskReplicaLru>>(); - replicasNotPersisted = new LinkedList<RamDiskReplicaLru>(); + replicaMaps = new HashMap<>(); + replicasNotPersisted = new LinkedList<>(); replicasPersisted = TreeMultimap.create(); } @Override synchronized void addReplica(final String bpid, final long blockId, - final FsVolumeImpl transientVolume) { + final FsVolumeImpl transientVolume, + long lockedBytesReserved) { Map<Long, RamDiskReplicaLru> map = replicaMaps.get(bpid); if (map == null) { - map = new HashMap<Long, RamDiskReplicaLru>(); + map = new HashMap<>(); replicaMaps.put(bpid, map); } - RamDiskReplicaLru ramDiskReplicaLru = new RamDiskReplicaLru(bpid, blockId, transientVolume); + RamDiskReplicaLru ramDiskReplicaLru = + new RamDiskReplicaLru(bpid, blockId, transientVolume, + lockedBytesReserved); map.put(blockId, ramDiskReplicaLru); replicasNotPersisted.add(ramDiskReplicaLru); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskReplicaTracker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskReplicaTracker.java index 75079254d36..335ed703abd 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskReplicaTracker.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskReplicaTracker.java @@ -45,6 +45,7 @@ static class RamDiskReplica implements Comparable<RamDiskReplica> { private final long blockId; private File savedBlockFile; private File savedMetaFile; + private long lockedBytesReserved; private long creationTime; protected AtomicLong numReads = new AtomicLong(0); @@ -61,10 +62,12 @@ static class RamDiskReplica implements Comparable<RamDiskReplica> { FsVolumeImpl lazyPersistVolume; RamDiskReplica(final String bpid, final long blockId, - final FsVolumeImpl ramDiskVolume) { + final FsVolumeImpl ramDiskVolume, + long lockedBytesReserved) { this.bpid = bpid; this.blockId = blockId; this.ramDiskVolume = ramDiskVolume; + this.lockedBytesReserved = lockedBytesReserved; lazyPersistVolume = null; savedMetaFile = null; savedBlockFile = null; @@ -168,6 +171,10 @@ public int compareTo(RamDiskReplica other) { public String toString() { return "[BlockPoolID=" + bpid + "; BlockId=" + blockId + "]"; } + + public long getLockedBytesReserved() { + return lockedBytesReserved; + } } /** @@ -201,7 +208,8 @@ void initialize(final FsDatasetImpl fsDataset) { * @param transientVolume RAM disk volume that stores the replica. 
*/ abstract void addReplica(final String bpid, final long blockId, - final FsVolumeImpl transientVolume); + final FsVolumeImpl transientVolume, + long lockedBytesReserved); /** * Invoked when a replica is opened by a client. This may be used as diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java index 6674dca2241..5388688da40 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java @@ -1583,7 +1583,7 @@ private void finalizeNamenode(NameNode nn, Configuration conf) throws Exception throw new IllegalStateException("Attempting to finalize " + "Namenode but it is not running"); } - ToolRunner.run(new DFSAdmin(conf), new String[] {"-finalizeUpgrade"}); + ToolRunner.run(new DFSAdmin(conf), new String[]{"-finalizeUpgrade"}); } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java index 5ee73d4d1af..4b7d1fc5e8a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java @@ -66,6 +66,7 @@ import org.apache.hadoop.hdfs.server.balancer.Balancer.Result; import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.LazyPersistTestCase; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.util.Time; @@ -120,13 +121,16 @@ static void initConf(Configuration conf) { conf.setLong(DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_KEY, 2000L); } - static void initConfWithRamDisk(Configuration conf) { + static void initConfWithRamDisk(Configuration conf, + long ramDiskCapacity) { conf.setLong(DFS_BLOCK_SIZE_KEY, DEFAULT_RAM_DISK_BLOCK_SIZE); + conf.setLong(DFS_DATANODE_MAX_LOCKED_MEMORY_KEY, ramDiskCapacity); conf.setInt(DFS_NAMENODE_LAZY_PERSIST_FILE_SCRUB_INTERVAL_SEC, 3); conf.setLong(DFS_HEARTBEAT_INTERVAL_KEY, 1); conf.setInt(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 500); conf.setInt(DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC, 1); conf.setInt(DFS_DATANODE_RAM_DISK_LOW_WATERMARK_BYTES, DEFAULT_RAM_DISK_BLOCK_SIZE); + LazyPersistTestCase.initCacheManipulator(); } /* create a file with a length of fileLen */ @@ -1247,7 +1251,6 @@ public void testBalancerWithRamDisk() throws Exception { final int SEED = 0xFADED; final short REPL_FACT = 1; Configuration conf = new Configuration(); - initConfWithRamDisk(conf); final int defaultRamDiskCapacity = 10; final long ramDiskStorageLimit = @@ -1257,6 +1260,8 @@ public void testBalancerWithRamDisk() throws Exception { ((long) defaultRamDiskCapacity * DEFAULT_RAM_DISK_BLOCK_SIZE) + (DEFAULT_RAM_DISK_BLOCK_SIZE - 1); + initConfWithRamDisk(conf, ramDiskStorageLimit); + cluster = new MiniDFSCluster .Builder(conf) .numDataNodes(1) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java index 2ac94165abd..778dd2804c1 100644 --- 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java @@ -491,6 +491,10 @@ public boolean isTransientStorage() { public void reserveSpaceForRbw(long bytesToReserve) { } + @Override + public void releaseLockedMemory(long bytesToRelease) { + } + @Override public void releaseReservedSpace(long bytesToRelease) { } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java index e0b821aac24..68152fbfb39 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java @@ -53,6 +53,7 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetTestUtil; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.LazyPersistTestCase; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.test.GenericTestUtils; import org.junit.Test; @@ -79,6 +80,8 @@ public class TestDirectoryScanner { CONF.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_LENGTH); CONF.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, 1); CONF.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1L); + CONF.setLong(DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY, + Long.MAX_VALUE); } /** create a file with a length of fileLen */ @@ -308,6 +311,7 @@ private void scan(long totalBlocks, int diffsize, long missingMetaFile, long mis @Test (timeout=300000) public void testRetainBlockOnPersistentStorage() throws Exception { + LazyPersistTestCase.initCacheManipulator(); cluster = new MiniDFSCluster .Builder(CONF) .storageTypes(new StorageType[] { StorageType.RAM_DISK, StorageType.DEFAULT }) @@ -349,6 +353,7 @@ public void testRetainBlockOnPersistentStorage() throws Exception { @Test (timeout=300000) public void testDeleteBlockOnTransientStorage() throws Exception { + LazyPersistTestCase.initCacheManipulator(); cluster = new MiniDFSCluster .Builder(CONF) .storageTypes(new StorageType[] { StorageType.RAM_DISK, StorageType.DEFAULT }) @@ -614,6 +619,10 @@ public boolean isTransientStorage() { return false; } + @Override + public void releaseLockedMemory(long bytesToRelease) { + } + @Override public BlockIterator newBlockIterator(String bpid, String name) { throw new UnsupportedOperationException(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java index 7a09630be10..58932fbda1c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java @@ -339,7 +339,7 @@ public void testFilesExceedMaxLockedMemory() throws Exception { for (int i=0; i + * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl; + + +import com.google.common.base.Supplier; +import org.apache.hadoop.fs.CreateFlag; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hdfs.DFSOutputStream; +import org.apache.hadoop.hdfs.DFSTestUtil; +import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; +import org.apache.hadoop.metrics2.MetricsRecordBuilder; +import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.test.MetricsAsserts; +import org.junit.Test; + +import java.io.IOException; +import java.util.EnumSet; +import java.util.concurrent.TimeoutException; + +import static org.apache.hadoop.fs.CreateFlag.CREATE; +import static org.apache.hadoop.fs.CreateFlag.LAZY_PERSIST; +import static org.apache.hadoop.fs.StorageType.DEFAULT; +import static org.apache.hadoop.fs.StorageType.RAM_DISK; +import static org.hamcrest.core.Is.is; +import static org.junit.Assert.assertThat; + +/** + * Verify that locked memory is used correctly when writing to replicas in + * memory + */ +public class TestLazyPersistLockedMemory extends LazyPersistTestCase { + + /** + * RAM disk present but locked memory is set to zero. Placement should + * fall back to disk. + */ + @Test + public void testWithNoLockedMemory() throws IOException { + getClusterBuilder().setNumDatanodes(1) + .setMaxLockedMemory(0).build(); + + final String METHOD_NAME = GenericTestUtils.getMethodName(); + Path path = new Path("/" + METHOD_NAME + ".dat"); + makeTestFile(path, BLOCK_SIZE, true); + ensureFileReplicasOnStorageType(path, DEFAULT); + } + + @Test + public void testReservation() + throws IOException, TimeoutException, InterruptedException { + getClusterBuilder().setNumDatanodes(1) + .setMaxLockedMemory(BLOCK_SIZE).build(); + final String METHOD_NAME = GenericTestUtils.getMethodName(); + final FsDatasetSpi fsd = cluster.getDataNodes().get(0).getFSDataset(); + + // Create a file and ensure the replica in RAM_DISK uses locked memory. + Path path = new Path("/" + METHOD_NAME + ".dat"); + makeTestFile(path, BLOCK_SIZE, true); + ensureFileReplicasOnStorageType(path, RAM_DISK); + assertThat(fsd.getCacheUsed(), is((long) BLOCK_SIZE)); + } + + @Test + public void testReleaseOnFileDeletion() + throws IOException, TimeoutException, InterruptedException { + getClusterBuilder().setNumDatanodes(1) + .setMaxLockedMemory(BLOCK_SIZE).build(); + final String METHOD_NAME = GenericTestUtils.getMethodName(); + final FsDatasetSpi fsd = cluster.getDataNodes().get(0).getFSDataset(); + + Path path = new Path("/" + METHOD_NAME + ".dat"); + makeTestFile(path, BLOCK_SIZE, true); + ensureFileReplicasOnStorageType(path, RAM_DISK); + assertThat(fsd.getCacheUsed(), is((long) BLOCK_SIZE)); + + // Delete the file and ensure that the locked memory is released. + fs.delete(path, false); + DataNodeTestUtils.triggerBlockReport(cluster.getDataNodes().get(0)); + waitForLockedBytesUsed(fsd, 0); + } + + /** + * Verify that locked RAM is released when blocks are evicted from RAM disk. 
+ */ + @Test + public void testReleaseOnEviction() + throws IOException, TimeoutException, InterruptedException { + getClusterBuilder().setNumDatanodes(1) + .setMaxLockedMemory(BLOCK_SIZE) + .setRamDiskReplicaCapacity(BLOCK_SIZE * 2 - 1) + .build(); + final String METHOD_NAME = GenericTestUtils.getMethodName(); + final FsDatasetSpi fsd = cluster.getDataNodes().get(0).getFSDataset(); + + Path path = new Path("/" + METHOD_NAME + ".dat"); + makeTestFile(path, BLOCK_SIZE, true); + + // The block should get evicted soon since it pushes RAM disk free + // space below the threshold. + waitForLockedBytesUsed(fsd, 0); + + MetricsRecordBuilder rb = + MetricsAsserts.getMetrics(cluster.getDataNodes().get(0).getMetrics().name()); + MetricsAsserts.assertCounter("RamDiskBlocksEvicted", 1L, rb); + } + + /** + * Verify that locked bytes are correctly updated when a block is finalized + * at less than its max length. + */ + @Test + public void testShortBlockFinalized() + throws IOException, TimeoutException, InterruptedException { + getClusterBuilder().setNumDatanodes(1).build(); + final String METHOD_NAME = GenericTestUtils.getMethodName(); + final FsDatasetSpi fsd = cluster.getDataNodes().get(0).getFSDataset(); + + Path path = new Path("/" + METHOD_NAME + ".dat"); + makeTestFile(path, 1, true); + assertThat(fsd.getCacheUsed(), is(osPageSize)); + + // Delete the file and ensure locked RAM usage goes to zero. + fs.delete(path, false); + waitForLockedBytesUsed(fsd, 0); + } + + /** + * Verify that locked bytes are correctly updated when the client goes + * away unexpectedly during a write. + */ + @Test + public void testWritePipelineFailure() + throws IOException, TimeoutException, InterruptedException { + getClusterBuilder().setNumDatanodes(1).build(); + final String METHOD_NAME = GenericTestUtils.getMethodName(); + final FsDatasetSpi fsd = cluster.getDataNodes().get(0).getFSDataset(); + + Path path = new Path("/" + METHOD_NAME + ".dat"); + + EnumSet createFlags = EnumSet.of(CREATE, LAZY_PERSIST); + // Write 1 byte to the file and kill the writer. + final FSDataOutputStream fos = + fs.create(path, + FsPermission.getFileDefault(), + createFlags, + BUFFER_LENGTH, + REPL_FACTOR, + BLOCK_SIZE, + null); + + fos.write(new byte[1]); + fos.hsync(); + DFSTestUtil.abortStream((DFSOutputStream) fos.getWrappedStream()); + waitForLockedBytesUsed(fsd, osPageSize); + + // Delete the file and ensure locked RAM goes to zero. + fs.delete(path, false); + DataNodeTestUtils.triggerBlockReport(cluster.getDataNodes().get(0)); + waitForLockedBytesUsed(fsd, 0); + } + + /** + * Wait until used locked byte count goes to the expected value. + * @throws TimeoutException after 300 seconds. 
+ */ + private void waitForLockedBytesUsed(final FsDatasetSpi<?> fsd, + final long expectedLockedBytes) + throws TimeoutException, InterruptedException { + GenericTestUtils.waitFor(new Supplier<Boolean>() { + @Override + public Boolean get() { + long cacheUsed = fsd.getCacheUsed(); + LOG.info("cacheUsed=" + cacheUsed + ", waiting for it to be " + expectedLockedBytes); + if (cacheUsed < 0) { + throw new IllegalStateException("cacheUsed unexpectedly negative"); + } + return (cacheUsed == expectedLockedBytes); + } + }, 1000, 300000); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java index d5664cf4c91..a77184b831c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java @@ -204,7 +204,7 @@ private void testAppend(String bpid, FsDatasetImpl dataSet, ExtendedBlock[] bloc long available = v.getCapacity()-v.getDfsUsed(); long expectedLen = blocks[FINALIZED].getNumBytes(); try { - v.decDfsUsed(bpid, -available); + v.onBlockFileDeletion(bpid, -available); blocks[FINALIZED].setNumBytes(expectedLen+100); dataSet.append(blocks[FINALIZED], newGS, expectedLen); Assert.fail("Should not have space to append to an RWR replica" + blocks[RWR]); @@ -212,7 +212,7 @@ private void testAppend(String bpid, FsDatasetImpl dataSet, ExtendedBlock[] bloc Assert.assertTrue(e.getMessage().startsWith( "Insufficient space for appending to ")); } - v.decDfsUsed(bpid, available); + v.onBlockFileDeletion(bpid, available); blocks[FINALIZED].setNumBytes(expectedLen); newGS = blocks[RBW].getGenerationStamp()+1;
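Taken together, the changes above follow one accounting pattern: createRbw reserves the full block size, which it first requires to be a page multiple, against the limit configured by DFS_DATANODE_MAX_LOCKED_MEMORY_KEY (dfs.datanode.max.locked.memory); finalizeReplica trims the unused tail of a short block with a round-down release; and deletion, eviction, or an abandoned write returns the whole rounded-up reservation. The sketch below restates that pattern in one self-contained class. LockedMemoryAccounting and its fields are illustrative names rather than HDFS classes; the compare-and-set loop simply mirrors the shape of UsedBytesCount.reserve in the FsDatasetCache hunk.

import java.util.concurrent.atomic.AtomicLong;

// Illustrative sketch only: in the patch the real accounting lives in
// FsDatasetCache.UsedBytesCount and FsDatasetImpl.releaseLockedMemory.
public class LockedMemoryAccounting {
  private final long maxBytes;  // stand-in for the configured locked-memory limit
  private final long pageSize;  // assumed to be a power of two
  private final AtomicLong usedBytes = new AtomicLong(0);

  public LockedMemoryAccounting(long maxBytes, long pageSize) {
    this.maxBytes = maxBytes;
    this.pageSize = pageSize;
  }

  private long roundUp(long count) {
    return (count + pageSize - 1) & ~(pageSize - 1);
  }

  private long roundDown(long count) {
    return count & ~(pageSize - 1);
  }

  // Returns the new total on success, or -1 if the rounded-up request would
  // exceed the limit; callers then fall back to persistent storage.
  public long reserve(long count) {
    final long needed = roundUp(count);
    while (true) {
      long cur = usedBytes.get();
      long next = cur + needed;
      if (next > maxBytes) {
        return -1;
      }
      if (usedBytes.compareAndSet(cur, next)) {
        return next;
      }
    }
  }

  // roundup = true for whole-block releases (deletion, eviction, abandoned
  // writes); roundup = false when trimming the unused tail of a replica
  // finalized at less than its reserved length.
  public long release(long count, boolean roundup) {
    long freed = roundup ? roundUp(count) : roundDown(count);
    return usedBytes.addAndGet(-freed);
  }

  public long getUsed() {
    return usedBytes.get();
  }
}

Because the same counter also backs centralized cache management, the new tests observe RAM disk reservations indirectly through FsDatasetSpi#getCacheUsed(), as in waitForLockedBytesUsed above.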