YARN-7307. Allow client/AM update supported resource types via YARN APIs. (Sunil G via wangda)

This commit is contained in:
Wangda Tan 2017-11-06 10:01:10 -08:00
parent e49509f024
commit 170b6a48c4
9 changed files with 173 additions and 10 deletions

View File

@ -32,6 +32,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.NMToken; import org.apache.hadoop.yarn.api.records.NMToken;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceTypeInfo;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes; import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes;
import org.apache.hadoop.yarn.util.Records; import org.apache.hadoop.yarn.util.Records;
@ -204,4 +205,23 @@ public abstract class RegisterApplicationMasterResponse {
@Unstable @Unstable
public abstract void setSchedulerResourceTypes( public abstract void setSchedulerResourceTypes(
EnumSet<SchedulerResourceTypes> types); EnumSet<SchedulerResourceTypes> types);
/**
* Get available resource types supported by RM.
*
* @return a Map of RM settings
*/
@Public
@Unstable
public abstract List<ResourceTypeInfo> getResourceTypes();
/**
* Set the resource types used by RM.
*
* @param types
* a set of the resource types supported by RM.
*/
@Private
@Unstable
public abstract void setResourceTypes(List<ResourceTypeInfo> types);
} }

View File

@ -204,6 +204,12 @@ public class ResourceInformation implements Comparable<ResourceInformation> {
Long.MAX_VALUE); Long.MAX_VALUE);
} }
public static ResourceInformation newInstance(String name, String units,
ResourceTypes resourceType) {
return ResourceInformation.newInstance(name, units, 0L, resourceType, 0L,
Long.MAX_VALUE);
}
public static ResourceInformation newInstance(String name, long value) { public static ResourceInformation newInstance(String name, long value) {
return ResourceInformation return ResourceInformation
.newInstance(name, "", value, ResourceTypes.COUNTABLE, 0L, .newInstance(name, "", value, ResourceTypes.COUNTABLE, 0L,

View File

@ -952,6 +952,17 @@ public class YarnConfiguration extends Configuration {
*/ */
public static final String RM_PROXY_USER_PREFIX = RM_PREFIX + "proxyuser."; public static final String RM_PROXY_USER_PREFIX = RM_PREFIX + "proxyuser.";
/**
* Enable/disable loading resource-types.xml at client side.
*/
@Public
@Unstable
public static final String YARN_CLIENT_LOAD_RESOURCETYPES_FROM_SERVER = YARN_PREFIX
+ "client.load.resource-types.from-server";
@Public
@Unstable
public static final boolean DEFAULT_YARN_CLIENT_LOAD_RESOURCETYPES_FROM_SERVER = false;
/** /**
* Timeout in seconds for YARN node graceful decommission. * Timeout in seconds for YARN node graceful decommission.
* This is the maximal time to wait for running containers and applications * This is the maximal time to wait for running containers and applications

View File

@ -22,8 +22,6 @@ import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.protocolrecords.ResourceTypes; import org.apache.hadoop.yarn.api.protocolrecords.ResourceTypes;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
@ -617,4 +615,25 @@ public class ResourceUtils {
return result; return result;
} }
/**
* Reinitialize all resource types from external source (in case of client,
* server will send the updated list and local resourceutils cache will be
* updated as per server's list of resources)
*
* @param resourceTypeInfo
* List of resource types
*/
public static void reinitializeResources(
List<ResourceTypeInfo> resourceTypeInfo) {
Map<String, ResourceInformation> resourceInformationMap = new HashMap<>();
for (ResourceTypeInfo resourceType : resourceTypeInfo) {
resourceInformationMap.put(resourceType.getName(),
ResourceInformation.newInstance(resourceType.getName(),
resourceType.getDefaultUnit(), resourceType.getResourceType()));
}
ResourceUtils
.initializeResourcesFromResourceInformationMap(resourceInformationMap);
}
} }

View File

@ -48,6 +48,7 @@ message RegisterApplicationMasterResponseProto {
optional string queue = 5; optional string queue = 5;
repeated NMTokenProto nm_tokens_from_previous_attempts = 6; repeated NMTokenProto nm_tokens_from_previous_attempts = 6;
repeated SchedulerResourceTypes scheduler_resource_types = 7; repeated SchedulerResourceTypes scheduler_resource_types = 7;
repeated ResourceTypeInfoProto resource_types = 9;
} }
message FinishApplicationMasterRequestProto { message FinishApplicationMasterRequestProto {

View File

@ -22,6 +22,7 @@ import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.EnumSet; import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
@ -101,6 +102,7 @@ import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.QueueInfo; import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceInformation;
import org.apache.hadoop.yarn.api.records.ResourceTypeInfo; import org.apache.hadoop.yarn.api.records.ResourceTypeInfo;
import org.apache.hadoop.yarn.api.records.SignalContainerCommand; import org.apache.hadoop.yarn.api.records.SignalContainerCommand;
import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.api.records.Token;
@ -121,6 +123,7 @@ import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier; import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier;
import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Records; import org.apache.hadoop.yarn.util.Records;
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import org.apache.hadoop.yarn.util.timeline.TimelineUtils; import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
@ -147,6 +150,7 @@ public class YarnClientImpl extends YarnClient {
String timelineDTRenewer; String timelineDTRenewer;
private boolean timelineV1ServiceEnabled; private boolean timelineV1ServiceEnabled;
protected boolean timelineServiceBestEffort; protected boolean timelineServiceBestEffort;
private boolean loadResourceTypesFromServer;
private static final String ROOT = "root"; private static final String ROOT = "root";
@ -198,6 +202,11 @@ public class YarnClientImpl extends YarnClient {
timelineServiceBestEffort = conf.getBoolean( timelineServiceBestEffort = conf.getBoolean(
YarnConfiguration.TIMELINE_SERVICE_CLIENT_BEST_EFFORT, YarnConfiguration.TIMELINE_SERVICE_CLIENT_BEST_EFFORT,
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_CLIENT_BEST_EFFORT); YarnConfiguration.DEFAULT_TIMELINE_SERVICE_CLIENT_BEST_EFFORT);
loadResourceTypesFromServer = conf.getBoolean(
YarnConfiguration.YARN_CLIENT_LOAD_RESOURCETYPES_FROM_SERVER,
YarnConfiguration.DEFAULT_YARN_CLIENT_LOAD_RESOURCETYPES_FROM_SERVER);
super.serviceInit(conf); super.serviceInit(conf);
} }
@ -216,6 +225,13 @@ public class YarnClientImpl extends YarnClient {
} catch (IOException e) { } catch (IOException e) {
throw new YarnRuntimeException(e); throw new YarnRuntimeException(e);
} }
// Reinitialize local resource types cache from list of resources pulled from
// RM.
if (loadResourceTypesFromServer) {
ResourceUtils.reinitializeResources(getResourceTypeInfo());
}
super.serviceStart(); super.serviceStart();
} }

View File

@ -18,10 +18,8 @@
package org.apache.hadoop.yarn.api.protocolrecords.impl.pb; package org.apache.hadoop.yarn.api.protocolrecords.impl.pb;
import com.google.protobuf.ByteString;
import java.nio.ByteBuffer; import com.google.protobuf.TextFormat;
import java.util.*;
import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
@ -29,21 +27,28 @@ import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.NMToken; import org.apache.hadoop.yarn.api.records.NMToken;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceTypeInfo;
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.NMTokenPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.NMTokenPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils; import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ResourceTypeInfoPBImpl;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationACLMapProto; import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationACLMapProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerProto; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto; import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceTypeInfoProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.NMTokenProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.NMTokenProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.RegisterApplicationMasterResponseProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.RegisterApplicationMasterResponseProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.RegisterApplicationMasterResponseProtoOrBuilder; import org.apache.hadoop.yarn.proto.YarnServiceProtos.RegisterApplicationMasterResponseProtoOrBuilder;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes; import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes;
import com.google.protobuf.ByteString; import java.nio.ByteBuffer;
import com.google.protobuf.TextFormat; import java.util.ArrayList;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@Private @Private
@Unstable @Unstable
@ -59,6 +64,7 @@ public class RegisterApplicationMasterResponsePBImpl extends
private List<Container> containersFromPreviousAttempts = null; private List<Container> containersFromPreviousAttempts = null;
private List<NMToken> nmTokens = null; private List<NMToken> nmTokens = null;
private EnumSet<SchedulerResourceTypes> schedulerResourceTypes = null; private EnumSet<SchedulerResourceTypes> schedulerResourceTypes = null;
private List<ResourceTypeInfo> resourceTypeInfo = null;
public RegisterApplicationMasterResponsePBImpl() { public RegisterApplicationMasterResponsePBImpl() {
builder = RegisterApplicationMasterResponseProto.newBuilder(); builder = RegisterApplicationMasterResponseProto.newBuilder();
@ -123,9 +129,11 @@ public class RegisterApplicationMasterResponsePBImpl extends
if(schedulerResourceTypes != null) { if(schedulerResourceTypes != null) {
addSchedulerResourceTypes(); addSchedulerResourceTypes();
} }
if (resourceTypeInfo != null) {
addResourceTypeInfosToProto();
}
} }
private void maybeInitBuilder() { private void maybeInitBuilder() {
if (viaProto || builder == null) { if (viaProto || builder == null) {
builder = RegisterApplicationMasterResponseProto.newBuilder(proto); builder = RegisterApplicationMasterResponseProto.newBuilder(proto);
@ -456,4 +464,75 @@ public class RegisterApplicationMasterResponsePBImpl extends
private NMToken convertFromProtoFormat(NMTokenProto proto) { private NMToken convertFromProtoFormat(NMTokenProto proto) {
return new NMTokenPBImpl(proto); return new NMTokenPBImpl(proto);
} }
private ResourceTypeInfoPBImpl convertFromProtoFormat(
ResourceTypeInfoProto p) {
return new ResourceTypeInfoPBImpl(p);
}
private ResourceTypeInfoProto convertToProtoFormat(ResourceTypeInfo t) {
return ((ResourceTypeInfoPBImpl) t).getProto();
}
@Override
public List<ResourceTypeInfo> getResourceTypes() {
initResourceTypeInfosList();
return this.resourceTypeInfo;
}
@Override
public void setResourceTypes(List<ResourceTypeInfo> types) {
if (resourceTypeInfo == null) {
builder.clearResourceTypes();
}
this.resourceTypeInfo = types;
}
private void addResourceTypeInfosToProto() {
maybeInitBuilder();
builder.clearResourceTypes();
if (resourceTypeInfo == null) {
return;
}
Iterable<ResourceTypeInfoProto> iterable = new Iterable<ResourceTypeInfoProto>() {
@Override
public Iterator<ResourceTypeInfoProto> iterator() {
return new Iterator<ResourceTypeInfoProto>() {
Iterator<ResourceTypeInfo> iter = resourceTypeInfo.iterator();
@Override
public boolean hasNext() {
return iter.hasNext();
}
@Override
public ResourceTypeInfoProto next() {
return convertToProtoFormat(iter.next());
}
@Override
public void remove() {
throw new UnsupportedOperationException();
}
};
}
};
builder.addAllResourceTypes(iterable);
}
private void initResourceTypeInfosList() {
if (this.resourceTypeInfo != null) {
return;
}
RegisterApplicationMasterResponseProtoOrBuilder p = viaProto ? proto : builder;
List<ResourceTypeInfoProto> list = p.getResourceTypesList();
resourceTypeInfo = new ArrayList<ResourceTypeInfo>();
for (ResourceTypeInfoProto a : list) {
resourceTypeInfo.add(convertFromProtoFormat(a));
}
}
} }

View File

@ -3433,4 +3433,13 @@
to specify details about the individual resource types. to specify details about the individual resource types.
</description> </description>
</property> </property>
<property>
<description>
Provides an option for client to load supported resource types from RM
instead of depending on local resource-types.xml file.
</description>
<name>yarn.client.load.resource-types.from-server</name>
<value>false</value>
</property>
</configuration> </configuration>

View File

@ -73,6 +73,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler
.SchedulerNodeReport; .SchedulerNodeReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.hadoop.yarn.util.resource.Resources;
import java.io.IOException; import java.io.IOException;
@ -177,6 +178,7 @@ final class DefaultAMSProcessor implements ApplicationMasterServiceProcessor {
response.setSchedulerResourceTypes(getScheduler() response.setSchedulerResourceTypes(getScheduler()
.getSchedulingResourceTypes()); .getSchedulingResourceTypes());
response.setResourceTypes(ResourceUtils.getResourcesTypeInfo());
} }
@Override @Override