merge -r 1379598:1379599 from trunk. FIXES: MAPREDUCE-4611

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1379602 13f79535-47bb-0310-9956-ffa450edef68
Thomas Graves 2012-08-31 20:45:34 +00:00
parent 8ef7e47241
commit c54a74a2cc
6 changed files with 143 additions and 34 deletions
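For orientation, a minimal standalone sketch of the decision this change introduces (illustrative names only, not the patched Hadoop classes): the MR AM now tracks whether the current attempt is the last retry, and only unregisters from the RM and deletes its staging directory in that case, so a later AM attempt can recover after a kill such as a node decommission.

// Illustrative sketch only; class and method names are not from the patch.
public class LastRetryShutdownSketch {

  // Mirrors the check added to MRAppMaster.init(): the attempt is the last
  // retry once its id reaches yarn.resourcemanager.am.max-retries.
  static boolean isLastAMRetry(int attemptId, int maxAMRetries) {
    return attemptId >= maxAMRetries;
  }

  // Mirrors the guarded shutdown paths (RMCommunicator.stop() and the
  // staging-dir cleanup): unregister and clean up only on the last retry,
  // otherwise leave state behind so the next AM attempt can take over.
  static void shutdown(boolean lastRetry) {
    if (lastRetry) {
      System.out.println("unregister from RM and delete staging dir");
    } else {
      System.out.println("skip unregister and cleanup; AM will be retried");
    }
  }

  public static void main(String[] args) {
    shutdown(isLastAMRetry(1, 4)); // intermediate attempt: staging dir is kept
    shutdown(isLastAMRetry(4, 4)); // final attempt: safe to clean up
  }
}

The diffs below add exactly this gate in MRAppMaster, RMCommunicator, and JobHistoryEventHandler, plus tests covering both branches.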


@@ -734,6 +734,9 @@ Release 0.23.3 - UNRELEASED
     MAPREDUCE-4614. Simplify debugging a job's tokens (daryn via bobby)
 
+    MAPREDUCE-4611. MR AM dies badly when Node is decommissioned (Robert
+    Evans via tgraves)
+
 Release 0.23.2 - UNRELEASED
 
   INCOMPATIBLE CHANGES


@@ -99,8 +99,8 @@ public class JobHistoryEventHandler extends AbstractService
   protected static final Map<JobId, MetaInfo> fileMap =
     Collections.<JobId,MetaInfo>synchronizedMap(new HashMap<JobId,MetaInfo>());
 
-  // Has a signal (SIGTERM etc) been issued?
-  protected volatile boolean isSignalled = false;
+  // should job completion be force when the AM shuts down?
+  protected volatile boolean forceJobCompletion = false;
 
   public JobHistoryEventHandler(AppContext context, int startCount) {
     super("JobHistoryEventHandler");
@@ -322,7 +322,7 @@ public class JobHistoryEventHandler extends AbstractService
       // Process JobUnsuccessfulCompletionEvent for jobIds which still haven't
       // closed their event writers
       Iterator<JobId> jobIt = fileMap.keySet().iterator();
-      if(isSignalled) {
+      if(forceJobCompletion) {
         while (jobIt.hasNext()) {
           JobId toClose = jobIt.next();
           MetaInfo mi = fileMap.get(toClose);
@@ -911,9 +911,9 @@ public class JobHistoryEventHandler extends AbstractService
     return tmpFileName.substring(0, tmpFileName.length()-4);
   }
 
-  public void setSignalled(boolean isSignalled) {
-    this.isSignalled = isSignalled;
-    LOG.info("JobHistoryEventHandler notified that isSignalled was "
-        + isSignalled);
+  public void setForcejobCompletion(boolean forceJobCompletion) {
+    this.forceJobCompletion = forceJobCompletion;
+    LOG.info("JobHistoryEventHandler notified that forceJobCompletion is "
+        + forceJobCompletion);
   }
 }


@@ -170,6 +170,8 @@ public class MRAppMaster extends CompositeService {
   private Credentials fsTokens = new Credentials(); // Filled during init
   private UserGroupInformation currentUser; // Will be setup during init
 
+  private volatile boolean isLastAMRetry = false;
+
   public MRAppMaster(ApplicationAttemptId applicationAttemptId,
       ContainerId containerId, String nmHost, int nmPort, int nmHttpPort,
       long appSubmitTime) {
@@ -195,11 +197,21 @@ public class MRAppMaster extends CompositeService {
   @Override
   public void init(final Configuration conf) {
     conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, true);
     downloadTokensAndSetupUGI(conf);
 
+    //TODO this is a hack, we really need the RM to inform us when we
+    // are the last one. This would allow us to configure retries on
+    // a per application basis.
+    int numAMRetries = conf.getInt(YarnConfiguration.RM_AM_MAX_RETRIES,
+        YarnConfiguration.DEFAULT_RM_AM_MAX_RETRIES);
+    isLastAMRetry = appAttemptID.getAttemptId() >= numAMRetries;
+    LOG.info("AM Retries: " + numAMRetries +
+        " attempt num: " + appAttemptID.getAttemptId() +
+        " is last retry: " + isLastAMRetry);
+
     context = new RunningAppContext(conf);
 
     // Job name is the same as the app name util we support DAG of jobs
@@ -417,6 +429,8 @@ public class MRAppMaster extends CompositeService {
     }
 
     try {
+      //We are finishing cleanly so this is the last retry
+      isLastAMRetry = true;
       // Stop all services
       // This will also send the final report to the ResourceManager
       LOG.info("Calling stop for all the services");
@@ -666,7 +680,11 @@ public class MRAppMaster extends CompositeService {
     }
 
     public void setSignalled(boolean isSignalled) {
-      ((RMCommunicator) containerAllocator).setSignalled(true);
+      ((RMCommunicator) containerAllocator).setSignalled(isSignalled);
+    }
+
+    public void setShouldUnregister(boolean shouldUnregister) {
+      ((RMCommunicator) containerAllocator).setShouldUnregister(shouldUnregister);
     }
   }
@@ -717,7 +735,12 @@ public class MRAppMaster extends CompositeService {
     @Override
    public synchronized void stop() {
      try {
-       cleanupStagingDir();
+       if(isLastAMRetry) {
+         cleanupStagingDir();
+       } else {
+         LOG.info("Skipping cleaning up the staging dir. "
+             + "assuming AM will be retried.");
+       }
      } catch (IOException io) {
        LOG.error("Failed to cleanup staging dir: ", io);
      }
@@ -1016,14 +1039,19 @@ public class MRAppMaster extends CompositeService {
     public void run() {
       LOG.info("MRAppMaster received a signal. Signaling RMCommunicator and "
         + "JobHistoryEventHandler.");
       // Notify the JHEH and RMCommunicator that a SIGTERM has been received so
       // that they don't take too long in shutting down
       if(appMaster.containerAllocator instanceof ContainerAllocatorRouter) {
         ((ContainerAllocatorRouter) appMaster.containerAllocator)
           .setSignalled(true);
+        ((ContainerAllocatorRouter) appMaster.containerAllocator)
+          .setShouldUnregister(appMaster.isLastAMRetry);
       }
       if(appMaster.jobHistoryEventHandler != null) {
-        appMaster.jobHistoryEventHandler.setSignalled(true);
+        appMaster.jobHistoryEventHandler
+          .setForcejobCompletion(appMaster.isLastAMRetry);
       }
       appMaster.stop();
     }


@@ -84,6 +84,7 @@ public abstract class RMCommunicator extends AbstractService {
   private Job job;
   // Has a signal (SIGTERM etc) been issued?
   protected volatile boolean isSignalled = false;
+  private volatile boolean shouldUnregister = true;
 
   public RMCommunicator(ClientService clientService, AppContext context) {
     super("RMCommunicator");
@@ -213,7 +214,9 @@ public abstract class RMCommunicator extends AbstractService {
     } catch (InterruptedException ie) {
       LOG.warn("InterruptedException while stopping", ie);
     }
-    unregister();
+    if(shouldUnregister) {
+      unregister();
+    }
     super.stop();
   }
@@ -288,8 +291,15 @@ public abstract class RMCommunicator extends AbstractService {
 
   protected abstract void heartbeat() throws Exception;
 
+  public void setShouldUnregister(boolean shouldUnregister) {
+    this.shouldUnregister = shouldUnregister;
+    LOG.info("RMCommunicator notified that shouldUnregistered is: "
+        + shouldUnregister);
+  }
+
   public void setSignalled(boolean isSignalled) {
     this.isSignalled = isSignalled;
-    LOG.info("RMCommunicator notified that iSignalled was : " + isSignalled);
+    LOG.info("RMCommunicator notified that iSignalled is: "
+        + isSignalled);
   }
 }


@@ -330,7 +330,7 @@ public class TestJobHistoryEventHandler {
     Mockito.when(jobId.getAppId()).thenReturn(mockAppId);
 
     jheh.addToFileMap(jobId);
-    jheh.setSignalled(true);
+    jheh.setForcejobCompletion(true);
     for(int i=0; i < numEvents; ++i) {
       events[i] = getEventToEnqueue(jobId);
       jheh.handle(events[i]);


@@ -23,6 +23,7 @@ import static org.mockito.Matchers.anyBoolean;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.times;
 
 import java.io.IOException;
@@ -47,6 +48,7 @@ import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.YarnException;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
@@ -89,28 +91,94 @@ import org.junit.Test;
     handler.handle(new JobFinishEvent(jobid));
     verify(fs).delete(stagingJobPath, true);
   }
 
+  @Test
+  public void testDeletionofStagingOnKill() throws IOException {
+    conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, stagingJobDir);
+    conf.setInt(YarnConfiguration.RM_AM_MAX_RETRIES, 4);
+    fs = mock(FileSystem.class);
+    when(fs.delete(any(Path.class), anyBoolean())).thenReturn(true);
+    ApplicationAttemptId attemptId = recordFactory.newRecordInstance(
+        ApplicationAttemptId.class);
+    attemptId.setAttemptId(0);
+    ApplicationId appId = recordFactory.newRecordInstance(ApplicationId.class);
+    appId.setClusterTimestamp(System.currentTimeMillis());
+    appId.setId(0);
+    attemptId.setApplicationId(appId);
+    JobId jobid = recordFactory.newRecordInstance(JobId.class);
+    jobid.setAppId(appId);
+    ContainerAllocator mockAlloc = mock(ContainerAllocator.class);
+    MRAppMaster appMaster = new TestMRApp(attemptId, mockAlloc);
+    appMaster.init(conf);
+    //simulate the process being killed
+    MRAppMaster.MRAppMasterShutdownHook hook =
+        new MRAppMaster.MRAppMasterShutdownHook(appMaster);
+    hook.run();
+    verify(fs, times(0)).delete(stagingJobPath, true);
+  }
+
+  @Test
+  public void testDeletionofStagingOnKillLastTry() throws IOException {
+    conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, stagingJobDir);
+    conf.setInt(YarnConfiguration.RM_AM_MAX_RETRIES, 1);
+    fs = mock(FileSystem.class);
+    when(fs.delete(any(Path.class), anyBoolean())).thenReturn(true);
+    ApplicationAttemptId attemptId = recordFactory.newRecordInstance(
+        ApplicationAttemptId.class);
+    attemptId.setAttemptId(1);
+    ApplicationId appId = recordFactory.newRecordInstance(ApplicationId.class);
+    appId.setClusterTimestamp(System.currentTimeMillis());
+    appId.setId(0);
+    attemptId.setApplicationId(appId);
+    JobId jobid = recordFactory.newRecordInstance(JobId.class);
+    jobid.setAppId(appId);
+    ContainerAllocator mockAlloc = mock(ContainerAllocator.class);
+    MRAppMaster appMaster = new TestMRApp(attemptId, mockAlloc);
+    appMaster.init(conf);
+    //simulate the process being killed
+    MRAppMaster.MRAppMasterShutdownHook hook =
+        new MRAppMaster.MRAppMasterShutdownHook(appMaster);
+    hook.run();
+    verify(fs).delete(stagingJobPath, true);
+  }
+
   private class TestMRApp extends MRAppMaster {
+    ContainerAllocator allocator;
 
-    public TestMRApp(ApplicationAttemptId applicationAttemptId) {
-      super(applicationAttemptId, BuilderUtils.newContainerId(
-          applicationAttemptId, 1), "testhost", 2222, 3333, System
-          .currentTimeMillis());
-    }
-
-    @Override
-    protected FileSystem getFileSystem(Configuration conf) {
-      return fs;
-    }
-
-    @Override
-    protected void sysexit() {
-    }
-
-    @Override
-    public Configuration getConfig() {
-      return conf;
-    }
+    public TestMRApp(ApplicationAttemptId applicationAttemptId,
+        ContainerAllocator allocator) {
+      super(applicationAttemptId, BuilderUtils.newContainerId(
+          applicationAttemptId, 1), "testhost", 2222, 3333, System
+          .currentTimeMillis());
+      this.allocator = allocator;
+    }
+
+    public TestMRApp(ApplicationAttemptId applicationAttemptId) {
+      this(applicationAttemptId, null);
+    }
+
+    @Override
+    protected FileSystem getFileSystem(Configuration conf) {
+      return fs;
+    }
+
+    @Override
+    protected ContainerAllocator createContainerAllocator(
+        final ClientService clientService, final AppContext context) {
+      if(allocator == null) {
+        return super.createContainerAllocator(clientService, context);
+      }
+      return allocator;
+    }
+
+    @Override
+    protected void sysexit() {
+    }
+
+    @Override
+    public Configuration getConfig() {
+      return conf;
+    }
   }
 
   private final class MRAppTestCleanup extends MRApp {