mirror of
https://github.com/honeymoose/OpenSearch.git
synced 2025-02-17 02:14:54 +00:00
Un-hardcode SecurityIndexManager to handle generic indices (#40064)
`SecurityIndexManager` is hardcoded to handle only the `.security`-`.security-7` alias-index pair. This commit removes the hardcoded bits, so that the `SecurityIndexManager` can be reused for other indices, such as the planned security tokens index (`.security-tokens-7`).
This commit is contained in:
parent
1b75ee0bd7
commit
124de8d938
@ -396,7 +396,7 @@ public class Security extends Plugin implements ActionPlugin, IngestPlugin, Netw
|
||||
components.add(auditTrailService);
|
||||
this.auditTrailService.set(auditTrailService);
|
||||
|
||||
securityIndex.set(new SecurityIndexManager(client, SecurityIndexManager.SECURITY_INDEX_NAME, clusterService));
|
||||
securityIndex.set(SecurityIndexManager.buildSecurityIndexManager(client, clusterService));
|
||||
|
||||
final TokenService tokenService = new TokenService(settings, Clock.systemUTC(), client, securityIndex.get(), clusterService);
|
||||
this.tokenService.set(tokenService);
|
||||
|
@ -62,6 +62,7 @@ import java.util.concurrent.CopyOnWriteArrayList;
|
||||
import java.util.function.BiConsumer;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.function.Predicate;
|
||||
import java.util.function.Supplier;
|
||||
import java.util.regex.Pattern;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
@ -72,7 +73,7 @@ import static org.elasticsearch.xpack.core.ClientHelper.SECURITY_ORIGIN;
|
||||
import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;
|
||||
|
||||
/**
|
||||
* Manages the lifecycle of a single index, its template, mapping and and data upgrades/migrations.
|
||||
* Manages the lifecycle of a single index, mapping and and data upgrades/migrations.
|
||||
*/
|
||||
public class SecurityIndexManager implements ClusterStateListener {
|
||||
|
||||
@ -82,28 +83,41 @@ public class SecurityIndexManager implements ClusterStateListener {
|
||||
public static final String TEMPLATE_VERSION_PATTERN = Pattern.quote("${security.template.version}");
|
||||
public static final String SECURITY_TEMPLATE_NAME = "security-index-template";
|
||||
public static final String SECURITY_INDEX_NAME = ".security";
|
||||
private static final Logger LOGGER = LogManager.getLogger(SecurityIndexManager.class);
|
||||
private static final Logger logger = LogManager.getLogger(SecurityIndexManager.class);
|
||||
|
||||
private final String indexName;
|
||||
private final String aliasName;
|
||||
private final String internalIndexName;
|
||||
private final int internalIndexFormat;
|
||||
private final Supplier<byte[]> mappingSourceSupplier;
|
||||
private final Client client;
|
||||
|
||||
private final List<BiConsumer<State, State>> stateChangeListeners = new CopyOnWriteArrayList<>();
|
||||
|
||||
private volatile State indexState;
|
||||
|
||||
public SecurityIndexManager(Client client, String indexName, ClusterService clusterService) {
|
||||
this(client, indexName, State.UNRECOVERED_STATE);
|
||||
public static SecurityIndexManager buildSecurityIndexManager(Client client, ClusterService clusterService) {
|
||||
return new SecurityIndexManager(client, SECURITY_INDEX_NAME, INTERNAL_SECURITY_INDEX, INTERNAL_INDEX_FORMAT,
|
||||
SecurityIndexManager::readSecurityTemplateAsBytes, clusterService);
|
||||
}
|
||||
|
||||
private SecurityIndexManager(Client client, String aliasName, String internalIndexName, int internalIndexFormat,
|
||||
Supplier<byte[]> mappingSourceSupplier, ClusterService clusterService) {
|
||||
this(client, aliasName, internalIndexName, internalIndexFormat, mappingSourceSupplier, State.UNRECOVERED_STATE);
|
||||
clusterService.addListener(this);
|
||||
}
|
||||
|
||||
private SecurityIndexManager(Client client, String indexName, State indexState) {
|
||||
this.client = client;
|
||||
this.indexName = indexName;
|
||||
private SecurityIndexManager(Client client, String aliasName, String internalIndexName, int internalIndexFormat,
|
||||
Supplier<byte[]> mappingSourceSupplier, State indexState) {
|
||||
this.aliasName = aliasName;
|
||||
this.internalIndexName = internalIndexName;
|
||||
this.internalIndexFormat = internalIndexFormat;
|
||||
this.mappingSourceSupplier = mappingSourceSupplier;
|
||||
this.indexState = indexState;
|
||||
this.client = client;
|
||||
}
|
||||
|
||||
public SecurityIndexManager freeze() {
|
||||
return new SecurityIndexManager(null, indexName, indexState);
|
||||
return new SecurityIndexManager(null, aliasName, internalIndexName, internalIndexFormat, mappingSourceSupplier, indexState);
|
||||
}
|
||||
|
||||
public boolean checkMappingVersion(Predicate<Version> requiredVersion) {
|
||||
@ -143,9 +157,10 @@ public class SecurityIndexManager implements ClusterStateListener {
|
||||
}
|
||||
|
||||
if (localState.indexExists) {
|
||||
return new UnavailableShardsException(null, "at least one primary shard for the security index is unavailable");
|
||||
return new UnavailableShardsException(null,
|
||||
"at least one primary shard for the index [" + localState.concreteIndexName + "] is unavailable");
|
||||
} else {
|
||||
return new IndexNotFoundException(SECURITY_INDEX_NAME);
|
||||
return new IndexNotFoundException(localState.concreteIndexName);
|
||||
}
|
||||
}
|
||||
|
||||
@ -163,20 +178,20 @@ public class SecurityIndexManager implements ClusterStateListener {
|
||||
if (event.state().blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) {
|
||||
// wait until the gateway has recovered from disk, otherwise we think we don't have the
|
||||
// .security index but they may not have been restored from the cluster state on disk
|
||||
LOGGER.debug("security index manager waiting until state has been recovered");
|
||||
logger.debug("security index manager waiting until state has been recovered");
|
||||
return;
|
||||
}
|
||||
final State previousState = indexState;
|
||||
final IndexMetaData indexMetaData = resolveConcreteIndex(indexName, event.state().metaData());
|
||||
final IndexMetaData indexMetaData = resolveConcreteIndex(aliasName, event.state().metaData());
|
||||
final boolean indexExists = indexMetaData != null;
|
||||
final boolean isIndexUpToDate = indexExists == false ||
|
||||
INDEX_FORMAT_SETTING.get(indexMetaData.getSettings()).intValue() == INTERNAL_INDEX_FORMAT;
|
||||
INDEX_FORMAT_SETTING.get(indexMetaData.getSettings()).intValue() == internalIndexFormat;
|
||||
final boolean indexAvailable = checkIndexAvailable(event.state());
|
||||
final boolean mappingIsUpToDate = indexExists == false || checkIndexMappingUpToDate(event.state());
|
||||
final Version mappingVersion = oldestIndexMappingVersion(event.state());
|
||||
final ClusterHealthStatus indexStatus = indexMetaData == null ? null :
|
||||
new ClusterIndexHealth(indexMetaData, event.state().getRoutingTable().index(indexMetaData.getIndex())).getStatus();
|
||||
final String concreteIndexName = indexMetaData == null ? INTERNAL_SECURITY_INDEX : indexMetaData.getIndex().getName();
|
||||
final String concreteIndexName = indexMetaData == null ? internalIndexName : indexMetaData.getIndex().getName();
|
||||
final State newState = new State(indexExists, isIndexUpToDate, indexAvailable, mappingIsUpToDate, mappingVersion, concreteIndexName,
|
||||
indexStatus);
|
||||
this.indexState = newState;
|
||||
@ -193,7 +208,7 @@ public class SecurityIndexManager implements ClusterStateListener {
|
||||
if (routingTable != null && routingTable.allPrimaryShardsActive()) {
|
||||
return true;
|
||||
}
|
||||
LOGGER.debug("Security index [{}] is not yet active", indexName);
|
||||
logger.debug("Index [{}] is not yet active", aliasName);
|
||||
return false;
|
||||
}
|
||||
|
||||
@ -201,7 +216,7 @@ public class SecurityIndexManager implements ClusterStateListener {
|
||||
* Returns the routing-table for this index, or <code>null</code> if the index does not exist.
|
||||
*/
|
||||
private IndexRoutingTable getIndexRoutingTable(ClusterState clusterState) {
|
||||
IndexMetaData metaData = resolveConcreteIndex(indexName, clusterState.metaData());
|
||||
IndexMetaData metaData = resolveConcreteIndex(aliasName, clusterState.metaData());
|
||||
if (metaData == null) {
|
||||
return null;
|
||||
} else {
|
||||
@ -209,45 +224,39 @@ public class SecurityIndexManager implements ClusterStateListener {
|
||||
}
|
||||
}
|
||||
|
||||
public static boolean checkTemplateExistsAndVersionMatches(
|
||||
String templateName, ClusterState state, Logger logger, Predicate<Version> predicate) {
|
||||
|
||||
return TemplateUtils.checkTemplateExistsAndVersionMatches(templateName, SECURITY_VERSION_STRING,
|
||||
state, logger, predicate);
|
||||
public static boolean checkTemplateExistsAndVersionMatches(String templateName, ClusterState state, Logger logger,
|
||||
Predicate<Version> predicate) {
|
||||
return TemplateUtils.checkTemplateExistsAndVersionMatches(templateName, SECURITY_VERSION_STRING, state, logger, predicate);
|
||||
}
|
||||
|
||||
private boolean checkIndexMappingUpToDate(ClusterState clusterState) {
|
||||
return checkIndexMappingVersionMatches(clusterState, Version.CURRENT::equals);
|
||||
}
|
||||
|
||||
private boolean checkIndexMappingVersionMatches(ClusterState clusterState,
|
||||
Predicate<Version> predicate) {
|
||||
return checkIndexMappingVersionMatches(indexName, clusterState, LOGGER, predicate);
|
||||
private boolean checkIndexMappingVersionMatches(ClusterState clusterState, Predicate<Version> predicate) {
|
||||
return checkIndexMappingVersionMatches(aliasName, clusterState, logger, predicate);
|
||||
}
|
||||
|
||||
public static boolean checkIndexMappingVersionMatches(String indexName,
|
||||
ClusterState clusterState, Logger logger,
|
||||
public static boolean checkIndexMappingVersionMatches(String indexName, ClusterState clusterState, Logger logger,
|
||||
Predicate<Version> predicate) {
|
||||
return loadIndexMappingVersions(indexName, clusterState, logger)
|
||||
.stream().allMatch(predicate);
|
||||
return loadIndexMappingVersions(indexName, clusterState, logger).stream().allMatch(predicate);
|
||||
}
|
||||
|
||||
private Version oldestIndexMappingVersion(ClusterState clusterState) {
|
||||
final Set<Version> versions = loadIndexMappingVersions(indexName, clusterState, LOGGER);
|
||||
final Set<Version> versions = loadIndexMappingVersions(aliasName, clusterState, logger);
|
||||
return versions.stream().min(Version::compareTo).orElse(null);
|
||||
}
|
||||
|
||||
private static Set<Version> loadIndexMappingVersions(String indexName,
|
||||
ClusterState clusterState, Logger logger) {
|
||||
private static Set<Version> loadIndexMappingVersions(String aliasName, ClusterState clusterState, Logger logger) {
|
||||
Set<Version> versions = new HashSet<>();
|
||||
IndexMetaData indexMetaData = resolveConcreteIndex(indexName, clusterState.metaData());
|
||||
IndexMetaData indexMetaData = resolveConcreteIndex(aliasName, clusterState.metaData());
|
||||
if (indexMetaData != null) {
|
||||
for (Object object : indexMetaData.getMappings().values().toArray()) {
|
||||
MappingMetaData mappingMetaData = (MappingMetaData) object;
|
||||
if (mappingMetaData.type().equals(MapperService.DEFAULT_MAPPING)) {
|
||||
continue;
|
||||
}
|
||||
versions.add(readMappingVersion(indexName, mappingMetaData, logger));
|
||||
versions.add(readMappingVersion(aliasName, mappingMetaData, logger));
|
||||
}
|
||||
}
|
||||
return versions;
|
||||
@ -270,8 +279,7 @@ public class SecurityIndexManager implements ClusterStateListener {
|
||||
return null;
|
||||
}
|
||||
|
||||
private static Version readMappingVersion(String indexName, MappingMetaData mappingMetaData,
|
||||
Logger logger) {
|
||||
private static Version readMappingVersion(String indexName, MappingMetaData mappingMetaData, Logger logger) {
|
||||
try {
|
||||
Map<String, Object> meta =
|
||||
(Map<String, Object>) mappingMetaData.sourceAsMap().get("_meta");
|
||||
@ -289,8 +297,8 @@ public class SecurityIndexManager implements ClusterStateListener {
|
||||
}
|
||||
|
||||
/**
|
||||
* Validates the security index is up to date and does not need to migrated. If it is not, the
|
||||
* consumer is called with an exception. If the security index is up to date, the runnable will
|
||||
* Validates that the index is up to date and does not need to be migrated. If it is not, the
|
||||
* consumer is called with an exception. If the index is up to date, the runnable will
|
||||
* be executed. <b>NOTE:</b> this method does not check the availability of the index; this check
|
||||
* is left to the caller so that this condition can be handled appropriately.
|
||||
*/
|
||||
@ -298,8 +306,8 @@ public class SecurityIndexManager implements ClusterStateListener {
|
||||
final State indexState = this.indexState; // use a local copy so all checks execute against the same state!
|
||||
if (indexState.indexExists && indexState.isIndexUpToDate == false) {
|
||||
consumer.accept(new IllegalStateException(
|
||||
"Security index is not on the current version. Security features relying on the index will not be available until " +
|
||||
"the upgrade API is run on the security index"));
|
||||
"Index [" + indexState.concreteIndexName + "] is not on the current version. Security features relying on the index"
|
||||
+ " will not be available until the upgrade API is run on the index"));
|
||||
} else {
|
||||
andThen.run();
|
||||
}
|
||||
@ -313,17 +321,20 @@ public class SecurityIndexManager implements ClusterStateListener {
|
||||
final State indexState = this.indexState; // use a local copy so all checks execute against the same state!
|
||||
// TODO we should improve this so we don't fire off a bunch of requests to do the same thing (create or update mappings)
|
||||
if (indexState == State.UNRECOVERED_STATE) {
|
||||
consumer.accept(new ElasticsearchStatusException("Cluster state has not been recovered yet, cannot write to the security index",
|
||||
consumer.accept(new ElasticsearchStatusException(
|
||||
"Cluster state has not been recovered yet, cannot write to the [" + indexState.concreteIndexName + "] index",
|
||||
RestStatus.SERVICE_UNAVAILABLE));
|
||||
} else if (indexState.indexExists && indexState.isIndexUpToDate == false) {
|
||||
consumer.accept(new IllegalStateException(
|
||||
"Security index is not on the current version. Security features relying on the index will not be available until " +
|
||||
"the upgrade API is run on the security index"));
|
||||
"Index [" + indexState.concreteIndexName + "] is not on the current version."
|
||||
+ "Security features relying on the index will not be available until the upgrade API is run on the index"));
|
||||
} else if (indexState.indexExists == false) {
|
||||
LOGGER.info("security index does not exist. Creating [{}] with alias [{}]", INTERNAL_SECURITY_INDEX, SECURITY_INDEX_NAME);
|
||||
Tuple<String, Settings> mappingAndSettings = loadMappingAndSettingsSourceFromTemplate();
|
||||
CreateIndexRequest request = new CreateIndexRequest(INTERNAL_SECURITY_INDEX)
|
||||
.alias(new Alias(SECURITY_INDEX_NAME))
|
||||
assert indexState.concreteIndexName != null;
|
||||
logger.info("security index does not exist. Creating [{}] with alias [{}]", indexState.concreteIndexName, this.aliasName);
|
||||
final byte[] mappingSource = mappingSourceSupplier.get();
|
||||
final Tuple<String, Settings> mappingAndSettings = parseMappingAndSettingsFromTemplateBytes(mappingSource);
|
||||
CreateIndexRequest request = new CreateIndexRequest(indexState.concreteIndexName)
|
||||
.alias(new Alias(this.aliasName))
|
||||
.mapping(MapperService.SINGLE_MAPPING_NAME, mappingAndSettings.v1(), XContentType.JSON)
|
||||
.waitForActiveShards(ActiveShardCount.ALL)
|
||||
.settings(mappingAndSettings.v2());
|
||||
@ -351,11 +362,11 @@ public class SecurityIndexManager implements ClusterStateListener {
|
||||
}
|
||||
}, client.admin().indices()::create);
|
||||
} else if (indexState.mappingUpToDate == false) {
|
||||
LOGGER.info(
|
||||
"security index [{}] (alias [{}]) is not up to date. Updating mapping", indexState.concreteIndexName, SECURITY_INDEX_NAME);
|
||||
|
||||
logger.info("Index [{}] (alias [{}]) is not up to date. Updating mapping", indexState.concreteIndexName, this.aliasName);
|
||||
final byte[] mappingSource = mappingSourceSupplier.get();
|
||||
final Tuple<String, Settings> mappingAndSettings = parseMappingAndSettingsFromTemplateBytes(mappingSource);
|
||||
PutMappingRequest request = new PutMappingRequest(indexState.concreteIndexName)
|
||||
.source(loadMappingAndSettingsSourceFromTemplate().v1(), XContentType.JSON)
|
||||
.source(mappingAndSettings.v1(), XContentType.JSON)
|
||||
.type(MapperService.SINGLE_MAPPING_NAME);
|
||||
executeAsyncWithOrigin(client.threadPool().getThreadContext(), SECURITY_ORIGIN, request,
|
||||
ActionListener.<AcknowledgedResponse>wrap(putMappingResponse -> {
|
||||
@ -370,27 +381,6 @@ public class SecurityIndexManager implements ClusterStateListener {
|
||||
}
|
||||
}
|
||||
|
||||
private Tuple<String, Settings> loadMappingAndSettingsSourceFromTemplate() {
|
||||
final byte[] template = TemplateUtils.loadTemplate("/" + SECURITY_TEMPLATE_NAME + ".json", Version.CURRENT.toString(),
|
||||
SecurityIndexManager.TEMPLATE_VERSION_PATTERN).getBytes(StandardCharsets.UTF_8);
|
||||
final PutIndexTemplateRequest request = new PutIndexTemplateRequest(SECURITY_TEMPLATE_NAME).source(template, XContentType.JSON);
|
||||
|
||||
final String mappingSource = request.mappings().get(MapperService.SINGLE_MAPPING_NAME);
|
||||
try (XContentParser parser = XContentType.JSON.xContent().createParser(NamedXContentRegistry.EMPTY,
|
||||
DeprecationHandler.THROW_UNSUPPORTED_OPERATION, mappingSource)) {
|
||||
// remove the type wrapping to get the mapping
|
||||
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser::getTokenLocation); // {
|
||||
ensureFieldName(parser, parser.nextToken(), MapperService.SINGLE_MAPPING_NAME); // _doc
|
||||
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser::getTokenLocation); // {
|
||||
|
||||
XContentBuilder builder = JsonXContent.contentBuilder();
|
||||
builder.generator().copyCurrentStructure(parser);
|
||||
return new Tuple<>(Strings.toString(builder), request.settings());
|
||||
} catch (IOException e) {
|
||||
throw ExceptionsHelper.convertToRuntime(e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Return true if the state moves from an unhealthy ("RED") index state to a healthy ("non-RED") state.
|
||||
*/
|
||||
@ -406,6 +396,29 @@ public class SecurityIndexManager implements ClusterStateListener {
|
||||
return previousState.indexStatus != null && currentState.indexStatus == null;
|
||||
}
|
||||
|
||||
private static byte[] readSecurityTemplateAsBytes() {
|
||||
return TemplateUtils.loadTemplate("/" + SECURITY_TEMPLATE_NAME + ".json", Version.CURRENT.toString(),
|
||||
SecurityIndexManager.TEMPLATE_VERSION_PATTERN).getBytes(StandardCharsets.UTF_8);
|
||||
}
|
||||
|
||||
private static Tuple<String, Settings> parseMappingAndSettingsFromTemplateBytes(byte[] template) {
|
||||
final PutIndexTemplateRequest request = new PutIndexTemplateRequest("name_is_not_important").source(template, XContentType.JSON);
|
||||
final String mappingSource = request.mappings().get(MapperService.SINGLE_MAPPING_NAME);
|
||||
try (XContentParser parser = XContentType.JSON.xContent().createParser(NamedXContentRegistry.EMPTY,
|
||||
DeprecationHandler.THROW_UNSUPPORTED_OPERATION, mappingSource)) {
|
||||
// remove the type wrapping to get the mapping
|
||||
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser::getTokenLocation); // {
|
||||
ensureFieldName(parser, parser.nextToken(), MapperService.SINGLE_MAPPING_NAME); // _doc
|
||||
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser::getTokenLocation); // {
|
||||
|
||||
XContentBuilder builder = JsonXContent.contentBuilder();
|
||||
builder.generator().copyCurrentStructure(parser);
|
||||
return new Tuple<>(Strings.toString(builder), request.settings());
|
||||
} catch (IOException e) {
|
||||
throw ExceptionsHelper.convertToRuntime(e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* State of the security index.
|
||||
*/
|
||||
|
@ -187,8 +187,7 @@ public class NativeRolesStoreTests extends ESTestCase {
|
||||
final ClusterService clusterService = mock(ClusterService.class);
|
||||
final XPackLicenseState licenseState = mock(XPackLicenseState.class);
|
||||
final AtomicBoolean methodCalled = new AtomicBoolean(false);
|
||||
final SecurityIndexManager securityIndex =
|
||||
new SecurityIndexManager(client, SecurityIndexManager.SECURITY_INDEX_NAME, clusterService);
|
||||
final SecurityIndexManager securityIndex = SecurityIndexManager.buildSecurityIndexManager(client, clusterService);
|
||||
final NativeRolesStore rolesStore = new NativeRolesStore(Settings.EMPTY, client, licenseState, securityIndex) {
|
||||
@Override
|
||||
void innerPutRole(final PutRoleRequest request, final RoleDescriptor role, final ActionListener<Boolean> listener) {
|
||||
|
@ -96,7 +96,7 @@ public class SecurityIndexManagerTests extends ESTestCase {
|
||||
actions.put(action, map);
|
||||
}
|
||||
};
|
||||
manager = new SecurityIndexManager(client, INDEX_NAME, clusterService);
|
||||
manager = SecurityIndexManager.buildSecurityIndexManager(client, clusterService);
|
||||
}
|
||||
|
||||
public void testIndexWithUpToDateMappingAndTemplate() throws IOException {
|
||||
|
Loading…
x
Reference in New Issue
Block a user