mirror of
https://github.com/honeymoose/OpenSearch.git
synced 2025-02-17 10:25:15 +00:00
Fix Token Service retry mechanism (#39639)
Fixes several errors of the token retry logic: * not checking for backoff.hasNext() before calling backoff.next() * checking for backoff.hasNext() without calling backoff.next() * not preserving the context on the retry * calling scheduleWithFixedDelay instead of schedule
This commit is contained in:
parent
6c503824c8
commit
fb1005fffc
@ -201,7 +201,6 @@ public final class TokenService {
|
|||||||
SecurityIndexManager securityIndex, ClusterService clusterService) throws GeneralSecurityException {
|
SecurityIndexManager securityIndex, ClusterService clusterService) throws GeneralSecurityException {
|
||||||
byte[] saltArr = new byte[SALT_BYTES];
|
byte[] saltArr = new byte[SALT_BYTES];
|
||||||
secureRandom.nextBytes(saltArr);
|
secureRandom.nextBytes(saltArr);
|
||||||
|
|
||||||
final SecureString tokenPassphrase = generateTokenKey();
|
final SecureString tokenPassphrase = generateTokenKey();
|
||||||
this.settings = settings;
|
this.settings = settings;
|
||||||
this.clock = clock.withZone(ZoneOffset.UTC);
|
this.clock = clock.withZone(ZoneOffset.UTC);
|
||||||
@ -683,19 +682,19 @@ public final class TokenService {
|
|||||||
if (retryTokenDocIds.isEmpty() == false) {
|
if (retryTokenDocIds.isEmpty() == false) {
|
||||||
if (backoff.hasNext()) {
|
if (backoff.hasNext()) {
|
||||||
logger.debug("failed to invalidate [{}] tokens out of [{}], retrying to invalidate these too",
|
logger.debug("failed to invalidate [{}] tokens out of [{}], retrying to invalidate these too",
|
||||||
retryTokenDocIds.size(), tokenIds.size());
|
retryTokenDocIds.size(), tokenIds.size());
|
||||||
TokensInvalidationResult incompleteResult = new TokensInvalidationResult(invalidated, previouslyInvalidated,
|
final TokensInvalidationResult incompleteResult = new TokensInvalidationResult(invalidated,
|
||||||
failedRequestResponses);
|
previouslyInvalidated, failedRequestResponses);
|
||||||
client.threadPool().schedule(
|
final Runnable retryWithContextRunnable = client.threadPool().getThreadContext().preserveContext(
|
||||||
() -> indexInvalidation(retryTokenDocIds, listener, backoff, srcPrefix, incompleteResult),
|
() -> indexInvalidation(retryTokenDocIds, listener, backoff, srcPrefix, incompleteResult));
|
||||||
backoff.next(), GENERIC);
|
client.threadPool().schedule(retryWithContextRunnable, backoff.next(), GENERIC);
|
||||||
} else {
|
} else {
|
||||||
logger.warn("failed to invalidate [{}] tokens out of [{}] after all retries",
|
logger.warn("failed to invalidate [{}] tokens out of [{}] after all retries", retryTokenDocIds.size(),
|
||||||
retryTokenDocIds.size(), tokenIds.size());
|
tokenIds.size());
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
TokensInvalidationResult result = new TokensInvalidationResult(invalidated, previouslyInvalidated,
|
final TokensInvalidationResult result = new TokensInvalidationResult(invalidated, previouslyInvalidated,
|
||||||
failedRequestResponses);
|
failedRequestResponses);
|
||||||
listener.onResponse(result);
|
listener.onResponse(result);
|
||||||
}
|
}
|
||||||
}, e -> {
|
}, e -> {
|
||||||
@ -703,8 +702,9 @@ public final class TokenService {
|
|||||||
traceLog("invalidate tokens", cause);
|
traceLog("invalidate tokens", cause);
|
||||||
if (isShardNotAvailableException(cause) && backoff.hasNext()) {
|
if (isShardNotAvailableException(cause) && backoff.hasNext()) {
|
||||||
logger.debug("failed to invalidate tokens, retrying ");
|
logger.debug("failed to invalidate tokens, retrying ");
|
||||||
client.threadPool().schedule(
|
final Runnable retryWithContextRunnable = client.threadPool().getThreadContext()
|
||||||
() -> indexInvalidation(tokenIds, listener, backoff, srcPrefix, previousResult), backoff.next(), GENERIC);
|
.preserveContext(() -> indexInvalidation(tokenIds, listener, backoff, srcPrefix, previousResult));
|
||||||
|
client.threadPool().schedule(retryWithContextRunnable, backoff.next(), GENERIC);
|
||||||
} else {
|
} else {
|
||||||
listener.onFailure(e);
|
listener.onFailure(e);
|
||||||
}
|
}
|
||||||
@ -736,34 +736,39 @@ public final class TokenService {
|
|||||||
*/
|
*/
|
||||||
private void findTokenFromRefreshToken(String refreshToken, ActionListener<SearchResponse> listener,
|
private void findTokenFromRefreshToken(String refreshToken, ActionListener<SearchResponse> listener,
|
||||||
Iterator<TimeValue> backoff) {
|
Iterator<TimeValue> backoff) {
|
||||||
SearchRequest request = client.prepareSearch(SECURITY_INDEX_NAME)
|
final Consumer<Exception> onFailure = ex -> listener.onFailure(traceLog("find token by refresh token", refreshToken, ex));
|
||||||
.setQuery(QueryBuilders.boolQuery()
|
final Consumer<Exception> maybeRetryOnFailure = ex -> {
|
||||||
.filter(QueryBuilders.termQuery("doc_type", TOKEN_DOC_TYPE))
|
if (backoff.hasNext()) {
|
||||||
.filter(QueryBuilders.termQuery("refresh_token.token", refreshToken)))
|
final TimeValue backofTimeValue = backoff.next();
|
||||||
.seqNoAndPrimaryTerm(true)
|
logger.debug("retrying after [" + backofTimeValue + "] back off");
|
||||||
.request();
|
final Runnable retryWithContextRunnable = client.threadPool().getThreadContext()
|
||||||
|
.preserveContext(() -> findTokenFromRefreshToken(refreshToken, listener, backoff));
|
||||||
|
client.threadPool().schedule(retryWithContextRunnable, backofTimeValue, GENERIC);
|
||||||
|
} else {
|
||||||
|
logger.warn("failed to find token from refresh token after all retries");
|
||||||
|
onFailure.accept(ex);
|
||||||
|
}
|
||||||
|
};
|
||||||
final SecurityIndexManager frozenSecurityIndex = securityIndex.freeze();
|
final SecurityIndexManager frozenSecurityIndex = securityIndex.freeze();
|
||||||
if (frozenSecurityIndex.indexExists() == false) {
|
if (frozenSecurityIndex.indexExists() == false) {
|
||||||
logger.warn("security index does not exist therefore refresh token [{}] cannot be validated", refreshToken);
|
logger.warn("security index does not exist therefore refresh token [{}] cannot be validated", refreshToken);
|
||||||
listener.onFailure(invalidGrantException("could not refresh the requested token"));
|
listener.onFailure(invalidGrantException("could not refresh the requested token"));
|
||||||
} else if (frozenSecurityIndex.isAvailable() == false) {
|
} else if (frozenSecurityIndex.isAvailable() == false) {
|
||||||
logger.debug("security index is not available to find token from refresh token, retrying");
|
logger.debug("security index is not available to find token from refresh token, retrying");
|
||||||
client.threadPool().scheduleWithFixedDelay(
|
maybeRetryOnFailure.accept(invalidGrantException("could not refresh the requested token"));
|
||||||
() -> findTokenFromRefreshToken(refreshToken, listener, backoff), backoff.next(), GENERIC);
|
|
||||||
} else {
|
} else {
|
||||||
Consumer<Exception> onFailure = ex -> listener.onFailure(traceLog("find by refresh token", refreshToken, ex));
|
final SearchRequest request = client.prepareSearch(SecurityIndexManager.SECURITY_INDEX_NAME)
|
||||||
|
.setQuery(QueryBuilders.boolQuery()
|
||||||
|
.filter(QueryBuilders.termQuery("doc_type", TOKEN_DOC_TYPE))
|
||||||
|
.filter(QueryBuilders.termQuery("refresh_token.token", refreshToken)))
|
||||||
|
.seqNoAndPrimaryTerm(true)
|
||||||
|
.request();
|
||||||
securityIndex.checkIndexVersionThenExecute(listener::onFailure, () ->
|
securityIndex.checkIndexVersionThenExecute(listener::onFailure, () ->
|
||||||
executeAsyncWithOrigin(client.threadPool().getThreadContext(), SECURITY_ORIGIN, request,
|
executeAsyncWithOrigin(client.threadPool().getThreadContext(), SECURITY_ORIGIN, request,
|
||||||
ActionListener.<SearchResponse>wrap(searchResponse -> {
|
ActionListener.<SearchResponse>wrap(searchResponse -> {
|
||||||
if (searchResponse.isTimedOut()) {
|
if (searchResponse.isTimedOut()) {
|
||||||
if (backoff.hasNext()) {
|
logger.debug("find token from refresh token response timed out, retrying");
|
||||||
client.threadPool().scheduleWithFixedDelay(
|
maybeRetryOnFailure.accept(invalidGrantException("could not refresh the requested token"));
|
||||||
() -> findTokenFromRefreshToken(refreshToken, listener, backoff), backoff.next(), GENERIC);
|
|
||||||
} else {
|
|
||||||
logger.warn("could not find token document with refresh_token [{}] after all retries", refreshToken);
|
|
||||||
onFailure.accept(invalidGrantException("could not refresh the requested token"));
|
|
||||||
}
|
|
||||||
} else if (searchResponse.getHits().getHits().length < 1) {
|
} else if (searchResponse.getHits().getHits().length < 1) {
|
||||||
logger.warn("could not find token document with refresh_token [{}]", refreshToken);
|
logger.warn("could not find token document with refresh_token [{}]", refreshToken);
|
||||||
onFailure.accept(invalidGrantException("could not refresh the requested token"));
|
onFailure.accept(invalidGrantException("could not refresh the requested token"));
|
||||||
@ -774,14 +779,8 @@ public final class TokenService {
|
|||||||
}
|
}
|
||||||
}, e -> {
|
}, e -> {
|
||||||
if (isShardNotAvailableException(e)) {
|
if (isShardNotAvailableException(e)) {
|
||||||
if (backoff.hasNext()) {
|
logger.debug("find token from refresh token request failed because of unavailable shards, retrying");
|
||||||
logger.debug("failed to find token for refresh token [{}], retrying", refreshToken);
|
maybeRetryOnFailure.accept(invalidGrantException("could not refresh the requested token"));
|
||||||
client.threadPool().scheduleWithFixedDelay(
|
|
||||||
() -> findTokenFromRefreshToken(refreshToken, listener, backoff), backoff.next(), GENERIC);
|
|
||||||
} else {
|
|
||||||
logger.warn("could not find token document with refresh_token [{}] after all retries", refreshToken);
|
|
||||||
onFailure.accept(invalidGrantException("could not refresh the requested token"));
|
|
||||||
}
|
|
||||||
} else {
|
} else {
|
||||||
onFailure.accept(e);
|
onFailure.accept(e);
|
||||||
}
|
}
|
||||||
@ -806,7 +805,7 @@ public final class TokenService {
|
|||||||
private void innerRefresh(String tokenDocId, Map<String, Object> source, long seqNo, long primaryTerm, Authentication clientAuth,
|
private void innerRefresh(String tokenDocId, Map<String, Object> source, long seqNo, long primaryTerm, Authentication clientAuth,
|
||||||
ActionListener<Tuple<UserToken, String>> listener, Iterator<TimeValue> backoff, Instant refreshRequested) {
|
ActionListener<Tuple<UserToken, String>> listener, Iterator<TimeValue> backoff, Instant refreshRequested) {
|
||||||
logger.debug("Attempting to refresh token [{}]", tokenDocId);
|
logger.debug("Attempting to refresh token [{}]", tokenDocId);
|
||||||
Consumer<Exception> onFailure = ex -> listener.onFailure(traceLog("refresh token", tokenDocId, ex));
|
final Consumer<Exception> onFailure = ex -> listener.onFailure(traceLog("refresh token", tokenDocId, ex));
|
||||||
final Optional<ElasticsearchSecurityException> invalidSource = checkTokenDocForRefresh(source, clientAuth);
|
final Optional<ElasticsearchSecurityException> invalidSource = checkTokenDocForRefresh(source, clientAuth);
|
||||||
if (invalidSource.isPresent()) {
|
if (invalidSource.isPresent()) {
|
||||||
onFailure.accept(invalidSource.get());
|
onFailure.accept(invalidSource.get());
|
||||||
@ -817,6 +816,19 @@ public final class TokenService {
|
|||||||
logger.debug("Token document [{}] was recently refreshed, attempting to reuse [{}] for returning an " +
|
logger.debug("Token document [{}] was recently refreshed, attempting to reuse [{}] for returning an " +
|
||||||
"access token and refresh token", tokenDocId, supersedingTokenDocId);
|
"access token and refresh token", tokenDocId, supersedingTokenDocId);
|
||||||
final ActionListener<GetResponse> getSupersedingListener = new ActionListener<GetResponse>() {
|
final ActionListener<GetResponse> getSupersedingListener = new ActionListener<GetResponse>() {
|
||||||
|
private final Consumer<Exception> maybeRetryOnFailure = ex -> {
|
||||||
|
if (backoff.hasNext()) {
|
||||||
|
final TimeValue backofTimeValue = backoff.next();
|
||||||
|
logger.debug("retrying after [" + backofTimeValue + "] back off");
|
||||||
|
final Runnable retryWithContextRunnable = client.threadPool().getThreadContext()
|
||||||
|
.preserveContext(() -> getTokenDocAsync(supersedingTokenDocId, this));
|
||||||
|
client.threadPool().schedule(retryWithContextRunnable, backofTimeValue, GENERIC);
|
||||||
|
} else {
|
||||||
|
logger.warn("back off retries exhausted");
|
||||||
|
onFailure.accept(ex);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onResponse(GetResponse response) {
|
public void onResponse(GetResponse response) {
|
||||||
if (response.isExists()) {
|
if (response.isExists()) {
|
||||||
@ -828,30 +840,20 @@ public final class TokenService {
|
|||||||
(Map<String, Object>) supersedingTokenSource.get("refresh_token");
|
(Map<String, Object>) supersedingTokenSource.get("refresh_token");
|
||||||
final String supersedingRefreshTokenValue = (String) supersedingRefreshTokenSrc.get("token");
|
final String supersedingRefreshTokenValue = (String) supersedingRefreshTokenSrc.get("token");
|
||||||
reIssueTokens(supersedingUserTokenSource, supersedingRefreshTokenValue, listener);
|
reIssueTokens(supersedingUserTokenSource, supersedingRefreshTokenValue, listener);
|
||||||
} else if (backoff.hasNext()) {
|
} else {
|
||||||
// We retry this since the creation of the superseding token document might already be in flight but not
|
// We retry this since the creation of the superseding token document might already be in flight but not
|
||||||
// yet completed, triggered by a refresh request that came a few milliseconds ago
|
// yet completed, triggered by a refresh request that came a few milliseconds ago
|
||||||
logger.info("could not find superseding token document [{}] for token document [{}], retrying",
|
logger.info("could not find superseding token document [{}] for token document [{}], retrying",
|
||||||
supersedingTokenDocId, tokenDocId);
|
supersedingTokenDocId, tokenDocId);
|
||||||
client.threadPool().schedule(() -> getTokenDocAsync(supersedingTokenDocId, this), backoff.next(), GENERIC);
|
maybeRetryOnFailure.accept(invalidGrantException("could not refresh the requested token"));
|
||||||
} else {
|
|
||||||
logger.warn("could not find superseding token document [{}] for token document [{}] after all retries",
|
|
||||||
supersedingTokenDocId, tokenDocId);
|
|
||||||
onFailure.accept(invalidGrantException("could not refresh the requested token"));
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onFailure(Exception e) {
|
public void onFailure(Exception e) {
|
||||||
if (isShardNotAvailableException(e)) {
|
if (isShardNotAvailableException(e)) {
|
||||||
if (backoff.hasNext()) {
|
logger.info("could not find superseding token document [{}] for refresh, retrying", supersedingTokenDocId);
|
||||||
logger.info("could not find superseding token document [{}] for refresh, retrying", supersedingTokenDocId);
|
maybeRetryOnFailure.accept(invalidGrantException("could not refresh the requested token"));
|
||||||
client.threadPool().schedule(
|
|
||||||
() -> getTokenDocAsync(supersedingTokenDocId, this), backoff.next(), GENERIC);
|
|
||||||
} else {
|
|
||||||
logger.warn("could not find token document [{}] for refresh after all retries", supersedingTokenDocId);
|
|
||||||
onFailure.accept(invalidGrantException("could not refresh the requested token"));
|
|
||||||
}
|
|
||||||
} else {
|
} else {
|
||||||
logger.warn("could not find superseding token document [{}] for refresh", supersedingTokenDocId);
|
logger.warn("could not find superseding token document [{}] for refresh", supersedingTokenDocId);
|
||||||
onFailure.accept(invalidGrantException("could not refresh the requested token"));
|
onFailure.accept(invalidGrantException("could not refresh the requested token"));
|
||||||
@ -899,10 +901,10 @@ public final class TokenService {
|
|||||||
} else if (backoff.hasNext()) {
|
} else if (backoff.hasNext()) {
|
||||||
logger.info("failed to update the original token document [{}], the update result was [{}]. Retrying",
|
logger.info("failed to update the original token document [{}], the update result was [{}]. Retrying",
|
||||||
tokenDocId, updateResponse.getResult());
|
tokenDocId, updateResponse.getResult());
|
||||||
client.threadPool().schedule(
|
final Runnable retryWithContextRunnable = client.threadPool().getThreadContext()
|
||||||
() -> innerRefresh(tokenDocId, source, seqNo, primaryTerm, clientAuth, listener, backoff,
|
.preserveContext(() -> innerRefresh(tokenDocId, source, seqNo, primaryTerm, clientAuth, listener,
|
||||||
refreshRequested),
|
backoff, refreshRequested));
|
||||||
backoff.next(), GENERIC);
|
client.threadPool().schedule(retryWithContextRunnable, backoff.next(), GENERIC);
|
||||||
} else {
|
} else {
|
||||||
logger.info("failed to update the original token document [{}] after all retries, " +
|
logger.info("failed to update the original token document [{}] after all retries, " +
|
||||||
"the update result was [{}]. ", tokenDocId, updateResponse.getResult());
|
"the update result was [{}]. ", tokenDocId, updateResponse.getResult());
|
||||||
@ -912,51 +914,44 @@ public final class TokenService {
|
|||||||
Throwable cause = ExceptionsHelper.unwrapCause(e);
|
Throwable cause = ExceptionsHelper.unwrapCause(e);
|
||||||
if (cause instanceof VersionConflictEngineException) {
|
if (cause instanceof VersionConflictEngineException) {
|
||||||
//The document has been updated by another thread, get it again.
|
//The document has been updated by another thread, get it again.
|
||||||
if (backoff.hasNext()) {
|
logger.debug("version conflict while updating document [{}], attempting to get it again", tokenDocId);
|
||||||
logger.debug("version conflict while updating document [{}], attempting to get it again",
|
final ActionListener<GetResponse> getListener = new ActionListener<GetResponse>() {
|
||||||
tokenDocId);
|
@Override
|
||||||
final ActionListener<GetResponse> getListener = new ActionListener<GetResponse>() {
|
public void onResponse(GetResponse response) {
|
||||||
@Override
|
if (response.isExists()) {
|
||||||
public void onResponse(GetResponse response) {
|
innerRefresh(tokenDocId, response.getSource(), response.getSeqNo(), response.getPrimaryTerm(),
|
||||||
if (response.isExists()) {
|
clientAuth, listener, backoff, refreshRequested);
|
||||||
innerRefresh(tokenDocId, response.getSource(), response.getSeqNo(),
|
} else {
|
||||||
response.getPrimaryTerm(), clientAuth, listener, backoff, refreshRequested);
|
logger.warn("could not find token document [{}] for refresh", tokenDocId);
|
||||||
|
onFailure.accept(invalidGrantException("could not refresh the requested token"));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onFailure(Exception e) {
|
||||||
|
if (isShardNotAvailableException(e)) {
|
||||||
|
if (backoff.hasNext()) {
|
||||||
|
logger.info("could not get token document [{}] for refresh, retrying", tokenDocId);
|
||||||
|
final Runnable retryWithContextRunnable = client.threadPool().getThreadContext()
|
||||||
|
.preserveContext(() -> getTokenDocAsync(tokenDocId, this));
|
||||||
|
client.threadPool().schedule(retryWithContextRunnable, backoff.next(), GENERIC);
|
||||||
} else {
|
} else {
|
||||||
logger.warn("could not find token document [{}] for refresh", tokenDocId);
|
logger.warn("could not get token document [{}] for refresh after all retries", tokenDocId);
|
||||||
onFailure.accept(invalidGrantException("could not refresh the requested token"));
|
onFailure.accept(invalidGrantException("could not refresh the requested token"));
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
onFailure.accept(e);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
@Override
|
};
|
||||||
public void onFailure(Exception e) {
|
getTokenDocAsync(tokenDocId, getListener);
|
||||||
if (isShardNotAvailableException(e)) {
|
|
||||||
if (backoff.hasNext()) {
|
|
||||||
logger.info("could not get token document [{}] for refresh, " +
|
|
||||||
"retrying", tokenDocId);
|
|
||||||
client.threadPool().schedule(
|
|
||||||
() -> getTokenDocAsync(tokenDocId, this), backoff.next(), GENERIC);
|
|
||||||
} else {
|
|
||||||
logger.warn("could not get token document [{}] for refresh after all retries",
|
|
||||||
tokenDocId);
|
|
||||||
onFailure.accept(invalidGrantException("could not refresh the requested token"));
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
onFailure.accept(e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
};
|
|
||||||
getTokenDocAsync(tokenDocId, getListener);
|
|
||||||
} else {
|
|
||||||
logger.warn("version conflict while updating document [{}], no retries left", tokenDocId);
|
|
||||||
onFailure.accept(invalidGrantException("could not refresh the requested token"));
|
|
||||||
}
|
|
||||||
} else if (isShardNotAvailableException(e)) {
|
} else if (isShardNotAvailableException(e)) {
|
||||||
if (backoff.hasNext()) {
|
if (backoff.hasNext()) {
|
||||||
logger.debug("failed to update the original token document [{}], retrying", tokenDocId);
|
logger.debug("failed to update the original token document [{}], retrying", tokenDocId);
|
||||||
client.threadPool().schedule(
|
final Runnable retryWithContextRunnable = client.threadPool().getThreadContext().preserveContext(
|
||||||
() -> innerRefresh(tokenDocId, source, seqNo, primaryTerm, clientAuth, listener, backoff,
|
() -> innerRefresh(tokenDocId, source, seqNo, primaryTerm, clientAuth, listener, backoff,
|
||||||
refreshRequested),
|
refreshRequested));
|
||||||
backoff.next(), GENERIC);
|
client.threadPool().schedule(retryWithContextRunnable, backoff.next(), GENERIC);
|
||||||
} else {
|
} else {
|
||||||
logger.warn("failed to update the original token document [{}], after all retries", tokenDocId);
|
logger.warn("failed to update the original token document [{}], after all retries", tokenDocId);
|
||||||
onFailure.accept(invalidGrantException("could not refresh the requested token"));
|
onFailure.accept(invalidGrantException("could not refresh the requested token"));
|
||||||
|
Loading…
x
Reference in New Issue
Block a user