HDFS-8941. DistributedFileSystem listCorruptFileBlocks API should resolve relative path. Contributed by Rakesh R.

(cherry picked from commit c32614f410)
This commit is contained in:
Andrew Wang 2015-10-09 11:57:03 -07:00
parent 673f301408
commit 12d3b69217
3 changed files with 86 additions and 2 deletions

View File

@ -1193,9 +1193,22 @@ public class DistributedFileSystem extends FileSystem {
}
@Override
public RemoteIterator<Path> listCorruptFileBlocks(Path path)
public RemoteIterator<Path> listCorruptFileBlocks(final Path path)
throws IOException {
return new CorruptFileBlockIterator(dfs, path);
Path absF = fixRelativePart(path);
return new FileSystemLinkResolver<RemoteIterator<Path>>() {
@Override
public RemoteIterator<Path> doCall(final Path path) throws IOException,
UnresolvedLinkException {
return new CorruptFileBlockIterator(dfs, path);
}
@Override
public RemoteIterator<Path> next(final FileSystem fs, final Path path)
throws IOException {
return fs.listCorruptFileBlocks(path);
}
}.resolve(this, absF);
}
/** @return datanode statistics. */

View File

@ -1184,6 +1184,9 @@ Release 2.8.0 - UNRELEASED
HDFS-9142. Separating Configuration object for namenode(s) in
MiniDFSCluster. (Siqi Li via mingma)
HDFS-8941. DistributedFileSystem listCorruptFileBlocks API should
resolve relative path. (Rakesh R via wang)
Release 2.7.2 - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -530,4 +530,72 @@ public class TestListCorruptFileBlocks {
}
}
@Test(timeout = 60000)
public void testListCorruptFileBlocksOnRelativePath() throws Exception {
Configuration conf = new Configuration();
conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 1000);
conf.setInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_KEY, 1);
MiniDFSCluster cluster = null;
try {
cluster = new MiniDFSCluster.Builder(conf).build();
cluster.waitActive();
FileSystem fs = cluster.getFileSystem();
DistributedFileSystem dfs = (DistributedFileSystem) fs;
final Path baseDir = new Path("/somewhere/base");
fs.mkdirs(baseDir);
// set working dir
fs.setWorkingDirectory(baseDir);
DFSTestUtil util = new DFSTestUtil.Builder()
.setName("testGetCorruptFilesOnRelativePath").setNumFiles(3)
.setMaxLevels(1).setMaxSize(1024).build();
util.createFiles(fs, "corruptData");
RemoteIterator<Path> corruptFileBlocks = dfs
.listCorruptFileBlocks(new Path("corruptData"));
int numCorrupt = countPaths(corruptFileBlocks);
assertTrue(numCorrupt == 0);
// delete the blocks
String bpid = cluster.getNamesystem().getBlockPoolId();
// For loop through number of data directories per datanode (2)
for (int i = 0; i < 2; i++) {
File storageDir = cluster.getInstanceStorageDir(0, i);
File data_dir = MiniDFSCluster.getFinalizedDir(storageDir, bpid);
List<File> metadataFiles = MiniDFSCluster
.getAllBlockMetadataFiles(data_dir);
if (metadataFiles == null)
continue;
for (File metadataFile : metadataFiles) {
File blockFile = Block.metaToBlockFile(metadataFile);
LOG.info("Deliberately removing file " + blockFile.getName());
assertTrue("Cannot remove file.", blockFile.delete());
LOG.info("Deliberately removing file " + metadataFile.getName());
assertTrue("Cannot remove file.", metadataFile.delete());
}
}
int count = 0;
corruptFileBlocks = dfs.listCorruptFileBlocks(new Path("corruptData"));
numCorrupt = countPaths(corruptFileBlocks);
while (numCorrupt < 3) {
Thread.sleep(1000);
corruptFileBlocks = dfs.listCorruptFileBlocks(new Path("corruptData"));
numCorrupt = countPaths(corruptFileBlocks);
count++;
if (count > 30)
break;
}
// Validate we get all the corrupt files
LOG.info("Namenode has bad files. " + numCorrupt);
assertTrue("Failed to get corrupt files!", numCorrupt == 3);
util.cleanup(fs, "corruptData");
} finally {
if (cluster != null) {
cluster.shutdown();
}
}
}
}