YARN-5552. Add Builder methods for common yarn API records. (Tao Jie via wangda)

This commit is contained in:
Wangda Tan 2016-11-11 13:34:56 -08:00
parent aa6010ccca
commit ede1a473f5
8 changed files with 657 additions and 95 deletions

View File

@ -66,8 +66,10 @@ public abstract class AllocateRequest {
List<ResourceRequest> resourceAsk,
List<ContainerId> containersToBeReleased,
ResourceBlacklistRequest resourceBlacklistRequest) {
return newInstance(responseID, appProgress, resourceAsk,
containersToBeReleased, resourceBlacklistRequest, null);
return AllocateRequest.newBuilder().responseId(responseID)
.progress(appProgress).askList(resourceAsk)
.releaseList(containersToBeReleased)
.resourceBlacklistRequest(resourceBlacklistRequest).build();
}
@Public
@ -77,14 +79,12 @@ public abstract class AllocateRequest {
List<ContainerId> containersToBeReleased,
ResourceBlacklistRequest resourceBlacklistRequest,
List<UpdateContainerRequest> updateRequests) {
AllocateRequest allocateRequest = Records.newRecord(AllocateRequest.class);
allocateRequest.setResponseId(responseID);
allocateRequest.setProgress(appProgress);
allocateRequest.setAskList(resourceAsk);
allocateRequest.setReleaseList(containersToBeReleased);
allocateRequest.setResourceBlacklistRequest(resourceBlacklistRequest);
allocateRequest.setUpdateRequests(updateRequests);
return allocateRequest;
return AllocateRequest.newBuilder().responseId(responseID)
.progress(appProgress).askList(resourceAsk)
.releaseList(containersToBeReleased)
.resourceBlacklistRequest(resourceBlacklistRequest)
.updateRequests(updateRequests)
.build();
}
/**
@ -211,4 +211,116 @@ public abstract class AllocateRequest {
@Unstable
public abstract void setUpdateRequests(
List<UpdateContainerRequest> updateRequests);
@Public
@Unstable
public static AllocateRequestBuilder newBuilder() {
return new AllocateRequestBuilder();
}
/**
* Class to construct instances of {@link AllocateRequest} with specific
* options.
*/
@Public
@Stable
public static final class AllocateRequestBuilder {
private AllocateRequest allocateRequest =
Records.newRecord(AllocateRequest.class);
private AllocateRequestBuilder() {
}
/**
* Set the <code>responseId</code> of the request.
* @see AllocateRequest#setResponseId(int)
* @param responseId <code>responseId</code> of the request
* @return {@link AllocateRequestBuilder}
*/
@Public
@Stable
public AllocateRequestBuilder responseId(int responseId) {
allocateRequest.setResponseId(responseId);
return this;
}
/**
* Set the <code>progress</code> of the request.
* @see AllocateRequest#setProgress(float)
* @param progress <code>progress</code> of the request
* @return {@link AllocateRequestBuilder}
*/
@Public
@Stable
public AllocateRequestBuilder progress(float progress) {
allocateRequest.setProgress(progress);
return this;
}
/**
* Set the <code>askList</code> of the request.
* @see AllocateRequest#setAskList(List)
* @param askList <code>askList</code> of the request
* @return {@link AllocateRequestBuilder}
*/
@Public
@Stable
public AllocateRequestBuilder askList(List<ResourceRequest> askList) {
allocateRequest.setAskList(askList);
return this;
}
/**
* Set the <code>releaseList</code> of the request.
* @see AllocateRequest#setReleaseList(List)
* @param releaseList <code>releaseList</code> of the request
* @return {@link AllocateRequestBuilder}
*/
@Public
@Stable
public AllocateRequestBuilder releaseList(List<ContainerId> releaseList) {
allocateRequest.setReleaseList(releaseList);
return this;
}
/**
* Set the <code>resourceBlacklistRequest</code> of the request.
* @see AllocateRequest#setResourceBlacklistRequest(
* ResourceBlacklistRequest)
* @param resourceBlacklistRequest
* <code>resourceBlacklistRequest</code> of the request
* @return {@link AllocateRequestBuilder}
*/
@Public
@Stable
public AllocateRequestBuilder resourceBlacklistRequest(
ResourceBlacklistRequest resourceBlacklistRequest) {
allocateRequest.setResourceBlacklistRequest(resourceBlacklistRequest);
return this;
}
/**
* Set the <code>updateRequests</code> of the request.
* @see AllocateRequest#setUpdateRequests(List)
* @param updateRequests <code>updateRequests</code> of the request
* @return {@link AllocateRequestBuilder}
*/
@Public
@Unstable
public AllocateRequestBuilder updateRequests(
List<UpdateContainerRequest> updateRequests) {
allocateRequest.setUpdateRequests(updateRequests);
return this;
}
/**
* Return generated {@link AllocateRequest} object.
* @return {@link AllocateRequest}
*/
@Public
@Stable
public AllocateRequest build() {
return allocateRequest;
}
}
}

View File

@ -84,17 +84,12 @@ public abstract class AllocateResponse {
List<Container> allocatedContainers, List<NodeReport> updatedNodes,
Resource availResources, AMCommand command, int numClusterNodes,
PreemptionMessage preempt, List<NMToken> nmTokens) {
AllocateResponse response = Records.newRecord(AllocateResponse.class);
response.setNumClusterNodes(numClusterNodes);
response.setResponseId(responseId);
response.setCompletedContainersStatuses(completedContainers);
response.setAllocatedContainers(allocatedContainers);
response.setUpdatedNodes(updatedNodes);
response.setAvailableResources(availResources);
response.setAMCommand(command);
response.setPreemptionMessage(preempt);
response.setNMTokens(nmTokens);
return response;
return AllocateResponse.newBuilder().numClusterNodes(numClusterNodes)
.responseId(responseId)
.completedContainersStatuses(completedContainers)
.allocatedContainers(allocatedContainers).updatedNodes(updatedNodes)
.availableResources(availResources).amCommand(command)
.preemptionMessage(preempt).nmTokens(nmTokens).build();
}
@Public
@ -105,11 +100,13 @@ public abstract class AllocateResponse {
Resource availResources, AMCommand command, int numClusterNodes,
PreemptionMessage preempt, List<NMToken> nmTokens,
List<UpdatedContainer> updatedContainers) {
AllocateResponse response = newInstance(responseId, completedContainers,
allocatedContainers, updatedNodes, availResources, command,
numClusterNodes, preempt, nmTokens);
response.setUpdatedContainers(updatedContainers);
return response;
return AllocateResponse.newBuilder().numClusterNodes(numClusterNodes)
.responseId(responseId)
.completedContainersStatuses(completedContainers)
.allocatedContainers(allocatedContainers).updatedNodes(updatedNodes)
.availableResources(availResources).amCommand(command)
.preemptionMessage(preempt).nmTokens(nmTokens)
.updatedContainers(updatedContainers).build();
}
@Private
@ -120,12 +117,13 @@ public abstract class AllocateResponse {
Resource availResources, AMCommand command, int numClusterNodes,
PreemptionMessage preempt, List<NMToken> nmTokens, Token amRMToken,
List<UpdatedContainer> updatedContainers) {
AllocateResponse response =
newInstance(responseId, completedContainers, allocatedContainers,
updatedNodes, availResources, command, numClusterNodes, preempt,
nmTokens, updatedContainers);
response.setAMRMToken(amRMToken);
return response;
return AllocateResponse.newBuilder().numClusterNodes(numClusterNodes)
.responseId(responseId)
.completedContainersStatuses(completedContainers)
.allocatedContainers(allocatedContainers).updatedNodes(updatedNodes)
.availableResources(availResources).amCommand(command)
.preemptionMessage(preempt).nmTokens(nmTokens)
.updatedContainers(updatedContainers).amRmToken(amRMToken).build();
}
@Public
@ -136,13 +134,14 @@ public abstract class AllocateResponse {
Resource availResources, AMCommand command, int numClusterNodes,
PreemptionMessage preempt, List<NMToken> nmTokens, Token amRMToken,
List<UpdatedContainer> updatedContainers, String collectorAddr) {
AllocateResponse response =
newInstance(responseId, completedContainers, allocatedContainers,
updatedNodes, availResources, command, numClusterNodes, preempt,
nmTokens, updatedContainers);
response.setAMRMToken(amRMToken);
response.setCollectorAddr(collectorAddr);
return response;
return AllocateResponse.newBuilder().numClusterNodes(numClusterNodes)
.responseId(responseId)
.completedContainersStatuses(completedContainers)
.allocatedContainers(allocatedContainers).updatedNodes(updatedNodes)
.availableResources(availResources).amCommand(command)
.preemptionMessage(preempt).nmTokens(nmTokens)
.updatedContainers(updatedContainers).amRmToken(amRMToken)
.collectorAddr(collectorAddr).build();
}
/**
@ -370,4 +369,230 @@ public abstract class AllocateResponse {
@Unstable
public void setUpdateErrors(List<UpdateContainerError> updateErrors) {
}
@Private
@Unstable
public static AllocateResponseBuilder newBuilder() {
return new AllocateResponseBuilder();
}
/**
* Class to construct instances of {@link AllocateResponse} with specific
* options.
*/
@Private
@Unstable
public static final class AllocateResponseBuilder {
private AllocateResponse allocateResponse =
Records.newRecord(AllocateResponse.class);
private AllocateResponseBuilder() {
allocateResponse.setApplicationPriority(Priority.newInstance(0));
}
/**
* Set the <code>amCommand</code> of the response.
* @see AllocateResponse#setAMCommand(AMCommand)
* @param amCommand <code>amCommand</code> of the response
* @return {@link AllocateResponseBuilder}
*/
@Private
@Unstable
public AllocateResponseBuilder amCommand(AMCommand amCommand) {
allocateResponse.setAMCommand(amCommand);
return this;
}
/**
* Set the <code>responseId</code> of the response.
* @see AllocateResponse#setResponseId(int)
* @param responseId <code>responseId</code> of the response
* @return {@link AllocateResponseBuilder}
*/
@Private
@Unstable
public AllocateResponseBuilder responseId(int responseId) {
allocateResponse.setResponseId(responseId);
return this;
}
/**
* Set the <code>allocatedContainers</code> of the response.
* @see AllocateResponse#setAllocatedContainers(List)
* @param allocatedContainers
* <code>allocatedContainers</code> of the response
* @return {@link AllocateResponseBuilder}
*/
@Private
@Unstable
public AllocateResponseBuilder allocatedContainers(
List<Container> allocatedContainers) {
allocateResponse.setAllocatedContainers(allocatedContainers);
return this;
}
/**
* Set the <code>availableResources</code> of the response.
* @see AllocateResponse#setAvailableResources(Resource)
* @param availableResources
* <code>availableResources</code> of the response
* @return {@link AllocateResponseBuilder}
*/
@Private
@Unstable
public AllocateResponseBuilder availableResources(
Resource availableResources) {
allocateResponse.setAvailableResources(availableResources);
return this;
}
/**
* Set the <code>completedContainersStatuses</code> of the response.
* @see AllocateResponse#setCompletedContainersStatuses(List)
* @param completedContainersStatuses
* <code>completedContainersStatuses</code> of the response
* @return {@link AllocateResponseBuilder}
*/
@Private
@Unstable
public AllocateResponseBuilder completedContainersStatuses(
List<ContainerStatus> completedContainersStatuses) {
allocateResponse
.setCompletedContainersStatuses(completedContainersStatuses);
return this;
}
/**
* Set the <code>updatedNodes</code> of the response.
* @see AllocateResponse#setUpdatedNodes(List)
* @param updatedNodes <code>updatedNodes</code> of the response
* @return {@link AllocateResponseBuilder}
*/
@Private
@Unstable
public AllocateResponseBuilder updatedNodes(
List<NodeReport> updatedNodes) {
allocateResponse.setUpdatedNodes(updatedNodes);
return this;
}
/**
* Set the <code>numClusterNodes</code> of the response.
* @see AllocateResponse#setNumClusterNodes(int)
* @param numClusterNodes <code>numClusterNodes</code> of the response
* @return {@link AllocateResponseBuilder}
*/
@Private
@Unstable
public AllocateResponseBuilder numClusterNodes(int numClusterNodes) {
allocateResponse.setNumClusterNodes(numClusterNodes);
return this;
}
/**
* Set the <code>preemptionMessage</code> of the response.
* @see AllocateResponse#setPreemptionMessage(PreemptionMessage)
* @param preemptionMessage <code>preemptionMessage</code> of the response
* @return {@link AllocateResponseBuilder}
*/
@Private
@Unstable
public AllocateResponseBuilder preemptionMessage(
PreemptionMessage preemptionMessage) {
allocateResponse.setPreemptionMessage(preemptionMessage);
return this;
}
/**
* Set the <code>nmTokens</code> of the response.
* @see AllocateResponse#setNMTokens(List)
* @param nmTokens <code>nmTokens</code> of the response
* @return {@link AllocateResponseBuilder}
*/
@Private
@Unstable
public AllocateResponseBuilder nmTokens(List<NMToken> nmTokens) {
allocateResponse.setNMTokens(nmTokens);
return this;
}
/**
* Set the <code>updatedContainers</code> of the response.
* @see AllocateResponse#setUpdatedContainers(List)
* @param updatedContainers <code>updatedContainers</code> of the response
* @return {@link AllocateResponseBuilder}
*/
@Private
@Unstable
public AllocateResponseBuilder updatedContainers(
List<UpdatedContainer> updatedContainers) {
allocateResponse.setUpdatedContainers(updatedContainers);
return this;
}
/**
* Set the <code>amRmToken</code> of the response.
* @see AllocateResponse#setAMRMToken(Token)
* @param amRmToken <code>amRmToken</code> of the response
* @return {@link AllocateResponseBuilder}
*/
@Private
@Unstable
public AllocateResponseBuilder amRmToken(Token amRmToken) {
allocateResponse.setAMRMToken(amRmToken);
return this;
}
/**
* Set the <code>applicationPriority</code> of the response.
* @see AllocateResponse#setApplicationPriority(Priority)
* @param applicationPriority
* <code>applicationPriority</code> of the response
* @return {@link AllocateResponseBuilder}
*/
@Private
@Unstable
public AllocateResponseBuilder applicationPriority(
Priority applicationPriority) {
allocateResponse.setApplicationPriority(applicationPriority);
return this;
}
/**
* Set the <code>collectorAddr</code> of the response.
* @see AllocateResponse#setCollectorAddr(String)
* @param collectorAddr <code>collectorAddr</code> of the response
* @return {@link AllocateResponseBuilder}
*/
@Private
@Unstable
public AllocateResponseBuilder collectorAddr(String collectorAddr) {
allocateResponse.setCollectorAddr(collectorAddr);
return this;
}
/**
* Set the <code>updateErrors</code> of the response.
* @see AllocateResponse#setUpdateErrors(List)
* @param updateErrors <code>updateErrors</code> of the response
* @return {@link AllocateResponseBuilder}
*/
@Private
@Unstable
public AllocateResponseBuilder updateErrors(
List<UpdateContainerError> updateErrors) {
allocateResponse.setUpdateErrors(updateErrors);
return this;
}
/**
* Return generated {@link AllocateResponse} object.
* @return {@link AllocateResponse}
*/
@Private
@Unstable
public AllocateResponse build() {
return allocateResponse;
}
}
}

View File

@ -23,6 +23,7 @@ import java.io.Serializable;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.classification.InterfaceStability.Stable;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
import org.apache.hadoop.yarn.util.Records;
@ -63,15 +64,18 @@ public abstract class ResourceRequest implements Comparable<ResourceRequest> {
@Stable
public static ResourceRequest newInstance(Priority priority, String hostName,
Resource capability, int numContainers) {
return newInstance(priority, hostName, capability, numContainers, true);
return ResourceRequest.newBuilder().priority(priority)
.resourceName(hostName).capability(capability)
.numContainers(numContainers).build();
}
@Public
@Stable
public static ResourceRequest newInstance(Priority priority, String hostName,
Resource capability, int numContainers, boolean relaxLocality) {
return newInstance(priority, hostName, capability, numContainers,
relaxLocality, null);
return ResourceRequest.newBuilder().priority(priority)
.resourceName(hostName).capability(capability)
.numContainers(numContainers).relaxLocality(relaxLocality).build();
}
@Public
@ -79,8 +83,10 @@ public abstract class ResourceRequest implements Comparable<ResourceRequest> {
public static ResourceRequest newInstance(Priority priority, String hostName,
Resource capability, int numContainers, boolean relaxLocality,
String labelExpression) {
return newInstance(priority, hostName, capability, numContainers,
relaxLocality, labelExpression, ExecutionTypeRequest.newInstance());
return ResourceRequest.newBuilder().priority(priority)
.resourceName(hostName).capability(capability)
.numContainers(numContainers).relaxLocality(relaxLocality)
.nodeLabelExpression(labelExpression).build();
}
@Public
@ -88,15 +94,158 @@ public abstract class ResourceRequest implements Comparable<ResourceRequest> {
public static ResourceRequest newInstance(Priority priority, String hostName,
Resource capability, int numContainers, boolean relaxLocality, String
labelExpression, ExecutionTypeRequest executionTypeRequest) {
ResourceRequest request = Records.newRecord(ResourceRequest.class);
request.setPriority(priority);
request.setResourceName(hostName);
request.setCapability(capability);
request.setNumContainers(numContainers);
request.setRelaxLocality(relaxLocality);
request.setNodeLabelExpression(labelExpression);
request.setExecutionTypeRequest(executionTypeRequest);
return request;
return ResourceRequest.newBuilder().priority(priority)
.resourceName(hostName).capability(capability)
.numContainers(numContainers).relaxLocality(relaxLocality)
.nodeLabelExpression(labelExpression)
.executionTypeRequest(executionTypeRequest).build();
}
@Public
@Unstable
public static ResourceRequestBuilder newBuilder() {
return new ResourceRequestBuilder();
}
/**
* Class to construct instances of {@link ResourceRequest} with specific
* options.
*/
@Public
@Stable
public static final class ResourceRequestBuilder {
private ResourceRequest resourceRequest =
Records.newRecord(ResourceRequest.class);
private ResourceRequestBuilder() {
resourceRequest.setResourceName(ANY);
resourceRequest.setNumContainers(1);
resourceRequest.setPriority(Priority.newInstance(0));
resourceRequest.setRelaxLocality(true);
resourceRequest.setExecutionTypeRequest(
ExecutionTypeRequest.newInstance());
}
/**
* Set the <code>priority</code> of the request.
* @see ResourceRequest#setPriority(Priority)
* @param priority <code>priority</code> of the request
* @return {@link ResourceRequestBuilder}
*/
@Public
@Stable
public ResourceRequestBuilder priority(Priority priority) {
resourceRequest.setPriority(priority);
return this;
}
/**
* Set the <code>resourceName</code> of the request.
* @see ResourceRequest#setResourceName(String)
* @param resourceName <code>resourceName</code> of the request
* @return {@link ResourceRequestBuilder}
*/
@Public
@Stable
public ResourceRequestBuilder resourceName(String resourceName) {
resourceRequest.setResourceName(resourceName);
return this;
}
/**
* Set the <code>capability</code> of the request.
* @see ResourceRequest#setCapability(Resource)
* @param capability <code>capability</code> of the request
* @return {@link ResourceRequestBuilder}
*/
@Public
@Stable
public ResourceRequestBuilder capability(Resource capability) {
resourceRequest.setCapability(capability);
return this;
}
/**
* Set the <code>numContainers</code> of the request.
* @see ResourceRequest#setNumContainers(int)
* @param numContainers <code>numContainers</code> of the request
* @return {@link ResourceRequestBuilder}
*/
@Public
@Stable
public ResourceRequestBuilder numContainers(int numContainers) {
resourceRequest.setNumContainers(numContainers);
return this;
}
/**
* Set the <code>relaxLocality</code> of the request.
* @see ResourceRequest#setRelaxLocality(boolean)
* @param relaxLocality <code>relaxLocality</code> of the request
* @return {@link ResourceRequestBuilder}
*/
@Public
@Stable
public ResourceRequestBuilder relaxLocality(boolean relaxLocality) {
resourceRequest.setRelaxLocality(relaxLocality);
return this;
}
/**
* Set the <code>nodeLabelExpression</code> of the request.
* @see ResourceRequest#setNodeLabelExpression(String)
* @param nodeLabelExpression
* <code>nodeLabelExpression</code> of the request
* @return {@link ResourceRequestBuilder}
*/
@Public
@Evolving
public ResourceRequestBuilder nodeLabelExpression(
String nodeLabelExpression) {
resourceRequest.setNodeLabelExpression(nodeLabelExpression);
return this;
}
/**
* Set the <code>executionTypeRequest</code> of the request.
* @see ResourceRequest#setExecutionTypeRequest(
* ExecutionTypeRequest)
* @param executionTypeRequest
* <code>executionTypeRequest</code> of the request
* @return {@link ResourceRequestBuilder}
*/
@Public
@Evolving
public ResourceRequestBuilder executionTypeRequest(
ExecutionTypeRequest executionTypeRequest) {
resourceRequest.setExecutionTypeRequest(executionTypeRequest);
return this;
}
/**
* Set the <code>allocationRequestId</code> of the request.
* @see ResourceRequest#setAllocationRequestId(long)
* @param allocationRequestId
* <code>allocationRequestId</code> of the request
* @return {@link ResourceRequestBuilder}
*/
@Public
@Evolving
public ResourceRequestBuilder allocationRequestId(
long allocationRequestId) {
resourceRequest.setAllocationRequestId(allocationRequestId);
return this;
}
/**
* Return generated {@link ResourceRequest} object.
* @return {@link ResourceRequest}
*/
@Public
@Stable
public ResourceRequest build() {
return resourceRequest;
}
}
@Public

View File

@ -31,7 +31,6 @@ import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ExecutionType;
@ -106,14 +105,14 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
* All getters return immutable values.
*/
public static class ContainerRequest {
final Resource capability;
final List<String> nodes;
final List<String> racks;
final Priority priority;
final long allocationRequestId;
final boolean relaxLocality;
final String nodeLabelsExpression;
final ExecutionTypeRequest executionTypeRequest;
private Resource capability;
private List<String> nodes;
private List<String> racks;
private Priority priority;
private long allocationRequestId;
private boolean relaxLocality;
private String nodeLabelsExpression;
private ExecutionTypeRequest executionTypeRequest;
/**
* Instantiates a {@link ContainerRequest} with the given constraints and
@ -306,17 +305,6 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
Priority priority, long allocationRequestId, boolean relaxLocality,
String nodeLabelsExpression,
ExecutionTypeRequest executionTypeRequest) {
// Validate request
Preconditions.checkArgument(capability != null,
"The Resource to be requested for each container " +
"should not be null ");
Preconditions.checkArgument(priority != null,
"The priority at which to request containers should not be null ");
Preconditions.checkArgument(
!(!relaxLocality && (racks == null || racks.length == 0)
&& (nodes == null || nodes.length == 0)),
"Can't turn off locality relaxation on a " +
"request with no location constraints");
this.allocationRequestId = allocationRequestId;
this.capability = capability;
this.nodes = (nodes != null ? ImmutableList.copyOf(nodes) : null);
@ -325,8 +313,25 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
this.relaxLocality = relaxLocality;
this.nodeLabelsExpression = nodeLabelsExpression;
this.executionTypeRequest = executionTypeRequest;
sanityCheck();
}
// Validate request
private void sanityCheck() {
Preconditions.checkArgument(capability != null,
"The Resource to be requested for each container " +
"should not be null ");
Preconditions.checkArgument(priority != null,
"The priority at which to request containers should not be null ");
Preconditions.checkArgument(
!(!relaxLocality && (racks == null || racks.size() == 0)
&& (nodes == null || nodes.size() == 0)),
"Can't turn off locality relaxation on a " +
"request with no location constraints");
}
private ContainerRequest() {};
public Resource getCapability() {
return capability;
}
@ -368,6 +373,68 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
.append("]");
return sb.toString();
}
public static ContainerRequestBuilder newBuilder() {
return new ContainerRequestBuilder();
}
/**
* Class to construct instances of {@link ContainerRequest} with specific
* options.
*/
public static final class ContainerRequestBuilder {
private ContainerRequest containerRequest = new ContainerRequest();
public ContainerRequestBuilder capability(Resource capability) {
containerRequest.capability = capability;
return this;
}
public ContainerRequestBuilder nodes(String[] nodes) {
containerRequest.nodes =
(nodes != null ? ImmutableList.copyOf(nodes): null);
return this;
}
public ContainerRequestBuilder racks(String[] racks) {
containerRequest.racks =
(racks != null ? ImmutableList.copyOf(racks) : null);
return this;
}
public ContainerRequestBuilder priority(Priority priority) {
containerRequest.priority = priority;
return this;
}
public ContainerRequestBuilder allocationRequestId(
long allocationRequestId) {
containerRequest.allocationRequestId = allocationRequestId;
return this;
}
public ContainerRequestBuilder relaxLocality(boolean relaxLocality) {
containerRequest.relaxLocality = relaxLocality;
return this;
}
public ContainerRequestBuilder nodeLabelsExpression(
String nodeLabelsExpression) {
containerRequest.nodeLabelsExpression = nodeLabelsExpression;
return this;
}
public ContainerRequestBuilder executionTypeRequest(
ExecutionTypeRequest executionTypeRequest) {
containerRequest.executionTypeRequest = executionTypeRequest;
return this;
}
public ContainerRequest build() {
containerRequest.sanityCheck();
return containerRequest;
}
}
}
/**

View File

@ -112,10 +112,10 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
ResourceRequestInfo(Long allocationRequestId, Priority priority,
String resourceName, Resource capability, boolean relaxLocality) {
remoteRequest = ResourceRequest.newInstance(priority, resourceName,
capability, 0);
remoteRequest.setAllocationRequestId(allocationRequestId);
remoteRequest.setRelaxLocality(relaxLocality);
remoteRequest = ResourceRequest.newBuilder().priority(priority)
.resourceName(resourceName).capability(capability).numContainers(0)
.allocationRequestId(allocationRequestId)
.relaxLocality(relaxLocality).build();
containerRequests = new LinkedHashSet<T>();
}
}
@ -280,9 +280,10 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
ResourceBlacklistRequest.newInstance(blacklistToAdd,
blacklistToRemove);
allocateRequest =
AllocateRequest.newInstance(lastResponseId, progressIndicator,
askList, releaseList, blacklistRequest, updateList);
allocateRequest = AllocateRequest.newBuilder()
.responseId(lastResponseId).progress(progressIndicator)
.askList(askList).resourceBlacklistRequest(blacklistRequest)
.releaseList(releaseList).updateRequests(updateList).build();
// clear blacklistAdditions and blacklistRemovals before
// unsynchronized part
blacklistAdditions.clear();
@ -415,11 +416,13 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
for(ResourceRequest r : ask) {
// create a copy of ResourceRequest as we might change it while the
// RPC layer is using it to send info across
ResourceRequest rr = ResourceRequest.newInstance(r.getPriority(),
r.getResourceName(), r.getCapability(), r.getNumContainers(),
r.getRelaxLocality(), r.getNodeLabelExpression(),
r.getExecutionTypeRequest());
rr.setAllocationRequestId(r.getAllocationRequestId());
ResourceRequest rr = ResourceRequest.newBuilder()
.priority(r.getPriority()).resourceName(r.getResourceName())
.capability(r.getCapability()).numContainers(r.getNumContainers())
.relaxLocality(r.getRelaxLocality())
.nodeLabelExpression(r.getNodeLabelExpression())
.executionTypeRequest(r.getExecutionTypeRequest())
.allocationRequestId(r.getAllocationRequestId()).build();
askList.add(rr);
}
return askList;

View File

@ -261,7 +261,9 @@ public class AllocateResponsePBImpl extends AllocateResponse {
public synchronized void setUpdateErrors(
List<UpdateContainerError> updateErrors) {
if (updateErrors == null) {
this.updateErrors.clear();
if (this.updateErrors != null) {
this.updateErrors.clear();
}
return;
}
this.updateErrors = new ArrayList<>(

View File

@ -946,10 +946,13 @@ public class AppSchedulingInfo {
}
public ResourceRequest cloneResourceRequest(ResourceRequest request) {
ResourceRequest newRequest =
ResourceRequest.newInstance(request.getPriority(),
request.getResourceName(), request.getCapability(), 1,
request.getRelaxLocality(), request.getNodeLabelExpression());
ResourceRequest newRequest = ResourceRequest.newBuilder()
.priority(request.getPriority())
.resourceName(request.getResourceName())
.capability(request.getCapability())
.numContainers(1)
.relaxLocality(request.getRelaxLocality())
.nodeLabelExpression(request.getNodeLabelExpression()).build();
return newRequest;
}

View File

@ -744,8 +744,9 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
}
int numCont = (int) Math.ceil(
Resources.divide(rc, clusterResource, tot, minimumAllocation));
ResourceRequest rr = ResourceRequest.newInstance(Priority.UNDEFINED,
ResourceRequest.ANY, minimumAllocation, numCont);
ResourceRequest rr = ResourceRequest.newBuilder()
.priority(Priority.UNDEFINED).resourceName(ResourceRequest.ANY)
.capability(minimumAllocation).numContainers(numCont).build();
List<Container> newlyAllocatedContainers = pullNewlyAllocatedContainers();
List<Container> newlyIncreasedContainers = pullNewlyIncreasedContainers();
List<Container> newlyDecreasedContainers = pullNewlyDecreasedContainers();