From 043a877a31a76790e1d4678f4459b3a0afa0818a Mon Sep 17 00:00:00 2001 From: David Kyle Date: Fri, 20 Apr 2018 12:42:31 +0100 Subject: [PATCH] [ML] Prevent unnecessary job updates. (elastic/x-pack-elasticsearch#4424) Original commit: elastic/x-pack-elasticsearch@8e0629789e3f849864e14d728792a91f4a83b558 --- .../xpack/core/ml/action/UpdateJobAction.java | 17 +++ .../action/UpdateJobActionRequestTests.java | 4 +- .../xpack/ml/job/JobManager.java | 119 +++++++++++------- .../output/AutoDetectResultProcessor.java | 3 +- 4 files changed, 98 insertions(+), 45 deletions(-) diff --git a/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/UpdateJobAction.java b/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/UpdateJobAction.java index 911da6cad4b..f7998a52d49 100644 --- a/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/UpdateJobAction.java +++ b/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/UpdateJobAction.java @@ -54,6 +54,7 @@ public class UpdateJobAction extends Action actionListener) { - clusterService.submitStateUpdateTask("update-job-" + request.getJobId(), - new AckedClusterStateUpdateTask(request, actionListener) { - private volatile Job updatedJob; - private volatile boolean processUpdateRequired; - @Override - protected PutJobAction.Response newResponse(boolean acknowledged) { - return new PutJobAction.Response(updatedJob); - } + Job job = getJobOrThrowIfUnknown(request.getJobId()); + final Job updatedJob = request.getJobUpdate().mergeWithJob(job, maxModelMemoryLimit); + if (updatedJob.equals(job)) { + // No change will results in a clusterstate update no-op so don't + // submit the request. + actionListener.onResponse(new PutJobAction.Response(updatedJob)); + return; + } - @Override - public ClusterState execute(ClusterState currentState) { - Job job = getJobOrThrowIfUnknown(request.getJobId(), currentState); - updatedJob = request.getJobUpdate().mergeWithJob(job, maxModelMemoryLimit); - if (updatedJob.equals(job)) { - // nothing to do - return currentState; + if (request.isWaitForAck()) { + // Use the ack cluster state update + clusterService.submitStateUpdateTask("update-job-" + request.getJobId(), + new AckedClusterStateUpdateTask(request, actionListener) { + + @Override + protected PutJobAction.Response newResponse(boolean acknowledged) { + return new PutJobAction.Response(updatedJob); } - // No change is required if the fields that the C++ uses aren't being updated - processUpdateRequired = request.getJobUpdate().isAutodetectProcessUpdate(); - return updateClusterState(updatedJob, true, currentState); - } - @Override - public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { - JobUpdate jobUpdate = request.getJobUpdate(); - if (processUpdateRequired && isJobOpen(newState, request.getJobId())) { - updateJobProcessNotifier.submitJobUpdate(UpdateParams.fromJobUpdate(jobUpdate), ActionListener.wrap( - isUpdated -> { - if (isUpdated) { - auditJobUpdatedIfNotInternal(request); - } - }, e -> { - // No need to do anything - } - )); - } else { - logger.debug("[{}] No process update required for job update: {}", () -> request.getJobId(), () -> { - try { - XContentBuilder jsonBuilder = XContentFactory.jsonBuilder(); - jobUpdate.toXContent(jsonBuilder, ToXContent.EMPTY_PARAMS); - return Strings.toString(jsonBuilder); - } catch (IOException e) { - return "(unprintable due to " + e.getMessage() + ")"; - } - }); + @Override + public ClusterState execute(ClusterState currentState) { + return updateClusterState(updatedJob, true, currentState); + } + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + afterClusterStateUpdate(newState, request); + } + }); + } else { + clusterService.submitStateUpdateTask("update-job-" + request.getJobId(), new ClusterStateUpdateTask() { + + @Override + public ClusterState execute(ClusterState currentState) throws Exception { + return updateClusterState(updatedJob, true, currentState); + } + + @Override + public void onFailure(String source, Exception e) { + actionListener.onFailure(e); + } + + @Override + public void clusterStatePublished(ClusterChangedEvent clusterChangedEvent) { + afterClusterStateUpdate(clusterChangedEvent.state(), request); + actionListener.onResponse(new PutJobAction.Response(updatedJob)); + + } + }); + } + } + + private void afterClusterStateUpdate(ClusterState newState, UpdateJobAction.Request request) { + JobUpdate jobUpdate = request.getJobUpdate(); + + // Change is required if the fields that the C++ uses are being updated + boolean processUpdateRequired = jobUpdate.isAutodetectProcessUpdate(); + + if (processUpdateRequired && isJobOpen(newState, request.getJobId())) { + updateJobProcessNotifier.submitJobUpdate(UpdateParams.fromJobUpdate(jobUpdate), ActionListener.wrap( + isUpdated -> { + if (isUpdated) { auditJobUpdatedIfNotInternal(request); } + }, e -> { + // No need to do anything } - }); + )); + } else { + logger.debug("[{}] No process update required for job update: {}", () -> request.getJobId(), () -> { + try { + XContentBuilder jsonBuilder = XContentFactory.jsonBuilder(); + jobUpdate.toXContent(jsonBuilder, ToXContent.EMPTY_PARAMS); + return Strings.toString(jsonBuilder); + } catch (IOException e) { + return "(unprintable due to " + e.getMessage() + ")"; + } + }); + + auditJobUpdatedIfNotInternal(request); + } } private void auditJobUpdatedIfNotInternal(UpdateJobAction.Request request) { diff --git a/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java b/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java index d707af422c0..67eccb1caef 100644 --- a/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java +++ b/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java @@ -351,11 +351,12 @@ public class AutoDetectResultProcessor { }); } - protected void updateEstablishedModelMemoryOnJob(Date latestBucketTimestamp, ModelSizeStats modelSizeStats) { + private void updateEstablishedModelMemoryOnJob(Date latestBucketTimestamp, ModelSizeStats modelSizeStats) { jobProvider.getEstablishedMemoryUsage(jobId, latestBucketTimestamp, modelSizeStats, establishedModelMemory -> { JobUpdate update = new JobUpdate.Builder(jobId) .setEstablishedModelMemory(establishedModelMemory).build(); UpdateJobAction.Request updateRequest = UpdateJobAction.Request.internal(jobId, update); + updateRequest.setWaitForAck(false); executeAsyncWithOrigin(client, ML_ORIGIN, UpdateJobAction.INSTANCE, updateRequest, new ActionListener() { @Override