From bf1b84abe1a6ee5a740978b9e37b276ed61567b6 Mon Sep 17 00:00:00 2001
From: arp
Date: Sat, 20 Sep 2014 13:25:23 -0700
Subject: [PATCH] HDFS-7100. Make eviction scheme pluggable. (Arpit Agarwal)

Conflicts:
	hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-6581.txt
	hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
---
 .../org/apache/hadoop/hdfs/DFSConfigKeys.java |   3 +
 .../hadoop/hdfs/server/datanode/DataNode.java |   3 +-
 .../fsdataset/impl/BlockPoolSlice.java        |  11 +-
 .../fsdataset/impl/FsDatasetImpl.java         |  97 ++++---
 .../datanode/fsdataset/impl/FsVolumeImpl.java |  14 +-
 .../datanode/fsdataset/impl/FsVolumeList.java |   4 +-
 .../impl/LazyWriteReplicaTracker.java         | 268 ------------------
 .../impl/RamDiskReplicaLruTracker.java        | 208 ++++++++++++++
 .../fsdataset/impl/RamDiskReplicaTracker.java | 245 ++++++++++++++++
 .../fsdataset/impl/FsDatasetTestUtil.java     |   2 +-
 .../fsdataset/impl/TestLazyPersistFiles.java  |  49 ++--
 11 files changed, 557 insertions(+), 347 deletions(-)
 delete mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/LazyWriteReplicaTracker.java
 create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskReplicaLruTracker.java
 create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskReplicaTracker.java

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index f5bba8656e6..b5c7fed0528 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hdfs;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyDefault;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.RamDiskReplicaLruTracker;
 import org.apache.hadoop.hdfs.web.AuthFilter;
 import org.apache.hadoop.http.HttpConfig;
@@ -129,6 +130,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final int DFS_DATANODE_FSDATASETCACHE_MAX_THREADS_PER_VOLUME_DEFAULT = 4;
   public static final String DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC = "dfs.datanode.lazywriter.interval.sec";
   public static final int DFS_DATANODE_LAZY_WRITER_INTERVAL_DEFAULT_SEC = 60;
+  public static final String DFS_DATANODE_RAM_DISK_REPLICA_TRACKER_KEY = "dfs.datanode.ram.disk.replica.tracker";
+  public static final Class<RamDiskReplicaLruTracker> DFS_DATANODE_RAM_DISK_REPLICA_TRACKER_DEFAULT = RamDiskReplicaLruTracker.class;
   public static final String DFS_DATANODE_RAM_DISK_LOW_WATERMARK_PERCENT = "dfs.datanode.ram.disk.low.watermark.percent";
   public static final int DFS_DATANODE_RAM_DISK_LOW_WATERMARK_PERCENT_DEFAULT = 10;
   public static final String DFS_DATANODE_RAM_DISK_LOW_WATERMARK_REPLICAS = "dfs.datanode.ram.disk.low.watermark.replicas";
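The two new keys above make the eviction policy selectable per DataNode. A minimal sketch of selecting a tracker programmatically; Configuration.setClass() is standard Hadoop API, but FifoRamDiskReplicaTracker is a hypothetical example class, since RamDiskReplicaLruTracker is the only implementation this patch ships:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;
    import org.apache.hadoop.hdfs.HdfsConfiguration;

    Configuration conf = new HdfsConfiguration();
    // Select the eviction scheme; the value must name a concrete
    // RamDiskReplicaTracker subclass reachable on the DataNode classpath.
    conf.setClass(DFSConfigKeys.DFS_DATANODE_RAM_DISK_REPLICA_TRACKER_KEY,
        FifoRamDiskReplicaTracker.class, RamDiskReplicaTracker.class);

The equivalent hdfs-site.xml entry would set dfs.datanode.ram.disk.replica.tracker to the fully qualified class name.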
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index bf098995cdf..f10be62aa8a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -2088,7 +2088,8 @@ public class DataNode extends ReconfigurableBase
       LOG.warn("Cannot find BPOfferService for reporting block received for bpid="
           + block.getBlockPoolId());
     }
-    if (blockScanner != null) {
+    FsVolumeSpi volume = getFSDataset().getVolume(block);
+    if (blockScanner != null && !volume.isTransientStorage()) {
       blockScanner.addBlock(block);
     }
   }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
index 06d60b1e52b..bfa1772dd42 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
@@ -292,9 +292,9 @@ class BlockPoolSlice {
    * Move a persisted replica from lazypersist directory to a subdirectory
    * under finalized.
    */
-  File activateSavedReplica(Block b, File blockFile) throws IOException {
+  File activateSavedReplica(Block b, File metaFile, File blockFile)
+      throws IOException {
     final File blockDir = DatanodeUtil.idToBlockDir(finalizedDir, b.getBlockId());
-    final File metaFile = FsDatasetUtil.getMetaFile(blockFile, b.getGenerationStamp());
     final File targetBlockFile = new File(blockDir, blockFile.getName());
     final File targetMetaFile = new File(blockDir, metaFile.getName());
     FileUtils.moveFile(blockFile, targetBlockFile);
@@ -313,7 +313,7 @@ class BlockPoolSlice {
 
   void getVolumeMap(ReplicaMap volumeMap,
-                    final LazyWriteReplicaTracker lazyWriteReplicaMap)
+                    final RamDiskReplicaTracker lazyWriteReplicaMap)
       throws IOException {
     // Recover lazy persist replicas, they will be added to the volumeMap
     // when we scan the finalized directory.
@@ -410,7 +410,7 @@ class BlockPoolSlice {
    * false if the directory has rbw replicas
    */
   void addToReplicasMap(ReplicaMap volumeMap, File dir,
-                        final LazyWriteReplicaTracker lazyWriteReplicaMap,
+                        final RamDiskReplicaTracker lazyWriteReplicaMap,
                         boolean isFinalized)
       throws IOException {
     File files[] = FileUtil.listFiles(dir);
@@ -487,7 +487,8 @@ class BlockPoolSlice {
       // it is in the lazyWriteReplicaMap so it can be persisted
       // eventually.
       if (newReplica.getVolume().isTransientStorage()) {
-        lazyWriteReplicaMap.addReplica(bpid, blockId, newReplica.getVolume());
+        lazyWriteReplicaMap.addReplica(bpid, blockId,
+            (FsVolumeImpl) newReplica.getVolume());
       } else {
         lazyWriteReplicaMap.discardReplica(bpid, blockId, false);
       }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index 83d94a36d19..11322a97dcd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -30,7 +30,6 @@ import java.nio.channels.FileChannel;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -88,6 +87,7 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.RollingLogs;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.RoundRobinVolumeChoosingPolicy;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.VolumeChoosingPolicy;
+import static org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.RamDiskReplicaTracker.RamDiskReplica;
 import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
@@ -159,7 +159,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   @Override // FsDatasetSpi
   public synchronized Block getStoredBlock(String bpid, long blkid)
       throws IOException {
-    File blockfile = getFile(bpid, blkid);
+    File blockfile = getFile(bpid, blkid, false);
     if (blockfile == null) {
       return null;
     }
@@ -219,7 +219,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   private volatile boolean fsRunning;
 
   final ReplicaMap volumeMap;
-  final LazyWriteReplicaTracker lazyWriteReplicaTracker;
+  final RamDiskReplicaTracker ramDiskReplicaTracker;
 
   private static final int MAX_BLOCK_EVICTIONS_PER_ITERATION = 3;
@@ -263,7 +263,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     storageMap = new ConcurrentHashMap<String, DatanodeStorage>();
     volumeMap = new ReplicaMap(this);
-    lazyWriteReplicaTracker = new LazyWriteReplicaTracker(this);
+    ramDiskReplicaTracker = RamDiskReplicaTracker.getInstance(conf, this);
 
     @SuppressWarnings("unchecked")
     final VolumeChoosingPolicy<FsVolumeImpl> blockChooserImpl =
@@ -298,7 +298,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     FsVolumeImpl fsVolume = FsVolumeImplAllocator.createVolume(
         this, sd.getStorageUuid(), dir, this.conf, storageType);
     ReplicaMap tempVolumeMap = new ReplicaMap(this);
-    fsVolume.getVolumeMap(volumeMap, lazyWriteReplicaTracker);
+    fsVolume.getVolumeMap(tempVolumeMap, ramDiskReplicaTracker);
 
     volumeMap.addAll(tempVolumeMap);
     volumes.addVolume(fsVolume);
@@ -326,7 +326,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     for (final String bpid : bpids) {
       try {
         fsVolume.addBlockPool(bpid, this.conf);
-        fsVolume.getVolumeMap(bpid, tempVolumeMap);
+        fsVolume.getVolumeMap(bpid, tempVolumeMap, ramDiskReplicaTracker);
       } catch (IOException e) {
         LOG.warn("Caught exception when adding " + fsVolume + ". Will throw later.", e);
@@ -586,12 +586,16 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
    * checking that it exists. This should be used when the
    * next operation is going to open the file for read anyway,
    * and thus the exists check is redundant.
+   *
+   * @param touch if true then update the last access timestamp of the
+   *              block. Currently used for blocks on transient storage.
    */
-  private File getBlockFileNoExistsCheck(ExtendedBlock b)
+  private File getBlockFileNoExistsCheck(ExtendedBlock b,
+                                         boolean touch)
       throws IOException {
     final File f;
     synchronized(this) {
-      f = getFile(b.getBlockPoolId(), b.getLocalBlock().getBlockId());
+      f = getFile(b.getBlockPoolId(), b.getLocalBlock().getBlockId(), touch);
     }
     if (f == null) {
       throw new IOException("Block " + b + " is not valid");
@@ -602,7 +606,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   @Override // FsDatasetSpi
   public InputStream getBlockInputStream(ExtendedBlock b,
       long seekOffset) throws IOException {
-    File blockFile = getBlockFileNoExistsCheck(b);
+    File blockFile = getBlockFileNoExistsCheck(b, true);
     if (isNativeIOAvailable) {
       return NativeIO.getShareDeleteFileInputStream(blockFile, seekOffset);
     } else {
@@ -1240,7 +1244,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       newReplicaInfo = new FinalizedReplica(replicaInfo, v, dest.getParentFile());
 
       if (v.isTransientStorage()) {
-        lazyWriteReplicaTracker.addReplica(bpid, replicaInfo.getBlockId(), v);
+        ramDiskReplicaTracker.addReplica(bpid, replicaInfo.getBlockId(), v);
       }
     }
     volumeMap.add(bpid, newReplicaInfo);
@@ -1265,7 +1269,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       LOG.warn("Block " + b + " unfinalized and removed. " );
     }
     if (replicaInfo.getVolume().isTransientStorage()) {
-      lazyWriteReplicaTracker.discardReplica(b.getBlockPoolId(), b.getBlockId(), true);
+      ramDiskReplicaTracker.discardReplica(b.getBlockPoolId(), b.getBlockId(), true);
     }
   }
 }
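Note the calling convention the two hunks above establish: metadata-only lookups pass touch=false so they do not perturb the eviction order, while getBlockInputStream() passes touch=true so every client read registers with the tracker. A condensed illustration, with method names as in the hunks and bodies elided:

    File f = getFile(bpid, blockId, false);           // getStoredBlock(): no recency update
    File g = getBlockFileNoExistsCheck(block, true);  // getBlockInputStream(): marks the
                                                      // replica as recently used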
@@ -1411,7 +1415,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     //Should we check for metadata file too?
     final File f;
     synchronized(this) {
-      f = getFile(bpid, blockId);
+      f = getFile(bpid, blockId, false);
     }
 
     if(f != null ) {
@@ -1496,7 +1500,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       }
 
       if (v.isTransientStorage()) {
-        lazyWriteReplicaTracker.discardReplica(bpid, invalidBlks[i].getBlockId(), true);
+        ramDiskReplicaTracker.discardReplica(bpid, invalidBlks[i].getBlockId(), true);
       }
 
       // If a DFSClient has the replica in its cache of short-circuit file
@@ -1628,7 +1632,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   @Override // FsDatasetSpi
   public synchronized boolean contains(final ExtendedBlock block) {
     final long blockId = block.getLocalBlock().getBlockId();
-    return getFile(block.getBlockPoolId(), blockId) != null;
+    return getFile(block.getBlockPoolId(), blockId, false) != null;
   }
 
   /**
@@ -1637,9 +1641,12 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
    * @param blockId a block's id
    * @return on disk data file path; null if the replica does not exist
    */
-  File getFile(final String bpid, final long blockId) {
+  File getFile(final String bpid, final long blockId, boolean touch) {
     ReplicaInfo info = volumeMap.get(bpid, blockId);
     if (info != null) {
+      if (touch && info.getVolume().isTransientStorage()) {
+        ramDiskReplicaTracker.touch(bpid, blockId);
+      }
       return info.getBlockFile();
     }
     return null;
@@ -1808,7 +1815,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       blockScanner.deleteBlock(bpid, new Block(blockId));
     }
     if (vol.isTransientStorage()) {
-      lazyWriteReplicaTracker.discardReplica(bpid, blockId, true);
+      ramDiskReplicaTracker.discardReplica(bpid, blockId, true);
     }
     LOG.warn("Removed block " + blockId
         + " from memory with missing block file on the disk");
@@ -1830,11 +1837,12 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
         diskFile.length(), diskGS, vol, diskFile.getParentFile());
     volumeMap.add(bpid, diskBlockInfo);
     final DataBlockScanner blockScanner = datanode.getBlockScanner();
-    if (blockScanner != null) {
-      blockScanner.addBlock(new ExtendedBlock(bpid, diskBlockInfo));
-    }
-    if (vol.isTransientStorage()) {
-      lazyWriteReplicaTracker.addReplica(bpid, blockId, (FsVolumeImpl) vol);
+    if (!vol.isTransientStorage()) {
+      if (blockScanner != null) {
+        blockScanner.addBlock(new ExtendedBlock(bpid, diskBlockInfo));
+      }
+    } else {
+      ramDiskReplicaTracker.addReplica(bpid, blockId, (FsVolumeImpl) vol);
     }
     LOG.warn("Added missing block to memory " + diskBlockInfo);
     return;
@@ -2117,7 +2125,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       volumes.addBlockPool(bpid, conf);
       volumeMap.initBlockPool(bpid);
     }
-    volumes.getAllVolumesMap(bpid, volumeMap, lazyWriteReplicaTracker);
+    volumes.getAllVolumesMap(bpid, volumeMap, ramDiskReplicaTracker);
   }
 
   @Override
@@ -2347,7 +2355,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
           LOG.debug("LazyWriter starting to save blockId=" + blockId + "; bpid=" + bpid);
         }
-        lazyWriteReplicaTracker.recordStartLazyPersist(bpid, blockId, targetVolume);
+        ramDiskReplicaTracker.recordStartLazyPersist(bpid, blockId, targetVolume);
         bpSlice = targetVolume.getBlockPoolSlice(bpid);
         srcMeta = replicaInfo.getMetaFile();
         srcFile = replicaInfo.getBlockFile();
@@ -2359,7 +2367,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
           bpSlice.lazyPersistReplica(blockId, genStamp, srcMeta, srcFile);
 
       synchronized (FsDatasetImpl.this) {
-        lazyWriteReplicaTracker.recordEndLazyPersist(bpid, blockId, savedFiles);
+        ramDiskReplicaTracker.recordEndLazyPersist(bpid, blockId, savedFiles);
 
         if (LOG.isDebugEnabled()) {
           LOG.debug("LazyWriter finished saving blockId=" + blockId + "; bpid=" + bpid +
@@ -2374,21 +2382,21 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
      * @return true if there is more work to be done, false otherwise.
      */
     private boolean saveNextReplica() {
-      LazyWriteReplicaTracker.ReplicaState replicaState = null;
+      RamDiskReplica block = null;
       boolean succeeded = false;
 
       try {
-        replicaState = lazyWriteReplicaTracker.dequeueNextReplicaToPersist();
-        if (replicaState != null) {
-          moveReplicaToNewVolume(replicaState.bpid, replicaState.blockId);
+        block = ramDiskReplicaTracker.dequeueNextReplicaToPersist();
+        if (block != null) {
+          moveReplicaToNewVolume(block.getBlockPoolId(), block.getBlockId());
         }
         succeeded = true;
       } catch(IOException ioe) {
-        LOG.warn("Exception saving replica " + replicaState, ioe);
+        LOG.warn("Exception saving replica " + block, ioe);
       } finally {
-        if (!succeeded && replicaState != null) {
-          LOG.warn("Failed to save replica " + replicaState + ". re-enqueueing it.");
-          lazyWriteReplicaTracker.reenqueueReplicaNotPersisted(replicaState);
+        if (!succeeded && block != null) {
+          LOG.warn("Failed to save replica " + block + ". Re-enqueueing it.");
+          ramDiskReplicaTracker.reenqueueReplicaNotPersisted(block);
         }
       }
@@ -2426,8 +2434,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
 
       while (iterations++ < MAX_BLOCK_EVICTIONS_PER_ITERATION &&
              transientFreeSpaceBelowThreshold()) {
-        LazyWriteReplicaTracker.ReplicaState replicaState =
-            lazyWriteReplicaTracker.getNextCandidateForEviction();
+        RamDiskReplica replicaState = ramDiskReplicaTracker.getNextCandidateForEviction();
         if (replicaState == null) {
           break;
@@ -2440,46 +2447,48 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
         ReplicaInfo replicaInfo, newReplicaInfo;
         File blockFile, metaFile;
         long blockFileUsed, metaFileUsed;
+        final String bpid = replicaState.getBlockPoolId();
 
         synchronized (FsDatasetImpl.this) {
-          replicaInfo = getReplicaInfo(replicaState.bpid, replicaState.blockId);
+          replicaInfo = getReplicaInfo(replicaState.getBlockPoolId(), replicaState.getBlockId());
           Preconditions.checkState(replicaInfo.getVolume().isTransientStorage());
           blockFile = replicaInfo.getBlockFile();
           metaFile = replicaInfo.getMetaFile();
           blockFileUsed = blockFile.length();
           metaFileUsed = metaFile.length();
-          lazyWriteReplicaTracker.discardReplica(replicaState, false);
+          ramDiskReplicaTracker.discardReplica(replicaState, false);
 
           // Move the replica from lazyPersist/ to finalized/ on target volume
           BlockPoolSlice bpSlice =
-              replicaState.lazyPersistVolume.getBlockPoolSlice(replicaState.bpid);
+              replicaState.getLazyPersistVolume().getBlockPoolSlice(bpid);
           File newBlockFile = bpSlice.activateSavedReplica(
-              replicaInfo, replicaState.savedBlockFile);
+              replicaInfo, replicaState.getSavedMetaFile(),
+              replicaState.getSavedBlockFile());
 
           newReplicaInfo =
               new FinalizedReplica(replicaInfo.getBlockId(),
                                    replicaInfo.getBytesOnDisk(),
                                    replicaInfo.getGenerationStamp(),
-                                   replicaState.lazyPersistVolume,
+                                   replicaState.getLazyPersistVolume(),
                                    newBlockFile.getParentFile());
 
           // Update the volumeMap entry.
-          volumeMap.add(replicaState.bpid, newReplicaInfo);
+          volumeMap.add(bpid, newReplicaInfo);
         }
 
         // Before deleting the files from transient storage we must notify the
         // NN that the files are on the new storage. Else a blockReport from
         // the transient storage might cause the NN to think the blocks are lost.
        ExtendedBlock extendedBlock =
-            new ExtendedBlock(replicaState.bpid, newReplicaInfo);
+            new ExtendedBlock(bpid, newReplicaInfo);
         datanode.notifyNamenodeReceivedBlock(
             extendedBlock, null, newReplicaInfo.getStorageUuid());
 
         // Remove the old replicas from transient storage.
         if (blockFile.delete() || !blockFile.exists()) {
-          ((FsVolumeImpl) replicaInfo.getVolume()).decDfsUsed(replicaState.bpid, blockFileUsed);
+          ((FsVolumeImpl) replicaInfo.getVolume()).decDfsUsed(bpid, blockFileUsed);
           if (metaFile.delete() || !metaFile.exists()) {
-            ((FsVolumeImpl) replicaInfo.getVolume()).decDfsUsed(replicaState.bpid, metaFileUsed);
+            ((FsVolumeImpl) replicaInfo.getVolume()).decDfsUsed(bpid, metaFileUsed);
           }
         }
@@ -2500,7 +2509,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
           // Sleep if we have no more work to do or if it looks like we are not
           // making any forward progress. This is to ensure that if all persist
           // operations are failing we don't keep retrying them in a tight loop.
-          if (numSuccessiveFailures >= lazyWriteReplicaTracker.numReplicasNotPersisted()) {
+          if (numSuccessiveFailures >= ramDiskReplicaTracker.numReplicasNotPersisted()) {
             Thread.sleep(checkpointerInterval * 1000);
             numSuccessiveFailures = 0;
           }
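Eviction stays demand-driven: at most MAX_BLOCK_EVICTIONS_PER_ITERATION (3) replicas go per pass, and only while transientFreeSpaceBelowThreshold() holds. The threshold itself is not part of this patch's hunks; the following is a plausible sketch based on the two dfs.datanode.ram.disk.low.watermark.* keys added to DFSConfigKeys, so treat the formula as an assumption rather than the shipped code:

    // Hypothetical check: free RAM disk space must stay above the larger of a
    // percentage of total capacity and a fixed number of block-sized replicas.
    static boolean freeSpaceBelowWatermark(long capacityBytes, long freeBytes,
        int watermarkPercent, int watermarkReplicas, long blockSize) {
      long percentFloor = capacityBytes * watermarkPercent / 100;
      long replicaFloor = watermarkReplicas * blockSize;
      return freeBytes < Math.max(percentFloor, replicaFloor);
    }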
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
index e2efb2ff2c3..098ad091b44 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
@@ -235,9 +235,6 @@ public class FsVolumeImpl implements FsVolumeSpi {
   @Override
   public void reserveSpaceForRbw(long bytesToReserve) {
     if (bytesToReserve != 0) {
-      if (FsDatasetImpl.LOG.isDebugEnabled()) {
-        FsDatasetImpl.LOG.debug("Reserving " + bytesToReserve + " on volume " + getBasePath());
-      }
       reservedForRbw.addAndGet(bytesToReserve);
     }
   }
@@ -245,9 +242,6 @@ public class FsVolumeImpl implements FsVolumeSpi {
   @Override
   public void releaseReservedSpace(long bytesToRelease) {
     if (bytesToRelease != 0) {
-      if (FsDatasetImpl.LOG.isDebugEnabled()) {
-        FsDatasetImpl.LOG.debug("Releasing " + bytesToRelease + " on volume " + getBasePath());
-      }
 
       long oldReservation, newReservation;
       do {
@@ -298,17 +292,17 @@ public class FsVolumeImpl implements FsVolumeSpi {
   }
 
   void getVolumeMap(ReplicaMap volumeMap,
-                    final LazyWriteReplicaTracker lazyWriteReplicaMap)
+                    final RamDiskReplicaTracker ramDiskReplicaMap)
       throws IOException {
     for(BlockPoolSlice s : bpSlices.values()) {
-      s.getVolumeMap(volumeMap, lazyWriteReplicaMap);
+      s.getVolumeMap(volumeMap, ramDiskReplicaMap);
     }
   }
 
   void getVolumeMap(String bpid, ReplicaMap volumeMap,
-                    final LazyWriteReplicaTracker lazyWriteReplicaMap)
+                    final RamDiskReplicaTracker ramDiskReplicaMap)
       throws IOException {
-    getBlockPoolSlice(bpid).getVolumeMap(volumeMap, lazyWriteReplicaMap);
+    getBlockPoolSlice(bpid).getVolumeMap(volumeMap, ramDiskReplicaMap);
   }
 
   @Override
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
index 82fe35f04b5..837ddf720af 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
@@ -121,7 +121,7 @@ class FsVolumeList {
 
   void getAllVolumesMap(final String bpid,
                         final ReplicaMap volumeMap,
-                        final LazyWriteReplicaTracker lazyWriteReplicaMap)
+                        final RamDiskReplicaTracker ramDiskReplicaMap)
       throws IOException {
     long totalStartTime = Time.monotonicNow();
     final List<IOException> exceptions = Collections.synchronizedList(
@@ -134,7 +134,7 @@ class FsVolumeList {
           FsDatasetImpl.LOG.info("Adding replicas to map for block pool " + bpid +
               " on volume " + v + "...");
           long startTime = Time.monotonicNow();
-          v.getVolumeMap(bpid, volumeMap, lazyWriteReplicaMap);
+          v.getVolumeMap(bpid, volumeMap, ramDiskReplicaMap);
           long timeTaken = Time.monotonicNow() - startTime;
           FsDatasetImpl.LOG.info("Time to add replicas to map for block pool"
               + " " + bpid + " on volume " + v + ": " + timeTaken + "ms");
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/LazyWriteReplicaTracker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/LazyWriteReplicaTracker.java
deleted file mode 100644
index e8d9c5c63ee..00000000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/LazyWriteReplicaTracker.java
+++ /dev/null
@@ -1,268 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
-
-
-import com.google.common.collect.TreeMultimap;
-import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
-
-import java.io.File;
-import java.util.*;
-
-class LazyWriteReplicaTracker {
-
-  enum State {
-    IN_MEMORY,
-    LAZY_PERSIST_IN_PROGRESS,
-    LAZY_PERSIST_COMPLETE,
-  }
-
-  static class ReplicaState implements Comparable<ReplicaState> {
-
-    final String bpid;
-    final long blockId;
-    State state;
-
-    /**
-     * transient storage volume that holds the original replica.
-     */
-    final FsVolumeSpi transientVolume;
-
-    /**
-     * Persistent volume that holds or will hold the saved replica.
-     */
-    FsVolumeImpl lazyPersistVolume;
-    File savedMetaFile;
-    File savedBlockFile;
-
-    ReplicaState(final String bpid, final long blockId, FsVolumeSpi transientVolume) {
-      this.bpid = bpid;
-      this.blockId = blockId;
-      this.transientVolume = transientVolume;
-      state = State.IN_MEMORY;
-      lazyPersistVolume = null;
-      savedMetaFile = null;
-      savedBlockFile = null;
-    }
-
-    void deleteSavedFiles() {
-      try {
-        if (savedBlockFile != null) {
-          savedBlockFile.delete();
-          savedBlockFile = null;
-        }
-
-        if (savedMetaFile != null) {
-          savedMetaFile.delete();
-          savedMetaFile = null;
-        }
-      } catch (Throwable t) {
-        // Ignore any exceptions.
-      }
-    }
-
-    @Override
-    public String toString() {
-      return "[Bpid=" + bpid + ";blockId=" + blockId + "]";
-    }
-
-    @Override
-    public int hashCode() {
-      return bpid.hashCode() ^ (int) blockId;
-    }
-
-    @Override
-    public boolean equals(Object other) {
-      if (this == other) {
-        return true;
-      }
-
-      if (other == null || getClass() != other.getClass()) {
-        return false;
-      }
-
-      ReplicaState otherState = (ReplicaState) other;
-      return (otherState.bpid.equals(bpid) && otherState.blockId == blockId);
-    }
-
-    @Override
-    public int compareTo(ReplicaState other) {
-      if (blockId == other.blockId) {
-        return 0;
-      } else if (blockId < other.blockId) {
-        return -1;
-      } else {
-        return 1;
-      }
-    }
-  }
-
-  final FsDatasetImpl fsDataset;
-
-  /**
-   * Map of blockpool ID to map of blockID to ReplicaInfo.
-   */
-  final Map<String, Map<Long, ReplicaState>> replicaMaps;
-
-  /**
-   * Queue of replicas that need to be written to disk.
-   * Stale entries are GC'd by dequeueNextReplicaToPersist.
-   */
-  final Queue<ReplicaState> replicasNotPersisted;
-
-  /**
-   * Queue of replicas in the order in which they were persisted.
-   * We'll dequeue them in the same order.
-   * We can improve the eviction scheme later.
-   * Stale entries are GC'd by getNextCandidateForEviction.
-   */
-  final Queue<ReplicaState> replicasPersisted;
-
-  LazyWriteReplicaTracker(final FsDatasetImpl fsDataset) {
-    this.fsDataset = fsDataset;
-    replicaMaps = new HashMap<String, Map<Long, ReplicaState>>();
-    replicasNotPersisted = new LinkedList<ReplicaState>();
-    replicasPersisted = new LinkedList<ReplicaState>();
-  }
-
-  synchronized void addReplica(String bpid, long blockId,
-                               final FsVolumeSpi transientVolume) {
-    Map<Long, ReplicaState> map = replicaMaps.get(bpid);
-    if (map == null) {
-      map = new HashMap<Long, ReplicaState>();
-      replicaMaps.put(bpid, map);
-    }
-    ReplicaState replicaState = new ReplicaState(bpid, blockId, transientVolume);
-    map.put(blockId, replicaState);
-    replicasNotPersisted.add(replicaState);
-  }
-
-  synchronized void recordStartLazyPersist(
-      final String bpid, final long blockId, FsVolumeImpl checkpointVolume) {
-    Map<Long, ReplicaState> map = replicaMaps.get(bpid);
-    ReplicaState replicaState = map.get(blockId);
-    replicaState.state = State.LAZY_PERSIST_IN_PROGRESS;
-    replicaState.lazyPersistVolume = checkpointVolume;
-  }
-
-  /**
-   * @param bpid
-   * @param blockId
-   * @param savedFiles The saved meta and block files, in that order.
-   */
-  synchronized void recordEndLazyPersist(
-      final String bpid, final long blockId, final File[] savedFiles) {
-    Map<Long, ReplicaState> map = replicaMaps.get(bpid);
-    ReplicaState replicaState = map.get(blockId);
-
-    if (replicaState == null) {
-      throw new IllegalStateException("Unknown replica bpid=" +
-          bpid + "; blockId=" + blockId);
-    }
-    replicaState.state = State.LAZY_PERSIST_COMPLETE;
-    replicaState.savedMetaFile = savedFiles[0];
-    replicaState.savedBlockFile = savedFiles[1];
-
-    if (replicasNotPersisted.peek() == replicaState) {
-      // Common case.
-      replicasNotPersisted.remove();
-    } else {
-      // Should never occur in practice as lazy writer always persists
-      // the replica at the head of the queue before moving to the next
-      // one.
-      replicasNotPersisted.remove(replicaState);
-    }
-
-    replicasPersisted.add(replicaState);
-  }
-
-  synchronized ReplicaState dequeueNextReplicaToPersist() {
-    while (replicasNotPersisted.size() != 0) {
-      ReplicaState replicaState = replicasNotPersisted.remove();
-      Map<Long, ReplicaState> replicaMap = replicaMaps.get(replicaState.bpid);
-
-      if (replicaMap != null && replicaMap.get(replicaState.blockId) != null) {
-        return replicaState;
-      }
-
-      // The replica no longer exists, look for the next one.
-    }
-    return null;
-  }
-
-  synchronized void reenqueueReplicaNotPersisted(final ReplicaState replicaState) {
-    replicasNotPersisted.add(replicaState);
-  }
-
-  synchronized void reenqueueReplicaPersisted(final ReplicaState replicaState) {
-    replicasPersisted.add(replicaState);
-  }
-
-  synchronized int numReplicasNotPersisted() {
-    return replicasNotPersisted.size();
-  }
-
-  synchronized ReplicaState getNextCandidateForEviction() {
-    while (replicasPersisted.size() != 0) {
-      ReplicaState replicaState = replicasPersisted.remove();
-      Map<Long, ReplicaState> replicaMap = replicaMaps.get(replicaState.bpid);
-
-      if (replicaMap != null && replicaMap.get(replicaState.blockId) != null) {
-        return replicaState;
-      }
-
-      // The replica no longer exists, look for the next one.
-    }
-    return null;
-  }
-
-  void discardReplica(ReplicaState replicaState, boolean deleteSavedCopies) {
-    discardReplica(replicaState.bpid, replicaState.blockId, deleteSavedCopies);
-  }
-
-  /**
-   * Discard any state we are tracking for the given replica. This could mean
-   * the block is either deleted from the block space or the replica is no longer
-   * on transient storage.
-   *
-   * @param deleteSavedCopies true if we should delete the saved copies on
-   *                          persistent storage. This should be set by the
-   *                          caller when the block is no longer needed.
-   */
-  synchronized void discardReplica(
-      final String bpid, final long blockId,
-      boolean deleteSavedCopies) {
-    Map<Long, ReplicaState> map = replicaMaps.get(bpid);
-
-    if (map == null) {
-      return;
-    }
-
-    ReplicaState replicaState = map.get(blockId);
-
-    if (replicaState == null) {
-      return;
-    }
-
-    if (deleteSavedCopies) {
-      replicaState.deleteSavedFiles();
-    }
-    map.remove(blockId);
-  }
-}
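The deleted tracker above evicted in persist order, a plain FIFO queue. Its replacement, which follows, keys each persisted replica by its last-use timestamp in a Guava TreeMultimap, so iterating values() yields least-recently-used first. Here is the pattern in isolation, with String standing in for the replica type; TreeMultimap.create() requires Comparable keys and values, which is why RamDiskReplica implements Comparable:

    import com.google.common.collect.TreeMultimap;

    TreeMultimap<Long, String> byLastUse = TreeMultimap.create();
    byLastUse.put(1000L, "replica-A");
    byLastUse.put(2000L, "replica-B");

    // touch: re-key the entry under the current time. A multimap is needed
    // because two replicas can share the same timestamp.
    if (byLastUse.remove(1000L, "replica-A")) {
      byLastUse.put(System.currentTimeMillis(), "replica-A");
    }

    // Eviction candidate: the first value in key (timestamp) order.
    String victim = byLastUse.values().iterator().next();  // "replica-B"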
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskReplicaLruTracker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskReplicaLruTracker.java
new file mode 100644
index 00000000000..0899e703a96
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskReplicaLruTracker.java
@@ -0,0 +1,208 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
+
+
+import com.google.common.collect.TreeMultimap;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import java.io.File;
+import java.util.*;
+
+/**
+ * An implementation of RamDiskReplicaTracker that uses an LRU
+ * eviction scheme.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class RamDiskReplicaLruTracker extends RamDiskReplicaTracker {
+
+  private class RamDiskReplicaLru extends RamDiskReplica {
+    long lastUsedTime;
+
+    private RamDiskReplicaLru(String bpid, long blockId, FsVolumeImpl ramDiskVolume) {
+      super(bpid, blockId, ramDiskVolume);
+    }
+  }
+
+  /**
+   * Map of blockpool ID to <map of blockID to RamDiskReplicaLru>.
+   */
+  Map<String, Map<Long, RamDiskReplicaLru>> replicaMaps;
+
+  /**
+   * Queue of replicas that need to be written to disk.
+   * Stale entries are GC'd by dequeueNextReplicaToPersist.
+   */
+  Queue<RamDiskReplicaLru> replicasNotPersisted;
+
+  /**
+   * Map of persisted replicas ordered by their last use times.
+   */
+  TreeMultimap<Long, RamDiskReplicaLru> replicasPersisted;
+
+  RamDiskReplicaLruTracker() {
+    replicaMaps = new HashMap<String, Map<Long, RamDiskReplicaLru>>();
+    replicasNotPersisted = new LinkedList<RamDiskReplicaLru>();
+    replicasPersisted = TreeMultimap.create();
+  }
+
+  @Override
+  synchronized void addReplica(final String bpid, final long blockId,
+                               final FsVolumeImpl transientVolume) {
+    Map<Long, RamDiskReplicaLru> map = replicaMaps.get(bpid);
+    if (map == null) {
+      map = new HashMap<Long, RamDiskReplicaLru>();
+      replicaMaps.put(bpid, map);
+    }
+    RamDiskReplicaLru ramDiskReplicaLru = new RamDiskReplicaLru(bpid, blockId, transientVolume);
+    map.put(blockId, ramDiskReplicaLru);
+    replicasNotPersisted.add(ramDiskReplicaLru);
+  }
+
+  @Override
+  synchronized void touch(final String bpid,
+                          final long blockId) {
+    Map<Long, RamDiskReplicaLru> map = replicaMaps.get(bpid);
+    RamDiskReplicaLru ramDiskReplicaLru = map.get(blockId);
+
+    if (ramDiskReplicaLru == null) {
+      return;
+    }
+
+    // Reinsert the replica with its new timestamp.
+    if (replicasPersisted.remove(ramDiskReplicaLru.lastUsedTime, ramDiskReplicaLru)) {
+      ramDiskReplicaLru.lastUsedTime = System.currentTimeMillis();
+      replicasPersisted.put(ramDiskReplicaLru.lastUsedTime, ramDiskReplicaLru);
+    }
+  }
+
+  @Override
+  synchronized void recordStartLazyPersist(
+      final String bpid, final long blockId, FsVolumeImpl checkpointVolume) {
+    Map<Long, RamDiskReplicaLru> map = replicaMaps.get(bpid);
+    RamDiskReplicaLru ramDiskReplicaLru = map.get(blockId);
+    ramDiskReplicaLru.setLazyPersistVolume(checkpointVolume);
+  }
+
+  @Override
+  synchronized void recordEndLazyPersist(
+      final String bpid, final long blockId, final File[] savedFiles) {
+    Map<Long, RamDiskReplicaLru> map = replicaMaps.get(bpid);
+    RamDiskReplicaLru ramDiskReplicaLru = map.get(blockId);
+
+    if (ramDiskReplicaLru == null) {
+      throw new IllegalStateException("Unknown replica bpid=" +
+          bpid + "; blockId=" + blockId);
+    }
+    ramDiskReplicaLru.recordSavedBlockFiles(savedFiles);
+
+    if (replicasNotPersisted.peek() == ramDiskReplicaLru) {
+      // Common case.
+      replicasNotPersisted.remove();
+    } else {
+      // Caller error? Fallback to O(n) removal.
+      replicasNotPersisted.remove(ramDiskReplicaLru);
+    }
+
+    ramDiskReplicaLru.lastUsedTime = System.currentTimeMillis();
+    replicasPersisted.put(ramDiskReplicaLru.lastUsedTime, ramDiskReplicaLru);
+  }
+
+  @Override
+  synchronized RamDiskReplicaLru dequeueNextReplicaToPersist() {
+    while (replicasNotPersisted.size() != 0) {
+      RamDiskReplicaLru ramDiskReplicaLru = replicasNotPersisted.remove();
+      Map<Long, RamDiskReplicaLru> replicaMap =
+          replicaMaps.get(ramDiskReplicaLru.getBlockPoolId());
+
+      if (replicaMap != null && replicaMap.get(ramDiskReplicaLru.getBlockId()) != null) {
+        return ramDiskReplicaLru;
+      }
+
+      // The replica no longer exists, look for the next one.
+    }
+    return null;
+  }
+
+  @Override
+  synchronized void reenqueueReplicaNotPersisted(final RamDiskReplica ramDiskReplicaLru) {
+    replicasNotPersisted.add((RamDiskReplicaLru) ramDiskReplicaLru);
+  }
+
+  @Override
+  synchronized int numReplicasNotPersisted() {
+    return replicasNotPersisted.size();
+  }
+
+  @Override
+  synchronized RamDiskReplicaLru getNextCandidateForEviction() {
+    Iterator<RamDiskReplicaLru> it = replicasPersisted.values().iterator();
+    while (it.hasNext()) {
+      RamDiskReplicaLru ramDiskReplicaLru = (RamDiskReplicaLru) it.next();
+      it.remove();
+
+      Map<Long, RamDiskReplicaLru> replicaMap =
+          replicaMaps.get(ramDiskReplicaLru.getBlockPoolId());
+
+      if (replicaMap != null && replicaMap.get(ramDiskReplicaLru.getBlockId()) != null) {
+        return ramDiskReplicaLru;
+      }
+
+      // The replica no longer exists, look for the next one.
+    }
+    return null;
+  }
+
+  /**
+   * Discard any state we are tracking for the given replica. This could mean
+   * the block is either deleted from the block space or the replica is no longer
+   * on transient storage.
+   *
+   * @param deleteSavedCopies true if we should delete the saved copies on
+   *                          persistent storage. This should be set by the
+   *                          caller when the block is no longer needed.
+   */
+  @Override
+  synchronized void discardReplica(
+      final String bpid, final long blockId,
+      boolean deleteSavedCopies) {
+    Map<Long, RamDiskReplicaLru> map = replicaMaps.get(bpid);
+
+    if (map == null) {
+      return;
+    }
+
+    RamDiskReplicaLru ramDiskReplicaLru = map.get(blockId);
+
+    if (ramDiskReplicaLru == null) {
+      return;
+    }
+
+    if (deleteSavedCopies) {
+      ramDiskReplicaLru.deleteSavedFiles();
+    }
+
+    map.remove(blockId);
+    replicasPersisted.remove(ramDiskReplicaLru.lastUsedTime, ramDiskReplicaLru);
+
+    // replicasNotPersisted will be lazily GC'ed.
+  }
+}
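Taken together with the FsDatasetImpl changes earlier in the patch, the tracker sees a fixed call sequence for each lazy-persist replica. A condensed walk-through against the abstract API defined in RamDiskReplicaTracker.java below; all variables here are placeholders, not code from the patch:

    RamDiskReplicaTracker tracker = RamDiskReplicaTracker.getInstance(conf, fsDataset);
    tracker.addReplica(bpid, blockId, ramDiskVolume);          // finalized on RAM disk
    RamDiskReplica r = tracker.dequeueNextReplicaToPersist();  // lazy writer picks it up
    tracker.recordStartLazyPersist(bpid, blockId, diskVolume);
    tracker.recordEndLazyPersist(bpid, blockId, new File[] { metaFile, blockFile });
    tracker.touch(bpid, blockId);                              // client read bumps recency
    RamDiskReplica victim = tracker.getNextCandidateForEviction();
    tracker.discardReplica(victim, false);                     // false: keep saved copies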
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskReplicaTracker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskReplicaTracker.java
new file mode 100644
index 00000000000..03fc0680274
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskReplicaTracker.java
@@ -0,0 +1,245 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
+import org.apache.hadoop.util.ReflectionUtils;
+
+import java.io.File;
+
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public abstract class RamDiskReplicaTracker {
+
+  FsDatasetImpl fsDataset;
+
+  static class RamDiskReplica implements Comparable<RamDiskReplica> {
+    private final String bpid;
+    private final long blockId;
+    private File savedBlockFile;
+    private File savedMetaFile;
+
+    /**
+     * RAM_DISK volume that holds the original replica.
+     */
+    final FsVolumeSpi ramDiskVolume;
+
+    /**
+     * Persistent volume that holds or will hold the saved replica.
+     */
+    FsVolumeImpl lazyPersistVolume;
+
+    RamDiskReplica(final String bpid, final long blockId,
+                   final FsVolumeImpl ramDiskVolume) {
+      this.bpid = bpid;
+      this.blockId = blockId;
+      this.ramDiskVolume = ramDiskVolume;
+      lazyPersistVolume = null;
+      savedMetaFile = null;
+      savedBlockFile = null;
+    }
+
+    long getBlockId() {
+      return blockId;
+    }
+
+    String getBlockPoolId() {
+      return bpid;
+    }
+
+    FsVolumeImpl getLazyPersistVolume() {
+      return lazyPersistVolume;
+    }
+
+    void setLazyPersistVolume(FsVolumeImpl volume) {
+      Preconditions.checkState(!volume.isTransientStorage());
+      this.lazyPersistVolume = volume;
+    }
+
+    File getSavedBlockFile() {
+      return savedBlockFile;
+    }
+
+    File getSavedMetaFile() {
+      return savedMetaFile;
+    }
+
+    /**
+     * Record the saved meta and block files on the given volume.
+     *
+     * @param files Meta and block files, in that order.
+     */
+    void recordSavedBlockFiles(File[] files) {
+      this.savedMetaFile = files[0];
+      this.savedBlockFile = files[1];
+    }
+
+    @Override
+    public int hashCode() {
+      return bpid.hashCode() ^ (int) blockId;
+    }
+
+    @Override
+    public boolean equals(Object other) {
+      if (this == other) {
+        return true;
+      }
+
+      if (other == null || getClass() != other.getClass()) {
+        return false;
+      }
+
+      RamDiskReplica otherState = (RamDiskReplica) other;
+      return (otherState.bpid.equals(bpid) && otherState.blockId == blockId);
+    }
+
+    // Delete the saved meta and block files. Failure to delete can be
+    // ignored, the directory scanner will retry the deletion later.
+    void deleteSavedFiles() {
+      try {
+        if (savedBlockFile != null) {
+          savedBlockFile.delete();
+          savedBlockFile = null;
+        }
+
+        if (savedMetaFile != null) {
+          savedMetaFile.delete();
+          savedMetaFile = null;
+        }
+      } catch (Throwable t) {
+        // Ignore any exceptions.
+      }
+    }
+
+    @Override
+    public int compareTo(RamDiskReplica other) {
+      int bpidResult = bpid.compareTo(other.bpid);
+      if (bpidResult == 0) {
+        if (blockId == other.blockId) {
+          return 0;
+        } else if (blockId < other.blockId) {
+          return -1;
+        } else {
+          return 1;
+        }
+      }
+      return bpidResult;
+    }
+
+    @Override
+    public String toString() {
+      return "[BlockPoolID=" + bpid + "; BlockId=" + blockId + "]";
+    }
+  }
+
+  /**
+   * Get an instance of the configured RamDiskReplicaTracker based on the
+   * configuration property
+   * {@link org.apache.hadoop.hdfs.DFSConfigKeys#DFS_DATANODE_RAM_DISK_REPLICA_TRACKER_KEY}.
+   *
+   * @param conf the configuration to be used
+   * @param fsDataset the FsDatasetImpl object.
+   * @return an instance of RamDiskReplicaTracker
+   */
+  static RamDiskReplicaTracker getInstance(final Configuration conf,
+                                           final FsDatasetImpl fsDataset) {
+    final Class<? extends RamDiskReplicaTracker> trackerClass = conf.getClass(
+        DFSConfigKeys.DFS_DATANODE_RAM_DISK_REPLICA_TRACKER_KEY,
+        DFSConfigKeys.DFS_DATANODE_RAM_DISK_REPLICA_TRACKER_DEFAULT,
+        RamDiskReplicaTracker.class);
+    final RamDiskReplicaTracker tracker = ReflectionUtils.newInstance(
+        trackerClass, conf);
+    tracker.initialize(fsDataset);
+    return tracker;
+  }
+
+  void initialize(final FsDatasetImpl fsDataset) {
+    this.fsDataset = fsDataset;
+  }
+
+  /**
+   * Start tracking a new finalized replica on RAM disk.
+   *
+   * @param transientVolume RAM disk volume that stores the replica.
+   */
+  abstract void addReplica(final String bpid, final long blockId,
+                           final FsVolumeImpl transientVolume);
+
+  /**
+   * Invoked when a replica is opened by a client. This may be used as
+   * a heuristic by the eviction scheme.
+   */
+  abstract void touch(final String bpid, final long blockId);
+
+  /**
+   * Get the next replica to write to persistent storage.
+   */
+  abstract RamDiskReplica dequeueNextReplicaToPersist();
+
+  /**
+   * Invoked if a replica that was previously dequeued for persistence
+   * could not be successfully persisted. Add it back so it can be retried
+   * later.
+   */
+  abstract void reenqueueReplicaNotPersisted(
+      final RamDiskReplica ramDiskReplica);
+
+  /**
+   * Invoked when the Lazy persist operation is started by the DataNode.
+   *
+   * @param checkpointVolume the persistent volume that will hold the
+   *                         saved replica.
+   */
+  abstract void recordStartLazyPersist(
+      final String bpid, final long blockId, FsVolumeImpl checkpointVolume);
+
+  /**
+   * Invoked when the Lazy persist operation is complete.
+   *
+   * @param savedFiles The saved meta and block files, in that order.
+   */
+  abstract void recordEndLazyPersist(
+      final String bpid, final long blockId, final File[] savedFiles);
+
+  /**
+   * Return a candidate replica to remove from RAM Disk. The exact replica
+   * to be returned may depend on the eviction scheme utilized.
+   *
+   * @return the replica to evict, or null if no candidate is available.
+   */
+  abstract RamDiskReplica getNextCandidateForEviction();
+
+  /**
+   * Return the number of replicas pending persistence to disk.
+   */
+  abstract int numReplicasNotPersisted();
+
+  /**
+   * Discard all state we are tracking for the given replica.
+   */
+  abstract void discardReplica(
+      final String bpid, final long blockId,
+      boolean deleteSavedCopies);
+
+  void discardReplica(RamDiskReplica replica, boolean deleteSavedCopies) {
+    discardReplica(replica.getBlockPoolId(), replica.getBlockId(), deleteSavedCopies);
+  }
+}
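Because getInstance() uses ReflectionUtils.newInstance(), a replacement tracker needs only a no-argument constructor plus the abstract methods. Note that the class and its methods are package-private, so a custom implementation must live in org.apache.hadoop.hdfs.server.datanode.fsdataset.impl. The following hypothetical FIFO tracker is not part of the patch; it is a sketch that mirrors the deleted LazyWriteReplicaTracker, simplified in that it skips the stale-entry checks the LRU tracker performs:

    package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;

    import java.io.File;
    import java.util.HashMap;
    import java.util.LinkedList;
    import java.util.Map;
    import java.util.Queue;

    /** Hypothetical tracker that evicts in persist order, ignoring reads. */
    public class FifoRamDiskReplicaTracker extends RamDiskReplicaTracker {
      private final Map<String, Map<Long, RamDiskReplica>> replicaMaps =
          new HashMap<String, Map<Long, RamDiskReplica>>();
      private final Queue<RamDiskReplica> replicasNotPersisted =
          new LinkedList<RamDiskReplica>();
      private final Queue<RamDiskReplica> replicasPersisted =
          new LinkedList<RamDiskReplica>();

      @Override
      synchronized void addReplica(String bpid, long blockId,
          FsVolumeImpl transientVolume) {
        Map<Long, RamDiskReplica> map = replicaMaps.get(bpid);
        if (map == null) {
          map = new HashMap<Long, RamDiskReplica>();
          replicaMaps.put(bpid, map);
        }
        RamDiskReplica replica = new RamDiskReplica(bpid, blockId, transientVolume);
        map.put(blockId, replica);
        replicasNotPersisted.add(replica);
      }

      @Override
      synchronized void touch(String bpid, long blockId) {
        // FIFO deliberately ignores read accesses.
      }

      @Override
      synchronized void recordStartLazyPersist(String bpid, long blockId,
          FsVolumeImpl checkpointVolume) {
        replicaMaps.get(bpid).get(blockId).setLazyPersistVolume(checkpointVolume);
      }

      @Override
      synchronized void recordEndLazyPersist(String bpid, long blockId,
          File[] savedFiles) {
        RamDiskReplica replica = replicaMaps.get(bpid).get(blockId);
        replica.recordSavedBlockFiles(savedFiles);
        replicasNotPersisted.remove(replica);
        replicasPersisted.add(replica);
      }

      @Override
      synchronized RamDiskReplica dequeueNextReplicaToPersist() {
        return replicasNotPersisted.poll();
      }

      @Override
      synchronized void reenqueueReplicaNotPersisted(RamDiskReplica replica) {
        replicasNotPersisted.add(replica);
      }

      @Override
      synchronized RamDiskReplica getNextCandidateForEviction() {
        return replicasPersisted.poll();
      }

      @Override
      synchronized int numReplicasNotPersisted() {
        return replicasNotPersisted.size();
      }

      @Override
      synchronized void discardReplica(String bpid, long blockId,
          boolean deleteSavedCopies) {
        Map<Long, RamDiskReplica> map = replicaMaps.get(bpid);
        if (map == null) {
          return;
        }
        RamDiskReplica replica = map.remove(blockId);
        if (replica != null && deleteSavedCopies) {
          replica.deleteSavedFiles();
        }
      }
    }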
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetTestUtil.java
index 48ddcc29b5f..f9e30e12a0a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetTestUtil.java
@@ -30,7 +30,7 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 public class FsDatasetTestUtil {
 
   public static File getFile(FsDatasetSpi<?> fsd, String bpid, long bid) {
-    return ((FsDatasetImpl)fsd).getFile(bpid, bid);
+    return ((FsDatasetImpl)fsd).getFile(bpid, bid, false);
   }
 
   public static File getBlockFile(FsDatasetSpi<?> fsd, String bpid, Block b
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java
index 777779f208f..95404b31b45 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java
@@ -71,7 +71,7 @@ public class TestLazyPersistFiles {
   private static final int THREADPOOL_SIZE = 10;
 
   private static final short REPL_FACTOR = 1;
-  private static final int BLOCK_SIZE = 10485760; // 10 MB
+  private static final int BLOCK_SIZE = 5 * 1024 * 1024;
   private static final int LAZY_WRITE_FILE_SCRUBBER_INTERVAL_SEC = 3;
   private static final long HEARTBEAT_INTERVAL_SEC = 1;
   private static final int HEARTBEAT_RECHECK_INTERVAL_MSEC = 500;
@@ -449,34 +449,51 @@ public class TestLazyPersistFiles {
    * @throws InterruptedException
    */
   @Test (timeout=300000)
-  public void testRamDiskEvictionLRU()
+  public void testRamDiskEvictionIsLru()
       throws IOException, InterruptedException {
-    startUpCluster(true, 3);
+    final int NUM_PATHS = 5;
+    startUpCluster(true, NUM_PATHS + EVICTION_LOW_WATERMARK);
     final String METHOD_NAME = GenericTestUtils.getMethodName();
-    final int NUM_PATHS = 6;
-    Path paths[] = new Path[NUM_PATHS];
+    Path paths[] = new Path[NUM_PATHS * 2];
 
-    for (int i = 0; i < NUM_PATHS; i++) {
+    for (int i = 0; i < paths.length; i++) {
       paths[i] = new Path("/" + METHOD_NAME + "." + i +".dat");
     }
 
-    // No eviction for the first half of files
-    for (int i = 0; i < NUM_PATHS/2; i++) {
+    for (int i = 0; i < NUM_PATHS; i++) {
       makeTestFile(paths[i], BLOCK_SIZE, true);
+    }
+
+    // Sleep for a short time to allow the lazy writer thread to do its job.
+    Thread.sleep(3 * LAZY_WRITER_INTERVAL_SEC * 1000);
+
+    for (int i = 0; i < NUM_PATHS; ++i) {
       ensureFileReplicasOnStorageType(paths[i], RAM_DISK);
     }
 
-    // Lazy persist writer persists the first half of files
-    Thread.sleep(3 * LAZY_WRITER_INTERVAL_SEC * 1000);
+    // Open the files for read in a random order.
+    ArrayList<Integer> indexes = new ArrayList<Integer>(NUM_PATHS);
+    for (int i = 0; i < NUM_PATHS; ++i) {
+      indexes.add(i);
+    }
+    Collections.shuffle(indexes);
 
-    // Create the second half of files with eviction upon each create.
-    for (int i = NUM_PATHS/2; i < NUM_PATHS; i++) {
-      makeTestFile(paths[i], BLOCK_SIZE, true);
-      ensureFileReplicasOnStorageType(paths[i], RAM_DISK);
+    for (int i = 0; i < NUM_PATHS; ++i) {
+      LOG.info("Touching file " + paths[indexes.get(i)]);
+      DFSTestUtil.readFile(fs, paths[indexes.get(i)]);
+    }
 
-      // path[i-NUM_PATHS/2] is expected to be evicted by LRU
+    // Create an equal number of new files ensuring that the previous
+    // files are evicted in the same order they were read.
+    for (int i = 0; i < NUM_PATHS; ++i) {
+      makeTestFile(paths[i + NUM_PATHS], BLOCK_SIZE, true);
       triggerBlockReport();
-      ensureFileReplicasOnStorageType(paths[i - NUM_PATHS / 2], DEFAULT);
+      Thread.sleep(3000);
+      ensureFileReplicasOnStorageType(paths[i + NUM_PATHS], RAM_DISK);
+      ensureFileReplicasOnStorageType(paths[indexes.get(i)], DEFAULT);
+      for (int j = i + 1; j < NUM_PATHS; ++j) {
+        ensureFileReplicasOnStorageType(paths[indexes.get(j)], RAM_DISK);
+      }
     }
   }