YARN-4957. Add getNewReservation in ApplicationClientProtocol (Sean Po via curino)

This commit is contained in:
Carlo Curino 2016-05-25 16:55:49 -07:00
parent 3c83cee118
commit 013532a95e
34 changed files with 1682 additions and 565 deletions
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src
main/java/org/apache/hadoop/mapred
test/java/org/apache/hadoop/mapred
hadoop-yarn-project/hadoop-yarn
hadoop-yarn-api/src/main
hadoop-yarn-client/src
main/java/org/apache/hadoop/yarn/client/api
test/java/org/apache/hadoop/yarn/client/api/impl
hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api
hadoop-yarn-server
hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy
hadoop-yarn-server-resourcemanager/src
hadoop-yarn-site/src/site/markdown

View File

@ -44,6 +44,7 @@ import org.apache.hadoop.mapreduce.v2.util.MRApps;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewReservationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationListRequest;
@ -435,6 +436,12 @@ public class ResourceMgrDelegate extends YarnClient {
client.moveApplicationAcrossQueues(appId, queue);
}
@Override
public GetNewReservationResponse createReservation() throws YarnException,
IOException {
return client.createReservation();
}
@Override
public ReservationSubmissionResponse submitReservation(
ReservationSubmissionRequest request) throws YarnException, IOException {

View File

@ -96,6 +96,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewReservationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewReservationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest;
@ -423,6 +425,12 @@ public class TestClientRedirect {
return null;
}
@Override
public GetNewReservationResponse getNewReservation(
GetNewReservationRequest request) throws YarnException, IOException {
return null;
}
@Override
public ReservationSubmissionResponse submitReservation(
ReservationSubmissionRequest request) throws YarnException, IOException {

View File

@ -37,6 +37,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewReservationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewReservationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest;
@ -301,6 +303,28 @@ public interface ApplicationClientProtocol extends ApplicationBaseProtocol {
public MoveApplicationAcrossQueuesResponse moveApplicationAcrossQueues(
MoveApplicationAcrossQueuesRequest request) throws YarnException, IOException;
/**
* <p>The interface used by clients to obtain a new {@link ReservationId} for
* submitting new reservations.</p>
*
* <p>The <code>ResourceManager</code> responds with a new, unique,
* {@link ReservationId} which is used by the client to submit
* a new reservation.</p>
*
* @param request to get a new <code>ReservationId</code>
* @return response containing the new <code>ReservationId</code> to be used
* to submit a new reservation
* @throws YarnException if the reservation system is not enabled.
* @throws IOException on IO failures.
* @see #submitReservation(ReservationSubmissionRequest)
*/
@Public
@Unstable
@Idempotent
GetNewReservationResponse getNewReservation(
GetNewReservationRequest request)
throws YarnException, IOException;
/**
* <p>
* The interface used by clients to submit a new reservation to the
@ -349,6 +373,7 @@ public interface ApplicationClientProtocol extends ApplicationBaseProtocol {
*/
@Public
@Unstable
@Idempotent
public ReservationSubmissionResponse submitReservation(
ReservationSubmissionRequest request) throws YarnException, IOException;

View File

@ -0,0 +1,41 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.api.protocolrecords;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.util.Records;
/**
* <p>The request sent by clients to get a new {@code ReservationId} for
* submitting an reservation.</p>
*
* {@code ApplicationClientProtocol#getNewReservation(GetNewReservationRequest)}
*/
@Public
@Unstable
public abstract class GetNewReservationRequest {
@Public
@Unstable
public static GetNewReservationRequest newInstance() {
GetNewReservationRequest request =
Records.newRecord(GetNewReservationRequest.class);
return request;
}
}

View File

@ -0,0 +1,71 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.api.protocolrecords;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Stable;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.util.Records;
/**
* <p>The response sent by the <code>ResourceManager</code> to the client for
* a request to get a new {@link ReservationId} for submitting reservations.</p>
*
* <p>Clients can submit an reservation with the returned
* {@link ReservationId}.</p>
*
* {@code ApplicationClientProtocol#getNewReservation(GetNewReservationRequest)}
*/
@Public
@Unstable
public abstract class GetNewReservationResponse {
@Private
@Unstable
public static GetNewReservationResponse newInstance(
ReservationId reservationId) {
GetNewReservationResponse response =
Records.newRecord(GetNewReservationResponse.class);
response.setReservationId(reservationId);
return response;
}
/**
* Get a new {@link ReservationId} to be used to submit a reservation.
*
* @return a {@link ReservationId} representing the unique id to identify
* a reservation with which it was submitted.
*/
@Public
@Unstable
public abstract ReservationId getReservationId();
/**
* Set a new {@link ReservationId} to be used to submit a reservation.
*
* @param reservationId a {@link ReservationId} representing the unique id to
* identify a reservation with which it was submitted.
*/
@Private
@Unstable
public abstract void setReservationId(ReservationId reservationId);
}

View File

@ -22,6 +22,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.ReservationDefinition;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.util.Records;
/**
@ -38,11 +39,13 @@ public abstract class ReservationSubmissionRequest {
@Public
@Unstable
public static ReservationSubmissionRequest newInstance(
ReservationDefinition reservationDefinition, String queueName) {
ReservationDefinition reservationDefinition, String queueName,
ReservationId reservationId) {
ReservationSubmissionRequest request =
Records.newRecord(ReservationSubmissionRequest.class);
request.setReservationDefinition(reservationDefinition);
request.setQueue(queueName);
request.setReservationId(reservationId);
return request;
}
@ -94,4 +97,24 @@ public abstract class ReservationSubmissionRequest {
@Unstable
public abstract void setQueue(String queueName);
/**
* Get the reservation id that corresponds to the reservation submission.
*
* @return reservation id that will be used to identify the reservation
* submission.
*/
@Public
@Unstable
public abstract ReservationId getReservationId();
/**
* Set the reservation id that corresponds to the reservation submission.
*
* @param reservationId reservation id that will be used to identify the
* reservation submission.
*/
@Public
@Unstable
public abstract void setReservationId(ReservationId reservationId);
}

View File

@ -21,18 +21,17 @@ package org.apache.hadoop.yarn.api.protocolrecords;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.ReservationDefinition;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.util.Records;
/**
* {@link ReservationSubmissionResponse} contains the answer of the admission
* control system in the {@code ResourceManager} to a reservation create
* operation. Response contains a {@link ReservationId} if the operation was
* successful, if not an exception reporting reason for a failure.
*
* @see ReservationDefinition
*
* <p>The response sent by the <code>ResourceManager</code> to a client on
* reservation submission.</p>
*
* <p>Currently, this is empty.</p>
*
* {@code ApplicationClientProtocol#submitReservation(
* ReservationSubmissionRequest)}
*
*/
@Public
@Unstable
@ -40,37 +39,10 @@ public abstract class ReservationSubmissionResponse {
@Private
@Unstable
public static ReservationSubmissionResponse newInstance(
ReservationId reservationId) {
public static ReservationSubmissionResponse newInstance() {
ReservationSubmissionResponse response =
Records.newRecord(ReservationSubmissionResponse.class);
response.setReservationId(reservationId);
return response;
}
/**
* Get the {@link ReservationId}, that corresponds to a valid resource
* allocation in the scheduler (between start and end time of this
* reservation)
*
* @return the {@link ReservationId} representing the unique id of the
* corresponding reserved resource allocation in the scheduler
*/
@Public
@Unstable
public abstract ReservationId getReservationId();
/**
* Set the {@link ReservationId}, that correspond to a valid resource
* allocation in the scheduler (between start and end time of this
* reservation)
*
* @param reservationId the {@link ReservationId} representing the the unique
* id of the corresponding reserved resource allocation in the
* scheduler
*/
@Private
@Unstable
public abstract void setReservationId(ReservationId reservationId);
}

View File

@ -50,6 +50,7 @@ service ApplicationClientProtocolService {
rpc getApplicationAttempts (GetApplicationAttemptsRequestProto) returns (GetApplicationAttemptsResponseProto);
rpc getContainerReport (GetContainerReportRequestProto) returns (GetContainerReportResponseProto);
rpc getContainers (GetContainersRequestProto) returns (GetContainersResponseProto);
rpc getNewReservation (GetNewReservationRequestProto) returns (GetNewReservationResponseProto);
rpc submitReservation (ReservationSubmissionRequestProto) returns (ReservationSubmissionResponseProto);
rpc updateReservation (ReservationUpdateRequestProto) returns (ReservationUpdateResponseProto);
rpc deleteReservation (ReservationDeleteRequestProto) returns (ReservationDeleteResponseProto);

View File

@ -367,13 +367,20 @@ message ReleaseSharedCacheResourceResponseProto {
// reservation_protocol
//////////////////////////////////////////////////////
message GetNewReservationRequestProto {
}
message GetNewReservationResponseProto {
optional ReservationIdProto reservation_id = 1;
}
message ReservationSubmissionRequestProto {
optional string queue = 1;
optional ReservationDefinitionProto reservation_definition = 2;
optional ReservationIdProto reservation_id = 3;
}
message ReservationSubmissionResponseProto {
optional ReservationIdProto reservation_id = 1;
}
message ReservationUpdateRequestProto {

View File

@ -32,6 +32,7 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewReservationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationListRequest;
@ -533,6 +534,20 @@ public abstract class YarnClient extends AbstractService {
public abstract void moveApplicationAcrossQueues(ApplicationId appId,
String queue) throws YarnException, IOException;
/**
* <p>
* Obtain a {@link GetNewReservationResponse} for a new reservation,
* which contains the {@link ReservationId} object.
* </p>
*
* @return The {@link GetNewReservationResponse} containing a new
* {@link ReservationId} object.
* @throws YarnException if reservation cannot be created.
* @throws IOException if reservation cannot be created.
*/
public abstract GetNewReservationResponse createReservation()
throws YarnException, IOException;
/**
* <p>
* The interface used by clients to submit a new reservation to the
@ -666,7 +681,7 @@ public abstract class YarnClient extends AbstractService {
* @return response that contains information about reservations that are
* being searched for.
* @throws YarnException if the request is invalid
* @throws IOException
* @throws IOException if the request failed otherwise
*
*/
@Public
@ -725,8 +740,10 @@ public abstract class YarnClient extends AbstractService {
* </p>
*
* @return cluster node labels collection
* @throws YarnException
* @throws IOException
* @throws YarnException when there is a failure in
* {@see ApplicationClientProtocol}
* @throws IOException when there is a failure in
* {@see ApplicationClientProtocol}
*/
@Public
@Unstable

View File

@ -65,6 +65,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewReservationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewReservationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest;
@ -788,6 +790,14 @@ public class YarnClientImpl extends YarnClient {
rmClient.moveApplicationAcrossQueues(request);
}
@Override
public GetNewReservationResponse createReservation() throws YarnException,
IOException {
GetNewReservationRequest request =
Records.newRecord(GetNewReservationRequest.class);
return rmClient.getNewReservation(request);
}
@Override
public ReservationSubmissionResponse submitReservation(
ReservationSubmissionRequest request) throws YarnException, IOException {

View File

@ -67,6 +67,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetContainersRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainersResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewReservationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewReservationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsResponse;
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
@ -988,7 +990,19 @@ public class TestYarnClient {
return appId;
}
private GetNewReservationResponse getNewReservation(YarnClient rmClient) {
GetNewReservationRequest newReservationRequest = GetNewReservationRequest
.newInstance();
GetNewReservationResponse getNewReservationResponse = null;
try {
getNewReservationResponse = rmClient.createReservation();
} catch (Exception e) {
Assert.fail(e.getMessage());
}
return getNewReservationResponse;
}
private void waitTillAccepted(YarnClient rmClient, ApplicationId appId,
boolean unmanagedApplication)
throws Exception {
@ -1179,9 +1193,7 @@ public class TestYarnClient {
}
}
@Test
public void testReservationAPIs() {
// initialize
private MiniYARNCluster setupMiniYARNCluster() {
CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
ReservationSystemTestUtil.setupQueueConfiguration(conf);
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
@ -1189,55 +1201,116 @@ public class TestYarnClient {
conf.setBoolean(YarnConfiguration.RM_RESERVATION_SYSTEM_ENABLE, true);
MiniYARNCluster cluster =
new MiniYARNCluster("testReservationAPIs", 2, 1, 1);
YarnClient client = null;
cluster.init(conf);
cluster.start();
int attempts;
for (attempts = 10; attempts > 0; attempts--) {
if (cluster.getResourceManager().getRMContext().getReservationSystem()
.getPlan(ReservationSystemTestUtil.reservationQ).getTotalCapacity()
.getMemory() > 6000) {
break;
}
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
if (attempts <= 0) {
Assert.fail("Exhausted attempts in checking if node capacity was "
+ "added to the plan");
}
return cluster;
}
private YarnClient setupYarnClient(MiniYARNCluster cluster) {
final Configuration yarnConf = cluster.getConfig();
YarnClient client = YarnClient.createYarnClient();
client.init(yarnConf);
client.start();
return client;
}
private ReservationSubmissionRequest submitReservationTestHelper(
YarnClient client, long arrival, long deadline, long duration) {
ReservationId reservationID = getNewReservation(client).getReservationId();
ReservationSubmissionRequest sRequest = createSimpleReservationRequest(
reservationID, 4, arrival, deadline, duration);
ReservationSubmissionResponse sResponse = null;
try {
cluster.init(conf);
cluster.start();
final Configuration yarnConf = cluster.getConfig();
client = YarnClient.createYarnClient();
client.init(yarnConf);
client.start();
sResponse = client.submitReservation(sRequest);
} catch (Exception e) {
Assert.fail(e.getMessage());
}
Assert.assertNotNull(sResponse);
Assert.assertNotNull(reservationID);
System.out.println("Submit reservation response: " + reservationID);
int attempts;
for(attempts = 10; attempts > 0; attempts--) {
if (cluster.getResourceManager().getRMContext().getReservationSystem()
.getPlan(ReservationSystemTestUtil.reservationQ).getTotalCapacity()
.getMemory() > 0) {
break;
}
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
if (attempts <= 0) {
Assert.fail("Exhausted attempts in checking if node capacity was "
+ "added to the plan");
}
return sRequest;
}
// create a reservation
@Test
public void testCreateReservation() {
MiniYARNCluster cluster = setupMiniYARNCluster();
YarnClient client = setupYarnClient(cluster);
try {
Clock clock = new UTCClock();
long arrival = clock.getTime();
long duration = 60000;
long deadline = (long) (arrival + 1.05 * duration);
ReservationSubmissionRequest sRequest =
createSimpleReservationRequest(4, arrival, deadline, duration);
ReservationSubmissionResponse sResponse = null;
submitReservationTestHelper(client, arrival, deadline, duration);
// Submit the reservation again with the same request and make sure it
// passes.
try {
sResponse = client.submitReservation(sRequest);
client.submitReservation(sRequest);
} catch (Exception e) {
Assert.fail(e.getMessage());
}
Assert.assertNotNull(sResponse);
ReservationId reservationID = sResponse.getReservationId();
Assert.assertNotNull(reservationID);
System.out.println("Submit reservation response: " + reservationID);
// Update the reservation
// Submit the reservation with the same reservation id but different
// reservation definition, and ensure YarnException is thrown.
arrival = clock.getTime();
ReservationDefinition rDef = sRequest.getReservationDefinition();
rDef.setArrival(arrival + duration);
sRequest.setReservationDefinition(rDef);
try {
client.submitReservation(sRequest);
Assert.fail("Reservation submission should fail if a duplicate "
+ "reservation id is used, but the reservation definition has been "
+ "updated.");
} catch (Exception e) {
Assert.assertTrue(e instanceof YarnException);
}
} finally {
// clean-up
if (client != null) {
client.stop();
}
cluster.stop();
}
}
@Test
public void testUpdateReservation() {
MiniYARNCluster cluster = setupMiniYARNCluster();
YarnClient client = setupYarnClient(cluster);
try {
Clock clock = new UTCClock();
long arrival = clock.getTime();
long duration = 60000;
long deadline = (long) (arrival + 1.05 * duration);
ReservationSubmissionRequest sRequest =
submitReservationTestHelper(client, arrival, deadline, duration);
ReservationDefinition rDef = sRequest.getReservationDefinition();
ReservationRequest rr =
rDef.getReservationRequests().getReservationResources().get(0);
ReservationId reservationID = sRequest.getReservationId();
rr.setNumContainers(5);
arrival = clock.getTime();
duration = 30000;
@ -1253,14 +1326,73 @@ public class TestYarnClient {
} catch (Exception e) {
Assert.fail(e.getMessage());
}
Assert.assertNotNull(sResponse);
Assert.assertNotNull(uResponse);
System.out.println("Update reservation response: " + uResponse);
} finally {
// clean-up
if (client != null) {
client.stop();
}
cluster.stop();
}
}
// List reservations, search by reservation ID
ReservationListRequest request =
ReservationListRequest.newInstance(
ReservationSystemTestUtil.reservationQ,
reservationID.toString(), -1, -1, false);
@Test
public void testListReservationsByReservationId() {
MiniYARNCluster cluster = setupMiniYARNCluster();
YarnClient client = setupYarnClient(cluster);
try {
Clock clock = new UTCClock();
long arrival = clock.getTime();
long duration = 60000;
long deadline = (long) (arrival + 1.05 * duration);
ReservationSubmissionRequest sRequest =
submitReservationTestHelper(client, arrival, deadline, duration);
ReservationId reservationID = sRequest.getReservationId();
ReservationListResponse response = null;
ReservationListRequest request = ReservationListRequest.newInstance(
ReservationSystemTestUtil.reservationQ, reservationID.toString(), -1,
-1, false);
try {
response = client.listReservations(request);
} catch (Exception e) {
Assert.fail(e.getMessage());
}
Assert.assertNotNull(response);
Assert.assertEquals(1, response.getReservationAllocationState().size());
Assert.assertEquals(response.getReservationAllocationState().get(0)
.getReservationId().getId(), reservationID.getId());
Assert.assertEquals(response.getReservationAllocationState().get(0)
.getResourceAllocationRequests().size(), 0);
} finally {
// clean-up
if (client != null) {
client.stop();
}
cluster.stop();
}
}
@Test
public void testListReservationsByTimeInterval() {
MiniYARNCluster cluster = setupMiniYARNCluster();
YarnClient client = setupYarnClient(cluster);
try {
Clock clock = new UTCClock();
long arrival = clock.getTime();
long duration = 60000;
long deadline = (long) (arrival + 1.05 * duration);
ReservationSubmissionRequest sRequest =
submitReservationTestHelper(client, arrival, deadline, duration);
// List reservations, search by a point in time within the reservation
// range.
arrival = clock.getTime();
ReservationId reservationID = sRequest.getReservationId();
ReservationListRequest request = ReservationListRequest.newInstance(
ReservationSystemTestUtil.reservationQ, "", arrival + duration / 2,
arrival + duration / 2, true);
ReservationListResponse response = null;
try {
@ -1271,30 +1403,10 @@ public class TestYarnClient {
Assert.assertNotNull(response);
Assert.assertEquals(1, response.getReservationAllocationState().size());
Assert.assertEquals(response.getReservationAllocationState().get(0)
.getReservationId().getId(), reservationID.getId());
Assert.assertEquals(response.getReservationAllocationState().get(0)
.getResourceAllocationRequests().size(), 0);
// List reservations, search by time interval.
.getReservationId().getId(), reservationID.getId());
// List reservations, search by time within reservation interval.
request = ReservationListRequest.newInstance(
ReservationSystemTestUtil.reservationQ, "", arrival +
duration/2, arrival + duration/2, true);
response = null;
try {
response = client.listReservations(request);
} catch (Exception e) {
Assert.fail(e.getMessage());
}
Assert.assertNotNull(response);
Assert.assertEquals(1, response.getReservationAllocationState().size());
Assert.assertEquals(response.getReservationAllocationState().get(0)
.getReservationId().getId(), reservationID.getId());
// List reservations, search by invalid end time == -1.
request = ReservationListRequest.newInstance(
ReservationSystemTestUtil.reservationQ, "", 1, -1,
true);
ReservationSystemTestUtil.reservationQ, "", 1, Long.MAX_VALUE, true);
response = null;
try {
@ -1305,12 +1417,58 @@ public class TestYarnClient {
Assert.assertNotNull(response);
Assert.assertEquals(1, response.getReservationAllocationState().size());
Assert.assertEquals(response.getReservationAllocationState().get(0)
.getReservationId().getId(), reservationID.getId());
.getReservationId().getId(), reservationID.getId());
// Verify that the full resource allocations exist.
Assert.assertTrue(response.getReservationAllocationState().get(0)
.getResourceAllocationRequests().size() > 0);
// Verify that the full RDL is returned.
ReservationRequests reservationRequests =
response.getReservationAllocationState().get(0)
.getReservationDefinition().getReservationRequests();
Assert.assertTrue(
reservationRequests.getInterpreter().toString().equals("R_ALL"));
Assert.assertTrue(reservationRequests.getReservationResources().get(0)
.getDuration() == duration);
} finally {
// clean-up
if (client != null) {
client.stop();
}
cluster.stop();
}
}
@Test
public void testListReservationsByInvalidTimeInterval() {
MiniYARNCluster cluster = setupMiniYARNCluster();
YarnClient client = setupYarnClient(cluster);
try {
Clock clock = new UTCClock();
long arrival = clock.getTime();
long duration = 60000;
long deadline = (long) (arrival + 1.05 * duration);
ReservationSubmissionRequest sRequest =
submitReservationTestHelper(client, arrival, deadline, duration);
// List reservations, search by invalid end time == -1.
ReservationListRequest request = ReservationListRequest
.newInstance(ReservationSystemTestUtil.reservationQ, "", 1, -1, true);
ReservationListResponse response = null;
try {
response = client.listReservations(request);
} catch (Exception e) {
Assert.fail(e.getMessage());
}
Assert.assertNotNull(response);
Assert.assertEquals(1, response.getReservationAllocationState().size());
Assert.assertEquals(response.getReservationAllocationState().get(0)
.getReservationId().getId(), sRequest.getReservationId().getId());
// List reservations, search by invalid end time < -1.
request = ReservationListRequest.newInstance(
ReservationSystemTestUtil.reservationQ, "", 1, -10,
true);
ReservationSystemTestUtil.reservationQ, "", 1, -10, true);
response = null;
try {
@ -1321,54 +1479,52 @@ public class TestYarnClient {
Assert.assertNotNull(response);
Assert.assertEquals(1, response.getReservationAllocationState().size());
Assert.assertEquals(response.getReservationAllocationState().get(0)
.getReservationId().getId(), reservationID.getId());
// List reservations, search by time within reservation interval.
request = ReservationListRequest.newInstance(
ReservationSystemTestUtil.reservationQ, "", 1, Long.MAX_VALUE,
true);
response = null;
try {
response = client.listReservations(request);
} catch (Exception e) {
Assert.fail(e.getMessage());
.getReservationId().getId(), sRequest.getReservationId().getId());
} finally {
// clean-up
if (client != null) {
client.stop();
}
Assert.assertNotNull(response);
Assert.assertEquals(1, response.getReservationAllocationState().size());
Assert.assertEquals(response.getReservationAllocationState().get(0)
.getReservationId().getId(), reservationID.getId());
cluster.stop();
}
}
// Verify that the full resource allocations exist.
Assert.assertTrue(response.getReservationAllocationState().get(0)
.getResourceAllocationRequests().size() > 0);
// Verify that the full RDL is returned.
ReservationRequests reservationRequests = response
.getReservationAllocationState().get(0)
.getReservationDefinition().getReservationRequests();
Assert.assertTrue(reservationRequests.getInterpreter().toString()
.equals("R_ALL"));
Assert.assertTrue(reservationRequests.getReservationResources().get(0)
.getDuration() == duration);
@Test
public void testListReservationsByTimeIntervalContainingNoReservations() {
MiniYARNCluster cluster = setupMiniYARNCluster();
YarnClient client = setupYarnClient(cluster);
try {
Clock clock = new UTCClock();
long arrival = clock.getTime();
long duration = 60000;
long deadline = (long) (arrival + 1.05 * duration);
ReservationSubmissionRequest sRequest =
submitReservationTestHelper(client, arrival, deadline, duration);
// List reservations, search by very large start time.
request = ReservationListRequest.newInstance(
ReservationSystemTestUtil.reservationQ, "", Long.MAX_VALUE,
-1, false);
ReservationListRequest request = ReservationListRequest.newInstance(
ReservationSystemTestUtil.reservationQ, "", Long.MAX_VALUE, -1,
false);
response = null;
ReservationListResponse response = null;
try {
response = client.listReservations(request);
} catch (Exception e) {
Assert.fail(e.getMessage());
}
// Ensure all reservations are filtered out.
Assert.assertNotNull(response);
Assert.assertEquals(response.getReservationAllocationState().size(), 0);
duration = 30000;
deadline = sRequest.getReservationDefinition().getDeadline();
// List reservations, search by start time after the reservation
// end time.
request = ReservationListRequest.newInstance(
ReservationSystemTestUtil.reservationQ, "", deadline + duration,
deadline + 2 * duration, false);
ReservationSystemTestUtil.reservationQ, "", deadline + duration,
deadline + 2 * duration, false);
response = null;
try {
@ -1381,11 +1537,12 @@ public class TestYarnClient {
Assert.assertNotNull(response);
Assert.assertEquals(response.getReservationAllocationState().size(), 0);
arrival = clock.getTime();
// List reservations, search by end time before the reservation start
// time.
request = ReservationListRequest.newInstance(
ReservationSystemTestUtil.reservationQ, "", 0, arrival -
duration, false);
ReservationSystemTestUtil.reservationQ, "", 0, arrival - duration,
false);
response = null;
try {
@ -1399,8 +1556,8 @@ public class TestYarnClient {
Assert.assertEquals(response.getReservationAllocationState().size(), 0);
// List reservations, search by very small end time.
request = ReservationListRequest.newInstance(
ReservationSystemTestUtil.reservationQ, "", 0, 1, false);
request = ReservationListRequest
.newInstance(ReservationSystemTestUtil.reservationQ, "", 0, 1, false);
response = null;
try {
@ -1413,6 +1570,28 @@ public class TestYarnClient {
Assert.assertNotNull(response);
Assert.assertEquals(response.getReservationAllocationState().size(), 0);
} finally {
// clean-up
if (client != null) {
client.stop();
}
cluster.stop();
}
}
@Test
public void testReservationDelete() {
MiniYARNCluster cluster = setupMiniYARNCluster();
YarnClient client = setupYarnClient(cluster);
try {
Clock clock = new UTCClock();
long arrival = clock.getTime();
long duration = 60000;
long deadline = (long) (arrival + 1.05 * duration);
ReservationSubmissionRequest sRequest =
submitReservationTestHelper(client, arrival, deadline, duration);
ReservationId reservationID = sRequest.getReservationId();
// Delete the reservation
ReservationDeleteRequest dRequest =
ReservationDeleteRequest.newInstance(reservationID);
@ -1422,15 +1601,15 @@ public class TestYarnClient {
} catch (Exception e) {
Assert.fail(e.getMessage());
}
Assert.assertNotNull(sResponse);
Assert.assertNotNull(dResponse);
System.out.println("Delete reservation response: " + dResponse);
// List reservations, search by non-existent reservationID
request = ReservationListRequest.newInstance(
ReservationSystemTestUtil.reservationQ,
reservationID.toString(), -1, -1, false);
ReservationListRequest request = ReservationListRequest.newInstance(
ReservationSystemTestUtil.reservationQ, reservationID.toString(), -1,
-1, false);
response = null;
ReservationListResponse response = null;
try {
response = client.listReservations(request);
} catch (Exception e) {
@ -1448,7 +1627,8 @@ public class TestYarnClient {
}
private ReservationSubmissionRequest createSimpleReservationRequest(
int numContainers, long arrival, long deadline, long duration) {
ReservationId reservationId, int numContainers, long arrival,
long deadline, long duration) {
// create a request with a single atomic ask
ReservationRequest r =
ReservationRequest.newInstance(Resource.newInstance(1024, 1),
@ -1461,7 +1641,7 @@ public class TestYarnClient {
"testYarnClient#reservation");
ReservationSubmissionRequest request =
ReservationSubmissionRequest.newInstance(rDef,
ReservationSystemTestUtil.reservationQ);
ReservationSystemTestUtil.reservationQ, reservationId);
return request;
}

View File

@ -59,6 +59,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewReservationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewReservationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest;
@ -113,6 +115,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetLabelsToNodesReques
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetLabelsToNodesResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetNewApplicationRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetNewApplicationResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetNewReservationRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetNewReservationResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetNodesToLabelsRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetNodesToLabelsResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetQueueInfoRequestPBImpl;
@ -452,6 +456,21 @@ public class ApplicationClientProtocolPBClientImpl implements ApplicationClientP
}
}
@Override
public GetNewReservationResponse getNewReservation(
GetNewReservationRequest request)
throws YarnException, IOException {
YarnServiceProtos.GetNewReservationRequestProto requestProto =
((GetNewReservationRequestPBImpl) request).getProto();
try {
return new GetNewReservationResponsePBImpl(proxy.getNewReservation(null,
requestProto));
} catch (ServiceException e) {
RPCUtil.unwrapAndThrowException(e);
return null;
}
}
@Override
public ReservationSubmissionResponse submitReservation(ReservationSubmissionRequest request)
throws YarnException, IOException {

View File

@ -43,6 +43,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetContainersResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewReservationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoResponse;
@ -84,6 +85,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetLabelsToNodesReques
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetLabelsToNodesResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetNewApplicationRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetNewApplicationResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetNewReservationRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetNewReservationResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetNodesToLabelsRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetNodesToLabelsResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetQueueInfoRequestPBImpl;
@ -136,6 +139,8 @@ import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetLabelsToNodesRequestPro
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetLabelsToNodesResponseProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetNewApplicationRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetNewApplicationResponseProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetNewReservationRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetNewReservationResponseProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetNodesToLabelsRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetNodesToLabelsResponseProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetQueueInfoRequestProto;
@ -448,6 +453,22 @@ public class ApplicationClientProtocolPBServiceImpl implements ApplicationClient
}
}
@Override
public GetNewReservationResponseProto getNewReservation(
RpcController arg0, GetNewReservationRequestProto proto) throws
ServiceException {
GetNewReservationRequestPBImpl request =
new GetNewReservationRequestPBImpl(proto);
try {
GetNewReservationResponse response = real.getNewReservation(request);
return ((GetNewReservationResponsePBImpl)response).getProto();
} catch (YarnException e) {
throw new ServiceException(e);
} catch (IOException e) {
throw new ServiceException(e);
}
}
@Override
public ReservationSubmissionResponseProto submitReservation(RpcController controller,
ReservationSubmissionRequestProto requestProto) throws ServiceException {

View File

@ -0,0 +1,77 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.api.protocolrecords.impl.pb;
import com.google.protobuf.TextFormat;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewReservationRequest;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetNewReservationRequestProto;
/**
* <p>The implementation of the request sent by clients to get a
* new {@code ReservationId} for submitting an reservation.</p>
*
* {@code ApplicationClientProtocol#getNewReservation(GetNewReservationRequest)}
*/
@Private
@Unstable
public class GetNewReservationRequestPBImpl extends GetNewReservationRequest {
private GetNewReservationRequestProto proto =
GetNewReservationRequestProto.getDefaultInstance();
private GetNewReservationRequestProto.Builder builder = null;
private boolean viaProto = false;
public GetNewReservationRequestPBImpl(GetNewReservationRequestProto proto) {
this.proto = proto;
viaProto = true;
}
public GetNewReservationRequestPBImpl() {
builder = GetNewReservationRequestProto.newBuilder();
}
public GetNewReservationRequestProto getProto() {
proto = viaProto ? proto : builder.build();
viaProto = true;
return proto;
}
@Override
public int hashCode() {
return getProto().hashCode();
}
@Override
public boolean equals(Object other) {
if (other == null) {
return false;
}
if (other.getClass().isAssignableFrom(this.getClass())) {
return this.getProto().equals(this.getClass().cast(other).getProto());
}
return false;
}
@Override
public String toString() {
return TextFormat.shortDebugString(getProto());
}
}

View File

@ -0,0 +1,144 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.api.protocolrecords.impl.pb;
import com.google.protobuf.TextFormat;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewReservationResponse;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.impl.pb.ReservationIdPBImpl;
import org.apache.hadoop.yarn.proto.YarnProtos.ReservationIdProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetNewReservationResponseProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetNewReservationResponseProtoOrBuilder;
/**
* <p>The implementation of the response sent by the
* <code>ResourceManager</code> to the client for a request to get a new
* {@link ReservationId} for submitting reservations.</p>
*
* <p>Clients can submit an reservation with the returned
* {@link ReservationId}.</p>
*
* {@code ApplicationClientProtocol#getNewReservation(GetNewReservationRequest)}
*/
@Private
@Unstable
public class GetNewReservationResponsePBImpl extends GetNewReservationResponse {
private GetNewReservationResponseProto proto =
GetNewReservationResponseProto.getDefaultInstance();
private GetNewReservationResponseProto.Builder builder = null;
private boolean viaProto = false;
private ReservationId reservationId = null;
public GetNewReservationResponsePBImpl() {
builder = GetNewReservationResponseProto.newBuilder();
}
public GetNewReservationResponsePBImpl(GetNewReservationResponseProto proto) {
this.proto = proto;
viaProto = true;
}
public GetNewReservationResponseProto getProto() {
mergeLocalToProto();
proto = viaProto ? proto : builder.build();
viaProto = true;
return proto;
}
@Override
public int hashCode() {
return getProto().hashCode();
}
@Override
public boolean equals(Object other) {
if (other == null) {
return false;
}
if (other.getClass().isAssignableFrom(this.getClass())) {
return this.getProto().equals(this.getClass().cast(other).getProto());
}
return false;
}
@Override
public String toString() {
return TextFormat.shortDebugString(getProto());
}
private void mergeLocalToBuilder() {
if (reservationId != null) {
builder.setReservationId(convertToProtoFormat(this.reservationId));
}
}
private void mergeLocalToProto() {
if (viaProto) {
maybeInitBuilder();
}
mergeLocalToBuilder();
proto = builder.build();
viaProto = true;
}
private void maybeInitBuilder() {
if (viaProto || builder == null) {
builder = GetNewReservationResponseProto.newBuilder(proto);
}
viaProto = false;
}
@Override
public ReservationId getReservationId() {
if (this.reservationId != null) {
return this.reservationId;
}
GetNewReservationResponseProtoOrBuilder p = viaProto ? proto : builder;
if (!p.hasReservationId()) {
return null;
}
this.reservationId = convertFromProtoFormat(p.getReservationId());
return this.reservationId;
}
@Override
public void setReservationId(ReservationId reservationId) {
maybeInitBuilder();
if (reservationId == null) {
builder.clearReservationId();
}
this.reservationId = reservationId;
}
private ReservationIdPBImpl convertFromProtoFormat(ReservationIdProto p) {
return new ReservationIdPBImpl(p);
}
private ReservationIdProto convertToProtoFormat(ReservationId t) {
return ((ReservationIdPBImpl)t).getProto();
}
}

View File

@ -20,8 +20,11 @@ package org.apache.hadoop.yarn.api.protocolrecords.impl.pb;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
import org.apache.hadoop.yarn.api.records.ReservationDefinition;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.impl.pb.ReservationDefinitionPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ReservationIdPBImpl;
import org.apache.hadoop.yarn.proto.YarnProtos.ReservationDefinitionProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ReservationIdProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.ReservationSubmissionRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.ReservationSubmissionRequestProtoOrBuilder;
@ -119,6 +122,25 @@ public class ReservationSubmissionRequestPBImpl extends
builder.setQueue(planName);
}
@Override
public ReservationId getReservationId() {
ReservationSubmissionRequestProtoOrBuilder p = viaProto ? proto : builder;
if (!p.hasReservationId()) {
return null;
}
return (convertFromProtoFormat(p.getReservationId()));
}
@Override
public void setReservationId(ReservationId reservationId) {
maybeInitBuilder();
if (reservationId == null) {
builder.clearReservationId();
return;
}
builder.setReservationId(convertToProtoFormat(reservationId));
}
private ReservationDefinitionProto convertToProtoFormat(
ReservationDefinition r) {
return ((ReservationDefinitionPBImpl) r).getProto();
@ -129,6 +151,14 @@ public class ReservationSubmissionRequestPBImpl extends
return new ReservationDefinitionPBImpl(r);
}
private ReservationIdProto convertToProtoFormat(ReservationId r) {
return ((ReservationIdPBImpl) r).getProto();
}
private ReservationIdPBImpl convertFromProtoFormat(ReservationIdProto r) {
return new ReservationIdPBImpl(r);
}
@Override
public int hashCode() {
return getProto().hashCode();

View File

@ -19,11 +19,7 @@
package org.apache.hadoop.yarn.api.protocolrecords.impl.pb;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.impl.pb.ReservationIdPBImpl;
import org.apache.hadoop.yarn.proto.YarnProtos.ReservationIdProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.ReservationSubmissionResponseProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.ReservationSubmissionResponseProtoOrBuilder;
import com.google.protobuf.TextFormat;
@ -35,8 +31,6 @@ public class ReservationSubmissionResponsePBImpl extends
ReservationSubmissionResponseProto.Builder builder = null;
boolean viaProto = false;
private ReservationId reservationId;
public ReservationSubmissionResponsePBImpl() {
builder = ReservationSubmissionResponseProto.newBuilder();
}
@ -48,64 +42,11 @@ public class ReservationSubmissionResponsePBImpl extends
}
public ReservationSubmissionResponseProto getProto() {
mergeLocalToProto();
proto = viaProto ? proto : builder.build();
viaProto = true;
return proto;
}
private void mergeLocalToBuilder() {
if (this.reservationId != null) {
builder.setReservationId(convertToProtoFormat(this.reservationId));
}
}
private void mergeLocalToProto() {
if (viaProto)
maybeInitBuilder();
mergeLocalToBuilder();
proto = builder.build();
viaProto = true;
}
private void maybeInitBuilder() {
if (viaProto || builder == null) {
builder = ReservationSubmissionResponseProto.newBuilder(proto);
}
viaProto = false;
}
@Override
public ReservationId getReservationId() {
ReservationSubmissionResponseProtoOrBuilder p = viaProto ? proto : builder;
if (reservationId != null) {
return reservationId;
}
if (!p.hasReservationId()) {
return null;
}
reservationId = convertFromProtoFormat(p.getReservationId());
return reservationId;
}
@Override
public void setReservationId(ReservationId reservationId) {
maybeInitBuilder();
if (reservationId == null) {
builder.clearReservationId();
return;
}
this.reservationId = reservationId;
}
private ReservationIdPBImpl convertFromProtoFormat(ReservationIdProto p) {
return new ReservationIdPBImpl(p);
}
private ReservationIdProto convertToProtoFormat(ReservationId t) {
return ((ReservationIdPBImpl) t).getProto();
}
@Override
public int hashCode() {
return getProto().hashCode();

View File

@ -61,6 +61,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewReservationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewReservationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest;
@ -425,6 +427,12 @@ public class MockResourceManagerFacade implements
throw new NotImplementedException();
}
@Override
public GetNewReservationResponse getNewReservation(
GetNewReservationRequest request) throws YarnException, IOException {
throw new NotImplementedException();
}
@Override
public ReservationSubmissionResponse submitReservation(
ReservationSubmissionRequest request) throws YarnException,
@ -488,4 +496,4 @@ return null;
FailApplicationAttemptRequest request) throws YarnException, IOException {
throw new NotImplementedException();
}
}
}

View File

@ -79,6 +79,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewReservationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewReservationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest;
@ -1217,6 +1219,19 @@ public class ClientRMService extends AbstractService implements
return this.server;
}
@Override
public GetNewReservationResponse getNewReservation(
GetNewReservationRequest request) throws YarnException, IOException {
checkReservationSytem(AuditConstants.CREATE_NEW_RESERVATION_REQUEST);
GetNewReservationResponse response =
recordFactory.newRecordInstance(GetNewReservationResponse.class);
ReservationId reservationId = reservationSystem.getNewReservationId();
response.setReservationId(reservationId);
// Create a new Reservation Id
return response;
}
@Override
public ReservationSubmissionResponse submitReservation(
ReservationSubmissionRequest request) throws YarnException, IOException {
@ -1224,12 +1239,28 @@ public class ClientRMService extends AbstractService implements
checkReservationSytem(AuditConstants.SUBMIT_RESERVATION_REQUEST);
ReservationSubmissionResponse response =
recordFactory.newRecordInstance(ReservationSubmissionResponse.class);
// Create a new Reservation Id
ReservationId reservationId = reservationSystem.getNewReservationId();
ReservationId reservationId = request.getReservationId();
// Validate the input
Plan plan =
rValidator.validateReservationSubmissionRequest(reservationSystem,
request, reservationId);
ReservationAllocation allocation = plan.getReservationById(reservationId);
if (allocation != null) {
boolean isNewDefinition = !allocation.getReservationDefinition().equals(
request.getReservationDefinition());
if (isNewDefinition) {
String message = "Reservation allocation already exists with the " +
"reservation id " + reservationId.toString() + ", but a different" +
" reservation definition was provided. Please try again with a " +
"new reservation id, or consider updating the reservation instead.";
throw RPCUtil.getRemoteException(message);
} else {
return response;
}
}
// Check ACLs
String queueName = request.getQueue();
String user =
@ -1248,7 +1279,6 @@ public class ClientRMService extends AbstractService implements
refreshScheduler(queueName, request.getReservationDefinition(),
reservationId.toString());
// return the reservation id
response.setReservationId(reservationId);
}
} catch (PlanningException e) {
RMAuditLogger.logFailure(user, AuditConstants.SUBMIT_RESERVATION_REQUEST,

View File

@ -69,6 +69,8 @@ public class RMAuditLogger {
public static final String UNAUTHORIZED_USER = "Unauthorized user";
// For Reservation system
public static final String CREATE_NEW_RESERVATION_REQUEST = "Create " +
"Reservation Request";
public static final String SUBMIT_RESERVATION_REQUEST = "Submit Reservation Request";
public static final String UPDATE_RESERVATION_REQUEST = "Update Reservation Request";
public static final String DELETE_RESERVATION_REQUEST = "Delete Reservation Request";

View File

@ -206,10 +206,17 @@ public class ReservationInputValidator {
ReservationSystem reservationSystem,
ReservationSubmissionRequest request, ReservationId reservationId)
throws YarnException {
String message;
if (reservationId == null) {
message = "Reservation id cannot be null. Please try again " +
"specifying a valid reservation id by creating a new reservation id.";
throw RPCUtil.getRemoteException(message);
}
// Check if it is a managed queue
String queue = request.getQueue();
Plan plan = getPlanFromQueue(reservationSystem, queue,
AuditConstants.SUBMIT_RESERVATION_REQUEST);
validateReservationDefinition(reservationId,
request.getReservationDefinition(), plan,
AuditConstants.SUBMIT_RESERVATION_REQUEST);

View File

@ -78,6 +78,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewReservationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewReservationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesRequest;
@ -149,6 +151,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.LabelsToNodesInf
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.LocalResourceInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.LogAggregationContextInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NewApplication;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NewReservation;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeLabelInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeLabelsInfo;
@ -162,7 +165,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationDelet
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationListInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationRequestInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationRequestsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationSubmissionResponseInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationSubmissionRequestInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationUpdateRequestInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationUpdateResponseInfo;
@ -1869,6 +1871,61 @@ public class RMWebServices extends WebServices {
return token;
}
/**
* Generates a new ReservationId which is then sent to the client.
*
* @param hsr the servlet request
* @return Response containing the app id and the maximum resource
* capabilities
* @throws AuthorizationException if the user is not authorized
* to invoke this method.
* @throws IOException if creation fails.
* @throws InterruptedException if interrupted.
*/
@POST
@Path("/reservation/new-reservation")
@Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
public Response createNewReservation(@Context HttpServletRequest hsr)
throws AuthorizationException, IOException, InterruptedException {
init();
UserGroupInformation callerUGI = getCallerUserGroupInformation(hsr, true);
if (callerUGI == null) {
throw new AuthorizationException("Unable to obtain user name, "
+ "user not authenticated");
}
if (UserGroupInformation.isSecurityEnabled() && isStaticUser(callerUGI)) {
String msg = "The default static user cannot carry out this operation.";
return Response.status(Status.FORBIDDEN).entity(msg).build();
}
NewReservation reservationId = createNewReservation();
return Response.status(Status.OK).entity(reservationId).build();
}
/**
* Function that actually creates the {@link ReservationId} by calling the
* ClientRMService.
*
* @return returns structure containing the {@link ReservationId}
* @throws IOException if creation fails.
*/
private NewReservation createNewReservation() throws IOException {
GetNewReservationRequest req =
recordFactory.newRecordInstance(GetNewReservationRequest.class);
GetNewReservationResponse resp;
try {
resp = rm.getClientRMService().getNewReservation(req);
} catch (YarnException e) {
String msg = "Unable to create new reservation from RM web service";
LOG.error(msg, e);
throw new YarnRuntimeException(msg, e);
}
NewReservation reservationId =
new NewReservation(resp.getReservationId().toString());
return reservationId;
}
/**
* Function to submit a Reservation to the RM.
*
@ -1903,19 +1960,15 @@ public class RMWebServices extends WebServices {
final ReservationSubmissionRequest reservation =
createReservationSubmissionRequest(resContext);
ReservationSubmissionResponseInfo resRespInfo;
try {
resRespInfo =
callerUGI.doAs(
new PrivilegedExceptionAction<ReservationSubmissionResponseInfo>() {
@Override
public ReservationSubmissionResponseInfo run()
throws IOException, YarnException {
ReservationSubmissionResponse tempRes =
rm.getClientRMService().submitReservation(reservation);
return new ReservationSubmissionResponseInfo(tempRes);
}
});
callerUGI
.doAs(new PrivilegedExceptionAction<ReservationSubmissionResponse>() {
@Override
public ReservationSubmissionResponse run() throws IOException,
YarnException {
return rm.getClientRMService().submitReservation(reservation);
}
});
} catch (UndeclaredThrowableException ue) {
if (ue.getCause() instanceof YarnException) {
throw new BadRequestException(ue.getCause().getMessage());
@ -1924,11 +1977,11 @@ public class RMWebServices extends WebServices {
throw ue;
}
return Response.status(Status.OK).entity(resRespInfo).build();
return Response.status(Status.ACCEPTED).build();
}
private ReservationSubmissionRequest createReservationSubmissionRequest(
ReservationSubmissionRequestInfo resContext) {
ReservationSubmissionRequestInfo resContext) throws IOException {
// defending against a couple of common submission format problems
if (resContext == null) {
@ -1972,8 +2025,12 @@ public class RMWebServices extends WebServices {
ReservationDefinition rDef =
ReservationDefinition.newInstance(resInfo.getArrival(),
resInfo.getDeadline(), reqs, resInfo.getReservationName());
ReservationId reservationId = ReservationId.parseReservationId(resContext
.getReservationId());
ReservationSubmissionRequest request =
ReservationSubmissionRequest.newInstance(rDef, resContext.getQueue());
ReservationSubmissionRequest.newInstance(rDef, resContext.getQueue(),
reservationId);
return request;
}
@ -2095,9 +2152,13 @@ public class RMWebServices extends WebServices {
* the ReservationDeleteRequest
* @param hsr the servlet request
* @return Response containing the status code
* @throws AuthorizationException
* @throws IOException
* @throws InterruptedException
* @throws AuthorizationException when the user group information cannot be
* retrieved.
* @throws IOException when a {@link ReservationDeleteRequest} cannot be
* created from the {@link ReservationDeleteRequestInfo}. This
* exception is also thrown on
* {@code ClientRMService.deleteReservation} invokation failure.
* @throws InterruptedException if doAs action throws an InterruptedException.
*/
@POST
@Path("/reservation/delete")

View File

@ -0,0 +1,55 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.webapp.dao;
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlElement;
import javax.xml.bind.annotation.XmlRootElement;
/**
* <p>The response sent by the <code>ResourceManager</code> to the client for
* a request to get a new {@code ReservationId} for submitting reservations
* using the REST API.</p>
*
* <p>Clients can submit a reservation with the returned {@code ReservationId}.
* </p>
*
* {@code RMWebServices#createNewReservation(HttpServletRequest)}
*/
@XmlRootElement(name="new-reservation")
@XmlAccessorType(XmlAccessType.FIELD)
public class NewReservation {
@XmlElement(name="reservation-id")
private String reservationId;
public NewReservation() {
reservationId = "";
}
public NewReservation(String resId) {
reservationId = resId;
}
public String getReservationId() {
return reservationId;
}
}

View File

@ -37,6 +37,9 @@ public class ReservationSubmissionRequestInfo {
@XmlElement(name = "reservation-definition")
private ReservationDefinitionInfo reservationDefinition;
@XmlElement(name = "reservation-id")
private String reservationId;
public ReservationSubmissionRequestInfo() {
}
@ -48,6 +51,14 @@ public class ReservationSubmissionRequestInfo {
this.queue = queue;
}
public String getReservationId() {
return reservationId;
}
public void setReservationId(String reservationId) {
this.reservationId = reservationId;
}
public ReservationDefinitionInfo getReservationDefinition() {
return reservationDefinition;
}

View File

@ -1,54 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.webapp.dao;
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlElement;
import javax.xml.bind.annotation.XmlRootElement;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse;
/**
* Simple class that represent a response to a reservation submission.
*/
@XmlRootElement(name = "reservation-submission-response")
@XmlAccessorType(XmlAccessType.FIELD)
public class ReservationSubmissionResponseInfo {
@XmlElement(name = "reservation-id")
private String reservationId;
public ReservationSubmissionResponseInfo() {
}
public ReservationSubmissionResponseInfo(
ReservationSubmissionResponse response) {
this.reservationId = response.getReservationId().toString();
}
public String getReservationId() {
return reservationId;
}
public void setReservationId(String reservationId) {
this.reservationId = reservationId;
}
}

View File

@ -32,6 +32,8 @@ import java.util.UUID;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewReservationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewReservationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationListRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationListResponse;
@ -242,8 +244,8 @@ public class ReservationACLsTestBase extends ACLsTestBase {
private void verifySubmitReservationSuccess(String submitter, String
queueName) throws Exception {
ReservationId reservationId =
submitReservation(submitter, queueName);
ReservationId reservationId = createReservation(submitter);
submitReservation(submitter, queueName, reservationId);
deleteReservation(submitter, reservationId);
}
@ -251,7 +253,8 @@ public class ReservationACLsTestBase extends ACLsTestBase {
private void verifySubmitReservationFailure(String submitter, String
queueName) throws Exception {
try {
submitReservation(submitter, queueName);
ReservationId reservationId = createReservation(submitter);
submitReservation(submitter, queueName, reservationId);
Assert.fail("Submit reservation by the enemy should fail!");
} catch (YarnException e) {
handleAdministerException(e, submitter, queueName, ReservationACL
@ -261,8 +264,8 @@ public class ReservationACLsTestBase extends ACLsTestBase {
private void verifyListReservationSuccess(String lister, String
originalSubmitter, String queueName) throws Exception {
ReservationId reservationId =
submitReservation(originalSubmitter, queueName);
ReservationId reservationId = createReservation(originalSubmitter);
submitReservation(originalSubmitter, queueName, reservationId);
ReservationListResponse adminResponse = listReservation(lister, queueName);
@ -275,8 +278,8 @@ public class ReservationACLsTestBase extends ACLsTestBase {
private void verifyListReservationFailure(String lister,
String originalSubmitter, String queueName) throws Exception {
ReservationId reservationId =
submitReservation(originalSubmitter, queueName);
ReservationId reservationId = createReservation(originalSubmitter);
submitReservation(originalSubmitter, queueName, reservationId);
try {
listReservation(lister, queueName);
@ -291,8 +294,8 @@ public class ReservationACLsTestBase extends ACLsTestBase {
private void verifyListReservationByIdSuccess(String lister, String
originalSubmitter, String queueName) throws Exception {
ReservationId reservationId =
submitReservation(originalSubmitter, queueName);
ReservationId reservationId = createReservation(originalSubmitter);
submitReservation(originalSubmitter, queueName, reservationId);
ReservationListResponse adminResponse = listReservationById(lister,
reservationId, queueName);
@ -306,8 +309,8 @@ public class ReservationACLsTestBase extends ACLsTestBase {
private void verifyListReservationByIdFailure(String lister,
String originalSubmitter, String queueName) throws Exception {
ReservationId reservationId =
submitReservation(originalSubmitter, queueName);
ReservationId reservationId = createReservation(originalSubmitter);
submitReservation(originalSubmitter, queueName, reservationId);
try {
listReservationById(lister, reservationId, queueName);
Assert.fail("List reservation by the enemy should fail!");
@ -321,8 +324,8 @@ public class ReservationACLsTestBase extends ACLsTestBase {
private void verifyDeleteReservationSuccess(String killer,
String originalSubmitter, String queueName) throws Exception {
ReservationId reservationId =
submitReservation(originalSubmitter, queueName);
ReservationId reservationId = createReservation(originalSubmitter);
submitReservation(originalSubmitter, queueName, reservationId);
deleteReservation(killer, reservationId);
}
@ -330,8 +333,8 @@ public class ReservationACLsTestBase extends ACLsTestBase {
private void verifyDeleteReservationFailure(String killer,
String originalSubmitter, String queueName) throws Exception {
ReservationId reservationId =
submitReservation(originalSubmitter, queueName);
ReservationId reservationId = createReservation(originalSubmitter);
submitReservation(originalSubmitter, queueName, reservationId);
try {
deleteReservation(killer, reservationId);
@ -346,8 +349,8 @@ public class ReservationACLsTestBase extends ACLsTestBase {
private void verifyUpdateReservationSuccess(String updater,
String originalSubmitter, String queueName) throws Exception {
ReservationId reservationId =
submitReservation(originalSubmitter, queueName);
ReservationId reservationId = createReservation(originalSubmitter);
submitReservation(originalSubmitter, queueName, reservationId);
final ReservationUpdateRequest updateRequest =
ReservationUpdateRequest.newInstance(
@ -362,8 +365,8 @@ public class ReservationACLsTestBase extends ACLsTestBase {
private void verifyUpdateReservationFailure(String updater,
String originalSubmitter, String queueName) throws Exception {
ReservationId reservationId =
submitReservation(originalSubmitter, queueName);
ReservationId reservationId = createReservation(originalSubmitter);
submitReservation(originalSubmitter, queueName, reservationId);
final ReservationUpdateRequest updateRequest =
ReservationUpdateRequest.newInstance(
@ -422,17 +425,27 @@ public class ReservationACLsTestBase extends ACLsTestBase {
deleteClient.deleteReservation(deleteRequest);
}
private ReservationId submitReservation(String submitter,
String queueName) throws Exception {
private ReservationId createReservation(String creator) throws Exception {
ApplicationClientProtocol creatorClient = getRMClientForUser(creator);
GetNewReservationRequest getNewReservationRequest =
GetNewReservationRequest.newInstance();
GetNewReservationResponse response = creatorClient
.getNewReservation(getNewReservationRequest);
return response.getReservationId();
}
private void submitReservation(String submitter,
String queueName, ReservationId reservationId) throws Exception {
ApplicationClientProtocol submitterClient = getRMClientForUser(submitter);
ReservationSubmissionRequest reservationSubmissionRequest =
ReservationSubmissionRequest.newInstance(
makeSimpleReservationDefinition(), queueName);
ReservationSubmissionRequest.newInstance(
makeSimpleReservationDefinition(), queueName, reservationId);
ReservationSubmissionResponse response = submitterClient
.submitReservation(reservationSubmissionRequest);
return response.getReservationId();
}
private void handleAdministerException(Exception e, String user, String

View File

@ -68,6 +68,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetContainersRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainersResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewReservationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest;
@ -1091,9 +1092,7 @@ public class TestClientRMService {
return yarnScheduler;
}
@Test
public void testReservationAPIs() {
// initialize
private ResourceManager setupResourceManager() {
CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
ReservationSystemTestUtil.setupQueueConfiguration(conf);
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
@ -1101,41 +1100,95 @@ public class TestClientRMService {
conf.setBoolean(YarnConfiguration.RM_RESERVATION_SYSTEM_ENABLE, true);
MockRM rm = new MockRM(conf);
rm.start();
MockNM nm;
try {
nm = rm.registerNode("127.0.0.1:1", 102400, 100);
rm.registerNode("127.0.0.1:1", 102400, 100);
// allow plan follower to synchronize
Thread.sleep(1050);
} catch (Exception e) {
Assert.fail(e.getMessage());
}
return rm;
}
// Create a client.
ClientRMService clientService = rm.getClientRMService();
// create a reservation
Clock clock = new UTCClock();
long arrival = clock.getTime();
long duration = 60000;
long deadline = (long) (arrival + 1.05 * duration);
ReservationSubmissionRequest sRequest =
ReservationSystemTestUtil.createSimpleReservationRequest(4, arrival,
deadline, duration);
private ReservationSubmissionRequest submitReservationTestHelper(
ClientRMService clientService, long arrival, long deadline,
long duration) {
ReservationSubmissionResponse sResponse = null;
GetNewReservationRequest newReservationRequest =
GetNewReservationRequest.newInstance();
ReservationId reservationID = null;
try {
reservationID = clientService.getNewReservation(newReservationRequest)
.getReservationId();
} catch (Exception e) {
Assert.fail(e.getMessage());
}
ReservationSubmissionRequest sRequest =
ReservationSystemTestUtil.createSimpleReservationRequest(reservationID,
4, arrival, deadline, duration);
try {
sResponse = clientService.submitReservation(sRequest);
} catch (Exception e) {
Assert.fail(e.getMessage());
}
Assert.assertNotNull(sResponse);
ReservationId reservationID = sResponse.getReservationId();
Assert.assertNotNull(reservationID);
LOG.info("Submit reservation response: " + reservationID);
System.out.println("Submit reservation response: " + reservationID);
return sRequest;
}
@Test
public void testCreateReservation() {
ResourceManager rm = setupResourceManager();
ClientRMService clientService = rm.getClientRMService();
Clock clock = new UTCClock();
long arrival = clock.getTime();
long duration = 60000;
long deadline = (long) (arrival + 1.05 * duration);
ReservationSubmissionRequest sRequest =
submitReservationTestHelper(clientService, arrival, deadline, duration);
// Submit the reservation again with the same request and make sure it
// passes.
try {
clientService.submitReservation(sRequest);
} catch (Exception e) {
Assert.fail(e.getMessage());
}
// Submit the reservation with the same reservation id but different
// reservation definition, and ensure YarnException is thrown.
arrival = clock.getTime();
ReservationDefinition rDef = sRequest.getReservationDefinition();
rDef.setArrival(arrival + duration);
sRequest.setReservationDefinition(rDef);
try {
clientService.submitReservation(sRequest);
Assert.fail("Reservation submission should fail if a duplicate "
+ "reservation id is used, but the reservation definition has been "
+ "updated.");
} catch (Exception e) {
Assert.assertTrue(e instanceof YarnException);
}
rm.stop();
}
@Test
public void testUpdateReservation() {
ResourceManager rm = setupResourceManager();
ClientRMService clientService = rm.getClientRMService();
Clock clock = new UTCClock();
long arrival = clock.getTime();
long duration = 60000;
long deadline = (long) (arrival + 1.05 * duration);
ReservationSubmissionRequest sRequest =
submitReservationTestHelper(clientService, arrival, deadline, duration);
// Update the reservation
ReservationDefinition rDef = sRequest.getReservationDefinition();
ReservationRequest rr =
rDef.getReservationRequests().getReservationResources().get(0);
ReservationId reservationID = sRequest.getReservationId();
rr.setNumContainers(5);
arrival = clock.getTime();
duration = 30000;
@ -1151,14 +1204,61 @@ public class TestClientRMService {
} catch (Exception e) {
Assert.fail(e.getMessage());
}
Assert.assertNotNull(sResponse);
LOG.info("Update reservation response: " + uResponse);
Assert.assertNotNull(uResponse);
System.out.println("Update reservation response: " + uResponse);
// List reservations, search by reservation ID
ReservationListRequest request =
ReservationListRequest.newInstance(
ReservationSystemTestUtil.reservationQ,
reservationID.toString(), -1, -1, false);
rm.stop();
}
@Test
public void testListReservationsByReservationId() {
ResourceManager rm = setupResourceManager();
ClientRMService clientService = rm.getClientRMService();
Clock clock = new UTCClock();
long arrival = clock.getTime();
long duration = 60000;
long deadline = (long) (arrival + 1.05 * duration);
ReservationSubmissionRequest sRequest =
submitReservationTestHelper(clientService, arrival, deadline, duration);
ReservationId reservationID = sRequest.getReservationId();
ReservationListResponse response = null;
ReservationListRequest request = ReservationListRequest.newInstance(
ReservationSystemTestUtil.reservationQ, reservationID.toString(), -1,
-1, false);
try {
response = clientService.listReservations(request);
} catch (Exception e) {
Assert.fail(e.getMessage());
}
Assert.assertNotNull(response);
Assert.assertEquals(1, response.getReservationAllocationState().size());
Assert.assertEquals(response.getReservationAllocationState().get(0)
.getReservationId().getId(), reservationID.getId());
Assert.assertEquals(response.getReservationAllocationState().get(0)
.getResourceAllocationRequests().size(), 0);
rm.stop();
}
@Test
public void testListReservationsByTimeInterval() {
ResourceManager rm = setupResourceManager();
ClientRMService clientService = rm.getClientRMService();
Clock clock = new UTCClock();
long arrival = clock.getTime();
long duration = 60000;
long deadline = (long) (arrival + 1.05 * duration);
ReservationSubmissionRequest sRequest =
submitReservationTestHelper(clientService, arrival, deadline, duration);
// List reservations, search by a point in time within the reservation
// range.
arrival = clock.getTime();
ReservationId reservationID = sRequest.getReservationId();
ReservationListRequest request = ReservationListRequest.newInstance(
ReservationSystemTestUtil.reservationQ, "", arrival + duration / 2,
arrival + duration / 2, true);
ReservationListResponse response = null;
try {
@ -1167,16 +1267,12 @@ public class TestClientRMService {
Assert.fail(e.getMessage());
}
Assert.assertNotNull(response);
Assert.assertEquals(response.getReservationAllocationState().size(), 1);
Assert.assertEquals(1, response.getReservationAllocationState().size());
Assert.assertEquals(response.getReservationAllocationState().get(0)
.getReservationId().getId(), reservationID.getId());
Assert.assertEquals(response.getReservationAllocationState().get(0)
.getResourceAllocationRequests().size(), 0);
.getReservationId().getId(), reservationID.getId());
// List reservations, search by time within reservation interval.
request = ReservationListRequest.newInstance(
ReservationSystemTestUtil.reservationQ, "", 1, Long.MAX_VALUE,
true);
ReservationSystemTestUtil.reservationQ, "", 1, Long.MAX_VALUE, true);
response = null;
try {
@ -1187,73 +1283,52 @@ public class TestClientRMService {
Assert.assertNotNull(response);
Assert.assertEquals(1, response.getReservationAllocationState().size());
Assert.assertEquals(response.getReservationAllocationState().get(0)
.getReservationId().getId(), reservationID.getId());
// List reservations, search by invalid end time == -1.
request = ReservationListRequest.newInstance(
ReservationSystemTestUtil.reservationQ, "", 1, -1,
true);
response = null;
try {
response = clientService.listReservations(request);
} catch (Exception e) {
Assert.fail(e.getMessage());
}
Assert.assertNotNull(response);
Assert.assertEquals(1, response.getReservationAllocationState().size());
Assert.assertEquals(response.getReservationAllocationState().get(0)
.getReservationId().getId(), reservationID.getId());
// List reservations, search by invalid end time < -1.
request = ReservationListRequest.newInstance(
ReservationSystemTestUtil.reservationQ, "", 1, -10,
true);
response = null;
try {
response = clientService.listReservations(request);
} catch (Exception e) {
Assert.fail(e.getMessage());
}
Assert.assertNotNull(response);
Assert.assertEquals(1, response.getReservationAllocationState().size());
Assert.assertEquals(response.getReservationAllocationState().get(0)
.getReservationId().getId(), reservationID.getId());
// List reservations, search by time interval.
request = ReservationListRequest.newInstance(
ReservationSystemTestUtil.reservationQ, "", arrival +
duration/2, arrival + duration/2, true);
response = null;
try {
response = clientService.listReservations(request);
} catch (Exception e) {
Assert.fail(e.getMessage());
}
Assert.assertNotNull(response);
Assert.assertEquals(1, response.getReservationAllocationState().size());
Assert.assertEquals(response.getReservationAllocationState().get(0)
.getReservationId().getId(), reservationID.getId());
.getReservationId().getId(), reservationID.getId());
// Verify that the full resource allocations exist.
Assert.assertTrue(response.getReservationAllocationState().get(0)
.getResourceAllocationRequests().size() > 0);
.getResourceAllocationRequests().size() > 0);
// Verify that the full RDL is returned.
ReservationRequests reservationRequests = response
.getReservationAllocationState().get(0).getReservationDefinition()
.getReservationRequests();
Assert.assertTrue(reservationRequests.getInterpreter().toString()
.equals("R_ALL"));
ReservationRequests reservationRequests =
response.getReservationAllocationState().get(0)
.getReservationDefinition().getReservationRequests();
Assert.assertTrue(
reservationRequests.getInterpreter().toString().equals("R_ALL"));
Assert.assertTrue(reservationRequests.getReservationResources().get(0)
.getDuration() == duration);
.getDuration() == duration);
// List reservations, search by a very large start time.
request = ReservationListRequest.newInstance(
ReservationSystemTestUtil.reservationQ, "", Long.MAX_VALUE,
-1, false);
rm.stop();
}
@Test
public void testListReservationsByInvalidTimeInterval() {
ResourceManager rm = setupResourceManager();
ClientRMService clientService = rm.getClientRMService();
Clock clock = new UTCClock();
long arrival = clock.getTime();
long duration = 60000;
long deadline = (long) (arrival + 1.05 * duration);
ReservationSubmissionRequest sRequest =
submitReservationTestHelper(clientService, arrival, deadline, duration);
// List reservations, search by invalid end time == -1.
ReservationListRequest request = ReservationListRequest
.newInstance(ReservationSystemTestUtil.reservationQ, "", 1, -1, true);
ReservationListResponse response = null;
try {
response = clientService.listReservations(request);
} catch (Exception e) {
Assert.fail(e.getMessage());
}
Assert.assertNotNull(response);
Assert.assertEquals(1, response.getReservationAllocationState().size());
Assert.assertEquals(response.getReservationAllocationState().get(0)
.getReservationId().getId(), sRequest.getReservationId().getId());
// List reservations, search by invalid end time < -1.
request = ReservationListRequest
.newInstance(ReservationSystemTestUtil.reservationQ, "", 1, -10, true);
response = null;
try {
@ -1261,16 +1336,48 @@ public class TestClientRMService {
} catch (Exception e) {
Assert.fail(e.getMessage());
}
Assert.assertNotNull(response);
Assert.assertEquals(1, response.getReservationAllocationState().size());
Assert.assertEquals(response.getReservationAllocationState().get(0)
.getReservationId().getId(), sRequest.getReservationId().getId());
rm.stop();
}
@Test
public void testListReservationsByTimeIntervalContainingNoReservations() {
ResourceManager rm = setupResourceManager();
ClientRMService clientService = rm.getClientRMService();
Clock clock = new UTCClock();
long arrival = clock.getTime();
long duration = 60000;
long deadline = (long) (arrival + 1.05 * duration);
ReservationSubmissionRequest sRequest =
submitReservationTestHelper(clientService, arrival, deadline, duration);
// List reservations, search by very large start time.
ReservationListRequest request = ReservationListRequest.newInstance(
ReservationSystemTestUtil.reservationQ, "", Long.MAX_VALUE, -1, false);
ReservationListResponse response = null;
try {
response = clientService.listReservations(request);
} catch (Exception e) {
Assert.fail(e.getMessage());
}
// Ensure all reservations are filtered out.
Assert.assertNotNull(response);
Assert.assertEquals(0, response.getReservationAllocationState().size());
Assert.assertEquals(response.getReservationAllocationState().size(), 0);
duration = 30000;
deadline = sRequest.getReservationDefinition().getDeadline();
// List reservations, search by start time after the reservation
// end time.
request = ReservationListRequest.newInstance(
ReservationSystemTestUtil.reservationQ, "", deadline + duration,
deadline + 2 * duration, false);
ReservationSystemTestUtil.reservationQ, "", deadline + duration,
deadline + 2 * duration, false);
response = null;
try {
@ -1283,11 +1390,12 @@ public class TestClientRMService {
Assert.assertNotNull(response);
Assert.assertEquals(response.getReservationAllocationState().size(), 0);
// List reservations, search by end time before the reservation start
arrival = clock.getTime();
// List reservations, search by end time before the reservation start
// time.
request = ReservationListRequest.newInstance(
ReservationSystemTestUtil.reservationQ, "", 0, arrival -
duration, false);
ReservationSystemTestUtil.reservationQ, "", 0, arrival - duration,
false);
response = null;
try {
@ -1300,10 +1408,9 @@ public class TestClientRMService {
Assert.assertNotNull(response);
Assert.assertEquals(response.getReservationAllocationState().size(), 0);
// List reservations, search by a very small end time.
request = ReservationListRequest.newInstance(
ReservationSystemTestUtil.reservationQ, "", 0, 1,
false);
// List reservations, search by very small end time.
request = ReservationListRequest
.newInstance(ReservationSystemTestUtil.reservationQ, "", 0, 1, false);
response = null;
try {
@ -1316,6 +1423,21 @@ public class TestClientRMService {
Assert.assertNotNull(response);
Assert.assertEquals(response.getReservationAllocationState().size(), 0);
rm.stop();
}
@Test
public void testReservationDelete() {
ResourceManager rm = setupResourceManager();
ClientRMService clientService = rm.getClientRMService();
Clock clock = new UTCClock();
long arrival = clock.getTime();
long duration = 60000;
long deadline = (long) (arrival + 1.05 * duration);
ReservationSubmissionRequest sRequest =
submitReservationTestHelper(clientService, arrival, deadline, duration);
ReservationId reservationID = sRequest.getReservationId();
// Delete the reservation
ReservationDeleteRequest dRequest =
ReservationDeleteRequest.newInstance(reservationID);
@ -1325,15 +1447,15 @@ public class TestClientRMService {
} catch (Exception e) {
Assert.fail(e.getMessage());
}
Assert.assertNotNull(sResponse);
LOG.info("Delete reservation response: " + dResponse);
Assert.assertNotNull(dResponse);
System.out.println("Delete reservation response: " + dResponse);
// List reservations, search by non-existent reservationID
request = ReservationListRequest.newInstance(
ReservationSystemTestUtil.reservationQ, reservationID.toString(),
-1, -1, false);
ReservationListRequest request = ReservationListRequest.newInstance(
ReservationSystemTestUtil.reservationQ, reservationID.toString(), -1,
-1, false);
response = null;
ReservationListResponse response = null;
try {
response = clientService.listReservations(request);
} catch (Exception e) {
@ -1342,10 +1464,7 @@ public class TestClientRMService {
Assert.assertNotNull(response);
Assert.assertEquals(0, response.getReservationAllocationState().size());
// clean-up
rm.stop();
nm = null;
rm = null;
}
@Test

View File

@ -17,6 +17,8 @@
*/
package org.apache.hadoop.yarn.server.resourcemanager;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewReservationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewReservationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
@ -65,8 +67,11 @@ public class TestReservationSystemWithRMHA extends RMHATestBase {
ClientRMService clientService = rm1.getClientRMService();
ReservationId reservationID = getNewReservation(clientService)
.getReservationId();
// create a reservation
ReservationSubmissionRequest request = createReservationSubmissionRequest();
ReservationSubmissionRequest request = createReservationSubmissionRequest(
reservationID);
ReservationSubmissionResponse response = null;
try {
response = clientService.submitReservation(request);
@ -74,7 +79,6 @@ public class TestReservationSystemWithRMHA extends RMHATestBase {
Assert.fail(e.getMessage());
}
Assert.assertNotNull(response);
ReservationId reservationID = response.getReservationId();
Assert.assertNotNull(reservationID);
LOG.info("Submit reservation response: " + reservationID);
@ -98,8 +102,12 @@ public class TestReservationSystemWithRMHA extends RMHATestBase {
ClientRMService clientService = rm1.getClientRMService();
ReservationId reservationID = getNewReservation(clientService)
.getReservationId();
// create a reservation
ReservationSubmissionRequest request = createReservationSubmissionRequest();
ReservationSubmissionRequest request = createReservationSubmissionRequest(
reservationID);
ReservationSubmissionResponse response = null;
try {
response = clientService.submitReservation(request);
@ -107,7 +115,6 @@ public class TestReservationSystemWithRMHA extends RMHATestBase {
Assert.fail(e.getMessage());
}
Assert.assertNotNull(response);
ReservationId reservationID = response.getReservationId();
Assert.assertNotNull(reservationID);
LOG.info("Submit reservation response: " + reservationID);
ReservationDefinition reservationDefinition =
@ -144,8 +151,12 @@ public class TestReservationSystemWithRMHA extends RMHATestBase {
ClientRMService clientService = rm1.getClientRMService();
ReservationId reservationID = getNewReservation(clientService)
.getReservationId();
// create a reservation
ReservationSubmissionRequest request = createReservationSubmissionRequest();
ReservationSubmissionRequest request = createReservationSubmissionRequest(
reservationID);
ReservationSubmissionResponse response = null;
try {
response = clientService.submitReservation(request);
@ -153,7 +164,6 @@ public class TestReservationSystemWithRMHA extends RMHATestBase {
Assert.fail(e.getMessage());
}
Assert.assertNotNull(response);
ReservationId reservationID = response.getReservationId();
Assert.assertNotNull(reservationID);
// Delete the reservation
@ -199,13 +209,14 @@ public class TestReservationSystemWithRMHA extends RMHATestBase {
}
}
private ReservationSubmissionRequest createReservationSubmissionRequest() {
private ReservationSubmissionRequest createReservationSubmissionRequest(
ReservationId reservationId) {
Clock clock = new UTCClock();
long arrival = clock.getTime();
long duration = 60000;
long deadline = (long) (arrival + duration + 1500);
return ReservationSystemTestUtil.createSimpleReservationRequest(4, arrival,
deadline, duration);
return ReservationSystemTestUtil.createSimpleReservationRequest(
reservationId, 4, arrival, deadline, duration);
}
private void validateReservation(Plan plan, ReservationId resId,
@ -224,8 +235,12 @@ public class TestReservationSystemWithRMHA extends RMHATestBase {
ClientRMService clientService = rm1.getClientRMService();
ReservationId reservationID = getNewReservation(clientService)
.getReservationId();
// create a reservation
ReservationSubmissionRequest request = createReservationSubmissionRequest();
ReservationSubmissionRequest request = createReservationSubmissionRequest(
reservationID);
ReservationSubmissionResponse response = null;
try {
response = clientService.submitReservation(request);
@ -233,7 +248,6 @@ public class TestReservationSystemWithRMHA extends RMHATestBase {
Assert.fail(e.getMessage());
}
Assert.assertNotNull(response);
ReservationId reservationID = response.getReservationId();
Assert.assertNotNull(reservationID);
LOG.info("Submit reservation response: " + reservationID);
ReservationDefinition reservationDefinition =
@ -273,10 +287,14 @@ public class TestReservationSystemWithRMHA extends RMHATestBase {
explicitFailover();
addNodeCapacityToPlan(rm2, 102400, 100);
ClientRMService clientService = rm2.getClientRMService();
ReservationId reservationID = getNewReservation(clientService)
.getReservationId();
// create a reservation
ClientRMService clientService = rm2.getClientRMService();
ReservationSubmissionRequest request = createReservationSubmissionRequest();
ReservationSubmissionRequest request = createReservationSubmissionRequest(
reservationID);
ReservationSubmissionResponse response = null;
try {
response = clientService.submitReservation(request);
@ -284,7 +302,6 @@ public class TestReservationSystemWithRMHA extends RMHATestBase {
Assert.fail(e.getMessage());
}
Assert.assertNotNull(response);
ReservationId reservationID = response.getReservationId();
Assert.assertNotNull(reservationID);
LOG.info("Submit reservation response: " + reservationID);
ReservationDefinition reservationDefinition =
@ -304,8 +321,12 @@ public class TestReservationSystemWithRMHA extends RMHATestBase {
ClientRMService clientService = rm1.getClientRMService();
ReservationId reservationID = getNewReservation(clientService)
.getReservationId();
// create a reservation
ReservationSubmissionRequest request = createReservationSubmissionRequest();
ReservationSubmissionRequest request = createReservationSubmissionRequest(
reservationID);
ReservationSubmissionResponse response = null;
try {
response = clientService.submitReservation(request);
@ -313,7 +334,6 @@ public class TestReservationSystemWithRMHA extends RMHATestBase {
Assert.fail(e.getMessage());
}
Assert.assertNotNull(response);
ReservationId reservationID = response.getReservationId();
Assert.assertNotNull(reservationID);
LOG.info("Submit reservation response: " + reservationID);
ReservationDefinition reservationDefinition =
@ -353,8 +373,12 @@ public class TestReservationSystemWithRMHA extends RMHATestBase {
ClientRMService clientService = rm1.getClientRMService();
ReservationId reservationID = getNewReservation(clientService)
.getReservationId();
// create a reservation
ReservationSubmissionRequest request = createReservationSubmissionRequest();
ReservationSubmissionRequest request = createReservationSubmissionRequest(
reservationID);
ReservationSubmissionResponse response = null;
try {
response = clientService.submitReservation(request);
@ -362,7 +386,6 @@ public class TestReservationSystemWithRMHA extends RMHATestBase {
Assert.fail(e.getMessage());
}
Assert.assertNotNull(response);
ReservationId reservationID = response.getReservationId();
Assert.assertNotNull(reservationID);
LOG.info("Submit reservation response: " + reservationID);
ReservationDefinition reservationDefinition =
@ -419,8 +442,12 @@ public class TestReservationSystemWithRMHA extends RMHATestBase {
ClientRMService clientService = rm1.getClientRMService();
// create 3 reservations
ReservationSubmissionRequest request = createReservationSubmissionRequest();
ReservationId resID1 = getNewReservation(clientService)
.getReservationId();
// create a reservation
ReservationSubmissionRequest request = createReservationSubmissionRequest(
resID1);
ReservationDefinition reservationDefinition =
request.getReservationDefinition();
ReservationSubmissionResponse response = null;
@ -430,25 +457,30 @@ public class TestReservationSystemWithRMHA extends RMHATestBase {
Assert.fail(e.getMessage());
}
Assert.assertNotNull(response);
ReservationId resID1 = response.getReservationId();
Assert.assertNotNull(resID1);
LOG.info("Submit reservation response: " + resID1);
ReservationId resID2 = getNewReservation(clientService)
.getReservationId();
request.setReservationId(resID2);
try {
response = clientService.submitReservation(request);
} catch (Exception e) {
Assert.fail(e.getMessage());
}
Assert.assertNotNull(response);
ReservationId resID2 = response.getReservationId();
Assert.assertNotNull(resID2);
LOG.info("Submit reservation response: " + resID2);
ReservationId resID3 = getNewReservation(clientService)
.getReservationId();
request.setReservationId(resID3);
try {
response = clientService.submitReservation(request);
} catch (Exception e) {
Assert.fail(e.getMessage());
}
Assert.assertNotNull(response);
ReservationId resID3 = response.getReservationId();
Assert.assertNotNull(resID3);
LOG.info("Submit reservation response: " + resID3);
@ -515,4 +547,18 @@ public class TestReservationSystemWithRMHA extends RMHATestBase {
}
}
private GetNewReservationResponse getNewReservation(ClientRMService
clientRMService) {
GetNewReservationRequest newReservationRequest = GetNewReservationRequest
.newInstance();
GetNewReservationResponse getNewReservationResponse = null;
try {
getNewReservationResponse = clientRMService.getNewReservation(
newReservationRequest);
} catch (Exception e) {
Assert.fail(e.getMessage());
}
return getNewReservationResponse;
}
}

View File

@ -197,7 +197,8 @@ public class ReservationSystemTestUtil {
}
public static ReservationSubmissionRequest createSimpleReservationRequest(
int numContainers, long arrival, long deadline, long duration) {
ReservationId reservationId, int numContainers, long arrival,
long deadline, long duration) {
// create a request with a single atomic ask
ReservationRequest r =
ReservationRequest.newInstance(Resource.newInstance(1024, 1),
@ -210,7 +211,7 @@ public class ReservationSystemTestUtil {
"testClientRMService#reservation");
ReservationSubmissionRequest request =
ReservationSubmissionRequest.newInstance(rDef,
reservationQ);
reservationQ, reservationId);
return request;
}

View File

@ -19,8 +19,6 @@
package org.apache.hadoop.yarn.server.resourcemanager.webapp;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.io.File;
@ -102,6 +100,8 @@ public class TestRMWebServicesReservation extends JerseyTestBase {
// This is what is used in the test resource files.
private static final String DEFAULT_QUEUE = "dedicated";
private static final String LIST_RESERVATION_PATH = "reservation/list";
private static final String GET_NEW_RESERVATION_PATH =
"reservation/new-reservation";
public static class GuiceServletConfig extends GuiceServletContextListener {
@ -330,24 +330,88 @@ public class TestRMWebServicesReservation extends JerseyTestBase {
public void testSubmitReservation() throws Exception {
rm.start();
setupCluster(100);
ReservationId rid =
testSubmissionReservationHelper("reservation/submit",
MediaType.APPLICATION_JSON);
ReservationId rid = getReservationIdTestHelper(1);
ClientResponse response = reservationSubmissionTestHelper(
"reservation/submit", MediaType.APPLICATION_JSON, rid);
if (this.isAuthenticationEnabled()) {
assertNotNull(rid);
assertTrue(isHttpSuccessResponse(response));
verifyReservationCount(1);
}
rm.stop();
}
@Test
public void testSubmitDuplicateReservation() throws Exception {
rm.start();
setupCluster(100);
ReservationId rid = getReservationIdTestHelper(1);
long currentTimestamp = clock.getTime() + MINIMUM_RESOURCE_DURATION;
ClientResponse response = reservationSubmissionTestHelper(
"reservation/submit", MediaType.APPLICATION_JSON, currentTimestamp, "",
rid);
// Make sure that the first submission is successful
if (this.isAuthenticationEnabled()) {
assertTrue(isHttpSuccessResponse(response));
}
response = reservationSubmissionTestHelper(
"reservation/submit", MediaType.APPLICATION_JSON, currentTimestamp, "",
rid);
// Make sure that the second submission is successful
if (this.isAuthenticationEnabled()) {
assertTrue(isHttpSuccessResponse(response));
verifyReservationCount(1);
}
rm.stop();
}
@Test
public void testSubmitDifferentReservationWithSameId() throws Exception {
rm.start();
setupCluster(100);
ReservationId rid = getReservationIdTestHelper(1);
long currentTimestamp = clock.getTime() + MINIMUM_RESOURCE_DURATION;
ClientResponse response = reservationSubmissionTestHelper(
"reservation/submit", MediaType.APPLICATION_JSON, currentTimestamp,
"res1", rid);
// Make sure that the first submission is successful
if (this.isAuthenticationEnabled()) {
assertTrue(isHttpSuccessResponse(response));
}
// Change the reservation definition.
response = reservationSubmissionTestHelper(
"reservation/submit", MediaType.APPLICATION_JSON,
currentTimestamp + MINIMUM_RESOURCE_DURATION, "res1", rid);
// Make sure that the second submission is unsuccessful
if (this.isAuthenticationEnabled()) {
assertTrue(!isHttpSuccessResponse(response));
verifyReservationCount(1);
}
rm.stop();
}
@Test
public void testFailedSubmitReservation() throws Exception {
rm.start();
// setup a cluster too small to accept the reservation
setupCluster(1);
ReservationId rid =
testSubmissionReservationHelper("reservation/submit",
MediaType.APPLICATION_JSON);
assertNull(rid);
ReservationId rid = getReservationIdTestHelper(1);
ClientResponse response = reservationSubmissionTestHelper(
"reservation/submit", MediaType.APPLICATION_JSON, rid);
assertTrue(!isHttpSuccessResponse(response));
rm.stop();
}
@ -355,13 +419,14 @@ public class TestRMWebServicesReservation extends JerseyTestBase {
public void testUpdateReservation() throws JSONException, Exception {
rm.start();
setupCluster(100);
ReservationId rid =
testSubmissionReservationHelper("reservation/submit",
MediaType.APPLICATION_JSON);
ReservationId rid = getReservationIdTestHelper(1);
ClientResponse response = reservationSubmissionTestHelper(
"reservation/submit", MediaType.APPLICATION_JSON, rid);
if (this.isAuthenticationEnabled()) {
assertNotNull(rid);
assertTrue(isHttpSuccessResponse(response));
}
testUpdateReservationHelper("reservation/update", rid,
updateReservationTestHelper("reservation/update", rid,
MediaType.APPLICATION_JSON);
rm.stop();
@ -373,11 +438,15 @@ public class TestRMWebServicesReservation extends JerseyTestBase {
setupCluster(100);
long time = clock.getTime() + MINIMUM_RESOURCE_DURATION;
testSubmissionReservationHelper("reservation/submit",
MediaType.APPLICATION_JSON, time, "res_1", 1);
testSubmissionReservationHelper("reservation/submit",
ReservationId id1 = getReservationIdTestHelper(1);
ReservationId id2 = getReservationIdTestHelper(2);
reservationSubmissionTestHelper("reservation/submit",
MediaType.APPLICATION_JSON, time, "res_1", id1);
reservationSubmissionTestHelper("reservation/submit",
MediaType.APPLICATION_JSON, time + MINIMUM_RESOURCE_DURATION,
"res_2", 2);
"res_2", id2);
WebResource resource = constructWebResource(LIST_RESERVATION_PATH)
.queryParam("start-time", Long.toString((long) (time * 0.9)))
@ -410,11 +479,19 @@ public class TestRMWebServicesReservation extends JerseyTestBase {
long time = clock.getTime() + MINIMUM_RESOURCE_DURATION;
testSubmissionReservationHelper("reservation/submit",
MediaType.APPLICATION_JSON, time, "res_1", 1);
testSubmissionReservationHelper("reservation/submit",
ReservationId id1 = getReservationIdTestHelper(1);
ReservationId id2 = getReservationIdTestHelper(2);
// If authentication is not enabled then id1 and id2 will be null
if (!this.isAuthenticationEnabled() && id1 == null && id2 == null) {
return;
}
reservationSubmissionTestHelper("reservation/submit",
MediaType.APPLICATION_JSON, time, "res_1", id1);
reservationSubmissionTestHelper("reservation/submit",
MediaType.APPLICATION_JSON, time + MINIMUM_RESOURCE_DURATION,
"res_2", 2);
"res_2", id2);
String timeParam = Long.toString(time + MINIMUM_RESOURCE_DURATION / 2);
WebResource resource = constructWebResource(LIST_RESERVATION_PATH)
@ -447,11 +524,14 @@ public class TestRMWebServicesReservation extends JerseyTestBase {
long time = clock.getTime() + MINIMUM_RESOURCE_DURATION;
ReservationId res1 = testSubmissionReservationHelper("reservation/submit",
MediaType.APPLICATION_JSON, time, "res_1", 1);
ReservationId res2 = testSubmissionReservationHelper("reservation/submit",
ReservationId id1 = getReservationIdTestHelper(1);
ReservationId id2 = getReservationIdTestHelper(2);
reservationSubmissionTestHelper("reservation/submit",
MediaType.APPLICATION_JSON, time, "res_1", id1);
reservationSubmissionTestHelper("reservation/submit",
MediaType.APPLICATION_JSON, time + MINIMUM_RESOURCE_DURATION,
"res_2", 2);
"res_2", id2);
WebResource resource;
resource = constructWebResource(LIST_RESERVATION_PATH)
@ -483,11 +563,14 @@ public class TestRMWebServicesReservation extends JerseyTestBase {
long time = clock.getTime() + MINIMUM_RESOURCE_DURATION;
testSubmissionReservationHelper("reservation/submit",
MediaType.APPLICATION_JSON, time, "res_1", 1);
testSubmissionReservationHelper("reservation/submit",
ReservationId id1 = getReservationIdTestHelper(1);
ReservationId id2 = getReservationIdTestHelper(2);
reservationSubmissionTestHelper("reservation/submit",
MediaType.APPLICATION_JSON, time, "res_1", id1);
reservationSubmissionTestHelper("reservation/submit",
MediaType.APPLICATION_JSON, time + MINIMUM_RESOURCE_DURATION,
"res_2", 2);
"res_2", id2);
WebResource resource = constructWebResource(LIST_RESERVATION_PATH)
.queryParam("start-time", Long.toString((long) (time +
@ -520,11 +603,14 @@ public class TestRMWebServicesReservation extends JerseyTestBase {
long time = clock.getTime() + MINIMUM_RESOURCE_DURATION;
testSubmissionReservationHelper("reservation/submit",
MediaType.APPLICATION_JSON, time, "res_1", 1);
testSubmissionReservationHelper("reservation/submit",
ReservationId id1 = getReservationIdTestHelper(1);
ReservationId id2 = getReservationIdTestHelper(2);
reservationSubmissionTestHelper("reservation/submit",
MediaType.APPLICATION_JSON, time, "res_1", id1);
reservationSubmissionTestHelper("reservation/submit",
MediaType.APPLICATION_JSON, time + MINIMUM_RESOURCE_DURATION,
"res_2", 2);
"res_2", id2);
WebResource resource = constructWebResource(LIST_RESERVATION_PATH)
.queryParam("start-time", new Long((long) (time +
@ -556,11 +642,14 @@ public class TestRMWebServicesReservation extends JerseyTestBase {
long time = clock.getTime() + MINIMUM_RESOURCE_DURATION;
testSubmissionReservationHelper("reservation/submit",
MediaType.APPLICATION_JSON, time, "res_1", 1);
testSubmissionReservationHelper("reservation/submit",
ReservationId id1 = getReservationIdTestHelper(1);
ReservationId id2 = getReservationIdTestHelper(2);
reservationSubmissionTestHelper("reservation/submit",
MediaType.APPLICATION_JSON, time, "res_1", id1);
reservationSubmissionTestHelper("reservation/submit",
MediaType.APPLICATION_JSON, time + MINIMUM_RESOURCE_DURATION,
"res_2", 2);
"res_2", id2);
WebResource resource = constructWebResource(LIST_RESERVATION_PATH)
.queryParam("start-time", "-1")
@ -592,13 +681,16 @@ public class TestRMWebServicesReservation extends JerseyTestBase {
rm.start();
setupCluster(100);
ReservationId id1 = getReservationIdTestHelper(1);
ReservationId id2 = getReservationIdTestHelper(2);
long time = clock.getTime() + MINIMUM_RESOURCE_DURATION;
testSubmissionReservationHelper("reservation/submit",
MediaType.APPLICATION_JSON, time, "res_1", 1);
testSubmissionReservationHelper("reservation/submit",
reservationSubmissionTestHelper("reservation/submit",
MediaType.APPLICATION_JSON, time, "res_1", id1);
reservationSubmissionTestHelper("reservation/submit",
MediaType.APPLICATION_JSON, time + MINIMUM_RESOURCE_DURATION,
"res_2", 2);
"res_2", id2);
WebResource resource = constructWebResource(LIST_RESERVATION_PATH)
.queryParam("end-time", new Long((long)(time +
@ -629,10 +721,13 @@ public class TestRMWebServicesReservation extends JerseyTestBase {
rm.start();
setupCluster(100);
testSubmissionReservationHelper("reservation/submit",
MediaType.APPLICATION_JSON, clock.getTime(), "res_1", 1);
testSubmissionReservationHelper("reservation/submit",
MediaType.APPLICATION_JSON, clock.getTime(), "res_2", 2);
ReservationId id1 = getReservationIdTestHelper(1);
ReservationId id2 = getReservationIdTestHelper(2);
reservationSubmissionTestHelper("reservation/submit",
MediaType.APPLICATION_JSON, clock.getTime(), "res_1", id1);
reservationSubmissionTestHelper("reservation/submit",
MediaType.APPLICATION_JSON, clock.getTime(), "res_2", id2);
WebResource resource = constructWebResource(LIST_RESERVATION_PATH)
.queryParam("queue", DEFAULT_QUEUE);
@ -656,10 +751,13 @@ public class TestRMWebServicesReservation extends JerseyTestBase {
rm.start();
setupCluster(100);
testSubmissionReservationHelper("reservation/submit",
MediaType.APPLICATION_JSON, clock.getTime(), "res_1", 1);
testSubmissionReservationHelper("reservation/submit",
MediaType.APPLICATION_JSON, clock.getTime(), "res_2", 2);
ReservationId id1 = getReservationIdTestHelper(1);
ReservationId id2 = getReservationIdTestHelper(2);
reservationSubmissionTestHelper("reservation/submit",
MediaType.APPLICATION_JSON, clock.getTime(), "res_1", id1);
reservationSubmissionTestHelper("reservation/submit",
MediaType.APPLICATION_JSON, clock.getTime(), "res_2", id2);
WebResource resource = constructWebResource(LIST_RESERVATION_PATH);
@ -673,10 +771,13 @@ public class TestRMWebServicesReservation extends JerseyTestBase {
rm.start();
setupCluster(100);
testSubmissionReservationHelper("reservation/submit",
MediaType.APPLICATION_JSON, clock.getTime(), "res_1", 1);
testSubmissionReservationHelper("reservation/submit",
MediaType.APPLICATION_JSON, clock.getTime(), "res_2", 2);
ReservationId id1 = getReservationIdTestHelper(1);
ReservationId id2 = getReservationIdTestHelper(2);
reservationSubmissionTestHelper("reservation/submit",
MediaType.APPLICATION_JSON, clock.getTime(), "res_1", id1);
reservationSubmissionTestHelper("reservation/submit",
MediaType.APPLICATION_JSON, clock.getTime(), "res_2", id2);
WebResource resource = constructWebResource(LIST_RESERVATION_PATH)
.queryParam("queue", DEFAULT_QUEUE + "_invalid");
@ -691,10 +792,15 @@ public class TestRMWebServicesReservation extends JerseyTestBase {
rm.start();
setupCluster(100);
ReservationId id1 = testSubmissionReservationHelper("reservation/submit",
MediaType.APPLICATION_JSON, clock.getTime(), "res_1", 1);
testSubmissionReservationHelper("reservation/submit",
MediaType.APPLICATION_JSON, clock.getTime(), "res_2", 2);
ReservationId id1 = getReservationIdTestHelper(1);
ReservationId id2 = getReservationIdTestHelper(2);
reservationSubmissionTestHelper("reservation/submit",
MediaType.APPLICATION_JSON, clock.getTime(), "res_1", id1);
reservationSubmissionTestHelper("reservation/submit",
MediaType.APPLICATION_JSON, clock.getTime(), "res_1", id1);
reservationSubmissionTestHelper("reservation/submit",
MediaType.APPLICATION_JSON, clock.getTime(), "res_2", id2);
WebResource resource = constructWebResource(LIST_RESERVATION_PATH)
.queryParam("include-resource-allocations", "true")
@ -726,8 +832,10 @@ public class TestRMWebServicesReservation extends JerseyTestBase {
rm.start();
setupCluster(100);
ReservationId id1 = testSubmissionReservationHelper("reservation/submit",
MediaType.APPLICATION_JSON, clock.getTime(), "res_1", 1);
ReservationId id1 = getReservationIdTestHelper(1);
reservationSubmissionTestHelper("reservation/submit",
MediaType.APPLICATION_JSON, clock.getTime(), "res_1", id1);
WebResource resource = constructWebResource(LIST_RESERVATION_PATH)
.queryParam("queue", DEFAULT_QUEUE);
@ -747,8 +855,9 @@ public class TestRMWebServicesReservation extends JerseyTestBase {
rm.start();
setupCluster(100);
ReservationId id1 = testSubmissionReservationHelper("reservation/submit",
MediaType.APPLICATION_JSON, clock.getTime(), "res_1", 1);
ReservationId id1 = getReservationIdTestHelper(1);
reservationSubmissionTestHelper("reservation/submit",
MediaType.APPLICATION_JSON, clock.getTime(), "res_1", id1);
WebResource resource = constructWebResource(LIST_RESERVATION_PATH)
.queryParam("include-resource-allocations", "true")
@ -781,8 +890,10 @@ public class TestRMWebServicesReservation extends JerseyTestBase {
rm.start();
setupCluster(100);
ReservationId id1 = testSubmissionReservationHelper("reservation/submit",
MediaType.APPLICATION_JSON, clock.getTime(), "res_1", 1);
ReservationId id1 = getReservationIdTestHelper(1);
reservationSubmissionTestHelper("reservation/submit",
MediaType.APPLICATION_JSON, clock.getTime(), "res_1", id1);
WebResource resource = constructWebResource(LIST_RESERVATION_PATH)
.queryParam("include-resource-allocations", "false")
@ -818,40 +929,76 @@ public class TestRMWebServicesReservation extends JerseyTestBase {
rm.registerNode("127.0.0." + i + ":1234", 100 * 1024);
amNodeManager.nodeHeartbeat(true);
}
ReservationId rid =
testSubmissionReservationHelper("reservation/submit",
MediaType.APPLICATION_JSON);
if (this.isAuthenticationEnabled()) {
assertNotNull(rid);
}
ReservationId rid = getReservationIdTestHelper(1);
reservationSubmissionTestHelper("reservation/submit", MediaType
.APPLICATION_JSON, rid);
testDeleteReservationHelper("reservation/delete", rid,
MediaType.APPLICATION_JSON);
rm.stop();
}
private ReservationId testSubmissionReservationHelper(String path,
String media) throws Exception {
/**
* This method is used when a ReservationId is required. Attempt to use REST
* API. If authentication is not enabled, ensure that the response status is
* unauthorized and generate a ReservationId because downstream components
* require a ReservationId for testing.
* @param fallbackReservationId the ReservationId to use if authentication
* is not enabled, causing the getNewReservation
* API to fail.
* @return the object representing the reservation ID.
*/
private ReservationId getReservationIdTestHelper(int fallbackReservationId)
throws Exception {
Thread.sleep(1000);
ClientResponse response = constructWebResource(GET_NEW_RESERVATION_PATH)
.type(MediaType.APPLICATION_JSON)
.accept(MediaType.APPLICATION_JSON)
.post(ClientResponse.class);
if (!this.isAuthenticationEnabled()) {
assertEquals(Status.UNAUTHORIZED, response.getClientResponseStatus());
return ReservationId.newInstance(clock.getTime(), fallbackReservationId);
}
System.out.println("RESPONSE:" + response);
assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
JSONObject json = response.getEntity(JSONObject.class);
assertEquals("incorrect number of elements", 1, json.length());
ReservationId rid = null;
try {
rid = ReservationId.parseReservationId(json.getString("reservation-id"));
} catch (JSONException j) {
// failure is possible and is checked outside
}
return rid;
}
private ClientResponse reservationSubmissionTestHelper(String path,
String media, ReservationId reservationId) throws Exception {
long arrival = clock.getTime() + MINIMUM_RESOURCE_DURATION;
return testSubmissionReservationHelper(path, media, arrival, "res_1", 1);
return reservationSubmissionTestHelper(path, media, arrival, "res_1",
reservationId);
}
private ReservationId testSubmissionReservationHelper(String path,
String media, Long arrival, String reservationName, int expectedId)
throws Exception {
private ClientResponse reservationSubmissionTestHelper(String path,
String media, Long arrival, String reservationName,
ReservationId reservationId) throws Exception {
String reservationJson = loadJsonFile("submit-reservation.json");
String reservationJsonRequest = String.format(reservationJson, arrival,
arrival + MINIMUM_RESOURCE_DURATION, reservationName);
String reservationJsonRequest = String.format(reservationJson,
reservationId.toString(), arrival, arrival + MINIMUM_RESOURCE_DURATION,
reservationName);
return submitAndVerifyReservation(path, media, reservationJsonRequest,
expectedId);
return submitAndVerifyReservation(path, media, reservationJsonRequest);
}
private ReservationId submitAndVerifyReservation(String path, String media,
String reservationJson, int expectedId) throws Exception {
private ClientResponse submitAndVerifyReservation(String path, String media,
String reservationJson) throws Exception {
JSONJAXBContext jc =
new JSONJAXBContext(JSONConfiguration.mapped()
.build(), ReservationSubmissionRequestInfo.class);
@ -867,25 +1014,12 @@ public class TestRMWebServicesReservation extends JerseyTestBase {
if (!this.isAuthenticationEnabled()) {
assertEquals(Status.UNAUTHORIZED, response.getClientResponseStatus());
return null;
}
System.out.println("RESPONSE:" + response);
assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
JSONObject json = response.getEntity(JSONObject.class);
assertEquals("incorrect number of elements", 1, json.length());
ReservationId rid = null;
try {
rid = ReservationId.parseReservationId(json.getString("reservation-id"));
assertEquals("incorrect return value", rid.getId(), expectedId);
} catch (JSONException j) {
// failure is possible and is checked outside
}
return rid;
return response;
}
private void testUpdateReservationHelper(String path,
private void updateReservationTestHelper(String path,
ReservationId reservationId, String media) throws JSONException,
Exception {
@ -1002,6 +1136,25 @@ public class TestRMWebServicesReservation extends JerseyTestBase {
return response.getEntity(JSONObject.class);
}
private void verifyReservationCount(int count) throws Exception {
WebResource resource = constructWebResource(LIST_RESERVATION_PATH)
.queryParam("queue", DEFAULT_QUEUE);
JSONObject json = testListReservationHelper(resource);
if (count == 1) {
// If there are any number other than one reservation, this will throw.
json.getJSONObject("reservations");
} else {
JSONArray reservations = json.getJSONArray("reservations");
assertTrue(reservations.length() == count);
}
}
private boolean isHttpSuccessResponse(ClientResponse response) {
return (response.getStatus() / 100) == 2;
}
private void setupCluster(int nodes) throws Exception {
for (int i = 0; i < nodes; i++) {
MockNM amNodeManager =

View File

@ -1,5 +1,6 @@
{
"queue" : "dedicated",
"reservation-id" : "%s",
"reservation-definition" : {
"arrival" : %s,
"deadline" : %s,

View File

@ -39,7 +39,9 @@ Flow of a Reservation
With reference to the figure above, a typical reservation proceeds as follows:
* **Step 1** The user (or an automated tool on its behalf) submit a reservation request specified by the Reservation Definition Language (RDL). This describes the user need for resources over-time (e.g., a skyline of resources) and temporal constraints (e.g., deadline). This can be done both programmatically through the usual Client-to-RM protocols or via the REST api of the RM.
* **Step 0** The user (or an automated tool on its behalf) submits a reservation creation request, and receives a response containing the ReservationId.
* **Step 1** The user (or an automated tool on its behalf) submits a reservation request specified by the Reservation Definition Language (RDL) and ReservationId retrieved from the previous step. This describes the user need for resources over-time (e.g., a skyline of resources) and temporal constraints (e.g., deadline). This can be done both programmatically through the usual Client-to-RM protocols or via the REST api of the RM. If a reservation is submitted with the same ReservationId, and the RDL is the same, a new reservation will not be created and the request will be successful. If the RDL is different, the reservation will be rejected, and the request will be unsuccessful.
* **Step 2** The ReservationSystem leverages a ReservationAgent (GREE in the figure) to find a plausible allocation for the reservation in the Plan, a data structure tracking all reservation currently accepted and the available resources in the system.

View File

@ -34,6 +34,7 @@ ResourceManager REST API's.
* [Cluster Application Priority API](#Cluster_Application_Priority_API)
* [Cluster Delegation Tokens API](#Cluster_Delegation_Tokens_API)
* [Cluster Reservation API List](#Cluster_Reservation_API_List)
* [Cluster Reservation API Create](#Cluster_Reservation_API_Create)
* [Cluster Reservation API Submit](#Cluster_Reservation_API_Submit)
* [Cluster Reservation API Update](#Cluster_Reservation_API_Update)
* [Cluster Reservation API Delete](#Cluster_Reservation_API_Delete)
@ -3385,10 +3386,82 @@ Response Body:
</reservationListInfo>
```
Cluster Reservation API Create
---------------------------
Use the New Reservation API, to obtain a reservation-id which can then be used as part of the [Cluster Reservation API Submit](#Cluster_Reservation_API_Submit) to submit reservations.
This feature is currently in the alpha stage and may change in the future.
### URI
* http://<rm http address:port>/ws/v1/cluster/reservation/new-reservation
### HTTP Operations Supported
* POST
### Query Parameters Supported
None
### Elements of the new-reservation object
The new-reservation response contains the following elements:
| Item | Data Type | Description |
|:---- |:---- |:---- |
| reservation-id | string | The newly created reservation id |
### Response Examples
**JSON response**
HTTP Request:
POST http://<rm http address:port>/ws/v1/cluster/reservation/new-reservation
Response Header:
HTTP/1.1 200 OK
Content-Type: application/json
Transfer-Encoding: chunked
Server: Jetty(6.1.26)
Response Body:
```json
{
"reservation-id":"reservation_1404198295326_0003"
}
```
**XML response**
HTTP Request:
POST http://<rm http address:port>/ws/v1/cluster/reservation/new-reservation
Response Header:
HTTP/1.1 200 OK
Content-Type: application/xml
Content-Length: 248
Server: Jetty(6.1.26)
Response Body:
```xml
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<new-reservation>
<reservation-id>reservation_1404198295326_0003</reservation-id>
</new-reservation>
```
Cluster Reservation API Submit
------------------------------
The Cluster Reservation API can be used to submit reservations.When submitting a reservation the user specify the constraints in terms of resources, and time that are required, the resulting page returns a reservation-id that the user can use to get access to the resources by specifying it as part of [Cluster Submit Applications API](#Cluster_Applications_APISubmit_Application).
The Cluster Reservation API can be used to submit reservations. When submitting a reservation the user specifies the constraints in terms of resources, and time that is required. The resulting response is successful if the reservation can be made. If a reservation-id is used to submit a reservation multiple times, the request will succeed if the reservation definition is the same, but only one reservation will be created. If the reservation definition is different, the server will respond with an error response. When the reservation is made, the user can use the reservation-id used to submit the reservation to get access to the resources by specifying it as part of [Cluster Submit Applications API](#Cluster_Applications_APISubmit_Application).
### URI
@ -3410,6 +3483,7 @@ Please note that this feature is currently in the alpha stage and may change in
|:---- |:---- |:---- |
| queue | string | The (reservable) queue you are submitting to|
| reservation-definition | object | A set of constraints representing the need for resources over time of a user. |
| reservation-id | string | The reservation id to use to submit the reservation. |
Elements of the *reservation-definition* object
@ -3456,6 +3530,7 @@ POST http://rmdns:8088/ws/v1/cluster/reservation/submit
Content-Type: application/json
{
"queue" : "dedicated",
"reservation-id":"reservation_1404198295326_0003"
"reservation-definition" : {
"arrival" : 1765541532000,
"deadline" : 1765542252000,
@ -3501,9 +3576,7 @@ Server: Jetty(6.1.26)
Response Body:
```json
{"reservation-id":"reservation_1448064217915_0009"}
```
No response body
**XML response**
@ -3515,6 +3588,7 @@ Accept: application/xml
Content-Type: application/xml
<reservation-submission-context>
<queue>dedicated</queue>
<reservation-id>reservation_1404198295326_0003</reservation-id>
<reservation-definition>
<arrival>1765541532000</arrival>
<deadline>1765542252000</deadline>
@ -3558,13 +3632,7 @@ Server: Jetty(6.1.26)
Response Body:
```xml
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<reservation-submission-response>
<reservation-id>reservation_1448064217915_0010</reservation-id>
</reservation-submission-response>
```
No response body
Cluster Reservation API Update
------------------------------