diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java index 637705a7abd..f5c6ed61cc1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java @@ -270,17 +270,16 @@ File addBlock(Block b, File f) throws IOException { * Save the given replica to persistent storage. * * @param replicaInfo - * @return The saved block file. + * @return The saved meta and block files, in that order. * @throws IOException */ - File lazyPersistReplica(ReplicaInfo replicaInfo) throws IOException { + File[] lazyPersistReplica(ReplicaInfo replicaInfo) throws IOException { if (!lazypersistDir.exists() && !lazypersistDir.mkdirs()) { FsDatasetImpl.LOG.warn("Failed to create " + lazypersistDir); } - File metaFile = FsDatasetImpl.copyBlockFiles(replicaInfo, lazypersistDir); - File blockFile = Block.metaToBlockFile(metaFile); - dfsUsage.incDfsUsed(replicaInfo.getNumBytes() + metaFile.length()); - return blockFile; + File targetFiles[] = FsDatasetImpl.copyBlockFiles(replicaInfo, lazypersistDir); + dfsUsage.incDfsUsed(targetFiles[0].length() + targetFiles[1].length()); + return targetFiles; } /** @@ -484,7 +483,7 @@ void addToReplicasMap(ReplicaMap volumeMap, File dir, if (newReplica.getVolume().isTransientStorage()) { lazyWriteReplicaMap.addReplica(bpid, blockId, newReplica.getVolume()); } else { - lazyWriteReplicaMap.discardReplica(bpid, blockId, true); + lazyWriteReplicaMap.discardReplica(bpid, blockId, false); } } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java index 8d93dccc686..19260db3e2d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java @@ -707,10 +707,10 @@ static File moveBlockFiles(Block b, File srcfile, File destdir) /** * Copy the block and meta files for the given block from the given - * @return the new meta file. + * @return the new meta and block files. * @throws IOException */ - static File copyBlockFiles(ReplicaInfo replicaInfo, File destRoot) + static File[] copyBlockFiles(ReplicaInfo replicaInfo, File destRoot) throws IOException { final File destDir = DatanodeUtil.idToBlockDir(destRoot, replicaInfo.getBlockId()); final File dstFile = new File(destDir, replicaInfo.getBlockName()); @@ -731,7 +731,7 @@ static File copyBlockFiles(ReplicaInfo replicaInfo, File destRoot) LOG.debug("addBlock: Moved " + srcMeta + " to " + dstMeta); LOG.debug("addBlock: Moved " + srcFile + " to " + dstFile); } - return dstMeta; + return new File[] {dstMeta, dstFile}; } static private void truncateBlock(File blockFile, File metaFile, @@ -2332,13 +2332,14 @@ private void moveReplicaToNewVolume(String bpid, long blockId) } lazyWriteReplicaTracker.recordStartLazyPersist(bpid, blockId, targetVolume); - File savedBlockFile = targetVolume.getBlockPoolSlice(bpid) - .lazyPersistReplica(replicaInfo); - lazyWriteReplicaTracker.recordEndLazyPersist(bpid, blockId, savedBlockFile); + File[] savedFiles = targetVolume.getBlockPoolSlice(bpid) + .lazyPersistReplica(replicaInfo); + lazyWriteReplicaTracker.recordEndLazyPersist( + bpid, blockId, savedFiles[0], savedFiles[1]); if (LOG.isDebugEnabled()) { LOG.debug("LazyWriter finished saving blockId=" + blockId + "; bpid=" + bpid + - " to file " + savedBlockFile); + " to file " + savedFiles[1]); } } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/LazyWriteReplicaTracker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/LazyWriteReplicaTracker.java index 9f020c4ab9b..a8ab1b9b804 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/LazyWriteReplicaTracker.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/LazyWriteReplicaTracker.java @@ -48,6 +48,7 @@ static class ReplicaState implements Comparable { * Persistent volume that holds or will hold the saved replica. */ FsVolumeImpl lazyPersistVolume; + File savedMetaFile; File savedBlockFile; ReplicaState(final String bpid, final long blockId, FsVolumeSpi transientVolume) { @@ -56,9 +57,26 @@ static class ReplicaState implements Comparable { this.transientVolume = transientVolume; state = State.IN_MEMORY; lazyPersistVolume = null; + savedMetaFile = null; savedBlockFile = null; } + void deleteSavedFiles() { + try { + if (savedBlockFile != null) { + savedBlockFile.delete(); + savedBlockFile = null; + } + + if (savedMetaFile != null) { + savedMetaFile.delete(); + savedMetaFile = null; + } + } catch (Throwable t) { + // Ignore any exceptions. + } + } + @Override public String toString() { return "[Bpid=" + bpid + ";blockId=" + blockId + "]"; @@ -144,7 +162,8 @@ synchronized void recordStartLazyPersist( } synchronized void recordEndLazyPersist( - final String bpid, final long blockId, File savedBlockFile) { + final String bpid, final long blockId, + final File savedMetaFile, final File savedBlockFile) { Map map = replicaMaps.get(bpid); ReplicaState replicaState = map.get(blockId); @@ -153,6 +172,7 @@ synchronized void recordEndLazyPersist( bpid + "; blockId=" + blockId); } replicaState.state = State.LAZY_PERSIST_COMPLETE; + replicaState.savedMetaFile = savedMetaFile; replicaState.savedBlockFile = savedBlockFile; if (replicasNotPersisted.peek() == replicaState) { @@ -208,12 +228,22 @@ synchronized ReplicaState getNextCandidateForEviction() { return null; } - void discardReplica(ReplicaState replicaState, boolean force) { - discardReplica(replicaState.bpid, replicaState.blockId, force); + void discardReplica(ReplicaState replicaState, boolean deleteSavedCopies) { + discardReplica(replicaState.bpid, replicaState.blockId, deleteSavedCopies); } + /** + * Discard any state we are tracking for the given replica. This could mean + * the block is either deleted from the block space or the replica is no longer + * on transient storage. + * + * @param deleteSavedCopies true if we should delete the saved copies on + * persistent storage. This should be set by the + * caller when the block is no longer needed. + */ synchronized void discardReplica( - final String bpid, final long blockId, boolean force) { + final String bpid, final long blockId, + boolean deleteSavedCopies) { Map map = replicaMaps.get(bpid); if (map == null) { @@ -223,19 +253,12 @@ synchronized void discardReplica( ReplicaState replicaState = map.get(blockId); if (replicaState == null) { - if (force) { - return; - } - throw new IllegalStateException("Unknown replica bpid=" + - bpid + "; blockId=" + blockId); + return; } - if (replicaState.state != State.LAZY_PERSIST_COMPLETE && !force) { - throw new IllegalStateException("Discarding replica without " + - "saving it to disk bpid=" + bpid + "; blockId=" + blockId); - + if (deleteSavedCopies) { + replicaState.deleteSavedFiles(); } - map.remove(blockId); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java index 461c44dc1a0..7dfba50a968 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java @@ -394,7 +394,7 @@ public void testLazyPersistBlocksAreSaved() public void testRamDiskEviction() throws IOException, InterruptedException { startUpCluster(REPL_FACTOR, - new StorageType[] {RAM_DISK, DEFAULT }, + new StorageType[] { RAM_DISK, DEFAULT }, (2 * BLOCK_SIZE - 1)); // 1 replica + delta. final String METHOD_NAME = GenericTestUtils.getMethodName(); Path path1 = new Path("/" + METHOD_NAME + ".01.dat"); @@ -415,7 +415,7 @@ public void testRamDiskEviction() // Make sure that the second file's block replica is on RAM_DISK, whereas // the original file's block replica is now on disk. -// ensureFileReplicasOnStorageType(path2, RAM_DISK); + ensureFileReplicasOnStorageType(path2, RAM_DISK); ensureFileReplicasOnStorageType(path1, DEFAULT); }