diff --git a/build.gradle b/build.gradle index f3d683671e4..1bd2199add4 100644 --- a/build.gradle +++ b/build.gradle @@ -163,8 +163,8 @@ task verifyVersions { * after the backport of the backcompat code is complete. */ -boolean bwc_tests_enabled = true -final String bwc_tests_disabled_issue = "" /* place a PR link here when committing bwc changes */ +boolean bwc_tests_enabled = false +final String bwc_tests_disabled_issue = "https://github.com/elastic/elasticsearch/pull/44219" /* place a PR link here when committing bwc changes */ if (bwc_tests_enabled == false) { if (bwc_tests_disabled_issue.isEmpty()) { throw new GradleException("bwc_tests_disabled_issue must be set when bwc_tests_enabled == false") diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/transforms/DataFrameIndexerPosition.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/transforms/DataFrameIndexerPosition.java new file mode 100644 index 00000000000..86a2527ffdd --- /dev/null +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/transforms/DataFrameIndexerPosition.java @@ -0,0 +1,99 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.client.dataframe.transforms; + +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.xcontent.ConstructingObjectParser; +import org.elasticsearch.common.xcontent.ObjectParser.ValueType; +import org.elasticsearch.common.xcontent.XContentParser; + +import java.io.IOException; +import java.util.Collections; +import java.util.Map; +import java.util.Objects; + +import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg; + +/** + * Holds state of the cursors: + * + * indexer_position: the position of the indexer querying the source + * bucket_position: the position used for identifying changes + */ +public class DataFrameIndexerPosition { + public static final ParseField INDEXER_POSITION = new ParseField("indexer_position"); + public static final ParseField BUCKET_POSITION = new ParseField("bucket_position"); + + private final Map indexerPosition; + private final Map bucketPosition; + + @SuppressWarnings("unchecked") + public static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( + "data_frame_indexer_position", + true, + args -> new DataFrameIndexerPosition((Map) args[0],(Map) args[1])); + + static { + PARSER.declareField(optionalConstructorArg(), XContentParser::mapOrdered, INDEXER_POSITION, ValueType.OBJECT); + PARSER.declareField(optionalConstructorArg(), XContentParser::mapOrdered, BUCKET_POSITION, ValueType.OBJECT); + } + + public DataFrameIndexerPosition(Map indexerPosition, Map bucketPosition) { + this.indexerPosition = indexerPosition == null ? 
null : Collections.unmodifiableMap(indexerPosition); + this.bucketPosition = bucketPosition == null ? null : Collections.unmodifiableMap(bucketPosition); + } + + public Map getIndexerPosition() { + return indexerPosition; + } + + public Map getBucketsPosition() { + return bucketPosition; + } + + @Override + public boolean equals(Object other) { + if (this == other) { + return true; + } + + if (other == null || getClass() != other.getClass()) { + return false; + } + + DataFrameIndexerPosition that = (DataFrameIndexerPosition) other; + + return Objects.equals(this.indexerPosition, that.indexerPosition) && + Objects.equals(this.bucketPosition, that.bucketPosition); + } + + @Override + public int hashCode() { + return Objects.hash(indexerPosition, bucketPosition); + } + + public static DataFrameIndexerPosition fromXContent(XContentParser parser) { + try { + return PARSER.parse(parser, null); + } catch (IOException e) { + throw new RuntimeException(e); + } + } +} diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformState.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformState.java index 186c67bf42c..65216827f48 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformState.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformState.java @@ -27,8 +27,6 @@ import org.elasticsearch.common.xcontent.ObjectParser.ValueType; import org.elasticsearch.common.xcontent.XContentParser; import java.io.IOException; -import java.util.Collections; -import java.util.LinkedHashMap; import java.util.Map; import java.util.Objects; @@ -39,7 +37,10 @@ public class DataFrameTransformState { private static final ParseField INDEXER_STATE = new ParseField("indexer_state"); private static final ParseField TASK_STATE = new ParseField("task_state"); + + // 7.3 BWC: current_position only exists in 7.2. In 7.3+ it is replaced by position. 
private static final ParseField CURRENT_POSITION = new ParseField("current_position"); + private static final ParseField POSITION = new ParseField("position"); private static final ParseField CHECKPOINT = new ParseField("checkpoint"); private static final ParseField REASON = new ParseField("reason"); private static final ParseField PROGRESS = new ParseField("progress"); @@ -48,18 +49,31 @@ public class DataFrameTransformState { @SuppressWarnings("unchecked") public static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>("data_frame_transform_state", true, - args -> new DataFrameTransformState((DataFrameTransformTaskState) args[0], - (IndexerState) args[1], - (Map) args[2], - (long) args[3], - (String) args[4], - (DataFrameTransformProgress) args[5], - (NodeAttributes) args[6])); + args -> { + DataFrameTransformTaskState taskState = (DataFrameTransformTaskState) args[0]; + IndexerState indexerState = (IndexerState) args[1]; + Map bwcCurrentPosition = (Map) args[2]; + DataFrameIndexerPosition dataFrameIndexerPosition = (DataFrameIndexerPosition) args[3]; + + // BWC handling, translate current_position to position iff position isn't set + if (bwcCurrentPosition != null && dataFrameIndexerPosition == null) { + dataFrameIndexerPosition = new DataFrameIndexerPosition(bwcCurrentPosition, null); + } + + long checkpoint = (long) args[4]; + String reason = (String) args[5]; + DataFrameTransformProgress progress = (DataFrameTransformProgress) args[6]; + NodeAttributes node = (NodeAttributes) args[7]; + + return new DataFrameTransformState(taskState, indexerState, dataFrameIndexerPosition, checkpoint, reason, progress, + node); + }); static { PARSER.declareField(constructorArg(), p -> DataFrameTransformTaskState.fromString(p.text()), TASK_STATE, ValueType.STRING); PARSER.declareField(constructorArg(), p -> IndexerState.fromString(p.text()), INDEXER_STATE, ValueType.STRING); PARSER.declareField(optionalConstructorArg(), (p, c) -> p.mapOrdered(), CURRENT_POSITION, ValueType.OBJECT); + PARSER.declareField(optionalConstructorArg(), DataFrameIndexerPosition::fromXContent, POSITION, ValueType.OBJECT); PARSER.declareLong(ConstructingObjectParser.optionalConstructorArg(), CHECKPOINT); PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), REASON); PARSER.declareField(optionalConstructorArg(), DataFrameTransformProgress::fromXContent, PROGRESS, ValueType.OBJECT); @@ -73,21 +87,21 @@ public class DataFrameTransformState { private final DataFrameTransformTaskState taskState; private final IndexerState indexerState; private final long checkpoint; - private final Map currentPosition; + private final DataFrameIndexerPosition position; private final String reason; private final DataFrameTransformProgress progress; private final NodeAttributes node; public DataFrameTransformState(DataFrameTransformTaskState taskState, IndexerState indexerState, - @Nullable Map position, + @Nullable DataFrameIndexerPosition position, long checkpoint, @Nullable String reason, @Nullable DataFrameTransformProgress progress, @Nullable NodeAttributes node) { this.taskState = taskState; this.indexerState = indexerState; - this.currentPosition = position == null ? 
null : Collections.unmodifiableMap(new LinkedHashMap<>(position)); + this.position = position; this.checkpoint = checkpoint; this.reason = reason; this.progress = progress; @@ -103,8 +117,8 @@ public class DataFrameTransformState { } @Nullable - public Map getPosition() { - return currentPosition; + public DataFrameIndexerPosition getPosition() { + return position; } public long getCheckpoint() { @@ -140,7 +154,7 @@ public class DataFrameTransformState { return Objects.equals(this.taskState, that.taskState) && Objects.equals(this.indexerState, that.indexerState) && - Objects.equals(this.currentPosition, that.currentPosition) && + Objects.equals(this.position, that.position) && Objects.equals(this.progress, that.progress) && this.checkpoint == that.checkpoint && Objects.equals(this.node, that.node) && @@ -149,7 +163,7 @@ public class DataFrameTransformState { @Override public int hashCode() { - return Objects.hash(taskState, indexerState, currentPosition, checkpoint, reason, progress, node); + return Objects.hash(taskState, indexerState, position, checkpoint, reason, progress, node); } } diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/DataFrameIndexerPositionTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/DataFrameIndexerPositionTests.java new file mode 100644 index 00000000000..cd17dd3fe8e --- /dev/null +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/DataFrameIndexerPositionTests.java @@ -0,0 +1,76 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.elasticsearch.client.dataframe.transforms; + +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.test.ESTestCase; + +import java.io.IOException; +import java.util.LinkedHashMap; +import java.util.Map; + +import static org.elasticsearch.test.AbstractXContentTestCase.xContentTester; + +public class DataFrameIndexerPositionTests extends ESTestCase { + + public void testFromXContent() throws IOException { + xContentTester(this::createParser, + DataFrameIndexerPositionTests::randomDataFrameIndexerPosition, + DataFrameIndexerPositionTests::toXContent, + DataFrameIndexerPosition::fromXContent) + .supportsUnknownFields(true) + .randomFieldsExcludeFilter(field -> field.equals("indexer_position") || + field.equals("bucket_position")) + .test(); + } + + public static DataFrameIndexerPosition randomDataFrameIndexerPosition() { + return new DataFrameIndexerPosition(randomPositionMap(), randomPositionMap()); + } + + public static void toXContent(DataFrameIndexerPosition position, XContentBuilder builder) throws IOException { + builder.startObject(); + if (position.getIndexerPosition() != null) { + builder.field("indexer_position", position.getIndexerPosition()); + } + if (position.getBucketsPosition() != null) { + builder.field("bucket_position", position.getBucketsPosition()); + } + builder.endObject(); + } + + private static Map randomPositionMap() { + if (randomBoolean()) { + return null; + } + int numFields = randomIntBetween(1, 5); + Map position = new LinkedHashMap<>(); + for (int i = 0; i < numFields; i++) { + Object value; + if (randomBoolean()) { + value = randomLong(); + } else { + value = randomAlphaOfLengthBetween(1, 10); + } + position.put(randomAlphaOfLengthBetween(3, 10), value); + } + return position; + } +} diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformStateTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformStateTests.java index ebb62890c3c..ef1cf3e89b6 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformStateTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformStateTests.java @@ -25,8 +25,6 @@ import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.test.ESTestCase; import java.io.IOException; -import java.util.LinkedHashMap; -import java.util.Map; import static org.elasticsearch.test.AbstractXContentTestCase.xContentTester; @@ -38,15 +36,16 @@ public class DataFrameTransformStateTests extends ESTestCase { DataFrameTransformStateTests::toXContent, DataFrameTransformState::fromXContent) .supportsUnknownFields(true) - .randomFieldsExcludeFilter(field -> field.equals("current_position") || - field.equals("node.attributes")) + .randomFieldsExcludeFilter(field -> field.equals("position.indexer_position") || + field.equals("position.bucket_position") || + field.equals("node.attributes")) .test(); } public static DataFrameTransformState randomDataFrameTransformState() { return new DataFrameTransformState(randomFrom(DataFrameTransformTaskState.values()), randomFrom(IndexerState.values()), - randomPositionMap(), + randomBoolean() ? null : DataFrameIndexerPositionTests.randomDataFrameIndexerPosition(), randomLongBetween(0,10), randomBoolean() ? null : randomAlphaOfLength(10), randomBoolean() ? 
null : DataFrameTransformProgressTests.randomInstance(), @@ -58,7 +57,8 @@ public class DataFrameTransformStateTests extends ESTestCase { builder.field("task_state", state.getTaskState().value()); builder.field("indexer_state", state.getIndexerState().value()); if (state.getPosition() != null) { - builder.field("current_position", state.getPosition()); + builder.field("position"); + DataFrameIndexerPositionTests.toXContent(state.getPosition(), builder); } builder.field("checkpoint", state.getCheckpoint()); if (state.getReason() != null) { @@ -75,21 +75,4 @@ public class DataFrameTransformStateTests extends ESTestCase { builder.endObject(); } - private static Map randomPositionMap() { - if (randomBoolean()) { - return null; - } - int numFields = randomIntBetween(1, 5); - Map position = new LinkedHashMap<>(); - for (int i = 0; i < numFields; i++) { - Object value; - if (randomBoolean()) { - value = randomLong(); - } else { - value = randomAlphaOfLengthBetween(1, 10); - } - position.put(randomAlphaOfLengthBetween(3, 10), value); - } - return position; - } } diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/hlrc/DataFrameIndexerPositionTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/hlrc/DataFrameIndexerPositionTests.java new file mode 100644 index 00000000000..9cf33e6500c --- /dev/null +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/hlrc/DataFrameIndexerPositionTests.java @@ -0,0 +1,77 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.elasticsearch.client.dataframe.transforms.hlrc; + +import org.elasticsearch.client.AbstractResponseTestCase; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameIndexerPosition; + +import java.util.LinkedHashMap; +import java.util.Map; + +import static org.hamcrest.Matchers.equalTo; + +public class DataFrameIndexerPositionTests extends AbstractResponseTestCase< + DataFrameIndexerPosition, + org.elasticsearch.client.dataframe.transforms.DataFrameIndexerPosition> { + + public static DataFrameIndexerPosition fromHlrc( + org.elasticsearch.client.dataframe.transforms.DataFrameIndexerPosition instance) { + if (instance == null) { + return null; + } + return new DataFrameIndexerPosition(instance.getIndexerPosition(), instance.getBucketsPosition()); + } + + @Override + protected DataFrameIndexerPosition createServerTestInstance() { + return new DataFrameIndexerPosition(randomPositionMap(), randomPositionMap()); + } + + @Override + protected org.elasticsearch.client.dataframe.transforms.DataFrameIndexerPosition doParseToClientInstance(XContentParser parser) { + return org.elasticsearch.client.dataframe.transforms.DataFrameIndexerPosition.fromXContent(parser); + } + + @Override + protected void assertInstances(DataFrameIndexerPosition serverTestInstance, + org.elasticsearch.client.dataframe.transforms.DataFrameIndexerPosition clientInstance) { + assertThat(serverTestInstance.getIndexerPosition(), equalTo(clientInstance.getIndexerPosition())); + assertThat(serverTestInstance.getBucketsPosition(), equalTo(clientInstance.getBucketsPosition())); + } + + private static Map randomPositionMap() { + if (randomBoolean()) { + return null; + } + int numFields = randomIntBetween(1, 5); + Map position = new LinkedHashMap<>(); + for (int i = 0; i < numFields; i++) { + Object value; + if (randomBoolean()) { + value = randomLong(); + } else { + value = randomAlphaOfLengthBetween(1, 10); + } + position.put(randomAlphaOfLengthBetween(3, 10), value); + } + return position; + } +} diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/hlrc/DataFrameTransformStateAndStatsTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/hlrc/DataFrameTransformStateAndStatsTests.java index dde44898bf9..7b54ab538c3 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/hlrc/DataFrameTransformStateAndStatsTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/hlrc/DataFrameTransformStateAndStatsTests.java @@ -19,8 +19,8 @@ package org.elasticsearch.client.dataframe.transforms.hlrc; -import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.client.AbstractHlrcXContentTestCase; +import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameIndexerTransformStats; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformStateAndStats; @@ -64,7 +64,10 @@ public class DataFrameTransformStateAndStatsTests extends AbstractHlrcXContentTe @Override protected Predicate getRandomFieldsExcludeFilter() { - return field -> field.equals("state.current_position") || field.equals("state.node") || field.equals("state.node.attributes"); + return field -> field.equals("state.position.indexer_position") || + field.equals("state.position.bucket_position") || + field.equals("state.node") || + 
field.equals("state.node.attributes"); } } diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/hlrc/DataFrameTransformStateTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/hlrc/DataFrameTransformStateTests.java index b97e0a72c1f..6d378bca5f8 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/hlrc/DataFrameTransformStateTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/hlrc/DataFrameTransformStateTests.java @@ -19,8 +19,9 @@ package org.elasticsearch.client.dataframe.transforms.hlrc; -import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.client.AbstractHlrcXContentTestCase; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameIndexerPosition; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameIndexerTransformStats; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformCheckpointStats; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformCheckpointingInfo; @@ -42,7 +43,7 @@ public class DataFrameTransformStateTests extends AbstractHlrcXContentTestCase getRandomFieldsExcludeFilter() { - return field -> field.equals("current_position") || field.equals("node.attributes"); + return field -> field.equals("position.indexer_position") || + field.equals("position.bucket_position") || + field.equals("node.attributes"); } public static DataFrameTransformStateAndStats randomDataFrameTransformStateAndStats(String id) { @@ -95,6 +98,10 @@ public class DataFrameTransformStateTests extends AbstractHlrcXContentTestCase indexerPosition; + private final Map bucketPosition; + + @SuppressWarnings("unchecked") + public static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>(NAME, + true, + args -> new DataFrameIndexerPosition((Map) args[0],(Map) args[1])); + + static { + PARSER.declareField(optionalConstructorArg(), XContentParser::mapOrdered, INDEXER_POSITION, ValueType.OBJECT); + PARSER.declareField(optionalConstructorArg(), XContentParser::mapOrdered, BUCKET_POSITION, ValueType.OBJECT); + } + + public DataFrameIndexerPosition(Map indexerPosition, Map bucketPosition) { + this.indexerPosition = indexerPosition == null ? null : Collections.unmodifiableMap(indexerPosition); + this.bucketPosition = bucketPosition == null ? null : Collections.unmodifiableMap(bucketPosition); + } + + public DataFrameIndexerPosition(StreamInput in) throws IOException { + Map position = in.readMap(); + indexerPosition = position == null ? null : Collections.unmodifiableMap(position); + position = in.readMap(); + bucketPosition = position == null ? 
null : Collections.unmodifiableMap(position); + } + + public Map getIndexerPosition() { + return indexerPosition; + } + + public Map getBucketsPosition() { + return bucketPosition; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeMap(indexerPosition); + out.writeMap(bucketPosition); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + if (indexerPosition != null) { + builder.field(INDEXER_POSITION.getPreferredName(), indexerPosition); + } + if (bucketPosition != null) { + builder.field(BUCKET_POSITION.getPreferredName(), bucketPosition); + } + builder.endObject(); + return builder; + } + + @Override + public boolean equals(Object other) { + if (this == other) { + return true; + } + + if (other == null || getClass() != other.getClass()) { + return false; + } + + DataFrameIndexerPosition that = (DataFrameIndexerPosition) other; + + return Objects.equals(this.indexerPosition, that.indexerPosition) && + Objects.equals(this.bucketPosition, that.bucketPosition); + } + + @Override + public int hashCode() { + return Objects.hash(indexerPosition, bucketPosition); + } + + @Override + public String toString() { + return Strings.toString(this); + } + + public static DataFrameIndexerPosition fromXContent(XContentParser parser) { + try { + return PARSER.parse(parser, null); + } catch (IOException e) { + throw new RuntimeException(e); + } + } +} diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformState.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformState.java index 2c3ad36d684..f942f0dd2a9 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformState.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformState.java @@ -22,8 +22,6 @@ import org.elasticsearch.xpack.core.dataframe.DataFrameField; import org.elasticsearch.xpack.core.indexing.IndexerState; import java.io.IOException; -import java.util.Collections; -import java.util.LinkedHashMap; import java.util.Map; import java.util.Objects; @@ -39,7 +37,7 @@ public class DataFrameTransformState implements Task.Status, PersistentTaskState private final long checkpoint; @Nullable - private final Map currentPosition; + private final DataFrameIndexerPosition position; @Nullable private final String reason; @Nullable @@ -47,7 +45,10 @@ public class DataFrameTransformState implements Task.Status, PersistentTaskState public static final ParseField TASK_STATE = new ParseField("task_state"); public static final ParseField INDEXER_STATE = new ParseField("indexer_state"); + + // 7.3 BWC: current_position only exists in 7.2. In 7.3+ it is replaced by position. 
public static final ParseField CURRENT_POSITION = new ParseField("current_position"); + public static final ParseField POSITION = new ParseField("position"); public static final ParseField CHECKPOINT = new ParseField("checkpoint"); public static final ParseField REASON = new ParseField("reason"); public static final ParseField PROGRESS = new ParseField("progress"); @@ -56,18 +57,30 @@ public class DataFrameTransformState implements Task.Status, PersistentTaskState @SuppressWarnings("unchecked") public static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>(NAME, true, - args -> new DataFrameTransformState((DataFrameTransformTaskState) args[0], - (IndexerState) args[1], - (Map) args[2], - (long) args[3], - (String) args[4], - (DataFrameTransformProgress) args[5], - (NodeAttributes) args[6])); + args -> { + DataFrameTransformTaskState taskState = (DataFrameTransformTaskState) args[0]; + IndexerState indexerState = (IndexerState) args[1]; + Map bwcCurrentPosition = (Map) args[2]; + DataFrameIndexerPosition dataFrameIndexerPosition = (DataFrameIndexerPosition) args[3]; + + // BWC handling, translate current_position to position iff position isn't set + if (bwcCurrentPosition != null && dataFrameIndexerPosition == null) { + dataFrameIndexerPosition = new DataFrameIndexerPosition(bwcCurrentPosition, null); + } + + long checkpoint = (long) args[4]; + String reason = (String) args[5]; + DataFrameTransformProgress progress = (DataFrameTransformProgress) args[6]; + NodeAttributes node = (NodeAttributes) args[7]; + + return new DataFrameTransformState(taskState, indexerState, dataFrameIndexerPosition, checkpoint, reason, progress, node); + }); static { PARSER.declareField(constructorArg(), p -> DataFrameTransformTaskState.fromString(p.text()), TASK_STATE, ValueType.STRING); PARSER.declareField(constructorArg(), p -> IndexerState.fromString(p.text()), INDEXER_STATE, ValueType.STRING); PARSER.declareField(optionalConstructorArg(), XContentParser::mapOrdered, CURRENT_POSITION, ValueType.OBJECT); + PARSER.declareField(optionalConstructorArg(), DataFrameIndexerPosition::fromXContent, POSITION, ValueType.OBJECT); PARSER.declareLong(ConstructingObjectParser.optionalConstructorArg(), CHECKPOINT); PARSER.declareString(optionalConstructorArg(), REASON); PARSER.declareField(optionalConstructorArg(), DataFrameTransformProgress.PARSER::apply, PROGRESS, ValueType.OBJECT); @@ -76,14 +89,14 @@ public class DataFrameTransformState implements Task.Status, PersistentTaskState public DataFrameTransformState(DataFrameTransformTaskState taskState, IndexerState indexerState, - @Nullable Map position, + @Nullable DataFrameIndexerPosition position, long checkpoint, @Nullable String reason, @Nullable DataFrameTransformProgress progress, @Nullable NodeAttributes node) { this.taskState = taskState; this.indexerState = indexerState; - this.currentPosition = position == null ? 
null : Collections.unmodifiableMap(new LinkedHashMap<>(position)); + this.position = position; this.checkpoint = checkpoint; this.reason = reason; this.progress = progress; @@ -92,7 +105,7 @@ public class DataFrameTransformState implements Task.Status, PersistentTaskState public DataFrameTransformState(DataFrameTransformTaskState taskState, IndexerState indexerState, - @Nullable Map position, + @Nullable DataFrameIndexerPosition position, long checkpoint, @Nullable String reason, @Nullable DataFrameTransformProgress progress) { @@ -102,8 +115,12 @@ public class DataFrameTransformState implements Task.Status, PersistentTaskState public DataFrameTransformState(StreamInput in) throws IOException { taskState = DataFrameTransformTaskState.fromStream(in); indexerState = IndexerState.fromStream(in); - Map position = in.readMap(); - currentPosition = position == null ? null : Collections.unmodifiableMap(position); + if (in.getVersion().onOrAfter(Version.V_7_3_0)) { + position = in.readOptionalWriteable(DataFrameIndexerPosition::new); + } else { + Map pos = in.readMap(); + position = new DataFrameIndexerPosition(pos, null); + } checkpoint = in.readLong(); reason = in.readOptionalString(); progress = in.readOptionalWriteable(DataFrameTransformProgress::new); @@ -122,8 +139,8 @@ public class DataFrameTransformState implements Task.Status, PersistentTaskState return indexerState; } - public Map getPosition() { - return currentPosition; + public DataFrameIndexerPosition getPosition() { + return position; } public long getCheckpoint() { @@ -169,8 +186,8 @@ public class DataFrameTransformState implements Task.Status, PersistentTaskState builder.startObject(); builder.field(TASK_STATE.getPreferredName(), taskState.value()); builder.field(INDEXER_STATE.getPreferredName(), indexerState.value()); - if (currentPosition != null) { - builder.field(CURRENT_POSITION.getPreferredName(), currentPosition); + if (position != null) { + builder.field(POSITION.getPreferredName(), position); } builder.field(CHECKPOINT.getPreferredName(), checkpoint); if (reason != null) { @@ -195,7 +212,11 @@ public class DataFrameTransformState implements Task.Status, PersistentTaskState public void writeTo(StreamOutput out) throws IOException { taskState.writeTo(out); indexerState.writeTo(out); - out.writeMap(currentPosition); + if (out.getVersion().onOrAfter(Version.V_7_3_0)) { + out.writeOptionalWriteable(position); + } else { + out.writeMap(position != null ? 
position.getIndexerPosition() : null); + } out.writeLong(checkpoint); out.writeOptionalString(reason); out.writeOptionalWriteable(progress); @@ -218,7 +239,7 @@ public class DataFrameTransformState implements Task.Status, PersistentTaskState return Objects.equals(this.taskState, that.taskState) && Objects.equals(this.indexerState, that.indexerState) && - Objects.equals(this.currentPosition, that.currentPosition) && + Objects.equals(this.position, that.position) && this.checkpoint == that.checkpoint && Objects.equals(this.reason, that.reason) && Objects.equals(this.progress, that.progress) && @@ -227,7 +248,7 @@ public class DataFrameTransformState implements Task.Status, PersistentTaskState @Override public int hashCode() { - return Objects.hash(taskState, indexerState, currentPosition, checkpoint, reason, progress, node); + return Objects.hash(taskState, indexerState, position, checkpoint, reason, progress, node); } @Override diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java index efe57f44e89..bd159e6fc33 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java @@ -349,31 +349,44 @@ public abstract class AsyncTwoPhaseIndexer docs = iterationResult.getToIndex(); - final BulkRequest bulkRequest = new BulkRequest(); - docs.forEach(bulkRequest::add); - // TODO this might be a valid case, e.g. if implementation filters - assert bulkRequest.requests().size() > 0; + // an iteration result might return an empty set of documents to be indexed + if (docs.isEmpty() == false) { + final BulkRequest bulkRequest = new BulkRequest(); + docs.forEach(bulkRequest::add); - stats.markStartIndexing(); - doNextBulk(bulkRequest, ActionListener.wrap(bulkResponse -> { - // TODO we should check items in the response and move after accordingly to - // resume the failing buckets ? - if (bulkResponse.hasFailures()) { - logger.warn("Error while attempting to bulk index documents: " + bulkResponse.buildFailureMessage()); + stats.markStartIndexing(); + doNextBulk(bulkRequest, ActionListener.wrap(bulkResponse -> { + // TODO we should check items in the response and move after accordingly to + // resume the failing buckets ? 
+ if (bulkResponse.hasFailures()) { + logger.warn("Error while attempting to bulk index documents: " + bulkResponse.buildFailureMessage()); + } + stats.incrementNumOutputDocuments(bulkResponse.getItems().length); + + // check if indexer has been asked to stop, state {@link IndexerState#STOPPING} + if (checkState(getState()) == false) { + return; + } + + JobPosition newPosition = iterationResult.getPosition(); + position.set(newPosition); + + onBulkResponse(bulkResponse, newPosition); + }, this::finishWithIndexingFailure)); + } else { + // no documents need to be indexed, continue with search + try { + JobPosition newPosition = iterationResult.getPosition(); + position.set(newPosition); + + ActionListener listener = ActionListener.wrap(this::onSearchResponse, this::finishWithSearchFailure); + nextSearch(listener); + } catch (Exception e) { + finishAndSetState(); + onFailure(e); } - stats.incrementNumOutputDocuments(bulkResponse.getItems().length); - - // check if indexer has been asked to stop, state {@link IndexerState#STOPPING} - if (checkState(getState()) == false) { - return; - } - - JobPosition newPosition = iterationResult.getPosition(); - position.set(newPosition); - - onBulkResponse(bulkResponse, newPosition); - }, this::finishWithIndexingFailure)); + } } catch (Exception e) { finishWithSearchFailure(e); } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameIndexerPositionTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameIndexerPositionTests.java new file mode 100644 index 00000000000..dd57a0302a4 --- /dev/null +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameIndexerPositionTests.java @@ -0,0 +1,67 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. 
+ */ + +package org.elasticsearch.xpack.core.dataframe.transforms; + +import org.elasticsearch.common.io.stream.Writeable.Reader; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.test.AbstractSerializingTestCase; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.function.Predicate; + +public class DataFrameIndexerPositionTests extends AbstractSerializingTestCase { + + public static DataFrameIndexerPosition randomDataFrameIndexerPosition() { + return new DataFrameIndexerPosition(randomPosition(), randomPosition()); + } + + @Override + protected DataFrameIndexerPosition createTestInstance() { + return randomDataFrameIndexerPosition(); + } + + @Override + protected Reader instanceReader() { + return DataFrameIndexerPosition::new; + } + + @Override + protected boolean supportsUnknownFields() { + return true; + } + + @Override + protected Predicate getRandomFieldsExcludeFilter() { + return field -> !field.isEmpty(); + } + + @Override + protected DataFrameIndexerPosition doParseInstance(XContentParser parser) throws IOException { + return DataFrameIndexerPosition.fromXContent(parser); + } + + private static Map randomPosition() { + if (randomBoolean()) { + return null; + } + int numFields = randomIntBetween(1, 5); + Map position = new HashMap<>(); + for (int i = 0; i < numFields; i++) { + Object value; + if (randomBoolean()) { + value = randomLong(); + } else { + value = randomAlphaOfLengthBetween(1, 10); + } + position.put(randomAlphaOfLengthBetween(3, 10), value); + } + + return position; + } +} diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformStateTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformStateTests.java index 9f4ac546c89..cc6fe88e5b2 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformStateTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformStateTests.java @@ -12,8 +12,6 @@ import org.elasticsearch.test.AbstractSerializingTestCase; import org.elasticsearch.xpack.core.indexing.IndexerState; import java.io.IOException; -import java.util.HashMap; -import java.util.Map; import java.util.function.Predicate; import static org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformProgressTests.randomDataFrameTransformProgress; @@ -24,7 +22,7 @@ public class DataFrameTransformStateTests extends AbstractSerializingTestCase randomPosition() { - if (randomBoolean()) { - return null; - } - int numFields = randomIntBetween(1, 5); - Map position = new HashMap<>(); - for (int i = 0; i < numFields; i++) { - Object value; - if (randomBoolean()) { - value = randomLong(); - } else { - value = randomAlphaOfLengthBetween(1, 10); - } - position.put(randomAlphaOfLengthBetween(3, 10), value); - } - return position; - } - @Override protected boolean supportsUnknownFields() { return true; diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameIndexer.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameIndexer.java index 1516b415d2a..60119ad5788 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameIndexer.java +++ 
b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameIndexer.java @@ -23,6 +23,7 @@ import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregati import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.xpack.core.dataframe.DataFrameField; import org.elasticsearch.xpack.core.dataframe.DataFrameMessages; +import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameIndexerPosition; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameIndexerTransformStats; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformCheckpoint; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig; @@ -47,7 +48,23 @@ import java.util.stream.Stream; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; -public abstract class DataFrameIndexer extends AsyncTwoPhaseIndexer<Map<String, Object>, DataFrameIndexerTransformStats> { +public abstract class DataFrameIndexer extends AsyncTwoPhaseIndexer<DataFrameIndexerPosition, DataFrameIndexerTransformStats> { + + /** + * RunState is an internal (non-persisted) state that controls the internal logic of + * which query filters to run and which index requests to send + */ + private enum RunState { + // do a complete query/index, this is used for batch data frames and for bootstrapping (1st run) + FULL_RUN, + + // Partial run modes in 2 stages: + // identify buckets that have changed + PARTIAL_RUN_IDENTIFY_CHANGES, + + // recalculate buckets based on the update list + PARTIAL_RUN_APPLY_CHANGES + } public static final int MINIMUM_PAGE_SIZE = 10; public static final String COMPOSITE_AGGREGATION_NAME = "_data_frame"; @@ -61,24 +78,34 @@ public abstract class DataFrameIndexer extends AsyncTwoPhaseIndexer> changedBuckets; + private volatile Map<String, Object> changedBucketsAfterKey; public DataFrameIndexer(Executor executor, DataFrameAuditor auditor, DataFrameTransformConfig transformConfig, Map<String, String> fieldMappings, AtomicReference<IndexerState> initialState, - Map<String, Object> initialPosition, + DataFrameIndexerPosition initialPosition, DataFrameIndexerTransformStats jobStats, DataFrameTransformProgress transformProgress, - DataFrameTransformCheckpoint inProgressOrLastCheckpoint) { + DataFrameTransformCheckpoint lastCheckpoint, + DataFrameTransformCheckpoint nextCheckpoint) { super(executor, initialState, initialPosition, jobStats); this.auditor = Objects.requireNonNull(auditor); this.transformConfig = ExceptionsHelper.requireNonNull(transformConfig, "transformConfig"); this.fieldMappings = ExceptionsHelper.requireNonNull(fieldMappings, "fieldMappings"); this.progress = transformProgress; - this.inProgressOrLastCheckpoint = inProgressOrLastCheckpoint; + this.lastCheckpoint = lastCheckpoint; + this.nextCheckpoint = nextCheckpoint; + // give runState a default + this.runState = RunState.FULL_RUN; } protected abstract void failIndexer(String message); @@ -117,6 +144,8 @@ public abstract class DataFrameIndexer extends AsyncTwoPhaseIndexer> doProcess(SearchResponse searchResponse) { + protected IterationResult<DataFrameIndexerPosition> doProcess(SearchResponse searchResponse) { final CompositeAggregation agg = searchResponse.getAggregations().get(COMPOSITE_AGGREGATION_NAME); + switch (runState) { + case FULL_RUN: + return processBuckets(agg); + case PARTIAL_RUN_APPLY_CHANGES: + return processPartialBucketUpdates(agg); + case PARTIAL_RUN_IDENTIFY_CHANGES: + return processChangedBuckets(agg); + + default: + // Any other state is a bug, should not happen + logger.warn("Encountered unexpected run state [" + runState + "]"); + throw new
IllegalStateException("DataFrame indexer job encountered an illegal state [" + runState + "]"); + } + } + + private IterationResult<DataFrameIndexerPosition> processBuckets(final CompositeAggregation agg) { // we reached the end if (agg.getBuckets().isEmpty()) { return new IterationResult<>(Collections.emptyList(), null, true); } long docsBeforeProcess = getStats().getNumDocuments(); - IterationResult<Map<String, Object>> result = new IterationResult<>(processBucketsToIndexRequests(agg).collect(Collectors.toList()), - agg.afterKey(), - agg.getBuckets().isEmpty()); + + DataFrameIndexerPosition oldPosition = getPosition(); + DataFrameIndexerPosition newPosition = new DataFrameIndexerPosition(agg.afterKey(), + oldPosition != null ? getPosition().getBucketsPosition() : null); + + IterationResult<DataFrameIndexerPosition> result = new IterationResult<>( + processBucketsToIndexRequests(agg).collect(Collectors.toList()), + newPosition, + agg.getBuckets().isEmpty()); + if (progress != null) { progress.docsProcessed(getStats().getNumDocuments() - docsBeforeProcess); } + return result; } + private IterationResult<DataFrameIndexerPosition> processPartialBucketUpdates(final CompositeAggregation agg) { + // we reached the end + if (agg.getBuckets().isEmpty()) { + // clean up changed buckets + changedBuckets = null; + + // reset the runState to fetch changed buckets + runState = RunState.PARTIAL_RUN_IDENTIFY_CHANGES; + // advance the cursor for changed bucket detection + return new IterationResult<>(Collections.emptyList(), + new DataFrameIndexerPosition(null, changedBucketsAfterKey), false); + } + + return processBuckets(agg); + } + + + private IterationResult<DataFrameIndexerPosition> processChangedBuckets(final CompositeAggregation agg) { + // initialize the map of changed buckets, the map might be empty if the source does not require/implement + // changed bucket detection + changedBuckets = pivot.initialIncrementalBucketUpdateMap(); + + // reached the end?
+ if (agg.getBuckets().isEmpty()) { + // reset everything and return the end marker + changedBuckets = null; + changedBucketsAfterKey = null; + return new IterationResult<>(Collections.emptyList(), null, true); + } + // else + + // collect all buckets that require the update + agg.getBuckets().stream().forEach(bucket -> { + bucket.getKey().forEach((k, v) -> { + changedBuckets.get(k).add(v.toString()); + }); + }); + + // remember the after key but do not store it in the state yet (in the failure we need to retrieve it again) + changedBucketsAfterKey = agg.afterKey(); + + // reset the runState to fetch the partial updates next + runState = RunState.PARTIAL_RUN_APPLY_CHANGES; + + return new IterationResult<>(Collections.emptyList(), getPosition(), false); + } + /* * Parses the result and creates a stream of indexable documents * @@ -197,43 +297,128 @@ public abstract class DataFrameIndexer extends AsyncTwoPhaseIndexer>> listener) { - - ActionListener>> wrappedListener = ActionListener.wrap( - r -> { - this.inProgressOrLastCheckpoint = newCheckpoint; - this.changedBuckets = r; - listener.onResponse(r); - }, - listener::onFailure - ); - // initialize the map of changed buckets, the map might be empty if source do not require/implement - // changed bucket detection - Map> keys = pivot.initialIncrementalBucketUpdateMap(); - if (keys.isEmpty()) { - logger.trace("This data frame does not implement changed bucket detection, returning"); - wrappedListener.onResponse(null); - return; + private RunState determineRunStateAtStart() { + // either 1st run or not a continuous data frame + if (nextCheckpoint.getCheckpoint() == 1 || isContinuous() == false) { + return RunState.FULL_RUN; } - SearchRequest searchRequest = new SearchRequest(getConfig().getSource().getIndex()); - SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); - - // we do not need the sub-aggs - CompositeAggregationBuilder changesAgg = pivot.buildIncrementalBucketUpdateAggregation(pageSize); - sourceBuilder.aggregation(changesAgg); - sourceBuilder.size(0); - - QueryBuilder pivotQueryBuilder = getConfig().getSource().getQueryConfig().getQuery(); - - DataFrameTransformConfig config = getConfig(); - if (config.getSyncConfig() != null) { - BoolQueryBuilder filteredQuery = new BoolQueryBuilder(). - filter(pivotQueryBuilder). 
- filter(config.getSyncConfig().getRangeQuery(oldCheckpoint, newCheckpoint)); - - logger.trace("Gathering changes using query {}", filteredQuery); - sourceBuilder.query(filteredQuery); - } else { - logger.trace("No sync configured"); - wrappedListener.onResponse(null); - return; + // if incremental update is not supported, do a full run + if (pivot.supportsIncrementalBucketUpdate() == false) { + return RunState.FULL_RUN; } - searchRequest.source(sourceBuilder); - searchRequest.allowPartialSearchResults(false); - - collectChangedBuckets(searchRequest, changesAgg, keys, ActionListener.wrap(wrappedListener::onResponse, e -> { - // fall back if bucket collection failed - logger.error("Failed to retrieve changed buckets, fall back to complete retrieval", e); - wrappedListener.onResponse(null); - })); - } - - void collectChangedBuckets(SearchRequest searchRequest, CompositeAggregationBuilder changesAgg, Map> keys, - ActionListener>> finalListener) { - - // re-using the existing search hook - doNextSearch(searchRequest, ActionListener.wrap(searchResponse -> { - final CompositeAggregation agg = searchResponse.getAggregations().get(COMPOSITE_AGGREGATION_NAME); - - agg.getBuckets().stream().forEach(bucket -> { - bucket.getKey().forEach((k, v) -> { - keys.get(k).add(v.toString()); - }); - }); - - if (agg.getBuckets().isEmpty()) { - finalListener.onResponse(keys); - } else { - // adjust the after key - changesAgg.aggregateAfter(agg.afterKey()); - collectChangedBuckets(searchRequest, changesAgg, keys, finalListener); - } - }, finalListener::onFailure)); + // continuous mode: we need to get the changed buckets first + return RunState.PARTIAL_RUN_IDENTIFY_CHANGES; } /** diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformPersistentTasksExecutor.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformPersistentTasksExecutor.java index 1836c5e13b0..6942d09cd0e 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformPersistentTasksExecutor.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformPersistentTasksExecutor.java @@ -8,6 +8,7 @@ package org.elasticsearch.xpack.dataframe.transforms; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.apache.lucene.util.SetOnce; import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.LatchedActionListener; @@ -127,14 +128,53 @@ public class DataFrameTransformPersistentTasksExecutor extends PersistentTasksEx .setTransformsCheckpointService(dataFrameTransformsCheckpointService) .setTransformsConfigManager(transformsConfigManager); + final SetOnce stateHolder = new SetOnce<>(); + ActionListener startTaskListener = ActionListener.wrap( response -> logger.info("Successfully completed and scheduled task in node operation"), failure -> logger.error("Failed to start task ["+ transformId +"] in node operation", failure) ); - Long previousCheckpoint = transformPTaskState != null ? 
transformPTaskState.getCheckpoint() : null; + // <5> load next checkpoint + ActionListener getTransformNextCheckpointListener = ActionListener.wrap( + nextCheckpoint -> { + indexerBuilder.setNextCheckpoint(nextCheckpoint); - // <4> Set the previous stats (if they exist), initialize the indexer, start the task (If it is STOPPED) + final long lastCheckpoint = stateHolder.get().getCheckpoint(); + + logger.trace("[{}] No next checkpoint found, starting the task", transformId); + startTask(buildTask, indexerBuilder, lastCheckpoint, startTaskListener); + }, + error -> { + // TODO: do not use the same error message as for loading the last checkpoint + String msg = DataFrameMessages.getMessage(DataFrameMessages.FAILED_TO_LOAD_TRANSFORM_CHECKPOINT, transformId); + logger.error(msg, error); + markAsFailed(buildTask, msg); + } + ); + + // <4> load last checkpoint + ActionListener getTransformLastCheckpointListener = ActionListener.wrap( + lastCheckpoint -> { + indexerBuilder.setLastCheckpoint(lastCheckpoint); + + final long nextCheckpoint = stateHolder.get().getInProgressCheckpoint(); + + if (nextCheckpoint > 0) { + transformsConfigManager.getTransformCheckpoint(transformId, nextCheckpoint, getTransformNextCheckpointListener); + } else { + logger.trace("[{}] No next checkpoint found, starting the task", transformId); + startTask(buildTask, indexerBuilder, lastCheckpoint.getCheckpoint(), startTaskListener); + } + }, + error -> { + String msg = DataFrameMessages.getMessage(DataFrameMessages.FAILED_TO_LOAD_TRANSFORM_CHECKPOINT, transformId); + logger.error(msg, error); + markAsFailed(buildTask, msg); + } + ); + + // <3> Set the previous stats (if they exist), initialize the indexer, start the task (If it is STOPPED) // Since we don't create the task until `_start` is called, if we see that the task state is stopped, attempt to start // Schedule execution regardless ActionListener transformStatsActionListener = ActionListener.wrap( @@ -149,27 +189,26 @@ public class DataFrameTransformPersistentTasksExecutor extends PersistentTasksEx stateAndStats.getTransformState(), stateAndStats.getTransformState().getPosition()); - final Long checkpoint = stateAndStats.getTransformState().getCheckpoint(); - startTask(buildTask, indexerBuilder, checkpoint, startTaskListener); + stateHolder.set(stateAndStats.getTransformState()); + final long lastCheckpoint = stateHolder.get().getCheckpoint(); + + if (lastCheckpoint == 0) { + logger.trace("[{}] No checkpoint found, starting the task", transformId); + startTask(buildTask, indexerBuilder, lastCheckpoint, startTaskListener); + } else { + logger.trace ("[{}] Restore last checkpoint: [{}]", transformId, lastCheckpoint); + transformsConfigManager.getTransformCheckpoint(transformId, lastCheckpoint, getTransformLastCheckpointListener); + } }, error -> { if (error instanceof ResourceNotFoundException == false) { - logger.warn("Unable to load previously persisted statistics for transform [" + params.getId() + "]", error); + String msg = DataFrameMessages.getMessage(DataFrameMessages.FAILED_TO_LOAD_TRANSFORM_STATE, transformId); + logger.error(msg, error); + markAsFailed(buildTask, msg); } - startTask(buildTask, indexerBuilder, previousCheckpoint, startTaskListener); - } - ); - // <3> set the in progress checkpoint for the indexer, get the in progress checkpoint - ActionListener getTransformCheckpointListener = ActionListener.wrap( - cp -> { - indexerBuilder.setInProgressOrLastCheckpoint(cp); - transformsConfigManager.getTransformStats(transformId, transformStatsActionListener); - 
}, - error -> { - String msg = DataFrameMessages.getMessage(DataFrameMessages.FAILED_TO_LOAD_TRANSFORM_CHECKPOINT, transformId); - logger.error(msg, error); - markAsFailed(buildTask, msg); + logger.trace("[{}] No stats found(new transform), starting the task", transformId); + startTask(buildTask, indexerBuilder, null, startTaskListener); } ); @@ -177,17 +216,7 @@ public class DataFrameTransformPersistentTasksExecutor extends PersistentTasksEx ActionListener> getFieldMappingsListener = ActionListener.wrap( fieldMappings -> { indexerBuilder.setFieldMappings(fieldMappings); - - long inProgressCheckpoint = transformPTaskState == null ? 0L : - Math.max(transformPTaskState.getCheckpoint(), transformPTaskState.getInProgressCheckpoint()); - - logger.debug("Restore in progress or last checkpoint: {}", inProgressCheckpoint); - - if (inProgressCheckpoint == 0) { - getTransformCheckpointListener.onResponse(DataFrameTransformCheckpoint.EMPTY); - } else { - transformsConfigManager.getTransformCheckpoint(transformId, inProgressCheckpoint, getTransformCheckpointListener); - } + transformsConfigManager.getTransformStats(transformId, transformStatsActionListener); }, error -> { String msg = DataFrameMessages.getMessage(DataFrameMessages.DATA_FRAME_UNABLE_TO_GATHER_FIELD_MAPPINGS, diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java index 57cb468fdd8..8542222e4bd 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java @@ -30,6 +30,7 @@ import org.elasticsearch.xpack.core.ClientHelper; import org.elasticsearch.xpack.core.dataframe.DataFrameField; import org.elasticsearch.xpack.core.dataframe.action.StartDataFrameTransformTaskAction; import org.elasticsearch.xpack.core.dataframe.action.StartDataFrameTransformTaskAction.Response; +import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameIndexerPosition; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameIndexerTransformStats; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransform; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformCheckpoint; @@ -50,7 +51,6 @@ import org.elasticsearch.xpack.dataframe.transforms.pivot.AggregationResultUtils import java.util.Arrays; import java.util.Map; -import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -72,7 +72,7 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S private final SchedulerEngine schedulerEngine; private final ThreadPool threadPool; private final DataFrameAuditor auditor; - private final Map initialPosition; + private final DataFrameIndexerPosition initialPosition; private final IndexerState initialIndexerState; private final SetOnce indexer = new SetOnce<>(); @@ -95,7 +95,7 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S DataFrameTransformTaskState initialTaskState = DataFrameTransformTaskState.STOPPED; String initialReason = null; long initialGeneration = 0; - Map initialPosition = null; + DataFrameIndexerPosition initialPosition = null; if (state != null) { initialTaskState = state.getTaskState(); 
             initialReason = state.getReason();
@@ -383,9 +383,10 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
         private DataFrameTransformConfig transformConfig;
         private DataFrameIndexerTransformStats initialStats;
         private IndexerState indexerState = IndexerState.STOPPED;
-        private Map<String, Object> initialPosition;
+        private DataFrameIndexerPosition initialPosition;
         private DataFrameTransformProgress progress;
-        private DataFrameTransformCheckpoint inProgressOrLastCheckpoint;
+        private DataFrameTransformCheckpoint lastCheckpoint;
+        private DataFrameTransformCheckpoint nextCheckpoint;
 
         ClientDataFrameIndexerBuilder(String transformId) {
             this.transformId = transformId;
@@ -404,7 +405,8 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
                 this.transformConfig,
                 this.fieldMappings,
                 this.progress,
-                this.inProgressOrLastCheckpoint,
+                this.lastCheckpoint,
+                this.nextCheckpoint,
                 parentTask);
         }
 
@@ -457,7 +459,7 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
             return this;
         }
 
-        ClientDataFrameIndexerBuilder setInitialPosition(Map<String, Object> initialPosition) {
+        ClientDataFrameIndexerBuilder setInitialPosition(DataFrameIndexerPosition initialPosition) {
             this.initialPosition = initialPosition;
             return this;
         }
@@ -467,8 +469,13 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
             return this;
         }
 
-        ClientDataFrameIndexerBuilder setInProgressOrLastCheckpoint(DataFrameTransformCheckpoint inProgressOrLastCheckpoint) {
-            this.inProgressOrLastCheckpoint = inProgressOrLastCheckpoint;
+        ClientDataFrameIndexerBuilder setLastCheckpoint(DataFrameTransformCheckpoint lastCheckpoint) {
+            this.lastCheckpoint = lastCheckpoint;
+            return this;
+        }
+
+        ClientDataFrameIndexerBuilder setNextCheckpoint(DataFrameTransformCheckpoint nextCheckpoint) {
+            this.nextCheckpoint = nextCheckpoint;
             return this;
         }
     }
@@ -491,14 +498,15 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
                                 DataFrameTransformsConfigManager transformsConfigManager,
                                 DataFrameTransformsCheckpointService transformsCheckpointService,
                                 AtomicReference<IndexerState> initialState,
-                                Map<String, Object> initialPosition,
+                                DataFrameIndexerPosition initialPosition,
                                 Client client,
                                 DataFrameAuditor auditor,
                                 DataFrameIndexerTransformStats initialStats,
                                 DataFrameTransformConfig transformConfig,
                                 Map<String, String> fieldMappings,
                                 DataFrameTransformProgress transformProgress,
-                                DataFrameTransformCheckpoint inProgressOrLastCheckpoint,
+                                DataFrameTransformCheckpoint lastCheckpoint,
+                                DataFrameTransformCheckpoint nextCheckpoint,
                                 DataFrameTransformTask parentTask) {
             super(ExceptionsHelper.requireNonNull(parentTask, "parentTask")
                 .threadPool
@@ -510,7 +518,8 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
                 initialPosition,
                 initialStats == null ? new DataFrameIndexerTransformStats(transformId) : initialStats,
                 transformProgress,
-                inProgressOrLastCheckpoint);
+                lastCheckpoint,
+                nextCheckpoint);
             this.transformId = ExceptionsHelper.requireNonNull(transformId, "transformId");
             this.transformsConfigManager = ExceptionsHelper.requireNonNull(transformsConfigManager, "transformsConfigManager");
             this.transformsCheckpointService = ExceptionsHelper.requireNonNull(transformsCheckpointService,
@@ -526,9 +535,9 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
            // Since multiple checkpoints can be executed in the task while it is running on the same node, we need to gather
            // the progress here, and not in the executor.
            if (initialRun()) {
-                ActionListener<Map<String, Set<String>>> changedBucketsListener = ActionListener.wrap(
-                    r -> {
-                        TransformProgressGatherer.getInitialProgress(this.client, buildFilterQuery(), getConfig(), ActionListener.wrap(
+                createCheckpoint(ActionListener.wrap(cp -> {
+                    nextCheckpoint = cp;
+                    TransformProgressGatherer.getInitialProgress(this.client, buildFilterQuery(), getConfig(), ActionListener.wrap(
                            newProgress -> {
                                logger.trace("[{}] reset the progress from [{}] to [{}]", transformId, progress, newProgress);
                                progress = newProgress;
@@ -540,20 +549,6 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
                                super.onStart(now, listener);
                            }
                        ));
-                    },
-                    listener::onFailure
-                );
-
-                createCheckpoint(ActionListener.wrap(cp -> {
-                    DataFrameTransformCheckpoint oldCheckpoint = inProgressOrLastCheckpoint;
-                    if (oldCheckpoint.isEmpty()) {
-                        // this is the 1st run, accept the new in progress checkpoint and go on
-                        inProgressOrLastCheckpoint = cp;
-                        changedBucketsListener.onResponse(null);
-                    } else {
-                        logger.debug ("Getting changes from {} to {}", oldCheckpoint.getTimeUpperBound(), cp.getTimeUpperBound());
-                        getChangedBuckets(oldCheckpoint, cp, changedBucketsListener);
-                    }
                }, listener::onFailure));
            } else {
                super.onStart(now, listener);
@@ -615,7 +610,7 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
        }
 
        @Override
-        protected void doSaveState(IndexerState indexerState, Map<String, Object> position, Runnable next) {
+        protected void doSaveState(IndexerState indexerState, DataFrameIndexerPosition position, Runnable next) {
            if (indexerState.equals(IndexerState.ABORTING)) {
                // If we're aborting, just invoke `next` (which is likely an onFailure handler)
                next.run();
@@ -698,8 +693,12 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
        @Override
        protected void onFinish(ActionListener<Void> listener) {
            try {
+                // TODO: needs cleanup super is called with a listener, but listener.onResponse is called below
+                // super.onFinish() fortunately ignores the listener
                super.onFinish(listener);
                long checkpoint = transformTask.currentCheckpoint.getAndIncrement();
+                lastCheckpoint = nextCheckpoint;
+                nextCheckpoint = null;
                // Reset our failure count as we have finished and may start again with a new checkpoint
                failureCount.set(0);
                if (checkpoint % ON_FINISH_AUDIT_FREQUENCY == 0) {
@@ -756,7 +755,7 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
            SetOnce<Boolean> changed = new SetOnce<>();
            transformsCheckpointService.getCheckpoint(transformConfig, new LatchedActionListener<>(ActionListener.wrap(
                cp -> {
-                    long behind = DataFrameTransformCheckpoint.getBehind(inProgressOrLastCheckpoint, cp);
+                    long behind = DataFrameTransformCheckpoint.getBehind(lastCheckpoint, cp);
                    if (behind > 0) {
                        logger.debug("Detected changes, dest is {} operations behind the source", behind);
                        changed.set(true);
diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/pivot/Pivot.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/pivot/Pivot.java
index 0c0104603f9..2ecebb915c2 100644
--- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/pivot/Pivot.java
+++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/pivot/Pivot.java
@@ -51,6 +51,7 @@ public class Pivot {
    private static final Logger logger = LogManager.getLogger(Pivot.class);
 
    private final PivotConfig config;
+    private final boolean supportsIncrementalBucketUpdate;
 
    // objects for re-using
    private final CompositeAggregationBuilder cachedCompositeAggregation;
@@ -58,6 +59,13 @@ public class Pivot {
    public Pivot(PivotConfig config) {
        this.config = config;
        this.cachedCompositeAggregation = createCompositeAggregation(config);
+
+        boolean supportsIncrementalBucketUpdate = false;
+        for(Entry<String, SingleGroupSource> entry: config.getGroupConfig().getGroups().entrySet()) {
+            supportsIncrementalBucketUpdate |= entry.getValue().supportsIncrementalBucketUpdate();
+        }
+
+        this.supportsIncrementalBucketUpdate = supportsIncrementalBucketUpdate;
    }
 
    public void validate(Client client, SourceConfig sourceConfig, final ActionListener<Boolean> listener) {
@@ -135,6 +143,10 @@ public class Pivot {
        return changedBuckets;
    }
 
+    public boolean supportsIncrementalBucketUpdate() {
+        return supportsIncrementalBucketUpdate;
+    }
+
    public Stream<Map<String, Object>> extractResults(CompositeAggregation agg,
                                                      Map<String, String> fieldTypeMap,
                                                      DataFrameIndexerTransformStats dataFrameIndexerTransformStats) {
diff --git a/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameIndexerTests.java b/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameIndexerTests.java
index 03a34b01334..154588443cb 100644
--- a/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameIndexerTests.java
+++ b/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameIndexerTests.java
@@ -21,6 +21,7 @@ import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.concurrent.ThreadContext;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.threadpool.ThreadPool;
+import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameIndexerPosition;
 import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameIndexerTransformStats;
 import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformCheckpoint;
 import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig;
@@ -32,6 +33,8 @@ import org.elasticsearch.xpack.dataframe.notifications.DataFrameAuditor;
 import org.elasticsearch.xpack.dataframe.transforms.pivot.Pivot;
 import org.junit.Before;
 
+import java.io.PrintWriter;
+import java.io.StringWriter;
 import java.util.Collections;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
@@ -68,13 +71,13 @@ public class DataFrameIndexerTests extends ESTestCase {
                               Map<String, String> fieldMappings,
                               DataFrameAuditor auditor,
                               AtomicReference<IndexerState> initialState,
-                               Map<String, Object> initialPosition,
+                               DataFrameIndexerPosition initialPosition,
                               DataFrameIndexerTransformStats jobStats,
                               Function<SearchRequest, SearchResponse> searchFunction,
                               Function<BulkRequest, BulkResponse> bulkFunction,
                               Consumer<Exception> failureConsumer) {
            super(executor, auditor, transformConfig, fieldMappings, initialState, initialPosition, jobStats,
-                /* DataFrameTransformProgress */ null, DataFrameTransformCheckpoint.EMPTY);
+                /* DataFrameTransformProgress */ null, DataFrameTransformCheckpoint.EMPTY, DataFrameTransformCheckpoint.EMPTY);
            this.searchFunction = searchFunction;
            this.bulkFunction = bulkFunction;
            this.failureConsumer = failureConsumer;
@@ -129,7 +132,7 @@ public class DataFrameIndexerTests extends ESTestCase {
        }
 
        @Override
-        protected void doSaveState(IndexerState state, Map<String, Object> position, Runnable next) {
+        protected void doSaveState(IndexerState state, DataFrameIndexerPosition position, Runnable next) {
            assert state == IndexerState.STARTED || state == IndexerState.INDEXING || state == IndexerState.STOPPED;
            next.run();
        }
@@ -198,7 +201,12 @@ public class DataFrameIndexerTests extends ESTestCase {
 
        Function<BulkRequest, BulkResponse> bulkFunction = bulkRequest -> new BulkResponse(new BulkItemResponse[0], 100);
 
-        Consumer<Exception> failureConsumer = e -> fail("expected circuit breaker exception to be handled");
+        Consumer<Exception> failureConsumer = e -> {
+            final StringWriter sw = new StringWriter();
+            final PrintWriter pw = new PrintWriter(sw, true);
+            e.printStackTrace(pw);
+            fail("expected circuit breaker exception to be handled, got:" + e + " Trace: " + sw.getBuffer().toString());
+        };
 
        final ExecutorService executor = Executors.newFixedThreadPool(1);
        try {