[ML] Change how the update job API delegates config changes to the autodetect process

Submit job updates to a concurrent queue once the job update has been processed by the ClusterService, then, from a background thread, delegate the job updates to the node running the autodetect process. This preserves the order in which the job config updates were applied to the cluster state and thus prevents updates to the same job from arriving at that job's autodetect process in the wrong order. (The expectation is that in practice this will rarely happen.)
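To make the ordering argument concrete, here is a minimal, self-contained sketch of the idea. The names are illustrative only and it uses plain java.util.concurrent instead of the Elasticsearch ThreadPool; the real implementation is the UpdateJobProcessNotifier added in this diff. Updates are enqueued in the order the cluster state updates complete, and a single periodic consumer drains them, so two updates to the same job cannot overtake each other.

import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Illustrative sketch only; not code added by this commit.
public class OrderedUpdateQueueSketch {

    // Bounded queue so a slow consumer cannot cause unbounded memory use;
    // offer() simply returns false when the queue is full.
    private final LinkedBlockingQueue<String> orderedJobUpdates = new LinkedBlockingQueue<>(1000);

    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

    // Called once a job config update has been applied to the cluster state.
    boolean submitJobUpdate(String jobUpdate) {
        return orderedJobUpdates.offer(jobUpdate);
    }

    // A single consumer polls the queue with a fixed delay, so updates are
    // forwarded in exactly the order they were submitted.
    void start() {
        scheduler.scheduleWithFixedDelay(this::processNextUpdate, 1, 1, TimeUnit.SECONDS);
    }

    void processNextUpdate() {
        String jobUpdate = orderedJobUpdates.poll();
        if (jobUpdate != null) {
            System.out.println("forwarding to autodetect process: " + jobUpdate);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        OrderedUpdateQueueSketch sketch = new OrderedUpdateQueueSketch();
        sketch.start();
        sketch.submitJobUpdate("job-1: update A");
        sketch.submitJobUpdate("job-1: update B"); // always processed after update A
        Thread.sleep(3000);
        sketch.scheduler.shutdownNow();
    }
}

As in the diff below, the drain rate is one update per one-second tick; the real notifier additionally clears the queue and cancels the task when the node stops being master or the node shuts down.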

The behaviour of the update API changes with this PR: the API now returns once the update has been applied to the cluster state, whereas before it would only return after the update had also been applied to the autodetect process. Updating the autodetect process now happens in the background. I think this change in behaviour is acceptable.
Use ThreadPool#scheduleWithFixedDelay(...) instead of ThreadPool#schedule(...) and remove the custom rescheduling and cancelling.
Also change LocalNodeMasterListener#executorName to SAME.
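As a condensed view of the lifecycle this enables (mirroring the UpdateJobProcessNotifier added below, with the actual work elided): scheduleWithFixedDelay returns a Cancellable, so there is no hand-rolled rescheduling loop to maintain, and onMaster()/offMaster() do nothing but start or cancel that task, which is why executorName() can safely return ThreadPool.Names.SAME.

import org.elasticsearch.cluster.LocalNodeMasterListener;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.threadpool.ThreadPool;

// Condensed lifecycle sketch; see UpdateJobProcessNotifier in the diff below for the real class.
class MasterOnlyPeriodicTask implements LocalNodeMasterListener {

    private final ThreadPool threadPool;
    private volatile ThreadPool.Cancellable cancellable;

    MasterOnlyPeriodicTask(ThreadPool threadPool) {
        this.threadPool = threadPool;
    }

    @Override
    public void onMaster() {
        // The fixed-delay task keeps re-running itself; no manual rescheduling needed.
        cancellable = threadPool.scheduleWithFixedDelay(this::runOnce,
                TimeValue.timeValueSeconds(1), ThreadPool.Names.GENERIC);
    }

    @Override
    public void offMaster() {
        ThreadPool.Cancellable current = cancellable;
        if (current != null) {
            current.cancel(); // stops the periodic task when this node loses mastership
        }
    }

    @Override
    public String executorName() {
        // onMaster()/offMaster() only schedule or cancel, so running them on the
        // calling (cluster state applier) thread is cheap and safe.
        return ThreadPool.Names.SAME;
    }

    private void runOnce() {
        // periodic work runs here, on the GENERIC thread pool
    }
}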

Original commit: elastic/x-pack-elasticsearch@c24c0dd7d7
Martijn van Groningen 2017-03-05 16:11:59 +01:00
parent f687f3ea6e
commit 96f4a72825
11 changed files with 180 additions and 87 deletions

@ -73,6 +73,7 @@ import org.elasticsearch.xpack.ml.action.ValidateJobConfigAction;
import org.elasticsearch.xpack.ml.datafeed.DatafeedJobRunner;
import org.elasticsearch.xpack.ml.datafeed.DatafeedState;
import org.elasticsearch.xpack.ml.job.JobManager;
import org.elasticsearch.xpack.ml.job.UpdateJobProcessNotifier;
import org.elasticsearch.xpack.ml.job.config.JobState;
import org.elasticsearch.xpack.ml.job.persistence.JobDataCountsPersister;
import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
@ -266,7 +267,8 @@ public class MachineLearning implements ActionPlugin {
JobDataCountsPersister jobDataCountsPersister = new JobDataCountsPersister(settings, internalClient);
Auditor auditor = new Auditor(internalClient, clusterService);
JobManager jobManager = new JobManager(settings, jobProvider, jobResultsPersister, clusterService, auditor, internalClient);
UpdateJobProcessNotifier notifier = new UpdateJobProcessNotifier(settings, internalClient, clusterService, threadPool);
JobManager jobManager = new JobManager(settings, jobProvider, jobResultsPersister, clusterService, auditor, internalClient, notifier);
AutodetectProcessFactory autodetectProcessFactory;
NormalizerProcessFactory normalizerProcessFactory;
if (AUTODETECT_PROCESS.get(settings)) {

@ -12,7 +12,6 @@ import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.AcknowledgedRequest;
import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
@ -24,7 +23,6 @@ import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
@ -33,15 +31,10 @@ import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.ml.MlMetadata;
import org.elasticsearch.xpack.ml.job.JobManager;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.config.JobState;
import org.elasticsearch.xpack.ml.job.config.JobUpdate;
import org.elasticsearch.xpack.ml.MlMetadata;
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData;
import java.io.IOException;
import java.util.Objects;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Semaphore;
public class UpdateJobAction extends Action<UpdateJobAction.Request, PutJobAction.Response, UpdateJobAction.RequestBuilder> {
public static final UpdateJobAction INSTANCE = new UpdateJobAction();
@ -64,7 +57,7 @@ public class UpdateJobAction extends Action<UpdateJobAction.Request, PutJobActio
public static class Request extends AcknowledgedRequest<UpdateJobAction.Request> implements ToXContent {
public static UpdateJobAction.Request parseRequest(String jobId, XContentParser parser) {
JobUpdate update = JobUpdate.PARSER.apply(parser, null).build();
JobUpdate update = JobUpdate.PARSER.apply(parser, null).setJobId(jobId).build();
return new UpdateJobAction.Request(jobId, update);
}
@ -142,18 +135,15 @@ public class UpdateJobAction extends Action<UpdateJobAction.Request, PutJobActio
public static class TransportAction extends TransportMasterNodeAction<UpdateJobAction.Request, PutJobAction.Response> {
private final ConcurrentMap<String, Semaphore> semaphoreByJob = ConcurrentCollections.newConcurrentMap();
private final JobManager jobManager;
private final Client client;
@Inject
public TransportAction(Settings settings, TransportService transportService, ClusterService clusterService,
ThreadPool threadPool, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
JobManager jobManager, Client client) {
JobManager jobManager) {
super(settings, UpdateJobAction.NAME, transportService, clusterService, threadPool, actionFilters,
indexNameExpressionResolver, UpdateJobAction.Request::new);
this.jobManager = jobManager;
this.client = client;
}
@Override
@ -173,57 +163,7 @@ public class UpdateJobAction extends Action<UpdateJobAction.Request, PutJobActio
throw new IllegalArgumentException("Job Id " + Job.ALL + " cannot be for update");
}
PersistentTasksCustomMetaData tasks = clusterService.state().getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
boolean jobIsOpen = MlMetadata.getJobState(request.getJobId(), tasks) == JobState.OPENED;
semaphoreByJob.computeIfAbsent(request.getJobId(), id -> new Semaphore(1)).acquire();
ActionListener<PutJobAction.Response> wrappedListener;
if (jobIsOpen && request.getJobUpdate().isAutodetectProcessUpdate()) {
wrappedListener = ActionListener.wrap(
response -> updateProcess(request, response, listener),
e -> {
releaseJobSemaphore(request.getJobId());
listener.onFailure(e);
});
} else {
wrappedListener = ActionListener.wrap(
response -> {
releaseJobSemaphore(request.getJobId());
listener.onResponse(response);
},
e -> {
releaseJobSemaphore(request.getJobId());
listener.onFailure(e);
});
}
jobManager.updateJob(request.getJobId(), request.getJobUpdate(), request, wrappedListener);
}
private void updateProcess(Request request, PutJobAction.Response updateConfigResponse,
ActionListener<PutJobAction.Response> listener) {
UpdateProcessAction.Request updateProcessRequest = new UpdateProcessAction.Request(request.getJobId(),
request.getJobUpdate().getModelPlotConfig(), request.getJobUpdate().getDetectorUpdates());
client.execute(UpdateProcessAction.INSTANCE, updateProcessRequest, new ActionListener<UpdateProcessAction.Response>() {
@Override
public void onResponse(UpdateProcessAction.Response response) {
releaseJobSemaphore(request.getJobId());
listener.onResponse(updateConfigResponse);
}
@Override
public void onFailure(Exception e) {
releaseJobSemaphore(request.getJobId());
listener.onFailure(e);
}
});
}
private void releaseJobSemaphore(String jobId) {
semaphoreByJob.remove(jobId).release();
jobManager.updateJob(request.getJobId(), request.getJobUpdate(), request, listener);
}
@Override

@ -79,6 +79,10 @@ public class UpdateProcessAction extends
out.writeBoolean(isUpdated);
}
public boolean isUpdated() {
return isUpdated;
}
@Override
public RestStatus status() {
return RestStatus.ACCEPTED;

@ -59,18 +59,21 @@ public class JobManager extends AbstractComponent {
private final JobResultsPersister jobResultsPersister;
private final Auditor auditor;
private final Client client;
private final UpdateJobProcessNotifier updateJobProcessNotifier;
/**
* Create a JobManager
*/
public JobManager(Settings settings, JobProvider jobProvider, JobResultsPersister jobResultsPersister,
ClusterService clusterService, Auditor auditor, Client client) {
ClusterService clusterService, Auditor auditor, Client client,
UpdateJobProcessNotifier updateJobProcessNotifier) {
super(settings);
this.jobProvider = Objects.requireNonNull(jobProvider);
this.clusterService = Objects.requireNonNull(clusterService);
this.jobResultsPersister = Objects.requireNonNull(jobResultsPersister);
this.auditor = Objects.requireNonNull(auditor);
this.client = Objects.requireNonNull(client);
this.updateJobProcessNotifier = updateJobProcessNotifier;
}
/**
@ -207,13 +210,15 @@ public class JobManager extends AbstractComponent {
if (jobUpdate.getModelSnapshotId() != null) {
jobProvider.getModelSnapshot(job.getId(), jobUpdate.getModelSnapshotId(), newModelSnapshot -> {
if (newModelSnapshot == null) {
throw new ResourceNotFoundException(
Messages.getMessage(Messages.REST_NO_SUCH_MODEL_SNAPSHOT, jobUpdate.getModelSnapshotId(), job.getId()));
String message = Messages.getMessage(Messages.REST_NO_SUCH_MODEL_SNAPSHOT, jobUpdate.getModelSnapshotId(),
job.getId());
errorHandler.accept(new ResourceNotFoundException(message));
}
jobProvider.getModelSnapshot(job.getId(), job.getModelSnapshotId(), oldModelSnapshot -> {
if (oldModelSnapshot != null && newModelSnapshot.getTimestamp().before(oldModelSnapshot.getTimestamp())) {
throw new IllegalArgumentException("Job [" + job.getId() + "] has a more recent model snapshot ["
+ oldModelSnapshot.getSnapshotId() + "]");
String message = "Job [" + job.getId() + "] has a more recent model snapshot [" +
oldModelSnapshot.getSnapshotId() + "]";
errorHandler.accept(new IllegalArgumentException(message));
}
handler.accept(true);
}, errorHandler);
@ -227,7 +232,7 @@ public class JobManager extends AbstractComponent {
ActionListener<PutJobAction.Response> actionListener) {
clusterService.submitStateUpdateTask("update-job-" + jobId,
new AckedClusterStateUpdateTask<PutJobAction.Response>(request, actionListener) {
private Job updatedJob;
private volatile Job updatedJob;
@Override
protected PutJobAction.Response newResponse(boolean acknowledged) {
@ -240,6 +245,11 @@ public class JobManager extends AbstractComponent {
updatedJob = jobUpdate.mergeWithJob(job);
return updateClusterState(updatedJob, true, currentState);
}
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
updateJobProcessNotifier.submitJobUpdate(jobUpdate);
}
});
}

@ -0,0 +1,116 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.job;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.LocalNodeMasterListener;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.component.LifecycleListener;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.ml.action.UpdateProcessAction;
import org.elasticsearch.xpack.ml.job.config.JobUpdate;
import java.util.concurrent.LinkedBlockingQueue;
import static org.elasticsearch.xpack.ml.action.UpdateProcessAction.Request;
import static org.elasticsearch.xpack.ml.action.UpdateProcessAction.Response;
public class UpdateJobProcessNotifier extends AbstractComponent
implements LocalNodeMasterListener {
private final Client client;
private final ThreadPool threadPool;
private final LinkedBlockingQueue<JobUpdate> orderedJobUpdates =
new LinkedBlockingQueue<>(1000);
private volatile ThreadPool.Cancellable cancellable;
public UpdateJobProcessNotifier(Settings settings, Client client,
ClusterService clusterService, ThreadPool threadPool) {
super(settings);
this.client = client;
this.threadPool = threadPool;
clusterService.addLocalNodeMasterListener(this);
clusterService.addLifecycleListener(new LifecycleListener() {
@Override
public void beforeStop() {
stop();
}
});
}
boolean submitJobUpdate(JobUpdate jobUpdate) {
return orderedJobUpdates.offer(jobUpdate);
}
@Override
public void onMaster() {
start();
}
@Override
public void offMaster() {
stop();
}
void start() {
cancellable = threadPool.scheduleWithFixedDelay(this::processNextUpdate,
TimeValue.timeValueSeconds(1), ThreadPool.Names.GENERIC);
}
void stop() {
orderedJobUpdates.clear();
ThreadPool.Cancellable cancellable = this.cancellable;
if (cancellable != null) {
cancellable.cancel();
}
}
@Override
public String executorName() {
// SAME is ok here, because both start() and stop() are inexpensive:
return ThreadPool.Names.SAME;
}
void processNextUpdate() {
try {
JobUpdate jobUpdate = orderedJobUpdates.poll();
if (jobUpdate != null) {
executeRemoteJob(jobUpdate);
}
} catch (Exception e) {
logger.error("Unable while processing next job update", e);
}
}
void executeRemoteJob(JobUpdate update) {
Request request = new Request(update.getJobId(), update.getModelPlotConfig(),
update.getDetectorUpdates());
client.execute(UpdateProcessAction.INSTANCE, request,
new ActionListener<Response>() {
@Override
public void onResponse(Response response) {
if (response.isUpdated()) {
logger.info("Successfully updated remote job [{}]", update.getJobId());
} else {
logger.error("Failed to update remote job [{}]", update.getJobId());
}
}
@Override
public void onFailure(Exception e) {
logger.error("Failed to update remote job [" + update.getJobId() + "]",
e);
}
});
}
}

@ -24,9 +24,11 @@ import java.util.Objects;
public class JobUpdate implements Writeable, ToXContent {
public static final ParseField DETECTORS = new ParseField("detectors");
public static final ObjectParser<Builder, Void> PARSER = new ObjectParser<>("job_udpate", Builder::new);
public static final ConstructingObjectParser<Builder, Void> PARSER = new ConstructingObjectParser<>(
"job_update", args -> new Builder((String) args[0]));
static {
PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), Job.ID);
PARSER.declareStringOrNull(Builder::setDescription, Job.DESCRIPTION);
PARSER.declareObjectArray(Builder::setDetectorUpdates, DetectorUpdate.PARSER, DETECTORS);
PARSER.declareObject(Builder::setModelPlotConfig, ModelPlotConfig.PARSER, Job.MODEL_PLOT_CONFIG);
@ -41,6 +43,7 @@ public class JobUpdate implements Writeable, ToXContent {
PARSER.declareString(Builder::setModelSnapshotId, Job.MODEL_SNAPSHOT_ID);
}
private final String jobId;
private final String description;
private final List<DetectorUpdate> detectorUpdates;
private final ModelPlotConfig modelPlotConfig;
@ -53,12 +56,13 @@ public class JobUpdate implements Writeable, ToXContent {
private final Map<String, Object> customSettings;
private final String modelSnapshotId;
private JobUpdate(@Nullable String description, @Nullable List<DetectorUpdate> detectorUpdates,
private JobUpdate(String jobId, @Nullable String description, @Nullable List<DetectorUpdate> detectorUpdates,
@Nullable ModelPlotConfig modelPlotConfig, @Nullable AnalysisLimits analysisLimits,
@Nullable TimeValue backgroundPersistInterval, @Nullable Long renormalizationWindowDays,
@Nullable Long resultsRetentionDays, @Nullable Long modelSnapshotRetentionDays,
@Nullable List<String> categorisationFilters, @Nullable Map<String, Object> customSettings,
@Nullable String modelSnapshotId) {
this.jobId = jobId;
this.description = description;
this.detectorUpdates = detectorUpdates;
this.modelPlotConfig = modelPlotConfig;
@ -73,6 +77,7 @@ public class JobUpdate implements Writeable, ToXContent {
}
public JobUpdate(StreamInput in) throws IOException {
jobId = in.readString();
description = in.readOptionalString();
if (in.readBoolean()) {
detectorUpdates = in.readList(DetectorUpdate::new);
@ -95,6 +100,7 @@ public class JobUpdate implements Writeable, ToXContent {
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(jobId);
out.writeOptionalString(description);
out.writeBoolean(detectorUpdates != null);
if (detectorUpdates != null) {
@ -114,6 +120,10 @@ public class JobUpdate implements Writeable, ToXContent {
out.writeOptionalString(modelSnapshotId);
}
public String getJobId() {
return jobId;
}
public String getDescription() {
return description;
}
@ -165,6 +175,7 @@ public class JobUpdate implements Writeable, ToXContent {
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(Job.ID.getPreferredName(), jobId);
if (description != null) {
builder.field(Job.DESCRIPTION.getPreferredName(), description);
}
@ -279,7 +290,8 @@ public class JobUpdate implements Writeable, ToXContent {
JobUpdate that = (JobUpdate) other;
return Objects.equals(this.description, that.description)
return Objects.equals(this.jobId, that.jobId)
&& Objects.equals(this.description, that.description)
&& Objects.equals(this.detectorUpdates, that.detectorUpdates)
&& Objects.equals(this.modelPlotConfig, that.modelPlotConfig)
&& Objects.equals(this.analysisLimits, that.analysisLimits)
@ -294,7 +306,7 @@ public class JobUpdate implements Writeable, ToXContent {
@Override
public int hashCode() {
return Objects.hash(description, detectorUpdates, modelPlotConfig, analysisLimits, renormalizationWindowDays,
return Objects.hash(jobId, description, detectorUpdates, modelPlotConfig, analysisLimits, renormalizationWindowDays,
backgroundPersistInterval, modelSnapshotRetentionDays, resultsRetentionDays, categorizationFilters, customSettings,
modelSnapshotId);
}
@ -393,6 +405,8 @@ public class JobUpdate implements Writeable, ToXContent {
}
public static class Builder {
private String jobId;
private String description;
private List<DetectorUpdate> detectorUpdates;
private ModelPlotConfig modelPlotConfig;
@ -405,7 +419,14 @@ public class JobUpdate implements Writeable, ToXContent {
private Map<String, Object> customSettings;
private String modelSnapshotId;
public Builder() {}
public Builder(String jobId) {
this.jobId = jobId;
}
public Builder setJobId(String jobId) {
this.jobId = jobId;
return this;
}
public Builder setDescription(String description) {
this.description = description;
@ -463,7 +484,7 @@ public class JobUpdate implements Writeable, ToXContent {
}
public JobUpdate build() {
return new JobUpdate(description, detectorUpdates, modelPlotConfig, analysisLimits, backgroundPersistInterval,
return new JobUpdate(jobId, description, detectorUpdates, modelPlotConfig, analysisLimits, backgroundPersistInterval,
renormalizationWindowDays, resultsRetentionDays, modelSnapshotRetentionDays, categorizationFilters, customSettings,
modelSnapshotId);
}

@ -189,7 +189,7 @@ public class AutoDetectResultProcessor {
}
protected void updateModelSnapshotIdOnJob(ModelSnapshot modelSnapshot) {
JobUpdate update = new JobUpdate.Builder().setModelSnapshotId(modelSnapshot.getSnapshotId()).build();
JobUpdate update = new JobUpdate.Builder(jobId).setModelSnapshotId(modelSnapshot.getSnapshotId()).build();
UpdateJobAction.Request updateRequest = new UpdateJobAction.Request(jobId, update);
client.execute(UpdateJobAction.INSTANCE, updateRequest, new ActionListener<PutJobAction.Response>() {
@Override

@ -16,7 +16,7 @@ public class UpdateJobActionRequestTests
protected UpdateJobAction.Request createTestInstance() {
String jobId = randomAsciiOfLength(10);
// no need to randomize JobUpdate this is already tested in: JobUpdateTests
JobUpdate.Builder jobUpdate = new JobUpdate.Builder();
JobUpdate.Builder jobUpdate = new JobUpdate.Builder(jobId);
jobUpdate.setAnalysisLimits(new AnalysisLimits(100L, 100L));
return new UpdateJobAction.Request(jobId, jobUpdate.build());
}

@ -14,10 +14,9 @@ import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.env.Environment;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.ml.action.PutJobAction;
import org.elasticsearch.xpack.ml.MlMetadata;
import org.elasticsearch.xpack.ml.action.util.QueryPage;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.MlMetadata;
import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister;
import org.elasticsearch.xpack.ml.notifications.Auditor;
@ -108,7 +107,8 @@ public class JobManagerTests extends ESTestCase {
Settings settings = Settings.builder().put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString()).build();
JobResultsPersister jobResultsPersister = mock(JobResultsPersister.class);
Client client = mock(Client.class);
return new JobManager(settings, jobProvider, jobResultsPersister, clusterService, auditor, client);
UpdateJobProcessNotifier notifier = mock(UpdateJobProcessNotifier.class);
return new JobManager(settings, jobProvider, jobResultsPersister, clusterService, auditor, client, notifier);
}
private ClusterState createClusterState() {

@ -23,7 +23,7 @@ public class JobUpdateTests extends AbstractSerializingTestCase<JobUpdate> {
@Override
protected JobUpdate createTestInstance() {
JobUpdate.Builder update = new JobUpdate.Builder();
JobUpdate.Builder update = new JobUpdate.Builder(randomAsciiOfLength(4));
if (randomBoolean()) {
update.setDescription(randomAsciiOfLength(20));
}
@ -104,7 +104,7 @@ public class JobUpdateTests extends AbstractSerializingTestCase<JobUpdate> {
List<String> categorizationFilters = Arrays.asList(generateRandomStringArray(10, 10, false));
Map<String, Object> customSettings = Collections.singletonMap(randomAsciiOfLength(10), randomAsciiOfLength(10));
JobUpdate.Builder updateBuilder = new JobUpdate.Builder();
JobUpdate.Builder updateBuilder = new JobUpdate.Builder("foo");
updateBuilder.setDescription("updated_description");
updateBuilder.setDetectorUpdates(detectorUpdates);
updateBuilder.setModelPlotConfig(modelPlotConfig);
@ -151,11 +151,11 @@ public class JobUpdateTests extends AbstractSerializingTestCase<JobUpdate> {
}
public void testIsAutodetectProcessUpdate() {
JobUpdate update = new JobUpdate.Builder().build();
JobUpdate update = new JobUpdate.Builder("foo").build();
assertFalse(update.isAutodetectProcessUpdate());
update = new JobUpdate.Builder().setModelPlotConfig(new ModelPlotConfig(true, "ff")).build();
update = new JobUpdate.Builder("foo").setModelPlotConfig(new ModelPlotConfig(true, "ff")).build();
assertTrue(update.isAutodetectProcessUpdate());
update = new JobUpdate.Builder().setDetectorUpdates(Arrays.asList(mock(JobUpdate.DetectorUpdate.class))).build();
update = new JobUpdate.Builder("foo").setDetectorUpdates(Arrays.asList(mock(JobUpdate.DetectorUpdate.class))).build();
assertTrue(update.isAutodetectProcessUpdate());
}
}

@ -269,7 +269,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
verify(persister, times(1)).persistModelSnapshot(modelSnapshot);
UpdateJobAction.Request expectedJobUpdateRequest = new UpdateJobAction.Request(JOB_ID,
new JobUpdate.Builder().setModelSnapshotId("a_snapshot_id").build());
new JobUpdate.Builder(JOB_ID).setModelSnapshotId("a_snapshot_id").build());
verify(client).execute(same(UpdateJobAction.INSTANCE), eq(expectedJobUpdateRequest), any());
verifyNoMoreInteractions(persister);