MAPREDUCE-4611. MR AM dies badly when Node is decommissioned (Robert Evans via tgraves)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1379599 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Thomas Graves 2012-08-31 20:43:46 +00:00
parent 63f941d2ad
commit 25e96e455b
6 changed files with 143 additions and 34 deletions

View File

@ -858,6 +858,9 @@ Release 0.23.3 - UNRELEASED
MAPREDUCE-4614. Simplify debugging a job's tokens (daryn via bobby)
MAPREDUCE-4611. MR AM dies badly when Node is decommissioned (Robert
Evans via tgraves)
Release 0.23.2 - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -99,8 +99,8 @@ public class JobHistoryEventHandler extends AbstractService
protected static final Map<JobId, MetaInfo> fileMap =
Collections.<JobId,MetaInfo>synchronizedMap(new HashMap<JobId,MetaInfo>());
// Has a signal (SIGTERM etc) been issued?
protected volatile boolean isSignalled = false;
// should job completion be force when the AM shuts down?
protected volatile boolean forceJobCompletion = false;
public JobHistoryEventHandler(AppContext context, int startCount) {
super("JobHistoryEventHandler");
@ -322,7 +322,7 @@ public class JobHistoryEventHandler extends AbstractService
// Process JobUnsuccessfulCompletionEvent for jobIds which still haven't
// closed their event writers
Iterator<JobId> jobIt = fileMap.keySet().iterator();
if(isSignalled) {
if(forceJobCompletion) {
while (jobIt.hasNext()) {
JobId toClose = jobIt.next();
MetaInfo mi = fileMap.get(toClose);
@ -911,9 +911,9 @@ public class JobHistoryEventHandler extends AbstractService
return tmpFileName.substring(0, tmpFileName.length()-4);
}
public void setSignalled(boolean isSignalled) {
this.isSignalled = isSignalled;
LOG.info("JobHistoryEventHandler notified that isSignalled was "
+ isSignalled);
public void setForcejobCompletion(boolean forceJobCompletion) {
this.forceJobCompletion = forceJobCompletion;
LOG.info("JobHistoryEventHandler notified that forceJobCompletion is "
+ forceJobCompletion);
}
}

View File

@ -170,6 +170,8 @@ public class MRAppMaster extends CompositeService {
private Credentials fsTokens = new Credentials(); // Filled during init
private UserGroupInformation currentUser; // Will be setup during init
private volatile boolean isLastAMRetry = false;
public MRAppMaster(ApplicationAttemptId applicationAttemptId,
ContainerId containerId, String nmHost, int nmPort, int nmHttpPort,
long appSubmitTime) {
@ -195,11 +197,21 @@ public class MRAppMaster extends CompositeService {
@Override
public void init(final Configuration conf) {
conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, true);
downloadTokensAndSetupUGI(conf);
//TODO this is a hack, we really need the RM to inform us when we
// are the last one. This would allow us to configure retries on
// a per application basis.
int numAMRetries = conf.getInt(YarnConfiguration.RM_AM_MAX_RETRIES,
YarnConfiguration.DEFAULT_RM_AM_MAX_RETRIES);
isLastAMRetry = appAttemptID.getAttemptId() >= numAMRetries;
LOG.info("AM Retries: " + numAMRetries +
" attempt num: " + appAttemptID.getAttemptId() +
" is last retry: " + isLastAMRetry);
context = new RunningAppContext(conf);
// Job name is the same as the app name util we support DAG of jobs
@ -417,6 +429,8 @@ public class MRAppMaster extends CompositeService {
}
try {
//We are finishing cleanly so this is the last retry
isLastAMRetry = true;
// Stop all services
// This will also send the final report to the ResourceManager
LOG.info("Calling stop for all the services");
@ -666,7 +680,11 @@ public class MRAppMaster extends CompositeService {
}
public void setSignalled(boolean isSignalled) {
((RMCommunicator) containerAllocator).setSignalled(true);
((RMCommunicator) containerAllocator).setSignalled(isSignalled);
}
public void setShouldUnregister(boolean shouldUnregister) {
((RMCommunicator) containerAllocator).setShouldUnregister(shouldUnregister);
}
}
@ -717,7 +735,12 @@ public class MRAppMaster extends CompositeService {
@Override
public synchronized void stop() {
try {
if(isLastAMRetry) {
cleanupStagingDir();
} else {
LOG.info("Skipping cleaning up the staging dir. "
+ "assuming AM will be retried.");
}
} catch (IOException io) {
LOG.error("Failed to cleanup staging dir: ", io);
}
@ -1016,14 +1039,19 @@ public class MRAppMaster extends CompositeService {
public void run() {
LOG.info("MRAppMaster received a signal. Signaling RMCommunicator and "
+ "JobHistoryEventHandler.");
// Notify the JHEH and RMCommunicator that a SIGTERM has been received so
// that they don't take too long in shutting down
if(appMaster.containerAllocator instanceof ContainerAllocatorRouter) {
((ContainerAllocatorRouter) appMaster.containerAllocator)
.setSignalled(true);
((ContainerAllocatorRouter) appMaster.containerAllocator)
.setShouldUnregister(appMaster.isLastAMRetry);
}
if(appMaster.jobHistoryEventHandler != null) {
appMaster.jobHistoryEventHandler.setSignalled(true);
appMaster.jobHistoryEventHandler
.setForcejobCompletion(appMaster.isLastAMRetry);
}
appMaster.stop();
}

View File

@ -84,6 +84,7 @@ public abstract class RMCommunicator extends AbstractService {
private Job job;
// Has a signal (SIGTERM etc) been issued?
protected volatile boolean isSignalled = false;
private volatile boolean shouldUnregister = true;
public RMCommunicator(ClientService clientService, AppContext context) {
super("RMCommunicator");
@ -213,7 +214,9 @@ public abstract class RMCommunicator extends AbstractService {
} catch (InterruptedException ie) {
LOG.warn("InterruptedException while stopping", ie);
}
if(shouldUnregister) {
unregister();
}
super.stop();
}
@ -288,8 +291,15 @@ public abstract class RMCommunicator extends AbstractService {
protected abstract void heartbeat() throws Exception;
public void setShouldUnregister(boolean shouldUnregister) {
this.shouldUnregister = shouldUnregister;
LOG.info("RMCommunicator notified that shouldUnregistered is: "
+ shouldUnregister);
}
public void setSignalled(boolean isSignalled) {
this.isSignalled = isSignalled;
LOG.info("RMCommunicator notified that iSignalled was : " + isSignalled);
LOG.info("RMCommunicator notified that iSignalled is: "
+ isSignalled);
}
}

View File

@ -330,7 +330,7 @@ public class TestJobHistoryEventHandler {
Mockito.when(jobId.getAppId()).thenReturn(mockAppId);
jheh.addToFileMap(jobId);
jheh.setSignalled(true);
jheh.setForcejobCompletion(true);
for(int i=0; i < numEvents; ++i) {
events[i] = getEventToEnqueue(jobId);
jheh.handle(events[i]);

View File

@ -23,6 +23,7 @@ import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.times;
import java.io.IOException;
@ -47,6 +48,7 @@ import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
@ -90,12 +92,69 @@ import org.junit.Test;
verify(fs).delete(stagingJobPath, true);
}
private class TestMRApp extends MRAppMaster {
@Test
public void testDeletionofStagingOnKill() throws IOException {
conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, stagingJobDir);
conf.setInt(YarnConfiguration.RM_AM_MAX_RETRIES, 4);
fs = mock(FileSystem.class);
when(fs.delete(any(Path.class), anyBoolean())).thenReturn(true);
ApplicationAttemptId attemptId = recordFactory.newRecordInstance(
ApplicationAttemptId.class);
attemptId.setAttemptId(0);
ApplicationId appId = recordFactory.newRecordInstance(ApplicationId.class);
appId.setClusterTimestamp(System.currentTimeMillis());
appId.setId(0);
attemptId.setApplicationId(appId);
JobId jobid = recordFactory.newRecordInstance(JobId.class);
jobid.setAppId(appId);
ContainerAllocator mockAlloc = mock(ContainerAllocator.class);
MRAppMaster appMaster = new TestMRApp(attemptId, mockAlloc);
appMaster.init(conf);
//simulate the process being killed
MRAppMaster.MRAppMasterShutdownHook hook =
new MRAppMaster.MRAppMasterShutdownHook(appMaster);
hook.run();
verify(fs, times(0)).delete(stagingJobPath, true);
}
public TestMRApp(ApplicationAttemptId applicationAttemptId) {
@Test
public void testDeletionofStagingOnKillLastTry() throws IOException {
conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, stagingJobDir);
conf.setInt(YarnConfiguration.RM_AM_MAX_RETRIES, 1);
fs = mock(FileSystem.class);
when(fs.delete(any(Path.class), anyBoolean())).thenReturn(true);
ApplicationAttemptId attemptId = recordFactory.newRecordInstance(
ApplicationAttemptId.class);
attemptId.setAttemptId(1);
ApplicationId appId = recordFactory.newRecordInstance(ApplicationId.class);
appId.setClusterTimestamp(System.currentTimeMillis());
appId.setId(0);
attemptId.setApplicationId(appId);
JobId jobid = recordFactory.newRecordInstance(JobId.class);
jobid.setAppId(appId);
ContainerAllocator mockAlloc = mock(ContainerAllocator.class);
MRAppMaster appMaster = new TestMRApp(attemptId, mockAlloc);
appMaster.init(conf);
//simulate the process being killed
MRAppMaster.MRAppMasterShutdownHook hook =
new MRAppMaster.MRAppMasterShutdownHook(appMaster);
hook.run();
verify(fs).delete(stagingJobPath, true);
}
private class TestMRApp extends MRAppMaster {
ContainerAllocator allocator;
public TestMRApp(ApplicationAttemptId applicationAttemptId,
ContainerAllocator allocator) {
super(applicationAttemptId, BuilderUtils.newContainerId(
applicationAttemptId, 1), "testhost", 2222, 3333, System
.currentTimeMillis());
this.allocator = allocator;
}
public TestMRApp(ApplicationAttemptId applicationAttemptId) {
this(applicationAttemptId, null);
}
@Override
@ -103,6 +162,15 @@ import org.junit.Test;
return fs;
}
@Override
protected ContainerAllocator createContainerAllocator(
final ClientService clientService, final AppContext context) {
if(allocator == null) {
return super.createContainerAllocator(clientService, context);
}
return allocator;
}
@Override
protected void sysexit() {
}