YARN-7292. Retrospect Resource Profile Behavior for overriding capability. Contributed by Wangda Tan.
This commit is contained in:
parent
8013475d44
commit
aae629913c
|
@ -1,173 +0,0 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.api.records;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.yarn.util.Records;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* Class to capture capability requirements when using resource profiles. The
|
||||
* ProfileCapability is meant to be used as part of the ResourceRequest. A
|
||||
* profile capability has two pieces - the resource profile name and the
|
||||
* overrides. The resource profile specifies the name of the resource profile
|
||||
* to be used and the capability override is the overrides desired on specific
|
||||
* resource types.
|
||||
*
|
||||
* For example, if you have a resource profile "small" that maps to
|
||||
* {@literal <4096M, 2 cores, 1 gpu>} and you set the capability override to
|
||||
* {@literal <8192M, 0 cores, 0 gpu>}, then the actual resource allocation on
|
||||
* the ResourceManager will be {@literal <8192M, 2 cores, 1 gpu>}.
|
||||
*
|
||||
* Note that the conversion from the ProfileCapability to the Resource class
|
||||
* with the actual resource requirements will be done by the ResourceManager,
|
||||
* which has the actual profile to Resource mapping.
|
||||
*
|
||||
*/
|
||||
@InterfaceAudience.Public
|
||||
@InterfaceStability.Unstable
|
||||
public abstract class ProfileCapability {
|
||||
|
||||
public static final String DEFAULT_PROFILE = "default";
|
||||
|
||||
public static ProfileCapability newInstance(Resource override) {
|
||||
return newInstance(DEFAULT_PROFILE, override);
|
||||
}
|
||||
|
||||
public static ProfileCapability newInstance(String profile) {
|
||||
Preconditions
|
||||
.checkArgument(profile != null, "The profile name cannot be null");
|
||||
ProfileCapability obj = Records.newRecord(ProfileCapability.class);
|
||||
obj.setProfileName(profile);
|
||||
obj.setProfileCapabilityOverride(Resource.newInstance(0, 0));
|
||||
return obj;
|
||||
}
|
||||
|
||||
public static ProfileCapability newInstance(String profile,
|
||||
Resource override) {
|
||||
Preconditions
|
||||
.checkArgument(profile != null, "The profile name cannot be null");
|
||||
ProfileCapability obj = Records.newRecord(ProfileCapability.class);
|
||||
obj.setProfileName(profile);
|
||||
obj.setProfileCapabilityOverride(override);
|
||||
return obj;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the profile name.
|
||||
* @return the profile name
|
||||
*/
|
||||
public abstract String getProfileName();
|
||||
|
||||
/**
|
||||
* Get the profile capability override.
|
||||
* @return Resource object containing the override.
|
||||
*/
|
||||
public abstract Resource getProfileCapabilityOverride();
|
||||
|
||||
/**
|
||||
* Set the resource profile name.
|
||||
* @param profileName the resource profile name
|
||||
*/
|
||||
public abstract void setProfileName(String profileName);
|
||||
|
||||
/**
|
||||
* Set the capability override to override specific resource types on the
|
||||
* resource profile.
|
||||
*
|
||||
* For example, if you have a resource profile "small" that maps to
|
||||
* {@literal <4096M, 2 cores, 1 gpu>} and you set the capability override to
|
||||
* {@literal <8192M, 0 cores, 0 gpu>}, then the actual resource allocation on
|
||||
* the ResourceManager will be {@literal <8192M, 2 cores, 1 gpu>}.
|
||||
*
|
||||
* Note that the conversion from the ProfileCapability to the Resource class
|
||||
* with the actual resource requirements will be done by the ResourceManager,
|
||||
* which has the actual profile to Resource mapping.
|
||||
*
|
||||
* @param r Resource object containing the capability override
|
||||
*/
|
||||
public abstract void setProfileCapabilityOverride(Resource r);
|
||||
|
||||
@Override
|
||||
public boolean equals(Object other) {
|
||||
if (this == other) {
|
||||
return true;
|
||||
}
|
||||
if (other == null || !(other instanceof ProfileCapability)) {
|
||||
return false;
|
||||
}
|
||||
return ((ProfileCapability) other).getProfileName()
|
||||
.equals(this.getProfileName()) && ((ProfileCapability) other)
|
||||
.getProfileCapabilityOverride()
|
||||
.equals(this.getProfileCapabilityOverride());
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
final int prime = 2153;
|
||||
int result = 2459;
|
||||
String name = getProfileName();
|
||||
Resource override = getProfileCapabilityOverride();
|
||||
result = prime * result + ((name == null) ? 0 : name.hashCode());
|
||||
result = prime * result + ((override == null) ? 0 : override.hashCode());
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "{ profile: " + this.getProfileName() + ", capabilityOverride: "
|
||||
+ this.getProfileCapabilityOverride() + " }";
|
||||
}
|
||||
|
||||
/**
|
||||
* Get a representation of the capability as a Resource object.
|
||||
* @param capability the capability we wish to convert
|
||||
* @param resourceProfilesMap map of profile name to Resource object
|
||||
* @return Resource object representing the capability
|
||||
*/
|
||||
public static Resource toResource(ProfileCapability capability,
|
||||
Map<String, Resource> resourceProfilesMap) {
|
||||
Preconditions
|
||||
.checkArgument(capability != null, "Capability cannot be null");
|
||||
Preconditions.checkArgument(resourceProfilesMap != null,
|
||||
"Resource profiles map cannot be null");
|
||||
Resource none = Resource.newInstance(0, 0);
|
||||
Resource resource = Resource.newInstance(0, 0);
|
||||
String profileName = capability.getProfileName();
|
||||
if (null == profileName || profileName.isEmpty()) {
|
||||
profileName = DEFAULT_PROFILE;
|
||||
}
|
||||
if (resourceProfilesMap.containsKey(profileName)) {
|
||||
resource = Resource.newInstance(resourceProfilesMap.get(profileName));
|
||||
}
|
||||
if (capability.getProfileCapabilityOverride() != null &&
|
||||
!capability.getProfileCapabilityOverride().equals(none)) {
|
||||
for (ResourceInformation entry : capability
|
||||
.getProfileCapabilityOverride().getResources()) {
|
||||
if (entry != null && entry.getValue() > 0) {
|
||||
resource.setResourceInformation(entry.getName(), entry);
|
||||
}
|
||||
}
|
||||
}
|
||||
return resource;
|
||||
}
|
||||
}
|
|
@ -21,7 +21,6 @@ package org.apache.hadoop.yarn.api.records;
|
|||
import java.io.Serializable;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Public;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Evolving;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Stable;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||
|
@ -99,21 +98,7 @@ public abstract class ResourceRequest implements Comparable<ResourceRequest> {
|
|||
.resourceName(hostName).capability(capability)
|
||||
.numContainers(numContainers).relaxLocality(relaxLocality)
|
||||
.nodeLabelExpression(labelExpression)
|
||||
.executionTypeRequest(executionTypeRequest).profileCapability(null)
|
||||
.build();
|
||||
}
|
||||
|
||||
@Public
|
||||
@Unstable
|
||||
public static ResourceRequest newInstance(Priority priority, String hostName,
|
||||
Resource capability, int numContainers, boolean relaxLocality,
|
||||
String labelExpression, ExecutionTypeRequest executionTypeRequest,
|
||||
ProfileCapability profile) {
|
||||
return ResourceRequest.newBuilder().priority(priority)
|
||||
.resourceName(hostName).capability(capability)
|
||||
.numContainers(numContainers).relaxLocality(relaxLocality)
|
||||
.nodeLabelExpression(labelExpression)
|
||||
.executionTypeRequest(executionTypeRequest).profileCapability(profile)
|
||||
.executionTypeRequest(executionTypeRequest)
|
||||
.build();
|
||||
}
|
||||
|
||||
|
@ -140,7 +125,6 @@ public abstract class ResourceRequest implements Comparable<ResourceRequest> {
|
|||
resourceRequest.setRelaxLocality(true);
|
||||
resourceRequest.setExecutionTypeRequest(
|
||||
ExecutionTypeRequest.newInstance());
|
||||
resourceRequest.setProfileCapability(null);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -270,21 +254,6 @@ public abstract class ResourceRequest implements Comparable<ResourceRequest> {
|
|||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the <code>resourceProfile</code> of the request.
|
||||
* @see ResourceRequest#setProfileCapability(ProfileCapability)
|
||||
* @param profileCapability
|
||||
* <code>profileCapability</code> of the request
|
||||
* @return {@link ResourceRequestBuilder}
|
||||
*/
|
||||
@Public
|
||||
@InterfaceStability.Unstable
|
||||
public ResourceRequestBuilder profileCapability(
|
||||
ProfileCapability profileCapability) {
|
||||
resourceRequest.setProfileCapability(profileCapability);
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Return generated {@link ResourceRequest} object.
|
||||
* @return {@link ResourceRequest}
|
||||
|
@ -502,14 +471,6 @@ public abstract class ResourceRequest implements Comparable<ResourceRequest> {
|
|||
@Evolving
|
||||
public abstract void setNodeLabelExpression(String nodelabelExpression);
|
||||
|
||||
@Public
|
||||
@InterfaceStability.Unstable
|
||||
public abstract ProfileCapability getProfileCapability();
|
||||
|
||||
@Public
|
||||
@InterfaceStability.Unstable
|
||||
public abstract void setProfileCapability(ProfileCapability p);
|
||||
|
||||
/**
|
||||
* Get the optional <em>ID</em> corresponding to this allocation request. This
|
||||
* ID is an identifier for different {@code ResourceRequest}s from the <b>same
|
||||
|
@ -585,14 +546,12 @@ public abstract class ResourceRequest implements Comparable<ResourceRequest> {
|
|||
Resource capability = getCapability();
|
||||
String hostName = getResourceName();
|
||||
Priority priority = getPriority();
|
||||
ProfileCapability profile = getProfileCapability();
|
||||
result =
|
||||
prime * result + ((capability == null) ? 0 : capability.hashCode());
|
||||
result = prime * result + ((hostName == null) ? 0 : hostName.hashCode());
|
||||
result = prime * result + getNumContainers();
|
||||
result = prime * result + ((priority == null) ? 0 : priority.hashCode());
|
||||
result = prime * result + Long.valueOf(getAllocationRequestId()).hashCode();
|
||||
result = prime * result + ((profile == null) ? 0 : profile.hashCode());
|
||||
return result;
|
||||
}
|
||||
|
||||
|
|
|
@ -384,11 +384,6 @@ enum ExecutionTypeProto {
|
|||
////////////////////////////////////////////////////////////////////////
|
||||
////// From AM_RM_Protocol /////////////////////////////////////////////
|
||||
////////////////////////////////////////////////////////////////////////
|
||||
message ProfileCapabilityProto {
|
||||
required string profile = 1;
|
||||
required ResourceProto profileCapabilityOverride = 2;
|
||||
}
|
||||
|
||||
message ResourceRequestProto {
|
||||
optional PriorityProto priority = 1;
|
||||
optional string resource_name = 2;
|
||||
|
@ -398,7 +393,6 @@ message ResourceRequestProto {
|
|||
optional string node_label_expression = 6;
|
||||
optional ExecutionTypeRequestProto execution_type_request = 7;
|
||||
optional int64 allocation_request_id = 8 [default = -1];
|
||||
optional ProfileCapabilityProto profile = 9;
|
||||
}
|
||||
|
||||
message ExecutionTypeRequestProto {
|
||||
|
|
|
@ -89,7 +89,6 @@ import org.apache.hadoop.yarn.api.records.LocalResourceType;
|
|||
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
|
||||
import org.apache.hadoop.yarn.api.records.NodeReport;
|
||||
import org.apache.hadoop.yarn.api.records.Priority;
|
||||
import org.apache.hadoop.yarn.api.records.ProfileCapability;
|
||||
import org.apache.hadoop.yarn.api.records.RejectedSchedulingRequest;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||
|
@ -1422,10 +1421,11 @@ public class ApplicationMaster {
|
|||
Priority pri = Priority.newInstance(requestPriority);
|
||||
|
||||
// Set up resource type requirements
|
||||
ContainerRequest request =
|
||||
new ContainerRequest(createProfileCapability(), null, null,
|
||||
pri, 0, true, null,
|
||||
ExecutionTypeRequest.newInstance(containerType));
|
||||
ContainerRequest request = new ContainerRequest(
|
||||
getTaskResourceCapability(),
|
||||
null, null, pri, 0, true, null,
|
||||
ExecutionTypeRequest.newInstance(containerType),
|
||||
containerResourceProfile);
|
||||
LOG.info("Requested container ask: " + request.toString());
|
||||
return request;
|
||||
}
|
||||
|
@ -1437,7 +1437,7 @@ public class ApplicationMaster {
|
|||
ExecutionTypeRequest.newInstance(),
|
||||
Collections.singleton(spec.sourceTag),
|
||||
ResourceSizing.newInstance(
|
||||
createProfileCapability().getProfileCapabilityOverride()), null);
|
||||
getTaskResourceCapability()), null);
|
||||
sReq.setPlacementConstraint(spec.constraint);
|
||||
LOG.info("Scheduling Request made: " + sReq.toString());
|
||||
return sReq;
|
||||
|
@ -1702,7 +1702,7 @@ public class ApplicationMaster {
|
|||
}
|
||||
}
|
||||
|
||||
private ProfileCapability createProfileCapability()
|
||||
private Resource getTaskResourceCapability()
|
||||
throws YarnRuntimeException {
|
||||
if (containerMemory < -1 || containerMemory == 0) {
|
||||
throw new YarnRuntimeException("Value of AM memory '" + containerMemory
|
||||
|
@ -1727,12 +1727,6 @@ public class ApplicationMaster {
|
|||
resourceCapability.setResourceValue(entry.getKey(), entry.getValue());
|
||||
}
|
||||
|
||||
String profileName = containerResourceProfile;
|
||||
if ("".equals(containerResourceProfile) && resourceProfiles != null) {
|
||||
profileName = "default";
|
||||
}
|
||||
ProfileCapability capability =
|
||||
ProfileCapability.newInstance(profileName, resourceCapability);
|
||||
return capability;
|
||||
return resourceCapability;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -68,7 +68,6 @@ import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
|
|||
import org.apache.hadoop.yarn.api.records.NodeReport;
|
||||
import org.apache.hadoop.yarn.api.records.NodeState;
|
||||
import org.apache.hadoop.yarn.api.records.Priority;
|
||||
import org.apache.hadoop.yarn.api.records.ProfileCapability;
|
||||
import org.apache.hadoop.yarn.api.records.QueueACL;
|
||||
import org.apache.hadoop.yarn.api.records.QueueInfo;
|
||||
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
|
||||
|
@ -1123,10 +1122,17 @@ public class Client {
|
|||
+ " application master, exiting. " +
|
||||
"Specified virtual cores=" + amVCores);
|
||||
}
|
||||
String tmp = amResourceProfile;
|
||||
if (amResourceProfile.isEmpty()) {
|
||||
tmp = "default";
|
||||
Resource capability = Resource.newInstance(0, 0);
|
||||
|
||||
if (!amResourceProfile.isEmpty()) {
|
||||
if (!profiles.containsKey(amResourceProfile)) {
|
||||
throw new IllegalArgumentException(
|
||||
"Failed to find specified resource profile for application master="
|
||||
+ amResourceProfile);
|
||||
}
|
||||
capability = Resources.clone(profiles.get(amResourceProfile));
|
||||
}
|
||||
|
||||
if (appContext.getAMContainerResourceRequests() == null) {
|
||||
List<ResourceRequest> amResourceRequests = new ArrayList<ResourceRequest>();
|
||||
amResourceRequests
|
||||
|
@ -1135,31 +1141,26 @@ public class Client {
|
|||
appContext.setAMContainerResourceRequests(amResourceRequests);
|
||||
}
|
||||
|
||||
if (appContext.getAMContainerResourceRequests().get(0)
|
||||
.getProfileCapability() == null) {
|
||||
appContext.getAMContainerResourceRequests().get(0).setProfileCapability(
|
||||
ProfileCapability.newInstance(tmp, Resource.newInstance(0, 0)));
|
||||
}
|
||||
|
||||
Resource capability = Resource.newInstance(0, 0);
|
||||
|
||||
validateResourceTypes(amResources.keySet(), resourceTypes);
|
||||
for (Map.Entry<String, Long> entry : amResources.entrySet()) {
|
||||
capability.setResourceValue(entry.getKey(), entry.getValue());
|
||||
}
|
||||
// set amMemory because it's used to set Xmx param
|
||||
if (amMemory == -1) {
|
||||
amMemory = (profiles == null) ? DEFAULT_AM_MEMORY :
|
||||
profiles.get(tmp).getMemorySize();
|
||||
amMemory = DEFAULT_AM_MEMORY;
|
||||
LOG.warn("AM Memory not specified, use " + DEFAULT_AM_MEMORY
|
||||
+ " mb as AM memory");
|
||||
}
|
||||
if (amVCores == -1) {
|
||||
amVCores = (profiles == null) ? DEFAULT_AM_VCORES :
|
||||
profiles.get(tmp).getVirtualCores();
|
||||
amVCores = DEFAULT_AM_VCORES;
|
||||
LOG.warn("AM vcore not specified, use " + DEFAULT_AM_VCORES
|
||||
+ " mb as AM vcores");
|
||||
}
|
||||
capability.setMemorySize(amMemory);
|
||||
capability.setVirtualCores(amVCores);
|
||||
appContext.getAMContainerResourceRequests().get(0).getProfileCapability()
|
||||
.setProfileCapabilityOverride(capability);
|
||||
appContext.getAMContainerResourceRequests().get(0).setCapability(
|
||||
capability);
|
||||
LOG.warn("AM Resource capability=" + capability);
|
||||
}
|
||||
|
||||
private void setContainerResources(Map<String, Resource> profiles,
|
||||
|
|
|
@ -25,6 +25,7 @@ import java.util.Set;
|
|||
import java.util.function.Supplier;
|
||||
import java.util.List;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Public;
|
||||
|
@ -39,7 +40,6 @@ import org.apache.hadoop.yarn.api.records.ExecutionType;
|
|||
import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
|
||||
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
||||
import org.apache.hadoop.yarn.api.records.Priority;
|
||||
import org.apache.hadoop.yarn.api.records.ProfileCapability;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.api.records.SchedulingRequest;
|
||||
import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
|
||||
|
@ -123,7 +123,7 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
|
|||
private String nodeLabelsExpression;
|
||||
private ExecutionTypeRequest executionTypeRequest =
|
||||
ExecutionTypeRequest.newInstance();
|
||||
private String resourceProfile = ProfileCapability.DEFAULT_PROFILE;
|
||||
private String resourceProfile = null;
|
||||
|
||||
/**
|
||||
* Instantiates a {@link ContainerRequest} with the given constraints and
|
||||
|
@ -146,6 +146,13 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
|
|||
this(capability, nodes, racks, priority, true, null);
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public ContainerRequest(Resource capability, String[] nodes, String[] racks,
|
||||
Priority priority, String profile) {
|
||||
this(capability, nodes, racks, priority, 0, true, null,
|
||||
ExecutionTypeRequest.newInstance(), profile);
|
||||
}
|
||||
|
||||
/**
|
||||
* Instantiates a {@link ContainerRequest} with the given constraints and
|
||||
* locality relaxation enabled.
|
||||
|
@ -170,27 +177,7 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
|
|||
this(capability, nodes, racks, priority, allocationRequestId, true, null,
|
||||
ExecutionTypeRequest.newInstance());
|
||||
}
|
||||
/**
|
||||
* Instantiates a {@link ContainerRequest} with the given constraints and
|
||||
* locality relaxation enabled.
|
||||
*
|
||||
* @param capability
|
||||
* The {@link ProfileCapability} to be requested for each container.
|
||||
* @param nodes
|
||||
* Any hosts to request that the containers are placed on.
|
||||
* @param racks
|
||||
* Any racks to request that the containers are placed on. The
|
||||
* racks corresponding to any hosts requested will be automatically
|
||||
* added to this list.
|
||||
* @param priority
|
||||
* The priority at which to request the containers. Higher
|
||||
* priorities have lower numerical values.
|
||||
*/
|
||||
public ContainerRequest(ProfileCapability capability, String[] nodes,
|
||||
String[] racks, Priority priority) {
|
||||
this(capability, nodes, racks, priority, 0, true, null);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Instantiates a {@link ContainerRequest} with the given constraints.
|
||||
*
|
||||
|
@ -214,29 +201,6 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
|
|||
this(capability, nodes, racks, priority, relaxLocality, null);
|
||||
}
|
||||
|
||||
/**
|
||||
* Instantiates a {@link ContainerRequest} with the given constraints.
|
||||
*
|
||||
* @param capability
|
||||
* The {@link ProfileCapability} to be requested for each container.
|
||||
* @param nodes
|
||||
* Any hosts to request that the containers are placed on.
|
||||
* @param racks
|
||||
* Any racks to request that the containers are placed on. The
|
||||
* racks corresponding to any hosts requested will be automatically
|
||||
* added to this list.
|
||||
* @param priority
|
||||
* The priority at which to request the containers. Higher
|
||||
* priorities have lower numerical values.
|
||||
* @param relaxLocality
|
||||
* If true, containers for this request may be assigned on hosts
|
||||
* and racks other than the ones explicitly requested.
|
||||
*/
|
||||
public ContainerRequest(ProfileCapability capability, String[] nodes,
|
||||
String[] racks, Priority priority, boolean relaxLocality) {
|
||||
this(capability, nodes, racks, priority, 0, relaxLocality, null);
|
||||
}
|
||||
|
||||
/**
|
||||
* Instantiates a {@link ContainerRequest} with the given constraints.
|
||||
*
|
||||
|
@ -328,14 +292,6 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
|
|||
ExecutionTypeRequest.newInstance());
|
||||
}
|
||||
|
||||
public ContainerRequest(ProfileCapability capability, String[] nodes,
|
||||
String[] racks, Priority priority, long allocationRequestId,
|
||||
boolean relaxLocality, String nodeLabelsExpression) {
|
||||
this(capability, nodes, racks, priority, allocationRequestId,
|
||||
relaxLocality, nodeLabelsExpression,
|
||||
ExecutionTypeRequest.newInstance());
|
||||
}
|
||||
|
||||
/**
|
||||
* Instantiates a {@link ContainerRequest} with the given constraints.
|
||||
*
|
||||
|
@ -369,18 +325,9 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
|
|||
ExecutionTypeRequest executionTypeRequest) {
|
||||
this(capability, nodes, racks, priority, allocationRequestId,
|
||||
relaxLocality, nodeLabelsExpression, executionTypeRequest,
|
||||
ProfileCapability.DEFAULT_PROFILE);
|
||||
null);
|
||||
}
|
||||
|
||||
public ContainerRequest(ProfileCapability capability, String[] nodes,
|
||||
String[] racks, Priority priority, long allocationRequestId,
|
||||
boolean relaxLocality, String nodeLabelsExpression,
|
||||
ExecutionTypeRequest executionTypeRequest) {
|
||||
this(capability.getProfileCapabilityOverride(), nodes, racks, priority,
|
||||
allocationRequestId, relaxLocality, nodeLabelsExpression,
|
||||
executionTypeRequest, capability.getProfileName());
|
||||
}
|
||||
|
||||
/**
|
||||
* Instantiates a {@link ContainerRequest} with the given constraints.
|
||||
*
|
||||
|
@ -779,7 +726,7 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
|
|||
@InterfaceStability.Evolving
|
||||
public List<? extends Collection<T>> getMatchingRequests(
|
||||
Priority priority, String resourceName, ExecutionType executionType,
|
||||
ProfileCapability capability) {
|
||||
Resource capability, String profile) {
|
||||
throw new UnsupportedOperationException("The sub-class extending" +
|
||||
" AMRMClient is expected to implement this !!");
|
||||
}
|
||||
|
|
|
@ -19,6 +19,7 @@
|
|||
package org.apache.hadoop.yarn.client.api.impl;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.Serializable;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
|
@ -58,9 +59,9 @@ import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
|
|||
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
||||
import org.apache.hadoop.yarn.api.records.NMToken;
|
||||
import org.apache.hadoop.yarn.api.records.Priority;
|
||||
import org.apache.hadoop.yarn.api.records.ProfileCapability;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceInformation;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||
import org.apache.hadoop.yarn.api.records.SchedulingRequest;
|
||||
import org.apache.hadoop.yarn.api.records.Token;
|
||||
|
@ -124,14 +125,11 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
|
|||
LinkedHashSet<T> containerRequests;
|
||||
|
||||
ResourceRequestInfo(Long allocationRequestId, Priority priority,
|
||||
String resourceName, Resource capability, boolean relaxLocality,
|
||||
String resourceProfile) {
|
||||
ProfileCapability profileCapability = ProfileCapability
|
||||
.newInstance(resourceProfile, capability);
|
||||
String resourceName, Resource capability, boolean relaxLocality) {
|
||||
remoteRequest = ResourceRequest.newBuilder().priority(priority)
|
||||
.resourceName(resourceName).capability(capability).numContainers(0)
|
||||
.allocationRequestId(allocationRequestId).relaxLocality(relaxLocality)
|
||||
.profileCapability(profileCapability).build();
|
||||
.build();
|
||||
containerRequests = new LinkedHashSet<T>();
|
||||
}
|
||||
}
|
||||
|
@ -140,32 +138,11 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
|
|||
* Class compares Resource by memory, then cpu and then the remaining resource
|
||||
* types in reverse order.
|
||||
*/
|
||||
static class ProfileCapabilityComparator<T extends ProfileCapability>
|
||||
implements Comparator<T> {
|
||||
|
||||
HashMap<String, Resource> resourceProfilesMap;
|
||||
|
||||
public ProfileCapabilityComparator(
|
||||
HashMap<String, Resource> resourceProfileMap) {
|
||||
this.resourceProfilesMap = resourceProfileMap;
|
||||
static class ResourceReverseComparator<T extends Resource>
|
||||
implements Comparator<T>, Serializable {
|
||||
public int compare(Resource res0, Resource res1) {
|
||||
return res1.compareTo(res0);
|
||||
}
|
||||
|
||||
public int compare(T arg0, T arg1) {
|
||||
Resource resource0 =
|
||||
ProfileCapability.toResource(arg0, resourceProfilesMap);
|
||||
Resource resource1 =
|
||||
ProfileCapability.toResource(arg1, resourceProfilesMap);
|
||||
return resource1.compareTo(resource0);
|
||||
}
|
||||
}
|
||||
|
||||
boolean canFit(ProfileCapability arg0, ProfileCapability arg1) {
|
||||
Resource resource0 =
|
||||
ProfileCapability.toResource(arg0, resourceProfilesMap);
|
||||
Resource resource1 =
|
||||
ProfileCapability.toResource(arg1, resourceProfilesMap);
|
||||
return Resources.fitsIn(resource0, resource1);
|
||||
|
||||
}
|
||||
|
||||
private final Map<Long, RemoteRequestsTable<T>> remoteRequests =
|
||||
|
@ -567,7 +544,7 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
|
|||
.nodeLabelExpression(r.getNodeLabelExpression())
|
||||
.executionTypeRequest(r.getExecutionTypeRequest())
|
||||
.allocationRequestId(r.getAllocationRequestId())
|
||||
.profileCapability(r.getProfileCapability()).build();
|
||||
.build();
|
||||
askList.add(rr);
|
||||
}
|
||||
return askList;
|
||||
|
@ -649,8 +626,6 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
|
|||
public synchronized void addContainerRequest(T req) {
|
||||
Preconditions.checkArgument(req != null,
|
||||
"Resource request can not be null.");
|
||||
ProfileCapability profileCapability = ProfileCapability
|
||||
.newInstance(req.getResourceProfile(), req.getCapability());
|
||||
Set<String> dedupedRacks = new HashSet<String>();
|
||||
if (req.getRacks() != null) {
|
||||
dedupedRacks.addAll(req.getRacks());
|
||||
|
@ -663,7 +638,8 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
|
|||
Set<String> inferredRacks = resolveRacks(req.getNodes());
|
||||
inferredRacks.removeAll(dedupedRacks);
|
||||
|
||||
checkResourceProfile(req.getResourceProfile());
|
||||
Resource resource = checkAndGetResourceProfile(req.getResourceProfile(),
|
||||
req.getCapability());
|
||||
|
||||
// check that specific and non-specific requests cannot be mixed within a
|
||||
// priority
|
||||
|
@ -689,26 +665,26 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
|
|||
}
|
||||
for (String node : dedupedNodes) {
|
||||
addResourceRequest(req.getPriority(), node,
|
||||
req.getExecutionTypeRequest(), profileCapability, req, true,
|
||||
req.getExecutionTypeRequest(), resource, req, true,
|
||||
req.getNodeLabelExpression());
|
||||
}
|
||||
}
|
||||
|
||||
for (String rack : dedupedRacks) {
|
||||
addResourceRequest(req.getPriority(), rack, req.getExecutionTypeRequest(),
|
||||
profileCapability, req, true, req.getNodeLabelExpression());
|
||||
resource, req, true, req.getNodeLabelExpression());
|
||||
}
|
||||
|
||||
// Ensure node requests are accompanied by requests for
|
||||
// corresponding rack
|
||||
for (String rack : inferredRacks) {
|
||||
addResourceRequest(req.getPriority(), rack, req.getExecutionTypeRequest(),
|
||||
profileCapability, req, req.getRelaxLocality(),
|
||||
resource, req, req.getRelaxLocality(),
|
||||
req.getNodeLabelExpression());
|
||||
}
|
||||
// Off-switch
|
||||
addResourceRequest(req.getPriority(), ResourceRequest.ANY,
|
||||
req.getExecutionTypeRequest(), profileCapability, req,
|
||||
req.getExecutionTypeRequest(), resource, req,
|
||||
req.getRelaxLocality(), req.getNodeLabelExpression());
|
||||
}
|
||||
|
||||
|
@ -716,8 +692,8 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
|
|||
public synchronized void removeContainerRequest(T req) {
|
||||
Preconditions.checkArgument(req != null,
|
||||
"Resource request can not be null.");
|
||||
ProfileCapability profileCapability = ProfileCapability
|
||||
.newInstance(req.getResourceProfile(), req.getCapability());
|
||||
Resource resource = checkAndGetResourceProfile(req.getResourceProfile(),
|
||||
req.getCapability());
|
||||
Set<String> allRacks = new HashSet<String>();
|
||||
if (req.getRacks() != null) {
|
||||
allRacks.addAll(req.getRacks());
|
||||
|
@ -728,17 +704,17 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
|
|||
if (req.getNodes() != null) {
|
||||
for (String node : new HashSet<String>(req.getNodes())) {
|
||||
decResourceRequest(req.getPriority(), node,
|
||||
req.getExecutionTypeRequest(), profileCapability, req);
|
||||
req.getExecutionTypeRequest(), resource, req);
|
||||
}
|
||||
}
|
||||
|
||||
for (String rack : allRacks) {
|
||||
decResourceRequest(req.getPriority(), rack,
|
||||
req.getExecutionTypeRequest(), profileCapability, req);
|
||||
req.getExecutionTypeRequest(), resource, req);
|
||||
}
|
||||
|
||||
decResourceRequest(req.getPriority(), ResourceRequest.ANY,
|
||||
req.getExecutionTypeRequest(), profileCapability, req);
|
||||
req.getExecutionTypeRequest(), resource, req);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -833,26 +809,23 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
|
|||
}
|
||||
|
||||
@Override
|
||||
@SuppressWarnings("unchecked")
|
||||
public synchronized List<? extends Collection<T>> getMatchingRequests(
|
||||
Priority priority, String resourceName, ExecutionType executionType,
|
||||
Resource capability) {
|
||||
ProfileCapability profileCapability =
|
||||
ProfileCapability.newInstance(capability);
|
||||
return getMatchingRequests(priority, resourceName, executionType,
|
||||
profileCapability);
|
||||
public List<? extends Collection<T>> getMatchingRequests(Priority priority,
|
||||
String resourceName, ExecutionType executionType,
|
||||
Resource capability, String profile) {
|
||||
capability = checkAndGetResourceProfile(profile, capability);
|
||||
return getMatchingRequests(priority, resourceName, executionType, capability);
|
||||
}
|
||||
|
||||
@Override
|
||||
@SuppressWarnings("unchecked")
|
||||
public synchronized List<? extends Collection<T>> getMatchingRequests(
|
||||
Priority priority, String resourceName, ExecutionType executionType,
|
||||
ProfileCapability capability) {
|
||||
Resource capability) {
|
||||
Preconditions.checkArgument(capability != null,
|
||||
"The Resource to be requested should not be null ");
|
||||
Preconditions.checkArgument(priority != null,
|
||||
"The priority at which to request containers should not be null ");
|
||||
List<LinkedHashSet<T>> list = new LinkedList<LinkedHashSet<T>>();
|
||||
List<LinkedHashSet<T>> list = new LinkedList<>();
|
||||
|
||||
RemoteRequestsTable remoteRequestsTable = getTable(0);
|
||||
|
||||
|
@ -864,7 +837,7 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
|
|||
// If no exact match. Container may be larger than what was requested.
|
||||
// get all resources <= capability. map is reverse sorted.
|
||||
for (ResourceRequestInfo<T> resReqInfo : matchingRequests) {
|
||||
if (canFit(resReqInfo.remoteRequest.getProfileCapability(),
|
||||
if (Resources.fitsIn(resReqInfo.remoteRequest.getCapability(),
|
||||
capability) && !resReqInfo.containerRequests.isEmpty()) {
|
||||
list.add(resReqInfo.containerRequests);
|
||||
}
|
||||
|
@ -921,13 +894,34 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
|
|||
}
|
||||
}
|
||||
|
||||
private void checkResourceProfile(String profile) {
|
||||
if (resourceProfilesMap != null && !resourceProfilesMap.isEmpty()
|
||||
&& !resourceProfilesMap.containsKey(profile)) {
|
||||
throw new InvalidContainerRequestException(
|
||||
"Invalid profile name, valid profile names are " + resourceProfilesMap
|
||||
.keySet());
|
||||
// When profile and override resource are specified at the same time, override
|
||||
// predefined resource value in profile if any resource type has a positive
|
||||
// value.
|
||||
private Resource checkAndGetResourceProfile(String profile,
|
||||
Resource overrideResource) {
|
||||
Resource returnResource = overrideResource;
|
||||
|
||||
// if application requested a non-empty/null profile, and the
|
||||
if (profile != null && !profile.isEmpty()) {
|
||||
if (resourceProfilesMap == null || (!resourceProfilesMap.containsKey(
|
||||
profile))) {
|
||||
throw new InvalidContainerRequestException(
|
||||
"Invalid profile name specified=" + profile + (
|
||||
resourceProfilesMap == null ?
|
||||
"" :
|
||||
(", valid profile names are " + resourceProfilesMap
|
||||
.keySet())));
|
||||
}
|
||||
returnResource = Resources.clone(resourceProfilesMap.get(profile));
|
||||
for (ResourceInformation info : overrideResource
|
||||
.getAllResourcesListCopy()) {
|
||||
if (info.getValue() > 0) {
|
||||
returnResource.setResourceInformation(info.getName(), info);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return returnResource;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -1016,16 +1010,12 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
|
|||
}
|
||||
|
||||
private void addResourceRequest(Priority priority, String resourceName,
|
||||
ExecutionTypeRequest execTypeReq, ProfileCapability capability, T req,
|
||||
ExecutionTypeRequest execTypeReq, Resource capability, T req,
|
||||
boolean relaxLocality, String labelExpression) {
|
||||
RemoteRequestsTable<T> remoteRequestsTable =
|
||||
getTable(req.getAllocationRequestId());
|
||||
if (remoteRequestsTable == null) {
|
||||
remoteRequestsTable = new RemoteRequestsTable<T>();
|
||||
if (this.resourceProfilesMap instanceof HashMap) {
|
||||
remoteRequestsTable.setResourceComparator(
|
||||
new ProfileCapabilityComparator((HashMap) resourceProfilesMap));
|
||||
}
|
||||
remoteRequestsTable = new RemoteRequestsTable<>();
|
||||
putTable(req.getAllocationRequestId(), remoteRequestsTable);
|
||||
}
|
||||
@SuppressWarnings("unchecked")
|
||||
|
@ -1048,7 +1038,7 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
|
|||
}
|
||||
|
||||
private void decResourceRequest(Priority priority, String resourceName,
|
||||
ExecutionTypeRequest execTypeReq, ProfileCapability capability, T req) {
|
||||
ExecutionTypeRequest execTypeReq, Resource capability, T req) {
|
||||
RemoteRequestsTable<T> remoteRequestsTable =
|
||||
getTable(req.getAllocationRequestId());
|
||||
if (remoteRequestsTable != null) {
|
||||
|
|
|
@ -21,7 +21,7 @@ package org.apache.hadoop.yarn.client.api.impl;
|
|||
import org.apache.hadoop.yarn.api.records.ExecutionType;
|
||||
import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
|
||||
import org.apache.hadoop.yarn.api.records.Priority;
|
||||
import org.apache.hadoop.yarn.api.records.ProfileCapability;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
|
@ -33,7 +33,6 @@ import java.util.TreeMap;
|
|||
|
||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||
import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl.ResourceRequestInfo;
|
||||
import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl.ProfileCapabilityComparator;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
|
@ -42,36 +41,34 @@ class RemoteRequestsTable<T> implements Iterable<ResourceRequestInfo>{
|
|||
private static final Logger LOG =
|
||||
LoggerFactory.getLogger(RemoteRequestsTable.class);
|
||||
|
||||
private ProfileCapabilityComparator resourceComparator;
|
||||
|
||||
/**
|
||||
* Nested Iterator that iterates over just the ResourceRequestInfo
|
||||
* object.
|
||||
*/
|
||||
class RequestInfoIterator implements Iterator<ResourceRequestInfo> {
|
||||
private Iterator<Map<String, Map<ExecutionType, TreeMap<ProfileCapability,
|
||||
private Iterator<Map<String, Map<ExecutionType, TreeMap<Resource,
|
||||
ResourceRequestInfo>>>> iLocMap;
|
||||
private Iterator<Map<ExecutionType, TreeMap<ProfileCapability,
|
||||
private Iterator<Map<ExecutionType, TreeMap<Resource,
|
||||
ResourceRequestInfo>>> iExecTypeMap;
|
||||
private Iterator<TreeMap<ProfileCapability, ResourceRequestInfo>> iCapMap;
|
||||
private Iterator<TreeMap<Resource, ResourceRequestInfo>> iCapMap;
|
||||
private Iterator<ResourceRequestInfo> iResReqInfo;
|
||||
|
||||
public RequestInfoIterator(Iterator<Map<String,
|
||||
Map<ExecutionType, TreeMap<ProfileCapability, ResourceRequestInfo>>>>
|
||||
Map<ExecutionType, TreeMap<Resource, ResourceRequestInfo>>>>
|
||||
iLocationMap) {
|
||||
this.iLocMap = iLocationMap;
|
||||
if (iLocMap.hasNext()) {
|
||||
iExecTypeMap = iLocMap.next().values().iterator();
|
||||
} else {
|
||||
iExecTypeMap =
|
||||
new LinkedList<Map<ExecutionType, TreeMap<ProfileCapability,
|
||||
new LinkedList<Map<ExecutionType, TreeMap<Resource,
|
||||
ResourceRequestInfo>>>().iterator();
|
||||
}
|
||||
if (iExecTypeMap.hasNext()) {
|
||||
iCapMap = iExecTypeMap.next().values().iterator();
|
||||
} else {
|
||||
iCapMap =
|
||||
new LinkedList<TreeMap<ProfileCapability, ResourceRequestInfo>>()
|
||||
new LinkedList<TreeMap<Resource, ResourceRequestInfo>>()
|
||||
.iterator();
|
||||
}
|
||||
if (iCapMap.hasNext()) {
|
||||
|
@ -113,7 +110,7 @@ class RemoteRequestsTable<T> implements Iterable<ResourceRequestInfo>{
|
|||
// Nest map with Primary key :
|
||||
// Priority -> ResourceName(String) -> ExecutionType -> Capability(Resource)
|
||||
// and value : ResourceRequestInfo
|
||||
private Map<Priority, Map<String, Map<ExecutionType, TreeMap<ProfileCapability,
|
||||
private Map<Priority, Map<String, Map<ExecutionType, TreeMap<Resource,
|
||||
ResourceRequestInfo>>>> remoteRequestsTable = new HashMap<>();
|
||||
|
||||
@Override
|
||||
|
@ -122,8 +119,8 @@ class RemoteRequestsTable<T> implements Iterable<ResourceRequestInfo>{
|
|||
}
|
||||
|
||||
ResourceRequestInfo get(Priority priority, String location,
|
||||
ExecutionType execType, ProfileCapability capability) {
|
||||
TreeMap<ProfileCapability, ResourceRequestInfo> capabilityMap =
|
||||
ExecutionType execType, Resource capability) {
|
||||
TreeMap<Resource, ResourceRequestInfo> capabilityMap =
|
||||
getCapabilityMap(priority, location, execType);
|
||||
if (capabilityMap == null) {
|
||||
return null;
|
||||
|
@ -133,8 +130,8 @@ class RemoteRequestsTable<T> implements Iterable<ResourceRequestInfo>{
|
|||
|
||||
@SuppressWarnings("unchecked")
|
||||
void put(Priority priority, String resourceName, ExecutionType execType,
|
||||
ProfileCapability capability, ResourceRequestInfo resReqInfo) {
|
||||
Map<String, Map<ExecutionType, TreeMap<ProfileCapability,
|
||||
Resource capability, ResourceRequestInfo resReqInfo) {
|
||||
Map<String, Map<ExecutionType, TreeMap<Resource,
|
||||
ResourceRequestInfo>>> locationMap =
|
||||
remoteRequestsTable.get(priority);
|
||||
if (locationMap == null) {
|
||||
|
@ -144,7 +141,7 @@ class RemoteRequestsTable<T> implements Iterable<ResourceRequestInfo>{
|
|||
LOG.debug("Added priority=" + priority);
|
||||
}
|
||||
}
|
||||
Map<ExecutionType, TreeMap<ProfileCapability, ResourceRequestInfo>>
|
||||
Map<ExecutionType, TreeMap<Resource, ResourceRequestInfo>>
|
||||
execTypeMap = locationMap.get(resourceName);
|
||||
if (execTypeMap == null) {
|
||||
execTypeMap = new HashMap<>();
|
||||
|
@ -153,15 +150,10 @@ class RemoteRequestsTable<T> implements Iterable<ResourceRequestInfo>{
|
|||
LOG.debug("Added resourceName=" + resourceName);
|
||||
}
|
||||
}
|
||||
TreeMap<ProfileCapability, ResourceRequestInfo> capabilityMap =
|
||||
TreeMap<Resource, ResourceRequestInfo> capabilityMap =
|
||||
execTypeMap.get(execType);
|
||||
if (capabilityMap == null) {
|
||||
// this can happen if the user doesn't register with the RM before
|
||||
// calling addResourceRequest
|
||||
if (resourceComparator == null) {
|
||||
resourceComparator = new ProfileCapabilityComparator(new HashMap<>());
|
||||
}
|
||||
capabilityMap = new TreeMap<>(resourceComparator);
|
||||
capabilityMap = new TreeMap<>(new AMRMClientImpl.ResourceReverseComparator());
|
||||
execTypeMap.put(execType, capabilityMap);
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Added Execution Type=" + execType);
|
||||
|
@ -171,9 +163,9 @@ class RemoteRequestsTable<T> implements Iterable<ResourceRequestInfo>{
|
|||
}
|
||||
|
||||
ResourceRequestInfo remove(Priority priority, String resourceName,
|
||||
ExecutionType execType, ProfileCapability capability) {
|
||||
ExecutionType execType, Resource capability) {
|
||||
ResourceRequestInfo retVal = null;
|
||||
Map<String, Map<ExecutionType, TreeMap<ProfileCapability,
|
||||
Map<String, Map<ExecutionType, TreeMap<Resource,
|
||||
ResourceRequestInfo>>> locationMap = remoteRequestsTable.get(priority);
|
||||
if (locationMap == null) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
|
@ -181,7 +173,7 @@ class RemoteRequestsTable<T> implements Iterable<ResourceRequestInfo>{
|
|||
}
|
||||
return null;
|
||||
}
|
||||
Map<ExecutionType, TreeMap<ProfileCapability, ResourceRequestInfo>>
|
||||
Map<ExecutionType, TreeMap<Resource, ResourceRequestInfo>>
|
||||
execTypeMap = locationMap.get(resourceName);
|
||||
if (execTypeMap == null) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
|
@ -189,7 +181,7 @@ class RemoteRequestsTable<T> implements Iterable<ResourceRequestInfo>{
|
|||
}
|
||||
return null;
|
||||
}
|
||||
TreeMap<ProfileCapability, ResourceRequestInfo> capabilityMap =
|
||||
TreeMap<Resource, ResourceRequestInfo> capabilityMap =
|
||||
execTypeMap.get(execType);
|
||||
if (capabilityMap == null) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
|
@ -210,14 +202,14 @@ class RemoteRequestsTable<T> implements Iterable<ResourceRequestInfo>{
|
|||
return retVal;
|
||||
}
|
||||
|
||||
Map<String, Map<ExecutionType, TreeMap<ProfileCapability,
|
||||
Map<String, Map<ExecutionType, TreeMap<Resource,
|
||||
ResourceRequestInfo>>> getLocationMap(Priority priority) {
|
||||
return remoteRequestsTable.get(priority);
|
||||
}
|
||||
|
||||
Map<ExecutionType, TreeMap<ProfileCapability, ResourceRequestInfo>>
|
||||
Map<ExecutionType, TreeMap<Resource, ResourceRequestInfo>>
|
||||
getExecutionTypeMap(Priority priority, String location) {
|
||||
Map<String, Map<ExecutionType, TreeMap<ProfileCapability,
|
||||
Map<String, Map<ExecutionType, TreeMap<Resource,
|
||||
ResourceRequestInfo>>> locationMap = getLocationMap(priority);
|
||||
if (locationMap == null) {
|
||||
return null;
|
||||
|
@ -225,10 +217,10 @@ class RemoteRequestsTable<T> implements Iterable<ResourceRequestInfo>{
|
|||
return locationMap.get(location);
|
||||
}
|
||||
|
||||
TreeMap<ProfileCapability, ResourceRequestInfo> getCapabilityMap(Priority
|
||||
TreeMap<Resource, ResourceRequestInfo> getCapabilityMap(Priority
|
||||
priority, String location,
|
||||
ExecutionType execType) {
|
||||
Map<ExecutionType, TreeMap<ProfileCapability, ResourceRequestInfo>>
|
||||
Map<ExecutionType, TreeMap<Resource, ResourceRequestInfo>>
|
||||
executionTypeMap = getExecutionTypeMap(priority, location);
|
||||
if (executionTypeMap == null) {
|
||||
return null;
|
||||
|
@ -242,7 +234,7 @@ class RemoteRequestsTable<T> implements Iterable<ResourceRequestInfo>{
|
|||
List retList = new LinkedList<>();
|
||||
for (String location : locations) {
|
||||
for (ExecutionType eType : ExecutionType.values()) {
|
||||
TreeMap<ProfileCapability, ResourceRequestInfo> capabilityMap =
|
||||
TreeMap<Resource, ResourceRequestInfo> capabilityMap =
|
||||
getCapabilityMap(priority, location, eType);
|
||||
if (capabilityMap != null) {
|
||||
retList.addAll(capabilityMap.values());
|
||||
|
@ -254,9 +246,9 @@ class RemoteRequestsTable<T> implements Iterable<ResourceRequestInfo>{
|
|||
|
||||
List<ResourceRequestInfo> getMatchingRequests(
|
||||
Priority priority, String resourceName, ExecutionType executionType,
|
||||
ProfileCapability capability) {
|
||||
Resource capability) {
|
||||
List<ResourceRequestInfo> list = new LinkedList<>();
|
||||
TreeMap<ProfileCapability, ResourceRequestInfo> capabilityMap =
|
||||
TreeMap<Resource, ResourceRequestInfo> capabilityMap =
|
||||
getCapabilityMap(priority, resourceName, executionType);
|
||||
if (capabilityMap != null) {
|
||||
ResourceRequestInfo resourceRequestInfo = capabilityMap.get(capability);
|
||||
|
@ -272,15 +264,14 @@ class RemoteRequestsTable<T> implements Iterable<ResourceRequestInfo>{
|
|||
@SuppressWarnings("unchecked")
|
||||
ResourceRequestInfo addResourceRequest(Long allocationRequestId,
|
||||
Priority priority, String resourceName, ExecutionTypeRequest execTypeReq,
|
||||
ProfileCapability capability, T req, boolean relaxLocality,
|
||||
Resource capability, T req, boolean relaxLocality,
|
||||
String labelExpression) {
|
||||
ResourceRequestInfo resourceRequestInfo =
|
||||
get(priority, resourceName, execTypeReq.getExecutionType(), capability);
|
||||
if (resourceRequestInfo == null) {
|
||||
resourceRequestInfo =
|
||||
new ResourceRequestInfo(allocationRequestId, priority, resourceName,
|
||||
capability.getProfileCapabilityOverride(), relaxLocality,
|
||||
capability.getProfileName());
|
||||
capability, relaxLocality);
|
||||
put(priority, resourceName, execTypeReq.getExecutionType(), capability,
|
||||
resourceRequestInfo);
|
||||
}
|
||||
|
@ -302,7 +293,7 @@ class RemoteRequestsTable<T> implements Iterable<ResourceRequestInfo>{
|
|||
}
|
||||
|
||||
ResourceRequestInfo decResourceRequest(Priority priority, String resourceName,
|
||||
ExecutionTypeRequest execTypeReq, ProfileCapability capability, T req) {
|
||||
ExecutionTypeRequest execTypeReq, Resource capability, T req) {
|
||||
ResourceRequestInfo resourceRequestInfo = get(priority, resourceName,
|
||||
execTypeReq.getExecutionType(), capability);
|
||||
|
||||
|
@ -339,35 +330,4 @@ class RemoteRequestsTable<T> implements Iterable<ResourceRequestInfo>{
|
|||
boolean isEmpty() {
|
||||
return remoteRequestsTable.isEmpty();
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public void setResourceComparator(ProfileCapabilityComparator comparator) {
|
||||
ProfileCapabilityComparator old = this.resourceComparator;
|
||||
this.resourceComparator = comparator;
|
||||
if (old != null) {
|
||||
// we've already set a resource comparator - re-create the maps with the
|
||||
// new one. this is needed in case someone adds container requests before
|
||||
// registering with the RM. In such a case, the comparator won't have
|
||||
// the resource profiles map. After registration, the map is available
|
||||
// so re-create the capabilities maps
|
||||
|
||||
for (Map.Entry<Priority, Map<String, Map<ExecutionType,
|
||||
TreeMap<ProfileCapability, ResourceRequestInfo>>>>
|
||||
priEntry : remoteRequestsTable.entrySet()) {
|
||||
for (Map.Entry<String, Map<ExecutionType, TreeMap<ProfileCapability,
|
||||
ResourceRequestInfo>>> nameEntry : priEntry.getValue().entrySet()) {
|
||||
for (Map.Entry<ExecutionType, TreeMap<ProfileCapability,
|
||||
ResourceRequestInfo>> execEntry : nameEntry
|
||||
.getValue().entrySet()) {
|
||||
Map<ProfileCapability, ResourceRequestInfo> capabilityMap =
|
||||
execEntry.getValue();
|
||||
TreeMap<ProfileCapability, ResourceRequestInfo> newCapabiltyMap =
|
||||
new TreeMap<>(resourceComparator);
|
||||
newCapabiltyMap.putAll(capabilityMap);
|
||||
execEntry.setValue(newCapabiltyMap);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -470,19 +470,16 @@ public class TestAMRMClient extends BaseAMRMClientTest{
|
|||
amClient.addContainerRequest(storedContainer1);
|
||||
amClient.addContainerRequest(storedContainer2);
|
||||
amClient.addContainerRequest(storedContainer3);
|
||||
|
||||
ProfileCapability profileCapability =
|
||||
ProfileCapability.newInstance(capability);
|
||||
|
||||
// test addition and storage
|
||||
RemoteRequestsTable<ContainerRequest> remoteRequestsTable =
|
||||
amClient.getTable(0);
|
||||
int containersRequestedAny = remoteRequestsTable.get(priority,
|
||||
ResourceRequest.ANY, ExecutionType.GUARANTEED, profileCapability)
|
||||
ResourceRequest.ANY, ExecutionType.GUARANTEED, capability)
|
||||
.remoteRequest.getNumContainers();
|
||||
assertEquals(2, containersRequestedAny);
|
||||
containersRequestedAny = remoteRequestsTable.get(priority1,
|
||||
ResourceRequest.ANY, ExecutionType.GUARANTEED, profileCapability)
|
||||
ResourceRequest.ANY, ExecutionType.GUARANTEED, capability)
|
||||
.remoteRequest.getNumContainers();
|
||||
assertEquals(1, containersRequestedAny);
|
||||
List<? extends Collection<ContainerRequest>> matches =
|
||||
|
@ -1185,11 +1182,9 @@ public class TestAMRMClient extends BaseAMRMClientTest{
|
|||
true, null, ExecutionTypeRequest
|
||||
.newInstance(ExecutionType.OPPORTUNISTIC, true)));
|
||||
|
||||
ProfileCapability profileCapability =
|
||||
ProfileCapability.newInstance(capability);
|
||||
int oppContainersRequestedAny =
|
||||
amClient.getTable(0).get(priority2, ResourceRequest.ANY,
|
||||
ExecutionType.OPPORTUNISTIC, profileCapability).remoteRequest
|
||||
ExecutionType.OPPORTUNISTIC, capability).remoteRequest
|
||||
.getNumContainers();
|
||||
|
||||
assertEquals(1, oppContainersRequestedAny);
|
||||
|
@ -1326,11 +1321,9 @@ public class TestAMRMClient extends BaseAMRMClientTest{
|
|||
true, null, ExecutionTypeRequest
|
||||
.newInstance(ExecutionType.GUARANTEED, true)));
|
||||
|
||||
ProfileCapability profileCapability =
|
||||
ProfileCapability.newInstance(capability);
|
||||
int oppContainersRequestedAny =
|
||||
amClient.getTable(0).get(priority2, ResourceRequest.ANY,
|
||||
ExecutionType.GUARANTEED, profileCapability).remoteRequest
|
||||
ExecutionType.GUARANTEED, capability).remoteRequest
|
||||
.getNumContainers();
|
||||
|
||||
assertEquals(1, oppContainersRequestedAny);
|
||||
|
@ -1710,16 +1703,14 @@ public class TestAMRMClient extends BaseAMRMClientTest{
|
|||
int expAsks, int expRelease) {
|
||||
RemoteRequestsTable<ContainerRequest> remoteRequestsTable =
|
||||
amClient.getTable(allocationReqId);
|
||||
ProfileCapability profileCapability =
|
||||
ProfileCapability.newInstance(capability);
|
||||
int containersRequestedNode = remoteRequestsTable.get(priority,
|
||||
node, ExecutionType.GUARANTEED, profileCapability).remoteRequest
|
||||
node, ExecutionType.GUARANTEED, capability).remoteRequest
|
||||
.getNumContainers();
|
||||
int containersRequestedRack = remoteRequestsTable.get(priority,
|
||||
rack, ExecutionType.GUARANTEED, profileCapability).remoteRequest
|
||||
rack, ExecutionType.GUARANTEED, capability).remoteRequest
|
||||
.getNumContainers();
|
||||
int containersRequestedAny = remoteRequestsTable.get(priority,
|
||||
ResourceRequest.ANY, ExecutionType.GUARANTEED, profileCapability)
|
||||
ResourceRequest.ANY, ExecutionType.GUARANTEED, capability)
|
||||
.remoteRequest.getNumContainers();
|
||||
|
||||
assertEquals(expNode, containersRequestedNode);
|
||||
|
@ -1931,31 +1922,20 @@ public class TestAMRMClient extends BaseAMRMClientTest{
|
|||
amClient.start();
|
||||
amClient.registerApplicationMaster("Host", 10000, "");
|
||||
|
||||
ProfileCapability capability1 = ProfileCapability.newInstance("minimum");
|
||||
ProfileCapability capability2 = ProfileCapability.newInstance("default");
|
||||
ProfileCapability capability3 = ProfileCapability.newInstance("maximum");
|
||||
ProfileCapability capability4 = ProfileCapability
|
||||
.newInstance("minimum", Resource.newInstance(2048, 1));
|
||||
ProfileCapability capability5 = ProfileCapability.newInstance("default");
|
||||
ProfileCapability capability6 = ProfileCapability
|
||||
.newInstance("default", Resource.newInstance(2048, 1));
|
||||
// http has the same capabilities as default
|
||||
ProfileCapability capability7 = ProfileCapability.newInstance("http");
|
||||
|
||||
ContainerRequest storedContainer1 =
|
||||
new ContainerRequest(capability1, nodes, racks, priority);
|
||||
ContainerRequest storedContainer2 =
|
||||
new ContainerRequest(capability2, nodes, racks, priority);
|
||||
ContainerRequest storedContainer3 =
|
||||
new ContainerRequest(capability3, nodes, racks, priority);
|
||||
ContainerRequest storedContainer4 =
|
||||
new ContainerRequest(capability4, nodes, racks, priority);
|
||||
ContainerRequest storedContainer5 =
|
||||
new ContainerRequest(capability5, nodes, racks, priority2);
|
||||
ContainerRequest storedContainer6 =
|
||||
new ContainerRequest(capability6, nodes, racks, priority);
|
||||
ContainerRequest storedContainer7 =
|
||||
new ContainerRequest(capability7, nodes, racks, priority);
|
||||
ContainerRequest storedContainer1 = new ContainerRequest(
|
||||
Resource.newInstance(0, 0), nodes, racks, priority, "minimum");
|
||||
ContainerRequest storedContainer2 = new ContainerRequest(
|
||||
Resource.newInstance(0, 0), nodes, racks, priority, "default");
|
||||
ContainerRequest storedContainer3 = new ContainerRequest(
|
||||
Resource.newInstance(0, 0), nodes, racks, priority, "maximum");
|
||||
ContainerRequest storedContainer4 = new ContainerRequest(
|
||||
Resource.newInstance(2048, 1), nodes, racks, priority, "minimum");
|
||||
ContainerRequest storedContainer5 = new ContainerRequest(
|
||||
Resource.newInstance(2048, 1), nodes, racks, priority2, "default");
|
||||
ContainerRequest storedContainer6 = new ContainerRequest(
|
||||
Resource.newInstance(2048, 1), nodes, racks, priority, "default");
|
||||
ContainerRequest storedContainer7 = new ContainerRequest(
|
||||
Resource.newInstance(0, 0), nodes, racks, priority, "http");
|
||||
|
||||
|
||||
amClient.addContainerRequest(storedContainer1);
|
||||
|
@ -1970,11 +1950,8 @@ public class TestAMRMClient extends BaseAMRMClientTest{
|
|||
List<? extends Collection<ContainerRequest>> matches;
|
||||
ContainerRequest storedRequest;
|
||||
// exact match
|
||||
ProfileCapability testCapability1 =
|
||||
ProfileCapability.newInstance("minimum");
|
||||
matches = amClient
|
||||
.getMatchingRequests(priority, node, ExecutionType.GUARANTEED,
|
||||
testCapability1);
|
||||
matches = amClient.getMatchingRequests(priority, node,
|
||||
ExecutionType.GUARANTEED, Resource.newInstance(0, 0), "minimum");
|
||||
verifyMatches(matches, 1);
|
||||
storedRequest = matches.get(0).iterator().next();
|
||||
assertEquals(storedContainer1, storedRequest);
|
||||
|
@ -1983,11 +1960,9 @@ public class TestAMRMClient extends BaseAMRMClientTest{
|
|||
// exact matching with order maintained
|
||||
// we should get back 3 matches - default + http because they have the
|
||||
// same capability
|
||||
ProfileCapability testCapability2 =
|
||||
ProfileCapability.newInstance("default");
|
||||
matches = amClient
|
||||
.getMatchingRequests(priority, node, ExecutionType.GUARANTEED,
|
||||
testCapability2);
|
||||
Resource.newInstance(0, 0), "default");
|
||||
verifyMatches(matches, 2);
|
||||
// must be returned in the order they were made
|
||||
int i = 0;
|
||||
|
|
|
@ -29,7 +29,6 @@ import org.apache.hadoop.net.DNSToSwitchMapping;
|
|||
import org.apache.hadoop.yarn.api.records.ExecutionType;
|
||||
import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
|
||||
import org.apache.hadoop.yarn.api.records.Priority;
|
||||
import org.apache.hadoop.yarn.api.records.ProfileCapability;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||
import org.apache.hadoop.yarn.client.api.AMRMClient;
|
||||
|
@ -277,10 +276,8 @@ public class TestAMRMClientContainerRequest {
|
|||
AMRMClientImpl<ContainerRequest> client, ContainerRequest request,
|
||||
String location, boolean expectedRelaxLocality,
|
||||
ExecutionType executionType) {
|
||||
ProfileCapability profileCapability = ProfileCapability
|
||||
.newInstance(request.getResourceProfile(), request.getCapability());
|
||||
ResourceRequest ask = client.getTable(0).get(request.getPriority(),
|
||||
location, executionType, profileCapability).remoteRequest;
|
||||
location, executionType, request.getCapability()).remoteRequest;
|
||||
assertEquals(location, ask.getResourceName());
|
||||
assertEquals(1, ask.getNumContainers());
|
||||
assertEquals(expectedRelaxLocality, ask.getRelaxLocality());
|
||||
|
|
|
@ -18,22 +18,6 @@
|
|||
|
||||
package org.apache.hadoop.yarn.client.api.impl;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.TreeSet;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.io.DataOutputBuffer;
|
||||
import org.apache.hadoop.security.Credentials;
|
||||
|
@ -58,7 +42,6 @@ import org.apache.hadoop.yarn.api.records.NMToken;
|
|||
import org.apache.hadoop.yarn.api.records.NodeReport;
|
||||
import org.apache.hadoop.yarn.api.records.NodeState;
|
||||
import org.apache.hadoop.yarn.api.records.Priority;
|
||||
import org.apache.hadoop.yarn.api.records.ProfileCapability;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
||||
|
@ -84,6 +67,22 @@ import org.junit.Test;
|
|||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.TreeSet;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
public class TestNMClient {
|
||||
Configuration conf = null;
|
||||
MiniYARNCluster yarnCluster = null;
|
||||
|
@ -332,11 +331,9 @@ public class TestNMClient {
|
|||
racks, priority));
|
||||
}
|
||||
|
||||
ProfileCapability profileCapability =
|
||||
ProfileCapability.newInstance(capability);
|
||||
int containersRequestedAny = rmClient.getTable(0)
|
||||
.get(priority, ResourceRequest.ANY, ExecutionType.GUARANTEED,
|
||||
profileCapability).remoteRequest.getNumContainers();
|
||||
capability).remoteRequest.getNumContainers();
|
||||
|
||||
// RM should allocate container within 2 calls to allocate()
|
||||
int allocatedContainerCount = 0;
|
||||
|
|
|
@ -42,7 +42,6 @@ import org.apache.hadoop.yarn.api.records.NMToken;
|
|||
import org.apache.hadoop.yarn.api.records.NodeReport;
|
||||
import org.apache.hadoop.yarn.api.records.NodeState;
|
||||
import org.apache.hadoop.yarn.api.records.Priority;
|
||||
import org.apache.hadoop.yarn.api.records.ProfileCapability;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||
import org.apache.hadoop.yarn.api.records.Token;
|
||||
|
@ -100,7 +99,6 @@ public class TestOpportunisticContainerAllocationE2E {
|
|||
private static final long AM_EXPIRE_MS = 4000;
|
||||
|
||||
private static Resource capability;
|
||||
private static ProfileCapability profileCapability;
|
||||
private static Priority priority;
|
||||
private static Priority priority2;
|
||||
private static Priority priority3;
|
||||
|
@ -153,7 +151,6 @@ public class TestOpportunisticContainerAllocationE2E {
|
|||
priority3 = Priority.newInstance(3);
|
||||
priority4 = Priority.newInstance(4);
|
||||
capability = Resource.newInstance(512, 1);
|
||||
profileCapability = ProfileCapability.newInstance(capability);
|
||||
|
||||
node = nodeReports.get(0).getNodeId().getHost();
|
||||
rack = nodeReports.get(0).getRackName();
|
||||
|
@ -276,7 +273,7 @@ public class TestOpportunisticContainerAllocationE2E {
|
|||
|
||||
int oppContainersRequestedAny =
|
||||
amClient.getTable(0).get(priority2, ResourceRequest.ANY,
|
||||
ExecutionType.OPPORTUNISTIC, profileCapability).remoteRequest
|
||||
ExecutionType.OPPORTUNISTIC, capability).remoteRequest
|
||||
.getNumContainers();
|
||||
|
||||
assertEquals(1, oppContainersRequestedAny);
|
||||
|
@ -397,7 +394,7 @@ public class TestOpportunisticContainerAllocationE2E {
|
|||
new AMRMClient.ContainerRequest(capability, null, null, priority3));
|
||||
|
||||
int guarContainersRequestedAny = amClient.getTable(0).get(priority3,
|
||||
ResourceRequest.ANY, ExecutionType.GUARANTEED, profileCapability)
|
||||
ResourceRequest.ANY, ExecutionType.GUARANTEED, capability)
|
||||
.remoteRequest.getNumContainers();
|
||||
|
||||
assertEquals(1, guarContainersRequestedAny);
|
||||
|
@ -536,17 +533,17 @@ public class TestOpportunisticContainerAllocationE2E {
|
|||
ExecutionType.OPPORTUNISTIC, true)));
|
||||
|
||||
int containersRequestedNode = amClient.getTable(0).get(priority,
|
||||
node, ExecutionType.GUARANTEED, profileCapability).remoteRequest
|
||||
node, ExecutionType.GUARANTEED, capability).remoteRequest
|
||||
.getNumContainers();
|
||||
int containersRequestedRack = amClient.getTable(0).get(priority,
|
||||
rack, ExecutionType.GUARANTEED, profileCapability).remoteRequest
|
||||
rack, ExecutionType.GUARANTEED, capability).remoteRequest
|
||||
.getNumContainers();
|
||||
int containersRequestedAny = amClient.getTable(0).get(priority,
|
||||
ResourceRequest.ANY, ExecutionType.GUARANTEED, profileCapability)
|
||||
ResourceRequest.ANY, ExecutionType.GUARANTEED, capability)
|
||||
.remoteRequest.getNumContainers();
|
||||
int oppContainersRequestedAny =
|
||||
amClient.getTable(0).get(priority2, ResourceRequest.ANY,
|
||||
ExecutionType.OPPORTUNISTIC, profileCapability).remoteRequest
|
||||
ExecutionType.OPPORTUNISTIC, capability).remoteRequest
|
||||
.getNumContainers();
|
||||
|
||||
assertEquals(4, containersRequestedNode);
|
||||
|
@ -568,17 +565,17 @@ public class TestOpportunisticContainerAllocationE2E {
|
|||
ExecutionType.OPPORTUNISTIC, true)));
|
||||
|
||||
containersRequestedNode = amClient.getTable(0).get(priority,
|
||||
node, ExecutionType.GUARANTEED, profileCapability).remoteRequest
|
||||
node, ExecutionType.GUARANTEED, capability).remoteRequest
|
||||
.getNumContainers();
|
||||
containersRequestedRack = amClient.getTable(0).get(priority,
|
||||
rack, ExecutionType.GUARANTEED, profileCapability).remoteRequest
|
||||
rack, ExecutionType.GUARANTEED, capability).remoteRequest
|
||||
.getNumContainers();
|
||||
containersRequestedAny = amClient.getTable(0).get(priority,
|
||||
ResourceRequest.ANY, ExecutionType.GUARANTEED, profileCapability)
|
||||
ResourceRequest.ANY, ExecutionType.GUARANTEED, capability)
|
||||
.remoteRequest.getNumContainers();
|
||||
oppContainersRequestedAny =
|
||||
amClient.getTable(0).get(priority2, ResourceRequest.ANY,
|
||||
ExecutionType.OPPORTUNISTIC, profileCapability).remoteRequest
|
||||
ExecutionType.OPPORTUNISTIC, capability).remoteRequest
|
||||
.getNumContainers();
|
||||
|
||||
assertEquals(2, containersRequestedNode);
|
||||
|
@ -697,7 +694,7 @@ public class TestOpportunisticContainerAllocationE2E {
|
|||
|
||||
int oppContainersRequestedAny = amClient.getTable(0)
|
||||
.get(priority3, ResourceRequest.ANY, ExecutionType.OPPORTUNISTIC,
|
||||
profileCapability).remoteRequest.getNumContainers();
|
||||
capability).remoteRequest.getNumContainers();
|
||||
|
||||
assertEquals(2, oppContainersRequestedAny);
|
||||
|
||||
|
|
|
@ -1,126 +0,0 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.api.records.impl.pb;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.yarn.api.records.ProfileCapability;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.ProfileCapabilityProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.ProfileCapabilityProtoOrBuilder;
|
||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||
|
||||
/**
|
||||
* Protobuf implementation for the ProfileCapability class.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
@InterfaceStability.Unstable
|
||||
public class ProfileCapabilityPBImpl extends ProfileCapability {
|
||||
|
||||
private ProfileCapabilityProto proto =
|
||||
ProfileCapabilityProto.getDefaultInstance();
|
||||
private ProfileCapabilityProto.Builder builder;
|
||||
|
||||
private boolean viaProto;
|
||||
|
||||
private String profile;
|
||||
private Resource profileCapabilityOverride;
|
||||
|
||||
public ProfileCapabilityPBImpl() {
|
||||
builder = ProfileCapabilityProto.newBuilder();
|
||||
}
|
||||
|
||||
public ProfileCapabilityPBImpl(ProfileCapabilityProto proto) {
|
||||
this.proto = proto;
|
||||
viaProto = true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getProfileName() {
|
||||
if (profile != null) {
|
||||
return profile;
|
||||
}
|
||||
ProfileCapabilityProtoOrBuilder p = viaProto ? proto : builder;
|
||||
if (p.hasProfile()) {
|
||||
profile = p.getProfile();
|
||||
}
|
||||
return profile;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Resource getProfileCapabilityOverride() {
|
||||
if (profileCapabilityOverride != null) {
|
||||
return profileCapabilityOverride;
|
||||
}
|
||||
ProfileCapabilityProtoOrBuilder p = viaProto ? proto : builder;
|
||||
if (p.hasProfileCapabilityOverride()) {
|
||||
profileCapabilityOverride =
|
||||
Resources.clone(new ResourcePBImpl(p.getProfileCapabilityOverride()));
|
||||
}
|
||||
return profileCapabilityOverride;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setProfileName(String profileName) {
|
||||
this.profile = profileName;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setProfileCapabilityOverride(Resource r) {
|
||||
this.profileCapabilityOverride = r;
|
||||
}
|
||||
|
||||
public ProfileCapabilityProto getProto() {
|
||||
mergeLocalToProto();
|
||||
proto = viaProto ? proto : builder.build();
|
||||
viaProto = true;
|
||||
return proto;
|
||||
}
|
||||
|
||||
private void mergeLocalToProto() {
|
||||
if (viaProto) {
|
||||
maybeInitBuilder();
|
||||
}
|
||||
mergeLocalToBuilder();
|
||||
proto = builder.build();
|
||||
viaProto = true;
|
||||
}
|
||||
|
||||
private void mergeLocalToBuilder() {
|
||||
if (profile != null) {
|
||||
builder.setProfile(profile);
|
||||
}
|
||||
if (profileCapabilityOverride != null) {
|
||||
builder.setProfileCapabilityOverride(
|
||||
convertToProtoFormat(profileCapabilityOverride));
|
||||
}
|
||||
}
|
||||
|
||||
private void maybeInitBuilder() {
|
||||
if (viaProto || builder == null) {
|
||||
builder = ProfileCapabilityProto.newBuilder(proto);
|
||||
}
|
||||
viaProto = false;
|
||||
}
|
||||
|
||||
private YarnProtos.ResourceProto convertToProtoFormat(Resource res) {
|
||||
return ProtoUtils.convertToProtoFormat(res);
|
||||
}
|
||||
}
|
|
@ -23,10 +23,8 @@ import org.apache.hadoop.classification.InterfaceAudience.Private;
|
|||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||
import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
|
||||
import org.apache.hadoop.yarn.api.records.Priority;
|
||||
import org.apache.hadoop.yarn.api.records.ProfileCapability;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.ProfileCapabilityProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.PriorityProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceRequestProto;
|
||||
|
@ -42,8 +40,7 @@ public class ResourceRequestPBImpl extends ResourceRequest {
|
|||
private Priority priority = null;
|
||||
private Resource capability = null;
|
||||
private ExecutionTypeRequest executionTypeRequest = null;
|
||||
private ProfileCapability profile = null;
|
||||
|
||||
|
||||
|
||||
public ResourceRequestPBImpl() {
|
||||
builder = ResourceRequestProto.newBuilder();
|
||||
|
@ -72,9 +69,6 @@ public class ResourceRequestPBImpl extends ResourceRequest {
|
|||
builder.setExecutionTypeRequest(
|
||||
ProtoUtils.convertToProtoFormat(this.executionTypeRequest));
|
||||
}
|
||||
if (this.profile != null) {
|
||||
builder.setProfile(converToProtoFormat(this.profile));
|
||||
}
|
||||
}
|
||||
|
||||
private void mergeLocalToProto() {
|
||||
|
@ -236,7 +230,7 @@ public class ResourceRequestPBImpl extends ResourceRequest {
|
|||
+ ", Relax Locality: " + getRelaxLocality()
|
||||
+ ", Execution Type Request: " + getExecutionTypeRequest()
|
||||
+ ", Node Label Expression: " + getNodeLabelExpression()
|
||||
+ ", Resource Profile: " + getProfileCapability() + "}";
|
||||
+ "}";
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -257,34 +251,4 @@ public class ResourceRequestPBImpl extends ResourceRequest {
|
|||
}
|
||||
builder.setNodeLabelExpression(nodeLabelExpression);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setProfileCapability(ProfileCapability profileCapability) {
|
||||
maybeInitBuilder();
|
||||
if (profile == null) {
|
||||
builder.clearProfile();
|
||||
}
|
||||
this.profile = profileCapability;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ProfileCapability getProfileCapability() {
|
||||
if (profile != null) {
|
||||
return profile;
|
||||
}
|
||||
ResourceRequestProtoOrBuilder p = viaProto ? proto : builder;
|
||||
if (!p.hasProfile()) {
|
||||
return null;
|
||||
}
|
||||
return new ProfileCapabilityPBImpl(p.getProfile());
|
||||
}
|
||||
|
||||
private ProfileCapabilityProto converToProtoFormat(
|
||||
ProfileCapability profileCapability) {
|
||||
ProfileCapabilityPBImpl tmp = new ProfileCapabilityPBImpl();
|
||||
tmp.setProfileName(profileCapability.getProfileName());
|
||||
tmp.setProfileCapabilityOverride(
|
||||
profileCapability.getProfileCapabilityOverride());
|
||||
return tmp.getProto();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -132,7 +132,6 @@ import org.apache.hadoop.yarn.api.records.PreemptionContract;
|
|||
import org.apache.hadoop.yarn.api.records.PreemptionMessage;
|
||||
import org.apache.hadoop.yarn.api.records.PreemptionResourceRequest;
|
||||
import org.apache.hadoop.yarn.api.records.Priority;
|
||||
import org.apache.hadoop.yarn.api.records.ProfileCapability;
|
||||
import org.apache.hadoop.yarn.api.records.QueueConfigurations;
|
||||
import org.apache.hadoop.yarn.api.records.QueueInfo;
|
||||
import org.apache.hadoop.yarn.api.records.QueueState;
|
||||
|
@ -185,7 +184,6 @@ import org.apache.hadoop.yarn.api.records.impl.pb.PreemptionContractPBImpl;
|
|||
import org.apache.hadoop.yarn.api.records.impl.pb.PreemptionMessagePBImpl;
|
||||
import org.apache.hadoop.yarn.api.records.impl.pb.PreemptionResourceRequestPBImpl;
|
||||
import org.apache.hadoop.yarn.api.records.impl.pb.PriorityPBImpl;
|
||||
import org.apache.hadoop.yarn.api.records.impl.pb.ProfileCapabilityPBImpl;
|
||||
import org.apache.hadoop.yarn.api.records.impl.pb.QueueInfoPBImpl;
|
||||
import org.apache.hadoop.yarn.api.records.impl.pb.QueueUserACLInfoPBImpl;
|
||||
import org.apache.hadoop.yarn.api.records.impl.pb.ResourceBlacklistRequestPBImpl;
|
||||
|
@ -322,7 +320,6 @@ import org.apache.hadoop.yarn.proto.YarnServiceProtos.SubmitApplicationResponseP
|
|||
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetAllResourceProfilesResponseProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetResourceProfileRequestProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetResourceProfileResponseProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.ProfileCapabilityProto;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.AddToClusterNodeLabelsRequestPBImpl;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.AddToClusterNodeLabelsResponsePBImpl;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.CheckForDecommissioningNodesRequestPBImpl;
|
||||
|
@ -384,7 +381,6 @@ public class TestPBImplRecords extends BasePBImplRecordsTest {
|
|||
generateByNewInstance(NodeReport.class);
|
||||
generateByNewInstance(Token.class);
|
||||
generateByNewInstance(NMToken.class);
|
||||
generateByNewInstance(ProfileCapability.class);
|
||||
generateByNewInstance(ResourceRequest.class);
|
||||
generateByNewInstance(ApplicationAttemptReport.class);
|
||||
generateByNewInstance(ApplicationResourceUsageReport.class);
|
||||
|
@ -1215,12 +1211,6 @@ public class TestPBImplRecords extends BasePBImplRecordsTest {
|
|||
GetResourceProfileResponseProto.class);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testProfileCapabilityPBImpl() throws Exception {
|
||||
validatePBImplRecord(ProfileCapabilityPBImpl.class,
|
||||
ProfileCapabilityProto.class);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testResourceTypesInfoPBImpl() throws Exception {
|
||||
validatePBImplRecord(ResourceTypeInfoPBImpl.class,
|
||||
|
|
|
@ -1,109 +0,0 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.api;
|
||||
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.ResourceTypes;
|
||||
import org.apache.hadoop.yarn.api.records.ProfileCapability;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceInformation;
|
||||
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* Test profile capability behavior.
|
||||
*/
|
||||
public class TestProfileCapability {
|
||||
@Before
|
||||
public void setup() {
|
||||
// Initialize resource map
|
||||
Map<String, ResourceInformation> riMap = new HashMap<>();
|
||||
|
||||
// Initialize mandatory resources
|
||||
riMap.put(ResourceInformation.MEMORY_URI, ResourceInformation.MEMORY_MB);
|
||||
riMap.put(ResourceInformation.VCORES_URI, ResourceInformation.VCORES);
|
||||
|
||||
for (int i = 0; i < 5; i++) {
|
||||
String resourceName = "res-" + i;
|
||||
riMap.put(resourceName, ResourceInformation
|
||||
.newInstance(resourceName, "", 0, ResourceTypes.COUNTABLE, 0,
|
||||
Integer.MAX_VALUE));
|
||||
}
|
||||
|
||||
ResourceUtils.initializeResourcesFromResourceInformationMap(riMap);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testConvertProfileCapabilityToResource() {
|
||||
Resource profile1 = Resource.newInstance(1, 1);
|
||||
profile1.setResourceValue("res-0", 1);
|
||||
profile1.setResourceValue("res-1", 1);
|
||||
|
||||
Resource profile2 = Resource.newInstance(2, 2);
|
||||
profile2.setResourceValue("res-0", 2);
|
||||
profile2.setResourceValue("res-1", 2);
|
||||
|
||||
Resource profile3 = Resource.newInstance(3, 3);
|
||||
profile3.setResourceValue("res-0", 3);
|
||||
profile3.setResourceValue("res-1", 3);
|
||||
|
||||
Map<String, Resource> profiles = ImmutableMap.of("profile1", profile1,
|
||||
"profile2", profile2, "profile3", profile3, "default", profile1);
|
||||
|
||||
// Test case 1, set override value to (1, 1, 0), since we only allow
|
||||
// overwrite for positive value, it is still profile1.
|
||||
ProfileCapability pc = ProfileCapability.newInstance("profile1",
|
||||
Resource.newInstance(1, 1));
|
||||
Assert.assertEquals(profile1, ProfileCapability.toResource(pc, profiles));
|
||||
|
||||
// Test case 2, similarly, negative value won't be respected.
|
||||
pc = ProfileCapability.newInstance("profile1",
|
||||
Resource.newInstance(1, -1));
|
||||
Assert.assertEquals(profile1, ProfileCapability.toResource(pc, profiles));
|
||||
|
||||
// Test case 3, do overwrite for memory and vcores, the result is (3,3,1,1)
|
||||
Resource expected = Resource.newInstance(3, 3);
|
||||
expected.setResourceValue("res-0", 1);
|
||||
expected.setResourceValue("res-1", 1);
|
||||
pc = ProfileCapability.newInstance("profile1",
|
||||
Resource.newInstance(3, 3));
|
||||
Assert.assertEquals(expected, ProfileCapability.toResource(pc, profiles));
|
||||
|
||||
// Test case 3, do overwrite for mem and res-1, the result is (3,1,3,1)
|
||||
expected = Resource.newInstance(3, 1);
|
||||
expected.setResourceValue("res-0", 3);
|
||||
expected.setResourceValue("res-1", 1);
|
||||
|
||||
Resource overwrite = Resource.newInstance(3, 0);
|
||||
overwrite.setResourceValue("res-0", 3);
|
||||
overwrite.setResourceValue("res-1", 0);
|
||||
|
||||
pc = ProfileCapability.newInstance("profile1", overwrite);
|
||||
Assert.assertEquals(expected, ProfileCapability.toResource(pc, profiles));
|
||||
|
||||
// Test case 4, when null profile is specified, use default.
|
||||
pc = ProfileCapability.newInstance("", null);
|
||||
Assert.assertEquals(profile1, ProfileCapability.toResource(pc, profiles));
|
||||
}
|
||||
}
|
|
@ -384,7 +384,6 @@ public class BuilderUtils {
|
|||
request.setNumContainers(r.getNumContainers());
|
||||
request.setNodeLabelExpression(r.getNodeLabelExpression());
|
||||
request.setExecutionTypeRequest(r.getExecutionTypeRequest());
|
||||
request.setProfileCapability(r.getProfileCapability());
|
||||
return request;
|
||||
}
|
||||
|
||||
|
|
|
@ -47,7 +47,6 @@ import org.apache.hadoop.yarn.api.records.ContainerUpdateType;
|
|||
import org.apache.hadoop.yarn.api.records.ExecutionType;
|
||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||
import org.apache.hadoop.yarn.api.records.NodeState;
|
||||
import org.apache.hadoop.yarn.api.records.ProfileCapability;
|
||||
import org.apache.hadoop.yarn.api.records.QueueInfo;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
|
||||
|
@ -67,7 +66,6 @@ import org.apache.hadoop.yarn.factories.RecordFactory;
|
|||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||
import org.apache.hadoop.yarn.security.YarnAuthorizationProvider;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceProfilesManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt
|
||||
.RMAppAttemptState;
|
||||
|
@ -627,37 +625,6 @@ public class RMServerUtils {
|
|||
}
|
||||
}
|
||||
|
||||
public static void convertProfileToResourceCapability(ResourceRequest ask,
|
||||
Configuration conf, ResourceProfilesManager resourceProfilesManager)
|
||||
throws YarnException {
|
||||
|
||||
if (LOG_HANDLE.isDebugEnabled()) {
|
||||
LOG_HANDLE
|
||||
.debug("Converting profile to resource capability for ask " + ask);
|
||||
}
|
||||
|
||||
boolean profilesEnabled =
|
||||
conf.getBoolean(YarnConfiguration.RM_RESOURCE_PROFILES_ENABLED,
|
||||
YarnConfiguration.DEFAULT_RM_RESOURCE_PROFILES_ENABLED);
|
||||
if (!profilesEnabled) {
|
||||
if (ask.getProfileCapability() != null && !ask.getProfileCapability()
|
||||
.getProfileCapabilityOverride().equals(Resources.none())) {
|
||||
ask.setCapability(
|
||||
ask.getProfileCapability().getProfileCapabilityOverride());
|
||||
}
|
||||
} else {
|
||||
if (ask.getProfileCapability() != null) {
|
||||
ask.setCapability(ProfileCapability
|
||||
.toResource(ask.getProfileCapability(),
|
||||
resourceProfilesManager.getResourceProfiles()));
|
||||
}
|
||||
}
|
||||
if (LOG_HANDLE.isDebugEnabled()) {
|
||||
LOG_HANDLE
|
||||
.debug("Converted profile to resource capability for ask " + ask);
|
||||
}
|
||||
}
|
||||
|
||||
public static Long getOrDefault(Map<String, Long> map, String key,
|
||||
Long defaultValue) {
|
||||
if (map.containsKey(key)) {
|
||||
|
|
|
@ -40,12 +40,10 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
|||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.InvalidLabelResourceRequestException;
|
||||
import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.factories.RecordFactory;
|
||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||
import org.apache.hadoop.yarn.security.AccessType;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
|
||||
|
@ -270,14 +268,6 @@ public class SchedulerUtils {
|
|||
private static void validateResourceRequest(ResourceRequest resReq,
|
||||
Resource maximumResource, QueueInfo queueInfo, RMContext rmContext)
|
||||
throws InvalidResourceRequestException {
|
||||
try {
|
||||
RMServerUtils.convertProfileToResourceCapability(resReq,
|
||||
rmContext.getYarnConfiguration(),
|
||||
rmContext.getResourceProfilesManager());
|
||||
} catch (YarnException ye) {
|
||||
throw new InvalidResourceRequestException(ye);
|
||||
}
|
||||
|
||||
Resource requestedResource = resReq.getCapability();
|
||||
for (int i = 0; i < ResourceUtils.getNumberOfKnownResourceTypes(); i++) {
|
||||
ResourceInformation reqRI = requestedResource.getResourceInformation(i);
|
||||
|
|
|
@ -44,7 +44,6 @@ import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRespons
|
|||
import org.apache.hadoop.yarn.api.protocolrecords
|
||||
.RegisterApplicationMasterRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.ResourceTypes;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateRequestPBImpl;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.Container;
|
||||
|
@ -52,7 +51,6 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
|
|||
import org.apache.hadoop.yarn.api.records.ContainerUpdateType;
|
||||
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
||||
import org.apache.hadoop.yarn.api.records.Priority;
|
||||
import org.apache.hadoop.yarn.api.records.ProfileCapability;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceInformation;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||
|
@ -64,8 +62,6 @@ import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException;
|
|||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes;
|
||||
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.resource.MockResourceProfileManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceProfilesManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.resource.TestResourceProfiles;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
||||
|
@ -73,13 +69,11 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptS
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
|
||||
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
||||
|
|
|
@ -1,79 +0,0 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.resource;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.YARNFeatureNotEnabledException;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* Mock ResourceProfileManager for unit test.
|
||||
*/
|
||||
public class MockResourceProfileManager extends ResourceProfilesManagerImpl {
|
||||
private Map<String, Resource> profiles;
|
||||
private boolean featureEnabled;
|
||||
|
||||
public MockResourceProfileManager(Map<String, Resource> profiles) {
|
||||
this.profiles = new HashMap<>();
|
||||
this.profiles.putAll(profiles);
|
||||
|
||||
// Set minimum / maximum allocation so test doesn't need to add them
|
||||
// every time.
|
||||
this.profiles.put(ResourceProfilesManagerImpl.MINIMUM_PROFILE,
|
||||
ResourceUtils.getResourceTypesMinimumAllocation());
|
||||
this.profiles.put(ResourceProfilesManagerImpl.MAXIMUM_PROFILE,
|
||||
ResourceUtils.getResourceTypesMaximumAllocation());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void init(Configuration config) throws IOException {
|
||||
this.featureEnabled = config.getBoolean(
|
||||
YarnConfiguration.RM_RESOURCE_PROFILES_ENABLED,
|
||||
YarnConfiguration.DEFAULT_RM_RESOURCE_PROFILES_ENABLED);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Resource getProfile(String profile) throws YarnException {
|
||||
if (!featureEnabled) {
|
||||
throw new YARNFeatureNotEnabledException("");
|
||||
}
|
||||
return profiles.get(profile);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, Resource> getResourceProfiles()
|
||||
throws YARNFeatureNotEnabledException {
|
||||
if (!featureEnabled) {
|
||||
throw new YARNFeatureNotEnabledException("");
|
||||
}
|
||||
return profiles;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void reloadProfiles() throws IOException {
|
||||
throw new IOException("Not supported");
|
||||
}
|
||||
}
|
|
@ -18,27 +18,13 @@
|
|||
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
|
||||
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.ResourceTypes;
|
||||
import org.apache.hadoop.yarn.api.records.ProfileCapability;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceInformation;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.resource.MockResourceProfileManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceProfilesManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.resource.TestResourceProfiles;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
|
||||
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
||||
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
|
||||
import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
|
||||
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
|
||||
|
@ -46,7 +32,6 @@ import org.junit.Assert;
|
|||
import org.junit.Test;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
|
@ -55,101 +40,6 @@ import java.util.Map;
|
|||
*/
|
||||
public class TestCapacitySchedulerWithMultiResourceTypes {
|
||||
private static String RESOURCE_1 = "res1";
|
||||
private final int GB = 1024;
|
||||
|
||||
@Test
|
||||
public void testBasicCapacitySchedulerWithProfile() throws Exception {
|
||||
|
||||
// Initialize resource map
|
||||
Map<String, ResourceInformation> riMap = new HashMap<>();
|
||||
|
||||
// Initialize mandatory resources
|
||||
ResourceInformation memory = ResourceInformation.newInstance(
|
||||
ResourceInformation.MEMORY_MB.getName(),
|
||||
ResourceInformation.MEMORY_MB.getUnits(),
|
||||
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
|
||||
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB);
|
||||
ResourceInformation vcores = ResourceInformation.newInstance(
|
||||
ResourceInformation.VCORES.getName(),
|
||||
ResourceInformation.VCORES.getUnits(),
|
||||
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES,
|
||||
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES);
|
||||
riMap.put(ResourceInformation.MEMORY_URI, memory);
|
||||
riMap.put(ResourceInformation.VCORES_URI, vcores);
|
||||
riMap.put(RESOURCE_1, ResourceInformation.newInstance(RESOURCE_1, "", 0,
|
||||
ResourceTypes.COUNTABLE, 0, Integer.MAX_VALUE));
|
||||
|
||||
ResourceUtils.initializeResourcesFromResourceInformationMap(riMap);
|
||||
|
||||
CapacitySchedulerConfiguration csconf =
|
||||
new CapacitySchedulerConfiguration();
|
||||
csconf.setMaximumApplicationMasterResourcePerQueuePercent("root", 100.0f);
|
||||
csconf.setMaximumAMResourcePercentPerPartition("root", "", 100.0f);
|
||||
csconf.setMaximumApplicationMasterResourcePerQueuePercent("root.default",
|
||||
100.0f);
|
||||
csconf.setMaximumAMResourcePercentPerPartition("root.default", "", 100.0f);
|
||||
csconf.setResourceComparator(DominantResourceCalculator.class);
|
||||
|
||||
YarnConfiguration conf = new YarnConfiguration(csconf);
|
||||
// Don't reset resource types since we have already configured resource
|
||||
// types
|
||||
conf.setBoolean(TestResourceProfiles.TEST_CONF_RESET_RESOURCE_TYPES, false);
|
||||
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
|
||||
ResourceScheduler.class);
|
||||
conf.setBoolean(YarnConfiguration.RM_RESOURCE_PROFILES_ENABLED, true);
|
||||
|
||||
final MockResourceProfileManager mrpm = new MockResourceProfileManager(
|
||||
ImmutableMap.of("res-1", TestUtils
|
||||
.createResource(2 * GB, 2, ImmutableMap.of(RESOURCE_1, 2))));
|
||||
|
||||
MockRM rm = new MockRM(conf) {
|
||||
@Override
|
||||
protected ResourceProfilesManager createResourceProfileManager() {
|
||||
return mrpm;
|
||||
}
|
||||
};
|
||||
rm.start();
|
||||
|
||||
CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
|
||||
LeafQueue leafQueue = (LeafQueue) cs.getQueue("default");
|
||||
|
||||
MockNM nm1 = rm.registerNode("h1:1234",
|
||||
TestUtils.createResource(8 * GB, 8, ImmutableMap.of(RESOURCE_1, 8)));
|
||||
|
||||
RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "default");
|
||||
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1);
|
||||
|
||||
Assert.assertEquals(Resource.newInstance(1 * GB, 1),
|
||||
leafQueue.getUsedResources());
|
||||
|
||||
RMNode rmNode1 = rm.getRMContext().getRMNodes().get(nm1.getNodeId());
|
||||
|
||||
// Now request resource:
|
||||
am1.allocate(Arrays.asList(ResourceRequest.newBuilder().capability(
|
||||
Resource.newInstance(1 * GB, 1)).numContainers(1).resourceName("*")
|
||||
.profileCapability(ProfileCapability
|
||||
.newInstance("res-1",
|
||||
Resource.newInstance(2 * GB, 2))).build()),
|
||||
null);
|
||||
|
||||
// Do node heartbeats 1 time and check container allocated.
|
||||
cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
|
||||
|
||||
// Now used resource = <mem=1GB, vcore=1> + <mem=2GB,vcore=2,res_1=2>
|
||||
Assert.assertEquals(
|
||||
TestUtils.createResource(3 * GB, 3, ImmutableMap.of(RESOURCE_1, 2)),
|
||||
leafQueue.getUsedResources());
|
||||
|
||||
// Acquire container
|
||||
AllocateResponse amResponse = am1.allocate(null, null);
|
||||
Assert.assertFalse(amResponse.getAllocatedContainers().isEmpty());
|
||||
ContainerTokenIdentifier containerTokenIdentifier =
|
||||
BuilderUtils.newContainerTokenIdentifier(
|
||||
amResponse.getAllocatedContainers().get(0).getContainerToken());
|
||||
Assert.assertEquals(
|
||||
TestUtils.createResource(2 * GB, 2, ImmutableMap.of(RESOURCE_1, 2)),
|
||||
containerTokenIdentifier.getResource());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMaximumAllocationRefreshWithMultipleResourceTypes() throws Exception {
|
||||
|
|
Loading…
Reference in New Issue