mirror of
https://github.com/honeymoose/OpenSearch.git
synced 2025-02-17 10:25:15 +00:00
Security: don't call prepare index for reads (#34246)
The security native stores follow a pattern where `SecurityIndexManager#prepareIndexIfNeededThenExecute` wraps most calls made for the security index. The reasoning behind this was to check if the security index had been upgraded to the latest version in a consistent manner. However, this has the potential side effect that a read will trigger the creation of the security index or an updating of its mappings, which can lead to issues such as failures due to put mapping requests timing out even though we might have been able to read from the index and get the data necessary. This change introduces a new method, `checkIndexVersionThenExecute`, that provides the consistent checking of the security index to make sure it has been upgraded. That is the only check that this method performs prior to running the passed in operation, which removes the possible triggering of index creation and mapping updates for reads. Additionally, areas where we do reads now check the availability of the security index and can short circuit requests. Availability in this context means that the index exists and all primaries are active. Relates #33205
This commit is contained in:
parent
a93aefb4a4
commit
0b4e8db1d3
@ -361,30 +361,34 @@ public final class TokenService extends AbstractComponent {
|
||||
final Cipher cipher = getDecryptionCipher(iv, decodeKey, version, decodedSalt);
|
||||
if (version.onOrAfter(Version.V_6_2_0)) {
|
||||
// we only have the id and need to get the token from the doc!
|
||||
decryptTokenId(in, cipher, version, ActionListener.wrap(tokenId ->
|
||||
securityIndex.prepareIndexIfNeededThenExecute(listener::onFailure, () -> {
|
||||
final GetRequest getRequest =
|
||||
decryptTokenId(in, cipher, version, ActionListener.wrap(tokenId -> {
|
||||
if (securityIndex.isAvailable() == false) {
|
||||
logger.warn("failed to get token [{}] since index is not available", tokenId);
|
||||
listener.onResponse(null);
|
||||
} else {
|
||||
securityIndex.checkIndexVersionThenExecute(listener::onFailure, () -> {
|
||||
final GetRequest getRequest =
|
||||
client.prepareGet(SecurityIndexManager.SECURITY_INDEX_NAME, TYPE,
|
||||
getTokenDocumentId(tokenId)).request();
|
||||
executeAsyncWithOrigin(client.threadPool().getThreadContext(), SECURITY_ORIGIN, getRequest,
|
||||
getTokenDocumentId(tokenId)).request();
|
||||
executeAsyncWithOrigin(client.threadPool().getThreadContext(), SECURITY_ORIGIN, getRequest,
|
||||
ActionListener.<GetResponse>wrap(response -> {
|
||||
if (response.isExists()) {
|
||||
Map<String, Object> accessTokenSource =
|
||||
(Map<String, Object>) response.getSource().get("access_token");
|
||||
(Map<String, Object>) response.getSource().get("access_token");
|
||||
if (accessTokenSource == null) {
|
||||
listener.onFailure(new IllegalStateException("token document is missing " +
|
||||
"the access_token field"));
|
||||
"the access_token field"));
|
||||
} else if (accessTokenSource.containsKey("user_token") == false) {
|
||||
listener.onFailure(new IllegalStateException("token document is missing " +
|
||||
"the user_token field"));
|
||||
"the user_token field"));
|
||||
} else {
|
||||
Map<String, Object> userTokenSource =
|
||||
(Map<String, Object>) accessTokenSource.get("user_token");
|
||||
(Map<String, Object>) accessTokenSource.get("user_token");
|
||||
listener.onResponse(UserToken.fromSourceMap(userTokenSource));
|
||||
}
|
||||
} else {
|
||||
listener.onFailure(
|
||||
new IllegalStateException("token document is missing and must be present"));
|
||||
new IllegalStateException("token document is missing and must be present"));
|
||||
}
|
||||
}, e -> {
|
||||
// if the index or the shard is not there / available we assume that
|
||||
@ -397,7 +401,8 @@ public final class TokenService extends AbstractComponent {
|
||||
listener.onFailure(e);
|
||||
}
|
||||
}), client::get);
|
||||
}), listener::onFailure));
|
||||
});
|
||||
}}, listener::onFailure));
|
||||
} else {
|
||||
decryptToken(in, cipher, version, listener);
|
||||
}
|
||||
@ -673,30 +678,36 @@ public final class TokenService extends AbstractComponent {
|
||||
.setVersion(true)
|
||||
.request();
|
||||
|
||||
securityIndex.prepareIndexIfNeededThenExecute(listener::onFailure, () ->
|
||||
if (securityIndex.isAvailable() == false) {
|
||||
logger.debug("security index is not available to find token from refresh token, retrying");
|
||||
attemptCount.incrementAndGet();
|
||||
findTokenFromRefreshToken(refreshToken, listener, attemptCount);
|
||||
} else {
|
||||
securityIndex.checkIndexVersionThenExecute(listener::onFailure, () ->
|
||||
executeAsyncWithOrigin(client.threadPool().getThreadContext(), SECURITY_ORIGIN, request,
|
||||
ActionListener.<SearchResponse>wrap(searchResponse -> {
|
||||
if (searchResponse.isTimedOut()) {
|
||||
attemptCount.incrementAndGet();
|
||||
findTokenFromRefreshToken(refreshToken, listener, attemptCount);
|
||||
} else if (searchResponse.getHits().getHits().length < 1) {
|
||||
logger.info("could not find token document with refresh_token [{}]", refreshToken);
|
||||
listener.onFailure(invalidGrantException("could not refresh the requested token"));
|
||||
} else if (searchResponse.getHits().getHits().length > 1) {
|
||||
listener.onFailure(new IllegalStateException("multiple tokens share the same refresh token"));
|
||||
} else {
|
||||
listener.onResponse(new Tuple<>(searchResponse, attemptCount));
|
||||
}
|
||||
}, e -> {
|
||||
if (isShardNotAvailableException(e)) {
|
||||
logger.debug("failed to search for token document, retrying", e);
|
||||
attemptCount.incrementAndGet();
|
||||
findTokenFromRefreshToken(refreshToken, listener, attemptCount);
|
||||
} else {
|
||||
listener.onFailure(e);
|
||||
}
|
||||
}),
|
||||
client::search));
|
||||
ActionListener.<SearchResponse>wrap(searchResponse -> {
|
||||
if (searchResponse.isTimedOut()) {
|
||||
attemptCount.incrementAndGet();
|
||||
findTokenFromRefreshToken(refreshToken, listener, attemptCount);
|
||||
} else if (searchResponse.getHits().getHits().length < 1) {
|
||||
logger.info("could not find token document with refresh_token [{}]", refreshToken);
|
||||
listener.onFailure(invalidGrantException("could not refresh the requested token"));
|
||||
} else if (searchResponse.getHits().getHits().length > 1) {
|
||||
listener.onFailure(new IllegalStateException("multiple tokens share the same refresh token"));
|
||||
} else {
|
||||
listener.onResponse(new Tuple<>(searchResponse, attemptCount));
|
||||
}
|
||||
}, e -> {
|
||||
if (isShardNotAvailableException(e)) {
|
||||
logger.debug("failed to search for token document, retrying", e);
|
||||
attemptCount.incrementAndGet();
|
||||
findTokenFromRefreshToken(refreshToken, listener, attemptCount);
|
||||
} else {
|
||||
listener.onFailure(e);
|
||||
}
|
||||
}),
|
||||
client::search));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -831,22 +842,22 @@ public final class TokenService extends AbstractComponent {
|
||||
|
||||
if (Strings.isNullOrEmpty(realmName)) {
|
||||
listener.onFailure(new IllegalArgumentException("Realm name is required"));
|
||||
return;
|
||||
}
|
||||
|
||||
final Instant now = clock.instant();
|
||||
final BoolQueryBuilder boolQuery = QueryBuilders.boolQuery()
|
||||
} else if (securityIndex.isAvailable() == false) {
|
||||
listener.onResponse(Collections.emptyList());
|
||||
} else {
|
||||
final Instant now = clock.instant();
|
||||
final BoolQueryBuilder boolQuery = QueryBuilders.boolQuery()
|
||||
.filter(QueryBuilders.termQuery("doc_type", "token"))
|
||||
.filter(QueryBuilders.termQuery("access_token.realm", realmName))
|
||||
.filter(QueryBuilders.boolQuery()
|
||||
.should(QueryBuilders.boolQuery()
|
||||
.must(QueryBuilders.termQuery("access_token.invalidated", false))
|
||||
.must(QueryBuilders.rangeQuery("access_token.user_token.expiration_time").gte(now.toEpochMilli()))
|
||||
)
|
||||
.should(QueryBuilders.termQuery("refresh_token.invalidated", false))
|
||||
.should(QueryBuilders.boolQuery()
|
||||
.must(QueryBuilders.termQuery("access_token.invalidated", false))
|
||||
.must(QueryBuilders.rangeQuery("access_token.user_token.expiration_time").gte(now.toEpochMilli()))
|
||||
)
|
||||
.should(QueryBuilders.termQuery("refresh_token.invalidated", false))
|
||||
);
|
||||
|
||||
final SearchRequest request = client.prepareSearch(SecurityIndexManager.SECURITY_INDEX_NAME)
|
||||
final SearchRequest request = client.prepareSearch(SecurityIndexManager.SECURITY_INDEX_NAME)
|
||||
.setScroll(DEFAULT_KEEPALIVE_SETTING.get(settings))
|
||||
.setQuery(boolQuery)
|
||||
.setVersion(false)
|
||||
@ -854,9 +865,10 @@ public final class TokenService extends AbstractComponent {
|
||||
.setFetchSource(true)
|
||||
.request();
|
||||
|
||||
final Supplier<ThreadContext.StoredContext> supplier = client.threadPool().getThreadContext().newRestorableContext(false);
|
||||
securityIndex.prepareIndexIfNeededThenExecute(listener::onFailure, () ->
|
||||
ScrollHelper.fetchAllByEntity(client, request, new ContextPreservingActionListener<>(supplier, listener), this::parseHit));
|
||||
final Supplier<ThreadContext.StoredContext> supplier = client.threadPool().getThreadContext().newRestorableContext(false);
|
||||
securityIndex.checkIndexVersionThenExecute(listener::onFailure, () ->
|
||||
ScrollHelper.fetchAllByEntity(client, request, new ContextPreservingActionListener<>(supplier, listener), this::parseHit));
|
||||
}
|
||||
}
|
||||
|
||||
private Tuple<UserToken, String> parseHit(SearchHit hit) {
|
||||
@ -923,10 +935,12 @@ public final class TokenService extends AbstractComponent {
|
||||
*/
|
||||
private void checkIfTokenIsRevoked(UserToken userToken, ActionListener<UserToken> listener) {
|
||||
if (securityIndex.indexExists() == false) {
|
||||
// index doesn't exist so the token is considered valid.
|
||||
// index doesn't exist so the token is considered valid. it is important to note that
|
||||
// we do not use isAvailable as the lack of a shard being available is not equivalent
|
||||
// to the index not existing in the case of revocation checking.
|
||||
listener.onResponse(userToken);
|
||||
} else {
|
||||
securityIndex.prepareIndexIfNeededThenExecute(listener::onFailure, () -> {
|
||||
securityIndex.checkIndexVersionThenExecute(listener::onFailure, () -> {
|
||||
MultiGetRequest mGetRequest = client.prepareMultiGet()
|
||||
.add(SecurityIndexManager.SECURITY_INDEX_NAME, TYPE, getInvalidatedTokenDocumentId(userToken))
|
||||
.add(SecurityIndexManager.SECURITY_INDEX_NAME, TYPE, getTokenDocumentId(userToken))
|
||||
|
@ -118,8 +118,7 @@ public class NativeUsersStore extends AbstractComponent {
|
||||
}
|
||||
};
|
||||
|
||||
if (securityIndex.indexExists() == false) {
|
||||
// TODO remove this short circuiting and fix tests that fail without this!
|
||||
if (securityIndex.isAvailable() == false) {
|
||||
listener.onResponse(Collections.emptyList());
|
||||
} else if (userNames.length == 1) { // optimization for single user lookup
|
||||
final String username = userNames[0];
|
||||
@ -127,7 +126,7 @@ public class NativeUsersStore extends AbstractComponent {
|
||||
(uap) -> listener.onResponse(uap == null ? Collections.emptyList() : Collections.singletonList(uap.user())),
|
||||
handleException));
|
||||
} else {
|
||||
securityIndex.prepareIndexIfNeededThenExecute(listener::onFailure, () -> {
|
||||
securityIndex.checkIndexVersionThenExecute(listener::onFailure, () -> {
|
||||
final QueryBuilder query;
|
||||
if (userNames == null || userNames.length == 0) {
|
||||
query = QueryBuilders.termQuery(Fields.TYPE.getPreferredName(), USER_DOC_TYPE);
|
||||
@ -155,10 +154,10 @@ public class NativeUsersStore extends AbstractComponent {
|
||||
}
|
||||
|
||||
void getUserCount(final ActionListener<Long> listener) {
|
||||
if (securityIndex.indexExists() == false) {
|
||||
if (securityIndex.isAvailable() == false) {
|
||||
listener.onResponse(0L);
|
||||
} else {
|
||||
securityIndex.prepareIndexIfNeededThenExecute(listener::onFailure, () ->
|
||||
securityIndex.checkIndexVersionThenExecute(listener::onFailure, () ->
|
||||
executeAsyncWithOrigin(client.threadPool().getThreadContext(), SECURITY_ORIGIN,
|
||||
client.prepareSearch(SECURITY_INDEX_NAME)
|
||||
.setQuery(QueryBuilders.termQuery(Fields.TYPE.getPreferredName(), USER_DOC_TYPE))
|
||||
@ -182,11 +181,10 @@ public class NativeUsersStore extends AbstractComponent {
|
||||
* Async method to retrieve a user and their password
|
||||
*/
|
||||
private void getUserAndPassword(final String user, final ActionListener<UserAndPassword> listener) {
|
||||
if (securityIndex.indexExists() == false) {
|
||||
// TODO remove this short circuiting and fix tests that fail without this!
|
||||
if (securityIndex.isAvailable() == false) {
|
||||
listener.onResponse(null);
|
||||
} else {
|
||||
securityIndex.prepareIndexIfNeededThenExecute(listener::onFailure, () ->
|
||||
securityIndex.checkIndexVersionThenExecute(listener::onFailure, () ->
|
||||
executeAsyncWithOrigin(client.threadPool().getThreadContext(), SECURITY_ORIGIN,
|
||||
client.prepareGet(SECURITY_INDEX_NAME,
|
||||
INDEX_TYPE, getIdForUser(USER_DOC_TYPE, user)).request(),
|
||||
@ -459,16 +457,19 @@ public class NativeUsersStore extends AbstractComponent {
|
||||
}
|
||||
|
||||
public void deleteUser(final DeleteUserRequest deleteUserRequest, final ActionListener<Boolean> listener) {
|
||||
securityIndex.prepareIndexIfNeededThenExecute(listener::onFailure, () -> {
|
||||
DeleteRequest request = client.prepareDelete(SECURITY_INDEX_NAME,
|
||||
if (securityIndex.isAvailable() == false) {
|
||||
listener.onResponse(false);
|
||||
} else {
|
||||
securityIndex.checkIndexVersionThenExecute(listener::onFailure, () -> {
|
||||
DeleteRequest request = client.prepareDelete(SECURITY_INDEX_NAME,
|
||||
INDEX_TYPE, getIdForUser(USER_DOC_TYPE, deleteUserRequest.username())).request();
|
||||
request.setRefreshPolicy(deleteUserRequest.getRefreshPolicy());
|
||||
executeAsyncWithOrigin(client.threadPool().getThreadContext(), SECURITY_ORIGIN, request,
|
||||
request.setRefreshPolicy(deleteUserRequest.getRefreshPolicy());
|
||||
executeAsyncWithOrigin(client.threadPool().getThreadContext(), SECURITY_ORIGIN, request,
|
||||
new ActionListener<DeleteResponse>() {
|
||||
@Override
|
||||
public void onResponse(DeleteResponse deleteResponse) {
|
||||
clearRealmCache(deleteUserRequest.username(), listener,
|
||||
deleteResponse.getResult() == DocWriteResponse.Result.DELETED);
|
||||
deleteResponse.getResult() == DocWriteResponse.Result.DELETED);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -476,7 +477,8 @@ public class NativeUsersStore extends AbstractComponent {
|
||||
listener.onFailure(e);
|
||||
}
|
||||
}, client::delete);
|
||||
});
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
@ -498,11 +500,10 @@ public class NativeUsersStore extends AbstractComponent {
|
||||
}
|
||||
|
||||
void getReservedUserInfo(String username, ActionListener<ReservedUserInfo> listener) {
|
||||
if (securityIndex.indexExists() == false) {
|
||||
// TODO remove this short circuiting and fix tests that fail without this!
|
||||
if (securityIndex.isAvailable() == false) {
|
||||
listener.onResponse(null);
|
||||
} else {
|
||||
securityIndex.prepareIndexIfNeededThenExecute(listener::onFailure, () ->
|
||||
securityIndex.checkIndexVersionThenExecute(listener::onFailure, () ->
|
||||
executeAsyncWithOrigin(client.threadPool().getThreadContext(), SECURITY_ORIGIN,
|
||||
client.prepareGet(SECURITY_INDEX_NAME, INDEX_TYPE,
|
||||
getIdForUser(RESERVED_USER_TYPE, username)).request(),
|
||||
@ -541,49 +542,53 @@ public class NativeUsersStore extends AbstractComponent {
|
||||
}
|
||||
|
||||
void getAllReservedUserInfo(ActionListener<Map<String, ReservedUserInfo>> listener) {
|
||||
securityIndex.prepareIndexIfNeededThenExecute(listener::onFailure, () ->
|
||||
executeAsyncWithOrigin(client.threadPool().getThreadContext(), SECURITY_ORIGIN,
|
||||
client.prepareSearch(SECURITY_INDEX_NAME)
|
||||
if (securityIndex.isAvailable() == false) {
|
||||
listener.onResponse(Collections.emptyMap());
|
||||
} else {
|
||||
securityIndex.checkIndexVersionThenExecute(listener::onFailure, () ->
|
||||
executeAsyncWithOrigin(client.threadPool().getThreadContext(), SECURITY_ORIGIN,
|
||||
client.prepareSearch(SECURITY_INDEX_NAME)
|
||||
.setQuery(QueryBuilders.termQuery(Fields.TYPE.getPreferredName(), RESERVED_USER_TYPE))
|
||||
.setFetchSource(true).request(),
|
||||
new ActionListener<SearchResponse>() {
|
||||
@Override
|
||||
public void onResponse(SearchResponse searchResponse) {
|
||||
Map<String, ReservedUserInfo> userInfos = new HashMap<>();
|
||||
assert searchResponse.getHits().getTotalHits() <= 10 :
|
||||
new ActionListener<SearchResponse>() {
|
||||
@Override
|
||||
public void onResponse(SearchResponse searchResponse) {
|
||||
Map<String, ReservedUserInfo> userInfos = new HashMap<>();
|
||||
assert searchResponse.getHits().getTotalHits() <= 10 :
|
||||
"there are more than 10 reserved users we need to change this to retrieve them all!";
|
||||
for (SearchHit searchHit : searchResponse.getHits().getHits()) {
|
||||
Map<String, Object> sourceMap = searchHit.getSourceAsMap();
|
||||
String password = (String) sourceMap.get(Fields.PASSWORD.getPreferredName());
|
||||
Boolean enabled = (Boolean) sourceMap.get(Fields.ENABLED.getPreferredName());
|
||||
final String id = searchHit.getId();
|
||||
assert id != null && id.startsWith(RESERVED_USER_TYPE) :
|
||||
for (SearchHit searchHit : searchResponse.getHits().getHits()) {
|
||||
Map<String, Object> sourceMap = searchHit.getSourceAsMap();
|
||||
String password = (String) sourceMap.get(Fields.PASSWORD.getPreferredName());
|
||||
Boolean enabled = (Boolean) sourceMap.get(Fields.ENABLED.getPreferredName());
|
||||
final String id = searchHit.getId();
|
||||
assert id != null && id.startsWith(RESERVED_USER_TYPE) :
|
||||
"id [" + id + "] does not start with reserved-user prefix";
|
||||
final String username = id.substring(RESERVED_USER_TYPE.length() + 1);
|
||||
if (password == null) {
|
||||
listener.onFailure(new IllegalStateException("password hash must not be null!"));
|
||||
return;
|
||||
} else if (enabled == null) {
|
||||
listener.onFailure(new IllegalStateException("enabled must not be null!"));
|
||||
return;
|
||||
final String username = id.substring(RESERVED_USER_TYPE.length() + 1);
|
||||
if (password == null) {
|
||||
listener.onFailure(new IllegalStateException("password hash must not be null!"));
|
||||
return;
|
||||
} else if (enabled == null) {
|
||||
listener.onFailure(new IllegalStateException("enabled must not be null!"));
|
||||
return;
|
||||
} else {
|
||||
userInfos.put(username, new ReservedUserInfo(password.toCharArray(), enabled, false));
|
||||
}
|
||||
}
|
||||
listener.onResponse(userInfos);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
if (e instanceof IndexNotFoundException) {
|
||||
logger.trace("could not retrieve built in users since security index does not exist", e);
|
||||
listener.onResponse(Collections.emptyMap());
|
||||
} else {
|
||||
userInfos.put(username, new ReservedUserInfo(password.toCharArray(), enabled, false));
|
||||
logger.error("failed to retrieve built in users", e);
|
||||
listener.onFailure(e);
|
||||
}
|
||||
}
|
||||
listener.onResponse(userInfos);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
if (e instanceof IndexNotFoundException) {
|
||||
logger.trace("could not retrieve built in users since security index does not exist", e);
|
||||
listener.onResponse(Collections.emptyMap());
|
||||
} else {
|
||||
logger.error("failed to retrieve built in users", e);
|
||||
listener.onFailure(e);
|
||||
}
|
||||
}
|
||||
}, client::search));
|
||||
}, client::search));
|
||||
}
|
||||
}
|
||||
|
||||
private <Response> void clearRealmCache(String username, ActionListener<Response> listener, Response response) {
|
||||
|
@ -220,32 +220,32 @@ public class NativeRoleMappingStore extends AbstractComponent implements UserRol
|
||||
});
|
||||
}
|
||||
|
||||
private void innerDeleteMapping(DeleteRoleMappingRequest request, ActionListener<Boolean> listener) throws IOException {
|
||||
if (securityIndex.isIndexUpToDate() == false) {
|
||||
listener.onFailure(new IllegalStateException(
|
||||
"Security index is not on the current version - the native realm will not be operational until " +
|
||||
"the upgrade API is run on the security index"));
|
||||
return;
|
||||
}
|
||||
executeAsyncWithOrigin(client.threadPool().getThreadContext(), SECURITY_ORIGIN,
|
||||
client.prepareDelete(SECURITY_INDEX_NAME, SECURITY_GENERIC_TYPE, getIdForName(request.getName()))
|
||||
private void innerDeleteMapping(DeleteRoleMappingRequest request, ActionListener<Boolean> listener) {
|
||||
if (securityIndex.isAvailable() == false) {
|
||||
listener.onResponse(false);
|
||||
} else {
|
||||
securityIndex.checkIndexVersionThenExecute(listener::onFailure, () -> {
|
||||
executeAsyncWithOrigin(client.threadPool().getThreadContext(), SECURITY_ORIGIN,
|
||||
client.prepareDelete(SECURITY_INDEX_NAME, SECURITY_GENERIC_TYPE, getIdForName(request.getName()))
|
||||
.setRefreshPolicy(request.getRefreshPolicy())
|
||||
.request(),
|
||||
new ActionListener<DeleteResponse>() {
|
||||
new ActionListener<DeleteResponse>() {
|
||||
|
||||
@Override
|
||||
public void onResponse(DeleteResponse deleteResponse) {
|
||||
boolean deleted = deleteResponse.getResult() == DELETED;
|
||||
listener.onResponse(deleted);
|
||||
}
|
||||
@Override
|
||||
public void onResponse(DeleteResponse deleteResponse) {
|
||||
boolean deleted = deleteResponse.getResult() == DELETED;
|
||||
listener.onResponse(deleted);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
logger.error(new ParameterizedMessage("failed to delete role-mapping [{}]", request.getName()), e);
|
||||
listener.onFailure(e);
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
logger.error(new ParameterizedMessage("failed to delete role-mapping [{}]", request.getName()), e);
|
||||
listener.onFailure(e);
|
||||
|
||||
}
|
||||
}, client::delete);
|
||||
}
|
||||
}, client::delete);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
@ -301,7 +301,7 @@ public class NativeRoleMappingStore extends AbstractComponent implements UserRol
|
||||
* </ul>
|
||||
*/
|
||||
public void usageStats(ActionListener<Map<String, Object>> listener) {
|
||||
if (securityIndex.indexExists() == false) {
|
||||
if (securityIndex.isAvailable() == false) {
|
||||
reportStats(listener, Collections.emptyList());
|
||||
} else {
|
||||
getMappings(ActionListener.wrap(mappings -> reportStats(listener, mappings), listener::onFailure));
|
||||
|
@ -88,13 +88,15 @@ public class NativePrivilegeStore extends AbstractComponent {
|
||||
|
||||
public void getPrivileges(Collection<String> applications, Collection<String> names,
|
||||
ActionListener<Collection<ApplicationPrivilegeDescriptor>> listener) {
|
||||
if (applications != null && applications.size() == 1 && names != null && names.size() == 1) {
|
||||
if (securityIndexManager.isAvailable() == false) {
|
||||
listener.onResponse(Collections.emptyList());
|
||||
} else if (applications != null && applications.size() == 1 && names != null && names.size() == 1) {
|
||||
getPrivilege(Objects.requireNonNull(Iterables.get(applications, 0)), Objects.requireNonNull(Iterables.get(names, 0)),
|
||||
ActionListener.wrap(privilege ->
|
||||
listener.onResponse(privilege == null ? Collections.emptyList() : Collections.singletonList(privilege)),
|
||||
listener::onFailure));
|
||||
} else {
|
||||
securityIndexManager.prepareIndexIfNeededThenExecute(listener::onFailure, () -> {
|
||||
securityIndexManager.checkIndexVersionThenExecute(listener::onFailure, () -> {
|
||||
final QueryBuilder query;
|
||||
final TermQueryBuilder typeQuery = QueryBuilders
|
||||
.termQuery(ApplicationPrivilegeDescriptor.Fields.TYPE.getPreferredName(), DOC_TYPE_VALUE);
|
||||
@ -134,33 +136,37 @@ public class NativePrivilegeStore extends AbstractComponent {
|
||||
return collection == null || collection.isEmpty();
|
||||
}
|
||||
|
||||
public void getPrivilege(String application, String name, ActionListener<ApplicationPrivilegeDescriptor> listener) {
|
||||
securityIndexManager.prepareIndexIfNeededThenExecute(listener::onFailure,
|
||||
() -> executeAsyncWithOrigin(client.threadPool().getThreadContext(), SECURITY_ORIGIN,
|
||||
client.prepareGet(SECURITY_INDEX_NAME, "doc", toDocId(application, name)).request(),
|
||||
new ActionListener<GetResponse>() {
|
||||
@Override
|
||||
public void onResponse(GetResponse response) {
|
||||
if (response.isExists()) {
|
||||
listener.onResponse(buildPrivilege(response.getId(), response.getSourceAsBytesRef()));
|
||||
} else {
|
||||
listener.onResponse(null);
|
||||
void getPrivilege(String application, String name, ActionListener<ApplicationPrivilegeDescriptor> listener) {
|
||||
if (securityIndexManager.isAvailable() == false) {
|
||||
listener.onResponse(null);
|
||||
} else {
|
||||
securityIndexManager.checkIndexVersionThenExecute(listener::onFailure,
|
||||
() -> executeAsyncWithOrigin(client.threadPool().getThreadContext(), SECURITY_ORIGIN,
|
||||
client.prepareGet(SECURITY_INDEX_NAME, "doc", toDocId(application, name)).request(),
|
||||
new ActionListener<GetResponse>() {
|
||||
@Override
|
||||
public void onResponse(GetResponse response) {
|
||||
if (response.isExists()) {
|
||||
listener.onResponse(buildPrivilege(response.getId(), response.getSourceAsBytesRef()));
|
||||
} else {
|
||||
listener.onResponse(null);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
// if the index or the shard is not there / available we just claim the privilege is not there
|
||||
if (TransportActions.isShardNotAvailableException(e)) {
|
||||
logger.warn(new ParameterizedMessage("failed to load privilege [{}] index not available", name), e);
|
||||
listener.onResponse(null);
|
||||
} else {
|
||||
logger.error(new ParameterizedMessage("failed to load privilege [{}]", name), e);
|
||||
listener.onFailure(e);
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
// if the index or the shard is not there / available we just claim the privilege is not there
|
||||
if (TransportActions.isShardNotAvailableException(e)) {
|
||||
logger.warn(new ParameterizedMessage("failed to load privilege [{}] index not available", name), e);
|
||||
listener.onResponse(null);
|
||||
} else {
|
||||
logger.error(new ParameterizedMessage("failed to load privilege [{}]", name), e);
|
||||
listener.onFailure(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
client::get));
|
||||
},
|
||||
client::get));
|
||||
}
|
||||
}
|
||||
|
||||
public void putPrivileges(Collection<ApplicationPrivilegeDescriptor> privileges, WriteRequest.RefreshPolicy refreshPolicy,
|
||||
@ -200,23 +206,27 @@ public class NativePrivilegeStore extends AbstractComponent {
|
||||
|
||||
public void deletePrivileges(String application, Collection<String> names, WriteRequest.RefreshPolicy refreshPolicy,
|
||||
ActionListener<Map<String, List<String>>> listener) {
|
||||
securityIndexManager.prepareIndexIfNeededThenExecute(listener::onFailure, () -> {
|
||||
ActionListener<DeleteResponse> groupListener = new GroupedActionListener<>(
|
||||
ActionListener.wrap(responses -> {
|
||||
final Map<String, List<String>> deletedNames = responses.stream()
|
||||
.filter(r -> r.getResult() == DocWriteResponse.Result.DELETED)
|
||||
.map(r -> r.getId())
|
||||
.map(NativePrivilegeStore::nameFromDocId)
|
||||
.collect(TUPLES_TO_MAP);
|
||||
clearRolesCache(listener, deletedNames);
|
||||
}, listener::onFailure), names.size(), Collections.emptyList());
|
||||
for (String name : names) {
|
||||
ClientHelper.executeAsyncWithOrigin(client.threadPool().getThreadContext(), SECURITY_ORIGIN,
|
||||
client.prepareDelete(SECURITY_INDEX_NAME, "doc", toDocId(application, name))
|
||||
.setRefreshPolicy(refreshPolicy)
|
||||
.request(), groupListener, client::delete);
|
||||
}
|
||||
});
|
||||
if (securityIndexManager.isAvailable() == false) {
|
||||
listener.onResponse(Collections.emptyMap());
|
||||
} else {
|
||||
securityIndexManager.checkIndexVersionThenExecute(listener::onFailure, () -> {
|
||||
ActionListener<DeleteResponse> groupListener = new GroupedActionListener<>(
|
||||
ActionListener.wrap(responses -> {
|
||||
final Map<String, List<String>> deletedNames = responses.stream()
|
||||
.filter(r -> r.getResult() == DocWriteResponse.Result.DELETED)
|
||||
.map(r -> r.getId())
|
||||
.map(NativePrivilegeStore::nameFromDocId)
|
||||
.collect(TUPLES_TO_MAP);
|
||||
clearRolesCache(listener, deletedNames);
|
||||
}, listener::onFailure), names.size(), Collections.emptyList());
|
||||
for (String name : names) {
|
||||
ClientHelper.executeAsyncWithOrigin(client.threadPool().getThreadContext(), SECURITY_ORIGIN,
|
||||
client.prepareDelete(SECURITY_INDEX_NAME, "doc", toDocId(application, name))
|
||||
.setRefreshPolicy(refreshPolicy)
|
||||
.request(), groupListener, client::delete);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
private <T> void clearRolesCache(ActionListener<T> listener, T value) {
|
||||
|
@ -116,7 +116,7 @@ public class NativeRolesStore extends AbstractComponent implements BiConsumer<Se
|
||||
} else if (names != null && names.size() == 1) {
|
||||
getRoleDescriptor(Objects.requireNonNull(names.iterator().next()), listener);
|
||||
} else {
|
||||
securityIndex.prepareIndexIfNeededThenExecute(listener::onFailure, () -> {
|
||||
securityIndex.checkIndexVersionThenExecute(listener::onFailure, () -> {
|
||||
QueryBuilder query;
|
||||
if (names == null || names.isEmpty()) {
|
||||
query = QueryBuilders.termQuery(RoleDescriptor.Fields.TYPE.getPreferredName(), ROLE_TYPE);
|
||||
@ -144,16 +144,19 @@ public class NativeRolesStore extends AbstractComponent implements BiConsumer<Se
|
||||
}
|
||||
|
||||
public void deleteRole(final DeleteRoleRequest deleteRoleRequest, final ActionListener<Boolean> listener) {
|
||||
securityIndex.prepareIndexIfNeededThenExecute(listener::onFailure, () -> {
|
||||
DeleteRequest request = client.prepareDelete(SecurityIndexManager.SECURITY_INDEX_NAME,
|
||||
if (securityIndex.isAvailable() == false) {
|
||||
listener.onResponse(false);
|
||||
} else {
|
||||
securityIndex.checkIndexVersionThenExecute(listener::onFailure, () -> {
|
||||
DeleteRequest request = client.prepareDelete(SecurityIndexManager.SECURITY_INDEX_NAME,
|
||||
ROLE_DOC_TYPE, getIdForUser(deleteRoleRequest.name())).request();
|
||||
request.setRefreshPolicy(deleteRoleRequest.getRefreshPolicy());
|
||||
executeAsyncWithOrigin(client.threadPool().getThreadContext(), SECURITY_ORIGIN, request,
|
||||
request.setRefreshPolicy(deleteRoleRequest.getRefreshPolicy());
|
||||
executeAsyncWithOrigin(client.threadPool().getThreadContext(), SECURITY_ORIGIN, request,
|
||||
new ActionListener<DeleteResponse>() {
|
||||
@Override
|
||||
public void onResponse(DeleteResponse deleteResponse) {
|
||||
clearRoleCache(deleteRoleRequest.name(), listener,
|
||||
deleteResponse.getResult() == DocWriteResponse.Result.DELETED);
|
||||
deleteResponse.getResult() == DocWriteResponse.Result.DELETED);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -162,7 +165,8 @@ public class NativeRolesStore extends AbstractComponent implements BiConsumer<Se
|
||||
listener.onFailure(e);
|
||||
}
|
||||
}, client::delete);
|
||||
});
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
public void putRole(final PutRoleRequest request, final RoleDescriptor role, final ActionListener<Boolean> listener) {
|
||||
@ -210,13 +214,13 @@ public class NativeRolesStore extends AbstractComponent implements BiConsumer<Se
|
||||
|
||||
public void usageStats(ActionListener<Map<String, Object>> listener) {
|
||||
Map<String, Object> usageStats = new HashMap<>(3);
|
||||
if (securityIndex.indexExists() == false) {
|
||||
if (securityIndex.isAvailable() == false) {
|
||||
usageStats.put("size", 0L);
|
||||
usageStats.put("fls", false);
|
||||
usageStats.put("dls", false);
|
||||
listener.onResponse(usageStats);
|
||||
} else {
|
||||
securityIndex.prepareIndexIfNeededThenExecute(listener::onFailure, () ->
|
||||
securityIndex.checkIndexVersionThenExecute(listener::onFailure, () ->
|
||||
executeAsyncWithOrigin(client.threadPool().getThreadContext(), SECURITY_ORIGIN,
|
||||
client.prepareMultiSearch()
|
||||
.add(client.prepareSearch(SecurityIndexManager.SECURITY_INDEX_NAME)
|
||||
@ -298,7 +302,7 @@ public class NativeRolesStore extends AbstractComponent implements BiConsumer<Se
|
||||
}
|
||||
|
||||
private void executeGetRoleRequest(String role, ActionListener<GetResponse> listener) {
|
||||
securityIndex.prepareIndexIfNeededThenExecute(listener::onFailure, () ->
|
||||
securityIndex.checkIndexVersionThenExecute(listener::onFailure, () ->
|
||||
executeAsyncWithOrigin(client.threadPool().getThreadContext(), SECURITY_ORIGIN,
|
||||
client.prepareGet(SECURITY_INDEX_NAME,
|
||||
ROLE_DOC_TYPE, getIdForUser(role)).request(),
|
||||
|
@ -253,6 +253,23 @@ public class SecurityIndexManager extends AbstractComponent implements ClusterSt
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Validates the security index is up to date and does not need to migrated. If it is not, the
|
||||
* consumer is called with an exception. If the security index is up to date, the runnable will
|
||||
* be executed. <b>NOTE:</b> this method does not check the availability of the index; this check
|
||||
* is left to the caller so that this condition can be handled appropriately.
|
||||
*/
|
||||
public void checkIndexVersionThenExecute(final Consumer<Exception> consumer, final Runnable andThen) {
|
||||
final State indexState = this.indexState; // use a local copy so all checks execute against the same state!
|
||||
if (indexState.indexExists && indexState.isIndexUpToDate == false) {
|
||||
consumer.accept(new IllegalStateException(
|
||||
"Security index is not on the current version. Security features relying on the index will not be available until " +
|
||||
"the upgrade API is run on the security index"));
|
||||
} else {
|
||||
andThen.run();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Prepares the index by creating it if it doesn't exist or updating the mappings if the mappings are
|
||||
* out of date. After any tasks have been executed, the runnable is then executed.
|
||||
|
@ -163,6 +163,11 @@ public class TransportSamlInvalidateSessionActionTests extends SamlTestCase {
|
||||
((Runnable) inv.getArguments()[1]).run();
|
||||
return null;
|
||||
}).when(securityIndex).prepareIndexIfNeededThenExecute(any(Consumer.class), any(Runnable.class));
|
||||
doAnswer(inv -> {
|
||||
((Runnable) inv.getArguments()[1]).run();
|
||||
return null;
|
||||
}).when(securityIndex).checkIndexVersionThenExecute(any(Consumer.class), any(Runnable.class));
|
||||
when(securityIndex.isAvailable()).thenReturn(true);
|
||||
|
||||
final ClusterService clusterService = ClusterServiceUtils.createClusterService(threadPool);
|
||||
tokenService = new TokenService(settings, Clock.systemUTC(), client, securityIndex, clusterService);
|
||||
|
@ -178,6 +178,11 @@ public class TransportSamlLogoutActionTests extends SamlTestCase {
|
||||
((Runnable) inv.getArguments()[1]).run();
|
||||
return null;
|
||||
}).when(securityIndex).prepareIndexIfNeededThenExecute(any(Consumer.class), any(Runnable.class));
|
||||
doAnswer(inv -> {
|
||||
((Runnable) inv.getArguments()[1]).run();
|
||||
return null;
|
||||
}).when(securityIndex).checkIndexVersionThenExecute(any(Consumer.class), any(Runnable.class));
|
||||
when(securityIndex.isAvailable()).thenReturn(true);
|
||||
|
||||
final ClusterService clusterService = ClusterServiceUtils.createClusterService(threadPool);
|
||||
tokenService = new TokenService(settings, Clock.systemUTC(), client, securityIndex, clusterService);
|
||||
|
@ -87,6 +87,7 @@ import java.util.function.Consumer;
|
||||
|
||||
import static org.elasticsearch.test.SecurityTestsUtils.assertAuthenticationException;
|
||||
import static org.elasticsearch.xpack.core.security.support.Exceptions.authenticationError;
|
||||
import static org.elasticsearch.xpack.security.authc.TokenServiceTests.mockCheckTokenInvalidationFromId;
|
||||
import static org.elasticsearch.xpack.security.authc.TokenServiceTests.mockGetTokenFromId;
|
||||
import static org.hamcrest.Matchers.arrayContaining;
|
||||
import static org.hamcrest.Matchers.contains;
|
||||
@ -187,6 +188,11 @@ public class AuthenticationServiceTests extends ESTestCase {
|
||||
runnable.run();
|
||||
return null;
|
||||
}).when(securityIndex).prepareIndexIfNeededThenExecute(any(Consumer.class), any(Runnable.class));
|
||||
doAnswer(invocationOnMock -> {
|
||||
Runnable runnable = (Runnable) invocationOnMock.getArguments()[1];
|
||||
runnable.run();
|
||||
return null;
|
||||
}).when(securityIndex).checkIndexVersionThenExecute(any(Consumer.class), any(Runnable.class));
|
||||
ClusterService clusterService = ClusterServiceUtils.createClusterService(threadPool);
|
||||
tokenService = new TokenService(settings, Clock.systemUTC(), client, securityIndex, clusterService);
|
||||
service = new AuthenticationService(settings, realms, auditTrail,
|
||||
@ -898,7 +904,11 @@ public class AuthenticationServiceTests extends ESTestCase {
|
||||
tokenService.createUserToken(expected, originatingAuth, tokenFuture, Collections.emptyMap(), true);
|
||||
}
|
||||
String token = tokenService.getUserTokenString(tokenFuture.get().v1());
|
||||
when(client.prepareMultiGet()).thenReturn(new MultiGetRequestBuilder(client, MultiGetAction.INSTANCE));
|
||||
mockGetTokenFromId(tokenFuture.get().v1(), client);
|
||||
mockCheckTokenInvalidationFromId(tokenFuture.get().v1(), client);
|
||||
when(securityIndex.isAvailable()).thenReturn(true);
|
||||
when(securityIndex.indexExists()).thenReturn(true);
|
||||
try (ThreadContext.StoredContext ignore = threadContext.stashContext()) {
|
||||
threadContext.putHeader("Authorization", "Bearer " + token);
|
||||
service.authenticate("_action", message, (User)null, ActionListener.wrap(result -> {
|
||||
|
@ -137,6 +137,13 @@ public class TokenServiceTests extends ESTestCase {
|
||||
runnable.run();
|
||||
return null;
|
||||
}).when(securityIndex).prepareIndexIfNeededThenExecute(any(Consumer.class), any(Runnable.class));
|
||||
doAnswer(invocationOnMock -> {
|
||||
Runnable runnable = (Runnable) invocationOnMock.getArguments()[1];
|
||||
runnable.run();
|
||||
return null;
|
||||
}).when(securityIndex).checkIndexVersionThenExecute(any(Consumer.class), any(Runnable.class));
|
||||
when(securityIndex.indexExists()).thenReturn(true);
|
||||
when(securityIndex.isAvailable()).thenReturn(true);
|
||||
this.clusterService = ClusterServiceUtils.createClusterService(threadPool);
|
||||
}
|
||||
|
||||
@ -161,6 +168,7 @@ public class TokenServiceTests extends ESTestCase {
|
||||
final UserToken token = tokenFuture.get().v1();
|
||||
assertNotNull(token);
|
||||
mockGetTokenFromId(token);
|
||||
mockCheckTokenInvalidationFromId(token);
|
||||
|
||||
ThreadContext requestContext = new ThreadContext(Settings.EMPTY);
|
||||
requestContext.putHeader("Authorization", randomFrom("Bearer ", "BEARER ", "bearer ") + tokenService.getUserTokenString(token));
|
||||
@ -207,6 +215,7 @@ public class TokenServiceTests extends ESTestCase {
|
||||
final UserToken token = tokenFuture.get().v1();
|
||||
assertNotNull(token);
|
||||
mockGetTokenFromId(token);
|
||||
mockCheckTokenInvalidationFromId(token);
|
||||
|
||||
ThreadContext requestContext = new ThreadContext(Settings.EMPTY);
|
||||
requestContext.putHeader("Authorization", "Bearer " + tokenService.getUserTokenString(token));
|
||||
@ -266,6 +275,7 @@ public class TokenServiceTests extends ESTestCase {
|
||||
final UserToken token = tokenFuture.get().v1();
|
||||
assertNotNull(token);
|
||||
mockGetTokenFromId(token);
|
||||
mockCheckTokenInvalidationFromId(token);
|
||||
|
||||
ThreadContext requestContext = new ThreadContext(Settings.EMPTY);
|
||||
requestContext.putHeader("Authorization", "Bearer " + tokenService.getUserTokenString(token));
|
||||
@ -296,6 +306,7 @@ public class TokenServiceTests extends ESTestCase {
|
||||
final UserToken token = tokenFuture.get().v1();
|
||||
assertNotNull(token);
|
||||
mockGetTokenFromId(token);
|
||||
mockCheckTokenInvalidationFromId(token);
|
||||
|
||||
ThreadContext requestContext = new ThreadContext(Settings.EMPTY);
|
||||
requestContext.putHeader("Authorization", "Bearer " + tokenService.getUserTokenString(token));
|
||||
@ -357,6 +368,7 @@ public class TokenServiceTests extends ESTestCase {
|
||||
final UserToken token = tokenFuture.get().v1();
|
||||
assertNotNull(token);
|
||||
mockGetTokenFromId(token);
|
||||
mockCheckTokenInvalidationFromId(token);
|
||||
|
||||
ThreadContext requestContext = new ThreadContext(Settings.EMPTY);
|
||||
requestContext.putHeader("Authorization", "Bearer " + tokenService.getUserTokenString(token));
|
||||
@ -454,6 +466,7 @@ public class TokenServiceTests extends ESTestCase {
|
||||
tokenService.createUserToken(authentication, authentication, tokenFuture, Collections.emptyMap(), true);
|
||||
final UserToken token = tokenFuture.get().v1();
|
||||
mockGetTokenFromId(token);
|
||||
mockCheckTokenInvalidationFromId(token);
|
||||
|
||||
ThreadContext requestContext = new ThreadContext(Settings.EMPTY);
|
||||
requestContext.putHeader("Authorization", "Bearer " + tokenService.getUserTokenString(token));
|
||||
@ -577,14 +590,25 @@ public class TokenServiceTests extends ESTestCase {
|
||||
try (ThreadContext.StoredContext ignore = requestContext.newStoredContext(true)) {
|
||||
PlainActionFuture<UserToken> future = new PlainActionFuture<>();
|
||||
tokenService.getAndValidateToken(requestContext, future);
|
||||
UserToken serialized = future.get();
|
||||
assertEquals(authentication, serialized.getAuthentication());
|
||||
assertNull(future.get());
|
||||
|
||||
when(securityIndex.isAvailable()).thenReturn(false);
|
||||
when(securityIndex.indexExists()).thenReturn(true);
|
||||
future = new PlainActionFuture<>();
|
||||
tokenService.getAndValidateToken(requestContext, future);
|
||||
assertNull(future.get());
|
||||
|
||||
when(securityIndex.indexExists()).thenReturn(false);
|
||||
future = new PlainActionFuture<>();
|
||||
tokenService.getAndValidateToken(requestContext, future);
|
||||
assertNull(future.get());
|
||||
|
||||
when(securityIndex.isAvailable()).thenReturn(true);
|
||||
when(securityIndex.indexExists()).thenReturn(true);
|
||||
mockCheckTokenInvalidationFromId(token);
|
||||
future = new PlainActionFuture<>();
|
||||
tokenService.getAndValidateToken(requestContext, future);
|
||||
assertEquals(token.getAuthentication(), future.get().getAuthentication());
|
||||
}
|
||||
}
|
||||
|
||||
@ -625,4 +649,38 @@ public class TokenServiceTests extends ESTestCase {
|
||||
return Void.TYPE;
|
||||
}).when(client).get(any(GetRequest.class), any(ActionListener.class));
|
||||
}
|
||||
|
||||
private void mockCheckTokenInvalidationFromId(UserToken userToken) {
|
||||
mockCheckTokenInvalidationFromId(userToken, client);
|
||||
}
|
||||
|
||||
public static void mockCheckTokenInvalidationFromId(UserToken userToken, Client client) {
|
||||
doAnswer(invocationOnMock -> {
|
||||
MultiGetRequest request = (MultiGetRequest) invocationOnMock.getArguments()[0];
|
||||
ActionListener<MultiGetResponse> listener = (ActionListener<MultiGetResponse>) invocationOnMock.getArguments()[1];
|
||||
MultiGetResponse response = mock(MultiGetResponse.class);
|
||||
MultiGetItemResponse[] responses = new MultiGetItemResponse[2];
|
||||
when(response.getResponses()).thenReturn(responses);
|
||||
GetResponse legacyResponse = mock(GetResponse.class);
|
||||
responses[0] = new MultiGetItemResponse(legacyResponse, null);
|
||||
when(legacyResponse.isExists()).thenReturn(false);
|
||||
GetResponse tokenResponse = mock(GetResponse.class);
|
||||
if (userToken.getId().equals(request.getItems().get(1).id().replace("token_", ""))) {
|
||||
when(tokenResponse.isExists()).thenReturn(true);
|
||||
Map<String, Object> sourceMap = new HashMap<>();
|
||||
try (XContentBuilder builder = XContentBuilder.builder(XContentType.JSON.xContent())) {
|
||||
userToken.toXContent(builder, ToXContent.EMPTY_PARAMS);
|
||||
Map<String, Object> accessTokenMap = new HashMap<>();
|
||||
accessTokenMap.put("user_token",
|
||||
XContentHelper.convertToMap(XContentType.JSON.xContent(), Strings.toString(builder), false));
|
||||
accessTokenMap.put("invalidated", false);
|
||||
sourceMap.put("access_token", accessTokenMap);
|
||||
}
|
||||
when(tokenResponse.getSource()).thenReturn(sourceMap);
|
||||
}
|
||||
responses[1] = new MultiGetItemResponse(tokenResponse, null);
|
||||
listener.onResponse(response);
|
||||
return Void.TYPE;
|
||||
}).when(client).multiGet(any(MultiGetRequest.class), any(ActionListener.class));
|
||||
}
|
||||
}
|
||||
|
@ -242,6 +242,11 @@ public class NativeUsersStoreTests extends ESTestCase {
|
||||
action.run();
|
||||
return null;
|
||||
}).when(securityIndex).prepareIndexIfNeededThenExecute(any(Consumer.class), any(Runnable.class));
|
||||
doAnswer((i) -> {
|
||||
Runnable action = (Runnable) i.getArguments()[1];
|
||||
action.run();
|
||||
return null;
|
||||
}).when(securityIndex).checkIndexVersionThenExecute(any(Consumer.class), any(Runnable.class));
|
||||
return new NativeUsersStore(Settings.EMPTY, client, securityIndex);
|
||||
}
|
||||
|
||||
|
@ -96,6 +96,12 @@ public class NativePrivilegeStoreTests extends ESTestCase {
|
||||
((Runnable) invocationOnMock.getArguments()[1]).run();
|
||||
return null;
|
||||
}).when(securityIndex).prepareIndexIfNeededThenExecute(any(Consumer.class), any(Runnable.class));
|
||||
Mockito.doAnswer(invocationOnMock -> {
|
||||
assertThat(invocationOnMock.getArguments().length, equalTo(2));
|
||||
assertThat(invocationOnMock.getArguments()[1], instanceOf(Runnable.class));
|
||||
((Runnable) invocationOnMock.getArguments()[1]).run();
|
||||
return null;
|
||||
}).when(securityIndex).checkIndexVersionThenExecute(any(Consumer.class), any(Runnable.class));
|
||||
store = new NativePrivilegeStore(Settings.EMPTY, client, securityIndex);
|
||||
}
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user