YARN-11226. [Federation] Add createNewReservation, submitReservation, updateReservation, deleteReservation REST APIs for Router. (#5175)

slfan1989 2022-12-23 03:25:09 +08:00 committed by GitHub
parent e6056d128a
commit 17035da46e
9 changed files with 1052 additions and 97 deletions


@ -1062,4 +1062,93 @@ public final class FederationStateStoreFacade {
updateApplicationHomeSubCluster(subClusterId, applicationId, appHomeSubCluster);
}
}
/**
* Exists ReservationHomeSubCluster Mapping.
*
* @param reservationId reservationId
* @return true - exist, false - not exist
*/
public boolean existsReservationHomeSubCluster(ReservationId reservationId) {
try {
SubClusterId subClusterId = getReservationHomeSubCluster(reservationId);
if (subClusterId != null) {
return true;
}
} catch (YarnException e) {
LOG.warn("get homeSubCluster by reservationId = {} error.", reservationId, e);
}
return false;
}
/**
* Save Reservation And HomeSubCluster Mapping.
*
* @param reservationId reservationId
* @param homeSubCluster homeSubCluster
* @throws YarnException on failure
*/
public void addReservationHomeSubCluster(ReservationId reservationId,
ReservationHomeSubCluster homeSubCluster) throws YarnException {
try {
// persist the mapping of reservationId and the subClusterId which has
// been selected as its home
addReservationHomeSubCluster(homeSubCluster);
} catch (YarnException e) {
String msg = String.format(
"Unable to insert the ReservationId %s into the FederationStateStore.", reservationId);
throw new YarnException(msg, e);
}
}
/**
* Update Reservation And HomeSubCluster Mapping.
*
* @param subClusterId subClusterId
* @param reservationId reservationId
* @param homeSubCluster homeSubCluster
* @throws YarnException on failure
*/
public void updateReservationHomeSubCluster(SubClusterId subClusterId,
ReservationId reservationId, ReservationHomeSubCluster homeSubCluster) throws YarnException {
try {
// update the mapping of reservationId and the home subClusterId to
// the new subClusterId we have selected
updateReservationHomeSubCluster(homeSubCluster);
} catch (YarnException e) {
SubClusterId subClusterIdInStateStore = getReservationHomeSubCluster(reservationId);
if (subClusterId == subClusterIdInStateStore) {
LOG.info("Reservation {} already submitted on SubCluster {}.", reservationId, subClusterId);
} else {
String msg = String.format(
"Unable to update the ReservationId %s into the FederationStateStore.", reservationId);
throw new YarnException(msg, e);
}
}
}
/**
* Add or Update ReservationHomeSubCluster.
*
* @param reservationId reservationId.
* @param subClusterId homeSubClusterId, this is selected by strategy.
* @param retryCount number of retries.
* @throws YarnException yarn exception.
*/
public void addOrUpdateReservationHomeSubCluster(ReservationId reservationId,
SubClusterId subClusterId, int retryCount) throws YarnException {
Boolean exists = existsReservationHomeSubCluster(reservationId);
ReservationHomeSubCluster reservationHomeSubCluster =
ReservationHomeSubCluster.newInstance(reservationId, subClusterId);
if (!exists || retryCount == 0) {
// persist the mapping of reservationId and the subClusterId which has
// been selected as its home.
addReservationHomeSubCluster(reservationId, reservationHomeSubCluster);
} else {
// update the mapping of reservationId and the home subClusterId to
// the new subClusterId we have selected.
updateReservationHomeSubCluster(subClusterId, reservationId,
reservationHomeSubCluster);
}
}
}
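For orientation only (an editor's sketch, not part of this commit): addOrUpdateReservationHomeSubCluster inserts the reservation-to-home-sub-cluster mapping on the first attempt (retryCount == 0) and switches to an update on later retries, so a resubmission against a different sub-cluster simply repoints the existing mapping. A minimal hypothetical caller, assuming the facade singleton accessor and made-up sub-cluster ids, and ignoring the YarnException the method may throw:

// Hypothetical usage of the facade method added above (ids are illustrative).
FederationStateStoreFacade facade = FederationStateStoreFacade.getInstance();
ReservationId reservationId = ReservationId.newInstance(System.currentTimeMillis(), 1);
// First attempt: no mapping exists yet, so this inserts reservationId -> SC-1.
facade.addOrUpdateReservationHomeSubCluster(reservationId, SubClusterId.newInstance("SC-1"), 0);
// A retry against another sub-cluster (retryCount > 0) updates the stored mapping to SC-2.
facade.addOrUpdateReservationHomeSubCluster(reservationId, SubClusterId.newInstance("SC-2"), 1);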


@ -27,6 +27,16 @@ import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.ReservationRequest;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ReservationRequests;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationDefinitionInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationRequestsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationRequestInfo;
import org.apache.hadoop.yarn.api.records.ReservationDefinition;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceInfo;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
@ -57,6 +67,8 @@ public final class RouterServerUtil {
private static final String EPOCH_PREFIX = "e";
private static final String RESERVEIDSTR_PREFIX = "reservation_";
/** Disable constructor. */
private RouterServerUtil() {
}
@ -494,6 +506,15 @@ public final class RouterServerUtil {
? token.decodeIdentifier().getRenewer().toString() : user.getShortUserName();
}
/**
* Set User information.
*
* If the username is empty, we will use the Yarn Router user directly.
* Do not create a proxy user if userName matches the userName on current UGI.
*
* @param userName userName.
* @return UserGroupInformation.
*/
public static UserGroupInformation setupUser(final String userName) {
UserGroupInformation user = null;
try {
@ -513,7 +534,94 @@ public final class RouterServerUtil {
return user;
} catch (IOException e) {
throw RouterServerUtil.logAndReturnYarnRunTimeException(e,
"Error while creating Router Service for user : %s.", user);
}
}
/**
* Check reservationId is accurate.
*
* We need to ensure that reservationId cannot be empty and
* can be converted to ReservationId object normally.
*
* @param reservationId reservationId.
* @throws IllegalArgumentException If the format of the reservationId is not accurate,
* an IllegalArgumentException needs to be thrown.
*/
@Public
@Unstable
public static void validateReservationId(String reservationId) throws IllegalArgumentException {
if (reservationId == null || reservationId.isEmpty()) {
throw new IllegalArgumentException("Parameter error, the reservationId is empty or null.");
}
if (!reservationId.startsWith(RESERVEIDSTR_PREFIX)) {
throw new IllegalArgumentException("Invalid ReservationId: " + reservationId);
}
String[] resFields = reservationId.split("_");
if (resFields.length != 3) {
throw new IllegalArgumentException("Invalid ReservationId: " + reservationId);
}
String clusterTimestamp = resFields[1];
String id = resFields[2];
if (!NumberUtils.isDigits(id) || !NumberUtils.isDigits(clusterTimestamp)) {
throw new IllegalArgumentException("Invalid ReservationId: " + reservationId);
}
}
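As a quick illustration of the format being enforced (editor's example, not committed code), a ReservationId string is the "reservation_" prefix followed by a numeric cluster timestamp and a numeric sequence number:

// Illustrative inputs; the ids below are made up.
RouterServerUtil.validateReservationId("reservation_1671234567890_0001"); // passes
try {
RouterServerUtil.validateReservationId("reservation_abc_0001"); // non-numeric timestamp
} catch (IllegalArgumentException e) {
// expected: "Invalid ReservationId: reservation_abc_0001"
}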
/**
* Convert ReservationDefinitionInfo to ReservationDefinition.
*
* @param definitionInfo ReservationDefinitionInfo Object.
* @return ReservationDefinition.
*/
public static ReservationDefinition convertReservationDefinition(
ReservationDefinitionInfo definitionInfo) {
if (definitionInfo == null || definitionInfo.getReservationRequests() == null
|| definitionInfo.getReservationRequests().getReservationRequest() == null
|| definitionInfo.getReservationRequests().getReservationRequest().isEmpty()) {
throw new RuntimeException("definitionInfo Or ReservationRequests is Null.");
}
// basic variable
long arrival = definitionInfo.getArrival();
long deadline = definitionInfo.getDeadline();
// ReservationRequests reservationRequests
String name = definitionInfo.getReservationName();
String recurrenceExpression = definitionInfo.getRecurrenceExpression();
Priority priority = Priority.newInstance(definitionInfo.getPriority());
// reservation requests info
List<ReservationRequest> reservationRequestList = new ArrayList<>();
ReservationRequestsInfo reservationRequestsInfo = definitionInfo.getReservationRequests();
List<ReservationRequestInfo> reservationRequestInfos =
reservationRequestsInfo.getReservationRequest();
for (ReservationRequestInfo resRequestInfo : reservationRequestInfos) {
ResourceInfo resourceInfo = resRequestInfo.getCapability();
Resource capability =
Resource.newInstance(resourceInfo.getMemorySize(), resourceInfo.getvCores());
ReservationRequest reservationRequest = ReservationRequest.newInstance(capability,
resRequestInfo.getNumContainers(), resRequestInfo.getMinConcurrency(),
resRequestInfo.getDuration());
reservationRequestList.add(reservationRequest);
}
ReservationRequestInterpreter[] values = ReservationRequestInterpreter.values();
ReservationRequestInterpreter reservationRequestInterpreter =
values[reservationRequestsInfo.getReservationRequestsInterpreter()];
ReservationRequests reservationRequests = ReservationRequests.newInstance(
reservationRequestList, reservationRequestInterpreter);
ReservationDefinition definition = ReservationDefinition.newInstance(
arrival, deadline, reservationRequests, name, recurrenceExpression, priority);
return definition;
}
}
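For reference (an editor's sketch, not part of this commit), a single-request ReservationDefinitionInfo converted by the method above corresponds roughly to this record-level construction, using illustrative values:

// Equivalent construction with the YARN record API (values are examples only).
Resource capability = Resource.newInstance(1024, 1);
ReservationRequest request = ReservationRequest.newInstance(capability, 4, 1, 60 * 1000);
ReservationRequests requests = ReservationRequests.newInstance(
Collections.singletonList(request), ReservationRequestInterpreter.R_ALL);
long arrival = System.currentTimeMillis();
long deadline = arrival + 2 * 60 * 1000;
ReservationDefinition definition = ReservationDefinition.newInstance(
arrival, deadline, requests, "example-reservation", "0", Priority.newInstance(1));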


@ -20,9 +20,7 @@ package org.apache.hadoop.yarn.server.router.webapp;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.server.router.RouterServerUtil;
import java.io.IOException;
/**
* Extends the RequestInterceptor class and provides common functionality which
@ -68,7 +66,7 @@ public abstract class AbstractRESTRequestInterceptor
*/
@Override
public void init(String userName) {
this.user = RouterServerUtil.setupUser(userName);
if (this.nextInterceptor != null) {
this.nextInterceptor.init(userName);
}
@ -92,34 +90,6 @@ public abstract class AbstractRESTRequestInterceptor
return this.nextInterceptor;
}
/**
* Set User information.
*
* If the username is empty, we will use the Yarn Router user directly.
* Do not create a proxy user if user name matches the user name on current UGI.
* @param userName userName.
*/
private void setupUser(final String userName) {
try {
if (userName == null || userName.isEmpty()) {
user = UserGroupInformation.getCurrentUser();
} else if (UserGroupInformation.isSecurityEnabled()) {
user = UserGroupInformation.createProxyUser(userName, UserGroupInformation.getLoginUser());
} else if (userName.equalsIgnoreCase(UserGroupInformation.getCurrentUser().getUserName())) {
user = UserGroupInformation.getCurrentUser();
} else {
user = UserGroupInformation.createProxyUser(userName,
UserGroupInformation.getCurrentUser());
}
} catch (IOException e) {
String message = "Error while creating Router RMAdmin Service for user:";
if (user != null) {
message += ", user: " + user;
}
throw new YarnRuntimeException(message, e);
}
}
public UserGroupInformation getUser() {
return user;
}


@ -51,11 +51,13 @@ import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Sets;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.util.concurrent.HadoopExecutors;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeLabel;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.ReservationDefinition;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
@ -101,6 +103,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceOptionIn
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.BulkActivitiesInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerTypeInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeLabelInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationDefinitionInfo;
import org.apache.hadoop.yarn.server.router.RouterMetrics;
import org.apache.hadoop.yarn.server.router.RouterServerUtil;
import org.apache.hadoop.yarn.server.router.clientrm.ClientMethod;
@ -1588,28 +1591,239 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
@Override
public Response createNewReservation(HttpServletRequest hsr)
throws AuthorizationException, IOException, InterruptedException {
long startTime = clock.getTime();
try {
Map<SubClusterId, SubClusterInfo> subClustersActive =
federationFacade.getSubClusters(true);
// We declare blackList and retries.
List<SubClusterId> blackList = new ArrayList<>();
int actualRetryNums = federationFacade.getRetryNumbers(numSubmitRetries);
Response response = ((FederationActionRetry<Response>) (retryCount) ->
invokeCreateNewReservation(subClustersActive, blackList, hsr, retryCount)).
runWithRetries(actualRetryNums, submitIntervalTime);
// If the response is not empty and the status is SC_OK,
// this request can be returned directly.
if (response != null && response.getStatus() == HttpServletResponse.SC_OK) {
long stopTime = clock.getTime();
routerMetrics.succeededGetNewReservationRetrieved(stopTime - startTime);
return response;
}
} catch (FederationPolicyException e) {
// If a FederationPolicyException is thrown, the service is unavailable.
routerMetrics.incrGetNewReservationFailedRetrieved();
return Response.status(Status.SERVICE_UNAVAILABLE).entity(e.getLocalizedMessage()).build();
} catch (Exception e) {
routerMetrics.incrGetNewReservationFailedRetrieved();
return Response.status(Status.INTERNAL_SERVER_ERROR).entity(e.getLocalizedMessage()).build();
}
// return error message directly.
String errMsg = "Fail to create a new reservation.";
LOG.error(errMsg);
routerMetrics.incrGetNewReservationFailedRetrieved();
return Response.status(Status.INTERNAL_SERVER_ERROR).entity(errMsg).build();
}
private Response invokeCreateNewReservation(Map<SubClusterId, SubClusterInfo> subClustersActive,
List<SubClusterId> blackList, HttpServletRequest hsr, int retryCount)
throws YarnException, IOException, InterruptedException {
SubClusterId subClusterId =
federationFacade.getRandomActiveSubCluster(subClustersActive, blackList);
LOG.info("createNewReservation try #{} on SubCluster {}.", retryCount, subClusterId);
SubClusterInfo subClusterInfo = subClustersActive.get(subClusterId);
DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorForSubCluster(
subClusterId, subClusterInfo.getRMWebServiceAddress());
try {
Response response = interceptor.createNewReservation(hsr);
if (response != null && response.getStatus() == HttpServletResponse.SC_OK) {
return response;
}
} catch (Exception e) {
blackList.add(subClusterId);
RouterServerUtil.logAndThrowException(e.getMessage(), e);
}
// We need to throw the exception directly.
String msg = String.format("Unable to create a new ReservationId in SubCluster %s.",
subClusterId.getId());
throw new YarnException(msg);
}
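The retry wrapper used in createNewReservation above packages one attempt as a FederationActionRetry lambda; condensed to its core (editor's sketch of the same pattern, not additional committed code):

// blackList accumulates sub-clusters that already failed, so each retry
// asks getRandomActiveSubCluster for a different candidate.
List<SubClusterId> blackList = new ArrayList<>();
FederationActionRetry<Response> action = (retryCount) ->
invokeCreateNewReservation(subClustersActive, blackList, hsr, retryCount);
Response response = action.runWithRetries(actualRetryNums, submitIntervalTime);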
@Override
public Response submitReservation(ReservationSubmissionRequestInfo resContext,
HttpServletRequest hsr) throws AuthorizationException, IOException, InterruptedException {
long startTime = clock.getTime();
if (resContext == null || resContext.getReservationId() == null
|| resContext.getReservationDefinition() == null || resContext.getQueue() == null) {
routerMetrics.incrSubmitReservationFailedRetrieved();
String errMsg = "Missing submitReservation resContext or reservationId " +
"or reservation definition or queue.";
return Response.status(Status.BAD_REQUEST).entity(errMsg).build();
}
// Check that the resId format is accurate
String resId = resContext.getReservationId();
try {
RouterServerUtil.validateReservationId(resId);
} catch (IllegalArgumentException e) {
routerMetrics.incrSubmitReservationFailedRetrieved();
throw e;
}
List<SubClusterId> blackList = new ArrayList<>();
try {
int activeSubClustersCount = federationFacade.getActiveSubClustersCount();
int actualRetryNums = Math.min(activeSubClustersCount, numSubmitRetries);
Response response = ((FederationActionRetry<Response>) (retryCount) ->
invokeSubmitReservation(resContext, blackList, hsr, retryCount)).
runWithRetries(actualRetryNums, submitIntervalTime);
if (response != null) {
long stopTime = clock.getTime();
routerMetrics.succeededSubmitReservationRetrieved(stopTime - startTime);
return response;
}
} catch (Exception e) {
routerMetrics.incrSubmitReservationFailedRetrieved();
return Response.status(Status.SERVICE_UNAVAILABLE).entity(e.getLocalizedMessage()).build();
}
routerMetrics.incrSubmitReservationFailedRetrieved();
String msg = String.format("Reservation %s failed to be submitted.", resId);
return Response.status(Status.SERVICE_UNAVAILABLE).entity(msg).build();
}
private Response invokeSubmitReservation(ReservationSubmissionRequestInfo requestContext,
List<SubClusterId> blackList, HttpServletRequest hsr, int retryCount)
throws YarnException, IOException, InterruptedException {
String resId = requestContext.getReservationId();
ReservationId reservationId = ReservationId.parseReservationId(resId);
ReservationDefinitionInfo definitionInfo = requestContext.getReservationDefinition();
ReservationDefinition definition =
RouterServerUtil.convertReservationDefinition(definitionInfo);
// First, Get SubClusterId according to specific strategy.
ReservationSubmissionRequest request = ReservationSubmissionRequest.newInstance(
definition, requestContext.getQueue(), reservationId);
SubClusterId subClusterId = null;
try {
// Get subClusterId from policy.
subClusterId = policyFacade.getReservationHomeSubCluster(request);
// Print the log of submitting the submitApplication.
LOG.info("submitReservation ReservationId {} try #{} on SubCluster {}.", reservationId,
retryCount, subClusterId);
// Step2. We Store the mapping relationship
// between Application and HomeSubCluster in stateStore.
federationFacade.addOrUpdateReservationHomeSubCluster(reservationId,
subClusterId, retryCount);
// Step3. We get subClusterInfo based on subClusterId.
SubClusterInfo subClusterInfo = federationFacade.getSubCluster(subClusterId);
DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorForSubCluster(
subClusterInfo.getSubClusterId(), subClusterInfo.getRMWebServiceAddress());
HttpServletRequest hsrCopy = clone(hsr);
Response response = interceptor.submitReservation(requestContext, hsrCopy);
if (response != null && response.getStatus() == HttpServletResponse.SC_ACCEPTED) {
LOG.info("Reservation {} submitted on subCluster {}.", reservationId, subClusterId);
return response;
}
String msg = String.format("application %s failed to be submitted.", resId);
throw new YarnException(msg);
} catch (Exception e) {
LOG.warn("Unable to submit the reservation {} to SubCluster {}.", resId,
subClusterId, e);
if (subClusterId != null) {
blackList.add(subClusterId);
}
throw e;
}
}
@Override
public Response updateReservation(ReservationUpdateRequestInfo resContext,
HttpServletRequest hsr) throws AuthorizationException, IOException, InterruptedException {
// parameter verification
if (resContext == null || resContext.getReservationId() == null
|| resContext.getReservationDefinition() == null) {
routerMetrics.incrUpdateReservationFailedRetrieved();
String errMsg = "Missing updateReservation resContext or reservationId " +
"or reservation definition.";
return Response.status(Status.BAD_REQUEST).entity(errMsg).build();
}
// get reservationId
String reservationId = resContext.getReservationId();
// Check that the reservationId format is accurate
try {
RouterServerUtil.validateReservationId(reservationId);
} catch (IllegalArgumentException e) {
routerMetrics.incrUpdateReservationFailedRetrieved();
throw e;
}
try {
SubClusterInfo subClusterInfo = getHomeSubClusterInfoByReservationId(reservationId);
DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorForSubCluster(
subClusterInfo.getSubClusterId(), subClusterInfo.getRMWebServiceAddress());
HttpServletRequest hsrCopy = clone(hsr);
Response response = interceptor.updateReservation(resContext, hsrCopy);
if (response != null) {
return response;
}
} catch (Exception e) {
routerMetrics.incrUpdateReservationFailedRetrieved();
RouterServerUtil.logAndThrowRunTimeException("updateReservation Failed.", e);
}
// throw an exception
routerMetrics.incrUpdateReservationFailedRetrieved();
throw new YarnRuntimeException("updateReservation Failed, reservationId = " + reservationId);
}
@Override
public Response deleteReservation(ReservationDeleteRequestInfo resContext,
HttpServletRequest hsr)
throws AuthorizationException, IOException, InterruptedException {
// parameter verification
if (resContext == null || resContext.getReservationId() == null) {
routerMetrics.incrDeleteReservationFailedRetrieved();
String errMsg = "Missing deleteReservation request or reservationId.";
return Response.status(Status.BAD_REQUEST).entity(errMsg).build();
}
// get ReservationId
String reservationId = resContext.getReservationId();
// Check that the reservationId format is accurate
try {
RouterServerUtil.validateReservationId(reservationId);
} catch (IllegalArgumentException e) {
routerMetrics.incrDeleteReservationFailedRetrieved();
throw e;
}
try {
SubClusterInfo subClusterInfo = getHomeSubClusterInfoByReservationId(reservationId);
DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorForSubCluster(
subClusterInfo.getSubClusterId(), subClusterInfo.getRMWebServiceAddress());
HttpServletRequest hsrCopy = clone(hsr);
Response response = interceptor.deleteReservation(resContext, hsrCopy);
if (response != null) {
return response;
}
} catch (Exception e) {
routerMetrics.incrDeleteReservationFailedRetrieved();
RouterServerUtil.logAndThrowRunTimeException("deleteReservation Failed.", e);
}
// throw an exception
routerMetrics.incrDeleteReservationFailedRetrieved();
throw new YarnRuntimeException("deleteReservation Failed, reservationId = " + reservationId);
}
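updateReservation and deleteReservation above share the same routing shape; stripped of metrics and error handling it is roughly (editor's sketch, not the committed code):

// 1. Validate the textual reservationId format.
RouterServerUtil.validateReservationId(reservationId);
// 2. Look up the home sub-cluster recorded for this reservation.
SubClusterInfo subClusterInfo = getHomeSubClusterInfoByReservationId(reservationId);
// 3. Forward a cloned copy of the original HTTP request to that sub-cluster's RM.
DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorForSubCluster(
subClusterInfo.getSubClusterId(), subClusterInfo.getRMWebServiceAddress());
Response response = interceptor.deleteReservation(resContext, clone(hsr));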
@Override
@ -1627,9 +1841,9 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
throw new IllegalArgumentException("Parameter error, the reservationId is empty or null.");
}
// Check that the reservationId format is accurate
try {
RouterServerUtil.validateReservationId(reservationId);
} catch (IllegalArgumentException e) {
routerMetrics.incrListReservationFailedRetrieved();
throw e;
@ -2190,6 +2404,10 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
}
@VisibleForTesting
public Map<SubClusterId, DefaultRequestInterceptorREST> getInterceptors() {
return interceptors;
}
public void setAllowPartialResult(boolean allowPartialResult) {
this.allowPartialResult = allowPartialResult;
}


@ -0,0 +1,125 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.router;
import org.apache.hadoop.test.LambdaTestUtils;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.ReservationDefinition;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.ReservationRequests;
import org.apache.hadoop.yarn.api.records.ReservationRequest;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationDefinitionInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationRequestInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationRequestsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationSubmissionRequestInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceInfo;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import static org.apache.hadoop.yarn.server.router.webapp.TestFederationInterceptorREST.getReservationSubmissionRequestInfo;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
public class TestRouterServerUtil {
public static final Logger LOG = LoggerFactory.getLogger(TestRouterServerUtil.class);
@Test
public void testConvertReservationDefinition() {
// Prepare parameters
ReservationId reservationId = ReservationId.newInstance(Time.now(), 1);
ReservationSubmissionRequestInfo requestInfo =
getReservationSubmissionRequestInfo(reservationId);
ReservationDefinitionInfo expectDefinitionInfo = requestInfo.getReservationDefinition();
// ReservationDefinitionInfo conversion ReservationDefinition
ReservationDefinition convertDefinition =
RouterServerUtil.convertReservationDefinition(expectDefinitionInfo);
// reservationDefinition is not null
assertNotNull(convertDefinition);
assertEquals(expectDefinitionInfo.getArrival(), convertDefinition.getArrival());
assertEquals(expectDefinitionInfo.getDeadline(), convertDefinition.getDeadline());
Priority priority = convertDefinition.getPriority();
assertNotNull(priority);
assertEquals(expectDefinitionInfo.getPriority(), priority.getPriority());
assertEquals(expectDefinitionInfo.getRecurrenceExpression(),
convertDefinition.getRecurrenceExpression());
assertEquals(expectDefinitionInfo.getReservationName(), convertDefinition.getReservationName());
ReservationRequestsInfo expectRequestsInfo = expectDefinitionInfo.getReservationRequests();
List<ReservationRequestInfo> expectRequestsInfoList =
expectRequestsInfo.getReservationRequest();
ReservationRequests convertReservationRequests =
convertDefinition.getReservationRequests();
assertNotNull(convertReservationRequests);
List<ReservationRequest> convertRequestList =
convertReservationRequests.getReservationResources();
assertNotNull(convertRequestList);
assertEquals(1, convertRequestList.size());
ReservationRequestInfo expectResRequestInfo = expectRequestsInfoList.get(0);
ReservationRequest convertResRequest = convertRequestList.get(0);
assertNotNull(convertResRequest);
assertEquals(expectResRequestInfo.getNumContainers(), convertResRequest.getNumContainers());
assertEquals(expectResRequestInfo.getDuration(), convertResRequest.getDuration());
ResourceInfo expectResourceInfo = expectResRequestInfo.getCapability();
Resource convertResource = convertResRequest.getCapability();
assertNotNull(expectResourceInfo);
assertEquals(expectResourceInfo.getMemorySize(), convertResource.getMemorySize());
assertEquals(expectResourceInfo.getvCores(), convertResource.getVirtualCores());
}
@Test
public void testConvertReservationDefinitionEmpty() throws Exception {
// param ReservationDefinitionInfo is Null
ReservationDefinitionInfo definitionInfo = null;
// null request1
LambdaTestUtils.intercept(RuntimeException.class,
"definitionInfo Or ReservationRequests is Null.",
() -> RouterServerUtil.convertReservationDefinition(definitionInfo));
// param ReservationRequests is Null
ReservationDefinitionInfo definitionInfo2 = new ReservationDefinitionInfo();
// null request2
LambdaTestUtils.intercept(RuntimeException.class,
"definitionInfo Or ReservationRequests is Null.",
() -> RouterServerUtil.convertReservationDefinition(definitionInfo2));
// param ReservationRequests is Null
ReservationDefinitionInfo definitionInfo3 = new ReservationDefinitionInfo();
ReservationRequestsInfo requestsInfo = new ReservationRequestsInfo();
definitionInfo3.setReservationRequests(requestsInfo);
// null request3
LambdaTestUtils.intercept(RuntimeException.class,
"definitionInfo Or ReservationRequests is Null.",
() -> RouterServerUtil.convertReservationDefinition(definitionInfo3));
}
}


@ -32,6 +32,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.authorize.AuthorizationException;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ActivitiesInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppActivitiesInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppAttemptsInfo;
@ -74,10 +75,17 @@ public abstract class BaseRouterWebServicesTest {
private Router router;
public final static int TEST_MAX_CACHE_SIZE = 10;
public static final String QUEUE_DEFAULT = "default";
public static final String QUEUE_DEFAULT_FULL = CapacitySchedulerConfiguration.ROOT +
CapacitySchedulerConfiguration.DOT + QUEUE_DEFAULT;
public static final String QUEUE_DEDICATED = "dedicated";
public static final String QUEUE_DEDICATED_FULL = CapacitySchedulerConfiguration.ROOT +
CapacitySchedulerConfiguration.DOT + QUEUE_DEDICATED;
private RouterWebServices routerWebService;
@Before
public void setUp() throws YarnException, IOException {
this.conf = createConfiguration();
router = spy(new Router());


@ -27,7 +27,9 @@ import java.util.Map;
import java.util.HashMap;
import java.util.Collections;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
@ -40,6 +42,7 @@ import javax.ws.rs.core.Response.Status;
import org.apache.commons.lang3.EnumUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.security.UserGroupInformation;
@ -47,6 +50,11 @@ import org.apache.hadoop.security.authorize.AuthorizationException;
import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableSet;
import org.apache.hadoop.util.Sets;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationListRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationListResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.Resource;
@ -65,10 +73,11 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType;
import org.apache.hadoop.yarn.api.records.ApplicationTimeout;
import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter;
import org.apache.hadoop.yarn.api.records.ReservationRequest;
import org.apache.hadoop.yarn.api.records.ReservationRequests;
import org.apache.hadoop.yarn.api.records.ReservationDefinition;
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationListRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationListResponse;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
import org.apache.hadoop.yarn.exceptions.YarnException;
@ -78,7 +87,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystemTestUtil;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
@ -117,10 +125,19 @@ import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppQueue;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.StatisticsItemInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ApplicationStatisticsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppActivitiesInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NewReservation;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationSubmissionRequestInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationUpdateRequestInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationDeleteRequestInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationListInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationDefinitionInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationRequestInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationRequestsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationUpdateResponseInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationDeleteResponseInfo;
import org.apache.hadoop.yarn.server.router.RouterServerUtil;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.PartitionInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.RMQueueAclInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebServices;
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.server.webapp.dao.AppAttemptInfo;
import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo;
@ -134,6 +151,11 @@ import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.hadoop.yarn.server.router.webapp.BaseRouterWebServicesTest.QUEUE_DEFAULT;
import static org.apache.hadoop.yarn.server.router.webapp.BaseRouterWebServicesTest.QUEUE_DEFAULT_FULL;
import static org.apache.hadoop.yarn.server.router.webapp.BaseRouterWebServicesTest.QUEUE_DEDICATED;
import static org.apache.hadoop.yarn.server.router.webapp.BaseRouterWebServicesTest.QUEUE_DEDICATED_FULL;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@ -153,19 +175,16 @@ public class MockDefaultRequestInterceptorREST
private Map<ApplicationId, ApplicationReport> applicationMap = new HashMap<>();
public static final String APP_STATE_RUNNING = "RUNNING";
private static final String QUEUE_DEFAULT = "default";
private static final String QUEUE_DEFAULT_FULL = CapacitySchedulerConfiguration.ROOT +
CapacitySchedulerConfiguration.DOT + QUEUE_DEFAULT;
private static final String QUEUE_DEDICATED = "dedicated";
public static final String QUEUE_DEDICATED_FULL = CapacitySchedulerConfiguration.ROOT +
CapacitySchedulerConfiguration.DOT + QUEUE_DEDICATED;
// duration(milliseconds), 1mins
public static final long DURATION = 60*1000;
// Containers 4
public static final int NUM_CONTAINERS = 4;
private Map<ReservationId, SubClusterId> reservationMap = new HashMap<>();
private AtomicLong resCounter = new AtomicLong();
private MockRM mockRM = null;
private void validateRunning() throws ConnectException {
if (!isRunning) {
throw new ConnectException("RM is stopped");
@ -859,44 +878,191 @@ public class MockDefaultRequestInterceptorREST
" Please try again with a valid reservable queue."); " Please try again with a valid reservable queue.");
} }
ReservationId reservationID =
ReservationId.parseReservationId(reservationId);
if (!reservationMap.containsKey(reservationID)) {
throw new NotFoundException("reservationId with id: " + reservationId + " not found");
}
ClientRMService clientService = mockRM.getClientRMService();
// listReservations
ReservationListRequest request = ReservationListRequest.newInstance(
queue, reservationId, startTime, endTime, includeResourceAllocations);
ReservationListResponse resRespInfo = clientService.listReservations(request);
ReservationListInfo resResponse =
new ReservationListInfo(resRespInfo, includeResourceAllocations);
return Response.status(Status.OK).entity(resResponse).build();
}
@Override
public Response createNewReservation(HttpServletRequest hsr)
throws AuthorizationException, IOException, InterruptedException {
if (!isRunning) {
throw new RuntimeException("RM is stopped");
}
ReservationId resId = ReservationId.newInstance(Time.now(), resCounter.incrementAndGet());
LOG.info("Allocated new reservationId: {}.", resId);
NewReservation reservationId = new NewReservation(resId.toString());
return Response.status(Status.OK).entity(reservationId).build();
}
@Override
public Response submitReservation(ReservationSubmissionRequestInfo resContext,
HttpServletRequest hsr) throws AuthorizationException, IOException, InterruptedException {
if (!isRunning) {
throw new RuntimeException("RM is stopped");
}
ReservationId reservationId = ReservationId.parseReservationId(resContext.getReservationId());
ReservationDefinitionInfo definitionInfo = resContext.getReservationDefinition();
ReservationDefinition definition =
RouterServerUtil.convertReservationDefinition(definitionInfo);
ReservationSubmissionRequest request = ReservationSubmissionRequest.newInstance(
definition, resContext.getQueue(), reservationId);
submitReservation(request);
LOG.info("Reservation submitted: {}.", reservationId);
SubClusterId subClusterId = getSubClusterId();
reservationMap.put(reservationId, subClusterId);
return Response.status(Status.ACCEPTED).build();
}
private void submitReservation(ReservationSubmissionRequest request) {
try {
// synchronize plan
ReservationSystem reservationSystem = mockRM.getReservationSystem();
reservationSystem.synchronizePlan(QUEUE_DEDICATED_FULL, true);
// Generate reserved resources
ClientRMService clientService = mockRM.getClientRMService();
clientService.submitReservation(request);
} catch (IOException | YarnException e) {
throw new RuntimeException(e);
}
}
@Override
public Response updateReservation(ReservationUpdateRequestInfo resContext,
HttpServletRequest hsr) throws AuthorizationException, IOException, InterruptedException {
if (resContext == null || resContext.getReservationId() == null ||
resContext.getReservationDefinition() == null) {
return Response.status(Status.BAD_REQUEST).build();
}
String resId = resContext.getReservationId();
ReservationId reservationId = ReservationId.parseReservationId(resId);
if (!reservationMap.containsKey(reservationId)) {
throw new NotFoundException("reservationId with id: " + reservationId + " not found");
}
// Generate reserved resources
updateReservation(resContext);
ReservationUpdateResponseInfo resRespInfo = new ReservationUpdateResponseInfo();
return Response.status(Status.OK).entity(resRespInfo).build();
}
private void updateReservation(ReservationUpdateRequestInfo resContext) throws IOException {
if (resContext == null) {
throw new BadRequestException("Input ReservationSubmissionContext should not be null");
}
ReservationDefinitionInfo resInfo = resContext.getReservationDefinition();
if (resInfo == null) {
throw new BadRequestException("Input ReservationDefinition should not be null");
}
ReservationRequestsInfo resReqsInfo = resInfo.getReservationRequests();
if (resReqsInfo == null || resReqsInfo.getReservationRequest() == null
|| resReqsInfo.getReservationRequest().isEmpty()) {
throw new BadRequestException("The ReservationDefinition should " +
"contain at least one ReservationRequest");
}
if (resContext.getReservationId() == null) {
throw new BadRequestException("Update operations must specify an existing ReservaitonId");
}
ReservationRequestInterpreter[] values = ReservationRequestInterpreter.values();
ReservationRequestInterpreter requestInterpreter =
values[resReqsInfo.getReservationRequestsInterpreter()];
List<ReservationRequest> list = new ArrayList<>();
for (ReservationRequestInfo resReqInfo : resReqsInfo.getReservationRequest()) {
ResourceInfo rInfo = resReqInfo.getCapability();
Resource capability = Resource.newInstance(rInfo.getMemorySize(), rInfo.getvCores());
int numContainers = resReqInfo.getNumContainers();
int minConcurrency = resReqInfo.getMinConcurrency();
long duration = resReqInfo.getDuration();
ReservationRequest rr = ReservationRequest.newInstance(
capability, numContainers, minConcurrency, duration);
list.add(rr);
}
ReservationRequests reqs = ReservationRequests.newInstance(list, requestInterpreter);
ReservationDefinition rDef = ReservationDefinition.newInstance(
resInfo.getArrival(), resInfo.getDeadline(), reqs,
resInfo.getReservationName(), resInfo.getRecurrenceExpression(),
Priority.newInstance(resInfo.getPriority()));
ReservationUpdateRequest request = ReservationUpdateRequest.newInstance(
rDef, ReservationId.parseReservationId(resContext.getReservationId()));
ClientRMService clientService = mockRM.getClientRMService();
try {
clientService.updateReservation(request);
} catch (YarnException ex) {
throw new RuntimeException(ex);
}
}
@Override
public Response deleteReservation(ReservationDeleteRequestInfo resContext, HttpServletRequest hsr)
throws AuthorizationException, IOException, InterruptedException {
if (!isRunning) {
throw new RuntimeException("RM is stopped");
}
try {
String resId = resContext.getReservationId();
ReservationId reservationId = ReservationId.parseReservationId(resId);
if (!reservationMap.containsKey(reservationId)) {
throw new NotFoundException("reservationId with id: " + reservationId + " not found");
}
ReservationDeleteRequest reservationDeleteRequest =
ReservationDeleteRequest.newInstance(reservationId);
ClientRMService clientService = mockRM.getClientRMService();
clientService.deleteReservation(reservationDeleteRequest);
ReservationDeleteResponseInfo resRespInfo = new ReservationDeleteResponseInfo();
reservationMap.remove(reservationId);
return Response.status(Status.OK).entity(resRespInfo).build();
} catch (YarnException e) {
throw new RuntimeException(e);
}
}
@VisibleForTesting
public MockRM getMockRM() {
return mockRM;
}
@VisibleForTesting
public void setMockRM(MockRM mockResourceManager) {
this.mockRM = mockResourceManager;
}
@Override
@ -939,7 +1105,7 @@ public class MockDefaultRequestInterceptorREST
public RMQueueAclInfo checkUserAccessToQueue(String queue, String username,
String queueAclType, HttpServletRequest hsr) throws AuthorizationException {
ResourceManager mockResourceManager = mock(ResourceManager.class);
Configuration conf = new YarnConfiguration();
ResourceScheduler mockScheduler = new CapacityScheduler() {
@ -959,8 +1125,9 @@ public class MockDefaultRequestInterceptorREST
}
};
when(mockResourceManager.getResourceScheduler()).thenReturn(mockScheduler);
MockRMWebServices webSvc = new MockRMWebServices(mockResourceManager, conf,
mock(HttpServletResponse.class));
return webSvc.checkUserAccessToQueue(queue, username, queueAclType, hsr);
}


@ -37,6 +37,7 @@ import org.apache.hadoop.test.LambdaTestUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.http.HttpConfig;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.Resource;
@ -45,6 +46,11 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.NodeLabel;
import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.api.records.ReservationDefinition;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.ReservationRequest;
import org.apache.hadoop.yarn.api.records.ReservationRequests;
import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
@ -59,8 +65,6 @@ import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState;
import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster;
import org.apache.hadoop.yarn.server.federation.store.records.ReservationHomeSubCluster;
import org.apache.hadoop.yarn.server.federation.store.records.AddReservationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreTestUtil;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppInfo;
@ -86,6 +90,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppPriority;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppQueue;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationListInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationSubmissionRequestInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.RMQueueAclInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.NodeIDsInfo;
import org.apache.hadoop.yarn.server.router.webapp.cache.RouterAppInfoCacheKey;
@ -94,6 +99,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppActivitiesInf
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationDefinitionInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationRequestsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationRequestInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NewReservation;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationUpdateRequestInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationDeleteRequestInfo;
import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo;
import org.apache.hadoop.yarn.server.webapp.dao.ContainersInfo;
import org.apache.hadoop.yarn.server.router.webapp.dao.FederationRMQueueAclInfo;
@ -106,7 +114,8 @@ import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.hadoop.yarn.server.router.webapp.MockDefaultRequestInterceptorREST.QUEUE_DEDICATED_FULL;
import static org.apache.hadoop.yarn.server.router.webapp.MockDefaultRequestInterceptorREST.DURATION;
import static org.apache.hadoop.yarn.server.router.webapp.MockDefaultRequestInterceptorREST.NUM_CONTAINERS;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@ -130,7 +139,7 @@ public class TestFederationInterceptorREST extends BaseRouterWebServicesTest {
private List<SubClusterId> subClusters;
@Override
public void setUp() {
public void setUp() throws YarnException, IOException {
super.setUpConfig();
interceptor = new TestableFederationInterceptorREST();
@ -156,6 +165,13 @@ public class TestFederationInterceptorREST extends BaseRouterWebServicesTest {
Assert.fail();
}
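// Register a mocked REST interceptor for every sub-cluster returned by the state
// store, then boot the shared MockRM that backs the reservation calls exercised by
// the tests below (see TestableFederationInterceptorREST#setupResourceManager).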
for (SubClusterId subCluster : subClusters) {
SubClusterInfo subClusterInfo = stateStoreUtil.querySubClusterInfo(subCluster);
interceptor.getOrCreateInterceptorForSubCluster(
subCluster, subClusterInfo.getRMWebServiceAddress());
}
interceptor.setupResourceManager();
}
@Override
@ -1100,14 +1116,9 @@ public class TestFederationInterceptorREST extends BaseRouterWebServicesTest {
@Test
public void testListReservation() throws Exception {
// Add ReservationId In stateStore
// submitReservation
ReservationId reservationId = ReservationId.newInstance(Time.now(), 1);
SubClusterId homeSubClusterId = subClusters.get(0);
submitReservation(reservationId);
ReservationHomeSubCluster reservationHomeSubCluster =
ReservationHomeSubCluster.newInstance(reservationId, homeSubClusterId);
AddReservationHomeSubClusterRequest request =
AddReservationHomeSubClusterRequest.newInstance(reservationHomeSubCluster);
stateStore.addReservationHomeSubCluster(request);
// Call the listReservation method
String applyReservationId = reservationId.toString();
@ -1157,6 +1168,199 @@ public class TestFederationInterceptorREST extends BaseRouterWebServicesTest {
Assert.assertEquals(1024, memory);
}
@Test
public void testCreateNewReservation() throws Exception {
Response response = interceptor.createNewReservation(null);
Assert.assertNotNull(response);
Object entity = response.getEntity();
Assert.assertNotNull(entity);
Assert.assertTrue(entity instanceof NewReservation);
NewReservation newReservation = (NewReservation) entity;
Assert.assertNotNull(newReservation);
Assert.assertTrue(newReservation.getReservationId().contains("reservation"));
}
@Test
public void testSubmitReservation() throws Exception {
// submit reservation
ReservationId reservationId = ReservationId.newInstance(Time.now(), 2);
Response response = submitReservation(reservationId);
Assert.assertNotNull(response);
Assert.assertEquals(Status.ACCEPTED.getStatusCode(), response.getStatus());
String applyReservationId = reservationId.toString();
Response reservationResponse = interceptor.listReservation(
QUEUE_DEDICATED_FULL, applyReservationId, -1, -1, false, null);
Assert.assertNotNull(reservationResponse);
Object entity = reservationResponse.getEntity();
Assert.assertNotNull(entity);
Assert.assertTrue(entity instanceof ReservationListInfo);
ReservationListInfo listInfo = (ReservationListInfo) entity;
Assert.assertNotNull(listInfo);
List<ReservationInfo> reservationInfos = listInfo.getReservations();
Assert.assertNotNull(reservationInfos);
Assert.assertEquals(1, reservationInfos.size());
ReservationInfo reservationInfo = reservationInfos.get(0);
Assert.assertNotNull(reservationInfo);
Assert.assertEquals(reservationInfo.getReservationId(), applyReservationId);
}
@Test
public void testUpdateReservation() throws Exception {
// submit reservation
ReservationId reservationId = ReservationId.newInstance(Time.now(), 3);
Response response = submitReservation(reservationId);
Assert.assertNotNull(response);
Assert.assertEquals(Status.ACCEPTED.getStatusCode(), response.getStatus());
// update reservation
ReservationSubmissionRequest resSubRequest =
getReservationSubmissionRequest(reservationId, 6, 2048, 2);
ReservationDefinition reservationDefinition = resSubRequest.getReservationDefinition();
ReservationDefinitionInfo reservationDefinitionInfo =
new ReservationDefinitionInfo(reservationDefinition);
ReservationUpdateRequestInfo updateRequestInfo = new ReservationUpdateRequestInfo();
updateRequestInfo.setReservationId(reservationId.toString());
updateRequestInfo.setReservationDefinition(reservationDefinitionInfo);
Response updateReservationResp = interceptor.updateReservation(updateRequestInfo, null);
Assert.assertNotNull(updateReservationResp);
Assert.assertEquals(Status.OK.getStatusCode(), updateReservationResp.getStatus());
String applyReservationId = reservationId.toString();
Response reservationResponse = interceptor.listReservation(
QUEUE_DEDICATED_FULL, applyReservationId, -1, -1, false, null);
Assert.assertNotNull(reservationResponse);
Object entity = reservationResponse.getEntity();
Assert.assertNotNull(entity);
Assert.assertTrue(entity instanceof ReservationListInfo);
ReservationListInfo listInfo = (ReservationListInfo) entity;
Assert.assertNotNull(listInfo);
List<ReservationInfo> reservationInfos = listInfo.getReservations();
Assert.assertNotNull(reservationInfos);
Assert.assertEquals(1, reservationInfos.size());
ReservationInfo reservationInfo = reservationInfos.get(0);
Assert.assertNotNull(reservationInfo);
Assert.assertEquals(reservationInfo.getReservationId(), applyReservationId);
ReservationDefinitionInfo resDefinitionInfo = reservationInfo.getReservationDefinition();
Assert.assertNotNull(resDefinitionInfo);
ReservationRequestsInfo reservationRequestsInfo = resDefinitionInfo.getReservationRequests();
Assert.assertNotNull(reservationRequestsInfo);
ArrayList<ReservationRequestInfo> reservationRequestInfoList =
reservationRequestsInfo.getReservationRequest();
Assert.assertNotNull(reservationRequestInfoList);
Assert.assertEquals(1, reservationRequestInfoList.size());
ReservationRequestInfo reservationRequestInfo = reservationRequestInfoList.get(0);
Assert.assertNotNull(reservationRequestInfo);
Assert.assertEquals(6, reservationRequestInfo.getNumContainers());
ResourceInfo resourceInfo = reservationRequestInfo.getCapability();
Assert.assertNotNull(resourceInfo);
int vCore = resourceInfo.getvCores();
long memory = resourceInfo.getMemorySize();
Assert.assertEquals(2, vCore);
Assert.assertEquals(2048, memory);
}
@Test
public void testDeleteReservation() throws Exception {
// submit reservation
ReservationId reservationId = ReservationId.newInstance(Time.now(), 4);
Response response = submitReservation(reservationId);
Assert.assertNotNull(response);
Assert.assertEquals(Status.ACCEPTED.getStatusCode(), response.getStatus());
String applyResId = reservationId.toString();
Response reservationResponse = interceptor.listReservation(
QUEUE_DEDICATED_FULL, applyResId, -1, -1, false, null);
Assert.assertNotNull(reservationResponse);
ReservationDeleteRequestInfo deleteRequestInfo =
new ReservationDeleteRequestInfo();
deleteRequestInfo.setReservationId(applyResId);
Response delResponse = interceptor.deleteReservation(deleteRequestInfo, null);
Assert.assertNotNull(delResponse);
LambdaTestUtils.intercept(Exception.class,
"reservationId with id: " + reservationId + " not found",
() -> interceptor.listReservation(QUEUE_DEDICATED_FULL, applyResId, -1, -1, false, null));
}
private Response submitReservation(ReservationId reservationId)
throws IOException, InterruptedException {
ReservationSubmissionRequestInfo resSubmissionRequestInfo =
getReservationSubmissionRequestInfo(reservationId);
Response response = interceptor.submitReservation(resSubmissionRequestInfo, null);
return response;
}
public static ReservationSubmissionRequestInfo getReservationSubmissionRequestInfo(
ReservationId reservationId) {
ReservationSubmissionRequest resSubRequest =
getReservationSubmissionRequest(reservationId, NUM_CONTAINERS, 1024, 1);
ReservationDefinition reservationDefinition = resSubRequest.getReservationDefinition();
ReservationSubmissionRequestInfo resSubmissionRequestInfo =
new ReservationSubmissionRequestInfo();
resSubmissionRequestInfo.setQueue(resSubRequest.getQueue());
resSubmissionRequestInfo.setReservationId(reservationId.toString());
ReservationDefinitionInfo reservationDefinitionInfo =
new ReservationDefinitionInfo(reservationDefinition);
resSubmissionRequestInfo.setReservationDefinition(reservationDefinitionInfo);
return resSubmissionRequestInfo;
}
public static ReservationSubmissionRequest getReservationSubmissionRequest(
ReservationId reservationId, int numContainers, int memory, int vcore) {
// arrival time from which the resource(s) can be allocated.
long arrival = Time.now();
// deadline by when the resource(s) must be allocated.
// The factor 1.05 is chosen so the margin beyond DURATION is a whole number:
// DURATION * 0.05 = 3000 (ms)
// deadline = arrival + DURATION + 3000ms
long deadline = (long) (arrival + 1.05 * DURATION);
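// Worked out (assuming DURATION = 60000 ms, which the 3000 ms figure above implies):
// deadline = arrival + 1.05 * 60000 = arrival + 63000 ms.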
ReservationSubmissionRequest submissionRequest = createSimpleReservationRequest(
reservationId, numContainers, arrival, deadline, DURATION, memory, vcore);
return submissionRequest;
}
public static ReservationSubmissionRequest createSimpleReservationRequest(
ReservationId reservationId, int numContainers, long arrival,
long deadline, long duration, int memory, int vcore) {
// create a request with a single atomic ask
ReservationRequest r = ReservationRequest.newInstance(
Resource.newInstance(memory, vcore), numContainers, 1, duration);
ReservationRequests reqs = ReservationRequests.newInstance(
Collections.singletonList(r), ReservationRequestInterpreter.R_ALL);
ReservationDefinition rDef = ReservationDefinition.newInstance(
arrival, deadline, reqs, "testClientRMService#reservation", "0", Priority.UNDEFINED);
ReservationSubmissionRequest request = ReservationSubmissionRequest.newInstance(
rDef, QUEUE_DEDICATED_FULL, reservationId);
return request;
}
@Test
public void testWebAddressWithScheme() {
// The style of the web address reported by the subCluster in the heartbeat is 0.0.0.0:8000
View File
@ -18,10 +18,25 @@
package org.apache.hadoop.yarn.server.router.webapp;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.hadoop.yarn.server.router.webapp.BaseRouterWebServicesTest.QUEUE_DEDICATED_FULL;
import static org.apache.hadoop.yarn.server.router.webapp.BaseRouterWebServicesTest.QUEUE_DEFAULT_FULL;
import static org.apache.hadoop.yarn.server.router.webapp.BaseRouterWebServicesTest.QUEUE_DEFAULT;
import static org.apache.hadoop.yarn.server.router.webapp.BaseRouterWebServicesTest.QUEUE_DEDICATED;
/**
* Extends the FederationInterceptorREST and overrides methods to provide a
@ -30,7 +45,11 @@ import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
public class TestableFederationInterceptorREST
extends FederationInterceptorREST {
private List<SubClusterId> badSubCluster = new ArrayList<SubClusterId>();
private List<SubClusterId> badSubCluster = new ArrayList<>();
private MockRM mockRM = null;
private static final Logger LOG =
LoggerFactory.getLogger(TestableFederationInterceptorREST.class);
/**
* For testing purpose, some subclusters has to be down to simulate particular
@ -51,4 +70,51 @@ public class TestableFederationInterceptorREST
interceptor.setRunning(false);
}
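/**
 * Starts a single MockRM with a CapacityScheduler whose dedicated queue is
 * reservable, enables the reservation system, and hands the MockRM to every
 * mocked sub-cluster interceptor so the reservation REST calls have a real
 * ReservationSystem behind them.
 */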
protected void setupResourceManager() throws IOException {
if (mockRM != null) {
return;
}
try {
DefaultMetricsSystem.setMiniClusterMode(true);
CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
// Define default queue
conf.setCapacity(QUEUE_DEFAULT_FULL, 20);
// Define dedicated queues
String[] queues = new String[]{QUEUE_DEFAULT, QUEUE_DEDICATED};
conf.setQueues(CapacitySchedulerConfiguration.ROOT, queues);
conf.setCapacity(QUEUE_DEDICATED_FULL, 80);
conf.setReservable(QUEUE_DEDICATED_FULL, true);
conf.setClass(YarnConfiguration.RM_SCHEDULER,
CapacityScheduler.class, ResourceScheduler.class);
conf.setBoolean(YarnConfiguration.RM_RESERVATION_SYSTEM_ENABLE, true);
conf.setBoolean(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED, false);
mockRM = new MockRM(conf);
mockRM.start();
mockRM.registerNode("127.0.0.1:5678", 100*1024, 100);
Map<SubClusterId, DefaultRequestInterceptorREST> interceptors = super.getInterceptors();
for (DefaultRequestInterceptorREST item : interceptors.values()) {
MockDefaultRequestInterceptorREST interceptor = (MockDefaultRequestInterceptorREST) item;
interceptor.setMockRM(mockRM);
}
} catch (Exception e) {
LOG.error("setupResourceManager failed.", e);
throw new IOException(e);
}
}
@Override
public void shutdown() {
if (mockRM != null) {
mockRM.stop();
mockRM = null;
}
super.shutdown();
}
}
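Taken together, the new tests walk a reservation through its whole lifecycle via the Router's FederationInterceptorREST. The sketch below condenses that flow; it is illustrative only and is not part of the commit. It assumes the same fixtures as the tests above: the interceptor field, the static helpers in TestFederationInterceptorREST, and the reservable QUEUE_DEDICATED_FULL queue prepared by setupResourceManager.

// Illustrative only: condensed reservation lifecycle, mirroring the calls the tests make.
private void runReservationLifecycle(ReservationId reservationId) throws Exception {
  // Ask the Router for a fresh reservation id (testCreateNewReservation).
  // The tests generate their own ids rather than reusing this one.
  Response newIdResponse = interceptor.createNewReservation(null);
  Assert.assertNotNull(newIdResponse.getEntity());

  // Submit a definition against the dedicated queue (testSubmitReservation).
  ReservationSubmissionRequestInfo submitInfo =
      getReservationSubmissionRequestInfo(reservationId);
  Assert.assertEquals(Status.ACCEPTED.getStatusCode(),
      interceptor.submitReservation(submitInfo, null).getStatus());

  // Grow it to 6 containers of 2048 MB / 2 vcores (testUpdateReservation).
  ReservationDefinition updatedDef =
      getReservationSubmissionRequest(reservationId, 6, 2048, 2).getReservationDefinition();
  ReservationUpdateRequestInfo updateInfo = new ReservationUpdateRequestInfo();
  updateInfo.setReservationId(reservationId.toString());
  updateInfo.setReservationDefinition(new ReservationDefinitionInfo(updatedDef));
  Assert.assertEquals(Status.OK.getStatusCode(),
      interceptor.updateReservation(updateInfo, null).getStatus());

  // Remove it again (testDeleteReservation); listing it afterwards should fail.
  ReservationDeleteRequestInfo deleteInfo = new ReservationDeleteRequestInfo();
  deleteInfo.setReservationId(reservationId.toString());
  Assert.assertNotNull(interceptor.deleteReservation(deleteInfo, null));
}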