mirror of
https://github.com/honeymoose/OpenSearch.git
synced 2025-02-24 22:09:24 +00:00
Add Debug/Trace logging to token service (#34022)
The token service has fairly strict validation and there are a range of reasons why request may be rejected. The detail is typically returned in the client exception / json body but the ES admin can only debug that if they have access to detailed logs from the client. This commit adds debug & trace logging to the token service so that it is possible to perform this debugging from the server side if necessary.
This commit is contained in:
parent
e0cab14c6e
commit
e0a1803638
@ -11,6 +11,7 @@ import org.apache.logging.log4j.message.ParameterizedMessage;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.bulk.BulkItemResponse;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
|
||||
@ -60,6 +61,7 @@ final class ExpiredTokenRemover extends AbstractRunnable {
|
||||
.filter(QueryBuilders.boolQuery()
|
||||
.should(QueryBuilders.rangeQuery("expiration_time").lte(now.toEpochMilli()))
|
||||
.should(QueryBuilders.rangeQuery("creation_time").lte(now.minus(24L, ChronoUnit.HOURS).toEpochMilli()))));
|
||||
logger.trace(() -> new ParameterizedMessage("Removing old tokens: [{}]", Strings.toString(expiredDbq)));
|
||||
executeAsyncWithOrigin(client, SECURITY_ORIGIN, DeleteByQueryAction.INSTANCE, expiredDbq,
|
||||
ActionListener.wrap(r -> {
|
||||
debugDbqResponse(r);
|
||||
|
@ -8,11 +8,8 @@ package org.elasticsearch.xpack.security.authc;
|
||||
import org.apache.logging.log4j.message.ParameterizedMessage;
|
||||
import org.apache.lucene.util.BytesRef;
|
||||
import org.apache.lucene.util.BytesRefBuilder;
|
||||
import org.elasticsearch.cluster.ClusterStateUpdateTask;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.common.Priority;
|
||||
import org.elasticsearch.core.internal.io.IOUtils;
|
||||
import org.apache.lucene.util.UnicodeUtil;
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.ElasticsearchSecurityException;
|
||||
import org.elasticsearch.ExceptionsHelper;
|
||||
import org.elasticsearch.Version;
|
||||
@ -36,9 +33,12 @@ import org.elasticsearch.action.update.UpdateResponse;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.ClusterStateUpdateTask;
|
||||
import org.elasticsearch.cluster.ack.AckedRequest;
|
||||
import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.Priority;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.UUIDs;
|
||||
import org.elasticsearch.common.cache.Cache;
|
||||
@ -60,15 +60,16 @@ import org.elasticsearch.common.util.concurrent.ThreadContext;
|
||||
import org.elasticsearch.common.util.iterable.Iterables;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentFactory;
|
||||
import org.elasticsearch.core.internal.io.IOUtils;
|
||||
import org.elasticsearch.index.engine.DocumentMissingException;
|
||||
import org.elasticsearch.index.engine.VersionConflictEngineException;
|
||||
import org.elasticsearch.index.query.BoolQueryBuilder;
|
||||
import org.elasticsearch.index.query.QueryBuilders;
|
||||
import org.elasticsearch.rest.RestStatus;
|
||||
import org.elasticsearch.search.SearchHit;
|
||||
import org.elasticsearch.xpack.core.XPackField;
|
||||
import org.elasticsearch.xpack.core.XPackPlugin;
|
||||
import org.elasticsearch.xpack.core.XPackSettings;
|
||||
import org.elasticsearch.xpack.core.XPackField;
|
||||
import org.elasticsearch.xpack.core.security.ScrollHelper;
|
||||
import org.elasticsearch.xpack.core.security.authc.Authentication;
|
||||
import org.elasticsearch.xpack.core.security.authc.KeyAndTimestamp;
|
||||
@ -114,6 +115,7 @@ import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
import static org.elasticsearch.action.support.TransportActions.isShardNotAvailableException;
|
||||
@ -159,6 +161,7 @@ public final class TokenService extends AbstractComponent {
|
||||
static final String INVALIDATED_TOKEN_DOC_TYPE = "invalidated-token";
|
||||
static final int MINIMUM_BYTES = VERSION_BYTES + SALT_BYTES + IV_BYTES + 1;
|
||||
private static final int MINIMUM_BASE64_BYTES = Double.valueOf(Math.ceil((4 * MINIMUM_BYTES) / 3)).intValue();
|
||||
private static final int MAX_RETRY_ATTEMPTS = 5;
|
||||
|
||||
private final SecureRandom secureRandom = new SecureRandom();
|
||||
private final ClusterService clusterService;
|
||||
@ -217,9 +220,10 @@ public final class TokenService extends AbstractComponent {
|
||||
boolean includeRefreshToken) throws IOException {
|
||||
ensureEnabled();
|
||||
if (authentication == null) {
|
||||
listener.onFailure(new IllegalArgumentException("authentication must be provided"));
|
||||
listener.onFailure(traceLog("create token", null, new IllegalArgumentException("authentication must be provided")));
|
||||
} else if (originatingClientAuth == null) {
|
||||
listener.onFailure(new IllegalArgumentException("originating client authentication must be provided"));
|
||||
listener.onFailure(traceLog("create token", null,
|
||||
new IllegalArgumentException("originating client authentication must be provided")));
|
||||
} else {
|
||||
final Instant created = clock.instant();
|
||||
final Instant expiration = getExpirationTime(created);
|
||||
@ -252,16 +256,17 @@ public final class TokenService extends AbstractComponent {
|
||||
.field("realm", authentication.getAuthenticatedBy().getName())
|
||||
.endObject();
|
||||
builder.endObject();
|
||||
final String documentId = getTokenDocumentId(userToken);
|
||||
IndexRequest request =
|
||||
client.prepareIndex(SecurityIndexManager.SECURITY_INDEX_NAME, TYPE, getTokenDocumentId(userToken))
|
||||
client.prepareIndex(SecurityIndexManager.SECURITY_INDEX_NAME, TYPE, documentId)
|
||||
.setOpType(OpType.CREATE)
|
||||
.setSource(builder)
|
||||
.setRefreshPolicy(RefreshPolicy.WAIT_UNTIL)
|
||||
.request();
|
||||
securityIndex.prepareIndexIfNeededThenExecute(listener::onFailure, () ->
|
||||
executeAsyncWithOrigin(client, SECURITY_ORIGIN, IndexAction.INSTANCE, request,
|
||||
ActionListener.wrap(indexResponse -> listener.onResponse(new Tuple<>(userToken, refreshToken)),
|
||||
listener::onFailure))
|
||||
securityIndex.prepareIndexIfNeededThenExecute(ex -> listener.onFailure(traceLog("prepare security index", documentId, ex)),
|
||||
() -> executeAsyncWithOrigin(client, SECURITY_ORIGIN, IndexAction.INSTANCE, request,
|
||||
ActionListener.wrap(indexResponse -> listener.onResponse(new Tuple<>(userToken, refreshToken)),
|
||||
listener::onFailure))
|
||||
);
|
||||
}
|
||||
}
|
||||
@ -323,7 +328,7 @@ public final class TokenService extends AbstractComponent {
|
||||
Instant currentTime = clock.instant();
|
||||
if (currentTime.isAfter(userToken.getExpirationTime())) {
|
||||
// token expired
|
||||
listener.onFailure(expiredTokenException());
|
||||
listener.onFailure(traceLog("decode token", token, expiredTokenException()));
|
||||
} else {
|
||||
checkIfTokenIsRevoked(userToken, listener);
|
||||
}
|
||||
@ -366,42 +371,44 @@ public final class TokenService extends AbstractComponent {
|
||||
logger.warn("failed to get token [{}] since index is not available", tokenId);
|
||||
listener.onResponse(null);
|
||||
} else {
|
||||
securityIndex.checkIndexVersionThenExecute(listener::onFailure, () -> {
|
||||
final GetRequest getRequest =
|
||||
client.prepareGet(SecurityIndexManager.SECURITY_INDEX_NAME, TYPE,
|
||||
getTokenDocumentId(tokenId)).request();
|
||||
executeAsyncWithOrigin(client.threadPool().getThreadContext(), SECURITY_ORIGIN, getRequest,
|
||||
ActionListener.<GetResponse>wrap(response -> {
|
||||
if (response.isExists()) {
|
||||
Map<String, Object> accessTokenSource =
|
||||
(Map<String, Object>) response.getSource().get("access_token");
|
||||
if (accessTokenSource == null) {
|
||||
listener.onFailure(new IllegalStateException("token document is missing " +
|
||||
"the access_token field"));
|
||||
} else if (accessTokenSource.containsKey("user_token") == false) {
|
||||
listener.onFailure(new IllegalStateException("token document is missing " +
|
||||
"the user_token field"));
|
||||
securityIndex.checkIndexVersionThenExecute(
|
||||
ex -> listener.onFailure(traceLog("prepare security index", tokenId, ex)),
|
||||
() -> {
|
||||
final GetRequest getRequest = client.prepareGet(SecurityIndexManager.SECURITY_INDEX_NAME, TYPE,
|
||||
getTokenDocumentId(tokenId)).request();
|
||||
Consumer<Exception> onFailure = ex -> listener.onFailure(traceLog("decode token", tokenId, ex));
|
||||
executeAsyncWithOrigin(client.threadPool().getThreadContext(), SECURITY_ORIGIN, getRequest,
|
||||
ActionListener.<GetResponse>wrap(response -> {
|
||||
if (response.isExists()) {
|
||||
Map<String, Object> accessTokenSource =
|
||||
(Map<String, Object>) response.getSource().get("access_token");
|
||||
if (accessTokenSource == null) {
|
||||
onFailure.accept(new IllegalStateException(
|
||||
"token document is missing the access_token field"));
|
||||
} else if (accessTokenSource.containsKey("user_token") == false) {
|
||||
onFailure.accept(new IllegalStateException(
|
||||
"token document is missing the user_token field"));
|
||||
} else {
|
||||
Map<String, Object> userTokenSource =
|
||||
(Map<String, Object>) accessTokenSource.get("user_token");
|
||||
listener.onResponse(UserToken.fromSourceMap(userTokenSource));
|
||||
}
|
||||
} else {
|
||||
Map<String, Object> userTokenSource =
|
||||
(Map<String, Object>) accessTokenSource.get("user_token");
|
||||
listener.onResponse(UserToken.fromSourceMap(userTokenSource));
|
||||
onFailure.accept(
|
||||
new IllegalStateException("token document is missing and must be present"));
|
||||
}
|
||||
} else {
|
||||
listener.onFailure(
|
||||
new IllegalStateException("token document is missing and must be present"));
|
||||
}
|
||||
}, e -> {
|
||||
// if the index or the shard is not there / available we assume that
|
||||
// the token is not valid
|
||||
if (isShardNotAvailableException(e)) {
|
||||
logger.warn("failed to get token [{}] since index is not available", tokenId);
|
||||
listener.onResponse(null);
|
||||
} else {
|
||||
logger.error(new ParameterizedMessage("failed to get token [{}]", tokenId), e);
|
||||
listener.onFailure(e);
|
||||
}
|
||||
}), client::get);
|
||||
});
|
||||
}, e -> {
|
||||
// if the index or the shard is not there / available we assume that
|
||||
// the token is not valid
|
||||
if (isShardNotAvailableException(e)) {
|
||||
logger.warn("failed to get token [{}] since index is not available", tokenId);
|
||||
listener.onResponse(null);
|
||||
} else {
|
||||
logger.error(new ParameterizedMessage("failed to get token [{}]", tokenId), e);
|
||||
listener.onFailure(e);
|
||||
}
|
||||
}), client::get);
|
||||
});
|
||||
}}, listener::onFailure));
|
||||
} else {
|
||||
decryptToken(in, cipher, version, listener);
|
||||
@ -466,13 +473,14 @@ public final class TokenService extends AbstractComponent {
|
||||
public void invalidateAccessToken(String tokenString, ActionListener<Boolean> listener) {
|
||||
ensureEnabled();
|
||||
if (Strings.isNullOrEmpty(tokenString)) {
|
||||
logger.trace("No token-string provided");
|
||||
listener.onFailure(new IllegalArgumentException("token must be provided"));
|
||||
} else {
|
||||
maybeStartTokenRemover();
|
||||
try {
|
||||
decodeToken(tokenString, ActionListener.wrap(userToken -> {
|
||||
if (userToken == null) {
|
||||
listener.onFailure(malformedTokenException());
|
||||
listener.onFailure(traceLog("invalidate token", tokenString, malformedTokenException()));
|
||||
} else {
|
||||
final long expirationEpochMilli = getExpirationTime().toEpochMilli();
|
||||
indexBwcInvalidation(userToken, listener, new AtomicInteger(0), expirationEpochMilli);
|
||||
@ -493,6 +501,7 @@ public final class TokenService extends AbstractComponent {
|
||||
public void invalidateAccessToken(UserToken userToken, ActionListener<Boolean> listener) {
|
||||
ensureEnabled();
|
||||
if (userToken == null) {
|
||||
logger.trace("No access token provided");
|
||||
listener.onFailure(new IllegalArgumentException("token must be provided"));
|
||||
} else {
|
||||
maybeStartTokenRemover();
|
||||
@ -504,6 +513,7 @@ public final class TokenService extends AbstractComponent {
|
||||
public void invalidateRefreshToken(String refreshToken, ActionListener<Boolean> listener) {
|
||||
ensureEnabled();
|
||||
if (Strings.isNullOrEmpty(refreshToken)) {
|
||||
logger.trace("No refresh token provided");
|
||||
listener.onFailure(new IllegalArgumentException("refresh token must be provided"));
|
||||
} else {
|
||||
maybeStartTokenRemover();
|
||||
@ -526,7 +536,8 @@ public final class TokenService extends AbstractComponent {
|
||||
*/
|
||||
private void indexBwcInvalidation(UserToken userToken, ActionListener<Boolean> listener, AtomicInteger attemptCount,
|
||||
long expirationEpochMilli) {
|
||||
if (attemptCount.get() > 5) {
|
||||
if (attemptCount.get() > MAX_RETRY_ATTEMPTS) {
|
||||
logger.warn("Failed to invalidate token [{}] after [{}] attempts", userToken.getId(), attemptCount.get());
|
||||
listener.onFailure(invalidGrantException("failed to invalidate token"));
|
||||
} else {
|
||||
final String invalidatedTokenId = getInvalidatedTokenDocumentId(userToken);
|
||||
@ -537,14 +548,15 @@ public final class TokenService extends AbstractComponent {
|
||||
.request();
|
||||
final String tokenDocId = getTokenDocumentId(userToken);
|
||||
final Version version = userToken.getVersion();
|
||||
securityIndex.prepareIndexIfNeededThenExecute(listener::onFailure, () ->
|
||||
executeAsyncWithOrigin(client.threadPool().getThreadContext(), SECURITY_ORIGIN, indexRequest,
|
||||
securityIndex.prepareIndexIfNeededThenExecute(ex -> listener.onFailure(traceLog("prepare security index", tokenDocId, ex)),
|
||||
() -> executeAsyncWithOrigin(client.threadPool().getThreadContext(), SECURITY_ORIGIN, indexRequest,
|
||||
ActionListener.<IndexResponse>wrap(indexResponse -> {
|
||||
ActionListener<Boolean> wrappedListener =
|
||||
ActionListener.wrap(ignore -> listener.onResponse(true), listener::onFailure);
|
||||
indexInvalidation(tokenDocId, version, wrappedListener, attemptCount, "access_token", 1L);
|
||||
}, e -> {
|
||||
Throwable cause = ExceptionsHelper.unwrapCause(e);
|
||||
traceLog("(bwc) invalidate token", tokenDocId, cause);
|
||||
if (cause instanceof VersionConflictEngineException) {
|
||||
// expected since something else could have invalidated
|
||||
ActionListener<Boolean> wrappedListener =
|
||||
@ -571,7 +583,8 @@ public final class TokenService extends AbstractComponent {
|
||||
*/
|
||||
private void indexInvalidation(String tokenDocId, Version version, ActionListener<Boolean> listener, AtomicInteger attemptCount,
|
||||
String srcPrefix, long documentVersion) {
|
||||
if (attemptCount.get() > 5) {
|
||||
if (attemptCount.get() > MAX_RETRY_ATTEMPTS) {
|
||||
logger.warn("Failed to invalidate token [{}] after [{}] attempts", tokenDocId, attemptCount.get());
|
||||
listener.onFailure(invalidGrantException("failed to invalidate token"));
|
||||
} else {
|
||||
UpdateRequest request = client.prepareUpdate(SecurityIndexManager.SECURITY_INDEX_NAME, TYPE, tokenDocId)
|
||||
@ -579,75 +592,79 @@ public final class TokenService extends AbstractComponent {
|
||||
.setVersion(documentVersion)
|
||||
.setRefreshPolicy(RefreshPolicy.WAIT_UNTIL)
|
||||
.request();
|
||||
securityIndex.prepareIndexIfNeededThenExecute(listener::onFailure, () ->
|
||||
executeAsyncWithOrigin(client.threadPool().getThreadContext(), SECURITY_ORIGIN, request,
|
||||
ActionListener.<UpdateResponse>wrap(updateResponse -> {
|
||||
if (updateResponse.getGetResult() != null
|
||||
&& updateResponse.getGetResult().sourceAsMap().containsKey(srcPrefix)
|
||||
&& ((Map<String, Object>) updateResponse.getGetResult().sourceAsMap().get(srcPrefix))
|
||||
.containsKey("invalidated")) {
|
||||
final boolean prevInvalidated = (boolean)
|
||||
((Map<String, Object>) updateResponse.getGetResult().sourceAsMap().get(srcPrefix))
|
||||
.get("invalidated");
|
||||
listener.onResponse(prevInvalidated == false);
|
||||
} else {
|
||||
listener.onResponse(true);
|
||||
}
|
||||
}, e -> {
|
||||
Throwable cause = ExceptionsHelper.unwrapCause(e);
|
||||
if (cause instanceof DocumentMissingException) {
|
||||
if (version.onOrAfter(Version.V_6_2_0)) {
|
||||
// the document should always be there!
|
||||
listener.onFailure(e);
|
||||
} else {
|
||||
listener.onResponse(false);
|
||||
}
|
||||
} else if (cause instanceof VersionConflictEngineException
|
||||
|| isShardNotAvailableException(cause)) {
|
||||
attemptCount.incrementAndGet();
|
||||
executeAsyncWithOrigin(client.threadPool().getThreadContext(), SECURITY_ORIGIN,
|
||||
client.prepareGet(SecurityIndexManager.SECURITY_INDEX_NAME, TYPE, tokenDocId).request(),
|
||||
ActionListener.<GetResponse>wrap(getResult -> {
|
||||
if (getResult.isExists()) {
|
||||
Map<String, Object> source = getResult.getSource();
|
||||
Map<String, Object> accessTokenSource =
|
||||
(Map<String, Object>) source.get("access_token");
|
||||
if (accessTokenSource == null) {
|
||||
listener.onFailure(new IllegalArgumentException("token document is " +
|
||||
"missing access_token field"));
|
||||
} else {
|
||||
Boolean invalidated = (Boolean) accessTokenSource.get("invalidated");
|
||||
if (invalidated == null) {
|
||||
listener.onFailure(new IllegalStateException(
|
||||
"token document missing invalidated value"));
|
||||
} else if (invalidated) {
|
||||
listener.onResponse(false);
|
||||
} else {
|
||||
indexInvalidation(tokenDocId, version, listener, attemptCount, srcPrefix,
|
||||
getResult.getVersion());
|
||||
}
|
||||
}
|
||||
} else if (version.onOrAfter(Version.V_6_2_0)) {
|
||||
logger.warn("could not find token document [{}] but there should " +
|
||||
"be one as token has version [{}]", tokenDocId, version);
|
||||
listener.onFailure(invalidGrantException("could not invalidate the token"));
|
||||
} else {
|
||||
listener.onResponse(false);
|
||||
}
|
||||
},
|
||||
e1 -> {
|
||||
if (isShardNotAvailableException(e1)) {
|
||||
// don't increment count; call again
|
||||
indexInvalidation(tokenDocId, version, listener, attemptCount, srcPrefix,
|
||||
documentVersion);
|
||||
} else {
|
||||
listener.onFailure(e1);
|
||||
}
|
||||
}), client::get);
|
||||
} else {
|
||||
securityIndex.prepareIndexIfNeededThenExecute(ex -> listener.onFailure(traceLog("prepare security index", tokenDocId, ex)),
|
||||
() -> executeAsyncWithOrigin(client.threadPool().getThreadContext(), SECURITY_ORIGIN, request,
|
||||
ActionListener.<UpdateResponse>wrap(updateResponse -> {
|
||||
logger.debug("Invalidated [{}] for doc [{}]", srcPrefix, tokenDocId);
|
||||
if (updateResponse.getGetResult() != null
|
||||
&& updateResponse.getGetResult().sourceAsMap().containsKey(srcPrefix)
|
||||
&& ((Map<String, Object>) updateResponse.getGetResult().sourceAsMap().get(srcPrefix))
|
||||
.containsKey("invalidated")) {
|
||||
final boolean prevInvalidated = (boolean)
|
||||
((Map<String, Object>) updateResponse.getGetResult().sourceAsMap().get(srcPrefix))
|
||||
.get("invalidated");
|
||||
listener.onResponse(prevInvalidated == false);
|
||||
} else {
|
||||
listener.onResponse(true);
|
||||
}
|
||||
}, e -> {
|
||||
Throwable cause = ExceptionsHelper.unwrapCause(e);
|
||||
traceLog("invalidate token", tokenDocId, cause);
|
||||
if (cause instanceof DocumentMissingException) {
|
||||
if (version.onOrAfter(Version.V_6_2_0)) {
|
||||
// the document should always be there!
|
||||
listener.onFailure(e);
|
||||
} else {
|
||||
listener.onResponse(false);
|
||||
}
|
||||
}), client::update));
|
||||
} else if (cause instanceof VersionConflictEngineException
|
||||
|| isShardNotAvailableException(cause)) {
|
||||
attemptCount.incrementAndGet();
|
||||
executeAsyncWithOrigin(client.threadPool().getThreadContext(), SECURITY_ORIGIN,
|
||||
client.prepareGet(SecurityIndexManager.SECURITY_INDEX_NAME, TYPE, tokenDocId).request(),
|
||||
ActionListener.<GetResponse>wrap(getResult -> {
|
||||
if (getResult.isExists()) {
|
||||
Map<String, Object> source = getResult.getSource();
|
||||
Map<String, Object> accessTokenSource = (Map<String, Object>) source.get("access_token");
|
||||
Consumer<Exception> onFailure = ex -> listener.onFailure(traceLog("get token", tokenDocId, ex));
|
||||
if (accessTokenSource == null) {
|
||||
onFailure.accept(new IllegalArgumentException(
|
||||
"token document is missing access_token field"));
|
||||
} else {
|
||||
Boolean invalidated = (Boolean) accessTokenSource.get("invalidated");
|
||||
if (invalidated == null) {
|
||||
onFailure.accept(new IllegalStateException(
|
||||
"token document missing invalidated value"));
|
||||
} else if (invalidated) {
|
||||
logger.trace("Token [{}] is already invalidated", tokenDocId);
|
||||
listener.onResponse(false);
|
||||
} else {
|
||||
indexInvalidation(tokenDocId, version, listener, attemptCount, srcPrefix,
|
||||
getResult.getVersion());
|
||||
}
|
||||
}
|
||||
} else if (version.onOrAfter(Version.V_6_2_0)) {
|
||||
logger.warn("could not find token document [{}] but there should " +
|
||||
"be one as token has version [{}]", tokenDocId, version);
|
||||
listener.onFailure(invalidGrantException("could not invalidate the token"));
|
||||
} else {
|
||||
listener.onResponse(false);
|
||||
}
|
||||
},
|
||||
e1 -> {
|
||||
traceLog("get token", tokenDocId, e1);
|
||||
if (isShardNotAvailableException(e1)) {
|
||||
// don't increment count; call again
|
||||
indexInvalidation(tokenDocId, version, listener, attemptCount, srcPrefix,
|
||||
documentVersion);
|
||||
} else {
|
||||
listener.onFailure(e1);
|
||||
}
|
||||
}), client::get);
|
||||
} else {
|
||||
listener.onFailure(e);
|
||||
}
|
||||
}), client::update));
|
||||
}
|
||||
}
|
||||
|
||||
@ -668,7 +685,8 @@ public final class TokenService extends AbstractComponent {
|
||||
|
||||
private void findTokenFromRefreshToken(String refreshToken, ActionListener<Tuple<SearchResponse, AtomicInteger>> listener,
|
||||
AtomicInteger attemptCount) {
|
||||
if (attemptCount.get() > 5) {
|
||||
if (attemptCount.get() > MAX_RETRY_ATTEMPTS) {
|
||||
logger.warn("Failed to find token for refresh token [{}] after [{}] attempts", refreshToken, attemptCount.get());
|
||||
listener.onFailure(invalidGrantException("could not refresh the requested token"));
|
||||
} else {
|
||||
SearchRequest request = client.prepareSearch(SecurityIndexManager.SECURITY_INDEX_NAME)
|
||||
@ -683,6 +701,7 @@ public final class TokenService extends AbstractComponent {
|
||||
attemptCount.incrementAndGet();
|
||||
findTokenFromRefreshToken(refreshToken, listener, attemptCount);
|
||||
} else {
|
||||
Consumer<Exception> onFailure = ex -> listener.onFailure(traceLog("find by refresh token", refreshToken, ex));
|
||||
securityIndex.checkIndexVersionThenExecute(listener::onFailure, () ->
|
||||
executeAsyncWithOrigin(client.threadPool().getThreadContext(), SECURITY_ORIGIN, request,
|
||||
ActionListener.<SearchResponse>wrap(searchResponse -> {
|
||||
@ -691,9 +710,9 @@ public final class TokenService extends AbstractComponent {
|
||||
findTokenFromRefreshToken(refreshToken, listener, attemptCount);
|
||||
} else if (searchResponse.getHits().getHits().length < 1) {
|
||||
logger.info("could not find token document with refresh_token [{}]", refreshToken);
|
||||
listener.onFailure(invalidGrantException("could not refresh the requested token"));
|
||||
onFailure.accept(invalidGrantException("could not refresh the requested token"));
|
||||
} else if (searchResponse.getHits().getHits().length > 1) {
|
||||
listener.onFailure(new IllegalStateException("multiple tokens share the same refresh token"));
|
||||
onFailure.accept(new IllegalStateException("multiple tokens share the same refresh token"));
|
||||
} else {
|
||||
listener.onResponse(new Tuple<>(searchResponse, attemptCount));
|
||||
}
|
||||
@ -703,7 +722,7 @@ public final class TokenService extends AbstractComponent {
|
||||
attemptCount.incrementAndGet();
|
||||
findTokenFromRefreshToken(refreshToken, listener, attemptCount);
|
||||
} else {
|
||||
listener.onFailure(e);
|
||||
onFailure.accept(e);
|
||||
}
|
||||
}),
|
||||
client::search));
|
||||
@ -718,62 +737,64 @@ public final class TokenService extends AbstractComponent {
|
||||
*/
|
||||
private void innerRefresh(String tokenDocId, Authentication userAuth, ActionListener<Tuple<UserToken, String>> listener,
|
||||
AtomicInteger attemptCount) {
|
||||
if (attemptCount.getAndIncrement() > 5) {
|
||||
if (attemptCount.getAndIncrement() > MAX_RETRY_ATTEMPTS) {
|
||||
logger.warn("Failed to refresh token for doc [{}] after [{}] attempts", tokenDocId, attemptCount.get());
|
||||
listener.onFailure(invalidGrantException("could not refresh the requested token"));
|
||||
} else {
|
||||
Consumer<Exception> onFailure = ex -> listener.onFailure(traceLog("refresh token", tokenDocId, ex));
|
||||
GetRequest getRequest = client.prepareGet(SecurityIndexManager.SECURITY_INDEX_NAME, TYPE, tokenDocId).request();
|
||||
executeAsyncWithOrigin(client.threadPool().getThreadContext(), SECURITY_ORIGIN, getRequest,
|
||||
ActionListener.<GetResponse>wrap(response -> {
|
||||
if (response.isExists()) {
|
||||
final Map<String, Object> source = response.getSource();
|
||||
final Optional<ElasticsearchSecurityException> invalidSource = checkTokenDocForRefresh(source, userAuth);
|
||||
ActionListener.<GetResponse>wrap(response -> {
|
||||
if (response.isExists()) {
|
||||
final Map<String, Object> source = response.getSource();
|
||||
final Optional<ElasticsearchSecurityException> invalidSource = checkTokenDocForRefresh(source, userAuth);
|
||||
|
||||
if (invalidSource.isPresent()) {
|
||||
listener.onFailure(invalidSource.get());
|
||||
} else {
|
||||
final Map<String, Object> userTokenSource = (Map<String, Object>)
|
||||
((Map<String, Object>) source.get("access_token")).get("user_token");
|
||||
final String authString = (String) userTokenSource.get("authentication");
|
||||
final Integer version = (Integer) userTokenSource.get("version");
|
||||
final Map<String, Object> metadata = (Map<String, Object>) userTokenSource.get("metadata");
|
||||
if (invalidSource.isPresent()) {
|
||||
onFailure.accept(invalidSource.get());
|
||||
} else {
|
||||
final Map<String, Object> userTokenSource = (Map<String, Object>)
|
||||
((Map<String, Object>) source.get("access_token")).get("user_token");
|
||||
final String authString = (String) userTokenSource.get("authentication");
|
||||
final Integer version = (Integer) userTokenSource.get("version");
|
||||
final Map<String, Object> metadata = (Map<String, Object>) userTokenSource.get("metadata");
|
||||
|
||||
Version authVersion = Version.fromId(version);
|
||||
try (StreamInput in = StreamInput.wrap(Base64.getDecoder().decode(authString))) {
|
||||
in.setVersion(authVersion);
|
||||
Authentication authentication = new Authentication(in);
|
||||
UpdateRequest updateRequest =
|
||||
client.prepareUpdate(SecurityIndexManager.SECURITY_INDEX_NAME, TYPE, tokenDocId)
|
||||
.setVersion(response.getVersion())
|
||||
.setDoc("refresh_token", Collections.singletonMap("refreshed", true))
|
||||
.setRefreshPolicy(RefreshPolicy.WAIT_UNTIL)
|
||||
.request();
|
||||
executeAsyncWithOrigin(client.threadPool().getThreadContext(), SECURITY_ORIGIN, updateRequest,
|
||||
ActionListener.<UpdateResponse>wrap(
|
||||
updateResponse -> createUserToken(authentication, userAuth, listener, metadata, true),
|
||||
e -> {
|
||||
Throwable cause = ExceptionsHelper.unwrapCause(e);
|
||||
if (cause instanceof VersionConflictEngineException ||
|
||||
isShardNotAvailableException(e)) {
|
||||
innerRefresh(tokenDocId, userAuth,
|
||||
listener, attemptCount);
|
||||
} else {
|
||||
listener.onFailure(e);
|
||||
}
|
||||
}),
|
||||
client::update);
|
||||
}
|
||||
Version authVersion = Version.fromId(version);
|
||||
try (StreamInput in = StreamInput.wrap(Base64.getDecoder().decode(authString))) {
|
||||
in.setVersion(authVersion);
|
||||
Authentication authentication = new Authentication(in);
|
||||
UpdateRequest updateRequest =
|
||||
client.prepareUpdate(SecurityIndexManager.SECURITY_INDEX_NAME, TYPE, tokenDocId)
|
||||
.setVersion(response.getVersion())
|
||||
.setDoc("refresh_token", Collections.singletonMap("refreshed", true))
|
||||
.setRefreshPolicy(RefreshPolicy.WAIT_UNTIL)
|
||||
.request();
|
||||
executeAsyncWithOrigin(client.threadPool().getThreadContext(), SECURITY_ORIGIN, updateRequest,
|
||||
ActionListener.<UpdateResponse>wrap(
|
||||
updateResponse -> createUserToken(authentication, userAuth, listener, metadata, true),
|
||||
e -> {
|
||||
Throwable cause = ExceptionsHelper.unwrapCause(e);
|
||||
if (cause instanceof VersionConflictEngineException ||
|
||||
isShardNotAvailableException(e)) {
|
||||
innerRefresh(tokenDocId, userAuth,
|
||||
listener, attemptCount);
|
||||
} else {
|
||||
onFailure.accept(e);
|
||||
}
|
||||
}),
|
||||
client::update);
|
||||
}
|
||||
} else {
|
||||
logger.info("could not find token document [{}] for refresh", tokenDocId);
|
||||
listener.onFailure(invalidGrantException("could not refresh the requested token"));
|
||||
}
|
||||
}, e -> {
|
||||
if (isShardNotAvailableException(e)) {
|
||||
innerRefresh(tokenDocId, userAuth, listener, attemptCount);
|
||||
} else {
|
||||
listener.onFailure(e);
|
||||
}
|
||||
}), client::get);
|
||||
} else {
|
||||
logger.info("could not find token document [{}] for refresh", tokenDocId);
|
||||
onFailure.accept(invalidGrantException("could not refresh the requested token"));
|
||||
}
|
||||
}, e -> {
|
||||
if (isShardNotAvailableException(e)) {
|
||||
innerRefresh(tokenDocId, userAuth, listener, attemptCount);
|
||||
} else {
|
||||
listener.onFailure(e);
|
||||
}
|
||||
}), client::get);
|
||||
}
|
||||
}
|
||||
|
||||
@ -945,6 +966,7 @@ public final class TokenService extends AbstractComponent {
|
||||
.add(SecurityIndexManager.SECURITY_INDEX_NAME, TYPE, getInvalidatedTokenDocumentId(userToken))
|
||||
.add(SecurityIndexManager.SECURITY_INDEX_NAME, TYPE, getTokenDocumentId(userToken))
|
||||
.request();
|
||||
Consumer<Exception> onFailure = ex -> listener.onFailure(traceLog("check token state", userToken.getId(), ex));
|
||||
executeAsyncWithOrigin(client.threadPool().getThreadContext(), SECURITY_ORIGIN,
|
||||
mGetRequest,
|
||||
new ActionListener<MultiGetResponse>() {
|
||||
@ -955,26 +977,26 @@ public final class TokenService extends AbstractComponent {
|
||||
if (itemResponse[0].isFailed()) {
|
||||
onFailure(itemResponse[0].getFailure().getFailure());
|
||||
} else if (itemResponse[0].getResponse().isExists()) {
|
||||
listener.onFailure(expiredTokenException());
|
||||
onFailure.accept(expiredTokenException());
|
||||
} else if (itemResponse[1].isFailed()) {
|
||||
onFailure(itemResponse[1].getFailure().getFailure());
|
||||
} else if (itemResponse[1].getResponse().isExists()) {
|
||||
Map<String, Object> source = itemResponse[1].getResponse().getSource();
|
||||
Map<String, Object> accessTokenSource = (Map<String, Object>) source.get("access_token");
|
||||
if (accessTokenSource == null) {
|
||||
listener.onFailure(new IllegalStateException("token document is missing access_token field"));
|
||||
onFailure.accept(new IllegalStateException("token document is missing access_token field"));
|
||||
} else {
|
||||
Boolean invalidated = (Boolean) accessTokenSource.get("invalidated");
|
||||
if (invalidated == null) {
|
||||
listener.onFailure(new IllegalStateException("token document is missing invalidated field"));
|
||||
onFailure.accept(new IllegalStateException("token document is missing invalidated field"));
|
||||
} else if (invalidated) {
|
||||
listener.onFailure(expiredTokenException());
|
||||
onFailure.accept(expiredTokenException());
|
||||
} else {
|
||||
listener.onResponse(userToken);
|
||||
}
|
||||
}
|
||||
} else if (userToken.getVersion().onOrAfter(Version.V_6_2_0)) {
|
||||
listener.onFailure(new IllegalStateException("token document is missing and must be present"));
|
||||
onFailure.accept(new IllegalStateException("token document is missing and must be present"));
|
||||
} else {
|
||||
listener.onResponse(userToken);
|
||||
}
|
||||
@ -1136,11 +1158,31 @@ public final class TokenService extends AbstractComponent {
|
||||
*/
|
||||
private static ElasticsearchSecurityException invalidGrantException(String detail) {
|
||||
ElasticsearchSecurityException e =
|
||||
new ElasticsearchSecurityException("invalid_grant", RestStatus.BAD_REQUEST);
|
||||
new ElasticsearchSecurityException("invalid_grant", RestStatus.BAD_REQUEST);
|
||||
e.addHeader("error_description", detail);
|
||||
return e;
|
||||
}
|
||||
|
||||
/**
|
||||
* Logs an exception at TRACE level (if enabled)
|
||||
*/
|
||||
private <E extends Throwable> E traceLog(String action, String identifier, E exception) {
|
||||
if (logger.isTraceEnabled()) {
|
||||
if (exception instanceof ElasticsearchException) {
|
||||
final ElasticsearchException esEx = (ElasticsearchException) exception;
|
||||
final Object detail = esEx.getHeader("error_description");
|
||||
if (detail != null) {
|
||||
logger.trace("Failure in [{}] for id [{}] - [{}] [{}]", action, identifier, detail, esEx.getDetailedMessage());
|
||||
} else {
|
||||
logger.trace("Failure in [{}] for id [{}] - [{}]", action, identifier, esEx.getDetailedMessage());
|
||||
}
|
||||
} else {
|
||||
logger.trace("Failure in [{}] for id [{}] - [{}]", action, identifier, exception.toString());
|
||||
}
|
||||
}
|
||||
return exception;
|
||||
}
|
||||
|
||||
boolean isExpiredTokenException(ElasticsearchSecurityException e) {
|
||||
final List<String> headers = e.getHeader("WWW-Authenticate");
|
||||
return headers != null && headers.stream().anyMatch(EXPIRED_TOKEN_WWW_AUTH_VALUE::equals);
|
||||
|
@ -104,6 +104,7 @@ public final class RestGetTokenAction extends SecurityBaseRestHandler {
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
logger.debug("Failed to create token", e);
|
||||
if (e instanceof ActionRequestValidationException) {
|
||||
ActionRequestValidationException validationException = (ActionRequestValidationException) e;
|
||||
final TokenRequestError error;
|
||||
|
Loading…
x
Reference in New Issue
Block a user