From c92837aeab5188f6171d4016f91b3b4936a66beb Mon Sep 17 00:00:00 2001 From: arp Date: Thu, 28 Aug 2014 23:13:46 -0700 Subject: [PATCH] HDFS-6931. Move lazily persisted replicas to finalized directory on DN startup. (Arpit Agarwal) --- .../hadoop-hdfs/CHANGES-HDFS-6581.txt | 3 + .../fsdataset/impl/BlockPoolSlice.java | 168 +++++++++++++++--- .../fsdataset/impl/FsDatasetImpl.java | 9 +- .../datanode/fsdataset/impl/FsVolumeImpl.java | 12 +- .../datanode/fsdataset/impl/FsVolumeList.java | 18 +- .../fsdataset/impl/TestLazyPersistFiles.java | 53 +++++- 6 files changed, 213 insertions(+), 50 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-6581.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-6581.txt index 881cb63ab6d..8791485adce 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-6581.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-6581.txt @@ -27,3 +27,6 @@ HDFS-6960. Bugfix in LazyWriter, fix test case and some refactoring. (Arpit Agarwal) + HDFS-6931. Move lazily persisted replicas to finalized directory on DN + startup. (Arpit Agarwal) + diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java index 1313fef7ed5..1bb6680f636 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java @@ -94,17 +94,6 @@ class BlockPoolSlice { } } - // Delete all checkpointed replicas on startup. - // TODO: We can move checkpointed replicas to the finalized dir and delete - // the copy on RAM_DISK. For now we take the simpler approach. - - FileUtil.fullyDelete(lazypersistDir); - if (!this.lazypersistDir.exists()) { - if (!this.lazypersistDir.mkdirs()) { - throw new IOException("Failed to mkdirs " + this.lazypersistDir); - } - } - // Files that were being written when the datanode was last shutdown // are now moved back to the data directory. It is possible that // in the future, we might want to do some sort of datanode-local @@ -271,6 +260,13 @@ class BlockPoolSlice { return blockFile; } + /** + * Save the given replica to persistent storage. + * + * @param replicaInfo + * @return The saved block file. + * @throws IOException + */ File lazyPersistReplica(ReplicaInfo replicaInfo) throws IOException { if (!lazypersistDir.exists() && !lazypersistDir.mkdirs()) { FsDatasetImpl.LOG.warn("Failed to create " + lazypersistDir); @@ -305,11 +301,21 @@ class BlockPoolSlice { - void getVolumeMap(ReplicaMap volumeMap) throws IOException { + void getVolumeMap(ReplicaMap volumeMap, + final LazyWriteReplicaTracker lazyWriteReplicaMap) + throws IOException { + // Recover lazy persist replicas, they will be added to the volumeMap + // when we scan the finalized directory. + if (lazypersistDir.exists()) { + int numRecovered = moveLazyPersistReplicasToFinalized(lazypersistDir); + FsDatasetImpl.LOG.info( + "Recovered " + numRecovered + " replicas from " + lazypersistDir); + } + // add finalized replicas - addToReplicasMap(volumeMap, finalizedDir, true); + addToReplicasMap(volumeMap, finalizedDir, lazyWriteReplicaMap, true); // add rbw replicas - addToReplicasMap(volumeMap, rbwDir, false); + addToReplicasMap(volumeMap, rbwDir, lazyWriteReplicaMap, false); } /** @@ -337,19 +343,69 @@ class BlockPoolSlice { } + /** + * Move replicas in the lazy persist directory to their corresponding locations + * in the finalized directory. + * @return number of replicas recovered. + */ + private int moveLazyPersistReplicasToFinalized(File source) + throws IOException { + File files[] = FileUtil.listFiles(source); + int numRecovered = 0; + for (File file : files) { + if (file.isDirectory()) { + numRecovered += moveLazyPersistReplicasToFinalized(file); + } + + if (Block.isMetaFilename(file.getName())) { + File metaFile = file; + File blockFile = Block.metaToBlockFile(metaFile); + long blockId = Block.filename2id(blockFile.getName()); + File targetDir = DatanodeUtil.idToBlockDir(finalizedDir, blockId); + + if (blockFile.exists()) { + File targetBlockFile = new File(targetDir, blockFile.getName()); + File targetMetaFile = new File(targetDir, metaFile.getName()); + + if (!targetDir.exists() && !targetDir.mkdirs()) { + FsDatasetImpl.LOG.warn("Failed to move " + blockFile + " to " + targetDir); + continue; + } + + metaFile.renameTo(targetMetaFile); + blockFile.renameTo(targetBlockFile); + + if (targetBlockFile.exists() && targetMetaFile.exists()) { + ++numRecovered; + } else { + // Failure should be rare. + FsDatasetImpl.LOG.warn("Failed to move " + blockFile + " to " + targetDir); + } + } + } + } + + FileUtil.fullyDelete(source); + return numRecovered; + } + /** * Add replicas under the given directory to the volume map * @param volumeMap the replicas map * @param dir an input directory + * @param lazyWriteReplicaMap Map of replicas on transient + * storage. * @param isFinalized true if the directory has finalized replicas; * false if the directory has rbw replicas */ - void addToReplicasMap(ReplicaMap volumeMap, File dir, boolean isFinalized - ) throws IOException { + void addToReplicasMap(ReplicaMap volumeMap, File dir, + final LazyWriteReplicaTracker lazyWriteReplicaMap, + boolean isFinalized) + throws IOException { File files[] = FileUtil.listFiles(dir); for (File file : files) { if (file.isDirectory()) { - addToReplicasMap(volumeMap, file, isFinalized); + addToReplicasMap(volumeMap, file, lazyWriteReplicaMap, isFinalized); } if (isFinalized && FsDatasetUtil.isUnlinkTmpFile(file)) { @@ -405,13 +461,83 @@ class BlockPoolSlice { } } - ReplicaInfo oldReplica = volumeMap.add(bpid, newReplica); - if (oldReplica != null) { - FsDatasetImpl.LOG.warn("Two block files with the same block id exist " + - "on disk: " + oldReplica.getBlockFile() + " and " + file ); + ReplicaInfo oldReplica = volumeMap.get(bpid, newReplica.getBlockId()); + if (oldReplica == null) { + volumeMap.add(bpid, newReplica); + } else { + // We have multiple replicas of the same block so decide which one + // to keep. + newReplica = resolveDuplicateReplicas(newReplica, oldReplica, volumeMap); + } + + // If we are retaining a replica on transient storage make sure + // it is in the lazyWriteReplicaMap so it can be persisted + // eventually. + if (newReplica.getVolume().isTransientStorage()) { + lazyWriteReplicaMap.addReplica(bpid, blockId, newReplica.getVolume()); + } else { + lazyWriteReplicaMap.discardReplica(bpid, blockId, true); } } } + + /** + * This method is invoked during DN startup when volumes are scanned to + * build up the volumeMap. + * + * Given two replicas, decide which one to keep. The preference is as + * follows: + * 1. Prefer the replica with the higher generation stamp. + * 2. If generation stamps are equal, prefer the replica with the + * larger on-disk length. + * 3. If on-disk length is the same, prefer the replica on persistent + * storage volume. + * 4. All other factors being equal, keep replica1. + * + * The other replica is removed from the volumeMap and is deleted from + * its storage volume. + * + * @param replica1 + * @param replica2 + * @param volumeMap + * @return the replica that is retained. + * @throws IOException + */ + private ReplicaInfo resolveDuplicateReplicas( + final ReplicaInfo replica1, final ReplicaInfo replica2, + final ReplicaMap volumeMap) throws IOException { + + ReplicaInfo replicaToKeep; + ReplicaInfo replicaToDelete; + + if (replica1.getGenerationStamp() != replica2.getGenerationStamp()) { + replicaToKeep = replica1.getGenerationStamp() > replica2.getGenerationStamp() + ? replica1 : replica2; + } else if (replica1.getNumBytes() != replica2.getNumBytes()) { + replicaToKeep = replica1.getNumBytes() > replica2.getNumBytes() ? + replica1 : replica2; + } else if (replica1.getVolume().isTransientStorage() && + !replica2.getVolume().isTransientStorage()) { + replicaToKeep = replica2; + } else { + replicaToKeep = replica1; + } + + replicaToDelete = (replicaToKeep == replica1) ? replica2 : replica1; + + // Update volumeMap. + volumeMap.add(bpid, replicaToKeep); + + // Delete the files on disk. Failure here is okay. + replicaToDelete.getBlockFile().delete(); + replicaToDelete.getMetaFile().delete(); + + FsDatasetImpl.LOG.info( + "resolveDuplicateReplicas keeping " + replicaToKeep.getBlockFile() + + ", deleting " + replicaToDelete.getBlockFile()); + + return replicaToKeep; + } /** * Find out the number of bytes in the block that match its crc. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java index 22f626cc654..10d98adb5eb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java @@ -262,13 +262,13 @@ class FsDatasetImpl implements FsDatasetSpi { volumes = new FsVolumeList(volsFailed, blockChooserImpl); asyncDiskService = new FsDatasetAsyncDiskService(datanode); - // TODO: Initialize transientReplicaTracker from blocks on disk. - for (int idx = 0; idx < storage.getNumStorageDirs(); idx++) { addVolume(dataLocations, storage.getStorageDir(idx)); } cacheManager = new FsDatasetCache(this); + + // Start the lazy writer once we have built the replica maps. lazyWriter = new Daemon(new LazyWriter( conf.getInt(DFSConfigKeys.DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC, DFSConfigKeys.DFS_DATANODE_LAZY_WRITER_INTERVAL_DEFAULT_SEC))); @@ -287,7 +287,7 @@ class FsDatasetImpl implements FsDatasetSpi { // storageMap and asyncDiskService, consistent. FsVolumeImpl fsVolume = FsVolumeImplAllocator.createVolume( this, sd.getStorageUuid(), dir, this.conf, storageType); - fsVolume.getVolumeMap(volumeMap); + fsVolume.getVolumeMap(volumeMap, lazyWriteReplicaTracker); volumes.addVolume(fsVolume); storageMap.put(sd.getStorageUuid(), @@ -2021,7 +2021,7 @@ class FsDatasetImpl implements FsDatasetSpi { volumes.addBlockPool(bpid, conf); volumeMap.initBlockPool(bpid); } - volumes.getAllVolumesMap(bpid, volumeMap); + volumes.getAllVolumesMap(bpid, volumeMap, lazyWriteReplicaTracker); } @Override @@ -2261,6 +2261,7 @@ class FsDatasetImpl implements FsDatasetSpi { LOG.warn("Exception saving replica " + replicaState, ioe); } finally { if (!succeeded && replicaState != null) { + LOG.warn("Failed to save replica " + replicaState + ". re-enqueueing it."); lazyWriteReplicaTracker.reenqueueReplica(replicaState); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java index 85756b78cee..ccfb4495e5e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java @@ -245,14 +245,18 @@ public class FsVolumeImpl implements FsVolumeSpi { } } - void getVolumeMap(ReplicaMap volumeMap) throws IOException { + void getVolumeMap(ReplicaMap volumeMap, + final LazyWriteReplicaTracker lazyWriteReplicaMap) + throws IOException { for(BlockPoolSlice s : bpSlices.values()) { - s.getVolumeMap(volumeMap); + s.getVolumeMap(volumeMap, lazyWriteReplicaMap); } } - void getVolumeMap(String bpid, ReplicaMap volumeMap) throws IOException { - getBlockPoolSlice(bpid).getVolumeMap(volumeMap); + void getVolumeMap(String bpid, ReplicaMap volumeMap, + final LazyWriteReplicaTracker lazyWriteReplicaMap) + throws IOException { + getBlockPoolSlice(bpid).getVolumeMap(volumeMap, lazyWriteReplicaMap); } @Override diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java index f1b196a0022..67958bf91c9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java @@ -119,7 +119,10 @@ class FsVolumeList { return remaining; } - void getAllVolumesMap(final String bpid, final ReplicaMap volumeMap) throws IOException { + void getAllVolumesMap(final String bpid, + final ReplicaMap volumeMap, + final LazyWriteReplicaTracker lazyWriteReplicaMap) + throws IOException { long totalStartTime = Time.monotonicNow(); final List exceptions = Collections.synchronizedList( new ArrayList()); @@ -131,7 +134,7 @@ class FsVolumeList { FsDatasetImpl.LOG.info("Adding replicas to map for block pool " + bpid + " on volume " + v + "..."); long startTime = Time.monotonicNow(); - v.getVolumeMap(bpid, volumeMap); + v.getVolumeMap(bpid, volumeMap, lazyWriteReplicaMap); long timeTaken = Time.monotonicNow() - startTime; FsDatasetImpl.LOG.info("Time to add replicas to map for block pool" + " " + bpid + " on volume " + v + ": " + timeTaken + "ms"); @@ -160,17 +163,6 @@ class FsVolumeList { + totalTimeTaken + "ms"); } - void getVolumeMap(String bpid, FsVolumeImpl volume, ReplicaMap volumeMap) - throws IOException { - FsDatasetImpl.LOG.info("Adding replicas to map for block pool " + bpid + - " on volume " + volume + "..."); - long startTime = Time.monotonicNow(); - volume.getVolumeMap(bpid, volumeMap); - long timeTaken = Time.monotonicNow() - startTime; - FsDatasetImpl.LOG.info("Time to add replicas to map for block pool " + bpid + - " on volume " + volume + ": " + timeTaken + "ms"); - } - /** * Calls {@link FsVolumeImpl#checkDirs()} on each volume, removing any * volumes from the active list that result in a DiskErrorException. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java index af0e8acbae2..cac99a77f8a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java @@ -265,7 +265,9 @@ public class TestLazyPersistFiles { LocatedBlocks locatedBlocks = ensureFileReplicasOnStorageType(path, RAM_DISK); // Sleep for a short time to allow the lazy writer thread to do its job - Thread.sleep(3 * LAZY_WRITER_INTERVAL_SEC * 1000); + Thread.sleep(6 * LAZY_WRITER_INTERVAL_SEC * 1000); + + LOG.info("Verifying copy was saved to lazyPersist/"); // Make sure that there is a saved copy of the replica on persistent // storage. @@ -330,23 +332,52 @@ public class TestLazyPersistFiles { ensureFileReplicasOnStorageType(path1, DEFAULT); } - /** - * TODO: Stub test, to be completed. - * Verify that checksum computation is skipped for files written to memory. - */ @Test (timeout=300000) - public void testChecksumIsSkipped() + public void testDnRestartWithSavedReplicas() throws IOException, InterruptedException { + startUpCluster(REPL_FACTOR, - new StorageType[] {RAM_DISK, DEFAULT }, -1); + new StorageType[] {RAM_DISK, DEFAULT }, + (2 * BLOCK_SIZE - 1)); // 1 replica + delta. final String METHOD_NAME = GenericTestUtils.getMethodName(); Path path1 = new Path("/" + METHOD_NAME + ".01.dat"); makeTestFile(path1, BLOCK_SIZE, true); ensureFileReplicasOnStorageType(path1, RAM_DISK); - // Verify checksum was not computed. + // Sleep for a short time to allow the lazy writer thread to do its job. + // However the block replica should not be evicted from RAM_DISK yet. + Thread.sleep(3 * LAZY_WRITER_INTERVAL_SEC * 1000); + ensureFileReplicasOnStorageType(path1, RAM_DISK); + LOG.info("Restarting the DataNode"); + cluster.restartDataNode(0, true); + cluster.waitActive(); + + // Ensure that the replica is now on persistent storage. + ensureFileReplicasOnStorageType(path1, DEFAULT); + } + + @Test (timeout=300000) + public void testDnRestartWithUnsavedReplicas() + throws IOException, InterruptedException { + + startUpCluster(REPL_FACTOR, + new StorageType[] {RAM_DISK, DEFAULT }, + (2 * BLOCK_SIZE - 1)); // 1 replica + delta. + stopLazyWriter(cluster.getDataNodes().get(0)); + + final String METHOD_NAME = GenericTestUtils.getMethodName(); + Path path1 = new Path("/" + METHOD_NAME + ".01.dat"); + makeTestFile(path1, BLOCK_SIZE, true); + ensureFileReplicasOnStorageType(path1, RAM_DISK); + + LOG.info("Restarting the DataNode"); + cluster.restartDataNode(0, true); + cluster.waitActive(); + + // Ensure that the replica is still on transient storage. + ensureFileReplicasOnStorageType(path1, RAM_DISK); } // ---- Utility functions for all test cases ------------------------------- @@ -443,4 +474,10 @@ public class TestLazyPersistFiles { return locatedBlocks; } + + private void stopLazyWriter(DataNode dn) { + // Stop the lazyWriter daemon. + FsDatasetImpl fsDataset = ((FsDatasetImpl) dn.getFSDataset()); + ((FsDatasetImpl.LazyWriter) fsDataset.lazyWriter.getRunnable()).stop(); + } }