YARN-8482. [Router] Add cache for fast answers to getApps. (#4769)
This commit is contained in:
parent
4031b0774e
commit
0075ef15c2
|
@ -4128,6 +4128,15 @@ public class YarnConfiguration extends Configuration {
|
|||
public static final String ROUTER_KERBEROS_PRINCIPAL_HOSTNAME_KEY = ROUTER_PREFIX +
|
||||
"kerberos.principal.hostname";
|
||||
|
||||
/** Router enable AppsInfo Cache. **/
|
||||
public static final String ROUTER_APPSINFO_ENABLED = ROUTER_WEBAPP_PREFIX + "appsinfo-enabled";
|
||||
public static final boolean DEFAULT_ROUTER_APPSINFO_ENABLED = false;
|
||||
|
||||
/** Router AppsInfo Cache Count. **/
|
||||
public static final String ROUTER_APPSINFO_CACHED_COUNT =
|
||||
ROUTER_WEBAPP_PREFIX + "appsinfo-cached-count";
|
||||
public static final int DEFAULT_ROUTER_APPSINFO_CACHED_COUNT = 100;
|
||||
|
||||
////////////////////////////////
|
||||
// CSI Volume configs
|
||||
////////////////////////////////
|
||||
|
|
|
@ -4943,4 +4943,24 @@
|
|||
</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>yarn.router.webapp.appsinfo-enabled</name>
|
||||
<value>false</value>
|
||||
<description>
|
||||
This configuration is used to enable the cache of AppsInfo.
|
||||
If it is set to true, the cache is enabled.
|
||||
If it is set to false, the cache is not enabled.
|
||||
</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>yarn.router.webapp.appsinfo-cached-count</name>
|
||||
<value>100</value>
|
||||
<description>
|
||||
When yarn.router.appsinfo-enabled is set to true,
|
||||
the number of cached appsInfo.
|
||||
Default is 100
|
||||
</description>
|
||||
</property>
|
||||
|
||||
</configuration>
|
||||
|
|
|
@ -100,9 +100,11 @@ import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeLabelInfo;
|
|||
import org.apache.hadoop.yarn.server.router.RouterMetrics;
|
||||
import org.apache.hadoop.yarn.server.router.RouterServerUtil;
|
||||
import org.apache.hadoop.yarn.server.router.clientrm.ClientMethod;
|
||||
import org.apache.hadoop.yarn.server.router.webapp.cache.RouterAppInfoCacheKey;
|
||||
import org.apache.hadoop.yarn.server.webapp.dao.AppAttemptInfo;
|
||||
import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo;
|
||||
import org.apache.hadoop.yarn.server.webapp.dao.ContainersInfo;
|
||||
import org.apache.hadoop.yarn.util.LRUCacheHashMap;
|
||||
import org.apache.hadoop.yarn.webapp.dao.SchedConfUpdateInfo;
|
||||
import org.apache.hadoop.yarn.util.Clock;
|
||||
import org.apache.hadoop.yarn.util.MonotonicClock;
|
||||
|
@ -132,8 +134,11 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
|
|||
private RouterMetrics routerMetrics;
|
||||
private final Clock clock = new MonotonicClock();
|
||||
private boolean returnPartialReport;
|
||||
private boolean appInfosCacheEnabled;
|
||||
private int appInfosCacheCount;
|
||||
|
||||
private Map<SubClusterId, DefaultRequestInterceptorREST> interceptors;
|
||||
private LRUCacheHashMap<RouterAppInfoCacheKey, AppsInfo> appInfosCaches;
|
||||
|
||||
/**
|
||||
* Thread pool used for asynchronous operations.
|
||||
|
@ -170,6 +175,17 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
|
|||
returnPartialReport = conf.getBoolean(
|
||||
YarnConfiguration.ROUTER_WEBAPP_PARTIAL_RESULTS_ENABLED,
|
||||
YarnConfiguration.DEFAULT_ROUTER_WEBAPP_PARTIAL_RESULTS_ENABLED);
|
||||
|
||||
appInfosCacheEnabled = conf.getBoolean(
|
||||
YarnConfiguration.ROUTER_APPSINFO_ENABLED,
|
||||
YarnConfiguration.DEFAULT_ROUTER_APPSINFO_ENABLED);
|
||||
|
||||
if(appInfosCacheEnabled) {
|
||||
appInfosCacheCount = conf.getInt(
|
||||
YarnConfiguration.ROUTER_APPSINFO_CACHED_COUNT,
|
||||
YarnConfiguration.DEFAULT_ROUTER_APPSINFO_CACHED_COUNT);
|
||||
appInfosCaches = new LRUCacheHashMap<>(appInfosCacheCount, true);
|
||||
}
|
||||
}
|
||||
|
||||
private SubClusterId getRandomActiveSubCluster(
|
||||
|
@ -681,6 +697,18 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
|
|||
String queueQuery, String count, String startedBegin, String startedEnd,
|
||||
String finishBegin, String finishEnd, Set<String> applicationTypes,
|
||||
Set<String> applicationTags, String name, Set<String> unselectedFields) {
|
||||
|
||||
RouterAppInfoCacheKey routerAppInfoCacheKey = RouterAppInfoCacheKey.newInstance(
|
||||
hsr, stateQuery, statesQuery, finalStatusQuery, userQuery, queueQuery, count,
|
||||
startedBegin, startedEnd, finishBegin, finishEnd, applicationTypes,
|
||||
applicationTags, name, unselectedFields);
|
||||
|
||||
if (appInfosCacheEnabled && routerAppInfoCacheKey != null) {
|
||||
if (appInfosCaches.containsKey(routerAppInfoCacheKey)) {
|
||||
return appInfosCaches.get(routerAppInfoCacheKey);
|
||||
}
|
||||
}
|
||||
|
||||
AppsInfo apps = new AppsInfo();
|
||||
long startTime = clock.getTime();
|
||||
|
||||
|
@ -744,8 +772,14 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
|
|||
}
|
||||
|
||||
// Merge all the application reports got from all the available YARN RMs
|
||||
return RouterWebServiceUtil.mergeAppsInfo(
|
||||
AppsInfo resultAppsInfo = RouterWebServiceUtil.mergeAppsInfo(
|
||||
apps.getApps(), returnPartialReport);
|
||||
|
||||
if (appInfosCacheEnabled && routerAppInfoCacheKey != null) {
|
||||
appInfosCaches.put(routerAppInfoCacheKey, resultAppsInfo);
|
||||
}
|
||||
|
||||
return resultAppsInfo;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -1773,4 +1807,9 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
|
|||
}
|
||||
throw new YarnException("Unable to get subCluster by applicationId = " + appId);
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public LRUCacheHashMap<RouterAppInfoCacheKey, AppsInfo> getAppInfosCaches() {
|
||||
return appInfosCaches;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,156 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.yarn.server.router.webapp.cache;
|
||||
|
||||
import org.apache.commons.lang3.builder.EqualsBuilder;
|
||||
import org.apache.commons.lang3.builder.HashCodeBuilder;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebAppUtil;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import javax.servlet.http.HttpServletRequest;
|
||||
import java.util.Set;
|
||||
|
||||
public class RouterAppInfoCacheKey {
|
||||
|
||||
private static String user = "YarnRouter";
|
||||
|
||||
private static final Logger LOG =
|
||||
LoggerFactory.getLogger(RouterAppInfoCacheKey.class.getName());
|
||||
|
||||
private UserGroupInformation ugi;
|
||||
private String stateQuery;
|
||||
private Set<String> statesQuery;
|
||||
private String finalStatusQuery;
|
||||
private String userQuery;
|
||||
private String queueQuery;
|
||||
private String count;
|
||||
private String startedBegin;
|
||||
private String startedEnd;
|
||||
private String finishBegin;
|
||||
private String finishEnd;
|
||||
private Set<String> applicationTypes;
|
||||
private Set<String> applicationTags;
|
||||
private String name;
|
||||
private Set<String> unselectedFields;
|
||||
|
||||
public RouterAppInfoCacheKey() {
|
||||
|
||||
}
|
||||
|
||||
@SuppressWarnings("checkstyle:ParameterNumber")
|
||||
public RouterAppInfoCacheKey(UserGroupInformation ugi, String stateQuery,
|
||||
Set<String> statesQuery, String finalStatusQuery, String userQuery,
|
||||
String queueQuery, String count, String startedBegin, String startedEnd,
|
||||
String finishBegin, String finishEnd, Set<String> applicationTypes,
|
||||
Set<String> applicationTags, String name, Set<String> unselectedFields) {
|
||||
this.ugi = ugi;
|
||||
this.stateQuery = stateQuery;
|
||||
this.statesQuery = statesQuery;
|
||||
this.finalStatusQuery = finalStatusQuery;
|
||||
this.userQuery = userQuery;
|
||||
this.queueQuery = queueQuery;
|
||||
this.count = count;
|
||||
this.startedBegin = startedBegin;
|
||||
this.startedEnd = startedEnd;
|
||||
this.finishBegin = finishBegin;
|
||||
this.finishEnd = finishEnd;
|
||||
this.applicationTypes = applicationTypes;
|
||||
this.applicationTags = applicationTags;
|
||||
this.name = name;
|
||||
this.unselectedFields = unselectedFields;
|
||||
}
|
||||
|
||||
|
||||
@SuppressWarnings("checkstyle:ParameterNumber")
|
||||
public static RouterAppInfoCacheKey newInstance(HttpServletRequest hsr, String stateQuery,
|
||||
Set<String> statesQuery, String finalStatusQuery, String userQuery,
|
||||
String queueQuery, String count, String startedBegin, String startedEnd,
|
||||
String finishBegin, String finishEnd, Set<String> applicationTypes,
|
||||
Set<String> applicationTags, String name, Set<String> unselectedFields) {
|
||||
|
||||
UserGroupInformation callerUGI = null;
|
||||
if (hsr != null) {
|
||||
callerUGI = RMWebAppUtil.getCallerUserGroupInformation(hsr, true);
|
||||
} else {
|
||||
// user not required
|
||||
callerUGI = UserGroupInformation.createRemoteUser("YarnRouter");
|
||||
}
|
||||
|
||||
if (callerUGI == null) {
|
||||
LOG.error("Unable to obtain user name, user not authenticated.");
|
||||
return null;
|
||||
}
|
||||
|
||||
return new RouterAppInfoCacheKey(
|
||||
callerUGI, stateQuery, statesQuery, finalStatusQuery, userQuery,
|
||||
queueQuery, count, startedBegin, startedEnd, finishBegin, finishEnd,
|
||||
applicationTypes, applicationTags, name, unselectedFields);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (this == o) {
|
||||
return true;
|
||||
}
|
||||
if (o == null || getClass() != o.getClass()) {
|
||||
return false;
|
||||
}
|
||||
RouterAppInfoCacheKey that = (RouterAppInfoCacheKey) o;
|
||||
|
||||
return new EqualsBuilder()
|
||||
.append(this.ugi.getUserName(), that.ugi.getUserName())
|
||||
.append(this.stateQuery, that.stateQuery)
|
||||
.append(this.statesQuery, that.statesQuery)
|
||||
.append(this.finalStatusQuery, that.finalStatusQuery)
|
||||
.append(this.userQuery, that.userQuery)
|
||||
.append(this.queueQuery, that.queueQuery)
|
||||
.append(this.count, that.count)
|
||||
.append(this.startedBegin, that.startedBegin)
|
||||
.append(this.startedEnd, that.startedEnd)
|
||||
.append(this.finishBegin, that.finishBegin)
|
||||
.append(this.finishEnd, that.finishEnd)
|
||||
.append(this.applicationTypes, that.applicationTypes)
|
||||
.append(this.applicationTags, that.applicationTags)
|
||||
.append(this.name, that.name)
|
||||
.append(this.unselectedFields, that.unselectedFields)
|
||||
.isEquals();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return new HashCodeBuilder()
|
||||
.append(this.ugi.getUserName())
|
||||
.append(this.stateQuery)
|
||||
.append(this.statesQuery)
|
||||
.append(this.finalStatusQuery)
|
||||
.append(this.userQuery)
|
||||
.append(this.queueQuery)
|
||||
.append(this.count)
|
||||
.append(this.startedBegin)
|
||||
.append(this.startedEnd)
|
||||
.append(this.finishBegin)
|
||||
.append(this.finishEnd)
|
||||
.append(this.applicationTypes)
|
||||
.append(this.applicationTags)
|
||||
.append(this.name)
|
||||
.append(this.unselectedFields)
|
||||
.toHashCode();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,18 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.yarn.server.router.webapp.cache;
|
|
@ -72,10 +72,12 @@ import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.StatisticsItemIn
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppPriority;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppQueue;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.NodeIDsInfo;
|
||||
import org.apache.hadoop.yarn.server.router.webapp.cache.RouterAppInfoCacheKey;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ApplicationStatisticsInfo;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppActivitiesInfo;
|
||||
import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo;
|
||||
import org.apache.hadoop.yarn.server.webapp.dao.ContainersInfo;
|
||||
import org.apache.hadoop.yarn.util.LRUCacheHashMap;
|
||||
import org.apache.hadoop.yarn.util.MonotonicClock;
|
||||
import org.apache.hadoop.yarn.util.Times;
|
||||
import org.junit.Assert;
|
||||
|
@ -160,6 +162,10 @@ public class TestFederationInterceptorREST extends BaseRouterWebServicesTest {
|
|||
// Disable StateStoreFacade cache
|
||||
conf.setInt(YarnConfiguration.FEDERATION_CACHE_TIME_TO_LIVE_SECS, 0);
|
||||
|
||||
// Open AppsInfo Cache
|
||||
conf.setBoolean(YarnConfiguration.ROUTER_APPSINFO_ENABLED, true);
|
||||
conf.setInt(YarnConfiguration.ROUTER_APPSINFO_CACHED_COUNT, 10);
|
||||
|
||||
return conf;
|
||||
}
|
||||
|
||||
|
@ -960,6 +966,28 @@ public class TestFederationInterceptorREST extends BaseRouterWebServicesTest {
|
|||
Assert.assertEquals(queueName, queue.getQueue());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetAppsInfoCache() throws IOException, InterruptedException, YarnException {
|
||||
|
||||
AppsInfo responseGet = interceptor.getApps(
|
||||
null, null, null, null, null, null, null, null, null, null, null, null, null, null, null);
|
||||
Assert.assertNotNull(responseGet);
|
||||
|
||||
RouterAppInfoCacheKey cacheKey = RouterAppInfoCacheKey.newInstance(
|
||||
null, null, null, null, null, null, null, null, null, null, null, null, null, null, null);
|
||||
|
||||
LRUCacheHashMap<RouterAppInfoCacheKey, AppsInfo> appsInfoCache =
|
||||
interceptor.getAppInfosCaches();
|
||||
Assert.assertNotNull(appsInfoCache);
|
||||
Assert.assertTrue(!appsInfoCache.isEmpty());
|
||||
Assert.assertEquals(1, appsInfoCache.size());
|
||||
Assert.assertTrue(appsInfoCache.containsKey(cacheKey));
|
||||
|
||||
AppsInfo cacheResult = appsInfoCache.get(cacheKey);
|
||||
Assert.assertNotNull(cacheResult);
|
||||
Assert.assertEquals(responseGet, cacheResult);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetAppStatistics() throws IOException, InterruptedException, YarnException {
|
||||
AppState appStateRUNNING = new AppState(YarnApplicationState.RUNNING.name());
|
||||
|
|
Loading…
Reference in New Issue