From a6e7a3d65ff4d6e212161089f16c3f61d1f3ffcc Mon Sep 17 00:00:00 2001 From: Benjamin Trent Date: Wed, 26 Aug 2020 10:35:38 -0400 Subject: [PATCH] [7.x] [ML] write warning if configured memory limit is too low for analytics job (#61505) (#61528) Backports the following commits to 7.x: [ML] write warning if configured memory limit is too low for analytics job (#61505) Having `_start` fail when the configured memory limit is too low can be frustrating. We should instead warn the user that their job might not run properly if their configured limit is too low. It might be that our estimate is too high, and their configured limit works just fine. --- .../xpack/core/ml/job/messages/Messages.java | 3 +++ .../ml/integration/ClassificationIT.java | 18 +++++++++++++++++ ...NativeDataFrameAnalyticsIntegTestCase.java | 9 +++++++++ .../integration/RunDataFrameAnalyticsIT.java | 20 ++++++------------- ...ransportStartDataFrameAnalyticsAction.java | 13 ++++++------ 5 files changed, 43 insertions(+), 20 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/messages/Messages.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/messages/Messages.java index a382f269bd1..6a7965a01b2 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/messages/Messages.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/messages/Messages.java @@ -66,6 +66,9 @@ public final class Messages { public static final String DATA_FRAME_ANALYTICS_AUDIT_UPDATED_STATE_WITH_REASON = "Updated analytics task state to [{0}] with reason [{1}]"; public static final String DATA_FRAME_ANALYTICS_AUDIT_ESTIMATED_MEMORY_USAGE = "Estimated memory usage for this analytics to be [{0}]"; + public static final String DATA_FRAME_ANALYTICS_AUDIT_ESTIMATED_MEMORY_USAGE_HIGHER_THAN_CONFIGURED = + "Configured model memory limit [{0}] is lower than the expected memory usage [{1}]. " + + "The analytics job may fail due to configured memory constraints."; public static final String DATA_FRAME_ANALYTICS_AUDIT_CREATING_DEST_INDEX = "Creating destination index [{0}]"; public static final String DATA_FRAME_ANALYTICS_AUDIT_REUSING_DEST_INDEX = "Using existing destination index [{0}]"; public static final String DATA_FRAME_ANALYTICS_AUDIT_STARTED_REINDEXING = "Started reindexing to destination index [{0}]"; diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/ClassificationIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/ClassificationIT.java index 014cd06f646..e9f831d9277 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/ClassificationIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/ClassificationIT.java @@ -19,6 +19,8 @@ import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.ByteSizeUnit; +import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.common.xcontent.NamedXContentRegistry; @@ -778,6 +780,22 @@ public class ClassificationIT extends MlNativeDataFrameAnalyticsIntegTestCase { assertThat(getOnlyElement(getAnalytics(jobId)).getDescription(), is(equalTo("updated-description-2"))); } + public void testTooLowConfiguredMemoryStillStarts() throws Exception { + initialize("low_memory_analysis"); + indexData(sourceIndex, 10_000, 0, NESTED_FIELD); + + DataFrameAnalyticsConfig config = new DataFrameAnalyticsConfig.Builder( + buildAnalytics(jobId, sourceIndex, destIndex, null, new Classification(NESTED_FIELD))) + .setModelMemoryLimit(new ByteSizeValue(1, ByteSizeUnit.KB)) + .build(); + putAnalytics(config); + // Shouldn't throw + startAnalytics(jobId); + waitUntilAnalyticsIsFailed(jobId); + forceStopAnalytics(jobId); + waitUntilAnalyticsIsStopped(jobId); + } + private static T getOnlyElement(List list) { assertThat(list, hasSize(1)); return list.get(0); diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/MlNativeDataFrameAnalyticsIntegTestCase.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/MlNativeDataFrameAnalyticsIntegTestCase.java index 8d53b4cd6b0..8fdaead6fdf 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/MlNativeDataFrameAnalyticsIntegTestCase.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/MlNativeDataFrameAnalyticsIntegTestCase.java @@ -157,6 +157,10 @@ abstract class MlNativeDataFrameAnalyticsIntegTestCase extends MlNativeIntegTest assertBusy(() -> assertIsStopped(id), waitTime.getMillis(), TimeUnit.MILLISECONDS); } + protected void waitUntilAnalyticsIsFailed(String id) throws Exception { + assertBusy(() -> assertIsFailed(id), TimeValue.timeValueSeconds(30).millis(), TimeUnit.MILLISECONDS); + } + protected List getAnalytics(String id) { GetDataFrameAnalyticsAction.Request request = new GetDataFrameAnalyticsAction.Request(id); return client().execute(GetDataFrameAnalyticsAction.INSTANCE, request).actionGet().getResources().results(); @@ -207,6 +211,11 @@ abstract class MlNativeDataFrameAnalyticsIntegTestCase extends MlNativeIntegTest assertThat("Stats were: " + Strings.toString(stats), stats.getState(), equalTo(DataFrameAnalyticsState.STOPPED)); } + protected void assertIsFailed(String id) { + GetDataFrameAnalyticsStatsAction.Response.Stats stats = getAnalyticsStats(id); + assertThat("Stats were: " + Strings.toString(stats), stats.getState(), equalTo(DataFrameAnalyticsState.FAILED)); + } + protected void assertProgressIsZero(String id) { List progress = getProgress(id); assertThat("progress is not all zero: " + progress, diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/RunDataFrameAnalyticsIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/RunDataFrameAnalyticsIT.java index 3bd9730b3a5..64a36c85161 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/RunDataFrameAnalyticsIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/RunDataFrameAnalyticsIT.java @@ -5,7 +5,6 @@ */ package org.elasticsearch.xpack.ml.integration; -import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.action.admin.indices.settings.get.GetSettingsRequest; import org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse; import org.elasticsearch.action.bulk.BulkRequestBuilder; @@ -20,7 +19,6 @@ import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.query.QueryBuilders; -import org.elasticsearch.rest.RestStatus; import org.elasticsearch.search.SearchHit; import org.elasticsearch.xpack.core.ml.action.GetDataFrameAnalyticsStatsAction; import org.elasticsearch.xpack.core.ml.action.NodeAcknowledgedResponse; @@ -46,7 +44,6 @@ import static org.hamcrest.Matchers.lessThan; import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.nullValue; -import static org.hamcrest.Matchers.startsWith; public class RunDataFrameAnalyticsIT extends MlNativeDataFrameAnalyticsIntegTestCase { @@ -520,17 +517,12 @@ public class RunDataFrameAnalyticsIT extends MlNativeDataFrameAnalyticsIntegTest putAnalytics(config); assertIsStopped(id); - - ElasticsearchStatusException exception = expectThrows(ElasticsearchStatusException.class, () -> startAnalytics(id)); - assertThat(exception.status(), equalTo(RestStatus.BAD_REQUEST)); - assertThat( - exception.getMessage(), - startsWith("Cannot start because the configured model memory limit [" + modelMemoryLimit + - "] is lower than the expected memory usage")); - - assertThatAuditMessagesMatch(id, - "Created analytics with analysis type [outlier_detection]", - "Estimated memory usage for this analytics to be"); + //should not throw + startAnalytics(id); + waitUntilAnalyticsIsFailed(id); + // Might have been marked as failed + forceStopAnalytics(id); + waitUntilAnalyticsIsStopped(id); } public void testLazyAssignmentWithModelMemoryLimitTooHighForAssignment() throws Exception { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDataFrameAnalyticsAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDataFrameAnalyticsAction.java index 19ad386d09f..bf376c062f8 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDataFrameAnalyticsAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDataFrameAnalyticsAction.java @@ -214,14 +214,15 @@ public class TransportStartDataFrameAnalyticsAction auditor.info(jobId, Messages.getMessage(Messages.DATA_FRAME_ANALYTICS_AUDIT_ESTIMATED_MEMORY_USAGE, expectedMemoryWithoutDisk)); // Validate that model memory limit is sufficient to run the analysis + // We will only warn the caller if the configured limit is too low. if (startContext.config.getModelMemoryLimit() .compareTo(expectedMemoryWithoutDisk) < 0) { - ElasticsearchStatusException e = - ExceptionsHelper.badRequestException( - "Cannot start because the configured model memory limit [{}] is lower than the expected memory usage [{}]", - startContext.config.getModelMemoryLimit(), expectedMemoryWithoutDisk); - listener.onFailure(e); - return; + String warning = Messages.getMessage( + Messages.DATA_FRAME_ANALYTICS_AUDIT_ESTIMATED_MEMORY_USAGE_HIGHER_THAN_CONFIGURED, + startContext.config.getModelMemoryLimit(), + expectedMemoryWithoutDisk); + auditor.warning(jobId, warning); + logger.warn("[{}] {}", jobId, warning); } // Refresh memory requirement for jobs memoryTracker.addDataFrameAnalyticsJobMemoryAndRefreshAllOthers(