Move TokenService to seqno powered cas (#38311)

Relates #37872 
Relates #10708
This commit is contained in:
Boaz Leskes 2019-02-04 15:25:41 +01:00 committed by GitHub
parent ece8c659c5
commit e49b593c81
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 19 additions and 5 deletions

View File

@ -225,6 +225,15 @@ public class SearchRequestBuilder extends ActionRequestBuilder<SearchRequest, Se
return this; return this;
} }
/**
* Should each {@link org.elasticsearch.search.SearchHit} be returned with the
* sequence number and primary term of the last modification of the document.
*/
public SearchRequestBuilder seqNoAndPrimaryTerm(boolean seqNoAndPrimaryTerm) {
sourceBuilder().seqNoAndPrimaryTerm(seqNoAndPrimaryTerm);
return this;
}
/** /**
* Sets the boost a specific index will receive when the query is executed against it. * Sets the boost a specific index will receive when the query is executed against it.
* *

View File

@ -30,6 +30,7 @@ import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.WriteRequest.RefreshPolicy; import org.elasticsearch.action.support.WriteRequest.RefreshPolicy;
import org.elasticsearch.action.support.master.AcknowledgedRequest; import org.elasticsearch.action.support.master.AcknowledgedRequest;
import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.elasticsearch.action.update.UpdateResponse; import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.Client; import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.AckedClusterStateUpdateTask; import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
@ -744,13 +745,17 @@ public final class TokenService {
try (StreamInput in = StreamInput.wrap(Base64.getDecoder().decode(authString))) { try (StreamInput in = StreamInput.wrap(Base64.getDecoder().decode(authString))) {
in.setVersion(authVersion); in.setVersion(authVersion);
Authentication authentication = new Authentication(in); Authentication authentication = new Authentication(in);
UpdateRequest updateRequest = UpdateRequestBuilder updateRequest =
client.prepareUpdate(SecurityIndexManager.SECURITY_INDEX_NAME, TYPE, tokenDocId) client.prepareUpdate(SecurityIndexManager.SECURITY_INDEX_NAME, TYPE, tokenDocId)
.setVersion(response.getVersion())
.setDoc("refresh_token", Collections.singletonMap("refreshed", true)) .setDoc("refresh_token", Collections.singletonMap("refreshed", true))
.setRefreshPolicy(RefreshPolicy.WAIT_UNTIL) .setRefreshPolicy(RefreshPolicy.WAIT_UNTIL);
.request(); if (clusterService.state().nodes().getMinNodeVersion().onOrAfter(Version.V_6_7_0)) {
executeAsyncWithOrigin(client.threadPool().getThreadContext(), SECURITY_ORIGIN, updateRequest, updateRequest.setIfSeqNo(response.getSeqNo());
updateRequest.setIfPrimaryTerm(response.getPrimaryTerm());
} else {
updateRequest.setVersion(response.getVersion());
}
executeAsyncWithOrigin(client.threadPool().getThreadContext(), SECURITY_ORIGIN, updateRequest.request(),
ActionListener.<UpdateResponse>wrap( ActionListener.<UpdateResponse>wrap(
updateResponse -> createUserToken(authentication, userAuth, listener, metadata, true), updateResponse -> createUserToken(authentication, userAuth, listener, metadata, true),
e -> { e -> {