[ML] Add data frame task state object and field (#40169) (#40490)

* [ML] Add data frame task state object and field

* A new state item is added so that the overall task state can be
accounted for
* A new FAILED state and reason have been added as well so that failures
can be shown to the user for optional correction

* Addressing PR comments

* adjusting after master merge

* addressing pr comment

* Adjusting auditor usage with failure state

* Refactor, renamed state items to task_state and indexer_state

* Adding todo and removing redundant auditor call

* Address HLRC changes and PR comment

* adjusting hlrc IT test
This commit is contained in:
Benjamin Trent 2019-03-27 06:53:58 -05:00 committed by GitHub
parent 38dc005b8d
commit 12943c5d2c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
27 changed files with 644 additions and 131 deletions

View File

@ -39,17 +39,25 @@ import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optiona
public class DataFrameTransformState {
private static final ParseField STATE = new ParseField("transform_state");
private static final ParseField INDEXER_STATE = new ParseField("indexer_state");
private static final ParseField TASK_STATE = new ParseField("task_state");
private static final ParseField CURRENT_POSITION = new ParseField("current_position");
private static final ParseField GENERATION = new ParseField("generation");
@SuppressWarnings("unchecked")
public static final ConstructingObjectParser<DataFrameTransformState, Void> PARSER =
new ConstructingObjectParser<>("data_frame_transform_state",
args -> new DataFrameTransformState((IndexerState) args[0], (HashMap<String, Object>) args[1], (long) args[2]));
args -> new DataFrameTransformState((DataFrameTransformTaskState) args[0],
(IndexerState) args[1],
(HashMap<String, Object>) args[2],
(long) args[3]));
static {
PARSER.declareField(constructorArg(), p -> IndexerState.fromString(p.text()), STATE, ObjectParser.ValueType.STRING);
PARSER.declareField(constructorArg(),
p -> DataFrameTransformTaskState.fromString(p.text()),
TASK_STATE,
ObjectParser.ValueType.STRING);
PARSER.declareField(constructorArg(), p -> IndexerState.fromString(p.text()), INDEXER_STATE, ObjectParser.ValueType.STRING);
PARSER.declareField(optionalConstructorArg(), p -> {
if (p.currentToken() == XContentParser.Token.START_OBJECT) {
return p.map();
@ -66,18 +74,27 @@ public class DataFrameTransformState {
return PARSER.parse(parser, null);
}
private final IndexerState state;
private final DataFrameTransformTaskState taskState;
private final IndexerState indexerState;
private final long generation;
private final SortedMap<String, Object> currentPosition;
public DataFrameTransformState(IndexerState state, @Nullable Map<String, Object> position, long generation) {
this.state = state;
public DataFrameTransformState(DataFrameTransformTaskState taskState,
IndexerState indexerState,
@Nullable Map<String, Object> position,
long generation) {
this.taskState = taskState;
this.indexerState = indexerState;
this.currentPosition = position == null ? null : Collections.unmodifiableSortedMap(new TreeMap<>(position));
this.generation = generation;
}
public IndexerState getIndexerState() {
return state;
return indexerState;
}
public DataFrameTransformTaskState getTaskState() {
return taskState;
}
@Nullable
@ -101,12 +118,14 @@ public class DataFrameTransformState {
DataFrameTransformState that = (DataFrameTransformState) other;
return Objects.equals(this.state, that.state) && Objects.equals(this.currentPosition, that.currentPosition)
&& this.generation == that.generation;
return Objects.equals(this.taskState, that.taskState) &&
Objects.equals(this.indexerState, that.indexerState) &&
Objects.equals(this.currentPosition, that.currentPosition) &&
this.generation == that.generation;
}
@Override
public int hashCode() {
return Objects.hash(state, currentPosition, generation);
return Objects.hash(taskState, indexerState, currentPosition, generation);
}
}

View File

@ -0,0 +1,34 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.client.dataframe.transforms;
import java.util.Locale;
/**
 * The overall lifecycle state of a data frame transform task as reported by the
 * high level REST client: {@code stopped}, {@code started}, or {@code failed}.
 */
public enum DataFrameTransformTaskState {
    STOPPED, STARTED, FAILED;

    /**
     * Case-insensitive lookup of a task state from its string form.
     * Surrounding whitespace is tolerated.
     */
    public static DataFrameTransformTaskState fromString(String name) {
        String normalized = name.trim().toUpperCase(Locale.ROOT);
        return DataFrameTransformTaskState.valueOf(normalized);
    }

    /** The lower-case name used in the REST (x-content) representation. */
    public String value() {
        return name().toLowerCase(Locale.ROOT);
    }
}

View File

@ -39,6 +39,7 @@ import org.elasticsearch.client.dataframe.StopDataFrameTransformResponse;
import org.elasticsearch.client.dataframe.transforms.DataFrameIndexerTransformStats;
import org.elasticsearch.client.dataframe.transforms.DataFrameTransformConfig;
import org.elasticsearch.client.dataframe.transforms.DataFrameTransformStateAndStats;
import org.elasticsearch.client.dataframe.transforms.DataFrameTransformTaskState;
import org.elasticsearch.client.dataframe.transforms.DestConfig;
import org.elasticsearch.client.dataframe.transforms.QueryConfig;
import org.elasticsearch.client.dataframe.transforms.SourceConfig;
@ -67,6 +68,7 @@ import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
public class DataFrameTransformIT extends ESRestHighLevelClientTestCase {
@ -277,18 +279,23 @@ public class DataFrameTransformIT extends ESRestHighLevelClientTestCase {
assertEquals(1, statsResponse.getTransformsStateAndStats().size());
DataFrameTransformStateAndStats stats = statsResponse.getTransformsStateAndStats().get(0);
assertEquals(DataFrameTransformTaskState.STOPPED, stats.getTransformState().getTaskState());
assertEquals(IndexerState.STOPPED, stats.getTransformState().getIndexerState());
DataFrameIndexerTransformStats zeroIndexerStats = new DataFrameIndexerTransformStats(0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L);
assertEquals(zeroIndexerStats, stats.getTransformStats());
// start the transform
execute(new StartDataFrameTransformRequest(id), client::startDataFrameTransform, client::startDataFrameTransformAsync);
StartDataFrameTransformResponse startTransformResponse = execute(new StartDataFrameTransformRequest(id),
client::startDataFrameTransform,
client::startDataFrameTransformAsync);
assertThat(startTransformResponse.isStarted(), is(true));
assertBusy(() -> {
GetDataFrameTransformStatsResponse response = execute(new GetDataFrameTransformStatsRequest(id),
client::getDataFrameTransformStats, client::getDataFrameTransformStatsAsync);
DataFrameTransformStateAndStats stateAndStats = response.getTransformsStateAndStats().get(0);
assertEquals(IndexerState.STARTED, stateAndStats.getTransformState().getIndexerState());
assertEquals(DataFrameTransformTaskState.STARTED, stateAndStats.getTransformState().getTaskState());
assertNotEquals(zeroIndexerStats, stateAndStats.getTransformStats());
});
}

View File

@ -41,12 +41,16 @@ public class DataFrameTransformStateTests extends ESTestCase {
}
public static DataFrameTransformState randomDataFrameTransformState() {
return new DataFrameTransformState(randomFrom(IndexerState.values()), randomPositionMap(), randomLongBetween(0,10));
return new DataFrameTransformState(randomFrom(DataFrameTransformTaskState.values()),
randomFrom(IndexerState.values()),
randomPositionMap(),
randomLongBetween(0,10));
}
public static void toXContent(DataFrameTransformState state, XContentBuilder builder) throws IOException {
builder.startObject();
builder.field("transform_state", state.getIndexerState().value());
builder.field("task_state", state.getTaskState().value());
builder.field("indexer_state", state.getIndexerState().value());
if (state.getPosition() != null) {
builder.field("current_position", state.getPosition());
}

View File

@ -39,6 +39,7 @@ import org.elasticsearch.client.dataframe.StopDataFrameTransformResponse;
import org.elasticsearch.client.dataframe.transforms.DataFrameIndexerTransformStats;
import org.elasticsearch.client.dataframe.transforms.DataFrameTransformConfig;
import org.elasticsearch.client.dataframe.transforms.DataFrameTransformStateAndStats;
import org.elasticsearch.client.dataframe.transforms.DataFrameTransformTaskState;
import org.elasticsearch.client.dataframe.transforms.DestConfig;
import org.elasticsearch.client.dataframe.transforms.QueryConfig;
import org.elasticsearch.client.dataframe.transforms.SourceConfig;
@ -466,13 +467,16 @@ public class DataFrameTransformDocumentationIT extends ESRestHighLevelClientTest
// tag::get-data-frame-transform-stats-response
DataFrameTransformStateAndStats stateAndStats =
response.getTransformsStateAndStats().get(0); // <1>
DataFrameTransformTaskState taskState =
stateAndStats.getTransformState().getTaskState(); // <2>
IndexerState indexerState =
stateAndStats.getTransformState().getIndexerState(); // <2>
stateAndStats.getTransformState().getIndexerState(); // <3>
DataFrameIndexerTransformStats transformStats =
stateAndStats.getTransformStats(); // <3>
stateAndStats.getTransformStats(); // <4>
// end::get-data-frame-transform-stats-response
assertEquals(IndexerState.STOPPED, indexerState);
assertEquals(DataFrameTransformTaskState.STOPPED, taskState);
assertNotNull(transformStats);
}
{

View File

@ -35,5 +35,6 @@ The returned +{response}+ contains the requested {dataframe-transform} statistic
include-tagged::{doc-tests-file}[{api}-response]
--------------------------------------------------
<1> The response contains a list of `DataFrameTransformStateAndStats` objects
<2> The running state of the transform e.g `started`
<3> The transform progress statistics recording the number of documents indexed etc
<2> The running state of the transform task, e.g. `started`
<3> The running state of the transform indexer, e.g. `started`, `indexing`, etc.
<4> The transform progress statistics recording the number of documents indexed, etc.

View File

@ -63,7 +63,8 @@ The API returns the following results:
{
"id" : "ecommerce_transform",
"state" : {
"transform_state" : "started",
"indexer_state" : "started",
"task_state": "started",
"current_position" : {
"customer_id" : "9"
},

View File

@ -26,6 +26,7 @@ public final class DataFrameField {
public static final ParseField INDEX_DOC_TYPE = new ParseField("doc_type");
public static final ParseField SOURCE = new ParseField("source");
public static final ParseField DESTINATION = new ParseField("dest");
public static final ParseField FORCE = new ParseField("force");
// common strings
public static final String TASK_NAME = "data_frame/transforms";

View File

@ -42,9 +42,11 @@ public class StartDataFrameTransformAction extends Action<StartDataFrameTransfor
public static class Request extends AcknowledgedRequest<Request> implements ToXContent {
private String id;
private boolean force;
public Request(String id) {
public Request(String id, boolean force) {
this.id = ExceptionsHelper.requireNonNull(id, DataFrameField.ID.getPreferredName());
this.force = force;
}
public Request() {
@ -59,6 +61,10 @@ public class StartDataFrameTransformAction extends Action<StartDataFrameTransfor
return id;
}
public boolean isForce() {
return force;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);

View File

@ -47,23 +47,26 @@ public class StopDataFrameTransformAction extends Action<StopDataFrameTransformA
public static class Request extends BaseTasksRequest<Request> implements ToXContent {
private String id;
private final boolean waitForCompletion;
private final boolean force;
public Request(String id, boolean waitForCompletion, @Nullable TimeValue timeout) {
public Request(String id, boolean waitForCompletion, boolean force, @Nullable TimeValue timeout) {
this.id = ExceptionsHelper.requireNonNull(id, DataFrameField.ID.getPreferredName());
this.waitForCompletion = waitForCompletion;
this.force = force;
// use the timeout value already present in BaseTasksRequest
this.setTimeout(timeout == null ? DEFAULT_TIMEOUT : timeout);
}
private Request() {
this(null, false, null);
this(null, false, false, null);
}
public Request(StreamInput in) throws IOException {
super(in);
id = in.readString();
waitForCompletion = in.readBoolean();
force = in.readBoolean();
}
public String getId() {
@ -78,11 +81,16 @@ public class StopDataFrameTransformAction extends Action<StopDataFrameTransformA
return waitForCompletion;
}
public boolean isForce() {
return force;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(id);
out.writeBoolean(waitForCompletion);
out.writeBoolean(force);
}
@Override
@ -94,6 +102,7 @@ public class StopDataFrameTransformAction extends Action<StopDataFrameTransformA
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.field(DataFrameField.ID.getPreferredName(), id);
builder.field(DataFrameField.WAIT_FOR_COMPLETION.getPreferredName(), waitForCompletion);
builder.field(DataFrameField.FORCE.getPreferredName(), force);
if (this.getTimeout() != null) {
builder.field(DataFrameField.TIMEOUT.getPreferredName(), this.getTimeout());
}
@ -103,7 +112,7 @@ public class StopDataFrameTransformAction extends Action<StopDataFrameTransformA
@Override
public int hashCode() {
// the base class does not implement hashCode, therefore we need to hash timeout ourselves
return Objects.hash(id, waitForCompletion, this.getTimeout());
return Objects.hash(id, waitForCompletion, force, this.getTimeout());
}
@Override
@ -122,7 +131,9 @@ public class StopDataFrameTransformAction extends Action<StopDataFrameTransformA
return false;
}
return Objects.equals(id, other.id) && Objects.equals(waitForCompletion, other.waitForCompletion);
return Objects.equals(id, other.id) &&
Objects.equals(waitForCompletion, other.waitForCompletion) &&
Objects.equals(force, other.force);
}
@Override

View File

@ -8,6 +8,7 @@ package org.elasticsearch.xpack.core.dataframe.transforms;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
@ -21,7 +22,6 @@ import org.elasticsearch.xpack.core.indexing.IndexerState;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.SortedMap;
@ -33,28 +33,44 @@ import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optiona
public class DataFrameTransformState implements Task.Status, PersistentTaskState {
public static final String NAME = DataFrameField.TASK_NAME;
private final IndexerState state;
private final DataFrameTransformTaskState taskState;
private final IndexerState indexerState;
private final long generation;
@Nullable
private final SortedMap<String, Object> currentPosition;
@Nullable
private final String reason;
private static final ParseField STATE = new ParseField("transform_state");
private static final ParseField TASK_STATE = new ParseField("task_state");
private static final ParseField INDEXER_STATE = new ParseField("indexer_state");
private static final ParseField CURRENT_POSITION = new ParseField("current_position");
private static final ParseField GENERATION = new ParseField("generation");
private static final ParseField REASON = new ParseField("reason");
@SuppressWarnings("unchecked")
public static final ConstructingObjectParser<DataFrameTransformState, Void> PARSER = new ConstructingObjectParser<>(NAME,
args -> new DataFrameTransformState((IndexerState) args[0], (HashMap<String, Object>) args[1], (long) args[2]));
true,
args -> new DataFrameTransformState((DataFrameTransformTaskState) args[0],
(IndexerState) args[1],
(Map<String, Object>) args[2],
(long) args[3],
(String) args[4]));
static {
PARSER.declareField(constructorArg(), p -> {
if (p.currentToken() == XContentParser.Token.VALUE_STRING) {
return DataFrameTransformTaskState.fromString(p.text());
}
throw new IllegalArgumentException("Unsupported token [" + p.currentToken() + "]");
}, TASK_STATE, ObjectParser.ValueType.STRING);
PARSER.declareField(constructorArg(), p -> {
if (p.currentToken() == XContentParser.Token.VALUE_STRING) {
return IndexerState.fromString(p.text());
}
throw new IllegalArgumentException("Unsupported token [" + p.currentToken() + "]");
}, STATE, ObjectParser.ValueType.STRING);
}, INDEXER_STATE, ObjectParser.ValueType.STRING);
PARSER.declareField(optionalConstructorArg(), p -> {
if (p.currentToken() == XContentParser.Token.START_OBJECT) {
return p.map();
@ -64,23 +80,36 @@ public class DataFrameTransformState implements Task.Status, PersistentTaskState
}
throw new IllegalArgumentException("Unsupported token [" + p.currentToken() + "]");
}, CURRENT_POSITION, ObjectParser.ValueType.VALUE_OBJECT_ARRAY);
PARSER.declareLong(ConstructingObjectParser.optionalConstructorArg(), GENERATION);
PARSER.declareLong(constructorArg(), GENERATION);
PARSER.declareString(optionalConstructorArg(), REASON);
}
public DataFrameTransformState(IndexerState state, @Nullable Map<String, Object> position, long generation) {
this.state = state;
public DataFrameTransformState(DataFrameTransformTaskState taskState,
IndexerState indexerState,
@Nullable Map<String, Object> position,
long generation,
@Nullable String reason) {
this.taskState = taskState;
this.indexerState = indexerState;
this.currentPosition = position == null ? null : Collections.unmodifiableSortedMap(new TreeMap<>(position));
this.generation = generation;
this.reason = reason;
}
public DataFrameTransformState(StreamInput in) throws IOException {
state = IndexerState.fromStream(in);
taskState = DataFrameTransformTaskState.fromStream(in);
indexerState = IndexerState.fromStream(in);
currentPosition = in.readBoolean() ? Collections.unmodifiableSortedMap(new TreeMap<>(in.readMap())) : null;
generation = in.readLong();
reason = in.readOptionalString();
}
public DataFrameTransformTaskState getTaskState() {
return taskState;
}
public IndexerState getIndexerState() {
return state;
return indexerState;
}
public Map<String, Object> getPosition() {
@ -91,6 +120,10 @@ public class DataFrameTransformState implements Task.Status, PersistentTaskState
return generation;
}
public String getReason() {
return reason;
}
public static DataFrameTransformState fromXContent(XContentParser parser) {
try {
return PARSER.parse(parser, null);
@ -102,11 +135,15 @@ public class DataFrameTransformState implements Task.Status, PersistentTaskState
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(STATE.getPreferredName(), state.value());
builder.field(TASK_STATE.getPreferredName(), taskState.value());
builder.field(INDEXER_STATE.getPreferredName(), indexerState.value());
if (currentPosition != null) {
builder.field(CURRENT_POSITION.getPreferredName(), currentPosition);
}
builder.field(GENERATION.getPreferredName(), generation);
if (reason != null) {
builder.field(REASON.getPreferredName(), reason);
}
builder.endObject();
return builder;
}
@ -118,12 +155,14 @@ public class DataFrameTransformState implements Task.Status, PersistentTaskState
@Override
public void writeTo(StreamOutput out) throws IOException {
state.writeTo(out);
taskState.writeTo(out);
indexerState.writeTo(out);
out.writeBoolean(currentPosition != null);
if (currentPosition != null) {
out.writeMap(currentPosition);
}
out.writeLong(generation);
out.writeOptionalString(reason);
}
@Override
@ -138,12 +177,20 @@ public class DataFrameTransformState implements Task.Status, PersistentTaskState
DataFrameTransformState that = (DataFrameTransformState) other;
return Objects.equals(this.state, that.state) && Objects.equals(this.currentPosition, that.currentPosition)
&& this.generation == that.generation;
return Objects.equals(this.taskState, that.taskState) &&
Objects.equals(this.indexerState, that.indexerState) &&
Objects.equals(this.currentPosition, that.currentPosition) &&
this.generation == that.generation &&
Objects.equals(this.reason, that.reason);
}
@Override
public int hashCode() {
return Objects.hash(state, currentPosition, generation);
return Objects.hash(taskState, indexerState, currentPosition, generation, reason);
}
}
@Override
public String toString() {
return Strings.toString(this);
}
}

View File

@ -41,7 +41,7 @@ public class DataFrameTransformStateAndStats implements Writeable, ToXContentObj
public static DataFrameTransformStateAndStats initialStateAndStats(String id) {
return new DataFrameTransformStateAndStats(id,
new DataFrameTransformState(IndexerState.STOPPED, null, 0),
new DataFrameTransformState(DataFrameTransformTaskState.STOPPED, IndexerState.STOPPED, null, 0L, null),
new DataFrameIndexerTransformStats());
}

View File

@ -0,0 +1,36 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.core.dataframe.transforms;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import java.io.IOException;
import java.util.Locale;
/**
 * Overall lifecycle state of a data frame transform task:
 * {@code STOPPED}, {@code STARTED}, or {@code FAILED}.
 * Serialized on the wire as an enum (ordinal-based), so constant order matters.
 */
public enum DataFrameTransformTaskState implements Writeable {
    STOPPED, STARTED, FAILED;

    /** Parses a state from its string form; case-insensitive, whitespace tolerated. */
    public static DataFrameTransformTaskState fromString(String name) {
        String normalized = name.trim().toUpperCase(Locale.ROOT);
        return valueOf(normalized);
    }

    /** Reads a state previously written with {@link #writeTo}. */
    public static DataFrameTransformTaskState fromStream(StreamInput in) throws IOException {
        return in.readEnum(DataFrameTransformTaskState.class);
    }

    @Override
    public void writeTo(StreamOutput out) throws IOException {
        out.writeEnum(this);
    }

    /** The lower-case name used in the x-content (REST) representation. */
    public String value() {
        return name().toLowerCase(Locale.ROOT);
    }
}

View File

@ -13,7 +13,7 @@ import org.elasticsearch.xpack.core.dataframe.action.StartDataFrameTransformActi
public class StartDataFrameTransformActionRequestTests extends AbstractWireSerializingTestCase<Request> {
@Override
protected Request createTestInstance() {
return new Request(randomAlphaOfLengthBetween(1, 20));
return new Request(randomAlphaOfLengthBetween(1, 20), randomBoolean());
}
@Override

View File

@ -16,7 +16,7 @@ public class StopDataFrameTransformActionRequestTests extends AbstractWireSerial
@Override
protected Request createTestInstance() {
TimeValue timeout = randomBoolean() ? TimeValue.timeValueMinutes(randomIntBetween(1, 10)) : null;
return new Request(randomAlphaOfLengthBetween(1, 10), randomBoolean(), timeout);
return new Request(randomAlphaOfLengthBetween(1, 10), randomBoolean(), randomBoolean(), timeout);
}
@Override
@ -27,9 +27,10 @@ public class StopDataFrameTransformActionRequestTests extends AbstractWireSerial
public void testSameButDifferentTimeout() {
String id = randomAlphaOfLengthBetween(1, 10);
boolean waitForCompletion = randomBoolean();
boolean force = randomBoolean();
Request r1 = new Request(id, waitForCompletion, TimeValue.timeValueSeconds(10));
Request r2 = new Request(id, waitForCompletion, TimeValue.timeValueSeconds(20));
Request r1 = new Request(id, waitForCompletion, force, TimeValue.timeValueSeconds(10));
Request r2 = new Request(id, waitForCompletion, force, TimeValue.timeValueSeconds(20));
assertNotEquals(r1,r2);
assertNotEquals(r1.hashCode(),r2.hashCode());

View File

@ -14,11 +14,16 @@ import org.elasticsearch.xpack.core.indexing.IndexerState;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Predicate;
public class DataFrameTransformStateTests extends AbstractSerializingTestCase<DataFrameTransformState> {
public static DataFrameTransformState randomDataFrameTransformState() {
return new DataFrameTransformState(randomFrom(IndexerState.values()), randomPosition(), randomLongBetween(0,10));
return new DataFrameTransformState(randomFrom(DataFrameTransformTaskState.values()),
randomFrom(IndexerState.values()),
randomPosition(),
randomLongBetween(0,10),
randomBoolean() ? null : randomAlphaOfLength(10));
}
@Override
@ -53,4 +58,14 @@ public class DataFrameTransformStateTests extends AbstractSerializingTestCase<Da
}
return position;
}
@Override
protected boolean supportsUnknownFields() {
return true;
}
@Override
protected Predicate<String> getRandomFieldsExcludeFilter() {
return field -> !field.isEmpty();
}
}

View File

@ -0,0 +1,81 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.core.dataframe.transforms;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.test.ESTestCase;
import java.io.IOException;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
public class DataFrameTransformTaskStateTests extends ESTestCase {

    // The enum is serialized by ordinal, so each constant's ordinal is part of the
    // wire contract and must remain stable across releases.
    public void testValidOrdinals() {
        assertThat(DataFrameTransformTaskState.STOPPED.ordinal(), equalTo(0));
        assertThat(DataFrameTransformTaskState.STARTED.ordinal(), equalTo(1));
        assertThat(DataFrameTransformTaskState.FAILED.ordinal(), equalTo(2));
    }

    // writeTo writes the constant's ordinal as a VInt, one round-trip per constant.
    public void testwriteTo() throws Exception {
        try (BytesStreamOutput out = new BytesStreamOutput()) {
            DataFrameTransformTaskState.STOPPED.writeTo(out);
            try (StreamInput in = out.bytes().streamInput()) {
                assertThat(in.readVInt(), equalTo(0));
            }
        }
        try (BytesStreamOutput out = new BytesStreamOutput()) {
            DataFrameTransformTaskState.STARTED.writeTo(out);
            try (StreamInput in = out.bytes().streamInput()) {
                assertThat(in.readVInt(), equalTo(1));
            }
        }
        try (BytesStreamOutput out = new BytesStreamOutput()) {
            DataFrameTransformTaskState.FAILED.writeTo(out);
            try (StreamInput in = out.bytes().streamInput()) {
                assertThat(in.readVInt(), equalTo(2));
            }
        }
    }

    // fromStream reads a VInt ordinal and maps it back to the matching constant.
    public void testReadFrom() throws Exception {
        try (BytesStreamOutput out = new BytesStreamOutput()) {
            out.writeVInt(0);
            try (StreamInput in = out.bytes().streamInput()) {
                assertThat(DataFrameTransformTaskState.fromStream(in), equalTo(DataFrameTransformTaskState.STOPPED));
            }
        }
        try (BytesStreamOutput out = new BytesStreamOutput()) {
            out.writeVInt(1);
            try (StreamInput in = out.bytes().streamInput()) {
                assertThat(DataFrameTransformTaskState.fromStream(in), equalTo(DataFrameTransformTaskState.STARTED));
            }
        }
        try (BytesStreamOutput out = new BytesStreamOutput()) {
            out.writeVInt(2);
            try (StreamInput in = out.bytes().streamInput()) {
                assertThat(DataFrameTransformTaskState.fromStream(in), equalTo(DataFrameTransformTaskState.FAILED));
            }
        }
    }

    // Any ordinal outside the declared constants must be rejected with an IOException
    // whose message names the unknown ordinal.
    public void testInvalidReadFrom() throws Exception {
        try (BytesStreamOutput out = new BytesStreamOutput()) {
            out.writeVInt(randomIntBetween(3, Integer.MAX_VALUE));
            try (StreamInput in = out.bytes().streamInput()) {
                DataFrameTransformTaskState.fromStream(in);
                fail("Expected IOException");
            } catch(IOException e) {
                assertThat(e.getMessage(), containsString("Unknown DataFrameTransformTaskState ordinal ["));
            }
        }
    }
}

View File

@ -167,17 +167,34 @@ public abstract class DataFrameRestTestCase extends ESRestTestCase {
assertTrue(indexExists(dataFrameIndex));
}
// Convenience overload: starts the transform without an auth header.
protected void startDataframeTransform(String transformId, boolean force) throws IOException {
    startDataframeTransform(transformId, force, null);
}
// Starts the transform via the _start endpoint, forwarding the force flag,
// and asserts the API acknowledged the start ("started": true).
protected void startDataframeTransform(String transformId, boolean force, String authHeader) throws IOException {
    // start the transform
    final Request startTransformRequest = createRequestWithAuth("POST", DATAFRAME_ENDPOINT + transformId + "/_start", authHeader);
    startTransformRequest.addParameter(DataFrameField.FORCE.getPreferredName(), Boolean.toString(force));
    Map<String, Object> startTransformResponse = entityAsMap(client().performRequest(startTransformRequest));
    assertThat(startTransformResponse.get("started"), equalTo(Boolean.TRUE));
}
// Stops the transform via the _stop endpoint, forwarding the force flag,
// waiting for completion, and asserts the API acknowledged the stop ("stopped": true).
protected void stopDataFrameTransform(String transformId, boolean force) throws Exception {
    // stop the transform (the original comment said "start" — copy-paste slip)
    final Request stopTransformRequest = createRequestWithAuth("POST", DATAFRAME_ENDPOINT + transformId + "/_stop", null);
    stopTransformRequest.addParameter(DataFrameField.FORCE.getPreferredName(), Boolean.toString(force));
    stopTransformRequest.addParameter(DataFrameField.WAIT_FOR_COMPLETION.getPreferredName(), Boolean.toString(true));
    Map<String, Object> stopTransformResponse = entityAsMap(client().performRequest(stopTransformRequest));
    assertThat(stopTransformResponse.get("stopped"), equalTo(Boolean.TRUE));
}
protected void startAndWaitForTransform(String transformId, String dataFrameIndex) throws Exception {
startAndWaitForTransform(transformId, dataFrameIndex, null);
}
protected void startAndWaitForTransform(String transformId, String dataFrameIndex, String authHeader) throws Exception {
// start the transform
final Request startTransformRequest = createRequestWithAuth("POST", DATAFRAME_ENDPOINT + transformId + "/_start", authHeader);
Map<String, Object> startTransformResponse = entityAsMap(client().performRequest(startTransformRequest));
assertThat(startTransformResponse.get("started"), equalTo(Boolean.TRUE));
startDataframeTransform(transformId, false, authHeader);
// wait until the dataframe has been created and all data is available
waitForDataFrameGeneration(transformId);
refreshIndex(dataFrameIndex);
@ -216,13 +233,29 @@ public abstract class DataFrameRestTestCase extends ESRestTestCase {
}
// Extracts "state.indexer_state" from the transform's _stats response,
// or null when the transform has no state (e.g. no task).
protected static String getDataFrameIndexerState(String transformId) throws IOException {
    Map<?, ?> transformStatsAsMap = getDataFrameState(transformId);
    return transformStatsAsMap == null ? null :
        (String) XContentMapValues.extractValue("state.indexer_state", transformStatsAsMap);
}
// Extracts "state.task_state" from the transform's _stats response,
// or null when the transform has no state (e.g. no task).
protected static String getDataFrameTaskState(String transformId) throws IOException {
    Map<?, ?> transformStatsAsMap = getDataFrameState(transformId);
    return transformStatsAsMap == null ? null : (String) XContentMapValues.extractValue("state.task_state", transformStatsAsMap);
}
protected static Map<?, ?> getDataFrameState(String transformId) throws IOException {
Response statsResponse = client().performRequest(new Request("GET", DATAFRAME_ENDPOINT + transformId + "/_stats"));
List<?> transforms = ((List<?>) entityAsMap(statsResponse).get("transforms"));
if (transforms.isEmpty()) {
return null;
}
Map<?, ?> transformStatsAsMap = (Map<?, ?>) transforms.get(0);
return (String) XContentMapValues.extractValue("state.transform_state", transformStatsAsMap);
return (Map<?, ?>) transforms.get(0);
}
// Deletes the transform with the admin client; safe to call when it is already gone.
protected static void deleteDataFrameTransform(String transformId) throws IOException {
    Request request = new Request("DELETE", DATAFRAME_ENDPOINT + transformId);
    request.addParameter("ignore", "404"); // Ignore 404s because they imply someone was racing us to delete this
    adminClient().performRequest(request);
}
@AfterClass
@ -251,9 +284,7 @@ public abstract class DataFrameRestTestCase extends ESRestTestCase {
for (Map<String, Object> transformConfig : transformConfigs) {
String transformId = (String) transformConfig.get("id");
Request request = new Request("DELETE", DATAFRAME_ENDPOINT + transformId);
request.addParameter("ignore", "404"); // Ignore 404s because they imply someone was racing us to delete this
adminClient().performRequest(request);
deleteDataFrameTransform(transformId);
}
// transforms should be all gone

View File

@ -0,0 +1,94 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.dataframe.integration;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformTaskState;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.Matchers.equalTo;
/**
 * Integration test for the data frame transform FAILED task state: verifies that a transform
 * whose run fails irrecoverably moves to FAILED with an explanatory reason, that a failed
 * transform rejects plain start/stop with 409 CONFLICT, and that force-start clears the
 * failure and lets the transform complete once the underlying problem is corrected.
 *
 * Fix: {@code awaitState} passed the expected value as the "actual" argument of
 * {@code assertThat}, producing inverted assertion messages on failure.
 */
public class DataFrameTaskFailedStateIT extends DataFrameRestTestCase {

    public void testFailureStateInteraction() throws Exception {
        createReviewsIndex();
        String transformId = "failure_pivot_1";
        String dataFrameIndex = "failure_pivot_reviews";
        createPivotReviewsTransform(transformId, dataFrameIndex, null);
        deleteIndex(REVIEWS_INDEX_NAME); // trigger start failure due to index missing
        startDataframeTransform(transformId, false);
        awaitState(transformId, DataFrameTransformTaskState.FAILED);
        Map<?, ?> fullState = getDataFrameState(transformId);

        // Verify we have failed for the expected reason
        assertThat(XContentMapValues.extractValue("state.reason", fullState),
            equalTo("task encountered irrecoverable failure: no such index [reviews]"));
        // Only the task-level state flips to FAILED; the indexer state stays "started"
        assertThat(XContentMapValues.extractValue("state.indexer_state", fullState), equalTo("started"));

        // Verify that we cannot stop or start the transform when the task is in a failed state
        ResponseException ex = expectThrows(ResponseException.class, () -> stopDataFrameTransform(transformId, false));
        assertThat(ex.getResponse().getStatusLine().getStatusCode(), equalTo(RestStatus.CONFLICT.getStatus()));
        assertThat(XContentMapValues.extractValue("error.reason", entityAsMap(ex.getResponse())),
            equalTo("Unable to stop data frame transform [failure_pivot_1] as it is in a failed state with reason: [" +
                "task encountered irrecoverable failure: no such index [reviews]]. Use force stop to stop the data frame transform."));

        ex = expectThrows(ResponseException.class, () -> startDataframeTransform(transformId, false));
        assertThat(ex.getResponse().getStatusLine().getStatusCode(), equalTo(RestStatus.CONFLICT.getStatus()));
        assertThat(XContentMapValues.extractValue("error.reason", entityAsMap(ex.getResponse())),
            equalTo("Unable to start data frame transform [failure_pivot_1] as it is in a failed state with failure: [" +
                "task encountered irrecoverable failure: no such index [reviews]]. " +
                "Use force start to restart data frame transform once error is resolved."));

        // Correct the failure by creating the reviews index again
        createReviewsIndex();

        // Force start the data frame to indicate failure correction
        startDataframeTransform(transformId, true);

        // Wait for data to be indexed appropriately and refresh for search
        waitForDataFrameGeneration(transformId);
        refreshIndex(dataFrameIndex);

        // Verify that we have started and that our reason is cleared
        fullState = getDataFrameState(transformId);
        assertThat(XContentMapValues.extractValue("state.reason", fullState), is(nullValue()));
        assertThat(XContentMapValues.extractValue("state.task_state", fullState), equalTo("started"));
        assertThat(XContentMapValues.extractValue("state.indexer_state", fullState), equalTo("started"));
        // The initial, failed run should have recorded exactly one search failure
        assertThat(XContentMapValues.extractValue("stats.search_failures", fullState), equalTo(1));

        // get and check some users to verify we restarted
        assertOnePivotValue(dataFrameIndex + "/_search?q=reviewer:user_0", 3.776978417);
        assertOnePivotValue(dataFrameIndex + "/_search?q=reviewer:user_5", 3.72);
        assertOnePivotValue(dataFrameIndex + "/_search?q=reviewer:user_11", 3.846153846);
        assertOnePivotValue(dataFrameIndex + "/_search?q=reviewer:user_20", 3.769230769);
        assertOnePivotValue(dataFrameIndex + "/_search?q=reviewer:user_26", 3.918918918);
        stopDataFrameTransform(transformId, true);
        deleteDataFrameTransform(transformId);
    }

    /**
     * Polls until the transform's task state equals {@code state} or assertBusy times out.
     */
    private void awaitState(String transformId, DataFrameTransformTaskState state) throws Exception {
        assertBusy(() -> {
            String currentState = getDataFrameTaskState(transformId);
            // actual value first so a mismatch produces a readable assertion message
            assertThat(currentState, equalTo(state.value()));
        });
    }

    /**
     * Asserts the given search query returns exactly one hit whose avg_rating
     * equals {@code expected} within a small tolerance.
     */
    private void assertOnePivotValue(String query, double expected) throws IOException {
        Map<String, Object> searchResult = getAsMap(query);
        assertEquals(1, XContentMapValues.extractValue("hits.total.value", searchResult));
        double actual = (Double) ((List<?>) XContentMapValues.extractValue("hits.hits._source.avg_rating", searchResult)).get(0);
        assertEquals(expected, actual, 0.000001);
    }
}

View File

@ -33,6 +33,8 @@ import org.elasticsearch.xpack.core.dataframe.action.StartDataFrameTransformActi
import org.elasticsearch.xpack.core.dataframe.action.StartDataFrameTransformTaskAction;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransform;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformState;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformTaskState;
import org.elasticsearch.xpack.dataframe.persistence.DataFrameTransformsConfigManager;
import java.util.Collection;
@ -94,11 +96,7 @@ public class TransportStartDataFrameTransformAction extends
new StartDataFrameTransformTaskAction.Request(request.getId()),
ActionListener.wrap(
r -> listener.onResponse(new StartDataFrameTransformAction.Response(true)),
startingFailure -> cancelDataFrameTask(task.getId(),
transformTask.getId(),
startingFailure,
listener::onFailure)
)),
listener::onFailure)),
listener::onFailure));
},
listener::onFailure
@ -122,7 +120,21 @@ public class TransportStartDataFrameTransformAction extends
transformTask,
persistentTaskActionListener);
} else {
persistentTaskActionListener.onResponse(existingTask);
DataFrameTransformState transformState = (DataFrameTransformState)existingTask.getState();
if(transformState.getTaskState() == DataFrameTransformTaskState.FAILED && request.isForce() == false) {
listener.onFailure(new ElasticsearchStatusException(
"Unable to start data frame transform [" + config.getId() +
"] as it is in a failed state with failure: [" + transformState.getReason() +
"]. Use force start to restart data frame transform once error is resolved.",
RestStatus.CONFLICT));
} else if (transformState.getTaskState() != DataFrameTransformTaskState.STOPPED &&
transformState.getTaskState() != DataFrameTransformTaskState.FAILED) {
listener.onFailure(new ElasticsearchStatusException(
"Unable to start data frame transform [" + config.getId() +
"] as it is in state [" + transformState.getTaskState() + "]", RestStatus.CONFLICT));
} else {
persistentTaskActionListener.onResponse(existingTask);
}
}
},
listener::onFailure

View File

@ -6,6 +6,7 @@
package org.elasticsearch.xpack.dataframe.action;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.FailedNodeException;
@ -15,11 +16,13 @@ import org.elasticsearch.action.support.tasks.TransportTasksAction;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.dataframe.DataFrameMessages;
import org.elasticsearch.xpack.core.dataframe.action.StopDataFrameTransformAction;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformTaskState;
import org.elasticsearch.xpack.dataframe.persistence.DataFrameTransformsConfigManager;
import org.elasticsearch.xpack.dataframe.transforms.DataFrameTransformTask;
@ -60,6 +63,14 @@ public class TransportStopDataFrameTransformAction extends
protected void taskOperation(StopDataFrameTransformAction.Request request, DataFrameTransformTask transformTask,
ActionListener<StopDataFrameTransformAction.Response> listener) {
if (transformTask.getTransformId().equals(request.getId())) {
if (transformTask.getState().getTaskState() == DataFrameTransformTaskState.FAILED && request.isForce() == false) {
listener.onFailure(
new ElasticsearchStatusException("Unable to stop data frame transform [" + request.getId()
+ "] as it is in a failed state with reason: [" + transformTask.getState().getReason() +
"]. Use force stop to stop the data frame transform.",
RestStatus.CONFLICT));
return;
}
if (request.waitForCompletion() == false) {
transformTask.stop(listener);
} else {

View File

@ -28,7 +28,8 @@ public class RestStartDataFrameTransformAction extends BaseRestHandler {
@Override
protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException {
    // Pull the transform id and the optional force flag (defaults to false) off the REST request.
    // Fix: removed a stale pre-refactor line that declared a second `request` variable
    // via the old single-argument constructor, which would not compile.
    String id = restRequest.param(DataFrameField.ID.getPreferredName());
    boolean force = restRequest.paramAsBoolean(DataFrameField.FORCE.getPreferredName(), false);
    StartDataFrameTransformAction.Request request = new StartDataFrameTransformAction.Request(id, force);
    request.timeout(restRequest.paramAsTime(DataFrameField.TIMEOUT.getPreferredName(), AcknowledgedRequest.DEFAULT_ACK_TIMEOUT));
    return channel -> client.execute(StartDataFrameTransformAction.INSTANCE, request, new RestToXContentListener<>(channel));
}

View File

@ -30,8 +30,9 @@ public class RestStopDataFrameTransformAction extends BaseRestHandler {
TimeValue timeout = restRequest.paramAsTime(DataFrameField.TIMEOUT.getPreferredName(),
StopDataFrameTransformAction.DEFAULT_TIMEOUT);
boolean waitForCompletion = restRequest.paramAsBoolean(DataFrameField.WAIT_FOR_COMPLETION.getPreferredName(), false);
boolean force = restRequest.paramAsBoolean(DataFrameField.FORCE.getPreferredName(), false);
StopDataFrameTransformAction.Request request = new StopDataFrameTransformAction.Request(id, waitForCompletion, timeout);
StopDataFrameTransformAction.Request request = new StopDataFrameTransformAction.Request(id, waitForCompletion, force, timeout);
return channel -> client.execute(StopDataFrameTransformAction.INSTANCE, request, new RestToXContentListener<>(channel));
}

View File

@ -21,6 +21,7 @@ import org.elasticsearch.xpack.core.dataframe.DataFrameField;
import org.elasticsearch.xpack.core.dataframe.notifications.DataFrameAuditMessage;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransform;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformState;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformTaskState;
import org.elasticsearch.xpack.core.scheduler.SchedulerEngine;
import org.elasticsearch.xpack.dataframe.DataFrame;
import org.elasticsearch.xpack.dataframe.checkpoint.DataFrameTransformsCheckpointService;
@ -60,6 +61,11 @@ public class DataFrameTransformPersistentTasksExecutor extends PersistentTasksEx
SchedulerEngine.Job schedulerJob = new SchedulerEngine.Job(
DataFrameTransformTask.SCHEDULE_NAME + "_" + params.getId(), next());
DataFrameTransformState transformState = (DataFrameTransformState) state;
if (transformState != null && transformState.getTaskState() == DataFrameTransformTaskState.FAILED) {
logger.warn("Tried to start failed transform [" + params.getId() + "] failure reason: " + transformState.getReason());
return;
}
// Note that while the task is added to the scheduler here, the internal state will prevent
// it from doing any work until the task is "started" via the StartTransform api
schedulerEngine.register(buildTask);

View File

@ -18,7 +18,9 @@ import org.elasticsearch.action.search.SearchAction;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.persistent.AllocatedPersistentTask;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.ClientHelper;
@ -33,6 +35,7 @@ import org.elasticsearch.xpack.core.dataframe.action.StopDataFrameTransformActio
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameIndexerTransformStats;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformState;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformTaskState;
import org.elasticsearch.xpack.core.indexing.IndexerState;
import org.elasticsearch.xpack.core.scheduler.SchedulerEngine;
import org.elasticsearch.xpack.core.scheduler.SchedulerEngine.Event;
@ -40,14 +43,20 @@ import org.elasticsearch.xpack.dataframe.checkpoint.DataFrameTransformsCheckpoin
import org.elasticsearch.xpack.dataframe.persistence.DataFrameTransformsConfigManager;
import org.elasticsearch.xpack.dataframe.transforms.pivot.SchemaUtil;
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
public class DataFrameTransformTask extends AllocatedPersistentTask implements SchedulerEngine.Listener {
private static final Logger logger = LogManager.getLogger(DataFrameTransformTask.class);
// TODO consider moving to dynamic cluster setting
private static final int MAX_CONTINUOUS_FAILURES = 10;
private static final IndexerState[] RUNNING_STATES = new IndexerState[]{IndexerState.STARTED, IndexerState.INDEXING};
public static final String SCHEDULE_NAME = DataFrameField.TASK_NAME + "/schedule";
private final DataFrameTransform transform;
@ -56,10 +65,13 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
private final DataFrameIndexer indexer;
private final Auditor<DataFrameAuditMessage> auditor;
private final AtomicReference<DataFrameTransformTaskState> taskState;
private final AtomicReference<String> stateReason;
// the generation of this data frame, for v1 there will be only
// 0: data frame not created or still indexing
// 1: data frame complete, all data has been indexed
private final AtomicReference<Long> generation;
private final AtomicInteger failureCount;
public DataFrameTransformTask(long id, String type, String action, TaskId parentTask, DataFrameTransform transform,
DataFrameTransformState state, Client client, DataFrameTransformsConfigManager transformsConfigManager,
@ -72,10 +84,14 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
this.threadPool = threadPool;
this.auditor = auditor;
IndexerState initialState = IndexerState.STOPPED;
DataFrameTransformTaskState initialTaskState = DataFrameTransformTaskState.STOPPED;
String initialReason = null;
long initialGeneration = 0;
Map<String, Object> initialPosition = null;
logger.info("[{}] init, got state: [{}]", transform.getId(), state != null);
if (state != null) {
initialTaskState = state.getTaskState();
initialReason = state.getReason();
final IndexerState existingState = state.getIndexerState();
logger.info("[{}] Loading existing state: [{}], position [{}]", transform.getId(), existingState, state.getPosition());
if (existingState.equals(IndexerState.INDEXING)) {
@ -93,7 +109,10 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
this.indexer = new ClientDataFrameIndexer(transform.getId(), transformsConfigManager, transformsCheckpointService,
new AtomicReference<>(initialState), initialPosition, client, auditor);
this.generation = new AtomicReference<Long>(initialGeneration);
this.generation = new AtomicReference<>(initialGeneration);
this.taskState = new AtomicReference<>(initialTaskState);
this.stateReason = new AtomicReference<>(initialReason);
this.failureCount = new AtomicInteger(0);
}
public String getTransformId() {
@ -109,7 +128,7 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
}
public DataFrameTransformState getState() {
return new DataFrameTransformState(indexer.getState(), indexer.getPosition(), generation.get());
return new DataFrameTransformState(taskState.get(), indexer.getState(), indexer.getPosition(), generation.get(), stateReason.get());
}
public DataFrameIndexerTransformStats getStats() {
@ -125,66 +144,71 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
}
public synchronized void start(ActionListener<Response> listener) {
final IndexerState prevState = indexer.getState();
if (prevState != IndexerState.STOPPED) {
// fails if the task is not STOPPED
listener.onFailure(new ElasticsearchException("Cannot start task for data frame transform [{}], because state was [{}]",
transform.getId(), prevState));
return;
}
final IndexerState newState = indexer.start();
if (newState != IndexerState.STARTED) {
if (Arrays.stream(RUNNING_STATES).noneMatch(newState::equals)) {
listener.onFailure(new ElasticsearchException("Cannot start task for data frame transform [{}], because state was [{}]",
transform.getId(), newState));
return;
}
stateReason.set(null);
taskState.set(DataFrameTransformTaskState.STARTED);
failureCount.set(0);
final DataFrameTransformState state = new DataFrameTransformState(IndexerState.STOPPED, indexer.getPosition(), generation.get());
final DataFrameTransformState state = new DataFrameTransformState(
DataFrameTransformTaskState.STARTED,
IndexerState.STOPPED,
indexer.getPosition(),
generation.get(),
null);
logger.debug("Updating state for data frame transform [{}] to [{}][{}]", transform.getId(), state.getIndexerState(),
state.getPosition());
updatePersistentTaskState(state,
ActionListener.wrap(
(task) -> {
auditor.info(transform.getId(), "Updated state to [" + state.getIndexerState() + "]");
logger.debug("Successfully updated state for data frame transform [" + transform.getId() + "] to ["
+ state.getIndexerState() + "][" + state.getPosition() + "]");
listener.onResponse(new StartDataFrameTransformTaskAction.Response(true));
}, (exc) -> {
// We were unable to update the persistent status, so we need to shutdown the indexer too.
indexer.stop();
listener.onFailure(new ElasticsearchException("Error while updating state for data frame transform ["
logger.info("Updating state for data frame transform [{}] to [{}]", transform.getId(), state.toString());
persistStateToClusterState(state, ActionListener.wrap(
task -> {
auditor.info(transform.getId(), "Updated state to [" + state.getTaskState() + "]");
listener.onResponse(new StartDataFrameTransformTaskAction.Response(true));
},
exc -> {
indexer.stop();
listener.onFailure(new ElasticsearchException("Error while updating state for data frame transform ["
+ transform.getId() + "] to [" + state.getIndexerState() + "].", exc));
})
);
}
));
}
public synchronized void stop(ActionListener<StopDataFrameTransformAction.Response> listener) {
// taskState is initialized as STOPPED and is updated in tandem with the indexerState
// Consequently, if it is STOPPED, we consider the whole task STOPPED.
if (taskState.get() == DataFrameTransformTaskState.STOPPED) {
listener.onResponse(new StopDataFrameTransformAction.Response(true));
return;
}
final IndexerState newState = indexer.stop();
switch (newState) {
case STOPPED:
listener.onResponse(new StopDataFrameTransformAction.Response(true));
break;
// Fall through to `STOPPING` as the behavior is the same for both, we should persist for both
case STOPPING:
// update the persistent state to STOPPED. There are two scenarios and both are safe:
// 1. we persist STOPPED now, indexer continues a bit then sees the flag and checkpoints another STOPPED with the more recent
// position.
// 2. we persist STOPPED now, indexer continues a bit but then dies. When/if we resume we'll pick up at last checkpoint,
// overwrite some docs and eventually checkpoint.
DataFrameTransformState state = new DataFrameTransformState(IndexerState.STOPPED, indexer.getPosition(), generation.get());
updatePersistentTaskState(state, ActionListener.wrap((task) -> {
auditor.info(transform.getId(), "Updated state to [" + state.getIndexerState() + "]");
logger.debug("Successfully updated state for data frame transform [{}] to [{}]", transform.getId(),
state.getIndexerState());
listener.onResponse(new StopDataFrameTransformAction.Response(true));
}, (exc) -> {
listener.onFailure(new ElasticsearchException("Error while updating state for data frame transform [{}] to [{}]", exc,
transform.getId(), state.getIndexerState()));
}));
taskState.set(DataFrameTransformTaskState.STOPPED);
DataFrameTransformState state = new DataFrameTransformState(
DataFrameTransformTaskState.STOPPED,
IndexerState.STOPPED,
indexer.getPosition(),
generation.get(),
stateReason.get());
persistStateToClusterState(state, ActionListener.wrap(
task -> {
auditor.info(transform.getId(), "Updated state to [" + state.getTaskState() + "]");
listener.onResponse(new StopDataFrameTransformAction.Response(true));
},
exc -> listener.onFailure(new ElasticsearchException(
"Error while updating state for data frame transform [{}] to [{}]", exc,
transform.getId(),
state.getIndexerState()))));
break;
default:
listener.onFailure(new ElasticsearchException("Cannot stop task for data frame transform [{}], because state was [{}]",
transform.getId(), newState));
@ -217,6 +241,40 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
markAsCompleted();
}
/**
 * Persists the given transform state through the persistent-tasks framework.
 * On success the update is logged at debug; on failure the error is audited and logged
 * before being propagated to {@code listener}.
 *
 * Fix: audit message read "Failed to persist to state to cluster state" — corrected
 * the doubled "to".
 */
void persistStateToClusterState(DataFrameTransformState state,
                                ActionListener<PersistentTasksCustomMetaData.PersistentTask<?>> listener) {
    updatePersistentTaskState(state, ActionListener.wrap(
        success -> {
            logger.debug("Successfully updated state for data frame transform [{}] to [{}]", transform.getId(), state.toString());
            listener.onResponse(success);
        },
        failure -> {
            auditor.warning(transform.getId(), "Failed to persist state to cluster state: " + failure.getMessage());
            logger.error("Failed to update state for data frame transform [" + transform.getId() + "]", failure);
            listener.onFailure(failure);
        }
    ));
}
/**
 * Returns true for failures that retrying cannot fix: a missing source index or an
 * invalid transform configuration.
 */
private boolean isIrrecoverableFailure(Exception e) {
    if (e instanceof IndexNotFoundException) {
        return true;
    }
    return e instanceof DataFrameConfigurationException;
}
/**
 * Records a run failure. An irrecoverable failure, or exceeding MAX_CONTINUOUS_FAILURES
 * transient failures in a row, moves the task into the FAILED state with a reason and
 * persists that state to the cluster state. Note the short-circuit: an irrecoverable
 * failure does not bump the transient-failure counter.
 */
synchronized void handleFailure(Exception e) {
    final boolean irrecoverable = isIrrecoverableFailure(e);
    if (irrecoverable == false && failureCount.incrementAndGet() <= MAX_CONTINUOUS_FAILURES) {
        // transient failure and still under the retry budget — keep going
        return;
    }
    final String failureMessage = irrecoverable
        ? "task encountered irrecoverable failure: " + e.getMessage()
        : "task encountered more than " + MAX_CONTINUOUS_FAILURES + " failures; latest failure: " + e.getMessage();
    auditor.error(transform.getId(), failureMessage);
    stateReason.set(failureMessage);
    taskState.set(DataFrameTransformTaskState.FAILED);
    persistStateToClusterState(getState(), ActionListener.wrap(
        r -> failureCount.set(0), // Successfully marked as failed, reset counter so that task can be restarted
        exception -> {} // Noop, internal method logs the failure to update the state
    ));
}
/**
* This is called when the persistent task signals that the allocated task should be terminated.
* Termination in the task framework is essentially voluntary, as the allocated task can only be
@ -239,6 +297,8 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
private final DataFrameTransformsCheckpointService transformsCheckpointService;
private final String transformId;
private final Auditor<DataFrameAuditMessage> auditor;
// Keeps track of the last exception that was written to our audit, keeps us from spamming the audit index
private volatile String lastAuditedExceptionMessage = null;
private Map<String, String> fieldMappings = null;
private DataFrameTransformConfig transformConfig = null;
@ -272,12 +332,17 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
@Override
public synchronized boolean maybeTriggerAsyncJob(long now) {
if (taskState.get() == DataFrameTransformTaskState.FAILED) {
logger.debug("Schedule was triggered for transform [" + getJobId() + "] but task is failed. Ignoring trigger.");
return false;
}
if (transformConfig == null) {
CountDownLatch latch = new CountDownLatch(1);
transformsConfigManager.getTransformConfiguration(transformId, new LatchedActionListener<>(ActionListener.wrap(config -> {
transformConfig = config;
}, e -> {
transformsConfigManager.getTransformConfiguration(transformId, new LatchedActionListener<>(ActionListener.wrap(
config -> transformConfig = config,
e -> {
throw new RuntimeException(
DataFrameMessages.getMessage(DataFrameMessages.FAILED_TO_LOAD_TRANSFORM_CONFIGURATION, transformId), e);
}), latch));
@ -290,11 +355,10 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
}
}
// todo: set job into failed state
if (transformConfig.isValid() == false) {
auditor.error(transformId, "Cannot execute data frame transform as configuration is invalid");
throw new RuntimeException(
DataFrameMessages.getMessage(DataFrameMessages.DATA_FRAME_TRANSFORM_CONFIGURATION_INVALID, transformId));
DataFrameConfigurationException exception = new DataFrameConfigurationException(transformId);
handleFailure(exception);
throw exception;
}
if (fieldMappings == null) {
@ -341,24 +405,36 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
return;
}
if(indexerState.equals(IndexerState.STARTED)) {
// if the indexer resets the state to started, it means it is done, so increment the generation
if(indexerState.equals(IndexerState.STARTED) && getStats().getNumDocuments() > 0) {
// if the indexer resets the state to started, it means it is done with a run through the data.
// But, if there were no documents, we should allow it to attempt to gather more again, as there is no risk of overwriting
// Some reasons for no documents are (but is not limited to):
// * Could have failed early on search or index
// * Have an empty index
// * Have a query that returns no documents
generation.compareAndSet(0L, 1L);
}
final DataFrameTransformState state = new DataFrameTransformState(indexerState, getPosition(), generation.get());
final DataFrameTransformState state = new DataFrameTransformState(
taskState.get(),
indexerState,
getPosition(),
generation.get(),
stateReason.get());
logger.info("Updating persistent state of transform [" + transform.getId() + "] to [" + state.toString() + "]");
updatePersistentTaskState(state, ActionListener.wrap(task -> next.run(), exc -> {
logger.error("Updating persistent state of transform [" + transform.getId() + "] failed", exc);
next.run();
}));
persistStateToClusterState(state, ActionListener.wrap(t -> next.run(), e -> next.run()));
}
@Override
protected void onFailure(Exception exc) {
    // Fix: removed stale pre-refactor lines that unconditionally audited and logged every
    // failure, which duplicated the log line and defeated the de-spam guard below.
    // Since our schedule fires again very quickly after failures it is possible to run into the same failure numerous
    // times in a row, very quickly. We do not want to spam the audit log with repeated failures, so only record the first one
    if (exc.getMessage().equals(lastAuditedExceptionMessage) == false) {
        auditor.warning(transform.getId(), "Data frame transform encountered an exception: " + exc.getMessage());
        lastAuditedExceptionMessage = exc.getMessage();
    }
    logger.warn("Data frame transform [" + transform.getId() + "] encountered an exception: ", exc);
    handleFailure(exc);
}
@Override
@ -374,4 +450,12 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
shutdown();
}
}
/**
 * Thrown when the stored transform configuration fails validation. Listed in
 * isIrrecoverableFailure, so raising it moves the task into the FAILED state.
 */
class DataFrameConfigurationException extends RuntimeException {
    DataFrameConfigurationException(String transformId) {
        super(DataFrameMessages.getMessage(DataFrameMessages.DATA_FRAME_TRANSFORM_CONFIGURATION_INVALID, transformId));
    }
}
}

View File

@ -59,7 +59,7 @@ teardown:
- match: { started: true }
- do:
catch: /Cannot start task for data frame transform \[airline-transform-start-stop\], because state was \[STARTED\]/
catch: /Unable to start data frame transform \[airline-transform-start-stop\] as it is in state \[STARTED\]/
data_frame.start_data_frame_transform:
transform_id: "airline-transform-start-stop"
@ -75,7 +75,8 @@ teardown:
transform_id: "airline-transform-start-stop"
- match: { count: 1 }
- match: { transforms.0.id: "airline-transform-start-stop" }
- match: { transforms.0.state.transform_state: "started" }
- match: { transforms.0.state.indexer_state: "started" }
- match: { transforms.0.state.task_state: "started" }
- do:
data_frame.stop_data_frame_transform:
@ -87,7 +88,8 @@ teardown:
transform_id: "airline-transform-start-stop"
- match: { count: 1 }
- match: { transforms.0.id: "airline-transform-start-stop" }
- match: { transforms.0.state.transform_state: "stopped" }
- match: { transforms.0.state.indexer_state: "stopped" }
- match: { transforms.0.state.task_state: "stopped" }
- do:
data_frame.start_data_frame_transform:
@ -99,7 +101,8 @@ teardown:
transform_id: "airline-transform-start-stop"
- match: { count: 1 }
- match: { transforms.0.id: "airline-transform-start-stop" }
- match: { transforms.0.state.transform_state: "started" }
- match: { transforms.0.state.indexer_state: "started" }
- match: { transforms.0.state.task_state: "started" }
---
"Test stop missing transform":
@ -114,3 +117,4 @@ teardown:
data_frame.stop_data_frame_transform:
transform_id: "airline-transform-start-stop"
- match: { stopped: true }

View File

@ -46,7 +46,8 @@ teardown:
transform_id: "airline-transform-stats"
- match: { count: 1 }
- match: { transforms.0.id: "airline-transform-stats" }
- match: { transforms.0.state.transform_state: "started" }
- match: { transforms.0.state.indexer_state: "started" }
- match: { transforms.0.state.task_state: "started" }
- match: { transforms.0.state.generation: 0 }
- match: { transforms.0.stats.pages_processed: 0 }
- match: { transforms.0.stats.documents_processed: 0 }
@ -124,18 +125,18 @@ teardown:
transform_id: "*"
- match: { count: 2 }
- match: { transforms.0.id: "airline-transform-stats" }
- match: { transforms.0.state.transform_state: "started" }
- match: { transforms.0.state.indexer_state: "started" }
- match: { transforms.1.id: "airline-transform-stats-dos" }
- match: { transforms.1.state.transform_state: "stopped" }
- match: { transforms.1.state.indexer_state: "stopped" }
- do:
data_frame.get_data_frame_transform_stats:
transform_id: "_all"
- match: { count: 2 }
- match: { transforms.0.id: "airline-transform-stats" }
- match: { transforms.0.state.transform_state: "started" }
- match: { transforms.0.state.indexer_state: "started" }
- match: { transforms.1.id: "airline-transform-stats-dos" }
- match: { transforms.1.state.transform_state: "stopped" }
- match: { transforms.1.state.indexer_state: "stopped" }
---
"Test get single transform stats when it does not have a task":
@ -157,7 +158,7 @@ teardown:
transform_id: "airline-transform-stats-dos"
- match: { count: 1 }
- match: { transforms.0.id: "airline-transform-stats-dos" }
- match: { transforms.0.state.transform_state: "stopped" }
- match: { transforms.0.state.indexer_state: "stopped" }
- match: { transforms.0.state.generation: 0 }
- match: { transforms.0.stats.pages_processed: 0 }
- match: { transforms.0.stats.documents_processed: 0 }