YARN-3692. Allow REST API to set a user generated message when killing an application. Contributed by Rohith Sharma K S

This commit is contained in:
Naganarasimha 2016-09-24 21:13:01 +05:30
parent 23984e1787
commit 0e4a5611ff
11 changed files with 116 additions and 15 deletions

View File

@ -511,4 +511,10 @@ public class ResourceMgrDelegate extends YarnClient {
throws YarnException, IOException {
client.signalToContainer(containerId, command);
}
/**
 * Kills the given application, passing along a caller-supplied diagnostics
 * message, by delegating to {@code client}.
 *
 * @param appId ID of the application to kill
 * @param diagnostics message to attach to the kill request
 * @throws YarnException propagated from the delegated client call
 * @throws IOException propagated from the delegated client call
 */
@Override
public void killApplication(ApplicationId appId, String diagnostics)
throws YarnException, IOException {
client.killApplication(appId, diagnostics);
}
}

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.api.protocolrecords;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Stable;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.util.Records;
@ -57,4 +58,21 @@ public abstract class KillApplicationRequest {
@Public
@Stable
public abstract void setApplicationId(ApplicationId applicationId);
/**
 * Get the <em>diagnostics</em> message with which the application is being
 * killed.
 * @return <em>diagnostics</em> message with which the application is being
 *         killed
 */
@Public
@Unstable
public abstract String getDiagnostics();
/**
 * Set the <em>diagnostics</em> message with which the application is being
 * killed.
 * @param diagnostics <em>diagnostics</em> message with which the application
 *          is being killed
 */
@Public
@Unstable
public abstract void setDiagnostics(String diagnostics);
}

View File

@ -155,6 +155,7 @@ message FailApplicationAttemptResponseProto {
// Request to kill an application.
message KillApplicationRequestProto {
optional ApplicationIdProto application_id = 1;
// Optional caller-supplied message to record with the kill request.
optional string diagnostics = 2;
}
message KillApplicationResponseProto {

View File

@ -172,6 +172,20 @@ public abstract class YarnClient extends AbstractService {
public abstract void killApplication(ApplicationId applicationId) throws YarnException,
IOException;
/**
 * <p>
 * Kill an application identified by given ID, passing along a
 * caller-supplied diagnostics message.
 * </p>
 * @param applicationId {@link ApplicationId} of the application that needs to
 *          be killed
 * @param diagnostics message to attach to the kill request
 * @throws YarnException in case of errors or if YARN rejects the request due
 *           to access-control restrictions.
 * @throws IOException if an I/O error occurs
 */
public abstract void killApplication(ApplicationId applicationId,
String diagnostics) throws YarnException, IOException;
/**
* <p>
* Get a report of the given Application.

View File

@ -405,10 +405,21 @@ public class YarnClientImpl extends YarnClient {
@Override
public void killApplication(ApplicationId applicationId)
throws YarnException, IOException {
// No diagnostics supplied: the two-argument overload only sets diagnostics
// on the request when the value is non-null.
killApplication(applicationId, null);
}
@Override
public void killApplication(ApplicationId applicationId, String diagnostics)
throws YarnException, IOException {
KillApplicationRequest request =
Records.newRecord(KillApplicationRequest.class);
request.setApplicationId(applicationId);
if (diagnostics != null) {
request.setDiagnostics(diagnostics);
}
try {
int pollCount = 0;
long startTime = System.currentTimeMillis();
@ -422,14 +433,15 @@ public class YarnClientImpl extends YarnClient {
}
long elapsedMillis = System.currentTimeMillis() - startTime;
if (enforceAsyncAPITimeout() &&
elapsedMillis >= this.asyncApiPollTimeoutMillis) {
throw new YarnException("Timed out while waiting for application " +
applicationId + " to be killed.");
if (enforceAsyncAPITimeout()
&& elapsedMillis >= this.asyncApiPollTimeoutMillis) {
throw new YarnException("Timed out while waiting for application "
+ applicationId + " to be killed.");
}
if (++pollCount % 10 == 0) {
LOG.info("Waiting for application " + applicationId + " to be killed.");
LOG.info(
"Waiting for application " + applicationId + " to be killed.");
}
Thread.sleep(asyncApiPollIntervalMillis);
}

View File

@ -127,6 +127,24 @@ public class KillApplicationRequestPBImpl extends KillApplicationRequest {
return ((ApplicationIdPBImpl)t).getProto();
}
/**
 * Returns the diagnostics message carried by this request.
 *
 * @return the diagnostics string, or {@code null} when the underlying
 *         protobuf message has no diagnostics set
 */
@Override
public String getDiagnostics() {
  // Read from the immutable proto when it is current, else from the builder.
  final KillApplicationRequestProtoOrBuilder source;
  if (viaProto) {
    source = proto;
  } else {
    source = builder;
  }
  return source.hasDiagnostics() ? source.getDiagnostics() : null;
}
/**
 * Stores the diagnostics message on the underlying builder.
 *
 * @param diagnostics message to record; {@code null} clears any value
 *          currently held by the builder
 */
@Override
public void setDiagnostics(String diagnostics) {
  maybeInitBuilder();
  if (diagnostics != null) {
    builder.setDiagnostics(diagnostics);
  } else {
    builder.clearDiagnostics();
  }
}
}

View File

@ -750,15 +750,25 @@ public class ClientRMService extends AbstractService implements
return KillApplicationResponse.newInstance(true);
}
String message = "Kill application " + applicationId + " received from "
+ callerUGI;
StringBuilder message = new StringBuilder();
message.append("Application ").append(applicationId)
.append(" was killed by user ").append(callerUGI.getShortUserName());
InetAddress remoteAddress = Server.getRemoteIp();
if (null != remoteAddress) {
message += " at " + remoteAddress.getHostAddress();
message.append(" at ").append(remoteAddress.getHostAddress());
}
String diagnostics = org.apache.commons.lang.StringUtils
.trimToNull(request.getDiagnostics());
if (diagnostics != null) {
message.append(" with diagnostic message: ");
message.append(diagnostics);
}
this.rmContext.getDispatcher().getEventHandler()
.handle(new RMAppKillByClientEvent(applicationId, message, callerUGI,
remoteAddress));
.handle(new RMAppKillByClientEvent(applicationId, message.toString(),
callerUGI, remoteAddress));
// For UnmanagedAMs, return true so they don't retry
return KillApplicationResponse.newInstance(

View File

@ -131,14 +131,16 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ActivitiesInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppActivitiesInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppAttemptInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppAttemptsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppInfo;
@ -180,7 +182,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerTypeInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.StatisticsItemInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.*;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.server.webapp.WebServices;
@ -925,7 +926,7 @@ public class RMWebServices extends WebServices {
// allow users to kill the app
if (targetState.getState().equals(YarnApplicationState.KILLED.toString())) {
return killApp(app, callerUGI, hsr);
return killApp(app, callerUGI, hsr, targetState.getDiagnostics());
}
throw new BadRequestException("Only '"
+ YarnApplicationState.KILLED.toString()
@ -1141,7 +1142,8 @@ public class RMWebServices extends WebServices {
}
protected Response killApp(RMApp app, UserGroupInformation callerUGI,
HttpServletRequest hsr) throws IOException, InterruptedException {
HttpServletRequest hsr, final String diagnostic)
throws IOException, InterruptedException {
if (app == null) {
throw new IllegalArgumentException("app cannot be null");
@ -1158,6 +1160,9 @@ public class RMWebServices extends WebServices {
YarnException {
KillApplicationRequest req =
KillApplicationRequest.newInstance(appid);
if (diagnostic != null) {
req.setDiagnostics(diagnostic);
}
return rm.getClientRMService().forceKillApplication(req);
}
});

View File

@ -27,6 +27,7 @@ import javax.xml.bind.annotation.XmlRootElement;
public class AppState {
String state;
private String diagnostics;
public AppState() {
}
@ -43,4 +44,11 @@ public class AppState {
return this.state;
}
/**
 * Returns the diagnostics message stored in this object.
 *
 * @return the stored diagnostics string
 */
public String getDiagnostics() {
return diagnostics;
}
/**
 * Stores the diagnostics message in this object.
 *
 * @param diagnostics the diagnostics string to store
 */
public void setDiagnostics(String diagnostics) {
this.diagnostics = diagnostics;
}
}

View File

@ -505,7 +505,8 @@ public class TestClientRMService {
@Test
public void testForceKillApplication() throws Exception {
YarnConfiguration conf = new YarnConfiguration();
MockRM rm = new MockRM();
conf.setBoolean(MockRM.ENABLE_WEBAPP, true);
MockRM rm = new MockRM(conf);
rm.init(conf);
rm.start();
@ -521,6 +522,8 @@ public class TestClientRMService {
KillApplicationRequest killRequest1 =
KillApplicationRequest.newInstance(app1.getApplicationId());
String diagnostic = "message1";
killRequest1.setDiagnostics(diagnostic);
KillApplicationRequest killRequest2 =
KillApplicationRequest.newInstance(app2.getApplicationId());
@ -538,6 +541,8 @@ public class TestClientRMService {
killAttemptCount > 1);
assertEquals("Incorrect number of apps in the RM", 1,
rmService.getApplications(getRequest).getApplicationList().size());
assertTrue("Diagnostic message is incorrect",
app1.getDiagnostics().toString().contains(diagnostic));
KillApplicationResponse killResponse2 =
rmService.forceKillApplication(killRequest2);

View File

@ -367,6 +367,7 @@ public class TestRMWebServicesAppsModification extends JerseyTestBase {
{ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML };
MediaType[] contentTypes =
{ MediaType.APPLICATION_JSON_TYPE, MediaType.APPLICATION_XML_TYPE };
String diagnostic = "message1";
for (String mediaType : mediaTypes) {
for (MediaType contentType : contentTypes) {
RMApp app = rm.submitApp(CONTAINER_MB, "", webserviceUserName);
@ -374,6 +375,7 @@ public class TestRMWebServicesAppsModification extends JerseyTestBase {
AppState targetState =
new AppState(YarnApplicationState.KILLED.toString());
targetState.setDiagnostics(diagnostic);
Object entity;
if (contentType.equals(MediaType.APPLICATION_JSON_TYPE)) {
@ -428,6 +430,8 @@ public class TestRMWebServicesAppsModification extends JerseyTestBase {
} else {
verifyAppStateXML(response, RMAppState.KILLED);
}
assertTrue("Diagnostic message is incorrect",
app.getDiagnostics().toString().contains(diagnostic));
break;
}
}