diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/MLRequestConverters.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/MLRequestConverters.java index e5a98b46324..651851e345d 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/MLRequestConverters.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/MLRequestConverters.java @@ -676,6 +676,9 @@ final class MLRequestConverters { params.putParam( StopDataFrameAnalyticsRequest.ALLOW_NO_MATCH.getPreferredName(), Boolean.toString(stopRequest.getAllowNoMatch())); } + if (stopRequest.getForce() != null) { + params.putParam(StopDataFrameAnalyticsRequest.FORCE.getPreferredName(), Boolean.toString(stopRequest.getForce())); + } request.addParameters(params.asMap()); return request; } diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/StopDataFrameAnalyticsRequest.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/StopDataFrameAnalyticsRequest.java index 9608d40fc7d..4ba6af852f6 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/StopDataFrameAnalyticsRequest.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/StopDataFrameAnalyticsRequest.java @@ -31,10 +31,12 @@ import java.util.Optional; public class StopDataFrameAnalyticsRequest implements Validatable { public static final ParseField ALLOW_NO_MATCH = new ParseField("allow_no_match"); + public static final ParseField FORCE = new ParseField("force"); private final String id; - private TimeValue timeout; private Boolean allowNoMatch; + private Boolean force; + private TimeValue timeout; public StopDataFrameAnalyticsRequest(String id) { this.id = id; @@ -62,6 +64,15 @@ public class StopDataFrameAnalyticsRequest implements Validatable { return this; } + public Boolean getForce() { + return force; + } + + public StopDataFrameAnalyticsRequest setForce(boolean force) { + this.force = force; + return 
this; + } + @Override public Optional validate() { if (id == null) { @@ -78,11 +89,12 @@ public class StopDataFrameAnalyticsRequest implements Validatable { StopDataFrameAnalyticsRequest other = (StopDataFrameAnalyticsRequest) o; return Objects.equals(id, other.id) && Objects.equals(timeout, other.timeout) - && Objects.equals(allowNoMatch, other.allowNoMatch); + && Objects.equals(allowNoMatch, other.allowNoMatch) + && Objects.equals(force, other.force); } @Override public int hashCode() { - return Objects.hash(id, timeout, allowNoMatch); + return Objects.hash(id, timeout, allowNoMatch, force); } } diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/dataframe/DataFrameAnalyticsStats.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/dataframe/DataFrameAnalyticsStats.java index 5c652f33edb..4e04204e650 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/dataframe/DataFrameAnalyticsStats.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/dataframe/DataFrameAnalyticsStats.java @@ -41,6 +41,7 @@ public class DataFrameAnalyticsStats { static final ParseField ID = new ParseField("id"); static final ParseField STATE = new ParseField("state"); + static final ParseField FAILURE_REASON = new ParseField("failure_reason"); static final ParseField PROGRESS_PERCENT = new ParseField("progress_percent"); static final ParseField NODE = new ParseField("node"); static final ParseField ASSIGNMENT_EXPLANATION = new ParseField("assignment_explanation"); @@ -50,9 +51,10 @@ public class DataFrameAnalyticsStats { args -> new DataFrameAnalyticsStats( (String) args[0], (DataFrameAnalyticsState) args[1], - (Integer) args[2], - (NodeAttributes) args[3], - (String) args[4])); + (String) args[2], + (Integer) args[3], + (NodeAttributes) args[4], + (String) args[5])); static { PARSER.declareString(constructorArg(), ID); @@ -62,6 +64,7 @@ public class DataFrameAnalyticsStats { } throw new 
IllegalArgumentException("Unsupported token [" + p.currentToken() + "]"); }, STATE, ObjectParser.ValueType.STRING); + PARSER.declareString(optionalConstructorArg(), FAILURE_REASON); PARSER.declareInt(optionalConstructorArg(), PROGRESS_PERCENT); PARSER.declareObject(optionalConstructorArg(), NodeAttributes.PARSER, NODE); PARSER.declareString(optionalConstructorArg(), ASSIGNMENT_EXPLANATION); @@ -69,14 +72,17 @@ public class DataFrameAnalyticsStats { private final String id; private final DataFrameAnalyticsState state; + private final String failureReason; private final Integer progressPercent; private final NodeAttributes node; private final String assignmentExplanation; - public DataFrameAnalyticsStats(String id, DataFrameAnalyticsState state, @Nullable Integer progressPercent, - @Nullable NodeAttributes node, @Nullable String assignmentExplanation) { + public DataFrameAnalyticsStats(String id, DataFrameAnalyticsState state, @Nullable String failureReason, + @Nullable Integer progressPercent, @Nullable NodeAttributes node, + @Nullable String assignmentExplanation) { this.id = id; this.state = state; + this.failureReason = failureReason; this.progressPercent = progressPercent; this.node = node; this.assignmentExplanation = assignmentExplanation; @@ -90,6 +96,10 @@ public class DataFrameAnalyticsStats { return state; } + public String getFailureReason() { + return failureReason; + } + public Integer getProgressPercent() { return progressPercent; } @@ -110,6 +120,7 @@ public class DataFrameAnalyticsStats { DataFrameAnalyticsStats other = (DataFrameAnalyticsStats) o; return Objects.equals(id, other.id) && Objects.equals(state, other.state) + && Objects.equals(failureReason, other.failureReason) && Objects.equals(progressPercent, other.progressPercent) && Objects.equals(node, other.node) && Objects.equals(assignmentExplanation, other.assignmentExplanation); @@ -117,7 +128,7 @@ public class DataFrameAnalyticsStats { @Override public int hashCode() { - return 
Objects.hash(id, state, progressPercent, node, assignmentExplanation); + return Objects.hash(id, state, failureReason, progressPercent, node, assignmentExplanation); } @Override @@ -125,6 +136,7 @@ public class DataFrameAnalyticsStats { return new ToStringBuilder(getClass()) .add("id", id) .add("state", state) + .add("failureReason", failureReason) .add("progressPercent", progressPercent) .add("node", node) .add("assignmentExplanation", assignmentExplanation) diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/MLRequestConvertersTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/MLRequestConvertersTests.java index 36d71df5f91..01258d12960 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/MLRequestConvertersTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/MLRequestConvertersTests.java @@ -758,11 +758,15 @@ public class MLRequestConvertersTests extends ESTestCase { public void testStopDataFrameAnalytics_WithParams() { StopDataFrameAnalyticsRequest stopRequest = new StopDataFrameAnalyticsRequest(randomAlphaOfLength(10)) .setTimeout(TimeValue.timeValueMinutes(1)) - .setAllowNoMatch(false); + .setAllowNoMatch(false) + .setForce(true); Request request = MLRequestConverters.stopDataFrameAnalytics(stopRequest); assertEquals(HttpPost.METHOD_NAME, request.getMethod()); assertEquals("/_ml/data_frame/analytics/" + stopRequest.getId() + "/_stop", request.getEndpoint()); - assertThat(request.getParameters(), allOf(hasEntry("timeout", "1m"), hasEntry("allow_no_match", "false"))); + assertThat(request.getParameters(), allOf( + hasEntry("timeout", "1m"), + hasEntry("allow_no_match", "false"), + hasEntry("force", "true"))); assertNull(request.getEntity()); } diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/MachineLearningIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/MachineLearningIT.java index 77efe43b2e1..e44883823c2 100644 --- 
a/client/rest-high-level/src/test/java/org/elasticsearch/client/MachineLearningIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/MachineLearningIT.java @@ -1359,6 +1359,7 @@ public class MachineLearningIT extends ESRestHighLevelClientTestCase { DataFrameAnalyticsStats stats = statsResponse.getAnalyticsStats().get(0); assertThat(stats.getId(), equalTo(configId)); assertThat(stats.getState(), equalTo(DataFrameAnalyticsState.STOPPED)); + assertNull(stats.getFailureReason()); assertNull(stats.getProgressPercent()); assertNull(stats.getNode()); assertNull(stats.getAssignmentExplanation()); diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/MlClientDocumentationIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/MlClientDocumentationIT.java index 5c9017b7706..033861563dc 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/MlClientDocumentationIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/MlClientDocumentationIT.java @@ -3110,6 +3110,7 @@ public class MlClientDocumentationIT extends ESRestHighLevelClientTestCase { { // tag::stop-data-frame-analytics-request StopDataFrameAnalyticsRequest request = new StopDataFrameAnalyticsRequest("my-analytics-config"); // <1> + request.setForce(false); // <2> // end::stop-data-frame-analytics-request // tag::stop-data-frame-analytics-execute diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/dataframe/DataFrameAnalyticsStatsTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/dataframe/DataFrameAnalyticsStatsTests.java index ed6e24f754d..fad02eac161 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/dataframe/DataFrameAnalyticsStatsTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/dataframe/DataFrameAnalyticsStatsTests.java @@ -43,6 +43,7 @@ public 
class DataFrameAnalyticsStatsTests extends ESTestCase { return new DataFrameAnalyticsStats( randomAlphaOfLengthBetween(1, 10), randomFrom(DataFrameAnalyticsState.values()), + randomBoolean() ? null : randomAlphaOfLength(10), randomBoolean() ? null : randomIntBetween(0, 100), randomBoolean() ? null : NodeAttributesTests.createRandom(), randomBoolean() ? null : randomAlphaOfLengthBetween(1, 20)); @@ -52,6 +53,9 @@ public class DataFrameAnalyticsStatsTests extends ESTestCase { builder.startObject(); builder.field(DataFrameAnalyticsStats.ID.getPreferredName(), stats.getId()); builder.field(DataFrameAnalyticsStats.STATE.getPreferredName(), stats.getState().value()); + if (stats.getFailureReason() != null) { + builder.field(DataFrameAnalyticsStats.FAILURE_REASON.getPreferredName(), stats.getFailureReason()); + } if (stats.getProgressPercent() != null) { builder.field(DataFrameAnalyticsStats.PROGRESS_PERCENT.getPreferredName(), stats.getProgressPercent()); } diff --git a/docs/java-rest/high-level/ml/stop-data-frame-analytics.asciidoc b/docs/java-rest/high-level/ml/stop-data-frame-analytics.asciidoc index 243c075e18b..3a06f268836 100644 --- a/docs/java-rest/high-level/ml/stop-data-frame-analytics.asciidoc +++ b/docs/java-rest/high-level/ml/stop-data-frame-analytics.asciidoc @@ -19,6 +19,7 @@ A +{request}+ object requires a {dataframe-analytics-config} id. 
include-tagged::{doc-tests-file}[{api}-request] --------------------------------------------------- <1> Constructing a new stop request referencing an existing {dataframe-analytics-config} +<2> Optionally used to stop a failed task include::../execution.asciidoc[] diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/GetDataFrameAnalyticsStatsAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/GetDataFrameAnalyticsStatsAction.java index e92abb2619f..a0e70463c52 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/GetDataFrameAnalyticsStatsAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/GetDataFrameAnalyticsStatsAction.java @@ -6,9 +6,9 @@ package org.elasticsearch.xpack.core.ml.action; import org.elasticsearch.ElasticsearchException; -import org.elasticsearch.action.ActionType; import org.elasticsearch.action.ActionRequestBuilder; import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.ActionType; import org.elasticsearch.action.TaskOperationFailure; import org.elasticsearch.action.support.tasks.BaseTasksRequest; import org.elasticsearch.action.support.tasks.BaseTasksResponse; @@ -158,16 +158,19 @@ public class GetDataFrameAnalyticsStatsAction extends ActionType PARSER = new ConstructingObjectParser<>( - MlTasks.DATA_FRAME_ANALYTICS_TASK_NAME, true, a -> new TaskParams((String) a[0])); + MlTasks.DATA_FRAME_ANALYTICS_TASK_NAME, true, a -> new TaskParams((String) a[0], (String) a[1])); + + static { + PARSER.declareString(ConstructingObjectParser.constructorArg(), DataFrameAnalyticsConfig.ID); + PARSER.declareString(ConstructingObjectParser.constructorArg(), DataFrameAnalyticsConfig.VERSION); + } public static TaskParams fromXContent(XContentParser parser) { return PARSER.apply(parser, null); } - private String id; + private final String id; + private final Version version; - public 
TaskParams(String id) { + public TaskParams(String id, Version version) { this.id = Objects.requireNonNull(id); + this.version = Objects.requireNonNull(version); + } + + private TaskParams(String id, String version) { + this(id, Version.fromString(version)); } public TaskParams(StreamInput in) throws IOException { this.id = in.readString(); + this.version = Version.readVersion(in); } public String getId() { @@ -190,15 +202,31 @@ public class StartDataFrameAnalyticsAction extends ActionType implements ToXContentObject { - public static final ParseField TIMEOUT = new ParseField("timeout"); public static final ParseField ALLOW_NO_MATCH = new ParseField("allow_no_match"); + public static final ParseField FORCE = new ParseField("force"); + public static final ParseField TIMEOUT = new ParseField("timeout"); private static final ObjectParser PARSER = new ObjectParser<>(NAME, Request::new); static { PARSER.declareString((request, id) -> request.id = id, DataFrameAnalyticsConfig.ID); PARSER.declareString((request, val) -> request.setTimeout(TimeValue.parseTimeValue(val, TIMEOUT.getPreferredName())), TIMEOUT); + PARSER.declareBoolean(Request::setAllowNoMatch, ALLOW_NO_MATCH); + PARSER.declareBoolean(Request::setForce, FORCE); } public static Request parseRequest(String id, XContentParser parser) { @@ -71,8 +74,9 @@ public class StopDataFrameAnalyticsAction extends ActionType expandedIds = Collections.emptySet(); private boolean allowNoMatch = true; + private boolean force; + private Set expandedIds = Collections.emptySet(); public Request(String id) { setId(id); @@ -81,8 +85,9 @@ public class StopDataFrameAnalyticsAction extends ActionType(Arrays.asList(in.readStringArray())); allowNoMatch = in.readBoolean(); + force = in.readBoolean(); + expandedIds = new HashSet<>(Arrays.asList(in.readStringArray())); } public Request() {} @@ -95,6 +100,22 @@ public class StopDataFrameAnalyticsAction extends ActionType getExpandedIds() { return expandedIds; @@ -104,14 +125,6 @@ public 
class StopDataFrameAnalyticsAction extends ActionType this == candidate); + } } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/dataframe/DataFrameAnalyticsTaskState.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/dataframe/DataFrameAnalyticsTaskState.java index 994faaaee6c..188f3a2bdf2 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/dataframe/DataFrameAnalyticsTaskState.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/dataframe/DataFrameAnalyticsTaskState.java @@ -5,6 +5,7 @@ */ package org.elasticsearch.xpack.core.ml.dataframe; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.ParseField; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -25,13 +26,15 @@ public class DataFrameAnalyticsTaskState implements PersistentTaskState { private static ParseField STATE = new ParseField("state"); private static ParseField ALLOCATION_ID = new ParseField("allocation_id"); + private static ParseField REASON = new ParseField("reason"); private final DataFrameAnalyticsState state; private final long allocationId; + private final String reason; private static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>(NAME, true, - a -> new DataFrameAnalyticsTaskState((DataFrameAnalyticsState) a[0], (long) a[1])); + a -> new DataFrameAnalyticsTaskState((DataFrameAnalyticsState) a[0], (long) a[1], (String) a[2])); static { PARSER.declareField(ConstructingObjectParser.constructorArg(), p -> { @@ -41,6 +44,7 @@ public class DataFrameAnalyticsTaskState implements PersistentTaskState { throw new IllegalArgumentException("Unsupported token [" + p.currentToken() + "]"); }, STATE, ObjectParser.ValueType.STRING); PARSER.declareLong(ConstructingObjectParser.constructorArg(), ALLOCATION_ID); + PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), REASON); } public 
static DataFrameAnalyticsTaskState fromXContent(XContentParser parser) { @@ -51,20 +55,27 @@ public class DataFrameAnalyticsTaskState implements PersistentTaskState { } } - public DataFrameAnalyticsTaskState(DataFrameAnalyticsState state, long allocationId) { + public DataFrameAnalyticsTaskState(DataFrameAnalyticsState state, long allocationId, @Nullable String reason) { this.state = Objects.requireNonNull(state); this.allocationId = allocationId; + this.reason = reason; } public DataFrameAnalyticsTaskState(StreamInput in) throws IOException { this.state = DataFrameAnalyticsState.fromStream(in); this.allocationId = in.readLong(); + this.reason = in.readOptionalString(); } public DataFrameAnalyticsState getState() { return state; } + @Nullable + public String getReason() { + return reason; + } + public boolean isStatusStale(PersistentTasksCustomMetaData.PersistentTask task) { return allocationId != task.getAllocationId(); } @@ -78,6 +89,7 @@ public class DataFrameAnalyticsTaskState implements PersistentTaskState { public void writeTo(StreamOutput out) throws IOException { state.writeTo(out); out.writeLong(allocationId); + out.writeOptionalString(reason); } @Override @@ -85,6 +97,9 @@ public class DataFrameAnalyticsTaskState implements PersistentTaskState { builder.startObject(); builder.field(STATE.getPreferredName(), state.toString()); builder.field(ALLOCATION_ID.getPreferredName(), allocationId); + if (reason != null) { + builder.field(REASON.getPreferredName(), reason); + } builder.endObject(); return builder; } @@ -95,11 +110,12 @@ public class DataFrameAnalyticsTaskState implements PersistentTaskState { if (o == null || getClass() != o.getClass()) return false; DataFrameAnalyticsTaskState that = (DataFrameAnalyticsTaskState) o; return allocationId == that.allocationId && - state == that.state; + state == that.state && + Objects.equals(reason, that.reason); } @Override public int hashCode() { - return Objects.hash(state, allocationId); + return 
Objects.hash(state, allocationId, reason); } } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/GetDataFrameAnalyticsStatsActionResponseTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/GetDataFrameAnalyticsStatsActionResponseTests.java index e01618520f5..5a88f2ea52e 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/GetDataFrameAnalyticsStatsActionResponseTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/GetDataFrameAnalyticsStatsActionResponseTests.java @@ -23,8 +23,9 @@ public class GetDataFrameAnalyticsStatsActionResponseTests extends AbstractWireS List analytics = new ArrayList<>(listSize); for (int j = 0; j < listSize; j++) { Integer progressPercentage = randomBoolean() ? null : randomIntBetween(0, 100); + String failureReason = randomBoolean() ? null : randomAlphaOfLength(10); Response.Stats stats = new Response.Stats(DataFrameAnalyticsConfigTests.randomValidId(), - randomFrom(DataFrameAnalyticsState.values()), progressPercentage, null, randomAlphaOfLength(20)); + randomFrom(DataFrameAnalyticsState.values()), failureReason, progressPercentage, null, randomAlphaOfLength(20)); analytics.add(stats); } return new Response(new QueryPage<>(analytics, analytics.size(), GetDataFrameAnalyticsAction.Response.RESULTS_FIELD)); diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/StartDataFrameAnalyticsActionTaskParamsTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/StartDataFrameAnalyticsActionTaskParamsTests.java new file mode 100644 index 00000000000..4af9a2a374c --- /dev/null +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/StartDataFrameAnalyticsActionTaskParamsTests.java @@ -0,0 +1,37 @@ + +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. 
Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.core.ml.action; + +import org.elasticsearch.Version; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.test.AbstractSerializingTestCase; + +import java.io.IOException; + +public class StartDataFrameAnalyticsActionTaskParamsTests extends AbstractSerializingTestCase { + + @Override + protected StartDataFrameAnalyticsAction.TaskParams doParseInstance(XContentParser parser) throws IOException { + return StartDataFrameAnalyticsAction.TaskParams.fromXContent(parser); + } + + @Override + protected StartDataFrameAnalyticsAction.TaskParams createTestInstance() { + return new StartDataFrameAnalyticsAction.TaskParams(randomAlphaOfLength(10), Version.CURRENT); + } + + @Override + protected Writeable.Reader instanceReader() { + return StartDataFrameAnalyticsAction.TaskParams::new; + } + + @Override + protected boolean supportsUnknownFields() { + return true; + } +} diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/StopDataFrameAnalyticsRequestTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/StopDataFrameAnalyticsRequestTests.java index 9c61164c5f0..f2942905fad 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/StopDataFrameAnalyticsRequestTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/StopDataFrameAnalyticsRequestTests.java @@ -24,6 +24,9 @@ public class StopDataFrameAnalyticsRequestTests extends AbstractWireSerializingT if (randomBoolean()) { request.setAllowNoMatch(randomBoolean()); } + if (randomBoolean()) { + request.setForce(randomBoolean()); + } int expandedIdsCount = randomIntBetween(0, 10); Set expandedIds = new HashSet<>(); for (int i = 0; i < expandedIdsCount; i++) { diff --git 
a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/dataframe/DataFrameAnalyticsStateTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/dataframe/DataFrameAnalyticsStateTests.java new file mode 100644 index 00000000000..1b0800c91e3 --- /dev/null +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/dataframe/DataFrameAnalyticsStateTests.java @@ -0,0 +1,41 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.core.ml.dataframe; + +import org.elasticsearch.test.ESTestCase; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; + +public class DataFrameAnalyticsStateTests extends ESTestCase { + + public void testFromString() { + assertThat(DataFrameAnalyticsState.fromString("started"), equalTo(DataFrameAnalyticsState.STARTED)); + assertThat(DataFrameAnalyticsState.fromString("reindexing"), equalTo(DataFrameAnalyticsState.REINDEXING)); + assertThat(DataFrameAnalyticsState.fromString("analyzing"), equalTo(DataFrameAnalyticsState.ANALYZING)); + assertThat(DataFrameAnalyticsState.fromString("stopping"), equalTo(DataFrameAnalyticsState.STOPPING)); + assertThat(DataFrameAnalyticsState.fromString("stopped"), equalTo(DataFrameAnalyticsState.STOPPED)); + assertThat(DataFrameAnalyticsState.fromString("failed"), equalTo(DataFrameAnalyticsState.FAILED)); + } + + public void testToString() { + assertThat(DataFrameAnalyticsState.STARTED.toString(), equalTo("started")); + assertThat(DataFrameAnalyticsState.REINDEXING.toString(), equalTo("reindexing")); + assertThat(DataFrameAnalyticsState.ANALYZING.toString(), equalTo("analyzing")); + assertThat(DataFrameAnalyticsState.STOPPING.toString(), equalTo("stopping")); + assertThat(DataFrameAnalyticsState.STOPPED.toString(), 
equalTo("stopped")); + assertThat(DataFrameAnalyticsState.FAILED.toString(), equalTo("failed")); + } + + public void testIsAnyOf() { + assertThat(DataFrameAnalyticsState.STARTED.isAnyOf(), is(false)); + assertThat(DataFrameAnalyticsState.STARTED.isAnyOf(DataFrameAnalyticsState.STARTED), is(true)); + assertThat(DataFrameAnalyticsState.STARTED.isAnyOf(DataFrameAnalyticsState.ANALYZING, DataFrameAnalyticsState.STOPPED), is(false)); + assertThat(DataFrameAnalyticsState.STARTED.isAnyOf(DataFrameAnalyticsState.STARTED, DataFrameAnalyticsState.STOPPED), is(true)); + assertThat(DataFrameAnalyticsState.ANALYZING.isAnyOf(DataFrameAnalyticsState.STARTED, DataFrameAnalyticsState.STOPPED), is(false)); + assertThat(DataFrameAnalyticsState.ANALYZING.isAnyOf(DataFrameAnalyticsState.ANALYZING, DataFrameAnalyticsState.FAILED), is(true)); + } +} diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/dataframe/DataFrameAnalyticsTaskStateTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/dataframe/DataFrameAnalyticsTaskStateTests.java new file mode 100644 index 00000000000..f6d2d421b3d --- /dev/null +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/dataframe/DataFrameAnalyticsTaskStateTests.java @@ -0,0 +1,35 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. 
+ */ +package org.elasticsearch.xpack.core.ml.dataframe; + +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.test.AbstractSerializingTestCase; + +import java.io.IOException; + +public class DataFrameAnalyticsTaskStateTests extends AbstractSerializingTestCase { + + @Override + protected DataFrameAnalyticsTaskState createTestInstance() { + return new DataFrameAnalyticsTaskState(randomFrom(DataFrameAnalyticsState.values()), randomLong(), randomAlphaOfLength(10)); + } + + @Override + protected Writeable.Reader instanceReader() { + return DataFrameAnalyticsTaskState::new; + } + + @Override + protected DataFrameAnalyticsTaskState doParseInstance(XContentParser parser) throws IOException { + return DataFrameAnalyticsTaskState.fromXContent(parser); + } + + @Override + protected boolean supportsUnknownFields() { + return true; + } +} diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java index 3ea249d8ac8..cdb2b07ae8f 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java @@ -501,7 +501,7 @@ public class MachineLearning extends Plugin implements ActionPlugin, AnalysisPlu new BlackHoleAutodetectProcess(job.getId()); // factor of 1.0 makes renormalization a no-op normalizerProcessFactory = (jobId, quantilesState, bucketSpan, executorService) -> new MultiplyingNormalizerProcess(1.0); - analyticsProcessFactory = (jobId, analyticsProcessConfig, executorService) -> null; + analyticsProcessFactory = (jobId, analyticsProcessConfig, executorService, onProcessCrash) -> null; } NormalizerFactory normalizerFactory = new NormalizerFactory(normalizerProcessFactory, threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME)); diff --git 
a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetDataFrameAnalyticsStatsAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetDataFrameAnalyticsStatsAction.java index 575069e4fd4..8d139ba9b6c 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetDataFrameAnalyticsStatsAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetDataFrameAnalyticsStatsAction.java @@ -34,6 +34,7 @@ import org.elasticsearch.xpack.core.ml.action.GetDataFrameAnalyticsStatsAction; import org.elasticsearch.xpack.core.ml.action.GetDataFrameAnalyticsStatsAction.Response.Stats; import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig; import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsState; +import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsTaskState; import org.elasticsearch.xpack.ml.action.TransportStartDataFrameAnalyticsAction.DataFrameAnalyticsTask; import org.elasticsearch.xpack.ml.dataframe.process.AnalyticsProcessManager; @@ -178,6 +179,11 @@ public class TransportGetDataFrameAnalyticsStatsAction PersistentTasksCustomMetaData tasks = clusterState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); PersistentTasksCustomMetaData.PersistentTask analyticsTask = MlTasks.getDataFrameAnalyticsTask(concreteAnalyticsId, tasks); DataFrameAnalyticsState analyticsState = MlTasks.getDataFrameAnalyticsState(concreteAnalyticsId, tasks); + String failureReason = null; + if (analyticsState == DataFrameAnalyticsState.FAILED) { + DataFrameAnalyticsTaskState taskState = (DataFrameAnalyticsTaskState) analyticsTask.getState(); + failureReason = taskState.getReason(); + } DiscoveryNode node = null; String assignmentExplanation = null; if (analyticsTask != null) { @@ -185,6 +191,6 @@ public class TransportGetDataFrameAnalyticsStatsAction assignmentExplanation = analyticsTask.getAssignment().getExplanation(); } return 
new GetDataFrameAnalyticsStatsAction.Response.Stats( - concreteAnalyticsId, analyticsState, progressPercent, node, assignmentExplanation); + concreteAnalyticsId, analyticsState, failureReason, progressPercent, node, assignmentExplanation); } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDataFrameAnalyticsAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDataFrameAnalyticsAction.java index 352aefbcbe7..915e39abcf3 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDataFrameAnalyticsAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDataFrameAnalyticsAction.java @@ -61,6 +61,7 @@ import org.elasticsearch.xpack.ml.process.MlMemoryTracker; import java.io.IOException; import java.util.Map; import java.util.Objects; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Predicate; import static org.elasticsearch.xpack.core.ml.MlTasks.AWAITING_UPGRADE; @@ -130,8 +131,6 @@ public class TransportStartDataFrameAnalyticsAction return; } - StartDataFrameAnalyticsAction.TaskParams taskParams = new StartDataFrameAnalyticsAction.TaskParams(request.getId()); - // Wait for analytics to be started ActionListener> waitForAnalyticsToStart = new ActionListener>() { @@ -150,17 +149,26 @@ public class TransportStartDataFrameAnalyticsAction } }; + AtomicReference configHolder = new AtomicReference<>(); + // Start persistent task ActionListener memoryRequirementRefreshListener = ActionListener.wrap( - validated -> persistentTasksService.sendStartRequest(MlTasks.dataFrameAnalyticsTaskId(request.getId()), - MlTasks.DATA_FRAME_ANALYTICS_TASK_NAME, taskParams, waitForAnalyticsToStart), + aVoid -> { + StartDataFrameAnalyticsAction.TaskParams taskParams = new StartDataFrameAnalyticsAction.TaskParams( + request.getId(), configHolder.get().getVersion()); + 
persistentTasksService.sendStartRequest(MlTasks.dataFrameAnalyticsTaskId(request.getId()), + MlTasks.DATA_FRAME_ANALYTICS_TASK_NAME, taskParams, waitForAnalyticsToStart); + }, listener::onFailure ); // Tell the job tracker to refresh the memory requirement for this job and all other jobs that have persistent tasks ActionListener configListener = ActionListener.wrap( - config -> memoryTracker.addDataFrameAnalyticsJobMemoryAndRefreshAllOthers( - request.getId(), config.getModelMemoryLimit().getBytes(), memoryRequirementRefreshListener), + config -> { + configHolder.set(config); + memoryTracker.addDataFrameAnalyticsJobMemoryAndRefreshAllOthers( + request.getId(), config.getModelMemoryLimit().getBytes(), memoryRequirementRefreshListener); + }, listener::onFailure ); @@ -250,7 +258,21 @@ public class TransportStartDataFrameAnalyticsAction } DataFrameAnalyticsTaskState taskState = (DataFrameAnalyticsTaskState) persistentTask.getState(); DataFrameAnalyticsState analyticsState = taskState == null ? DataFrameAnalyticsState.STOPPED : taskState.getState(); - return analyticsState == DataFrameAnalyticsState.STARTED; + switch (analyticsState) { + case STARTED: + case REINDEXING: + case ANALYZING: + return true; + case STOPPING: + exception = ExceptionsHelper.conflictStatusException("the task has been stopped while waiting to be started"); + return true; + case STOPPED: + return false; + case FAILED: + default: + exception = ExceptionsHelper.serverError("Unexpected task state [" + analyticsState + "] while waiting to be started"); + return true; + } } } @@ -424,13 +446,15 @@ public class TransportStartDataFrameAnalyticsAction DataFrameAnalyticsTaskState analyticsTaskState = (DataFrameAnalyticsTaskState) state; // If we are "stopping" there is nothing to do - if (analyticsTaskState != null && analyticsTaskState.getState() == DataFrameAnalyticsState.STOPPING) { + // If we are "failed" then we should leave the task as is; for recovery it must be force stopped. 
+ if (analyticsTaskState != null && analyticsTaskState.getState().isAnyOf( + DataFrameAnalyticsState.STOPPING, DataFrameAnalyticsState.FAILED)) { return; } if (analyticsTaskState == null) { DataFrameAnalyticsTaskState startedState = new DataFrameAnalyticsTaskState(DataFrameAnalyticsState.STARTED, - task.getAllocationId()); + task.getAllocationId(), null); task.updatePersistentTaskState(startedState, ActionListener.wrap( response -> manager.execute((DataFrameAnalyticsTask) task, DataFrameAnalyticsState.STARTED), task::markAsFailed)); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStopDataFrameAnalyticsAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStopDataFrameAnalyticsAction.java index 7c8222d83f3..b02d89c1776 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStopDataFrameAnalyticsAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStopDataFrameAnalyticsAction.java @@ -7,6 +7,7 @@ package org.elasticsearch.xpack.ml.action; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListenerResponseHandler; @@ -31,6 +32,7 @@ import org.elasticsearch.xpack.core.ml.action.StopDataFrameAnalyticsAction; import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig; import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsState; import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsTaskState; +import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.ml.MachineLearning; import org.elasticsearch.xpack.ml.dataframe.persistence.DataFrameAnalyticsConfigProvider; @@ -89,13 +91,10 @@ public class TransportStopDataFrameAnalyticsAction 
return; } - Set startedAnalytics = new HashSet<>(); - Set stoppingAnalytics = new HashSet<>(); PersistentTasksCustomMetaData tasks = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); - sortAnalyticsByTaskState(expandedIds, tasks, startedAnalytics, stoppingAnalytics); - - request.setExpandedIds(startedAnalytics); - request.setNodes(findAllocatedNodesAndRemoveUnassignedTasks(startedAnalytics, tasks)); + Set analyticsToStop = findAnalyticsToStop(tasks, expandedIds, request.isForce()); + request.setExpandedIds(analyticsToStop); + request.setNodes(findAllocatedNodesAndRemoveUnassignedTasks(analyticsToStop, tasks)); ActionListener finalListener = ActionListener.wrap( r -> waitForTaskRemoved(expandedIds, request, r, listener), @@ -110,8 +109,28 @@ public class TransportStopDataFrameAnalyticsAction expandIds(state, request, expandedIdsListener); } + /** Visible for testing */ + static Set findAnalyticsToStop(PersistentTasksCustomMetaData tasks, Set ids, boolean force) { + Set startedAnalytics = new HashSet<>(); + Set stoppingAnalytics = new HashSet<>(); + Set failedAnalytics = new HashSet<>(); + sortAnalyticsByTaskState(ids, tasks, startedAnalytics, stoppingAnalytics, failedAnalytics); + + if (force == false && failedAnalytics.isEmpty() == false) { + ElasticsearchStatusException e = failedAnalytics.size() == 1 ? 
ExceptionsHelper.conflictStatusException( + "cannot close data frame analytics [{}] because it failed, use force stop instead", failedAnalytics.iterator().next()) : + ExceptionsHelper.conflictStatusException("one or more data frame analytics are in failed state, " + + "use force stop instead"); + throw e; + } + + startedAnalytics.addAll(failedAnalytics); + return startedAnalytics; + } + private static void sortAnalyticsByTaskState(Set analyticsIds, PersistentTasksCustomMetaData tasks, - Set startedAnalytics, Set stoppingAnalytics) { + Set startedAnalytics, Set stoppingAnalytics, + Set failedAnalytics) { for (String analyticsId : analyticsIds) { switch (MlTasks.getDataFrameAnalyticsState(analyticsId, tasks)) { case STARTED: @@ -124,6 +143,9 @@ public class TransportStopDataFrameAnalyticsAction break; case STOPPED: break; + case FAILED: + failedAnalytics.add(analyticsId); + break; default: break; } @@ -203,7 +225,7 @@ public class TransportStopDataFrameAnalyticsAction TransportStartDataFrameAnalyticsAction.DataFrameAnalyticsTask task, ActionListener listener) { DataFrameAnalyticsTaskState stoppingState = - new DataFrameAnalyticsTaskState(DataFrameAnalyticsState.STOPPING, task.getAllocationId()); + new DataFrameAnalyticsTaskState(DataFrameAnalyticsState.STOPPING, task.getAllocationId(), null); task.updatePersistentTaskState(stoppingState, ActionListener.wrap(pTask -> { threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME).execute(new AbstractRunnable() { @Override @@ -213,6 +235,7 @@ public class TransportStopDataFrameAnalyticsAction @Override protected void doRun() { + logger.info("[{}] Stopping task with force [{}]", task.getParams().getId(), request.isForce()); task.stop("stop_data_frame_analytics (api)", request.getTimeout()); listener.onResponse(new StopDataFrameAnalyticsAction.Response(true)); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsManager.java 
b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsManager.java index 9132e0f8192..28f277dc84b 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsManager.java @@ -70,7 +70,7 @@ public class DataFrameAnalyticsManager { ActionListener configListener = ActionListener.wrap( config -> { DataFrameAnalyticsTaskState reindexingState = new DataFrameAnalyticsTaskState(DataFrameAnalyticsState.REINDEXING, - task.getAllocationId()); + task.getAllocationId(), null); switch(currentState) { // If we are STARTED, we are right at the beginning of our task, we should indicate that we are entering the // REINDEX state and start reindexing. @@ -191,7 +191,7 @@ public class DataFrameAnalyticsManager { ActionListener dataExtractorFactoryListener = ActionListener.wrap( dataExtractorFactory -> { DataFrameAnalyticsTaskState analyzingState = new DataFrameAnalyticsTaskState(DataFrameAnalyticsState.ANALYZING, - task.getAllocationId()); + task.getAllocationId(), null); task.updatePersistentTaskState(analyzingState, ActionListener.wrap( updatedTask -> processManager.runJob(task, config, dataExtractorFactory, error -> { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessFactory.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessFactory.java index d09757ddc5c..1df29b88ba4 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessFactory.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessFactory.java @@ -6,6 +6,7 @@ package org.elasticsearch.xpack.ml.dataframe.process; import java.util.concurrent.ExecutorService; +import java.util.function.Consumer; public interface AnalyticsProcessFactory { @@ -15,7 +16,9 @@ 
public interface AnalyticsProcessFactory { * @param jobId The job id * @param analyticsProcessConfig The process configuration * @param executorService Executor service used to start the async tasks a job needs to operate the analytical process + * @param onProcessCrash Callback to execute if the process stops unexpectedly * @return The process */ - AnalyticsProcess createAnalyticsProcess(String jobId, AnalyticsProcessConfig analyticsProcessConfig, ExecutorService executorService); + AnalyticsProcess createAnalyticsProcess(String jobId, AnalyticsProcessConfig analyticsProcessConfig, ExecutorService executorService, + Consumer onProcessCrash); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessManager.java index c1447f4d18b..bb54895b2fa 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessManager.java @@ -8,6 +8,7 @@ package org.elasticsearch.xpack.ml.dataframe.process; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.refresh.RefreshAction; import org.elasticsearch.action.admin.indices.refresh.RefreshRequest; import org.elasticsearch.client.Client; @@ -15,9 +16,11 @@ import org.elasticsearch.common.Nullable; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.ClientHelper; import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig; +import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsState; +import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsTaskState; import 
org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.ml.MachineLearning; -import org.elasticsearch.xpack.ml.action.TransportStartDataFrameAnalyticsAction; +import org.elasticsearch.xpack.ml.action.TransportStartDataFrameAnalyticsAction.DataFrameAnalyticsTask; import org.elasticsearch.xpack.ml.dataframe.extractor.DataFrameDataExtractor; import org.elasticsearch.xpack.ml.dataframe.extractor.DataFrameDataExtractorFactory; @@ -46,7 +49,7 @@ public class AnalyticsProcessManager { this.processFactory = Objects.requireNonNull(analyticsProcessFactory); } - public void runJob(TransportStartDataFrameAnalyticsAction.DataFrameAnalyticsTask task, DataFrameAnalyticsConfig config, + public void runJob(DataFrameAnalyticsTask task, DataFrameAnalyticsConfig config, DataFrameDataExtractorFactory dataExtractorFactory, Consumer finishHandler) { threadPool.generic().execute(() -> { if (task.isStopping()) { @@ -61,10 +64,10 @@ public class AnalyticsProcessManager { + "] Could not create process as one already exists")); return; } - if (processContext.startProcess(dataExtractorFactory, config)) { + if (processContext.startProcess(dataExtractorFactory, config, task)) { ExecutorService executorService = threadPool.executor(MachineLearning.JOB_COMMS_THREAD_POOL_NAME); - executorService.execute(() -> processContext.resultProcessor.process(processContext.process)); - executorService.execute(() -> processData(task.getAllocationId(), config, processContext.dataExtractor, + executorService.execute(() -> processResults(processContext)); + executorService.execute(() -> processData(task, config, processContext.dataExtractor, processContext.process, processContext.resultProcessor, finishHandler)); } else { finishHandler.accept(null); @@ -72,8 +75,17 @@ public class AnalyticsProcessManager { }); } - private void processData(long taskAllocationId, DataFrameAnalyticsConfig config, DataFrameDataExtractor dataExtractor, + private void processResults(ProcessContext 
processContext) { + try { + processContext.resultProcessor.process(processContext.process); + } catch (Exception e) { + processContext.setFailureReason(e.getMessage()); + } + } + + private void processData(DataFrameAnalyticsTask task, DataFrameAnalyticsConfig config, DataFrameDataExtractor dataExtractor, AnalyticsProcess process, AnalyticsResultProcessor resultProcessor, Consumer finishHandler) { + try { writeHeaderRecord(dataExtractor, process); writeDataRows(dataExtractor, process); @@ -82,26 +94,28 @@ public class AnalyticsProcessManager { LOGGER.info("[{}] Waiting for result processor to complete", config.getId()); resultProcessor.awaitForCompletion(); + processContextByAllocation.get(task.getAllocationId()).setFailureReason(resultProcessor.getFailure()); + refreshDest(config); LOGGER.info("[{}] Result processor has completed", config.getId()); - } catch (IOException e) { - LOGGER.error(new ParameterizedMessage("[{}] Error writing data to the process", config.getId()), e); - // TODO Handle this failure by setting the task state to FAILED + } catch (Exception e) { + String errorMsg = new ParameterizedMessage("[{}] Error while processing data", config.getId()).getFormattedMessage(); + processContextByAllocation.get(task.getAllocationId()).setFailureReason(errorMsg); } finally { - LOGGER.info("[{}] Closing process", config.getId()); - try { - process.close(); - LOGGER.info("[{}] Closed process", config.getId()); + closeProcess(task); - // This results in marking the persistent task as complete - finishHandler.accept(null); - } catch (IOException e) { - LOGGER.error("[{}] Error closing data frame analyzer process", config.getId()); - finishHandler.accept(e); - } - processContextByAllocation.remove(taskAllocationId); + ProcessContext processContext = processContextByAllocation.remove(task.getAllocationId()); LOGGER.debug("Removed process context for task [{}]; [{}] processes still running", config.getId(), processContextByAllocation.size()); + + if 
(processContext.getFailureReason() == null) { + // This results in marking the persistent task as complete + LOGGER.info("[{}] Marking task completed", config.getId()); + finishHandler.accept(null); + } else { + LOGGER.error("[{}] Marking task failed; {}", config.getId(), processContext.getFailureReason()); + updateTaskState(task, DataFrameAnalyticsState.FAILED, processContext.getFailureReason()); + } } } @@ -142,15 +156,34 @@ public class AnalyticsProcessManager { process.writeRecord(headerRecord); } - private AnalyticsProcess createProcess(String jobId, AnalyticsProcessConfig analyticsProcessConfig) { + private AnalyticsProcess createProcess(DataFrameAnalyticsTask task, AnalyticsProcessConfig analyticsProcessConfig) { ExecutorService executorService = threadPool.executor(MachineLearning.JOB_COMMS_THREAD_POOL_NAME); - AnalyticsProcess process = processFactory.createAnalyticsProcess(jobId, analyticsProcessConfig, executorService); + AnalyticsProcess process = processFactory.createAnalyticsProcess(task.getParams().getId(), analyticsProcessConfig, + executorService, onProcessCrash(task)); if (process.isProcessAlive() == false) { throw ExceptionsHelper.serverError("Failed to start data frame analytics process"); } return process; } + private Consumer onProcessCrash(DataFrameAnalyticsTask task) { + return reason -> { + ProcessContext processContext = processContextByAllocation.get(task.getAllocationId()); + if (processContext != null) { + processContext.setFailureReason(reason); + processContext.stop(); + } + }; + } + + private void updateTaskState(DataFrameAnalyticsTask task, DataFrameAnalyticsState state, @Nullable String reason) { + DataFrameAnalyticsTaskState newTaskState = new DataFrameAnalyticsTaskState(state, task.getAllocationId(), reason); + task.updatePersistentTaskState(newTaskState, ActionListener.wrap( + updatedTask -> LOGGER.info("[{}] Successfully updated task state to [{}]", task.getParams().getId(), state), + e -> LOGGER.error(new
ParameterizedMessage("[{}] Could not update task state to [{}]", task.getParams().getId(), state), e) + )); + } + @Nullable public Integer getProgressPercent(long allocationId) { ProcessContext processContext = processContextByAllocation.get(allocationId); @@ -162,13 +195,29 @@ public class AnalyticsProcessManager { () -> client.execute(RefreshAction.INSTANCE, new RefreshRequest(config.getDest().getIndex())).actionGet()); } - public void stop(TransportStartDataFrameAnalyticsAction.DataFrameAnalyticsTask task) { + private void closeProcess(DataFrameAnalyticsTask task) { + String configId = task.getParams().getId(); + LOGGER.info("[{}] Closing process", configId); + + ProcessContext processContext = processContextByAllocation.get(task.getAllocationId()); + try { + processContext.process.close(); + LOGGER.info("[{}] Closed process", configId); + } catch (IOException e) { + String errorMsg = new ParameterizedMessage("[{}] Error closing data frame analyzer process [{}]" + , configId, e.getMessage()).getFormattedMessage(); + processContext.setFailureReason(errorMsg); + } + } + + public void stop(DataFrameAnalyticsTask task) { ProcessContext processContext = processContextByAllocation.get(task.getAllocationId()); if (processContext != null) { LOGGER.debug("[{}] Stopping process", task.getParams().getId() ); processContext.stop(); } else { LOGGER.debug("[{}] No process context to stop", task.getParams().getId() ); + task.markAsCompleted(); } } @@ -180,6 +229,7 @@ public class AnalyticsProcessManager { private volatile AnalyticsResultProcessor resultProcessor; private final AtomicInteger progressPercent = new AtomicInteger(0); private volatile boolean processKilled; + private volatile String failureReason; ProcessContext(String id) { this.id = Objects.requireNonNull(id); @@ -197,6 +247,17 @@ public class AnalyticsProcessManager { this.progressPercent.set(progressPercent); } + private synchronized void setFailureReason(String failureReason) { + // Only set the new reason if 
there isn't one already as we want to keep the first reason + if (this.failureReason == null && failureReason != null) { + this.failureReason = failureReason; + } + } + + private String getFailureReason() { + return failureReason; + } + public synchronized void stop() { LOGGER.debug("[{}] Stopping process", id); processKilled = true; @@ -215,14 +276,15 @@ public class AnalyticsProcessManager { /** * @return {@code true} if the process was started or {@code false} if it was not because it was stopped in the meantime */ - private synchronized boolean startProcess(DataFrameDataExtractorFactory dataExtractorFactory, DataFrameAnalyticsConfig config) { + private synchronized boolean startProcess(DataFrameDataExtractorFactory dataExtractorFactory, DataFrameAnalyticsConfig config, + DataFrameAnalyticsTask task) { if (processKilled) { // The job was stopped before we started the process so no need to start it return false; } dataExtractor = dataExtractorFactory.newExtractor(false); - process = createProcess(config.getId(), createProcessConfig(config, dataExtractor)); + process = createProcess(task, createProcessConfig(config, dataExtractor)); DataFrameRowsJoiner dataFrameRowsJoiner = new DataFrameRowsJoiner(config.getId(), client, dataExtractorFactory.newExtractor(true)); resultProcessor = new AnalyticsResultProcessor(id, dataFrameRowsJoiner, this::isProcessKilled, this::setProgressPercent); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsResultProcessor.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsResultProcessor.java index f9b13139354..11c451e9c39 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsResultProcessor.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsResultProcessor.java @@ -8,6 +8,7 @@ package org.elasticsearch.xpack.ml.dataframe.process; import org.apache.logging.log4j.LogManager; import
org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; +import org.elasticsearch.common.Nullable; import org.elasticsearch.xpack.ml.dataframe.process.results.RowResults; import java.util.Iterator; @@ -26,6 +27,7 @@ public class AnalyticsResultProcessor { private final Supplier isProcessKilled; private final Consumer progressConsumer; private final CountDownLatch completionLatch = new CountDownLatch(1); + private volatile String failure; public AnalyticsResultProcessor(String dataFrameAnalyticsId, DataFrameRowsJoiner dataFrameRowsJoiner, Supplier isProcessKilled, Consumer progressConsumer) { @@ -35,6 +37,11 @@ public class AnalyticsResultProcessor { this.progressConsumer = Objects.requireNonNull(progressConsumer); } + @Nullable + public String getFailure() { + return failure == null ? dataFrameRowsJoiner.getFailure() : failure; + } + public void awaitForCompletion() { try { if (completionLatch.await(30, TimeUnit.MINUTES) == false) { @@ -59,6 +66,7 @@ public class AnalyticsResultProcessor { // No need to log error as it's due to stopping } else { LOGGER.error(new ParameterizedMessage("[{}] Error parsing data frame analytics output", dataFrameAnalyticsId), e); + failure = "error parsing data frame analytics output: [" + e.getMessage() + "]"; } } finally { completionLatch.countDown(); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/DataFrameRowsJoiner.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/DataFrameRowsJoiner.java index ef943820374..1b3dd2932ab 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/DataFrameRowsJoiner.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/DataFrameRowsJoiner.java @@ -14,6 +14,7 @@ import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.index.IndexRequest; import 
org.elasticsearch.client.Client; +import org.elasticsearch.common.Nullable; import org.elasticsearch.search.SearchHit; import org.elasticsearch.xpack.core.ClientHelper; import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; @@ -41,7 +42,7 @@ class DataFrameRowsJoiner implements AutoCloseable { private final DataFrameDataExtractor dataExtractor; private final Iterator dataFrameRowsIterator; private LinkedList currentResults; - private boolean failed; + private volatile String failure; DataFrameRowsJoiner(String analyticsId, Client client, DataFrameDataExtractor dataExtractor) { this.analyticsId = Objects.requireNonNull(analyticsId); @@ -51,8 +52,13 @@ class DataFrameRowsJoiner implements AutoCloseable { this.currentResults = new LinkedList<>(); } + @Nullable + String getFailure() { + return failure; + } + void processRowResults(RowResults rowResults) { - if (failed) { + if (failure != null) { // If we are in failed state we drop the results but we let the processor // parse the output return; @@ -61,8 +67,8 @@ class DataFrameRowsJoiner implements AutoCloseable { try { addResultAndJoinIfEndOfBatch(rowResults); } catch (Exception e) { - LOGGER.error(new ParameterizedMessage("[{}] Failed to join results", analyticsId), e); - failed = true; + LOGGER.error(new ParameterizedMessage("[{}] Failed to join results ", analyticsId), e); + failure = "[" + analyticsId + "] Failed to join results: " + e.getMessage(); } } @@ -93,8 +99,7 @@ class DataFrameRowsJoiner implements AutoCloseable { msg += "expected [" + row.getChecksum() + "] but result had [" + result.getChecksum() + "]; "; msg += "this implies the data frame index [" + row.getHit().getIndex() + "] was modified while the analysis was running. "; msg += "We rely on this index being immutable during a running analysis and so the results will be unreliable."; - throw new RuntimeException(msg); - // TODO Communicate this error to the user as effectively the analytics have failed (e.g. FAILED state, audit error, etc.) 
+ throw ExceptionsHelper.serverError(msg); } } @@ -112,8 +117,7 @@ class DataFrameRowsJoiner implements AutoCloseable { BulkResponse bulkResponse = ClientHelper.executeWithHeaders(dataExtractor.getHeaders(), ClientHelper.ML_ORIGIN, client, () -> client.execute(BulkAction.INSTANCE, bulkRequest).actionGet()); if (bulkResponse.hasFailures()) { - LOGGER.error("Failures while writing data frame"); - // TODO Better error handling + throw ExceptionsHelper.serverError("failures while writing results [" + bulkResponse.buildFailureMessage() + "]"); } } @@ -123,7 +127,7 @@ class DataFrameRowsJoiner implements AutoCloseable { joinCurrentResults(); } catch (Exception e) { LOGGER.error(new ParameterizedMessage("[{}] Failed to join results", analyticsId), e); - failed = true; + failure = "[" + analyticsId + "] Failed to join results: " + e.getMessage(); } finally { try { consumeDataExtractor(); @@ -159,7 +163,7 @@ class DataFrameRowsJoiner implements AutoCloseable { } if (row == null || row.shouldSkip()) { - throw ExceptionsHelper.serverError("No more data frame rows could be found while joining results"); + throw ExceptionsHelper.serverError("no more data frame rows could be found while joining results"); } return row; } @@ -175,9 +179,7 @@ class DataFrameRowsJoiner implements AutoCloseable { try { return dataExtractor.next(); } catch (IOException e) { - // TODO Implement recovery strategy or better error reporting - LOGGER.error("Error reading next batch of data frame rows", e); - return Optional.empty(); + throw ExceptionsHelper.serverError("error reading next batch of data frame rows [" + e.getMessage() + "]"); } } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/NativeAnalyticsProcessFactory.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/NativeAnalyticsProcessFactory.java index 0429ca4fc63..2ac9eb301e7 100644 --- 
a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/NativeAnalyticsProcessFactory.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/NativeAnalyticsProcessFactory.java @@ -25,6 +25,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Objects; import java.util.concurrent.ExecutorService; +import java.util.function.Consumer; public class NativeAnalyticsProcessFactory implements AnalyticsProcessFactory { @@ -50,7 +51,7 @@ public class NativeAnalyticsProcessFactory implements AnalyticsProcessFactory { @Override public AnalyticsProcess createAnalyticsProcess(String jobId, AnalyticsProcessConfig analyticsProcessConfig, - ExecutorService executorService) { + ExecutorService executorService, Consumer onProcessCrash) { List filesToDelete = new ArrayList<>(); ProcessPipes processPipes = new ProcessPipes(env, NAMED_PIPE_HELPER, AnalyticsBuilder.ANALYTICS, jobId, true, false, true, true, false, false); @@ -62,8 +63,7 @@ public class NativeAnalyticsProcessFactory implements AnalyticsProcessFactory { NativeAnalyticsProcess analyticsProcess = new NativeAnalyticsProcess(jobId, processPipes.getLogStream().get(), processPipes.getProcessInStream().get(), processPipes.getProcessOutStream().get(), null, numberOfFields, - filesToDelete, reason -> {}); - + filesToDelete, onProcessCrash); try { analyticsProcess.start(executorService); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobNodeSelector.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobNodeSelector.java index b22ad9482e4..aa97c13b21d 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobNodeSelector.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobNodeSelector.java @@ -265,8 +265,9 @@ public class JobNodeSelector { MlTasks.DATA_FRAME_ANALYTICS_TASK_NAME, task -> node.getId().equals(task.getExecutorNode())); for 
(PersistentTasksCustomMetaData.PersistentTask assignedTask : assignedAnalyticsTasks) { DataFrameAnalyticsState dataFrameAnalyticsState = ((DataFrameAnalyticsTaskState) assignedTask.getState()).getState(); - // TODO: skip FAILED here too if such a state is ever added - if (dataFrameAnalyticsState != DataFrameAnalyticsState.STOPPED) { + + // Don't count stopped and failed df-analytics tasks as they don't consume native memory + if (dataFrameAnalyticsState.isAnyOf(DataFrameAnalyticsState.STOPPED, DataFrameAnalyticsState.FAILED) == false) { // The native process is only running in the ANALYZING and STOPPING states, but in the STARTED // and REINDEXING states we're committed to using the memory soon, so account for it here ++result.numberOfAssignedJobs; diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/dataframe/RestStopDataFrameAnalyticsAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/dataframe/RestStopDataFrameAnalyticsAction.java index 8a399c736c9..65f1d402735 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/dataframe/RestStopDataFrameAnalyticsAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/dataframe/RestStopDataFrameAnalyticsAction.java @@ -7,7 +7,6 @@ package org.elasticsearch.xpack.ml.rest.dataframe; import org.elasticsearch.client.node.NodeClient; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.rest.BaseRestHandler; import org.elasticsearch.rest.RestController; import org.elasticsearch.rest.RestRequest; @@ -39,15 +38,11 @@ public class RestStopDataFrameAnalyticsAction extends BaseRestHandler { request = StopDataFrameAnalyticsAction.Request.parseRequest(id, restRequest.contentOrSourceParamParser()); } else { request = new StopDataFrameAnalyticsAction.Request(id); - if (restRequest.hasParam(StopDataFrameAnalyticsAction.Request.TIMEOUT.getPreferredName())) { - TimeValue timeout 
= restRequest.paramAsTime(StopDataFrameAnalyticsAction.Request.TIMEOUT.getPreferredName(), - request.getTimeout()); - request.setTimeout(timeout); - } - if (restRequest.hasParam(StopDataFrameAnalyticsAction.Request.ALLOW_NO_MATCH.getPreferredName())) { - request.setAllowNoMatch(restRequest.paramAsBoolean(StopDataFrameAnalyticsAction.Request.ALLOW_NO_MATCH.getPreferredName(), - request.allowNoMatch())); - } + request.setTimeout(restRequest.paramAsTime(StopDataFrameAnalyticsAction.Request.TIMEOUT.getPreferredName(), + request.getTimeout())); + request.setAllowNoMatch(restRequest.paramAsBoolean(StopDataFrameAnalyticsAction.Request.ALLOW_NO_MATCH.getPreferredName(), + request.allowNoMatch())); + request.setForce(restRequest.paramAsBoolean(StopDataFrameAnalyticsAction.Request.FORCE.getPreferredName(), request.isForce())); } return channel -> client.execute(StopDataFrameAnalyticsAction.INSTANCE, request, new RestToXContentListener<>(channel)); } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportStopDataFrameAnalyticsActionTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportStopDataFrameAnalyticsActionTests.java new file mode 100644 index 00000000000..2f28463c520 --- /dev/null +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportStopDataFrameAnalyticsActionTests.java @@ -0,0 +1,85 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. 
+ */ +package org.elasticsearch.xpack.ml.action; + +import org.elasticsearch.ElasticsearchStatusException; +import org.elasticsearch.Version; +import org.elasticsearch.persistent.PersistentTasksCustomMetaData; +import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xpack.core.ml.MlTasks; +import org.elasticsearch.xpack.core.ml.action.StartDataFrameAnalyticsAction; +import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsState; +import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsTaskState; + +import java.util.Arrays; +import java.util.HashSet; +import java.util.Set; + +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.equalTo; + +public class TransportStopDataFrameAnalyticsActionTests extends ESTestCase { + + public void testFindAnalyticsToStop_GivenOneFailedTaskAndNotForce() { + PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); + addAnalyticsTask(tasksBuilder, "started", "foo-node", DataFrameAnalyticsState.STARTED); + addAnalyticsTask(tasksBuilder, "reindexing", "foo-node", DataFrameAnalyticsState.REINDEXING); + addAnalyticsTask(tasksBuilder, "analyzing", "foo-node", DataFrameAnalyticsState.ANALYZING); + addAnalyticsTask(tasksBuilder, "stopping", "foo-node", DataFrameAnalyticsState.STOPPING); + addAnalyticsTask(tasksBuilder, "stopped", "foo-node", DataFrameAnalyticsState.STOPPED); + addAnalyticsTask(tasksBuilder, "failed", "foo-node", DataFrameAnalyticsState.FAILED); + + Set<String> ids = new HashSet<>(Arrays.asList("started", "reindexing", "analyzing", "stopping", "stopped", "failed")); + + ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, + () -> TransportStopDataFrameAnalyticsAction.findAnalyticsToStop(tasksBuilder.build(), ids, false)); + + assertThat(e.status(), equalTo(RestStatus.CONFLICT)); + assertThat(e.getMessage(), equalTo("cannot close data frame analytics 
[failed] because it failed, use force stop instead")); + } + + public void testFindAnalyticsToStop_GivenTwoFailedTasksAndNotForce() { + PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); + addAnalyticsTask(tasksBuilder, "failed", "foo-node", DataFrameAnalyticsState.FAILED); + addAnalyticsTask(tasksBuilder, "another_failed", "foo-node", DataFrameAnalyticsState.FAILED); + + Set<String> ids = new HashSet<>(Arrays.asList("failed", "another_failed")); + + ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, + () -> TransportStopDataFrameAnalyticsAction.findAnalyticsToStop(tasksBuilder.build(), ids, false)); + + assertThat(e.status(), equalTo(RestStatus.CONFLICT)); + assertThat(e.getMessage(), equalTo("one or more data frame analytics are in failed state, use force stop instead")); + } + + public void testFindAnalyticsToStop_GivenFailedTaskAndForce() { + PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); + addAnalyticsTask(tasksBuilder, "started", "foo-node", DataFrameAnalyticsState.STARTED); + addAnalyticsTask(tasksBuilder, "reindexing", "foo-node", DataFrameAnalyticsState.REINDEXING); + addAnalyticsTask(tasksBuilder, "analyzing", "foo-node", DataFrameAnalyticsState.ANALYZING); + addAnalyticsTask(tasksBuilder, "stopping", "foo-node", DataFrameAnalyticsState.STOPPING); + addAnalyticsTask(tasksBuilder, "stopped", "foo-node", DataFrameAnalyticsState.STOPPED); + addAnalyticsTask(tasksBuilder, "failed", "foo-node", DataFrameAnalyticsState.FAILED); + + Set<String> ids = new HashSet<>(Arrays.asList("started", "reindexing", "analyzing", "stopping", "stopped", "failed")); + + Set<String> analyticsToStop = TransportStopDataFrameAnalyticsAction.findAnalyticsToStop(tasksBuilder.build(), ids, true); + + assertThat(analyticsToStop, containsInAnyOrder("started", "reindexing", "analyzing", "failed")); + } + + private static void addAnalyticsTask(PersistentTasksCustomMetaData.Builder builder, 
String analyticsId, String nodeId, + DataFrameAnalyticsState state) { + builder.addTask(MlTasks.dataFrameAnalyticsTaskId(analyticsId), MlTasks.DATA_FRAME_ANALYTICS_TASK_NAME, + new StartDataFrameAnalyticsAction.TaskParams(analyticsId, Version.CURRENT), + new PersistentTasksCustomMetaData.Assignment(nodeId, "test assignment")); + + builder.updateTaskState(MlTasks.dataFrameAnalyticsTaskId(analyticsId), + new DataFrameAnalyticsTaskState(state, builder.getLastAllocationId(), null)); + + } +} diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobNodeSelectorTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobNodeSelectorTests.java index ad69cbc9729..f0388abd5d5 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobNodeSelectorTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobNodeSelectorTests.java @@ -566,10 +566,11 @@ public class JobNodeSelectorTests extends ESTestCase { static void addDataFrameAnalyticsJobTask(String id, String nodeId, DataFrameAnalyticsState state, PersistentTasksCustomMetaData.Builder builder, boolean isStale) { builder.addTask(MlTasks.dataFrameAnalyticsTaskId(id), MlTasks.DATA_FRAME_ANALYTICS_TASK_NAME, - new StartDataFrameAnalyticsAction.TaskParams(id), new PersistentTasksCustomMetaData.Assignment(nodeId, "test assignment")); + new StartDataFrameAnalyticsAction.TaskParams(id, Version.CURRENT), + new PersistentTasksCustomMetaData.Assignment(nodeId, "test assignment")); if (state != null) { builder.updateTaskState(MlTasks.dataFrameAnalyticsTaskId(id), - new DataFrameAnalyticsTaskState(state, builder.getLastAllocationId() - (isStale ? 1 : 0))); + new DataFrameAnalyticsTaskState(state, builder.getLastAllocationId() - (isStale ? 
1 : 0), null)); } } } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/MlMemoryTrackerTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/MlMemoryTrackerTests.java index 1dea073123a..429575902b0 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/MlMemoryTrackerTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/MlMemoryTrackerTests.java @@ -5,6 +5,7 @@ */ package org.elasticsearch.xpack.ml.process; +import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.ClusterSettings; @@ -199,7 +200,7 @@ public class MlMemoryTrackerTests extends ESTestCase { private PersistentTasksCustomMetaData.PersistentTask makeTestDataFrameAnalyticsTask(String id) { return new PersistentTasksCustomMetaData.PersistentTask<>(MlTasks.dataFrameAnalyticsTaskId(id), - MlTasks.DATA_FRAME_ANALYTICS_TASK_NAME, new StartDataFrameAnalyticsAction.TaskParams(id), 0, + MlTasks.DATA_FRAME_ANALYTICS_TASK_NAME, new StartDataFrameAnalyticsAction.TaskParams(id, Version.CURRENT), 0, PersistentTasksCustomMetaData.INITIAL_ASSIGNMENT); } } diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/api/ml.stop_data_frame_analytics.json b/x-pack/plugin/src/test/resources/rest-api-spec/api/ml.stop_data_frame_analytics.json index 962e4e391a0..ee1a9c85be4 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/api/ml.stop_data_frame_analytics.json +++ b/x-pack/plugin/src/test/resources/rest-api-spec/api/ml.stop_data_frame_analytics.json @@ -18,6 +18,11 @@ "required": false, "description": "Whether to ignore if a wildcard expression matches no data frame analytics. 
(This includes `_all` string or when no data frame analytics have been specified)" }, + "force": { + "type": "boolean", + "required": false, + "description": "True if the data frame analytics should be forcefully stopped" + }, "timeout": { "type": "time", "required": false,