From 12943c5d2cf4a6891bd3f65300cba48392e39777 Mon Sep 17 00:00:00 2001 From: Benjamin Trent Date: Wed, 27 Mar 2019 06:53:58 -0500 Subject: [PATCH] [ML] Add data frame task state object and field (#40169) (#40490) * [ML] Add data frame task state object and field * A new state item is added so that the overall task state can be accoutned for * A new FAILED state and reason have been added as well so that failures can be shown to the user for optional correction * Addressing PR comments * adjusting after master merge * addressing pr comment * Adjusting auditor usage with failure state * Refactor, renamed state items to task_state and indexer_state * Adding todo and removing redundant auditor call * Address HLRC changes and PR comment * adjusting hlrc IT test --- .../transforms/DataFrameTransformState.java | 39 +++- .../DataFrameTransformTaskState.java | 34 +++ .../client/DataFrameTransformIT.java | 9 +- .../DataFrameTransformStateTests.java | 8 +- .../DataFrameTransformDocumentationIT.java | 8 +- .../dataframe/get_data_frame_stats.asciidoc | 5 +- .../apis/get-transform-stats.asciidoc | 3 +- .../xpack/core/dataframe/DataFrameField.java | 1 + .../action/StartDataFrameTransformAction.java | 8 +- .../action/StopDataFrameTransformAction.java | 19 +- .../transforms/DataFrameTransformState.java | 79 +++++-- .../DataFrameTransformStateAndStats.java | 2 +- .../DataFrameTransformTaskState.java | 36 ++++ ...tDataFrameTransformActionRequestTests.java | 2 +- ...pDataFrameTransformActionRequestTests.java | 7 +- .../DataFrameTransformStateTests.java | 17 +- .../DataFrameTransformTaskStateTests.java | 81 +++++++ .../integration/DataFrameRestTestCase.java | 51 ++++- .../DataFrameTaskFailedStateIT.java | 94 ++++++++ ...ransportStartDataFrameTransformAction.java | 24 ++- ...TransportStopDataFrameTransformAction.java | 11 + .../RestStartDataFrameTransformAction.java | 3 +- .../RestStopDataFrameTransformAction.java | 3 +- ...FrameTransformPersistentTasksExecutor.java | 6 + .../transforms/DataFrameTransformTask.java | 200 +++++++++++++----- .../test/data_frame/transforms_start_stop.yml | 12 +- .../test/data_frame/transforms_stats.yml | 13 +- 27 files changed, 644 insertions(+), 131 deletions(-) create mode 100644 client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformTaskState.java create mode 100644 x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformTaskState.java create mode 100644 x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformTaskStateTests.java create mode 100644 x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameTaskFailedStateIT.java diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformState.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformState.java index 4914f5eb4a8..fd191bb600c 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformState.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformState.java @@ -39,17 +39,25 @@ import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optiona public class DataFrameTransformState { - private static final ParseField STATE = new ParseField("transform_state"); + private static final ParseField INDEXER_STATE = new ParseField("indexer_state"); + private static final ParseField TASK_STATE = new ParseField("task_state"); private static final ParseField CURRENT_POSITION = new ParseField("current_position"); private static final ParseField GENERATION = new ParseField("generation"); @SuppressWarnings("unchecked") public static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>("data_frame_transform_state", - args -> new DataFrameTransformState((IndexerState) args[0], (HashMap) args[1], (long) args[2])); + args -> new DataFrameTransformState((DataFrameTransformTaskState) args[0], + (IndexerState) args[1], + (HashMap) args[2], + (long) args[3])); static { - PARSER.declareField(constructorArg(), p -> IndexerState.fromString(p.text()), STATE, ObjectParser.ValueType.STRING); + PARSER.declareField(constructorArg(), + p -> DataFrameTransformTaskState.fromString(p.text()), + TASK_STATE, + ObjectParser.ValueType.STRING); + PARSER.declareField(constructorArg(), p -> IndexerState.fromString(p.text()), INDEXER_STATE, ObjectParser.ValueType.STRING); PARSER.declareField(optionalConstructorArg(), p -> { if (p.currentToken() == XContentParser.Token.START_OBJECT) { return p.map(); @@ -66,18 +74,27 @@ public class DataFrameTransformState { return PARSER.parse(parser, null); } - private final IndexerState state; + private final DataFrameTransformTaskState taskState; + private final IndexerState indexerState; private final long generation; private final SortedMap currentPosition; - public DataFrameTransformState(IndexerState state, @Nullable Map position, long generation) { - this.state = state; + public DataFrameTransformState(DataFrameTransformTaskState taskState, + IndexerState indexerState, + @Nullable Map position, + long generation) { + this.taskState = taskState; + this.indexerState = indexerState; this.currentPosition = position == null ? null : Collections.unmodifiableSortedMap(new TreeMap<>(position)); this.generation = generation; } public IndexerState getIndexerState() { - return state; + return indexerState; + } + + public DataFrameTransformTaskState getTaskState() { + return taskState; } @Nullable @@ -101,12 +118,14 @@ public class DataFrameTransformState { DataFrameTransformState that = (DataFrameTransformState) other; - return Objects.equals(this.state, that.state) && Objects.equals(this.currentPosition, that.currentPosition) - && this.generation == that.generation; + return Objects.equals(this.taskState, that.taskState) && + Objects.equals(this.indexerState, that.indexerState) && + Objects.equals(this.currentPosition, that.currentPosition) && + this.generation == that.generation; } @Override public int hashCode() { - return Objects.hash(state, currentPosition, generation); + return Objects.hash(taskState, indexerState, currentPosition, generation); } } diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformTaskState.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformTaskState.java new file mode 100644 index 00000000000..7235a0aed28 --- /dev/null +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformTaskState.java @@ -0,0 +1,34 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.client.dataframe.transforms; + +import java.util.Locale; + +public enum DataFrameTransformTaskState { + STOPPED, STARTED, FAILED; + + public static DataFrameTransformTaskState fromString(String name) { + return valueOf(name.trim().toUpperCase(Locale.ROOT)); + } + + public String value() { + return name().toLowerCase(Locale.ROOT); + } +} diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/DataFrameTransformIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/DataFrameTransformIT.java index 649524b4329..25b09866e15 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/DataFrameTransformIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/DataFrameTransformIT.java @@ -39,6 +39,7 @@ import org.elasticsearch.client.dataframe.StopDataFrameTransformResponse; import org.elasticsearch.client.dataframe.transforms.DataFrameIndexerTransformStats; import org.elasticsearch.client.dataframe.transforms.DataFrameTransformConfig; import org.elasticsearch.client.dataframe.transforms.DataFrameTransformStateAndStats; +import org.elasticsearch.client.dataframe.transforms.DataFrameTransformTaskState; import org.elasticsearch.client.dataframe.transforms.DestConfig; import org.elasticsearch.client.dataframe.transforms.QueryConfig; import org.elasticsearch.client.dataframe.transforms.SourceConfig; @@ -67,6 +68,7 @@ import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.is; public class DataFrameTransformIT extends ESRestHighLevelClientTestCase { @@ -277,18 +279,23 @@ public class DataFrameTransformIT extends ESRestHighLevelClientTestCase { assertEquals(1, statsResponse.getTransformsStateAndStats().size()); DataFrameTransformStateAndStats stats = statsResponse.getTransformsStateAndStats().get(0); + assertEquals(DataFrameTransformTaskState.STOPPED, stats.getTransformState().getTaskState()); assertEquals(IndexerState.STOPPED, stats.getTransformState().getIndexerState()); DataFrameIndexerTransformStats zeroIndexerStats = new DataFrameIndexerTransformStats(0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L); assertEquals(zeroIndexerStats, stats.getTransformStats()); // start the transform - execute(new StartDataFrameTransformRequest(id), client::startDataFrameTransform, client::startDataFrameTransformAsync); + StartDataFrameTransformResponse startTransformResponse = execute(new StartDataFrameTransformRequest(id), + client::startDataFrameTransform, + client::startDataFrameTransformAsync); + assertThat(startTransformResponse.isStarted(), is(true)); assertBusy(() -> { GetDataFrameTransformStatsResponse response = execute(new GetDataFrameTransformStatsRequest(id), client::getDataFrameTransformStats, client::getDataFrameTransformStatsAsync); DataFrameTransformStateAndStats stateAndStats = response.getTransformsStateAndStats().get(0); assertEquals(IndexerState.STARTED, stateAndStats.getTransformState().getIndexerState()); + assertEquals(DataFrameTransformTaskState.STARTED, stateAndStats.getTransformState().getTaskState()); assertNotEquals(zeroIndexerStats, stateAndStats.getTransformStats()); }); } diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformStateTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformStateTests.java index babaab445e2..17dc3889481 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformStateTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformStateTests.java @@ -41,12 +41,16 @@ public class DataFrameTransformStateTests extends ESTestCase { } public static DataFrameTransformState randomDataFrameTransformState() { - return new DataFrameTransformState(randomFrom(IndexerState.values()), randomPositionMap(), randomLongBetween(0,10)); + return new DataFrameTransformState(randomFrom(DataFrameTransformTaskState.values()), + randomFrom(IndexerState.values()), + randomPositionMap(), + randomLongBetween(0,10)); } public static void toXContent(DataFrameTransformState state, XContentBuilder builder) throws IOException { builder.startObject(); - builder.field("transform_state", state.getIndexerState().value()); + builder.field("task_state", state.getTaskState().value()); + builder.field("indexer_state", state.getIndexerState().value()); if (state.getPosition() != null) { builder.field("current_position", state.getPosition()); } diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/DataFrameTransformDocumentationIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/DataFrameTransformDocumentationIT.java index 2a5651272f7..bf4940654ef 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/DataFrameTransformDocumentationIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/DataFrameTransformDocumentationIT.java @@ -39,6 +39,7 @@ import org.elasticsearch.client.dataframe.StopDataFrameTransformResponse; import org.elasticsearch.client.dataframe.transforms.DataFrameIndexerTransformStats; import org.elasticsearch.client.dataframe.transforms.DataFrameTransformConfig; import org.elasticsearch.client.dataframe.transforms.DataFrameTransformStateAndStats; +import org.elasticsearch.client.dataframe.transforms.DataFrameTransformTaskState; import org.elasticsearch.client.dataframe.transforms.DestConfig; import org.elasticsearch.client.dataframe.transforms.QueryConfig; import org.elasticsearch.client.dataframe.transforms.SourceConfig; @@ -466,13 +467,16 @@ public class DataFrameTransformDocumentationIT extends ESRestHighLevelClientTest // tag::get-data-frame-transform-stats-response DataFrameTransformStateAndStats stateAndStats = response.getTransformsStateAndStats().get(0); // <1> + DataFrameTransformTaskState taskState = + stateAndStats.getTransformState().getTaskState(); // <2> IndexerState indexerState = - stateAndStats.getTransformState().getIndexerState(); // <2> + stateAndStats.getTransformState().getIndexerState(); // <3> DataFrameIndexerTransformStats transformStats = - stateAndStats.getTransformStats(); // <3> + stateAndStats.getTransformStats(); // <4> // end::get-data-frame-transform-stats-response assertEquals(IndexerState.STOPPED, indexerState); + assertEquals(DataFrameTransformTaskState.STOPPED, taskState); assertNotNull(transformStats); } { diff --git a/docs/java-rest/high-level/dataframe/get_data_frame_stats.asciidoc b/docs/java-rest/high-level/dataframe/get_data_frame_stats.asciidoc index 5302125e490..2b377d22c81 100644 --- a/docs/java-rest/high-level/dataframe/get_data_frame_stats.asciidoc +++ b/docs/java-rest/high-level/dataframe/get_data_frame_stats.asciidoc @@ -35,5 +35,6 @@ The returned +{response}+ contains the requested {dataframe-transform} statistic include-tagged::{doc-tests-file}[{api}-response] -------------------------------------------------- <1> The response contains a list of `DataFrameTransformStateAndStats` objects -<2> The running state of the transform e.g `started` -<3> The transform progress statistics recording the number of documents indexed etc \ No newline at end of file +<2> The running state of the transform task e.g `started` +<3> The running state of the transform indexer e.g `started`, `indexing`, etc. +<4> The transform progress statistics recording the number of documents indexed etc \ No newline at end of file diff --git a/docs/reference/data-frames/apis/get-transform-stats.asciidoc b/docs/reference/data-frames/apis/get-transform-stats.asciidoc index f377f3d510c..badb1b665f6 100644 --- a/docs/reference/data-frames/apis/get-transform-stats.asciidoc +++ b/docs/reference/data-frames/apis/get-transform-stats.asciidoc @@ -63,7 +63,8 @@ The API returns the following results: { "id" : "ecommerce_transform", "state" : { - "transform_state" : "started", + "indexer_state" : "started", + "task_state": "started", "current_position" : { "customer_id" : "9" }, diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/DataFrameField.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/DataFrameField.java index ccfa2fa666f..73e639cec5e 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/DataFrameField.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/DataFrameField.java @@ -26,6 +26,7 @@ public final class DataFrameField { public static final ParseField INDEX_DOC_TYPE = new ParseField("doc_type"); public static final ParseField SOURCE = new ParseField("source"); public static final ParseField DESTINATION = new ParseField("dest"); + public static final ParseField FORCE = new ParseField("force"); // common strings public static final String TASK_NAME = "data_frame/transforms"; diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/action/StartDataFrameTransformAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/action/StartDataFrameTransformAction.java index 0ac94b6c6aa..161c4d7d258 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/action/StartDataFrameTransformAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/action/StartDataFrameTransformAction.java @@ -42,9 +42,11 @@ public class StartDataFrameTransformAction extends Action implements ToXContent { private String id; + private boolean force; - public Request(String id) { + public Request(String id, boolean force) { this.id = ExceptionsHelper.requireNonNull(id, DataFrameField.ID.getPreferredName()); + this.force = force; } public Request() { @@ -59,6 +61,10 @@ public class StartDataFrameTransformAction extends Action implements ToXContent { private String id; private final boolean waitForCompletion; + private final boolean force; - public Request(String id, boolean waitForCompletion, @Nullable TimeValue timeout) { + public Request(String id, boolean waitForCompletion, boolean force, @Nullable TimeValue timeout) { this.id = ExceptionsHelper.requireNonNull(id, DataFrameField.ID.getPreferredName()); this.waitForCompletion = waitForCompletion; + this.force = force; // use the timeout value already present in BaseTasksRequest this.setTimeout(timeout == null ? DEFAULT_TIMEOUT : timeout); } private Request() { - this(null, false, null); + this(null, false, false, null); } public Request(StreamInput in) throws IOException { super(in); id = in.readString(); waitForCompletion = in.readBoolean(); + force = in.readBoolean(); } public String getId() { @@ -78,11 +81,16 @@ public class StopDataFrameTransformAction extends Action currentPosition; + @Nullable + private final String reason; - private static final ParseField STATE = new ParseField("transform_state"); + private static final ParseField TASK_STATE = new ParseField("task_state"); + private static final ParseField INDEXER_STATE = new ParseField("indexer_state"); private static final ParseField CURRENT_POSITION = new ParseField("current_position"); private static final ParseField GENERATION = new ParseField("generation"); + private static final ParseField REASON = new ParseField("reason"); @SuppressWarnings("unchecked") public static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>(NAME, - args -> new DataFrameTransformState((IndexerState) args[0], (HashMap) args[1], (long) args[2])); + true, + args -> new DataFrameTransformState((DataFrameTransformTaskState) args[0], + (IndexerState) args[1], + (Map) args[2], + (long) args[3], + (String) args[4])); static { + PARSER.declareField(constructorArg(), p -> { + if (p.currentToken() == XContentParser.Token.VALUE_STRING) { + return DataFrameTransformTaskState.fromString(p.text()); + } + throw new IllegalArgumentException("Unsupported token [" + p.currentToken() + "]"); + }, TASK_STATE, ObjectParser.ValueType.STRING); PARSER.declareField(constructorArg(), p -> { if (p.currentToken() == XContentParser.Token.VALUE_STRING) { return IndexerState.fromString(p.text()); } throw new IllegalArgumentException("Unsupported token [" + p.currentToken() + "]"); - }, STATE, ObjectParser.ValueType.STRING); + }, INDEXER_STATE, ObjectParser.ValueType.STRING); PARSER.declareField(optionalConstructorArg(), p -> { if (p.currentToken() == XContentParser.Token.START_OBJECT) { return p.map(); @@ -64,23 +80,36 @@ public class DataFrameTransformState implements Task.Status, PersistentTaskState } throw new IllegalArgumentException("Unsupported token [" + p.currentToken() + "]"); }, CURRENT_POSITION, ObjectParser.ValueType.VALUE_OBJECT_ARRAY); - PARSER.declareLong(ConstructingObjectParser.optionalConstructorArg(), GENERATION); + PARSER.declareLong(constructorArg(), GENERATION); + PARSER.declareString(optionalConstructorArg(), REASON); } - public DataFrameTransformState(IndexerState state, @Nullable Map position, long generation) { - this.state = state; + public DataFrameTransformState(DataFrameTransformTaskState taskState, + IndexerState indexerState, + @Nullable Map position, + long generation, + @Nullable String reason) { + this.taskState = taskState; + this.indexerState = indexerState; this.currentPosition = position == null ? null : Collections.unmodifiableSortedMap(new TreeMap<>(position)); this.generation = generation; + this.reason = reason; } public DataFrameTransformState(StreamInput in) throws IOException { - state = IndexerState.fromStream(in); + taskState = DataFrameTransformTaskState.fromStream(in); + indexerState = IndexerState.fromStream(in); currentPosition = in.readBoolean() ? Collections.unmodifiableSortedMap(new TreeMap<>(in.readMap())) : null; generation = in.readLong(); + reason = in.readOptionalString(); + } + + public DataFrameTransformTaskState getTaskState() { + return taskState; } public IndexerState getIndexerState() { - return state; + return indexerState; } public Map getPosition() { @@ -91,6 +120,10 @@ public class DataFrameTransformState implements Task.Status, PersistentTaskState return generation; } + public String getReason() { + return reason; + } + public static DataFrameTransformState fromXContent(XContentParser parser) { try { return PARSER.parse(parser, null); @@ -102,11 +135,15 @@ public class DataFrameTransformState implements Task.Status, PersistentTaskState @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); - builder.field(STATE.getPreferredName(), state.value()); + builder.field(TASK_STATE.getPreferredName(), taskState.value()); + builder.field(INDEXER_STATE.getPreferredName(), indexerState.value()); if (currentPosition != null) { builder.field(CURRENT_POSITION.getPreferredName(), currentPosition); } builder.field(GENERATION.getPreferredName(), generation); + if (reason != null) { + builder.field(REASON.getPreferredName(), reason); + } builder.endObject(); return builder; } @@ -118,12 +155,14 @@ public class DataFrameTransformState implements Task.Status, PersistentTaskState @Override public void writeTo(StreamOutput out) throws IOException { - state.writeTo(out); + taskState.writeTo(out); + indexerState.writeTo(out); out.writeBoolean(currentPosition != null); if (currentPosition != null) { out.writeMap(currentPosition); } out.writeLong(generation); + out.writeOptionalString(reason); } @Override @@ -138,12 +177,20 @@ public class DataFrameTransformState implements Task.Status, PersistentTaskState DataFrameTransformState that = (DataFrameTransformState) other; - return Objects.equals(this.state, that.state) && Objects.equals(this.currentPosition, that.currentPosition) - && this.generation == that.generation; + return Objects.equals(this.taskState, that.taskState) && + Objects.equals(this.indexerState, that.indexerState) && + Objects.equals(this.currentPosition, that.currentPosition) && + this.generation == that.generation && + Objects.equals(this.reason, that.reason); } @Override public int hashCode() { - return Objects.hash(state, currentPosition, generation); + return Objects.hash(taskState, indexerState, currentPosition, generation, reason); } -} \ No newline at end of file + + @Override + public String toString() { + return Strings.toString(this); + } +} diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformStateAndStats.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformStateAndStats.java index 116ad482d01..e155998aa2e 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformStateAndStats.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformStateAndStats.java @@ -41,7 +41,7 @@ public class DataFrameTransformStateAndStats implements Writeable, ToXContentObj public static DataFrameTransformStateAndStats initialStateAndStats(String id) { return new DataFrameTransformStateAndStats(id, - new DataFrameTransformState(IndexerState.STOPPED, null, 0), + new DataFrameTransformState(DataFrameTransformTaskState.STOPPED, IndexerState.STOPPED, null, 0L, null), new DataFrameIndexerTransformStats()); } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformTaskState.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformTaskState.java new file mode 100644 index 00000000000..795daca61ac --- /dev/null +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformTaskState.java @@ -0,0 +1,36 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.core.dataframe.transforms; + +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; + +import java.io.IOException; +import java.util.Locale; + +public enum DataFrameTransformTaskState implements Writeable { + STOPPED, STARTED, FAILED; + + public static DataFrameTransformTaskState fromString(String name) { + return valueOf(name.trim().toUpperCase(Locale.ROOT)); + } + + public static DataFrameTransformTaskState fromStream(StreamInput in) throws IOException { + return in.readEnum(DataFrameTransformTaskState.class); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + DataFrameTransformTaskState state = this; + out.writeEnum(state); + } + + public String value() { + return name().toLowerCase(Locale.ROOT); + } +} diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/action/StartDataFrameTransformActionRequestTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/action/StartDataFrameTransformActionRequestTests.java index 976db70c45f..6220a08fb10 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/action/StartDataFrameTransformActionRequestTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/action/StartDataFrameTransformActionRequestTests.java @@ -13,7 +13,7 @@ import org.elasticsearch.xpack.core.dataframe.action.StartDataFrameTransformActi public class StartDataFrameTransformActionRequestTests extends AbstractWireSerializingTestCase { @Override protected Request createTestInstance() { - return new Request(randomAlphaOfLengthBetween(1, 20)); + return new Request(randomAlphaOfLengthBetween(1, 20), randomBoolean()); } @Override diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/action/StopDataFrameTransformActionRequestTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/action/StopDataFrameTransformActionRequestTests.java index 767725a564a..c117e249aef 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/action/StopDataFrameTransformActionRequestTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/action/StopDataFrameTransformActionRequestTests.java @@ -16,7 +16,7 @@ public class StopDataFrameTransformActionRequestTests extends AbstractWireSerial @Override protected Request createTestInstance() { TimeValue timeout = randomBoolean() ? TimeValue.timeValueMinutes(randomIntBetween(1, 10)) : null; - return new Request(randomAlphaOfLengthBetween(1, 10), randomBoolean(), timeout); + return new Request(randomAlphaOfLengthBetween(1, 10), randomBoolean(), randomBoolean(), timeout); } @Override @@ -27,9 +27,10 @@ public class StopDataFrameTransformActionRequestTests extends AbstractWireSerial public void testSameButDifferentTimeout() { String id = randomAlphaOfLengthBetween(1, 10); boolean waitForCompletion = randomBoolean(); + boolean force = randomBoolean(); - Request r1 = new Request(id, waitForCompletion, TimeValue.timeValueSeconds(10)); - Request r2 = new Request(id, waitForCompletion, TimeValue.timeValueSeconds(20)); + Request r1 = new Request(id, waitForCompletion, force, TimeValue.timeValueSeconds(10)); + Request r2 = new Request(id, waitForCompletion, force, TimeValue.timeValueSeconds(20)); assertNotEquals(r1,r2); assertNotEquals(r1.hashCode(),r2.hashCode()); diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformStateTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformStateTests.java index 1f8eac1f5ba..341faafdf12 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformStateTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformStateTests.java @@ -14,11 +14,16 @@ import org.elasticsearch.xpack.core.indexing.IndexerState; import java.io.IOException; import java.util.HashMap; import java.util.Map; +import java.util.function.Predicate; public class DataFrameTransformStateTests extends AbstractSerializingTestCase { public static DataFrameTransformState randomDataFrameTransformState() { - return new DataFrameTransformState(randomFrom(IndexerState.values()), randomPosition(), randomLongBetween(0,10)); + return new DataFrameTransformState(randomFrom(DataFrameTransformTaskState.values()), + randomFrom(IndexerState.values()), + randomPosition(), + randomLongBetween(0,10), + randomBoolean() ? null : randomAlphaOfLength(10)); } @Override @@ -53,4 +58,14 @@ public class DataFrameTransformStateTests extends AbstractSerializingTestCase getRandomFieldsExcludeFilter() { + return field -> !field.isEmpty(); + } } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformTaskStateTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformTaskStateTests.java new file mode 100644 index 00000000000..62c73846f59 --- /dev/null +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformTaskStateTests.java @@ -0,0 +1,81 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.core.dataframe.transforms; + +import org.elasticsearch.common.io.stream.BytesStreamOutput; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.test.ESTestCase; + +import java.io.IOException; + +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; + +public class DataFrameTransformTaskStateTests extends ESTestCase { + + public void testValidOrdinals() { + assertThat(DataFrameTransformTaskState.STOPPED.ordinal(), equalTo(0)); + assertThat(DataFrameTransformTaskState.STARTED.ordinal(), equalTo(1)); + assertThat(DataFrameTransformTaskState.FAILED.ordinal(), equalTo(2)); + } + + public void testwriteTo() throws Exception { + try (BytesStreamOutput out = new BytesStreamOutput()) { + DataFrameTransformTaskState.STOPPED.writeTo(out); + try (StreamInput in = out.bytes().streamInput()) { + assertThat(in.readVInt(), equalTo(0)); + } + } + + try (BytesStreamOutput out = new BytesStreamOutput()) { + DataFrameTransformTaskState.STARTED.writeTo(out); + try (StreamInput in = out.bytes().streamInput()) { + assertThat(in.readVInt(), equalTo(1)); + } + } + + try (BytesStreamOutput out = new BytesStreamOutput()) { + DataFrameTransformTaskState.FAILED.writeTo(out); + try (StreamInput in = out.bytes().streamInput()) { + assertThat(in.readVInt(), equalTo(2)); + } + } + } + + public void testReadFrom() throws Exception { + try (BytesStreamOutput out = new BytesStreamOutput()) { + out.writeVInt(0); + try (StreamInput in = out.bytes().streamInput()) { + assertThat(DataFrameTransformTaskState.fromStream(in), equalTo(DataFrameTransformTaskState.STOPPED)); + } + } + try (BytesStreamOutput out = new BytesStreamOutput()) { + out.writeVInt(1); + try (StreamInput in = out.bytes().streamInput()) { + assertThat(DataFrameTransformTaskState.fromStream(in), equalTo(DataFrameTransformTaskState.STARTED)); + } + } + try (BytesStreamOutput out = new BytesStreamOutput()) { + out.writeVInt(2); + try (StreamInput in = out.bytes().streamInput()) { + assertThat(DataFrameTransformTaskState.fromStream(in), equalTo(DataFrameTransformTaskState.FAILED)); + } + } + } + + public void testInvalidReadFrom() throws Exception { + try (BytesStreamOutput out = new BytesStreamOutput()) { + out.writeVInt(randomIntBetween(3, Integer.MAX_VALUE)); + try (StreamInput in = out.bytes().streamInput()) { + DataFrameTransformTaskState.fromStream(in); + fail("Expected IOException"); + } catch(IOException e) { + assertThat(e.getMessage(), containsString("Unknown DataFrameTransformTaskState ordinal [")); + } + + } + } +} diff --git a/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameRestTestCase.java b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameRestTestCase.java index 2e1817e4e90..6b9300916a9 100644 --- a/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameRestTestCase.java +++ b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameRestTestCase.java @@ -167,17 +167,34 @@ public abstract class DataFrameRestTestCase extends ESRestTestCase { assertTrue(indexExists(dataFrameIndex)); } + protected void startDataframeTransform(String transformId, boolean force) throws IOException { + startDataframeTransform(transformId, force, null); + } + + protected void startDataframeTransform(String transformId, boolean force, String authHeader) throws IOException { + // start the transform + final Request startTransformRequest = createRequestWithAuth("POST", DATAFRAME_ENDPOINT + transformId + "/_start", authHeader); + startTransformRequest.addParameter(DataFrameField.FORCE.getPreferredName(), Boolean.toString(force)); + Map startTransformResponse = entityAsMap(client().performRequest(startTransformRequest)); + assertThat(startTransformResponse.get("started"), equalTo(Boolean.TRUE)); + } + + protected void stopDataFrameTransform(String transformId, boolean force) throws Exception { + // start the transform + final Request stopTransformRequest = createRequestWithAuth("POST", DATAFRAME_ENDPOINT + transformId + "/_stop", null); + stopTransformRequest.addParameter(DataFrameField.FORCE.getPreferredName(), Boolean.toString(force)); + stopTransformRequest.addParameter(DataFrameField.WAIT_FOR_COMPLETION.getPreferredName(), Boolean.toString(true)); + Map stopTransformResponse = entityAsMap(client().performRequest(stopTransformRequest)); + assertThat(stopTransformResponse.get("stopped"), equalTo(Boolean.TRUE)); + } + protected void startAndWaitForTransform(String transformId, String dataFrameIndex) throws Exception { startAndWaitForTransform(transformId, dataFrameIndex, null); } protected void startAndWaitForTransform(String transformId, String dataFrameIndex, String authHeader) throws Exception { // start the transform - final Request startTransformRequest = createRequestWithAuth("POST", DATAFRAME_ENDPOINT + transformId + "/_start", authHeader); - - Map startTransformResponse = entityAsMap(client().performRequest(startTransformRequest)); - assertThat(startTransformResponse.get("started"), equalTo(Boolean.TRUE)); - + startDataframeTransform(transformId, false, authHeader); // wait until the dataframe has been created and all data is available waitForDataFrameGeneration(transformId); refreshIndex(dataFrameIndex); @@ -216,13 +233,29 @@ public abstract class DataFrameRestTestCase extends ESRestTestCase { } protected static String getDataFrameIndexerState(String transformId) throws IOException { + Map transformStatsAsMap = getDataFrameState(transformId); + return transformStatsAsMap == null ? null : + (String) XContentMapValues.extractValue("state.indexer_state", transformStatsAsMap); + } + + protected static String getDataFrameTaskState(String transformId) throws IOException { + Map transformStatsAsMap = getDataFrameState(transformId); + return transformStatsAsMap == null ? null : (String) XContentMapValues.extractValue("state.task_state", transformStatsAsMap); + } + + protected static Map getDataFrameState(String transformId) throws IOException { Response statsResponse = client().performRequest(new Request("GET", DATAFRAME_ENDPOINT + transformId + "/_stats")); List transforms = ((List) entityAsMap(statsResponse).get("transforms")); if (transforms.isEmpty()) { return null; } - Map transformStatsAsMap = (Map) transforms.get(0); - return (String) XContentMapValues.extractValue("state.transform_state", transformStatsAsMap); + return (Map) transforms.get(0); + } + + protected static void deleteDataFrameTransform(String transformId) throws IOException { + Request request = new Request("DELETE", DATAFRAME_ENDPOINT + transformId); + request.addParameter("ignore", "404"); // Ignore 404s because they imply someone was racing us to delete this + adminClient().performRequest(request); } @AfterClass @@ -251,9 +284,7 @@ public abstract class DataFrameRestTestCase extends ESRestTestCase { for (Map transformConfig : transformConfigs) { String transformId = (String) transformConfig.get("id"); - Request request = new Request("DELETE", DATAFRAME_ENDPOINT + transformId); - request.addParameter("ignore", "404"); // Ignore 404s because they imply someone was racing us to delete this - adminClient().performRequest(request); + deleteDataFrameTransform(transformId); } // transforms should be all gone diff --git a/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameTaskFailedStateIT.java b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameTaskFailedStateIT.java new file mode 100644 index 00000000000..887279ef20f --- /dev/null +++ b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameTaskFailedStateIT.java @@ -0,0 +1,94 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.dataframe.integration; + +import org.elasticsearch.client.ResponseException; +import org.elasticsearch.common.xcontent.support.XContentMapValues; +import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformTaskState; + +import java.io.IOException; +import java.util.List; +import java.util.Map; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.nullValue; +import static org.hamcrest.Matchers.equalTo; + +public class DataFrameTaskFailedStateIT extends DataFrameRestTestCase { + + public void testFailureStateInteraction() throws Exception { + createReviewsIndex(); + String transformId = "failure_pivot_1"; + String dataFrameIndex = "failure_pivot_reviews"; + createPivotReviewsTransform(transformId, dataFrameIndex, null); + deleteIndex(REVIEWS_INDEX_NAME); // trigger start failure due to index missing + startDataframeTransform(transformId, false); + awaitState(transformId, DataFrameTransformTaskState.FAILED); + Map fullState = getDataFrameState(transformId); + + // Verify we have failed for the expected reason + assertThat(XContentMapValues.extractValue("state.reason", fullState), + equalTo("task encountered irrecoverable failure: no such index [reviews]")); + assertThat(XContentMapValues.extractValue("state.indexer_state", fullState), equalTo("started")); + + // Verify that we cannot stop or start the transform when the task is in a failed state + ResponseException ex = expectThrows(ResponseException.class, () -> stopDataFrameTransform(transformId, false)); + assertThat(ex.getResponse().getStatusLine().getStatusCode(), equalTo(RestStatus.CONFLICT.getStatus())); + assertThat(XContentMapValues.extractValue("error.reason", entityAsMap(ex.getResponse())), + equalTo("Unable to stop data frame transform [failure_pivot_1] as it is in a failed state with reason: [" + + "task encountered irrecoverable failure: no such index [reviews]]. Use force stop to stop the data frame transform.")); + + ex = expectThrows(ResponseException.class, () -> startDataframeTransform(transformId, false)); + assertThat(ex.getResponse().getStatusLine().getStatusCode(), equalTo(RestStatus.CONFLICT.getStatus())); + assertThat(XContentMapValues.extractValue("error.reason", entityAsMap(ex.getResponse())), + equalTo("Unable to start data frame transform [failure_pivot_1] as it is in a failed state with failure: [" + + "task encountered irrecoverable failure: no such index [reviews]]. " + + "Use force start to restart data frame transform once error is resolved.")); + + // Correct the failure by creating the reviews index again + createReviewsIndex(); + // Force start the data frame to indicate failure correction + startDataframeTransform(transformId, true); + // Wait for data to be indexed appropriately and refresh for search + waitForDataFrameGeneration(transformId); + refreshIndex(dataFrameIndex); + + // Verify that we have started and that our reason is cleared + fullState = getDataFrameState(transformId); + assertThat(XContentMapValues.extractValue("state.reason", fullState), is(nullValue())); + assertThat(XContentMapValues.extractValue("state.task_state", fullState), equalTo("started")); + assertThat(XContentMapValues.extractValue("state.indexer_state", fullState), equalTo("started")); + assertThat(XContentMapValues.extractValue("stats.search_failures", fullState), equalTo(1)); + + // get and check some users to verify we restarted + assertOnePivotValue(dataFrameIndex + "/_search?q=reviewer:user_0", 3.776978417); + assertOnePivotValue(dataFrameIndex + "/_search?q=reviewer:user_5", 3.72); + assertOnePivotValue(dataFrameIndex + "/_search?q=reviewer:user_11", 3.846153846); + assertOnePivotValue(dataFrameIndex + "/_search?q=reviewer:user_20", 3.769230769); + assertOnePivotValue(dataFrameIndex + "/_search?q=reviewer:user_26", 3.918918918); + + + stopDataFrameTransform(transformId, true); + deleteDataFrameTransform(transformId); + } + + private void awaitState(String transformId, DataFrameTransformTaskState state) throws Exception { + assertBusy(() -> { + String currentState = getDataFrameTaskState(transformId); + assertThat(state.value(), equalTo(currentState)); + }); + } + + private void assertOnePivotValue(String query, double expected) throws IOException { + Map searchResult = getAsMap(query); + + assertEquals(1, XContentMapValues.extractValue("hits.total.value", searchResult)); + double actual = (Double) ((List) XContentMapValues.extractValue("hits.hits._source.avg_rating", searchResult)).get(0); + assertEquals(expected, actual, 0.000001); + } +} diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStartDataFrameTransformAction.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStartDataFrameTransformAction.java index 7fa19fa50e8..a6f52f22da4 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStartDataFrameTransformAction.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStartDataFrameTransformAction.java @@ -33,6 +33,8 @@ import org.elasticsearch.xpack.core.dataframe.action.StartDataFrameTransformActi import org.elasticsearch.xpack.core.dataframe.action.StartDataFrameTransformTaskAction; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransform; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig; +import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformState; +import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformTaskState; import org.elasticsearch.xpack.dataframe.persistence.DataFrameTransformsConfigManager; import java.util.Collection; @@ -94,11 +96,7 @@ public class TransportStartDataFrameTransformAction extends new StartDataFrameTransformTaskAction.Request(request.getId()), ActionListener.wrap( r -> listener.onResponse(new StartDataFrameTransformAction.Response(true)), - startingFailure -> cancelDataFrameTask(task.getId(), - transformTask.getId(), - startingFailure, - listener::onFailure) - )), + listener::onFailure)), listener::onFailure)); }, listener::onFailure @@ -122,7 +120,21 @@ public class TransportStartDataFrameTransformAction extends transformTask, persistentTaskActionListener); } else { - persistentTaskActionListener.onResponse(existingTask); + DataFrameTransformState transformState = (DataFrameTransformState)existingTask.getState(); + if(transformState.getTaskState() == DataFrameTransformTaskState.FAILED && request.isForce() == false) { + listener.onFailure(new ElasticsearchStatusException( + "Unable to start data frame transform [" + config.getId() + + "] as it is in a failed state with failure: [" + transformState.getReason() + + "]. Use force start to restart data frame transform once error is resolved.", + RestStatus.CONFLICT)); + } else if (transformState.getTaskState() != DataFrameTransformTaskState.STOPPED && + transformState.getTaskState() != DataFrameTransformTaskState.FAILED) { + listener.onFailure(new ElasticsearchStatusException( + "Unable to start data frame transform [" + config.getId() + + "] as it is in state [" + transformState.getTaskState() + "]", RestStatus.CONFLICT)); + } else { + persistentTaskActionListener.onResponse(existingTask); + } } }, listener::onFailure diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStopDataFrameTransformAction.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStopDataFrameTransformAction.java index 78e8425758f..2234226a501 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStopDataFrameTransformAction.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStopDataFrameTransformAction.java @@ -6,6 +6,7 @@ package org.elasticsearch.xpack.dataframe.action; import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.ElasticsearchTimeoutException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.FailedNodeException; @@ -15,11 +16,13 @@ import org.elasticsearch.action.support.tasks.TransportTasksAction; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.rest.RestStatus; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.core.dataframe.DataFrameMessages; import org.elasticsearch.xpack.core.dataframe.action.StopDataFrameTransformAction; +import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformTaskState; import org.elasticsearch.xpack.dataframe.persistence.DataFrameTransformsConfigManager; import org.elasticsearch.xpack.dataframe.transforms.DataFrameTransformTask; @@ -60,6 +63,14 @@ public class TransportStopDataFrameTransformAction extends protected void taskOperation(StopDataFrameTransformAction.Request request, DataFrameTransformTask transformTask, ActionListener listener) { if (transformTask.getTransformId().equals(request.getId())) { + if (transformTask.getState().getTaskState() == DataFrameTransformTaskState.FAILED && request.isForce() == false) { + listener.onFailure( + new ElasticsearchStatusException("Unable to stop data frame transform [" + request.getId() + + "] as it is in a failed state with reason: [" + transformTask.getState().getReason() + + "]. Use force stop to stop the data frame transform.", + RestStatus.CONFLICT)); + return; + } if (request.waitForCompletion() == false) { transformTask.stop(listener); } else { diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/rest/action/RestStartDataFrameTransformAction.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/rest/action/RestStartDataFrameTransformAction.java index 76bf5c7230b..1d9b3f29a61 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/rest/action/RestStartDataFrameTransformAction.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/rest/action/RestStartDataFrameTransformAction.java @@ -28,7 +28,8 @@ public class RestStartDataFrameTransformAction extends BaseRestHandler { @Override protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException { String id = restRequest.param(DataFrameField.ID.getPreferredName()); - StartDataFrameTransformAction.Request request = new StartDataFrameTransformAction.Request(id); + boolean force = restRequest.paramAsBoolean(DataFrameField.FORCE.getPreferredName(), false); + StartDataFrameTransformAction.Request request = new StartDataFrameTransformAction.Request(id, force); request.timeout(restRequest.paramAsTime(DataFrameField.TIMEOUT.getPreferredName(), AcknowledgedRequest.DEFAULT_ACK_TIMEOUT)); return channel -> client.execute(StartDataFrameTransformAction.INSTANCE, request, new RestToXContentListener<>(channel)); } diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/rest/action/RestStopDataFrameTransformAction.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/rest/action/RestStopDataFrameTransformAction.java index 88037318124..e93898b905b 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/rest/action/RestStopDataFrameTransformAction.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/rest/action/RestStopDataFrameTransformAction.java @@ -30,8 +30,9 @@ public class RestStopDataFrameTransformAction extends BaseRestHandler { TimeValue timeout = restRequest.paramAsTime(DataFrameField.TIMEOUT.getPreferredName(), StopDataFrameTransformAction.DEFAULT_TIMEOUT); boolean waitForCompletion = restRequest.paramAsBoolean(DataFrameField.WAIT_FOR_COMPLETION.getPreferredName(), false); + boolean force = restRequest.paramAsBoolean(DataFrameField.FORCE.getPreferredName(), false); - StopDataFrameTransformAction.Request request = new StopDataFrameTransformAction.Request(id, waitForCompletion, timeout); + StopDataFrameTransformAction.Request request = new StopDataFrameTransformAction.Request(id, waitForCompletion, force, timeout); return channel -> client.execute(StopDataFrameTransformAction.INSTANCE, request, new RestToXContentListener<>(channel)); } diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformPersistentTasksExecutor.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformPersistentTasksExecutor.java index bac7ac449a9..d53354db2aa 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformPersistentTasksExecutor.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformPersistentTasksExecutor.java @@ -21,6 +21,7 @@ import org.elasticsearch.xpack.core.dataframe.DataFrameField; import org.elasticsearch.xpack.core.dataframe.notifications.DataFrameAuditMessage; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransform; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformState; +import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformTaskState; import org.elasticsearch.xpack.core.scheduler.SchedulerEngine; import org.elasticsearch.xpack.dataframe.DataFrame; import org.elasticsearch.xpack.dataframe.checkpoint.DataFrameTransformsCheckpointService; @@ -60,6 +61,11 @@ public class DataFrameTransformPersistentTasksExecutor extends PersistentTasksEx SchedulerEngine.Job schedulerJob = new SchedulerEngine.Job( DataFrameTransformTask.SCHEDULE_NAME + "_" + params.getId(), next()); + DataFrameTransformState transformState = (DataFrameTransformState) state; + if (transformState != null && transformState.getTaskState() == DataFrameTransformTaskState.FAILED) { + logger.warn("Tried to start failed transform [" + params.getId() + "] failure reason: " + transformState.getReason()); + return; + } // Note that while the task is added to the scheduler here, the internal state will prevent // it from doing any work until the task is "started" via the StartTransform api schedulerEngine.register(buildTask); diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java index b57ca56b824..b8bc2870307 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java @@ -18,7 +18,9 @@ import org.elasticsearch.action.search.SearchAction; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.client.Client; +import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.persistent.AllocatedPersistentTask; +import org.elasticsearch.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.ClientHelper; @@ -33,6 +35,7 @@ import org.elasticsearch.xpack.core.dataframe.action.StopDataFrameTransformActio import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameIndexerTransformStats; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformState; +import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformTaskState; import org.elasticsearch.xpack.core.indexing.IndexerState; import org.elasticsearch.xpack.core.scheduler.SchedulerEngine; import org.elasticsearch.xpack.core.scheduler.SchedulerEngine.Event; @@ -40,14 +43,20 @@ import org.elasticsearch.xpack.dataframe.checkpoint.DataFrameTransformsCheckpoin import org.elasticsearch.xpack.dataframe.persistence.DataFrameTransformsConfigManager; import org.elasticsearch.xpack.dataframe.transforms.pivot.SchemaUtil; +import java.util.Arrays; import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; + public class DataFrameTransformTask extends AllocatedPersistentTask implements SchedulerEngine.Listener { private static final Logger logger = LogManager.getLogger(DataFrameTransformTask.class); + // TODO consider moving to dynamic cluster setting + private static final int MAX_CONTINUOUS_FAILURES = 10; + private static final IndexerState[] RUNNING_STATES = new IndexerState[]{IndexerState.STARTED, IndexerState.INDEXING}; public static final String SCHEDULE_NAME = DataFrameField.TASK_NAME + "/schedule"; private final DataFrameTransform transform; @@ -56,10 +65,13 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S private final DataFrameIndexer indexer; private final Auditor auditor; + private final AtomicReference taskState; + private final AtomicReference stateReason; // the generation of this data frame, for v1 there will be only // 0: data frame not created or still indexing // 1: data frame complete, all data has been indexed private final AtomicReference generation; + private final AtomicInteger failureCount; public DataFrameTransformTask(long id, String type, String action, TaskId parentTask, DataFrameTransform transform, DataFrameTransformState state, Client client, DataFrameTransformsConfigManager transformsConfigManager, @@ -72,10 +84,14 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S this.threadPool = threadPool; this.auditor = auditor; IndexerState initialState = IndexerState.STOPPED; + DataFrameTransformTaskState initialTaskState = DataFrameTransformTaskState.STOPPED; + String initialReason = null; long initialGeneration = 0; Map initialPosition = null; logger.info("[{}] init, got state: [{}]", transform.getId(), state != null); if (state != null) { + initialTaskState = state.getTaskState(); + initialReason = state.getReason(); final IndexerState existingState = state.getIndexerState(); logger.info("[{}] Loading existing state: [{}], position [{}]", transform.getId(), existingState, state.getPosition()); if (existingState.equals(IndexerState.INDEXING)) { @@ -93,7 +109,10 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S this.indexer = new ClientDataFrameIndexer(transform.getId(), transformsConfigManager, transformsCheckpointService, new AtomicReference<>(initialState), initialPosition, client, auditor); - this.generation = new AtomicReference(initialGeneration); + this.generation = new AtomicReference<>(initialGeneration); + this.taskState = new AtomicReference<>(initialTaskState); + this.stateReason = new AtomicReference<>(initialReason); + this.failureCount = new AtomicInteger(0); } public String getTransformId() { @@ -109,7 +128,7 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S } public DataFrameTransformState getState() { - return new DataFrameTransformState(indexer.getState(), indexer.getPosition(), generation.get()); + return new DataFrameTransformState(taskState.get(), indexer.getState(), indexer.getPosition(), generation.get(), stateReason.get()); } public DataFrameIndexerTransformStats getStats() { @@ -125,66 +144,71 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S } public synchronized void start(ActionListener listener) { - final IndexerState prevState = indexer.getState(); - if (prevState != IndexerState.STOPPED) { - // fails if the task is not STOPPED - listener.onFailure(new ElasticsearchException("Cannot start task for data frame transform [{}], because state was [{}]", - transform.getId(), prevState)); - return; - } - final IndexerState newState = indexer.start(); - if (newState != IndexerState.STARTED) { + if (Arrays.stream(RUNNING_STATES).noneMatch(newState::equals)) { listener.onFailure(new ElasticsearchException("Cannot start task for data frame transform [{}], because state was [{}]", transform.getId(), newState)); return; } + stateReason.set(null); + taskState.set(DataFrameTransformTaskState.STARTED); + failureCount.set(0); - final DataFrameTransformState state = new DataFrameTransformState(IndexerState.STOPPED, indexer.getPosition(), generation.get()); + final DataFrameTransformState state = new DataFrameTransformState( + DataFrameTransformTaskState.STARTED, + IndexerState.STOPPED, + indexer.getPosition(), + generation.get(), + null); - logger.debug("Updating state for data frame transform [{}] to [{}][{}]", transform.getId(), state.getIndexerState(), - state.getPosition()); - updatePersistentTaskState(state, - ActionListener.wrap( - (task) -> { - auditor.info(transform.getId(), "Updated state to [" + state.getIndexerState() + "]"); - logger.debug("Successfully updated state for data frame transform [" + transform.getId() + "] to [" - + state.getIndexerState() + "][" + state.getPosition() + "]"); - listener.onResponse(new StartDataFrameTransformTaskAction.Response(true)); - }, (exc) -> { - // We were unable to update the persistent status, so we need to shutdown the indexer too. - indexer.stop(); - listener.onFailure(new ElasticsearchException("Error while updating state for data frame transform [" + logger.info("Updating state for data frame transform [{}] to [{}]", transform.getId(), state.toString()); + persistStateToClusterState(state, ActionListener.wrap( + task -> { + auditor.info(transform.getId(), "Updated state to [" + state.getTaskState() + "]"); + listener.onResponse(new StartDataFrameTransformTaskAction.Response(true)); + }, + exc -> { + indexer.stop(); + listener.onFailure(new ElasticsearchException("Error while updating state for data frame transform [" + transform.getId() + "] to [" + state.getIndexerState() + "].", exc)); - }) - ); + } + )); } public synchronized void stop(ActionListener listener) { + // taskState is initialized as STOPPED and is updated in tandem with the indexerState + // Consequently, if it is STOPPED, we consider the whole task STOPPED. + if (taskState.get() == DataFrameTransformTaskState.STOPPED) { + listener.onResponse(new StopDataFrameTransformAction.Response(true)); + return; + } final IndexerState newState = indexer.stop(); switch (newState) { case STOPPED: - listener.onResponse(new StopDataFrameTransformAction.Response(true)); - break; - + // Fall through to `STOPPING` as the behavior is the same for both, we should persist for both case STOPPING: // update the persistent state to STOPPED. There are two scenarios and both are safe: // 1. we persist STOPPED now, indexer continues a bit then sees the flag and checkpoints another STOPPED with the more recent // position. // 2. we persist STOPPED now, indexer continues a bit but then dies. When/if we resume we'll pick up at last checkpoint, // overwrite some docs and eventually checkpoint. - DataFrameTransformState state = new DataFrameTransformState(IndexerState.STOPPED, indexer.getPosition(), generation.get()); - updatePersistentTaskState(state, ActionListener.wrap((task) -> { - auditor.info(transform.getId(), "Updated state to [" + state.getIndexerState() + "]"); - logger.debug("Successfully updated state for data frame transform [{}] to [{}]", transform.getId(), - state.getIndexerState()); - listener.onResponse(new StopDataFrameTransformAction.Response(true)); - }, (exc) -> { - listener.onFailure(new ElasticsearchException("Error while updating state for data frame transform [{}] to [{}]", exc, - transform.getId(), state.getIndexerState())); - })); + taskState.set(DataFrameTransformTaskState.STOPPED); + DataFrameTransformState state = new DataFrameTransformState( + DataFrameTransformTaskState.STOPPED, + IndexerState.STOPPED, + indexer.getPosition(), + generation.get(), + stateReason.get()); + persistStateToClusterState(state, ActionListener.wrap( + task -> { + auditor.info(transform.getId(), "Updated state to [" + state.getTaskState() + "]"); + listener.onResponse(new StopDataFrameTransformAction.Response(true)); + }, + exc -> listener.onFailure(new ElasticsearchException( + "Error while updating state for data frame transform [{}] to [{}]", exc, + transform.getId(), + state.getIndexerState())))); break; - default: listener.onFailure(new ElasticsearchException("Cannot stop task for data frame transform [{}], because state was [{}]", transform.getId(), newState)); @@ -217,6 +241,40 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S markAsCompleted(); } + void persistStateToClusterState(DataFrameTransformState state, + ActionListener> listener) { + updatePersistentTaskState(state, ActionListener.wrap( + success -> { + logger.debug("Successfully updated state for data frame transform [{}] to [{}]", transform.getId(), state.toString()); + listener.onResponse(success); + }, + failure -> { + auditor.warning(transform.getId(), "Failed to persist to state to cluster state: " + failure.getMessage()); + logger.error("Failed to update state for data frame transform [" + transform.getId() + "]", failure); + listener.onFailure(failure); + } + )); + } + + private boolean isIrrecoverableFailure(Exception e) { + return e instanceof IndexNotFoundException || e instanceof DataFrameConfigurationException; + } + + synchronized void handleFailure(Exception e) { + if (isIrrecoverableFailure(e) || failureCount.incrementAndGet() > MAX_CONTINUOUS_FAILURES) { + String failureMessage = isIrrecoverableFailure(e) ? + "task encountered irrecoverable failure: " + e.getMessage() : + "task encountered more than " + MAX_CONTINUOUS_FAILURES + " failures; latest failure: " + e.getMessage(); + auditor.error(transform.getId(), failureMessage); + stateReason.set(failureMessage); + taskState.set(DataFrameTransformTaskState.FAILED); + persistStateToClusterState(getState(), ActionListener.wrap( + r -> failureCount.set(0), // Successfully marked as failed, reset counter so that task can be restarted + exception -> {} // Noop, internal method logs the failure to update the state + )); + } + } + /** * This is called when the persistent task signals that the allocated task should be terminated. * Termination in the task framework is essentially voluntary, as the allocated task can only be @@ -239,6 +297,8 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S private final DataFrameTransformsCheckpointService transformsCheckpointService; private final String transformId; private final Auditor auditor; + // Keeps track of the last exception that was written to our audit, keeps us from spamming the audit index + private volatile String lastAuditedExceptionMessage = null; private Map fieldMappings = null; private DataFrameTransformConfig transformConfig = null; @@ -272,12 +332,17 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S @Override public synchronized boolean maybeTriggerAsyncJob(long now) { + if (taskState.get() == DataFrameTransformTaskState.FAILED) { + logger.debug("Schedule was triggered for transform [" + getJobId() + "] but task is failed. Ignoring trigger."); + return false; + } + if (transformConfig == null) { CountDownLatch latch = new CountDownLatch(1); - transformsConfigManager.getTransformConfiguration(transformId, new LatchedActionListener<>(ActionListener.wrap(config -> { - transformConfig = config; - }, e -> { + transformsConfigManager.getTransformConfiguration(transformId, new LatchedActionListener<>(ActionListener.wrap( + config -> transformConfig = config, + e -> { throw new RuntimeException( DataFrameMessages.getMessage(DataFrameMessages.FAILED_TO_LOAD_TRANSFORM_CONFIGURATION, transformId), e); }), latch)); @@ -290,11 +355,10 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S } } - // todo: set job into failed state if (transformConfig.isValid() == false) { - auditor.error(transformId, "Cannot execute data frame transform as configuration is invalid"); - throw new RuntimeException( - DataFrameMessages.getMessage(DataFrameMessages.DATA_FRAME_TRANSFORM_CONFIGURATION_INVALID, transformId)); + DataFrameConfigurationException exception = new DataFrameConfigurationException(transformId); + handleFailure(exception); + throw exception; } if (fieldMappings == null) { @@ -341,24 +405,36 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S return; } - if(indexerState.equals(IndexerState.STARTED)) { - // if the indexer resets the state to started, it means it is done, so increment the generation + if(indexerState.equals(IndexerState.STARTED) && getStats().getNumDocuments() > 0) { + // if the indexer resets the state to started, it means it is done with a run through the data. + // But, if there were no documents, we should allow it to attempt to gather more again, as there is no risk of overwriting + // Some reasons for no documents are (but is not limited to): + // * Could have failed early on search or index + // * Have an empty index + // * Have a query that returns no documents generation.compareAndSet(0L, 1L); } - final DataFrameTransformState state = new DataFrameTransformState(indexerState, getPosition(), generation.get()); + final DataFrameTransformState state = new DataFrameTransformState( + taskState.get(), + indexerState, + getPosition(), + generation.get(), + stateReason.get()); logger.info("Updating persistent state of transform [" + transform.getId() + "] to [" + state.toString() + "]"); - - updatePersistentTaskState(state, ActionListener.wrap(task -> next.run(), exc -> { - logger.error("Updating persistent state of transform [" + transform.getId() + "] failed", exc); - next.run(); - })); + persistStateToClusterState(state, ActionListener.wrap(t -> next.run(), e -> next.run())); } @Override protected void onFailure(Exception exc) { - auditor.error(transform.getId(), "Data frame transform failed with an exception: " + exc.getMessage()); - logger.warn("Data frame transform [" + transform.getId() + "] failed with an exception: ", exc); + // Since our schedule fires again very quickly after failures it is possible to run into the same failure numerous + // times in a row, very quickly. We do not want to spam the audit log with repeated failures, so only record the first one + if (exc.getMessage().equals(lastAuditedExceptionMessage) == false) { + auditor.warning(transform.getId(), "Data frame transform encountered an exception: " + exc.getMessage()); + lastAuditedExceptionMessage = exc.getMessage(); + } + logger.warn("Data frame transform [" + transform.getId() + "] encountered an exception: ", exc); + handleFailure(exc); } @Override @@ -374,4 +450,12 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S shutdown(); } } + + class DataFrameConfigurationException extends RuntimeException { + + DataFrameConfigurationException(String transformId) { + super(DataFrameMessages.getMessage(DataFrameMessages.DATA_FRAME_TRANSFORM_CONFIGURATION_INVALID, transformId)); + } + + } } diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_start_stop.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_start_stop.yml index 32cb3cbd721..56f320d4cb4 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_start_stop.yml +++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_start_stop.yml @@ -59,7 +59,7 @@ teardown: - match: { started: true } - do: - catch: /Cannot start task for data frame transform \[airline-transform-start-stop\], because state was \[STARTED\]/ + catch: /Unable to start data frame transform \[airline-transform-start-stop\] as it is in state \[STARTED\]/ data_frame.start_data_frame_transform: transform_id: "airline-transform-start-stop" @@ -75,7 +75,8 @@ teardown: transform_id: "airline-transform-start-stop" - match: { count: 1 } - match: { transforms.0.id: "airline-transform-start-stop" } - - match: { transforms.0.state.transform_state: "started" } + - match: { transforms.0.state.indexer_state: "started" } + - match: { transforms.0.state.task_state: "started" } - do: data_frame.stop_data_frame_transform: @@ -87,7 +88,8 @@ teardown: transform_id: "airline-transform-start-stop" - match: { count: 1 } - match: { transforms.0.id: "airline-transform-start-stop" } - - match: { transforms.0.state.transform_state: "stopped" } + - match: { transforms.0.state.indexer_state: "stopped" } + - match: { transforms.0.state.task_state: "stopped" } - do: data_frame.start_data_frame_transform: @@ -99,7 +101,8 @@ teardown: transform_id: "airline-transform-start-stop" - match: { count: 1 } - match: { transforms.0.id: "airline-transform-start-stop" } - - match: { transforms.0.state.transform_state: "started" } + - match: { transforms.0.state.indexer_state: "started" } + - match: { transforms.0.state.task_state: "started" } --- "Test stop missing transform": @@ -114,3 +117,4 @@ teardown: data_frame.stop_data_frame_transform: transform_id: "airline-transform-start-stop" - match: { stopped: true } + diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_stats.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_stats.yml index 47c715b0eaf..ac6aca4f35d 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_stats.yml +++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_stats.yml @@ -46,7 +46,8 @@ teardown: transform_id: "airline-transform-stats" - match: { count: 1 } - match: { transforms.0.id: "airline-transform-stats" } - - match: { transforms.0.state.transform_state: "started" } + - match: { transforms.0.state.indexer_state: "started" } + - match: { transforms.0.state.task_state: "started" } - match: { transforms.0.state.generation: 0 } - match: { transforms.0.stats.pages_processed: 0 } - match: { transforms.0.stats.documents_processed: 0 } @@ -124,18 +125,18 @@ teardown: transform_id: "*" - match: { count: 2 } - match: { transforms.0.id: "airline-transform-stats" } - - match: { transforms.0.state.transform_state: "started" } + - match: { transforms.0.state.indexer_state: "started" } - match: { transforms.1.id: "airline-transform-stats-dos" } - - match: { transforms.1.state.transform_state: "stopped" } + - match: { transforms.1.state.indexer_state: "stopped" } - do: data_frame.get_data_frame_transform_stats: transform_id: "_all" - match: { count: 2 } - match: { transforms.0.id: "airline-transform-stats" } - - match: { transforms.0.state.transform_state: "started" } + - match: { transforms.0.state.indexer_state: "started" } - match: { transforms.1.id: "airline-transform-stats-dos" } - - match: { transforms.1.state.transform_state: "stopped" } + - match: { transforms.1.state.indexer_state: "stopped" } --- "Test get single transform stats when it does not have a task": @@ -157,7 +158,7 @@ teardown: transform_id: "airline-transform-stats-dos" - match: { count: 1 } - match: { transforms.0.id: "airline-transform-stats-dos" } - - match: { transforms.0.state.transform_state: "stopped" } + - match: { transforms.0.state.indexer_state: "stopped" } - match: { transforms.0.state.generation: 0 } - match: { transforms.0.stats.pages_processed: 0 } - match: { transforms.0.stats.documents_processed: 0 }