diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/Security.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/Security.java index 995be17cbd5..46324c5c046 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/Security.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/Security.java @@ -419,7 +419,7 @@ public class Security extends Plugin implements ActionPlugin, IngestPlugin, Netw components.add(auditTrailService); this.auditTrailService.set(auditTrailService); - securityIndex.set(new SecurityIndexManager(settings, client, SecurityIndexManager.SECURITY_INDEX_NAME, clusterService)); + securityIndex.set(new SecurityIndexManager(client, SecurityIndexManager.SECURITY_INDEX_NAME, clusterService)); final TokenService tokenService = new TokenService(settings, Clock.systemUTC(), client, securityIndex.get(), clusterService); this.tokenService.set(tokenService); diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authc/TokenService.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authc/TokenService.java index c72cf7e8c4f..8814a627087 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authc/TokenService.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authc/TokenService.java @@ -25,7 +25,6 @@ import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; -import org.elasticsearch.action.support.ContextPreservingActionListener; import org.elasticsearch.action.support.WriteRequest.RefreshPolicy; import org.elasticsearch.action.support.master.AcknowledgedRequest; import org.elasticsearch.action.update.UpdateRequest; @@ -116,7 +115,6 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Consumer; -import java.util.function.Supplier; import static org.elasticsearch.action.support.TransportActions.isShardNotAvailableException; import static org.elasticsearch.gateway.GatewayService.STATE_NOT_RECOVERED_BLOCK; @@ -366,45 +364,50 @@ public final class TokenService extends AbstractComponent { final Cipher cipher = getDecryptionCipher(iv, decodeKey, version, decodedSalt); if (version.onOrAfter(Version.V_6_2_0)) { // we only have the id and need to get the token from the doc! - decryptTokenId(in, cipher, version, ActionListener.wrap(tokenId -> - securityIndex.prepareIndexIfNeededThenExecute( - ex -> listener.onFailure(traceLog("prepare security index", tokenId, ex)), - () -> { - final GetRequest getRequest = client.prepareGet(SecurityIndexManager.SECURITY_INDEX_NAME, TYPE, - getTokenDocumentId(tokenId)).request(); - Consumer onFailure = ex -> listener.onFailure(traceLog("decode token", tokenId, ex)); - executeAsyncWithOrigin(client.threadPool().getThreadContext(), SECURITY_ORIGIN, getRequest, - ActionListener.wrap(response -> { - if (response.isExists()) { - Map accessTokenSource = - (Map) response.getSource().get("access_token"); - if (accessTokenSource == null) { - onFailure.accept(new IllegalStateException( - "token document is missing the access_token field")); - } else if (accessTokenSource.containsKey("user_token") == false) { - onFailure.accept(new IllegalStateException( - "token document is missing the user_token field")); + decryptTokenId(in, cipher, version, ActionListener.wrap(tokenId -> { + if (securityIndex.isAvailable() == false) { + logger.warn("failed to get token [{}] since index is not available", tokenId); + listener.onResponse(null); + } else { + securityIndex.checkIndexVersionThenExecute( + ex -> listener.onFailure(traceLog("prepare security index", tokenId, ex)), + () -> { + final GetRequest getRequest = client.prepareGet(SecurityIndexManager.SECURITY_INDEX_NAME, TYPE, + getTokenDocumentId(tokenId)).request(); + Consumer onFailure = ex -> listener.onFailure(traceLog("decode token", tokenId, ex)); + executeAsyncWithOrigin(client.threadPool().getThreadContext(), SECURITY_ORIGIN, getRequest, + ActionListener.wrap(response -> { + if (response.isExists()) { + Map accessTokenSource = + (Map) response.getSource().get("access_token"); + if (accessTokenSource == null) { + onFailure.accept(new IllegalStateException( + "token document is missing the access_token field")); + } else if (accessTokenSource.containsKey("user_token") == false) { + onFailure.accept(new IllegalStateException( + "token document is missing the user_token field")); + } else { + Map userTokenSource = + (Map) accessTokenSource.get("user_token"); + listener.onResponse(UserToken.fromSourceMap(userTokenSource)); + } } else { - Map userTokenSource = - (Map) accessTokenSource.get("user_token"); - listener.onResponse(UserToken.fromSourceMap(userTokenSource)); + onFailure.accept( + new IllegalStateException("token document is missing and must be present")); } - } else { - onFailure.accept( - new IllegalStateException("token document is missing and must be present")); - } - }, e -> { - // if the index or the shard is not there / available we assume that - // the token is not valid - if (isShardNotAvailableException(e)) { - logger.warn("failed to get token [{}] since index is not available", tokenId); - listener.onResponse(null); - } else { - logger.error(new ParameterizedMessage("failed to get token [{}]", tokenId), e); - listener.onFailure(e); - } - }), client::get); - }), listener::onFailure)); + }, e -> { + // if the index or the shard is not there / available we assume that + // the token is not valid + if (isShardNotAvailableException(e)) { + logger.warn("failed to get token [{}] since index is not available", tokenId); + listener.onResponse(null); + } else { + logger.error(new ParameterizedMessage("failed to get token [{}]", tokenId), e); + listener.onFailure(e); + } + }), client::get); + }); + }}, listener::onFailure)); } else { decryptToken(in, cipher, version, listener); } @@ -691,31 +694,41 @@ public final class TokenService extends AbstractComponent { .setVersion(true) .request(); - Consumer onFailure = ex -> listener.onFailure(traceLog("find by refresh token", refreshToken, ex)); - securityIndex.prepareIndexIfNeededThenExecute(listener::onFailure, () -> - executeAsyncWithOrigin(client.threadPool().getThreadContext(), SECURITY_ORIGIN, request, - ActionListener.wrap(searchResponse -> { - if (searchResponse.isTimedOut()) { - attemptCount.incrementAndGet(); - findTokenFromRefreshToken(refreshToken, listener, attemptCount); - } else if (searchResponse.getHits().getHits().length < 1) { - logger.info("could not find token document with refresh_token [{}]", refreshToken); - onFailure.accept(invalidGrantException("could not refresh the requested token")); - } else if (searchResponse.getHits().getHits().length > 1) { - onFailure.accept(new IllegalStateException("multiple tokens share the same refresh token")); - } else { - listener.onResponse(new Tuple<>(searchResponse, attemptCount)); - } - }, e -> { - if (isShardNotAvailableException(e)) { - logger.debug("failed to search for token document, retrying", e); - attemptCount.incrementAndGet(); - findTokenFromRefreshToken(refreshToken, listener, attemptCount); - } else { - onFailure.accept(e); - } - }), - client::search)); + final SecurityIndexManager frozenSecurityIndex = securityIndex.freeze(); + if (frozenSecurityIndex.indexExists() == false) { + logger.warn("security index does not exist therefore refresh token [{}] cannot be validated", refreshToken); + listener.onFailure(invalidGrantException("could not refresh the requested token")); + } else if (frozenSecurityIndex.isAvailable() == false) { + logger.debug("security index is not available to find token from refresh token, retrying"); + attemptCount.incrementAndGet(); + findTokenFromRefreshToken(refreshToken, listener, attemptCount); + } else { + Consumer onFailure = ex -> listener.onFailure(traceLog("find by refresh token", refreshToken, ex)); + securityIndex.checkIndexVersionThenExecute(listener::onFailure, () -> + executeAsyncWithOrigin(client.threadPool().getThreadContext(), SECURITY_ORIGIN, request, + ActionListener.wrap(searchResponse -> { + if (searchResponse.isTimedOut()) { + attemptCount.incrementAndGet(); + findTokenFromRefreshToken(refreshToken, listener, attemptCount); + } else if (searchResponse.getHits().getHits().length < 1) { + logger.info("could not find token document with refresh_token [{}]", refreshToken); + onFailure.accept(invalidGrantException("could not refresh the requested token")); + } else if (searchResponse.getHits().getHits().length > 1) { + onFailure.accept(new IllegalStateException("multiple tokens share the same refresh token")); + } else { + listener.onResponse(new Tuple<>(searchResponse, attemptCount)); + } + }, e -> { + if (isShardNotAvailableException(e)) { + logger.debug("failed to search for token document, retrying", e); + attemptCount.incrementAndGet(); + findTokenFromRefreshToken(refreshToken, listener, attemptCount); + } else { + onFailure.accept(e); + } + }), + client::search)); + } } } @@ -850,24 +863,27 @@ public final class TokenService extends AbstractComponent { public void findActiveTokensForRealm(String realmName, ActionListener>> listener) { ensureEnabled(); + final SecurityIndexManager frozenSecurityIndex = securityIndex.freeze(); if (Strings.isNullOrEmpty(realmName)) { listener.onFailure(new IllegalArgumentException("Realm name is required")); - return; - } - - final Instant now = clock.instant(); - final BoolQueryBuilder boolQuery = QueryBuilders.boolQuery() + } else if (frozenSecurityIndex.indexExists() == false) { + listener.onResponse(Collections.emptyList()); + } else if (frozenSecurityIndex.isAvailable() == false) { + listener.onFailure(frozenSecurityIndex.getUnavailableReason()); + } else { + final Instant now = clock.instant(); + final BoolQueryBuilder boolQuery = QueryBuilders.boolQuery() .filter(QueryBuilders.termQuery("doc_type", "token")) .filter(QueryBuilders.termQuery("access_token.realm", realmName)) .filter(QueryBuilders.boolQuery() - .should(QueryBuilders.boolQuery() - .must(QueryBuilders.termQuery("access_token.invalidated", false)) - .must(QueryBuilders.rangeQuery("access_token.user_token.expiration_time").gte(now.toEpochMilli())) - ) - .should(QueryBuilders.termQuery("refresh_token.invalidated", false)) + .should(QueryBuilders.boolQuery() + .must(QueryBuilders.termQuery("access_token.invalidated", false)) + .must(QueryBuilders.rangeQuery("access_token.user_token.expiration_time").gte(now.toEpochMilli())) + ) + .should(QueryBuilders.termQuery("refresh_token.invalidated", false)) ); - final SearchRequest request = client.prepareSearch(SecurityIndexManager.SECURITY_INDEX_NAME) + final SearchRequest request = client.prepareSearch(SecurityIndexManager.SECURITY_INDEX_NAME) .setScroll(DEFAULT_KEEPALIVE_SETTING.get(settings)) .setQuery(boolQuery) .setVersion(false) @@ -875,9 +891,9 @@ public final class TokenService extends AbstractComponent { .setFetchSource(true) .request(); - final Supplier supplier = client.threadPool().getThreadContext().newRestorableContext(false); - securityIndex.prepareIndexIfNeededThenExecute(listener::onFailure, () -> - ScrollHelper.fetchAllByEntity(client, request, new ContextPreservingActionListener<>(supplier, listener), this::parseHit)); + securityIndex.checkIndexVersionThenExecute(listener::onFailure, + () -> ScrollHelper.fetchAllByEntity(client, request, listener, this::parseHit)); + } } private Tuple parseHit(SearchHit hit) { @@ -944,10 +960,12 @@ public final class TokenService extends AbstractComponent { */ private void checkIfTokenIsRevoked(UserToken userToken, ActionListener listener) { if (securityIndex.indexExists() == false) { - // index doesn't exist so the token is considered valid. + // index doesn't exist so the token is considered valid. it is important to note that + // we do not use isAvailable as the lack of a shard being available is not equivalent + // to the index not existing in the case of revocation checking. listener.onResponse(userToken); } else { - securityIndex.prepareIndexIfNeededThenExecute(listener::onFailure, () -> { + securityIndex.checkIndexVersionThenExecute(listener::onFailure, () -> { MultiGetRequest mGetRequest = client.prepareMultiGet() .add(SecurityIndexManager.SECURITY_INDEX_NAME, TYPE, getInvalidatedTokenDocumentId(userToken)) .add(SecurityIndexManager.SECURITY_INDEX_NAME, TYPE, getTokenDocumentId(userToken)) diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authc/esnative/NativeUsersStore.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authc/esnative/NativeUsersStore.java index 620c3817ebb..35912de4412 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authc/esnative/NativeUsersStore.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authc/esnative/NativeUsersStore.java @@ -109,31 +109,36 @@ public class NativeUsersStore extends AbstractComponent { */ public void getUsers(String[] userNames, final ActionListener> listener) { final Consumer handleException = (t) -> { - if (t instanceof IndexNotFoundException) { - logger.trace("could not retrieve users because security index does not exist"); - // We don't invoke the onFailure listener here, instead just pass an empty list - listener.onResponse(Collections.emptyList()); - } else { - listener.onFailure(t); + if (TransportActions.isShardNotAvailableException(t)) { + logger.trace("could not retrieve users because of a shard not available exception", t); + if (t instanceof IndexNotFoundException) { + // We don't invoke the onFailure listener here, instead just pass an empty list + // as the index doesn't exist. Could have been deleted between checks and execution + listener.onResponse(Collections.emptyList()); + } else { + listener.onFailure(t); + } } + listener.onFailure(t); }; - if (securityIndex.indexExists() == false) { - // TODO remove this short circuiting and fix tests that fail without this! + final SecurityIndexManager frozenSecurityIndex = this.securityIndex.freeze(); + if (frozenSecurityIndex.indexExists() == false) { listener.onResponse(Collections.emptyList()); + } else if (frozenSecurityIndex.isAvailable() == false) { + listener.onFailure(frozenSecurityIndex.getUnavailableReason()); } else if (userNames.length == 1) { // optimization for single user lookup final String username = userNames[0]; getUserAndPassword(username, ActionListener.wrap( (uap) -> listener.onResponse(uap == null ? Collections.emptyList() : Collections.singletonList(uap.user())), handleException)); } else { - securityIndex.prepareIndexIfNeededThenExecute(listener::onFailure, () -> { + securityIndex.checkIndexVersionThenExecute(listener::onFailure, () -> { final QueryBuilder query; if (userNames == null || userNames.length == 0) { query = QueryBuilders.termQuery(Fields.TYPE.getPreferredName(), USER_DOC_TYPE); } else { - final String[] users = Arrays.asList(userNames).stream() - .map(s -> getIdForUser(USER_DOC_TYPE, s)).toArray(String[]::new); + final String[] users = Arrays.stream(userNames).map(s -> getIdForUser(USER_DOC_TYPE, s)).toArray(String[]::new); query = QueryBuilders.boolQuery().filter(QueryBuilders.idsQuery(INDEX_TYPE).addIds(users)); } final Supplier supplier = client.threadPool().getThreadContext().newRestorableContext(false); @@ -155,10 +160,13 @@ public class NativeUsersStore extends AbstractComponent { } void getUserCount(final ActionListener listener) { - if (securityIndex.indexExists() == false) { + final SecurityIndexManager frozenSecurityIndex = this.securityIndex.freeze(); + if (frozenSecurityIndex.indexExists() == false) { listener.onResponse(0L); + } else if (frozenSecurityIndex.isAvailable() == false) { + listener.onFailure(frozenSecurityIndex.getUnavailableReason()); } else { - securityIndex.prepareIndexIfNeededThenExecute(listener::onFailure, () -> + securityIndex.checkIndexVersionThenExecute(listener::onFailure, () -> executeAsyncWithOrigin(client.threadPool().getThreadContext(), SECURITY_ORIGIN, client.prepareSearch(SECURITY_INDEX_NAME) .setQuery(QueryBuilders.termQuery(Fields.TYPE.getPreferredName(), USER_DOC_TYPE)) @@ -182,11 +190,16 @@ public class NativeUsersStore extends AbstractComponent { * Async method to retrieve a user and their password */ private void getUserAndPassword(final String user, final ActionListener listener) { - if (securityIndex.indexExists() == false) { - // TODO remove this short circuiting and fix tests that fail without this! + final SecurityIndexManager frozenSecurityIndex = securityIndex.freeze(); + if (frozenSecurityIndex.isAvailable() == false) { + if (frozenSecurityIndex.indexExists()) { + logger.trace("could not retrieve user [{}] because security index does not exist", user); + } else { + logger.error("security index is unavailable. short circuiting retrieval of user [{}]", user); + } listener.onResponse(null); } else { - securityIndex.prepareIndexIfNeededThenExecute(listener::onFailure, () -> + securityIndex.checkIndexVersionThenExecute(listener::onFailure, () -> executeAsyncWithOrigin(client.threadPool().getThreadContext(), SECURITY_ORIGIN, client.prepareGet(SECURITY_INDEX_NAME, INDEX_TYPE, getIdForUser(USER_DOC_TYPE, user)).request(), @@ -459,16 +472,22 @@ public class NativeUsersStore extends AbstractComponent { } public void deleteUser(final DeleteUserRequest deleteUserRequest, final ActionListener listener) { - securityIndex.prepareIndexIfNeededThenExecute(listener::onFailure, () -> { - DeleteRequest request = client.prepareDelete(SECURITY_INDEX_NAME, + final SecurityIndexManager frozenSecurityIndex = securityIndex.freeze(); + if (frozenSecurityIndex.indexExists() == false) { + listener.onResponse(false); + } else if (frozenSecurityIndex.isAvailable() == false) { + listener.onFailure(frozenSecurityIndex.getUnavailableReason()); + } else { + securityIndex.checkIndexVersionThenExecute(listener::onFailure, () -> { + DeleteRequest request = client.prepareDelete(SECURITY_INDEX_NAME, INDEX_TYPE, getIdForUser(USER_DOC_TYPE, deleteUserRequest.username())).request(); - request.setRefreshPolicy(deleteUserRequest.getRefreshPolicy()); - executeAsyncWithOrigin(client.threadPool().getThreadContext(), SECURITY_ORIGIN, request, + request.setRefreshPolicy(deleteUserRequest.getRefreshPolicy()); + executeAsyncWithOrigin(client.threadPool().getThreadContext(), SECURITY_ORIGIN, request, new ActionListener() { @Override public void onResponse(DeleteResponse deleteResponse) { clearRealmCache(deleteUserRequest.username(), listener, - deleteResponse.getResult() == DocWriteResponse.Result.DELETED); + deleteResponse.getResult() == DocWriteResponse.Result.DELETED); } @Override @@ -476,7 +495,8 @@ public class NativeUsersStore extends AbstractComponent { listener.onFailure(e); } }, client::delete); - }); + }); + } } /** @@ -498,11 +518,13 @@ public class NativeUsersStore extends AbstractComponent { } void getReservedUserInfo(String username, ActionListener listener) { - if (securityIndex.indexExists() == false) { - // TODO remove this short circuiting and fix tests that fail without this! + final SecurityIndexManager frozenSecurityIndex = securityIndex.freeze(); + if (frozenSecurityIndex.indexExists() == false) { listener.onResponse(null); + } else if (frozenSecurityIndex.isAvailable() == false) { + listener.onFailure(frozenSecurityIndex.getUnavailableReason()); } else { - securityIndex.prepareIndexIfNeededThenExecute(listener::onFailure, () -> + securityIndex.checkIndexVersionThenExecute(listener::onFailure, () -> executeAsyncWithOrigin(client.threadPool().getThreadContext(), SECURITY_ORIGIN, client.prepareGet(SECURITY_INDEX_NAME, INDEX_TYPE, getIdForUser(RESERVED_USER_TYPE, username)).request(), @@ -541,49 +563,56 @@ public class NativeUsersStore extends AbstractComponent { } void getAllReservedUserInfo(ActionListener> listener) { - securityIndex.prepareIndexIfNeededThenExecute(listener::onFailure, () -> - executeAsyncWithOrigin(client.threadPool().getThreadContext(), SECURITY_ORIGIN, - client.prepareSearch(SECURITY_INDEX_NAME) + final SecurityIndexManager frozenSecurityIndex = securityIndex.freeze(); + if (frozenSecurityIndex.indexExists() == false) { + listener.onResponse(Collections.emptyMap()); + } else if (frozenSecurityIndex.isAvailable() == false) { + listener.onFailure(frozenSecurityIndex.getUnavailableReason()); + } else { + securityIndex.checkIndexVersionThenExecute(listener::onFailure, () -> + executeAsyncWithOrigin(client.threadPool().getThreadContext(), SECURITY_ORIGIN, + client.prepareSearch(SECURITY_INDEX_NAME) .setQuery(QueryBuilders.termQuery(Fields.TYPE.getPreferredName(), RESERVED_USER_TYPE)) .setFetchSource(true).request(), - new ActionListener() { - @Override - public void onResponse(SearchResponse searchResponse) { - Map userInfos = new HashMap<>(); - assert searchResponse.getHits().getTotalHits() <= 10 : + new ActionListener() { + @Override + public void onResponse(SearchResponse searchResponse) { + Map userInfos = new HashMap<>(); + assert searchResponse.getHits().getTotalHits() <= 10 : "there are more than 10 reserved users we need to change this to retrieve them all!"; - for (SearchHit searchHit : searchResponse.getHits().getHits()) { - Map sourceMap = searchHit.getSourceAsMap(); - String password = (String) sourceMap.get(Fields.PASSWORD.getPreferredName()); - Boolean enabled = (Boolean) sourceMap.get(Fields.ENABLED.getPreferredName()); - final String id = searchHit.getId(); - assert id != null && id.startsWith(RESERVED_USER_TYPE) : + for (SearchHit searchHit : searchResponse.getHits().getHits()) { + Map sourceMap = searchHit.getSourceAsMap(); + String password = (String) sourceMap.get(Fields.PASSWORD.getPreferredName()); + Boolean enabled = (Boolean) sourceMap.get(Fields.ENABLED.getPreferredName()); + final String id = searchHit.getId(); + assert id != null && id.startsWith(RESERVED_USER_TYPE) : "id [" + id + "] does not start with reserved-user prefix"; - final String username = id.substring(RESERVED_USER_TYPE.length() + 1); - if (password == null) { - listener.onFailure(new IllegalStateException("password hash must not be null!")); - return; - } else if (enabled == null) { - listener.onFailure(new IllegalStateException("enabled must not be null!")); - return; + final String username = id.substring(RESERVED_USER_TYPE.length() + 1); + if (password == null) { + listener.onFailure(new IllegalStateException("password hash must not be null!")); + return; + } else if (enabled == null) { + listener.onFailure(new IllegalStateException("enabled must not be null!")); + return; + } else { + userInfos.put(username, new ReservedUserInfo(password.toCharArray(), enabled, false)); + } + } + listener.onResponse(userInfos); + } + + @Override + public void onFailure(Exception e) { + if (e instanceof IndexNotFoundException) { + logger.trace("could not retrieve built in users since security index does not exist", e); + listener.onResponse(Collections.emptyMap()); } else { - userInfos.put(username, new ReservedUserInfo(password.toCharArray(), enabled, false)); + logger.error("failed to retrieve built in users", e); + listener.onFailure(e); } } - listener.onResponse(userInfos); - } - - @Override - public void onFailure(Exception e) { - if (e instanceof IndexNotFoundException) { - logger.trace("could not retrieve built in users since security index does not exist", e); - listener.onResponse(Collections.emptyMap()); - } else { - logger.error("failed to retrieve built in users", e); - listener.onFailure(e); - } - } - }, client::search)); + }, client::search)); + } } private void clearRealmCache(String username, ActionListener listener, Response response) { diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authc/support/mapper/NativeRoleMappingStore.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authc/support/mapper/NativeRoleMappingStore.java index b45de8184d6..e41bcfbfe17 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authc/support/mapper/NativeRoleMappingStore.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authc/support/mapper/NativeRoleMappingStore.java @@ -220,32 +220,35 @@ public class NativeRoleMappingStore extends AbstractComponent implements UserRol }); } - private void innerDeleteMapping(DeleteRoleMappingRequest request, ActionListener listener) throws IOException { - if (securityIndex.isIndexUpToDate() == false) { - listener.onFailure(new IllegalStateException( - "Security index is not on the current version - the native realm will not be operational until " + - "the upgrade API is run on the security index")); - return; - } - executeAsyncWithOrigin(client.threadPool().getThreadContext(), SECURITY_ORIGIN, - client.prepareDelete(SECURITY_INDEX_NAME, SECURITY_GENERIC_TYPE, getIdForName(request.getName())) + private void innerDeleteMapping(DeleteRoleMappingRequest request, ActionListener listener) { + final SecurityIndexManager frozenSecurityIndex = securityIndex.freeze(); + if (frozenSecurityIndex.indexExists() == false) { + listener.onResponse(false); + } else if (securityIndex.isAvailable() == false) { + listener.onFailure(frozenSecurityIndex.getUnavailableReason()); + } else { + securityIndex.checkIndexVersionThenExecute(listener::onFailure, () -> { + executeAsyncWithOrigin(client.threadPool().getThreadContext(), SECURITY_ORIGIN, + client.prepareDelete(SECURITY_INDEX_NAME, SECURITY_GENERIC_TYPE, getIdForName(request.getName())) .setRefreshPolicy(request.getRefreshPolicy()) .request(), - new ActionListener() { + new ActionListener() { - @Override - public void onResponse(DeleteResponse deleteResponse) { - boolean deleted = deleteResponse.getResult() == DELETED; - listener.onResponse(deleted); - } + @Override + public void onResponse(DeleteResponse deleteResponse) { + boolean deleted = deleteResponse.getResult() == DELETED; + listener.onResponse(deleted); + } - @Override - public void onFailure(Exception e) { - logger.error(new ParameterizedMessage("failed to delete role-mapping [{}]", request.getName()), e); - listener.onFailure(e); + @Override + public void onFailure(Exception e) { + logger.error(new ParameterizedMessage("failed to delete role-mapping [{}]", request.getName()), e); + listener.onFailure(e); - } - }, client::delete); + } + }, client::delete); + }); + } } /** @@ -301,7 +304,7 @@ public class NativeRoleMappingStore extends AbstractComponent implements UserRol * */ public void usageStats(ActionListener> listener) { - if (securityIndex.indexExists() == false) { + if (securityIndex.isAvailable() == false) { reportStats(listener, Collections.emptyList()); } else { getMappings(ActionListener.wrap(mappings -> reportStats(listener, mappings), listener::onFailure)); diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/store/NativePrivilegeStore.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/store/NativePrivilegeStore.java index 2cfa89b647c..a1905db9599 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/store/NativePrivilegeStore.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/store/NativePrivilegeStore.java @@ -88,13 +88,18 @@ public class NativePrivilegeStore extends AbstractComponent { public void getPrivileges(Collection applications, Collection names, ActionListener> listener) { - if (applications != null && applications.size() == 1 && names != null && names.size() == 1) { + final SecurityIndexManager frozenSecurityIndex = securityIndexManager.freeze(); + if (frozenSecurityIndex.indexExists() == false) { + listener.onResponse(Collections.emptyList()); + } else if (frozenSecurityIndex.isAvailable() == false) { + listener.onFailure(frozenSecurityIndex.getUnavailableReason()); + } else if (applications != null && applications.size() == 1 && names != null && names.size() == 1) { getPrivilege(Objects.requireNonNull(Iterables.get(applications, 0)), Objects.requireNonNull(Iterables.get(names, 0)), ActionListener.wrap(privilege -> listener.onResponse(privilege == null ? Collections.emptyList() : Collections.singletonList(privilege)), listener::onFailure)); } else { - securityIndexManager.prepareIndexIfNeededThenExecute(listener::onFailure, () -> { + securityIndexManager.checkIndexVersionThenExecute(listener::onFailure, () -> { final QueryBuilder query; final TermQueryBuilder typeQuery = QueryBuilders .termQuery(ApplicationPrivilegeDescriptor.Fields.TYPE.getPreferredName(), DOC_TYPE_VALUE); @@ -134,33 +139,40 @@ public class NativePrivilegeStore extends AbstractComponent { return collection == null || collection.isEmpty(); } - public void getPrivilege(String application, String name, ActionListener listener) { - securityIndexManager.prepareIndexIfNeededThenExecute(listener::onFailure, - () -> executeAsyncWithOrigin(client.threadPool().getThreadContext(), SECURITY_ORIGIN, - client.prepareGet(SECURITY_INDEX_NAME, "doc", toDocId(application, name)).request(), - new ActionListener() { - @Override - public void onResponse(GetResponse response) { - if (response.isExists()) { - listener.onResponse(buildPrivilege(response.getId(), response.getSourceAsBytesRef())); - } else { - listener.onResponse(null); + void getPrivilege(String application, String name, ActionListener listener) { + final SecurityIndexManager frozenSecurityIndex = securityIndexManager.freeze(); + if (frozenSecurityIndex.isAvailable() == false) { + logger.warn(new ParameterizedMessage("failed to load privilege [{}] index not available", name), + frozenSecurityIndex.getUnavailableReason()); + listener.onResponse(null); + } else { + securityIndexManager.checkIndexVersionThenExecute(listener::onFailure, + () -> executeAsyncWithOrigin(client.threadPool().getThreadContext(), SECURITY_ORIGIN, + client.prepareGet(SECURITY_INDEX_NAME, "doc", toDocId(application, name)).request(), + new ActionListener() { + @Override + public void onResponse(GetResponse response) { + if (response.isExists()) { + listener.onResponse(buildPrivilege(response.getId(), response.getSourceAsBytesRef())); + } else { + listener.onResponse(null); + } } - } - @Override - public void onFailure(Exception e) { - // if the index or the shard is not there / available we just claim the privilege is not there - if (TransportActions.isShardNotAvailableException(e)) { - logger.warn(new ParameterizedMessage("failed to load privilege [{}] index not available", name), e); - listener.onResponse(null); - } else { - logger.error(new ParameterizedMessage("failed to load privilege [{}]", name), e); - listener.onFailure(e); + @Override + public void onFailure(Exception e) { + // if the index or the shard is not there / available we just claim the privilege is not there + if (TransportActions.isShardNotAvailableException(e)) { + logger.warn(new ParameterizedMessage("failed to load privilege [{}] index not available", name), e); + listener.onResponse(null); + } else { + logger.error(new ParameterizedMessage("failed to load privilege [{}]", name), e); + listener.onFailure(e); + } } - } - }, - client::get)); + }, + client::get)); + } } public void putPrivileges(Collection privileges, WriteRequest.RefreshPolicy refreshPolicy, @@ -200,23 +212,30 @@ public class NativePrivilegeStore extends AbstractComponent { public void deletePrivileges(String application, Collection names, WriteRequest.RefreshPolicy refreshPolicy, ActionListener>> listener) { - securityIndexManager.prepareIndexIfNeededThenExecute(listener::onFailure, () -> { - ActionListener groupListener = new GroupedActionListener<>( - ActionListener.wrap(responses -> { - final Map> deletedNames = responses.stream() - .filter(r -> r.getResult() == DocWriteResponse.Result.DELETED) - .map(r -> r.getId()) - .map(NativePrivilegeStore::nameFromDocId) - .collect(TUPLES_TO_MAP); - clearRolesCache(listener, deletedNames); - }, listener::onFailure), names.size(), Collections.emptyList()); - for (String name : names) { - ClientHelper.executeAsyncWithOrigin(client.threadPool().getThreadContext(), SECURITY_ORIGIN, - client.prepareDelete(SECURITY_INDEX_NAME, "doc", toDocId(application, name)) - .setRefreshPolicy(refreshPolicy) - .request(), groupListener, client::delete); - } - }); + final SecurityIndexManager frozenSecurityIndex = securityIndexManager.freeze(); + if (frozenSecurityIndex.indexExists() == false) { + listener.onResponse(Collections.emptyMap()); + } else if (frozenSecurityIndex.isAvailable() == false) { + listener.onFailure(frozenSecurityIndex.getUnavailableReason()); + } else { + securityIndexManager.checkIndexVersionThenExecute(listener::onFailure, () -> { + ActionListener groupListener = new GroupedActionListener<>( + ActionListener.wrap(responses -> { + final Map> deletedNames = responses.stream() + .filter(r -> r.getResult() == DocWriteResponse.Result.DELETED) + .map(r -> r.getId()) + .map(NativePrivilegeStore::nameFromDocId) + .collect(TUPLES_TO_MAP); + clearRolesCache(listener, deletedNames); + }, listener::onFailure), names.size(), Collections.emptyList()); + for (String name : names) { + ClientHelper.executeAsyncWithOrigin(client.threadPool().getThreadContext(), SECURITY_ORIGIN, + client.prepareDelete(SECURITY_INDEX_NAME, "doc", toDocId(application, name)) + .setRefreshPolicy(refreshPolicy) + .request(), groupListener, client::delete); + } + }); + } } private void clearRolesCache(ActionListener listener, T value) { diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/store/NativeRolesStore.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/store/NativeRolesStore.java index d604d166812..91244a1ad36 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/store/NativeRolesStore.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/store/NativeRolesStore.java @@ -116,7 +116,7 @@ public class NativeRolesStore extends AbstractComponent implements BiConsumer { + securityIndex.checkIndexVersionThenExecute(listener::onFailure, () -> { QueryBuilder query; if (names == null || names.isEmpty()) { query = QueryBuilders.termQuery(RoleDescriptor.Fields.TYPE.getPreferredName(), ROLE_TYPE); @@ -144,16 +144,22 @@ public class NativeRolesStore extends AbstractComponent implements BiConsumer listener) { - securityIndex.prepareIndexIfNeededThenExecute(listener::onFailure, () -> { - DeleteRequest request = client.prepareDelete(SecurityIndexManager.SECURITY_INDEX_NAME, + final SecurityIndexManager frozenSecurityIndex = securityIndex.freeze(); + if (frozenSecurityIndex.indexExists() == false) { + listener.onResponse(false); + } else if (frozenSecurityIndex.isAvailable() == false) { + listener.onFailure(frozenSecurityIndex.getUnavailableReason()); + } else { + securityIndex.checkIndexVersionThenExecute(listener::onFailure, () -> { + DeleteRequest request = client.prepareDelete(SecurityIndexManager.SECURITY_INDEX_NAME, ROLE_DOC_TYPE, getIdForUser(deleteRoleRequest.name())).request(); - request.setRefreshPolicy(deleteRoleRequest.getRefreshPolicy()); - executeAsyncWithOrigin(client.threadPool().getThreadContext(), SECURITY_ORIGIN, request, + request.setRefreshPolicy(deleteRoleRequest.getRefreshPolicy()); + executeAsyncWithOrigin(client.threadPool().getThreadContext(), SECURITY_ORIGIN, request, new ActionListener() { @Override public void onResponse(DeleteResponse deleteResponse) { clearRoleCache(deleteRoleRequest.name(), listener, - deleteResponse.getResult() == DocWriteResponse.Result.DELETED); + deleteResponse.getResult() == DocWriteResponse.Result.DELETED); } @Override @@ -162,7 +168,8 @@ public class NativeRolesStore extends AbstractComponent implements BiConsumer listener) { @@ -210,13 +217,13 @@ public class NativeRolesStore extends AbstractComponent implements BiConsumer> listener) { Map usageStats = new HashMap<>(3); - if (securityIndex.indexExists() == false) { + if (securityIndex.isAvailable() == false) { usageStats.put("size", 0L); usageStats.put("fls", false); usageStats.put("dls", false); listener.onResponse(usageStats); } else { - securityIndex.prepareIndexIfNeededThenExecute(listener::onFailure, () -> + securityIndex.checkIndexVersionThenExecute(listener::onFailure, () -> executeAsyncWithOrigin(client.threadPool().getThreadContext(), SECURITY_ORIGIN, client.prepareMultiSearch() .add(client.prepareSearch(SecurityIndexManager.SECURITY_INDEX_NAME) @@ -298,7 +305,7 @@ public class NativeRolesStore extends AbstractComponent implements BiConsumer listener) { - securityIndex.prepareIndexIfNeededThenExecute(listener::onFailure, () -> + securityIndex.checkIndexVersionThenExecute(listener::onFailure, () -> executeAsyncWithOrigin(client.threadPool().getThreadContext(), SECURITY_ORIGIN, client.prepareGet(SECURITY_INDEX_NAME, ROLE_DOC_TYPE, getIdForUser(role)).request(), diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/support/SecurityIndexManager.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/support/SecurityIndexManager.java index d02b569a744..acd4f1c4805 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/support/SecurityIndexManager.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/support/SecurityIndexManager.java @@ -5,6 +5,7 @@ */ package org.elasticsearch.xpack.security.support; +import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.ElasticsearchException; @@ -13,6 +14,7 @@ import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.ResourceAlreadyExistsException; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.UnavailableShardsException; import org.elasticsearch.action.admin.indices.alias.Alias; import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; @@ -33,10 +35,10 @@ import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.collect.Tuple; -import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.gateway.GatewayService; +import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.xpack.core.template.TemplateUtils; import org.elasticsearch.xpack.core.upgrade.IndexUpgradeCheckVersion; @@ -63,7 +65,7 @@ import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin; /** * Manages the lifecycle of a single index, its template, mapping and and data upgrades/migrations. */ -public class SecurityIndexManager extends AbstractComponent implements ClusterStateListener { +public class SecurityIndexManager implements ClusterStateListener { public static final String INTERNAL_SECURITY_INDEX = ".security-" + IndexUpgradeCheckVersion.UPRADE_VERSION; public static final int INTERNAL_INDEX_FORMAT = 6; @@ -71,19 +73,28 @@ public class SecurityIndexManager extends AbstractComponent implements ClusterSt public static final String TEMPLATE_VERSION_PATTERN = Pattern.quote("${security.template.version}"); public static final String SECURITY_TEMPLATE_NAME = "security-index-template"; public static final String SECURITY_INDEX_NAME = ".security"; + private static final Logger LOGGER = LogManager.getLogger(SecurityIndexManager.class); private final String indexName; private final Client client; private final List> stateChangeListeners = new CopyOnWriteArrayList<>(); - private volatile State indexState = new State(false, false, false, false, null, null); + private volatile State indexState; - public SecurityIndexManager(Settings settings, Client client, String indexName, ClusterService clusterService) { - super(settings); + public SecurityIndexManager(Client client, String indexName, ClusterService clusterService) { + this(client, indexName, new State(false, false, false, false, null, null)); + clusterService.addListener(this); + } + + private SecurityIndexManager(Client client, String indexName, State indexState) { this.client = client; this.indexName = indexName; - clusterService.addListener(this); + this.indexState = indexState; + } + + public SecurityIndexManager freeze() { + return new SecurityIndexManager(null, indexName, indexState); } public static List indexNames() { @@ -116,6 +127,19 @@ public class SecurityIndexManager extends AbstractComponent implements ClusterSt return this.indexState.mappingUpToDate; } + public ElasticsearchException getUnavailableReason() { + final State localState = this.indexState; + if (localState.indexAvailable) { + throw new IllegalStateException("caller must make sure to use a frozen state and check indexAvailable"); + } + + if (localState.indexExists) { + return new UnavailableShardsException(null, "at least one primary shard for the security index is unavailable"); + } else { + return new IndexNotFoundException(SECURITY_INDEX_NAME); + } + } + /** * Add a listener for notifications on state changes to the configured index. * @@ -130,7 +154,7 @@ public class SecurityIndexManager extends AbstractComponent implements ClusterSt if (event.state().blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) { // wait until the gateway has recovered from disk, otherwise we think we don't have the // .security index but they may not have been restored from the cluster state on disk - logger.debug("security index manager waiting until state has been recovered"); + LOGGER.debug("security index manager waiting until state has been recovered"); return; } final State previousState = indexState; @@ -158,7 +182,7 @@ public class SecurityIndexManager extends AbstractComponent implements ClusterSt if (routingTable != null && routingTable.allPrimaryShardsActive()) { return true; } - logger.debug("Security index [{}] is not yet active", indexName); + LOGGER.debug("Security index [{}] is not yet active", indexName); return false; } @@ -187,7 +211,7 @@ public class SecurityIndexManager extends AbstractComponent implements ClusterSt private boolean checkIndexMappingVersionMatches(ClusterState clusterState, Predicate predicate) { - return checkIndexMappingVersionMatches(indexName, clusterState, logger, predicate); + return checkIndexMappingVersionMatches(indexName, clusterState, LOGGER, predicate); } public static boolean checkIndexMappingVersionMatches(String indexName, @@ -198,7 +222,7 @@ public class SecurityIndexManager extends AbstractComponent implements ClusterSt } private Version oldestIndexMappingVersion(ClusterState clusterState) { - final Set versions = loadIndexMappingVersions(indexName, clusterState, logger); + final Set versions = loadIndexMappingVersions(indexName, clusterState, LOGGER); return versions.stream().min(Version::compareTo).orElse(null); } @@ -253,6 +277,23 @@ public class SecurityIndexManager extends AbstractComponent implements ClusterSt } } + /** + * Validates the security index is up to date and does not need to migrated. If it is not, the + * consumer is called with an exception. If the security index is up to date, the runnable will + * be executed. NOTE: this method does not check the availability of the index; this check + * is left to the caller so that this condition can be handled appropriately. + */ + public void checkIndexVersionThenExecute(final Consumer consumer, final Runnable andThen) { + final State indexState = this.indexState; // use a local copy so all checks execute against the same state! + if (indexState.indexExists && indexState.isIndexUpToDate == false) { + consumer.accept(new IllegalStateException( + "Security index is not on the current version. Security features relying on the index will not be available until " + + "the upgrade API is run on the security index")); + } else { + andThen.run(); + } + } + /** * Prepares the index by creating it if it doesn't exist or updating the mappings if the mappings are * out of date. After any tasks have been executed, the runnable is then executed. diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/license/LicensingTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/license/LicensingTests.java index ad1bb7be95c..6c1781f3b6e 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/license/LicensingTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/license/LicensingTests.java @@ -41,7 +41,8 @@ import org.elasticsearch.transport.Transport; import org.elasticsearch.xpack.core.TestXPackTransportClient; import org.elasticsearch.xpack.core.XPackField; import org.elasticsearch.xpack.core.security.SecurityField; -import org.elasticsearch.xpack.core.security.action.user.GetUsersResponse; +import org.elasticsearch.xpack.core.security.action.user.PutUserResponse; +import org.elasticsearch.xpack.core.security.authc.support.Hasher; import org.elasticsearch.xpack.core.security.authc.support.UsernamePasswordToken; import org.elasticsearch.xpack.core.security.client.SecurityClient; import org.elasticsearch.xpack.security.LocalStateSecurity; @@ -231,7 +232,7 @@ public class LicensingTests extends SecurityIntegTestCase { Settings settings = internalCluster().transportClient().settings(); try (TransportClient client = new TestXPackTransportClient(settings, LocalStateSecurity.class)) { client.addTransportAddress(internalCluster().getDataNodeInstance(Transport.class).boundAddress().publishAddress()); - new SecurityClient(client).prepareGetUsers().get(); + new SecurityClient(client).preparePutUser("john", "password".toCharArray(), Hasher.BCRYPT).get(); fail("security actions should not be enabled!"); } catch (ElasticsearchSecurityException e) { assertThat(e.status(), is(RestStatus.FORBIDDEN)); @@ -245,7 +246,7 @@ public class LicensingTests extends SecurityIntegTestCase { // security actions should work! try (TransportClient client = new TestXPackTransportClient(settings, LocalStateSecurity.class)) { client.addTransportAddress(internalCluster().getDataNodeInstance(Transport.class).boundAddress().publishAddress()); - GetUsersResponse response = new SecurityClient(client).prepareGetUsers().get(); + PutUserResponse response = new SecurityClient(client).preparePutUser("john", "password".toCharArray(), Hasher.BCRYPT).get(); assertNotNull(response); } } diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/saml/TransportSamlInvalidateSessionActionTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/saml/TransportSamlInvalidateSessionActionTests.java index 17a45f23893..b0ca953b69c 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/saml/TransportSamlInvalidateSessionActionTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/saml/TransportSamlInvalidateSessionActionTests.java @@ -163,6 +163,13 @@ public class TransportSamlInvalidateSessionActionTests extends SamlTestCase { ((Runnable) inv.getArguments()[1]).run(); return null; }).when(securityIndex).prepareIndexIfNeededThenExecute(any(Consumer.class), any(Runnable.class)); + doAnswer(inv -> { + ((Runnable) inv.getArguments()[1]).run(); + return null; + }).when(securityIndex).checkIndexVersionThenExecute(any(Consumer.class), any(Runnable.class)); + when(securityIndex.isAvailable()).thenReturn(true); + when(securityIndex.indexExists()).thenReturn(true); + when(securityIndex.freeze()).thenReturn(securityIndex); final ClusterService clusterService = ClusterServiceUtils.createClusterService(threadPool); tokenService = new TokenService(settings, Clock.systemUTC(), client, securityIndex, clusterService); diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/saml/TransportSamlLogoutActionTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/saml/TransportSamlLogoutActionTests.java index 291c102f396..f2f94176edc 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/saml/TransportSamlLogoutActionTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/saml/TransportSamlLogoutActionTests.java @@ -178,6 +178,11 @@ public class TransportSamlLogoutActionTests extends SamlTestCase { ((Runnable) inv.getArguments()[1]).run(); return null; }).when(securityIndex).prepareIndexIfNeededThenExecute(any(Consumer.class), any(Runnable.class)); + doAnswer(inv -> { + ((Runnable) inv.getArguments()[1]).run(); + return null; + }).when(securityIndex).checkIndexVersionThenExecute(any(Consumer.class), any(Runnable.class)); + when(securityIndex.isAvailable()).thenReturn(true); final ClusterService clusterService = ClusterServiceUtils.createClusterService(threadPool); tokenService = new TokenService(settings, Clock.systemUTC(), client, securityIndex, clusterService); diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authc/AuthenticationServiceTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authc/AuthenticationServiceTests.java index ef5b0386bc2..54908389610 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authc/AuthenticationServiceTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authc/AuthenticationServiceTests.java @@ -87,6 +87,7 @@ import java.util.function.Consumer; import static org.elasticsearch.test.SecurityTestsUtils.assertAuthenticationException; import static org.elasticsearch.xpack.core.security.support.Exceptions.authenticationError; +import static org.elasticsearch.xpack.security.authc.TokenServiceTests.mockCheckTokenInvalidationFromId; import static org.elasticsearch.xpack.security.authc.TokenServiceTests.mockGetTokenFromId; import static org.hamcrest.Matchers.arrayContaining; import static org.hamcrest.Matchers.contains; @@ -187,6 +188,11 @@ public class AuthenticationServiceTests extends ESTestCase { runnable.run(); return null; }).when(securityIndex).prepareIndexIfNeededThenExecute(any(Consumer.class), any(Runnable.class)); + doAnswer(invocationOnMock -> { + Runnable runnable = (Runnable) invocationOnMock.getArguments()[1]; + runnable.run(); + return null; + }).when(securityIndex).checkIndexVersionThenExecute(any(Consumer.class), any(Runnable.class)); ClusterService clusterService = ClusterServiceUtils.createClusterService(threadPool); tokenService = new TokenService(settings, Clock.systemUTC(), client, securityIndex, clusterService); service = new AuthenticationService(settings, realms, auditTrail, @@ -898,7 +904,11 @@ public class AuthenticationServiceTests extends ESTestCase { tokenService.createUserToken(expected, originatingAuth, tokenFuture, Collections.emptyMap(), true); } String token = tokenService.getUserTokenString(tokenFuture.get().v1()); + when(client.prepareMultiGet()).thenReturn(new MultiGetRequestBuilder(client, MultiGetAction.INSTANCE)); mockGetTokenFromId(tokenFuture.get().v1(), client); + mockCheckTokenInvalidationFromId(tokenFuture.get().v1(), client); + when(securityIndex.isAvailable()).thenReturn(true); + when(securityIndex.indexExists()).thenReturn(true); try (ThreadContext.StoredContext ignore = threadContext.stashContext()) { threadContext.putHeader("Authorization", "Bearer " + token); service.authenticate("_action", message, (User)null, ActionListener.wrap(result -> { diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authc/TokenServiceTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authc/TokenServiceTests.java index 213def0f0fe..7926b44a38c 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authc/TokenServiceTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authc/TokenServiceTests.java @@ -137,6 +137,13 @@ public class TokenServiceTests extends ESTestCase { runnable.run(); return null; }).when(securityIndex).prepareIndexIfNeededThenExecute(any(Consumer.class), any(Runnable.class)); + doAnswer(invocationOnMock -> { + Runnable runnable = (Runnable) invocationOnMock.getArguments()[1]; + runnable.run(); + return null; + }).when(securityIndex).checkIndexVersionThenExecute(any(Consumer.class), any(Runnable.class)); + when(securityIndex.indexExists()).thenReturn(true); + when(securityIndex.isAvailable()).thenReturn(true); this.clusterService = ClusterServiceUtils.createClusterService(threadPool); } @@ -161,6 +168,7 @@ public class TokenServiceTests extends ESTestCase { final UserToken token = tokenFuture.get().v1(); assertNotNull(token); mockGetTokenFromId(token); + mockCheckTokenInvalidationFromId(token); ThreadContext requestContext = new ThreadContext(Settings.EMPTY); requestContext.putHeader("Authorization", randomFrom("Bearer ", "BEARER ", "bearer ") + tokenService.getUserTokenString(token)); @@ -207,6 +215,7 @@ public class TokenServiceTests extends ESTestCase { final UserToken token = tokenFuture.get().v1(); assertNotNull(token); mockGetTokenFromId(token); + mockCheckTokenInvalidationFromId(token); ThreadContext requestContext = new ThreadContext(Settings.EMPTY); requestContext.putHeader("Authorization", "Bearer " + tokenService.getUserTokenString(token)); @@ -266,6 +275,7 @@ public class TokenServiceTests extends ESTestCase { final UserToken token = tokenFuture.get().v1(); assertNotNull(token); mockGetTokenFromId(token); + mockCheckTokenInvalidationFromId(token); ThreadContext requestContext = new ThreadContext(Settings.EMPTY); requestContext.putHeader("Authorization", "Bearer " + tokenService.getUserTokenString(token)); @@ -296,6 +306,7 @@ public class TokenServiceTests extends ESTestCase { final UserToken token = tokenFuture.get().v1(); assertNotNull(token); mockGetTokenFromId(token); + mockCheckTokenInvalidationFromId(token); ThreadContext requestContext = new ThreadContext(Settings.EMPTY); requestContext.putHeader("Authorization", "Bearer " + tokenService.getUserTokenString(token)); @@ -357,6 +368,7 @@ public class TokenServiceTests extends ESTestCase { final UserToken token = tokenFuture.get().v1(); assertNotNull(token); mockGetTokenFromId(token); + mockCheckTokenInvalidationFromId(token); ThreadContext requestContext = new ThreadContext(Settings.EMPTY); requestContext.putHeader("Authorization", "Bearer " + tokenService.getUserTokenString(token)); @@ -454,6 +466,7 @@ public class TokenServiceTests extends ESTestCase { tokenService.createUserToken(authentication, authentication, tokenFuture, Collections.emptyMap(), true); final UserToken token = tokenFuture.get().v1(); mockGetTokenFromId(token); + mockCheckTokenInvalidationFromId(token); ThreadContext requestContext = new ThreadContext(Settings.EMPTY); requestContext.putHeader("Authorization", "Bearer " + tokenService.getUserTokenString(token)); @@ -577,14 +590,25 @@ public class TokenServiceTests extends ESTestCase { try (ThreadContext.StoredContext ignore = requestContext.newStoredContext(true)) { PlainActionFuture future = new PlainActionFuture<>(); tokenService.getAndValidateToken(requestContext, future); - UserToken serialized = future.get(); - assertEquals(authentication, serialized.getAuthentication()); + assertNull(future.get()); when(securityIndex.isAvailable()).thenReturn(false); when(securityIndex.indexExists()).thenReturn(true); future = new PlainActionFuture<>(); tokenService.getAndValidateToken(requestContext, future); assertNull(future.get()); + + when(securityIndex.indexExists()).thenReturn(false); + future = new PlainActionFuture<>(); + tokenService.getAndValidateToken(requestContext, future); + assertNull(future.get()); + + when(securityIndex.isAvailable()).thenReturn(true); + when(securityIndex.indexExists()).thenReturn(true); + mockCheckTokenInvalidationFromId(token); + future = new PlainActionFuture<>(); + tokenService.getAndValidateToken(requestContext, future); + assertEquals(token.getAuthentication(), future.get().getAuthentication()); } } @@ -625,4 +649,38 @@ public class TokenServiceTests extends ESTestCase { return Void.TYPE; }).when(client).get(any(GetRequest.class), any(ActionListener.class)); } + + private void mockCheckTokenInvalidationFromId(UserToken userToken) { + mockCheckTokenInvalidationFromId(userToken, client); + } + + public static void mockCheckTokenInvalidationFromId(UserToken userToken, Client client) { + doAnswer(invocationOnMock -> { + MultiGetRequest request = (MultiGetRequest) invocationOnMock.getArguments()[0]; + ActionListener listener = (ActionListener) invocationOnMock.getArguments()[1]; + MultiGetResponse response = mock(MultiGetResponse.class); + MultiGetItemResponse[] responses = new MultiGetItemResponse[2]; + when(response.getResponses()).thenReturn(responses); + GetResponse legacyResponse = mock(GetResponse.class); + responses[0] = new MultiGetItemResponse(legacyResponse, null); + when(legacyResponse.isExists()).thenReturn(false); + GetResponse tokenResponse = mock(GetResponse.class); + if (userToken.getId().equals(request.getItems().get(1).id().replace("token_", ""))) { + when(tokenResponse.isExists()).thenReturn(true); + Map sourceMap = new HashMap<>(); + try (XContentBuilder builder = XContentBuilder.builder(XContentType.JSON.xContent())) { + userToken.toXContent(builder, ToXContent.EMPTY_PARAMS); + Map accessTokenMap = new HashMap<>(); + accessTokenMap.put("user_token", + XContentHelper.convertToMap(XContentType.JSON.xContent(), Strings.toString(builder), false)); + accessTokenMap.put("invalidated", false); + sourceMap.put("access_token", accessTokenMap); + } + when(tokenResponse.getSource()).thenReturn(sourceMap); + } + responses[1] = new MultiGetItemResponse(tokenResponse, null); + listener.onResponse(response); + return Void.TYPE; + }).when(client).multiGet(any(MultiGetRequest.class), any(ActionListener.class)); + } } diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authc/esnative/NativeUsersStoreTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authc/esnative/NativeUsersStoreTests.java index f280e85f4ab..014599dedae 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authc/esnative/NativeUsersStoreTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authc/esnative/NativeUsersStoreTests.java @@ -237,11 +237,17 @@ public class NativeUsersStoreTests extends ESTestCase { when(securityIndex.indexExists()).thenReturn(true); when(securityIndex.isMappingUpToDate()).thenReturn(true); when(securityIndex.isIndexUpToDate()).thenReturn(true); + when(securityIndex.freeze()).thenReturn(securityIndex); doAnswer((i) -> { Runnable action = (Runnable) i.getArguments()[1]; action.run(); return null; }).when(securityIndex).prepareIndexIfNeededThenExecute(any(Consumer.class), any(Runnable.class)); + doAnswer((i) -> { + Runnable action = (Runnable) i.getArguments()[1]; + action.run(); + return null; + }).when(securityIndex).checkIndexVersionThenExecute(any(Consumer.class), any(Runnable.class)); return new NativeUsersStore(Settings.EMPTY, client, securityIndex); } diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/store/NativePrivilegeStoreTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/store/NativePrivilegeStoreTests.java index 89058cf4a8b..2c5bd4a321e 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/store/NativePrivilegeStoreTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/store/NativePrivilegeStoreTests.java @@ -89,6 +89,8 @@ public class NativePrivilegeStoreTests extends ESTestCase { } }; final SecurityIndexManager securityIndex = mock(SecurityIndexManager.class); + when(securityIndex.freeze()).thenReturn(securityIndex); + when(securityIndex.indexExists()).thenReturn(true); when(securityIndex.isAvailable()).thenReturn(true); Mockito.doAnswer(invocationOnMock -> { assertThat(invocationOnMock.getArguments().length, equalTo(2)); @@ -96,6 +98,12 @@ public class NativePrivilegeStoreTests extends ESTestCase { ((Runnable) invocationOnMock.getArguments()[1]).run(); return null; }).when(securityIndex).prepareIndexIfNeededThenExecute(any(Consumer.class), any(Runnable.class)); + Mockito.doAnswer(invocationOnMock -> { + assertThat(invocationOnMock.getArguments().length, equalTo(2)); + assertThat(invocationOnMock.getArguments()[1], instanceOf(Runnable.class)); + ((Runnable) invocationOnMock.getArguments()[1]).run(); + return null; + }).when(securityIndex).checkIndexVersionThenExecute(any(Consumer.class), any(Runnable.class)); store = new NativePrivilegeStore(Settings.EMPTY, client, securityIndex); } diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/store/NativeRolesStoreTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/store/NativeRolesStoreTests.java index 4e5271c520a..0cd44e32e3c 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/store/NativeRolesStoreTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/store/NativeRolesStoreTests.java @@ -188,7 +188,7 @@ public class NativeRolesStoreTests extends ESTestCase { final XPackLicenseState licenseState = mock(XPackLicenseState.class); final AtomicBoolean methodCalled = new AtomicBoolean(false); final SecurityIndexManager securityIndex = - new SecurityIndexManager(Settings.EMPTY, client, SecurityIndexManager.SECURITY_INDEX_NAME, clusterService); + new SecurityIndexManager(client, SecurityIndexManager.SECURITY_INDEX_NAME, clusterService); final NativeRolesStore rolesStore = new NativeRolesStore(Settings.EMPTY, client, licenseState, securityIndex) { @Override void innerPutRole(final PutRoleRequest request, final RoleDescriptor role, final ActionListener listener) { diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/support/SecurityIndexManagerTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/support/SecurityIndexManagerTests.java index 76e84f83137..0741d1c04e9 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/support/SecurityIndexManagerTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/support/SecurityIndexManagerTests.java @@ -62,7 +62,7 @@ public class SecurityIndexManagerTests extends ESTestCase { private static final ClusterName CLUSTER_NAME = new ClusterName("security-index-manager-tests"); private static final ClusterState EMPTY_CLUSTER_STATE = new ClusterState.Builder(CLUSTER_NAME).build(); - public static final String INDEX_NAME = ".security"; + private static final String INDEX_NAME = ".security"; private static final String TEMPLATE_NAME = "SecurityIndexManagerTests-template"; private SecurityIndexManager manager; private Map, Map>> actions; @@ -86,7 +86,7 @@ public class SecurityIndexManagerTests extends ESTestCase { actions.put(action, map); } }; - manager = new SecurityIndexManager(Settings.EMPTY, client, INDEX_NAME, clusterService); + manager = new SecurityIndexManager(client, INDEX_NAME, clusterService); } public void testIndexWithUpToDateMappingAndTemplate() throws IOException {