MAPREDUCE-7131. Job History Server has race condition where it moves files from intermediate to finished but thinks file is in intermediate. Contributed by Anthony Hsu
(cherry picked from commit eb0b5a844f
)
This commit is contained in:
parent
d1ea7df43d
commit
4a55092c6d
|
@ -1089,7 +1089,17 @@ public class HistoryFileManager extends AbstractService {
|
||||||
private void moveToDoneNow(final Path src, final Path target)
|
private void moveToDoneNow(final Path src, final Path target)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
LOG.info("Moving " + src.toString() + " to " + target.toString());
|
LOG.info("Moving " + src.toString() + " to " + target.toString());
|
||||||
|
try {
|
||||||
intermediateDoneDirFc.rename(src, target, Options.Rename.NONE);
|
intermediateDoneDirFc.rename(src, target, Options.Rename.NONE);
|
||||||
|
} catch (FileNotFoundException e) {
|
||||||
|
if (doneDirFc.util().exists(target)) {
|
||||||
|
LOG.info("Source file " + src.toString() + " not found, but target "
|
||||||
|
+ "file " + target.toString() + " already exists. Move already "
|
||||||
|
+ "happened.");
|
||||||
|
} else {
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private String getJobSummary(FileContext fc, Path path) throws IOException {
|
private String getJobSummary(FileContext fc, Path path) throws IOException {
|
||||||
|
|
|
@ -26,6 +26,7 @@ import java.util.UUID;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
import org.apache.hadoop.mapreduce.v2.app.job.Job;
|
import org.apache.hadoop.mapreduce.v2.app.job.Job;
|
||||||
|
import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
||||||
|
@ -341,6 +342,57 @@ public class TestHistoryFileManager {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This test sets up a scenario where the history files have already been
|
||||||
|
* moved to the "done" directory (so the "intermediate" directory is empty),
|
||||||
|
* but then moveToDone() is called again on the same history file. It
|
||||||
|
* validates that the second moveToDone() still succeeds rather than throws a
|
||||||
|
* FileNotFoundException.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testMoveToDoneAlreadyMovedSucceeds() throws Exception {
|
||||||
|
HistoryFileManagerTest historyFileManager = new HistoryFileManagerTest();
|
||||||
|
long jobTimestamp = 1535436603000L;
|
||||||
|
String job = "job_" + jobTimestamp + "_123456789";
|
||||||
|
|
||||||
|
String intermediateDirectory = "/" + UUID.randomUUID();
|
||||||
|
String doneDirectory = "/" + UUID.randomUUID();
|
||||||
|
Configuration conf = dfsCluster.getConfiguration(0);
|
||||||
|
conf.set(JHAdminConfig.MR_HISTORY_INTERMEDIATE_DONE_DIR,
|
||||||
|
intermediateDirectory);
|
||||||
|
conf.set(JHAdminConfig.MR_HISTORY_DONE_DIR, doneDirectory);
|
||||||
|
|
||||||
|
Path intermediateHistoryFilePath = new Path(intermediateDirectory + "/"
|
||||||
|
+ job + ".jhist");
|
||||||
|
Path intermediateConfFilePath = new Path(intermediateDirectory + "/"
|
||||||
|
+ job + "_conf.xml");
|
||||||
|
Path doneHistoryFilePath = new Path(doneDirectory + "/"
|
||||||
|
+ JobHistoryUtils.timestampDirectoryComponent(jobTimestamp) + "/123456/"
|
||||||
|
+ job + ".jhist");
|
||||||
|
Path doneConfFilePath = new Path(doneDirectory + "/"
|
||||||
|
+ JobHistoryUtils.timestampDirectoryComponent(jobTimestamp)
|
||||||
|
+ "/123456/" + job + "_conf.xml");
|
||||||
|
|
||||||
|
dfsCluster.getFileSystem().createNewFile(doneHistoryFilePath);
|
||||||
|
dfsCluster.getFileSystem().createNewFile(doneConfFilePath);
|
||||||
|
|
||||||
|
historyFileManager.serviceInit(conf);
|
||||||
|
|
||||||
|
JobIndexInfo jobIndexInfo = new JobIndexInfo();
|
||||||
|
jobIndexInfo.setJobId(TypeConverter.toYarn(JobID.forName(job)));
|
||||||
|
jobIndexInfo.setFinishTime(jobTimestamp);
|
||||||
|
HistoryFileInfo info = historyFileManager.getHistoryFileInfo(
|
||||||
|
intermediateHistoryFilePath, intermediateConfFilePath, null,
|
||||||
|
jobIndexInfo, false);
|
||||||
|
info.moveToDone();
|
||||||
|
|
||||||
|
Assert.assertFalse(info.isMovePending());
|
||||||
|
Assert.assertEquals(doneHistoryFilePath.toString(),
|
||||||
|
info.getHistoryFile().toUri().getPath());
|
||||||
|
Assert.assertEquals(doneConfFilePath.toString(),
|
||||||
|
info.getConfFile().toUri().getPath());
|
||||||
|
}
|
||||||
|
|
||||||
static class HistoryFileManagerTest extends HistoryFileManager {
|
static class HistoryFileManagerTest extends HistoryFileManager {
|
||||||
public HistoryFileManagerTest() {
|
public HistoryFileManagerTest() {
|
||||||
super();
|
super();
|
||||||
|
|
Loading…
Reference in New Issue