security: limit the size of the role store cache

Previously the roles store cache was unbounded as it was a just using a ConcurrentHashMap,
which could lead to excessive memory usage in cases where there are a large number of roles
as we tried to eagerly load the roles into the cache if they were not present. The roles store now
loads roles on demand and caches them for a finite period of time.

Additionally, the background polling of roles has been removed to reduce complexity. A best effort
attempt is made to clear the roles cache upon modification and if necessary the cache can be
cleared manually.

See elastic/elasticsearch#1837

Original commit: elastic/x-pack-elasticsearch@450dd779c8
This commit is contained in:
jaymode 2016-08-30 09:01:26 -04:00
parent 06ff97f63d
commit 7d78911082
3 changed files with 102 additions and 247 deletions

View File

@ -302,7 +302,7 @@ public class Security implements ActionPlugin, IngestPlugin {
components.add(authcService);
final FileRolesStore fileRolesStore = new FileRolesStore(settings, env, resourceWatcherService);
final NativeRolesStore nativeRolesStore = new NativeRolesStore(settings, client, threadPool);
final NativeRolesStore nativeRolesStore = new NativeRolesStore(settings, client);
final ReservedRolesStore reservedRolesStore = new ReservedRolesStore(securityContext);
final CompositeRolesStore allRolesStore = new CompositeRolesStore(settings, fileRolesStore, nativeRolesStore, reservedRolesStore);
final AuthorizationService authzService = new AuthorizationService(settings, allRolesStore, clusterService,

View File

@ -9,16 +9,14 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
@ -38,18 +36,18 @@ import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchScrollRequest;
import org.elasticsearch.action.support.WriteRequest.RefreshPolicy;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.cache.Cache;
import org.elasticsearch.common.cache.CacheBuilder;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.index.IndexNotFoundException;
@ -57,8 +55,6 @@ import org.elasticsearch.index.get.GetResult;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.threadpool.ThreadPool.Names;
import org.elasticsearch.xpack.security.InternalClient;
import org.elasticsearch.xpack.security.SecurityTemplateService;
import org.elasticsearch.xpack.security.action.role.ClearRolesCacheRequest;
@ -69,7 +65,6 @@ import org.elasticsearch.xpack.security.authz.RoleDescriptor;
import org.elasticsearch.xpack.security.authz.permission.IndicesPermission.Group;
import org.elasticsearch.xpack.security.authz.permission.Role;
import org.elasticsearch.xpack.security.client.SecurityClient;
import org.elasticsearch.threadpool.ThreadPool.Cancellable;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.xpack.security.Security.setting;
@ -91,8 +86,10 @@ public class NativeRolesStore extends AbstractComponent implements RolesStore, C
public static final Setting<TimeValue> SCROLL_KEEP_ALIVE_SETTING =
Setting.timeSetting(setting("authz.store.roles.index.scroll.keep_alive"), TimeValue.timeValueSeconds(10L), Property.NodeScope);
public static final Setting<TimeValue> POLL_INTERVAL_SETTING =
Setting.timeSetting(setting("authz.store.roles.index.reload.interval"), TimeValue.timeValueSeconds(30L), Property.NodeScope);
private static final Setting<Integer> CACHE_SIZE_SETTING =
Setting.intSetting(setting("authz.store.roles.index.cache.max_size"), 10000, Property.NodeScope);
private static final Setting<TimeValue> CACHE_TTL_SETTING =
Setting.timeSetting(setting("authz.store.roles.index.cache.ttl"), TimeValue.timeValueMinutes(20), Property.NodeScope);
public enum State {
INITIALIZED,
@ -106,21 +103,27 @@ public class NativeRolesStore extends AbstractComponent implements RolesStore, C
public static final String ROLE_DOC_TYPE = "role";
private final InternalClient client;
private final ThreadPool threadPool;
private final AtomicReference<State> state = new AtomicReference<>(State.INITIALIZED);
private final ConcurrentHashMap<String, RoleAndVersion> roleCache = new ConcurrentHashMap<>();
private final Cache<String, RoleAndVersion> roleCache;
// the lock is used in an odd manner; when iterating over the cache we cannot have modifiers other than deletes using
// the iterator but when not iterating we can modify the cache without external locking. When making normal modifications to the cache
// the read lock is obtained so that we can allow concurrent modifications; however when we need to iterate over the keys or values of
// the cache the write lock must obtained to prevent any modifications
private final ReadWriteLock iterationLock = new ReentrantReadWriteLock();
private SecurityClient securityClient;
private int scrollSize;
private TimeValue scrollKeepAlive;
private Cancellable pollerCancellable;
private volatile boolean securityIndexExists = false;
public NativeRolesStore(Settings settings, InternalClient client, ThreadPool threadPool) {
public NativeRolesStore(Settings settings, InternalClient client) {
super(settings);
this.client = client;
this.threadPool = threadPool;
this.roleCache = CacheBuilder.<String, RoleAndVersion>builder()
.setMaximumWeight(CACHE_SIZE_SETTING.get(settings))
.setExpireAfterWrite(CACHE_TTL_SETTING.get(settings).getMillis())
.build();
}
public boolean canStart(ClusterState clusterState, boolean master) {
@ -144,15 +147,6 @@ public class NativeRolesStore extends AbstractComponent implements RolesStore, C
this.securityClient = new SecurityClient(client);
this.scrollSize = SCROLL_SIZE_SETTING.get(settings);
this.scrollKeepAlive = SCROLL_KEEP_ALIVE_SETTING.get(settings);
TimeValue pollInterval = POLL_INTERVAL_SETTING.get(settings);
RolesStorePoller poller = new RolesStorePoller();
try {
poller.doRun();
} catch (Exception e) {
logger.warn("failed to perform initial poll of roles index [{}]. scheduling again in [{}]", e,
SecurityTemplateService.SECURITY_INDEX_NAME, pollInterval);
}
pollerCancellable = threadPool.scheduleWithFixedDelay(poller, pollInterval, Names.GENERIC);
state.set(State.STARTED);
}
} catch (Exception e) {
@ -163,11 +157,7 @@ public class NativeRolesStore extends AbstractComponent implements RolesStore, C
public void stop() {
if (state.compareAndSet(State.STARTED, State.STOPPING)) {
try {
pollerCancellable.cancel();
} finally {
state.set(State.STOPPED);
}
state.set(State.STOPPED);
}
}
@ -341,16 +331,21 @@ public class NativeRolesStore extends AbstractComponent implements RolesStore, C
return usageStats;
}
long count = (long) roleCache.size();
for (RoleAndVersion rv : roleCache.values()) {
Role role = rv.getRole();
for (Group group : role.indices()) {
fls = fls || group.hasFields();
dls = dls || group.hasQuery();
}
if (fls && dls) {
break;
long count = roleCache.count();
iterationLock.writeLock().lock();
try {
for (RoleAndVersion rv : roleCache.values()) {
Role role = rv.getRole();
for (Group group : role.indices()) {
fls = fls || group.hasFields();
dls = dls || group.hasQuery();
}
if (fls && dls) {
break;
}
}
} finally {
iterationLock.writeLock().unlock();
}
// slow path - query for necessary information
@ -408,47 +403,53 @@ public class NativeRolesStore extends AbstractComponent implements RolesStore, C
final AtomicReference<GetResponse> getRef = new AtomicReference<>(null);
final CountDownLatch latch = new CountDownLatch(1);
try {
roleAndVersion = roleCache.computeIfAbsent(roleId, new Function<String, RoleAndVersion>() {
@Override
public RoleAndVersion apply(String key) {
logger.debug("attempting to load role [{}] from index", key);
executeGetRoleRequest(roleId, new LatchedActionListener<>(new ActionListener<GetResponse>() {
@Override
public void onResponse(GetResponse role) {
getRef.set(role);
roleAndVersion = roleCache.computeIfAbsent(roleId, (key) -> {
logger.debug("attempting to load role [{}] from index", key);
executeGetRoleRequest(roleId, new LatchedActionListener<>(new ActionListener<GetResponse>() {
@Override
public void onResponse(GetResponse role) {
getRef.set(role);
}
@Override
public void onFailure(Exception t) {
if (t instanceof IndexNotFoundException) {
logger.trace("failed to retrieve role [{}] since security index does not exist", t, roleId);
} else {
logger.error("failed to retrieve role [{}]", t, roleId);
}
@Override
public void onFailure(Exception t) {
if (t instanceof IndexNotFoundException) {
logger.trace("failed to retrieve role [{}] since security index does not exist", t, roleId);
} else {
logger.error("failed to retrieve role [{}]", t, roleId);
}
}
}, latch));
try {
latch.await(30, TimeUnit.SECONDS);
} catch (InterruptedException e) {
logger.error("timed out retrieving role [{}]", roleId);
}
}, latch));
GetResponse response = getRef.get();
if (response == null) {
return null;
}
try {
latch.await(30, TimeUnit.SECONDS);
} catch (InterruptedException e) {
logger.error("timed out retrieving role [{}]", roleId);
}
RoleDescriptor descriptor = transformRole(response);
if (descriptor == null) {
return null;
}
logger.debug("loaded role [{}] from index with version [{}]", key, response.getVersion());
GetResponse response = getRef.get();
if (response == null) {
return null;
}
RoleDescriptor descriptor = transformRole(response);
if (descriptor == null) {
return null;
}
logger.debug("loaded role [{}] from index with version [{}]", key, response.getVersion());
iterationLock.readLock().lock();
try {
return new RoleAndVersion(descriptor, response.getVersion());
} finally {
iterationLock.readLock().unlock();
}
});
} catch (RuntimeException e) {
logger.error("could not get or load value from cache for role [{}]", e, roleId);
} catch (ExecutionException e) {
if (e.getCause() instanceof NullPointerException) {
logger.trace("role [{}] was not found", e, roleId);
} else {
logger.error("failed to load role [{}]", e, roleId);
}
}
return roleAndVersion;
@ -490,19 +491,29 @@ public class NativeRolesStore extends AbstractComponent implements RolesStore, C
if (state != State.STOPPED && state != State.FAILED) {
throw new IllegalStateException("can only reset if stopped!!!");
}
this.roleCache.clear();
invalidateAll();
this.securityIndexExists = false;
this.state.set(State.INITIALIZED);
}
public void invalidateAll() {
logger.debug("invalidating all roles in cache");
roleCache.clear();
iterationLock.readLock().lock();
try {
roleCache.invalidateAll();
} finally {
iterationLock.readLock().unlock();
}
}
public void invalidate(String role) {
logger.debug("invalidating role [{}] in cache", role);
roleCache.remove(role);
iterationLock.readLock().lock();
try {
roleCache.invalidate(role);
} finally {
iterationLock.readLock().unlock();
}
}
private <Response> void clearRoleCache(final String role, ActionListener<Response> listener, Response response) {
@ -560,97 +571,6 @@ public class NativeRolesStore extends AbstractComponent implements RolesStore, C
}
}
private class RolesStorePoller extends AbstractRunnable {
@Override
protected void doRun() throws Exception {
if (isStopped()) {
return;
}
if (securityIndexExists == false) {
logger.trace("cannot poll for role changes since security index [{}] does not exist",
SecurityTemplateService.SECURITY_INDEX_NAME);
return;
}
// hold a reference to the client since the poller may run after the class is stopped (we don't interrupt it running) and
// we reset when we test which sets the client to null...
final Client client = NativeRolesStore.this.client;
logger.trace("starting polling of roles index to check for changes");
SearchResponse response = null;
// create a copy of the keys in the cache since we will be modifying this list
final Set<String> existingRoles = new HashSet<>(roleCache.keySet());
try {
client.admin().indices().prepareRefresh(SecurityTemplateService.SECURITY_INDEX_NAME);
SearchRequest request = client.prepareSearch(SecurityTemplateService.SECURITY_INDEX_NAME)
.setScroll(scrollKeepAlive)
.setQuery(QueryBuilders.typeQuery(ROLE_DOC_TYPE))
.setSize(scrollSize)
.setFetchSource(true)
.setVersion(true)
.request();
response = client.search(request).actionGet();
boolean keepScrolling = response.getHits().getHits().length > 0;
while (keepScrolling) {
if (isStopped()) {
return;
}
for (SearchHit hit : response.getHits().getHits()) {
final String roleName = hit.getId();
final long version = hit.version();
existingRoles.remove(roleName);
// we use the locking mechanisms provided by the map/cache to help protect against concurrent operations
// that will leave the cache in a bad state
roleCache.computeIfPresent(roleName, new BiFunction<String, RoleAndVersion, RoleAndVersion>() {
@Override
public RoleAndVersion apply(String roleName, RoleAndVersion existing) {
if (version > existing.getVersion()) {
RoleDescriptor rd = transformRole(hit.getId(), hit.getSourceRef());
if (rd != null) {
return new RoleAndVersion(rd, version);
}
}
return existing;
}
});
}
SearchScrollRequest scrollRequest = client.prepareSearchScroll(response.getScrollId())
.setScroll(scrollKeepAlive).request();
response = client.searchScroll(scrollRequest).actionGet();
keepScrolling = response.getHits().getHits().length > 0;
}
// check to see if we had roles that do not exist in the index
if (existingRoles.isEmpty() == false) {
for (String roleName : existingRoles) {
logger.trace("role [{}] does not exist anymore, removing from cache", roleName);
roleCache.remove(roleName);
}
}
} catch (IndexNotFoundException e) {
logger.trace("security index does not exist", e);
} finally {
if (response != null) {
ClearScrollRequest clearScrollRequest = client.prepareClearScroll().addScrollId(response.getScrollId()).request();
client.clearScroll(clearScrollRequest).actionGet();
}
}
logger.trace("completed polling of roles index");
}
@Override
public void onFailure(Exception e) {
logger.error("error occurred while checking the native roles for changes", e);
}
private boolean isStopped() {
State state = state();
return state == State.STOPPED || state == State.STOPPING;
}
}
private static class RoleAndVersion {
private final RoleDescriptor roleDescriptor;
@ -679,6 +599,7 @@ public class NativeRolesStore extends AbstractComponent implements RolesStore, C
public static void addSettings(List<Setting<?>> settings) {
settings.add(SCROLL_SIZE_SETTING);
settings.add(SCROLL_KEEP_ALIVE_SETTING);
settings.add(POLL_INTERVAL_SETTING);
settings.add(CACHE_SIZE_SETTING);
settings.add(CACHE_TTL_SETTING);
}
}

View File

@ -5,25 +5,14 @@
*/
package org.elasticsearch.integration;
import org.apache.http.message.BasicHeader;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.Response;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.network.NetworkModule;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.test.NativeRealmIntegTestCase;
import org.elasticsearch.test.SecuritySettingsSource;
import org.elasticsearch.xpack.security.SecurityTemplateService;
import org.elasticsearch.xpack.security.action.role.DeleteRoleResponse;
import org.elasticsearch.xpack.security.action.role.GetRolesResponse;
import org.elasticsearch.xpack.security.action.role.PutRoleResponse;
import org.elasticsearch.xpack.security.authc.support.SecuredString;
import org.elasticsearch.xpack.security.authc.support.UsernamePasswordToken;
import org.elasticsearch.xpack.security.authz.RoleDescriptor;
import org.elasticsearch.xpack.security.authz.store.NativeRolesStore;
import org.elasticsearch.xpack.security.client.SecurityClient;
import org.junit.Before;
@ -34,14 +23,12 @@ import java.util.List;
import static org.elasticsearch.action.support.WriteRequest.RefreshPolicy.IMMEDIATE;
import static org.elasticsearch.action.support.WriteRequest.RefreshPolicy.NONE;
import static org.hamcrest.Matchers.arrayWithSize;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
/**
* Test for the clear roles API that changes the polling aspect of security to only run once an hour in order to
* test the cache clearing APIs.
* Test for the clear roles API
*/
public class ClearRolesCacheTests extends NativeRealmIntegTestCase {
@ -79,11 +66,8 @@ public class ClearRolesCacheTests extends NativeRealmIntegTestCase {
@Override
public Settings nodeSettings(int nodeOrdinal) {
TimeValue pollerInterval = TimeValue.timeValueMillis((long) randomIntBetween(2, 2000));
logger.debug("using poller interval [{}]", pollerInterval);
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal))
.put(NativeRolesStore.POLL_INTERVAL_SETTING.getKey(), pollerInterval.getStringRep())
.put(NetworkModule.HTTP_ENABLED.getKey(), true)
.build();
}
@ -109,67 +93,17 @@ public class ClearRolesCacheTests extends NativeRealmIntegTestCase {
assertRolesAreCorrect(securityClient, toModify);
}
public void testModifyingDocumentsDirectly() throws Exception {
int modifiedRolesCount = randomIntBetween(1, roles.length);
List<String> toModify = randomSubsetOf(modifiedRolesCount, roles);
logger.debug("--> modifying roles {} to have run_as", toModify);
final boolean refresh = randomBoolean();
for (String role : toModify) {
UpdateResponse response = internalClient().prepareUpdate().setId(role).setIndex(SecurityTemplateService.SECURITY_INDEX_NAME)
.setType(NativeRolesStore.ROLE_DOC_TYPE)
.setDoc("run_as", new String[] { role })
.setRefreshPolicy(refresh ? IMMEDIATE : NONE)
.get();
assertEquals(DocWriteResponse.Result.UPDATED, response.getResult());
logger.debug("--> updated role [{}] with run_as", role);
public void testDeletingViaApiClearsCache() throws Exception {
final int rolesToDelete = randomIntBetween(1, roles.length - 1);
List<String> toDelete = randomSubsetOf(rolesToDelete, roles);
for (String role : toDelete) {
DeleteRoleResponse response = securityClient().prepareDeleteRole(role).get();
assertTrue(response.found());
}
// in this test, the poller runs too frequently to check the cache still has roles without run as
// clear the cache and we should definitely see the latest values!
SecurityClient securityClient = securityClient(internalCluster().transportClient());
final boolean useHttp = randomBoolean();
final boolean clearAll = randomBoolean();
logger.debug("--> starting to clear roles. using http [{}] clearing all [{}]", useHttp, clearAll);
String[] rolesToClear = clearAll ? (randomBoolean() ? roles : null) : toModify.toArray(new String[toModify.size()]);
if (useHttp) {
String path;
if (rolesToClear == null) {
path = "/_xpack/security/role/" + (randomBoolean() ? "*" : "_all") + "/_clear_cache";
} else {
path = "/_xpack/security/role/" + Strings.arrayToCommaDelimitedString(rolesToClear) + "/_clear_cache";
}
Response response = getRestClient().performRequest("POST", path,
new BasicHeader(UsernamePasswordToken.BASIC_AUTH_HEADER,
UsernamePasswordToken.basicAuthHeaderValue(SecuritySettingsSource.DEFAULT_USER_NAME,
new SecuredString(SecuritySettingsSource.DEFAULT_PASSWORD.toCharArray()))));
assertThat(response.getStatusLine().getStatusCode(), is(RestStatus.OK.getStatus()));
} else {
securityClient.prepareClearRolesCache().names(rolesToClear).get();
}
assertRolesAreCorrect(securityClient, toModify);
}
public void testDeletingRoleDocumentDirectly() throws Exception {
SecurityClient securityClient = securityClient(internalCluster().transportClient());
final String role = randomFrom(roles);
RoleDescriptor[] foundRoles = securityClient.prepareGetRoles().names(role).get().roles();
assertThat(foundRoles.length, is(1));
logger.debug("--> deleting role [{}]", role);
final boolean refresh = randomBoolean();
DeleteResponse response = internalClient()
.prepareDelete(SecurityTemplateService.SECURITY_INDEX_NAME, NativeRolesStore.ROLE_DOC_TYPE, role)
.setRefreshPolicy(refresh ? IMMEDIATE : NONE)
.get();
assertEquals(DocWriteResponse.Result.DELETED, response.getResult());
assertBusy(new Runnable() {
@Override
public void run() {
assertThat(securityClient.prepareGetRoles().names(role).get().roles(), arrayWithSize(0));
}
});
GetRolesResponse roleResponse = securityClient().prepareGetRoles().names(roles).get();
assertTrue(roleResponse.hasRoles());
assertThat(roleResponse.roles().length, is(roles.length - rolesToDelete));
}
private void assertRolesAreCorrect(SecurityClient securityClient, List<String> toModify) {