YARN-3692. Allow REST API to set a user generated message when killing an application. Contributed by Rohith Sharma K S

This commit is contained in:
Naganarasimha 2016-09-24 21:13:01 +05:30
parent 6ca5ffe4b5
commit fa2025316d
11 changed files with 113 additions and 13 deletions

View File

@ -501,4 +501,10 @@ public class ResourceMgrDelegate extends YarnClient {
throws YarnException, IOException { throws YarnException, IOException {
client.signalToContainer(containerId, command); client.signalToContainer(containerId, command);
} }
@Override
public void killApplication(ApplicationId appId, String diagnostics)
throws YarnException, IOException {
client.killApplication(appId, diagnostics);
}
} }

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.api.protocolrecords;
import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Stable; import org.apache.hadoop.classification.InterfaceStability.Stable;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.ApplicationClientProtocol; import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.util.Records; import org.apache.hadoop.yarn.util.Records;
@ -57,4 +58,21 @@ public abstract class KillApplicationRequest {
@Public @Public
@Stable @Stable
public abstract void setApplicationId(ApplicationId applicationId); public abstract void setApplicationId(ApplicationId applicationId);
/**
* Get the <em>diagnostics</em> to which the application is being killed.
* @return <em>diagnostics</em> to which the application is being killed
*/
@Public
@Unstable
public abstract String getDiagnostics();
/**
* Set the <em>diagnostics</em> to which the application is being killed.
* @param diagnostics <em>diagnostics</em> to which the application is being
* killed
*/
@Public
@Unstable
public abstract void setDiagnostics(String diagnostics);
} }

View File

@ -153,6 +153,7 @@ message FailApplicationAttemptResponseProto {
message KillApplicationRequestProto { message KillApplicationRequestProto {
optional ApplicationIdProto application_id = 1; optional ApplicationIdProto application_id = 1;
optional string diagnostics = 2;
} }
message KillApplicationResponseProto { message KillApplicationResponseProto {

View File

@ -172,6 +172,20 @@ public abstract class YarnClient extends AbstractService {
public abstract void killApplication(ApplicationId applicationId) throws YarnException, public abstract void killApplication(ApplicationId applicationId) throws YarnException,
IOException; IOException;
/**
* <p>
* Kill an application identified by given ID.
* </p>
* @param applicationId {@link ApplicationId} of the application that needs to
* be killed
* @param diagnostics for killing an application.
* @throws YarnException in case of errors or if YARN rejects the request due
* to access-control restrictions.
* @throws IOException
*/
public abstract void killApplication(ApplicationId applicationId,
String diagnostics) throws YarnException, IOException;
/** /**
* <p> * <p>
* Get a report of the given Application. * Get a report of the given Application.

View File

@ -400,10 +400,21 @@ public class YarnClientImpl extends YarnClient {
@Override @Override
public void killApplication(ApplicationId applicationId) public void killApplication(ApplicationId applicationId)
throws YarnException, IOException { throws YarnException, IOException {
killApplication(applicationId, null);
}
@Override
public void killApplication(ApplicationId applicationId, String diagnostics)
throws YarnException, IOException {
KillApplicationRequest request = KillApplicationRequest request =
Records.newRecord(KillApplicationRequest.class); Records.newRecord(KillApplicationRequest.class);
request.setApplicationId(applicationId); request.setApplicationId(applicationId);
if (diagnostics != null) {
request.setDiagnostics(diagnostics);
}
try { try {
int pollCount = 0; int pollCount = 0;
long startTime = System.currentTimeMillis(); long startTime = System.currentTimeMillis();
@ -417,14 +428,15 @@ public class YarnClientImpl extends YarnClient {
} }
long elapsedMillis = System.currentTimeMillis() - startTime; long elapsedMillis = System.currentTimeMillis() - startTime;
if (enforceAsyncAPITimeout() && if (enforceAsyncAPITimeout()
elapsedMillis >= this.asyncApiPollTimeoutMillis) { && elapsedMillis >= this.asyncApiPollTimeoutMillis) {
throw new YarnException("Timed out while waiting for application " + throw new YarnException("Timed out while waiting for application "
applicationId + " to be killed."); + applicationId + " to be killed.");
} }
if (++pollCount % 10 == 0) { if (++pollCount % 10 == 0) {
LOG.info("Waiting for application " + applicationId + " to be killed."); LOG.info(
"Waiting for application " + applicationId + " to be killed.");
} }
Thread.sleep(asyncApiPollIntervalMillis); Thread.sleep(asyncApiPollIntervalMillis);
} }

View File

@ -127,6 +127,24 @@ public class KillApplicationRequestPBImpl extends KillApplicationRequest {
return ((ApplicationIdPBImpl)t).getProto(); return ((ApplicationIdPBImpl)t).getProto();
} }
@Override
public String getDiagnostics() {
KillApplicationRequestProtoOrBuilder p = viaProto ? proto : builder;
if (!p.hasDiagnostics()) {
return null;
}
return (p.getDiagnostics());
}
@Override
public void setDiagnostics(String diagnostics) {
maybeInitBuilder();
if (diagnostics == null) {
builder.clearDiagnostics();
return;
}
builder.setDiagnostics(diagnostics);
}
} }

View File

@ -750,15 +750,25 @@ public class ClientRMService extends AbstractService implements
return KillApplicationResponse.newInstance(true); return KillApplicationResponse.newInstance(true);
} }
String message = "Kill application " + applicationId + " received from " StringBuilder message = new StringBuilder();
+ callerUGI; message.append("Application ").append(applicationId)
.append(" was killed by user ").append(callerUGI.getShortUserName());
InetAddress remoteAddress = Server.getRemoteIp(); InetAddress remoteAddress = Server.getRemoteIp();
if (null != remoteAddress) { if (null != remoteAddress) {
message += " at " + remoteAddress.getHostAddress(); message.append(" at ").append(remoteAddress.getHostAddress());
} }
String diagnostics = org.apache.commons.lang.StringUtils
.trimToNull(request.getDiagnostics());
if (diagnostics != null) {
message.append(" with diagnostic message: ");
message.append(diagnostics);
}
this.rmContext.getDispatcher().getEventHandler() this.rmContext.getDispatcher().getEventHandler()
.handle(new RMAppKillByClientEvent(applicationId, message, callerUGI, .handle(new RMAppKillByClientEvent(applicationId, message.toString(),
remoteAddress)); callerUGI, remoteAddress));
// For UnmanagedAMs, return true so they don't retry // For UnmanagedAMs, return true so they don't retry
return KillApplicationResponse.newInstance( return KillApplicationResponse.newInstance(

View File

@ -789,7 +789,7 @@ public class RMWebServices {
// allow users to kill the app // allow users to kill the app
if (targetState.getState().equals(YarnApplicationState.KILLED.toString())) { if (targetState.getState().equals(YarnApplicationState.KILLED.toString())) {
return killApp(app, callerUGI, hsr); return killApp(app, callerUGI, hsr, targetState.getDiagnostics());
} }
throw new BadRequestException("Only '" throw new BadRequestException("Only '"
+ YarnApplicationState.KILLED.toString() + YarnApplicationState.KILLED.toString()
@ -1006,7 +1006,8 @@ public class RMWebServices {
} }
protected Response killApp(RMApp app, UserGroupInformation callerUGI, protected Response killApp(RMApp app, UserGroupInformation callerUGI,
HttpServletRequest hsr) throws IOException, InterruptedException { HttpServletRequest hsr, final String diagnostic)
throws IOException, InterruptedException {
if (app == null) { if (app == null) {
throw new IllegalArgumentException("app cannot be null"); throw new IllegalArgumentException("app cannot be null");
@ -1023,6 +1024,9 @@ public class RMWebServices {
YarnException { YarnException {
KillApplicationRequest req = KillApplicationRequest req =
KillApplicationRequest.newInstance(appid); KillApplicationRequest.newInstance(appid);
if (diagnostic != null) {
req.setDiagnostics(diagnostic);
}
return rm.getClientRMService().forceKillApplication(req); return rm.getClientRMService().forceKillApplication(req);
} }
}); });

View File

@ -27,6 +27,7 @@ import javax.xml.bind.annotation.XmlRootElement;
public class AppState { public class AppState {
String state; String state;
private String diagnostics;
public AppState() { public AppState() {
} }
@ -43,4 +44,11 @@ public class AppState {
return this.state; return this.state;
} }
public String getDiagnostics() {
return diagnostics;
}
public void setDiagnostics(String diagnostics) {
this.diagnostics = diagnostics;
}
} }

View File

@ -501,7 +501,8 @@ public class TestClientRMService {
@Test @Test
public void testForceKillApplication() throws Exception { public void testForceKillApplication() throws Exception {
YarnConfiguration conf = new YarnConfiguration(); YarnConfiguration conf = new YarnConfiguration();
MockRM rm = new MockRM(); conf.setBoolean(MockRM.ENABLE_WEBAPP, true);
MockRM rm = new MockRM(conf);
rm.init(conf); rm.init(conf);
rm.start(); rm.start();
@ -517,6 +518,8 @@ public class TestClientRMService {
KillApplicationRequest killRequest1 = KillApplicationRequest killRequest1 =
KillApplicationRequest.newInstance(app1.getApplicationId()); KillApplicationRequest.newInstance(app1.getApplicationId());
String diagnostic = "message1";
killRequest1.setDiagnostics(diagnostic);
KillApplicationRequest killRequest2 = KillApplicationRequest killRequest2 =
KillApplicationRequest.newInstance(app2.getApplicationId()); KillApplicationRequest.newInstance(app2.getApplicationId());
@ -534,6 +537,8 @@ public class TestClientRMService {
killAttemptCount > 1); killAttemptCount > 1);
assertEquals("Incorrect number of apps in the RM", 1, assertEquals("Incorrect number of apps in the RM", 1,
rmService.getApplications(getRequest).getApplicationList().size()); rmService.getApplications(getRequest).getApplicationList().size());
assertTrue("Diagnostic message is incorrect",
app1.getDiagnostics().toString().contains(diagnostic));
KillApplicationResponse killResponse2 = KillApplicationResponse killResponse2 =
rmService.forceKillApplication(killRequest2); rmService.forceKillApplication(killRequest2);

View File

@ -362,6 +362,7 @@ public class TestRMWebServicesAppsModification extends JerseyTestBase {
{ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML }; { MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML };
MediaType[] contentTypes = MediaType[] contentTypes =
{ MediaType.APPLICATION_JSON_TYPE, MediaType.APPLICATION_XML_TYPE }; { MediaType.APPLICATION_JSON_TYPE, MediaType.APPLICATION_XML_TYPE };
String diagnostic = "message1";
for (String mediaType : mediaTypes) { for (String mediaType : mediaTypes) {
for (MediaType contentType : contentTypes) { for (MediaType contentType : contentTypes) {
RMApp app = rm.submitApp(CONTAINER_MB, "", webserviceUserName); RMApp app = rm.submitApp(CONTAINER_MB, "", webserviceUserName);
@ -369,6 +370,7 @@ public class TestRMWebServicesAppsModification extends JerseyTestBase {
AppState targetState = AppState targetState =
new AppState(YarnApplicationState.KILLED.toString()); new AppState(YarnApplicationState.KILLED.toString());
targetState.setDiagnostics(diagnostic);
Object entity; Object entity;
if (contentType.equals(MediaType.APPLICATION_JSON_TYPE)) { if (contentType.equals(MediaType.APPLICATION_JSON_TYPE)) {
@ -423,6 +425,8 @@ public class TestRMWebServicesAppsModification extends JerseyTestBase {
} else { } else {
verifyAppStateXML(response, RMAppState.KILLED); verifyAppStateXML(response, RMAppState.KILLED);
} }
assertTrue("Diagnostic message is incorrect",
app.getDiagnostics().toString().contains(diagnostic));
break; break;
} }
} }