From 41b254cdf45c591d4432bbf71b3005eea6845dfd Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Tue, 21 Nov 2017 11:47:34 +0100 Subject: [PATCH 1/8] change forecast message into an array of messages (elastic/x-pack-elasticsearch#3070) depends on elastic/machine-learning-cpp#419 Turns the forecast message into an array of messages. Original commit: elastic/x-pack-elasticsearch@7598342712e33e967e9816f89524462d209da849 --- .../persistence/ElasticsearchMappings.java | 2 +- .../ml/job/results/ForecastRequestStats.java | 36 ++++++++++++------- .../ml/job/results/ReservedFieldNames.java | 2 +- .../results/ForecastRequestStatsTests.java | 10 ++++-- 4 files changed, 33 insertions(+), 17 deletions(-) diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/ElasticsearchMappings.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/ElasticsearchMappings.java index 0473a69338d..70f22e355e1 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/ElasticsearchMappings.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/ElasticsearchMappings.java @@ -347,7 +347,7 @@ public class ElasticsearchMappings { .startObject(ForecastRequestStats.END_TIME.getPreferredName()) .field(TYPE, DATE) .endObject() - .startObject(ForecastRequestStats.MESSAGE.getPreferredName()) + .startObject(ForecastRequestStats.MESSAGES.getPreferredName()) .field(TYPE, KEYWORD) .endObject() .startObject(ForecastRequestStats.PROGRESS.getPreferredName()) diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/results/ForecastRequestStats.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/results/ForecastRequestStats.java index a91ba9e892e..606c5fd81b7 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/results/ForecastRequestStats.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/results/ForecastRequestStats.java @@ -17,6 +17,7 @@ import org.elasticsearch.xpack.ml.job.config.Job; import java.io.IOException; import java.time.Instant; +import java.util.List; import java.util.Locale; import java.util.Objects; @@ -37,7 +38,7 @@ public class ForecastRequestStats implements ToXContentObject, Writeable { public static final ParseField FORECAST_ID = new ParseField("forecast_id"); public static final ParseField START_TIME = new ParseField("forecast_start_timestamp"); public static final ParseField END_TIME = new ParseField("forecast_end_timestamp"); - public static final ParseField MESSAGE = new ParseField("forecast_message"); + public static final ParseField MESSAGES = new ParseField("forecast_messages"); public static final ParseField PROCESSING_TIME_MS = new ParseField("processing_time_ms"); public static final ParseField PROGRESS = new ParseField("forecast_progress"); public static final ParseField PROCESSED_RECORD_COUNT = new ParseField("processed_record_count"); @@ -53,7 +54,7 @@ public class ForecastRequestStats implements ToXContentObject, Writeable { PARSER.declareString((modelForecastRequestStats, s) -> {}, Result.RESULT_TYPE); PARSER.declareLong(ForecastRequestStats::setRecordCount, PROCESSED_RECORD_COUNT); - PARSER.declareString(ForecastRequestStats::setMessage, MESSAGE); + PARSER.declareStringArray(ForecastRequestStats::setMessages, MESSAGES); PARSER.declareField(ForecastRequestStats::setStartTimeStamp, p -> Instant.ofEpochMilli(p.longValue()), START_TIME, ValueType.LONG); PARSER.declareField(ForecastRequestStats::setEndTimeStamp, @@ -89,7 +90,7 @@ public class ForecastRequestStats implements ToXContentObject, Writeable { private final String jobId; private final long forecastId; private long recordCount; - private String message; + private List messages; private Instant dateStarted = Instant.EPOCH; private Instant dateEnded = Instant.EPOCH; private double progress; @@ -106,7 +107,11 @@ public class ForecastRequestStats implements ToXContentObject, Writeable { jobId = in.readString(); forecastId = in.readLong(); recordCount = in.readLong(); - message = in.readOptionalString(); + if (in.readBoolean()) { + messages = in.readList(StreamInput::readString); + } else { + messages = null; + } dateStarted = Instant.ofEpochMilli(in.readVLong()); dateEnded = Instant.ofEpochMilli(in.readVLong()); progress = in.readDouble(); @@ -120,7 +125,12 @@ public class ForecastRequestStats implements ToXContentObject, Writeable { out.writeString(jobId); out.writeLong(forecastId); out.writeLong(recordCount); - out.writeOptionalString(message); + if (messages != null) { + out.writeBoolean(true); + out.writeStringList(messages); + } else { + out.writeBoolean(false); + } out.writeVLong(dateStarted.toEpochMilli()); out.writeVLong(dateEnded.toEpochMilli()); out.writeDouble(progress); @@ -136,8 +146,8 @@ public class ForecastRequestStats implements ToXContentObject, Writeable { builder.field(Result.RESULT_TYPE.getPreferredName(), RESULT_TYPE_VALUE); builder.field(FORECAST_ID.getPreferredName(), forecastId); builder.field(PROCESSED_RECORD_COUNT.getPreferredName(), recordCount); - if (message != null) { - builder.field(MESSAGE.getPreferredName(), message); + if (messages != null) { + builder.field(MESSAGES.getPreferredName(), messages); } if (dateStarted.equals(Instant.EPOCH) == false) { builder.field(START_TIME.getPreferredName(), dateStarted.toEpochMilli()); @@ -175,12 +185,12 @@ public class ForecastRequestStats implements ToXContentObject, Writeable { return recordCount; } - public String getMessage() { - return message; + public List getMessages() { + return messages; } - public void setMessage(String message) { - this.message = message; + public void setMessages(List messages) { + this.messages = messages; } public Instant getDateStarted() { @@ -250,7 +260,7 @@ public class ForecastRequestStats implements ToXContentObject, Writeable { return Objects.equals(this.jobId, that.jobId) && this.forecastId == that.forecastId && this.recordCount == that.recordCount && - Objects.equals(this.message, that.message) && + Objects.equals(this.messages, that.messages) && Objects.equals(this.dateStarted, that.dateStarted) && Objects.equals(this.dateEnded, that.dateEnded) && this.progress == that.progress && @@ -261,7 +271,7 @@ public class ForecastRequestStats implements ToXContentObject, Writeable { @Override public int hashCode() { - return Objects.hash(jobId, forecastId, recordCount, message, dateStarted, dateEnded, progress, + return Objects.hash(jobId, forecastId, recordCount, messages, dateStarted, dateEnded, progress, processingTime, memoryUsage, status); } } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/results/ReservedFieldNames.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/results/ReservedFieldNames.java index b6d8f55331c..64fb5c3230e 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/results/ReservedFieldNames.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/results/ReservedFieldNames.java @@ -133,7 +133,7 @@ public final class ReservedFieldNames { ForecastRequestStats.START_TIME.getPreferredName(), ForecastRequestStats.END_TIME.getPreferredName(), - ForecastRequestStats.MESSAGE.getPreferredName(), + ForecastRequestStats.MESSAGES.getPreferredName(), ForecastRequestStats.PROGRESS.getPreferredName(), ForecastRequestStats.STATUS.getPreferredName(), diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/results/ForecastRequestStatsTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/results/ForecastRequestStatsTests.java index 6ca8bead2df..0af6a9459b9 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/results/ForecastRequestStatsTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/results/ForecastRequestStatsTests.java @@ -12,6 +12,8 @@ import org.elasticsearch.xpack.ml.job.results.ForecastRequestStats.ForecastReque import java.io.IOException; import java.time.Instant; +import java.util.ArrayList; +import java.util.List; public class ForecastRequestStatsTests extends AbstractSerializingTestCase { @@ -32,7 +34,12 @@ public class ForecastRequestStatsTests extends AbstractSerializingTestCase list = new ArrayList<>(); + for (int i = 0; i < size; i++) { + list.add(randomAlphaOfLength(40)); + } + forecastRequestStats.setMessages(list); } if (randomBoolean()) { forecastRequestStats.setStartTimeStamp(Instant.ofEpochMilli(randomNonNegativeLong())); @@ -65,5 +72,4 @@ public class ForecastRequestStatsTests extends AbstractSerializingTestCase Date: Tue, 21 Nov 2017 10:22:43 +0000 Subject: [PATCH 2/8] Update BWC version after backporting to 6.1 Relates elastic/x-pack-elasticsearch#2975 Original commit: elastic/x-pack-elasticsearch@a63c56a019f116e866ebf96d4e976170529bccda --- .../org/elasticsearch/xpack/ml/job/config/Job.java | 12 ++++-------- .../elasticsearch/xpack/ml/job/config/JobUpdate.java | 6 ++---- 2 files changed, 6 insertions(+), 12 deletions(-) diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/config/Job.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/config/Job.java index 496398275bf..c697ebb02a3 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/config/Job.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/config/Job.java @@ -222,8 +222,7 @@ public class Job extends AbstractDiffable implements Writeable, ToXContentO createTime = new Date(in.readVLong()); finishedTime = in.readBoolean() ? new Date(in.readVLong()) : null; lastDataTime = in.readBoolean() ? new Date(in.readVLong()) : null; - // TODO: set to V_6_1_0 after backporting - if (in.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) { + if (in.getVersion().onOrAfter(Version.V_6_1_0)) { establishedModelMemory = in.readOptionalLong(); } else { establishedModelMemory = null; @@ -484,8 +483,7 @@ public class Job extends AbstractDiffable implements Writeable, ToXContentO } else { out.writeBoolean(false); } - // TODO: set to V_6_1_0 after backporting - if (out.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) { + if (out.getVersion().onOrAfter(Version.V_6_1_0)) { out.writeOptionalLong(establishedModelMemory); } analysisConfig.writeTo(out); @@ -707,8 +705,7 @@ public class Job extends AbstractDiffable implements Writeable, ToXContentO createTime = in.readBoolean() ? new Date(in.readVLong()) : null; finishedTime = in.readBoolean() ? new Date(in.readVLong()) : null; lastDataTime = in.readBoolean() ? new Date(in.readVLong()) : null; - // TODO: set to V_6_1_0 after backporting - if (in.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) { + if (in.getVersion().onOrAfter(Version.V_6_1_0)) { establishedModelMemory = in.readOptionalLong(); } analysisConfig = in.readOptionalWriteable(AnalysisConfig::new); @@ -896,8 +893,7 @@ public class Job extends AbstractDiffable implements Writeable, ToXContentO } else { out.writeBoolean(false); } - // TODO: set to V_6_1_0 after backporting - if (out.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) { + if (out.getVersion().onOrAfter(Version.V_6_1_0)) { out.writeOptionalLong(establishedModelMemory); } out.writeOptionalWriteable(analysisConfig); diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/config/JobUpdate.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/config/JobUpdate.java index 7c41af6bf4f..b78b7afa042 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/config/JobUpdate.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/config/JobUpdate.java @@ -124,8 +124,7 @@ public class JobUpdate implements Writeable, ToXContentObject { } customSettings = in.readMap(); modelSnapshotId = in.readOptionalString(); - // TODO: set to V_6_1_0 after backporting - if (in.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) { + if (in.getVersion().onOrAfter(Version.V_6_1_0)) { establishedModelMemory = in.readOptionalLong(); } else { establishedModelMemory = null; @@ -156,8 +155,7 @@ public class JobUpdate implements Writeable, ToXContentObject { } out.writeMap(customSettings); out.writeOptionalString(modelSnapshotId); - // TODO: set to V_6_1_0 after backporting - if (out.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) { + if (out.getVersion().onOrAfter(Version.V_6_1_0)) { out.writeOptionalLong(establishedModelMemory); } } From 754623753a0c0c64986d346f947f97b94d44aadc Mon Sep 17 00:00:00 2001 From: Dimitris Athanasiou Date: Tue, 21 Nov 2017 11:43:01 +0000 Subject: [PATCH 3/8] [ML] Make it easier to add various ml data removal (elastic/x-pack-elasticsearch#3048) Original commit: elastic/x-pack-elasticsearch@3e4ac6033b869f5775cd2e1ba13d07145e121671 --- .../ml/action/DeleteExpiredDataAction.java | 30 +++++++++++---- .../AbstractExpiredJobDataRemover.java | 26 ++----------- .../xpack/ml/job/retention/MlDataRemover.java | 10 +++++ .../ml/utils/VolatileCursorIterator.java | 38 +++++++++++++++++++ .../ExpiredModelSnapshotsRemoverTests.java | 20 +++++----- .../retention/ExpiredResultsRemoverTests.java | 16 ++++---- 6 files changed, 93 insertions(+), 47 deletions(-) create mode 100644 plugin/src/main/java/org/elasticsearch/xpack/ml/job/retention/MlDataRemover.java create mode 100644 plugin/src/main/java/org/elasticsearch/xpack/ml/utils/VolatileCursorIterator.java diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/DeleteExpiredDataAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/DeleteExpiredDataAction.java index 2f38aed0768..73f43a38c12 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/DeleteExpiredDataAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/DeleteExpiredDataAction.java @@ -28,11 +28,17 @@ import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.ml.MachineLearning; import org.elasticsearch.xpack.ml.job.retention.ExpiredModelSnapshotsRemover; import org.elasticsearch.xpack.ml.job.retention.ExpiredResultsRemover; +import org.elasticsearch.xpack.ml.job.retention.MlDataRemover; import org.elasticsearch.xpack.ml.notifications.Auditor; +import org.elasticsearch.xpack.ml.utils.VolatileCursorIterator; import org.elasticsearch.xpack.security.InternalClient; import java.io.IOException; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; import java.util.Objects; +import java.util.concurrent.atomic.AtomicInteger; public class DeleteExpiredDataAction extends Action { @@ -139,14 +145,24 @@ public class DeleteExpiredDataAction extends Action listener) { Auditor auditor = new Auditor(client, clusterService); - ExpiredResultsRemover resultsRemover = new ExpiredResultsRemover(client, clusterService, auditor); - resultsRemover.trigger(() -> { - ExpiredModelSnapshotsRemover modelSnapshotsRemover = new ExpiredModelSnapshotsRemover(client, clusterService); - modelSnapshotsRemover.trigger(() -> { - logger.debug("Finished deleting expired data"); - listener.onResponse(new Response(true)); + List dataRemovers = Arrays.asList( + new ExpiredResultsRemover(client, clusterService, auditor), + new ExpiredModelSnapshotsRemover(client, clusterService) + ); + Iterator dataRemoversIterator = new VolatileCursorIterator<>(dataRemovers); + deleteExpiredData(dataRemoversIterator, listener); + } + + private void deleteExpiredData(Iterator mlDataRemoversIterator, ActionListener listener) { + if (mlDataRemoversIterator.hasNext()) { + MlDataRemover remover = mlDataRemoversIterator.next(); + remover.remove(() -> { + deleteExpiredData(mlDataRemoversIterator, listener); }); - }); + } else { + logger.debug("Finished deleting expired data"); + listener.onResponse(new Response(true)); + } } } } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemover.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemover.java index 0943c2630fc..efb30a1e378 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemover.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemover.java @@ -13,6 +13,7 @@ import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.xpack.ml.MlMetadata; import org.elasticsearch.xpack.ml.job.config.Job; import org.elasticsearch.xpack.ml.job.results.Result; +import org.elasticsearch.xpack.ml.utils.VolatileCursorIterator; import org.joda.time.DateTime; import org.joda.time.chrono.ISOChronology; @@ -29,7 +30,7 @@ import java.util.concurrent.TimeUnit; * blocking the thread it was called at for too long. It does so by * chaining the steps together. */ -abstract class AbstractExpiredJobDataRemover { +abstract class AbstractExpiredJobDataRemover implements MlDataRemover { private final ClusterService clusterService; @@ -37,7 +38,8 @@ abstract class AbstractExpiredJobDataRemover { this.clusterService = Objects.requireNonNull(clusterService); } - public void trigger(Runnable onFinish) { + @Override + public void remove(Runnable onFinish) { removeData(newJobIterator(), onFinish); } @@ -88,24 +90,4 @@ abstract class AbstractExpiredJobDataRemover { .filter(QueryBuilders.termQuery(Job.ID.getPreferredName(), jobId)) .filter(QueryBuilders.rangeQuery(Result.TIMESTAMP.getPreferredName()).lt(cutoffEpochMs).format("epoch_millis")); } - - private static class VolatileCursorIterator implements Iterator { - private final List items; - private volatile int cursor; - - private VolatileCursorIterator(List items) { - this.items = items; - this.cursor = 0; - } - - @Override - public boolean hasNext() { - return cursor < items.size(); - } - - @Override - public T next() { - return items.get(cursor++); - } - } } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/retention/MlDataRemover.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/retention/MlDataRemover.java new file mode 100644 index 00000000000..14decf04cf6 --- /dev/null +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/retention/MlDataRemover.java @@ -0,0 +1,10 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.job.retention; + +public interface MlDataRemover { + void remove(Runnable onFinish); +} diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/utils/VolatileCursorIterator.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/utils/VolatileCursorIterator.java new file mode 100644 index 00000000000..5eae6351310 --- /dev/null +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/utils/VolatileCursorIterator.java @@ -0,0 +1,38 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.utils; + +import java.util.Iterator; +import java.util.List; + +/** + * An iterator whose cursor is volatile. The intended usage + * is to allow safe iteration which is done serially but + * from potentially different threads. In particular, this + * allows iterating over a collection via callbacks, where + * each call deals with the next item and only calls the next + * callback once it's finished. + */ +public class VolatileCursorIterator implements Iterator { + + private final List items; + private volatile int cursor; + + public VolatileCursorIterator(List items) { + this.items = items; + this.cursor = 0; + } + + @Override + public boolean hasNext() { + return cursor < items.size(); + } + + @Override + public T next() { + return items.get(cursor++); + } +} diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemoverTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemoverTests.java index f934d0e388b..c157d97813e 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemoverTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemoverTests.java @@ -67,30 +67,30 @@ public class ExpiredModelSnapshotsRemoverTests extends ESTestCase { onFinish = mock(Runnable.class); } - public void testTrigger_GivenJobsWithoutRetentionPolicy() { + public void testRemove_GivenJobsWithoutRetentionPolicy() { givenClientRequestsSucceed(); givenJobs(Arrays.asList( JobTests.buildJobBuilder("foo").build(), JobTests.buildJobBuilder("bar").build() )); - createExpiredModelSnapshotsRemover().trigger(onFinish); + createExpiredModelSnapshotsRemover().remove(onFinish); verify(onFinish).run(); Mockito.verifyNoMoreInteractions(client); } - public void testTrigger_GivenJobWithoutActiveSnapshot() { + public void testRemove_GivenJobWithoutActiveSnapshot() { givenClientRequestsSucceed(); givenJobs(Arrays.asList(JobTests.buildJobBuilder("foo").setModelSnapshotRetentionDays(7L).build())); - createExpiredModelSnapshotsRemover().trigger(onFinish); + createExpiredModelSnapshotsRemover().remove(onFinish); verify(onFinish).run(); Mockito.verifyNoMoreInteractions(client); } - public void testTrigger_GivenJobsWithMixedRetentionPolicies() throws IOException { + public void testRemove_GivenJobsWithMixedRetentionPolicies() throws IOException { givenClientRequestsSucceed(); givenJobs(Arrays.asList( JobTests.buildJobBuilder("none").build(), @@ -104,7 +104,7 @@ public class ExpiredModelSnapshotsRemoverTests extends ESTestCase { searchResponsesPerCall.add(createSearchResponse(snapshots1JobSnapshots)); searchResponsesPerCall.add(createSearchResponse(snapshots2JobSnapshots)); - createExpiredModelSnapshotsRemover().trigger(onFinish); + createExpiredModelSnapshotsRemover().remove(onFinish); assertThat(capturedSearchRequests.size(), equalTo(2)); SearchRequest searchRequest = capturedSearchRequests.get(0); @@ -126,7 +126,7 @@ public class ExpiredModelSnapshotsRemoverTests extends ESTestCase { verify(onFinish).run(); } - public void testTrigger_GivenClientSearchRequestsFail() throws IOException { + public void testRemove_GivenClientSearchRequestsFail() throws IOException { givenClientSearchRequestsFail(); givenJobs(Arrays.asList( JobTests.buildJobBuilder("none").build(), @@ -140,7 +140,7 @@ public class ExpiredModelSnapshotsRemoverTests extends ESTestCase { searchResponsesPerCall.add(createSearchResponse(snapshots1JobSnapshots)); searchResponsesPerCall.add(createSearchResponse(snapshots2JobSnapshots)); - createExpiredModelSnapshotsRemover().trigger(onFinish); + createExpiredModelSnapshotsRemover().remove(onFinish); assertThat(capturedSearchRequests.size(), equalTo(2)); SearchRequest searchRequest = capturedSearchRequests.get(0); @@ -153,7 +153,7 @@ public class ExpiredModelSnapshotsRemoverTests extends ESTestCase { verify(onFinish).run(); } - public void testTrigger_GivenClientDeleteSnapshotRequestsFail() throws IOException { + public void testRemove_GivenClientDeleteSnapshotRequestsFail() throws IOException { givenClientDeleteModelSnapshotRequestsFail(); givenJobs(Arrays.asList( JobTests.buildJobBuilder("none").build(), @@ -167,7 +167,7 @@ public class ExpiredModelSnapshotsRemoverTests extends ESTestCase { searchResponsesPerCall.add(createSearchResponse(snapshots1JobSnapshots)); searchResponsesPerCall.add(createSearchResponse(snapshots2JobSnapshots)); - createExpiredModelSnapshotsRemover().trigger(onFinish); + createExpiredModelSnapshotsRemover().remove(onFinish); assertThat(capturedSearchRequests.size(), equalTo(2)); SearchRequest searchRequest = capturedSearchRequests.get(0); diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemoverTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemoverTests.java index ed2a9bca884..01ce1de6a8a 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemoverTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemoverTests.java @@ -68,30 +68,30 @@ public class ExpiredResultsRemoverTests extends ESTestCase { onFinish = mock(Runnable.class); } - public void testTrigger_GivenNoJobs() { + public void testRemove_GivenNoJobs() { givenClientRequestsSucceed(); givenJobs(Collections.emptyList()); - createExpiredResultsRemover().trigger(onFinish); + createExpiredResultsRemover().remove(onFinish); verify(onFinish).run(); Mockito.verifyNoMoreInteractions(client); } - public void testTrigger_GivenJobsWithoutRetentionPolicy() { + public void testRemove_GivenJobsWithoutRetentionPolicy() { givenClientRequestsSucceed(); givenJobs(Arrays.asList( JobTests.buildJobBuilder("foo").build(), JobTests.buildJobBuilder("bar").build() )); - createExpiredResultsRemover().trigger(onFinish); + createExpiredResultsRemover().remove(onFinish); verify(onFinish).run(); Mockito.verifyNoMoreInteractions(client); } - public void testTrigger_GivenJobsWithAndWithoutRetentionPolicy() throws IOException { + public void testRemove_GivenJobsWithAndWithoutRetentionPolicy() throws IOException { givenClientRequestsSucceed(); givenJobs(Arrays.asList( JobTests.buildJobBuilder("none").build(), @@ -99,7 +99,7 @@ public class ExpiredResultsRemoverTests extends ESTestCase { JobTests.buildJobBuilder("results-2").setResultsRetentionDays(20L).build() )); - createExpiredResultsRemover().trigger(onFinish); + createExpiredResultsRemover().remove(onFinish); assertThat(capturedDeleteByQueryRequests.size(), equalTo(2)); DeleteByQueryRequest dbqRequest = capturedDeleteByQueryRequests.get(0); @@ -109,7 +109,7 @@ public class ExpiredResultsRemoverTests extends ESTestCase { verify(onFinish).run(); } - public void testTrigger_GivenClientRequestsFailed_StillIteratesThroughJobs() throws IOException { + public void testRemove_GivenClientRequestsFailed_StillIteratesThroughJobs() throws IOException { givenClientRequestsFailed(); givenJobs(Arrays.asList( JobTests.buildJobBuilder("none").build(), @@ -117,7 +117,7 @@ public class ExpiredResultsRemoverTests extends ESTestCase { JobTests.buildJobBuilder("results-2").setResultsRetentionDays(20L).build() )); - createExpiredResultsRemover().trigger(onFinish); + createExpiredResultsRemover().remove(onFinish); assertThat(capturedDeleteByQueryRequests.size(), equalTo(2)); DeleteByQueryRequest dbqRequest = capturedDeleteByQueryRequests.get(0); From 83ca6e8064861db285e21cc3934491e44a1a9986 Mon Sep 17 00:00:00 2001 From: Dimitris Athanasiou Date: Tue, 21 Nov 2017 12:03:04 +0000 Subject: [PATCH 4/8] [ML] Report errors back to listener in DeleteExpiredDataAction (elastic/x-pack-elasticsearch#3072) Currently, any errors that occur during the DeleteExpiredDataAction are logged and the deletion proceeds to the next job. The user will get no indication in the response that something went wrong although nothing should really go wrong unless the cluster is messed up. This commit changes this so that errors are reported back to the action. Original commit: elastic/x-pack-elasticsearch@489cf03c3e7b8cfd68116303283d45684bb8beff --- .../xpack/ml/MlDailyMaintenanceService.java | 9 ++-- .../ml/action/DeleteExpiredDataAction.java | 9 ++-- .../AbstractExpiredJobDataRemover.java | 17 ++++---- .../ExpiredModelSnapshotsRemover.java | 21 +++++----- .../job/retention/ExpiredResultsRemover.java | 8 ++-- .../xpack/ml/job/retention/MlDataRemover.java | 4 +- .../ml/MlDailyManagementServiceTests.java | 3 +- .../ExpiredModelSnapshotsRemoverTests.java | 40 +++++++----------- .../retention/ExpiredResultsRemoverTests.java | 42 +++++++------------ 9 files changed, 67 insertions(+), 86 deletions(-) diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/MlDailyMaintenanceService.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/MlDailyMaintenanceService.java index 6c926d61b22..346773411e7 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/MlDailyMaintenanceService.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/MlDailyMaintenanceService.java @@ -6,6 +6,7 @@ package org.elasticsearch.xpack.ml; import org.apache.logging.log4j.Logger; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.common.lease.Releasable; @@ -106,11 +107,9 @@ public class MlDailyMaintenanceService implements Releasable { private void triggerTasks() { LOGGER.info("triggering scheduled [ML] maintenance tasks"); - try { - client.execute(DeleteExpiredDataAction.INSTANCE, new DeleteExpiredDataAction.Request()); - } catch (Exception e) { - LOGGER.error("An error occurred during maintenance tasks execution", e); - } + client.execute(DeleteExpiredDataAction.INSTANCE, new DeleteExpiredDataAction.Request(), ActionListener.wrap( + response -> LOGGER.info("Successfully completed [ML] maintenance tasks"), + e -> LOGGER.error("An error occurred during maintenance tasks execution", e))); scheduleNext(); } } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/DeleteExpiredDataAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/DeleteExpiredDataAction.java index 73f43a38c12..174cc5dd3c7 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/DeleteExpiredDataAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/DeleteExpiredDataAction.java @@ -38,7 +38,6 @@ import java.util.Arrays; import java.util.Iterator; import java.util.List; import java.util.Objects; -import java.util.concurrent.atomic.AtomicInteger; public class DeleteExpiredDataAction extends Action { @@ -156,11 +155,11 @@ public class DeleteExpiredDataAction extends Action mlDataRemoversIterator, ActionListener listener) { if (mlDataRemoversIterator.hasNext()) { MlDataRemover remover = mlDataRemoversIterator.next(); - remover.remove(() -> { - deleteExpiredData(mlDataRemoversIterator, listener); - }); + remover.remove(ActionListener.wrap( + booleanResponse -> deleteExpiredData(mlDataRemoversIterator, listener), + listener::onFailure)); } else { - logger.debug("Finished deleting expired data"); + logger.info("Completed deletion of expired data"); listener.onResponse(new Response(true)); } } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemover.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemover.java index efb30a1e378..44ff73d465b 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemover.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemover.java @@ -5,6 +5,7 @@ */ package org.elasticsearch.xpack.ml.job.retention; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.unit.TimeValue; @@ -39,23 +40,23 @@ abstract class AbstractExpiredJobDataRemover implements MlDataRemover { } @Override - public void remove(Runnable onFinish) { - removeData(newJobIterator(), onFinish); + public void remove(ActionListener listener) { + removeData(newJobIterator(), listener); } - private void removeData(Iterator jobIterator, Runnable onFinish) { + private void removeData(Iterator jobIterator, ActionListener listener) { if (jobIterator.hasNext() == false) { - onFinish.run(); + listener.onResponse(true); return; } Job job = jobIterator.next(); Long retentionDays = getRetentionDays(job); if (retentionDays == null) { - removeData(jobIterator, () -> removeData(jobIterator, onFinish)); + removeData(jobIterator, listener); return; } long cutoffEpochMs = calcCutoffEpochMs(retentionDays); - removeDataBefore(job, cutoffEpochMs, () -> removeData(jobIterator, onFinish)); + removeDataBefore(job, cutoffEpochMs, ActionListener.wrap(response -> removeData(jobIterator, listener), listener::onFailure)); } private Iterator newJobIterator() { @@ -81,9 +82,9 @@ abstract class AbstractExpiredJobDataRemover implements MlDataRemover { /** * Template method to allow implementation details of various types of data (e.g. results, model snapshots). - * Implementors need to call {@code onFinish} when they are done in order to continue to the next job. + * Implementors need to call {@code listener.onResponse} when they are done in order to continue to the next job. */ - protected abstract void removeDataBefore(Job job, long cutoffEpochMs, Runnable onFinish); + protected abstract void removeDataBefore(Job job, long cutoffEpochMs, ActionListener listener); protected static BoolQueryBuilder createQuery(String jobId, long cutoffEpochMs) { return QueryBuilders.boolQuery() diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemover.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemover.java index 34bc90ab7b2..0f2b9aa2b73 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemover.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemover.java @@ -6,6 +6,7 @@ package org.elasticsearch.xpack.ml.job.retention; import org.apache.logging.log4j.Logger; +import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.search.SearchAction; import org.elasticsearch.action.search.SearchRequest; @@ -57,10 +58,10 @@ public class ExpiredModelSnapshotsRemover extends AbstractExpiredJobDataRemover } @Override - protected void removeDataBefore(Job job, long cutoffEpochMs, Runnable onFinish) { + protected void removeDataBefore(Job job, long cutoffEpochMs, ActionListener listener) { if (job.getModelSnapshotId() == null) { // No snapshot to remove - onFinish.run(); + listener.onResponse(true); return; } LOGGER.debug("Removing model snapshots of job [{}] that have a timestamp before [{}]", job.getId(), cutoffEpochMs); @@ -86,7 +87,7 @@ public class ExpiredModelSnapshotsRemover extends AbstractExpiredJobDataRemover for (SearchHit hit : searchResponse.getHits()) { modelSnapshots.add(ModelSnapshot.fromJson(hit.getSourceRef())); } - deleteModelSnapshots(createVolatileCursorIterator(modelSnapshots), onFinish); + deleteModelSnapshots(createVolatileCursorIterator(modelSnapshots), listener); } catch (Exception e) { onFailure(e); } @@ -94,15 +95,14 @@ public class ExpiredModelSnapshotsRemover extends AbstractExpiredJobDataRemover @Override public void onFailure(Exception e) { - LOGGER.error("[" + job.getId() + "] Search for expired snapshots failed", e); - onFinish.run(); + listener.onFailure(new ElasticsearchException("[" + job.getId() + "] Search for expired snapshots failed", e)); } }); } - private void deleteModelSnapshots(Iterator modelSnapshotIterator, Runnable onFinish) { + private void deleteModelSnapshots(Iterator modelSnapshotIterator, ActionListener listener) { if (modelSnapshotIterator.hasNext() == false) { - onFinish.run(); + listener.onResponse(true); return; } ModelSnapshot modelSnapshot = modelSnapshotIterator.next(); @@ -112,7 +112,7 @@ public class ExpiredModelSnapshotsRemover extends AbstractExpiredJobDataRemover @Override public void onResponse(DeleteModelSnapshotAction.Response response) { try { - deleteModelSnapshots(modelSnapshotIterator, onFinish); + deleteModelSnapshots(modelSnapshotIterator, listener); } catch (Exception e) { onFailure(e); } @@ -120,9 +120,8 @@ public class ExpiredModelSnapshotsRemover extends AbstractExpiredJobDataRemover @Override public void onFailure(Exception e) { - LOGGER.error("[" + modelSnapshot.getJobId() + "] Failed to delete snapshot [" - + modelSnapshot.getSnapshotId() + "]", e); - deleteModelSnapshots(modelSnapshotIterator, onFinish); + listener.onFailure(new ElasticsearchException("[" + modelSnapshot.getJobId() + "] Failed to delete snapshot [" + + modelSnapshot.getSnapshotId() + "]", e)); } }); } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemover.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemover.java index acd17eb4d52..738fc28c1c6 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemover.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemover.java @@ -6,6 +6,7 @@ package org.elasticsearch.xpack.ml.job.retention; import org.apache.logging.log4j.Logger; +import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.client.Client; @@ -55,7 +56,7 @@ public class ExpiredResultsRemover extends AbstractExpiredJobDataRemover { } @Override - protected void removeDataBefore(Job job, long cutoffEpochMs, Runnable onFinish) { + protected void removeDataBefore(Job job, long cutoffEpochMs, ActionListener listener) { LOGGER.debug("Removing results of job [{}] that have a timestamp before [{}]", job.getId(), cutoffEpochMs); DeleteByQueryRequest request = createDBQRequest(job, cutoffEpochMs); @@ -66,7 +67,7 @@ public class ExpiredResultsRemover extends AbstractExpiredJobDataRemover { if (bulkByScrollResponse.getDeleted() > 0) { auditResultsWereDeleted(job.getId(), cutoffEpochMs); } - onFinish.run(); + listener.onResponse(true); } catch (Exception e) { onFailure(e); } @@ -74,8 +75,7 @@ public class ExpiredResultsRemover extends AbstractExpiredJobDataRemover { @Override public void onFailure(Exception e) { - LOGGER.error("Failed to remove expired results for job [" + job.getId() + "]", e); - onFinish.run(); + listener.onFailure(new ElasticsearchException("Failed to remove expired results for job [" + job.getId() + "]", e)); } }); } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/retention/MlDataRemover.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/retention/MlDataRemover.java index 14decf04cf6..03238cdfc88 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/retention/MlDataRemover.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/retention/MlDataRemover.java @@ -5,6 +5,8 @@ */ package org.elasticsearch.xpack.ml.job.retention; +import org.elasticsearch.action.ActionListener; + public interface MlDataRemover { - void remove(Runnable onFinish); + void remove(ActionListener listener); } diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/MlDailyManagementServiceTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/MlDailyManagementServiceTests.java index 54e21079512..907929114a1 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/MlDailyManagementServiceTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/MlDailyManagementServiceTests.java @@ -13,6 +13,7 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.ml.action.DeleteExpiredDataAction; import org.junit.After; import org.junit.Before; +import org.mockito.Mockito; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -46,7 +47,7 @@ public class MlDailyManagementServiceTests extends ESTestCase { latch.await(1, TimeUnit.SECONDS); } - verify(client, org.mockito.Mockito.atLeast(triggerCount - 1)).execute(same(DeleteExpiredDataAction.INSTANCE), any()); + verify(client, Mockito.atLeast(triggerCount - 1)).execute(same(DeleteExpiredDataAction.INSTANCE), any(), any()); } private MlDailyMaintenanceService createService(CountDownLatch latch, Client client) { diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemoverTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemoverTests.java index c157d97813e..bef60071a61 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemoverTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemoverTests.java @@ -53,7 +53,7 @@ public class ExpiredModelSnapshotsRemoverTests extends ESTestCase { private List capturedSearchRequests; private List capturedDeleteModelSnapshotRequests; private List searchResponsesPerCall; - private Runnable onFinish; + private ActionListener listener; @Before public void setUpTests() { @@ -64,7 +64,7 @@ public class ExpiredModelSnapshotsRemoverTests extends ESTestCase { clusterState = mock(ClusterState.class); when(clusterService.state()).thenReturn(clusterState); client = mock(Client.class); - onFinish = mock(Runnable.class); + listener = mock(ActionListener.class); } public void testRemove_GivenJobsWithoutRetentionPolicy() { @@ -74,9 +74,9 @@ public class ExpiredModelSnapshotsRemoverTests extends ESTestCase { JobTests.buildJobBuilder("bar").build() )); - createExpiredModelSnapshotsRemover().remove(onFinish); + createExpiredModelSnapshotsRemover().remove(listener); - verify(onFinish).run(); + verify(listener).onResponse(true); Mockito.verifyNoMoreInteractions(client); } @@ -84,9 +84,9 @@ public class ExpiredModelSnapshotsRemoverTests extends ESTestCase { givenClientRequestsSucceed(); givenJobs(Arrays.asList(JobTests.buildJobBuilder("foo").setModelSnapshotRetentionDays(7L).build())); - createExpiredModelSnapshotsRemover().remove(onFinish); + createExpiredModelSnapshotsRemover().remove(listener); - verify(onFinish).run(); + verify(listener).onResponse(true); Mockito.verifyNoMoreInteractions(client); } @@ -104,7 +104,7 @@ public class ExpiredModelSnapshotsRemoverTests extends ESTestCase { searchResponsesPerCall.add(createSearchResponse(snapshots1JobSnapshots)); searchResponsesPerCall.add(createSearchResponse(snapshots2JobSnapshots)); - createExpiredModelSnapshotsRemover().remove(onFinish); + createExpiredModelSnapshotsRemover().remove(listener); assertThat(capturedSearchRequests.size(), equalTo(2)); SearchRequest searchRequest = capturedSearchRequests.get(0); @@ -123,7 +123,7 @@ public class ExpiredModelSnapshotsRemoverTests extends ESTestCase { assertThat(deleteSnapshotRequest.getJobId(), equalTo("snapshots-2")); assertThat(deleteSnapshotRequest.getSnapshotId(), equalTo("snapshots-2_1")); - verify(onFinish).run(); + verify(listener).onResponse(true); } public void testRemove_GivenClientSearchRequestsFail() throws IOException { @@ -140,17 +140,15 @@ public class ExpiredModelSnapshotsRemoverTests extends ESTestCase { searchResponsesPerCall.add(createSearchResponse(snapshots1JobSnapshots)); searchResponsesPerCall.add(createSearchResponse(snapshots2JobSnapshots)); - createExpiredModelSnapshotsRemover().remove(onFinish); + createExpiredModelSnapshotsRemover().remove(listener); - assertThat(capturedSearchRequests.size(), equalTo(2)); + assertThat(capturedSearchRequests.size(), equalTo(1)); SearchRequest searchRequest = capturedSearchRequests.get(0); assertThat(searchRequest.indices(), equalTo(new String[] {AnomalyDetectorsIndex.jobResultsAliasedName("snapshots-1")})); - searchRequest = capturedSearchRequests.get(1); - assertThat(searchRequest.indices(), equalTo(new String[] {AnomalyDetectorsIndex.jobResultsAliasedName("snapshots-2")})); assertThat(capturedDeleteModelSnapshotRequests.size(), equalTo(0)); - verify(onFinish).run(); + verify(listener).onFailure(any()); } public void testRemove_GivenClientDeleteSnapshotRequestsFail() throws IOException { @@ -167,26 +165,18 @@ public class ExpiredModelSnapshotsRemoverTests extends ESTestCase { searchResponsesPerCall.add(createSearchResponse(snapshots1JobSnapshots)); searchResponsesPerCall.add(createSearchResponse(snapshots2JobSnapshots)); - createExpiredModelSnapshotsRemover().remove(onFinish); + createExpiredModelSnapshotsRemover().remove(listener); - assertThat(capturedSearchRequests.size(), equalTo(2)); + assertThat(capturedSearchRequests.size(), equalTo(1)); SearchRequest searchRequest = capturedSearchRequests.get(0); assertThat(searchRequest.indices(), equalTo(new String[] {AnomalyDetectorsIndex.jobResultsAliasedName("snapshots-1")})); - searchRequest = capturedSearchRequests.get(1); - assertThat(searchRequest.indices(), equalTo(new String[] {AnomalyDetectorsIndex.jobResultsAliasedName("snapshots-2")})); - assertThat(capturedDeleteModelSnapshotRequests.size(), equalTo(3)); + assertThat(capturedDeleteModelSnapshotRequests.size(), equalTo(1)); DeleteModelSnapshotAction.Request deleteSnapshotRequest = capturedDeleteModelSnapshotRequests.get(0); assertThat(deleteSnapshotRequest.getJobId(), equalTo("snapshots-1")); assertThat(deleteSnapshotRequest.getSnapshotId(), equalTo("snapshots-1_1")); - deleteSnapshotRequest = capturedDeleteModelSnapshotRequests.get(1); - assertThat(deleteSnapshotRequest.getJobId(), equalTo("snapshots-1")); - assertThat(deleteSnapshotRequest.getSnapshotId(), equalTo("snapshots-1_2")); - deleteSnapshotRequest = capturedDeleteModelSnapshotRequests.get(2); - assertThat(deleteSnapshotRequest.getJobId(), equalTo("snapshots-2")); - assertThat(deleteSnapshotRequest.getSnapshotId(), equalTo("snapshots-2_1")); - verify(onFinish).run(); + verify(listener).onFailure(any()); } private void givenJobs(List jobs) { diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemoverTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemoverTests.java index 01ce1de6a8a..8e8eb7fdea8 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemoverTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemoverTests.java @@ -46,7 +46,7 @@ public class ExpiredResultsRemoverTests extends ESTestCase { private ClusterService clusterService; private ClusterState clusterState; private List capturedDeleteByQueryRequests; - private Runnable onFinish; + private ActionListener listener; @Before public void setUpTests() { @@ -55,26 +55,16 @@ public class ExpiredResultsRemoverTests extends ESTestCase { clusterState = mock(ClusterState.class); when(clusterService.state()).thenReturn(clusterState); client = mock(Client.class); - doAnswer(new Answer() { - @Override - public Void answer(InvocationOnMock invocationOnMock) throws Throwable { - capturedDeleteByQueryRequests.add((DeleteByQueryRequest) invocationOnMock.getArguments()[1]); - ActionListener listener = - (ActionListener) invocationOnMock.getArguments()[2]; - listener.onResponse(null); - return null; - } - }).when(client).execute(same(DeleteByQueryAction.INSTANCE), any(), any()); - onFinish = mock(Runnable.class); + listener = mock(ActionListener.class); } public void testRemove_GivenNoJobs() { givenClientRequestsSucceed(); givenJobs(Collections.emptyList()); - createExpiredResultsRemover().remove(onFinish); + createExpiredResultsRemover().remove(listener); - verify(onFinish).run(); + verify(listener).onResponse(true); Mockito.verifyNoMoreInteractions(client); } @@ -85,13 +75,13 @@ public class ExpiredResultsRemoverTests extends ESTestCase { JobTests.buildJobBuilder("bar").build() )); - createExpiredResultsRemover().remove(onFinish); + createExpiredResultsRemover().remove(listener); - verify(onFinish).run(); + verify(listener).onResponse(true); Mockito.verifyNoMoreInteractions(client); } - public void testRemove_GivenJobsWithAndWithoutRetentionPolicy() throws IOException { + public void testRemove_GivenJobsWithAndWithoutRetentionPolicy() throws Exception { givenClientRequestsSucceed(); givenJobs(Arrays.asList( JobTests.buildJobBuilder("none").build(), @@ -99,17 +89,17 @@ public class ExpiredResultsRemoverTests extends ESTestCase { JobTests.buildJobBuilder("results-2").setResultsRetentionDays(20L).build() )); - createExpiredResultsRemover().remove(onFinish); + createExpiredResultsRemover().remove(listener); assertThat(capturedDeleteByQueryRequests.size(), equalTo(2)); DeleteByQueryRequest dbqRequest = capturedDeleteByQueryRequests.get(0); assertThat(dbqRequest.indices(), equalTo(new String[] {AnomalyDetectorsIndex.jobResultsAliasedName("results-1")})); dbqRequest = capturedDeleteByQueryRequests.get(1); assertThat(dbqRequest.indices(), equalTo(new String[] {AnomalyDetectorsIndex.jobResultsAliasedName("results-2")})); - verify(onFinish).run(); + verify(listener).onResponse(true); } - public void testRemove_GivenClientRequestsFailed_StillIteratesThroughJobs() throws IOException { + public void testRemove_GivenClientRequestsFailed() throws IOException { givenClientRequestsFailed(); givenJobs(Arrays.asList( JobTests.buildJobBuilder("none").build(), @@ -117,14 +107,12 @@ public class ExpiredResultsRemoverTests extends ESTestCase { JobTests.buildJobBuilder("results-2").setResultsRetentionDays(20L).build() )); - createExpiredResultsRemover().remove(onFinish); + createExpiredResultsRemover().remove(listener); - assertThat(capturedDeleteByQueryRequests.size(), equalTo(2)); + assertThat(capturedDeleteByQueryRequests.size(), equalTo(1)); DeleteByQueryRequest dbqRequest = capturedDeleteByQueryRequests.get(0); assertThat(dbqRequest.indices(), equalTo(new String[] {AnomalyDetectorsIndex.jobResultsAliasedName("results-1")})); - dbqRequest = capturedDeleteByQueryRequests.get(1); - assertThat(dbqRequest.indices(), equalTo(new String[] {AnomalyDetectorsIndex.jobResultsAliasedName("results-2")})); - verify(onFinish).run(); + verify(listener).onFailure(any()); } private void givenClientRequestsSucceed() { @@ -143,7 +131,9 @@ public class ExpiredResultsRemoverTests extends ESTestCase { ActionListener listener = (ActionListener) invocationOnMock.getArguments()[2]; if (shouldSucceed) { - listener.onResponse(null); + BulkByScrollResponse bulkByScrollResponse = mock(BulkByScrollResponse.class); + when(bulkByScrollResponse.getDeleted()).thenReturn(42L); + listener.onResponse(bulkByScrollResponse); } else { listener.onFailure(new RuntimeException("failed")); } From e71b5639de042976e4a4b2a24856a37751bfa4c6 Mon Sep 17 00:00:00 2001 From: Dimitris Athanasiou Date: Tue, 21 Nov 2017 13:57:41 +0000 Subject: [PATCH 5/8] [ML] Rename id to forecast_id in forecast API response (elastic/x-pack-elasticsearch#3074) Original commit: elastic/x-pack-elasticsearch@c05d9fc6020ffd36e0a567f8486b1c34ab579453 --- .../xpack/ml/action/ForecastJobAction.java | 38 +++++++++++-------- .../autodetect/params/ForecastParams.java | 16 ++++---- .../writer/ControlMsgToProcessWriter.java | 2 +- 3 files changed, 33 insertions(+), 23 deletions(-) diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/ForecastJobAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/ForecastJobAction.java index b9b99f30daf..2fd0a52ae03 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/ForecastJobAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/ForecastJobAction.java @@ -29,6 +29,7 @@ import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.ml.job.config.Job; import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager; import org.elasticsearch.xpack.ml.job.process.autodetect.params.ForecastParams; +import org.elasticsearch.xpack.ml.job.results.Forecast; import java.io.IOException; import java.util.Objects; @@ -157,56 +158,64 @@ public class ForecastJobAction extends Action { if (e == null) { - listener.onResponse(new Response(true, params.getId())); + listener.onResponse(new Response(true, params.getForecastId())); } else { listener.onFailure(e); } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/params/ForecastParams.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/params/ForecastParams.java index 3e50d618e43..7c2ea46b62d 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/params/ForecastParams.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/params/ForecastParams.java @@ -18,10 +18,10 @@ public class ForecastParams { private final long endTime; private final long duration; - private final long id; + private final long forecastId; - private ForecastParams(long id, long endTime, long duration) { - this.id = id; + private ForecastParams(long forecastId, long endTime, long duration) { + this.forecastId = forecastId; this.endTime = endTime; this.duration = duration; } @@ -47,13 +47,13 @@ public class ForecastParams { * * @return The forecast Id */ - public long getId() { - return id; + public long getForecastId() { + return forecastId; } @Override public int hashCode() { - return Objects.hash(id, endTime, duration); + return Objects.hash(forecastId, endTime, duration); } @Override @@ -65,7 +65,9 @@ public class ForecastParams { return false; } ForecastParams other = (ForecastParams) obj; - return Objects.equals(id, other.id) && Objects.equals(endTime, other.endTime) && Objects.equals(duration, other.duration); + return Objects.equals(forecastId, other.forecastId) + && Objects.equals(endTime, other.endTime) + && Objects.equals(duration, other.duration); } public static Builder builder() { diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/ControlMsgToProcessWriter.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/ControlMsgToProcessWriter.java index 18c3303fd59..0b161b977b7 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/ControlMsgToProcessWriter.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/ControlMsgToProcessWriter.java @@ -153,7 +153,7 @@ public class ControlMsgToProcessWriter { public void writeForecastMessage(ForecastParams params) throws IOException { XContentBuilder builder = XContentFactory.jsonBuilder() .startObject() - .field("forecast_id", params.getId()); + .field("forecast_id", params.getForecastId()); if (params.getEndTime() != 0) { builder.field("end_time", params.getEndTime()); From 601222903d6cf63747f489c7bdb20aa91d4a1750 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Tue, 21 Nov 2017 15:15:24 +0100 Subject: [PATCH 6/8] X-Pack side of elastic/elasticsearch#27469 (elastic/x-pack-elasticsearch#3071) Original commit: elastic/x-pack-elasticsearch@99499b6bd6245da2cdd285c79b2fb8ee82f88f7b --- .../authz/accesscontrol/SecurityIndexSearcherWrapper.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/plugin/src/main/java/org/elasticsearch/xpack/security/authz/accesscontrol/SecurityIndexSearcherWrapper.java b/plugin/src/main/java/org/elasticsearch/xpack/security/authz/accesscontrol/SecurityIndexSearcherWrapper.java index 3f47c5a360a..a03a29ab138 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/security/authz/accesscontrol/SecurityIndexSearcherWrapper.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/security/authz/accesscontrol/SecurityIndexSearcherWrapper.java @@ -139,7 +139,8 @@ public class SecurityIndexSearcherWrapper extends IndexSearcherWrapper { filter.add(roleQuery, SHOULD); if (queryShardContext.getMapperService().hasNested()) { // If access is allowed on root doc then also access is allowed on all nested docs of that root document: - BitSetProducer rootDocs = queryShardContext.bitsetFilter(Queries.newNonNestedFilter()); + BitSetProducer rootDocs = queryShardContext.bitsetFilter( + Queries.newNonNestedFilter(queryShardContext.indexVersionCreated())); ToChildBlockJoinQuery includeNestedDocs = new ToChildBlockJoinQuery(roleQuery, rootDocs); filter.add(includeNestedDocs, SHOULD); } From cc66020cf331f1ce0f24ecdb3c1ee261befc4b5b Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Tue, 21 Nov 2017 15:32:06 +0100 Subject: [PATCH 7/8] [ML-FC] add expires_in parameter and change forecast_start_timestamp to timestamp (elastic/x-pack-elasticsearch#3073) add expires_in parameter and change forecast_start_timestamp to timestamp depends on elastic/machine-learning-cpp#421 Original commit: elastic/x-pack-elasticsearch@3a3eebd49c1a3b23386bce2228c9d1c5181377fe --- .../xpack/ml/action/ForecastJobAction.java | 24 ++++++++++++-- .../persistence/ElasticsearchMappings.java | 6 ++-- .../autodetect/params/ForecastParams.java | 30 ++++++++++++++---- .../writer/ControlMsgToProcessWriter.java | 3 ++ .../ml/job/results/ForecastRequestStats.java | 31 ++++++++++++++----- .../ml/job/results/ReservedFieldNames.java | 3 +- .../ml/rest/job/RestForecastJobAction.java | 3 ++ .../results/ForecastRequestStatsTests.java | 5 ++- 8 files changed, 85 insertions(+), 20 deletions(-) diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/ForecastJobAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/ForecastJobAction.java index 2fd0a52ae03..f738ff15829 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/ForecastJobAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/ForecastJobAction.java @@ -59,6 +59,7 @@ public class ForecastJobAction extends Action PARSER = new ObjectParser<>(NAME, Request::new); @@ -66,6 +67,7 @@ public class ForecastJobAction extends Action request.jobId = jobId, Job.ID); PARSER.declareString(Request::setEndTime, END_TIME); PARSER.declareString(Request::setDuration, DURATION); + PARSER.declareString(Request::setExpiresIn, EXPIRES_IN); } public static Request parseRequest(String jobId, XContentParser parser) { @@ -78,6 +80,7 @@ public class ForecastJobAction extends Action { diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/ElasticsearchMappings.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/ElasticsearchMappings.java index 70f22e355e1..30e01b5bff6 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/ElasticsearchMappings.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/ElasticsearchMappings.java @@ -340,11 +340,11 @@ public class ElasticsearchMappings { .endObject(); // Forecast Stats Output - // re-used: PROCESSING_TIME_MS, PROCESSED_RECORD_COUNT, LATEST_RECORD_TIME - builder.startObject(ForecastRequestStats.START_TIME.getPreferredName()) + // re-used: TIMESTAMP, PROCESSING_TIME_MS, PROCESSED_RECORD_COUNT, LATEST_RECORD_TIME + builder.startObject(ForecastRequestStats.END_TIME.getPreferredName()) .field(TYPE, DATE) .endObject() - .startObject(ForecastRequestStats.END_TIME.getPreferredName()) + .startObject(ForecastRequestStats.EXPIRY_TIME.getPreferredName()) .field(TYPE, DATE) .endObject() .startObject(ForecastRequestStats.MESSAGES.getPreferredName()) diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/params/ForecastParams.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/params/ForecastParams.java index 7c2ea46b62d..745fc9e8d1e 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/params/ForecastParams.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/params/ForecastParams.java @@ -18,12 +18,14 @@ public class ForecastParams { private final long endTime; private final long duration; + private final long expiresIn; private final long forecastId; - private ForecastParams(long forecastId, long endTime, long duration) { + private ForecastParams(long forecastId, long endTime, long duration, long expiresIn) { this.forecastId = forecastId; this.endTime = endTime; this.duration = duration; + this.expiresIn = expiresIn; } /** @@ -42,6 +44,14 @@ public class ForecastParams { return duration; } + /** + * The forecast expiration in seconds (duration added to start time) + * @return The expiration in seconds + */ + public long getExpiresIn() { + return expiresIn; + } + /** * The forecast id * @@ -53,7 +63,7 @@ public class ForecastParams { @Override public int hashCode() { - return Objects.hash(forecastId, endTime, duration); + return Objects.hash(forecastId, endTime, duration, expiresIn); } @Override @@ -65,9 +75,8 @@ public class ForecastParams { return false; } ForecastParams other = (ForecastParams) obj; - return Objects.equals(forecastId, other.forecastId) - && Objects.equals(endTime, other.endTime) - && Objects.equals(duration, other.duration); + return Objects.equals(forecastId, other.forecastId) && Objects.equals(endTime, other.endTime) && + Objects.equals(duration, other.duration) && Objects.equals(expiresIn, other.expiresIn); } public static Builder builder() { @@ -77,6 +86,7 @@ public class ForecastParams { public static class Builder { private long endTimeEpochSecs; private long durationSecs; + private long expiresInSecs; private long startTime; private long forecastId; @@ -85,6 +95,9 @@ public class ForecastParams { endTimeEpochSecs = 0; forecastId = generateId(); durationSecs = 0; + + // because 0 means never expire, the default is -1 + expiresInSecs = -1; } private long generateId() { @@ -109,12 +122,17 @@ public class ForecastParams { return this; } + public Builder expiresIn(TimeValue expiresIn) { + expiresInSecs = expiresIn.seconds(); + return this; + } + public ForecastParams build() { if (endTimeEpochSecs != 0 && durationSecs != 0) { throw new ElasticsearchParseException(Messages.getMessage(Messages.REST_INVALID_DURATION_AND_ENDTIME)); } - return new ForecastParams(forecastId, endTimeEpochSecs, durationSecs); + return new ForecastParams(forecastId, endTimeEpochSecs, durationSecs, expiresInSecs); } } } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/ControlMsgToProcessWriter.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/ControlMsgToProcessWriter.java index 0b161b977b7..b365c03108c 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/ControlMsgToProcessWriter.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/ControlMsgToProcessWriter.java @@ -161,6 +161,9 @@ public class ControlMsgToProcessWriter { if (params.getDuration() != 0) { builder.field("duration", params.getDuration()); } + if (params.getExpiresIn() != -1) { + builder.field("expires_in", params.getExpiresIn()); + } builder.endObject(); writeMessage(FORECAST_MESSAGE_CODE + builder.string()); diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/results/ForecastRequestStats.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/results/ForecastRequestStats.java index 606c5fd81b7..1a1bdb71cd0 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/results/ForecastRequestStats.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/results/ForecastRequestStats.java @@ -36,8 +36,8 @@ public class ForecastRequestStats implements ToXContentObject, Writeable { public static final ParseField RESULTS_FIELD = new ParseField(RESULT_TYPE_VALUE); public static final ParseField FORECAST_ID = new ParseField("forecast_id"); - public static final ParseField START_TIME = new ParseField("forecast_start_timestamp"); public static final ParseField END_TIME = new ParseField("forecast_end_timestamp"); + public static final ParseField EXPIRY_TIME = new ParseField("forecast_expiry_timestamp"); public static final ParseField MESSAGES = new ParseField("forecast_messages"); public static final ParseField PROCESSING_TIME_MS = new ParseField("processing_time_ms"); public static final ParseField PROGRESS = new ParseField("forecast_progress"); @@ -55,10 +55,12 @@ public class ForecastRequestStats implements ToXContentObject, Writeable { PARSER.declareString((modelForecastRequestStats, s) -> {}, Result.RESULT_TYPE); PARSER.declareLong(ForecastRequestStats::setRecordCount, PROCESSED_RECORD_COUNT); PARSER.declareStringArray(ForecastRequestStats::setMessages, MESSAGES); - PARSER.declareField(ForecastRequestStats::setStartTimeStamp, - p -> Instant.ofEpochMilli(p.longValue()), START_TIME, ValueType.LONG); + PARSER.declareField(ForecastRequestStats::setTimeStamp, + p -> Instant.ofEpochMilli(p.longValue()), Result.TIMESTAMP, ValueType.LONG); PARSER.declareField(ForecastRequestStats::setEndTimeStamp, p -> Instant.ofEpochMilli(p.longValue()), END_TIME, ValueType.LONG); + PARSER.declareField(ForecastRequestStats::setExpiryTimeStamp, + p -> Instant.ofEpochMilli(p.longValue()), EXPIRY_TIME, ValueType.LONG); PARSER.declareDouble(ForecastRequestStats::setProgress, PROGRESS); PARSER.declareLong(ForecastRequestStats::setProcessingTime, PROCESSING_TIME_MS); PARSER.declareField(ForecastRequestStats::setStatus, p -> ForecastRequestStatus.fromString(p.text()), STATUS, ValueType.STRING); @@ -93,6 +95,7 @@ public class ForecastRequestStats implements ToXContentObject, Writeable { private List messages; private Instant dateStarted = Instant.EPOCH; private Instant dateEnded = Instant.EPOCH; + private Instant dateExpires = Instant.EPOCH; private double progress; private long processingTime; private long memoryUsage; @@ -114,6 +117,7 @@ public class ForecastRequestStats implements ToXContentObject, Writeable { } dateStarted = Instant.ofEpochMilli(in.readVLong()); dateEnded = Instant.ofEpochMilli(in.readVLong()); + dateExpires = Instant.ofEpochMilli(in.readVLong()); progress = in.readDouble(); processingTime = in.readLong(); setMemoryUsage(in.readLong()); @@ -133,6 +137,7 @@ public class ForecastRequestStats implements ToXContentObject, Writeable { } out.writeVLong(dateStarted.toEpochMilli()); out.writeVLong(dateEnded.toEpochMilli()); + out.writeVLong(dateExpires.toEpochMilli()); out.writeDouble(progress); out.writeLong(processingTime); out.writeLong(getMemoryUsage()); @@ -150,11 +155,14 @@ public class ForecastRequestStats implements ToXContentObject, Writeable { builder.field(MESSAGES.getPreferredName(), messages); } if (dateStarted.equals(Instant.EPOCH) == false) { - builder.field(START_TIME.getPreferredName(), dateStarted.toEpochMilli()); + builder.field(Result.TIMESTAMP.getPreferredName(), dateStarted.toEpochMilli()); } if (dateEnded.equals(Instant.EPOCH) == false) { builder.field(END_TIME.getPreferredName(), dateEnded.toEpochMilli()); } + if (dateExpires.equals(Instant.EPOCH) == false) { + builder.field(EXPIRY_TIME.getPreferredName(), dateExpires.toEpochMilli()); + } builder.field(PROGRESS.getPreferredName(), progress); builder.field(PROCESSING_TIME_MS.getPreferredName(), processingTime); builder.field(MEMORY_USAGE.getPreferredName(), getMemoryUsage()); @@ -197,7 +205,7 @@ public class ForecastRequestStats implements ToXContentObject, Writeable { return dateStarted; } - public void setStartTimeStamp(Instant dateStarted) { + public void setTimeStamp(Instant dateStarted) { this.dateStarted = dateStarted; } @@ -209,6 +217,14 @@ public class ForecastRequestStats implements ToXContentObject, Writeable { this.dateEnded = dateEnded; } + public void setExpiryTimeStamp(Instant dateExpires) { + this.dateExpires = dateExpires; + } + + public Instant getDateExpired() { + return dateExpires; + } + /** * Progress information of the ForecastRequest in the range 0 to 1, * while 1 means finished @@ -263,6 +279,7 @@ public class ForecastRequestStats implements ToXContentObject, Writeable { Objects.equals(this.messages, that.messages) && Objects.equals(this.dateStarted, that.dateStarted) && Objects.equals(this.dateEnded, that.dateEnded) && + Objects.equals(this.dateExpires, that.dateExpires) && this.progress == that.progress && this.processingTime == that.processingTime && this.memoryUsage == that.memoryUsage && @@ -271,7 +288,7 @@ public class ForecastRequestStats implements ToXContentObject, Writeable { @Override public int hashCode() { - return Objects.hash(jobId, forecastId, recordCount, messages, dateStarted, dateEnded, progress, - processingTime, memoryUsage, status); + return Objects.hash(jobId, forecastId, recordCount, messages, dateStarted, dateEnded, dateExpires, + progress, processingTime, memoryUsage, status); } } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/results/ReservedFieldNames.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/results/ReservedFieldNames.java index 64fb5c3230e..2a4fb7f30dc 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/results/ReservedFieldNames.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/results/ReservedFieldNames.java @@ -131,8 +131,9 @@ public final class ReservedFieldNames { Forecast.FORECAST_PREDICTION.getPreferredName(), Forecast.FORECAST_ID.getPreferredName(), - ForecastRequestStats.START_TIME.getPreferredName(), + //re-use: ForecastRequestStats.TIMESTAMP ForecastRequestStats.END_TIME.getPreferredName(), + ForecastRequestStats.EXPIRY_TIME.getPreferredName(), ForecastRequestStats.MESSAGES.getPreferredName(), ForecastRequestStats.PROGRESS.getPreferredName(), ForecastRequestStats.STATUS.getPreferredName(), diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/rest/job/RestForecastJobAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/rest/job/RestForecastJobAction.java index 0ba2a38bd89..07e21ce745d 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/rest/job/RestForecastJobAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/rest/job/RestForecastJobAction.java @@ -46,6 +46,9 @@ public class RestForecastJobAction extends BaseRestHandler { if (restRequest.hasParam(ForecastJobAction.Request.DURATION.getPreferredName())) { request.setDuration(restRequest.param(ForecastJobAction.Request.DURATION.getPreferredName())); } + if (restRequest.hasParam(ForecastJobAction.Request.EXPIRES_IN.getPreferredName())) { + request.setExpiresIn(restRequest.param(ForecastJobAction.Request.EXPIRES_IN.getPreferredName())); + } } return channel -> client.execute(ForecastJobAction.INSTANCE, request, new RestToXContentListener<>(channel)); diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/results/ForecastRequestStatsTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/results/ForecastRequestStatsTests.java index 0af6a9459b9..f77bf62d109 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/results/ForecastRequestStatsTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/results/ForecastRequestStatsTests.java @@ -42,11 +42,14 @@ public class ForecastRequestStatsTests extends AbstractSerializingTestCase Date: Tue, 21 Nov 2017 15:11:28 +0000 Subject: [PATCH 8/8] [TEST] Fix side effects of elastic/x-pack-elasticsearch#2975 on build servers with very little RAM Some of our REST tests open many jobs, and assuming each will use 1GB of RAM on a single node cluster could fail the test. The solution is to explicitly say the test jobs will use very little RAM. Original commit: elastic/x-pack-elasticsearch@a3fcfc4589d2ab7cfee233d1c0420af6883f20e4 --- .../test/ml/get_datafeed_stats.yml | 6 +++++ .../rest-api-spec/test/ml/index_layout.yml | 18 +++++++++++++ .../rest-api-spec/test/ml/job_groups.yml | 15 +++++++++++ .../rest-api-spec/test/ml/jobs_crud.yml | 27 +++++++++++++++++++ .../rest-api-spec/test/ml/jobs_get_stats.yml | 9 +++++++ .../test/ml/start_stop_datafeed.yml | 15 +++++++++++ 6 files changed, 90 insertions(+) diff --git a/plugin/src/test/resources/rest-api-spec/test/ml/get_datafeed_stats.yml b/plugin/src/test/resources/rest-api-spec/test/ml/get_datafeed_stats.yml index d7c20e5ec08..7f3250c7db6 100644 --- a/plugin/src/test/resources/rest-api-spec/test/ml/get_datafeed_stats.yml +++ b/plugin/src/test/resources/rest-api-spec/test/ml/get_datafeed_stats.yml @@ -41,6 +41,9 @@ setup: "bucket_span": "1h", "detectors" :[{"function":"count"}] }, + "analysis_limits" : { + "model_memory_limit": "10mb" + }, "data_description" : { "format":"xcontent", "time_field":"time", @@ -60,6 +63,9 @@ setup: "bucket_span": "1h", "detectors" :[{"function":"count"}] }, + "analysis_limits" : { + "model_memory_limit": "10mb" + }, "data_description" : { "time_field":"time" } diff --git a/plugin/src/test/resources/rest-api-spec/test/ml/index_layout.yml b/plugin/src/test/resources/rest-api-spec/test/ml/index_layout.yml index ed874efd4a5..c13ae86e06f 100644 --- a/plugin/src/test/resources/rest-api-spec/test/ml/index_layout.yml +++ b/plugin/src/test/resources/rest-api-spec/test/ml/index_layout.yml @@ -12,6 +12,9 @@ "bucket_span": "1h", "detectors" :[{"function":"metric","field_name":"responsetime","by_field_name":"airline"}] }, + "analysis_limits" : { + "model_memory_limit": "20mb" + }, "data_description" : { "format":"xcontent", "time_field":"time", @@ -32,6 +35,9 @@ "bucket_span": "1h", "detectors" :[{"function":"metric","field_name":"responsetime","by_field_name":"airline"}] }, + "analysis_limits" : { + "model_memory_limit": "20mb" + }, "data_description" : { "format":"xcontent", "time_field":"time", @@ -339,6 +345,9 @@ "bucket_span": "1h", "detectors" :[{"function":"metric","field_name":"responsetime","by_field_name":"airline"}] }, + "analysis_limits" : { + "model_memory_limit": "20mb" + }, "data_description" : { "format":"xcontent", "time_field":"time", @@ -486,6 +495,9 @@ "bucket_span": "1h", "detectors" :[{"function":"metric","field_name":"responsetime","by_field_name":"airline"}] }, + "analysis_limits" : { + "model_memory_limit": "20mb" + }, "data_description" : { "format":"xcontent" } @@ -532,6 +544,9 @@ "detectors" :[{"function":"metric","field_name":"responsetime","by_field_name":"mlcategory"}], "categorization_field_name": "message" }, + "analysis_limits" : { + "model_memory_limit": "20mb" + }, "data_description" : { "format":"xcontent" } @@ -637,6 +652,9 @@ "bucket_span": "1h", "detectors" :[{"function":"metric","field_name":"responsetime","by_field_name":"airline"}] }, + "analysis_limits" : { + "model_memory_limit": "20mb" + }, "data_description" : { "time_field":"time", "time_format":"epoch" diff --git a/plugin/src/test/resources/rest-api-spec/test/ml/job_groups.yml b/plugin/src/test/resources/rest-api-spec/test/ml/job_groups.yml index 217eee86e22..d1e2851e176 100644 --- a/plugin/src/test/resources/rest-api-spec/test/ml/job_groups.yml +++ b/plugin/src/test/resources/rest-api-spec/test/ml/job_groups.yml @@ -10,6 +10,9 @@ setup: "analysis_config" : { "detectors" :[{"function":"count"}] }, + "analysis_limits" : { + "model_memory_limit": "10mb" + }, "data_description" : {} } @@ -24,6 +27,9 @@ setup: "analysis_config" : { "detectors" :[{"function":"count"}] }, + "analysis_limits" : { + "model_memory_limit": "10mb" + }, "data_description" : {} } @@ -38,6 +44,9 @@ setup: "analysis_config" : { "detectors" :[{"function":"count"}] }, + "analysis_limits" : { + "model_memory_limit": "10mb" + }, "data_description" : {} } @@ -52,6 +61,9 @@ setup: "analysis_config" : { "detectors" :[{"function":"count"}] }, + "analysis_limits" : { + "model_memory_limit": "10mb" + }, "data_description" : {} } @@ -65,6 +77,9 @@ setup: "analysis_config" : { "detectors" :[{"function":"count"}] }, + "analysis_limits" : { + "model_memory_limit": "10mb" + }, "data_description" : {} } diff --git a/plugin/src/test/resources/rest-api-spec/test/ml/jobs_crud.yml b/plugin/src/test/resources/rest-api-spec/test/ml/jobs_crud.yml index d142ac86f6e..6d36599a294 100644 --- a/plugin/src/test/resources/rest-api-spec/test/ml/jobs_crud.yml +++ b/plugin/src/test/resources/rest-api-spec/test/ml/jobs_crud.yml @@ -404,6 +404,9 @@ "bucket_span": "1h", "detectors" :[{"function":"count"}] }, + "analysis_limits" : { + "model_memory_limit": "20mb" + }, "data_description" : { "time_field":"time" } @@ -492,6 +495,9 @@ "analysis_config" : { "detectors" :[{"function":"count"}] }, + "analysis_limits" : { + "model_memory_limit": "20mb" + }, "data_description" : { "format":"xcontent" } @@ -523,6 +529,9 @@ "analysis_config" : { "detectors" :[{"function":"count"}] }, + "analysis_limits" : { + "model_memory_limit": "20mb" + }, "data_description" : { "format":"xcontent" } @@ -538,6 +547,9 @@ "analysis_config" : { "detectors" :[{"function":"count"}] }, + "analysis_limits" : { + "model_memory_limit": "20mb" + }, "data_description" : { "format":"xcontent" } @@ -569,6 +581,9 @@ "analysis_config" : { "detectors" :[{"function":"count"}] }, + "analysis_limits" : { + "model_memory_limit": "20mb" + }, "data_description" : {} } @@ -580,6 +595,9 @@ "analysis_config" : { "detectors" :[{"function":"count"}] }, + "analysis_limits" : { + "model_memory_limit": "20mb" + }, "data_description" : {} } @@ -591,6 +609,9 @@ "analysis_config" : { "detectors" :[{"function":"count"}] }, + "analysis_limits" : { + "model_memory_limit": "20mb" + }, "data_description" : {} } @@ -654,6 +675,9 @@ "bucket_span":"1h", "detectors" :[{"function":"metric","field_name":"responsetime","by_field_name":"airline"}] }, + "analysis_limits" : { + "model_memory_limit": "20mb" + }, "data_description" : { "format":"xcontent", "time_field":"time", @@ -713,6 +737,9 @@ "analysis_config" : { "detectors" :[{"function":"count"}] }, + "analysis_limits" : { + "model_memory_limit": "20mb" + }, "data_description" : { "format":"xcontent" } diff --git a/plugin/src/test/resources/rest-api-spec/test/ml/jobs_get_stats.yml b/plugin/src/test/resources/rest-api-spec/test/ml/jobs_get_stats.yml index 4cd5364a163..61bcf63e398 100644 --- a/plugin/src/test/resources/rest-api-spec/test/ml/jobs_get_stats.yml +++ b/plugin/src/test/resources/rest-api-spec/test/ml/jobs_get_stats.yml @@ -12,6 +12,9 @@ setup: "bucket_span": "1h", "detectors" :[{"function":"metric","field_name":"responsetime","by_field_name":"airline"}] }, + "analysis_limits" : { + "model_memory_limit": "10mb" + }, "data_description" : { "format":"xcontent", "time_field":"time", @@ -38,6 +41,9 @@ setup: "bucket_span": "1h", "detectors" :[{"function":"metric","field_name":"responsetime","by_field_name":"airline"}] }, + "analysis_limits" : { + "model_memory_limit": "10mb" + }, "data_description" : { "format" : "xcontent", "time_field":"time", @@ -201,6 +207,9 @@ setup: "bucket_span": "1h", "detectors" :[{"function":"metric","field_name":"responsetime","by_field_name":"airline"}] }, + "analysis_limits" : { + "model_memory_limit": "10mb" + }, "data_description" : { } } diff --git a/plugin/src/test/resources/rest-api-spec/test/ml/start_stop_datafeed.yml b/plugin/src/test/resources/rest-api-spec/test/ml/start_stop_datafeed.yml index 2802d8a1c5d..55938068cfc 100644 --- a/plugin/src/test/resources/rest-api-spec/test/ml/start_stop_datafeed.yml +++ b/plugin/src/test/resources/rest-api-spec/test/ml/start_stop_datafeed.yml @@ -29,6 +29,9 @@ setup: "detectors" :[{"function":"metric","field_name":"responsetime","by_field_name":"airline"}], "influencers": ["airport"] }, + "analysis_limits" : { + "model_memory_limit": "30mb" + }, "data_description" : { "format":"xcontent", "time_field":"time", @@ -203,6 +206,9 @@ setup: "bucket_span": "1h", "detectors" :[{"function":"count","by_field_name":"airline2"}] }, + "analysis_limits" : { + "model_memory_limit": "30mb" + }, "data_description" : { "time_field":"time" } @@ -274,6 +280,9 @@ setup: "bucket_span": "1h", "detectors" :[{"function":"count"}] }, + "analysis_limits" : { + "model_memory_limit": "30mb" + }, "data_description" : { "time_field":"time" } @@ -288,6 +297,9 @@ setup: "bucket_span": "1h", "detectors" :[{"function":"count"}] }, + "analysis_limits" : { + "model_memory_limit": "30mb" + }, "data_description" : { "time_field":"time" } @@ -302,6 +314,9 @@ setup: "bucket_span": "1h", "detectors" :[{"function":"count"}] }, + "analysis_limits" : { + "model_memory_limit": "30mb" + }, "data_description" : { "time_field":"time" }