From f67149ab08bb49381def6c535ab4c4610e0a4221 Mon Sep 17 00:00:00 2001
From: Colin Patrick Mccabe
Date: Thu, 28 Jan 2016 19:54:50 -0800
Subject: [PATCH] HDFS-7764. DirectoryScanner shouldn't abort the scan if one
 directory had an error (Rakesh R via cmccabe)

---
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt        |  3 +
 .../server/datanode/DirectoryScanner.java          | 72 ++++++++++++-------
 .../server/datanode/TestDirectoryScanner.java      | 50 +++++++++++++
 3 files changed, 100 insertions(+), 25 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index a51dc155c9c..9b80aa16af1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -962,6 +962,9 @@ Release 2.9.0 - UNRELEASED
     HDFS-9677. Rename generationStampV1/generationStampV2 to
     legacyGenerationStamp/generationStamp. (Mingliang Liu via jing9)
 
+    HDFS-7764. DirectoryScanner shouldn't abort the scan if one directory had
+    an error (Rakesh R via cmccabe)
+
   OPTIMIZATIONS
 
   BUG FIXES
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
index 392c1214756..083ca31da30 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
@@ -19,8 +19,10 @@ package org.apache.hadoop.hdfs.server.datanode;
 
 import com.google.common.annotations.VisibleForTesting;
 import java.io.File;
+import java.io.FilenameFilter;
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
@@ -42,12 +44,12 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.StopWatch;
 import org.apache.hadoop.util.Time;
@@ -727,18 +729,20 @@ public class DirectoryScanner implements Runnable {
 
       for (Entry<Integer, Future<ScanInfoPerBlockPool>> report :
           compilersInProgress.entrySet()) {
+        Integer index = report.getKey();
         try {
-          dirReports[report.getKey()] = report.getValue().get();
+          dirReports[index] = report.getValue().get();
 
           // If our compiler threads were interrupted, give up on this run
-          if (dirReports[report.getKey()] == null) {
+          if (dirReports[index] == null) {
             dirReports = null;
             break;
           }
         } catch (Exception ex) {
-          LOG.error("Error compiling report", ex);
-          // Propagate ex to DataBlockScanner to deal with
-          throw new RuntimeException(ex);
+          FsVolumeSpi fsVolumeSpi = volumes.get(index);
+          LOG.error("Error compiling report for the volume, StorageId: "
+              + fsVolumeSpi.getStorageID(), ex);
+          // Continue scanning the other volumes
         }
       }
     } catch (IOException e) {
@@ -747,7 +751,9 @@ public class DirectoryScanner implements Runnable {
     if (dirReports != null) {
       // Compile consolidated report for all the volumes
       for (ScanInfoPerBlockPool report : dirReports) {
-        list.addAll(report);
+        if (report != null) {
+          list.addAll(report);
+        }
       }
     }
     return list.toSortedArrays();
@@ -837,12 +843,11 @@ public class DirectoryScanner implements Runnable {
       File bpFinalizedDir, File dir, LinkedList<ScanInfo> report)
       throws InterruptedException {
 
-    File[] files;
-
     throttle();
 
+    List<String> fileNames;
     try {
-      files = FileUtil.listFiles(dir);
+      fileNames = IOUtils.listDirectory(dir, BlockDirFilter.INSTANCE);
     } catch (IOException ioe) {
       LOG.warn("Exception occured while compiling report: ", ioe);
       // Initiate a check on disk failure.
@@ -850,44 +855,50 @@ public class DirectoryScanner implements Runnable {
       // Ignore this directory and proceed.
       return report;
     }
-    Arrays.sort(files);
+    Collections.sort(fileNames);
+
     /*
     * Assumption: In the sorted list of files block file appears immediately
     * before block metadata file. This is true for the current naming
     * convention for block file blk_<blockid> and meta file
     * blk_<blockid>_<genstamp>.meta
     */
-    for (int i = 0; i < files.length; i++) {
+    for (int i = 0; i < fileNames.size(); i++) {
       // Make sure this thread can make a timely exit. With a low throttle
       // rate, completing a run can take a looooong time.
       if (Thread.interrupted()) {
         throw new InterruptedException();
       }
 
-      if (files[i].isDirectory()) {
-        compileReport(vol, bpFinalizedDir, files[i], report);
+      File file = new File(dir, fileNames.get(i));
+      if (file.isDirectory()) {
+        compileReport(vol, bpFinalizedDir, file, report);
         continue;
       }
-      if (!Block.isBlockFilename(files[i])) {
-        if (isBlockMetaFile(Block.BLOCK_FILE_PREFIX, files[i].getName())) {
-          long blockId = Block.getBlockId(files[i].getName());
-          verifyFileLocation(files[i].getParentFile(), bpFinalizedDir,
+      if (!Block.isBlockFilename(file)) {
+        if (isBlockMetaFile(Block.BLOCK_FILE_PREFIX, file.getName())) {
+          long blockId = Block.getBlockId(file.getName());
+          verifyFileLocation(file.getParentFile(), bpFinalizedDir,
              blockId);
-          report.add(new ScanInfo(blockId, null, files[i], vol));
+          report.add(new ScanInfo(blockId, null, file, vol));
         }
         continue;
       }
-      File blockFile = files[i];
-      long blockId = Block.filename2id(blockFile.getName());
+      File blockFile = file;
+      long blockId = Block.filename2id(file.getName());
       File metaFile = null;
 
       // Skip all the files that start with block name until
      // getting to the metafile for the block
-      while (i + 1 < files.length && files[i + 1].isFile()
-          && files[i + 1].getName().startsWith(blockFile.getName())) {
+      while (i + 1 < fileNames.size()) {
+        File blkMetaFile = new File(dir, fileNames.get(i + 1));
+        if (!(blkMetaFile.isFile()
+            && blkMetaFile.getName().startsWith(blockFile.getName()))) {
+          break;
+        }
        i++;
-        if (isBlockMetaFile(blockFile.getName(), files[i].getName())) {
-          metaFile = files[i];
+        if (isBlockMetaFile(blockFile.getName(), blkMetaFile.getName())) {
+          metaFile = blkMetaFile;
          break;
        }
      }
@@ -946,4 +957,15 @@ public class DirectoryScanner implements Runnable {
       perfTimer.reset().start();
     }
   }
+
+  private enum BlockDirFilter implements FilenameFilter {
+    INSTANCE;
+
+    @Override
+    public boolean accept(File dir, String name) {
+      return name.startsWith(DataStorage.BLOCK_SUBDIR_PREFIX)
+          || name.startsWith(DataStorage.STORAGE_DIR_FINALIZED)
+          || name.startsWith(Block.BLOCK_FILE_PREFIX);
+    }
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
index d030144a583..d86010719de 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
@@ -30,6 +30,8 @@ import java.io.FileOutputStream;
 import java.io.IOException;
 import java.nio.channels.ClosedChannelException;
 import java.nio.channels.FileChannel;
+import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Random;
@@ -54,6 +56,7 @@ import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi.FsVolumeReferences;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetTestUtil;
@@ -63,6 +66,7 @@ import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.Time;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.Mockito;
 
 /**
  * Tests {@link DirectoryScanner} handling of differences
@@ -934,4 +938,50 @@ public class TestDirectoryScanner {
         new File(TEST_VOLUME.getFinalizedDir(BPID_2).getAbsolutePath(),
             "blk_567__1004.meta"));
   }
+
+  /**
+   * Test the behavior of exception handling during directory scan operation.
+   * Directory scanner shouldn't abort the scan on every directory just because
+   * one had an error.
+   */
+  @Test(timeout = 60000)
+  public void testExceptionHandlingWhileDirectoryScan() throws Exception {
+    cluster = new MiniDFSCluster.Builder(CONF).build();
+    try {
+      cluster.waitActive();
+      bpid = cluster.getNamesystem().getBlockPoolId();
+      fds = DataNodeTestUtils.getFSDataset(cluster.getDataNodes().get(0));
+      client = cluster.getFileSystem().getClient();
+      CONF.setInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THREADS_KEY, 1);
+      DataNode dataNode = cluster.getDataNodes().get(0);
+
+      // Add files with 2 blocks
+      createFile(GenericTestUtils.getMethodName(), BLOCK_LENGTH * 2, false);
+
+      // Inject error on #getFinalizedDir() so that ReportCompiler#call() will
+      // hit exception while preparing the block info report list.
+      List<FsVolumeSpi> volumes = new ArrayList<>();
+      Iterator<FsVolumeSpi> iterator = fds.getFsVolumeReferences().iterator();
+      while (iterator.hasNext()) {
+        FsVolumeSpi volume = iterator.next();
+        FsVolumeSpi spy = Mockito.spy(volume);
+        Mockito.doThrow(new IOException("Error while getFinalizedDir"))
+            .when(spy).getFinalizedDir(volume.getBlockPoolList()[0]);
+        volumes.add(spy);
+      }
+      FsVolumeReferences volReferences = new FsVolumeReferences(volumes);
+      FsDatasetSpi<? extends FsVolumeSpi> spyFds = Mockito.spy(fds);
+      Mockito.doReturn(volReferences).when(spyFds).getFsVolumeReferences();
+
+      scanner = new DirectoryScanner(dataNode, spyFds, CONF);
+      scanner.setRetainDiffs(true);
+      scanner.reconcile();
+    } finally {
+      if (scanner != null) {
+        scanner.shutdown();
+        scanner = null;
+      }
+      cluster.shutdown();
+    }
+  }
 }