MAPREDUCE-5466. Changed MR AM to not promote history files of intermediate AMs in case they are exiting because of errors and thus help history-server pick up the right history file for the last successful AM. Contributed by Jian He.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1516238 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2013-08-21 18:16:47 +00:00
parent 2499a86664
commit 2d614a916c
13 changed files with 190 additions and 6 deletions

View File

@ -226,6 +226,11 @@ Release 2.1.1-beta - UNRELEASED
MAPREDUCE-5001. LocalJobRunner has race condition resulting in job
failures (Sandy Ryza via jlowe)
MAPREDUCE-5466. Changed MR AM to not promote history files of intermediate
AMs in case they are exiting because of errors and thus help history-server
pick up the right history file for the last successful AM. (Jian He via
vinodkv)
Release 2.1.0-beta - 2013-08-22
INCOMPATIBLE CHANGES

View File

@ -532,6 +532,24 @@ protected void handleEvent(JobHistoryEvent event) {
jFinishedEvent.getFinishedReduces());
mi.getJobIndexInfo().setJobStatus(JobState.SUCCEEDED.toString());
closeEventWriter(event.getJobID());
processDoneFiles(event.getJobID());
} catch (IOException e) {
throw new YarnRuntimeException(e);
}
}
// In case of JOB_ERROR, only process all the Done files(e.g. job
// summary, job history file etc.) if it is last AM retry.
if (event.getHistoryEvent().getEventType() == EventType.JOB_ERROR) {
try {
JobUnsuccessfulCompletionEvent jucEvent =
(JobUnsuccessfulCompletionEvent) event.getHistoryEvent();
mi.getJobIndexInfo().setFinishTime(jucEvent.getFinishTime());
mi.getJobIndexInfo().setNumMaps(jucEvent.getFinishedMaps());
mi.getJobIndexInfo().setNumReduces(jucEvent.getFinishedReduces());
mi.getJobIndexInfo().setJobStatus(jucEvent.getStatus());
closeEventWriter(event.getJobID());
if(context.isLastAMRetry())
processDoneFiles(event.getJobID());
} catch (IOException e) {
throw new YarnRuntimeException(e);
}
@ -548,6 +566,7 @@ protected void handleEvent(JobHistoryEvent event) {
mi.getJobIndexInfo().setNumReduces(jucEvent.getFinishedReduces());
mi.getJobIndexInfo().setJobStatus(jucEvent.getStatus());
closeEventWriter(event.getJobID());
processDoneFiles(event.getJobID());
} catch (IOException e) {
throw new YarnRuntimeException(e);
}
@ -634,7 +653,6 @@ private void setSummarySlotSeconds(JobSummary summary, Counters allCounters) {
}
protected void closeEventWriter(JobId jobId) throws IOException {
final MetaInfo mi = fileMap.get(jobId);
if (mi == null) {
throw new IOException("No MetaInfo found for JobId: [" + jobId + "]");
@ -654,6 +672,14 @@ protected void closeEventWriter(JobId jobId) throws IOException {
LOG.error("Error closing writer for JobID: " + jobId);
throw e;
}
}
protected void processDoneFiles(JobId jobId) throws IOException {
final MetaInfo mi = fileMap.get(jobId);
if (mi == null) {
throw new IOException("No MetaInfo found for JobId: [" + jobId + "]");
}
if (mi.getHistoryFile() == null) {
LOG.warn("No file for job-history with " + jobId + " found in cache!");

View File

@ -61,4 +61,6 @@ public interface AppContext {
Set<String> getBlacklistedNodes();
ClientToAMTokenSecretManager getClientToAMTokenSecretManager();
boolean isLastAMRetry();
}

View File

@ -952,6 +952,11 @@ public Set<String> getBlacklistedNodes() {
public ClientToAMTokenSecretManager getClientToAMTokenSecretManager() {
return clientToAMTokenSecretManager;
}
@Override
public boolean isLastAMRetry(){
return isLastAMRetry;
}
}
@SuppressWarnings("unchecked")

View File

@ -36,9 +36,12 @@
import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl;
import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
@ -223,6 +226,7 @@ protected void serviceStop() throws Exception {
protected void startAllocatorThread() {
allocatorThread = new Thread(new Runnable() {
@SuppressWarnings("unchecked")
@Override
public void run() {
while (!stopped.get() && !Thread.currentThread().isInterrupted()) {
@ -233,6 +237,15 @@ public void run() {
} catch (YarnRuntimeException e) {
LOG.error("Error communicating with RM: " + e.getMessage() , e);
return;
} catch (InvalidToken e) {
// This can happen if the RM has been restarted, since currently
// when RM restarts AMRMToken is not populated back to
// AMRMTokenSecretManager yet. Once this is fixed, no need
// to send JOB_AM_REBOOT event in this method any more.
eventHandler.handle(new JobEvent(job.getID(),
JobEventType.JOB_AM_REBOOT));
LOG.error("Error in authencating with RM: " ,e);
return;
} catch (Exception e) {
LOG.error("ERROR IN CONTACTING RM. ", e);
continue;

View File

@ -25,10 +25,13 @@
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.never;
import java.io.File;
import java.io.IOException;
import junit.framework.Assert;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@ -43,6 +46,7 @@
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal;
import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@ -229,6 +233,98 @@ public void testBatchedFlushJobEndMultiplier() throws Exception {
}
}
// In case of all types of events, process Done files if it's last AM retry
@Test (timeout=50000)
public void testProcessDoneFilesOnLastAMRetry() throws Exception {
TestParams t = new TestParams(true);
Configuration conf = new Configuration();
JHEvenHandlerForTest realJheh =
new JHEvenHandlerForTest(t.mockAppContext, 0);
JHEvenHandlerForTest jheh = spy(realJheh);
jheh.init(conf);
EventWriter mockWriter = null;
try {
jheh.start();
handleEvent(jheh, new JobHistoryEvent(t.jobId, new AMStartedEvent(
t.appAttemptId, 200, t.containerId, "nmhost", 3000, 4000)));
verify(jheh, times(0)).processDoneFiles(any(JobId.class));
handleEvent(jheh, new JobHistoryEvent(t.jobId,
new JobUnsuccessfulCompletionEvent(TypeConverter.fromYarn(t.jobId), 0,
0, 0, JobStateInternal.ERROR.toString())));
verify(jheh, times(1)).processDoneFiles(any(JobId.class));
handleEvent(jheh, new JobHistoryEvent(t.jobId, new JobFinishedEvent(
TypeConverter.fromYarn(t.jobId), 0, 0, 0, 0, 0, new Counters(),
new Counters(), new Counters())));
verify(jheh, times(2)).processDoneFiles(any(JobId.class));
handleEvent(jheh, new JobHistoryEvent(t.jobId,
new JobUnsuccessfulCompletionEvent(TypeConverter.fromYarn(t.jobId), 0,
0, 0, JobStateInternal.FAILED.toString())));
verify(jheh, times(3)).processDoneFiles(any(JobId.class));
handleEvent(jheh, new JobHistoryEvent(t.jobId,
new JobUnsuccessfulCompletionEvent(TypeConverter.fromYarn(t.jobId), 0,
0, 0, JobStateInternal.KILLED.toString())));
verify(jheh, times(4)).processDoneFiles(any(JobId.class));
mockWriter = jheh.getEventWriter();
verify(mockWriter, times(5)).write(any(HistoryEvent.class));
} finally {
jheh.stop();
verify(mockWriter).close();
}
}
// Skip processing Done files in case of ERROR, if it's not last AM retry
@Test (timeout=50000)
public void testProcessDoneFilesNotLastAMRetry() throws Exception {
TestParams t = new TestParams(false);
Configuration conf = new Configuration();
JHEvenHandlerForTest realJheh =
new JHEvenHandlerForTest(t.mockAppContext, 0);
JHEvenHandlerForTest jheh = spy(realJheh);
jheh.init(conf);
EventWriter mockWriter = null;
try {
jheh.start();
handleEvent(jheh, new JobHistoryEvent(t.jobId, new AMStartedEvent(
t.appAttemptId, 200, t.containerId, "nmhost", 3000, 4000)));
verify(jheh, times(0)).processDoneFiles(t.jobId);
// skip processing done files
handleEvent(jheh, new JobHistoryEvent(t.jobId,
new JobUnsuccessfulCompletionEvent(TypeConverter.fromYarn(t.jobId), 0,
0, 0, JobStateInternal.ERROR.toString())));
verify(jheh, times(0)).processDoneFiles(t.jobId);
handleEvent(jheh, new JobHistoryEvent(t.jobId, new JobFinishedEvent(
TypeConverter.fromYarn(t.jobId), 0, 0, 0, 0, 0, new Counters(),
new Counters(), new Counters())));
verify(jheh, times(1)).processDoneFiles(t.jobId);
handleEvent(jheh, new JobHistoryEvent(t.jobId,
new JobUnsuccessfulCompletionEvent(TypeConverter.fromYarn(t.jobId), 0,
0, 0, JobStateInternal.FAILED.toString())));
verify(jheh, times(2)).processDoneFiles(t.jobId);
handleEvent(jheh, new JobHistoryEvent(t.jobId,
new JobUnsuccessfulCompletionEvent(TypeConverter.fromYarn(t.jobId), 0,
0, 0, JobStateInternal.KILLED.toString())));
verify(jheh, times(3)).processDoneFiles(t.jobId);
mockWriter = jheh.getEventWriter();
verify(mockWriter, times(5)).write(any(HistoryEvent.class));
} finally {
jheh.stop();
verify(mockWriter).close();
}
}
private void queueEvent(JHEvenHandlerForTest jheh, JobHistoryEvent event) {
jheh.handle(event);
}
@ -258,20 +354,23 @@ private String setupTestWorkDir() {
}
}
private AppContext mockAppContext(ApplicationId appId) {
private AppContext mockAppContext(ApplicationId appId, boolean isLastAMRetry) {
JobId jobId = TypeConverter.toYarn(TypeConverter.fromYarn(appId));
AppContext mockContext = mock(AppContext.class);
Job mockJob = mock(Job.class);
when(mockJob.getAllCounters()).thenReturn(new Counters());
when(mockJob.getTotalMaps()).thenReturn(10);
when(mockJob.getTotalReduces()).thenReturn(10);
when(mockJob.getName()).thenReturn("mockjob");
when(mockContext.getJob(jobId)).thenReturn(mockJob);
when(mockContext.getApplicationID()).thenReturn(appId);
when(mockContext.isLastAMRetry()).thenReturn(isLastAMRetry);
return mockContext;
}
private class TestParams {
boolean isLastAMRetry;
String workDir = setupTestWorkDir();
ApplicationId appId = ApplicationId.newInstance(200, 1);
ApplicationAttemptId appAttemptId =
@ -279,7 +378,15 @@ private class TestParams {
ContainerId containerId = ContainerId.newInstance(appAttemptId, 1);
TaskID taskID = TaskID.forName("task_200707121733_0003_m_000005");
JobId jobId = MRBuilderUtils.newJobId(appId, 1);
AppContext mockAppContext = mockAppContext(appId);
AppContext mockAppContext;
public TestParams() {
this(false);
}
public TestParams(boolean isLastAMRetry) {
this.isLastAMRetry = isLastAMRetry;
mockAppContext = mockAppContext(appId, this.isLastAMRetry);
}
}
private JobHistoryEvent getEventToEnqueue(JobId jobId) {
@ -344,7 +451,6 @@ public void testSigTermedFunctionality() throws IOException {
class JHEvenHandlerForTest extends JobHistoryEventHandler {
private EventWriter eventWriter;
public JHEvenHandlerForTest(AppContext context, int startCount) {
super(context, startCount);
}
@ -367,6 +473,11 @@ protected void closeEventWriter(JobId jobId) {
public EventWriter getEventWriter() {
return this.eventWriter;
}
@Override
protected void processDoneFiles(JobId jobId){
// do nothing
}
}
/**

View File

@ -130,4 +130,9 @@ public ClientToAMTokenSecretManager getClientToAMTokenSecretManager() {
// Not implemented
return null;
}
@Override
public boolean isLastAMRetry() {
return false;
}
}

View File

@ -862,5 +862,10 @@ public Set<String> getBlacklistedNodes() {
public ClientToAMTokenSecretManager getClientToAMTokenSecretManager() {
return null;
}
@Override
public boolean isLastAMRetry() {
return false;
}
}
}

View File

@ -269,6 +269,7 @@
"JOB_STATUS_CHANGED",
"JOB_FAILED",
"JOB_KILLED",
"JOB_ERROR",
"JOB_INFO_CHANGED",
"TASK_STARTED",
"TASK_FINISHED",

View File

@ -104,6 +104,8 @@ public HistoryEvent getNextEvent() throws IOException {
result = new JobUnsuccessfulCompletionEvent(); break;
case JOB_KILLED:
result = new JobUnsuccessfulCompletionEvent(); break;
case JOB_ERROR:
result = new JobUnsuccessfulCompletionEvent(); break;
case JOB_INFO_CHANGED:
result = new JobInfoChangeEvent(); break;
case TASK_STARTED:

View File

@ -185,6 +185,7 @@ public void handleEvent(HistoryEvent event) {
break;
case JOB_FAILED:
case JOB_KILLED:
case JOB_ERROR:
handleJobFailedEvent((JobUnsuccessfulCompletionEvent) event);
break;
case JOB_FINISHED:

View File

@ -72,6 +72,8 @@ public void setDatum(Object datum) {
public EventType getEventType() {
if ("FAILED".equals(getStatus())) {
return EventType.JOB_FAILED;
} else if ("ERROR".equals(getStatus())) {
return EventType.JOB_ERROR;
} else
return EventType.JOB_KILLED;
}

View File

@ -381,4 +381,10 @@ public ClientToAMTokenSecretManager getClientToAMTokenSecretManager() {
// Not implemented.
return null;
}
@Override
public boolean isLastAMRetry() {
// bogus - Not Required
return false;
}
}