Our handling for concurrent refresh of access tokens suffered from a race condition where: 1. Thread A has just finished with updating the existing token document, but hasn't stored the new tokens in a new document yet 2. Thread B attempts to refresh the same token and since the original token document is marked as refreshed, it decrypts and gets the new access token and refresh token and returns that to the caller of the API. 3. The caller attempts to use the newly refreshed access token immediately and gets an authentication error since thread A still hasn't finished writing the document. This commit changes the behavior so that Thread B, would first try to do a Get request for the token document where it expects that the access token it decrypted is stored(with exponential backoff ) and will not respond until it can verify that it reads it in the tokens index. That ensures that we only ever return tokens in a response if they are already valid and can be used immediately It also adjusts TokenAuthIntegTests to test authenticating with the tokens each thread receives, which would fail without the fix. Resolves: #54289
This commit is contained in:
parent
0753d9a35c
commit
38b55f06ba
|
@ -1011,10 +1011,11 @@ public final class TokenService {
|
|||
return;
|
||||
}
|
||||
final RefreshTokenStatus refreshTokenStatus = checkRefreshResult.v1();
|
||||
final SecurityIndexManager refreshedTokenIndex = getTokensIndexForVersion(refreshTokenStatus.getVersion());
|
||||
if (refreshTokenStatus.isRefreshed()) {
|
||||
logger.debug("Token document [{}] was recently refreshed, when a new token document was generated. Reusing that result.",
|
||||
tokenDocId);
|
||||
decryptAndReturnSupersedingTokens(refreshToken, refreshTokenStatus, listener);
|
||||
decryptAndReturnSupersedingTokens(refreshToken, refreshTokenStatus, refreshedTokenIndex, listener);
|
||||
} else {
|
||||
final String newAccessTokenString = UUIDs.randomBase64UUID();
|
||||
final String newRefreshTokenString = UUIDs.randomBase64UUID();
|
||||
|
@ -1038,7 +1039,6 @@ public final class TokenService {
|
|||
}
|
||||
assert seqNo != SequenceNumbers.UNASSIGNED_SEQ_NO : "expected an assigned sequence number";
|
||||
assert primaryTerm != SequenceNumbers.UNASSIGNED_PRIMARY_TERM : "expected an assigned primary term";
|
||||
final SecurityIndexManager refreshedTokenIndex = getTokensIndexForVersion(refreshTokenStatus.getVersion());
|
||||
final UpdateRequestBuilder updateRequest = client
|
||||
.prepareUpdate(refreshedTokenIndex.aliasName(), SINGLE_MAPPING_NAME, tokenDocId)
|
||||
.setDoc("refresh_token", updateMap)
|
||||
|
@ -1073,7 +1073,7 @@ public final class TokenService {
|
|||
if (cause instanceof VersionConflictEngineException) {
|
||||
// The document has been updated by another thread, get it again.
|
||||
logger.debug("version conflict while updating document [{}], attempting to get it again", tokenDocId);
|
||||
getTokenDocAsync(tokenDocId, refreshedTokenIndex, new ActionListener<GetResponse>() {
|
||||
getTokenDocAsync(tokenDocId, refreshedTokenIndex, true, new ActionListener<GetResponse>() {
|
||||
@Override
|
||||
public void onResponse(GetResponse response) {
|
||||
if (response.isExists()) {
|
||||
|
@ -1090,7 +1090,8 @@ public final class TokenService {
|
|||
if (isShardNotAvailableException(e)) {
|
||||
if (backoff.hasNext()) {
|
||||
logger.info("could not get token document [{}] for refresh, retrying", tokenDocId);
|
||||
client.threadPool().schedule(() -> getTokenDocAsync(tokenDocId, refreshedTokenIndex, this),
|
||||
client.threadPool().schedule(
|
||||
() -> getTokenDocAsync(tokenDocId, refreshedTokenIndex, true, this),
|
||||
backoff.next(), GENERIC);
|
||||
} else {
|
||||
logger.warn("could not get token document [{}] for refresh after all retries", tokenDocId);
|
||||
|
@ -1118,17 +1119,20 @@ public final class TokenService {
|
|||
}
|
||||
|
||||
/**
|
||||
* Decrypts the values of the superseding access token and the refresh token, using a key derived from the superseded refresh token. It
|
||||
* Decrypts the values of the superseding access token and the refresh token, using a key derived from the superseded refresh token.
|
||||
* It verifies that the token document for the access token it decrypted exists first, before calling the listener. It
|
||||
* encodes the version and serializes the tokens before calling the listener, in the same manner as {@link #createOAuth2Tokens } does.
|
||||
*
|
||||
* @param refreshToken The refresh token that the user sent in the request, used to derive the decryption key
|
||||
* @param refreshToken The refresh token that the user sent in the request, used to derive the decryption key
|
||||
* @param refreshTokenStatus The {@link RefreshTokenStatus} containing information about the superseding tokens as retrieved from the
|
||||
* index
|
||||
* @param listener The listener to call upon completion with a {@link Tuple} containing the
|
||||
* serialized access token and serialized refresh token as these will be returned to the client
|
||||
* index
|
||||
* @param tokensIndex the manager for the index where the tokens are stored
|
||||
* @param listener The listener to call upon completion with a {@link Tuple} containing the
|
||||
* serialized access token and serialized refresh token as these will be returned to the client
|
||||
*/
|
||||
void decryptAndReturnSupersedingTokens(String refreshToken, RefreshTokenStatus refreshTokenStatus,
|
||||
void decryptAndReturnSupersedingTokens(String refreshToken, RefreshTokenStatus refreshTokenStatus, SecurityIndexManager tokensIndex,
|
||||
ActionListener<Tuple<String, String>> listener) {
|
||||
|
||||
final byte[] iv = Base64.getDecoder().decode(refreshTokenStatus.getIv());
|
||||
final byte[] salt = Base64.getDecoder().decode(refreshTokenStatus.getSalt());
|
||||
final byte[] encryptedSupersedingTokens = Base64.getDecoder().decode(refreshTokenStatus.getSupersedingTokens());
|
||||
|
@ -1140,10 +1144,51 @@ public final class TokenService {
|
|||
logger.warn("Decrypted tokens string is not correctly formatted");
|
||||
listener.onFailure(invalidGrantException("could not refresh the requested token"));
|
||||
} else {
|
||||
listener.onResponse(new Tuple<>(prependVersionAndEncodeAccessToken(refreshTokenStatus.getVersion(), decryptedTokens[0]),
|
||||
prependVersionAndEncodeRefreshToken(refreshTokenStatus.getVersion(), decryptedTokens[1])));
|
||||
// We expect this to protect against race conditions that manifest within few ms
|
||||
final Iterator<TimeValue> backoff = BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(10), 8).iterator();
|
||||
final String tokenDocId = getTokenDocumentId(hashTokenString(decryptedTokens[0]));
|
||||
final Consumer<Exception> onFailure = ex ->
|
||||
listener.onFailure(traceLog("decrypt and get superseding token", tokenDocId, ex));
|
||||
final Consumer<ActionListener<GetResponse>> maybeRetryGet = actionListener -> {
|
||||
if (backoff.hasNext()) {
|
||||
logger.info("could not get token document [{}] that should have been created, retrying", tokenDocId);
|
||||
client.threadPool().schedule(
|
||||
() -> getTokenDocAsync(tokenDocId, tokensIndex, false, actionListener),
|
||||
backoff.next(), GENERIC);
|
||||
} else {
|
||||
logger.warn("could not get token document [{}] that should have been created after all retries",
|
||||
tokenDocId);
|
||||
onFailure.accept(invalidGrantException("could not refresh the requested token"));
|
||||
}
|
||||
};
|
||||
getTokenDocAsync(tokenDocId, tokensIndex, false, new ActionListener<GetResponse>() {
|
||||
@Override
|
||||
public void onResponse(GetResponse response) {
|
||||
if (response.isExists()) {
|
||||
try {
|
||||
listener.onResponse(
|
||||
new Tuple<>(prependVersionAndEncodeAccessToken(refreshTokenStatus.getVersion(), decryptedTokens[0]),
|
||||
prependVersionAndEncodeRefreshToken(refreshTokenStatus.getVersion(), decryptedTokens[1])));
|
||||
} catch (GeneralSecurityException | IOException e) {
|
||||
logger.warn("Could not format stored superseding token values", e);
|
||||
onFailure.accept(invalidGrantException("could not refresh the requested token"));
|
||||
}
|
||||
} else {
|
||||
maybeRetryGet.accept(this);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
if (isShardNotAvailableException(e)) {
|
||||
maybeRetryGet.accept(this);
|
||||
} else {
|
||||
onFailure.accept(e);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
} catch (GeneralSecurityException | IOException e) {
|
||||
} catch (GeneralSecurityException e) {
|
||||
logger.warn("Could not get stored superseding token values", e);
|
||||
listener.onFailure(invalidGrantException("could not refresh the requested token"));
|
||||
}
|
||||
|
@ -1161,11 +1206,14 @@ public final class TokenService {
|
|||
return Base64.getEncoder().encodeToString(cipher.doFinal(supersedingTokens.getBytes(StandardCharsets.UTF_8)));
|
||||
}
|
||||
|
||||
private void getTokenDocAsync(String tokenDocId, SecurityIndexManager tokensIndex, ActionListener<GetResponse> listener) {
|
||||
final GetRequest getRequest = client.prepareGet(tokensIndex.aliasName(), SINGLE_MAPPING_NAME, tokenDocId).request();
|
||||
private void getTokenDocAsync(String tokenDocId, SecurityIndexManager tokensIndex,
|
||||
boolean fetchSource, ActionListener<GetResponse> listener) {
|
||||
final GetRequest getRequest = client.prepareGet(tokensIndex.aliasName(), SINGLE_MAPPING_NAME, tokenDocId)
|
||||
.setFetchSource(fetchSource)
|
||||
.request();
|
||||
tokensIndex.checkIndexVersionThenExecute(
|
||||
ex -> listener.onFailure(traceLog("prepare tokens index [" + tokensIndex.aliasName() + "]", tokenDocId, ex)),
|
||||
() -> executeAsyncWithOrigin(client.threadPool().getThreadContext(), SECURITY_ORIGIN, getRequest, listener, client::get));
|
||||
ex -> listener.onFailure(traceLog("prepare tokens index [" + tokensIndex.aliasName() + "]", tokenDocId, ex)),
|
||||
() -> executeAsyncWithOrigin(client.threadPool().getThreadContext(), SECURITY_ORIGIN, getRequest, listener, client::get));
|
||||
}
|
||||
|
||||
Version getTokenVersionCompatibility() {
|
||||
|
|
|
@ -10,6 +10,7 @@ import org.elasticsearch.ElasticsearchSecurityException;
|
|||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.ElasticsearchStatusException;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
|
||||
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
|
||||
import org.elasticsearch.action.search.SearchResponse;
|
||||
import org.elasticsearch.action.support.PlainActionFuture;
|
||||
|
@ -55,24 +56,24 @@ import java.util.concurrent.CountDownLatch;
|
|||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static org.elasticsearch.index.mapper.MapperService.SINGLE_MAPPING_NAME;
|
||||
import static org.elasticsearch.test.SecuritySettingsSource.SECURITY_REQUEST_OPTIONS;
|
||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoTimeout;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.hasItem;
|
||||
|
||||
public class TokenAuthIntegTests extends SecurityIntegTestCase {
|
||||
|
||||
@Override
|
||||
public Settings nodeSettings(int nodeOrdinal) {
|
||||
return Settings.builder()
|
||||
.put(super.nodeSettings(nodeOrdinal))
|
||||
// crank up the deletion interval and set timeout for delete requests
|
||||
.put(TokenService.DELETE_INTERVAL.getKey(), TimeValue.timeValueMillis(200L))
|
||||
.put(TokenService.DELETE_TIMEOUT.getKey(), TimeValue.timeValueSeconds(5L))
|
||||
.put(XPackSettings.TOKEN_SERVICE_ENABLED_SETTING.getKey(), true)
|
||||
.build();
|
||||
.put(super.nodeSettings(nodeOrdinal))
|
||||
// crank up the deletion interval and set timeout for delete requests
|
||||
.put(TokenService.DELETE_INTERVAL.getKey(), TimeValue.timeValueMillis(200L))
|
||||
.put(TokenService.DELETE_TIMEOUT.getKey(), TimeValue.timeValueSeconds(5L))
|
||||
.put(XPackSettings.TOKEN_SERVICE_ENABLED_SETTING.getKey(), true)
|
||||
.build();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -90,10 +91,10 @@ public class TokenAuthIntegTests extends SecurityIntegTestCase {
|
|||
final Client client = client();
|
||||
SecurityClient securityClient = new SecurityClient(client);
|
||||
CreateTokenResponse response = securityClient.prepareCreateToken()
|
||||
.setGrantType("password")
|
||||
.setUsername(SecuritySettingsSource.TEST_USER_NAME)
|
||||
.setPassword(new SecureString(SecuritySettingsSourceField.TEST_PASSWORD.toCharArray()))
|
||||
.get();
|
||||
.setGrantType("password")
|
||||
.setUsername(SecuritySettingsSource.TEST_USER_NAME)
|
||||
.setPassword(new SecureString(SecuritySettingsSourceField.TEST_PASSWORD.toCharArray()))
|
||||
.get();
|
||||
for (TokenService tokenService : internalCluster().getInstances(TokenService.class)) {
|
||||
PlainActionFuture<UserToken> userTokenFuture = new PlainActionFuture<>();
|
||||
tokenService.decodeToken(response.getTokenString(), userTokenFuture);
|
||||
|
@ -118,10 +119,10 @@ public class TokenAuthIntegTests extends SecurityIntegTestCase {
|
|||
final Client client = client();
|
||||
SecurityClient securityClient = new SecurityClient(client);
|
||||
CreateTokenResponse response = securityClient.prepareCreateToken()
|
||||
.setGrantType("password")
|
||||
.setUsername(SecuritySettingsSource.TEST_USER_NAME)
|
||||
.setPassword(new SecureString(SecuritySettingsSourceField.TEST_PASSWORD.toCharArray()))
|
||||
.get();
|
||||
.setGrantType("password")
|
||||
.setUsername(SecuritySettingsSource.TEST_USER_NAME)
|
||||
.setPassword(new SecureString(SecuritySettingsSourceField.TEST_PASSWORD.toCharArray()))
|
||||
.get();
|
||||
String masterName = internalCluster().getMasterName();
|
||||
TokenService masterTokenService = internalCluster().getInstance(TokenService.class, masterName);
|
||||
String activeKeyHash = masterTokenService.getActiveKeyHash();
|
||||
|
@ -170,11 +171,11 @@ public class TokenAuthIntegTests extends SecurityIntegTestCase {
|
|||
AtomicReference<String> docId = new AtomicReference<>();
|
||||
assertBusy(() -> {
|
||||
SearchResponse searchResponse = client.prepareSearch(RestrictedIndicesNames.SECURITY_TOKENS_ALIAS)
|
||||
.setSource(SearchSourceBuilder.searchSource()
|
||||
.query(QueryBuilders.termQuery("doc_type", "token")))
|
||||
.setSize(1)
|
||||
.setTerminateAfter(1)
|
||||
.get();
|
||||
.setSource(SearchSourceBuilder.searchSource()
|
||||
.query(QueryBuilders.termQuery("doc_type", "token")))
|
||||
.setSize(1)
|
||||
.setTerminateAfter(1)
|
||||
.get();
|
||||
assertThat(searchResponse.getHits().getTotalHits().value, equalTo(1L));
|
||||
docId.set(searchResponse.getHits().getAt(0).getId());
|
||||
});
|
||||
|
@ -191,13 +192,13 @@ public class TokenAuthIntegTests extends SecurityIntegTestCase {
|
|||
assertBusy(() -> {
|
||||
if (deleteTriggered.compareAndSet(false, true)) {
|
||||
// invalidate a invalid token... doesn't matter that it is bad... we just want this action to trigger the deletion
|
||||
InvalidateTokenResponse invalidateResponseTwo = securityClient.prepareInvalidateToken("fooobar")
|
||||
.setType(randomFrom(InvalidateTokenRequest.Type.values()))
|
||||
.execute()
|
||||
.actionGet();
|
||||
assertThat(invalidateResponseTwo.getResult().getInvalidatedTokens(), equalTo(0));
|
||||
assertThat(invalidateResponseTwo.getResult().getPreviouslyInvalidatedTokens(), equalTo(0));
|
||||
assertThat(invalidateResponseTwo.getResult().getErrors().size(), equalTo(0));
|
||||
InvalidateTokenResponse invalidateResponseTwo = securityClient.prepareInvalidateToken("fooobar")
|
||||
.setType(randomFrom(InvalidateTokenRequest.Type.values()))
|
||||
.execute()
|
||||
.actionGet();
|
||||
assertThat(invalidateResponseTwo.getResult().getInvalidatedTokens(), equalTo(0));
|
||||
assertThat(invalidateResponseTwo.getResult().getPreviouslyInvalidatedTokens(), equalTo(0));
|
||||
assertThat(invalidateResponseTwo.getResult().getErrors().size(), equalTo(0));
|
||||
}
|
||||
client.admin().indices().prepareRefresh(RestrictedIndicesNames.SECURITY_TOKENS_ALIAS).get();
|
||||
SearchResponse searchResponse = client.prepareSearch(RestrictedIndicesNames.SECURITY_TOKENS_ALIAS)
|
||||
|
@ -293,15 +294,15 @@ public class TokenAuthIntegTests extends SecurityIntegTestCase {
|
|||
|
||||
public void testExpireMultipleTimes() {
|
||||
CreateTokenResponse response = securityClient().prepareCreateToken()
|
||||
.setGrantType("password")
|
||||
.setUsername(SecuritySettingsSource.TEST_USER_NAME)
|
||||
.setPassword(new SecureString(SecuritySettingsSourceField.TEST_PASSWORD.toCharArray()))
|
||||
.get();
|
||||
.setGrantType("password")
|
||||
.setUsername(SecuritySettingsSource.TEST_USER_NAME)
|
||||
.setPassword(new SecureString(SecuritySettingsSourceField.TEST_PASSWORD.toCharArray()))
|
||||
.get();
|
||||
|
||||
InvalidateTokenResponse invalidateResponse = securityClient()
|
||||
.prepareInvalidateToken(response.getTokenString())
|
||||
.setType(InvalidateTokenRequest.Type.ACCESS_TOKEN)
|
||||
.get();
|
||||
.prepareInvalidateToken(response.getTokenString())
|
||||
.setType(InvalidateTokenRequest.Type.ACCESS_TOKEN)
|
||||
.get();
|
||||
assertThat(invalidateResponse.getResult().getInvalidatedTokens().size(), equalTo(1));
|
||||
assertThat(invalidateResponse.getResult().getPreviouslyInvalidatedTokens().size(), equalTo(0));
|
||||
assertThat(invalidateResponse.getResult().getErrors().size(), equalTo(0));
|
||||
|
@ -470,25 +471,25 @@ public class TokenAuthIntegTests extends SecurityIntegTestCase {
|
|||
|
||||
public void testRefreshingInvalidatedToken() {
|
||||
Client client = client().filterWithHeader(Collections.singletonMap("Authorization",
|
||||
UsernamePasswordToken.basicAuthHeaderValue(SecuritySettingsSource.TEST_USER_NAME,
|
||||
SecuritySettingsSourceField.TEST_PASSWORD_SECURE_STRING)));
|
||||
UsernamePasswordToken.basicAuthHeaderValue(SecuritySettingsSource.TEST_USER_NAME,
|
||||
SecuritySettingsSourceField.TEST_PASSWORD_SECURE_STRING)));
|
||||
SecurityClient securityClient = new SecurityClient(client);
|
||||
CreateTokenResponse createTokenResponse = securityClient.prepareCreateToken()
|
||||
.setGrantType("password")
|
||||
.setUsername(SecuritySettingsSource.TEST_USER_NAME)
|
||||
.setPassword(new SecureString(SecuritySettingsSourceField.TEST_PASSWORD.toCharArray()))
|
||||
.get();
|
||||
.setGrantType("password")
|
||||
.setUsername(SecuritySettingsSource.TEST_USER_NAME)
|
||||
.setPassword(new SecureString(SecuritySettingsSourceField.TEST_PASSWORD.toCharArray()))
|
||||
.get();
|
||||
assertNotNull(createTokenResponse.getRefreshToken());
|
||||
InvalidateTokenResponse invalidateResponse = securityClient
|
||||
.prepareInvalidateToken(createTokenResponse.getRefreshToken())
|
||||
.setType(InvalidateTokenRequest.Type.REFRESH_TOKEN)
|
||||
.get();
|
||||
.prepareInvalidateToken(createTokenResponse.getRefreshToken())
|
||||
.setType(InvalidateTokenRequest.Type.REFRESH_TOKEN)
|
||||
.get();
|
||||
assertThat(invalidateResponse.getResult().getInvalidatedTokens().size(), equalTo(1));
|
||||
assertThat(invalidateResponse.getResult().getPreviouslyInvalidatedTokens().size(), equalTo(0));
|
||||
assertThat(invalidateResponse.getResult().getErrors().size(), equalTo(0));
|
||||
|
||||
ElasticsearchSecurityException e = expectThrows(ElasticsearchSecurityException.class,
|
||||
() -> securityClient.prepareRefreshToken(createTokenResponse.getRefreshToken()).get());
|
||||
() -> securityClient.prepareRefreshToken(createTokenResponse.getRefreshToken()).get());
|
||||
assertEquals("invalid_grant", e.getMessage());
|
||||
assertEquals(RestStatus.BAD_REQUEST, e.status());
|
||||
assertEquals("token has been invalidated", e.getHeader("error_description").get(0));
|
||||
|
@ -496,14 +497,14 @@ public class TokenAuthIntegTests extends SecurityIntegTestCase {
|
|||
|
||||
public void testRefreshingMultipleTimesFails() throws Exception {
|
||||
Client client = client().filterWithHeader(Collections.singletonMap("Authorization",
|
||||
UsernamePasswordToken.basicAuthHeaderValue(SecuritySettingsSource.TEST_USER_NAME,
|
||||
SecuritySettingsSourceField.TEST_PASSWORD_SECURE_STRING)));
|
||||
UsernamePasswordToken.basicAuthHeaderValue(SecuritySettingsSource.TEST_USER_NAME,
|
||||
SecuritySettingsSourceField.TEST_PASSWORD_SECURE_STRING)));
|
||||
SecurityClient securityClient = new SecurityClient(client);
|
||||
CreateTokenResponse createTokenResponse = securityClient.prepareCreateToken()
|
||||
.setGrantType("password")
|
||||
.setUsername(SecuritySettingsSource.TEST_USER_NAME)
|
||||
.setPassword(new SecureString(SecuritySettingsSourceField.TEST_PASSWORD.toCharArray()))
|
||||
.get();
|
||||
.setGrantType("password")
|
||||
.setUsername(SecuritySettingsSource.TEST_USER_NAME)
|
||||
.setPassword(new SecureString(SecuritySettingsSourceField.TEST_PASSWORD.toCharArray()))
|
||||
.get();
|
||||
assertNotNull(createTokenResponse.getRefreshToken());
|
||||
CreateTokenResponse refreshResponse = securityClient.prepareRefreshToken(createTokenResponse.getRefreshToken()).get();
|
||||
assertNotNull(refreshResponse);
|
||||
|
@ -536,7 +537,7 @@ public class TokenAuthIntegTests extends SecurityIntegTestCase {
|
|||
assertTrue(
|
||||
Instant.ofEpochMilli((long) refreshTokenMap.get("refresh_time")).isBefore(Instant.now().minus(30L, ChronoUnit.SECONDS)));
|
||||
ElasticsearchSecurityException e = expectThrows(ElasticsearchSecurityException.class,
|
||||
() -> securityClient.prepareRefreshToken(createTokenResponse.getRefreshToken()).get());
|
||||
() -> securityClient.prepareRefreshToken(createTokenResponse.getRefreshToken()).get());
|
||||
assertEquals("invalid_grant", e.getMessage());
|
||||
assertEquals(RestStatus.BAD_REQUEST, e.status());
|
||||
assertEquals("token has already been refreshed more than 30 seconds in the past", e.getHeader("error_description").get(0));
|
||||
|
@ -549,6 +550,7 @@ public class TokenAuthIntegTests extends SecurityIntegTestCase {
|
|||
SecuritySettingsSourceField.TEST_PASSWORD_SECURE_STRING)));
|
||||
SecurityClient securityClient = new SecurityClient(client);
|
||||
final List<String> tokens = Collections.synchronizedList(new ArrayList<>());
|
||||
final List<RestStatus> authStatuses = Collections.synchronizedList(new ArrayList<>());
|
||||
CreateTokenResponse createTokenResponse = securityClient.prepareCreateToken()
|
||||
.setGrantType("password")
|
||||
.setUsername(SecuritySettingsSource.TEST_USER_NAME)
|
||||
|
@ -586,6 +588,10 @@ public class TokenAuthIntegTests extends SecurityIntegTestCase {
|
|||
result.getTokenString(), result.getRefreshToken());
|
||||
} else {
|
||||
tokens.add(result.getTokenString() + result.getRefreshToken());
|
||||
client().filterWithHeader(Collections.singletonMap("Authorization", "Bearer " + result.getTokenString()))
|
||||
.admin().cluster().health(new ClusterHealthRequest(), ActionListener.wrap(
|
||||
r -> authStatuses.add(RestStatus.OK),
|
||||
e -> authStatuses.add(RestStatus.BAD_REQUEST)));
|
||||
}
|
||||
logger.info("received access token [{}] and refresh token [{}]", result.getTokenString(), result.getRefreshToken());
|
||||
completedLatch.countDown();
|
||||
|
@ -606,28 +612,32 @@ public class TokenAuthIntegTests extends SecurityIntegTestCase {
|
|||
}
|
||||
completedLatch.await();
|
||||
assertThat(failed.get(), equalTo(false));
|
||||
// Assert that we only ever got one access_token/refresh_token pair
|
||||
assertThat(tokens.stream().distinct().collect(Collectors.toList()).size(), equalTo(1));
|
||||
// Assert that we only ever got one token/refresh_token pair
|
||||
assertThat((int) tokens.stream().distinct().count(), equalTo(1));
|
||||
// Assert that all requests from all threads could authenticate at the time they received the access token
|
||||
// see: https://github.com/elastic/elasticsearch/issues/54289
|
||||
assertThat((int) authStatuses.stream().distinct().count(), equalTo(1));
|
||||
assertThat(authStatuses, hasItem(RestStatus.OK));
|
||||
}
|
||||
|
||||
public void testRefreshAsDifferentUser() {
|
||||
Client client = client().filterWithHeader(Collections.singletonMap("Authorization",
|
||||
UsernamePasswordToken.basicAuthHeaderValue(SecuritySettingsSource.TEST_USER_NAME,
|
||||
SecuritySettingsSourceField.TEST_PASSWORD_SECURE_STRING)));
|
||||
UsernamePasswordToken.basicAuthHeaderValue(SecuritySettingsSource.TEST_USER_NAME,
|
||||
SecuritySettingsSourceField.TEST_PASSWORD_SECURE_STRING)));
|
||||
SecurityClient securityClient = new SecurityClient(client);
|
||||
CreateTokenResponse createTokenResponse = securityClient.prepareCreateToken()
|
||||
.setGrantType("password")
|
||||
.setUsername(SecuritySettingsSource.TEST_USER_NAME)
|
||||
.setPassword(new SecureString(SecuritySettingsSourceField.TEST_PASSWORD.toCharArray()))
|
||||
.get();
|
||||
.setGrantType("password")
|
||||
.setUsername(SecuritySettingsSource.TEST_USER_NAME)
|
||||
.setPassword(new SecureString(SecuritySettingsSourceField.TEST_PASSWORD.toCharArray()))
|
||||
.get();
|
||||
assertNotNull(createTokenResponse.getRefreshToken());
|
||||
|
||||
ElasticsearchSecurityException e = expectThrows(ElasticsearchSecurityException.class,
|
||||
() -> new SecurityClient(client()
|
||||
.filterWithHeader(Collections.singletonMap("Authorization",
|
||||
UsernamePasswordToken.basicAuthHeaderValue(SecuritySettingsSource.TEST_SUPERUSER,
|
||||
SecuritySettingsSourceField.TEST_PASSWORD_SECURE_STRING))))
|
||||
.prepareRefreshToken(createTokenResponse.getRefreshToken()).get());
|
||||
() -> new SecurityClient(client()
|
||||
.filterWithHeader(Collections.singletonMap("Authorization",
|
||||
UsernamePasswordToken.basicAuthHeaderValue(SecuritySettingsSource.TEST_SUPERUSER,
|
||||
SecuritySettingsSourceField.TEST_PASSWORD_SECURE_STRING))))
|
||||
.prepareRefreshToken(createTokenResponse.getRefreshToken()).get());
|
||||
assertEquals("invalid_grant", e.getMessage());
|
||||
assertEquals(RestStatus.BAD_REQUEST, e.status());
|
||||
assertEquals("tokens must be refreshed by the creating client", e.getHeader("error_description").get(0));
|
||||
|
@ -635,14 +645,14 @@ public class TokenAuthIntegTests extends SecurityIntegTestCase {
|
|||
|
||||
public void testCreateThenRefreshAsDifferentUser() {
|
||||
Client client = client().filterWithHeader(Collections.singletonMap("Authorization",
|
||||
UsernamePasswordToken.basicAuthHeaderValue(SecuritySettingsSource.TEST_SUPERUSER,
|
||||
SecuritySettingsSourceField.TEST_PASSWORD_SECURE_STRING)));
|
||||
UsernamePasswordToken.basicAuthHeaderValue(SecuritySettingsSource.TEST_SUPERUSER,
|
||||
SecuritySettingsSourceField.TEST_PASSWORD_SECURE_STRING)));
|
||||
SecurityClient securityClient = new SecurityClient(client);
|
||||
CreateTokenResponse createTokenResponse = securityClient.prepareCreateToken()
|
||||
.setGrantType("password")
|
||||
.setUsername(SecuritySettingsSource.TEST_USER_NAME)
|
||||
.setPassword(new SecureString(SecuritySettingsSourceField.TEST_PASSWORD.toCharArray()))
|
||||
.get();
|
||||
.setGrantType("password")
|
||||
.setUsername(SecuritySettingsSource.TEST_USER_NAME)
|
||||
.setPassword(new SecureString(SecuritySettingsSourceField.TEST_PASSWORD.toCharArray()))
|
||||
.get();
|
||||
assertNotNull(createTokenResponse.getRefreshToken());
|
||||
|
||||
CreateTokenResponse refreshResponse = securityClient.prepareRefreshToken(createTokenResponse.getRefreshToken()).get();
|
||||
|
@ -660,7 +670,7 @@ public class TokenAuthIntegTests extends SecurityIntegTestCase {
|
|||
request = new AuthenticateRequest();
|
||||
request.username(SecuritySettingsSource.TEST_USER_NAME);
|
||||
client.filterWithHeader(Collections.singletonMap("Authorization", "Bearer " + createTokenResponse.getTokenString()))
|
||||
.execute(AuthenticateAction.INSTANCE, request, authFuture);
|
||||
.execute(AuthenticateAction.INSTANCE, request, authFuture);
|
||||
response = authFuture.actionGet();
|
||||
assertEquals(SecuritySettingsSource.TEST_USER_NAME, response.authentication().getUser().principal());
|
||||
|
||||
|
@ -668,7 +678,7 @@ public class TokenAuthIntegTests extends SecurityIntegTestCase {
|
|||
request = new AuthenticateRequest();
|
||||
request.username(SecuritySettingsSource.TEST_USER_NAME);
|
||||
client.filterWithHeader(Collections.singletonMap("Authorization", "Bearer " + refreshResponse.getTokenString()))
|
||||
.execute(AuthenticateAction.INSTANCE, request, authFuture);
|
||||
.execute(AuthenticateAction.INSTANCE, request, authFuture);
|
||||
response = authFuture.actionGet();
|
||||
assertEquals(SecuritySettingsSource.TEST_USER_NAME, response.authentication().getUser().principal());
|
||||
}
|
||||
|
@ -712,7 +722,7 @@ public class TokenAuthIntegTests extends SecurityIntegTestCase {
|
|||
final RestHighLevelClient restClient = new TestRestHighLevelClient();
|
||||
org.elasticsearch.client.security.CreateTokenResponse response = restClient.security().createToken(
|
||||
org.elasticsearch.client.security.CreateTokenRequest.passwordGrant(SecuritySettingsSource.TEST_USER_NAME,
|
||||
SecuritySettingsSourceField.TEST_PASSWORD.toCharArray()), SECURITY_REQUEST_OPTIONS);
|
||||
SecuritySettingsSourceField.TEST_PASSWORD.toCharArray()), SECURITY_REQUEST_OPTIONS);
|
||||
assertNotNull(response.getAccessToken());
|
||||
// First authenticate with token
|
||||
RequestOptions correctAuthOptions =
|
||||
|
@ -738,6 +748,7 @@ public class TokenAuthIntegTests extends SecurityIntegTestCase {
|
|||
() -> restClient.security().authenticate(wrongAuthOptionsAfter72));
|
||||
assertThat(e2.status(), equalTo(RestStatus.UNAUTHORIZED));
|
||||
}
|
||||
|
||||
@Before
|
||||
public void waitForSecurityIndexWritable() throws Exception {
|
||||
assertSecurityIndexActive();
|
||||
|
|
|
@ -131,7 +131,7 @@ public class TokenServiceTests extends ESTestCase {
|
|||
doAnswer(invocationOnMock -> {
|
||||
ActionListener<IndexResponse> responseActionListener = (ActionListener<IndexResponse>) invocationOnMock.getArguments()[2];
|
||||
responseActionListener.onResponse(new IndexResponse(new ShardId(".security", UUIDs.randomBase64UUID(), randomInt()), "_doc",
|
||||
randomAlphaOfLength(4), randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(), true));
|
||||
randomAlphaOfLength(4), randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(), true));
|
||||
return null;
|
||||
}).when(client).execute(eq(IndexAction.INSTANCE), any(IndexRequest.class), any(ActionListener.class));
|
||||
|
||||
|
@ -745,7 +745,8 @@ public class TokenServiceTests extends ESTestCase {
|
|||
authentication.getUser().principal(), authentication.getAuthenticatedBy().getName(), true, Instant.now().minusSeconds(5L),
|
||||
encryptedTokens, Base64.getEncoder().encodeToString(iv), Base64.getEncoder().encodeToString(salt));
|
||||
refreshTokenStatus.setVersion(version);
|
||||
tokenService.decryptAndReturnSupersedingTokens(refrehToken, refreshTokenStatus, tokenFuture);
|
||||
mockGetTokenAsyncForDecryptedToken(newAccessToken);
|
||||
tokenService.decryptAndReturnSupersedingTokens(refrehToken, refreshTokenStatus, securityTokensIndex, tokenFuture);
|
||||
if (version.onOrAfter(TokenService.VERSION_ACCESS_TOKENS_AS_UUIDS)) {
|
||||
// previous versions serialized the access token encrypted and the cipher text was different each time (due to different IVs)
|
||||
assertThat(tokenService.prependVersionAndEncodeAccessToken(version, newAccessToken), equalTo(tokenFuture.get().v1()));
|
||||
|
@ -853,6 +854,19 @@ public class TokenServiceTests extends ESTestCase {
|
|||
}).when(client).get(any(GetRequest.class), any(ActionListener.class));
|
||||
}
|
||||
|
||||
private void mockGetTokenAsyncForDecryptedToken(String accessToken) {
|
||||
doAnswer(invocationOnMock -> {
|
||||
GetRequest request = (GetRequest) invocationOnMock.getArguments()[0];
|
||||
ActionListener<GetResponse> listener = (ActionListener<GetResponse>) invocationOnMock.getArguments()[1];
|
||||
GetResponse response = mock(GetResponse.class);
|
||||
if (request.id().replace("token_", "").equals(TokenService.hashTokenString(accessToken))) {
|
||||
when(response.isExists()).thenReturn(true);
|
||||
}
|
||||
listener.onResponse(response);
|
||||
return Void.TYPE;
|
||||
}).when(client).get(any(GetRequest.class), any(ActionListener.class));
|
||||
}
|
||||
|
||||
public static void assertAuthentication(Authentication result, Authentication expected) {
|
||||
assertEquals(expected.getUser(), result.getUser());
|
||||
assertEquals(expected.getAuthenticatedBy(), result.getAuthenticatedBy());
|
||||
|
|
Loading…
Reference in New Issue