diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
index 490321e78bf..64fa94ed15b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
@@ -19,6 +19,7 @@
 import com.google.common.annotations.VisibleForTesting;
 
 import java.io.File;
+import java.io.FilenameFilter;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collections;
@@ -45,12 +46,13 @@
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.util.AutoCloseableLock;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.util.AutoCloseableLock;
 import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.StopWatch;
 import org.apache.hadoop.util.Time;
@@ -731,18 +733,20 @@ private Map<String, ScanInfo[]> getDiskReport() {
 
       for (Entry<Integer, Future<ScanInfoPerBlockPool>> report :
           compilersInProgress.entrySet()) {
+        Integer index = report.getKey();
         try {
-          dirReports[report.getKey()] = report.getValue().get();
+          dirReports[index] = report.getValue().get();
 
           // If our compiler threads were interrupted, give up on this run
-          if (dirReports[report.getKey()] == null) {
+          if (dirReports[index] == null) {
             dirReports = null;
             break;
           }
         } catch (Exception ex) {
-          LOG.error("Error compiling report", ex);
-          // Propagate ex to DataBlockScanner to deal with
-          throw new RuntimeException(ex);
+          FsVolumeSpi fsVolumeSpi = volumes.get(index);
+          LOG.error("Error compiling report for the volume, StorageId: "
+              + fsVolumeSpi.getStorageID(), ex);
+          // Continue scanning the other volumes
         }
       }
     } catch (IOException e) {
@@ -751,7 +755,9 @@ private Map<String, ScanInfo[]> getDiskReport() {
     if (dirReports != null) {
       // Compile consolidated report for all the volumes
       for (ScanInfoPerBlockPool report : dirReports) {
-        list.addAll(report);
+        if (report != null) {
+          list.addAll(report);
+        }
       }
     }
     return list.toSortedArrays();
@@ -841,12 +847,11 @@ private LinkedList<ScanInfo> compileReport(FsVolumeSpi vol,
         File bpFinalizedDir, File dir, LinkedList<ScanInfo> report)
         throws InterruptedException {
-      File[] files;
-
       throttle();
 
+      List<String> fileNames;
       try {
-        files = FileUtil.listFiles(dir);
+        fileNames = IOUtils.listDirectory(dir, BlockDirFilter.INSTANCE);
       } catch (IOException ioe) {
         LOG.warn("Exception occured while compiling report: ", ioe);
         // Initiate a check on disk failure.
@@ -854,44 +859,50 @@ private LinkedList<ScanInfo> compileReport(FsVolumeSpi vol,
         // Ignore this directory and proceed.
         return report;
       }
-      Arrays.sort(files);
+      Collections.sort(fileNames);
+
       /*
        * Assumption: In the sorted list of files block file appears immediately
        * before block metadata file. This is true for the current naming
        * convention for block file blk_<blockid> and meta file
        * blk_<blockid>_<genstamp>.meta
        */
-      for (int i = 0; i < files.length; i++) {
+      for (int i = 0; i < fileNames.size(); i++) {
         // Make sure this thread can make a timely exit. With a low throttle
         // rate, completing a run can take a looooong time.
         if (Thread.interrupted()) {
           throw new InterruptedException();
         }
 
-        if (files[i].isDirectory()) {
-          compileReport(vol, bpFinalizedDir, files[i], report);
+        File file = new File(dir, fileNames.get(i));
+        if (file.isDirectory()) {
+          compileReport(vol, bpFinalizedDir, file, report);
           continue;
         }
-        if (!Block.isBlockFilename(files[i])) {
-          if (isBlockMetaFile(Block.BLOCK_FILE_PREFIX, files[i].getName())) {
-            long blockId = Block.getBlockId(files[i].getName());
-            verifyFileLocation(files[i].getParentFile(), bpFinalizedDir,
+        if (!Block.isBlockFilename(file)) {
+          if (isBlockMetaFile(Block.BLOCK_FILE_PREFIX, file.getName())) {
+            long blockId = Block.getBlockId(file.getName());
+            verifyFileLocation(file.getParentFile(), bpFinalizedDir,
                 blockId);
-            report.add(new ScanInfo(blockId, null, files[i], vol));
+            report.add(new ScanInfo(blockId, null, file, vol));
           }
           continue;
         }
-        File blockFile = files[i];
-        long blockId = Block.filename2id(blockFile.getName());
+        File blockFile = file;
+        long blockId = Block.filename2id(file.getName());
         File metaFile = null;
 
         // Skip all the files that start with block name until
         // getting to the metafile for the block
-        while (i + 1 < files.length && files[i + 1].isFile()
-            && files[i + 1].getName().startsWith(blockFile.getName())) {
+        while (i + 1 < fileNames.size()) {
+          File blkMetaFile = new File(dir, fileNames.get(i + 1));
+          if (!(blkMetaFile.isFile()
+              && blkMetaFile.getName().startsWith(blockFile.getName()))) {
+            break;
+          }
           i++;
-          if (isBlockMetaFile(blockFile.getName(), files[i].getName())) {
-            metaFile = files[i];
+          if (isBlockMetaFile(blockFile.getName(), blkMetaFile.getName())) {
+            metaFile = blkMetaFile;
             break;
           }
         }
@@ -952,4 +963,15 @@ private void accumulateTimeWaiting() {
       perfTimer.reset().start();
     }
   }
+
+  private enum BlockDirFilter implements FilenameFilter {
+    INSTANCE;
+
+    @Override
+    public boolean accept(File dir, String name) {
+      return name.startsWith(DataStorage.BLOCK_SUBDIR_PREFIX)
+          || name.startsWith(DataStorage.STORAGE_DIR_FINALIZED)
+          || name.startsWith(Block.BLOCK_FILE_PREFIX);
+    }
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
index 01cc192be17..35bd7e8bd37 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
@@ -30,6 +30,8 @@
 import java.io.IOException;
 import java.nio.channels.ClosedChannelException;
 import java.nio.channels.FileChannel;
+import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Random;
@@ -54,6 +56,7 @@
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi.FsVolumeReferences;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetTestUtil;
@@ -64,6 +67,7 @@
 import org.apache.hadoop.util.Time;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.Mockito;
 
 /**
  * Tests {@link DirectoryScanner} handling of differences
@@ -941,4 +945,50 @@ public void TestScanInfo() throws Exception {
         new File(TEST_VOLUME.getFinalizedDir(BPID_2).getAbsolutePath(),
             "blk_567__1004.meta"));
   }
+
+  /**
+   * Test the behavior of exception handling during a directory scan.
+   * The directory scanner shouldn't abort the whole scan just because one
+   * directory had an error.
+   */
+  @Test(timeout = 60000)
+  public void testExceptionHandlingWhileDirectoryScan() throws Exception {
+    cluster = new MiniDFSCluster.Builder(CONF).build();
+    try {
+      cluster.waitActive();
+      bpid = cluster.getNamesystem().getBlockPoolId();
+      fds = DataNodeTestUtils.getFSDataset(cluster.getDataNodes().get(0));
+      client = cluster.getFileSystem().getClient();
+      CONF.setInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THREADS_KEY, 1);
+      DataNode dataNode = cluster.getDataNodes().get(0);
+
+      // Add files with 2 blocks
+      createFile(GenericTestUtils.getMethodName(), BLOCK_LENGTH * 2, false);
+
+      // Inject error on #getFinalizedDir() so that ReportCompiler#call() will
+      // hit exception while preparing the block info report list.
+      List<FsVolumeSpi> volumes = new ArrayList<>();
+      Iterator<FsVolumeSpi> iterator = fds.getFsVolumeReferences().iterator();
+      while (iterator.hasNext()) {
+        FsVolumeSpi volume = iterator.next();
+        FsVolumeSpi spy = Mockito.spy(volume);
+        Mockito.doThrow(new IOException("Error while getFinalizedDir"))
+            .when(spy).getFinalizedDir(volume.getBlockPoolList()[0]);
+        volumes.add(spy);
+      }
+      FsVolumeReferences volReferences = new FsVolumeReferences(volumes);
+      FsDatasetSpi<? extends FsVolumeSpi> spyFds = Mockito.spy(fds);
+      Mockito.doReturn(volReferences).when(spyFds).getFsVolumeReferences();
+
+      scanner = new DirectoryScanner(dataNode, spyFds, CONF);
+      scanner.setRetainDiffs(true);
+      scanner.reconcile();
+    } finally {
+      if (scanner != null) {
+        scanner.shutdown();
+        scanner = null;
+      }
+      cluster.shutdown();
+    }
+  }
 }