Merge pull request #2605 from himanshug/robust_hdfs_kill

let hdfs segment kill be a success when segment file does not exist
This commit is contained in:
Fangjin Yang 2016-03-08 12:43:59 -08:00
commit 9e31e2ce0e
2 changed files with 15 additions and 18 deletions

View File

@ -50,10 +50,16 @@ public class HdfsDataSegmentKiller implements DataSegmentKiller
final Path path = getPath(segment);
log.info("killing segment[%s] mapped to path[%s]", segment.getIdentifier(), path);
final FileSystem fs = checkPathAndGetFilesystem(path);
try {
if (path.getName().endsWith(".zip")) {
final FileSystem fs = path.getFileSystem(config);
if (!fs.exists(path)) {
log.warn("Segment Path [%s] does not exist. It appears to have been deleted already.", path);
return ;
}
// path format -- > .../dataSource/interval/version/partitionNum/xxx.zip
Path partitionNumDir = path.getParent();
if (!fs.delete(partitionNumDir, true)) {
@ -95,21 +101,4 @@ public class HdfsDataSegmentKiller implements DataSegmentKiller
{
return new Path(String.valueOf(segment.getLoadSpec().get(PATH_KEY)));
}
private FileSystem checkPathAndGetFilesystem(Path path) throws SegmentLoadingException
{
FileSystem fs;
try {
fs = path.getFileSystem(config);
if (!fs.exists(path)) {
throw new SegmentLoadingException("Path[%s] doesn't exist.", path);
}
return fs;
}
catch (IOException e) {
throw new SegmentLoadingException(e, "Problems interacting with filesystem[%s].", path);
}
}
}

View File

@ -95,6 +95,14 @@ public class HdfsDataSegmentKillerTest
Assert.assertFalse(fs.exists(dataSourceDir));
}
@Test
public void testKillNonExistingSegment() throws Exception
{
Configuration config = new Configuration();
HdfsDataSegmentKiller killer = new HdfsDataSegmentKiller(config);
killer.kill(getSegmentWithPath(new Path("/xxx/", "index.zip").toString()));
}
private void makePartitionDirWithIndex(FileSystem fs, Path path) throws IOException
{
Assert.assertTrue(fs.mkdirs(path));