YARN-2080. Integrating reservation system with ResourceManager and client-RM protocol. Contributed by Subru Krishnan and Carlo Curino.

(cherry picked from commit 8baeaead85)

Conflicts:
	hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
This commit is contained in:
subru 2014-09-18 15:30:27 -07:00 committed by Chris Douglas
parent 3418c56bcf
commit 6261f7cc69
21 changed files with 2165 additions and 2 deletions

View File

@ -20,3 +20,6 @@ on user reservations. (Carlo Curino and Subru Krishnan via curino)
YARN-1712. Plan follower that synchronizes the current state of reservation YARN-1712. Plan follower that synchronizes the current state of reservation
subsystem with the scheduler. (Subru Krishnan and Carlo Curino via subru) subsystem with the scheduler. (Subru Krishnan and Carlo Curino via subru)
YARN-2080. Integrating reservation system with ResourceManager and
client-RM protocol. (Subru Krishnan and Carlo Curino via subru)

View File

@ -176,6 +176,25 @@ public class YarnConfiguration extends Configuration {
public static final boolean DEFAULT_RM_SCHEDULER_USE_PORT_FOR_NODE_NAME = public static final boolean DEFAULT_RM_SCHEDULER_USE_PORT_FOR_NODE_NAME =
false; false;
/** Whether the RM should enable Reservation System */
public static final String RM_RESERVATION_SYSTEM_ENABLE = RM_PREFIX
+ "reservation-system.enable";
public static final boolean DEFAULT_RM_RESERVATION_SYSTEM_ENABLE = false;
/** The class to use as the Reservation System. */
public static final String RM_RESERVATION_SYSTEM_CLASS = RM_PREFIX
+ "reservation-system.class";
/** The PlanFollower for the Reservation System. */
public static final String RM_RESERVATION_SYSTEM_PLAN_FOLLOWER = RM_PREFIX
+ "reservation-system.plan.follower";
/** The step size of the Reservation System. */
public static final String RM_RESERVATION_SYSTEM_PLAN_FOLLOWER_TIME_STEP =
RM_PREFIX + "reservation-system.planfollower.time-step";
public static final long DEFAULT_RM_RESERVATION_SYSTEM_PLAN_FOLLOWER_TIME_STEP =
1000L;
/** /**
* Enable periodic monitor threads. * Enable periodic monitor threads.
* @see #RM_SCHEDULER_MONITOR_POLICIES * @see #RM_SCHEDULER_MONITOR_POLICIES

View File

@ -27,10 +27,17 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Text;
import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.ApplicationClientProtocol; import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport; import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
@ -43,6 +50,7 @@ import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.QueueInfo; import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.api.records.YarnClusterMetrics; import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
@ -474,4 +482,98 @@ public abstract class YarnClient extends AbstractService {
*/ */
public abstract void moveApplicationAcrossQueues(ApplicationId appId, public abstract void moveApplicationAcrossQueues(ApplicationId appId,
String queue) throws YarnException, IOException; String queue) throws YarnException, IOException;
/**
* <p>
* The interface used by clients to submit a new reservation to the
* {@link ResourceManager}.
* </p>
*
* <p>
* The client packages all details of its request in a
* {@link ReservationRequest} object. This contains information about the
* amount of capacity, temporal constraints, and gang needs. Furthermore, the
* reservation might be composed of multiple stages, with ordering
* dependencies among them.
* </p>
*
* <p>
* In order to respond, a new admission control component in the
* {@link ResourceManager} performs an analysis of the resources that have
* been committed over the period of time the user is requesting, verify that
* the user requests can be fulfilled, and that it respect a sharing policy
* (e.g., {@link CapacityOverTimePolicy}). Once it has positively determined
* that the ReservationRequest is satisfiable the {@link ResourceManager}
* answers with a {@link ReservationResponse} that include a
* {@link ReservationId}. Upon failure to find a valid allocation the response
* is an exception with the message detailing the reason of failure.
* </p>
*
* <p>
* The semantics guarantees that the ReservationId returned, corresponds to a
* valid reservation existing in the time-range request by the user. The
* amount of capacity dedicated to such reservation can vary overtime,
* depending of the allocation that has been determined. But it is guaranteed
* to satisfy all the constraint expressed by the user in the
* {@link ReservationRequest}
* </p>
*
* @param request request to submit a new Reservation
* @return response contains the {@link ReservationId} on accepting the
* submission
* @throws YarnException if the reservation cannot be created successfully
* @throws IOException
*
*/
@Public
@Unstable
public abstract ReservationSubmissionResponse submitReservation(
ReservationSubmissionRequest request) throws YarnException, IOException;
/**
* <p>
* The interface used by clients to update an existing Reservation. This is
* referred to as a re-negotiation process, in which a user that has
* previously submitted a Reservation.
* </p>
*
* <p>
* The allocation is attempted by virtually substituting all previous
* allocations related to this Reservation with new ones, that satisfy the new
* {@link ReservationRequest}. Upon success the previous allocation is
* atomically substituted by the new one, and on failure (i.e., if the system
* cannot find a valid allocation for the updated request), the previous
* allocation remains valid.
* </p>
*
* @param request to update an existing Reservation (the ReservationRequest
* should refer to an existing valid {@link ReservationId})
* @return response empty on successfully updating the existing reservation
* @throws YarnException if the request is invalid or reservation cannot be
* updated successfully
* @throws IOException
*
*/
@Public
@Unstable
public abstract ReservationUpdateResponse updateReservation(
ReservationUpdateRequest request) throws YarnException, IOException;
/**
* <p>
* The interface used by clients to remove an existing Reservation.
* </p>
*
* @param request to remove an existing Reservation (the ReservationRequest
* should refer to an existing valid {@link ReservationId})
* @return response empty on successfully deleting the existing reservation
* @throws YarnException if the request is invalid or reservation cannot be
* deleted successfully
* @throws IOException
*
*/
@Public
@Unstable
public abstract ReservationDeleteResponse deleteReservation(
ReservationDeleteRequest request) throws YarnException, IOException;
} }

View File

@ -63,6 +63,12 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest;
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest; import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse; import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesRequest; import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport; import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
@ -646,4 +652,23 @@ public class YarnClientImpl extends YarnClient {
MoveApplicationAcrossQueuesRequest.newInstance(appId, queue); MoveApplicationAcrossQueuesRequest.newInstance(appId, queue);
rmClient.moveApplicationAcrossQueues(request); rmClient.moveApplicationAcrossQueues(request);
} }
@Override
public ReservationSubmissionResponse submitReservation(
ReservationSubmissionRequest request) throws YarnException, IOException {
return rmClient.submitReservation(request);
}
@Override
public ReservationUpdateResponse updateReservation(
ReservationUpdateRequest request) throws YarnException, IOException {
return rmClient.updateReservation(request);
}
@Override
public ReservationDeleteResponse deleteReservation(
ReservationDeleteRequest request) throws YarnException, IOException {
return rmClient.deleteReservation(request);
}
} }

View File

@ -31,6 +31,7 @@ import java.nio.ByteBuffer;
import java.security.PrivilegedExceptionAction; import java.security.PrivilegedExceptionAction;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet; import java.util.EnumSet;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
@ -63,6 +64,12 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetContainersRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainersResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetContainersResponse;
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest; import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse; import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport; import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
@ -76,6 +83,11 @@ import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.ReservationDefinition;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.ReservationRequest;
import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter;
import org.apache.hadoop.yarn.api.records.ReservationRequests;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState; import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.api.records.YarnApplicationState;
@ -89,8 +101,14 @@ import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.MiniYARNCluster; import org.apache.hadoop.yarn.server.MiniYARNCluster;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM; import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystemTestUtil;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.Records; import org.apache.hadoop.yarn.util.Records;
import org.apache.hadoop.yarn.util.UTCClock;
import org.apache.hadoop.yarn.util.timeline.TimelineUtils; import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
import org.apache.log4j.Level; import org.apache.log4j.Level;
import org.apache.log4j.LogManager; import org.apache.log4j.LogManager;
@ -834,4 +852,101 @@ public class TestYarnClient {
client.stop(); client.stop();
} }
} }
@Test
public void testReservationAPIs() {
// initialize
CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
ReservationSystemTestUtil.setupQueueConfiguration(conf);
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
ResourceScheduler.class);
conf.setBoolean(YarnConfiguration.RM_RESERVATION_SYSTEM_ENABLE, true);
MiniYARNCluster cluster =
new MiniYARNCluster("testReservationAPIs", 2, 1, 1);
YarnClient client = null;
try {
cluster.init(conf);
cluster.start();
final Configuration yarnConf = cluster.getConfig();
client = YarnClient.createYarnClient();
client.init(yarnConf);
client.start();
// create a reservation
Clock clock = new UTCClock();
long arrival = clock.getTime();
long duration = 60000;
long deadline = (long) (arrival + 1.05 * duration);
ReservationSubmissionRequest sRequest =
createSimpleReservationRequest(4, arrival, deadline, duration);
ReservationSubmissionResponse sResponse = null;
try {
sResponse = client.submitReservation(sRequest);
} catch (Exception e) {
Assert.fail(e.getMessage());
}
Assert.assertNotNull(sResponse);
ReservationId reservationID = sResponse.getReservationId();
Assert.assertNotNull(reservationID);
System.out.println("Submit reservation response: " + reservationID);
// Update the reservation
ReservationDefinition rDef = sRequest.getReservationDefinition();
ReservationRequest rr =
rDef.getReservationRequests().getReservationResources().get(0);
rr.setNumContainers(5);
arrival = clock.getTime();
duration = 30000;
deadline = (long) (arrival + 1.05 * duration);
rr.setDuration(duration);
rDef.setArrival(arrival);
rDef.setDeadline(deadline);
ReservationUpdateRequest uRequest =
ReservationUpdateRequest.newInstance(rDef, reservationID);
ReservationUpdateResponse uResponse = null;
try {
uResponse = client.updateReservation(uRequest);
} catch (Exception e) {
Assert.fail(e.getMessage());
}
Assert.assertNotNull(sResponse);
System.out.println("Update reservation response: " + uResponse);
// Delete the reservation
ReservationDeleteRequest dRequest =
ReservationDeleteRequest.newInstance(reservationID);
ReservationDeleteResponse dResponse = null;
try {
dResponse = client.deleteReservation(dRequest);
} catch (Exception e) {
Assert.fail(e.getMessage());
}
Assert.assertNotNull(sResponse);
System.out.println("Delete reservation response: " + dResponse);
} finally {
// clean-up
if (client != null) {
client.stop();
}
cluster.stop();
}
}
private ReservationSubmissionRequest createSimpleReservationRequest(
int numContainers, long arrival, long deadline, long duration) {
// create a request with a single atomic ask
ReservationRequest r =
ReservationRequest.newInstance(Resource.newInstance(1024, 1),
numContainers, 1, duration);
ReservationRequests reqs =
ReservationRequests.newInstance(Collections.singletonList(r),
ReservationRequestInterpreter.R_ALL);
ReservationDefinition rDef =
ReservationDefinition.newInstance(arrival, deadline, reqs,
"testYarnClient#reservation");
ReservationSubmissionRequest request =
ReservationSubmissionRequest.newInstance(rDef,
ReservationSystemTestUtil.reservationQ);
return request;
}
} }

View File

@ -71,6 +71,7 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMapp
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceResponse;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeResourceUpdateEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeResourceUpdateEvent;
import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider; import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider;
@ -348,6 +349,11 @@ public class AdminService extends CompositeService implements
recordFactory.newRecordInstance(RefreshQueuesResponse.class); recordFactory.newRecordInstance(RefreshQueuesResponse.class);
try { try {
rmContext.getScheduler().reinitialize(getConfig(), this.rmContext); rmContext.getScheduler().reinitialize(getConfig(), this.rmContext);
// refresh the reservation system
ReservationSystem rSystem = rmContext.getReservationSystem();
if (rSystem != null) {
rSystem.reinitialize(getConfig(), rmContext);
}
RMAuditLogger.logSuccess(user.getShortUserName(), argName, RMAuditLogger.logSuccess(user.getShortUserName(), argName,
"AdminService"); "AdminService");
return response; return response;

View File

@ -22,6 +22,7 @@ import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.security.AccessControlException; import java.security.AccessControlException;
import java.text.MessageFormat;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
@ -79,6 +80,12 @@ import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesReq
import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesResponse; import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest; import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse; import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse; import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
@ -93,6 +100,8 @@ import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.QueueInfo; import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.ReservationDefinition;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.api.records.YarnClusterMetrics; import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
@ -107,6 +116,10 @@ import org.apache.hadoop.yarn.ipc.RPCUtil;
import org.apache.hadoop.yarn.ipc.YarnRPC; import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants; import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationInputValidator;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
@ -123,7 +136,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenS
import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider; import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.Records; import org.apache.hadoop.yarn.util.Records;
import org.apache.hadoop.yarn.util.UTCClock;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.Futures;
@ -154,10 +169,23 @@ public class ClientRMService extends AbstractService implements
private final ApplicationACLsManager applicationsACLsManager; private final ApplicationACLsManager applicationsACLsManager;
private final QueueACLsManager queueACLsManager; private final QueueACLsManager queueACLsManager;
// For Reservation APIs
private Clock clock;
private ReservationSystem reservationSystem;
private ReservationInputValidator rValidator;
public ClientRMService(RMContext rmContext, YarnScheduler scheduler, public ClientRMService(RMContext rmContext, YarnScheduler scheduler,
RMAppManager rmAppManager, ApplicationACLsManager applicationACLsManager, RMAppManager rmAppManager, ApplicationACLsManager applicationACLsManager,
QueueACLsManager queueACLsManager, QueueACLsManager queueACLsManager,
RMDelegationTokenSecretManager rmDTSecretManager) { RMDelegationTokenSecretManager rmDTSecretManager) {
this(rmContext, scheduler, rmAppManager, applicationACLsManager,
queueACLsManager, rmDTSecretManager, new UTCClock());
}
public ClientRMService(RMContext rmContext, YarnScheduler scheduler,
RMAppManager rmAppManager, ApplicationACLsManager applicationACLsManager,
QueueACLsManager queueACLsManager,
RMDelegationTokenSecretManager rmDTSecretManager, Clock clock) {
super(ClientRMService.class.getName()); super(ClientRMService.class.getName());
this.scheduler = scheduler; this.scheduler = scheduler;
this.rmContext = rmContext; this.rmContext = rmContext;
@ -165,6 +193,9 @@ public class ClientRMService extends AbstractService implements
this.applicationsACLsManager = applicationACLsManager; this.applicationsACLsManager = applicationACLsManager;
this.queueACLsManager = queueACLsManager; this.queueACLsManager = queueACLsManager;
this.rmDTSecretManager = rmDTSecretManager; this.rmDTSecretManager = rmDTSecretManager;
this.reservationSystem = rmContext.getReservationSystem();
this.clock = clock;
this.rValidator = new ReservationInputValidator(clock);
} }
@Override @Override
@ -1033,4 +1064,174 @@ public class ClientRMService extends AbstractService implements
public Server getServer() { public Server getServer() {
return this.server; return this.server;
} }
@Override
public ReservationSubmissionResponse submitReservation(
ReservationSubmissionRequest request) throws YarnException, IOException {
// Check if reservation system is enabled
checkReservationSytem(AuditConstants.SUBMIT_RESERVATION_REQUEST);
ReservationSubmissionResponse response =
recordFactory.newRecordInstance(ReservationSubmissionResponse.class);
// Create a new Reservation Id
ReservationId reservationId = reservationSystem.getNewReservationId();
// Validate the input
Plan plan =
rValidator.validateReservationSubmissionRequest(reservationSystem,
request, reservationId);
// Check ACLs
String queueName = request.getQueue();
String user =
checkReservationACLs(queueName,
AuditConstants.SUBMIT_RESERVATION_REQUEST);
try {
// Try to place the reservation using the agent
boolean result =
plan.getReservationAgent().createReservation(reservationId, user,
plan, request.getReservationDefinition());
if (result) {
// add the reservation id to valid ones maintained by reservation
// system
reservationSystem.setQueueForReservation(reservationId, queueName);
// create the reservation synchronously if required
refreshScheduler(queueName, request.getReservationDefinition(),
reservationId.toString());
// return the reservation id
response.setReservationId(reservationId);
}
} catch (PlanningException e) {
RMAuditLogger.logFailure(user, AuditConstants.SUBMIT_RESERVATION_REQUEST,
e.getMessage(), "ClientRMService",
"Unable to create the reservation: " + reservationId);
throw RPCUtil.getRemoteException(e);
}
RMAuditLogger.logSuccess(user, AuditConstants.SUBMIT_RESERVATION_REQUEST,
"ClientRMService: " + reservationId);
return response;
}
@Override
public ReservationUpdateResponse updateReservation(
ReservationUpdateRequest request) throws YarnException, IOException {
// Check if reservation system is enabled
checkReservationSytem(AuditConstants.UPDATE_RESERVATION_REQUEST);
ReservationUpdateResponse response =
recordFactory.newRecordInstance(ReservationUpdateResponse.class);
// Validate the input
Plan plan =
rValidator.validateReservationUpdateRequest(reservationSystem, request);
ReservationId reservationId = request.getReservationId();
String queueName = reservationSystem.getQueueForReservation(reservationId);
// Check ACLs
String user =
checkReservationACLs(queueName,
AuditConstants.UPDATE_RESERVATION_REQUEST);
// Try to update the reservation using default agent
try {
boolean result =
plan.getReservationAgent().updateReservation(reservationId, user,
plan, request.getReservationDefinition());
if (!result) {
String errMsg = "Unable to update reservation: " + reservationId;
RMAuditLogger.logFailure(user,
AuditConstants.UPDATE_RESERVATION_REQUEST, errMsg,
"ClientRMService", errMsg);
throw RPCUtil.getRemoteException(errMsg);
}
} catch (PlanningException e) {
RMAuditLogger.logFailure(user, AuditConstants.UPDATE_RESERVATION_REQUEST,
e.getMessage(), "ClientRMService",
"Unable to update the reservation: " + reservationId);
throw RPCUtil.getRemoteException(e);
}
RMAuditLogger.logSuccess(user, AuditConstants.UPDATE_RESERVATION_REQUEST,
"ClientRMService: " + reservationId);
return response;
}
@Override
public ReservationDeleteResponse deleteReservation(
ReservationDeleteRequest request) throws YarnException, IOException {
// Check if reservation system is enabled
checkReservationSytem(AuditConstants.DELETE_RESERVATION_REQUEST);
ReservationDeleteResponse response =
recordFactory.newRecordInstance(ReservationDeleteResponse.class);
// Validate the input
Plan plan =
rValidator.validateReservationDeleteRequest(reservationSystem, request);
ReservationId reservationId = request.getReservationId();
String queueName = reservationSystem.getQueueForReservation(reservationId);
// Check ACLs
String user =
checkReservationACLs(queueName,
AuditConstants.DELETE_RESERVATION_REQUEST);
// Try to update the reservation using default agent
try {
boolean result =
plan.getReservationAgent().deleteReservation(reservationId, user,
plan);
if (!result) {
String errMsg = "Could not delete reservation: " + reservationId;
RMAuditLogger.logFailure(user,
AuditConstants.DELETE_RESERVATION_REQUEST, errMsg,
"ClientRMService", errMsg);
throw RPCUtil.getRemoteException(errMsg);
}
} catch (PlanningException e) {
RMAuditLogger.logFailure(user, AuditConstants.DELETE_RESERVATION_REQUEST,
e.getMessage(), "ClientRMService",
"Unable to delete the reservation: " + reservationId);
throw RPCUtil.getRemoteException(e);
}
RMAuditLogger.logSuccess(user, AuditConstants.DELETE_RESERVATION_REQUEST,
"ClientRMService: " + reservationId);
return response;
}
private void checkReservationSytem(String auditConstant) throws YarnException {
// Check if reservation is enabled
if (reservationSystem == null) {
throw RPCUtil.getRemoteException("Reservation is not enabled."
+ " Please enable & try again");
}
}
private void refreshScheduler(String planName,
ReservationDefinition contract, String reservationId) {
if ((contract.getArrival() - clock.getTime()) < reservationSystem
.getPlanFollowerTimeStep()) {
LOG.debug(MessageFormat
.format(
"Reservation {0} is within threshold so attempting to create synchronously.",
reservationId));
reservationSystem.synchronizePlan(planName);
LOG.info(MessageFormat.format("Created reservation {0} synchronously.",
reservationId));
}
}
private String checkReservationACLs(String queueName, String auditConstant)
throws YarnException {
UserGroupInformation callerUGI;
try {
callerUGI = UserGroupInformation.getCurrentUser();
} catch (IOException ie) {
RMAuditLogger.logFailure("UNKNOWN", auditConstant, queueName,
"ClientRMService", "Error getting UGI");
throw RPCUtil.getRemoteException(ie);
}
// Check if user has access on the managed queue
if (!queueACLsManager.checkAccess(callerUGI, QueueACL.SUBMIT_APPLICATIONS,
queueName)) {
RMAuditLogger.logFailure(
callerUGI.getShortUserName(),
auditConstant,
"User doesn't have permissions to "
+ QueueACL.SUBMIT_APPLICATIONS.toString(), "ClientRMService",
AuditConstants.UNAUTHORIZED_USER);
throw RPCUtil.getRemoteException(new AccessControlException("User "
+ callerUGI.getShortUserName() + " cannot perform operation "
+ QueueACL.SUBMIT_APPLICATIONS.name() + " on queue" + queueName));
}
return callerUGI.getShortUserName();
}
} }

View File

@ -56,6 +56,11 @@ public class RMAuditLogger {
// Some commonly used descriptions // Some commonly used descriptions
public static final String UNAUTHORIZED_USER = "Unauthorized user"; public static final String UNAUTHORIZED_USER = "Unauthorized user";
// For Reservation system
public static final String SUBMIT_RESERVATION_REQUEST = "Submit Reservation Request";
public static final String UPDATE_RESERVATION_REQUEST = "Update Reservation Request";
public static final String DELETE_RESERVATION_REQUEST = "Delete Reservation Request";
} }
/** /**

View File

@ -109,5 +109,7 @@ public interface RMContext {
long getEpoch(); long getEpoch();
ReservationSystem getReservationSystem();
boolean isSchedulerReadyForAllocatingContainers(); boolean isSchedulerReadyForAllocatingContainers();
} }

View File

@ -36,6 +36,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWri
import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher; import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.NullRMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.recovery.NullRMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
@ -83,6 +84,7 @@ public class RMContextImpl implements RMContext {
private ClientRMService clientRMService; private ClientRMService clientRMService;
private RMDelegationTokenSecretManager rmDelegationTokenSecretManager; private RMDelegationTokenSecretManager rmDelegationTokenSecretManager;
private ResourceScheduler scheduler; private ResourceScheduler scheduler;
private ReservationSystem reservationSystem;
private NodesListManager nodesListManager; private NodesListManager nodesListManager;
private ResourceTrackerService resourceTrackerService; private ResourceTrackerService resourceTrackerService;
private ApplicationMasterService applicationMasterService; private ApplicationMasterService applicationMasterService;
@ -208,6 +210,11 @@ public class RMContextImpl implements RMContext {
return this.scheduler; return this.scheduler;
} }
@Override
public ReservationSystem getReservationSystem() {
return this.reservationSystem;
}
@Override @Override
public NodesListManager getNodesListManager() { public NodesListManager getNodesListManager() {
return this.nodesListManager; return this.nodesListManager;
@ -303,6 +310,10 @@ public class RMContextImpl implements RMContext {
void setScheduler(ResourceScheduler scheduler) { void setScheduler(ResourceScheduler scheduler) {
this.scheduler = scheduler; this.scheduler = scheduler;
} }
void setReservationSystem(ReservationSystem reservationSystem) {
this.reservationSystem = reservationSystem;
}
void setDelegationTokenRenewer( void setDelegationTokenRenewer(
DelegationTokenRenewer delegationTokenRenewer) { DelegationTokenRenewer delegationTokenRenewer) {

View File

@ -72,6 +72,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStoreFactory; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStoreFactory;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable; import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.AbstractReservationSystem;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
@ -147,6 +149,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
protected RMSecretManagerService rmSecretManagerService; protected RMSecretManagerService rmSecretManagerService;
protected ResourceScheduler scheduler; protected ResourceScheduler scheduler;
protected ReservationSystem reservationSystem;
private ClientRMService clientRM; private ClientRMService clientRM;
protected ApplicationMasterService masterService; protected ApplicationMasterService masterService;
protected NMLivelinessMonitor nmLivelinessMonitor; protected NMLivelinessMonitor nmLivelinessMonitor;
@ -281,6 +284,29 @@ public class ResourceManager extends CompositeService implements Recoverable {
} }
} }
protected ReservationSystem createReservationSystem() {
String reservationClassName =
conf.get(YarnConfiguration.RM_RESERVATION_SYSTEM_CLASS,
AbstractReservationSystem.getDefaultReservationSystem(scheduler));
if (reservationClassName == null) {
return null;
}
LOG.info("Using ReservationSystem: " + reservationClassName);
try {
Class<?> reservationClazz = Class.forName(reservationClassName);
if (ReservationSystem.class.isAssignableFrom(reservationClazz)) {
return (ReservationSystem) ReflectionUtils.newInstance(
reservationClazz, this.conf);
} else {
throw new YarnRuntimeException("Class: " + reservationClassName
+ " not instance of " + ReservationSystem.class.getCanonicalName());
}
} catch (ClassNotFoundException e) {
throw new YarnRuntimeException(
"Could not instantiate ReservationSystem: " + reservationClassName, e);
}
}
protected ApplicationMasterLauncher createAMLauncher() { protected ApplicationMasterLauncher createAMLauncher() {
return new ApplicationMasterLauncher(this.rmContext); return new ApplicationMasterLauncher(this.rmContext);
} }
@ -456,6 +482,18 @@ public class ResourceManager extends CompositeService implements Recoverable {
DefaultMetricsSystem.initialize("ResourceManager"); DefaultMetricsSystem.initialize("ResourceManager");
JvmMetrics.initSingleton("ResourceManager", null); JvmMetrics.initSingleton("ResourceManager", null);
// Initialize the Reservation system
if (conf.getBoolean(YarnConfiguration.RM_RESERVATION_SYSTEM_ENABLE,
YarnConfiguration.DEFAULT_RM_RESERVATION_SYSTEM_ENABLE)) {
reservationSystem = createReservationSystem();
if (reservationSystem != null) {
reservationSystem.setRMContext(rmContext);
addIfService(reservationSystem);
rmContext.setReservationSystem(reservationSystem);
LOG.info("Initialized Reservation system");
}
}
// creating monitors that handle preemption // creating monitors that handle preemption
createPolicyMonitors(); createPolicyMonitors();

View File

@ -0,0 +1,323 @@
/**
*
*/
package org.apache.hadoop.yarn.server.resourcemanager.reservation;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.UTCClock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* This is the implementation of {@link ReservationSystem} based on the
* {@link ResourceScheduler}
*/
@LimitedPrivate("yarn")
@Unstable
public abstract class AbstractReservationSystem extends AbstractService
implements ReservationSystem {
private static final Logger LOG = LoggerFactory
.getLogger(AbstractReservationSystem.class);
// private static final String DEFAULT_CAPACITY_SCHEDULER_PLAN
private final ReentrantReadWriteLock readWriteLock =
new ReentrantReadWriteLock(true);
private final Lock readLock = readWriteLock.readLock();
private final Lock writeLock = readWriteLock.writeLock();
private boolean initialized = false;
private final Clock clock = new UTCClock();
private AtomicLong resCounter = new AtomicLong();
private Map<String, Plan> plans = new HashMap<String, Plan>();
private Map<ReservationId, String> resQMap =
new HashMap<ReservationId, String>();
private RMContext rmContext;
private ResourceScheduler scheduler;
private ScheduledExecutorService scheduledExecutorService;
protected Configuration conf;
protected long planStepSize;
private PlanFollower planFollower;
/**
* Construct the service.
*
* @param name service name
*/
public AbstractReservationSystem(String name) {
super(name);
}
@Override
public void setRMContext(RMContext rmContext) {
writeLock.lock();
try {
this.rmContext = rmContext;
} finally {
writeLock.unlock();
}
}
@Override
public void reinitialize(Configuration conf, RMContext rmContext)
throws YarnException {
writeLock.lock();
try {
if (!initialized) {
initialize(conf);
initialized = true;
} else {
initializeNewPlans(conf);
}
} finally {
writeLock.unlock();
}
}
private void initialize(Configuration conf) throws YarnException {
LOG.info("Initializing Reservation system");
this.conf = conf;
scheduler = rmContext.getScheduler();
// Get the plan step size
planStepSize =
conf.getTimeDuration(
YarnConfiguration.RM_RESERVATION_SYSTEM_PLAN_FOLLOWER_TIME_STEP,
YarnConfiguration.DEFAULT_RM_RESERVATION_SYSTEM_PLAN_FOLLOWER_TIME_STEP,
TimeUnit.MILLISECONDS);
if (planStepSize < 0) {
planStepSize =
YarnConfiguration.DEFAULT_RM_RESERVATION_SYSTEM_PLAN_FOLLOWER_TIME_STEP;
}
// Create a plan corresponding to every reservable queue
Set<String> planQueueNames = scheduler.getPlanQueues();
for (String planQueueName : planQueueNames) {
Plan plan = initializePlan(planQueueName);
plans.put(planQueueName, plan);
}
}
private void initializeNewPlans(Configuration conf) {
LOG.info("Refreshing Reservation system");
writeLock.lock();
try {
// Create a plan corresponding to every new reservable queue
Set<String> planQueueNames = scheduler.getPlanQueues();
for (String planQueueName : planQueueNames) {
if (!plans.containsKey(planQueueName)) {
Plan plan = initializePlan(planQueueName);
plans.put(planQueueName, plan);
} else {
LOG.warn("Plan based on reservation queue {0} already exists.",
planQueueName);
}
}
// Update the plan follower with the active plans
if (planFollower != null) {
planFollower.setPlans(plans.values());
}
} catch (YarnException e) {
LOG.warn("Exception while trying to refresh reservable queues", e);
} finally {
writeLock.unlock();
}
}
private PlanFollower createPlanFollower() {
String planFollowerPolicyClassName =
conf.get(YarnConfiguration.RM_RESERVATION_SYSTEM_PLAN_FOLLOWER,
getDefaultPlanFollower());
if (planFollowerPolicyClassName == null) {
return null;
}
LOG.info("Using PlanFollowerPolicy: " + planFollowerPolicyClassName);
try {
Class<?> planFollowerPolicyClazz =
conf.getClassByName(planFollowerPolicyClassName);
if (PlanFollower.class.isAssignableFrom(planFollowerPolicyClazz)) {
return (PlanFollower) ReflectionUtils.newInstance(
planFollowerPolicyClazz, conf);
} else {
throw new YarnRuntimeException("Class: " + planFollowerPolicyClassName
+ " not instance of " + PlanFollower.class.getCanonicalName());
}
} catch (ClassNotFoundException e) {
throw new YarnRuntimeException(
"Could not instantiate PlanFollowerPolicy: "
+ planFollowerPolicyClassName, e);
}
}
private String getDefaultPlanFollower() {
// currently only capacity scheduler is supported
if (scheduler instanceof CapacityScheduler) {
return CapacitySchedulerPlanFollower.class.getName();
}
return null;
}
@Override
public Plan getPlan(String planName) {
readLock.lock();
try {
return plans.get(planName);
} finally {
readLock.unlock();
}
}
/**
* @return the planStepSize
*/
@Override
public long getPlanFollowerTimeStep() {
readLock.lock();
try {
return planStepSize;
} finally {
readLock.unlock();
}
}
@Override
public void synchronizePlan(String planName) {
writeLock.lock();
try {
Plan plan = plans.get(planName);
if (plan != null) {
planFollower.synchronizePlan(plan);
}
} finally {
writeLock.unlock();
}
}
@Override
public void serviceInit(Configuration conf) throws Exception {
Configuration configuration = new Configuration(conf);
reinitialize(configuration, rmContext);
// Create the plan follower with the active plans
planFollower = createPlanFollower();
if (planFollower != null) {
planFollower.init(clock, scheduler, plans.values());
}
super.serviceInit(conf);
}
@Override
public void serviceStart() throws Exception {
if (planFollower != null) {
scheduledExecutorService = new ScheduledThreadPoolExecutor(1);
scheduledExecutorService.scheduleWithFixedDelay(planFollower, 0L,
planStepSize, TimeUnit.MILLISECONDS);
}
super.serviceStart();
}
@Override
public void serviceStop() {
// Stop the plan follower
if (scheduledExecutorService != null
&& !scheduledExecutorService.isShutdown()) {
scheduledExecutorService.shutdown();
}
// Clear the plans
plans.clear();
}
@Override
public String getQueueForReservation(ReservationId reservationId) {
readLock.lock();
try {
return resQMap.get(reservationId);
} finally {
readLock.unlock();
}
}
@Override
public void setQueueForReservation(ReservationId reservationId,
String queueName) {
writeLock.lock();
try {
resQMap.put(reservationId, queueName);
} finally {
writeLock.unlock();
}
}
@Override
public ReservationId getNewReservationId() {
writeLock.lock();
try {
ReservationId resId =
ReservationId.newInstance(ResourceManager.getClusterTimeStamp(),
resCounter.incrementAndGet());
LOG.info("Allocated new reservationId: " + resId);
return resId;
} finally {
writeLock.unlock();
}
}
@Override
public Map<String, Plan> getAllPlans() {
return plans;
}
/**
* Get the default reservation system corresponding to the scheduler
*
* @param scheduler the scheduler for which the reservation system is required
*/
public static String getDefaultReservationSystem(ResourceScheduler scheduler) {
// currently only capacity scheduler is supported
if (scheduler instanceof CapacityScheduler) {
return CapacityReservationSystem.class.getName();
}
return null;
}
protected abstract Plan initializePlan(String planQueueName)
throws YarnException;
protected abstract Planner getReplanner(String planQueueName);
protected abstract ReservationAgent getAgent(String queueName);
protected abstract SharingPolicy getAdmissionPolicy(String queueName);
}

View File

@ -0,0 +1,146 @@
/**
*
*/
package org.apache.hadoop.yarn.server.resourcemanager.reservation;
import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* This is the implementation of {@link ReservationSystem} based on the
* {@link CapacityScheduler}
*/
@LimitedPrivate("yarn")
@Unstable
public class CapacityReservationSystem extends AbstractReservationSystem {
private static final Logger LOG = LoggerFactory
.getLogger(CapacityReservationSystem.class);
private CapacityScheduler capScheduler;
public CapacityReservationSystem() {
super(CapacityReservationSystem.class.getName());
}
@Override
public void reinitialize(Configuration conf, RMContext rmContext)
throws YarnException {
// Validate if the scheduler is capacity based
ResourceScheduler scheduler = rmContext.getScheduler();
if (!(scheduler instanceof CapacityScheduler)) {
throw new YarnRuntimeException("Class "
+ scheduler.getClass().getCanonicalName() + " not instance of "
+ CapacityScheduler.class.getCanonicalName());
}
capScheduler = (CapacityScheduler) scheduler;
this.conf = conf;
super.reinitialize(conf, rmContext);
}
@Override
protected Plan initializePlan(String planQueueName) throws YarnException {
SharingPolicy adPolicy = getAdmissionPolicy(planQueueName);
String planQueuePath = capScheduler.getQueue(planQueueName).getQueuePath();
adPolicy.init(planQueuePath, capScheduler.getConfiguration());
CSQueue planQueue = capScheduler.getQueue(planQueueName);
// Calculate the max plan capacity
Resource minAllocation = capScheduler.getMinimumResourceCapability();
ResourceCalculator rescCalc = capScheduler.getResourceCalculator();
Resource totCap =
rescCalc.multiplyAndNormalizeDown(capScheduler.getClusterResource(),
planQueue.getAbsoluteCapacity(), minAllocation);
Plan plan =
new InMemoryPlan(capScheduler.getRootQueueMetrics(), adPolicy,
getAgent(planQueuePath), totCap, planStepSize, rescCalc,
minAllocation, capScheduler.getMaximumResourceCapability(),
planQueueName, getReplanner(planQueuePath), capScheduler
.getConfiguration().getMoveOnExpiry(planQueuePath));
LOG.info("Intialized plan {0} based on reservable queue {1}",
plan.toString(), planQueueName);
return plan;
}
@Override
protected Planner getReplanner(String planQueueName) {
CapacitySchedulerConfiguration capSchedulerConfig =
capScheduler.getConfiguration();
String plannerClassName = capSchedulerConfig.getReplanner(planQueueName);
LOG.info("Using Replanner: " + plannerClassName + " for queue: "
+ planQueueName);
try {
Class<?> plannerClazz =
capSchedulerConfig.getClassByName(plannerClassName);
if (Planner.class.isAssignableFrom(plannerClazz)) {
Planner planner =
(Planner) ReflectionUtils.newInstance(plannerClazz, conf);
planner.init(planQueueName, capSchedulerConfig);
return planner;
} else {
throw new YarnRuntimeException("Class: " + plannerClazz
+ " not instance of " + Planner.class.getCanonicalName());
}
} catch (ClassNotFoundException e) {
throw new YarnRuntimeException("Could not instantiate Planner: "
+ plannerClassName + " for queue: " + planQueueName, e);
}
}
@Override
protected ReservationAgent getAgent(String queueName) {
CapacitySchedulerConfiguration capSchedulerConfig =
capScheduler.getConfiguration();
String agentClassName = capSchedulerConfig.getReservationAgent(queueName);
LOG.info("Using Agent: " + agentClassName + " for queue: " + queueName);
try {
Class<?> agentClazz = capSchedulerConfig.getClassByName(agentClassName);
if (ReservationAgent.class.isAssignableFrom(agentClazz)) {
return (ReservationAgent) ReflectionUtils.newInstance(agentClazz, conf);
} else {
throw new YarnRuntimeException("Class: " + agentClassName
+ " not instance of " + ReservationAgent.class.getCanonicalName());
}
} catch (ClassNotFoundException e) {
throw new YarnRuntimeException("Could not instantiate Agent: "
+ agentClassName + " for queue: " + queueName, e);
}
}
@Override
protected SharingPolicy getAdmissionPolicy(String queueName) {
CapacitySchedulerConfiguration capSchedulerConfig =
capScheduler.getConfiguration();
String admissionPolicyClassName =
capSchedulerConfig.getReservationAdmissionPolicy(queueName);
LOG.info("Using AdmissionPolicy: " + admissionPolicyClassName
+ " for queue: " + queueName);
try {
Class<?> admissionPolicyClazz =
capSchedulerConfig.getClassByName(admissionPolicyClassName);
if (SharingPolicy.class.isAssignableFrom(admissionPolicyClazz)) {
return (SharingPolicy) ReflectionUtils.newInstance(
admissionPolicyClazz, conf);
} else {
throw new YarnRuntimeException("Class: " + admissionPolicyClassName
+ " not instance of " + SharingPolicy.class.getCanonicalName());
}
} catch (ClassNotFoundException e) {
throw new YarnRuntimeException("Could not instantiate AdmissionPolicy: "
+ admissionPolicyClassName + " for queue: " + queueName, e);
}
}
}

View File

@ -0,0 +1,244 @@
package org.apache.hadoop.yarn.server.resourcemanager.reservation;
import java.util.List;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest;
import org.apache.hadoop.yarn.api.records.ReservationDefinition;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.ReservationRequest;
import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter;
import org.apache.hadoop.yarn.api.records.ReservationRequests;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.ipc.RPCUtil;
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger;
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.resource.Resources;
public class ReservationInputValidator {
private final Clock clock;
/**
* Utility class to validate reservation requests.
*/
public ReservationInputValidator(Clock clock) {
this.clock = clock;
}
private Plan validateReservation(ReservationSystem reservationSystem,
ReservationId reservationId, String auditConstant) throws YarnException {
String message = "";
// check if the reservation id is valid
if (reservationId == null) {
message =
"Missing reservation id."
+ " Please try again by specifying a reservation id.";
RMAuditLogger.logFailure("UNKNOWN", auditConstant,
"validate reservation input", "ClientRMService", message);
throw RPCUtil.getRemoteException(message);
}
String queueName = reservationSystem.getQueueForReservation(reservationId);
if (queueName == null) {
message =
"The specified reservation with ID: " + reservationId
+ " is unknown. Please try again with a valid reservation.";
RMAuditLogger.logFailure("UNKNOWN", auditConstant,
"validate reservation input", "ClientRMService", message);
throw RPCUtil.getRemoteException(message);
}
// check if the associated plan is valid
Plan plan = reservationSystem.getPlan(queueName);
if (plan == null) {
message =
"The specified reservation: " + reservationId
+ " is not associated with any valid plan."
+ " Please try again with a valid reservation.";
RMAuditLogger.logFailure("UNKNOWN", auditConstant,
"validate reservation input", "ClientRMService", message);
throw RPCUtil.getRemoteException(message);
}
return plan;
}
private void validateReservationDefinition(ReservationId reservationId,
ReservationDefinition contract, Plan plan, String auditConstant)
throws YarnException {
String message = "";
// check if deadline is in the past
if (contract == null) {
message =
"Missing reservation definition."
+ " Please try again by specifying a reservation definition.";
RMAuditLogger.logFailure("UNKNOWN", auditConstant,
"validate reservation input definition", "ClientRMService", message);
throw RPCUtil.getRemoteException(message);
}
if (contract.getDeadline() <= clock.getTime()) {
message =
"The specified deadline: " + contract.getDeadline()
+ " is the past. Please try again with deadline in the future.";
RMAuditLogger.logFailure("UNKNOWN", auditConstant,
"validate reservation input definition", "ClientRMService", message);
throw RPCUtil.getRemoteException(message);
}
// Check if at least one RR has been specified
ReservationRequests resReqs = contract.getReservationRequests();
if (resReqs == null) {
message =
"No resources have been specified to reserve."
+ "Please try again by specifying the resources to reserve.";
RMAuditLogger.logFailure("UNKNOWN", auditConstant,
"validate reservation input definition", "ClientRMService", message);
throw RPCUtil.getRemoteException(message);
}
List<ReservationRequest> resReq = resReqs.getReservationResources();
if (resReq == null || resReq.isEmpty()) {
message =
"No resources have been specified to reserve."
+ " Please try again by specifying the resources to reserve.";
RMAuditLogger.logFailure("UNKNOWN", auditConstant,
"validate reservation input definition", "ClientRMService", message);
throw RPCUtil.getRemoteException(message);
}
// compute minimum duration and max gang size
long minDuration = 0;
Resource maxGangSize = Resource.newInstance(0, 0);
ReservationRequestInterpreter type =
contract.getReservationRequests().getInterpreter();
for (ReservationRequest rr : resReq) {
if (type == ReservationRequestInterpreter.R_ALL
|| type == ReservationRequestInterpreter.R_ANY) {
minDuration = Math.max(minDuration, rr.getDuration());
} else {
minDuration += rr.getDuration();
}
maxGangSize =
Resources.max(plan.getResourceCalculator(), plan.getTotalCapacity(),
maxGangSize,
Resources.multiply(rr.getCapability(), rr.getConcurrency()));
}
// verify the allocation is possible (skip for ANY)
if (contract.getDeadline() - contract.getArrival() < minDuration
&& type != ReservationRequestInterpreter.R_ANY) {
message =
"The time difference ("
+ (contract.getDeadline() - contract.getArrival())
+ ") between arrival (" + contract.getArrival() + ") "
+ "and deadline (" + contract.getDeadline() + ") must "
+ " be greater or equal to the minimum resource duration ("
+ minDuration + ")";
RMAuditLogger.logFailure("UNKNOWN", auditConstant,
"validate reservation input definition", "ClientRMService", message);
throw RPCUtil.getRemoteException(message);
}
// check that the largest gang does not exceed the inventory available
// capacity (skip for ANY)
if (Resources.greaterThan(plan.getResourceCalculator(),
plan.getTotalCapacity(), maxGangSize, plan.getTotalCapacity())
&& type != ReservationRequestInterpreter.R_ANY) {
message =
"The size of the largest gang in the reservation refinition ("
+ maxGangSize + ") exceed the capacity available ("
+ plan.getTotalCapacity() + " )";
RMAuditLogger.logFailure("UNKNOWN", auditConstant,
"validate reservation input definition", "ClientRMService", message);
throw RPCUtil.getRemoteException(message);
}
}
/**
* Quick validation on the input to check some obvious fail conditions (fail
* fast) the input and returns the appropriate {@link Plan} associated with
* the specified {@link Queue} or throws an exception message illustrating the
* details of any validation check failures
*
* @param reservationSystem the {@link ReservationSystem} to validate against
* @param request the {@link ReservationSubmissionRequest} defining the
* resources required over time for the request
* @param reservationId the {@link ReservationId} associated with the current
* request
* @return the {@link Plan} to submit the request to
* @throws YarnException
*/
public Plan validateReservationSubmissionRequest(
ReservationSystem reservationSystem,
ReservationSubmissionRequest request, ReservationId reservationId)
throws YarnException {
// Check if it is a managed queue
String queueName = request.getQueue();
if (queueName == null || queueName.isEmpty()) {
String errMsg =
"The queue to submit is not specified."
+ " Please try again with a valid reservable queue.";
RMAuditLogger.logFailure("UNKNOWN",
AuditConstants.SUBMIT_RESERVATION_REQUEST,
"validate reservation input", "ClientRMService", errMsg);
throw RPCUtil.getRemoteException(errMsg);
}
Plan plan = reservationSystem.getPlan(queueName);
if (plan == null) {
String errMsg =
"The specified queue: " + queueName
+ " is not managed by reservation system."
+ " Please try again with a valid reservable queue.";
RMAuditLogger.logFailure("UNKNOWN",
AuditConstants.SUBMIT_RESERVATION_REQUEST,
"validate reservation input", "ClientRMService", errMsg);
throw RPCUtil.getRemoteException(errMsg);
}
validateReservationDefinition(reservationId,
request.getReservationDefinition(), plan,
AuditConstants.SUBMIT_RESERVATION_REQUEST);
return plan;
}
/**
* Quick validation on the input to check some obvious fail conditions (fail
* fast) the input and returns the appropriate {@link Plan} associated with
* the specified {@link Queue} or throws an exception message illustrating the
* details of any validation check failures
*
* @param reservationSystem the {@link ReservationSystem} to validate against
* @param request the {@link ReservationUpdateRequest} defining the resources
* required over time for the request
* @return the {@link Plan} to submit the request to
* @throws YarnException
*/
public Plan validateReservationUpdateRequest(
ReservationSystem reservationSystem, ReservationUpdateRequest request)
throws YarnException {
ReservationId reservationId = request.getReservationId();
Plan plan =
validateReservation(reservationSystem, reservationId,
AuditConstants.UPDATE_RESERVATION_REQUEST);
validateReservationDefinition(reservationId,
request.getReservationDefinition(), plan,
AuditConstants.UPDATE_RESERVATION_REQUEST);
return plan;
}
/**
* Quick validation on the input to check some obvious fail conditions (fail
* fast) the input and returns the appropriate {@link Plan} associated with
* the specified {@link Queue} or throws an exception message illustrating the
* details of any validation check failures
*
* @param reservationSystem the {@link ReservationSystem} to validate against
* @param request the {@link ReservationDeleteRequest} defining the resources
* required over time for the request
* @return the {@link Plan} to submit the request to
* @throws YarnException
*/
public Plan validateReservationDeleteRequest(
ReservationSystem reservationSystem, ReservationDeleteRequest request)
throws YarnException {
return validateReservation(reservationSystem, request.getReservationId(),
AuditConstants.DELETE_RESERVATION_REQUEST);
}
}

View File

@ -0,0 +1,125 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.reservation;
import java.util.Map;
import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
/**
* This interface is the one implemented by any system that wants to support
* Reservations i.e. make {@link Resource} allocations in future. Implementors
* need to bootstrap all configured {@link Plan}s in the active
* {@link ResourceScheduler} along with their corresponding
* {@link ReservationAgent} and {@link SharingPolicy}. It is also responsible
* for managing the {@link PlanFollower} to ensure the {@link Plan}s are in sync
* with the {@link ResourceScheduler}.
*/
@LimitedPrivate("yarn")
@Unstable
public interface ReservationSystem {
/**
* Set RMContext for {@link ReservationSystem}. This method should be called
* immediately after instantiating a reservation system once.
*
* @param rmContext created by {@link ResourceManager}
*/
void setRMContext(RMContext rmContext);
/**
* Re-initialize the {@link ReservationSystem}.
*
* @param conf configuration
* @param rmContext current context of the {@link ResourceManager}
* @throws YarnException
*/
void reinitialize(Configuration conf, RMContext rmContext)
throws YarnException;
/**
* Get an existing {@link Plan} that has been initialized.
*
* @param planName the name of the {@link Plan}
* @return the {@link Plan} identified by name
*
*/
Plan getPlan(String planName);
/**
* Return a map containing all the plans known to this ReservationSystem
* (useful for UI)
*
* @return a Map of Plan names and Plan objects
*/
Map<String, Plan> getAllPlans();
/**
* Invokes {@link PlanFollower} to synchronize the specified {@link Plan} with
* the {@link ResourceScheduler}
*
* @param planName the name of the {@link Plan} to be synchronized
*/
void synchronizePlan(String planName);
/**
* Return the time step (ms) at which the {@link PlanFollower} is invoked
*
* @return the time step (ms) at which the {@link PlanFollower} is invoked
*/
long getPlanFollowerTimeStep();
/**
* Get a new unique {@link ReservationId}.
*
* @return a new unique {@link ReservationId}
*
*/
ReservationId getNewReservationId();
/**
* Get the {@link Queue} that an existing {@link ReservationId} is associated
* with.
*
* @param reservationId the unique id of the reservation
* @return the name of the associated Queue
*
*/
String getQueueForReservation(ReservationId reservationId);
/**
* Set the {@link Queue} that an existing {@link ReservationId} should be
* associated with.
*
* @param reservationId the unique id of the reservation
* @param queueName the name of Queue to associate the reservation with
*
*/
void setQueueForReservation(ReservationId reservationId, String queueName);
}

View File

@ -29,6 +29,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
@ -236,4 +237,6 @@ public interface RMApp extends EventHandler<RMAppEvent> {
* @return metrics * @return metrics
*/ */
RMAppMetrics getRMAppMetrics(); RMAppMetrics getRMAppMetrics();
ReservationId getReservationId();
} }

View File

@ -46,6 +46,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
@ -1284,4 +1285,8 @@ public class RMAppImpl implements RMApp, Recoverable {
this.systemClock = clock; this.systemClock = clock;
} }
@Override
public ReservationId getReservationId() {
return submissionContext.getReservationID();
}
} }

View File

@ -19,25 +19,33 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.event; package org.apache.hadoop.yarn.server.resourcemanager.scheduler.event;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ReservationId;
public class AppAddedSchedulerEvent extends SchedulerEvent { public class AppAddedSchedulerEvent extends SchedulerEvent {
private final ApplicationId applicationId; private final ApplicationId applicationId;
private final String queue; private final String queue;
private final String user; private final String user;
private final ReservationId reservationID;
private final boolean isAppRecovering; private final boolean isAppRecovering;
public AppAddedSchedulerEvent( public AppAddedSchedulerEvent(
ApplicationId applicationId, String queue, String user) { ApplicationId applicationId, String queue, String user) {
this(applicationId, queue, user, false); this(applicationId, queue, user, false, null);
} }
public AppAddedSchedulerEvent(ApplicationId applicationId, String queue, public AppAddedSchedulerEvent(ApplicationId applicationId, String queue,
String user, boolean isAppRecovering) { String user, ReservationId reservationID) {
this(applicationId, queue, user, false, reservationID);
}
public AppAddedSchedulerEvent(ApplicationId applicationId, String queue,
String user, boolean isAppRecovering, ReservationId reservationID) {
super(SchedulerEventType.APP_ADDED); super(SchedulerEventType.APP_ADDED);
this.applicationId = applicationId; this.applicationId = applicationId;
this.queue = queue; this.queue = queue;
this.user = user; this.user = user;
this.reservationID = reservationID;
this.isAppRecovering = isAppRecovering; this.isAppRecovering = isAppRecovering;
} }
@ -56,4 +64,8 @@ public class AppAddedSchedulerEvent extends SchedulerEvent {
public boolean getIsAppRecovering() { public boolean getIsAppRecovering() {
return isAppRecovering; return isAppRecovering;
} }
public ReservationId getReservationID() {
return reservationID;
}
} }

View File

@ -33,6 +33,7 @@ import java.net.InetSocketAddress;
import java.security.PrivilegedExceptionAction; import java.security.PrivilegedExceptionAction;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections;
import java.util.EnumSet; import java.util.EnumSet;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
@ -74,6 +75,12 @@ import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse; import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesRequest; import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest; import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@ -91,7 +98,12 @@ import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.QueueInfo; import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.ReservationDefinition;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter;
import org.apache.hadoop.yarn.api.records.ReservationRequests;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ReservationRequest;
import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.Dispatcher;
@ -107,6 +119,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWri
import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher; import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.NullRMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.recovery.NullRMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystemTestUtil;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
@ -117,11 +130,16 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.security.QueueACLsManager; import org.apache.hadoop.yarn.server.resourcemanager.security.QueueACLsManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager; import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.Records; import org.apache.hadoop.yarn.util.Records;
import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.hadoop.yarn.util.UTCClock;
import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.AfterClass; import org.junit.AfterClass;
import org.junit.BeforeClass; import org.junit.BeforeClass;
@ -1199,4 +1217,102 @@ public class TestClientRMService {
when(yarnScheduler.getAppResourceUsageReport(attemptId)).thenReturn(null); when(yarnScheduler.getAppResourceUsageReport(attemptId)).thenReturn(null);
return yarnScheduler; return yarnScheduler;
} }
@Test
public void testReservationAPIs() {
// initialize
CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
ReservationSystemTestUtil.setupQueueConfiguration(conf);
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
ResourceScheduler.class);
conf.setBoolean(YarnConfiguration.RM_RESERVATION_SYSTEM_ENABLE, true);
MockRM rm = new MockRM(conf);
rm.start();
MockNM nm;
try {
nm = rm.registerNode("127.0.0.1:0", 102400, 100);
// allow plan follower to synchronize
Thread.sleep(1050);
} catch (Exception e) {
Assert.fail(e.getMessage());
}
// Create a client.
ClientRMService clientService = rm.getClientRMService();
// create a reservation
Clock clock = new UTCClock();
long arrival = clock.getTime();
long duration = 60000;
long deadline = (long) (arrival + 1.05 * duration);
ReservationSubmissionRequest sRequest =
createSimpleReservationRequest(4, arrival, deadline, duration);
ReservationSubmissionResponse sResponse = null;
try {
sResponse = clientService.submitReservation(sRequest);
} catch (Exception e) {
Assert.fail(e.getMessage());
}
Assert.assertNotNull(sResponse);
ReservationId reservationID = sResponse.getReservationId();
Assert.assertNotNull(reservationID);
LOG.info("Submit reservation response: " + reservationID);
// Update the reservation
ReservationDefinition rDef = sRequest.getReservationDefinition();
ReservationRequest rr =
rDef.getReservationRequests().getReservationResources().get(0);
rr.setNumContainers(5);
arrival = clock.getTime();
duration = 30000;
deadline = (long) (arrival + 1.05 * duration);
rr.setDuration(duration);
rDef.setArrival(arrival);
rDef.setDeadline(deadline);
ReservationUpdateRequest uRequest =
ReservationUpdateRequest.newInstance(rDef, reservationID);
ReservationUpdateResponse uResponse = null;
try {
uResponse = clientService.updateReservation(uRequest);
} catch (Exception e) {
Assert.fail(e.getMessage());
}
Assert.assertNotNull(sResponse);
LOG.info("Update reservation response: " + uResponse);
// Delete the reservation
ReservationDeleteRequest dRequest =
ReservationDeleteRequest.newInstance(reservationID);
ReservationDeleteResponse dResponse = null;
try {
dResponse = clientService.deleteReservation(dRequest);
} catch (Exception e) {
Assert.fail(e.getMessage());
}
Assert.assertNotNull(sResponse);
LOG.info("Delete reservation response: " + dResponse);
// clean-up
rm.stop();
nm = null;
rm = null;
}
private ReservationSubmissionRequest createSimpleReservationRequest(
int numContainers, long arrival, long deadline, long duration) {
// create a request with a single atomic ask
ReservationRequest r =
ReservationRequest.newInstance(Resource.newInstance(1024, 1),
numContainers, 1, duration);
ReservationRequests reqs =
ReservationRequests.newInstance(Collections.singletonList(r),
ReservationRequestInterpreter.R_ALL);
ReservationDefinition rDef =
ReservationDefinition.newInstance(arrival, deadline, reqs,
"testClientRMService#reservation");
ReservationSubmissionRequest request =
ReservationSubmissionRequest.newInstance(rDef,
ReservationSystemTestUtil.reservationQ);
return request;
}
} }

View File

@ -0,0 +1,102 @@
package org.apache.hadoop.yarn.server.resourcemanager.reservation;
import java.io.IOException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.junit.Assert;
import org.junit.Test;
public class TestCapacityReservationSystem {
@Test
public void testInitialize() {
ReservationSystemTestUtil testUtil = new ReservationSystemTestUtil();
CapacityScheduler capScheduler = null;
try {
capScheduler = testUtil.mockCapacityScheduler(10);
} catch (IOException e) {
Assert.fail(e.getMessage());
}
CapacityReservationSystem reservationSystem =
new CapacityReservationSystem();
reservationSystem.setRMContext(capScheduler.getRMContext());
try {
reservationSystem.reinitialize(capScheduler.getConf(),
capScheduler.getRMContext());
} catch (YarnException e) {
Assert.fail(e.getMessage());
}
String planQName = testUtil.getreservationQueueName();
Plan plan = reservationSystem.getPlan(planQName);
Assert.assertNotNull(plan);
Assert.assertTrue(plan instanceof InMemoryPlan);
Assert.assertEquals(planQName, plan.getQueueName());
Assert.assertEquals(8192, plan.getTotalCapacity().getMemory());
Assert
.assertTrue(plan.getReservationAgent() instanceof GreedyReservationAgent);
Assert
.assertTrue(plan.getSharingPolicy() instanceof CapacityOverTimePolicy);
}
@Test
public void testReinitialize() {
ReservationSystemTestUtil testUtil = new ReservationSystemTestUtil();
CapacityScheduler capScheduler = null;
try {
capScheduler = testUtil.mockCapacityScheduler(10);
} catch (IOException e) {
Assert.fail(e.getMessage());
}
CapacityReservationSystem reservationSystem =
new CapacityReservationSystem();
CapacitySchedulerConfiguration conf = capScheduler.getConfiguration();
RMContext mockContext = capScheduler.getRMContext();
reservationSystem.setRMContext(mockContext);
try {
reservationSystem.reinitialize(capScheduler.getConfiguration(),
mockContext);
} catch (YarnException e) {
Assert.fail(e.getMessage());
}
// Assert queue in original config
String planQName = testUtil.getreservationQueueName();
Plan plan = reservationSystem.getPlan(planQName);
Assert.assertNotNull(plan);
Assert.assertTrue(plan instanceof InMemoryPlan);
Assert.assertEquals(planQName, plan.getQueueName());
Assert.assertEquals(8192, plan.getTotalCapacity().getMemory());
Assert
.assertTrue(plan.getReservationAgent() instanceof GreedyReservationAgent);
Assert
.assertTrue(plan.getSharingPolicy() instanceof CapacityOverTimePolicy);
// Dynamically add a plan
String newQ = "reservation";
Assert.assertNull(reservationSystem.getPlan(newQ));
testUtil.updateQueueConfiguration(conf, newQ);
try {
capScheduler.reinitialize(conf, mockContext);
} catch (IOException e) {
Assert.fail(e.getMessage());
}
try {
reservationSystem.reinitialize(conf, mockContext);
} catch (YarnException e) {
Assert.fail(e.getMessage());
}
Plan newPlan = reservationSystem.getPlan(newQ);
Assert.assertNotNull(newPlan);
Assert.assertTrue(newPlan instanceof InMemoryPlan);
Assert.assertEquals(newQ, newPlan.getQueueName());
Assert.assertEquals(1024, newPlan.getTotalCapacity().getMemory());
Assert
.assertTrue(newPlan.getReservationAgent() instanceof GreedyReservationAgent);
Assert
.assertTrue(newPlan.getSharingPolicy() instanceof CapacityOverTimePolicy);
}
}

View File

@ -0,0 +1,560 @@
package org.apache.hadoop.yarn.server.resourcemanager.reservation;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.text.MessageFormat;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ReservationDeleteRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ReservationSubmissionRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ReservationUpdateRequestPBImpl;
import org.apache.hadoop.yarn.api.records.ReservationDefinition;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.ReservationRequest;
import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter;
import org.apache.hadoop.yarn.api.records.ReservationRequests;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.impl.pb.ReservationDefinitionPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ReservationRequestsPBImpl;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class TestReservationInputValidator {
private static final Log LOG = LogFactory
.getLog(TestReservationInputValidator.class);
private static final String PLAN_NAME = "test-reservation";
private Clock clock;
private Map<String, Plan> plans = new HashMap<String, Plan>(1);
private ReservationSystem rSystem;
private Plan plan;
private ReservationInputValidator rrValidator;
@Before
public void setUp() {
clock = mock(Clock.class);
plan = mock(Plan.class);
rSystem = mock(ReservationSystem.class);
plans.put(PLAN_NAME, plan);
rrValidator = new ReservationInputValidator(clock);
when(clock.getTime()).thenReturn(1L);
ResourceCalculator rCalc = new DefaultResourceCalculator();
Resource resource = Resource.newInstance(10240, 10);
when(plan.getResourceCalculator()).thenReturn(rCalc);
when(plan.getTotalCapacity()).thenReturn(resource);
when(rSystem.getQueueForReservation(any(ReservationId.class))).thenReturn(
PLAN_NAME);
when(rSystem.getPlan(PLAN_NAME)).thenReturn(plan);
}
@After
public void tearDown() {
rrValidator = null;
clock = null;
plan = null;
}
@Test
public void testSubmitReservationNormal() {
ReservationSubmissionRequest request =
createSimpleReservationSubmissionRequest(1, 1, 1, 5, 3);
Plan plan = null;
try {
plan =
rrValidator.validateReservationSubmissionRequest(rSystem, request,
ReservationSystemTestUtil.getNewReservationId());
} catch (YarnException e) {
Assert.fail(e.getMessage());
}
Assert.assertNotNull(plan);
}
@Test
public void testSubmitReservationDoesnotExist() {
ReservationSubmissionRequest request =
new ReservationSubmissionRequestPBImpl();
Plan plan = null;
try {
plan =
rrValidator.validateReservationSubmissionRequest(rSystem, request,
ReservationSystemTestUtil.getNewReservationId());
Assert.fail();
} catch (YarnException e) {
Assert.assertNull(plan);
String message = e.getMessage();
Assert
.assertTrue(message
.equals("The queue to submit is not specified. Please try again with a valid reservable queue."));
LOG.info(message);
}
}
@Test
public void testSubmitReservationInvalidPlan() {
ReservationSubmissionRequest request =
createSimpleReservationSubmissionRequest(1, 1, 1, 5, 3);
when(rSystem.getPlan(PLAN_NAME)).thenReturn(null);
Plan plan = null;
try {
plan =
rrValidator.validateReservationSubmissionRequest(rSystem, request,
ReservationSystemTestUtil.getNewReservationId());
Assert.fail();
} catch (YarnException e) {
Assert.assertNull(plan);
String message = e.getMessage();
Assert
.assertTrue(message
.endsWith(" is not managed by reservation system. Please try again with a valid reservable queue."));
LOG.info(message);
}
}
@Test
public void testSubmitReservationNoDefinition() {
ReservationSubmissionRequest request =
new ReservationSubmissionRequestPBImpl();
request.setQueue(PLAN_NAME);
Plan plan = null;
try {
plan =
rrValidator.validateReservationSubmissionRequest(rSystem, request,
ReservationSystemTestUtil.getNewReservationId());
Assert.fail();
} catch (YarnException e) {
Assert.assertNull(plan);
String message = e.getMessage();
Assert
.assertTrue(message
.equals("Missing reservation definition. Please try again by specifying a reservation definition."));
LOG.info(message);
}
}
@Test
public void testSubmitReservationInvalidDeadline() {
ReservationSubmissionRequest request =
createSimpleReservationSubmissionRequest(1, 1, 1, 0, 3);
Plan plan = null;
try {
plan =
rrValidator.validateReservationSubmissionRequest(rSystem, request,
ReservationSystemTestUtil.getNewReservationId());
Assert.fail();
} catch (YarnException e) {
Assert.assertNull(plan);
String message = e.getMessage();
Assert.assertTrue(message
.startsWith("The specified deadline: 0 is the past"));
LOG.info(message);
}
}
@Test
public void testSubmitReservationInvalidRR() {
ReservationSubmissionRequest request =
createSimpleReservationSubmissionRequest(0, 0, 1, 5, 3);
Plan plan = null;
try {
plan =
rrValidator.validateReservationSubmissionRequest(rSystem, request,
ReservationSystemTestUtil.getNewReservationId());
Assert.fail();
} catch (YarnException e) {
Assert.assertNull(plan);
String message = e.getMessage();
Assert.assertTrue(message
.startsWith("No resources have been specified to reserve"));
LOG.info(message);
}
}
@Test
public void testSubmitReservationEmptyRR() {
ReservationSubmissionRequest request =
createSimpleReservationSubmissionRequest(1, 0, 1, 5, 3);
Plan plan = null;
try {
plan =
rrValidator.validateReservationSubmissionRequest(rSystem, request,
ReservationSystemTestUtil.getNewReservationId());
Assert.fail();
} catch (YarnException e) {
Assert.assertNull(plan);
String message = e.getMessage();
Assert.assertTrue(message
.startsWith("No resources have been specified to reserve"));
LOG.info(message);
}
}
@Test
public void testSubmitReservationInvalidDuration() {
ReservationSubmissionRequest request =
createSimpleReservationSubmissionRequest(1, 1, 1, 3, 4);
Plan plan = null;
try {
plan =
rrValidator.validateReservationSubmissionRequest(rSystem, request,
ReservationSystemTestUtil.getNewReservationId());
Assert.fail();
} catch (YarnException e) {
Assert.assertNull(plan);
String message = e.getMessage();
Assert.assertTrue(message.startsWith("The time difference"));
Assert
.assertTrue(message
.contains("must be greater or equal to the minimum resource duration"));
LOG.info(message);
}
}
@Test
public void testSubmitReservationExceedsGangSize() {
ReservationSubmissionRequest request =
createSimpleReservationSubmissionRequest(1, 1, 1, 5, 4);
Resource resource = Resource.newInstance(512, 1);
when(plan.getTotalCapacity()).thenReturn(resource);
Plan plan = null;
try {
plan =
rrValidator.validateReservationSubmissionRequest(rSystem, request,
ReservationSystemTestUtil.getNewReservationId());
Assert.fail();
} catch (YarnException e) {
Assert.assertNull(plan);
String message = e.getMessage();
Assert
.assertTrue(message
.startsWith("The size of the largest gang in the reservation refinition"));
Assert.assertTrue(message.contains("exceed the capacity available "));
LOG.info(message);
}
}
@Test
public void testUpdateReservationNormal() {
ReservationUpdateRequest request =
createSimpleReservationUpdateRequest(1, 1, 1, 5, 3);
Plan plan = null;
try {
plan = rrValidator.validateReservationUpdateRequest(rSystem, request);
} catch (YarnException e) {
Assert.fail(e.getMessage());
}
Assert.assertNotNull(plan);
}
@Test
public void testUpdateReservationNoID() {
ReservationUpdateRequest request = new ReservationUpdateRequestPBImpl();
Plan plan = null;
try {
plan = rrValidator.validateReservationUpdateRequest(rSystem, request);
Assert.fail();
} catch (YarnException e) {
Assert.assertNull(plan);
String message = e.getMessage();
Assert
.assertTrue(message
.startsWith("Missing reservation id. Please try again by specifying a reservation id."));
LOG.info(message);
}
}
@Test
public void testUpdateReservationDoesnotExist() {
ReservationUpdateRequest request =
createSimpleReservationUpdateRequest(1, 1, 1, 5, 4);
ReservationId rId = request.getReservationId();
when(rSystem.getQueueForReservation(rId)).thenReturn(null);
Plan plan = null;
try {
plan = rrValidator.validateReservationUpdateRequest(rSystem, request);
Assert.fail();
} catch (YarnException e) {
Assert.assertNull(plan);
String message = e.getMessage();
Assert
.assertTrue(message.equals(MessageFormat
.format(
"The specified reservation with ID: {0} is unknown. Please try again with a valid reservation.",
rId)));
LOG.info(message);
}
}
@Test
public void testUpdateReservationInvalidPlan() {
ReservationUpdateRequest request =
createSimpleReservationUpdateRequest(1, 1, 1, 5, 4);
when(rSystem.getPlan(PLAN_NAME)).thenReturn(null);
Plan plan = null;
try {
plan = rrValidator.validateReservationUpdateRequest(rSystem, request);
Assert.fail();
} catch (YarnException e) {
Assert.assertNull(plan);
String message = e.getMessage();
Assert
.assertTrue(message
.endsWith(" is not associated with any valid plan. Please try again with a valid reservation."));
LOG.info(message);
}
}
@Test
public void testUpdateReservationNoDefinition() {
ReservationUpdateRequest request = new ReservationUpdateRequestPBImpl();
request.setReservationId(ReservationSystemTestUtil.getNewReservationId());
Plan plan = null;
try {
plan = rrValidator.validateReservationUpdateRequest(rSystem, request);
Assert.fail();
} catch (YarnException e) {
Assert.assertNull(plan);
String message = e.getMessage();
Assert
.assertTrue(message
.startsWith("Missing reservation definition. Please try again by specifying a reservation definition."));
LOG.info(message);
}
}
@Test
public void testUpdateReservationInvalidDeadline() {
ReservationUpdateRequest request =
createSimpleReservationUpdateRequest(1, 1, 1, 0, 3);
Plan plan = null;
try {
plan = rrValidator.validateReservationUpdateRequest(rSystem, request);
Assert.fail();
} catch (YarnException e) {
Assert.assertNull(plan);
String message = e.getMessage();
Assert.assertTrue(message
.startsWith("The specified deadline: 0 is the past"));
LOG.info(message);
}
}
@Test
public void testUpdateReservationInvalidRR() {
ReservationUpdateRequest request =
createSimpleReservationUpdateRequest(0, 0, 1, 5, 3);
Plan plan = null;
try {
plan = rrValidator.validateReservationUpdateRequest(rSystem, request);
Assert.fail();
} catch (YarnException e) {
Assert.assertNull(plan);
String message = e.getMessage();
Assert.assertTrue(message
.startsWith("No resources have been specified to reserve"));
LOG.info(message);
}
}
@Test
public void testUpdateReservationEmptyRR() {
ReservationUpdateRequest request =
createSimpleReservationUpdateRequest(1, 0, 1, 5, 3);
Plan plan = null;
try {
plan = rrValidator.validateReservationUpdateRequest(rSystem, request);
Assert.fail();
} catch (YarnException e) {
Assert.assertNull(plan);
String message = e.getMessage();
Assert.assertTrue(message
.startsWith("No resources have been specified to reserve"));
LOG.info(message);
}
}
@Test
public void testUpdateReservationInvalidDuration() {
ReservationUpdateRequest request =
createSimpleReservationUpdateRequest(1, 1, 1, 3, 4);
Plan plan = null;
try {
plan = rrValidator.validateReservationUpdateRequest(rSystem, request);
Assert.fail();
} catch (YarnException e) {
Assert.assertNull(plan);
String message = e.getMessage();
Assert
.assertTrue(message
.contains("must be greater or equal to the minimum resource duration"));
LOG.info(message);
}
}
@Test
public void testUpdateReservationExceedsGangSize() {
ReservationUpdateRequest request =
createSimpleReservationUpdateRequest(1, 1, 1, 5, 4);
Resource resource = Resource.newInstance(512, 1);
when(plan.getTotalCapacity()).thenReturn(resource);
Plan plan = null;
try {
plan = rrValidator.validateReservationUpdateRequest(rSystem, request);
Assert.fail();
} catch (YarnException e) {
Assert.assertNull(plan);
String message = e.getMessage();
Assert
.assertTrue(message
.startsWith("The size of the largest gang in the reservation refinition"));
Assert.assertTrue(message.contains("exceed the capacity available "));
LOG.info(message);
}
}
@Test
public void testDeleteReservationNormal() {
ReservationDeleteRequest request = new ReservationDeleteRequestPBImpl();
ReservationId reservationID =
ReservationSystemTestUtil.getNewReservationId();
request.setReservationId(reservationID);
ReservationAllocation reservation = mock(ReservationAllocation.class);
when(plan.getReservationById(reservationID)).thenReturn(reservation);
Plan plan = null;
try {
plan = rrValidator.validateReservationDeleteRequest(rSystem, request);
} catch (YarnException e) {
Assert.fail(e.getMessage());
}
Assert.assertNotNull(plan);
}
@Test
public void testDeleteReservationNoID() {
ReservationDeleteRequest request = new ReservationDeleteRequestPBImpl();
Plan plan = null;
try {
plan = rrValidator.validateReservationDeleteRequest(rSystem, request);
Assert.fail();
} catch (YarnException e) {
Assert.assertNull(plan);
String message = e.getMessage();
Assert
.assertTrue(message
.startsWith("Missing reservation id. Please try again by specifying a reservation id."));
LOG.info(message);
}
}
@Test
public void testDeleteReservationDoesnotExist() {
ReservationDeleteRequest request = new ReservationDeleteRequestPBImpl();
ReservationId rId = ReservationSystemTestUtil.getNewReservationId();
request.setReservationId(rId);
when(rSystem.getQueueForReservation(rId)).thenReturn(null);
Plan plan = null;
try {
plan = rrValidator.validateReservationDeleteRequest(rSystem, request);
Assert.fail();
} catch (YarnException e) {
Assert.assertNull(plan);
String message = e.getMessage();
Assert
.assertTrue(message.equals(MessageFormat
.format(
"The specified reservation with ID: {0} is unknown. Please try again with a valid reservation.",
rId)));
LOG.info(message);
}
}
@Test
public void testDeleteReservationInvalidPlan() {
ReservationDeleteRequest request = new ReservationDeleteRequestPBImpl();
ReservationId reservationID =
ReservationSystemTestUtil.getNewReservationId();
request.setReservationId(reservationID);
when(rSystem.getPlan(PLAN_NAME)).thenReturn(null);
Plan plan = null;
try {
plan = rrValidator.validateReservationDeleteRequest(rSystem, request);
Assert.fail();
} catch (YarnException e) {
Assert.assertNull(plan);
String message = e.getMessage();
Assert
.assertTrue(message
.endsWith(" is not associated with any valid plan. Please try again with a valid reservation."));
LOG.info(message);
}
}
private ReservationSubmissionRequest createSimpleReservationSubmissionRequest(
int numRequests, int numContainers, long arrival, long deadline,
long duration) {
// create a request with a single atomic ask
ReservationSubmissionRequest request =
new ReservationSubmissionRequestPBImpl();
ReservationDefinition rDef = new ReservationDefinitionPBImpl();
rDef.setArrival(arrival);
rDef.setDeadline(deadline);
if (numRequests > 0) {
ReservationRequests reqs = new ReservationRequestsPBImpl();
rDef.setReservationRequests(reqs);
if (numContainers > 0) {
ReservationRequest r =
ReservationRequest.newInstance(Resource.newInstance(1024, 1),
numContainers, 1, duration);
reqs.setReservationResources(Collections.singletonList(r));
reqs.setInterpreter(ReservationRequestInterpreter.R_ALL);
}
}
request.setQueue(PLAN_NAME);
request.setReservationDefinition(rDef);
return request;
}
private ReservationUpdateRequest createSimpleReservationUpdateRequest(
int numRequests, int numContainers, long arrival, long deadline,
long duration) {
// create a request with a single atomic ask
ReservationUpdateRequest request = new ReservationUpdateRequestPBImpl();
ReservationDefinition rDef = new ReservationDefinitionPBImpl();
rDef.setArrival(arrival);
rDef.setDeadline(deadline);
if (numRequests > 0) {
ReservationRequests reqs = new ReservationRequestsPBImpl();
rDef.setReservationRequests(reqs);
if (numContainers > 0) {
ReservationRequest r =
ReservationRequest.newInstance(Resource.newInstance(1024, 1),
numContainers, 1, duration);
reqs.setReservationResources(Collections.singletonList(r));
reqs.setInterpreter(ReservationRequestInterpreter.R_ALL);
}
}
request.setReservationDefinition(rDef);
request.setReservationId(ReservationSystemTestUtil.getNewReservationId());
return request;
}
}