mirror of
https://github.com/honeymoose/OpenSearch.git
synced 2025-02-24 05:44:59 +00:00
SecurityIndexManager handle RuntimeEx while reading mapping (#44409)
Fixes exception handling while reading and parsing `.security-*` mappings and templates.
This commit is contained in:
parent
383d7b7713
commit
af937b14ae
@ -117,7 +117,8 @@ public class SecurityIndexManager implements ClusterStateListener {
|
||||
clusterService.addListener(this);
|
||||
}
|
||||
|
||||
private SecurityIndexManager(Client client, String aliasName, String internalIndexName, int internalIndexFormat,
|
||||
// protected for testing
|
||||
protected SecurityIndexManager(Client client, String aliasName, String internalIndexName, int internalIndexFormat,
|
||||
Supplier<byte[]> mappingSourceSupplier, State indexState) {
|
||||
this.aliasName = aliasName;
|
||||
this.internalIndexName = internalIndexName;
|
||||
@ -351,65 +352,68 @@ public class SecurityIndexManager implements ClusterStateListener {
|
||||
*/
|
||||
public void prepareIndexIfNeededThenExecute(final Consumer<Exception> consumer, final Runnable andThen) {
|
||||
final State indexState = this.indexState; // use a local copy so all checks execute against the same state!
|
||||
// TODO we should improve this so we don't fire off a bunch of requests to do the same thing (create or update mappings)
|
||||
if (indexState == State.UNRECOVERED_STATE) {
|
||||
consumer.accept(new ElasticsearchStatusException(
|
||||
"Cluster state has not been recovered yet, cannot write to the [" + indexState.concreteIndexName + "] index",
|
||||
RestStatus.SERVICE_UNAVAILABLE));
|
||||
} else if (indexState.indexExists() && indexState.isIndexUpToDate == false) {
|
||||
consumer.accept(new IllegalStateException(
|
||||
"Index [" + indexState.concreteIndexName + "] is not on the current version."
|
||||
+ "Security features relying on the index will not be available until the upgrade API is run on the index"));
|
||||
} else if (indexState.indexExists() == false) {
|
||||
assert indexState.concreteIndexName != null;
|
||||
logger.info("security index does not exist. Creating [{}] with alias [{}]", indexState.concreteIndexName, this.aliasName);
|
||||
final byte[] mappingSource = mappingSourceSupplier.get();
|
||||
final Tuple<String, Settings> mappingAndSettings = parseMappingAndSettingsFromTemplateBytes(mappingSource);
|
||||
CreateIndexRequest request = new CreateIndexRequest(indexState.concreteIndexName)
|
||||
.alias(new Alias(this.aliasName))
|
||||
.mapping(MapperService.SINGLE_MAPPING_NAME, mappingAndSettings.v1(), XContentType.JSON)
|
||||
.waitForActiveShards(ActiveShardCount.ALL)
|
||||
.settings(mappingAndSettings.v2());
|
||||
executeAsyncWithOrigin(client.threadPool().getThreadContext(), SECURITY_ORIGIN, request,
|
||||
new ActionListener<CreateIndexResponse>() {
|
||||
@Override
|
||||
public void onResponse(CreateIndexResponse createIndexResponse) {
|
||||
if (createIndexResponse.isAcknowledged()) {
|
||||
andThen.run();
|
||||
} else {
|
||||
consumer.accept(new ElasticsearchException("Failed to create security index"));
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
final Throwable cause = ExceptionsHelper.unwrapCause(e);
|
||||
if (cause instanceof ResourceAlreadyExistsException) {
|
||||
// the index already exists - it was probably just created so this
|
||||
// node hasn't yet received the cluster state update with the index
|
||||
andThen.run();
|
||||
} else {
|
||||
consumer.accept(e);
|
||||
}
|
||||
}
|
||||
}, client.admin().indices()::create);
|
||||
} else if (indexState.mappingUpToDate == false) {
|
||||
logger.info("Index [{}] (alias [{}]) is not up to date. Updating mapping", indexState.concreteIndexName, this.aliasName);
|
||||
final byte[] mappingSource = mappingSourceSupplier.get();
|
||||
final Tuple<String, Settings> mappingAndSettings = parseMappingAndSettingsFromTemplateBytes(mappingSource);
|
||||
PutMappingRequest request = new PutMappingRequest(indexState.concreteIndexName)
|
||||
.source(mappingAndSettings.v1(), XContentType.JSON)
|
||||
.type(MapperService.SINGLE_MAPPING_NAME);
|
||||
executeAsyncWithOrigin(client.threadPool().getThreadContext(), SECURITY_ORIGIN, request,
|
||||
ActionListener.<AcknowledgedResponse>wrap(putMappingResponse -> {
|
||||
if (putMappingResponse.isAcknowledged()) {
|
||||
try {
|
||||
// TODO we should improve this so we don't fire off a bunch of requests to do the same thing (create or update mappings)
|
||||
if (indexState == State.UNRECOVERED_STATE) {
|
||||
throw new ElasticsearchStatusException(
|
||||
"Cluster state has not been recovered yet, cannot write to the [" + indexState.concreteIndexName + "] index",
|
||||
RestStatus.SERVICE_UNAVAILABLE);
|
||||
} else if (indexState.indexExists() && indexState.isIndexUpToDate == false) {
|
||||
throw new IllegalStateException("Index [" + indexState.concreteIndexName + "] is not on the current version."
|
||||
+ "Security features relying on the index will not be available until the upgrade API is run on the index");
|
||||
} else if (indexState.indexExists() == false) {
|
||||
assert indexState.concreteIndexName != null;
|
||||
logger.info("security index does not exist. Creating [{}] with alias [{}]", indexState.concreteIndexName, this.aliasName);
|
||||
final byte[] mappingSource = mappingSourceSupplier.get();
|
||||
final Tuple<String, Settings> mappingAndSettings = parseMappingAndSettingsFromTemplateBytes(mappingSource);
|
||||
CreateIndexRequest request = new CreateIndexRequest(indexState.concreteIndexName)
|
||||
.alias(new Alias(this.aliasName))
|
||||
.mapping(MapperService.SINGLE_MAPPING_NAME, mappingAndSettings.v1(), XContentType.JSON)
|
||||
.waitForActiveShards(ActiveShardCount.ALL)
|
||||
.settings(mappingAndSettings.v2());
|
||||
executeAsyncWithOrigin(client.threadPool().getThreadContext(), SECURITY_ORIGIN, request,
|
||||
new ActionListener<CreateIndexResponse>() {
|
||||
@Override
|
||||
public void onResponse(CreateIndexResponse createIndexResponse) {
|
||||
if (createIndexResponse.isAcknowledged()) {
|
||||
andThen.run();
|
||||
} else {
|
||||
consumer.accept(new IllegalStateException("put mapping request was not acknowledged"));
|
||||
consumer.accept(new ElasticsearchException("Failed to create security index"));
|
||||
}
|
||||
}, consumer), client.admin().indices()::putMapping);
|
||||
} else {
|
||||
andThen.run();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
final Throwable cause = ExceptionsHelper.unwrapCause(e);
|
||||
if (cause instanceof ResourceAlreadyExistsException) {
|
||||
// the index already exists - it was probably just created so this
|
||||
// node hasn't yet received the cluster state update with the index
|
||||
andThen.run();
|
||||
} else {
|
||||
consumer.accept(e);
|
||||
}
|
||||
}
|
||||
}, client.admin().indices()::create);
|
||||
} else if (indexState.mappingUpToDate == false) {
|
||||
logger.info("Index [{}] (alias [{}]) is not up to date. Updating mapping", indexState.concreteIndexName, this.aliasName);
|
||||
final byte[] mappingSource = mappingSourceSupplier.get();
|
||||
final Tuple<String, Settings> mappingAndSettings = parseMappingAndSettingsFromTemplateBytes(mappingSource);
|
||||
PutMappingRequest request = new PutMappingRequest(indexState.concreteIndexName)
|
||||
.source(mappingAndSettings.v1(), XContentType.JSON)
|
||||
.type(MapperService.SINGLE_MAPPING_NAME);
|
||||
executeAsyncWithOrigin(client.threadPool().getThreadContext(), SECURITY_ORIGIN, request,
|
||||
ActionListener.<AcknowledgedResponse>wrap(putMappingResponse -> {
|
||||
if (putMappingResponse.isAcknowledged()) {
|
||||
andThen.run();
|
||||
} else {
|
||||
consumer.accept(new IllegalStateException("put mapping request was not acknowledged"));
|
||||
}
|
||||
}, consumer), client.admin().indices()::putMapping);
|
||||
} else {
|
||||
andThen.run();
|
||||
}
|
||||
} catch (Exception e) {
|
||||
consumer.accept(e);
|
||||
}
|
||||
}
|
||||
|
||||
@ -433,7 +437,7 @@ public class SecurityIndexManager implements ClusterStateListener {
|
||||
SecurityIndexManager.TEMPLATE_VERSION_PATTERN).getBytes(StandardCharsets.UTF_8);
|
||||
}
|
||||
|
||||
private static Tuple<String, Settings> parseMappingAndSettingsFromTemplateBytes(byte[] template) {
|
||||
private static Tuple<String, Settings> parseMappingAndSettingsFromTemplateBytes(byte[] template) throws IOException {
|
||||
final PutIndexTemplateRequest request = new PutIndexTemplateRequest("name_is_not_important").source(template, XContentType.JSON);
|
||||
final String mappingSource = request.mappings().get(MapperService.SINGLE_MAPPING_NAME);
|
||||
try (XContentParser parser = XContentType.JSON.xContent().createParser(NamedXContentRegistry.EMPTY,
|
||||
@ -446,8 +450,6 @@ public class SecurityIndexManager implements ClusterStateListener {
|
||||
XContentBuilder builder = JsonXContent.contentBuilder();
|
||||
builder.generator().copyCurrentStructure(parser);
|
||||
return new Tuple<>(Strings.toString(builder), request.settings());
|
||||
} catch (IOException e) {
|
||||
throw ExceptionsHelper.convertToRuntime(e);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -6,6 +6,7 @@
|
||||
package org.elasticsearch.xpack.security.support;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.time.Instant;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.LinkedHashMap;
|
||||
@ -14,6 +15,7 @@ import java.util.UUID;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.function.BiConsumer;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
import org.elasticsearch.ElasticsearchStatusException;
|
||||
import org.elasticsearch.Version;
|
||||
@ -67,6 +69,8 @@ import static org.hamcrest.Matchers.notNullValue;
|
||||
import static org.hamcrest.Matchers.nullValue;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
import static org.mockito.Mockito.never;
|
||||
import static org.mockito.Mockito.verify;
|
||||
|
||||
public class SecurityIndexManagerTests extends ESTestCase {
|
||||
|
||||
@ -97,6 +101,23 @@ public class SecurityIndexManagerTests extends ESTestCase {
|
||||
}
|
||||
};
|
||||
manager = SecurityIndexManager.buildSecurityMainIndexManager(client, clusterService);
|
||||
|
||||
}
|
||||
|
||||
public void testIndexWithFaultyMappingOnDisk() {
|
||||
SecurityIndexManager.State state = new SecurityIndexManager.State(randomBoolean() ? Instant.now() : null, true, randomBoolean(),
|
||||
false, null, "not_important", null, null);
|
||||
Supplier<byte[]> mappingSourceSupplier = () -> {
|
||||
throw new RuntimeException();
|
||||
};
|
||||
Runnable runnable = mock(Runnable.class);
|
||||
manager = new SecurityIndexManager(mock(Client.class), RestrictedIndicesNames.SECURITY_MAIN_ALIAS,
|
||||
RestrictedIndicesNames.INTERNAL_SECURITY_MAIN_INDEX_7, SecurityIndexManager.INTERNAL_MAIN_INDEX_FORMAT,
|
||||
mappingSourceSupplier, state);
|
||||
AtomicReference<Exception> exceptionConsumer = new AtomicReference<>();
|
||||
manager.prepareIndexIfNeededThenExecute(e -> exceptionConsumer.set(e), runnable);
|
||||
verify(runnable, never()).run();
|
||||
assertThat(exceptionConsumer.get(), is(notNullValue()));
|
||||
}
|
||||
|
||||
public void testIndexWithUpToDateMappingAndTemplate() throws IOException {
|
||||
|
Loading…
x
Reference in New Issue
Block a user