[ML] Prevent unnecessary job updates. (elastic/x-pack-elasticsearch#4424)

Original commit: elastic/x-pack-elasticsearch@8e0629789e
This commit is contained in:
David Kyle 2018-04-20 12:42:31 +01:00 committed by GitHub
parent 14ad96ea8b
commit 043a877a31
4 changed files with 98 additions and 45 deletions

View File

@ -54,6 +54,7 @@ public class UpdateJobAction extends Action<UpdateJobAction.Request, PutJobActio
/** Indicates an update that was not triggered by a user */
private boolean isInternal;
private boolean waitForAck = true;
public Request(String jobId, JobUpdate update) {
this(jobId, update, false);
@ -87,6 +88,14 @@ public class UpdateJobAction extends Action<UpdateJobAction.Request, PutJobActio
return isInternal;
}
public boolean isWaitForAck() {
return waitForAck;
}
public void setWaitForAck(boolean waitForAck) {
this.waitForAck = waitForAck;
}
@Override
public ActionRequestValidationException validate() {
return null;
@ -102,6 +111,11 @@ public class UpdateJobAction extends Action<UpdateJobAction.Request, PutJobActio
} else {
isInternal = false;
}
if (in.getVersion().onOrAfter(Version.V_6_3_0)) {
waitForAck = in.readBoolean();
} else {
waitForAck = true;
}
}
@Override
@ -112,6 +126,9 @@ public class UpdateJobAction extends Action<UpdateJobAction.Request, PutJobActio
if (out.getVersion().onOrAfter(Version.V_6_2_2)) {
out.writeBoolean(isInternal);
}
if (out.getVersion().onOrAfter(Version.V_6_3_0)) {
out.writeBoolean(waitForAck);
}
}
@Override

View File

@ -18,7 +18,9 @@ public class UpdateJobActionRequestTests
// no need to randomize JobUpdate this is already tested in: JobUpdateTests
JobUpdate.Builder jobUpdate = new JobUpdate.Builder(jobId);
jobUpdate.setAnalysisLimits(new AnalysisLimits(100L, 100L));
return new UpdateJobAction.Request(jobId, jobUpdate.build());
UpdateJobAction.Request request = new UpdateJobAction.Request(jobId, jobUpdate.build());
request.setWaitForAck(randomBoolean());
return request;
}
@Override

View File

@ -11,7 +11,9 @@ import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.CheckedConsumer;
@ -303,57 +305,88 @@ public class JobManager extends AbstractComponent {
}
private void internalJobUpdate(UpdateJobAction.Request request, ActionListener<PutJobAction.Response> actionListener) {
clusterService.submitStateUpdateTask("update-job-" + request.getJobId(),
new AckedClusterStateUpdateTask<PutJobAction.Response>(request, actionListener) {
private volatile Job updatedJob;
private volatile boolean processUpdateRequired;
@Override
protected PutJobAction.Response newResponse(boolean acknowledged) {
return new PutJobAction.Response(updatedJob);
}
Job job = getJobOrThrowIfUnknown(request.getJobId());
final Job updatedJob = request.getJobUpdate().mergeWithJob(job, maxModelMemoryLimit);
if (updatedJob.equals(job)) {
// No change will results in a clusterstate update no-op so don't
// submit the request.
actionListener.onResponse(new PutJobAction.Response(updatedJob));
return;
}
@Override
public ClusterState execute(ClusterState currentState) {
Job job = getJobOrThrowIfUnknown(request.getJobId(), currentState);
updatedJob = request.getJobUpdate().mergeWithJob(job, maxModelMemoryLimit);
if (updatedJob.equals(job)) {
// nothing to do
return currentState;
if (request.isWaitForAck()) {
// Use the ack cluster state update
clusterService.submitStateUpdateTask("update-job-" + request.getJobId(),
new AckedClusterStateUpdateTask<PutJobAction.Response>(request, actionListener) {
@Override
protected PutJobAction.Response newResponse(boolean acknowledged) {
return new PutJobAction.Response(updatedJob);
}
// No change is required if the fields that the C++ uses aren't being updated
processUpdateRequired = request.getJobUpdate().isAutodetectProcessUpdate();
return updateClusterState(updatedJob, true, currentState);
}
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
JobUpdate jobUpdate = request.getJobUpdate();
if (processUpdateRequired && isJobOpen(newState, request.getJobId())) {
updateJobProcessNotifier.submitJobUpdate(UpdateParams.fromJobUpdate(jobUpdate), ActionListener.wrap(
isUpdated -> {
if (isUpdated) {
auditJobUpdatedIfNotInternal(request);
}
}, e -> {
// No need to do anything
}
));
} else {
logger.debug("[{}] No process update required for job update: {}", () -> request.getJobId(), () -> {
try {
XContentBuilder jsonBuilder = XContentFactory.jsonBuilder();
jobUpdate.toXContent(jsonBuilder, ToXContent.EMPTY_PARAMS);
return Strings.toString(jsonBuilder);
} catch (IOException e) {
return "(unprintable due to " + e.getMessage() + ")";
}
});
@Override
public ClusterState execute(ClusterState currentState) {
return updateClusterState(updatedJob, true, currentState);
}
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
afterClusterStateUpdate(newState, request);
}
});
} else {
clusterService.submitStateUpdateTask("update-job-" + request.getJobId(), new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
return updateClusterState(updatedJob, true, currentState);
}
@Override
public void onFailure(String source, Exception e) {
actionListener.onFailure(e);
}
@Override
public void clusterStatePublished(ClusterChangedEvent clusterChangedEvent) {
afterClusterStateUpdate(clusterChangedEvent.state(), request);
actionListener.onResponse(new PutJobAction.Response(updatedJob));
}
});
}
}
private void afterClusterStateUpdate(ClusterState newState, UpdateJobAction.Request request) {
JobUpdate jobUpdate = request.getJobUpdate();
// Change is required if the fields that the C++ uses are being updated
boolean processUpdateRequired = jobUpdate.isAutodetectProcessUpdate();
if (processUpdateRequired && isJobOpen(newState, request.getJobId())) {
updateJobProcessNotifier.submitJobUpdate(UpdateParams.fromJobUpdate(jobUpdate), ActionListener.wrap(
isUpdated -> {
if (isUpdated) {
auditJobUpdatedIfNotInternal(request);
}
}, e -> {
// No need to do anything
}
});
));
} else {
logger.debug("[{}] No process update required for job update: {}", () -> request.getJobId(), () -> {
try {
XContentBuilder jsonBuilder = XContentFactory.jsonBuilder();
jobUpdate.toXContent(jsonBuilder, ToXContent.EMPTY_PARAMS);
return Strings.toString(jsonBuilder);
} catch (IOException e) {
return "(unprintable due to " + e.getMessage() + ")";
}
});
auditJobUpdatedIfNotInternal(request);
}
}
private void auditJobUpdatedIfNotInternal(UpdateJobAction.Request request) {

View File

@ -351,11 +351,12 @@ public class AutoDetectResultProcessor {
});
}
protected void updateEstablishedModelMemoryOnJob(Date latestBucketTimestamp, ModelSizeStats modelSizeStats) {
private void updateEstablishedModelMemoryOnJob(Date latestBucketTimestamp, ModelSizeStats modelSizeStats) {
jobProvider.getEstablishedMemoryUsage(jobId, latestBucketTimestamp, modelSizeStats, establishedModelMemory -> {
JobUpdate update = new JobUpdate.Builder(jobId)
.setEstablishedModelMemory(establishedModelMemory).build();
UpdateJobAction.Request updateRequest = UpdateJobAction.Request.internal(jobId, update);
updateRequest.setWaitForAck(false);
executeAsyncWithOrigin(client, ML_ORIGIN, UpdateJobAction.INSTANCE, updateRequest, new ActionListener<PutJobAction.Response>() {
@Override