MAPREDUCE-7131. Job History Server has race condition where it moves files from intermediate to finished but thinks file is in intermediate. Contributed by Anthony Hsu
(cherry picked from commit eb0b5a844f960017f6f48d746174d0f5826f0e5f)
This commit is contained in:
parent
f6e4b13d5f
commit
3eb70d4dcb
@ -1092,7 +1092,17 @@ public HistoryFileInfo getFileInfo(JobId jobId) throws IOException {
|
||||
private void moveToDoneNow(final Path src, final Path target)
|
||||
throws IOException {
|
||||
LOG.info("Moving " + src.toString() + " to " + target.toString());
|
||||
intermediateDoneDirFc.rename(src, target, Options.Rename.NONE);
|
||||
try {
|
||||
intermediateDoneDirFc.rename(src, target, Options.Rename.NONE);
|
||||
} catch (FileNotFoundException e) {
|
||||
if (doneDirFc.util().exists(target)) {
|
||||
LOG.info("Source file " + src.toString() + " not found, but target "
|
||||
+ "file " + target.toString() + " already exists. Move already "
|
||||
+ "happened.");
|
||||
} else {
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private String getJobSummary(FileContext fc, Path path) throws IOException {
|
||||
|
@ -26,6 +26,7 @@
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.Job;
|
||||
import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
|
||||
import org.junit.Assert;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
||||
@ -341,6 +342,57 @@ public void testHistoryFileInfoShouldReturnCompletedJobIfMaxNotConfiged()
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* This test sets up a scenario where the history files have already been
|
||||
* moved to the "done" directory (so the "intermediate" directory is empty),
|
||||
* but then moveToDone() is called again on the same history file. It
|
||||
* validates that the second moveToDone() still succeeds rather than throws a
|
||||
* FileNotFoundException.
|
||||
*/
|
||||
@Test
|
||||
public void testMoveToDoneAlreadyMovedSucceeds() throws Exception {
|
||||
HistoryFileManagerTest historyFileManager = new HistoryFileManagerTest();
|
||||
long jobTimestamp = 1535436603000L;
|
||||
String job = "job_" + jobTimestamp + "_123456789";
|
||||
|
||||
String intermediateDirectory = "/" + UUID.randomUUID();
|
||||
String doneDirectory = "/" + UUID.randomUUID();
|
||||
Configuration conf = dfsCluster.getConfiguration(0);
|
||||
conf.set(JHAdminConfig.MR_HISTORY_INTERMEDIATE_DONE_DIR,
|
||||
intermediateDirectory);
|
||||
conf.set(JHAdminConfig.MR_HISTORY_DONE_DIR, doneDirectory);
|
||||
|
||||
Path intermediateHistoryFilePath = new Path(intermediateDirectory + "/"
|
||||
+ job + ".jhist");
|
||||
Path intermediateConfFilePath = new Path(intermediateDirectory + "/"
|
||||
+ job + "_conf.xml");
|
||||
Path doneHistoryFilePath = new Path(doneDirectory + "/"
|
||||
+ JobHistoryUtils.timestampDirectoryComponent(jobTimestamp) + "/123456/"
|
||||
+ job + ".jhist");
|
||||
Path doneConfFilePath = new Path(doneDirectory + "/"
|
||||
+ JobHistoryUtils.timestampDirectoryComponent(jobTimestamp)
|
||||
+ "/123456/" + job + "_conf.xml");
|
||||
|
||||
dfsCluster.getFileSystem().createNewFile(doneHistoryFilePath);
|
||||
dfsCluster.getFileSystem().createNewFile(doneConfFilePath);
|
||||
|
||||
historyFileManager.serviceInit(conf);
|
||||
|
||||
JobIndexInfo jobIndexInfo = new JobIndexInfo();
|
||||
jobIndexInfo.setJobId(TypeConverter.toYarn(JobID.forName(job)));
|
||||
jobIndexInfo.setFinishTime(jobTimestamp);
|
||||
HistoryFileInfo info = historyFileManager.getHistoryFileInfo(
|
||||
intermediateHistoryFilePath, intermediateConfFilePath, null,
|
||||
jobIndexInfo, false);
|
||||
info.moveToDone();
|
||||
|
||||
Assert.assertFalse(info.isMovePending());
|
||||
Assert.assertEquals(doneHistoryFilePath.toString(),
|
||||
info.getHistoryFile().toUri().getPath());
|
||||
Assert.assertEquals(doneConfFilePath.toString(),
|
||||
info.getConfFile().toUri().getPath());
|
||||
}
|
||||
|
||||
static class HistoryFileManagerTest extends HistoryFileManager {
|
||||
public HistoryFileManagerTest() {
|
||||
super();
|
||||
|
Loading…
x
Reference in New Issue
Block a user