HDFS-7764. DirectoryScanner shouldn't abort the scan if one directory had an error (Rakesh R via cmccabe)
(cherry picked from commit f67149ab08
)
This commit is contained in:
parent
2d548759ad
commit
945c916eec
|
@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.server.datanode;
|
|||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import java.io.File;
|
||||
import java.io.FilenameFilter;
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
|
@ -45,12 +46,13 @@ import org.apache.commons.logging.LogFactory;
|
|||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileUtil;
|
||||
import org.apache.hadoop.util.AutoCloseableLock;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.protocol.Block;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.apache.hadoop.util.AutoCloseableLock;
|
||||
import org.apache.hadoop.util.Daemon;
|
||||
import org.apache.hadoop.util.StopWatch;
|
||||
import org.apache.hadoop.util.Time;
|
||||
|
@ -731,18 +733,20 @@ public class DirectoryScanner implements Runnable {
|
|||
|
||||
for (Entry<Integer, Future<ScanInfoPerBlockPool>> report :
|
||||
compilersInProgress.entrySet()) {
|
||||
Integer index = report.getKey();
|
||||
try {
|
||||
dirReports[report.getKey()] = report.getValue().get();
|
||||
dirReports[index] = report.getValue().get();
|
||||
|
||||
// If our compiler threads were interrupted, give up on this run
|
||||
if (dirReports[report.getKey()] == null) {
|
||||
if (dirReports[index] == null) {
|
||||
dirReports = null;
|
||||
break;
|
||||
}
|
||||
} catch (Exception ex) {
|
||||
LOG.error("Error compiling report", ex);
|
||||
// Propagate ex to DataBlockScanner to deal with
|
||||
throw new RuntimeException(ex);
|
||||
FsVolumeSpi fsVolumeSpi = volumes.get(index);
|
||||
LOG.error("Error compiling report for the volume, StorageId: "
|
||||
+ fsVolumeSpi.getStorageID(), ex);
|
||||
// Continue scanning the other volumes
|
||||
}
|
||||
}
|
||||
} catch (IOException e) {
|
||||
|
@ -751,7 +755,9 @@ public class DirectoryScanner implements Runnable {
|
|||
if (dirReports != null) {
|
||||
// Compile consolidated report for all the volumes
|
||||
for (ScanInfoPerBlockPool report : dirReports) {
|
||||
list.addAll(report);
|
||||
if(report != null){
|
||||
list.addAll(report);
|
||||
}
|
||||
}
|
||||
}
|
||||
return list.toSortedArrays();
|
||||
|
@ -841,12 +847,11 @@ public class DirectoryScanner implements Runnable {
|
|||
File bpFinalizedDir, File dir, LinkedList<ScanInfo> report)
|
||||
throws InterruptedException {
|
||||
|
||||
File[] files;
|
||||
|
||||
throttle();
|
||||
|
||||
List <String> fileNames;
|
||||
try {
|
||||
files = FileUtil.listFiles(dir);
|
||||
fileNames = IOUtils.listDirectory(dir, BlockDirFilter.INSTANCE);
|
||||
} catch (IOException ioe) {
|
||||
LOG.warn("Exception occured while compiling report: ", ioe);
|
||||
// Initiate a check on disk failure.
|
||||
|
@ -854,44 +859,50 @@ public class DirectoryScanner implements Runnable {
|
|||
// Ignore this directory and proceed.
|
||||
return report;
|
||||
}
|
||||
Arrays.sort(files);
|
||||
Collections.sort(fileNames);
|
||||
|
||||
/*
|
||||
* Assumption: In the sorted list of files block file appears immediately
|
||||
* before block metadata file. This is true for the current naming
|
||||
* convention for block file blk_<blockid> and meta file
|
||||
* blk_<blockid>_<genstamp>.meta
|
||||
*/
|
||||
for (int i = 0; i < files.length; i++) {
|
||||
for (int i = 0; i < fileNames.size(); i++) {
|
||||
// Make sure this thread can make a timely exit. With a low throttle
|
||||
// rate, completing a run can take a looooong time.
|
||||
if (Thread.interrupted()) {
|
||||
throw new InterruptedException();
|
||||
}
|
||||
|
||||
if (files[i].isDirectory()) {
|
||||
compileReport(vol, bpFinalizedDir, files[i], report);
|
||||
File file = new File(dir, fileNames.get(i));
|
||||
if (file.isDirectory()) {
|
||||
compileReport(vol, bpFinalizedDir, file, report);
|
||||
continue;
|
||||
}
|
||||
if (!Block.isBlockFilename(files[i])) {
|
||||
if (isBlockMetaFile(Block.BLOCK_FILE_PREFIX, files[i].getName())) {
|
||||
long blockId = Block.getBlockId(files[i].getName());
|
||||
verifyFileLocation(files[i].getParentFile(), bpFinalizedDir,
|
||||
if (!Block.isBlockFilename(file)) {
|
||||
if (isBlockMetaFile(Block.BLOCK_FILE_PREFIX, file.getName())) {
|
||||
long blockId = Block.getBlockId(file.getName());
|
||||
verifyFileLocation(file.getParentFile(), bpFinalizedDir,
|
||||
blockId);
|
||||
report.add(new ScanInfo(blockId, null, files[i], vol));
|
||||
report.add(new ScanInfo(blockId, null, file, vol));
|
||||
}
|
||||
continue;
|
||||
}
|
||||
File blockFile = files[i];
|
||||
long blockId = Block.filename2id(blockFile.getName());
|
||||
File blockFile = file;
|
||||
long blockId = Block.filename2id(file.getName());
|
||||
File metaFile = null;
|
||||
|
||||
// Skip all the files that start with block name until
|
||||
// getting to the metafile for the block
|
||||
while (i + 1 < files.length && files[i + 1].isFile()
|
||||
&& files[i + 1].getName().startsWith(blockFile.getName())) {
|
||||
while (i + 1 < fileNames.size()) {
|
||||
File blkMetaFile = new File(dir, fileNames.get(i + 1));
|
||||
if (!(blkMetaFile.isFile()
|
||||
&& blkMetaFile.getName().startsWith(blockFile.getName()))) {
|
||||
break;
|
||||
}
|
||||
i++;
|
||||
if (isBlockMetaFile(blockFile.getName(), files[i].getName())) {
|
||||
metaFile = files[i];
|
||||
if (isBlockMetaFile(blockFile.getName(), blkMetaFile.getName())) {
|
||||
metaFile = blkMetaFile;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
@ -952,4 +963,15 @@ public class DirectoryScanner implements Runnable {
|
|||
perfTimer.reset().start();
|
||||
}
|
||||
}
|
||||
|
||||
private enum BlockDirFilter implements FilenameFilter {
|
||||
INSTANCE;
|
||||
|
||||
@Override
|
||||
public boolean accept(File dir, String name) {
|
||||
return name.startsWith(DataStorage.BLOCK_SUBDIR_PREFIX)
|
||||
|| name.startsWith(DataStorage.STORAGE_DIR_FINALIZED)
|
||||
|| name.startsWith(Block.BLOCK_FILE_PREFIX);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -30,6 +30,8 @@ import java.io.FileOutputStream;
|
|||
import java.io.IOException;
|
||||
import java.nio.channels.ClosedChannelException;
|
||||
import java.nio.channels.FileChannel;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Iterator;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.Random;
|
||||
|
@ -54,6 +56,7 @@ import org.apache.hadoop.hdfs.protocol.Block;
|
|||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi.FsVolumeReferences;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetTestUtil;
|
||||
|
@ -64,6 +67,7 @@ import org.apache.hadoop.util.AutoCloseableLock;
|
|||
import org.apache.hadoop.util.Time;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
/**
|
||||
* Tests {@link DirectoryScanner} handling of differences
|
||||
|
@ -941,4 +945,50 @@ public class TestDirectoryScanner {
|
|||
new File(TEST_VOLUME.getFinalizedDir(BPID_2).getAbsolutePath(),
|
||||
"blk_567__1004.meta"));
|
||||
}
|
||||
|
||||
/**
|
||||
* Test the behavior of exception handling during directory scan operation.
|
||||
* Directory scanner shouldn't abort the scan on every directory just because
|
||||
* one had an error.
|
||||
*/
|
||||
@Test(timeout = 60000)
|
||||
public void testExceptionHandlingWhileDirectoryScan() throws Exception {
|
||||
cluster = new MiniDFSCluster.Builder(CONF).build();
|
||||
try {
|
||||
cluster.waitActive();
|
||||
bpid = cluster.getNamesystem().getBlockPoolId();
|
||||
fds = DataNodeTestUtils.getFSDataset(cluster.getDataNodes().get(0));
|
||||
client = cluster.getFileSystem().getClient();
|
||||
CONF.setInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THREADS_KEY, 1);
|
||||
DataNode dataNode = cluster.getDataNodes().get(0);
|
||||
|
||||
// Add files with 2 blocks
|
||||
createFile(GenericTestUtils.getMethodName(), BLOCK_LENGTH * 2, false);
|
||||
|
||||
// Inject error on #getFinalizedDir() so that ReportCompiler#call() will
|
||||
// hit exception while preparing the block info report list.
|
||||
List<FsVolumeSpi> volumes = new ArrayList<>();
|
||||
Iterator<FsVolumeSpi> iterator = fds.getFsVolumeReferences().iterator();
|
||||
while (iterator.hasNext()) {
|
||||
FsVolumeSpi volume = iterator.next();
|
||||
FsVolumeSpi spy = Mockito.spy(volume);
|
||||
Mockito.doThrow(new IOException("Error while getFinalizedDir"))
|
||||
.when(spy).getFinalizedDir(volume.getBlockPoolList()[0]);
|
||||
volumes.add(spy);
|
||||
}
|
||||
FsVolumeReferences volReferences = new FsVolumeReferences(volumes);
|
||||
FsDatasetSpi<? extends FsVolumeSpi> spyFds = Mockito.spy(fds);
|
||||
Mockito.doReturn(volReferences).when(spyFds).getFsVolumeReferences();
|
||||
|
||||
scanner = new DirectoryScanner(dataNode, spyFds, CONF);
|
||||
scanner.setRetainDiffs(true);
|
||||
scanner.reconcile();
|
||||
} finally {
|
||||
if (scanner != null) {
|
||||
scanner.shutdown();
|
||||
scanner = null;
|
||||
}
|
||||
cluster.shutdown();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue