From 6b4468ea5c8d6bb55b2b6202229d331038dd1bf9 Mon Sep 17 00:00:00 2001 From: Jay Modi Date: Fri, 14 Jul 2017 09:28:28 -0600 Subject: [PATCH] Clear security caches on security index health changes (elastic/x-pack-elasticsearch#1957) This change clears the caches in the native realm and the composite roles store when there is a a change in the health of the security index that necessitates this. When the security index goes to a red state, the caches are left in tact as this allows for management operations to be performed for a limited amount of time. When the index transitions out of the red state or exists when it didn't exist before, the caches will be cleared so that we remove any stale values. relates elastic/x-pack-elasticsearch#1789 Original commit: elastic/x-pack-elasticsearch@914959ea6bc5e350575e793e75e88498251932f7 --- .../xpack/security/Security.java | 5 +- .../security/SecurityLifecycleService.java | 11 ++ .../xpack/security/authc/InternalRealms.java | 15 ++- .../security/authc/esnative/NativeRealm.java | 17 +++ .../authz/store/CompositeRolesStore.java | 15 ++- .../support/IndexLifecycleManager.java | 50 +++++++- .../SecurityLifecycleServiceTests.java | 1 + .../security/authc/InternalRealmsTests.java | 47 ++++++++ .../authc/esnative/NativeRealmTests.java | 65 +++++++++++ .../authz/store/CompositeRolesStoreTests.java | 52 +++++++++ .../support/IndexLifecycleManagerTests.java | 108 +++++++++++++++++- .../security/test/SecurityTestUtils.java | 61 ++++++++++ 12 files changed, 433 insertions(+), 14 deletions(-) create mode 100644 plugin/src/test/java/org/elasticsearch/xpack/security/authc/InternalRealmsTests.java create mode 100644 plugin/src/test/java/org/elasticsearch/xpack/security/authc/esnative/NativeRealmTests.java diff --git a/plugin/src/main/java/org/elasticsearch/xpack/security/Security.java b/plugin/src/main/java/org/elasticsearch/xpack/security/Security.java index 21b8479d155..21af90b93fe 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/security/Security.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/security/Security.java @@ -323,8 +323,8 @@ public class Security implements ActionPlugin, IngestPlugin, NetworkPlugin { final ReservedRealm reservedRealm = new ReservedRealm(env, settings, nativeUsersStore, anonymousUser, securityLifecycleService, threadPool.getThreadContext()); Map realmFactories = new HashMap<>(); - realmFactories.putAll(InternalRealms.getFactories(threadPool, resourceWatcherService, - sslService, nativeUsersStore, nativeRoleMappingStore)); + realmFactories.putAll(InternalRealms.getFactories(threadPool, resourceWatcherService, sslService, nativeUsersStore, + nativeRoleMappingStore, securityLifecycleService)); for (XPackExtension extension : extensions) { Map newRealms = extension.getRealms(resourceWatcherService); for (Map.Entry entry : newRealms.entrySet()) { @@ -369,6 +369,7 @@ public class Security implements ActionPlugin, IngestPlugin, NetworkPlugin { } final CompositeRolesStore allRolesStore = new CompositeRolesStore(settings, fileRolesStore, nativeRolesStore, reservedRolesStore, rolesProviders, threadPool.getThreadContext(), licenseState); + securityLifecycleService.addSecurityIndexHealthChangeListener(allRolesStore::onSecurityIndexHealthChange); // to keep things simple, just invalidate all cached entries on license change. this happens so rarely that the impact should be // minimal licenseState.addListener(allRolesStore::invalidateAll); diff --git a/plugin/src/main/java/org/elasticsearch/xpack/security/SecurityLifecycleService.java b/plugin/src/main/java/org/elasticsearch/xpack/security/SecurityLifecycleService.java index 8a527d87029..e4e1cd61518 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/security/SecurityLifecycleService.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/security/SecurityLifecycleService.java @@ -11,6 +11,7 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateListener; +import org.elasticsearch.cluster.health.ClusterIndexHealth; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.component.LifecycleListener; @@ -26,6 +27,7 @@ import org.elasticsearch.xpack.security.support.IndexLifecycleManager; import java.util.Collections; import java.util.List; +import java.util.function.BiConsumer; import java.util.function.Predicate; /** @@ -147,6 +149,15 @@ public class SecurityLifecycleService extends AbstractComponent implements Clust return securityIndex.checkMappingVersion(requiredVersion); } + /** + * Adds a listener which will be notified when the security index health changes. The previous and + * current health will be provided to the listener so that the listener can determine if any action + * needs to be taken. + */ + public void addSecurityIndexHealthChangeListener(BiConsumer listener) { + securityIndex.addIndexHealthChangeListener(listener); + } + // this is called in a lifecycle listener beforeStop on the cluster service private void close() { if (indexAuditTrail != null) { diff --git a/plugin/src/main/java/org/elasticsearch/xpack/security/authc/InternalRealms.java b/plugin/src/main/java/org/elasticsearch/xpack/security/authc/InternalRealms.java index 0eded110335..27e237c94a0 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/security/authc/InternalRealms.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/security/authc/InternalRealms.java @@ -19,6 +19,7 @@ import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.watcher.ResourceWatcherService; +import org.elasticsearch.xpack.security.SecurityLifecycleService; import org.elasticsearch.xpack.security.authc.esnative.NativeRealm; import org.elasticsearch.xpack.security.authc.esnative.NativeUsersStore; import org.elasticsearch.xpack.security.authc.esnative.ReservedRealm; @@ -62,14 +63,18 @@ public class InternalRealms { * This excludes the {@link ReservedRealm}, as it cannot be created dynamically. * @return A map from realm-type to Factory */ - public static Map getFactories( - ThreadPool threadPool, ResourceWatcherService resourceWatcherService, - SSLService sslService, NativeUsersStore nativeUsersStore, - NativeRoleMappingStore nativeRoleMappingStore) { + public static Map getFactories(ThreadPool threadPool, ResourceWatcherService resourceWatcherService, + SSLService sslService, NativeUsersStore nativeUsersStore, + NativeRoleMappingStore nativeRoleMappingStore, + SecurityLifecycleService securityLifecycleService) { Map map = new HashMap<>(); map.put(FileRealm.TYPE, config -> new FileRealm(config, resourceWatcherService)); - map.put(NativeRealm.TYPE, config -> new NativeRealm(config, nativeUsersStore)); + map.put(NativeRealm.TYPE, config -> { + final NativeRealm nativeRealm = new NativeRealm(config, nativeUsersStore); + securityLifecycleService.addSecurityIndexHealthChangeListener(nativeRealm::onSecurityIndexHealthChange); + return nativeRealm; + }); map.put(LdapRealm.AD_TYPE, config -> new LdapRealm(LdapRealm.AD_TYPE, config, sslService, resourceWatcherService, nativeRoleMappingStore, threadPool)); map.put(LdapRealm.LDAP_TYPE, config -> new LdapRealm(LdapRealm.LDAP_TYPE, config, diff --git a/plugin/src/main/java/org/elasticsearch/xpack/security/authc/esnative/NativeRealm.java b/plugin/src/main/java/org/elasticsearch/xpack/security/authc/esnative/NativeRealm.java index 0e7736018d5..85c7aa11e21 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/security/authc/esnative/NativeRealm.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/security/authc/esnative/NativeRealm.java @@ -6,6 +6,8 @@ package org.elasticsearch.xpack.security.authc.esnative; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.cluster.health.ClusterHealthStatus; +import org.elasticsearch.cluster.health.ClusterIndexHealth; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.xpack.security.authc.AuthenticationResult; import org.elasticsearch.xpack.security.authc.RealmConfig; @@ -39,6 +41,21 @@ public class NativeRealm extends CachingUsernamePasswordRealm { userStore.verifyPassword(token.principal(), token.credentials(), listener); } + public void onSecurityIndexHealthChange(ClusterIndexHealth previousHealth, ClusterIndexHealth currentHealth) { + final boolean movedFromRedToNonRed = (previousHealth == null || previousHealth.getStatus() == ClusterHealthStatus.RED) + && currentHealth != null && currentHealth.getStatus() != ClusterHealthStatus.RED; + final boolean indexDeleted = previousHealth != null && currentHealth == null; + + if (movedFromRedToNonRed || indexDeleted) { + clearCache(); + } + } + + // method is used for testing to verify cache expiration since expireAll is final + void clearCache() { + expireAll(); + } + /** * @return The {@link Setting setting configuration} for this realm type */ diff --git a/plugin/src/main/java/org/elasticsearch/xpack/security/authz/store/CompositeRolesStore.java b/plugin/src/main/java/org/elasticsearch/xpack/security/authz/store/CompositeRolesStore.java index 8c6c69bcfa2..e3835c5e219 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/security/authz/store/CompositeRolesStore.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/security/authz/store/CompositeRolesStore.java @@ -6,6 +6,8 @@ package org.elasticsearch.xpack.security.authz.store; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.cluster.health.ClusterHealthStatus; +import org.elasticsearch.cluster.health.ClusterIndexHealth; import org.elasticsearch.common.Strings; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.cache.Cache; @@ -83,8 +85,7 @@ public class CompositeRolesStore extends AbstractComponent { public CompositeRolesStore(Settings settings, FileRolesStore fileRolesStore, NativeRolesStore nativeRolesStore, ReservedRolesStore reservedRolesStore, List, ActionListener>>> rolesProviders, - ThreadContext threadContext, - XPackLicenseState licenseState) { + ThreadContext threadContext, XPackLicenseState licenseState) { super(settings); this.fileRolesStore = fileRolesStore; // invalidating all on a file based role update is heavy handed to say the least, but in general this should be infrequent so the @@ -289,6 +290,16 @@ public class CompositeRolesStore extends AbstractComponent { }, listener::onFailure)); } + public void onSecurityIndexHealthChange(ClusterIndexHealth previousHealth, ClusterIndexHealth currentHealth) { + final boolean movedFromRedToNonRed = (previousHealth == null || previousHealth.getStatus() == ClusterHealthStatus.RED) + && currentHealth != null && currentHealth.getStatus() != ClusterHealthStatus.RED; + final boolean indexDeleted = previousHealth != null && currentHealth == null; + + if (movedFromRedToNonRed || indexDeleted) { + invalidateAll(); + } + } + /** * A mutable class that can be used to represent the combination of one or more {@link IndicesPrivileges} */ diff --git a/plugin/src/main/java/org/elasticsearch/xpack/security/support/IndexLifecycleManager.java b/plugin/src/main/java/org/elasticsearch/xpack/security/support/IndexLifecycleManager.java index a229ea3bb12..04cf5f432f1 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/security/support/IndexLifecycleManager.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/security/support/IndexLifecycleManager.java @@ -5,15 +5,16 @@ */ package org.elasticsearch.xpack.security.support; -import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.BiConsumer; import java.util.function.Predicate; import java.util.regex.Pattern; import java.util.stream.Collectors; @@ -34,6 +35,7 @@ import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateReque import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateResponse; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.health.ClusterIndexHealth; import org.elasticsearch.cluster.metadata.AliasOrIndex; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MappingMetaData; @@ -82,6 +84,8 @@ public class IndexLifecycleManager extends AbstractComponent { private final AtomicReference migrateDataState = new AtomicReference<>(UpgradeState.NOT_STARTED); private final AtomicInteger migrateDataAttempts = new AtomicInteger(0); + private final List> indexHealthChangeListeners = new CopyOnWriteArrayList<>(); + private volatile boolean templateIsUpToDate; private volatile boolean indexExists; private volatile boolean isIndexUpToDate; @@ -155,9 +159,18 @@ public class IndexLifecycleManager extends AbstractComponent { return this.migrateDataState.get(); } + /** + * Adds a listener which will be notified when the security index health changes. The previous and + * current health will be provided to the listener so that the listener can determine if any action + * needs to be taken. + */ + public void addIndexHealthChangeListener(BiConsumer listener) { + indexHealthChangeListeners.add(listener); + } + public void clusterChanged(ClusterChangedEvent event) { - final ClusterState state = event.state(); - processClusterState(state); + processClusterState(event.state()); + checkIndexHealthChange(event); } private void processClusterState(ClusterState state) { @@ -183,6 +196,37 @@ public class IndexLifecycleManager extends AbstractComponent { } } + private void checkIndexHealthChange(ClusterChangedEvent event) { + final ClusterState state = event.state(); + final ClusterState previousState = event.previousState(); + final IndexMetaData indexMetaData = resolveConcreteIndex(indexName, state.metaData()); + final IndexMetaData previousIndexMetaData = resolveConcreteIndex(indexName, previousState.metaData()); + if (indexMetaData != null) { + final ClusterIndexHealth currentHealth = + new ClusterIndexHealth(indexMetaData, state.getRoutingTable().index(indexMetaData.getIndex())); + final ClusterIndexHealth previousHealth = previousIndexMetaData != null ? new ClusterIndexHealth(previousIndexMetaData, + previousState.getRoutingTable().index(previousIndexMetaData.getIndex())) : null; + + if (previousHealth == null || previousHealth.getStatus() != currentHealth.getStatus()) { + notifyIndexHealthChangeListeners(previousHealth, currentHealth); + } + } else if (previousIndexMetaData != null) { + final ClusterIndexHealth previousHealth = + new ClusterIndexHealth(previousIndexMetaData, previousState.getRoutingTable().index(previousIndexMetaData.getIndex())); + notifyIndexHealthChangeListeners(previousHealth, null); + } + } + + private void notifyIndexHealthChangeListeners(ClusterIndexHealth previousHealth, ClusterIndexHealth currentHealth) { + for (BiConsumer consumer : indexHealthChangeListeners) { + try { + consumer.accept(previousHealth, currentHealth); + } catch (Exception e) { + logger.warn(new ParameterizedMessage("failed to notify listener [{}] of index health change", consumer), e); + } + } + } + private boolean checkIndexAvailable(ClusterState state) { final IndexRoutingTable routingTable = getIndexRoutingTable(state); if (routingTable != null && routingTable.allPrimaryShardsActive()) { diff --git a/plugin/src/test/java/org/elasticsearch/xpack/security/SecurityLifecycleServiceTests.java b/plugin/src/test/java/org/elasticsearch/xpack/security/SecurityLifecycleServiceTests.java index d822c1efaa2..a0dc3d5ed4e 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/security/SecurityLifecycleServiceTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/security/SecurityLifecycleServiceTests.java @@ -395,6 +395,7 @@ public class SecurityLifecycleServiceTests extends ESTestCase { clusterStateBuilder.metaData(metaDataBuilder); } + clusterStateBuilder.routingTable(SecurityTestUtils.buildIndexRoutingTable(securityIndexName)); return clusterStateBuilder; } diff --git a/plugin/src/test/java/org/elasticsearch/xpack/security/authc/InternalRealmsTests.java b/plugin/src/test/java/org/elasticsearch/xpack/security/authc/InternalRealmsTests.java new file mode 100644 index 00000000000..e68b709a280 --- /dev/null +++ b/plugin/src/test/java/org/elasticsearch/xpack/security/authc/InternalRealmsTests.java @@ -0,0 +1,47 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.security.authc; + +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.ThreadContext; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.watcher.ResourceWatcherService; +import org.elasticsearch.xpack.security.SecurityLifecycleService; +import org.elasticsearch.xpack.security.authc.esnative.NativeRealm; +import org.elasticsearch.xpack.security.authc.esnative.NativeUsersStore; +import org.elasticsearch.xpack.security.authc.support.mapper.NativeRoleMappingStore; +import org.elasticsearch.xpack.ssl.SSLService; + +import java.util.Map; +import java.util.function.BiConsumer; + +import static org.elasticsearch.mock.orig.Mockito.times; +import static org.hamcrest.Matchers.any; +import static org.hamcrest.Matchers.hasEntry; +import static org.hamcrest.Matchers.is; +import static org.mockito.Matchers.isA; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyZeroInteractions; + +public class InternalRealmsTests extends ESTestCase { + + public void testNativeRealmRegistersIndexHealthChangeListener() throws Exception { + SecurityLifecycleService lifecycleService = mock(SecurityLifecycleService.class); + Map factories = InternalRealms.getFactories(mock(ThreadPool.class), mock(ResourceWatcherService.class), + mock(SSLService.class), mock(NativeUsersStore.class), mock(NativeRoleMappingStore.class), lifecycleService); + assertThat(factories, hasEntry(is(NativeRealm.TYPE), any(Realm.Factory.class))); + verifyZeroInteractions(lifecycleService); + + Settings settings = Settings.builder().put("path.home", createTempDir()).build(); + factories.get(NativeRealm.TYPE).create(new RealmConfig("test", Settings.EMPTY, settings, new ThreadContext(settings))); + verify(lifecycleService).addSecurityIndexHealthChangeListener(isA(BiConsumer.class)); + + factories.get(NativeRealm.TYPE).create(new RealmConfig("test", Settings.EMPTY, settings, new ThreadContext(settings))); + verify(lifecycleService, times(2)).addSecurityIndexHealthChangeListener(isA(BiConsumer.class)); + } +} diff --git a/plugin/src/test/java/org/elasticsearch/xpack/security/authc/esnative/NativeRealmTests.java b/plugin/src/test/java/org/elasticsearch/xpack/security/authc/esnative/NativeRealmTests.java new file mode 100644 index 00000000000..eb6b3de2f51 --- /dev/null +++ b/plugin/src/test/java/org/elasticsearch/xpack/security/authc/esnative/NativeRealmTests.java @@ -0,0 +1,65 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.security.authc.esnative; + +import org.elasticsearch.cluster.health.ClusterHealthStatus; +import org.elasticsearch.cluster.health.ClusterIndexHealth; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.ThreadContext; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xpack.security.authc.RealmConfig; + +import java.util.concurrent.atomic.AtomicInteger; + +import static org.elasticsearch.xpack.security.test.SecurityTestUtils.getClusterIndexHealth; +import static org.mockito.Mockito.mock; + +public class NativeRealmTests extends ESTestCase { + + public void testCacheClearOnIndexHealthChange() { + final AtomicInteger numInvalidation = new AtomicInteger(0); + int expectedInvalidation = 0; + Settings settings = Settings.builder().put("path.home", createTempDir()).build(); + RealmConfig config = new RealmConfig("native", Settings.EMPTY, settings, new ThreadContext(settings)); + final NativeRealm nativeRealm = new NativeRealm(config, mock(NativeUsersStore.class)) { + @Override + void clearCache() { + numInvalidation.incrementAndGet(); + } + }; + + // existing to no longer present + ClusterIndexHealth previousHealth = getClusterIndexHealth(randomFrom(ClusterHealthStatus.GREEN, ClusterHealthStatus.YELLOW)); + ClusterIndexHealth currentHealth = null; + nativeRealm.onSecurityIndexHealthChange(previousHealth, currentHealth); + assertEquals(++expectedInvalidation, numInvalidation.get()); + + // doesn't exist to exists + previousHealth = null; + currentHealth = getClusterIndexHealth(randomFrom(ClusterHealthStatus.GREEN, ClusterHealthStatus.YELLOW)); + nativeRealm.onSecurityIndexHealthChange(previousHealth, currentHealth); + assertEquals(++expectedInvalidation, numInvalidation.get()); + + // green or yellow to red + previousHealth = getClusterIndexHealth(randomFrom(ClusterHealthStatus.GREEN, ClusterHealthStatus.YELLOW)); + currentHealth = getClusterIndexHealth(ClusterHealthStatus.RED); + nativeRealm.onSecurityIndexHealthChange(previousHealth, currentHealth); + assertEquals(expectedInvalidation, numInvalidation.get()); + + // red to non red + previousHealth = getClusterIndexHealth(ClusterHealthStatus.RED); + currentHealth = getClusterIndexHealth(randomFrom(ClusterHealthStatus.GREEN, ClusterHealthStatus.YELLOW)); + nativeRealm.onSecurityIndexHealthChange(previousHealth, currentHealth); + assertEquals(++expectedInvalidation, numInvalidation.get()); + + // green to yellow or yellow to green + previousHealth = getClusterIndexHealth(randomFrom(ClusterHealthStatus.GREEN, ClusterHealthStatus.YELLOW)); + currentHealth = getClusterIndexHealth( + previousHealth.getStatus() == ClusterHealthStatus.GREEN ? ClusterHealthStatus.YELLOW : ClusterHealthStatus.GREEN); + nativeRealm.onSecurityIndexHealthChange(previousHealth, currentHealth); + assertEquals(expectedInvalidation, numInvalidation.get()); + } +} diff --git a/plugin/src/test/java/org/elasticsearch/xpack/security/authz/store/CompositeRolesStoreTests.java b/plugin/src/test/java/org/elasticsearch/xpack/security/authz/store/CompositeRolesStoreTests.java index 7424db98d03..cafe3fc5269 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/security/authz/store/CompositeRolesStoreTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/security/authz/store/CompositeRolesStoreTests.java @@ -8,6 +8,8 @@ package org.elasticsearch.xpack.security.authz.store; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.cluster.health.ClusterHealthStatus; +import org.elasticsearch.cluster.health.ClusterIndexHealth; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.common.settings.Settings; @@ -31,11 +33,13 @@ import java.util.HashSet; import java.util.Map; import java.util.Set; import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiConsumer; import java.util.function.Function; import static org.elasticsearch.mock.orig.Mockito.times; import static org.elasticsearch.mock.orig.Mockito.verifyNoMoreInteractions; +import static org.elasticsearch.xpack.security.test.SecurityTestUtils.getClusterIndexHealth; import static org.hamcrest.Matchers.anyOf; import static org.hamcrest.Matchers.equalTo; import static org.mockito.Matchers.any; @@ -209,6 +213,9 @@ public class CompositeRolesStoreTests extends ESTestCase { verify(reservedRolesStore, times(2)).roleDescriptors(); } verifyNoMoreInteractions(fileRolesStore, reservedRolesStore, nativeRolesStore); + + // force a cache clear + } public void testCustomRolesProviders() { @@ -430,6 +437,51 @@ public class CompositeRolesStoreTests extends ESTestCase { assertEquals(0, role.indices().groups().length); } + public void testCacheClearOnIndexHealthChange() { + final AtomicInteger numInvalidation = new AtomicInteger(0); + + CompositeRolesStore compositeRolesStore = new CompositeRolesStore( + Settings.EMPTY, mock(FileRolesStore.class), mock(NativeRolesStore.class), mock(ReservedRolesStore.class), + Collections.emptyList(), new ThreadContext(Settings.EMPTY), new XPackLicenseState()) { + @Override + public void invalidateAll() { + numInvalidation.incrementAndGet(); + } + }; + + int expectedInvalidation = 0; + // existing to no longer present + ClusterIndexHealth previousHealth = getClusterIndexHealth(randomFrom(ClusterHealthStatus.GREEN, ClusterHealthStatus.YELLOW)); + ClusterIndexHealth currentHealth = null; + compositeRolesStore.onSecurityIndexHealthChange(previousHealth, currentHealth); + assertEquals(++expectedInvalidation, numInvalidation.get()); + + // doesn't exist to exists + previousHealth = null; + currentHealth = getClusterIndexHealth(randomFrom(ClusterHealthStatus.GREEN, ClusterHealthStatus.YELLOW)); + compositeRolesStore.onSecurityIndexHealthChange(previousHealth, currentHealth); + assertEquals(++expectedInvalidation, numInvalidation.get()); + + // green or yellow to red + previousHealth = getClusterIndexHealth(randomFrom(ClusterHealthStatus.GREEN, ClusterHealthStatus.YELLOW)); + currentHealth = getClusterIndexHealth(ClusterHealthStatus.RED); + compositeRolesStore.onSecurityIndexHealthChange(previousHealth, currentHealth); + assertEquals(expectedInvalidation, numInvalidation.get()); + + // red to non red + previousHealth = getClusterIndexHealth(ClusterHealthStatus.RED); + currentHealth = getClusterIndexHealth(randomFrom(ClusterHealthStatus.GREEN, ClusterHealthStatus.YELLOW)); + compositeRolesStore.onSecurityIndexHealthChange(previousHealth, currentHealth); + assertEquals(++expectedInvalidation, numInvalidation.get()); + + // green to yellow or yellow to green + previousHealth = getClusterIndexHealth(randomFrom(ClusterHealthStatus.GREEN, ClusterHealthStatus.YELLOW)); + currentHealth = getClusterIndexHealth( + previousHealth.getStatus() == ClusterHealthStatus.GREEN ? ClusterHealthStatus.YELLOW : ClusterHealthStatus.GREEN); + compositeRolesStore.onSecurityIndexHealthChange(previousHealth, currentHealth); + assertEquals(expectedInvalidation, numInvalidation.get()); + } + private static class InMemoryRolesProvider implements BiConsumer, ActionListener>> { private final Function, Set> roleDescriptorsFunc; diff --git a/plugin/src/test/java/org/elasticsearch/xpack/security/support/IndexLifecycleManagerTests.java b/plugin/src/test/java/org/elasticsearch/xpack/security/support/IndexLifecycleManagerTests.java index 18f2b27b986..2ebfa208411 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/security/support/IndexLifecycleManagerTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/security/support/IndexLifecycleManagerTests.java @@ -10,7 +10,10 @@ import java.util.Collections; import java.util.HashMap; import java.util.LinkedHashMap; import java.util.Map; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.BiConsumer; import org.elasticsearch.Version; import org.elasticsearch.action.Action; @@ -28,15 +31,25 @@ import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.health.ClusterHealthStatus; +import org.elasticsearch.cluster.health.ClusterIndexHealth; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexTemplateMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.routing.IndexRoutingTable; +import org.elasticsearch.cluster.routing.IndexShardRoutingTable; +import org.elasticsearch.cluster.routing.RoutingTable; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.UnassignedInfo; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.index.Index; +import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.security.InternalClient; @@ -46,11 +59,11 @@ import org.hamcrest.Matchers; import org.junit.Before; import org.mockito.Mockito; +import static org.elasticsearch.cluster.routing.RecoverySource.StoreRecoverySource.EXISTING_STORE_INSTANCE; import static org.elasticsearch.xpack.security.support.IndexLifecycleManager.NULL_MIGRATOR; import static org.elasticsearch.xpack.security.support.IndexLifecycleManager.TEMPLATE_VERSION_PATTERN; import static org.hamcrest.Matchers.notNullValue; import static org.mockito.Matchers.any; -import static org.mockito.Matchers.notNull; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -108,6 +121,17 @@ public class IndexLifecycleManagerTests extends ESTestCase { assertInitialState(); final ClusterState.Builder clusterStateBuilder = createClusterState(INDEX_NAME, TEMPLATE_NAME); + Index index = new Index(INDEX_NAME, UUID.randomUUID().toString()); + ShardRouting shardRouting = ShardRouting.newUnassigned(new ShardId(index, 0), true, EXISTING_STORE_INSTANCE, + new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "")); + String nodeId = ESTestCase.randomAlphaOfLength(8); + IndexShardRoutingTable table = new IndexShardRoutingTable.Builder(new ShardId(index, 0)) + .addShard(shardRouting.initialize(nodeId, null, shardRouting.getExpectedShardSize()) + .moveToUnassigned(new UnassignedInfo(UnassignedInfo.Reason.ALLOCATION_FAILED, ""))) + .build(); + clusterStateBuilder.routingTable(RoutingTable.builder() + .add(IndexRoutingTable.builder(index).addIndexShard(table).build()) + .build()); manager.clusterChanged(event(clusterStateBuilder)); assertIndexUpToDateButNotAvailable(); @@ -220,6 +244,87 @@ public class IndexLifecycleManagerTests extends ESTestCase { assertCompleteState(true); } + public void testIndexHealthChangeListeners() throws Exception { + final AtomicBoolean listenerCalled = new AtomicBoolean(false); + final AtomicReference previousHealth = new AtomicReference<>(); + final AtomicReference currentHealth = new AtomicReference<>(); + final BiConsumer listener = (prevState, state) -> { + previousHealth.set(prevState); + currentHealth.set(state); + listenerCalled.set(true); + }; + + if (randomBoolean()) { + if (randomBoolean()) { + manager.addIndexHealthChangeListener(listener); + manager.addIndexHealthChangeListener((prevState, state) -> { + throw new RuntimeException("throw after listener"); + }); + } else { + manager.addIndexHealthChangeListener((prevState, state) -> { + throw new RuntimeException("throw before listener"); + }); + manager.addIndexHealthChangeListener(listener); + } + } else { + manager.addIndexHealthChangeListener(listener); + } + + // index doesn't exist and now exists + final ClusterState.Builder clusterStateBuilder = createClusterState(INDEX_NAME, TEMPLATE_NAME); + markShardsAvailable(clusterStateBuilder); + manager.clusterChanged(event(clusterStateBuilder)); + + assertTrue(listenerCalled.get()); + assertNull(previousHealth.get()); + assertEquals(ClusterHealthStatus.GREEN, currentHealth.get().getStatus()); + + // reset and call with no change to the index + listenerCalled.set(false); + previousHealth.set(null); + currentHealth.set(null); + ClusterChangedEvent event = new ClusterChangedEvent("same index health", clusterStateBuilder.build(), clusterStateBuilder.build()); + manager.clusterChanged(event); + + assertFalse(listenerCalled.get()); + assertNull(previousHealth.get()); + assertNull(currentHealth.get()); + + // index with different health + listenerCalled.set(false); + previousHealth.set(null); + currentHealth.set(null); + ClusterState previousState = clusterStateBuilder.build(); + Index prevIndex = previousState.getRoutingTable().index(INDEX_NAME).getIndex(); + clusterStateBuilder.routingTable(RoutingTable.builder() + .add(IndexRoutingTable.builder(prevIndex) + .addIndexShard(new IndexShardRoutingTable.Builder(new ShardId(prevIndex, 0)) + .addShard(ShardRouting.newUnassigned(new ShardId(prevIndex, 0), true, EXISTING_STORE_INSTANCE, + new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "")) + .initialize(UUIDs.randomBase64UUID(random()), null, 0L) + .moveToUnassigned(new UnassignedInfo(UnassignedInfo.Reason.ALLOCATION_FAILED, ""))) + .build())) + .build()); + + + + event = new ClusterChangedEvent("different index health", clusterStateBuilder.build(), previousState); + manager.clusterChanged(event); + assertTrue(listenerCalled.get()); + assertEquals(ClusterHealthStatus.GREEN, previousHealth.get().getStatus()); + assertEquals(ClusterHealthStatus.RED, currentHealth.get().getStatus()); + + // swap prev and current + listenerCalled.set(false); + previousHealth.set(null); + currentHealth.set(null); + event = new ClusterChangedEvent("different index health swapped", previousState, clusterStateBuilder.build()); + manager.clusterChanged(event); + assertTrue(listenerCalled.get()); + assertEquals(ClusterHealthStatus.RED, previousHealth.get().getStatus()); + assertEquals(ClusterHealthStatus.GREEN, currentHealth.get().getStatus()); + } + private void assertInitialState() { assertThat(manager.indexExists(), Matchers.equalTo(false)); assertThat(manager.isAvailable(), Matchers.equalTo(false)); @@ -368,5 +473,4 @@ public class IndexLifecycleManagerTests extends ESTestCase { final String resource = "/" + templateName + ".json"; return TemplateUtils.loadTemplate(resource, Version.CURRENT.toString(), TEMPLATE_VERSION_PATTERN); } - } \ No newline at end of file diff --git a/plugin/src/test/java/org/elasticsearch/xpack/security/test/SecurityTestUtils.java b/plugin/src/test/java/org/elasticsearch/xpack/security/test/SecurityTestUtils.java index 115cde86a7f..0b109874061 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/security/test/SecurityTestUtils.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/security/test/SecurityTestUtils.java @@ -6,16 +6,21 @@ package org.elasticsearch.xpack.security.test; import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.Version; +import org.elasticsearch.cluster.health.ClusterHealthStatus; +import org.elasticsearch.cluster.health.ClusterIndexHealth; import org.elasticsearch.cluster.metadata.AliasMetaData; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.cluster.routing.IndexShardRoutingTable; +import org.elasticsearch.cluster.routing.RecoverySource; import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.UnassignedInfo; import org.elasticsearch.common.io.FileSystemUtils; import org.elasticsearch.common.io.Streams; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.Index; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.test.ESTestCase; @@ -29,6 +34,7 @@ import java.util.UUID; import static org.elasticsearch.cluster.routing.RecoverySource.StoreRecoverySource.EXISTING_STORE_INSTANCE; import static org.elasticsearch.xpack.security.SecurityLifecycleService.SECURITY_INDEX_NAME; +import static org.junit.Assert.assertEquals; public class SecurityTestUtils { @@ -87,4 +93,59 @@ public class SecurityTestUtils { return metaDataBuilder.build(); } + public static ClusterIndexHealth getClusterIndexHealth(ClusterHealthStatus status) { + IndexMetaData metaData = IndexMetaData.builder("foo").settings(Settings.builder() + .put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .build()) + .build(); + final IndexRoutingTable routingTable; + switch (status) { + case RED: + routingTable = IndexRoutingTable.builder(metaData.getIndex()) + .addIndexShard(new IndexShardRoutingTable.Builder(new ShardId(metaData.getIndex(), 0)) + .addShard(ShardRouting.newUnassigned(new ShardId(metaData.getIndex(), 0), true, EXISTING_STORE_INSTANCE, + new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "")) + .initialize(ESTestCase.randomAlphaOfLength(8), null, 0L)) + .addShard(ShardRouting.newUnassigned(new ShardId(metaData.getIndex(), 0), false, + RecoverySource.PeerRecoverySource.INSTANCE, + new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "")) + .initialize(ESTestCase.randomAlphaOfLength(8), null, 0L)) + .build()) + .build(); + break; + case YELLOW: + routingTable = IndexRoutingTable.builder(metaData.getIndex()) + .addIndexShard(new IndexShardRoutingTable.Builder(new ShardId(metaData.getIndex(), 0)) + .addShard(ShardRouting.newUnassigned(new ShardId(metaData.getIndex(), 0), true, EXISTING_STORE_INSTANCE, + new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "")) + .initialize(ESTestCase.randomAlphaOfLength(8), null, 0L).moveToStarted()) + .addShard(ShardRouting.newUnassigned(new ShardId(metaData.getIndex(), 0), false, + RecoverySource.PeerRecoverySource.INSTANCE, + new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "")) + .initialize(ESTestCase.randomAlphaOfLength(8), null, 0L)) + .build()) + .build(); + break; + case GREEN: + routingTable = IndexRoutingTable.builder(metaData.getIndex()) + .addIndexShard(new IndexShardRoutingTable.Builder(new ShardId(metaData.getIndex(), 0)) + .addShard(ShardRouting.newUnassigned(new ShardId(metaData.getIndex(), 0), true, EXISTING_STORE_INSTANCE, + new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "")) + .initialize(ESTestCase.randomAlphaOfLength(8), null, 0L).moveToStarted()) + .addShard(ShardRouting.newUnassigned(new ShardId(metaData.getIndex(), 0), false, + RecoverySource.PeerRecoverySource.INSTANCE, + new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "")) + .initialize(ESTestCase.randomAlphaOfLength(8), null, 0L).moveToStarted()) + .build()) + .build(); + break; + default: + throw new IllegalStateException("unknown status: " + status); + } + ClusterIndexHealth health = new ClusterIndexHealth(metaData, routingTable); + assertEquals(status, health.getStatus()); + return health; + } }