Clear security caches on security index health changes (elastic/x-pack-elasticsearch#1957)

This change clears the caches in the native realm and the composite roles store when there is a
a change in the health of the security index that necessitates this. When the security index goes
to a red state, the caches are left in tact as this allows for management operations to be
performed for a limited amount of time. When the index transitions out of the red state or exists
when it didn't exist before, the caches will be cleared so that we remove any stale values.

relates elastic/x-pack-elasticsearch#1789

Original commit: elastic/x-pack-elasticsearch@914959ea6b
This commit is contained in:
Jay Modi 2017-07-14 09:28:28 -06:00 committed by GitHub
parent 8ab167cccb
commit 6b4468ea5c
12 changed files with 433 additions and 14 deletions

View File

@ -323,8 +323,8 @@ public class Security implements ActionPlugin, IngestPlugin, NetworkPlugin {
final ReservedRealm reservedRealm = new ReservedRealm(env, settings, nativeUsersStore,
anonymousUser, securityLifecycleService, threadPool.getThreadContext());
Map<String, Realm.Factory> realmFactories = new HashMap<>();
realmFactories.putAll(InternalRealms.getFactories(threadPool, resourceWatcherService,
sslService, nativeUsersStore, nativeRoleMappingStore));
realmFactories.putAll(InternalRealms.getFactories(threadPool, resourceWatcherService, sslService, nativeUsersStore,
nativeRoleMappingStore, securityLifecycleService));
for (XPackExtension extension : extensions) {
Map<String, Realm.Factory> newRealms = extension.getRealms(resourceWatcherService);
for (Map.Entry<String, Realm.Factory> entry : newRealms.entrySet()) {
@ -369,6 +369,7 @@ public class Security implements ActionPlugin, IngestPlugin, NetworkPlugin {
}
final CompositeRolesStore allRolesStore = new CompositeRolesStore(settings, fileRolesStore, nativeRolesStore,
reservedRolesStore, rolesProviders, threadPool.getThreadContext(), licenseState);
securityLifecycleService.addSecurityIndexHealthChangeListener(allRolesStore::onSecurityIndexHealthChange);
// to keep things simple, just invalidate all cached entries on license change. this happens so rarely that the impact should be
// minimal
licenseState.addListener(allRolesStore::invalidateAll);

View File

@ -11,6 +11,7 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.health.ClusterIndexHealth;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.component.LifecycleListener;
@ -26,6 +27,7 @@ import org.elasticsearch.xpack.security.support.IndexLifecycleManager;
import java.util.Collections;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Predicate;
/**
@ -147,6 +149,15 @@ public class SecurityLifecycleService extends AbstractComponent implements Clust
return securityIndex.checkMappingVersion(requiredVersion);
}
/**
* Adds a listener which will be notified when the security index health changes. The previous and
* current health will be provided to the listener so that the listener can determine if any action
* needs to be taken.
*/
public void addSecurityIndexHealthChangeListener(BiConsumer<ClusterIndexHealth, ClusterIndexHealth> listener) {
securityIndex.addIndexHealthChangeListener(listener);
}
// this is called in a lifecycle listener beforeStop on the cluster service
private void close() {
if (indexAuditTrail != null) {

View File

@ -19,6 +19,7 @@ import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.watcher.ResourceWatcherService;
import org.elasticsearch.xpack.security.SecurityLifecycleService;
import org.elasticsearch.xpack.security.authc.esnative.NativeRealm;
import org.elasticsearch.xpack.security.authc.esnative.NativeUsersStore;
import org.elasticsearch.xpack.security.authc.esnative.ReservedRealm;
@ -62,14 +63,18 @@ public class InternalRealms {
* This excludes the {@link ReservedRealm}, as it cannot be created dynamically.
* @return A map from <em>realm-type</em> to <code>Factory</code>
*/
public static Map<String, Realm.Factory> getFactories(
ThreadPool threadPool, ResourceWatcherService resourceWatcherService,
SSLService sslService, NativeUsersStore nativeUsersStore,
NativeRoleMappingStore nativeRoleMappingStore) {
public static Map<String, Realm.Factory> getFactories(ThreadPool threadPool, ResourceWatcherService resourceWatcherService,
SSLService sslService, NativeUsersStore nativeUsersStore,
NativeRoleMappingStore nativeRoleMappingStore,
SecurityLifecycleService securityLifecycleService) {
Map<String, Realm.Factory> map = new HashMap<>();
map.put(FileRealm.TYPE, config -> new FileRealm(config, resourceWatcherService));
map.put(NativeRealm.TYPE, config -> new NativeRealm(config, nativeUsersStore));
map.put(NativeRealm.TYPE, config -> {
final NativeRealm nativeRealm = new NativeRealm(config, nativeUsersStore);
securityLifecycleService.addSecurityIndexHealthChangeListener(nativeRealm::onSecurityIndexHealthChange);
return nativeRealm;
});
map.put(LdapRealm.AD_TYPE, config -> new LdapRealm(LdapRealm.AD_TYPE, config, sslService,
resourceWatcherService, nativeRoleMappingStore, threadPool));
map.put(LdapRealm.LDAP_TYPE, config -> new LdapRealm(LdapRealm.LDAP_TYPE, config,

View File

@ -6,6 +6,8 @@
package org.elasticsearch.xpack.security.authc.esnative;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.cluster.health.ClusterIndexHealth;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.xpack.security.authc.AuthenticationResult;
import org.elasticsearch.xpack.security.authc.RealmConfig;
@ -39,6 +41,21 @@ public class NativeRealm extends CachingUsernamePasswordRealm {
userStore.verifyPassword(token.principal(), token.credentials(), listener);
}
public void onSecurityIndexHealthChange(ClusterIndexHealth previousHealth, ClusterIndexHealth currentHealth) {
final boolean movedFromRedToNonRed = (previousHealth == null || previousHealth.getStatus() == ClusterHealthStatus.RED)
&& currentHealth != null && currentHealth.getStatus() != ClusterHealthStatus.RED;
final boolean indexDeleted = previousHealth != null && currentHealth == null;
if (movedFromRedToNonRed || indexDeleted) {
clearCache();
}
}
// method is used for testing to verify cache expiration since expireAll is final
void clearCache() {
expireAll();
}
/**
* @return The {@link Setting setting configuration} for this realm type
*/

View File

@ -6,6 +6,8 @@
package org.elasticsearch.xpack.security.authz.store;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.cluster.health.ClusterIndexHealth;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.cache.Cache;
@ -83,8 +85,7 @@ public class CompositeRolesStore extends AbstractComponent {
public CompositeRolesStore(Settings settings, FileRolesStore fileRolesStore, NativeRolesStore nativeRolesStore,
ReservedRolesStore reservedRolesStore,
List<BiConsumer<Set<String>, ActionListener<Set<RoleDescriptor>>>> rolesProviders,
ThreadContext threadContext,
XPackLicenseState licenseState) {
ThreadContext threadContext, XPackLicenseState licenseState) {
super(settings);
this.fileRolesStore = fileRolesStore;
// invalidating all on a file based role update is heavy handed to say the least, but in general this should be infrequent so the
@ -289,6 +290,16 @@ public class CompositeRolesStore extends AbstractComponent {
}, listener::onFailure));
}
public void onSecurityIndexHealthChange(ClusterIndexHealth previousHealth, ClusterIndexHealth currentHealth) {
final boolean movedFromRedToNonRed = (previousHealth == null || previousHealth.getStatus() == ClusterHealthStatus.RED)
&& currentHealth != null && currentHealth.getStatus() != ClusterHealthStatus.RED;
final boolean indexDeleted = previousHealth != null && currentHealth == null;
if (movedFromRedToNonRed || indexDeleted) {
invalidateAll();
}
}
/**
* A mutable class that can be used to represent the combination of one or more {@link IndicesPrivileges}
*/

View File

@ -5,15 +5,16 @@
*/
package org.elasticsearch.xpack.security.support;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Predicate;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
@ -34,6 +35,7 @@ import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateReque
import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateResponse;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.health.ClusterIndexHealth;
import org.elasticsearch.cluster.metadata.AliasOrIndex;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MappingMetaData;
@ -82,6 +84,8 @@ public class IndexLifecycleManager extends AbstractComponent {
private final AtomicReference<UpgradeState> migrateDataState = new AtomicReference<>(UpgradeState.NOT_STARTED);
private final AtomicInteger migrateDataAttempts = new AtomicInteger(0);
private final List<BiConsumer<ClusterIndexHealth, ClusterIndexHealth>> indexHealthChangeListeners = new CopyOnWriteArrayList<>();
private volatile boolean templateIsUpToDate;
private volatile boolean indexExists;
private volatile boolean isIndexUpToDate;
@ -155,9 +159,18 @@ public class IndexLifecycleManager extends AbstractComponent {
return this.migrateDataState.get();
}
/**
* Adds a listener which will be notified when the security index health changes. The previous and
* current health will be provided to the listener so that the listener can determine if any action
* needs to be taken.
*/
public void addIndexHealthChangeListener(BiConsumer<ClusterIndexHealth, ClusterIndexHealth> listener) {
indexHealthChangeListeners.add(listener);
}
public void clusterChanged(ClusterChangedEvent event) {
final ClusterState state = event.state();
processClusterState(state);
processClusterState(event.state());
checkIndexHealthChange(event);
}
private void processClusterState(ClusterState state) {
@ -183,6 +196,37 @@ public class IndexLifecycleManager extends AbstractComponent {
}
}
private void checkIndexHealthChange(ClusterChangedEvent event) {
final ClusterState state = event.state();
final ClusterState previousState = event.previousState();
final IndexMetaData indexMetaData = resolveConcreteIndex(indexName, state.metaData());
final IndexMetaData previousIndexMetaData = resolveConcreteIndex(indexName, previousState.metaData());
if (indexMetaData != null) {
final ClusterIndexHealth currentHealth =
new ClusterIndexHealth(indexMetaData, state.getRoutingTable().index(indexMetaData.getIndex()));
final ClusterIndexHealth previousHealth = previousIndexMetaData != null ? new ClusterIndexHealth(previousIndexMetaData,
previousState.getRoutingTable().index(previousIndexMetaData.getIndex())) : null;
if (previousHealth == null || previousHealth.getStatus() != currentHealth.getStatus()) {
notifyIndexHealthChangeListeners(previousHealth, currentHealth);
}
} else if (previousIndexMetaData != null) {
final ClusterIndexHealth previousHealth =
new ClusterIndexHealth(previousIndexMetaData, previousState.getRoutingTable().index(previousIndexMetaData.getIndex()));
notifyIndexHealthChangeListeners(previousHealth, null);
}
}
private void notifyIndexHealthChangeListeners(ClusterIndexHealth previousHealth, ClusterIndexHealth currentHealth) {
for (BiConsumer<ClusterIndexHealth, ClusterIndexHealth> consumer : indexHealthChangeListeners) {
try {
consumer.accept(previousHealth, currentHealth);
} catch (Exception e) {
logger.warn(new ParameterizedMessage("failed to notify listener [{}] of index health change", consumer), e);
}
}
}
private boolean checkIndexAvailable(ClusterState state) {
final IndexRoutingTable routingTable = getIndexRoutingTable(state);
if (routingTable != null && routingTable.allPrimaryShardsActive()) {

View File

@ -395,6 +395,7 @@ public class SecurityLifecycleServiceTests extends ESTestCase {
clusterStateBuilder.metaData(metaDataBuilder);
}
clusterStateBuilder.routingTable(SecurityTestUtils.buildIndexRoutingTable(securityIndexName));
return clusterStateBuilder;
}

View File

@ -0,0 +1,47 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.security.authc;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.watcher.ResourceWatcherService;
import org.elasticsearch.xpack.security.SecurityLifecycleService;
import org.elasticsearch.xpack.security.authc.esnative.NativeRealm;
import org.elasticsearch.xpack.security.authc.esnative.NativeUsersStore;
import org.elasticsearch.xpack.security.authc.support.mapper.NativeRoleMappingStore;
import org.elasticsearch.xpack.ssl.SSLService;
import java.util.Map;
import java.util.function.BiConsumer;
import static org.elasticsearch.mock.orig.Mockito.times;
import static org.hamcrest.Matchers.any;
import static org.hamcrest.Matchers.hasEntry;
import static org.hamcrest.Matchers.is;
import static org.mockito.Matchers.isA;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyZeroInteractions;
public class InternalRealmsTests extends ESTestCase {
public void testNativeRealmRegistersIndexHealthChangeListener() throws Exception {
SecurityLifecycleService lifecycleService = mock(SecurityLifecycleService.class);
Map<String, Realm.Factory> factories = InternalRealms.getFactories(mock(ThreadPool.class), mock(ResourceWatcherService.class),
mock(SSLService.class), mock(NativeUsersStore.class), mock(NativeRoleMappingStore.class), lifecycleService);
assertThat(factories, hasEntry(is(NativeRealm.TYPE), any(Realm.Factory.class)));
verifyZeroInteractions(lifecycleService);
Settings settings = Settings.builder().put("path.home", createTempDir()).build();
factories.get(NativeRealm.TYPE).create(new RealmConfig("test", Settings.EMPTY, settings, new ThreadContext(settings)));
verify(lifecycleService).addSecurityIndexHealthChangeListener(isA(BiConsumer.class));
factories.get(NativeRealm.TYPE).create(new RealmConfig("test", Settings.EMPTY, settings, new ThreadContext(settings)));
verify(lifecycleService, times(2)).addSecurityIndexHealthChangeListener(isA(BiConsumer.class));
}
}

View File

@ -0,0 +1,65 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.security.authc.esnative;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.cluster.health.ClusterIndexHealth;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.security.authc.RealmConfig;
import java.util.concurrent.atomic.AtomicInteger;
import static org.elasticsearch.xpack.security.test.SecurityTestUtils.getClusterIndexHealth;
import static org.mockito.Mockito.mock;
public class NativeRealmTests extends ESTestCase {
public void testCacheClearOnIndexHealthChange() {
final AtomicInteger numInvalidation = new AtomicInteger(0);
int expectedInvalidation = 0;
Settings settings = Settings.builder().put("path.home", createTempDir()).build();
RealmConfig config = new RealmConfig("native", Settings.EMPTY, settings, new ThreadContext(settings));
final NativeRealm nativeRealm = new NativeRealm(config, mock(NativeUsersStore.class)) {
@Override
void clearCache() {
numInvalidation.incrementAndGet();
}
};
// existing to no longer present
ClusterIndexHealth previousHealth = getClusterIndexHealth(randomFrom(ClusterHealthStatus.GREEN, ClusterHealthStatus.YELLOW));
ClusterIndexHealth currentHealth = null;
nativeRealm.onSecurityIndexHealthChange(previousHealth, currentHealth);
assertEquals(++expectedInvalidation, numInvalidation.get());
// doesn't exist to exists
previousHealth = null;
currentHealth = getClusterIndexHealth(randomFrom(ClusterHealthStatus.GREEN, ClusterHealthStatus.YELLOW));
nativeRealm.onSecurityIndexHealthChange(previousHealth, currentHealth);
assertEquals(++expectedInvalidation, numInvalidation.get());
// green or yellow to red
previousHealth = getClusterIndexHealth(randomFrom(ClusterHealthStatus.GREEN, ClusterHealthStatus.YELLOW));
currentHealth = getClusterIndexHealth(ClusterHealthStatus.RED);
nativeRealm.onSecurityIndexHealthChange(previousHealth, currentHealth);
assertEquals(expectedInvalidation, numInvalidation.get());
// red to non red
previousHealth = getClusterIndexHealth(ClusterHealthStatus.RED);
currentHealth = getClusterIndexHealth(randomFrom(ClusterHealthStatus.GREEN, ClusterHealthStatus.YELLOW));
nativeRealm.onSecurityIndexHealthChange(previousHealth, currentHealth);
assertEquals(++expectedInvalidation, numInvalidation.get());
// green to yellow or yellow to green
previousHealth = getClusterIndexHealth(randomFrom(ClusterHealthStatus.GREEN, ClusterHealthStatus.YELLOW));
currentHealth = getClusterIndexHealth(
previousHealth.getStatus() == ClusterHealthStatus.GREEN ? ClusterHealthStatus.YELLOW : ClusterHealthStatus.GREEN);
nativeRealm.onSecurityIndexHealthChange(previousHealth, currentHealth);
assertEquals(expectedInvalidation, numInvalidation.get());
}
}

View File

@ -8,6 +8,8 @@ package org.elasticsearch.xpack.security.authz.store;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.cluster.health.ClusterIndexHealth;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.common.settings.Settings;
@ -31,11 +33,13 @@ import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.Function;
import static org.elasticsearch.mock.orig.Mockito.times;
import static org.elasticsearch.mock.orig.Mockito.verifyNoMoreInteractions;
import static org.elasticsearch.xpack.security.test.SecurityTestUtils.getClusterIndexHealth;
import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.equalTo;
import static org.mockito.Matchers.any;
@ -209,6 +213,9 @@ public class CompositeRolesStoreTests extends ESTestCase {
verify(reservedRolesStore, times(2)).roleDescriptors();
}
verifyNoMoreInteractions(fileRolesStore, reservedRolesStore, nativeRolesStore);
// force a cache clear
}
public void testCustomRolesProviders() {
@ -430,6 +437,51 @@ public class CompositeRolesStoreTests extends ESTestCase {
assertEquals(0, role.indices().groups().length);
}
public void testCacheClearOnIndexHealthChange() {
final AtomicInteger numInvalidation = new AtomicInteger(0);
CompositeRolesStore compositeRolesStore = new CompositeRolesStore(
Settings.EMPTY, mock(FileRolesStore.class), mock(NativeRolesStore.class), mock(ReservedRolesStore.class),
Collections.emptyList(), new ThreadContext(Settings.EMPTY), new XPackLicenseState()) {
@Override
public void invalidateAll() {
numInvalidation.incrementAndGet();
}
};
int expectedInvalidation = 0;
// existing to no longer present
ClusterIndexHealth previousHealth = getClusterIndexHealth(randomFrom(ClusterHealthStatus.GREEN, ClusterHealthStatus.YELLOW));
ClusterIndexHealth currentHealth = null;
compositeRolesStore.onSecurityIndexHealthChange(previousHealth, currentHealth);
assertEquals(++expectedInvalidation, numInvalidation.get());
// doesn't exist to exists
previousHealth = null;
currentHealth = getClusterIndexHealth(randomFrom(ClusterHealthStatus.GREEN, ClusterHealthStatus.YELLOW));
compositeRolesStore.onSecurityIndexHealthChange(previousHealth, currentHealth);
assertEquals(++expectedInvalidation, numInvalidation.get());
// green or yellow to red
previousHealth = getClusterIndexHealth(randomFrom(ClusterHealthStatus.GREEN, ClusterHealthStatus.YELLOW));
currentHealth = getClusterIndexHealth(ClusterHealthStatus.RED);
compositeRolesStore.onSecurityIndexHealthChange(previousHealth, currentHealth);
assertEquals(expectedInvalidation, numInvalidation.get());
// red to non red
previousHealth = getClusterIndexHealth(ClusterHealthStatus.RED);
currentHealth = getClusterIndexHealth(randomFrom(ClusterHealthStatus.GREEN, ClusterHealthStatus.YELLOW));
compositeRolesStore.onSecurityIndexHealthChange(previousHealth, currentHealth);
assertEquals(++expectedInvalidation, numInvalidation.get());
// green to yellow or yellow to green
previousHealth = getClusterIndexHealth(randomFrom(ClusterHealthStatus.GREEN, ClusterHealthStatus.YELLOW));
currentHealth = getClusterIndexHealth(
previousHealth.getStatus() == ClusterHealthStatus.GREEN ? ClusterHealthStatus.YELLOW : ClusterHealthStatus.GREEN);
compositeRolesStore.onSecurityIndexHealthChange(previousHealth, currentHealth);
assertEquals(expectedInvalidation, numInvalidation.get());
}
private static class InMemoryRolesProvider implements BiConsumer<Set<String>, ActionListener<Set<RoleDescriptor>>> {
private final Function<Set<String>, Set<RoleDescriptor>> roleDescriptorsFunc;

View File

@ -10,7 +10,10 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import org.elasticsearch.Version;
import org.elasticsearch.action.Action;
@ -28,15 +31,25 @@ import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.cluster.health.ClusterIndexHealth;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexTemplateMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.security.InternalClient;
@ -46,11 +59,11 @@ import org.hamcrest.Matchers;
import org.junit.Before;
import org.mockito.Mockito;
import static org.elasticsearch.cluster.routing.RecoverySource.StoreRecoverySource.EXISTING_STORE_INSTANCE;
import static org.elasticsearch.xpack.security.support.IndexLifecycleManager.NULL_MIGRATOR;
import static org.elasticsearch.xpack.security.support.IndexLifecycleManager.TEMPLATE_VERSION_PATTERN;
import static org.hamcrest.Matchers.notNullValue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.notNull;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@ -108,6 +121,17 @@ public class IndexLifecycleManagerTests extends ESTestCase {
assertInitialState();
final ClusterState.Builder clusterStateBuilder = createClusterState(INDEX_NAME, TEMPLATE_NAME);
Index index = new Index(INDEX_NAME, UUID.randomUUID().toString());
ShardRouting shardRouting = ShardRouting.newUnassigned(new ShardId(index, 0), true, EXISTING_STORE_INSTANCE,
new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, ""));
String nodeId = ESTestCase.randomAlphaOfLength(8);
IndexShardRoutingTable table = new IndexShardRoutingTable.Builder(new ShardId(index, 0))
.addShard(shardRouting.initialize(nodeId, null, shardRouting.getExpectedShardSize())
.moveToUnassigned(new UnassignedInfo(UnassignedInfo.Reason.ALLOCATION_FAILED, "")))
.build();
clusterStateBuilder.routingTable(RoutingTable.builder()
.add(IndexRoutingTable.builder(index).addIndexShard(table).build())
.build());
manager.clusterChanged(event(clusterStateBuilder));
assertIndexUpToDateButNotAvailable();
@ -220,6 +244,87 @@ public class IndexLifecycleManagerTests extends ESTestCase {
assertCompleteState(true);
}
public void testIndexHealthChangeListeners() throws Exception {
final AtomicBoolean listenerCalled = new AtomicBoolean(false);
final AtomicReference<ClusterIndexHealth> previousHealth = new AtomicReference<>();
final AtomicReference<ClusterIndexHealth> currentHealth = new AtomicReference<>();
final BiConsumer<ClusterIndexHealth, ClusterIndexHealth> listener = (prevState, state) -> {
previousHealth.set(prevState);
currentHealth.set(state);
listenerCalled.set(true);
};
if (randomBoolean()) {
if (randomBoolean()) {
manager.addIndexHealthChangeListener(listener);
manager.addIndexHealthChangeListener((prevState, state) -> {
throw new RuntimeException("throw after listener");
});
} else {
manager.addIndexHealthChangeListener((prevState, state) -> {
throw new RuntimeException("throw before listener");
});
manager.addIndexHealthChangeListener(listener);
}
} else {
manager.addIndexHealthChangeListener(listener);
}
// index doesn't exist and now exists
final ClusterState.Builder clusterStateBuilder = createClusterState(INDEX_NAME, TEMPLATE_NAME);
markShardsAvailable(clusterStateBuilder);
manager.clusterChanged(event(clusterStateBuilder));
assertTrue(listenerCalled.get());
assertNull(previousHealth.get());
assertEquals(ClusterHealthStatus.GREEN, currentHealth.get().getStatus());
// reset and call with no change to the index
listenerCalled.set(false);
previousHealth.set(null);
currentHealth.set(null);
ClusterChangedEvent event = new ClusterChangedEvent("same index health", clusterStateBuilder.build(), clusterStateBuilder.build());
manager.clusterChanged(event);
assertFalse(listenerCalled.get());
assertNull(previousHealth.get());
assertNull(currentHealth.get());
// index with different health
listenerCalled.set(false);
previousHealth.set(null);
currentHealth.set(null);
ClusterState previousState = clusterStateBuilder.build();
Index prevIndex = previousState.getRoutingTable().index(INDEX_NAME).getIndex();
clusterStateBuilder.routingTable(RoutingTable.builder()
.add(IndexRoutingTable.builder(prevIndex)
.addIndexShard(new IndexShardRoutingTable.Builder(new ShardId(prevIndex, 0))
.addShard(ShardRouting.newUnassigned(new ShardId(prevIndex, 0), true, EXISTING_STORE_INSTANCE,
new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, ""))
.initialize(UUIDs.randomBase64UUID(random()), null, 0L)
.moveToUnassigned(new UnassignedInfo(UnassignedInfo.Reason.ALLOCATION_FAILED, "")))
.build()))
.build());
event = new ClusterChangedEvent("different index health", clusterStateBuilder.build(), previousState);
manager.clusterChanged(event);
assertTrue(listenerCalled.get());
assertEquals(ClusterHealthStatus.GREEN, previousHealth.get().getStatus());
assertEquals(ClusterHealthStatus.RED, currentHealth.get().getStatus());
// swap prev and current
listenerCalled.set(false);
previousHealth.set(null);
currentHealth.set(null);
event = new ClusterChangedEvent("different index health swapped", previousState, clusterStateBuilder.build());
manager.clusterChanged(event);
assertTrue(listenerCalled.get());
assertEquals(ClusterHealthStatus.RED, previousHealth.get().getStatus());
assertEquals(ClusterHealthStatus.GREEN, currentHealth.get().getStatus());
}
private void assertInitialState() {
assertThat(manager.indexExists(), Matchers.equalTo(false));
assertThat(manager.isAvailable(), Matchers.equalTo(false));
@ -368,5 +473,4 @@ public class IndexLifecycleManagerTests extends ESTestCase {
final String resource = "/" + templateName + ".json";
return TemplateUtils.loadTemplate(resource, Version.CURRENT.toString(), TEMPLATE_VERSION_PATTERN);
}
}

View File

@ -6,16 +6,21 @@
package org.elasticsearch.xpack.security.test;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.cluster.health.ClusterIndexHealth;
import org.elasticsearch.cluster.metadata.AliasMetaData;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.RecoverySource;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.common.io.FileSystemUtils;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.test.ESTestCase;
@ -29,6 +34,7 @@ import java.util.UUID;
import static org.elasticsearch.cluster.routing.RecoverySource.StoreRecoverySource.EXISTING_STORE_INSTANCE;
import static org.elasticsearch.xpack.security.SecurityLifecycleService.SECURITY_INDEX_NAME;
import static org.junit.Assert.assertEquals;
public class SecurityTestUtils {
@ -87,4 +93,59 @@ public class SecurityTestUtils {
return metaDataBuilder.build();
}
public static ClusterIndexHealth getClusterIndexHealth(ClusterHealthStatus status) {
IndexMetaData metaData = IndexMetaData.builder("foo").settings(Settings.builder()
.put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.build())
.build();
final IndexRoutingTable routingTable;
switch (status) {
case RED:
routingTable = IndexRoutingTable.builder(metaData.getIndex())
.addIndexShard(new IndexShardRoutingTable.Builder(new ShardId(metaData.getIndex(), 0))
.addShard(ShardRouting.newUnassigned(new ShardId(metaData.getIndex(), 0), true, EXISTING_STORE_INSTANCE,
new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, ""))
.initialize(ESTestCase.randomAlphaOfLength(8), null, 0L))
.addShard(ShardRouting.newUnassigned(new ShardId(metaData.getIndex(), 0), false,
RecoverySource.PeerRecoverySource.INSTANCE,
new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, ""))
.initialize(ESTestCase.randomAlphaOfLength(8), null, 0L))
.build())
.build();
break;
case YELLOW:
routingTable = IndexRoutingTable.builder(metaData.getIndex())
.addIndexShard(new IndexShardRoutingTable.Builder(new ShardId(metaData.getIndex(), 0))
.addShard(ShardRouting.newUnassigned(new ShardId(metaData.getIndex(), 0), true, EXISTING_STORE_INSTANCE,
new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, ""))
.initialize(ESTestCase.randomAlphaOfLength(8), null, 0L).moveToStarted())
.addShard(ShardRouting.newUnassigned(new ShardId(metaData.getIndex(), 0), false,
RecoverySource.PeerRecoverySource.INSTANCE,
new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, ""))
.initialize(ESTestCase.randomAlphaOfLength(8), null, 0L))
.build())
.build();
break;
case GREEN:
routingTable = IndexRoutingTable.builder(metaData.getIndex())
.addIndexShard(new IndexShardRoutingTable.Builder(new ShardId(metaData.getIndex(), 0))
.addShard(ShardRouting.newUnassigned(new ShardId(metaData.getIndex(), 0), true, EXISTING_STORE_INSTANCE,
new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, ""))
.initialize(ESTestCase.randomAlphaOfLength(8), null, 0L).moveToStarted())
.addShard(ShardRouting.newUnassigned(new ShardId(metaData.getIndex(), 0), false,
RecoverySource.PeerRecoverySource.INSTANCE,
new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, ""))
.initialize(ESTestCase.randomAlphaOfLength(8), null, 0L).moveToStarted())
.build())
.build();
break;
default:
throw new IllegalStateException("unknown status: " + status);
}
ClusterIndexHealth health = new ClusterIndexHealth(metaData, routingTable);
assertEquals(status, health.getStatus());
return health;
}
}