YARN-3672. Create Facade for Federation State and Policy Store. Contributed by Subru Krishnan

This commit is contained in:
Jian He 2016-08-17 11:13:19 +08:00 committed by Carlo Curino
parent e4f928cf9c
commit 5e976cd2b9
9 changed files with 905 additions and 1 deletions

View File

@ -98,6 +98,9 @@
<apacheds.version>2.0.0-M21</apacheds.version>
<ldap-api.version>1.0.0-M33</ldap-api.version>
<jcache.version>1.0.0</jcache.version>
<ehcache.version>3.0.3</ehcache.version>
<!-- define the Java language version used by the compiler -->
<javac.version>1.8</javac.version>
@ -1265,6 +1268,16 @@
<artifactId>kerb-simplekdc</artifactId>
<version>1.0.0</version>
</dependency>
<dependency>
<groupId>javax.cache</groupId>
<artifactId>cache-api</artifactId>
<version>${jcache.version}</version>
</dependency>
<dependency>
<groupId>org.ehcache</groupId>
<artifactId>ehcache</artifactId>
<version>${ehcache.version}</version>
</dependency>
</dependencies>
</dependencyManagement>

View File

@ -2560,6 +2560,19 @@ public class YarnConfiguration extends Configuration {
////////////////////////////////
public static final String FEDERATION_PREFIX = YARN_PREFIX + "federation.";
public static final String FEDERATION_STATESTORE_CLIENT_CLASS =
FEDERATION_PREFIX + "state-store.class";
public static final String DEFAULT_FEDERATION_STATESTORE_CLIENT_CLASS =
"org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore";
public static final String FEDERATION_CACHE_TIME_TO_LIVE_SECS =
FEDERATION_PREFIX + "cache-ttl.secs";
// 5 minutes
public static final int DEFAULT_FEDERATION_CACHE_TIME_TO_LIVE_SECS = 5 * 60;
public static final String FEDERATION_MACHINE_LIST =
FEDERATION_PREFIX + "machine-list";

View File

@ -68,6 +68,10 @@ public class TestYarnConfigurationFields extends TestConfigurationFieldsBase {
.YARN_SECURITY_SERVICE_AUTHORIZATION_RESOURCETRACKER_PROTOCOL);
configurationPropsToSkipCompare.add(YarnConfiguration.CURATOR_LEADER_ELECTOR);
// Federation default configs to be ignored
configurationPropsToSkipCompare
.add(YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_CLIENT_CLASS);
// Ignore blacklisting nodes for AM failures feature since it is still a
// "work in progress"
configurationPropsToSkipCompare.add(YarnConfiguration.

View File

@ -2686,8 +2686,8 @@
<description>The arguments to pass to the Node label script.</description>
<name>yarn.nodemanager.node-labels.provider.script.opts</name>
</property>
<!-- Other Configuration -->
<!-- Federation Configuration -->
<property>
<description>
Machine list file to be loaded by the FederationSubCluster Resolver
@ -2695,6 +2695,24 @@
<name>yarn.federation.machine-list</name>
</property>
<property>
<description>
Store class name for federation state store
</description>
<name>yarn.federation.state-store.class</name>
<value>org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore</value>
</property>
<property>
<description>
The time in seconds after which the federation state store local cache
will be refreshed periodically
</description>
<name>yarn.federation.cache-ttl.secs</name>
<value>300</value>
</property>
<!-- Other Configuration -->
<property>
<description>The interval that the yarn client library uses to poll the
completion status of the asynchronous API of application client protocol.

View File

@ -102,6 +102,16 @@
<groupId>org.fusesource.leveldbjni</groupId>
<artifactId>leveldbjni-all</artifactId>
</dependency>
<dependency>
<groupId>javax.cache</groupId>
<artifactId>cache-api</artifactId>
<version>${jcache.version}</version>
</dependency>
<dependency>
<groupId>org.ehcache</groupId>
<artifactId>ehcache</artifactId>
<version>${ehcache.version}</version>
</dependency>
</dependencies>
<build>

View File

@ -0,0 +1,532 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.federation.utils;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import javax.cache.Cache;
import javax.cache.CacheManager;
import javax.cache.Caching;
import javax.cache.configuration.CompleteConfiguration;
import javax.cache.configuration.FactoryBuilder;
import javax.cache.configuration.MutableConfiguration;
import javax.cache.expiry.CreatedExpiryPolicy;
import javax.cache.expiry.Duration;
import javax.cache.expiry.ExpiryPolicy;
import javax.cache.integration.CacheLoader;
import javax.cache.integration.CacheLoaderException;
import javax.cache.spi.CachingProvider;
import org.apache.commons.lang.NotImplementedException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.io.retry.RetryProxy;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster;
import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPoliciesConfigurationsRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPoliciesConfigurationsResponse;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoResponse;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
import org.apache.hadoop.yarn.server.federation.store.records.UpdateApplicationHomeSubClusterRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
/**
*
* The FederationStateStoreFacade is an utility wrapper that provides singleton
* access to the Federation state store. It abstracts out retries and in
* addition, it also implements the caching for various objects.
*
*/
public final class FederationStateStoreFacade {
private static final Logger LOG =
LoggerFactory.getLogger(FederationStateStoreFacade.class);
private static final String GET_SUBCLUSTERS_CACHEID = "getSubClusters";
private static final String GET_POLICIES_CONFIGURATIONS_CACHEID =
"getPoliciesConfigurations";
private static final FederationStateStoreFacade FACADE =
new FederationStateStoreFacade();
private FederationStateStore stateStore;
private int cacheTimeToLive;
private Configuration conf;
private Cache<Object, Object> cache;
private FederationStateStoreFacade() {
initializeFacadeInternal(new Configuration());
}
private void initializeFacadeInternal(Configuration config) {
this.conf = config;
try {
this.stateStore = (FederationStateStore) createRetryInstance(this.conf,
YarnConfiguration.FEDERATION_STATESTORE_CLIENT_CLASS,
YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_CLIENT_CLASS,
FederationStateStore.class, createRetryPolicy(conf));
this.stateStore.init(conf);
initCache();
} catch (YarnException ex) {
LOG.error("Failed to initialize the FederationStateStoreFacade object",
ex);
throw new RuntimeException(ex);
}
}
/**
* Delete and re-initialize the cache, to force it to use the given
* configuration.
*
* @param store the {@link FederationStateStore} instance to reinitialize with
* @param config the updated configuration to reinitialize with
*/
@VisibleForTesting
public synchronized void reinitialize(FederationStateStore store,
Configuration config) {
this.conf = config;
this.stateStore = store;
clearCache();
initCache();
}
public static RetryPolicy createRetryPolicy(Configuration conf) {
// Retry settings for StateStore
RetryPolicy retryPolicy = RetryPolicies.exponentialBackoffRetry(
conf.getInt(YarnConfiguration.CLIENT_FAILOVER_RETRIES, Integer.SIZE),
conf.getLong(YarnConfiguration.CLIENT_FAILOVER_SLEEPTIME_BASE_MS,
YarnConfiguration.DEFAULT_RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS),
TimeUnit.MILLISECONDS);
return retryPolicy;
}
private boolean isCachingEnabled() {
return (cacheTimeToLive > 0);
}
private void initCache() {
// Picking the JCache provider from classpath, need to make sure there's
// no conflict or pick up a specific one in the future
cacheTimeToLive =
conf.getInt(YarnConfiguration.FEDERATION_CACHE_TIME_TO_LIVE_SECS,
YarnConfiguration.DEFAULT_FEDERATION_CACHE_TIME_TO_LIVE_SECS);
if (isCachingEnabled()) {
CachingProvider jcacheProvider = Caching.getCachingProvider();
CacheManager jcacheManager = jcacheProvider.getCacheManager();
this.cache = jcacheManager.getCache(this.getClass().getSimpleName());
if (this.cache == null) {
LOG.info("Creating a JCache Manager with name "
+ this.getClass().getSimpleName());
Duration cacheExpiry = new Duration(TimeUnit.SECONDS, cacheTimeToLive);
CompleteConfiguration<Object, Object> configuration =
new MutableConfiguration<Object, Object>().setStoreByValue(false)
.setReadThrough(true)
.setExpiryPolicyFactory(
new FactoryBuilder.SingletonFactory<ExpiryPolicy>(
new CreatedExpiryPolicy(cacheExpiry)))
.setCacheLoaderFactory(
new FactoryBuilder.SingletonFactory<CacheLoader<Object, Object>>(
new CacheLoaderImpl<Object, Object>()));
this.cache = jcacheManager.createCache(this.getClass().getSimpleName(),
configuration);
}
}
}
private void clearCache() {
CachingProvider jcacheProvider = Caching.getCachingProvider();
CacheManager jcacheManager = jcacheProvider.getCacheManager();
jcacheManager.destroyCache(this.getClass().getSimpleName());
this.cache = null;
}
/**
* Returns the singleton instance of the FederationStateStoreFacade object.
*
* @return the singleton {@link FederationStateStoreFacade} instance
*/
public static FederationStateStoreFacade getInstance() {
return FACADE;
}
/**
* Returns the {@link SubClusterInfo} for the specified {@link SubClusterId}.
*
* @param subClusterId the identifier of the sub-cluster
* @return the sub cluster information
* @throws YarnException if the call to the state store is unsuccessful
*/
public SubClusterInfo getSubCluster(final SubClusterId subClusterId)
throws YarnException {
if (isCachingEnabled()) {
return getSubClusters(false).get(subClusterId);
} else {
return stateStore
.getSubCluster(GetSubClusterInfoRequest.newInstance(subClusterId))
.getSubClusterInfo();
}
}
/**
* Updates the cache with the central {@link FederationStateStore} and returns
* the {@link SubClusterInfo} for the specified {@link SubClusterId}.
*
* @param subClusterId the identifier of the sub-cluster
* @param flushCache flag to indicate if the cache should be flushed or not
* @return the sub cluster information
* @throws YarnException if the call to the state store is unsuccessful
*/
public SubClusterInfo getSubCluster(final SubClusterId subClusterId,
final boolean flushCache) throws YarnException {
if (flushCache && isCachingEnabled()) {
LOG.info("Flushing subClusters from cache and rehydrating from store,"
+ " most likely on account of RM failover.");
cache.remove(buildGetSubClustersCacheRequest(false));
}
return getSubCluster(subClusterId);
}
/**
* Returns the {@link SubClusterInfo} of all active sub cluster(s).
*
* @param filterInactiveSubClusters whether to filter out inactive
* sub-clusters
* @return the information of all active sub cluster(s)
* @throws YarnException if the call to the state store is unsuccessful
*/
@SuppressWarnings("unchecked")
public Map<SubClusterId, SubClusterInfo> getSubClusters(
final boolean filterInactiveSubClusters) throws YarnException {
try {
if (isCachingEnabled()) {
return (Map<SubClusterId, SubClusterInfo>) cache
.get(buildGetSubClustersCacheRequest(filterInactiveSubClusters));
} else {
return buildSubClusterInfoMap(stateStore.getSubClusters(
GetSubClustersInfoRequest.newInstance(filterInactiveSubClusters)));
}
} catch (Throwable ex) {
throw new YarnException(ex);
}
}
/**
* Returns the {@link SubClusterPolicyConfiguration} for the specified queue.
*
* @param queue the queue whose policy is required
* @return the corresponding configured policy
* @throws YarnException if the call to the state store is unsuccessful
*/
public SubClusterPolicyConfiguration getPolicyConfiguration(
final String queue) throws YarnException {
if (isCachingEnabled()) {
return getPoliciesConfigurations().get(queue);
} else {
return stateStore
.getPolicyConfiguration(
GetSubClusterPolicyConfigurationRequest.newInstance(queue))
.getPolicyConfiguration();
}
}
/**
* Get the policies that is represented as
* {@link SubClusterPolicyConfiguration} for all currently active queues in
* the system.
*
* @return the policies for all currently active queues in the system
* @throws YarnException if the call to the state store is unsuccessful
*/
@SuppressWarnings("unchecked")
public Map<String, SubClusterPolicyConfiguration> getPoliciesConfigurations()
throws YarnException {
try {
if (isCachingEnabled()) {
return (Map<String, SubClusterPolicyConfiguration>) cache
.get(buildGetPoliciesConfigurationsCacheRequest());
} else {
return buildPolicyConfigMap(stateStore.getPoliciesConfigurations(
GetSubClusterPoliciesConfigurationsRequest.newInstance()));
}
} catch (Throwable ex) {
throw new YarnException(ex);
}
}
/**
* Adds the home {@link SubClusterId} for the specified {@link ApplicationId}.
*
* @param appHomeSubCluster the mapping of the application to it's home
* sub-cluster
* @throws YarnException if the call to the state store is unsuccessful
*/
public void addApplicationHomeSubCluster(
ApplicationHomeSubCluster appHomeSubCluster) throws YarnException {
stateStore.addApplicationHomeSubCluster(
AddApplicationHomeSubClusterRequest.newInstance(appHomeSubCluster));
return;
}
/**
* Updates the home {@link SubClusterId} for the specified
* {@link ApplicationId}.
*
* @param appHomeSubCluster the mapping of the application to it's home
* sub-cluster
* @throws YarnException if the call to the state store is unsuccessful
*/
public void updateApplicationHomeSubCluster(
ApplicationHomeSubCluster appHomeSubCluster) throws YarnException {
stateStore.updateApplicationHomeSubCluster(
UpdateApplicationHomeSubClusterRequest.newInstance(appHomeSubCluster));
return;
}
/**
* Returns the home {@link SubClusterId} for the specified
* {@link ApplicationId}.
*
* @param appId the identifier of the application
* @return the home sub cluster identifier
* @throws YarnException if the call to the state store is unsuccessful
*/
public SubClusterId getApplicationHomeSubCluster(ApplicationId appId)
throws YarnException {
GetApplicationHomeSubClusterResponse response =
stateStore.getApplicationHomeSubCluster(
GetApplicationHomeSubClusterRequest.newInstance(appId));
return response.getApplicationHomeSubCluster().getHomeSubCluster();
}
/**
* Helper method to create instances of Object using the class name defined in
* the configuration object. The instances creates {@link RetryProxy} using
* the specific {@link RetryPolicy}.
*
* @param conf the yarn configuration
* @param configuredClassName the configuration provider key
* @param defaultValue the default implementation for fallback
* @param type the class for which a retry proxy is required
* @param retryPolicy the policy for retrying method call failures
* @return a retry proxy for the specified interface
*/
@SuppressWarnings("unchecked")
public static <T> Object createRetryInstance(Configuration conf,
String configuredClassName, String defaultValue, Class<T> type,
RetryPolicy retryPolicy) {
String className = conf.get(configuredClassName, defaultValue);
try {
Class<?> clusterResolverClass = conf.getClassByName(className);
if (type.isAssignableFrom(clusterResolverClass)) {
return RetryProxy.create(type,
(T) ReflectionUtils.newInstance(clusterResolverClass, conf),
retryPolicy);
} else {
throw new YarnRuntimeException(
"Class: " + className + " not instance of " + type.getSimpleName());
}
} catch (Exception e) {
throw new YarnRuntimeException("Could not instantiate : " + className, e);
}
}
private Map<SubClusterId, SubClusterInfo> buildSubClusterInfoMap(
final GetSubClustersInfoResponse response) {
List<SubClusterInfo> subClusters = response.getSubClusters();
Map<SubClusterId, SubClusterInfo> subClustersMap =
new HashMap<>(subClusters.size());
for (SubClusterInfo subCluster : subClusters) {
subClustersMap.put(subCluster.getSubClusterId(), subCluster);
}
return subClustersMap;
}
private Object buildGetSubClustersCacheRequest(
final boolean filterInactiveSubClusters) {
final String cacheKey = buildCacheKey(getClass().getSimpleName(),
GET_SUBCLUSTERS_CACHEID, null);
CacheRequest<String, Map<SubClusterId, SubClusterInfo>> cacheRequest =
new CacheRequest<String, Map<SubClusterId, SubClusterInfo>>(cacheKey,
new Func<String, Map<SubClusterId, SubClusterInfo>>() {
@Override
public Map<SubClusterId, SubClusterInfo> invoke(String key)
throws Exception {
GetSubClustersInfoResponse subClusters =
stateStore.getSubClusters(GetSubClustersInfoRequest
.newInstance(filterInactiveSubClusters));
return buildSubClusterInfoMap(subClusters);
}
});
return cacheRequest;
}
private Map<String, SubClusterPolicyConfiguration> buildPolicyConfigMap(
GetSubClusterPoliciesConfigurationsResponse response) {
List<SubClusterPolicyConfiguration> policyConfigs =
response.getPoliciesConfigs();
Map<String, SubClusterPolicyConfiguration> queuePolicyConfigs =
new HashMap<>();
for (SubClusterPolicyConfiguration policyConfig : policyConfigs) {
queuePolicyConfigs.put(policyConfig.getQueue(), policyConfig);
}
return queuePolicyConfigs;
}
private Object buildGetPoliciesConfigurationsCacheRequest() {
final String cacheKey = buildCacheKey(getClass().getSimpleName(),
GET_POLICIES_CONFIGURATIONS_CACHEID, null);
CacheRequest<String, Map<String, SubClusterPolicyConfiguration>> cacheRequest =
new CacheRequest<String, Map<String, SubClusterPolicyConfiguration>>(
cacheKey,
new Func<String, Map<String, SubClusterPolicyConfiguration>>() {
@Override
public Map<String, SubClusterPolicyConfiguration> invoke(
String key) throws Exception {
GetSubClusterPoliciesConfigurationsResponse policyConfigs =
stateStore.getPoliciesConfigurations(
GetSubClusterPoliciesConfigurationsRequest
.newInstance());
return buildPolicyConfigMap(policyConfigs);
}
});
return cacheRequest;
}
protected String buildCacheKey(String typeName, String methodName,
String argName) {
StringBuilder buffer = new StringBuilder();
buffer.append(typeName).append(".");
buffer.append(methodName);
if (argName != null) {
buffer.append("::");
buffer.append(argName);
}
return buffer.toString();
}
/**
* Internal class that implements the CacheLoader interface that can be
* plugged into the CacheManager to load objects into the cache for specified
* keys.
*/
private static class CacheLoaderImpl<K, V> implements CacheLoader<K, V> {
@SuppressWarnings("unchecked")
@Override
public V load(K key) throws CacheLoaderException {
try {
CacheRequest<K, V> query = (CacheRequest<K, V>) key;
assert query != null;
return query.getValue();
} catch (Throwable ex) {
throw new CacheLoaderException(ex);
}
}
@Override
public Map<K, V> loadAll(Iterable<? extends K> keys)
throws CacheLoaderException {
// The FACADE does not use the Cache's getAll API. Hence this is not
// required to be implemented
throw new NotImplementedException();
}
}
/**
* Internal class that encapsulates the cache key and a function that returns
* the value for the specified key.
*/
private static class CacheRequest<K, V> {
private K key;
private Func<K, V> func;
public CacheRequest(K key, Func<K, V> func) {
this.key = key;
this.func = func;
}
public V getValue() throws Exception {
return func.invoke(key);
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + ((key == null) ? 0 : key.hashCode());
return result;
}
@SuppressWarnings("unchecked")
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null) {
return false;
}
if (getClass() != obj.getClass()) {
return false;
}
CacheRequest<K, V> other = (CacheRequest<K, V>) obj;
if (key == null) {
if (other.key != null) {
return false;
}
} else if (!key.equals(other.key)) {
return false;
}
return true;
}
}
/**
* Encapsulates a method that has one parameter and returns a value of the
* type specified by the TResult parameter.
*/
protected interface Func<T, TResult> {
TResult invoke(T input) throws Exception;
}
}

View File

@ -0,0 +1,17 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.yarn.server.federation.utils;

View File

@ -0,0 +1,149 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.yarn.server.federation.utils;
import java.nio.ByteBuffer;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster;
import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationResponse;
import org.apache.hadoop.yarn.server.federation.store.records.SetSubClusterPolicyConfigurationRequest;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState;
import org.apache.hadoop.yarn.util.MonotonicClock;
/**
* Utility class for FederationStateStore unit tests.
*/
public class FederationStateStoreTestUtil {
private static final MonotonicClock CLOCK = new MonotonicClock();
public static final String SC_PREFIX = "SC-";
public static final String Q_PREFIX = "queue-";
public static final String POLICY_PREFIX = "policy-";
private FederationStateStore stateStore;
public FederationStateStoreTestUtil(FederationStateStore stateStore) {
this.stateStore = stateStore;
}
private SubClusterInfo createSubClusterInfo(SubClusterId subClusterId) {
String amRMAddress = "1.2.3.4:1";
String clientRMAddress = "1.2.3.4:2";
String rmAdminAddress = "1.2.3.4:3";
String webAppAddress = "1.2.3.4:4";
return SubClusterInfo.newInstance(subClusterId, amRMAddress,
clientRMAddress, rmAdminAddress, webAppAddress, SubClusterState.SC_NEW,
CLOCK.getTime(), "capability");
}
private void registerSubCluster(SubClusterId subClusterId)
throws YarnException {
SubClusterInfo subClusterInfo = createSubClusterInfo(subClusterId);
stateStore.registerSubCluster(
SubClusterRegisterRequest.newInstance(subClusterInfo));
}
public void registerSubClusters(int numSubClusters) throws YarnException {
for (int i = 0; i < numSubClusters; i++) {
registerSubCluster(SubClusterId.newInstance(SC_PREFIX + i));
}
}
private void addApplicationHomeSC(ApplicationId appId,
SubClusterId subClusterId) throws YarnException {
ApplicationHomeSubCluster ahsc =
ApplicationHomeSubCluster.newInstance(appId, subClusterId);
AddApplicationHomeSubClusterRequest request =
AddApplicationHomeSubClusterRequest.newInstance(ahsc);
stateStore.addApplicationHomeSubCluster(request);
}
public void addAppsHomeSC(long clusterTs, int numApps) throws YarnException {
for (int i = 0; i < numApps; i++) {
addApplicationHomeSC(ApplicationId.newInstance(clusterTs, i),
SubClusterId.newInstance(SC_PREFIX + i));
}
}
private SubClusterPolicyConfiguration createSCPolicyConf(String queueName,
String policyType) {
return SubClusterPolicyConfiguration.newInstance(queueName, policyType,
ByteBuffer.allocate(1));
}
private void setPolicyConf(String queue, String policyType)
throws YarnException {
SetSubClusterPolicyConfigurationRequest request =
SetSubClusterPolicyConfigurationRequest
.newInstance(createSCPolicyConf(queue, policyType));
stateStore.setPolicyConfiguration(request);
}
public void addPolicyConfigs(int numQueues) throws YarnException {
for (int i = 0; i < numQueues; i++) {
setPolicyConf(Q_PREFIX + i, POLICY_PREFIX + i);
}
}
public SubClusterInfo querySubClusterInfo(SubClusterId subClusterId)
throws YarnException {
GetSubClusterInfoRequest request =
GetSubClusterInfoRequest.newInstance(subClusterId);
return stateStore.getSubCluster(request).getSubClusterInfo();
}
public SubClusterId queryApplicationHomeSC(ApplicationId appId)
throws YarnException {
GetApplicationHomeSubClusterRequest request =
GetApplicationHomeSubClusterRequest.newInstance(appId);
GetApplicationHomeSubClusterResponse response =
stateStore.getApplicationHomeSubCluster(request);
return response.getApplicationHomeSubCluster().getHomeSubCluster();
}
public SubClusterPolicyConfiguration queryPolicyConfiguration(String queue)
throws YarnException {
GetSubClusterPolicyConfigurationRequest request =
GetSubClusterPolicyConfigurationRequest.newInstance(queue);
GetSubClusterPolicyConfigurationResponse result =
stateStore.getPolicyConfiguration(request);
return result.getPolicyConfiguration();
}
}

View File

@ -0,0 +1,148 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.yarn.server.federation.utils;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
import org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
/**
* Unit tests for FederationStateStoreFacade.
*/
@RunWith(Parameterized.class)
public class TestFederationStateStoreFacade {
@Parameters
public static Collection<Boolean[]> getParameters() {
return Arrays
.asList(new Boolean[][] {{Boolean.FALSE }, {Boolean.TRUE } });
}
private final long clusterTs = System.currentTimeMillis();
private final int numSubClusters = 3;
private final int numApps = 5;
private final int numQueues = 2;
private Configuration conf;
private FederationStateStore stateStore;
private FederationStateStoreTestUtil stateStoreTestUtil;
private FederationStateStoreFacade facade =
FederationStateStoreFacade.getInstance();
public TestFederationStateStoreFacade(Boolean isCachingEnabled) {
conf = new Configuration();
if (!(isCachingEnabled.booleanValue())) {
conf.setInt(YarnConfiguration.FEDERATION_CACHE_TIME_TO_LIVE_SECS, 0);
}
}
@Before
public void setUp() throws IOException, YarnException {
stateStore = new MemoryFederationStateStore();
stateStore.init(conf);
facade.reinitialize(stateStore, conf);
// hydrate the store
stateStoreTestUtil = new FederationStateStoreTestUtil(stateStore);
stateStoreTestUtil.registerSubClusters(numSubClusters);
stateStoreTestUtil.addAppsHomeSC(clusterTs, numApps);
stateStoreTestUtil.addPolicyConfigs(numQueues);
}
@After
public void tearDown() throws Exception {
stateStore.close();
stateStore = null;
}
@Test
public void testGetSubCluster() throws YarnException {
for (int i = 0; i < numSubClusters; i++) {
SubClusterId subClusterId =
SubClusterId.newInstance(FederationStateStoreTestUtil.SC_PREFIX + i);
Assert.assertEquals(stateStoreTestUtil.querySubClusterInfo(subClusterId),
facade.getSubCluster(subClusterId));
}
}
@Test
public void testGetSubClusterFlushCache() throws YarnException {
for (int i = 0; i < numSubClusters; i++) {
SubClusterId subClusterId =
SubClusterId.newInstance(FederationStateStoreTestUtil.SC_PREFIX + i);
Assert.assertEquals(stateStoreTestUtil.querySubClusterInfo(subClusterId),
facade.getSubCluster(subClusterId, true));
}
}
@Test
public void testGetSubClusters() throws YarnException {
Map<SubClusterId, SubClusterInfo> subClusters =
facade.getSubClusters(false);
for (SubClusterId subClusterId : subClusters.keySet()) {
Assert.assertEquals(stateStoreTestUtil.querySubClusterInfo(subClusterId),
subClusters.get(subClusterId));
}
}
@Test
public void testGetPolicyConfiguration() throws YarnException {
for (int i = 0; i < numQueues; i++) {
String queue = FederationStateStoreTestUtil.Q_PREFIX + i;
Assert.assertEquals(stateStoreTestUtil.queryPolicyConfiguration(queue),
facade.getPolicyConfiguration(queue));
}
}
@Test
public void testGetPoliciesConfigurations() throws YarnException {
Map<String, SubClusterPolicyConfiguration> queuePolicies =
facade.getPoliciesConfigurations();
for (String queue : queuePolicies.keySet()) {
Assert.assertEquals(stateStoreTestUtil.queryPolicyConfiguration(queue),
queuePolicies.get(queue));
}
}
@Test
public void testGetHomeSubClusterForApp() throws YarnException {
for (int i = 0; i < numApps; i++) {
ApplicationId appId = ApplicationId.newInstance(clusterTs, i);
Assert.assertEquals(stateStoreTestUtil.queryApplicationHomeSC(appId),
facade.getApplicationHomeSubCluster(appId));
}
}
}