YARN-11469. Refactor FederationStateStoreFacade Cache Code. (#5570)

Co-authored-by: slfan1989 <louj1988@@>
This commit is contained in:
slfan1989 2023-04-29 05:11:13 +08:00 committed by GitHub
parent 0e63152218
commit 5ed7e912dc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 710 additions and 292 deletions

View File

@ -0,0 +1,484 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.federation.cache;
import org.apache.commons.lang3.builder.EqualsBuilder;
import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoResponse;
import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPoliciesConfigurationsRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPoliciesConfigurationsResponse;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public abstract class FederationCache {
// ------------------------------------ Constants -------------------------
protected static final String GET_SUBCLUSTERS_CACHEID = "getSubClusters";
protected static final String GET_POLICIES_CONFIGURATIONS_CACHEID =
"getPoliciesConfigurations";
protected static final String GET_APPLICATION_HOME_SUBCLUSTER_CACHEID =
"getApplicationHomeSubCluster";
protected static final String POINT = ".";
private FederationStateStore stateStore;
/**
* Determine whether to enable cache.
* We judge whether to enable the cache according to the cache time.
* If the cache time is greater than 0, the cache is enabled.
* If the cache time is less than or equal 0, the cache is not enabled.
*
* @return true, enable cache; false, not enable cache.
*/
public abstract boolean isCachingEnabled();
/**
* Initialize the cache.
*
* @param pConf Configuration.
* @param pStateStore FederationStateStore.
*/
public abstract void initCache(Configuration pConf, FederationStateStore pStateStore);
/**
* clear cache.
*/
public abstract void clearCache();
/**
* Build CacheKey.
*
* @param className Cache Class Name.
* @param methodName Method Name.
* @return append result.
* Example: className:FederationJCache, methodName:getPoliciesConfigurations.
* We Will Return FederationJCache.getPoliciesConfigurations.
*/
protected String buildCacheKey(String className, String methodName) {
return buildCacheKey(className, methodName, null);
}
/**
* Build CacheKey.
*
* @param className Cache Class Name.
* @param methodName Method Name.
* @param argName ArgName.
* @return append result.
* Example:
* className:FederationJCache, methodName:getApplicationHomeSubCluster, argName: app_1
* We Will Return FederationJCache.getApplicationHomeSubCluster.app_1
*/
protected String buildCacheKey(String className, String methodName, String argName) {
StringBuilder buffer = new StringBuilder();
buffer.append(className).append(POINT).append(methodName);
if (argName != null) {
buffer.append(POINT);
buffer.append(argName);
}
return buffer.toString();
}
/**
* Returns the {@link SubClusterInfo} of all active sub cluster(s).
*
* @param filterInactiveSubClusters whether to filter out inactive
* sub-clusters
* @return the information of all active sub cluster(s)
* @throws YarnException if the call to the state store is unsuccessful
*/
public abstract Map<SubClusterId, SubClusterInfo> getSubClusters(
boolean filterInactiveSubClusters) throws YarnException;
/**
* Get the policies that is represented as
* {@link SubClusterPolicyConfiguration} for all currently active queues in
* the system.
*
* @return the policies for all currently active queues in the system
* @throws YarnException if the call to the state store is unsuccessful
*/
public abstract Map<String, SubClusterPolicyConfiguration> getPoliciesConfigurations()
throws Exception;
/**
* Returns the home {@link SubClusterId} for the specified
* {@link ApplicationId}.
*
* @param appId the identifier of the application
* @return the home sub cluster identifier
* @throws YarnException if the call to the state store is unsuccessful
*/
public abstract SubClusterId getApplicationHomeSubCluster(ApplicationId appId) throws Exception;
/**
* Remove SubCluster from cache.
*
* @param filterInactiveSubClusters whether to filter out inactive
* sub-clusters.
*/
public abstract void removeSubCluster(boolean filterInactiveSubClusters);
// ------------------------------------ SubClustersCache -------------------------
/**
* Build GetSubClusters CacheRequest.
*
* @param cacheKey cacheKey.
* @param filterInactiveSubClusters filter Inactive SubClusters.
* @return CacheRequest.
* @throws YarnException exceptions from yarn servers.
*/
protected CacheRequest<String, CacheResponse<SubClusterInfo>> buildGetSubClustersCacheRequest(
String cacheKey, final boolean filterInactiveSubClusters) throws YarnException {
CacheResponse<SubClusterInfo> response =
buildSubClusterInfoResponse(filterInactiveSubClusters);
CacheRequest<String, CacheResponse<SubClusterInfo>> cacheRequest =
new CacheRequest<>(cacheKey, response);
return cacheRequest;
}
/**
* Build SubClusterInfo Response.
*
* @param filterInactiveSubClusters whether to filter out inactive sub-clusters.
* @return SubClusterInfo Response.
* @throws YarnException exceptions from yarn servers.
*/
private CacheResponse<SubClusterInfo> buildSubClusterInfoResponse(
final boolean filterInactiveSubClusters) throws YarnException {
GetSubClustersInfoRequest request = GetSubClustersInfoRequest.newInstance(
filterInactiveSubClusters);
GetSubClustersInfoResponse subClusters = stateStore.getSubClusters(request);
CacheResponse<SubClusterInfo> response = new SubClusterInfoCacheResponse();
response.setList(subClusters.getSubClusters());
return response;
}
/**
* According to the response, build SubClusterInfoMap.
*
* @param response GetSubClustersInfoResponse.
* @return SubClusterInfoMap.
*/
public static Map<SubClusterId, SubClusterInfo> buildSubClusterInfoMap(
final GetSubClustersInfoResponse response) {
List<SubClusterInfo> subClusters = response.getSubClusters();
return buildSubClusterInfoMap(subClusters);
}
/**
* According to the cacheRequest, build SubClusterInfoMap.
*
* @param cacheRequest CacheRequest.
* @return SubClusterInfoMap.
*/
public static Map<SubClusterId, SubClusterInfo> buildSubClusterInfoMap(
CacheRequest<String, ?> cacheRequest) {
Object value = cacheRequest.value;
SubClusterInfoCacheResponse response = SubClusterInfoCacheResponse.class.cast(value);
List<SubClusterInfo> subClusters = response.getList();
return buildSubClusterInfoMap(subClusters);
}
/**
* According to the subClusters, build SubClusterInfoMap.
*
* @param subClusters subCluster List.
* @return SubClusterInfoMap.
*/
private static Map<SubClusterId, SubClusterInfo> buildSubClusterInfoMap(
List<SubClusterInfo> subClusters) {
Map<SubClusterId, SubClusterInfo> subClustersMap = new HashMap<>(subClusters.size());
for (SubClusterInfo subCluster : subClusters) {
subClustersMap.put(subCluster.getSubClusterId(), subCluster);
}
return subClustersMap;
}
// ------------------------------------ ApplicationHomeSubClusterCache -------------------------
/**
* Build GetApplicationHomeSubCluster CacheRequest.
*
* @param cacheKey cacheKey.
* @param applicationId applicationId.
* @return CacheRequest.
* @throws YarnException exceptions from yarn servers.
*/
protected CacheRequest<String, CacheResponse<SubClusterId>>
buildGetApplicationHomeSubClusterRequest(String cacheKey, ApplicationId applicationId)
throws YarnException {
CacheResponse<SubClusterId> response = buildSubClusterIdResponse(applicationId);
return new CacheRequest<>(cacheKey, response);
}
/**
* Build SubClusterId Response.
*
* @param applicationId applicationId.
* @return subClusterId
* @throws YarnException exceptions from yarn servers.
*/
private CacheResponse<SubClusterId> buildSubClusterIdResponse(final ApplicationId applicationId)
throws YarnException {
GetApplicationHomeSubClusterRequest request =
GetApplicationHomeSubClusterRequest.newInstance(applicationId);
GetApplicationHomeSubClusterResponse response =
stateStore.getApplicationHomeSubCluster(request);
ApplicationHomeSubCluster appHomeSubCluster = response.getApplicationHomeSubCluster();
SubClusterId subClusterId = appHomeSubCluster.getHomeSubCluster();
CacheResponse<SubClusterId> cacheResponse = new ApplicationHomeSubClusterCacheResponse();
cacheResponse.setItem(subClusterId);
return cacheResponse;
}
// ------------------------------ SubClusterPolicyConfigurationCache -------------------------
/**
* Build GetPoliciesConfigurations CacheRequest.
*
* @param cacheKey cacheKey.
* @return CacheRequest.
* @throws YarnException exceptions from yarn servers.
*/
protected CacheRequest<String, CacheResponse<SubClusterPolicyConfiguration>>
buildGetPoliciesConfigurationsCacheRequest(String cacheKey) throws YarnException {
CacheResponse<SubClusterPolicyConfiguration> response =
buildSubClusterPolicyConfigurationResponse();
return new CacheRequest<>(cacheKey, response);
}
/**
* According to the response, build PolicyConfigMap.
*
* @param response GetSubClusterPoliciesConfigurationsResponse.
* @return PolicyConfigMap.
*/
public static Map<String, SubClusterPolicyConfiguration> buildPolicyConfigMap(
GetSubClusterPoliciesConfigurationsResponse response) {
List<SubClusterPolicyConfiguration> policyConfigs = response.getPoliciesConfigs();
return buildPolicyConfigMap(policyConfigs);
}
/**
* According to the subClusters, build PolicyConfigMap.
*
* @param policyConfigs SubClusterPolicyConfigurations
* @return PolicyConfigMap.
*/
private static Map<String, SubClusterPolicyConfiguration> buildPolicyConfigMap(
List<SubClusterPolicyConfiguration> policyConfigs) {
Map<String, SubClusterPolicyConfiguration> queuePolicyConfigs = new HashMap<>();
for (SubClusterPolicyConfiguration policyConfig : policyConfigs) {
queuePolicyConfigs.put(policyConfig.getQueue(), policyConfig);
}
return queuePolicyConfigs;
}
/**
* According to the cacheRequest, build PolicyConfigMap.
*
* @param cacheRequest CacheRequest.
* @return PolicyConfigMap.
*/
public static Map<String, SubClusterPolicyConfiguration> buildPolicyConfigMap(
CacheRequest<String, ?> cacheRequest){
Object value = cacheRequest.value;
SubClusterPolicyConfigurationCacheResponse response =
SubClusterPolicyConfigurationCacheResponse.class.cast(value);
List<SubClusterPolicyConfiguration> subClusters = response.getList();
return buildPolicyConfigMap(subClusters);
}
/**
* Build SubClusterPolicyConfiguration Response.
*
* @return SubClusterPolicyConfiguration Response.
* @throws YarnException exceptions from yarn servers.
*/
private CacheResponse<SubClusterPolicyConfiguration> buildSubClusterPolicyConfigurationResponse()
throws YarnException {
GetSubClusterPoliciesConfigurationsRequest request =
GetSubClusterPoliciesConfigurationsRequest.newInstance();
GetSubClusterPoliciesConfigurationsResponse response =
stateStore.getPoliciesConfigurations(request);
List<SubClusterPolicyConfiguration> policyConfigs = response.getPoliciesConfigs();
CacheResponse<SubClusterPolicyConfiguration> cacheResponse =
new SubClusterPolicyConfigurationCacheResponse();
cacheResponse.setList(policyConfigs);
return cacheResponse;
}
/**
* Internal class that encapsulates the cache key and a function that returns
* the value for the specified key.
*/
public class CacheRequest<K, V> {
private K key;
private V value;
CacheRequest(K pKey, V pValue) {
this.key = pKey;
this.value = pValue;
}
public V getValue() throws Exception {
return value;
}
@Override
public int hashCode() {
return new HashCodeBuilder().append(key).toHashCode();
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null) {
return false;
}
if (obj instanceof CacheRequest) {
Class<CacheRequest> cacheRequestClass = CacheRequest.class;
CacheRequest other = cacheRequestClass.cast(obj);
return new EqualsBuilder().append(key, other.key).isEquals();
}
return false;
}
}
public class CacheResponse<R> {
private List<R> list;
private R item;
public List<R> getList() {
return list;
}
public void setList(List<R> list) {
this.list = list;
}
public R getItem() {
return item;
}
public void setItem(R pItem) {
this.item = pItem;
}
}
public class SubClusterInfoCacheResponse extends CacheResponse<SubClusterInfo> {
@Override
public List<SubClusterInfo> getList() {
return super.getList();
}
@Override
public void setList(List<SubClusterInfo> list) {
super.setList(list);
}
@Override
public SubClusterInfo getItem() {
return super.getItem();
}
@Override
public void setItem(SubClusterInfo item) {
super.setItem(item);
}
}
public class SubClusterPolicyConfigurationCacheResponse
extends CacheResponse<SubClusterPolicyConfiguration> {
@Override
public List<SubClusterPolicyConfiguration> getList() {
return super.getList();
}
@Override
public void setList(List<SubClusterPolicyConfiguration> list) {
super.setList(list);
}
@Override
public SubClusterPolicyConfiguration getItem() {
return super.getItem();
}
@Override
public void setItem(SubClusterPolicyConfiguration item) {
super.setItem(item);
}
}
public class ApplicationHomeSubClusterCacheResponse
extends CacheResponse<SubClusterId> {
@Override
public List<SubClusterId> getList() {
return super.getList();
}
@Override
public void setList(List<SubClusterId> list) {
super.setList(list);
}
@Override
public SubClusterId getItem() {
return super.getItem();
}
@Override
public void setItem(SubClusterId item) {
super.setItem(item);
}
}
public FederationStateStore getStateStore() {
return stateStore;
}
public void setStateStore(FederationStateStore stateStore) {
this.stateStore = stateStore;
}
}

View File

@ -0,0 +1,155 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.yarn.server.federation.cache;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.cache.Cache;
import javax.cache.CacheManager;
import javax.cache.Caching;
import javax.cache.configuration.FactoryBuilder;
import javax.cache.configuration.MutableConfiguration;
import javax.cache.expiry.CreatedExpiryPolicy;
import javax.cache.expiry.Duration;
import javax.cache.expiry.ExpiryPolicy;
import javax.cache.spi.CachingProvider;
import java.util.Map;
import java.util.concurrent.TimeUnit;
public class FederationJCache extends FederationCache {
private static final Logger LOG = LoggerFactory.getLogger(FederationJCache.class);
private Cache<String, CacheRequest<String, ?>> cache;
private int cacheTimeToLive;
private boolean isCachingEnabled = false;
private String className = this.getClass().getSimpleName();
@Override
public boolean isCachingEnabled() {
return isCachingEnabled;
}
@Override
public void initCache(Configuration pConf, FederationStateStore pStateStore) {
// Picking the JCache provider from classpath, need to make sure there's
// no conflict or pick up a specific one in the future
cacheTimeToLive = pConf.getInt(YarnConfiguration.FEDERATION_CACHE_TIME_TO_LIVE_SECS,
YarnConfiguration.DEFAULT_FEDERATION_CACHE_TIME_TO_LIVE_SECS);
if (cacheTimeToLive <= 0) {
isCachingEnabled = false;
return;
}
this.setStateStore(pStateStore);
CachingProvider jcacheProvider = Caching.getCachingProvider();
CacheManager jcacheManager = jcacheProvider.getCacheManager();
this.cache = jcacheManager.getCache(className);
if (this.cache == null) {
LOG.info("Creating a JCache Manager with name {}.", className);
Duration cacheExpiry = new Duration(TimeUnit.SECONDS, cacheTimeToLive);
FactoryBuilder.SingletonFactory<ExpiryPolicy> expiryPolicySingletonFactory =
new FactoryBuilder.SingletonFactory<>(new CreatedExpiryPolicy(cacheExpiry));
MutableConfiguration<String, CacheRequest<String, ?>> configuration =
new MutableConfiguration<>();
configuration.setStoreByValue(false);
configuration.setExpiryPolicyFactory(expiryPolicySingletonFactory);
this.cache = jcacheManager.createCache(className, configuration);
}
isCachingEnabled = true;
}
@Override
public void clearCache() {
CachingProvider jcacheProvider = Caching.getCachingProvider();
CacheManager jcacheManager = jcacheProvider.getCacheManager();
jcacheManager.destroyCache(className);
this.cache = null;
}
@Override
public Map<SubClusterId, SubClusterInfo> getSubClusters(boolean filterInactiveSubClusters)
throws YarnException {
final String cacheKey = buildCacheKey(className, GET_SUBCLUSTERS_CACHEID,
Boolean.toString(filterInactiveSubClusters));
CacheRequest<String, ?> cacheRequest = cache.get(cacheKey);
if (cacheRequest == null) {
cacheRequest = buildGetSubClustersCacheRequest(className, filterInactiveSubClusters);
cache.put(cacheKey, cacheRequest);
}
return buildSubClusterInfoMap(cacheRequest);
}
@Override
public Map<String, SubClusterPolicyConfiguration> getPoliciesConfigurations()
throws Exception {
final String cacheKey = buildCacheKey(className, GET_POLICIES_CONFIGURATIONS_CACHEID);
CacheRequest<String, ?> cacheRequest = cache.get(cacheKey);
if(cacheRequest == null){
cacheRequest = buildGetPoliciesConfigurationsCacheRequest(className);
cache.put(cacheKey, cacheRequest);
}
return buildPolicyConfigMap(cacheRequest);
}
@Override
public SubClusterId getApplicationHomeSubCluster(ApplicationId appId)
throws Exception {
final String cacheKey = buildCacheKey(className, GET_APPLICATION_HOME_SUBCLUSTER_CACHEID,
appId.toString());
CacheRequest<String, ?> cacheRequest = cache.get(cacheKey);
if (cacheRequest == null) {
cacheRequest = buildGetApplicationHomeSubClusterRequest(className, appId);
cache.put(cacheKey, cacheRequest);
}
CacheResponse<SubClusterId> response =
ApplicationHomeSubClusterCacheResponse.class.cast(cacheRequest.getValue());
return response.getItem();
}
@Override
public void removeSubCluster(boolean flushCache) {
final String cacheKey = buildCacheKey(className, GET_SUBCLUSTERS_CACHEID,
Boolean.toString(flushCache));
cache.remove(cacheKey);
}
@VisibleForTesting
public Cache<String, CacheRequest<String, ?>> getCache() {
return cache;
}
@VisibleForTesting
public String getAppHomeSubClusterCacheKey(ApplicationId appId)
throws YarnException {
return buildCacheKey(className, GET_APPLICATION_HOME_SUBCLUSTER_CACHEID,
appId.toString());
}
}

View File

@ -0,0 +1,17 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.yarn.server.federation.cache;

View File

@ -28,22 +28,10 @@ import java.util.concurrent.TimeUnit;
import java.util.Random;
import java.util.Collection;
import javax.cache.Cache;
import javax.cache.CacheManager;
import javax.cache.Caching;
import javax.cache.configuration.CompleteConfiguration;
import javax.cache.configuration.FactoryBuilder;
import javax.cache.configuration.MutableConfiguration;
import javax.cache.expiry.CreatedExpiryPolicy;
import javax.cache.expiry.Duration;
import javax.cache.expiry.ExpiryPolicy;
import javax.cache.integration.CacheLoader;
import javax.cache.integration.CacheLoaderException;
import javax.cache.spi.CachingProvider;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryPolicy;
@ -55,6 +43,8 @@ import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.federation.cache.FederationCache;
import org.apache.hadoop.yarn.server.federation.cache.FederationJCache;
import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils;
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyException;
import org.apache.hadoop.yarn.server.federation.resolver.SubClusterResolver;
@ -72,11 +62,9 @@ import org.apache.hadoop.yarn.server.federation.store.records.GetReservationHome
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoResponse;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPoliciesConfigurationsRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPoliciesConfigurationsResponse;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationResponse;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoResponse;
import org.apache.hadoop.yarn.server.federation.store.records.ReservationHomeSubCluster;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
@ -101,6 +89,9 @@ import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.VisibleForTesting;
import com.zaxxer.hikari.pool.HikariPool.PoolInitializationException;
import static org.apache.hadoop.yarn.server.federation.cache.FederationCache.buildPolicyConfigMap;
import static org.apache.hadoop.yarn.server.federation.cache.FederationCache.buildSubClusterInfoMap;
/**
*
* The FederationStateStoreFacade is an utility wrapper that provides singleton
@ -112,22 +103,15 @@ public final class FederationStateStoreFacade {
private static final Logger LOG =
LoggerFactory.getLogger(FederationStateStoreFacade.class);
private static final String GET_SUBCLUSTERS_CACHEID = "getSubClusters";
private static final String GET_POLICIES_CONFIGURATIONS_CACHEID =
"getPoliciesConfigurations";
private static final String GET_APPLICATION_HOME_SUBCLUSTER_CACHEID =
"getApplicationHomeSubCluster";
private static final FederationStateStoreFacade FACADE =
new FederationStateStoreFacade();
private static Random rand = new Random(System.currentTimeMillis());
private FederationStateStore stateStore;
private int cacheTimeToLive;
private Configuration conf;
private Cache<Object, Object> cache;
private SubClusterResolver subclusterResolver;
private FederationCache federationCache;
private FederationStateStoreFacade() {
initializeFacadeInternal(new Configuration());
@ -148,11 +132,11 @@ public final class FederationStateStoreFacade {
SubClusterResolver.class);
this.subclusterResolver.load();
initCache();
federationCache = new FederationJCache();
federationCache.initCache(config, stateStore);
} catch (YarnException ex) {
LOG.error("Failed to initialize the FederationStateStoreFacade object",
ex);
LOG.error("Failed to initialize the FederationStateStoreFacade object", ex);
throw new RuntimeException(ex);
}
}
@ -169,8 +153,8 @@ public final class FederationStateStoreFacade {
Configuration config) {
this.conf = config;
this.stateStore = store;
clearCache();
initCache();
federationCache.clearCache();
federationCache.initCache(config, stateStore);
}
/**
@ -191,8 +175,7 @@ public final class FederationStateStoreFacade {
conf.getLong(YarnConfiguration.CLIENT_FAILOVER_SLEEPTIME_BASE_MS,
YarnConfiguration.DEFAULT_RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS),
TimeUnit.MILLISECONDS);
Map<Class<? extends Exception>, RetryPolicy> exceptionToPolicyMap =
new HashMap<Class<? extends Exception>, RetryPolicy>();
Map<Class<? extends Exception>, RetryPolicy> exceptionToPolicyMap = new HashMap<>();
exceptionToPolicyMap.put(FederationStateStoreRetriableException.class,
basePolicy);
exceptionToPolicyMap.put(CacheLoaderException.class, basePolicy);
@ -203,47 +186,6 @@ public final class FederationStateStoreFacade {
return retryPolicy;
}
private boolean isCachingEnabled() {
return (cacheTimeToLive > 0);
}
private void initCache() {
// Picking the JCache provider from classpath, need to make sure there's
// no conflict or pick up a specific one in the future
cacheTimeToLive =
conf.getInt(YarnConfiguration.FEDERATION_CACHE_TIME_TO_LIVE_SECS,
YarnConfiguration.DEFAULT_FEDERATION_CACHE_TIME_TO_LIVE_SECS);
if (isCachingEnabled()) {
CachingProvider jcacheProvider = Caching.getCachingProvider();
CacheManager jcacheManager = jcacheProvider.getCacheManager();
this.cache = jcacheManager.getCache(this.getClass().getSimpleName());
if (this.cache == null) {
LOG.info("Creating a JCache Manager with name "
+ this.getClass().getSimpleName());
Duration cacheExpiry = new Duration(TimeUnit.SECONDS, cacheTimeToLive);
CompleteConfiguration<Object, Object> configuration =
new MutableConfiguration<Object, Object>().setStoreByValue(false)
.setReadThrough(true)
.setExpiryPolicyFactory(
new FactoryBuilder.SingletonFactory<ExpiryPolicy>(
new CreatedExpiryPolicy(cacheExpiry)))
.setCacheLoaderFactory(
new FactoryBuilder.SingletonFactory<CacheLoader<Object, Object>>(
new CacheLoaderImpl<Object, Object>()));
this.cache = jcacheManager.createCache(this.getClass().getSimpleName(),
configuration);
}
}
}
private void clearCache() {
CachingProvider jcacheProvider = Caching.getCachingProvider();
CacheManager jcacheManager = jcacheProvider.getCacheManager();
jcacheManager.destroyCache(this.getClass().getSimpleName());
this.cache = null;
}
/**
* Returns the singleton instance of the FederationStateStoreFacade object.
*
@ -263,7 +205,7 @@ public final class FederationStateStoreFacade {
*/
public SubClusterInfo getSubCluster(final SubClusterId subClusterId)
throws YarnException {
if (isCachingEnabled()) {
if (federationCache.isCachingEnabled()) {
return getSubClusters(false).get(subClusterId);
} else {
GetSubClusterInfoResponse response = stateStore
@ -287,10 +229,10 @@ public final class FederationStateStoreFacade {
*/
public SubClusterInfo getSubCluster(final SubClusterId subClusterId,
final boolean flushCache) throws YarnException {
if (flushCache && isCachingEnabled()) {
if (flushCache && federationCache.isCachingEnabled()) {
LOG.info("Flushing subClusters from cache and rehydrating from store,"
+ " most likely on account of RM failover.");
cache.remove(buildGetSubClustersCacheRequest(false));
federationCache.removeSubCluster(false);
}
return getSubCluster(subClusterId);
}
@ -303,16 +245,15 @@ public final class FederationStateStoreFacade {
* @return the information of all active sub cluster(s)
* @throws YarnException if the call to the state store is unsuccessful
*/
@SuppressWarnings("unchecked")
public Map<SubClusterId, SubClusterInfo> getSubClusters(
final boolean filterInactiveSubClusters) throws YarnException {
public Map<SubClusterId, SubClusterInfo> getSubClusters(final boolean filterInactiveSubClusters)
throws YarnException {
try {
if (isCachingEnabled()) {
return (Map<SubClusterId, SubClusterInfo>) cache
.get(buildGetSubClustersCacheRequest(filterInactiveSubClusters));
if (federationCache.isCachingEnabled()) {
return federationCache.getSubClusters(filterInactiveSubClusters);
} else {
return buildSubClusterInfoMap(stateStore.getSubClusters(
GetSubClustersInfoRequest.newInstance(filterInactiveSubClusters)));
GetSubClustersInfoRequest request =
GetSubClustersInfoRequest.newInstance(filterInactiveSubClusters);
return buildSubClusterInfoMap(stateStore.getSubClusters(request));
}
} catch (Throwable ex) {
throw new YarnException(ex);
@ -327,15 +268,15 @@ public final class FederationStateStoreFacade {
* mapping for the queue
* @throws YarnException if the call to the state store is unsuccessful
*/
public SubClusterPolicyConfiguration getPolicyConfiguration(
final String queue) throws YarnException {
if (isCachingEnabled()) {
public SubClusterPolicyConfiguration getPolicyConfiguration(final String queue)
throws YarnException {
if (federationCache.isCachingEnabled()) {
return getPoliciesConfigurations().get(queue);
} else {
GetSubClusterPolicyConfigurationRequest request =
GetSubClusterPolicyConfigurationRequest.newInstance(queue);
GetSubClusterPolicyConfigurationResponse response =
stateStore.getPolicyConfiguration(
GetSubClusterPolicyConfigurationRequest.newInstance(queue));
stateStore.getPolicyConfiguration(request);
if (response == null) {
return null;
} else {
@ -352,16 +293,15 @@ public final class FederationStateStoreFacade {
* @return the policies for all currently active queues in the system
* @throws YarnException if the call to the state store is unsuccessful
*/
@SuppressWarnings("unchecked")
public Map<String, SubClusterPolicyConfiguration> getPoliciesConfigurations()
throws YarnException {
try {
if (isCachingEnabled()) {
return (Map<String, SubClusterPolicyConfiguration>) cache
.get(buildGetPoliciesConfigurationsCacheRequest());
if (federationCache.isCachingEnabled()) {
return federationCache.getPoliciesConfigurations();
} else {
return buildPolicyConfigMap(stateStore.getPoliciesConfigurations(
GetSubClusterPoliciesConfigurationsRequest.newInstance()));
GetSubClusterPoliciesConfigurationsRequest request =
GetSubClusterPoliciesConfigurationsRequest.newInstance();
return buildPolicyConfigMap(stateStore.getPoliciesConfigurations(request));
}
} catch (Throwable ex) {
throw new YarnException(ex);
@ -396,7 +336,6 @@ public final class FederationStateStoreFacade {
ApplicationHomeSubCluster appHomeSubCluster) throws YarnException {
stateStore.updateApplicationHomeSubCluster(
UpdateApplicationHomeSubClusterRequest.newInstance(appHomeSubCluster));
return;
}
/**
@ -410,10 +349,8 @@ public final class FederationStateStoreFacade {
public SubClusterId getApplicationHomeSubCluster(ApplicationId appId)
throws YarnException {
try {
if (isCachingEnabled()) {
SubClusterId value = SubClusterId.class.cast(
cache.get(buildGetApplicationHomeSubClusterRequest(appId)));
return value;
if (federationCache.isCachingEnabled()) {
return federationCache.getApplicationHomeSubCluster(appId);
} else {
GetApplicationHomeSubClusterResponse response = stateStore.getApplicationHomeSubCluster(
GetApplicationHomeSubClusterRequest.newInstance(appId));
@ -550,196 +487,6 @@ public final class FederationStateStoreFacade {
}
}
private Map<SubClusterId, SubClusterInfo> buildSubClusterInfoMap(
final GetSubClustersInfoResponse response) {
List<SubClusterInfo> subClusters = response.getSubClusters();
Map<SubClusterId, SubClusterInfo> subClustersMap =
new HashMap<>(subClusters.size());
for (SubClusterInfo subCluster : subClusters) {
subClustersMap.put(subCluster.getSubClusterId(), subCluster);
}
return subClustersMap;
}
private Object buildGetSubClustersCacheRequest(
final boolean filterInactiveSubClusters) {
final String cacheKey =
buildCacheKey(getClass().getSimpleName(), GET_SUBCLUSTERS_CACHEID,
Boolean.toString(filterInactiveSubClusters));
CacheRequest<String, Map<SubClusterId, SubClusterInfo>> cacheRequest =
new CacheRequest<String, Map<SubClusterId, SubClusterInfo>>(cacheKey,
new Func<String, Map<SubClusterId, SubClusterInfo>>() {
@Override
public Map<SubClusterId, SubClusterInfo> invoke(String key)
throws Exception {
GetSubClustersInfoResponse subClusters =
stateStore.getSubClusters(GetSubClustersInfoRequest
.newInstance(filterInactiveSubClusters));
return buildSubClusterInfoMap(subClusters);
}
});
return cacheRequest;
}
private Map<String, SubClusterPolicyConfiguration> buildPolicyConfigMap(
GetSubClusterPoliciesConfigurationsResponse response) {
List<SubClusterPolicyConfiguration> policyConfigs =
response.getPoliciesConfigs();
Map<String, SubClusterPolicyConfiguration> queuePolicyConfigs =
new HashMap<>();
for (SubClusterPolicyConfiguration policyConfig : policyConfigs) {
queuePolicyConfigs.put(policyConfig.getQueue(), policyConfig);
}
return queuePolicyConfigs;
}
private Object buildGetPoliciesConfigurationsCacheRequest() {
final String cacheKey = buildCacheKey(getClass().getSimpleName(),
GET_POLICIES_CONFIGURATIONS_CACHEID, null);
CacheRequest<String, Map<String, SubClusterPolicyConfiguration>> cacheRequest =
new CacheRequest<String, Map<String, SubClusterPolicyConfiguration>>(
cacheKey,
new Func<String, Map<String, SubClusterPolicyConfiguration>>() {
@Override
public Map<String, SubClusterPolicyConfiguration> invoke(
String key) throws Exception {
GetSubClusterPoliciesConfigurationsResponse policyConfigs =
stateStore.getPoliciesConfigurations(
GetSubClusterPoliciesConfigurationsRequest
.newInstance());
return buildPolicyConfigMap(policyConfigs);
}
});
return cacheRequest;
}
private Object buildGetApplicationHomeSubClusterRequest(ApplicationId applicationId) {
final String cacheKey = buildCacheKey(getClass().getSimpleName(),
GET_APPLICATION_HOME_SUBCLUSTER_CACHEID, applicationId.toString());
CacheRequest<String, SubClusterId> cacheRequest = new CacheRequest<>(
cacheKey,
input -> {
GetApplicationHomeSubClusterRequest request =
GetApplicationHomeSubClusterRequest.newInstance(applicationId);
GetApplicationHomeSubClusterResponse response =
stateStore.getApplicationHomeSubCluster(request);
ApplicationHomeSubCluster appHomeSubCluster = response.getApplicationHomeSubCluster();
SubClusterId subClusterId = appHomeSubCluster.getHomeSubCluster();
return subClusterId;
});
return cacheRequest;
}
protected String buildCacheKey(String typeName, String methodName,
String argName) {
StringBuilder buffer = new StringBuilder();
buffer.append(typeName).append(".")
.append(methodName);
if (argName != null) {
buffer.append("::");
buffer.append(argName);
}
return buffer.toString();
}
/**
* Internal class that implements the CacheLoader interface that can be
* plugged into the CacheManager to load objects into the cache for specified
* keys.
*/
private static class CacheLoaderImpl<K, V> implements CacheLoader<K, V> {
@SuppressWarnings("unchecked")
@Override
public V load(K key) throws CacheLoaderException {
try {
CacheRequest<K, V> query = (CacheRequest<K, V>) key;
assert query != null;
return query.getValue();
} catch (Throwable ex) {
throw new CacheLoaderException(ex);
}
}
@Override
public Map<K, V> loadAll(Iterable<? extends K> keys)
throws CacheLoaderException {
// The FACADE does not use the Cache's getAll API. Hence this is not
// required to be implemented
throw new NotImplementedException("Code is not implemented");
}
}
/**
* Internal class that encapsulates the cache key and a function that returns
* the value for the specified key.
*/
private static class CacheRequest<K, V> {
private K key;
private Func<K, V> func;
CacheRequest(K key, Func<K, V> func) {
this.key = key;
this.func = func;
}
public V getValue() throws Exception {
return func.invoke(key);
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + ((key == null) ? 0 : key.hashCode());
return result;
}
@SuppressWarnings("unchecked")
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null) {
return false;
}
if (getClass() != obj.getClass()) {
return false;
}
CacheRequest<K, V> other = (CacheRequest<K, V>) obj;
if (key == null) {
if (other.key != null) {
return false;
}
} else if (!key.equals(other.key)) {
return false;
}
return true;
}
}
/**
* Encapsulates a method that has one parameter and returns a value of the
* type specified by the TResult parameter.
*/
protected interface Func<T, TResult> {
TResult invoke(T input) throws Exception;
}
@VisibleForTesting
public Cache<Object, Object> getCache() {
return cache;
}
@VisibleForTesting
protected Object getAppHomeSubClusterCacheRequest(ApplicationId applicationId) {
return buildGetApplicationHomeSubClusterRequest(applicationId);
}
@VisibleForTesting
public FederationStateStore getStateStore() {
return stateStore;
@ -1232,4 +979,9 @@ public final class FederationStateStoreFacade {
return null;
}
}
@VisibleForTesting
public FederationCache getFederationCache() {
return federationCache;
}
}

View File

@ -35,6 +35,10 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.security.client.YARNDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.federation.cache.FederationCache;
import org.apache.hadoop.yarn.server.federation.cache.FederationCache.ApplicationHomeSubClusterCacheResponse;
import org.apache.hadoop.yarn.server.federation.cache.FederationCache.CacheRequest;
import org.apache.hadoop.yarn.server.federation.cache.FederationJCache;
import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
import org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore;
import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster;
@ -223,7 +227,7 @@ public class TestFederationStateStoreFacade {
}
@Test
public void testGetApplicationHomeSubClusterCache() throws YarnException {
public void testGetApplicationHomeSubClusterCache() throws Exception {
ApplicationId appId = ApplicationId.newInstance(clusterTs, numApps + 1);
SubClusterId subClusterId1 = SubClusterId.newInstance("Home1");
@ -236,9 +240,15 @@ public class TestFederationStateStoreFacade {
Assert.assertEquals(subClusterId1, subClusterIdAdd);
if (isCachingEnabled.booleanValue()) {
Cache<Object, Object> cache = facade.getCache();
Object cacheKey = facade.getAppHomeSubClusterCacheRequest(appId);
Object subClusterIdByCache = cache.get(cacheKey);
FederationCache fedCache = facade.getFederationCache();
assert fedCache instanceof FederationJCache;
FederationJCache jCache = (FederationJCache) fedCache;
String cacheKey = jCache.getAppHomeSubClusterCacheKey(appId);
Cache<String, CacheRequest<String, ?>> cache = jCache.getCache();
CacheRequest<String, ?> cacheRequest = cache.get(cacheKey);
ApplicationHomeSubClusterCacheResponse response =
ApplicationHomeSubClusterCacheResponse.class.cast(cacheRequest.getValue());
SubClusterId subClusterIdByCache = response.getItem();
Assert.assertEquals(subClusterIdByFacade, subClusterIdByCache);
Assert.assertEquals(subClusterId1, subClusterIdByCache);
}