YARN-7307. Allow client/AM update supported resource types via YARN APIs. (Sunil G via wangda)
Change-Id: I14c5ea7252b7c17e86ab38f692b5f9d43196dbe0
This commit is contained in:
parent
a25b5aa0cf
commit
36e158ae98
|
@ -32,6 +32,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
|
||||||
import org.apache.hadoop.yarn.api.records.Container;
|
import org.apache.hadoop.yarn.api.records.Container;
|
||||||
import org.apache.hadoop.yarn.api.records.NMToken;
|
import org.apache.hadoop.yarn.api.records.NMToken;
|
||||||
import org.apache.hadoop.yarn.api.records.Resource;
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ResourceTypeInfo;
|
||||||
import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes;
|
import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes;
|
||||||
import org.apache.hadoop.yarn.util.Records;
|
import org.apache.hadoop.yarn.util.Records;
|
||||||
|
|
||||||
|
@ -205,11 +206,41 @@ public abstract class RegisterApplicationMasterResponse {
|
||||||
public abstract void setSchedulerResourceTypes(
|
public abstract void setSchedulerResourceTypes(
|
||||||
EnumSet<SchedulerResourceTypes> types);
|
EnumSet<SchedulerResourceTypes> types);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get list of supported resource profiles from RM.
|
||||||
|
*
|
||||||
|
* @return a map of resource profiles and its capabilities.
|
||||||
|
*/
|
||||||
@Public
|
@Public
|
||||||
@Unstable
|
@Unstable
|
||||||
public abstract Map<String, Resource> getResourceProfiles();
|
public abstract Map<String, Resource> getResourceProfiles();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set supported resource profiles for RM.
|
||||||
|
*
|
||||||
|
* @param profiles
|
||||||
|
* a map of resource profiles with its capabilities.
|
||||||
|
*/
|
||||||
@Private
|
@Private
|
||||||
@Unstable
|
@Unstable
|
||||||
public abstract void setResourceProfiles(Map<String, Resource> profiles);
|
public abstract void setResourceProfiles(Map<String, Resource> profiles);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get available resource types supported by RM.
|
||||||
|
*
|
||||||
|
* @return a Map of RM settings
|
||||||
|
*/
|
||||||
|
@Public
|
||||||
|
@Unstable
|
||||||
|
public abstract List<ResourceTypeInfo> getResourceTypes();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set the resource types used by RM.
|
||||||
|
*
|
||||||
|
* @param types
|
||||||
|
* a set of the resource types supported by RM.
|
||||||
|
*/
|
||||||
|
@Private
|
||||||
|
@Unstable
|
||||||
|
public abstract void setResourceTypes(List<ResourceTypeInfo> types);
|
||||||
}
|
}
|
||||||
|
|
|
@ -215,6 +215,12 @@ public class ResourceInformation implements Comparable<ResourceInformation> {
|
||||||
Long.MAX_VALUE);
|
Long.MAX_VALUE);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static ResourceInformation newInstance(String name, String units,
|
||||||
|
ResourceTypes resourceType) {
|
||||||
|
return ResourceInformation.newInstance(name, units, 0L, resourceType, 0L,
|
||||||
|
Long.MAX_VALUE);
|
||||||
|
}
|
||||||
|
|
||||||
public static ResourceInformation newInstance(String name, long value) {
|
public static ResourceInformation newInstance(String name, long value) {
|
||||||
return ResourceInformation
|
return ResourceInformation
|
||||||
.newInstance(name, "", value, ResourceTypes.COUNTABLE, 0L,
|
.newInstance(name, "", value, ResourceTypes.COUNTABLE, 0L,
|
||||||
|
|
|
@ -975,6 +975,17 @@ public class YarnConfiguration extends Configuration {
|
||||||
public static final String DEFAULT_RM_RESOURCE_PROFILES_SOURCE_FILE =
|
public static final String DEFAULT_RM_RESOURCE_PROFILES_SOURCE_FILE =
|
||||||
"resource-profiles.json";
|
"resource-profiles.json";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Enable/disable loading resource-types.xml at client side.
|
||||||
|
*/
|
||||||
|
@Public
|
||||||
|
@Unstable
|
||||||
|
public static final String YARN_CLIENT_LOAD_RESOURCETYPES_FROM_SERVER = YARN_PREFIX
|
||||||
|
+ "client.load.resource-types.from-server";
|
||||||
|
@Public
|
||||||
|
@Unstable
|
||||||
|
public static final boolean DEFAULT_YARN_CLIENT_LOAD_RESOURCETYPES_FROM_SERVER = false;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Timeout in seconds for YARN node graceful decommission.
|
* Timeout in seconds for YARN node graceful decommission.
|
||||||
* This is the maximal time to wait for running containers and applications
|
* This is the maximal time to wait for running containers and applications
|
||||||
|
|
|
@ -44,8 +44,6 @@ import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
|
||||||
import static org.apache.hadoop.yarn.api.records.ResourceInformation.GPU_URI;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Helper class to read the resource-types to be supported by the system.
|
* Helper class to read the resource-types to be supported by the system.
|
||||||
*/
|
*/
|
||||||
|
@ -581,4 +579,25 @@ public class ResourceUtils {
|
||||||
}
|
}
|
||||||
return array;
|
return array;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Reinitialize all resource types from external source (in case of client,
|
||||||
|
* server will send the updated list and local resourceutils cache will be
|
||||||
|
* updated as per server's list of resources)
|
||||||
|
*
|
||||||
|
* @param resourceTypeInfo
|
||||||
|
* List of resource types
|
||||||
|
*/
|
||||||
|
public static void reinitializeResources(
|
||||||
|
List<ResourceTypeInfo> resourceTypeInfo) {
|
||||||
|
Map<String, ResourceInformation> resourceInformationMap = new HashMap<>();
|
||||||
|
|
||||||
|
for (ResourceTypeInfo resourceType : resourceTypeInfo) {
|
||||||
|
resourceInformationMap.put(resourceType.getName(),
|
||||||
|
ResourceInformation.newInstance(resourceType.getName(),
|
||||||
|
resourceType.getDefaultUnit(), resourceType.getResourceType()));
|
||||||
|
}
|
||||||
|
ResourceUtils
|
||||||
|
.initializeResourcesFromResourceInformationMap(resourceInformationMap);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -49,6 +49,7 @@ message RegisterApplicationMasterResponseProto {
|
||||||
repeated NMTokenProto nm_tokens_from_previous_attempts = 6;
|
repeated NMTokenProto nm_tokens_from_previous_attempts = 6;
|
||||||
repeated SchedulerResourceTypes scheduler_resource_types = 7;
|
repeated SchedulerResourceTypes scheduler_resource_types = 7;
|
||||||
optional ResourceProfilesProto resource_profiles = 8;
|
optional ResourceProfilesProto resource_profiles = 8;
|
||||||
|
repeated ResourceTypeInfoProto resource_types = 9;
|
||||||
}
|
}
|
||||||
|
|
||||||
message FinishApplicationMasterRequestProto {
|
message FinishApplicationMasterRequestProto {
|
||||||
|
|
|
@ -22,6 +22,7 @@ import java.io.IOException;
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.EnumSet;
|
import java.util.EnumSet;
|
||||||
|
import java.util.HashMap;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
@ -105,6 +106,7 @@ import org.apache.hadoop.yarn.api.records.Priority;
|
||||||
import org.apache.hadoop.yarn.api.records.QueueInfo;
|
import org.apache.hadoop.yarn.api.records.QueueInfo;
|
||||||
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
|
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
|
||||||
import org.apache.hadoop.yarn.api.records.Resource;
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ResourceInformation;
|
||||||
import org.apache.hadoop.yarn.api.records.ResourceTypeInfo;
|
import org.apache.hadoop.yarn.api.records.ResourceTypeInfo;
|
||||||
import org.apache.hadoop.yarn.api.records.SignalContainerCommand;
|
import org.apache.hadoop.yarn.api.records.SignalContainerCommand;
|
||||||
import org.apache.hadoop.yarn.api.records.Token;
|
import org.apache.hadoop.yarn.api.records.Token;
|
||||||
|
@ -125,6 +127,7 @@ import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
|
||||||
import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier;
|
import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier;
|
||||||
import org.apache.hadoop.yarn.util.ConverterUtils;
|
import org.apache.hadoop.yarn.util.ConverterUtils;
|
||||||
import org.apache.hadoop.yarn.util.Records;
|
import org.apache.hadoop.yarn.util.Records;
|
||||||
|
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
|
||||||
import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
|
import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
|
||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
@ -148,6 +151,7 @@ public class YarnClientImpl extends YarnClient {
|
||||||
String timelineDTRenewer;
|
String timelineDTRenewer;
|
||||||
private boolean timelineV1ServiceEnabled;
|
private boolean timelineV1ServiceEnabled;
|
||||||
protected boolean timelineServiceBestEffort;
|
protected boolean timelineServiceBestEffort;
|
||||||
|
private boolean loadResourceTypesFromServer;
|
||||||
|
|
||||||
private static final String ROOT = "root";
|
private static final String ROOT = "root";
|
||||||
|
|
||||||
|
@ -199,6 +203,11 @@ public class YarnClientImpl extends YarnClient {
|
||||||
timelineServiceBestEffort = conf.getBoolean(
|
timelineServiceBestEffort = conf.getBoolean(
|
||||||
YarnConfiguration.TIMELINE_SERVICE_CLIENT_BEST_EFFORT,
|
YarnConfiguration.TIMELINE_SERVICE_CLIENT_BEST_EFFORT,
|
||||||
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_CLIENT_BEST_EFFORT);
|
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_CLIENT_BEST_EFFORT);
|
||||||
|
|
||||||
|
loadResourceTypesFromServer = conf.getBoolean(
|
||||||
|
YarnConfiguration.YARN_CLIENT_LOAD_RESOURCETYPES_FROM_SERVER,
|
||||||
|
YarnConfiguration.DEFAULT_YARN_CLIENT_LOAD_RESOURCETYPES_FROM_SERVER);
|
||||||
|
|
||||||
super.serviceInit(conf);
|
super.serviceInit(conf);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -217,6 +226,13 @@ public class YarnClientImpl extends YarnClient {
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
throw new YarnRuntimeException(e);
|
throw new YarnRuntimeException(e);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Reinitialize local resource types cache from list of resources pulled from
|
||||||
|
// RM.
|
||||||
|
if (loadResourceTypesFromServer) {
|
||||||
|
ResourceUtils.reinitializeResources(getResourceTypeInfo());
|
||||||
|
}
|
||||||
|
|
||||||
super.serviceStart();
|
super.serviceStart();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -29,15 +29,18 @@ import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
|
||||||
import org.apache.hadoop.yarn.api.records.Container;
|
import org.apache.hadoop.yarn.api.records.Container;
|
||||||
import org.apache.hadoop.yarn.api.records.NMToken;
|
import org.apache.hadoop.yarn.api.records.NMToken;
|
||||||
import org.apache.hadoop.yarn.api.records.Resource;
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ResourceTypeInfo;
|
||||||
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl;
|
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl;
|
||||||
import org.apache.hadoop.yarn.api.records.impl.pb.NMTokenPBImpl;
|
import org.apache.hadoop.yarn.api.records.impl.pb.NMTokenPBImpl;
|
||||||
import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
|
import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
|
||||||
import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl;
|
import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl;
|
||||||
|
import org.apache.hadoop.yarn.api.records.impl.pb.ResourceTypeInfoPBImpl;
|
||||||
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProfilesProto;
|
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProfilesProto;
|
||||||
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProfileEntry;
|
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProfileEntry;
|
||||||
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationACLMapProto;
|
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationACLMapProto;
|
||||||
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerProto;
|
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerProto;
|
||||||
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto;
|
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto;
|
||||||
|
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceTypeInfoProto;
|
||||||
import org.apache.hadoop.yarn.proto.YarnServiceProtos.NMTokenProto;
|
import org.apache.hadoop.yarn.proto.YarnServiceProtos.NMTokenProto;
|
||||||
import org.apache.hadoop.yarn.proto.YarnServiceProtos.RegisterApplicationMasterResponseProto;
|
import org.apache.hadoop.yarn.proto.YarnServiceProtos.RegisterApplicationMasterResponseProto;
|
||||||
import org.apache.hadoop.yarn.proto.YarnServiceProtos.RegisterApplicationMasterResponseProtoOrBuilder;
|
import org.apache.hadoop.yarn.proto.YarnServiceProtos.RegisterApplicationMasterResponseProtoOrBuilder;
|
||||||
|
@ -62,6 +65,7 @@ public class RegisterApplicationMasterResponsePBImpl extends
|
||||||
private List<NMToken> nmTokens = null;
|
private List<NMToken> nmTokens = null;
|
||||||
private EnumSet<SchedulerResourceTypes> schedulerResourceTypes = null;
|
private EnumSet<SchedulerResourceTypes> schedulerResourceTypes = null;
|
||||||
private Map<String, Resource> profiles = null;
|
private Map<String, Resource> profiles = null;
|
||||||
|
private List<ResourceTypeInfo> resourceTypeInfo = null;
|
||||||
|
|
||||||
public RegisterApplicationMasterResponsePBImpl() {
|
public RegisterApplicationMasterResponsePBImpl() {
|
||||||
builder = RegisterApplicationMasterResponseProto.newBuilder();
|
builder = RegisterApplicationMasterResponseProto.newBuilder();
|
||||||
|
@ -129,9 +133,11 @@ public class RegisterApplicationMasterResponsePBImpl extends
|
||||||
if (profiles != null) {
|
if (profiles != null) {
|
||||||
addResourceProfiles();
|
addResourceProfiles();
|
||||||
}
|
}
|
||||||
|
if (resourceTypeInfo != null) {
|
||||||
|
addResourceTypeInfosToProto();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
private void maybeInitBuilder() {
|
private void maybeInitBuilder() {
|
||||||
if (viaProto || builder == null) {
|
if (viaProto || builder == null) {
|
||||||
builder = RegisterApplicationMasterResponseProto.newBuilder(proto);
|
builder = RegisterApplicationMasterResponseProto.newBuilder(proto);
|
||||||
|
@ -514,4 +520,75 @@ public class RegisterApplicationMasterResponsePBImpl extends
|
||||||
private NMToken convertFromProtoFormat(NMTokenProto proto) {
|
private NMToken convertFromProtoFormat(NMTokenProto proto) {
|
||||||
return new NMTokenPBImpl(proto);
|
return new NMTokenPBImpl(proto);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private ResourceTypeInfoPBImpl convertFromProtoFormat(
|
||||||
|
ResourceTypeInfoProto p) {
|
||||||
|
return new ResourceTypeInfoPBImpl(p);
|
||||||
|
}
|
||||||
|
|
||||||
|
private ResourceTypeInfoProto convertToProtoFormat(ResourceTypeInfo t) {
|
||||||
|
return ((ResourceTypeInfoPBImpl) t).getProto();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public List<ResourceTypeInfo> getResourceTypes() {
|
||||||
|
initResourceTypeInfosList();
|
||||||
|
return this.resourceTypeInfo;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setResourceTypes(List<ResourceTypeInfo> types) {
|
||||||
|
if (resourceTypeInfo == null) {
|
||||||
|
builder.clearResourceTypes();
|
||||||
|
}
|
||||||
|
this.resourceTypeInfo = types;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void addResourceTypeInfosToProto() {
|
||||||
|
maybeInitBuilder();
|
||||||
|
builder.clearResourceTypes();
|
||||||
|
if (resourceTypeInfo == null) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
Iterable<ResourceTypeInfoProto> iterable = new Iterable<ResourceTypeInfoProto>() {
|
||||||
|
@Override
|
||||||
|
public Iterator<ResourceTypeInfoProto> iterator() {
|
||||||
|
return new Iterator<ResourceTypeInfoProto>() {
|
||||||
|
|
||||||
|
Iterator<ResourceTypeInfo> iter = resourceTypeInfo.iterator();
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean hasNext() {
|
||||||
|
return iter.hasNext();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ResourceTypeInfoProto next() {
|
||||||
|
return convertToProtoFormat(iter.next());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void remove() {
|
||||||
|
throw new UnsupportedOperationException();
|
||||||
|
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
}
|
||||||
|
};
|
||||||
|
builder.addAllResourceTypes(iterable);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void initResourceTypeInfosList() {
|
||||||
|
if (this.resourceTypeInfo != null) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
RegisterApplicationMasterResponseProtoOrBuilder p = viaProto ? proto : builder;
|
||||||
|
List<ResourceTypeInfoProto> list = p.getResourceTypesList();
|
||||||
|
resourceTypeInfo = new ArrayList<ResourceTypeInfo>();
|
||||||
|
|
||||||
|
for (ResourceTypeInfoProto a : list) {
|
||||||
|
resourceTypeInfo.add(convertFromProtoFormat(a));
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -3488,5 +3488,13 @@
|
||||||
<value>auto</value>
|
<value>auto</value>
|
||||||
</property>
|
</property>
|
||||||
|
|
||||||
|
<property>
|
||||||
|
<description>
|
||||||
|
Provides an option for client to load supported resource types from RM
|
||||||
|
instead of depending on local resource-types.xml file.
|
||||||
|
</description>
|
||||||
|
<name>yarn.client.load.resource-types.from-server</name>
|
||||||
|
<value>false</value>
|
||||||
|
</property>
|
||||||
|
|
||||||
</configuration>
|
</configuration>
|
||||||
|
|
|
@ -74,6 +74,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler
|
||||||
.SchedulerNodeReport;
|
.SchedulerNodeReport;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
|
||||||
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
||||||
|
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
|
||||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
@ -180,6 +181,7 @@ final class DefaultAMSProcessor implements ApplicationMasterServiceProcessor {
|
||||||
|
|
||||||
response.setSchedulerResourceTypes(getScheduler()
|
response.setSchedulerResourceTypes(getScheduler()
|
||||||
.getSchedulingResourceTypes());
|
.getSchedulingResourceTypes());
|
||||||
|
response.setResourceTypes(ResourceUtils.getResourcesTypeInfo());
|
||||||
if (getRmContext().getYarnConfiguration().getBoolean(
|
if (getRmContext().getYarnConfiguration().getBoolean(
|
||||||
YarnConfiguration.RM_RESOURCE_PROFILES_ENABLED,
|
YarnConfiguration.RM_RESOURCE_PROFILES_ENABLED,
|
||||||
YarnConfiguration.DEFAULT_RM_RESOURCE_PROFILES_ENABLED)) {
|
YarnConfiguration.DEFAULT_RM_RESOURCE_PROFILES_ENABLED)) {
|
||||||
|
|
Loading…
Reference in New Issue