svn merge -c 1458906 FIXES: MAPREDUCE-4972. Coverage fixing for org.apache.hadoop.mapreduce.jobhistory (Aleksey Gorshkov via bobby)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1458915 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
aa874f3952
commit
05fabc2013
|
@ -612,6 +612,9 @@ Release 0.23.7 - UNRELEASED
|
||||||
MAPREDUCE-5027. Shuffle does not limit number of outstanding connections
|
MAPREDUCE-5027. Shuffle does not limit number of outstanding connections
|
||||||
(Robert Parker via jeagles)
|
(Robert Parker via jeagles)
|
||||||
|
|
||||||
|
MAPREDUCE-4972. Coverage fixing for org.apache.hadoop.mapreduce.jobhistory
|
||||||
|
(Aleksey Gorshkov via bobby)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
MAPREDUCE-4946. Fix a performance problem for large jobs by reducing the
|
MAPREDUCE-4946. Fix a performance problem for large jobs by reducing the
|
||||||
|
|
|
@ -0,0 +1,397 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.mapreduce.jobhistory;
|
||||||
|
|
||||||
|
import java.io.ByteArrayInputStream;
|
||||||
|
import java.io.ByteArrayOutputStream;
|
||||||
|
import java.io.DataInputStream;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Arrays;
|
||||||
|
|
||||||
|
import static junit.framework.Assert.*;
|
||||||
|
|
||||||
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.mapred.JobPriority;
|
||||||
|
import org.apache.hadoop.mapreduce.Counters;
|
||||||
|
import org.apache.hadoop.mapreduce.JobID;
|
||||||
|
import org.apache.hadoop.mapreduce.TaskAttemptID;
|
||||||
|
import org.apache.hadoop.mapreduce.TaskID;
|
||||||
|
import org.apache.hadoop.mapreduce.TaskType;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
public class TestEvents {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* test a getters of TaskAttemptFinishedEvent and TaskAttemptFinished
|
||||||
|
*
|
||||||
|
* @throws Exception
|
||||||
|
*/
|
||||||
|
@Test(timeout = 10000)
|
||||||
|
public void testTaskAttemptFinishedEvent() throws Exception {
|
||||||
|
|
||||||
|
JobID jid = new JobID("001", 1);
|
||||||
|
TaskID tid = new TaskID(jid, TaskType.REDUCE, 2);
|
||||||
|
TaskAttemptID taskAttemptId = new TaskAttemptID(tid, 3);
|
||||||
|
Counters counters = new Counters();
|
||||||
|
TaskAttemptFinishedEvent test = new TaskAttemptFinishedEvent(taskAttemptId,
|
||||||
|
TaskType.REDUCE, "TEST", 123L, "RAKNAME", "HOSTNAME", "STATUS",
|
||||||
|
counters);
|
||||||
|
assertEquals(test.getAttemptId().toString(), taskAttemptId.toString());
|
||||||
|
|
||||||
|
assertEquals(test.getCounters(), counters);
|
||||||
|
assertEquals(test.getFinishTime(), 123L);
|
||||||
|
assertEquals(test.getHostname(), "HOSTNAME");
|
||||||
|
assertEquals(test.getRackName(), "RAKNAME");
|
||||||
|
assertEquals(test.getState(), "STATUS");
|
||||||
|
assertEquals(test.getTaskId(), tid);
|
||||||
|
assertEquals(test.getTaskStatus(), "TEST");
|
||||||
|
assertEquals(test.getTaskType(), TaskType.REDUCE);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* simple test JobPriorityChangeEvent and JobPriorityChange
|
||||||
|
*
|
||||||
|
* @throws Exception
|
||||||
|
*/
|
||||||
|
|
||||||
|
@Test(timeout = 10000)
|
||||||
|
public void testJobPriorityChange() throws Exception {
|
||||||
|
org.apache.hadoop.mapreduce.JobID jid = new JobID("001", 1);
|
||||||
|
JobPriorityChangeEvent test = new JobPriorityChangeEvent(jid,
|
||||||
|
JobPriority.LOW);
|
||||||
|
assertEquals(test.getJobId().toString(), jid.toString());
|
||||||
|
assertEquals(test.getPriority(), JobPriority.LOW);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* simple test TaskUpdatedEvent and TaskUpdated
|
||||||
|
*
|
||||||
|
* @throws Exception
|
||||||
|
*/
|
||||||
|
@Test(timeout = 10000)
|
||||||
|
public void testTaskUpdated() throws Exception {
|
||||||
|
JobID jid = new JobID("001", 1);
|
||||||
|
TaskID tid = new TaskID(jid, TaskType.REDUCE, 2);
|
||||||
|
TaskUpdatedEvent test = new TaskUpdatedEvent(tid, 1234L);
|
||||||
|
assertEquals(test.getTaskId().toString(), tid.toString());
|
||||||
|
assertEquals(test.getFinishTime(), 1234L);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* test EventReader EventReader should read the list of events and return
|
||||||
|
* instance of HistoryEvent Different HistoryEvent should have a different
|
||||||
|
* datum.
|
||||||
|
*/
|
||||||
|
@Test(timeout = 10000)
|
||||||
|
public void testEvents() throws Exception {
|
||||||
|
|
||||||
|
EventReader reader = new EventReader(new DataInputStream(
|
||||||
|
new ByteArrayInputStream(getEvents())));
|
||||||
|
HistoryEvent e = reader.getNextEvent();
|
||||||
|
assertTrue(e.getEventType().equals(EventType.JOB_PRIORITY_CHANGED));
|
||||||
|
assertEquals("ID", ((JobPriorityChange) e.getDatum()).jobid.toString());
|
||||||
|
|
||||||
|
e = reader.getNextEvent();
|
||||||
|
assertTrue(e.getEventType().equals(EventType.JOB_STATUS_CHANGED));
|
||||||
|
assertEquals("ID", ((JobStatusChanged) e.getDatum()).jobid.toString());
|
||||||
|
|
||||||
|
e = reader.getNextEvent();
|
||||||
|
assertTrue(e.getEventType().equals(EventType.TASK_UPDATED));
|
||||||
|
assertEquals("ID", ((TaskUpdated) e.getDatum()).taskid.toString());
|
||||||
|
|
||||||
|
e = reader.getNextEvent();
|
||||||
|
assertTrue(e.getEventType().equals(EventType.REDUCE_ATTEMPT_KILLED));
|
||||||
|
assertEquals("task_1_2_r03_4",
|
||||||
|
((TaskAttemptUnsuccessfulCompletion) e.getDatum()).taskid.toString());
|
||||||
|
|
||||||
|
e = reader.getNextEvent();
|
||||||
|
assertTrue(e.getEventType().equals(EventType.JOB_KILLED));
|
||||||
|
assertEquals("ID",
|
||||||
|
((JobUnsuccessfulCompletion) e.getDatum()).jobid.toString());
|
||||||
|
|
||||||
|
e = reader.getNextEvent();
|
||||||
|
assertTrue(e.getEventType().equals(EventType.REDUCE_ATTEMPT_STARTED));
|
||||||
|
assertEquals("task_1_2_r03_4",
|
||||||
|
((TaskAttemptStarted) e.getDatum()).taskid.toString());
|
||||||
|
|
||||||
|
e = reader.getNextEvent();
|
||||||
|
assertTrue(e.getEventType().equals(EventType.REDUCE_ATTEMPT_FINISHED));
|
||||||
|
assertEquals("task_1_2_r03_4",
|
||||||
|
((TaskAttemptFinished) e.getDatum()).taskid.toString());
|
||||||
|
|
||||||
|
e = reader.getNextEvent();
|
||||||
|
assertTrue(e.getEventType().equals(EventType.REDUCE_ATTEMPT_KILLED));
|
||||||
|
assertEquals("task_1_2_r03_4",
|
||||||
|
((TaskAttemptUnsuccessfulCompletion) e.getDatum()).taskid.toString());
|
||||||
|
|
||||||
|
e = reader.getNextEvent();
|
||||||
|
assertTrue(e.getEventType().equals(EventType.REDUCE_ATTEMPT_KILLED));
|
||||||
|
assertEquals("task_1_2_r03_4",
|
||||||
|
((TaskAttemptUnsuccessfulCompletion) e.getDatum()).taskid.toString());
|
||||||
|
|
||||||
|
e = reader.getNextEvent();
|
||||||
|
assertTrue(e.getEventType().equals(EventType.REDUCE_ATTEMPT_STARTED));
|
||||||
|
assertEquals("task_1_2_r03_4",
|
||||||
|
((TaskAttemptStarted) e.getDatum()).taskid.toString());
|
||||||
|
|
||||||
|
e = reader.getNextEvent();
|
||||||
|
assertTrue(e.getEventType().equals(EventType.REDUCE_ATTEMPT_FINISHED));
|
||||||
|
assertEquals("task_1_2_r03_4",
|
||||||
|
((TaskAttemptFinished) e.getDatum()).taskid.toString());
|
||||||
|
|
||||||
|
e = reader.getNextEvent();
|
||||||
|
assertTrue(e.getEventType().equals(EventType.REDUCE_ATTEMPT_KILLED));
|
||||||
|
assertEquals("task_1_2_r03_4",
|
||||||
|
((TaskAttemptUnsuccessfulCompletion) e.getDatum()).taskid.toString());
|
||||||
|
|
||||||
|
e = reader.getNextEvent();
|
||||||
|
assertTrue(e.getEventType().equals(EventType.REDUCE_ATTEMPT_KILLED));
|
||||||
|
assertEquals("task_1_2_r03_4",
|
||||||
|
((TaskAttemptUnsuccessfulCompletion) e.getDatum()).taskid.toString());
|
||||||
|
|
||||||
|
reader.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* makes array of bytes with History events
|
||||||
|
*/
|
||||||
|
private byte[] getEvents() throws Exception {
|
||||||
|
ByteArrayOutputStream output = new ByteArrayOutputStream();
|
||||||
|
FSDataOutputStream fsOutput = new FSDataOutputStream(output,
|
||||||
|
new FileSystem.Statistics("scheme"));
|
||||||
|
EventWriter writer = new EventWriter(fsOutput);
|
||||||
|
writer.write(getJobPriorityChangedEvent());
|
||||||
|
writer.write(getJobStatusChangedEvent());
|
||||||
|
writer.write(getTaskUpdatedEvent());
|
||||||
|
writer.write(getReduceAttemptKilledEvent());
|
||||||
|
writer.write(getJobKilledEvent());
|
||||||
|
writer.write(getSetupAttemptStartedEvent());
|
||||||
|
writer.write(getTaskAttemptFinishedEvent());
|
||||||
|
writer.write(getSetupAttemptFieledEvent());
|
||||||
|
writer.write(getSetupAttemptKilledEvent());
|
||||||
|
writer.write(getCleanupAttemptStartedEvent());
|
||||||
|
writer.write(getCleanupAttemptFinishedEvent());
|
||||||
|
writer.write(getCleanupAttemptFiledEvent());
|
||||||
|
writer.write(getCleanupAttemptKilledEvent());
|
||||||
|
|
||||||
|
writer.flush();
|
||||||
|
writer.close();
|
||||||
|
|
||||||
|
return output.toByteArray();
|
||||||
|
}
|
||||||
|
|
||||||
|
private FakeEvent getCleanupAttemptKilledEvent() {
|
||||||
|
FakeEvent result = new FakeEvent(EventType.CLEANUP_ATTEMPT_KILLED);
|
||||||
|
|
||||||
|
result.setDatum(getTaskAttemptUnsuccessfulCompletion());
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
private FakeEvent getCleanupAttemptFiledEvent() {
|
||||||
|
FakeEvent result = new FakeEvent(EventType.CLEANUP_ATTEMPT_FAILED);
|
||||||
|
|
||||||
|
result.setDatum(getTaskAttemptUnsuccessfulCompletion());
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
private TaskAttemptUnsuccessfulCompletion getTaskAttemptUnsuccessfulCompletion() {
|
||||||
|
TaskAttemptUnsuccessfulCompletion datum = new TaskAttemptUnsuccessfulCompletion();
|
||||||
|
datum.attemptId = "attempt_1_2_r3_4_5";
|
||||||
|
datum.clockSplits = Arrays.asList(1, 2, 3);
|
||||||
|
datum.cpuUsages = Arrays.asList(100, 200, 300);
|
||||||
|
datum.error = "Error";
|
||||||
|
datum.finishTime = 2;
|
||||||
|
datum.hostname = "hostname";
|
||||||
|
datum.rackname = "rackname";
|
||||||
|
datum.physMemKbytes = Arrays.asList(1000, 2000, 3000);
|
||||||
|
datum.taskid = "task_1_2_r03_4";
|
||||||
|
datum.port = 1000;
|
||||||
|
datum.taskType = "REDUCE";
|
||||||
|
datum.status = "STATUS";
|
||||||
|
datum.counters = getCounters();
|
||||||
|
datum.vMemKbytes = Arrays.asList(1000, 2000, 3000);
|
||||||
|
return datum;
|
||||||
|
}
|
||||||
|
|
||||||
|
private JhCounters getCounters() {
|
||||||
|
JhCounters counters = new JhCounters();
|
||||||
|
counters.groups = new ArrayList<JhCounterGroup>(0);
|
||||||
|
counters.name = "name";
|
||||||
|
return counters;
|
||||||
|
}
|
||||||
|
|
||||||
|
private FakeEvent getCleanupAttemptFinishedEvent() {
|
||||||
|
FakeEvent result = new FakeEvent(EventType.CLEANUP_ATTEMPT_FINISHED);
|
||||||
|
TaskAttemptFinished datum = new TaskAttemptFinished();
|
||||||
|
datum.attemptId = "attempt_1_2_r3_4_5";
|
||||||
|
|
||||||
|
datum.counters = getCounters();
|
||||||
|
datum.finishTime = 2;
|
||||||
|
datum.hostname = "hostname";
|
||||||
|
datum.rackname = "rackName";
|
||||||
|
datum.state = "state";
|
||||||
|
datum.taskid = "task_1_2_r03_4";
|
||||||
|
datum.taskStatus = "taskStatus";
|
||||||
|
datum.taskType = "REDUCE";
|
||||||
|
result.setDatum(datum);
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
private FakeEvent getCleanupAttemptStartedEvent() {
|
||||||
|
FakeEvent result = new FakeEvent(EventType.CLEANUP_ATTEMPT_STARTED);
|
||||||
|
TaskAttemptStarted datum = new TaskAttemptStarted();
|
||||||
|
|
||||||
|
datum.attemptId = "attempt_1_2_r3_4_5";
|
||||||
|
datum.avataar = "avatar";
|
||||||
|
datum.containerId = "containerId";
|
||||||
|
datum.httpPort = 10000;
|
||||||
|
datum.locality = "locality";
|
||||||
|
datum.shufflePort = 10001;
|
||||||
|
datum.startTime = 1;
|
||||||
|
datum.taskid = "task_1_2_r03_4";
|
||||||
|
datum.taskType = "taskType";
|
||||||
|
datum.trackerName = "trackerName";
|
||||||
|
result.setDatum(datum);
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
private FakeEvent getSetupAttemptKilledEvent() {
|
||||||
|
FakeEvent result = new FakeEvent(EventType.SETUP_ATTEMPT_KILLED);
|
||||||
|
result.setDatum(getTaskAttemptUnsuccessfulCompletion());
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
private FakeEvent getSetupAttemptFieledEvent() {
|
||||||
|
FakeEvent result = new FakeEvent(EventType.SETUP_ATTEMPT_FAILED);
|
||||||
|
|
||||||
|
result.setDatum(getTaskAttemptUnsuccessfulCompletion());
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
private FakeEvent getTaskAttemptFinishedEvent() {
|
||||||
|
FakeEvent result = new FakeEvent(EventType.SETUP_ATTEMPT_FINISHED);
|
||||||
|
TaskAttemptFinished datum = new TaskAttemptFinished();
|
||||||
|
|
||||||
|
datum.attemptId = "attempt_1_2_r3_4_5";
|
||||||
|
datum.counters = getCounters();
|
||||||
|
datum.finishTime = 2;
|
||||||
|
datum.hostname = "hostname";
|
||||||
|
datum.rackname = "rackname";
|
||||||
|
datum.state = "state";
|
||||||
|
datum.taskid = "task_1_2_r03_4";
|
||||||
|
datum.taskStatus = "taskStatus";
|
||||||
|
datum.taskType = "REDUCE";
|
||||||
|
result.setDatum(datum);
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
private FakeEvent getSetupAttemptStartedEvent() {
|
||||||
|
FakeEvent result = new FakeEvent(EventType.SETUP_ATTEMPT_STARTED);
|
||||||
|
TaskAttemptStarted datum = new TaskAttemptStarted();
|
||||||
|
datum.attemptId = "ID";
|
||||||
|
datum.avataar = "avataar";
|
||||||
|
datum.containerId = "containerId";
|
||||||
|
datum.httpPort = 10000;
|
||||||
|
datum.locality = "locality";
|
||||||
|
datum.shufflePort = 10001;
|
||||||
|
datum.startTime = 1;
|
||||||
|
datum.taskid = "task_1_2_r03_4";
|
||||||
|
datum.taskType = "taskType";
|
||||||
|
datum.trackerName = "trackerName";
|
||||||
|
result.setDatum(datum);
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
private FakeEvent getJobKilledEvent() {
|
||||||
|
FakeEvent result = new FakeEvent(EventType.JOB_KILLED);
|
||||||
|
JobUnsuccessfulCompletion datum = new JobUnsuccessfulCompletion();
|
||||||
|
datum.finishedMaps = 1;
|
||||||
|
datum.finishedReduces = 2;
|
||||||
|
datum.finishTime = 3;
|
||||||
|
datum.jobid = "ID";
|
||||||
|
datum.jobStatus = "STATUS";
|
||||||
|
result.setDatum(datum);
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
private FakeEvent getReduceAttemptKilledEvent() {
|
||||||
|
FakeEvent result = new FakeEvent(EventType.REDUCE_ATTEMPT_KILLED);
|
||||||
|
|
||||||
|
result.setDatum(getTaskAttemptUnsuccessfulCompletion());
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
private FakeEvent getJobPriorityChangedEvent() {
|
||||||
|
FakeEvent result = new FakeEvent(EventType.JOB_PRIORITY_CHANGED);
|
||||||
|
JobPriorityChange datum = new JobPriorityChange();
|
||||||
|
datum.jobid = "ID";
|
||||||
|
datum.priority = "priority";
|
||||||
|
result.setDatum(datum);
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
private FakeEvent getJobStatusChangedEvent() {
|
||||||
|
FakeEvent result = new FakeEvent(EventType.JOB_STATUS_CHANGED);
|
||||||
|
JobStatusChanged datum = new JobStatusChanged();
|
||||||
|
datum.jobid = "ID";
|
||||||
|
datum.jobStatus = "newStatus";
|
||||||
|
result.setDatum(datum);
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
private FakeEvent getTaskUpdatedEvent() {
|
||||||
|
FakeEvent result = new FakeEvent(EventType.TASK_UPDATED);
|
||||||
|
TaskUpdated datum = new TaskUpdated();
|
||||||
|
datum.finishTime = 2;
|
||||||
|
datum.taskid = "ID";
|
||||||
|
result.setDatum(datum);
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
private class FakeEvent implements HistoryEvent {
|
||||||
|
private EventType eventType;
|
||||||
|
private Object datum;
|
||||||
|
|
||||||
|
public FakeEvent(EventType eventType) {
|
||||||
|
this.eventType = eventType;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public EventType getEventType() {
|
||||||
|
return eventType;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Object getDatum() {
|
||||||
|
|
||||||
|
return datum;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setDatum(Object datum) {
|
||||||
|
this.datum = datum;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -31,6 +31,7 @@ import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FileContext;
|
import org.apache.hadoop.fs.FileContext;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.mapreduce.Counters;
|
import org.apache.hadoop.mapreduce.Counters;
|
||||||
|
import org.apache.hadoop.mapreduce.JobID;
|
||||||
import org.apache.hadoop.mapreduce.MRJobConfig;
|
import org.apache.hadoop.mapreduce.MRJobConfig;
|
||||||
import org.apache.hadoop.mapreduce.TaskID;
|
import org.apache.hadoop.mapreduce.TaskID;
|
||||||
import org.apache.hadoop.mapreduce.TaskType;
|
import org.apache.hadoop.mapreduce.TaskType;
|
||||||
|
@ -53,7 +54,7 @@ public class TestJobHistoryEventHandler {
|
||||||
private static final Log LOG = LogFactory
|
private static final Log LOG = LogFactory
|
||||||
.getLog(TestJobHistoryEventHandler.class);
|
.getLog(TestJobHistoryEventHandler.class);
|
||||||
|
|
||||||
@Test
|
@Test (timeout=50000)
|
||||||
public void testFirstFlushOnCompletionEvent() throws Exception {
|
public void testFirstFlushOnCompletionEvent() throws Exception {
|
||||||
TestParams t = new TestParams();
|
TestParams t = new TestParams();
|
||||||
Configuration conf = new Configuration();
|
Configuration conf = new Configuration();
|
||||||
|
@ -96,7 +97,7 @@ public class TestJobHistoryEventHandler {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test (timeout=50000)
|
||||||
public void testMaxUnflushedCompletionEvents() throws Exception {
|
public void testMaxUnflushedCompletionEvents() throws Exception {
|
||||||
TestParams t = new TestParams();
|
TestParams t = new TestParams();
|
||||||
Configuration conf = new Configuration();
|
Configuration conf = new Configuration();
|
||||||
|
@ -131,17 +132,17 @@ public class TestJobHistoryEventHandler {
|
||||||
|
|
||||||
handleNextNEvents(jheh, 1);
|
handleNextNEvents(jheh, 1);
|
||||||
verify(mockWriter).flush();
|
verify(mockWriter).flush();
|
||||||
|
|
||||||
handleNextNEvents(jheh, 50);
|
handleNextNEvents(jheh, 50);
|
||||||
verify(mockWriter, times(6)).flush();
|
verify(mockWriter, times(6)).flush();
|
||||||
|
|
||||||
} finally {
|
} finally {
|
||||||
jheh.stop();
|
jheh.stop();
|
||||||
verify(mockWriter).close();
|
verify(mockWriter).close();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test (timeout=50000)
|
||||||
public void testUnflushedTimer() throws Exception {
|
public void testUnflushedTimer() throws Exception {
|
||||||
TestParams t = new TestParams();
|
TestParams t = new TestParams();
|
||||||
Configuration conf = new Configuration();
|
Configuration conf = new Configuration();
|
||||||
|
@ -181,8 +182,8 @@ public class TestJobHistoryEventHandler {
|
||||||
verify(mockWriter).close();
|
verify(mockWriter).close();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test (timeout=50000)
|
||||||
public void testBatchedFlushJobEndMultiplier() throws Exception {
|
public void testBatchedFlushJobEndMultiplier() throws Exception {
|
||||||
TestParams t = new TestParams();
|
TestParams t = new TestParams();
|
||||||
Configuration conf = new Configuration();
|
Configuration conf = new Configuration();
|
||||||
|
@ -265,7 +266,7 @@ public class TestJobHistoryEventHandler {
|
||||||
when(mockContext.getApplicationID()).thenReturn(appId);
|
when(mockContext.getApplicationID()).thenReturn(appId);
|
||||||
return mockContext;
|
return mockContext;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
private class TestParams {
|
private class TestParams {
|
||||||
String workDir = setupTestWorkDir();
|
String workDir = setupTestWorkDir();
|
||||||
|
@ -279,12 +280,8 @@ public class TestJobHistoryEventHandler {
|
||||||
}
|
}
|
||||||
|
|
||||||
private JobHistoryEvent getEventToEnqueue(JobId jobId) {
|
private JobHistoryEvent getEventToEnqueue(JobId jobId) {
|
||||||
JobHistoryEvent toReturn = Mockito.mock(JobHistoryEvent.class);
|
HistoryEvent toReturn = new JobStatusChangedEvent(new JobID(Integer.toString(jobId.getId()), jobId.getId()), "change status");
|
||||||
HistoryEvent he = Mockito.mock(HistoryEvent.class);
|
return new JobHistoryEvent(jobId, toReturn);
|
||||||
Mockito.when(he.getEventType()).thenReturn(EventType.JOB_STATUS_CHANGED);
|
|
||||||
Mockito.when(toReturn.getHistoryEvent()).thenReturn(he);
|
|
||||||
Mockito.when(toReturn.getJobID()).thenReturn(jobId);
|
|
||||||
return toReturn;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -344,8 +341,6 @@ public class TestJobHistoryEventHandler {
|
||||||
class JHEvenHandlerForTest extends JobHistoryEventHandler {
|
class JHEvenHandlerForTest extends JobHistoryEventHandler {
|
||||||
|
|
||||||
private EventWriter eventWriter;
|
private EventWriter eventWriter;
|
||||||
volatile int handleEventCompleteCalls = 0;
|
|
||||||
volatile int handleEventStartedCalls = 0;
|
|
||||||
|
|
||||||
public JHEvenHandlerForTest(AppContext context, int startCount) {
|
public JHEvenHandlerForTest(AppContext context, int startCount) {
|
||||||
super(context, startCount);
|
super(context, startCount);
|
||||||
|
@ -354,7 +349,7 @@ class JHEvenHandlerForTest extends JobHistoryEventHandler {
|
||||||
@Override
|
@Override
|
||||||
public void start() {
|
public void start() {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected EventWriter createEventWriter(Path historyFilePath)
|
protected EventWriter createEventWriter(Path historyFilePath)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
|
@ -365,7 +360,7 @@ class JHEvenHandlerForTest extends JobHistoryEventHandler {
|
||||||
@Override
|
@Override
|
||||||
protected void closeEventWriter(JobId jobId) {
|
protected void closeEventWriter(JobId jobId) {
|
||||||
}
|
}
|
||||||
|
|
||||||
public EventWriter getEventWriter() {
|
public EventWriter getEventWriter() {
|
||||||
return this.eventWriter;
|
return this.eventWriter;
|
||||||
}
|
}
|
||||||
|
@ -375,13 +370,12 @@ class JHEvenHandlerForTest extends JobHistoryEventHandler {
|
||||||
* Class to help with testSigTermedFunctionality
|
* Class to help with testSigTermedFunctionality
|
||||||
*/
|
*/
|
||||||
class JHEventHandlerForSigtermTest extends JobHistoryEventHandler {
|
class JHEventHandlerForSigtermTest extends JobHistoryEventHandler {
|
||||||
private MetaInfo metaInfo;
|
|
||||||
public JHEventHandlerForSigtermTest(AppContext context, int startCount) {
|
public JHEventHandlerForSigtermTest(AppContext context, int startCount) {
|
||||||
super(context, startCount);
|
super(context, startCount);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void addToFileMap(JobId jobId) {
|
public void addToFileMap(JobId jobId) {
|
||||||
metaInfo = Mockito.mock(MetaInfo.class);
|
MetaInfo metaInfo = Mockito.mock(MetaInfo.class);
|
||||||
Mockito.when(metaInfo.isWriterActive()).thenReturn(true);
|
Mockito.when(metaInfo.isWriterActive()).thenReturn(true);
|
||||||
fileMap.put(jobId, metaInfo);
|
fileMap.put(jobId, metaInfo);
|
||||||
}
|
}
|
||||||
|
|
|
@ -79,7 +79,7 @@ public class TestJobHistoryEntities {
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Verify some expected values based on the history file */
|
/* Verify some expected values based on the history file */
|
||||||
@Test
|
@Test (timeout=10000)
|
||||||
public void testCompletedJob() throws Exception {
|
public void testCompletedJob() throws Exception {
|
||||||
HistoryFileInfo info = mock(HistoryFileInfo.class);
|
HistoryFileInfo info = mock(HistoryFileInfo.class);
|
||||||
when(info.getConfFile()).thenReturn(fullConfPath);
|
when(info.getConfFile()).thenReturn(fullConfPath);
|
||||||
|
@ -104,7 +104,7 @@ public class TestJobHistoryEntities {
|
||||||
assertEquals(JobState.SUCCEEDED, jobReport.getJobState());
|
assertEquals(JobState.SUCCEEDED, jobReport.getJobState());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test (timeout=10000)
|
||||||
public void testCompletedTask() throws Exception {
|
public void testCompletedTask() throws Exception {
|
||||||
HistoryFileInfo info = mock(HistoryFileInfo.class);
|
HistoryFileInfo info = mock(HistoryFileInfo.class);
|
||||||
when(info.getConfFile()).thenReturn(fullConfPath);
|
when(info.getConfFile()).thenReturn(fullConfPath);
|
||||||
|
@ -133,7 +133,7 @@ public class TestJobHistoryEntities {
|
||||||
assertEquals(rt1Id, rt1Report.getTaskId());
|
assertEquals(rt1Id, rt1Report.getTaskId());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test (timeout=10000)
|
||||||
public void testCompletedTaskAttempt() throws Exception {
|
public void testCompletedTaskAttempt() throws Exception {
|
||||||
HistoryFileInfo info = mock(HistoryFileInfo.class);
|
HistoryFileInfo info = mock(HistoryFileInfo.class);
|
||||||
when(info.getConfFile()).thenReturn(fullConfPath);
|
when(info.getConfFile()).thenReturn(fullConfPath);
|
||||||
|
|
|
@ -25,7 +25,6 @@ import junit.framework.Assert;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.mapreduce.MRJobConfig;
|
|
||||||
import org.apache.hadoop.mapreduce.TypeConverter;
|
import org.apache.hadoop.mapreduce.TypeConverter;
|
||||||
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
|
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
|
||||||
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEventHandler;
|
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEventHandler;
|
||||||
|
@ -67,8 +66,17 @@ public class TestJobHistoryEvents {
|
||||||
* completed maps
|
* completed maps
|
||||||
*/
|
*/
|
||||||
HistoryContext context = new JobHistory();
|
HistoryContext context = new JobHistory();
|
||||||
|
// test start and stop states
|
||||||
((JobHistory)context).init(conf);
|
((JobHistory)context).init(conf);
|
||||||
Job parsedJob = context.getJob(jobId);
|
((JobHistory)context).start();
|
||||||
|
Assert.assertTrue( context.getStartTime()>0);
|
||||||
|
Assert.assertEquals(((JobHistory)context).getServiceState(),Service.STATE.STARTED);
|
||||||
|
|
||||||
|
|
||||||
|
((JobHistory)context).stop();
|
||||||
|
Assert.assertEquals(((JobHistory)context).getServiceState(),Service.STATE.STOPPED);
|
||||||
|
Job parsedJob = context.getJob(jobId);
|
||||||
|
|
||||||
Assert.assertEquals("CompletedMaps not correct", 2,
|
Assert.assertEquals("CompletedMaps not correct", 2,
|
||||||
parsedJob.getCompletedMaps());
|
parsedJob.getCompletedMaps());
|
||||||
Assert.assertEquals(System.getProperty("user.name"), parsedJob.getUserName());
|
Assert.assertEquals(System.getProperty("user.name"), parsedJob.getUserName());
|
||||||
|
@ -177,9 +185,8 @@ public class TestJobHistoryEvents {
|
||||||
@Override
|
@Override
|
||||||
protected EventHandler<JobHistoryEvent> createJobHistoryHandler(
|
protected EventHandler<JobHistoryEvent> createJobHistoryHandler(
|
||||||
AppContext context) {
|
AppContext context) {
|
||||||
JobHistoryEventHandler eventHandler = new JobHistoryEventHandler(
|
return new JobHistoryEventHandler(
|
||||||
context, getStartCount());
|
context, getStartCount());
|
||||||
return eventHandler;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -18,7 +18,9 @@
|
||||||
|
|
||||||
package org.apache.hadoop.mapreduce.v2.hs;
|
package org.apache.hadoop.mapreduce.v2.hs;
|
||||||
|
|
||||||
|
import java.io.ByteArrayOutputStream;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.io.PrintStream;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
@ -40,6 +42,7 @@ import org.apache.hadoop.mapreduce.TaskID;
|
||||||
import org.apache.hadoop.mapreduce.TypeConverter;
|
import org.apache.hadoop.mapreduce.TypeConverter;
|
||||||
import org.apache.hadoop.mapreduce.jobhistory.EventReader;
|
import org.apache.hadoop.mapreduce.jobhistory.EventReader;
|
||||||
import org.apache.hadoop.mapreduce.jobhistory.HistoryEvent;
|
import org.apache.hadoop.mapreduce.jobhistory.HistoryEvent;
|
||||||
|
import org.apache.hadoop.mapreduce.jobhistory.HistoryViewer;
|
||||||
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser;
|
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser;
|
||||||
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.AMInfo;
|
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.AMInfo;
|
||||||
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo;
|
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo;
|
||||||
|
@ -60,7 +63,6 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
|
||||||
import org.apache.hadoop.mapreduce.v2.hs.HistoryFileManager.HistoryFileInfo;
|
import org.apache.hadoop.mapreduce.v2.hs.HistoryFileManager.HistoryFileInfo;
|
||||||
import org.apache.hadoop.mapreduce.v2.hs.TestJobHistoryEvents.MRAppWithHistory;
|
import org.apache.hadoop.mapreduce.v2.hs.TestJobHistoryEvents.MRAppWithHistory;
|
||||||
import org.apache.hadoop.mapreduce.v2.jobhistory.FileNameIndexUtils;
|
import org.apache.hadoop.mapreduce.v2.jobhistory.FileNameIndexUtils;
|
||||||
import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
|
|
||||||
import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
|
import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
|
||||||
import org.apache.hadoop.mapreduce.v2.jobhistory.JobIndexInfo;
|
import org.apache.hadoop.mapreduce.v2.jobhistory.JobIndexInfo;
|
||||||
import org.apache.hadoop.net.DNSToSwitchMapping;
|
import org.apache.hadoop.net.DNSToSwitchMapping;
|
||||||
|
@ -78,6 +80,8 @@ public class TestJobHistoryParsing {
|
||||||
|
|
||||||
private static final String RACK_NAME = "/MyRackName";
|
private static final String RACK_NAME = "/MyRackName";
|
||||||
|
|
||||||
|
private ByteArrayOutputStream outContent = new ByteArrayOutputStream();
|
||||||
|
|
||||||
public static class MyResolver implements DNSToSwitchMapping {
|
public static class MyResolver implements DNSToSwitchMapping {
|
||||||
@Override
|
@Override
|
||||||
public List<String> resolve(List<String> names) {
|
public List<String> resolve(List<String> names) {
|
||||||
|
@ -89,14 +93,14 @@ public class TestJobHistoryParsing {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test (timeout=50000)
|
||||||
public void testJobInfo() throws Exception {
|
public void testJobInfo() throws Exception {
|
||||||
JobInfo info = new JobInfo();
|
JobInfo info = new JobInfo();
|
||||||
Assert.assertEquals("NORMAL", info.getPriority());
|
Assert.assertEquals("NORMAL", info.getPriority());
|
||||||
info.printAll();
|
info.printAll();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test (timeout=50000)
|
||||||
public void testHistoryParsing() throws Exception {
|
public void testHistoryParsing() throws Exception {
|
||||||
LOG.info("STARTING testHistoryParsing()");
|
LOG.info("STARTING testHistoryParsing()");
|
||||||
try {
|
try {
|
||||||
|
@ -106,7 +110,7 @@ public class TestJobHistoryParsing {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test (timeout=50000)
|
||||||
public void testHistoryParsingWithParseErrors() throws Exception {
|
public void testHistoryParsingWithParseErrors() throws Exception {
|
||||||
LOG.info("STARTING testHistoryParsingWithParseErrors()");
|
LOG.info("STARTING testHistoryParsingWithParseErrors()");
|
||||||
try {
|
try {
|
||||||
|
@ -321,18 +325,37 @@ public class TestJobHistoryParsing {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// test output for HistoryViewer
|
||||||
|
PrintStream stdps=System.out;
|
||||||
|
try {
|
||||||
|
System.setOut(new PrintStream(outContent));
|
||||||
|
HistoryViewer viewer = new HistoryViewer(fc.makeQualified(
|
||||||
|
fileInfo.getHistoryFile()).toString(), conf, true);
|
||||||
|
viewer.print();
|
||||||
|
|
||||||
|
for (TaskInfo taskInfo : allTasks.values()) {
|
||||||
|
|
||||||
|
String test= (taskInfo.getTaskStatus()==null?"":taskInfo.getTaskStatus())+" "+taskInfo.getTaskType()+" task list for "+taskInfo.getTaskId().getJobID();
|
||||||
|
Assert.assertTrue(outContent.toString().indexOf(test)>0);
|
||||||
|
Assert.assertTrue(outContent.toString().indexOf(taskInfo.getTaskId().toString())>0);
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
System.setOut(stdps);
|
||||||
|
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Computes finished maps similar to RecoveryService...
|
// Computes finished maps similar to RecoveryService...
|
||||||
private long computeFinishedMaps(JobInfo jobInfo,
|
private long computeFinishedMaps(JobInfo jobInfo, int numMaps,
|
||||||
int numMaps, int numSuccessfulMaps) {
|
int numSuccessfulMaps) {
|
||||||
if (numMaps == numSuccessfulMaps) {
|
if (numMaps == numSuccessfulMaps) {
|
||||||
return jobInfo.getFinishedMaps();
|
return jobInfo.getFinishedMaps();
|
||||||
}
|
}
|
||||||
|
|
||||||
long numFinishedMaps = 0;
|
long numFinishedMaps = 0;
|
||||||
Map<org.apache.hadoop.mapreduce.TaskID, TaskInfo> taskInfos =
|
Map<org.apache.hadoop.mapreduce.TaskID, TaskInfo> taskInfos = jobInfo
|
||||||
jobInfo.getAllTasks();
|
.getAllTasks();
|
||||||
for (TaskInfo taskInfo : taskInfos.values()) {
|
for (TaskInfo taskInfo : taskInfos.values()) {
|
||||||
if (TaskState.SUCCEEDED.toString().equals(taskInfo.getTaskStatus())) {
|
if (TaskState.SUCCEEDED.toString().equals(taskInfo.getTaskStatus())) {
|
||||||
++numFinishedMaps;
|
++numFinishedMaps;
|
||||||
|
@ -341,7 +364,7 @@ public class TestJobHistoryParsing {
|
||||||
return numFinishedMaps;
|
return numFinishedMaps;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test (timeout=50000)
|
||||||
public void testHistoryParsingForFailedAttempts() throws Exception {
|
public void testHistoryParsingForFailedAttempts() throws Exception {
|
||||||
LOG.info("STARTING testHistoryParsingForFailedAttempts");
|
LOG.info("STARTING testHistoryParsingForFailedAttempts");
|
||||||
try {
|
try {
|
||||||
|
@ -468,7 +491,7 @@ public class TestJobHistoryParsing {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test (timeout=50000)
|
||||||
public void testScanningOldDirs() throws Exception {
|
public void testScanningOldDirs() throws Exception {
|
||||||
LOG.info("STARTING testScanningOldDirs");
|
LOG.info("STARTING testScanningOldDirs");
|
||||||
try {
|
try {
|
||||||
|
|
Loading…
Reference in New Issue