Backports the following commits to 7.x: [ML] write warning if configured memory limit is too low for analytics job (#61505) Having `_start` fail when the configured memory limit is too low can be frustrating. We should instead warn the user that their job might not run properly if their configured limit is too low. It might be that our estimate is too high, and their configured limit works just fine.
This commit is contained in:
parent
9f566644af
commit
a6e7a3d65f
|
@ -66,6 +66,9 @@ public final class Messages {
|
|||
public static final String DATA_FRAME_ANALYTICS_AUDIT_UPDATED_STATE_WITH_REASON =
|
||||
"Updated analytics task state to [{0}] with reason [{1}]";
|
||||
public static final String DATA_FRAME_ANALYTICS_AUDIT_ESTIMATED_MEMORY_USAGE = "Estimated memory usage for this analytics to be [{0}]";
|
||||
public static final String DATA_FRAME_ANALYTICS_AUDIT_ESTIMATED_MEMORY_USAGE_HIGHER_THAN_CONFIGURED =
|
||||
"Configured model memory limit [{0}] is lower than the expected memory usage [{1}]. " +
|
||||
"The analytics job may fail due to configured memory constraints.";
|
||||
public static final String DATA_FRAME_ANALYTICS_AUDIT_CREATING_DEST_INDEX = "Creating destination index [{0}]";
|
||||
public static final String DATA_FRAME_ANALYTICS_AUDIT_REUSING_DEST_INDEX = "Using existing destination index [{0}]";
|
||||
public static final String DATA_FRAME_ANALYTICS_AUDIT_STARTED_REINDEXING = "Started reindexing to destination index [{0}]";
|
||||
|
|
|
@ -19,6 +19,8 @@ import org.elasticsearch.action.index.IndexRequest;
|
|||
import org.elasticsearch.action.search.SearchResponse;
|
||||
import org.elasticsearch.action.support.WriteRequest;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.ByteSizeUnit;
|
||||
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.xcontent.XContentType;
|
||||
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
|
||||
|
@ -778,6 +780,22 @@ public class ClassificationIT extends MlNativeDataFrameAnalyticsIntegTestCase {
|
|||
assertThat(getOnlyElement(getAnalytics(jobId)).getDescription(), is(equalTo("updated-description-2")));
|
||||
}
|
||||
|
||||
public void testTooLowConfiguredMemoryStillStarts() throws Exception {
|
||||
initialize("low_memory_analysis");
|
||||
indexData(sourceIndex, 10_000, 0, NESTED_FIELD);
|
||||
|
||||
DataFrameAnalyticsConfig config = new DataFrameAnalyticsConfig.Builder(
|
||||
buildAnalytics(jobId, sourceIndex, destIndex, null, new Classification(NESTED_FIELD)))
|
||||
.setModelMemoryLimit(new ByteSizeValue(1, ByteSizeUnit.KB))
|
||||
.build();
|
||||
putAnalytics(config);
|
||||
// Shouldn't throw
|
||||
startAnalytics(jobId);
|
||||
waitUntilAnalyticsIsFailed(jobId);
|
||||
forceStopAnalytics(jobId);
|
||||
waitUntilAnalyticsIsStopped(jobId);
|
||||
}
|
||||
|
||||
private static <T> T getOnlyElement(List<T> list) {
|
||||
assertThat(list, hasSize(1));
|
||||
return list.get(0);
|
||||
|
|
|
@ -157,6 +157,10 @@ abstract class MlNativeDataFrameAnalyticsIntegTestCase extends MlNativeIntegTest
|
|||
assertBusy(() -> assertIsStopped(id), waitTime.getMillis(), TimeUnit.MILLISECONDS);
|
||||
}
|
||||
|
||||
protected void waitUntilAnalyticsIsFailed(String id) throws Exception {
|
||||
assertBusy(() -> assertIsFailed(id), TimeValue.timeValueSeconds(30).millis(), TimeUnit.MILLISECONDS);
|
||||
}
|
||||
|
||||
protected List<DataFrameAnalyticsConfig> getAnalytics(String id) {
|
||||
GetDataFrameAnalyticsAction.Request request = new GetDataFrameAnalyticsAction.Request(id);
|
||||
return client().execute(GetDataFrameAnalyticsAction.INSTANCE, request).actionGet().getResources().results();
|
||||
|
@ -207,6 +211,11 @@ abstract class MlNativeDataFrameAnalyticsIntegTestCase extends MlNativeIntegTest
|
|||
assertThat("Stats were: " + Strings.toString(stats), stats.getState(), equalTo(DataFrameAnalyticsState.STOPPED));
|
||||
}
|
||||
|
||||
protected void assertIsFailed(String id) {
|
||||
GetDataFrameAnalyticsStatsAction.Response.Stats stats = getAnalyticsStats(id);
|
||||
assertThat("Stats were: " + Strings.toString(stats), stats.getState(), equalTo(DataFrameAnalyticsState.FAILED));
|
||||
}
|
||||
|
||||
protected void assertProgressIsZero(String id) {
|
||||
List<PhaseProgress> progress = getProgress(id);
|
||||
assertThat("progress is not all zero: " + progress,
|
||||
|
|
|
@ -5,7 +5,6 @@
|
|||
*/
|
||||
package org.elasticsearch.xpack.ml.integration;
|
||||
|
||||
import org.elasticsearch.ElasticsearchStatusException;
|
||||
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsRequest;
|
||||
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse;
|
||||
import org.elasticsearch.action.bulk.BulkRequestBuilder;
|
||||
|
@ -20,7 +19,6 @@ import org.elasticsearch.common.unit.ByteSizeValue;
|
|||
import org.elasticsearch.common.xcontent.XContentType;
|
||||
import org.elasticsearch.index.IndexSettings;
|
||||
import org.elasticsearch.index.query.QueryBuilders;
|
||||
import org.elasticsearch.rest.RestStatus;
|
||||
import org.elasticsearch.search.SearchHit;
|
||||
import org.elasticsearch.xpack.core.ml.action.GetDataFrameAnalyticsStatsAction;
|
||||
import org.elasticsearch.xpack.core.ml.action.NodeAcknowledgedResponse;
|
||||
|
@ -46,7 +44,6 @@ import static org.hamcrest.Matchers.lessThan;
|
|||
import static org.hamcrest.Matchers.lessThanOrEqualTo;
|
||||
import static org.hamcrest.Matchers.not;
|
||||
import static org.hamcrest.Matchers.nullValue;
|
||||
import static org.hamcrest.Matchers.startsWith;
|
||||
|
||||
public class RunDataFrameAnalyticsIT extends MlNativeDataFrameAnalyticsIntegTestCase {
|
||||
|
||||
|
@ -520,17 +517,12 @@ public class RunDataFrameAnalyticsIT extends MlNativeDataFrameAnalyticsIntegTest
|
|||
|
||||
putAnalytics(config);
|
||||
assertIsStopped(id);
|
||||
|
||||
ElasticsearchStatusException exception = expectThrows(ElasticsearchStatusException.class, () -> startAnalytics(id));
|
||||
assertThat(exception.status(), equalTo(RestStatus.BAD_REQUEST));
|
||||
assertThat(
|
||||
exception.getMessage(),
|
||||
startsWith("Cannot start because the configured model memory limit [" + modelMemoryLimit +
|
||||
"] is lower than the expected memory usage"));
|
||||
|
||||
assertThatAuditMessagesMatch(id,
|
||||
"Created analytics with analysis type [outlier_detection]",
|
||||
"Estimated memory usage for this analytics to be");
|
||||
//should not throw
|
||||
startAnalytics(id);
|
||||
waitUntilAnalyticsIsFailed(id);
|
||||
// Might have been marked as failed
|
||||
forceStopAnalytics(id);
|
||||
waitUntilAnalyticsIsStopped(id);
|
||||
}
|
||||
|
||||
public void testLazyAssignmentWithModelMemoryLimitTooHighForAssignment() throws Exception {
|
||||
|
|
|
@ -214,14 +214,15 @@ public class TransportStartDataFrameAnalyticsAction
|
|||
auditor.info(jobId,
|
||||
Messages.getMessage(Messages.DATA_FRAME_ANALYTICS_AUDIT_ESTIMATED_MEMORY_USAGE, expectedMemoryWithoutDisk));
|
||||
// Validate that model memory limit is sufficient to run the analysis
|
||||
// We will only warn the caller if the configured limit is too low.
|
||||
if (startContext.config.getModelMemoryLimit()
|
||||
.compareTo(expectedMemoryWithoutDisk) < 0) {
|
||||
ElasticsearchStatusException e =
|
||||
ExceptionsHelper.badRequestException(
|
||||
"Cannot start because the configured model memory limit [{}] is lower than the expected memory usage [{}]",
|
||||
startContext.config.getModelMemoryLimit(), expectedMemoryWithoutDisk);
|
||||
listener.onFailure(e);
|
||||
return;
|
||||
String warning = Messages.getMessage(
|
||||
Messages.DATA_FRAME_ANALYTICS_AUDIT_ESTIMATED_MEMORY_USAGE_HIGHER_THAN_CONFIGURED,
|
||||
startContext.config.getModelMemoryLimit(),
|
||||
expectedMemoryWithoutDisk);
|
||||
auditor.warning(jobId, warning);
|
||||
logger.warn("[{}] {}", jobId, warning);
|
||||
}
|
||||
// Refresh memory requirement for jobs
|
||||
memoryTracker.addDataFrameAnalyticsJobMemoryAndRefreshAllOthers(
|
||||
|
|
Loading…
Reference in New Issue