Security: IndexLifecycleManager provides a consistent view of index state (elastic/x-pack-elasticsearch#3008)
This commit changes the IndexLifecycleManager's handling of variables about an index to only update all of the values at a single time. Previously, all of the index state variables were volatile members of the IndexLifecycleManager, which meant we could get an inconsistent view of the index state. Although rare, this is still incorrect so this change adds a single volatile variable that holds the state as of the last processed cluster state update. Additionally, the IndexLifecycleManagerIntegTests were updated to have more concurrency and further stress this portion of the code and its checks. relates elastic/x-pack-elasticsearch#2973 Original commit: elastic/x-pack-elasticsearch@5f1552b298
This commit is contained in:
parent
d86e7870da
commit
4ae1ca5fa5
|
@ -154,7 +154,7 @@ public class SecurityLifecycleService extends AbstractComponent implements Clust
|
||||||
* current value will be provided to the listener so that the listener can determine if any action
|
* current value will be provided to the listener so that the listener can determine if any action
|
||||||
* needs to be taken.
|
* needs to be taken.
|
||||||
*/
|
*/
|
||||||
public void addSecurityIndexOutOfDateListener(BiConsumer<Boolean, Boolean> listener) {
|
void addSecurityIndexOutOfDateListener(BiConsumer<Boolean, Boolean> listener) {
|
||||||
securityIndex.addIndexOutOfDateListener(listener);
|
securityIndex.addIndexOutOfDateListener(listener);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -206,9 +206,10 @@ public class SecurityLifecycleService extends AbstractComponent implements Clust
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Checks if the security index is out of date with the current version.
|
* Checks if the security index is out of date with the current version. If the index does not exist
|
||||||
|
* we treat the index as up to date as we expect it to be created with the current format.
|
||||||
*/
|
*/
|
||||||
public boolean isSecurityIndexOutOfDate() {
|
public boolean isSecurityIndexOutOfDate() {
|
||||||
return securityIndex.indexExists() && !securityIndex.isIndexUpToDate();
|
return securityIndex.isIndexUpToDate() == false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -62,13 +62,7 @@ public class IndexLifecycleManager extends AbstractComponent {
|
||||||
private final List<BiConsumer<ClusterIndexHealth, ClusterIndexHealth>> indexHealthChangeListeners = new CopyOnWriteArrayList<>();
|
private final List<BiConsumer<ClusterIndexHealth, ClusterIndexHealth>> indexHealthChangeListeners = new CopyOnWriteArrayList<>();
|
||||||
private final List<BiConsumer<Boolean, Boolean>> indexOutOfDateListeners = new CopyOnWriteArrayList<>();
|
private final List<BiConsumer<Boolean, Boolean>> indexOutOfDateListeners = new CopyOnWriteArrayList<>();
|
||||||
|
|
||||||
private volatile boolean templateIsUpToDate;
|
private volatile State indexState = new State(false, false, false, false, null);
|
||||||
private volatile boolean indexExists;
|
|
||||||
private volatile boolean isIndexUpToDate;
|
|
||||||
private volatile boolean indexAvailable;
|
|
||||||
private volatile boolean canWriteToIndex;
|
|
||||||
private volatile boolean mappingIsUpToDate;
|
|
||||||
private volatile Version mappingVersion;
|
|
||||||
|
|
||||||
public IndexLifecycleManager(Settings settings, InternalSecurityClient client, String indexName, String templateName) {
|
public IndexLifecycleManager(Settings settings, InternalSecurityClient client, String indexName, String templateName) {
|
||||||
super(settings);
|
super(settings);
|
||||||
|
@ -78,23 +72,29 @@ public class IndexLifecycleManager extends AbstractComponent {
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean checkMappingVersion(Predicate<Version> requiredVersion) {
|
public boolean checkMappingVersion(Predicate<Version> requiredVersion) {
|
||||||
return this.mappingVersion == null || requiredVersion.test(this.mappingVersion);
|
// pull value into local variable for consistent view
|
||||||
|
final State currentIndexState = this.indexState;
|
||||||
|
return currentIndexState.mappingVersion == null || requiredVersion.test(currentIndexState.mappingVersion);
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean indexExists() {
|
public boolean indexExists() {
|
||||||
return indexExists;
|
return this.indexState.indexExists;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns whether the index is on the current format if it exists. If the index does not exist
|
||||||
|
* we treat the index as up to date as we expect it to be created with the current format.
|
||||||
|
*/
|
||||||
public boolean isIndexUpToDate() {
|
public boolean isIndexUpToDate() {
|
||||||
return isIndexUpToDate;
|
return this.indexState.isIndexUpToDate;
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean isAvailable() {
|
public boolean isAvailable() {
|
||||||
return indexAvailable;
|
return this.indexState.indexAvailable;
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean isWritable() {
|
public boolean isWritable() {
|
||||||
return canWriteToIndex;
|
return this.indexState.canWriteToIndex;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -116,26 +116,27 @@ public class IndexLifecycleManager extends AbstractComponent {
|
||||||
}
|
}
|
||||||
|
|
||||||
public void clusterChanged(ClusterChangedEvent event) {
|
public void clusterChanged(ClusterChangedEvent event) {
|
||||||
final boolean previousUpToDate = this.isIndexUpToDate;
|
final boolean previousUpToDate = this.indexState.isIndexUpToDate;
|
||||||
processClusterState(event.state());
|
processClusterState(event.state());
|
||||||
checkIndexHealthChange(event);
|
checkIndexHealthChange(event);
|
||||||
if (previousUpToDate != this.isIndexUpToDate) {
|
if (previousUpToDate != this.indexState.isIndexUpToDate) {
|
||||||
notifyIndexOutOfDateListeners(previousUpToDate, this.isIndexUpToDate);
|
notifyIndexOutOfDateListeners(previousUpToDate, this.indexState.isIndexUpToDate);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void processClusterState(ClusterState state) {
|
private void processClusterState(ClusterState clusterState) {
|
||||||
assert state != null;
|
assert clusterState != null;
|
||||||
final IndexMetaData securityIndex = resolveConcreteIndex(indexName, state.metaData());
|
final IndexMetaData securityIndex = resolveConcreteIndex(indexName, clusterState.metaData());
|
||||||
this.indexExists = securityIndex != null;
|
final boolean indexExists = securityIndex != null;
|
||||||
this.isIndexUpToDate = (securityIndex != null
|
final boolean isIndexUpToDate = indexExists == false ||
|
||||||
&& INDEX_FORMAT_SETTING.get(securityIndex.getSettings()).intValue() == INTERNAL_INDEX_FORMAT);
|
INDEX_FORMAT_SETTING.get(securityIndex.getSettings()).intValue() == INTERNAL_INDEX_FORMAT;
|
||||||
this.indexAvailable = checkIndexAvailable(state);
|
final boolean indexAvailable = checkIndexAvailable(clusterState);
|
||||||
this.templateIsUpToDate = TemplateUtils.checkTemplateExistsAndIsUpToDate(templateName,
|
final boolean templateIsUpToDate = TemplateUtils.checkTemplateExistsAndIsUpToDate(templateName,
|
||||||
SECURITY_VERSION_STRING, state, logger);
|
SECURITY_VERSION_STRING, clusterState, logger);
|
||||||
this.mappingIsUpToDate = checkIndexMappingUpToDate(state);
|
final boolean mappingIsUpToDate = checkIndexMappingUpToDate(clusterState);
|
||||||
this.canWriteToIndex = templateIsUpToDate && (mappingIsUpToDate || isIndexUpToDate);
|
final boolean canWriteToIndex = templateIsUpToDate && (mappingIsUpToDate || isIndexUpToDate);
|
||||||
this.mappingVersion = oldestIndexMappingVersion(state);
|
final Version mappingVersion = oldestIndexMappingVersion(clusterState);
|
||||||
|
this.indexState = new State(indexExists, isIndexUpToDate, indexAvailable, canWriteToIndex, mappingVersion);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void checkIndexHealthChange(ClusterChangedEvent event) {
|
private void checkIndexHealthChange(ClusterChangedEvent event) {
|
||||||
|
@ -285,7 +286,7 @@ public class IndexLifecycleManager extends AbstractComponent {
|
||||||
* action on the security index.
|
* action on the security index.
|
||||||
*/
|
*/
|
||||||
public <T> void createIndexIfNeededThenExecute(final ActionListener<T> listener, final Runnable andThen) {
|
public <T> void createIndexIfNeededThenExecute(final ActionListener<T> listener, final Runnable andThen) {
|
||||||
if (indexExists) {
|
if (this.indexState.indexExists) {
|
||||||
andThen.run();
|
andThen.run();
|
||||||
} else {
|
} else {
|
||||||
CreateIndexRequest request = new CreateIndexRequest(INTERNAL_SECURITY_INDEX);
|
CreateIndexRequest request = new CreateIndexRequest(INTERNAL_SECURITY_INDEX);
|
||||||
|
@ -314,4 +315,24 @@ public class IndexLifecycleManager extends AbstractComponent {
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Holder class so we can update all values at once
|
||||||
|
*/
|
||||||
|
private static class State {
|
||||||
|
private final boolean indexExists;
|
||||||
|
private final boolean isIndexUpToDate;
|
||||||
|
private final boolean indexAvailable;
|
||||||
|
private final boolean canWriteToIndex;
|
||||||
|
private final Version mappingVersion;
|
||||||
|
|
||||||
|
private State(boolean indexExists, boolean isIndexUpToDate, boolean indexAvailable,
|
||||||
|
boolean canWriteToIndex, Version mappingVersion) {
|
||||||
|
this.indexExists = indexExists;
|
||||||
|
this.isIndexUpToDate = isIndexUpToDate;
|
||||||
|
this.indexAvailable = indexAvailable;
|
||||||
|
this.canWriteToIndex = canWriteToIndex;
|
||||||
|
this.mappingVersion = mappingVersion;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -7,33 +7,68 @@ package org.elasticsearch.xpack.security.support;
|
||||||
|
|
||||||
import org.elasticsearch.action.ActionFuture;
|
import org.elasticsearch.action.ActionFuture;
|
||||||
import org.elasticsearch.action.support.PlainActionFuture;
|
import org.elasticsearch.action.support.PlainActionFuture;
|
||||||
|
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
|
||||||
import org.elasticsearch.test.SecurityIntegTestCase;
|
import org.elasticsearch.test.SecurityIntegTestCase;
|
||||||
import org.elasticsearch.xpack.security.action.user.PutUserRequest;
|
import org.elasticsearch.xpack.security.action.user.PutUserRequest;
|
||||||
import org.elasticsearch.xpack.security.action.user.PutUserResponse;
|
import org.elasticsearch.xpack.security.action.user.PutUserResponse;
|
||||||
|
import org.hamcrest.Matchers;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.concurrent.CopyOnWriteArrayList;
|
||||||
|
import java.util.concurrent.CyclicBarrier;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
public class IndexLifecycleManagerIntegTests extends SecurityIntegTestCase {
|
public class IndexLifecycleManagerIntegTests extends SecurityIntegTestCase {
|
||||||
|
|
||||||
public void testConcurrentOperationsTryingToCreateSecurityIndexAndAlias() throws Exception {
|
public void testConcurrentOperationsTryingToCreateSecurityIndexAndAlias() throws Exception {
|
||||||
assertSecurityIndexWriteable();
|
assertSecurityIndexWriteable();
|
||||||
|
final int processors = Runtime.getRuntime().availableProcessors();
|
||||||
|
final int numThreads = scaledRandomIntBetween((processors + 1) / 2, 4 * processors);
|
||||||
final int numRequests = scaledRandomIntBetween(4, 16);
|
final int numRequests = scaledRandomIntBetween(4, 16);
|
||||||
List<ActionFuture<PutUserResponse>> futures = new ArrayList<>(numRequests);
|
|
||||||
List<PutUserRequest> requests = new ArrayList<>(numRequests);
|
final List<ActionFuture<PutUserResponse>> futures = new CopyOnWriteArrayList<>();
|
||||||
for (int i = 0; i < numRequests; i++) {
|
final List<Exception> exceptions = new CopyOnWriteArrayList<>();
|
||||||
requests.add(securityClient()
|
final Thread[] threads = new Thread[numThreads];
|
||||||
.preparePutUser("user" + i, "password".toCharArray(), randomAlphaOfLengthBetween(1, 16))
|
final CyclicBarrier barrier = new CyclicBarrier(threads.length);
|
||||||
.request());
|
final AtomicInteger userNumber = new AtomicInteger(0);
|
||||||
|
for (int i = 0; i < threads.length; i++) {
|
||||||
|
threads[i] = new Thread(new AbstractRunnable() {
|
||||||
|
@Override
|
||||||
|
public void onFailure(Exception e) {
|
||||||
|
exceptions.add(e);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void doRun() throws Exception {
|
||||||
|
final List<PutUserRequest> requests = new ArrayList<>(numRequests);
|
||||||
|
for (int i = 0; i < numRequests; i++) {
|
||||||
|
requests.add(securityClient()
|
||||||
|
.preparePutUser("user" + userNumber.getAndIncrement(), "password".toCharArray(),
|
||||||
|
randomAlphaOfLengthBetween(1, 16))
|
||||||
|
.request());
|
||||||
|
}
|
||||||
|
|
||||||
|
barrier.await(10L, TimeUnit.SECONDS);
|
||||||
|
|
||||||
|
for (PutUserRequest request : requests) {
|
||||||
|
PlainActionFuture<PutUserResponse> responsePlainActionFuture = new PlainActionFuture<>();
|
||||||
|
securityClient().putUser(request, responsePlainActionFuture);
|
||||||
|
futures.add(responsePlainActionFuture);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}, "create_users_thread" + i);
|
||||||
|
threads[i].start();
|
||||||
}
|
}
|
||||||
|
|
||||||
for (PutUserRequest request : requests) {
|
for (Thread thread : threads) {
|
||||||
PlainActionFuture<PutUserResponse> responsePlainActionFuture = new PlainActionFuture<>();
|
thread.join();
|
||||||
securityClient().putUser(request, responsePlainActionFuture);
|
|
||||||
futures.add(responsePlainActionFuture);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
assertThat(exceptions, Matchers.empty());
|
||||||
|
assertEquals(futures.size(), numRequests * numThreads);
|
||||||
for (ActionFuture<PutUserResponse> future : futures) {
|
for (ActionFuture<PutUserResponse> future : futures) {
|
||||||
assertTrue(future.actionGet().created());
|
assertTrue(future.actionGet().created());
|
||||||
}
|
}
|
||||||
|
|
|
@ -205,28 +205,38 @@ public class IndexLifecycleManagerTests extends ESTestCase {
|
||||||
|
|
||||||
public void testIndexOutOfDateListeners() throws Exception {
|
public void testIndexOutOfDateListeners() throws Exception {
|
||||||
final AtomicBoolean listenerCalled = new AtomicBoolean(false);
|
final AtomicBoolean listenerCalled = new AtomicBoolean(false);
|
||||||
|
manager.clusterChanged(event(new ClusterState.Builder(CLUSTER_NAME)));
|
||||||
manager.addIndexOutOfDateListener((prev, current) -> {
|
manager.addIndexOutOfDateListener((prev, current) -> {
|
||||||
listenerCalled.set(true);
|
listenerCalled.set(true);
|
||||||
assertNotEquals(prev, current);
|
assertNotEquals(prev, current);
|
||||||
});
|
});
|
||||||
assertFalse(manager.isIndexUpToDate());
|
assertTrue(manager.isIndexUpToDate());
|
||||||
|
|
||||||
manager.clusterChanged(event(new ClusterState.Builder(CLUSTER_NAME)));
|
manager.clusterChanged(event(new ClusterState.Builder(CLUSTER_NAME)));
|
||||||
assertFalse(listenerCalled.get());
|
assertFalse(listenerCalled.get());
|
||||||
assertFalse(manager.isIndexUpToDate());
|
assertTrue(manager.isIndexUpToDate());
|
||||||
|
|
||||||
// index doesn't exist and now exists
|
// index doesn't exist and now exists with wrong format
|
||||||
final ClusterState.Builder clusterStateBuilder = createClusterState(INDEX_NAME, TEMPLATE_NAME);
|
ClusterState.Builder clusterStateBuilder = createClusterState(INDEX_NAME, TEMPLATE_NAME,
|
||||||
|
IndexLifecycleManager.INTERNAL_INDEX_FORMAT - 1);
|
||||||
markShardsAvailable(clusterStateBuilder);
|
markShardsAvailable(clusterStateBuilder);
|
||||||
manager.clusterChanged(event(clusterStateBuilder));
|
manager.clusterChanged(event(clusterStateBuilder));
|
||||||
assertTrue(listenerCalled.get());
|
assertTrue(listenerCalled.get());
|
||||||
assertTrue(manager.isIndexUpToDate());
|
assertFalse(manager.isIndexUpToDate());
|
||||||
|
|
||||||
listenerCalled.set(false);
|
listenerCalled.set(false);
|
||||||
assertFalse(listenerCalled.get());
|
assertFalse(listenerCalled.get());
|
||||||
manager.clusterChanged(event(new ClusterState.Builder(CLUSTER_NAME)));
|
manager.clusterChanged(event(new ClusterState.Builder(CLUSTER_NAME)));
|
||||||
assertTrue(listenerCalled.get());
|
assertTrue(listenerCalled.get());
|
||||||
assertFalse(manager.isIndexUpToDate());
|
assertTrue(manager.isIndexUpToDate());
|
||||||
|
|
||||||
|
listenerCalled.set(false);
|
||||||
|
// index doesn't exist and now exists with correct format
|
||||||
|
clusterStateBuilder = createClusterState(INDEX_NAME, TEMPLATE_NAME, IndexLifecycleManager.INTERNAL_INDEX_FORMAT);
|
||||||
|
markShardsAvailable(clusterStateBuilder);
|
||||||
|
manager.clusterChanged(event(clusterStateBuilder));
|
||||||
|
assertFalse(listenerCalled.get());
|
||||||
|
assertTrue(manager.isIndexUpToDate());
|
||||||
}
|
}
|
||||||
|
|
||||||
private void assertInitialState() {
|
private void assertInitialState() {
|
||||||
|
@ -242,13 +252,17 @@ public class IndexLifecycleManagerTests extends ESTestCase {
|
||||||
}
|
}
|
||||||
|
|
||||||
public static ClusterState.Builder createClusterState(String indexName, String templateName) throws IOException {
|
public static ClusterState.Builder createClusterState(String indexName, String templateName) throws IOException {
|
||||||
return createClusterState(indexName, templateName, templateName);
|
return createClusterState(indexName, templateName, templateName, IndexLifecycleManager.INTERNAL_INDEX_FORMAT);
|
||||||
}
|
}
|
||||||
|
|
||||||
private static ClusterState.Builder createClusterState(String indexName, String templateName, String buildMappingFrom)
|
public static ClusterState.Builder createClusterState(String indexName, String templateName, int format) throws IOException {
|
||||||
|
return createClusterState(indexName, templateName, templateName, format);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static ClusterState.Builder createClusterState(String indexName, String templateName, String buildMappingFrom, int format)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
IndexTemplateMetaData.Builder templateBuilder = getIndexTemplateMetaData(templateName);
|
IndexTemplateMetaData.Builder templateBuilder = getIndexTemplateMetaData(templateName);
|
||||||
IndexMetaData.Builder indexMeta = getIndexMetadata(indexName, buildMappingFrom);
|
IndexMetaData.Builder indexMeta = getIndexMetadata(indexName, buildMappingFrom, format);
|
||||||
|
|
||||||
MetaData.Builder metaDataBuilder = new MetaData.Builder();
|
MetaData.Builder metaDataBuilder = new MetaData.Builder();
|
||||||
metaDataBuilder.put(templateBuilder);
|
metaDataBuilder.put(templateBuilder);
|
||||||
|
@ -269,13 +283,13 @@ public class IndexLifecycleManagerTests extends ESTestCase {
|
||||||
.build();
|
.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
private static IndexMetaData.Builder getIndexMetadata(String indexName, String templateName) throws IOException {
|
private static IndexMetaData.Builder getIndexMetadata(String indexName, String templateName, int format) throws IOException {
|
||||||
IndexMetaData.Builder indexMetaData = IndexMetaData.builder(indexName);
|
IndexMetaData.Builder indexMetaData = IndexMetaData.builder(indexName);
|
||||||
indexMetaData.settings(Settings.builder()
|
indexMetaData.settings(Settings.builder()
|
||||||
.put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
|
.put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
|
||||||
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
|
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
|
||||||
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
|
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
|
||||||
.put(IndexMetaData.INDEX_FORMAT_SETTING.getKey(), IndexLifecycleManager.INTERNAL_INDEX_FORMAT)
|
.put(IndexMetaData.INDEX_FORMAT_SETTING.getKey(), format)
|
||||||
.build());
|
.build());
|
||||||
|
|
||||||
final Map<String, String> mappings = getTemplateMappings(templateName);
|
final Map<String, String> mappings = getTemplateMappings(templateName);
|
||||||
|
|
Loading…
Reference in New Issue