diff --git a/YARN-1051-CHANGES.txt b/YARN-1051-CHANGES.txt
index 56b3c12d8fb..c4106b277ec 100644
--- a/YARN-1051-CHANGES.txt
+++ b/YARN-1051-CHANGES.txt
@@ -20,3 +20,6 @@ on user reservations. (Carlo Curino and Subru Krishnan via curino)
YARN-1712. Plan follower that synchronizes the current state of reservation
subsystem with the scheduler. (Subru Krishnan and Carlo Curino via subru)
+
+YARN-2080. Integrating reservation system with ResourceManager and
+client-RM protocol. (Subru Krishnan and Carlo Curino via subru)
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 1a201c4d060..1a2aa1dc556 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -176,6 +176,25 @@ public class YarnConfiguration extends Configuration {
public static final boolean DEFAULT_RM_SCHEDULER_USE_PORT_FOR_NODE_NAME =
false;
+ /** Whether the RM should enable Reservation System */
+ public static final String RM_RESERVATION_SYSTEM_ENABLE = RM_PREFIX
+ + "reservation-system.enable";
+ public static final boolean DEFAULT_RM_RESERVATION_SYSTEM_ENABLE = false;
+
+ /** The class to use as the Reservation System. */
+ public static final String RM_RESERVATION_SYSTEM_CLASS = RM_PREFIX
+ + "reservation-system.class";
+
+ /** The PlanFollower for the Reservation System. */
+ public static final String RM_RESERVATION_SYSTEM_PLAN_FOLLOWER = RM_PREFIX
+ + "reservation-system.plan.follower";
+
+ /** The step size of the Reservation System. */
+ public static final String RM_RESERVATION_SYSTEM_PLAN_FOLLOWER_TIME_STEP =
+ RM_PREFIX + "reservation-system.planfollower.time-step";
+ public static final long DEFAULT_RM_RESERVATION_SYSTEM_PLAN_FOLLOWER_TIME_STEP =
+ 1000L;
+
/**
* Enable periodic monitor threads.
* @see #RM_SCHEDULER_MONITOR_POLICIES
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/YarnClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/YarnClient.java
index 9e27de58290..d697de91b0a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/YarnClient.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/YarnClient.java
@@ -27,10 +27,17 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
@@ -43,6 +50,7 @@ import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
+import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
@@ -474,4 +482,98 @@ public abstract class YarnClient extends AbstractService {
*/
public abstract void moveApplicationAcrossQueues(ApplicationId appId,
String queue) throws YarnException, IOException;
+
+ /**
+ *
+ * The interface used by clients to submit a new reservation to the
+ * {@link ResourceManager}.
+ *
+ *
+ *
+ * The client packages all details of its request in a
+ * {@link ReservationRequest} object. This contains information about the
+ * amount of capacity, temporal constraints, and gang needs. Furthermore, the
+ * reservation might be composed of multiple stages, with ordering
+ * dependencies among them.
+ *
+ *
+ *
+ * In order to respond, a new admission control component in the
+ * {@link ResourceManager} performs an analysis of the resources that have
+ * been committed over the period of time the user is requesting, verify that
+ * the user requests can be fulfilled, and that it respect a sharing policy
+ * (e.g., {@link CapacityOverTimePolicy}). Once it has positively determined
+ * that the ReservationRequest is satisfiable the {@link ResourceManager}
+ * answers with a {@link ReservationResponse} that include a
+ * {@link ReservationId}. Upon failure to find a valid allocation the response
+ * is an exception with the message detailing the reason of failure.
+ *
+ *
+ *
+ * The semantics guarantees that the ReservationId returned, corresponds to a
+ * valid reservation existing in the time-range request by the user. The
+ * amount of capacity dedicated to such reservation can vary overtime,
+ * depending of the allocation that has been determined. But it is guaranteed
+ * to satisfy all the constraint expressed by the user in the
+ * {@link ReservationRequest}
+ *
+ *
+ * @param request request to submit a new Reservation
+ * @return response contains the {@link ReservationId} on accepting the
+ * submission
+ * @throws YarnException if the reservation cannot be created successfully
+ * @throws IOException
+ *
+ */
+ @Public
+ @Unstable
+ public abstract ReservationSubmissionResponse submitReservation(
+ ReservationSubmissionRequest request) throws YarnException, IOException;
+
+ /**
+ *
+ * The interface used by clients to update an existing Reservation. This is
+ * referred to as a re-negotiation process, in which a user that has
+ * previously submitted a Reservation.
+ *
+ *
+ *
+ * The allocation is attempted by virtually substituting all previous
+ * allocations related to this Reservation with new ones, that satisfy the new
+ * {@link ReservationRequest}. Upon success the previous allocation is
+ * atomically substituted by the new one, and on failure (i.e., if the system
+ * cannot find a valid allocation for the updated request), the previous
+ * allocation remains valid.
+ *
+ *
+ * @param request to update an existing Reservation (the ReservationRequest
+ * should refer to an existing valid {@link ReservationId})
+ * @return response empty on successfully updating the existing reservation
+ * @throws YarnException if the request is invalid or reservation cannot be
+ * updated successfully
+ * @throws IOException
+ *
+ */
+ @Public
+ @Unstable
+ public abstract ReservationUpdateResponse updateReservation(
+ ReservationUpdateRequest request) throws YarnException, IOException;
+
+ /**
+ *
+ * The interface used by clients to remove an existing Reservation.
+ *
+ *
+ * @param request to remove an existing Reservation (the ReservationRequest
+ * should refer to an existing valid {@link ReservationId})
+ * @return response empty on successfully deleting the existing reservation
+ * @throws YarnException if the request is invalid or reservation cannot be
+ * deleted successfully
+ * @throws IOException
+ *
+ */
+ @Public
+ @Unstable
+ public abstract ReservationDeleteResponse deleteReservation(
+ ReservationDeleteRequest request) throws YarnException, IOException;
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java
index def6da55ea8..02c5a74b041 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java
@@ -63,6 +63,12 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest;
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
@@ -646,4 +652,23 @@ public class YarnClientImpl extends YarnClient {
MoveApplicationAcrossQueuesRequest.newInstance(appId, queue);
rmClient.moveApplicationAcrossQueues(request);
}
+
+ @Override
+ public ReservationSubmissionResponse submitReservation(
+ ReservationSubmissionRequest request) throws YarnException, IOException {
+ return rmClient.submitReservation(request);
+ }
+
+ @Override
+ public ReservationUpdateResponse updateReservation(
+ ReservationUpdateRequest request) throws YarnException, IOException {
+ return rmClient.updateReservation(request);
+ }
+
+ @Override
+ public ReservationDeleteResponse deleteReservation(
+ ReservationDeleteRequest request) throws YarnException, IOException {
+ return rmClient.deleteReservation(request);
+ }
+
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java
index 3c1b1c19908..d7bea7a8d0a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java
@@ -31,6 +31,7 @@ import java.nio.ByteBuffer;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
@@ -63,6 +64,12 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetContainersRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainersResponse;
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
@@ -76,6 +83,11 @@ import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.ReservationDefinition;
+import org.apache.hadoop.yarn.api.records.ReservationId;
+import org.apache.hadoop.yarn.api.records.ReservationRequest;
+import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter;
+import org.apache.hadoop.yarn.api.records.ReservationRequests;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
@@ -89,8 +101,14 @@ import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.MiniYARNCluster;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystemTestUtil;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
+import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.Records;
+import org.apache.hadoop.yarn.util.UTCClock;
import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
import org.apache.log4j.Level;
import org.apache.log4j.LogManager;
@@ -834,4 +852,101 @@ public class TestYarnClient {
client.stop();
}
}
+
+ @Test
+ public void testReservationAPIs() {
+ // initialize
+ CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
+ ReservationSystemTestUtil.setupQueueConfiguration(conf);
+ conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+ ResourceScheduler.class);
+ conf.setBoolean(YarnConfiguration.RM_RESERVATION_SYSTEM_ENABLE, true);
+ MiniYARNCluster cluster =
+ new MiniYARNCluster("testReservationAPIs", 2, 1, 1);
+ YarnClient client = null;
+ try {
+ cluster.init(conf);
+ cluster.start();
+ final Configuration yarnConf = cluster.getConfig();
+ client = YarnClient.createYarnClient();
+ client.init(yarnConf);
+ client.start();
+
+ // create a reservation
+ Clock clock = new UTCClock();
+ long arrival = clock.getTime();
+ long duration = 60000;
+ long deadline = (long) (arrival + 1.05 * duration);
+ ReservationSubmissionRequest sRequest =
+ createSimpleReservationRequest(4, arrival, deadline, duration);
+ ReservationSubmissionResponse sResponse = null;
+ try {
+ sResponse = client.submitReservation(sRequest);
+ } catch (Exception e) {
+ Assert.fail(e.getMessage());
+ }
+ Assert.assertNotNull(sResponse);
+ ReservationId reservationID = sResponse.getReservationId();
+ Assert.assertNotNull(reservationID);
+ System.out.println("Submit reservation response: " + reservationID);
+
+ // Update the reservation
+ ReservationDefinition rDef = sRequest.getReservationDefinition();
+ ReservationRequest rr =
+ rDef.getReservationRequests().getReservationResources().get(0);
+ rr.setNumContainers(5);
+ arrival = clock.getTime();
+ duration = 30000;
+ deadline = (long) (arrival + 1.05 * duration);
+ rr.setDuration(duration);
+ rDef.setArrival(arrival);
+ rDef.setDeadline(deadline);
+ ReservationUpdateRequest uRequest =
+ ReservationUpdateRequest.newInstance(rDef, reservationID);
+ ReservationUpdateResponse uResponse = null;
+ try {
+ uResponse = client.updateReservation(uRequest);
+ } catch (Exception e) {
+ Assert.fail(e.getMessage());
+ }
+ Assert.assertNotNull(sResponse);
+ System.out.println("Update reservation response: " + uResponse);
+
+ // Delete the reservation
+ ReservationDeleteRequest dRequest =
+ ReservationDeleteRequest.newInstance(reservationID);
+ ReservationDeleteResponse dResponse = null;
+ try {
+ dResponse = client.deleteReservation(dRequest);
+ } catch (Exception e) {
+ Assert.fail(e.getMessage());
+ }
+ Assert.assertNotNull(sResponse);
+ System.out.println("Delete reservation response: " + dResponse);
+ } finally {
+ // clean-up
+ if (client != null) {
+ client.stop();
+ }
+ cluster.stop();
+ }
+ }
+
+ private ReservationSubmissionRequest createSimpleReservationRequest(
+ int numContainers, long arrival, long deadline, long duration) {
+ // create a request with a single atomic ask
+ ReservationRequest r =
+ ReservationRequest.newInstance(Resource.newInstance(1024, 1),
+ numContainers, 1, duration);
+ ReservationRequests reqs =
+ ReservationRequests.newInstance(Collections.singletonList(r),
+ ReservationRequestInterpreter.R_ALL);
+ ReservationDefinition rDef =
+ ReservationDefinition.newInstance(arrival, deadline, reqs,
+ "testYarnClient#reservation");
+ ReservationSubmissionRequest request =
+ ReservationSubmissionRequest.newInstance(rDef,
+ ReservationSystemTestUtil.reservationQ);
+ return request;
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java
index ff0a249bce9..2b7797fa2ce 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java
@@ -71,6 +71,7 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMapp
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceResponse;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeResourceUpdateEvent;
import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider;
@@ -348,6 +349,11 @@ public class AdminService extends CompositeService implements
recordFactory.newRecordInstance(RefreshQueuesResponse.class);
try {
rmContext.getScheduler().reinitialize(getConfig(), this.rmContext);
+ // refresh the reservation system
+ ReservationSystem rSystem = rmContext.getReservationSystem();
+ if (rSystem != null) {
+ rSystem.reinitialize(getConfig(), rmContext);
+ }
RMAuditLogger.logSuccess(user.getShortUserName(), argName,
"AdminService");
return response;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
index 71f873c26a4..12811d0c5e3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
@@ -22,6 +22,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.net.InetSocketAddress;
import java.security.AccessControlException;
+import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -79,6 +80,12 @@ import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesReq
import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
@@ -93,6 +100,8 @@ import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.QueueInfo;
+import org.apache.hadoop.yarn.api.records.ReservationDefinition;
+import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
@@ -107,6 +116,10 @@ import org.apache.hadoop.yarn.ipc.RPCUtil;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationInputValidator;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
@@ -123,7 +136,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenS
import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.Records;
+import org.apache.hadoop.yarn.util.UTCClock;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.Futures;
@@ -154,10 +169,23 @@ public class ClientRMService extends AbstractService implements
private final ApplicationACLsManager applicationsACLsManager;
private final QueueACLsManager queueACLsManager;
+ // For Reservation APIs
+ private Clock clock;
+ private ReservationSystem reservationSystem;
+ private ReservationInputValidator rValidator;
+
public ClientRMService(RMContext rmContext, YarnScheduler scheduler,
RMAppManager rmAppManager, ApplicationACLsManager applicationACLsManager,
QueueACLsManager queueACLsManager,
RMDelegationTokenSecretManager rmDTSecretManager) {
+ this(rmContext, scheduler, rmAppManager, applicationACLsManager,
+ queueACLsManager, rmDTSecretManager, new UTCClock());
+ }
+
+ public ClientRMService(RMContext rmContext, YarnScheduler scheduler,
+ RMAppManager rmAppManager, ApplicationACLsManager applicationACLsManager,
+ QueueACLsManager queueACLsManager,
+ RMDelegationTokenSecretManager rmDTSecretManager, Clock clock) {
super(ClientRMService.class.getName());
this.scheduler = scheduler;
this.rmContext = rmContext;
@@ -165,6 +193,9 @@ public class ClientRMService extends AbstractService implements
this.applicationsACLsManager = applicationACLsManager;
this.queueACLsManager = queueACLsManager;
this.rmDTSecretManager = rmDTSecretManager;
+ this.reservationSystem = rmContext.getReservationSystem();
+ this.clock = clock;
+ this.rValidator = new ReservationInputValidator(clock);
}
@Override
@@ -1033,4 +1064,174 @@ public class ClientRMService extends AbstractService implements
public Server getServer() {
return this.server;
}
+
+ @Override
+ public ReservationSubmissionResponse submitReservation(
+ ReservationSubmissionRequest request) throws YarnException, IOException {
+ // Check if reservation system is enabled
+ checkReservationSytem(AuditConstants.SUBMIT_RESERVATION_REQUEST);
+ ReservationSubmissionResponse response =
+ recordFactory.newRecordInstance(ReservationSubmissionResponse.class);
+ // Create a new Reservation Id
+ ReservationId reservationId = reservationSystem.getNewReservationId();
+ // Validate the input
+ Plan plan =
+ rValidator.validateReservationSubmissionRequest(reservationSystem,
+ request, reservationId);
+ // Check ACLs
+ String queueName = request.getQueue();
+ String user =
+ checkReservationACLs(queueName,
+ AuditConstants.SUBMIT_RESERVATION_REQUEST);
+ try {
+ // Try to place the reservation using the agent
+ boolean result =
+ plan.getReservationAgent().createReservation(reservationId, user,
+ plan, request.getReservationDefinition());
+ if (result) {
+ // add the reservation id to valid ones maintained by reservation
+ // system
+ reservationSystem.setQueueForReservation(reservationId, queueName);
+ // create the reservation synchronously if required
+ refreshScheduler(queueName, request.getReservationDefinition(),
+ reservationId.toString());
+ // return the reservation id
+ response.setReservationId(reservationId);
+ }
+ } catch (PlanningException e) {
+ RMAuditLogger.logFailure(user, AuditConstants.SUBMIT_RESERVATION_REQUEST,
+ e.getMessage(), "ClientRMService",
+ "Unable to create the reservation: " + reservationId);
+ throw RPCUtil.getRemoteException(e);
+ }
+ RMAuditLogger.logSuccess(user, AuditConstants.SUBMIT_RESERVATION_REQUEST,
+ "ClientRMService: " + reservationId);
+ return response;
+ }
+
+ @Override
+ public ReservationUpdateResponse updateReservation(
+ ReservationUpdateRequest request) throws YarnException, IOException {
+ // Check if reservation system is enabled
+ checkReservationSytem(AuditConstants.UPDATE_RESERVATION_REQUEST);
+ ReservationUpdateResponse response =
+ recordFactory.newRecordInstance(ReservationUpdateResponse.class);
+ // Validate the input
+ Plan plan =
+ rValidator.validateReservationUpdateRequest(reservationSystem, request);
+ ReservationId reservationId = request.getReservationId();
+ String queueName = reservationSystem.getQueueForReservation(reservationId);
+ // Check ACLs
+ String user =
+ checkReservationACLs(queueName,
+ AuditConstants.UPDATE_RESERVATION_REQUEST);
+ // Try to update the reservation using default agent
+ try {
+ boolean result =
+ plan.getReservationAgent().updateReservation(reservationId, user,
+ plan, request.getReservationDefinition());
+ if (!result) {
+ String errMsg = "Unable to update reservation: " + reservationId;
+ RMAuditLogger.logFailure(user,
+ AuditConstants.UPDATE_RESERVATION_REQUEST, errMsg,
+ "ClientRMService", errMsg);
+ throw RPCUtil.getRemoteException(errMsg);
+ }
+ } catch (PlanningException e) {
+ RMAuditLogger.logFailure(user, AuditConstants.UPDATE_RESERVATION_REQUEST,
+ e.getMessage(), "ClientRMService",
+ "Unable to update the reservation: " + reservationId);
+ throw RPCUtil.getRemoteException(e);
+ }
+ RMAuditLogger.logSuccess(user, AuditConstants.UPDATE_RESERVATION_REQUEST,
+ "ClientRMService: " + reservationId);
+ return response;
+ }
+
+ @Override
+ public ReservationDeleteResponse deleteReservation(
+ ReservationDeleteRequest request) throws YarnException, IOException {
+ // Check if reservation system is enabled
+ checkReservationSytem(AuditConstants.DELETE_RESERVATION_REQUEST);
+ ReservationDeleteResponse response =
+ recordFactory.newRecordInstance(ReservationDeleteResponse.class);
+ // Validate the input
+ Plan plan =
+ rValidator.validateReservationDeleteRequest(reservationSystem, request);
+ ReservationId reservationId = request.getReservationId();
+ String queueName = reservationSystem.getQueueForReservation(reservationId);
+ // Check ACLs
+ String user =
+ checkReservationACLs(queueName,
+ AuditConstants.DELETE_RESERVATION_REQUEST);
+ // Try to update the reservation using default agent
+ try {
+ boolean result =
+ plan.getReservationAgent().deleteReservation(reservationId, user,
+ plan);
+ if (!result) {
+ String errMsg = "Could not delete reservation: " + reservationId;
+ RMAuditLogger.logFailure(user,
+ AuditConstants.DELETE_RESERVATION_REQUEST, errMsg,
+ "ClientRMService", errMsg);
+ throw RPCUtil.getRemoteException(errMsg);
+ }
+ } catch (PlanningException e) {
+ RMAuditLogger.logFailure(user, AuditConstants.DELETE_RESERVATION_REQUEST,
+ e.getMessage(), "ClientRMService",
+ "Unable to delete the reservation: " + reservationId);
+ throw RPCUtil.getRemoteException(e);
+ }
+ RMAuditLogger.logSuccess(user, AuditConstants.DELETE_RESERVATION_REQUEST,
+ "ClientRMService: " + reservationId);
+ return response;
+ }
+
+ private void checkReservationSytem(String auditConstant) throws YarnException {
+ // Check if reservation is enabled
+ if (reservationSystem == null) {
+ throw RPCUtil.getRemoteException("Reservation is not enabled."
+ + " Please enable & try again");
+ }
+ }
+
+ private void refreshScheduler(String planName,
+ ReservationDefinition contract, String reservationId) {
+ if ((contract.getArrival() - clock.getTime()) < reservationSystem
+ .getPlanFollowerTimeStep()) {
+ LOG.debug(MessageFormat
+ .format(
+ "Reservation {0} is within threshold so attempting to create synchronously.",
+ reservationId));
+ reservationSystem.synchronizePlan(planName);
+ LOG.info(MessageFormat.format("Created reservation {0} synchronously.",
+ reservationId));
+ }
+ }
+
+ private String checkReservationACLs(String queueName, String auditConstant)
+ throws YarnException {
+ UserGroupInformation callerUGI;
+ try {
+ callerUGI = UserGroupInformation.getCurrentUser();
+ } catch (IOException ie) {
+ RMAuditLogger.logFailure("UNKNOWN", auditConstant, queueName,
+ "ClientRMService", "Error getting UGI");
+ throw RPCUtil.getRemoteException(ie);
+ }
+ // Check if user has access on the managed queue
+ if (!queueACLsManager.checkAccess(callerUGI, QueueACL.SUBMIT_APPLICATIONS,
+ queueName)) {
+ RMAuditLogger.logFailure(
+ callerUGI.getShortUserName(),
+ auditConstant,
+ "User doesn't have permissions to "
+ + QueueACL.SUBMIT_APPLICATIONS.toString(), "ClientRMService",
+ AuditConstants.UNAUTHORIZED_USER);
+ throw RPCUtil.getRemoteException(new AccessControlException("User "
+ + callerUGI.getShortUserName() + " cannot perform operation "
+ + QueueACL.SUBMIT_APPLICATIONS.name() + " on queue" + queueName));
+ }
+ return callerUGI.getShortUserName();
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAuditLogger.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAuditLogger.java
index 9ae09a4db7a..6dd67c792bf 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAuditLogger.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAuditLogger.java
@@ -56,6 +56,11 @@ public class RMAuditLogger {
// Some commonly used descriptions
public static final String UNAUTHORIZED_USER = "Unauthorized user";
+
+ // For Reservation system
+ public static final String SUBMIT_RESERVATION_REQUEST = "Submit Reservation Request";
+ public static final String UPDATE_RESERVATION_REQUEST = "Update Reservation Request";
+ public static final String DELETE_RESERVATION_REQUEST = "Delete Reservation Request";
}
/**
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
index 60f88f60b98..46ecfcd0e75 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
@@ -109,5 +109,7 @@ public interface RMContext {
long getEpoch();
+ ReservationSystem getReservationSystem();
+
boolean isSchedulerReadyForAllocatingContainers();
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
index 36eec045c52..78787ee16a9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWri
import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.NullRMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
@@ -83,6 +84,7 @@ public class RMContextImpl implements RMContext {
private ClientRMService clientRMService;
private RMDelegationTokenSecretManager rmDelegationTokenSecretManager;
private ResourceScheduler scheduler;
+ private ReservationSystem reservationSystem;
private NodesListManager nodesListManager;
private ResourceTrackerService resourceTrackerService;
private ApplicationMasterService applicationMasterService;
@@ -208,6 +210,11 @@ public class RMContextImpl implements RMContext {
return this.scheduler;
}
+ @Override
+ public ReservationSystem getReservationSystem() {
+ return this.reservationSystem;
+ }
+
@Override
public NodesListManager getNodesListManager() {
return this.nodesListManager;
@@ -303,6 +310,10 @@ public class RMContextImpl implements RMContext {
void setScheduler(ResourceScheduler scheduler) {
this.scheduler = scheduler;
}
+
+ void setReservationSystem(ReservationSystem reservationSystem) {
+ this.reservationSystem = reservationSystem;
+ }
void setDelegationTokenRenewer(
DelegationTokenRenewer delegationTokenRenewer) {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
index 79af7a649f3..3e5f13801a2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
@@ -72,6 +72,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStoreFactory;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.AbstractReservationSystem;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
@@ -147,6 +149,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
protected RMSecretManagerService rmSecretManagerService;
protected ResourceScheduler scheduler;
+ protected ReservationSystem reservationSystem;
private ClientRMService clientRM;
protected ApplicationMasterService masterService;
protected NMLivelinessMonitor nmLivelinessMonitor;
@@ -281,6 +284,29 @@ public class ResourceManager extends CompositeService implements Recoverable {
}
}
+ protected ReservationSystem createReservationSystem() {
+ String reservationClassName =
+ conf.get(YarnConfiguration.RM_RESERVATION_SYSTEM_CLASS,
+ AbstractReservationSystem.getDefaultReservationSystem(scheduler));
+ if (reservationClassName == null) {
+ return null;
+ }
+ LOG.info("Using ReservationSystem: " + reservationClassName);
+ try {
+ Class> reservationClazz = Class.forName(reservationClassName);
+ if (ReservationSystem.class.isAssignableFrom(reservationClazz)) {
+ return (ReservationSystem) ReflectionUtils.newInstance(
+ reservationClazz, this.conf);
+ } else {
+ throw new YarnRuntimeException("Class: " + reservationClassName
+ + " not instance of " + ReservationSystem.class.getCanonicalName());
+ }
+ } catch (ClassNotFoundException e) {
+ throw new YarnRuntimeException(
+ "Could not instantiate ReservationSystem: " + reservationClassName, e);
+ }
+ }
+
protected ApplicationMasterLauncher createAMLauncher() {
return new ApplicationMasterLauncher(this.rmContext);
}
@@ -456,6 +482,18 @@ public class ResourceManager extends CompositeService implements Recoverable {
DefaultMetricsSystem.initialize("ResourceManager");
JvmMetrics.initSingleton("ResourceManager", null);
+ // Initialize the Reservation system
+ if (conf.getBoolean(YarnConfiguration.RM_RESERVATION_SYSTEM_ENABLE,
+ YarnConfiguration.DEFAULT_RM_RESERVATION_SYSTEM_ENABLE)) {
+ reservationSystem = createReservationSystem();
+ if (reservationSystem != null) {
+ reservationSystem.setRMContext(rmContext);
+ addIfService(reservationSystem);
+ rmContext.setReservationSystem(reservationSystem);
+ LOG.info("Initialized Reservation system");
+ }
+ }
+
// creating monitors that handle preemption
createPolicyMonitors();
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractReservationSystem.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractReservationSystem.java
new file mode 100644
index 00000000000..f0a95435d54
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractReservationSystem.java
@@ -0,0 +1,323 @@
+/**
+ *
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.reservation;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.yarn.api.records.ReservationId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.util.Clock;
+import org.apache.hadoop.yarn.util.UTCClock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This is the implementation of {@link ReservationSystem} based on the
+ * {@link ResourceScheduler}
+ */
+@LimitedPrivate("yarn")
+@Unstable
+public abstract class AbstractReservationSystem extends AbstractService
+ implements ReservationSystem {
+
+ private static final Logger LOG = LoggerFactory
+ .getLogger(AbstractReservationSystem.class);
+
+ // private static final String DEFAULT_CAPACITY_SCHEDULER_PLAN
+
+ private final ReentrantReadWriteLock readWriteLock =
+ new ReentrantReadWriteLock(true);
+ private final Lock readLock = readWriteLock.readLock();
+ private final Lock writeLock = readWriteLock.writeLock();
+
+ private boolean initialized = false;
+
+ private final Clock clock = new UTCClock();
+
+ private AtomicLong resCounter = new AtomicLong();
+
+ private Map plans = new HashMap();
+
+ private Map resQMap =
+ new HashMap();
+
+ private RMContext rmContext;
+
+ private ResourceScheduler scheduler;
+
+ private ScheduledExecutorService scheduledExecutorService;
+
+ protected Configuration conf;
+
+ protected long planStepSize;
+
+ private PlanFollower planFollower;
+
+ /**
+ * Construct the service.
+ *
+ * @param name service name
+ */
+ public AbstractReservationSystem(String name) {
+ super(name);
+ }
+
+ @Override
+ public void setRMContext(RMContext rmContext) {
+ writeLock.lock();
+ try {
+ this.rmContext = rmContext;
+ } finally {
+ writeLock.unlock();
+ }
+ }
+
+ @Override
+ public void reinitialize(Configuration conf, RMContext rmContext)
+ throws YarnException {
+ writeLock.lock();
+ try {
+ if (!initialized) {
+ initialize(conf);
+ initialized = true;
+ } else {
+ initializeNewPlans(conf);
+ }
+ } finally {
+ writeLock.unlock();
+ }
+ }
+
+ private void initialize(Configuration conf) throws YarnException {
+ LOG.info("Initializing Reservation system");
+ this.conf = conf;
+ scheduler = rmContext.getScheduler();
+ // Get the plan step size
+ planStepSize =
+ conf.getTimeDuration(
+ YarnConfiguration.RM_RESERVATION_SYSTEM_PLAN_FOLLOWER_TIME_STEP,
+ YarnConfiguration.DEFAULT_RM_RESERVATION_SYSTEM_PLAN_FOLLOWER_TIME_STEP,
+ TimeUnit.MILLISECONDS);
+ if (planStepSize < 0) {
+ planStepSize =
+ YarnConfiguration.DEFAULT_RM_RESERVATION_SYSTEM_PLAN_FOLLOWER_TIME_STEP;
+ }
+ // Create a plan corresponding to every reservable queue
+ Set planQueueNames = scheduler.getPlanQueues();
+ for (String planQueueName : planQueueNames) {
+ Plan plan = initializePlan(planQueueName);
+ plans.put(planQueueName, plan);
+ }
+ }
+
+ private void initializeNewPlans(Configuration conf) {
+ LOG.info("Refreshing Reservation system");
+ writeLock.lock();
+ try {
+ // Create a plan corresponding to every new reservable queue
+ Set planQueueNames = scheduler.getPlanQueues();
+ for (String planQueueName : planQueueNames) {
+ if (!plans.containsKey(planQueueName)) {
+ Plan plan = initializePlan(planQueueName);
+ plans.put(planQueueName, plan);
+ } else {
+ LOG.warn("Plan based on reservation queue {0} already exists.",
+ planQueueName);
+ }
+ }
+ // Update the plan follower with the active plans
+ if (planFollower != null) {
+ planFollower.setPlans(plans.values());
+ }
+ } catch (YarnException e) {
+ LOG.warn("Exception while trying to refresh reservable queues", e);
+ } finally {
+ writeLock.unlock();
+ }
+ }
+
+ private PlanFollower createPlanFollower() {
+ String planFollowerPolicyClassName =
+ conf.get(YarnConfiguration.RM_RESERVATION_SYSTEM_PLAN_FOLLOWER,
+ getDefaultPlanFollower());
+ if (planFollowerPolicyClassName == null) {
+ return null;
+ }
+ LOG.info("Using PlanFollowerPolicy: " + planFollowerPolicyClassName);
+ try {
+ Class> planFollowerPolicyClazz =
+ conf.getClassByName(planFollowerPolicyClassName);
+ if (PlanFollower.class.isAssignableFrom(planFollowerPolicyClazz)) {
+ return (PlanFollower) ReflectionUtils.newInstance(
+ planFollowerPolicyClazz, conf);
+ } else {
+ throw new YarnRuntimeException("Class: " + planFollowerPolicyClassName
+ + " not instance of " + PlanFollower.class.getCanonicalName());
+ }
+ } catch (ClassNotFoundException e) {
+ throw new YarnRuntimeException(
+ "Could not instantiate PlanFollowerPolicy: "
+ + planFollowerPolicyClassName, e);
+ }
+ }
+
+ private String getDefaultPlanFollower() {
+ // currently only capacity scheduler is supported
+ if (scheduler instanceof CapacityScheduler) {
+ return CapacitySchedulerPlanFollower.class.getName();
+ }
+ return null;
+ }
+
+ @Override
+ public Plan getPlan(String planName) {
+ readLock.lock();
+ try {
+ return plans.get(planName);
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ /**
+ * @return the planStepSize
+ */
+ @Override
+ public long getPlanFollowerTimeStep() {
+ readLock.lock();
+ try {
+ return planStepSize;
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ @Override
+ public void synchronizePlan(String planName) {
+ writeLock.lock();
+ try {
+ Plan plan = plans.get(planName);
+ if (plan != null) {
+ planFollower.synchronizePlan(plan);
+ }
+ } finally {
+ writeLock.unlock();
+ }
+ }
+
+ @Override
+ public void serviceInit(Configuration conf) throws Exception {
+ Configuration configuration = new Configuration(conf);
+ reinitialize(configuration, rmContext);
+ // Create the plan follower with the active plans
+ planFollower = createPlanFollower();
+ if (planFollower != null) {
+ planFollower.init(clock, scheduler, plans.values());
+ }
+ super.serviceInit(conf);
+ }
+
+ @Override
+ public void serviceStart() throws Exception {
+ if (planFollower != null) {
+ scheduledExecutorService = new ScheduledThreadPoolExecutor(1);
+ scheduledExecutorService.scheduleWithFixedDelay(planFollower, 0L,
+ planStepSize, TimeUnit.MILLISECONDS);
+ }
+ super.serviceStart();
+ }
+
+ @Override
+ public void serviceStop() {
+ // Stop the plan follower
+ if (scheduledExecutorService != null
+ && !scheduledExecutorService.isShutdown()) {
+ scheduledExecutorService.shutdown();
+ }
+ // Clear the plans
+ plans.clear();
+ }
+
+ @Override
+ public String getQueueForReservation(ReservationId reservationId) {
+ readLock.lock();
+ try {
+ return resQMap.get(reservationId);
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ @Override
+ public void setQueueForReservation(ReservationId reservationId,
+ String queueName) {
+ writeLock.lock();
+ try {
+ resQMap.put(reservationId, queueName);
+ } finally {
+ writeLock.unlock();
+ }
+ }
+
+ @Override
+ public ReservationId getNewReservationId() {
+ writeLock.lock();
+ try {
+ ReservationId resId =
+ ReservationId.newInstance(ResourceManager.getClusterTimeStamp(),
+ resCounter.incrementAndGet());
+ LOG.info("Allocated new reservationId: " + resId);
+ return resId;
+ } finally {
+ writeLock.unlock();
+ }
+ }
+
+ @Override
+ public Map getAllPlans() {
+ return plans;
+ }
+
+ /**
+ * Get the default reservation system corresponding to the scheduler
+ *
+ * @param scheduler the scheduler for which the reservation system is required
+ */
+ public static String getDefaultReservationSystem(ResourceScheduler scheduler) {
+ // currently only capacity scheduler is supported
+ if (scheduler instanceof CapacityScheduler) {
+ return CapacityReservationSystem.class.getName();
+ }
+ return null;
+ }
+
+ protected abstract Plan initializePlan(String planQueueName)
+ throws YarnException;
+
+ protected abstract Planner getReplanner(String planQueueName);
+
+ protected abstract ReservationAgent getAgent(String queueName);
+
+ protected abstract SharingPolicy getAdmissionPolicy(String queueName);
+
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacityReservationSystem.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacityReservationSystem.java
new file mode 100644
index 00000000000..548fde1e024
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacityReservationSystem.java
@@ -0,0 +1,146 @@
+/**
+ *
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.reservation;
+
+import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This is the implementation of {@link ReservationSystem} based on the
+ * {@link CapacityScheduler}
+ */
+@LimitedPrivate("yarn")
+@Unstable
+public class CapacityReservationSystem extends AbstractReservationSystem {
+
+ private static final Logger LOG = LoggerFactory
+ .getLogger(CapacityReservationSystem.class);
+
+ private CapacityScheduler capScheduler;
+
+ public CapacityReservationSystem() {
+ super(CapacityReservationSystem.class.getName());
+ }
+
+ @Override
+ public void reinitialize(Configuration conf, RMContext rmContext)
+ throws YarnException {
+ // Validate if the scheduler is capacity based
+ ResourceScheduler scheduler = rmContext.getScheduler();
+ if (!(scheduler instanceof CapacityScheduler)) {
+ throw new YarnRuntimeException("Class "
+ + scheduler.getClass().getCanonicalName() + " not instance of "
+ + CapacityScheduler.class.getCanonicalName());
+ }
+ capScheduler = (CapacityScheduler) scheduler;
+ this.conf = conf;
+ super.reinitialize(conf, rmContext);
+ }
+
+ @Override
+ protected Plan initializePlan(String planQueueName) throws YarnException {
+ SharingPolicy adPolicy = getAdmissionPolicy(planQueueName);
+ String planQueuePath = capScheduler.getQueue(planQueueName).getQueuePath();
+ adPolicy.init(planQueuePath, capScheduler.getConfiguration());
+ CSQueue planQueue = capScheduler.getQueue(planQueueName);
+ // Calculate the max plan capacity
+ Resource minAllocation = capScheduler.getMinimumResourceCapability();
+ ResourceCalculator rescCalc = capScheduler.getResourceCalculator();
+ Resource totCap =
+ rescCalc.multiplyAndNormalizeDown(capScheduler.getClusterResource(),
+ planQueue.getAbsoluteCapacity(), minAllocation);
+ Plan plan =
+ new InMemoryPlan(capScheduler.getRootQueueMetrics(), adPolicy,
+ getAgent(planQueuePath), totCap, planStepSize, rescCalc,
+ minAllocation, capScheduler.getMaximumResourceCapability(),
+ planQueueName, getReplanner(planQueuePath), capScheduler
+ .getConfiguration().getMoveOnExpiry(planQueuePath));
+ LOG.info("Intialized plan {0} based on reservable queue {1}",
+ plan.toString(), planQueueName);
+ return plan;
+ }
+
+ @Override
+ protected Planner getReplanner(String planQueueName) {
+ CapacitySchedulerConfiguration capSchedulerConfig =
+ capScheduler.getConfiguration();
+ String plannerClassName = capSchedulerConfig.getReplanner(planQueueName);
+ LOG.info("Using Replanner: " + plannerClassName + " for queue: "
+ + planQueueName);
+ try {
+ Class> plannerClazz =
+ capSchedulerConfig.getClassByName(plannerClassName);
+ if (Planner.class.isAssignableFrom(plannerClazz)) {
+ Planner planner =
+ (Planner) ReflectionUtils.newInstance(plannerClazz, conf);
+ planner.init(planQueueName, capSchedulerConfig);
+ return planner;
+ } else {
+ throw new YarnRuntimeException("Class: " + plannerClazz
+ + " not instance of " + Planner.class.getCanonicalName());
+ }
+ } catch (ClassNotFoundException e) {
+ throw new YarnRuntimeException("Could not instantiate Planner: "
+ + plannerClassName + " for queue: " + planQueueName, e);
+ }
+ }
+
+ @Override
+ protected ReservationAgent getAgent(String queueName) {
+ CapacitySchedulerConfiguration capSchedulerConfig =
+ capScheduler.getConfiguration();
+ String agentClassName = capSchedulerConfig.getReservationAgent(queueName);
+ LOG.info("Using Agent: " + agentClassName + " for queue: " + queueName);
+ try {
+ Class> agentClazz = capSchedulerConfig.getClassByName(agentClassName);
+ if (ReservationAgent.class.isAssignableFrom(agentClazz)) {
+ return (ReservationAgent) ReflectionUtils.newInstance(agentClazz, conf);
+ } else {
+ throw new YarnRuntimeException("Class: " + agentClassName
+ + " not instance of " + ReservationAgent.class.getCanonicalName());
+ }
+ } catch (ClassNotFoundException e) {
+ throw new YarnRuntimeException("Could not instantiate Agent: "
+ + agentClassName + " for queue: " + queueName, e);
+ }
+ }
+
+ @Override
+ protected SharingPolicy getAdmissionPolicy(String queueName) {
+ CapacitySchedulerConfiguration capSchedulerConfig =
+ capScheduler.getConfiguration();
+ String admissionPolicyClassName =
+ capSchedulerConfig.getReservationAdmissionPolicy(queueName);
+ LOG.info("Using AdmissionPolicy: " + admissionPolicyClassName
+ + " for queue: " + queueName);
+ try {
+ Class> admissionPolicyClazz =
+ capSchedulerConfig.getClassByName(admissionPolicyClassName);
+ if (SharingPolicy.class.isAssignableFrom(admissionPolicyClazz)) {
+ return (SharingPolicy) ReflectionUtils.newInstance(
+ admissionPolicyClazz, conf);
+ } else {
+ throw new YarnRuntimeException("Class: " + admissionPolicyClassName
+ + " not instance of " + SharingPolicy.class.getCanonicalName());
+ }
+ } catch (ClassNotFoundException e) {
+ throw new YarnRuntimeException("Could not instantiate AdmissionPolicy: "
+ + admissionPolicyClassName + " for queue: " + queueName, e);
+ }
+ }
+
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationInputValidator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationInputValidator.java
new file mode 100644
index 00000000000..678773de47c
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationInputValidator.java
@@ -0,0 +1,244 @@
+package org.apache.hadoop.yarn.server.resourcemanager.reservation;
+
+import java.util.List;
+
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest;
+import org.apache.hadoop.yarn.api.records.ReservationDefinition;
+import org.apache.hadoop.yarn.api.records.ReservationId;
+import org.apache.hadoop.yarn.api.records.ReservationRequest;
+import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter;
+import org.apache.hadoop.yarn.api.records.ReservationRequests;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.ipc.RPCUtil;
+import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger;
+import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
+import org.apache.hadoop.yarn.util.Clock;
+import org.apache.hadoop.yarn.util.resource.Resources;
+
+public class ReservationInputValidator {
+
+ private final Clock clock;
+
+ /**
+ * Utility class to validate reservation requests.
+ */
+ public ReservationInputValidator(Clock clock) {
+ this.clock = clock;
+ }
+
+ private Plan validateReservation(ReservationSystem reservationSystem,
+ ReservationId reservationId, String auditConstant) throws YarnException {
+ String message = "";
+ // check if the reservation id is valid
+ if (reservationId == null) {
+ message =
+ "Missing reservation id."
+ + " Please try again by specifying a reservation id.";
+ RMAuditLogger.logFailure("UNKNOWN", auditConstant,
+ "validate reservation input", "ClientRMService", message);
+ throw RPCUtil.getRemoteException(message);
+ }
+ String queueName = reservationSystem.getQueueForReservation(reservationId);
+ if (queueName == null) {
+ message =
+ "The specified reservation with ID: " + reservationId
+ + " is unknown. Please try again with a valid reservation.";
+ RMAuditLogger.logFailure("UNKNOWN", auditConstant,
+ "validate reservation input", "ClientRMService", message);
+ throw RPCUtil.getRemoteException(message);
+ }
+ // check if the associated plan is valid
+ Plan plan = reservationSystem.getPlan(queueName);
+ if (plan == null) {
+ message =
+ "The specified reservation: " + reservationId
+ + " is not associated with any valid plan."
+ + " Please try again with a valid reservation.";
+ RMAuditLogger.logFailure("UNKNOWN", auditConstant,
+ "validate reservation input", "ClientRMService", message);
+ throw RPCUtil.getRemoteException(message);
+ }
+ return plan;
+ }
+
+ private void validateReservationDefinition(ReservationId reservationId,
+ ReservationDefinition contract, Plan plan, String auditConstant)
+ throws YarnException {
+ String message = "";
+ // check if deadline is in the past
+ if (contract == null) {
+ message =
+ "Missing reservation definition."
+ + " Please try again by specifying a reservation definition.";
+ RMAuditLogger.logFailure("UNKNOWN", auditConstant,
+ "validate reservation input definition", "ClientRMService", message);
+ throw RPCUtil.getRemoteException(message);
+ }
+ if (contract.getDeadline() <= clock.getTime()) {
+ message =
+ "The specified deadline: " + contract.getDeadline()
+ + " is the past. Please try again with deadline in the future.";
+ RMAuditLogger.logFailure("UNKNOWN", auditConstant,
+ "validate reservation input definition", "ClientRMService", message);
+ throw RPCUtil.getRemoteException(message);
+ }
+ // Check if at least one RR has been specified
+ ReservationRequests resReqs = contract.getReservationRequests();
+ if (resReqs == null) {
+ message =
+ "No resources have been specified to reserve."
+ + "Please try again by specifying the resources to reserve.";
+ RMAuditLogger.logFailure("UNKNOWN", auditConstant,
+ "validate reservation input definition", "ClientRMService", message);
+ throw RPCUtil.getRemoteException(message);
+ }
+ List resReq = resReqs.getReservationResources();
+ if (resReq == null || resReq.isEmpty()) {
+ message =
+ "No resources have been specified to reserve."
+ + " Please try again by specifying the resources to reserve.";
+ RMAuditLogger.logFailure("UNKNOWN", auditConstant,
+ "validate reservation input definition", "ClientRMService", message);
+ throw RPCUtil.getRemoteException(message);
+ }
+ // compute minimum duration and max gang size
+ long minDuration = 0;
+ Resource maxGangSize = Resource.newInstance(0, 0);
+ ReservationRequestInterpreter type =
+ contract.getReservationRequests().getInterpreter();
+ for (ReservationRequest rr : resReq) {
+ if (type == ReservationRequestInterpreter.R_ALL
+ || type == ReservationRequestInterpreter.R_ANY) {
+ minDuration = Math.max(minDuration, rr.getDuration());
+ } else {
+ minDuration += rr.getDuration();
+ }
+ maxGangSize =
+ Resources.max(plan.getResourceCalculator(), plan.getTotalCapacity(),
+ maxGangSize,
+ Resources.multiply(rr.getCapability(), rr.getConcurrency()));
+ }
+ // verify the allocation is possible (skip for ANY)
+ if (contract.getDeadline() - contract.getArrival() < minDuration
+ && type != ReservationRequestInterpreter.R_ANY) {
+ message =
+ "The time difference ("
+ + (contract.getDeadline() - contract.getArrival())
+ + ") between arrival (" + contract.getArrival() + ") "
+ + "and deadline (" + contract.getDeadline() + ") must "
+ + " be greater or equal to the minimum resource duration ("
+ + minDuration + ")";
+ RMAuditLogger.logFailure("UNKNOWN", auditConstant,
+ "validate reservation input definition", "ClientRMService", message);
+ throw RPCUtil.getRemoteException(message);
+ }
+ // check that the largest gang does not exceed the inventory available
+ // capacity (skip for ANY)
+ if (Resources.greaterThan(plan.getResourceCalculator(),
+ plan.getTotalCapacity(), maxGangSize, plan.getTotalCapacity())
+ && type != ReservationRequestInterpreter.R_ANY) {
+ message =
+ "The size of the largest gang in the reservation refinition ("
+ + maxGangSize + ") exceed the capacity available ("
+ + plan.getTotalCapacity() + " )";
+ RMAuditLogger.logFailure("UNKNOWN", auditConstant,
+ "validate reservation input definition", "ClientRMService", message);
+ throw RPCUtil.getRemoteException(message);
+ }
+ }
+
+ /**
+ * Quick validation on the input to check some obvious fail conditions (fail
+ * fast) the input and returns the appropriate {@link Plan} associated with
+ * the specified {@link Queue} or throws an exception message illustrating the
+ * details of any validation check failures
+ *
+ * @param reservationSystem the {@link ReservationSystem} to validate against
+ * @param request the {@link ReservationSubmissionRequest} defining the
+ * resources required over time for the request
+ * @param reservationId the {@link ReservationId} associated with the current
+ * request
+ * @return the {@link Plan} to submit the request to
+ * @throws YarnException
+ */
+ public Plan validateReservationSubmissionRequest(
+ ReservationSystem reservationSystem,
+ ReservationSubmissionRequest request, ReservationId reservationId)
+ throws YarnException {
+ // Check if it is a managed queue
+ String queueName = request.getQueue();
+ if (queueName == null || queueName.isEmpty()) {
+ String errMsg =
+ "The queue to submit is not specified."
+ + " Please try again with a valid reservable queue.";
+ RMAuditLogger.logFailure("UNKNOWN",
+ AuditConstants.SUBMIT_RESERVATION_REQUEST,
+ "validate reservation input", "ClientRMService", errMsg);
+ throw RPCUtil.getRemoteException(errMsg);
+ }
+ Plan plan = reservationSystem.getPlan(queueName);
+ if (plan == null) {
+ String errMsg =
+ "The specified queue: " + queueName
+ + " is not managed by reservation system."
+ + " Please try again with a valid reservable queue.";
+ RMAuditLogger.logFailure("UNKNOWN",
+ AuditConstants.SUBMIT_RESERVATION_REQUEST,
+ "validate reservation input", "ClientRMService", errMsg);
+ throw RPCUtil.getRemoteException(errMsg);
+ }
+ validateReservationDefinition(reservationId,
+ request.getReservationDefinition(), plan,
+ AuditConstants.SUBMIT_RESERVATION_REQUEST);
+ return plan;
+ }
+
+ /**
+ * Quick validation on the input to check some obvious fail conditions (fail
+ * fast) the input and returns the appropriate {@link Plan} associated with
+ * the specified {@link Queue} or throws an exception message illustrating the
+ * details of any validation check failures
+ *
+ * @param reservationSystem the {@link ReservationSystem} to validate against
+ * @param request the {@link ReservationUpdateRequest} defining the resources
+ * required over time for the request
+ * @return the {@link Plan} to submit the request to
+ * @throws YarnException
+ */
+ public Plan validateReservationUpdateRequest(
+ ReservationSystem reservationSystem, ReservationUpdateRequest request)
+ throws YarnException {
+ ReservationId reservationId = request.getReservationId();
+ Plan plan =
+ validateReservation(reservationSystem, reservationId,
+ AuditConstants.UPDATE_RESERVATION_REQUEST);
+ validateReservationDefinition(reservationId,
+ request.getReservationDefinition(), plan,
+ AuditConstants.UPDATE_RESERVATION_REQUEST);
+ return plan;
+ }
+
+ /**
+ * Quick validation on the input to check some obvious fail conditions (fail
+ * fast) the input and returns the appropriate {@link Plan} associated with
+ * the specified {@link Queue} or throws an exception message illustrating the
+ * details of any validation check failures
+ *
+ * @param reservationSystem the {@link ReservationSystem} to validate against
+ * @param request the {@link ReservationDeleteRequest} defining the resources
+ * required over time for the request
+ * @return the {@link Plan} to submit the request to
+ * @throws YarnException
+ */
+ public Plan validateReservationDeleteRequest(
+ ReservationSystem reservationSystem, ReservationDeleteRequest request)
+ throws YarnException {
+ return validateReservation(reservationSystem, request.getReservationId(),
+ AuditConstants.DELETE_RESERVATION_REQUEST);
+ }
+
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystem.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystem.java
new file mode 100644
index 00000000000..cb76dcf8e69
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystem.java
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.reservation;
+
+import java.util.Map;
+
+import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ReservationId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+
+/**
+ * This interface is the one implemented by any system that wants to support
+ * Reservations i.e. make {@link Resource} allocations in future. Implementors
+ * need to bootstrap all configured {@link Plan}s in the active
+ * {@link ResourceScheduler} along with their corresponding
+ * {@link ReservationAgent} and {@link SharingPolicy}. It is also responsible
+ * for managing the {@link PlanFollower} to ensure the {@link Plan}s are in sync
+ * with the {@link ResourceScheduler}.
+ */
+@LimitedPrivate("yarn")
+@Unstable
+public interface ReservationSystem {
+
+ /**
+ * Set RMContext for {@link ReservationSystem}. This method should be called
+ * immediately after instantiating a reservation system once.
+ *
+ * @param rmContext created by {@link ResourceManager}
+ */
+ void setRMContext(RMContext rmContext);
+
+ /**
+ * Re-initialize the {@link ReservationSystem}.
+ *
+ * @param conf configuration
+ * @param rmContext current context of the {@link ResourceManager}
+ * @throws YarnException
+ */
+ void reinitialize(Configuration conf, RMContext rmContext)
+ throws YarnException;
+
+ /**
+ * Get an existing {@link Plan} that has been initialized.
+ *
+ * @param planName the name of the {@link Plan}
+ * @return the {@link Plan} identified by name
+ *
+ */
+ Plan getPlan(String planName);
+
+ /**
+ * Return a map containing all the plans known to this ReservationSystem
+ * (useful for UI)
+ *
+ * @return a Map of Plan names and Plan objects
+ */
+ Map getAllPlans();
+
+ /**
+ * Invokes {@link PlanFollower} to synchronize the specified {@link Plan} with
+ * the {@link ResourceScheduler}
+ *
+ * @param planName the name of the {@link Plan} to be synchronized
+ */
+ void synchronizePlan(String planName);
+
+ /**
+ * Return the time step (ms) at which the {@link PlanFollower} is invoked
+ *
+ * @return the time step (ms) at which the {@link PlanFollower} is invoked
+ */
+ long getPlanFollowerTimeStep();
+
+ /**
+ * Get a new unique {@link ReservationId}.
+ *
+ * @return a new unique {@link ReservationId}
+ *
+ */
+ ReservationId getNewReservationId();
+
+ /**
+ * Get the {@link Queue} that an existing {@link ReservationId} is associated
+ * with.
+ *
+ * @param reservationId the unique id of the reservation
+ * @return the name of the associated Queue
+ *
+ */
+ String getQueueForReservation(ReservationId reservationId);
+
+ /**
+ * Set the {@link Queue} that an existing {@link ReservationId} should be
+ * associated with.
+ *
+ * @param reservationId the unique id of the reservation
+ * @param queueName the name of Queue to associate the reservation with
+ *
+ */
+ void setQueueForReservation(ReservationId reservationId, String queueName);
+
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java
index a1ae3ca3c64..624aa18c6bf 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
@@ -236,4 +237,6 @@ public interface RMApp extends EventHandler {
* @return metrics
*/
RMAppMetrics getRMAppMetrics();
+
+ ReservationId getReservationId();
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
index 4899434c480..84ec766c5bb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
@@ -46,6 +46,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeState;
+import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -1284,4 +1285,8 @@ public class RMAppImpl implements RMApp, Recoverable {
this.systemClock = clock;
}
+ @Override
+ public ReservationId getReservationId() {
+ return submissionContext.getReservationID();
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAddedSchedulerEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAddedSchedulerEvent.java
index 7e0b89e20f3..a54e4bfb146 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAddedSchedulerEvent.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAddedSchedulerEvent.java
@@ -19,25 +19,33 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.event;
import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ReservationId;
public class AppAddedSchedulerEvent extends SchedulerEvent {
private final ApplicationId applicationId;
private final String queue;
private final String user;
+ private final ReservationId reservationID;
private final boolean isAppRecovering;
public AppAddedSchedulerEvent(
ApplicationId applicationId, String queue, String user) {
- this(applicationId, queue, user, false);
+ this(applicationId, queue, user, false, null);
}
public AppAddedSchedulerEvent(ApplicationId applicationId, String queue,
- String user, boolean isAppRecovering) {
+ String user, ReservationId reservationID) {
+ this(applicationId, queue, user, false, reservationID);
+ }
+
+ public AppAddedSchedulerEvent(ApplicationId applicationId, String queue,
+ String user, boolean isAppRecovering, ReservationId reservationID) {
super(SchedulerEventType.APP_ADDED);
this.applicationId = applicationId;
this.queue = queue;
this.user = user;
+ this.reservationID = reservationID;
this.isAppRecovering = isAppRecovering;
}
@@ -56,4 +64,8 @@ public class AppAddedSchedulerEvent extends SchedulerEvent {
public boolean getIsAppRecovering() {
return isAppRecovering;
}
+
+ public ReservationId getReservationID() {
+ return reservationID;
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java
index db867a95316..954e21d72cd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java
@@ -33,6 +33,7 @@ import java.net.InetSocketAddress;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
@@ -74,6 +75,12 @@ import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@@ -91,7 +98,12 @@ import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.QueueInfo;
+import org.apache.hadoop.yarn.api.records.ReservationDefinition;
+import org.apache.hadoop.yarn.api.records.ReservationId;
+import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter;
+import org.apache.hadoop.yarn.api.records.ReservationRequests;
import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ReservationRequest;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
@@ -107,6 +119,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWri
import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.NullRMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystemTestUtil;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
@@ -117,11 +130,16 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.security.QueueACLsManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.Records;
+import org.apache.hadoop.yarn.util.SystemClock;
+import org.apache.hadoop.yarn.util.UTCClock;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@@ -1199,4 +1217,102 @@ public class TestClientRMService {
when(yarnScheduler.getAppResourceUsageReport(attemptId)).thenReturn(null);
return yarnScheduler;
}
+
+ @Test
+ public void testReservationAPIs() {
+ // initialize
+ CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
+ ReservationSystemTestUtil.setupQueueConfiguration(conf);
+ conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+ ResourceScheduler.class);
+ conf.setBoolean(YarnConfiguration.RM_RESERVATION_SYSTEM_ENABLE, true);
+ MockRM rm = new MockRM(conf);
+ rm.start();
+ MockNM nm;
+ try {
+ nm = rm.registerNode("127.0.0.1:0", 102400, 100);
+ // allow plan follower to synchronize
+ Thread.sleep(1050);
+ } catch (Exception e) {
+ Assert.fail(e.getMessage());
+ }
+
+ // Create a client.
+ ClientRMService clientService = rm.getClientRMService();
+
+ // create a reservation
+ Clock clock = new UTCClock();
+ long arrival = clock.getTime();
+ long duration = 60000;
+ long deadline = (long) (arrival + 1.05 * duration);
+ ReservationSubmissionRequest sRequest =
+ createSimpleReservationRequest(4, arrival, deadline, duration);
+ ReservationSubmissionResponse sResponse = null;
+ try {
+ sResponse = clientService.submitReservation(sRequest);
+ } catch (Exception e) {
+ Assert.fail(e.getMessage());
+ }
+ Assert.assertNotNull(sResponse);
+ ReservationId reservationID = sResponse.getReservationId();
+ Assert.assertNotNull(reservationID);
+ LOG.info("Submit reservation response: " + reservationID);
+
+ // Update the reservation
+ ReservationDefinition rDef = sRequest.getReservationDefinition();
+ ReservationRequest rr =
+ rDef.getReservationRequests().getReservationResources().get(0);
+ rr.setNumContainers(5);
+ arrival = clock.getTime();
+ duration = 30000;
+ deadline = (long) (arrival + 1.05 * duration);
+ rr.setDuration(duration);
+ rDef.setArrival(arrival);
+ rDef.setDeadline(deadline);
+ ReservationUpdateRequest uRequest =
+ ReservationUpdateRequest.newInstance(rDef, reservationID);
+ ReservationUpdateResponse uResponse = null;
+ try {
+ uResponse = clientService.updateReservation(uRequest);
+ } catch (Exception e) {
+ Assert.fail(e.getMessage());
+ }
+ Assert.assertNotNull(sResponse);
+ LOG.info("Update reservation response: " + uResponse);
+
+ // Delete the reservation
+ ReservationDeleteRequest dRequest =
+ ReservationDeleteRequest.newInstance(reservationID);
+ ReservationDeleteResponse dResponse = null;
+ try {
+ dResponse = clientService.deleteReservation(dRequest);
+ } catch (Exception e) {
+ Assert.fail(e.getMessage());
+ }
+ Assert.assertNotNull(sResponse);
+ LOG.info("Delete reservation response: " + dResponse);
+
+ // clean-up
+ rm.stop();
+ nm = null;
+ rm = null;
+ }
+
+ private ReservationSubmissionRequest createSimpleReservationRequest(
+ int numContainers, long arrival, long deadline, long duration) {
+ // create a request with a single atomic ask
+ ReservationRequest r =
+ ReservationRequest.newInstance(Resource.newInstance(1024, 1),
+ numContainers, 1, duration);
+ ReservationRequests reqs =
+ ReservationRequests.newInstance(Collections.singletonList(r),
+ ReservationRequestInterpreter.R_ALL);
+ ReservationDefinition rDef =
+ ReservationDefinition.newInstance(arrival, deadline, reqs,
+ "testClientRMService#reservation");
+ ReservationSubmissionRequest request =
+ ReservationSubmissionRequest.newInstance(rDef,
+ ReservationSystemTestUtil.reservationQ);
+ return request;
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacityReservationSystem.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacityReservationSystem.java
new file mode 100644
index 00000000000..2a7779110d2
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacityReservationSystem.java
@@ -0,0 +1,102 @@
+package org.apache.hadoop.yarn.server.resourcemanager.reservation;
+
+import java.io.IOException;
+
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestCapacityReservationSystem {
+
+ @Test
+ public void testInitialize() {
+ ReservationSystemTestUtil testUtil = new ReservationSystemTestUtil();
+ CapacityScheduler capScheduler = null;
+ try {
+ capScheduler = testUtil.mockCapacityScheduler(10);
+ } catch (IOException e) {
+ Assert.fail(e.getMessage());
+ }
+ CapacityReservationSystem reservationSystem =
+ new CapacityReservationSystem();
+ reservationSystem.setRMContext(capScheduler.getRMContext());
+ try {
+ reservationSystem.reinitialize(capScheduler.getConf(),
+ capScheduler.getRMContext());
+ } catch (YarnException e) {
+ Assert.fail(e.getMessage());
+ }
+ String planQName = testUtil.getreservationQueueName();
+ Plan plan = reservationSystem.getPlan(planQName);
+ Assert.assertNotNull(plan);
+ Assert.assertTrue(plan instanceof InMemoryPlan);
+ Assert.assertEquals(planQName, plan.getQueueName());
+ Assert.assertEquals(8192, plan.getTotalCapacity().getMemory());
+ Assert
+ .assertTrue(plan.getReservationAgent() instanceof GreedyReservationAgent);
+ Assert
+ .assertTrue(plan.getSharingPolicy() instanceof CapacityOverTimePolicy);
+ }
+
+ @Test
+ public void testReinitialize() {
+ ReservationSystemTestUtil testUtil = new ReservationSystemTestUtil();
+ CapacityScheduler capScheduler = null;
+ try {
+ capScheduler = testUtil.mockCapacityScheduler(10);
+ } catch (IOException e) {
+ Assert.fail(e.getMessage());
+ }
+ CapacityReservationSystem reservationSystem =
+ new CapacityReservationSystem();
+ CapacitySchedulerConfiguration conf = capScheduler.getConfiguration();
+ RMContext mockContext = capScheduler.getRMContext();
+ reservationSystem.setRMContext(mockContext);
+ try {
+ reservationSystem.reinitialize(capScheduler.getConfiguration(),
+ mockContext);
+ } catch (YarnException e) {
+ Assert.fail(e.getMessage());
+ }
+ // Assert queue in original config
+ String planQName = testUtil.getreservationQueueName();
+ Plan plan = reservationSystem.getPlan(planQName);
+ Assert.assertNotNull(plan);
+ Assert.assertTrue(plan instanceof InMemoryPlan);
+ Assert.assertEquals(planQName, plan.getQueueName());
+ Assert.assertEquals(8192, plan.getTotalCapacity().getMemory());
+ Assert
+ .assertTrue(plan.getReservationAgent() instanceof GreedyReservationAgent);
+ Assert
+ .assertTrue(plan.getSharingPolicy() instanceof CapacityOverTimePolicy);
+
+ // Dynamically add a plan
+ String newQ = "reservation";
+ Assert.assertNull(reservationSystem.getPlan(newQ));
+ testUtil.updateQueueConfiguration(conf, newQ);
+ try {
+ capScheduler.reinitialize(conf, mockContext);
+ } catch (IOException e) {
+ Assert.fail(e.getMessage());
+ }
+ try {
+ reservationSystem.reinitialize(conf, mockContext);
+ } catch (YarnException e) {
+ Assert.fail(e.getMessage());
+ }
+ Plan newPlan = reservationSystem.getPlan(newQ);
+ Assert.assertNotNull(newPlan);
+ Assert.assertTrue(newPlan instanceof InMemoryPlan);
+ Assert.assertEquals(newQ, newPlan.getQueueName());
+ Assert.assertEquals(1024, newPlan.getTotalCapacity().getMemory());
+ Assert
+ .assertTrue(newPlan.getReservationAgent() instanceof GreedyReservationAgent);
+ Assert
+ .assertTrue(newPlan.getSharingPolicy() instanceof CapacityOverTimePolicy);
+
+ }
+
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestReservationInputValidator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestReservationInputValidator.java
new file mode 100644
index 00000000000..f5917bbff82
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestReservationInputValidator.java
@@ -0,0 +1,560 @@
+package org.apache.hadoop.yarn.server.resourcemanager.reservation;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.text.MessageFormat;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ReservationDeleteRequestPBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ReservationSubmissionRequestPBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ReservationUpdateRequestPBImpl;
+import org.apache.hadoop.yarn.api.records.ReservationDefinition;
+import org.apache.hadoop.yarn.api.records.ReservationId;
+import org.apache.hadoop.yarn.api.records.ReservationRequest;
+import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter;
+import org.apache.hadoop.yarn.api.records.ReservationRequests;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.impl.pb.ReservationDefinitionPBImpl;
+import org.apache.hadoop.yarn.api.records.impl.pb.ReservationRequestsPBImpl;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.util.Clock;
+import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestReservationInputValidator {
+
+ private static final Log LOG = LogFactory
+ .getLog(TestReservationInputValidator.class);
+
+ private static final String PLAN_NAME = "test-reservation";
+
+ private Clock clock;
+ private Map plans = new HashMap(1);
+ private ReservationSystem rSystem;
+ private Plan plan;
+
+ private ReservationInputValidator rrValidator;
+
+ @Before
+ public void setUp() {
+ clock = mock(Clock.class);
+ plan = mock(Plan.class);
+ rSystem = mock(ReservationSystem.class);
+ plans.put(PLAN_NAME, plan);
+ rrValidator = new ReservationInputValidator(clock);
+ when(clock.getTime()).thenReturn(1L);
+ ResourceCalculator rCalc = new DefaultResourceCalculator();
+ Resource resource = Resource.newInstance(10240, 10);
+ when(plan.getResourceCalculator()).thenReturn(rCalc);
+ when(plan.getTotalCapacity()).thenReturn(resource);
+ when(rSystem.getQueueForReservation(any(ReservationId.class))).thenReturn(
+ PLAN_NAME);
+ when(rSystem.getPlan(PLAN_NAME)).thenReturn(plan);
+ }
+
+ @After
+ public void tearDown() {
+ rrValidator = null;
+ clock = null;
+ plan = null;
+ }
+
+ @Test
+ public void testSubmitReservationNormal() {
+ ReservationSubmissionRequest request =
+ createSimpleReservationSubmissionRequest(1, 1, 1, 5, 3);
+ Plan plan = null;
+ try {
+ plan =
+ rrValidator.validateReservationSubmissionRequest(rSystem, request,
+ ReservationSystemTestUtil.getNewReservationId());
+ } catch (YarnException e) {
+ Assert.fail(e.getMessage());
+ }
+ Assert.assertNotNull(plan);
+ }
+
+ @Test
+ public void testSubmitReservationDoesnotExist() {
+ ReservationSubmissionRequest request =
+ new ReservationSubmissionRequestPBImpl();
+ Plan plan = null;
+ try {
+ plan =
+ rrValidator.validateReservationSubmissionRequest(rSystem, request,
+ ReservationSystemTestUtil.getNewReservationId());
+ Assert.fail();
+ } catch (YarnException e) {
+ Assert.assertNull(plan);
+ String message = e.getMessage();
+ Assert
+ .assertTrue(message
+ .equals("The queue to submit is not specified. Please try again with a valid reservable queue."));
+ LOG.info(message);
+ }
+ }
+
+ @Test
+ public void testSubmitReservationInvalidPlan() {
+ ReservationSubmissionRequest request =
+ createSimpleReservationSubmissionRequest(1, 1, 1, 5, 3);
+ when(rSystem.getPlan(PLAN_NAME)).thenReturn(null);
+ Plan plan = null;
+ try {
+ plan =
+ rrValidator.validateReservationSubmissionRequest(rSystem, request,
+ ReservationSystemTestUtil.getNewReservationId());
+ Assert.fail();
+ } catch (YarnException e) {
+ Assert.assertNull(plan);
+ String message = e.getMessage();
+ Assert
+ .assertTrue(message
+ .endsWith(" is not managed by reservation system. Please try again with a valid reservable queue."));
+ LOG.info(message);
+ }
+ }
+
+ @Test
+ public void testSubmitReservationNoDefinition() {
+ ReservationSubmissionRequest request =
+ new ReservationSubmissionRequestPBImpl();
+ request.setQueue(PLAN_NAME);
+ Plan plan = null;
+ try {
+ plan =
+ rrValidator.validateReservationSubmissionRequest(rSystem, request,
+ ReservationSystemTestUtil.getNewReservationId());
+ Assert.fail();
+ } catch (YarnException e) {
+ Assert.assertNull(plan);
+ String message = e.getMessage();
+ Assert
+ .assertTrue(message
+ .equals("Missing reservation definition. Please try again by specifying a reservation definition."));
+ LOG.info(message);
+ }
+ }
+
+ @Test
+ public void testSubmitReservationInvalidDeadline() {
+ ReservationSubmissionRequest request =
+ createSimpleReservationSubmissionRequest(1, 1, 1, 0, 3);
+ Plan plan = null;
+ try {
+ plan =
+ rrValidator.validateReservationSubmissionRequest(rSystem, request,
+ ReservationSystemTestUtil.getNewReservationId());
+ Assert.fail();
+ } catch (YarnException e) {
+ Assert.assertNull(plan);
+ String message = e.getMessage();
+ Assert.assertTrue(message
+ .startsWith("The specified deadline: 0 is the past"));
+ LOG.info(message);
+ }
+ }
+
+ @Test
+ public void testSubmitReservationInvalidRR() {
+ ReservationSubmissionRequest request =
+ createSimpleReservationSubmissionRequest(0, 0, 1, 5, 3);
+ Plan plan = null;
+ try {
+ plan =
+ rrValidator.validateReservationSubmissionRequest(rSystem, request,
+ ReservationSystemTestUtil.getNewReservationId());
+ Assert.fail();
+ } catch (YarnException e) {
+ Assert.assertNull(plan);
+ String message = e.getMessage();
+ Assert.assertTrue(message
+ .startsWith("No resources have been specified to reserve"));
+ LOG.info(message);
+ }
+ }
+
+ @Test
+ public void testSubmitReservationEmptyRR() {
+ ReservationSubmissionRequest request =
+ createSimpleReservationSubmissionRequest(1, 0, 1, 5, 3);
+ Plan plan = null;
+ try {
+ plan =
+ rrValidator.validateReservationSubmissionRequest(rSystem, request,
+ ReservationSystemTestUtil.getNewReservationId());
+ Assert.fail();
+ } catch (YarnException e) {
+ Assert.assertNull(plan);
+ String message = e.getMessage();
+ Assert.assertTrue(message
+ .startsWith("No resources have been specified to reserve"));
+ LOG.info(message);
+ }
+ }
+
+ @Test
+ public void testSubmitReservationInvalidDuration() {
+ ReservationSubmissionRequest request =
+ createSimpleReservationSubmissionRequest(1, 1, 1, 3, 4);
+ Plan plan = null;
+ try {
+ plan =
+ rrValidator.validateReservationSubmissionRequest(rSystem, request,
+ ReservationSystemTestUtil.getNewReservationId());
+ Assert.fail();
+ } catch (YarnException e) {
+ Assert.assertNull(plan);
+ String message = e.getMessage();
+ Assert.assertTrue(message.startsWith("The time difference"));
+ Assert
+ .assertTrue(message
+ .contains("must be greater or equal to the minimum resource duration"));
+ LOG.info(message);
+ }
+ }
+
+ @Test
+ public void testSubmitReservationExceedsGangSize() {
+ ReservationSubmissionRequest request =
+ createSimpleReservationSubmissionRequest(1, 1, 1, 5, 4);
+ Resource resource = Resource.newInstance(512, 1);
+ when(plan.getTotalCapacity()).thenReturn(resource);
+ Plan plan = null;
+ try {
+ plan =
+ rrValidator.validateReservationSubmissionRequest(rSystem, request,
+ ReservationSystemTestUtil.getNewReservationId());
+ Assert.fail();
+ } catch (YarnException e) {
+ Assert.assertNull(plan);
+ String message = e.getMessage();
+ Assert
+ .assertTrue(message
+ .startsWith("The size of the largest gang in the reservation refinition"));
+ Assert.assertTrue(message.contains("exceed the capacity available "));
+ LOG.info(message);
+ }
+ }
+
+ @Test
+ public void testUpdateReservationNormal() {
+ ReservationUpdateRequest request =
+ createSimpleReservationUpdateRequest(1, 1, 1, 5, 3);
+ Plan plan = null;
+ try {
+ plan = rrValidator.validateReservationUpdateRequest(rSystem, request);
+ } catch (YarnException e) {
+ Assert.fail(e.getMessage());
+ }
+ Assert.assertNotNull(plan);
+ }
+
+ @Test
+ public void testUpdateReservationNoID() {
+ ReservationUpdateRequest request = new ReservationUpdateRequestPBImpl();
+ Plan plan = null;
+ try {
+ plan = rrValidator.validateReservationUpdateRequest(rSystem, request);
+ Assert.fail();
+ } catch (YarnException e) {
+ Assert.assertNull(plan);
+ String message = e.getMessage();
+ Assert
+ .assertTrue(message
+ .startsWith("Missing reservation id. Please try again by specifying a reservation id."));
+ LOG.info(message);
+ }
+ }
+
+ @Test
+ public void testUpdateReservationDoesnotExist() {
+ ReservationUpdateRequest request =
+ createSimpleReservationUpdateRequest(1, 1, 1, 5, 4);
+ ReservationId rId = request.getReservationId();
+ when(rSystem.getQueueForReservation(rId)).thenReturn(null);
+ Plan plan = null;
+ try {
+ plan = rrValidator.validateReservationUpdateRequest(rSystem, request);
+ Assert.fail();
+ } catch (YarnException e) {
+ Assert.assertNull(plan);
+ String message = e.getMessage();
+ Assert
+ .assertTrue(message.equals(MessageFormat
+ .format(
+ "The specified reservation with ID: {0} is unknown. Please try again with a valid reservation.",
+ rId)));
+ LOG.info(message);
+ }
+ }
+
+ @Test
+ public void testUpdateReservationInvalidPlan() {
+ ReservationUpdateRequest request =
+ createSimpleReservationUpdateRequest(1, 1, 1, 5, 4);
+ when(rSystem.getPlan(PLAN_NAME)).thenReturn(null);
+ Plan plan = null;
+ try {
+ plan = rrValidator.validateReservationUpdateRequest(rSystem, request);
+ Assert.fail();
+ } catch (YarnException e) {
+ Assert.assertNull(plan);
+ String message = e.getMessage();
+ Assert
+ .assertTrue(message
+ .endsWith(" is not associated with any valid plan. Please try again with a valid reservation."));
+ LOG.info(message);
+ }
+ }
+
+ @Test
+ public void testUpdateReservationNoDefinition() {
+ ReservationUpdateRequest request = new ReservationUpdateRequestPBImpl();
+ request.setReservationId(ReservationSystemTestUtil.getNewReservationId());
+ Plan plan = null;
+ try {
+ plan = rrValidator.validateReservationUpdateRequest(rSystem, request);
+ Assert.fail();
+ } catch (YarnException e) {
+ Assert.assertNull(plan);
+ String message = e.getMessage();
+ Assert
+ .assertTrue(message
+ .startsWith("Missing reservation definition. Please try again by specifying a reservation definition."));
+ LOG.info(message);
+ }
+ }
+
+ @Test
+ public void testUpdateReservationInvalidDeadline() {
+ ReservationUpdateRequest request =
+ createSimpleReservationUpdateRequest(1, 1, 1, 0, 3);
+ Plan plan = null;
+ try {
+ plan = rrValidator.validateReservationUpdateRequest(rSystem, request);
+ Assert.fail();
+ } catch (YarnException e) {
+ Assert.assertNull(plan);
+ String message = e.getMessage();
+ Assert.assertTrue(message
+ .startsWith("The specified deadline: 0 is the past"));
+ LOG.info(message);
+ }
+ }
+
+ @Test
+ public void testUpdateReservationInvalidRR() {
+ ReservationUpdateRequest request =
+ createSimpleReservationUpdateRequest(0, 0, 1, 5, 3);
+ Plan plan = null;
+ try {
+ plan = rrValidator.validateReservationUpdateRequest(rSystem, request);
+ Assert.fail();
+ } catch (YarnException e) {
+ Assert.assertNull(plan);
+ String message = e.getMessage();
+ Assert.assertTrue(message
+ .startsWith("No resources have been specified to reserve"));
+ LOG.info(message);
+ }
+ }
+
+ @Test
+ public void testUpdateReservationEmptyRR() {
+ ReservationUpdateRequest request =
+ createSimpleReservationUpdateRequest(1, 0, 1, 5, 3);
+ Plan plan = null;
+ try {
+ plan = rrValidator.validateReservationUpdateRequest(rSystem, request);
+ Assert.fail();
+ } catch (YarnException e) {
+ Assert.assertNull(plan);
+ String message = e.getMessage();
+ Assert.assertTrue(message
+ .startsWith("No resources have been specified to reserve"));
+ LOG.info(message);
+ }
+ }
+
+ @Test
+ public void testUpdateReservationInvalidDuration() {
+ ReservationUpdateRequest request =
+ createSimpleReservationUpdateRequest(1, 1, 1, 3, 4);
+ Plan plan = null;
+ try {
+ plan = rrValidator.validateReservationUpdateRequest(rSystem, request);
+ Assert.fail();
+ } catch (YarnException e) {
+ Assert.assertNull(plan);
+ String message = e.getMessage();
+ Assert
+ .assertTrue(message
+ .contains("must be greater or equal to the minimum resource duration"));
+ LOG.info(message);
+ }
+ }
+
+ @Test
+ public void testUpdateReservationExceedsGangSize() {
+ ReservationUpdateRequest request =
+ createSimpleReservationUpdateRequest(1, 1, 1, 5, 4);
+ Resource resource = Resource.newInstance(512, 1);
+ when(plan.getTotalCapacity()).thenReturn(resource);
+ Plan plan = null;
+ try {
+ plan = rrValidator.validateReservationUpdateRequest(rSystem, request);
+ Assert.fail();
+ } catch (YarnException e) {
+ Assert.assertNull(plan);
+ String message = e.getMessage();
+ Assert
+ .assertTrue(message
+ .startsWith("The size of the largest gang in the reservation refinition"));
+ Assert.assertTrue(message.contains("exceed the capacity available "));
+ LOG.info(message);
+ }
+ }
+
+ @Test
+ public void testDeleteReservationNormal() {
+ ReservationDeleteRequest request = new ReservationDeleteRequestPBImpl();
+ ReservationId reservationID =
+ ReservationSystemTestUtil.getNewReservationId();
+ request.setReservationId(reservationID);
+ ReservationAllocation reservation = mock(ReservationAllocation.class);
+ when(plan.getReservationById(reservationID)).thenReturn(reservation);
+ Plan plan = null;
+ try {
+ plan = rrValidator.validateReservationDeleteRequest(rSystem, request);
+ } catch (YarnException e) {
+ Assert.fail(e.getMessage());
+ }
+ Assert.assertNotNull(plan);
+ }
+
+ @Test
+ public void testDeleteReservationNoID() {
+ ReservationDeleteRequest request = new ReservationDeleteRequestPBImpl();
+ Plan plan = null;
+ try {
+ plan = rrValidator.validateReservationDeleteRequest(rSystem, request);
+ Assert.fail();
+ } catch (YarnException e) {
+ Assert.assertNull(plan);
+ String message = e.getMessage();
+ Assert
+ .assertTrue(message
+ .startsWith("Missing reservation id. Please try again by specifying a reservation id."));
+ LOG.info(message);
+ }
+ }
+
+ @Test
+ public void testDeleteReservationDoesnotExist() {
+ ReservationDeleteRequest request = new ReservationDeleteRequestPBImpl();
+ ReservationId rId = ReservationSystemTestUtil.getNewReservationId();
+ request.setReservationId(rId);
+ when(rSystem.getQueueForReservation(rId)).thenReturn(null);
+ Plan plan = null;
+ try {
+ plan = rrValidator.validateReservationDeleteRequest(rSystem, request);
+ Assert.fail();
+ } catch (YarnException e) {
+ Assert.assertNull(plan);
+ String message = e.getMessage();
+ Assert
+ .assertTrue(message.equals(MessageFormat
+ .format(
+ "The specified reservation with ID: {0} is unknown. Please try again with a valid reservation.",
+ rId)));
+ LOG.info(message);
+ }
+ }
+
+ @Test
+ public void testDeleteReservationInvalidPlan() {
+ ReservationDeleteRequest request = new ReservationDeleteRequestPBImpl();
+ ReservationId reservationID =
+ ReservationSystemTestUtil.getNewReservationId();
+ request.setReservationId(reservationID);
+ when(rSystem.getPlan(PLAN_NAME)).thenReturn(null);
+ Plan plan = null;
+ try {
+ plan = rrValidator.validateReservationDeleteRequest(rSystem, request);
+ Assert.fail();
+ } catch (YarnException e) {
+ Assert.assertNull(plan);
+ String message = e.getMessage();
+ Assert
+ .assertTrue(message
+ .endsWith(" is not associated with any valid plan. Please try again with a valid reservation."));
+ LOG.info(message);
+ }
+ }
+
+ private ReservationSubmissionRequest createSimpleReservationSubmissionRequest(
+ int numRequests, int numContainers, long arrival, long deadline,
+ long duration) {
+ // create a request with a single atomic ask
+ ReservationSubmissionRequest request =
+ new ReservationSubmissionRequestPBImpl();
+ ReservationDefinition rDef = new ReservationDefinitionPBImpl();
+ rDef.setArrival(arrival);
+ rDef.setDeadline(deadline);
+ if (numRequests > 0) {
+ ReservationRequests reqs = new ReservationRequestsPBImpl();
+ rDef.setReservationRequests(reqs);
+ if (numContainers > 0) {
+ ReservationRequest r =
+ ReservationRequest.newInstance(Resource.newInstance(1024, 1),
+ numContainers, 1, duration);
+
+ reqs.setReservationResources(Collections.singletonList(r));
+ reqs.setInterpreter(ReservationRequestInterpreter.R_ALL);
+ }
+ }
+ request.setQueue(PLAN_NAME);
+ request.setReservationDefinition(rDef);
+ return request;
+ }
+
+ private ReservationUpdateRequest createSimpleReservationUpdateRequest(
+ int numRequests, int numContainers, long arrival, long deadline,
+ long duration) {
+ // create a request with a single atomic ask
+ ReservationUpdateRequest request = new ReservationUpdateRequestPBImpl();
+ ReservationDefinition rDef = new ReservationDefinitionPBImpl();
+ rDef.setArrival(arrival);
+ rDef.setDeadline(deadline);
+ if (numRequests > 0) {
+ ReservationRequests reqs = new ReservationRequestsPBImpl();
+ rDef.setReservationRequests(reqs);
+ if (numContainers > 0) {
+ ReservationRequest r =
+ ReservationRequest.newInstance(Resource.newInstance(1024, 1),
+ numContainers, 1, duration);
+
+ reqs.setReservationResources(Collections.singletonList(r));
+ reqs.setInterpreter(ReservationRequestInterpreter.R_ALL);
+ }
+ }
+ request.setReservationDefinition(rDef);
+ request.setReservationId(ReservationSystemTestUtil.getNewReservationId());
+ return request;
+ }
+
+}