From cfc66a1fd5fcd2351184fddc0a370de965ac885d Mon Sep 17 00:00:00 2001 From: David Kyle Date: Tue, 24 Apr 2018 18:46:58 +0100 Subject: [PATCH] [ML] Wait for updates to established memory usage Tests need to wait for changes to the job's established memory usage to propagate and an over enthusiastic optimisation meant jobs were updated from stale state causing recent change to be lost. --- .../xpack/ml/job/JobManager.java | 26 ++++++++----------- .../integration/BasicRenormalizationIT.java | 10 ++++--- .../xpack/ml/integration/DatafeedJobsIT.java | 10 ++++--- .../ml/integration/OverallBucketsIT.java | 10 ++++--- .../integration/RestoreModelSnapshotIT.java | 10 ++++--- 5 files changed, 35 insertions(+), 31 deletions(-) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java index ca6593654e8..0f6a0f44cbf 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java @@ -63,6 +63,7 @@ import java.util.HashSet; import java.util.List; import java.util.Objects; import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -305,29 +306,22 @@ public class JobManager extends AbstractComponent { } private void internalJobUpdate(UpdateJobAction.Request request, ActionListener actionListener) { - - Job job = getJobOrThrowIfUnknown(request.getJobId()); - final Job updatedJob = request.getJobUpdate().mergeWithJob(job, maxModelMemoryLimit); - if (updatedJob.equals(job)) { - // No change will results in a clusterstate update no-op so don't - // submit the request. - actionListener.onResponse(new PutJobAction.Response(updatedJob)); - return; - } - if (request.isWaitForAck()) { // Use the ack cluster state update clusterService.submitStateUpdateTask("update-job-" + request.getJobId(), new AckedClusterStateUpdateTask(request, actionListener) { + private AtomicReference updatedJob = new AtomicReference<>(); @Override protected PutJobAction.Response newResponse(boolean acknowledged) { - return new PutJobAction.Response(updatedJob); + return new PutJobAction.Response(updatedJob.get()); } @Override public ClusterState execute(ClusterState currentState) { - return updateClusterState(updatedJob, true, currentState); + Job job = getJobOrThrowIfUnknown(request.getJobId(), currentState); + updatedJob.set(request.getJobUpdate().mergeWithJob(job, maxModelMemoryLimit)); + return updateClusterState(updatedJob.get(), true, currentState); } @Override @@ -337,10 +331,13 @@ public class JobManager extends AbstractComponent { }); } else { clusterService.submitStateUpdateTask("update-job-" + request.getJobId(), new ClusterStateUpdateTask() { + private AtomicReference updatedJob = new AtomicReference<>(); @Override public ClusterState execute(ClusterState currentState) throws Exception { - return updateClusterState(updatedJob, true, currentState); + Job job = getJobOrThrowIfUnknown(request.getJobId(), currentState); + updatedJob.set(request.getJobUpdate().mergeWithJob(job, maxModelMemoryLimit)); + return updateClusterState(updatedJob.get(), true, currentState); } @Override @@ -351,8 +348,7 @@ public class JobManager extends AbstractComponent { @Override public void clusterStatePublished(ClusterChangedEvent clusterChangedEvent) { afterClusterStateUpdate(clusterChangedEvent.state(), request); - actionListener.onResponse(new PutJobAction.Response(updatedJob)); - + actionListener.onResponse(new PutJobAction.Response(updatedJob.get())); } }); } diff --git a/x-pack/qa/ml-native-tests/src/test/java/org/elasticsearch/xpack/ml/integration/BasicRenormalizationIT.java b/x-pack/qa/ml-native-tests/src/test/java/org/elasticsearch/xpack/ml/integration/BasicRenormalizationIT.java index ea80673b1d2..80afdeff82a 100644 --- a/x-pack/qa/ml-native-tests/src/test/java/org/elasticsearch/xpack/ml/integration/BasicRenormalizationIT.java +++ b/x-pack/qa/ml-native-tests/src/test/java/org/elasticsearch/xpack/ml/integration/BasicRenormalizationIT.java @@ -50,10 +50,12 @@ public class BasicRenormalizationIT extends MlNativeAutodetectIntegTestCase { // Since this job ran for 50 buckets, it's a good place to assert // that established model memory matches model memory in the job stats - GetJobsStatsAction.Response.JobStats jobStats = getJobStats(jobId).get(0); - ModelSizeStats modelSizeStats = jobStats.getModelSizeStats(); - Job updatedJob = getJob(jobId).get(0); - assertThat(updatedJob.getEstablishedModelMemory(), equalTo(modelSizeStats.getModelBytes())); + assertBusy(() -> { + GetJobsStatsAction.Response.JobStats jobStats = getJobStats(jobId).get(0); + ModelSizeStats modelSizeStats = jobStats.getModelSizeStats(); + Job updatedJob = getJob(jobId).get(0); + assertThat(updatedJob.getEstablishedModelMemory(), equalTo(modelSizeStats.getModelBytes())); + }); } public void testRenormalizationDisabled() throws Exception { diff --git a/x-pack/qa/ml-native-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DatafeedJobsIT.java b/x-pack/qa/ml-native-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DatafeedJobsIT.java index c211d717c6e..23bd9dc8495 100644 --- a/x-pack/qa/ml-native-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DatafeedJobsIT.java +++ b/x-pack/qa/ml-native-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DatafeedJobsIT.java @@ -94,10 +94,12 @@ public class DatafeedJobsIT extends MlNativeAutodetectIntegTestCase { // Since this job ran for 168 buckets, it's a good place to assert // that established model memory matches model memory in the job stats - GetJobsStatsAction.Response.JobStats jobStats = getJobStats(job.getId()).get(0); - ModelSizeStats modelSizeStats = jobStats.getModelSizeStats(); - Job updatedJob = getJob(job.getId()).get(0); - assertThat(updatedJob.getEstablishedModelMemory(), equalTo(modelSizeStats.getModelBytes())); + assertBusy(() -> { + GetJobsStatsAction.Response.JobStats jobStats = getJobStats(job.getId()).get(0); + ModelSizeStats modelSizeStats = jobStats.getModelSizeStats(); + Job updatedJob = getJob(job.getId()).get(0); + assertThat(updatedJob.getEstablishedModelMemory(), equalTo(modelSizeStats.getModelBytes())); + }); } public void testRealtime() throws Exception { diff --git a/x-pack/qa/ml-native-tests/src/test/java/org/elasticsearch/xpack/ml/integration/OverallBucketsIT.java b/x-pack/qa/ml-native-tests/src/test/java/org/elasticsearch/xpack/ml/integration/OverallBucketsIT.java index dfabfdacf09..785c3c6f677 100644 --- a/x-pack/qa/ml-native-tests/src/test/java/org/elasticsearch/xpack/ml/integration/OverallBucketsIT.java +++ b/x-pack/qa/ml-native-tests/src/test/java/org/elasticsearch/xpack/ml/integration/OverallBucketsIT.java @@ -103,10 +103,12 @@ public class OverallBucketsIT extends MlNativeAutodetectIntegTestCase { // Since this job ran for 3000 buckets, it's a good place to assert // that established model memory matches model memory in the job stats - GetJobsStatsAction.Response.JobStats jobStats = getJobStats(job.getId()).get(0); - ModelSizeStats modelSizeStats = jobStats.getModelSizeStats(); - Job updatedJob = getJob(job.getId()).get(0); - assertThat(updatedJob.getEstablishedModelMemory(), equalTo(modelSizeStats.getModelBytes())); + assertBusy(() -> { + GetJobsStatsAction.Response.JobStats jobStats = getJobStats(job.getId()).get(0); + ModelSizeStats modelSizeStats = jobStats.getModelSizeStats(); + Job updatedJob = getJob(job.getId()).get(0); + assertThat(updatedJob.getEstablishedModelMemory(), equalTo(modelSizeStats.getModelBytes())); + }); } private static Map createRecord(long timestamp) { diff --git a/x-pack/qa/ml-native-tests/src/test/java/org/elasticsearch/xpack/ml/integration/RestoreModelSnapshotIT.java b/x-pack/qa/ml-native-tests/src/test/java/org/elasticsearch/xpack/ml/integration/RestoreModelSnapshotIT.java index bd67f7165f1..c5bc7c4ed14 100644 --- a/x-pack/qa/ml-native-tests/src/test/java/org/elasticsearch/xpack/ml/integration/RestoreModelSnapshotIT.java +++ b/x-pack/qa/ml-native-tests/src/test/java/org/elasticsearch/xpack/ml/integration/RestoreModelSnapshotIT.java @@ -75,10 +75,12 @@ public class RestoreModelSnapshotIT extends MlNativeAutodetectIntegTestCase { // Since these jobs ran for 72 buckets, it's a good place to assert // that established model memory matches model memory in the job stats - GetJobsStatsAction.Response.JobStats jobStats = getJobStats(job.getId()).get(0); - ModelSizeStats modelSizeStats = jobStats.getModelSizeStats(); - Job updatedJob = getJob(job.getId()).get(0); - assertThat(updatedJob.getEstablishedModelMemory(), equalTo(modelSizeStats.getModelBytes())); + assertBusy(() -> { + GetJobsStatsAction.Response.JobStats jobStats = getJobStats(job.getId()).get(0); + ModelSizeStats modelSizeStats = jobStats.getModelSizeStats(); + Job updatedJob = getJob(job.getId()).get(0); + assertThat(updatedJob.getEstablishedModelMemory(), equalTo(modelSizeStats.getModelBytes())); + }); } private Job.Builder buildAndRegisterJob(String jobId, TimeValue bucketSpan) throws Exception {