Security: don't call prepare index for reads (#34568)

The security native stores follow a pattern where
`SecurityIndexManager#prepareIndexIfNeededThenExecute` wraps most calls
made for the security index. The reasoning behind this was to check if
the security index had been upgraded to the latest version in a
consistent manner. However, this has the potential side effect that a
read will trigger the creation of the security index or an updating of
its mappings, which can lead to issues such as failures due to put
mapping requests timing out even though we might have been able to read
from the index and get the data necessary.

This change introduces a new method, `checkIndexVersionThenExecute`,
that provides the consistent checking of the security index to make
sure it has been upgraded. That is the only check that this method
performs prior to running the passed in operation, which removes the
possible triggering of index creation and mapping updates for reads.

Additionally, areas where we do reads now check the availability of the
security index and can short circuit requests. Availability in this
context means that the index exists and all primaries are active.

This is the fixed version of #34246, which was reverted.

Relates #33205
This commit is contained in:
Jay Modi 2018-10-22 10:12:37 -06:00 committed by GitHub
parent 5dd79bf58c
commit c344293aed
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 447 additions and 235 deletions

View File

@ -419,7 +419,7 @@ public class Security extends Plugin implements ActionPlugin, IngestPlugin, Netw
components.add(auditTrailService);
this.auditTrailService.set(auditTrailService);
securityIndex.set(new SecurityIndexManager(settings, client, SecurityIndexManager.SECURITY_INDEX_NAME, clusterService));
securityIndex.set(new SecurityIndexManager(client, SecurityIndexManager.SECURITY_INDEX_NAME, clusterService));
final TokenService tokenService = new TokenService(settings, Clock.systemUTC(), client, securityIndex.get(), clusterService);
this.tokenService.set(tokenService);

View File

@ -25,7 +25,6 @@ import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.ContextPreservingActionListener;
import org.elasticsearch.action.support.WriteRequest.RefreshPolicy;
import org.elasticsearch.action.support.master.AcknowledgedRequest;
import org.elasticsearch.action.update.UpdateRequest;
@ -116,7 +115,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.Supplier;
import static org.elasticsearch.action.support.TransportActions.isShardNotAvailableException;
import static org.elasticsearch.gateway.GatewayService.STATE_NOT_RECOVERED_BLOCK;
@ -366,45 +364,50 @@ public final class TokenService extends AbstractComponent {
final Cipher cipher = getDecryptionCipher(iv, decodeKey, version, decodedSalt);
if (version.onOrAfter(Version.V_6_2_0)) {
// we only have the id and need to get the token from the doc!
decryptTokenId(in, cipher, version, ActionListener.wrap(tokenId ->
securityIndex.prepareIndexIfNeededThenExecute(
ex -> listener.onFailure(traceLog("prepare security index", tokenId, ex)),
() -> {
final GetRequest getRequest = client.prepareGet(SecurityIndexManager.SECURITY_INDEX_NAME, TYPE,
getTokenDocumentId(tokenId)).request();
Consumer<Exception> onFailure = ex -> listener.onFailure(traceLog("decode token", tokenId, ex));
executeAsyncWithOrigin(client.threadPool().getThreadContext(), SECURITY_ORIGIN, getRequest,
ActionListener.<GetResponse>wrap(response -> {
if (response.isExists()) {
Map<String, Object> accessTokenSource =
(Map<String, Object>) response.getSource().get("access_token");
if (accessTokenSource == null) {
onFailure.accept(new IllegalStateException(
"token document is missing the access_token field"));
} else if (accessTokenSource.containsKey("user_token") == false) {
onFailure.accept(new IllegalStateException(
"token document is missing the user_token field"));
decryptTokenId(in, cipher, version, ActionListener.wrap(tokenId -> {
if (securityIndex.isAvailable() == false) {
logger.warn("failed to get token [{}] since index is not available", tokenId);
listener.onResponse(null);
} else {
securityIndex.checkIndexVersionThenExecute(
ex -> listener.onFailure(traceLog("prepare security index", tokenId, ex)),
() -> {
final GetRequest getRequest = client.prepareGet(SecurityIndexManager.SECURITY_INDEX_NAME, TYPE,
getTokenDocumentId(tokenId)).request();
Consumer<Exception> onFailure = ex -> listener.onFailure(traceLog("decode token", tokenId, ex));
executeAsyncWithOrigin(client.threadPool().getThreadContext(), SECURITY_ORIGIN, getRequest,
ActionListener.<GetResponse>wrap(response -> {
if (response.isExists()) {
Map<String, Object> accessTokenSource =
(Map<String, Object>) response.getSource().get("access_token");
if (accessTokenSource == null) {
onFailure.accept(new IllegalStateException(
"token document is missing the access_token field"));
} else if (accessTokenSource.containsKey("user_token") == false) {
onFailure.accept(new IllegalStateException(
"token document is missing the user_token field"));
} else {
Map<String, Object> userTokenSource =
(Map<String, Object>) accessTokenSource.get("user_token");
listener.onResponse(UserToken.fromSourceMap(userTokenSource));
}
} else {
Map<String, Object> userTokenSource =
(Map<String, Object>) accessTokenSource.get("user_token");
listener.onResponse(UserToken.fromSourceMap(userTokenSource));
onFailure.accept(
new IllegalStateException("token document is missing and must be present"));
}
} else {
onFailure.accept(
new IllegalStateException("token document is missing and must be present"));
}
}, e -> {
// if the index or the shard is not there / available we assume that
// the token is not valid
if (isShardNotAvailableException(e)) {
logger.warn("failed to get token [{}] since index is not available", tokenId);
listener.onResponse(null);
} else {
logger.error(new ParameterizedMessage("failed to get token [{}]", tokenId), e);
listener.onFailure(e);
}
}), client::get);
}), listener::onFailure));
}, e -> {
// if the index or the shard is not there / available we assume that
// the token is not valid
if (isShardNotAvailableException(e)) {
logger.warn("failed to get token [{}] since index is not available", tokenId);
listener.onResponse(null);
} else {
logger.error(new ParameterizedMessage("failed to get token [{}]", tokenId), e);
listener.onFailure(e);
}
}), client::get);
});
}}, listener::onFailure));
} else {
decryptToken(in, cipher, version, listener);
}
@ -691,31 +694,41 @@ public final class TokenService extends AbstractComponent {
.setVersion(true)
.request();
Consumer<Exception> onFailure = ex -> listener.onFailure(traceLog("find by refresh token", refreshToken, ex));
securityIndex.prepareIndexIfNeededThenExecute(listener::onFailure, () ->
executeAsyncWithOrigin(client.threadPool().getThreadContext(), SECURITY_ORIGIN, request,
ActionListener.<SearchResponse>wrap(searchResponse -> {
if (searchResponse.isTimedOut()) {
attemptCount.incrementAndGet();
findTokenFromRefreshToken(refreshToken, listener, attemptCount);
} else if (searchResponse.getHits().getHits().length < 1) {
logger.info("could not find token document with refresh_token [{}]", refreshToken);
onFailure.accept(invalidGrantException("could not refresh the requested token"));
} else if (searchResponse.getHits().getHits().length > 1) {
onFailure.accept(new IllegalStateException("multiple tokens share the same refresh token"));
} else {
listener.onResponse(new Tuple<>(searchResponse, attemptCount));
}
}, e -> {
if (isShardNotAvailableException(e)) {
logger.debug("failed to search for token document, retrying", e);
attemptCount.incrementAndGet();
findTokenFromRefreshToken(refreshToken, listener, attemptCount);
} else {
onFailure.accept(e);
}
}),
client::search));
final SecurityIndexManager frozenSecurityIndex = securityIndex.freeze();
if (frozenSecurityIndex.indexExists() == false) {
logger.warn("security index does not exist therefore refresh token [{}] cannot be validated", refreshToken);
listener.onFailure(invalidGrantException("could not refresh the requested token"));
} else if (frozenSecurityIndex.isAvailable() == false) {
logger.debug("security index is not available to find token from refresh token, retrying");
attemptCount.incrementAndGet();
findTokenFromRefreshToken(refreshToken, listener, attemptCount);
} else {
Consumer<Exception> onFailure = ex -> listener.onFailure(traceLog("find by refresh token", refreshToken, ex));
securityIndex.checkIndexVersionThenExecute(listener::onFailure, () ->
executeAsyncWithOrigin(client.threadPool().getThreadContext(), SECURITY_ORIGIN, request,
ActionListener.<SearchResponse>wrap(searchResponse -> {
if (searchResponse.isTimedOut()) {
attemptCount.incrementAndGet();
findTokenFromRefreshToken(refreshToken, listener, attemptCount);
} else if (searchResponse.getHits().getHits().length < 1) {
logger.info("could not find token document with refresh_token [{}]", refreshToken);
onFailure.accept(invalidGrantException("could not refresh the requested token"));
} else if (searchResponse.getHits().getHits().length > 1) {
onFailure.accept(new IllegalStateException("multiple tokens share the same refresh token"));
} else {
listener.onResponse(new Tuple<>(searchResponse, attemptCount));
}
}, e -> {
if (isShardNotAvailableException(e)) {
logger.debug("failed to search for token document, retrying", e);
attemptCount.incrementAndGet();
findTokenFromRefreshToken(refreshToken, listener, attemptCount);
} else {
onFailure.accept(e);
}
}),
client::search));
}
}
}
@ -850,24 +863,27 @@ public final class TokenService extends AbstractComponent {
public void findActiveTokensForRealm(String realmName, ActionListener<Collection<Tuple<UserToken, String>>> listener) {
ensureEnabled();
final SecurityIndexManager frozenSecurityIndex = securityIndex.freeze();
if (Strings.isNullOrEmpty(realmName)) {
listener.onFailure(new IllegalArgumentException("Realm name is required"));
return;
}
final Instant now = clock.instant();
final BoolQueryBuilder boolQuery = QueryBuilders.boolQuery()
} else if (frozenSecurityIndex.indexExists() == false) {
listener.onResponse(Collections.emptyList());
} else if (frozenSecurityIndex.isAvailable() == false) {
listener.onFailure(frozenSecurityIndex.getUnavailableReason());
} else {
final Instant now = clock.instant();
final BoolQueryBuilder boolQuery = QueryBuilders.boolQuery()
.filter(QueryBuilders.termQuery("doc_type", "token"))
.filter(QueryBuilders.termQuery("access_token.realm", realmName))
.filter(QueryBuilders.boolQuery()
.should(QueryBuilders.boolQuery()
.must(QueryBuilders.termQuery("access_token.invalidated", false))
.must(QueryBuilders.rangeQuery("access_token.user_token.expiration_time").gte(now.toEpochMilli()))
)
.should(QueryBuilders.termQuery("refresh_token.invalidated", false))
.should(QueryBuilders.boolQuery()
.must(QueryBuilders.termQuery("access_token.invalidated", false))
.must(QueryBuilders.rangeQuery("access_token.user_token.expiration_time").gte(now.toEpochMilli()))
)
.should(QueryBuilders.termQuery("refresh_token.invalidated", false))
);
final SearchRequest request = client.prepareSearch(SecurityIndexManager.SECURITY_INDEX_NAME)
final SearchRequest request = client.prepareSearch(SecurityIndexManager.SECURITY_INDEX_NAME)
.setScroll(DEFAULT_KEEPALIVE_SETTING.get(settings))
.setQuery(boolQuery)
.setVersion(false)
@ -875,9 +891,9 @@ public final class TokenService extends AbstractComponent {
.setFetchSource(true)
.request();
final Supplier<ThreadContext.StoredContext> supplier = client.threadPool().getThreadContext().newRestorableContext(false);
securityIndex.prepareIndexIfNeededThenExecute(listener::onFailure, () ->
ScrollHelper.fetchAllByEntity(client, request, new ContextPreservingActionListener<>(supplier, listener), this::parseHit));
securityIndex.checkIndexVersionThenExecute(listener::onFailure,
() -> ScrollHelper.fetchAllByEntity(client, request, listener, this::parseHit));
}
}
private Tuple<UserToken, String> parseHit(SearchHit hit) {
@ -944,10 +960,12 @@ public final class TokenService extends AbstractComponent {
*/
private void checkIfTokenIsRevoked(UserToken userToken, ActionListener<UserToken> listener) {
if (securityIndex.indexExists() == false) {
// index doesn't exist so the token is considered valid.
// index doesn't exist so the token is considered valid. it is important to note that
// we do not use isAvailable as the lack of a shard being available is not equivalent
// to the index not existing in the case of revocation checking.
listener.onResponse(userToken);
} else {
securityIndex.prepareIndexIfNeededThenExecute(listener::onFailure, () -> {
securityIndex.checkIndexVersionThenExecute(listener::onFailure, () -> {
MultiGetRequest mGetRequest = client.prepareMultiGet()
.add(SecurityIndexManager.SECURITY_INDEX_NAME, TYPE, getInvalidatedTokenDocumentId(userToken))
.add(SecurityIndexManager.SECURITY_INDEX_NAME, TYPE, getTokenDocumentId(userToken))

View File

@ -109,31 +109,36 @@ public class NativeUsersStore extends AbstractComponent {
*/
public void getUsers(String[] userNames, final ActionListener<Collection<User>> listener) {
final Consumer<Exception> handleException = (t) -> {
if (t instanceof IndexNotFoundException) {
logger.trace("could not retrieve users because security index does not exist");
// We don't invoke the onFailure listener here, instead just pass an empty list
listener.onResponse(Collections.emptyList());
} else {
listener.onFailure(t);
if (TransportActions.isShardNotAvailableException(t)) {
logger.trace("could not retrieve users because of a shard not available exception", t);
if (t instanceof IndexNotFoundException) {
// We don't invoke the onFailure listener here, instead just pass an empty list
// as the index doesn't exist. Could have been deleted between checks and execution
listener.onResponse(Collections.emptyList());
} else {
listener.onFailure(t);
}
}
listener.onFailure(t);
};
if (securityIndex.indexExists() == false) {
// TODO remove this short circuiting and fix tests that fail without this!
final SecurityIndexManager frozenSecurityIndex = this.securityIndex.freeze();
if (frozenSecurityIndex.indexExists() == false) {
listener.onResponse(Collections.emptyList());
} else if (frozenSecurityIndex.isAvailable() == false) {
listener.onFailure(frozenSecurityIndex.getUnavailableReason());
} else if (userNames.length == 1) { // optimization for single user lookup
final String username = userNames[0];
getUserAndPassword(username, ActionListener.wrap(
(uap) -> listener.onResponse(uap == null ? Collections.emptyList() : Collections.singletonList(uap.user())),
handleException));
} else {
securityIndex.prepareIndexIfNeededThenExecute(listener::onFailure, () -> {
securityIndex.checkIndexVersionThenExecute(listener::onFailure, () -> {
final QueryBuilder query;
if (userNames == null || userNames.length == 0) {
query = QueryBuilders.termQuery(Fields.TYPE.getPreferredName(), USER_DOC_TYPE);
} else {
final String[] users = Arrays.asList(userNames).stream()
.map(s -> getIdForUser(USER_DOC_TYPE, s)).toArray(String[]::new);
final String[] users = Arrays.stream(userNames).map(s -> getIdForUser(USER_DOC_TYPE, s)).toArray(String[]::new);
query = QueryBuilders.boolQuery().filter(QueryBuilders.idsQuery(INDEX_TYPE).addIds(users));
}
final Supplier<ThreadContext.StoredContext> supplier = client.threadPool().getThreadContext().newRestorableContext(false);
@ -155,10 +160,13 @@ public class NativeUsersStore extends AbstractComponent {
}
void getUserCount(final ActionListener<Long> listener) {
if (securityIndex.indexExists() == false) {
final SecurityIndexManager frozenSecurityIndex = this.securityIndex.freeze();
if (frozenSecurityIndex.indexExists() == false) {
listener.onResponse(0L);
} else if (frozenSecurityIndex.isAvailable() == false) {
listener.onFailure(frozenSecurityIndex.getUnavailableReason());
} else {
securityIndex.prepareIndexIfNeededThenExecute(listener::onFailure, () ->
securityIndex.checkIndexVersionThenExecute(listener::onFailure, () ->
executeAsyncWithOrigin(client.threadPool().getThreadContext(), SECURITY_ORIGIN,
client.prepareSearch(SECURITY_INDEX_NAME)
.setQuery(QueryBuilders.termQuery(Fields.TYPE.getPreferredName(), USER_DOC_TYPE))
@ -182,11 +190,16 @@ public class NativeUsersStore extends AbstractComponent {
* Async method to retrieve a user and their password
*/
private void getUserAndPassword(final String user, final ActionListener<UserAndPassword> listener) {
if (securityIndex.indexExists() == false) {
// TODO remove this short circuiting and fix tests that fail without this!
final SecurityIndexManager frozenSecurityIndex = securityIndex.freeze();
if (frozenSecurityIndex.isAvailable() == false) {
if (frozenSecurityIndex.indexExists()) {
logger.trace("could not retrieve user [{}] because security index does not exist", user);
} else {
logger.error("security index is unavailable. short circuiting retrieval of user [{}]", user);
}
listener.onResponse(null);
} else {
securityIndex.prepareIndexIfNeededThenExecute(listener::onFailure, () ->
securityIndex.checkIndexVersionThenExecute(listener::onFailure, () ->
executeAsyncWithOrigin(client.threadPool().getThreadContext(), SECURITY_ORIGIN,
client.prepareGet(SECURITY_INDEX_NAME,
INDEX_TYPE, getIdForUser(USER_DOC_TYPE, user)).request(),
@ -459,16 +472,22 @@ public class NativeUsersStore extends AbstractComponent {
}
public void deleteUser(final DeleteUserRequest deleteUserRequest, final ActionListener<Boolean> listener) {
securityIndex.prepareIndexIfNeededThenExecute(listener::onFailure, () -> {
DeleteRequest request = client.prepareDelete(SECURITY_INDEX_NAME,
final SecurityIndexManager frozenSecurityIndex = securityIndex.freeze();
if (frozenSecurityIndex.indexExists() == false) {
listener.onResponse(false);
} else if (frozenSecurityIndex.isAvailable() == false) {
listener.onFailure(frozenSecurityIndex.getUnavailableReason());
} else {
securityIndex.checkIndexVersionThenExecute(listener::onFailure, () -> {
DeleteRequest request = client.prepareDelete(SECURITY_INDEX_NAME,
INDEX_TYPE, getIdForUser(USER_DOC_TYPE, deleteUserRequest.username())).request();
request.setRefreshPolicy(deleteUserRequest.getRefreshPolicy());
executeAsyncWithOrigin(client.threadPool().getThreadContext(), SECURITY_ORIGIN, request,
request.setRefreshPolicy(deleteUserRequest.getRefreshPolicy());
executeAsyncWithOrigin(client.threadPool().getThreadContext(), SECURITY_ORIGIN, request,
new ActionListener<DeleteResponse>() {
@Override
public void onResponse(DeleteResponse deleteResponse) {
clearRealmCache(deleteUserRequest.username(), listener,
deleteResponse.getResult() == DocWriteResponse.Result.DELETED);
deleteResponse.getResult() == DocWriteResponse.Result.DELETED);
}
@Override
@ -476,7 +495,8 @@ public class NativeUsersStore extends AbstractComponent {
listener.onFailure(e);
}
}, client::delete);
});
});
}
}
/**
@ -498,11 +518,13 @@ public class NativeUsersStore extends AbstractComponent {
}
void getReservedUserInfo(String username, ActionListener<ReservedUserInfo> listener) {
if (securityIndex.indexExists() == false) {
// TODO remove this short circuiting and fix tests that fail without this!
final SecurityIndexManager frozenSecurityIndex = securityIndex.freeze();
if (frozenSecurityIndex.indexExists() == false) {
listener.onResponse(null);
} else if (frozenSecurityIndex.isAvailable() == false) {
listener.onFailure(frozenSecurityIndex.getUnavailableReason());
} else {
securityIndex.prepareIndexIfNeededThenExecute(listener::onFailure, () ->
securityIndex.checkIndexVersionThenExecute(listener::onFailure, () ->
executeAsyncWithOrigin(client.threadPool().getThreadContext(), SECURITY_ORIGIN,
client.prepareGet(SECURITY_INDEX_NAME, INDEX_TYPE,
getIdForUser(RESERVED_USER_TYPE, username)).request(),
@ -541,49 +563,56 @@ public class NativeUsersStore extends AbstractComponent {
}
void getAllReservedUserInfo(ActionListener<Map<String, ReservedUserInfo>> listener) {
securityIndex.prepareIndexIfNeededThenExecute(listener::onFailure, () ->
executeAsyncWithOrigin(client.threadPool().getThreadContext(), SECURITY_ORIGIN,
client.prepareSearch(SECURITY_INDEX_NAME)
final SecurityIndexManager frozenSecurityIndex = securityIndex.freeze();
if (frozenSecurityIndex.indexExists() == false) {
listener.onResponse(Collections.emptyMap());
} else if (frozenSecurityIndex.isAvailable() == false) {
listener.onFailure(frozenSecurityIndex.getUnavailableReason());
} else {
securityIndex.checkIndexVersionThenExecute(listener::onFailure, () ->
executeAsyncWithOrigin(client.threadPool().getThreadContext(), SECURITY_ORIGIN,
client.prepareSearch(SECURITY_INDEX_NAME)
.setQuery(QueryBuilders.termQuery(Fields.TYPE.getPreferredName(), RESERVED_USER_TYPE))
.setFetchSource(true).request(),
new ActionListener<SearchResponse>() {
@Override
public void onResponse(SearchResponse searchResponse) {
Map<String, ReservedUserInfo> userInfos = new HashMap<>();
assert searchResponse.getHits().getTotalHits() <= 10 :
new ActionListener<SearchResponse>() {
@Override
public void onResponse(SearchResponse searchResponse) {
Map<String, ReservedUserInfo> userInfos = new HashMap<>();
assert searchResponse.getHits().getTotalHits() <= 10 :
"there are more than 10 reserved users we need to change this to retrieve them all!";
for (SearchHit searchHit : searchResponse.getHits().getHits()) {
Map<String, Object> sourceMap = searchHit.getSourceAsMap();
String password = (String) sourceMap.get(Fields.PASSWORD.getPreferredName());
Boolean enabled = (Boolean) sourceMap.get(Fields.ENABLED.getPreferredName());
final String id = searchHit.getId();
assert id != null && id.startsWith(RESERVED_USER_TYPE) :
for (SearchHit searchHit : searchResponse.getHits().getHits()) {
Map<String, Object> sourceMap = searchHit.getSourceAsMap();
String password = (String) sourceMap.get(Fields.PASSWORD.getPreferredName());
Boolean enabled = (Boolean) sourceMap.get(Fields.ENABLED.getPreferredName());
final String id = searchHit.getId();
assert id != null && id.startsWith(RESERVED_USER_TYPE) :
"id [" + id + "] does not start with reserved-user prefix";
final String username = id.substring(RESERVED_USER_TYPE.length() + 1);
if (password == null) {
listener.onFailure(new IllegalStateException("password hash must not be null!"));
return;
} else if (enabled == null) {
listener.onFailure(new IllegalStateException("enabled must not be null!"));
return;
final String username = id.substring(RESERVED_USER_TYPE.length() + 1);
if (password == null) {
listener.onFailure(new IllegalStateException("password hash must not be null!"));
return;
} else if (enabled == null) {
listener.onFailure(new IllegalStateException("enabled must not be null!"));
return;
} else {
userInfos.put(username, new ReservedUserInfo(password.toCharArray(), enabled, false));
}
}
listener.onResponse(userInfos);
}
@Override
public void onFailure(Exception e) {
if (e instanceof IndexNotFoundException) {
logger.trace("could not retrieve built in users since security index does not exist", e);
listener.onResponse(Collections.emptyMap());
} else {
userInfos.put(username, new ReservedUserInfo(password.toCharArray(), enabled, false));
logger.error("failed to retrieve built in users", e);
listener.onFailure(e);
}
}
listener.onResponse(userInfos);
}
@Override
public void onFailure(Exception e) {
if (e instanceof IndexNotFoundException) {
logger.trace("could not retrieve built in users since security index does not exist", e);
listener.onResponse(Collections.emptyMap());
} else {
logger.error("failed to retrieve built in users", e);
listener.onFailure(e);
}
}
}, client::search));
}, client::search));
}
}
private <Response> void clearRealmCache(String username, ActionListener<Response> listener, Response response) {

View File

@ -220,32 +220,35 @@ public class NativeRoleMappingStore extends AbstractComponent implements UserRol
});
}
private void innerDeleteMapping(DeleteRoleMappingRequest request, ActionListener<Boolean> listener) throws IOException {
if (securityIndex.isIndexUpToDate() == false) {
listener.onFailure(new IllegalStateException(
"Security index is not on the current version - the native realm will not be operational until " +
"the upgrade API is run on the security index"));
return;
}
executeAsyncWithOrigin(client.threadPool().getThreadContext(), SECURITY_ORIGIN,
client.prepareDelete(SECURITY_INDEX_NAME, SECURITY_GENERIC_TYPE, getIdForName(request.getName()))
private void innerDeleteMapping(DeleteRoleMappingRequest request, ActionListener<Boolean> listener) {
final SecurityIndexManager frozenSecurityIndex = securityIndex.freeze();
if (frozenSecurityIndex.indexExists() == false) {
listener.onResponse(false);
} else if (securityIndex.isAvailable() == false) {
listener.onFailure(frozenSecurityIndex.getUnavailableReason());
} else {
securityIndex.checkIndexVersionThenExecute(listener::onFailure, () -> {
executeAsyncWithOrigin(client.threadPool().getThreadContext(), SECURITY_ORIGIN,
client.prepareDelete(SECURITY_INDEX_NAME, SECURITY_GENERIC_TYPE, getIdForName(request.getName()))
.setRefreshPolicy(request.getRefreshPolicy())
.request(),
new ActionListener<DeleteResponse>() {
new ActionListener<DeleteResponse>() {
@Override
public void onResponse(DeleteResponse deleteResponse) {
boolean deleted = deleteResponse.getResult() == DELETED;
listener.onResponse(deleted);
}
@Override
public void onResponse(DeleteResponse deleteResponse) {
boolean deleted = deleteResponse.getResult() == DELETED;
listener.onResponse(deleted);
}
@Override
public void onFailure(Exception e) {
logger.error(new ParameterizedMessage("failed to delete role-mapping [{}]", request.getName()), e);
listener.onFailure(e);
@Override
public void onFailure(Exception e) {
logger.error(new ParameterizedMessage("failed to delete role-mapping [{}]", request.getName()), e);
listener.onFailure(e);
}
}, client::delete);
}
}, client::delete);
});
}
}
/**
@ -301,7 +304,7 @@ public class NativeRoleMappingStore extends AbstractComponent implements UserRol
* </ul>
*/
public void usageStats(ActionListener<Map<String, Object>> listener) {
if (securityIndex.indexExists() == false) {
if (securityIndex.isAvailable() == false) {
reportStats(listener, Collections.emptyList());
} else {
getMappings(ActionListener.wrap(mappings -> reportStats(listener, mappings), listener::onFailure));

View File

@ -88,13 +88,18 @@ public class NativePrivilegeStore extends AbstractComponent {
public void getPrivileges(Collection<String> applications, Collection<String> names,
ActionListener<Collection<ApplicationPrivilegeDescriptor>> listener) {
if (applications != null && applications.size() == 1 && names != null && names.size() == 1) {
final SecurityIndexManager frozenSecurityIndex = securityIndexManager.freeze();
if (frozenSecurityIndex.indexExists() == false) {
listener.onResponse(Collections.emptyList());
} else if (frozenSecurityIndex.isAvailable() == false) {
listener.onFailure(frozenSecurityIndex.getUnavailableReason());
} else if (applications != null && applications.size() == 1 && names != null && names.size() == 1) {
getPrivilege(Objects.requireNonNull(Iterables.get(applications, 0)), Objects.requireNonNull(Iterables.get(names, 0)),
ActionListener.wrap(privilege ->
listener.onResponse(privilege == null ? Collections.emptyList() : Collections.singletonList(privilege)),
listener::onFailure));
} else {
securityIndexManager.prepareIndexIfNeededThenExecute(listener::onFailure, () -> {
securityIndexManager.checkIndexVersionThenExecute(listener::onFailure, () -> {
final QueryBuilder query;
final TermQueryBuilder typeQuery = QueryBuilders
.termQuery(ApplicationPrivilegeDescriptor.Fields.TYPE.getPreferredName(), DOC_TYPE_VALUE);
@ -134,33 +139,40 @@ public class NativePrivilegeStore extends AbstractComponent {
return collection == null || collection.isEmpty();
}
public void getPrivilege(String application, String name, ActionListener<ApplicationPrivilegeDescriptor> listener) {
securityIndexManager.prepareIndexIfNeededThenExecute(listener::onFailure,
() -> executeAsyncWithOrigin(client.threadPool().getThreadContext(), SECURITY_ORIGIN,
client.prepareGet(SECURITY_INDEX_NAME, "doc", toDocId(application, name)).request(),
new ActionListener<GetResponse>() {
@Override
public void onResponse(GetResponse response) {
if (response.isExists()) {
listener.onResponse(buildPrivilege(response.getId(), response.getSourceAsBytesRef()));
} else {
listener.onResponse(null);
void getPrivilege(String application, String name, ActionListener<ApplicationPrivilegeDescriptor> listener) {
final SecurityIndexManager frozenSecurityIndex = securityIndexManager.freeze();
if (frozenSecurityIndex.isAvailable() == false) {
logger.warn(new ParameterizedMessage("failed to load privilege [{}] index not available", name),
frozenSecurityIndex.getUnavailableReason());
listener.onResponse(null);
} else {
securityIndexManager.checkIndexVersionThenExecute(listener::onFailure,
() -> executeAsyncWithOrigin(client.threadPool().getThreadContext(), SECURITY_ORIGIN,
client.prepareGet(SECURITY_INDEX_NAME, "doc", toDocId(application, name)).request(),
new ActionListener<GetResponse>() {
@Override
public void onResponse(GetResponse response) {
if (response.isExists()) {
listener.onResponse(buildPrivilege(response.getId(), response.getSourceAsBytesRef()));
} else {
listener.onResponse(null);
}
}
}
@Override
public void onFailure(Exception e) {
// if the index or the shard is not there / available we just claim the privilege is not there
if (TransportActions.isShardNotAvailableException(e)) {
logger.warn(new ParameterizedMessage("failed to load privilege [{}] index not available", name), e);
listener.onResponse(null);
} else {
logger.error(new ParameterizedMessage("failed to load privilege [{}]", name), e);
listener.onFailure(e);
@Override
public void onFailure(Exception e) {
// if the index or the shard is not there / available we just claim the privilege is not there
if (TransportActions.isShardNotAvailableException(e)) {
logger.warn(new ParameterizedMessage("failed to load privilege [{}] index not available", name), e);
listener.onResponse(null);
} else {
logger.error(new ParameterizedMessage("failed to load privilege [{}]", name), e);
listener.onFailure(e);
}
}
}
},
client::get));
},
client::get));
}
}
public void putPrivileges(Collection<ApplicationPrivilegeDescriptor> privileges, WriteRequest.RefreshPolicy refreshPolicy,
@ -200,23 +212,30 @@ public class NativePrivilegeStore extends AbstractComponent {
public void deletePrivileges(String application, Collection<String> names, WriteRequest.RefreshPolicy refreshPolicy,
ActionListener<Map<String, List<String>>> listener) {
securityIndexManager.prepareIndexIfNeededThenExecute(listener::onFailure, () -> {
ActionListener<DeleteResponse> groupListener = new GroupedActionListener<>(
ActionListener.wrap(responses -> {
final Map<String, List<String>> deletedNames = responses.stream()
.filter(r -> r.getResult() == DocWriteResponse.Result.DELETED)
.map(r -> r.getId())
.map(NativePrivilegeStore::nameFromDocId)
.collect(TUPLES_TO_MAP);
clearRolesCache(listener, deletedNames);
}, listener::onFailure), names.size(), Collections.emptyList());
for (String name : names) {
ClientHelper.executeAsyncWithOrigin(client.threadPool().getThreadContext(), SECURITY_ORIGIN,
client.prepareDelete(SECURITY_INDEX_NAME, "doc", toDocId(application, name))
.setRefreshPolicy(refreshPolicy)
.request(), groupListener, client::delete);
}
});
final SecurityIndexManager frozenSecurityIndex = securityIndexManager.freeze();
if (frozenSecurityIndex.indexExists() == false) {
listener.onResponse(Collections.emptyMap());
} else if (frozenSecurityIndex.isAvailable() == false) {
listener.onFailure(frozenSecurityIndex.getUnavailableReason());
} else {
securityIndexManager.checkIndexVersionThenExecute(listener::onFailure, () -> {
ActionListener<DeleteResponse> groupListener = new GroupedActionListener<>(
ActionListener.wrap(responses -> {
final Map<String, List<String>> deletedNames = responses.stream()
.filter(r -> r.getResult() == DocWriteResponse.Result.DELETED)
.map(r -> r.getId())
.map(NativePrivilegeStore::nameFromDocId)
.collect(TUPLES_TO_MAP);
clearRolesCache(listener, deletedNames);
}, listener::onFailure), names.size(), Collections.emptyList());
for (String name : names) {
ClientHelper.executeAsyncWithOrigin(client.threadPool().getThreadContext(), SECURITY_ORIGIN,
client.prepareDelete(SECURITY_INDEX_NAME, "doc", toDocId(application, name))
.setRefreshPolicy(refreshPolicy)
.request(), groupListener, client::delete);
}
});
}
}
private <T> void clearRolesCache(ActionListener<T> listener, T value) {

View File

@ -116,7 +116,7 @@ public class NativeRolesStore extends AbstractComponent implements BiConsumer<Se
} else if (names != null && names.size() == 1) {
getRoleDescriptor(Objects.requireNonNull(names.iterator().next()), listener);
} else {
securityIndex.prepareIndexIfNeededThenExecute(listener::onFailure, () -> {
securityIndex.checkIndexVersionThenExecute(listener::onFailure, () -> {
QueryBuilder query;
if (names == null || names.isEmpty()) {
query = QueryBuilders.termQuery(RoleDescriptor.Fields.TYPE.getPreferredName(), ROLE_TYPE);
@ -144,16 +144,22 @@ public class NativeRolesStore extends AbstractComponent implements BiConsumer<Se
}
public void deleteRole(final DeleteRoleRequest deleteRoleRequest, final ActionListener<Boolean> listener) {
securityIndex.prepareIndexIfNeededThenExecute(listener::onFailure, () -> {
DeleteRequest request = client.prepareDelete(SecurityIndexManager.SECURITY_INDEX_NAME,
final SecurityIndexManager frozenSecurityIndex = securityIndex.freeze();
if (frozenSecurityIndex.indexExists() == false) {
listener.onResponse(false);
} else if (frozenSecurityIndex.isAvailable() == false) {
listener.onFailure(frozenSecurityIndex.getUnavailableReason());
} else {
securityIndex.checkIndexVersionThenExecute(listener::onFailure, () -> {
DeleteRequest request = client.prepareDelete(SecurityIndexManager.SECURITY_INDEX_NAME,
ROLE_DOC_TYPE, getIdForUser(deleteRoleRequest.name())).request();
request.setRefreshPolicy(deleteRoleRequest.getRefreshPolicy());
executeAsyncWithOrigin(client.threadPool().getThreadContext(), SECURITY_ORIGIN, request,
request.setRefreshPolicy(deleteRoleRequest.getRefreshPolicy());
executeAsyncWithOrigin(client.threadPool().getThreadContext(), SECURITY_ORIGIN, request,
new ActionListener<DeleteResponse>() {
@Override
public void onResponse(DeleteResponse deleteResponse) {
clearRoleCache(deleteRoleRequest.name(), listener,
deleteResponse.getResult() == DocWriteResponse.Result.DELETED);
deleteResponse.getResult() == DocWriteResponse.Result.DELETED);
}
@Override
@ -162,7 +168,8 @@ public class NativeRolesStore extends AbstractComponent implements BiConsumer<Se
listener.onFailure(e);
}
}, client::delete);
});
});
}
}
public void putRole(final PutRoleRequest request, final RoleDescriptor role, final ActionListener<Boolean> listener) {
@ -210,13 +217,13 @@ public class NativeRolesStore extends AbstractComponent implements BiConsumer<Se
public void usageStats(ActionListener<Map<String, Object>> listener) {
Map<String, Object> usageStats = new HashMap<>(3);
if (securityIndex.indexExists() == false) {
if (securityIndex.isAvailable() == false) {
usageStats.put("size", 0L);
usageStats.put("fls", false);
usageStats.put("dls", false);
listener.onResponse(usageStats);
} else {
securityIndex.prepareIndexIfNeededThenExecute(listener::onFailure, () ->
securityIndex.checkIndexVersionThenExecute(listener::onFailure, () ->
executeAsyncWithOrigin(client.threadPool().getThreadContext(), SECURITY_ORIGIN,
client.prepareMultiSearch()
.add(client.prepareSearch(SecurityIndexManager.SECURITY_INDEX_NAME)
@ -298,7 +305,7 @@ public class NativeRolesStore extends AbstractComponent implements BiConsumer<Se
}
private void executeGetRoleRequest(String role, ActionListener<GetResponse> listener) {
securityIndex.prepareIndexIfNeededThenExecute(listener::onFailure, () ->
securityIndex.checkIndexVersionThenExecute(listener::onFailure, () ->
executeAsyncWithOrigin(client.threadPool().getThreadContext(), SECURITY_ORIGIN,
client.prepareGet(SECURITY_INDEX_NAME,
ROLE_DOC_TYPE, getIdForUser(role)).request(),

View File

@ -5,6 +5,7 @@
*/
package org.elasticsearch.xpack.security.support;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.ElasticsearchException;
@ -13,6 +14,7 @@ import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.ResourceAlreadyExistsException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.UnavailableShardsException;
import org.elasticsearch.action.admin.indices.alias.Alias;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
@ -33,10 +35,10 @@ import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.xpack.core.template.TemplateUtils;
import org.elasticsearch.xpack.core.upgrade.IndexUpgradeCheckVersion;
@ -63,7 +65,7 @@ import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;
/**
* Manages the lifecycle of a single index, its template, mapping and and data upgrades/migrations.
*/
public class SecurityIndexManager extends AbstractComponent implements ClusterStateListener {
public class SecurityIndexManager implements ClusterStateListener {
public static final String INTERNAL_SECURITY_INDEX = ".security-" + IndexUpgradeCheckVersion.UPRADE_VERSION;
public static final int INTERNAL_INDEX_FORMAT = 6;
@ -71,19 +73,28 @@ public class SecurityIndexManager extends AbstractComponent implements ClusterSt
public static final String TEMPLATE_VERSION_PATTERN = Pattern.quote("${security.template.version}");
public static final String SECURITY_TEMPLATE_NAME = "security-index-template";
public static final String SECURITY_INDEX_NAME = ".security";
private static final Logger LOGGER = LogManager.getLogger(SecurityIndexManager.class);
private final String indexName;
private final Client client;
private final List<BiConsumer<State, State>> stateChangeListeners = new CopyOnWriteArrayList<>();
private volatile State indexState = new State(false, false, false, false, null, null);
private volatile State indexState;
public SecurityIndexManager(Settings settings, Client client, String indexName, ClusterService clusterService) {
super(settings);
public SecurityIndexManager(Client client, String indexName, ClusterService clusterService) {
this(client, indexName, new State(false, false, false, false, null, null));
clusterService.addListener(this);
}
private SecurityIndexManager(Client client, String indexName, State indexState) {
this.client = client;
this.indexName = indexName;
clusterService.addListener(this);
this.indexState = indexState;
}
public SecurityIndexManager freeze() {
return new SecurityIndexManager(null, indexName, indexState);
}
public static List<String> indexNames() {
@ -116,6 +127,19 @@ public class SecurityIndexManager extends AbstractComponent implements ClusterSt
return this.indexState.mappingUpToDate;
}
public ElasticsearchException getUnavailableReason() {
final State localState = this.indexState;
if (localState.indexAvailable) {
throw new IllegalStateException("caller must make sure to use a frozen state and check indexAvailable");
}
if (localState.indexExists) {
return new UnavailableShardsException(null, "at least one primary shard for the security index is unavailable");
} else {
return new IndexNotFoundException(SECURITY_INDEX_NAME);
}
}
/**
* Add a listener for notifications on state changes to the configured index.
*
@ -130,7 +154,7 @@ public class SecurityIndexManager extends AbstractComponent implements ClusterSt
if (event.state().blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) {
// wait until the gateway has recovered from disk, otherwise we think we don't have the
// .security index but they may not have been restored from the cluster state on disk
logger.debug("security index manager waiting until state has been recovered");
LOGGER.debug("security index manager waiting until state has been recovered");
return;
}
final State previousState = indexState;
@ -158,7 +182,7 @@ public class SecurityIndexManager extends AbstractComponent implements ClusterSt
if (routingTable != null && routingTable.allPrimaryShardsActive()) {
return true;
}
logger.debug("Security index [{}] is not yet active", indexName);
LOGGER.debug("Security index [{}] is not yet active", indexName);
return false;
}
@ -187,7 +211,7 @@ public class SecurityIndexManager extends AbstractComponent implements ClusterSt
private boolean checkIndexMappingVersionMatches(ClusterState clusterState,
Predicate<Version> predicate) {
return checkIndexMappingVersionMatches(indexName, clusterState, logger, predicate);
return checkIndexMappingVersionMatches(indexName, clusterState, LOGGER, predicate);
}
public static boolean checkIndexMappingVersionMatches(String indexName,
@ -198,7 +222,7 @@ public class SecurityIndexManager extends AbstractComponent implements ClusterSt
}
private Version oldestIndexMappingVersion(ClusterState clusterState) {
final Set<Version> versions = loadIndexMappingVersions(indexName, clusterState, logger);
final Set<Version> versions = loadIndexMappingVersions(indexName, clusterState, LOGGER);
return versions.stream().min(Version::compareTo).orElse(null);
}
@ -253,6 +277,23 @@ public class SecurityIndexManager extends AbstractComponent implements ClusterSt
}
}
/**
* Validates the security index is up to date and does not need to migrated. If it is not, the
* consumer is called with an exception. If the security index is up to date, the runnable will
* be executed. <b>NOTE:</b> this method does not check the availability of the index; this check
* is left to the caller so that this condition can be handled appropriately.
*/
public void checkIndexVersionThenExecute(final Consumer<Exception> consumer, final Runnable andThen) {
final State indexState = this.indexState; // use a local copy so all checks execute against the same state!
if (indexState.indexExists && indexState.isIndexUpToDate == false) {
consumer.accept(new IllegalStateException(
"Security index is not on the current version. Security features relying on the index will not be available until " +
"the upgrade API is run on the security index"));
} else {
andThen.run();
}
}
/**
* Prepares the index by creating it if it doesn't exist or updating the mappings if the mappings are
* out of date. After any tasks have been executed, the runnable is then executed.

View File

@ -41,7 +41,8 @@ import org.elasticsearch.transport.Transport;
import org.elasticsearch.xpack.core.TestXPackTransportClient;
import org.elasticsearch.xpack.core.XPackField;
import org.elasticsearch.xpack.core.security.SecurityField;
import org.elasticsearch.xpack.core.security.action.user.GetUsersResponse;
import org.elasticsearch.xpack.core.security.action.user.PutUserResponse;
import org.elasticsearch.xpack.core.security.authc.support.Hasher;
import org.elasticsearch.xpack.core.security.authc.support.UsernamePasswordToken;
import org.elasticsearch.xpack.core.security.client.SecurityClient;
import org.elasticsearch.xpack.security.LocalStateSecurity;
@ -231,7 +232,7 @@ public class LicensingTests extends SecurityIntegTestCase {
Settings settings = internalCluster().transportClient().settings();
try (TransportClient client = new TestXPackTransportClient(settings, LocalStateSecurity.class)) {
client.addTransportAddress(internalCluster().getDataNodeInstance(Transport.class).boundAddress().publishAddress());
new SecurityClient(client).prepareGetUsers().get();
new SecurityClient(client).preparePutUser("john", "password".toCharArray(), Hasher.BCRYPT).get();
fail("security actions should not be enabled!");
} catch (ElasticsearchSecurityException e) {
assertThat(e.status(), is(RestStatus.FORBIDDEN));
@ -245,7 +246,7 @@ public class LicensingTests extends SecurityIntegTestCase {
// security actions should work!
try (TransportClient client = new TestXPackTransportClient(settings, LocalStateSecurity.class)) {
client.addTransportAddress(internalCluster().getDataNodeInstance(Transport.class).boundAddress().publishAddress());
GetUsersResponse response = new SecurityClient(client).prepareGetUsers().get();
PutUserResponse response = new SecurityClient(client).preparePutUser("john", "password".toCharArray(), Hasher.BCRYPT).get();
assertNotNull(response);
}
}

View File

@ -163,6 +163,13 @@ public class TransportSamlInvalidateSessionActionTests extends SamlTestCase {
((Runnable) inv.getArguments()[1]).run();
return null;
}).when(securityIndex).prepareIndexIfNeededThenExecute(any(Consumer.class), any(Runnable.class));
doAnswer(inv -> {
((Runnable) inv.getArguments()[1]).run();
return null;
}).when(securityIndex).checkIndexVersionThenExecute(any(Consumer.class), any(Runnable.class));
when(securityIndex.isAvailable()).thenReturn(true);
when(securityIndex.indexExists()).thenReturn(true);
when(securityIndex.freeze()).thenReturn(securityIndex);
final ClusterService clusterService = ClusterServiceUtils.createClusterService(threadPool);
tokenService = new TokenService(settings, Clock.systemUTC(), client, securityIndex, clusterService);

View File

@ -178,6 +178,11 @@ public class TransportSamlLogoutActionTests extends SamlTestCase {
((Runnable) inv.getArguments()[1]).run();
return null;
}).when(securityIndex).prepareIndexIfNeededThenExecute(any(Consumer.class), any(Runnable.class));
doAnswer(inv -> {
((Runnable) inv.getArguments()[1]).run();
return null;
}).when(securityIndex).checkIndexVersionThenExecute(any(Consumer.class), any(Runnable.class));
when(securityIndex.isAvailable()).thenReturn(true);
final ClusterService clusterService = ClusterServiceUtils.createClusterService(threadPool);
tokenService = new TokenService(settings, Clock.systemUTC(), client, securityIndex, clusterService);

View File

@ -87,6 +87,7 @@ import java.util.function.Consumer;
import static org.elasticsearch.test.SecurityTestsUtils.assertAuthenticationException;
import static org.elasticsearch.xpack.core.security.support.Exceptions.authenticationError;
import static org.elasticsearch.xpack.security.authc.TokenServiceTests.mockCheckTokenInvalidationFromId;
import static org.elasticsearch.xpack.security.authc.TokenServiceTests.mockGetTokenFromId;
import static org.hamcrest.Matchers.arrayContaining;
import static org.hamcrest.Matchers.contains;
@ -187,6 +188,11 @@ public class AuthenticationServiceTests extends ESTestCase {
runnable.run();
return null;
}).when(securityIndex).prepareIndexIfNeededThenExecute(any(Consumer.class), any(Runnable.class));
doAnswer(invocationOnMock -> {
Runnable runnable = (Runnable) invocationOnMock.getArguments()[1];
runnable.run();
return null;
}).when(securityIndex).checkIndexVersionThenExecute(any(Consumer.class), any(Runnable.class));
ClusterService clusterService = ClusterServiceUtils.createClusterService(threadPool);
tokenService = new TokenService(settings, Clock.systemUTC(), client, securityIndex, clusterService);
service = new AuthenticationService(settings, realms, auditTrail,
@ -898,7 +904,11 @@ public class AuthenticationServiceTests extends ESTestCase {
tokenService.createUserToken(expected, originatingAuth, tokenFuture, Collections.emptyMap(), true);
}
String token = tokenService.getUserTokenString(tokenFuture.get().v1());
when(client.prepareMultiGet()).thenReturn(new MultiGetRequestBuilder(client, MultiGetAction.INSTANCE));
mockGetTokenFromId(tokenFuture.get().v1(), client);
mockCheckTokenInvalidationFromId(tokenFuture.get().v1(), client);
when(securityIndex.isAvailable()).thenReturn(true);
when(securityIndex.indexExists()).thenReturn(true);
try (ThreadContext.StoredContext ignore = threadContext.stashContext()) {
threadContext.putHeader("Authorization", "Bearer " + token);
service.authenticate("_action", message, (User)null, ActionListener.wrap(result -> {

View File

@ -137,6 +137,13 @@ public class TokenServiceTests extends ESTestCase {
runnable.run();
return null;
}).when(securityIndex).prepareIndexIfNeededThenExecute(any(Consumer.class), any(Runnable.class));
doAnswer(invocationOnMock -> {
Runnable runnable = (Runnable) invocationOnMock.getArguments()[1];
runnable.run();
return null;
}).when(securityIndex).checkIndexVersionThenExecute(any(Consumer.class), any(Runnable.class));
when(securityIndex.indexExists()).thenReturn(true);
when(securityIndex.isAvailable()).thenReturn(true);
this.clusterService = ClusterServiceUtils.createClusterService(threadPool);
}
@ -161,6 +168,7 @@ public class TokenServiceTests extends ESTestCase {
final UserToken token = tokenFuture.get().v1();
assertNotNull(token);
mockGetTokenFromId(token);
mockCheckTokenInvalidationFromId(token);
ThreadContext requestContext = new ThreadContext(Settings.EMPTY);
requestContext.putHeader("Authorization", randomFrom("Bearer ", "BEARER ", "bearer ") + tokenService.getUserTokenString(token));
@ -207,6 +215,7 @@ public class TokenServiceTests extends ESTestCase {
final UserToken token = tokenFuture.get().v1();
assertNotNull(token);
mockGetTokenFromId(token);
mockCheckTokenInvalidationFromId(token);
ThreadContext requestContext = new ThreadContext(Settings.EMPTY);
requestContext.putHeader("Authorization", "Bearer " + tokenService.getUserTokenString(token));
@ -266,6 +275,7 @@ public class TokenServiceTests extends ESTestCase {
final UserToken token = tokenFuture.get().v1();
assertNotNull(token);
mockGetTokenFromId(token);
mockCheckTokenInvalidationFromId(token);
ThreadContext requestContext = new ThreadContext(Settings.EMPTY);
requestContext.putHeader("Authorization", "Bearer " + tokenService.getUserTokenString(token));
@ -296,6 +306,7 @@ public class TokenServiceTests extends ESTestCase {
final UserToken token = tokenFuture.get().v1();
assertNotNull(token);
mockGetTokenFromId(token);
mockCheckTokenInvalidationFromId(token);
ThreadContext requestContext = new ThreadContext(Settings.EMPTY);
requestContext.putHeader("Authorization", "Bearer " + tokenService.getUserTokenString(token));
@ -357,6 +368,7 @@ public class TokenServiceTests extends ESTestCase {
final UserToken token = tokenFuture.get().v1();
assertNotNull(token);
mockGetTokenFromId(token);
mockCheckTokenInvalidationFromId(token);
ThreadContext requestContext = new ThreadContext(Settings.EMPTY);
requestContext.putHeader("Authorization", "Bearer " + tokenService.getUserTokenString(token));
@ -454,6 +466,7 @@ public class TokenServiceTests extends ESTestCase {
tokenService.createUserToken(authentication, authentication, tokenFuture, Collections.emptyMap(), true);
final UserToken token = tokenFuture.get().v1();
mockGetTokenFromId(token);
mockCheckTokenInvalidationFromId(token);
ThreadContext requestContext = new ThreadContext(Settings.EMPTY);
requestContext.putHeader("Authorization", "Bearer " + tokenService.getUserTokenString(token));
@ -577,14 +590,25 @@ public class TokenServiceTests extends ESTestCase {
try (ThreadContext.StoredContext ignore = requestContext.newStoredContext(true)) {
PlainActionFuture<UserToken> future = new PlainActionFuture<>();
tokenService.getAndValidateToken(requestContext, future);
UserToken serialized = future.get();
assertEquals(authentication, serialized.getAuthentication());
assertNull(future.get());
when(securityIndex.isAvailable()).thenReturn(false);
when(securityIndex.indexExists()).thenReturn(true);
future = new PlainActionFuture<>();
tokenService.getAndValidateToken(requestContext, future);
assertNull(future.get());
when(securityIndex.indexExists()).thenReturn(false);
future = new PlainActionFuture<>();
tokenService.getAndValidateToken(requestContext, future);
assertNull(future.get());
when(securityIndex.isAvailable()).thenReturn(true);
when(securityIndex.indexExists()).thenReturn(true);
mockCheckTokenInvalidationFromId(token);
future = new PlainActionFuture<>();
tokenService.getAndValidateToken(requestContext, future);
assertEquals(token.getAuthentication(), future.get().getAuthentication());
}
}
@ -625,4 +649,38 @@ public class TokenServiceTests extends ESTestCase {
return Void.TYPE;
}).when(client).get(any(GetRequest.class), any(ActionListener.class));
}
private void mockCheckTokenInvalidationFromId(UserToken userToken) {
mockCheckTokenInvalidationFromId(userToken, client);
}
public static void mockCheckTokenInvalidationFromId(UserToken userToken, Client client) {
doAnswer(invocationOnMock -> {
MultiGetRequest request = (MultiGetRequest) invocationOnMock.getArguments()[0];
ActionListener<MultiGetResponse> listener = (ActionListener<MultiGetResponse>) invocationOnMock.getArguments()[1];
MultiGetResponse response = mock(MultiGetResponse.class);
MultiGetItemResponse[] responses = new MultiGetItemResponse[2];
when(response.getResponses()).thenReturn(responses);
GetResponse legacyResponse = mock(GetResponse.class);
responses[0] = new MultiGetItemResponse(legacyResponse, null);
when(legacyResponse.isExists()).thenReturn(false);
GetResponse tokenResponse = mock(GetResponse.class);
if (userToken.getId().equals(request.getItems().get(1).id().replace("token_", ""))) {
when(tokenResponse.isExists()).thenReturn(true);
Map<String, Object> sourceMap = new HashMap<>();
try (XContentBuilder builder = XContentBuilder.builder(XContentType.JSON.xContent())) {
userToken.toXContent(builder, ToXContent.EMPTY_PARAMS);
Map<String, Object> accessTokenMap = new HashMap<>();
accessTokenMap.put("user_token",
XContentHelper.convertToMap(XContentType.JSON.xContent(), Strings.toString(builder), false));
accessTokenMap.put("invalidated", false);
sourceMap.put("access_token", accessTokenMap);
}
when(tokenResponse.getSource()).thenReturn(sourceMap);
}
responses[1] = new MultiGetItemResponse(tokenResponse, null);
listener.onResponse(response);
return Void.TYPE;
}).when(client).multiGet(any(MultiGetRequest.class), any(ActionListener.class));
}
}

View File

@ -237,11 +237,17 @@ public class NativeUsersStoreTests extends ESTestCase {
when(securityIndex.indexExists()).thenReturn(true);
when(securityIndex.isMappingUpToDate()).thenReturn(true);
when(securityIndex.isIndexUpToDate()).thenReturn(true);
when(securityIndex.freeze()).thenReturn(securityIndex);
doAnswer((i) -> {
Runnable action = (Runnable) i.getArguments()[1];
action.run();
return null;
}).when(securityIndex).prepareIndexIfNeededThenExecute(any(Consumer.class), any(Runnable.class));
doAnswer((i) -> {
Runnable action = (Runnable) i.getArguments()[1];
action.run();
return null;
}).when(securityIndex).checkIndexVersionThenExecute(any(Consumer.class), any(Runnable.class));
return new NativeUsersStore(Settings.EMPTY, client, securityIndex);
}

View File

@ -89,6 +89,8 @@ public class NativePrivilegeStoreTests extends ESTestCase {
}
};
final SecurityIndexManager securityIndex = mock(SecurityIndexManager.class);
when(securityIndex.freeze()).thenReturn(securityIndex);
when(securityIndex.indexExists()).thenReturn(true);
when(securityIndex.isAvailable()).thenReturn(true);
Mockito.doAnswer(invocationOnMock -> {
assertThat(invocationOnMock.getArguments().length, equalTo(2));
@ -96,6 +98,12 @@ public class NativePrivilegeStoreTests extends ESTestCase {
((Runnable) invocationOnMock.getArguments()[1]).run();
return null;
}).when(securityIndex).prepareIndexIfNeededThenExecute(any(Consumer.class), any(Runnable.class));
Mockito.doAnswer(invocationOnMock -> {
assertThat(invocationOnMock.getArguments().length, equalTo(2));
assertThat(invocationOnMock.getArguments()[1], instanceOf(Runnable.class));
((Runnable) invocationOnMock.getArguments()[1]).run();
return null;
}).when(securityIndex).checkIndexVersionThenExecute(any(Consumer.class), any(Runnable.class));
store = new NativePrivilegeStore(Settings.EMPTY, client, securityIndex);
}

View File

@ -188,7 +188,7 @@ public class NativeRolesStoreTests extends ESTestCase {
final XPackLicenseState licenseState = mock(XPackLicenseState.class);
final AtomicBoolean methodCalled = new AtomicBoolean(false);
final SecurityIndexManager securityIndex =
new SecurityIndexManager(Settings.EMPTY, client, SecurityIndexManager.SECURITY_INDEX_NAME, clusterService);
new SecurityIndexManager(client, SecurityIndexManager.SECURITY_INDEX_NAME, clusterService);
final NativeRolesStore rolesStore = new NativeRolesStore(Settings.EMPTY, client, licenseState, securityIndex) {
@Override
void innerPutRole(final PutRoleRequest request, final RoleDescriptor role, final ActionListener<Boolean> listener) {

View File

@ -62,7 +62,7 @@ public class SecurityIndexManagerTests extends ESTestCase {
private static final ClusterName CLUSTER_NAME = new ClusterName("security-index-manager-tests");
private static final ClusterState EMPTY_CLUSTER_STATE = new ClusterState.Builder(CLUSTER_NAME).build();
public static final String INDEX_NAME = ".security";
private static final String INDEX_NAME = ".security";
private static final String TEMPLATE_NAME = "SecurityIndexManagerTests-template";
private SecurityIndexManager manager;
private Map<Action<?>, Map<ActionRequest, ActionListener<?>>> actions;
@ -86,7 +86,7 @@ public class SecurityIndexManagerTests extends ESTestCase {
actions.put(action, map);
}
};
manager = new SecurityIndexManager(Settings.EMPTY, client, INDEX_NAME, clusterService);
manager = new SecurityIndexManager(client, INDEX_NAME, clusterService);
}
public void testIndexWithUpToDateMappingAndTemplate() throws IOException {