Native roles store uses mget to retrieve roles (#33531)
The native roles store previously would issue a search if attempting to retrieve more than a single role. This is fine when we are attempting to retrieve all of the roles to list them in the api, but could cause issues when attempting to find roles for a user. The search is not prioritized over other search requests, so heavy aggregations/searches or overloaded nodes could cause roles to be cached as missing if the search is rejected. When attempting to load specific roles, we know the document id for the role that we are trying to load, which allows us to use the multi-get api for loading these roles. This change makes use of the multi-get api when attempting to load more than one role by name. This api is also constrained by a threadpool but the tasks in the GET threadpool should be quicker than searches. See #33205
This commit is contained in:
parent
7de9eda391
commit
09a124e8d6
|
@ -13,6 +13,9 @@ import org.elasticsearch.action.DocWriteResponse;
|
|||
import org.elasticsearch.action.delete.DeleteRequest;
|
||||
import org.elasticsearch.action.delete.DeleteResponse;
|
||||
import org.elasticsearch.action.get.GetResponse;
|
||||
import org.elasticsearch.action.get.MultiGetItemResponse;
|
||||
import org.elasticsearch.action.get.MultiGetRequest;
|
||||
import org.elasticsearch.action.get.MultiGetResponse;
|
||||
import org.elasticsearch.action.index.IndexRequest;
|
||||
import org.elasticsearch.action.index.IndexResponse;
|
||||
import org.elasticsearch.action.search.MultiSearchResponse;
|
||||
|
@ -49,7 +52,6 @@ import org.elasticsearch.xpack.security.support.SecurityIndexManager;
|
|||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
|
@ -113,17 +115,9 @@ public class NativeRolesStore extends AbstractComponent implements BiConsumer<Se
|
|||
if (securityIndex.indexExists() == false) {
|
||||
// TODO remove this short circuiting and fix tests that fail without this!
|
||||
listener.onResponse(RoleRetrievalResult.success(Collections.emptySet()));
|
||||
} else if (names != null && names.size() == 1) {
|
||||
getRoleDescriptor(Objects.requireNonNull(names.iterator().next()), listener);
|
||||
} else {
|
||||
} else if (names == null || names.isEmpty()) {
|
||||
securityIndex.checkIndexVersionThenExecute(listener::onFailure, () -> {
|
||||
QueryBuilder query;
|
||||
if (names == null || names.isEmpty()) {
|
||||
query = QueryBuilders.termQuery(RoleDescriptor.Fields.TYPE.getPreferredName(), ROLE_TYPE);
|
||||
} else {
|
||||
final String[] roleNames = names.stream().map(NativeRolesStore::getIdForUser).toArray(String[]::new);
|
||||
query = QueryBuilders.boolQuery().filter(QueryBuilders.idsQuery(ROLE_DOC_TYPE).addIds(roleNames));
|
||||
}
|
||||
QueryBuilder query = QueryBuilders.termQuery(RoleDescriptor.Fields.TYPE.getPreferredName(), ROLE_TYPE);
|
||||
final Supplier<ThreadContext.StoredContext> supplier = client.threadPool().getThreadContext().newRestorableContext(false);
|
||||
try (ThreadContext.StoredContext ignore = stashWithOrigin(client.threadPool().getThreadContext(), SECURITY_ORIGIN)) {
|
||||
SearchRequest request = client.prepareSearch(SecurityIndexManager.SECURITY_INDEX_NAME)
|
||||
|
@ -133,13 +127,42 @@ public class NativeRolesStore extends AbstractComponent implements BiConsumer<Se
|
|||
.setFetchSource(true)
|
||||
.request();
|
||||
request.indicesOptions().ignoreUnavailable();
|
||||
final ActionListener<Collection<RoleDescriptor>> descriptorsListener = ActionListener.wrap(
|
||||
roleDescriptors -> listener.onResponse(RoleRetrievalResult.success(new HashSet<>(roleDescriptors))),
|
||||
e -> listener.onResponse(RoleRetrievalResult.failure(e)));
|
||||
ScrollHelper.fetchAllByEntity(client, request, new ContextPreservingActionListener<>(supplier, descriptorsListener),
|
||||
ScrollHelper.fetchAllByEntity(client, request, new ContextPreservingActionListener<>(supplier,
|
||||
ActionListener.wrap(roles -> listener.onResponse(RoleRetrievalResult.success(new HashSet<>(roles))),
|
||||
e -> listener.onResponse(RoleRetrievalResult.failure(e)))),
|
||||
(hit) -> transformRole(hit.getId(), hit.getSourceRef(), logger, licenseState));
|
||||
}
|
||||
});
|
||||
} else if (names.size() == 1) {
|
||||
getRoleDescriptor(Objects.requireNonNull(names.iterator().next()), listener);
|
||||
} else {
|
||||
securityIndex.checkIndexVersionThenExecute(listener::onFailure, () -> {
|
||||
final String[] roleIds = names.stream().map(NativeRolesStore::getIdForRole).toArray(String[]::new);
|
||||
MultiGetRequest multiGetRequest = client.prepareMultiGet().add(SECURITY_INDEX_NAME, ROLE_DOC_TYPE, roleIds).request();
|
||||
executeAsyncWithOrigin(client.threadPool().getThreadContext(), SECURITY_ORIGIN, multiGetRequest,
|
||||
ActionListener.<MultiGetResponse>wrap(mGetResponse -> {
|
||||
final MultiGetItemResponse[] responses = mGetResponse.getResponses();
|
||||
Set<RoleDescriptor> descriptors = new HashSet<>();
|
||||
for (int i = 0; i < responses.length; i++) {
|
||||
MultiGetItemResponse item = responses[i];
|
||||
if (item.isFailed()) {
|
||||
final Exception failure = item.getFailure().getFailure();
|
||||
for (int j = i + 1; j < responses.length; j++) {
|
||||
item = responses[j];
|
||||
if (item.isFailed()) {
|
||||
failure.addSuppressed(failure);
|
||||
}
|
||||
}
|
||||
listener.onResponse(RoleRetrievalResult.failure(failure));
|
||||
return;
|
||||
} else if (item.getResponse().isExists()) {
|
||||
descriptors.add(transformRole(item.getResponse()));
|
||||
}
|
||||
}
|
||||
listener.onResponse(RoleRetrievalResult.success(descriptors));
|
||||
},
|
||||
e -> listener.onResponse(RoleRetrievalResult.failure(e))), client::multiGet);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -152,7 +175,7 @@ public class NativeRolesStore extends AbstractComponent implements BiConsumer<Se
|
|||
} else {
|
||||
securityIndex.checkIndexVersionThenExecute(listener::onFailure, () -> {
|
||||
DeleteRequest request = client.prepareDelete(SecurityIndexManager.SECURITY_INDEX_NAME,
|
||||
ROLE_DOC_TYPE, getIdForUser(deleteRoleRequest.name())).request();
|
||||
ROLE_DOC_TYPE, getIdForRole(deleteRoleRequest.name())).request();
|
||||
request.setRefreshPolicy(deleteRoleRequest.getRefreshPolicy());
|
||||
executeAsyncWithOrigin(client.threadPool().getThreadContext(), SECURITY_ORIGIN, request,
|
||||
new ActionListener<DeleteResponse>() {
|
||||
|
@ -192,7 +215,7 @@ public class NativeRolesStore extends AbstractComponent implements BiConsumer<Se
|
|||
listener.onFailure(e);
|
||||
return;
|
||||
}
|
||||
final IndexRequest indexRequest = client.prepareIndex(SECURITY_INDEX_NAME, ROLE_DOC_TYPE, getIdForUser(role.getName()))
|
||||
final IndexRequest indexRequest = client.prepareIndex(SECURITY_INDEX_NAME, ROLE_DOC_TYPE, getIdForRole(role.getName()))
|
||||
.setSource(xContentBuilder)
|
||||
.setRefreshPolicy(request.getRefreshPolicy())
|
||||
.request();
|
||||
|
@ -308,7 +331,7 @@ public class NativeRolesStore extends AbstractComponent implements BiConsumer<Se
|
|||
securityIndex.checkIndexVersionThenExecute(listener::onFailure, () ->
|
||||
executeAsyncWithOrigin(client.threadPool().getThreadContext(), SECURITY_ORIGIN,
|
||||
client.prepareGet(SECURITY_INDEX_NAME,
|
||||
ROLE_DOC_TYPE, getIdForUser(role)).request(),
|
||||
ROLE_DOC_TYPE, getIdForRole(role)).request(),
|
||||
listener,
|
||||
client::get));
|
||||
}
|
||||
|
@ -388,7 +411,7 @@ public class NativeRolesStore extends AbstractComponent implements BiConsumer<Se
|
|||
/**
|
||||
* Gets the document's id field for the given role name.
|
||||
*/
|
||||
private static String getIdForUser(final String roleName) {
|
||||
private static String getIdForRole(final String roleName) {
|
||||
return ROLE_TYPE + "-" + roleName;
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue