YARN-11276. Add LRU cache for RMWebServices.getApps. (#4793)

Reviewed-by: Shilun Fan <slfan1989@apache.org>
Signed-off-by: Shilun Fan <slfan1989@apache.org>
This commit is contained in:
Xianming Lei 2023-05-26 20:46:00 +08:00 committed by GitHub
parent b977065cc4
commit 97afb33c73
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 452 additions and 0 deletions

View File

@ -4946,6 +4946,19 @@ public class YarnConfiguration extends Configuration {
public static final String DEFAULT_YARN_WORKFLOW_ID_TAG_PREFIX =
"workflowid:";
public static final String APPS_CACHE_ENABLE = YARN_PREFIX + "apps.cache.enable";
public static final boolean DEFAULT_APPS_CACHE_ENABLE = false;
// The size of cache for RMWebServices.getApps when yarn.apps.cache.enable = true,
// default is 1000
public static final String APPS_CACHE_SIZE = YARN_PREFIX + "apps.cache.size";
public static final int DEFAULT_APPS_CACHE_SIZE = 1000;
// The expire time of cache for RMWebServices.getApps when yarn.apps.cache.enable = true,
// default is 30s
public static final String APPS_CACHE_EXPIRE = YARN_PREFIX + "apps.cache.expire";
public static final String DEFAULT_APPS_CACHE_EXPIRE = "30s";
public YarnConfiguration() {
super();
}

View File

@ -0,0 +1,141 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.util;
import org.apache.commons.lang3.builder.EqualsBuilder;
import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Set;
public class AppsCacheKey {
private static final Logger LOG = LoggerFactory.getLogger(AppsCacheKey.class.getName());
private UserGroupInformation ugi;
private String stateQuery;
private String finalStatusQuery;
private String userQuery;
private String queueQuery;
private String limit;
private String startedBegin;
private String startedEnd;
private String finishBegin;
private String finishEnd;
private String name;
private Set<String> unselectedFields;
private Set<String> applicationTags;
private Set<String> applicationTypes;
private Set<String> statesQuery;
@SuppressWarnings("checkstyle:ParameterNumber")
public AppsCacheKey(UserGroupInformation ugi, String stateQuery, Set<String> statesQuery,
String finalStatusQuery, String userQuery, String queueQuery, String limit,
String startedBegin, String startedEnd, String finishBegin, String finishEnd,
Set<String> applicationTypes, Set<String> applicationTags, String name,
Set<String> unselectedFields) {
this.ugi = ugi;
this.stateQuery = stateQuery;
this.statesQuery = statesQuery;
this.finalStatusQuery = finalStatusQuery;
this.userQuery = userQuery;
this.queueQuery = queueQuery;
this.limit = limit;
this.startedBegin = startedBegin;
this.startedEnd = startedEnd;
this.finishBegin = finishBegin;
this.finishEnd = finishEnd;
this.applicationTypes = applicationTypes;
this.applicationTags = applicationTags;
this.name = name;
this.unselectedFields = unselectedFields;
}
@SuppressWarnings("checkstyle:ParameterNumber")
public static AppsCacheKey newInstance(String stateQuery,
Set<String> statesQuery, String finalStatusQuery, String userQuery, String queueQuery,
String limit, String startedBegin, String startedEnd, String finishBegin, String finishEnd,
Set<String> applicationTypes, Set<String> applicationTags, String name,
Set<String> unselectedFields) {
UserGroupInformation ugi = null;
try {
ugi = UserGroupInformation.getCurrentUser();
} catch (IOException e) {
LOG.error("unable to get ugi", e);
}
return new AppsCacheKey(ugi, stateQuery, statesQuery, finalStatusQuery, userQuery, queueQuery,
limit, startedBegin, startedEnd, finishBegin, finishEnd, applicationTypes, applicationTags,
name, unselectedFields);
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
AppsCacheKey that = (AppsCacheKey) o;
return new EqualsBuilder()
.append(this.ugi.getUserName(), that.ugi.getUserName())
.append(this.stateQuery, that.stateQuery)
.append(this.statesQuery, that.statesQuery)
.append(this.finalStatusQuery, that.finalStatusQuery)
.append(this.userQuery, that.userQuery)
.append(this.queueQuery, that.queueQuery)
.append(this.limit, that.limit)
.append(this.startedBegin, that.startedBegin)
.append(this.startedEnd, that.startedEnd)
.append(this.finishBegin, that.finishBegin)
.append(this.finishEnd, that.finishEnd)
.append(this.applicationTypes, that.applicationTypes)
.append(this.applicationTags, that.applicationTags)
.append(this.name, that.name)
.append(this.unselectedFields, that.unselectedFields)
.isEquals();
}
@Override
public int hashCode() {
return new HashCodeBuilder()
.append(this.ugi.getUserName())
.append(this.stateQuery)
.append(this.statesQuery)
.append(this.finalStatusQuery)
.append(this.userQuery)
.append(this.queueQuery)
.append(this.limit)
.append(this.startedBegin)
.append(this.startedEnd)
.append(this.finishBegin)
.append(this.finishEnd)
.append(this.applicationTypes)
.append(this.applicationTags)
.append(this.name)
.append(this.unselectedFields)
.toHashCode();
}
}

View File

@ -0,0 +1,39 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.util;
import org.apache.hadoop.util.Time;
public class CacheNode<V>{
private V value;
private long cacheTime;
public CacheNode(V value){
this.value = value;
cacheTime = Time.now();
}
public V get(){
return value;
}
public long getCacheTime(){
return cacheTime;
}
}

View File

@ -0,0 +1,64 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.util;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.util.Time;
import java.util.Map;
public class LRUCache<K, V> {
private final long expireTimeMs;
private final Map<K, CacheNode<V>> cache;
public LRUCache(int capacity) {
this(capacity, -1);
}
public LRUCache(int capacity, long expireTimeMs) {
cache = new LRUCacheHashMap<>(capacity, true);
this.expireTimeMs = expireTimeMs;
}
public synchronized V get(K key) {
CacheNode<V> cacheNode = cache.get(key);
if (cacheNode != null) {
if (expireTimeMs > 0 && Time.now() > cacheNode.getCacheTime() + expireTimeMs) {
cache.remove(key);
return null;
}
}
return cacheNode == null ? null : cacheNode.get();
}
public synchronized V put(K key, V value) {
cache.put(key, new CacheNode<>(value));
return value;
}
@VisibleForTesting
public void clear(){
cache.clear();
}
public int size() {
return cache.size();
}
}

View File

@ -5261,4 +5261,33 @@
<value>false</value>
</property>
<property>
<name>yarn.apps.cache.enable</name>
<value>false</value>
<description>
Optional.
To enable cache for RMWebServices.getApps
</description>
</property>
<property>
<name>yarn.apps.cache.size</name>
<value>1000</value>
<description>
Optional.
The size of cache for RMWebServices.getApps when
yarn.apps.cache.enable = true, Default is 1000
</description>
</property>
<property>
<name>yarn.apps.cache.expire</name>
<value>30s</value>
<description>
Optional.
The expire time of cache for RMWebServices.getApps when
yarn.apps.cache.enable = true, Default is 30s
</description>
</property>
</configuration>

View File

@ -0,0 +1,49 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.util;
import org.junit.Assert;
import org.junit.Test;
public class TestLRUCache {
public static final int CACHE_EXPIRE_TIME = 200;
@Test
public void testLRUCache() throws InterruptedException {
LRUCache<String, Integer> lruCache = new LRUCache<>(3, CACHE_EXPIRE_TIME);
lruCache.put("1", 1);
lruCache.put("2", 1);
lruCache.put("3", 3);
lruCache.put("4", 4);
Assert.assertEquals(lruCache.size(), 3);
Assert.assertNull(lruCache.get("1"));
Assert.assertNotNull(lruCache.get("2"));
Assert.assertNotNull(lruCache.get("3"));
Assert.assertNotNull(lruCache.get("3"));
lruCache.clear();
lruCache.put("1", 1);
Thread.sleep(201);
Assert.assertEquals(lruCache.size(), 1);
lruCache.get("1");
Assert.assertEquals(lruCache.size(), 0);
lruCache.put("2", 2);
Assert.assertEquals(lruCache.size(), 1);
lruCache.put("3", 3);
Assert.assertEquals(lruCache.size(), 2);
}
}

View File

@ -37,6 +37,8 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import javax.servlet.http.HttpServletRequest;
@ -211,7 +213,9 @@ import org.apache.hadoop.yarn.server.webapp.WebServices;
import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo;
import org.apache.hadoop.yarn.server.webapp.dao.ContainersInfo;
import org.apache.hadoop.yarn.util.AdHocLogDumper;
import org.apache.hadoop.yarn.util.AppsCacheKey;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.LRUCache;
import org.apache.hadoop.yarn.util.Times;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.apache.hadoop.yarn.webapp.BadRequestException;
@ -257,6 +261,10 @@ public class RMWebServices extends WebServices implements RMWebServiceProtocol {
private boolean filterAppsByUser = false;
private boolean filterInvalidXMLChars = false;
private boolean enableRestAppSubmissions = true;
private LRUCache<AppsCacheKey, AppsInfo> appsLRUCache;
private AtomicLong getAppsSuccessTimes = new AtomicLong(0);
private AtomicLong hitAppsCacheTimes = new AtomicLong(0);
private boolean enableAppsCache = false;
public final static String DELEGATION_TOKEN_HEADER =
"Hadoop-YARN-RM-Delegation-Token";
@ -278,6 +286,15 @@ public class RMWebServices extends WebServices implements RMWebServiceProtocol {
this.enableRestAppSubmissions = conf.getBoolean(
YarnConfiguration.ENABLE_REST_APP_SUBMISSIONS,
YarnConfiguration.DEFAULT_ENABLE_REST_APP_SUBMISSIONS);
this.enableAppsCache = this.conf.getBoolean(YarnConfiguration.APPS_CACHE_ENABLE,
YarnConfiguration.DEFAULT_APPS_CACHE_ENABLE);
if (enableAppsCache) {
int cacheSize = this.conf.getInt(YarnConfiguration.APPS_CACHE_SIZE,
YarnConfiguration.DEFAULT_APPS_CACHE_SIZE);
long appsCacheTimeMs = this.conf.getTimeDuration(YarnConfiguration.APPS_CACHE_EXPIRE,
YarnConfiguration.DEFAULT_APPS_CACHE_EXPIRE, TimeUnit.MILLISECONDS);
appsLRUCache = new LRUCache<>(cacheSize, appsCacheTimeMs);
}
}
RMWebServices(ResourceManager rm, Configuration conf,
@ -625,6 +642,23 @@ public class RMWebServices extends WebServices implements RMWebServiceProtocol {
@QueryParam(RMWSConsts.NAME) String name,
@QueryParam(RMWSConsts.DESELECTS) Set<String> unselectedFields) {
AppsCacheKey cacheKey = AppsCacheKey.newInstance(stateQuery, new HashSet<>(statesQuery),
finalStatusQuery, userQuery, queueQuery, limit, startedBegin, startedEnd, finishBegin,
finishEnd, new HashSet<>(applicationTypes), new HashSet<>(applicationTags), name,
unselectedFields);
if (this.enableAppsCache) {
long successTimes = getAppsSuccessTimes.incrementAndGet();
if (successTimes % 1000 == 0) {
LOG.debug("hit cache info: getAppsSuccessTimes={}, hitAppsCacheTimes={}",
successTimes, hitAppsCacheTimes.get());
}
AppsInfo appsInfo = appsLRUCache.get(cacheKey);
if (appsInfo != null) {
hitAppsCacheTimes.getAndIncrement();
return appsInfo;
}
}
initForReadableEndpoints();
GetApplicationsRequest request =
@ -695,6 +729,10 @@ public class RMWebServices extends WebServices implements RMWebServiceProtocol {
}
}
if (enableAppsCache) {
appsLRUCache.put(cacheKey, allApps);
getAppsSuccessTimes.getAndIncrement();
}
return allApps;
}
@ -2981,4 +3019,9 @@ public class RMWebServices extends WebServices implements RMWebServiceProtocol {
ResourceScheduler rs = rm.getResourceScheduler();
return new SchedulerOverviewInfo(rs);
}
@VisibleForTesting
public LRUCache<AppsCacheKey, AppsInfo> getAppsLRUCache(){
return appsLRUCache;
}
}

View File

@ -40,6 +40,7 @@ import java.util.Set;
import java.util.HashMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeoutException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
@ -49,6 +50,7 @@ import javax.ws.rs.core.Response;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableSet;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
@ -72,6 +74,7 @@ import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.resourcemanager.*;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
@ -90,6 +93,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ClusterUserInfo;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.util.AdHocLogDumper;
import org.apache.hadoop.yarn.util.AppsCacheKey;
import org.apache.hadoop.yarn.util.LRUCache;
import org.apache.hadoop.yarn.util.YarnVersionInfo;
import org.apache.hadoop.yarn.webapp.BadRequestException;
import org.apache.hadoop.yarn.webapp.ForbiddenException;
@ -1157,4 +1162,73 @@ public class TestRMWebServices extends JerseyTestBase {
int applicationPriority = json.getInt("applicationPriority");
assertEquals(0, applicationPriority);
}
@Test
public void testGetAppsCache() throws YarnException, InterruptedException, TimeoutException {
// mock up an RM that returns app reports for apps that don't exist
// in the RMApps list
ApplicationId appId = ApplicationId.newInstance(1, 1);
ApplicationReport mockReport = mock(ApplicationReport.class);
when(mockReport.getApplicationId()).thenReturn(appId);
GetApplicationsResponse mockAppsResponse =
mock(GetApplicationsResponse.class);
when(mockAppsResponse.getApplicationList())
.thenReturn(Arrays.asList(new ApplicationReport[]{mockReport}));
ClientRMService mockClientSvc = mock(ClientRMService.class);
when(mockClientSvc.getApplications(isA(GetApplicationsRequest.class)))
.thenReturn(mockAppsResponse);
ResourceManager mockRM = mock(ResourceManager.class);
RMContextImpl rmContext = new RMContextImpl(null, null, null, null, null, null, null, null,
null, null);
when(mockRM.getRMContext()).thenReturn(rmContext);
when(mockRM.getClientRMService()).thenReturn(mockClientSvc);
rmContext.setNodeLabelManager(mock(RMNodeLabelsManager.class));
Configuration conf = new Configuration();
conf.setBoolean(YarnConfiguration.APPS_CACHE_ENABLE, true);
conf.setInt(YarnConfiguration.APPS_CACHE_SIZE, 2);
conf.setInt(YarnConfiguration.APPS_CACHE_EXPIRE, 100);
RMWebServices webSvc = new RMWebServices(mockRM, conf,
mock(HttpServletResponse.class));
final Set<String> emptySet =
Collections.unmodifiableSet(Collections.<String>emptySet());
// verify we don't get any apps when querying
HttpServletRequest mockHsr = mock(HttpServletRequest.class);
AppsInfo appsInfo = webSvc.getApps(mockHsr, null, emptySet, null,
"mock_user", "mock_queue", null, null, null, null, null, emptySet,
emptySet, null, null);
LRUCache<AppsCacheKey, AppsInfo> cache = webSvc.getAppsLRUCache();
Assert.assertEquals(1, cache.size());
AppsCacheKey appsCacheKey = AppsCacheKey.newInstance(null, emptySet,
null, "mock_user", "mock_queue", null, null, null, null, null, emptySet,
emptySet, null, null);
Assert.assertEquals(appsInfo, cache.get(appsCacheKey));
AppsInfo appsInfo1 = webSvc.getApps(mockHsr, null, emptySet, null,
"mock_user1", "mock_queue", null, null, null, null, null, emptySet,
emptySet, null, null);
Assert.assertEquals(2, cache.size());
AppsCacheKey appsCacheKey1 = AppsCacheKey.newInstance(null, emptySet,
null, "mock_user1", "mock_queue", null, null, null, null, null, emptySet,
emptySet, null, null);
Assert.assertEquals(appsInfo1, cache.get(appsCacheKey1));
AppsInfo appsInfo2 = webSvc.getApps(mockHsr, null, emptySet, null,
"mock_user2", "mock_queue", null, null, null, null, null, emptySet,
emptySet, null, null);
Assert.assertEquals(2, cache.size());
AppsCacheKey appsCacheKey2 = AppsCacheKey.newInstance(null, emptySet,
null, "mock_user2", "mock_queue", null, null, null, null, null, emptySet,
emptySet, null, null);
Assert.assertEquals(appsInfo2, cache.get(appsCacheKey2));
// appsCacheKey have removed
Assert.assertNull(cache.get(appsCacheKey));
GenericTestUtils.waitFor(() -> cache.get(appsCacheKey1) == null,
300, 1000);
GenericTestUtils.waitFor(() -> cache.get(appsCacheKey2) == null,
300, 1000);
}
}