From 0b4e8db1d3341d096e02bbdbb44fd8e5d221722a Mon Sep 17 00:00:00 2001 From: Jay Modi Date: Tue, 16 Oct 2018 12:48:58 -0600 Subject: [PATCH] Security: don't call prepare index for reads (#34246) The security native stores follow a pattern where `SecurityIndexManager#prepareIndexIfNeededThenExecute` wraps most calls made for the security index. The reasoning behind this was to check if the security index had been upgraded to the latest version in a consistent manner. However, this has the potential side effect that a read will trigger the creation of the security index or an updating of its mappings, which can lead to issues such as failures due to put mapping requests timing out even though we might have been able to read from the index and get the data necessary. This change introduces a new method, `checkIndexVersionThenExecute`, that provides the consistent checking of the security index to make sure it has been upgraded. That is the only check that this method performs prior to running the passed in operation, which removes the possible triggering of index creation and mapping updates for reads. Additionally, areas where we do reads now check the availability of the security index and can short circuit requests. Availability in this context means that the index exists and all primaries are active. Relates #33205 --- .../xpack/security/authc/TokenService.java | 114 ++++++++++-------- .../authc/esnative/NativeUsersStore.java | 111 +++++++++-------- .../mapper/NativeRoleMappingStore.java | 44 +++---- .../authz/store/NativePrivilegeStore.java | 96 ++++++++------- .../authz/store/NativeRolesStore.java | 24 ++-- .../support/SecurityIndexManager.java | 17 +++ ...sportSamlInvalidateSessionActionTests.java | 5 + .../saml/TransportSamlLogoutActionTests.java | 5 + .../authc/AuthenticationServiceTests.java | 10 ++ .../security/authc/TokenServiceTests.java | 62 +++++++++- .../authc/esnative/NativeUsersStoreTests.java | 5 + .../store/NativePrivilegeStoreTests.java | 6 + 12 files changed, 319 insertions(+), 180 deletions(-) diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authc/TokenService.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authc/TokenService.java index 4f1ec4ad8c0..18f2fe6def4 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authc/TokenService.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authc/TokenService.java @@ -361,30 +361,34 @@ public final class TokenService extends AbstractComponent { final Cipher cipher = getDecryptionCipher(iv, decodeKey, version, decodedSalt); if (version.onOrAfter(Version.V_6_2_0)) { // we only have the id and need to get the token from the doc! - decryptTokenId(in, cipher, version, ActionListener.wrap(tokenId -> - securityIndex.prepareIndexIfNeededThenExecute(listener::onFailure, () -> { - final GetRequest getRequest = + decryptTokenId(in, cipher, version, ActionListener.wrap(tokenId -> { + if (securityIndex.isAvailable() == false) { + logger.warn("failed to get token [{}] since index is not available", tokenId); + listener.onResponse(null); + } else { + securityIndex.checkIndexVersionThenExecute(listener::onFailure, () -> { + final GetRequest getRequest = client.prepareGet(SecurityIndexManager.SECURITY_INDEX_NAME, TYPE, - getTokenDocumentId(tokenId)).request(); - executeAsyncWithOrigin(client.threadPool().getThreadContext(), SECURITY_ORIGIN, getRequest, + getTokenDocumentId(tokenId)).request(); + executeAsyncWithOrigin(client.threadPool().getThreadContext(), SECURITY_ORIGIN, getRequest, ActionListener.wrap(response -> { if (response.isExists()) { Map accessTokenSource = - (Map) response.getSource().get("access_token"); + (Map) response.getSource().get("access_token"); if (accessTokenSource == null) { listener.onFailure(new IllegalStateException("token document is missing " + - "the access_token field")); + "the access_token field")); } else if (accessTokenSource.containsKey("user_token") == false) { listener.onFailure(new IllegalStateException("token document is missing " + - "the user_token field")); + "the user_token field")); } else { Map userTokenSource = - (Map) accessTokenSource.get("user_token"); + (Map) accessTokenSource.get("user_token"); listener.onResponse(UserToken.fromSourceMap(userTokenSource)); } } else { listener.onFailure( - new IllegalStateException("token document is missing and must be present")); + new IllegalStateException("token document is missing and must be present")); } }, e -> { // if the index or the shard is not there / available we assume that @@ -397,7 +401,8 @@ public final class TokenService extends AbstractComponent { listener.onFailure(e); } }), client::get); - }), listener::onFailure)); + }); + }}, listener::onFailure)); } else { decryptToken(in, cipher, version, listener); } @@ -673,30 +678,36 @@ public final class TokenService extends AbstractComponent { .setVersion(true) .request(); - securityIndex.prepareIndexIfNeededThenExecute(listener::onFailure, () -> + if (securityIndex.isAvailable() == false) { + logger.debug("security index is not available to find token from refresh token, retrying"); + attemptCount.incrementAndGet(); + findTokenFromRefreshToken(refreshToken, listener, attemptCount); + } else { + securityIndex.checkIndexVersionThenExecute(listener::onFailure, () -> executeAsyncWithOrigin(client.threadPool().getThreadContext(), SECURITY_ORIGIN, request, - ActionListener.wrap(searchResponse -> { - if (searchResponse.isTimedOut()) { - attemptCount.incrementAndGet(); - findTokenFromRefreshToken(refreshToken, listener, attemptCount); - } else if (searchResponse.getHits().getHits().length < 1) { - logger.info("could not find token document with refresh_token [{}]", refreshToken); - listener.onFailure(invalidGrantException("could not refresh the requested token")); - } else if (searchResponse.getHits().getHits().length > 1) { - listener.onFailure(new IllegalStateException("multiple tokens share the same refresh token")); - } else { - listener.onResponse(new Tuple<>(searchResponse, attemptCount)); - } - }, e -> { - if (isShardNotAvailableException(e)) { - logger.debug("failed to search for token document, retrying", e); - attemptCount.incrementAndGet(); - findTokenFromRefreshToken(refreshToken, listener, attemptCount); - } else { - listener.onFailure(e); - } - }), - client::search)); + ActionListener.wrap(searchResponse -> { + if (searchResponse.isTimedOut()) { + attemptCount.incrementAndGet(); + findTokenFromRefreshToken(refreshToken, listener, attemptCount); + } else if (searchResponse.getHits().getHits().length < 1) { + logger.info("could not find token document with refresh_token [{}]", refreshToken); + listener.onFailure(invalidGrantException("could not refresh the requested token")); + } else if (searchResponse.getHits().getHits().length > 1) { + listener.onFailure(new IllegalStateException("multiple tokens share the same refresh token")); + } else { + listener.onResponse(new Tuple<>(searchResponse, attemptCount)); + } + }, e -> { + if (isShardNotAvailableException(e)) { + logger.debug("failed to search for token document, retrying", e); + attemptCount.incrementAndGet(); + findTokenFromRefreshToken(refreshToken, listener, attemptCount); + } else { + listener.onFailure(e); + } + }), + client::search)); + } } } @@ -831,22 +842,22 @@ public final class TokenService extends AbstractComponent { if (Strings.isNullOrEmpty(realmName)) { listener.onFailure(new IllegalArgumentException("Realm name is required")); - return; - } - - final Instant now = clock.instant(); - final BoolQueryBuilder boolQuery = QueryBuilders.boolQuery() + } else if (securityIndex.isAvailable() == false) { + listener.onResponse(Collections.emptyList()); + } else { + final Instant now = clock.instant(); + final BoolQueryBuilder boolQuery = QueryBuilders.boolQuery() .filter(QueryBuilders.termQuery("doc_type", "token")) .filter(QueryBuilders.termQuery("access_token.realm", realmName)) .filter(QueryBuilders.boolQuery() - .should(QueryBuilders.boolQuery() - .must(QueryBuilders.termQuery("access_token.invalidated", false)) - .must(QueryBuilders.rangeQuery("access_token.user_token.expiration_time").gte(now.toEpochMilli())) - ) - .should(QueryBuilders.termQuery("refresh_token.invalidated", false)) + .should(QueryBuilders.boolQuery() + .must(QueryBuilders.termQuery("access_token.invalidated", false)) + .must(QueryBuilders.rangeQuery("access_token.user_token.expiration_time").gte(now.toEpochMilli())) + ) + .should(QueryBuilders.termQuery("refresh_token.invalidated", false)) ); - final SearchRequest request = client.prepareSearch(SecurityIndexManager.SECURITY_INDEX_NAME) + final SearchRequest request = client.prepareSearch(SecurityIndexManager.SECURITY_INDEX_NAME) .setScroll(DEFAULT_KEEPALIVE_SETTING.get(settings)) .setQuery(boolQuery) .setVersion(false) @@ -854,9 +865,10 @@ public final class TokenService extends AbstractComponent { .setFetchSource(true) .request(); - final Supplier supplier = client.threadPool().getThreadContext().newRestorableContext(false); - securityIndex.prepareIndexIfNeededThenExecute(listener::onFailure, () -> - ScrollHelper.fetchAllByEntity(client, request, new ContextPreservingActionListener<>(supplier, listener), this::parseHit)); + final Supplier supplier = client.threadPool().getThreadContext().newRestorableContext(false); + securityIndex.checkIndexVersionThenExecute(listener::onFailure, () -> + ScrollHelper.fetchAllByEntity(client, request, new ContextPreservingActionListener<>(supplier, listener), this::parseHit)); + } } private Tuple parseHit(SearchHit hit) { @@ -923,10 +935,12 @@ public final class TokenService extends AbstractComponent { */ private void checkIfTokenIsRevoked(UserToken userToken, ActionListener listener) { if (securityIndex.indexExists() == false) { - // index doesn't exist so the token is considered valid. + // index doesn't exist so the token is considered valid. it is important to note that + // we do not use isAvailable as the lack of a shard being available is not equivalent + // to the index not existing in the case of revocation checking. listener.onResponse(userToken); } else { - securityIndex.prepareIndexIfNeededThenExecute(listener::onFailure, () -> { + securityIndex.checkIndexVersionThenExecute(listener::onFailure, () -> { MultiGetRequest mGetRequest = client.prepareMultiGet() .add(SecurityIndexManager.SECURITY_INDEX_NAME, TYPE, getInvalidatedTokenDocumentId(userToken)) .add(SecurityIndexManager.SECURITY_INDEX_NAME, TYPE, getTokenDocumentId(userToken)) diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authc/esnative/NativeUsersStore.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authc/esnative/NativeUsersStore.java index 620c3817ebb..196a48416a4 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authc/esnative/NativeUsersStore.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authc/esnative/NativeUsersStore.java @@ -118,8 +118,7 @@ public class NativeUsersStore extends AbstractComponent { } }; - if (securityIndex.indexExists() == false) { - // TODO remove this short circuiting and fix tests that fail without this! + if (securityIndex.isAvailable() == false) { listener.onResponse(Collections.emptyList()); } else if (userNames.length == 1) { // optimization for single user lookup final String username = userNames[0]; @@ -127,7 +126,7 @@ public class NativeUsersStore extends AbstractComponent { (uap) -> listener.onResponse(uap == null ? Collections.emptyList() : Collections.singletonList(uap.user())), handleException)); } else { - securityIndex.prepareIndexIfNeededThenExecute(listener::onFailure, () -> { + securityIndex.checkIndexVersionThenExecute(listener::onFailure, () -> { final QueryBuilder query; if (userNames == null || userNames.length == 0) { query = QueryBuilders.termQuery(Fields.TYPE.getPreferredName(), USER_DOC_TYPE); @@ -155,10 +154,10 @@ public class NativeUsersStore extends AbstractComponent { } void getUserCount(final ActionListener listener) { - if (securityIndex.indexExists() == false) { + if (securityIndex.isAvailable() == false) { listener.onResponse(0L); } else { - securityIndex.prepareIndexIfNeededThenExecute(listener::onFailure, () -> + securityIndex.checkIndexVersionThenExecute(listener::onFailure, () -> executeAsyncWithOrigin(client.threadPool().getThreadContext(), SECURITY_ORIGIN, client.prepareSearch(SECURITY_INDEX_NAME) .setQuery(QueryBuilders.termQuery(Fields.TYPE.getPreferredName(), USER_DOC_TYPE)) @@ -182,11 +181,10 @@ public class NativeUsersStore extends AbstractComponent { * Async method to retrieve a user and their password */ private void getUserAndPassword(final String user, final ActionListener listener) { - if (securityIndex.indexExists() == false) { - // TODO remove this short circuiting and fix tests that fail without this! + if (securityIndex.isAvailable() == false) { listener.onResponse(null); } else { - securityIndex.prepareIndexIfNeededThenExecute(listener::onFailure, () -> + securityIndex.checkIndexVersionThenExecute(listener::onFailure, () -> executeAsyncWithOrigin(client.threadPool().getThreadContext(), SECURITY_ORIGIN, client.prepareGet(SECURITY_INDEX_NAME, INDEX_TYPE, getIdForUser(USER_DOC_TYPE, user)).request(), @@ -459,16 +457,19 @@ public class NativeUsersStore extends AbstractComponent { } public void deleteUser(final DeleteUserRequest deleteUserRequest, final ActionListener listener) { - securityIndex.prepareIndexIfNeededThenExecute(listener::onFailure, () -> { - DeleteRequest request = client.prepareDelete(SECURITY_INDEX_NAME, + if (securityIndex.isAvailable() == false) { + listener.onResponse(false); + } else { + securityIndex.checkIndexVersionThenExecute(listener::onFailure, () -> { + DeleteRequest request = client.prepareDelete(SECURITY_INDEX_NAME, INDEX_TYPE, getIdForUser(USER_DOC_TYPE, deleteUserRequest.username())).request(); - request.setRefreshPolicy(deleteUserRequest.getRefreshPolicy()); - executeAsyncWithOrigin(client.threadPool().getThreadContext(), SECURITY_ORIGIN, request, + request.setRefreshPolicy(deleteUserRequest.getRefreshPolicy()); + executeAsyncWithOrigin(client.threadPool().getThreadContext(), SECURITY_ORIGIN, request, new ActionListener() { @Override public void onResponse(DeleteResponse deleteResponse) { clearRealmCache(deleteUserRequest.username(), listener, - deleteResponse.getResult() == DocWriteResponse.Result.DELETED); + deleteResponse.getResult() == DocWriteResponse.Result.DELETED); } @Override @@ -476,7 +477,8 @@ public class NativeUsersStore extends AbstractComponent { listener.onFailure(e); } }, client::delete); - }); + }); + } } /** @@ -498,11 +500,10 @@ public class NativeUsersStore extends AbstractComponent { } void getReservedUserInfo(String username, ActionListener listener) { - if (securityIndex.indexExists() == false) { - // TODO remove this short circuiting and fix tests that fail without this! + if (securityIndex.isAvailable() == false) { listener.onResponse(null); } else { - securityIndex.prepareIndexIfNeededThenExecute(listener::onFailure, () -> + securityIndex.checkIndexVersionThenExecute(listener::onFailure, () -> executeAsyncWithOrigin(client.threadPool().getThreadContext(), SECURITY_ORIGIN, client.prepareGet(SECURITY_INDEX_NAME, INDEX_TYPE, getIdForUser(RESERVED_USER_TYPE, username)).request(), @@ -541,49 +542,53 @@ public class NativeUsersStore extends AbstractComponent { } void getAllReservedUserInfo(ActionListener> listener) { - securityIndex.prepareIndexIfNeededThenExecute(listener::onFailure, () -> - executeAsyncWithOrigin(client.threadPool().getThreadContext(), SECURITY_ORIGIN, - client.prepareSearch(SECURITY_INDEX_NAME) + if (securityIndex.isAvailable() == false) { + listener.onResponse(Collections.emptyMap()); + } else { + securityIndex.checkIndexVersionThenExecute(listener::onFailure, () -> + executeAsyncWithOrigin(client.threadPool().getThreadContext(), SECURITY_ORIGIN, + client.prepareSearch(SECURITY_INDEX_NAME) .setQuery(QueryBuilders.termQuery(Fields.TYPE.getPreferredName(), RESERVED_USER_TYPE)) .setFetchSource(true).request(), - new ActionListener() { - @Override - public void onResponse(SearchResponse searchResponse) { - Map userInfos = new HashMap<>(); - assert searchResponse.getHits().getTotalHits() <= 10 : + new ActionListener() { + @Override + public void onResponse(SearchResponse searchResponse) { + Map userInfos = new HashMap<>(); + assert searchResponse.getHits().getTotalHits() <= 10 : "there are more than 10 reserved users we need to change this to retrieve them all!"; - for (SearchHit searchHit : searchResponse.getHits().getHits()) { - Map sourceMap = searchHit.getSourceAsMap(); - String password = (String) sourceMap.get(Fields.PASSWORD.getPreferredName()); - Boolean enabled = (Boolean) sourceMap.get(Fields.ENABLED.getPreferredName()); - final String id = searchHit.getId(); - assert id != null && id.startsWith(RESERVED_USER_TYPE) : + for (SearchHit searchHit : searchResponse.getHits().getHits()) { + Map sourceMap = searchHit.getSourceAsMap(); + String password = (String) sourceMap.get(Fields.PASSWORD.getPreferredName()); + Boolean enabled = (Boolean) sourceMap.get(Fields.ENABLED.getPreferredName()); + final String id = searchHit.getId(); + assert id != null && id.startsWith(RESERVED_USER_TYPE) : "id [" + id + "] does not start with reserved-user prefix"; - final String username = id.substring(RESERVED_USER_TYPE.length() + 1); - if (password == null) { - listener.onFailure(new IllegalStateException("password hash must not be null!")); - return; - } else if (enabled == null) { - listener.onFailure(new IllegalStateException("enabled must not be null!")); - return; + final String username = id.substring(RESERVED_USER_TYPE.length() + 1); + if (password == null) { + listener.onFailure(new IllegalStateException("password hash must not be null!")); + return; + } else if (enabled == null) { + listener.onFailure(new IllegalStateException("enabled must not be null!")); + return; + } else { + userInfos.put(username, new ReservedUserInfo(password.toCharArray(), enabled, false)); + } + } + listener.onResponse(userInfos); + } + + @Override + public void onFailure(Exception e) { + if (e instanceof IndexNotFoundException) { + logger.trace("could not retrieve built in users since security index does not exist", e); + listener.onResponse(Collections.emptyMap()); } else { - userInfos.put(username, new ReservedUserInfo(password.toCharArray(), enabled, false)); + logger.error("failed to retrieve built in users", e); + listener.onFailure(e); } } - listener.onResponse(userInfos); - } - - @Override - public void onFailure(Exception e) { - if (e instanceof IndexNotFoundException) { - logger.trace("could not retrieve built in users since security index does not exist", e); - listener.onResponse(Collections.emptyMap()); - } else { - logger.error("failed to retrieve built in users", e); - listener.onFailure(e); - } - } - }, client::search)); + }, client::search)); + } } private void clearRealmCache(String username, ActionListener listener, Response response) { diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authc/support/mapper/NativeRoleMappingStore.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authc/support/mapper/NativeRoleMappingStore.java index b45de8184d6..ba3e5ee3793 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authc/support/mapper/NativeRoleMappingStore.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authc/support/mapper/NativeRoleMappingStore.java @@ -220,32 +220,32 @@ public class NativeRoleMappingStore extends AbstractComponent implements UserRol }); } - private void innerDeleteMapping(DeleteRoleMappingRequest request, ActionListener listener) throws IOException { - if (securityIndex.isIndexUpToDate() == false) { - listener.onFailure(new IllegalStateException( - "Security index is not on the current version - the native realm will not be operational until " + - "the upgrade API is run on the security index")); - return; - } - executeAsyncWithOrigin(client.threadPool().getThreadContext(), SECURITY_ORIGIN, - client.prepareDelete(SECURITY_INDEX_NAME, SECURITY_GENERIC_TYPE, getIdForName(request.getName())) + private void innerDeleteMapping(DeleteRoleMappingRequest request, ActionListener listener) { + if (securityIndex.isAvailable() == false) { + listener.onResponse(false); + } else { + securityIndex.checkIndexVersionThenExecute(listener::onFailure, () -> { + executeAsyncWithOrigin(client.threadPool().getThreadContext(), SECURITY_ORIGIN, + client.prepareDelete(SECURITY_INDEX_NAME, SECURITY_GENERIC_TYPE, getIdForName(request.getName())) .setRefreshPolicy(request.getRefreshPolicy()) .request(), - new ActionListener() { + new ActionListener() { - @Override - public void onResponse(DeleteResponse deleteResponse) { - boolean deleted = deleteResponse.getResult() == DELETED; - listener.onResponse(deleted); - } + @Override + public void onResponse(DeleteResponse deleteResponse) { + boolean deleted = deleteResponse.getResult() == DELETED; + listener.onResponse(deleted); + } - @Override - public void onFailure(Exception e) { - logger.error(new ParameterizedMessage("failed to delete role-mapping [{}]", request.getName()), e); - listener.onFailure(e); + @Override + public void onFailure(Exception e) { + logger.error(new ParameterizedMessage("failed to delete role-mapping [{}]", request.getName()), e); + listener.onFailure(e); - } - }, client::delete); + } + }, client::delete); + }); + } } /** @@ -301,7 +301,7 @@ public class NativeRoleMappingStore extends AbstractComponent implements UserRol * */ public void usageStats(ActionListener> listener) { - if (securityIndex.indexExists() == false) { + if (securityIndex.isAvailable() == false) { reportStats(listener, Collections.emptyList()); } else { getMappings(ActionListener.wrap(mappings -> reportStats(listener, mappings), listener::onFailure)); diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/store/NativePrivilegeStore.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/store/NativePrivilegeStore.java index 2cfa89b647c..e7a27855f5d 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/store/NativePrivilegeStore.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/store/NativePrivilegeStore.java @@ -88,13 +88,15 @@ public class NativePrivilegeStore extends AbstractComponent { public void getPrivileges(Collection applications, Collection names, ActionListener> listener) { - if (applications != null && applications.size() == 1 && names != null && names.size() == 1) { + if (securityIndexManager.isAvailable() == false) { + listener.onResponse(Collections.emptyList()); + } else if (applications != null && applications.size() == 1 && names != null && names.size() == 1) { getPrivilege(Objects.requireNonNull(Iterables.get(applications, 0)), Objects.requireNonNull(Iterables.get(names, 0)), ActionListener.wrap(privilege -> listener.onResponse(privilege == null ? Collections.emptyList() : Collections.singletonList(privilege)), listener::onFailure)); } else { - securityIndexManager.prepareIndexIfNeededThenExecute(listener::onFailure, () -> { + securityIndexManager.checkIndexVersionThenExecute(listener::onFailure, () -> { final QueryBuilder query; final TermQueryBuilder typeQuery = QueryBuilders .termQuery(ApplicationPrivilegeDescriptor.Fields.TYPE.getPreferredName(), DOC_TYPE_VALUE); @@ -134,33 +136,37 @@ public class NativePrivilegeStore extends AbstractComponent { return collection == null || collection.isEmpty(); } - public void getPrivilege(String application, String name, ActionListener listener) { - securityIndexManager.prepareIndexIfNeededThenExecute(listener::onFailure, - () -> executeAsyncWithOrigin(client.threadPool().getThreadContext(), SECURITY_ORIGIN, - client.prepareGet(SECURITY_INDEX_NAME, "doc", toDocId(application, name)).request(), - new ActionListener() { - @Override - public void onResponse(GetResponse response) { - if (response.isExists()) { - listener.onResponse(buildPrivilege(response.getId(), response.getSourceAsBytesRef())); - } else { - listener.onResponse(null); + void getPrivilege(String application, String name, ActionListener listener) { + if (securityIndexManager.isAvailable() == false) { + listener.onResponse(null); + } else { + securityIndexManager.checkIndexVersionThenExecute(listener::onFailure, + () -> executeAsyncWithOrigin(client.threadPool().getThreadContext(), SECURITY_ORIGIN, + client.prepareGet(SECURITY_INDEX_NAME, "doc", toDocId(application, name)).request(), + new ActionListener() { + @Override + public void onResponse(GetResponse response) { + if (response.isExists()) { + listener.onResponse(buildPrivilege(response.getId(), response.getSourceAsBytesRef())); + } else { + listener.onResponse(null); + } } - } - @Override - public void onFailure(Exception e) { - // if the index or the shard is not there / available we just claim the privilege is not there - if (TransportActions.isShardNotAvailableException(e)) { - logger.warn(new ParameterizedMessage("failed to load privilege [{}] index not available", name), e); - listener.onResponse(null); - } else { - logger.error(new ParameterizedMessage("failed to load privilege [{}]", name), e); - listener.onFailure(e); + @Override + public void onFailure(Exception e) { + // if the index or the shard is not there / available we just claim the privilege is not there + if (TransportActions.isShardNotAvailableException(e)) { + logger.warn(new ParameterizedMessage("failed to load privilege [{}] index not available", name), e); + listener.onResponse(null); + } else { + logger.error(new ParameterizedMessage("failed to load privilege [{}]", name), e); + listener.onFailure(e); + } } - } - }, - client::get)); + }, + client::get)); + } } public void putPrivileges(Collection privileges, WriteRequest.RefreshPolicy refreshPolicy, @@ -200,23 +206,27 @@ public class NativePrivilegeStore extends AbstractComponent { public void deletePrivileges(String application, Collection names, WriteRequest.RefreshPolicy refreshPolicy, ActionListener>> listener) { - securityIndexManager.prepareIndexIfNeededThenExecute(listener::onFailure, () -> { - ActionListener groupListener = new GroupedActionListener<>( - ActionListener.wrap(responses -> { - final Map> deletedNames = responses.stream() - .filter(r -> r.getResult() == DocWriteResponse.Result.DELETED) - .map(r -> r.getId()) - .map(NativePrivilegeStore::nameFromDocId) - .collect(TUPLES_TO_MAP); - clearRolesCache(listener, deletedNames); - }, listener::onFailure), names.size(), Collections.emptyList()); - for (String name : names) { - ClientHelper.executeAsyncWithOrigin(client.threadPool().getThreadContext(), SECURITY_ORIGIN, - client.prepareDelete(SECURITY_INDEX_NAME, "doc", toDocId(application, name)) - .setRefreshPolicy(refreshPolicy) - .request(), groupListener, client::delete); - } - }); + if (securityIndexManager.isAvailable() == false) { + listener.onResponse(Collections.emptyMap()); + } else { + securityIndexManager.checkIndexVersionThenExecute(listener::onFailure, () -> { + ActionListener groupListener = new GroupedActionListener<>( + ActionListener.wrap(responses -> { + final Map> deletedNames = responses.stream() + .filter(r -> r.getResult() == DocWriteResponse.Result.DELETED) + .map(r -> r.getId()) + .map(NativePrivilegeStore::nameFromDocId) + .collect(TUPLES_TO_MAP); + clearRolesCache(listener, deletedNames); + }, listener::onFailure), names.size(), Collections.emptyList()); + for (String name : names) { + ClientHelper.executeAsyncWithOrigin(client.threadPool().getThreadContext(), SECURITY_ORIGIN, + client.prepareDelete(SECURITY_INDEX_NAME, "doc", toDocId(application, name)) + .setRefreshPolicy(refreshPolicy) + .request(), groupListener, client::delete); + } + }); + } } private void clearRolesCache(ActionListener listener, T value) { diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/store/NativeRolesStore.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/store/NativeRolesStore.java index d604d166812..c5be68f4199 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/store/NativeRolesStore.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/store/NativeRolesStore.java @@ -116,7 +116,7 @@ public class NativeRolesStore extends AbstractComponent implements BiConsumer { + securityIndex.checkIndexVersionThenExecute(listener::onFailure, () -> { QueryBuilder query; if (names == null || names.isEmpty()) { query = QueryBuilders.termQuery(RoleDescriptor.Fields.TYPE.getPreferredName(), ROLE_TYPE); @@ -144,16 +144,19 @@ public class NativeRolesStore extends AbstractComponent implements BiConsumer listener) { - securityIndex.prepareIndexIfNeededThenExecute(listener::onFailure, () -> { - DeleteRequest request = client.prepareDelete(SecurityIndexManager.SECURITY_INDEX_NAME, + if (securityIndex.isAvailable() == false) { + listener.onResponse(false); + } else { + securityIndex.checkIndexVersionThenExecute(listener::onFailure, () -> { + DeleteRequest request = client.prepareDelete(SecurityIndexManager.SECURITY_INDEX_NAME, ROLE_DOC_TYPE, getIdForUser(deleteRoleRequest.name())).request(); - request.setRefreshPolicy(deleteRoleRequest.getRefreshPolicy()); - executeAsyncWithOrigin(client.threadPool().getThreadContext(), SECURITY_ORIGIN, request, + request.setRefreshPolicy(deleteRoleRequest.getRefreshPolicy()); + executeAsyncWithOrigin(client.threadPool().getThreadContext(), SECURITY_ORIGIN, request, new ActionListener() { @Override public void onResponse(DeleteResponse deleteResponse) { clearRoleCache(deleteRoleRequest.name(), listener, - deleteResponse.getResult() == DocWriteResponse.Result.DELETED); + deleteResponse.getResult() == DocWriteResponse.Result.DELETED); } @Override @@ -162,7 +165,8 @@ public class NativeRolesStore extends AbstractComponent implements BiConsumer listener) { @@ -210,13 +214,13 @@ public class NativeRolesStore extends AbstractComponent implements BiConsumer> listener) { Map usageStats = new HashMap<>(3); - if (securityIndex.indexExists() == false) { + if (securityIndex.isAvailable() == false) { usageStats.put("size", 0L); usageStats.put("fls", false); usageStats.put("dls", false); listener.onResponse(usageStats); } else { - securityIndex.prepareIndexIfNeededThenExecute(listener::onFailure, () -> + securityIndex.checkIndexVersionThenExecute(listener::onFailure, () -> executeAsyncWithOrigin(client.threadPool().getThreadContext(), SECURITY_ORIGIN, client.prepareMultiSearch() .add(client.prepareSearch(SecurityIndexManager.SECURITY_INDEX_NAME) @@ -298,7 +302,7 @@ public class NativeRolesStore extends AbstractComponent implements BiConsumer listener) { - securityIndex.prepareIndexIfNeededThenExecute(listener::onFailure, () -> + securityIndex.checkIndexVersionThenExecute(listener::onFailure, () -> executeAsyncWithOrigin(client.threadPool().getThreadContext(), SECURITY_ORIGIN, client.prepareGet(SECURITY_INDEX_NAME, ROLE_DOC_TYPE, getIdForUser(role)).request(), diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/support/SecurityIndexManager.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/support/SecurityIndexManager.java index d02b569a744..18d86ab028c 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/support/SecurityIndexManager.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/support/SecurityIndexManager.java @@ -253,6 +253,23 @@ public class SecurityIndexManager extends AbstractComponent implements ClusterSt } } + /** + * Validates the security index is up to date and does not need to migrated. If it is not, the + * consumer is called with an exception. If the security index is up to date, the runnable will + * be executed. NOTE: this method does not check the availability of the index; this check + * is left to the caller so that this condition can be handled appropriately. + */ + public void checkIndexVersionThenExecute(final Consumer consumer, final Runnable andThen) { + final State indexState = this.indexState; // use a local copy so all checks execute against the same state! + if (indexState.indexExists && indexState.isIndexUpToDate == false) { + consumer.accept(new IllegalStateException( + "Security index is not on the current version. Security features relying on the index will not be available until " + + "the upgrade API is run on the security index")); + } else { + andThen.run(); + } + } + /** * Prepares the index by creating it if it doesn't exist or updating the mappings if the mappings are * out of date. After any tasks have been executed, the runnable is then executed. diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/saml/TransportSamlInvalidateSessionActionTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/saml/TransportSamlInvalidateSessionActionTests.java index 17a45f23893..65f95ecb2f8 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/saml/TransportSamlInvalidateSessionActionTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/saml/TransportSamlInvalidateSessionActionTests.java @@ -163,6 +163,11 @@ public class TransportSamlInvalidateSessionActionTests extends SamlTestCase { ((Runnable) inv.getArguments()[1]).run(); return null; }).when(securityIndex).prepareIndexIfNeededThenExecute(any(Consumer.class), any(Runnable.class)); + doAnswer(inv -> { + ((Runnable) inv.getArguments()[1]).run(); + return null; + }).when(securityIndex).checkIndexVersionThenExecute(any(Consumer.class), any(Runnable.class)); + when(securityIndex.isAvailable()).thenReturn(true); final ClusterService clusterService = ClusterServiceUtils.createClusterService(threadPool); tokenService = new TokenService(settings, Clock.systemUTC(), client, securityIndex, clusterService); diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/saml/TransportSamlLogoutActionTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/saml/TransportSamlLogoutActionTests.java index 291c102f396..f2f94176edc 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/saml/TransportSamlLogoutActionTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/saml/TransportSamlLogoutActionTests.java @@ -178,6 +178,11 @@ public class TransportSamlLogoutActionTests extends SamlTestCase { ((Runnable) inv.getArguments()[1]).run(); return null; }).when(securityIndex).prepareIndexIfNeededThenExecute(any(Consumer.class), any(Runnable.class)); + doAnswer(inv -> { + ((Runnable) inv.getArguments()[1]).run(); + return null; + }).when(securityIndex).checkIndexVersionThenExecute(any(Consumer.class), any(Runnable.class)); + when(securityIndex.isAvailable()).thenReturn(true); final ClusterService clusterService = ClusterServiceUtils.createClusterService(threadPool); tokenService = new TokenService(settings, Clock.systemUTC(), client, securityIndex, clusterService); diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authc/AuthenticationServiceTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authc/AuthenticationServiceTests.java index ef5b0386bc2..54908389610 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authc/AuthenticationServiceTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authc/AuthenticationServiceTests.java @@ -87,6 +87,7 @@ import java.util.function.Consumer; import static org.elasticsearch.test.SecurityTestsUtils.assertAuthenticationException; import static org.elasticsearch.xpack.core.security.support.Exceptions.authenticationError; +import static org.elasticsearch.xpack.security.authc.TokenServiceTests.mockCheckTokenInvalidationFromId; import static org.elasticsearch.xpack.security.authc.TokenServiceTests.mockGetTokenFromId; import static org.hamcrest.Matchers.arrayContaining; import static org.hamcrest.Matchers.contains; @@ -187,6 +188,11 @@ public class AuthenticationServiceTests extends ESTestCase { runnable.run(); return null; }).when(securityIndex).prepareIndexIfNeededThenExecute(any(Consumer.class), any(Runnable.class)); + doAnswer(invocationOnMock -> { + Runnable runnable = (Runnable) invocationOnMock.getArguments()[1]; + runnable.run(); + return null; + }).when(securityIndex).checkIndexVersionThenExecute(any(Consumer.class), any(Runnable.class)); ClusterService clusterService = ClusterServiceUtils.createClusterService(threadPool); tokenService = new TokenService(settings, Clock.systemUTC(), client, securityIndex, clusterService); service = new AuthenticationService(settings, realms, auditTrail, @@ -898,7 +904,11 @@ public class AuthenticationServiceTests extends ESTestCase { tokenService.createUserToken(expected, originatingAuth, tokenFuture, Collections.emptyMap(), true); } String token = tokenService.getUserTokenString(tokenFuture.get().v1()); + when(client.prepareMultiGet()).thenReturn(new MultiGetRequestBuilder(client, MultiGetAction.INSTANCE)); mockGetTokenFromId(tokenFuture.get().v1(), client); + mockCheckTokenInvalidationFromId(tokenFuture.get().v1(), client); + when(securityIndex.isAvailable()).thenReturn(true); + when(securityIndex.indexExists()).thenReturn(true); try (ThreadContext.StoredContext ignore = threadContext.stashContext()) { threadContext.putHeader("Authorization", "Bearer " + token); service.authenticate("_action", message, (User)null, ActionListener.wrap(result -> { diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authc/TokenServiceTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authc/TokenServiceTests.java index 213def0f0fe..7926b44a38c 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authc/TokenServiceTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authc/TokenServiceTests.java @@ -137,6 +137,13 @@ public class TokenServiceTests extends ESTestCase { runnable.run(); return null; }).when(securityIndex).prepareIndexIfNeededThenExecute(any(Consumer.class), any(Runnable.class)); + doAnswer(invocationOnMock -> { + Runnable runnable = (Runnable) invocationOnMock.getArguments()[1]; + runnable.run(); + return null; + }).when(securityIndex).checkIndexVersionThenExecute(any(Consumer.class), any(Runnable.class)); + when(securityIndex.indexExists()).thenReturn(true); + when(securityIndex.isAvailable()).thenReturn(true); this.clusterService = ClusterServiceUtils.createClusterService(threadPool); } @@ -161,6 +168,7 @@ public class TokenServiceTests extends ESTestCase { final UserToken token = tokenFuture.get().v1(); assertNotNull(token); mockGetTokenFromId(token); + mockCheckTokenInvalidationFromId(token); ThreadContext requestContext = new ThreadContext(Settings.EMPTY); requestContext.putHeader("Authorization", randomFrom("Bearer ", "BEARER ", "bearer ") + tokenService.getUserTokenString(token)); @@ -207,6 +215,7 @@ public class TokenServiceTests extends ESTestCase { final UserToken token = tokenFuture.get().v1(); assertNotNull(token); mockGetTokenFromId(token); + mockCheckTokenInvalidationFromId(token); ThreadContext requestContext = new ThreadContext(Settings.EMPTY); requestContext.putHeader("Authorization", "Bearer " + tokenService.getUserTokenString(token)); @@ -266,6 +275,7 @@ public class TokenServiceTests extends ESTestCase { final UserToken token = tokenFuture.get().v1(); assertNotNull(token); mockGetTokenFromId(token); + mockCheckTokenInvalidationFromId(token); ThreadContext requestContext = new ThreadContext(Settings.EMPTY); requestContext.putHeader("Authorization", "Bearer " + tokenService.getUserTokenString(token)); @@ -296,6 +306,7 @@ public class TokenServiceTests extends ESTestCase { final UserToken token = tokenFuture.get().v1(); assertNotNull(token); mockGetTokenFromId(token); + mockCheckTokenInvalidationFromId(token); ThreadContext requestContext = new ThreadContext(Settings.EMPTY); requestContext.putHeader("Authorization", "Bearer " + tokenService.getUserTokenString(token)); @@ -357,6 +368,7 @@ public class TokenServiceTests extends ESTestCase { final UserToken token = tokenFuture.get().v1(); assertNotNull(token); mockGetTokenFromId(token); + mockCheckTokenInvalidationFromId(token); ThreadContext requestContext = new ThreadContext(Settings.EMPTY); requestContext.putHeader("Authorization", "Bearer " + tokenService.getUserTokenString(token)); @@ -454,6 +466,7 @@ public class TokenServiceTests extends ESTestCase { tokenService.createUserToken(authentication, authentication, tokenFuture, Collections.emptyMap(), true); final UserToken token = tokenFuture.get().v1(); mockGetTokenFromId(token); + mockCheckTokenInvalidationFromId(token); ThreadContext requestContext = new ThreadContext(Settings.EMPTY); requestContext.putHeader("Authorization", "Bearer " + tokenService.getUserTokenString(token)); @@ -577,14 +590,25 @@ public class TokenServiceTests extends ESTestCase { try (ThreadContext.StoredContext ignore = requestContext.newStoredContext(true)) { PlainActionFuture future = new PlainActionFuture<>(); tokenService.getAndValidateToken(requestContext, future); - UserToken serialized = future.get(); - assertEquals(authentication, serialized.getAuthentication()); + assertNull(future.get()); when(securityIndex.isAvailable()).thenReturn(false); when(securityIndex.indexExists()).thenReturn(true); future = new PlainActionFuture<>(); tokenService.getAndValidateToken(requestContext, future); assertNull(future.get()); + + when(securityIndex.indexExists()).thenReturn(false); + future = new PlainActionFuture<>(); + tokenService.getAndValidateToken(requestContext, future); + assertNull(future.get()); + + when(securityIndex.isAvailable()).thenReturn(true); + when(securityIndex.indexExists()).thenReturn(true); + mockCheckTokenInvalidationFromId(token); + future = new PlainActionFuture<>(); + tokenService.getAndValidateToken(requestContext, future); + assertEquals(token.getAuthentication(), future.get().getAuthentication()); } } @@ -625,4 +649,38 @@ public class TokenServiceTests extends ESTestCase { return Void.TYPE; }).when(client).get(any(GetRequest.class), any(ActionListener.class)); } + + private void mockCheckTokenInvalidationFromId(UserToken userToken) { + mockCheckTokenInvalidationFromId(userToken, client); + } + + public static void mockCheckTokenInvalidationFromId(UserToken userToken, Client client) { + doAnswer(invocationOnMock -> { + MultiGetRequest request = (MultiGetRequest) invocationOnMock.getArguments()[0]; + ActionListener listener = (ActionListener) invocationOnMock.getArguments()[1]; + MultiGetResponse response = mock(MultiGetResponse.class); + MultiGetItemResponse[] responses = new MultiGetItemResponse[2]; + when(response.getResponses()).thenReturn(responses); + GetResponse legacyResponse = mock(GetResponse.class); + responses[0] = new MultiGetItemResponse(legacyResponse, null); + when(legacyResponse.isExists()).thenReturn(false); + GetResponse tokenResponse = mock(GetResponse.class); + if (userToken.getId().equals(request.getItems().get(1).id().replace("token_", ""))) { + when(tokenResponse.isExists()).thenReturn(true); + Map sourceMap = new HashMap<>(); + try (XContentBuilder builder = XContentBuilder.builder(XContentType.JSON.xContent())) { + userToken.toXContent(builder, ToXContent.EMPTY_PARAMS); + Map accessTokenMap = new HashMap<>(); + accessTokenMap.put("user_token", + XContentHelper.convertToMap(XContentType.JSON.xContent(), Strings.toString(builder), false)); + accessTokenMap.put("invalidated", false); + sourceMap.put("access_token", accessTokenMap); + } + when(tokenResponse.getSource()).thenReturn(sourceMap); + } + responses[1] = new MultiGetItemResponse(tokenResponse, null); + listener.onResponse(response); + return Void.TYPE; + }).when(client).multiGet(any(MultiGetRequest.class), any(ActionListener.class)); + } } diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authc/esnative/NativeUsersStoreTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authc/esnative/NativeUsersStoreTests.java index f280e85f4ab..425bebc0600 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authc/esnative/NativeUsersStoreTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authc/esnative/NativeUsersStoreTests.java @@ -242,6 +242,11 @@ public class NativeUsersStoreTests extends ESTestCase { action.run(); return null; }).when(securityIndex).prepareIndexIfNeededThenExecute(any(Consumer.class), any(Runnable.class)); + doAnswer((i) -> { + Runnable action = (Runnable) i.getArguments()[1]; + action.run(); + return null; + }).when(securityIndex).checkIndexVersionThenExecute(any(Consumer.class), any(Runnable.class)); return new NativeUsersStore(Settings.EMPTY, client, securityIndex); } diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/store/NativePrivilegeStoreTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/store/NativePrivilegeStoreTests.java index 89058cf4a8b..9b08691f415 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/store/NativePrivilegeStoreTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/store/NativePrivilegeStoreTests.java @@ -96,6 +96,12 @@ public class NativePrivilegeStoreTests extends ESTestCase { ((Runnable) invocationOnMock.getArguments()[1]).run(); return null; }).when(securityIndex).prepareIndexIfNeededThenExecute(any(Consumer.class), any(Runnable.class)); + Mockito.doAnswer(invocationOnMock -> { + assertThat(invocationOnMock.getArguments().length, equalTo(2)); + assertThat(invocationOnMock.getArguments()[1], instanceOf(Runnable.class)); + ((Runnable) invocationOnMock.getArguments()[1]).run(); + return null; + }).when(securityIndex).checkIndexVersionThenExecute(any(Consumer.class), any(Runnable.class)); store = new NativePrivilegeStore(Settings.EMPTY, client, securityIndex); }