mirror of
https://github.com/honeymoose/OpenSearch.git
synced 2025-02-25 06:16:40 +00:00
Fix security index auto-create and state recovery race (#39582)
Previously, the security index could be wrongfully recreated. This might happen if the index was interpreted as missing, as in the case of a fresh install, but the index existed and the state did not yet recover. This fix will return HTTP SERVICE_UNAVAILABLE (503) for requests that try to write to the security index before the state has not been recovered yet.
This commit is contained in:
parent
5c023770d2
commit
e7dbfda5d3
@ -10,6 +10,7 @@ import org.apache.logging.log4j.Logger;
|
|||||||
import org.apache.logging.log4j.message.ParameterizedMessage;
|
import org.apache.logging.log4j.message.ParameterizedMessage;
|
||||||
import org.elasticsearch.ElasticsearchException;
|
import org.elasticsearch.ElasticsearchException;
|
||||||
import org.elasticsearch.ElasticsearchParseException;
|
import org.elasticsearch.ElasticsearchParseException;
|
||||||
|
import org.elasticsearch.ElasticsearchStatusException;
|
||||||
import org.elasticsearch.ExceptionsHelper;
|
import org.elasticsearch.ExceptionsHelper;
|
||||||
import org.elasticsearch.ResourceAlreadyExistsException;
|
import org.elasticsearch.ResourceAlreadyExistsException;
|
||||||
import org.elasticsearch.Version;
|
import org.elasticsearch.Version;
|
||||||
@ -40,6 +41,7 @@ import org.elasticsearch.common.xcontent.XContentType;
|
|||||||
import org.elasticsearch.gateway.GatewayService;
|
import org.elasticsearch.gateway.GatewayService;
|
||||||
import org.elasticsearch.index.IndexNotFoundException;
|
import org.elasticsearch.index.IndexNotFoundException;
|
||||||
import org.elasticsearch.index.mapper.MapperService;
|
import org.elasticsearch.index.mapper.MapperService;
|
||||||
|
import org.elasticsearch.rest.RestStatus;
|
||||||
import org.elasticsearch.xpack.core.security.index.RestrictedIndicesNames;
|
import org.elasticsearch.xpack.core.security.index.RestrictedIndicesNames;
|
||||||
import org.elasticsearch.xpack.core.template.TemplateUtils;
|
import org.elasticsearch.xpack.core.template.TemplateUtils;
|
||||||
|
|
||||||
@ -81,7 +83,7 @@ public class SecurityIndexManager implements ClusterStateListener {
|
|||||||
private volatile State indexState;
|
private volatile State indexState;
|
||||||
|
|
||||||
public SecurityIndexManager(Client client, String indexName, ClusterService clusterService) {
|
public SecurityIndexManager(Client client, String indexName, ClusterService clusterService) {
|
||||||
this(client, indexName, new State(false, false, false, false, null, null, null));
|
this(client, indexName, State.UNRECOVERED_STATE);
|
||||||
clusterService.addListener(this);
|
clusterService.addListener(this);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -121,6 +123,10 @@ public class SecurityIndexManager implements ClusterStateListener {
|
|||||||
return this.indexState.mappingUpToDate;
|
return this.indexState.mappingUpToDate;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public boolean isStateRecovered() {
|
||||||
|
return this.indexState != State.UNRECOVERED_STATE;
|
||||||
|
}
|
||||||
|
|
||||||
public ElasticsearchException getUnavailableReason() {
|
public ElasticsearchException getUnavailableReason() {
|
||||||
final State localState = this.indexState;
|
final State localState = this.indexState;
|
||||||
if (localState.indexAvailable) {
|
if (localState.indexAvailable) {
|
||||||
@ -297,7 +303,10 @@ public class SecurityIndexManager implements ClusterStateListener {
|
|||||||
public void prepareIndexIfNeededThenExecute(final Consumer<Exception> consumer, final Runnable andThen) {
|
public void prepareIndexIfNeededThenExecute(final Consumer<Exception> consumer, final Runnable andThen) {
|
||||||
final State indexState = this.indexState; // use a local copy so all checks execute against the same state!
|
final State indexState = this.indexState; // use a local copy so all checks execute against the same state!
|
||||||
// TODO we should improve this so we don't fire off a bunch of requests to do the same thing (create or update mappings)
|
// TODO we should improve this so we don't fire off a bunch of requests to do the same thing (create or update mappings)
|
||||||
if (indexState.indexExists && indexState.isIndexUpToDate == false) {
|
if (indexState == State.UNRECOVERED_STATE) {
|
||||||
|
consumer.accept(new ElasticsearchStatusException("Cluster state has not been recovered yet, cannot write to the security index",
|
||||||
|
RestStatus.SERVICE_UNAVAILABLE));
|
||||||
|
} else if (indexState.indexExists && indexState.isIndexUpToDate == false) {
|
||||||
consumer.accept(new IllegalStateException(
|
consumer.accept(new IllegalStateException(
|
||||||
"Security index is not on the current version. Security features relying on the index will not be available until " +
|
"Security index is not on the current version. Security features relying on the index will not be available until " +
|
||||||
"the upgrade API is run on the security index"));
|
"the upgrade API is run on the security index"));
|
||||||
@ -377,6 +386,7 @@ public class SecurityIndexManager implements ClusterStateListener {
|
|||||||
* State of the security index.
|
* State of the security index.
|
||||||
*/
|
*/
|
||||||
public static class State {
|
public static class State {
|
||||||
|
public static final State UNRECOVERED_STATE = new State(false, false, false, false, null, null, null);
|
||||||
public final boolean indexExists;
|
public final boolean indexExists;
|
||||||
public final boolean isIndexUpToDate;
|
public final boolean isIndexUpToDate;
|
||||||
public final boolean indexAvailable;
|
public final boolean indexAvailable;
|
||||||
|
@ -15,6 +15,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
|
|||||||
import java.util.concurrent.atomic.AtomicReference;
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
import java.util.function.BiConsumer;
|
import java.util.function.BiConsumer;
|
||||||
|
|
||||||
|
import org.elasticsearch.ElasticsearchStatusException;
|
||||||
import org.elasticsearch.Version;
|
import org.elasticsearch.Version;
|
||||||
import org.elasticsearch.action.Action;
|
import org.elasticsearch.action.Action;
|
||||||
import org.elasticsearch.action.ActionListener;
|
import org.elasticsearch.action.ActionListener;
|
||||||
@ -26,6 +27,7 @@ import org.elasticsearch.client.FilterClient;
|
|||||||
import org.elasticsearch.cluster.ClusterChangedEvent;
|
import org.elasticsearch.cluster.ClusterChangedEvent;
|
||||||
import org.elasticsearch.cluster.ClusterName;
|
import org.elasticsearch.cluster.ClusterName;
|
||||||
import org.elasticsearch.cluster.ClusterState;
|
import org.elasticsearch.cluster.ClusterState;
|
||||||
|
import org.elasticsearch.cluster.block.ClusterBlocks;
|
||||||
import org.elasticsearch.cluster.health.ClusterHealthStatus;
|
import org.elasticsearch.cluster.health.ClusterHealthStatus;
|
||||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||||
import org.elasticsearch.cluster.metadata.IndexTemplateMetaData;
|
import org.elasticsearch.cluster.metadata.IndexTemplateMetaData;
|
||||||
@ -40,10 +42,13 @@ import org.elasticsearch.cluster.routing.UnassignedInfo;
|
|||||||
import org.elasticsearch.cluster.service.ClusterService;
|
import org.elasticsearch.cluster.service.ClusterService;
|
||||||
import org.elasticsearch.common.UUIDs;
|
import org.elasticsearch.common.UUIDs;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
|
import org.elasticsearch.common.util.concurrent.EsExecutors;
|
||||||
import org.elasticsearch.common.util.concurrent.ThreadContext;
|
import org.elasticsearch.common.util.concurrent.ThreadContext;
|
||||||
import org.elasticsearch.common.xcontent.XContentType;
|
import org.elasticsearch.common.xcontent.XContentType;
|
||||||
|
import org.elasticsearch.gateway.GatewayService;
|
||||||
import org.elasticsearch.index.Index;
|
import org.elasticsearch.index.Index;
|
||||||
import org.elasticsearch.index.shard.ShardId;
|
import org.elasticsearch.index.shard.ShardId;
|
||||||
|
import org.elasticsearch.rest.RestStatus;
|
||||||
import org.elasticsearch.test.ESTestCase;
|
import org.elasticsearch.test.ESTestCase;
|
||||||
import org.elasticsearch.threadpool.ThreadPool;
|
import org.elasticsearch.threadpool.ThreadPool;
|
||||||
import org.elasticsearch.xpack.security.test.SecurityTestUtils;
|
import org.elasticsearch.xpack.security.test.SecurityTestUtils;
|
||||||
@ -55,6 +60,10 @@ import static org.elasticsearch.xpack.security.support.SecurityIndexManager.SECU
|
|||||||
import static org.elasticsearch.xpack.security.support.SecurityIndexManager.SECURITY_TEMPLATE_NAME;
|
import static org.elasticsearch.xpack.security.support.SecurityIndexManager.SECURITY_TEMPLATE_NAME;
|
||||||
import static org.elasticsearch.xpack.security.support.SecurityIndexManager.TEMPLATE_VERSION_PATTERN;
|
import static org.elasticsearch.xpack.security.support.SecurityIndexManager.TEMPLATE_VERSION_PATTERN;
|
||||||
import static org.hamcrest.Matchers.equalTo;
|
import static org.hamcrest.Matchers.equalTo;
|
||||||
|
import static org.hamcrest.Matchers.instanceOf;
|
||||||
|
import static org.hamcrest.Matchers.is;
|
||||||
|
import static org.hamcrest.Matchers.notNullValue;
|
||||||
|
import static org.hamcrest.Matchers.nullValue;
|
||||||
import static org.mockito.Mockito.mock;
|
import static org.mockito.Mockito.mock;
|
||||||
import static org.mockito.Mockito.when;
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
@ -72,6 +81,7 @@ public class SecurityIndexManagerTests extends ESTestCase {
|
|||||||
final Client mockClient = mock(Client.class);
|
final Client mockClient = mock(Client.class);
|
||||||
final ThreadPool threadPool = mock(ThreadPool.class);
|
final ThreadPool threadPool = mock(ThreadPool.class);
|
||||||
when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY));
|
when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY));
|
||||||
|
when(threadPool.generic()).thenReturn(EsExecutors.newDirectExecutorService());
|
||||||
when(mockClient.threadPool()).thenReturn(threadPool);
|
when(mockClient.threadPool()).thenReturn(threadPool);
|
||||||
when(mockClient.settings()).thenReturn(Settings.EMPTY);
|
when(mockClient.settings()).thenReturn(Settings.EMPTY);
|
||||||
final ClusterService clusterService = mock(ClusterService.class);
|
final ClusterService clusterService = mock(ClusterService.class);
|
||||||
@ -192,6 +202,67 @@ public class SecurityIndexManagerTests extends ESTestCase {
|
|||||||
assertEquals(ClusterHealthStatus.GREEN, currentState.get().indexStatus);
|
assertEquals(ClusterHealthStatus.GREEN, currentState.get().indexStatus);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testWriteBeforeStateNotRecovered() throws Exception {
|
||||||
|
final AtomicBoolean prepareRunnableCalled = new AtomicBoolean(false);
|
||||||
|
final AtomicReference<Exception> prepareException = new AtomicReference<>(null);
|
||||||
|
manager.prepareIndexIfNeededThenExecute(ex -> {
|
||||||
|
prepareException.set(ex);
|
||||||
|
}, () -> {
|
||||||
|
prepareRunnableCalled.set(true);
|
||||||
|
});
|
||||||
|
assertThat(prepareException.get(), is(notNullValue()));
|
||||||
|
assertThat(prepareException.get(), instanceOf(ElasticsearchStatusException.class));
|
||||||
|
assertThat(((ElasticsearchStatusException)prepareException.get()).status(), is(RestStatus.SERVICE_UNAVAILABLE));
|
||||||
|
assertThat(prepareRunnableCalled.get(), is(false));
|
||||||
|
prepareException.set(null);
|
||||||
|
prepareRunnableCalled.set(false);
|
||||||
|
// state not recovered
|
||||||
|
final ClusterBlocks.Builder blocks = ClusterBlocks.builder().addGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK);
|
||||||
|
manager.clusterChanged(event(new ClusterState.Builder(CLUSTER_NAME).blocks(blocks)));
|
||||||
|
manager.prepareIndexIfNeededThenExecute(ex -> {
|
||||||
|
prepareException.set(ex);
|
||||||
|
}, () -> {
|
||||||
|
prepareRunnableCalled.set(true);
|
||||||
|
});
|
||||||
|
assertThat(prepareException.get(), is(notNullValue()));
|
||||||
|
assertThat(prepareException.get(), instanceOf(ElasticsearchStatusException.class));
|
||||||
|
assertThat(((ElasticsearchStatusException)prepareException.get()).status(), is(RestStatus.SERVICE_UNAVAILABLE));
|
||||||
|
assertThat(prepareRunnableCalled.get(), is(false));
|
||||||
|
prepareException.set(null);
|
||||||
|
prepareRunnableCalled.set(false);
|
||||||
|
// state recovered with index
|
||||||
|
ClusterState.Builder clusterStateBuilder = createClusterState(INDEX_NAME, TEMPLATE_NAME,
|
||||||
|
SecurityIndexManager.INTERNAL_INDEX_FORMAT);
|
||||||
|
markShardsAvailable(clusterStateBuilder);
|
||||||
|
manager.clusterChanged(event(clusterStateBuilder));
|
||||||
|
manager.prepareIndexIfNeededThenExecute(ex -> {
|
||||||
|
prepareException.set(ex);
|
||||||
|
}, () -> {
|
||||||
|
prepareRunnableCalled.set(true);
|
||||||
|
});
|
||||||
|
assertThat(prepareException.get(), is(nullValue()));
|
||||||
|
assertThat(prepareRunnableCalled.get(), is(true));
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testListeneredNotCalledBeforeStateNotRecovered() throws Exception {
|
||||||
|
final AtomicBoolean listenerCalled = new AtomicBoolean(false);
|
||||||
|
manager.addIndexStateListener((prev, current) -> {
|
||||||
|
listenerCalled.set(true);
|
||||||
|
});
|
||||||
|
final ClusterBlocks.Builder blocks = ClusterBlocks.builder().addGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK);
|
||||||
|
// state not recovered
|
||||||
|
manager.clusterChanged(event(new ClusterState.Builder(CLUSTER_NAME).blocks(blocks)));
|
||||||
|
assertThat(manager.isStateRecovered(), is(false));
|
||||||
|
assertThat(listenerCalled.get(), is(false));
|
||||||
|
// state recovered with index
|
||||||
|
ClusterState.Builder clusterStateBuilder = createClusterState(INDEX_NAME, TEMPLATE_NAME,
|
||||||
|
SecurityIndexManager.INTERNAL_INDEX_FORMAT);
|
||||||
|
markShardsAvailable(clusterStateBuilder);
|
||||||
|
manager.clusterChanged(event(clusterStateBuilder));
|
||||||
|
assertThat(manager.isStateRecovered(), is(true));
|
||||||
|
assertThat(listenerCalled.get(), is(true));
|
||||||
|
}
|
||||||
|
|
||||||
public void testIndexOutOfDateListeners() throws Exception {
|
public void testIndexOutOfDateListeners() throws Exception {
|
||||||
final AtomicBoolean listenerCalled = new AtomicBoolean(false);
|
final AtomicBoolean listenerCalled = new AtomicBoolean(false);
|
||||||
manager.clusterChanged(event(new ClusterState.Builder(CLUSTER_NAME)));
|
manager.clusterChanged(event(new ClusterState.Builder(CLUSTER_NAME)));
|
||||||
@ -236,12 +307,14 @@ public class SecurityIndexManagerTests extends ESTestCase {
|
|||||||
assertThat(manager.indexExists(), Matchers.equalTo(false));
|
assertThat(manager.indexExists(), Matchers.equalTo(false));
|
||||||
assertThat(manager.isAvailable(), Matchers.equalTo(false));
|
assertThat(manager.isAvailable(), Matchers.equalTo(false));
|
||||||
assertThat(manager.isMappingUpToDate(), Matchers.equalTo(false));
|
assertThat(manager.isMappingUpToDate(), Matchers.equalTo(false));
|
||||||
|
assertThat(manager.isStateRecovered(), Matchers.equalTo(false));
|
||||||
}
|
}
|
||||||
|
|
||||||
private void assertIndexUpToDateButNotAvailable() {
|
private void assertIndexUpToDateButNotAvailable() {
|
||||||
assertThat(manager.indexExists(), Matchers.equalTo(true));
|
assertThat(manager.indexExists(), Matchers.equalTo(true));
|
||||||
assertThat(manager.isAvailable(), Matchers.equalTo(false));
|
assertThat(manager.isAvailable(), Matchers.equalTo(false));
|
||||||
assertThat(manager.isMappingUpToDate(), Matchers.equalTo(true));
|
assertThat(manager.isMappingUpToDate(), Matchers.equalTo(true));
|
||||||
|
assertThat(manager.isStateRecovered(), Matchers.equalTo(true));
|
||||||
}
|
}
|
||||||
|
|
||||||
public static ClusterState.Builder createClusterState(String indexName, String templateName) throws IOException {
|
public static ClusterState.Builder createClusterState(String indexName, String templateName) throws IOException {
|
||||||
|
Loading…
x
Reference in New Issue
Block a user