diff --git a/plugin/src/main/java/org/elasticsearch/xpack/security/Security.java b/plugin/src/main/java/org/elasticsearch/xpack/security/Security.java index 21b8479d155..21af90b93fe 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/security/Security.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/security/Security.java @@ -323,8 +323,8 @@ public class Security implements ActionPlugin, IngestPlugin, NetworkPlugin { final ReservedRealm reservedRealm = new ReservedRealm(env, settings, nativeUsersStore, anonymousUser, securityLifecycleService, threadPool.getThreadContext()); Map realmFactories = new HashMap<>(); - realmFactories.putAll(InternalRealms.getFactories(threadPool, resourceWatcherService, - sslService, nativeUsersStore, nativeRoleMappingStore)); + realmFactories.putAll(InternalRealms.getFactories(threadPool, resourceWatcherService, sslService, nativeUsersStore, + nativeRoleMappingStore, securityLifecycleService)); for (XPackExtension extension : extensions) { Map newRealms = extension.getRealms(resourceWatcherService); for (Map.Entry entry : newRealms.entrySet()) { @@ -369,6 +369,7 @@ public class Security implements ActionPlugin, IngestPlugin, NetworkPlugin { } final CompositeRolesStore allRolesStore = new CompositeRolesStore(settings, fileRolesStore, nativeRolesStore, reservedRolesStore, rolesProviders, threadPool.getThreadContext(), licenseState); + securityLifecycleService.addSecurityIndexHealthChangeListener(allRolesStore::onSecurityIndexHealthChange); // to keep things simple, just invalidate all cached entries on license change. this happens so rarely that the impact should be // minimal licenseState.addListener(allRolesStore::invalidateAll); diff --git a/plugin/src/main/java/org/elasticsearch/xpack/security/SecurityLifecycleService.java b/plugin/src/main/java/org/elasticsearch/xpack/security/SecurityLifecycleService.java index 8a527d87029..e4e1cd61518 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/security/SecurityLifecycleService.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/security/SecurityLifecycleService.java @@ -11,6 +11,7 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateListener; +import org.elasticsearch.cluster.health.ClusterIndexHealth; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.component.LifecycleListener; @@ -26,6 +27,7 @@ import org.elasticsearch.xpack.security.support.IndexLifecycleManager; import java.util.Collections; import java.util.List; +import java.util.function.BiConsumer; import java.util.function.Predicate; /** @@ -147,6 +149,15 @@ public class SecurityLifecycleService extends AbstractComponent implements Clust return securityIndex.checkMappingVersion(requiredVersion); } + /** + * Adds a listener which will be notified when the security index health changes. The previous and + * current health will be provided to the listener so that the listener can determine if any action + * needs to be taken. + */ + public void addSecurityIndexHealthChangeListener(BiConsumer listener) { + securityIndex.addIndexHealthChangeListener(listener); + } + // this is called in a lifecycle listener beforeStop on the cluster service private void close() { if (indexAuditTrail != null) { diff --git a/plugin/src/main/java/org/elasticsearch/xpack/security/authc/InternalRealms.java b/plugin/src/main/java/org/elasticsearch/xpack/security/authc/InternalRealms.java index 0eded110335..27e237c94a0 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/security/authc/InternalRealms.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/security/authc/InternalRealms.java @@ -19,6 +19,7 @@ import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.watcher.ResourceWatcherService; +import org.elasticsearch.xpack.security.SecurityLifecycleService; import org.elasticsearch.xpack.security.authc.esnative.NativeRealm; import org.elasticsearch.xpack.security.authc.esnative.NativeUsersStore; import org.elasticsearch.xpack.security.authc.esnative.ReservedRealm; @@ -62,14 +63,18 @@ public class InternalRealms { * This excludes the {@link ReservedRealm}, as it cannot be created dynamically. * @return A map from realm-type to Factory */ - public static Map getFactories( - ThreadPool threadPool, ResourceWatcherService resourceWatcherService, - SSLService sslService, NativeUsersStore nativeUsersStore, - NativeRoleMappingStore nativeRoleMappingStore) { + public static Map getFactories(ThreadPool threadPool, ResourceWatcherService resourceWatcherService, + SSLService sslService, NativeUsersStore nativeUsersStore, + NativeRoleMappingStore nativeRoleMappingStore, + SecurityLifecycleService securityLifecycleService) { Map map = new HashMap<>(); map.put(FileRealm.TYPE, config -> new FileRealm(config, resourceWatcherService)); - map.put(NativeRealm.TYPE, config -> new NativeRealm(config, nativeUsersStore)); + map.put(NativeRealm.TYPE, config -> { + final NativeRealm nativeRealm = new NativeRealm(config, nativeUsersStore); + securityLifecycleService.addSecurityIndexHealthChangeListener(nativeRealm::onSecurityIndexHealthChange); + return nativeRealm; + }); map.put(LdapRealm.AD_TYPE, config -> new LdapRealm(LdapRealm.AD_TYPE, config, sslService, resourceWatcherService, nativeRoleMappingStore, threadPool)); map.put(LdapRealm.LDAP_TYPE, config -> new LdapRealm(LdapRealm.LDAP_TYPE, config, diff --git a/plugin/src/main/java/org/elasticsearch/xpack/security/authc/esnative/NativeRealm.java b/plugin/src/main/java/org/elasticsearch/xpack/security/authc/esnative/NativeRealm.java index 0e7736018d5..85c7aa11e21 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/security/authc/esnative/NativeRealm.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/security/authc/esnative/NativeRealm.java @@ -6,6 +6,8 @@ package org.elasticsearch.xpack.security.authc.esnative; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.cluster.health.ClusterHealthStatus; +import org.elasticsearch.cluster.health.ClusterIndexHealth; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.xpack.security.authc.AuthenticationResult; import org.elasticsearch.xpack.security.authc.RealmConfig; @@ -39,6 +41,21 @@ public class NativeRealm extends CachingUsernamePasswordRealm { userStore.verifyPassword(token.principal(), token.credentials(), listener); } + public void onSecurityIndexHealthChange(ClusterIndexHealth previousHealth, ClusterIndexHealth currentHealth) { + final boolean movedFromRedToNonRed = (previousHealth == null || previousHealth.getStatus() == ClusterHealthStatus.RED) + && currentHealth != null && currentHealth.getStatus() != ClusterHealthStatus.RED; + final boolean indexDeleted = previousHealth != null && currentHealth == null; + + if (movedFromRedToNonRed || indexDeleted) { + clearCache(); + } + } + + // method is used for testing to verify cache expiration since expireAll is final + void clearCache() { + expireAll(); + } + /** * @return The {@link Setting setting configuration} for this realm type */ diff --git a/plugin/src/main/java/org/elasticsearch/xpack/security/authz/store/CompositeRolesStore.java b/plugin/src/main/java/org/elasticsearch/xpack/security/authz/store/CompositeRolesStore.java index 8c6c69bcfa2..e3835c5e219 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/security/authz/store/CompositeRolesStore.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/security/authz/store/CompositeRolesStore.java @@ -6,6 +6,8 @@ package org.elasticsearch.xpack.security.authz.store; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.cluster.health.ClusterHealthStatus; +import org.elasticsearch.cluster.health.ClusterIndexHealth; import org.elasticsearch.common.Strings; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.cache.Cache; @@ -83,8 +85,7 @@ public class CompositeRolesStore extends AbstractComponent { public CompositeRolesStore(Settings settings, FileRolesStore fileRolesStore, NativeRolesStore nativeRolesStore, ReservedRolesStore reservedRolesStore, List, ActionListener>>> rolesProviders, - ThreadContext threadContext, - XPackLicenseState licenseState) { + ThreadContext threadContext, XPackLicenseState licenseState) { super(settings); this.fileRolesStore = fileRolesStore; // invalidating all on a file based role update is heavy handed to say the least, but in general this should be infrequent so the @@ -289,6 +290,16 @@ public class CompositeRolesStore extends AbstractComponent { }, listener::onFailure)); } + public void onSecurityIndexHealthChange(ClusterIndexHealth previousHealth, ClusterIndexHealth currentHealth) { + final boolean movedFromRedToNonRed = (previousHealth == null || previousHealth.getStatus() == ClusterHealthStatus.RED) + && currentHealth != null && currentHealth.getStatus() != ClusterHealthStatus.RED; + final boolean indexDeleted = previousHealth != null && currentHealth == null; + + if (movedFromRedToNonRed || indexDeleted) { + invalidateAll(); + } + } + /** * A mutable class that can be used to represent the combination of one or more {@link IndicesPrivileges} */ diff --git a/plugin/src/main/java/org/elasticsearch/xpack/security/support/IndexLifecycleManager.java b/plugin/src/main/java/org/elasticsearch/xpack/security/support/IndexLifecycleManager.java index a229ea3bb12..04cf5f432f1 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/security/support/IndexLifecycleManager.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/security/support/IndexLifecycleManager.java @@ -5,15 +5,16 @@ */ package org.elasticsearch.xpack.security.support; -import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.BiConsumer; import java.util.function.Predicate; import java.util.regex.Pattern; import java.util.stream.Collectors; @@ -34,6 +35,7 @@ import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateReque import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateResponse; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.health.ClusterIndexHealth; import org.elasticsearch.cluster.metadata.AliasOrIndex; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MappingMetaData; @@ -82,6 +84,8 @@ public class IndexLifecycleManager extends AbstractComponent { private final AtomicReference migrateDataState = new AtomicReference<>(UpgradeState.NOT_STARTED); private final AtomicInteger migrateDataAttempts = new AtomicInteger(0); + private final List> indexHealthChangeListeners = new CopyOnWriteArrayList<>(); + private volatile boolean templateIsUpToDate; private volatile boolean indexExists; private volatile boolean isIndexUpToDate; @@ -155,9 +159,18 @@ public class IndexLifecycleManager extends AbstractComponent { return this.migrateDataState.get(); } + /** + * Adds a listener which will be notified when the security index health changes. The previous and + * current health will be provided to the listener so that the listener can determine if any action + * needs to be taken. + */ + public void addIndexHealthChangeListener(BiConsumer listener) { + indexHealthChangeListeners.add(listener); + } + public void clusterChanged(ClusterChangedEvent event) { - final ClusterState state = event.state(); - processClusterState(state); + processClusterState(event.state()); + checkIndexHealthChange(event); } private void processClusterState(ClusterState state) { @@ -183,6 +196,37 @@ public class IndexLifecycleManager extends AbstractComponent { } } + private void checkIndexHealthChange(ClusterChangedEvent event) { + final ClusterState state = event.state(); + final ClusterState previousState = event.previousState(); + final IndexMetaData indexMetaData = resolveConcreteIndex(indexName, state.metaData()); + final IndexMetaData previousIndexMetaData = resolveConcreteIndex(indexName, previousState.metaData()); + if (indexMetaData != null) { + final ClusterIndexHealth currentHealth = + new ClusterIndexHealth(indexMetaData, state.getRoutingTable().index(indexMetaData.getIndex())); + final ClusterIndexHealth previousHealth = previousIndexMetaData != null ? new ClusterIndexHealth(previousIndexMetaData, + previousState.getRoutingTable().index(previousIndexMetaData.getIndex())) : null; + + if (previousHealth == null || previousHealth.getStatus() != currentHealth.getStatus()) { + notifyIndexHealthChangeListeners(previousHealth, currentHealth); + } + } else if (previousIndexMetaData != null) { + final ClusterIndexHealth previousHealth = + new ClusterIndexHealth(previousIndexMetaData, previousState.getRoutingTable().index(previousIndexMetaData.getIndex())); + notifyIndexHealthChangeListeners(previousHealth, null); + } + } + + private void notifyIndexHealthChangeListeners(ClusterIndexHealth previousHealth, ClusterIndexHealth currentHealth) { + for (BiConsumer consumer : indexHealthChangeListeners) { + try { + consumer.accept(previousHealth, currentHealth); + } catch (Exception e) { + logger.warn(new ParameterizedMessage("failed to notify listener [{}] of index health change", consumer), e); + } + } + } + private boolean checkIndexAvailable(ClusterState state) { final IndexRoutingTable routingTable = getIndexRoutingTable(state); if (routingTable != null && routingTable.allPrimaryShardsActive()) { diff --git a/plugin/src/test/java/org/elasticsearch/xpack/security/SecurityLifecycleServiceTests.java b/plugin/src/test/java/org/elasticsearch/xpack/security/SecurityLifecycleServiceTests.java index d822c1efaa2..a0dc3d5ed4e 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/security/SecurityLifecycleServiceTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/security/SecurityLifecycleServiceTests.java @@ -395,6 +395,7 @@ public class SecurityLifecycleServiceTests extends ESTestCase { clusterStateBuilder.metaData(metaDataBuilder); } + clusterStateBuilder.routingTable(SecurityTestUtils.buildIndexRoutingTable(securityIndexName)); return clusterStateBuilder; } diff --git a/plugin/src/test/java/org/elasticsearch/xpack/security/authc/InternalRealmsTests.java b/plugin/src/test/java/org/elasticsearch/xpack/security/authc/InternalRealmsTests.java new file mode 100644 index 00000000000..e68b709a280 --- /dev/null +++ b/plugin/src/test/java/org/elasticsearch/xpack/security/authc/InternalRealmsTests.java @@ -0,0 +1,47 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.security.authc; + +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.ThreadContext; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.watcher.ResourceWatcherService; +import org.elasticsearch.xpack.security.SecurityLifecycleService; +import org.elasticsearch.xpack.security.authc.esnative.NativeRealm; +import org.elasticsearch.xpack.security.authc.esnative.NativeUsersStore; +import org.elasticsearch.xpack.security.authc.support.mapper.NativeRoleMappingStore; +import org.elasticsearch.xpack.ssl.SSLService; + +import java.util.Map; +import java.util.function.BiConsumer; + +import static org.elasticsearch.mock.orig.Mockito.times; +import static org.hamcrest.Matchers.any; +import static org.hamcrest.Matchers.hasEntry; +import static org.hamcrest.Matchers.is; +import static org.mockito.Matchers.isA; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyZeroInteractions; + +public class InternalRealmsTests extends ESTestCase { + + public void testNativeRealmRegistersIndexHealthChangeListener() throws Exception { + SecurityLifecycleService lifecycleService = mock(SecurityLifecycleService.class); + Map factories = InternalRealms.getFactories(mock(ThreadPool.class), mock(ResourceWatcherService.class), + mock(SSLService.class), mock(NativeUsersStore.class), mock(NativeRoleMappingStore.class), lifecycleService); + assertThat(factories, hasEntry(is(NativeRealm.TYPE), any(Realm.Factory.class))); + verifyZeroInteractions(lifecycleService); + + Settings settings = Settings.builder().put("path.home", createTempDir()).build(); + factories.get(NativeRealm.TYPE).create(new RealmConfig("test", Settings.EMPTY, settings, new ThreadContext(settings))); + verify(lifecycleService).addSecurityIndexHealthChangeListener(isA(BiConsumer.class)); + + factories.get(NativeRealm.TYPE).create(new RealmConfig("test", Settings.EMPTY, settings, new ThreadContext(settings))); + verify(lifecycleService, times(2)).addSecurityIndexHealthChangeListener(isA(BiConsumer.class)); + } +} diff --git a/plugin/src/test/java/org/elasticsearch/xpack/security/authc/esnative/NativeRealmTests.java b/plugin/src/test/java/org/elasticsearch/xpack/security/authc/esnative/NativeRealmTests.java new file mode 100644 index 00000000000..eb6b3de2f51 --- /dev/null +++ b/plugin/src/test/java/org/elasticsearch/xpack/security/authc/esnative/NativeRealmTests.java @@ -0,0 +1,65 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.security.authc.esnative; + +import org.elasticsearch.cluster.health.ClusterHealthStatus; +import org.elasticsearch.cluster.health.ClusterIndexHealth; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.ThreadContext; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xpack.security.authc.RealmConfig; + +import java.util.concurrent.atomic.AtomicInteger; + +import static org.elasticsearch.xpack.security.test.SecurityTestUtils.getClusterIndexHealth; +import static org.mockito.Mockito.mock; + +public class NativeRealmTests extends ESTestCase { + + public void testCacheClearOnIndexHealthChange() { + final AtomicInteger numInvalidation = new AtomicInteger(0); + int expectedInvalidation = 0; + Settings settings = Settings.builder().put("path.home", createTempDir()).build(); + RealmConfig config = new RealmConfig("native", Settings.EMPTY, settings, new ThreadContext(settings)); + final NativeRealm nativeRealm = new NativeRealm(config, mock(NativeUsersStore.class)) { + @Override + void clearCache() { + numInvalidation.incrementAndGet(); + } + }; + + // existing to no longer present + ClusterIndexHealth previousHealth = getClusterIndexHealth(randomFrom(ClusterHealthStatus.GREEN, ClusterHealthStatus.YELLOW)); + ClusterIndexHealth currentHealth = null; + nativeRealm.onSecurityIndexHealthChange(previousHealth, currentHealth); + assertEquals(++expectedInvalidation, numInvalidation.get()); + + // doesn't exist to exists + previousHealth = null; + currentHealth = getClusterIndexHealth(randomFrom(ClusterHealthStatus.GREEN, ClusterHealthStatus.YELLOW)); + nativeRealm.onSecurityIndexHealthChange(previousHealth, currentHealth); + assertEquals(++expectedInvalidation, numInvalidation.get()); + + // green or yellow to red + previousHealth = getClusterIndexHealth(randomFrom(ClusterHealthStatus.GREEN, ClusterHealthStatus.YELLOW)); + currentHealth = getClusterIndexHealth(ClusterHealthStatus.RED); + nativeRealm.onSecurityIndexHealthChange(previousHealth, currentHealth); + assertEquals(expectedInvalidation, numInvalidation.get()); + + // red to non red + previousHealth = getClusterIndexHealth(ClusterHealthStatus.RED); + currentHealth = getClusterIndexHealth(randomFrom(ClusterHealthStatus.GREEN, ClusterHealthStatus.YELLOW)); + nativeRealm.onSecurityIndexHealthChange(previousHealth, currentHealth); + assertEquals(++expectedInvalidation, numInvalidation.get()); + + // green to yellow or yellow to green + previousHealth = getClusterIndexHealth(randomFrom(ClusterHealthStatus.GREEN, ClusterHealthStatus.YELLOW)); + currentHealth = getClusterIndexHealth( + previousHealth.getStatus() == ClusterHealthStatus.GREEN ? ClusterHealthStatus.YELLOW : ClusterHealthStatus.GREEN); + nativeRealm.onSecurityIndexHealthChange(previousHealth, currentHealth); + assertEquals(expectedInvalidation, numInvalidation.get()); + } +} diff --git a/plugin/src/test/java/org/elasticsearch/xpack/security/authz/store/CompositeRolesStoreTests.java b/plugin/src/test/java/org/elasticsearch/xpack/security/authz/store/CompositeRolesStoreTests.java index 7424db98d03..cafe3fc5269 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/security/authz/store/CompositeRolesStoreTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/security/authz/store/CompositeRolesStoreTests.java @@ -8,6 +8,8 @@ package org.elasticsearch.xpack.security.authz.store; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.cluster.health.ClusterHealthStatus; +import org.elasticsearch.cluster.health.ClusterIndexHealth; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.common.settings.Settings; @@ -31,11 +33,13 @@ import java.util.HashSet; import java.util.Map; import java.util.Set; import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiConsumer; import java.util.function.Function; import static org.elasticsearch.mock.orig.Mockito.times; import static org.elasticsearch.mock.orig.Mockito.verifyNoMoreInteractions; +import static org.elasticsearch.xpack.security.test.SecurityTestUtils.getClusterIndexHealth; import static org.hamcrest.Matchers.anyOf; import static org.hamcrest.Matchers.equalTo; import static org.mockito.Matchers.any; @@ -209,6 +213,9 @@ public class CompositeRolesStoreTests extends ESTestCase { verify(reservedRolesStore, times(2)).roleDescriptors(); } verifyNoMoreInteractions(fileRolesStore, reservedRolesStore, nativeRolesStore); + + // force a cache clear + } public void testCustomRolesProviders() { @@ -430,6 +437,51 @@ public class CompositeRolesStoreTests extends ESTestCase { assertEquals(0, role.indices().groups().length); } + public void testCacheClearOnIndexHealthChange() { + final AtomicInteger numInvalidation = new AtomicInteger(0); + + CompositeRolesStore compositeRolesStore = new CompositeRolesStore( + Settings.EMPTY, mock(FileRolesStore.class), mock(NativeRolesStore.class), mock(ReservedRolesStore.class), + Collections.emptyList(), new ThreadContext(Settings.EMPTY), new XPackLicenseState()) { + @Override + public void invalidateAll() { + numInvalidation.incrementAndGet(); + } + }; + + int expectedInvalidation = 0; + // existing to no longer present + ClusterIndexHealth previousHealth = getClusterIndexHealth(randomFrom(ClusterHealthStatus.GREEN, ClusterHealthStatus.YELLOW)); + ClusterIndexHealth currentHealth = null; + compositeRolesStore.onSecurityIndexHealthChange(previousHealth, currentHealth); + assertEquals(++expectedInvalidation, numInvalidation.get()); + + // doesn't exist to exists + previousHealth = null; + currentHealth = getClusterIndexHealth(randomFrom(ClusterHealthStatus.GREEN, ClusterHealthStatus.YELLOW)); + compositeRolesStore.onSecurityIndexHealthChange(previousHealth, currentHealth); + assertEquals(++expectedInvalidation, numInvalidation.get()); + + // green or yellow to red + previousHealth = getClusterIndexHealth(randomFrom(ClusterHealthStatus.GREEN, ClusterHealthStatus.YELLOW)); + currentHealth = getClusterIndexHealth(ClusterHealthStatus.RED); + compositeRolesStore.onSecurityIndexHealthChange(previousHealth, currentHealth); + assertEquals(expectedInvalidation, numInvalidation.get()); + + // red to non red + previousHealth = getClusterIndexHealth(ClusterHealthStatus.RED); + currentHealth = getClusterIndexHealth(randomFrom(ClusterHealthStatus.GREEN, ClusterHealthStatus.YELLOW)); + compositeRolesStore.onSecurityIndexHealthChange(previousHealth, currentHealth); + assertEquals(++expectedInvalidation, numInvalidation.get()); + + // green to yellow or yellow to green + previousHealth = getClusterIndexHealth(randomFrom(ClusterHealthStatus.GREEN, ClusterHealthStatus.YELLOW)); + currentHealth = getClusterIndexHealth( + previousHealth.getStatus() == ClusterHealthStatus.GREEN ? ClusterHealthStatus.YELLOW : ClusterHealthStatus.GREEN); + compositeRolesStore.onSecurityIndexHealthChange(previousHealth, currentHealth); + assertEquals(expectedInvalidation, numInvalidation.get()); + } + private static class InMemoryRolesProvider implements BiConsumer, ActionListener>> { private final Function, Set> roleDescriptorsFunc; diff --git a/plugin/src/test/java/org/elasticsearch/xpack/security/support/IndexLifecycleManagerTests.java b/plugin/src/test/java/org/elasticsearch/xpack/security/support/IndexLifecycleManagerTests.java index 18f2b27b986..2ebfa208411 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/security/support/IndexLifecycleManagerTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/security/support/IndexLifecycleManagerTests.java @@ -10,7 +10,10 @@ import java.util.Collections; import java.util.HashMap; import java.util.LinkedHashMap; import java.util.Map; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.BiConsumer; import org.elasticsearch.Version; import org.elasticsearch.action.Action; @@ -28,15 +31,25 @@ import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.health.ClusterHealthStatus; +import org.elasticsearch.cluster.health.ClusterIndexHealth; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexTemplateMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.routing.IndexRoutingTable; +import org.elasticsearch.cluster.routing.IndexShardRoutingTable; +import org.elasticsearch.cluster.routing.RoutingTable; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.UnassignedInfo; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.index.Index; +import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.security.InternalClient; @@ -46,11 +59,11 @@ import org.hamcrest.Matchers; import org.junit.Before; import org.mockito.Mockito; +import static org.elasticsearch.cluster.routing.RecoverySource.StoreRecoverySource.EXISTING_STORE_INSTANCE; import static org.elasticsearch.xpack.security.support.IndexLifecycleManager.NULL_MIGRATOR; import static org.elasticsearch.xpack.security.support.IndexLifecycleManager.TEMPLATE_VERSION_PATTERN; import static org.hamcrest.Matchers.notNullValue; import static org.mockito.Matchers.any; -import static org.mockito.Matchers.notNull; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -108,6 +121,17 @@ public class IndexLifecycleManagerTests extends ESTestCase { assertInitialState(); final ClusterState.Builder clusterStateBuilder = createClusterState(INDEX_NAME, TEMPLATE_NAME); + Index index = new Index(INDEX_NAME, UUID.randomUUID().toString()); + ShardRouting shardRouting = ShardRouting.newUnassigned(new ShardId(index, 0), true, EXISTING_STORE_INSTANCE, + new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "")); + String nodeId = ESTestCase.randomAlphaOfLength(8); + IndexShardRoutingTable table = new IndexShardRoutingTable.Builder(new ShardId(index, 0)) + .addShard(shardRouting.initialize(nodeId, null, shardRouting.getExpectedShardSize()) + .moveToUnassigned(new UnassignedInfo(UnassignedInfo.Reason.ALLOCATION_FAILED, ""))) + .build(); + clusterStateBuilder.routingTable(RoutingTable.builder() + .add(IndexRoutingTable.builder(index).addIndexShard(table).build()) + .build()); manager.clusterChanged(event(clusterStateBuilder)); assertIndexUpToDateButNotAvailable(); @@ -220,6 +244,87 @@ public class IndexLifecycleManagerTests extends ESTestCase { assertCompleteState(true); } + public void testIndexHealthChangeListeners() throws Exception { + final AtomicBoolean listenerCalled = new AtomicBoolean(false); + final AtomicReference previousHealth = new AtomicReference<>(); + final AtomicReference currentHealth = new AtomicReference<>(); + final BiConsumer listener = (prevState, state) -> { + previousHealth.set(prevState); + currentHealth.set(state); + listenerCalled.set(true); + }; + + if (randomBoolean()) { + if (randomBoolean()) { + manager.addIndexHealthChangeListener(listener); + manager.addIndexHealthChangeListener((prevState, state) -> { + throw new RuntimeException("throw after listener"); + }); + } else { + manager.addIndexHealthChangeListener((prevState, state) -> { + throw new RuntimeException("throw before listener"); + }); + manager.addIndexHealthChangeListener(listener); + } + } else { + manager.addIndexHealthChangeListener(listener); + } + + // index doesn't exist and now exists + final ClusterState.Builder clusterStateBuilder = createClusterState(INDEX_NAME, TEMPLATE_NAME); + markShardsAvailable(clusterStateBuilder); + manager.clusterChanged(event(clusterStateBuilder)); + + assertTrue(listenerCalled.get()); + assertNull(previousHealth.get()); + assertEquals(ClusterHealthStatus.GREEN, currentHealth.get().getStatus()); + + // reset and call with no change to the index + listenerCalled.set(false); + previousHealth.set(null); + currentHealth.set(null); + ClusterChangedEvent event = new ClusterChangedEvent("same index health", clusterStateBuilder.build(), clusterStateBuilder.build()); + manager.clusterChanged(event); + + assertFalse(listenerCalled.get()); + assertNull(previousHealth.get()); + assertNull(currentHealth.get()); + + // index with different health + listenerCalled.set(false); + previousHealth.set(null); + currentHealth.set(null); + ClusterState previousState = clusterStateBuilder.build(); + Index prevIndex = previousState.getRoutingTable().index(INDEX_NAME).getIndex(); + clusterStateBuilder.routingTable(RoutingTable.builder() + .add(IndexRoutingTable.builder(prevIndex) + .addIndexShard(new IndexShardRoutingTable.Builder(new ShardId(prevIndex, 0)) + .addShard(ShardRouting.newUnassigned(new ShardId(prevIndex, 0), true, EXISTING_STORE_INSTANCE, + new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "")) + .initialize(UUIDs.randomBase64UUID(random()), null, 0L) + .moveToUnassigned(new UnassignedInfo(UnassignedInfo.Reason.ALLOCATION_FAILED, ""))) + .build())) + .build()); + + + + event = new ClusterChangedEvent("different index health", clusterStateBuilder.build(), previousState); + manager.clusterChanged(event); + assertTrue(listenerCalled.get()); + assertEquals(ClusterHealthStatus.GREEN, previousHealth.get().getStatus()); + assertEquals(ClusterHealthStatus.RED, currentHealth.get().getStatus()); + + // swap prev and current + listenerCalled.set(false); + previousHealth.set(null); + currentHealth.set(null); + event = new ClusterChangedEvent("different index health swapped", previousState, clusterStateBuilder.build()); + manager.clusterChanged(event); + assertTrue(listenerCalled.get()); + assertEquals(ClusterHealthStatus.RED, previousHealth.get().getStatus()); + assertEquals(ClusterHealthStatus.GREEN, currentHealth.get().getStatus()); + } + private void assertInitialState() { assertThat(manager.indexExists(), Matchers.equalTo(false)); assertThat(manager.isAvailable(), Matchers.equalTo(false)); @@ -368,5 +473,4 @@ public class IndexLifecycleManagerTests extends ESTestCase { final String resource = "/" + templateName + ".json"; return TemplateUtils.loadTemplate(resource, Version.CURRENT.toString(), TEMPLATE_VERSION_PATTERN); } - } \ No newline at end of file diff --git a/plugin/src/test/java/org/elasticsearch/xpack/security/test/SecurityTestUtils.java b/plugin/src/test/java/org/elasticsearch/xpack/security/test/SecurityTestUtils.java index 115cde86a7f..0b109874061 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/security/test/SecurityTestUtils.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/security/test/SecurityTestUtils.java @@ -6,16 +6,21 @@ package org.elasticsearch.xpack.security.test; import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.Version; +import org.elasticsearch.cluster.health.ClusterHealthStatus; +import org.elasticsearch.cluster.health.ClusterIndexHealth; import org.elasticsearch.cluster.metadata.AliasMetaData; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.cluster.routing.IndexShardRoutingTable; +import org.elasticsearch.cluster.routing.RecoverySource; import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.UnassignedInfo; import org.elasticsearch.common.io.FileSystemUtils; import org.elasticsearch.common.io.Streams; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.Index; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.test.ESTestCase; @@ -29,6 +34,7 @@ import java.util.UUID; import static org.elasticsearch.cluster.routing.RecoverySource.StoreRecoverySource.EXISTING_STORE_INSTANCE; import static org.elasticsearch.xpack.security.SecurityLifecycleService.SECURITY_INDEX_NAME; +import static org.junit.Assert.assertEquals; public class SecurityTestUtils { @@ -87,4 +93,59 @@ public class SecurityTestUtils { return metaDataBuilder.build(); } + public static ClusterIndexHealth getClusterIndexHealth(ClusterHealthStatus status) { + IndexMetaData metaData = IndexMetaData.builder("foo").settings(Settings.builder() + .put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .build()) + .build(); + final IndexRoutingTable routingTable; + switch (status) { + case RED: + routingTable = IndexRoutingTable.builder(metaData.getIndex()) + .addIndexShard(new IndexShardRoutingTable.Builder(new ShardId(metaData.getIndex(), 0)) + .addShard(ShardRouting.newUnassigned(new ShardId(metaData.getIndex(), 0), true, EXISTING_STORE_INSTANCE, + new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "")) + .initialize(ESTestCase.randomAlphaOfLength(8), null, 0L)) + .addShard(ShardRouting.newUnassigned(new ShardId(metaData.getIndex(), 0), false, + RecoverySource.PeerRecoverySource.INSTANCE, + new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "")) + .initialize(ESTestCase.randomAlphaOfLength(8), null, 0L)) + .build()) + .build(); + break; + case YELLOW: + routingTable = IndexRoutingTable.builder(metaData.getIndex()) + .addIndexShard(new IndexShardRoutingTable.Builder(new ShardId(metaData.getIndex(), 0)) + .addShard(ShardRouting.newUnassigned(new ShardId(metaData.getIndex(), 0), true, EXISTING_STORE_INSTANCE, + new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "")) + .initialize(ESTestCase.randomAlphaOfLength(8), null, 0L).moveToStarted()) + .addShard(ShardRouting.newUnassigned(new ShardId(metaData.getIndex(), 0), false, + RecoverySource.PeerRecoverySource.INSTANCE, + new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "")) + .initialize(ESTestCase.randomAlphaOfLength(8), null, 0L)) + .build()) + .build(); + break; + case GREEN: + routingTable = IndexRoutingTable.builder(metaData.getIndex()) + .addIndexShard(new IndexShardRoutingTable.Builder(new ShardId(metaData.getIndex(), 0)) + .addShard(ShardRouting.newUnassigned(new ShardId(metaData.getIndex(), 0), true, EXISTING_STORE_INSTANCE, + new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "")) + .initialize(ESTestCase.randomAlphaOfLength(8), null, 0L).moveToStarted()) + .addShard(ShardRouting.newUnassigned(new ShardId(metaData.getIndex(), 0), false, + RecoverySource.PeerRecoverySource.INSTANCE, + new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "")) + .initialize(ESTestCase.randomAlphaOfLength(8), null, 0L).moveToStarted()) + .build()) + .build(); + break; + default: + throw new IllegalStateException("unknown status: " + status); + } + ClusterIndexHealth health = new ClusterIndexHealth(metaData, routingTable); + assertEquals(status, health.getStatus()); + return health; + } }