From f7df0cb7df1d2d9e8574b2409cff2747b7c20352 Mon Sep 17 00:00:00 2001 From: Robert Joseph Evans Date: Wed, 20 Mar 2013 16:05:33 +0000 Subject: [PATCH] MAPREDUCE-4972. Coverage fixing for org.apache.hadoop.mapreduce.jobhistory (Aleksey Gorshkov via bobby) git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1458906 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-mapreduce-project/CHANGES.txt | 3 + .../mapreduce/jobhistory/TestEvents.java | 397 ++++++++++++++++++ .../TestJobHistoryEventHandler.java | 36 +- .../v2/hs/TestJobHistoryEntities.java | 6 +- .../mapreduce/v2/hs/TestJobHistoryEvents.java | 17 +- .../v2/hs/TestJobHistoryParsing.java | 47 ++- 6 files changed, 465 insertions(+), 41 deletions(-) create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestEvents.java diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 65e6e3aedb4..4c2e769d659 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -786,6 +786,9 @@ Release 0.23.7 - UNRELEASED MAPREDUCE-5027. Shuffle does not limit number of outstanding connections (Robert Parker via jeagles) + MAPREDUCE-4972. Coverage fixing for org.apache.hadoop.mapreduce.jobhistory + (Aleksey Gorshkov via bobby) + OPTIMIZATIONS MAPREDUCE-4946. Fix a performance problem for large jobs by reducing the diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestEvents.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestEvents.java new file mode 100644 index 00000000000..8f1e4b9c2b7 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestEvents.java @@ -0,0 +1,397 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapreduce.jobhistory; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.util.ArrayList; +import java.util.Arrays; + +import static junit.framework.Assert.*; + +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.mapred.JobPriority; +import org.apache.hadoop.mapreduce.Counters; +import org.apache.hadoop.mapreduce.JobID; +import org.apache.hadoop.mapreduce.TaskAttemptID; +import org.apache.hadoop.mapreduce.TaskID; +import org.apache.hadoop.mapreduce.TaskType; +import org.junit.Test; + +public class TestEvents { + + /** + * test a getters of TaskAttemptFinishedEvent and TaskAttemptFinished + * + * @throws Exception + */ + @Test(timeout = 10000) + public void testTaskAttemptFinishedEvent() throws Exception { + + JobID jid = new JobID("001", 1); + TaskID tid = new TaskID(jid, TaskType.REDUCE, 2); + TaskAttemptID taskAttemptId = new TaskAttemptID(tid, 3); + Counters counters = new Counters(); + TaskAttemptFinishedEvent test = new TaskAttemptFinishedEvent(taskAttemptId, + TaskType.REDUCE, "TEST", 123L, "RAKNAME", "HOSTNAME", "STATUS", + counters); + assertEquals(test.getAttemptId().toString(), taskAttemptId.toString()); + + assertEquals(test.getCounters(), counters); + assertEquals(test.getFinishTime(), 123L); + assertEquals(test.getHostname(), "HOSTNAME"); + assertEquals(test.getRackName(), "RAKNAME"); + assertEquals(test.getState(), "STATUS"); + assertEquals(test.getTaskId(), tid); + assertEquals(test.getTaskStatus(), "TEST"); + assertEquals(test.getTaskType(), TaskType.REDUCE); + + } + + /** + * simple test JobPriorityChangeEvent and JobPriorityChange + * + * @throws Exception + */ + + @Test(timeout = 10000) + public void testJobPriorityChange() throws Exception { + org.apache.hadoop.mapreduce.JobID jid = new JobID("001", 1); + JobPriorityChangeEvent test = new JobPriorityChangeEvent(jid, + JobPriority.LOW); + assertEquals(test.getJobId().toString(), jid.toString()); + assertEquals(test.getPriority(), JobPriority.LOW); + + } + + /** + * simple test TaskUpdatedEvent and TaskUpdated + * + * @throws Exception + */ + @Test(timeout = 10000) + public void testTaskUpdated() throws Exception { + JobID jid = new JobID("001", 1); + TaskID tid = new TaskID(jid, TaskType.REDUCE, 2); + TaskUpdatedEvent test = new TaskUpdatedEvent(tid, 1234L); + assertEquals(test.getTaskId().toString(), tid.toString()); + assertEquals(test.getFinishTime(), 1234L); + + } + + /* + * test EventReader EventReader should read the list of events and return + * instance of HistoryEvent Different HistoryEvent should have a different + * datum. + */ + @Test(timeout = 10000) + public void testEvents() throws Exception { + + EventReader reader = new EventReader(new DataInputStream( + new ByteArrayInputStream(getEvents()))); + HistoryEvent e = reader.getNextEvent(); + assertTrue(e.getEventType().equals(EventType.JOB_PRIORITY_CHANGED)); + assertEquals("ID", ((JobPriorityChange) e.getDatum()).jobid.toString()); + + e = reader.getNextEvent(); + assertTrue(e.getEventType().equals(EventType.JOB_STATUS_CHANGED)); + assertEquals("ID", ((JobStatusChanged) e.getDatum()).jobid.toString()); + + e = reader.getNextEvent(); + assertTrue(e.getEventType().equals(EventType.TASK_UPDATED)); + assertEquals("ID", ((TaskUpdated) e.getDatum()).taskid.toString()); + + e = reader.getNextEvent(); + assertTrue(e.getEventType().equals(EventType.REDUCE_ATTEMPT_KILLED)); + assertEquals("task_1_2_r03_4", + ((TaskAttemptUnsuccessfulCompletion) e.getDatum()).taskid.toString()); + + e = reader.getNextEvent(); + assertTrue(e.getEventType().equals(EventType.JOB_KILLED)); + assertEquals("ID", + ((JobUnsuccessfulCompletion) e.getDatum()).jobid.toString()); + + e = reader.getNextEvent(); + assertTrue(e.getEventType().equals(EventType.REDUCE_ATTEMPT_STARTED)); + assertEquals("task_1_2_r03_4", + ((TaskAttemptStarted) e.getDatum()).taskid.toString()); + + e = reader.getNextEvent(); + assertTrue(e.getEventType().equals(EventType.REDUCE_ATTEMPT_FINISHED)); + assertEquals("task_1_2_r03_4", + ((TaskAttemptFinished) e.getDatum()).taskid.toString()); + + e = reader.getNextEvent(); + assertTrue(e.getEventType().equals(EventType.REDUCE_ATTEMPT_KILLED)); + assertEquals("task_1_2_r03_4", + ((TaskAttemptUnsuccessfulCompletion) e.getDatum()).taskid.toString()); + + e = reader.getNextEvent(); + assertTrue(e.getEventType().equals(EventType.REDUCE_ATTEMPT_KILLED)); + assertEquals("task_1_2_r03_4", + ((TaskAttemptUnsuccessfulCompletion) e.getDatum()).taskid.toString()); + + e = reader.getNextEvent(); + assertTrue(e.getEventType().equals(EventType.REDUCE_ATTEMPT_STARTED)); + assertEquals("task_1_2_r03_4", + ((TaskAttemptStarted) e.getDatum()).taskid.toString()); + + e = reader.getNextEvent(); + assertTrue(e.getEventType().equals(EventType.REDUCE_ATTEMPT_FINISHED)); + assertEquals("task_1_2_r03_4", + ((TaskAttemptFinished) e.getDatum()).taskid.toString()); + + e = reader.getNextEvent(); + assertTrue(e.getEventType().equals(EventType.REDUCE_ATTEMPT_KILLED)); + assertEquals("task_1_2_r03_4", + ((TaskAttemptUnsuccessfulCompletion) e.getDatum()).taskid.toString()); + + e = reader.getNextEvent(); + assertTrue(e.getEventType().equals(EventType.REDUCE_ATTEMPT_KILLED)); + assertEquals("task_1_2_r03_4", + ((TaskAttemptUnsuccessfulCompletion) e.getDatum()).taskid.toString()); + + reader.close(); + } + + /* + * makes array of bytes with History events + */ + private byte[] getEvents() throws Exception { + ByteArrayOutputStream output = new ByteArrayOutputStream(); + FSDataOutputStream fsOutput = new FSDataOutputStream(output, + new FileSystem.Statistics("scheme")); + EventWriter writer = new EventWriter(fsOutput); + writer.write(getJobPriorityChangedEvent()); + writer.write(getJobStatusChangedEvent()); + writer.write(getTaskUpdatedEvent()); + writer.write(getReduceAttemptKilledEvent()); + writer.write(getJobKilledEvent()); + writer.write(getSetupAttemptStartedEvent()); + writer.write(getTaskAttemptFinishedEvent()); + writer.write(getSetupAttemptFieledEvent()); + writer.write(getSetupAttemptKilledEvent()); + writer.write(getCleanupAttemptStartedEvent()); + writer.write(getCleanupAttemptFinishedEvent()); + writer.write(getCleanupAttemptFiledEvent()); + writer.write(getCleanupAttemptKilledEvent()); + + writer.flush(); + writer.close(); + + return output.toByteArray(); + } + + private FakeEvent getCleanupAttemptKilledEvent() { + FakeEvent result = new FakeEvent(EventType.CLEANUP_ATTEMPT_KILLED); + + result.setDatum(getTaskAttemptUnsuccessfulCompletion()); + return result; + } + + private FakeEvent getCleanupAttemptFiledEvent() { + FakeEvent result = new FakeEvent(EventType.CLEANUP_ATTEMPT_FAILED); + + result.setDatum(getTaskAttemptUnsuccessfulCompletion()); + return result; + } + + private TaskAttemptUnsuccessfulCompletion getTaskAttemptUnsuccessfulCompletion() { + TaskAttemptUnsuccessfulCompletion datum = new TaskAttemptUnsuccessfulCompletion(); + datum.attemptId = "attempt_1_2_r3_4_5"; + datum.clockSplits = Arrays.asList(1, 2, 3); + datum.cpuUsages = Arrays.asList(100, 200, 300); + datum.error = "Error"; + datum.finishTime = 2; + datum.hostname = "hostname"; + datum.rackname = "rackname"; + datum.physMemKbytes = Arrays.asList(1000, 2000, 3000); + datum.taskid = "task_1_2_r03_4"; + datum.port = 1000; + datum.taskType = "REDUCE"; + datum.status = "STATUS"; + datum.counters = getCounters(); + datum.vMemKbytes = Arrays.asList(1000, 2000, 3000); + return datum; + } + + private JhCounters getCounters() { + JhCounters counters = new JhCounters(); + counters.groups = new ArrayList(0); + counters.name = "name"; + return counters; + } + + private FakeEvent getCleanupAttemptFinishedEvent() { + FakeEvent result = new FakeEvent(EventType.CLEANUP_ATTEMPT_FINISHED); + TaskAttemptFinished datum = new TaskAttemptFinished(); + datum.attemptId = "attempt_1_2_r3_4_5"; + + datum.counters = getCounters(); + datum.finishTime = 2; + datum.hostname = "hostname"; + datum.rackname = "rackName"; + datum.state = "state"; + datum.taskid = "task_1_2_r03_4"; + datum.taskStatus = "taskStatus"; + datum.taskType = "REDUCE"; + result.setDatum(datum); + return result; + } + + private FakeEvent getCleanupAttemptStartedEvent() { + FakeEvent result = new FakeEvent(EventType.CLEANUP_ATTEMPT_STARTED); + TaskAttemptStarted datum = new TaskAttemptStarted(); + + datum.attemptId = "attempt_1_2_r3_4_5"; + datum.avataar = "avatar"; + datum.containerId = "containerId"; + datum.httpPort = 10000; + datum.locality = "locality"; + datum.shufflePort = 10001; + datum.startTime = 1; + datum.taskid = "task_1_2_r03_4"; + datum.taskType = "taskType"; + datum.trackerName = "trackerName"; + result.setDatum(datum); + return result; + } + + private FakeEvent getSetupAttemptKilledEvent() { + FakeEvent result = new FakeEvent(EventType.SETUP_ATTEMPT_KILLED); + result.setDatum(getTaskAttemptUnsuccessfulCompletion()); + return result; + } + + private FakeEvent getSetupAttemptFieledEvent() { + FakeEvent result = new FakeEvent(EventType.SETUP_ATTEMPT_FAILED); + + result.setDatum(getTaskAttemptUnsuccessfulCompletion()); + return result; + } + + private FakeEvent getTaskAttemptFinishedEvent() { + FakeEvent result = new FakeEvent(EventType.SETUP_ATTEMPT_FINISHED); + TaskAttemptFinished datum = new TaskAttemptFinished(); + + datum.attemptId = "attempt_1_2_r3_4_5"; + datum.counters = getCounters(); + datum.finishTime = 2; + datum.hostname = "hostname"; + datum.rackname = "rackname"; + datum.state = "state"; + datum.taskid = "task_1_2_r03_4"; + datum.taskStatus = "taskStatus"; + datum.taskType = "REDUCE"; + result.setDatum(datum); + return result; + } + + private FakeEvent getSetupAttemptStartedEvent() { + FakeEvent result = new FakeEvent(EventType.SETUP_ATTEMPT_STARTED); + TaskAttemptStarted datum = new TaskAttemptStarted(); + datum.attemptId = "ID"; + datum.avataar = "avataar"; + datum.containerId = "containerId"; + datum.httpPort = 10000; + datum.locality = "locality"; + datum.shufflePort = 10001; + datum.startTime = 1; + datum.taskid = "task_1_2_r03_4"; + datum.taskType = "taskType"; + datum.trackerName = "trackerName"; + result.setDatum(datum); + return result; + } + + private FakeEvent getJobKilledEvent() { + FakeEvent result = new FakeEvent(EventType.JOB_KILLED); + JobUnsuccessfulCompletion datum = new JobUnsuccessfulCompletion(); + datum.finishedMaps = 1; + datum.finishedReduces = 2; + datum.finishTime = 3; + datum.jobid = "ID"; + datum.jobStatus = "STATUS"; + result.setDatum(datum); + return result; + } + + private FakeEvent getReduceAttemptKilledEvent() { + FakeEvent result = new FakeEvent(EventType.REDUCE_ATTEMPT_KILLED); + + result.setDatum(getTaskAttemptUnsuccessfulCompletion()); + return result; + } + + private FakeEvent getJobPriorityChangedEvent() { + FakeEvent result = new FakeEvent(EventType.JOB_PRIORITY_CHANGED); + JobPriorityChange datum = new JobPriorityChange(); + datum.jobid = "ID"; + datum.priority = "priority"; + result.setDatum(datum); + return result; + } + + private FakeEvent getJobStatusChangedEvent() { + FakeEvent result = new FakeEvent(EventType.JOB_STATUS_CHANGED); + JobStatusChanged datum = new JobStatusChanged(); + datum.jobid = "ID"; + datum.jobStatus = "newStatus"; + result.setDatum(datum); + return result; + } + + private FakeEvent getTaskUpdatedEvent() { + FakeEvent result = new FakeEvent(EventType.TASK_UPDATED); + TaskUpdated datum = new TaskUpdated(); + datum.finishTime = 2; + datum.taskid = "ID"; + result.setDatum(datum); + return result; + } + + private class FakeEvent implements HistoryEvent { + private EventType eventType; + private Object datum; + + public FakeEvent(EventType eventType) { + this.eventType = eventType; + } + + @Override + public EventType getEventType() { + return eventType; + } + + @Override + public Object getDatum() { + + return datum; + } + + @Override + public void setDatum(Object datum) { + this.datum = datum; + } + + } + +} diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java index 0af67429e5d..3d3994f8575 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java @@ -31,6 +31,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.Counters; +import org.apache.hadoop.mapreduce.JobID; import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.TaskID; import org.apache.hadoop.mapreduce.TaskType; @@ -53,7 +54,7 @@ public class TestJobHistoryEventHandler { private static final Log LOG = LogFactory .getLog(TestJobHistoryEventHandler.class); - @Test + @Test (timeout=50000) public void testFirstFlushOnCompletionEvent() throws Exception { TestParams t = new TestParams(); Configuration conf = new Configuration(); @@ -96,7 +97,7 @@ public class TestJobHistoryEventHandler { } } - @Test + @Test (timeout=50000) public void testMaxUnflushedCompletionEvents() throws Exception { TestParams t = new TestParams(); Configuration conf = new Configuration(); @@ -131,17 +132,17 @@ public class TestJobHistoryEventHandler { handleNextNEvents(jheh, 1); verify(mockWriter).flush(); - + handleNextNEvents(jheh, 50); verify(mockWriter, times(6)).flush(); - + } finally { jheh.stop(); verify(mockWriter).close(); } } - - @Test + + @Test (timeout=50000) public void testUnflushedTimer() throws Exception { TestParams t = new TestParams(); Configuration conf = new Configuration(); @@ -181,8 +182,8 @@ public class TestJobHistoryEventHandler { verify(mockWriter).close(); } } - - @Test + + @Test (timeout=50000) public void testBatchedFlushJobEndMultiplier() throws Exception { TestParams t = new TestParams(); Configuration conf = new Configuration(); @@ -265,7 +266,7 @@ public class TestJobHistoryEventHandler { when(mockContext.getApplicationID()).thenReturn(appId); return mockContext; } - + private class TestParams { String workDir = setupTestWorkDir(); @@ -279,12 +280,8 @@ public class TestJobHistoryEventHandler { } private JobHistoryEvent getEventToEnqueue(JobId jobId) { - JobHistoryEvent toReturn = Mockito.mock(JobHistoryEvent.class); - HistoryEvent he = Mockito.mock(HistoryEvent.class); - Mockito.when(he.getEventType()).thenReturn(EventType.JOB_STATUS_CHANGED); - Mockito.when(toReturn.getHistoryEvent()).thenReturn(he); - Mockito.when(toReturn.getJobID()).thenReturn(jobId); - return toReturn; + HistoryEvent toReturn = new JobStatusChangedEvent(new JobID(Integer.toString(jobId.getId()), jobId.getId()), "change status"); + return new JobHistoryEvent(jobId, toReturn); } @Test @@ -344,8 +341,6 @@ public class TestJobHistoryEventHandler { class JHEvenHandlerForTest extends JobHistoryEventHandler { private EventWriter eventWriter; - volatile int handleEventCompleteCalls = 0; - volatile int handleEventStartedCalls = 0; public JHEvenHandlerForTest(AppContext context, int startCount) { super(context, startCount); @@ -354,7 +349,7 @@ class JHEvenHandlerForTest extends JobHistoryEventHandler { @Override public void start() { } - + @Override protected EventWriter createEventWriter(Path historyFilePath) throws IOException { @@ -365,7 +360,7 @@ class JHEvenHandlerForTest extends JobHistoryEventHandler { @Override protected void closeEventWriter(JobId jobId) { } - + public EventWriter getEventWriter() { return this.eventWriter; } @@ -375,13 +370,12 @@ class JHEvenHandlerForTest extends JobHistoryEventHandler { * Class to help with testSigTermedFunctionality */ class JHEventHandlerForSigtermTest extends JobHistoryEventHandler { - private MetaInfo metaInfo; public JHEventHandlerForSigtermTest(AppContext context, int startCount) { super(context, startCount); } public void addToFileMap(JobId jobId) { - metaInfo = Mockito.mock(MetaInfo.class); + MetaInfo metaInfo = Mockito.mock(MetaInfo.class); Mockito.when(metaInfo.isWriterActive()).thenReturn(true); fileMap.put(jobId, metaInfo); } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryEntities.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryEntities.java index 69b4bd7ac3b..837f9e17b73 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryEntities.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryEntities.java @@ -79,7 +79,7 @@ public class TestJobHistoryEntities { } /* Verify some expected values based on the history file */ - @Test + @Test (timeout=10000) public void testCompletedJob() throws Exception { HistoryFileInfo info = mock(HistoryFileInfo.class); when(info.getConfFile()).thenReturn(fullConfPath); @@ -104,7 +104,7 @@ public class TestJobHistoryEntities { assertEquals(JobState.SUCCEEDED, jobReport.getJobState()); } - @Test + @Test (timeout=10000) public void testCompletedTask() throws Exception { HistoryFileInfo info = mock(HistoryFileInfo.class); when(info.getConfFile()).thenReturn(fullConfPath); @@ -133,7 +133,7 @@ public class TestJobHistoryEntities { assertEquals(rt1Id, rt1Report.getTaskId()); } - @Test + @Test (timeout=10000) public void testCompletedTaskAttempt() throws Exception { HistoryFileInfo info = mock(HistoryFileInfo.class); when(info.getConfFile()).thenReturn(fullConfPath); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryEvents.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryEvents.java index 58ab9c8117e..6b14b1c6f95 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryEvents.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryEvents.java @@ -25,7 +25,6 @@ import junit.framework.Assert; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.TypeConverter; import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent; import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEventHandler; @@ -67,8 +66,17 @@ public class TestJobHistoryEvents { * completed maps */ HistoryContext context = new JobHistory(); + // test start and stop states ((JobHistory)context).init(conf); - Job parsedJob = context.getJob(jobId); + ((JobHistory)context).start(); + Assert.assertTrue( context.getStartTime()>0); + Assert.assertEquals(((JobHistory)context).getServiceState(),Service.STATE.STARTED); + + + ((JobHistory)context).stop(); + Assert.assertEquals(((JobHistory)context).getServiceState(),Service.STATE.STOPPED); + Job parsedJob = context.getJob(jobId); + Assert.assertEquals("CompletedMaps not correct", 2, parsedJob.getCompletedMaps()); Assert.assertEquals(System.getProperty("user.name"), parsedJob.getUserName()); @@ -177,9 +185,8 @@ public class TestJobHistoryEvents { @Override protected EventHandler createJobHistoryHandler( AppContext context) { - JobHistoryEventHandler eventHandler = new JobHistoryEventHandler( - context, getStartCount()); - return eventHandler; + return new JobHistoryEventHandler( + context, getStartCount()); } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java index ae13ba0d355..073a17bde17 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java @@ -18,7 +18,9 @@ package org.apache.hadoop.mapreduce.v2.hs; +import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.io.PrintStream; import java.util.Arrays; import java.util.HashMap; import java.util.List; @@ -40,6 +42,7 @@ import org.apache.hadoop.mapreduce.TaskID; import org.apache.hadoop.mapreduce.TypeConverter; import org.apache.hadoop.mapreduce.jobhistory.EventReader; import org.apache.hadoop.mapreduce.jobhistory.HistoryEvent; +import org.apache.hadoop.mapreduce.jobhistory.HistoryViewer; import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser; import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.AMInfo; import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo; @@ -60,7 +63,6 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType; import org.apache.hadoop.mapreduce.v2.hs.HistoryFileManager.HistoryFileInfo; import org.apache.hadoop.mapreduce.v2.hs.TestJobHistoryEvents.MRAppWithHistory; import org.apache.hadoop.mapreduce.v2.jobhistory.FileNameIndexUtils; -import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig; import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils; import org.apache.hadoop.mapreduce.v2.jobhistory.JobIndexInfo; import org.apache.hadoop.net.DNSToSwitchMapping; @@ -78,6 +80,8 @@ public class TestJobHistoryParsing { private static final String RACK_NAME = "/MyRackName"; + private ByteArrayOutputStream outContent = new ByteArrayOutputStream(); + public static class MyResolver implements DNSToSwitchMapping { @Override public List resolve(List names) { @@ -89,14 +93,14 @@ public class TestJobHistoryParsing { } } - @Test + @Test (timeout=50000) public void testJobInfo() throws Exception { JobInfo info = new JobInfo(); Assert.assertEquals("NORMAL", info.getPriority()); info.printAll(); } - @Test + @Test (timeout=50000) public void testHistoryParsing() throws Exception { LOG.info("STARTING testHistoryParsing()"); try { @@ -106,7 +110,7 @@ public class TestJobHistoryParsing { } } - @Test + @Test (timeout=50000) public void testHistoryParsingWithParseErrors() throws Exception { LOG.info("STARTING testHistoryParsingWithParseErrors()"); try { @@ -321,18 +325,37 @@ public class TestJobHistoryParsing { } } } + + // test output for HistoryViewer + PrintStream stdps=System.out; + try { + System.setOut(new PrintStream(outContent)); + HistoryViewer viewer = new HistoryViewer(fc.makeQualified( + fileInfo.getHistoryFile()).toString(), conf, true); + viewer.print(); + + for (TaskInfo taskInfo : allTasks.values()) { + + String test= (taskInfo.getTaskStatus()==null?"":taskInfo.getTaskStatus())+" "+taskInfo.getTaskType()+" task list for "+taskInfo.getTaskId().getJobID(); + Assert.assertTrue(outContent.toString().indexOf(test)>0); + Assert.assertTrue(outContent.toString().indexOf(taskInfo.getTaskId().toString())>0); + } + } finally { + System.setOut(stdps); + + } } - + // Computes finished maps similar to RecoveryService... - private long computeFinishedMaps(JobInfo jobInfo, - int numMaps, int numSuccessfulMaps) { + private long computeFinishedMaps(JobInfo jobInfo, int numMaps, + int numSuccessfulMaps) { if (numMaps == numSuccessfulMaps) { return jobInfo.getFinishedMaps(); } - + long numFinishedMaps = 0; - Map taskInfos = - jobInfo.getAllTasks(); + Map taskInfos = jobInfo + .getAllTasks(); for (TaskInfo taskInfo : taskInfos.values()) { if (TaskState.SUCCEEDED.toString().equals(taskInfo.getTaskStatus())) { ++numFinishedMaps; @@ -341,7 +364,7 @@ public class TestJobHistoryParsing { return numFinishedMaps; } - @Test + @Test (timeout=50000) public void testHistoryParsingForFailedAttempts() throws Exception { LOG.info("STARTING testHistoryParsingForFailedAttempts"); try { @@ -468,7 +491,7 @@ public class TestJobHistoryParsing { } } - @Test + @Test (timeout=50000) public void testScanningOldDirs() throws Exception { LOG.info("STARTING testScanningOldDirs"); try {