YARN-6247. Share a single instance of SubClusterResolver instead of instantiating one per AM. (Botong Huang via Subru)
This commit is contained in:
parent
5c486961cd
commit
51aeb2ce0c
|
@ -2594,6 +2594,12 @@ public class YarnConfiguration extends Configuration {
|
||||||
public static final String FEDERATION_MACHINE_LIST =
|
public static final String FEDERATION_MACHINE_LIST =
|
||||||
FEDERATION_PREFIX + "machine-list";
|
FEDERATION_PREFIX + "machine-list";
|
||||||
|
|
||||||
|
public static final String FEDERATION_CLUSTER_RESOLVER_CLASS =
|
||||||
|
FEDERATION_PREFIX + "subcluster-resolver.class";
|
||||||
|
public static final String DEFAULT_FEDERATION_CLUSTER_RESOLVER_CLASS =
|
||||||
|
"org.apache.hadoop.yarn.server.federation.resolver."
|
||||||
|
+ "DefaultSubClusterResolverImpl";
|
||||||
|
|
||||||
public static final String DEFAULT_FEDERATION_POLICY_KEY = "*";
|
public static final String DEFAULT_FEDERATION_POLICY_KEY = "*";
|
||||||
|
|
||||||
public static final String FEDERATION_POLICY_MANAGER = FEDERATION_PREFIX
|
public static final String FEDERATION_POLICY_MANAGER = FEDERATION_PREFIX
|
||||||
|
|
|
@ -2701,6 +2701,13 @@
|
||||||
</description>
|
</description>
|
||||||
<name>yarn.federation.machine-list</name>
|
<name>yarn.federation.machine-list</name>
|
||||||
</property>
|
</property>
|
||||||
|
<property>
|
||||||
|
<description>
|
||||||
|
Class name for SubClusterResolver
|
||||||
|
</description>
|
||||||
|
<name>yarn.federation.subcluster-resolver.class</name>
|
||||||
|
<value>org.apache.hadoop.yarn.server.federation.resolver.DefaultSubClusterResolverImpl</value>
|
||||||
|
</property>
|
||||||
|
|
||||||
<property>
|
<property>
|
||||||
<description>
|
<description>
|
||||||
|
|
|
@ -21,8 +21,8 @@ package org.apache.hadoop.yarn.server.federation.resolver;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
|
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
|
||||||
|
|
||||||
import java.util.HashMap;
|
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -31,9 +31,9 @@ import java.util.Map;
|
||||||
*/
|
*/
|
||||||
public abstract class AbstractSubClusterResolver implements SubClusterResolver {
|
public abstract class AbstractSubClusterResolver implements SubClusterResolver {
|
||||||
private Map<String, SubClusterId> nodeToSubCluster =
|
private Map<String, SubClusterId> nodeToSubCluster =
|
||||||
new HashMap<String, SubClusterId>();
|
new ConcurrentHashMap<String, SubClusterId>();
|
||||||
private Map<String, Set<SubClusterId>> rackToSubClusters =
|
private Map<String, Set<SubClusterId>> rackToSubClusters =
|
||||||
new HashMap<String, Set<SubClusterId>>();
|
new ConcurrentHashMap<String, Set<SubClusterId>>();
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public SubClusterId getSubClusterForNode(String nodename)
|
public SubClusterId getSubClusterForNode(String nodename)
|
||||||
|
|
|
@ -25,8 +25,8 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
|
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* An utility that helps to determine the sub-cluster that a specified node
|
* An utility that helps to determine the sub-cluster that a specified node or
|
||||||
* belongs to.
|
* rack belongs to. All implementing classes should be thread-safe.
|
||||||
*/
|
*/
|
||||||
public interface SubClusterResolver extends Configurable {
|
public interface SubClusterResolver extends Configurable {
|
||||||
|
|
||||||
|
|
|
@ -46,6 +46,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||||
|
import org.apache.hadoop.yarn.server.federation.resolver.SubClusterResolver;
|
||||||
import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
|
import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
|
||||||
import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest;
|
import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest;
|
||||||
import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterResponse;
|
import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterResponse;
|
||||||
|
@ -90,6 +91,7 @@ public final class FederationStateStoreFacade {
|
||||||
private int cacheTimeToLive;
|
private int cacheTimeToLive;
|
||||||
private Configuration conf;
|
private Configuration conf;
|
||||||
private Cache<Object, Object> cache;
|
private Cache<Object, Object> cache;
|
||||||
|
private SubClusterResolver subclusterResolver;
|
||||||
|
|
||||||
private FederationStateStoreFacade() {
|
private FederationStateStoreFacade() {
|
||||||
initializeFacadeInternal(new Configuration());
|
initializeFacadeInternal(new Configuration());
|
||||||
|
@ -104,6 +106,12 @@ public final class FederationStateStoreFacade {
|
||||||
FederationStateStore.class, createRetryPolicy(conf));
|
FederationStateStore.class, createRetryPolicy(conf));
|
||||||
this.stateStore.init(conf);
|
this.stateStore.init(conf);
|
||||||
|
|
||||||
|
this.subclusterResolver = createInstance(conf,
|
||||||
|
YarnConfiguration.FEDERATION_CLUSTER_RESOLVER_CLASS,
|
||||||
|
YarnConfiguration.DEFAULT_FEDERATION_CLUSTER_RESOLVER_CLASS,
|
||||||
|
SubClusterResolver.class);
|
||||||
|
this.subclusterResolver.load();
|
||||||
|
|
||||||
initCache();
|
initCache();
|
||||||
|
|
||||||
} catch (YarnException ex) {
|
} catch (YarnException ex) {
|
||||||
|
@ -347,6 +355,15 @@ public final class FederationStateStoreFacade {
|
||||||
return response.getApplicationHomeSubCluster().getHomeSubCluster();
|
return response.getApplicationHomeSubCluster().getHomeSubCluster();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the singleton instance of SubClusterResolver.
|
||||||
|
*
|
||||||
|
* @return SubClusterResolver instance
|
||||||
|
*/
|
||||||
|
public SubClusterResolver getSubClusterResolver() {
|
||||||
|
return this.subclusterResolver;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Helper method to create instances of Object using the class name defined in
|
* Helper method to create instances of Object using the class name defined in
|
||||||
* the configuration object. The instances creates {@link RetryProxy} using
|
* the configuration object. The instances creates {@link RetryProxy} using
|
||||||
|
@ -359,23 +376,40 @@ public final class FederationStateStoreFacade {
|
||||||
* @param retryPolicy the policy for retrying method call failures
|
* @param retryPolicy the policy for retrying method call failures
|
||||||
* @return a retry proxy for the specified interface
|
* @return a retry proxy for the specified interface
|
||||||
*/
|
*/
|
||||||
@SuppressWarnings("unchecked")
|
|
||||||
public static <T> Object createRetryInstance(Configuration conf,
|
public static <T> Object createRetryInstance(Configuration conf,
|
||||||
String configuredClassName, String defaultValue, Class<T> type,
|
String configuredClassName, String defaultValue, Class<T> type,
|
||||||
RetryPolicy retryPolicy) {
|
RetryPolicy retryPolicy) {
|
||||||
|
|
||||||
|
return RetryProxy.create(type,
|
||||||
|
createInstance(conf, configuredClassName, defaultValue, type),
|
||||||
|
retryPolicy);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Helper method to create instances of Object using the class name specified
|
||||||
|
* in the configuration object.
|
||||||
|
*
|
||||||
|
* @param conf the yarn configuration
|
||||||
|
* @param configuredClassName the configuration provider key
|
||||||
|
* @param defaultValue the default implementation class
|
||||||
|
* @param type the required interface/base class
|
||||||
|
* @param <T> The type of the instance to create
|
||||||
|
* @return the instances created
|
||||||
|
*/
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
public static <T> T createInstance(Configuration conf,
|
||||||
|
String configuredClassName, String defaultValue, Class<T> type) {
|
||||||
|
|
||||||
String className = conf.get(configuredClassName, defaultValue);
|
String className = conf.get(configuredClassName, defaultValue);
|
||||||
try {
|
try {
|
||||||
Class<?> clusterResolverClass = conf.getClassByName(className);
|
Class<?> clusterResolverClass = conf.getClassByName(className);
|
||||||
if (type.isAssignableFrom(clusterResolverClass)) {
|
if (type.isAssignableFrom(clusterResolverClass)) {
|
||||||
return RetryProxy.create(type,
|
return (T) ReflectionUtils.newInstance(clusterResolverClass, conf);
|
||||||
(T) ReflectionUtils.newInstance(clusterResolverClass, conf),
|
|
||||||
retryPolicy);
|
|
||||||
} else {
|
} else {
|
||||||
throw new YarnRuntimeException(
|
throw new YarnRuntimeException("Class: " + className
|
||||||
"Class: " + className + " not instance of " + type.getSimpleName());
|
+ " not instance of " + type.getCanonicalName());
|
||||||
}
|
}
|
||||||
} catch (Exception e) {
|
} catch (ClassNotFoundException e) {
|
||||||
throw new YarnRuntimeException("Could not instantiate : " + className, e);
|
throw new YarnRuntimeException("Could not instantiate : " + className, e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue