diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateRequest.java index f7ce127168c..0786794d5bf 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateRequest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateRequest.java @@ -66,8 +66,10 @@ public abstract class AllocateRequest { List resourceAsk, List containersToBeReleased, ResourceBlacklistRequest resourceBlacklistRequest) { - return newInstance(responseID, appProgress, resourceAsk, - containersToBeReleased, resourceBlacklistRequest, null); + return AllocateRequest.newBuilder().responseId(responseID) + .progress(appProgress).askList(resourceAsk) + .releaseList(containersToBeReleased) + .resourceBlacklistRequest(resourceBlacklistRequest).build(); } @Public @@ -77,14 +79,12 @@ public abstract class AllocateRequest { List containersToBeReleased, ResourceBlacklistRequest resourceBlacklistRequest, List updateRequests) { - AllocateRequest allocateRequest = Records.newRecord(AllocateRequest.class); - allocateRequest.setResponseId(responseID); - allocateRequest.setProgress(appProgress); - allocateRequest.setAskList(resourceAsk); - allocateRequest.setReleaseList(containersToBeReleased); - allocateRequest.setResourceBlacklistRequest(resourceBlacklistRequest); - allocateRequest.setUpdateRequests(updateRequests); - return allocateRequest; + return AllocateRequest.newBuilder().responseId(responseID) + .progress(appProgress).askList(resourceAsk) + .releaseList(containersToBeReleased) + .resourceBlacklistRequest(resourceBlacklistRequest) + .updateRequests(updateRequests) + .build(); } /** @@ -211,4 +211,116 @@ public abstract class AllocateRequest { @Unstable public abstract void setUpdateRequests( List updateRequests); -} + + @Public + @Unstable + public static AllocateRequestBuilder newBuilder() { + return new AllocateRequestBuilder(); + } + + /** + * Class to construct instances of {@link AllocateRequest} with specific + * options. + */ + @Public + @Stable + public static final class AllocateRequestBuilder { + private AllocateRequest allocateRequest = + Records.newRecord(AllocateRequest.class); + + private AllocateRequestBuilder() { + } + + /** + * Set the responseId of the request. + * @see AllocateRequest#setResponseId(int) + * @param responseId responseId of the request + * @return {@link AllocateRequestBuilder} + */ + @Public + @Stable + public AllocateRequestBuilder responseId(int responseId) { + allocateRequest.setResponseId(responseId); + return this; + } + + /** + * Set the progress of the request. + * @see AllocateRequest#setProgress(float) + * @param progress progress of the request + * @return {@link AllocateRequestBuilder} + */ + @Public + @Stable + public AllocateRequestBuilder progress(float progress) { + allocateRequest.setProgress(progress); + return this; + } + + /** + * Set the askList of the request. + * @see AllocateRequest#setAskList(List) + * @param askList askList of the request + * @return {@link AllocateRequestBuilder} + */ + @Public + @Stable + public AllocateRequestBuilder askList(List askList) { + allocateRequest.setAskList(askList); + return this; + } + + /** + * Set the releaseList of the request. + * @see AllocateRequest#setReleaseList(List) + * @param releaseList releaseList of the request + * @return {@link AllocateRequestBuilder} + */ + @Public + @Stable + public AllocateRequestBuilder releaseList(List releaseList) { + allocateRequest.setReleaseList(releaseList); + return this; + } + + /** + * Set the resourceBlacklistRequest of the request. + * @see AllocateRequest#setResourceBlacklistRequest( + * ResourceBlacklistRequest) + * @param resourceBlacklistRequest + * resourceBlacklistRequest of the request + * @return {@link AllocateRequestBuilder} + */ + @Public + @Stable + public AllocateRequestBuilder resourceBlacklistRequest( + ResourceBlacklistRequest resourceBlacklistRequest) { + allocateRequest.setResourceBlacklistRequest(resourceBlacklistRequest); + return this; + } + + /** + * Set the updateRequests of the request. + * @see AllocateRequest#setUpdateRequests(List) + * @param updateRequests updateRequests of the request + * @return {@link AllocateRequestBuilder} + */ + @Public + @Unstable + public AllocateRequestBuilder updateRequests( + List updateRequests) { + allocateRequest.setUpdateRequests(updateRequests); + return this; + } + + /** + * Return generated {@link AllocateRequest} object. + * @return {@link AllocateRequest} + */ + @Public + @Stable + public AllocateRequest build() { + return allocateRequest; + } + } +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java index 69089ee8b36..d3ca765f91e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java @@ -84,17 +84,12 @@ public abstract class AllocateResponse { List allocatedContainers, List updatedNodes, Resource availResources, AMCommand command, int numClusterNodes, PreemptionMessage preempt, List nmTokens) { - AllocateResponse response = Records.newRecord(AllocateResponse.class); - response.setNumClusterNodes(numClusterNodes); - response.setResponseId(responseId); - response.setCompletedContainersStatuses(completedContainers); - response.setAllocatedContainers(allocatedContainers); - response.setUpdatedNodes(updatedNodes); - response.setAvailableResources(availResources); - response.setAMCommand(command); - response.setPreemptionMessage(preempt); - response.setNMTokens(nmTokens); - return response; + return AllocateResponse.newBuilder().numClusterNodes(numClusterNodes) + .responseId(responseId) + .completedContainersStatuses(completedContainers) + .allocatedContainers(allocatedContainers).updatedNodes(updatedNodes) + .availableResources(availResources).amCommand(command) + .preemptionMessage(preempt).nmTokens(nmTokens).build(); } @Public @@ -105,11 +100,13 @@ public abstract class AllocateResponse { Resource availResources, AMCommand command, int numClusterNodes, PreemptionMessage preempt, List nmTokens, List updatedContainers) { - AllocateResponse response = newInstance(responseId, completedContainers, - allocatedContainers, updatedNodes, availResources, command, - numClusterNodes, preempt, nmTokens); - response.setUpdatedContainers(updatedContainers); - return response; + return AllocateResponse.newBuilder().numClusterNodes(numClusterNodes) + .responseId(responseId) + .completedContainersStatuses(completedContainers) + .allocatedContainers(allocatedContainers).updatedNodes(updatedNodes) + .availableResources(availResources).amCommand(command) + .preemptionMessage(preempt).nmTokens(nmTokens) + .updatedContainers(updatedContainers).build(); } @Private @@ -120,12 +117,13 @@ public abstract class AllocateResponse { Resource availResources, AMCommand command, int numClusterNodes, PreemptionMessage preempt, List nmTokens, Token amRMToken, List updatedContainers) { - AllocateResponse response = - newInstance(responseId, completedContainers, allocatedContainers, - updatedNodes, availResources, command, numClusterNodes, preempt, - nmTokens, updatedContainers); - response.setAMRMToken(amRMToken); - return response; + return AllocateResponse.newBuilder().numClusterNodes(numClusterNodes) + .responseId(responseId) + .completedContainersStatuses(completedContainers) + .allocatedContainers(allocatedContainers).updatedNodes(updatedNodes) + .availableResources(availResources).amCommand(command) + .preemptionMessage(preempt).nmTokens(nmTokens) + .updatedContainers(updatedContainers).amRmToken(amRMToken).build(); } @Public @@ -136,13 +134,14 @@ public abstract class AllocateResponse { Resource availResources, AMCommand command, int numClusterNodes, PreemptionMessage preempt, List nmTokens, Token amRMToken, List updatedContainers, String collectorAddr) { - AllocateResponse response = - newInstance(responseId, completedContainers, allocatedContainers, - updatedNodes, availResources, command, numClusterNodes, preempt, - nmTokens, updatedContainers); - response.setAMRMToken(amRMToken); - response.setCollectorAddr(collectorAddr); - return response; + return AllocateResponse.newBuilder().numClusterNodes(numClusterNodes) + .responseId(responseId) + .completedContainersStatuses(completedContainers) + .allocatedContainers(allocatedContainers).updatedNodes(updatedNodes) + .availableResources(availResources).amCommand(command) + .preemptionMessage(preempt).nmTokens(nmTokens) + .updatedContainers(updatedContainers).amRmToken(amRMToken) + .collectorAddr(collectorAddr).build(); } /** @@ -370,4 +369,230 @@ public abstract class AllocateResponse { @Unstable public void setUpdateErrors(List updateErrors) { } + + @Private + @Unstable + public static AllocateResponseBuilder newBuilder() { + return new AllocateResponseBuilder(); + } + + /** + * Class to construct instances of {@link AllocateResponse} with specific + * options. + */ + @Private + @Unstable + public static final class AllocateResponseBuilder { + private AllocateResponse allocateResponse = + Records.newRecord(AllocateResponse.class); + + private AllocateResponseBuilder() { + allocateResponse.setApplicationPriority(Priority.newInstance(0)); + } + + /** + * Set the amCommand of the response. + * @see AllocateResponse#setAMCommand(AMCommand) + * @param amCommand amCommand of the response + * @return {@link AllocateResponseBuilder} + */ + @Private + @Unstable + public AllocateResponseBuilder amCommand(AMCommand amCommand) { + allocateResponse.setAMCommand(amCommand); + return this; + } + + /** + * Set the responseId of the response. + * @see AllocateResponse#setResponseId(int) + * @param responseId responseId of the response + * @return {@link AllocateResponseBuilder} + */ + @Private + @Unstable + public AllocateResponseBuilder responseId(int responseId) { + allocateResponse.setResponseId(responseId); + return this; + } + + /** + * Set the allocatedContainers of the response. + * @see AllocateResponse#setAllocatedContainers(List) + * @param allocatedContainers + * allocatedContainers of the response + * @return {@link AllocateResponseBuilder} + */ + @Private + @Unstable + public AllocateResponseBuilder allocatedContainers( + List allocatedContainers) { + allocateResponse.setAllocatedContainers(allocatedContainers); + return this; + } + + /** + * Set the availableResources of the response. + * @see AllocateResponse#setAvailableResources(Resource) + * @param availableResources + * availableResources of the response + * @return {@link AllocateResponseBuilder} + */ + @Private + @Unstable + public AllocateResponseBuilder availableResources( + Resource availableResources) { + allocateResponse.setAvailableResources(availableResources); + return this; + } + + /** + * Set the completedContainersStatuses of the response. + * @see AllocateResponse#setCompletedContainersStatuses(List) + * @param completedContainersStatuses + * completedContainersStatuses of the response + * @return {@link AllocateResponseBuilder} + */ + @Private + @Unstable + public AllocateResponseBuilder completedContainersStatuses( + List completedContainersStatuses) { + allocateResponse + .setCompletedContainersStatuses(completedContainersStatuses); + return this; + } + + /** + * Set the updatedNodes of the response. + * @see AllocateResponse#setUpdatedNodes(List) + * @param updatedNodes updatedNodes of the response + * @return {@link AllocateResponseBuilder} + */ + @Private + @Unstable + public AllocateResponseBuilder updatedNodes( + List updatedNodes) { + allocateResponse.setUpdatedNodes(updatedNodes); + return this; + } + + /** + * Set the numClusterNodes of the response. + * @see AllocateResponse#setNumClusterNodes(int) + * @param numClusterNodes numClusterNodes of the response + * @return {@link AllocateResponseBuilder} + */ + @Private + @Unstable + public AllocateResponseBuilder numClusterNodes(int numClusterNodes) { + allocateResponse.setNumClusterNodes(numClusterNodes); + return this; + } + + /** + * Set the preemptionMessage of the response. + * @see AllocateResponse#setPreemptionMessage(PreemptionMessage) + * @param preemptionMessage preemptionMessage of the response + * @return {@link AllocateResponseBuilder} + */ + @Private + @Unstable + public AllocateResponseBuilder preemptionMessage( + PreemptionMessage preemptionMessage) { + allocateResponse.setPreemptionMessage(preemptionMessage); + return this; + } + + /** + * Set the nmTokens of the response. + * @see AllocateResponse#setNMTokens(List) + * @param nmTokens nmTokens of the response + * @return {@link AllocateResponseBuilder} + */ + @Private + @Unstable + public AllocateResponseBuilder nmTokens(List nmTokens) { + allocateResponse.setNMTokens(nmTokens); + return this; + } + + /** + * Set the updatedContainers of the response. + * @see AllocateResponse#setUpdatedContainers(List) + * @param updatedContainers updatedContainers of the response + * @return {@link AllocateResponseBuilder} + */ + @Private + @Unstable + public AllocateResponseBuilder updatedContainers( + List updatedContainers) { + allocateResponse.setUpdatedContainers(updatedContainers); + return this; + } + + /** + * Set the amRmToken of the response. + * @see AllocateResponse#setAMRMToken(Token) + * @param amRmToken amRmToken of the response + * @return {@link AllocateResponseBuilder} + */ + @Private + @Unstable + public AllocateResponseBuilder amRmToken(Token amRmToken) { + allocateResponse.setAMRMToken(amRmToken); + return this; + } + + /** + * Set the applicationPriority of the response. + * @see AllocateResponse#setApplicationPriority(Priority) + * @param applicationPriority + * applicationPriority of the response + * @return {@link AllocateResponseBuilder} + */ + @Private + @Unstable + public AllocateResponseBuilder applicationPriority( + Priority applicationPriority) { + allocateResponse.setApplicationPriority(applicationPriority); + return this; + } + + /** + * Set the collectorAddr of the response. + * @see AllocateResponse#setCollectorAddr(String) + * @param collectorAddr collectorAddr of the response + * @return {@link AllocateResponseBuilder} + */ + @Private + @Unstable + public AllocateResponseBuilder collectorAddr(String collectorAddr) { + allocateResponse.setCollectorAddr(collectorAddr); + return this; + } + + /** + * Set the updateErrors of the response. + * @see AllocateResponse#setUpdateErrors(List) + * @param updateErrors updateErrors of the response + * @return {@link AllocateResponseBuilder} + */ + @Private + @Unstable + public AllocateResponseBuilder updateErrors( + List updateErrors) { + allocateResponse.setUpdateErrors(updateErrors); + return this; + } + + /** + * Return generated {@link AllocateResponse} object. + * @return {@link AllocateResponse} + */ + @Private + @Unstable + public AllocateResponse build() { + return allocateResponse; + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceRequest.java index cefae0eeb1b..be2c783fefe 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceRequest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceRequest.java @@ -23,6 +23,7 @@ import java.io.Serializable; import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Evolving; import org.apache.hadoop.classification.InterfaceStability.Stable; +import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; import org.apache.hadoop.yarn.util.Records; @@ -63,15 +64,18 @@ public abstract class ResourceRequest implements Comparable { @Stable public static ResourceRequest newInstance(Priority priority, String hostName, Resource capability, int numContainers) { - return newInstance(priority, hostName, capability, numContainers, true); + return ResourceRequest.newBuilder().priority(priority) + .resourceName(hostName).capability(capability) + .numContainers(numContainers).build(); } @Public @Stable public static ResourceRequest newInstance(Priority priority, String hostName, Resource capability, int numContainers, boolean relaxLocality) { - return newInstance(priority, hostName, capability, numContainers, - relaxLocality, null); + return ResourceRequest.newBuilder().priority(priority) + .resourceName(hostName).capability(capability) + .numContainers(numContainers).relaxLocality(relaxLocality).build(); } @Public @@ -79,8 +83,10 @@ public abstract class ResourceRequest implements Comparable { public static ResourceRequest newInstance(Priority priority, String hostName, Resource capability, int numContainers, boolean relaxLocality, String labelExpression) { - return newInstance(priority, hostName, capability, numContainers, - relaxLocality, labelExpression, ExecutionTypeRequest.newInstance()); + return ResourceRequest.newBuilder().priority(priority) + .resourceName(hostName).capability(capability) + .numContainers(numContainers).relaxLocality(relaxLocality) + .nodeLabelExpression(labelExpression).build(); } @Public @@ -88,15 +94,158 @@ public abstract class ResourceRequest implements Comparable { public static ResourceRequest newInstance(Priority priority, String hostName, Resource capability, int numContainers, boolean relaxLocality, String labelExpression, ExecutionTypeRequest executionTypeRequest) { - ResourceRequest request = Records.newRecord(ResourceRequest.class); - request.setPriority(priority); - request.setResourceName(hostName); - request.setCapability(capability); - request.setNumContainers(numContainers); - request.setRelaxLocality(relaxLocality); - request.setNodeLabelExpression(labelExpression); - request.setExecutionTypeRequest(executionTypeRequest); - return request; + return ResourceRequest.newBuilder().priority(priority) + .resourceName(hostName).capability(capability) + .numContainers(numContainers).relaxLocality(relaxLocality) + .nodeLabelExpression(labelExpression) + .executionTypeRequest(executionTypeRequest).build(); + } + + @Public + @Unstable + public static ResourceRequestBuilder newBuilder() { + return new ResourceRequestBuilder(); + } + + /** + * Class to construct instances of {@link ResourceRequest} with specific + * options. + */ + @Public + @Stable + public static final class ResourceRequestBuilder { + private ResourceRequest resourceRequest = + Records.newRecord(ResourceRequest.class); + + private ResourceRequestBuilder() { + resourceRequest.setResourceName(ANY); + resourceRequest.setNumContainers(1); + resourceRequest.setPriority(Priority.newInstance(0)); + resourceRequest.setRelaxLocality(true); + resourceRequest.setExecutionTypeRequest( + ExecutionTypeRequest.newInstance()); + } + + /** + * Set the priority of the request. + * @see ResourceRequest#setPriority(Priority) + * @param priority priority of the request + * @return {@link ResourceRequestBuilder} + */ + @Public + @Stable + public ResourceRequestBuilder priority(Priority priority) { + resourceRequest.setPriority(priority); + return this; + } + + /** + * Set the resourceName of the request. + * @see ResourceRequest#setResourceName(String) + * @param resourceName resourceName of the request + * @return {@link ResourceRequestBuilder} + */ + @Public + @Stable + public ResourceRequestBuilder resourceName(String resourceName) { + resourceRequest.setResourceName(resourceName); + return this; + } + + /** + * Set the capability of the request. + * @see ResourceRequest#setCapability(Resource) + * @param capability capability of the request + * @return {@link ResourceRequestBuilder} + */ + @Public + @Stable + public ResourceRequestBuilder capability(Resource capability) { + resourceRequest.setCapability(capability); + return this; + } + + /** + * Set the numContainers of the request. + * @see ResourceRequest#setNumContainers(int) + * @param numContainers numContainers of the request + * @return {@link ResourceRequestBuilder} + */ + @Public + @Stable + public ResourceRequestBuilder numContainers(int numContainers) { + resourceRequest.setNumContainers(numContainers); + return this; + } + + /** + * Set the relaxLocality of the request. + * @see ResourceRequest#setRelaxLocality(boolean) + * @param relaxLocality relaxLocality of the request + * @return {@link ResourceRequestBuilder} + */ + @Public + @Stable + public ResourceRequestBuilder relaxLocality(boolean relaxLocality) { + resourceRequest.setRelaxLocality(relaxLocality); + return this; + } + + /** + * Set the nodeLabelExpression of the request. + * @see ResourceRequest#setNodeLabelExpression(String) + * @param nodeLabelExpression + * nodeLabelExpression of the request + * @return {@link ResourceRequestBuilder} + */ + @Public + @Evolving + public ResourceRequestBuilder nodeLabelExpression( + String nodeLabelExpression) { + resourceRequest.setNodeLabelExpression(nodeLabelExpression); + return this; + } + + /** + * Set the executionTypeRequest of the request. + * @see ResourceRequest#setExecutionTypeRequest( + * ExecutionTypeRequest) + * @param executionTypeRequest + * executionTypeRequest of the request + * @return {@link ResourceRequestBuilder} + */ + @Public + @Evolving + public ResourceRequestBuilder executionTypeRequest( + ExecutionTypeRequest executionTypeRequest) { + resourceRequest.setExecutionTypeRequest(executionTypeRequest); + return this; + } + + /** + * Set the allocationRequestId of the request. + * @see ResourceRequest#setAllocationRequestId(long) + * @param allocationRequestId + * allocationRequestId of the request + * @return {@link ResourceRequestBuilder} + */ + @Public + @Evolving + public ResourceRequestBuilder allocationRequestId( + long allocationRequestId) { + resourceRequest.setAllocationRequestId(allocationRequestId); + return this; + } + + /** + * Return generated {@link ResourceRequest} object. + * @return {@link ResourceRequest} + */ + @Public + @Stable + public ResourceRequest build() { + return resourceRequest; + } } @Public diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java index 2990c05130f..52155f58cac 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java @@ -31,7 +31,6 @@ import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; - import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ExecutionType; @@ -106,14 +105,14 @@ public abstract class AMRMClient extends * All getters return immutable values. */ public static class ContainerRequest { - final Resource capability; - final List nodes; - final List racks; - final Priority priority; - final long allocationRequestId; - final boolean relaxLocality; - final String nodeLabelsExpression; - final ExecutionTypeRequest executionTypeRequest; + private Resource capability; + private List nodes; + private List racks; + private Priority priority; + private long allocationRequestId; + private boolean relaxLocality; + private String nodeLabelsExpression; + private ExecutionTypeRequest executionTypeRequest; /** * Instantiates a {@link ContainerRequest} with the given constraints and @@ -306,17 +305,6 @@ public abstract class AMRMClient extends Priority priority, long allocationRequestId, boolean relaxLocality, String nodeLabelsExpression, ExecutionTypeRequest executionTypeRequest) { - // Validate request - Preconditions.checkArgument(capability != null, - "The Resource to be requested for each container " + - "should not be null "); - Preconditions.checkArgument(priority != null, - "The priority at which to request containers should not be null "); - Preconditions.checkArgument( - !(!relaxLocality && (racks == null || racks.length == 0) - && (nodes == null || nodes.length == 0)), - "Can't turn off locality relaxation on a " + - "request with no location constraints"); this.allocationRequestId = allocationRequestId; this.capability = capability; this.nodes = (nodes != null ? ImmutableList.copyOf(nodes) : null); @@ -325,8 +313,25 @@ public abstract class AMRMClient extends this.relaxLocality = relaxLocality; this.nodeLabelsExpression = nodeLabelsExpression; this.executionTypeRequest = executionTypeRequest; + sanityCheck(); + } + + // Validate request + private void sanityCheck() { + Preconditions.checkArgument(capability != null, + "The Resource to be requested for each container " + + "should not be null "); + Preconditions.checkArgument(priority != null, + "The priority at which to request containers should not be null "); + Preconditions.checkArgument( + !(!relaxLocality && (racks == null || racks.size() == 0) + && (nodes == null || nodes.size() == 0)), + "Can't turn off locality relaxation on a " + + "request with no location constraints"); } + private ContainerRequest() {}; + public Resource getCapability() { return capability; } @@ -368,8 +373,70 @@ public abstract class AMRMClient extends .append("]"); return sb.toString(); } + + public static ContainerRequestBuilder newBuilder() { + return new ContainerRequestBuilder(); + } + + /** + * Class to construct instances of {@link ContainerRequest} with specific + * options. + */ + public static final class ContainerRequestBuilder { + private ContainerRequest containerRequest = new ContainerRequest(); + + public ContainerRequestBuilder capability(Resource capability) { + containerRequest.capability = capability; + return this; + } + + public ContainerRequestBuilder nodes(String[] nodes) { + containerRequest.nodes = + (nodes != null ? ImmutableList.copyOf(nodes): null); + return this; + } + + public ContainerRequestBuilder racks(String[] racks) { + containerRequest.racks = + (racks != null ? ImmutableList.copyOf(racks) : null); + return this; + } + + public ContainerRequestBuilder priority(Priority priority) { + containerRequest.priority = priority; + return this; + } + + public ContainerRequestBuilder allocationRequestId( + long allocationRequestId) { + containerRequest.allocationRequestId = allocationRequestId; + return this; + } + + public ContainerRequestBuilder relaxLocality(boolean relaxLocality) { + containerRequest.relaxLocality = relaxLocality; + return this; + } + + public ContainerRequestBuilder nodeLabelsExpression( + String nodeLabelsExpression) { + containerRequest.nodeLabelsExpression = nodeLabelsExpression; + return this; + } + + public ContainerRequestBuilder executionTypeRequest( + ExecutionTypeRequest executionTypeRequest) { + containerRequest.executionTypeRequest = executionTypeRequest; + return this; + } + + public ContainerRequest build() { + containerRequest.sanityCheck(); + return containerRequest; + } + } } - + /** * Register the application master. This must be called before any * other interaction diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java index 3ed43b004b0..44fc1e07551 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java @@ -112,10 +112,10 @@ public class AMRMClientImpl extends AMRMClient { ResourceRequestInfo(Long allocationRequestId, Priority priority, String resourceName, Resource capability, boolean relaxLocality) { - remoteRequest = ResourceRequest.newInstance(priority, resourceName, - capability, 0); - remoteRequest.setAllocationRequestId(allocationRequestId); - remoteRequest.setRelaxLocality(relaxLocality); + remoteRequest = ResourceRequest.newBuilder().priority(priority) + .resourceName(resourceName).capability(capability).numContainers(0) + .allocationRequestId(allocationRequestId) + .relaxLocality(relaxLocality).build(); containerRequests = new LinkedHashSet(); } } @@ -279,10 +279,11 @@ public class AMRMClientImpl extends AMRMClient { ResourceBlacklistRequest blacklistRequest = ResourceBlacklistRequest.newInstance(blacklistToAdd, blacklistToRemove); - - allocateRequest = - AllocateRequest.newInstance(lastResponseId, progressIndicator, - askList, releaseList, blacklistRequest, updateList); + + allocateRequest = AllocateRequest.newBuilder() + .responseId(lastResponseId).progress(progressIndicator) + .askList(askList).resourceBlacklistRequest(blacklistRequest) + .releaseList(releaseList).updateRequests(updateList).build(); // clear blacklistAdditions and blacklistRemovals before // unsynchronized part blacklistAdditions.clear(); @@ -415,11 +416,13 @@ public class AMRMClientImpl extends AMRMClient { for(ResourceRequest r : ask) { // create a copy of ResourceRequest as we might change it while the // RPC layer is using it to send info across - ResourceRequest rr = ResourceRequest.newInstance(r.getPriority(), - r.getResourceName(), r.getCapability(), r.getNumContainers(), - r.getRelaxLocality(), r.getNodeLabelExpression(), - r.getExecutionTypeRequest()); - rr.setAllocationRequestId(r.getAllocationRequestId()); + ResourceRequest rr = ResourceRequest.newBuilder() + .priority(r.getPriority()).resourceName(r.getResourceName()) + .capability(r.getCapability()).numContainers(r.getNumContainers()) + .relaxLocality(r.getRelaxLocality()) + .nodeLabelExpression(r.getNodeLabelExpression()) + .executionTypeRequest(r.getExecutionTypeRequest()) + .allocationRequestId(r.getAllocationRequestId()).build(); askList.add(rr); } return askList; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java index b4f51ef22a5..c0d52a60dd1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java @@ -261,7 +261,9 @@ public class AllocateResponsePBImpl extends AllocateResponse { public synchronized void setUpdateErrors( List updateErrors) { if (updateErrors == null) { - this.updateErrors.clear(); + if (this.updateErrors != null) { + this.updateErrors.clear(); + } return; } this.updateErrors = new ArrayList<>( diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java index ffb188554e0..80811b14b3d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java @@ -946,10 +946,13 @@ public class AppSchedulingInfo { } public ResourceRequest cloneResourceRequest(ResourceRequest request) { - ResourceRequest newRequest = - ResourceRequest.newInstance(request.getPriority(), - request.getResourceName(), request.getCapability(), 1, - request.getRelaxLocality(), request.getNodeLabelExpression()); + ResourceRequest newRequest = ResourceRequest.newBuilder() + .priority(request.getPriority()) + .resourceName(request.getResourceName()) + .capability(request.getCapability()) + .numContainers(1) + .relaxLocality(request.getRelaxLocality()) + .nodeLabelExpression(request.getNodeLabelExpression()).build(); return newRequest; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java index 6d9dda84a03..f076e4f6f1f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java @@ -744,8 +744,9 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt { } int numCont = (int) Math.ceil( Resources.divide(rc, clusterResource, tot, minimumAllocation)); - ResourceRequest rr = ResourceRequest.newInstance(Priority.UNDEFINED, - ResourceRequest.ANY, minimumAllocation, numCont); + ResourceRequest rr = ResourceRequest.newBuilder() + .priority(Priority.UNDEFINED).resourceName(ResourceRequest.ANY) + .capability(minimumAllocation).numContainers(numCont).build(); List newlyAllocatedContainers = pullNewlyAllocatedContainers(); List newlyIncreasedContainers = pullNewlyIncreasedContainers(); List newlyDecreasedContainers = pullNewlyDecreasedContainers();