MAPREDUCE-5007. fix coverage org.apache.hadoop.mapreduce.v2.hs (Aleksey Gorshkov via tgraves)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1465016 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Thomas Graves 2013-04-05 15:43:38 +00:00
parent d002c038b2
commit 003ecf8137
6 changed files with 623 additions and 205 deletions

View File

@ -717,6 +717,9 @@ Release 0.23.7 - UNRELEASED
MAPREDUCE-4991. coverage for gridmix (Aleksey Gorshkov via tgraves) MAPREDUCE-4991. coverage for gridmix (Aleksey Gorshkov via tgraves)
MAPREDUCE-5007. fix coverage org.apache.hadoop.mapreduce.v2.hs (Aleksey
Gorshkov via tgraves)
Release 0.23.6 - UNRELEASED Release 0.23.6 - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -869,4 +869,8 @@ public class HistoryFileManager extends AbstractService {
} }
} }
} }
@VisibleForTesting
protected void setMaxHistoryAge(long newValue){
maxHistoryAge=newValue;
}
} }

View File

@ -21,46 +21,75 @@ package org.apache.hadoop.mapreduce.v2.hs;
import java.util.Map; import java.util.Map;
import java.util.TreeMap; import java.util.TreeMap;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.TaskAttemptID; import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskID;
import org.apache.hadoop.mapreduce.TaskType; import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskAttemptInfo; import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskAttemptInfo;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo; import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
import org.apache.hadoop.mapreduce.v2.api.records.Phase;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId; import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskReport; import org.apache.hadoop.mapreduce.v2.api.records.TaskReport;
import org.apache.hadoop.mapreduce.v2.hs.CompletedTask; import org.apache.hadoop.mapreduce.v2.hs.CompletedTask;
import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import org.mockito.Mockito; import static org.mockito.Mockito.*;
import static org.junit.Assert.*;
public class TestCompletedTask{ public class TestCompletedTask{
@Test @Test (timeout=5000)
public void testTaskStartTimes() { public void testTaskStartTimes() {
TaskId taskId = Mockito.mock(TaskId.class); TaskId taskId = mock(TaskId.class);
TaskInfo taskInfo = Mockito.mock(TaskInfo.class); TaskInfo taskInfo = mock(TaskInfo.class);
Map<TaskAttemptID, TaskAttemptInfo> taskAttempts Map<TaskAttemptID, TaskAttemptInfo> taskAttempts
= new TreeMap<TaskAttemptID, TaskAttemptInfo>(); = new TreeMap<TaskAttemptID, TaskAttemptInfo>();
TaskAttemptID id = new TaskAttemptID("0", 0, TaskType.MAP, 0, 0); TaskAttemptID id = new TaskAttemptID("0", 0, TaskType.MAP, 0, 0);
TaskAttemptInfo info = Mockito.mock(TaskAttemptInfo.class); TaskAttemptInfo info = mock(TaskAttemptInfo.class);
Mockito.when(info.getAttemptId()).thenReturn(id); when(info.getAttemptId()).thenReturn(id);
Mockito.when(info.getStartTime()).thenReturn(10l); when(info.getStartTime()).thenReturn(10l);
taskAttempts.put(id, info); taskAttempts.put(id, info);
id = new TaskAttemptID("1", 0, TaskType.MAP, 1, 1); id = new TaskAttemptID("1", 0, TaskType.MAP, 1, 1);
info = Mockito.mock(TaskAttemptInfo.class); info = mock(TaskAttemptInfo.class);
Mockito.when(info.getAttemptId()).thenReturn(id); when(info.getAttemptId()).thenReturn(id);
Mockito.when(info.getStartTime()).thenReturn(20l); when(info.getStartTime()).thenReturn(20l);
taskAttempts.put(id, info); taskAttempts.put(id, info);
Mockito.when(taskInfo.getAllTaskAttempts()).thenReturn(taskAttempts); when(taskInfo.getAllTaskAttempts()).thenReturn(taskAttempts);
CompletedTask task = new CompletedTask(taskId, taskInfo); CompletedTask task = new CompletedTask(taskId, taskInfo);
TaskReport report = task.getReport(); TaskReport report = task.getReport();
// Make sure the startTime returned by report is the lesser of the // Make sure the startTime returned by report is the lesser of the
// attempy launch times // attempy launch times
Assert.assertTrue(report.getStartTime() == 10); assertTrue(report.getStartTime() == 10);
}
/**
* test some methods of CompletedTaskAttempt
*/
@Test (timeout=5000)
public void testCompletedTaskAttempt(){
TaskAttemptInfo attemptInfo= mock(TaskAttemptInfo.class);
when(attemptInfo.getRackname()).thenReturn("Rackname");
when(attemptInfo.getShuffleFinishTime()).thenReturn(11L);
when(attemptInfo.getSortFinishTime()).thenReturn(12L);
when(attemptInfo.getShufflePort()).thenReturn(10);
JobID jobId= new JobID("12345",0);
TaskID taskId =new TaskID(jobId,TaskType.REDUCE, 0);
TaskAttemptID taskAttemptId= new TaskAttemptID(taskId, 0);
when(attemptInfo.getAttemptId()).thenReturn(taskAttemptId);
CompletedTaskAttempt taskAttemt= new CompletedTaskAttempt(null,attemptInfo);
assertEquals( "Rackname", taskAttemt.getNodeRackName());
assertEquals( Phase.CLEANUP, taskAttemt.getPhase());
assertTrue( taskAttemt.isFinished());
assertEquals( 11L, taskAttemt.getShuffleFinishTime());
assertEquals( 12L, taskAttemt.getSortFinishTime());
assertEquals( 10, taskAttemt.getShufflePort());
} }
} }

View File

@ -27,6 +27,7 @@ import java.util.Map;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobACLsManager; import org.apache.hadoop.mapred.JobACLsManager;
import org.apache.hadoop.mapred.TaskCompletionEvent;
import org.apache.hadoop.mapreduce.v2.api.records.JobId; import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.JobReport; import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
import org.apache.hadoop.mapreduce.v2.api.records.JobState; import org.apache.hadoop.mapreduce.v2.api.records.JobState;
@ -46,6 +47,7 @@ import org.junit.runner.RunWith;
import org.junit.runners.Parameterized; import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters; import org.junit.runners.Parameterized.Parameters;
import static org.junit.Assert.*;
import static org.mockito.Mockito.*; import static org.mockito.Mockito.*;
@RunWith(value = Parameterized.class) @RunWith(value = Parameterized.class)
@ -79,7 +81,7 @@ public class TestJobHistoryEntities {
} }
/* Verify some expected values based on the history file */ /* Verify some expected values based on the history file */
@Test (timeout=10000) @Test (timeout=100000)
public void testCompletedJob() throws Exception { public void testCompletedJob() throws Exception {
HistoryFileInfo info = mock(HistoryFileInfo.class); HistoryFileInfo info = mock(HistoryFileInfo.class);
when(info.getConfFile()).thenReturn(fullConfPath); when(info.getConfFile()).thenReturn(fullConfPath);
@ -168,4 +170,45 @@ public class TestJobHistoryEntities {
assertEquals(45454, rta1Report.getNodeManagerPort()); assertEquals(45454, rta1Report.getNodeManagerPort());
assertEquals(9999, rta1Report.getNodeManagerHttpPort()); assertEquals(9999, rta1Report.getNodeManagerHttpPort());
} }
/**
* Simple test of some methods of CompletedJob
* @throws Exception
*/
@Test (timeout=30000)
public void testGetTaskAttemptCompletionEvent() throws Exception{
HistoryFileInfo info = mock(HistoryFileInfo.class);
when(info.getConfFile()).thenReturn(fullConfPath);
completedJob =
new CompletedJob(conf, jobId, fulleHistoryPath, loadTasks, "user",
info, jobAclsManager);
TaskCompletionEvent[] events= completedJob.getMapAttemptCompletionEvents(0,1000);
assertEquals(10, completedJob.getMapAttemptCompletionEvents(0,10).length);
int currentEventId=0;
for (TaskCompletionEvent taskAttemptCompletionEvent : events) {
int eventId= taskAttemptCompletionEvent.getEventId();
assertTrue(eventId>=currentEventId);
currentEventId=eventId;
}
assertNull(completedJob.loadConfFile() );
// job name
assertEquals("Sleep job",completedJob.getName());
// queue name
assertEquals("default",completedJob.getQueueName());
// progress
assertEquals(1.0, completedJob.getProgress(),0.001);
// 11 rows in answer
assertEquals(11,completedJob.getTaskAttemptCompletionEvents(0,1000).length);
// select first 10 rows
assertEquals(10,completedJob.getTaskAttemptCompletionEvents(0,10).length);
// select 5-10 rows include 5th
assertEquals(6,completedJob.getTaskAttemptCompletionEvents(5,10).length);
// without errors
assertEquals(1,completedJob.getDiagnostics().size());
assertEquals("",completedJob.getDiagnostics().get(0));
assertEquals(0, completedJob.getJobACLs().size());
}
} }

View File

@ -18,6 +18,10 @@
package org.apache.hadoop.mapreduce.v2.hs; package org.apache.hadoop.mapreduce.v2.hs;
import static junit.framework.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.io.ByteArrayOutputStream; import java.io.ByteArrayOutputStream;
import java.io.IOException; import java.io.IOException;
import java.io.PrintStream; import java.io.PrintStream;
@ -54,6 +58,9 @@ import org.apache.hadoop.mapreduce.v2.api.records.JobState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId; import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskState; import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.api.records.impl.pb.JobIdPBImpl;
import org.apache.hadoop.mapreduce.v2.api.records.impl.pb.TaskIdPBImpl;
import org.apache.hadoop.mapreduce.v2.app.MRApp; import org.apache.hadoop.mapreduce.v2.app.MRApp;
import org.apache.hadoop.mapreduce.v2.app.job.Job; import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.mapreduce.v2.app.job.Task; import org.apache.hadoop.mapreduce.v2.app.job.Task;
@ -65,7 +72,9 @@ import org.apache.hadoop.mapreduce.v2.hs.TestJobHistoryEvents.MRAppWithHistory;
import org.apache.hadoop.mapreduce.v2.jobhistory.FileNameIndexUtils; import org.apache.hadoop.mapreduce.v2.jobhistory.FileNameIndexUtils;
import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils; import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
import org.apache.hadoop.mapreduce.v2.jobhistory.JobIndexInfo; import org.apache.hadoop.mapreduce.v2.jobhistory.JobIndexInfo;
import org.apache.hadoop.mapreduce.v2.hs.webapp.dao.JobsInfo;
import org.apache.hadoop.net.DNSToSwitchMapping; import org.apache.hadoop.net.DNSToSwitchMapping;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.service.Service; import org.apache.hadoop.yarn.service.Service;
import org.apache.hadoop.yarn.util.BuilderUtils; import org.apache.hadoop.yarn.util.BuilderUtils;
@ -80,12 +89,12 @@ public class TestJobHistoryParsing {
private static final String RACK_NAME = "/MyRackName"; private static final String RACK_NAME = "/MyRackName";
private ByteArrayOutputStream outContent = new ByteArrayOutputStream(); private ByteArrayOutputStream outContent = new ByteArrayOutputStream();
public static class MyResolver implements DNSToSwitchMapping { public static class MyResolver implements DNSToSwitchMapping {
@Override @Override
public List<String> resolve(List<String> names) { public List<String> resolve(List<String> names) {
return Arrays.asList(new String[]{RACK_NAME}); return Arrays.asList(new String[] { RACK_NAME });
} }
@Override @Override
@ -93,14 +102,14 @@ public class TestJobHistoryParsing {
} }
} }
@Test (timeout=50000) @Test(timeout = 50000)
public void testJobInfo() throws Exception { public void testJobInfo() throws Exception {
JobInfo info = new JobInfo(); JobInfo info = new JobInfo();
Assert.assertEquals("NORMAL", info.getPriority()); Assert.assertEquals("NORMAL", info.getPriority());
info.printAll(); info.printAll();
} }
@Test (timeout=50000) @Test(timeout = 300000)
public void testHistoryParsing() throws Exception { public void testHistoryParsing() throws Exception {
LOG.info("STARTING testHistoryParsing()"); LOG.info("STARTING testHistoryParsing()");
try { try {
@ -109,8 +118,8 @@ public class TestJobHistoryParsing {
LOG.info("FINISHED testHistoryParsing()"); LOG.info("FINISHED testHistoryParsing()");
} }
} }
@Test (timeout=50000) @Test(timeout = 50000)
public void testHistoryParsingWithParseErrors() throws Exception { public void testHistoryParsingWithParseErrors() throws Exception {
LOG.info("STARTING testHistoryParsingWithParseErrors()"); LOG.info("STARTING testHistoryParsingWithParseErrors()");
try { try {
@ -119,18 +128,18 @@ public class TestJobHistoryParsing {
LOG.info("FINISHED testHistoryParsingWithParseErrors()"); LOG.info("FINISHED testHistoryParsingWithParseErrors()");
} }
} }
private static String getJobSummary(FileContext fc, Path path) throws IOException { private static String getJobSummary(FileContext fc, Path path)
throws IOException {
Path qPath = fc.makeQualified(path); Path qPath = fc.makeQualified(path);
FSDataInputStream in = fc.open(qPath); FSDataInputStream in = fc.open(qPath);
String jobSummaryString = in.readUTF(); String jobSummaryString = in.readUTF();
in.close(); in.close();
return jobSummaryString; return jobSummaryString;
} }
private void checkHistoryParsing(final int numMaps, final int numReduces, private void checkHistoryParsing(final int numMaps, final int numReduces,
final int numSuccessfulMaps) final int numSuccessfulMaps) throws Exception {
throws Exception {
Configuration conf = new Configuration(); Configuration conf = new Configuration();
conf.set(MRJobConfig.USER_NAME, System.getProperty("user.name")); conf.set(MRJobConfig.USER_NAME, System.getProperty("user.name"));
long amStartTimeEst = System.currentTimeMillis(); long amStartTimeEst = System.currentTimeMillis();
@ -138,9 +147,8 @@ public class TestJobHistoryParsing {
CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY, CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
MyResolver.class, DNSToSwitchMapping.class); MyResolver.class, DNSToSwitchMapping.class);
RackResolver.init(conf); RackResolver.init(conf);
MRApp app = MRApp app = new MRAppWithHistory(numMaps, numReduces, true, this.getClass()
new MRAppWithHistory(numMaps, numReduces, true, .getName(), true);
this.getClass().getName(), true);
app.submit(conf); app.submit(conf);
Job job = app.getContext().getAllJobs().values().iterator().next(); Job job = app.getContext().getAllJobs().values().iterator().next();
JobId jobId = job.getID(); JobId jobId = job.getID();
@ -152,7 +160,7 @@ public class TestJobHistoryParsing {
String jobhistoryDir = JobHistoryUtils String jobhistoryDir = JobHistoryUtils
.getHistoryIntermediateDoneDirForUser(conf); .getHistoryIntermediateDoneDirForUser(conf);
FileContext fc = null; FileContext fc = null;
try { try {
fc = FileContext.getFileContext(conf); fc = FileContext.getFileContext(conf);
@ -160,7 +168,7 @@ public class TestJobHistoryParsing {
LOG.info("Can not get FileContext", ioe); LOG.info("Can not get FileContext", ioe);
throw (new Exception("Can not get File Context")); throw (new Exception("Can not get File Context"));
} }
if (numMaps == numSuccessfulMaps) { if (numMaps == numSuccessfulMaps) {
String summaryFileName = JobHistoryUtils String summaryFileName = JobHistoryUtils
.getIntermediateSummaryFileName(jobId); .getIntermediateSummaryFileName(jobId);
@ -185,20 +193,22 @@ public class TestJobHistoryParsing {
Long.parseLong(jobSummaryElements.get("submitTime")) != 0); Long.parseLong(jobSummaryElements.get("submitTime")) != 0);
Assert.assertTrue("launchTime should not be 0", Assert.assertTrue("launchTime should not be 0",
Long.parseLong(jobSummaryElements.get("launchTime")) != 0); Long.parseLong(jobSummaryElements.get("launchTime")) != 0);
Assert.assertTrue("firstMapTaskLaunchTime should not be 0",
Long.parseLong(jobSummaryElements.get("firstMapTaskLaunchTime")) != 0);
Assert Assert
.assertTrue( .assertTrue(
"firstReduceTaskLaunchTime should not be 0", "firstMapTaskLaunchTime should not be 0",
Long.parseLong(jobSummaryElements.get("firstReduceTaskLaunchTime")) != 0); Long.parseLong(jobSummaryElements.get("firstMapTaskLaunchTime")) != 0);
Assert
.assertTrue("firstReduceTaskLaunchTime should not be 0",
Long.parseLong(jobSummaryElements
.get("firstReduceTaskLaunchTime")) != 0);
Assert.assertTrue("finishTime should not be 0", Assert.assertTrue("finishTime should not be 0",
Long.parseLong(jobSummaryElements.get("finishTime")) != 0); Long.parseLong(jobSummaryElements.get("finishTime")) != 0);
Assert.assertEquals("Mismatch in num map slots", numSuccessfulMaps, Assert.assertEquals("Mismatch in num map slots", numSuccessfulMaps,
Integer.parseInt(jobSummaryElements.get("numMaps"))); Integer.parseInt(jobSummaryElements.get("numMaps")));
Assert.assertEquals("Mismatch in num reduce slots", numReduces, Assert.assertEquals("Mismatch in num reduce slots", numReduces,
Integer.parseInt(jobSummaryElements.get("numReduces"))); Integer.parseInt(jobSummaryElements.get("numReduces")));
Assert.assertEquals("User does not match", System.getProperty("user.name"), Assert.assertEquals("User does not match",
jobSummaryElements.get("user")); System.getProperty("user.name"), jobSummaryElements.get("user"));
Assert.assertEquals("Queue does not match", "default", Assert.assertEquals("Queue does not match", "default",
jobSummaryElements.get("queue")); jobSummaryElements.get("queue"));
Assert.assertEquals("Status does not match", "SUCCEEDED", Assert.assertEquals("Status does not match", "SUCCEEDED",
@ -210,8 +220,8 @@ public class TestJobHistoryParsing {
HistoryFileInfo fileInfo = jobHistory.getJobFileInfo(jobId); HistoryFileInfo fileInfo = jobHistory.getJobFileInfo(jobId);
JobInfo jobInfo; JobInfo jobInfo;
long numFinishedMaps; long numFinishedMaps;
synchronized(fileInfo) { synchronized (fileInfo) {
Path historyFilePath = fileInfo.getHistoryFile(); Path historyFilePath = fileInfo.getHistoryFile();
FSDataInputStream in = null; FSDataInputStream in = null;
LOG.info("JobHistoryFile is: " + historyFilePath); LOG.info("JobHistoryFile is: " + historyFilePath);
@ -228,11 +238,11 @@ public class TestJobHistoryParsing {
if (numMaps == numSuccessfulMaps) { if (numMaps == numSuccessfulMaps) {
reader = realReader; reader = realReader;
} else { } else {
final AtomicInteger numFinishedEvents = new AtomicInteger(0); // Hack! final AtomicInteger numFinishedEvents = new AtomicInteger(0); // Hack!
Mockito.when(reader.getNextEvent()).thenAnswer( Mockito.when(reader.getNextEvent()).thenAnswer(
new Answer<HistoryEvent>() { new Answer<HistoryEvent>() {
public HistoryEvent answer(InvocationOnMock invocation) public HistoryEvent answer(InvocationOnMock invocation)
throws IOException { throws IOException {
HistoryEvent event = realReader.getNextEvent(); HistoryEvent event = realReader.getNextEvent();
if (event instanceof TaskFinishedEvent) { if (event instanceof TaskFinishedEvent) {
numFinishedEvents.incrementAndGet(); numFinishedEvents.incrementAndGet();
@ -244,22 +254,20 @@ public class TestJobHistoryParsing {
throw new IOException("test"); throw new IOException("test");
} }
} }
} });
);
} }
jobInfo = parser.parse(reader); jobInfo = parser.parse(reader);
numFinishedMaps = numFinishedMaps = computeFinishedMaps(jobInfo, numMaps, numSuccessfulMaps);
computeFinishedMaps(jobInfo, numMaps, numSuccessfulMaps);
if (numFinishedMaps != numMaps) { if (numFinishedMaps != numMaps) {
Exception parseException = parser.getParseException(); Exception parseException = parser.getParseException();
Assert.assertNotNull("Didn't get expected parse exception", Assert.assertNotNull("Didn't get expected parse exception",
parseException); parseException);
} }
} }
Assert.assertEquals("Incorrect username ", System.getProperty("user.name"), Assert.assertEquals("Incorrect username ", System.getProperty("user.name"),
jobInfo.getUsername()); jobInfo.getUsername());
Assert.assertEquals("Incorrect jobName ", "test", jobInfo.getJobname()); Assert.assertEquals("Incorrect jobName ", "test", jobInfo.getJobname());
@ -267,7 +275,7 @@ public class TestJobHistoryParsing {
jobInfo.getJobQueueName()); jobInfo.getJobQueueName());
Assert Assert
.assertEquals("incorrect conf path", "test", jobInfo.getJobConfPath()); .assertEquals("incorrect conf path", "test", jobInfo.getJobConfPath());
Assert.assertEquals("incorrect finishedMap ", numSuccessfulMaps, Assert.assertEquals("incorrect finishedMap ", numSuccessfulMaps,
numFinishedMaps); numFinishedMaps);
Assert.assertEquals("incorrect finishedReduces ", numReduces, Assert.assertEquals("incorrect finishedReduces ", numReduces,
jobInfo.getFinishedReduces()); jobInfo.getFinishedReduces());
@ -275,8 +283,8 @@ public class TestJobHistoryParsing {
jobInfo.getUberized()); jobInfo.getUberized());
Map<TaskID, TaskInfo> allTasks = jobInfo.getAllTasks(); Map<TaskID, TaskInfo> allTasks = jobInfo.getAllTasks();
int totalTasks = allTasks.size(); int totalTasks = allTasks.size();
Assert.assertEquals("total number of tasks is incorrect ", Assert.assertEquals("total number of tasks is incorrect ",
(numMaps+numReduces), totalTasks); (numMaps + numReduces), totalTasks);
// Verify aminfo // Verify aminfo
Assert.assertEquals(1, jobInfo.getAMInfos().size()); Assert.assertEquals(1, jobInfo.getAMInfos().size());
@ -306,8 +314,7 @@ public class TestJobHistoryParsing {
// Deep compare Job and JobInfo // Deep compare Job and JobInfo
for (Task task : job.getTasks().values()) { for (Task task : job.getTasks().values()) {
TaskInfo taskInfo = allTasks.get( TaskInfo taskInfo = allTasks.get(TypeConverter.fromYarn(task.getID()));
TypeConverter.fromYarn(task.getID()));
Assert.assertNotNull("TaskInfo not found", taskInfo); Assert.assertNotNull("TaskInfo not found", taskInfo);
for (TaskAttempt taskAttempt : task.getAttempts().values()) { for (TaskAttempt taskAttempt : task.getAttempts().values()) {
TaskAttemptInfo taskAttemptInfo = taskInfo.getAllTaskAttempts().get( TaskAttemptInfo taskAttemptInfo = taskInfo.getAllTaskAttempts().get(
@ -318,27 +325,32 @@ public class TestJobHistoryParsing {
if (numMaps == numSuccessfulMaps) { if (numMaps == numSuccessfulMaps) {
Assert.assertEquals(MRApp.NM_HOST, taskAttemptInfo.getHostname()); Assert.assertEquals(MRApp.NM_HOST, taskAttemptInfo.getHostname());
Assert.assertEquals(MRApp.NM_PORT, taskAttemptInfo.getPort()); Assert.assertEquals(MRApp.NM_PORT, taskAttemptInfo.getPort());
// Verify rack-name // Verify rack-name
Assert.assertEquals("rack-name is incorrect", taskAttemptInfo Assert.assertEquals("rack-name is incorrect",
.getRackname(), RACK_NAME); taskAttemptInfo.getRackname(), RACK_NAME);
} }
} }
} }
// test output for HistoryViewer // test output for HistoryViewer
PrintStream stdps=System.out; PrintStream stdps = System.out;
try { try {
System.setOut(new PrintStream(outContent)); System.setOut(new PrintStream(outContent));
HistoryViewer viewer = new HistoryViewer(fc.makeQualified( HistoryViewer viewer = new HistoryViewer(fc.makeQualified(
fileInfo.getHistoryFile()).toString(), conf, true); fileInfo.getHistoryFile()).toString(), conf, true);
viewer.print(); viewer.print();
for (TaskInfo taskInfo : allTasks.values()) { for (TaskInfo taskInfo : allTasks.values()) {
String test= (taskInfo.getTaskStatus()==null?"":taskInfo.getTaskStatus())+" "+taskInfo.getTaskType()+" task list for "+taskInfo.getTaskId().getJobID(); String test = (taskInfo.getTaskStatus() == null ? "" : taskInfo
Assert.assertTrue(outContent.toString().indexOf(test)>0); .getTaskStatus())
Assert.assertTrue(outContent.toString().indexOf(taskInfo.getTaskId().toString())>0); + " "
+ taskInfo.getTaskType()
+ " task list for " + taskInfo.getTaskId().getJobID();
Assert.assertTrue(outContent.toString().indexOf(test) > 0);
Assert.assertTrue(outContent.toString().indexOf(
taskInfo.getTaskId().toString()) > 0);
} }
} finally { } finally {
System.setOut(stdps); System.setOut(stdps);
@ -363,186 +375,180 @@ public class TestJobHistoryParsing {
} }
return numFinishedMaps; return numFinishedMaps;
} }
@Test (timeout=50000) @Test(timeout = 30000)
public void testHistoryParsingForFailedAttempts() throws Exception { public void testHistoryParsingForFailedAttempts() throws Exception {
LOG.info("STARTING testHistoryParsingForFailedAttempts"); LOG.info("STARTING testHistoryParsingForFailedAttempts");
try { try {
Configuration conf = new Configuration(); Configuration conf = new Configuration();
conf conf.setClass(
.setClass( CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY, MyResolver.class, DNSToSwitchMapping.class);
MyResolver.class, DNSToSwitchMapping.class); RackResolver.init(conf);
RackResolver.init(conf); MRApp app = new MRAppWithHistoryWithFailedAttempt(2, 1, true, this
MRApp app = new MRAppWithHistoryWithFailedAttempt(2, 1, true, this.getClass().getName(), .getClass().getName(), true);
true); app.submit(conf);
app.submit(conf); Job job = app.getContext().getAllJobs().values().iterator().next();
Job job = app.getContext().getAllJobs().values().iterator().next(); JobId jobId = job.getID();
JobId jobId = job.getID(); app.waitForState(job, JobState.SUCCEEDED);
app.waitForState(job, JobState.SUCCEEDED);
// make sure all events are flushed
app.waitForState(Service.STATE.STOPPED);
String jobhistoryDir = JobHistoryUtils // make sure all events are flushed
.getHistoryIntermediateDoneDirForUser(conf); app.waitForState(Service.STATE.STOPPED);
JobHistory jobHistory = new JobHistory();
jobHistory.init(conf);
JobIndexInfo jobIndexInfo = jobHistory.getJobFileInfo(jobId) String jobhistoryDir = JobHistoryUtils
.getJobIndexInfo(); .getHistoryIntermediateDoneDirForUser(conf);
String jobhistoryFileName = FileNameIndexUtils JobHistory jobHistory = new JobHistory();
.getDoneFileName(jobIndexInfo); jobHistory.init(conf);
Path historyFilePath = new Path(jobhistoryDir, jobhistoryFileName); JobIndexInfo jobIndexInfo = jobHistory.getJobFileInfo(jobId)
FSDataInputStream in = null; .getJobIndexInfo();
FileContext fc = null; String jobhistoryFileName = FileNameIndexUtils
try { .getDoneFileName(jobIndexInfo);
fc = FileContext.getFileContext(conf);
in = fc.open(fc.makeQualified(historyFilePath));
} catch (IOException ioe) {
LOG.info("Can not open history file: " + historyFilePath, ioe);
throw (new Exception("Can not open History File"));
}
JobHistoryParser parser = new JobHistoryParser(in); Path historyFilePath = new Path(jobhistoryDir, jobhistoryFileName);
JobInfo jobInfo = parser.parse(); FSDataInputStream in = null;
Exception parseException = parser.getParseException(); FileContext fc = null;
Assert.assertNull("Caught an expected exception " + parseException, try {
parseException); fc = FileContext.getFileContext(conf);
int noOffailedAttempts = 0; in = fc.open(fc.makeQualified(historyFilePath));
Map<TaskID, TaskInfo> allTasks = jobInfo.getAllTasks(); } catch (IOException ioe) {
for (Task task : job.getTasks().values()) { LOG.info("Can not open history file: " + historyFilePath, ioe);
TaskInfo taskInfo = allTasks.get(TypeConverter.fromYarn(task.getID())); throw (new Exception("Can not open History File"));
for (TaskAttempt taskAttempt : task.getAttempts().values()) { }
TaskAttemptInfo taskAttemptInfo = taskInfo.getAllTaskAttempts().get(
TypeConverter.fromYarn((taskAttempt.getID()))); JobHistoryParser parser = new JobHistoryParser(in);
// Verify rack-name for all task attempts JobInfo jobInfo = parser.parse();
Assert.assertEquals("rack-name is incorrect", taskAttemptInfo Exception parseException = parser.getParseException();
.getRackname(), RACK_NAME); Assert.assertNull("Caught an expected exception " + parseException,
if (taskAttemptInfo.getTaskStatus().equals("FAILED")) { parseException);
noOffailedAttempts++; int noOffailedAttempts = 0;
Map<TaskID, TaskInfo> allTasks = jobInfo.getAllTasks();
for (Task task : job.getTasks().values()) {
TaskInfo taskInfo = allTasks.get(TypeConverter.fromYarn(task.getID()));
for (TaskAttempt taskAttempt : task.getAttempts().values()) {
TaskAttemptInfo taskAttemptInfo = taskInfo.getAllTaskAttempts().get(
TypeConverter.fromYarn((taskAttempt.getID())));
// Verify rack-name for all task attempts
Assert.assertEquals("rack-name is incorrect",
taskAttemptInfo.getRackname(), RACK_NAME);
if (taskAttemptInfo.getTaskStatus().equals("FAILED")) {
noOffailedAttempts++;
}
} }
} }
} Assert.assertEquals("No of Failed tasks doesn't match.", 2,
Assert.assertEquals("No of Failed tasks doesn't match.", 2, noOffailedAttempts); noOffailedAttempts);
} finally { } finally {
LOG.info("FINISHED testHistoryParsingForFailedAttempts"); LOG.info("FINISHED testHistoryParsingForFailedAttempts");
} }
} }
@Test (timeout=5000) @Test(timeout = 60000)
public void testCountersForFailedTask() throws Exception { public void testCountersForFailedTask() throws Exception {
LOG.info("STARTING testCountersForFailedTask"); LOG.info("STARTING testCountersForFailedTask");
try { try {
Configuration conf = new Configuration(); Configuration conf = new Configuration();
conf conf.setClass(
.setClass( CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY, MyResolver.class, DNSToSwitchMapping.class);
MyResolver.class, DNSToSwitchMapping.class); RackResolver.init(conf);
RackResolver.init(conf); MRApp app = new MRAppWithHistoryWithFailedTask(2, 1, true, this
MRApp app = new MRAppWithHistoryWithFailedTask(2, 1, true, .getClass().getName(), true);
this.getClass().getName(), true); app.submit(conf);
app.submit(conf); Job job = app.getContext().getAllJobs().values().iterator().next();
Job job = app.getContext().getAllJobs().values().iterator().next(); JobId jobId = job.getID();
JobId jobId = job.getID(); app.waitForState(job, JobState.FAILED);
app.waitForState(job, JobState.FAILED);
// make sure all events are flushed // make sure all events are flushed
app.waitForState(Service.STATE.STOPPED); app.waitForState(Service.STATE.STOPPED);
String jobhistoryDir = JobHistoryUtils String jobhistoryDir = JobHistoryUtils
.getHistoryIntermediateDoneDirForUser(conf); .getHistoryIntermediateDoneDirForUser(conf);
JobHistory jobHistory = new JobHistory(); JobHistory jobHistory = new JobHistory();
jobHistory.init(conf); jobHistory.init(conf);
JobIndexInfo jobIndexInfo = jobHistory.getJobFileInfo(jobId) JobIndexInfo jobIndexInfo = jobHistory.getJobFileInfo(jobId)
.getJobIndexInfo(); .getJobIndexInfo();
String jobhistoryFileName = FileNameIndexUtils String jobhistoryFileName = FileNameIndexUtils
.getDoneFileName(jobIndexInfo); .getDoneFileName(jobIndexInfo);
Path historyFilePath = new Path(jobhistoryDir, jobhistoryFileName); Path historyFilePath = new Path(jobhistoryDir, jobhistoryFileName);
FSDataInputStream in = null; FSDataInputStream in = null;
FileContext fc = null; FileContext fc = null;
try { try {
fc = FileContext.getFileContext(conf); fc = FileContext.getFileContext(conf);
in = fc.open(fc.makeQualified(historyFilePath)); in = fc.open(fc.makeQualified(historyFilePath));
} catch (IOException ioe) { } catch (IOException ioe) {
LOG.info("Can not open history file: " + historyFilePath, ioe); LOG.info("Can not open history file: " + historyFilePath, ioe);
throw (new Exception("Can not open History File")); throw (new Exception("Can not open History File"));
} }
JobHistoryParser parser = new JobHistoryParser(in); JobHistoryParser parser = new JobHistoryParser(in);
JobInfo jobInfo = parser.parse(); JobInfo jobInfo = parser.parse();
Exception parseException = parser.getParseException(); Exception parseException = parser.getParseException();
Assert.assertNull("Caught an expected exception " + parseException, Assert.assertNull("Caught an expected exception " + parseException,
parseException); parseException);
for (Map.Entry<TaskID,TaskInfo> entry : jobInfo.getAllTasks().entrySet()) { for (Map.Entry<TaskID, TaskInfo> entry : jobInfo.getAllTasks().entrySet()) {
TaskId yarnTaskID = TypeConverter.toYarn(entry.getKey()); TaskId yarnTaskID = TypeConverter.toYarn(entry.getKey());
CompletedTask ct = new CompletedTask(yarnTaskID, entry.getValue()); CompletedTask ct = new CompletedTask(yarnTaskID, entry.getValue());
Assert.assertNotNull("completed task report has null counters", Assert.assertNotNull("completed task report has null counters", ct
ct.getReport().getCounters()); .getReport().getCounters());
//Make sure all the completedTask has counters, and the counters are not empty }
Assert.assertTrue(ct.getReport().getCounters()
.getAllCounterGroups().size() > 0);
}
} finally { } finally {
LOG.info("FINISHED testCountersForFailedTask"); LOG.info("FINISHED testCountersForFailedTask");
} }
} }
@Test (timeout=50000) @Test(timeout = 50000)
public void testScanningOldDirs() throws Exception { public void testScanningOldDirs() throws Exception {
LOG.info("STARTING testScanningOldDirs"); LOG.info("STARTING testScanningOldDirs");
try { try {
Configuration conf = new Configuration(); Configuration conf = new Configuration();
conf conf.setClass(
.setClass( CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY, MyResolver.class, DNSToSwitchMapping.class);
MyResolver.class, DNSToSwitchMapping.class); RackResolver.init(conf);
RackResolver.init(conf); MRApp app = new MRAppWithHistory(1, 1, true, this.getClass().getName(),
MRApp app = true);
new MRAppWithHistory(1, 1, true, app.submit(conf);
this.getClass().getName(), true); Job job = app.getContext().getAllJobs().values().iterator().next();
app.submit(conf); JobId jobId = job.getID();
Job job = app.getContext().getAllJobs().values().iterator().next(); LOG.info("JOBID is " + TypeConverter.fromYarn(jobId).toString());
JobId jobId = job.getID(); app.waitForState(job, JobState.SUCCEEDED);
LOG.info("JOBID is " + TypeConverter.fromYarn(jobId).toString());
app.waitForState(job, JobState.SUCCEEDED);
// make sure all events are flushed // make sure all events are flushed
app.waitForState(Service.STATE.STOPPED); app.waitForState(Service.STATE.STOPPED);
HistoryFileManagerForTest hfm = new HistoryFileManagerForTest(); HistoryFileManagerForTest hfm = new HistoryFileManagerForTest();
hfm.init(conf); hfm.init(conf);
HistoryFileInfo fileInfo = hfm.getFileInfo(jobId); HistoryFileInfo fileInfo = hfm.getFileInfo(jobId);
Assert.assertNotNull("Unable to locate job history", fileInfo); Assert.assertNotNull("Unable to locate job history", fileInfo);
// force the manager to "forget" the job // force the manager to "forget" the job
hfm.deleteJobFromJobListCache(fileInfo); hfm.deleteJobFromJobListCache(fileInfo);
final int msecPerSleep = 10; final int msecPerSleep = 10;
int msecToSleep = 10 * 1000; int msecToSleep = 10 * 1000;
while (fileInfo.isMovePending() && msecToSleep > 0) { while (fileInfo.isMovePending() && msecToSleep > 0) {
Assert.assertTrue(!fileInfo.didMoveFail()); Assert.assertTrue(!fileInfo.didMoveFail());
msecToSleep -= msecPerSleep; msecToSleep -= msecPerSleep;
Thread.sleep(msecPerSleep); Thread.sleep(msecPerSleep);
} }
Assert.assertTrue("Timeout waiting for history move", msecToSleep > 0); Assert.assertTrue("Timeout waiting for history move", msecToSleep > 0);
fileInfo = hfm.getFileInfo(jobId); fileInfo = hfm.getFileInfo(jobId);
Assert.assertNotNull("Unable to locate old job history", fileInfo); Assert.assertNotNull("Unable to locate old job history", fileInfo);
} finally { } finally {
LOG.info("FINISHED testScanningOldDirs"); LOG.info("FINISHED testScanningOldDirs");
} }
} }
static class MRAppWithHistoryWithFailedAttempt extends MRAppWithHistory { static class MRAppWithHistoryWithFailedAttempt extends MRAppWithHistory {
public MRAppWithHistoryWithFailedAttempt(int maps, int reduces, boolean autoComplete, public MRAppWithHistoryWithFailedAttempt(int maps, int reduces,
String testName, boolean cleanOnStart) { boolean autoComplete, String testName, boolean cleanOnStart) {
super(maps, reduces, autoComplete, testName, cleanOnStart); super(maps, reduces, autoComplete, testName, cleanOnStart);
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@Override @Override
protected void attemptLaunched(TaskAttemptId attemptID) { protected void attemptLaunched(TaskAttemptId attemptID) {
@ -558,8 +564,8 @@ public class TestJobHistoryParsing {
static class MRAppWithHistoryWithFailedTask extends MRAppWithHistory { static class MRAppWithHistoryWithFailedTask extends MRAppWithHistory {
public MRAppWithHistoryWithFailedTask(int maps, int reduces, boolean autoComplete, public MRAppWithHistoryWithFailedTask(int maps, int reduces,
String testName, boolean cleanOnStart) { boolean autoComplete, String testName, boolean cleanOnStart) {
super(maps, reduces, autoComplete, testName, cleanOnStart); super(maps, reduces, autoComplete, testName, cleanOnStart);
} }
@ -587,4 +593,128 @@ public class TestJobHistoryParsing {
t.testHistoryParsing(); t.testHistoryParsing();
t.testHistoryParsingForFailedAttempts(); t.testHistoryParsingForFailedAttempts();
} }
/**
* test clean old history files. Files should be deleted after 1 week by
* default.
*/
@Test(timeout = 15000)
public void testDeleteFileInfo() throws Exception {
LOG.info("STARTING testDeleteFileInfo");
try {
Configuration conf = new Configuration();
conf.setClass(
CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
MyResolver.class, DNSToSwitchMapping.class);
RackResolver.init(conf);
MRApp app = new MRAppWithHistory(1, 1, true, this.getClass().getName(),
true);
app.submit(conf);
Job job = app.getContext().getAllJobs().values().iterator().next();
JobId jobId = job.getID();
app.waitForState(job, JobState.SUCCEEDED);
// make sure all events are flushed
app.waitForState(Service.STATE.STOPPED);
HistoryFileManager hfm = new HistoryFileManager();
hfm.init(conf);
HistoryFileInfo fileInfo = hfm.getFileInfo(jobId);
hfm.initExisting();
// wait for move files form the done_intermediate directory to the gone
// directory
while (fileInfo.isMovePending()) {
Thread.sleep(300);
}
Assert.assertNotNull(hfm.jobListCache.values());
// try to remove fileInfo
hfm.clean();
// check that fileInfo does not deleted
Assert.assertFalse(fileInfo.isDeleted());
// correct live time
hfm.setMaxHistoryAge(-1);
hfm.clean();
// should be deleted !
Assert.assertTrue("file should be deleted ", fileInfo.isDeleted());
} finally {
LOG.info("FINISHED testDeleteFileInfo");
}
}
/**
* Simple test some methods of JobHistory
*/
@Test(timeout = 20000)
public void testJobHistoryMethods() throws Exception {
LOG.info("STARTING testJobHistoryMethods");
try {
Configuration configuration = new Configuration();
configuration
.setClass(
CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
MyResolver.class, DNSToSwitchMapping.class);
RackResolver.init(configuration);
MRApp app = new MRAppWithHistory(1, 1, true, this.getClass().getName(),
true);
app.submit(configuration);
Job job = app.getContext().getAllJobs().values().iterator().next();
app.waitForState(job, JobState.SUCCEEDED);
JobHistory jobHistory = new JobHistory();
jobHistory.init(configuration);
// Method getAllJobs
Assert.assertEquals(1, jobHistory.getAllJobs().size());
// and with ApplicationId
Assert.assertEquals(1, jobHistory.getAllJobs(app.getAppID()).size());
JobsInfo jobsinfo = jobHistory.getPartialJobs(0L, 10L, null, "default",
0L, System.currentTimeMillis() + 1, 0L,
System.currentTimeMillis() + 1, JobState.SUCCEEDED);
Assert.assertEquals(1, jobsinfo.getJobs().size());
Assert.assertNotNull(jobHistory.getApplicationAttemptId());
// test Application Id
Assert.assertEquals("application_0_0000", jobHistory.getApplicationID()
.toString());
Assert
.assertEquals("Job History Server", jobHistory.getApplicationName());
// method does not work
Assert.assertNull(jobHistory.getEventHandler());
// method does not work
Assert.assertNull(jobHistory.getClock());
// method does not work
Assert.assertNull(jobHistory.getClusterInfo());
} finally {
LOG.info("FINISHED testJobHistoryMethods");
}
}
/**
* Simple test PartialJob
*/
@Test(timeout = 1000)
public void testPartialJob() throws Exception {
JobId jobId = new JobIdPBImpl();
jobId.setId(0);
JobIndexInfo jii = new JobIndexInfo(0L, System.currentTimeMillis(), "user",
"jobName", jobId, 3, 2, "JobStatus");
PartialJob test = new PartialJob(jii, jobId);
assertEquals(1.0f, test.getProgress(), 0.001);
assertNull(test.getAllCounters());
assertNull(test.getTasks());
assertNull(test.getTasks(TaskType.MAP));
assertNull(test.getTask(new TaskIdPBImpl()));
assertNull(test.getTaskAttemptCompletionEvents(0, 100));
assertNull(test.getMapAttemptCompletionEvents(0, 100));
assertTrue(test.checkAccess(UserGroupInformation.getCurrentUser(), null));
assertNull(test.getAMInfos());
}
} }

View File

@ -0,0 +1,209 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.v2.hs;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.mapreduce.TaskCounter;
import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDiagnosticsRequest;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDiagnosticsResponse;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskAttemptCompletionEventsRequest;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskAttemptCompletionEventsResponse;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskAttemptReportRequest;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskAttemptReportResponse;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskReportRequest;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskReportResponse;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
import org.apache.hadoop.mapreduce.v2.app.MRApp;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.mapreduce.v2.app.job.Task;
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
import org.apache.hadoop.mapreduce.v2.hs.TestJobHistoryEvents.MRAppWithHistory;
import org.apache.hadoop.mapreduce.v2.hs.TestJobHistoryParsing.MyResolver;
import org.apache.hadoop.net.DNSToSwitchMapping;
import org.apache.hadoop.util.ExitUtil;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.service.Service;
import org.apache.hadoop.yarn.service.Service.STATE;
import org.apache.hadoop.yarn.util.RackResolver;
import org.junit.After;
import org.junit.Test;
import static org.junit.Assert.*;
/*
test JobHistoryServer protocols....
*/
public class TestJobHistoryServer {
private static RecordFactory recordFactory = RecordFactoryProvider
.getRecordFactory(null);
JobHistoryServer historyServer=null;
// simple test init/start/stop JobHistoryServer. Status should change.
@Test (timeout= 50000 )
public void testStartStopServer() throws Exception {
historyServer = new JobHistoryServer();
Configuration config = new Configuration();
historyServer.init(config);
assertEquals(STATE.INITED, historyServer.getServiceState());
assertEquals(3, historyServer.getServices().size());
historyServer.start();
assertEquals(STATE.STARTED, historyServer.getServiceState());
historyServer.stop();
assertEquals(STATE.STOPPED, historyServer.getServiceState());
assertNotNull(historyServer.getClientService());
HistoryClientService historyService = historyServer.getClientService();
assertNotNull(historyService.getClientHandler().getConnectAddress());
}
//Test reports of JobHistoryServer. History server should get log files from MRApp and read them
@Test (timeout= 50000 )
public void testReports() throws Exception {
Configuration config = new Configuration();
config
.setClass(
CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
MyResolver.class, DNSToSwitchMapping.class);
RackResolver.init(config);
MRApp app = new MRAppWithHistory(1, 1, true, this.getClass().getName(),
true);
app.submit(config);
Job job = app.getContext().getAllJobs().values().iterator().next();
app.waitForState(job, JobState.SUCCEEDED);
historyServer = new JobHistoryServer();
historyServer.init(config);
historyServer.start();
// search JobHistory service
JobHistory jobHistory= null;
for (Service service : historyServer.getServices() ) {
if (service instanceof JobHistory) {
jobHistory = (JobHistory) service;
}
};
Map<JobId, Job> jobs= jobHistory.getAllJobs();
assertEquals(1, jobs.size());
assertEquals("job_0_0000",jobs.keySet().iterator().next().toString());
Task task = job.getTasks().values().iterator().next();
TaskAttempt attempt = task.getAttempts().values().iterator().next();
HistoryClientService historyService = historyServer.getClientService();
MRClientProtocol protocol = historyService.getClientHandler();
GetTaskAttemptReportRequest gtarRequest = recordFactory
.newRecordInstance(GetTaskAttemptReportRequest.class);
// test getTaskAttemptReport
TaskAttemptId taId = attempt.getID();
taId.setTaskId(task.getID());
taId.getTaskId().setJobId(job.getID());
gtarRequest.setTaskAttemptId(taId);
GetTaskAttemptReportResponse response = protocol
.getTaskAttemptReport(gtarRequest);
assertEquals("container_0_0000_01_000000", response.getTaskAttemptReport()
.getContainerId().toString());
assertTrue(response.getTaskAttemptReport().getDiagnosticInfo().isEmpty());
// counters
assertNotNull(response.getTaskAttemptReport().getCounters()
.getCounter(TaskCounter.PHYSICAL_MEMORY_BYTES));
assertEquals(taId.toString(), response.getTaskAttemptReport()
.getTaskAttemptId().toString());
// test getTaskReport
GetTaskReportRequest request = recordFactory
.newRecordInstance(GetTaskReportRequest.class);
TaskId taskId = task.getID();
taskId.setJobId(job.getID());
request.setTaskId(taskId);
GetTaskReportResponse reportResponse = protocol.getTaskReport(request);
assertEquals("", reportResponse.getTaskReport().getDiagnosticsList()
.iterator().next());
// progress
assertEquals(1.0f, reportResponse.getTaskReport().getProgress(), 0.01);
// report has corrected taskId
assertEquals(taskId.toString(), reportResponse.getTaskReport().getTaskId()
.toString());
// Task state should be SUCCEEDED
assertEquals(TaskState.SUCCEEDED, reportResponse.getTaskReport()
.getTaskState());
// test getTaskAttemptCompletionEvents
GetTaskAttemptCompletionEventsRequest taskAttemptRequest = recordFactory
.newRecordInstance(GetTaskAttemptCompletionEventsRequest.class);
taskAttemptRequest.setJobId(job.getID());
GetTaskAttemptCompletionEventsResponse taskAttemptCompletionEventsResponse = protocol
.getTaskAttemptCompletionEvents(taskAttemptRequest);
assertEquals(0, taskAttemptCompletionEventsResponse.getCompletionEventCount());
// test getDiagnostics
GetDiagnosticsRequest diagnosticRequest = recordFactory
.newRecordInstance(GetDiagnosticsRequest.class);
diagnosticRequest.setTaskAttemptId(taId);
GetDiagnosticsResponse diagnosticResponse = protocol
.getDiagnostics(diagnosticRequest);
// it is strange : why one empty string ?
assertEquals(1, diagnosticResponse.getDiagnosticsCount());
assertEquals("", diagnosticResponse.getDiagnostics(0));
}
// test main method
@Test (timeout =60000)
public void testMainMethod() throws Exception {
ExitUtil.disableSystemExit();
try {
JobHistoryServer.main(new String[0]);
} catch (ExitUtil.ExitException e) {
assertEquals(0,e.status);
ExitUtil.resetFirstExitException();
fail();
}
}
@After
public void stop(){
if(historyServer !=null && !STATE.STOPPED.equals(historyServer.getServiceState())){
historyServer.stop();
}
}
}