From 4ae1ca5fa5129390b00ea405eb852e5a13d90f52 Mon Sep 17 00:00:00 2001 From: Jay Modi Date: Tue, 21 Nov 2017 10:17:08 -0700 Subject: [PATCH] Security: IndexLifecycleManager provides a consistent view of index state (elastic/x-pack-elasticsearch#3008) This commit changes the IndexLifecycleManager's handling of variables about an index to only update all of the values at a single time. Previously, all of the index state variables were volatile members of the IndexLifecycleManager, which meant we could get an inconsistent view of the index state. Although rare, this is still incorrect so this change adds a single volatile variable that holds the state as of the last processed cluster state update. Additionally, the IndexLifecycleManagerIntegTests were updated to have more concurrency and further stress this portion of the code and its checks. relates elastic/x-pack-elasticsearch#2973 Original commit: elastic/x-pack-elasticsearch@5f1552b298ad25ccc9fe39da3b110bada6064c3e --- .../security/SecurityLifecycleService.java | 7 +- .../support/IndexLifecycleManager.java | 77 ++++++++++++------- .../IndexLifecycleManagerIntegTests.java | 55 ++++++++++--- .../support/IndexLifecycleManagerTests.java | 36 ++++++--- 4 files changed, 123 insertions(+), 52 deletions(-) diff --git a/plugin/src/main/java/org/elasticsearch/xpack/security/SecurityLifecycleService.java b/plugin/src/main/java/org/elasticsearch/xpack/security/SecurityLifecycleService.java index 73c81afd125..d700c92cdf9 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/security/SecurityLifecycleService.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/security/SecurityLifecycleService.java @@ -154,7 +154,7 @@ public class SecurityLifecycleService extends AbstractComponent implements Clust * current value will be provided to the listener so that the listener can determine if any action * needs to be taken. */ - public void addSecurityIndexOutOfDateListener(BiConsumer listener) { + void addSecurityIndexOutOfDateListener(BiConsumer listener) { securityIndex.addIndexOutOfDateListener(listener); } @@ -206,9 +206,10 @@ public class SecurityLifecycleService extends AbstractComponent implements Clust } /** - * Checks if the security index is out of date with the current version. + * Checks if the security index is out of date with the current version. If the index does not exist + * we treat the index as up to date as we expect it to be created with the current format. */ public boolean isSecurityIndexOutOfDate() { - return securityIndex.indexExists() && !securityIndex.isIndexUpToDate(); + return securityIndex.isIndexUpToDate() == false; } } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/security/support/IndexLifecycleManager.java b/plugin/src/main/java/org/elasticsearch/xpack/security/support/IndexLifecycleManager.java index 264aa7228dc..11992ace209 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/security/support/IndexLifecycleManager.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/security/support/IndexLifecycleManager.java @@ -62,13 +62,7 @@ public class IndexLifecycleManager extends AbstractComponent { private final List> indexHealthChangeListeners = new CopyOnWriteArrayList<>(); private final List> indexOutOfDateListeners = new CopyOnWriteArrayList<>(); - private volatile boolean templateIsUpToDate; - private volatile boolean indexExists; - private volatile boolean isIndexUpToDate; - private volatile boolean indexAvailable; - private volatile boolean canWriteToIndex; - private volatile boolean mappingIsUpToDate; - private volatile Version mappingVersion; + private volatile State indexState = new State(false, false, false, false, null); public IndexLifecycleManager(Settings settings, InternalSecurityClient client, String indexName, String templateName) { super(settings); @@ -78,23 +72,29 @@ public class IndexLifecycleManager extends AbstractComponent { } public boolean checkMappingVersion(Predicate requiredVersion) { - return this.mappingVersion == null || requiredVersion.test(this.mappingVersion); + // pull value into local variable for consistent view + final State currentIndexState = this.indexState; + return currentIndexState.mappingVersion == null || requiredVersion.test(currentIndexState.mappingVersion); } public boolean indexExists() { - return indexExists; + return this.indexState.indexExists; } + /** + * Returns whether the index is on the current format if it exists. If the index does not exist + * we treat the index as up to date as we expect it to be created with the current format. + */ public boolean isIndexUpToDate() { - return isIndexUpToDate; + return this.indexState.isIndexUpToDate; } public boolean isAvailable() { - return indexAvailable; + return this.indexState.indexAvailable; } public boolean isWritable() { - return canWriteToIndex; + return this.indexState.canWriteToIndex; } /** @@ -116,26 +116,27 @@ public class IndexLifecycleManager extends AbstractComponent { } public void clusterChanged(ClusterChangedEvent event) { - final boolean previousUpToDate = this.isIndexUpToDate; + final boolean previousUpToDate = this.indexState.isIndexUpToDate; processClusterState(event.state()); checkIndexHealthChange(event); - if (previousUpToDate != this.isIndexUpToDate) { - notifyIndexOutOfDateListeners(previousUpToDate, this.isIndexUpToDate); + if (previousUpToDate != this.indexState.isIndexUpToDate) { + notifyIndexOutOfDateListeners(previousUpToDate, this.indexState.isIndexUpToDate); } } - private void processClusterState(ClusterState state) { - assert state != null; - final IndexMetaData securityIndex = resolveConcreteIndex(indexName, state.metaData()); - this.indexExists = securityIndex != null; - this.isIndexUpToDate = (securityIndex != null - && INDEX_FORMAT_SETTING.get(securityIndex.getSettings()).intValue() == INTERNAL_INDEX_FORMAT); - this.indexAvailable = checkIndexAvailable(state); - this.templateIsUpToDate = TemplateUtils.checkTemplateExistsAndIsUpToDate(templateName, - SECURITY_VERSION_STRING, state, logger); - this.mappingIsUpToDate = checkIndexMappingUpToDate(state); - this.canWriteToIndex = templateIsUpToDate && (mappingIsUpToDate || isIndexUpToDate); - this.mappingVersion = oldestIndexMappingVersion(state); + private void processClusterState(ClusterState clusterState) { + assert clusterState != null; + final IndexMetaData securityIndex = resolveConcreteIndex(indexName, clusterState.metaData()); + final boolean indexExists = securityIndex != null; + final boolean isIndexUpToDate = indexExists == false || + INDEX_FORMAT_SETTING.get(securityIndex.getSettings()).intValue() == INTERNAL_INDEX_FORMAT; + final boolean indexAvailable = checkIndexAvailable(clusterState); + final boolean templateIsUpToDate = TemplateUtils.checkTemplateExistsAndIsUpToDate(templateName, + SECURITY_VERSION_STRING, clusterState, logger); + final boolean mappingIsUpToDate = checkIndexMappingUpToDate(clusterState); + final boolean canWriteToIndex = templateIsUpToDate && (mappingIsUpToDate || isIndexUpToDate); + final Version mappingVersion = oldestIndexMappingVersion(clusterState); + this.indexState = new State(indexExists, isIndexUpToDate, indexAvailable, canWriteToIndex, mappingVersion); } private void checkIndexHealthChange(ClusterChangedEvent event) { @@ -285,7 +286,7 @@ public class IndexLifecycleManager extends AbstractComponent { * action on the security index. */ public void createIndexIfNeededThenExecute(final ActionListener listener, final Runnable andThen) { - if (indexExists) { + if (this.indexState.indexExists) { andThen.run(); } else { CreateIndexRequest request = new CreateIndexRequest(INTERNAL_SECURITY_INDEX); @@ -314,4 +315,24 @@ public class IndexLifecycleManager extends AbstractComponent { }); } } + + /** + * Holder class so we can update all values at once + */ + private static class State { + private final boolean indexExists; + private final boolean isIndexUpToDate; + private final boolean indexAvailable; + private final boolean canWriteToIndex; + private final Version mappingVersion; + + private State(boolean indexExists, boolean isIndexUpToDate, boolean indexAvailable, + boolean canWriteToIndex, Version mappingVersion) { + this.indexExists = indexExists; + this.isIndexUpToDate = isIndexUpToDate; + this.indexAvailable = indexAvailable; + this.canWriteToIndex = canWriteToIndex; + this.mappingVersion = mappingVersion; + } + } } diff --git a/plugin/src/test/java/org/elasticsearch/xpack/security/support/IndexLifecycleManagerIntegTests.java b/plugin/src/test/java/org/elasticsearch/xpack/security/support/IndexLifecycleManagerIntegTests.java index 643e7d20a38..e2e2d7ca1ea 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/security/support/IndexLifecycleManagerIntegTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/security/support/IndexLifecycleManagerIntegTests.java @@ -7,33 +7,68 @@ package org.elasticsearch.xpack.security.support; import org.elasticsearch.action.ActionFuture; import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.test.SecurityIntegTestCase; import org.elasticsearch.xpack.security.action.user.PutUserRequest; import org.elasticsearch.xpack.security.action.user.PutUserResponse; +import org.hamcrest.Matchers; import org.junit.After; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; public class IndexLifecycleManagerIntegTests extends SecurityIntegTestCase { public void testConcurrentOperationsTryingToCreateSecurityIndexAndAlias() throws Exception { assertSecurityIndexWriteable(); + final int processors = Runtime.getRuntime().availableProcessors(); + final int numThreads = scaledRandomIntBetween((processors + 1) / 2, 4 * processors); final int numRequests = scaledRandomIntBetween(4, 16); - List> futures = new ArrayList<>(numRequests); - List requests = new ArrayList<>(numRequests); - for (int i = 0; i < numRequests; i++) { - requests.add(securityClient() - .preparePutUser("user" + i, "password".toCharArray(), randomAlphaOfLengthBetween(1, 16)) - .request()); + + final List> futures = new CopyOnWriteArrayList<>(); + final List exceptions = new CopyOnWriteArrayList<>(); + final Thread[] threads = new Thread[numThreads]; + final CyclicBarrier barrier = new CyclicBarrier(threads.length); + final AtomicInteger userNumber = new AtomicInteger(0); + for (int i = 0; i < threads.length; i++) { + threads[i] = new Thread(new AbstractRunnable() { + @Override + public void onFailure(Exception e) { + exceptions.add(e); + } + + @Override + protected void doRun() throws Exception { + final List requests = new ArrayList<>(numRequests); + for (int i = 0; i < numRequests; i++) { + requests.add(securityClient() + .preparePutUser("user" + userNumber.getAndIncrement(), "password".toCharArray(), + randomAlphaOfLengthBetween(1, 16)) + .request()); + } + + barrier.await(10L, TimeUnit.SECONDS); + + for (PutUserRequest request : requests) { + PlainActionFuture responsePlainActionFuture = new PlainActionFuture<>(); + securityClient().putUser(request, responsePlainActionFuture); + futures.add(responsePlainActionFuture); + } + } + }, "create_users_thread" + i); + threads[i].start(); } - for (PutUserRequest request : requests) { - PlainActionFuture responsePlainActionFuture = new PlainActionFuture<>(); - securityClient().putUser(request, responsePlainActionFuture); - futures.add(responsePlainActionFuture); + for (Thread thread : threads) { + thread.join(); } + assertThat(exceptions, Matchers.empty()); + assertEquals(futures.size(), numRequests * numThreads); for (ActionFuture future : futures) { assertTrue(future.actionGet().created()); } diff --git a/plugin/src/test/java/org/elasticsearch/xpack/security/support/IndexLifecycleManagerTests.java b/plugin/src/test/java/org/elasticsearch/xpack/security/support/IndexLifecycleManagerTests.java index 7b4de55b10f..64dd4fe08f3 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/security/support/IndexLifecycleManagerTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/security/support/IndexLifecycleManagerTests.java @@ -205,28 +205,38 @@ public class IndexLifecycleManagerTests extends ESTestCase { public void testIndexOutOfDateListeners() throws Exception { final AtomicBoolean listenerCalled = new AtomicBoolean(false); + manager.clusterChanged(event(new ClusterState.Builder(CLUSTER_NAME))); manager.addIndexOutOfDateListener((prev, current) -> { listenerCalled.set(true); assertNotEquals(prev, current); }); - assertFalse(manager.isIndexUpToDate()); + assertTrue(manager.isIndexUpToDate()); manager.clusterChanged(event(new ClusterState.Builder(CLUSTER_NAME))); assertFalse(listenerCalled.get()); - assertFalse(manager.isIndexUpToDate()); + assertTrue(manager.isIndexUpToDate()); - // index doesn't exist and now exists - final ClusterState.Builder clusterStateBuilder = createClusterState(INDEX_NAME, TEMPLATE_NAME); + // index doesn't exist and now exists with wrong format + ClusterState.Builder clusterStateBuilder = createClusterState(INDEX_NAME, TEMPLATE_NAME, + IndexLifecycleManager.INTERNAL_INDEX_FORMAT - 1); markShardsAvailable(clusterStateBuilder); manager.clusterChanged(event(clusterStateBuilder)); assertTrue(listenerCalled.get()); - assertTrue(manager.isIndexUpToDate()); + assertFalse(manager.isIndexUpToDate()); listenerCalled.set(false); assertFalse(listenerCalled.get()); manager.clusterChanged(event(new ClusterState.Builder(CLUSTER_NAME))); assertTrue(listenerCalled.get()); - assertFalse(manager.isIndexUpToDate()); + assertTrue(manager.isIndexUpToDate()); + + listenerCalled.set(false); + // index doesn't exist and now exists with correct format + clusterStateBuilder = createClusterState(INDEX_NAME, TEMPLATE_NAME, IndexLifecycleManager.INTERNAL_INDEX_FORMAT); + markShardsAvailable(clusterStateBuilder); + manager.clusterChanged(event(clusterStateBuilder)); + assertFalse(listenerCalled.get()); + assertTrue(manager.isIndexUpToDate()); } private void assertInitialState() { @@ -242,13 +252,17 @@ public class IndexLifecycleManagerTests extends ESTestCase { } public static ClusterState.Builder createClusterState(String indexName, String templateName) throws IOException { - return createClusterState(indexName, templateName, templateName); + return createClusterState(indexName, templateName, templateName, IndexLifecycleManager.INTERNAL_INDEX_FORMAT); } - private static ClusterState.Builder createClusterState(String indexName, String templateName, String buildMappingFrom) + public static ClusterState.Builder createClusterState(String indexName, String templateName, int format) throws IOException { + return createClusterState(indexName, templateName, templateName, format); + } + + private static ClusterState.Builder createClusterState(String indexName, String templateName, String buildMappingFrom, int format) throws IOException { IndexTemplateMetaData.Builder templateBuilder = getIndexTemplateMetaData(templateName); - IndexMetaData.Builder indexMeta = getIndexMetadata(indexName, buildMappingFrom); + IndexMetaData.Builder indexMeta = getIndexMetadata(indexName, buildMappingFrom, format); MetaData.Builder metaDataBuilder = new MetaData.Builder(); metaDataBuilder.put(templateBuilder); @@ -269,13 +283,13 @@ public class IndexLifecycleManagerTests extends ESTestCase { .build(); } - private static IndexMetaData.Builder getIndexMetadata(String indexName, String templateName) throws IOException { + private static IndexMetaData.Builder getIndexMetadata(String indexName, String templateName, int format) throws IOException { IndexMetaData.Builder indexMetaData = IndexMetaData.builder(indexName); indexMetaData.settings(Settings.builder() .put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0) .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) - .put(IndexMetaData.INDEX_FORMAT_SETTING.getKey(), IndexLifecycleManager.INTERNAL_INDEX_FORMAT) + .put(IndexMetaData.INDEX_FORMAT_SETTING.getKey(), format) .build()); final Map mappings = getTemplateMappings(templateName);