[7.x][ML] Set df-analytics task state to failed when appropriate (#43880) (#43906)

This introduces a `failed` state to which the data frame analytics
persistent task is set when something fails unexpectedly, e.g. the
process crashing or the results processor hitting an error. The
failure message is captured and set on the task state. From there,
it becomes available via the `_stats` API as `failure_reason`.
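
For illustration, a minimal sketch of reading that field through the high-level REST client; the `client` instance and the `my-analytics-config` id are assumptions, and the import locations follow the 7.x client packages:

    import java.io.IOException;

    import org.elasticsearch.client.RequestOptions;
    import org.elasticsearch.client.RestHighLevelClient;
    import org.elasticsearch.client.ml.GetDataFrameAnalyticsStatsRequest;
    import org.elasticsearch.client.ml.GetDataFrameAnalyticsStatsResponse;
    import org.elasticsearch.client.ml.dataframe.DataFrameAnalyticsState;
    import org.elasticsearch.client.ml.dataframe.DataFrameAnalyticsStats;

    class FailureReasonSketch {

        /** Returns the failure reason, or null if the task has not failed. */
        static String fetchFailureReason(RestHighLevelClient client, String configId) throws IOException {
            GetDataFrameAnalyticsStatsResponse response = client.machineLearning()
                .getDataFrameAnalyticsStats(new GetDataFrameAnalyticsStatsRequest(configId), RequestOptions.DEFAULT);
            DataFrameAnalyticsStats stats = response.getAnalyticsStats().get(0);
            // failure_reason is only populated while the task is in the failed state
            return stats.getState() == DataFrameAnalyticsState.FAILED ? stats.getFailureReason() : null;
        }
    }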

The df-analytics stop API now has a boolean `force` parameter. This allows
the user to call it on a failed task in order to reset it to `stopped`, once
we have ensured the failure has been communicated to the user.
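
As a sketch, force-stopping through the high-level REST client could look as follows, again with an assumed `client` and config id; the REST equivalent is POST _ml/data_frame/analytics/my-analytics-config/_stop?force=true:

    import java.io.IOException;

    import org.elasticsearch.client.RequestOptions;
    import org.elasticsearch.client.RestHighLevelClient;
    import org.elasticsearch.client.ml.StopDataFrameAnalyticsRequest;
    import org.elasticsearch.client.ml.StopDataFrameAnalyticsResponse;

    class ForceStopSketch {

        static boolean forceStop(RestHighLevelClient client, String configId) throws IOException {
            // force=true allows a task in the failed state to transition back to stopped
            StopDataFrameAnalyticsRequest request = new StopDataFrameAnalyticsRequest(configId).setForce(true);
            StopDataFrameAnalyticsResponse response =
                client.machineLearning().stopDataFrameAnalytics(request, RequestOptions.DEFAULT);
            return response.isStopped();
        }
    }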

This commit also adds the analytics version to the persistent task
params, as this allows us to prevent tasks from running on unsuitable
nodes in the future.
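
As an illustration of that direction, the version recorded in the params could feed a node-assignment check. This is only a sketch: the `TaskParams` constructor matches the diff below, but the `canRunOn` helper is hypothetical and not part of this commit.

    import org.elasticsearch.Version;
    import org.elasticsearch.cluster.node.DiscoveryNode;
    import org.elasticsearch.xpack.core.ml.action.StartDataFrameAnalyticsAction;

    class NodeSuitabilitySketch {

        // The analytics version is captured when the task params are created.
        static final StartDataFrameAnalyticsAction.TaskParams PARAMS =
            new StartDataFrameAnalyticsAction.TaskParams("my-analytics-config", Version.CURRENT);

        // Hypothetical future check: only assign the task to nodes at least as new
        // as the version stored in its params.
        static boolean canRunOn(DiscoveryNode node, Version analyticsVersion) {
            return node.getVersion().onOrAfter(analyticsVersion);
        }
    }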
Dimitris Athanasiou committed on 2019-07-03 12:41:56 +03:00 (via GitHub)
parent 1e0f67fb38
commit 96b0b27f18
34 changed files with 566 additions and 118 deletions

View File

@@ -676,6 +676,9 @@ final class MLRequestConverters {
             params.putParam(
                 StopDataFrameAnalyticsRequest.ALLOW_NO_MATCH.getPreferredName(), Boolean.toString(stopRequest.getAllowNoMatch()));
         }
+        if (stopRequest.getForce() != null) {
+            params.putParam(StopDataFrameAnalyticsRequest.FORCE.getPreferredName(), Boolean.toString(stopRequest.getForce()));
+        }
         request.addParameters(params.asMap());
         return request;
     }

View File

@@ -31,10 +31,12 @@ import java.util.Optional;
 public class StopDataFrameAnalyticsRequest implements Validatable {
 
     public static final ParseField ALLOW_NO_MATCH = new ParseField("allow_no_match");
+    public static final ParseField FORCE = new ParseField("force");
 
     private final String id;
-    private TimeValue timeout;
     private Boolean allowNoMatch;
+    private Boolean force;
+    private TimeValue timeout;
 
     public StopDataFrameAnalyticsRequest(String id) {
         this.id = id;
@@ -62,6 +64,15 @@ public class StopDataFrameAnalyticsRequest implements Validatable {
         return this;
     }
 
+    public Boolean getForce() {
+        return force;
+    }
+
+    public StopDataFrameAnalyticsRequest setForce(boolean force) {
+        this.force = force;
+        return this;
+    }
+
     @Override
     public Optional<ValidationException> validate() {
         if (id == null) {
@@ -78,11 +89,12 @@ public class StopDataFrameAnalyticsRequest implements Validatable {
         StopDataFrameAnalyticsRequest other = (StopDataFrameAnalyticsRequest) o;
         return Objects.equals(id, other.id)
             && Objects.equals(timeout, other.timeout)
-            && Objects.equals(allowNoMatch, other.allowNoMatch);
+            && Objects.equals(allowNoMatch, other.allowNoMatch)
+            && Objects.equals(force, other.force);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(id, timeout, allowNoMatch);
+        return Objects.hash(id, timeout, allowNoMatch, force);
     }
 }

View File

@@ -41,6 +41,7 @@ public class DataFrameAnalyticsStats {
     static final ParseField ID = new ParseField("id");
     static final ParseField STATE = new ParseField("state");
+    static final ParseField FAILURE_REASON = new ParseField("failure_reason");
     static final ParseField PROGRESS_PERCENT = new ParseField("progress_percent");
     static final ParseField NODE = new ParseField("node");
     static final ParseField ASSIGNMENT_EXPLANATION = new ParseField("assignment_explanation");
@@ -50,9 +51,10 @@ public class DataFrameAnalyticsStats {
         args -> new DataFrameAnalyticsStats(
             (String) args[0],
             (DataFrameAnalyticsState) args[1],
-            (Integer) args[2],
-            (NodeAttributes) args[3],
-            (String) args[4]));
+            (String) args[2],
+            (Integer) args[3],
+            (NodeAttributes) args[4],
+            (String) args[5]));
 
     static {
         PARSER.declareString(constructorArg(), ID);
@@ -62,6 +64,7 @@ public class DataFrameAnalyticsStats {
             }
             throw new IllegalArgumentException("Unsupported token [" + p.currentToken() + "]");
         }, STATE, ObjectParser.ValueType.STRING);
+        PARSER.declareString(optionalConstructorArg(), FAILURE_REASON);
         PARSER.declareInt(optionalConstructorArg(), PROGRESS_PERCENT);
         PARSER.declareObject(optionalConstructorArg(), NodeAttributes.PARSER, NODE);
         PARSER.declareString(optionalConstructorArg(), ASSIGNMENT_EXPLANATION);
@@ -69,14 +72,17 @@ public class DataFrameAnalyticsStats {
 
     private final String id;
     private final DataFrameAnalyticsState state;
+    private final String failureReason;
     private final Integer progressPercent;
     private final NodeAttributes node;
     private final String assignmentExplanation;
 
-    public DataFrameAnalyticsStats(String id, DataFrameAnalyticsState state, @Nullable Integer progressPercent,
-                                   @Nullable NodeAttributes node, @Nullable String assignmentExplanation) {
+    public DataFrameAnalyticsStats(String id, DataFrameAnalyticsState state, @Nullable String failureReason,
+                                   @Nullable Integer progressPercent, @Nullable NodeAttributes node,
+                                   @Nullable String assignmentExplanation) {
         this.id = id;
         this.state = state;
+        this.failureReason = failureReason;
         this.progressPercent = progressPercent;
         this.node = node;
         this.assignmentExplanation = assignmentExplanation;
@@ -90,6 +96,10 @@ public class DataFrameAnalyticsStats {
         return state;
     }
 
+    public String getFailureReason() {
+        return failureReason;
+    }
+
     public Integer getProgressPercent() {
         return progressPercent;
     }
@@ -110,6 +120,7 @@ public class DataFrameAnalyticsStats {
         DataFrameAnalyticsStats other = (DataFrameAnalyticsStats) o;
         return Objects.equals(id, other.id)
             && Objects.equals(state, other.state)
+            && Objects.equals(failureReason, other.failureReason)
             && Objects.equals(progressPercent, other.progressPercent)
             && Objects.equals(node, other.node)
             && Objects.equals(assignmentExplanation, other.assignmentExplanation);
@@ -117,7 +128,7 @@ public class DataFrameAnalyticsStats {
 
     @Override
     public int hashCode() {
-        return Objects.hash(id, state, progressPercent, node, assignmentExplanation);
+        return Objects.hash(id, state, failureReason, progressPercent, node, assignmentExplanation);
     }
 
     @Override
@@ -125,6 +136,7 @@ public class DataFrameAnalyticsStats {
         return new ToStringBuilder(getClass())
             .add("id", id)
             .add("state", state)
+            .add("failureReason", failureReason)
            .add("progressPercent", progressPercent)
             .add("node", node)
             .add("assignmentExplanation", assignmentExplanation)

View File

@@ -758,11 +758,15 @@ public class MLRequestConvertersTests extends ESTestCase {
     public void testStopDataFrameAnalytics_WithParams() {
         StopDataFrameAnalyticsRequest stopRequest = new StopDataFrameAnalyticsRequest(randomAlphaOfLength(10))
             .setTimeout(TimeValue.timeValueMinutes(1))
-            .setAllowNoMatch(false);
+            .setAllowNoMatch(false)
+            .setForce(true);
 
         Request request = MLRequestConverters.stopDataFrameAnalytics(stopRequest);
         assertEquals(HttpPost.METHOD_NAME, request.getMethod());
         assertEquals("/_ml/data_frame/analytics/" + stopRequest.getId() + "/_stop", request.getEndpoint());
-        assertThat(request.getParameters(), allOf(hasEntry("timeout", "1m"), hasEntry("allow_no_match", "false")));
+        assertThat(request.getParameters(), allOf(
+            hasEntry("timeout", "1m"),
+            hasEntry("allow_no_match", "false"),
+            hasEntry("force", "true")));
         assertNull(request.getEntity());
     }

View File

@@ -1359,6 +1359,7 @@ public class MachineLearningIT extends ESRestHighLevelClientTestCase {
         DataFrameAnalyticsStats stats = statsResponse.getAnalyticsStats().get(0);
         assertThat(stats.getId(), equalTo(configId));
         assertThat(stats.getState(), equalTo(DataFrameAnalyticsState.STOPPED));
+        assertNull(stats.getFailureReason());
         assertNull(stats.getProgressPercent());
         assertNull(stats.getNode());
         assertNull(stats.getAssignmentExplanation());

View File

@@ -3110,6 +3110,7 @@ public class MlClientDocumentationIT extends ESRestHighLevelClientTestCase {
         {
             // tag::stop-data-frame-analytics-request
             StopDataFrameAnalyticsRequest request = new StopDataFrameAnalyticsRequest("my-analytics-config"); // <1>
+            request.setForce(false); // <2>
             // end::stop-data-frame-analytics-request
 
             // tag::stop-data-frame-analytics-execute

View File

@@ -43,6 +43,7 @@ public class DataFrameAnalyticsStatsTests extends ESTestCase {
         return new DataFrameAnalyticsStats(
             randomAlphaOfLengthBetween(1, 10),
             randomFrom(DataFrameAnalyticsState.values()),
+            randomBoolean() ? null : randomAlphaOfLength(10),
             randomBoolean() ? null : randomIntBetween(0, 100),
             randomBoolean() ? null : NodeAttributesTests.createRandom(),
             randomBoolean() ? null : randomAlphaOfLengthBetween(1, 20));
@@ -52,6 +53,9 @@ public class DataFrameAnalyticsStatsTests extends ESTestCase {
         builder.startObject();
         builder.field(DataFrameAnalyticsStats.ID.getPreferredName(), stats.getId());
         builder.field(DataFrameAnalyticsStats.STATE.getPreferredName(), stats.getState().value());
+        if (stats.getFailureReason() != null) {
+            builder.field(DataFrameAnalyticsStats.FAILURE_REASON.getPreferredName(), stats.getFailureReason());
+        }
         if (stats.getProgressPercent() != null) {
             builder.field(DataFrameAnalyticsStats.PROGRESS_PERCENT.getPreferredName(), stats.getProgressPercent());
         }

View File

@@ -19,6 +19,7 @@ A +{request}+ object requires a {dataframe-analytics-config} id.
 include-tagged::{doc-tests-file}[{api}-request]
 ---------------------------------------------------
 <1> Constructing a new stop request referencing an existing {dataframe-analytics-config}
+<2> Optionally used to stop a failed task
 
 include::../execution.asciidoc[]

View File

@@ -6,9 +6,9 @@
 package org.elasticsearch.xpack.core.ml.action;
 
 import org.elasticsearch.ElasticsearchException;
-import org.elasticsearch.action.ActionType;
 import org.elasticsearch.action.ActionRequestBuilder;
 import org.elasticsearch.action.ActionRequestValidationException;
+import org.elasticsearch.action.ActionType;
 import org.elasticsearch.action.TaskOperationFailure;
 import org.elasticsearch.action.support.tasks.BaseTasksRequest;
 import org.elasticsearch.action.support.tasks.BaseTasksResponse;
@@ -158,16 +158,19 @@ public class GetDataFrameAnalyticsStatsAction extends ActionType<GetDataFrameAna
 
         private final String id;
         private final DataFrameAnalyticsState state;
         @Nullable
+        private final String failureReason;
+        @Nullable
         private final Integer progressPercentage;
         @Nullable
         private final DiscoveryNode node;
         @Nullable
         private final String assignmentExplanation;
 
-        public Stats(String id, DataFrameAnalyticsState state, @Nullable Integer progressPercentage,
+        public Stats(String id, DataFrameAnalyticsState state, @Nullable String failureReason, @Nullable Integer progressPercentage,
                      @Nullable DiscoveryNode node, @Nullable String assignmentExplanation) {
             this.id = Objects.requireNonNull(id);
             this.state = Objects.requireNonNull(state);
+            this.failureReason = failureReason;
             this.progressPercentage = progressPercentage;
             this.node = node;
             this.assignmentExplanation = assignmentExplanation;
@@ -176,6 +179,7 @@ public class GetDataFrameAnalyticsStatsAction extends ActionType<GetDataFrameAna
         public Stats(StreamInput in) throws IOException {
             id = in.readString();
             state = DataFrameAnalyticsState.fromStream(in);
+            failureReason = in.readOptionalString();
             progressPercentage = in.readOptionalInt();
             node = in.readOptionalWriteable(DiscoveryNode::new);
             assignmentExplanation = in.readOptionalString();
@@ -202,6 +206,9 @@ public class GetDataFrameAnalyticsStatsAction extends ActionType<GetDataFrameAna
         public XContentBuilder toUnwrappedXContent(XContentBuilder builder) throws IOException {
             builder.field(DataFrameAnalyticsConfig.ID.getPreferredName(), id);
             builder.field("state", state.toString());
+            if (failureReason != null) {
+                builder.field("failure_reason", failureReason);
+            }
             if (progressPercentage != null) {
                 builder.field("progress_percent", progressPercentage);
             }
@@ -229,6 +236,7 @@ public class GetDataFrameAnalyticsStatsAction extends ActionType<GetDataFrameAna
         public void writeTo(StreamOutput out) throws IOException {
             out.writeString(id);
             state.writeTo(out);
+            out.writeOptionalString(failureReason);
             out.writeOptionalInt(progressPercentage);
             out.writeOptionalWriteable(node);
             out.writeOptionalString(assignmentExplanation);
@@ -236,7 +244,7 @@ public class GetDataFrameAnalyticsStatsAction extends ActionType<GetDataFrameAna
 
         @Override
         public int hashCode() {
-            return Objects.hash(id, state, progressPercentage, node, assignmentExplanation);
+            return Objects.hash(id, state, failureReason, progressPercentage, node, assignmentExplanation);
         }
 
         @Override
@@ -250,6 +258,7 @@ public class GetDataFrameAnalyticsStatsAction extends ActionType<GetDataFrameAna
             Stats other = (Stats) obj;
             return Objects.equals(id, other.id)
                 && Objects.equals(this.state, other.state)
+                && Objects.equals(this.failureReason, other.failureReason)
                 && Objects.equals(this.node, other.node)
                 && Objects.equals(this.assignmentExplanation, other.assignmentExplanation);
         }

View File

@@ -6,9 +6,9 @@
 package org.elasticsearch.xpack.core.ml.action;
 
 import org.elasticsearch.Version;
-import org.elasticsearch.action.ActionType;
 import org.elasticsearch.action.ActionRequestBuilder;
 import org.elasticsearch.action.ActionRequestValidationException;
+import org.elasticsearch.action.ActionType;
 import org.elasticsearch.action.support.master.AcknowledgedResponse;
 import org.elasticsearch.action.support.master.MasterNodeRequest;
 import org.elasticsearch.client.ElasticsearchClient;
@@ -157,20 +157,32 @@ public class StartDataFrameAnalyticsAction extends ActionType<AcknowledgedRespon
 
         public static final Version VERSION_INTRODUCED = Version.V_7_3_0;
 
         public static ConstructingObjectParser<TaskParams, Void> PARSER = new ConstructingObjectParser<>(
-            MlTasks.DATA_FRAME_ANALYTICS_TASK_NAME, true, a -> new TaskParams((String) a[0]));
+            MlTasks.DATA_FRAME_ANALYTICS_TASK_NAME, true, a -> new TaskParams((String) a[0], (String) a[1]));
+
+        static {
+            PARSER.declareString(ConstructingObjectParser.constructorArg(), DataFrameAnalyticsConfig.ID);
+            PARSER.declareString(ConstructingObjectParser.constructorArg(), DataFrameAnalyticsConfig.VERSION);
+        }
 
         public static TaskParams fromXContent(XContentParser parser) {
             return PARSER.apply(parser, null);
         }
 
-        private String id;
+        private final String id;
+        private final Version version;
 
-        public TaskParams(String id) {
+        public TaskParams(String id, Version version) {
             this.id = Objects.requireNonNull(id);
+            this.version = Objects.requireNonNull(version);
+        }
+
+        private TaskParams(String id, String version) {
+            this(id, Version.fromString(version));
         }
 
         public TaskParams(StreamInput in) throws IOException {
             this.id = in.readString();
+            this.version = Version.readVersion(in);
         }
 
         public String getId() {
@@ -190,15 +202,31 @@ public class StartDataFrameAnalyticsAction extends ActionType<AcknowledgedRespon
         @Override
         public void writeTo(StreamOutput out) throws IOException {
             out.writeString(id);
+            Version.writeVersion(version, out);
         }
 
         @Override
         public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
             builder.startObject();
             builder.field(DataFrameAnalyticsConfig.ID.getPreferredName(), id);
+            builder.field(DataFrameAnalyticsConfig.VERSION.getPreferredName(), version);
             builder.endObject();
             return builder;
         }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(id, version);
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (o == this) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+
+            TaskParams other = (TaskParams) o;
+            return Objects.equals(id, other.id) && Objects.equals(version, other.version);
+        }
     }
 
     public interface TaskMatcher {
public interface TaskMatcher { public interface TaskMatcher {

View File

@@ -5,9 +5,9 @@
  */
 package org.elasticsearch.xpack.core.ml.action;
 
-import org.elasticsearch.action.ActionType;
 import org.elasticsearch.action.ActionRequestBuilder;
 import org.elasticsearch.action.ActionRequestValidationException;
+import org.elasticsearch.action.ActionType;
 import org.elasticsearch.action.support.tasks.BaseTasksRequest;
 import org.elasticsearch.action.support.tasks.BaseTasksResponse;
 import org.elasticsearch.client.ElasticsearchClient;
@@ -49,14 +49,17 @@ public class StopDataFrameAnalyticsAction extends ActionType<StopDataFrameAnalyt
 
     public static class Request extends BaseTasksRequest<Request> implements ToXContentObject {
 
-        public static final ParseField TIMEOUT = new ParseField("timeout");
         public static final ParseField ALLOW_NO_MATCH = new ParseField("allow_no_match");
+        public static final ParseField FORCE = new ParseField("force");
+        public static final ParseField TIMEOUT = new ParseField("timeout");
 
         private static final ObjectParser<Request, Void> PARSER = new ObjectParser<>(NAME, Request::new);
 
         static {
             PARSER.declareString((request, id) -> request.id = id, DataFrameAnalyticsConfig.ID);
             PARSER.declareString((request, val) -> request.setTimeout(TimeValue.parseTimeValue(val, TIMEOUT.getPreferredName())), TIMEOUT);
+            PARSER.declareBoolean(Request::setAllowNoMatch, ALLOW_NO_MATCH);
+            PARSER.declareBoolean(Request::setForce, FORCE);
         }
 
         public static Request parseRequest(String id, XContentParser parser) {
@@ -71,8 +74,9 @@ public class StopDataFrameAnalyticsAction extends ActionType<StopDataFrameAnalyt
         }
 
         private String id;
-        private Set<String> expandedIds = Collections.emptySet();
         private boolean allowNoMatch = true;
+        private boolean force;
+        private Set<String> expandedIds = Collections.emptySet();
 
         public Request(String id) {
             setId(id);
@@ -81,8 +85,9 @@ public class StopDataFrameAnalyticsAction extends ActionType<StopDataFrameAnalyt
         public Request(StreamInput in) throws IOException {
             super(in);
             id = in.readString();
-            expandedIds = new HashSet<>(Arrays.asList(in.readStringArray()));
             allowNoMatch = in.readBoolean();
+            force = in.readBoolean();
+            expandedIds = new HashSet<>(Arrays.asList(in.readStringArray()));
         }
 
         public Request() {}
@@ -95,6 +100,22 @@ public class StopDataFrameAnalyticsAction extends ActionType<StopDataFrameAnalyt
             return id;
         }
 
+        public boolean allowNoMatch() {
+            return allowNoMatch;
+        }
+
+        public void setAllowNoMatch(boolean allowNoMatch) {
+            this.allowNoMatch = allowNoMatch;
+        }
+
+        public boolean isForce() {
+            return force;
+        }
+
+        public void setForce(boolean force) {
+            this.force = force;
+        }
+
         @Nullable
         public Set<String> getExpandedIds() {
             return expandedIds;
@@ -104,14 +125,6 @@ public class StopDataFrameAnalyticsAction extends ActionType<StopDataFrameAnalyt
             this.expandedIds = Objects.requireNonNull(expandedIds);
         }
 
-        public boolean allowNoMatch() {
-            return allowNoMatch;
-        }
-
-        public void setAllowNoMatch(boolean allowNoMatch) {
-            this.allowNoMatch = allowNoMatch;
-        }
-
         @Override
         public ActionRequestValidationException validate() {
             return null;
@@ -121,8 +134,9 @@ public class StopDataFrameAnalyticsAction extends ActionType<StopDataFrameAnalyt
         public void writeTo(StreamOutput out) throws IOException {
             super.writeTo(out);
             out.writeString(id);
-            out.writeStringArray(expandedIds.toArray(new String[0]));
             out.writeBoolean(allowNoMatch);
+            out.writeBoolean(force);
+            out.writeStringArray(expandedIds.toArray(new String[0]));
         }
 
         @Override
@@ -131,12 +145,13 @@ public class StopDataFrameAnalyticsAction extends ActionType<StopDataFrameAnalyt
                 .startObject()
                 .field(DataFrameAnalyticsConfig.ID.getPreferredName(), id)
                 .field(ALLOW_NO_MATCH.getPreferredName(), allowNoMatch)
+                .field(FORCE.getPreferredName(), force)
                 .endObject();
         }
 
         @Override
         public int hashCode() {
-            return Objects.hash(id, getTimeout(), expandedIds, allowNoMatch);
+            return Objects.hash(id, getTimeout(), allowNoMatch, force, expandedIds);
         }
 
         @Override
@@ -150,8 +165,9 @@ public class StopDataFrameAnalyticsAction extends ActionType<StopDataFrameAnalyt
             StopDataFrameAnalyticsAction.Request other = (StopDataFrameAnalyticsAction.Request) obj;
             return Objects.equals(id, other.id)
                 && Objects.equals(getTimeout(), other.getTimeout())
-                && Objects.equals(expandedIds, other.expandedIds)
-                && allowNoMatch == other.allowNoMatch;
+                && allowNoMatch == other.allowNoMatch
+                && force == other.force
+                && Objects.equals(expandedIds, other.expandedIds);
         }
 
         @Override
@Override @Override

View File

@@ -10,11 +10,12 @@ import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.io.stream.Writeable;
 
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.Locale;
 
 public enum DataFrameAnalyticsState implements Writeable {
 
-    STARTED, REINDEXING, ANALYZING, STOPPING, STOPPED;
+    STARTED, REINDEXING, ANALYZING, STOPPING, STOPPED, FAILED;
 
     public static DataFrameAnalyticsState fromString(String name) {
         return valueOf(name.trim().toUpperCase(Locale.ROOT));
@@ -33,4 +34,11 @@ public enum DataFrameAnalyticsState implements Writeable {
     public String toString() {
         return name().toLowerCase(Locale.ROOT);
     }
+
+    /**
+     * @return {@code true} if state matches any of the given {@code candidates}
+     */
+    public boolean isAnyOf(DataFrameAnalyticsState... candidates) {
+        return Arrays.stream(candidates).anyMatch(candidate -> this == candidate);
+    }
 }

View File

@@ -5,6 +5,7 @@
  */
 package org.elasticsearch.xpack.core.ml.dataframe;
 
+import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.ParseField;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
@@ -25,13 +26,15 @@ public class DataFrameAnalyticsTaskState implements PersistentTaskState {
 
     private static ParseField STATE = new ParseField("state");
     private static ParseField ALLOCATION_ID = new ParseField("allocation_id");
+    private static ParseField REASON = new ParseField("reason");
 
     private final DataFrameAnalyticsState state;
     private final long allocationId;
+    private final String reason;
 
     private static final ConstructingObjectParser<DataFrameAnalyticsTaskState, Void> PARSER =
         new ConstructingObjectParser<>(NAME, true,
-            a -> new DataFrameAnalyticsTaskState((DataFrameAnalyticsState) a[0], (long) a[1]));
+            a -> new DataFrameAnalyticsTaskState((DataFrameAnalyticsState) a[0], (long) a[1], (String) a[2]));
 
     static {
         PARSER.declareField(ConstructingObjectParser.constructorArg(), p -> {
@@ -41,6 +44,7 @@ public class DataFrameAnalyticsTaskState implements PersistentTaskState {
             throw new IllegalArgumentException("Unsupported token [" + p.currentToken() + "]");
         }, STATE, ObjectParser.ValueType.STRING);
         PARSER.declareLong(ConstructingObjectParser.constructorArg(), ALLOCATION_ID);
+        PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), REASON);
     }
 
     public static DataFrameAnalyticsTaskState fromXContent(XContentParser parser) {
@@ -51,20 +55,27 @@ public class DataFrameAnalyticsTaskState implements PersistentTaskState {
         }
     }
 
-    public DataFrameAnalyticsTaskState(DataFrameAnalyticsState state, long allocationId) {
+    public DataFrameAnalyticsTaskState(DataFrameAnalyticsState state, long allocationId, @Nullable String reason) {
         this.state = Objects.requireNonNull(state);
         this.allocationId = allocationId;
+        this.reason = reason;
     }
 
     public DataFrameAnalyticsTaskState(StreamInput in) throws IOException {
         this.state = DataFrameAnalyticsState.fromStream(in);
         this.allocationId = in.readLong();
+        this.reason = in.readOptionalString();
    }
 
     public DataFrameAnalyticsState getState() {
         return state;
     }
 
+    @Nullable
+    public String getReason() {
+        return reason;
+    }
+
     public boolean isStatusStale(PersistentTasksCustomMetaData.PersistentTask<?> task) {
         return allocationId != task.getAllocationId();
     }
@@ -78,6 +89,7 @@ public class DataFrameAnalyticsTaskState implements PersistentTaskState {
     public void writeTo(StreamOutput out) throws IOException {
         state.writeTo(out);
         out.writeLong(allocationId);
+        out.writeOptionalString(reason);
     }
 
     @Override
@@ -85,6 +97,9 @@ public class DataFrameAnalyticsTaskState implements PersistentTaskState {
         builder.startObject();
         builder.field(STATE.getPreferredName(), state.toString());
         builder.field(ALLOCATION_ID.getPreferredName(), allocationId);
+        if (reason != null) {
+            builder.field(REASON.getPreferredName(), reason);
+        }
         builder.endObject();
         return builder;
     }
@@ -95,11 +110,12 @@ public class DataFrameAnalyticsTaskState implements PersistentTaskState {
         if (o == null || getClass() != o.getClass()) return false;
         DataFrameAnalyticsTaskState that = (DataFrameAnalyticsTaskState) o;
         return allocationId == that.allocationId &&
-            state == that.state;
+            state == that.state &&
+            Objects.equals(reason, that.reason);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(state, allocationId);
+        return Objects.hash(state, allocationId, reason);
     }
 }

View File

@@ -23,8 +23,9 @@ public class GetDataFrameAnalyticsStatsActionResponseTests extends AbstractWireS
         List<Response.Stats> analytics = new ArrayList<>(listSize);
         for (int j = 0; j < listSize; j++) {
             Integer progressPercentage = randomBoolean() ? null : randomIntBetween(0, 100);
+            String failureReason = randomBoolean() ? null : randomAlphaOfLength(10);
             Response.Stats stats = new Response.Stats(DataFrameAnalyticsConfigTests.randomValidId(),
-                randomFrom(DataFrameAnalyticsState.values()), progressPercentage, null, randomAlphaOfLength(20));
+                randomFrom(DataFrameAnalyticsState.values()), failureReason, progressPercentage, null, randomAlphaOfLength(20));
             analytics.add(stats);
         }
         return new Response(new QueryPage<>(analytics, analytics.size(), GetDataFrameAnalyticsAction.Response.RESULTS_FIELD));

View File

@@ -0,0 +1,37 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.core.ml.action;
+
+import org.elasticsearch.Version;
+import org.elasticsearch.common.io.stream.Writeable;
+import org.elasticsearch.common.xcontent.XContentParser;
+import org.elasticsearch.test.AbstractSerializingTestCase;
+
+import java.io.IOException;
+
+public class StartDataFrameAnalyticsActionTaskParamsTests extends AbstractSerializingTestCase<StartDataFrameAnalyticsAction.TaskParams> {
+
+    @Override
+    protected StartDataFrameAnalyticsAction.TaskParams doParseInstance(XContentParser parser) throws IOException {
+        return StartDataFrameAnalyticsAction.TaskParams.fromXContent(parser);
+    }
+
+    @Override
+    protected StartDataFrameAnalyticsAction.TaskParams createTestInstance() {
+        return new StartDataFrameAnalyticsAction.TaskParams(randomAlphaOfLength(10), Version.CURRENT);
+    }
+
+    @Override
+    protected Writeable.Reader<StartDataFrameAnalyticsAction.TaskParams> instanceReader() {
+        return StartDataFrameAnalyticsAction.TaskParams::new;
+    }
+
+    @Override
+    protected boolean supportsUnknownFields() {
+        return true;
+    }
+}

View File

@@ -24,6 +24,9 @@ public class StopDataFrameAnalyticsRequestTests extends AbstractWireSerializingT
         if (randomBoolean()) {
             request.setAllowNoMatch(randomBoolean());
         }
+        if (randomBoolean()) {
+            request.setForce(randomBoolean());
+        }
         int expandedIdsCount = randomIntBetween(0, 10);
         Set<String> expandedIds = new HashSet<>();
         for (int i = 0; i < expandedIdsCount; i++) {

View File

@@ -0,0 +1,41 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.core.ml.dataframe;
+
+import org.elasticsearch.test.ESTestCase;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+
+public class DataFrameAnalyticsStateTests extends ESTestCase {
+
+    public void testFromString() {
+        assertThat(DataFrameAnalyticsState.fromString("started"), equalTo(DataFrameAnalyticsState.STARTED));
+        assertThat(DataFrameAnalyticsState.fromString("reindexing"), equalTo(DataFrameAnalyticsState.REINDEXING));
+        assertThat(DataFrameAnalyticsState.fromString("analyzing"), equalTo(DataFrameAnalyticsState.ANALYZING));
+        assertThat(DataFrameAnalyticsState.fromString("stopping"), equalTo(DataFrameAnalyticsState.STOPPING));
+        assertThat(DataFrameAnalyticsState.fromString("stopped"), equalTo(DataFrameAnalyticsState.STOPPED));
+        assertThat(DataFrameAnalyticsState.fromString("failed"), equalTo(DataFrameAnalyticsState.FAILED));
+    }
+
+    public void testToString() {
+        assertThat(DataFrameAnalyticsState.STARTED.toString(), equalTo("started"));
+        assertThat(DataFrameAnalyticsState.REINDEXING.toString(), equalTo("reindexing"));
+        assertThat(DataFrameAnalyticsState.ANALYZING.toString(), equalTo("analyzing"));
+        assertThat(DataFrameAnalyticsState.STOPPING.toString(), equalTo("stopping"));
+        assertThat(DataFrameAnalyticsState.STOPPED.toString(), equalTo("stopped"));
+        assertThat(DataFrameAnalyticsState.FAILED.toString(), equalTo("failed"));
+    }
+
+    public void testIsAnyOf() {
+        assertThat(DataFrameAnalyticsState.STARTED.isAnyOf(), is(false));
+        assertThat(DataFrameAnalyticsState.STARTED.isAnyOf(DataFrameAnalyticsState.STARTED), is(true));
+        assertThat(DataFrameAnalyticsState.STARTED.isAnyOf(DataFrameAnalyticsState.ANALYZING, DataFrameAnalyticsState.STOPPED), is(false));
+        assertThat(DataFrameAnalyticsState.STARTED.isAnyOf(DataFrameAnalyticsState.STARTED, DataFrameAnalyticsState.STOPPED), is(true));
+        assertThat(DataFrameAnalyticsState.ANALYZING.isAnyOf(DataFrameAnalyticsState.STARTED, DataFrameAnalyticsState.STOPPED), is(false));
+        assertThat(DataFrameAnalyticsState.ANALYZING.isAnyOf(DataFrameAnalyticsState.ANALYZING, DataFrameAnalyticsState.FAILED), is(true));
+    }
+}

View File

@@ -0,0 +1,35 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.core.ml.dataframe;
+
+import org.elasticsearch.common.io.stream.Writeable;
+import org.elasticsearch.common.xcontent.XContentParser;
+import org.elasticsearch.test.AbstractSerializingTestCase;
+
+import java.io.IOException;
+
+public class DataFrameAnalyticsTaskStateTests extends AbstractSerializingTestCase<DataFrameAnalyticsTaskState> {
+
+    @Override
+    protected DataFrameAnalyticsTaskState createTestInstance() {
+        return new DataFrameAnalyticsTaskState(randomFrom(DataFrameAnalyticsState.values()), randomLong(), randomAlphaOfLength(10));
+    }
+
+    @Override
+    protected Writeable.Reader<DataFrameAnalyticsTaskState> instanceReader() {
+        return DataFrameAnalyticsTaskState::new;
+    }
+
+    @Override
+    protected DataFrameAnalyticsTaskState doParseInstance(XContentParser parser) throws IOException {
+        return DataFrameAnalyticsTaskState.fromXContent(parser);
+    }
+
+    @Override
+    protected boolean supportsUnknownFields() {
+        return true;
+    }
+}

View File

@@ -501,7 +501,7 @@ public class MachineLearning extends Plugin implements ActionPlugin, AnalysisPlu
                 new BlackHoleAutodetectProcess(job.getId());
             // factor of 1.0 makes renormalization a no-op
             normalizerProcessFactory = (jobId, quantilesState, bucketSpan, executorService) -> new MultiplyingNormalizerProcess(1.0);
-            analyticsProcessFactory = (jobId, analyticsProcessConfig, executorService) -> null;
+            analyticsProcessFactory = (jobId, analyticsProcessConfig, executorService, onProcessCrash) -> null;
         }
         NormalizerFactory normalizerFactory = new NormalizerFactory(normalizerProcessFactory,
             threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME));

View File

@@ -34,6 +34,7 @@ import org.elasticsearch.xpack.core.ml.action.GetDataFrameAnalyticsStatsAction;
 import org.elasticsearch.xpack.core.ml.action.GetDataFrameAnalyticsStatsAction.Response.Stats;
 import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig;
 import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsState;
+import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsTaskState;
 import org.elasticsearch.xpack.ml.action.TransportStartDataFrameAnalyticsAction.DataFrameAnalyticsTask;
 import org.elasticsearch.xpack.ml.dataframe.process.AnalyticsProcessManager;
@@ -178,6 +179,11 @@ public class TransportGetDataFrameAnalyticsStatsAction
         PersistentTasksCustomMetaData tasks = clusterState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
         PersistentTasksCustomMetaData.PersistentTask<?> analyticsTask = MlTasks.getDataFrameAnalyticsTask(concreteAnalyticsId, tasks);
         DataFrameAnalyticsState analyticsState = MlTasks.getDataFrameAnalyticsState(concreteAnalyticsId, tasks);
+        String failureReason = null;
+        if (analyticsState == DataFrameAnalyticsState.FAILED) {
+            DataFrameAnalyticsTaskState taskState = (DataFrameAnalyticsTaskState) analyticsTask.getState();
+            failureReason = taskState.getReason();
+        }
         DiscoveryNode node = null;
         String assignmentExplanation = null;
         if (analyticsTask != null) {
@@ -185,6 +191,6 @@ public class TransportGetDataFrameAnalyticsStatsAction
             assignmentExplanation = analyticsTask.getAssignment().getExplanation();
         }
         return new GetDataFrameAnalyticsStatsAction.Response.Stats(
-            concreteAnalyticsId, analyticsState, progressPercent, node, assignmentExplanation);
+            concreteAnalyticsId, analyticsState, failureReason, progressPercent, node, assignmentExplanation);
     }
 }

View File

@@ -61,6 +61,7 @@ import org.elasticsearch.xpack.ml.process.MlMemoryTracker;
 import java.io.IOException;
 import java.util.Map;
 import java.util.Objects;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Predicate;
 
 import static org.elasticsearch.xpack.core.ml.MlTasks.AWAITING_UPGRADE;
@@ -130,8 +131,6 @@ public class TransportStartDataFrameAnalyticsAction
             return;
         }
 
-        StartDataFrameAnalyticsAction.TaskParams taskParams = new StartDataFrameAnalyticsAction.TaskParams(request.getId());
-
         // Wait for analytics to be started
         ActionListener<PersistentTasksCustomMetaData.PersistentTask<StartDataFrameAnalyticsAction.TaskParams>> waitForAnalyticsToStart =
             new ActionListener<PersistentTasksCustomMetaData.PersistentTask<StartDataFrameAnalyticsAction.TaskParams>>() {
@@ -150,17 +149,26 @@ public class TransportStartDataFrameAnalyticsAction
             }
         };
 
+        AtomicReference<DataFrameAnalyticsConfig> configHolder = new AtomicReference<>();
+
         // Start persistent task
         ActionListener<Void> memoryRequirementRefreshListener = ActionListener.wrap(
-            validated -> persistentTasksService.sendStartRequest(MlTasks.dataFrameAnalyticsTaskId(request.getId()),
-                MlTasks.DATA_FRAME_ANALYTICS_TASK_NAME, taskParams, waitForAnalyticsToStart),
+            aVoid -> {
+                StartDataFrameAnalyticsAction.TaskParams taskParams = new StartDataFrameAnalyticsAction.TaskParams(
+                    request.getId(), configHolder.get().getVersion());
+                persistentTasksService.sendStartRequest(MlTasks.dataFrameAnalyticsTaskId(request.getId()),
+                    MlTasks.DATA_FRAME_ANALYTICS_TASK_NAME, taskParams, waitForAnalyticsToStart);
+            },
             listener::onFailure
         );
 
         // Tell the job tracker to refresh the memory requirement for this job and all other jobs that have persistent tasks
         ActionListener<DataFrameAnalyticsConfig> configListener = ActionListener.wrap(
-            config -> memoryTracker.addDataFrameAnalyticsJobMemoryAndRefreshAllOthers(
-                request.getId(), config.getModelMemoryLimit().getBytes(), memoryRequirementRefreshListener),
+            config -> {
+                configHolder.set(config);
+                memoryTracker.addDataFrameAnalyticsJobMemoryAndRefreshAllOthers(
+                    request.getId(), config.getModelMemoryLimit().getBytes(), memoryRequirementRefreshListener);
+            },
             listener::onFailure
         );
@@ -250,7 +258,21 @@ public class TransportStartDataFrameAnalyticsAction
             }
             DataFrameAnalyticsTaskState taskState = (DataFrameAnalyticsTaskState) persistentTask.getState();
             DataFrameAnalyticsState analyticsState = taskState == null ? DataFrameAnalyticsState.STOPPED : taskState.getState();
-            return analyticsState == DataFrameAnalyticsState.STARTED;
+            switch (analyticsState) {
+                case STARTED:
+                case REINDEXING:
+                case ANALYZING:
+                    return true;
+                case STOPPING:
+                    exception = ExceptionsHelper.conflictStatusException("the task has been stopped while waiting to be started");
+                    return true;
+                case STOPPED:
+                    return false;
+                case FAILED:
+                default:
+                    exception = ExceptionsHelper.serverError("Unexpected task state [" + analyticsState + "] while waiting to be started");
+                    return true;
+            }
         }
     }
@@ -424,13 +446,15 @@ public class TransportStartDataFrameAnalyticsAction
         DataFrameAnalyticsTaskState analyticsTaskState = (DataFrameAnalyticsTaskState) state;
 
         // If we are "stopping" there is nothing to do
-        if (analyticsTaskState != null && analyticsTaskState.getState() == DataFrameAnalyticsState.STOPPING) {
+        // If we are "failed" then we should leave the task as is; for recovery it must be force stopped.
+        if (analyticsTaskState != null && analyticsTaskState.getState().isAnyOf(
+                DataFrameAnalyticsState.STOPPING, DataFrameAnalyticsState.FAILED)) {
             return;
         }
 
         if (analyticsTaskState == null) {
             DataFrameAnalyticsTaskState startedState = new DataFrameAnalyticsTaskState(DataFrameAnalyticsState.STARTED,
-                task.getAllocationId());
+                task.getAllocationId(), null);
             task.updatePersistentTaskState(startedState, ActionListener.wrap(
                 response -> manager.execute((DataFrameAnalyticsTask) task, DataFrameAnalyticsState.STARTED),
                 task::markAsFailed));

View File

@ -7,6 +7,7 @@ package org.elasticsearch.xpack.ml.action;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionListenerResponseHandler; import org.elasticsearch.action.ActionListenerResponseHandler;
@ -31,6 +32,7 @@ import org.elasticsearch.xpack.core.ml.action.StopDataFrameAnalyticsAction;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig; import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsState; import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsState;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsTaskState; import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsTaskState;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.ml.MachineLearning; import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.dataframe.persistence.DataFrameAnalyticsConfigProvider; import org.elasticsearch.xpack.ml.dataframe.persistence.DataFrameAnalyticsConfigProvider;
@ -89,13 +91,10 @@ public class TransportStopDataFrameAnalyticsAction
return; return;
} }
Set<String> startedAnalytics = new HashSet<>();
Set<String> stoppingAnalytics = new HashSet<>();
PersistentTasksCustomMetaData tasks = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); PersistentTasksCustomMetaData tasks = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
sortAnalyticsByTaskState(expandedIds, tasks, startedAnalytics, stoppingAnalytics); Set<String> analyticsToStop = findAnalyticsToStop(tasks, expandedIds, request.isForce());
request.setExpandedIds(analyticsToStop);
request.setExpandedIds(startedAnalytics); request.setNodes(findAllocatedNodesAndRemoveUnassignedTasks(analyticsToStop, tasks));
+            request.setNodes(findAllocatedNodesAndRemoveUnassignedTasks(startedAnalytics, tasks));
             ActionListener<StopDataFrameAnalyticsAction.Response> finalListener = ActionListener.wrap(
                 r -> waitForTaskRemoved(expandedIds, request, r, listener),
@@ -110,8 +109,28 @@ public class TransportStopDataFrameAnalyticsAction
         expandIds(state, request, expandedIdsListener);
     }

+    /** Visible for testing */
+    static Set<String> findAnalyticsToStop(PersistentTasksCustomMetaData tasks, Set<String> ids, boolean force) {
+        Set<String> startedAnalytics = new HashSet<>();
+        Set<String> stoppingAnalytics = new HashSet<>();
+        Set<String> failedAnalytics = new HashSet<>();
+        sortAnalyticsByTaskState(ids, tasks, startedAnalytics, stoppingAnalytics, failedAnalytics);
+
+        if (force == false && failedAnalytics.isEmpty() == false) {
+            ElasticsearchStatusException e = failedAnalytics.size() == 1 ? ExceptionsHelper.conflictStatusException(
+                "cannot close data frame analytics [{}] because it failed, use force stop instead", failedAnalytics.iterator().next()) :
+                ExceptionsHelper.conflictStatusException("one or more data frame analytics are in failed state, " +
+                    "use force stop instead");
+            throw e;
+        }
+
+        startedAnalytics.addAll(failedAnalytics);
+        return startedAnalytics;
+    }
+
     private static void sortAnalyticsByTaskState(Set<String> analyticsIds, PersistentTasksCustomMetaData tasks,
-                                                 Set<String> startedAnalytics, Set<String> stoppingAnalytics) {
+                                                 Set<String> startedAnalytics, Set<String> stoppingAnalytics,
+                                                 Set<String> failedAnalytics) {
         for (String analyticsId : analyticsIds) {
             switch (MlTasks.getDataFrameAnalyticsState(analyticsId, tasks)) {
                 case STARTED:
@@ -124,6 +143,9 @@ public class TransportStopDataFrameAnalyticsAction
                     break;
                 case STOPPED:
                     break;
+                case FAILED:
+                    failedAnalytics.add(analyticsId);
+                    break;
                 default:
                     break;
             }
@@ -203,7 +225,7 @@ public class TransportStopDataFrameAnalyticsAction
                                   TransportStartDataFrameAnalyticsAction.DataFrameAnalyticsTask task,
                                   ActionListener<StopDataFrameAnalyticsAction.Response> listener) {
         DataFrameAnalyticsTaskState stoppingState =
-            new DataFrameAnalyticsTaskState(DataFrameAnalyticsState.STOPPING, task.getAllocationId());
+            new DataFrameAnalyticsTaskState(DataFrameAnalyticsState.STOPPING, task.getAllocationId(), null);
         task.updatePersistentTaskState(stoppingState, ActionListener.wrap(pTask -> {
             threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME).execute(new AbstractRunnable() {
                 @Override
@@ -213,6 +235,7 @@ public class TransportStopDataFrameAnalyticsAction
                 @Override
                 protected void doRun() {
+                    logger.info("[{}] Stopping task with force [{}]", task.getParams().getId(), request.isForce());
                    task.stop("stop_data_frame_analytics (api)", request.getTimeout());
                    listener.onResponse(new StopDataFrameAnalyticsAction.Response(true));
                }
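To make the selection logic above easier to follow outside the transport action, here is a minimal, dependency-free Java sketch of the same decision: without force, any FAILED task turns the stop request into a conflict; with force, the failed tasks are folded into the set to stop. All names are illustrative stand-ins; the real code works against PersistentTasksCustomMetaData and throws an ElasticsearchStatusException with CONFLICT status.

import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-ins for the Elasticsearch types; illustration only.
enum TaskState { STARTED, REINDEXING, ANALYZING, STOPPING, STOPPED, FAILED }

public class ForceStopSketch {

    static Set<String> analyticsToStop(Map<String, TaskState> states, Set<String> ids, boolean force) {
        Set<String> toStop = new HashSet<>();
        Set<String> failed = new HashSet<>();
        for (String id : ids) {
            TaskState state = states.getOrDefault(id, TaskState.STOPPED);
            if (state == TaskState.FAILED) {
                failed.add(id);
            } else if (state == TaskState.STARTED || state == TaskState.REINDEXING || state == TaskState.ANALYZING) {
                toStop.add(id);
            }
        }
        // A failed task only responds to a force stop; a plain stop is rejected
        // with a conflict so the failure_reason is not silently discarded.
        if (force == false && failed.isEmpty() == false) {
            throw new IllegalStateException("data frame analytics " + failed + " failed, use force stop instead");
        }
        toStop.addAll(failed);
        return toStop;
    }

    public static void main(String[] args) {
        Map<String, TaskState> states = Map.of("a", TaskState.ANALYZING, "b", TaskState.FAILED);
        Set<String> ids = new HashSet<>(Set.of("a", "b"));
        System.out.println(analyticsToStop(states, ids, true));  // [a, b]
        System.out.println(analyticsToStop(states, ids, false)); // throws IllegalStateException
    }
}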

View File

@@ -70,7 +70,7 @@ public class DataFrameAnalyticsManager {
         ActionListener<DataFrameAnalyticsConfig> configListener = ActionListener.wrap(
             config -> {
                 DataFrameAnalyticsTaskState reindexingState = new DataFrameAnalyticsTaskState(DataFrameAnalyticsState.REINDEXING,
-                    task.getAllocationId());
+                    task.getAllocationId(), null);
                 switch(currentState) {
                     // If we are STARTED, we are right at the beginning of our task, we should indicate that we are entering the
                     // REINDEX state and start reindexing.
@@ -191,7 +191,7 @@ public class DataFrameAnalyticsManager {
         ActionListener<DataFrameDataExtractorFactory> dataExtractorFactoryListener = ActionListener.wrap(
             dataExtractorFactory -> {
                 DataFrameAnalyticsTaskState analyzingState = new DataFrameAnalyticsTaskState(DataFrameAnalyticsState.ANALYZING,
-                    task.getAllocationId());
+                    task.getAllocationId(), null);
                 task.updatePersistentTaskState(analyzingState, ActionListener.wrap(
                     updatedTask -> processManager.runJob(task, config, dataExtractorFactory,
                     error -> {
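The third constructor argument added throughout this file is a nullable failure reason: healthy state transitions pass null, while the FAILED transition carries the captured message. A small illustrative sketch of that shape (not the real DataFrameAnalyticsTaskState, whose fields and serialization live in x-pack core):

import java.util.Objects;

// Illustrative stand-in: the task state plus an optional reason, which is
// only non-null for FAILED transitions and is surfaced as failure_reason.
final class TaskStateSketch {
    enum State { STARTED, REINDEXING, ANALYZING, STOPPING, STOPPED, FAILED }

    private final State state;
    private final long allocationId;
    private final String reason; // nullable

    TaskStateSketch(State state, long allocationId, String reason) {
        this.state = Objects.requireNonNull(state);
        this.allocationId = allocationId;
        this.reason = reason;
    }

    static TaskStateSketch analyzing(long allocationId) {
        return new TaskStateSketch(State.ANALYZING, allocationId, null);
    }

    static TaskStateSketch failed(long allocationId, String reason) {
        return new TaskStateSketch(State.FAILED, allocationId, Objects.requireNonNull(reason));
    }
}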

View File

@@ -6,6 +6,7 @@
 package org.elasticsearch.xpack.ml.dataframe.process;

 import java.util.concurrent.ExecutorService;
+import java.util.function.Consumer;

 public interface AnalyticsProcessFactory {
@@ -15,7 +16,9 @@ public interface AnalyticsProcessFactory {
      * @param jobId                  The job id
      * @param analyticsProcessConfig The process configuration
      * @param executorService        Executor service used to start the async tasks a job needs to operate the analytical process
+     * @param onProcessCrash         Callback to execute if the process stops unexpectedly
      * @return The process
      */
-    AnalyticsProcess createAnalyticsProcess(String jobId, AnalyticsProcessConfig analyticsProcessConfig, ExecutorService executorService);
+    AnalyticsProcess createAnalyticsProcess(String jobId, AnalyticsProcessConfig analyticsProcessConfig, ExecutorService executorService,
+                                            Consumer<String> onProcessCrash);
 }
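A sketch of the crash-callback contract this interface now expresses: some watcher observes the child process and invokes onProcessCrash with a human-readable reason if it dies while the job still considers it alive. The watchdog below is illustrative only; the real factory wires the callback into the native process pipes.

import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;

final class CrashWatchdogSketch {

    static void watch(Process process, ExecutorService executorService, Consumer<String> onProcessCrash) {
        executorService.execute(() -> {
            try {
                int exitCode = process.waitFor(); // blocks until the process exits
                if (exitCode != 0) {
                    onProcessCrash.accept("process exited unexpectedly with code [" + exitCode + "]");
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
    }
}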

View File

@@ -8,6 +8,7 @@ package org.elasticsearch.xpack.ml.dataframe.process;

 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.apache.logging.log4j.message.ParameterizedMessage;
+import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.admin.indices.refresh.RefreshAction;
 import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
 import org.elasticsearch.client.Client;
@@ -15,9 +16,11 @@ import org.elasticsearch.common.Nullable;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.xpack.core.ClientHelper;
 import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig;
+import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsState;
+import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsTaskState;
 import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
 import org.elasticsearch.xpack.ml.MachineLearning;
-import org.elasticsearch.xpack.ml.action.TransportStartDataFrameAnalyticsAction;
+import org.elasticsearch.xpack.ml.action.TransportStartDataFrameAnalyticsAction.DataFrameAnalyticsTask;
 import org.elasticsearch.xpack.ml.dataframe.extractor.DataFrameDataExtractor;
 import org.elasticsearch.xpack.ml.dataframe.extractor.DataFrameDataExtractorFactory;
@@ -46,7 +49,7 @@ public class AnalyticsProcessManager {
         this.processFactory = Objects.requireNonNull(analyticsProcessFactory);
     }

-    public void runJob(TransportStartDataFrameAnalyticsAction.DataFrameAnalyticsTask task, DataFrameAnalyticsConfig config,
+    public void runJob(DataFrameAnalyticsTask task, DataFrameAnalyticsConfig config,
                        DataFrameDataExtractorFactory dataExtractorFactory, Consumer<Exception> finishHandler) {
         threadPool.generic().execute(() -> {
             if (task.isStopping()) {
@@ -61,10 +64,10 @@ public class AnalyticsProcessManager {
                     + "] Could not create process as one already exists"));
                 return;
             }
-            if (processContext.startProcess(dataExtractorFactory, config)) {
+            if (processContext.startProcess(dataExtractorFactory, config, task)) {
                 ExecutorService executorService = threadPool.executor(MachineLearning.JOB_COMMS_THREAD_POOL_NAME);
-                executorService.execute(() -> processContext.resultProcessor.process(processContext.process));
-                executorService.execute(() -> processData(task.getAllocationId(), config, processContext.dataExtractor,
+                executorService.execute(() -> processResults(processContext));
+                executorService.execute(() -> processData(task, config, processContext.dataExtractor,
                     processContext.process, processContext.resultProcessor, finishHandler));
             } else {
                 finishHandler.accept(null);
@@ -72,8 +75,17 @@ public class AnalyticsProcessManager {
         });
     }

-    private void processData(long taskAllocationId, DataFrameAnalyticsConfig config, DataFrameDataExtractor dataExtractor,
+    private void processResults(ProcessContext processContext) {
+        try {
+            processContext.resultProcessor.process(processContext.process);
+        } catch (Exception e) {
+            processContext.setFailureReason(e.getMessage());
+        }
+    }
+
+    private void processData(DataFrameAnalyticsTask task, DataFrameAnalyticsConfig config, DataFrameDataExtractor dataExtractor,
                              AnalyticsProcess process, AnalyticsResultProcessor resultProcessor, Consumer<Exception> finishHandler) {
         try {
             writeHeaderRecord(dataExtractor, process);
             writeDataRows(dataExtractor, process);
@@ -82,26 +94,28 @@ public class AnalyticsProcessManager {
             LOGGER.info("[{}] Waiting for result processor to complete", config.getId());
             resultProcessor.awaitForCompletion();
+            processContextByAllocation.get(task.getAllocationId()).setFailureReason(resultProcessor.getFailure());

             refreshDest(config);
             LOGGER.info("[{}] Result processor has completed", config.getId());
-        } catch (IOException e) {
-            LOGGER.error(new ParameterizedMessage("[{}] Error writing data to the process", config.getId()), e);
-            // TODO Handle this failure by setting the task state to FAILED
+        } catch (Exception e) {
+            String errorMsg = new ParameterizedMessage("[{}] Error while processing data", config.getId()).getFormattedMessage();
+            processContextByAllocation.get(task.getAllocationId()).setFailureReason(errorMsg);
         } finally {
-            LOGGER.info("[{}] Closing process", config.getId());
-            try {
-                process.close();
-                LOGGER.info("[{}] Closed process", config.getId());
-                // This results in marking the persistent task as complete
-                finishHandler.accept(null);
-            } catch (IOException e) {
-                LOGGER.error("[{}] Error closing data frame analyzer process", config.getId());
-                finishHandler.accept(e);
-            }
-            processContextByAllocation.remove(taskAllocationId);
+            closeProcess(task);
+
+            ProcessContext processContext = processContextByAllocation.remove(task.getAllocationId());
             LOGGER.debug("Removed process context for task [{}]; [{}] processes still running", config.getId(),
                 processContextByAllocation.size());
+
+            if (processContext.getFailureReason() == null) {
+                // This results in marking the persistent task as complete
+                LOGGER.info("[{}] Marking task completed", config.getId());
+                finishHandler.accept(null);
+            } else {
+                LOGGER.error("[{}] Marking task failed; {}", config.getId(), processContext.getFailureReason());
+                updateTaskState(task, DataFrameAnalyticsState.FAILED, processContext.getFailureReason());
+            }
         }
     }
@@ -142,15 +156,34 @@ public class AnalyticsProcessManager {
         process.writeRecord(headerRecord);
     }

-    private AnalyticsProcess createProcess(String jobId, AnalyticsProcessConfig analyticsProcessConfig) {
+    private AnalyticsProcess createProcess(DataFrameAnalyticsTask task, AnalyticsProcessConfig analyticsProcessConfig) {
         ExecutorService executorService = threadPool.executor(MachineLearning.JOB_COMMS_THREAD_POOL_NAME);
-        AnalyticsProcess process = processFactory.createAnalyticsProcess(jobId, analyticsProcessConfig, executorService);
+        AnalyticsProcess process = processFactory.createAnalyticsProcess(task.getParams().getId(), analyticsProcessConfig,
+            executorService, onProcessCrash(task));
         if (process.isProcessAlive() == false) {
             throw ExceptionsHelper.serverError("Failed to start data frame analytics process");
         }
         return process;
     }

+    private Consumer<String> onProcessCrash(DataFrameAnalyticsTask task) {
+        return reason -> {
+            ProcessContext processContext = processContextByAllocation.get(task.getAllocationId());
+            if (processContext != null) {
+                processContext.setFailureReason(reason);
+                processContext.stop();
+            }
+        };
+    }
+
+    private void updateTaskState(DataFrameAnalyticsTask task, DataFrameAnalyticsState state, @Nullable String reason) {
+        DataFrameAnalyticsTaskState newTaskState = new DataFrameAnalyticsTaskState(state, task.getAllocationId(), reason);
+        task.updatePersistentTaskState(newTaskState, ActionListener.wrap(
+            updatedTask -> LOGGER.info("[{}] Successfully update task state to [{}]", task.getParams().getId(), state),
+            e -> LOGGER.error(new ParameterizedMessage("[{}] Could not update task state to [{}]", task.getParams().getId(), state), e)
+        ));
+    }
+
     @Nullable
     public Integer getProgressPercent(long allocationId) {
         ProcessContext processContext = processContextByAllocation.get(allocationId);
@@ -162,13 +195,29 @@ public class AnalyticsProcessManager {
             () -> client.execute(RefreshAction.INSTANCE, new RefreshRequest(config.getDest().getIndex())).actionGet());
     }

-    public void stop(TransportStartDataFrameAnalyticsAction.DataFrameAnalyticsTask task) {
+    private void closeProcess(DataFrameAnalyticsTask task) {
+        String configId = task.getParams().getId();
+        LOGGER.info("[{}] Closing process", configId);
+
+        ProcessContext processContext = processContextByAllocation.get(task.getAllocationId());
+        try {
+            processContext.process.close();
+            LOGGER.info("[{}] Closed process", configId);
+        } catch (IOException e) {
+            String errorMsg = new ParameterizedMessage("[{}] Error closing data frame analyzer process [{}]"
+                , configId, e.getMessage()).getFormattedMessage();
+            processContext.setFailureReason(errorMsg);
+        }
+    }
+
+    public void stop(DataFrameAnalyticsTask task) {
         ProcessContext processContext = processContextByAllocation.get(task.getAllocationId());
         if (processContext != null) {
             LOGGER.debug("[{}] Stopping process", task.getParams().getId() );
             processContext.stop();
         } else {
             LOGGER.debug("[{}] No process context to stop", task.getParams().getId() );
+            task.markAsCompleted();
         }
     }
@@ -180,6 +229,7 @@ public class AnalyticsProcessManager {
         private volatile AnalyticsResultProcessor resultProcessor;
         private final AtomicInteger progressPercent = new AtomicInteger(0);
         private volatile boolean processKilled;
+        private volatile String failureReason;

         ProcessContext(String id) {
             this.id = Objects.requireNonNull(id);
@@ -197,6 +247,17 @@ public class AnalyticsProcessManager {
             this.progressPercent.set(progressPercent);
         }

+        private synchronized void setFailureReason(String failureReason) {
+            // Only set the new reason if there isn't one already as we want to keep the first reason
+            if (failureReason != null) {
+                this.failureReason = failureReason;
+            }
+        }
+
+        private String getFailureReason() {
+            return failureReason;
+        }
+
         public synchronized void stop() {
             LOGGER.debug("[{}] Stopping process", id);
             processKilled = true;
@@ -215,14 +276,15 @@ public class AnalyticsProcessManager {
         /**
          * @return {@code true} if the process was started or {@code false} if it was not because it was stopped in the meantime
          */
-        private synchronized boolean startProcess(DataFrameDataExtractorFactory dataExtractorFactory, DataFrameAnalyticsConfig config) {
+        private synchronized boolean startProcess(DataFrameDataExtractorFactory dataExtractorFactory, DataFrameAnalyticsConfig config,
+                                                  DataFrameAnalyticsTask task) {
             if (processKilled) {
                 // The job was stopped before we started the process so no need to start it
                 return false;
             }

             dataExtractor = dataExtractorFactory.newExtractor(false);
-            process = createProcess(config.getId(), createProcessConfig(config, dataExtractor));
+            process = createProcess(task, createProcessConfig(config, dataExtractor));
             DataFrameRowsJoiner dataFrameRowsJoiner = new DataFrameRowsJoiner(config.getId(), client,
                 dataExtractorFactory.newExtractor(true));
             resultProcessor = new AnalyticsResultProcessor(id, dataFrameRowsJoiner, this::isProcessKilled, this::setProgressPercent);
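Several producers can now report a failure for the same job: the data-writing thread, the result processor, the crash callback, and the process close path. The process context records a single reason, and its presence at shutdown decides between completing the task and failing it. A self-contained sketch of that pattern, implementing the "keep the first reason" intent stated in the comment above (all names illustrative):

import java.util.concurrent.CountDownLatch;

final class FailureReasonSketch {
    private volatile String failureReason;

    synchronized void setFailureReason(String reason) {
        // Keep the first reported reason; later reports are dropped.
        if (reason != null && this.failureReason == null) {
            this.failureReason = reason;
        }
    }

    String getFailureReason() {
        return failureReason;
    }

    public static void main(String[] args) throws InterruptedException {
        FailureReasonSketch context = new FailureReasonSketch();
        CountDownLatch done = new CountDownLatch(2);
        Runnable writer = () -> { context.setFailureReason("error while processing data"); done.countDown(); };
        Runnable watchdog = () -> { context.setFailureReason("process crashed"); done.countDown(); };
        new Thread(writer).start();
        new Thread(watchdog).start();
        done.await();
        // Whichever thread reported first wins; a null reason would mean success.
        System.out.println(context.getFailureReason() == null
            ? "task state: completed" : "task state: failed: " + context.getFailureReason());
    }
}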

View File

@@ -8,6 +8,7 @@ package org.elasticsearch.xpack.ml.dataframe.process;

 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.apache.logging.log4j.message.ParameterizedMessage;
+import org.elasticsearch.common.Nullable;
 import org.elasticsearch.xpack.ml.dataframe.process.results.RowResults;

 import java.util.Iterator;
@@ -26,6 +27,7 @@ public class AnalyticsResultProcessor {
     private final Supplier<Boolean> isProcessKilled;
     private final Consumer<Integer> progressConsumer;
     private final CountDownLatch completionLatch = new CountDownLatch(1);
+    private volatile String failure;

     public AnalyticsResultProcessor(String dataFrameAnalyticsId, DataFrameRowsJoiner dataFrameRowsJoiner, Supplier<Boolean> isProcessKilled,
                                     Consumer<Integer> progressConsumer) {
@@ -35,6 +37,11 @@ public class AnalyticsResultProcessor {
         this.progressConsumer = Objects.requireNonNull(progressConsumer);
     }

+    @Nullable
+    public String getFailure() {
+        return failure == null ? dataFrameRowsJoiner.getFailure() : failure;
+    }
+
     public void awaitForCompletion() {
         try {
             if (completionLatch.await(30, TimeUnit.MINUTES) == false) {
@@ -59,6 +66,7 @@ public class AnalyticsResultProcessor {
                 // No need to log error as it's due to stopping
             } else {
                 LOGGER.error(new ParameterizedMessage("[{}] Error parsing data frame analytics output", dataFrameAnalyticsId), e);
+                failure = "error parsing data frame analytics output: [" + e.getMessage() + "]";
             }
         } finally {
             completionLatch.countDown();
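The getFailure method above implements a fall-through: the result processor reports its own parse failure if it has one, otherwise it surfaces whatever the rows joiner recorded. A tiny sketch of that chain with illustrative names:

import java.util.function.Supplier;

final class FailureChainSketch {
    private volatile String ownFailure;
    private final Supplier<String> downstreamFailure;

    FailureChainSketch(Supplier<String> downstreamFailure) {
        this.downstreamFailure = downstreamFailure;
    }

    void recordFailure(String reason) {
        ownFailure = reason;
    }

    String getFailure() {
        // Own failure takes precedence; otherwise defer to the downstream component.
        return ownFailure == null ? downstreamFailure.get() : ownFailure;
    }

    public static void main(String[] args) {
        FailureChainSketch processor = new FailureChainSketch(() -> "joiner: failed to join results");
        System.out.println(processor.getFailure()); // falls through to the joiner failure
        processor.recordFailure("error parsing output");
        System.out.println(processor.getFailure()); // processor's own failure wins
    }
}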

View File

@@ -14,6 +14,7 @@ import org.elasticsearch.action.bulk.BulkRequest;
 import org.elasticsearch.action.bulk.BulkResponse;
 import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.client.Client;
+import org.elasticsearch.common.Nullable;
 import org.elasticsearch.search.SearchHit;
 import org.elasticsearch.xpack.core.ClientHelper;
 import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
@@ -41,7 +42,7 @@ class DataFrameRowsJoiner implements AutoCloseable {
     private final DataFrameDataExtractor dataExtractor;
     private final Iterator<DataFrameDataExtractor.Row> dataFrameRowsIterator;
     private LinkedList<RowResults> currentResults;
-    private boolean failed;
+    private volatile String failure;

     DataFrameRowsJoiner(String analyticsId, Client client, DataFrameDataExtractor dataExtractor) {
         this.analyticsId = Objects.requireNonNull(analyticsId);
@@ -51,8 +52,13 @@ class DataFrameRowsJoiner implements AutoCloseable {
         this.currentResults = new LinkedList<>();
     }

+    @Nullable
+    String getFailure() {
+        return failure;
+    }
+
     void processRowResults(RowResults rowResults) {
-        if (failed) {
+        if (failure != null) {
             // If we are in failed state we drop the results but we let the processor
             // parse the output
             return;
@@ -61,8 +67,8 @@ class DataFrameRowsJoiner implements AutoCloseable {
         try {
             addResultAndJoinIfEndOfBatch(rowResults);
         } catch (Exception e) {
-            LOGGER.error(new ParameterizedMessage("[{}] Failed to join results", analyticsId), e);
-            failed = true;
+            LOGGER.error(new ParameterizedMessage("[{}] Failed to join results ", analyticsId), e);
+            failure = "[" + analyticsId + "] Failed to join results: " + e.getMessage();
         }
     }
@@ -93,8 +99,7 @@ class DataFrameRowsJoiner implements AutoCloseable {
             msg += "expected [" + row.getChecksum() + "] but result had [" + result.getChecksum() + "]; ";
             msg += "this implies the data frame index [" + row.getHit().getIndex() + "] was modified while the analysis was running. ";
             msg += "We rely on this index being immutable during a running analysis and so the results will be unreliable.";
-            throw new RuntimeException(msg);
-            // TODO Communicate this error to the user as effectively the analytics have failed (e.g. FAILED state, audit error, etc.)
+            throw ExceptionsHelper.serverError(msg);
         }
     }
@@ -112,8 +117,7 @@ class DataFrameRowsJoiner implements AutoCloseable {
         BulkResponse bulkResponse = ClientHelper.executeWithHeaders(dataExtractor.getHeaders(), ClientHelper.ML_ORIGIN, client,
             () -> client.execute(BulkAction.INSTANCE, bulkRequest).actionGet());
         if (bulkResponse.hasFailures()) {
-            LOGGER.error("Failures while writing data frame");
-            // TODO Better error handling
+            throw ExceptionsHelper.serverError("failures while writing results [" + bulkResponse.buildFailureMessage() + "]");
         }
     }
@@ -123,7 +127,7 @@ class DataFrameRowsJoiner implements AutoCloseable {
             joinCurrentResults();
         } catch (Exception e) {
             LOGGER.error(new ParameterizedMessage("[{}] Failed to join results", analyticsId), e);
-            failed = true;
+            failure = "[" + analyticsId + "] Failed to join results: " + e.getMessage();
         } finally {
             try {
                 consumeDataExtractor();
@@ -159,7 +163,7 @@ class DataFrameRowsJoiner implements AutoCloseable {
         }

         if (row == null || row.shouldSkip()) {
-            throw ExceptionsHelper.serverError("No more data frame rows could be found while joining results");
+            throw ExceptionsHelper.serverError("no more data frame rows could be found while joining results");
         }
         return row;
     }
@@ -175,9 +179,7 @@ class DataFrameRowsJoiner implements AutoCloseable {
         try {
             return dataExtractor.next();
         } catch (IOException e) {
-            // TODO Implement recovery strategy or better error reporting
-            LOGGER.error("Error reading next batch of data frame rows", e);
-            return Optional.empty();
+            throw ExceptionsHelper.serverError("error reading next batch of data frame rows [" + e.getMessage() + "]");
         }
     }
 }
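A notable design point in the joiner is that after a failure it drops further results rather than joining them, yet it lets the caller keep parsing so the native process output is fully drained. A minimal sketch of that behaviour, with illustrative names:

import java.util.List;

final class DrainAfterFailureSketch {
    private volatile String failure;

    void processResult(String result) {
        if (failure != null) {
            return; // drop the result, but let the caller keep consuming output
        }
        try {
            join(result);
        } catch (Exception e) {
            failure = "Failed to join results: " + e.getMessage();
        }
    }

    private void join(String result) {
        if (result.isEmpty()) {
            throw new IllegalStateException("empty result");
        }
        System.out.println("joined " + result);
    }

    public static void main(String[] args) {
        DrainAfterFailureSketch joiner = new DrainAfterFailureSketch();
        for (String r : List.of("a", "", "c")) { // "" triggers the failure; "c" is drained silently
            joiner.processResult(r);
        }
        System.out.println("failure: " + joiner.failure);
    }
}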

View File

@@ -25,6 +25,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Objects;
 import java.util.concurrent.ExecutorService;
+import java.util.function.Consumer;

 public class NativeAnalyticsProcessFactory implements AnalyticsProcessFactory {
@@ -50,7 +51,7 @@ public class NativeAnalyticsProcessFactory implements AnalyticsProcessFactory {

     @Override
     public AnalyticsProcess createAnalyticsProcess(String jobId, AnalyticsProcessConfig analyticsProcessConfig,
-                                                   ExecutorService executorService) {
+                                                   ExecutorService executorService, Consumer<String> onProcessCrash) {
         List<Path> filesToDelete = new ArrayList<>();
         ProcessPipes processPipes = new ProcessPipes(env, NAMED_PIPE_HELPER, AnalyticsBuilder.ANALYTICS, jobId,
             true, false, true, true, false, false);
@@ -62,8 +63,7 @@ public class NativeAnalyticsProcessFactory implements AnalyticsProcessFactory {

         NativeAnalyticsProcess analyticsProcess = new NativeAnalyticsProcess(jobId, processPipes.getLogStream().get(),
             processPipes.getProcessInStream().get(), processPipes.getProcessOutStream().get(), null, numberOfFields,
-            filesToDelete, reason -> {});
+            filesToDelete, onProcessCrash);
         try {
             analyticsProcess.start(executorService);

View File

@@ -265,8 +265,9 @@ public class JobNodeSelector {
             MlTasks.DATA_FRAME_ANALYTICS_TASK_NAME, task -> node.getId().equals(task.getExecutorNode()));
         for (PersistentTasksCustomMetaData.PersistentTask<?> assignedTask : assignedAnalyticsTasks) {
             DataFrameAnalyticsState dataFrameAnalyticsState = ((DataFrameAnalyticsTaskState) assignedTask.getState()).getState();
-            // TODO: skip FAILED here too if such a state is ever added
-            if (dataFrameAnalyticsState != DataFrameAnalyticsState.STOPPED) {
+            // Don't count stopped and failed df-analytics tasks as they don't consume native memory
+            if (dataFrameAnalyticsState.isAnyOf(DataFrameAnalyticsState.STOPPED, DataFrameAnalyticsState.FAILED) == false) {
                 // The native process is only running in the ANALYZING and STOPPING states, but in the STARTED
                 // and REINDEXING states we're committed to using the memory soon, so account for it here
                 ++result.numberOfAssignedJobs;
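The isAnyOf call above is a varargs membership test on the state enum, which keeps multi-state exclusions readable as the list grows. A self-contained sketch of that helper pattern with an illustrative stand-in enum:

enum StateSketch {
    STARTED, REINDEXING, ANALYZING, STOPPING, STOPPED, FAILED;

    boolean isAnyOf(StateSketch... candidates) {
        for (StateSketch candidate : candidates) {
            if (this == candidate) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        StateSketch state = StateSketch.FAILED;
        // Neither stopped nor failed tasks hold native memory, so skip them in accounting.
        boolean consumesMemory = state.isAnyOf(STOPPED, FAILED) == false;
        System.out.println("consumes memory: " + consumesMemory); // false
    }
}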

View File

@@ -7,7 +7,6 @@ package org.elasticsearch.xpack.ml.rest.dataframe;

 import org.elasticsearch.client.node.NodeClient;
 import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.rest.BaseRestHandler;
 import org.elasticsearch.rest.RestController;
 import org.elasticsearch.rest.RestRequest;
@@ -39,15 +38,11 @@ public class RestStopDataFrameAnalyticsAction extends BaseRestHandler {
             request = StopDataFrameAnalyticsAction.Request.parseRequest(id, restRequest.contentOrSourceParamParser());
         } else {
             request = new StopDataFrameAnalyticsAction.Request(id);
-            if (restRequest.hasParam(StopDataFrameAnalyticsAction.Request.TIMEOUT.getPreferredName())) {
-                TimeValue timeout = restRequest.paramAsTime(StopDataFrameAnalyticsAction.Request.TIMEOUT.getPreferredName(),
-                    request.getTimeout());
-                request.setTimeout(timeout);
-            }
-            if (restRequest.hasParam(StopDataFrameAnalyticsAction.Request.ALLOW_NO_MATCH.getPreferredName())) {
-                request.setAllowNoMatch(restRequest.paramAsBoolean(StopDataFrameAnalyticsAction.Request.ALLOW_NO_MATCH.getPreferredName(),
-                    request.allowNoMatch()));
-            }
+            request.setTimeout(restRequest.paramAsTime(StopDataFrameAnalyticsAction.Request.TIMEOUT.getPreferredName(),
+                request.getTimeout()));
+            request.setAllowNoMatch(restRequest.paramAsBoolean(StopDataFrameAnalyticsAction.Request.ALLOW_NO_MATCH.getPreferredName(),
+                request.allowNoMatch()));
+            request.setForce(restRequest.paramAsBoolean(StopDataFrameAnalyticsAction.Request.FORCE.getPreferredName(), request.isForce()));
         }
         return channel -> client.execute(StopDataFrameAnalyticsAction.INSTANCE, request, new RestToXContentListener<>(channel));
     }
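For completeness, a minimal sketch of force-stopping a failed job through the high level REST client. It assumes a configured RestHighLevelClient, a placeholder job id of "my-analytics", and that the 7.x client exposes machineLearning().stopDataFrameAnalytics and a response with isStopped() as shown; under the hood this issues POST _ml/data_frame/analytics/my-analytics/_stop?force=true.

import java.io.IOException;

import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.ml.StopDataFrameAnalyticsRequest;
import org.elasticsearch.client.ml.StopDataFrameAnalyticsResponse;

public class ForceStopExample {

    static void forceStop(RestHighLevelClient client) throws IOException {
        StopDataFrameAnalyticsRequest request = new StopDataFrameAnalyticsRequest("my-analytics");
        // Resets a FAILED task to stopped once the failure_reason has been read from _stats.
        request.setForce(true);
        StopDataFrameAnalyticsResponse response = client.machineLearning().stopDataFrameAnalytics(request, RequestOptions.DEFAULT);
        System.out.println("stopped: " + response.isStopped());
    }
}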

View File

@@ -0,0 +1,85 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.ml.action;
+
+import org.elasticsearch.ElasticsearchStatusException;
+import org.elasticsearch.Version;
+import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
+import org.elasticsearch.rest.RestStatus;
+import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.xpack.core.ml.MlTasks;
+import org.elasticsearch.xpack.core.ml.action.StartDataFrameAnalyticsAction;
+import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsState;
+import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsTaskState;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.equalTo;
+
+public class TransportStopDataFrameAnalyticsActionTests extends ESTestCase {
+
+    public void testFindAnalyticsToStop_GivenOneFailedTaskAndNotForce() {
+        PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
+        addAnalyticsTask(tasksBuilder, "started", "foo-node", DataFrameAnalyticsState.STARTED);
+        addAnalyticsTask(tasksBuilder, "reindexing", "foo-node", DataFrameAnalyticsState.REINDEXING);
+        addAnalyticsTask(tasksBuilder, "analyzing", "foo-node", DataFrameAnalyticsState.ANALYZING);
+        addAnalyticsTask(tasksBuilder, "stopping", "foo-node", DataFrameAnalyticsState.STOPPING);
+        addAnalyticsTask(tasksBuilder, "stopped", "foo-node", DataFrameAnalyticsState.STOPPED);
+        addAnalyticsTask(tasksBuilder, "failed", "foo-node", DataFrameAnalyticsState.FAILED);
+
+        Set<String> ids = new HashSet<>(Arrays.asList("started", "reindexing", "analyzing", "stopping", "stopped", "failed"));
+
+        ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class,
+            () -> TransportStopDataFrameAnalyticsAction.findAnalyticsToStop(tasksBuilder.build(), ids, false));
+        assertThat(e.status(), equalTo(RestStatus.CONFLICT));
+        assertThat(e.getMessage(), equalTo("cannot close data frame analytics [failed] because it failed, use force stop instead"));
+    }
+
+    public void testFindAnalyticsToStop_GivenTwoFailedTasksAndNotForce() {
+        PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
+        addAnalyticsTask(tasksBuilder, "failed", "foo-node", DataFrameAnalyticsState.FAILED);
+        addAnalyticsTask(tasksBuilder, "another_failed", "foo-node", DataFrameAnalyticsState.FAILED);
+
+        Set<String> ids = new HashSet<>(Arrays.asList("failed", "another_failed"));
+
+        ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class,
+            () -> TransportStopDataFrameAnalyticsAction.findAnalyticsToStop(tasksBuilder.build(), ids, false));
+        assertThat(e.status(), equalTo(RestStatus.CONFLICT));
+        assertThat(e.getMessage(), equalTo("one or more data frame analytics are in failed state, use force stop instead"));
+    }
+
+    public void testFindAnalyticsToStop_GivenFailedTaskAndForce() {
+        PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
+        addAnalyticsTask(tasksBuilder, "started", "foo-node", DataFrameAnalyticsState.STARTED);
+        addAnalyticsTask(tasksBuilder, "reindexing", "foo-node", DataFrameAnalyticsState.REINDEXING);
+        addAnalyticsTask(tasksBuilder, "analyzing", "foo-node", DataFrameAnalyticsState.ANALYZING);
+        addAnalyticsTask(tasksBuilder, "stopping", "foo-node", DataFrameAnalyticsState.STOPPING);
+        addAnalyticsTask(tasksBuilder, "stopped", "foo-node", DataFrameAnalyticsState.STOPPED);
+        addAnalyticsTask(tasksBuilder, "failed", "foo-node", DataFrameAnalyticsState.FAILED);
+
+        Set<String> ids = new HashSet<>(Arrays.asList("started", "reindexing", "analyzing", "stopping", "stopped", "failed"));
+
+        Set<String> analyticsToStop = TransportStopDataFrameAnalyticsAction.findAnalyticsToStop(tasksBuilder.build(), ids, true);
+
+        assertThat(analyticsToStop, containsInAnyOrder("started", "reindexing", "analyzing", "failed"));
+    }
+
+    private static void addAnalyticsTask(PersistentTasksCustomMetaData.Builder builder, String analyticsId, String nodeId,
+                                         DataFrameAnalyticsState state) {
+        builder.addTask(MlTasks.dataFrameAnalyticsTaskId(analyticsId), MlTasks.DATA_FRAME_ANALYTICS_TASK_NAME,
+            new StartDataFrameAnalyticsAction.TaskParams(analyticsId, Version.CURRENT),
+            new PersistentTasksCustomMetaData.Assignment(nodeId, "test assignment"));
+        builder.updateTaskState(MlTasks.dataFrameAnalyticsTaskId(analyticsId),
+            new DataFrameAnalyticsTaskState(state, builder.getLastAllocationId(), null));
+    }
+}

View File

@@ -566,10 +566,11 @@ public class JobNodeSelectorTests extends ESTestCase {
     static void addDataFrameAnalyticsJobTask(String id, String nodeId, DataFrameAnalyticsState state,
                                              PersistentTasksCustomMetaData.Builder builder, boolean isStale) {
         builder.addTask(MlTasks.dataFrameAnalyticsTaskId(id), MlTasks.DATA_FRAME_ANALYTICS_TASK_NAME,
-            new StartDataFrameAnalyticsAction.TaskParams(id), new PersistentTasksCustomMetaData.Assignment(nodeId, "test assignment"));
+            new StartDataFrameAnalyticsAction.TaskParams(id, Version.CURRENT),
+            new PersistentTasksCustomMetaData.Assignment(nodeId, "test assignment"));
         if (state != null) {
             builder.updateTaskState(MlTasks.dataFrameAnalyticsTaskId(id),
-                new DataFrameAnalyticsTaskState(state, builder.getLastAllocationId() - (isStale ? 1 : 0)));
+                new DataFrameAnalyticsTaskState(state, builder.getLastAllocationId() - (isStale ? 1 : 0), null));
         }
     }
 }

View File

@@ -5,6 +5,7 @@
  */
 package org.elasticsearch.xpack.ml.process;

+import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.settings.ClusterSettings;
@@ -199,7 +200,7 @@ public class MlMemoryTrackerTests extends ESTestCase {
     private
     PersistentTasksCustomMetaData.PersistentTask<StartDataFrameAnalyticsAction.TaskParams> makeTestDataFrameAnalyticsTask(String id) {
         return new PersistentTasksCustomMetaData.PersistentTask<>(MlTasks.dataFrameAnalyticsTaskId(id),
-            MlTasks.DATA_FRAME_ANALYTICS_TASK_NAME, new StartDataFrameAnalyticsAction.TaskParams(id), 0,
+            MlTasks.DATA_FRAME_ANALYTICS_TASK_NAME, new StartDataFrameAnalyticsAction.TaskParams(id, Version.CURRENT), 0,
             PersistentTasksCustomMetaData.INITIAL_ASSIGNMENT);
     }
 }

View File

@@ -18,6 +18,11 @@
         "required": false,
         "description": "Whether to ignore if a wildcard expression matches no data frame analytics. (This includes `_all` string or when no data frame analytics have been specified)"
       },
+      "force": {
+        "type": "boolean",
+        "required": false,
+        "description": "True if the data frame analytics should be forcefully stopped"
+      },
       "timeout": {
         "type": "time",
         "required": false,