Move ML Optimistic Concurrency Control to Seq No (#38278)

This commit moves the usage of internal versioning for CAS operations to use sequence numbers and primary terms

Relates to #36148
Relates to #10708
This commit is contained in:
Boaz Leskes 2019-02-04 10:41:08 +01:00 committed by GitHub
parent 1d82a6d9f9
commit ff13a43144
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 88 additions and 44 deletions

View File

@ -514,7 +514,7 @@ public class TransportOpenJobAction extends TransportMasterNodeAction<OpenJobAct
private void clearJobFinishedTime(String jobId, ActionListener<AcknowledgedResponse> listener) {
JobUpdate update = new JobUpdate.Builder(jobId).setClearFinishTime(true).build();
jobConfigProvider.updateJob(jobId, update, null, ActionListener.wrap(
jobConfigProvider.updateJob(jobId, update, null, clusterService.state().nodes().getMinNodeVersion(), ActionListener.wrap(
job -> listener.onResponse(new AcknowledgedResponse(true)),
e -> {
logger.error("[" + jobId + "] Failed to clear finished_time", e);

View File

@ -87,7 +87,7 @@ public class TransportUpdateDatafeedAction extends TransportMasterNodeAction<Upd
CheckedConsumer<Boolean, Exception> updateConsumer = ok -> {
datafeedConfigProvider.updateDatefeedConfig(request.getUpdate().getId(), request.getUpdate(), headers,
jobConfigProvider::validateDatafeedJob,
jobConfigProvider::validateDatafeedJob, clusterService.state().nodes().getMinNodeVersion(),
ActionListener.wrap(
updatedConfig -> listener.onResponse(new PutDatafeedAction.Response(updatedConfig)),
listener::onFailure

View File

@ -6,6 +6,7 @@
package org.elasticsearch.xpack.ml.action;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.get.GetAction;
import org.elasticsearch.action.get.GetRequest;
@ -17,6 +18,7 @@ import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
@ -52,14 +54,16 @@ public class TransportUpdateFilterAction extends HandledTransportAction<UpdateFi
private final Client client;
private final JobManager jobManager;
private final ClusterService clusterService;
@Inject
public TransportUpdateFilterAction(TransportService transportService, ActionFilters actionFilters, Client client,
JobManager jobManager) {
JobManager jobManager, ClusterService clusterService) {
super(UpdateFilterAction.NAME, transportService, actionFilters,
(Supplier<UpdateFilterAction.Request>) UpdateFilterAction.Request::new);
this.client = client;
this.jobManager = jobManager;
this.clusterService = clusterService;
}
@Override
@ -95,13 +99,20 @@ public class TransportUpdateFilterAction extends HandledTransportAction<UpdateFi
}
MlFilter updatedFilter = MlFilter.builder(filter.getId()).setDescription(description).setItems(items).build();
indexUpdatedFilter(updatedFilter, filterWithVersion.version, request, listener);
indexUpdatedFilter(
updatedFilter, filterWithVersion.version, filterWithVersion.seqNo, filterWithVersion.primaryTerm, request, listener);
}
private void indexUpdatedFilter(MlFilter filter, long version, UpdateFilterAction.Request request,
private void indexUpdatedFilter(MlFilter filter, final long version, final long seqNo, final long primaryTerm,
UpdateFilterAction.Request request,
ActionListener<PutFilterAction.Response> listener) {
IndexRequest indexRequest = new IndexRequest(MlMetaIndex.INDEX_NAME, MlMetaIndex.TYPE, filter.documentId());
indexRequest.version(version);
if (clusterService.state().nodes().getMinNodeVersion().onOrAfter(Version.V_6_7_0)) {
indexRequest.setIfSeqNo(seqNo);
indexRequest.setIfPrimaryTerm(primaryTerm);
} else {
indexRequest.version(version);
}
indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
try (XContentBuilder builder = XContentFactory.jsonBuilder()) {
@ -146,7 +157,7 @@ public class TransportUpdateFilterAction extends HandledTransportAction<UpdateFi
XContentParser parser = XContentFactory.xContent(XContentType.JSON)
.createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, stream)) {
MlFilter filter = MlFilter.LENIENT_PARSER.apply(parser, null).build();
listener.onResponse(new FilterWithVersion(filter, getDocResponse.getVersion()));
listener.onResponse(new FilterWithVersion(filter, getDocResponse));
}
} else {
this.onFailure(new ResourceNotFoundException(Messages.getMessage(Messages.FILTER_NOT_FOUND, filterId)));
@ -167,10 +178,15 @@ public class TransportUpdateFilterAction extends HandledTransportAction<UpdateFi
private final MlFilter filter;
private final long version;
private final long seqNo;
private final long primaryTerm;
private FilterWithVersion(MlFilter filter, long version) {
private FilterWithVersion(MlFilter filter, GetResponse getDocResponse) {
this.filter = filter;
this.version = version;
this.version = getDocResponse.getVersion();
this.seqNo = getDocResponse.getSeqNo();
this.primaryTerm = getDocResponse.getPrimaryTerm();
}
}
}

View File

@ -8,6 +8,7 @@ package org.elasticsearch.xpack.ml.datafeed.persistence;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.DocWriteResponse;
@ -19,6 +20,7 @@ import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexAction;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
@ -262,10 +264,12 @@ public class DatafeedConfigProvider {
* @param headers Datafeed headers applied with the update
* @param validator BiConsumer that accepts the updated config and can perform
* extra validations. {@code validator} must call the passed listener
* @param minClusterNodeVersion the minimum version of the nodes in the cluster
* @param updatedConfigListener Updated datafeed config listener
*/
public void updateDatefeedConfig(String datafeedId, DatafeedUpdate update, Map<String, String> headers,
BiConsumer<DatafeedConfig, ActionListener<Boolean>> validator,
Version minClusterNodeVersion,
ActionListener<DatafeedConfig> updatedConfigListener) {
GetRequest getRequest = new GetRequest(AnomalyDetectorsIndex.configIndexName(),
ElasticsearchMappings.DOC_TYPE, DatafeedConfig.documentId(datafeedId));
@ -277,7 +281,9 @@ public class DatafeedConfigProvider {
updatedConfigListener.onFailure(ExceptionsHelper.missingDatafeedException(datafeedId));
return;
}
long version = getResponse.getVersion();
final long version = getResponse.getVersion();
final long seqNo = getResponse.getSeqNo();
final long primaryTerm = getResponse.getPrimaryTerm();
BytesReference source = getResponse.getSourceAsBytesRef();
DatafeedConfig.Builder configBuilder;
try {
@ -298,7 +304,7 @@ public class DatafeedConfigProvider {
ActionListener<Boolean> validatedListener = ActionListener.wrap(
ok -> {
indexUpdatedConfig(updatedConfig, version, ActionListener.wrap(
indexUpdatedConfig(updatedConfig, version, seqNo, primaryTerm, minClusterNodeVersion, ActionListener.wrap(
indexResponse -> {
assert indexResponse.getResult() == DocWriteResponse.Result.UPDATED;
updatedConfigListener.onResponse(updatedConfig);
@ -318,17 +324,23 @@ public class DatafeedConfigProvider {
});
}
private void indexUpdatedConfig(DatafeedConfig updatedConfig, long version, ActionListener<IndexResponse> listener) {
private void indexUpdatedConfig(DatafeedConfig updatedConfig, long version, long seqNo, long primaryTerm,
Version minClusterNodeVersion, ActionListener<IndexResponse> listener) {
try (XContentBuilder builder = XContentFactory.jsonBuilder()) {
XContentBuilder updatedSource = updatedConfig.toXContent(builder, new ToXContent.MapParams(TO_XCONTENT_PARAMS));
IndexRequest indexRequest = client.prepareIndex(AnomalyDetectorsIndex.configIndexName(),
IndexRequestBuilder indexRequest = client.prepareIndex(AnomalyDetectorsIndex.configIndexName(),
ElasticsearchMappings.DOC_TYPE, DatafeedConfig.documentId(updatedConfig.getId()))
.setSource(updatedSource)
.setVersion(version)
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.request();
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
executeAsyncWithOrigin(client, ML_ORIGIN, IndexAction.INSTANCE, indexRequest, listener);
if (minClusterNodeVersion.onOrAfter(Version.V_6_7_0)) {
indexRequest.setIfSeqNo(seqNo);
indexRequest.setIfPrimaryTerm(primaryTerm);
} else {
indexRequest.setVersion(version);
}
executeAsyncWithOrigin(client, ML_ORIGIN, IndexAction.INSTANCE, indexRequest.request(), listener);
} catch (IOException e) {
listener.onFailure(

View File

@ -333,7 +333,7 @@ public class JobManager {
Runnable doUpdate = () -> {
jobConfigProvider.updateJobWithValidation(request.getJobId(), request.getJobUpdate(), maxModelMemoryLimit,
this::validate, ActionListener.wrap(
this::validate, clusterService.state().nodes().getMinNodeVersion(), ActionListener.wrap(
updatedJob -> postJobUpdate(request, updatedJob, actionListener),
actionListener::onFailure
));
@ -603,8 +603,8 @@ public class JobManager {
.setModelSnapshotId(modelSnapshot.getSnapshotId())
.build();
jobConfigProvider.updateJob(request.getJobId(), update, maxModelMemoryLimit, ActionListener.wrap(
job -> {
jobConfigProvider.updateJob(request.getJobId(), update, maxModelMemoryLimit, clusterService.state().nodes().getMinNodeVersion(),
ActionListener.wrap(job -> {
auditor.info(request.getJobId(),
Messages.getMessage(Messages.JOB_AUDIT_REVERTED, modelSnapshot.getDescription()));
updateHandler.accept(Boolean.TRUE);

View File

@ -10,6 +10,7 @@ import org.apache.logging.log4j.Logger;
import org.apache.lucene.search.join.ScoreMode;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.DocWriteResponse;
@ -21,6 +22,7 @@ import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexAction;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
@ -225,9 +227,12 @@ public class JobConfigProvider {
* @param maxModelMemoryLimit The maximum model memory allowed. This can be {@code null}
* if the job's {@link org.elasticsearch.xpack.core.ml.job.config.AnalysisLimits}
* are not changed.
* @param minClusterNodeVersion the minimum version of nodes in the cluster
* @param updatedJobListener Updated job listener
*/
public void updateJob(String jobId, JobUpdate update, ByteSizeValue maxModelMemoryLimit, ActionListener<Job> updatedJobListener) {
public void updateJob(String jobId, JobUpdate update, ByteSizeValue maxModelMemoryLimit,
Version minClusterNodeVersion,
ActionListener<Job> updatedJobListener) {
GetRequest getRequest = new GetRequest(AnomalyDetectorsIndex.configIndexName(),
ElasticsearchMappings.DOC_TYPE, Job.documentId(jobId));
@ -239,7 +244,9 @@ public class JobConfigProvider {
return;
}
long version = getResponse.getVersion();
final long version = getResponse.getVersion();
final long seqNo = getResponse.getSeqNo();
final long primaryTerm = getResponse.getPrimaryTerm();
BytesReference source = getResponse.getSourceAsBytesRef();
Job.Builder jobBuilder;
try {
@ -259,7 +266,7 @@ public class JobConfigProvider {
return;
}
indexUpdatedJob(updatedJob, version, updatedJobListener);
indexUpdatedJob(updatedJob, version, seqNo, primaryTerm, minClusterNodeVersion, updatedJobListener);
}
@Override
@ -280,17 +287,18 @@ public class JobConfigProvider {
}
/**
* Similar to {@link #updateJob(String, JobUpdate, ByteSizeValue, ActionListener)} but
* Similar to {@link #updateJob(String, JobUpdate, ByteSizeValue, Version, ActionListener)} but
* with an extra validation step which is called before the update is applied.
*
* @param jobId The Id of the job to update
* @param update The job update
* @param maxModelMemoryLimit The maximum model memory allowed
* @param validator The job update validator
* @param minClusterNodeVersion the minimum version of a node in the cluster
* @param updatedJobListener Updated job listener
*/
public void updateJobWithValidation(String jobId, JobUpdate update, ByteSizeValue maxModelMemoryLimit,
UpdateValidator validator, ActionListener<Job> updatedJobListener) {
UpdateValidator validator, Version minClusterNodeVersion, ActionListener<Job> updatedJobListener) {
GetRequest getRequest = new GetRequest(AnomalyDetectorsIndex.configIndexName(),
ElasticsearchMappings.DOC_TYPE, Job.documentId(jobId));
@ -302,7 +310,9 @@ public class JobConfigProvider {
return;
}
long version = getResponse.getVersion();
final long version = getResponse.getVersion();
final long seqNo = getResponse.getSeqNo();
final long primaryTerm = getResponse.getPrimaryTerm();
BytesReference source = getResponse.getSourceAsBytesRef();
Job originalJob;
try {
@ -324,7 +334,7 @@ public class JobConfigProvider {
return;
}
indexUpdatedJob(updatedJob, version, updatedJobListener);
indexUpdatedJob(updatedJob, version, seqNo, primaryTerm, minClusterNodeVersion, updatedJobListener);
},
updatedJobListener::onFailure
));
@ -337,17 +347,22 @@ public class JobConfigProvider {
});
}
private void indexUpdatedJob(Job updatedJob, long version, ActionListener<Job> updatedJobListener) {
private void indexUpdatedJob(Job updatedJob, long version, long seqNo, long primaryTerm, Version minClusterNodeVersion,
ActionListener<Job> updatedJobListener) {
try (XContentBuilder builder = XContentFactory.jsonBuilder()) {
XContentBuilder updatedSource = updatedJob.toXContent(builder, ToXContent.EMPTY_PARAMS);
IndexRequest indexRequest = client.prepareIndex(AnomalyDetectorsIndex.configIndexName(),
IndexRequestBuilder indexRequest = client.prepareIndex(AnomalyDetectorsIndex.configIndexName(),
ElasticsearchMappings.DOC_TYPE, Job.documentId(updatedJob.getId()))
.setSource(updatedSource)
.setVersion(version)
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.request();
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
if (minClusterNodeVersion.onOrAfter(Version.V_6_7_0)) {
indexRequest.setIfSeqNo(seqNo);
indexRequest.setIfPrimaryTerm(primaryTerm);
} else {
indexRequest.setVersion(version);
}
executeAsyncWithOrigin(client, ML_ORIGIN, IndexAction.INSTANCE, indexRequest, ActionListener.wrap(
executeAsyncWithOrigin(client, ML_ORIGIN, IndexAction.INSTANCE, indexRequest.request(), ActionListener.wrap(
indexResponse -> {
assert indexResponse.getResult() == DocWriteResponse.Result.UPDATED;
updatedJobListener.onResponse(updatedJob);

View File

@ -7,6 +7,7 @@ package org.elasticsearch.xpack.ml.integration;
import org.elasticsearch.ResourceAlreadyExistsException;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.delete.DeleteResponse;
@ -86,7 +87,7 @@ public class DatafeedConfigProviderIT extends MlSingleNodeTestCase {
AtomicReference<DatafeedConfig> configHolder = new AtomicReference<>();
blockingCall(actionListener ->
datafeedConfigProvider.updateDatefeedConfig(datafeedId, update.build(), updateHeaders,
(updatedConfig, listener) -> listener.onResponse(Boolean.TRUE), actionListener),
(updatedConfig, listener) -> listener.onResponse(Boolean.TRUE), Version.CURRENT, actionListener),
configHolder, exceptionHolder);
assertNull(exceptionHolder.get());
assertThat(configHolder.get().getIndices(), equalTo(updateIndices));
@ -167,7 +168,7 @@ public class DatafeedConfigProviderIT extends MlSingleNodeTestCase {
AtomicReference<DatafeedConfig> configHolder = new AtomicReference<>();
blockingCall(actionListener ->
datafeedConfigProvider.updateDatefeedConfig(datafeedId, update.build(), Collections.emptyMap(),
(updatedConfig, listener) -> listener.onResponse(Boolean.TRUE), actionListener),
(updatedConfig, listener) -> listener.onResponse(Boolean.TRUE), Version.CURRENT, actionListener),
configHolder, exceptionHolder);
assertNull(configHolder.get());
assertNotNull(exceptionHolder.get());
@ -193,7 +194,7 @@ public class DatafeedConfigProviderIT extends MlSingleNodeTestCase {
AtomicReference<Exception> exceptionHolder = new AtomicReference<>();
blockingCall(actionListener ->
datafeedConfigProvider.updateDatefeedConfig(datafeedId, update.build(), Collections.emptyMap(),
validateErrorFunction, actionListener),
validateErrorFunction, Version.CURRENT, actionListener),
configHolder, exceptionHolder);
assertNull(configHolder.get());

View File

@ -8,6 +8,7 @@ package org.elasticsearch.xpack.ml.integration;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.ResourceAlreadyExistsException;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.Version;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.index.IndexResponse;
@ -147,8 +148,8 @@ public class JobConfigProviderIT extends MlSingleNodeTestCase {
JobUpdate jobUpdate = new JobUpdate.Builder(jobId).setDescription("This job has been updated").build();
AtomicReference<Job> updateJobResponseHolder = new AtomicReference<>();
blockingCall(actionListener -> jobConfigProvider.updateJob(jobId, jobUpdate, new ByteSizeValue(32), actionListener),
updateJobResponseHolder, exceptionHolder);
blockingCall(actionListener -> jobConfigProvider.updateJob
(jobId, jobUpdate, new ByteSizeValue(32), Version.CURRENT, actionListener), updateJobResponseHolder, exceptionHolder);
assertNull(exceptionHolder.get());
assertEquals("This job has been updated", updateJobResponseHolder.get().getDescription());
@ -205,8 +206,8 @@ public class JobConfigProviderIT extends MlSingleNodeTestCase {
.build();
AtomicReference<Job> updateJobResponseHolder = new AtomicReference<>();
blockingCall(actionListener -> jobConfigProvider.updateJob(jobId, invalidUpdate, new ByteSizeValue(32), actionListener),
updateJobResponseHolder, exceptionHolder);
blockingCall(actionListener -> jobConfigProvider.updateJob(jobId, invalidUpdate, new ByteSizeValue(32), Version.CURRENT,
actionListener), updateJobResponseHolder, exceptionHolder);
assertNull(updateJobResponseHolder.get());
assertNotNull(exceptionHolder.get());
assertThat(exceptionHolder.get(), instanceOf(ElasticsearchStatusException.class));
@ -229,9 +230,8 @@ public class JobConfigProviderIT extends MlSingleNodeTestCase {
AtomicReference<Exception> exceptionHolder = new AtomicReference<>();
AtomicReference<Job> updateJobResponseHolder = new AtomicReference<>();
// update with the no-op validator
blockingCall(actionListener ->
jobConfigProvider.updateJobWithValidation(jobId, jobUpdate, new ByteSizeValue(32), validator, actionListener),
updateJobResponseHolder, exceptionHolder);
blockingCall(actionListener -> jobConfigProvider.updateJobWithValidation(
jobId, jobUpdate, new ByteSizeValue(32), validator, Version.CURRENT, actionListener), updateJobResponseHolder, exceptionHolder);
assertNull(exceptionHolder.get());
assertNotNull(updateJobResponseHolder.get());
@ -244,7 +244,7 @@ public class JobConfigProviderIT extends MlSingleNodeTestCase {
updateJobResponseHolder.set(null);
// Update with a validator that errors
blockingCall(actionListener -> jobConfigProvider.updateJobWithValidation(jobId, jobUpdate, new ByteSizeValue(32),
validatorWithAnError, actionListener),
validatorWithAnError, Version.CURRENT, actionListener),
updateJobResponseHolder, exceptionHolder);
assertNull(updateJobResponseHolder.get());