Retry for transient exceptions while doing cleanup for Hadoop Jobs (#3177)

* fix 1828

fixes https://github.com/druid-io/druid/issues/1828

* remove unused import

* Review comment
This commit is contained in:
Nishant 2016-06-23 13:38:47 -07:00 committed by Charles Allen
parent 6f330dc816
commit 2696b0c451
2 changed files with 42 additions and 2 deletions

View File

@ -774,4 +774,26 @@ public class JobHelper
} }
}; };
} }
public static boolean deleteWithRetry(final FileSystem fs, final Path path, final boolean recursive)
{
try {
return RetryUtils.retry(
new Callable<Boolean>()
{
@Override
public Boolean call() throws Exception
{
return fs.delete(path, recursive);
}
},
shouldRetryPredicate(),
NUM_RETRIES
);
}
catch (Exception e) {
log.error(e, "Failed to cleanup path[%s]", path);
throw Throwables.propagate(e);
}
}
} }

View File

@ -147,8 +147,26 @@ public class HadoopConverterJob
{ {
final Path jobDir = getJobPath(job.getJobID(), job.getWorkingDirectory()); final Path jobDir = getJobPath(job.getJobID(), job.getWorkingDirectory());
final FileSystem fs = jobDir.getFileSystem(job.getConfiguration()); final FileSystem fs = jobDir.getFileSystem(job.getConfiguration());
fs.delete(jobDir, true); RuntimeException e = null;
fs.delete(getJobClassPathDir(job.getJobName(), job.getWorkingDirectory()), true); try {
JobHelper.deleteWithRetry(fs, jobDir, true);
}
catch (RuntimeException ex) {
e = ex;
}
try {
JobHelper.deleteWithRetry(fs, getJobClassPathDir(job.getJobName(), job.getWorkingDirectory()), true);
}
catch (RuntimeException ex) {
if (e == null) {
e = ex;
} else {
e.addSuppressed(ex);
}
}
if (e != null) {
throw e;
}
} }