YARN-4340. Add list API to reservation system. (Sean Po via wangda)

(cherry picked from commit 9875325d5c)
This commit is contained in:
Wangda Tan 2016-02-02 10:17:33 +08:00
parent 9862879e27
commit c487453b91
43 changed files with 2702 additions and 206 deletions

View File

@ -46,6 +46,8 @@ import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationListRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationListResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest;
@ -451,6 +453,11 @@ public class ResourceMgrDelegate extends YarnClient {
return client.deleteReservation(request);
}
@Override
public ReservationListResponse listReservations(
ReservationListRequest request) throws YarnException, IOException {
return client.listReservations(request);
}
@Override
public Map<NodeId, Set<NodeLabel>> getNodeToLabels() throws YarnException,
IOException {

View File

@ -110,6 +110,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationListRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationListResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest;
@ -439,6 +441,12 @@ public class TestClientRedirect {
return null;
}
@Override
public ReservationListResponse listReservations(
ReservationListRequest request) throws YarnException, IOException {
return null;
}
@Override
public GetNodesToLabelsResponse getNodeToLabels(
GetNodesToLabelsRequest request) throws YarnException, IOException {

View File

@ -590,6 +590,8 @@ Release 2.8.0 - UNRELEASED
YARN-4371. "yarn application -kill" should take multiple application ids
(Sunil G via jlowe)
YARN-4340. Add "list" API to reservation system. (Sean Po via wangda)
OPTIMIZATIONS
YARN-3339. TestDockerContainerExecutor should pull a single image and not

View File

@ -49,6 +49,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesReq
import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationListRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationListResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest;
@ -398,13 +400,58 @@ public interface ApplicationClientProtocol extends ApplicationBaseProtocol {
* @throws YarnException if the request is invalid or reservation cannot be
* deleted successfully
* @throws IOException
*
*
*/
@Public
@Unstable
public ReservationDeleteResponse deleteReservation(
ReservationDeleteRequest request) throws YarnException, IOException;
/**
* <p>
* The interface used by clients to get the list of reservations in a plan.
* The reservationId will be used to search for reservations to list if it is
* provided. Otherwise, it will select active reservations within the
* startTime and endTime (inclusive).
* </p>
*
* @param request to list reservations in a plan. Contains fields to select
* String queue, ReservationId reservationId, long startTime,
* long endTime, and a bool includeReservationAllocations.
*
* queue: Required. Cannot be null or empty. Refers to the
* reservable queue in the scheduler that was selected when
* creating a reservation submission
* {@link ReservationSubmissionRequest}.
*
* reservationId: Optional. If provided, other fields will
* be ignored.
*
* startTime: Optional. If provided, only reservations that
* end after the startTime will be selected. This defaults
* to 0 if an invalid number is used.
*
* endTime: Optional. If provided, only reservations that
* start on or before endTime will be selected. This defaults
* to Long.MAX_VALUE if an invalid number is used.
*
* includeReservationAllocations: Optional. Flag that
* determines whether the entire reservation allocations are
* to be returned. Reservation allocations are subject to
* change in the event of re-planning as described by
* {@code ReservationDefinition}.
*
* @return response that contains information about reservations that are
* being searched for.
* @throws YarnException if the request is invalid
* @throws IOException on IO failures
*
*/
@Public
@Unstable
ReservationListResponse listReservations(
ReservationListRequest request) throws YarnException, IOException;
/**
* <p>
* The interface used by client to get node to labels mappings in existing cluster

View File

@ -0,0 +1,228 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.api.protocolrecords;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.util.Records;
/**
* {@link ReservationListRequest} captures the set of requirements the
* user has to list reservations.
*/
@Public
@Unstable
public abstract class ReservationListRequest {
/**
* The {@link ReservationListRequest} will use the reservationId to search for
* reservations to list if it is provided. Otherwise, it will select active
* reservations within the startTime and endTime (inclusive).
*
* @param queue Required. Cannot be null or empty. Refers to the reservable
* queue in the scheduler that was selected when creating a
* reservation submission {@link ReservationSubmissionRequest}.
* @param reservationId Optional. String representation of
* {@code ReservationId} If provided, other fields will
* be ignored.
* @param startTime Optional. If provided, only reservations that
* end after the startTime will be selected. This defaults
* to 0 if an invalid number is used.
* @param endTime Optional. If provided, only reservations that
* start on or before endTime will be selected. This defaults
* to Long.MAX_VALUE if an invalid number is used.
* @param includeReservationAllocations Optional. Flag that
* determines whether the entire reservation allocations are
* to be returned. Reservation allocations are subject to
* change in the event of re-planning as described by
* {@code ReservationDefinition}.
*/
@Public
@Unstable
public static ReservationListRequest newInstance(
String queue, String reservationId, long startTime, long endTime,
boolean includeReservationAllocations) {
ReservationListRequest request =
Records.newRecord(ReservationListRequest.class);
request.setQueue(queue);
request.setReservationId(reservationId);
request.setStartTime(startTime);
request.setEndTime(endTime);
request.setIncludeResourceAllocations(includeReservationAllocations);
return request;
}
/**
* The {@link ReservationListRequest} will use the reservationId to search for
* reservations to list if it is provided. Otherwise, it will select active
* reservations within the startTime and endTime (inclusive).
*
* @param queue Required. Cannot be null or empty. Refers to the reservable
* queue in the scheduler that was selected when creating a
* reservation submission {@link ReservationSubmissionRequest}.
* @param reservationId Optional. String representation of
* {@code ReservationId} If provided, other fields will
* be ignored.
* @param includeReservationAllocations Optional. Flag that
* determines whether the entire reservation allocations are
* to be returned. Reservation allocations are subject to
* change in the event of re-planning as described by
* {@code ReservationDefinition}.
*/
@Public
@Unstable
public static ReservationListRequest newInstance(
String queue, String reservationId, boolean
includeReservationAllocations) {
return newInstance(queue, reservationId, -1, -1,
includeReservationAllocations);
}
/**
* The {@link ReservationListRequest} will use the reservationId to search for
* reservations to list if it is provided. Otherwise, it will select active
* reservations within the startTime and endTime (inclusive).
*
* @param queue Required. Cannot be null or empty. Refers to the reservable
* queue in the scheduler that was selected when creating a
* reservation submission {@link ReservationSubmissionRequest}.
* @param reservationId Optional. String representation of
* {@code ReservationId} If provided, other fields will
* be ignored.
*/
@Public
@Unstable
public static ReservationListRequest newInstance(
String queue, String reservationId) {
return newInstance(queue, reservationId, -1, -1, false);
}
/**
* Get queue name to use to find reservations.
*
* @return the queue name to use to find reservations.
*/
@Public
@Unstable
public abstract String getQueue();
/**
* Set queue name to use to find resource allocations.
*
* @param queue Required. Cannot be null or empty.
*/
@Public
@Unstable
public abstract void setQueue(String queue);
/**
* Get the reservation id to use to find a reservation.
*
* @return the reservation id of the reservation.
*/
@Public
@Unstable
public abstract String getReservationId();
/**
* Set the reservation id to use to find a reservation.
*
* @param reservationId Optional. String representation of
* {@code ReservationId} If provided, other fields will
* be ignored.
*/
@Public
@Unstable
public abstract void setReservationId(String reservationId);
/**
* Get the start time to use to search for reservations.
* When this is set, reservations that start before this start
* time are ignored.
*
* @return the start time to use to search for reservations.
*/
@Public
@Unstable
public abstract long getStartTime();
/**
* Set the start time to use to search for reservations.
* When this is set, reservations that start before this start
* time are ignored.
*
* @param startTime Optional. If provided, only reservations that
* end after the startTime will be selected. This defaults
* to 0 if an invalid number is used.
*/
@Public
@Unstable
public abstract void setStartTime(long startTime);
/**
* Get the end time to use to search for reservations.
* When this is set, reservations that start after this end
* time are ignored.
*
* @return the end time to use to search for reservations.
*/
@Public
@Unstable
public abstract long getEndTime();
/**
* Set the end time to use to search for reservations.
* When this is set, reservations that start after this end
* time are ignored.
*
* @param endTime Optional. If provided, only reservations that
* start before endTime will be selected. This defaults
* to Long.MAX_VALUE if an invalid number is used.
*/
@Public
@Unstable
public abstract void setEndTime(long endTime);
/**
* Get the boolean representing whether or not the user
* is requesting the full resource allocation.
* If this is true, the full resource allocation will
* be included in the response.
*
* @return the end time to use to search for reservations.
*/
@Public
@Unstable
public abstract boolean getIncludeResourceAllocations();
/**
* Set the boolean representing whether or not the user
* is requesting the full resource allocation.
* If this is true, the full resource allocation will
* be included in the response.
*
* @param includeReservationAllocations Optional. Flag that
* determines whether the entire list of
* {@code ResourceAllocationRequest} will be returned.
*/
@Public
@Unstable
public abstract void setIncludeResourceAllocations(
boolean includeReservationAllocations);
}

View File

@ -0,0 +1,79 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.api.protocolrecords;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.ReservationAllocationState;
import org.apache.hadoop.yarn.util.Records;
import java.util.List;
/**
* {@link ReservationListResponse} captures the list of reservations that the
* user has queried.
*
* The resulting list of {@link ReservationAllocationState} contains a list of
* {@code ResourceAllocationRequest} representing the current state of the
* reservation resource allocations will be returned. This is subject to change
* in the event of re-planning a described by {@code ReservationDefinition}
*
* @see ReservationAllocationState
*
*/
@Public
@Unstable
public abstract class ReservationListResponse {
@Private
@Unstable
public static ReservationListResponse newInstance(
List<ReservationAllocationState> reservationAllocationState) {
ReservationListResponse response =
Records.newRecord(ReservationListResponse.class);
response.setReservationAllocationState(reservationAllocationState);
return response;
}
/**
* Get the list of {@link ReservationAllocationState}, that corresponds
* to a reservation in the scheduler.
*
* @return the list of {@link ReservationAllocationState} which holds
* information of a particular reservation
*/
@Public
@Unstable
public abstract List<ReservationAllocationState>
getReservationAllocationState();
/**
* Set the list of {@link ReservationAllocationState}, that correspond
* to a reservation in the scheduler.
*
* @param reservationAllocationState the list of
* {@link ReservationAllocationState} which holds information of a
* particular reservation.
*/
@Private
@Unstable
public abstract void setReservationAllocationState(
List<ReservationAllocationState> reservationAllocationState);
}

View File

@ -0,0 +1,191 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.api.records;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Stable;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.util.Records;
import java.util.List;
/**
* {@code ReservationAllocationState} represents the reservation that is
* made by a user.
* <p>
* It includes:
* <ul>
* <li>Duration of the reservation.</li>
* <li>Acceptance time of the duration.</li>
* <li>
* List of {@link ResourceAllocationRequest}, which includes the time
* interval, and capability of the allocation.
* {@code ResourceAllocationRequest} represents an allocation
* made for a reservation for the current state of the queue. This can be
* changed for reasons such as re-planning, but will always be subject to
* the constraints of the user contract as described by
* {@link ReservationDefinition}
* </li>
* <li>{@link ReservationId} of the reservation.</li>
* <li>{@link ReservationDefinition} used to make the reservation.</li>
* </ul>
*
* @see ResourceAllocationRequest
* @see ReservationId
* @see ReservationDefinition
*/
@Public
@Stable
public abstract class ReservationAllocationState {
/**
*
* @param acceptanceTime The acceptance time of the reservation.
* @param user The username of the user who made the reservation.
* @param resourceAllocations List of {@link ResourceAllocationRequest}
* representing the current state of the
* reservation resource allocations. This is
* subject to change in the event of re-planning.
* @param reservationId {@link ReservationId } of the reservation being
* listed.
* @param reservationDefinition {@link ReservationDefinition} used to make
* the reservation.
* @return {@code ReservationAllocationState} that represents the state of
* the reservation.
*/
@Public
@Stable
public static ReservationAllocationState newInstance(long acceptanceTime,
String user, List<ResourceAllocationRequest> resourceAllocations,
ReservationId reservationId,
ReservationDefinition reservationDefinition) {
ReservationAllocationState ri = Records.newRecord(
ReservationAllocationState.class);
ri.setAcceptanceTime(acceptanceTime);
ri.setUser(user);
ri.setResourceAllocationRequests(resourceAllocations);
ri.setReservationId(reservationId);
ri.setReservationDefinition(reservationDefinition);
return ri;
}
/**
* Get the acceptance time of the reservation.
*
* @return the time that the reservation was accepted.
*/
@Public
@Unstable
public abstract long getAcceptanceTime();
/**
* Set the time that the reservation was accepted.
*
* @param acceptanceTime The acceptance time of the reservation.
*/
@Private
@Unstable
public abstract void setAcceptanceTime(long acceptanceTime);
/**
* Get the user who made the reservation.
*
* @return the name of the user who made the reservation.
*/
@Public
@Unstable
public abstract String getUser();
/**
* Set the user who made the reservation.
*
* @param user The username of the user who made the reservation.
*/
@Private
@Unstable
public abstract void setUser(String user);
/**
* Get the Resource allocations of the reservation based on the current state
* of the plan. This is subject to change in the event of re-planning.
* The allocations will be constraint to the user contract as described by
* the {@link ReservationDefinition}
*
* @return a list of resource allocations for the reservation.
*/
@Public
@Unstable
public abstract List<ResourceAllocationRequest>
getResourceAllocationRequests();
/**
* Set the list of resource allocations made for the reservation.
*
* @param resourceAllocations List of {@link ResourceAllocationRequest}
* representing the current state of the
* reservation resource allocations. This is
* subject to change in the event of re-planning.
*/
@Private
@Unstable
public abstract void setResourceAllocationRequests(
List<ResourceAllocationRequest> resourceAllocations);
/**
* Get the id of the reservation.
*
* @return the reservation id corresponding to the reservation.
*/
@Public
@Unstable
public abstract ReservationId getReservationId();
/**
* Set the id corresponding to the reservation.
* `
* @param reservationId {@link ReservationId } of the reservation being
* listed.
*/
@Private
@Unstable
public abstract void setReservationId(ReservationId reservationId);
/**
* Get the reservation definition used to make the reservation.
*
* @return the reservation definition used to make the reservation.
*/
@Public
@Unstable
public abstract ReservationDefinition getReservationDefinition();
/**
* Set the definition of the reservation.
*
* @param reservationDefinition {@link ReservationDefinition} used to make
* the reservation.
*/
@Private
@Unstable
public abstract void setReservationDefinition(ReservationDefinition
reservationDefinition);
}

View File

@ -0,0 +1,123 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.api.records;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.classification.InterfaceStability.Stable;
import org.apache.hadoop.yarn.util.Records;
/**
* {@code ResourceAllocationRequest} represents an allocation
* made for a reservation for the current state of the plan. This can be
* changed for reasons such as re-planning, but will always be subject to the
* constraints of the user contract as described by
* {@link ReservationDefinition}
* {@link Resource}
*
* <p>
* It includes:
* <ul>
* <li>StartTime of the allocation.</li>
* <li>EndTime of the allocation.</li>
* <li>{@link Resource} reserved for the allocation.</li>
* </ul>
*
* @see Resource
*/
@Public
@Stable
public abstract class ResourceAllocationRequest {
/**
* @param startTime The start time that the capability is reserved for.
* @param endTime The end time that the capability is reserved for.
* @param capability {@link Resource} representing the capability of the
* resource allocation.
* @return {ResourceAllocationRequest} which represents the capability of
* the resource allocation for a time interval.
*/
@Public
@Stable
public static ResourceAllocationRequest newInstance(long startTime,
long endTime, Resource capability) {
ResourceAllocationRequest ra = Records.newRecord(
ResourceAllocationRequest.class);
ra.setEndTime(endTime);
ra.setStartTime(startTime);
ra.setCapability(capability);
return ra;
}
/**
* Get the start time that the resource is allocated.
*
* @return the start time that the resource is allocated.
*/
@Public
@Unstable
public abstract long getStartTime();
/**
* Set the start time that the resource is allocated.
*
* @param startTime The start time that the capability is reserved for.
*/
@Private
@Unstable
public abstract void setStartTime(long startTime);
/**
* Get the end time that the resource is allocated.
*
* @return the end time that the resource is allocated.
*/
@Public
@Unstable
public abstract long getEndTime();
/**
* Set the end time that the resource is allocated.
*
* @param endTime The end time that the capability is reserved for.
*/
@Private
@Unstable
public abstract void setEndTime(long endTime);
/**
* Get the allocated resource.
*
* @return the allocated resource.
*/
@Public
@Unstable
public abstract Resource getCapability();
/**
* Set the allocated resource.
*
* @param resource {@link Resource} representing the capability of the
* resource allocation.
*/
@Private
@Unstable
public abstract void setCapability(Resource resource);
}

View File

@ -53,6 +53,7 @@ service ApplicationClientProtocolService {
rpc submitReservation (ReservationSubmissionRequestProto) returns (ReservationSubmissionResponseProto);
rpc updateReservation (ReservationUpdateRequestProto) returns (ReservationUpdateResponseProto);
rpc deleteReservation (ReservationDeleteRequestProto) returns (ReservationDeleteResponseProto);
rpc listReservations (ReservationListRequestProto) returns (ReservationListResponseProto);
rpc getNodeToLabels (GetNodesToLabelsRequestProto) returns (GetNodesToLabelsResponseProto);
rpc getLabelsToNodes (GetLabelsToNodesRequestProto) returns (GetLabelsToNodesResponseProto);
rpc getClusterNodeLabels (GetClusterNodeLabelsRequestProto) returns (GetClusterNodeLabelsResponseProto);

View File

@ -469,6 +469,23 @@ message ReservationDefinitionProto {
optional string reservation_name = 4;
}
message ResourceAllocationRequestProto {
optional int64 start_time = 1;
optional int64 end_time = 2;
optional ResourceProto resource = 3;
}
message ReservationAllocationStateProto {
optional ReservationDefinitionProto reservation_definition = 1;
repeated ResourceAllocationRequestProto allocation_requests = 2;
optional int64 start_time = 3;
optional int64 end_time = 4;
optional string user = 5;
optional bool contains_gangs = 6;
optional int64 acceptance_time = 7;
optional ReservationIdProto reservation_id = 8;
}
enum ReservationRequestInterpreterProto {
R_ANY = 0;
R_ALL = 1;

View File

@ -390,6 +390,18 @@ message ReservationDeleteRequestProto {
message ReservationDeleteResponseProto {
}
message ReservationListRequestProto {
optional string queue = 1;
optional string reservation_id = 3;
optional int64 start_time = 4;
optional int64 end_time = 5;
optional bool include_resource_allocations = 6;
}
message ReservationListResponseProto {
repeated ReservationAllocationStateProto reservations = 1;
}
//////////////////////////////////////////////////////
/////// SCM_Admin_Protocol //////////////////////////
//////////////////////////////////////////////////////

View File

@ -34,6 +34,8 @@ import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationListRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationListResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest;
@ -626,7 +628,52 @@ public abstract class YarnClient extends AbstractService {
@Unstable
public abstract ReservationDeleteResponse deleteReservation(
ReservationDeleteRequest request) throws YarnException, IOException;
/**
* <p>
* The interface used by clients to get the list of reservations in a plan.
* The reservationId will be used to search for reservations to list if it is
* provided. Otherwise, it will select active reservations within the
* startTime and endTime (inclusive).
* </p>
*
* @param request to list reservations in a plan. Contains fields to select
* String queue, ReservationId reservationId, long startTime,
* long endTime, and a bool includeReservationAllocations.
*
* queue: Required. Cannot be null or empty. Refers to the
* reservable queue in the scheduler that was selected when
* creating a reservation submission
* {@link ReservationSubmissionRequest}.
*
* reservationId: Optional. If provided, other fields will
* be ignored.
*
* startTime: Optional. If provided, only reservations that
* end after the startTime will be selected. This defaults
* to 0 if an invalid number is used.
*
* endTime: Optional. If provided, only reservations that
* start on or before endTime will be selected. This defaults
* to Long.MAX_VALUE if an invalid number is used.
*
* includeReservationAllocations: Optional. Flag that
* determines whether the entire reservation allocations are
* to be returned. Reservation allocations are subject to
* change in the event of re-planning as described by
* {@link ReservationDefinition}.
*
* @return response that contains information about reservations that are
* being searched for.
* @throws YarnException if the request is invalid
* @throws IOException
*
*/
@Public
@Unstable
public abstract ReservationListResponse listReservations(
ReservationListRequest request) throws YarnException, IOException;
/**
* <p>
* The interface used by client to get node to labels mappings in existing cluster

View File

@ -73,6 +73,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationListRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationListResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest;
@ -803,7 +805,13 @@ public class YarnClientImpl extends YarnClient {
ReservationDeleteRequest request) throws YarnException, IOException {
return rmClient.deleteReservation(request);
}
@Override
public ReservationListResponse listReservations(
ReservationListRequest request) throws YarnException, IOException {
return rmClient.listReservations(request);
}
@Override
public Map<NodeId, Set<NodeLabel>> getNodeToLabels() throws YarnException,
IOException {

View File

@ -73,6 +73,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationListRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationListResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest;
@ -1233,6 +1235,163 @@ public class TestYarnClient {
Assert.assertNotNull(sResponse);
System.out.println("Update reservation response: " + uResponse);
// List reservations, search by reservation ID
ReservationListRequest request =
ReservationListRequest.newInstance(
ReservationSystemTestUtil.reservationQ,
reservationID.toString(), -1, -1, false);
ReservationListResponse response = null;
try {
response = client.listReservations(request);
} catch (Exception e) {
Assert.fail(e.getMessage());
}
Assert.assertNotNull(response);
Assert.assertEquals(1, response.getReservationAllocationState().size());
Assert.assertEquals(response.getReservationAllocationState().get(0)
.getReservationId().getId(), reservationID.getId());
Assert.assertEquals(response.getReservationAllocationState().get(0)
.getResourceAllocationRequests().size(), 0);
// List reservations, search by time interval.
request = ReservationListRequest.newInstance(
ReservationSystemTestUtil.reservationQ, "", arrival +
duration/2, arrival + duration/2, true);
response = null;
try {
response = client.listReservations(request);
} catch (Exception e) {
Assert.fail(e.getMessage());
}
Assert.assertNotNull(response);
Assert.assertEquals(1, response.getReservationAllocationState().size());
Assert.assertEquals(response.getReservationAllocationState().get(0)
.getReservationId().getId(), reservationID.getId());
// List reservations, search by invalid end time == -1.
request = ReservationListRequest.newInstance(
ReservationSystemTestUtil.reservationQ, "", 1, -1,
true);
response = null;
try {
response = client.listReservations(request);
} catch (Exception e) {
Assert.fail(e.getMessage());
}
Assert.assertNotNull(response);
Assert.assertEquals(1, response.getReservationAllocationState().size());
Assert.assertEquals(response.getReservationAllocationState().get(0)
.getReservationId().getId(), reservationID.getId());
// List reservations, search by invalid end time < -1.
request = ReservationListRequest.newInstance(
ReservationSystemTestUtil.reservationQ, "", 1, -10,
true);
response = null;
try {
response = client.listReservations(request);
} catch (Exception e) {
Assert.fail(e.getMessage());
}
Assert.assertNotNull(response);
Assert.assertEquals(1, response.getReservationAllocationState().size());
Assert.assertEquals(response.getReservationAllocationState().get(0)
.getReservationId().getId(), reservationID.getId());
// List reservations, search by time within reservation interval.
request = ReservationListRequest.newInstance(
ReservationSystemTestUtil.reservationQ, "", 1, Long.MAX_VALUE,
true);
response = null;
try {
response = client.listReservations(request);
} catch (Exception e) {
Assert.fail(e.getMessage());
}
Assert.assertNotNull(response);
Assert.assertEquals(1, response.getReservationAllocationState().size());
Assert.assertEquals(response.getReservationAllocationState().get(0)
.getReservationId().getId(), reservationID.getId());
// Verify that the full resource allocations exist.
Assert.assertTrue(response.getReservationAllocationState().get(0)
.getResourceAllocationRequests().size() > 0);
// Verify that the full RDL is returned.
ReservationRequests reservationRequests = response
.getReservationAllocationState().get(0)
.getReservationDefinition().getReservationRequests();
Assert.assertTrue(reservationRequests.getInterpreter().toString()
.equals("R_ALL"));
Assert.assertTrue(reservationRequests.getReservationResources().get(0)
.getDuration() == duration);
// List reservations, search by very large start time.
request = ReservationListRequest.newInstance(
ReservationSystemTestUtil.reservationQ, "", Long.MAX_VALUE,
-1, false);
response = null;
try {
response = client.listReservations(request);
} catch (Exception e) {
Assert.fail(e.getMessage());
}
// List reservations, search by start time after the reservation
// end time.
request = ReservationListRequest.newInstance(
ReservationSystemTestUtil.reservationQ, "", deadline + duration,
deadline + 2 * duration, false);
response = null;
try {
response = client.listReservations(request);
} catch (Exception e) {
Assert.fail(e.getMessage());
}
// Ensure all reservations are filtered out.
Assert.assertNotNull(response);
Assert.assertEquals(response.getReservationAllocationState().size(), 0);
// List reservations, search by end time before the reservation start
// time.
request = ReservationListRequest.newInstance(
ReservationSystemTestUtil.reservationQ, "", 0, arrival -
duration, false);
response = null;
try {
response = client.listReservations(request);
} catch (Exception e) {
Assert.fail(e.getMessage());
}
// Ensure all reservations are filtered out.
Assert.assertNotNull(response);
Assert.assertEquals(response.getReservationAllocationState().size(), 0);
// List reservations, search by very small end time.
request = ReservationListRequest.newInstance(
ReservationSystemTestUtil.reservationQ, "", 0, 1, false);
response = null;
try {
response = client.listReservations(request);
} catch (Exception e) {
Assert.fail(e.getMessage());
}
// Ensure all reservations are filtered out.
Assert.assertNotNull(response);
Assert.assertEquals(response.getReservationAllocationState().size(), 0);
// Delete the reservation
ReservationDeleteRequest dRequest =
ReservationDeleteRequest.newInstance(reservationID);
@ -1244,6 +1403,20 @@ public class TestYarnClient {
}
Assert.assertNotNull(sResponse);
System.out.println("Delete reservation response: " + dResponse);
// List reservations, search by non-existent reservationID
request = ReservationListRequest.newInstance(
ReservationSystemTestUtil.reservationQ,
reservationID.toString(), -1, -1, false);
response = null;
try {
response = client.listReservations(request);
} catch (Exception e) {
Assert.fail(e.getMessage());
}
Assert.assertNotNull(response);
Assert.assertEquals(0, response.getReservationAllocationState().size());
} finally {
// clean-up
if (client != null) {

View File

@ -73,6 +73,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationListRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationListResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest;
@ -125,6 +127,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.RenewDelegationTokenRe
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.RenewDelegationTokenResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ReservationDeleteRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ReservationDeleteResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ReservationListRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ReservationListResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ReservationSubmissionRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ReservationSubmissionResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ReservationUpdateRequestPBImpl;
@ -490,6 +494,19 @@ public class ApplicationClientProtocolPBClientImpl implements ApplicationClientP
}
}
@Override
public ReservationListResponse listReservations(ReservationListRequest
request) throws YarnException, IOException {
YarnServiceProtos.ReservationListRequestProto requestProto =
((ReservationListRequestPBImpl) request).getProto();
try {
return new ReservationListResponsePBImpl(proxy.listReservations(null,
requestProto));
} catch (ServiceException e) {
RPCUtil.unwrapAndThrowException(e);
return null;
}
}
@Override
public GetNodesToLabelsResponse getNodeToLabels(

View File

@ -50,6 +50,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationListResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityResponse;
@ -97,10 +98,14 @@ import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.RenewDelegationTokenRe
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.RenewDelegationTokenResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ReservationDeleteRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ReservationDeleteResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ReservationListRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ReservationListResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ReservationSubmissionRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ReservationSubmissionResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ReservationUpdateRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ReservationUpdateResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SignalContainerRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SignalContainerResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.UpdateApplicationPriorityRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.UpdateApplicationPriorityResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SubmitApplicationRequestPBImpl;
@ -147,11 +152,11 @@ import org.apache.hadoop.yarn.proto.YarnServiceProtos.ReservationSubmissionReque
import org.apache.hadoop.yarn.proto.YarnServiceProtos.ReservationSubmissionResponseProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.ReservationUpdateRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.ReservationUpdateResponseProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.ReservationListRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.ReservationListResponseProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.SignalContainerResponseProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.UpdateApplicationPriorityRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.UpdateApplicationPriorityResponseProto;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SignalContainerRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SignalContainerResponsePBImpl;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.SubmitApplicationRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.SubmitApplicationResponseProto;
@ -488,6 +493,21 @@ public class ApplicationClientProtocolPBServiceImpl implements ApplicationClient
}
}
@Override
public ReservationListResponseProto listReservations(RpcController controller,
ReservationListRequestProto requestProto) throws ServiceException {
ReservationListRequestPBImpl request =
new ReservationListRequestPBImpl(requestProto);
try {
ReservationListResponse response = real.listReservations(request);
return ((ReservationListResponsePBImpl) response).getProto();
} catch (YarnException e) {
throw new ServiceException(e);
} catch (IOException e) {
throw new ServiceException(e);
}
}
@Override
public GetNodesToLabelsResponseProto getNodeToLabels(
RpcController controller, GetNodesToLabelsRequestProto proto)

View File

@ -0,0 +1,178 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.api.protocolrecords.impl.pb;
import com.google.protobuf.TextFormat;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationListRequest;
import org.apache.hadoop.yarn.proto.YarnServiceProtos
.ReservationListRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos
.ReservationListRequestProtoOrBuilder;
/**
* {@link ReservationListRequestPBImpl} implements the {@link
* ReservationListRequest} abstract class which captures the set of requirements
* the user has to list reservations.
*
* @see ReservationListRequest
*/
public class ReservationListRequestPBImpl extends
ReservationListRequest {
private ReservationListRequestProto proto = ReservationListRequestProto
.getDefaultInstance();
private ReservationListRequestProto.Builder builder = null;
private boolean viaProto = false;
public ReservationListRequestPBImpl() {
builder = ReservationListRequestProto.newBuilder();
}
public ReservationListRequestPBImpl(
ReservationListRequestProto proto) {
this.proto = proto;
viaProto = true;
}
public ReservationListRequestProto getProto() {
proto = viaProto ? proto : builder.build();
viaProto = true;
return proto;
}
@Override
public String getQueue() {
ReservationListRequestProtoOrBuilder p = viaProto ? proto : builder;
if (!p.hasQueue()) {
return null;
}
return (p.getQueue());
}
@Override
public void setQueue(String queue) {
maybeInitBuilder();
if (queue == null) {
builder.clearQueue();
return;
}
builder.setQueue(queue);
}
@Override
public String getReservationId() {
ReservationListRequestProtoOrBuilder p = viaProto ? proto : builder;
if (!p.hasReservationId()) {
return null;
}
return (p.getReservationId());
}
@Override
public void setReservationId(String reservationId) {
maybeInitBuilder();
if (reservationId == null) {
builder.clearReservationId();
return;
}
builder.setReservationId(reservationId);
}
@Override
public long getStartTime() {
ReservationListRequestProtoOrBuilder p = viaProto ? proto : builder;
if (!p.hasStartTime()) {
return 0;
}
return (p.getStartTime());
}
@Override
public void setStartTime(long startTime) {
maybeInitBuilder();
if (startTime <= 0) {
builder.clearStartTime();
return;
}
builder.setStartTime(startTime);
}
@Override
public long getEndTime() {
ReservationListRequestProtoOrBuilder p = viaProto ? proto : builder;
if (!p.hasEndTime()) {
return Long.MAX_VALUE;
}
return (p.getEndTime());
}
@Override
public void setEndTime(long endTime) {
maybeInitBuilder();
if (endTime < 0) {
builder.setEndTime(Long.MAX_VALUE);
return;
}
builder.setEndTime(endTime);
}
@Override
public boolean getIncludeResourceAllocations() {
ReservationListRequestProtoOrBuilder p = viaProto ? proto : builder;
if (!p.hasIncludeResourceAllocations()) {
return false;
}
return (p.getIncludeResourceAllocations());
}
@Override
public void setIncludeResourceAllocations(boolean
includeReservationAllocations) {
maybeInitBuilder();
builder.setIncludeResourceAllocations(includeReservationAllocations);
}
private void maybeInitBuilder() {
if (viaProto || builder == null) {
builder = ReservationListRequestProto.newBuilder(proto);
}
viaProto = false;
}
@Override
public String toString() {
return TextFormat.shortDebugString(getProto());
}
@Override
public boolean equals(Object other) {
if (other == null) {
return false;
}
if (other.getClass().isAssignableFrom(this.getClass())) {
return this.getProto().equals(this.getClass().cast(other).getProto());
}
return false;
}
@Override
public int hashCode() {
return getProto().hashCode();
}
}

View File

@ -0,0 +1,157 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.api.protocolrecords.impl.pb;
import com.google.protobuf.TextFormat;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationListResponse;
import org.apache.hadoop.yarn.api.records.ReservationAllocationState;
import org.apache.hadoop.yarn.api.records.impl.pb.ReservationAllocationStatePBImpl;
import org.apache.hadoop.yarn.proto.YarnProtos.ReservationAllocationStateProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.ReservationListResponseProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.ReservationListResponseProtoOrBuilder;
import java.util.ArrayList;
import java.util.List;
/**
* {@link ReservationListResponsePBImpl} is the implementation of the
* {@link ReservationListResponse} which captures the list of reservations
* that the user has queried.
*/
public class ReservationListResponsePBImpl extends
ReservationListResponse {
private ReservationListResponseProto proto = ReservationListResponseProto
.getDefaultInstance();
private ReservationListResponseProto.Builder builder = null;
private boolean viaProto = false;
private List<ReservationAllocationState> reservations;
public ReservationListResponsePBImpl() {
builder = ReservationListResponseProto.newBuilder();
}
public ReservationListResponsePBImpl(
ReservationListResponseProto proto) {
this.proto = proto;
viaProto = true;
}
public ReservationListResponseProto getProto() {
if (viaProto) {
mergeLocalToProto();
} else {
proto = builder.build();
}
viaProto = true;
return proto;
}
private void maybeInitBuilder() {
if (viaProto || builder == null) {
builder = ReservationListResponseProto.newBuilder(proto);
}
viaProto = false;
}
@Override
public List<ReservationAllocationState> getReservationAllocationState() {
initReservations();
mergeLocalToProto();
return this.reservations;
}
@Override
public void setReservationAllocationState(List<ReservationAllocationState>
newReservations) {
if (newReservations == null) {
builder.clearReservations();
return;
}
reservations = newReservations;
mergeLocalToProto();
}
private void mergeLocalToBuilder() {
if (this.reservations != null) {
int size = reservations.size();
builder.clearReservations();
for (int i = 0; i < size; i++) {
builder.addReservations(i, convertToProtoFormat(
reservations.get(i)
));
}
}
}
private void mergeLocalToProto() {
if (viaProto) {
maybeInitBuilder();
}
mergeLocalToBuilder();
proto = builder.build();
viaProto = true;
}
private ReservationAllocationStatePBImpl convertFromProtoFormat(
ReservationAllocationStateProto p) {
return new ReservationAllocationStatePBImpl(p);
}
private ReservationAllocationStateProto convertToProtoFormat(
ReservationAllocationState r) {
return ((ReservationAllocationStatePBImpl)r).getProto();
}
private void initReservations() {
if (this.reservations != null) {
return;
}
ReservationListResponseProtoOrBuilder p = viaProto ? proto : builder;
List<ReservationAllocationStateProto> reservationProtos =
p.getReservationsList();
reservations = new ArrayList<>();
for (ReservationAllocationStateProto r : reservationProtos) {
reservations.add(convertFromProtoFormat(r));
}
}
@Override
public String toString() {
return TextFormat.shortDebugString(getProto());
}
@Override
public boolean equals(Object other) {
if (other == null) {
return false;
}
if (other.getClass().isAssignableFrom(this.getClass())) {
return this.getProto().equals(this.getClass().cast(other).getProto());
}
return false;
}
@Override
public int hashCode() {
return getProto().hashCode();
}
}

View File

@ -0,0 +1,288 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.api.records.impl.pb;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.*;
import org.apache.hadoop.yarn.proto.YarnProtos.ReservationAllocationStateProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ReservationAllocationStateProtoOrBuilder;
import org.apache.hadoop.yarn.proto.YarnProtos.ReservationIdProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ReservationDefinitionProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceAllocationRequestProto;
import java.util.ArrayList;
import java.util.List;
/**
* {@code ReservationAllocationStatePBImpl} implements the {@link
* ReservationAllocationState} that represents the reservation that is
* made by a user.
*
* <p>
* It includes:
* <ul>
* <li>Duration of the reservation.</li>
* <li>Acceptance time of the duration.</li>
* <li>
* List of {@link ResourceAllocationRequest}, which includes the time
* interval, and capability of the allocation.
* {@code ResourceAllocationRequest} represents an allocation
* made for a reservation for the current state of the plan. This can be
* changed for reasons such as re-planning, but will always be subject to
* the constraints of the user contract as described by
* {@link ReservationDefinition}
* </li>
* <li>{@link ReservationId} of the reservation.</li>
* <li>{@link ReservationDefinition} used to make the reservation.</li>
* </ul>
*
* @see ResourceAllocationRequest
* @see ReservationId
* @see ReservationDefinition
*/
@Private
@Unstable
public class ReservationAllocationStatePBImpl extends
ReservationAllocationState {
private ReservationAllocationStateProto proto =
ReservationAllocationStateProto.getDefaultInstance();;
private ReservationAllocationStateProto.Builder builder = null;
private boolean viaProto = false;
private List<ResourceAllocationRequest> resourceAllocations = null;
private ReservationId reservationId = null;
private ReservationDefinition reservationDefinition = null;
public ReservationAllocationStatePBImpl() {
builder = ReservationAllocationStateProto.newBuilder();
}
public ReservationAllocationStatePBImpl(
ReservationAllocationStateProto proto) {
this.proto = proto;
viaProto = true;
}
public ReservationAllocationStateProto getProto() {
mergeLocalToProto();
proto = viaProto ? proto : builder.build();
viaProto = true;
return proto;
}
private void maybeInitBuilder() {
if (viaProto || builder == null) {
builder = ReservationAllocationStateProto.newBuilder(proto);
}
viaProto = false;
}
private void mergeLocalToBuilder() {
if (this.resourceAllocations != null) {
int size = resourceAllocations.size();
builder.clearAllocationRequests();
for (int i = 0; i < size; i++) {
builder.addAllocationRequests(i, convertToProtoFormat(
resourceAllocations.get(i)
));
}
}
if (this.reservationId != null) {
builder.setReservationId(convertToProtoFormat(this.reservationId));
}
if (this.reservationDefinition != null) {
builder.setReservationDefinition(convertToProtoFormat(this
.reservationDefinition));
}
}
private void mergeLocalToProto() {
if (viaProto) {
maybeInitBuilder();
}
mergeLocalToBuilder();
proto = builder.build();
viaProto = true;
}
@Override
public long getAcceptanceTime() {
ReservationAllocationStateProtoOrBuilder p = viaProto ? proto : builder;
if (!p.hasAcceptanceTime()) {
return 0;
}
return (p.getAcceptanceTime());
}
@Override
public void setAcceptanceTime(long acceptanceTime) {
maybeInitBuilder();
if (acceptanceTime <= 0) {
builder.clearAcceptanceTime();
return;
}
builder.setAcceptanceTime(acceptanceTime);
}
@Override
public String getUser() {
ReservationAllocationStateProtoOrBuilder p = viaProto ? proto : builder;
if (!p.hasUser()) {
return null;
}
return p.getUser();
}
@Override
public void setUser(String user) {
maybeInitBuilder();
if (user == null) {
builder.clearUser();
return;
}
builder.setUser(user);
}
@Override
public List<ResourceAllocationRequest>
getResourceAllocationRequests() {
initResourceAllocations();
return this.resourceAllocations;
}
@Override
public void setResourceAllocationRequests(
List<ResourceAllocationRequest> newResourceAllocations) {
maybeInitBuilder();
if (newResourceAllocations == null) {
builder.clearAllocationRequests();
}
this.resourceAllocations = newResourceAllocations;
}
@Override
public ReservationId getReservationId() {
ReservationAllocationStateProtoOrBuilder p = viaProto ? proto : builder;
if (this.reservationId != null) {
return this.reservationId;
}
this.reservationId = convertFromProtoFormat(p.getReservationId());
return this.reservationId;
}
@Override
public void setReservationId(ReservationId newReservationId) {
maybeInitBuilder();
if (newReservationId == null) {
builder.clearReservationId();
}
reservationId = newReservationId;
}
@Override
public ReservationDefinition getReservationDefinition() {
ReservationAllocationStateProtoOrBuilder p = viaProto ? proto : builder;
if (this.reservationDefinition != null) {
return this.reservationDefinition;
}
this.reservationDefinition = convertFromProtoFormat(
p.getReservationDefinition());
return this.reservationDefinition;
}
@Override
public void setReservationDefinition(ReservationDefinition
newReservationDefinition) {
maybeInitBuilder();
if (newReservationDefinition == null) {
builder.clearReservationDefinition();
}
reservationDefinition = newReservationDefinition;
}
private ResourceAllocationRequestPBImpl convertFromProtoFormat(
ResourceAllocationRequestProto p) {
return new ResourceAllocationRequestPBImpl(p);
}
private ReservationIdPBImpl convertFromProtoFormat(ReservationIdProto p) {
return new ReservationIdPBImpl(p);
}
private ReservationDefinitionPBImpl convertFromProtoFormat(
ReservationDefinitionProto p) {
return new ReservationDefinitionPBImpl(p);
}
private ResourceAllocationRequestProto convertToProtoFormat(
ResourceAllocationRequest p) {
return ((ResourceAllocationRequestPBImpl)p).getProto();
}
private ReservationIdProto convertToProtoFormat(ReservationId p) {
return ((ReservationIdPBImpl)p).getProto();
}
private ReservationDefinitionProto convertToProtoFormat(
ReservationDefinition p) {
return ((ReservationDefinitionPBImpl)p).getProto();
}
private void initResourceAllocations() {
if (this.resourceAllocations != null) {
return;
}
ReservationAllocationStateProtoOrBuilder p = viaProto ? proto : builder;
List<ResourceAllocationRequestProto> resourceAllocationProtos =
p.getAllocationRequestsList();
resourceAllocations = new ArrayList<>();
for (ResourceAllocationRequestProto r : resourceAllocationProtos) {
resourceAllocations.add(convertFromProtoFormat(r));
}
}
@Override
public String toString() {
return "{Acceptance Time: "
+ getAcceptanceTime() + ", User: " + getUser()
+ ", Resource Allocations: " + getResourceAllocationRequests()
+ ", Reservation Id: " + getReservationId()
+ ", Reservation Definition: " + getReservationDefinition() + "}";
}
@Override
public boolean equals(Object other) {
if (other == null) {
return false;
}
if (other.getClass().isAssignableFrom(this.getClass())) {
return this.getProto().equals(this.getClass().cast(other).getProto());
}
return false;
}
@Override
public int hashCode() {
return getProto().hashCode();
}
}

View File

@ -0,0 +1,188 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.api.records.impl.pb;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.ResourceAllocationRequest;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceAllocationRequestProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceAllocationRequestProtoOrBuilder;
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto;
/**
* {@code ResourceAllocationRequestPBImpl} which implements the
* {@link ResourceAllocationRequest} class which represents an allocation
* made for a reservation for the current state of the plan. This can be
* changed for reasons such as re-planning, but will always be subject to the
* constraints of the user contract as described by a
* {@code ReservationDefinition}
* {@link Resource}
*
* <p>
* It includes:
* <ul>
* <li>StartTime of the allocation.</li>
* <li>EndTime of the allocation.</li>
* <li>{@link Resource} reserved for the allocation.</li>
* </ul>
*
* @see Resource
*/
@Private
@Unstable
public class ResourceAllocationRequestPBImpl extends
ResourceAllocationRequest {
private ResourceAllocationRequestProto proto =
ResourceAllocationRequestProto.getDefaultInstance();
private ResourceAllocationRequestProto.Builder builder = null;
private boolean viaProto = false;
private Resource capability = null;
public ResourceAllocationRequestPBImpl() {
builder = ResourceAllocationRequestProto.newBuilder();
}
public ResourceAllocationRequestPBImpl(
ResourceAllocationRequestProto proto) {
this.proto = proto;
viaProto = true;
}
public ResourceAllocationRequestProto getProto() {
mergeLocalToProto();
proto = viaProto ? proto : builder.build();
viaProto = true;
return proto;
}
private void maybeInitBuilder() {
if (viaProto || builder == null) {
builder = ResourceAllocationRequestProto.newBuilder(proto);
}
viaProto = false;
}
@Override
public Resource getCapability() {
ResourceAllocationRequestProtoOrBuilder p = viaProto ? proto : builder;
if (this.capability != null) {
return this.capability;
}
if (!p.hasResource()) {
return null;
}
this.capability = convertFromProtoFormat(p.getResource());
return this.capability;
}
@Override
public void setCapability(Resource newCapability) {
maybeInitBuilder();
if (newCapability == null) {
builder.clearResource();
return;
}
capability = newCapability;
}
@Override
public long getStartTime() {
ResourceAllocationRequestProtoOrBuilder p = viaProto ? proto : builder;
if (!p.hasStartTime()) {
return 0;
}
return (p.getStartTime());
}
@Override
public void setStartTime(long startTime) {
maybeInitBuilder();
if (startTime <= 0) {
builder.clearStartTime();
return;
}
builder.setStartTime(startTime);
}
@Override
public long getEndTime() {
ResourceAllocationRequestProtoOrBuilder p = viaProto ? proto : builder;
if (!p.hasEndTime()) {
return 0;
}
return (p.getEndTime());
}
@Override
public void setEndTime(long endTime) {
maybeInitBuilder();
if (endTime <= 0) {
builder.clearEndTime();
return;
}
builder.setEndTime(endTime);
}
private ResourcePBImpl convertFromProtoFormat(ResourceProto p) {
return new ResourcePBImpl(p);
}
private ResourceProto convertToProtoFormat(Resource p) {
return ((ResourcePBImpl)p).getProto();
}
private void mergeLocalToBuilder() {
if (this.capability != null) {
builder.setResource(convertToProtoFormat(this.capability));
}
}
private void mergeLocalToProto() {
if (viaProto) {
maybeInitBuilder();
}
mergeLocalToBuilder();
proto = builder.build();
viaProto = true;
}
@Override
public String toString() {
return "{Resource: " + getCapability() + ", # Start Time: "
+ getStartTime() + ", End Time: " + getEndTime() + "}";
}
@Override
public boolean equals(Object other) {
if (other == null) {
return false;
}
if (other.getClass().isAssignableFrom(this.getClass())) {
return this.getProto().equals(this.getClass().cast(other).getProto());
}
return false;
}
@Override
public int hashCode() {
return getProto().hashCode();
}
}

View File

@ -94,6 +94,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.RenewDelegationTokenRe
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.RenewDelegationTokenResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ReservationDeleteRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ReservationDeleteResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ReservationListRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ReservationListResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ReservationSubmissionRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ReservationSubmissionResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ReservationUpdateRequestPBImpl;
@ -135,10 +137,12 @@ import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.QueueState;
import org.apache.hadoop.yarn.api.records.QueueStatistics;
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
import org.apache.hadoop.yarn.api.records.ReservationAllocationState;
import org.apache.hadoop.yarn.api.records.ReservationDefinition;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.ReservationRequest;
import org.apache.hadoop.yarn.api.records.ReservationRequests;
import org.apache.hadoop.yarn.api.records.ResourceAllocationRequest;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
import org.apache.hadoop.yarn.api.records.ResourceOption;
@ -282,6 +286,8 @@ import org.apache.hadoop.yarn.proto.YarnServiceProtos.RegisterApplicationMasterR
import org.apache.hadoop.yarn.proto.YarnServiceProtos.RegisterApplicationMasterResponseProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.ReservationDeleteRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.ReservationDeleteResponseProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.ReservationListRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.ReservationListResponseProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.ReservationSubmissionRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.ReservationSubmissionResponseProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.ReservationUpdateRequestProto;
@ -500,6 +506,8 @@ public class TestPBImplRecords {
generateByNewInstance(ReservationRequest.class);
generateByNewInstance(ReservationRequests.class);
generateByNewInstance(ReservationDefinition.class);
generateByNewInstance(ResourceAllocationRequest.class);
generateByNewInstance(ReservationAllocationState.class);
generateByNewInstance(ResourceUtilization.class);
generateByNewInstance(AMBlackListingRequest.class);
}
@ -1233,7 +1241,19 @@ public class TestPBImplRecords {
validatePBImplRecord(ReservationDeleteResponsePBImpl.class,
ReservationDeleteResponseProto.class);
}
@Test
public void testReservationListRequestPBImpl() throws Exception {
validatePBImplRecord(ReservationListRequestPBImpl.class,
ReservationListRequestProto.class);
}
@Test
public void testReservationListResponsePBImpl() throws Exception {
validatePBImplRecord(ReservationListResponsePBImpl.class,
ReservationListResponseProto.class);
}
@Test
public void testAddToClusterNodeLabelsRequestPBImpl() throws Exception {
validatePBImplRecord(AddToClusterNodeLabelsRequestPBImpl.class,

View File

@ -24,14 +24,11 @@ import java.util.HashMap;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import com.google.common.base.Strings;
import org.apache.commons.lang.NotImplementedException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest;
@ -80,6 +77,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationListRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationListResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest;
@ -90,6 +89,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityRequest;
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityResponse;
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
import org.apache.hadoop.yarn.api.records.AMCommand;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
@ -431,6 +432,13 @@ public class MockResourceManagerFacade implements
throw new NotImplementedException();
}
@Override
public ReservationListResponse listReservations(
ReservationListRequest request) throws YarnException,
IOException {
throw new NotImplementedException();
}
@Override
public ReservationUpdateResponse updateReservation(
ReservationUpdateRequest request) throws YarnException, IOException {

View File

@ -92,6 +92,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationListRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationListResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest;
@ -115,6 +117,7 @@ import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.ReservationAllocationState;
import org.apache.hadoop.yarn.api.records.ReservationDefinition;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.Resource;
@ -133,8 +136,11 @@ import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationAllocation;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationInputValidator;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationInterval;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystemUtil;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
@ -1319,6 +1325,44 @@ public class ClientRMService extends AbstractService implements
return response;
}
@Override
public ReservationListResponse listReservations(
ReservationListRequest requestInfo) throws YarnException, IOException {
// Check if reservation system is enabled
checkReservationSytem(AuditConstants.LIST_RESERVATION_REQUEST);
ReservationListResponse response =
recordFactory.newRecordInstance(ReservationListResponse.class);
Plan plan = rValidator.validateReservationListRequest(
reservationSystem, requestInfo);
boolean includeResourceAllocations = requestInfo
.getIncludeResourceAllocations();
String user = checkReservationACLs(requestInfo.getQueue(),
AuditConstants.LIST_RESERVATION_REQUEST);
ReservationId requestedId = null;
if (requestInfo.getReservationId() != null
&& !requestInfo.getReservationId().isEmpty()) {
requestedId = ReservationId.parseReservationId(requestInfo
.getReservationId());
}
long startTime = Math.max(requestInfo.getStartTime(), 0);
long endTime = requestInfo.getEndTime() <= -1? Long.MAX_VALUE : requestInfo
.getEndTime();
Set<ReservationAllocation> reservations = plan.getReservations(
requestedId, new ReservationInterval(startTime, endTime), user);
List<ReservationAllocationState> info =
ReservationSystemUtil.convertAllocationsToReservationInfo(
reservations, includeResourceAllocations);
response.setReservationAllocationState(info);
return response;
}
@Override
public GetNodesToLabelsResponse getNodeToLabels(
GetNodesToLabelsRequest request) throws YarnException, IOException {

View File

@ -71,6 +71,8 @@ public class RMAuditLogger {
public static final String SUBMIT_RESERVATION_REQUEST = "Submit Reservation Request";
public static final String UPDATE_RESERVATION_REQUEST = "Update Reservation Request";
public static final String DELETE_RESERVATION_REQUEST = "Delete Reservation Request";
public static final String LIST_RESERVATION_REQUEST = "List " +
"Reservation Request";
}
static String createSuccessLog(String user, String operation, String target,

View File

@ -52,7 +52,7 @@ import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.AMRM
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ApplicationAttemptStateDataProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ApplicationStateDataProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.EpochProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ReservationAllocationStateProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ReservationAllocationStateProto;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.records.Version;
import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl;

View File

@ -48,7 +48,7 @@ import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.AMRM
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.EpochProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ApplicationAttemptStateDataProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ApplicationStateDataProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ReservationAllocationStateProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ReservationAllocationStateProto;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.records.Version;
import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl;

View File

@ -31,7 +31,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ReservationAllocationStateProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ReservationAllocationStateProto;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.records.Version;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.AMRMTokenSecretManagerState;

View File

@ -24,7 +24,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.token.delegation.DelegationKey;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ReservationAllocationStateProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ReservationAllocationStateProto;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.records.Version;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.AMRMTokenSecretManagerState;

View File

@ -51,7 +51,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ReservationAllocationStateProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ReservationAllocationStateProto;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.records.Version;
import org.apache.hadoop.yarn.server.resourcemanager.RMFatalEvent;

View File

@ -18,7 +18,7 @@
package org.apache.hadoop.yarn.server.resourcemanager.recovery;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ReservationAllocationStateProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ReservationAllocationStateProto;
/**
* Event representing maintaining ReservationSystem state.

View File

@ -55,7 +55,7 @@ import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.AMRM
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ApplicationAttemptStateDataProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ApplicationStateDataProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.EpochProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ReservationAllocationStateProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ReservationAllocationStateProto;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.records.Version;
import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl;

View File

@ -28,7 +28,7 @@ import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ReservationAllocationStateProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ReservationAllocationStateProto;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;

View File

@ -346,7 +346,7 @@ public class InMemoryPlan implements Plan {
reservationTable.remove(reservation.getReservationId());
decrementAllocation(reservation);
LOG.info("Sucessfully deleted reservation: {} in plan.",
reservation.getReservationId());
reservation.getReservationId());
return true;
}
@ -412,30 +412,7 @@ public class InMemoryPlan implements Plan {
@Override
public Set<ReservationAllocation> getReservationsAtTime(long tick) {
ReservationInterval searchInterval =
new ReservationInterval(tick, Long.MAX_VALUE);
readLock.lock();
try {
SortedMap<ReservationInterval, Set<InMemoryReservationAllocation>> reservations =
currentReservations.headMap(searchInterval, true);
if (!reservations.isEmpty()) {
Set<ReservationAllocation> flattenedReservations =
new HashSet<ReservationAllocation>();
for (Set<InMemoryReservationAllocation> reservationEntries : reservations
.values()) {
for (InMemoryReservationAllocation reservation : reservationEntries) {
if (reservation.getEndTime() > tick) {
flattenedReservations.add(reservation);
}
}
}
return Collections.unmodifiableSet(flattenedReservations);
} else {
return Collections.emptySet();
}
} finally {
readLock.unlock();
}
return getReservations(null, new ReservationInterval(tick, tick), "");
}
@Override
@ -498,6 +475,50 @@ public class InMemoryPlan implements Plan {
}
}
@Override
public Set<ReservationAllocation> getReservations(ReservationId
reservationID, ReservationInterval interval, String user) {
if (reservationID != null) {
ReservationAllocation allocation = getReservationById(reservationID);
if (allocation == null){
return Collections.emptySet();
}
return Collections.singleton(allocation);
}
long startTime = interval == null? 0 : interval.getStartTime();
long endTime = interval == null? Long.MAX_VALUE : interval.getEndTime();
ReservationInterval searchInterval =
new ReservationInterval(endTime, Long.MAX_VALUE);
readLock.lock();
try {
SortedMap<ReservationInterval, Set<InMemoryReservationAllocation>>
reservations = currentReservations.headMap(searchInterval, true);
if (!reservations.isEmpty()) {
Set<ReservationAllocation> flattenedReservations =
new HashSet<>();
for (Set<InMemoryReservationAllocation> reservationEntries :
reservations.values()) {
for (InMemoryReservationAllocation res : reservationEntries) {
if (res.getEndTime() > startTime) {
if (user != null && !user.isEmpty()
&& !res.getUser().equals(user)) {
continue;
}
flattenedReservations.add(res);
}
}
}
return Collections.unmodifiableSet(flattenedReservations);
} else {
return Collections.emptySet();
}
} finally {
readLock.unlock();
}
}
@Override
public ReservationAllocation getReservationById(ReservationId reservationID) {
if (reservationID == null) {

View File

@ -31,6 +31,23 @@ import java.util.Set;
*/
public interface PlanView extends PlanContext {
/**
* Return a set of {@link ReservationAllocation} identified by the user who
* made the reservation.
*
* @param reservationID the unqiue id to identify the
* {@link ReservationAllocation}
* @param interval the time interval used to retrieve the reservation
* allocations from. Only reservations with start time no
* greater than the interval end time, and end time no less
* than the interval start time will be selected.
* @param user the user to retrieve the reservation allocation from.
* @return {@link ReservationAllocation} identified by the user who
* made the reservation
*/
Set<ReservationAllocation> getReservations(ReservationId
reservationID, ReservationInterval interval, String user);
/**
* Return a {@link ReservationAllocation} identified by its
* {@link ReservationId}

View File

@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.reservation;
import java.util.List;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationListRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest;
import org.apache.hadoop.yarn.api.records.ReservationDefinition;
@ -50,37 +51,24 @@ public class ReservationInputValidator {
private Plan validateReservation(ReservationSystem reservationSystem,
ReservationId reservationId, String auditConstant) throws YarnException {
String message = "";
// check if the reservation id is valid
if (reservationId == null) {
message =
String message =
"Missing reservation id."
+ " Please try again by specifying a reservation id.";
RMAuditLogger.logFailure("UNKNOWN", auditConstant,
"validate reservation input", "ClientRMService", message);
throw RPCUtil.getRemoteException(message);
}
String queueName = reservationSystem.getQueueForReservation(reservationId);
if (queueName == null) {
message =
"The specified reservation with ID: " + reservationId
+ " is unknown. Please try again with a valid reservation.";
RMAuditLogger.logFailure("UNKNOWN", auditConstant,
"validate reservation input", "ClientRMService", message);
throw RPCUtil.getRemoteException(message);
}
// check if the associated plan is valid
Plan plan = reservationSystem.getPlan(queueName);
if (plan == null) {
message =
"The specified reservation: " + reservationId
+ " is not associated with any valid plan."
+ " Please try again with a valid reservation.";
RMAuditLogger.logFailure("UNKNOWN", auditConstant,
"validate reservation input", "ClientRMService", message);
throw RPCUtil.getRemoteException(message);
}
return plan;
String queue = reservationSystem.getQueueForReservation(reservationId);
String nullQueueErrorMessage =
"The specified reservation with ID: " + reservationId
+ " is unknown. Please try again with a valid reservation.";
String nullPlanErrorMessage = "The specified reservation: " + reservationId
+ " is not associated with any valid plan."
+ " Please try again with a valid reservation.";
return getPlanFromQueue(reservationSystem, queue, auditConstant,
nullQueueErrorMessage, nullPlanErrorMessage);
}
private void validateReservationDefinition(ReservationId reservationId,
@ -169,6 +157,37 @@ public class ReservationInputValidator {
}
}
private Plan getPlanFromQueue(ReservationSystem reservationSystem, String
queue, String auditConstant) throws YarnException {
String nullQueueErrorMessage = "The queue is not specified."
+ " Please try again with a valid reservable queue.";
String nullPlanErrorMessage = "The specified queue: " + queue
+ " is not managed by reservation system."
+ " Please try again with a valid reservable queue.";
return getPlanFromQueue(reservationSystem, queue, auditConstant,
nullQueueErrorMessage, nullPlanErrorMessage);
}
private Plan getPlanFromQueue(ReservationSystem reservationSystem, String
queue, String auditConstant, String nullQueueErrorMessage,
String nullPlanErrorMessage) throws YarnException {
if (queue == null || queue.isEmpty()) {
RMAuditLogger.logFailure("UNKNOWN", auditConstant,
"validate reservation input", "ClientRMService",
nullQueueErrorMessage);
throw RPCUtil.getRemoteException(nullQueueErrorMessage);
}
// check if the associated plan is valid
Plan plan = reservationSystem.getPlan(queue);
if (plan == null) {
RMAuditLogger.logFailure("UNKNOWN", auditConstant,
"validate reservation input", "ClientRMService",
nullPlanErrorMessage);
throw RPCUtil.getRemoteException(nullPlanErrorMessage);
}
return plan;
}
/**
* Quick validation on the input to check some obvious fail conditions (fail
* fast) the input and returns the appropriate {@link Plan} associated with
@ -188,27 +207,9 @@ public class ReservationInputValidator {
ReservationSubmissionRequest request, ReservationId reservationId)
throws YarnException {
// Check if it is a managed queue
String queueName = request.getQueue();
if (queueName == null || queueName.isEmpty()) {
String errMsg =
"The queue to submit is not specified."
+ " Please try again with a valid reservable queue.";
RMAuditLogger.logFailure("UNKNOWN",
AuditConstants.SUBMIT_RESERVATION_REQUEST,
"validate reservation input", "ClientRMService", errMsg);
throw RPCUtil.getRemoteException(errMsg);
}
Plan plan = reservationSystem.getPlan(queueName);
if (plan == null) {
String errMsg =
"The specified queue: " + queueName
+ " is not managed by reservation system."
+ " Please try again with a valid reservable queue.";
RMAuditLogger.logFailure("UNKNOWN",
AuditConstants.SUBMIT_RESERVATION_REQUEST,
"validate reservation input", "ClientRMService", errMsg);
throw RPCUtil.getRemoteException(errMsg);
}
String queue = request.getQueue();
Plan plan = getPlanFromQueue(reservationSystem, queue,
AuditConstants.SUBMIT_RESERVATION_REQUEST);
validateReservationDefinition(reservationId,
request.getReservationDefinition(), plan,
AuditConstants.SUBMIT_RESERVATION_REQUEST);
@ -240,6 +241,38 @@ public class ReservationInputValidator {
return plan;
}
/**
* Quick validation on the input to check some obvious fail conditions (fail
* fast) the input and returns the appropriate {@link Plan} associated with
* the specified {@link Queue} or throws an exception message illustrating the
* details of any validation check failures.
*
* @param reservationSystem the {@link ReservationSystem} to validate against
* @param request the {@link ReservationListRequest} defining search
* parameters for reservations in the {@link ReservationSystem}
* that is being validated against.
* @return the {@link Plan} to list reservations of.
* @throws YarnException
*/
public Plan validateReservationListRequest(
ReservationSystem reservationSystem,
ReservationListRequest request)
throws YarnException {
String queue = request.getQueue();
if (request.getEndTime() < request.getStartTime()) {
String errorMessage = "The specified end time must be greater than " +
"the specified start time.";
RMAuditLogger.logFailure("UNKNOWN",
AuditConstants.LIST_RESERVATION_REQUEST,
"validate list reservation input", "ClientRMService",
errorMessage);
throw RPCUtil.getRemoteException(errorMessage);
}
// Check if it is a managed queue
return getPlanFromQueue(reservationSystem, queue,
AuditConstants.LIST_RESERVATION_REQUEST);
}
/**
* Quick validation on the input to check some obvious fail conditions (fail
* fast) the input and returns the appropriate {@link Plan} associated with
@ -258,5 +291,4 @@ public class ReservationInputValidator {
return validateReservation(reservationSystem, request.getReservationId(),
AuditConstants.DELETE_RESERVATION_REQUEST);
}
}

View File

@ -18,24 +18,29 @@
package org.apache.hadoop.yarn.server.resourcemanager.reservation;
import org.apache.hadoop.yarn.api.records.ReservationAllocationState;
import org.apache.hadoop.yarn.api.records.ReservationDefinition;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.ReservationRequest;
import org.apache.hadoop.yarn.api.records.ResourceAllocationRequest;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.impl.pb.ReservationDefinitionPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ReservationIdPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl;
import org.apache.hadoop.yarn.proto.YarnProtos;
import org.apache.hadoop.yarn.proto.YarnProtos.ReservationAllocationStateProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ReservationDefinitionProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ReservationIdProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceAllocationRequestProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ReservationAllocationStateProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ResourceAllocationRequestProto;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
* Simple helper class for static methods used to transform across
@ -70,7 +75,7 @@ public final class ReservationSystemUtil {
ReservationAllocationStateProto.Builder builder =
ReservationAllocationStateProto.newBuilder();
builder.setAcceptanceTimestamp(allocation.getAcceptanceTime());
builder.setAcceptanceTime(allocation.getAcceptanceTime());
builder.setContainsGangs(allocation.containsGangs());
builder.setStartTime(allocation.getStartTime());
builder.setEndTime(allocation.getEndTime());
@ -137,9 +142,9 @@ public final class ReservationSystemUtil {
}
public static InMemoryReservationAllocation toInMemoryAllocation(
String planName, ReservationId reservationId,
ReservationAllocationStateProto allocationState, Resource minAlloc,
ResourceCalculator planResourceCalculator) {
String planName, ReservationId reservationId,
ReservationAllocationStateProto allocationState, Resource minAlloc,
ResourceCalculator planResourceCalculator) {
ReservationDefinition definition =
convertFromProtoFormat(
allocationState.getReservationDefinition());
@ -152,4 +157,32 @@ public final class ReservationSystemUtil {
minAlloc, allocationState.getContainsGangs());
return allocation;
}
public static List<ReservationAllocationState>
convertAllocationsToReservationInfo(Set<ReservationAllocation> res,
boolean includeResourceAllocations) {
List<ReservationAllocationState> reservationInfo = new ArrayList<>();
Map<ReservationInterval, Resource> requests;
for (ReservationAllocation allocation : res) {
List<ResourceAllocationRequest> allocations = new ArrayList<>();
if (includeResourceAllocations) {
requests = allocation.getAllocationRequests();
for (Map.Entry<ReservationInterval, Resource> request :
requests.entrySet()) {
ReservationInterval interval = request.getKey();
allocations.add(ResourceAllocationRequest.newInstance(
interval.getStartTime(), interval.getEndTime(),
request.getValue()));
}
}
reservationInfo.add(ReservationAllocationState.newInstance(
allocation.getAcceptanceTime(), allocation.getUser(),
allocations, allocation.getReservationId(),
allocation.getReservationDefinition()));
}
return reservationInfo;
}
}

View File

@ -99,19 +99,3 @@ message RMDelegationTokenIdentifierDataProto {
optional YARNDelegationTokenIdentifierProto token_identifier = 1;
optional int64 renewDate = 2;
}
message ResourceAllocationRequestProto {
optional int64 start_time = 1;
optional int64 end_time = 2;
optional ResourceProto resource = 3;
}
message ReservationAllocationStateProto {
optional ReservationDefinitionProto reservation_definition = 1;
repeated ResourceAllocationRequestProto allocation_requests = 2;
optional int64 start_time = 3;
optional int64 end_time = 4;
optional string user = 5;
optional bool contains_gangs = 6;
optional int64 acceptance_timestamp = 7;
}

View File

@ -75,6 +75,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationListRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationListResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest;
@ -102,6 +104,7 @@ import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.ReservationDefinition;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.ReservationRequest;
import org.apache.hadoop.yarn.api.records.ReservationRequests;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
@ -1147,6 +1150,168 @@ public class TestClientRMService {
Assert.assertNotNull(sResponse);
LOG.info("Update reservation response: " + uResponse);
// List reservations, search by reservation ID
ReservationListRequest request =
ReservationListRequest.newInstance(
ReservationSystemTestUtil.reservationQ,
reservationID.toString(), -1, -1, false);
ReservationListResponse response = null;
try {
response = clientService.listReservations(request);
} catch (Exception e) {
Assert.fail(e.getMessage());
}
Assert.assertNotNull(response);
Assert.assertEquals(response.getReservationAllocationState().size(), 1);
Assert.assertEquals(response.getReservationAllocationState().get(0)
.getReservationId().getId(), reservationID.getId());
Assert.assertEquals(response.getReservationAllocationState().get(0)
.getResourceAllocationRequests().size(), 0);
// List reservations, search by time within reservation interval.
request = ReservationListRequest.newInstance(
ReservationSystemTestUtil.reservationQ, "", 1, Long.MAX_VALUE,
true);
response = null;
try {
response = clientService.listReservations(request);
} catch (Exception e) {
Assert.fail(e.getMessage());
}
Assert.assertNotNull(response);
Assert.assertEquals(1, response.getReservationAllocationState().size());
Assert.assertEquals(response.getReservationAllocationState().get(0)
.getReservationId().getId(), reservationID.getId());
// List reservations, search by invalid end time == -1.
request = ReservationListRequest.newInstance(
ReservationSystemTestUtil.reservationQ, "", 1, -1,
true);
response = null;
try {
response = clientService.listReservations(request);
} catch (Exception e) {
Assert.fail(e.getMessage());
}
Assert.assertNotNull(response);
Assert.assertEquals(1, response.getReservationAllocationState().size());
Assert.assertEquals(response.getReservationAllocationState().get(0)
.getReservationId().getId(), reservationID.getId());
// List reservations, search by invalid end time < -1.
request = ReservationListRequest.newInstance(
ReservationSystemTestUtil.reservationQ, "", 1, -10,
true);
response = null;
try {
response = clientService.listReservations(request);
} catch (Exception e) {
Assert.fail(e.getMessage());
}
Assert.assertNotNull(response);
Assert.assertEquals(1, response.getReservationAllocationState().size());
Assert.assertEquals(response.getReservationAllocationState().get(0)
.getReservationId().getId(), reservationID.getId());
// List reservations, search by time interval.
request = ReservationListRequest.newInstance(
ReservationSystemTestUtil.reservationQ, "", arrival +
duration/2, arrival + duration/2, true);
response = null;
try {
response = clientService.listReservations(request);
} catch (Exception e) {
Assert.fail(e.getMessage());
}
Assert.assertNotNull(response);
Assert.assertEquals(1, response.getReservationAllocationState().size());
Assert.assertEquals(response.getReservationAllocationState().get(0)
.getReservationId().getId(), reservationID.getId());
// Verify that the full resource allocations exist.
Assert.assertTrue(response.getReservationAllocationState().get(0)
.getResourceAllocationRequests().size() > 0);
// Verify that the full RDL is returned.
ReservationRequests reservationRequests = response
.getReservationAllocationState().get(0).getReservationDefinition()
.getReservationRequests();
Assert.assertTrue(reservationRequests.getInterpreter().toString()
.equals("R_ALL"));
Assert.assertTrue(reservationRequests.getReservationResources().get(0)
.getDuration() == duration);
// List reservations, search by a very large start time.
request = ReservationListRequest.newInstance(
ReservationSystemTestUtil.reservationQ, "", Long.MAX_VALUE,
-1, false);
response = null;
try {
response = clientService.listReservations(request);
} catch (Exception e) {
Assert.fail(e.getMessage());
}
// Ensure all reservations are filtered out.
Assert.assertNotNull(response);
Assert.assertEquals(0, response.getReservationAllocationState().size());
// List reservations, search by start time after the reservation
// end time.
request = ReservationListRequest.newInstance(
ReservationSystemTestUtil.reservationQ, "", deadline + duration,
deadline + 2 * duration, false);
response = null;
try {
response = clientService.listReservations(request);
} catch (Exception e) {
Assert.fail(e.getMessage());
}
// Ensure all reservations are filtered out.
Assert.assertNotNull(response);
Assert.assertEquals(response.getReservationAllocationState().size(), 0);
// List reservations, search by end time before the reservation start
// time.
request = ReservationListRequest.newInstance(
ReservationSystemTestUtil.reservationQ, "", 0, arrival -
duration, false);
response = null;
try {
response = clientService.listReservations(request);
} catch (Exception e) {
Assert.fail(e.getMessage());
}
// Ensure all reservations are filtered out.
Assert.assertNotNull(response);
Assert.assertEquals(response.getReservationAllocationState().size(), 0);
// List reservations, search by a very small end time.
request = ReservationListRequest.newInstance(
ReservationSystemTestUtil.reservationQ, "", 0, 1,
false);
response = null;
try {
response = clientService.listReservations(request);
} catch (Exception e) {
Assert.fail(e.getMessage());
}
// Ensure all reservations are filtered out.
Assert.assertNotNull(response);
Assert.assertEquals(response.getReservationAllocationState().size(), 0);
// Delete the reservation
ReservationDeleteRequest dRequest =
ReservationDeleteRequest.newInstance(reservationID);
@ -1159,6 +1324,20 @@ public class TestClientRMService {
Assert.assertNotNull(sResponse);
LOG.info("Delete reservation response: " + dResponse);
// List reservations, search by non-existent reservationID
request = ReservationListRequest.newInstance(
ReservationSystemTestUtil.reservationQ, reservationID.toString(),
-1, -1, false);
response = null;
try {
response = clientService.listReservations(request);
} catch (Exception e) {
Assert.fail(e.getMessage());
}
Assert.assertNotNull(response);
Assert.assertEquals(0, response.getReservationAllocationState().size());
// clean-up
rm.stop();
nm = null;

View File

@ -28,7 +28,7 @@ import org.apache.hadoop.yarn.api.records.ReservationDefinition;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ReservationAllocationStateProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ReservationAllocationStateProto;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationAllocation;

View File

@ -57,7 +57,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ReservationAllocationStateProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ReservationAllocationStateProto;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.records.Version;
@ -851,7 +851,7 @@ public class RMStateStoreTestBase {
ReservationAllocationStateProto actual) {
Assert.assertEquals(
expected.getAcceptanceTimestamp(), actual.getAcceptanceTimestamp());
expected.getAcceptanceTime(), actual.getAcceptanceTime());
Assert.assertEquals(expected.getStartTime(), actual.getStartTime());
Assert.assertEquals(expected.getEndTime(), actual.getEndTime());
Assert.assertEquals(expected.getContainsGangs(), actual.getContainsGangs());
@ -866,7 +866,7 @@ public class RMStateStoreTestBase {
ReservationAllocation expected,
ReservationAllocationStateProto actual) {
Assert.assertEquals(
expected.getAcceptanceTime(), actual.getAcceptanceTimestamp());
expected.getAcceptanceTime(), actual.getAcceptanceTime());
Assert.assertEquals(expected.getStartTime(), actual.getStartTime());
Assert.assertEquals(expected.getEndTime(), actual.getEndTime());
Assert.assertEquals(expected.containsGangs(), actual.getContainsGangs());

View File

@ -24,6 +24,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.yarn.api.records.ReservationDefinition;
import org.apache.hadoop.yarn.api.records.ReservationId;
@ -98,19 +99,11 @@ public class TestInMemoryPlan {
new InMemoryPlan(queueMetrics, policy, agent, totalCapacity, 1L,
resCalc, minAlloc, maxAlloc, planName, replanner, true, context);
ReservationId reservationID =
ReservationSystemTestUtil.getNewReservationId();
ReservationSystemTestUtil.getNewReservationId();
int[] alloc = { 10, 10, 10, 10, 10, 10 };
int start = 100;
Map<ReservationInterval, ReservationRequest> allocations =
generateAllocation(start, alloc, false);
ReservationDefinition rDef =
createSimpleReservationDefinition(start, start + alloc.length,
alloc.length, allocations.values());
Map<ReservationInterval, Resource> allocs =
ReservationSystemUtil.toResources(allocations);
ReservationAllocation rAllocation =
new InMemoryReservationAllocation(reservationID, rDef, user, planName,
start, start + alloc.length, allocs, resCalc, minAlloc);
ReservationAllocation rAllocation = createReservationAllocation
(reservationID, start, alloc);
Assert.assertNull(plan.getReservationById(reservationID));
try {
plan.addReservation(rAllocation, false);
@ -139,19 +132,11 @@ public class TestInMemoryPlan {
new InMemoryPlan(queueMetrics, policy, agent, totalCapacity, 1L,
resCalc, minAlloc, maxAlloc, planName, replanner, true, context);
ReservationId reservationID =
ReservationSystemTestUtil.getNewReservationId();
ReservationSystemTestUtil.getNewReservationId();
int[] alloc = {};
int start = 100;
Map<ReservationInterval, ReservationRequest> allocations =
new HashMap<ReservationInterval, ReservationRequest>();
ReservationDefinition rDef =
createSimpleReservationDefinition(start, start + alloc.length,
alloc.length, allocations.values());
Map<ReservationInterval, Resource> allocs = ReservationSystemUtil.toResources
(allocations);
ReservationAllocation rAllocation =
new InMemoryReservationAllocation(reservationID, rDef, user, planName,
start, start + alloc.length, allocs, resCalc, minAlloc);
ReservationAllocation rAllocation = createReservationAllocation
(reservationID, start, alloc);
Assert.assertNull(plan.getReservationById(reservationID));
try {
plan.addReservation(rAllocation, false);
@ -167,19 +152,11 @@ public class TestInMemoryPlan {
new InMemoryPlan(queueMetrics, policy, agent, totalCapacity, 1L,
resCalc, minAlloc, maxAlloc, planName, replanner, true, context);
ReservationId reservationID =
ReservationSystemTestUtil.getNewReservationId();
ReservationSystemTestUtil.getNewReservationId();
int[] alloc = { 10, 10, 10, 10, 10, 10 };
int start = 100;
Map<ReservationInterval, ReservationRequest> allocations =
generateAllocation(start, alloc, false);
ReservationDefinition rDef =
createSimpleReservationDefinition(start, start + alloc.length,
alloc.length, allocations.values());
Map<ReservationInterval, Resource> allocs = ReservationSystemUtil.toResources
(allocations);
ReservationAllocation rAllocation =
new InMemoryReservationAllocation(reservationID, rDef, user, planName,
start, start + alloc.length, allocs, resCalc, minAlloc);
ReservationAllocation rAllocation = createReservationAllocation
(reservationID, start, alloc);
Assert.assertNull(plan.getReservationById(reservationID));
try {
plan.addReservation(rAllocation, false);
@ -211,16 +188,8 @@ public class TestInMemoryPlan {
// First add a reservation
int[] alloc = { 10, 10, 10, 10, 10, 10 };
int start = 100;
Map<ReservationInterval, ReservationRequest> allocations =
generateAllocation(start, alloc, false);
ReservationDefinition rDef =
createSimpleReservationDefinition(start, start + alloc.length,
alloc.length, allocations.values());
Map<ReservationInterval, Resource> allocs = ReservationSystemUtil.toResources
(allocations);
ReservationAllocation rAllocation =
new InMemoryReservationAllocation(reservationID, rDef, user, planName,
start, start + alloc.length, allocs, resCalc, minAlloc);
ReservationAllocation rAllocation = createReservationAllocation
(reservationID, start, alloc);
Assert.assertNull(plan.getReservationById(reservationID));
try {
plan.addReservation(rAllocation, false);
@ -241,16 +210,8 @@ public class TestInMemoryPlan {
// Now update it
start = 110;
int[] updatedAlloc = { 0, 5, 10, 10, 5, 0 };
allocations = generateAllocation(start, updatedAlloc, true);
rDef =
createSimpleReservationDefinition(start, start + updatedAlloc.length,
updatedAlloc.length, allocations.values());
Map<ReservationInterval, Resource> updatedAllocs =
ReservationSystemUtil.toResources(allocations);
rAllocation =
new InMemoryReservationAllocation(reservationID, rDef, user, planName,
start, start + updatedAlloc.length, updatedAllocs, resCalc,
minAlloc);
rAllocation = createReservationAllocation(reservationID, start,
updatedAlloc, true);
try {
plan.updateReservation(rAllocation);
} catch (PlanningException e) {
@ -282,16 +243,8 @@ public class TestInMemoryPlan {
// Try to update a reservation without adding
int[] alloc = { 10, 10, 10, 10, 10, 10 };
int start = 100;
Map<ReservationInterval, ReservationRequest> allocations =
generateAllocation(start, alloc, false);
ReservationDefinition rDef =
createSimpleReservationDefinition(start, start + alloc.length,
alloc.length, allocations.values());
Map<ReservationInterval, Resource> allocs =
ReservationSystemUtil.toResources(allocations);
ReservationAllocation rAllocation =
new InMemoryReservationAllocation(reservationID, rDef, user, planName,
start, start + alloc.length, allocs, resCalc, minAlloc);
createReservationAllocation(reservationID, start, alloc);
Assert.assertNull(plan.getReservationById(reservationID));
try {
plan.updateReservation(rAllocation);
@ -314,16 +267,8 @@ public class TestInMemoryPlan {
ReservationSystemTestUtil.getNewReservationId();
int[] alloc = { 10, 10, 10, 10, 10, 10 };
int start = 100;
Map<ReservationInterval, ReservationRequest> allocations =
generateAllocation(start, alloc, true);
ReservationDefinition rDef =
createSimpleReservationDefinition(start, start + alloc.length,
alloc.length, allocations.values());
Map<ReservationInterval, Resource> allocs =
ReservationSystemUtil.toResources(allocations);
ReservationAllocation rAllocation =
new InMemoryReservationAllocation(reservationID, rDef, user, planName,
start, start + alloc.length, allocs, resCalc, minAlloc);
createReservationAllocation(reservationID, start, alloc, true);
Assert.assertNull(plan.getReservationById(reservationID));
try {
plan.addReservation(rAllocation, false);
@ -391,17 +336,8 @@ public class TestInMemoryPlan {
// First add a reservation
int[] alloc1 = { 10, 10, 10, 10, 10, 10 };
int start = 100;
Map<ReservationInterval, ReservationRequest> allocations1 =
generateAllocation(start, alloc1, false);
ReservationDefinition rDef1 =
createSimpleReservationDefinition(start, start + alloc1.length,
alloc1.length, allocations1.values());
Map<ReservationInterval, Resource> allocs1 =
ReservationSystemUtil.toResources(allocations1);
ReservationAllocation rAllocation =
new InMemoryReservationAllocation(reservationID1, rDef1, user,
planName, start, start + alloc1.length, allocs1, resCalc,
minAlloc);
createReservationAllocation(reservationID1, start, alloc1);
Assert.assertNull(plan.getReservationById(reservationID1));
try {
plan.addReservation(rAllocation, false);
@ -416,17 +352,8 @@ public class TestInMemoryPlan {
ReservationId reservationID2 =
ReservationSystemTestUtil.getNewReservationId();
int[] alloc2 = { 0, 5, 10, 5, 0 };
Map<ReservationInterval, ReservationRequest> allocations2 =
generateAllocation(start, alloc2, true);
ReservationDefinition rDef2 =
createSimpleReservationDefinition(start, start + alloc2.length,
alloc2.length, allocations2.values());
Map<ReservationInterval, Resource> allocs2 =
ReservationSystemUtil.toResources(allocations2);
rAllocation =
new InMemoryReservationAllocation(reservationID2, rDef2, user,
planName, start, start + alloc2.length, allocs2, resCalc,
minAlloc);
createReservationAllocation(reservationID2, start, alloc2, true);
Assert.assertNull(plan.getReservationById(reservationID2));
try {
plan.addReservation(rAllocation, false);
@ -482,6 +409,192 @@ public class TestInMemoryPlan {
}
}
@Test
public void testGetReservationsById() {
Plan plan =
new InMemoryPlan(queueMetrics, policy, agent, totalCapacity, 1L,
resCalc, minAlloc, maxAlloc, planName, replanner, true, context);
ReservationId reservationID =
ReservationSystemTestUtil.getNewReservationId();
int[] alloc = {10, 10, 10, 10, 10, 10};
int start = 100;
ReservationAllocation rAllocation = createReservationAllocation
(reservationID, start, alloc);
Assert.assertNull(plan.getReservationById(reservationID));
try {
plan.addReservation(rAllocation, false);
} catch (PlanningException e) {
Assert.fail(e.getMessage());
}
// Verify that get by reservation id works.
Set<ReservationAllocation> rAllocations =
plan.getReservations(reservationID, null, "");
Assert.assertTrue(rAllocations.size() == 1);
Assert.assertTrue(rAllocation.compareTo(
(ReservationAllocation) rAllocations.toArray()[0]) == 0);
// Verify that get by reservation id works even when time range
// and user is invalid.
ReservationInterval interval = new ReservationInterval(0, 0);
rAllocations = plan.getReservations(reservationID, interval, "invalid");
Assert.assertTrue(rAllocations.size() == 1);
Assert.assertTrue(rAllocation.compareTo(
(ReservationAllocation) rAllocations.toArray()[0]) == 0);
}
@Test
public void testGetReservationsByInvalidId() {
Plan plan =
new InMemoryPlan(queueMetrics, policy, agent, totalCapacity, 1L,
resCalc, minAlloc, maxAlloc, planName, replanner, true, context);
ReservationId reservationID =
ReservationSystemTestUtil.getNewReservationId();
int[] alloc = {10, 10, 10, 10, 10, 10};
int start = 100;
ReservationAllocation rAllocation = createReservationAllocation
(reservationID, start, alloc);
Assert.assertNull(plan.getReservationById(reservationID));
try {
plan.addReservation(rAllocation, false);
} catch (PlanningException e) {
Assert.fail(e.getMessage());
}
// If reservationId is null, then nothing is returned.
ReservationId invalidReservationID =
ReservationSystemTestUtil.getNewReservationId();
Set<ReservationAllocation> rAllocations =
plan.getReservations(invalidReservationID, null, "");
Assert.assertTrue(rAllocations.size() == 0);
}
@Test
public void testGetReservationsByTimeInterval() {
Plan plan =
new InMemoryPlan(queueMetrics, policy, agent, totalCapacity, 1L,
resCalc, minAlloc, maxAlloc, planName, replanner, true, context);
ReservationId reservationID =
ReservationSystemTestUtil.getNewReservationId();
int[] alloc = {10, 10, 10, 10, 10, 10};
int start = 100;
ReservationAllocation rAllocation = createReservationAllocation
(reservationID, start, alloc);
Assert.assertNull(plan.getReservationById(reservationID));
try {
plan.addReservation(rAllocation, false);
} catch (PlanningException e) {
Assert.fail(e.getMessage());
}
// Verify that get by time interval works if the selection interval
// completely overlaps with the allocation.
ReservationInterval interval = new ReservationInterval(rAllocation
.getStartTime(), rAllocation.getEndTime());
Set<ReservationAllocation> rAllocations =
plan.getReservations(null, interval, "");
Assert.assertTrue(rAllocations.size() == 1);
Assert.assertTrue(rAllocation.compareTo(
(ReservationAllocation) rAllocations.toArray()[0]) == 0);
// Verify that get by time interval works if the selection interval
// falls within the allocation
long duration = rAllocation.getEndTime() - rAllocation.getStartTime();
interval = new ReservationInterval(rAllocation.getStartTime() + duration
* (long)0.3, rAllocation.getEndTime() - duration * (long)0.3);
rAllocations = plan.getReservations(null, interval, "");
Assert.assertTrue(rAllocations.size() == 1);
Assert.assertTrue(rAllocation.compareTo(
(ReservationAllocation) rAllocations.toArray()[0]) == 0);
// Verify that get by time interval selects 1 allocation if the end
// time of the selection interval falls right at the start of the
// allocation.
interval = new ReservationInterval(0, rAllocation.getStartTime());
rAllocations = plan.getReservations(null, interval, "");
Assert.assertTrue(rAllocations.size() == 1);
Assert.assertTrue(rAllocation.compareTo(
(ReservationAllocation) rAllocations.toArray()[0]) == 0);
// Verify that get by time interval selects no reservations if the start
// time of the selection interval falls right at the end of the allocation.
interval = new ReservationInterval(rAllocation
.getEndTime(), Long.MAX_VALUE);
rAllocations = plan.getReservations(null, interval, "");
Assert.assertTrue(rAllocations.size() == 0);
// Verify that get by time interval selects no reservations if the
// selection interval and allocation interval do not overlap.
interval = new ReservationInterval(0, rAllocation.getStartTime() / 2);
rAllocations = plan.getReservations(null, interval, "");
Assert.assertTrue(rAllocations.size() == 0);
}
@Test
public void testGetReservationsAtTime() {
Plan plan =
new InMemoryPlan(queueMetrics, policy, agent, totalCapacity, 1L,
resCalc, minAlloc, maxAlloc, planName, replanner, true, context);
ReservationId reservationID =
ReservationSystemTestUtil.getNewReservationId();
int[] alloc = {10, 10, 10, 10, 10, 10};
int start = 100;
ReservationAllocation rAllocation = createReservationAllocation
(reservationID, start, alloc);
Assert.assertNull(plan.getReservationById(reservationID));
try {
plan.addReservation(rAllocation, false);
} catch (PlanningException e) {
Assert.fail(e.getMessage());
}
Set<ReservationAllocation> rAllocations =
plan.getReservationsAtTime(rAllocation.getStartTime());
Assert.assertTrue(rAllocations.size() == 1);
Assert.assertTrue(rAllocation.compareTo(
(ReservationAllocation) rAllocations.toArray()[0]) == 0);
}
@Test
public void testGetReservationsWithNoInput() {
Plan plan =
new InMemoryPlan(queueMetrics, policy, agent, totalCapacity, 1L,
resCalc, minAlloc, maxAlloc, planName, replanner, true, context);
ReservationId reservationID =
ReservationSystemTestUtil.getNewReservationId();
int[] alloc = {10, 10, 10, 10, 10, 10};
int start = 100;
ReservationAllocation rAllocation = createReservationAllocation
(reservationID, start, alloc);
Assert.assertNull(plan.getReservationById(reservationID));
try {
plan.addReservation(rAllocation, false);
} catch (PlanningException e) {
Assert.fail(e.getMessage());
}
// Verify that getReservations defaults to getting all reservations if no
// reservationID, time interval, and user is provided,
Set<ReservationAllocation> rAllocations =
plan.getReservations(null, null, "");
Assert.assertTrue(rAllocations.size() == 1);
Assert.assertTrue(rAllocation.compareTo(
(ReservationAllocation) rAllocations.toArray()[0]) == 0);
}
@Test
public void testGetReservationsWithNoReservation() {
Plan plan =
new InMemoryPlan(queueMetrics, policy, agent, totalCapacity, 1L,
resCalc, minAlloc, maxAlloc, planName, replanner, true, context);
// Verify that get reservation returns no entries if no queries are made.
ReservationInterval interval = new ReservationInterval(0, Long.MAX_VALUE);
Set<ReservationAllocation> rAllocations =
plan.getReservations(null, interval, "");
Assert.assertTrue(rAllocations.size() == 0);
}
private void doAssertions(Plan plan, ReservationAllocation rAllocation) {
ReservationId reservationID = rAllocation.getReservationId();
Assert.assertNotNull(plan.getReservationById(reservationID));
@ -528,4 +641,23 @@ public class TestInMemoryPlan {
return req;
}
private ReservationAllocation createReservationAllocation(ReservationId
reservationID, int start, int[] alloc) {
return createReservationAllocation(reservationID, start, alloc, false);
}
private ReservationAllocation createReservationAllocation(ReservationId
reservationID, int start, int[] alloc, boolean isStep) {
Map<ReservationInterval, ReservationRequest> allocations =
generateAllocation(start, alloc, isStep);
ReservationDefinition rDef =
createSimpleReservationDefinition(start, start + alloc.length,
alloc.length, allocations.values());
Map<ReservationInterval, Resource> allocs =
ReservationSystemUtil.toResources(allocations);
return new InMemoryReservationAllocation(reservationID, rDef, user,
planName,
start, start + alloc.length, allocs, resCalc, minAlloc);
}
}

View File

@ -29,9 +29,11 @@ import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationListRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ReservationDeleteRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ReservationListRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ReservationSubmissionRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ReservationUpdateRequestPBImpl;
import org.apache.hadoop.yarn.api.records.ReservationDefinition;
@ -105,7 +107,7 @@ public class TestReservationInputValidator {
}
@Test
public void testSubmitReservationDoesnotExist() {
public void testSubmitReservationDoesNotExist() {
ReservationSubmissionRequest request =
new ReservationSubmissionRequestPBImpl();
Plan plan = null;
@ -119,7 +121,7 @@ public class TestReservationInputValidator {
String message = e.getMessage();
Assert
.assertTrue(message
.equals("The queue to submit is not specified. Please try again with a valid reservable queue."));
.equals("The queue is not specified. Please try again with a valid reservable queue."));
LOG.info(message);
}
}
@ -523,6 +525,103 @@ public class TestReservationInputValidator {
}
}
@Test
public void testListReservationsNormal() {
ReservationListRequest request = new ReservationListRequestPBImpl();
request.setQueue(ReservationSystemTestUtil.reservationQ);
request.setEndTime(1000);
request.setStartTime(0);
when(rSystem.getPlan(ReservationSystemTestUtil.reservationQ)).thenReturn
(this.plan);
Plan plan = null;
try {
plan = rrValidator.validateReservationListRequest(rSystem, request);
} catch (YarnException e) {
Assert.fail(e.getMessage());
}
Assert.assertNotNull(plan);
}
@Test
public void testListReservationsInvalidTimeIntervalDefaults() {
ReservationListRequest request = new ReservationListRequestPBImpl();
request.setQueue(ReservationSystemTestUtil.reservationQ);
// Negative time gets converted to default values for Start Time and End
// Time which are 0 and Long.MAX_VALUE respectively.
request.setEndTime(-2);
request.setStartTime(-1);
when(rSystem.getPlan(ReservationSystemTestUtil.reservationQ)).thenReturn
(this.plan);
Plan plan = null;
try {
plan = rrValidator.validateReservationListRequest(rSystem, request);
} catch (YarnException e) {
Assert.fail(e.getMessage());
}
Assert.assertNotNull(plan);
}
@Test
public void testListReservationsInvalidTimeInterval() {
ReservationListRequest request = new ReservationListRequestPBImpl();
request.setQueue(ReservationSystemTestUtil.reservationQ);
request.setEndTime(1000);
request.setStartTime(2000);
when(rSystem.getPlan(ReservationSystemTestUtil.reservationQ)).thenReturn
(this.plan);
Plan plan = null;
try {
plan = rrValidator.validateReservationListRequest(rSystem, request);
Assert.fail();
} catch (YarnException e) {
Assert.assertNull(plan);
String message = e.getMessage();
Assert.assertTrue(message.equals("The specified end time must be " +
"greater than the specified start time."));
LOG.info(message);
}
}
@Test
public void testListReservationsEmptyQueue() {
ReservationListRequest request = new ReservationListRequestPBImpl();
request.setQueue("");
Plan plan = null;
try {
plan = rrValidator.validateReservationListRequest(rSystem, request);
Assert.fail();
} catch (YarnException e) {
Assert.assertNull(plan);
String message = e.getMessage();
Assert.assertTrue(message.equals(
"The queue is not specified. Please try again with a valid " +
"reservable queue."));
LOG.info(message);
}
}
@Test
public void testListReservationsNullPlan() {
ReservationListRequest request = new ReservationListRequestPBImpl();
request.setQueue(ReservationSystemTestUtil.reservationQ);
when(rSystem.getPlan(ReservationSystemTestUtil.reservationQ)).thenReturn
(null);
Plan plan = null;
try {
plan = rrValidator.validateReservationListRequest(rSystem, request);
Assert.fail();
} catch (YarnException e) {
Assert.assertNull(plan);
String message = e.getMessage();
Assert.assertTrue(message.equals(
"The specified queue: " + ReservationSystemTestUtil.reservationQ
+ " is not managed by reservation system."
+ " Please try again with a valid reservable queue."
));
LOG.info(message);
}
}
private ReservationSubmissionRequest createSimpleReservationSubmissionRequest(
int numRequests, int numContainers, long arrival, long deadline,
long duration) {

View File

@ -0,0 +1,134 @@
/*******************************************************************************
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*******************************************************************************/
package org.apache.hadoop.yarn.server.resourcemanager.reservation;
import static org.mockito.Mockito.mock;
import org.apache.hadoop.yarn.api.records.ReservationAllocationState;
import org.apache.hadoop.yarn.api.records.ReservationDefinition;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.junit.Assert;
import org.junit.Test;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Map;
public class TestReservationSystemUtil {
@Test
public void testConvertAllocationsToReservationInfo() {
long startTime = new Date().getTime();
long step = 10000;
int[] alloc = {10, 10, 10};
ReservationId id = ReservationSystemTestUtil.getNewReservationId();
ReservationAllocation allocation = createReservationAllocation(
startTime, startTime + 10 * step, step, alloc, id,
createResource(4000, 2));
List<ReservationAllocationState> infoList = ReservationSystemUtil
.convertAllocationsToReservationInfo(
Collections.singleton(allocation), true);
Assert.assertEquals(infoList.size(), 1);
Assert.assertEquals(infoList.get(0).getReservationId().toString(),
id.toString());
Assert.assertFalse(infoList.get(0).getResourceAllocationRequests()
.isEmpty());
}
@Test
public void testConvertAllocationsToReservationInfoNoAllocations() {
long startTime = new Date().getTime();
long step = 10000;
int[] alloc = {10, 10, 10};
ReservationId id = ReservationSystemTestUtil.getNewReservationId();
ReservationAllocation allocation = createReservationAllocation(
startTime, startTime + 10 * step, step, alloc, id,
createResource(4000, 2));
List<ReservationAllocationState> infoList = ReservationSystemUtil
.convertAllocationsToReservationInfo(
Collections.singleton(allocation), false);
Assert.assertEquals(infoList.size(), 1);
Assert.assertEquals(infoList.get(0).getReservationId().toString(),
id.toString());
Assert.assertTrue(infoList.get(0).getResourceAllocationRequests()
.isEmpty());
}
@Test
public void testConvertAllocationsToReservationInfoEmptyAllocations() {
long startTime = new Date().getTime();
long step = 10000;
int[] alloc = {};
ReservationId id = ReservationSystemTestUtil.getNewReservationId();
ReservationAllocation allocation = createReservationAllocation(
startTime, startTime + 10 * step, step, alloc, id,
createResource(4000, 2));
List<ReservationAllocationState> infoList = ReservationSystemUtil
.convertAllocationsToReservationInfo(
Collections.singleton(allocation), false);
Assert.assertEquals(infoList.size(), 1);
Assert.assertEquals(infoList.get(0).getReservationId().toString(),
id.toString());
Assert.assertTrue(infoList.get(0).getResourceAllocationRequests()
.isEmpty());
}
@Test
public void testConvertAllocationsToReservationInfoEmptySet() {
List<ReservationAllocationState> infoList = ReservationSystemUtil
.convertAllocationsToReservationInfo(
Collections.<ReservationAllocation>emptySet(), false);
Assert.assertEquals(infoList.size(), 0);
}
private ReservationAllocation createReservationAllocation(long startTime,
long deadline, long step, int[] alloc, ReservationId id, Resource
minAlloc) {
Map<ReservationInterval, Resource> allocations = ReservationSystemTestUtil
.generateAllocation(startTime, step, alloc);
ResourceCalculator rs = mock(ResourceCalculator.class);
ReservationDefinition definition = ReservationSystemTestUtil
.createSimpleReservationDefinition(startTime, deadline, step);
return new InMemoryReservationAllocation(id,
definition, "user", ReservationSystemTestUtil.reservationQ,
startTime, startTime + step, allocations, rs, minAlloc, false);
}
public Resource createResource(int memory, int vCores) {
Resource resource = new ResourcePBImpl();
resource.setMemory(memory);
resource.setVirtualCores(vCores);
return resource;
}
}