YARN-8973. [Router] Add missing methods in RMWebProtocol. (#4664)
This commit is contained in:
parent
57da4bb0a1
commit
1f0a71a92b
|
@ -62,6 +62,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceInfo;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceOptionInfo;
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceOptionInfo;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.BulkActivitiesInfo;
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.BulkActivitiesInfo;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerTypeInfo;
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerTypeInfo;
|
||||||
|
import org.apache.hadoop.yarn.webapp.dao.SchedConfUpdateInfo;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* <p>
|
* <p>
|
||||||
|
@ -745,4 +746,31 @@ public interface RMWebServiceProtocol {
|
||||||
*/
|
*/
|
||||||
Response signalToContainer(String containerId, String command,
|
Response signalToContainer(String containerId, String command,
|
||||||
HttpServletRequest req) throws AuthorizationException;
|
HttpServletRequest req) throws AuthorizationException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This method updates the Scheduler configuration, and it is reachable by
|
||||||
|
* using {@link RMWSConsts#SCHEDULER_CONF}.
|
||||||
|
*
|
||||||
|
* @param mutationInfo th information for making scheduler configuration
|
||||||
|
* changes (supports adding, removing, or updating a queue, as well
|
||||||
|
* as global scheduler conf changes)
|
||||||
|
* @param hsr the servlet request
|
||||||
|
* @return Response containing the status code
|
||||||
|
* @throws AuthorizationException if the user is not authorized to invoke this
|
||||||
|
* method
|
||||||
|
* @throws InterruptedException if interrupted
|
||||||
|
*/
|
||||||
|
Response updateSchedulerConfiguration(SchedConfUpdateInfo mutationInfo,
|
||||||
|
HttpServletRequest hsr) throws AuthorizationException, InterruptedException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This method retrieves all the Scheduler configuration, and it is reachable
|
||||||
|
* by using {@link RMWSConsts#SCHEDULER_CONF}.
|
||||||
|
*
|
||||||
|
* @param hsr the servlet request
|
||||||
|
* @return Response containing the status code
|
||||||
|
* @throws AuthorizationException if the user is not authorized to invoke this
|
||||||
|
* method.
|
||||||
|
*/
|
||||||
|
Response getSchedulerConfiguration(HttpServletRequest hsr) throws AuthorizationException;
|
||||||
}
|
}
|
||||||
|
|
|
@ -66,6 +66,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerTypeInf
|
||||||
import org.apache.hadoop.yarn.server.webapp.dao.AppAttemptInfo;
|
import org.apache.hadoop.yarn.server.webapp.dao.AppAttemptInfo;
|
||||||
import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo;
|
import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo;
|
||||||
import org.apache.hadoop.yarn.server.webapp.dao.ContainersInfo;
|
import org.apache.hadoop.yarn.server.webapp.dao.ContainersInfo;
|
||||||
|
import org.apache.hadoop.yarn.webapp.dao.SchedConfUpdateInfo;
|
||||||
import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
|
import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -565,6 +566,25 @@ public class DefaultRequestInterceptorREST
|
||||||
null, null, getConf(), client);
|
null, null, getConf(), client);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Response updateSchedulerConfiguration(SchedConfUpdateInfo mutationInfo,
|
||||||
|
HttpServletRequest req)
|
||||||
|
throws AuthorizationException, InterruptedException {
|
||||||
|
return RouterWebServiceUtil.genericForward(webAppAddress, req,
|
||||||
|
Response.class, HTTPMethods.PUT,
|
||||||
|
RMWSConsts.RM_WEB_SERVICE_PATH + RMWSConsts.SCHEDULER_CONF,
|
||||||
|
mutationInfo, null, getConf(), client);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Response getSchedulerConfiguration(HttpServletRequest req)
|
||||||
|
throws AuthorizationException {
|
||||||
|
return RouterWebServiceUtil.genericForward(webAppAddress, req,
|
||||||
|
Response.class, HTTPMethods.GET,
|
||||||
|
RMWSConsts.RM_WEB_SERVICE_PATH + RMWSConsts.SCHEDULER_CONF,
|
||||||
|
null, null, getConf(), client);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void setNextInterceptor(RESTRequestInterceptor next) {
|
public void setNextInterceptor(RESTRequestInterceptor next) {
|
||||||
throw new YarnRuntimeException("setNextInterceptor is being called on "
|
throw new YarnRuntimeException("setNextInterceptor is being called on "
|
||||||
|
|
|
@ -101,6 +101,7 @@ import org.apache.hadoop.yarn.server.router.clientrm.ClientMethod;
|
||||||
import org.apache.hadoop.yarn.server.webapp.dao.AppAttemptInfo;
|
import org.apache.hadoop.yarn.server.webapp.dao.AppAttemptInfo;
|
||||||
import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo;
|
import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo;
|
||||||
import org.apache.hadoop.yarn.server.webapp.dao.ContainersInfo;
|
import org.apache.hadoop.yarn.server.webapp.dao.ContainersInfo;
|
||||||
|
import org.apache.hadoop.yarn.webapp.dao.SchedConfUpdateInfo;
|
||||||
import org.apache.hadoop.yarn.util.Clock;
|
import org.apache.hadoop.yarn.util.Clock;
|
||||||
import org.apache.hadoop.yarn.util.MonotonicClock;
|
import org.apache.hadoop.yarn.util.MonotonicClock;
|
||||||
import org.apache.hadoop.yarn.webapp.NotFoundException;
|
import org.apache.hadoop.yarn.webapp.NotFoundException;
|
||||||
|
@ -1417,6 +1418,19 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
|
||||||
throw new NotImplementedException("Code is not implemented");
|
throw new NotImplementedException("Code is not implemented");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Response updateSchedulerConfiguration(SchedConfUpdateInfo mutationInfo,
|
||||||
|
HttpServletRequest hsr)
|
||||||
|
throws AuthorizationException, InterruptedException {
|
||||||
|
throw new NotImplementedException("Code is not implemented");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Response getSchedulerConfiguration(HttpServletRequest hsr)
|
||||||
|
throws AuthorizationException {
|
||||||
|
throw new NotImplementedException("Code is not implemented");
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void setNextInterceptor(RESTRequestInterceptor next) {
|
public void setNextInterceptor(RESTRequestInterceptor next) {
|
||||||
throw new YarnRuntimeException("setNextInterceptor is being called on "
|
throw new YarnRuntimeException("setNextInterceptor is being called on "
|
||||||
|
|
|
@ -89,6 +89,7 @@ import org.apache.hadoop.yarn.server.router.Router;
|
||||||
import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo;
|
import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo;
|
||||||
import org.apache.hadoop.yarn.server.webapp.dao.ContainersInfo;
|
import org.apache.hadoop.yarn.server.webapp.dao.ContainersInfo;
|
||||||
import org.apache.hadoop.yarn.util.LRUCacheHashMap;
|
import org.apache.hadoop.yarn.util.LRUCacheHashMap;
|
||||||
|
import org.apache.hadoop.yarn.webapp.dao.SchedConfUpdateInfo;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
@ -956,6 +957,33 @@ public class RouterWebServices implements RMWebServiceProtocol {
|
||||||
appAttemptId, containerId);
|
appAttemptId, containerId);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@PUT
|
||||||
|
@Path(RMWSConsts.SCHEDULER_CONF)
|
||||||
|
@Produces({ MediaType.APPLICATION_JSON + "; " + JettyUtils.UTF_8,
|
||||||
|
MediaType.APPLICATION_XML + "; " + JettyUtils.UTF_8 })
|
||||||
|
@Consumes({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
|
||||||
|
@Override
|
||||||
|
public Response updateSchedulerConfiguration(SchedConfUpdateInfo mutationInfo,
|
||||||
|
HttpServletRequest hsr)
|
||||||
|
throws AuthorizationException, InterruptedException {
|
||||||
|
init();
|
||||||
|
RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr);
|
||||||
|
return pipeline.getRootInterceptor()
|
||||||
|
.updateSchedulerConfiguration(mutationInfo, hsr);
|
||||||
|
}
|
||||||
|
|
||||||
|
@GET
|
||||||
|
@Path(RMWSConsts.SCHEDULER_CONF)
|
||||||
|
@Produces({ MediaType.APPLICATION_JSON + "; " + JettyUtils.UTF_8,
|
||||||
|
MediaType.APPLICATION_XML + "; " + JettyUtils.UTF_8 })
|
||||||
|
@Override
|
||||||
|
public Response getSchedulerConfiguration(HttpServletRequest hsr)
|
||||||
|
throws AuthorizationException {
|
||||||
|
init();
|
||||||
|
RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr);
|
||||||
|
return pipeline.getRootInterceptor().getSchedulerConfiguration(hsr);
|
||||||
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
protected void setResponse(HttpServletResponse response) {
|
protected void setResponse(HttpServletResponse response) {
|
||||||
this.response = response;
|
this.response = response;
|
||||||
|
|
|
@ -395,4 +395,16 @@ public abstract class BaseRouterWebServicesTest {
|
||||||
when(request.getRemoteUser()).thenReturn(user);
|
when(request.getRemoteUser()).thenReturn(user);
|
||||||
return request;
|
return request;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected Response updateSchedulerConfiguration(String user)
|
||||||
|
throws IOException, InterruptedException {
|
||||||
|
return routerWebService.updateSchedulerConfiguration(null,
|
||||||
|
createHttpServletRequest(user));
|
||||||
|
}
|
||||||
|
|
||||||
|
protected Response getSchedulerConfiguration(String user)
|
||||||
|
throws IOException, InterruptedException {
|
||||||
|
return routerWebService.
|
||||||
|
getSchedulerConfiguration(createHttpServletRequest(user));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -62,6 +62,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerTypeInf
|
||||||
import org.apache.hadoop.yarn.server.webapp.dao.AppAttemptInfo;
|
import org.apache.hadoop.yarn.server.webapp.dao.AppAttemptInfo;
|
||||||
import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo;
|
import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo;
|
||||||
import org.apache.hadoop.yarn.server.webapp.dao.ContainersInfo;
|
import org.apache.hadoop.yarn.server.webapp.dao.ContainersInfo;
|
||||||
|
import org.apache.hadoop.yarn.webapp.dao.SchedConfUpdateInfo;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This class mocks the RESTRequestInterceptor.
|
* This class mocks the RESTRequestInterceptor.
|
||||||
|
@ -373,4 +374,16 @@ public class MockRESTRequestInterceptor extends AbstractRESTRequestInterceptor {
|
||||||
HttpServletRequest req) {
|
HttpServletRequest req) {
|
||||||
return Response.status(Status.OK).build();
|
return Response.status(Status.OK).build();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Response updateSchedulerConfiguration(SchedConfUpdateInfo mutationInfo,
|
||||||
|
HttpServletRequest hsr) throws AuthorizationException, InterruptedException {
|
||||||
|
return Response.status(Status.OK).build();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Response getSchedulerConfiguration(HttpServletRequest hsr)
|
||||||
|
throws AuthorizationException {
|
||||||
|
return Response.status(Status.OK).build();
|
||||||
|
}
|
||||||
}
|
}
|
|
@ -60,6 +60,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerTypeInf
|
||||||
import org.apache.hadoop.yarn.server.webapp.dao.AppAttemptInfo;
|
import org.apache.hadoop.yarn.server.webapp.dao.AppAttemptInfo;
|
||||||
import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo;
|
import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo;
|
||||||
import org.apache.hadoop.yarn.server.webapp.dao.ContainersInfo;
|
import org.apache.hadoop.yarn.server.webapp.dao.ContainersInfo;
|
||||||
|
import org.apache.hadoop.yarn.webapp.dao.SchedConfUpdateInfo;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Mock interceptor that does not do anything other than forwarding it to the
|
* Mock interceptor that does not do anything other than forwarding it to the
|
||||||
|
@ -379,4 +380,17 @@ public class PassThroughRESTRequestInterceptor
|
||||||
String command, HttpServletRequest req) throws AuthorizationException {
|
String command, HttpServletRequest req) throws AuthorizationException {
|
||||||
return getNextInterceptor().signalToContainer(containerId, command, req);
|
return getNextInterceptor().signalToContainer(containerId, command, req);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Response updateSchedulerConfiguration(SchedConfUpdateInfo mutationInfo,
|
||||||
|
HttpServletRequest hsr)
|
||||||
|
throws AuthorizationException, InterruptedException {
|
||||||
|
return getNextInterceptor().updateSchedulerConfiguration(mutationInfo, hsr);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Response getSchedulerConfiguration(HttpServletRequest hsr)
|
||||||
|
throws AuthorizationException {
|
||||||
|
return getNextInterceptor().getSchedulerConfiguration(hsr);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -200,6 +200,12 @@ public class TestRouterWebServices extends BaseRouterWebServicesTest {
|
||||||
|
|
||||||
ContainerInfo containerInfo = getContainer(user);
|
ContainerInfo containerInfo = getContainer(user);
|
||||||
Assert.assertNotNull(containerInfo);
|
Assert.assertNotNull(containerInfo);
|
||||||
|
|
||||||
|
Response response19 = updateSchedulerConfiguration(user);
|
||||||
|
Assert.assertNotNull(response19);
|
||||||
|
|
||||||
|
Response response20 = getSchedulerConfiguration(user);
|
||||||
|
Assert.assertNotNull(response20);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
Loading…
Reference in New Issue