[ML] Adds progress reporting for transforms (#41278) (#41529)

* [ML] Adds progress reporting for transforms

* fixing after master merge

* Addressing PR comments

* removing unused imports

* Adjusting afterKey handling and percentage to be 100*

* Making sure it is a linked hashmap for serialization

* removing unused import

* addressing PR comments

* removing unused import

* simplifying code, only storing total docs and decrementing

* adjusting for rewrite

* removing initial progress gathering from executor
Benjamin Trent 2019-04-25 11:23:12 -05:00 committed by GitHub
parent 6184efaff6
commit 08843ba62b
28 changed files with 1285 additions and 265 deletions

View File

@@ -0,0 +1,94 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.client.dataframe.transforms;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.XContentParser;
import java.util.Objects;
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg;
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg;
public class DataFrameTransformProgress {
public static final ParseField TOTAL_DOCS = new ParseField("total_docs");
public static final ParseField DOCS_REMAINING = new ParseField("docs_remaining");
public static final ParseField PERCENT_COMPLETE = new ParseField("percent_complete");
public static final ConstructingObjectParser<DataFrameTransformProgress, Void> PARSER = new ConstructingObjectParser<>(
"data_frame_transform_progress",
true,
a -> new DataFrameTransformProgress((Long) a[0], (Long)a[1], (Double)a[2]));
static {
PARSER.declareLong(constructorArg(), TOTAL_DOCS);
PARSER.declareLong(optionalConstructorArg(), DOCS_REMAINING);
PARSER.declareDouble(optionalConstructorArg(), PERCENT_COMPLETE);
}
public static DataFrameTransformProgress fromXContent(XContentParser parser) {
return PARSER.apply(parser, null);
}
private final long totalDocs;
private final long remainingDocs;
private final double percentComplete;
public DataFrameTransformProgress(long totalDocs, Long remainingDocs, double percentComplete) {
this.totalDocs = totalDocs;
this.remainingDocs = remainingDocs == null ? totalDocs : remainingDocs;
this.percentComplete = percentComplete;
}
public double getPercentComplete() {
return percentComplete;
}
public long getTotalDocs() {
return totalDocs;
}
public long getRemainingDocs() {
return remainingDocs;
}
@Override
public boolean equals(Object other) {
if (other == this) {
return true;
}
if (other == null || other.getClass() != getClass()) {
return false;
}
DataFrameTransformProgress that = (DataFrameTransformProgress) other;
return Objects.equals(this.remainingDocs, that.remainingDocs)
&& Objects.equals(this.totalDocs, that.totalDocs)
&& Objects.equals(this.percentComplete, that.percentComplete);
}
@Override
public int hashCode() {
return Objects.hash(remainingDocs, totalDocs, percentComplete);
}
}
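
A minimal parsing sketch for the class above. The JSON literal and the test-style parser construction (empty registry, throwing deprecation handler) are illustrative assumptions, not part of this commit:

String json = "{\"total_docs\":1000,\"docs_remaining\":250,\"percent_complete\":75.0}";
try (XContentParser parser = JsonXContent.jsonXContent.createParser(
        NamedXContentRegistry.EMPTY, DeprecationHandler.THROW_UNSUPPORTED_OPERATION, json)) {
    // delegates to the lenient ConstructingObjectParser declared above
    DataFrameTransformProgress progress = DataFrameTransformProgress.fromXContent(parser);
    assert progress.getPercentComplete() == 75.0;
    assert progress.getRemainingDocs() == 250L;
}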

View File

@@ -23,16 +23,14 @@ import org.elasticsearch.client.core.IndexerState;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.ObjectParser.ValueType;
import org.elasticsearch.common.xcontent.XContentParser;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;
import java.util.SortedMap;
import java.util.TreeMap;
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg;
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg;
@@ -44,33 +42,25 @@ public class DataFrameTransformState {
private static final ParseField CURRENT_POSITION = new ParseField("current_position");
private static final ParseField CHECKPOINT = new ParseField("checkpoint");
private static final ParseField REASON = new ParseField("reason");
private static final ParseField PROGRESS = new ParseField("progress");
@SuppressWarnings("unchecked")
public static final ConstructingObjectParser<DataFrameTransformState, Void> PARSER =
new ConstructingObjectParser<>("data_frame_transform_state", true,
args -> new DataFrameTransformState((DataFrameTransformTaskState) args[0],
(IndexerState) args[1],
(HashMap<String, Object>) args[2],
(Map<String, Object>) args[2],
(long) args[3],
(String) args[4]));
(String) args[4],
(DataFrameTransformProgress) args[5]));
static {
PARSER.declareField(constructorArg(),
p -> DataFrameTransformTaskState.fromString(p.text()),
TASK_STATE,
ObjectParser.ValueType.STRING);
PARSER.declareField(constructorArg(), p -> IndexerState.fromString(p.text()), INDEXER_STATE, ObjectParser.ValueType.STRING);
PARSER.declareField(optionalConstructorArg(), p -> {
if (p.currentToken() == XContentParser.Token.START_OBJECT) {
return p.map();
}
if (p.currentToken() == XContentParser.Token.VALUE_NULL) {
return null;
}
throw new IllegalArgumentException("Unsupported token [" + p.currentToken() + "]");
}, CURRENT_POSITION, ObjectParser.ValueType.VALUE_OBJECT_ARRAY);
PARSER.declareField(constructorArg(), p -> DataFrameTransformTaskState.fromString(p.text()), TASK_STATE, ValueType.STRING);
PARSER.declareField(constructorArg(), p -> IndexerState.fromString(p.text()), INDEXER_STATE, ValueType.STRING);
PARSER.declareField(optionalConstructorArg(), (p, c) -> p.mapOrdered(), CURRENT_POSITION, ValueType.OBJECT);
PARSER.declareLong(ConstructingObjectParser.optionalConstructorArg(), CHECKPOINT);
PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), REASON);
PARSER.declareField(optionalConstructorArg(), DataFrameTransformProgress::fromXContent, PROGRESS, ValueType.OBJECT);
}
public static DataFrameTransformState fromXContent(XContentParser parser) throws IOException {
@@ -80,19 +70,22 @@ public class DataFrameTransformState {
private final DataFrameTransformTaskState taskState;
private final IndexerState indexerState;
private final long checkpoint;
private final SortedMap<String, Object> currentPosition;
private final Map<String, Object> currentPosition;
private final String reason;
private final DataFrameTransformProgress progress;
public DataFrameTransformState(DataFrameTransformTaskState taskState,
IndexerState indexerState,
@Nullable Map<String, Object> position,
long checkpoint,
@Nullable String reason) {
@Nullable String reason,
@Nullable DataFrameTransformProgress progress) {
this.taskState = taskState;
this.indexerState = indexerState;
this.currentPosition = position == null ? null : Collections.unmodifiableSortedMap(new TreeMap<>(position));
this.currentPosition = position == null ? null : Collections.unmodifiableMap(new LinkedHashMap<>(position));
this.checkpoint = checkpoint;
this.reason = reason;
this.progress = progress;
}
public IndexerState getIndexerState() {
@@ -117,6 +110,11 @@ public class DataFrameTransformState {
return reason;
}
@Nullable
public DataFrameTransformProgress getProgress() {
return progress;
}
@Override
public boolean equals(Object other) {
if (this == other) {
@@ -132,13 +130,14 @@ public class DataFrameTransformState {
return Objects.equals(this.taskState, that.taskState) &&
Objects.equals(this.indexerState, that.indexerState) &&
Objects.equals(this.currentPosition, that.currentPosition) &&
Objects.equals(this.progress, that.progress) &&
this.checkpoint == that.checkpoint &&
Objects.equals(this.reason, that.reason);
}
@Override
public int hashCode() {
return Objects.hash(taskState, indexerState, currentPosition, checkpoint, reason);
return Objects.hash(taskState, indexerState, currentPosition, checkpoint, reason, progress);
}
}

View File

@@ -71,6 +71,7 @@ import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
@@ -360,6 +361,10 @@ public class DataFrameTransformIT extends ESRestHighLevelClientTestCase {
assertEquals(DataFrameTransformTaskState.STARTED, stateAndStats.getTransformState().getTaskState());
assertEquals(null, stateAndStats.getTransformState().getReason());
assertNotEquals(zeroIndexerStats, stateAndStats.getTransformStats());
assertNotNull(stateAndStats.getTransformState().getProgress());
assertThat(stateAndStats.getTransformState().getProgress().getPercentComplete(), equalTo(100.0));
assertThat(stateAndStats.getTransformState().getProgress().getTotalDocs(), greaterThan(0L));
assertThat(stateAndStats.getTransformState().getProgress().getRemainingDocs(), equalTo(0L));
});
}
}

View File

@@ -0,0 +1,55 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.client.dataframe.transforms;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.test.ESTestCase;
import java.io.IOException;
import static org.elasticsearch.test.AbstractXContentTestCase.xContentTester;
public class DataFrameTransformProgressTests extends ESTestCase {
public void testFromXContent() throws IOException {
xContentTester(this::createParser,
DataFrameTransformProgressTests::randomInstance,
DataFrameTransformProgressTests::toXContent,
DataFrameTransformProgress::fromXContent)
.supportsUnknownFields(true)
.randomFieldsExcludeFilter(field -> field.startsWith("state"))
.test();
}
public static DataFrameTransformProgress randomInstance() {
long totalDocs = randomNonNegativeLong();
Long docsRemaining = randomBoolean() ? null : randomLongBetween(0, totalDocs);
double percentComplete = totalDocs == 0 ? 100.0 : docsRemaining == null ? 0.0 : 100.0 * (double) (totalDocs - docsRemaining) / totalDocs;
return new DataFrameTransformProgress(totalDocs, docsRemaining, percentComplete);
}
public static void toXContent(DataFrameTransformProgress progress, XContentBuilder builder) throws IOException {
builder.startObject();
builder.field(DataFrameTransformProgress.TOTAL_DOCS.getPreferredName(), progress.getTotalDocs());
builder.field(DataFrameTransformProgress.DOCS_REMAINING.getPreferredName(), progress.getRemainingDocs());
builder.field(DataFrameTransformProgress.PERCENT_COMPLETE.getPreferredName(), progress.getPercentComplete());
builder.endObject();
}
}

View File

@@ -24,7 +24,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.test.ESTestCase;
import java.io.IOException;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import static org.elasticsearch.test.AbstractXContentTestCase.xContentTester;
@@ -46,7 +46,8 @@ public class DataFrameTransformStateTests extends ESTestCase {
randomFrom(IndexerState.values()),
randomPositionMap(),
randomLongBetween(0,10),
randomBoolean() ? null : randomAlphaOfLength(10));
randomBoolean() ? null : randomAlphaOfLength(10),
randomBoolean() ? null : DataFrameTransformProgressTests.randomInstance());
}
public static void toXContent(DataFrameTransformState state, XContentBuilder builder) throws IOException {
@@ -60,6 +61,10 @@ public class DataFrameTransformStateTests extends ESTestCase {
if (state.getReason() != null) {
builder.field("reason", state.getReason());
}
if (state.getProgress() != null) {
builder.field("progress");
DataFrameTransformProgressTests.toXContent(state.getProgress(), builder);
}
builder.endObject();
}
@@ -68,7 +73,7 @@ public class DataFrameTransformStateTests extends ESTestCase {
return null;
}
int numFields = randomIntBetween(1, 5);
Map<String, Object> position = new HashMap<>();
Map<String, Object> position = new LinkedHashMap<>();
for (int i = 0; i < numFields; i++) {
Object value;
if (randomBoolean()) {

View File

@@ -0,0 +1,57 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.client.dataframe.transforms.hlrc;
import org.elasticsearch.client.AbstractResponseTestCase;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformProgress;
import static org.hamcrest.Matchers.equalTo;
public class DataFrameTransformProgressTests extends AbstractResponseTestCase<
DataFrameTransformProgress,
org.elasticsearch.client.dataframe.transforms.DataFrameTransformProgress> {
public static DataFrameTransformProgress fromHlrc(
org.elasticsearch.client.dataframe.transforms.DataFrameTransformProgress instance) {
if (instance == null) {
return null;
}
return new DataFrameTransformProgress(instance.getTotalDocs(), instance.getRemainingDocs());
}
@Override
protected DataFrameTransformProgress createServerTestInstance() {
return DataFrameTransformStateTests.randomDataFrameTransformProgress();
}
@Override
protected org.elasticsearch.client.dataframe.transforms.DataFrameTransformProgress doParseToClientInstance(XContentParser parser) {
return org.elasticsearch.client.dataframe.transforms.DataFrameTransformProgress.fromXContent(parser);
}
@Override
protected void assertInstances(DataFrameTransformProgress serverTestInstance,
org.elasticsearch.client.dataframe.transforms.DataFrameTransformProgress clientInstance) {
assertThat(serverTestInstance.getTotalDocs(), equalTo(clientInstance.getTotalDocs()));
assertThat(serverTestInstance.getRemainingDocs(), equalTo(clientInstance.getRemainingDocs()));
assertThat(serverTestInstance.getPercentComplete(), equalTo(clientInstance.getPercentComplete()));
}
}

View File

@@ -24,6 +24,7 @@ import org.elasticsearch.client.AbstractHlrcXContentTestCase;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameIndexerTransformStats;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformCheckpointStats;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformCheckpointingInfo;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformProgress;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformState;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformStateAndStats;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformTaskState;
@@ -40,7 +41,7 @@ public class DataFrameTransformStateTests extends AbstractHlrcXContentTestCase<D
public static DataFrameTransformState fromHlrc(org.elasticsearch.client.dataframe.transforms.DataFrameTransformState instance) {
return new DataFrameTransformState(DataFrameTransformTaskState.fromString(instance.getTaskState().value()),
IndexerState.fromString(instance.getIndexerState().value()), instance.getPosition(), instance.getCheckpoint(),
instance.getReason());
instance.getReason(), DataFrameTransformProgressTests.fromHlrc(instance.getProgress()));
}
@Override
@@ -90,6 +91,12 @@ public class DataFrameTransformStateTests extends AbstractHlrcXContentTestCase<D
return new DataFrameTransformCheckpointStats(randomNonNegativeLong(), randomNonNegativeLong());
}
public static DataFrameTransformProgress randomDataFrameTransformProgress() {
long totalDocs = randomNonNegativeLong();
Long remainingDocs = randomBoolean() ? null : randomLongBetween(0, totalDocs);
return new DataFrameTransformProgress(totalDocs, remainingDocs);
}
public static DataFrameIndexerTransformStats randomStats(String transformId) {
return new DataFrameIndexerTransformStats(transformId, randomLongBetween(10L, 10000L),
randomLongBetween(0L, 10000L), randomLongBetween(0L, 10000L), randomLongBetween(0L, 10000L), randomLongBetween(0L, 10000L),
@@ -102,7 +109,8 @@ public class DataFrameTransformStateTests extends AbstractHlrcXContentTestCase<D
randomFrom(IndexerState.values()),
randomPosition(),
randomLongBetween(0,10),
randomBoolean() ? null : randomAlphaOfLength(10));
randomBoolean() ? null : randomAlphaOfLength(10),
randomBoolean() ? null : randomDataFrameTransformProgress());
}
private static Map<String, Object> randomPosition() {

View File

@@ -40,6 +40,7 @@ import org.elasticsearch.client.dataframe.StopDataFrameTransformRequest;
import org.elasticsearch.client.dataframe.StopDataFrameTransformResponse;
import org.elasticsearch.client.dataframe.transforms.DataFrameIndexerTransformStats;
import org.elasticsearch.client.dataframe.transforms.DataFrameTransformConfig;
import org.elasticsearch.client.dataframe.transforms.DataFrameTransformProgress;
import org.elasticsearch.client.dataframe.transforms.DataFrameTransformStateAndStats;
import org.elasticsearch.client.dataframe.transforms.DataFrameTransformTaskState;
import org.elasticsearch.client.dataframe.transforms.DestConfig;
@@ -472,11 +473,14 @@ public class DataFrameTransformDocumentationIT extends ESRestHighLevelClientTest
stateAndStats.getTransformState().getIndexerState(); // <3>
DataFrameIndexerTransformStats transformStats =
stateAndStats.getTransformStats(); // <4>
DataFrameTransformProgress progress =
stateAndStats.getTransformState().getProgress(); // <5>
// end::get-data-frame-transform-stats-response
assertEquals(IndexerState.STOPPED, indexerState);
assertEquals(DataFrameTransformTaskState.STOPPED, taskState);
assertNotNull(transformStats);
assertNull(progress);
}
{
// tag::get-data-frame-transform-stats-execute-listener

View File

@@ -37,4 +37,6 @@ include-tagged::{doc-tests-file}[{api}-response]
<1> The response contains a list of `DataFrameTransformStateAndStats` objects
<2> The running state of the transform task, e.g. `started`
<3> The running state of the transform indexer, e.g. `started`, `indexing`, etc.
<4> The transform progress statistics recording the number of documents indexed etc
<4> The overall transform statistics recording the number of documents indexed, etc.
<5> The progress of the current run of the transform. Supplies the number of docs left until the next checkpoint
and the total number of docs expected.
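
A hedged sketch of consuming these callouts in client code; the `stateAndStats` variable mirrors the `DataFrameTransformStateAndStats` objects named in <1>:

["source","java"]
----
DataFrameTransformProgress progress =
    stateAndStats.getTransformState().getProgress(); // may be null when no run is in progress
if (progress != null) {
    double percent = progress.getPercentComplete();   // scaled 0.0 - 100.0
    long remainingDocs = progress.getRemainingDocs(); // docs left until the next checkpoint
    long totalDocs = progress.getTotalDocs();         // total docs expected for this run
}
----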

View File

@@ -0,0 +1,135 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.core.dataframe.transforms;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import java.io.IOException;
import java.util.Objects;
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg;
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg;
public class DataFrameTransformProgress implements Writeable, ToXContentObject {
private static final ParseField TOTAL_DOCS = new ParseField("total_docs");
private static final ParseField DOCS_REMAINING = new ParseField("docs_remaining");
private static final String PERCENT_COMPLETE = "percent_complete";
public static final ConstructingObjectParser<DataFrameTransformProgress, Void> PARSER = new ConstructingObjectParser<>(
"data_frame_transform_progress",
true,
a -> new DataFrameTransformProgress((Long) a[0], (Long)a[1]));
static {
PARSER.declareLong(constructorArg(), TOTAL_DOCS);
PARSER.declareLong(optionalConstructorArg(), DOCS_REMAINING);
}
private final long totalDocs;
private long remainingDocs;
public DataFrameTransformProgress(long totalDocs, Long remainingDocs) {
if (totalDocs < 0) {
throw new IllegalArgumentException("[total_docs] must be >0.");
}
this.totalDocs = totalDocs;
if (remainingDocs != null && remainingDocs < 0) {
throw new IllegalArgumentException("[docs_remaining] must be >0.");
}
this.remainingDocs = remainingDocs == null ? totalDocs : remainingDocs;
}
public DataFrameTransformProgress(DataFrameTransformProgress otherProgress) {
this.totalDocs = otherProgress.totalDocs;
this.remainingDocs = otherProgress.remainingDocs;
}
public DataFrameTransformProgress(StreamInput in) throws IOException {
this.totalDocs = in.readLong();
this.remainingDocs = in.readLong();
}
public Double getPercentComplete() {
if (totalDocs == 0) {
return 100.0;
}
long docsRead = totalDocs - remainingDocs;
if (docsRead < 0) {
return 100.0;
}
return 100.0 * (double) docsRead / totalDocs;
}
public long getTotalDocs() {
return totalDocs;
}
public long getRemainingDocs() {
return remainingDocs;
}
public void resetRemainingDocs() {
this.remainingDocs = totalDocs;
}
public void docsProcessed(long docsProcessed) {
assert docsProcessed >= 0;
if (docsProcessed > remainingDocs) {
remainingDocs = 0;
} else {
remainingDocs -= docsProcessed;
}
}
@Override
public boolean equals(Object other) {
if (other == this) {
return true;
}
if (other == null || other.getClass() != getClass()) {
return false;
}
DataFrameTransformProgress that = (DataFrameTransformProgress) other;
return Objects.equals(this.remainingDocs, that.remainingDocs) && Objects.equals(this.totalDocs, that.totalDocs);
}
@Override
public int hashCode() {
return Objects.hash(remainingDocs, totalDocs);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeLong(totalDocs);
out.writeLong(remainingDocs);
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(TOTAL_DOCS.getPreferredName(), totalDocs);
builder.field(DOCS_REMAINING.getPreferredName(), remainingDocs);
builder.field(PERCENT_COMPLETE, getPercentComplete());
builder.endObject();
return builder;
}
@Override
public String toString() {
return Strings.toString(this, true, true);
}
}
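
A short worked sketch of the decrement-based bookkeeping implemented above:

// remainingDocs starts at totalDocs when no remaining count is supplied
DataFrameTransformProgress progress = new DataFrameTransformProgress(1000L, null);
progress.docsProcessed(250L);                   // remainingDocs is now 750
double percent = progress.getPercentComplete(); // 100.0 * (1000 - 750) / 1000 = 25.0
progress.docsProcessed(10000L);                 // over-counting clamps remainingDocs to 0
percent = progress.getPercentComplete();        // 100.0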

View File

@@ -12,7 +12,7 @@ import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.ObjectParser.ValueType;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.persistent.PersistentTaskState;
@@ -22,10 +22,9 @@ import org.elasticsearch.xpack.core.indexing.IndexerState;
import java.io.IOException;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;
import java.util.SortedMap;
import java.util.TreeMap;
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg;
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg;
@@ -35,10 +34,11 @@ public class DataFrameTransformState implements Task.Status, PersistentTaskState
private final DataFrameTransformTaskState taskState;
private final IndexerState indexerState;
private final DataFrameTransformProgress progress;
private final long checkpoint;
@Nullable
private final SortedMap<String, Object> currentPosition;
private final Map<String, Object> currentPosition;
@Nullable
private final String reason;
@@ -47,6 +47,7 @@ public class DataFrameTransformState implements Task.Status, PersistentTaskState
private static final ParseField CURRENT_POSITION = new ParseField("current_position");
private static final ParseField CHECKPOINT = new ParseField("checkpoint");
private static final ParseField REASON = new ParseField("reason");
private static final ParseField PROGRESS = new ParseField("progress");
@SuppressWarnings("unchecked")
public static final ConstructingObjectParser<DataFrameTransformState, Void> PARSER = new ConstructingObjectParser<>(NAME,
@@ -55,53 +56,40 @@ public class DataFrameTransformState implements Task.Status, PersistentTaskState
(IndexerState) args[1],
(Map<String, Object>) args[2],
(long) args[3],
(String) args[4]));
(String) args[4],
(DataFrameTransformProgress) args[5]));
static {
PARSER.declareField(constructorArg(), p -> {
if (p.currentToken() == XContentParser.Token.VALUE_STRING) {
return DataFrameTransformTaskState.fromString(p.text());
}
throw new IllegalArgumentException("Unsupported token [" + p.currentToken() + "]");
}, TASK_STATE, ObjectParser.ValueType.STRING);
PARSER.declareField(constructorArg(), p -> {
if (p.currentToken() == XContentParser.Token.VALUE_STRING) {
return IndexerState.fromString(p.text());
}
throw new IllegalArgumentException("Unsupported token [" + p.currentToken() + "]");
}, INDEXER_STATE, ObjectParser.ValueType.STRING);
PARSER.declareField(optionalConstructorArg(), p -> {
if (p.currentToken() == XContentParser.Token.START_OBJECT) {
return p.map();
}
if (p.currentToken() == XContentParser.Token.VALUE_NULL) {
return null;
}
throw new IllegalArgumentException("Unsupported token [" + p.currentToken() + "]");
}, CURRENT_POSITION, ObjectParser.ValueType.VALUE_OBJECT_ARRAY);
PARSER.declareField(constructorArg(), p -> DataFrameTransformTaskState.fromString(p.text()), TASK_STATE, ValueType.STRING);
PARSER.declareField(constructorArg(), p -> IndexerState.fromString(p.text()), INDEXER_STATE, ValueType.STRING);
PARSER.declareField(optionalConstructorArg(), XContentParser::mapOrdered, CURRENT_POSITION, ValueType.OBJECT);
PARSER.declareLong(ConstructingObjectParser.optionalConstructorArg(), CHECKPOINT);
PARSER.declareString(optionalConstructorArg(), REASON);
PARSER.declareField(optionalConstructorArg(), DataFrameTransformProgress.PARSER::apply, PROGRESS, ValueType.OBJECT);
}
public DataFrameTransformState(DataFrameTransformTaskState taskState,
IndexerState indexerState,
@Nullable Map<String, Object> position,
long checkpoint,
@Nullable String reason) {
@Nullable String reason,
@Nullable DataFrameTransformProgress progress) {
this.taskState = taskState;
this.indexerState = indexerState;
this.currentPosition = position == null ? null : Collections.unmodifiableSortedMap(new TreeMap<>(position));
this.currentPosition = position == null ? null : Collections.unmodifiableMap(new LinkedHashMap<>(position));
this.checkpoint = checkpoint;
this.reason = reason;
this.progress = progress;
}
public DataFrameTransformState(StreamInput in) throws IOException {
taskState = DataFrameTransformTaskState.fromStream(in);
indexerState = IndexerState.fromStream(in);
currentPosition = in.readBoolean() ? Collections.unmodifiableSortedMap(new TreeMap<>(in.readMap())) : null;
Map<String, Object> position = in.readMap();
currentPosition = position == null ? null : Collections.unmodifiableMap(position);
checkpoint = in.readLong();
reason = in.readOptionalString();
progress = in.readOptionalWriteable(DataFrameTransformProgress::new);
}
public DataFrameTransformTaskState getTaskState() {
@@ -120,6 +108,10 @@ public class DataFrameTransformState implements Task.Status, PersistentTaskState
return checkpoint;
}
public DataFrameTransformProgress getProgress() {
return progress;
}
/**
* Get the in-progress checkpoint
*
@@ -153,6 +145,9 @@ public class DataFrameTransformState implements Task.Status, PersistentTaskState
if (reason != null) {
builder.field(REASON.getPreferredName(), reason);
}
if (progress != null) {
builder.field(PROGRESS.getPreferredName(), progress);
}
builder.endObject();
return builder;
}
@@ -166,12 +161,10 @@ public class DataFrameTransformState implements Task.Status, PersistentTaskState
public void writeTo(StreamOutput out) throws IOException {
taskState.writeTo(out);
indexerState.writeTo(out);
out.writeBoolean(currentPosition != null);
if (currentPosition != null) {
out.writeMap(currentPosition);
}
out.writeLong(checkpoint);
out.writeOptionalString(reason);
out.writeOptionalWriteable(progress);
}
@Override
@@ -190,12 +183,13 @@ public class DataFrameTransformState implements Task.Status, PersistentTaskState
Objects.equals(this.indexerState, that.indexerState) &&
Objects.equals(this.currentPosition, that.currentPosition) &&
this.checkpoint == that.checkpoint &&
Objects.equals(this.reason, that.reason);
Objects.equals(this.reason, that.reason) &&
Objects.equals(this.progress, that.progress);
}
@Override
public int hashCode() {
return Objects.hash(taskState, indexerState, currentPosition, checkpoint, reason);
return Objects.hash(taskState, indexerState, currentPosition, checkpoint, reason, progress);
}
@Override

View File

@@ -53,7 +53,7 @@ public class DataFrameTransformStateAndStats implements Writeable, ToXContentObj
public static DataFrameTransformStateAndStats initialStateAndStats(String id, DataFrameIndexerTransformStats indexerTransformStats) {
return new DataFrameTransformStateAndStats(id,
new DataFrameTransformState(DataFrameTransformTaskState.STOPPED, IndexerState.STOPPED, null, 0L, null),
new DataFrameTransformState(DataFrameTransformTaskState.STOPPED, IndexerState.STOPPED, null, 0L, null, null),
indexerTransformStats,
DataFrameTransformCheckpointingInfo.EMPTY);
}

View File

@@ -7,6 +7,7 @@
package org.elasticsearch.xpack.core.dataframe.transforms.pivot;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
@@ -116,4 +117,9 @@ public abstract class SingleGroupSource implements Writeable, ToXContentObject {
public int hashCode() {
return Objects.hash(field);
}
@Override
public String toString() {
return Strings.toString(this, true, true);
}
}

View File

@@ -0,0 +1,22 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.core.dataframe.utils;
/**
* Collection of methods to aid in creating and checking for exceptions.
*/
public class ExceptionsHelper {
/**
* A more REST-friendly Objects.requireNonNull()
*/
public static <T> T requireNonNull(T obj, String paramName) {
if (obj == null) {
throw new IllegalArgumentException("[" + paramName + "] must not be null.");
}
return obj;
}
}
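
A hedged usage sketch; the `request.getId()` accessor is illustrative:

// throws IllegalArgumentException("[id] must not be null.") when the parameter is absent
String id = ExceptionsHelper.requireNonNull(request.getId(), "id");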

View File

@@ -0,0 +1,64 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.core.dataframe.transforms;
import org.elasticsearch.common.io.stream.Writeable.Reader;
import org.elasticsearch.common.xcontent.XContentParser;
import java.io.IOException;
import static org.hamcrest.Matchers.closeTo;
import static org.hamcrest.Matchers.equalTo;
public class DataFrameTransformProgressTests extends AbstractSerializingDataFrameTestCase<DataFrameTransformProgress> {
public static DataFrameTransformProgress randomDataFrameTransformProgress() {
long totalDocs = randomNonNegativeLong();
return new DataFrameTransformProgress(totalDocs, randomBoolean() ? null : randomLongBetween(0, totalDocs));
}
@Override
protected DataFrameTransformProgress doParseInstance(XContentParser parser) throws IOException {
return DataFrameTransformProgress.PARSER.apply(parser, null);
}
@Override
protected DataFrameTransformProgress createTestInstance() {
return randomDataFrameTransformProgress();
}
@Override
protected Reader<DataFrameTransformProgress> instanceReader() {
return DataFrameTransformProgress::new;
}
public void testPercentComplete() {
DataFrameTransformProgress progress = new DataFrameTransformProgress(0L, 100L);
assertThat(progress.getPercentComplete(), equalTo(100.0));
progress = new DataFrameTransformProgress(100L, 0L);
assertThat(progress.getPercentComplete(), equalTo(100.0));
progress = new DataFrameTransformProgress(100L, 10000L);
assertThat(progress.getPercentComplete(), equalTo(100.0));
progress = new DataFrameTransformProgress(100L, null);
assertThat(progress.getPercentComplete(), equalTo(0.0));
progress = new DataFrameTransformProgress(100L, 50L);
assertThat(progress.getPercentComplete(), closeTo(50.0, 0.000001));
}
public void testConstructor() {
IllegalArgumentException ex =
expectThrows(IllegalArgumentException.class, () -> new DataFrameTransformProgress(-1, null));
assertThat(ex.getMessage(), equalTo("[total_docs] must be >0."));
ex = expectThrows(IllegalArgumentException.class, () -> new DataFrameTransformProgress(1L, -1L));
assertThat(ex.getMessage(), equalTo("[docs_remaining] must be >0."));
}
}

View File

@@ -16,6 +16,8 @@ import java.util.HashMap;
import java.util.Map;
import java.util.function.Predicate;
import static org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformProgressTests.randomDataFrameTransformProgress;
public class DataFrameTransformStateTests extends AbstractSerializingTestCase<DataFrameTransformState> {
public static DataFrameTransformState randomDataFrameTransformState() {
@@ -23,7 +25,8 @@ public class DataFrameTransformStateTests extends AbstractSerializingTestCase<Da
randomFrom(IndexerState.values()),
randomPosition(),
randomLongBetween(0,10),
randomBoolean() ? null : randomAlphaOfLength(10));
randomBoolean() ? null : randomAlphaOfLength(10),
randomBoolean() ? null : randomDataFrameTransformProgress());
}
@Override

View File

@@ -2,7 +2,8 @@ apply plugin: 'elasticsearch.standalone-rest-test'
apply plugin: 'elasticsearch.rest-test'
dependencies {
testCompile "org.elasticsearch.plugin:x-pack-core:${version}"
testCompile project(path: xpackModule('core'), configuration: 'default')
testCompile project(path: xpackModule('core'), configuration: 'testArtifacts')
testCompile project(path: xpackModule('data-frame'), configuration: 'runtime')
}

View File

@@ -18,6 +18,7 @@ import java.util.List;
import java.util.Map;
import static org.elasticsearch.xpack.core.security.authc.support.UsernamePasswordToken.basicAuthHeaderValue;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
public class DataFrameGetAndGetStatsIT extends DataFrameRestTestCase {
@@ -83,6 +84,19 @@ public class DataFrameGetAndGetStatsIT extends DataFrameRestTestCase {
stats = entityAsMap(client().performRequest(getRequest));
assertEquals(2, XContentMapValues.extractValue("count", stats));
List<Map<String, Object>> transformsStats = (List<Map<String, Object>>)XContentMapValues.extractValue("transforms", stats);
// Verify that both transforms have valid stats
for (Map<String, Object> transformStats : transformsStats) {
Map<String, Object> stat = (Map<String, Object>)transformStats.get("stats");
assertThat("documents_processed is not > 0.", ((Integer)stat.get("documents_processed")), greaterThan(0));
assertThat("search_total is not > 0.", ((Integer)stat.get("search_total")), greaterThan(0));
assertThat("pages_processed is not > 0.", ((Integer)stat.get("pages_processed")), greaterThan(0));
Map<String, Object> progress = (Map<String, Object>)XContentMapValues.extractValue("state.progress", transformStats);
assertThat("total_docs is not 1000", progress.get("total_docs"), equalTo(1000));
assertThat("docs_remaining is not 0", progress.get("docs_remaining"), equalTo(0));
assertThat("percent_complete is not 100.0", progress.get("percent_complete"), equalTo(100.0));
}
// only pivot_1
getRequest = createRequestWithAuth("GET", DATAFRAME_ENDPOINT + "pivot_1/_stats", authHeader);
stats = entityAsMap(client().performRequest(getRequest));
@@ -134,4 +148,32 @@ public class DataFrameGetAndGetStatsIT extends DataFrameRestTestCase {
assertThat(((Integer)stat.get("pages_processed")), greaterThan(0));
}
}
@SuppressWarnings("unchecked")
public void testGetProgressStatsWithPivotQuery() throws Exception {
String transformId = "simpleStatsPivotWithQuery";
String dataFrameIndex = "pivot_stats_reviews_user_id_above_20";
String query = "\"match\": {\"user_id\": \"user_26\"}";
createPivotReviewsTransform(transformId, dataFrameIndex, query);
startAndWaitForTransform(transformId, dataFrameIndex);
// Alternate testing between admin and lowly user, as both should be able to get the configs and stats
String authHeader = randomFrom(BASIC_AUTH_VALUE_DATA_FRAME_USER, BASIC_AUTH_VALUE_DATA_FRAME_ADMIN);
Request getRequest = createRequestWithAuth("GET", DATAFRAME_ENDPOINT + transformId + "/_stats", authHeader);
Map<String, Object> stats = entityAsMap(client().performRequest(getRequest));
assertEquals(1, XContentMapValues.extractValue("count", stats));
List<Map<String, Object>> transformsStats = (List<Map<String, Object>>)XContentMapValues.extractValue("transforms", stats);
// Verify that the transform has stats and the total docs processed matches the expected count
for (Map<String, Object> transformStats : transformsStats) {
Map<String, Object> stat = (Map<String, Object>)transformStats.get("stats");
assertThat("documents_processed is not > 0.", ((Integer)stat.get("documents_processed")), greaterThan(0));
assertThat("search_total is not > 0.", ((Integer)stat.get("search_total")), greaterThan(0));
assertThat("pages_processed is not > 0.", ((Integer)stat.get("pages_processed")), greaterThan(0));
Map<String, Object> progress = (Map<String, Object>)XContentMapValues.extractValue("state.progress", transformStats);
assertThat("total_docs is not 37", progress.get("total_docs"), equalTo(37));
assertThat("docs_remaining is not 0", progress.get("docs_remaining"), equalTo(0));
assertThat("percent_complete is not 100.0", progress.get("percent_complete"), equalTo(100.0));
}
}
}

View File

@@ -0,0 +1,189 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.dataframe.integration;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.common.network.NetworkModule;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.SecuritySettingsSourceField;
import org.elasticsearch.transport.Netty4Plugin;
import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin;
import org.elasticsearch.xpack.core.XPackClientPlugin;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformProgress;
import org.elasticsearch.xpack.core.dataframe.transforms.DestConfig;
import org.elasticsearch.xpack.core.dataframe.transforms.QueryConfig;
import org.elasticsearch.xpack.core.dataframe.transforms.SourceConfig;
import org.elasticsearch.xpack.core.dataframe.transforms.pivot.AggregationConfig;
import org.elasticsearch.xpack.core.dataframe.transforms.pivot.GroupConfig;
import org.elasticsearch.xpack.core.dataframe.transforms.pivot.HistogramGroupSource;
import org.elasticsearch.xpack.core.dataframe.transforms.pivot.PivotConfig;
import org.elasticsearch.xpack.core.security.SecurityField;
import org.elasticsearch.xpack.dataframe.transforms.TransformProgressGatherer;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.xpack.dataframe.integration.DataFrameRestTestCase.REVIEWS_INDEX_NAME;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
public class DataFrameTransformProgressIT extends ESIntegTestCase {
protected void createReviewsIndex() throws Exception {
final int numDocs = 1000;
// create mapping
try (XContentBuilder builder = jsonBuilder()) {
builder.startObject();
{
builder.startObject("properties")
.startObject("timestamp")
.field("type", "date")
.endObject()
.startObject("user_id")
.field("type", "keyword")
.endObject()
.startObject("count")
.field("type", "integer")
.endObject()
.startObject("business_id")
.field("type", "keyword")
.endObject()
.startObject("stars")
.field("type", "integer")
.endObject()
.endObject();
}
builder.endObject();
CreateIndexResponse response = client().admin()
.indices()
.prepareCreate(REVIEWS_INDEX_NAME)
.addMapping("_doc", builder)
.get();
assertThat(response.isAcknowledged(), is(true));
}
// index documents
BulkRequestBuilder bulk = client().prepareBulk(REVIEWS_INDEX_NAME, "_doc");
int day = 10;
for (int i = 0; i < numDocs; i++) {
long user = i % 28;
int stars = (i + 20) % 5;
long business = (i + 100) % 50;
int hour = 10 + (i % 13);
int min = 10 + (i % 49);
int sec = 10 + (i % 49);
String dateString = "2017-01-" + day + "T" + hour + ":" + min + ":" + sec + "Z";
StringBuilder sourceBuilder = new StringBuilder();
sourceBuilder.append("{\"user_id\":\"")
.append("user_")
.append(user)
.append("\",\"count\":")
.append(i)
.append(",\"business_id\":\"")
.append("business_")
.append(business)
.append("\",\"stars\":")
.append(stars)
.append(",\"timestamp\":\"")
.append(dateString)
.append("\"}");
bulk.add(new IndexRequest().source(sourceBuilder.toString(), XContentType.JSON));
if (i % 50 == 0) {
BulkResponse response = client().bulk(bulk.request()).get();
assertThat(response.buildFailureMessage(), response.hasFailures(), is(false));
bulk = client().prepareBulk(REVIEWS_INDEX_NAME, "_doc");
day += 1;
}
}
client().bulk(bulk.request()).get();
client().admin().indices().prepareRefresh(REVIEWS_INDEX_NAME).get();
}
public void testGetProgress() throws Exception {
createReviewsIndex();
SourceConfig sourceConfig = new SourceConfig(REVIEWS_INDEX_NAME);
DestConfig destConfig = new DestConfig("unnecessary");
GroupConfig histogramGroupConfig = new GroupConfig(Collections.emptyMap(),
Collections.singletonMap("every_50", new HistogramGroupSource("count", 50.0)));
AggregatorFactories.Builder aggs = new AggregatorFactories.Builder();
aggs.addAggregator(AggregationBuilders.avg("avg_rating").field("stars"));
AggregationConfig aggregationConfig = new AggregationConfig(Collections.emptyMap(), aggs);
PivotConfig pivotConfig = new PivotConfig(histogramGroupConfig, aggregationConfig);
DataFrameTransformConfig config = new DataFrameTransformConfig("get_progress_transform",
sourceConfig,
destConfig,
null,
pivotConfig);
PlainActionFuture<DataFrameTransformProgress> progressFuture = new PlainActionFuture<>();
TransformProgressGatherer.getInitialProgress(client(), config, progressFuture);
DataFrameTransformProgress progress = progressFuture.get();
assertThat(progress.getTotalDocs(), equalTo(1000L));
assertThat(progress.getRemainingDocs(), equalTo(1000L));
assertThat(progress.getPercentComplete(), equalTo(0.0));
QueryConfig queryConfig = new QueryConfig(Collections.emptyMap(), QueryBuilders.termQuery("user_id", "user_26"));
pivotConfig = new PivotConfig(histogramGroupConfig, aggregationConfig);
sourceConfig = new SourceConfig(new String[]{REVIEWS_INDEX_NAME}, queryConfig);
config = new DataFrameTransformConfig("get_progress_transform",
sourceConfig,
destConfig,
null,
pivotConfig);
progressFuture = new PlainActionFuture<>();
TransformProgressGatherer.getInitialProgress(client(), config, progressFuture);
progress = progressFuture.get();
assertThat(progress.getTotalDocs(), equalTo(35L));
assertThat(progress.getRemainingDocs(), equalTo(35L));
assertThat(progress.getPercentComplete(), equalTo(0.0));
client().admin().indices().prepareDelete(REVIEWS_INDEX_NAME).get();
}
@Override
protected Settings externalClusterClientSettings() {
Settings.Builder builder = Settings.builder();
builder.put(NetworkModule.TRANSPORT_TYPE_KEY, SecurityField.NAME4);
builder.put(SecurityField.USER_SETTING.getKey(), "x_pack_rest_user:" + SecuritySettingsSourceField.TEST_PASSWORD_SECURE_STRING);
return builder.build();
}
@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Arrays.asList(LocalStateCompositeXPackPlugin.class, Netty4Plugin.class);
}
@Override
protected Collection<Class<? extends Plugin>> transportClientPlugins() {
return Arrays.asList(XPackClientPlugin.class, Netty4Plugin.class);
}
}

View File

@@ -95,20 +95,15 @@ public class TransportStartDataFrameTransformAction extends
}
final DataFrameTransform transformTask = createDataFrameTransform(request.getId(), threadPool);
// <4> Set the allocated task's state to STARTED
ActionListener<PersistentTasksCustomMetaData.PersistentTask<DataFrameTransform>> persistentTaskActionListener = ActionListener.wrap(
// <3> Wait for the allocated task's state to be STARTED
ActionListener<PersistentTasksCustomMetaData.PersistentTask<DataFrameTransform>> newPersistentTaskActionListener =
ActionListener.wrap(
task -> {
waitForDataFrameTaskAllocated(task.getId(),
waitForDataFrameTaskStarted(task.getId(),
transformTask,
request.timeout(),
ActionListener.wrap(
taskAssigned -> ClientHelper.executeAsyncWithOrigin(client,
ClientHelper.DATA_FRAME_ORIGIN,
StartDataFrameTransformTaskAction.INSTANCE,
new StartDataFrameTransformTaskAction.Request(request.getId()),
ActionListener.wrap(
r -> listener.onResponse(new StartDataFrameTransformAction.Response(true)),
listener::onFailure)),
taskStarted -> listener.onResponse(new StartDataFrameTransformAction.Response(true)),
listener::onFailure));
},
listener::onFailure
@@ -120,10 +115,11 @@ public class TransportStartDataFrameTransformAction extends
PersistentTasksCustomMetaData.PersistentTask<DataFrameTransform> existingTask =
getExistingTask(transformTask.getId(), state);
if (existingTask == null) {
// Create the allocated task and wait for it to be started
persistentTasksService.sendStartRequest(transformTask.getId(),
DataFrameTransform.NAME,
transformTask,
persistentTaskActionListener);
newPersistentTaskActionListener);
} else {
DataFrameTransformState transformState = (DataFrameTransformState)existingTask.getState();
if(transformState.getTaskState() == DataFrameTransformTaskState.FAILED && request.isForce() == false) {
@@ -138,7 +134,26 @@ public class TransportStartDataFrameTransformAction extends
"Unable to start data frame transform [" + request.getId() +
"] as it is in state [" + transformState.getTaskState() + "]", RestStatus.CONFLICT));
} else {
persistentTaskActionListener.onResponse(existingTask);
// If the task already exists but is not assigned to a node, something is weird
// return a failure that includes the current assignment explanation (if one exists)
if (existingTask.isAssigned() == false) {
String assignmentExplanation = "unknown reason";
if (existingTask.getAssignment() != null) {
assignmentExplanation = existingTask.getAssignment().getExplanation();
}
listener.onFailure(new ElasticsearchStatusException("Unable to start data frame transform [" +
request.getId() + "] as it is not assigned to a node, explanation: " + assignmentExplanation,
RestStatus.CONFLICT));
return;
}
// If the task already exists and is assigned to a node, simply attempt to set it to start
ClientHelper.executeAsyncWithOrigin(client,
ClientHelper.DATA_FRAME_ORIGIN,
StartDataFrameTransformTaskAction.INSTANCE,
new StartDataFrameTransformTaskAction.Request(request.getId()),
ActionListener.wrap(
r -> listener.onResponse(new StartDataFrameTransformAction.Response(true)),
listener::onFailure));
}
}
},
@@ -269,7 +284,7 @@ public class TransportStartDataFrameTransformAction extends
);
}
private void waitForDataFrameTaskAllocated(String taskId,
private void waitForDataFrameTaskStarted(String taskId,
DataFrameTransform params,
TimeValue timeout,
ActionListener<Boolean> listener) {
@@ -324,7 +339,15 @@ public class TransportStartDataFrameTransformAction extends
return true;
}
// We just want it assigned so we can tell it to start working
return assignment != null && assignment.isAssigned();
return assignment != null && assignment.isAssigned() && isNotStopped(persistentTask);
}
// Checking for `isNotStopped` as the state COULD be marked as failed for any number of reasons.
// But if it is in a failed state, _stats will show as much and give a good reason to the user.
// If it cannot be assigned to a node at all, we should just close the task completely.
private boolean isNotStopped(PersistentTasksCustomMetaData.PersistentTask<?> task) {
DataFrameTransformState state = (DataFrameTransformState)task.getState();
return state != null && state.getTaskState().equals(DataFrameTransformTaskState.STOPPED) == false;
}
}
}

View File

@@ -14,6 +14,7 @@ import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchPhaseExecutionException;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.common.breaker.CircuitBreakingException;
import org.elasticsearch.common.xcontent.XContentBuilder;
@@ -23,6 +24,7 @@ import org.elasticsearch.xpack.core.dataframe.DataFrameField;
import org.elasticsearch.xpack.core.dataframe.DataFrameMessages;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameIndexerTransformStats;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformProgress;
import org.elasticsearch.xpack.core.indexing.AsyncTwoPhaseIndexer;
import org.elasticsearch.xpack.core.indexing.IndexerState;
import org.elasticsearch.xpack.core.indexing.IterationResult;
@@ -64,6 +66,9 @@ public abstract class DataFrameIndexer extends AsyncTwoPhaseIndexer<Map<String,
protected abstract Map<String, String> getFieldMappings();
@Nullable
protected abstract DataFrameTransformProgress getProgress();
protected abstract void failIndexer(String message);
public int getPageSize() {
@@ -87,7 +92,7 @@ public abstract class DataFrameIndexer extends AsyncTwoPhaseIndexer<Map<String,
}
// if run for the 1st time, create checkpoint
if (getPosition() == null) {
if (initialRun()) {
createCheckpoint(listener);
} else {
listener.onResponse(null);
@@ -97,6 +102,10 @@ public abstract class DataFrameIndexer extends AsyncTwoPhaseIndexer<Map<String,
}
}
protected boolean initialRun() {
return getPosition() == null;
}
@Override
protected void onFinish(ActionListener<Void> listener) {
// reset the page size, so we do not memorize a low page size forever, the pagesize will be re-calculated on start
@@ -106,8 +115,14 @@ public abstract class DataFrameIndexer extends AsyncTwoPhaseIndexer<Map<String,
@Override
protected IterationResult<Map<String, Object>> doProcess(SearchResponse searchResponse) {
final CompositeAggregation agg = searchResponse.getAggregations().get(COMPOSITE_AGGREGATION_NAME);
return new IterationResult<>(processBucketsToIndexRequests(agg).collect(Collectors.toList()), agg.afterKey(),
long docsBeforeProcess = getStats().getNumDocuments();
IterationResult<Map<String, Object>> result = new IterationResult<>(processBucketsToIndexRequests(agg).collect(Collectors.toList()),
agg.afterKey(),
agg.getBuckets().isEmpty());
if (getProgress() != null) {
getProgress().docsProcessed(getStats().getNumDocuments() - docsBeforeProcess);
}
return result;
}
/*

View File

@@ -10,6 +10,7 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.LatchedActionListener;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.persistent.AllocatedPersistentTask;
@@ -19,21 +20,31 @@ import org.elasticsearch.persistent.PersistentTasksExecutor;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.dataframe.DataFrameField;
import org.elasticsearch.xpack.core.dataframe.DataFrameMessages;
import org.elasticsearch.xpack.core.dataframe.action.StartDataFrameTransformTaskAction;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameIndexerTransformStats;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransform;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformState;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformTaskState;
import org.elasticsearch.xpack.core.indexing.IndexerState;
import org.elasticsearch.xpack.core.scheduler.SchedulerEngine;
import org.elasticsearch.xpack.dataframe.DataFrame;
import org.elasticsearch.xpack.dataframe.checkpoint.DataFrameTransformsCheckpointService;
import org.elasticsearch.xpack.dataframe.notifications.DataFrameAuditor;
import org.elasticsearch.xpack.dataframe.persistence.DataFrameTransformsConfigManager;
import org.elasticsearch.xpack.dataframe.transforms.pivot.SchemaUtil;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
public class DataFrameTransformPersistentTasksExecutor extends PersistentTasksExecutor<DataFrameTransform> {
private static final Logger logger = LogManager.getLogger(DataFrameTransformPersistentTasksExecutor.class);
// The amount of time we wait for the cluster state to respond when being marked as failed
private static final int MARK_AS_FAILED_TIMEOUT_SEC = 90;
private final Client client;
private final DataFrameTransformsConfigManager transformsConfigManager;
private final DataFrameTransformsCheckpointService dataFrameTransformsCheckpointService;
@@ -58,36 +69,116 @@ public class DataFrameTransformPersistentTasksExecutor extends PersistentTasksEx
@Override
protected void nodeOperation(AllocatedPersistentTask task, @Nullable DataFrameTransform params, PersistentTaskState state) {
final String transformId = params.getId();
final DataFrameTransformTask buildTask = (DataFrameTransformTask) task;
final SchedulerEngine.Job schedulerJob = new SchedulerEngine.Job(DataFrameTransformTask.SCHEDULE_NAME + "_" + transformId,
next());
final DataFrameTransformState transformState = (DataFrameTransformState) state;
if (transformState != null && transformState.getTaskState() == DataFrameTransformTaskState.FAILED) {
logger.warn("Tried to start failed transform [" + transformId + "] failure reason: " + transformState.getReason());
return;
}
final DataFrameTransformTask.ClientDataFrameIndexerBuilder indexerBuilder =
new DataFrameTransformTask.ClientDataFrameIndexerBuilder()
.setAuditor(auditor)
.setClient(client)
.setIndexerState(transformState == null ? IndexerState.STOPPED : transformState.getIndexerState())
.setInitialPosition(transformState == null ? null : transformState.getPosition())
// If the state is null, this is a "first run" and the task will gather the initial
// progress information itself. If we have state, the previous executing node may have
// crashed, so we attempt to restore progress from the state to keep the measurement accurate.
.setProgress(transformState == null ? null : transformState.getProgress())
.setTransformsCheckpointService(dataFrameTransformsCheckpointService)
.setTransformsConfigManager(transformsConfigManager)
.setTransformId(transformId);
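// The listeners below are declared in reverse execution order; the chain runs
// <0> fetch config -> <1> validate and get field mappings -> <2> fetch stats -> <3> build the indexer and schedule/start the task.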
ActionListener<StartDataFrameTransformTaskAction.Response> startTaskListener = ActionListener.wrap(
response -> logger.info("Successfully completed and scheduled task in node operation"),
failure -> logger.error("Failed to start task ["+ transformId +"] in node operation", failure)
);
// <3> Set the previous stats (if they exist), initialize the indexer, and start the task (if it is STOPPED).
// Since we don't create the task until `_start` is called, if we see that the task state is STOPPED, attempt to start it.
// Schedule execution regardless.
ActionListener<DataFrameIndexerTransformStats> transformStatsActionListener = ActionListener.wrap(
stats -> {
// Initialize with the previously recorded stats
indexerBuilder.setInitialStats(stats);
buildTask.initializeIndexer(indexerBuilder);
scheduleAndStartTask(buildTask, schedulerJob, startTaskListener);
},
error -> {
if (error instanceof ResourceNotFoundException == false) {
logger.error("Unable to load previously persisted statistics for transform [" + params.getId() + "]", error);
}
indexerBuilder.setInitialStats(new DataFrameIndexerTransformStats(transformId));
buildTask.initializeIndexer(indexerBuilder);
scheduleAndStartTask(buildTask, schedulerJob, startTaskListener);
}
);
// <2> Set the field mappings for the indexer and get the previous stats (if they exist)
ActionListener<Map<String, String>> getFieldMappingsListener = ActionListener.wrap(
fieldMappings -> {
indexerBuilder.setFieldMappings(fieldMappings);
transformsConfigManager.getTransformStats(transformId, transformStatsActionListener);
},
error -> {
String msg = DataFrameMessages.getMessage(DataFrameMessages.DATA_FRAME_UNABLE_TO_GATHER_FIELD_MAPPINGS,
indexerBuilder.getTransformConfig().getDestination().getIndex());
logger.error(msg, error);
markAsFailed(buildTask, msg);
}
);
// <1> Validate the transform config, assign it to the indexer, and get the field mappings
ActionListener<DataFrameTransformConfig> getTransformConfigListener = ActionListener.wrap(
config -> {
if (config.isValid()) {
indexerBuilder.setTransformConfig(config);
SchemaUtil.getDestinationFieldMappings(client, config.getDestination().getIndex(), getFieldMappingsListener);
} else {
markAsFailed(buildTask,
DataFrameMessages.getMessage(DataFrameMessages.DATA_FRAME_TRANSFORM_CONFIGURATION_INVALID, transformId));
}
},
error -> {
String msg = DataFrameMessages.getMessage(DataFrameMessages.FAILED_TO_LOAD_TRANSFORM_CONFIGURATION, transformId);
logger.error(msg, error);
markAsFailed(buildTask, msg);
}
);
// <0> Get the transform config
transformsConfigManager.getTransformConfiguration(transformId, getTransformConfigListener);
}
private void markAsFailed(DataFrameTransformTask task, String reason) {
CountDownLatch latch = new CountDownLatch(1);
task.markAsFailed(reason, new LatchedActionListener<>(ActionListener.wrap(
nil -> {},
failure -> logger.error("Failed to set task [" + task.getTransformId() +"] to failed", failure)
), latch));
try {
if (latch.await(MARK_AS_FAILED_TIMEOUT_SEC, TimeUnit.SECONDS) == false) {
logger.error("Timeout waiting for task [" + task.getTransformId() + "] to be marked as failed in cluster state");
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.error("Interrupted while waiting for task [" + task.getTransformId() + "] to be marked as failed in cluster state", e);
}
}
private void scheduleAndStartTask(DataFrameTransformTask buildTask,
SchedulerEngine.Job schedulerJob,
ActionListener<StartDataFrameTransformTaskAction.Response> listener) {
// Note that while the task is added to the scheduler here, the internal state will prevent
// it from doing any work until the task is "started" via the StartTransform api
schedulerEngine.register(buildTask);
schedulerEngine.add(schedulerJob);
logger.info("Data frame transform [" + id + "] created.");
logger.info("Data frame transform [{}] created.", buildTask.getTransformId());
// If we are stopped and this is an initial run, the task has never been started;
// attempt to start it.
if (buildTask.getState().getTaskState().equals(DataFrameTransformTaskState.STOPPED) && buildTask.isInitialRun()) {
buildTask.start(listener);
} else {
logger.debug("No need to start task. Its current state is: {}", buildTask.getState().getIndexerState());
listener.onResponse(new StartDataFrameTransformTaskAction.Response(true));
}
}
static SchedulerEngine.Schedule next() {
@@ -100,7 +191,6 @@ public class DataFrameTransformPersistentTasksExecutor extends PersistentTasksEx
protected AllocatedPersistentTask createTask(long id, String type, String action, TaskId parentTaskId,
PersistentTasksCustomMetaData.PersistentTask<DataFrameTransform> persistentTask, Map<String, String> headers) {
return new DataFrameTransformTask(id, type, action, parentTaskId, persistentTask.getParams(),
(DataFrameTransformState) persistentTask.getState(), schedulerEngine, auditor, threadPool, headers);
}
}
View File
@@ -8,9 +8,9 @@ package org.elasticsearch.xpack.dataframe.transforms;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.LatchedActionListener;
import org.elasticsearch.action.bulk.BulkAction;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
@@ -25,27 +25,25 @@ import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.ClientHelper;
import org.elasticsearch.xpack.core.dataframe.DataFrameField;
import org.elasticsearch.xpack.core.dataframe.DataFrameMessages;
import org.elasticsearch.xpack.core.dataframe.action.StartDataFrameTransformTaskAction;
import org.elasticsearch.xpack.core.dataframe.action.StartDataFrameTransformTaskAction.Response;
import org.elasticsearch.xpack.core.dataframe.action.StopDataFrameTransformAction;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameIndexerTransformStats;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransform;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformProgress;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformState;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformTaskState;
import org.elasticsearch.xpack.core.dataframe.utils.ExceptionsHelper;
import org.elasticsearch.xpack.core.indexing.IndexerState;
import org.elasticsearch.xpack.core.scheduler.SchedulerEngine;
import org.elasticsearch.xpack.core.scheduler.SchedulerEngine.Event;
import org.elasticsearch.xpack.dataframe.checkpoint.DataFrameTransformsCheckpointService;
import org.elasticsearch.xpack.dataframe.notifications.DataFrameAuditor;
import org.elasticsearch.xpack.dataframe.persistence.DataFrameTransformsConfigManager;
import org.elasticsearch.xpack.dataframe.transforms.pivot.SchemaUtil;
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
@@ -62,21 +60,20 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
private final DataFrameTransform transform;
private final SchedulerEngine schedulerEngine;
private final ThreadPool threadPool;
private final DataFrameAuditor auditor;
private final Map<String, Object> initialPosition;
private final IndexerState initialIndexerState;
private final SetOnce<DataFrameIndexer> indexer = new SetOnce<>();
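// Note: the indexer is built lazily via initializeIndexer(...) once the transform config,
// field mappings, and stats have been fetched; until then, state accessors fall back to the initial* fields.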
private final AtomicReference<DataFrameTransformTaskState> taskState;
private final AtomicReference<String> stateReason;
// The checkpoint of this data frame, stored until data indexing from source to dest is _complete_.
// Note: Each indexer run creates a new future checkpoint which becomes the current checkpoint only after the indexer run has finished.
private final AtomicLong currentCheckpoint;
private final AtomicInteger failureCount;
public DataFrameTransformTask(long id, String type, String action, TaskId parentTask, DataFrameTransform transform,
DataFrameTransformState state, SchedulerEngine schedulerEngine, DataFrameAuditor auditor,
ThreadPool threadPool, Map<String, String> headers) {
super(id, type, action, DataFrameField.PERSISTENT_TASK_DESCRIPTION_PREFIX + transform.getId(), parentTask, headers);
this.transform = transform;
@@ -107,13 +104,11 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
initialGeneration = state.getCheckpoint();
}
this.initialIndexerState = initialState;
this.initialPosition = initialPosition;
this.currentCheckpoint = new AtomicLong(initialGeneration);
this.taskState = new AtomicReference<>(initialTaskState);
this.stateReason = new AtomicReference<>(initialReason);
this.failureCount = new AtomicInteger(0);
}
public String getTransformId() {
@@ -128,21 +123,36 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
return getState();
}
private DataFrameIndexer getIndexer() {
return indexer.get();
}
public DataFrameTransformState getState() {
if (getIndexer() == null) {
return new DataFrameTransformState(
taskState.get(),
initialIndexerState,
initialPosition,
currentCheckpoint.get(),
stateReason.get(),
null);
} else {
return new DataFrameTransformState(
taskState.get(),
indexer.get().getState(),
indexer.get().getPosition(),
currentCheckpoint.get(),
stateReason.get(),
getIndexer().getProgress());
}
}
public DataFrameIndexerTransformStats getStats() {
if (getIndexer() == null) {
return new DataFrameIndexerTransformStats(getTransformId());
} else {
return getIndexer().getStats();
}
}
public long getCheckpoint() {
@@ -155,15 +165,29 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
* @return checkpoint in progress or 0 if task/indexer is not active
*/
public long getInProgressCheckpoint() {
if (getIndexer() == null) {
return 0;
} else {
return indexer.get().getState().equals(IndexerState.INDEXING) ? currentCheckpoint.get() + 1L : 0;
}
}
public boolean isStopped() {
IndexerState currentState = getIndexer() == null ? initialIndexerState : getIndexer().getState();
return currentState.equals(IndexerState.STOPPED);
}
boolean isInitialRun() {
return getIndexer() != null && getIndexer().initialRun();
}
public synchronized void start(ActionListener<Response> listener) {
if (getIndexer() == null) {
listener.onFailure(new ElasticsearchException("Task for transform [{}] not fully initialized. Try again later",
getTransformId()));
return;
}
final IndexerState newState = getIndexer().start();
if (Arrays.stream(RUNNING_STATES).noneMatch(newState::equals)) {
listener.onFailure(new ElasticsearchException("Cannot start task for data frame transform [{}], because state was [{}]",
transform.getId(), newState));
@@ -171,14 +195,14 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
}
stateReason.set(null);
taskState.set(DataFrameTransformTaskState.STARTED);
failureCount.set(0);
final DataFrameTransformState state = new DataFrameTransformState(
DataFrameTransformTaskState.STARTED,
IndexerState.STOPPED,
getIndexer().getPosition(),
currentCheckpoint.get(),
null,
getIndexer().getProgress());
logger.info("Updating state for data frame transform [{}] to [{}]", transform.getId(), state.toString());
persistStateToClusterState(state, ActionListener.wrap(
@@ -187,7 +211,7 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
listener.onResponse(new StartDataFrameTransformTaskAction.Response(true));
},
exc -> {
getIndexer().stop();
listener.onFailure(new ElasticsearchException("Error while updating state for data frame transform ["
+ transform.getId() + "] to [" + state.getIndexerState() + "].", exc));
}
@@ -195,13 +219,18 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
}
public synchronized void stop(ActionListener<StopDataFrameTransformAction.Response> listener) {
if (getIndexer() == null) {
listener.onFailure(new ElasticsearchException("Task for transform [{}] not fully initialized. Try again later",
getTransformId()));
return;
}
// taskState is initialized as STOPPED and is updated in tandem with the indexerState
// Consequently, if it is STOPPED, we consider the whole task STOPPED.
if (taskState.get() == DataFrameTransformTaskState.STOPPED) {
listener.onResponse(new StopDataFrameTransformAction.Response(true));
return;
}
final IndexerState newState = getIndexer().stop();
switch (newState) {
case STOPPED:
// Fall through to `STOPPING` as the behavior is the same for both, we should persist for both
@@ -215,9 +244,10 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
DataFrameTransformState state = new DataFrameTransformState(
DataFrameTransformTaskState.STOPPED,
IndexerState.STOPPED,
getIndexer().getPosition(),
currentCheckpoint.get(),
stateReason.get(),
getIndexer().getProgress());
persistStateToClusterState(state, ActionListener.wrap(
task -> {
auditor.info(transform.getId(), "Updated state to [" + state.getTaskState() + "]");
@@ -237,10 +267,14 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
@Override
public synchronized void triggered(Event event) {
if (getIndexer() == null) {
logger.warn("Data frame task [{}] triggered with an unintialized indexer", getTransformId());
return;
}
// for now no rerun, so only trigger if checkpoint == 0
if (currentCheckpoint.get() == 0 && event.getJobName().equals(SCHEDULE_NAME + "_" + transform.getId())) {
logger.debug("Data frame indexer [{}] schedule has triggered, state: [{}]", event.getJobName(), indexer.getState());
indexer.maybeTriggerAsyncJob(System.currentTimeMillis());
logger.debug("Data frame indexer [{}] schedule has triggered, state: [{}]", event.getJobName(), getIndexer().getState());
getIndexer().maybeTriggerAsyncJob(System.currentTimeMillis());
}
}
@@ -261,6 +295,17 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
markAsCompleted();
}
public DataFrameTransformProgress getProgress() {
if (indexer.get() == null) {
return null;
}
DataFrameTransformProgress indexerProgress = indexer.get().getProgress();
if (indexerProgress == null) {
return null;
}
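// Return a copy so callers cannot mutate the indexer's live progress object.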
return new DataFrameTransformProgress(indexerProgress);
}
void persistStateToClusterState(DataFrameTransformState state,
ActionListener<PersistentTasksCustomMetaData.PersistentTask<?>> listener) {
updatePersistentTaskState(state, ActionListener.wrap(
@@ -276,6 +321,17 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
));
}
synchronized void markAsFailed(String reason, ActionListener<Void> listener) {
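// Record the failure locally, then persist it to the cluster state so it survives task relocation.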
taskState.set(DataFrameTransformTaskState.FAILED);
stateReason.set(reason);
persistStateToClusterState(getState(), ActionListener.wrap(
r -> {
listener.onResponse(null);
},
listener::onFailure
));
}
/**
* This is called when the persistent task signals that the allocated task should be terminated.
* Termination in the task framework is essentially voluntary, as the allocated task can only be
@@ -284,37 +340,174 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
@Override
public synchronized void onCancelled() {
logger.info(
"Received cancellation request for data frame transform [" + transform.getId() + "], state: [" + indexer.getState() + "]");
if (indexer.abort()) {
"Received cancellation request for data frame transform [" + transform.getId() + "], state: [" + taskState.get() + "]");
if (getIndexer() != null && getIndexer().abort()) {
// there is no background transform running, we can shutdown safely
shutdown();
}
}
synchronized void initializeIndexer(ClientDataFrameIndexerBuilder indexerBuilder) {
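// SetOnce#set throws an exception if the indexer has already been set,
// guarding against initializing the same task twice.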
indexer.set(indexerBuilder.build(this));
}
static class ClientDataFrameIndexerBuilder {
private Client client;
private DataFrameTransformsConfigManager transformsConfigManager;
private DataFrameTransformsCheckpointService transformsCheckpointService;
private String transformId;
private DataFrameAuditor auditor;
private Map<String, String> fieldMappings;
private DataFrameTransformConfig transformConfig;
private DataFrameIndexerTransformStats initialStats;
private IndexerState indexerState = IndexerState.STOPPED;
private Map<String, Object> initialPosition;
private DataFrameTransformProgress progress;
ClientDataFrameIndexer build(DataFrameTransformTask parentTask) {
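// Wires the accumulated settings into a new indexer bound to the parent task.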
return new ClientDataFrameIndexer(this.transformId,
this.transformsConfigManager,
this.transformsCheckpointService,
new AtomicReference<>(this.indexerState),
this.initialPosition,
this.client,
this.auditor,
this.initialStats,
this.transformConfig,
this.fieldMappings,
this.progress,
parentTask);
}
ClientDataFrameIndexerBuilder setClient(Client client) {
this.client = client;
return this;
}
ClientDataFrameIndexerBuilder setTransformsConfigManager(DataFrameTransformsConfigManager transformsConfigManager) {
this.transformsConfigManager = transformsConfigManager;
return this;
}
ClientDataFrameIndexerBuilder setTransformsCheckpointService(DataFrameTransformsCheckpointService transformsCheckpointService) {
this.transformsCheckpointService = transformsCheckpointService;
return this;
}
ClientDataFrameIndexerBuilder setTransformId(String transformId) {
this.transformId = transformId;
return this;
}
ClientDataFrameIndexerBuilder setAuditor(DataFrameAuditor auditor) {
this.auditor = auditor;
return this;
}
ClientDataFrameIndexerBuilder setFieldMappings(Map<String, String> fieldMappings) {
this.fieldMappings = fieldMappings;
return this;
}
ClientDataFrameIndexerBuilder setTransformConfig(DataFrameTransformConfig transformConfig) {
this.transformConfig = transformConfig;
return this;
}
DataFrameTransformConfig getTransformConfig() {
return this.transformConfig;
}
ClientDataFrameIndexerBuilder setInitialStats(DataFrameIndexerTransformStats initialStats) {
this.initialStats = initialStats;
return this;
}
ClientDataFrameIndexerBuilder setIndexerState(IndexerState indexerState) {
this.indexerState = indexerState;
return this;
}
ClientDataFrameIndexerBuilder setInitialPosition(Map<String, Object> initialPosition) {
this.initialPosition = initialPosition;
return this;
}
ClientDataFrameIndexerBuilder setProgress(DataFrameTransformProgress progress) {
this.progress = progress;
return this;
}
}
static class ClientDataFrameIndexer extends DataFrameIndexer {
private final Client client;
private final DataFrameTransformsConfigManager transformsConfigManager;
private final DataFrameTransformsCheckpointService transformsCheckpointService;
private final String transformId;
private final DataFrameAuditor auditor;
private final DataFrameTransformTask transformTask;
private final Map<String, String> fieldMappings;
private final DataFrameTransformConfig transformConfig;
private volatile DataFrameTransformProgress progress;
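// The last stats document written to the stats index; doSaveState only persists
// stats again when they have actually changed.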
private volatile DataFrameIndexerTransformStats previouslyPersistedStats = null;
private final AtomicInteger failureCount;
// Keeps track of the last exception that was written to our audit, keeps us from spamming the audit index
private volatile String lastAuditedExceptionMessage = null;
ClientDataFrameIndexer(String transformId,
DataFrameTransformsConfigManager transformsConfigManager,
DataFrameTransformsCheckpointService transformsCheckpointService,
AtomicReference<IndexerState> initialState,
Map<String, Object> initialPosition,
Client client,
DataFrameAuditor auditor,
DataFrameIndexerTransformStats initialStats,
DataFrameTransformConfig transformConfig,
Map<String, String> fieldMappings,
DataFrameTransformProgress transformProgress,
DataFrameTransformTask parentTask) {
super(ExceptionsHelper.requireNonNull(parentTask, "parentTask")
.threadPool
.executor(ThreadPool.Names.GENERIC),
ExceptionsHelper.requireNonNull(auditor, "auditor"),
ExceptionsHelper.requireNonNull(initialState, "initialState"),
initialPosition,
initialStats == null ? new DataFrameIndexerTransformStats(transformId) : initialStats);
this.transformId = ExceptionsHelper.requireNonNull(transformId, "transformId");
this.transformsConfigManager = ExceptionsHelper.requireNonNull(transformsConfigManager, "transformsConfigManager");
this.transformsCheckpointService = ExceptionsHelper.requireNonNull(transformsCheckpointService,
"transformsCheckpointService");
this.client = ExceptionsHelper.requireNonNull(client, "client");
this.auditor = auditor;
this.transformConfig = ExceptionsHelper.requireNonNull(transformConfig, "transformConfig");
this.transformTask = parentTask;
this.fieldMappings = ExceptionsHelper.requireNonNull(fieldMappings, "fieldMappings");
this.progress = transformProgress;
this.failureCount = new AtomicInteger(0);
}
@Override
protected void onStart(long now, ActionListener<Void> listener) {
// Reset our failure count as we are starting again
failureCount.set(0);
// On each run, we need to get the total number of docs and reset the count of processed docs
// Since multiple checkpoints can be executed in the task while it is running on the same node, we need to gather
// the progress here, and not in the executor.
if (initialRun()) {
TransformProgressGatherer.getInitialProgress(this.client, getConfig(), ActionListener.wrap(
newProgress -> {
progress = newProgress;
super.onStart(now, listener);
},
failure -> {
progress = null;
logger.warn("Unable to load progress information for task [" + transformId + "]", failure);
super.onStart(now, listener);
}
));
} else {
super.onStart(now, listener);
}
}
@Override
@@ -327,6 +520,11 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
return fieldMappings;
}
@Override
protected DataFrameTransformProgress getProgress() {
return progress;
}
@Override
protected String getJobId() {
return transformId;
@@ -334,56 +532,11 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
@Override
public synchronized boolean maybeTriggerAsyncJob(long now) {
if (transformTask.taskState.get() == DataFrameTransformTaskState.FAILED) {
logger.debug("Schedule was triggered for transform [{}] but task is failed. Ignoring trigger.", getJobId());
return false;
}
return super.maybeTriggerAsyncJob(now);
}
@@ -408,30 +561,28 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
}
final DataFrameTransformState state = new DataFrameTransformState(
transformTask.taskState.get(),
indexerState,
getPosition(),
transformTask.currentCheckpoint.get(),
transformTask.stateReason.get(),
getProgress());
logger.debug("Updating persistent state of transform [{}] to [{}]", transformConfig.getId(), state.toString());
// Persisting stats when we call `doSaveState` should be ok as we only call it on a state transition and
// only every-so-often when doing the bulk indexing calls. See AsyncTwoPhaseIndexer#onBulkResponse for current periodicity
ActionListener<PersistentTasksCustomMetaData.PersistentTask<?>> updateClusterStateListener = ActionListener.wrap(
task -> {
// Only persist the stats if something has actually changed
if (previouslyPersistedStats == null || previouslyPersistedStats.equals(getStats()) == false) {
transformsConfigManager.putOrUpdateTransformStats(getStats(),
ActionListener.wrap(
r -> {
previouslyPersistedStats = getStats();
next.run();
},
statsExc -> {
logger.error("Updating stats of transform [" + transform.getId() + "] failed", statsExc);
logger.error("Updating stats of transform [" + transformConfig.getId() + "] failed", statsExc);
next.run();
}
));
@@ -441,24 +592,24 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
}
},
exc -> {
logger.error("Updating persistent state of transform [" + transform.getId() + "] failed", exc);
logger.error("Updating persistent state of transform [" + transformConfig.getId() + "] failed", exc);
next.run();
}
);
transformTask.persistStateToClusterState(state, updateClusterStateListener);
}
@Override
protected void onFailure(Exception exc) {
// the failure handler must not throw an exception due to internal problems
try {
logger.warn("Data frame transform [" + transform.getId() + "] encountered an exception: ", exc);
logger.warn("Data frame transform [" + transformTask.getTransformId() + "] encountered an exception: ", exc);
// Since our schedule fires again very quickly after failures it is possible to run into the same failure numerous
// times in a row, very quickly. We do not want to spam the audit log with repeated failures, so only record the first one
if (exc.getMessage().equals(lastAuditedExceptionMessage) == false) {
auditor.warning(transformTask.getTransformId(), "Data frame transform encountered an exception: " + exc.getMessage());
lastAuditedExceptionMessage = exc.getMessage();
}
handleFailure(exc);
@@ -471,9 +622,10 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
protected void onFinish(ActionListener<Void> listener) {
try {
super.onFinish(listener);
long checkpoint = transformTask.currentCheckpoint.incrementAndGet();
auditor.info(transformTask.getTransformId(), "Finished indexing for data frame transform checkpoint [" + checkpoint + "]");
logger.info(
"Finished indexing for data frame transform [" + transformTask.getTransformId() + "] checkpoint [" + checkpoint + "]");
listener.onResponse(null);
} catch (Exception e) {
listener.onFailure(e);
@@ -482,26 +634,29 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
@Override
protected void onAbort() {
auditor.info(transformConfig.getId(), "Received abort request, stopping indexer");
logger.info("Data frame transform [" + transformConfig.getId() + "] received abort request, stopping indexer");
transformTask.shutdown();
}
@Override
protected void createCheckpoint(ActionListener<Void> listener) {
transformsCheckpointService.getCheckpoint(transformConfig,
transformTask.currentCheckpoint.get() + 1,
ActionListener.wrap(
checkpoint -> transformsConfigManager.putTransformCheckpoint(checkpoint,
ActionListener.wrap(
putCheckPointResponse -> listener.onResponse(null),
createCheckpointException ->
listener.onFailure(new RuntimeException("Failed to create checkpoint", createCheckpointException))
)),
getCheckPointException ->
listener.onFailure(new RuntimeException("Failed to retrieve checkpoint", getCheckPointException))
));
}
private boolean isIrrecoverableFailure(Exception e) {
return e instanceof IndexNotFoundException;
}
synchronized void handleFailure(Exception e) {
@@ -520,21 +675,12 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
@Override
protected void failIndexer(String failureMessage) {
logger.error("Data frame transform [" + getJobId() + "]:" + failureMessage);
auditor.error(transformTask.getTransformId(), failureMessage);
transformTask.markAsFailed(failureMessage, ActionListener.wrap(
r -> {
// Successfully marked as failed, reset counter so that task can be restarted
failureCount.set(0);
}, e -> {}));
}
}
}
View File
@@ -0,0 +1,53 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.dataframe.transforms;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.search.SearchAction;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.xpack.core.ClientHelper;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformProgress;
/**
* Utility class to gather the progress information for a given config and its cursor position
*/
public final class TransformProgressGatherer {
/**
* Gathers the total number of documents matching the transform's source query,
* which seeds the progress information for the current run.
*
* TODO: Support checkpointing logic to restrict the query
* @param client           The client to use for the search
* @param config           The transform config whose source documents are counted
* @param progressListener The listener to alert on completion
*/
public static void getInitialProgress(Client client,
DataFrameTransformConfig config,
ActionListener<DataFrameTransformProgress> progressListener) {
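// A size 0 search with track_total_hits=true returns the total number of documents
// matching the transform's source query without fetching any hits.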
SearchRequest request = client.prepareSearch(config.getSource().getIndex())
.setSize(0)
.setAllowPartialSearchResults(false)
.setTrackTotalHits(true)
.setQuery(config.getSource().getQueryConfig().getQuery())
.request();
ActionListener<SearchResponse> searchResponseActionListener = ActionListener.wrap(
searchResponse -> {
progressListener.onResponse(new DataFrameTransformProgress(searchResponse.getHits().getTotalHits().value, null));
},
progressListener::onFailure
);
ClientHelper.executeWithHeadersAsync(config.getHeaders(),
ClientHelper.DATA_FRAME_ORIGIN,
client,
SearchAction.INSTANCE,
request,
searchResponseActionListener);
}
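// Illustrative usage (hypothetical call site), e.g. from the indexer's onStart:
//
//   TransformProgressGatherer.getInitialProgress(client, config, ActionListener.wrap(
//       progress -> logger.trace("transform total docs [{}]", progress.getTotalDocs()),
//       e -> logger.warn("unable to gather progress information", e)));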
}
View File
@@ -24,6 +24,7 @@ import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameIndexerTransformStats;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfigTests;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformProgress;
import org.elasticsearch.xpack.core.indexing.IndexerState;
import org.elasticsearch.xpack.dataframe.notifications.DataFrameAuditor;
import org.elasticsearch.xpack.dataframe.transforms.pivot.Pivot;
@@ -94,6 +95,11 @@ public class DataFrameIndexerTests extends ESTestCase {
return fieldMappings;
}
@Override
protected DataFrameTransformProgress getProgress() {
return null;
}
@Override
protected void createCheckpoint(ActionListener<Void> listener) {
listener.onResponse(null);
View File
@@ -34,6 +34,7 @@ teardown:
- do:
data_frame.stop_data_frame_transform:
transform_id: "airline-transform-stats"
wait_for_completion: true
- do:
data_frame.delete_data_frame_transform:
@@ -197,6 +198,7 @@ teardown:
- match: { transforms.0.id: "airline-transform-stats-dos" }
- match: { transforms.0.state.indexer_state: "stopped" }
- match: { transforms.0.state.checkpoint: 0 }
- is_false: transforms.0.state.progress
- match: { transforms.0.stats.pages_processed: 0 }
- match: { transforms.0.stats.documents_processed: 0 }
- match: { transforms.0.stats.documents_indexed: 0 }