HDFS-7764. DirectoryScanner shouldn't abort the scan if one directory had an error (Rakesh R via cmccabe)

Author: Colin Patrick Mccabe
Date:   2016-01-28 19:54:50 -08:00
Parent: ee005e010c
Commit: f67149ab08
3 changed files with 100 additions and 25 deletions

hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -962,6 +962,9 @@ Release 2.9.0 - UNRELEASED
     HDFS-9677. Rename generationStampV1/generationStampV2 to
     legacyGenerationStamp/generationStamp. (Mingliang Liu via jing9)
 
+    HDFS-7764. DirectoryScanner shouldn't abort the scan if one directory had
+    an error (Rakesh R via cmccabe)
+
   OPTIMIZATIONS
 
   BUG FIXES

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java

@@ -19,8 +19,10 @@ package org.apache.hadoop.hdfs.server.datanode;
 import com.google.common.annotations.VisibleForTesting;
 
 import java.io.File;
+import java.io.FilenameFilter;
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
@@ -42,12 +44,12 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.StopWatch;
 import org.apache.hadoop.util.Time;
@@ -727,18 +729,20 @@ public class DirectoryScanner implements Runnable {
       for (Entry<Integer, Future<ScanInfoPerBlockPool>> report :
           compilersInProgress.entrySet()) {
+        Integer index = report.getKey();
         try {
-          dirReports[report.getKey()] = report.getValue().get();
+          dirReports[index] = report.getValue().get();
 
           // If our compiler threads were interrupted, give up on this run
-          if (dirReports[report.getKey()] == null) {
+          if (dirReports[index] == null) {
             dirReports = null;
             break;
           }
         } catch (Exception ex) {
-          LOG.error("Error compiling report", ex);
-          // Propagate ex to DataBlockScanner to deal with
-          throw new RuntimeException(ex);
+          FsVolumeSpi fsVolumeSpi = volumes.get(index);
+          LOG.error("Error compiling report for the volume, StorageId: "
+              + fsVolumeSpi.getStorageID(), ex);
+          // Continue scanning the other volumes
         }
       }
     } catch (IOException e) {
@@ -747,9 +751,11 @@ public class DirectoryScanner implements Runnable {
     if (dirReports != null) {
       // Compile consolidated report for all the volumes
       for (ScanInfoPerBlockPool report : dirReports) {
-        list.addAll(report);
+        if(report != null){
+          list.addAll(report);
+        }
       }
     }
     return list.toSortedArrays();
   }
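The substance of the two hunks above: an exception from one volume's report compiler is now logged with that volume's storage ID and skipped rather than rethrown, so the remaining volumes are still scanned, and the resulting null slot is filtered out when the consolidated report is built. Below is a minimal, self-contained sketch of the same pattern outside HDFS; the task and report types are hypothetical stand-ins, not the DirectoryScanner classes.

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;

    public class PerVolumeScanSketch {
      public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        // One report compiler per "volume"; the second fails the way a bad disk might.
        List<Callable<List<String>>> compilers = Arrays.asList(
            () -> Arrays.asList("blk_1", "blk_2"),
            () -> { throw new IOException("finalized dir not readable"); });

        List<Future<List<String>>> futures = new ArrayList<>();
        for (Callable<List<String>> compiler : compilers) {
          futures.add(pool.submit(compiler));
        }

        List<List<String>> reports = new ArrayList<>();
        for (int i = 0; i < futures.size(); i++) {
          try {
            reports.add(futures.get(i).get());
          } catch (ExecutionException ex) {
            // Log and keep going instead of aborting the whole scan.
            System.err.println("Error compiling report for volume " + i
                + ": " + ex.getCause());
            reports.add(null);
          }
        }

        // Consolidate, skipping volumes whose report could not be compiled.
        List<String> consolidated = new ArrayList<>();
        for (List<String> report : reports) {
          if (report != null) {
            consolidated.addAll(report);
          }
        }
        System.out.println(consolidated); // prints [blk_1, blk_2]
        pool.shutdown();
      }
    }

Run as-is, the sketch prints the blocks from the healthy volume and only logs the failure from the other one, which is the behavior the patch gives the real scanner.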
@@ -837,12 +843,11 @@ public class DirectoryScanner implements Runnable {
         File bpFinalizedDir, File dir, LinkedList<ScanInfo> report)
         throws InterruptedException {
-      File[] files;
-
       throttle();
 
+      List <String> fileNames;
       try {
-        files = FileUtil.listFiles(dir);
+        fileNames = IOUtils.listDirectory(dir, BlockDirFilter.INSTANCE);
       } catch (IOException ioe) {
         LOG.warn("Exception occured while compiling report: ", ioe);
         // Initiate a check on disk failure.
@@ -850,44 +855,50 @@ public class DirectoryScanner implements Runnable {
         // Ignore this directory and proceed.
         return report;
       }
-      Arrays.sort(files);
+      Collections.sort(fileNames);
       /*
        * Assumption: In the sorted list of files block file appears immediately
        * before block metadata file. This is true for the current naming
        * convention for block file blk_<blockid> and meta file
        * blk_<blockid>_<genstamp>.meta
        */
-      for (int i = 0; i < files.length; i++) {
+      for (int i = 0; i < fileNames.size(); i++) {
         // Make sure this thread can make a timely exit. With a low throttle
         // rate, completing a run can take a looooong time.
         if (Thread.interrupted()) {
           throw new InterruptedException();
         }
-        if (files[i].isDirectory()) {
-          compileReport(vol, bpFinalizedDir, files[i], report);
+
+        File file = new File(dir, fileNames.get(i));
+        if (file.isDirectory()) {
+          compileReport(vol, bpFinalizedDir, file, report);
           continue;
         }
-        if (!Block.isBlockFilename(files[i])) {
-          if (isBlockMetaFile(Block.BLOCK_FILE_PREFIX, files[i].getName())) {
-            long blockId = Block.getBlockId(files[i].getName());
-            verifyFileLocation(files[i].getParentFile(), bpFinalizedDir,
+        if (!Block.isBlockFilename(file)) {
+          if (isBlockMetaFile(Block.BLOCK_FILE_PREFIX, file.getName())) {
+            long blockId = Block.getBlockId(file.getName());
+            verifyFileLocation(file.getParentFile(), bpFinalizedDir,
                 blockId);
-            report.add(new ScanInfo(blockId, null, files[i], vol));
+            report.add(new ScanInfo(blockId, null, file, vol));
           }
           continue;
         }
-        File blockFile = files[i];
-        long blockId = Block.filename2id(blockFile.getName());
+        File blockFile = file;
+        long blockId = Block.filename2id(file.getName());
         File metaFile = null;
 
         // Skip all the files that start with block name until
         // getting to the metafile for the block
-        while (i + 1 < files.length && files[i + 1].isFile()
-            && files[i + 1].getName().startsWith(blockFile.getName())) {
+        while (i + 1 < fileNames.size()) {
+          File blkMetaFile = new File(dir, fileNames.get(i + 1));
+          if (!(blkMetaFile.isFile()
+              && blkMetaFile.getName().startsWith(blockFile.getName()))) {
+            break;
+          }
           i++;
-          if (isBlockMetaFile(blockFile.getName(), files[i].getName())) {
-            metaFile = files[i];
+          if (isBlockMetaFile(blockFile.getName(), blkMetaFile.getName())) {
+            metaFile = blkMetaFile;
             break;
           }
         }
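The sorting assumption spelled out in the comment in the hunk above relies on the meta file name being the block file name plus a _<genstamp>.meta suffix, so a sorted listing groups each block file with its meta file. A small illustration of that ordering follows; the block IDs and generation stamps are made up for the example.

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;

    public class BlockNameOrderSketch {
      public static void main(String[] args) {
        List<String> names = Arrays.asList(
            "blk_1073741825_1001.meta", "blk_1073741826",
            "blk_1073741825", "blk_1073741826_1002.meta");
        Collections.sort(names);
        // Each block file now appears immediately before its meta file:
        // [blk_1073741825, blk_1073741825_1001.meta,
        //  blk_1073741826, blk_1073741826_1002.meta]
        System.out.println(names);
      }
    }

The patched while loop then only has to scan forward over neighbouring entries that start with the block file's name until it reaches the matching meta file.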
@@ -946,4 +957,15 @@ public class DirectoryScanner implements Runnable {
       perfTimer.reset().start();
     }
   }
+
+  private enum BlockDirFilter implements FilenameFilter {
+    INSTANCE;
+
+    @Override
+    public boolean accept(File dir, String name) {
+      return name.startsWith(DataStorage.BLOCK_SUBDIR_PREFIX)
+          || name.startsWith(DataStorage.STORAGE_DIR_FINALIZED)
+          || name.startsWith(Block.BLOCK_FILE_PREFIX);
+    }
+  }
 }

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java

@@ -30,6 +30,8 @@ import java.io.FileOutputStream;
 import java.io.IOException;
 import java.nio.channels.ClosedChannelException;
 import java.nio.channels.FileChannel;
+import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Random;
@@ -54,6 +56,7 @@ import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi.FsVolumeReferences;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetTestUtil;
@@ -63,6 +66,7 @@ import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.Time;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.Mockito;
 
 /**
  * Tests {@link DirectoryScanner} handling of differences
@@ -934,4 +938,50 @@ public class TestDirectoryScanner {
         new File(TEST_VOLUME.getFinalizedDir(BPID_2).getAbsolutePath(),
             "blk_567__1004.meta"));
   }
+
+  /**
+   * Test the behavior of exception handling during directory scan operation.
+   * Directory scanner shouldn't abort the scan on every directory just because
+   * one had an error.
+   */
+  @Test(timeout = 60000)
+  public void testExceptionHandlingWhileDirectoryScan() throws Exception {
+    cluster = new MiniDFSCluster.Builder(CONF).build();
+    try {
+      cluster.waitActive();
+      bpid = cluster.getNamesystem().getBlockPoolId();
+      fds = DataNodeTestUtils.getFSDataset(cluster.getDataNodes().get(0));
+      client = cluster.getFileSystem().getClient();
+      CONF.setInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THREADS_KEY, 1);
+      DataNode dataNode = cluster.getDataNodes().get(0);
+
+      // Add files with 2 blocks
+      createFile(GenericTestUtils.getMethodName(), BLOCK_LENGTH * 2, false);
+
+      // Inject error on #getFinalizedDir() so that ReportCompiler#call() will
+      // hit exception while preparing the block info report list.
+      List<FsVolumeSpi> volumes = new ArrayList<>();
+      Iterator<FsVolumeSpi> iterator = fds.getFsVolumeReferences().iterator();
+      while (iterator.hasNext()) {
+        FsVolumeSpi volume = iterator.next();
+        FsVolumeSpi spy = Mockito.spy(volume);
+        Mockito.doThrow(new IOException("Error while getFinalizedDir"))
+            .when(spy).getFinalizedDir(volume.getBlockPoolList()[0]);
+        volumes.add(spy);
+      }
+      FsVolumeReferences volReferences = new FsVolumeReferences(volumes);
+      FsDatasetSpi<? extends FsVolumeSpi> spyFds = Mockito.spy(fds);
+      Mockito.doReturn(volReferences).when(spyFds).getFsVolumeReferences();
+
+      scanner = new DirectoryScanner(dataNode, spyFds, CONF);
+      scanner.setRetainDiffs(true);
+      scanner.reconcile();
+    } finally {
+      if (scanner != null) {
+        scanner.shutdown();
+        scanner = null;
+      }
+      cluster.shutdown();
+    }
+  }
 }