Merge r1576023 from trunk. Fixed ClientRMService#forceKillApplication not killing unmanaged application. Contributed by Karthik Kambatla

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1576026 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Jian He 2014-03-10 18:03:47 +00:00
parent bc927a10cf
commit 8d6fb7e920
10 changed files with 106 additions and 41 deletions

View File

@ -418,6 +418,9 @@ Release 2.4.0 - UNRELEASED
YARN-1787. Fixed help messages for applicationattempt and container
sub-commands in bin/yarn. (Zhijie Shen via vinodkv)
YARN-1793. Fixed ClientRMService#forceKillApplication not killing unmanaged
application. (Karthik Kambatla via jianhe)
Release 2.3.1 - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -351,27 +351,18 @@ public FinishApplicationMasterResponse finishApplicationMaster(
RMApp rmApp =
rmContext.getRMApps().get(applicationAttemptId.getApplicationId());
if (rmApp.getApplicationSubmissionContext().getUnmanagedAM()) {
// No recovery supported yet for unmanaged AM. Send the unregister event
// and (falsely) acknowledge state-store write immediately.
rmContext.getDispatcher().getEventHandler().handle(
new RMAppAttemptUnregistrationEvent(applicationAttemptId, request
.getTrackingUrl(), request.getFinalApplicationStatus(), request
.getDiagnostics()));
if (rmApp.isAppFinalStateStored()) {
return FinishApplicationMasterResponse.newInstance(true);
}
// Not an unmanaged-AM.
if (rmApp.isAppSafeToTerminate()) {
return FinishApplicationMasterResponse.newInstance(true);
} else {
// keep sending the unregister event as RM may crash in the meanwhile.
rmContext.getDispatcher().getEventHandler().handle(
rmContext.getDispatcher().getEventHandler().handle(
new RMAppAttemptUnregistrationEvent(applicationAttemptId, request
.getTrackingUrl(), request.getFinalApplicationStatus(), request
.getDiagnostics()));
return FinishApplicationMasterResponse.newInstance(false);
}
// For UnmanagedAMs, return true so they don't retry
return FinishApplicationMasterResponse.newInstance(
rmApp.getApplicationSubmissionContext().getUnmanagedAM());
}
}

View File

@ -396,15 +396,18 @@ public KillApplicationResponse forceKillApplication(
+ ApplicationAccessType.MODIFY_APP.name() + " on " + applicationId));
}
if (application.isAppSafeToTerminate()) {
if (application.isAppFinalStateStored()) {
RMAuditLogger.logSuccess(callerUGI.getShortUserName(),
AuditConstants.KILL_APP_REQUEST, "ClientRMService", applicationId);
AuditConstants.KILL_APP_REQUEST, "ClientRMService", applicationId);
return KillApplicationResponse.newInstance(true);
} else {
this.rmContext.getDispatcher().getEventHandler()
.handle(new RMAppEvent(applicationId, RMAppEventType.KILL));
return KillApplicationResponse.newInstance(false);
}
this.rmContext.getDispatcher().getEventHandler()
.handle(new RMAppEvent(applicationId, RMAppEventType.KILL));
// For UnmanagedAMs, return true so they don't retry
return KillApplicationResponse.newInstance(
application.getApplicationSubmissionContext().getUnmanagedAM());
}
@Override

View File

@ -204,13 +204,10 @@ ApplicationReport createAndGetApplicationReport(String clientUserName,
Set<String> getApplicationTags();
/**
* Check whether this application is safe to terminate.
* An application is deemed to be safe to terminate if it is an unmanaged
* AM or its state has been saved in state store.
* @return the flag which indicates whether this application is safe to
* terminate.
* Check whether this application's state has been saved to the state store.
* @return the flag indicating whether the applications's state is stored.
*/
boolean isAppSafeToTerminate();
boolean isAppFinalStateStored();
/**
* Create the external user-facing state of ApplicationMaster from the

View File

@ -1103,14 +1103,11 @@ public Set<String> getApplicationTags() {
}
@Override
public boolean isAppSafeToTerminate() {
public boolean isAppFinalStateStored() {
RMAppState state = getState();
return state.equals(RMAppState.FINISHING)
|| state.equals(RMAppState.FINISHED) || state.equals(RMAppState.FAILED)
|| state.equals(RMAppState.KILLED) ||
// If this is an unmanaged AM, we are safe to unregister since unmanaged
// AM will immediately go to FINISHED state on AM unregistration
getApplicationSubmissionContext().getUnmanagedAM();
|| state.equals(RMAppState.KILLED);
}
@Override

View File

@ -173,17 +173,28 @@ public GetNewApplicationResponse getNewAppId() throws Exception {
}
public RMApp submitApp(int masterMemory) throws Exception {
return submitApp(masterMemory, false);
}
public RMApp submitApp(int masterMemory, boolean unmanaged)
throws Exception {
return submitApp(masterMemory, "", UserGroupInformation.getCurrentUser()
.getShortUserName());
.getShortUserName(), unmanaged);
}
// client
public RMApp submitApp(int masterMemory, String name, String user) throws Exception {
return submitApp(masterMemory, name, user, null, false, null,
super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null);
return submitApp(masterMemory, name, user, false);
}
public RMApp submitApp(int masterMemory, String name, String user,
boolean unmanaged)
throws Exception {
return submitApp(masterMemory, name, user, null, unmanaged, null,
super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null);
}
public RMApp submitApp(int masterMemory, String name, String user,
Map<ApplicationAccessType, String> acls) throws Exception {
return submitApp(masterMemory, name, user, acls, false, null,

View File

@ -19,6 +19,8 @@
package org.apache.hadoop.yarn.server.resourcemanager;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.anyString;
@ -34,6 +36,7 @@
import java.util.Arrays;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
@ -61,6 +64,7 @@
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse;
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
@ -218,7 +222,7 @@ public void testGetApplicationReport() throws YarnException {
}
@Test
public void testForceKillApplication() throws YarnException {
public void testForceKillNonExistingApplication() throws YarnException {
RMContext rmContext = mock(RMContext.class);
when(rmContext.getRMApps()).thenReturn(
new ConcurrentHashMap<ApplicationId, RMApp>());
@ -237,6 +241,58 @@ public void testForceKillApplication() throws YarnException {
"application " + request.getApplicationId());
}
}
@Test
public void testForceKillApplication() throws Exception {
YarnConfiguration conf = new YarnConfiguration();
MockRM rm = new MockRM();
rm.init(conf);
rm.start();
ClientRMService rmService = rm.getClientRMService();
GetApplicationsRequest getRequest = GetApplicationsRequest.newInstance(
EnumSet.of(YarnApplicationState.KILLED));
RMApp app1 = rm.submitApp(1024);
RMApp app2 = rm.submitApp(1024, true);
assertEquals("Incorrect number of apps in the RM", 0,
rmService.getApplications(getRequest).getApplicationList().size());
KillApplicationRequest killRequest1 =
KillApplicationRequest.newInstance(app1.getApplicationId());
KillApplicationRequest killRequest2 =
KillApplicationRequest.newInstance(app2.getApplicationId());
int killAttemptCount = 0;
for (int i = 0; i < 100; i++) {
KillApplicationResponse killResponse1 =
rmService.forceKillApplication(killRequest1);
killAttemptCount++;
if (killResponse1.getIsKillCompleted()) {
break;
}
Thread.sleep(10);
}
assertTrue("Kill attempt count should be greater than 1 for managed AMs",
killAttemptCount > 1);
assertEquals("Incorrect number of apps in the RM", 1,
rmService.getApplications(getRequest).getApplicationList().size());
KillApplicationResponse killResponse2 =
rmService.forceKillApplication(killRequest2);
assertTrue("Killing UnmanagedAM should falsely acknowledge true",
killResponse2.getIsKillCompleted());
for (int i = 0; i < 100; i++) {
if (2 ==
rmService.getApplications(getRequest).getApplicationList().size()) {
break;
}
Thread.sleep(10);
}
assertEquals("Incorrect number of apps in the RM", 2,
rmService.getApplications(getRequest).getApplicationList().size());
}
@Test (expected = ApplicationNotFoundException.class)
public void testMoveAbsentApplication() throws YarnException {
@ -629,6 +685,12 @@ private SubmitApplicationRequest mockSubmitAppRequest(ApplicationId appId,
private SubmitApplicationRequest mockSubmitAppRequest(ApplicationId appId,
String name, String queue, Set<String> tags) {
return mockSubmitAppRequest(appId, name, queue, tags, false);
}
private SubmitApplicationRequest mockSubmitAppRequest(ApplicationId appId,
String name, String queue, Set<String> tags, boolean unmanaged) {
ContainerLaunchContext amContainerSpec = mock(ContainerLaunchContext.class);
Resource resource = Resources.createResource(
@ -643,6 +705,7 @@ private SubmitApplicationRequest mockSubmitAppRequest(ApplicationId appId,
submissionContext.setResource(resource);
submissionContext.setApplicationType(appType);
submissionContext.setApplicationTags(tags);
submissionContext.setUnmanagedAM(unmanaged);
SubmitApplicationRequest submitRequest =
recordFactory.newRecordInstance(SubmitApplicationRequest.class);

View File

@ -233,7 +233,7 @@ public KillApplicationResponse forceKillApplication(
KillApplicationRequest request) throws YarnException {
ApplicationId applicationId = request.getApplicationId();
RMApp application = this.rmContext.getRMApps().get(applicationId);
if (application.isAppSafeToTerminate()) {
if (application.isAppFinalStateStored()) {
return KillApplicationResponse.newInstance(true);
} else {
return KillApplicationResponse.newInstance(false);

View File

@ -151,7 +151,7 @@ public void setQueue(String name) {
}
@Override
public boolean isAppSafeToTerminate() {
public boolean isAppFinalStateStored() {
throw new UnsupportedOperationException("Not supported yet.");
}

View File

@ -224,7 +224,7 @@ public Set<String> getApplicationTags() {
}
@Override
public boolean isAppSafeToTerminate() {
public boolean isAppFinalStateStored() {
return true;
}