YARN-8693. Add signalToContainer REST API for RMWebServices. Contributed by Tao Yang.

This commit is contained in:
Weiwei Yang 2019-05-29 16:34:48 +08:00
parent afd844059c
commit 544876fe12
10 changed files with 308 additions and 0 deletions

View File

@ -189,6 +189,12 @@ public final class RMWSConsts {
public static final String CHECK_USER_ACCESS_TO_QUEUE = public static final String CHECK_USER_ACCESS_TO_QUEUE =
"/queues/{queue}/access"; "/queues/{queue}/access";
/**
* Path for {@code RMWebServiceProtocol#signalContainer}.
*/
public static final String SIGNAL_TO_CONTAINER =
"/containers/{containerid}/signal/{command}";
// ----------------QueryParams for RMWebServiceProtocol---------------- // ----------------QueryParams for RMWebServiceProtocol----------------
public static final String TIME = "time"; public static final String TIME = "time";
@ -229,6 +235,8 @@ public final class RMWSConsts {
public static final String REQUEST_PRIORITIES = "requestPriorities"; public static final String REQUEST_PRIORITIES = "requestPriorities";
public static final String ALLOCATION_REQUEST_IDS = "allocationRequestIds"; public static final String ALLOCATION_REQUEST_IDS = "allocationRequestIds";
public static final String GROUP_BY = "groupBy"; public static final String GROUP_BY = "groupBy";
public static final String SIGNAL = "signal";
public static final String COMMAND = "command";
private RMWSConsts() { private RMWSConsts() {
// not called // not called

View File

@ -712,4 +712,17 @@ Response updateApplicationTimeout(AppTimeoutInfo appTimeout,
RMQueueAclInfo checkUserAccessToQueue(String queue, String username, RMQueueAclInfo checkUserAccessToQueue(String queue, String username,
String queueAclType, HttpServletRequest hsr) String queueAclType, HttpServletRequest hsr)
throws AuthorizationException; throws AuthorizationException;
/**
* This method sends a signal to container.
* @param containerId containerId
* @param command signal command, it could be OUTPUT_THREAD_DUMP/
* GRACEFUL_SHUTDOWN/FORCEFUL_SHUTDOWN
* @param req request
* @return Response containing the status code
* @throws AuthorizationException if the user is not authorized to invoke this
* method.
*/
Response signalToContainer(String containerId, String command,
HttpServletRequest req) throws AuthorizationException;
} }

View File

@ -94,6 +94,7 @@
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest; import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse; import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest; import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse; import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityRequest; import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityRequest;
@ -104,6 +105,7 @@
import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType; import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerReport; import org.apache.hadoop.yarn.api.records.ContainerReport;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeId;
@ -118,6 +120,7 @@
import org.apache.hadoop.yarn.api.records.ReservationRequests; import org.apache.hadoop.yarn.api.records.ReservationRequests;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceOption; import org.apache.hadoop.yarn.api.records.ResourceOption;
import org.apache.hadoop.yarn.api.records.SignalContainerCommand;
import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnException;
@ -2550,4 +2553,35 @@ public RMQueueAclInfo checkUserAccessToQueue(
return new RMQueueAclInfo(true, user.getUserName(), ""); return new RMQueueAclInfo(true, user.getUserName(), "");
} }
@POST
@Path(RMWSConsts.SIGNAL_TO_CONTAINER)
@Produces({ MediaType.APPLICATION_JSON + "; " + JettyUtils.UTF_8,
MediaType.APPLICATION_XML + "; " + JettyUtils.UTF_8 })
@Override
public Response signalToContainer(
@PathParam(RMWSConsts.CONTAINERID) String containerId,
@PathParam(RMWSConsts.COMMAND) String command,
@Context HttpServletRequest hsr)
throws AuthorizationException {
UserGroupInformation callerUGI = getCallerUserGroupInformation(hsr, true);
initForWritableEndpoints(callerUGI, true);
if (!EnumUtils.isValidEnum(
SignalContainerCommand.class, command.toUpperCase())) {
String errMsg =
"Invalid command: " + command.toUpperCase() + ", valid commands are: "
+ Arrays.asList(SignalContainerCommand.values());
return Response.status(Status.BAD_REQUEST).entity(errMsg).build();
}
try {
ContainerId containerIdObj = ContainerId.fromString(containerId);
rm.getClientRMService().signalToContainer(SignalContainerRequest
.newInstance(containerIdObj,
SignalContainerCommand.valueOf(command.toUpperCase())));
} catch (Exception e) {
return Response.status(Status.INTERNAL_SERVER_ERROR)
.entity(e.getMessage()).build();
}
return Response.status(Status.OK).build();
}
} }

View File

@ -0,0 +1,158 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.webapp;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import javax.ws.rs.core.MediaType;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.http.JettyUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.SignalContainerCommand;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
import org.apache.hadoop.yarn.webapp.GenericExceptionHandler;
import org.apache.hadoop.yarn.webapp.GuiceServletConfig;
import org.apache.hadoop.yarn.webapp.JerseyTestBase;
import org.eclipse.jetty.server.Response;
import org.junit.Before;
import org.junit.Test;
import com.google.inject.Guice;
import com.google.inject.servlet.ServletModule;
import com.sun.jersey.api.client.ClientResponse;
import com.sun.jersey.api.client.WebResource;
import com.sun.jersey.guice.spi.container.servlet.GuiceContainer;
import com.sun.jersey.test.framework.WebAppDescriptor;
/**
* Testing containers REST API.
*/
public class TestRMWebServicesContainers extends JerseyTestBase {
private static MockRM rm;
private static String userName;
private static class WebServletModule extends ServletModule {
@Override
protected void configureServlets() {
bind(JAXBContextResolver.class);
bind(RMWebServices.class);
bind(GenericExceptionHandler.class);
try {
userName = UserGroupInformation.getCurrentUser().getShortUserName();
} catch (IOException ioe) {
throw new RuntimeException("Unable to get current user name "
+ ioe.getMessage(), ioe);
}
Configuration conf = new Configuration();
conf.setClass(YarnConfiguration.RM_SCHEDULER, FifoScheduler.class,
ResourceScheduler.class);
conf.set(YarnConfiguration.YARN_ADMIN_ACL, userName);
rm = new MockRM(conf);
bind(ResourceManager.class).toInstance(rm);
serve("/*").with(GuiceContainer.class);
filter("/*").through(TestRMWebServicesAppsModification
.TestRMCustomAuthFilter.class);
}
}
static {
GuiceServletConfig.setInjector(
Guice.createInjector(new WebServletModule()));
}
@Before
@Override
public void setUp() throws Exception {
super.setUp();
GuiceServletConfig.setInjector(
Guice.createInjector(new WebServletModule()));
}
public TestRMWebServicesContainers() {
super(new WebAppDescriptor.Builder(
"org.apache.hadoop.yarn.server.resourcemanager.webapp")
.contextListenerClass(GuiceServletConfig.class)
.filterClass(com.google.inject.servlet.GuiceFilter.class)
.contextPath("jersey-guice-filter").servletPath("/").build());
}
@Test
public void testSignalContainer() throws Exception {
rm.start();
MockNM nm = rm.registerNode("127.0.0.1:1234", 2048);
RMApp app = rm.submitApp(1024);
nm.nodeHeartbeat(true);
MockRM
.waitForState(app.getCurrentAppAttempt(), RMAppAttemptState.ALLOCATED);
rm.sendAMLaunched(app.getCurrentAppAttempt().getAppAttemptId());
WebResource r = resource();
// test error command
ClientResponse response =
r.path("ws").path("v1").path("cluster").path("containers").path(
app.getCurrentAppAttempt().getMasterContainer().getId().toString())
.path("signal")
.path("not-exist-signal")
.queryParam("user.name", userName)
.accept(MediaType.APPLICATION_JSON).post(ClientResponse.class);
assertEquals(MediaType.APPLICATION_JSON_TYPE + "; " + JettyUtils.UTF_8,
response.getType().toString());
assertEquals(Response.SC_BAD_REQUEST, response.getStatus());
assertTrue(response.getEntity(String.class)
.contains("Invalid command: NOT-EXIST-SIGNAL"));
// test error containerId
response =
r.path("ws").path("v1").path("cluster").path("containers").path("XXX")
.path("signal")
.path(SignalContainerCommand.OUTPUT_THREAD_DUMP.name())
.queryParam("user.name", userName)
.accept(MediaType.APPLICATION_JSON).post(ClientResponse.class);
assertEquals(MediaType.APPLICATION_JSON_TYPE + "; " + JettyUtils.UTF_8,
response.getType().toString());
assertEquals(Response.SC_INTERNAL_SERVER_ERROR, response.getStatus());
assertTrue(
response.getEntity(String.class).contains("Invalid ContainerId"));
// test correct signal
response =
r.path("ws").path("v1").path("cluster").path("containers").path(
app.getCurrentAppAttempt().getMasterContainer().getId().toString())
.path("signal")
.path(SignalContainerCommand.OUTPUT_THREAD_DUMP.name())
.queryParam("user.name", userName)
.accept(MediaType.APPLICATION_JSON).post(ClientResponse.class);
assertEquals(MediaType.APPLICATION_JSON_TYPE + "; " + JettyUtils.UTF_8,
response.getType().toString());
assertEquals(Response.SC_OK, response.getStatus());
rm.stop();
}
}

View File

@ -543,4 +543,13 @@ public void setNextInterceptor(RESTRequestInterceptor next) {
+ "is correct"); + "is correct");
} }
@Override
public Response signalToContainer(String containerId, String command,
HttpServletRequest req) throws AuthorizationException {
return RouterWebServiceUtil
.genericForward(webAppAddress, req, Response.class, HTTPMethods.POST,
RMWSConsts.RM_WEB_SERVICE_PATH + "/" + RMWSConsts.CONTAINERS + "/"
+ containerId + "/" + RMWSConsts.SIGNAL + "/" + command, null,
null);
}
} }

View File

@ -1342,6 +1342,12 @@ public void setNextInterceptor(RESTRequestInterceptor next) {
+ "is correct"); + "is correct");
} }
@Override
public Response signalToContainer(String containerId, String command,
HttpServletRequest req) {
throw new NotImplementedException("Code is not implemented");
}
@Override @Override
public void shutdown() { public void shutdown() {
if (threadpool != null) { if (threadpool != null) {

View File

@ -934,4 +934,18 @@ protected void setResponse(HttpServletResponse response) {
this.response = response; this.response = response;
} }
@POST
@Path(RMWSConsts.SIGNAL_TO_CONTAINER)
@Produces({ MediaType.APPLICATION_JSON + "; " + JettyUtils.UTF_8,
MediaType.APPLICATION_XML + "; " + JettyUtils.UTF_8 })
public Response signalToContainer(
@PathParam(RMWSConsts.CONTAINERID) String containerId,
@PathParam(RMWSConsts.COMMAND) String command,
@Context HttpServletRequest req)
throws AuthorizationException {
init();
RequestInterceptorChainWrapper pipeline = getInterceptorChain(req);
return pipeline.getRootInterceptor()
.signalToContainer(containerId, command, req);
}
} }

View File

@ -359,4 +359,10 @@ public ContainerInfo getContainer(HttpServletRequest req,
String containerId) { String containerId) {
return new ContainerInfo(); return new ContainerInfo();
} }
@Override
public Response signalToContainer(String containerId, String command,
HttpServletRequest req) {
return Response.status(Status.OK).build();
}
} }

View File

@ -363,4 +363,10 @@ public Response updateApplicationTimeout(AppTimeoutInfo appTimeout,
return getNextInterceptor().updateApplicationTimeout(appTimeout, hsr, return getNextInterceptor().updateApplicationTimeout(appTimeout, hsr,
appId); appId);
} }
@Override
public Response signalToContainer(String containerId,
String command, HttpServletRequest req) throws AuthorizationException {
return getNextInterceptor().signalToContainer(containerId, command, req);
}
} }

View File

@ -5291,3 +5291,57 @@ Response Header:
HTTP/1.1 200 OK HTTP/1.1 200 OK
Content-Type: application/xml Content-Type: application/xml
Transfer-Encoding: chunked Transfer-Encoding: chunked
Cluster Container Signal API
--------------------------------
With the Container Signal API, you can send a signal to a specified container with one of the following commands: OUTPUT_THREAD_DUMP, GRACEFUL_SHUTDOWN and FORCEFUL_SHUTDOWN.
### URI
http://rm-http-address:port/ws/v1/cluster/containers/{containerId}/signal/{command}
### HTTP Operations Supported
POST
### Query Parameters Supported
None
### Response Examples
**JSON response**
HTTP Request:
POST http://rm-http-address:port/ws/v1/cluster/containers/container_1531404209605_0008_01_000001/signal/OUTPUT_THREAD_DUMP
Accept: application/json
Content-Type: application/json
Response Header:
HTTP/1.1 200 OK
Content-Type: application/json
Transfer-Encoding: chunked
Server: Jetty(6.1.26)
No response body.
**XML response**
HTTP Request:
POST http://rm-http-address:port/ws/v1/cluster/containers/container_1531404209605_0008_01_000001/signal/OUTPUT_THREAD_DUMP
Accept: application/xml
Content-Type: application/xml
Response Header:
HTTP/1.1 200 OK
Content-Type: application/xml
Content-Length: 552
Server: Jetty(6.1.26)
No response body.