diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/DataFrameMessages.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/DataFrameMessages.java index 728505f3de0..ba4143dd1ce 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/DataFrameMessages.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/DataFrameMessages.java @@ -53,6 +53,9 @@ public class DataFrameMessages { public static final String LOG_DATA_FRAME_TRANSFORM_CONFIGURATION_BAD_AGGREGATION = "Failed to parse aggregation for data frame pivot transform"; + public static final String FAILED_TO_PARSE_TRANSFORM_CHECKPOINTS = + "Failed to parse transform checkpoints for [{0}]"; + private DataFrameMessages() { } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformCheckpoint.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformCheckpoint.java new file mode 100644 index 00000000000..f14415489f1 --- /dev/null +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformCheckpoint.java @@ -0,0 +1,264 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.core.dataframe.transforms; + +import org.elasticsearch.ElasticsearchParseException; +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.ParsingException; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.xcontent.ConstructingObjectParser; +import org.elasticsearch.common.xcontent.ToXContentObject; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.xpack.core.dataframe.DataFrameField; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Objects; +import java.util.TreeMap; + +import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg; +import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg; + +/** + * Checkpoint document to store the checkpoint of a data frame transform + * + * The fields: + * + * timestamp the timestamp when this document has been created + * checkpoint the checkpoint number, incremented for every checkpoint + * indices a map of the indices from the source including all checkpoints of all indices matching the source pattern, shard level + * time_upper_bound for time-based indices this holds the upper time boundary of this checkpoint + * + */ +public class DataFrameTransformCheckpoint implements Writeable, ToXContentObject { + + public static DataFrameTransformCheckpoint EMPTY = new DataFrameTransformCheckpoint("empty", 0L, -1L, Collections.emptyMap(), 0L); + + // the timestamp of the checkpoint, mandatory + public static final ParseField TIMESTAMP_MILLIS = new ParseField("timestamp_millis"); + public static final ParseField TIMESTAMP = new ParseField("timestamp"); + + // the own checkpoint + public static final ParseField CHECKPOINT 
= new ParseField("checkpoint"); + + // checkpoint of the indexes (sequence id's) + public static final ParseField INDICES = new ParseField("indices"); + + // checkpoint for for time based sync + // TODO: consider a lower bound for usecases where you want to transform on a window of a stream + public static final ParseField TIME_UPPER_BOUND_MILLIS = new ParseField("time_upper_bound_millis"); + public static final ParseField TIME_UPPER_BOUND = new ParseField("time_upper_bound"); + + private static final String NAME = "data_frame_transform_checkpoint"; + + private static final ConstructingObjectParser STRICT_PARSER = createParser(false); + private static final ConstructingObjectParser LENIENT_PARSER = createParser(true); + + private final String transformId; + private final long timestampMillis; + private final long checkpoint; + private final Map indicesCheckpoints; + private final long timeUpperBoundMillis; + + private static ConstructingObjectParser createParser(boolean lenient) { + ConstructingObjectParser parser = new ConstructingObjectParser<>(NAME, + lenient, args -> { + String id = (String) args[0]; + Long timestamp = (Long) args[1]; + Long checkpoint = (Long) args[2]; + + @SuppressWarnings("unchecked") + Map checkpoints = (Map) args[3]; + + Long timestamp_checkpoint = (Long) args[4]; + + // ignored, only for internal storage: String docType = (String) args[5]; + return new DataFrameTransformCheckpoint(id, timestamp, checkpoint, checkpoints, timestamp_checkpoint); + }); + + parser.declareString(constructorArg(), DataFrameField.ID); + + // note: this is never parsed from the outside where timestamp can be formatted as date time + parser.declareLong(constructorArg(), TIMESTAMP_MILLIS); + parser.declareLong(constructorArg(), CHECKPOINT); + + parser.declareObject(constructorArg(), (p,c) -> { + Map checkPointsByIndexName = new TreeMap<>(); + XContentParser.Token token = null; + while ((token = p.nextToken()) != XContentParser.Token.END_OBJECT) { + if (token != XContentParser.Token.FIELD_NAME) { + throw new ParsingException(p.getTokenLocation(), "Unexpected token " + token + " "); + } + + final String indexName = p.currentName(); + token = p.nextToken(); + if (token != XContentParser.Token.START_ARRAY) { + throw new ParsingException(p.getTokenLocation(), "Unexpected token " + token + " "); + } + + long[] checkpoints = p.listOrderedMap().stream().mapToLong(num -> ((Number) num).longValue()).toArray(); + checkPointsByIndexName.put(indexName, checkpoints); + } + return checkPointsByIndexName; + }, INDICES); + parser.declareLong(optionalConstructorArg(), TIME_UPPER_BOUND_MILLIS); + parser.declareString(optionalConstructorArg(), DataFrameField.INDEX_DOC_TYPE); + + return parser; + } + + public DataFrameTransformCheckpoint(String transformId, Long timestamp, Long checkpoint, Map checkpoints, + Long timeUpperBound) { + this.transformId = transformId; + this.timestampMillis = timestamp.longValue(); + this.checkpoint = checkpoint; + this.indicesCheckpoints = Collections.unmodifiableMap(checkpoints); + this.timeUpperBoundMillis = timeUpperBound == null ? 
0 : timeUpperBound.longValue(); + } + + public DataFrameTransformCheckpoint(StreamInput in) throws IOException { + this.transformId = in.readString(); + this.timestampMillis = in.readLong(); + this.checkpoint = in.readLong(); + this.indicesCheckpoints = readCheckpoints(in.readMap()); + this.timeUpperBoundMillis = in.readLong(); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + + // the id, doc_type and checkpoint is only internally used for storage, the user-facing version gets embedded + if (params.paramAsBoolean(DataFrameField.FOR_INTERNAL_STORAGE, false)) { + builder.field(DataFrameField.ID.getPreferredName(), transformId); + builder.field(CHECKPOINT.getPreferredName(), checkpoint); + builder.field(DataFrameField.INDEX_DOC_TYPE.getPreferredName(), NAME); + } + + builder.timeField(TIMESTAMP_MILLIS.getPreferredName(), TIMESTAMP.getPreferredName(), timestampMillis); + + if (timeUpperBoundMillis > 0) { + builder.timeField(TIME_UPPER_BOUND_MILLIS.getPreferredName(), TIME_UPPER_BOUND.getPreferredName(), timeUpperBoundMillis); + } + + builder.startObject(INDICES.getPreferredName()); + for (Entry entry : indicesCheckpoints.entrySet()) { + builder.array(entry.getKey(), entry.getValue()); + } + builder.endObject(); + + builder.endObject(); + return builder; + } + + public String getTransformId() { + return transformId; + } + + public long getTimestamp() { + return timestampMillis; + } + + public long getCheckpoint() { + return checkpoint; + } + + public Map getIndicesCheckpoints() { + return indicesCheckpoints; + } + + public long getTimeUpperBound() { + return timeUpperBoundMillis; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeString(transformId); + out.writeLong(timestampMillis); + out.writeLong(checkpoint); + out.writeGenericValue(indicesCheckpoints); + out.writeLong(timeUpperBoundMillis); + } + + @Override + public boolean equals(Object other) { + if (this == other) { + return true; + } + + if (other == null || getClass() != other.getClass()) { + return false; + } + + final DataFrameTransformCheckpoint that = (DataFrameTransformCheckpoint) other; + + // compare the timestamp, id, checkpoint and than call matches for the rest + return this.timestampMillis == that.timestampMillis && this.checkpoint == that.checkpoint + && this.timeUpperBoundMillis == that.timeUpperBoundMillis && matches(that); + } + + /** + * Compares 2 checkpoints ignoring some inner fields. 
+ * + * This is for comparing 2 checkpoints to check whether the data frame transform requires an update + * + * @param that other checkpoint + * @return true if checkpoints match + */ + public boolean matches (DataFrameTransformCheckpoint that) { + if (this == that) { + return true; + } + + return Objects.equals(this.transformId, that.transformId) + && this.indicesCheckpoints.size() == that.indicesCheckpoints.size() // quick check + // do the expensive deep equal operation last + && this.indicesCheckpoints.entrySet().stream() + .allMatch(e -> Arrays.equals(e.getValue(), that.indicesCheckpoints.get(e.getKey()))); + } + + @Override + public int hashCode() { + int hash = Objects.hash(transformId, timestampMillis, checkpoint, timeUpperBoundMillis); + + for (Entry e : indicesCheckpoints.entrySet()) { + hash = 31 * hash + Objects.hash(e.getKey(), Arrays.hashCode(e.getValue())); + } + return hash; + } + + public static DataFrameTransformCheckpoint fromXContent(final XContentParser parser, boolean lenient) throws IOException { + return lenient ? LENIENT_PARSER.apply(parser, null) : STRICT_PARSER.apply(parser, null); + } + + public static String documentId(String transformId, long checkpoint) { + if (checkpoint < 0) { + throw new IllegalArgumentException("checkpoint must be a positive number"); + } + + return NAME + "-" + transformId + "-" + checkpoint; + } + + private static Map readCheckpoints(Map readMap) { + Map checkpoints = new TreeMap<>(); + for (Map.Entry e : readMap.entrySet()) { + if (e.getValue() instanceof long[]) { + checkpoints.put(e.getKey(), (long[]) e.getValue()); + } else { + throw new ElasticsearchParseException("expecting the checkpoints for [{}] to be a long[], but found [{}] instead", + e.getKey(), e.getValue().getClass()); + } + } + return checkpoints; + } +} diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/transforms/AbstractSerializingDataFrameTestCase.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/transforms/AbstractSerializingDataFrameTestCase.java index 32449417ae2..2b64fadac05 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/transforms/AbstractSerializingDataFrameTestCase.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/transforms/AbstractSerializingDataFrameTestCase.java @@ -12,13 +12,16 @@ import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.ToXContent.Params; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.search.SearchModule; import org.elasticsearch.search.aggregations.AggregationBuilder; import org.elasticsearch.search.aggregations.BaseAggregationBuilder; import org.elasticsearch.test.AbstractSerializingTestCase; +import org.elasticsearch.xpack.core.dataframe.DataFrameField; import org.junit.Before; +import java.util.Collections; import java.util.List; import static java.util.Collections.emptyList; @@ -26,6 +29,9 @@ import static java.util.Collections.emptyList; public abstract class AbstractSerializingDataFrameTestCase extends AbstractSerializingTestCase { + protected static Params TO_XCONTENT_PARAMS = new ToXContent.MapParams( + Collections.singletonMap(DataFrameField.FOR_INTERNAL_STORAGE, "true")); + /** * Test case that ensures aggregation named objects are registered */ diff --git 
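For reference, a minimal sketch of how matches() is meant to drive change detection: a previously stored checkpoint is compared against a freshly gathered one, ignoring timestamp and checkpoint number. The transform id, index name and checkpoint values below are placeholders, not code from this change.

import java.util.Collections;
import java.util.Map;

import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformCheckpoint;

class CheckpointMatchesSketch {
    public static void main(String[] args) {
        Map<String, long[]> oldCheckpoints = Collections.singletonMap("my-index", new long[] { 42L, 17L });
        Map<String, long[]> newCheckpoints = Collections.singletonMap("my-index", new long[] { 45L, 17L });

        DataFrameTransformCheckpoint stored = new DataFrameTransformCheckpoint("my-transform", 1000L, 1L, oldCheckpoints, 0L);
        DataFrameTransformCheckpoint rechecked = new DataFrameTransformCheckpoint("my-transform", 2000L, -1L, oldCheckpoints, 0L);
        DataFrameTransformCheckpoint advanced = new DataFrameTransformCheckpoint("my-transform", 2000L, -1L, newCheckpoints, 0L);

        // matches() ignores timestamp and checkpoint number, so only the per-index checkpoints decide
        System.out.println(stored.matches(rechecked)); // true  - source unchanged, nothing to do
        System.out.println(stored.matches(advanced));  // false - shard 0 of my-index advanced, transform needs to run
    }
}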
a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformCheckpointTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformCheckpointTests.java new file mode 100644 index 00000000000..964d08fad20 --- /dev/null +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformCheckpointTests.java @@ -0,0 +1,133 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.core.dataframe.transforms; + +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.io.stream.Writeable.Reader; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentFactory; +import org.elasticsearch.common.xcontent.XContentParser; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; + +import static org.elasticsearch.test.TestMatchers.matchesPattern; + +public class DataFrameTransformCheckpointTests extends AbstractSerializingDataFrameTestCase { + + public static DataFrameTransformCheckpoint randomDataFrameTransformCheckpoints() { + return new DataFrameTransformCheckpoint(randomAlphaOfLengthBetween(1, 10), randomNonNegativeLong(), randomNonNegativeLong(), + randomCheckpointsByIndex(), randomNonNegativeLong()); + } + + @Override + protected DataFrameTransformCheckpoint doParseInstance(XContentParser parser) throws IOException { + return DataFrameTransformCheckpoint.fromXContent(parser, false); + } + + @Override + protected DataFrameTransformCheckpoint createTestInstance() { + return randomDataFrameTransformCheckpoints(); + } + + @Override + protected Reader instanceReader() { + return DataFrameTransformCheckpoint::new; + } + + @Override + protected ToXContent.Params getToXContentParams() { + return TO_XCONTENT_PARAMS; + } + + public void testXContentForInternalStorage() throws IOException { + DataFrameTransformCheckpoint dataFrameTransformCheckpoints = randomDataFrameTransformCheckpoints(); + + try (XContentBuilder xContentBuilder = XContentFactory.jsonBuilder()) { + XContentBuilder content = dataFrameTransformCheckpoints.toXContent(xContentBuilder, getToXContentParams()); + String doc = Strings.toString(content); + + assertThat(doc, matchesPattern(".*\"doc_type\"\\s*:\\s*\"data_frame_transform_checkpoint\".*")); + } + + try (XContentBuilder xContentBuilder = XContentFactory.jsonBuilder()) { + XContentBuilder content = dataFrameTransformCheckpoints.toXContent(xContentBuilder, ToXContent.EMPTY_PARAMS); + String doc = Strings.toString(content); + + assertFalse(doc.contains("doc_type")); + } + } + + public void testXContentForApiUsage() throws IOException { + DataFrameTransformCheckpoint dataFrameTransformCheckpoints = new DataFrameTransformCheckpoint(randomAlphaOfLengthBetween(1, 10), + 1546300800000L, randomNonNegativeLong(), Collections.emptyMap(), 1545609600000L); + + try (XContentBuilder xContentBuilder = XContentFactory.jsonBuilder()) { + xContentBuilder.humanReadable(true); + XContentBuilder content = dataFrameTransformCheckpoints.toXContent(xContentBuilder, ToXContent.EMPTY_PARAMS); + String doc = 
Strings.toString(content); + assertThat(doc, matchesPattern(".*\"timestamp_millis\"\\s*:\\s*1546300800000.*")); + assertThat(doc, matchesPattern(".*\"time_upper_bound_millis\"\\s*:\\s*1545609600000.*")); + assertThat(doc, matchesPattern(".*\"timestamp\"\\s*:\\s*\"2019-01-01T00:00:00.000Z\".*")); + assertThat(doc, matchesPattern(".*\"time_upper_bound\"\\s*:\\s*\"2018-12-24T00:00:00.000Z\".*")); + } + } + + public void testMatches() throws IOException { + String id = randomAlphaOfLengthBetween(1, 10); + long timestamp = randomNonNegativeLong(); + long checkpoint = randomNonNegativeLong(); + Map checkpointsByIndex = randomCheckpointsByIndex(); + Map otherCheckpointsByIndex = new TreeMap<>(checkpointsByIndex); + otherCheckpointsByIndex.put(randomAlphaOfLengthBetween(1, 10), new long[] { 1, 2, 3 }); + long timeUpperBound = randomNonNegativeLong(); + + DataFrameTransformCheckpoint dataFrameTransformCheckpoints = new DataFrameTransformCheckpoint(id, timestamp, checkpoint, + checkpointsByIndex, timeUpperBound); + + // same + assertTrue(dataFrameTransformCheckpoints.matches(dataFrameTransformCheckpoints)); + DataFrameTransformCheckpoint dataFrameTransformCheckpointsCopy = copyInstance(dataFrameTransformCheckpoints); + + // with copy + assertTrue(dataFrameTransformCheckpoints.matches(dataFrameTransformCheckpointsCopy)); + assertTrue(dataFrameTransformCheckpointsCopy.matches(dataFrameTransformCheckpoints)); + + // other id + assertFalse(dataFrameTransformCheckpoints + .matches(new DataFrameTransformCheckpoint(id + "-1", timestamp, checkpoint, checkpointsByIndex, timeUpperBound))); + // other timestamp + assertTrue(dataFrameTransformCheckpoints + .matches(new DataFrameTransformCheckpoint(id, (timestamp / 2) + 1, checkpoint, checkpointsByIndex, timeUpperBound))); + // other checkpoint + assertTrue(dataFrameTransformCheckpoints + .matches(new DataFrameTransformCheckpoint(id, timestamp, (checkpoint / 2) + 1, checkpointsByIndex, timeUpperBound))); + // other index checkpoints + assertFalse(dataFrameTransformCheckpoints + .matches(new DataFrameTransformCheckpoint(id, timestamp, checkpoint, otherCheckpointsByIndex, timeUpperBound))); + // other time upper bound + assertTrue(dataFrameTransformCheckpoints + .matches(new DataFrameTransformCheckpoint(id, timestamp, checkpoint, checkpointsByIndex, (timeUpperBound / 2) + 1))); + } + + private static Map randomCheckpointsByIndex() { + Map checkpointsByIndex = new TreeMap<>(); + for (int i = 0; i < randomIntBetween(1, 10); ++i) { + List checkpoints = new ArrayList<>(); + for (int j = 0; j < randomIntBetween(1, 20); ++j) { + checkpoints.add(randomNonNegativeLong()); + } + checkpointsByIndex.put(randomAlphaOfLengthBetween(1, 10), checkpoints.stream().mapToLong(l -> l).toArray()); + } + return checkpointsByIndex; + } +} diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformConfigTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformConfigTests.java index 76db5a1266d..94530890ed0 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformConfigTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformConfigTests.java @@ -10,17 +10,14 @@ import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.Writeable.Reader; import org.elasticsearch.common.xcontent.DeprecationHandler; import 
org.elasticsearch.common.xcontent.ToXContent; -import org.elasticsearch.common.xcontent.ToXContent.Params; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentType; -import org.elasticsearch.xpack.core.dataframe.DataFrameField; import org.elasticsearch.xpack.core.dataframe.transforms.pivot.PivotConfigTests; import org.junit.Before; import java.io.IOException; -import java.util.Collections; import java.util.HashMap; import java.util.Map; @@ -28,9 +25,6 @@ import static org.elasticsearch.test.TestMatchers.matchesPattern; public class DataFrameTransformConfigTests extends AbstractSerializingDataFrameTestCase { - private static Params TO_XCONTENT_PARAMS = new ToXContent.MapParams( - Collections.singletonMap(DataFrameField.FOR_INTERNAL_STORAGE, "true")); - private String transformId; private boolean runWithHeaders; diff --git a/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameConfigurationIndexIT.java b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameConfigurationIndexIT.java index c901fd7ddc6..681599331c8 100644 --- a/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameConfigurationIndexIT.java +++ b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameConfigurationIndexIT.java @@ -15,8 +15,8 @@ import org.elasticsearch.common.Strings; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.support.XContentMapValues; import org.elasticsearch.xpack.core.dataframe.DataFrameField; -import org.elasticsearch.xpack.dataframe.persistence.DataFrameInternalIndex; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig; +import org.elasticsearch.xpack.dataframe.persistence.DataFrameInternalIndex; import java.io.IOException; @@ -47,6 +47,9 @@ public class DataFrameConfigurationIndexIT extends DataFrameRestTestCase { client().performRequest(req); } + // refresh the index + assertOK(client().performRequest(new Request("POST", DataFrameInternalIndex.INDEX_NAME + "/_refresh"))); + Request deleteRequest = new Request("DELETE", DATAFRAME_ENDPOINT + fakeTransformName); Response deleteResponse = client().performRequest(deleteRequest); assertOK(deleteResponse); diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/DataFrame.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/DataFrame.java index ced9ab8512a..fdd14d4cff8 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/DataFrame.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/DataFrame.java @@ -57,6 +57,7 @@ import org.elasticsearch.xpack.dataframe.action.TransportPutDataFrameTransformAc import org.elasticsearch.xpack.dataframe.action.TransportStartDataFrameTransformAction; import org.elasticsearch.xpack.dataframe.action.TransportStartDataFrameTransformTaskAction; import org.elasticsearch.xpack.dataframe.action.TransportStopDataFrameTransformAction; +import org.elasticsearch.xpack.dataframe.checkpoint.DataFrameTransformsCheckpointService; import org.elasticsearch.xpack.dataframe.persistence.DataFrameInternalIndex; import 
org.elasticsearch.xpack.dataframe.persistence.DataFrameTransformsConfigManager; import org.elasticsearch.xpack.dataframe.rest.action.RestDeleteDataFrameTransformAction; @@ -98,6 +99,7 @@ public class DataFrame extends Plugin implements ActionPlugin, PersistentTaskPlu private final Settings settings; private final boolean transportClientMode; private final SetOnce dataFrameTransformsConfigManager = new SetOnce<>(); + private final SetOnce dataFrameTransformsCheckpointService = new SetOnce<>(); private final SetOnce schedulerEngine = new SetOnce<>(); public DataFrame(Settings settings) { @@ -180,8 +182,9 @@ public class DataFrame extends Plugin implements ActionPlugin, PersistentTaskPlu } dataFrameTransformsConfigManager.set(new DataFrameTransformsConfigManager(client, xContentRegistry)); + dataFrameTransformsCheckpointService.set(new DataFrameTransformsCheckpointService(client)); - return Collections.singletonList(dataFrameTransformsConfigManager.get()); + return Arrays.asList(dataFrameTransformsConfigManager.get(), dataFrameTransformsCheckpointService.get()); } @Override @@ -207,10 +210,12 @@ public class DataFrame extends Plugin implements ActionPlugin, PersistentTaskPlu // the transforms config manager should have been created assert dataFrameTransformsConfigManager.get() != null; + assert dataFrameTransformsCheckpointService.get() != null; + return Collections.singletonList(new DataFrameTransformPersistentTasksExecutor(client, dataFrameTransformsConfigManager.get(), - schedulerEngine.get(), threadPool)); + dataFrameTransformsCheckpointService.get(), schedulerEngine.get(), threadPool)); } - + @Override public void close() { if (schedulerEngine.get() != null) { diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportDeleteDataFrameTransformAction.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportDeleteDataFrameTransformAction.java index f9ae96b4544..131ad690d2b 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportDeleteDataFrameTransformAction.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportDeleteDataFrameTransformAction.java @@ -59,7 +59,7 @@ public class TransportDeleteDataFrameTransformAction extends TransportTasksActio IndexerState state = task.getState().getIndexerState(); if (state.equals(IndexerState.STOPPED)) { task.onCancelled(); - transformsConfigManager.deleteTransformConfiguration(request.getId(), ActionListener.wrap(r -> { + transformsConfigManager.deleteTransform(request.getId(), ActionListener.wrap(r -> { listener.onResponse(new Response(true)); }, listener::onFailure)); } else { @@ -80,7 +80,7 @@ public class TransportDeleteDataFrameTransformAction extends TransportTasksActio // we couldn't find the transform in the persistent task CS, but maybe the transform exists in the configuration index, // if so delete the orphaned document and do not throw (for the normal case we want to stop the task first, // than delete the configuration document if and only if the data frame transform is in stopped state) - transformsConfigManager.deleteTransformConfiguration(request.getId(), ActionListener.wrap(r -> { + transformsConfigManager.deleteTransform(request.getId(), ActionListener.wrap(r -> { listener.onResponse(new Response(true)); return; }, listener::onFailure)); diff --git 
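For orientation, a rough sketch of how the checkpoint service wired into the plugin above is expected to be used together with the config manager; getCheckpoint and putTransformCheckpoint are added further down in this diff, while the helper method, field names and checkpoint number here are illustrative and not part of the change.

    // assumed to live in a class that already holds these fields and a Logger, e.g. the transform task/indexer
    private void createAndStoreCheckpoint(DataFrameTransformConfig transformConfig, long checkpointNumber) {
        transformsCheckpointService.getCheckpoint(transformConfig, checkpointNumber, ActionListener.wrap(
            checkpointDoc -> transformsConfigManager.putTransformCheckpoint(checkpointDoc, ActionListener.wrap(
                stored -> logger.debug("Stored checkpoint [{}] for data frame transform [{}]",
                        checkpointDoc.getCheckpoint(), transformConfig.getId()),
                e -> logger.error("Failed to store data frame transform checkpoint", e))),
            e -> logger.error("Failed to compute data frame transform checkpoint", e)));
    }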
a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportPutDataFrameTransformAction.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportPutDataFrameTransformAction.java index b696243cc5d..63fd878652f 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportPutDataFrameTransformAction.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportPutDataFrameTransformAction.java @@ -42,6 +42,7 @@ import org.elasticsearch.xpack.core.dataframe.DataFrameMessages; import org.elasticsearch.xpack.core.dataframe.action.PutDataFrameTransformAction; import org.elasticsearch.xpack.core.dataframe.action.PutDataFrameTransformAction.Request; import org.elasticsearch.xpack.core.dataframe.action.PutDataFrameTransformAction.Response; +import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig; import org.elasticsearch.xpack.core.security.SecurityContext; import org.elasticsearch.xpack.core.security.action.user.HasPrivilegesAction; import org.elasticsearch.xpack.core.security.action.user.HasPrivilegesRequest; @@ -51,7 +52,6 @@ import org.elasticsearch.xpack.core.security.authz.permission.ResourcePrivileges import org.elasticsearch.xpack.core.security.support.Exceptions; import org.elasticsearch.xpack.dataframe.persistence.DataFrameTransformsConfigManager; import org.elasticsearch.xpack.dataframe.persistence.DataframeIndex; -import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig; import org.elasticsearch.xpack.dataframe.transforms.pivot.Pivot; import java.io.IOException; diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/checkpoint/CheckpointException.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/checkpoint/CheckpointException.java new file mode 100644 index 00000000000..0a0a50761f0 --- /dev/null +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/checkpoint/CheckpointException.java @@ -0,0 +1,26 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.dataframe.checkpoint; + +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.common.io.stream.StreamInput; + +import java.io.IOException; + +public class CheckpointException extends ElasticsearchException { + public CheckpointException(String msg, Object... params) { + super(msg, null, params); + } + + public CheckpointException(String msg, Throwable cause, Object... params) { + super(msg, cause, params); + } + + public CheckpointException(StreamInput in) throws IOException { + super(in); + } +} diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/checkpoint/DataFrameTransformsCheckpointService.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/checkpoint/DataFrameTransformsCheckpointService.java new file mode 100644 index 00000000000..00cc2d2f522 --- /dev/null +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/checkpoint/DataFrameTransformsCheckpointService.java @@ -0,0 +1,136 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. 
Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.dataframe.checkpoint; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.indices.get.GetIndexAction; +import org.elasticsearch.action.admin.indices.get.GetIndexRequest; +import org.elasticsearch.action.admin.indices.stats.IndicesStatsAction; +import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequest; +import org.elasticsearch.action.admin.indices.stats.ShardStats; +import org.elasticsearch.client.Client; +import org.elasticsearch.xpack.core.ClientHelper; +import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformCheckpoint; +import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig; + +import java.util.Arrays; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; + +/** + * DataFrameTransform Checkpoint Service + * + * Allows checkpointing a source of a data frame transform which includes all relevant checkpoints of the source. + * + * This will be used to checkpoint a transform, detect changes, run the transform in continuous mode. + * + */ +public class DataFrameTransformsCheckpointService { + + private static final Logger logger = LogManager.getLogger(DataFrameTransformsCheckpointService.class); + + private final Client client; + + public DataFrameTransformsCheckpointService(final Client client) { + this.client = client; + } + + /** + * Get an unnumbered checkpoint. These checkpoints are for persistence but comparing state. + * + * @param transformConfig the @link{DataFrameTransformConfig} + * @param listener listener to call after inner request returned + */ + public void getCheckpoint(DataFrameTransformConfig transformConfig, ActionListener listener) { + getCheckpoint(transformConfig, -1L, listener); + } + + /** + * Get a checkpoint, used to store a checkpoint. 
+ * + * @param transformConfig the @link{DataFrameTransformConfig} + * @param checkpoint the number of the checkpoint + * @param listener listener to call after inner request returned + */ + public void getCheckpoint(DataFrameTransformConfig transformConfig, long checkpoint, + ActionListener listener) { + long timestamp = System.currentTimeMillis(); + + // placeholder for time based synchronization + long timeUpperBound = 0; + + // 1st get index to see the indexes the user has access to + GetIndexRequest getIndexRequest = new GetIndexRequest().indices(transformConfig.getSource()); + + ClientHelper.executeWithHeadersAsync(transformConfig.getHeaders(), ClientHelper.DATA_FRAME_ORIGIN, client, GetIndexAction.INSTANCE, + getIndexRequest, ActionListener.wrap(getIndexResponse -> { + Set userIndices = new HashSet<>(Arrays.asList(getIndexResponse.getIndices())); + + // 2nd get stats request + ClientHelper.executeAsyncWithOrigin(client, ClientHelper.DATA_FRAME_ORIGIN, IndicesStatsAction.INSTANCE, + new IndicesStatsRequest().indices(transformConfig.getSource()), ActionListener.wrap(response -> { + if (response.getFailedShards() != 0) { + throw new CheckpointException("Source has [" + response.getFailedShards() + "] failed shards"); + } + + Map checkpointsByIndex = extractIndexCheckPoints(response.getShards(), userIndices); + DataFrameTransformCheckpoint checkpointDoc = new DataFrameTransformCheckpoint(transformConfig.getId(), + timestamp, checkpoint, checkpointsByIndex, timeUpperBound); + listener.onResponse(checkpointDoc); + + }, IndicesStatsRequestException -> { + throw new CheckpointException("Failed to retrieve indices stats", IndicesStatsRequestException); + })); + + }, getIndexException -> { + throw new CheckpointException("Failed to retrieve list of indices", getIndexException); + })); + + } + + static Map extractIndexCheckPoints(ShardStats[] shards, Set userIndices) { + Map> checkpointsByIndex = new TreeMap<>(); + + for (ShardStats shard : shards) { + String indexName = shard.getShardRouting().getIndexName(); + if (userIndices.contains(indexName)) { + if (checkpointsByIndex.containsKey(indexName)) { + // we have already seen this index, just check/add shards + TreeMap checkpoints = checkpointsByIndex.get(indexName); + if (checkpoints.containsKey(shard.getShardRouting().getId())) { + // there is already a checkpoint entry for this index/shard combination, check if they match + if (checkpoints.get(shard.getShardRouting().getId()) != shard.getSeqNoStats().getGlobalCheckpoint()) { + throw new CheckpointException("Global checkpoints mismatch for index [" + indexName + "] between shards of id [" + + shard.getShardRouting().getId() + "]"); + } + } else { + // 1st time we see this shard for this index, add the entry for the shard + checkpoints.put(shard.getShardRouting().getId(), shard.getSeqNoStats().getGlobalCheckpoint()); + } + } else { + // 1st time we see this index, create an entry for the index and add the shard checkpoint + checkpointsByIndex.put(indexName, new TreeMap<>()); + checkpointsByIndex.get(indexName).put(shard.getShardRouting().getId(), shard.getSeqNoStats().getGlobalCheckpoint()); + } + } + } + + // create the final structure + Map checkpointsByIndexReduced = new TreeMap<>(); + + checkpointsByIndex.forEach((indexName, checkpoints) -> { + checkpointsByIndexReduced.put(indexName, checkpoints.values().stream().mapToLong(l -> l).toArray()); + }); + + return checkpointsByIndexReduced; + } + +} diff --git 
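For reference, the shape of the map extractIndexCheckPoints produces, with hypothetical index names and checkpoint values: one entry per index visible to the user, holding one global checkpoint per shard id in shard-id order (shard copies that disagree raise a CheckpointException instead).

import java.util.Map;
import java.util.TreeMap;

class IndexCheckpointShapeSketch {
    public static void main(String[] args) {
        // what extractIndexCheckPoints would return for a source resolving to these two indices
        Map<String, long[]> checkpointsByIndex = new TreeMap<>();
        checkpointsByIndex.put("index-1", new long[] { 42L, 17L }); // global checkpoints of shard 0 and shard 1
        checkpointsByIndex.put("index-2", new long[] { 7L });       // a single-shard index
    }
}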
a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/persistence/DataFrameTransformsConfigManager.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/persistence/DataFrameTransformsConfigManager.java index 445a7913e99..1392e95e79a 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/persistence/DataFrameTransformsConfigManager.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/persistence/DataFrameTransformsConfigManager.java @@ -13,9 +13,6 @@ import org.elasticsearch.ResourceAlreadyExistsException; import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.DocWriteRequest; -import org.elasticsearch.action.DocWriteResponse; -import org.elasticsearch.action.delete.DeleteAction; -import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.get.GetAction; import org.elasticsearch.action.get.GetRequest; import org.elasticsearch.action.index.IndexAction; @@ -32,8 +29,13 @@ import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.engine.VersionConflictEngineException; +import org.elasticsearch.index.query.QueryBuilder; +import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.index.reindex.DeleteByQueryAction; +import org.elasticsearch.index.reindex.DeleteByQueryRequest; import org.elasticsearch.xpack.core.dataframe.DataFrameField; import org.elasticsearch.xpack.core.dataframe.DataFrameMessages; +import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformCheckpoint; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig; import java.io.IOException; @@ -58,6 +60,37 @@ public class DataFrameTransformsConfigManager { this.xContentRegistry = xContentRegistry; } + /** + * Persist a checkpoint in the internal index + * + * @param checkpoint the @link{DataFrameTransformCheckpoint} + * @param listener listener to call after request has been made + */ + public void putTransformCheckpoint(DataFrameTransformCheckpoint checkpoint, ActionListener listener) { + try (XContentBuilder builder = XContentFactory.jsonBuilder()) { + XContentBuilder source = checkpoint.toXContent(builder, new ToXContent.MapParams(TO_XCONTENT_PARAMS)); + + IndexRequest indexRequest = new IndexRequest(DataFrameInternalIndex.INDEX_NAME) + .opType(DocWriteRequest.OpType.INDEX) + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) + .id(DataFrameTransformCheckpoint.documentId(checkpoint.getTransformId(), checkpoint.getCheckpoint())) + .source(source); + + executeAsyncWithOrigin(client, DATA_FRAME_ORIGIN, IndexAction.INSTANCE, indexRequest, ActionListener.wrap(r -> { + listener.onResponse(true); + }, listener::onFailure)); + } catch (IOException e) { + // not expected to happen but for the sake of completeness + listener.onFailure(e); + } + } + + /** + * Store the transform configuration in the internal index + * + * @param transformConfig the @link{DataFrameTransformConfig} + * @param listener listener to call after request + */ public void putTransformConfiguration(DataFrameTransformConfig transformConfig, ActionListener listener) { try (XContentBuilder builder = XContentFactory.jsonBuilder()) { XContentBuilder source = transformConfig.toXContent(builder, new ToXContent.MapParams(TO_XCONTENT_PARAMS)); @@ -89,8 +122,35 @@ 
public class DataFrameTransformsConfigManager { } } - // For transforms returned via GET data_frame/transforms, see the TransportGetDataFrameTransformsAction - // This function is only for internal use. + /** + * Get a stored checkpoint, requires the transform id as well as the checkpoint id + * + * @param transformId the transform id + * @param checkpoint the checkpoint + * @param resultListener listener to call after request has been made + */ + public void getTransformCheckpoint(String transformId, long checkpoint, ActionListener resultListener) { + GetRequest getRequest = new GetRequest(DataFrameInternalIndex.INDEX_NAME, + DataFrameTransformCheckpoint.documentId(transformId, checkpoint)); + executeAsyncWithOrigin(client, DATA_FRAME_ORIGIN, GetAction.INSTANCE, getRequest, ActionListener.wrap(getResponse -> { + + if (getResponse.isExists() == false) { + // do not fail if checkpoint does not exist but return an empty checkpoint + resultListener.onResponse(DataFrameTransformCheckpoint.EMPTY); + return; + } + BytesReference source = getResponse.getSourceAsBytesRef(); + parseCheckpointsLenientlyFromSource(source, transformId, resultListener); + }, resultListener::onFailure)); + } + + /** + * Get the transform configuration for a given transform id. This function is only for internal use. For transforms returned via GET + * data_frame/transforms, see the TransportGetDataFrameTransformsAction + * + * @param transformId the transform id + * @param resultListener listener to call after inner request has returned + */ public void getTransformConfiguration(String transformId, ActionListener resultListener) { GetRequest getRequest = new GetRequest(DataFrameInternalIndex.INDEX_NAME, DataFrameTransformConfig.documentId(transformId)); executeAsyncWithOrigin(client, DATA_FRAME_ORIGIN, GetAction.INSTANCE, getRequest, ActionListener.wrap(getResponse -> { @@ -112,13 +172,25 @@ public class DataFrameTransformsConfigManager { })); } - public void deleteTransformConfiguration(String transformId, ActionListener listener) { - DeleteRequest request = new DeleteRequest(DataFrameInternalIndex.INDEX_NAME, DataFrameTransformConfig.documentId(transformId)); - request.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + /** + * This deletes the configuration and all other documents corresponding to the transform id (e.g. checkpoints). 
+ * + * @param transformId the transform id + * @param listener listener to call after inner request returned + */ + public void deleteTransform(String transformId, ActionListener listener) { + DeleteByQueryRequest request = new DeleteByQueryRequest() + .setAbortOnVersionConflict(false) //since these documents are not updated, a conflict just means it was deleted previously + .setSlices(5); - executeAsyncWithOrigin(client, DATA_FRAME_ORIGIN, DeleteAction.INSTANCE, request, ActionListener.wrap(deleteResponse -> { + request.indices(DataFrameInternalIndex.INDEX_NAME); + QueryBuilder query = QueryBuilders.termQuery(DataFrameField.ID.getPreferredName(), transformId); + request.setQuery(query); + request.setRefresh(true); - if (deleteResponse.getResult() == DocWriteResponse.Result.NOT_FOUND) { + executeAsyncWithOrigin(client, DATA_FRAME_ORIGIN, DeleteByQueryAction.INSTANCE, request, ActionListener.wrap(deleteResponse -> { + + if (deleteResponse.getDeleted() == 0) { listener.onFailure(new ResourceNotFoundException( DataFrameMessages.getMessage(DataFrameMessages.REST_DATA_FRAME_UNKNOWN_TRANSFORM, transformId))); return; @@ -145,4 +217,16 @@ public class DataFrameTransformsConfigManager { transformListener.onFailure(e); } } + + private void parseCheckpointsLenientlyFromSource(BytesReference source, String transformId, + ActionListener transformListener) { + try (InputStream stream = source.streamInput(); + XContentParser parser = XContentFactory.xContent(XContentType.JSON) + .createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE, stream)) { + transformListener.onResponse(DataFrameTransformCheckpoint.fromXContent(parser, true)); + } catch (Exception e) { + logger.error(DataFrameMessages.getMessage(DataFrameMessages.FAILED_TO_PARSE_TRANSFORM_CHECKPOINTS, transformId), e); + transformListener.onFailure(e); + } + } } diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformPersistentTasksExecutor.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformPersistentTasksExecutor.java index fb2557edce9..370091ca6f3 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformPersistentTasksExecutor.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformPersistentTasksExecutor.java @@ -21,6 +21,7 @@ import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransform; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformState; import org.elasticsearch.xpack.core.scheduler.SchedulerEngine; import org.elasticsearch.xpack.dataframe.DataFrame; +import org.elasticsearch.xpack.dataframe.checkpoint.DataFrameTransformsCheckpointService; import org.elasticsearch.xpack.dataframe.persistence.DataFrameTransformsConfigManager; import java.util.Map; @@ -31,14 +32,17 @@ public class DataFrameTransformPersistentTasksExecutor extends PersistentTasksEx private final Client client; private final DataFrameTransformsConfigManager transformsConfigManager; + private final DataFrameTransformsCheckpointService dataFrameTransformsCheckpointService; private final SchedulerEngine schedulerEngine; private final ThreadPool threadPool; public DataFrameTransformPersistentTasksExecutor(Client client, DataFrameTransformsConfigManager transformsConfigManager, - SchedulerEngine schedulerEngine, ThreadPool threadPool) { + DataFrameTransformsCheckpointService 
dataFrameTransformsCheckpointService, SchedulerEngine schedulerEngine, + ThreadPool threadPool) { super(DataFrameField.TASK_NAME, DataFrame.TASK_THREAD_POOL_NAME); this.client = client; this.transformsConfigManager = transformsConfigManager; + this.dataFrameTransformsCheckpointService = dataFrameTransformsCheckpointService; this.schedulerEngine = schedulerEngine; this.threadPool = threadPool; } @@ -67,6 +71,7 @@ public class DataFrameTransformPersistentTasksExecutor extends PersistentTasksEx protected AllocatedPersistentTask createTask(long id, String type, String action, TaskId parentTaskId, PersistentTasksCustomMetaData.PersistentTask persistentTask, Map headers) { return new DataFrameTransformTask(id, type, action, parentTaskId, persistentTask.getParams(), - (DataFrameTransformState) persistentTask.getState(), client, transformsConfigManager, schedulerEngine, threadPool, headers); + (DataFrameTransformState) persistentTask.getState(), client, transformsConfigManager, dataFrameTransformsCheckpointService, + schedulerEngine, threadPool, headers); } } diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java index e4446c65abe..a8d94f0ae70 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java @@ -24,16 +24,17 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.ClientHelper; import org.elasticsearch.xpack.core.dataframe.DataFrameField; import org.elasticsearch.xpack.core.dataframe.DataFrameMessages; -import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransform; +import org.elasticsearch.xpack.core.dataframe.action.StartDataFrameTransformTaskAction; +import org.elasticsearch.xpack.core.dataframe.action.StartDataFrameTransformTaskAction.Response; +import org.elasticsearch.xpack.core.dataframe.action.StopDataFrameTransformAction; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameIndexerTransformStats; +import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransform; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformState; import org.elasticsearch.xpack.core.indexing.IndexerState; import org.elasticsearch.xpack.core.scheduler.SchedulerEngine; import org.elasticsearch.xpack.core.scheduler.SchedulerEngine.Event; -import org.elasticsearch.xpack.core.dataframe.action.StartDataFrameTransformTaskAction; -import org.elasticsearch.xpack.core.dataframe.action.StartDataFrameTransformTaskAction.Response; -import org.elasticsearch.xpack.core.dataframe.action.StopDataFrameTransformAction; +import org.elasticsearch.xpack.dataframe.checkpoint.DataFrameTransformsCheckpointService; import org.elasticsearch.xpack.dataframe.persistence.DataFrameTransformsConfigManager; import java.util.Map; @@ -58,7 +59,8 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S public DataFrameTransformTask(long id, String type, String action, TaskId parentTask, DataFrameTransform transform, DataFrameTransformState state, Client client, DataFrameTransformsConfigManager transformsConfigManager, - SchedulerEngine schedulerEngine, ThreadPool threadPool, Map 
headers) { + DataFrameTransformsCheckpointService transformsCheckpointService, SchedulerEngine schedulerEngine, ThreadPool threadPool, + Map headers) { super(id, type, action, DataFrameField.PERSISTENT_TASK_DESCRIPTION_PREFIX + transform.getId(), parentTask, headers); this.transform = transform; this.schedulerEngine = schedulerEngine; @@ -83,8 +85,8 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S initialGeneration = state.getGeneration(); } - this.indexer = new ClientDataFrameIndexer(transform.getId(), transformsConfigManager, new AtomicReference<>(initialState), - initialPosition, client); + this.indexer = new ClientDataFrameIndexer(transform.getId(), transformsConfigManager, transformsCheckpointService, + new AtomicReference<>(initialState), initialPosition, client); this.generation = new AtomicReference(initialGeneration); } @@ -226,15 +228,18 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S private static final int LOAD_TRANSFORM_TIMEOUT_IN_SECONDS = 30; private final Client client; private final DataFrameTransformsConfigManager transformsConfigManager; + private final DataFrameTransformsCheckpointService transformsCheckpointService; private final String transformId; private DataFrameTransformConfig transformConfig = null; public ClientDataFrameIndexer(String transformId, DataFrameTransformsConfigManager transformsConfigManager, - AtomicReference initialState, Map initialPosition, Client client) { + DataFrameTransformsCheckpointService transformsCheckpointService, AtomicReference initialState, + Map initialPosition, Client client) { super(threadPool.executor(ThreadPool.Names.GENERIC), initialState, initialPosition); this.transformId = transformId; this.transformsConfigManager = transformsConfigManager; + this.transformsCheckpointService = transformsCheckpointService; this.client = client; } diff --git a/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/checkpoint/DataFrameTransformsCheckpointServiceTests.java b/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/checkpoint/DataFrameTransformsCheckpointServiceTests.java new file mode 100644 index 00000000000..cc9179f8ba7 --- /dev/null +++ b/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/checkpoint/DataFrameTransformsCheckpointServiceTests.java @@ -0,0 +1,207 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. 
+ */ + +package org.elasticsearch.xpack.dataframe.checkpoint; + +import org.elasticsearch.action.admin.indices.stats.CommonStats; +import org.elasticsearch.action.admin.indices.stats.ShardStats; +import org.elasticsearch.cluster.routing.RecoverySource; +import org.elasticsearch.cluster.routing.RecoverySource.PeerRecoverySource; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.UnassignedInfo; +import org.elasticsearch.common.UUIDs; +import org.elasticsearch.index.Index; +import org.elasticsearch.index.cache.query.QueryCacheStats; +import org.elasticsearch.index.cache.request.RequestCacheStats; +import org.elasticsearch.index.engine.SegmentsStats; +import org.elasticsearch.index.fielddata.FieldDataStats; +import org.elasticsearch.index.flush.FlushStats; +import org.elasticsearch.index.get.GetStats; +import org.elasticsearch.index.merge.MergeStats; +import org.elasticsearch.index.refresh.RefreshStats; +import org.elasticsearch.index.search.stats.SearchStats; +import org.elasticsearch.index.seqno.SeqNoStats; +import org.elasticsearch.index.shard.DocsStats; +import org.elasticsearch.index.shard.IndexingStats; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.shard.ShardPath; +import org.elasticsearch.index.store.StoreStats; +import org.elasticsearch.index.warmer.WarmerStats; +import org.elasticsearch.search.suggest.completion.CompletionStats; +import org.elasticsearch.test.ESTestCase; + +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; + +import static org.hamcrest.Matchers.containsString; + +public class DataFrameTransformsCheckpointServiceTests extends ESTestCase { + + public void testExtractIndexCheckpoints() { + Map expectedCheckpoints = new HashMap<>(); + Set indices = randomUserIndices(); + + ShardStats[] shardStatsArray = createRandomShardStats(expectedCheckpoints, indices, false, false); + + Map checkpoints = DataFrameTransformsCheckpointService.extractIndexCheckPoints(shardStatsArray, indices); + + assertEquals(expectedCheckpoints.size(), checkpoints.size()); + assertEquals(expectedCheckpoints.keySet(), checkpoints.keySet()); + + // low-level compare + for (Entry entry : expectedCheckpoints.entrySet()) { + assertTrue(Arrays.equals(entry.getValue(), checkpoints.get(entry.getKey()))); + } + } + + public void testExtractIndexCheckpointsLostPrimaries() { + Map expectedCheckpoints = new HashMap<>(); + Set indices = randomUserIndices(); + + ShardStats[] shardStatsArray = createRandomShardStats(expectedCheckpoints, indices, true, false); + + Map checkpoints = DataFrameTransformsCheckpointService.extractIndexCheckPoints(shardStatsArray, indices); + + assertEquals(expectedCheckpoints.size(), checkpoints.size()); + assertEquals(expectedCheckpoints.keySet(), checkpoints.keySet()); + + // low-level compare + for (Entry entry : expectedCheckpoints.entrySet()) { + assertTrue(Arrays.equals(entry.getValue(), checkpoints.get(entry.getKey()))); + } + } + + public void testExtractIndexCheckpointsInconsistentGlobalCheckpoints() { + Map expectedCheckpoints = new HashMap<>(); + Set indices = randomUserIndices(); + + ShardStats[] shardStatsArray = createRandomShardStats(expectedCheckpoints, indices, randomBoolean(), true); + + // fail + CheckpointException e = expectThrows(CheckpointException.class, + () -> 
DataFrameTransformsCheckpointService.extractIndexCheckPoints(shardStatsArray, indices)); + + assertThat(e.getMessage(), containsString("Global checkpoints mismatch")); } + + /** + * Create a random set of 3 index names + * @return set of indices a simulated user has access to + */ + private static Set randomUserIndices() { + Set indices = new HashSet<>(); + + // never create an empty set + if (randomBoolean()) { + indices.add("index-1"); + } else { + indices.add("index-2"); + } + if (randomBoolean()) { + indices.add("index-3"); + } + return indices; + } + + /** + * create a ShardStats for testing with random fuzzing + * + * @param expectedCheckpoints output parameter to return the checkpoints to expect + * @param userIndices set of indices that are visible + * @param skipPrimaries whether some shards do not have a primary shard at random + * @param inconsistentGlobalCheckpoints whether to introduce inconsistent global checkpoints + * @return array of ShardStats + */ + private static ShardStats[] createRandomShardStats(Map expectedCheckpoints, Set userIndices, + boolean skipPrimaries, boolean inconsistentGlobalCheckpoints) { + + // always create the full list + List indices = new ArrayList<>(); + indices.add(new Index("index-1", UUIDs.randomBase64UUID(random()))); + indices.add(new Index("index-2", UUIDs.randomBase64UUID(random()))); + indices.add(new Index("index-3", UUIDs.randomBase64UUID(random()))); + + List shardStats = new ArrayList<>(); + for (final Index index : indices) { + int numShards = randomIntBetween(1, 5); + + List checkpoints = new ArrayList<>(); + for (int shardIndex = 0; shardIndex < numShards; shardIndex++) { + // we need at least one replica for testing + int numShardCopies = randomIntBetween(2, 4); + + int inconsistentReplica = -1; + if (inconsistentGlobalCheckpoints) { + inconsistentReplica = randomIntBetween(0, numShardCopies - 1); + } + + // SeqNoStats asserts that checkpoints are logical + long localCheckpoint = randomLongBetween(0L, 100000000L); + long globalCheckpoint = randomBoolean() ? localCheckpoint : randomLongBetween(0L, 100000000L); + long maxSeqNo = Math.max(localCheckpoint, globalCheckpoint); + + SeqNoStats seqNoStats = new SeqNoStats(maxSeqNo, localCheckpoint, globalCheckpoint); + checkpoints.add(globalCheckpoint); + + for (int replica = 0; replica < numShardCopies; replica++) { + ShardId shardId = new ShardId(index, shardIndex); + boolean primary = (replica == 0); + + if (skipPrimaries) { + primary = randomBoolean(); + } + + Path path = createTempDir().resolve("indices").resolve(index.getUUID()).resolve(String.valueOf(shardIndex)); + ShardRouting shardRouting = ShardRouting.newUnassigned(shardId, primary, + primary ? 
+                            RecoverySource.EmptyStoreRecoverySource.INSTANCE : PeerRecoverySource.INSTANCE,
+                        new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, null)
+                    );
+                    shardRouting = shardRouting.initialize("node-0", null, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE);
+                    shardRouting = shardRouting.moveToStarted();
+
+                    CommonStats stats = new CommonStats();
+                    stats.fieldData = new FieldDataStats();
+                    stats.queryCache = new QueryCacheStats();
+                    stats.docs = new DocsStats();
+                    stats.store = new StoreStats();
+                    stats.indexing = new IndexingStats();
+                    stats.search = new SearchStats();
+                    stats.segments = new SegmentsStats();
+                    stats.merge = new MergeStats();
+                    stats.refresh = new RefreshStats();
+                    stats.completion = new CompletionStats();
+                    stats.requestCache = new RequestCacheStats();
+                    stats.get = new GetStats();
+                    stats.flush = new FlushStats();
+                    stats.warmer = new WarmerStats();
+
+                    if (inconsistentReplica == replica) {
+                        // overwrite
+                        seqNoStats = new SeqNoStats(maxSeqNo, localCheckpoint, globalCheckpoint + randomLongBetween(10L, 100L));
+                    }
+
+                    shardStats.add(new ShardStats(shardRouting, new ShardPath(false, path, path, shardId), stats, null, seqNoStats, null));
+                }
+            }
+
+            if (userIndices.contains(index.getName())) {
+                expectedCheckpoints.put(index.getName(), checkpoints.stream().mapToLong(l -> l).toArray());
+            }
+        }
+        // shuffle the shard stats
+        Collections.shuffle(shardStats, random());
+        ShardStats[] shardStatsArray = shardStats.toArray(new ShardStats[0]);
+        return shardStatsArray;
+    }
+
+}
diff --git a/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/persistence/DataFrameSingleNodeTestCase.java b/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/persistence/DataFrameSingleNodeTestCase.java
index d1691fd094d..687e94cdb3a 100644
--- a/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/persistence/DataFrameSingleNodeTestCase.java
+++ b/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/persistence/DataFrameSingleNodeTestCase.java
@@ -11,6 +11,7 @@ import org.elasticsearch.action.LatchedActionListener;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.common.CheckedConsumer;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.index.reindex.ReindexPlugin;
 import org.elasticsearch.plugins.Plugin;
 import org.elasticsearch.test.ESSingleNodeTestCase;
 import org.elasticsearch.xpack.core.template.TemplateUtils;
@@ -43,7 +44,7 @@ public abstract class DataFrameSingleNodeTestCase extends ESSingleNodeTestCase {
 
     @Override
     protected Collection<Class<? extends Plugin>> getPlugins() {
-        return pluginList(LocalStateDataFrame.class);
+        return pluginList(LocalStateDataFrame.class, ReindexPlugin.class);
     }
 
     protected <T> void assertAsync(Consumer<ActionListener<T>> function, T expected, CheckedConsumer<T, Exception> onAnswer,
diff --git a/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/persistence/DataFrameTransformsConfigManagerTests.java b/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/persistence/DataFrameTransformsConfigManagerTests.java
index 7ad41333acb..f9c5d405fe6 100644
--- a/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/persistence/DataFrameTransformsConfigManagerTests.java
+++ b/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/persistence/DataFrameTransformsConfigManagerTests.java
@@ -9,6 +9,8 @@ package org.elasticsearch.xpack.dataframe.persistence;
 import org.elasticsearch.ResourceAlreadyExistsException;
 import org.elasticsearch.ResourceNotFoundException;
 import org.elasticsearch.xpack.core.dataframe.DataFrameMessages;
+import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformCheckpoint;
+import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformCheckpointTests;
 import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig;
 import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfigTests;
 import org.junit.Before;
@@ -48,7 +50,7 @@ public class DataFrameTransformsConfigManagerTests extends DataFrameSingleNodeTe
 
     public void testDeleteMissingTransform() throws InterruptedException {
         // the index does not exist yet
-        assertAsync(listener -> transformsConfigManager.deleteTransformConfiguration("not_there", listener), (Boolean) null, null, e -> {
+        assertAsync(listener -> transformsConfigManager.deleteTransform("not_there", listener), (Boolean) null, null, e -> {
             assertEquals(ResourceNotFoundException.class, e.getClass());
             assertEquals(DataFrameMessages.getMessage(DataFrameMessages.REST_DATA_FRAME_UNKNOWN_TRANSFORM, "not_there"), e.getMessage());
         });
@@ -60,13 +62,13 @@ public class DataFrameTransformsConfigManagerTests extends DataFrameSingleNodeTe
                 true, null, null);
 
         // same test, but different code path
-        assertAsync(listener -> transformsConfigManager.deleteTransformConfiguration("not_there", listener), (Boolean) null, null, e -> {
+        assertAsync(listener -> transformsConfigManager.deleteTransform("not_there", listener), (Boolean) null, null, e -> {
             assertEquals(ResourceNotFoundException.class, e.getClass());
             assertEquals(DataFrameMessages.getMessage(DataFrameMessages.REST_DATA_FRAME_UNKNOWN_TRANSFORM, "not_there"), e.getMessage());
         });
     }
 
-    public void testCreateReadDelete() throws InterruptedException {
+    public void testCreateReadDeleteTransform() throws InterruptedException {
         DataFrameTransformConfig transformConfig = DataFrameTransformConfigTests.randomDataFrameTransformConfig();
 
         // create transform
@@ -84,15 +86,14 @@ public class DataFrameTransformsConfigManagerTests extends DataFrameSingleNodeTe
         });
 
         // delete transform
-        assertAsync(listener -> transformsConfigManager.deleteTransformConfiguration(transformConfig.getId(), listener), true, null, null);
+        assertAsync(listener -> transformsConfigManager.deleteTransform(transformConfig.getId(), listener), true, null, null);
 
         // delete again
-        assertAsync(listener -> transformsConfigManager.deleteTransformConfiguration(transformConfig.getId(), listener), (Boolean) null,
-                null, e -> {
-            assertEquals(ResourceNotFoundException.class, e.getClass());
-            assertEquals(DataFrameMessages.getMessage(DataFrameMessages.REST_DATA_FRAME_UNKNOWN_TRANSFORM, transformConfig.getId()),
-                    e.getMessage());
-        });
+        assertAsync(listener -> transformsConfigManager.deleteTransform(transformConfig.getId(), listener), (Boolean) null, null, e -> {
+            assertEquals(ResourceNotFoundException.class, e.getClass());
+            assertEquals(DataFrameMessages.getMessage(DataFrameMessages.REST_DATA_FRAME_UNKNOWN_TRANSFORM, transformConfig.getId()),
+                e.getMessage());
+        });
 
         // try to get deleted transform
         assertAsync(listener -> transformsConfigManager.getTransformConfiguration(transformConfig.getId(), listener),
@@ -102,4 +103,29 @@ public class DataFrameTransformsConfigManagerTests extends DataFrameSingleNodeTe
                     e.getMessage());
             });
     }
+
+    public void testCreateReadDeleteCheckPoint() throws InterruptedException {
+        DataFrameTransformCheckpoint checkpoint = DataFrameTransformCheckpointTests.randomDataFrameTransformCheckpoints();
+
+        // create
+        assertAsync(listener -> transformsConfigManager.putTransformCheckpoint(checkpoint, listener), true, null, null);
+
+        // read
+        assertAsync(listener -> transformsConfigManager.getTransformCheckpoint(checkpoint.getTransformId(), checkpoint.getCheckpoint(),
+                listener), checkpoint, null, null);
+
+        // delete
+        assertAsync(listener -> transformsConfigManager.deleteTransform(checkpoint.getTransformId(), listener), true, null, null);
+
+        // delete again
+        assertAsync(listener -> transformsConfigManager.deleteTransform(checkpoint.getTransformId(), listener), (Boolean) null, null, e -> {
+            assertEquals(ResourceNotFoundException.class, e.getClass());
+            assertEquals(DataFrameMessages.getMessage(DataFrameMessages.REST_DATA_FRAME_UNKNOWN_TRANSFORM, checkpoint.getTransformId()),
+                e.getMessage());
+        });
+
+        // getting a non-existing checkpoint returns the empty checkpoint
+        assertAsync(listener -> transformsConfigManager.getTransformCheckpoint(checkpoint.getTransformId(), checkpoint.getCheckpoint(),
+                listener), DataFrameTransformCheckpoint.EMPTY, null, null);
+    }
 }