[ML-DataFrame] create checkpoints on every new run (#40725)

Use the checkpoint service to create a checkpoint on every new run. Expose checkpoint stats on the _stats endpoint.
This commit is contained in:
Hendrik Muhs 2019-04-10 08:00:58 +02:00
parent 46b0fdae33
commit f9018ab11b
34 changed files with 1448 additions and 147 deletions

View File

@ -0,0 +1,93 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.client.dataframe.transforms;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.XContentParser;
import java.io.IOException;
import java.util.Objects;
/**
 * Client-side view of the statistics of a single data frame transform checkpoint.
 *
 * Instances are immutable value objects holding two millisecond values: the checkpoint timestamp and
 * the time upper bound. The parser is created with its "ignore unknown fields" flag set to {@code true}
 * and both fields are optional; a field that is absent is represented as {@code 0}.
 */
public class DataFrameTransformCheckpointStats {

    public static final ParseField TIMESTAMP_MILLIS = new ParseField("timestamp_millis");
    public static final ParseField TIME_UPPER_BOUND_MILLIS = new ParseField("time_upper_bound_millis");

    /**
     * Shared placeholder with both values set to {@code 0}.
     *
     * Declared {@code final}: this is a globally visible sentinel and must not be reassignable by callers.
     */
    public static final DataFrameTransformCheckpointStats EMPTY = new DataFrameTransformCheckpointStats(0L, 0L);

    private final long timestampMillis;
    private final long timeUpperBoundMillis;

    public static final ConstructingObjectParser<DataFrameTransformCheckpointStats, Void> LENIENT_PARSER = new ConstructingObjectParser<>(
            "data_frame_transform_checkpoint_stats", true, args -> {
                // optional constructor args arrive as null when the field was not present
                long timestamp = args[0] == null ? 0L : (Long) args[0];
                long timeUpperBound = args[1] == null ? 0L : (Long) args[1];

                return new DataFrameTransformCheckpointStats(timestamp, timeUpperBound);
            });

    static {
        LENIENT_PARSER.declareLong(ConstructingObjectParser.optionalConstructorArg(), TIMESTAMP_MILLIS);
        LENIENT_PARSER.declareLong(ConstructingObjectParser.optionalConstructorArg(), TIME_UPPER_BOUND_MILLIS);
    }

    /**
     * Parses checkpoint stats from the given parser using {@link #LENIENT_PARSER}.
     *
     * @param parser the parser to read from
     * @return the parsed stats
     * @throws IOException if the underlying parser fails
     */
    public static DataFrameTransformCheckpointStats fromXContent(XContentParser parser) throws IOException {
        return LENIENT_PARSER.parse(parser, null);
    }

    /**
     * @param timestampMillis the checkpoint timestamp in milliseconds
     * @param timeUpperBoundMillis the time upper bound in milliseconds
     */
    public DataFrameTransformCheckpointStats(final long timestampMillis, final long timeUpperBoundMillis) {
        this.timestampMillis = timestampMillis;
        this.timeUpperBoundMillis = timeUpperBoundMillis;
    }

    /**
     * Reads the two values from a stream: first the timestamp, then the time upper bound, each as a long.
     *
     * @param in the stream to read from
     * @throws IOException if reading from the stream fails
     */
    public DataFrameTransformCheckpointStats(StreamInput in) throws IOException {
        this.timestampMillis = in.readLong();
        this.timeUpperBoundMillis = in.readLong();
    }

    /** @return the checkpoint timestamp in milliseconds */
    public long getTimestampMillis() {
        return timestampMillis;
    }

    /** @return the time upper bound in milliseconds */
    public long getTimeUpperBoundMillis() {
        return timeUpperBoundMillis;
    }

    @Override
    public int hashCode() {
        return Objects.hash(timestampMillis, timeUpperBoundMillis);
    }

    @Override
    public boolean equals(Object other) {
        if (this == other) {
            return true;
        }

        if (other == null || getClass() != other.getClass()) {
            return false;
        }

        DataFrameTransformCheckpointStats that = (DataFrameTransformCheckpointStats) other;

        return this.timestampMillis == that.timestampMillis && this.timeUpperBoundMillis == that.timeUpperBoundMillis;
    }
}

View File

@ -0,0 +1,102 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.client.dataframe.transforms;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.XContentParser;
import java.util.Objects;
/**
 * Client-side view of a transform's checkpointing information: stats of the current checkpoint, stats of
 * the in-progress checkpoint and an "operations behind" counter.
 *
 * All three parts are optional when parsing; a missing checkpoint object is replaced by
 * {@link DataFrameTransformCheckpointStats#EMPTY} and a missing counter by {@code 0}.
 */
public class DataFrameTransformCheckpointingInfo {

    public static final ParseField CURRENT_CHECKPOINT = new ParseField("current");
    public static final ParseField IN_PROGRESS_CHECKPOINT = new ParseField("in_progress");
    public static final ParseField OPERATIONS_BEHIND = new ParseField("operations_behind");

    private final DataFrameTransformCheckpointStats current;
    private final DataFrameTransformCheckpointStats inProgress;
    private final long operationsBehind;

    private static final ConstructingObjectParser<DataFrameTransformCheckpointingInfo, Void> LENIENT_PARSER =
            new ConstructingObjectParser<>("data_frame_transform_checkpointing_info", true, parsed -> {
                // absent optional arguments are handed over as null, substitute the defaults
                final long opsBehind = parsed[2] == null ? 0L : (Long) parsed[2];

                final DataFrameTransformCheckpointStats currentStats;
                if (parsed[0] == null) {
                    currentStats = DataFrameTransformCheckpointStats.EMPTY;
                } else {
                    currentStats = (DataFrameTransformCheckpointStats) parsed[0];
                }

                final DataFrameTransformCheckpointStats inProgressStats;
                if (parsed[1] == null) {
                    inProgressStats = DataFrameTransformCheckpointStats.EMPTY;
                } else {
                    inProgressStats = (DataFrameTransformCheckpointStats) parsed[1];
                }

                return new DataFrameTransformCheckpointingInfo(currentStats, inProgressStats, opsBehind);
            });

    static {
        LENIENT_PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(),
                (p, c) -> DataFrameTransformCheckpointStats.fromXContent(p), CURRENT_CHECKPOINT);
        LENIENT_PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(),
                (p, c) -> DataFrameTransformCheckpointStats.fromXContent(p), IN_PROGRESS_CHECKPOINT);
        LENIENT_PARSER.declareLong(ConstructingObjectParser.optionalConstructorArg(), OPERATIONS_BEHIND);
    }

    /**
     * @param current stats of the current checkpoint, must not be null
     * @param inProgress stats of the in-progress checkpoint, must not be null
     * @param operationsBehind the operations-behind counter
     * @throws NullPointerException if {@code current} or {@code inProgress} is null
     */
    public DataFrameTransformCheckpointingInfo(DataFrameTransformCheckpointStats current, DataFrameTransformCheckpointStats inProgress,
                                               long operationsBehind) {
        this.current = Objects.requireNonNull(current);
        this.inProgress = Objects.requireNonNull(inProgress);
        this.operationsBehind = operationsBehind;
    }

    /** @return stats of the current checkpoint, never null */
    public DataFrameTransformCheckpointStats getCurrent() {
        return this.current;
    }

    /** @return stats of the in-progress checkpoint, never null */
    public DataFrameTransformCheckpointStats getInProgress() {
        return this.inProgress;
    }

    /** @return the operations-behind counter */
    public long getOperationsBehind() {
        return this.operationsBehind;
    }

    /**
     * Parses checkpointing info from the given parser.
     *
     * @param p the parser to read from
     * @return the parsed checkpointing info
     */
    public static DataFrameTransformCheckpointingInfo fromXContent(XContentParser p) {
        return LENIENT_PARSER.apply(p, null);
    }

    @Override
    public int hashCode() {
        return Objects.hash(current, inProgress, operationsBehind);
    }

    @Override
    public boolean equals(Object other) {
        if (other == this) {
            return true;
        }
        if (other == null) {
            return false;
        }
        if (other.getClass() != getClass()) {
            return false;
        }

        final DataFrameTransformCheckpointingInfo rhs = (DataFrameTransformCheckpointingInfo) other;
        if (operationsBehind != rhs.operationsBehind) {
            return false;
        }
        return Objects.equals(current, rhs.current) && Objects.equals(inProgress, rhs.inProgress);
    }
}

View File

@ -42,7 +42,7 @@ public class DataFrameTransformState {
private static final ParseField INDEXER_STATE = new ParseField("indexer_state");
private static final ParseField TASK_STATE = new ParseField("task_state");
private static final ParseField CURRENT_POSITION = new ParseField("current_position");
private static final ParseField GENERATION = new ParseField("generation");
private static final ParseField CHECKPOINT = new ParseField("checkpoint");
private static final ParseField REASON = new ParseField("reason");
@SuppressWarnings("unchecked")
@ -69,7 +69,7 @@ public class DataFrameTransformState {
}
throw new IllegalArgumentException("Unsupported token [" + p.currentToken() + "]");
}, CURRENT_POSITION, ObjectParser.ValueType.VALUE_OBJECT_ARRAY);
PARSER.declareLong(ConstructingObjectParser.optionalConstructorArg(), GENERATION);
PARSER.declareLong(ConstructingObjectParser.optionalConstructorArg(), CHECKPOINT);
PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), REASON);
}
@ -79,19 +79,19 @@ public class DataFrameTransformState {
private final DataFrameTransformTaskState taskState;
private final IndexerState indexerState;
private final long generation;
private final long checkpoint;
private final SortedMap<String, Object> currentPosition;
private final String reason;
public DataFrameTransformState(DataFrameTransformTaskState taskState,
IndexerState indexerState,
@Nullable Map<String, Object> position,
long generation,
long checkpoint,
@Nullable String reason) {
this.taskState = taskState;
this.indexerState = indexerState;
this.currentPosition = position == null ? null : Collections.unmodifiableSortedMap(new TreeMap<>(position));
this.generation = generation;
this.checkpoint = checkpoint;
this.reason = reason;
}
@ -108,8 +108,8 @@ public class DataFrameTransformState {
return currentPosition;
}
public long getGeneration() {
return generation;
public long getCheckpoint() {
return checkpoint;
}
@Nullable
@ -132,13 +132,13 @@ public class DataFrameTransformState {
return Objects.equals(this.taskState, that.taskState) &&
Objects.equals(this.indexerState, that.indexerState) &&
Objects.equals(this.currentPosition, that.currentPosition) &&
this.generation == that.generation &&
this.checkpoint == that.checkpoint &&
Objects.equals(this.reason, that.reason);
}
@Override
public int hashCode() {
return Objects.hash(taskState, indexerState, currentPosition, generation, reason);
return Objects.hash(taskState, indexerState, currentPosition, checkpoint, reason);
}
}

View File

@ -31,16 +31,20 @@ public class DataFrameTransformStateAndStats {
public static final ParseField ID = new ParseField("id");
public static final ParseField STATE_FIELD = new ParseField("state");
public static final ParseField STATS_FIELD = new ParseField("stats");
public static final ParseField CHECKPOINTING_INFO_FIELD = new ParseField("checkpointing");
public static final ConstructingObjectParser<DataFrameTransformStateAndStats, Void> PARSER = new ConstructingObjectParser<>(
"data_frame_transform_state_and_stats", true,
a -> new DataFrameTransformStateAndStats((String) a[0], (DataFrameTransformState) a[1], (DataFrameIndexerTransformStats) a[2]));
a -> new DataFrameTransformStateAndStats((String) a[0], (DataFrameTransformState) a[1], (DataFrameIndexerTransformStats) a[2],
(DataFrameTransformCheckpointingInfo) a[3]));
static {
PARSER.declareString(ConstructingObjectParser.constructorArg(), ID);
PARSER.declareObject(ConstructingObjectParser.constructorArg(), DataFrameTransformState.PARSER::apply, STATE_FIELD);
PARSER.declareObject(ConstructingObjectParser.constructorArg(), (p, c) -> DataFrameIndexerTransformStats.fromXContent(p),
STATS_FIELD);
PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(),
(p, c) -> DataFrameTransformCheckpointingInfo.fromXContent(p), CHECKPOINTING_INFO_FIELD);
}
public static DataFrameTransformStateAndStats fromXContent(XContentParser parser) throws IOException {
@ -50,11 +54,14 @@ public class DataFrameTransformStateAndStats {
private final String id;
private final DataFrameTransformState transformState;
private final DataFrameIndexerTransformStats transformStats;
private final DataFrameTransformCheckpointingInfo checkpointingInfo;
public DataFrameTransformStateAndStats(String id, DataFrameTransformState state, DataFrameIndexerTransformStats stats) {
public DataFrameTransformStateAndStats(String id, DataFrameTransformState state, DataFrameIndexerTransformStats stats,
DataFrameTransformCheckpointingInfo checkpointingInfo) {
this.id = id;
this.transformState = state;
this.transformStats = stats;
this.checkpointingInfo = checkpointingInfo;
}
public String getId() {
@ -69,9 +76,13 @@ public class DataFrameTransformStateAndStats {
return transformState;
}
/**
 * Returns the checkpointing information of this transform.
 *
 * The field is declared as an optional constructor argument of the parser and is stored without a
 * null check, so callers should be prepared for {@code null}.
 *
 * @return the checkpointing info as passed to the constructor, possibly {@code null}
 */
public DataFrameTransformCheckpointingInfo getCheckpointingInfo() {
    return this.checkpointingInfo;
}
@Override
public int hashCode() {
return Objects.hash(id, transformState, transformStats);
return Objects.hash(id, transformState, transformStats, checkpointingInfo);
}
@Override
@ -87,6 +98,7 @@ public class DataFrameTransformStateAndStats {
DataFrameTransformStateAndStats that = (DataFrameTransformStateAndStats) other;
return Objects.equals(this.id, that.id) && Objects.equals(this.transformState, that.transformState)
&& Objects.equals(this.transformStats, that.transformStats);
&& Objects.equals(this.transformStats, that.transformStats)
&& Objects.equals(this.checkpointingInfo, that.checkpointingInfo);
}
}

View File

@ -0,0 +1,51 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.client.dataframe.transforms;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.test.ESTestCase;
import java.io.IOException;
import static org.elasticsearch.test.AbstractXContentTestCase.xContentTester;
/**
 * XContent parsing tests for the client-side {@link DataFrameTransformCheckpointStats}.
 */
public class DataFrameTransformCheckpointStatsTests extends ESTestCase {

    /**
     * Serializes random instances with {@link #toXContent} and checks that parsing them back yields an
     * equal instance; injection of unknown fields is enabled.
     */
    public void testFromXContent() throws IOException {
        xContentTester(
                this::createParser,
                DataFrameTransformCheckpointStatsTests::randomDataFrameTransformCheckpointStats,
                DataFrameTransformCheckpointStatsTests::toXContent,
                DataFrameTransformCheckpointStats::fromXContent)
            .supportsUnknownFields(true)
            .test();
    }

    /**
     * Creates stats with a random timestamp in [1, 1_000_000] and a random upper bound in [0, 1_000_000].
     *
     * @return a random stats instance
     */
    public static DataFrameTransformCheckpointStats randomDataFrameTransformCheckpointStats() {
        final long timestampMillis = randomLongBetween(1, 1_000_000);
        final long timeUpperBoundMillis = randomLongBetween(0, 1_000_000);
        return new DataFrameTransformCheckpointStats(timestampMillis, timeUpperBoundMillis);
    }

    /**
     * Writes the given stats as an object with both millisecond fields.
     *
     * @param stats the stats to serialize
     * @param builder the builder to write to
     * @throws IOException if writing fails
     */
    public static void toXContent(DataFrameTransformCheckpointStats stats, XContentBuilder builder) throws IOException {
        final long timestampMillis = stats.getTimestampMillis();
        final long timeUpperBoundMillis = stats.getTimeUpperBoundMillis();

        builder.startObject()
            .field("timestamp_millis", timestampMillis)
            .field("time_upper_bound_millis", timeUpperBoundMillis)
            .endObject();
    }
}

View File

@ -0,0 +1,61 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.client.dataframe.transforms;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.test.ESTestCase;
import java.io.IOException;
import static org.elasticsearch.test.AbstractXContentTestCase.xContentTester;
/**
 * XContent parsing tests for the client-side {@link DataFrameTransformCheckpointingInfo}.
 */
public class DataFrameTransformCheckpointingInfoTests extends ESTestCase {

    /**
     * Serializes random instances with {@link #toXContent} and checks that parsing them back yields an
     * equal instance.
     *
     * NOTE(review): unknown-field injection is disabled here although the parser under test is created
     * as lenient - confirm whether this is intentional.
     */
    public void testFromXContent() throws IOException {
        xContentTester(
                this::createParser,
                DataFrameTransformCheckpointingInfoTests::randomDataFrameTransformCheckpointingInfo,
                DataFrameTransformCheckpointingInfoTests::toXContent,
                DataFrameTransformCheckpointingInfo::fromXContent)
            .supportsUnknownFields(false)
            .test();
    }

    /**
     * Creates checkpointing info from two random checkpoint stats and a random counter in [0, 10000].
     *
     * @return a random checkpointing info instance
     */
    public static DataFrameTransformCheckpointingInfo randomDataFrameTransformCheckpointingInfo() {
        final DataFrameTransformCheckpointStats current =
            DataFrameTransformCheckpointStatsTests.randomDataFrameTransformCheckpointStats();
        final DataFrameTransformCheckpointStats inProgress =
            DataFrameTransformCheckpointStatsTests.randomDataFrameTransformCheckpointStats();
        final long operationsBehind = randomLongBetween(0, 10000);
        return new DataFrameTransformCheckpointingInfo(current, inProgress, operationsBehind);
    }

    /**
     * Writes the given info as an object; a checkpoint is only written when its timestamp is positive,
     * the operations-behind counter is always written.
     *
     * @param info the info to serialize
     * @param builder the builder to write to
     * @throws IOException if writing fails
     */
    public static void toXContent(DataFrameTransformCheckpointingInfo info, XContentBuilder builder) throws IOException {
        builder.startObject();
        writeCheckpointIfSet("current", info.getCurrent(), builder);
        writeCheckpointIfSet("in_progress", info.getInProgress(), builder);
        builder.field("operations_behind", info.getOperationsBehind());
        builder.endObject();
    }

    // writes the field name followed by the stats object, skipped when the timestamp is not positive
    private static void writeCheckpointIfSet(String fieldName, DataFrameTransformCheckpointStats stats, XContentBuilder builder)
            throws IOException {
        if (stats.getTimestampMillis() > 0) {
            builder.field(fieldName);
            DataFrameTransformCheckpointStatsTests.toXContent(stats, builder);
        }
    }
}

View File

@ -41,7 +41,8 @@ public class DataFrameTransformStateAndStatsTests extends ESTestCase {
public static DataFrameTransformStateAndStats randomInstance() {
return new DataFrameTransformStateAndStats(randomAlphaOfLength(10),
DataFrameTransformStateTests.randomDataFrameTransformState(),
DataFrameIndexerTransformStatsTests.randomStats());
DataFrameIndexerTransformStatsTests.randomStats(),
DataFrameTransformCheckpointingInfoTests.randomDataFrameTransformCheckpointingInfo());
}
public static void toXContent(DataFrameTransformStateAndStats stateAndStats, XContentBuilder builder) throws IOException {
@ -51,6 +52,8 @@ public class DataFrameTransformStateAndStatsTests extends ESTestCase {
DataFrameTransformStateTests.toXContent(stateAndStats.getTransformState(), builder);
builder.field(DataFrameTransformStateAndStats.STATS_FIELD.getPreferredName());
DataFrameIndexerTransformStatsTests.toXContent(stateAndStats.getTransformStats(), builder);
builder.field(DataFrameTransformStateAndStats.CHECKPOINTING_INFO_FIELD.getPreferredName());
DataFrameTransformCheckpointingInfoTests.toXContent(stateAndStats.getCheckpointingInfo(), builder);
builder.endObject();
}
}

View File

@ -56,7 +56,7 @@ public class DataFrameTransformStateTests extends ESTestCase {
if (state.getPosition() != null) {
builder.field("current_position", state.getPosition());
}
builder.field("generation", state.getGeneration());
builder.field("checkpoint", state.getCheckpoint());
if (state.getReason() != null) {
builder.field("reason", state.getReason());
}

View File

@ -28,6 +28,17 @@ public final class DataFrameField {
public static final ParseField DESTINATION = new ParseField("dest");
public static final ParseField FORCE = new ParseField("force");
/**
* Fields for checkpointing
*/
// the timestamp of the checkpoint, mandatory
public static final ParseField TIMESTAMP_MILLIS = new ParseField("timestamp_millis");
public static final ParseField TIMESTAMP = new ParseField("timestamp");
// checkpoint for time based sync
// TODO: consider a lower bound for usecases where you want to transform on a window of a stream
public static final ParseField TIME_UPPER_BOUND_MILLIS = new ParseField("time_upper_bound_millis");
public static final ParseField TIME_UPPER_BOUND = new ParseField("time_upper_bound");
// common strings
public static final String TASK_NAME = "data_frame/transforms";
public static final String REST_BASE_PATH = "/_data_frame/";

View File

@ -0,0 +1,108 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.core.dataframe.transforms;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.xpack.core.dataframe.DataFrameField;
import java.io.IOException;
import java.util.Objects;
/**
 * Checkpoint stats data for 1 checkpoint
 *
 * This is the user-facing side of DataFrameTransformCheckpoint, containing only the stats to be exposed.
 *
 * Instances are immutable. They can be written to and read from a stream (two longs: timestamp, then
 * time upper bound) and rendered as an XContent object.
 */
public class DataFrameTransformCheckpointStats implements Writeable, ToXContentObject {

    /**
     * Shared placeholder with both values set to {@code 0}.
     *
     * Declared {@code final}: this is a globally visible sentinel and must not be reassignable by callers.
     */
    public static final DataFrameTransformCheckpointStats EMPTY = new DataFrameTransformCheckpointStats(0L, 0L);

    private final long timestampMillis;
    private final long timeUpperBoundMillis;

    private static final ConstructingObjectParser<DataFrameTransformCheckpointStats, Void> LENIENT_PARSER = new ConstructingObjectParser<>(
            "data_frame_transform_checkpoint_stats", true, args -> {
                // optional constructor args arrive as null when the field was not present
                long timestamp = args[0] == null ? 0L : (Long) args[0];
                long timeUpperBound = args[1] == null ? 0L : (Long) args[1];

                return new DataFrameTransformCheckpointStats(timestamp, timeUpperBound);
            });

    static {
        LENIENT_PARSER.declareLong(ConstructingObjectParser.optionalConstructorArg(), DataFrameField.TIMESTAMP_MILLIS);
        LENIENT_PARSER.declareLong(ConstructingObjectParser.optionalConstructorArg(), DataFrameField.TIME_UPPER_BOUND_MILLIS);
    }

    /**
     * @param timestampMillis the checkpoint timestamp in milliseconds
     * @param timeUpperBoundMillis the time upper bound in milliseconds
     */
    public DataFrameTransformCheckpointStats(final long timestampMillis, final long timeUpperBoundMillis) {
        this.timestampMillis = timestampMillis;
        this.timeUpperBoundMillis = timeUpperBoundMillis;
    }

    /**
     * Reads the stats from a stream, in the order written by {@link #writeTo(StreamOutput)}.
     *
     * @param in the stream to read from
     * @throws IOException if reading from the stream fails
     */
    public DataFrameTransformCheckpointStats(StreamInput in) throws IOException {
        this.timestampMillis = in.readLong();
        this.timeUpperBoundMillis = in.readLong();
    }

    /** @return the checkpoint timestamp in milliseconds */
    public long getTimestampMillis() {
        return timestampMillis;
    }

    /** @return the time upper bound in milliseconds */
    public long getTimeUpperBoundMillis() {
        return timeUpperBoundMillis;
    }

    /**
     * Renders the stats as an object. The timestamp is always written; the time upper bound is only
     * written when it is greater than {@code 0}. Both are emitted through {@code builder.timeField}
     * using the millis / readable field-name pairs from {@link DataFrameField}.
     */
    @Override
    public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
        builder.startObject();
        builder.timeField(DataFrameField.TIMESTAMP_MILLIS.getPreferredName(), DataFrameField.TIMESTAMP.getPreferredName(),
                getTimestampMillis());
        if (timeUpperBoundMillis > 0) {
            builder.timeField(DataFrameField.TIME_UPPER_BOUND_MILLIS.getPreferredName(), DataFrameField.TIME_UPPER_BOUND.getPreferredName(),
                    timeUpperBoundMillis);
        }
        builder.endObject();
        return builder;
    }

    /** Writes the timestamp followed by the time upper bound, each as a long. */
    @Override
    public void writeTo(StreamOutput out) throws IOException {
        out.writeLong(timestampMillis);
        out.writeLong(timeUpperBoundMillis);
    }

    @Override
    public int hashCode() {
        return Objects.hash(timestampMillis, timeUpperBoundMillis);
    }

    @Override
    public boolean equals(Object other) {
        if (this == other) {
            return true;
        }

        if (other == null || getClass() != other.getClass()) {
            return false;
        }

        DataFrameTransformCheckpointStats that = (DataFrameTransformCheckpointStats) other;

        return this.timestampMillis == that.timestampMillis &&
            this.timeUpperBoundMillis == that.timeUpperBoundMillis;
    }

    /**
     * Parses checkpoint stats from the given parser; both fields are optional and default to {@code 0}.
     *
     * @param p the parser to read from
     * @return the parsed stats
     */
    public static DataFrameTransformCheckpointStats fromXContent(XContentParser p) {
        return LENIENT_PARSER.apply(p, null);
    }
}

View File

@ -0,0 +1,141 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.core.dataframe.transforms;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import java.io.IOException;
import java.util.Objects;
/**
 * Holds information about checkpointing regarding
 *  - the current checkpoint
 *  - the in progress checkpoint
 *  - the current state of the source
 *
 * Instances are immutable. They can be written to and read from a stream and rendered as an XContent object.
 */
public class DataFrameTransformCheckpointingInfo implements Writeable, ToXContentObject {

    /**
     * Shared placeholder built from two empty checkpoint stats and an operations-behind counter of {@code 0}.
     *
     * Declared {@code final}: this is a globally visible sentinel and must not be reassignable by callers.
     */
    public static final DataFrameTransformCheckpointingInfo EMPTY = new DataFrameTransformCheckpointingInfo(
            DataFrameTransformCheckpointStats.EMPTY,
            DataFrameTransformCheckpointStats.EMPTY,
            0L);

    public static final ParseField CURRENT_CHECKPOINT = new ParseField("current");
    public static final ParseField IN_PROGRESS_CHECKPOINT = new ParseField("in_progress");
    public static final ParseField OPERATIONS_BEHIND = new ParseField("operations_behind");

    private final DataFrameTransformCheckpointStats current;
    private final DataFrameTransformCheckpointStats inProgress;
    private final long operationsBehind;

    private static final ConstructingObjectParser<DataFrameTransformCheckpointingInfo, Void> LENIENT_PARSER =
            new ConstructingObjectParser<>(
                    "data_frame_transform_checkpointing_info", true, a -> {
                        // absent optional arguments are handed over as null, substitute the defaults
                        long behind = a[2] == null ? 0L : (Long) a[2];

                        return new DataFrameTransformCheckpointingInfo(
                                a[0] == null ? DataFrameTransformCheckpointStats.EMPTY : (DataFrameTransformCheckpointStats) a[0],
                                a[1] == null ? DataFrameTransformCheckpointStats.EMPTY : (DataFrameTransformCheckpointStats) a[1], behind);
                    });

    static {
        LENIENT_PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(),
                (p, c) -> DataFrameTransformCheckpointStats.fromXContent(p), CURRENT_CHECKPOINT);
        LENIENT_PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(),
                (p, c) -> DataFrameTransformCheckpointStats.fromXContent(p), IN_PROGRESS_CHECKPOINT);
        LENIENT_PARSER.declareLong(ConstructingObjectParser.optionalConstructorArg(), OPERATIONS_BEHIND);
    }

    /**
     * Create checkpoint stats object with checkpoint information about the current and in progress checkpoint as well as the current state
     * of source.
     *
     * @param current stats of the current checkpoint
     * @param inProgress stats of the in progress checkpoint
     * @param operationsBehind counter of operations the current checkpoint is behind source
     * @throws NullPointerException if {@code current} or {@code inProgress} is null
     */
    public DataFrameTransformCheckpointingInfo(DataFrameTransformCheckpointStats current, DataFrameTransformCheckpointStats inProgress,
            long operationsBehind) {
        this.current = Objects.requireNonNull(current);
        this.inProgress = Objects.requireNonNull(inProgress);
        this.operationsBehind = operationsBehind;
    }

    /**
     * Reads the info from a stream, in the order written by {@link #writeTo(StreamOutput)}.
     *
     * @param in the stream to read from
     * @throws IOException if reading from the stream fails
     */
    public DataFrameTransformCheckpointingInfo(StreamInput in) throws IOException {
        current = new DataFrameTransformCheckpointStats(in);
        inProgress = new DataFrameTransformCheckpointStats(in);
        operationsBehind = in.readLong();
    }

    /** @return stats of the current checkpoint, never null */
    public DataFrameTransformCheckpointStats getCurrent() {
        return current;
    }

    /** @return stats of the in progress checkpoint, never null */
    public DataFrameTransformCheckpointStats getInProgress() {
        return inProgress;
    }

    /** @return counter of operations the current checkpoint is behind source */
    public long getOperationsBehind() {
        return operationsBehind;
    }

    /**
     * Renders the info as an object. A checkpoint is only written when its timestamp is greater than
     * {@code 0}; the operations-behind counter is always written.
     */
    @Override
    public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
        builder.startObject();
        if (current.getTimestampMillis() > 0) {
            builder.field(CURRENT_CHECKPOINT.getPreferredName(), current);
        }
        if (inProgress.getTimestampMillis() > 0) {
            builder.field(IN_PROGRESS_CHECKPOINT.getPreferredName(), inProgress);
        }

        builder.field(OPERATIONS_BEHIND.getPreferredName(), operationsBehind);
        builder.endObject();
        return builder;
    }

    /** Writes the current stats, the in progress stats and the operations-behind counter, in that order. */
    @Override
    public void writeTo(StreamOutput out) throws IOException {
        current.writeTo(out);
        inProgress.writeTo(out);
        out.writeLong(operationsBehind);
    }

    /**
     * Parses checkpointing info from the given parser; missing checkpoints are replaced by
     * {@link DataFrameTransformCheckpointStats#EMPTY} and a missing counter by {@code 0}.
     *
     * @param p the parser to read from
     * @return the parsed checkpointing info
     */
    public static DataFrameTransformCheckpointingInfo fromXContent(XContentParser p) {
        return LENIENT_PARSER.apply(p, null);
    }

    @Override
    public int hashCode() {
        return Objects.hash(current, inProgress, operationsBehind);
    }

    @Override
    public boolean equals(Object other) {
        if (this == other) {
            return true;
        }

        if (other == null || getClass() != other.getClass()) {
            return false;
        }

        DataFrameTransformCheckpointingInfo that = (DataFrameTransformCheckpointingInfo) other;

        return Objects.equals(this.current, that.current) &&
                Objects.equals(this.inProgress, that.inProgress) &&
                this.operationsBehind == that.operationsBehind;
    }
}

View File

@ -35,7 +35,7 @@ public class DataFrameTransformState implements Task.Status, PersistentTaskState
private final DataFrameTransformTaskState taskState;
private final IndexerState indexerState;
private final long generation;
private final long checkpoint;
@Nullable
private final SortedMap<String, Object> currentPosition;
@ -45,7 +45,7 @@ public class DataFrameTransformState implements Task.Status, PersistentTaskState
private static final ParseField TASK_STATE = new ParseField("task_state");
private static final ParseField INDEXER_STATE = new ParseField("indexer_state");
private static final ParseField CURRENT_POSITION = new ParseField("current_position");
private static final ParseField GENERATION = new ParseField("generation");
private static final ParseField CHECKPOINT = new ParseField("checkpoint");
private static final ParseField REASON = new ParseField("reason");
@SuppressWarnings("unchecked")
@ -80,19 +80,19 @@ public class DataFrameTransformState implements Task.Status, PersistentTaskState
}
throw new IllegalArgumentException("Unsupported token [" + p.currentToken() + "]");
}, CURRENT_POSITION, ObjectParser.ValueType.VALUE_OBJECT_ARRAY);
PARSER.declareLong(constructorArg(), GENERATION);
PARSER.declareLong(ConstructingObjectParser.optionalConstructorArg(), CHECKPOINT);
PARSER.declareString(optionalConstructorArg(), REASON);
}
public DataFrameTransformState(DataFrameTransformTaskState taskState,
IndexerState indexerState,
@Nullable Map<String, Object> position,
long generation,
long checkpoint,
@Nullable String reason) {
this.taskState = taskState;
this.indexerState = indexerState;
this.currentPosition = position == null ? null : Collections.unmodifiableSortedMap(new TreeMap<>(position));
this.generation = generation;
this.checkpoint = checkpoint;
this.reason = reason;
}
@ -100,7 +100,7 @@ public class DataFrameTransformState implements Task.Status, PersistentTaskState
taskState = DataFrameTransformTaskState.fromStream(in);
indexerState = IndexerState.fromStream(in);
currentPosition = in.readBoolean() ? Collections.unmodifiableSortedMap(new TreeMap<>(in.readMap())) : null;
generation = in.readLong();
checkpoint = in.readLong();
reason = in.readOptionalString();
}
@ -116,8 +116,17 @@ public class DataFrameTransformState implements Task.Status, PersistentTaskState
return currentPosition;
}
public long getGeneration() {
return generation;
public long getCheckpoint() {
return checkpoint;
}
/**
 * Returns the number of the checkpoint that is currently being worked on.
 *
 * While the indexer state is {@code INDEXING} this is the stored checkpoint plus one; in every other
 * indexer state there is no checkpoint in progress and {@code 0} is returned.
 *
 * @return the in-progress checkpoint, or 0 if the indexer is not indexing
 */
public long getInProgressCheckpoint() {
    if (indexerState.equals(IndexerState.INDEXING)) {
        return checkpoint + 1L;
    }
    return 0L;
}
public String getReason() {
@ -140,7 +149,7 @@ public class DataFrameTransformState implements Task.Status, PersistentTaskState
if (currentPosition != null) {
builder.field(CURRENT_POSITION.getPreferredName(), currentPosition);
}
builder.field(GENERATION.getPreferredName(), generation);
builder.field(CHECKPOINT.getPreferredName(), checkpoint);
if (reason != null) {
builder.field(REASON.getPreferredName(), reason);
}
@ -161,7 +170,7 @@ public class DataFrameTransformState implements Task.Status, PersistentTaskState
if (currentPosition != null) {
out.writeMap(currentPosition);
}
out.writeLong(generation);
out.writeLong(checkpoint);
out.writeOptionalString(reason);
}
@ -180,13 +189,13 @@ public class DataFrameTransformState implements Task.Status, PersistentTaskState
return Objects.equals(this.taskState, that.taskState) &&
Objects.equals(this.indexerState, that.indexerState) &&
Objects.equals(this.currentPosition, that.currentPosition) &&
this.generation == that.generation &&
this.checkpoint == that.checkpoint &&
Objects.equals(this.reason, that.reason);
}
@Override
public int hashCode() {
return Objects.hash(taskState, indexerState, currentPosition, generation, reason);
return Objects.hash(taskState, indexerState, currentPosition, checkpoint, reason);
}
@Override

View File

@ -24,20 +24,27 @@ public class DataFrameTransformStateAndStats implements Writeable, ToXContentObj
private static final String NAME = "data_frame_transform_state_and_stats";
public static final ParseField STATE_FIELD = new ParseField("state");
public static final ParseField CHECKPOINTING_INFO_FIELD = new ParseField("checkpointing");
private final String id;
private final DataFrameTransformState transformState;
private final DataFrameIndexerTransformStats transformStats;
private final DataFrameTransformCheckpointingInfo checkpointingInfo;
public static final ConstructingObjectParser<DataFrameTransformStateAndStats, Void> PARSER = new ConstructingObjectParser<>(
NAME, true,
a -> new DataFrameTransformStateAndStats((String) a[0], (DataFrameTransformState) a[1], (DataFrameIndexerTransformStats) a[2]));
a -> new DataFrameTransformStateAndStats((String) a[0],
(DataFrameTransformState) a[1],
(DataFrameIndexerTransformStats) a[2],
(DataFrameTransformCheckpointingInfo) a[3]));
// Registers the fields of the lenient PARSER. Constructor arguments are positional: the declaration order below
// has to match the indices used by the builder lambda passed to the PARSER (a[0] id, a[1] state, a[2] stats,
// a[3] checkpointing info).
static {
PARSER.declareString(ConstructingObjectParser.constructorArg(), DataFrameField.ID);
PARSER.declareObject(ConstructingObjectParser.constructorArg(), DataFrameTransformState.PARSER::apply, STATE_FIELD);
PARSER.declareObject(ConstructingObjectParser.constructorArg(), (p, c) -> DataFrameIndexerTransformStats.fromXContent(p),
DataFrameField.STATS_FIELD);
// NOTE(review): declared optional, but the four-argument constructor rejects a null checkpointingInfo via
// Objects.requireNonNull - confirm whether a missing "checkpointing" object should be mapped to
// DataFrameTransformCheckpointingInfo.EMPTY instead of failing
PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(),
(p, c) -> DataFrameTransformCheckpointingInfo.fromXContent(p), CHECKPOINTING_INFO_FIELD);
}
public static DataFrameTransformStateAndStats initialStateAndStats(String id) {
@ -47,27 +54,32 @@ public class DataFrameTransformStateAndStats implements Writeable, ToXContentObj
public static DataFrameTransformStateAndStats initialStateAndStats(String id, DataFrameIndexerTransformStats indexerTransformStats) {
return new DataFrameTransformStateAndStats(id,
new DataFrameTransformState(DataFrameTransformTaskState.STOPPED, IndexerState.STOPPED, null, 0L, null),
indexerTransformStats);
indexerTransformStats,
DataFrameTransformCheckpointingInfo.EMPTY);
}
public DataFrameTransformStateAndStats(String id, DataFrameTransformState state, DataFrameIndexerTransformStats stats) {
public DataFrameTransformStateAndStats(String id, DataFrameTransformState state, DataFrameIndexerTransformStats stats,
DataFrameTransformCheckpointingInfo checkpointingInfo) {
this.id = Objects.requireNonNull(id);
this.transformState = Objects.requireNonNull(state);
this.transformStats = Objects.requireNonNull(stats);
this.checkpointingInfo = Objects.requireNonNull(checkpointingInfo);
}
/**
 * Read an instance from a stream
 *
 * The fields are read in a fixed order: id, state, stats, checkpointing info. Do not reorder the statements,
 * the order has to match the order in which writeTo of this class writes the fields.
 *
 * @param in the stream to read from
 * @throws IOException if reading from the stream fails
 */
public DataFrameTransformStateAndStats(StreamInput in) throws IOException {
this.id = in.readString();
this.transformState = new DataFrameTransformState(in);
this.transformStats = new DataFrameIndexerTransformStats(in);
// NOTE(review): read unconditionally, without a stream version check - confirm every sender writes this field
this.checkpointingInfo = new DataFrameTransformCheckpointingInfo(in);
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(DataFrameField.ID.getPreferredName(), id);
builder.field(STATE_FIELD.getPreferredName(), transformState);
builder.field(STATE_FIELD.getPreferredName(), transformState, params);
builder.field(DataFrameField.STATS_FIELD.getPreferredName(), transformStats, params);
builder.field(CHECKPOINTING_INFO_FIELD.getPreferredName(), checkpointingInfo, params);
builder.endObject();
return builder;
}
@ -77,11 +89,12 @@ public class DataFrameTransformStateAndStats implements Writeable, ToXContentObj
out.writeString(id);
transformState.writeTo(out);
transformStats.writeTo(out);
checkpointingInfo.writeTo(out);
}
@Override
public int hashCode() {
return Objects.hash(id, transformState, transformStats);
return Objects.hash(id, transformState, transformStats, checkpointingInfo);
}
@Override
@ -97,7 +110,8 @@ public class DataFrameTransformStateAndStats implements Writeable, ToXContentObj
DataFrameTransformStateAndStats that = (DataFrameTransformStateAndStats) other;
return Objects.equals(this.id, that.id) && Objects.equals(this.transformState, that.transformState)
&& Objects.equals(this.transformStats, that.transformStats);
&& Objects.equals(this.transformStats, that.transformStats)
&& Objects.equals(this.checkpointingInfo, that.checkpointingInfo);
}
public String getId() {
@ -112,6 +126,10 @@ public class DataFrameTransformStateAndStats implements Writeable, ToXContentObj
return transformState;
}
/**
 * Get the checkpointing information
 *
 * @return the checkpointing info this object was created with
 */
public DataFrameTransformCheckpointingInfo getCheckpointingInfo() {
    return this.checkpointingInfo;
}
@Override
public String toString() {
return Strings.toString(this);

View File

@ -0,0 +1,35 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.core.dataframe.transforms;
import org.elasticsearch.common.io.stream.Writeable.Reader;
import org.elasticsearch.common.xcontent.XContentParser;
import java.io.IOException;
/**
 * Serialization test case for {@link DataFrameTransformCheckpointStats}, driven by randomized instances.
 */
public class DataFrameTransformCheckpointStatsTests
        extends AbstractSerializingDataFrameTestCase<DataFrameTransformCheckpointStats> {

    /**
     * Create checkpoint stats from 2 random non-negative values
     *
     * @return a random instance
     */
    public static DataFrameTransformCheckpointStats randomDataFrameTransformCheckpointStats() {
        // drawn in argument order: timestamp first, time upper bound second
        final long timestampMillis = randomNonNegativeLong();
        final long timeUpperBoundMillis = randomNonNegativeLong();
        return new DataFrameTransformCheckpointStats(timestampMillis, timeUpperBoundMillis);
    }

    @Override
    protected DataFrameTransformCheckpointStats createTestInstance() {
        return randomDataFrameTransformCheckpointStats();
    }

    @Override
    protected Reader<DataFrameTransformCheckpointStats> instanceReader() {
        return DataFrameTransformCheckpointStats::new;
    }

    @Override
    protected DataFrameTransformCheckpointStats doParseInstance(XContentParser parser) throws IOException {
        return DataFrameTransformCheckpointStats.fromXContent(parser);
    }
}

View File

@ -0,0 +1,35 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.core.dataframe.transforms;
import org.elasticsearch.common.io.stream.Writeable.Reader;
import org.elasticsearch.common.xcontent.XContentParser;
import java.io.IOException;
/**
 * Serialization test case for {@link DataFrameTransformCheckpointingInfo}, driven by randomized instances.
 */
public class DataFrameTransformCheckpointingInfoTests
        extends AbstractSerializingDataFrameTestCase<DataFrameTransformCheckpointingInfo> {

    /**
     * Create checkpointing info from 2 random checkpoint stats and a random non-negative count
     *
     * @return a random instance
     */
    public static DataFrameTransformCheckpointingInfo randomDataFrameTransformCheckpointingInfo() {
        // the random values are drawn in constructor argument order
        final DataFrameTransformCheckpointStats current =
                DataFrameTransformCheckpointStatsTests.randomDataFrameTransformCheckpointStats();
        final DataFrameTransformCheckpointStats inProgress =
                DataFrameTransformCheckpointStatsTests.randomDataFrameTransformCheckpointStats();
        final long operationsBehind = randomNonNegativeLong();
        return new DataFrameTransformCheckpointingInfo(current, inProgress, operationsBehind);
    }

    @Override
    protected DataFrameTransformCheckpointingInfo createTestInstance() {
        return randomDataFrameTransformCheckpointingInfo();
    }

    @Override
    protected Reader<DataFrameTransformCheckpointingInfo> instanceReader() {
        return DataFrameTransformCheckpointingInfo::new;
    }

    @Override
    protected DataFrameTransformCheckpointingInfo doParseInstance(XContentParser parser) throws IOException {
        return DataFrameTransformCheckpointingInfo.fromXContent(parser);
    }
}

View File

@ -22,7 +22,8 @@ public class DataFrameTransformStateAndStatsTests extends AbstractSerializingDat
public static DataFrameTransformStateAndStats randomDataFrameTransformStateAndStats(String id) {
return new DataFrameTransformStateAndStats(id,
DataFrameTransformStateTests.randomDataFrameTransformState(),
DataFrameIndexerTransformStatsTests.randomStats(id));
DataFrameIndexerTransformStatsTests.randomStats(id),
DataFrameTransformCheckpointingInfoTests.randomDataFrameTransformCheckpointingInfo());
}
public static DataFrameTransformStateAndStats randomDataFrameTransformStateAndStats() {

View File

@ -0,0 +1,52 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.core.dataframe.transforms.hlrc;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.protocol.AbstractHlrcXContentTestCase;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformCheckpointStats;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformCheckpointStatsTests;
import java.io.IOException;
/**
 * XContent test case pairing the server side {@link DataFrameTransformCheckpointStats} with its counterpart
 * of the high level REST client.
 */
public class DataFrameTransformCheckpointStatsHlrcTests extends AbstractHlrcXContentTestCase<
        DataFrameTransformCheckpointStats,
        org.elasticsearch.client.dataframe.transforms.DataFrameTransformCheckpointStats> {

    /**
     * Convert client side checkpoint stats into the server side representation
     *
     * @param instance the client side object
     * @return server side object holding the same timestamp and time upper bound
     */
    public static DataFrameTransformCheckpointStats fromHlrc(
            org.elasticsearch.client.dataframe.transforms.DataFrameTransformCheckpointStats instance) {
        final long timestampMillis = instance.getTimestampMillis();
        final long timeUpperBoundMillis = instance.getTimeUpperBoundMillis();
        return new DataFrameTransformCheckpointStats(timestampMillis, timeUpperBoundMillis);
    }

    @Override
    protected DataFrameTransformCheckpointStats createTestInstance() {
        return DataFrameTransformCheckpointStatsTests.randomDataFrameTransformCheckpointStats();
    }

    @Override
    protected DataFrameTransformCheckpointStats doParseInstance(XContentParser parser) throws IOException {
        return DataFrameTransformCheckpointStats.fromXContent(parser);
    }

    @Override
    public org.elasticsearch.client.dataframe.transforms.DataFrameTransformCheckpointStats doHlrcParseInstance(XContentParser parser)
            throws IOException {
        return org.elasticsearch.client.dataframe.transforms.DataFrameTransformCheckpointStats.fromXContent(parser);
    }

    @Override
    public DataFrameTransformCheckpointStats convertHlrcToInternal(
            org.elasticsearch.client.dataframe.transforms.DataFrameTransformCheckpointStats instance) {
        return fromHlrc(instance);
    }

    @Override
    protected boolean supportsUnknownFields() {
        return true;
    }
}

View File

@ -0,0 +1,55 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.core.dataframe.transforms.hlrc;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.protocol.AbstractHlrcXContentTestCase;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformCheckpointingInfo;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformCheckpointingInfoTests;
import java.io.IOException;
public class DataFrameTransformCheckpointingInfoHlrcTests extends AbstractHlrcXContentTestCase<
DataFrameTransformCheckpointingInfo,
org.elasticsearch.client.dataframe.transforms.DataFrameTransformCheckpointingInfo> {
public static DataFrameTransformCheckpointingInfo fromHlrc(
org.elasticsearch.client.dataframe.transforms.DataFrameTransformCheckpointingInfo instance) {
return new DataFrameTransformCheckpointingInfo(
DataFrameTransformCheckpointStatsHlrcTests.fromHlrc(instance.getCurrent()),
DataFrameTransformCheckpointStatsHlrcTests.fromHlrc(instance.getInProgress()),
instance.getOperationsBehind());
}
@Override
public org.elasticsearch.client.dataframe.transforms.DataFrameTransformCheckpointingInfo doHlrcParseInstance(XContentParser parser)
throws IOException {
return org.elasticsearch.client.dataframe.transforms.DataFrameTransformCheckpointingInfo.fromXContent(parser);
}
@Override
public DataFrameTransformCheckpointingInfo convertHlrcToInternal(
org.elasticsearch.client.dataframe.transforms.DataFrameTransformCheckpointingInfo instance) {
return fromHlrc(instance);
}
@Override
protected DataFrameTransformCheckpointingInfo createTestInstance() {
return DataFrameTransformCheckpointingInfoTests.randomDataFrameTransformCheckpointingInfo();
}
@Override
protected DataFrameTransformCheckpointingInfo doParseInstance(XContentParser parser) throws IOException {
return DataFrameTransformCheckpointingInfo.fromXContent(parser);
}
@Override
protected boolean supportsUnknownFields() {
return true;
}
}

View File

@ -29,7 +29,8 @@ public class DataFrameTransformStateAndStatsHlrcTests extends AbstractHlrcXConte
org.elasticsearch.client.dataframe.transforms.DataFrameTransformStateAndStats instance) {
return new DataFrameTransformStateAndStats(instance.getId(),
DataFrameTransformStateHlrcTests.fromHlrc(instance.getTransformState()),
DataFrameIndexerTransformStatsHlrcTests.fromHlrc(instance.getTransformStats()));
DataFrameIndexerTransformStatsHlrcTests.fromHlrc(instance.getTransformStats()),
DataFrameTransformCheckpointingInfoHlrcTests.fromHlrc(instance.getCheckpointingInfo()));
}
@Override

View File

@ -21,7 +21,7 @@ public class DataFrameTransformStateHlrcTests extends AbstractHlrcXContentTestCa
public static DataFrameTransformState fromHlrc(org.elasticsearch.client.dataframe.transforms.DataFrameTransformState instance) {
return new DataFrameTransformState(DataFrameTransformTaskState.fromString(instance.getTaskState().value()),
IndexerState.fromString(instance.getIndexerState().value()), instance.getPosition(), instance.getGeneration(),
IndexerState.fromString(instance.getIndexerState().value()), instance.getPosition(), instance.getCheckpoint(),
instance.getReason());
}

View File

@ -196,7 +196,7 @@ public abstract class DataFrameRestTestCase extends ESRestTestCase {
// start the transform
startDataframeTransform(transformId, false, authHeader);
// wait until the dataframe has been created and all data is available
waitForDataFrameGeneration(transformId);
waitForDataFrameCheckpoint(transformId);
refreshIndex(dataFrameIndex);
}
@ -212,10 +212,10 @@ public abstract class DataFrameRestTestCase extends ESRestTestCase {
return request;
}
void waitForDataFrameGeneration(String transformId) throws Exception {
void waitForDataFrameCheckpoint(String transformId) throws Exception {
assertBusy(() -> {
long generation = getDataFrameGeneration(transformId);
assertEquals(1, generation);
long checkpoint = getDataFrameCheckpoint(transformId);
assertEquals(1, checkpoint);
}, 30, TimeUnit.SECONDS);
}
@ -321,11 +321,11 @@ public abstract class DataFrameRestTestCase extends ESRestTestCase {
}
}
static int getDataFrameGeneration(String transformId) throws IOException {
static int getDataFrameCheckpoint(String transformId) throws IOException {
Response statsResponse = client().performRequest(new Request("GET", DATAFRAME_ENDPOINT + transformId + "/_stats"));
Map<?, ?> transformStatsAsMap = (Map<?, ?>) ((List<?>) entityAsMap(statsResponse).get("transforms")).get(0);
return (int) XContentMapValues.extractValue("state.generation", transformStatsAsMap);
return (int) XContentMapValues.extractValue("state.checkpoint", transformStatsAsMap);
}
protected void setupDataAccessRole(String role, String... indices) throws IOException {

View File

@ -60,7 +60,7 @@ public class DataFrameTaskFailedStateIT extends DataFrameRestTestCase {
// Force start the data frame to indicate failure correction
startDataframeTransform(transformId, true);
// Wait for data to be indexed appropriately and refresh for search
waitForDataFrameGeneration(transformId);
waitForDataFrameCheckpoint(transformId);
refreshIndex(dataFrameIndex);
// Verify that we have started and that our reason is cleared

View File

@ -190,7 +190,7 @@ public class DataFrame extends Plugin implements ActionPlugin, PersistentTaskPlu
DATA_FRAME_ORIGIN,
DataFrameAuditMessage.builder()));
dataFrameTransformsConfigManager.set(new DataFrameTransformsConfigManager(client, xContentRegistry));
dataFrameTransformsCheckpointService.set(new DataFrameTransformsCheckpointService(client));
dataFrameTransformsCheckpointService.set(new DataFrameTransformsCheckpointService(client, dataFrameTransformsConfigManager.get()));
return Arrays.asList(dataFrameTransformsConfigManager.get(), dataFrameAuditor.get(), dataFrameTransformsCheckpointService.get());
}

View File

@ -45,7 +45,9 @@ import org.elasticsearch.xpack.core.dataframe.action.GetDataFrameTransformsStats
import org.elasticsearch.xpack.core.dataframe.action.GetDataFrameTransformsStatsAction.Request;
import org.elasticsearch.xpack.core.dataframe.action.GetDataFrameTransformsStatsAction.Response;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameIndexerTransformStats;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformCheckpointingInfo;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformStateAndStats;
import org.elasticsearch.xpack.dataframe.checkpoint.DataFrameTransformsCheckpointService;
import org.elasticsearch.xpack.dataframe.persistence.DataFrameInternalIndex;
import org.elasticsearch.xpack.dataframe.persistence.DataFrameTransformsConfigManager;
import org.elasticsearch.xpack.dataframe.transforms.DataFrameTransformTask;
@ -71,14 +73,18 @@ public class TransportGetDataFrameTransformsStatsAction extends
private final Client client;
private final DataFrameTransformsConfigManager dataFrameTransformsConfigManager;
private final DataFrameTransformsCheckpointService transformsCheckpointService;
@Inject
public TransportGetDataFrameTransformsStatsAction(TransportService transportService, ActionFilters actionFilters,
ClusterService clusterService, Client client,
DataFrameTransformsConfigManager dataFrameTransformsConfigManager) {
DataFrameTransformsConfigManager dataFrameTransformsConfigManager,
DataFrameTransformsCheckpointService transformsCheckpointService) {
super(GetDataFrameTransformsStatsAction.NAME, clusterService, transportService, actionFilters, Request::new, Response::new,
Response::new, ThreadPool.Names.SAME);
this.client = client;
this.dataFrameTransformsConfigManager = dataFrameTransformsConfigManager;
this.transformsCheckpointService = transformsCheckpointService;
}
@Override
@ -93,16 +99,22 @@ public class TransportGetDataFrameTransformsStatsAction extends
@Override
protected void taskOperation(Request request, DataFrameTransformTask task, ActionListener<Response> listener) {
List<DataFrameTransformStateAndStats> transformsStateAndStats = Collections.emptyList();
// Little extra insurance, make sure we only return transforms that aren't cancelled
if (task.isCancelled() == false) {
DataFrameTransformStateAndStats transformStateAndStats = new DataFrameTransformStateAndStats(task.getTransformId(),
task.getState(), task.getStats());
transformsStateAndStats = Collections.singletonList(transformStateAndStats);
transformsCheckpointService.getCheckpointStats(task.getTransformId(), task.getCheckpoint(), task.getInProgressCheckpoint(),
ActionListener.wrap(checkpointStats -> {
listener.onResponse(new Response(Collections.singletonList(
new DataFrameTransformStateAndStats(task.getTransformId(), task.getState(), task.getStats(), checkpointStats))));
}, e -> {
listener.onResponse(new Response(
Collections.singletonList(new DataFrameTransformStateAndStats(task.getTransformId(), task.getState(),
task.getStats(), DataFrameTransformCheckpointingInfo.EMPTY)),
Collections.emptyList(),
Collections.singletonList(new ElasticsearchException("Failed to retrieve checkpointing info", e))));
}));
} else {
listener.onResponse(new Response(Collections.emptyList()));
}
listener.onResponse(new Response(transformsStateAndStats));
}
@Override

View File

@ -9,6 +9,7 @@ package org.elasticsearch.xpack.dataframe.checkpoint;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.LatchedActionListener;
import org.elasticsearch.action.admin.indices.get.GetIndexAction;
import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsAction;
@ -16,14 +17,19 @@ import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequest;
import org.elasticsearch.action.admin.indices.stats.ShardStats;
import org.elasticsearch.client.Client;
import org.elasticsearch.xpack.core.ClientHelper;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformCheckpoint;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformCheckpointStats;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformCheckpointingInfo;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig;
import org.elasticsearch.xpack.dataframe.persistence.DataFrameTransformsConfigManager;
import org.elasticsearch.xpack.dataframe.transforms.DataFrameTransformCheckpoint;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
/**
* DataFrameTransform Checkpoint Service
@ -35,12 +41,24 @@ import java.util.TreeMap;
*/
public class DataFrameTransformsCheckpointService {
private class Checkpoints {
DataFrameTransformCheckpoint currentCheckpoint = DataFrameTransformCheckpoint.EMPTY;
DataFrameTransformCheckpoint inProgressCheckpoint = DataFrameTransformCheckpoint.EMPTY;
DataFrameTransformCheckpoint sourceCheckpoint = DataFrameTransformCheckpoint.EMPTY;
}
private static final Logger logger = LogManager.getLogger(DataFrameTransformsCheckpointService.class);
private final Client client;
// timeout for retrieving checkpoint information
private static final int CHECKPOINT_STATS_TIMEOUT_SECONDS = 5;
public DataFrameTransformsCheckpointService(final Client client) {
private final Client client;
private final DataFrameTransformsConfigManager dataFrameTransformsConfigManager;
public DataFrameTransformsCheckpointService(final Client client,
final DataFrameTransformsConfigManager dataFrameTransformsConfigManager) {
this.client = client;
this.dataFrameTransformsConfigManager = dataFrameTransformsConfigManager;
}
/**
@ -84,6 +102,7 @@ public class DataFrameTransformsCheckpointService {
Map<String, long[]> checkpointsByIndex = extractIndexCheckPoints(response.getShards(), userIndices);
DataFrameTransformCheckpoint checkpointDoc = new DataFrameTransformCheckpoint(transformConfig.getId(),
timestamp, checkpoint, checkpointsByIndex, timeUpperBound);
listener.onResponse(checkpointDoc);
}, IndicesStatsRequestException -> {
@ -96,6 +115,90 @@ public class DataFrameTransformsCheckpointService {
}
/**
 * Get checkpointing stats for a data frame
 *
 * Implementation details:
 *  - fires up to 3 requests _in parallel_ rather than cascading them
 *  - blocks the calling thread for up to CHECKPOINT_STATS_TIMEOUT_SECONDS seconds while waiting for the requests
 *  - the listener is notified only once: with the first reported failure, with a timeout failure or with the result
 *
 * @param transformId the id of the data frame transform
 * @param currentCheckpoint the current checkpoint, if 0 no checkpoint is looked up
 * @param inProgressCheckpoint in progress checkpoint, if 0 no checkpoint is looked up
 * @param listener listener to retrieve the result
 */
public void getCheckpointStats(
        String transformId,
        long currentCheckpoint,
        long inProgressCheckpoint,
        ActionListener<DataFrameTransformCheckpointingInfo> listener) {

    // process in parallel: current checkpoint, in-progress checkpoint, current state of the source
    CountDownLatch latch = new CountDownLatch(3);

    // ensure listener is called exactly once
    final ActionListener<DataFrameTransformCheckpointingInfo> wrappedListener = ActionListener.notifyOnce(listener);

    // holder structure for writing the results of the 3 parallel tasks
    Checkpoints checkpoints = new Checkpoints();

    // get the current checkpoint
    if (currentCheckpoint != 0) {
        dataFrameTransformsConfigManager.getTransformCheckpoint(transformId, currentCheckpoint,
            new LatchedActionListener<>(ActionListener.wrap(checkpoint -> checkpoints.currentCheckpoint = checkpoint, e -> {
                logger.debug("Failed to retrieve checkpoint [" + currentCheckpoint + "] for data frame [" + transformId + "]", e);
                wrappedListener
                    .onFailure(new CheckpointException("Failed to retrieve current checkpoint [" + currentCheckpoint + "]", e));
            }), latch));
    } else {
        // nothing to look up, release this slot of the latch right away
        latch.countDown();
    }

    // get the in-progress checkpoint
    if (inProgressCheckpoint != 0) {
        dataFrameTransformsConfigManager.getTransformCheckpoint(transformId, inProgressCheckpoint,
            new LatchedActionListener<>(ActionListener.wrap(checkpoint -> checkpoints.inProgressCheckpoint = checkpoint, e -> {
                logger.debug("Failed to retrieve in progress checkpoint [" + inProgressCheckpoint + "] for data frame ["
                        + transformId + "]", e);
                wrappedListener.onFailure(
                        new CheckpointException("Failed to retrieve in progress checkpoint [" + inProgressCheckpoint + "]", e));
            }), latch));
    } else {
        // nothing to look up, release this slot of the latch right away
        latch.countDown();
    }

    // get the current state
    dataFrameTransformsConfigManager.getTransformConfiguration(transformId, ActionListener.wrap(transformConfig -> {
        getCheckpoint(transformConfig,
            new LatchedActionListener<>(ActionListener.wrap(checkpoint -> checkpoints.sourceCheckpoint = checkpoint, e2 -> {
                logger.debug("Failed to retrieve actual checkpoint for data frame [" + transformId + "]", e2);
                wrappedListener.onFailure(new CheckpointException("Failed to retrieve actual checkpoint", e2));
            }), latch));
    }, e -> {
        logger.warn("Failed to retrieve configuration for data frame [" + transformId + "]", e);
        wrappedListener.onFailure(new CheckpointException("Failed to retrieve configuration", e));
        // the source checkpoint request is never issued in this case, so release its slot here
        latch.countDown();
    }));

    try {
        if (latch.await(CHECKPOINT_STATS_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
            logger.debug("Retrieval of checkpoint information succeeded for data frame [" + transformId + "]");
            // if one of the requests has already reported a failure, this call is dropped by the notify-once wrapper
            wrappedListener.onResponse(new DataFrameTransformCheckpointingInfo(
                    new DataFrameTransformCheckpointStats(checkpoints.currentCheckpoint.getTimestamp(),
                            checkpoints.currentCheckpoint.getTimeUpperBound()),
                    new DataFrameTransformCheckpointStats(checkpoints.inProgressCheckpoint.getTimestamp(),
                            checkpoints.inProgressCheckpoint.getTimeUpperBound()),
                    DataFrameTransformCheckpoint.getBehind(checkpoints.currentCheckpoint, checkpoints.sourceCheckpoint)));
        } else {
            // timed out
            logger.debug("Retrieval of checkpoint information has timed out for data frame [" + transformId + "]");
            wrappedListener.onFailure(new CheckpointException("Retrieval of checkpoint information has timed out"));
        }
    } catch (InterruptedException e) {
        // restore the interrupt flag, so code further up the stack can still observe the interruption
        Thread.currentThread().interrupt();
        logger.debug("Failed to retrieve checkpoints for data frame [" + transformId + "]", e);
        wrappedListener.onFailure(new CheckpointException("Failure during checkpoint info retrieval", e));
    }
}
static Map<String, long[]> extractIndexCheckPoints(ShardStats[] shards, Set<String> userIndices) {
Map<String, TreeMap<Integer, Long>> checkpointsByIndex = new TreeMap<>();

View File

@ -45,8 +45,8 @@ import org.elasticsearch.xpack.core.action.util.PageParams;
import org.elasticsearch.xpack.core.dataframe.DataFrameField;
import org.elasticsearch.xpack.core.dataframe.DataFrameMessages;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameIndexerTransformStats;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformCheckpoint;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig;
import org.elasticsearch.xpack.dataframe.transforms.DataFrameTransformCheckpoint;
import java.io.IOException;
import java.io.InputStream;
@ -148,6 +148,7 @@ public class DataFrameTransformsConfigManager {
if (getResponse.isExists() == false) {
// do not fail if checkpoint does not exist but return an empty checkpoint
logger.trace("found no checkpoint for transform [" + transformId + "], returning empty checkpoint");
resultListener.onResponse(DataFrameTransformCheckpoint.EMPTY);
return;
}

View File

@ -51,12 +51,23 @@ public abstract class DataFrameIndexer extends AsyncTwoPhaseIndexer<Map<String,
protected abstract Map<String, String> getFieldMappings();
/**
 * Request a checkpoint
 *
 * Called by onStart if the indexer has no position yet, i.e. if it runs for the 1st time. In that case
 * onStart does not notify its listener itself but hands it over to this method.
 *
 * @param listener the listener handed over by onStart; NOTE(review): implementations are presumably expected
 *                 to notify it once checkpoint creation has completed or failed - confirm against implementations
 */
protected abstract void createCheckpoint(ActionListener<Void> listener);
@Override
protected void onStart(long now, ActionListener<Void> listener) {
try {
QueryBuilder queryBuilder = getConfig().getSource().getQueryConfig().getQuery();
pivot = new Pivot(getConfig().getSource().getIndex(), queryBuilder, getConfig().getPivotConfig());
listener.onResponse(null);
// if run for the 1st time, create checkpoint
if (getPosition() == null) {
createCheckpoint(listener);
} else {
listener.onResponse(null);
}
} catch (Exception e) {
listener.onFailure(e);
}

View File

@ -4,7 +4,7 @@
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.core.dataframe.transforms;
package org.elasticsearch.xpack.dataframe.transforms;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.common.ParseField;
@ -35,7 +35,7 @@ import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optiona
* The fields:
*
* timestamp the timestamp when this document has been created
* checkpoint the checkpoint number, incremented for every checkpoint
* checkpoint the checkpoint number, incremented for every checkpoint, if -1 this is a non persisted checkpoint
* indices a map of the indices from the source including all checkpoints of all indices matching the source pattern, shard level
* time_upper_bound for time-based indices this holds the upper time boundary of this checkpoint
*
@ -44,21 +44,12 @@ public class DataFrameTransformCheckpoint implements Writeable, ToXContentObject
// shared placeholder for "no checkpoint": no indices, checkpoint number -1 (i.e. a non persisted checkpoint);
// final, so the shared instance can not be replaced by accident
public static final DataFrameTransformCheckpoint EMPTY =
        new DataFrameTransformCheckpoint("empty", 0L, -1L, Collections.emptyMap(), 0L);
// the timestamp of the checkpoint, mandatory
public static final ParseField TIMESTAMP_MILLIS = new ParseField("timestamp_millis");
public static final ParseField TIMESTAMP = new ParseField("timestamp");
// the own checkpoint
public static final ParseField CHECKPOINT = new ParseField("checkpoint");
// checkpoint of the indexes (sequence id's)
public static final ParseField INDICES = new ParseField("indices");
// checkpoint for time based sync
// TODO: consider a lower bound for usecases where you want to transform on a window of a stream
public static final ParseField TIME_UPPER_BOUND_MILLIS = new ParseField("time_upper_bound_millis");
public static final ParseField TIME_UPPER_BOUND = new ParseField("time_upper_bound");
private static final String NAME = "data_frame_transform_checkpoint";
private static final ConstructingObjectParser<DataFrameTransformCheckpoint, Void> STRICT_PARSER = createParser(false);
@ -89,7 +80,7 @@ public class DataFrameTransformCheckpoint implements Writeable, ToXContentObject
parser.declareString(constructorArg(), DataFrameField.ID);
// note: this is never parsed from the outside where timestamp can be formatted as date time
parser.declareLong(constructorArg(), TIMESTAMP_MILLIS);
parser.declareLong(constructorArg(), DataFrameField.TIMESTAMP_MILLIS);
parser.declareLong(constructorArg(), CHECKPOINT);
parser.declareObject(constructorArg(), (p,c) -> {
@ -111,7 +102,7 @@ public class DataFrameTransformCheckpoint implements Writeable, ToXContentObject
}
return checkPointsByIndexName;
}, INDICES);
parser.declareLong(optionalConstructorArg(), TIME_UPPER_BOUND_MILLIS);
parser.declareLong(optionalConstructorArg(), DataFrameField.TIME_UPPER_BOUND_MILLIS);
parser.declareString(optionalConstructorArg(), DataFrameField.INDEX_DOC_TYPE);
return parser;
@ -134,29 +125,47 @@ public class DataFrameTransformCheckpoint implements Writeable, ToXContentObject
this.timeUpperBoundMillis = in.readLong();
}
/**
 * Whether this checkpoint holds no index checkpoints
 *
 * @return true if the map of index checkpoints is empty, false otherwise
 */
public boolean isEmpty() {
    return this.indicesCheckpoints.isEmpty();
}
/**
 * Tells a transient checkpoint apart from a persisted one. A checkpoint counts as transient (non persisted)
 * if its checkpoint number is -1.
 *
 * @return true for a transient checkpoint, false for any other checkpoint number
 */
public boolean isTransient() {
    return this.checkpoint == -1L;
}
/**
* Create XContent for the purpose of storing it in the internal index
*
* @param builder the {@link XContentBuilder}
* @param params builder specific parameters
*
* @return builder instance
* @throws IOException declared by the signature; presumably raised by the builder's write calls
*/
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
// the id, doc_type and checkpoint is only internally used for storage, the user-facing version gets embedded
if (params.paramAsBoolean(DataFrameField.FOR_INTERNAL_STORAGE, false)) {
builder.field(DataFrameField.ID.getPreferredName(), transformId);
builder.field(CHECKPOINT.getPreferredName(), checkpoint);
builder.field(DataFrameField.INDEX_DOC_TYPE.getPreferredName(), NAME);
}
builder.timeField(TIMESTAMP_MILLIS.getPreferredName(), TIMESTAMP.getPreferredName(), timestampMillis);
if (timeUpperBoundMillis > 0) {
builder.timeField(TIME_UPPER_BOUND_MILLIS.getPreferredName(), TIME_UPPER_BOUND.getPreferredName(), timeUpperBoundMillis);
}
builder.field(DataFrameField.ID.getPreferredName(), transformId);
builder.field(CHECKPOINT.getPreferredName(), checkpoint);
builder.field(DataFrameField.INDEX_DOC_TYPE.getPreferredName(), NAME);
// one array of checkpoint values per index name, nested under the indices field
builder.startObject(INDICES.getPreferredName());
for (Entry<String, long[]> entry : indicesCheckpoints.entrySet()) {
builder.array(entry.getKey(), entry.getValue());
}
builder.endObject();
builder.field(DataFrameField.TIMESTAMP_MILLIS.getPreferredName(), timestampMillis);
// the upper bound is only written when it holds a positive value
if (timeUpperBoundMillis > 0) {
builder.field(DataFrameField.TIME_UPPER_BOUND_MILLIS.getPreferredName(), timeUpperBoundMillis);
}
builder.endObject();
return builder;
}
@ -249,6 +258,53 @@ public class DataFrameTransformCheckpoint implements Writeable, ToXContentObject
return NAME + "-" + transformId + "-" + checkpoint;
}
/**
 * Estimate how far one checkpoint lags behind another.
 *
 * The result is the difference between the summed shard checkpoints of the two arguments,
 * so the order of the arguments matters.
 *
 * @param oldCheckpoint the older checkpoint; if it is transient, the newer one must be transient, too
 * @param newCheckpoint the newer checkpoint; may be a transient checkpoint
 *
 * @return the number of operations the old checkpoint is behind, or -1L if the difference could not be calculated
 * @throws IllegalArgumentException if a transient old checkpoint is compared against a non transient new one, or if
 *         both are non transient and the old checkpoint number is greater than the new one
 */
public static long getBehind(DataFrameTransformCheckpoint oldCheckpoint, DataFrameTransformCheckpoint newCheckpoint) {
    final boolean oldIsTransient = oldCheckpoint.isTransient();
    final boolean newIsTransient = newCheckpoint.isTransient();

    // ordering can only be violated when the new checkpoint is a persisted one
    if (newIsTransient == false) {
        if (oldIsTransient) {
            throw new IllegalArgumentException("can not compare transient against a non transient checkpoint");
        }
        if (oldCheckpoint.getCheckpoint() > newCheckpoint.getCheckpoint()) {
            throw new IllegalArgumentException("old checkpoint is newer than new checkpoint");
        }
    }

    // every index of the old checkpoint has to show up in the new one, the reverse is not required
    final boolean newCoversOld = newCheckpoint.indicesCheckpoints.keySet().containsAll(oldCheckpoint.indicesCheckpoints.keySet());
    if (newCoversOld == false) {
        return -1L;
    }

    // sum up the shard checkpoints of each side
    // note: shard checkpoints are required to strictly increase and never decrease
    final long oldTotal = oldCheckpoint.indicesCheckpoints.values().stream()
        .mapToLong(shardCheckpoints -> Arrays.stream(shardCheckpoints).sum())
        .sum();
    final long newTotal = newCheckpoint.indicesCheckpoints.values().stream()
        .mapToLong(shardCheckpoints -> Arrays.stream(shardCheckpoints).sum())
        .sum();

    // a smaller new total should not be possible, report it as "unknown"
    return newTotal < oldTotal ? -1L : newTotal - oldTotal;
}
private static Map<String, long[]> readCheckpoints(Map<String, Object> readMap) {
Map<String, long[]> checkpoints = new TreeMap<>();
for (Map.Entry<String, Object> e : readMap.entrySet()) {

View File

@ -48,6 +48,7 @@ import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
@ -68,10 +69,9 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
private final AtomicReference<DataFrameTransformTaskState> taskState;
private final AtomicReference<String> stateReason;
// the generation of this data frame, for v1 there will be only
// 0: data frame not created or still indexing
// 1: data frame complete, all data has been indexed
private final AtomicReference<Long> generation;
// the checkpoint of this data frame, storing the checkpoint until data indexing from source to dest is _complete_
// Note: Each indexer run creates a new future checkpoint which becomes the current checkpoint only after the indexer run finished
private final AtomicLong currentCheckpoint;
private final AtomicInteger failureCount;
public DataFrameTransformTask(long id, String type, String action, TaskId parentTask, DataFrameTransform transform,
@ -105,12 +105,12 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
initialState = existingState;
}
initialPosition = state.getPosition();
initialGeneration = state.getGeneration();
initialGeneration = state.getCheckpoint();
}
this.indexer = new ClientDataFrameIndexer(transform.getId(), transformsConfigManager, transformsCheckpointService,
new AtomicReference<>(initialState), initialPosition, client, auditor);
this.generation = new AtomicReference<>(initialGeneration);
this.currentCheckpoint = new AtomicLong(initialGeneration);
this.previousStats = new DataFrameIndexerTransformStats(transform.getId());
this.taskState = new AtomicReference<>(initialTaskState);
this.stateReason = new AtomicReference<>(initialReason);
@ -130,7 +130,12 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
}
/**
* Build a snapshot of the current state of this task.
*
* The state is assembled from the task state, the indexer state, the indexer position,
* the checkpoint counter and the state reason.
*
* @return a new {@link DataFrameTransformState} instance
*/
public DataFrameTransformState getState() {
return new DataFrameTransformState(taskState.get(), indexer.getState(), indexer.getPosition(), generation.get(), stateReason.get());
return new DataFrameTransformState(
taskState.get(),
indexer.getState(),
indexer.getPosition(),
currentCheckpoint.get(),
stateReason.get());
}
void initializePreviousStats(DataFrameIndexerTransformStats stats) {
@ -141,8 +146,17 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
return new DataFrameIndexerTransformStats(previousStats).merge(indexer.getStats());
}
public long getGeneration() {
return generation.get();
public long getCheckpoint() {
return currentCheckpoint.get();
}
/**
 * Report the checkpoint that is currently being worked on.
 *
 * @return the checkpoint following the current one while the indexer state is INDEXING, 0 otherwise
 */
public long getInProgressCheckpoint() {
    if (indexer.getState().equals(IndexerState.INDEXING)) {
        // the checkpoint in progress is always the successor of the last completed one
        return currentCheckpoint.get() + 1L;
    }
    return 0;
}
public boolean isStopped() {
@ -164,7 +178,7 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
DataFrameTransformTaskState.STARTED,
IndexerState.STOPPED,
indexer.getPosition(),
generation.get(),
currentCheckpoint.get(),
null);
logger.info("Updating state for data frame transform [{}] to [{}]", transform.getId(), state.toString());
@ -203,7 +217,7 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
DataFrameTransformTaskState.STOPPED,
IndexerState.STOPPED,
indexer.getPosition(),
generation.get(),
currentCheckpoint.get(),
stateReason.get());
persistStateToClusterState(state, ActionListener.wrap(
task -> {
@ -224,7 +238,8 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
@Override
public synchronized void triggered(Event event) {
if (generation.get() == 0 && event.getJobName().equals(SCHEDULE_NAME + "_" + transform.getId())) {
// for now no rerun, so only trigger if checkpoint == 0
if (currentCheckpoint.get() == 0 && event.getJobName().equals(SCHEDULE_NAME + "_" + transform.getId())) {
logger.debug("Data frame indexer [" + event.getJobName() + "] schedule has triggered, state: [" + indexer.getState() + "]");
indexer.maybeTriggerAsyncJob(System.currentTimeMillis());
}
@ -298,6 +313,8 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
protected class ClientDataFrameIndexer extends DataFrameIndexer {
private static final int LOAD_TRANSFORM_TIMEOUT_IN_SECONDS = 30;
private static final int CREATE_CHECKPOINT_TIMEOUT_IN_SECONDS = 30;
private final Client client;
private final DataFrameTransformsConfigManager transformsConfigManager;
private final DataFrameTransformsCheckpointService transformsCheckpointService;
@ -413,21 +430,11 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
return;
}
if(indexerState.equals(IndexerState.STARTED) && getStats().getNumDocuments() > 0) {
// if the indexer resets the state to started, it means it is done with a run through the data.
// But, if there were no documents, we should allow it to attempt to gather more again, as there is no risk of overwriting
// Some reasons for no documents are (but is not limited to):
// * Could have failed early on search or index
// * Have an empty index
// * Have a query that returns no documents
generation.compareAndSet(0L, 1L);
}
final DataFrameTransformState state = new DataFrameTransformState(
taskState.get(),
indexerState,
getPosition(),
generation.get(),
currentCheckpoint.get(),
stateReason.get());
logger.info("Updating persistent state of transform [" + transform.getId() + "] to [" + state.toString() + "]");
@ -480,8 +487,9 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
@Override
protected void onFinish(ActionListener<Void> listener) {
try {
auditor.info(transform.getId(), "Finished indexing for data frame transform");
logger.info("Finished indexing for data frame transform [" + transform.getId() + "]");
long checkpoint = currentCheckpoint.incrementAndGet();
auditor.info(transform.getId(), "Finished indexing for data frame transform checkpoint [" + checkpoint + "]");
logger.info("Finished indexing for data frame transform [" + transform.getId() + "] checkpoint [" + checkpoint + "]");
listener.onResponse(null);
} catch (Exception e) {
listener.onFailure(e);
@ -494,6 +502,19 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
logger.info("Data frame transform [" + transform.getId() + "] received abort request, stopping indexer");
shutdown();
}
/**
 * Retrieve the checkpoint that follows the current one and store it.
 *
 * The listener is notified with {@code null} once the checkpoint has been stored; a failure of
 * either step is reported as a {@link RuntimeException} wrapping the original cause.
 *
 * @param listener notified about success or failure
 */
@Override
protected void createCheckpoint(ActionListener<Void> listener) {
    final long upcomingCheckpoint = currentCheckpoint.get() + 1;
    transformsCheckpointService.getCheckpoint(transformConfig, upcomingCheckpoint, ActionListener.wrap(
        checkpoint -> transformsConfigManager.putTransformCheckpoint(checkpoint, ActionListener.wrap(
            putCheckPointResponse -> listener.onResponse(null),
            createCheckpointException -> listener.onFailure(
                new RuntimeException("Failed to create checkpoint", createCheckpointException)))),
        getCheckPointException -> listener.onFailure(
            new RuntimeException("Failed to retrieve checkpoint", getCheckPointException))));
}
}
class DataFrameConfigurationException extends RuntimeException {

View File

@ -4,7 +4,7 @@
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.dataframe.persistence;
package org.elasticsearch.xpack.dataframe;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.LatchedActionListener;
@ -15,7 +15,7 @@ import org.elasticsearch.index.reindex.ReindexPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESSingleNodeTestCase;
import org.elasticsearch.xpack.core.template.TemplateUtils;
import org.elasticsearch.xpack.dataframe.LocalStateDataFrame;
import org.elasticsearch.xpack.dataframe.persistence.DataFrameInternalIndex;
import org.junit.Before;
import java.util.Collection;

View File

@ -0,0 +1,258 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.dataframe.checkpoint;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
import org.elasticsearch.action.admin.indices.get.GetIndexResponse;
import org.elasticsearch.action.admin.indices.stats.CommonStats;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequest;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
import org.elasticsearch.action.admin.indices.stats.ShardStats;
import org.elasticsearch.cluster.routing.RecoverySource;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.cache.query.QueryCacheStats;
import org.elasticsearch.index.cache.request.RequestCacheStats;
import org.elasticsearch.index.engine.SegmentsStats;
import org.elasticsearch.index.fielddata.FieldDataStats;
import org.elasticsearch.index.flush.FlushStats;
import org.elasticsearch.index.get.GetStats;
import org.elasticsearch.index.merge.MergeStats;
import org.elasticsearch.index.refresh.RefreshStats;
import org.elasticsearch.index.search.stats.SearchStats;
import org.elasticsearch.index.seqno.SeqNoStats;
import org.elasticsearch.index.shard.DocsStats;
import org.elasticsearch.index.shard.IndexingStats;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardPath;
import org.elasticsearch.index.store.StoreStats;
import org.elasticsearch.index.warmer.WarmerStats;
import org.elasticsearch.search.suggest.completion.CompletionStats;
import org.elasticsearch.test.client.NoOpClient;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformCheckpointStats;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformCheckpointingInfo;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfigTests;
import org.elasticsearch.xpack.dataframe.DataFrameSingleNodeTestCase;
import org.elasticsearch.xpack.dataframe.persistence.DataFrameTransformsConfigManager;
import org.elasticsearch.xpack.dataframe.transforms.DataFrameTransformCheckpoint;
import org.junit.After;
import org.junit.Before;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
/**
 * Single node tests that store, read and delete transform checkpoints through the config manager and
 * query checkpoint stats through the checkpoint service, the latter backed by a mocked client.
 */
public class DataFrameTransformCheckpointServiceNodeTests extends DataFrameSingleNodeTestCase {

    private DataFrameTransformsConfigManager transformsConfigManager;
    private MockClientForCheckpointing mockClientForCheckpointing;
    private DataFrameTransformsCheckpointService transformsCheckpointService;

    /**
     * Client that answers get-index and indices-stats requests from canned shard stats and
     * delegates every other request to {@link NoOpClient}.
     *
     * Declared static because it never touches the enclosing test instance.
     */
    private static class MockClientForCheckpointing extends NoOpClient {

        private ShardStats[] shardStats;
        private String[] indices;

        MockClientForCheckpointing(String testName) {
            super(testName);
        }

        /**
         * Set the shard stats to return; the index names are derived from the shard routings.
         *
         * @param shardStats the shard stats served for indices-stats requests
         */
        public void setShardStats(ShardStats[] shardStats) {
            this.shardStats = shardStats;

            // collect into a set so every index name is reported only once
            Set<String> indexNames = new HashSet<>();
            for (ShardStats s : shardStats) {
                indexNames.add(s.getShardRouting().getIndexName());
            }

            this.indices = indexNames.toArray(new String[0]);
        }

        @SuppressWarnings("unchecked")
        @Override
        protected <Request extends ActionRequest, Response extends ActionResponse> void doExecute(Action<Response> action, Request request,
                ActionListener<Response> listener) {

            if (request instanceof GetIndexRequest) {
                // for this test we only need the indices
                final GetIndexResponse indexResponse = new GetIndexResponse(indices, null, null, null, null);

                listener.onResponse((Response) indexResponse);
                return;
            } else if (request instanceof IndicesStatsRequest) {

                // IndicesStatsResponse is package private, therefore using a mock
                final IndicesStatsResponse indicesStatsResponse = mock(IndicesStatsResponse.class);
                when(indicesStatsResponse.getShards()).thenReturn(shardStats);
                when(indicesStatsResponse.getFailedShards()).thenReturn(0);

                listener.onResponse((Response) indicesStatsResponse);
                return;
            }

            super.doExecute(action, request, listener);
        }
    }

    /**
     * Wire the config manager to the real test client and the checkpoint service to the mocked client.
     */
    @Before
    public void createComponents() {
        transformsConfigManager = new DataFrameTransformsConfigManager(client(), xContentRegistry());

        // use a mock for the checkpoint service
        mockClientForCheckpointing = new MockClientForCheckpointing(getTestName());
        transformsCheckpointService = new DataFrameTransformsCheckpointService(mockClientForCheckpointing, transformsConfigManager);
    }

    /**
     * Close the mocked client created in {@link #createComponents()}.
     */
    @After
    public void tearDownClient() {
        mockClientForCheckpointing.close();
    }

    /**
     * Checkpoints can be stored and read back per checkpoint number and read as
     * {@code DataFrameTransformCheckpoint.EMPTY} before they exist and after the transform is deleted.
     */
    public void testCreateReadDeleteCheckpoint() throws InterruptedException {
        String transformId = randomAlphaOfLengthBetween(3, 10);
        long timestamp = 1000;

        DataFrameTransformCheckpoint checkpoint = new DataFrameTransformCheckpoint(transformId, timestamp, 1L,
                createCheckPointMap(transformId, 10, 10, 10), null);

        // create transform
        assertAsync(
                listener -> transformsConfigManager
                        .putTransformConfiguration(DataFrameTransformConfigTests.randomDataFrameTransformConfig(transformId), listener),
                true, null, null);

        // by design no exception is thrown but an empty checkpoint is returned
        assertAsync(listener -> transformsConfigManager.getTransformCheckpoint(transformId, 1L, listener),
                DataFrameTransformCheckpoint.EMPTY, null, null);

        assertAsync(listener -> transformsConfigManager.putTransformCheckpoint(checkpoint, listener), true, null, null);

        assertAsync(listener -> transformsConfigManager.getTransformCheckpoint(transformId, 1L, listener), checkpoint, null, null);

        // add a 2nd checkpoint
        DataFrameTransformCheckpoint checkpoint2 = new DataFrameTransformCheckpoint(transformId, timestamp + 100L, 2L,
                createCheckPointMap(transformId, 20, 20, 20), null);

        assertAsync(listener -> transformsConfigManager.putTransformCheckpoint(checkpoint2, listener), true, null, null);

        // both checkpoints should be there
        assertAsync(listener -> transformsConfigManager.getTransformCheckpoint(transformId, 1L, listener), checkpoint, null, null);
        assertAsync(listener -> transformsConfigManager.getTransformCheckpoint(transformId, 2L, listener), checkpoint2, null, null);

        // delete transform
        assertAsync(listener -> transformsConfigManager.deleteTransform(transformId, listener), true, null, null);

        // checkpoints should be empty again
        assertAsync(listener -> transformsConfigManager.getTransformCheckpoint(transformId, 1L, listener),
                DataFrameTransformCheckpoint.EMPTY, null, null);

        assertAsync(listener -> transformsConfigManager.getTransformCheckpoint(transformId, 2L, listener),
                DataFrameTransformCheckpoint.EMPTY, null, null);
    }

    /**
     * The checkpointing info returned by the checkpoint service follows the shard stats served by the
     * mocked client while the two stored checkpoints stay the same.
     */
    public void testGetCheckpointStats() throws InterruptedException {
        String transformId = randomAlphaOfLengthBetween(3, 10);
        long timestamp = 1000;

        // create transform
        assertAsync(
                listener -> transformsConfigManager
                        .putTransformConfiguration(DataFrameTransformConfigTests.randomDataFrameTransformConfig(transformId), listener),
                true, null, null);

        DataFrameTransformCheckpoint checkpoint = new DataFrameTransformCheckpoint(transformId, timestamp, 1L,
                createCheckPointMap(transformId, 10, 10, 10), null);

        assertAsync(listener -> transformsConfigManager.putTransformCheckpoint(checkpoint, listener), true, null, null);

        DataFrameTransformCheckpoint checkpoint2 = new DataFrameTransformCheckpoint(transformId, timestamp + 100L, 2L,
                createCheckPointMap(transformId, 20, 20, 20), null);

        assertAsync(listener -> transformsConfigManager.putTransformCheckpoint(checkpoint2, listener), true, null, null);

        // shard checkpoints sum up to 60 versus 30 in the first stored checkpoint
        mockClientForCheckpointing.setShardStats(createShardStats(createCheckPointMap(transformId, 20, 20, 20)));
        DataFrameTransformCheckpointingInfo checkpointInfo = new DataFrameTransformCheckpointingInfo(
                new DataFrameTransformCheckpointStats(timestamp, 0L),
                new DataFrameTransformCheckpointStats(timestamp + 100L, 0L),
                30L);

        assertAsync(listener -> transformsCheckpointService.getCheckpointStats(transformId, 1, 2, listener), checkpointInfo, null, null);

        // shard checkpoints sum up to 93 versus 30 in the first stored checkpoint
        mockClientForCheckpointing.setShardStats(createShardStats(createCheckPointMap(transformId, 10, 50, 33)));
        checkpointInfo = new DataFrameTransformCheckpointingInfo(
                new DataFrameTransformCheckpointStats(timestamp, 0L),
                new DataFrameTransformCheckpointStats(timestamp + 100L, 0L),
                63L);
        assertAsync(listener -> transformsCheckpointService.getCheckpointStats(transformId, 1, 2, listener), checkpointInfo, null, null);

        // same as current
        mockClientForCheckpointing.setShardStats(createShardStats(createCheckPointMap(transformId, 10, 10, 10)));
        checkpointInfo = new DataFrameTransformCheckpointingInfo(
                new DataFrameTransformCheckpointStats(timestamp, 0L),
                new DataFrameTransformCheckpointStats(timestamp + 100L, 0L),
                0L);
        assertAsync(listener -> transformsCheckpointService.getCheckpointStats(transformId, 1, 2, listener), checkpointInfo, null, null);
    }

    /**
     * Create a checkpoint map holding a single index with three shard checkpoints.
     *
     * @param index the index name used as key
     * @param checkpointShard1 checkpoint of the 1st shard
     * @param checkpointShard2 checkpoint of the 2nd shard
     * @param checkpointShard3 checkpoint of the 3rd shard
     * @return an immutable single entry map
     */
    private static Map<String, long[]> createCheckPointMap(String index, long checkpointShard1, long checkpointShard2,
            long checkpointShard3) {
        return Collections.singletonMap(index, new long[] { checkpointShard1, checkpointShard2, checkpointShard3 });
    }

    /**
     * Create one {@link ShardStats} instance per shard checkpoint of the given map.
     *
     * Every shard gets sequence number stats built from its checkpoint value and an unassigned
     * primary shard routing; the position in the array is used as shard id.
     *
     * @param checkpoints shard checkpoints by index name
     * @return the shard stats of all indices
     */
    private static ShardStats[] createShardStats(Map<String, long[]> checkpoints) {
        List<ShardStats> shardStats = new ArrayList<>();

        for (Entry<String, long[]> entry : checkpoints.entrySet()) {

            for (int i = 0; i < entry.getValue().length; ++i) {
                long checkpoint = entry.getValue()[i];
                CommonStats stats = new CommonStats();
                stats.fieldData = new FieldDataStats();
                stats.queryCache = new QueryCacheStats();
                stats.docs = new DocsStats();
                stats.store = new StoreStats();
                stats.indexing = new IndexingStats();
                stats.search = new SearchStats();
                stats.segments = new SegmentsStats();
                stats.merge = new MergeStats();
                stats.refresh = new RefreshStats();
                stats.completion = new CompletionStats();
                stats.requestCache = new RequestCacheStats();
                stats.get = new GetStats();
                stats.flush = new FlushStats();
                stats.warmer = new WarmerStats();

                // the same value is used for all three sequence number stats arguments
                SeqNoStats seqNoStats = new SeqNoStats(checkpoint, checkpoint, checkpoint);
                Index index = new Index(entry.getKey(), UUIDs.randomBase64UUID(random()));
                ShardId shardId = new ShardId(index, i);
                ShardRouting shardRouting = ShardRouting.newUnassigned(shardId, true, RecoverySource.EmptyStoreRecoverySource.INSTANCE,
                        new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, null));
                Path path = createTempDir().resolve("indices").resolve(index.getUUID()).resolve(String.valueOf(i));

                shardStats.add(new ShardStats(shardRouting, new ShardPath(false, path, path, shardId), stats, null, seqNoStats, null));
            }
        }

        return shardStats.toArray(new ShardStats[0]);
    }

}

View File

@ -10,10 +10,11 @@ import org.elasticsearch.ResourceAlreadyExistsException;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.xpack.core.action.util.PageParams;
import org.elasticsearch.xpack.core.dataframe.DataFrameMessages;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformCheckpoint;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformCheckpointTests;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfigTests;
import org.elasticsearch.xpack.dataframe.DataFrameSingleNodeTestCase;
import org.elasticsearch.xpack.dataframe.transforms.DataFrameTransformCheckpoint;
import org.elasticsearch.xpack.dataframe.transforms.DataFrameTransformCheckpointTests;
import org.junit.Before;
import java.util.Arrays;

View File

@ -4,18 +4,17 @@
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.core.dataframe.transforms;
package org.elasticsearch.xpack.dataframe.transforms;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.Writeable.Reader;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.xpack.core.dataframe.transforms.AbstractSerializingDataFrameTestCase;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
@ -44,11 +43,6 @@ public class DataFrameTransformCheckpointTests extends AbstractSerializingDataFr
return DataFrameTransformCheckpoint::new;
}
/**
* Supply {@code TO_XCONTENT_PARAMS} as the XContent params.
*
* NOTE(review): presumably consumed by the serialization test base class when rendering
* test instances - confirm against the overridden method.
*
* @return the {@code TO_XCONTENT_PARAMS} constant
*/
@Override
protected ToXContent.Params getToXContentParams() {
return TO_XCONTENT_PARAMS;
}
public void testXContentForInternalStorage() throws IOException {
DataFrameTransformCheckpoint dataFrameTransformCheckpoints = randomDataFrameTransformCheckpoints();
@ -58,28 +52,6 @@ public class DataFrameTransformCheckpointTests extends AbstractSerializingDataFr
assertThat(doc, matchesPattern(".*\"doc_type\"\\s*:\\s*\"data_frame_transform_checkpoint\".*"));
}
try (XContentBuilder xContentBuilder = XContentFactory.jsonBuilder()) {
XContentBuilder content = dataFrameTransformCheckpoints.toXContent(xContentBuilder, ToXContent.EMPTY_PARAMS);
String doc = Strings.toString(content);
assertFalse(doc.contains("doc_type"));
}
}
/**
 * Rendering with empty params and a human readable builder has to emit both the millisecond
 * fields and their formatted date counterparts.
 */
public void testXContentForApiUsage() throws IOException {
    // draw the random values in the same order as the constructor arguments
    final String transformId = randomAlphaOfLengthBetween(1, 10);
    final long checkpointNumber = randomNonNegativeLong();
    final DataFrameTransformCheckpoint apiCheckpoint = new DataFrameTransformCheckpoint(transformId, 1546300800000L,
        checkpointNumber, Collections.emptyMap(), 1545609600000L);

    try (XContentBuilder jsonBuilder = XContentFactory.jsonBuilder()) {
        jsonBuilder.humanReadable(true);
        final String json = Strings.toString(apiCheckpoint.toXContent(jsonBuilder, ToXContent.EMPTY_PARAMS));

        // raw millisecond values
        assertThat(json, matchesPattern(".*\"timestamp_millis\"\\s*:\\s*1546300800000.*"));
        assertThat(json, matchesPattern(".*\"time_upper_bound_millis\"\\s*:\\s*1545609600000.*"));
        // formatted values
        assertThat(json, matchesPattern(".*\"timestamp\"\\s*:\\s*\"2019-01-01T00:00:00.000Z\".*"));
        assertThat(json, matchesPattern(".*\"time_upper_bound\"\\s*:\\s*\"2018-12-24T00:00:00.000Z\".*"));
    }
}
public void testMatches() throws IOException {
@ -119,12 +91,90 @@ public class DataFrameTransformCheckpointTests extends AbstractSerializingDataFr
.matches(new DataFrameTransformCheckpoint(id, timestamp, checkpoint, checkpointsByIndex, (timeUpperBound / 2) + 1)));
}
/**
 * Exercise {@code DataFrameTransformCheckpoint.getBehind} for persisted and transient checkpoints,
 * for illegal argument combinations and for index sets that do not match.
 *
 * Every shard checkpoint of the "new" map is exactly 10 ahead of its "old" counterpart, hence the
 * expected difference for matching index sets is {@code indices * shards * 10}.
 */
public void testGetBehind() {
    String id = randomAlphaOfLengthBetween(1, 10);
    long timestamp = randomNonNegativeLong();

    TreeMap<String, long[]> checkpointsByIndexOld = new TreeMap<>();
    TreeMap<String, long[]> checkpointsByIndexNew = new TreeMap<>();

    int indices = randomIntBetween(3, 10);
    int shards = randomIntBetween(1, 20);

    for (int i = 0; i < indices; ++i) {
        List<Long> checkpoints1 = new ArrayList<>();
        List<Long> checkpoints2 = new ArrayList<>();

        for (int j = 0; j < shards; ++j) {
            long shardCheckpoint = randomLongBetween(0, 1_000_000);
            checkpoints1.add(shardCheckpoint);
            checkpoints2.add(shardCheckpoint + 10);
        }

        // random names can collide (e.g. two names of length 1), which would silently overwrite a map entry and
        // break the expectations based on the number of indices; the loop counter suffix makes every name unique
        String indexName = randomAlphaOfLengthBetween(1, 10) + "-" + i;

        checkpointsByIndexOld.put(indexName, checkpoints1.stream().mapToLong(l -> l).toArray());
        checkpointsByIndexNew.put(indexName, checkpoints2.stream().mapToLong(l -> l).toArray());
    }

    long checkpoint = randomLongBetween(10, 100);

    DataFrameTransformCheckpoint checkpointOld = new DataFrameTransformCheckpoint(
            id, timestamp, checkpoint, checkpointsByIndexOld, 0L);
    DataFrameTransformCheckpoint checkpointTransientNew = new DataFrameTransformCheckpoint(
            id, timestamp, -1L, checkpointsByIndexNew, 0L);
    DataFrameTransformCheckpoint checkpointNew = new DataFrameTransformCheckpoint(
            id, timestamp, checkpoint + 1, checkpointsByIndexNew, 0L);
    DataFrameTransformCheckpoint checkpointOlderButNewerShardsCheckpoint = new DataFrameTransformCheckpoint(
            id, timestamp, checkpoint - 1, checkpointsByIndexNew, 0L);

    assertEquals(indices * shards * 10L, DataFrameTransformCheckpoint.getBehind(checkpointOld, checkpointTransientNew));
    assertEquals(indices * shards * 10L, DataFrameTransformCheckpoint.getBehind(checkpointOld, checkpointNew));

    // no difference for same checkpoints, transient or not
    assertEquals(0L, DataFrameTransformCheckpoint.getBehind(checkpointOld, checkpointOld));
    assertEquals(0L, DataFrameTransformCheckpoint.getBehind(checkpointTransientNew, checkpointTransientNew));
    assertEquals(0L, DataFrameTransformCheckpoint.getBehind(checkpointNew, checkpointNew));

    // new vs transient new: ok
    assertEquals(0L, DataFrameTransformCheckpoint.getBehind(checkpointNew, checkpointTransientNew));

    // transient new vs new: illegal
    Exception e = expectThrows(IllegalArgumentException.class,
            () -> DataFrameTransformCheckpoint.getBehind(checkpointTransientNew, checkpointNew));
    assertEquals("can not compare transient against a non transient checkpoint", e.getMessage());

    // new vs old: illegal
    e = expectThrows(IllegalArgumentException.class, () -> DataFrameTransformCheckpoint.getBehind(checkpointNew, checkpointOld));
    assertEquals("old checkpoint is newer than new checkpoint", e.getMessage());

    // corner case: the checkpoint appears older but the inner shard checkpoints are newer
    assertEquals(-1L, DataFrameTransformCheckpoint.getBehind(checkpointOlderButNewerShardsCheckpoint, checkpointOld));

    // test cases where indices sets do not match
    // remove something from old, so newer has 1 index more than old
    checkpointsByIndexOld.remove(checkpointsByIndexOld.firstKey());
    long behind = DataFrameTransformCheckpoint.getBehind(checkpointOld, checkpointTransientNew);
    assertTrue("Expected behind (" + behind + ") > sum of shard checkpoints (" + indices * shards * 10L + ")",
            behind > indices * shards * 10L);

    // remove same key: old and new should have equal indices again
    checkpointsByIndexNew.remove(checkpointsByIndexNew.firstKey());
    assertEquals((indices - 1) * shards * 10L, DataFrameTransformCheckpoint.getBehind(checkpointOld, checkpointTransientNew));

    // remove 1st index from new, now old has 1 index more, behind can not be calculated
    checkpointsByIndexNew.remove(checkpointsByIndexNew.firstKey());
    assertEquals(-1L, DataFrameTransformCheckpoint.getBehind(checkpointOld, checkpointTransientNew));
}
private static Map<String, long[]> randomCheckpointsByIndex() {
Map<String, long[]> checkpointsByIndex = new TreeMap<>();
for (int i = 0; i < randomIntBetween(1, 10); ++i) {
int indices = randomIntBetween(1, 10);
for (int i = 0; i < indices; ++i) {
List<Long> checkpoints = new ArrayList<>();
for (int j = 0; j < randomIntBetween(1, 20); ++j) {
checkpoints.add(randomNonNegativeLong());
int shards = randomIntBetween(1, 20);
for (int j = 0; j < shards; ++j) {
checkpoints.add(randomLongBetween(0, 1_000_000));
}
checkpointsByIndex.put(randomAlphaOfLengthBetween(1, 10), checkpoints.stream().mapToLong(l -> l).toArray());
}

View File

@ -48,7 +48,7 @@ teardown:
- match: { transforms.0.id: "airline-transform-stats" }
- match: { transforms.0.state.indexer_state: "started" }
- match: { transforms.0.state.task_state: "started" }
- match: { transforms.0.state.generation: 0 }
- match: { transforms.0.state.checkpoint: 0 }
- match: { transforms.0.stats.pages_processed: 0 }
- match: { transforms.0.stats.documents_processed: 0 }
- match: { transforms.0.stats.documents_indexed: 0 }
@ -196,7 +196,7 @@ teardown:
- match: { count: 1 }
- match: { transforms.0.id: "airline-transform-stats-dos" }
- match: { transforms.0.state.indexer_state: "stopped" }
- match: { transforms.0.state.generation: 0 }
- match: { transforms.0.state.checkpoint: 0 }
- match: { transforms.0.stats.pages_processed: 0 }
- match: { transforms.0.stats.documents_processed: 0 }
- match: { transforms.0.stats.documents_indexed: 0 }