MAPREDUCE-6731. TestMRTimelineEventHandling#testMRNewTimelineServiceEventHandling fails randomly for concurrent tests. (Sangjin Lee via Varun Saxena).
(cherry picked from commit d14e729a55c0ca4053c537290392b74f732ce939)
This commit is contained in:
parent
16942ea115
commit
0820df611d
@ -175,6 +175,13 @@ public void testMRTimelineEventHandling() throws Exception {
|
|||||||
@Test
|
@Test
|
||||||
public void testMRNewTimelineServiceEventHandling() throws Exception {
|
public void testMRNewTimelineServiceEventHandling() throws Exception {
|
||||||
LOG.info("testMRNewTimelineServiceEventHandling start.");
|
LOG.info("testMRNewTimelineServiceEventHandling start.");
|
||||||
|
|
||||||
|
String testDir =
|
||||||
|
new File("target", getClass().getSimpleName() +
|
||||||
|
"-test_dir").getAbsolutePath();
|
||||||
|
String storageDir =
|
||||||
|
testDir + File.separator + "timeline_service_data";
|
||||||
|
|
||||||
Configuration conf = new YarnConfiguration();
|
Configuration conf = new YarnConfiguration();
|
||||||
conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
|
conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
|
||||||
// enable new timeline service
|
// enable new timeline service
|
||||||
@ -182,6 +189,9 @@ public void testMRNewTimelineServiceEventHandling() throws Exception {
|
|||||||
conf.setClass(YarnConfiguration.TIMELINE_SERVICE_WRITER_CLASS,
|
conf.setClass(YarnConfiguration.TIMELINE_SERVICE_WRITER_CLASS,
|
||||||
FileSystemTimelineWriterImpl.class, TimelineWriter.class);
|
FileSystemTimelineWriterImpl.class, TimelineWriter.class);
|
||||||
conf.setBoolean(MRJobConfig.MAPREDUCE_JOB_EMIT_TIMELINE_DATA, true);
|
conf.setBoolean(MRJobConfig.MAPREDUCE_JOB_EMIT_TIMELINE_DATA, true);
|
||||||
|
// set the file system root directory
|
||||||
|
conf.set(FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_DIR_ROOT,
|
||||||
|
storageDir);
|
||||||
|
|
||||||
// enable aux-service based timeline collectors
|
// enable aux-service based timeline collectors
|
||||||
conf.set(YarnConfiguration.NM_AUX_SERVICES, TIMELINE_AUX_SERVICE_NAME);
|
conf.set(YarnConfiguration.NM_AUX_SERVICES, TIMELINE_AUX_SERVICE_NAME);
|
||||||
@ -198,8 +208,8 @@ public void testMRNewTimelineServiceEventHandling() throws Exception {
|
|||||||
cluster.start();
|
cluster.start();
|
||||||
LOG.info("A MiniMRYarnCluster get start.");
|
LOG.info("A MiniMRYarnCluster get start.");
|
||||||
|
|
||||||
Path inDir = new Path("input");
|
Path inDir = new Path(testDir, "input");
|
||||||
Path outDir = new Path("output");
|
Path outDir = new Path(testDir, "output");
|
||||||
LOG.info("Run 1st job which should be successful.");
|
LOG.info("Run 1st job which should be successful.");
|
||||||
JobConf successConf = new JobConf(conf);
|
JobConf successConf = new JobConf(conf);
|
||||||
successConf.set("dummy_conf1",
|
successConf.set("dummy_conf1",
|
||||||
@ -227,7 +237,7 @@ public void testMRNewTimelineServiceEventHandling() throws Exception {
|
|||||||
ApplicationReport appReport = apps.get(0);
|
ApplicationReport appReport = apps.get(0);
|
||||||
firstAppId = appReport.getApplicationId();
|
firstAppId = appReport.getApplicationId();
|
||||||
UtilsForTests.waitForAppFinished(job, cluster);
|
UtilsForTests.waitForAppFinished(job, cluster);
|
||||||
checkNewTimelineEvent(firstAppId, appReport);
|
checkNewTimelineEvent(firstAppId, appReport, storageDir);
|
||||||
|
|
||||||
LOG.info("Run 2nd job which should be failed.");
|
LOG.info("Run 2nd job which should be failed.");
|
||||||
job = UtilsForTests.runJobFail(new JobConf(conf), inDir, outDir);
|
job = UtilsForTests.runJobFail(new JobConf(conf), inDir, outDir);
|
||||||
@ -240,41 +250,38 @@ public void testMRNewTimelineServiceEventHandling() throws Exception {
|
|||||||
appReport = apps.get(0).getApplicationId().equals(firstAppId) ?
|
appReport = apps.get(0).getApplicationId().equals(firstAppId) ?
|
||||||
apps.get(0) : apps.get(1);
|
apps.get(0) : apps.get(1);
|
||||||
|
|
||||||
checkNewTimelineEvent(firstAppId, appReport);
|
checkNewTimelineEvent(firstAppId, appReport, storageDir);
|
||||||
|
|
||||||
} finally {
|
} finally {
|
||||||
if (cluster != null) {
|
if (cluster != null) {
|
||||||
cluster.stop();
|
cluster.stop();
|
||||||
}
|
}
|
||||||
// Cleanup test file
|
// Cleanup test file
|
||||||
String testRoot =
|
File testDirFolder = new File(testDir);
|
||||||
FileSystemTimelineWriterImpl.
|
if(testDirFolder.isDirectory()) {
|
||||||
DEFAULT_TIMELINE_SERVICE_STORAGE_DIR_ROOT;
|
FileUtils.deleteDirectory(testDirFolder);
|
||||||
File testRootFolder = new File(testRoot);
|
|
||||||
if(testRootFolder.isDirectory()) {
|
|
||||||
FileUtils.deleteDirectory(testRootFolder);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void checkNewTimelineEvent(ApplicationId appId,
|
private void checkNewTimelineEvent(ApplicationId appId,
|
||||||
ApplicationReport appReport) throws IOException {
|
ApplicationReport appReport, String storageDir) throws IOException {
|
||||||
String tmpRoot =
|
String tmpRoot = storageDir + File.separator + "entities" + File.separator;
|
||||||
FileSystemTimelineWriterImpl.DEFAULT_TIMELINE_SERVICE_STORAGE_DIR_ROOT
|
|
||||||
+ "/entities/";
|
|
||||||
|
|
||||||
File tmpRootFolder = new File(tmpRoot);
|
File tmpRootFolder = new File(tmpRoot);
|
||||||
|
|
||||||
Assert.assertTrue(tmpRootFolder.isDirectory());
|
Assert.assertTrue(tmpRootFolder.isDirectory());
|
||||||
String basePath = tmpRoot + YarnConfiguration.DEFAULT_RM_CLUSTER_ID +
|
String basePath = tmpRoot + YarnConfiguration.DEFAULT_RM_CLUSTER_ID +
|
||||||
"/" + UserGroupInformation.getCurrentUser().getShortUserName() +
|
File.separator +
|
||||||
"/" + appReport.getName() +
|
UserGroupInformation.getCurrentUser().getShortUserName() +
|
||||||
"/" + TimelineUtils.DEFAULT_FLOW_VERSION +
|
File.separator + appReport.getName() +
|
||||||
"/" + appReport.getStartTime() +
|
File.separator + TimelineUtils.DEFAULT_FLOW_VERSION +
|
||||||
"/" + appId.toString();
|
File.separator + appReport.getStartTime() +
|
||||||
|
File.separator + appId.toString();
|
||||||
// for this test, we expect MAPREDUCE_JOB and MAPREDUCE_TASK dirs
|
// for this test, we expect MAPREDUCE_JOB and MAPREDUCE_TASK dirs
|
||||||
String outputDirJob = basePath + "/MAPREDUCE_JOB/";
|
String outputDirJob =
|
||||||
|
basePath + File.separator + "MAPREDUCE_JOB" + File.separator;
|
||||||
|
|
||||||
File entityFolder = new File(outputDirJob);
|
File entityFolder = new File(outputDirJob);
|
||||||
Assert.assertTrue("Job output directory: " + outputDirJob +
|
Assert.assertTrue("Job output directory: " + outputDirJob +
|
||||||
@ -297,7 +304,8 @@ private void checkNewTimelineEvent(ApplicationId appId,
|
|||||||
verifyEntity(jobEventFile, null, false, true, cfgsToCheck);
|
verifyEntity(jobEventFile, null, false, true, cfgsToCheck);
|
||||||
|
|
||||||
// for this test, we expect MR job metrics are published in YARN_APPLICATION
|
// for this test, we expect MR job metrics are published in YARN_APPLICATION
|
||||||
String outputAppDir = basePath + "/YARN_APPLICATION/";
|
String outputAppDir =
|
||||||
|
basePath + File.separator + "YARN_APPLICATION" + File.separator;
|
||||||
entityFolder = new File(outputAppDir);
|
entityFolder = new File(outputAppDir);
|
||||||
Assert.assertTrue(
|
Assert.assertTrue(
|
||||||
"Job output directory: " + outputAppDir +
|
"Job output directory: " + outputAppDir +
|
||||||
@ -318,7 +326,8 @@ private void checkNewTimelineEvent(ApplicationId appId,
|
|||||||
verifyEntity(appEventFile, null, false, true, cfgsToCheck);
|
verifyEntity(appEventFile, null, false, true, cfgsToCheck);
|
||||||
|
|
||||||
// check for task event file
|
// check for task event file
|
||||||
String outputDirTask = basePath + "/MAPREDUCE_TASK/";
|
String outputDirTask =
|
||||||
|
basePath + File.separator + "MAPREDUCE_TASK" + File.separator;
|
||||||
File taskFolder = new File(outputDirTask);
|
File taskFolder = new File(outputDirTask);
|
||||||
Assert.assertTrue("Task output directory: " + outputDirTask +
|
Assert.assertTrue("Task output directory: " + outputDirTask +
|
||||||
" does not exist.",
|
" does not exist.",
|
||||||
@ -338,7 +347,8 @@ private void checkNewTimelineEvent(ApplicationId appId,
|
|||||||
true, false, null);
|
true, false, null);
|
||||||
|
|
||||||
// check for task attempt event file
|
// check for task attempt event file
|
||||||
String outputDirTaskAttempt = basePath + "/MAPREDUCE_TASK_ATTEMPT/";
|
String outputDirTaskAttempt =
|
||||||
|
basePath + File.separator + "MAPREDUCE_TASK_ATTEMPT" + File.separator;
|
||||||
File taskAttemptFolder = new File(outputDirTaskAttempt);
|
File taskAttemptFolder = new File(outputDirTaskAttempt);
|
||||||
Assert.assertTrue("TaskAttempt output directory: " + outputDirTaskAttempt +
|
Assert.assertTrue("TaskAttempt output directory: " + outputDirTaskAttempt +
|
||||||
" does not exist.", taskAttemptFolder.isDirectory());
|
" does not exist.", taskAttemptFolder.isDirectory());
|
||||||
|
Loading…
x
Reference in New Issue
Block a user