[7.x][ML-DataFrame] Rewrite continuous logic to prevent terms count limit (#44287)

Rewrites how continuous data frame transforms calculates and handles buckets that require an update. Instead of storing the whole set in memory, it pages through the updates using a 2nd cursor. This lowers memory consumption and prevents problems with limits at query time (max_terms_count). The list of updates can be re-retrieved in a failure case (#43662)
This commit is contained in:
Hendrik Muhs 2019-07-13 06:58:04 +02:00 committed by GitHub
parent 1dcf53465c
commit 684b562381
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
19 changed files with 907 additions and 277 deletions

View File

@ -163,8 +163,8 @@ task verifyVersions {
* after the backport of the backcompat code is complete.
*/
boolean bwc_tests_enabled = true
final String bwc_tests_disabled_issue = "" /* place a PR link here when committing bwc changes */
boolean bwc_tests_enabled = false
final String bwc_tests_disabled_issue = "https://github.com/elastic/elasticsearch/pull/44219" /* place a PR link here when committing bwc changes */
if (bwc_tests_enabled == false) {
if (bwc_tests_disabled_issue.isEmpty()) {
throw new GradleException("bwc_tests_disabled_issue must be set when bwc_tests_enabled == false")

View File

@ -0,0 +1,99 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.client.dataframe.transforms;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ObjectParser.ValueType;
import org.elasticsearch.common.xcontent.XContentParser;
import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg;
/**
* Holds state of the cursors:
*
* indexer_position: the position of the indexer querying the source
* bucket_position: the position used for identifying changes
*/
public class DataFrameIndexerPosition {
public static final ParseField INDEXER_POSITION = new ParseField("indexer_position");
public static final ParseField BUCKET_POSITION = new ParseField("bucket_position");
private final Map<String, Object> indexerPosition;
private final Map<String, Object> bucketPosition;
@SuppressWarnings("unchecked")
public static final ConstructingObjectParser<DataFrameIndexerPosition, Void> PARSER = new ConstructingObjectParser<>(
"data_frame_indexer_position",
true,
args -> new DataFrameIndexerPosition((Map<String, Object>) args[0],(Map<String, Object>) args[1]));
static {
PARSER.declareField(optionalConstructorArg(), XContentParser::mapOrdered, INDEXER_POSITION, ValueType.OBJECT);
PARSER.declareField(optionalConstructorArg(), XContentParser::mapOrdered, BUCKET_POSITION, ValueType.OBJECT);
}
public DataFrameIndexerPosition(Map<String, Object> indexerPosition, Map<String, Object> bucketPosition) {
this.indexerPosition = indexerPosition == null ? null : Collections.unmodifiableMap(indexerPosition);
this.bucketPosition = bucketPosition == null ? null : Collections.unmodifiableMap(bucketPosition);
}
public Map<String, Object> getIndexerPosition() {
return indexerPosition;
}
public Map<String, Object> getBucketsPosition() {
return bucketPosition;
}
@Override
public boolean equals(Object other) {
if (this == other) {
return true;
}
if (other == null || getClass() != other.getClass()) {
return false;
}
DataFrameIndexerPosition that = (DataFrameIndexerPosition) other;
return Objects.equals(this.indexerPosition, that.indexerPosition) &&
Objects.equals(this.bucketPosition, that.bucketPosition);
}
@Override
public int hashCode() {
return Objects.hash(indexerPosition, bucketPosition);
}
public static DataFrameIndexerPosition fromXContent(XContentParser parser) {
try {
return PARSER.parse(parser, null);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}

View File

@ -27,8 +27,6 @@ import org.elasticsearch.common.xcontent.ObjectParser.ValueType;
import org.elasticsearch.common.xcontent.XContentParser;
import java.io.IOException;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;
@ -39,7 +37,10 @@ public class DataFrameTransformState {
private static final ParseField INDEXER_STATE = new ParseField("indexer_state");
private static final ParseField TASK_STATE = new ParseField("task_state");
// 7.3 BWC: current_position only exists in 7.2. In 7.3+ it is replaced by position.
private static final ParseField CURRENT_POSITION = new ParseField("current_position");
private static final ParseField POSITION = new ParseField("position");
private static final ParseField CHECKPOINT = new ParseField("checkpoint");
private static final ParseField REASON = new ParseField("reason");
private static final ParseField PROGRESS = new ParseField("progress");
@ -48,18 +49,31 @@ public class DataFrameTransformState {
@SuppressWarnings("unchecked")
public static final ConstructingObjectParser<DataFrameTransformState, Void> PARSER =
new ConstructingObjectParser<>("data_frame_transform_state", true,
args -> new DataFrameTransformState((DataFrameTransformTaskState) args[0],
(IndexerState) args[1],
(Map<String, Object>) args[2],
(long) args[3],
(String) args[4],
(DataFrameTransformProgress) args[5],
(NodeAttributes) args[6]));
args -> {
DataFrameTransformTaskState taskState = (DataFrameTransformTaskState) args[0];
IndexerState indexerState = (IndexerState) args[1];
Map<String, Object> bwcCurrentPosition = (Map<String, Object>) args[2];
DataFrameIndexerPosition dataFrameIndexerPosition = (DataFrameIndexerPosition) args[3];
// BWC handling, translate current_position to position iff position isn't set
if (bwcCurrentPosition != null && dataFrameIndexerPosition == null) {
dataFrameIndexerPosition = new DataFrameIndexerPosition(bwcCurrentPosition, null);
}
long checkpoint = (long) args[4];
String reason = (String) args[5];
DataFrameTransformProgress progress = (DataFrameTransformProgress) args[6];
NodeAttributes node = (NodeAttributes) args[7];
return new DataFrameTransformState(taskState, indexerState, dataFrameIndexerPosition, checkpoint, reason, progress,
node);
});
static {
PARSER.declareField(constructorArg(), p -> DataFrameTransformTaskState.fromString(p.text()), TASK_STATE, ValueType.STRING);
PARSER.declareField(constructorArg(), p -> IndexerState.fromString(p.text()), INDEXER_STATE, ValueType.STRING);
PARSER.declareField(optionalConstructorArg(), (p, c) -> p.mapOrdered(), CURRENT_POSITION, ValueType.OBJECT);
PARSER.declareField(optionalConstructorArg(), DataFrameIndexerPosition::fromXContent, POSITION, ValueType.OBJECT);
PARSER.declareLong(ConstructingObjectParser.optionalConstructorArg(), CHECKPOINT);
PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), REASON);
PARSER.declareField(optionalConstructorArg(), DataFrameTransformProgress::fromXContent, PROGRESS, ValueType.OBJECT);
@ -73,21 +87,21 @@ public class DataFrameTransformState {
private final DataFrameTransformTaskState taskState;
private final IndexerState indexerState;
private final long checkpoint;
private final Map<String, Object> currentPosition;
private final DataFrameIndexerPosition position;
private final String reason;
private final DataFrameTransformProgress progress;
private final NodeAttributes node;
public DataFrameTransformState(DataFrameTransformTaskState taskState,
IndexerState indexerState,
@Nullable Map<String, Object> position,
@Nullable DataFrameIndexerPosition position,
long checkpoint,
@Nullable String reason,
@Nullable DataFrameTransformProgress progress,
@Nullable NodeAttributes node) {
this.taskState = taskState;
this.indexerState = indexerState;
this.currentPosition = position == null ? null : Collections.unmodifiableMap(new LinkedHashMap<>(position));
this.position = position;
this.checkpoint = checkpoint;
this.reason = reason;
this.progress = progress;
@ -103,8 +117,8 @@ public class DataFrameTransformState {
}
@Nullable
public Map<String, Object> getPosition() {
return currentPosition;
public DataFrameIndexerPosition getPosition() {
return position;
}
public long getCheckpoint() {
@ -140,7 +154,7 @@ public class DataFrameTransformState {
return Objects.equals(this.taskState, that.taskState) &&
Objects.equals(this.indexerState, that.indexerState) &&
Objects.equals(this.currentPosition, that.currentPosition) &&
Objects.equals(this.position, that.position) &&
Objects.equals(this.progress, that.progress) &&
this.checkpoint == that.checkpoint &&
Objects.equals(this.node, that.node) &&
@ -149,7 +163,7 @@ public class DataFrameTransformState {
@Override
public int hashCode() {
return Objects.hash(taskState, indexerState, currentPosition, checkpoint, reason, progress, node);
return Objects.hash(taskState, indexerState, position, checkpoint, reason, progress, node);
}
}

View File

@ -0,0 +1,76 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.client.dataframe.transforms;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.test.ESTestCase;
import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.Map;
import static org.elasticsearch.test.AbstractXContentTestCase.xContentTester;
public class DataFrameIndexerPositionTests extends ESTestCase {
public void testFromXContent() throws IOException {
xContentTester(this::createParser,
DataFrameIndexerPositionTests::randomDataFrameIndexerPosition,
DataFrameIndexerPositionTests::toXContent,
DataFrameIndexerPosition::fromXContent)
.supportsUnknownFields(true)
.randomFieldsExcludeFilter(field -> field.equals("indexer_position") ||
field.equals("bucket_position"))
.test();
}
public static DataFrameIndexerPosition randomDataFrameIndexerPosition() {
return new DataFrameIndexerPosition(randomPositionMap(), randomPositionMap());
}
public static void toXContent(DataFrameIndexerPosition position, XContentBuilder builder) throws IOException {
builder.startObject();
if (position.getIndexerPosition() != null) {
builder.field("indexer_position", position.getIndexerPosition());
}
if (position.getBucketsPosition() != null) {
builder.field("bucket_position", position.getBucketsPosition());
}
builder.endObject();
}
private static Map<String, Object> randomPositionMap() {
if (randomBoolean()) {
return null;
}
int numFields = randomIntBetween(1, 5);
Map<String, Object> position = new LinkedHashMap<>();
for (int i = 0; i < numFields; i++) {
Object value;
if (randomBoolean()) {
value = randomLong();
} else {
value = randomAlphaOfLengthBetween(1, 10);
}
position.put(randomAlphaOfLengthBetween(3, 10), value);
}
return position;
}
}

View File

@ -25,8 +25,6 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.test.ESTestCase;
import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.Map;
import static org.elasticsearch.test.AbstractXContentTestCase.xContentTester;
@ -38,7 +36,8 @@ public class DataFrameTransformStateTests extends ESTestCase {
DataFrameTransformStateTests::toXContent,
DataFrameTransformState::fromXContent)
.supportsUnknownFields(true)
.randomFieldsExcludeFilter(field -> field.equals("current_position") ||
.randomFieldsExcludeFilter(field -> field.equals("position.indexer_position") ||
field.equals("position.bucket_position") ||
field.equals("node.attributes"))
.test();
}
@ -46,7 +45,7 @@ public class DataFrameTransformStateTests extends ESTestCase {
public static DataFrameTransformState randomDataFrameTransformState() {
return new DataFrameTransformState(randomFrom(DataFrameTransformTaskState.values()),
randomFrom(IndexerState.values()),
randomPositionMap(),
randomBoolean() ? null : DataFrameIndexerPositionTests.randomDataFrameIndexerPosition(),
randomLongBetween(0,10),
randomBoolean() ? null : randomAlphaOfLength(10),
randomBoolean() ? null : DataFrameTransformProgressTests.randomInstance(),
@ -58,7 +57,8 @@ public class DataFrameTransformStateTests extends ESTestCase {
builder.field("task_state", state.getTaskState().value());
builder.field("indexer_state", state.getIndexerState().value());
if (state.getPosition() != null) {
builder.field("current_position", state.getPosition());
builder.field("position");
DataFrameIndexerPositionTests.toXContent(state.getPosition(), builder);
}
builder.field("checkpoint", state.getCheckpoint());
if (state.getReason() != null) {
@ -75,21 +75,4 @@ public class DataFrameTransformStateTests extends ESTestCase {
builder.endObject();
}
private static Map<String, Object> randomPositionMap() {
if (randomBoolean()) {
return null;
}
int numFields = randomIntBetween(1, 5);
Map<String, Object> position = new LinkedHashMap<>();
for (int i = 0; i < numFields; i++) {
Object value;
if (randomBoolean()) {
value = randomLong();
} else {
value = randomAlphaOfLengthBetween(1, 10);
}
position.put(randomAlphaOfLengthBetween(3, 10), value);
}
return position;
}
}

View File

@ -0,0 +1,77 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.client.dataframe.transforms.hlrc;
import org.elasticsearch.client.AbstractResponseTestCase;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameIndexerPosition;
import java.util.LinkedHashMap;
import java.util.Map;
import static org.hamcrest.Matchers.equalTo;
public class DataFrameIndexerPositionTests extends AbstractResponseTestCase<
DataFrameIndexerPosition,
org.elasticsearch.client.dataframe.transforms.DataFrameIndexerPosition> {
public static DataFrameIndexerPosition fromHlrc(
org.elasticsearch.client.dataframe.transforms.DataFrameIndexerPosition instance) {
if (instance == null) {
return null;
}
return new DataFrameIndexerPosition(instance.getIndexerPosition(), instance.getBucketsPosition());
}
@Override
protected DataFrameIndexerPosition createServerTestInstance() {
return new DataFrameIndexerPosition(randomPositionMap(), randomPositionMap());
}
@Override
protected org.elasticsearch.client.dataframe.transforms.DataFrameIndexerPosition doParseToClientInstance(XContentParser parser) {
return org.elasticsearch.client.dataframe.transforms.DataFrameIndexerPosition.fromXContent(parser);
}
@Override
protected void assertInstances(DataFrameIndexerPosition serverTestInstance,
org.elasticsearch.client.dataframe.transforms.DataFrameIndexerPosition clientInstance) {
assertThat(serverTestInstance.getIndexerPosition(), equalTo(clientInstance.getIndexerPosition()));
assertThat(serverTestInstance.getBucketsPosition(), equalTo(clientInstance.getBucketsPosition()));
}
private static Map<String, Object> randomPositionMap() {
if (randomBoolean()) {
return null;
}
int numFields = randomIntBetween(1, 5);
Map<String, Object> position = new LinkedHashMap<>();
for (int i = 0; i < numFields; i++) {
Object value;
if (randomBoolean()) {
value = randomLong();
} else {
value = randomAlphaOfLengthBetween(1, 10);
}
position.put(randomAlphaOfLengthBetween(3, 10), value);
}
return position;
}
}

View File

@ -19,8 +19,8 @@
package org.elasticsearch.client.dataframe.transforms.hlrc;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.client.AbstractHlrcXContentTestCase;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameIndexerTransformStats;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformStateAndStats;
@ -64,7 +64,10 @@ public class DataFrameTransformStateAndStatsTests extends AbstractHlrcXContentTe
@Override
protected Predicate<String> getRandomFieldsExcludeFilter() {
return field -> field.equals("state.current_position") || field.equals("state.node") || field.equals("state.node.attributes");
return field -> field.equals("state.position.indexer_position") ||
field.equals("state.position.bucket_position") ||
field.equals("state.node") ||
field.equals("state.node.attributes");
}
}

View File

@ -19,8 +19,9 @@
package org.elasticsearch.client.dataframe.transforms.hlrc;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.client.AbstractHlrcXContentTestCase;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameIndexerPosition;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameIndexerTransformStats;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformCheckpointStats;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformCheckpointingInfo;
@ -42,7 +43,7 @@ public class DataFrameTransformStateTests extends AbstractHlrcXContentTestCase<D
public static DataFrameTransformState fromHlrc(org.elasticsearch.client.dataframe.transforms.DataFrameTransformState instance) {
return new DataFrameTransformState(DataFrameTransformTaskState.fromString(instance.getTaskState().value()),
IndexerState.fromString(instance.getIndexerState().value()),
instance.getPosition(),
DataFrameIndexerPositionTests.fromHlrc(instance.getPosition()),
instance.getCheckpoint(),
instance.getReason(),
DataFrameTransformProgressTests.fromHlrc(instance.getProgress()),
@ -85,7 +86,9 @@ public class DataFrameTransformStateTests extends AbstractHlrcXContentTestCase<D
@Override
protected Predicate<String> getRandomFieldsExcludeFilter() {
return field -> field.equals("current_position") || field.equals("node.attributes");
return field -> field.equals("position.indexer_position") ||
field.equals("position.bucket_position") ||
field.equals("node.attributes");
}
public static DataFrameTransformStateAndStats randomDataFrameTransformStateAndStats(String id) {
@ -95,6 +98,10 @@ public class DataFrameTransformStateTests extends AbstractHlrcXContentTestCase<D
randomDataFrameTransformCheckpointingInfo());
}
public static DataFrameIndexerPosition randomDataFrameIndexerPosition() {
return new DataFrameIndexerPosition(randomPosition(), randomPosition());
}
public static DataFrameTransformCheckpointingInfo randomDataFrameTransformCheckpointingInfo() {
return new DataFrameTransformCheckpointingInfo(randomDataFrameTransformCheckpointStats(),
randomDataFrameTransformCheckpointStats(), randomNonNegativeLong());
@ -134,7 +141,7 @@ public class DataFrameTransformStateTests extends AbstractHlrcXContentTestCase<D
public static DataFrameTransformState randomDataFrameTransformState() {
return new DataFrameTransformState(randomFrom(DataFrameTransformTaskState.values()),
randomFrom(IndexerState.values()),
randomPosition(),
randomDataFrameIndexerPosition(),
randomLongBetween(0,10),
randomBoolean() ? null : randomAlphaOfLength(10),
randomBoolean() ? null : randomDataFrameTransformProgress(),

View File

@ -43,7 +43,9 @@ public class DataFrameMessages {
public static final String FAILED_TO_PARSE_TRANSFORM_STATISTICS_CONFIGURATION =
"Failed to parse transform statistics for data frame transform [{0}]";
public static final String FAILED_TO_LOAD_TRANSFORM_CHECKPOINT =
"Failed to load data frame transform configuration for transform [{0}]";
"Failed to load data frame transform checkpoint for transform [{0}]";
public static final String FAILED_TO_LOAD_TRANSFORM_STATE =
"Failed to load data frame transform state for transform [{0}]";
public static final String DATA_FRAME_TRANSFORM_CONFIGURATION_NO_TRANSFORM =
"Data frame transform configuration must specify exactly 1 function";
public static final String DATA_FRAME_TRANSFORM_CONFIGURATION_PIVOT_NO_GROUP_BY =

View File

@ -0,0 +1,118 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.core.dataframe.transforms;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ObjectParser.ValueType;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg;
public class DataFrameIndexerPosition implements Writeable, ToXContentObject {
public static final String NAME = "data_frame/indexer_position";
public static final ParseField INDEXER_POSITION = new ParseField("indexer_position");
public static final ParseField BUCKET_POSITION = new ParseField("bucket_position");
private final Map<String, Object> indexerPosition;
private final Map<String, Object> bucketPosition;
@SuppressWarnings("unchecked")
public static final ConstructingObjectParser<DataFrameIndexerPosition, Void> PARSER = new ConstructingObjectParser<>(NAME,
true,
args -> new DataFrameIndexerPosition((Map<String, Object>) args[0],(Map<String, Object>) args[1]));
static {
PARSER.declareField(optionalConstructorArg(), XContentParser::mapOrdered, INDEXER_POSITION, ValueType.OBJECT);
PARSER.declareField(optionalConstructorArg(), XContentParser::mapOrdered, BUCKET_POSITION, ValueType.OBJECT);
}
public DataFrameIndexerPosition(Map<String, Object> indexerPosition, Map<String, Object> bucketPosition) {
this.indexerPosition = indexerPosition == null ? null : Collections.unmodifiableMap(indexerPosition);
this.bucketPosition = bucketPosition == null ? null : Collections.unmodifiableMap(bucketPosition);
}
public DataFrameIndexerPosition(StreamInput in) throws IOException {
Map<String, Object> position = in.readMap();
indexerPosition = position == null ? null : Collections.unmodifiableMap(position);
position = in.readMap();
bucketPosition = position == null ? null : Collections.unmodifiableMap(position);
}
public Map<String, Object> getIndexerPosition() {
return indexerPosition;
}
public Map<String, Object> getBucketsPosition() {
return bucketPosition;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeMap(indexerPosition);
out.writeMap(bucketPosition);
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
if (indexerPosition != null) {
builder.field(INDEXER_POSITION.getPreferredName(), indexerPosition);
}
if (bucketPosition != null) {
builder.field(BUCKET_POSITION.getPreferredName(), bucketPosition);
}
builder.endObject();
return builder;
}
@Override
public boolean equals(Object other) {
if (this == other) {
return true;
}
if (other == null || getClass() != other.getClass()) {
return false;
}
DataFrameIndexerPosition that = (DataFrameIndexerPosition) other;
return Objects.equals(this.indexerPosition, that.indexerPosition) &&
Objects.equals(this.bucketPosition, that.bucketPosition);
}
@Override
public int hashCode() {
return Objects.hash(indexerPosition, bucketPosition);
}
@Override
public String toString() {
return Strings.toString(this);
}
public static DataFrameIndexerPosition fromXContent(XContentParser parser) {
try {
return PARSER.parse(parser, null);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}

View File

@ -22,8 +22,6 @@ import org.elasticsearch.xpack.core.dataframe.DataFrameField;
import org.elasticsearch.xpack.core.indexing.IndexerState;
import java.io.IOException;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;
@ -39,7 +37,7 @@ public class DataFrameTransformState implements Task.Status, PersistentTaskState
private final long checkpoint;
@Nullable
private final Map<String, Object> currentPosition;
private final DataFrameIndexerPosition position;
@Nullable
private final String reason;
@Nullable
@ -47,7 +45,10 @@ public class DataFrameTransformState implements Task.Status, PersistentTaskState
public static final ParseField TASK_STATE = new ParseField("task_state");
public static final ParseField INDEXER_STATE = new ParseField("indexer_state");
// 7.3 BWC: current_position only exists in 7.2. In 7.3+ it is replaced by position.
public static final ParseField CURRENT_POSITION = new ParseField("current_position");
public static final ParseField POSITION = new ParseField("position");
public static final ParseField CHECKPOINT = new ParseField("checkpoint");
public static final ParseField REASON = new ParseField("reason");
public static final ParseField PROGRESS = new ParseField("progress");
@ -56,18 +57,30 @@ public class DataFrameTransformState implements Task.Status, PersistentTaskState
@SuppressWarnings("unchecked")
public static final ConstructingObjectParser<DataFrameTransformState, Void> PARSER = new ConstructingObjectParser<>(NAME,
true,
args -> new DataFrameTransformState((DataFrameTransformTaskState) args[0],
(IndexerState) args[1],
(Map<String, Object>) args[2],
(long) args[3],
(String) args[4],
(DataFrameTransformProgress) args[5],
(NodeAttributes) args[6]));
args -> {
DataFrameTransformTaskState taskState = (DataFrameTransformTaskState) args[0];
IndexerState indexerState = (IndexerState) args[1];
Map<String, Object> bwcCurrentPosition = (Map<String, Object>) args[2];
DataFrameIndexerPosition dataFrameIndexerPosition = (DataFrameIndexerPosition) args[3];
// BWC handling, translate current_position to position iff position isn't set
if (bwcCurrentPosition != null && dataFrameIndexerPosition == null) {
dataFrameIndexerPosition = new DataFrameIndexerPosition(bwcCurrentPosition, null);
}
long checkpoint = (long) args[4];
String reason = (String) args[5];
DataFrameTransformProgress progress = (DataFrameTransformProgress) args[6];
NodeAttributes node = (NodeAttributes) args[7];
return new DataFrameTransformState(taskState, indexerState, dataFrameIndexerPosition, checkpoint, reason, progress, node);
});
static {
PARSER.declareField(constructorArg(), p -> DataFrameTransformTaskState.fromString(p.text()), TASK_STATE, ValueType.STRING);
PARSER.declareField(constructorArg(), p -> IndexerState.fromString(p.text()), INDEXER_STATE, ValueType.STRING);
PARSER.declareField(optionalConstructorArg(), XContentParser::mapOrdered, CURRENT_POSITION, ValueType.OBJECT);
PARSER.declareField(optionalConstructorArg(), DataFrameIndexerPosition::fromXContent, POSITION, ValueType.OBJECT);
PARSER.declareLong(ConstructingObjectParser.optionalConstructorArg(), CHECKPOINT);
PARSER.declareString(optionalConstructorArg(), REASON);
PARSER.declareField(optionalConstructorArg(), DataFrameTransformProgress.PARSER::apply, PROGRESS, ValueType.OBJECT);
@ -76,14 +89,14 @@ public class DataFrameTransformState implements Task.Status, PersistentTaskState
public DataFrameTransformState(DataFrameTransformTaskState taskState,
IndexerState indexerState,
@Nullable Map<String, Object> position,
@Nullable DataFrameIndexerPosition position,
long checkpoint,
@Nullable String reason,
@Nullable DataFrameTransformProgress progress,
@Nullable NodeAttributes node) {
this.taskState = taskState;
this.indexerState = indexerState;
this.currentPosition = position == null ? null : Collections.unmodifiableMap(new LinkedHashMap<>(position));
this.position = position;
this.checkpoint = checkpoint;
this.reason = reason;
this.progress = progress;
@ -92,7 +105,7 @@ public class DataFrameTransformState implements Task.Status, PersistentTaskState
public DataFrameTransformState(DataFrameTransformTaskState taskState,
IndexerState indexerState,
@Nullable Map<String, Object> position,
@Nullable DataFrameIndexerPosition position,
long checkpoint,
@Nullable String reason,
@Nullable DataFrameTransformProgress progress) {
@ -102,8 +115,12 @@ public class DataFrameTransformState implements Task.Status, PersistentTaskState
public DataFrameTransformState(StreamInput in) throws IOException {
taskState = DataFrameTransformTaskState.fromStream(in);
indexerState = IndexerState.fromStream(in);
Map<String, Object> position = in.readMap();
currentPosition = position == null ? null : Collections.unmodifiableMap(position);
if (in.getVersion().onOrAfter(Version.V_7_3_0)) {
position = in.readOptionalWriteable(DataFrameIndexerPosition::new);
} else {
Map<String, Object> pos = in.readMap();
position = new DataFrameIndexerPosition(pos, null);
}
checkpoint = in.readLong();
reason = in.readOptionalString();
progress = in.readOptionalWriteable(DataFrameTransformProgress::new);
@ -122,8 +139,8 @@ public class DataFrameTransformState implements Task.Status, PersistentTaskState
return indexerState;
}
public Map<String, Object> getPosition() {
return currentPosition;
public DataFrameIndexerPosition getPosition() {
return position;
}
public long getCheckpoint() {
@ -169,8 +186,8 @@ public class DataFrameTransformState implements Task.Status, PersistentTaskState
builder.startObject();
builder.field(TASK_STATE.getPreferredName(), taskState.value());
builder.field(INDEXER_STATE.getPreferredName(), indexerState.value());
if (currentPosition != null) {
builder.field(CURRENT_POSITION.getPreferredName(), currentPosition);
if (position != null) {
builder.field(POSITION.getPreferredName(), position);
}
builder.field(CHECKPOINT.getPreferredName(), checkpoint);
if (reason != null) {
@ -195,7 +212,11 @@ public class DataFrameTransformState implements Task.Status, PersistentTaskState
public void writeTo(StreamOutput out) throws IOException {
taskState.writeTo(out);
indexerState.writeTo(out);
out.writeMap(currentPosition);
if (out.getVersion().onOrAfter(Version.V_7_3_0)) {
out.writeOptionalWriteable(position);
} else {
out.writeMap(position != null ? position.getIndexerPosition() : null);
}
out.writeLong(checkpoint);
out.writeOptionalString(reason);
out.writeOptionalWriteable(progress);
@ -218,7 +239,7 @@ public class DataFrameTransformState implements Task.Status, PersistentTaskState
return Objects.equals(this.taskState, that.taskState) &&
Objects.equals(this.indexerState, that.indexerState) &&
Objects.equals(this.currentPosition, that.currentPosition) &&
Objects.equals(this.position, that.position) &&
this.checkpoint == that.checkpoint &&
Objects.equals(this.reason, that.reason) &&
Objects.equals(this.progress, that.progress) &&
@ -227,7 +248,7 @@ public class DataFrameTransformState implements Task.Status, PersistentTaskState
@Override
public int hashCode() {
return Objects.hash(taskState, indexerState, currentPosition, checkpoint, reason, progress, node);
return Objects.hash(taskState, indexerState, position, checkpoint, reason, progress, node);
}
@Override

View File

@ -349,12 +349,12 @@ public abstract class AsyncTwoPhaseIndexer<JobPosition, JobStats extends Indexer
}
final List<IndexRequest> docs = iterationResult.getToIndex();
// an iteration result might return an empty set of documents to be indexed
if (docs.isEmpty() == false) {
final BulkRequest bulkRequest = new BulkRequest();
docs.forEach(bulkRequest::add);
// TODO this might be a valid case, e.g. if implementation filters
assert bulkRequest.requests().size() > 0;
stats.markStartIndexing();
doNextBulk(bulkRequest, ActionListener.wrap(bulkResponse -> {
// TODO we should check items in the response and move after accordingly to
@ -374,6 +374,19 @@ public abstract class AsyncTwoPhaseIndexer<JobPosition, JobStats extends Indexer
onBulkResponse(bulkResponse, newPosition);
}, this::finishWithIndexingFailure));
} else {
// no documents need to be indexed, continue with search
try {
JobPosition newPosition = iterationResult.getPosition();
position.set(newPosition);
ActionListener<SearchResponse> listener = ActionListener.wrap(this::onSearchResponse, this::finishWithSearchFailure);
nextSearch(listener);
} catch (Exception e) {
finishAndSetState();
onFailure(e);
}
}
} catch (Exception e) {
finishWithSearchFailure(e);
}

View File

@ -0,0 +1,67 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.core.dataframe.transforms;
import org.elasticsearch.common.io.stream.Writeable.Reader;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.test.AbstractSerializingTestCase;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Predicate;
public class DataFrameIndexerPositionTests extends AbstractSerializingTestCase<DataFrameIndexerPosition> {
public static DataFrameIndexerPosition randomDataFrameIndexerPosition() {
return new DataFrameIndexerPosition(randomPosition(), randomPosition());
}
@Override
protected DataFrameIndexerPosition createTestInstance() {
return randomDataFrameIndexerPosition();
}
@Override
protected Reader<DataFrameIndexerPosition> instanceReader() {
return DataFrameIndexerPosition::new;
}
@Override
protected boolean supportsUnknownFields() {
return true;
}
@Override
protected Predicate<String> getRandomFieldsExcludeFilter() {
return field -> !field.isEmpty();
}
@Override
protected DataFrameIndexerPosition doParseInstance(XContentParser parser) throws IOException {
return DataFrameIndexerPosition.fromXContent(parser);
}
private static Map<String, Object> randomPosition() {
if (randomBoolean()) {
return null;
}
int numFields = randomIntBetween(1, 5);
Map<String, Object> position = new HashMap<>();
for (int i = 0; i < numFields; i++) {
Object value;
if (randomBoolean()) {
value = randomLong();
} else {
value = randomAlphaOfLengthBetween(1, 10);
}
position.put(randomAlphaOfLengthBetween(3, 10), value);
}
return position;
}
}

View File

@ -12,8 +12,6 @@ import org.elasticsearch.test.AbstractSerializingTestCase;
import org.elasticsearch.xpack.core.indexing.IndexerState;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Predicate;
import static org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformProgressTests.randomDataFrameTransformProgress;
@ -24,7 +22,7 @@ public class DataFrameTransformStateTests extends AbstractSerializingTestCase<Da
public static DataFrameTransformState randomDataFrameTransformState() {
return new DataFrameTransformState(randomFrom(DataFrameTransformTaskState.values()),
randomFrom(IndexerState.values()),
randomPosition(),
DataFrameIndexerPositionTests.randomDataFrameIndexerPosition(),
randomLongBetween(0,10),
randomBoolean() ? null : randomAlphaOfLength(10),
randomBoolean() ? null : randomDataFrameTransformProgress(),
@ -46,24 +44,6 @@ public class DataFrameTransformStateTests extends AbstractSerializingTestCase<Da
return DataFrameTransformState::new;
}
private static Map<String, Object> randomPosition() {
if (randomBoolean()) {
return null;
}
int numFields = randomIntBetween(1, 5);
Map<String, Object> position = new HashMap<>();
for (int i = 0; i < numFields; i++) {
Object value;
if (randomBoolean()) {
value = randomLong();
} else {
value = randomAlphaOfLengthBetween(1, 10);
}
position.put(randomAlphaOfLengthBetween(3, 10), value);
}
return position;
}
@Override
protected boolean supportsUnknownFields() {
return true;

View File

@ -23,6 +23,7 @@ import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregati
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.xpack.core.dataframe.DataFrameField;
import org.elasticsearch.xpack.core.dataframe.DataFrameMessages;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameIndexerPosition;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameIndexerTransformStats;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformCheckpoint;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig;
@ -47,7 +48,23 @@ import java.util.stream.Stream;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
public abstract class DataFrameIndexer extends AsyncTwoPhaseIndexer<Map<String, Object>, DataFrameIndexerTransformStats> {
public abstract class DataFrameIndexer extends AsyncTwoPhaseIndexer<DataFrameIndexerPosition, DataFrameIndexerTransformStats> {
/**
* RunState is an internal (non-persisted) state that controls the internal logic
* which query filters to run and which index requests to send
*/
private enum RunState {
// do a complete query/index, this is used for batch data frames and for bootstraping (1st run)
FULL_RUN,
// Partial run modes in 2 stages:
// identify buckets that have changed
PARTIAL_RUN_IDENTIFY_CHANGES,
// recalculate buckets based on the update list
PARTIAL_RUN_APPLY_CHANGES
}
public static final int MINIMUM_PAGE_SIZE = 10;
public static final String COMPOSITE_AGGREGATION_NAME = "_data_frame";
@ -61,24 +78,34 @@ public abstract class DataFrameIndexer extends AsyncTwoPhaseIndexer<Map<String,
private Pivot pivot;
private int pageSize = 0;
protected volatile DataFrameTransformCheckpoint inProgressOrLastCheckpoint;
protected volatile DataFrameTransformCheckpoint lastCheckpoint;
protected volatile DataFrameTransformCheckpoint nextCheckpoint;
private volatile RunState runState;
// hold information for continuous mode (partial updates)
private volatile Map<String, Set<String>> changedBuckets;
private volatile Map<String, Object> changedBucketsAfterKey;
public DataFrameIndexer(Executor executor,
DataFrameAuditor auditor,
DataFrameTransformConfig transformConfig,
Map<String, String> fieldMappings,
AtomicReference<IndexerState> initialState,
Map<String, Object> initialPosition,
DataFrameIndexerPosition initialPosition,
DataFrameIndexerTransformStats jobStats,
DataFrameTransformProgress transformProgress,
DataFrameTransformCheckpoint inProgressOrLastCheckpoint) {
DataFrameTransformCheckpoint lastCheckpoint,
DataFrameTransformCheckpoint nextCheckpoint) {
super(executor, initialState, initialPosition, jobStats);
this.auditor = Objects.requireNonNull(auditor);
this.transformConfig = ExceptionsHelper.requireNonNull(transformConfig, "transformConfig");
this.fieldMappings = ExceptionsHelper.requireNonNull(fieldMappings, "fieldMappings");
this.progress = transformProgress;
this.inProgressOrLastCheckpoint = inProgressOrLastCheckpoint;
this.lastCheckpoint = lastCheckpoint;
this.nextCheckpoint = nextCheckpoint;
// give runState a default
this.runState = RunState.FULL_RUN;
}
protected abstract void failIndexer(String message);
@ -117,6 +144,8 @@ public abstract class DataFrameIndexer extends AsyncTwoPhaseIndexer<Map<String,
if (pageSize == 0) {
pageSize = pivot.getInitialPageSize();
}
runState = determineRunStateAtStart();
listener.onResponse(null);
} catch (Exception e) {
listener.onFailure(e);
@ -136,24 +165,95 @@ public abstract class DataFrameIndexer extends AsyncTwoPhaseIndexer<Map<String,
}
@Override
protected IterationResult<Map<String, Object>> doProcess(SearchResponse searchResponse) {
protected IterationResult<DataFrameIndexerPosition> doProcess(SearchResponse searchResponse) {
final CompositeAggregation agg = searchResponse.getAggregations().get(COMPOSITE_AGGREGATION_NAME);
switch (runState) {
case FULL_RUN:
return processBuckets(agg);
case PARTIAL_RUN_APPLY_CHANGES:
return processPartialBucketUpdates(agg);
case PARTIAL_RUN_IDENTIFY_CHANGES:
return processChangedBuckets(agg);
default:
// Any other state is a bug, should not happen
logger.warn("Encountered unexpected run state [" + runState + "]");
throw new IllegalStateException("DataFrame indexer job encountered an illegal state [" + runState + "]");
}
}
private IterationResult<DataFrameIndexerPosition> processBuckets(final CompositeAggregation agg) {
// we reached the end
if (agg.getBuckets().isEmpty()) {
return new IterationResult<>(Collections.emptyList(), null, true);
}
long docsBeforeProcess = getStats().getNumDocuments();
IterationResult<Map<String, Object>> result = new IterationResult<>(processBucketsToIndexRequests(agg).collect(Collectors.toList()),
agg.afterKey(),
DataFrameIndexerPosition oldPosition = getPosition();
DataFrameIndexerPosition newPosition = new DataFrameIndexerPosition(agg.afterKey(),
oldPosition != null ? getPosition().getBucketsPosition() : null);
IterationResult<DataFrameIndexerPosition> result = new IterationResult<>(
processBucketsToIndexRequests(agg).collect(Collectors.toList()),
newPosition,
agg.getBuckets().isEmpty());
if (progress != null) {
progress.docsProcessed(getStats().getNumDocuments() - docsBeforeProcess);
}
return result;
}
private IterationResult<DataFrameIndexerPosition> processPartialBucketUpdates(final CompositeAggregation agg) {
// we reached the end
if (agg.getBuckets().isEmpty()) {
// cleanup changed Buckets
changedBuckets = null;
// reset the runState to fetch changed buckets
runState = RunState.PARTIAL_RUN_IDENTIFY_CHANGES;
// advance the cursor for changed bucket detection
return new IterationResult<>(Collections.emptyList(),
new DataFrameIndexerPosition(null, changedBucketsAfterKey), false);
}
return processBuckets(agg);
}
private IterationResult<DataFrameIndexerPosition> processChangedBuckets(final CompositeAggregation agg) {
// initialize the map of changed buckets, the map might be empty if source do not require/implement
// changed bucket detection
changedBuckets = pivot.initialIncrementalBucketUpdateMap();
// reached the end?
if (agg.getBuckets().isEmpty()) {
// reset everything and return the end marker
changedBuckets = null;
changedBucketsAfterKey = null;
return new IterationResult<>(Collections.emptyList(), null, true);
}
// else
// collect all buckets that require the update
agg.getBuckets().stream().forEach(bucket -> {
bucket.getKey().forEach((k, v) -> {
changedBuckets.get(k).add(v.toString());
});
});
// remember the after key but do not store it in the state yet (in the failure we need to retrieve it again)
changedBucketsAfterKey = agg.afterKey();
// reset the runState to fetch the partial updates next
runState = RunState.PARTIAL_RUN_APPLY_CHANGES;
return new IterationResult<>(Collections.emptyList(), getPosition(), false);
}
/*
* Parses the result and creates a stream of indexable documents
*
@ -197,17 +297,114 @@ public abstract class DataFrameIndexer extends AsyncTwoPhaseIndexer<Map<String,
}
protected QueryBuilder buildFilterQuery() {
assert nextCheckpoint != null;
QueryBuilder pivotQueryBuilder = getConfig().getSource().getQueryConfig().getQuery();
DataFrameTransformConfig config = getConfig();
if (config.getSyncConfig() != null) {
if (inProgressOrLastCheckpoint == null) {
throw new RuntimeException("in progress checkpoint not found");
if (this.isContinuous()) {
BoolQueryBuilder filteredQuery = new BoolQueryBuilder()
.filter(pivotQueryBuilder);
if (lastCheckpoint != null) {
filteredQuery.filter(config.getSyncConfig().getRangeQuery(lastCheckpoint, nextCheckpoint));
} else {
filteredQuery.filter(config.getSyncConfig().getRangeQuery(nextCheckpoint));
}
return filteredQuery;
}
return pivotQueryBuilder;
}
@Override
protected SearchRequest buildSearchRequest() {
assert nextCheckpoint != null;
SearchRequest searchRequest = new SearchRequest(getConfig().getSource().getIndex())
.allowPartialSearchResults(false);
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder()
.size(0);
switch (runState) {
case FULL_RUN:
buildFullRunQuery(sourceBuilder);
break;
case PARTIAL_RUN_IDENTIFY_CHANGES:
buildChangedBucketsQuery(sourceBuilder);
break;
case PARTIAL_RUN_APPLY_CHANGES:
buildPartialUpdateQuery(sourceBuilder);
break;
default:
// Any other state is a bug, should not happen
logger.warn("Encountered unexpected run state [" + runState + "]");
throw new IllegalStateException("DataFrame indexer job encountered an illegal state [" + runState + "]");
}
searchRequest.source(sourceBuilder);
return searchRequest;
}
private SearchSourceBuilder buildFullRunQuery(SearchSourceBuilder sourceBuilder) {
DataFrameIndexerPosition position = getPosition();
sourceBuilder.aggregation(pivot.buildAggregation(position != null ? position.getIndexerPosition() : null, pageSize));
DataFrameTransformConfig config = getConfig();
QueryBuilder pivotQueryBuilder = config.getSource().getQueryConfig().getQuery();
if (isContinuous()) {
BoolQueryBuilder filteredQuery = new BoolQueryBuilder()
.filter(pivotQueryBuilder)
.filter(config.getSyncConfig()
.getRangeQuery(nextCheckpoint));
sourceBuilder.query(filteredQuery);
} else {
sourceBuilder.query(pivotQueryBuilder);
}
logger.trace("running full run query: {}", sourceBuilder);
return sourceBuilder;
}
private SearchSourceBuilder buildChangedBucketsQuery(SearchSourceBuilder sourceBuilder) {
assert isContinuous();
DataFrameIndexerPosition position = getPosition();
CompositeAggregationBuilder changesAgg = pivot.buildIncrementalBucketUpdateAggregation(pageSize);
changesAgg.aggregateAfter(position != null ? position.getBucketsPosition() : null);
sourceBuilder.aggregation(changesAgg);
QueryBuilder pivotQueryBuilder = getConfig().getSource().getQueryConfig().getQuery();
DataFrameTransformConfig config = getConfig();
BoolQueryBuilder filteredQuery = new BoolQueryBuilder().
filter(pivotQueryBuilder).
filter(config.getSyncConfig().getRangeQuery(lastCheckpoint, nextCheckpoint));
sourceBuilder.query(filteredQuery);
logger.trace("running changes query {}", sourceBuilder);
return sourceBuilder;
}
private SearchSourceBuilder buildPartialUpdateQuery(SearchSourceBuilder sourceBuilder) {
assert isContinuous();
DataFrameIndexerPosition position = getPosition();
sourceBuilder.aggregation(pivot.buildAggregation(position != null ? position.getIndexerPosition() : null, pageSize));
DataFrameTransformConfig config = getConfig();
QueryBuilder pivotQueryBuilder = config.getSource().getQueryConfig().getQuery();
BoolQueryBuilder filteredQuery = new BoolQueryBuilder()
.filter(pivotQueryBuilder)
.filter(config.getSyncConfig().getRangeQuery(inProgressOrLastCheckpoint));
.filter(config.getSyncConfig()
.getRangeQuery(nextCheckpoint));
if (changedBuckets != null && changedBuckets.isEmpty() == false) {
QueryBuilder pivotFilter = pivot.filterBuckets(changedBuckets);
@ -216,22 +413,10 @@ public abstract class DataFrameIndexer extends AsyncTwoPhaseIndexer<Map<String,
}
}
logger.trace("running filtered query: {}", filteredQuery);
return filteredQuery;
} else {
return pivotQueryBuilder;
}
}
sourceBuilder.query(filteredQuery);
logger.trace("running partial update query: {}", sourceBuilder);
@Override
protected SearchRequest buildSearchRequest() {
SearchRequest searchRequest = new SearchRequest(getConfig().getSource().getIndex());
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder()
.aggregation(pivot.buildAggregation(getPosition(), pageSize))
.size(0)
.query(buildFilterQuery());
searchRequest.source(sourceBuilder);
return searchRequest;
return sourceBuilder;
}
/**
@ -272,82 +457,19 @@ public abstract class DataFrameIndexer extends AsyncTwoPhaseIndexer<Map<String,
return true;
}
protected void getChangedBuckets(DataFrameTransformCheckpoint oldCheckpoint,
DataFrameTransformCheckpoint newCheckpoint,
ActionListener<Map<String, Set<String>>> listener) {
ActionListener<Map<String, Set<String>>> wrappedListener = ActionListener.wrap(
r -> {
this.inProgressOrLastCheckpoint = newCheckpoint;
this.changedBuckets = r;
listener.onResponse(r);
},
listener::onFailure
);
// initialize the map of changed buckets, the map might be empty if source do not require/implement
// changed bucket detection
Map<String, Set<String>> keys = pivot.initialIncrementalBucketUpdateMap();
if (keys.isEmpty()) {
logger.trace("This data frame does not implement changed bucket detection, returning");
wrappedListener.onResponse(null);
return;
private RunState determineRunStateAtStart() {
// either 1st run or not a continuous data frame
if (nextCheckpoint.getCheckpoint() == 1 || isContinuous() == false) {
return RunState.FULL_RUN;
}
SearchRequest searchRequest = new SearchRequest(getConfig().getSource().getIndex());
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
// we do not need the sub-aggs
CompositeAggregationBuilder changesAgg = pivot.buildIncrementalBucketUpdateAggregation(pageSize);
sourceBuilder.aggregation(changesAgg);
sourceBuilder.size(0);
QueryBuilder pivotQueryBuilder = getConfig().getSource().getQueryConfig().getQuery();
DataFrameTransformConfig config = getConfig();
if (config.getSyncConfig() != null) {
BoolQueryBuilder filteredQuery = new BoolQueryBuilder().
filter(pivotQueryBuilder).
filter(config.getSyncConfig().getRangeQuery(oldCheckpoint, newCheckpoint));
logger.trace("Gathering changes using query {}", filteredQuery);
sourceBuilder.query(filteredQuery);
} else {
logger.trace("No sync configured");
wrappedListener.onResponse(null);
return;
// if incremental update is not supported, do a full run
if (pivot.supportsIncrementalBucketUpdate() == false) {
return RunState.FULL_RUN;
}
searchRequest.source(sourceBuilder);
searchRequest.allowPartialSearchResults(false);
collectChangedBuckets(searchRequest, changesAgg, keys, ActionListener.wrap(wrappedListener::onResponse, e -> {
// fall back if bucket collection failed
logger.error("Failed to retrieve changed buckets, fall back to complete retrieval", e);
wrappedListener.onResponse(null);
}));
}
void collectChangedBuckets(SearchRequest searchRequest, CompositeAggregationBuilder changesAgg, Map<String, Set<String>> keys,
ActionListener<Map<String, Set<String>>> finalListener) {
// re-using the existing search hook
doNextSearch(searchRequest, ActionListener.wrap(searchResponse -> {
final CompositeAggregation agg = searchResponse.getAggregations().get(COMPOSITE_AGGREGATION_NAME);
agg.getBuckets().stream().forEach(bucket -> {
bucket.getKey().forEach((k, v) -> {
keys.get(k).add(v.toString());
});
});
if (agg.getBuckets().isEmpty()) {
finalListener.onResponse(keys);
} else {
// adjust the after key
changesAgg.aggregateAfter(agg.afterKey());
collectChangedBuckets(searchRequest, changesAgg, keys, finalListener);
}
}, finalListener::onFailure));
// continuous mode: we need to get the changed buckets first
return RunState.PARTIAL_RUN_IDENTIFY_CHANGES;
}
/**

View File

@ -8,6 +8,7 @@ package org.elasticsearch.xpack.dataframe.transforms;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.LatchedActionListener;
@ -127,14 +128,53 @@ public class DataFrameTransformPersistentTasksExecutor extends PersistentTasksEx
.setTransformsCheckpointService(dataFrameTransformsCheckpointService)
.setTransformsConfigManager(transformsConfigManager);
final SetOnce<DataFrameTransformState> stateHolder = new SetOnce<>();
ActionListener<StartDataFrameTransformTaskAction.Response> startTaskListener = ActionListener.wrap(
response -> logger.info("Successfully completed and scheduled task in node operation"),
failure -> logger.error("Failed to start task ["+ transformId +"] in node operation", failure)
);
Long previousCheckpoint = transformPTaskState != null ? transformPTaskState.getCheckpoint() : null;
// <5> load next checkpoint
ActionListener<DataFrameTransformCheckpoint> getTransformNextCheckpointListener = ActionListener.wrap(
nextCheckpoint -> {
indexerBuilder.setNextCheckpoint(nextCheckpoint);
// <4> Set the previous stats (if they exist), initialize the indexer, start the task (If it is STOPPED)
final long lastCheckpoint = stateHolder.get().getCheckpoint();
logger.trace("[{}] No next checkpoint found, starting the task", transformId);
startTask(buildTask, indexerBuilder, lastCheckpoint, startTaskListener);
},
error -> {
// TODO: do not use the same error message as for loading the last checkpoint
String msg = DataFrameMessages.getMessage(DataFrameMessages.FAILED_TO_LOAD_TRANSFORM_CHECKPOINT, transformId);
logger.error(msg, error);
markAsFailed(buildTask, msg);
}
);
// <4> load last checkpoint
ActionListener<DataFrameTransformCheckpoint> getTransformLastCheckpointListener = ActionListener.wrap(
lastCheckpoint -> {
indexerBuilder.setLastCheckpoint(lastCheckpoint);
final long nextCheckpoint = stateHolder.get().getInProgressCheckpoint();
if (nextCheckpoint > 0) {
transformsConfigManager.getTransformCheckpoint(transformId, nextCheckpoint, getTransformNextCheckpointListener);
} else {
logger.trace("[{}] No next checkpoint found, starting the task", transformId);
startTask(buildTask, indexerBuilder, lastCheckpoint.getCheckpoint(), startTaskListener);
}
},
error -> {
String msg = DataFrameMessages.getMessage(DataFrameMessages.FAILED_TO_LOAD_TRANSFORM_CHECKPOINT, transformId);
logger.error(msg, error);
markAsFailed(buildTask, msg);
}
);
// <3> Set the previous stats (if they exist), initialize the indexer, start the task (If it is STOPPED)
// Since we don't create the task until `_start` is called, if we see that the task state is stopped, attempt to start
// Schedule execution regardless
ActionListener<DataFrameTransformStateAndStats> transformStatsActionListener = ActionListener.wrap(
@ -149,45 +189,34 @@ public class DataFrameTransformPersistentTasksExecutor extends PersistentTasksEx
stateAndStats.getTransformState(),
stateAndStats.getTransformState().getPosition());
final Long checkpoint = stateAndStats.getTransformState().getCheckpoint();
startTask(buildTask, indexerBuilder, checkpoint, startTaskListener);
stateHolder.set(stateAndStats.getTransformState());
final long lastCheckpoint = stateHolder.get().getCheckpoint();
if (lastCheckpoint == 0) {
logger.trace("[{}] No checkpoint found, starting the task", transformId);
startTask(buildTask, indexerBuilder, lastCheckpoint, startTaskListener);
} else {
logger.trace ("[{}] Restore last checkpoint: [{}]", transformId, lastCheckpoint);
transformsConfigManager.getTransformCheckpoint(transformId, lastCheckpoint, getTransformLastCheckpointListener);
}
},
error -> {
if (error instanceof ResourceNotFoundException == false) {
logger.warn("Unable to load previously persisted statistics for transform [" + params.getId() + "]", error);
}
startTask(buildTask, indexerBuilder, previousCheckpoint, startTaskListener);
}
);
// <3> set the in progress checkpoint for the indexer, get the in progress checkpoint
ActionListener<DataFrameTransformCheckpoint> getTransformCheckpointListener = ActionListener.wrap(
cp -> {
indexerBuilder.setInProgressOrLastCheckpoint(cp);
transformsConfigManager.getTransformStats(transformId, transformStatsActionListener);
},
error -> {
String msg = DataFrameMessages.getMessage(DataFrameMessages.FAILED_TO_LOAD_TRANSFORM_CHECKPOINT, transformId);
String msg = DataFrameMessages.getMessage(DataFrameMessages.FAILED_TO_LOAD_TRANSFORM_STATE, transformId);
logger.error(msg, error);
markAsFailed(buildTask, msg);
}
logger.trace("[{}] No stats found(new transform), starting the task", transformId);
startTask(buildTask, indexerBuilder, null, startTaskListener);
}
);
// <2> set fieldmappings for the indexer, get the previous stats (if they exist)
ActionListener<Map<String, String>> getFieldMappingsListener = ActionListener.wrap(
fieldMappings -> {
indexerBuilder.setFieldMappings(fieldMappings);
long inProgressCheckpoint = transformPTaskState == null ? 0L :
Math.max(transformPTaskState.getCheckpoint(), transformPTaskState.getInProgressCheckpoint());
logger.debug("Restore in progress or last checkpoint: {}", inProgressCheckpoint);
if (inProgressCheckpoint == 0) {
getTransformCheckpointListener.onResponse(DataFrameTransformCheckpoint.EMPTY);
} else {
transformsConfigManager.getTransformCheckpoint(transformId, inProgressCheckpoint, getTransformCheckpointListener);
}
transformsConfigManager.getTransformStats(transformId, transformStatsActionListener);
},
error -> {
String msg = DataFrameMessages.getMessage(DataFrameMessages.DATA_FRAME_UNABLE_TO_GATHER_FIELD_MAPPINGS,

View File

@ -30,6 +30,7 @@ import org.elasticsearch.xpack.core.ClientHelper;
import org.elasticsearch.xpack.core.dataframe.DataFrameField;
import org.elasticsearch.xpack.core.dataframe.action.StartDataFrameTransformTaskAction;
import org.elasticsearch.xpack.core.dataframe.action.StartDataFrameTransformTaskAction.Response;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameIndexerPosition;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameIndexerTransformStats;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransform;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformCheckpoint;
@ -50,7 +51,6 @@ import org.elasticsearch.xpack.dataframe.transforms.pivot.AggregationResultUtils
import java.util.Arrays;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@ -72,7 +72,7 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
private final SchedulerEngine schedulerEngine;
private final ThreadPool threadPool;
private final DataFrameAuditor auditor;
private final Map<String, Object> initialPosition;
private final DataFrameIndexerPosition initialPosition;
private final IndexerState initialIndexerState;
private final SetOnce<ClientDataFrameIndexer> indexer = new SetOnce<>();
@ -95,7 +95,7 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
DataFrameTransformTaskState initialTaskState = DataFrameTransformTaskState.STOPPED;
String initialReason = null;
long initialGeneration = 0;
Map<String, Object> initialPosition = null;
DataFrameIndexerPosition initialPosition = null;
if (state != null) {
initialTaskState = state.getTaskState();
initialReason = state.getReason();
@ -383,9 +383,10 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
private DataFrameTransformConfig transformConfig;
private DataFrameIndexerTransformStats initialStats;
private IndexerState indexerState = IndexerState.STOPPED;
private Map<String, Object> initialPosition;
private DataFrameIndexerPosition initialPosition;
private DataFrameTransformProgress progress;
private DataFrameTransformCheckpoint inProgressOrLastCheckpoint;
private DataFrameTransformCheckpoint lastCheckpoint;
private DataFrameTransformCheckpoint nextCheckpoint;
ClientDataFrameIndexerBuilder(String transformId) {
this.transformId = transformId;
@ -404,7 +405,8 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
this.transformConfig,
this.fieldMappings,
this.progress,
this.inProgressOrLastCheckpoint,
this.lastCheckpoint,
this.nextCheckpoint,
parentTask);
}
@ -457,7 +459,7 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
return this;
}
ClientDataFrameIndexerBuilder setInitialPosition(Map<String, Object> initialPosition) {
ClientDataFrameIndexerBuilder setInitialPosition(DataFrameIndexerPosition initialPosition) {
this.initialPosition = initialPosition;
return this;
}
@ -467,8 +469,13 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
return this;
}
ClientDataFrameIndexerBuilder setInProgressOrLastCheckpoint(DataFrameTransformCheckpoint inProgressOrLastCheckpoint) {
this.inProgressOrLastCheckpoint = inProgressOrLastCheckpoint;
ClientDataFrameIndexerBuilder setLastCheckpoint(DataFrameTransformCheckpoint lastCheckpoint) {
this.lastCheckpoint = lastCheckpoint;
return this;
}
ClientDataFrameIndexerBuilder setNextCheckpoint(DataFrameTransformCheckpoint nextCheckpoint) {
this.nextCheckpoint = nextCheckpoint;
return this;
}
}
@ -491,14 +498,15 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
DataFrameTransformsConfigManager transformsConfigManager,
DataFrameTransformsCheckpointService transformsCheckpointService,
AtomicReference<IndexerState> initialState,
Map<String, Object> initialPosition,
DataFrameIndexerPosition initialPosition,
Client client,
DataFrameAuditor auditor,
DataFrameIndexerTransformStats initialStats,
DataFrameTransformConfig transformConfig,
Map<String, String> fieldMappings,
DataFrameTransformProgress transformProgress,
DataFrameTransformCheckpoint inProgressOrLastCheckpoint,
DataFrameTransformCheckpoint lastCheckpoint,
DataFrameTransformCheckpoint nextCheckpoint,
DataFrameTransformTask parentTask) {
super(ExceptionsHelper.requireNonNull(parentTask, "parentTask")
.threadPool
@ -510,7 +518,8 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
initialPosition,
initialStats == null ? new DataFrameIndexerTransformStats(transformId) : initialStats,
transformProgress,
inProgressOrLastCheckpoint);
lastCheckpoint,
nextCheckpoint);
this.transformId = ExceptionsHelper.requireNonNull(transformId, "transformId");
this.transformsConfigManager = ExceptionsHelper.requireNonNull(transformsConfigManager, "transformsConfigManager");
this.transformsCheckpointService = ExceptionsHelper.requireNonNull(transformsCheckpointService,
@ -526,8 +535,8 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
// Since multiple checkpoints can be executed in the task while it is running on the same node, we need to gather
// the progress here, and not in the executor.
if (initialRun()) {
ActionListener<Map<String, Set<String>>> changedBucketsListener = ActionListener.wrap(
r -> {
createCheckpoint(ActionListener.wrap(cp -> {
nextCheckpoint = cp;
TransformProgressGatherer.getInitialProgress(this.client, buildFilterQuery(), getConfig(), ActionListener.wrap(
newProgress -> {
logger.trace("[{}] reset the progress from [{}] to [{}]", transformId, progress, newProgress);
@ -540,20 +549,6 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
super.onStart(now, listener);
}
));
},
listener::onFailure
);
createCheckpoint(ActionListener.wrap(cp -> {
DataFrameTransformCheckpoint oldCheckpoint = inProgressOrLastCheckpoint;
if (oldCheckpoint.isEmpty()) {
// this is the 1st run, accept the new in progress checkpoint and go on
inProgressOrLastCheckpoint = cp;
changedBucketsListener.onResponse(null);
} else {
logger.debug ("Getting changes from {} to {}", oldCheckpoint.getTimeUpperBound(), cp.getTimeUpperBound());
getChangedBuckets(oldCheckpoint, cp, changedBucketsListener);
}
}, listener::onFailure));
} else {
super.onStart(now, listener);
@ -615,7 +610,7 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
}
@Override
protected void doSaveState(IndexerState indexerState, Map<String, Object> position, Runnable next) {
protected void doSaveState(IndexerState indexerState, DataFrameIndexerPosition position, Runnable next) {
if (indexerState.equals(IndexerState.ABORTING)) {
// If we're aborting, just invoke `next` (which is likely an onFailure handler)
next.run();
@ -698,8 +693,12 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
@Override
protected void onFinish(ActionListener<Void> listener) {
try {
// TODO: needs cleanup super is called with a listener, but listener.onResponse is called below
// super.onFinish() fortunately ignores the listener
super.onFinish(listener);
long checkpoint = transformTask.currentCheckpoint.getAndIncrement();
lastCheckpoint = nextCheckpoint;
nextCheckpoint = null;
// Reset our failure count as we have finished and may start again with a new checkpoint
failureCount.set(0);
if (checkpoint % ON_FINISH_AUDIT_FREQUENCY == 0) {
@ -756,7 +755,7 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
SetOnce<Boolean> changed = new SetOnce<>();
transformsCheckpointService.getCheckpoint(transformConfig, new LatchedActionListener<>(ActionListener.wrap(
cp -> {
long behind = DataFrameTransformCheckpoint.getBehind(inProgressOrLastCheckpoint, cp);
long behind = DataFrameTransformCheckpoint.getBehind(lastCheckpoint, cp);
if (behind > 0) {
logger.debug("Detected changes, dest is {} operations behind the source", behind);
changed.set(true);

View File

@ -51,6 +51,7 @@ public class Pivot {
private static final Logger logger = LogManager.getLogger(Pivot.class);
private final PivotConfig config;
private final boolean supportsIncrementalBucketUpdate;
// objects for re-using
private final CompositeAggregationBuilder cachedCompositeAggregation;
@ -58,6 +59,13 @@ public class Pivot {
public Pivot(PivotConfig config) {
this.config = config;
this.cachedCompositeAggregation = createCompositeAggregation(config);
boolean supportsIncrementalBucketUpdate = false;
for(Entry<String, SingleGroupSource> entry: config.getGroupConfig().getGroups().entrySet()) {
supportsIncrementalBucketUpdate |= entry.getValue().supportsIncrementalBucketUpdate();
}
this.supportsIncrementalBucketUpdate = supportsIncrementalBucketUpdate;
}
public void validate(Client client, SourceConfig sourceConfig, final ActionListener<Boolean> listener) {
@ -135,6 +143,10 @@ public class Pivot {
return changedBuckets;
}
public boolean supportsIncrementalBucketUpdate() {
return supportsIncrementalBucketUpdate;
}
public Stream<Map<String, Object>> extractResults(CompositeAggregation agg,
Map<String, String> fieldTypeMap,
DataFrameIndexerTransformStats dataFrameIndexerTransformStats) {

View File

@ -21,6 +21,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameIndexerPosition;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameIndexerTransformStats;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformCheckpoint;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig;
@ -32,6 +33,8 @@ import org.elasticsearch.xpack.dataframe.notifications.DataFrameAuditor;
import org.elasticsearch.xpack.dataframe.transforms.pivot.Pivot;
import org.junit.Before;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
@ -68,13 +71,13 @@ public class DataFrameIndexerTests extends ESTestCase {
Map<String, String> fieldMappings,
DataFrameAuditor auditor,
AtomicReference<IndexerState> initialState,
Map<String, Object> initialPosition,
DataFrameIndexerPosition initialPosition,
DataFrameIndexerTransformStats jobStats,
Function<SearchRequest, SearchResponse> searchFunction,
Function<BulkRequest, BulkResponse> bulkFunction,
Consumer<Exception> failureConsumer) {
super(executor, auditor, transformConfig, fieldMappings, initialState, initialPosition, jobStats,
/* DataFrameTransformProgress */ null, DataFrameTransformCheckpoint.EMPTY);
/* DataFrameTransformProgress */ null, DataFrameTransformCheckpoint.EMPTY, DataFrameTransformCheckpoint.EMPTY);
this.searchFunction = searchFunction;
this.bulkFunction = bulkFunction;
this.failureConsumer = failureConsumer;
@ -129,7 +132,7 @@ public class DataFrameIndexerTests extends ESTestCase {
}
@Override
protected void doSaveState(IndexerState state, Map<String, Object> position, Runnable next) {
protected void doSaveState(IndexerState state, DataFrameIndexerPosition position, Runnable next) {
assert state == IndexerState.STARTED || state == IndexerState.INDEXING || state == IndexerState.STOPPED;
next.run();
}
@ -198,7 +201,12 @@ public class DataFrameIndexerTests extends ESTestCase {
Function<BulkRequest, BulkResponse> bulkFunction = bulkRequest -> new BulkResponse(new BulkItemResponse[0], 100);
Consumer<Exception> failureConsumer = e -> fail("expected circuit breaker exception to be handled");
Consumer<Exception> failureConsumer = e -> {
final StringWriter sw = new StringWriter();
final PrintWriter pw = new PrintWriter(sw, true);
e.printStackTrace(pw);
fail("expected circuit breaker exception to be handled, got:" + e + " Trace: " + sw.getBuffer().toString());
};
final ExecutorService executor = Executors.newFixedThreadPool(1);
try {