[ML-DataFrame] Combine task_state and indexer_state in _stats (#45324)

This commit replaces task_state and indexer_state in the
data frame _stats output with a single top level state
that combines the two. It is defined as:

- failed if what's currently reported as task_state is failed
- stopped if there is no persistent task
- Otherwise what's currently reported as indexer_state

Backport of #45276
This commit is contained in:
David Roberts 2019-08-08 16:24:26 +01:00 committed by GitHub
parent e53bb050db
commit 14545f8958
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
31 changed files with 275 additions and 308 deletions

View File

@ -19,10 +19,8 @@
package org.elasticsearch.client.dataframe.transforms;
import org.elasticsearch.client.core.IndexerState;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.XContentParser;
import java.io.IOException;
@ -33,16 +31,14 @@ import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optiona
public class DataFrameTransformCheckpointStats {
public static final ParseField CHECKPOINT = new ParseField("checkpoint");
public static final ParseField INDEXER_STATE = new ParseField("indexer_state");
public static final ParseField POSITION = new ParseField("position");
public static final ParseField CHECKPOINT_PROGRESS = new ParseField("checkpoint_progress");
public static final ParseField TIMESTAMP_MILLIS = new ParseField("timestamp_millis");
public static final ParseField TIME_UPPER_BOUND_MILLIS = new ParseField("time_upper_bound_millis");
public static final DataFrameTransformCheckpointStats EMPTY = new DataFrameTransformCheckpointStats(0L, null, null, null, 0L, 0L);
public static final DataFrameTransformCheckpointStats EMPTY = new DataFrameTransformCheckpointStats(0L, null, null, 0L, 0L);
private final long checkpoint;
private final IndexerState indexerState;
private final DataFrameIndexerPosition position;
private final DataFrameTransformProgress checkpointProgress;
private final long timestampMillis;
@ -51,19 +47,16 @@ public class DataFrameTransformCheckpointStats {
public static final ConstructingObjectParser<DataFrameTransformCheckpointStats, Void> LENIENT_PARSER = new ConstructingObjectParser<>(
"data_frame_transform_checkpoint_stats", true, args -> {
long checkpoint = args[0] == null ? 0L : (Long) args[0];
IndexerState indexerState = (IndexerState) args[1];
DataFrameIndexerPosition position = (DataFrameIndexerPosition) args[2];
DataFrameTransformProgress checkpointProgress = (DataFrameTransformProgress) args[3];
long timestamp = args[4] == null ? 0L : (Long) args[4];
long timeUpperBound = args[5] == null ? 0L : (Long) args[5];
DataFrameIndexerPosition position = (DataFrameIndexerPosition) args[1];
DataFrameTransformProgress checkpointProgress = (DataFrameTransformProgress) args[2];
long timestamp = args[3] == null ? 0L : (Long) args[3];
long timeUpperBound = args[4] == null ? 0L : (Long) args[4];
return new DataFrameTransformCheckpointStats(checkpoint, indexerState, position, checkpointProgress, timestamp, timeUpperBound);
return new DataFrameTransformCheckpointStats(checkpoint, position, checkpointProgress, timestamp, timeUpperBound);
});
static {
LENIENT_PARSER.declareLong(optionalConstructorArg(), CHECKPOINT);
LENIENT_PARSER.declareField(optionalConstructorArg(), p -> IndexerState.fromString(p.text()), INDEXER_STATE,
ObjectParser.ValueType.STRING);
LENIENT_PARSER.declareObject(optionalConstructorArg(), DataFrameIndexerPosition.PARSER, POSITION);
LENIENT_PARSER.declareObject(optionalConstructorArg(), DataFrameTransformProgress.PARSER, CHECKPOINT_PROGRESS);
LENIENT_PARSER.declareLong(optionalConstructorArg(), TIMESTAMP_MILLIS);
@ -74,11 +67,10 @@ public class DataFrameTransformCheckpointStats {
return LENIENT_PARSER.parse(parser, null);
}
public DataFrameTransformCheckpointStats(final long checkpoint, final IndexerState indexerState,
final DataFrameIndexerPosition position, final DataFrameTransformProgress checkpointProgress,
final long timestampMillis, final long timeUpperBoundMillis) {
public DataFrameTransformCheckpointStats(final long checkpoint, final DataFrameIndexerPosition position,
final DataFrameTransformProgress checkpointProgress, final long timestampMillis,
final long timeUpperBoundMillis) {
this.checkpoint = checkpoint;
this.indexerState = indexerState;
this.position = position;
this.checkpointProgress = checkpointProgress;
this.timestampMillis = timestampMillis;
@ -89,10 +81,6 @@ public class DataFrameTransformCheckpointStats {
return checkpoint;
}
public IndexerState getIndexerState() {
return indexerState;
}
public DataFrameIndexerPosition getPosition() {
return position;
}
@ -111,7 +99,7 @@ public class DataFrameTransformCheckpointStats {
@Override
public int hashCode() {
return Objects.hash(checkpoint, indexerState, position, checkpointProgress, timestampMillis, timeUpperBoundMillis);
return Objects.hash(checkpoint, position, checkpointProgress, timestampMillis, timeUpperBoundMillis);
}
@Override
@ -127,7 +115,6 @@ public class DataFrameTransformCheckpointStats {
DataFrameTransformCheckpointStats that = (DataFrameTransformCheckpointStats) other;
return this.checkpoint == that.checkpoint
&& Objects.equals(this.indexerState, that.indexerState)
&& Objects.equals(this.position, that.position)
&& Objects.equals(this.checkpointProgress, that.checkpointProgress)
&& this.timestampMillis == that.timestampMillis

View File

@ -25,6 +25,7 @@ import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.XContentParser;
import java.io.IOException;
import java.util.Locale;
import java.util.Objects;
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg;
@ -33,7 +34,7 @@ import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optiona
public class DataFrameTransformStats {
public static final ParseField ID = new ParseField("id");
public static final ParseField TASK_STATE_FIELD = new ParseField("task_state");
public static final ParseField STATE_FIELD = new ParseField("state");
public static final ParseField REASON_FIELD = new ParseField("reason");
public static final ParseField NODE_FIELD = new ParseField("node");
public static final ParseField STATS_FIELD = new ParseField("stats");
@ -41,12 +42,12 @@ public class DataFrameTransformStats {
public static final ConstructingObjectParser<DataFrameTransformStats, Void> PARSER = new ConstructingObjectParser<>(
"data_frame_transform_state_and_stats_info", true,
a -> new DataFrameTransformStats((String) a[0], (DataFrameTransformTaskState) a[1], (String) a[2],
a -> new DataFrameTransformStats((String) a[0], (State) a[1], (String) a[2],
(NodeAttributes) a[3], (DataFrameIndexerTransformStats) a[4], (DataFrameTransformCheckpointingInfo) a[5]));
static {
PARSER.declareString(constructorArg(), ID);
PARSER.declareField(optionalConstructorArg(), p -> DataFrameTransformTaskState.fromString(p.text()), TASK_STATE_FIELD,
PARSER.declareField(optionalConstructorArg(), p -> State.fromString(p.text()), STATE_FIELD,
ObjectParser.ValueType.STRING);
PARSER.declareString(optionalConstructorArg(), REASON_FIELD);
PARSER.declareField(optionalConstructorArg(), NodeAttributes.PARSER::apply, NODE_FIELD, ObjectParser.ValueType.OBJECT);
@ -61,16 +62,15 @@ public class DataFrameTransformStats {
private final String id;
private final String reason;
private final DataFrameTransformTaskState taskState;
private final State state;
private final NodeAttributes node;
private final DataFrameIndexerTransformStats indexerStats;
private final DataFrameTransformCheckpointingInfo checkpointingInfo;
public DataFrameTransformStats(String id, DataFrameTransformTaskState taskState, String reason, NodeAttributes node,
DataFrameIndexerTransformStats stats,
public DataFrameTransformStats(String id, State state, String reason, NodeAttributes node, DataFrameIndexerTransformStats stats,
DataFrameTransformCheckpointingInfo checkpointingInfo) {
this.id = id;
this.taskState = taskState;
this.state = state;
this.reason = reason;
this.node = node;
this.indexerStats = stats;
@ -81,8 +81,8 @@ public class DataFrameTransformStats {
return id;
}
public DataFrameTransformTaskState getTaskState() {
return taskState;
public State getState() {
return state;
}
public String getReason() {
@ -103,7 +103,7 @@ public class DataFrameTransformStats {
@Override
public int hashCode() {
return Objects.hash(id, taskState, reason, node, indexerStats, checkpointingInfo);
return Objects.hash(id, state, reason, node, indexerStats, checkpointingInfo);
}
@Override
@ -119,10 +119,23 @@ public class DataFrameTransformStats {
DataFrameTransformStats that = (DataFrameTransformStats) other;
return Objects.equals(this.id, that.id)
&& Objects.equals(this.taskState, that.taskState)
&& Objects.equals(this.state, that.state)
&& Objects.equals(this.reason, that.reason)
&& Objects.equals(this.node, that.node)
&& Objects.equals(this.indexerStats, that.indexerStats)
&& Objects.equals(this.checkpointingInfo, that.checkpointingInfo);
}
public enum State {
STARTED, INDEXING, ABORTING, STOPPING, STOPPED, FAILED;
public static State fromString(String name) {
return valueOf(name.trim().toUpperCase(Locale.ROOT));
}
public String value() {
return name().toLowerCase(Locale.ROOT);
}
}
}

View File

@ -1,34 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.client.dataframe.transforms;
import java.util.Locale;
public enum DataFrameTransformTaskState {
STOPPED, STARTED, FAILED;
public static DataFrameTransformTaskState fromString(String name) {
return valueOf(name.trim().toUpperCase(Locale.ROOT));
}
public String value() {
return name().toLowerCase(Locale.ROOT);
}
}

View File

@ -44,7 +44,6 @@ import org.elasticsearch.client.dataframe.transforms.DataFrameIndexerTransformSt
import org.elasticsearch.client.dataframe.transforms.DataFrameTransformConfig;
import org.elasticsearch.client.dataframe.transforms.DataFrameTransformConfigUpdate;
import org.elasticsearch.client.dataframe.transforms.DataFrameTransformStats;
import org.elasticsearch.client.dataframe.transforms.DataFrameTransformTaskState;
import org.elasticsearch.client.dataframe.transforms.DestConfig;
import org.elasticsearch.client.dataframe.transforms.SourceConfig;
import org.elasticsearch.client.dataframe.transforms.TimeSyncConfig;
@ -306,10 +305,11 @@ public class DataFrameTransformIT extends ESRestHighLevelClientTestCase {
GetDataFrameTransformStatsResponse statsResponse = execute(new GetDataFrameTransformStatsRequest(id),
client::getDataFrameTransformStats, client::getDataFrameTransformStatsAsync);
assertThat(statsResponse.getTransformsStats(), hasSize(1));
DataFrameTransformTaskState taskState = statsResponse.getTransformsStats().get(0).getTaskState();
DataFrameTransformStats.State taskState = statsResponse.getTransformsStats().get(0).getState();
// Since we are non-continuous, the transform could auto-stop between being started earlier and us gathering the statistics
assertThat(taskState, is(oneOf(DataFrameTransformTaskState.STARTED, DataFrameTransformTaskState.STOPPED)));
assertThat(taskState, oneOf(DataFrameTransformStats.State.STARTED, DataFrameTransformStats.State.INDEXING,
DataFrameTransformStats.State.STOPPING, DataFrameTransformStats.State.STOPPED));
StopDataFrameTransformRequest stopRequest = new StopDataFrameTransformRequest(id, Boolean.TRUE, null);
StopDataFrameTransformResponse stopResponse =
@ -321,8 +321,8 @@ public class DataFrameTransformIT extends ESRestHighLevelClientTestCase {
// Calling stop with wait_for_completion assures that we will be in the `STOPPED` state for the transform task
statsResponse = execute(new GetDataFrameTransformStatsRequest(id),
client::getDataFrameTransformStats, client::getDataFrameTransformStatsAsync);
taskState = statsResponse.getTransformsStats().get(0).getTaskState();
assertThat(taskState, is(DataFrameTransformTaskState.STOPPED));
taskState = statsResponse.getTransformsStats().get(0).getState();
assertThat(taskState, is(DataFrameTransformStats.State.STOPPED));
}
@SuppressWarnings("unchecked")
@ -405,7 +405,7 @@ public class DataFrameTransformIT extends ESRestHighLevelClientTestCase {
assertEquals(1, statsResponse.getTransformsStats().size());
DataFrameTransformStats stats = statsResponse.getTransformsStats().get(0);
assertEquals(DataFrameTransformTaskState.STOPPED, stats.getTaskState());
assertEquals(DataFrameTransformStats.State.STOPPED, stats.getState());
DataFrameIndexerTransformStats zeroIndexerStats = new DataFrameIndexerTransformStats(0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L);
assertEquals(zeroIndexerStats, stats.getIndexerStats());
@ -420,8 +420,8 @@ public class DataFrameTransformIT extends ESRestHighLevelClientTestCase {
client::getDataFrameTransformStats, client::getDataFrameTransformStatsAsync);
DataFrameTransformStats stateAndStats = response.getTransformsStats().get(0);
assertNotEquals(zeroIndexerStats, stateAndStats.getIndexerStats());
assertThat(stateAndStats.getTaskState(),
is(oneOf(DataFrameTransformTaskState.STARTED, DataFrameTransformTaskState.STOPPED)));
assertThat(stateAndStats.getState(), oneOf(DataFrameTransformStats.State.STARTED, DataFrameTransformStats.State.INDEXING,
DataFrameTransformStats.State.STOPPING, DataFrameTransformStats.State.STOPPED));
assertThat(stateAndStats.getReason(), is(nullValue()));
});
}

View File

@ -19,7 +19,6 @@
package org.elasticsearch.client.dataframe.transforms;
import org.elasticsearch.client.core.IndexerState;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.test.ESTestCase;
@ -41,7 +40,6 @@ public class DataFrameTransformCheckpointStatsTests extends ESTestCase {
public static DataFrameTransformCheckpointStats randomDataFrameTransformCheckpointStats() {
return new DataFrameTransformCheckpointStats(randomLongBetween(1, 1_000_000),
randomBoolean() ? null : randomFrom(IndexerState.values()),
randomBoolean() ? null : DataFrameIndexerPositionTests.randomDataFrameIndexerPosition(),
randomBoolean() ? null : DataFrameTransformProgressTests.randomInstance(),
randomLongBetween(1, 1_000_000), randomLongBetween(0, 1_000_000));
@ -50,9 +48,6 @@ public class DataFrameTransformCheckpointStatsTests extends ESTestCase {
public static void toXContent(DataFrameTransformCheckpointStats stats, XContentBuilder builder) throws IOException {
builder.startObject();
builder.field(DataFrameTransformCheckpointStats.CHECKPOINT.getPreferredName(), stats.getCheckpoint());
if (stats.getIndexerState() != null) {
builder.field(DataFrameTransformCheckpointStats.INDEXER_STATE.getPreferredName(), stats.getIndexerState().value());
}
if (stats.getPosition() != null) {
builder.field(DataFrameTransformCheckpointStats.POSITION.getPreferredName());
DataFrameIndexerPositionTests.toXContent(stats.getPosition(), builder);

View File

@ -41,7 +41,7 @@ public class DataFrameTransformStatsTests extends ESTestCase {
public static DataFrameTransformStats randomInstance() {
return new DataFrameTransformStats(randomAlphaOfLength(10),
randomBoolean() ? null : randomFrom(DataFrameTransformTaskState.values()),
randomBoolean() ? null : randomFrom(DataFrameTransformStats.State.values()),
randomBoolean() ? null : randomAlphaOfLength(100),
randomBoolean() ? null : NodeAttributesTests.createRandom(),
DataFrameIndexerTransformStatsTests.randomStats(),
@ -51,9 +51,9 @@ public class DataFrameTransformStatsTests extends ESTestCase {
public static void toXContent(DataFrameTransformStats stats, XContentBuilder builder) throws IOException {
builder.startObject();
builder.field(DataFrameTransformStats.ID.getPreferredName(), stats.getId());
if (stats.getTaskState() != null) {
builder.field(DataFrameTransformStats.TASK_STATE_FIELD.getPreferredName(),
stats.getTaskState().value());
if (stats.getState() != null) {
builder.field(DataFrameTransformStats.STATE_FIELD.getPreferredName(),
stats.getState().value());
}
if (stats.getReason() != null) {
builder.field(DataFrameTransformStats.REASON_FIELD.getPreferredName(), stats.getReason());

View File

@ -22,7 +22,6 @@ package org.elasticsearch.client.dataframe.transforms.hlrc;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.client.AbstractHlrcXContentTestCase;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformCheckpointStats;
import org.elasticsearch.xpack.core.indexing.IndexerState;
import java.io.IOException;
import java.util.function.Predicate;
@ -34,7 +33,6 @@ public class DataFrameTransformCheckpointStatsTests extends AbstractHlrcXContent
public static DataFrameTransformCheckpointStats fromHlrc(
org.elasticsearch.client.dataframe.transforms.DataFrameTransformCheckpointStats instance) {
return new DataFrameTransformCheckpointStats(instance.getCheckpoint(),
(instance.getIndexerState() != null) ? IndexerState.fromString(instance.getIndexerState().value()) : null,
DataFrameIndexerPositionTests.fromHlrc(instance.getPosition()),
DataFrameTransformProgressTests.fromHlrc(instance.getCheckpointProgress()),
instance.getTimestampMillis(),
@ -55,7 +53,6 @@ public class DataFrameTransformCheckpointStatsTests extends AbstractHlrcXContent
public static DataFrameTransformCheckpointStats randomDataFrameTransformCheckpointStats() {
return new DataFrameTransformCheckpointStats(randomLongBetween(1, 1_000_000),
randomBoolean() ? null : randomFrom(IndexerState.values()),
DataFrameIndexerPositionTests.randomDataFrameIndexerPosition(),
randomBoolean() ? null : DataFrameTransformProgressTests.randomDataFrameTransformProgress(),
randomLongBetween(1, 1_000_000), randomLongBetween(0, 1_000_000));

View File

@ -26,9 +26,7 @@ import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformCheck
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformCheckpointingInfo;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformProgress;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformStats;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformTaskState;
import org.elasticsearch.xpack.core.dataframe.transforms.NodeAttributes;
import org.elasticsearch.xpack.core.indexing.IndexerState;
import java.io.IOException;
import java.util.HashMap;
@ -50,7 +48,7 @@ public class DataFrameTransformStatsTests extends AbstractHlrcXContentTestCase<D
fromHlrc(org.elasticsearch.client.dataframe.transforms.DataFrameTransformStats instance) {
return new DataFrameTransformStats(instance.getId(),
DataFrameTransformTaskState.fromString(instance.getTaskState().value()),
DataFrameTransformStats.State.fromString(instance.getState().value()),
instance.getReason(),
fromHlrc(instance.getNode()),
DataFrameIndexerTransformStatsTests.fromHlrc(instance.getIndexerStats()),
@ -67,7 +65,7 @@ public class DataFrameTransformStatsTests extends AbstractHlrcXContentTestCase<D
public DataFrameTransformStats convertHlrcToInternal(
org.elasticsearch.client.dataframe.transforms.DataFrameTransformStats instance) {
return new DataFrameTransformStats(instance.getId(),
DataFrameTransformTaskState.fromString(instance.getTaskState().value()),
DataFrameTransformStats.State.fromString(instance.getState().value()),
instance.getReason(),
fromHlrc(instance.getNode()),
DataFrameIndexerTransformStatsTests.fromHlrc(instance.getIndexerStats()),
@ -76,7 +74,7 @@ public class DataFrameTransformStatsTests extends AbstractHlrcXContentTestCase<D
public static DataFrameTransformStats randomDataFrameTransformStats() {
return new DataFrameTransformStats(randomAlphaOfLength(10),
randomFrom(DataFrameTransformTaskState.values()),
randomFrom(DataFrameTransformStats.State.values()),
randomBoolean() ? null : randomAlphaOfLength(100),
randomBoolean() ? null : randomNodeAttributes(),
randomStats(),
@ -111,7 +109,6 @@ public class DataFrameTransformStatsTests extends AbstractHlrcXContentTestCase<D
public static DataFrameTransformCheckpointStats randomDataFrameTransformCheckpointStats() {
return new DataFrameTransformCheckpointStats(randomLongBetween(1, 1_000_000),
randomBoolean() ? null : randomFrom(IndexerState.values()),
DataFrameIndexerPositionTests.randomDataFrameIndexerPosition(),
randomBoolean() ? null : DataFrameTransformProgressTests.randomDataFrameTransformProgress(),
randomLongBetween(1, 1_000_000), randomLongBetween(0, 1_000_000));

View File

@ -25,7 +25,6 @@ import org.elasticsearch.client.ESRestHighLevelClientTestCase;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.core.AcknowledgedResponse;
import org.elasticsearch.client.core.IndexerState;
import org.elasticsearch.client.core.PageParams;
import org.elasticsearch.client.dataframe.DeleteDataFrameTransformRequest;
import org.elasticsearch.client.dataframe.GetDataFrameTransformRequest;
@ -46,7 +45,6 @@ import org.elasticsearch.client.dataframe.transforms.DataFrameTransformConfig;
import org.elasticsearch.client.dataframe.transforms.DataFrameTransformConfigUpdate;
import org.elasticsearch.client.dataframe.transforms.DataFrameTransformProgress;
import org.elasticsearch.client.dataframe.transforms.DataFrameTransformStats;
import org.elasticsearch.client.dataframe.transforms.DataFrameTransformTaskState;
import org.elasticsearch.client.dataframe.transforms.DestConfig;
import org.elasticsearch.client.dataframe.transforms.NodeAttributes;
import org.elasticsearch.client.dataframe.transforms.QueryConfig;
@ -622,24 +620,21 @@ public class DataFrameTransformDocumentationIT extends ESRestHighLevelClientTest
assertThat(response.getTransformsStats(), hasSize(1));
// tag::get-data-frame-transform-stats-response
DataFrameTransformStats stateAndStatsInfo =
DataFrameTransformStats stats =
response.getTransformsStats().get(0); // <1>
DataFrameTransformTaskState taskState =
stateAndStatsInfo.getTaskState(); // <2>
IndexerState indexerState =
stateAndStatsInfo.getCheckpointingInfo()
.getNext().getIndexerState(); // <3>
DataFrameIndexerTransformStats transformStats =
stateAndStatsInfo.getIndexerStats(); // <4>
DataFrameTransformStats.State state =
stats.getState(); // <2>
DataFrameIndexerTransformStats indexerStats =
stats.getIndexerStats(); // <3>
DataFrameTransformProgress progress =
stateAndStatsInfo.getCheckpointingInfo()
.getNext().getCheckpointProgress(); // <5>
stats.getCheckpointingInfo()
.getNext().getCheckpointProgress(); // <4>
NodeAttributes node =
stateAndStatsInfo.getNode(); // <6>
stats.getNode(); // <5>
// end::get-data-frame-transform-stats-response
assertEquals(DataFrameTransformTaskState.STOPPED, taskState);
assertNotNull(transformStats);
assertEquals(DataFrameTransformStats.State.STOPPED, state);
assertNotNull(indexerStats);
assertNull(progress);
}
{

View File

@ -48,9 +48,8 @@ The returned +{response}+ contains the requested {dataframe-transform} statistic
include-tagged::{doc-tests-file}[{api}-response]
--------------------------------------------------
<1> The response contains a list of `DataFrameTransformStats` objects
<2> The running state of the transform task e.g `started`
<3> The running state of the transform indexer e.g `started`, `indexing`, etc.
<4> The overall transform statistics recording the number of documents indexed etc.
<5> The progress of the current run in the transform. Supplies the number of docs left until the next checkpoint
<2> The running state of the transform, for example `started`, `indexing`, etc.
<3> The overall transform statistics recording the number of documents indexed etc.
<4> The progress of the current run in the transform. Supplies the number of docs left until the next checkpoint
and the total number of docs expected.
<6> The assigned node information if the task is currently assigned to a node and running.
<5> The assigned node information if the task is currently assigned to a node and running.

View File

@ -126,7 +126,7 @@ The API returns the following results:
"transforms" : [
{
"id" : "ecommerce_transform",
"task_state" : "started",
"state" : "indexing",
"stats" : {
"pages_processed" : 2,
"documents_processed" : 1220,
@ -147,7 +147,6 @@ The API returns the following results:
},
"next" : {
"checkpoint" : 101,
"indexer_state" : "started",
"position" : {
"indexer_position" : {
"hashtag" : "abcd1234"

View File

@ -11,12 +11,10 @@ import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.xpack.core.dataframe.DataFrameField;
import org.elasticsearch.xpack.core.indexing.IndexerState;
import java.io.IOException;
import java.util.Objects;
@ -30,10 +28,9 @@ import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optiona
*/
public class DataFrameTransformCheckpointStats implements Writeable, ToXContentObject {
public static final DataFrameTransformCheckpointStats EMPTY = new DataFrameTransformCheckpointStats(0L, null, null, null, 0L, 0L);
public static final DataFrameTransformCheckpointStats EMPTY = new DataFrameTransformCheckpointStats(0L, null, null, 0L, 0L);
private final long checkpoint;
private final IndexerState indexerState;
private final DataFrameIndexerPosition position;
private final DataFrameTransformProgress checkpointProgress;
private final long timestampMillis;
@ -42,30 +39,26 @@ public class DataFrameTransformCheckpointStats implements Writeable, ToXContentO
static final ConstructingObjectParser<DataFrameTransformCheckpointStats, Void> LENIENT_PARSER = new ConstructingObjectParser<>(
"data_frame_transform_checkpoint_stats", true, args -> {
long checkpoint = args[0] == null ? 0L : (Long) args[0];
IndexerState indexerState = (IndexerState) args[1];
DataFrameIndexerPosition position = (DataFrameIndexerPosition) args[2];
DataFrameTransformProgress checkpointProgress = (DataFrameTransformProgress) args[3];
long timestamp = args[4] == null ? 0L : (Long) args[4];
long timeUpperBound = args[5] == null ? 0L : (Long) args[5];
DataFrameIndexerPosition position = (DataFrameIndexerPosition) args[1];
DataFrameTransformProgress checkpointProgress = (DataFrameTransformProgress) args[2];
long timestamp = args[3] == null ? 0L : (Long) args[3];
long timeUpperBound = args[4] == null ? 0L : (Long) args[4];
return new DataFrameTransformCheckpointStats(checkpoint, indexerState, position, checkpointProgress, timestamp, timeUpperBound);
return new DataFrameTransformCheckpointStats(checkpoint, position, checkpointProgress, timestamp, timeUpperBound);
});
static {
LENIENT_PARSER.declareLong(optionalConstructorArg(), DataFrameField.CHECKPOINT);
LENIENT_PARSER.declareField(optionalConstructorArg(), p -> IndexerState.fromString(p.text()), DataFrameField.INDEXER_STATE,
ObjectParser.ValueType.STRING);
LENIENT_PARSER.declareObject(optionalConstructorArg(), DataFrameIndexerPosition.PARSER, DataFrameField.POSITION);
LENIENT_PARSER.declareObject(optionalConstructorArg(), DataFrameTransformProgress.PARSER, DataFrameField.CHECKPOINT_PROGRESS);
LENIENT_PARSER.declareLong(optionalConstructorArg(), DataFrameField.TIMESTAMP_MILLIS);
LENIENT_PARSER.declareLong(optionalConstructorArg(), DataFrameField.TIME_UPPER_BOUND_MILLIS);
}
public DataFrameTransformCheckpointStats(final long checkpoint, final IndexerState indexerState,
final DataFrameIndexerPosition position, final DataFrameTransformProgress checkpointProgress,
final long timestampMillis, final long timeUpperBoundMillis) {
public DataFrameTransformCheckpointStats(final long checkpoint, final DataFrameIndexerPosition position,
final DataFrameTransformProgress checkpointProgress, final long timestampMillis,
final long timeUpperBoundMillis) {
this.checkpoint = checkpoint;
this.indexerState = indexerState;
this.position = position;
this.checkpointProgress = checkpointProgress;
this.timestampMillis = timestampMillis;
@ -75,11 +68,6 @@ public class DataFrameTransformCheckpointStats implements Writeable, ToXContentO
public DataFrameTransformCheckpointStats(StreamInput in) throws IOException {
if (in.getVersion().onOrAfter(Version.V_7_4_0)) {
this.checkpoint = in.readVLong();
if (in.readBoolean()) {
this.indexerState = in.readEnum(IndexerState.class);
} else {
this.indexerState = null;
}
if (in.readBoolean()) {
this.position = new DataFrameIndexerPosition(in);
} else {
@ -92,7 +80,6 @@ public class DataFrameTransformCheckpointStats implements Writeable, ToXContentO
}
} else {
this.checkpoint = 0;
this.indexerState = null;
this.position = null;
this.checkpointProgress = null;
}
@ -104,10 +91,6 @@ public class DataFrameTransformCheckpointStats implements Writeable, ToXContentO
return checkpoint;
}
public IndexerState getIndexerState() {
return indexerState;
}
public DataFrameIndexerPosition getPosition() {
return position;
}
@ -128,9 +111,6 @@ public class DataFrameTransformCheckpointStats implements Writeable, ToXContentO
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(DataFrameField.CHECKPOINT.getPreferredName(), checkpoint);
if (indexerState != null) {
builder.field(DataFrameField.INDEXER_STATE.getPreferredName(), indexerState.value());
}
if (position != null) {
builder.field(DataFrameField.POSITION.getPreferredName(), position);
}
@ -153,12 +133,6 @@ public class DataFrameTransformCheckpointStats implements Writeable, ToXContentO
public void writeTo(StreamOutput out) throws IOException {
if (out.getVersion().onOrAfter(Version.V_7_4_0)) {
out.writeVLong(checkpoint);
if (indexerState != null) {
out.writeBoolean(true);
out.writeEnum(indexerState);
} else {
out.writeBoolean(false);
}
if (position != null) {
out.writeBoolean(true);
position.writeTo(out);
@ -178,7 +152,7 @@ public class DataFrameTransformCheckpointStats implements Writeable, ToXContentO
@Override
public int hashCode() {
return Objects.hash(checkpoint, indexerState, position, checkpointProgress, timestampMillis, timeUpperBoundMillis);
return Objects.hash(checkpoint, position, checkpointProgress, timestampMillis, timeUpperBoundMillis);
}
@Override
@ -194,7 +168,6 @@ public class DataFrameTransformCheckpointStats implements Writeable, ToXContentO
DataFrameTransformCheckpointStats that = (DataFrameTransformCheckpointStats) other;
return this.checkpoint == that.checkpoint
&& Objects.equals(this.indexerState, that.indexerState)
&& Objects.equals(this.position, that.position)
&& Objects.equals(this.checkpointProgress, that.checkpointProgress)
&& this.timestampMillis == that.timestampMillis

View File

@ -10,6 +10,7 @@ import org.elasticsearch.Version;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
@ -19,8 +20,10 @@ import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.xpack.core.dataframe.DataFrameField;
import org.elasticsearch.xpack.core.indexing.IndexerState;
import java.io.IOException;
import java.util.Locale;
import java.util.Objects;
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg;
@ -34,13 +37,13 @@ import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optiona
public class DataFrameTransformStats implements Writeable, ToXContentObject {
public static final String NAME = "data_frame_transform_stats";
public static final ParseField TASK_STATE_FIELD = new ParseField("task_state");
public static final ParseField STATE_FIELD = new ParseField("state");
public static final ParseField REASON_FIELD = new ParseField("reason");
public static final ParseField NODE_FIELD = new ParseField("node");
public static final ParseField CHECKPOINTING_INFO_FIELD = new ParseField("checkpointing");
private final String id;
private final DataFrameTransformTaskState taskState;
private final State state;
@Nullable
private final String reason;
@Nullable
@ -52,7 +55,7 @@ public class DataFrameTransformStats implements Writeable, ToXContentObject {
NAME,
true,
a -> new DataFrameTransformStats((String) a[0],
(DataFrameTransformTaskState) a[1],
(State) a[1],
(String) a[2],
(NodeAttributes) a[3],
(DataFrameIndexerTransformStats) a[4],
@ -60,7 +63,7 @@ public class DataFrameTransformStats implements Writeable, ToXContentObject {
static {
PARSER.declareString(constructorArg(), DataFrameField.ID);
PARSER.declareField(constructorArg(), p -> DataFrameTransformTaskState.fromString(p.text()), TASK_STATE_FIELD,
PARSER.declareField(constructorArg(), p -> DataFrameTransformStats.State.fromString(p.text()), STATE_FIELD,
ObjectParser.ValueType.STRING);
PARSER.declareString(optionalConstructorArg(), REASON_FIELD);
PARSER.declareField(optionalConstructorArg(), NodeAttributes.PARSER::apply, NODE_FIELD, ObjectParser.ValueType.OBJECT);
@ -80,7 +83,7 @@ public class DataFrameTransformStats implements Writeable, ToXContentObject {
public static DataFrameTransformStats stoppedStats(String id, DataFrameIndexerTransformStats indexerTransformStats) {
return new DataFrameTransformStats(id,
DataFrameTransformTaskState.STOPPED,
State.STOPPED,
null,
null,
indexerTransformStats,
@ -88,11 +91,11 @@ public class DataFrameTransformStats implements Writeable, ToXContentObject {
}
public DataFrameTransformStats(String id, DataFrameTransformTaskState taskState, @Nullable String reason,
public DataFrameTransformStats(String id, State state, @Nullable String reason,
@Nullable NodeAttributes node, DataFrameIndexerTransformStats stats,
DataFrameTransformCheckpointingInfo checkpointingInfo) {
this.id = Objects.requireNonNull(id);
this.taskState = Objects.requireNonNull(taskState);
this.state = Objects.requireNonNull(state);
this.reason = reason;
this.node = node;
this.indexerStats = Objects.requireNonNull(stats);
@ -102,7 +105,7 @@ public class DataFrameTransformStats implements Writeable, ToXContentObject {
public DataFrameTransformStats(StreamInput in) throws IOException {
if (in.getVersion().onOrAfter(Version.V_7_4_0)) {
this.id = in.readString();
this.taskState = in.readEnum(DataFrameTransformTaskState.class);
this.state = in.readEnum(State.class);
this.reason = in.readOptionalString();
if (in.readBoolean()) {
this.node = new NodeAttributes(in);
@ -117,9 +120,9 @@ public class DataFrameTransformStats implements Writeable, ToXContentObject {
// to do the best we can of reading from a DataFrameTransformStoredDoc object
// (which is called DataFrameTransformStateAndStats in 7.2/7.3)
this.id = in.readString();
DataFrameTransformState state = new DataFrameTransformState(in);
this.taskState = state.getTaskState();
this.reason = state.getReason();
DataFrameTransformState transformState = new DataFrameTransformState(in);
this.state = State.fromComponents(transformState.getTaskState(), transformState.getIndexerState());
this.reason = transformState.getReason();
this.node = null;
this.indexerStats = new DataFrameIndexerTransformStats(in);
this.checkpointingInfo = new DataFrameTransformCheckpointingInfo(in);
@ -130,7 +133,7 @@ public class DataFrameTransformStats implements Writeable, ToXContentObject {
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(DataFrameField.ID.getPreferredName(), id);
builder.field(TASK_STATE_FIELD.getPreferredName(), taskState.value());
builder.field(STATE_FIELD.getPreferredName(), state.value());
if (reason != null) {
builder.field(REASON_FIELD.getPreferredName(), reason);
}
@ -147,7 +150,7 @@ public class DataFrameTransformStats implements Writeable, ToXContentObject {
public void writeTo(StreamOutput out) throws IOException {
if (out.getVersion().onOrAfter(Version.V_7_4_0)) {
out.writeString(id);
out.writeEnum(taskState);
out.writeEnum(state);
out.writeOptionalString(reason);
if (node != null) {
out.writeBoolean(true);
@ -162,8 +165,9 @@ public class DataFrameTransformStats implements Writeable, ToXContentObject {
// to do the best we can of writing to a DataFrameTransformStoredDoc object
// (which is called DataFrameTransformStateAndStats in 7.2/7.3)
out.writeString(id);
new DataFrameTransformState(taskState,
checkpointingInfo.getNext().getIndexerState(),
Tuple<DataFrameTransformTaskState, IndexerState> stateComponents = state.toComponents();
new DataFrameTransformState(stateComponents.v1(),
stateComponents.v2(),
checkpointingInfo.getNext().getPosition(),
checkpointingInfo.getLast().getCheckpoint(),
reason,
@ -176,7 +180,7 @@ public class DataFrameTransformStats implements Writeable, ToXContentObject {
@Override
public int hashCode() {
return Objects.hash(id, taskState, reason, node, indexerStats, checkpointingInfo);
return Objects.hash(id, state, reason, node, indexerStats, checkpointingInfo);
}
@Override
@ -192,7 +196,7 @@ public class DataFrameTransformStats implements Writeable, ToXContentObject {
DataFrameTransformStats that = (DataFrameTransformStats) other;
return Objects.equals(this.id, that.id)
&& Objects.equals(this.taskState, that.taskState)
&& Objects.equals(this.state, that.state)
&& Objects.equals(this.reason, that.reason)
&& Objects.equals(this.node, that.node)
&& Objects.equals(this.indexerStats, that.indexerStats)
@ -203,8 +207,8 @@ public class DataFrameTransformStats implements Writeable, ToXContentObject {
return id;
}
public DataFrameTransformTaskState getTaskState() {
return taskState;
public State getState() {
return state;
}
@Nullable
@ -233,4 +237,79 @@ public class DataFrameTransformStats implements Writeable, ToXContentObject {
public String toString() {
return Strings.toString(this);
}
public enum State implements Writeable {
STARTED, INDEXING, ABORTING, STOPPING, STOPPED, FAILED;
public static State fromString(String name) {
return valueOf(name.trim().toUpperCase(Locale.ROOT));
}
public static State fromStream(StreamInput in) throws IOException {
return in.readEnum(State.class);
}
public static State fromComponents(DataFrameTransformTaskState taskState, IndexerState indexerState) {
if (taskState == null || taskState == DataFrameTransformTaskState.STOPPED) {
return STOPPED;
} else if (taskState == DataFrameTransformTaskState.FAILED) {
return FAILED;
} else {
// If we get here then the task state must be started, and that means we should have an indexer state
assert(taskState == DataFrameTransformTaskState.STARTED);
assert(indexerState != null);
switch (indexerState) {
case STARTED:
return STARTED;
case INDEXING:
return INDEXING;
case STOPPING:
return STOPPING;
case STOPPED:
return STOPPED;
case ABORTING:
return ABORTING;
default:
throw new IllegalStateException("Unexpected indexer state enum value: " + indexerState);
}
}
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeEnum(this);
}
public String value() {
return name().toLowerCase(Locale.ROOT);
}
public Tuple<DataFrameTransformTaskState, IndexerState> toComponents() {
switch (this) {
case STARTED:
return new Tuple<>(DataFrameTransformTaskState.STARTED, IndexerState.STARTED);
case INDEXING:
return new Tuple<>(DataFrameTransformTaskState.STARTED, IndexerState.INDEXING);
case ABORTING:
return new Tuple<>(DataFrameTransformTaskState.STARTED, IndexerState.ABORTING);
case STOPPING:
return new Tuple<>(DataFrameTransformTaskState.STARTED, IndexerState.STOPPING);
case STOPPED:
// This one is not deterministic, because an overall state of STOPPED could arise
// from either (STOPPED, null) or (STARTED, STOPPED). However, (STARTED, STOPPED)
// is a very short-lived state so it's reasonable to assume the other, especially
// as this method is only for mixed version cluster compatibility.
return new Tuple<>(DataFrameTransformTaskState.STOPPED, null);
case FAILED:
return new Tuple<>(DataFrameTransformTaskState.FAILED, null);
default:
throw new IllegalStateException("Unexpected state enum value: " + this);
}
}
}
}

View File

@ -8,7 +8,6 @@ package org.elasticsearch.xpack.core.dataframe.transforms;
import org.elasticsearch.common.io.stream.Writeable.Reader;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.xpack.core.indexing.IndexerState;
import java.io.IOException;
@ -16,7 +15,6 @@ public class DataFrameTransformCheckpointStatsTests extends AbstractSerializingD
{
public static DataFrameTransformCheckpointStats randomDataFrameTransformCheckpointStats() {
return new DataFrameTransformCheckpointStats(randomLongBetween(1, 1_000_000),
randomBoolean() ? null : randomFrom(IndexerState.values()),
DataFrameIndexerPositionTests.randomDataFrameIndexerPosition(),
randomBoolean() ? null : DataFrameTransformProgressTests.randomDataFrameTransformProgress(),
randomLongBetween(1, 1_000_000), randomLongBetween(0, 1_000_000));

View File

@ -17,7 +17,7 @@ public class DataFrameTransformStatsTests extends AbstractSerializingTestCase<Da
public static DataFrameTransformStats randomDataFrameTransformStats() {
return new DataFrameTransformStats(randomAlphaOfLength(10),
randomFrom(DataFrameTransformTaskState.values()),
randomFrom(DataFrameTransformStats.State.values()),
randomBoolean() ? null : randomAlphaOfLength(100),
randomBoolean() ? null : NodeAttributeTests.randomNodeAttributes(),
DataFrameIndexerTransformStatsTests.randomStats(),

View File

@ -18,7 +18,7 @@ import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.dataframe.transforms.DataFrameTransformConfig;
import org.elasticsearch.client.dataframe.transforms.DataFrameTransformConfigUpdate;
import org.elasticsearch.client.dataframe.transforms.DataFrameTransformTaskState;
import org.elasticsearch.client.dataframe.transforms.DataFrameTransformStats;
import org.elasticsearch.client.dataframe.transforms.DestConfig;
import org.elasticsearch.client.dataframe.transforms.TimeSyncConfig;
import org.elasticsearch.client.dataframe.transforms.pivot.SingleGroupSource;
@ -43,6 +43,7 @@ import java.util.concurrent.TimeUnit;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.oneOf;
public class DataFrameTransformIT extends DataFrameIntegTestCase {
@ -110,8 +111,8 @@ public class DataFrameTransformIT extends DataFrameIntegTestCase {
assertTrue(startDataFrameTransform(config.getId(), RequestOptions.DEFAULT).isAcknowledged());
waitUntilCheckpoint(config.getId(), 1L);
assertThat(getDataFrameTransformStats(config.getId()).getTransformsStats().get(0).getTaskState(),
equalTo(DataFrameTransformTaskState.STARTED));
assertThat(getDataFrameTransformStats(config.getId()).getTransformsStats().get(0).getState(),
equalTo(DataFrameTransformStats.State.STARTED));
long docsIndexed = getDataFrameTransformStats(config.getId())
.getTransformsStats()
@ -167,8 +168,8 @@ public class DataFrameTransformIT extends DataFrameIntegTestCase {
assertTrue(startDataFrameTransform(config.getId(), RequestOptions.DEFAULT).isAcknowledged());
waitUntilCheckpoint(config.getId(), 1L);
assertThat(getDataFrameTransformStats(config.getId()).getTransformsStats().get(0).getTaskState(),
equalTo(DataFrameTransformTaskState.STARTED));
assertThat(getDataFrameTransformStats(config.getId()).getTransformsStats().get(0).getState(),
oneOf(DataFrameTransformStats.State.STARTED, DataFrameTransformStats.State.INDEXING));
long docsIndexed = getDataFrameTransformStats(config.getId())
.getTransformsStats()

View File

@ -22,6 +22,7 @@ import java.util.concurrent.TimeUnit;
import static org.elasticsearch.xpack.core.security.authc.support.UsernamePasswordToken.basicAuthHeaderValue;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.oneOf;
public class DataFrameGetAndGetStatsIT extends DataFrameRestTestCase {
@ -114,7 +115,7 @@ public class DataFrameGetAndGetStatsIT extends DataFrameRestTestCase {
transformsStats = (List<Map<String, Object>>)XContentMapValues.extractValue("transforms", stats);
assertEquals(1, transformsStats.size());
assertEquals("stopped", XContentMapValues.extractValue("task_state", transformsStats.get(0)));
assertEquals("stopped", XContentMapValues.extractValue("state", transformsStats.get(0)));
assertNull(XContentMapValues.extractValue("checkpointing.next.position", transformsStats.get(0)));
assertEquals(1, XContentMapValues.extractValue("checkpointing.last.checkpoint", transformsStats.get(0)));
@ -125,7 +126,7 @@ public class DataFrameGetAndGetStatsIT extends DataFrameRestTestCase {
transformsStats = (List<Map<String, Object>>)XContentMapValues.extractValue("transforms", stats);
assertEquals(1, transformsStats.size());
assertEquals("started", XContentMapValues.extractValue("task_state", transformsStats.get(0)));
assertThat(XContentMapValues.extractValue("state", transformsStats.get(0)), oneOf("started", "indexing"));
assertEquals(1, XContentMapValues.extractValue("checkpointing.last.checkpoint", transformsStats.get(0)));

View File

@ -300,8 +300,7 @@ public abstract class DataFrameRestTestCase extends ESRestTestCase {
void waitForDataFrameStopped(String transformId) throws Exception {
assertBusy(() -> {
assertEquals("stopped", getDataFrameTaskState(transformId));
assertEquals("stopped", getDataFrameIndexerState(transformId));
assertEquals("stopped", getDataFrameTransformState(transformId));
}, 15, TimeUnit.SECONDS);
}
@ -326,19 +325,9 @@ public abstract class DataFrameRestTestCase extends ESRestTestCase {
return transformConfigs == null ? Collections.emptyList() : transformConfigs;
}
protected static String getDataFrameIndexerState(String transformId) throws IOException {
protected static String getDataFrameTransformState(String transformId) throws IOException {
Map<?, ?> transformStatsAsMap = getDataFrameState(transformId);
if (transformStatsAsMap == null) {
return null;
}
String indexerState = (String) XContentMapValues.extractValue("checkpointing.next.indexer_state", transformStatsAsMap);
// If the transform is stopped then it might not have an indexer state, but logically that's the same as the indexer being stopped
return indexerState == null ? "stopped" : indexerState;
}
protected static String getDataFrameTaskState(String transformId) throws IOException {
Map<?, ?> transformStatsAsMap = getDataFrameState(transformId);
return transformStatsAsMap == null ? null : (String) XContentMapValues.extractValue("task_state", transformStatsAsMap);
return transformStatsAsMap == null ? null : (String) XContentMapValues.extractValue("state", transformStatsAsMap);
}
protected static Map<?, ?> getDataFrameState(String transformId) throws IOException {
@ -378,10 +367,12 @@ public abstract class DataFrameRestTestCase extends ESRestTestCase {
request.addParameter("timeout", "10s");
request.addParameter("ignore", "404");
adminClient().performRequest(request);
String state = getDataFrameIndexerState(transformId);
if (state != null) {
assertEquals("stopped", getDataFrameIndexerState(transformId));
}
for (Map<String, Object> transformConfig : transformConfigs) {
String transformId = (String) transformConfig.get("id");
String state = getDataFrameTransformState(transformId);
assertEquals("Transform [" + transformId + "] is not in the stopped state", "stopped", state);
}
for (Map<String, Object> transformConfig : transformConfigs) {

View File

@ -14,7 +14,7 @@ import org.elasticsearch.common.Strings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformTaskState;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformStats;
import org.junit.After;
import org.junit.Before;
@ -29,6 +29,7 @@ import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.oneOf;
public class DataFrameTaskFailedStateIT extends DataFrameRestTestCase {
@ -60,7 +61,7 @@ public class DataFrameTaskFailedStateIT extends DataFrameRestTestCase {
createContinuousPivotReviewsTransform(transformId, dataFrameIndex, null);
failureTransforms.add(transformId);
startDataframeTransform(transformId, false);
awaitState(transformId, DataFrameTransformTaskState.FAILED);
awaitState(transformId, DataFrameTransformStats.State.FAILED);
Map<?, ?> fullState = getDataFrameState(transformId);
final String failureReason = "task encountered more than 0 failures; latest failure: " +
"Bulk index experienced failures. See the logs of the node running the transform for details.";
@ -78,7 +79,7 @@ public class DataFrameTaskFailedStateIT extends DataFrameRestTestCase {
// Verify that we can force stop a failed transform
stopDataFrameTransform(transformId, true);
awaitState(transformId, DataFrameTransformTaskState.STOPPED);
awaitState(transformId, DataFrameTransformStats.State.STOPPED);
fullState = getDataFrameState(transformId);
assertThat(XContentMapValues.extractValue("reason", fullState), is(nullValue()));
}
@ -91,7 +92,7 @@ public class DataFrameTaskFailedStateIT extends DataFrameRestTestCase {
createContinuousPivotReviewsTransform(transformId, dataFrameIndex, null);
failureTransforms.add(transformId);
startDataframeTransform(transformId, false);
awaitState(transformId, DataFrameTransformTaskState.FAILED);
awaitState(transformId, DataFrameTransformStats.State.FAILED);
Map<?, ?> fullState = getDataFrameState(transformId);
final String failureReason = "task encountered more than 0 failures; latest failure: " +
"Bulk index experienced failures. See the logs of the node running the transform for details.";
@ -114,15 +115,15 @@ public class DataFrameTaskFailedStateIT extends DataFrameRestTestCase {
// Verify that we have started and that our reason is cleared
fullState = getDataFrameState(transformId);
assertThat(XContentMapValues.extractValue("reason", fullState), is(nullValue()));
assertThat(XContentMapValues.extractValue("task_state", fullState), equalTo("started"));
assertThat(XContentMapValues.extractValue("state", fullState), oneOf("started", "indexing"));
assertThat((Integer)XContentMapValues.extractValue("stats.index_failures", fullState), greaterThanOrEqualTo(1));
stopDataFrameTransform(transformId, true);
}
private void awaitState(String transformId, DataFrameTransformTaskState state) throws Exception {
private void awaitState(String transformId, DataFrameTransformStats.State state) throws Exception {
assertBusy(() -> {
String currentState = getDataFrameTaskState(transformId);
String currentState = getDataFrameTransformState(transformId);
assertThat(currentState, equalTo(state.value()));
}, 180, TimeUnit.SECONDS); // It should not take this long, but if the scheduler gets deferred, it could
}

View File

@ -30,7 +30,6 @@ import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformCheck
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformState;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformStats;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformStoredDoc;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformTaskState;
import org.elasticsearch.xpack.core.dataframe.transforms.NodeAttributes;
import org.elasticsearch.xpack.dataframe.checkpoint.DataFrameTransformsCheckpointService;
import org.elasticsearch.xpack.dataframe.persistence.DataFrameTransformsConfigManager;
@ -90,7 +89,7 @@ public class TransportGetDataFrameTransformsStatsAction extends
task.getCheckpointingInfo(transformsCheckpointService, ActionListener.wrap(
checkpointingInfo -> listener.onResponse(new Response(
Collections.singletonList(new DataFrameTransformStats(task.getTransformId(),
transformState.getTaskState(),
DataFrameTransformStats.State.fromComponents(transformState.getTaskState(), transformState.getIndexerState()),
transformState.getReason(),
null,
task.getStats(),
@ -100,7 +99,7 @@ public class TransportGetDataFrameTransformsStatsAction extends
logger.warn("Failed to retrieve checkpointing info for transform [" + task.getTransformId() + "]", e);
listener.onResponse(new Response(
Collections.singletonList(new DataFrameTransformStats(task.getTransformId(),
transformState.getTaskState(),
DataFrameTransformStats.State.fromComponents(transformState.getTaskState(), transformState.getIndexerState()),
transformState.getReason(),
null,
task.getStats(),
@ -223,7 +222,6 @@ public class TransportGetDataFrameTransformsStatsAction extends
transformsCheckpointService.getCheckpointingInfo(
transform.getId(),
transform.getTransformState().getCheckpoint(),
transform.getTransformState().getIndexerState(),
transform.getTransformState().getPosition(),
transform.getTransformState().getProgress(),
ActionListener.wrap(
@ -254,7 +252,7 @@ public class TransportGetDataFrameTransformsStatsAction extends
synchronized (allStateAndStats) {
allStateAndStats.add(new DataFrameTransformStats(
stat.getId(),
DataFrameTransformTaskState.STOPPED,
DataFrameTransformStats.State.STOPPED,
null,
null,
stat.getTransformStats(),

View File

@ -11,7 +11,6 @@ import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameIndexerPositio
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformCheckpoint;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformCheckpointingInfo;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformProgress;
import org.elasticsearch.xpack.core.indexing.IndexerState;
/**
* Interface for checkpoint creation, checking for changes and getting statistics about checkpoints
@ -41,14 +40,12 @@ public interface CheckpointProvider {
*
* @param lastCheckpoint the last checkpoint
* @param nextCheckpoint the next checkpoint
* @param nextCheckpointIndexerState indexer state for the next checkpoint
* @param nextCheckpointPosition position for the next checkpoint
* @param nextCheckpointProgress progress for the next checkpoint
* @param listener listener to retrieve the result
*/
void getCheckpointingInfo(DataFrameTransformCheckpoint lastCheckpoint,
DataFrameTransformCheckpoint nextCheckpoint,
IndexerState nextCheckpointIndexerState,
DataFrameIndexerPosition nextCheckpointPosition,
DataFrameTransformProgress nextCheckpointProgress,
ActionListener<DataFrameTransformCheckpointingInfo> listener);
@ -59,13 +56,11 @@ public interface CheckpointProvider {
* For stopped data frames we need to do lookups in the internal index.
*
* @param lastCheckpointNumber the last checkpoint number
* @param nextCheckpointIndexerState indexer state for the next checkpoint
* @param nextCheckpointPosition position for the next checkpoint
* @param nextCheckpointProgress progress for the next checkpoint
* @param listener listener to retrieve the result
*/
void getCheckpointingInfo(long lastCheckpointNumber,
IndexerState nextCheckpointIndexerState,
DataFrameIndexerPosition nextCheckpointPosition,
DataFrameTransformProgress nextCheckpointProgress,
ActionListener<DataFrameTransformCheckpointingInfo> listener);

View File

@ -15,7 +15,6 @@ import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformCheck
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformProgress;
import org.elasticsearch.xpack.core.dataframe.transforms.TimeSyncConfig;
import org.elasticsearch.xpack.core.indexing.IndexerState;
import org.elasticsearch.xpack.dataframe.persistence.DataFrameTransformsConfigManager;
/**
@ -52,14 +51,12 @@ public class DataFrameTransformsCheckpointService {
*
* @param transformId The data frame task
* @param lastCheckpointNumber the last checkpoint
* @param nextCheckpointIndexerState indexer state for the next checkpoint
* @param nextCheckpointPosition position for the next checkpoint
* @param nextCheckpointProgress progress for the next checkpoint
* @param listener listener to retrieve the result
*/
public void getCheckpointingInfo(final String transformId,
final long lastCheckpointNumber,
final IndexerState nextCheckpointIndexerState,
final DataFrameIndexerPosition nextCheckpointPosition,
final DataFrameTransformProgress nextCheckpointProgress,
final ActionListener<DataFrameTransformCheckpointingInfo> listener) {
@ -67,7 +64,7 @@ public class DataFrameTransformsCheckpointService {
// we need to retrieve the config first before we can defer the rest to the corresponding provider
dataFrameTransformsConfigManager.getTransformConfiguration(transformId, ActionListener.wrap(
transformConfig -> {
getCheckpointProvider(transformConfig).getCheckpointingInfo(lastCheckpointNumber, nextCheckpointIndexerState,
getCheckpointProvider(transformConfig).getCheckpointingInfo(lastCheckpointNumber,
nextCheckpointPosition, nextCheckpointProgress, listener);
},
transformError -> {

View File

@ -25,7 +25,6 @@ import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformCheck
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformCheckpointingInfo;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformProgress;
import org.elasticsearch.xpack.core.indexing.IndexerState;
import org.elasticsearch.xpack.dataframe.persistence.DataFrameTransformsConfigManager;
import java.util.Arrays;
@ -40,7 +39,6 @@ public class DefaultCheckpointProvider implements CheckpointProvider {
* Builder for collecting checkpointing information for the purpose of _stats
*/
private static class DataFrameTransformCheckpointingInfoBuilder {
private IndexerState nextCheckpointIndexerState;
private DataFrameIndexerPosition nextCheckpointPosition;
private DataFrameTransformProgress nextCheckpointProgress;
private DataFrameTransformCheckpoint lastCheckpoint;
@ -66,9 +64,9 @@ public class DefaultCheckpointProvider implements CheckpointProvider {
long nextCheckpointNumber = nextCheckpoint.getCheckpoint() > 0 ? nextCheckpoint.getCheckpoint() : 0;
return new DataFrameTransformCheckpointingInfo(
new DataFrameTransformCheckpointStats(lastCheckpointNumber, null, null, null,
new DataFrameTransformCheckpointStats(lastCheckpointNumber, null, null,
lastCheckpoint.getTimestamp(), lastCheckpoint.getTimeUpperBound()),
new DataFrameTransformCheckpointStats(nextCheckpointNumber, nextCheckpointIndexerState, nextCheckpointPosition,
new DataFrameTransformCheckpointStats(nextCheckpointNumber, nextCheckpointPosition,
nextCheckpointProgress, nextCheckpoint.getTimestamp(), nextCheckpoint.getTimeUpperBound()),
DataFrameTransformCheckpoint.getBehind(lastCheckpoint, sourceCheckpoint));
}
@ -97,12 +95,6 @@ public class DefaultCheckpointProvider implements CheckpointProvider {
this.nextCheckpointPosition = nextCheckpointPosition;
return this;
}
public DataFrameTransformCheckpointingInfoBuilder setNextCheckpointIndexerState(IndexerState nextCheckpointIndexerState) {
this.nextCheckpointIndexerState = nextCheckpointIndexerState;
return this;
}
}
private static final Logger logger = LogManager.getLogger(DefaultCheckpointProvider.class);
@ -227,7 +219,6 @@ public class DefaultCheckpointProvider implements CheckpointProvider {
@Override
public void getCheckpointingInfo(DataFrameTransformCheckpoint lastCheckpoint,
DataFrameTransformCheckpoint nextCheckpoint,
IndexerState nextCheckpointIndexerState,
DataFrameIndexerPosition nextCheckpointPosition,
DataFrameTransformProgress nextCheckpointProgress,
ActionListener<DataFrameTransformCheckpointingInfo> listener) {
@ -236,7 +227,6 @@ public class DefaultCheckpointProvider implements CheckpointProvider {
checkpointingInfoBuilder.setLastCheckpoint(lastCheckpoint)
.setNextCheckpoint(nextCheckpoint)
.setNextCheckpointIndexerState(nextCheckpointIndexerState)
.setNextCheckpointPosition(nextCheckpointPosition)
.setNextCheckpointProgress(nextCheckpointProgress);
@ -250,15 +240,13 @@ public class DefaultCheckpointProvider implements CheckpointProvider {
}
@Override
public void getCheckpointingInfo(long lastCheckpointNumber, IndexerState nextCheckpointIndexerState,
DataFrameIndexerPosition nextCheckpointPosition, DataFrameTransformProgress nextCheckpointProgress,
public void getCheckpointingInfo(long lastCheckpointNumber, DataFrameIndexerPosition nextCheckpointPosition,
DataFrameTransformProgress nextCheckpointProgress,
ActionListener<DataFrameTransformCheckpointingInfo> listener) {
DataFrameTransformCheckpointingInfoBuilder checkpointingInfoBuilder = new DataFrameTransformCheckpointingInfoBuilder();
checkpointingInfoBuilder.setNextCheckpointIndexerState(nextCheckpointIndexerState)
.setNextCheckpointPosition(nextCheckpointPosition)
.setNextCheckpointProgress(nextCheckpointProgress);
checkpointingInfoBuilder.setNextCheckpointPosition(nextCheckpointPosition).setNextCheckpointProgress(nextCheckpointProgress);
long timestamp = System.currentTimeMillis();

View File

@ -187,7 +187,6 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
transformsCheckpointService.getCheckpointingInfo(
transform.getId(),
currentCheckpoint.get(),
initialIndexerState,
initialPosition,
null,
listener);
@ -196,7 +195,6 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
indexer.getCheckpointProvider().getCheckpointingInfo(
indexer.getLastCheckpoint(),
indexer.getNextCheckpoint(),
indexer.getState(),
indexer.getPosition(),
indexer.getProgress(),
listener);

View File

@ -47,7 +47,6 @@ import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformCheck
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfigTests;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformProgress;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformProgressTests;
import org.elasticsearch.xpack.core.indexing.IndexerState;
import org.elasticsearch.xpack.dataframe.DataFrameSingleNodeTestCase;
import org.elasticsearch.xpack.dataframe.persistence.DataFrameTransformsConfigManager;
import org.junit.After;
@ -198,31 +197,31 @@ public class DataFrameTransformCheckpointServiceNodeTests extends DataFrameSingl
mockClientForCheckpointing.setShardStats(createShardStats(createCheckPointMap(transformId, 20, 20, 20)));
DataFrameTransformCheckpointingInfo checkpointInfo = new DataFrameTransformCheckpointingInfo(
new DataFrameTransformCheckpointStats(1, null, null, null, timestamp, 0L),
new DataFrameTransformCheckpointStats(2, IndexerState.STARTED, position, progress, timestamp + 100L, 0L),
new DataFrameTransformCheckpointStats(1, null, null, timestamp, 0L),
new DataFrameTransformCheckpointStats(2, position, progress, timestamp + 100L, 0L),
30L);
assertAsync(listener ->
transformsCheckpointService.getCheckpointingInfo(transformId, 1, IndexerState.STARTED, position, progress, listener),
transformsCheckpointService.getCheckpointingInfo(transformId, 1, position, progress, listener),
checkpointInfo, null, null);
mockClientForCheckpointing.setShardStats(createShardStats(createCheckPointMap(transformId, 10, 50, 33)));
checkpointInfo = new DataFrameTransformCheckpointingInfo(
new DataFrameTransformCheckpointStats(1, null, null, null, timestamp, 0L),
new DataFrameTransformCheckpointStats(2, IndexerState.INDEXING, position, progress, timestamp + 100L, 0L),
new DataFrameTransformCheckpointStats(1, null, null, timestamp, 0L),
new DataFrameTransformCheckpointStats(2, position, progress, timestamp + 100L, 0L),
63L);
assertAsync(listener ->
transformsCheckpointService.getCheckpointingInfo(transformId, 1, IndexerState.INDEXING, position, progress, listener),
transformsCheckpointService.getCheckpointingInfo(transformId, 1, position, progress, listener),
checkpointInfo, null, null);
// same as current
mockClientForCheckpointing.setShardStats(createShardStats(createCheckPointMap(transformId, 10, 10, 10)));
checkpointInfo = new DataFrameTransformCheckpointingInfo(
new DataFrameTransformCheckpointStats(1, null, null, null, timestamp, 0L),
new DataFrameTransformCheckpointStats(2, IndexerState.STOPPING, position, progress, timestamp + 100L, 0L),
new DataFrameTransformCheckpointStats(1, null, null, timestamp, 0L),
new DataFrameTransformCheckpointStats(2, position, progress, timestamp + 100L, 0L),
0L);
assertAsync(listener ->
transformsCheckpointService.getCheckpointingInfo(transformId, 1, IndexerState.STOPPING, position, progress, listener),
transformsCheckpointService.getCheckpointingInfo(transformId, 1, position, progress, listener),
checkpointInfo, null, null);
}

View File

@ -619,7 +619,7 @@ setup:
transform_id: "airline-transform-start-delete"
- match: { count: 1 }
- match: { transforms.0.id: "airline-transform-start-delete" }
- match: { transforms.0.task_state: "started" }
- match: { transforms.0.state: "/started|indexing/" }
- do:
catch: /Cannot delete data frame \[airline-transform-start-delete\] as the task is running/

View File

@ -100,7 +100,7 @@ teardown:
transform_id: "airline-transform-start-stop"
- match: { count: 1 }
- match: { transforms.0.id: "airline-transform-start-stop" }
- match: { transforms.0.task_state: "started" }
- match: { transforms.0.state: "/started|indexing/" }
- do:
data_frame.stop_data_frame_transform:
@ -113,7 +113,7 @@ teardown:
transform_id: "airline-transform-start-stop"
- match: { count: 1 }
- match: { transforms.0.id: "airline-transform-start-stop" }
- match: { transforms.0.task_state: "stopped" }
- match: { transforms.0.state: "stopped" }
- do:
data_frame.start_data_frame_transform:
@ -125,7 +125,7 @@ teardown:
transform_id: "airline-transform-start-stop"
- match: { count: 1 }
- match: { transforms.0.id: "airline-transform-start-stop" }
- match: { transforms.0.task_state: "started" }
- match: { transforms.0.state: "/started|indexing/" }
---
"Test start/stop/start continuous transform":
- do:
@ -157,7 +157,7 @@ teardown:
transform_id: "airline-transform-start-stop-continuous"
- match: { count: 1 }
- match: { transforms.0.id: "airline-transform-start-stop-continuous" }
- match: { transforms.0.task_state: "started" }
- match: { transforms.0.state: "/started|indexing/" }
- do:
data_frame.stop_data_frame_transform:
@ -170,7 +170,7 @@ teardown:
transform_id: "airline-transform-start-stop-continuous"
- match: { count: 1 }
- match: { transforms.0.id: "airline-transform-start-stop-continuous" }
- match: { transforms.0.task_state: "stopped" }
- match: { transforms.0.state: "stopped" }
- do:
data_frame.start_data_frame_transform:
@ -182,7 +182,7 @@ teardown:
transform_id: "airline-transform-start-stop-continuous"
- match: { count: 1 }
- match: { transforms.0.id: "airline-transform-start-stop-continuous" }
- match: { transforms.0.task_state: "started" }
- match: { transforms.0.state: "/started|indexing/" }
- do:
data_frame.stop_data_frame_transform:
@ -244,14 +244,14 @@ teardown:
transform_id: "airline-transform-start-stop"
- match: { count: 1 }
- match: { transforms.0.id: "airline-transform-start-stop" }
- match: { transforms.0.task_state: "started" }
- match: { transforms.0.state: "/started|indexing/" }
- do:
data_frame.get_data_frame_transform_stats:
transform_id: "airline-transform-start-later"
- match: { count: 1 }
- match: { transforms.0.id: "airline-transform-start-later" }
- match: { transforms.0.task_state: "stopped" }
- match: { transforms.0.state: "stopped" }
- do:
data_frame.start_data_frame_transform:
@ -270,7 +270,7 @@ teardown:
transform_id: "airline-transform-start-later"
- match: { count: 1 }
- match: { transforms.0.id: "airline-transform-start-later" }
- match: { transforms.0.task_state: "started" }
- match: { transforms.0.state: "/started|indexing/" }
- do:
data_frame.stop_data_frame_transform:
@ -316,8 +316,8 @@ teardown:
data_frame.get_data_frame_transform_stats:
transform_id: "*"
- match: { count: 2 }
- match: { transforms.0.task_state: "stopped" }
- match: { transforms.1.task_state: "stopped" }
- match: { transforms.0.state: "stopped" }
- match: { transforms.1.state: "stopped" }
- do:
data_frame.delete_data_frame_transform:

View File

@ -47,7 +47,7 @@ teardown:
transform_id: "airline-transform-stats"
- match: { count: 1 }
- match: { transforms.0.id: "airline-transform-stats" }
- match: { transforms.0.task_state: "/started|stopped/" }
- match: { transforms.0.state: "/started|indexing|stopping|stopped/" }
- lte: { transforms.0.checkpointing.last.checkpoint: 1 }
- lte: { transforms.0.stats.pages_processed: 1 }
- match: { transforms.0.stats.documents_processed: 0 }
@ -227,7 +227,7 @@ teardown:
- match: { count: 1 }
- match: { transforms.0.id: "airline-transform-stats-continuous" }
# Since this is continuous, there is no worry of it automatically stopping
- match: { transforms.0.task_state: "started" }
- match: { transforms.0.state: "/started|indexing/" }
- lte: { transforms.0.checkpointing.last.checkpoint: 1 }
# Since this is continuous, and _start does not return until it is assigned
# we should see a node assignment

View File

@ -14,7 +14,6 @@ import org.elasticsearch.client.Response;
import org.elasticsearch.client.dataframe.GetDataFrameTransformStatsResponse;
import org.elasticsearch.client.dataframe.transforms.DataFrameTransformConfig;
import org.elasticsearch.client.dataframe.transforms.DataFrameTransformStats;
import org.elasticsearch.client.dataframe.transforms.DataFrameTransformTaskState;
import org.elasticsearch.client.dataframe.transforms.DestConfig;
import org.elasticsearch.client.dataframe.transforms.SourceConfig;
import org.elasticsearch.client.dataframe.transforms.TimeSyncConfig;
@ -47,6 +46,7 @@ import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.oneOf;
@LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/43662")
public class DataFrameSurvivesUpgradeIT extends AbstractUpgradeTestCase {
@ -139,7 +139,7 @@ public class DataFrameSurvivesUpgradeIT extends AbstractUpgradeTestCase {
assertThat(stateAndStats.getIndexerStats().getOutputDocuments(), equalTo((long)ENTITIES.size()));
assertThat(stateAndStats.getIndexerStats().getNumDocuments(), equalTo(totalDocsWritten));
assertThat(stateAndStats.getTaskState(), equalTo(DataFrameTransformTaskState.STARTED));
assertThat(stateAndStats.getState(), oneOf(DataFrameTransformStats.State.STARTED, DataFrameTransformStats.State.INDEXING));
}
private void verifyContinuousDataFrameHandlesData(long expectedLastCheckpoint) throws Exception {
@ -148,7 +148,7 @@ public class DataFrameSurvivesUpgradeIT extends AbstractUpgradeTestCase {
// if it was assigned to the node that was removed from the cluster
assertBusy(() -> {
DataFrameTransformStats stateAndStats = getTransformStats(CONTINUOUS_DATA_FRAME_ID);
assertThat(stateAndStats.getTaskState(), equalTo(DataFrameTransformTaskState.STARTED));
assertThat(stateAndStats.getState(), oneOf(DataFrameTransformStats.State.STARTED, DataFrameTransformStats.State.INDEXING));
},
120,
TimeUnit.SECONDS);
@ -174,8 +174,8 @@ public class DataFrameSurvivesUpgradeIT extends AbstractUpgradeTestCase {
TimeUnit.SECONDS);
DataFrameTransformStats stateAndStats = getTransformStats(CONTINUOUS_DATA_FRAME_ID);
assertThat(stateAndStats.getTaskState(),
equalTo(DataFrameTransformTaskState.STARTED));
assertThat(stateAndStats.getState(),
oneOf(DataFrameTransformStats.State.STARTED, DataFrameTransformStats.State.INDEXING));
assertThat(stateAndStats.getIndexerStats().getOutputDocuments(),
greaterThan(previousStateAndStats.getIndexerStats().getOutputDocuments()));
assertThat(stateAndStats.getIndexerStats().getNumDocuments(),

View File

@ -29,10 +29,10 @@
transform_id: "mixed-simple-transform"
- match: { count: 1 }
- match: { transforms.0.id: "mixed-simple-transform" }
# Since we are breaking the stats format between 7.3 and 7.4 (allowed because we're beta) we
# cannot assert on task_state in the mixed cluster as it could be at the top level or under state
# Since we are breaking the stats format between 7.3 and 7.4 (allowed because we're beta) we cannot
# assert on state in the mixed cluster as it could be state at the top level or state.task_state
# TODO: uncomment this assertion in master
#- match: { transforms.0.task_state: "/started|stopped/" }
#- match: { transforms.0.state: "/started|indexing|stopping|stopped/" }
- do:
data_frame.stop_data_frame_transform:
@ -45,10 +45,10 @@
transform_id: "mixed-simple-transform"
- match: { count: 1 }
- match: { transforms.0.id: "mixed-simple-transform" }
# Since we are breaking the stats format between 7.3 and 7.4 (allowed because we're beta) we
# cannot assert on task_state in the mixed cluster as it could be at the top level or under state
# Since we are breaking the stats format between 7.3 and 7.4 (allowed because we're beta) we cannot
# assert on state in the mixed cluster as it could be state at the top level or state.task_state
# TODO: uncomment this assertion in master
#- match: { transforms.0.task_state: "stopped" }
#- match: { transforms.0.state: "stopped" }
- do:
data_frame.put_data_frame_transform:
@ -92,10 +92,10 @@
transform_id: "mixed-complex-transform"
- match: { count: 1 }
- match: { transforms.0.id: "mixed-complex-transform" }
# Since we are breaking the stats format between 7.3 and 7.4 (allowed because we're beta) we
# cannot assert on task_state in the mixed cluster as it could be at the top level or under state
# Since we are breaking the stats format between 7.3 and 7.4 (allowed because we're beta) we cannot
# assert on state in the mixed cluster as it could be state at the top level or state.task_state
# TODO: uncomment this assertion in master
#- match: { transforms.0.task_state: "/started|stopped/" }
#- match: { transforms.0.state: "/started|indexing|stopping|stopped/" }
- do:
data_frame.stop_data_frame_transform:
@ -108,10 +108,10 @@
transform_id: "mixed-complex-transform"
- match: { count: 1 }
- match: { transforms.0.id: "mixed-complex-transform" }
# Since we are breaking the stats format between 7.3 and 7.4 (allowed because we're beta) we
# cannot assert on task_state in the mixed cluster as it could be at the top level or under state
# Since we are breaking the stats format between 7.3 and 7.4 (allowed because we're beta) we cannot
# assert on state in the mixed cluster as it could be state at the top level or state.task_state
# TODO: uncomment this assertion in master
#- match: { transforms.0.task_state: "stopped" }
#- match: { transforms.0.state: "stopped" }
---
"Test GET, start, and stop old cluster batch transforms":
@ -143,10 +143,10 @@
transform_id: "old-simple-transform"
- match: { count: 1 }
- match: { transforms.0.id: "old-simple-transform" }
# Since we are breaking the stats format between 7.3 and 7.4 (allowed because we're beta) we
# cannot assert on task_state in the mixed cluster as it could be at the top level or under state
# Since we are breaking the stats format between 7.3 and 7.4 (allowed because we're beta) we cannot
# assert on state in the mixed cluster as it could be state at the top level or state.task_state
# TODO: uncomment this assertion in master
#- match: { transforms.0.task_state: "/started|stopped/" }
#- match: { transforms.0.state: "/started|indexing|stopping|stopped/" }
- do:
data_frame.stop_data_frame_transform:
@ -158,10 +158,10 @@
transform_id: "old-simple-transform"
- match: { count: 1 }
- match: { transforms.0.id: "old-simple-transform" }
# Since we are breaking the stats format between 7.3 and 7.4 (allowed because we're beta) we
# cannot assert on task_state in the mixed cluster as it could be at the top level or under state
# Since we are breaking the stats format between 7.3 and 7.4 (allowed because we're beta) we cannot
# assert on state in the mixed cluster as it could be state at the top level or state.task_state
# TODO: uncomment this assertion in master
#- match: { transforms.0.task_state: "stopped" }
#- match: { transforms.0.state: "stopped" }
- do:
data_frame.get_data_frame_transform:
@ -184,10 +184,10 @@
transform_id: "old-complex-transform"
- match: { count: 1 }
- match: { transforms.0.id: "old-complex-transform" }
# Since we are breaking the stats format between 7.3 and 7.4 (allowed because we're beta) we
# cannot assert on task_state in the mixed cluster as it could be at the top level or under state
# Since we are breaking the stats format between 7.3 and 7.4 (allowed because we're beta) we cannot
# assert on state in the mixed cluster as it could be state at the top level or state.task_state
# TODO: uncomment this assertion in master
#- match: { transforms.0.task_state: "/started|stopped/" }
#- match: { transforms.0.state: "/started|indexing|stopping|stopped/" }
- do:
data_frame.stop_data_frame_transform:
@ -199,7 +199,7 @@
transform_id: "old-complex-transform"
- match: { count: 1 }
- match: { transforms.0.id: "old-complex-transform" }
# Since we are breaking the stats format between 7.3 and 7.4 (allowed because we're beta) we
# cannot assert on task_state in the mixed cluster as it could be at the top level or under state
# Since we are breaking the stats format between 7.3 and 7.4 (allowed because we're beta) we cannot
# assert on state in the mixed cluster as it could be state at the top level or state.task_state
# TODO: uncomment this assertion in master
#- match: { transforms.0.task_state: "stopped" }
#- match: { transforms.0.state: "stopped" }

View File

@ -27,7 +27,7 @@ setup:
transform_id: "old-simple-transform"
- match: { count: 1 }
- match: { transforms.0.id: "old-simple-transform" }
- match: { transforms.0.task_state: "/started|stopped/" }
- match: { transforms.0.state: "/started|indexing|stopping|stopped/" }
- do:
data_frame.stop_data_frame_transform:
@ -39,7 +39,7 @@ setup:
transform_id: "old-simple-transform"
- match: { count: 1 }
- match: { transforms.0.id: "old-simple-transform" }
- match: { transforms.0.task_state: "stopped" }
- match: { transforms.0.state: "stopped" }
- do:
data_frame.get_data_frame_transform:
transform_id: "old-complex-transform"
@ -61,7 +61,7 @@ setup:
transform_id: "old-complex-transform"
- match: { count: 1 }
- match: { transforms.0.id: "old-complex-transform" }
- match: { transforms.0.task_state: "/started|stopped/" }
- match: { transforms.0.state: "/started|indexing|stopping|stopped/" }
- do:
data_frame.stop_data_frame_transform:
@ -73,7 +73,7 @@ setup:
transform_id: "old-complex-transform"
- match: { count: 1 }
- match: { transforms.0.id: "old-complex-transform" }
- match: { transforms.0.task_state: "stopped" }
- match: { transforms.0.state: "stopped" }
# Simple and complex Mixed cluster transforms
- do:
@ -95,7 +95,7 @@ setup:
transform_id: "mixed-simple-transform"
- match: { count: 1 }
- match: { transforms.0.id: "mixed-simple-transform" }
- match: { transforms.0.task_state: "/started|stopped/" }
- match: { transforms.0.state: "/started|indexing|stopping|stopped/" }
- do:
data_frame.stop_data_frame_transform:
@ -107,7 +107,7 @@ setup:
transform_id: "mixed-simple-transform"
- match: { count: 1 }
- match: { transforms.0.id: "mixed-simple-transform" }
- match: { transforms.0.task_state: "stopped" }
- match: { transforms.0.state: "stopped" }
- do:
data_frame.get_data_frame_transform:
@ -130,7 +130,7 @@ setup:
transform_id: "mixed-complex-transform"
- match: { count: 1 }
- match: { transforms.0.id: "mixed-complex-transform" }
- match: { transforms.0.task_state: "/started|stopped/" }
- match: { transforms.0.state: "/started|indexing|stopping|stopped/" }
- do:
data_frame.stop_data_frame_transform:
@ -142,7 +142,7 @@ setup:
transform_id: "mixed-complex-transform"
- match: { count: 1 }
- match: { transforms.0.id: "mixed-complex-transform" }
- match: { transforms.0.task_state: "stopped" }
- match: { transforms.0.state: "stopped" }
# Delete all old and mixed transforms
- do: