YARN-4248. REST API for submit/update/delete Reservations. (curino)

(cherry picked from commit c25a635459)
(cherry picked from commit 4ac1564418)
This commit is contained in:
= 2015-12-07 13:33:28 -08:00
parent 03e615ba55
commit 3205fe3da8
15 changed files with 1407 additions and 0 deletions

View File

@ -528,6 +528,8 @@ Release 2.8.0 - UNRELEASED
YARN-3456. Improve handling of incomplete TimelineEntities. (Varun Saxena
via rohithsharmaks)
YARN-4248. REST API for submit/update/delete Reservations. (curino)
OPTIMIZATIONS
YARN-3339. TestDockerContainerExecutor should pull a single image and not

View File

@ -98,6 +98,11 @@ import org.apache.hadoop.yarn.api.records.NodeLabel;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.ReservationDefinition;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.ReservationRequest;
import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter;
import org.apache.hadoop.yarn.api.records.ReservationRequests;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
@ -145,6 +150,19 @@ import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeToLabelsEntr
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeToLabelsEntryList;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeToLabelsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodesInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationDefinitionInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationDeleteRequestInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationDeleteResponseInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationRequestInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationRequestsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationSubmissionResponseInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationSubmissionRequestInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationUpdateRequestInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationUpdateResponseInfo;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerTypeInfo;
@ -1822,4 +1840,290 @@ public class RMWebServices {
}
return token;
}
/**
* Function to submit a Reservation to the RM.
*
* @param resContext provides information to construct the
* ReservationSubmissionRequest
* @param hsr the servlet request
* @return Response containing the status code
* @throws AuthorizationException
* @throws IOException
* @throws InterruptedException
*/
@POST
@Path("/reservation/submit")
@Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
@Consumes({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
public Response submitReservation(
ReservationSubmissionRequestInfo resContext,
@Context HttpServletRequest hsr) throws AuthorizationException,
IOException, InterruptedException {
init();
UserGroupInformation callerUGI = getCallerUserGroupInformation(hsr, true);
if (callerUGI == null) {
throw new AuthorizationException("Unable to obtain user name, "
+ "user not authenticated");
}
if (UserGroupInformation.isSecurityEnabled() && isStaticUser(callerUGI)) {
String msg = "The default static user cannot carry out this operation.";
return Response.status(Status.FORBIDDEN).entity(msg).build();
}
final ReservationSubmissionRequest reservation =
createReservationSubmissionRequest(resContext);
ReservationSubmissionResponseInfo resRespInfo;
try {
resRespInfo =
callerUGI.doAs(
new PrivilegedExceptionAction<ReservationSubmissionResponseInfo>() {
@Override
public ReservationSubmissionResponseInfo run()
throws IOException, YarnException {
ReservationSubmissionResponse tempRes =
rm.getClientRMService().submitReservation(reservation);
return new ReservationSubmissionResponseInfo(tempRes);
}
});
} catch (UndeclaredThrowableException ue) {
if (ue.getCause() instanceof YarnException) {
throw new BadRequestException(ue.getCause().getMessage());
}
LOG.info("Submit reservation request failed", ue);
throw ue;
}
return Response.status(Status.OK).entity(resRespInfo).build();
}
private ReservationSubmissionRequest createReservationSubmissionRequest(
ReservationSubmissionRequestInfo resContext) {
// defending against a couple of common submission format problems
if (resContext == null) {
throw new BadRequestException(
"Input ReservationSubmissionContext should not be null");
}
ReservationDefinitionInfo resInfo = resContext.getReservationDefinition();
if (resInfo == null) {
throw new BadRequestException(
"Input ReservationDefinition should not be null");
}
ReservationRequestsInfo resReqsInfo = resInfo.getReservationRequests();
if (resReqsInfo == null || resReqsInfo.getReservationRequest() == null
|| resReqsInfo.getReservationRequest().size() == 0) {
throw new BadRequestException("The ReservationDefinition should"
+ " contain at least one ReservationRequest");
}
ReservationRequestInterpreter[] values =
ReservationRequestInterpreter.values();
ReservationRequestInterpreter resInt =
values[resReqsInfo.getReservationRequestsInterpreter()];
List<ReservationRequest> list = new ArrayList<ReservationRequest>();
for (ReservationRequestInfo resReqInfo : resReqsInfo
.getReservationRequest()) {
ResourceInfo rInfo = resReqInfo.getCapability();
Resource capability =
Resource.newInstance(rInfo.getMemory(), rInfo.getvCores());
int numContainers = resReqInfo.getNumContainers();
int minConcurrency = resReqInfo.getMinConcurrency();
long duration = resReqInfo.getDuration();
ReservationRequest rr =
ReservationRequest.newInstance(capability, numContainers,
minConcurrency, duration);
list.add(rr);
}
ReservationRequests reqs = ReservationRequests.newInstance(list, resInt);
ReservationDefinition rDef =
ReservationDefinition.newInstance(resInfo.getArrival(),
resInfo.getDeadline(), reqs, resInfo.getReservationName());
ReservationSubmissionRequest request =
ReservationSubmissionRequest.newInstance(rDef, resContext.getQueue());
return request;
}
/**
* Function to update a Reservation to the RM.
*
* @param resContext provides information to construct the
* ReservationUpdateRequest
* @param hsr the servlet request
* @return Response containing the status code
* @throws AuthorizationException
* @throws IOException
* @throws InterruptedException
*/
@POST
@Path("/reservation/update")
@Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
@Consumes({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
public Response updateReservation(ReservationUpdateRequestInfo resContext,
@Context HttpServletRequest hsr) throws AuthorizationException,
IOException, InterruptedException {
init();
UserGroupInformation callerUGI = getCallerUserGroupInformation(hsr, true);
if (callerUGI == null) {
throw new AuthorizationException("Unable to obtain user name, "
+ "user not authenticated");
}
if (UserGroupInformation.isSecurityEnabled() && isStaticUser(callerUGI)) {
String msg = "The default static user cannot carry out this operation.";
return Response.status(Status.FORBIDDEN).entity(msg).build();
}
final ReservationUpdateRequest reservation =
createReservationUpdateRequest(resContext);
ReservationUpdateResponseInfo resRespInfo;
try {
resRespInfo =
callerUGI.doAs(
new PrivilegedExceptionAction<ReservationUpdateResponseInfo>() {
@Override
public ReservationUpdateResponseInfo run() throws IOException,
YarnException {
rm.getClientRMService().updateReservation(reservation);
return new ReservationUpdateResponseInfo();
}
});
} catch (UndeclaredThrowableException ue) {
if (ue.getCause() instanceof YarnException) {
throw new BadRequestException(ue.getCause().getMessage());
}
LOG.info("Update reservation request failed", ue);
throw ue;
}
return Response.status(Status.OK).entity(resRespInfo).build();
}
private ReservationUpdateRequest createReservationUpdateRequest(
ReservationUpdateRequestInfo resContext) throws IOException {
// defending against a couple of common submission format problems
if (resContext == null) {
throw new BadRequestException(
"Input ReservationSubmissionContext should not be null");
}
ReservationDefinitionInfo resInfo = resContext.getReservationDefinition();
if (resInfo == null) {
throw new BadRequestException(
"Input ReservationDefinition should not be null");
}
ReservationRequestsInfo resReqsInfo = resInfo.getReservationRequests();
if (resReqsInfo == null || resReqsInfo.getReservationRequest() == null
|| resReqsInfo.getReservationRequest().size() == 0) {
throw new BadRequestException("The ReservationDefinition should"
+ " contain at least one ReservationRequest");
}
if (resContext.getReservationId() == null) {
throw new BadRequestException(
"Update operations must specify an existing ReservaitonId");
}
ReservationRequestInterpreter[] values =
ReservationRequestInterpreter.values();
ReservationRequestInterpreter resInt =
values[resReqsInfo.getReservationRequestsInterpreter()];
List<ReservationRequest> list = new ArrayList<ReservationRequest>();
for (ReservationRequestInfo resReqInfo : resReqsInfo
.getReservationRequest()) {
ResourceInfo rInfo = resReqInfo.getCapability();
Resource capability =
Resource.newInstance(rInfo.getMemory(), rInfo.getvCores());
int numContainers = resReqInfo.getNumContainers();
int minConcurrency = resReqInfo.getMinConcurrency();
long duration = resReqInfo.getDuration();
ReservationRequest rr =
ReservationRequest.newInstance(capability, numContainers,
minConcurrency, duration);
list.add(rr);
}
ReservationRequests reqs = ReservationRequests.newInstance(list, resInt);
ReservationDefinition rDef =
ReservationDefinition.newInstance(resInfo.getArrival(),
resInfo.getDeadline(), reqs, resInfo.getReservationName());
ReservationUpdateRequest request =
ReservationUpdateRequest.newInstance(rDef, ReservationId
.parseReservationId(resContext.getReservationId()));
return request;
}
/**
* Function to delete a Reservation to the RM.
*
* @param resContext provides information to construct
* the ReservationDeleteRequest
* @param hsr the servlet request
* @return Response containing the status code
* @throws AuthorizationException
* @throws IOException
* @throws InterruptedException
*/
@POST
@Path("/reservation/delete")
@Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
@Consumes({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
public Response deleteReservation(ReservationDeleteRequestInfo resContext,
@Context HttpServletRequest hsr) throws AuthorizationException,
IOException, InterruptedException {
init();
UserGroupInformation callerUGI = getCallerUserGroupInformation(hsr, true);
if (callerUGI == null) {
throw new AuthorizationException("Unable to obtain user name, "
+ "user not authenticated");
}
if (UserGroupInformation.isSecurityEnabled() && isStaticUser(callerUGI)) {
String msg = "The default static user cannot carry out this operation.";
return Response.status(Status.FORBIDDEN).entity(msg).build();
}
final ReservationDeleteRequest reservation =
createReservationDeleteRequest(resContext);
ReservationDeleteResponseInfo resRespInfo;
try {
resRespInfo =
callerUGI.doAs(
new PrivilegedExceptionAction<ReservationDeleteResponseInfo>() {
@Override
public ReservationDeleteResponseInfo run() throws IOException,
YarnException {
rm.getClientRMService().deleteReservation(reservation);
return new ReservationDeleteResponseInfo();
}
});
} catch (UndeclaredThrowableException ue) {
if (ue.getCause() instanceof YarnException) {
throw new BadRequestException(ue.getCause().getMessage());
}
LOG.info("Update reservation request failed", ue);
throw ue;
}
return Response.status(Status.OK).entity(resRespInfo).build();
}
private ReservationDeleteRequest createReservationDeleteRequest(
ReservationDeleteRequestInfo resContext) throws IOException {
ReservationDeleteRequest request =
ReservationDeleteRequest.newInstance(ReservationId
.parseReservationId(resContext.getReservationId()));
return request;
}
}

View File

@ -0,0 +1,82 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.webapp.dao;
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlElement;
import javax.xml.bind.annotation.XmlRootElement;
/**
* Simple class that represent a reservation definition.
*/
@XmlRootElement(name = "reservation-definition")
@XmlAccessorType(XmlAccessType.FIELD)
public class ReservationDefinitionInfo {
@XmlElement(name = "arrival")
private long arrival;
@XmlElement(name = "deadline")
private long deadline;
@XmlElement(name = "reservation-requests")
private ReservationRequestsInfo reservationRequests;
@XmlElement(name = "reservation-name")
private String reservationName;
public ReservationDefinitionInfo() {
}
public long getArrival() {
return arrival;
}
public void setArrival(long arrival) {
this.arrival = arrival;
}
public long getDeadline() {
return deadline;
}
public void setDeadline(long deadline) {
this.deadline = deadline;
}
public ReservationRequestsInfo getReservationRequests() {
return reservationRequests;
}
public void setReservationRequests(
ReservationRequestsInfo reservationRequests) {
this.reservationRequests = reservationRequests;
}
public String getReservationName() {
return reservationName;
}
public void setReservationName(String reservationName) {
this.reservationName = reservationName;
}
}

View File

@ -0,0 +1,49 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.webapp.dao;
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlElement;
import javax.xml.bind.annotation.XmlRootElement;
/**
* Simple class represent the request of deleting a given reservation,
* selected by its id.
*/
@XmlRootElement(name = "reservation-delete-context")
@XmlAccessorType(XmlAccessType.FIELD)
public class ReservationDeleteRequestInfo {
@XmlElement(name = "reservation-id")
private String reservationId;
public ReservationDeleteRequestInfo() {
reservationId = null;
}
public String getReservationId() {
return reservationId;
}
public void setReservationId(String reservationId) {
this.reservationId = reservationId;
}
}

View File

@ -0,0 +1,36 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.webapp.dao;
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlRootElement;
/**
* Simple class that represent a reponse to a delete operation.
*/
@XmlRootElement(name = "reservation-delete-response")
@XmlAccessorType(XmlAccessType.FIELD)
public class ReservationDeleteResponseInfo {
public ReservationDeleteResponseInfo() {
}
}

View File

@ -0,0 +1,78 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.webapp.dao;
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlElement;
import javax.xml.bind.annotation.XmlRootElement;
/**
* Simple class representing a reservation request.
*/
@XmlRootElement(name = "reservation-definition")
@XmlAccessorType(XmlAccessType.FIELD)
public class ReservationRequestInfo {
@XmlElement(name = "capability")
private ResourceInfo capability;
@XmlElement(name = "min-concurrency")
private int minConcurrency;
@XmlElement(name = "num-containers")
private int numContainers;
@XmlElement(name = "duration")
private long duration;
public ReservationRequestInfo() {
}
public ResourceInfo getCapability() {
return capability;
}
public void setCapability(ResourceInfo capability) {
this.capability = capability;
}
public int getMinConcurrency() {
return minConcurrency;
}
public void setMinConcurrency(int minConcurrency) {
this.minConcurrency = minConcurrency;
}
public int getNumContainers() {
return numContainers;
}
public void setNumContainers(int numContainers) {
this.numContainers = numContainers;
}
public long getDuration() {
return duration;
}
public void setDuration(long duration) {
this.duration = duration;
}
}

View File

@ -0,0 +1,63 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.webapp.dao;
import java.util.ArrayList;
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlElement;
import javax.xml.bind.annotation.XmlRootElement;
/**
* Simple class representing a list of ReservationRequest and the
* interpreter which capture the semantic of this list (all/any/order).
*/
@XmlRootElement(name = "reservation-definition")
@XmlAccessorType(XmlAccessType.FIELD)
public class ReservationRequestsInfo {
@XmlElement(name = "reservation-request-interpreter")
private int reservationRequestsInterpreter;
@XmlElement(name = "reservation-request")
private ArrayList<ReservationRequestInfo> reservationRequest;
public ReservationRequestsInfo() {
}
public int getReservationRequestsInterpreter() {
return reservationRequestsInterpreter;
}
public void setReservationRequestsInterpreter(
int reservationRequestsInterpreter) {
this.reservationRequestsInterpreter = reservationRequestsInterpreter;
}
public ArrayList<ReservationRequestInfo> getReservationRequest() {
return reservationRequest;
}
public void setReservationRequest(
ArrayList<ReservationRequestInfo> reservationRequest) {
this.reservationRequest = reservationRequest;
}
}

View File

@ -0,0 +1,60 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.webapp.dao;
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlElement;
import javax.xml.bind.annotation.XmlRootElement;
/**
* Simple class to allow users to send information required to create an
* ReservationSubmissionContext which can then be used to submit a reservation.
*/
@XmlRootElement(name = "reservation-submission-context")
@XmlAccessorType(XmlAccessType.FIELD)
public class ReservationSubmissionRequestInfo {
@XmlElement(name = "queue")
private String queue;
@XmlElement(name = "reservation-definition")
private ReservationDefinitionInfo reservationDefinition;
public ReservationSubmissionRequestInfo() {
}
public String getQueue() {
return queue;
}
public void setQueue(String queue) {
this.queue = queue;
}
public ReservationDefinitionInfo getReservationDefinition() {
return reservationDefinition;
}
public void setReservationDefinition(
ReservationDefinitionInfo reservationDefinition) {
this.reservationDefinition = reservationDefinition;
}
}

View File

@ -0,0 +1,54 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.webapp.dao;
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlElement;
import javax.xml.bind.annotation.XmlRootElement;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse;
/**
* Simple class that represent a response to a reservation submission.
*/
@XmlRootElement(name = "reservation-submission-response")
@XmlAccessorType(XmlAccessType.FIELD)
public class ReservationSubmissionResponseInfo {
@XmlElement(name = "reservation-id")
private String reservationId;
public ReservationSubmissionResponseInfo() {
}
public ReservationSubmissionResponseInfo(
ReservationSubmissionResponse response) {
this.reservationId = response.getReservationId().toString();
}
public String getReservationId() {
return reservationId;
}
public void setReservationId(String reservationId) {
this.reservationId = reservationId;
}
}

View File

@ -0,0 +1,60 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.webapp.dao;
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlElement;
import javax.xml.bind.annotation.XmlRootElement;
/**
* Simple class to allow users to send information required to update an
* existing reservation.
*/
@XmlRootElement(name = "reservation-update-context")
@XmlAccessorType(XmlAccessType.FIELD)
public class ReservationUpdateRequestInfo {
@XmlElement(name = "reservation-id")
private String reservationId;
@XmlElement(name = "reservation-definition")
private ReservationDefinitionInfo reservationDefinition;
public ReservationUpdateRequestInfo() {
}
public String getReservationId() {
return reservationId;
}
public void setReservationId(String reservationId) {
this.reservationId = reservationId;
}
public ReservationDefinitionInfo getReservationDefinition() {
return reservationDefinition;
}
public void setReservationDefinition(
ReservationDefinitionInfo reservationDefinition) {
this.reservationDefinition = reservationDefinition;
}
}

View File

@ -0,0 +1,37 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.webapp.dao;
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlRootElement;
/**
* Simple class that represent the response to a reservation update
* request.
*/
@XmlRootElement(name = "reservation-update-response")
@XmlAccessorType(XmlAccessType.FIELD)
public class ReservationUpdateResponseInfo {
public ReservationUpdateResponseInfo() {
}
}

View File

@ -0,0 +1,517 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.webapp;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringReader;
import java.net.URL;
import java.util.Arrays;
import java.util.Collection;
import java.util.Enumeration;
import java.util.Properties;
import javax.servlet.FilterConfig;
import javax.servlet.ServletException;
import javax.ws.rs.core.MediaType;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.authentication.server.AuthenticationFilter;
import org.apache.hadoop.security.authentication.server.PseudoAuthenticationHandler;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationDeleteRequestInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationSubmissionRequestInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationUpdateRequestInfo;
import org.apache.hadoop.yarn.webapp.GenericExceptionHandler;
import org.apache.hadoop.yarn.webapp.JerseyTestBase;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.Singleton;
import com.google.inject.servlet.GuiceServletContextListener;
import com.google.inject.servlet.ServletModule;
import com.sun.jersey.api.client.ClientResponse;
import com.sun.jersey.api.client.ClientResponse.Status;
import com.sun.jersey.api.client.WebResource;
import com.sun.jersey.api.client.config.DefaultClientConfig;
import com.sun.jersey.api.json.JSONConfiguration;
import com.sun.jersey.api.json.JSONJAXBContext;
import com.sun.jersey.api.json.JSONUnmarshaller;
import com.sun.jersey.guice.spi.container.servlet.GuiceContainer;
import com.sun.jersey.test.framework.WebAppDescriptor;
@RunWith(Parameterized.class)
public class TestRMWebServicesReservation extends JerseyTestBase {
private static MockRM rm;
private static Injector injector;
private String webserviceUserName = "testuser";
private boolean setAuthFilter = false;
private static final String TEST_DIR = new File(System.getProperty(
"test.build.data", "/tmp")).getAbsolutePath();
private static final String FS_ALLOC_FILE = new File(TEST_DIR,
"test-fs-queues.xml").getAbsolutePath();
public static class GuiceServletConfig extends GuiceServletContextListener {
@Override
protected Injector getInjector() {
return injector;
}
}
/*
* Helper class to allow testing of RM web services which require
* authorization Add this class as a filter in the Guice injector for the
* MockRM
*/
@Singleton
public static class TestRMCustomAuthFilter extends AuthenticationFilter {
@Override
protected Properties getConfiguration(String configPrefix,
FilterConfig filterConfig) throws ServletException {
Properties props = new Properties();
Enumeration<?> names = filterConfig.getInitParameterNames();
while (names.hasMoreElements()) {
String name = (String) names.nextElement();
if (name.startsWith(configPrefix)) {
String value = filterConfig.getInitParameter(name);
props.put(name.substring(configPrefix.length()), value);
}
}
props.put(AuthenticationFilter.AUTH_TYPE, "simple");
props.put(PseudoAuthenticationHandler.ANONYMOUS_ALLOWED, "false");
return props;
}
}
private abstract class TestServletModule extends ServletModule {
public Configuration conf = new Configuration();
public abstract void configureScheduler();
@Override
protected void configureServlets() {
configureScheduler();
bind(JAXBContextResolver.class);
bind(RMWebServices.class);
bind(GenericExceptionHandler.class);
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS);
Configuration conf = new Configuration();
conf.setBoolean(YarnConfiguration.RM_RESERVATION_SYSTEM_ENABLE, true);
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS);
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
ResourceScheduler.class);
CapacitySchedulerConfiguration csconf =
new CapacitySchedulerConfiguration(conf);
String[] queues = { "default", "dedicated" };
csconf.setQueues("root", queues);
csconf.setCapacity("root.default", 50.0f);
csconf.setCapacity("root.dedicated", 50.0f);
csconf.setReservable("root.dedicated", true);
rm = new MockRM(csconf);
bind(ResourceManager.class).toInstance(rm);
if (setAuthFilter) {
filter("/*").through(TestRMCustomAuthFilter.class);
}
serve("/*").with(GuiceContainer.class);
}
}
private class CapTestServletModule extends TestServletModule {
@Override
public void configureScheduler() {
conf.set("yarn.resourcemanager.scheduler.class",
CapacityScheduler.class.getName());
}
}
private class FairTestServletModule extends TestServletModule {
@Override
public void configureScheduler() {
try {
PrintWriter out = new PrintWriter(new FileWriter(FS_ALLOC_FILE));
out.println("<?xml version=\"1.0\"?>");
out.println("<allocations>");
out.println("<queue name=\"root\">");
out.println(" <aclAdministerApps>someuser </aclAdministerApps>");
out.println(" <queue name=\"default\">");
out.println(" <aclAdministerApps>someuser </aclAdministerApps>");
out.println(" </queue>");
out.println(" <queue name=\"dedicated\">");
out.println(" <aclAdministerApps>someuser </aclAdministerApps>");
out.println(" </queue>");
out.println("</queue>");
out.println("</allocations>");
out.close();
} catch (IOException e) {
}
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, FS_ALLOC_FILE);
conf.set("yarn.resourcemanager.scheduler.class",
FairScheduler.class.getName());
}
}
private Injector getNoAuthInjectorCap() {
return Guice.createInjector(new CapTestServletModule() {
@Override
protected void configureServlets() {
setAuthFilter = false;
super.configureServlets();
}
});
}
private Injector getSimpleAuthInjectorCap() {
return Guice.createInjector(new CapTestServletModule() {
@Override
protected void configureServlets() {
setAuthFilter = true;
conf.setBoolean(YarnConfiguration.YARN_ACL_ENABLE, true);
// set the admin acls otherwise all users are considered admins
// and we can't test authorization
conf.setStrings(YarnConfiguration.YARN_ADMIN_ACL, "testuser1");
super.configureServlets();
}
});
}
private Injector getNoAuthInjectorFair() {
return Guice.createInjector(new FairTestServletModule() {
@Override
protected void configureServlets() {
setAuthFilter = false;
super.configureServlets();
}
});
}
private Injector getSimpleAuthInjectorFair() {
return Guice.createInjector(new FairTestServletModule() {
@Override
protected void configureServlets() {
setAuthFilter = true;
conf.setBoolean(YarnConfiguration.YARN_ACL_ENABLE, true);
// set the admin acls otherwise all users are considered admins
// and we can't test authorization
conf.setStrings(YarnConfiguration.YARN_ADMIN_ACL, "testuser1");
super.configureServlets();
}
});
}
@Parameters
public static Collection<Object[]> guiceConfigs() {
return Arrays.asList(new Object[][] { { 0 }, { 1 }, { 2 }, { 3 } });
}
@Before
@Override
public void setUp() throws Exception {
super.setUp();
}
public TestRMWebServicesReservation(int run) {
super(new WebAppDescriptor.Builder(
"org.apache.hadoop.yarn.server.resourcemanager.webapp")
.contextListenerClass(GuiceServletConfig.class)
.filterClass(com.google.inject.servlet.GuiceFilter.class)
.clientConfig(new DefaultClientConfig(JAXBContextResolver.class))
.contextPath("jersey-guice-filter").servletPath("/").build());
switch (run) {
case 0:
default:
// No Auth Capacity Scheduler
injector = getNoAuthInjectorCap();
break;
case 1:
// Simple Auth Capacity Scheduler
injector = getSimpleAuthInjectorCap();
break;
case 2:
// No Auth Fair Scheduler
injector = getNoAuthInjectorFair();
break;
case 3:
// Simple Auth Fair Scheduler
injector = getSimpleAuthInjectorFair();
break;
}
}
private boolean isAuthenticationEnabled() {
return setAuthFilter;
}
private WebResource constructWebResource(WebResource r, String... paths) {
WebResource rt = r;
for (String path : paths) {
rt = rt.path(path);
}
if (isAuthenticationEnabled()) {
rt = rt.queryParam("user.name", webserviceUserName);
}
return rt;
}
private WebResource constructWebResource(String... paths) {
WebResource r = resource();
WebResource ws = r.path("ws").path("v1").path("cluster");
return this.constructWebResource(ws, paths);
}
@After
@Override
public void tearDown() throws Exception {
if (rm != null) {
rm.stop();
}
super.tearDown();
}
@Test
public void testSubmitReservation() throws JSONException, Exception {
rm.start();
for (int i = 0; i < 100; i++) {
MockNM amNodeManager =
rm.registerNode("127.0.0." + i + ":1234", 100 * 1024);
amNodeManager.nodeHeartbeat(true);
}
ReservationId rid =
testSubmissionReservationHelper("reservation/submit",
MediaType.APPLICATION_JSON);
if (this.isAuthenticationEnabled()) {
assertNotNull(rid);
}
rm.stop();
}
@Test
public void testFailedSubmitReservation() throws JSONException, Exception {
rm.start();
// setup a cluster too small to accept the reservation
for (int i = 0; i < 1; i++) {
MockNM amNodeManager =
rm.registerNode("127.0.0." + i + ":1234", 100 * 1024);
amNodeManager.nodeHeartbeat(true);
}
ReservationId rid =
testSubmissionReservationHelper("reservation/submit",
MediaType.APPLICATION_JSON);
assertNull(rid);
rm.stop();
}
@Test
public void testUpdateReservation() throws JSONException, Exception {
rm.start();
for (int i = 0; i < 100; i++) {
MockNM amNodeManager =
rm.registerNode("127.0.0." + i + ":1234", 100 * 1024);
amNodeManager.nodeHeartbeat(true);
}
ReservationId rid =
testSubmissionReservationHelper("reservation/submit",
MediaType.APPLICATION_JSON);
if (this.isAuthenticationEnabled()) {
assertNotNull(rid);
}
testUpdateReservationHelper("reservation/update", rid,
MediaType.APPLICATION_JSON);
rm.stop();
}
@Test
public void testDeleteReservation() throws JSONException, Exception {
rm.start();
for (int i = 0; i < 100; i++) {
MockNM amNodeManager =
rm.registerNode("127.0.0." + i + ":1234", 100 * 1024);
amNodeManager.nodeHeartbeat(true);
}
ReservationId rid =
testSubmissionReservationHelper("reservation/submit",
MediaType.APPLICATION_JSON);
if (this.isAuthenticationEnabled()) {
assertNotNull(rid);
}
testDeleteReservationHelper("reservation/delete", rid,
MediaType.APPLICATION_JSON);
rm.stop();
}
private ReservationId testSubmissionReservationHelper(String path,
String media) throws JSONException, Exception {
String reservationJson = loadJsonFile("submit-reservation.json");
JSONJAXBContext jc =
new JSONJAXBContext(JSONConfiguration.mapped()
.build(), ReservationSubmissionRequestInfo.class);
JSONUnmarshaller unmarshaller = jc.createJSONUnmarshaller();
ReservationSubmissionRequestInfo rsci =
unmarshaller.unmarshalFromJSON(new StringReader(reservationJson),
ReservationSubmissionRequestInfo.class);
Thread.sleep(1000);
ClientResponse response =
constructWebResource(path).entity(rsci, MediaType.APPLICATION_JSON)
.accept(media).post(ClientResponse.class);
if (!this.isAuthenticationEnabled()) {
assertEquals(Status.UNAUTHORIZED, response.getClientResponseStatus());
return null;
}
System.out.println("RESPONSE:" + response);
assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
JSONObject json = response.getEntity(JSONObject.class);
assertEquals("incorrect number of elements", 1, json.length());
ReservationId rid = null;
try {
rid = ReservationId.parseReservationId(json.getString("reservation-id"));
assertEquals("incorrect return value", rid.getId(), 1);
} catch (JSONException j) {
// failure is possible and is checked outside
}
return rid;
}
private void testUpdateReservationHelper(String path,
ReservationId reservationId, String media) throws JSONException,
Exception {
String reservationJson = loadJsonFile("update-reservation.json");
JSONJAXBContext jc =
new JSONJAXBContext(JSONConfiguration.mapped()
.build(), ReservationUpdateRequestInfo.class);
JSONUnmarshaller unmarshaller = jc.createJSONUnmarshaller();
ReservationUpdateRequestInfo rsci =
unmarshaller.unmarshalFromJSON(new StringReader(reservationJson),
ReservationUpdateRequestInfo.class);
if (this.isAuthenticationEnabled()) {
// only works when previous submit worked
if(rsci.getReservationId() == null) {
throw new IOException("Incorrectly parsed the reservatinId");
}
rsci.setReservationId(reservationId.toString());
}
Thread.sleep(1000);
ClientResponse response =
constructWebResource(path).entity(rsci, MediaType.APPLICATION_JSON)
.accept(media).post(ClientResponse.class);
if (!this.isAuthenticationEnabled()) {
assertEquals(Status.UNAUTHORIZED, response.getClientResponseStatus());
return;
}
System.out.println("RESPONSE:" + response);
assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
assertEquals(Status.OK, response.getClientResponseStatus());
}
private String loadJsonFile(String filename) throws IOException {
ClassLoader cL = Thread.currentThread().getContextClassLoader();
if (cL == null) {
cL = Configuration.class.getClassLoader();
}
URL submitURI = cL.getResource(filename);
String reservationJson =
FileUtils.readFileToString(new File(submitURI.getFile()));
return reservationJson;
}
private void testDeleteReservationHelper(String path,
ReservationId reservationId, String media) throws JSONException,
Exception {
String reservationJson = loadJsonFile("delete-reservation.json");
JSONJAXBContext jc =
new JSONJAXBContext(JSONConfiguration.mapped()
.build(), ReservationDeleteRequestInfo.class);
JSONUnmarshaller unmarshaller = jc.createJSONUnmarshaller();
ReservationDeleteRequestInfo rsci =
unmarshaller.unmarshalFromJSON(new StringReader(reservationJson),
ReservationDeleteRequestInfo.class);
if (this.isAuthenticationEnabled()) {
// only works when previous submit worked
if(rsci.getReservationId() == null) {
throw new IOException("Incorrectly parsed the reservatinId");
}
rsci.setReservationId(reservationId.toString());
}
Thread.sleep(1000);
ClientResponse response =
constructWebResource(path).entity(rsci, MediaType.APPLICATION_JSON)
.accept(media).post(ClientResponse.class);
if (!this.isAuthenticationEnabled()) {
assertEquals(Status.UNAUTHORIZED, response.getClientResponseStatus());
return;
}
System.out.println("RESPONSE:" + response);
assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
assertEquals(Status.OK, response.getClientResponseStatus());
}
}

View File

@ -0,0 +1,3 @@
{
"reservation-id" : "reservation_12341234_1"
}

View File

@ -0,0 +1,31 @@
{
"queue" : "dedicated",
"reservation-definition" : {
"arrival" : 1765541532000,
"deadline" : 1765542252000,
"reservation-name" : "res_1",
"reservation-requests" : {
"reservation-request-interpreter" : 0,
"reservation-request" : [
{
"duration" : 60,
"num-containers" : 220,
"min-concurrency" : 220,
"capability" : {
"memory" : 1024,
"vCores" : 1
}
},
{
"duration" : 120,
"num-containers" : 110,
"min-concurrency" : 110,
"capability" : {
"memory" : 1024,
"vCores" : 1
}
}
]
}
}
}

View File

@ -0,0 +1,31 @@
{
"reservation-id" : "reservation_12341234_1",
"reservation-definition" : {
"arrival" : 1765541532000,
"deadline" : 1765542252000,
"reservation-name" : "res_1",
"reservation-requests" : {
"reservation-request-interpreter" : 0,
"reservation-request" : [
{
"duration" : 60,
"num-containers" : 100,
"min-concurrency" : 1,
"capability" : {
"memory" : 1024,
"vCores" : 1
}
},
{
"duration" : 120,
"num-containers" : 40,
"min-concurrency" : 1,
"capability" : {
"memory" : 1024,
"vCores" : 1
}
}
]
}
}
}