Add a checkpoint service for data frame transforms (#39836)

Add a checkpoint service for data frame transforms, which allows to ask for a checkpoint of the
source. In future these checkpoints will be stored in the internal index to

 - detect upstream changes
 - updating the data frame without a full re-run
 - allow data frame clients to checkpoint themselves
This commit is contained in:
Hendrik Muhs 2019-03-22 09:56:41 +01:00
parent 1265a15b75
commit 5a0c32833e
17 changed files with 942 additions and 44 deletions

View File

@ -53,6 +53,9 @@ public class DataFrameMessages {
public static final String LOG_DATA_FRAME_TRANSFORM_CONFIGURATION_BAD_AGGREGATION = public static final String LOG_DATA_FRAME_TRANSFORM_CONFIGURATION_BAD_AGGREGATION =
"Failed to parse aggregation for data frame pivot transform"; "Failed to parse aggregation for data frame pivot transform";
public static final String FAILED_TO_PARSE_TRANSFORM_CHECKPOINTS =
"Failed to parse transform checkpoints for [{0}]";
private DataFrameMessages() { private DataFrameMessages() {
} }

View File

@ -0,0 +1,264 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.core.dataframe.transforms;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.ParsingException;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.xpack.core.dataframe.DataFrameField;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.TreeMap;
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg;
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg;
/**
* Checkpoint document to store the checkpoint of a data frame transform
*
* The fields:
*
* timestamp the timestamp when this document has been created
* checkpoint the checkpoint number, incremented for every checkpoint
* indices a map of the indices from the source including all checkpoints of all indices matching the source pattern, shard level
* time_upper_bound for time-based indices this holds the upper time boundary of this checkpoint
*
*/
public class DataFrameTransformCheckpoint implements Writeable, ToXContentObject {
public static DataFrameTransformCheckpoint EMPTY = new DataFrameTransformCheckpoint("empty", 0L, -1L, Collections.emptyMap(), 0L);
// the timestamp of the checkpoint, mandatory
public static final ParseField TIMESTAMP_MILLIS = new ParseField("timestamp_millis");
public static final ParseField TIMESTAMP = new ParseField("timestamp");
// the own checkpoint
public static final ParseField CHECKPOINT = new ParseField("checkpoint");
// checkpoint of the indexes (sequence id's)
public static final ParseField INDICES = new ParseField("indices");
// checkpoint for for time based sync
// TODO: consider a lower bound for usecases where you want to transform on a window of a stream
public static final ParseField TIME_UPPER_BOUND_MILLIS = new ParseField("time_upper_bound_millis");
public static final ParseField TIME_UPPER_BOUND = new ParseField("time_upper_bound");
private static final String NAME = "data_frame_transform_checkpoint";
private static final ConstructingObjectParser<DataFrameTransformCheckpoint, Void> STRICT_PARSER = createParser(false);
private static final ConstructingObjectParser<DataFrameTransformCheckpoint, Void> LENIENT_PARSER = createParser(true);
private final String transformId;
private final long timestampMillis;
private final long checkpoint;
private final Map<String, long[]> indicesCheckpoints;
private final long timeUpperBoundMillis;
private static ConstructingObjectParser<DataFrameTransformCheckpoint, Void> createParser(boolean lenient) {
ConstructingObjectParser<DataFrameTransformCheckpoint, Void> parser = new ConstructingObjectParser<>(NAME,
lenient, args -> {
String id = (String) args[0];
Long timestamp = (Long) args[1];
Long checkpoint = (Long) args[2];
@SuppressWarnings("unchecked")
Map<String, long[]> checkpoints = (Map<String, long[]>) args[3];
Long timestamp_checkpoint = (Long) args[4];
// ignored, only for internal storage: String docType = (String) args[5];
return new DataFrameTransformCheckpoint(id, timestamp, checkpoint, checkpoints, timestamp_checkpoint);
});
parser.declareString(constructorArg(), DataFrameField.ID);
// note: this is never parsed from the outside where timestamp can be formatted as date time
parser.declareLong(constructorArg(), TIMESTAMP_MILLIS);
parser.declareLong(constructorArg(), CHECKPOINT);
parser.declareObject(constructorArg(), (p,c) -> {
Map<String, long[]> checkPointsByIndexName = new TreeMap<>();
XContentParser.Token token = null;
while ((token = p.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token != XContentParser.Token.FIELD_NAME) {
throw new ParsingException(p.getTokenLocation(), "Unexpected token " + token + " ");
}
final String indexName = p.currentName();
token = p.nextToken();
if (token != XContentParser.Token.START_ARRAY) {
throw new ParsingException(p.getTokenLocation(), "Unexpected token " + token + " ");
}
long[] checkpoints = p.listOrderedMap().stream().mapToLong(num -> ((Number) num).longValue()).toArray();
checkPointsByIndexName.put(indexName, checkpoints);
}
return checkPointsByIndexName;
}, INDICES);
parser.declareLong(optionalConstructorArg(), TIME_UPPER_BOUND_MILLIS);
parser.declareString(optionalConstructorArg(), DataFrameField.INDEX_DOC_TYPE);
return parser;
}
public DataFrameTransformCheckpoint(String transformId, Long timestamp, Long checkpoint, Map<String, long[]> checkpoints,
Long timeUpperBound) {
this.transformId = transformId;
this.timestampMillis = timestamp.longValue();
this.checkpoint = checkpoint;
this.indicesCheckpoints = Collections.unmodifiableMap(checkpoints);
this.timeUpperBoundMillis = timeUpperBound == null ? 0 : timeUpperBound.longValue();
}
public DataFrameTransformCheckpoint(StreamInput in) throws IOException {
this.transformId = in.readString();
this.timestampMillis = in.readLong();
this.checkpoint = in.readLong();
this.indicesCheckpoints = readCheckpoints(in.readMap());
this.timeUpperBoundMillis = in.readLong();
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
// the id, doc_type and checkpoint is only internally used for storage, the user-facing version gets embedded
if (params.paramAsBoolean(DataFrameField.FOR_INTERNAL_STORAGE, false)) {
builder.field(DataFrameField.ID.getPreferredName(), transformId);
builder.field(CHECKPOINT.getPreferredName(), checkpoint);
builder.field(DataFrameField.INDEX_DOC_TYPE.getPreferredName(), NAME);
}
builder.timeField(TIMESTAMP_MILLIS.getPreferredName(), TIMESTAMP.getPreferredName(), timestampMillis);
if (timeUpperBoundMillis > 0) {
builder.timeField(TIME_UPPER_BOUND_MILLIS.getPreferredName(), TIME_UPPER_BOUND.getPreferredName(), timeUpperBoundMillis);
}
builder.startObject(INDICES.getPreferredName());
for (Entry<String, long[]> entry : indicesCheckpoints.entrySet()) {
builder.array(entry.getKey(), entry.getValue());
}
builder.endObject();
builder.endObject();
return builder;
}
public String getTransformId() {
return transformId;
}
public long getTimestamp() {
return timestampMillis;
}
public long getCheckpoint() {
return checkpoint;
}
public Map<String, long[]> getIndicesCheckpoints() {
return indicesCheckpoints;
}
public long getTimeUpperBound() {
return timeUpperBoundMillis;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(transformId);
out.writeLong(timestampMillis);
out.writeLong(checkpoint);
out.writeGenericValue(indicesCheckpoints);
out.writeLong(timeUpperBoundMillis);
}
@Override
public boolean equals(Object other) {
if (this == other) {
return true;
}
if (other == null || getClass() != other.getClass()) {
return false;
}
final DataFrameTransformCheckpoint that = (DataFrameTransformCheckpoint) other;
// compare the timestamp, id, checkpoint and than call matches for the rest
return this.timestampMillis == that.timestampMillis && this.checkpoint == that.checkpoint
&& this.timeUpperBoundMillis == that.timeUpperBoundMillis && matches(that);
}
/**
* Compares 2 checkpoints ignoring some inner fields.
*
* This is for comparing 2 checkpoints to check whether the data frame transform requires an update
*
* @param that other checkpoint
* @return true if checkpoints match
*/
public boolean matches (DataFrameTransformCheckpoint that) {
if (this == that) {
return true;
}
return Objects.equals(this.transformId, that.transformId)
&& this.indicesCheckpoints.size() == that.indicesCheckpoints.size() // quick check
// do the expensive deep equal operation last
&& this.indicesCheckpoints.entrySet().stream()
.allMatch(e -> Arrays.equals(e.getValue(), that.indicesCheckpoints.get(e.getKey())));
}
@Override
public int hashCode() {
int hash = Objects.hash(transformId, timestampMillis, checkpoint, timeUpperBoundMillis);
for (Entry<String, long[]> e : indicesCheckpoints.entrySet()) {
hash = 31 * hash + Objects.hash(e.getKey(), Arrays.hashCode(e.getValue()));
}
return hash;
}
public static DataFrameTransformCheckpoint fromXContent(final XContentParser parser, boolean lenient) throws IOException {
return lenient ? LENIENT_PARSER.apply(parser, null) : STRICT_PARSER.apply(parser, null);
}
public static String documentId(String transformId, long checkpoint) {
if (checkpoint < 0) {
throw new IllegalArgumentException("checkpoint must be a positive number");
}
return NAME + "-" + transformId + "-" + checkpoint;
}
private static Map<String, long[]> readCheckpoints(Map<String, Object> readMap) {
Map<String, long[]> checkpoints = new TreeMap<>();
for (Map.Entry<String, Object> e : readMap.entrySet()) {
if (e.getValue() instanceof long[]) {
checkpoints.put(e.getKey(), (long[]) e.getValue());
} else {
throw new ElasticsearchParseException("expecting the checkpoints for [{}] to be a long[], but found [{}] instead",
e.getKey(), e.getValue().getClass());
}
}
return checkpoints;
}
}

View File

@ -12,13 +12,16 @@ import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.ToXContent.Params;
import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.search.SearchModule; import org.elasticsearch.search.SearchModule;
import org.elasticsearch.search.aggregations.AggregationBuilder; import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.BaseAggregationBuilder; import org.elasticsearch.search.aggregations.BaseAggregationBuilder;
import org.elasticsearch.test.AbstractSerializingTestCase; import org.elasticsearch.test.AbstractSerializingTestCase;
import org.elasticsearch.xpack.core.dataframe.DataFrameField;
import org.junit.Before; import org.junit.Before;
import java.util.Collections;
import java.util.List; import java.util.List;
import static java.util.Collections.emptyList; import static java.util.Collections.emptyList;
@ -26,6 +29,9 @@ import static java.util.Collections.emptyList;
public abstract class AbstractSerializingDataFrameTestCase<T extends ToXContent & Writeable> public abstract class AbstractSerializingDataFrameTestCase<T extends ToXContent & Writeable>
extends AbstractSerializingTestCase<T> { extends AbstractSerializingTestCase<T> {
protected static Params TO_XCONTENT_PARAMS = new ToXContent.MapParams(
Collections.singletonMap(DataFrameField.FOR_INTERNAL_STORAGE, "true"));
/** /**
* Test case that ensures aggregation named objects are registered * Test case that ensures aggregation named objects are registered
*/ */

View File

@ -0,0 +1,133 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.core.dataframe.transforms;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.Writeable.Reader;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import static org.elasticsearch.test.TestMatchers.matchesPattern;
public class DataFrameTransformCheckpointTests extends AbstractSerializingDataFrameTestCase<DataFrameTransformCheckpoint> {
public static DataFrameTransformCheckpoint randomDataFrameTransformCheckpoints() {
return new DataFrameTransformCheckpoint(randomAlphaOfLengthBetween(1, 10), randomNonNegativeLong(), randomNonNegativeLong(),
randomCheckpointsByIndex(), randomNonNegativeLong());
}
@Override
protected DataFrameTransformCheckpoint doParseInstance(XContentParser parser) throws IOException {
return DataFrameTransformCheckpoint.fromXContent(parser, false);
}
@Override
protected DataFrameTransformCheckpoint createTestInstance() {
return randomDataFrameTransformCheckpoints();
}
@Override
protected Reader<DataFrameTransformCheckpoint> instanceReader() {
return DataFrameTransformCheckpoint::new;
}
@Override
protected ToXContent.Params getToXContentParams() {
return TO_XCONTENT_PARAMS;
}
public void testXContentForInternalStorage() throws IOException {
DataFrameTransformCheckpoint dataFrameTransformCheckpoints = randomDataFrameTransformCheckpoints();
try (XContentBuilder xContentBuilder = XContentFactory.jsonBuilder()) {
XContentBuilder content = dataFrameTransformCheckpoints.toXContent(xContentBuilder, getToXContentParams());
String doc = Strings.toString(content);
assertThat(doc, matchesPattern(".*\"doc_type\"\\s*:\\s*\"data_frame_transform_checkpoint\".*"));
}
try (XContentBuilder xContentBuilder = XContentFactory.jsonBuilder()) {
XContentBuilder content = dataFrameTransformCheckpoints.toXContent(xContentBuilder, ToXContent.EMPTY_PARAMS);
String doc = Strings.toString(content);
assertFalse(doc.contains("doc_type"));
}
}
public void testXContentForApiUsage() throws IOException {
DataFrameTransformCheckpoint dataFrameTransformCheckpoints = new DataFrameTransformCheckpoint(randomAlphaOfLengthBetween(1, 10),
1546300800000L, randomNonNegativeLong(), Collections.emptyMap(), 1545609600000L);
try (XContentBuilder xContentBuilder = XContentFactory.jsonBuilder()) {
xContentBuilder.humanReadable(true);
XContentBuilder content = dataFrameTransformCheckpoints.toXContent(xContentBuilder, ToXContent.EMPTY_PARAMS);
String doc = Strings.toString(content);
assertThat(doc, matchesPattern(".*\"timestamp_millis\"\\s*:\\s*1546300800000.*"));
assertThat(doc, matchesPattern(".*\"time_upper_bound_millis\"\\s*:\\s*1545609600000.*"));
assertThat(doc, matchesPattern(".*\"timestamp\"\\s*:\\s*\"2019-01-01T00:00:00.000Z\".*"));
assertThat(doc, matchesPattern(".*\"time_upper_bound\"\\s*:\\s*\"2018-12-24T00:00:00.000Z\".*"));
}
}
public void testMatches() throws IOException {
String id = randomAlphaOfLengthBetween(1, 10);
long timestamp = randomNonNegativeLong();
long checkpoint = randomNonNegativeLong();
Map<String, long[]> checkpointsByIndex = randomCheckpointsByIndex();
Map<String, long[]> otherCheckpointsByIndex = new TreeMap<>(checkpointsByIndex);
otherCheckpointsByIndex.put(randomAlphaOfLengthBetween(1, 10), new long[] { 1, 2, 3 });
long timeUpperBound = randomNonNegativeLong();
DataFrameTransformCheckpoint dataFrameTransformCheckpoints = new DataFrameTransformCheckpoint(id, timestamp, checkpoint,
checkpointsByIndex, timeUpperBound);
// same
assertTrue(dataFrameTransformCheckpoints.matches(dataFrameTransformCheckpoints));
DataFrameTransformCheckpoint dataFrameTransformCheckpointsCopy = copyInstance(dataFrameTransformCheckpoints);
// with copy
assertTrue(dataFrameTransformCheckpoints.matches(dataFrameTransformCheckpointsCopy));
assertTrue(dataFrameTransformCheckpointsCopy.matches(dataFrameTransformCheckpoints));
// other id
assertFalse(dataFrameTransformCheckpoints
.matches(new DataFrameTransformCheckpoint(id + "-1", timestamp, checkpoint, checkpointsByIndex, timeUpperBound)));
// other timestamp
assertTrue(dataFrameTransformCheckpoints
.matches(new DataFrameTransformCheckpoint(id, (timestamp / 2) + 1, checkpoint, checkpointsByIndex, timeUpperBound)));
// other checkpoint
assertTrue(dataFrameTransformCheckpoints
.matches(new DataFrameTransformCheckpoint(id, timestamp, (checkpoint / 2) + 1, checkpointsByIndex, timeUpperBound)));
// other index checkpoints
assertFalse(dataFrameTransformCheckpoints
.matches(new DataFrameTransformCheckpoint(id, timestamp, checkpoint, otherCheckpointsByIndex, timeUpperBound)));
// other time upper bound
assertTrue(dataFrameTransformCheckpoints
.matches(new DataFrameTransformCheckpoint(id, timestamp, checkpoint, checkpointsByIndex, (timeUpperBound / 2) + 1)));
}
private static Map<String, long[]> randomCheckpointsByIndex() {
Map<String, long[]> checkpointsByIndex = new TreeMap<>();
for (int i = 0; i < randomIntBetween(1, 10); ++i) {
List<Long> checkpoints = new ArrayList<>();
for (int j = 0; j < randomIntBetween(1, 20); ++j) {
checkpoints.add(randomNonNegativeLong());
}
checkpointsByIndex.put(randomAlphaOfLengthBetween(1, 10), checkpoints.stream().mapToLong(l -> l).toArray());
}
return checkpointsByIndex;
}
}

View File

@ -10,17 +10,14 @@ import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.Writeable.Reader; import org.elasticsearch.common.io.stream.Writeable.Reader;
import org.elasticsearch.common.xcontent.DeprecationHandler; import org.elasticsearch.common.xcontent.DeprecationHandler;
import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.ToXContent.Params;
import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.xpack.core.dataframe.DataFrameField;
import org.elasticsearch.xpack.core.dataframe.transforms.pivot.PivotConfigTests; import org.elasticsearch.xpack.core.dataframe.transforms.pivot.PivotConfigTests;
import org.junit.Before; import org.junit.Before;
import java.io.IOException; import java.io.IOException;
import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
@ -28,9 +25,6 @@ import static org.elasticsearch.test.TestMatchers.matchesPattern;
public class DataFrameTransformConfigTests extends AbstractSerializingDataFrameTestCase<DataFrameTransformConfig> { public class DataFrameTransformConfigTests extends AbstractSerializingDataFrameTestCase<DataFrameTransformConfig> {
private static Params TO_XCONTENT_PARAMS = new ToXContent.MapParams(
Collections.singletonMap(DataFrameField.FOR_INTERNAL_STORAGE, "true"));
private String transformId; private String transformId;
private boolean runWithHeaders; private boolean runWithHeaders;

View File

@ -15,8 +15,8 @@ import org.elasticsearch.common.Strings;
import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.support.XContentMapValues; import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.xpack.core.dataframe.DataFrameField; import org.elasticsearch.xpack.core.dataframe.DataFrameField;
import org.elasticsearch.xpack.dataframe.persistence.DataFrameInternalIndex;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig;
import org.elasticsearch.xpack.dataframe.persistence.DataFrameInternalIndex;
import java.io.IOException; import java.io.IOException;
@ -47,6 +47,9 @@ public class DataFrameConfigurationIndexIT extends DataFrameRestTestCase {
client().performRequest(req); client().performRequest(req);
} }
// refresh the index
assertOK(client().performRequest(new Request("POST", DataFrameInternalIndex.INDEX_NAME + "/_refresh")));
Request deleteRequest = new Request("DELETE", DATAFRAME_ENDPOINT + fakeTransformName); Request deleteRequest = new Request("DELETE", DATAFRAME_ENDPOINT + fakeTransformName);
Response deleteResponse = client().performRequest(deleteRequest); Response deleteResponse = client().performRequest(deleteRequest);
assertOK(deleteResponse); assertOK(deleteResponse);

View File

@ -57,6 +57,7 @@ import org.elasticsearch.xpack.dataframe.action.TransportPutDataFrameTransformAc
import org.elasticsearch.xpack.dataframe.action.TransportStartDataFrameTransformAction; import org.elasticsearch.xpack.dataframe.action.TransportStartDataFrameTransformAction;
import org.elasticsearch.xpack.dataframe.action.TransportStartDataFrameTransformTaskAction; import org.elasticsearch.xpack.dataframe.action.TransportStartDataFrameTransformTaskAction;
import org.elasticsearch.xpack.dataframe.action.TransportStopDataFrameTransformAction; import org.elasticsearch.xpack.dataframe.action.TransportStopDataFrameTransformAction;
import org.elasticsearch.xpack.dataframe.checkpoint.DataFrameTransformsCheckpointService;
import org.elasticsearch.xpack.dataframe.persistence.DataFrameInternalIndex; import org.elasticsearch.xpack.dataframe.persistence.DataFrameInternalIndex;
import org.elasticsearch.xpack.dataframe.persistence.DataFrameTransformsConfigManager; import org.elasticsearch.xpack.dataframe.persistence.DataFrameTransformsConfigManager;
import org.elasticsearch.xpack.dataframe.rest.action.RestDeleteDataFrameTransformAction; import org.elasticsearch.xpack.dataframe.rest.action.RestDeleteDataFrameTransformAction;
@ -98,6 +99,7 @@ public class DataFrame extends Plugin implements ActionPlugin, PersistentTaskPlu
private final Settings settings; private final Settings settings;
private final boolean transportClientMode; private final boolean transportClientMode;
private final SetOnce<DataFrameTransformsConfigManager> dataFrameTransformsConfigManager = new SetOnce<>(); private final SetOnce<DataFrameTransformsConfigManager> dataFrameTransformsConfigManager = new SetOnce<>();
private final SetOnce<DataFrameTransformsCheckpointService> dataFrameTransformsCheckpointService = new SetOnce<>();
private final SetOnce<SchedulerEngine> schedulerEngine = new SetOnce<>(); private final SetOnce<SchedulerEngine> schedulerEngine = new SetOnce<>();
public DataFrame(Settings settings) { public DataFrame(Settings settings) {
@ -180,8 +182,9 @@ public class DataFrame extends Plugin implements ActionPlugin, PersistentTaskPlu
} }
dataFrameTransformsConfigManager.set(new DataFrameTransformsConfigManager(client, xContentRegistry)); dataFrameTransformsConfigManager.set(new DataFrameTransformsConfigManager(client, xContentRegistry));
dataFrameTransformsCheckpointService.set(new DataFrameTransformsCheckpointService(client));
return Collections.singletonList(dataFrameTransformsConfigManager.get()); return Arrays.asList(dataFrameTransformsConfigManager.get(), dataFrameTransformsCheckpointService.get());
} }
@Override @Override
@ -207,10 +210,12 @@ public class DataFrame extends Plugin implements ActionPlugin, PersistentTaskPlu
// the transforms config manager should have been created // the transforms config manager should have been created
assert dataFrameTransformsConfigManager.get() != null; assert dataFrameTransformsConfigManager.get() != null;
assert dataFrameTransformsCheckpointService.get() != null;
return Collections.singletonList(new DataFrameTransformPersistentTasksExecutor(client, dataFrameTransformsConfigManager.get(), return Collections.singletonList(new DataFrameTransformPersistentTasksExecutor(client, dataFrameTransformsConfigManager.get(),
schedulerEngine.get(), threadPool)); dataFrameTransformsCheckpointService.get(), schedulerEngine.get(), threadPool));
} }
@Override @Override
public void close() { public void close() {
if (schedulerEngine.get() != null) { if (schedulerEngine.get() != null) {

View File

@ -59,7 +59,7 @@ public class TransportDeleteDataFrameTransformAction extends TransportTasksActio
IndexerState state = task.getState().getIndexerState(); IndexerState state = task.getState().getIndexerState();
if (state.equals(IndexerState.STOPPED)) { if (state.equals(IndexerState.STOPPED)) {
task.onCancelled(); task.onCancelled();
transformsConfigManager.deleteTransformConfiguration(request.getId(), ActionListener.wrap(r -> { transformsConfigManager.deleteTransform(request.getId(), ActionListener.wrap(r -> {
listener.onResponse(new Response(true)); listener.onResponse(new Response(true));
}, listener::onFailure)); }, listener::onFailure));
} else { } else {
@ -80,7 +80,7 @@ public class TransportDeleteDataFrameTransformAction extends TransportTasksActio
// we couldn't find the transform in the persistent task CS, but maybe the transform exists in the configuration index, // we couldn't find the transform in the persistent task CS, but maybe the transform exists in the configuration index,
// if so delete the orphaned document and do not throw (for the normal case we want to stop the task first, // if so delete the orphaned document and do not throw (for the normal case we want to stop the task first,
// than delete the configuration document if and only if the data frame transform is in stopped state) // than delete the configuration document if and only if the data frame transform is in stopped state)
transformsConfigManager.deleteTransformConfiguration(request.getId(), ActionListener.wrap(r -> { transformsConfigManager.deleteTransform(request.getId(), ActionListener.wrap(r -> {
listener.onResponse(new Response(true)); listener.onResponse(new Response(true));
return; return;
}, listener::onFailure)); }, listener::onFailure));

View File

@ -42,6 +42,7 @@ import org.elasticsearch.xpack.core.dataframe.DataFrameMessages;
import org.elasticsearch.xpack.core.dataframe.action.PutDataFrameTransformAction; import org.elasticsearch.xpack.core.dataframe.action.PutDataFrameTransformAction;
import org.elasticsearch.xpack.core.dataframe.action.PutDataFrameTransformAction.Request; import org.elasticsearch.xpack.core.dataframe.action.PutDataFrameTransformAction.Request;
import org.elasticsearch.xpack.core.dataframe.action.PutDataFrameTransformAction.Response; import org.elasticsearch.xpack.core.dataframe.action.PutDataFrameTransformAction.Response;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig;
import org.elasticsearch.xpack.core.security.SecurityContext; import org.elasticsearch.xpack.core.security.SecurityContext;
import org.elasticsearch.xpack.core.security.action.user.HasPrivilegesAction; import org.elasticsearch.xpack.core.security.action.user.HasPrivilegesAction;
import org.elasticsearch.xpack.core.security.action.user.HasPrivilegesRequest; import org.elasticsearch.xpack.core.security.action.user.HasPrivilegesRequest;
@ -51,7 +52,6 @@ import org.elasticsearch.xpack.core.security.authz.permission.ResourcePrivileges
import org.elasticsearch.xpack.core.security.support.Exceptions; import org.elasticsearch.xpack.core.security.support.Exceptions;
import org.elasticsearch.xpack.dataframe.persistence.DataFrameTransformsConfigManager; import org.elasticsearch.xpack.dataframe.persistence.DataFrameTransformsConfigManager;
import org.elasticsearch.xpack.dataframe.persistence.DataframeIndex; import org.elasticsearch.xpack.dataframe.persistence.DataframeIndex;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig;
import org.elasticsearch.xpack.dataframe.transforms.pivot.Pivot; import org.elasticsearch.xpack.dataframe.transforms.pivot.Pivot;
import java.io.IOException; import java.io.IOException;

View File

@ -0,0 +1,26 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.dataframe.checkpoint;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.io.stream.StreamInput;
import java.io.IOException;
public class CheckpointException extends ElasticsearchException {
public CheckpointException(String msg, Object... params) {
super(msg, null, params);
}
public CheckpointException(String msg, Throwable cause, Object... params) {
super(msg, cause, params);
}
public CheckpointException(StreamInput in) throws IOException {
super(in);
}
}

View File

@ -0,0 +1,136 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.dataframe.checkpoint;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.get.GetIndexAction;
import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsAction;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequest;
import org.elasticsearch.action.admin.indices.stats.ShardStats;
import org.elasticsearch.client.Client;
import org.elasticsearch.xpack.core.ClientHelper;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformCheckpoint;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
/**
* DataFrameTransform Checkpoint Service
*
* Allows checkpointing a source of a data frame transform which includes all relevant checkpoints of the source.
*
* This will be used to checkpoint a transform, detect changes, run the transform in continuous mode.
*
*/
public class DataFrameTransformsCheckpointService {
private static final Logger logger = LogManager.getLogger(DataFrameTransformsCheckpointService.class);
private final Client client;
public DataFrameTransformsCheckpointService(final Client client) {
this.client = client;
}
/**
* Get an unnumbered checkpoint. These checkpoints are for persistence but comparing state.
*
* @param transformConfig the @link{DataFrameTransformConfig}
* @param listener listener to call after inner request returned
*/
public void getCheckpoint(DataFrameTransformConfig transformConfig, ActionListener<DataFrameTransformCheckpoint> listener) {
getCheckpoint(transformConfig, -1L, listener);
}
/**
* Get a checkpoint, used to store a checkpoint.
*
* @param transformConfig the @link{DataFrameTransformConfig}
* @param checkpoint the number of the checkpoint
* @param listener listener to call after inner request returned
*/
public void getCheckpoint(DataFrameTransformConfig transformConfig, long checkpoint,
ActionListener<DataFrameTransformCheckpoint> listener) {
long timestamp = System.currentTimeMillis();
// placeholder for time based synchronization
long timeUpperBound = 0;
// 1st get index to see the indexes the user has access to
GetIndexRequest getIndexRequest = new GetIndexRequest().indices(transformConfig.getSource());
ClientHelper.executeWithHeadersAsync(transformConfig.getHeaders(), ClientHelper.DATA_FRAME_ORIGIN, client, GetIndexAction.INSTANCE,
getIndexRequest, ActionListener.wrap(getIndexResponse -> {
Set<String> userIndices = new HashSet<>(Arrays.asList(getIndexResponse.getIndices()));
// 2nd get stats request
ClientHelper.executeAsyncWithOrigin(client, ClientHelper.DATA_FRAME_ORIGIN, IndicesStatsAction.INSTANCE,
new IndicesStatsRequest().indices(transformConfig.getSource()), ActionListener.wrap(response -> {
if (response.getFailedShards() != 0) {
throw new CheckpointException("Source has [" + response.getFailedShards() + "] failed shards");
}
Map<String, long[]> checkpointsByIndex = extractIndexCheckPoints(response.getShards(), userIndices);
DataFrameTransformCheckpoint checkpointDoc = new DataFrameTransformCheckpoint(transformConfig.getId(),
timestamp, checkpoint, checkpointsByIndex, timeUpperBound);
listener.onResponse(checkpointDoc);
}, IndicesStatsRequestException -> {
throw new CheckpointException("Failed to retrieve indices stats", IndicesStatsRequestException);
}));
}, getIndexException -> {
throw new CheckpointException("Failed to retrieve list of indices", getIndexException);
}));
}
static Map<String, long[]> extractIndexCheckPoints(ShardStats[] shards, Set<String> userIndices) {
Map<String, TreeMap<Integer, Long>> checkpointsByIndex = new TreeMap<>();
for (ShardStats shard : shards) {
String indexName = shard.getShardRouting().getIndexName();
if (userIndices.contains(indexName)) {
if (checkpointsByIndex.containsKey(indexName)) {
// we have already seen this index, just check/add shards
TreeMap<Integer, Long> checkpoints = checkpointsByIndex.get(indexName);
if (checkpoints.containsKey(shard.getShardRouting().getId())) {
// there is already a checkpoint entry for this index/shard combination, check if they match
if (checkpoints.get(shard.getShardRouting().getId()) != shard.getSeqNoStats().getGlobalCheckpoint()) {
throw new CheckpointException("Global checkpoints mismatch for index [" + indexName + "] between shards of id ["
+ shard.getShardRouting().getId() + "]");
}
} else {
// 1st time we see this shard for this index, add the entry for the shard
checkpoints.put(shard.getShardRouting().getId(), shard.getSeqNoStats().getGlobalCheckpoint());
}
} else {
// 1st time we see this index, create an entry for the index and add the shard checkpoint
checkpointsByIndex.put(indexName, new TreeMap<>());
checkpointsByIndex.get(indexName).put(shard.getShardRouting().getId(), shard.getSeqNoStats().getGlobalCheckpoint());
}
}
}
// create the final structure
Map<String, long[]> checkpointsByIndexReduced = new TreeMap<>();
checkpointsByIndex.forEach((indexName, checkpoints) -> {
checkpointsByIndexReduced.put(indexName, checkpoints.values().stream().mapToLong(l -> l).toArray());
});
return checkpointsByIndexReduced;
}
}

View File

@ -13,9 +13,6 @@ import org.elasticsearch.ResourceAlreadyExistsException;
import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.delete.DeleteAction;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.get.GetAction; import org.elasticsearch.action.get.GetAction;
import org.elasticsearch.action.get.GetRequest; import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.index.IndexAction; import org.elasticsearch.action.index.IndexAction;
@ -32,8 +29,13 @@ import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.engine.VersionConflictEngineException; import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.reindex.DeleteByQueryAction;
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
import org.elasticsearch.xpack.core.dataframe.DataFrameField; import org.elasticsearch.xpack.core.dataframe.DataFrameField;
import org.elasticsearch.xpack.core.dataframe.DataFrameMessages; import org.elasticsearch.xpack.core.dataframe.DataFrameMessages;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformCheckpoint;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig;
import java.io.IOException; import java.io.IOException;
@ -58,6 +60,37 @@ public class DataFrameTransformsConfigManager {
this.xContentRegistry = xContentRegistry; this.xContentRegistry = xContentRegistry;
} }
/**
* Persist a checkpoint in the internal index
*
* @param checkpoint the @link{DataFrameTransformCheckpoint}
* @param listener listener to call after request has been made
*/
public void putTransformCheckpoint(DataFrameTransformCheckpoint checkpoint, ActionListener<Boolean> listener) {
try (XContentBuilder builder = XContentFactory.jsonBuilder()) {
XContentBuilder source = checkpoint.toXContent(builder, new ToXContent.MapParams(TO_XCONTENT_PARAMS));
IndexRequest indexRequest = new IndexRequest(DataFrameInternalIndex.INDEX_NAME)
.opType(DocWriteRequest.OpType.INDEX)
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.id(DataFrameTransformCheckpoint.documentId(checkpoint.getTransformId(), checkpoint.getCheckpoint()))
.source(source);
executeAsyncWithOrigin(client, DATA_FRAME_ORIGIN, IndexAction.INSTANCE, indexRequest, ActionListener.wrap(r -> {
listener.onResponse(true);
}, listener::onFailure));
} catch (IOException e) {
// not expected to happen but for the sake of completeness
listener.onFailure(e);
}
}
/**
* Store the transform configuration in the internal index
*
* @param transformConfig the @link{DataFrameTransformConfig}
* @param listener listener to call after request
*/
public void putTransformConfiguration(DataFrameTransformConfig transformConfig, ActionListener<Boolean> listener) { public void putTransformConfiguration(DataFrameTransformConfig transformConfig, ActionListener<Boolean> listener) {
try (XContentBuilder builder = XContentFactory.jsonBuilder()) { try (XContentBuilder builder = XContentFactory.jsonBuilder()) {
XContentBuilder source = transformConfig.toXContent(builder, new ToXContent.MapParams(TO_XCONTENT_PARAMS)); XContentBuilder source = transformConfig.toXContent(builder, new ToXContent.MapParams(TO_XCONTENT_PARAMS));
@ -89,8 +122,35 @@ public class DataFrameTransformsConfigManager {
} }
} }
// For transforms returned via GET data_frame/transforms, see the TransportGetDataFrameTransformsAction /**
// This function is only for internal use. * Get a stored checkpoint, requires the transform id as well as the checkpoint id
*
* @param transformId the transform id
* @param checkpoint the checkpoint
* @param resultListener listener to call after request has been made
*/
public void getTransformCheckpoint(String transformId, long checkpoint, ActionListener<DataFrameTransformCheckpoint> resultListener) {
GetRequest getRequest = new GetRequest(DataFrameInternalIndex.INDEX_NAME,
DataFrameTransformCheckpoint.documentId(transformId, checkpoint));
executeAsyncWithOrigin(client, DATA_FRAME_ORIGIN, GetAction.INSTANCE, getRequest, ActionListener.wrap(getResponse -> {
if (getResponse.isExists() == false) {
// do not fail if checkpoint does not exist but return an empty checkpoint
resultListener.onResponse(DataFrameTransformCheckpoint.EMPTY);
return;
}
BytesReference source = getResponse.getSourceAsBytesRef();
parseCheckpointsLenientlyFromSource(source, transformId, resultListener);
}, resultListener::onFailure));
}
/**
* Get the transform configuration for a given transform id. This function is only for internal use. For transforms returned via GET
* data_frame/transforms, see the TransportGetDataFrameTransformsAction
*
* @param transformId the transform id
* @param resultListener listener to call after inner request has returned
*/
public void getTransformConfiguration(String transformId, ActionListener<DataFrameTransformConfig> resultListener) { public void getTransformConfiguration(String transformId, ActionListener<DataFrameTransformConfig> resultListener) {
GetRequest getRequest = new GetRequest(DataFrameInternalIndex.INDEX_NAME, DataFrameTransformConfig.documentId(transformId)); GetRequest getRequest = new GetRequest(DataFrameInternalIndex.INDEX_NAME, DataFrameTransformConfig.documentId(transformId));
executeAsyncWithOrigin(client, DATA_FRAME_ORIGIN, GetAction.INSTANCE, getRequest, ActionListener.wrap(getResponse -> { executeAsyncWithOrigin(client, DATA_FRAME_ORIGIN, GetAction.INSTANCE, getRequest, ActionListener.wrap(getResponse -> {
@ -112,13 +172,25 @@ public class DataFrameTransformsConfigManager {
})); }));
} }
public void deleteTransformConfiguration(String transformId, ActionListener<Boolean> listener) { /**
DeleteRequest request = new DeleteRequest(DataFrameInternalIndex.INDEX_NAME, DataFrameTransformConfig.documentId(transformId)); * This deletes the configuration and all other documents corresponding to the transform id (e.g. checkpoints).
request.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); *
* @param transformId the transform id
* @param listener listener to call after inner request returned
*/
public void deleteTransform(String transformId, ActionListener<Boolean> listener) {
DeleteByQueryRequest request = new DeleteByQueryRequest()
.setAbortOnVersionConflict(false) //since these documents are not updated, a conflict just means it was deleted previously
.setSlices(5);
executeAsyncWithOrigin(client, DATA_FRAME_ORIGIN, DeleteAction.INSTANCE, request, ActionListener.wrap(deleteResponse -> { request.indices(DataFrameInternalIndex.INDEX_NAME);
QueryBuilder query = QueryBuilders.termQuery(DataFrameField.ID.getPreferredName(), transformId);
request.setQuery(query);
request.setRefresh(true);
if (deleteResponse.getResult() == DocWriteResponse.Result.NOT_FOUND) { executeAsyncWithOrigin(client, DATA_FRAME_ORIGIN, DeleteByQueryAction.INSTANCE, request, ActionListener.wrap(deleteResponse -> {
if (deleteResponse.getDeleted() == 0) {
listener.onFailure(new ResourceNotFoundException( listener.onFailure(new ResourceNotFoundException(
DataFrameMessages.getMessage(DataFrameMessages.REST_DATA_FRAME_UNKNOWN_TRANSFORM, transformId))); DataFrameMessages.getMessage(DataFrameMessages.REST_DATA_FRAME_UNKNOWN_TRANSFORM, transformId)));
return; return;
@ -145,4 +217,16 @@ public class DataFrameTransformsConfigManager {
transformListener.onFailure(e); transformListener.onFailure(e);
} }
} }
private void parseCheckpointsLenientlyFromSource(BytesReference source, String transformId,
ActionListener<DataFrameTransformCheckpoint> transformListener) {
try (InputStream stream = source.streamInput();
XContentParser parser = XContentFactory.xContent(XContentType.JSON)
.createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE, stream)) {
transformListener.onResponse(DataFrameTransformCheckpoint.fromXContent(parser, true));
} catch (Exception e) {
logger.error(DataFrameMessages.getMessage(DataFrameMessages.FAILED_TO_PARSE_TRANSFORM_CHECKPOINTS, transformId), e);
transformListener.onFailure(e);
}
}
} }

View File

@ -21,6 +21,7 @@ import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransform;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformState; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformState;
import org.elasticsearch.xpack.core.scheduler.SchedulerEngine; import org.elasticsearch.xpack.core.scheduler.SchedulerEngine;
import org.elasticsearch.xpack.dataframe.DataFrame; import org.elasticsearch.xpack.dataframe.DataFrame;
import org.elasticsearch.xpack.dataframe.checkpoint.DataFrameTransformsCheckpointService;
import org.elasticsearch.xpack.dataframe.persistence.DataFrameTransformsConfigManager; import org.elasticsearch.xpack.dataframe.persistence.DataFrameTransformsConfigManager;
import java.util.Map; import java.util.Map;
@ -31,14 +32,17 @@ public class DataFrameTransformPersistentTasksExecutor extends PersistentTasksEx
private final Client client; private final Client client;
private final DataFrameTransformsConfigManager transformsConfigManager; private final DataFrameTransformsConfigManager transformsConfigManager;
private final DataFrameTransformsCheckpointService dataFrameTransformsCheckpointService;
private final SchedulerEngine schedulerEngine; private final SchedulerEngine schedulerEngine;
private final ThreadPool threadPool; private final ThreadPool threadPool;
public DataFrameTransformPersistentTasksExecutor(Client client, DataFrameTransformsConfigManager transformsConfigManager, public DataFrameTransformPersistentTasksExecutor(Client client, DataFrameTransformsConfigManager transformsConfigManager,
SchedulerEngine schedulerEngine, ThreadPool threadPool) { DataFrameTransformsCheckpointService dataFrameTransformsCheckpointService, SchedulerEngine schedulerEngine,
ThreadPool threadPool) {
super(DataFrameField.TASK_NAME, DataFrame.TASK_THREAD_POOL_NAME); super(DataFrameField.TASK_NAME, DataFrame.TASK_THREAD_POOL_NAME);
this.client = client; this.client = client;
this.transformsConfigManager = transformsConfigManager; this.transformsConfigManager = transformsConfigManager;
this.dataFrameTransformsCheckpointService = dataFrameTransformsCheckpointService;
this.schedulerEngine = schedulerEngine; this.schedulerEngine = schedulerEngine;
this.threadPool = threadPool; this.threadPool = threadPool;
} }
@ -67,6 +71,7 @@ public class DataFrameTransformPersistentTasksExecutor extends PersistentTasksEx
protected AllocatedPersistentTask createTask(long id, String type, String action, TaskId parentTaskId, protected AllocatedPersistentTask createTask(long id, String type, String action, TaskId parentTaskId,
PersistentTasksCustomMetaData.PersistentTask<DataFrameTransform> persistentTask, Map<String, String> headers) { PersistentTasksCustomMetaData.PersistentTask<DataFrameTransform> persistentTask, Map<String, String> headers) {
return new DataFrameTransformTask(id, type, action, parentTaskId, persistentTask.getParams(), return new DataFrameTransformTask(id, type, action, parentTaskId, persistentTask.getParams(),
(DataFrameTransformState) persistentTask.getState(), client, transformsConfigManager, schedulerEngine, threadPool, headers); (DataFrameTransformState) persistentTask.getState(), client, transformsConfigManager, dataFrameTransformsCheckpointService,
schedulerEngine, threadPool, headers);
} }
} }

View File

@ -24,16 +24,17 @@ import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.ClientHelper; import org.elasticsearch.xpack.core.ClientHelper;
import org.elasticsearch.xpack.core.dataframe.DataFrameField; import org.elasticsearch.xpack.core.dataframe.DataFrameField;
import org.elasticsearch.xpack.core.dataframe.DataFrameMessages; import org.elasticsearch.xpack.core.dataframe.DataFrameMessages;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransform; import org.elasticsearch.xpack.core.dataframe.action.StartDataFrameTransformTaskAction;
import org.elasticsearch.xpack.core.dataframe.action.StartDataFrameTransformTaskAction.Response;
import org.elasticsearch.xpack.core.dataframe.action.StopDataFrameTransformAction;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameIndexerTransformStats; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameIndexerTransformStats;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransform;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformState; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformState;
import org.elasticsearch.xpack.core.indexing.IndexerState; import org.elasticsearch.xpack.core.indexing.IndexerState;
import org.elasticsearch.xpack.core.scheduler.SchedulerEngine; import org.elasticsearch.xpack.core.scheduler.SchedulerEngine;
import org.elasticsearch.xpack.core.scheduler.SchedulerEngine.Event; import org.elasticsearch.xpack.core.scheduler.SchedulerEngine.Event;
import org.elasticsearch.xpack.core.dataframe.action.StartDataFrameTransformTaskAction; import org.elasticsearch.xpack.dataframe.checkpoint.DataFrameTransformsCheckpointService;
import org.elasticsearch.xpack.core.dataframe.action.StartDataFrameTransformTaskAction.Response;
import org.elasticsearch.xpack.core.dataframe.action.StopDataFrameTransformAction;
import org.elasticsearch.xpack.dataframe.persistence.DataFrameTransformsConfigManager; import org.elasticsearch.xpack.dataframe.persistence.DataFrameTransformsConfigManager;
import java.util.Map; import java.util.Map;
@ -58,7 +59,8 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
public DataFrameTransformTask(long id, String type, String action, TaskId parentTask, DataFrameTransform transform, public DataFrameTransformTask(long id, String type, String action, TaskId parentTask, DataFrameTransform transform,
DataFrameTransformState state, Client client, DataFrameTransformsConfigManager transformsConfigManager, DataFrameTransformState state, Client client, DataFrameTransformsConfigManager transformsConfigManager,
SchedulerEngine schedulerEngine, ThreadPool threadPool, Map<String, String> headers) { DataFrameTransformsCheckpointService transformsCheckpointService, SchedulerEngine schedulerEngine, ThreadPool threadPool,
Map<String, String> headers) {
super(id, type, action, DataFrameField.PERSISTENT_TASK_DESCRIPTION_PREFIX + transform.getId(), parentTask, headers); super(id, type, action, DataFrameField.PERSISTENT_TASK_DESCRIPTION_PREFIX + transform.getId(), parentTask, headers);
this.transform = transform; this.transform = transform;
this.schedulerEngine = schedulerEngine; this.schedulerEngine = schedulerEngine;
@ -83,8 +85,8 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
initialGeneration = state.getGeneration(); initialGeneration = state.getGeneration();
} }
this.indexer = new ClientDataFrameIndexer(transform.getId(), transformsConfigManager, new AtomicReference<>(initialState), this.indexer = new ClientDataFrameIndexer(transform.getId(), transformsConfigManager, transformsCheckpointService,
initialPosition, client); new AtomicReference<>(initialState), initialPosition, client);
this.generation = new AtomicReference<Long>(initialGeneration); this.generation = new AtomicReference<Long>(initialGeneration);
} }
@ -226,15 +228,18 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
private static final int LOAD_TRANSFORM_TIMEOUT_IN_SECONDS = 30; private static final int LOAD_TRANSFORM_TIMEOUT_IN_SECONDS = 30;
private final Client client; private final Client client;
private final DataFrameTransformsConfigManager transformsConfigManager; private final DataFrameTransformsConfigManager transformsConfigManager;
private final DataFrameTransformsCheckpointService transformsCheckpointService;
private final String transformId; private final String transformId;
private DataFrameTransformConfig transformConfig = null; private DataFrameTransformConfig transformConfig = null;
public ClientDataFrameIndexer(String transformId, DataFrameTransformsConfigManager transformsConfigManager, public ClientDataFrameIndexer(String transformId, DataFrameTransformsConfigManager transformsConfigManager,
AtomicReference<IndexerState> initialState, Map<String, Object> initialPosition, Client client) { DataFrameTransformsCheckpointService transformsCheckpointService, AtomicReference<IndexerState> initialState,
Map<String, Object> initialPosition, Client client) {
super(threadPool.executor(ThreadPool.Names.GENERIC), initialState, initialPosition); super(threadPool.executor(ThreadPool.Names.GENERIC), initialState, initialPosition);
this.transformId = transformId; this.transformId = transformId;
this.transformsConfigManager = transformsConfigManager; this.transformsConfigManager = transformsConfigManager;
this.transformsCheckpointService = transformsCheckpointService;
this.client = client; this.client = client;
} }

View File

@ -0,0 +1,207 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.dataframe.checkpoint;
import org.elasticsearch.action.admin.indices.stats.CommonStats;
import org.elasticsearch.action.admin.indices.stats.ShardStats;
import org.elasticsearch.cluster.routing.RecoverySource;
import org.elasticsearch.cluster.routing.RecoverySource.PeerRecoverySource;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.cache.query.QueryCacheStats;
import org.elasticsearch.index.cache.request.RequestCacheStats;
import org.elasticsearch.index.engine.SegmentsStats;
import org.elasticsearch.index.fielddata.FieldDataStats;
import org.elasticsearch.index.flush.FlushStats;
import org.elasticsearch.index.get.GetStats;
import org.elasticsearch.index.merge.MergeStats;
import org.elasticsearch.index.refresh.RefreshStats;
import org.elasticsearch.index.search.stats.SearchStats;
import org.elasticsearch.index.seqno.SeqNoStats;
import org.elasticsearch.index.shard.DocsStats;
import org.elasticsearch.index.shard.IndexingStats;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardPath;
import org.elasticsearch.index.store.StoreStats;
import org.elasticsearch.index.warmer.WarmerStats;
import org.elasticsearch.search.suggest.completion.CompletionStats;
import org.elasticsearch.test.ESTestCase;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import static org.hamcrest.Matchers.containsString;
public class DataFrameTransformsCheckpointServiceTests extends ESTestCase {
public void testExtractIndexCheckpoints() {
Map<String, long[]> expectedCheckpoints = new HashMap<>();
Set<String> indices = randomUserIndices();
ShardStats[] shardStatsArray = createRandomShardStats(expectedCheckpoints, indices, false, false);
Map<String, long[]> checkpoints = DataFrameTransformsCheckpointService.extractIndexCheckPoints(shardStatsArray, indices);
assertEquals(expectedCheckpoints.size(), checkpoints.size());
assertEquals(expectedCheckpoints.keySet(), checkpoints.keySet());
// low-level compare
for (Entry<String, long[]> entry : expectedCheckpoints.entrySet()) {
assertTrue(Arrays.equals(entry.getValue(), checkpoints.get(entry.getKey())));
}
}
public void testExtractIndexCheckpointsLostPrimaries() {
Map<String, long[]> expectedCheckpoints = new HashMap<>();
Set<String> indices = randomUserIndices();
ShardStats[] shardStatsArray = createRandomShardStats(expectedCheckpoints, indices, true, false);
Map<String, long[]> checkpoints = DataFrameTransformsCheckpointService.extractIndexCheckPoints(shardStatsArray, indices);
assertEquals(expectedCheckpoints.size(), checkpoints.size());
assertEquals(expectedCheckpoints.keySet(), checkpoints.keySet());
// low-level compare
for (Entry<String, long[]> entry : expectedCheckpoints.entrySet()) {
assertTrue(Arrays.equals(entry.getValue(), checkpoints.get(entry.getKey())));
}
}
public void testExtractIndexCheckpointsInconsistentGlobalCheckpoints() {
Map<String, long[]> expectedCheckpoints = new HashMap<>();
Set<String> indices = randomUserIndices();
ShardStats[] shardStatsArray = createRandomShardStats(expectedCheckpoints, indices, randomBoolean(), true);
// fail
CheckpointException e = expectThrows(CheckpointException.class,
() -> DataFrameTransformsCheckpointService.extractIndexCheckPoints(shardStatsArray, indices));
assertThat(e.getMessage(), containsString("Global checkpoints mismatch")); }
/**
* Create a random set of 3 index names
* @return set of indices a simulated user has access to
*/
private static Set<String> randomUserIndices() {
Set<String> indices = new HashSet<>();
// never create an empty set
if (randomBoolean()) {
indices.add("index-1");
} else {
indices.add("index-2");
}
if (randomBoolean()) {
indices.add("index-3");
}
return indices;
}
/**
* create a ShardStats for testing with random fuzzing
*
* @param expectedCheckpoints output parameter to return the checkpoints to expect
* @param userIndices set of indices that are visible
* @param skipPrimaries whether some shards do not have a primary shard at random
* @param inconsistentGlobalCheckpoints whether to introduce inconsistent global checkpoints
* @return array of ShardStats
*/
private static ShardStats[] createRandomShardStats(Map<String, long[]> expectedCheckpoints, Set<String> userIndices,
boolean skipPrimaries, boolean inconsistentGlobalCheckpoints) {
// always create the full list
List<Index> indices = new ArrayList<>();
indices.add(new Index("index-1", UUIDs.randomBase64UUID(random())));
indices.add(new Index("index-2", UUIDs.randomBase64UUID(random())));
indices.add(new Index("index-3", UUIDs.randomBase64UUID(random())));
List<ShardStats> shardStats = new ArrayList<>();
for (final Index index : indices) {
int numShards = randomIntBetween(1, 5);
List<Long> checkpoints = new ArrayList<>();
for (int shardIndex = 0; shardIndex < numShards; shardIndex++) {
// we need at least one replica for testing
int numShardCopies = randomIntBetween(2, 4);
int inconsistentReplica = -1;
if (inconsistentGlobalCheckpoints) {
inconsistentReplica = randomIntBetween(0, numShardCopies - 1);
}
// SeqNoStats asserts that checkpoints are logical
long localCheckpoint = randomLongBetween(0L, 100000000L);
long globalCheckpoint = randomBoolean() ? localCheckpoint : randomLongBetween(0L, 100000000L);
long maxSeqNo = Math.max(localCheckpoint, globalCheckpoint);
SeqNoStats seqNoStats = new SeqNoStats(maxSeqNo, localCheckpoint, globalCheckpoint);
checkpoints.add(globalCheckpoint);
for (int replica = 0; replica < numShardCopies; replica++) {
ShardId shardId = new ShardId(index, shardIndex);
boolean primary = (replica == 0);
if (skipPrimaries) {
primary = randomBoolean();
}
Path path = createTempDir().resolve("indices").resolve(index.getUUID()).resolve(String.valueOf(shardIndex));
ShardRouting shardRouting = ShardRouting.newUnassigned(shardId, primary,
primary ? RecoverySource.EmptyStoreRecoverySource.INSTANCE : PeerRecoverySource.INSTANCE,
new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, null)
);
shardRouting = shardRouting.initialize("node-0", null, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE);
shardRouting = shardRouting.moveToStarted();
CommonStats stats = new CommonStats();
stats.fieldData = new FieldDataStats();
stats.queryCache = new QueryCacheStats();
stats.docs = new DocsStats();
stats.store = new StoreStats();
stats.indexing = new IndexingStats();
stats.search = new SearchStats();
stats.segments = new SegmentsStats();
stats.merge = new MergeStats();
stats.refresh = new RefreshStats();
stats.completion = new CompletionStats();
stats.requestCache = new RequestCacheStats();
stats.get = new GetStats();
stats.flush = new FlushStats();
stats.warmer = new WarmerStats();
if (inconsistentReplica == replica) {
// overwrite
seqNoStats = new SeqNoStats(maxSeqNo, localCheckpoint, globalCheckpoint + randomLongBetween(10L, 100L));
}
shardStats.add(new ShardStats(shardRouting, new ShardPath(false, path, path, shardId), stats, null, seqNoStats, null));
}
}
if (userIndices.contains(index.getName())) {
expectedCheckpoints.put(index.getName(), checkpoints.stream().mapToLong(l -> l).toArray());
}
}
// shuffle the shard stats
Collections.shuffle(shardStats, random());
ShardStats[] shardStatsArray = shardStats.toArray(new ShardStats[0]);
return shardStatsArray;
}
}

View File

@ -11,6 +11,7 @@ import org.elasticsearch.action.LatchedActionListener;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.common.CheckedConsumer; import org.elasticsearch.common.CheckedConsumer;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.reindex.ReindexPlugin;
import org.elasticsearch.plugins.Plugin; import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESSingleNodeTestCase; import org.elasticsearch.test.ESSingleNodeTestCase;
import org.elasticsearch.xpack.core.template.TemplateUtils; import org.elasticsearch.xpack.core.template.TemplateUtils;
@ -43,7 +44,7 @@ public abstract class DataFrameSingleNodeTestCase extends ESSingleNodeTestCase {
@Override @Override
protected Collection<Class<? extends Plugin>> getPlugins() { protected Collection<Class<? extends Plugin>> getPlugins() {
return pluginList(LocalStateDataFrame.class); return pluginList(LocalStateDataFrame.class, ReindexPlugin.class);
} }
protected <T> void assertAsync(Consumer<ActionListener<T>> function, T expected, CheckedConsumer<T, ? extends Exception> onAnswer, protected <T> void assertAsync(Consumer<ActionListener<T>> function, T expected, CheckedConsumer<T, ? extends Exception> onAnswer,

View File

@ -9,6 +9,8 @@ package org.elasticsearch.xpack.dataframe.persistence;
import org.elasticsearch.ResourceAlreadyExistsException; import org.elasticsearch.ResourceAlreadyExistsException;
import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.xpack.core.dataframe.DataFrameMessages; import org.elasticsearch.xpack.core.dataframe.DataFrameMessages;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformCheckpoint;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformCheckpointTests;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfigTests; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfigTests;
import org.junit.Before; import org.junit.Before;
@ -48,7 +50,7 @@ public class DataFrameTransformsConfigManagerTests extends DataFrameSingleNodeTe
public void testDeleteMissingTransform() throws InterruptedException { public void testDeleteMissingTransform() throws InterruptedException {
// the index does not exist yet // the index does not exist yet
assertAsync(listener -> transformsConfigManager.deleteTransformConfiguration("not_there", listener), (Boolean) null, null, e -> { assertAsync(listener -> transformsConfigManager.deleteTransform("not_there", listener), (Boolean) null, null, e -> {
assertEquals(ResourceNotFoundException.class, e.getClass()); assertEquals(ResourceNotFoundException.class, e.getClass());
assertEquals(DataFrameMessages.getMessage(DataFrameMessages.REST_DATA_FRAME_UNKNOWN_TRANSFORM, "not_there"), e.getMessage()); assertEquals(DataFrameMessages.getMessage(DataFrameMessages.REST_DATA_FRAME_UNKNOWN_TRANSFORM, "not_there"), e.getMessage());
}); });
@ -60,13 +62,13 @@ public class DataFrameTransformsConfigManagerTests extends DataFrameSingleNodeTe
true, null, null); true, null, null);
// same test, but different code path // same test, but different code path
assertAsync(listener -> transformsConfigManager.deleteTransformConfiguration("not_there", listener), (Boolean) null, null, e -> { assertAsync(listener -> transformsConfigManager.deleteTransform("not_there", listener), (Boolean) null, null, e -> {
assertEquals(ResourceNotFoundException.class, e.getClass()); assertEquals(ResourceNotFoundException.class, e.getClass());
assertEquals(DataFrameMessages.getMessage(DataFrameMessages.REST_DATA_FRAME_UNKNOWN_TRANSFORM, "not_there"), e.getMessage()); assertEquals(DataFrameMessages.getMessage(DataFrameMessages.REST_DATA_FRAME_UNKNOWN_TRANSFORM, "not_there"), e.getMessage());
}); });
} }
public void testCreateReadDelete() throws InterruptedException { public void testCreateReadDeleteTransform() throws InterruptedException {
DataFrameTransformConfig transformConfig = DataFrameTransformConfigTests.randomDataFrameTransformConfig(); DataFrameTransformConfig transformConfig = DataFrameTransformConfigTests.randomDataFrameTransformConfig();
// create transform // create transform
@ -84,15 +86,14 @@ public class DataFrameTransformsConfigManagerTests extends DataFrameSingleNodeTe
}); });
// delete transform // delete transform
assertAsync(listener -> transformsConfigManager.deleteTransformConfiguration(transformConfig.getId(), listener), true, null, null); assertAsync(listener -> transformsConfigManager.deleteTransform(transformConfig.getId(), listener), true, null, null);
// delete again // delete again
assertAsync(listener -> transformsConfigManager.deleteTransformConfiguration(transformConfig.getId(), listener), (Boolean) null, assertAsync(listener -> transformsConfigManager.deleteTransform(transformConfig.getId(), listener), (Boolean) null, null, e -> {
null, e -> { assertEquals(ResourceNotFoundException.class, e.getClass());
assertEquals(ResourceNotFoundException.class, e.getClass()); assertEquals(DataFrameMessages.getMessage(DataFrameMessages.REST_DATA_FRAME_UNKNOWN_TRANSFORM, transformConfig.getId()),
assertEquals(DataFrameMessages.getMessage(DataFrameMessages.REST_DATA_FRAME_UNKNOWN_TRANSFORM, transformConfig.getId()), e.getMessage());
e.getMessage()); });
});
// try to get deleted transform // try to get deleted transform
assertAsync(listener -> transformsConfigManager.getTransformConfiguration(transformConfig.getId(), listener), assertAsync(listener -> transformsConfigManager.getTransformConfiguration(transformConfig.getId(), listener),
@ -102,4 +103,29 @@ public class DataFrameTransformsConfigManagerTests extends DataFrameSingleNodeTe
e.getMessage()); e.getMessage());
}); });
} }
public void testCreateReadDeleteCheckPoint() throws InterruptedException {
DataFrameTransformCheckpoint checkpoint = DataFrameTransformCheckpointTests.randomDataFrameTransformCheckpoints();
// create
assertAsync(listener -> transformsConfigManager.putTransformCheckpoint(checkpoint, listener), true, null, null);
// read
assertAsync(listener -> transformsConfigManager.getTransformCheckpoint(checkpoint.getTransformId(), checkpoint.getCheckpoint(),
listener), checkpoint, null, null);
// delete
assertAsync(listener -> transformsConfigManager.deleteTransform(checkpoint.getTransformId(), listener), true, null, null);
// delete again
assertAsync(listener -> transformsConfigManager.deleteTransform(checkpoint.getTransformId(), listener), (Boolean) null, null, e -> {
assertEquals(ResourceNotFoundException.class, e.getClass());
assertEquals(DataFrameMessages.getMessage(DataFrameMessages.REST_DATA_FRAME_UNKNOWN_TRANSFORM, checkpoint.getTransformId()),
e.getMessage());
});
// getting a non-existing checkpoint returns null
assertAsync(listener -> transformsConfigManager.getTransformCheckpoint(checkpoint.getTransformId(), checkpoint.getCheckpoint(),
listener), DataFrameTransformCheckpoint.EMPTY, null, null);
}
} }