From 17035da46efa945f7eeff3e7f253a7031c6bcdd0 Mon Sep 17 00:00:00 2001 From: slfan1989 <55643692+slfan1989@users.noreply.github.com> Date: Fri, 23 Dec 2022 03:25:09 +0800 Subject: [PATCH] YARN-11226. [Federation] Add createNewReservation, submitReservation, updateReservation, deleteReservation REST APIs for Router. (#5175) --- .../utils/FederationStateStoreFacade.java | 89 +++++++ .../yarn/server/router/RouterServerUtil.java | 110 +++++++- .../AbstractRESTRequestInterceptor.java | 34 +-- .../webapp/FederationInterceptorREST.java | 238 ++++++++++++++++- .../server/router/TestRouterServerUtil.java | 125 +++++++++ .../webapp/BaseRouterWebServicesTest.java | 10 +- .../MockDefaultRequestInterceptorREST.java | 249 +++++++++++++++--- .../webapp/TestFederationInterceptorREST.java | 226 +++++++++++++++- .../TestableFederationInterceptorREST.java | 68 ++++- 9 files changed, 1052 insertions(+), 97 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouterServerUtil.java diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java index e1ebce82892..8c36fba1f1e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java @@ -1062,4 +1062,93 @@ public void addOrUpdateApplicationHomeSubCluster(ApplicationId applicationId, updateApplicationHomeSubCluster(subClusterId, applicationId, appHomeSubCluster); } } + + /** + * Exists ReservationHomeSubCluster Mapping. + * + * @param reservationId reservationId + * @return true - exist, false - not exist + */ + public boolean existsReservationHomeSubCluster(ReservationId reservationId) { + try { + SubClusterId subClusterId = getReservationHomeSubCluster(reservationId); + if (subClusterId != null) { + return true; + } + } catch (YarnException e) { + LOG.warn("get homeSubCluster by reservationId = {} error.", reservationId, e); + } + return false; + } + + /** + * Save Reservation And HomeSubCluster Mapping. + * + * @param reservationId reservationId + * @param homeSubCluster homeSubCluster + * @throws YarnException on failure + */ + public void addReservationHomeSubCluster(ReservationId reservationId, + ReservationHomeSubCluster homeSubCluster) throws YarnException { + try { + // persist the mapping of reservationId and the subClusterId which has + // been selected as its home + addReservationHomeSubCluster(homeSubCluster); + } catch (YarnException e) { + String msg = String.format( + "Unable to insert the ReservationId %s into the FederationStateStore.", reservationId); + throw new YarnException(msg, e); + } + } + + /** + * Update Reservation And HomeSubCluster Mapping. + * + * @param subClusterId subClusterId + * @param reservationId reservationId + * @param homeSubCluster homeSubCluster + * @throws YarnException on failure + */ + public void updateReservationHomeSubCluster(SubClusterId subClusterId, + ReservationId reservationId, ReservationHomeSubCluster homeSubCluster) throws YarnException { + try { + // update the mapping of reservationId and the home subClusterId to + // the new subClusterId we have selected + updateReservationHomeSubCluster(homeSubCluster); + } catch (YarnException e) { + SubClusterId subClusterIdInStateStore = getReservationHomeSubCluster(reservationId); + if (subClusterId == subClusterIdInStateStore) { + LOG.info("Reservation {} already submitted on SubCluster {}.", reservationId, subClusterId); + } else { + String msg = String.format( + "Unable to update the ReservationId %s into the FederationStateStore.", reservationId); + throw new YarnException(msg, e); + } + } + } + + /** + * Add or Update ReservationHomeSubCluster. + * + * @param reservationId reservationId. + * @param subClusterId homeSubClusterId, this is selected by strategy. + * @param retryCount number of retries. + * @throws YarnException yarn exception. + */ + public void addOrUpdateReservationHomeSubCluster(ReservationId reservationId, + SubClusterId subClusterId, int retryCount) throws YarnException { + Boolean exists = existsReservationHomeSubCluster(reservationId); + ReservationHomeSubCluster reservationHomeSubCluster = + ReservationHomeSubCluster.newInstance(reservationId, subClusterId); + if (!exists || retryCount == 0) { + // persist the mapping of reservationId and the subClusterId which has + // been selected as its home. + addReservationHomeSubCluster(reservationId, reservationHomeSubCluster); + } else { + // update the mapping of reservationId and the home subClusterId to + // the new subClusterId we have selected. + updateReservationHomeSubCluster(subClusterId, reservationId, + reservationHomeSubCluster); + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterServerUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterServerUtil.java index 8c880f25ddb..8fa6ca2f055 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterServerUtil.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterServerUtil.java @@ -27,6 +27,16 @@ import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.yarn.api.records.ReservationRequest; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ReservationRequests; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationDefinitionInfo; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationRequestsInfo; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationRequestInfo; +import org.apache.hadoop.yarn.api.records.ReservationDefinition; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceInfo; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; @@ -57,6 +67,8 @@ public final class RouterServerUtil { private static final String EPOCH_PREFIX = "e"; + private static final String RESERVEIDSTR_PREFIX = "reservation_"; + /** Disable constructor. */ private RouterServerUtil() { } @@ -494,6 +506,15 @@ public static String getRenewerForToken(Token token ? token.decodeIdentifier().getRenewer().toString() : user.getShortUserName(); } + /** + * Set User information. + * + * If the username is empty, we will use the Yarn Router user directly. + * Do not create a proxy user if userName matches the userName on current UGI. + * + * @param userName userName. + * @return UserGroupInformation. + */ public static UserGroupInformation setupUser(final String userName) { UserGroupInformation user = null; try { @@ -513,7 +534,94 @@ public static UserGroupInformation setupUser(final String userName) { return user; } catch (IOException e) { throw RouterServerUtil.logAndReturnYarnRunTimeException(e, - "Error while creating Router RMAdmin Service for user : %s.", user); + "Error while creating Router Service for user : %s.", user); } } + + /** + * Check reservationId is accurate. + * + * We need to ensure that reservationId cannot be empty and + * can be converted to ReservationId object normally. + * + * @param reservationId reservationId. + * @throws IllegalArgumentException If the format of the reservationId is not accurate, + * an IllegalArgumentException needs to be thrown. + */ + @Public + @Unstable + public static void validateReservationId(String reservationId) throws IllegalArgumentException { + + if (reservationId == null || reservationId.isEmpty()) { + throw new IllegalArgumentException("Parameter error, the reservationId is empty or null."); + } + + if (!reservationId.startsWith(RESERVEIDSTR_PREFIX)) { + throw new IllegalArgumentException("Invalid ReservationId: " + reservationId); + } + + String[] resFields = reservationId.split("_"); + if (resFields.length != 3) { + throw new IllegalArgumentException("Invalid ReservationId: " + reservationId); + } + + String clusterTimestamp = resFields[1]; + String id = resFields[2]; + if (!NumberUtils.isDigits(id) || !NumberUtils.isDigits(clusterTimestamp)) { + throw new IllegalArgumentException("Invalid ReservationId: " + reservationId); + } + } + + /** + * Convert ReservationDefinitionInfo to ReservationDefinition. + * + * @param definitionInfo ReservationDefinitionInfo Object. + * @return ReservationDefinition. + */ + public static ReservationDefinition convertReservationDefinition( + ReservationDefinitionInfo definitionInfo) { + if (definitionInfo == null || definitionInfo.getReservationRequests() == null + || definitionInfo.getReservationRequests().getReservationRequest() == null + || definitionInfo.getReservationRequests().getReservationRequest().isEmpty()) { + throw new RuntimeException("definitionInfo Or ReservationRequests is Null."); + } + + // basic variable + long arrival = definitionInfo.getArrival(); + long deadline = definitionInfo.getDeadline(); + + // ReservationRequests reservationRequests + String name = definitionInfo.getReservationName(); + String recurrenceExpression = definitionInfo.getRecurrenceExpression(); + Priority priority = Priority.newInstance(definitionInfo.getPriority()); + + // reservation requests info + List reservationRequestList = new ArrayList<>(); + + ReservationRequestsInfo reservationRequestsInfo = definitionInfo.getReservationRequests(); + + List reservationRequestInfos = + reservationRequestsInfo.getReservationRequest(); + + for (ReservationRequestInfo resRequestInfo : reservationRequestInfos) { + ResourceInfo resourceInfo = resRequestInfo.getCapability(); + Resource capability = + Resource.newInstance(resourceInfo.getMemorySize(), resourceInfo.getvCores()); + ReservationRequest reservationRequest = ReservationRequest.newInstance(capability, + resRequestInfo.getNumContainers(), resRequestInfo.getMinConcurrency(), + resRequestInfo.getDuration()); + reservationRequestList.add(reservationRequest); + } + + ReservationRequestInterpreter[] values = ReservationRequestInterpreter.values(); + ReservationRequestInterpreter reservationRequestInterpreter = + values[reservationRequestsInfo.getReservationRequestsInterpreter()]; + ReservationRequests reservationRequests = ReservationRequests.newInstance( + reservationRequestList, reservationRequestInterpreter); + + ReservationDefinition definition = ReservationDefinition.newInstance( + arrival, deadline, reservationRequests, name, recurrenceExpression, priority); + + return definition; + } } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/AbstractRESTRequestInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/AbstractRESTRequestInterceptor.java index 66b2495e2c9..baf931ac46e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/AbstractRESTRequestInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/AbstractRESTRequestInterceptor.java @@ -20,9 +20,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; - -import java.io.IOException; +import org.apache.hadoop.yarn.server.router.RouterServerUtil; /** * Extends the RequestInterceptor class and provides common functionality which @@ -68,7 +66,7 @@ public Configuration getConf() { */ @Override public void init(String userName) { - setupUser(userName); + this.user = RouterServerUtil.setupUser(userName); if (this.nextInterceptor != null) { this.nextInterceptor.init(userName); } @@ -92,34 +90,6 @@ public RESTRequestInterceptor getNextInterceptor() { return this.nextInterceptor; } - /** - * Set User information. - * - * If the username is empty, we will use the Yarn Router user directly. - * Do not create a proxy user if user name matches the user name on current UGI. - * @param userName userName. - */ - private void setupUser(final String userName) { - try { - if (userName == null || userName.isEmpty()) { - user = UserGroupInformation.getCurrentUser(); - } else if (UserGroupInformation.isSecurityEnabled()) { - user = UserGroupInformation.createProxyUser(userName, UserGroupInformation.getLoginUser()); - } else if (userName.equalsIgnoreCase(UserGroupInformation.getCurrentUser().getUserName())) { - user = UserGroupInformation.getCurrentUser(); - } else { - user = UserGroupInformation.createProxyUser(userName, - UserGroupInformation.getCurrentUser()); - } - } catch (IOException e) { - String message = "Error while creating Router RMAdmin Service for user:"; - if (user != null) { - message += ", user: " + user; - } - throw new YarnRuntimeException(message, e); - } - } - public UserGroupInformation getUser() { return user; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java index 61edfb363d0..93e7a16cd91 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java @@ -51,11 +51,13 @@ import org.apache.hadoop.util.Sets; import org.apache.hadoop.util.Time; import org.apache.hadoop.util.concurrent.HadoopExecutors; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.NodeLabel; import org.apache.hadoop.yarn.api.records.ReservationId; +import org.apache.hadoop.yarn.api.records.ReservationDefinition; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; @@ -101,6 +103,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.BulkActivitiesInfo; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerTypeInfo; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeLabelInfo; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationDefinitionInfo; import org.apache.hadoop.yarn.server.router.RouterMetrics; import org.apache.hadoop.yarn.server.router.RouterServerUtil; import org.apache.hadoop.yarn.server.router.clientrm.ClientMethod; @@ -1588,28 +1591,239 @@ public Response cancelDelegationToken(HttpServletRequest hsr) @Override public Response createNewReservation(HttpServletRequest hsr) throws AuthorizationException, IOException, InterruptedException { - throw new NotImplementedException("Code is not implemented"); + long startTime = clock.getTime(); + try { + Map subClustersActive = + federationFacade.getSubClusters(true); + // We declare blackList and retries. + List blackList = new ArrayList<>(); + int actualRetryNums = federationFacade.getRetryNumbers(numSubmitRetries); + Response response = ((FederationActionRetry) (retryCount) -> + invokeCreateNewReservation(subClustersActive, blackList, hsr, retryCount)). + runWithRetries(actualRetryNums, submitIntervalTime); + // If the response is not empty and the status is SC_OK, + // this request can be returned directly. + if (response != null && response.getStatus() == HttpServletResponse.SC_OK) { + long stopTime = clock.getTime(); + routerMetrics.succeededGetNewReservationRetrieved(stopTime - startTime); + return response; + } + } catch (FederationPolicyException e) { + // If a FederationPolicyException is thrown, the service is unavailable. + routerMetrics.incrGetNewReservationFailedRetrieved(); + return Response.status(Status.SERVICE_UNAVAILABLE).entity(e.getLocalizedMessage()).build(); + } catch (Exception e) { + routerMetrics.incrGetNewReservationFailedRetrieved(); + return Response.status(Status.INTERNAL_SERVER_ERROR).entity(e.getLocalizedMessage()).build(); + } + + // return error message directly. + String errMsg = "Fail to create a new reservation."; + LOG.error(errMsg); + routerMetrics.incrGetNewReservationFailedRetrieved(); + return Response.status(Status.INTERNAL_SERVER_ERROR).entity(errMsg).build(); + } + + private Response invokeCreateNewReservation(Map subClustersActive, + List blackList, HttpServletRequest hsr, int retryCount) + throws YarnException, IOException, InterruptedException { + SubClusterId subClusterId = + federationFacade.getRandomActiveSubCluster(subClustersActive, blackList); + LOG.info("createNewReservation try #{} on SubCluster {}.", retryCount, subClusterId); + SubClusterInfo subClusterInfo = subClustersActive.get(subClusterId); + DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorForSubCluster( + subClusterId, subClusterInfo.getRMWebServiceAddress()); + try { + Response response = interceptor.createNewReservation(hsr); + if (response != null && response.getStatus() == HttpServletResponse.SC_OK) { + return response; + } + } catch (Exception e) { + blackList.add(subClusterId); + RouterServerUtil.logAndThrowException(e.getMessage(), e); + } + // We need to throw the exception directly. + String msg = String.format("Unable to create a new ReservationId in SubCluster %s.", + subClusterId.getId()); + throw new YarnException(msg); } @Override public Response submitReservation(ReservationSubmissionRequestInfo resContext, - HttpServletRequest hsr) - throws AuthorizationException, IOException, InterruptedException { - throw new NotImplementedException("Code is not implemented"); + HttpServletRequest hsr) throws AuthorizationException, IOException, InterruptedException { + long startTime = clock.getTime(); + if (resContext == null || resContext.getReservationId() == null + || resContext.getReservationDefinition() == null || resContext.getQueue() == null) { + routerMetrics.incrSubmitReservationFailedRetrieved(); + String errMsg = "Missing submitReservation resContext or reservationId " + + "or reservation definition or queue."; + return Response.status(Status.BAD_REQUEST).entity(errMsg).build(); + } + + // Check that the resId format is accurate + String resId = resContext.getReservationId(); + try { + RouterServerUtil.validateReservationId(resId); + } catch (IllegalArgumentException e) { + routerMetrics.incrSubmitReservationFailedRetrieved(); + throw e; + } + + List blackList = new ArrayList<>(); + try { + int activeSubClustersCount = federationFacade.getActiveSubClustersCount(); + int actualRetryNums = Math.min(activeSubClustersCount, numSubmitRetries); + Response response = ((FederationActionRetry) (retryCount) -> + invokeSubmitReservation(resContext, blackList, hsr, retryCount)). + runWithRetries(actualRetryNums, submitIntervalTime); + if (response != null) { + long stopTime = clock.getTime(); + routerMetrics.succeededSubmitReservationRetrieved(stopTime - startTime); + return response; + } + } catch (Exception e) { + routerMetrics.incrSubmitReservationFailedRetrieved(); + return Response.status(Status.SERVICE_UNAVAILABLE).entity(e.getLocalizedMessage()).build(); + } + + routerMetrics.incrSubmitReservationFailedRetrieved(); + String msg = String.format("Reservation %s failed to be submitted.", resId); + return Response.status(Status.SERVICE_UNAVAILABLE).entity(msg).build(); + } + + private Response invokeSubmitReservation(ReservationSubmissionRequestInfo requestContext, + List blackList, HttpServletRequest hsr, int retryCount) + throws YarnException, IOException, InterruptedException { + String resId = requestContext.getReservationId(); + ReservationId reservationId = ReservationId.parseReservationId(resId); + ReservationDefinitionInfo definitionInfo = requestContext.getReservationDefinition(); + ReservationDefinition definition = + RouterServerUtil.convertReservationDefinition(definitionInfo); + + // First, Get SubClusterId according to specific strategy. + ReservationSubmissionRequest request = ReservationSubmissionRequest.newInstance( + definition, requestContext.getQueue(), reservationId); + SubClusterId subClusterId = null; + + try { + // Get subClusterId from policy. + subClusterId = policyFacade.getReservationHomeSubCluster(request); + + // Print the log of submitting the submitApplication. + LOG.info("submitReservation ReservationId {} try #{} on SubCluster {}.", reservationId, + retryCount, subClusterId); + + // Step2. We Store the mapping relationship + // between Application and HomeSubCluster in stateStore. + federationFacade.addOrUpdateReservationHomeSubCluster(reservationId, + subClusterId, retryCount); + + // Step3. We get subClusterInfo based on subClusterId. + SubClusterInfo subClusterInfo = federationFacade.getSubCluster(subClusterId); + + DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorForSubCluster( + subClusterInfo.getSubClusterId(), subClusterInfo.getRMWebServiceAddress()); + HttpServletRequest hsrCopy = clone(hsr); + Response response = interceptor.submitReservation(requestContext, hsrCopy); + if (response != null && response.getStatus() == HttpServletResponse.SC_ACCEPTED) { + LOG.info("Reservation {} submitted on subCluster {}.", reservationId, subClusterId); + return response; + } + String msg = String.format("application %s failed to be submitted.", resId); + throw new YarnException(msg); + } catch (Exception e) { + LOG.warn("Unable to submit the reservation {} to SubCluster {}.", resId, + subClusterId, e); + if (subClusterId != null) { + blackList.add(subClusterId); + } + throw e; + } } @Override public Response updateReservation(ReservationUpdateRequestInfo resContext, - HttpServletRequest hsr) - throws AuthorizationException, IOException, InterruptedException { - throw new NotImplementedException("Code is not implemented"); + HttpServletRequest hsr) throws AuthorizationException, IOException, InterruptedException { + + // parameter verification + if (resContext == null || resContext.getReservationId() == null + || resContext.getReservationDefinition() == null) { + routerMetrics.incrUpdateReservationFailedRetrieved(); + String errMsg = "Missing updateReservation resContext or reservationId " + + "or reservation definition."; + return Response.status(Status.BAD_REQUEST).entity(errMsg).build(); + } + + // get reservationId + String reservationId = resContext.getReservationId(); + + // Check that the reservationId format is accurate + try { + RouterServerUtil.validateReservationId(reservationId); + } catch (IllegalArgumentException e) { + routerMetrics.incrUpdateReservationFailedRetrieved(); + throw e; + } + + try { + SubClusterInfo subClusterInfo = getHomeSubClusterInfoByReservationId(reservationId); + DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorForSubCluster( + subClusterInfo.getSubClusterId(), subClusterInfo.getRMWebServiceAddress()); + HttpServletRequest hsrCopy = clone(hsr); + Response response = interceptor.updateReservation(resContext, hsrCopy); + if (response != null) { + return response; + } + } catch (Exception e) { + routerMetrics.incrUpdateReservationFailedRetrieved(); + RouterServerUtil.logAndThrowRunTimeException("updateReservation Failed.", e); + } + + // throw an exception + routerMetrics.incrUpdateReservationFailedRetrieved(); + throw new YarnRuntimeException("updateReservation Failed, reservationId = " + reservationId); } @Override public Response deleteReservation(ReservationDeleteRequestInfo resContext, HttpServletRequest hsr) throws AuthorizationException, IOException, InterruptedException { - throw new NotImplementedException("Code is not implemented"); + + // parameter verification + if (resContext == null || resContext.getReservationId() == null) { + routerMetrics.incrDeleteReservationFailedRetrieved(); + String errMsg = "Missing deleteReservation request or reservationId."; + return Response.status(Status.BAD_REQUEST).entity(errMsg).build(); + } + + // get ReservationId + String reservationId = resContext.getReservationId(); + + // Check that the reservationId format is accurate + try { + RouterServerUtil.validateReservationId(reservationId); + } catch (IllegalArgumentException e) { + routerMetrics.incrDeleteReservationFailedRetrieved(); + throw e; + } + + try { + SubClusterInfo subClusterInfo = getHomeSubClusterInfoByReservationId(reservationId); + DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorForSubCluster( + subClusterInfo.getSubClusterId(), subClusterInfo.getRMWebServiceAddress()); + HttpServletRequest hsrCopy = clone(hsr); + Response response = interceptor.deleteReservation(resContext, hsrCopy); + if (response != null) { + return response; + } + } catch (Exception e) { + routerMetrics.incrDeleteReservationFailedRetrieved(); + RouterServerUtil.logAndThrowRunTimeException("deleteReservation Failed.", e); + } + + // throw an exception + routerMetrics.incrDeleteReservationFailedRetrieved(); + throw new YarnRuntimeException("deleteReservation Failed, reservationId = " + reservationId); } @Override @@ -1627,9 +1841,9 @@ public Response listReservation(String queue, String reservationId, throw new IllegalArgumentException("Parameter error, the reservationId is empty or null."); } - // Check that the appId format is accurate + // Check that the reservationId format is accurate try { - ReservationId.parseReservationId(reservationId); + RouterServerUtil.validateReservationId(reservationId); } catch (IllegalArgumentException e) { routerMetrics.incrListReservationFailedRetrieved(); throw e; @@ -2190,6 +2404,10 @@ public LRUCacheHashMap getAppInfosCaches() { } @VisibleForTesting + public Map getInterceptors() { + return interceptors; + } + public void setAllowPartialResult(boolean allowPartialResult) { this.allowPartialResult = allowPartialResult; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouterServerUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouterServerUtil.java new file mode 100644 index 00000000000..e82f67d12d5 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouterServerUtil.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */package org.apache.hadoop.yarn.server.router; + +import org.apache.hadoop.test.LambdaTestUtils; +import org.apache.hadoop.util.Time; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.ReservationDefinition; +import org.apache.hadoop.yarn.api.records.ReservationId; +import org.apache.hadoop.yarn.api.records.ReservationRequests; +import org.apache.hadoop.yarn.api.records.ReservationRequest; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationDefinitionInfo; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationRequestInfo; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationRequestsInfo; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationSubmissionRequestInfo; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceInfo; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; + +import static org.apache.hadoop.yarn.server.router.webapp.TestFederationInterceptorREST.getReservationSubmissionRequestInfo; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +public class TestRouterServerUtil { + + public static final Logger LOG = LoggerFactory.getLogger(TestRouterServerUtil.class); + + @Test + public void testConvertReservationDefinition() { + // Prepare parameters + ReservationId reservationId = ReservationId.newInstance(Time.now(), 1); + ReservationSubmissionRequestInfo requestInfo = + getReservationSubmissionRequestInfo(reservationId); + ReservationDefinitionInfo expectDefinitionInfo = requestInfo.getReservationDefinition(); + + // ReservationDefinitionInfo conversion ReservationDefinition + ReservationDefinition convertDefinition = + RouterServerUtil.convertReservationDefinition(expectDefinitionInfo); + + // reservationDefinition is not null + assertNotNull(convertDefinition); + assertEquals(expectDefinitionInfo.getArrival(), convertDefinition.getArrival()); + assertEquals(expectDefinitionInfo.getDeadline(), convertDefinition.getDeadline()); + + Priority priority = convertDefinition.getPriority(); + assertNotNull(priority); + assertEquals(expectDefinitionInfo.getPriority(), priority.getPriority()); + assertEquals(expectDefinitionInfo.getRecurrenceExpression(), + convertDefinition.getRecurrenceExpression()); + assertEquals(expectDefinitionInfo.getReservationName(), convertDefinition.getReservationName()); + + ReservationRequestsInfo expectRequestsInfo = expectDefinitionInfo.getReservationRequests(); + List expectRequestsInfoList = + expectRequestsInfo.getReservationRequest(); + + ReservationRequests convertReservationRequests = + convertDefinition.getReservationRequests(); + assertNotNull(convertReservationRequests); + + List convertRequestList = + convertReservationRequests.getReservationResources(); + assertNotNull(convertRequestList); + assertEquals(1, convertRequestList.size()); + + ReservationRequestInfo expectResRequestInfo = expectRequestsInfoList.get(0); + ReservationRequest convertResRequest = convertRequestList.get(0); + assertNotNull(convertResRequest); + assertEquals(expectResRequestInfo.getNumContainers(), convertResRequest.getNumContainers()); + assertEquals(expectResRequestInfo.getDuration(), convertResRequest.getDuration()); + + ResourceInfo expectResourceInfo = expectResRequestInfo.getCapability(); + Resource convertResource = convertResRequest.getCapability(); + assertNotNull(expectResourceInfo); + assertEquals(expectResourceInfo.getMemorySize(), convertResource.getMemorySize()); + assertEquals(expectResourceInfo.getvCores(), convertResource.getVirtualCores()); + } + + @Test + public void testConvertReservationDefinitionEmpty() throws Exception { + + // param ReservationDefinitionInfo is Null + ReservationDefinitionInfo definitionInfo = null; + + // null request1 + LambdaTestUtils.intercept(RuntimeException.class, + "definitionInfo Or ReservationRequests is Null.", + () -> RouterServerUtil.convertReservationDefinition(definitionInfo)); + + // param ReservationRequests is Null + ReservationDefinitionInfo definitionInfo2 = new ReservationDefinitionInfo(); + + // null request2 + LambdaTestUtils.intercept(RuntimeException.class, + "definitionInfo Or ReservationRequests is Null.", + () -> RouterServerUtil.convertReservationDefinition(definitionInfo2)); + + // param ReservationRequests is Null + ReservationDefinitionInfo definitionInfo3 = new ReservationDefinitionInfo(); + ReservationRequestsInfo requestsInfo = new ReservationRequestsInfo(); + definitionInfo3.setReservationRequests(requestsInfo); + + // null request3 + LambdaTestUtils.intercept(RuntimeException.class, + "definitionInfo Or ReservationRequests is Null.", + () -> RouterServerUtil.convertReservationDefinition(definitionInfo3)); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/BaseRouterWebServicesTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/BaseRouterWebServicesTest.java index a4294bc3610..9b086e51030 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/BaseRouterWebServicesTest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/BaseRouterWebServicesTest.java @@ -32,6 +32,7 @@ import org.apache.hadoop.security.authorize.AuthorizationException; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ActivitiesInfo; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppActivitiesInfo; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppAttemptsInfo; @@ -74,10 +75,17 @@ public abstract class BaseRouterWebServicesTest { private Router router; public final static int TEST_MAX_CACHE_SIZE = 10; + public static final String QUEUE_DEFAULT = "default"; + public static final String QUEUE_DEFAULT_FULL = CapacitySchedulerConfiguration.ROOT + + CapacitySchedulerConfiguration.DOT + QUEUE_DEFAULT; + public static final String QUEUE_DEDICATED = "dedicated"; + public static final String QUEUE_DEDICATED_FULL = CapacitySchedulerConfiguration.ROOT + + CapacitySchedulerConfiguration.DOT + QUEUE_DEDICATED; + private RouterWebServices routerWebService; @Before - public void setUp() { + public void setUp() throws YarnException, IOException { this.conf = createConfiguration(); router = spy(new Router()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/MockDefaultRequestInterceptorREST.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/MockDefaultRequestInterceptorREST.java index 7f73434c766..e2ac5fbf260 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/MockDefaultRequestInterceptorREST.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/MockDefaultRequestInterceptorREST.java @@ -27,7 +27,9 @@ import java.util.HashMap; import java.util.Collections; import java.util.Arrays; +import java.util.List; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; @@ -40,6 +42,7 @@ import org.apache.commons.lang3.EnumUtils; import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.classification.VisibleForTesting; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.security.UserGroupInformation; @@ -47,6 +50,11 @@ import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableSet; import org.apache.hadoop.util.Sets; import org.apache.hadoop.util.Time; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationListRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationListResponse; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.api.records.Resource; @@ -65,10 +73,11 @@ import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState; import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType; import org.apache.hadoop.yarn.api.records.ApplicationTimeout; +import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter; +import org.apache.hadoop.yarn.api.records.ReservationRequest; +import org.apache.hadoop.yarn.api.records.ReservationRequests; +import org.apache.hadoop.yarn.api.records.ReservationDefinition; import org.apache.hadoop.yarn.api.records.QueueACL; -import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest; -import org.apache.hadoop.yarn.api.protocolrecords.ReservationListRequest; -import org.apache.hadoop.yarn.api.protocolrecords.ReservationListResponse; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException; import org.apache.hadoop.yarn.exceptions.YarnException; @@ -78,7 +87,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem; -import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystemTestUtil; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; @@ -117,10 +125,19 @@ import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.StatisticsItemInfo; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ApplicationStatisticsInfo; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppActivitiesInfo; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NewReservation; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationSubmissionRequestInfo; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationUpdateRequestInfo; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationDeleteRequestInfo; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationListInfo; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationDefinitionInfo; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationRequestInfo; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationRequestsInfo; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationUpdateResponseInfo; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationDeleteResponseInfo; +import org.apache.hadoop.yarn.server.router.RouterServerUtil; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.PartitionInfo; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.RMQueueAclInfo; -import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebServices; import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; import org.apache.hadoop.yarn.server.webapp.dao.AppAttemptInfo; import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo; @@ -134,6 +151,11 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; + +import static org.apache.hadoop.yarn.server.router.webapp.BaseRouterWebServicesTest.QUEUE_DEFAULT; +import static org.apache.hadoop.yarn.server.router.webapp.BaseRouterWebServicesTest.QUEUE_DEFAULT_FULL; +import static org.apache.hadoop.yarn.server.router.webapp.BaseRouterWebServicesTest.QUEUE_DEDICATED; +import static org.apache.hadoop.yarn.server.router.webapp.BaseRouterWebServicesTest.QUEUE_DEDICATED_FULL; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -153,19 +175,16 @@ public class MockDefaultRequestInterceptorREST private Map applicationMap = new HashMap<>(); public static final String APP_STATE_RUNNING = "RUNNING"; - private static final String QUEUE_DEFAULT = "default"; - private static final String QUEUE_DEFAULT_FULL = CapacitySchedulerConfiguration.ROOT + - CapacitySchedulerConfiguration.DOT + QUEUE_DEFAULT; - private static final String QUEUE_DEDICATED = "dedicated"; - public static final String QUEUE_DEDICATED_FULL = CapacitySchedulerConfiguration.ROOT + - CapacitySchedulerConfiguration.DOT + QUEUE_DEDICATED; - // duration(milliseconds), 1mins public static final long DURATION = 60*1000; // Containers 4 public static final int NUM_CONTAINERS = 4; + private Map reservationMap = new HashMap<>(); + private AtomicLong resCounter = new AtomicLong(); + private MockRM mockRM = null; + private void validateRunning() throws ConnectException { if (!isRunning) { throw new ConnectException("RM is stopped"); @@ -859,44 +878,191 @@ public Response listReservation(String queue, String reservationId, long startTi " Please try again with a valid reservable queue."); } - MockRM mockRM = setupResourceManager(); + ReservationId reservationID = + ReservationId.parseReservationId(reservationId); - ReservationId reservationID = ReservationId.parseReservationId(reservationId); - ReservationSystem reservationSystem = mockRM.getReservationSystem(); - reservationSystem.synchronizePlan(QUEUE_DEDICATED_FULL, true); + if (!reservationMap.containsKey(reservationID)) { + throw new NotFoundException("reservationId with id: " + reservationId + " not found"); + } - // Generate reserved resources ClientRMService clientService = mockRM.getClientRMService(); - // arrival time from which the resource(s) can be allocated. - long arrival = Time.now(); - - // deadline by when the resource(s) must be allocated. - // The reason for choosing 1.05 is because this gives an integer - // DURATION * 0.05 = 3000(ms) - // deadline = arrival + 3000ms - long deadline = (long) (arrival + 1.05 * DURATION); - - // In this test of reserved resources, we will apply for 4 containers (1 core, 1GB memory) - // arrival = Time.now(), and make sure deadline - arrival > duration, - // the current setting is greater than 3000ms - ReservationSubmissionRequest submissionRequest = - ReservationSystemTestUtil.createSimpleReservationRequest( - reservationID, NUM_CONTAINERS, arrival, deadline, DURATION); - clientService.submitReservation(submissionRequest); - // listReservations ReservationListRequest request = ReservationListRequest.newInstance( - queue, reservationID.toString(), startTime, endTime, includeResourceAllocations); + queue, reservationId, startTime, endTime, includeResourceAllocations); ReservationListResponse resRespInfo = clientService.listReservations(request); ReservationListInfo resResponse = new ReservationListInfo(resRespInfo, includeResourceAllocations); - if (mockRM != null) { - mockRM.stop(); + return Response.status(Status.OK).entity(resResponse).build(); + } + + @Override + public Response createNewReservation(HttpServletRequest hsr) + throws AuthorizationException, IOException, InterruptedException { + + if (!isRunning) { + throw new RuntimeException("RM is stopped"); } - return Response.status(Status.OK).entity(resResponse).build(); + ReservationId resId = ReservationId.newInstance(Time.now(), resCounter.incrementAndGet()); + LOG.info("Allocated new reservationId: {}.", resId); + + NewReservation reservationId = new NewReservation(resId.toString()); + return Response.status(Status.OK).entity(reservationId).build(); + } + + @Override + public Response submitReservation(ReservationSubmissionRequestInfo resContext, + HttpServletRequest hsr) throws AuthorizationException, IOException, InterruptedException { + + if (!isRunning) { + throw new RuntimeException("RM is stopped"); + } + + ReservationId reservationId = ReservationId.parseReservationId(resContext.getReservationId()); + ReservationDefinitionInfo definitionInfo = resContext.getReservationDefinition(); + ReservationDefinition definition = + RouterServerUtil.convertReservationDefinition(definitionInfo); + ReservationSubmissionRequest request = ReservationSubmissionRequest.newInstance( + definition, resContext.getQueue(), reservationId); + submitReservation(request); + + LOG.info("Reservation submitted: {}.", reservationId); + + SubClusterId subClusterId = getSubClusterId(); + reservationMap.put(reservationId, subClusterId); + + return Response.status(Status.ACCEPTED).build(); + } + + private void submitReservation(ReservationSubmissionRequest request) { + try { + // synchronize plan + ReservationSystem reservationSystem = mockRM.getReservationSystem(); + reservationSystem.synchronizePlan(QUEUE_DEDICATED_FULL, true); + // Generate reserved resources + ClientRMService clientService = mockRM.getClientRMService(); + clientService.submitReservation(request); + } catch (IOException | YarnException e) { + throw new RuntimeException(e); + } + } + + @Override + public Response updateReservation(ReservationUpdateRequestInfo resContext, + HttpServletRequest hsr) throws AuthorizationException, IOException, InterruptedException { + + if (resContext == null || resContext.getReservationId() == null || + resContext.getReservationDefinition() == null) { + return Response.status(Status.BAD_REQUEST).build(); + } + + String resId = resContext.getReservationId(); + ReservationId reservationId = ReservationId.parseReservationId(resId); + + if (!reservationMap.containsKey(reservationId)) { + throw new NotFoundException("reservationId with id: " + reservationId + " not found"); + } + + // Generate reserved resources + updateReservation(resContext); + + ReservationUpdateResponseInfo resRespInfo = new ReservationUpdateResponseInfo(); + return Response.status(Status.OK).entity(resRespInfo).build(); + } + + private void updateReservation(ReservationUpdateRequestInfo resContext) throws IOException { + + if (resContext == null) { + throw new BadRequestException("Input ReservationSubmissionContext should not be null"); + } + + ReservationDefinitionInfo resInfo = resContext.getReservationDefinition(); + if (resInfo == null) { + throw new BadRequestException("Input ReservationDefinition should not be null"); + } + + ReservationRequestsInfo resReqsInfo = resInfo.getReservationRequests(); + if (resReqsInfo == null || resReqsInfo.getReservationRequest() == null + || resReqsInfo.getReservationRequest().isEmpty()) { + throw new BadRequestException("The ReservationDefinition should " + + "contain at least one ReservationRequest"); + } + + if (resContext.getReservationId() == null) { + throw new BadRequestException("Update operations must specify an existing ReservaitonId"); + } + + ReservationRequestInterpreter[] values = ReservationRequestInterpreter.values(); + ReservationRequestInterpreter requestInterpreter = + values[resReqsInfo.getReservationRequestsInterpreter()]; + List list = new ArrayList<>(); + + for (ReservationRequestInfo resReqInfo : resReqsInfo.getReservationRequest()) { + ResourceInfo rInfo = resReqInfo.getCapability(); + Resource capability = Resource.newInstance(rInfo.getMemorySize(), rInfo.getvCores()); + int numContainers = resReqInfo.getNumContainers(); + int minConcurrency = resReqInfo.getMinConcurrency(); + long duration = resReqInfo.getDuration(); + ReservationRequest rr = ReservationRequest.newInstance( + capability, numContainers, minConcurrency, duration); + list.add(rr); + } + + ReservationRequests reqs = ReservationRequests.newInstance(list, requestInterpreter); + ReservationDefinition rDef = ReservationDefinition.newInstance( + resInfo.getArrival(), resInfo.getDeadline(), reqs, + resInfo.getReservationName(), resInfo.getRecurrenceExpression(), + Priority.newInstance(resInfo.getPriority())); + ReservationUpdateRequest request = ReservationUpdateRequest.newInstance( + rDef, ReservationId.parseReservationId(resContext.getReservationId())); + + ClientRMService clientService = mockRM.getClientRMService(); + try { + clientService.updateReservation(request); + } catch (YarnException ex) { + throw new RuntimeException(ex); + } + } + + @Override + public Response deleteReservation(ReservationDeleteRequestInfo resContext, HttpServletRequest hsr) + throws AuthorizationException, IOException, InterruptedException { + if (!isRunning) { + throw new RuntimeException("RM is stopped"); + } + + try { + String resId = resContext.getReservationId(); + ReservationId reservationId = ReservationId.parseReservationId(resId); + + if (!reservationMap.containsKey(reservationId)) { + throw new NotFoundException("reservationId with id: " + reservationId + " not found"); + } + + ReservationDeleteRequest reservationDeleteRequest = + ReservationDeleteRequest.newInstance(reservationId); + ClientRMService clientService = mockRM.getClientRMService(); + clientService.deleteReservation(reservationDeleteRequest); + + ReservationDeleteResponseInfo resRespInfo = new ReservationDeleteResponseInfo(); + reservationMap.remove(reservationId); + + return Response.status(Status.OK).entity(resRespInfo).build(); + } catch (YarnException e) { + throw new RuntimeException(e); + } + } + + @VisibleForTesting + public MockRM getMockRM() { + return mockRM; + } + + @VisibleForTesting + public void setMockRM(MockRM mockResourceManager) { + this.mockRM = mockResourceManager; } @Override @@ -939,7 +1105,7 @@ private MockRM setupResourceManager() throws Exception { public RMQueueAclInfo checkUserAccessToQueue(String queue, String username, String queueAclType, HttpServletRequest hsr) throws AuthorizationException { - ResourceManager mockRM = mock(ResourceManager.class); + ResourceManager mockResourceManager = mock(ResourceManager.class); Configuration conf = new YarnConfiguration(); ResourceScheduler mockScheduler = new CapacityScheduler() { @@ -959,8 +1125,9 @@ public synchronized boolean checkAccess(UserGroupInformation callerUGI, } }; - when(mockRM.getResourceScheduler()).thenReturn(mockScheduler); - MockRMWebServices webSvc = new MockRMWebServices(mockRM, conf, mock(HttpServletResponse.class)); + when(mockResourceManager.getResourceScheduler()).thenReturn(mockScheduler); + MockRMWebServices webSvc = new MockRMWebServices(mockResourceManager, conf, + mock(HttpServletResponse.class)); return webSvc.checkUserAccessToQueue(queue, username, queueAclType, hsr); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorREST.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorREST.java index 7c82e71ea9b..14533d10871 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorREST.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorREST.java @@ -37,6 +37,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.http.HttpConfig; import org.apache.hadoop.util.Time; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.api.records.Resource; @@ -45,6 +46,11 @@ import org.apache.hadoop.yarn.api.records.NodeLabel; import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType; import org.apache.hadoop.yarn.api.records.YarnApplicationState; +import org.apache.hadoop.yarn.api.records.ReservationDefinition; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.ReservationRequest; +import org.apache.hadoop.yarn.api.records.ReservationRequests; +import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.conf.YarnConfiguration; @@ -59,8 +65,6 @@ import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterRequest; import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterResponse; import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster; -import org.apache.hadoop.yarn.server.federation.store.records.ReservationHomeSubCluster; -import org.apache.hadoop.yarn.server.federation.store.records.AddReservationHomeSubClusterRequest; import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade; import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreTestUtil; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppInfo; @@ -86,6 +90,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppQueue; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationListInfo; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationInfo; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationSubmissionRequestInfo; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.RMQueueAclInfo; import org.apache.hadoop.yarn.server.resourcemanager.webapp.NodeIDsInfo; import org.apache.hadoop.yarn.server.router.webapp.cache.RouterAppInfoCacheKey; @@ -94,6 +99,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationDefinitionInfo; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationRequestsInfo; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationRequestInfo; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NewReservation; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationUpdateRequestInfo; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationDeleteRequestInfo; import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo; import org.apache.hadoop.yarn.server.webapp.dao.ContainersInfo; import org.apache.hadoop.yarn.server.router.webapp.dao.FederationRMQueueAclInfo; @@ -106,7 +114,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static org.apache.hadoop.yarn.server.router.webapp.MockDefaultRequestInterceptorREST.QUEUE_DEDICATED_FULL; +import static org.apache.hadoop.yarn.server.router.webapp.MockDefaultRequestInterceptorREST.DURATION; +import static org.apache.hadoop.yarn.server.router.webapp.MockDefaultRequestInterceptorREST.NUM_CONTAINERS; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -130,7 +139,7 @@ public class TestFederationInterceptorREST extends BaseRouterWebServicesTest { private List subClusters; @Override - public void setUp() { + public void setUp() throws YarnException, IOException { super.setUpConfig(); interceptor = new TestableFederationInterceptorREST(); @@ -156,6 +165,13 @@ public void setUp() { Assert.fail(); } + for (SubClusterId subCluster : subClusters) { + SubClusterInfo subClusterInfo = stateStoreUtil.querySubClusterInfo(subCluster); + interceptor.getOrCreateInterceptorForSubCluster( + subCluster, subClusterInfo.getRMWebServiceAddress()); + } + + interceptor.setupResourceManager(); } @Override @@ -1100,14 +1116,9 @@ public void testGetAppActivities() throws IOException, InterruptedException { @Test public void testListReservation() throws Exception { - // Add ReservationId In stateStore + // submitReservation ReservationId reservationId = ReservationId.newInstance(Time.now(), 1); - SubClusterId homeSubClusterId = subClusters.get(0); - ReservationHomeSubCluster reservationHomeSubCluster = - ReservationHomeSubCluster.newInstance(reservationId, homeSubClusterId); - AddReservationHomeSubClusterRequest request = - AddReservationHomeSubClusterRequest.newInstance(reservationHomeSubCluster); - stateStore.addReservationHomeSubCluster(request); + submitReservation(reservationId); // Call the listReservation method String applyReservationId = reservationId.toString(); @@ -1157,6 +1168,199 @@ public void testListReservation() throws Exception { Assert.assertEquals(1024, memory); } + @Test + public void testCreateNewReservation() throws Exception { + Response response = interceptor.createNewReservation(null); + Assert.assertNotNull(response); + + Object entity = response.getEntity(); + Assert.assertNotNull(entity); + Assert.assertTrue(entity instanceof NewReservation); + + NewReservation newReservation = (NewReservation) entity; + Assert.assertNotNull(newReservation); + Assert.assertTrue(newReservation.getReservationId().contains("reservation")); + } + + @Test + public void testSubmitReservation() throws Exception { + + // submit reservation + ReservationId reservationId = ReservationId.newInstance(Time.now(), 2); + Response response = submitReservation(reservationId); + Assert.assertNotNull(response); + Assert.assertEquals(Status.ACCEPTED.getStatusCode(), response.getStatus()); + + String applyReservationId = reservationId.toString(); + Response reservationResponse = interceptor.listReservation( + QUEUE_DEDICATED_FULL, applyReservationId, -1, -1, false, null); + Assert.assertNotNull(reservationResponse); + + Object entity = reservationResponse.getEntity(); + Assert.assertNotNull(entity); + Assert.assertNotNull(entity instanceof ReservationListInfo); + + ReservationListInfo listInfo = (ReservationListInfo) entity; + Assert.assertNotNull(listInfo); + + List reservationInfos = listInfo.getReservations(); + Assert.assertNotNull(reservationInfos); + Assert.assertEquals(1, reservationInfos.size()); + + ReservationInfo reservationInfo = reservationInfos.get(0); + Assert.assertNotNull(reservationInfo); + Assert.assertEquals(reservationInfo.getReservationId(), applyReservationId); + } + + @Test + public void testUpdateReservation() throws Exception { + // submit reservation + ReservationId reservationId = ReservationId.newInstance(Time.now(), 3); + Response response = submitReservation(reservationId); + Assert.assertNotNull(response); + Assert.assertEquals(Status.ACCEPTED.getStatusCode(), response.getStatus()); + + // update reservation + ReservationSubmissionRequest resSubRequest = + getReservationSubmissionRequest(reservationId, 6, 2048, 2); + ReservationDefinition reservationDefinition = resSubRequest.getReservationDefinition(); + ReservationDefinitionInfo reservationDefinitionInfo = + new ReservationDefinitionInfo(reservationDefinition); + + ReservationUpdateRequestInfo updateRequestInfo = new ReservationUpdateRequestInfo(); + updateRequestInfo.setReservationId(reservationId.toString()); + updateRequestInfo.setReservationDefinition(reservationDefinitionInfo); + Response updateReservationResp = interceptor.updateReservation(updateRequestInfo, null); + Assert.assertNotNull(updateReservationResp); + Assert.assertEquals(Status.OK.getStatusCode(), updateReservationResp.getStatus()); + + String applyReservationId = reservationId.toString(); + Response reservationResponse = interceptor.listReservation( + QUEUE_DEDICATED_FULL, applyReservationId, -1, -1, false, null); + Assert.assertNotNull(reservationResponse); + + Object entity = reservationResponse.getEntity(); + Assert.assertNotNull(entity); + Assert.assertNotNull(entity instanceof ReservationListInfo); + + ReservationListInfo listInfo = (ReservationListInfo) entity; + Assert.assertNotNull(listInfo); + + List reservationInfos = listInfo.getReservations(); + Assert.assertNotNull(reservationInfos); + Assert.assertEquals(1, reservationInfos.size()); + + ReservationInfo reservationInfo = reservationInfos.get(0); + Assert.assertNotNull(reservationInfo); + Assert.assertEquals(reservationInfo.getReservationId(), applyReservationId); + + ReservationDefinitionInfo resDefinitionInfo = reservationInfo.getReservationDefinition(); + Assert.assertNotNull(resDefinitionInfo); + + ReservationRequestsInfo reservationRequestsInfo = resDefinitionInfo.getReservationRequests(); + Assert.assertNotNull(reservationRequestsInfo); + + ArrayList reservationRequestInfoList = + reservationRequestsInfo.getReservationRequest(); + Assert.assertNotNull(reservationRequestInfoList); + Assert.assertEquals(1, reservationRequestInfoList.size()); + + ReservationRequestInfo reservationRequestInfo = reservationRequestInfoList.get(0); + Assert.assertNotNull(reservationRequestInfo); + Assert.assertEquals(6, reservationRequestInfo.getNumContainers()); + + ResourceInfo resourceInfo = reservationRequestInfo.getCapability(); + Assert.assertNotNull(resourceInfo); + + int vCore = resourceInfo.getvCores(); + long memory = resourceInfo.getMemorySize(); + Assert.assertEquals(2, vCore); + Assert.assertEquals(2048, memory); + } + + @Test + public void testDeleteReservation() throws Exception { + // submit reservation + ReservationId reservationId = ReservationId.newInstance(Time.now(), 4); + Response response = submitReservation(reservationId); + Assert.assertNotNull(response); + Assert.assertEquals(Status.ACCEPTED.getStatusCode(), response.getStatus()); + + String applyResId = reservationId.toString(); + Response reservationResponse = interceptor.listReservation( + QUEUE_DEDICATED_FULL, applyResId, -1, -1, false, null); + Assert.assertNotNull(reservationResponse); + + ReservationDeleteRequestInfo deleteRequestInfo = + new ReservationDeleteRequestInfo(); + deleteRequestInfo.setReservationId(applyResId); + Response delResponse = interceptor.deleteReservation(deleteRequestInfo, null); + Assert.assertNotNull(delResponse); + + LambdaTestUtils.intercept(Exception.class, + "reservationId with id: " + reservationId + " not found", + () -> interceptor.listReservation(QUEUE_DEDICATED_FULL, applyResId, -1, -1, false, null)); + } + + private Response submitReservation(ReservationId reservationId) + throws IOException, InterruptedException { + ReservationSubmissionRequestInfo resSubmissionRequestInfo = + getReservationSubmissionRequestInfo(reservationId); + Response response = interceptor.submitReservation(resSubmissionRequestInfo, null); + return response; + } + + public static ReservationSubmissionRequestInfo getReservationSubmissionRequestInfo( + ReservationId reservationId) { + + ReservationSubmissionRequest resSubRequest = + getReservationSubmissionRequest(reservationId, NUM_CONTAINERS, 1024, 1); + ReservationDefinition reservationDefinition = resSubRequest.getReservationDefinition(); + + ReservationSubmissionRequestInfo resSubmissionRequestInfo = + new ReservationSubmissionRequestInfo(); + resSubmissionRequestInfo.setQueue(resSubRequest.getQueue()); + resSubmissionRequestInfo.setReservationId(reservationId.toString()); + ReservationDefinitionInfo reservationDefinitionInfo = + new ReservationDefinitionInfo(reservationDefinition); + resSubmissionRequestInfo.setReservationDefinition(reservationDefinitionInfo); + + return resSubmissionRequestInfo; + } + + public static ReservationSubmissionRequest getReservationSubmissionRequest( + ReservationId reservationId, int numContainers, int memory, int vcore) { + + // arrival time from which the resource(s) can be allocated. + long arrival = Time.now(); + + // deadline by when the resource(s) must be allocated. + // The reason for choosing 1.05 is because this gives an integer + // DURATION * 0.05 = 3000(ms) + // deadline = arrival + 3000ms + long deadline = (long) (arrival + 1.05 * DURATION); + + ReservationSubmissionRequest submissionRequest = createSimpleReservationRequest( + reservationId, numContainers, arrival, deadline, DURATION, memory, vcore); + + return submissionRequest; + } + + public static ReservationSubmissionRequest createSimpleReservationRequest( + ReservationId reservationId, int numContainers, long arrival, + long deadline, long duration, int memory, int vcore) { + // create a request with a single atomic ask + ReservationRequest r = ReservationRequest.newInstance( + Resource.newInstance(memory, vcore), numContainers, 1, duration); + ReservationRequests reqs = ReservationRequests.newInstance( + Collections.singletonList(r), ReservationRequestInterpreter.R_ALL); + ReservationDefinition rDef = ReservationDefinition.newInstance( + arrival, deadline, reqs, "testClientRMService#reservation", "0", Priority.UNDEFINED); + ReservationSubmissionRequest request = ReservationSubmissionRequest.newInstance( + rDef, QUEUE_DEDICATED_FULL, reservationId); + return request; + } + @Test public void testWebAddressWithScheme() { // The style of the web address reported by the subCluster in the heartbeat is 0.0.0.0:8000 diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestableFederationInterceptorREST.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestableFederationInterceptorREST.java index 7126ca515c1..31fd756b664 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestableFederationInterceptorREST.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestableFederationInterceptorREST.java @@ -18,10 +18,25 @@ package org.apache.hadoop.yarn.server.router.webapp; +import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.Map; +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; +import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; +import org.apache.hadoop.yarn.server.resourcemanager.MockRM; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.hadoop.yarn.server.router.webapp.BaseRouterWebServicesTest.QUEUE_DEDICATED_FULL; +import static org.apache.hadoop.yarn.server.router.webapp.BaseRouterWebServicesTest.QUEUE_DEFAULT_FULL; +import static org.apache.hadoop.yarn.server.router.webapp.BaseRouterWebServicesTest.QUEUE_DEFAULT; +import static org.apache.hadoop.yarn.server.router.webapp.BaseRouterWebServicesTest.QUEUE_DEDICATED; /** * Extends the FederationInterceptorREST and overrides methods to provide a @@ -30,7 +45,11 @@ public class TestableFederationInterceptorREST extends FederationInterceptorREST { - private List badSubCluster = new ArrayList(); + private List badSubCluster = new ArrayList<>(); + private MockRM mockRM = null; + + private static final Logger LOG = + LoggerFactory.getLogger(TestableFederationInterceptorREST.class); /** * For testing purpose, some subclusters has to be down to simulate particular @@ -51,4 +70,51 @@ protected void registerBadSubCluster(SubClusterId badSC) { interceptor.setRunning(false); } + protected void setupResourceManager() throws IOException { + + if (mockRM != null) { + return; + } + + try { + + DefaultMetricsSystem.setMiniClusterMode(true); + CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration(); + + // Define default queue + conf.setCapacity(QUEUE_DEFAULT_FULL, 20); + // Define dedicated queues + String[] queues = new String[]{QUEUE_DEFAULT, QUEUE_DEDICATED}; + conf.setQueues(CapacitySchedulerConfiguration.ROOT, queues); + conf.setCapacity(QUEUE_DEDICATED_FULL, 80); + conf.setReservable(QUEUE_DEDICATED_FULL, true); + + conf.setClass(YarnConfiguration.RM_SCHEDULER, + CapacityScheduler.class, ResourceScheduler.class); + conf.setBoolean(YarnConfiguration.RM_RESERVATION_SYSTEM_ENABLE, true); + conf.setBoolean(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED, false); + + mockRM = new MockRM(conf); + mockRM.start(); + mockRM.registerNode("127.0.0.1:5678", 100*1024, 100); + + Map interceptors = super.getInterceptors(); + for (DefaultRequestInterceptorREST item : interceptors.values()) { + MockDefaultRequestInterceptorREST interceptor = (MockDefaultRequestInterceptorREST) item; + interceptor.setMockRM(mockRM); + } + } catch (Exception e) { + LOG.error("setupResourceManager failed.", e); + throw new IOException(e); + } + } + + @Override + public void shutdown() { + if (mockRM != null) { + mockRM.stop(); + mockRM = null; + } + super.shutdown(); + } } \ No newline at end of file