diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformProgress.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformProgress.java new file mode 100644 index 00000000000..a4177a33487 --- /dev/null +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformProgress.java @@ -0,0 +1,94 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.client.dataframe.transforms; + +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.xcontent.ConstructingObjectParser; +import org.elasticsearch.common.xcontent.XContentParser; + +import java.util.Objects; + +import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg; +import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg; + +public class DataFrameTransformProgress { + + public static final ParseField TOTAL_DOCS = new ParseField("total_docs"); + public static final ParseField DOCS_REMAINING = new ParseField("docs_remaining"); + public static final ParseField PERCENT_COMPLETE = new ParseField("percent_complete"); + + public static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( + "data_frame_transform_progress", + true, + a -> new DataFrameTransformProgress((Long) a[0], (Long)a[1], (Double)a[2])); + + static { + PARSER.declareLong(constructorArg(), TOTAL_DOCS); + PARSER.declareLong(optionalConstructorArg(), DOCS_REMAINING); + PARSER.declareDouble(optionalConstructorArg(), PERCENT_COMPLETE); + } + + public static DataFrameTransformProgress fromXContent(XContentParser parser) { + return PARSER.apply(parser, null); + } + + private final long totalDocs; + private final long remainingDocs; + private final double percentComplete; + + public DataFrameTransformProgress(long totalDocs, Long remainingDocs, double percentComplete) { + this.totalDocs = totalDocs; + this.remainingDocs = remainingDocs == null ? totalDocs : remainingDocs; + this.percentComplete = percentComplete; + } + + public double getPercentComplete() { + return percentComplete; + } + + public long getTotalDocs() { + return totalDocs; + } + + public long getRemainingDocs() { + return remainingDocs; + } + + @Override + public boolean equals(Object other) { + if (other == this) { + return true; + } + + if (other == null || other.getClass() != getClass()) { + return false; + } + + DataFrameTransformProgress that = (DataFrameTransformProgress) other; + return Objects.equals(this.remainingDocs, that.remainingDocs) + && Objects.equals(this.totalDocs, that.totalDocs) + && Objects.equals(this.percentComplete, that.percentComplete); + } + + @Override + public int hashCode(){ + return Objects.hash(remainingDocs, totalDocs, percentComplete); + } +} diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformState.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformState.java index 6bbc7a00b1b..352cbfb67fc 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformState.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformState.java @@ -23,16 +23,14 @@ import org.elasticsearch.client.core.IndexerState; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.ParseField; import org.elasticsearch.common.xcontent.ConstructingObjectParser; -import org.elasticsearch.common.xcontent.ObjectParser; +import org.elasticsearch.common.xcontent.ObjectParser.ValueType; import org.elasticsearch.common.xcontent.XContentParser; import java.io.IOException; import java.util.Collections; -import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.Map; import java.util.Objects; -import java.util.SortedMap; -import java.util.TreeMap; import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg; import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg; @@ -44,33 +42,25 @@ public class DataFrameTransformState { private static final ParseField CURRENT_POSITION = new ParseField("current_position"); private static final ParseField CHECKPOINT = new ParseField("checkpoint"); private static final ParseField REASON = new ParseField("reason"); + private static final ParseField PROGRESS = new ParseField("progress"); @SuppressWarnings("unchecked") public static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>("data_frame_transform_state", true, args -> new DataFrameTransformState((DataFrameTransformTaskState) args[0], (IndexerState) args[1], - (HashMap) args[2], + (Map) args[2], (long) args[3], - (String) args[4])); + (String) args[4], + (DataFrameTransformProgress) args[5])); static { - PARSER.declareField(constructorArg(), - p -> DataFrameTransformTaskState.fromString(p.text()), - TASK_STATE, - ObjectParser.ValueType.STRING); - PARSER.declareField(constructorArg(), p -> IndexerState.fromString(p.text()), INDEXER_STATE, ObjectParser.ValueType.STRING); - PARSER.declareField(optionalConstructorArg(), p -> { - if (p.currentToken() == XContentParser.Token.START_OBJECT) { - return p.map(); - } - if (p.currentToken() == XContentParser.Token.VALUE_NULL) { - return null; - } - throw new IllegalArgumentException("Unsupported token [" + p.currentToken() + "]"); - }, CURRENT_POSITION, ObjectParser.ValueType.VALUE_OBJECT_ARRAY); + PARSER.declareField(constructorArg(), p -> DataFrameTransformTaskState.fromString(p.text()), TASK_STATE, ValueType.STRING); + PARSER.declareField(constructorArg(), p -> IndexerState.fromString(p.text()), INDEXER_STATE, ValueType.STRING); + PARSER.declareField(optionalConstructorArg(), (p, c) -> p.mapOrdered(), CURRENT_POSITION, ValueType.OBJECT); PARSER.declareLong(ConstructingObjectParser.optionalConstructorArg(), CHECKPOINT); PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), REASON); + PARSER.declareField(optionalConstructorArg(), DataFrameTransformProgress::fromXContent, PROGRESS, ValueType.OBJECT); } public static DataFrameTransformState fromXContent(XContentParser parser) throws IOException { @@ -80,19 +70,22 @@ public class DataFrameTransformState { private final DataFrameTransformTaskState taskState; private final IndexerState indexerState; private final long checkpoint; - private final SortedMap currentPosition; + private final Map currentPosition; private final String reason; + private final DataFrameTransformProgress progress; public DataFrameTransformState(DataFrameTransformTaskState taskState, IndexerState indexerState, @Nullable Map position, long checkpoint, - @Nullable String reason) { + @Nullable String reason, + @Nullable DataFrameTransformProgress progress) { this.taskState = taskState; this.indexerState = indexerState; - this.currentPosition = position == null ? null : Collections.unmodifiableSortedMap(new TreeMap<>(position)); + this.currentPosition = position == null ? null : Collections.unmodifiableMap(new LinkedHashMap<>(position)); this.checkpoint = checkpoint; this.reason = reason; + this.progress = progress; } public IndexerState getIndexerState() { @@ -117,6 +110,11 @@ public class DataFrameTransformState { return reason; } + @Nullable + public DataFrameTransformProgress getProgress() { + return progress; + } + @Override public boolean equals(Object other) { if (this == other) { @@ -132,13 +130,14 @@ public class DataFrameTransformState { return Objects.equals(this.taskState, that.taskState) && Objects.equals(this.indexerState, that.indexerState) && Objects.equals(this.currentPosition, that.currentPosition) && + Objects.equals(this.progress, that.progress) && this.checkpoint == that.checkpoint && Objects.equals(this.reason, that.reason); } @Override public int hashCode() { - return Objects.hash(taskState, indexerState, currentPosition, checkpoint, reason); + return Objects.hash(taskState, indexerState, currentPosition, checkpoint, reason, progress); } } diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformStateAndStats.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformStateAndStats.java index 938563796ca..9914a0e6331 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformStateAndStats.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformStateAndStats.java @@ -57,7 +57,7 @@ public class DataFrameTransformStateAndStats { private final DataFrameTransformCheckpointingInfo checkpointingInfo; public DataFrameTransformStateAndStats(String id, DataFrameTransformState state, DataFrameIndexerTransformStats stats, - DataFrameTransformCheckpointingInfo checkpointingInfo) { + DataFrameTransformCheckpointingInfo checkpointingInfo) { this.id = id; this.transformState = state; this.transformStats = stats; diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/DataFrameTransformIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/DataFrameTransformIT.java index 3e564a86207..3d7f5e3dbcb 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/DataFrameTransformIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/DataFrameTransformIT.java @@ -71,6 +71,7 @@ import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.is; @@ -360,6 +361,10 @@ public class DataFrameTransformIT extends ESRestHighLevelClientTestCase { assertEquals(DataFrameTransformTaskState.STARTED, stateAndStats.getTransformState().getTaskState()); assertEquals(null, stateAndStats.getTransformState().getReason()); assertNotEquals(zeroIndexerStats, stateAndStats.getTransformStats()); + assertNotNull(stateAndStats.getTransformState().getProgress()); + assertThat(stateAndStats.getTransformState().getProgress().getPercentComplete(), equalTo(100.0)); + assertThat(stateAndStats.getTransformState().getProgress().getTotalDocs(), greaterThan(0L)); + assertThat(stateAndStats.getTransformState().getProgress().getRemainingDocs(), equalTo(0L)); }); } } diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformProgressTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformProgressTests.java new file mode 100644 index 00000000000..573e2ffdbb9 --- /dev/null +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformProgressTests.java @@ -0,0 +1,55 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.client.dataframe.transforms; + +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.test.ESTestCase; + +import java.io.IOException; + +import static org.elasticsearch.test.AbstractXContentTestCase.xContentTester; + +public class DataFrameTransformProgressTests extends ESTestCase { + + public void testFromXContent() throws IOException { + xContentTester(this::createParser, + DataFrameTransformProgressTests::randomInstance, + DataFrameTransformProgressTests::toXContent, + DataFrameTransformProgress::fromXContent) + .supportsUnknownFields(true) + .randomFieldsExcludeFilter(field -> field.startsWith("state")) + .test(); + } + + public static DataFrameTransformProgress randomInstance() { + long totalDocs = randomNonNegativeLong(); + Long docsRemaining = randomBoolean() ? null : randomLongBetween(0, totalDocs); + double percentComplete = totalDocs == 0 ? 1.0 : docsRemaining == null ? 0.0 : 100.0*(double)(totalDocs - docsRemaining)/totalDocs; + return new DataFrameTransformProgress(totalDocs, docsRemaining, percentComplete); + } + + public static void toXContent(DataFrameTransformProgress progress, XContentBuilder builder) throws IOException { + builder.startObject(); + builder.field(DataFrameTransformProgress.TOTAL_DOCS.getPreferredName(), progress.getTotalDocs()); + builder.field(DataFrameTransformProgress.DOCS_REMAINING.getPreferredName(), progress.getRemainingDocs()); + builder.field(DataFrameTransformProgress.PERCENT_COMPLETE.getPreferredName(), progress.getPercentComplete()); + builder.endObject(); + } +} diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformStateAndStatsTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformStateAndStatsTests.java index 88628699104..6ebdec5a690 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformStateAndStatsTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformStateAndStatsTests.java @@ -40,9 +40,9 @@ public class DataFrameTransformStateAndStatsTests extends ESTestCase { public static DataFrameTransformStateAndStats randomInstance() { return new DataFrameTransformStateAndStats(randomAlphaOfLength(10), - DataFrameTransformStateTests.randomDataFrameTransformState(), - DataFrameIndexerTransformStatsTests.randomStats(), - DataFrameTransformCheckpointingInfoTests.randomDataFrameTransformCheckpointingInfo()); + DataFrameTransformStateTests.randomDataFrameTransformState(), + DataFrameIndexerTransformStatsTests.randomStats(), + DataFrameTransformCheckpointingInfoTests.randomDataFrameTransformCheckpointingInfo()); } public static void toXContent(DataFrameTransformStateAndStats stateAndStats, XContentBuilder builder) throws IOException { diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformStateTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformStateTests.java index 7d1d713a127..4ada50c20d2 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformStateTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformStateTests.java @@ -24,7 +24,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.test.ESTestCase; import java.io.IOException; -import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.Map; import static org.elasticsearch.test.AbstractXContentTestCase.xContentTester; @@ -46,7 +46,8 @@ public class DataFrameTransformStateTests extends ESTestCase { randomFrom(IndexerState.values()), randomPositionMap(), randomLongBetween(0,10), - randomBoolean() ? null : randomAlphaOfLength(10)); + randomBoolean() ? null : randomAlphaOfLength(10), + randomBoolean() ? null : DataFrameTransformProgressTests.randomInstance()); } public static void toXContent(DataFrameTransformState state, XContentBuilder builder) throws IOException { @@ -60,6 +61,10 @@ public class DataFrameTransformStateTests extends ESTestCase { if (state.getReason() != null) { builder.field("reason", state.getReason()); } + if (state.getProgress() != null) { + builder.field("progress"); + DataFrameTransformProgressTests.toXContent(state.getProgress(), builder); + } builder.endObject(); } @@ -68,7 +73,7 @@ public class DataFrameTransformStateTests extends ESTestCase { return null; } int numFields = randomIntBetween(1, 5); - Map position = new HashMap<>(); + Map position = new LinkedHashMap<>(); for (int i = 0; i < numFields; i++) { Object value; if (randomBoolean()) { diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/hlrc/DataFrameTransformProgressTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/hlrc/DataFrameTransformProgressTests.java new file mode 100644 index 00000000000..be589a63248 --- /dev/null +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/hlrc/DataFrameTransformProgressTests.java @@ -0,0 +1,57 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.client.dataframe.transforms.hlrc; + +import org.elasticsearch.client.AbstractResponseTestCase; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformProgress; + +import static org.hamcrest.Matchers.equalTo; + +public class DataFrameTransformProgressTests extends AbstractResponseTestCase< + DataFrameTransformProgress, + org.elasticsearch.client.dataframe.transforms.DataFrameTransformProgress> { + + public static DataFrameTransformProgress fromHlrc( + org.elasticsearch.client.dataframe.transforms.DataFrameTransformProgress instance) { + if (instance == null) { + return null; + } + return new DataFrameTransformProgress(instance.getTotalDocs(), instance.getRemainingDocs()); + } + + @Override + protected DataFrameTransformProgress createServerTestInstance() { + return DataFrameTransformStateTests.randomDataFrameTransformProgress(); + } + + @Override + protected org.elasticsearch.client.dataframe.transforms.DataFrameTransformProgress doParseToClientInstance(XContentParser parser) { + return org.elasticsearch.client.dataframe.transforms.DataFrameTransformProgress.fromXContent(parser); + } + + @Override + protected void assertInstances(DataFrameTransformProgress serverTestInstance, + org.elasticsearch.client.dataframe.transforms.DataFrameTransformProgress clientInstance) { + assertThat(serverTestInstance.getTotalDocs(), equalTo(clientInstance.getTotalDocs())); + assertThat(serverTestInstance.getRemainingDocs(), equalTo(clientInstance.getRemainingDocs())); + assertThat(serverTestInstance.getPercentComplete(), equalTo(clientInstance.getPercentComplete())); + } +} diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/hlrc/DataFrameTransformStateTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/hlrc/DataFrameTransformStateTests.java index 457c68d593e..4c80365bc53 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/hlrc/DataFrameTransformStateTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/hlrc/DataFrameTransformStateTests.java @@ -24,6 +24,7 @@ import org.elasticsearch.client.AbstractHlrcXContentTestCase; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameIndexerTransformStats; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformCheckpointStats; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformCheckpointingInfo; +import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformProgress; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformState; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformStateAndStats; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformTaskState; @@ -40,7 +41,7 @@ public class DataFrameTransformStateTests extends AbstractHlrcXContentTestCase randomPosition() { diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/DataFrameTransformDocumentationIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/DataFrameTransformDocumentationIT.java index b7d6967206c..daa9dc06fee 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/DataFrameTransformDocumentationIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/DataFrameTransformDocumentationIT.java @@ -40,6 +40,7 @@ import org.elasticsearch.client.dataframe.StopDataFrameTransformRequest; import org.elasticsearch.client.dataframe.StopDataFrameTransformResponse; import org.elasticsearch.client.dataframe.transforms.DataFrameIndexerTransformStats; import org.elasticsearch.client.dataframe.transforms.DataFrameTransformConfig; +import org.elasticsearch.client.dataframe.transforms.DataFrameTransformProgress; import org.elasticsearch.client.dataframe.transforms.DataFrameTransformStateAndStats; import org.elasticsearch.client.dataframe.transforms.DataFrameTransformTaskState; import org.elasticsearch.client.dataframe.transforms.DestConfig; @@ -465,18 +466,21 @@ public class DataFrameTransformDocumentationIT extends ESRestHighLevelClientTest // tag::get-data-frame-transform-stats-response DataFrameTransformStateAndStats stateAndStats = - response.getTransformsStateAndStats().get(0); // <1> + response.getTransformsStateAndStats().get(0); // <1> DataFrameTransformTaskState taskState = stateAndStats.getTransformState().getTaskState(); // <2> IndexerState indexerState = - stateAndStats.getTransformState().getIndexerState(); // <3> + stateAndStats.getTransformState().getIndexerState(); // <3> DataFrameIndexerTransformStats transformStats = - stateAndStats.getTransformStats(); // <4> + stateAndStats.getTransformStats(); // <4> + DataFrameTransformProgress progress = + stateAndStats.getTransformState().getProgress(); // <5> // end::get-data-frame-transform-stats-response assertEquals(IndexerState.STOPPED, indexerState); assertEquals(DataFrameTransformTaskState.STOPPED, taskState); assertNotNull(transformStats); + assertNull(progress); } { // tag::get-data-frame-transform-stats-execute-listener diff --git a/docs/java-rest/high-level/dataframe/get_data_frame_stats.asciidoc b/docs/java-rest/high-level/dataframe/get_data_frame_stats.asciidoc index 2b377d22c81..cdc6254a4e4 100644 --- a/docs/java-rest/high-level/dataframe/get_data_frame_stats.asciidoc +++ b/docs/java-rest/high-level/dataframe/get_data_frame_stats.asciidoc @@ -37,4 +37,6 @@ include-tagged::{doc-tests-file}[{api}-response] <1> The response contains a list of `DataFrameTransformStateAndStats` objects <2> The running state of the transform task e.g `started` <3> The running state of the transform indexer e.g `started`, `indexing`, etc. -<4> The transform progress statistics recording the number of documents indexed etc \ No newline at end of file +<4> The overall transform statistics recording the number of documents indexed etc. +<5> The progress of the current run in the transform. Supplies the number of docs left until the next checkpoint +and the total number of docs expected. \ No newline at end of file diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformProgress.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformProgress.java new file mode 100644 index 00000000000..5b7346bca2a --- /dev/null +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformProgress.java @@ -0,0 +1,135 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.core.dataframe.transforms; + +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.xcontent.ConstructingObjectParser; +import org.elasticsearch.common.xcontent.ToXContentObject; +import org.elasticsearch.common.xcontent.XContentBuilder; + +import java.io.IOException; +import java.util.Objects; + +import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg; +import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg; + +public class DataFrameTransformProgress implements Writeable, ToXContentObject { + + private static final ParseField TOTAL_DOCS = new ParseField("total_docs"); + private static final ParseField DOCS_REMAINING = new ParseField("docs_remaining"); + private static final String PERCENT_COMPLETE = "percent_complete"; + + public static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( + "data_frame_transform_progress", + true, + a -> new DataFrameTransformProgress((Long) a[0], (Long)a[1])); + + static { + PARSER.declareLong(constructorArg(), TOTAL_DOCS); + PARSER.declareLong(optionalConstructorArg(), DOCS_REMAINING); + } + + private final long totalDocs; + private long remainingDocs; + + public DataFrameTransformProgress(long totalDocs, Long remainingDocs) { + if (totalDocs < 0) { + throw new IllegalArgumentException("[total_docs] must be >0."); + } + this.totalDocs = totalDocs; + if (remainingDocs != null && remainingDocs < 0) { + throw new IllegalArgumentException("[docs_remaining] must be >0."); + } + this.remainingDocs = remainingDocs == null ? totalDocs : remainingDocs; + } + + public DataFrameTransformProgress(DataFrameTransformProgress otherProgress) { + this.totalDocs = otherProgress.totalDocs; + this.remainingDocs = otherProgress.remainingDocs; + } + + public DataFrameTransformProgress(StreamInput in) throws IOException { + this.totalDocs = in.readLong(); + this.remainingDocs = in.readLong(); + } + + public Double getPercentComplete() { + if (totalDocs == 0) { + return 100.0; + } + long docsRead = totalDocs - remainingDocs; + if (docsRead < 0) { + return 100.0; + } + return 100.0*(double)docsRead/totalDocs; + } + + public long getTotalDocs() { + return totalDocs; + } + + public long getRemainingDocs() { + return remainingDocs; + } + + public void resetRemainingDocs() { + this.remainingDocs = totalDocs; + } + + public void docsProcessed(long docsProcessed) { + assert docsProcessed >= 0; + if (docsProcessed > remainingDocs) { + remainingDocs = 0; + } else { + remainingDocs -= docsProcessed; + } + } + + @Override + public boolean equals(Object other) { + if (other == this) { + return true; + } + + if (other == null || other.getClass() != getClass()) { + return false; + } + + DataFrameTransformProgress that = (DataFrameTransformProgress) other; + return Objects.equals(this.remainingDocs, that.remainingDocs) && Objects.equals(this.totalDocs, that.totalDocs); + } + + @Override + public int hashCode(){ + return Objects.hash(remainingDocs, totalDocs); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeLong(totalDocs); + out.writeLong(remainingDocs); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field(TOTAL_DOCS.getPreferredName(), totalDocs); + builder.field(DOCS_REMAINING.getPreferredName(), remainingDocs); + builder.field(PERCENT_COMPLETE, getPercentComplete()); + builder.endObject(); + return builder; + } + + @Override + public String toString() { + return Strings.toString(this, true, true); + } +} diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformState.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformState.java index 76d68cf178c..bc1b710cd2e 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformState.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformState.java @@ -12,7 +12,7 @@ import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.xcontent.ConstructingObjectParser; -import org.elasticsearch.common.xcontent.ObjectParser; +import org.elasticsearch.common.xcontent.ObjectParser.ValueType; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.persistent.PersistentTaskState; @@ -22,10 +22,9 @@ import org.elasticsearch.xpack.core.indexing.IndexerState; import java.io.IOException; import java.util.Collections; +import java.util.LinkedHashMap; import java.util.Map; import java.util.Objects; -import java.util.SortedMap; -import java.util.TreeMap; import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg; import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg; @@ -35,10 +34,11 @@ public class DataFrameTransformState implements Task.Status, PersistentTaskState private final DataFrameTransformTaskState taskState; private final IndexerState indexerState; + private final DataFrameTransformProgress progress; private final long checkpoint; @Nullable - private final SortedMap currentPosition; + private final Map currentPosition; @Nullable private final String reason; @@ -47,6 +47,7 @@ public class DataFrameTransformState implements Task.Status, PersistentTaskState private static final ParseField CURRENT_POSITION = new ParseField("current_position"); private static final ParseField CHECKPOINT = new ParseField("checkpoint"); private static final ParseField REASON = new ParseField("reason"); + private static final ParseField PROGRESS = new ParseField("progress"); @SuppressWarnings("unchecked") public static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>(NAME, @@ -55,53 +56,40 @@ public class DataFrameTransformState implements Task.Status, PersistentTaskState (IndexerState) args[1], (Map) args[2], (long) args[3], - (String) args[4])); + (String) args[4], + (DataFrameTransformProgress) args[5])); static { - PARSER.declareField(constructorArg(), p -> { - if (p.currentToken() == XContentParser.Token.VALUE_STRING) { - return DataFrameTransformTaskState.fromString(p.text()); - } - throw new IllegalArgumentException("Unsupported token [" + p.currentToken() + "]"); - }, TASK_STATE, ObjectParser.ValueType.STRING); - PARSER.declareField(constructorArg(), p -> { - if (p.currentToken() == XContentParser.Token.VALUE_STRING) { - return IndexerState.fromString(p.text()); - } - throw new IllegalArgumentException("Unsupported token [" + p.currentToken() + "]"); - - }, INDEXER_STATE, ObjectParser.ValueType.STRING); - PARSER.declareField(optionalConstructorArg(), p -> { - if (p.currentToken() == XContentParser.Token.START_OBJECT) { - return p.map(); - } - if (p.currentToken() == XContentParser.Token.VALUE_NULL) { - return null; - } - throw new IllegalArgumentException("Unsupported token [" + p.currentToken() + "]"); - }, CURRENT_POSITION, ObjectParser.ValueType.VALUE_OBJECT_ARRAY); + PARSER.declareField(constructorArg(), p -> DataFrameTransformTaskState.fromString(p.text()), TASK_STATE, ValueType.STRING); + PARSER.declareField(constructorArg(), p -> IndexerState.fromString(p.text()), INDEXER_STATE, ValueType.STRING); + PARSER.declareField(optionalConstructorArg(), XContentParser::mapOrdered, CURRENT_POSITION, ValueType.OBJECT); PARSER.declareLong(ConstructingObjectParser.optionalConstructorArg(), CHECKPOINT); PARSER.declareString(optionalConstructorArg(), REASON); + PARSER.declareField(optionalConstructorArg(), DataFrameTransformProgress.PARSER::apply, PROGRESS, ValueType.OBJECT); } public DataFrameTransformState(DataFrameTransformTaskState taskState, IndexerState indexerState, @Nullable Map position, long checkpoint, - @Nullable String reason) { + @Nullable String reason, + @Nullable DataFrameTransformProgress progress) { this.taskState = taskState; this.indexerState = indexerState; - this.currentPosition = position == null ? null : Collections.unmodifiableSortedMap(new TreeMap<>(position)); + this.currentPosition = position == null ? null : Collections.unmodifiableMap(new LinkedHashMap<>(position)); this.checkpoint = checkpoint; this.reason = reason; + this.progress = progress; } public DataFrameTransformState(StreamInput in) throws IOException { taskState = DataFrameTransformTaskState.fromStream(in); indexerState = IndexerState.fromStream(in); - currentPosition = in.readBoolean() ? Collections.unmodifiableSortedMap(new TreeMap<>(in.readMap())) : null; + Map position = in.readMap(); + currentPosition = position == null ? null : Collections.unmodifiableMap(position); checkpoint = in.readLong(); reason = in.readOptionalString(); + progress = in.readOptionalWriteable(DataFrameTransformProgress::new); } public DataFrameTransformTaskState getTaskState() { @@ -120,6 +108,10 @@ public class DataFrameTransformState implements Task.Status, PersistentTaskState return checkpoint; } + public DataFrameTransformProgress getProgress() { + return progress; + } + /** * Get the in-progress checkpoint * @@ -153,6 +145,9 @@ public class DataFrameTransformState implements Task.Status, PersistentTaskState if (reason != null) { builder.field(REASON.getPreferredName(), reason); } + if (progress != null) { + builder.field(PROGRESS.getPreferredName(), progress); + } builder.endObject(); return builder; } @@ -166,12 +161,10 @@ public class DataFrameTransformState implements Task.Status, PersistentTaskState public void writeTo(StreamOutput out) throws IOException { taskState.writeTo(out); indexerState.writeTo(out); - out.writeBoolean(currentPosition != null); - if (currentPosition != null) { - out.writeMap(currentPosition); - } + out.writeMap(currentPosition); out.writeLong(checkpoint); out.writeOptionalString(reason); + out.writeOptionalWriteable(progress); } @Override @@ -190,12 +183,13 @@ public class DataFrameTransformState implements Task.Status, PersistentTaskState Objects.equals(this.indexerState, that.indexerState) && Objects.equals(this.currentPosition, that.currentPosition) && this.checkpoint == that.checkpoint && - Objects.equals(this.reason, that.reason); + Objects.equals(this.reason, that.reason) && + Objects.equals(this.progress, that.progress); } @Override public int hashCode() { - return Objects.hash(taskState, indexerState, currentPosition, checkpoint, reason); + return Objects.hash(taskState, indexerState, currentPosition, checkpoint, reason, progress); } @Override diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformStateAndStats.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformStateAndStats.java index 57171fca43f..2a145ba260f 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformStateAndStats.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformStateAndStats.java @@ -53,7 +53,7 @@ public class DataFrameTransformStateAndStats implements Writeable, ToXContentObj public static DataFrameTransformStateAndStats initialStateAndStats(String id, DataFrameIndexerTransformStats indexerTransformStats) { return new DataFrameTransformStateAndStats(id, - new DataFrameTransformState(DataFrameTransformTaskState.STOPPED, IndexerState.STOPPED, null, 0L, null), + new DataFrameTransformState(DataFrameTransformTaskState.STOPPED, IndexerState.STOPPED, null, 0L, null, null), indexerTransformStats, DataFrameTransformCheckpointingInfo.EMPTY); } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/pivot/SingleGroupSource.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/pivot/SingleGroupSource.java index 0cdef0e4c3a..0a4cf257946 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/pivot/SingleGroupSource.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/pivot/SingleGroupSource.java @@ -7,6 +7,7 @@ package org.elasticsearch.xpack.core.dataframe.transforms.pivot; import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; @@ -116,4 +117,9 @@ public abstract class SingleGroupSource implements Writeable, ToXContentObject { public int hashCode() { return Objects.hash(field); } + + @Override + public String toString() { + return Strings.toString(this, true, true); + } } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/utils/ExceptionsHelper.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/utils/ExceptionsHelper.java new file mode 100644 index 00000000000..8bfd558b209 --- /dev/null +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/utils/ExceptionsHelper.java @@ -0,0 +1,22 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.core.dataframe.utils; + +/** + * Collection of methods to aid in creating and checking for exceptions. + */ +public class ExceptionsHelper { + /** + * A more REST-friendly Object.requireNonNull() + */ + public static T requireNonNull(T obj, String paramName) { + if (obj == null) { + throw new IllegalArgumentException("[" + paramName + "] must not be null."); + } + return obj; + } +} diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformProgressTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformProgressTests.java new file mode 100644 index 00000000000..8339669057e --- /dev/null +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformProgressTests.java @@ -0,0 +1,64 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.core.dataframe.transforms; + +import org.elasticsearch.common.io.stream.Writeable.Reader; +import org.elasticsearch.common.xcontent.XContentParser; + +import java.io.IOException; + +import static org.hamcrest.Matchers.closeTo; +import static org.hamcrest.Matchers.equalTo; + +public class DataFrameTransformProgressTests extends AbstractSerializingDataFrameTestCase { + public static DataFrameTransformProgress randomDataFrameTransformProgress() { + long totalDocs = randomNonNegativeLong(); + return new DataFrameTransformProgress(totalDocs, randomBoolean() ? null : randomLongBetween(0, totalDocs)); + } + + @Override + protected DataFrameTransformProgress doParseInstance(XContentParser parser) throws IOException { + return DataFrameTransformProgress.PARSER.apply(parser, null); + } + + @Override + protected DataFrameTransformProgress createTestInstance() { + return randomDataFrameTransformProgress(); + } + + @Override + protected Reader instanceReader() { + return DataFrameTransformProgress::new; + } + + public void testPercentComplete() { + DataFrameTransformProgress progress = new DataFrameTransformProgress(0L, 100L); + assertThat(progress.getPercentComplete(), equalTo(100.0)); + + progress = new DataFrameTransformProgress(100L, 0L); + assertThat(progress.getPercentComplete(), equalTo(100.0)); + + progress = new DataFrameTransformProgress(100L, 10000L); + assertThat(progress.getPercentComplete(), equalTo(100.0)); + + progress = new DataFrameTransformProgress(100L, null); + assertThat(progress.getPercentComplete(), equalTo(0.0)); + + progress = new DataFrameTransformProgress(100L, 50L); + assertThat(progress.getPercentComplete(), closeTo(50.0, 0.000001)); + } + + public void testConstructor() { + IllegalArgumentException ex = + expectThrows(IllegalArgumentException.class, () -> new DataFrameTransformProgress(-1, null)); + assertThat(ex.getMessage(), equalTo("[total_docs] must be >0.")); + + ex = expectThrows(IllegalArgumentException.class, () -> new DataFrameTransformProgress(1L, -1L)); + assertThat(ex.getMessage(), equalTo("[docs_remaining] must be >0.")); + } + +} diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformStateTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformStateTests.java index 341faafdf12..c978978b058 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformStateTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformStateTests.java @@ -16,6 +16,8 @@ import java.util.HashMap; import java.util.Map; import java.util.function.Predicate; +import static org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformProgressTests.randomDataFrameTransformProgress; + public class DataFrameTransformStateTests extends AbstractSerializingTestCase { public static DataFrameTransformState randomDataFrameTransformState() { @@ -23,7 +25,8 @@ public class DataFrameTransformStateTests extends AbstractSerializingTestCase> transformsStats = (List>)XContentMapValues.extractValue("transforms", stats); + // Verify that both transforms have valid stats + for (Map transformStats : transformsStats) { + Map stat = (Map)transformStats.get("stats"); + assertThat("documents_processed is not > 0.", ((Integer)stat.get("documents_processed")), greaterThan(0)); + assertThat("search_total is not > 0.", ((Integer)stat.get("search_total")), greaterThan(0)); + assertThat("pages_processed is not > 0.", ((Integer)stat.get("pages_processed")), greaterThan(0)); + Map progress = (Map)XContentMapValues.extractValue("state.progress", transformStats); + assertThat("total_docs is not 1000", progress.get("total_docs"), equalTo(1000)); + assertThat("docs_remaining is not 0", progress.get("docs_remaining"), equalTo(0)); + assertThat("percent_complete is not 100.0", progress.get("percent_complete"), equalTo(100.0)); + } + // only pivot_1 getRequest = createRequestWithAuth("GET", DATAFRAME_ENDPOINT + "pivot_1/_stats", authHeader); stats = entityAsMap(client().performRequest(getRequest)); @@ -134,4 +148,32 @@ public class DataFrameGetAndGetStatsIT extends DataFrameRestTestCase { assertThat(((Integer)stat.get("pages_processed")), greaterThan(0)); } } + + @SuppressWarnings("unchecked") + public void testGetProgressStatsWithPivotQuery() throws Exception { + String transformId = "simpleStatsPivotWithQuery"; + String dataFrameIndex = "pivot_stats_reviews_user_id_above_20"; + String query = "\"match\": {\"user_id\": \"user_26\"}"; + createPivotReviewsTransform(transformId, dataFrameIndex, query); + startAndWaitForTransform(transformId, dataFrameIndex); + + // Alternate testing between admin and lowly user, as both should be able to get the configs and stats + String authHeader = randomFrom(BASIC_AUTH_VALUE_DATA_FRAME_USER, BASIC_AUTH_VALUE_DATA_FRAME_ADMIN); + + Request getRequest = createRequestWithAuth("GET", DATAFRAME_ENDPOINT + "simpleStatsPivotWithQuery/_stats", authHeader); + Map stats = entityAsMap(client().performRequest(getRequest)); + assertEquals(1, XContentMapValues.extractValue("count", stats)); + List> transformsStats = (List>)XContentMapValues.extractValue("transforms", stats); + // Verify that the transform has stats and the total docs process matches the expected + for (Map transformStats : transformsStats) { + Map stat = (Map)transformStats.get("stats"); + assertThat("documents_processed is not > 0.", ((Integer)stat.get("documents_processed")), greaterThan(0)); + assertThat("search_total is not > 0.", ((Integer)stat.get("search_total")), greaterThan(0)); + assertThat("pages_processed is not > 0.", ((Integer)stat.get("pages_processed")), greaterThan(0)); + Map progress = (Map)XContentMapValues.extractValue("state.progress", transformStats); + assertThat("total_docs is not 37", progress.get("total_docs"), equalTo(37)); + assertThat("docs_remaining is not 0", progress.get("docs_remaining"), equalTo(0)); + assertThat("percent_complete is not 100.0", progress.get("percent_complete"), equalTo(100.0)); + } + } } diff --git a/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameTransformProgressIT.java b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameTransformProgressIT.java new file mode 100644 index 00000000000..e32842af0cc --- /dev/null +++ b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameTransformProgressIT.java @@ -0,0 +1,189 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.dataframe.integration; + +import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; +import org.elasticsearch.action.bulk.BulkRequestBuilder; +import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.common.network.NetworkModule; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.search.aggregations.AggregationBuilders; +import org.elasticsearch.search.aggregations.AggregatorFactories; +import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.test.SecuritySettingsSourceField; +import org.elasticsearch.transport.Netty4Plugin; +import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin; +import org.elasticsearch.xpack.core.XPackClientPlugin; +import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig; +import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformProgress; +import org.elasticsearch.xpack.core.dataframe.transforms.DestConfig; +import org.elasticsearch.xpack.core.dataframe.transforms.QueryConfig; +import org.elasticsearch.xpack.core.dataframe.transforms.SourceConfig; +import org.elasticsearch.xpack.core.dataframe.transforms.pivot.AggregationConfig; +import org.elasticsearch.xpack.core.dataframe.transforms.pivot.GroupConfig; +import org.elasticsearch.xpack.core.dataframe.transforms.pivot.HistogramGroupSource; +import org.elasticsearch.xpack.core.dataframe.transforms.pivot.PivotConfig; +import org.elasticsearch.xpack.core.security.SecurityField; +import org.elasticsearch.xpack.dataframe.transforms.TransformProgressGatherer; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; + +import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; +import static org.elasticsearch.xpack.dataframe.integration.DataFrameRestTestCase.REVIEWS_INDEX_NAME; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; + +public class DataFrameTransformProgressIT extends ESIntegTestCase { + + protected void createReviewsIndex() throws Exception { + final int numDocs = 1000; + + // create mapping + try (XContentBuilder builder = jsonBuilder()) { + builder.startObject(); + { + builder.startObject("properties") + .startObject("timestamp") + .field("type", "date") + .endObject() + .startObject("user_id") + .field("type", "keyword") + .endObject() + .startObject("count") + .field("type", "integer") + .endObject() + .startObject("business_id") + .field("type", "keyword") + .endObject() + .startObject("stars") + .field("type", "integer") + .endObject() + .endObject(); + } + builder.endObject(); + CreateIndexResponse response = client().admin() + .indices() + .prepareCreate(REVIEWS_INDEX_NAME) + .addMapping("_doc", builder) + .get(); + assertThat(response.isAcknowledged(), is(true)); + } + + // create index + BulkRequestBuilder bulk = client().prepareBulk(REVIEWS_INDEX_NAME, "_doc"); + int day = 10; + for (int i = 0; i < numDocs; i++) { + long user = i % 28; + int stars = (i + 20) % 5; + long business = (i + 100) % 50; + int hour = 10 + (i % 13); + int min = 10 + (i % 49); + int sec = 10 + (i % 49); + + String date_string = "2017-01-" + day + "T" + hour + ":" + min + ":" + sec + "Z"; + + StringBuilder sourceBuilder = new StringBuilder(); + sourceBuilder.append("{\"user_id\":\"") + .append("user_") + .append(user) + .append("\",\"count\":") + .append(i) + .append(",\"business_id\":\"") + .append("business_") + .append(business) + .append("\",\"stars\":") + .append(stars) + .append(",\"timestamp\":\"") + .append(date_string) + .append("\"}"); + bulk.add(new IndexRequest().source(sourceBuilder.toString(), XContentType.JSON)); + + if (i % 50 == 0) { + BulkResponse response = client().bulk(bulk.request()).get(); + assertThat(response.buildFailureMessage(), response.hasFailures(), is(false)); + bulk = client().prepareBulk(REVIEWS_INDEX_NAME, "_doc"); + day += 1; + } + } + client().bulk(bulk.request()).get(); + client().admin().indices().prepareRefresh(REVIEWS_INDEX_NAME).get(); + } + + public void testGetProgress() throws Exception { + createReviewsIndex(); + SourceConfig sourceConfig = new SourceConfig(REVIEWS_INDEX_NAME); + DestConfig destConfig = new DestConfig("unnecessary"); + GroupConfig histgramGroupConfig = new GroupConfig(Collections.emptyMap(), + Collections.singletonMap("every_50", new HistogramGroupSource("count", 50.0))); + AggregatorFactories.Builder aggs = new AggregatorFactories.Builder(); + aggs.addAggregator(AggregationBuilders.avg("avg_rating").field("stars")); + AggregationConfig aggregationConfig = new AggregationConfig(Collections.emptyMap(), aggs); + PivotConfig pivotConfig = new PivotConfig(histgramGroupConfig, aggregationConfig); + DataFrameTransformConfig config = new DataFrameTransformConfig("get_progress_transform", + sourceConfig, + destConfig, + null, + pivotConfig); + + PlainActionFuture progressFuture = new PlainActionFuture<>(); + TransformProgressGatherer.getInitialProgress(client(), config, progressFuture); + + DataFrameTransformProgress progress = progressFuture.get(); + + assertThat(progress.getTotalDocs(), equalTo(1000L)); + assertThat(progress.getRemainingDocs(), equalTo(1000L)); + assertThat(progress.getPercentComplete(), equalTo(0.0)); + + + QueryConfig queryConfig = new QueryConfig(Collections.emptyMap(), QueryBuilders.termQuery("user_id", "user_26")); + pivotConfig = new PivotConfig(histgramGroupConfig, aggregationConfig); + sourceConfig = new SourceConfig(new String[]{REVIEWS_INDEX_NAME}, queryConfig); + config = new DataFrameTransformConfig("get_progress_transform", + sourceConfig, + destConfig, + null, + pivotConfig); + + + progressFuture = new PlainActionFuture<>(); + + TransformProgressGatherer.getInitialProgress(client(), config, progressFuture); + progress = progressFuture.get(); + + assertThat(progress.getTotalDocs(), equalTo(35L)); + assertThat(progress.getRemainingDocs(), equalTo(35L)); + assertThat(progress.getPercentComplete(), equalTo(0.0)); + + client().admin().indices().prepareDelete(REVIEWS_INDEX_NAME).get(); + } + + @Override + protected Settings externalClusterClientSettings() { + Settings.Builder builder = Settings.builder(); + builder.put(NetworkModule.TRANSPORT_TYPE_KEY, SecurityField.NAME4); + builder.put(SecurityField.USER_SETTING.getKey(), "x_pack_rest_user:" + SecuritySettingsSourceField.TEST_PASSWORD_SECURE_STRING); + return builder.build(); + } + + @Override + protected Collection> nodePlugins() { + return Arrays.asList(LocalStateCompositeXPackPlugin.class, Netty4Plugin.class); + } + + @Override + protected Collection> transportClientPlugins() { + return Arrays.asList(XPackClientPlugin.class, Netty4Plugin.class); + } +} diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStartDataFrameTransformAction.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStartDataFrameTransformAction.java index d8fcd15921e..1e65639a89f 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStartDataFrameTransformAction.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStartDataFrameTransformAction.java @@ -95,21 +95,16 @@ public class TransportStartDataFrameTransformAction extends } final DataFrameTransform transformTask = createDataFrameTransform(request.getId(), threadPool); - // <4> Set the allocated task's state to STARTED - ActionListener> persistentTaskActionListener = ActionListener.wrap( - task -> { - waitForDataFrameTaskAllocated(task.getId(), - transformTask, - request.timeout(), - ActionListener.wrap( - taskAssigned -> ClientHelper.executeAsyncWithOrigin(client, - ClientHelper.DATA_FRAME_ORIGIN, - StartDataFrameTransformTaskAction.INSTANCE, - new StartDataFrameTransformTaskAction.Request(request.getId()), - ActionListener.wrap( - r -> listener.onResponse(new StartDataFrameTransformAction.Response(true)), - listener::onFailure)), - listener::onFailure)); + // <3> Wait for the allocated task's state to STARTED + ActionListener> newPersistentTaskActionListener = + ActionListener.wrap( + task -> { + waitForDataFrameTaskStarted(task.getId(), + transformTask, + request.timeout(), + ActionListener.wrap( + taskStarted -> listener.onResponse(new StartDataFrameTransformAction.Response(true)), + listener::onFailure)); }, listener::onFailure ); @@ -120,10 +115,11 @@ public class TransportStartDataFrameTransformAction extends PersistentTasksCustomMetaData.PersistentTask existingTask = getExistingTask(transformTask.getId(), state); if (existingTask == null) { + // Create the allocated task and wait for it to be started persistentTasksService.sendStartRequest(transformTask.getId(), DataFrameTransform.NAME, transformTask, - persistentTaskActionListener); + newPersistentTaskActionListener); } else { DataFrameTransformState transformState = (DataFrameTransformState)existingTask.getState(); if(transformState.getTaskState() == DataFrameTransformTaskState.FAILED && request.isForce() == false) { @@ -138,7 +134,26 @@ public class TransportStartDataFrameTransformAction extends "Unable to start data frame transform [" + request.getId() + "] as it is in state [" + transformState.getTaskState() + "]", RestStatus.CONFLICT)); } else { - persistentTaskActionListener.onResponse(existingTask); + // If the task already exists but is not assigned to a node, something is weird + // return a failure that includes the current assignment explanation (if one exists) + if (existingTask.isAssigned() == false) { + String assignmentExplanation = "unknown reason"; + if (existingTask.getAssignment() != null) { + assignmentExplanation = existingTask.getAssignment().getExplanation(); + } + listener.onFailure(new ElasticsearchStatusException("Unable to start data frame transform [" + + request.getId() + "] as it is not assigned to a node, explanation: " + assignmentExplanation, + RestStatus.CONFLICT)); + return; + } + // If the task already exists and is assigned to a node, simply attempt to set it to start + ClientHelper.executeAsyncWithOrigin(client, + ClientHelper.DATA_FRAME_ORIGIN, + StartDataFrameTransformTaskAction.INSTANCE, + new StartDataFrameTransformTaskAction.Request(request.getId()), + ActionListener.wrap( + r -> listener.onResponse(new StartDataFrameTransformAction.Response(true)), + listener::onFailure)); } } }, @@ -269,10 +284,10 @@ public class TransportStartDataFrameTransformAction extends ); } - private void waitForDataFrameTaskAllocated(String taskId, - DataFrameTransform params, - TimeValue timeout, - ActionListener listener) { + private void waitForDataFrameTaskStarted(String taskId, + DataFrameTransform params, + TimeValue timeout, + ActionListener listener) { DataFramePredicate predicate = new DataFramePredicate(); persistentTasksService.waitForPersistentTaskCondition(taskId, predicate, timeout, new PersistentTasksService.WaitForPersistentTaskListener() { @@ -324,7 +339,15 @@ public class TransportStartDataFrameTransformAction extends return true; } // We just want it assigned so we can tell it to start working - return assignment != null && assignment.isAssigned(); + return assignment != null && assignment.isAssigned() && isNotStopped(persistentTask); + } + + // checking for `isNotStopped` as the state COULD be marked as failed for any number of reasons + // But if it is in a failed state, _stats will show as much and give good reason to the user. + // If it is not able to be assigned to a node all together, we should just close the task completely + private boolean isNotStopped(PersistentTasksCustomMetaData.PersistentTask task) { + DataFrameTransformState state = (DataFrameTransformState)task.getState(); + return state != null && state.getTaskState().equals(DataFrameTransformTaskState.STOPPED) == false; } } } diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameIndexer.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameIndexer.java index 823ccaff71b..5fde9a1cac6 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameIndexer.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameIndexer.java @@ -14,6 +14,7 @@ import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.search.SearchPhaseExecutionException; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.common.Nullable; import org.elasticsearch.action.search.ShardSearchFailure; import org.elasticsearch.common.breaker.CircuitBreakingException; import org.elasticsearch.common.xcontent.XContentBuilder; @@ -23,6 +24,7 @@ import org.elasticsearch.xpack.core.dataframe.DataFrameField; import org.elasticsearch.xpack.core.dataframe.DataFrameMessages; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameIndexerTransformStats; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig; +import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformProgress; import org.elasticsearch.xpack.core.indexing.AsyncTwoPhaseIndexer; import org.elasticsearch.xpack.core.indexing.IndexerState; import org.elasticsearch.xpack.core.indexing.IterationResult; @@ -64,6 +66,9 @@ public abstract class DataFrameIndexer extends AsyncTwoPhaseIndexer getFieldMappings(); + @Nullable + protected abstract DataFrameTransformProgress getProgress(); + protected abstract void failIndexer(String message); public int getPageSize() { @@ -87,7 +92,7 @@ public abstract class DataFrameIndexer extends AsyncTwoPhaseIndexer listener) { // reset the page size, so we do not memorize a low page size forever, the pagesize will be re-calculated on start @@ -106,8 +115,14 @@ public abstract class DataFrameIndexer extends AsyncTwoPhaseIndexer> doProcess(SearchResponse searchResponse) { final CompositeAggregation agg = searchResponse.getAggregations().get(COMPOSITE_AGGREGATION_NAME); - return new IterationResult<>(processBucketsToIndexRequests(agg).collect(Collectors.toList()), agg.afterKey(), - agg.getBuckets().isEmpty()); + long docsBeforeProcess = getStats().getNumDocuments(); + IterationResult> result = new IterationResult<>(processBucketsToIndexRequests(agg).collect(Collectors.toList()), + agg.afterKey(), + agg.getBuckets().isEmpty()); + if (getProgress() != null) { + getProgress().docsProcessed(getStats().getNumDocuments() - docsBeforeProcess); + } + return result; } /* diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformPersistentTasksExecutor.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformPersistentTasksExecutor.java index b6f38a5dd23..708585a8dc3 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformPersistentTasksExecutor.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformPersistentTasksExecutor.java @@ -10,6 +10,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.LatchedActionListener; import org.elasticsearch.client.Client; import org.elasticsearch.common.Nullable; import org.elasticsearch.persistent.AllocatedPersistentTask; @@ -19,21 +20,31 @@ import org.elasticsearch.persistent.PersistentTasksExecutor; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.dataframe.DataFrameField; +import org.elasticsearch.xpack.core.dataframe.DataFrameMessages; +import org.elasticsearch.xpack.core.dataframe.action.StartDataFrameTransformTaskAction; +import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameIndexerTransformStats; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransform; +import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformState; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformTaskState; +import org.elasticsearch.xpack.core.indexing.IndexerState; import org.elasticsearch.xpack.core.scheduler.SchedulerEngine; import org.elasticsearch.xpack.dataframe.DataFrame; import org.elasticsearch.xpack.dataframe.checkpoint.DataFrameTransformsCheckpointService; import org.elasticsearch.xpack.dataframe.notifications.DataFrameAuditor; import org.elasticsearch.xpack.dataframe.persistence.DataFrameTransformsConfigManager; +import org.elasticsearch.xpack.dataframe.transforms.pivot.SchemaUtil; import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; public class DataFrameTransformPersistentTasksExecutor extends PersistentTasksExecutor { private static final Logger logger = LogManager.getLogger(DataFrameTransformPersistentTasksExecutor.class); + // The amount of time we wait for the cluster state to respond when being marked as failed + private static final int MARK_AS_FAILED_TIMEOUT_SEC = 90; private final Client client; private final DataFrameTransformsConfigManager transformsConfigManager; private final DataFrameTransformsCheckpointService dataFrameTransformsCheckpointService; @@ -58,36 +69,116 @@ public class DataFrameTransformPersistentTasksExecutor extends PersistentTasksEx @Override protected void nodeOperation(AllocatedPersistentTask task, @Nullable DataFrameTransform params, PersistentTaskState state) { - DataFrameTransformTask buildTask = (DataFrameTransformTask) task; - SchedulerEngine.Job schedulerJob = new SchedulerEngine.Job( - DataFrameTransformTask.SCHEDULE_NAME + "_" + params.getId(), next()); - DataFrameTransformState transformState = (DataFrameTransformState) state; - if (transformState != null && transformState.getTaskState() == DataFrameTransformTaskState.FAILED) { - logger.warn("Tried to start failed transform [" + params.getId() + "] failure reason: " + transformState.getReason()); - return; - } - transformsConfigManager.getTransformStats(params.getId(), ActionListener.wrap( + final String transformId = params.getId(); + final DataFrameTransformTask buildTask = (DataFrameTransformTask) task; + final SchedulerEngine.Job schedulerJob = new SchedulerEngine.Job(DataFrameTransformTask.SCHEDULE_NAME + "_" + transformId, + next()); + final DataFrameTransformState transformState = (DataFrameTransformState) state; + + final DataFrameTransformTask.ClientDataFrameIndexerBuilder indexerBuilder = + new DataFrameTransformTask.ClientDataFrameIndexerBuilder() + .setAuditor(auditor) + .setClient(client) + .setIndexerState(transformState == null ? IndexerState.STOPPED : transformState.getIndexerState()) + .setInitialPosition(transformState == null ? null : transformState.getPosition()) + // If the state is `null` that means this is a "first run". We can safely assume the + // task will attempt to gather the initial progress information + // if we have state, this may indicate the previous execution node crashed, so we should attempt to retrieve + // the progress from state to keep an accurate measurement of our progress + .setProgress(transformState == null ? null : transformState.getProgress()) + .setTransformsCheckpointService(dataFrameTransformsCheckpointService) + .setTransformsConfigManager(transformsConfigManager) + .setTransformId(transformId); + + ActionListener startTaskListener = ActionListener.wrap( + response -> logger.info("Successfully completed and scheduled task in node operation"), + failure -> logger.error("Failed to start task ["+ transformId +"] in node operation", failure) + ); + + // <3> Set the previous stats (if they exist), initialize the indexer, start the task (If it is STOPPED) + // Since we don't create the task until `_start` is called, if we see that the task state is stopped, attempt to start + // Schedule execution regardless + ActionListener transformStatsActionListener = ActionListener.wrap( stats -> { - // Initialize with the previously recorded stats - buildTask.initializePreviousStats(stats); - scheduleTask(buildTask, schedulerJob, params.getId()); + indexerBuilder.setInitialStats(stats); + buildTask.initializeIndexer(indexerBuilder); + scheduleAndStartTask(buildTask, schedulerJob, startTaskListener); }, error -> { if (error instanceof ResourceNotFoundException == false) { logger.error("Unable to load previously persisted statistics for transform [" + params.getId() + "]", error); } - scheduleTask(buildTask, schedulerJob, params.getId()); + indexerBuilder.setInitialStats(new DataFrameIndexerTransformStats(transformId)); + buildTask.initializeIndexer(indexerBuilder); + scheduleAndStartTask(buildTask, schedulerJob, startTaskListener); } - )); + ); + + // <2> set fieldmappings for the indexer, get the previous stats (if they exist) + ActionListener> getFieldMappingsListener = ActionListener.wrap( + fieldMappings -> { + indexerBuilder.setFieldMappings(fieldMappings); + transformsConfigManager.getTransformStats(transformId, transformStatsActionListener); + }, + error -> { + String msg = DataFrameMessages.getMessage(DataFrameMessages.DATA_FRAME_UNABLE_TO_GATHER_FIELD_MAPPINGS, + indexerBuilder.getTransformConfig().getDestination().getIndex()); + logger.error(msg, error); + markAsFailed(buildTask, msg); + } + ); + + // <1> Validate the transform, assigning it to the indexer, and get the field mappings + ActionListener getTransformConfigListener = ActionListener.wrap( + config -> { + if (config.isValid()) { + indexerBuilder.setTransformConfig(config); + SchemaUtil.getDestinationFieldMappings(client, config.getDestination().getIndex(), getFieldMappingsListener); + } else { + markAsFailed(buildTask, + DataFrameMessages.getMessage(DataFrameMessages.DATA_FRAME_TRANSFORM_CONFIGURATION_INVALID, transformId)); + } + }, + error -> { + String msg = DataFrameMessages.getMessage(DataFrameMessages.FAILED_TO_LOAD_TRANSFORM_CONFIGURATION, transformId); + logger.error(msg, error); + markAsFailed(buildTask, msg); + } + ); + // <0> Get the transform config + transformsConfigManager.getTransformConfiguration(transformId, getTransformConfigListener); } - private void scheduleTask(DataFrameTransformTask buildTask, SchedulerEngine.Job schedulerJob, String id) { + private void markAsFailed(DataFrameTransformTask task, String reason) { + CountDownLatch latch = new CountDownLatch(1); + + task.markAsFailed(reason, new LatchedActionListener<>(ActionListener.wrap( + nil -> {}, + failure -> logger.error("Failed to set task [" + task.getTransformId() +"] to failed", failure) + ), latch)); + try { + latch.await(MARK_AS_FAILED_TIMEOUT_SEC, TimeUnit.SECONDS); + } catch (InterruptedException e) { + logger.error("Timeout waiting for task [" + task.getTransformId() + "] to be marked as failed in cluster state", e); + } + } + + private void scheduleAndStartTask(DataFrameTransformTask buildTask, + SchedulerEngine.Job schedulerJob, + ActionListener listener) { // Note that while the task is added to the scheduler here, the internal state will prevent // it from doing any work until the task is "started" via the StartTransform api schedulerEngine.register(buildTask); schedulerEngine.add(schedulerJob); - - logger.info("Data frame transform [" + id + "] created."); + logger.info("Data frame transform [{}] created.", buildTask.getTransformId()); + // If we are stopped, and it is an initial run, this means we have never been started, + // attempt to start the task + if (buildTask.getState().getTaskState().equals(DataFrameTransformTaskState.STOPPED) && buildTask.isInitialRun()) { + buildTask.start(listener); + } else { + logger.debug("No need to start task. Its current state is: {}", buildTask.getState().getIndexerState()); + listener.onResponse(new StartDataFrameTransformTaskAction.Response(true)); + } } static SchedulerEngine.Schedule next() { @@ -100,7 +191,6 @@ public class DataFrameTransformPersistentTasksExecutor extends PersistentTasksEx protected AllocatedPersistentTask createTask(long id, String type, String action, TaskId parentTaskId, PersistentTasksCustomMetaData.PersistentTask persistentTask, Map headers) { return new DataFrameTransformTask(id, type, action, parentTaskId, persistentTask.getParams(), - (DataFrameTransformState) persistentTask.getState(), client, transformsConfigManager, - dataFrameTransformsCheckpointService, schedulerEngine, auditor, threadPool, headers); + (DataFrameTransformState) persistentTask.getState(), schedulerEngine, auditor, threadPool, headers); } } diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java index f142fc36179..15a555da488 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java @@ -8,9 +8,9 @@ package org.elasticsearch.xpack.dataframe.transforms; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.apache.lucene.util.SetOnce; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.LatchedActionListener; import org.elasticsearch.action.bulk.BulkAction; import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkResponse; @@ -25,27 +25,25 @@ import org.elasticsearch.tasks.TaskId; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.ClientHelper; import org.elasticsearch.xpack.core.dataframe.DataFrameField; -import org.elasticsearch.xpack.core.dataframe.DataFrameMessages; import org.elasticsearch.xpack.core.dataframe.action.StartDataFrameTransformTaskAction; import org.elasticsearch.xpack.core.dataframe.action.StartDataFrameTransformTaskAction.Response; import org.elasticsearch.xpack.core.dataframe.action.StopDataFrameTransformAction; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameIndexerTransformStats; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransform; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig; +import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformProgress; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformState; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformTaskState; +import org.elasticsearch.xpack.core.dataframe.utils.ExceptionsHelper; import org.elasticsearch.xpack.core.indexing.IndexerState; import org.elasticsearch.xpack.core.scheduler.SchedulerEngine; import org.elasticsearch.xpack.core.scheduler.SchedulerEngine.Event; import org.elasticsearch.xpack.dataframe.checkpoint.DataFrameTransformsCheckpointService; import org.elasticsearch.xpack.dataframe.notifications.DataFrameAuditor; import org.elasticsearch.xpack.dataframe.persistence.DataFrameTransformsConfigManager; -import org.elasticsearch.xpack.dataframe.transforms.pivot.SchemaUtil; import java.util.Arrays; import java.util.Map; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; @@ -62,21 +60,20 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S private final DataFrameTransform transform; private final SchedulerEngine schedulerEngine; private final ThreadPool threadPool; - private final DataFrameIndexer indexer; private final DataFrameAuditor auditor; - private final DataFrameIndexerTransformStats previousStats; + private final Map initialPosition; + private final IndexerState initialIndexerState; + + private final SetOnce indexer = new SetOnce<>(); private final AtomicReference taskState; private final AtomicReference stateReason; // the checkpoint of this data frame, storing the checkpoint until data indexing from source to dest is _complete_ // Note: Each indexer run creates a new future checkpoint which becomes the current checkpoint only after the indexer run finished private final AtomicLong currentCheckpoint; - private final AtomicInteger failureCount; public DataFrameTransformTask(long id, String type, String action, TaskId parentTask, DataFrameTransform transform, - DataFrameTransformState state, Client client, DataFrameTransformsConfigManager transformsConfigManager, - DataFrameTransformsCheckpointService transformsCheckpointService, - SchedulerEngine schedulerEngine, DataFrameAuditor auditor, + DataFrameTransformState state, SchedulerEngine schedulerEngine, DataFrameAuditor auditor, ThreadPool threadPool, Map headers) { super(id, type, action, DataFrameField.PERSISTENT_TASK_DESCRIPTION_PREFIX + transform.getId(), parentTask, headers); this.transform = transform; @@ -107,13 +104,11 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S initialGeneration = state.getCheckpoint(); } - this.indexer = new ClientDataFrameIndexer(transform.getId(), transformsConfigManager, transformsCheckpointService, - new AtomicReference<>(initialState), initialPosition, client, auditor); + this.initialIndexerState = initialState; + this.initialPosition = initialPosition; this.currentCheckpoint = new AtomicLong(initialGeneration); - this.previousStats = new DataFrameIndexerTransformStats(transform.getId()); this.taskState = new AtomicReference<>(initialTaskState); this.stateReason = new AtomicReference<>(initialReason); - this.failureCount = new AtomicInteger(0); } public String getTransformId() { @@ -128,21 +123,36 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S return getState(); } - public DataFrameTransformState getState() { - return new DataFrameTransformState( - taskState.get(), - indexer.getState(), - indexer.getPosition(), - currentCheckpoint.get(), - stateReason.get()); + private DataFrameIndexer getIndexer() { + return indexer.get(); } - void initializePreviousStats(DataFrameIndexerTransformStats stats) { - previousStats.merge(stats); + public DataFrameTransformState getState() { + if (getIndexer() == null) { + return new DataFrameTransformState( + taskState.get(), + initialIndexerState, + initialPosition, + currentCheckpoint.get(), + stateReason.get(), + null); + } else { + return new DataFrameTransformState( + taskState.get(), + indexer.get().getState(), + indexer.get().getPosition(), + currentCheckpoint.get(), + stateReason.get(), + getIndexer().getProgress()); + } } public DataFrameIndexerTransformStats getStats() { - return new DataFrameIndexerTransformStats(previousStats).merge(indexer.getStats()); + if (getIndexer() == null) { + return new DataFrameIndexerTransformStats(getTransformId()); + } else { + return getIndexer().getStats(); + } } public long getCheckpoint() { @@ -155,15 +165,29 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S * @return checkpoint in progress or 0 if task/indexer is not active */ public long getInProgressCheckpoint() { - return indexer.getState().equals(IndexerState.INDEXING) ? currentCheckpoint.get() + 1L : 0; + if (getIndexer() == null) { + return 0; + } else { + return indexer.get().getState().equals(IndexerState.INDEXING) ? currentCheckpoint.get() + 1L : 0; + } } public boolean isStopped() { - return indexer.getState().equals(IndexerState.STOPPED); + IndexerState currentState = getIndexer() == null ? initialIndexerState : getIndexer().getState(); + return currentState.equals(IndexerState.STOPPED); + } + + boolean isInitialRun() { + return getIndexer() != null && getIndexer().initialRun(); } public synchronized void start(ActionListener listener) { - final IndexerState newState = indexer.start(); + if (getIndexer() == null) { + listener.onFailure(new ElasticsearchException("Task for transform [{}] not fully initialized. Try again later", + getTransformId())); + return; + } + final IndexerState newState = getIndexer().start(); if (Arrays.stream(RUNNING_STATES).noneMatch(newState::equals)) { listener.onFailure(new ElasticsearchException("Cannot start task for data frame transform [{}], because state was [{}]", transform.getId(), newState)); @@ -171,14 +195,14 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S } stateReason.set(null); taskState.set(DataFrameTransformTaskState.STARTED); - failureCount.set(0); final DataFrameTransformState state = new DataFrameTransformState( DataFrameTransformTaskState.STARTED, IndexerState.STOPPED, - indexer.getPosition(), + getIndexer().getPosition(), currentCheckpoint.get(), - null); + null, + getIndexer().getProgress()); logger.info("Updating state for data frame transform [{}] to [{}]", transform.getId(), state.toString()); persistStateToClusterState(state, ActionListener.wrap( @@ -187,7 +211,7 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S listener.onResponse(new StartDataFrameTransformTaskAction.Response(true)); }, exc -> { - indexer.stop(); + getIndexer().stop(); listener.onFailure(new ElasticsearchException("Error while updating state for data frame transform [" + transform.getId() + "] to [" + state.getIndexerState() + "].", exc)); } @@ -195,13 +219,18 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S } public synchronized void stop(ActionListener listener) { + if (getIndexer() == null) { + listener.onFailure(new ElasticsearchException("Task for transform [{}] not fully initialized. Try again later", + getTransformId())); + return; + } // taskState is initialized as STOPPED and is updated in tandem with the indexerState // Consequently, if it is STOPPED, we consider the whole task STOPPED. if (taskState.get() == DataFrameTransformTaskState.STOPPED) { listener.onResponse(new StopDataFrameTransformAction.Response(true)); return; } - final IndexerState newState = indexer.stop(); + final IndexerState newState = getIndexer().stop(); switch (newState) { case STOPPED: // Fall through to `STOPPING` as the behavior is the same for both, we should persist for both @@ -215,9 +244,10 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S DataFrameTransformState state = new DataFrameTransformState( DataFrameTransformTaskState.STOPPED, IndexerState.STOPPED, - indexer.getPosition(), + getIndexer().getPosition(), currentCheckpoint.get(), - stateReason.get()); + stateReason.get(), + getIndexer().getProgress()); persistStateToClusterState(state, ActionListener.wrap( task -> { auditor.info(transform.getId(), "Updated state to [" + state.getTaskState() + "]"); @@ -237,10 +267,14 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S @Override public synchronized void triggered(Event event) { + if (getIndexer() == null) { + logger.warn("Data frame task [{}] triggered with an unintialized indexer", getTransformId()); + return; + } // for now no rerun, so only trigger if checkpoint == 0 if (currentCheckpoint.get() == 0 && event.getJobName().equals(SCHEDULE_NAME + "_" + transform.getId())) { - logger.debug("Data frame indexer [{}] schedule has triggered, state: [{}]", event.getJobName(), indexer.getState()); - indexer.maybeTriggerAsyncJob(System.currentTimeMillis()); + logger.debug("Data frame indexer [{}] schedule has triggered, state: [{}]", event.getJobName(), getIndexer().getState()); + getIndexer().maybeTriggerAsyncJob(System.currentTimeMillis()); } } @@ -261,6 +295,17 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S markAsCompleted(); } + public DataFrameTransformProgress getProgress() { + if (indexer.get() == null) { + return null; + } + DataFrameTransformProgress indexerProgress = indexer.get().getProgress(); + if (indexerProgress == null) { + return null; + } + return new DataFrameTransformProgress(indexerProgress); + } + void persistStateToClusterState(DataFrameTransformState state, ActionListener> listener) { updatePersistentTaskState(state, ActionListener.wrap( @@ -276,6 +321,17 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S )); } + synchronized void markAsFailed(String reason, ActionListener listener) { + taskState.set(DataFrameTransformTaskState.FAILED); + stateReason.set(reason); + persistStateToClusterState(getState(), ActionListener.wrap( + r -> { + listener.onResponse(null); + }, + listener::onFailure + )); + } + /** * This is called when the persistent task signals that the allocated task should be terminated. * Termination in the task framework is essentially voluntary, as the allocated task can only be @@ -284,37 +340,174 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S @Override public synchronized void onCancelled() { logger.info( - "Received cancellation request for data frame transform [" + transform.getId() + "], state: [" + indexer.getState() + "]"); - if (indexer.abort()) { + "Received cancellation request for data frame transform [" + transform.getId() + "], state: [" + taskState.get() + "]"); + if (getIndexer() != null && getIndexer().abort()) { // there is no background transform running, we can shutdown safely shutdown(); } } - protected class ClientDataFrameIndexer extends DataFrameIndexer { - private static final int LOAD_TRANSFORM_TIMEOUT_IN_SECONDS = 30; + synchronized void initializeIndexer(ClientDataFrameIndexerBuilder indexerBuilder) { + indexer.set(indexerBuilder.build(this)); + } + static class ClientDataFrameIndexerBuilder { + private Client client; + private DataFrameTransformsConfigManager transformsConfigManager; + private DataFrameTransformsCheckpointService transformsCheckpointService; + private String transformId; + private DataFrameAuditor auditor; + private Map fieldMappings; + private DataFrameTransformConfig transformConfig; + private DataFrameIndexerTransformStats initialStats; + private IndexerState indexerState = IndexerState.STOPPED; + private Map initialPosition; + private DataFrameTransformProgress progress; + + ClientDataFrameIndexer build(DataFrameTransformTask parentTask) { + return new ClientDataFrameIndexer(this.transformId, + this.transformsConfigManager, + this.transformsCheckpointService, + new AtomicReference<>(this.indexerState), + this.initialPosition, + this.client, + this.auditor, + this.initialStats, + this.transformConfig, + this.fieldMappings, + this.progress, + parentTask); + } + + ClientDataFrameIndexerBuilder setClient(Client client) { + this.client = client; + return this; + } + + ClientDataFrameIndexerBuilder setTransformsConfigManager(DataFrameTransformsConfigManager transformsConfigManager) { + this.transformsConfigManager = transformsConfigManager; + return this; + } + + ClientDataFrameIndexerBuilder setTransformsCheckpointService(DataFrameTransformsCheckpointService transformsCheckpointService) { + this.transformsCheckpointService = transformsCheckpointService; + return this; + } + + ClientDataFrameIndexerBuilder setTransformId(String transformId) { + this.transformId = transformId; + return this; + } + + ClientDataFrameIndexerBuilder setAuditor(DataFrameAuditor auditor) { + this.auditor = auditor; + return this; + } + + ClientDataFrameIndexerBuilder setFieldMappings(Map fieldMappings) { + this.fieldMappings = fieldMappings; + return this; + } + + ClientDataFrameIndexerBuilder setTransformConfig(DataFrameTransformConfig transformConfig) { + this.transformConfig = transformConfig; + return this; + } + + DataFrameTransformConfig getTransformConfig() { + return this.transformConfig; + } + + ClientDataFrameIndexerBuilder setInitialStats(DataFrameIndexerTransformStats initialStats) { + this.initialStats = initialStats; + return this; + } + + ClientDataFrameIndexerBuilder setIndexerState(IndexerState indexerState) { + this.indexerState = indexerState; + return this; + } + + ClientDataFrameIndexerBuilder setInitialPosition(Map initialPosition) { + this.initialPosition = initialPosition; + return this; + } + + ClientDataFrameIndexerBuilder setProgress(DataFrameTransformProgress progress) { + this.progress = progress; + return this; + } + } + + static class ClientDataFrameIndexer extends DataFrameIndexer { private final Client client; private final DataFrameTransformsConfigManager transformsConfigManager; private final DataFrameTransformsCheckpointService transformsCheckpointService; private final String transformId; + private final DataFrameAuditor auditor; + private final DataFrameTransformTask transformTask; + private final Map fieldMappings; + private final DataFrameTransformConfig transformConfig; + private volatile DataFrameTransformProgress progress; private volatile DataFrameIndexerTransformStats previouslyPersistedStats = null; + private final AtomicInteger failureCount; // Keeps track of the last exception that was written to our audit, keeps us from spamming the audit index private volatile String lastAuditedExceptionMessage = null; - private Map fieldMappings = null; - private DataFrameTransformConfig transformConfig = null; + ClientDataFrameIndexer(String transformId, + DataFrameTransformsConfigManager transformsConfigManager, + DataFrameTransformsCheckpointService transformsCheckpointService, + AtomicReference initialState, + Map initialPosition, + Client client, + DataFrameAuditor auditor, + DataFrameIndexerTransformStats initialStats, + DataFrameTransformConfig transformConfig, + Map fieldMappings, + DataFrameTransformProgress transformProgress, + DataFrameTransformTask parentTask) { + super(ExceptionsHelper.requireNonNull(parentTask, "parentTask") + .threadPool + .executor(ThreadPool.Names.GENERIC), + ExceptionsHelper.requireNonNull(auditor, "auditor"), + ExceptionsHelper.requireNonNull(initialState, "initialState"), + initialPosition, + initialStats == null ? new DataFrameIndexerTransformStats(transformId) : initialStats); + this.transformId = ExceptionsHelper.requireNonNull(transformId, "transformId"); + this.transformsConfigManager = ExceptionsHelper.requireNonNull(transformsConfigManager, "transformsConfigManager"); + this.transformsCheckpointService = ExceptionsHelper.requireNonNull(transformsCheckpointService, + "transformsCheckpointService"); + this.client = ExceptionsHelper.requireNonNull(client, "client"); + this.auditor = auditor; + this.transformConfig = ExceptionsHelper.requireNonNull(transformConfig, "transformConfig"); + this.transformTask = parentTask; + this.fieldMappings = ExceptionsHelper.requireNonNull(fieldMappings, "fieldMappings"); + this.progress = transformProgress; + this.failureCount = new AtomicInteger(0); + } - public ClientDataFrameIndexer(String transformId, DataFrameTransformsConfigManager transformsConfigManager, - DataFrameTransformsCheckpointService transformsCheckpointService, - AtomicReference initialState, Map initialPosition, Client client, - DataFrameAuditor auditor) { - super(threadPool.executor(ThreadPool.Names.GENERIC), auditor, initialState, initialPosition, - new DataFrameIndexerTransformStats(transformId)); - this.transformId = transformId; - this.transformsConfigManager = transformsConfigManager; - this.transformsCheckpointService = transformsCheckpointService; - this.client = client; + @Override + protected void onStart(long now, ActionListener listener) { + // Reset our failure count as we are starting again + failureCount.set(0); + // On each run, we need to get the total number of docs and reset the count of processed docs + // Since multiple checkpoints can be executed in the task while it is running on the same node, we need to gather + // the progress here, and not in the executor. + if (initialRun()) { + TransformProgressGatherer.getInitialProgress(this.client, getConfig(), ActionListener.wrap( + newProgress -> { + progress = newProgress; + super.onStart(now, listener); + }, + failure -> { + progress = null; + logger.warn("Unable to load progress information for task [" + transformId + "]", failure); + super.onStart(now, listener); + } + )); + } else { + super.onStart(now, listener); + } } @Override @@ -327,6 +520,11 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S return fieldMappings; } + @Override + protected DataFrameTransformProgress getProgress() { + return progress; + } + @Override protected String getJobId() { return transformId; @@ -334,56 +532,11 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S @Override public synchronized boolean maybeTriggerAsyncJob(long now) { - if (taskState.get() == DataFrameTransformTaskState.FAILED) { + if (transformTask.taskState.get() == DataFrameTransformTaskState.FAILED) { logger.debug("Schedule was triggered for transform [{}] but task is failed. Ignoring trigger.", getJobId()); return false; } - if (transformConfig == null) { - CountDownLatch latch = new CountDownLatch(1); - - transformsConfigManager.getTransformConfiguration(transformId, new LatchedActionListener<>(ActionListener.wrap( - config -> transformConfig = config, - e -> { - throw new RuntimeException( - DataFrameMessages.getMessage(DataFrameMessages.FAILED_TO_LOAD_TRANSFORM_CONFIGURATION, transformId), e); - }), latch)); - - try { - latch.await(LOAD_TRANSFORM_TIMEOUT_IN_SECONDS, TimeUnit.SECONDS); - } catch (InterruptedException e) { - throw new RuntimeException( - DataFrameMessages.getMessage(DataFrameMessages.FAILED_TO_LOAD_TRANSFORM_CONFIGURATION, transformId), e); - } - } - - if (transformConfig.isValid() == false) { - DataFrameConfigurationException exception = new DataFrameConfigurationException(transformId); - handleFailure(exception); - throw exception; - } - - if (fieldMappings == null) { - CountDownLatch latch = new CountDownLatch(1); - SchemaUtil.getDestinationFieldMappings(client, transformConfig.getDestination().getIndex(), new LatchedActionListener<>( - ActionListener.wrap( - destinationMappings -> fieldMappings = destinationMappings, - e -> { - throw new RuntimeException( - DataFrameMessages.getMessage(DataFrameMessages.DATA_FRAME_UNABLE_TO_GATHER_FIELD_MAPPINGS, - transformConfig.getDestination().getIndex()), - e); - }), latch)); - try { - latch.await(LOAD_TRANSFORM_TIMEOUT_IN_SECONDS, TimeUnit.SECONDS); - } catch (InterruptedException e) { - throw new RuntimeException( - DataFrameMessages.getMessage(DataFrameMessages.DATA_FRAME_UNABLE_TO_GATHER_FIELD_MAPPINGS, - transformConfig.getDestination().getIndex()), - e); - } - } - return super.maybeTriggerAsyncJob(now); } @@ -408,30 +561,28 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S } final DataFrameTransformState state = new DataFrameTransformState( - taskState.get(), + transformTask.taskState.get(), indexerState, getPosition(), - currentCheckpoint.get(), - stateReason.get()); - logger.info("Updating persistent state of transform [" + transform.getId() + "] to [" + state.toString() + "]"); + transformTask.currentCheckpoint.get(), + transformTask.stateReason.get(), + getProgress()); + logger.debug("Updating persistent state of transform [{}] to [{}]", transformConfig.getId(), state.toString()); // Persisting stats when we call `doSaveState` should be ok as we only call it on a state transition and // only every-so-often when doing the bulk indexing calls. See AsyncTwoPhaseIndexer#onBulkResponse for current periodicity ActionListener> updateClusterStateListener = ActionListener.wrap( task -> { - // Make a copy of the previousStats so that they are not constantly updated when `merge` is called - DataFrameIndexerTransformStats tempStats = new DataFrameIndexerTransformStats(previousStats).merge(getStats()); - // Only persist the stats if something has actually changed - if (previouslyPersistedStats == null || previouslyPersistedStats.equals(tempStats) == false) { - transformsConfigManager.putOrUpdateTransformStats(tempStats, + if (previouslyPersistedStats == null || previouslyPersistedStats.equals(getStats()) == false) { + transformsConfigManager.putOrUpdateTransformStats(getStats(), ActionListener.wrap( r -> { - previouslyPersistedStats = tempStats; + previouslyPersistedStats = getStats(); next.run(); }, statsExc -> { - logger.error("Updating stats of transform [" + transform.getId() + "] failed", statsExc); + logger.error("Updating stats of transform [" + transformConfig.getId() + "] failed", statsExc); next.run(); } )); @@ -441,24 +592,24 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S } }, exc -> { - logger.error("Updating persistent state of transform [" + transform.getId() + "] failed", exc); + logger.error("Updating persistent state of transform [" + transformConfig.getId() + "] failed", exc); next.run(); } ); - persistStateToClusterState(state, updateClusterStateListener); + transformTask.persistStateToClusterState(state, updateClusterStateListener); } @Override protected void onFailure(Exception exc) { // the failure handler must not throw an exception due to internal problems try { - logger.warn("Data frame transform [" + transform.getId() + "] encountered an exception: ", exc); + logger.warn("Data frame transform [" + transformTask.getTransformId() + "] encountered an exception: ", exc); // Since our schedule fires again very quickly after failures it is possible to run into the same failure numerous // times in a row, very quickly. We do not want to spam the audit log with repeated failures, so only record the first one if (exc.getMessage().equals(lastAuditedExceptionMessage) == false) { - auditor.warning(transform.getId(), "Data frame transform encountered an exception: " + exc.getMessage()); + auditor.warning(transformTask.getTransformId(), "Data frame transform encountered an exception: " + exc.getMessage()); lastAuditedExceptionMessage = exc.getMessage(); } handleFailure(exc); @@ -471,9 +622,10 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S protected void onFinish(ActionListener listener) { try { super.onFinish(listener); - long checkpoint = currentCheckpoint.incrementAndGet(); - auditor.info(transform.getId(), "Finished indexing for data frame transform checkpoint [" + checkpoint + "]"); - logger.info("Finished indexing for data frame transform [" + transform.getId() + "] checkpoint [" + checkpoint + "]"); + long checkpoint = transformTask.currentCheckpoint.incrementAndGet(); + auditor.info(transformTask.getTransformId(), "Finished indexing for data frame transform checkpoint [" + checkpoint + "]"); + logger.info( + "Finished indexing for data frame transform [" + transformTask.getTransformId() + "] checkpoint [" + checkpoint + "]"); listener.onResponse(null); } catch (Exception e) { listener.onFailure(e); @@ -482,26 +634,29 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S @Override protected void onAbort() { - auditor.info(transform.getId(), "Received abort request, stopping indexer"); - logger.info("Data frame transform [" + transform.getId() + "] received abort request, stopping indexer"); - shutdown(); + auditor.info(transformConfig.getId(), "Received abort request, stopping indexer"); + logger.info("Data frame transform [" + transformConfig.getId() + "] received abort request, stopping indexer"); + transformTask.shutdown(); } @Override protected void createCheckpoint(ActionListener listener) { - transformsCheckpointService.getCheckpoint(transformConfig, currentCheckpoint.get() + 1, ActionListener.wrap(checkpoint -> { - transformsConfigManager.putTransformCheckpoint(checkpoint, ActionListener.wrap(putCheckPointResponse -> { - listener.onResponse(null); - }, createCheckpointException -> { - listener.onFailure(new RuntimeException("Failed to create checkpoint", createCheckpointException)); - })); - }, getCheckPointException -> { - listener.onFailure(new RuntimeException("Failed to retrieve checkpoint", getCheckPointException)); - })); + transformsCheckpointService.getCheckpoint(transformConfig, + transformTask.currentCheckpoint.get() + 1, + ActionListener.wrap( + checkpoint -> transformsConfigManager.putTransformCheckpoint(checkpoint, + ActionListener.wrap( + putCheckPointResponse -> listener.onResponse(null), + createCheckpointException -> + listener.onFailure(new RuntimeException("Failed to create checkpoint", createCheckpointException)) + )), + getCheckPointException -> + listener.onFailure(new RuntimeException("Failed to retrieve checkpoint", getCheckPointException)) + )); } private boolean isIrrecoverableFailure(Exception e) { - return e instanceof IndexNotFoundException || e instanceof DataFrameConfigurationException; + return e instanceof IndexNotFoundException; } synchronized void handleFailure(Exception e) { @@ -520,21 +675,12 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S @Override protected void failIndexer(String failureMessage) { logger.error("Data frame transform [" + getJobId() + "]:" + failureMessage); - auditor.error(transform.getId(), failureMessage); - stateReason.set(failureMessage); - taskState.set(DataFrameTransformTaskState.FAILED); - persistStateToClusterState(DataFrameTransformTask.this.getState(), ActionListener.wrap( - r -> failureCount.set(0), // Successfully marked as failed, reset counter so that task can be restarted - exception -> {} // Noop, internal method logs the failure to update the state - )); + auditor.error(transformTask.getTransformId(), failureMessage); + transformTask.markAsFailed(failureMessage, ActionListener.wrap( + r -> { + // Successfully marked as failed, reset counter so that task can be restarted + failureCount.set(0); + }, e -> {})); } } - - class DataFrameConfigurationException extends RuntimeException { - - DataFrameConfigurationException(String transformId) { - super(DataFrameMessages.getMessage(DataFrameMessages.DATA_FRAME_TRANSFORM_CONFIGURATION_INVALID, transformId)); - } - - } } diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/TransformProgressGatherer.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/TransformProgressGatherer.java new file mode 100644 index 00000000000..23168627d44 --- /dev/null +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/TransformProgressGatherer.java @@ -0,0 +1,53 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.dataframe.transforms; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.search.SearchAction; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.client.Client; +import org.elasticsearch.xpack.core.ClientHelper; +import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig; +import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformProgress; + +/** + * Utility class to gather the progress information for a given config and its cursor position + */ +public final class TransformProgressGatherer { + + /** + * This gathers the total docs given the config and search + * + * TODO: Support checkpointing logic to restrict the query + * @param progressListener The listener to alert on completion + */ + public static void getInitialProgress(Client client, + DataFrameTransformConfig config, + ActionListener progressListener) { + SearchRequest request = client.prepareSearch(config.getSource().getIndex()) + .setSize(0) + .setAllowPartialSearchResults(false) + .setTrackTotalHits(true) + .setQuery(config.getSource().getQueryConfig().getQuery()) + .request(); + + ActionListener searchResponseActionListener = ActionListener.wrap( + searchResponse -> { + progressListener.onResponse(new DataFrameTransformProgress(searchResponse.getHits().getTotalHits().value, null)); + }, + progressListener::onFailure + ); + ClientHelper.executeWithHeadersAsync(config.getHeaders(), + ClientHelper.DATA_FRAME_ORIGIN, + client, + SearchAction.INSTANCE, + request, + searchResponseActionListener); + } + +} diff --git a/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameIndexerTests.java b/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameIndexerTests.java index 89388d82e08..015eb4b65e3 100644 --- a/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameIndexerTests.java +++ b/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameIndexerTests.java @@ -24,6 +24,7 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameIndexerTransformStats; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfigTests; +import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformProgress; import org.elasticsearch.xpack.core.indexing.IndexerState; import org.elasticsearch.xpack.dataframe.notifications.DataFrameAuditor; import org.elasticsearch.xpack.dataframe.transforms.pivot.Pivot; @@ -94,6 +95,11 @@ public class DataFrameIndexerTests extends ESTestCase { return fieldMappings; } + @Override + protected DataFrameTransformProgress getProgress() { + return null; + } + @Override protected void createCheckpoint(ActionListener listener) { listener.onResponse(null); diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_stats.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_stats.yml index 24533098537..33b0f40863a 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_stats.yml +++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_stats.yml @@ -34,6 +34,7 @@ teardown: - do: data_frame.stop_data_frame_transform: transform_id: "airline-transform-stats" + wait_for_completion: true - do: data_frame.delete_data_frame_transform: @@ -197,6 +198,7 @@ teardown: - match: { transforms.0.id: "airline-transform-stats-dos" } - match: { transforms.0.state.indexer_state: "stopped" } - match: { transforms.0.state.checkpoint: 0 } + - is_false: transforms.0.state.progress - match: { transforms.0.stats.pages_processed: 0 } - match: { transforms.0.stats.documents_processed: 0 } - match: { transforms.0.stats.documents_indexed: 0 }