Support multiple indices in SecurityLifecycleService (elastic/x-pack-elasticsearch#961)

Moves the direct management of the security index from SecurityLifecycleService to IndexLifecycleManager, so that the SecurityLifecycleService can take responsibility for several indices.

Multiple security indices are required as we move away from storing multiple types in a single index.

Original commit: elastic/x-pack-elasticsearch@fde3a42b4d
This commit is contained in:
Tim Vernum 2017-04-06 21:37:33 +10:00 committed by GitHub
parent 7efc9e1270
commit 573b421446
9 changed files with 635 additions and 444 deletions

View File

@ -390,7 +390,6 @@
<suppress files="plugin[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]xpack[/\\]security[/\\]Security.java" checks="LineLength" />
<suppress files="plugin[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]xpack[/\\]security[/\\]SecurityContext.java" checks="LineLength" />
<suppress files="plugin[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]xpack[/\\]security[/\\]SecurityFeatureSet.java" checks="LineLength" />
<suppress files="plugin[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]xpack[/\\]security[/\\]SecurityLifecycleService.java" checks="LineLength" />
<suppress files="plugin[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]xpack[/\\]security[/\\]action[/\\]SecurityActionMapper.java" checks="LineLength" />
<suppress files="plugin[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]xpack[/\\]security[/\\]action[/\\]filter[/\\]SecurityActionFilter.java" checks="LineLength" />
<suppress files="plugin[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]xpack[/\\]security[/\\]action[/\\]interceptor[/\\]BulkShardRequestInterceptor.java" checks="LineLength" />

View File

@ -286,23 +286,28 @@ public class Security implements ActionPlugin, IngestPlugin, NetworkPlugin {
}
}
final AuditTrailService auditTrailService =
new AuditTrailService(settings, auditTrails.stream().collect(Collectors.toList()), licenseState);
new AuditTrailService(settings,
auditTrails.stream().collect(Collectors.toList()), licenseState);
components.add(auditTrailService);
SecurityLifecycleService securityLifecycleService =
new SecurityLifecycleService(settings, clusterService, threadPool, client, licenseState, indexAuditTrail);
new SecurityLifecycleService(settings, clusterService, threadPool, client, licenseState,
indexAuditTrail);
// realms construction
final NativeUsersStore nativeUsersStore = new NativeUsersStore(settings, client, securityLifecycleService);
final NativeUsersStore nativeUsersStore = new NativeUsersStore(settings, client,
securityLifecycleService);
final AnonymousUser anonymousUser = new AnonymousUser(settings);
final ReservedRealm reservedRealm = new ReservedRealm(env, settings, nativeUsersStore, anonymousUser, securityLifecycleService);
final ReservedRealm reservedRealm = new ReservedRealm(env, settings, nativeUsersStore,
anonymousUser, securityLifecycleService);
Map<String, Realm.Factory> realmFactories = new HashMap<>();
realmFactories.putAll(InternalRealms.getFactories(threadPool, resourceWatcherService, sslService, nativeUsersStore));
for (XPackExtension extension : extensions) {
Map<String, Realm.Factory> newRealms = extension.getRealms(resourceWatcherService);
for (Map.Entry<String, Realm.Factory> entry : newRealms.entrySet()) {
if (realmFactories.put(entry.getKey(), entry.getValue()) != null) {
throw new IllegalArgumentException("Realm type [" + entry.getKey() + "] is already registered");
throw new IllegalArgumentException("Realm type [" + entry.getKey() +
"] is already registered");
}
}
}
@ -663,10 +668,18 @@ public class Security implements ActionPlugin, IngestPlugin, NetworkPlugin {
}
final boolean indexAuditingEnabled = Security.indexAuditLoggingEnabled(settings);
final String auditIndex = indexAuditingEnabled ? "," + IndexAuditTrail.INDEX_NAME_PREFIX + "*" : "";
String errorMessage = LoggerMessageFormat.format("the [action.auto_create_index] setting value [{}] is too" +
" restrictive. disable [action.auto_create_index] or set it to " +
"[{}{}]", (Object) value, SecurityLifecycleService.SECURITY_INDEX_NAME, auditIndex);
final String auditIndex;
if (indexAuditingEnabled) {
auditIndex = "," + IndexAuditTrail.INDEX_NAME_PREFIX + "*";
} else {
auditIndex = "";
}
String securityIndices = SecurityLifecycleService.indexNames().stream()
.collect(Collectors.joining(","));
String errorMessage = LoggerMessageFormat.format(
"the [action.auto_create_index] setting value [{}] is too" +
" restrictive. disable [action.auto_create_index] or set it to " +
"[{}{}]", (Object) value, securityIndices, auditIndex);
if (Booleans.isFalse(value)) {
throw new IllegalArgumentException(errorMessage);
}
@ -677,7 +690,7 @@ public class Security implements ActionPlugin, IngestPlugin, NetworkPlugin {
String[] matches = Strings.commaDelimitedListToStringArray(value);
List<String> indices = new ArrayList<>();
indices.add(SecurityLifecycleService.SECURITY_INDEX_NAME);
indices.addAll(SecurityLifecycleService.indexNames());
if (indexAuditingEnabled) {
DateTime now = new DateTime(DateTimeZone.UTC);
// just use daily rollover

View File

@ -6,105 +6,73 @@
package org.elasticsearch.xpack.security;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingResponse;
import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequest;
import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateResponse;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexTemplateMetaData;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.component.LifecycleListener;
import org.elasticsearch.common.compress.CompressedXContent;
import org.elasticsearch.common.inject.internal.Nullable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.security.audit.index.IndexAuditTrail;
import org.elasticsearch.xpack.security.authc.esnative.NativeRealmMigrator;
import org.elasticsearch.xpack.template.TemplateUtils;
import org.elasticsearch.xpack.security.support.IndexLifecycleManager;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.Predicate;
import java.util.regex.Pattern;
/**
* This class is used to provide a lifecycle for services that is based on the cluster's state
* rather than the typical lifecycle that is used to start services as part of the node startup.
*
* This type of lifecycle is necessary for services that need to perform actions that require the cluster to be in a
* certain state; some examples are storing index templates and creating indices. These actions would most likely fail
* from within a plugin if executed in the {@link org.elasticsearch.common.component.AbstractLifecycleComponent#doStart()}
* method. However, if the startup of these services waits for the cluster to form and recover indices then it will be
* successful. This lifecycle service allows for this to happen by listening for {@link ClusterChangedEvent} and checking
* if the services can start. Additionally, the service also provides hooks for stop and close functionality.
* This type of lifecycle is necessary for services that need to perform actions that require the
* cluster to be in a certain state; some examples are storing index templates and creating indices.
* These actions would most likely fail from within a plugin if executed in the
* {@link org.elasticsearch.common.component.AbstractLifecycleComponent#doStart()} method.
* However, if the startup of these services waits for the cluster to form and recover indices then
* it will be successful. This lifecycle service allows for this to happen by listening for
* {@link ClusterChangedEvent} and checking if the services can start. Additionally, the service
* also provides hooks for stop and close functionality.
*/
public class SecurityLifecycleService extends AbstractComponent implements ClusterStateListener {
public static final String SECURITY_INDEX_NAME = ".security";
public static final String SECURITY_TEMPLATE_NAME = "security-index-template";
private static final String SECURITY_VERSION_STRING = "security-version";
private static final Version MIN_READ_VERSION = Version.V_5_0_0;
static final String SECURITY_INDEX_TEMPLATE_VERSION_PATTERN = Pattern.quote("${security.template.version}");
private final Settings settings;
private final ThreadPool threadPool;
private final InternalClient client;
private final IndexAuditTrail indexAuditTrail;
private final NativeRealmMigrator nativeRealmMigrator;
final AtomicBoolean templateCreationPending = new AtomicBoolean(false);
final AtomicBoolean updateMappingPending = new AtomicBoolean(false);
final AtomicReference<UpgradeState> upgradeDataState = new AtomicReference<>(UpgradeState.NOT_STARTED);
private volatile boolean securityIndexExists;
private volatile boolean securityIndexAvailable;
private volatile boolean canWriteToSecurityIndex;
private volatile Version mappingVersion;
enum UpgradeState {
NOT_STARTED, IN_PROGRESS, COMPLETE, FAILED
}
private final IndexLifecycleManager securityIndex;
public SecurityLifecycleService(Settings settings, ClusterService clusterService, ThreadPool threadPool, InternalClient client,
XPackLicenseState licenseState, @Nullable IndexAuditTrail indexAuditTrail) {
this(settings, clusterService, threadPool, client, new NativeRealmMigrator(settings, licenseState, client), indexAuditTrail);
public SecurityLifecycleService(Settings settings, ClusterService clusterService,
ThreadPool threadPool, InternalClient client,
XPackLicenseState licenseState,
@Nullable IndexAuditTrail indexAuditTrail) {
this(settings, clusterService, threadPool, client,
new NativeRealmMigrator(settings, licenseState, client), indexAuditTrail);
}
// package private for testing
SecurityLifecycleService(Settings settings, ClusterService clusterService, ThreadPool threadPool, InternalClient client,
NativeRealmMigrator migrator, @Nullable IndexAuditTrail indexAuditTrail) {
SecurityLifecycleService(Settings settings, ClusterService clusterService,
ThreadPool threadPool, InternalClient client,
NativeRealmMigrator migrator,
@Nullable IndexAuditTrail indexAuditTrail) {
super(settings);
this.settings = settings;
this.threadPool = threadPool;
this.client = client;
this.indexAuditTrail = indexAuditTrail;
this.nativeRealmMigrator = migrator;
this.securityIndex = new IndexLifecycleManager(settings, client,
SECURITY_INDEX_NAME, SECURITY_TEMPLATE_NAME, migrator);
clusterService.addListener(this);
clusterService.addLifecycleListener(new LifecycleListener() {
@Override
@ -124,21 +92,7 @@ public class SecurityLifecycleService extends AbstractComponent implements Clust
return;
}
securityIndexExists = event.state().metaData().indices().get(SECURITY_INDEX_NAME) != null;
securityIndexAvailable = securityIndexAvailable(state, logger);
final boolean securityTemplateUpToDate = securityTemplateExistsAndIsUpToDate(state, logger);
final boolean securityMappingUpToDate = securityIndexMappingUpToDate(state, logger);
canWriteToSecurityIndex = securityTemplateUpToDate && securityMappingUpToDate;
mappingVersion = oldestSecurityIndexMappingVersion(event.state(), logger);
if (event.localNodeMaster()) {
if (securityTemplateUpToDate == false) {
updateSecurityTemplate();
}
if (securityIndexAvailable && securityMappingUpToDate == false) {
upgradeSecurityData(state, this::updateSecurityMapping);
}
}
securityIndex.clusterChanged(event);
final boolean master = event.localNodeMaster();
try {
@ -165,282 +119,31 @@ public class SecurityLifecycleService extends AbstractComponent implements Clust
}
}
protected IndexLifecycleManager securityIndex() {
return securityIndex;
}
public boolean securityIndexExists() {
return securityIndexExists;
return securityIndex.indexExists();
}
public boolean securityIndexAvailable() {
return securityIndexAvailable;
return securityIndex.isAvailable();
}
public boolean canWriteToSecurityIndex() {
return canWriteToSecurityIndex;
}
private boolean securityIndexAvailable(ClusterState state, Logger logger) {
final IndexRoutingTable routingTable = getSecurityIndexRoutingTable(state);
if (routingTable != null && routingTable.allPrimaryShardsActive()) {
return true;
}
logger.debug("Security index is not yet active");
return false;
return securityIndex.isWritable();
}
/**
* Returns the routing-table for the security index, or <code>null</code> if the security index does not exist.
*/
public static IndexRoutingTable getSecurityIndexRoutingTable(ClusterState clusterState) {
IndexMetaData metaData = clusterState.metaData().index(SECURITY_INDEX_NAME);
if (metaData == null) {
return null;
} else {
return clusterState.routingTable().index(SECURITY_INDEX_NAME);
}
}
public static boolean securityIndexMappingAndTemplateUpToDate(ClusterState clusterState, Logger logger) {
if (securityTemplateExistsAndIsUpToDate(clusterState, logger) == false) {
logger.debug("security template [{}] does not exist or is not up to date, so security module is not ready for use",
SECURITY_TEMPLATE_NAME);
return false;
}
if (securityIndexMappingUpToDate(clusterState, logger) == false) {
logger.debug("mapping for the security index is not up to date, so security module is not ready for use");
return false;
}
return true;
}
public static boolean securityIndexMappingAndTemplateSufficientToRead(ClusterState clusterState, Logger logger) {
if (securityTemplateExistsAndVersionMatches(clusterState, logger, MIN_READ_VERSION::onOrBefore) == false) {
logger.debug("security template [{}] does not exist or is not up to date, so security module is not ready for use",
SECURITY_TEMPLATE_NAME);
return false;
}
if (securityIndexMappingVersionMatches(clusterState, logger, MIN_READ_VERSION::onOrBefore) == false) {
logger.debug("mapping for the security index is not up to date, so security module is not ready for use");
return false;
}
return true;
}
/**
* Test whether the effective (active) version of the security mapping meets the <code>requiredVersion</code>.
* Test whether the effective (active) version of the security mapping meets the
* <code>requiredVersion</code>.
*
* @return <code>true</code> if the effective version passes the predicate, or the security mapping does not exist (<code>null</code>
* version). Otherwise, <code>false</code>.
* @return <code>true</code> if the effective version passes the predicate, or the security
* mapping does not exist (<code>null</code> version). Otherwise, <code>false</code>.
*/
public boolean checkMappingVersion(Predicate<Version> requiredVersion) {
return this.mappingVersion == null || requiredVersion.test(this.mappingVersion);
}
static boolean securityIndexMappingUpToDate(ClusterState clusterState, Logger logger) {
return securityIndexMappingVersionMatches(clusterState, logger, Version.CURRENT::equals);
}
static boolean securityIndexMappingVersionMatches(ClusterState clusterState, Logger logger, Predicate<Version> predicate) {
return securityIndexMappingVersions(clusterState, logger).stream().allMatch(predicate);
}
private static Set<Version> securityIndexMappingVersions(ClusterState clusterState, Logger logger) {
Set<Version> versions = new HashSet<>();
IndexMetaData indexMetaData = clusterState.metaData().getIndices().get(SECURITY_INDEX_NAME);
if (indexMetaData != null) {
for (Object object : indexMetaData.getMappings().values().toArray()) {
MappingMetaData mappingMetaData = (MappingMetaData) object;
if (mappingMetaData.type().equals(MapperService.DEFAULT_MAPPING)) {
continue;
}
versions.add(readMappingVersion(mappingMetaData, logger));
}
}
return versions;
}
private static Version readMappingVersion(MappingMetaData mappingMetaData, Logger logger) {
try {
Map<String, Object> meta = (Map<String, Object>) mappingMetaData.sourceAsMap().get("_meta");
if (meta == null) {
// something pre-5.0, but we don't know what. Use 2.3.0 as a placeholder for "old"
return Version.V_2_3_0;
}
return Version.fromString((String) meta.get(SECURITY_VERSION_STRING));
} catch (IOException e) {
logger.error("Cannot parse the mapping for security index.", e);
throw new ElasticsearchException("Cannot parse the mapping for security index.", e);
}
}
static boolean securityTemplateExistsAndIsUpToDate(ClusterState state, Logger logger) {
return securityTemplateExistsAndVersionMatches(state, logger, Version.CURRENT::equals);
}
static boolean securityTemplateExistsAndVersionMatches(ClusterState state, Logger logger, Predicate<Version> predicate) {
IndexTemplateMetaData templateMeta = state.metaData().templates().get(SECURITY_TEMPLATE_NAME);
if (templateMeta == null) {
return false;
}
ImmutableOpenMap<String, CompressedXContent> mappings = templateMeta.getMappings();
// check all mappings contain correct version in _meta
// we have to parse the source here which is annoying
for (Object typeMapping : mappings.values().toArray()) {
CompressedXContent typeMappingXContent = (CompressedXContent) typeMapping;
try {
Map<String, Object> typeMappingMap =
XContentHelper.convertToMap(new BytesArray(typeMappingXContent.uncompressed()), false, XContentType.JSON).v2();
// should always contain one entry with key = typename
assert (typeMappingMap.size() == 1);
String key = typeMappingMap.keySet().iterator().next();
// get the actual mapping entries
@SuppressWarnings("unchecked")
Map<String, Object> mappingMap = (Map<String, Object>) typeMappingMap.get(key);
if (containsCorrectVersion(mappingMap, predicate) == false) {
return false;
}
} catch (ElasticsearchParseException e) {
logger.error("Cannot parse the template for security index.", e);
throw new IllegalStateException("Cannot parse the template for security index.", e);
}
}
return true;
}
private static boolean containsCorrectVersion(Map<String, Object> typeMappingMap, Predicate<Version> predicate) {
@SuppressWarnings("unchecked")
Map<String, Object> meta = (Map<String, Object>) typeMappingMap.get("_meta");
if (meta == null) {
// pre 5.0, cannot be up to date
return false;
}
return predicate.test(Version.fromString((String) meta.get(SECURITY_VERSION_STRING)));
}
public static Version oldestSecurityIndexMappingVersion(ClusterState clusterState, Logger logger) {
final Set<Version> versions = securityIndexMappingVersions(clusterState, logger);
return versions.stream().min(Version::compareTo).orElse(null);
}
private void updateSecurityTemplate() {
// only put the template if this is not already in progress
if (templateCreationPending.compareAndSet(false, true)) {
putSecurityTemplate();
}
}
private boolean upgradeSecurityData(ClusterState state, Runnable andThen) {
// only update the data if this is not already in progress
if (upgradeDataState.compareAndSet(UpgradeState.NOT_STARTED, UpgradeState.IN_PROGRESS) ) {
final Version previousVersion = oldestSecurityIndexMappingVersion(state, logger);
nativeRealmMigrator.performUpgrade(previousVersion, new ActionListener<Boolean>() {
@Override
public void onResponse(Boolean upgraded) {
upgradeDataState.set(UpgradeState.COMPLETE);
andThen.run();
}
@Override
public void onFailure(Exception e) {
upgradeDataState.set(UpgradeState.FAILED);
logger.error((Supplier<?>) () -> new ParameterizedMessage("failed to upgrade security data from version [{}] ",
previousVersion), e);
}
});
return true;
} else {
if (upgradeDataState.get() == UpgradeState.COMPLETE) {
andThen.run();
}
return false;
}
}
private void updateSecurityMapping() {
// only update the mapping if this is not already in progress
if (updateMappingPending.compareAndSet(false, true)) {
putSecurityMappings();
}
}
private void putSecurityMappings() {
String template = TemplateUtils.loadTemplate("/" + SECURITY_TEMPLATE_NAME + ".json", Version.CURRENT.toString(),
SECURITY_INDEX_TEMPLATE_VERSION_PATTERN);
Map<String, Object> typeMappingMap;
try {
typeMappingMap = XContentHelper.convertToMap(JsonXContent.jsonXContent, template, false);
} catch (ElasticsearchParseException e) {
updateMappingPending.set(false);
logger.error("failed to parse the security index template", e);
throw new ElasticsearchException("failed to parse the security index template", e);
}
// here go over all types found in the template and update them
// we need to wait for all types
final Map<String, PutMappingResponse> updateResults = ConcurrentCollections.newConcurrentMap();
@SuppressWarnings("unchecked")
Map<String, Object> typeMappings = (Map<String, Object>) typeMappingMap.get("mappings");
int expectedResults = typeMappings.size();
for (String type : typeMappings.keySet()) {
// get the mappings from the template definition
@SuppressWarnings("unchecked")
Map<String, Object> typeMapping = (Map<String, Object>) typeMappings.get(type);
// update the mapping
putSecurityMapping(updateResults, expectedResults, type, typeMapping);
}
}
private void putSecurityMapping(final Map<String, PutMappingResponse> updateResults, int expectedResults,
final String type, Map<String, Object> typeMapping) {
logger.debug("updating mapping of the security index for type [{}]", type);
PutMappingRequest putMappingRequest = client.admin().indices()
.preparePutMapping(SECURITY_INDEX_NAME).setSource(typeMapping).setType(type).request();
client.admin().indices().putMapping(putMappingRequest, new ActionListener<PutMappingResponse>() {
@Override
public void onResponse(PutMappingResponse putMappingResponse) {
if (putMappingResponse.isAcknowledged() == false) {
updateMappingPending.set(false);
throw new ElasticsearchException("update mapping for [{}] security index " +
"was not acknowledged", type);
} else {
updateResults.put(type, putMappingResponse);
if (updateResults.size() == expectedResults) {
updateMappingPending.set(false);
}
}
}
@Override
public void onFailure(Exception e) {
updateMappingPending.set(false);
logger.warn((Supplier<?>) () -> new ParameterizedMessage("failed to update mapping for [{}] on security index", type), e);
}
});
}
private void putSecurityTemplate() {
logger.debug("putting the security index template");
String template = TemplateUtils.loadTemplate("/" + SECURITY_TEMPLATE_NAME + ".json", Version.CURRENT.toString(),
SECURITY_INDEX_TEMPLATE_VERSION_PATTERN);
PutIndexTemplateRequest putTemplateRequest = client.admin().indices()
.preparePutTemplate(SECURITY_TEMPLATE_NAME)
.setSource(new BytesArray(template.getBytes(StandardCharsets.UTF_8)), XContentType.JSON)
.request();
client.admin().indices().putTemplate(putTemplateRequest, new ActionListener<PutIndexTemplateResponse>() {
@Override
public void onResponse(PutIndexTemplateResponse putIndexTemplateResponse) {
templateCreationPending.set(false);
if (putIndexTemplateResponse.isAcknowledged() == false) {
throw new ElasticsearchException("put template for security index was not acknowledged");
}
}
@Override
public void onFailure(Exception e) {
templateCreationPending.set(false);
logger.warn("failed to put security index template", e);
}
});
public boolean checkSecurityMappingVersion(Predicate<Version> requiredVersion) {
return securityIndex.checkMappingVersion(requiredVersion);
}
public void stop() {
@ -452,4 +155,26 @@ public class SecurityLifecycleService extends AbstractComponent implements Clust
}
}
}
public static boolean securityIndexMappingAndTemplateSufficientToRead(ClusterState clusterState,
Logger logger) {
return checkTemplateAndMappingVersions(clusterState, logger, MIN_READ_VERSION::onOrBefore);
}
public static boolean securityIndexMappingAndTemplateUpToDate(ClusterState clusterState,
Logger logger) {
return checkTemplateAndMappingVersions(clusterState, logger, Version.CURRENT::equals);
}
private static boolean checkTemplateAndMappingVersions(ClusterState clusterState, Logger logger,
Predicate<Version> versionPredicate) {
return IndexLifecycleManager.checkTemplateExistsAndVersionMatches(SECURITY_TEMPLATE_NAME,
clusterState, logger, versionPredicate) &&
IndexLifecycleManager.checkIndexMappingVersionMatches(SECURITY_INDEX_NAME,
clusterState, logger, versionPredicate);
}
public static List<String> indexNames() {
return Collections.unmodifiableList(Arrays.asList(SECURITY_INDEX_NAME));
}
}

View File

@ -24,6 +24,7 @@ import org.elasticsearch.xpack.security.SecurityLifecycleService;
import org.elasticsearch.xpack.security.authc.support.Hasher;
import org.elasticsearch.xpack.security.authc.support.SecuredString;
import org.elasticsearch.xpack.security.client.SecurityClient;
import org.elasticsearch.xpack.security.support.IndexLifecycleManager;
import org.elasticsearch.xpack.security.user.BeatsSystemUser;
import org.elasticsearch.xpack.security.user.BuiltinUserInfo;
import org.elasticsearch.xpack.security.user.LogstashSystemUser;
@ -46,7 +47,7 @@ import java.util.ArrayList;
* When upgrading an Elasticsearch/X-Pack installation from a previous version, this class is responsible for ensuring that user/role
* data stored in the security index is converted to a format that is appropriate for the newly installed version.
*/
public class NativeRealmMigrator {
public class NativeRealmMigrator implements IndexLifecycleManager.IndexDataMigrator {
private final XPackLicenseState licenseState;
private final Logger logger;
@ -72,8 +73,9 @@ public class NativeRealmMigrator {
* {@link ActionListener#onResponse(Object) onResponse(false)} if no upgrade was required.
* @see SecurityLifecycleService#securityIndexMappingAndTemplateSufficientToRead(ClusterState, Logger)
* @see SecurityLifecycleService#canWriteToSecurityIndex
* @see SecurityLifecycleService#mappingVersion
* @see IndexLifecycleManager#mappingVersion
*/
@Override
public void performUpgrade(@Nullable Version previousVersion, ActionListener<Boolean> listener) {
try {
List<BiConsumer<Version, ActionListener<Void>>> tasks = collectUpgradeTasks(previousVersion);

View File

@ -223,7 +223,7 @@ public class ReservedRealm extends CachingUsernamePasswordRealm {
private boolean userIsDefinedForCurrentSecurityMapping(String username) {
final Version requiredVersion = getDefinedVersion(username);
return securityLifecycleService.checkMappingVersion(requiredVersion::onOrBefore);
return securityLifecycleService.checkSecurityMappingVersion(requiredVersion::onOrBefore);
}
private Version getDefinedVersion(String username) {

View File

@ -0,0 +1,424 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.security.support;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
import java.util.regex.Pattern;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingResponse;
import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequest;
import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateResponse;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexTemplateMetaData;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.compress.CompressedXContent;
import org.elasticsearch.common.inject.internal.Nullable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.xpack.security.InternalClient;
import org.elasticsearch.xpack.template.TemplateUtils;
import static org.elasticsearch.common.xcontent.XContentHelper.convertToMap;
/**
* Manages the lifecycle of a single index, its template, mapping and and data upgrades/migrations.
*/
public class IndexLifecycleManager extends AbstractComponent {
private static final String SECURITY_VERSION_STRING = "security-version";
public static final String TEMPLATE_VERSION_PATTERN =
Pattern.quote("${security.template.version}");
private final String indexName;
private final String templateName;
private final InternalClient client;
private final IndexDataMigrator migrator;
private final AtomicBoolean templateCreationPending = new AtomicBoolean(false);
private final AtomicBoolean updateMappingPending = new AtomicBoolean(false);
final AtomicReference<UpgradeState> migrateDataState =
new AtomicReference<>(UpgradeState.NOT_STARTED);
private volatile boolean templateIsUpToDate;
private volatile boolean indexExists;
private volatile boolean indexAvailable;
private volatile boolean canWriteToIndex;
private volatile boolean mappingIsUpToDate;
private volatile Version mappingVersion;
public enum UpgradeState {
NOT_STARTED, IN_PROGRESS, COMPLETE, FAILED
}
public interface IndexDataMigrator {
void performUpgrade(@Nullable Version previousVersion, ActionListener<Boolean> listener);
}
public static final IndexDataMigrator NULL_MIGRATOR =
(version, listener) -> listener.onResponse(false);
public IndexLifecycleManager(Settings settings, InternalClient client, String indexName,
String templateName, IndexDataMigrator migrator) {
super(settings);
this.client = client;
this.indexName = indexName;
this.templateName = templateName;
this.migrator = migrator;
}
public boolean isTemplateUpToDate() {
return templateIsUpToDate;
}
public boolean isTemplateCreationPending() {
return templateCreationPending.get();
}
public boolean isMappingUpToDate() {
return mappingIsUpToDate;
}
public Version getMappingVersion() {
return mappingVersion;
}
public boolean checkMappingVersion(Predicate<Version> requiredVersion) {
return this.mappingVersion == null || requiredVersion.test(this.mappingVersion);
}
public boolean isMappingUpdatePending() {
return this.updateMappingPending.get();
}
public boolean indexExists() {
return indexExists;
}
public boolean isAvailable() {
return indexAvailable;
}
public boolean isWritable() {
return canWriteToIndex;
}
public UpgradeState getMigrationState() {
return this.migrateDataState.get();
}
public void clusterChanged(ClusterChangedEvent event) {
final ClusterState state = event.state();
this.indexExists = event.state().metaData().indices().get(indexName) != null;
this.indexAvailable = checkIndexAvailable(state);
this.templateIsUpToDate = checkTemplateExistsAndIsUpToDate(state);
this.mappingIsUpToDate = checkIndexMappingUpToDate(state);
this.canWriteToIndex = templateIsUpToDate && mappingIsUpToDate;
this.mappingVersion = oldestIndexMappingVersion(event.state());
if (event.localNodeMaster()) {
if (templateIsUpToDate == false) {
updateTemplate();
}
if (indexAvailable && mappingIsUpToDate == false) {
migrateData(state, this::updateMapping);
}
}
}
private boolean checkIndexAvailable(ClusterState state) {
final IndexRoutingTable routingTable = getIndexRoutingTable(state);
if (routingTable != null && routingTable.allPrimaryShardsActive()) {
return true;
}
logger.debug("Security index is not yet active");
return false;
}
/**
* Returns the routing-table for this index, or <code>null</code> if the index does not exist.
*/
private IndexRoutingTable getIndexRoutingTable(ClusterState clusterState) {
IndexMetaData metaData = clusterState.metaData().index(indexName);
if (metaData == null) {
return null;
} else {
return clusterState.routingTable().index(indexName);
}
}
private boolean checkTemplateExistsAndIsUpToDate(ClusterState state) {
return checkTemplateExistsAndVersionMatches(templateName, state, logger,
Version.CURRENT::equals);
}
public static boolean checkTemplateExistsAndVersionMatches(
String templateName, ClusterState state, Logger logger, Predicate<Version> predicate) {
IndexTemplateMetaData templateMeta = state.metaData().templates().get(templateName);
if (templateMeta == null) {
return false;
}
ImmutableOpenMap<String, CompressedXContent> mappings = templateMeta.getMappings();
// check all mappings contain correct version in _meta
// we have to parse the source here which is annoying
for (Object typeMapping : mappings.values().toArray()) {
CompressedXContent typeMappingXContent = (CompressedXContent) typeMapping;
try {
Map<String, Object> typeMappingMap = convertToMap(
new BytesArray(typeMappingXContent.uncompressed()), false,
XContentType.JSON).v2();
// should always contain one entry with key = typename
assert (typeMappingMap.size() == 1);
String key = typeMappingMap.keySet().iterator().next();
// get the actual mapping entries
@SuppressWarnings("unchecked")
Map<String, Object> mappingMap = (Map<String, Object>) typeMappingMap.get(key);
if (containsCorrectVersion(mappingMap, predicate) == false) {
return false;
}
} catch (ElasticsearchParseException e) {
logger.error(new ParameterizedMessage(
"Cannot parse the template [{}]", templateName), e);
throw new IllegalStateException("Cannot parse the template " + templateName, e);
}
}
return true;
}
private static boolean containsCorrectVersion(Map<String, Object> typeMappingMap,
Predicate<Version> predicate) {
@SuppressWarnings("unchecked")
Map<String, Object> meta = (Map<String, Object>) typeMappingMap.get("_meta");
if (meta == null) {
// pre 5.0, cannot be up to date
return false;
}
return predicate.test(Version.fromString((String) meta.get(SECURITY_VERSION_STRING)));
}
private boolean checkIndexMappingUpToDate(ClusterState clusterState) {
return checkIndexMappingVersionMatches(clusterState, Version.CURRENT::equals);
}
private boolean checkIndexMappingVersionMatches(ClusterState clusterState,
Predicate<Version> predicate) {
return checkIndexMappingVersionMatches(indexName, clusterState, logger, predicate);
}
public static boolean checkIndexMappingVersionMatches(String indexName,
ClusterState clusterState, Logger logger,
Predicate<Version> predicate) {
return loadIndexMappingVersions(indexName, clusterState, logger)
.stream().allMatch(predicate);
}
private Version oldestIndexMappingVersion(ClusterState clusterState) {
final Set<Version> versions = loadIndexMappingVersions(indexName, clusterState, logger);
return versions.stream().min(Version::compareTo).orElse(null);
}
private static Set<Version> loadIndexMappingVersions(String indexName,
ClusterState clusterState, Logger logger) {
Set<Version> versions = new HashSet<>();
IndexMetaData indexMetaData = clusterState.metaData().getIndices().get(indexName);
if (indexMetaData != null) {
for (Object object : indexMetaData.getMappings().values().toArray()) {
MappingMetaData mappingMetaData = (MappingMetaData) object;
if (mappingMetaData.type().equals(MapperService.DEFAULT_MAPPING)) {
continue;
}
versions.add(readMappingVersion(indexName, mappingMetaData, logger));
}
}
return versions;
}
private static Version readMappingVersion(String indexName, MappingMetaData mappingMetaData,
Logger logger) {
try {
Map<String, Object> meta =
(Map<String, Object>) mappingMetaData.sourceAsMap().get("_meta");
if (meta == null) {
// something pre-5.0, but we don't know what. Use 2.3.0 as a placeholder for "old"
return Version.V_2_3_0;
}
return Version.fromString((String) meta.get(SECURITY_VERSION_STRING));
} catch (IOException e) {
logger.error(new ParameterizedMessage(
"Cannot parse the mapping for index [{}]", indexName), e);
throw new ElasticsearchException(
"Cannot parse the mapping for index [{}]", e, indexName);
}
}
private void updateTemplate() {
// only put the template if this is not already in progress
if (templateCreationPending.compareAndSet(false, true)) {
putTemplate();
}
}
private boolean migrateData(ClusterState state, Runnable andThen) {
// only update the data if this is not already in progress
if (migrateDataState.compareAndSet(UpgradeState.NOT_STARTED, UpgradeState.IN_PROGRESS)) {
final Version previousVersion = oldestIndexMappingVersion(state);
migrator.performUpgrade(previousVersion, new ActionListener<Boolean>() {
@Override
public void onResponse(Boolean upgraded) {
migrateDataState.set(UpgradeState.COMPLETE);
andThen.run();
}
@Override
public void onFailure(Exception e) {
migrateDataState.set(UpgradeState.FAILED);
logger.error((Supplier<?>) () -> new ParameterizedMessage(
"failed to upgrade security [{}] data from version [{}] ",
indexName, previousVersion),
e);
}
});
return true;
} else {
if (migrateDataState.get() == UpgradeState.COMPLETE) {
andThen.run();
}
return false;
}
}
private void updateMapping() {
// only update the mapping if this is not already in progress
if (updateMappingPending.compareAndSet(false, true)) {
putMappings();
}
}
private void putMappings() {
String template = TemplateUtils.loadTemplate("/" + templateName + ".json",
Version.CURRENT.toString(), TEMPLATE_VERSION_PATTERN);
Map<String, Object> typeMappingMap;
try {
typeMappingMap = convertToMap(JsonXContent.jsonXContent, template, false);
} catch (ElasticsearchParseException e) {
updateMappingPending.set(false);
logger.error(new ParameterizedMessage(
"failed to parse index template {}", templateName), e);
throw new ElasticsearchException("failed to parse index template {}", e, templateName);
}
// here go over all types found in the template and update them
// we need to wait for all types
final Map<String, PutMappingResponse> updateResults =
ConcurrentCollections.newConcurrentMap();
@SuppressWarnings("unchecked")
Map<String, Object> typeMappings = (Map<String, Object>) typeMappingMap.get("mappings");
int expectedResults = typeMappings.size();
for (String type : typeMappings.keySet()) {
// get the mappings from the template definition
@SuppressWarnings("unchecked")
Map<String, Object> typeMapping = (Map<String, Object>) typeMappings.get(type);
// update the mapping
putMapping(updateResults, expectedResults, type, typeMapping);
}
}
private void putMapping(final Map<String, PutMappingResponse> updateResults,
int expectedResults, final String type,
Map<String, Object> typeMapping) {
logger.debug("updating mapping of the [{}] index for type [{}]", indexName, type);
PutMappingRequest putMappingRequest = client.admin().indices()
.preparePutMapping(indexName).setSource(typeMapping).setType(type).request();
client.admin().indices().putMapping(putMappingRequest,
new ActionListener<PutMappingResponse>() {
@Override
public void onResponse(PutMappingResponse putMappingResponse) {
if (putMappingResponse.isAcknowledged() == false) {
updateMappingPending.set(false);
throw new ElasticsearchException("update mapping for type [{}]" +
" in index [{}] was not acknowledged", type, indexName);
} else {
updateResults.put(type, putMappingResponse);
if (updateResults.size() == expectedResults) {
updateMappingPending.set(false);
}
}
}
@Override
public void onFailure(Exception e) {
updateMappingPending.set(false);
logger.warn((Supplier<?>) () -> new ParameterizedMessage(
"failed to update mapping for type [{}] on index [{}]",
type, indexName), e);
}
});
}
private void putTemplate() {
logger.debug("putting the template [{}]", templateName);
String template = TemplateUtils.loadTemplate("/" + templateName + ".json",
Version.CURRENT.toString(), TEMPLATE_VERSION_PATTERN);
PutIndexTemplateRequest putTemplateRequest = client.admin().indices()
.preparePutTemplate(templateName)
.setSource(
new BytesArray(template.getBytes(StandardCharsets.UTF_8)),
XContentType.JSON)
.request();
client.admin().indices().putTemplate(putTemplateRequest,
new ActionListener<PutIndexTemplateResponse>() {
@Override
public void onResponse(PutIndexTemplateResponse putIndexTemplateResponse) {
templateCreationPending.set(false);
if (putIndexTemplateResponse.isAcknowledged()) {
templateIsUpToDate = true;
} else {
throw new ElasticsearchException(
"put template [{}] was not acknowledged", templateName
);
}
}
@Override
public void onFailure(Exception e) {
templateCreationPending.set(false);
logger.warn(new ParameterizedMessage(
"failed to put template [{}]", templateName), e);
}
});
}
}

View File

@ -30,6 +30,7 @@ import org.elasticsearch.cluster.metadata.IndexTemplateMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.settings.Settings;
@ -38,9 +39,10 @@ import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.MockTransportClient;
import org.elasticsearch.xpack.security.SecurityLifecycleService.UpgradeState;
import org.elasticsearch.xpack.security.audit.index.IndexAuditTrail;
import org.elasticsearch.xpack.security.authc.esnative.NativeRealmMigrator;
import org.elasticsearch.xpack.security.support.IndexLifecycleManager;
import org.elasticsearch.xpack.security.support.IndexLifecycleManager.UpgradeState;
import org.elasticsearch.xpack.security.test.SecurityTestUtils;
import org.elasticsearch.xpack.template.TemplateUtils;
import org.junit.After;
@ -48,10 +50,9 @@ import org.junit.Before;
import org.mockito.Mockito;
import static org.elasticsearch.xpack.security.SecurityLifecycleService.SECURITY_INDEX_NAME;
import static org.elasticsearch.xpack.security.SecurityLifecycleService.SECURITY_INDEX_TEMPLATE_VERSION_PATTERN;
import static org.elasticsearch.xpack.security.SecurityLifecycleService.SECURITY_TEMPLATE_NAME;
import static org.elasticsearch.xpack.security.SecurityLifecycleService.securityIndexMappingVersionMatches;
import static org.elasticsearch.xpack.security.SecurityLifecycleService.securityTemplateExistsAndVersionMatches;
import static org.elasticsearch.xpack.security.SecurityLifecycleService.securityIndexMappingAndTemplateSufficientToRead;
import static org.elasticsearch.xpack.security.SecurityLifecycleService.securityIndexMappingAndTemplateUpToDate;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
@ -65,7 +66,7 @@ public class SecurityLifecycleServiceTests extends ESTestCase {
private ThreadPool threadPool;
private ClusterService clusterService;
private NativeRealmMigrator nativeRealmMigrator;
SecurityLifecycleService securityLifecycleService;
private SecurityLifecycleService securityLifecycleService;
private static final ClusterState EMPTY_CLUSTER_STATE =
new ClusterState.Builder(new ClusterName("test-cluster")).build();
@ -86,24 +87,25 @@ public class SecurityLifecycleServiceTests extends ESTestCase {
}
@Override
protected <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends
ActionRequestBuilder<Request, Response, RequestBuilder>> void doExecute(
Action<Request, Response, RequestBuilder> action, Request request
, ActionListener<Response> listener) {
protected <Request extends ActionRequest,
Response extends ActionResponse,
RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder>>
void doExecute(Action<Request, Response, RequestBuilder> action, Request request,
ActionListener<Response> listener) {
listeners.add(listener);
}
}
nativeRealmMigrator = mock(NativeRealmMigrator.class);
Mockito.doAnswer(invocation -> {
ActionListener<Boolean> listener = (ActionListener<Boolean>) invocation.getArguments()[1];
ActionListener<Boolean> listener = (ActionListener) invocation.getArguments()[1];
listener.onResponse(false);
return null;
}).when(nativeRealmMigrator).performUpgrade(any(Version.class), any(ActionListener.class));
client = new IClient(transportClient);
securityLifecycleService = new SecurityLifecycleService(Settings.EMPTY, clusterService, threadPool,
client, nativeRealmMigrator, mock(IndexAuditTrail.class));
securityLifecycleService = new SecurityLifecycleService(Settings.EMPTY, clusterService,
threadPool, client, nativeRealmMigrator, mock(IndexAuditTrail.class));
listeners = new CopyOnWriteArrayList<>();
}
@ -118,66 +120,79 @@ public class SecurityLifecycleServiceTests extends ESTestCase {
public void testIndexTemplateIsIdentifiedAsUpToDate() throws IOException {
String templateString = "/" + SECURITY_TEMPLATE_NAME + ".json";
ClusterState.Builder clusterStateBuilder = createClusterStateWithTemplate(templateString);
assertTrue(SecurityLifecycleService.securityTemplateExistsAndIsUpToDate(clusterStateBuilder.build(), logger));
securityLifecycleService.clusterChanged(new ClusterChangedEvent("test-event", clusterStateBuilder.build()
, EMPTY_CLUSTER_STATE));
securityLifecycleService.clusterChanged(new ClusterChangedEvent("test-event",
clusterStateBuilder.build(), EMPTY_CLUSTER_STATE));
assertThat(securityLifecycleService.securityIndex().isTemplateUpToDate(), equalTo(true));
// No upgrade actions run
assertThat(listeners.size(), equalTo(0));
}
public void testFaultyIndexTemplateIsIdentifiedAsNotUpToDate() throws IOException {
String templateString = "/wrong-version-" + SECURITY_TEMPLATE_NAME + ".json";
ClusterState.Builder clusterStateBuilder = createClusterStateWithTemplate(templateString);
assertFalse(SecurityLifecycleService.securityTemplateExistsAndIsUpToDate(clusterStateBuilder.build(), logger));
checkTemplateUpdateWorkCorrectly(clusterStateBuilder);
}
public void testIndexTemplateVersionMatching() throws Exception {
String templateString = "/" + SECURITY_TEMPLATE_NAME + ".json";
ClusterState.Builder clusterStateBuilder = createClusterStateWithTemplate(templateString);
assertTrue(securityTemplateExistsAndVersionMatches(clusterStateBuilder.build(), logger, Version.V_5_0_0::before));
assertFalse(securityTemplateExistsAndVersionMatches(clusterStateBuilder.build(), logger, Version.V_5_0_0::after));
final ClusterState clusterState = clusterStateBuilder.build();
assertTrue(IndexLifecycleManager.checkTemplateExistsAndVersionMatches(
SecurityLifecycleService.SECURITY_TEMPLATE_NAME, clusterState, logger,
Version.V_5_0_0::before));
assertFalse(IndexLifecycleManager.checkTemplateExistsAndVersionMatches(
SecurityLifecycleService.SECURITY_TEMPLATE_NAME, clusterState, logger,
Version.V_5_0_0::after));
}
private void checkTemplateUpdateWorkCorrectly(ClusterState.Builder clusterStateBuilder) throws IOException {
securityLifecycleService.clusterChanged(new ClusterChangedEvent("test-event", clusterStateBuilder.build()
, EMPTY_CLUSTER_STATE));
private void checkTemplateUpdateWorkCorrectly(ClusterState.Builder clusterStateBuilder)
throws IOException {
securityLifecycleService.clusterChanged(new ClusterChangedEvent("test-event",
clusterStateBuilder.build(), EMPTY_CLUSTER_STATE));
assertThat(securityLifecycleService.securityIndex().isTemplateUpToDate(), equalTo(false));
assertThat(listeners.size(), equalTo(1));
assertTrue(securityLifecycleService.templateCreationPending.get());
assertTrue(securityLifecycleService.securityIndex().isTemplateCreationPending());
// if we do it again this should not send an update
ActionListener listener = listeners.get(0);
listeners.clear();
securityLifecycleService.clusterChanged(new ClusterChangedEvent("test-event", clusterStateBuilder.build()
, EMPTY_CLUSTER_STATE));
securityLifecycleService.clusterChanged(new ClusterChangedEvent("test-event",
clusterStateBuilder.build(), EMPTY_CLUSTER_STATE));
assertThat(securityLifecycleService.securityIndex().isTemplateUpToDate(), equalTo(false));
assertThat(listeners.size(), equalTo(0));
assertTrue(securityLifecycleService.templateCreationPending.get());
assertTrue(securityLifecycleService.securityIndex().isTemplateCreationPending());
// if we now simulate an error...
listener.onFailure(new Exception());
assertFalse(securityLifecycleService.templateCreationPending.get());
assertThat(securityLifecycleService.securityIndex().isTemplateUpToDate(), equalTo(false));
assertFalse(securityLifecycleService.securityIndex().isTemplateCreationPending());
// ... we should be able to send a new update
securityLifecycleService.clusterChanged(new ClusterChangedEvent("test-event", clusterStateBuilder.build()
, EMPTY_CLUSTER_STATE));
securityLifecycleService.clusterChanged(new ClusterChangedEvent("test-event",
clusterStateBuilder.build(), EMPTY_CLUSTER_STATE));
assertThat(securityLifecycleService.securityIndex().isTemplateUpToDate(), equalTo(false));
assertThat(listeners.size(), equalTo(1));
assertTrue(securityLifecycleService.templateCreationPending.get());
assertTrue(securityLifecycleService.securityIndex().isTemplateCreationPending());
// now check what happens if we get back an unacknowledged response
try {
listeners.get(0).onResponse(new TestPutIndexTemplateResponse());
fail("this should have failed because request was not acknowledged");
} catch (ElasticsearchException e) {
}
assertFalse(securityLifecycleService.updateMappingPending.get());
expectThrows(ElasticsearchException.class,
() -> listeners.get(0).onResponse(new TestPutIndexTemplateResponse())
);
assertThat(securityLifecycleService.securityIndex().isTemplateUpToDate(), equalTo(false));
assertFalse(securityLifecycleService.securityIndex().isTemplateCreationPending());
// and now let's see what happens if we get back a response
listeners.clear();
securityLifecycleService.clusterChanged(new ClusterChangedEvent("test-event", clusterStateBuilder.build()
, EMPTY_CLUSTER_STATE));
assertTrue(securityLifecycleService.templateCreationPending.get());
securityLifecycleService.clusterChanged(new ClusterChangedEvent("test-event",
clusterStateBuilder.build(), EMPTY_CLUSTER_STATE));
assertThat(securityLifecycleService.securityIndex().isTemplateUpToDate(), equalTo(false));
assertTrue(securityLifecycleService.securityIndex().isTemplateCreationPending());
assertThat(listeners.size(), equalTo(1));
listeners.get(0).onResponse(new TestPutIndexTemplateResponse(true));
assertFalse(securityLifecycleService.templateCreationPending.get());
assertThat(securityLifecycleService.securityIndex().isTemplateUpToDate(), equalTo(true));
assertFalse(securityLifecycleService.securityIndex().isTemplateCreationPending());
}
public void testMissingIndexTemplateIsIdentifiedAsMissing() throws IOException {
@ -188,14 +203,12 @@ public class SecurityLifecycleServiceTests extends ESTestCase {
MetaData.Builder builder = new MetaData.Builder(clusterStateBuilder.build().getMetaData());
builder.put(indexMeta);
clusterStateBuilder.metaData(builder);
assertFalse(SecurityLifecycleService.securityTemplateExistsAndIsUpToDate(clusterStateBuilder.build(), logger));
checkTemplateUpdateWorkCorrectly(clusterStateBuilder);
}
public void testMissingVersionIndexTemplateIsIdentifiedAsNotUpToDate() throws IOException {
String templateString = "/missing-version-" + SECURITY_TEMPLATE_NAME + ".json";
ClusterState.Builder clusterStateBuilder = createClusterStateWithTemplate(templateString);
assertFalse(SecurityLifecycleService.securityTemplateExistsAndIsUpToDate(clusterStateBuilder.build(), logger));
checkTemplateUpdateWorkCorrectly(clusterStateBuilder);
}
@ -203,12 +216,15 @@ public class SecurityLifecycleServiceTests extends ESTestCase {
String templateString = "/wrong-version-" + SECURITY_TEMPLATE_NAME + ".json";
final Version wrongVersion = Version.fromString("4.0.0");
ClusterState.Builder clusterStateBuilder = createClusterStateWithMapping(templateString);
assertFalse(SecurityLifecycleService.securityIndexMappingUpToDate(clusterStateBuilder.build(), logger));
assertThat(SecurityLifecycleService.oldestSecurityIndexMappingVersion(clusterStateBuilder.build(), logger), equalTo(wrongVersion));
final ClusterState clusterState = clusterStateBuilder.build();
assertFalse(securityIndexMappingAndTemplateUpToDate(clusterState, logger));
assertFalse(securityIndexMappingAndTemplateSufficientToRead(clusterState, logger));
checkMappingUpdateWorkCorrectly(clusterStateBuilder, wrongVersion);
}
private void checkMappingUpdateWorkCorrectly(ClusterState.Builder clusterStateBuilder, Version expectedOldVersion) {
final int expectedNumberOfListeners = 3; // we have three types in the mapping
AtomicReference<Version> migratorVersionRef = new AtomicReference<>(null);
AtomicReference<ActionListener<Boolean>> migratorListenerRef = new AtomicReference<>(null);
Mockito.doAnswer(invocation -> {
@ -217,40 +233,41 @@ public class SecurityLifecycleServiceTests extends ESTestCase {
return null;
}).when(nativeRealmMigrator).performUpgrade(any(Version.class), any(ActionListener.class));
assertThat(securityLifecycleService.upgradeDataState.get(), equalTo(UpgradeState.NOT_STARTED));
final IndexLifecycleManager securityIndex = securityLifecycleService.securityIndex();
assertThat(securityIndex.getMigrationState(), equalTo(UpgradeState.NOT_STARTED));
securityLifecycleService.clusterChanged(new ClusterChangedEvent("test-event", clusterStateBuilder.build(),
EMPTY_CLUSTER_STATE));
securityLifecycleService.clusterChanged(new ClusterChangedEvent("test-event",
clusterStateBuilder.build(), EMPTY_CLUSTER_STATE));
assertThat(migratorVersionRef.get(), equalTo(expectedOldVersion));
assertThat(migratorListenerRef.get(), notNullValue());
assertThat(listeners.size(), equalTo(0)); // migrator has not responded yet
assertThat(securityLifecycleService.updateMappingPending.get(), equalTo(false));
assertThat(securityLifecycleService.upgradeDataState.get(), equalTo(UpgradeState.IN_PROGRESS));
assertThat(securityIndex.isMappingUpdatePending(), equalTo(false));
assertThat(securityIndex.getMigrationState(), equalTo(UpgradeState.IN_PROGRESS));
migratorListenerRef.get().onResponse(true);
assertThat(listeners.size(), equalTo(3)); // we have three types in the mapping
assertTrue(securityLifecycleService.updateMappingPending.get());
assertThat(securityLifecycleService.upgradeDataState.get(), equalTo(UpgradeState.COMPLETE));
assertThat(listeners.size(), equalTo(expectedNumberOfListeners));
assertThat(securityIndex.isMappingUpdatePending(), equalTo(true));
assertThat(securityIndex.getMigrationState(), equalTo(UpgradeState.COMPLETE));
// if we do it again this should not send an update
ActionListener listener = listeners.get(0);
listeners.clear();
securityLifecycleService.clusterChanged(new ClusterChangedEvent("test-event", clusterStateBuilder.build()
, EMPTY_CLUSTER_STATE));
securityLifecycleService.clusterChanged(new ClusterChangedEvent("test-event",
clusterStateBuilder.build(), EMPTY_CLUSTER_STATE));
assertThat(listeners.size(), equalTo(0));
assertTrue(securityLifecycleService.updateMappingPending.get());
assertThat(securityIndex.isMappingUpdatePending(), equalTo(true));
// if we now simulate an error...
listener.onFailure(new Exception("Testing failure handling"));
assertFalse(securityLifecycleService.updateMappingPending.get());
assertThat(securityIndex.isMappingUpdatePending(), equalTo(false));
// ... we should be able to send a new update
securityLifecycleService.clusterChanged(new ClusterChangedEvent("test-event", clusterStateBuilder.build()
, EMPTY_CLUSTER_STATE));
assertThat(listeners.size(), equalTo(3));
assertTrue(securityLifecycleService.updateMappingPending.get());
securityLifecycleService.clusterChanged(new ClusterChangedEvent("test-event",
clusterStateBuilder.build(), EMPTY_CLUSTER_STATE));
assertThat(listeners.size(), equalTo(expectedNumberOfListeners));
assertThat(securityIndex.isMappingUpdatePending(), equalTo(true));
// now check what happens if we get back an unacknowledged response
try {
@ -258,20 +275,20 @@ public class SecurityLifecycleServiceTests extends ESTestCase {
fail("this hould have failed because request was not acknowledged");
} catch (ElasticsearchException e) {
}
assertFalse(securityLifecycleService.updateMappingPending.get());
assertThat(securityIndex.isMappingUpdatePending(), equalTo(false));
// and now check what happens if we get back an acknowledged response
listeners.clear();
securityLifecycleService.clusterChanged(new ClusterChangedEvent("test-event", clusterStateBuilder.build()
, EMPTY_CLUSTER_STATE));
assertThat(listeners.size(), equalTo(3)); // we have three types in the mapping
securityLifecycleService.clusterChanged(new ClusterChangedEvent("test-event",
clusterStateBuilder.build(), EMPTY_CLUSTER_STATE));
assertThat(listeners.size(), equalTo(expectedNumberOfListeners));
int counter = 0;
for (ActionListener actionListener : listeners) {
actionListener.onResponse(new TestPutMappingResponse(true));
if (counter++ < 2) {
assertTrue(securityLifecycleService.updateMappingPending.get());
if (++counter < expectedNumberOfListeners) {
assertThat(securityIndex.isMappingUpdatePending(), equalTo(true));
} else {
assertFalse(securityLifecycleService.updateMappingPending.get());
assertThat(securityIndex.isMappingUpdatePending(), equalTo(false));
}
}
}
@ -279,43 +296,48 @@ public class SecurityLifecycleServiceTests extends ESTestCase {
public void testUpToDateMappingIsIdentifiedAstUpToDate() throws IOException {
String templateString = "/" + SECURITY_TEMPLATE_NAME + ".json";
ClusterState.Builder clusterStateBuilder = createClusterStateWithMapping(templateString);
assertTrue(SecurityLifecycleService.securityIndexMappingUpToDate(clusterStateBuilder.build(), logger));
securityLifecycleService.clusterChanged(new ClusterChangedEvent("test-event", clusterStateBuilder.build()
, EMPTY_CLUSTER_STATE));
securityLifecycleService.clusterChanged(new ClusterChangedEvent("test-event",
clusterStateBuilder.build(), EMPTY_CLUSTER_STATE));
assertTrue(securityLifecycleService.securityIndex().isMappingUpToDate());
assertThat(listeners.size(), equalTo(0));
}
public void testMappingVersionMatching() throws IOException {
String templateString = "/" + SECURITY_TEMPLATE_NAME + ".json";
ClusterState.Builder clusterStateBuilder = createClusterStateWithMapping(templateString);
assertTrue(securityIndexMappingVersionMatches(clusterStateBuilder.build(), logger, Version.V_5_0_0::before));
assertFalse(securityIndexMappingVersionMatches(clusterStateBuilder.build(), logger, Version.V_5_0_0::after));
securityLifecycleService.clusterChanged(new ClusterChangedEvent("test-event",
clusterStateBuilder.build(), EMPTY_CLUSTER_STATE));
final IndexLifecycleManager securityIndex = securityLifecycleService.securityIndex();
assertTrue(securityIndex.checkMappingVersion(Version.V_5_0_0::before));
assertFalse(securityIndex.checkMappingVersion(Version.V_5_0_0::after));
}
public void testMissingVersionMappingIsIdentifiedAsNotUpToDate() throws IOException {
String templateString = "/missing-version-" + SECURITY_TEMPLATE_NAME + ".json";
ClusterState.Builder clusterStateBuilder = createClusterStateWithMapping(templateString);
assertFalse(SecurityLifecycleService.securityIndexMappingUpToDate(clusterStateBuilder.build(), logger));
assertThat(SecurityLifecycleService.oldestSecurityIndexMappingVersion(clusterStateBuilder.build(), logger),
equalTo(Version.V_2_3_0));
final ClusterState clusterState = clusterStateBuilder.build();
assertFalse(securityIndexMappingAndTemplateUpToDate(clusterState, logger));
assertFalse(securityIndexMappingAndTemplateSufficientToRead(clusterState, logger));
checkMappingUpdateWorkCorrectly(clusterStateBuilder, Version.V_2_3_0);
}
public void testMissingIndexIsIdentifiedAsUpToDate() throws IOException {
ClusterState.Builder clusterStateBuilder = ClusterState.builder(new ClusterName("test-cluster"));
final ClusterName clusterName = new ClusterName("test-cluster");
final ClusterState.Builder clusterStateBuilder = ClusterState.builder(clusterName);
String mappingString = "/" + SECURITY_TEMPLATE_NAME + ".json";
IndexTemplateMetaData.Builder templateMeta = getIndexTemplateMetaData(mappingString);
MetaData.Builder builder = new MetaData.Builder(clusterStateBuilder.build().getMetaData());
builder.put(templateMeta);
clusterStateBuilder.metaData(builder);
assertTrue(SecurityLifecycleService.securityIndexMappingUpToDate(clusterStateBuilder.build(), logger));
securityLifecycleService.clusterChanged(new ClusterChangedEvent("test-event", clusterStateBuilder.build()
, EMPTY_CLUSTER_STATE));
assertThat(SecurityLifecycleService.oldestSecurityIndexMappingVersion(clusterStateBuilder.build(), logger), nullValue());
assertTrue(securityLifecycleService.securityIndex().isMappingUpToDate());
assertThat(securityLifecycleService.securityIndex().getMappingVersion(), nullValue());
assertThat(listeners.size(), equalTo(0));
}
private ClusterState.Builder createClusterStateWithMapping(String templateString) throws IOException {
private ClusterState.Builder createClusterStateWithMapping(String templateString)
throws IOException {
IndexMetaData.Builder indexMetaData = createIndexMetadata(templateString);
ImmutableOpenMap.Builder mapBuilder = ImmutableOpenMap.builder();
mapBuilder.put(SECURITY_INDEX_NAME, indexMetaData.build());
@ -325,12 +347,15 @@ public class SecurityLifecycleServiceTests extends ESTestCase {
IndexTemplateMetaData.Builder templateMeta = getIndexTemplateMetaData(mappingString);
metaDataBuilder.put(templateMeta);
ClusterState.Builder clusterStateBuilder = ClusterState.builder(state());
clusterStateBuilder.metaData(metaDataBuilder.build()).routingTable(SecurityTestUtils.buildSecurityIndexRoutingTable());
final RoutingTable routingTable = SecurityTestUtils.buildSecurityIndexRoutingTable();
clusterStateBuilder.metaData(metaDataBuilder.build()).routingTable(routingTable);
return clusterStateBuilder;
}
private IndexMetaData.Builder createIndexMetadata(String templateString) throws IOException {
String template = TemplateUtils.loadTemplate(templateString, Version.CURRENT.toString(), SECURITY_INDEX_TEMPLATE_VERSION_PATTERN);
private static IndexMetaData.Builder createIndexMetadata(String templateString)
throws IOException {
String template = TemplateUtils.loadTemplate(templateString, Version.CURRENT.toString(),
IndexLifecycleManager.TEMPLATE_VERSION_PATTERN);
PutIndexTemplateRequest request = new PutIndexTemplateRequest();
request.source(template, XContentType.JSON);
IndexMetaData.Builder indexMetaData = IndexMetaData.builder(SECURITY_INDEX_NAME);
@ -346,7 +371,8 @@ public class SecurityLifecycleServiceTests extends ESTestCase {
return indexMetaData;
}
private ClusterState.Builder createClusterStateWithTemplate(String templateString) throws IOException {
public static ClusterState.Builder createClusterStateWithTemplate(String templateString)
throws IOException {
IndexTemplateMetaData.Builder templateBuilder = getIndexTemplateMetaData(templateString);
MetaData.Builder metaDataBuidler = new MetaData.Builder();
metaDataBuidler.put(templateBuilder);
@ -358,12 +384,14 @@ public class SecurityLifecycleServiceTests extends ESTestCase {
.metaData(metaDataBuidler.build());
}
private IndexTemplateMetaData.Builder getIndexTemplateMetaData(String templateString) throws IOException {
String template = TemplateUtils.loadTemplate(templateString, Version.CURRENT.toString()
, SECURITY_INDEX_TEMPLATE_VERSION_PATTERN);
private static IndexTemplateMetaData.Builder getIndexTemplateMetaData(String templateString)
throws IOException {
String template = TemplateUtils.loadTemplate(templateString, Version.CURRENT.toString(),
IndexLifecycleManager.TEMPLATE_VERSION_PATTERN);
PutIndexTemplateRequest request = new PutIndexTemplateRequest();
request.source(template, XContentType.JSON);
IndexTemplateMetaData.Builder templateBuilder = IndexTemplateMetaData.builder(SECURITY_TEMPLATE_NAME);
IndexTemplateMetaData.Builder templateBuilder =
IndexTemplateMetaData.builder(SECURITY_TEMPLATE_NAME);
for (Map.Entry<String, String> entry : request.mappings().entrySet()) {
templateBuilder.putMapping(entry.getKey(), entry.getValue());
}

View File

@ -144,7 +144,7 @@ public class TransportGetUsersActionTests extends ESTestCase {
NativeUsersStore usersStore = mock(NativeUsersStore.class);
SecurityLifecycleService securityLifecycleService = mock(SecurityLifecycleService.class);
when(securityLifecycleService.securityIndexAvailable()).thenReturn(true);
when(securityLifecycleService.checkMappingVersion(any())).thenReturn(true);
when(securityLifecycleService.checkSecurityMappingVersion(any())).thenReturn(true);
ReservedRealmTests.mockGetAllReservedUserInfo(usersStore, Collections.emptyMap());
ReservedRealm reservedRealm =

View File

@ -67,12 +67,12 @@ public class ReservedRealmTests extends ESTestCase {
usersStore = mock(NativeUsersStore.class);
securityLifecycleService = mock(SecurityLifecycleService.class);
when(securityLifecycleService.securityIndexAvailable()).thenReturn(true);
when(securityLifecycleService.checkMappingVersion(any())).thenReturn(true);
when(securityLifecycleService.checkSecurityMappingVersion(any())).thenReturn(true);
mockGetAllReservedUserInfo(usersStore, Collections.emptyMap());
}
public void testMappingVersionFromBeforeUserExisted() throws ExecutionException, InterruptedException {
when(securityLifecycleService.checkMappingVersion(any())).thenReturn(false);
when(securityLifecycleService.checkSecurityMappingVersion(any())).thenReturn(false);
final ReservedRealm reservedRealm =
new ReservedRealm(mock(Environment.class), Settings.EMPTY, usersStore,
new AnonymousUser(Settings.EMPTY), securityLifecycleService);
@ -108,7 +108,7 @@ public class ReservedRealmTests extends ESTestCase {
verify(usersStore).getReservedUserInfo(eq(principal), any(ActionListener.class));
}
final ArgumentCaptor<Predicate> predicateCaptor = ArgumentCaptor.forClass(Predicate.class);
verify(securityLifecycleService).checkMappingVersion(predicateCaptor.capture());
verify(securityLifecycleService).checkSecurityMappingVersion(predicateCaptor.capture());
verifyVersionPredicate(principal, predicateCaptor.getValue());
verifyNoMoreInteractions(usersStore);
}
@ -200,7 +200,7 @@ public class ReservedRealmTests extends ESTestCase {
verify(securityLifecycleService, times(2)).securityIndexExists();
verify(usersStore, times(2)).getReservedUserInfo(eq(principal), any(ActionListener.class));
final ArgumentCaptor<Predicate> predicateCaptor = ArgumentCaptor.forClass(Predicate.class);
verify(securityLifecycleService, times(2)).checkMappingVersion(predicateCaptor.capture());
verify(securityLifecycleService, times(2)).checkSecurityMappingVersion(predicateCaptor.capture());
verifyVersionPredicate(principal, predicateCaptor.getValue());
verifyNoMoreInteractions(usersStore);
}
@ -219,7 +219,7 @@ public class ReservedRealmTests extends ESTestCase {
verify(securityLifecycleService).securityIndexExists();
final ArgumentCaptor<Predicate> predicateCaptor = ArgumentCaptor.forClass(Predicate.class);
verify(securityLifecycleService).checkMappingVersion(predicateCaptor.capture());
verify(securityLifecycleService).checkSecurityMappingVersion(predicateCaptor.capture());
verifyVersionPredicate(principal, predicateCaptor.getValue());
PlainActionFuture<User> future = new PlainActionFuture<>();
@ -266,7 +266,7 @@ public class ReservedRealmTests extends ESTestCase {
verify(usersStore).getReservedUserInfo(eq(principal), any(ActionListener.class));
final ArgumentCaptor<Predicate> predicateCaptor = ArgumentCaptor.forClass(Predicate.class);
verify(securityLifecycleService).checkMappingVersion(predicateCaptor.capture());
verify(securityLifecycleService).checkSecurityMappingVersion(predicateCaptor.capture());
verifyVersionPredicate(principal, predicateCaptor.getValue());
verifyNoMoreInteractions(usersStore);