diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 044d3a7514b..0a29a9d52db 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -717,6 +717,9 @@ Release 0.23.7 - UNRELEASED MAPREDUCE-4991. coverage for gridmix (Aleksey Gorshkov via tgraves) + MAPREDUCE-5007. fix coverage org.apache.hadoop.mapreduce.v2.hs (Aleksey + Gorshkov via tgraves) + Release 0.23.6 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java index 004d5b0a6f2..54f0a6ef474 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java @@ -869,4 +869,8 @@ void clean() throws IOException { } } } + @VisibleForTesting + protected void setMaxHistoryAge(long newValue){ + maxHistoryAge=newValue; + } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestCompletedTask.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestCompletedTask.java index 23e636d0dfc..0206e95f1f0 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestCompletedTask.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestCompletedTask.java @@ -21,46 +21,75 @@ import java.util.Map; import java.util.TreeMap; +import org.apache.hadoop.mapreduce.JobID; import org.apache.hadoop.mapreduce.TaskAttemptID; +import org.apache.hadoop.mapreduce.TaskID; import org.apache.hadoop.mapreduce.TaskType; import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskAttemptInfo; import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo; +import org.apache.hadoop.mapreduce.v2.api.records.Phase; import org.apache.hadoop.mapreduce.v2.api.records.TaskId; import org.apache.hadoop.mapreduce.v2.api.records.TaskReport; import org.apache.hadoop.mapreduce.v2.hs.CompletedTask; -import org.junit.Assert; import org.junit.Test; -import org.mockito.Mockito; +import static org.mockito.Mockito.*; +import static org.junit.Assert.*; public class TestCompletedTask{ - @Test + @Test (timeout=5000) public void testTaskStartTimes() { - TaskId taskId = Mockito.mock(TaskId.class); - TaskInfo taskInfo = Mockito.mock(TaskInfo.class); + TaskId taskId = mock(TaskId.class); + TaskInfo taskInfo = mock(TaskInfo.class); Map taskAttempts = new TreeMap(); TaskAttemptID id = new TaskAttemptID("0", 0, TaskType.MAP, 0, 0); - TaskAttemptInfo info = Mockito.mock(TaskAttemptInfo.class); - Mockito.when(info.getAttemptId()).thenReturn(id); - Mockito.when(info.getStartTime()).thenReturn(10l); + TaskAttemptInfo info = mock(TaskAttemptInfo.class); + when(info.getAttemptId()).thenReturn(id); + when(info.getStartTime()).thenReturn(10l); taskAttempts.put(id, info); id = new TaskAttemptID("1", 0, TaskType.MAP, 1, 1); - info = Mockito.mock(TaskAttemptInfo.class); - Mockito.when(info.getAttemptId()).thenReturn(id); - Mockito.when(info.getStartTime()).thenReturn(20l); + info = mock(TaskAttemptInfo.class); + when(info.getAttemptId()).thenReturn(id); + when(info.getStartTime()).thenReturn(20l); taskAttempts.put(id, info); - Mockito.when(taskInfo.getAllTaskAttempts()).thenReturn(taskAttempts); + when(taskInfo.getAllTaskAttempts()).thenReturn(taskAttempts); CompletedTask task = new CompletedTask(taskId, taskInfo); TaskReport report = task.getReport(); // Make sure the startTime returned by report is the lesser of the // attempy launch times - Assert.assertTrue(report.getStartTime() == 10); + assertTrue(report.getStartTime() == 10); + } + /** + * test some methods of CompletedTaskAttempt + */ + @Test (timeout=5000) + public void testCompletedTaskAttempt(){ + + TaskAttemptInfo attemptInfo= mock(TaskAttemptInfo.class); + when(attemptInfo.getRackname()).thenReturn("Rackname"); + when(attemptInfo.getShuffleFinishTime()).thenReturn(11L); + when(attemptInfo.getSortFinishTime()).thenReturn(12L); + when(attemptInfo.getShufflePort()).thenReturn(10); + + JobID jobId= new JobID("12345",0); + TaskID taskId =new TaskID(jobId,TaskType.REDUCE, 0); + TaskAttemptID taskAttemptId= new TaskAttemptID(taskId, 0); + when(attemptInfo.getAttemptId()).thenReturn(taskAttemptId); + + + CompletedTaskAttempt taskAttemt= new CompletedTaskAttempt(null,attemptInfo); + assertEquals( "Rackname", taskAttemt.getNodeRackName()); + assertEquals( Phase.CLEANUP, taskAttemt.getPhase()); + assertTrue( taskAttemt.isFinished()); + assertEquals( 11L, taskAttemt.getShuffleFinishTime()); + assertEquals( 12L, taskAttemt.getSortFinishTime()); + assertEquals( 10, taskAttemt.getShufflePort()); } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryEntities.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryEntities.java index 837f9e17b73..3c827cd6adf 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryEntities.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryEntities.java @@ -27,6 +27,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapred.JobACLsManager; +import org.apache.hadoop.mapred.TaskCompletionEvent; import org.apache.hadoop.mapreduce.v2.api.records.JobId; import org.apache.hadoop.mapreduce.v2.api.records.JobReport; import org.apache.hadoop.mapreduce.v2.api.records.JobState; @@ -46,6 +47,7 @@ import org.junit.runners.Parameterized; import org.junit.runners.Parameterized.Parameters; +import static org.junit.Assert.*; import static org.mockito.Mockito.*; @RunWith(value = Parameterized.class) @@ -79,7 +81,7 @@ public static Collection data() { } /* Verify some expected values based on the history file */ - @Test (timeout=10000) + @Test (timeout=100000) public void testCompletedJob() throws Exception { HistoryFileInfo info = mock(HistoryFileInfo.class); when(info.getConfFile()).thenReturn(fullConfPath); @@ -168,4 +170,45 @@ public void testCompletedTaskAttempt() throws Exception { assertEquals(45454, rta1Report.getNodeManagerPort()); assertEquals(9999, rta1Report.getNodeManagerHttpPort()); } + /** + * Simple test of some methods of CompletedJob + * @throws Exception + */ + @Test (timeout=30000) + public void testGetTaskAttemptCompletionEvent() throws Exception{ + HistoryFileInfo info = mock(HistoryFileInfo.class); + when(info.getConfFile()).thenReturn(fullConfPath); + completedJob = + new CompletedJob(conf, jobId, fulleHistoryPath, loadTasks, "user", + info, jobAclsManager); + TaskCompletionEvent[] events= completedJob.getMapAttemptCompletionEvents(0,1000); + assertEquals(10, completedJob.getMapAttemptCompletionEvents(0,10).length); + int currentEventId=0; + for (TaskCompletionEvent taskAttemptCompletionEvent : events) { + int eventId= taskAttemptCompletionEvent.getEventId(); + assertTrue(eventId>=currentEventId); + currentEventId=eventId; + } + assertNull(completedJob.loadConfFile() ); + // job name + assertEquals("Sleep job",completedJob.getName()); + // queue name + assertEquals("default",completedJob.getQueueName()); + // progress + assertEquals(1.0, completedJob.getProgress(),0.001); + // 11 rows in answer + assertEquals(11,completedJob.getTaskAttemptCompletionEvents(0,1000).length); + // select first 10 rows + assertEquals(10,completedJob.getTaskAttemptCompletionEvents(0,10).length); + // select 5-10 rows include 5th + assertEquals(6,completedJob.getTaskAttemptCompletionEvents(5,10).length); + + // without errors + assertEquals(1,completedJob.getDiagnostics().size()); + assertEquals("",completedJob.getDiagnostics().get(0)); + + assertEquals(0, completedJob.getJobACLs().size()); + + } + } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java index 073a17bde17..ba0eb8e353f 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java @@ -18,6 +18,10 @@ package org.apache.hadoop.mapreduce.v2.hs; +import static junit.framework.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.PrintStream; @@ -54,6 +58,9 @@ import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; import org.apache.hadoop.mapreduce.v2.api.records.TaskId; import org.apache.hadoop.mapreduce.v2.api.records.TaskState; +import org.apache.hadoop.mapreduce.v2.api.records.TaskType; +import org.apache.hadoop.mapreduce.v2.api.records.impl.pb.JobIdPBImpl; +import org.apache.hadoop.mapreduce.v2.api.records.impl.pb.TaskIdPBImpl; import org.apache.hadoop.mapreduce.v2.app.MRApp; import org.apache.hadoop.mapreduce.v2.app.job.Job; import org.apache.hadoop.mapreduce.v2.app.job.Task; @@ -65,7 +72,9 @@ import org.apache.hadoop.mapreduce.v2.jobhistory.FileNameIndexUtils; import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils; import org.apache.hadoop.mapreduce.v2.jobhistory.JobIndexInfo; +import org.apache.hadoop.mapreduce.v2.hs.webapp.dao.JobsInfo; import org.apache.hadoop.net.DNSToSwitchMapping; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.service.Service; import org.apache.hadoop.yarn.util.BuilderUtils; @@ -80,12 +89,12 @@ public class TestJobHistoryParsing { private static final String RACK_NAME = "/MyRackName"; - private ByteArrayOutputStream outContent = new ByteArrayOutputStream(); + private ByteArrayOutputStream outContent = new ByteArrayOutputStream(); public static class MyResolver implements DNSToSwitchMapping { @Override public List resolve(List names) { - return Arrays.asList(new String[]{RACK_NAME}); + return Arrays.asList(new String[] { RACK_NAME }); } @Override @@ -93,14 +102,14 @@ public void reloadCachedMappings() { } } - @Test (timeout=50000) + @Test(timeout = 50000) public void testJobInfo() throws Exception { JobInfo info = new JobInfo(); Assert.assertEquals("NORMAL", info.getPriority()); info.printAll(); } - @Test (timeout=50000) + @Test(timeout = 300000) public void testHistoryParsing() throws Exception { LOG.info("STARTING testHistoryParsing()"); try { @@ -109,8 +118,8 @@ public void testHistoryParsing() throws Exception { LOG.info("FINISHED testHistoryParsing()"); } } - - @Test (timeout=50000) + + @Test(timeout = 50000) public void testHistoryParsingWithParseErrors() throws Exception { LOG.info("STARTING testHistoryParsingWithParseErrors()"); try { @@ -119,18 +128,18 @@ public void testHistoryParsingWithParseErrors() throws Exception { LOG.info("FINISHED testHistoryParsingWithParseErrors()"); } } - - private static String getJobSummary(FileContext fc, Path path) throws IOException { + + private static String getJobSummary(FileContext fc, Path path) + throws IOException { Path qPath = fc.makeQualified(path); FSDataInputStream in = fc.open(qPath); String jobSummaryString = in.readUTF(); in.close(); return jobSummaryString; } - + private void checkHistoryParsing(final int numMaps, final int numReduces, - final int numSuccessfulMaps) - throws Exception { + final int numSuccessfulMaps) throws Exception { Configuration conf = new Configuration(); conf.set(MRJobConfig.USER_NAME, System.getProperty("user.name")); long amStartTimeEst = System.currentTimeMillis(); @@ -138,9 +147,8 @@ private void checkHistoryParsing(final int numMaps, final int numReduces, CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY, MyResolver.class, DNSToSwitchMapping.class); RackResolver.init(conf); - MRApp app = - new MRAppWithHistory(numMaps, numReduces, true, - this.getClass().getName(), true); + MRApp app = new MRAppWithHistory(numMaps, numReduces, true, this.getClass() + .getName(), true); app.submit(conf); Job job = app.getContext().getAllJobs().values().iterator().next(); JobId jobId = job.getID(); @@ -152,7 +160,7 @@ private void checkHistoryParsing(final int numMaps, final int numReduces, String jobhistoryDir = JobHistoryUtils .getHistoryIntermediateDoneDirForUser(conf); - + FileContext fc = null; try { fc = FileContext.getFileContext(conf); @@ -160,7 +168,7 @@ private void checkHistoryParsing(final int numMaps, final int numReduces, LOG.info("Can not get FileContext", ioe); throw (new Exception("Can not get File Context")); } - + if (numMaps == numSuccessfulMaps) { String summaryFileName = JobHistoryUtils .getIntermediateSummaryFileName(jobId); @@ -185,20 +193,22 @@ private void checkHistoryParsing(final int numMaps, final int numReduces, Long.parseLong(jobSummaryElements.get("submitTime")) != 0); Assert.assertTrue("launchTime should not be 0", Long.parseLong(jobSummaryElements.get("launchTime")) != 0); - Assert.assertTrue("firstMapTaskLaunchTime should not be 0", - Long.parseLong(jobSummaryElements.get("firstMapTaskLaunchTime")) != 0); Assert - .assertTrue( - "firstReduceTaskLaunchTime should not be 0", - Long.parseLong(jobSummaryElements.get("firstReduceTaskLaunchTime")) != 0); + .assertTrue( + "firstMapTaskLaunchTime should not be 0", + Long.parseLong(jobSummaryElements.get("firstMapTaskLaunchTime")) != 0); + Assert + .assertTrue("firstReduceTaskLaunchTime should not be 0", + Long.parseLong(jobSummaryElements + .get("firstReduceTaskLaunchTime")) != 0); Assert.assertTrue("finishTime should not be 0", Long.parseLong(jobSummaryElements.get("finishTime")) != 0); Assert.assertEquals("Mismatch in num map slots", numSuccessfulMaps, Integer.parseInt(jobSummaryElements.get("numMaps"))); Assert.assertEquals("Mismatch in num reduce slots", numReduces, Integer.parseInt(jobSummaryElements.get("numReduces"))); - Assert.assertEquals("User does not match", System.getProperty("user.name"), - jobSummaryElements.get("user")); + Assert.assertEquals("User does not match", + System.getProperty("user.name"), jobSummaryElements.get("user")); Assert.assertEquals("Queue does not match", "default", jobSummaryElements.get("queue")); Assert.assertEquals("Status does not match", "SUCCEEDED", @@ -210,8 +220,8 @@ private void checkHistoryParsing(final int numMaps, final int numReduces, HistoryFileInfo fileInfo = jobHistory.getJobFileInfo(jobId); JobInfo jobInfo; long numFinishedMaps; - - synchronized(fileInfo) { + + synchronized (fileInfo) { Path historyFilePath = fileInfo.getHistoryFile(); FSDataInputStream in = null; LOG.info("JobHistoryFile is: " + historyFilePath); @@ -228,11 +238,11 @@ private void checkHistoryParsing(final int numMaps, final int numReduces, if (numMaps == numSuccessfulMaps) { reader = realReader; } else { - final AtomicInteger numFinishedEvents = new AtomicInteger(0); // Hack! + final AtomicInteger numFinishedEvents = new AtomicInteger(0); // Hack! Mockito.when(reader.getNextEvent()).thenAnswer( new Answer() { - public HistoryEvent answer(InvocationOnMock invocation) - throws IOException { + public HistoryEvent answer(InvocationOnMock invocation) + throws IOException { HistoryEvent event = realReader.getNextEvent(); if (event instanceof TaskFinishedEvent) { numFinishedEvents.incrementAndGet(); @@ -244,22 +254,20 @@ public HistoryEvent answer(InvocationOnMock invocation) throw new IOException("test"); } } - } - ); + }); } jobInfo = parser.parse(reader); - numFinishedMaps = - computeFinishedMaps(jobInfo, numMaps, numSuccessfulMaps); + numFinishedMaps = computeFinishedMaps(jobInfo, numMaps, numSuccessfulMaps); if (numFinishedMaps != numMaps) { Exception parseException = parser.getParseException(); - Assert.assertNotNull("Didn't get expected parse exception", + Assert.assertNotNull("Didn't get expected parse exception", parseException); } } - + Assert.assertEquals("Incorrect username ", System.getProperty("user.name"), jobInfo.getUsername()); Assert.assertEquals("Incorrect jobName ", "test", jobInfo.getJobname()); @@ -267,7 +275,7 @@ public HistoryEvent answer(InvocationOnMock invocation) jobInfo.getJobQueueName()); Assert .assertEquals("incorrect conf path", "test", jobInfo.getJobConfPath()); - Assert.assertEquals("incorrect finishedMap ", numSuccessfulMaps, + Assert.assertEquals("incorrect finishedMap ", numSuccessfulMaps, numFinishedMaps); Assert.assertEquals("incorrect finishedReduces ", numReduces, jobInfo.getFinishedReduces()); @@ -275,8 +283,8 @@ public HistoryEvent answer(InvocationOnMock invocation) jobInfo.getUberized()); Map allTasks = jobInfo.getAllTasks(); int totalTasks = allTasks.size(); - Assert.assertEquals("total number of tasks is incorrect ", - (numMaps+numReduces), totalTasks); + Assert.assertEquals("total number of tasks is incorrect ", + (numMaps + numReduces), totalTasks); // Verify aminfo Assert.assertEquals(1, jobInfo.getAMInfos().size()); @@ -306,8 +314,7 @@ public HistoryEvent answer(InvocationOnMock invocation) // Deep compare Job and JobInfo for (Task task : job.getTasks().values()) { - TaskInfo taskInfo = allTasks.get( - TypeConverter.fromYarn(task.getID())); + TaskInfo taskInfo = allTasks.get(TypeConverter.fromYarn(task.getID())); Assert.assertNotNull("TaskInfo not found", taskInfo); for (TaskAttempt taskAttempt : task.getAttempts().values()) { TaskAttemptInfo taskAttemptInfo = taskInfo.getAllTaskAttempts().get( @@ -318,27 +325,32 @@ public HistoryEvent answer(InvocationOnMock invocation) if (numMaps == numSuccessfulMaps) { Assert.assertEquals(MRApp.NM_HOST, taskAttemptInfo.getHostname()); Assert.assertEquals(MRApp.NM_PORT, taskAttemptInfo.getPort()); - + // Verify rack-name - Assert.assertEquals("rack-name is incorrect", taskAttemptInfo - .getRackname(), RACK_NAME); + Assert.assertEquals("rack-name is incorrect", + taskAttemptInfo.getRackname(), RACK_NAME); } } } - + // test output for HistoryViewer - PrintStream stdps=System.out; + PrintStream stdps = System.out; try { System.setOut(new PrintStream(outContent)); HistoryViewer viewer = new HistoryViewer(fc.makeQualified( fileInfo.getHistoryFile()).toString(), conf, true); viewer.print(); - - for (TaskInfo taskInfo : allTasks.values()) { - - String test= (taskInfo.getTaskStatus()==null?"":taskInfo.getTaskStatus())+" "+taskInfo.getTaskType()+" task list for "+taskInfo.getTaskId().getJobID(); - Assert.assertTrue(outContent.toString().indexOf(test)>0); - Assert.assertTrue(outContent.toString().indexOf(taskInfo.getTaskId().toString())>0); + + for (TaskInfo taskInfo : allTasks.values()) { + + String test = (taskInfo.getTaskStatus() == null ? "" : taskInfo + .getTaskStatus()) + + " " + + taskInfo.getTaskType() + + " task list for " + taskInfo.getTaskId().getJobID(); + Assert.assertTrue(outContent.toString().indexOf(test) > 0); + Assert.assertTrue(outContent.toString().indexOf( + taskInfo.getTaskId().toString()) > 0); } } finally { System.setOut(stdps); @@ -363,186 +375,180 @@ private long computeFinishedMaps(JobInfo jobInfo, int numMaps, } return numFinishedMaps; } - - @Test (timeout=50000) + + @Test(timeout = 30000) public void testHistoryParsingForFailedAttempts() throws Exception { LOG.info("STARTING testHistoryParsingForFailedAttempts"); try { - Configuration conf = new Configuration(); - conf - .setClass( - CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY, - MyResolver.class, DNSToSwitchMapping.class); - RackResolver.init(conf); - MRApp app = new MRAppWithHistoryWithFailedAttempt(2, 1, true, this.getClass().getName(), - true); - app.submit(conf); - Job job = app.getContext().getAllJobs().values().iterator().next(); - JobId jobId = job.getID(); - app.waitForState(job, JobState.SUCCEEDED); - - // make sure all events are flushed - app.waitForState(Service.STATE.STOPPED); + Configuration conf = new Configuration(); + conf.setClass( + CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY, + MyResolver.class, DNSToSwitchMapping.class); + RackResolver.init(conf); + MRApp app = new MRAppWithHistoryWithFailedAttempt(2, 1, true, this + .getClass().getName(), true); + app.submit(conf); + Job job = app.getContext().getAllJobs().values().iterator().next(); + JobId jobId = job.getID(); + app.waitForState(job, JobState.SUCCEEDED); - String jobhistoryDir = JobHistoryUtils - .getHistoryIntermediateDoneDirForUser(conf); - JobHistory jobHistory = new JobHistory(); - jobHistory.init(conf); + // make sure all events are flushed + app.waitForState(Service.STATE.STOPPED); - JobIndexInfo jobIndexInfo = jobHistory.getJobFileInfo(jobId) - .getJobIndexInfo(); - String jobhistoryFileName = FileNameIndexUtils - .getDoneFileName(jobIndexInfo); + String jobhistoryDir = JobHistoryUtils + .getHistoryIntermediateDoneDirForUser(conf); + JobHistory jobHistory = new JobHistory(); + jobHistory.init(conf); - Path historyFilePath = new Path(jobhistoryDir, jobhistoryFileName); - FSDataInputStream in = null; - FileContext fc = null; - try { - fc = FileContext.getFileContext(conf); - in = fc.open(fc.makeQualified(historyFilePath)); - } catch (IOException ioe) { - LOG.info("Can not open history file: " + historyFilePath, ioe); - throw (new Exception("Can not open History File")); - } + JobIndexInfo jobIndexInfo = jobHistory.getJobFileInfo(jobId) + .getJobIndexInfo(); + String jobhistoryFileName = FileNameIndexUtils + .getDoneFileName(jobIndexInfo); - JobHistoryParser parser = new JobHistoryParser(in); - JobInfo jobInfo = parser.parse(); - Exception parseException = parser.getParseException(); - Assert.assertNull("Caught an expected exception " + parseException, - parseException); - int noOffailedAttempts = 0; - Map allTasks = jobInfo.getAllTasks(); - for (Task task : job.getTasks().values()) { - TaskInfo taskInfo = allTasks.get(TypeConverter.fromYarn(task.getID())); - for (TaskAttempt taskAttempt : task.getAttempts().values()) { - TaskAttemptInfo taskAttemptInfo = taskInfo.getAllTaskAttempts().get( - TypeConverter.fromYarn((taskAttempt.getID()))); - // Verify rack-name for all task attempts - Assert.assertEquals("rack-name is incorrect", taskAttemptInfo - .getRackname(), RACK_NAME); - if (taskAttemptInfo.getTaskStatus().equals("FAILED")) { - noOffailedAttempts++; + Path historyFilePath = new Path(jobhistoryDir, jobhistoryFileName); + FSDataInputStream in = null; + FileContext fc = null; + try { + fc = FileContext.getFileContext(conf); + in = fc.open(fc.makeQualified(historyFilePath)); + } catch (IOException ioe) { + LOG.info("Can not open history file: " + historyFilePath, ioe); + throw (new Exception("Can not open History File")); + } + + JobHistoryParser parser = new JobHistoryParser(in); + JobInfo jobInfo = parser.parse(); + Exception parseException = parser.getParseException(); + Assert.assertNull("Caught an expected exception " + parseException, + parseException); + int noOffailedAttempts = 0; + Map allTasks = jobInfo.getAllTasks(); + for (Task task : job.getTasks().values()) { + TaskInfo taskInfo = allTasks.get(TypeConverter.fromYarn(task.getID())); + for (TaskAttempt taskAttempt : task.getAttempts().values()) { + TaskAttemptInfo taskAttemptInfo = taskInfo.getAllTaskAttempts().get( + TypeConverter.fromYarn((taskAttempt.getID()))); + // Verify rack-name for all task attempts + Assert.assertEquals("rack-name is incorrect", + taskAttemptInfo.getRackname(), RACK_NAME); + if (taskAttemptInfo.getTaskStatus().equals("FAILED")) { + noOffailedAttempts++; + } } } - } - Assert.assertEquals("No of Failed tasks doesn't match.", 2, noOffailedAttempts); + Assert.assertEquals("No of Failed tasks doesn't match.", 2, + noOffailedAttempts); } finally { LOG.info("FINISHED testHistoryParsingForFailedAttempts"); } } - - @Test (timeout=5000) + + @Test(timeout = 60000) public void testCountersForFailedTask() throws Exception { LOG.info("STARTING testCountersForFailedTask"); try { - Configuration conf = new Configuration(); - conf - .setClass( - CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY, - MyResolver.class, DNSToSwitchMapping.class); - RackResolver.init(conf); - MRApp app = new MRAppWithHistoryWithFailedTask(2, 1, true, - this.getClass().getName(), true); - app.submit(conf); - Job job = app.getContext().getAllJobs().values().iterator().next(); - JobId jobId = job.getID(); - app.waitForState(job, JobState.FAILED); + Configuration conf = new Configuration(); + conf.setClass( + CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY, + MyResolver.class, DNSToSwitchMapping.class); + RackResolver.init(conf); + MRApp app = new MRAppWithHistoryWithFailedTask(2, 1, true, this + .getClass().getName(), true); + app.submit(conf); + Job job = app.getContext().getAllJobs().values().iterator().next(); + JobId jobId = job.getID(); + app.waitForState(job, JobState.FAILED); - // make sure all events are flushed - app.waitForState(Service.STATE.STOPPED); + // make sure all events are flushed + app.waitForState(Service.STATE.STOPPED); - String jobhistoryDir = JobHistoryUtils - .getHistoryIntermediateDoneDirForUser(conf); - JobHistory jobHistory = new JobHistory(); - jobHistory.init(conf); + String jobhistoryDir = JobHistoryUtils + .getHistoryIntermediateDoneDirForUser(conf); + JobHistory jobHistory = new JobHistory(); + jobHistory.init(conf); - JobIndexInfo jobIndexInfo = jobHistory.getJobFileInfo(jobId) - .getJobIndexInfo(); - String jobhistoryFileName = FileNameIndexUtils - .getDoneFileName(jobIndexInfo); + JobIndexInfo jobIndexInfo = jobHistory.getJobFileInfo(jobId) + .getJobIndexInfo(); + String jobhistoryFileName = FileNameIndexUtils + .getDoneFileName(jobIndexInfo); - Path historyFilePath = new Path(jobhistoryDir, jobhistoryFileName); - FSDataInputStream in = null; - FileContext fc = null; - try { - fc = FileContext.getFileContext(conf); - in = fc.open(fc.makeQualified(historyFilePath)); - } catch (IOException ioe) { - LOG.info("Can not open history file: " + historyFilePath, ioe); - throw (new Exception("Can not open History File")); - } + Path historyFilePath = new Path(jobhistoryDir, jobhistoryFileName); + FSDataInputStream in = null; + FileContext fc = null; + try { + fc = FileContext.getFileContext(conf); + in = fc.open(fc.makeQualified(historyFilePath)); + } catch (IOException ioe) { + LOG.info("Can not open history file: " + historyFilePath, ioe); + throw (new Exception("Can not open History File")); + } - JobHistoryParser parser = new JobHistoryParser(in); - JobInfo jobInfo = parser.parse(); - Exception parseException = parser.getParseException(); - Assert.assertNull("Caught an expected exception " + parseException, - parseException); - for (Map.Entry entry : jobInfo.getAllTasks().entrySet()) { - TaskId yarnTaskID = TypeConverter.toYarn(entry.getKey()); - CompletedTask ct = new CompletedTask(yarnTaskID, entry.getValue()); - Assert.assertNotNull("completed task report has null counters", - ct.getReport().getCounters()); - //Make sure all the completedTask has counters, and the counters are not empty - Assert.assertTrue(ct.getReport().getCounters() - .getAllCounterGroups().size() > 0); - } + JobHistoryParser parser = new JobHistoryParser(in); + JobInfo jobInfo = parser.parse(); + Exception parseException = parser.getParseException(); + Assert.assertNull("Caught an expected exception " + parseException, + parseException); + for (Map.Entry entry : jobInfo.getAllTasks().entrySet()) { + TaskId yarnTaskID = TypeConverter.toYarn(entry.getKey()); + CompletedTask ct = new CompletedTask(yarnTaskID, entry.getValue()); + Assert.assertNotNull("completed task report has null counters", ct + .getReport().getCounters()); + } } finally { LOG.info("FINISHED testCountersForFailedTask"); } } - @Test (timeout=50000) + @Test(timeout = 50000) public void testScanningOldDirs() throws Exception { LOG.info("STARTING testScanningOldDirs"); try { - Configuration conf = new Configuration(); - conf - .setClass( - CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY, - MyResolver.class, DNSToSwitchMapping.class); - RackResolver.init(conf); - MRApp app = - new MRAppWithHistory(1, 1, true, - this.getClass().getName(), true); - app.submit(conf); - Job job = app.getContext().getAllJobs().values().iterator().next(); - JobId jobId = job.getID(); - LOG.info("JOBID is " + TypeConverter.fromYarn(jobId).toString()); - app.waitForState(job, JobState.SUCCEEDED); + Configuration conf = new Configuration(); + conf.setClass( + CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY, + MyResolver.class, DNSToSwitchMapping.class); + RackResolver.init(conf); + MRApp app = new MRAppWithHistory(1, 1, true, this.getClass().getName(), + true); + app.submit(conf); + Job job = app.getContext().getAllJobs().values().iterator().next(); + JobId jobId = job.getID(); + LOG.info("JOBID is " + TypeConverter.fromYarn(jobId).toString()); + app.waitForState(job, JobState.SUCCEEDED); - // make sure all events are flushed - app.waitForState(Service.STATE.STOPPED); + // make sure all events are flushed + app.waitForState(Service.STATE.STOPPED); - HistoryFileManagerForTest hfm = new HistoryFileManagerForTest(); - hfm.init(conf); - HistoryFileInfo fileInfo = hfm.getFileInfo(jobId); - Assert.assertNotNull("Unable to locate job history", fileInfo); + HistoryFileManagerForTest hfm = new HistoryFileManagerForTest(); + hfm.init(conf); + HistoryFileInfo fileInfo = hfm.getFileInfo(jobId); + Assert.assertNotNull("Unable to locate job history", fileInfo); - // force the manager to "forget" the job - hfm.deleteJobFromJobListCache(fileInfo); - final int msecPerSleep = 10; - int msecToSleep = 10 * 1000; - while (fileInfo.isMovePending() && msecToSleep > 0) { - Assert.assertTrue(!fileInfo.didMoveFail()); - msecToSleep -= msecPerSleep; - Thread.sleep(msecPerSleep); - } - Assert.assertTrue("Timeout waiting for history move", msecToSleep > 0); + // force the manager to "forget" the job + hfm.deleteJobFromJobListCache(fileInfo); + final int msecPerSleep = 10; + int msecToSleep = 10 * 1000; + while (fileInfo.isMovePending() && msecToSleep > 0) { + Assert.assertTrue(!fileInfo.didMoveFail()); + msecToSleep -= msecPerSleep; + Thread.sleep(msecPerSleep); + } + Assert.assertTrue("Timeout waiting for history move", msecToSleep > 0); - fileInfo = hfm.getFileInfo(jobId); - Assert.assertNotNull("Unable to locate old job history", fileInfo); - } finally { + fileInfo = hfm.getFileInfo(jobId); + Assert.assertNotNull("Unable to locate old job history", fileInfo); + } finally { LOG.info("FINISHED testScanningOldDirs"); } } static class MRAppWithHistoryWithFailedAttempt extends MRAppWithHistory { - public MRAppWithHistoryWithFailedAttempt(int maps, int reduces, boolean autoComplete, - String testName, boolean cleanOnStart) { + public MRAppWithHistoryWithFailedAttempt(int maps, int reduces, + boolean autoComplete, String testName, boolean cleanOnStart) { super(maps, reduces, autoComplete, testName, cleanOnStart); } - + @SuppressWarnings("unchecked") @Override protected void attemptLaunched(TaskAttemptId attemptID) { @@ -558,8 +564,8 @@ protected void attemptLaunched(TaskAttemptId attemptID) { static class MRAppWithHistoryWithFailedTask extends MRAppWithHistory { - public MRAppWithHistoryWithFailedTask(int maps, int reduces, boolean autoComplete, - String testName, boolean cleanOnStart) { + public MRAppWithHistoryWithFailedTask(int maps, int reduces, + boolean autoComplete, String testName, boolean cleanOnStart) { super(maps, reduces, autoComplete, testName, cleanOnStart); } @@ -587,4 +593,128 @@ public static void main(String[] args) throws Exception { t.testHistoryParsing(); t.testHistoryParsingForFailedAttempts(); } + + /** + * test clean old history files. Files should be deleted after 1 week by + * default. + */ + @Test(timeout = 15000) + public void testDeleteFileInfo() throws Exception { + LOG.info("STARTING testDeleteFileInfo"); + try { + Configuration conf = new Configuration(); + conf.setClass( + CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY, + MyResolver.class, DNSToSwitchMapping.class); + + RackResolver.init(conf); + MRApp app = new MRAppWithHistory(1, 1, true, this.getClass().getName(), + true); + app.submit(conf); + Job job = app.getContext().getAllJobs().values().iterator().next(); + JobId jobId = job.getID(); + + app.waitForState(job, JobState.SUCCEEDED); + + // make sure all events are flushed + app.waitForState(Service.STATE.STOPPED); + HistoryFileManager hfm = new HistoryFileManager(); + hfm.init(conf); + HistoryFileInfo fileInfo = hfm.getFileInfo(jobId); + hfm.initExisting(); + // wait for move files form the done_intermediate directory to the gone + // directory + while (fileInfo.isMovePending()) { + Thread.sleep(300); + } + + Assert.assertNotNull(hfm.jobListCache.values()); + + // try to remove fileInfo + hfm.clean(); + // check that fileInfo does not deleted + Assert.assertFalse(fileInfo.isDeleted()); + // correct live time + hfm.setMaxHistoryAge(-1); + hfm.clean(); + // should be deleted ! + Assert.assertTrue("file should be deleted ", fileInfo.isDeleted()); + + } finally { + LOG.info("FINISHED testDeleteFileInfo"); + } + } + + /** + * Simple test some methods of JobHistory + */ + @Test(timeout = 20000) + public void testJobHistoryMethods() throws Exception { + LOG.info("STARTING testJobHistoryMethods"); + try { + Configuration configuration = new Configuration(); + configuration + .setClass( + CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY, + MyResolver.class, DNSToSwitchMapping.class); + + RackResolver.init(configuration); + MRApp app = new MRAppWithHistory(1, 1, true, this.getClass().getName(), + true); + app.submit(configuration); + Job job = app.getContext().getAllJobs().values().iterator().next(); + app.waitForState(job, JobState.SUCCEEDED); + + JobHistory jobHistory = new JobHistory(); + jobHistory.init(configuration); + // Method getAllJobs + Assert.assertEquals(1, jobHistory.getAllJobs().size()); + // and with ApplicationId + Assert.assertEquals(1, jobHistory.getAllJobs(app.getAppID()).size()); + + JobsInfo jobsinfo = jobHistory.getPartialJobs(0L, 10L, null, "default", + 0L, System.currentTimeMillis() + 1, 0L, + System.currentTimeMillis() + 1, JobState.SUCCEEDED); + + Assert.assertEquals(1, jobsinfo.getJobs().size()); + Assert.assertNotNull(jobHistory.getApplicationAttemptId()); + // test Application Id + Assert.assertEquals("application_0_0000", jobHistory.getApplicationID() + .toString()); + Assert + .assertEquals("Job History Server", jobHistory.getApplicationName()); + // method does not work + Assert.assertNull(jobHistory.getEventHandler()); + // method does not work + Assert.assertNull(jobHistory.getClock()); + // method does not work + Assert.assertNull(jobHistory.getClusterInfo()); + + } finally { + LOG.info("FINISHED testJobHistoryMethods"); + } + } + + /** + * Simple test PartialJob + */ + @Test(timeout = 1000) + public void testPartialJob() throws Exception { + JobId jobId = new JobIdPBImpl(); + jobId.setId(0); + JobIndexInfo jii = new JobIndexInfo(0L, System.currentTimeMillis(), "user", + "jobName", jobId, 3, 2, "JobStatus"); + PartialJob test = new PartialJob(jii, jobId); + assertEquals(1.0f, test.getProgress(), 0.001); + assertNull(test.getAllCounters()); + assertNull(test.getTasks()); + assertNull(test.getTasks(TaskType.MAP)); + assertNull(test.getTask(new TaskIdPBImpl())); + + assertNull(test.getTaskAttemptCompletionEvents(0, 100)); + assertNull(test.getMapAttemptCompletionEvents(0, 100)); + assertTrue(test.checkAccess(UserGroupInformation.getCurrentUser(), null)); + assertNull(test.getAMInfos()); + + } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryServer.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryServer.java new file mode 100644 index 00000000000..faf3aebd259 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryServer.java @@ -0,0 +1,209 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapreduce.v2.hs; + + +import java.util.Map; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeysPublic; +import org.apache.hadoop.mapreduce.TaskCounter; +import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol; +import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDiagnosticsRequest; +import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDiagnosticsResponse; +import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskAttemptCompletionEventsRequest; +import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskAttemptCompletionEventsResponse; +import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskAttemptReportRequest; +import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskAttemptReportResponse; +import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskReportRequest; +import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskReportResponse; +import org.apache.hadoop.mapreduce.v2.api.records.JobId; +import org.apache.hadoop.mapreduce.v2.api.records.JobState; +import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; +import org.apache.hadoop.mapreduce.v2.api.records.TaskId; +import org.apache.hadoop.mapreduce.v2.api.records.TaskState; +import org.apache.hadoop.mapreduce.v2.app.MRApp; +import org.apache.hadoop.mapreduce.v2.app.job.Job; +import org.apache.hadoop.mapreduce.v2.app.job.Task; +import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt; +import org.apache.hadoop.mapreduce.v2.hs.TestJobHistoryEvents.MRAppWithHistory; +import org.apache.hadoop.mapreduce.v2.hs.TestJobHistoryParsing.MyResolver; +import org.apache.hadoop.net.DNSToSwitchMapping; +import org.apache.hadoop.util.ExitUtil; +import org.apache.hadoop.yarn.factories.RecordFactory; +import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.service.Service; +import org.apache.hadoop.yarn.service.Service.STATE; +import org.apache.hadoop.yarn.util.RackResolver; +import org.junit.After; +import org.junit.Test; + +import static org.junit.Assert.*; + +/* +test JobHistoryServer protocols.... + */ +public class TestJobHistoryServer { + private static RecordFactory recordFactory = RecordFactoryProvider + .getRecordFactory(null); + + + + JobHistoryServer historyServer=null; + // simple test init/start/stop JobHistoryServer. Status should change. + + @Test (timeout= 50000 ) + public void testStartStopServer() throws Exception { + + historyServer = new JobHistoryServer(); + Configuration config = new Configuration(); + historyServer.init(config); + assertEquals(STATE.INITED, historyServer.getServiceState()); + assertEquals(3, historyServer.getServices().size()); + historyServer.start(); + assertEquals(STATE.STARTED, historyServer.getServiceState()); + historyServer.stop(); + assertEquals(STATE.STOPPED, historyServer.getServiceState()); + assertNotNull(historyServer.getClientService()); + HistoryClientService historyService = historyServer.getClientService(); + assertNotNull(historyService.getClientHandler().getConnectAddress()); + + + + } + + + + //Test reports of JobHistoryServer. History server should get log files from MRApp and read them + + @Test (timeout= 50000 ) + public void testReports() throws Exception { + Configuration config = new Configuration(); + config + .setClass( + CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY, + MyResolver.class, DNSToSwitchMapping.class); + + RackResolver.init(config); + MRApp app = new MRAppWithHistory(1, 1, true, this.getClass().getName(), + true); + app.submit(config); + Job job = app.getContext().getAllJobs().values().iterator().next(); + app.waitForState(job, JobState.SUCCEEDED); + + historyServer = new JobHistoryServer(); + + historyServer.init(config); + historyServer.start(); + + // search JobHistory service + JobHistory jobHistory= null; + for (Service service : historyServer.getServices() ) { + if (service instanceof JobHistory) { + jobHistory = (JobHistory) service; + } + }; + + Map jobs= jobHistory.getAllJobs(); + + assertEquals(1, jobs.size()); + assertEquals("job_0_0000",jobs.keySet().iterator().next().toString()); + + + Task task = job.getTasks().values().iterator().next(); + TaskAttempt attempt = task.getAttempts().values().iterator().next(); + + HistoryClientService historyService = historyServer.getClientService(); + MRClientProtocol protocol = historyService.getClientHandler(); + + GetTaskAttemptReportRequest gtarRequest = recordFactory + .newRecordInstance(GetTaskAttemptReportRequest.class); + // test getTaskAttemptReport + TaskAttemptId taId = attempt.getID(); + taId.setTaskId(task.getID()); + taId.getTaskId().setJobId(job.getID()); + gtarRequest.setTaskAttemptId(taId); + GetTaskAttemptReportResponse response = protocol + .getTaskAttemptReport(gtarRequest); + assertEquals("container_0_0000_01_000000", response.getTaskAttemptReport() + .getContainerId().toString()); + assertTrue(response.getTaskAttemptReport().getDiagnosticInfo().isEmpty()); + // counters + assertNotNull(response.getTaskAttemptReport().getCounters() + .getCounter(TaskCounter.PHYSICAL_MEMORY_BYTES)); + assertEquals(taId.toString(), response.getTaskAttemptReport() + .getTaskAttemptId().toString()); + // test getTaskReport + GetTaskReportRequest request = recordFactory + .newRecordInstance(GetTaskReportRequest.class); + TaskId taskId = task.getID(); + taskId.setJobId(job.getID()); + request.setTaskId(taskId); + GetTaskReportResponse reportResponse = protocol.getTaskReport(request); + assertEquals("", reportResponse.getTaskReport().getDiagnosticsList() + .iterator().next()); + // progress + assertEquals(1.0f, reportResponse.getTaskReport().getProgress(), 0.01); + // report has corrected taskId + assertEquals(taskId.toString(), reportResponse.getTaskReport().getTaskId() + .toString()); + // Task state should be SUCCEEDED + assertEquals(TaskState.SUCCEEDED, reportResponse.getTaskReport() + .getTaskState()); + // test getTaskAttemptCompletionEvents + GetTaskAttemptCompletionEventsRequest taskAttemptRequest = recordFactory + .newRecordInstance(GetTaskAttemptCompletionEventsRequest.class); + taskAttemptRequest.setJobId(job.getID()); + GetTaskAttemptCompletionEventsResponse taskAttemptCompletionEventsResponse = protocol + .getTaskAttemptCompletionEvents(taskAttemptRequest); + assertEquals(0, taskAttemptCompletionEventsResponse.getCompletionEventCount()); + + // test getDiagnostics + GetDiagnosticsRequest diagnosticRequest = recordFactory + .newRecordInstance(GetDiagnosticsRequest.class); + diagnosticRequest.setTaskAttemptId(taId); + GetDiagnosticsResponse diagnosticResponse = protocol + .getDiagnostics(diagnosticRequest); + // it is strange : why one empty string ? + assertEquals(1, diagnosticResponse.getDiagnosticsCount()); + assertEquals("", diagnosticResponse.getDiagnostics(0)); + + } + // test main method + @Test (timeout =60000) + public void testMainMethod() throws Exception { + + ExitUtil.disableSystemExit(); + try { + JobHistoryServer.main(new String[0]); + + } catch (ExitUtil.ExitException e) { + assertEquals(0,e.status); + ExitUtil.resetFirstExitException(); + fail(); + } + } + + @After + public void stop(){ + if(historyServer !=null && !STATE.STOPPED.equals(historyServer.getServiceState())){ + historyServer.stop(); + } + } +}