diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformCheckpointStats.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformCheckpointStats.java index 2239fe9f46c..7c8d853768b 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformCheckpointStats.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformCheckpointStats.java @@ -19,10 +19,8 @@ package org.elasticsearch.client.dataframe.transforms; -import org.elasticsearch.client.core.IndexerState; import org.elasticsearch.common.ParseField; import org.elasticsearch.common.xcontent.ConstructingObjectParser; -import org.elasticsearch.common.xcontent.ObjectParser; import org.elasticsearch.common.xcontent.XContentParser; import java.io.IOException; @@ -33,16 +31,14 @@ import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optiona public class DataFrameTransformCheckpointStats { public static final ParseField CHECKPOINT = new ParseField("checkpoint"); - public static final ParseField INDEXER_STATE = new ParseField("indexer_state"); public static final ParseField POSITION = new ParseField("position"); public static final ParseField CHECKPOINT_PROGRESS = new ParseField("checkpoint_progress"); public static final ParseField TIMESTAMP_MILLIS = new ParseField("timestamp_millis"); public static final ParseField TIME_UPPER_BOUND_MILLIS = new ParseField("time_upper_bound_millis"); - public static final DataFrameTransformCheckpointStats EMPTY = new DataFrameTransformCheckpointStats(0L, null, null, null, 0L, 0L); + public static final DataFrameTransformCheckpointStats EMPTY = new DataFrameTransformCheckpointStats(0L, null, null, 0L, 0L); private final long checkpoint; - private final IndexerState indexerState; private final DataFrameIndexerPosition position; private final DataFrameTransformProgress checkpointProgress; private final long timestampMillis; @@ -51,19 +47,16 @@ public class DataFrameTransformCheckpointStats { public static final ConstructingObjectParser LENIENT_PARSER = new ConstructingObjectParser<>( "data_frame_transform_checkpoint_stats", true, args -> { long checkpoint = args[0] == null ? 0L : (Long) args[0]; - IndexerState indexerState = (IndexerState) args[1]; - DataFrameIndexerPosition position = (DataFrameIndexerPosition) args[2]; - DataFrameTransformProgress checkpointProgress = (DataFrameTransformProgress) args[3]; - long timestamp = args[4] == null ? 0L : (Long) args[4]; - long timeUpperBound = args[5] == null ? 0L : (Long) args[5]; + DataFrameIndexerPosition position = (DataFrameIndexerPosition) args[1]; + DataFrameTransformProgress checkpointProgress = (DataFrameTransformProgress) args[2]; + long timestamp = args[3] == null ? 0L : (Long) args[3]; + long timeUpperBound = args[4] == null ? 0L : (Long) args[4]; - return new DataFrameTransformCheckpointStats(checkpoint, indexerState, position, checkpointProgress, timestamp, timeUpperBound); + return new DataFrameTransformCheckpointStats(checkpoint, position, checkpointProgress, timestamp, timeUpperBound); }); static { LENIENT_PARSER.declareLong(optionalConstructorArg(), CHECKPOINT); - LENIENT_PARSER.declareField(optionalConstructorArg(), p -> IndexerState.fromString(p.text()), INDEXER_STATE, - ObjectParser.ValueType.STRING); LENIENT_PARSER.declareObject(optionalConstructorArg(), DataFrameIndexerPosition.PARSER, POSITION); LENIENT_PARSER.declareObject(optionalConstructorArg(), DataFrameTransformProgress.PARSER, CHECKPOINT_PROGRESS); LENIENT_PARSER.declareLong(optionalConstructorArg(), TIMESTAMP_MILLIS); @@ -74,11 +67,10 @@ public class DataFrameTransformCheckpointStats { return LENIENT_PARSER.parse(parser, null); } - public DataFrameTransformCheckpointStats(final long checkpoint, final IndexerState indexerState, - final DataFrameIndexerPosition position, final DataFrameTransformProgress checkpointProgress, - final long timestampMillis, final long timeUpperBoundMillis) { + public DataFrameTransformCheckpointStats(final long checkpoint, final DataFrameIndexerPosition position, + final DataFrameTransformProgress checkpointProgress, final long timestampMillis, + final long timeUpperBoundMillis) { this.checkpoint = checkpoint; - this.indexerState = indexerState; this.position = position; this.checkpointProgress = checkpointProgress; this.timestampMillis = timestampMillis; @@ -89,10 +81,6 @@ public class DataFrameTransformCheckpointStats { return checkpoint; } - public IndexerState getIndexerState() { - return indexerState; - } - public DataFrameIndexerPosition getPosition() { return position; } @@ -111,7 +99,7 @@ public class DataFrameTransformCheckpointStats { @Override public int hashCode() { - return Objects.hash(checkpoint, indexerState, position, checkpointProgress, timestampMillis, timeUpperBoundMillis); + return Objects.hash(checkpoint, position, checkpointProgress, timestampMillis, timeUpperBoundMillis); } @Override @@ -127,7 +115,6 @@ public class DataFrameTransformCheckpointStats { DataFrameTransformCheckpointStats that = (DataFrameTransformCheckpointStats) other; return this.checkpoint == that.checkpoint - && Objects.equals(this.indexerState, that.indexerState) && Objects.equals(this.position, that.position) && Objects.equals(this.checkpointProgress, that.checkpointProgress) && this.timestampMillis == that.timestampMillis diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformStats.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformStats.java index 4d83d36c109..578bed0d37f 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformStats.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformStats.java @@ -25,6 +25,7 @@ import org.elasticsearch.common.xcontent.ObjectParser; import org.elasticsearch.common.xcontent.XContentParser; import java.io.IOException; +import java.util.Locale; import java.util.Objects; import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg; @@ -33,20 +34,20 @@ import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optiona public class DataFrameTransformStats { public static final ParseField ID = new ParseField("id"); - public static final ParseField TASK_STATE_FIELD = new ParseField("task_state"); + public static final ParseField STATE_FIELD = new ParseField("state"); public static final ParseField REASON_FIELD = new ParseField("reason"); public static final ParseField NODE_FIELD = new ParseField("node"); public static final ParseField STATS_FIELD = new ParseField("stats"); public static final ParseField CHECKPOINTING_INFO_FIELD = new ParseField("checkpointing"); public static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( - "data_frame_transform_state_and_stats_info", true, - a -> new DataFrameTransformStats((String) a[0], (DataFrameTransformTaskState) a[1], (String) a[2], - (NodeAttributes) a[3], (DataFrameIndexerTransformStats) a[4], (DataFrameTransformCheckpointingInfo) a[5])); + "data_frame_transform_state_and_stats_info", true, + a -> new DataFrameTransformStats((String) a[0], (State) a[1], (String) a[2], + (NodeAttributes) a[3], (DataFrameIndexerTransformStats) a[4], (DataFrameTransformCheckpointingInfo) a[5])); static { PARSER.declareString(constructorArg(), ID); - PARSER.declareField(optionalConstructorArg(), p -> DataFrameTransformTaskState.fromString(p.text()), TASK_STATE_FIELD, + PARSER.declareField(optionalConstructorArg(), p -> State.fromString(p.text()), STATE_FIELD, ObjectParser.ValueType.STRING); PARSER.declareString(optionalConstructorArg(), REASON_FIELD); PARSER.declareField(optionalConstructorArg(), NodeAttributes.PARSER::apply, NODE_FIELD, ObjectParser.ValueType.OBJECT); @@ -61,16 +62,15 @@ public class DataFrameTransformStats { private final String id; private final String reason; - private final DataFrameTransformTaskState taskState; + private final State state; private final NodeAttributes node; private final DataFrameIndexerTransformStats indexerStats; private final DataFrameTransformCheckpointingInfo checkpointingInfo; - public DataFrameTransformStats(String id, DataFrameTransformTaskState taskState, String reason, NodeAttributes node, - DataFrameIndexerTransformStats stats, + public DataFrameTransformStats(String id, State state, String reason, NodeAttributes node, DataFrameIndexerTransformStats stats, DataFrameTransformCheckpointingInfo checkpointingInfo) { this.id = id; - this.taskState = taskState; + this.state = state; this.reason = reason; this.node = node; this.indexerStats = stats; @@ -81,8 +81,8 @@ public class DataFrameTransformStats { return id; } - public DataFrameTransformTaskState getTaskState() { - return taskState; + public State getState() { + return state; } public String getReason() { @@ -103,7 +103,7 @@ public class DataFrameTransformStats { @Override public int hashCode() { - return Objects.hash(id, taskState, reason, node, indexerStats, checkpointingInfo); + return Objects.hash(id, state, reason, node, indexerStats, checkpointingInfo); } @Override @@ -119,10 +119,23 @@ public class DataFrameTransformStats { DataFrameTransformStats that = (DataFrameTransformStats) other; return Objects.equals(this.id, that.id) - && Objects.equals(this.taskState, that.taskState) + && Objects.equals(this.state, that.state) && Objects.equals(this.reason, that.reason) && Objects.equals(this.node, that.node) && Objects.equals(this.indexerStats, that.indexerStats) && Objects.equals(this.checkpointingInfo, that.checkpointingInfo); } + + public enum State { + + STARTED, INDEXING, ABORTING, STOPPING, STOPPED, FAILED; + + public static State fromString(String name) { + return valueOf(name.trim().toUpperCase(Locale.ROOT)); + } + + public String value() { + return name().toLowerCase(Locale.ROOT); + } + } } diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformTaskState.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformTaskState.java deleted file mode 100644 index 7235a0aed28..00000000000 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformTaskState.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.client.dataframe.transforms; - -import java.util.Locale; - -public enum DataFrameTransformTaskState { - STOPPED, STARTED, FAILED; - - public static DataFrameTransformTaskState fromString(String name) { - return valueOf(name.trim().toUpperCase(Locale.ROOT)); - } - - public String value() { - return name().toLowerCase(Locale.ROOT); - } -} diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/DataFrameTransformIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/DataFrameTransformIT.java index a6899930086..2e0175f4164 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/DataFrameTransformIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/DataFrameTransformIT.java @@ -44,7 +44,6 @@ import org.elasticsearch.client.dataframe.transforms.DataFrameIndexerTransformSt import org.elasticsearch.client.dataframe.transforms.DataFrameTransformConfig; import org.elasticsearch.client.dataframe.transforms.DataFrameTransformConfigUpdate; import org.elasticsearch.client.dataframe.transforms.DataFrameTransformStats; -import org.elasticsearch.client.dataframe.transforms.DataFrameTransformTaskState; import org.elasticsearch.client.dataframe.transforms.DestConfig; import org.elasticsearch.client.dataframe.transforms.SourceConfig; import org.elasticsearch.client.dataframe.transforms.TimeSyncConfig; @@ -306,10 +305,11 @@ public class DataFrameTransformIT extends ESRestHighLevelClientTestCase { GetDataFrameTransformStatsResponse statsResponse = execute(new GetDataFrameTransformStatsRequest(id), client::getDataFrameTransformStats, client::getDataFrameTransformStatsAsync); assertThat(statsResponse.getTransformsStats(), hasSize(1)); - DataFrameTransformTaskState taskState = statsResponse.getTransformsStats().get(0).getTaskState(); + DataFrameTransformStats.State taskState = statsResponse.getTransformsStats().get(0).getState(); // Since we are non-continuous, the transform could auto-stop between being started earlier and us gathering the statistics - assertThat(taskState, is(oneOf(DataFrameTransformTaskState.STARTED, DataFrameTransformTaskState.STOPPED))); + assertThat(taskState, oneOf(DataFrameTransformStats.State.STARTED, DataFrameTransformStats.State.INDEXING, + DataFrameTransformStats.State.STOPPING, DataFrameTransformStats.State.STOPPED)); StopDataFrameTransformRequest stopRequest = new StopDataFrameTransformRequest(id, Boolean.TRUE, null); StopDataFrameTransformResponse stopResponse = @@ -321,8 +321,8 @@ public class DataFrameTransformIT extends ESRestHighLevelClientTestCase { // Calling stop with wait_for_completion assures that we will be in the `STOPPED` state for the transform task statsResponse = execute(new GetDataFrameTransformStatsRequest(id), client::getDataFrameTransformStats, client::getDataFrameTransformStatsAsync); - taskState = statsResponse.getTransformsStats().get(0).getTaskState(); - assertThat(taskState, is(DataFrameTransformTaskState.STOPPED)); + taskState = statsResponse.getTransformsStats().get(0).getState(); + assertThat(taskState, is(DataFrameTransformStats.State.STOPPED)); } @SuppressWarnings("unchecked") @@ -405,7 +405,7 @@ public class DataFrameTransformIT extends ESRestHighLevelClientTestCase { assertEquals(1, statsResponse.getTransformsStats().size()); DataFrameTransformStats stats = statsResponse.getTransformsStats().get(0); - assertEquals(DataFrameTransformTaskState.STOPPED, stats.getTaskState()); + assertEquals(DataFrameTransformStats.State.STOPPED, stats.getState()); DataFrameIndexerTransformStats zeroIndexerStats = new DataFrameIndexerTransformStats(0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L); assertEquals(zeroIndexerStats, stats.getIndexerStats()); @@ -420,8 +420,8 @@ public class DataFrameTransformIT extends ESRestHighLevelClientTestCase { client::getDataFrameTransformStats, client::getDataFrameTransformStatsAsync); DataFrameTransformStats stateAndStats = response.getTransformsStats().get(0); assertNotEquals(zeroIndexerStats, stateAndStats.getIndexerStats()); - assertThat(stateAndStats.getTaskState(), - is(oneOf(DataFrameTransformTaskState.STARTED, DataFrameTransformTaskState.STOPPED))); + assertThat(stateAndStats.getState(), oneOf(DataFrameTransformStats.State.STARTED, DataFrameTransformStats.State.INDEXING, + DataFrameTransformStats.State.STOPPING, DataFrameTransformStats.State.STOPPED)); assertThat(stateAndStats.getReason(), is(nullValue())); }); } diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformCheckpointStatsTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformCheckpointStatsTests.java index 4d4ba5967e7..ec7e8b6422e 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformCheckpointStatsTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformCheckpointStatsTests.java @@ -19,7 +19,6 @@ package org.elasticsearch.client.dataframe.transforms; -import org.elasticsearch.client.core.IndexerState; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.test.ESTestCase; @@ -41,7 +40,6 @@ public class DataFrameTransformCheckpointStatsTests extends ESTestCase { public static DataFrameTransformCheckpointStats randomDataFrameTransformCheckpointStats() { return new DataFrameTransformCheckpointStats(randomLongBetween(1, 1_000_000), - randomBoolean() ? null : randomFrom(IndexerState.values()), randomBoolean() ? null : DataFrameIndexerPositionTests.randomDataFrameIndexerPosition(), randomBoolean() ? null : DataFrameTransformProgressTests.randomInstance(), randomLongBetween(1, 1_000_000), randomLongBetween(0, 1_000_000)); @@ -50,9 +48,6 @@ public class DataFrameTransformCheckpointStatsTests extends ESTestCase { public static void toXContent(DataFrameTransformCheckpointStats stats, XContentBuilder builder) throws IOException { builder.startObject(); builder.field(DataFrameTransformCheckpointStats.CHECKPOINT.getPreferredName(), stats.getCheckpoint()); - if (stats.getIndexerState() != null) { - builder.field(DataFrameTransformCheckpointStats.INDEXER_STATE.getPreferredName(), stats.getIndexerState().value()); - } if (stats.getPosition() != null) { builder.field(DataFrameTransformCheckpointStats.POSITION.getPreferredName()); DataFrameIndexerPositionTests.toXContent(stats.getPosition(), builder); diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformStatsTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformStatsTests.java index af3bf53704a..ae252069c61 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformStatsTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformStatsTests.java @@ -41,7 +41,7 @@ public class DataFrameTransformStatsTests extends ESTestCase { public static DataFrameTransformStats randomInstance() { return new DataFrameTransformStats(randomAlphaOfLength(10), - randomBoolean() ? null : randomFrom(DataFrameTransformTaskState.values()), + randomBoolean() ? null : randomFrom(DataFrameTransformStats.State.values()), randomBoolean() ? null : randomAlphaOfLength(100), randomBoolean() ? null : NodeAttributesTests.createRandom(), DataFrameIndexerTransformStatsTests.randomStats(), @@ -51,9 +51,9 @@ public class DataFrameTransformStatsTests extends ESTestCase { public static void toXContent(DataFrameTransformStats stats, XContentBuilder builder) throws IOException { builder.startObject(); builder.field(DataFrameTransformStats.ID.getPreferredName(), stats.getId()); - if (stats.getTaskState() != null) { - builder.field(DataFrameTransformStats.TASK_STATE_FIELD.getPreferredName(), - stats.getTaskState().value()); + if (stats.getState() != null) { + builder.field(DataFrameTransformStats.STATE_FIELD.getPreferredName(), + stats.getState().value()); } if (stats.getReason() != null) { builder.field(DataFrameTransformStats.REASON_FIELD.getPreferredName(), stats.getReason()); diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/hlrc/DataFrameTransformCheckpointStatsTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/hlrc/DataFrameTransformCheckpointStatsTests.java index 0a41cfc85e9..e0976f040de 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/hlrc/DataFrameTransformCheckpointStatsTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/hlrc/DataFrameTransformCheckpointStatsTests.java @@ -22,7 +22,6 @@ package org.elasticsearch.client.dataframe.transforms.hlrc; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.client.AbstractHlrcXContentTestCase; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformCheckpointStats; -import org.elasticsearch.xpack.core.indexing.IndexerState; import java.io.IOException; import java.util.function.Predicate; @@ -34,7 +33,6 @@ public class DataFrameTransformCheckpointStatsTests extends AbstractHlrcXContent public static DataFrameTransformCheckpointStats fromHlrc( org.elasticsearch.client.dataframe.transforms.DataFrameTransformCheckpointStats instance) { return new DataFrameTransformCheckpointStats(instance.getCheckpoint(), - (instance.getIndexerState() != null) ? IndexerState.fromString(instance.getIndexerState().value()) : null, DataFrameIndexerPositionTests.fromHlrc(instance.getPosition()), DataFrameTransformProgressTests.fromHlrc(instance.getCheckpointProgress()), instance.getTimestampMillis(), @@ -55,7 +53,6 @@ public class DataFrameTransformCheckpointStatsTests extends AbstractHlrcXContent public static DataFrameTransformCheckpointStats randomDataFrameTransformCheckpointStats() { return new DataFrameTransformCheckpointStats(randomLongBetween(1, 1_000_000), - randomBoolean() ? null : randomFrom(IndexerState.values()), DataFrameIndexerPositionTests.randomDataFrameIndexerPosition(), randomBoolean() ? null : DataFrameTransformProgressTests.randomDataFrameTransformProgress(), randomLongBetween(1, 1_000_000), randomLongBetween(0, 1_000_000)); diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/hlrc/DataFrameTransformStatsTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/hlrc/DataFrameTransformStatsTests.java index e5dd37fcd9d..e65ecf10d96 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/hlrc/DataFrameTransformStatsTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/hlrc/DataFrameTransformStatsTests.java @@ -26,9 +26,7 @@ import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformCheck import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformCheckpointingInfo; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformProgress; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformStats; -import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformTaskState; import org.elasticsearch.xpack.core.dataframe.transforms.NodeAttributes; -import org.elasticsearch.xpack.core.indexing.IndexerState; import java.io.IOException; import java.util.HashMap; @@ -50,7 +48,7 @@ public class DataFrameTransformStatsTests extends AbstractHlrcXContentTestCase - DataFrameTransformTaskState taskState = - stateAndStatsInfo.getTaskState(); // <2> - IndexerState indexerState = - stateAndStatsInfo.getCheckpointingInfo() - .getNext().getIndexerState(); // <3> - DataFrameIndexerTransformStats transformStats = - stateAndStatsInfo.getIndexerStats(); // <4> + DataFrameTransformStats.State state = + stats.getState(); // <2> + DataFrameIndexerTransformStats indexerStats = + stats.getIndexerStats(); // <3> DataFrameTransformProgress progress = - stateAndStatsInfo.getCheckpointingInfo() - .getNext().getCheckpointProgress(); // <5> + stats.getCheckpointingInfo() + .getNext().getCheckpointProgress(); // <4> NodeAttributes node = - stateAndStatsInfo.getNode(); // <6> + stats.getNode(); // <5> // end::get-data-frame-transform-stats-response - assertEquals(DataFrameTransformTaskState.STOPPED, taskState); - assertNotNull(transformStats); + assertEquals(DataFrameTransformStats.State.STOPPED, state); + assertNotNull(indexerStats); assertNull(progress); } { diff --git a/docs/java-rest/high-level/dataframe/get_data_frame_stats.asciidoc b/docs/java-rest/high-level/dataframe/get_data_frame_stats.asciidoc index 578ea808b9e..76223e61c1d 100644 --- a/docs/java-rest/high-level/dataframe/get_data_frame_stats.asciidoc +++ b/docs/java-rest/high-level/dataframe/get_data_frame_stats.asciidoc @@ -48,9 +48,8 @@ The returned +{response}+ contains the requested {dataframe-transform} statistic include-tagged::{doc-tests-file}[{api}-response] -------------------------------------------------- <1> The response contains a list of `DataFrameTransformStats` objects -<2> The running state of the transform task e.g `started` -<3> The running state of the transform indexer e.g `started`, `indexing`, etc. -<4> The overall transform statistics recording the number of documents indexed etc. -<5> The progress of the current run in the transform. Supplies the number of docs left until the next checkpoint +<2> The running state of the transform, for example `started`, `indexing`, etc. +<3> The overall transform statistics recording the number of documents indexed etc. +<4> The progress of the current run in the transform. Supplies the number of docs left until the next checkpoint and the total number of docs expected. -<6> The assigned node information if the task is currently assigned to a node and running. +<5> The assigned node information if the task is currently assigned to a node and running. diff --git a/docs/reference/data-frames/apis/get-transform-stats.asciidoc b/docs/reference/data-frames/apis/get-transform-stats.asciidoc index 88536b93f9a..9235e189575 100644 --- a/docs/reference/data-frames/apis/get-transform-stats.asciidoc +++ b/docs/reference/data-frames/apis/get-transform-stats.asciidoc @@ -126,7 +126,7 @@ The API returns the following results: "transforms" : [ { "id" : "ecommerce_transform", - "task_state" : "started", + "state" : "indexing", "stats" : { "pages_processed" : 2, "documents_processed" : 1220, @@ -147,7 +147,6 @@ The API returns the following results: }, "next" : { "checkpoint" : 101, - "indexer_state" : "started", "position" : { "indexer_position" : { "hashtag" : "abcd1234" diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformCheckpointStats.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformCheckpointStats.java index 1734bca7fcc..5a19bfd3000 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformCheckpointStats.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformCheckpointStats.java @@ -11,12 +11,10 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.xcontent.ConstructingObjectParser; -import org.elasticsearch.common.xcontent.ObjectParser; import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.xpack.core.dataframe.DataFrameField; -import org.elasticsearch.xpack.core.indexing.IndexerState; import java.io.IOException; import java.util.Objects; @@ -30,10 +28,9 @@ import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optiona */ public class DataFrameTransformCheckpointStats implements Writeable, ToXContentObject { - public static final DataFrameTransformCheckpointStats EMPTY = new DataFrameTransformCheckpointStats(0L, null, null, null, 0L, 0L); + public static final DataFrameTransformCheckpointStats EMPTY = new DataFrameTransformCheckpointStats(0L, null, null, 0L, 0L); private final long checkpoint; - private final IndexerState indexerState; private final DataFrameIndexerPosition position; private final DataFrameTransformProgress checkpointProgress; private final long timestampMillis; @@ -42,30 +39,26 @@ public class DataFrameTransformCheckpointStats implements Writeable, ToXContentO static final ConstructingObjectParser LENIENT_PARSER = new ConstructingObjectParser<>( "data_frame_transform_checkpoint_stats", true, args -> { long checkpoint = args[0] == null ? 0L : (Long) args[0]; - IndexerState indexerState = (IndexerState) args[1]; - DataFrameIndexerPosition position = (DataFrameIndexerPosition) args[2]; - DataFrameTransformProgress checkpointProgress = (DataFrameTransformProgress) args[3]; - long timestamp = args[4] == null ? 0L : (Long) args[4]; - long timeUpperBound = args[5] == null ? 0L : (Long) args[5]; + DataFrameIndexerPosition position = (DataFrameIndexerPosition) args[1]; + DataFrameTransformProgress checkpointProgress = (DataFrameTransformProgress) args[2]; + long timestamp = args[3] == null ? 0L : (Long) args[3]; + long timeUpperBound = args[4] == null ? 0L : (Long) args[4]; - return new DataFrameTransformCheckpointStats(checkpoint, indexerState, position, checkpointProgress, timestamp, timeUpperBound); + return new DataFrameTransformCheckpointStats(checkpoint, position, checkpointProgress, timestamp, timeUpperBound); }); static { LENIENT_PARSER.declareLong(optionalConstructorArg(), DataFrameField.CHECKPOINT); - LENIENT_PARSER.declareField(optionalConstructorArg(), p -> IndexerState.fromString(p.text()), DataFrameField.INDEXER_STATE, - ObjectParser.ValueType.STRING); LENIENT_PARSER.declareObject(optionalConstructorArg(), DataFrameIndexerPosition.PARSER, DataFrameField.POSITION); LENIENT_PARSER.declareObject(optionalConstructorArg(), DataFrameTransformProgress.PARSER, DataFrameField.CHECKPOINT_PROGRESS); LENIENT_PARSER.declareLong(optionalConstructorArg(), DataFrameField.TIMESTAMP_MILLIS); LENIENT_PARSER.declareLong(optionalConstructorArg(), DataFrameField.TIME_UPPER_BOUND_MILLIS); } - public DataFrameTransformCheckpointStats(final long checkpoint, final IndexerState indexerState, - final DataFrameIndexerPosition position, final DataFrameTransformProgress checkpointProgress, - final long timestampMillis, final long timeUpperBoundMillis) { + public DataFrameTransformCheckpointStats(final long checkpoint, final DataFrameIndexerPosition position, + final DataFrameTransformProgress checkpointProgress, final long timestampMillis, + final long timeUpperBoundMillis) { this.checkpoint = checkpoint; - this.indexerState = indexerState; this.position = position; this.checkpointProgress = checkpointProgress; this.timestampMillis = timestampMillis; @@ -75,11 +68,6 @@ public class DataFrameTransformCheckpointStats implements Writeable, ToXContentO public DataFrameTransformCheckpointStats(StreamInput in) throws IOException { if (in.getVersion().onOrAfter(Version.V_7_4_0)) { this.checkpoint = in.readVLong(); - if (in.readBoolean()) { - this.indexerState = in.readEnum(IndexerState.class); - } else { - this.indexerState = null; - } if (in.readBoolean()) { this.position = new DataFrameIndexerPosition(in); } else { @@ -92,7 +80,6 @@ public class DataFrameTransformCheckpointStats implements Writeable, ToXContentO } } else { this.checkpoint = 0; - this.indexerState = null; this.position = null; this.checkpointProgress = null; } @@ -104,10 +91,6 @@ public class DataFrameTransformCheckpointStats implements Writeable, ToXContentO return checkpoint; } - public IndexerState getIndexerState() { - return indexerState; - } - public DataFrameIndexerPosition getPosition() { return position; } @@ -128,9 +111,6 @@ public class DataFrameTransformCheckpointStats implements Writeable, ToXContentO public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); builder.field(DataFrameField.CHECKPOINT.getPreferredName(), checkpoint); - if (indexerState != null) { - builder.field(DataFrameField.INDEXER_STATE.getPreferredName(), indexerState.value()); - } if (position != null) { builder.field(DataFrameField.POSITION.getPreferredName(), position); } @@ -153,12 +133,6 @@ public class DataFrameTransformCheckpointStats implements Writeable, ToXContentO public void writeTo(StreamOutput out) throws IOException { if (out.getVersion().onOrAfter(Version.V_7_4_0)) { out.writeVLong(checkpoint); - if (indexerState != null) { - out.writeBoolean(true); - out.writeEnum(indexerState); - } else { - out.writeBoolean(false); - } if (position != null) { out.writeBoolean(true); position.writeTo(out); @@ -178,7 +152,7 @@ public class DataFrameTransformCheckpointStats implements Writeable, ToXContentO @Override public int hashCode() { - return Objects.hash(checkpoint, indexerState, position, checkpointProgress, timestampMillis, timeUpperBoundMillis); + return Objects.hash(checkpoint, position, checkpointProgress, timestampMillis, timeUpperBoundMillis); } @Override @@ -194,7 +168,6 @@ public class DataFrameTransformCheckpointStats implements Writeable, ToXContentO DataFrameTransformCheckpointStats that = (DataFrameTransformCheckpointStats) other; return this.checkpoint == that.checkpoint - && Objects.equals(this.indexerState, that.indexerState) && Objects.equals(this.position, that.position) && Objects.equals(this.checkpointProgress, that.checkpointProgress) && this.timestampMillis == that.timestampMillis diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformStats.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformStats.java index 865bc07931d..6b58374925b 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformStats.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformStats.java @@ -10,6 +10,7 @@ import org.elasticsearch.Version; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.ParseField; import org.elasticsearch.common.Strings; +import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; @@ -19,8 +20,10 @@ import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.xpack.core.dataframe.DataFrameField; +import org.elasticsearch.xpack.core.indexing.IndexerState; import java.io.IOException; +import java.util.Locale; import java.util.Objects; import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg; @@ -34,13 +37,13 @@ import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optiona public class DataFrameTransformStats implements Writeable, ToXContentObject { public static final String NAME = "data_frame_transform_stats"; - public static final ParseField TASK_STATE_FIELD = new ParseField("task_state"); + public static final ParseField STATE_FIELD = new ParseField("state"); public static final ParseField REASON_FIELD = new ParseField("reason"); public static final ParseField NODE_FIELD = new ParseField("node"); public static final ParseField CHECKPOINTING_INFO_FIELD = new ParseField("checkpointing"); private final String id; - private final DataFrameTransformTaskState taskState; + private final State state; @Nullable private final String reason; @Nullable @@ -52,7 +55,7 @@ public class DataFrameTransformStats implements Writeable, ToXContentObject { NAME, true, a -> new DataFrameTransformStats((String) a[0], - (DataFrameTransformTaskState) a[1], + (State) a[1], (String) a[2], (NodeAttributes) a[3], (DataFrameIndexerTransformStats) a[4], @@ -60,7 +63,7 @@ public class DataFrameTransformStats implements Writeable, ToXContentObject { static { PARSER.declareString(constructorArg(), DataFrameField.ID); - PARSER.declareField(constructorArg(), p -> DataFrameTransformTaskState.fromString(p.text()), TASK_STATE_FIELD, + PARSER.declareField(constructorArg(), p -> DataFrameTransformStats.State.fromString(p.text()), STATE_FIELD, ObjectParser.ValueType.STRING); PARSER.declareString(optionalConstructorArg(), REASON_FIELD); PARSER.declareField(optionalConstructorArg(), NodeAttributes.PARSER::apply, NODE_FIELD, ObjectParser.ValueType.OBJECT); @@ -80,7 +83,7 @@ public class DataFrameTransformStats implements Writeable, ToXContentObject { public static DataFrameTransformStats stoppedStats(String id, DataFrameIndexerTransformStats indexerTransformStats) { return new DataFrameTransformStats(id, - DataFrameTransformTaskState.STOPPED, + State.STOPPED, null, null, indexerTransformStats, @@ -88,11 +91,11 @@ public class DataFrameTransformStats implements Writeable, ToXContentObject { } - public DataFrameTransformStats(String id, DataFrameTransformTaskState taskState, @Nullable String reason, + public DataFrameTransformStats(String id, State state, @Nullable String reason, @Nullable NodeAttributes node, DataFrameIndexerTransformStats stats, DataFrameTransformCheckpointingInfo checkpointingInfo) { this.id = Objects.requireNonNull(id); - this.taskState = Objects.requireNonNull(taskState); + this.state = Objects.requireNonNull(state); this.reason = reason; this.node = node; this.indexerStats = Objects.requireNonNull(stats); @@ -102,7 +105,7 @@ public class DataFrameTransformStats implements Writeable, ToXContentObject { public DataFrameTransformStats(StreamInput in) throws IOException { if (in.getVersion().onOrAfter(Version.V_7_4_0)) { this.id = in.readString(); - this.taskState = in.readEnum(DataFrameTransformTaskState.class); + this.state = in.readEnum(State.class); this.reason = in.readOptionalString(); if (in.readBoolean()) { this.node = new NodeAttributes(in); @@ -117,9 +120,9 @@ public class DataFrameTransformStats implements Writeable, ToXContentObject { // to do the best we can of reading from a DataFrameTransformStoredDoc object // (which is called DataFrameTransformStateAndStats in 7.2/7.3) this.id = in.readString(); - DataFrameTransformState state = new DataFrameTransformState(in); - this.taskState = state.getTaskState(); - this.reason = state.getReason(); + DataFrameTransformState transformState = new DataFrameTransformState(in); + this.state = State.fromComponents(transformState.getTaskState(), transformState.getIndexerState()); + this.reason = transformState.getReason(); this.node = null; this.indexerStats = new DataFrameIndexerTransformStats(in); this.checkpointingInfo = new DataFrameTransformCheckpointingInfo(in); @@ -130,7 +133,7 @@ public class DataFrameTransformStats implements Writeable, ToXContentObject { public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); builder.field(DataFrameField.ID.getPreferredName(), id); - builder.field(TASK_STATE_FIELD.getPreferredName(), taskState.value()); + builder.field(STATE_FIELD.getPreferredName(), state.value()); if (reason != null) { builder.field(REASON_FIELD.getPreferredName(), reason); } @@ -147,7 +150,7 @@ public class DataFrameTransformStats implements Writeable, ToXContentObject { public void writeTo(StreamOutput out) throws IOException { if (out.getVersion().onOrAfter(Version.V_7_4_0)) { out.writeString(id); - out.writeEnum(taskState); + out.writeEnum(state); out.writeOptionalString(reason); if (node != null) { out.writeBoolean(true); @@ -162,8 +165,9 @@ public class DataFrameTransformStats implements Writeable, ToXContentObject { // to do the best we can of writing to a DataFrameTransformStoredDoc object // (which is called DataFrameTransformStateAndStats in 7.2/7.3) out.writeString(id); - new DataFrameTransformState(taskState, - checkpointingInfo.getNext().getIndexerState(), + Tuple stateComponents = state.toComponents(); + new DataFrameTransformState(stateComponents.v1(), + stateComponents.v2(), checkpointingInfo.getNext().getPosition(), checkpointingInfo.getLast().getCheckpoint(), reason, @@ -176,7 +180,7 @@ public class DataFrameTransformStats implements Writeable, ToXContentObject { @Override public int hashCode() { - return Objects.hash(id, taskState, reason, node, indexerStats, checkpointingInfo); + return Objects.hash(id, state, reason, node, indexerStats, checkpointingInfo); } @Override @@ -192,7 +196,7 @@ public class DataFrameTransformStats implements Writeable, ToXContentObject { DataFrameTransformStats that = (DataFrameTransformStats) other; return Objects.equals(this.id, that.id) - && Objects.equals(this.taskState, that.taskState) + && Objects.equals(this.state, that.state) && Objects.equals(this.reason, that.reason) && Objects.equals(this.node, that.node) && Objects.equals(this.indexerStats, that.indexerStats) @@ -203,8 +207,8 @@ public class DataFrameTransformStats implements Writeable, ToXContentObject { return id; } - public DataFrameTransformTaskState getTaskState() { - return taskState; + public State getState() { + return state; } @Nullable @@ -233,4 +237,79 @@ public class DataFrameTransformStats implements Writeable, ToXContentObject { public String toString() { return Strings.toString(this); } + + public enum State implements Writeable { + + STARTED, INDEXING, ABORTING, STOPPING, STOPPED, FAILED; + + public static State fromString(String name) { + return valueOf(name.trim().toUpperCase(Locale.ROOT)); + } + + public static State fromStream(StreamInput in) throws IOException { + return in.readEnum(State.class); + } + + public static State fromComponents(DataFrameTransformTaskState taskState, IndexerState indexerState) { + + if (taskState == null || taskState == DataFrameTransformTaskState.STOPPED) { + return STOPPED; + } else if (taskState == DataFrameTransformTaskState.FAILED) { + return FAILED; + } else { + + // If we get here then the task state must be started, and that means we should have an indexer state + assert(taskState == DataFrameTransformTaskState.STARTED); + assert(indexerState != null); + + switch (indexerState) { + case STARTED: + return STARTED; + case INDEXING: + return INDEXING; + case STOPPING: + return STOPPING; + case STOPPED: + return STOPPED; + case ABORTING: + return ABORTING; + default: + throw new IllegalStateException("Unexpected indexer state enum value: " + indexerState); + } + } + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeEnum(this); + } + + public String value() { + return name().toLowerCase(Locale.ROOT); + } + + public Tuple toComponents() { + + switch (this) { + case STARTED: + return new Tuple<>(DataFrameTransformTaskState.STARTED, IndexerState.STARTED); + case INDEXING: + return new Tuple<>(DataFrameTransformTaskState.STARTED, IndexerState.INDEXING); + case ABORTING: + return new Tuple<>(DataFrameTransformTaskState.STARTED, IndexerState.ABORTING); + case STOPPING: + return new Tuple<>(DataFrameTransformTaskState.STARTED, IndexerState.STOPPING); + case STOPPED: + // This one is not deterministic, because an overall state of STOPPED could arise + // from either (STOPPED, null) or (STARTED, STOPPED). However, (STARTED, STOPPED) + // is a very short-lived state so it's reasonable to assume the other, especially + // as this method is only for mixed version cluster compatibility. + return new Tuple<>(DataFrameTransformTaskState.STOPPED, null); + case FAILED: + return new Tuple<>(DataFrameTransformTaskState.FAILED, null); + default: + throw new IllegalStateException("Unexpected state enum value: " + this); + } + } + } } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformCheckpointStatsTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformCheckpointStatsTests.java index 4ea93c4c519..4145d773eee 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformCheckpointStatsTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformCheckpointStatsTests.java @@ -8,7 +8,6 @@ package org.elasticsearch.xpack.core.dataframe.transforms; import org.elasticsearch.common.io.stream.Writeable.Reader; import org.elasticsearch.common.xcontent.XContentParser; -import org.elasticsearch.xpack.core.indexing.IndexerState; import java.io.IOException; @@ -16,7 +15,6 @@ public class DataFrameTransformCheckpointStatsTests extends AbstractSerializingD { public static DataFrameTransformCheckpointStats randomDataFrameTransformCheckpointStats() { return new DataFrameTransformCheckpointStats(randomLongBetween(1, 1_000_000), - randomBoolean() ? null : randomFrom(IndexerState.values()), DataFrameIndexerPositionTests.randomDataFrameIndexerPosition(), randomBoolean() ? null : DataFrameTransformProgressTests.randomDataFrameTransformProgress(), randomLongBetween(1, 1_000_000), randomLongBetween(0, 1_000_000)); diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformStatsTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformStatsTests.java index d9409bcec45..f438d6cfcf6 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformStatsTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformStatsTests.java @@ -17,7 +17,7 @@ public class DataFrameTransformStatsTests extends AbstractSerializingTestCase>)XContentMapValues.extractValue("transforms", stats); assertEquals(1, transformsStats.size()); - assertEquals("stopped", XContentMapValues.extractValue("task_state", transformsStats.get(0))); + assertEquals("stopped", XContentMapValues.extractValue("state", transformsStats.get(0))); assertNull(XContentMapValues.extractValue("checkpointing.next.position", transformsStats.get(0))); assertEquals(1, XContentMapValues.extractValue("checkpointing.last.checkpoint", transformsStats.get(0))); @@ -125,7 +126,7 @@ public class DataFrameGetAndGetStatsIT extends DataFrameRestTestCase { transformsStats = (List>)XContentMapValues.extractValue("transforms", stats); assertEquals(1, transformsStats.size()); - assertEquals("started", XContentMapValues.extractValue("task_state", transformsStats.get(0))); + assertThat(XContentMapValues.extractValue("state", transformsStats.get(0)), oneOf("started", "indexing")); assertEquals(1, XContentMapValues.extractValue("checkpointing.last.checkpoint", transformsStats.get(0))); diff --git a/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameRestTestCase.java b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameRestTestCase.java index 6ca60bd6654..09a6f1ee56a 100644 --- a/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameRestTestCase.java +++ b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameRestTestCase.java @@ -300,8 +300,7 @@ public abstract class DataFrameRestTestCase extends ESRestTestCase { void waitForDataFrameStopped(String transformId) throws Exception { assertBusy(() -> { - assertEquals("stopped", getDataFrameTaskState(transformId)); - assertEquals("stopped", getDataFrameIndexerState(transformId)); + assertEquals("stopped", getDataFrameTransformState(transformId)); }, 15, TimeUnit.SECONDS); } @@ -326,19 +325,9 @@ public abstract class DataFrameRestTestCase extends ESRestTestCase { return transformConfigs == null ? Collections.emptyList() : transformConfigs; } - protected static String getDataFrameIndexerState(String transformId) throws IOException { + protected static String getDataFrameTransformState(String transformId) throws IOException { Map transformStatsAsMap = getDataFrameState(transformId); - if (transformStatsAsMap == null) { - return null; - } - String indexerState = (String) XContentMapValues.extractValue("checkpointing.next.indexer_state", transformStatsAsMap); - // If the transform is stopped then it might not have an indexer state, but logically that's the same as the indexer being stopped - return indexerState == null ? "stopped" : indexerState; - } - - protected static String getDataFrameTaskState(String transformId) throws IOException { - Map transformStatsAsMap = getDataFrameState(transformId); - return transformStatsAsMap == null ? null : (String) XContentMapValues.extractValue("task_state", transformStatsAsMap); + return transformStatsAsMap == null ? null : (String) XContentMapValues.extractValue("state", transformStatsAsMap); } protected static Map getDataFrameState(String transformId) throws IOException { @@ -378,10 +367,12 @@ public abstract class DataFrameRestTestCase extends ESRestTestCase { request.addParameter("timeout", "10s"); request.addParameter("ignore", "404"); adminClient().performRequest(request); - String state = getDataFrameIndexerState(transformId); - if (state != null) { - assertEquals("stopped", getDataFrameIndexerState(transformId)); - } + } + + for (Map transformConfig : transformConfigs) { + String transformId = (String) transformConfig.get("id"); + String state = getDataFrameTransformState(transformId); + assertEquals("Transform [" + transformId + "] is not in the stopped state", "stopped", state); } for (Map transformConfig : transformConfigs) { diff --git a/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameTaskFailedStateIT.java b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameTaskFailedStateIT.java index e0300c96713..9551e4bf854 100644 --- a/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameTaskFailedStateIT.java +++ b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameTaskFailedStateIT.java @@ -14,7 +14,7 @@ import org.elasticsearch.common.Strings; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.support.XContentMapValues; import org.elasticsearch.rest.RestStatus; -import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformTaskState; +import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformStats; import org.junit.After; import org.junit.Before; @@ -29,6 +29,7 @@ import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.CoreMatchers.nullValue; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.oneOf; public class DataFrameTaskFailedStateIT extends DataFrameRestTestCase { @@ -60,7 +61,7 @@ public class DataFrameTaskFailedStateIT extends DataFrameRestTestCase { createContinuousPivotReviewsTransform(transformId, dataFrameIndex, null); failureTransforms.add(transformId); startDataframeTransform(transformId, false); - awaitState(transformId, DataFrameTransformTaskState.FAILED); + awaitState(transformId, DataFrameTransformStats.State.FAILED); Map fullState = getDataFrameState(transformId); final String failureReason = "task encountered more than 0 failures; latest failure: " + "Bulk index experienced failures. See the logs of the node running the transform for details."; @@ -78,7 +79,7 @@ public class DataFrameTaskFailedStateIT extends DataFrameRestTestCase { // Verify that we can force stop a failed transform stopDataFrameTransform(transformId, true); - awaitState(transformId, DataFrameTransformTaskState.STOPPED); + awaitState(transformId, DataFrameTransformStats.State.STOPPED); fullState = getDataFrameState(transformId); assertThat(XContentMapValues.extractValue("reason", fullState), is(nullValue())); } @@ -91,7 +92,7 @@ public class DataFrameTaskFailedStateIT extends DataFrameRestTestCase { createContinuousPivotReviewsTransform(transformId, dataFrameIndex, null); failureTransforms.add(transformId); startDataframeTransform(transformId, false); - awaitState(transformId, DataFrameTransformTaskState.FAILED); + awaitState(transformId, DataFrameTransformStats.State.FAILED); Map fullState = getDataFrameState(transformId); final String failureReason = "task encountered more than 0 failures; latest failure: " + "Bulk index experienced failures. See the logs of the node running the transform for details."; @@ -114,15 +115,15 @@ public class DataFrameTaskFailedStateIT extends DataFrameRestTestCase { // Verify that we have started and that our reason is cleared fullState = getDataFrameState(transformId); assertThat(XContentMapValues.extractValue("reason", fullState), is(nullValue())); - assertThat(XContentMapValues.extractValue("task_state", fullState), equalTo("started")); + assertThat(XContentMapValues.extractValue("state", fullState), oneOf("started", "indexing")); assertThat((Integer)XContentMapValues.extractValue("stats.index_failures", fullState), greaterThanOrEqualTo(1)); stopDataFrameTransform(transformId, true); } - private void awaitState(String transformId, DataFrameTransformTaskState state) throws Exception { + private void awaitState(String transformId, DataFrameTransformStats.State state) throws Exception { assertBusy(() -> { - String currentState = getDataFrameTaskState(transformId); + String currentState = getDataFrameTransformState(transformId); assertThat(currentState, equalTo(state.value())); }, 180, TimeUnit.SECONDS); // It should not take this long, but if the scheduler gets deferred, it could } diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportGetDataFrameTransformsStatsAction.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportGetDataFrameTransformsStatsAction.java index 1b51e0e4018..c3b6bd39564 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportGetDataFrameTransformsStatsAction.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportGetDataFrameTransformsStatsAction.java @@ -30,7 +30,6 @@ import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformCheck import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformState; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformStats; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformStoredDoc; -import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformTaskState; import org.elasticsearch.xpack.core.dataframe.transforms.NodeAttributes; import org.elasticsearch.xpack.dataframe.checkpoint.DataFrameTransformsCheckpointService; import org.elasticsearch.xpack.dataframe.persistence.DataFrameTransformsConfigManager; @@ -90,7 +89,7 @@ public class TransportGetDataFrameTransformsStatsAction extends task.getCheckpointingInfo(transformsCheckpointService, ActionListener.wrap( checkpointingInfo -> listener.onResponse(new Response( Collections.singletonList(new DataFrameTransformStats(task.getTransformId(), - transformState.getTaskState(), + DataFrameTransformStats.State.fromComponents(transformState.getTaskState(), transformState.getIndexerState()), transformState.getReason(), null, task.getStats(), @@ -100,7 +99,7 @@ public class TransportGetDataFrameTransformsStatsAction extends logger.warn("Failed to retrieve checkpointing info for transform [" + task.getTransformId() + "]", e); listener.onResponse(new Response( Collections.singletonList(new DataFrameTransformStats(task.getTransformId(), - transformState.getTaskState(), + DataFrameTransformStats.State.fromComponents(transformState.getTaskState(), transformState.getIndexerState()), transformState.getReason(), null, task.getStats(), @@ -223,7 +222,6 @@ public class TransportGetDataFrameTransformsStatsAction extends transformsCheckpointService.getCheckpointingInfo( transform.getId(), transform.getTransformState().getCheckpoint(), - transform.getTransformState().getIndexerState(), transform.getTransformState().getPosition(), transform.getTransformState().getProgress(), ActionListener.wrap( @@ -254,7 +252,7 @@ public class TransportGetDataFrameTransformsStatsAction extends synchronized (allStateAndStats) { allStateAndStats.add(new DataFrameTransformStats( stat.getId(), - DataFrameTransformTaskState.STOPPED, + DataFrameTransformStats.State.STOPPED, null, null, stat.getTransformStats(), diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/checkpoint/CheckpointProvider.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/checkpoint/CheckpointProvider.java index a42f5f4bbae..8dcab5879fb 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/checkpoint/CheckpointProvider.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/checkpoint/CheckpointProvider.java @@ -11,7 +11,6 @@ import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameIndexerPositio import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformCheckpoint; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformCheckpointingInfo; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformProgress; -import org.elasticsearch.xpack.core.indexing.IndexerState; /** * Interface for checkpoint creation, checking for changes and getting statistics about checkpoints @@ -41,14 +40,12 @@ public interface CheckpointProvider { * * @param lastCheckpoint the last checkpoint * @param nextCheckpoint the next checkpoint - * @param nextCheckpointIndexerState indexer state for the next checkpoint * @param nextCheckpointPosition position for the next checkpoint * @param nextCheckpointProgress progress for the next checkpoint * @param listener listener to retrieve the result */ void getCheckpointingInfo(DataFrameTransformCheckpoint lastCheckpoint, DataFrameTransformCheckpoint nextCheckpoint, - IndexerState nextCheckpointIndexerState, DataFrameIndexerPosition nextCheckpointPosition, DataFrameTransformProgress nextCheckpointProgress, ActionListener listener); @@ -59,13 +56,11 @@ public interface CheckpointProvider { * For stopped data frames we need to do lookups in the internal index. * * @param lastCheckpointNumber the last checkpoint number - * @param nextCheckpointIndexerState indexer state for the next checkpoint * @param nextCheckpointPosition position for the next checkpoint * @param nextCheckpointProgress progress for the next checkpoint * @param listener listener to retrieve the result */ void getCheckpointingInfo(long lastCheckpointNumber, - IndexerState nextCheckpointIndexerState, DataFrameIndexerPosition nextCheckpointPosition, DataFrameTransformProgress nextCheckpointProgress, ActionListener listener); diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/checkpoint/DataFrameTransformsCheckpointService.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/checkpoint/DataFrameTransformsCheckpointService.java index 24e03114948..f1cf7efe23b 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/checkpoint/DataFrameTransformsCheckpointService.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/checkpoint/DataFrameTransformsCheckpointService.java @@ -15,7 +15,6 @@ import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformCheck import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformProgress; import org.elasticsearch.xpack.core.dataframe.transforms.TimeSyncConfig; -import org.elasticsearch.xpack.core.indexing.IndexerState; import org.elasticsearch.xpack.dataframe.persistence.DataFrameTransformsConfigManager; /** @@ -52,14 +51,12 @@ public class DataFrameTransformsCheckpointService { * * @param transformId The data frame task * @param lastCheckpointNumber the last checkpoint - * @param nextCheckpointIndexerState indexer state for the next checkpoint * @param nextCheckpointPosition position for the next checkpoint * @param nextCheckpointProgress progress for the next checkpoint * @param listener listener to retrieve the result */ public void getCheckpointingInfo(final String transformId, final long lastCheckpointNumber, - final IndexerState nextCheckpointIndexerState, final DataFrameIndexerPosition nextCheckpointPosition, final DataFrameTransformProgress nextCheckpointProgress, final ActionListener listener) { @@ -67,7 +64,7 @@ public class DataFrameTransformsCheckpointService { // we need to retrieve the config first before we can defer the rest to the corresponding provider dataFrameTransformsConfigManager.getTransformConfiguration(transformId, ActionListener.wrap( transformConfig -> { - getCheckpointProvider(transformConfig).getCheckpointingInfo(lastCheckpointNumber, nextCheckpointIndexerState, + getCheckpointProvider(transformConfig).getCheckpointingInfo(lastCheckpointNumber, nextCheckpointPosition, nextCheckpointProgress, listener); }, transformError -> { diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/checkpoint/DefaultCheckpointProvider.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/checkpoint/DefaultCheckpointProvider.java index a6e1e2dae74..15ca530ef06 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/checkpoint/DefaultCheckpointProvider.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/checkpoint/DefaultCheckpointProvider.java @@ -25,7 +25,6 @@ import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformCheck import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformCheckpointingInfo; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformProgress; -import org.elasticsearch.xpack.core.indexing.IndexerState; import org.elasticsearch.xpack.dataframe.persistence.DataFrameTransformsConfigManager; import java.util.Arrays; @@ -40,7 +39,6 @@ public class DefaultCheckpointProvider implements CheckpointProvider { * Builder for collecting checkpointing information for the purpose of _stats */ private static class DataFrameTransformCheckpointingInfoBuilder { - private IndexerState nextCheckpointIndexerState; private DataFrameIndexerPosition nextCheckpointPosition; private DataFrameTransformProgress nextCheckpointProgress; private DataFrameTransformCheckpoint lastCheckpoint; @@ -66,9 +64,9 @@ public class DefaultCheckpointProvider implements CheckpointProvider { long nextCheckpointNumber = nextCheckpoint.getCheckpoint() > 0 ? nextCheckpoint.getCheckpoint() : 0; return new DataFrameTransformCheckpointingInfo( - new DataFrameTransformCheckpointStats(lastCheckpointNumber, null, null, null, + new DataFrameTransformCheckpointStats(lastCheckpointNumber, null, null, lastCheckpoint.getTimestamp(), lastCheckpoint.getTimeUpperBound()), - new DataFrameTransformCheckpointStats(nextCheckpointNumber, nextCheckpointIndexerState, nextCheckpointPosition, + new DataFrameTransformCheckpointStats(nextCheckpointNumber, nextCheckpointPosition, nextCheckpointProgress, nextCheckpoint.getTimestamp(), nextCheckpoint.getTimeUpperBound()), DataFrameTransformCheckpoint.getBehind(lastCheckpoint, sourceCheckpoint)); } @@ -97,12 +95,6 @@ public class DefaultCheckpointProvider implements CheckpointProvider { this.nextCheckpointPosition = nextCheckpointPosition; return this; } - - public DataFrameTransformCheckpointingInfoBuilder setNextCheckpointIndexerState(IndexerState nextCheckpointIndexerState) { - this.nextCheckpointIndexerState = nextCheckpointIndexerState; - return this; - } - } private static final Logger logger = LogManager.getLogger(DefaultCheckpointProvider.class); @@ -226,17 +218,15 @@ public class DefaultCheckpointProvider implements CheckpointProvider { @Override public void getCheckpointingInfo(DataFrameTransformCheckpoint lastCheckpoint, - DataFrameTransformCheckpoint nextCheckpoint, - IndexerState nextCheckpointIndexerState, - DataFrameIndexerPosition nextCheckpointPosition, - DataFrameTransformProgress nextCheckpointProgress, - ActionListener listener) { + DataFrameTransformCheckpoint nextCheckpoint, + DataFrameIndexerPosition nextCheckpointPosition, + DataFrameTransformProgress nextCheckpointProgress, + ActionListener listener) { DataFrameTransformCheckpointingInfoBuilder checkpointingInfoBuilder = new DataFrameTransformCheckpointingInfoBuilder(); checkpointingInfoBuilder.setLastCheckpoint(lastCheckpoint) .setNextCheckpoint(nextCheckpoint) - .setNextCheckpointIndexerState(nextCheckpointIndexerState) .setNextCheckpointPosition(nextCheckpointPosition) .setNextCheckpointProgress(nextCheckpointProgress); @@ -250,15 +240,13 @@ public class DefaultCheckpointProvider implements CheckpointProvider { } @Override - public void getCheckpointingInfo(long lastCheckpointNumber, IndexerState nextCheckpointIndexerState, - DataFrameIndexerPosition nextCheckpointPosition, DataFrameTransformProgress nextCheckpointProgress, - ActionListener listener) { + public void getCheckpointingInfo(long lastCheckpointNumber, DataFrameIndexerPosition nextCheckpointPosition, + DataFrameTransformProgress nextCheckpointProgress, + ActionListener listener) { DataFrameTransformCheckpointingInfoBuilder checkpointingInfoBuilder = new DataFrameTransformCheckpointingInfoBuilder(); - checkpointingInfoBuilder.setNextCheckpointIndexerState(nextCheckpointIndexerState) - .setNextCheckpointPosition(nextCheckpointPosition) - .setNextCheckpointProgress(nextCheckpointProgress); + checkpointingInfoBuilder.setNextCheckpointPosition(nextCheckpointPosition).setNextCheckpointProgress(nextCheckpointProgress); long timestamp = System.currentTimeMillis(); diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java index 14edf8774e0..e70687592e9 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java @@ -187,7 +187,6 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S transformsCheckpointService.getCheckpointingInfo( transform.getId(), currentCheckpoint.get(), - initialIndexerState, initialPosition, null, listener); @@ -196,7 +195,6 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S indexer.getCheckpointProvider().getCheckpointingInfo( indexer.getLastCheckpoint(), indexer.getNextCheckpoint(), - indexer.getState(), indexer.getPosition(), indexer.getProgress(), listener); diff --git a/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/checkpoint/DataFrameTransformCheckpointServiceNodeTests.java b/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/checkpoint/DataFrameTransformCheckpointServiceNodeTests.java index 9ed64de3e02..0bf5c53eb81 100644 --- a/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/checkpoint/DataFrameTransformCheckpointServiceNodeTests.java +++ b/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/checkpoint/DataFrameTransformCheckpointServiceNodeTests.java @@ -47,7 +47,6 @@ import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformCheck import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfigTests; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformProgress; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformProgressTests; -import org.elasticsearch.xpack.core.indexing.IndexerState; import org.elasticsearch.xpack.dataframe.DataFrameSingleNodeTestCase; import org.elasticsearch.xpack.dataframe.persistence.DataFrameTransformsConfigManager; import org.junit.After; @@ -198,31 +197,31 @@ public class DataFrameTransformCheckpointServiceNodeTests extends DataFrameSingl mockClientForCheckpointing.setShardStats(createShardStats(createCheckPointMap(transformId, 20, 20, 20))); DataFrameTransformCheckpointingInfo checkpointInfo = new DataFrameTransformCheckpointingInfo( - new DataFrameTransformCheckpointStats(1, null, null, null, timestamp, 0L), - new DataFrameTransformCheckpointStats(2, IndexerState.STARTED, position, progress, timestamp + 100L, 0L), + new DataFrameTransformCheckpointStats(1, null, null, timestamp, 0L), + new DataFrameTransformCheckpointStats(2, position, progress, timestamp + 100L, 0L), 30L); assertAsync(listener -> - transformsCheckpointService.getCheckpointingInfo(transformId, 1, IndexerState.STARTED, position, progress, listener), + transformsCheckpointService.getCheckpointingInfo(transformId, 1, position, progress, listener), checkpointInfo, null, null); mockClientForCheckpointing.setShardStats(createShardStats(createCheckPointMap(transformId, 10, 50, 33))); checkpointInfo = new DataFrameTransformCheckpointingInfo( - new DataFrameTransformCheckpointStats(1, null, null, null, timestamp, 0L), - new DataFrameTransformCheckpointStats(2, IndexerState.INDEXING, position, progress, timestamp + 100L, 0L), + new DataFrameTransformCheckpointStats(1, null, null, timestamp, 0L), + new DataFrameTransformCheckpointStats(2, position, progress, timestamp + 100L, 0L), 63L); assertAsync(listener -> - transformsCheckpointService.getCheckpointingInfo(transformId, 1, IndexerState.INDEXING, position, progress, listener), + transformsCheckpointService.getCheckpointingInfo(transformId, 1, position, progress, listener), checkpointInfo, null, null); // same as current mockClientForCheckpointing.setShardStats(createShardStats(createCheckPointMap(transformId, 10, 10, 10))); checkpointInfo = new DataFrameTransformCheckpointingInfo( - new DataFrameTransformCheckpointStats(1, null, null, null, timestamp, 0L), - new DataFrameTransformCheckpointStats(2, IndexerState.STOPPING, position, progress, timestamp + 100L, 0L), + new DataFrameTransformCheckpointStats(1, null, null, timestamp, 0L), + new DataFrameTransformCheckpointStats(2, position, progress, timestamp + 100L, 0L), 0L); assertAsync(listener -> - transformsCheckpointService.getCheckpointingInfo(transformId, 1, IndexerState.STOPPING, position, progress, listener), + transformsCheckpointService.getCheckpointingInfo(transformId, 1, position, progress, listener), checkpointInfo, null, null); } diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_crud.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_crud.yml index 881cddc7102..93b69bbc3e2 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_crud.yml +++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_crud.yml @@ -619,7 +619,7 @@ setup: transform_id: "airline-transform-start-delete" - match: { count: 1 } - match: { transforms.0.id: "airline-transform-start-delete" } - - match: { transforms.0.task_state: "started" } + - match: { transforms.0.state: "/started|indexing/" } - do: catch: /Cannot delete data frame \[airline-transform-start-delete\] as the task is running/ diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_start_stop.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_start_stop.yml index 936294d8fa1..044f5212a99 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_start_stop.yml +++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_start_stop.yml @@ -100,7 +100,7 @@ teardown: transform_id: "airline-transform-start-stop" - match: { count: 1 } - match: { transforms.0.id: "airline-transform-start-stop" } - - match: { transforms.0.task_state: "started" } + - match: { transforms.0.state: "/started|indexing/" } - do: data_frame.stop_data_frame_transform: @@ -113,7 +113,7 @@ teardown: transform_id: "airline-transform-start-stop" - match: { count: 1 } - match: { transforms.0.id: "airline-transform-start-stop" } - - match: { transforms.0.task_state: "stopped" } + - match: { transforms.0.state: "stopped" } - do: data_frame.start_data_frame_transform: @@ -125,7 +125,7 @@ teardown: transform_id: "airline-transform-start-stop" - match: { count: 1 } - match: { transforms.0.id: "airline-transform-start-stop" } - - match: { transforms.0.task_state: "started" } + - match: { transforms.0.state: "/started|indexing/" } --- "Test start/stop/start continuous transform": - do: @@ -157,7 +157,7 @@ teardown: transform_id: "airline-transform-start-stop-continuous" - match: { count: 1 } - match: { transforms.0.id: "airline-transform-start-stop-continuous" } - - match: { transforms.0.task_state: "started" } + - match: { transforms.0.state: "/started|indexing/" } - do: data_frame.stop_data_frame_transform: @@ -170,7 +170,7 @@ teardown: transform_id: "airline-transform-start-stop-continuous" - match: { count: 1 } - match: { transforms.0.id: "airline-transform-start-stop-continuous" } - - match: { transforms.0.task_state: "stopped" } + - match: { transforms.0.state: "stopped" } - do: data_frame.start_data_frame_transform: @@ -182,7 +182,7 @@ teardown: transform_id: "airline-transform-start-stop-continuous" - match: { count: 1 } - match: { transforms.0.id: "airline-transform-start-stop-continuous" } - - match: { transforms.0.task_state: "started" } + - match: { transforms.0.state: "/started|indexing/" } - do: data_frame.stop_data_frame_transform: @@ -244,14 +244,14 @@ teardown: transform_id: "airline-transform-start-stop" - match: { count: 1 } - match: { transforms.0.id: "airline-transform-start-stop" } - - match: { transforms.0.task_state: "started" } + - match: { transforms.0.state: "/started|indexing/" } - do: data_frame.get_data_frame_transform_stats: transform_id: "airline-transform-start-later" - match: { count: 1 } - match: { transforms.0.id: "airline-transform-start-later" } - - match: { transforms.0.task_state: "stopped" } + - match: { transforms.0.state: "stopped" } - do: data_frame.start_data_frame_transform: @@ -270,7 +270,7 @@ teardown: transform_id: "airline-transform-start-later" - match: { count: 1 } - match: { transforms.0.id: "airline-transform-start-later" } - - match: { transforms.0.task_state: "started" } + - match: { transforms.0.state: "/started|indexing/" } - do: data_frame.stop_data_frame_transform: @@ -316,8 +316,8 @@ teardown: data_frame.get_data_frame_transform_stats: transform_id: "*" - match: { count: 2 } - - match: { transforms.0.task_state: "stopped" } - - match: { transforms.1.task_state: "stopped" } + - match: { transforms.0.state: "stopped" } + - match: { transforms.1.state: "stopped" } - do: data_frame.delete_data_frame_transform: diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_stats.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_stats.yml index 8609159f02a..08ab6ce969c 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_stats.yml +++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_stats.yml @@ -47,7 +47,7 @@ teardown: transform_id: "airline-transform-stats" - match: { count: 1 } - match: { transforms.0.id: "airline-transform-stats" } - - match: { transforms.0.task_state: "/started|stopped/" } + - match: { transforms.0.state: "/started|indexing|stopping|stopped/" } - lte: { transforms.0.checkpointing.last.checkpoint: 1 } - lte: { transforms.0.stats.pages_processed: 1 } - match: { transforms.0.stats.documents_processed: 0 } @@ -227,7 +227,7 @@ teardown: - match: { count: 1 } - match: { transforms.0.id: "airline-transform-stats-continuous" } # Since this is continuous, there is no worry of it automatically stopping - - match: { transforms.0.task_state: "started" } + - match: { transforms.0.state: "/started|indexing/" } - lte: { transforms.0.checkpointing.last.checkpoint: 1 } # Since this is continuous, and _start does not return until it is assigned # we should see a node assignment diff --git a/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/DataFrameSurvivesUpgradeIT.java b/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/DataFrameSurvivesUpgradeIT.java index d4ebbb83a26..a4a5025a139 100644 --- a/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/DataFrameSurvivesUpgradeIT.java +++ b/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/DataFrameSurvivesUpgradeIT.java @@ -14,7 +14,6 @@ import org.elasticsearch.client.Response; import org.elasticsearch.client.dataframe.GetDataFrameTransformStatsResponse; import org.elasticsearch.client.dataframe.transforms.DataFrameTransformConfig; import org.elasticsearch.client.dataframe.transforms.DataFrameTransformStats; -import org.elasticsearch.client.dataframe.transforms.DataFrameTransformTaskState; import org.elasticsearch.client.dataframe.transforms.DestConfig; import org.elasticsearch.client.dataframe.transforms.SourceConfig; import org.elasticsearch.client.dataframe.transforms.TimeSyncConfig; @@ -47,6 +46,7 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.oneOf; @LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/43662") public class DataFrameSurvivesUpgradeIT extends AbstractUpgradeTestCase { @@ -139,7 +139,7 @@ public class DataFrameSurvivesUpgradeIT extends AbstractUpgradeTestCase { assertThat(stateAndStats.getIndexerStats().getOutputDocuments(), equalTo((long)ENTITIES.size())); assertThat(stateAndStats.getIndexerStats().getNumDocuments(), equalTo(totalDocsWritten)); - assertThat(stateAndStats.getTaskState(), equalTo(DataFrameTransformTaskState.STARTED)); + assertThat(stateAndStats.getState(), oneOf(DataFrameTransformStats.State.STARTED, DataFrameTransformStats.State.INDEXING)); } private void verifyContinuousDataFrameHandlesData(long expectedLastCheckpoint) throws Exception { @@ -148,7 +148,7 @@ public class DataFrameSurvivesUpgradeIT extends AbstractUpgradeTestCase { // if it was assigned to the node that was removed from the cluster assertBusy(() -> { DataFrameTransformStats stateAndStats = getTransformStats(CONTINUOUS_DATA_FRAME_ID); - assertThat(stateAndStats.getTaskState(), equalTo(DataFrameTransformTaskState.STARTED)); + assertThat(stateAndStats.getState(), oneOf(DataFrameTransformStats.State.STARTED, DataFrameTransformStats.State.INDEXING)); }, 120, TimeUnit.SECONDS); @@ -174,8 +174,8 @@ public class DataFrameSurvivesUpgradeIT extends AbstractUpgradeTestCase { TimeUnit.SECONDS); DataFrameTransformStats stateAndStats = getTransformStats(CONTINUOUS_DATA_FRAME_ID); - assertThat(stateAndStats.getTaskState(), - equalTo(DataFrameTransformTaskState.STARTED)); + assertThat(stateAndStats.getState(), + oneOf(DataFrameTransformStats.State.STARTED, DataFrameTransformStats.State.INDEXING)); assertThat(stateAndStats.getIndexerStats().getOutputDocuments(), greaterThan(previousStateAndStats.getIndexerStats().getOutputDocuments())); assertThat(stateAndStats.getIndexerStats().getNumDocuments(), diff --git a/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/mixed_cluster/80_data_frame_jobs_crud.yml b/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/mixed_cluster/80_data_frame_jobs_crud.yml index 4d10ba94850..86a1e6a8daa 100644 --- a/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/mixed_cluster/80_data_frame_jobs_crud.yml +++ b/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/mixed_cluster/80_data_frame_jobs_crud.yml @@ -29,10 +29,10 @@ transform_id: "mixed-simple-transform" - match: { count: 1 } - match: { transforms.0.id: "mixed-simple-transform" } - # Since we are breaking the stats format between 7.3 and 7.4 (allowed because we're beta) we - # cannot assert on task_state in the mixed cluster as it could be at the top level or under state + # Since we are breaking the stats format between 7.3 and 7.4 (allowed because we're beta) we cannot + # assert on state in the mixed cluster as it could be state at the top level or state.task_state # TODO: uncomment this assertion in master - #- match: { transforms.0.task_state: "/started|stopped/" } + #- match: { transforms.0.state: "/started|indexing|stopping|stopped/" } - do: data_frame.stop_data_frame_transform: @@ -45,10 +45,10 @@ transform_id: "mixed-simple-transform" - match: { count: 1 } - match: { transforms.0.id: "mixed-simple-transform" } - # Since we are breaking the stats format between 7.3 and 7.4 (allowed because we're beta) we - # cannot assert on task_state in the mixed cluster as it could be at the top level or under state + # Since we are breaking the stats format between 7.3 and 7.4 (allowed because we're beta) we cannot + # assert on state in the mixed cluster as it could be state at the top level or state.task_state # TODO: uncomment this assertion in master - #- match: { transforms.0.task_state: "stopped" } + #- match: { transforms.0.state: "stopped" } - do: data_frame.put_data_frame_transform: @@ -92,10 +92,10 @@ transform_id: "mixed-complex-transform" - match: { count: 1 } - match: { transforms.0.id: "mixed-complex-transform" } - # Since we are breaking the stats format between 7.3 and 7.4 (allowed because we're beta) we - # cannot assert on task_state in the mixed cluster as it could be at the top level or under state + # Since we are breaking the stats format between 7.3 and 7.4 (allowed because we're beta) we cannot + # assert on state in the mixed cluster as it could be state at the top level or state.task_state # TODO: uncomment this assertion in master - #- match: { transforms.0.task_state: "/started|stopped/" } + #- match: { transforms.0.state: "/started|indexing|stopping|stopped/" } - do: data_frame.stop_data_frame_transform: @@ -108,10 +108,10 @@ transform_id: "mixed-complex-transform" - match: { count: 1 } - match: { transforms.0.id: "mixed-complex-transform" } - # Since we are breaking the stats format between 7.3 and 7.4 (allowed because we're beta) we - # cannot assert on task_state in the mixed cluster as it could be at the top level or under state + # Since we are breaking the stats format between 7.3 and 7.4 (allowed because we're beta) we cannot + # assert on state in the mixed cluster as it could be state at the top level or state.task_state # TODO: uncomment this assertion in master - #- match: { transforms.0.task_state: "stopped" } + #- match: { transforms.0.state: "stopped" } --- "Test GET, start, and stop old cluster batch transforms": @@ -143,10 +143,10 @@ transform_id: "old-simple-transform" - match: { count: 1 } - match: { transforms.0.id: "old-simple-transform" } - # Since we are breaking the stats format between 7.3 and 7.4 (allowed because we're beta) we - # cannot assert on task_state in the mixed cluster as it could be at the top level or under state + # Since we are breaking the stats format between 7.3 and 7.4 (allowed because we're beta) we cannot + # assert on state in the mixed cluster as it could be state at the top level or state.task_state # TODO: uncomment this assertion in master - #- match: { transforms.0.task_state: "/started|stopped/" } + #- match: { transforms.0.state: "/started|indexing|stopping|stopped/" } - do: data_frame.stop_data_frame_transform: @@ -158,10 +158,10 @@ transform_id: "old-simple-transform" - match: { count: 1 } - match: { transforms.0.id: "old-simple-transform" } - # Since we are breaking the stats format between 7.3 and 7.4 (allowed because we're beta) we - # cannot assert on task_state in the mixed cluster as it could be at the top level or under state + # Since we are breaking the stats format between 7.3 and 7.4 (allowed because we're beta) we cannot + # assert on state in the mixed cluster as it could be state at the top level or state.task_state # TODO: uncomment this assertion in master - #- match: { transforms.0.task_state: "stopped" } + #- match: { transforms.0.state: "stopped" } - do: data_frame.get_data_frame_transform: @@ -184,10 +184,10 @@ transform_id: "old-complex-transform" - match: { count: 1 } - match: { transforms.0.id: "old-complex-transform" } - # Since we are breaking the stats format between 7.3 and 7.4 (allowed because we're beta) we - # cannot assert on task_state in the mixed cluster as it could be at the top level or under state + # Since we are breaking the stats format between 7.3 and 7.4 (allowed because we're beta) we cannot + # assert on state in the mixed cluster as it could be state at the top level or state.task_state # TODO: uncomment this assertion in master - #- match: { transforms.0.task_state: "/started|stopped/" } + #- match: { transforms.0.state: "/started|indexing|stopping|stopped/" } - do: data_frame.stop_data_frame_transform: @@ -199,7 +199,7 @@ transform_id: "old-complex-transform" - match: { count: 1 } - match: { transforms.0.id: "old-complex-transform" } - # Since we are breaking the stats format between 7.3 and 7.4 (allowed because we're beta) we - # cannot assert on task_state in the mixed cluster as it could be at the top level or under state + # Since we are breaking the stats format between 7.3 and 7.4 (allowed because we're beta) we cannot + # assert on state in the mixed cluster as it could be state at the top level or state.task_state # TODO: uncomment this assertion in master - #- match: { transforms.0.task_state: "stopped" } + #- match: { transforms.0.state: "stopped" } diff --git a/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/upgraded_cluster/80_data_frame_jobs_crud.yml b/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/upgraded_cluster/80_data_frame_jobs_crud.yml index d16bfe7c436..36df712fc35 100644 --- a/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/upgraded_cluster/80_data_frame_jobs_crud.yml +++ b/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/upgraded_cluster/80_data_frame_jobs_crud.yml @@ -27,7 +27,7 @@ setup: transform_id: "old-simple-transform" - match: { count: 1 } - match: { transforms.0.id: "old-simple-transform" } - - match: { transforms.0.task_state: "/started|stopped/" } + - match: { transforms.0.state: "/started|indexing|stopping|stopped/" } - do: data_frame.stop_data_frame_transform: @@ -39,7 +39,7 @@ setup: transform_id: "old-simple-transform" - match: { count: 1 } - match: { transforms.0.id: "old-simple-transform" } - - match: { transforms.0.task_state: "stopped" } + - match: { transforms.0.state: "stopped" } - do: data_frame.get_data_frame_transform: transform_id: "old-complex-transform" @@ -61,7 +61,7 @@ setup: transform_id: "old-complex-transform" - match: { count: 1 } - match: { transforms.0.id: "old-complex-transform" } - - match: { transforms.0.task_state: "/started|stopped/" } + - match: { transforms.0.state: "/started|indexing|stopping|stopped/" } - do: data_frame.stop_data_frame_transform: @@ -73,7 +73,7 @@ setup: transform_id: "old-complex-transform" - match: { count: 1 } - match: { transforms.0.id: "old-complex-transform" } - - match: { transforms.0.task_state: "stopped" } + - match: { transforms.0.state: "stopped" } # Simple and complex Mixed cluster transforms - do: @@ -95,7 +95,7 @@ setup: transform_id: "mixed-simple-transform" - match: { count: 1 } - match: { transforms.0.id: "mixed-simple-transform" } - - match: { transforms.0.task_state: "/started|stopped/" } + - match: { transforms.0.state: "/started|indexing|stopping|stopped/" } - do: data_frame.stop_data_frame_transform: @@ -107,7 +107,7 @@ setup: transform_id: "mixed-simple-transform" - match: { count: 1 } - match: { transforms.0.id: "mixed-simple-transform" } - - match: { transforms.0.task_state: "stopped" } + - match: { transforms.0.state: "stopped" } - do: data_frame.get_data_frame_transform: @@ -130,7 +130,7 @@ setup: transform_id: "mixed-complex-transform" - match: { count: 1 } - match: { transforms.0.id: "mixed-complex-transform" } - - match: { transforms.0.task_state: "/started|stopped/" } + - match: { transforms.0.state: "/started|indexing|stopping|stopped/" } - do: data_frame.stop_data_frame_transform: @@ -142,7 +142,7 @@ setup: transform_id: "mixed-complex-transform" - match: { count: 1 } - match: { transforms.0.id: "mixed-complex-transform" } - - match: { transforms.0.task_state: "stopped" } + - match: { transforms.0.state: "stopped" } # Delete all old and mixed transforms - do: