NOTE(review): this patch was recovered from a copy whose line breaks had been collapsed.
Record boundaries follow each hunk's declared line counts; indentation depth and the position
of blank context lines are reconstructed and must be confirmed against the target files.
Three context lines (getOnlyElement, getAnalytics, and the 'progress' local) look like their
generic type arguments were lost in extraction; confirm them before applying.

diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/messages/Messages.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/messages/Messages.java
index a382f269bd1..6a7965a01b2 100644
--- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/messages/Messages.java
+++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/messages/Messages.java
@@ -66,6 +66,9 @@ public final class Messages {
     public static final String DATA_FRAME_ANALYTICS_AUDIT_UPDATED_STATE_WITH_REASON =
         "Updated analytics task state to [{0}] with reason [{1}]";
     public static final String DATA_FRAME_ANALYTICS_AUDIT_ESTIMATED_MEMORY_USAGE = "Estimated memory usage for this analytics to be [{0}]";
+    public static final String DATA_FRAME_ANALYTICS_AUDIT_ESTIMATED_MEMORY_USAGE_HIGHER_THAN_CONFIGURED =
+        "Configured model memory limit [{0}] is lower than the expected memory usage [{1}]. " +
+            "The analytics job may fail due to configured memory constraints.";
     public static final String DATA_FRAME_ANALYTICS_AUDIT_CREATING_DEST_INDEX = "Creating destination index [{0}]";
     public static final String DATA_FRAME_ANALYTICS_AUDIT_REUSING_DEST_INDEX = "Using existing destination index [{0}]";
     public static final String DATA_FRAME_ANALYTICS_AUDIT_STARTED_REINDEXING = "Started reindexing to destination index [{0}]";
diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/ClassificationIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/ClassificationIT.java
index 014cd06f646..e9f831d9277 100644
--- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/ClassificationIT.java
+++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/ClassificationIT.java
@@ -19,6 +19,8 @@ import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.action.support.WriteRequest;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.unit.ByteSizeUnit;
+import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.xcontent.XContentType;
 import org.elasticsearch.common.xcontent.NamedXContentRegistry;
@@ -778,6 +780,22 @@ public class ClassificationIT extends MlNativeDataFrameAnalyticsIntegTestCase {
         assertThat(getOnlyElement(getAnalytics(jobId)).getDescription(), is(equalTo("updated-description-2")));
     }
 
+    public void testTooLowConfiguredMemoryStillStarts() throws Exception {
+        initialize("low_memory_analysis");
+        indexData(sourceIndex, 10_000, 0, NESTED_FIELD);
+
+        DataFrameAnalyticsConfig config = new DataFrameAnalyticsConfig.Builder(
+            buildAnalytics(jobId, sourceIndex, destIndex, null, new Classification(NESTED_FIELD)))
+            .setModelMemoryLimit(new ByteSizeValue(1, ByteSizeUnit.KB))
+            .build();
+        putAnalytics(config);
+        // Shouldn't throw
+        startAnalytics(jobId);
+        waitUntilAnalyticsIsFailed(jobId);
+        forceStopAnalytics(jobId);
+        waitUntilAnalyticsIsStopped(jobId);
+    }
+
     private static T getOnlyElement(List list) {
         assertThat(list, hasSize(1));
         return list.get(0);
diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/MlNativeDataFrameAnalyticsIntegTestCase.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/MlNativeDataFrameAnalyticsIntegTestCase.java
index 8d53b4cd6b0..8fdaead6fdf 100644
--- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/MlNativeDataFrameAnalyticsIntegTestCase.java
+++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/MlNativeDataFrameAnalyticsIntegTestCase.java
@@ -157,6 +157,10 @@ abstract class MlNativeDataFrameAnalyticsIntegTestCase extends MlNativeIntegTest
         assertBusy(() -> assertIsStopped(id), waitTime.getMillis(), TimeUnit.MILLISECONDS);
     }
 
+    protected void waitUntilAnalyticsIsFailed(String id) throws Exception {
+        assertBusy(() -> assertIsFailed(id), TimeValue.timeValueSeconds(30).millis(), TimeUnit.MILLISECONDS);
+    }
+
     protected List getAnalytics(String id) {
         GetDataFrameAnalyticsAction.Request request = new GetDataFrameAnalyticsAction.Request(id);
         return client().execute(GetDataFrameAnalyticsAction.INSTANCE, request).actionGet().getResources().results();
@@ -207,6 +211,11 @@ abstract class MlNativeDataFrameAnalyticsIntegTestCase extends MlNativeIntegTest
         assertThat("Stats were: " + Strings.toString(stats), stats.getState(), equalTo(DataFrameAnalyticsState.STOPPED));
     }
 
+    protected void assertIsFailed(String id) {
+        GetDataFrameAnalyticsStatsAction.Response.Stats stats = getAnalyticsStats(id);
+        assertThat("Stats were: " + Strings.toString(stats), stats.getState(), equalTo(DataFrameAnalyticsState.FAILED));
+    }
+
     protected void assertProgressIsZero(String id) {
         List progress = getProgress(id);
         assertThat("progress is not all zero: " + progress,
diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/RunDataFrameAnalyticsIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/RunDataFrameAnalyticsIT.java
index 3bd9730b3a5..64a36c85161 100644
--- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/RunDataFrameAnalyticsIT.java
+++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/RunDataFrameAnalyticsIT.java
@@ -5,7 +5,6 @@
  */
 package org.elasticsearch.xpack.ml.integration;
 
-import org.elasticsearch.ElasticsearchStatusException;
 import org.elasticsearch.action.admin.indices.settings.get.GetSettingsRequest;
 import org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse;
 import org.elasticsearch.action.bulk.BulkRequestBuilder;
@@ -20,7 +19,6 @@ import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.xcontent.XContentType;
 import org.elasticsearch.index.IndexSettings;
 import org.elasticsearch.index.query.QueryBuilders;
-import org.elasticsearch.rest.RestStatus;
 import org.elasticsearch.search.SearchHit;
 import org.elasticsearch.xpack.core.ml.action.GetDataFrameAnalyticsStatsAction;
 import org.elasticsearch.xpack.core.ml.action.NodeAcknowledgedResponse;
@@ -46,7 +44,6 @@ import static org.hamcrest.Matchers.lessThan;
 import static org.hamcrest.Matchers.lessThanOrEqualTo;
 import static org.hamcrest.Matchers.not;
 import static org.hamcrest.Matchers.nullValue;
-import static org.hamcrest.Matchers.startsWith;
 
 public class RunDataFrameAnalyticsIT extends MlNativeDataFrameAnalyticsIntegTestCase {
 
@@ -520,17 +517,12 @@ public class RunDataFrameAnalyticsIT extends MlNativeDataFrameAnalyticsIntegTest
         putAnalytics(config);
 
         assertIsStopped(id);
-
-        ElasticsearchStatusException exception = expectThrows(ElasticsearchStatusException.class, () -> startAnalytics(id));
-        assertThat(exception.status(), equalTo(RestStatus.BAD_REQUEST));
-        assertThat(
-            exception.getMessage(),
-            startsWith("Cannot start because the configured model memory limit [" + modelMemoryLimit +
-                "] is lower than the expected memory usage"));
-
-        assertThatAuditMessagesMatch(id,
-            "Created analytics with analysis type [outlier_detection]",
-            "Estimated memory usage for this analytics to be");
+        //should not throw
+        startAnalytics(id);
+        waitUntilAnalyticsIsFailed(id);
+        // Might have been marked as failed
+        forceStopAnalytics(id);
+        waitUntilAnalyticsIsStopped(id);
     }
 
     public void testLazyAssignmentWithModelMemoryLimitTooHighForAssignment() throws Exception {
diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDataFrameAnalyticsAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDataFrameAnalyticsAction.java
index 19ad386d09f..bf376c062f8 100644
--- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDataFrameAnalyticsAction.java
+++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDataFrameAnalyticsAction.java
@@ -214,14 +214,15 @@ public class TransportStartDataFrameAnalyticsAction
                 auditor.info(jobId,
                     Messages.getMessage(Messages.DATA_FRAME_ANALYTICS_AUDIT_ESTIMATED_MEMORY_USAGE, expectedMemoryWithoutDisk));
                 // Validate that model memory limit is sufficient to run the analysis
+                // We will only warn the caller if the configured limit is too low.
                 if (startContext.config.getModelMemoryLimit()
                     .compareTo(expectedMemoryWithoutDisk) < 0) {
-                    ElasticsearchStatusException e =
-                        ExceptionsHelper.badRequestException(
-                            "Cannot start because the configured model memory limit [{}] is lower than the expected memory usage [{}]",
-                            startContext.config.getModelMemoryLimit(), expectedMemoryWithoutDisk);
-                    listener.onFailure(e);
-                    return;
+                    String warning = Messages.getMessage(
+                        Messages.DATA_FRAME_ANALYTICS_AUDIT_ESTIMATED_MEMORY_USAGE_HIGHER_THAN_CONFIGURED,
+                        startContext.config.getModelMemoryLimit(),
+                        expectedMemoryWithoutDisk);
+                    auditor.warning(jobId, warning);
+                    logger.warn("[{}] {}", jobId, warning);
                 }
                 // Refresh memory requirement for jobs
                 memoryTracker.addDataFrameAnalyticsJobMemoryAndRefreshAllOthers(