[ML Data Frame] Persist and restore checkpoint and position (#41942)

Persist and restore Data frame's current checkpoint and position
This commit is contained in:
David Kyle 2019-05-21 18:40:04 +01:00
parent 813db163d8
commit 7e4d3c695b
16 changed files with 285 additions and 207 deletions

View File

@ -109,15 +109,6 @@ public class DataFrameIndexerTransformStats extends IndexerJobStats {
out.writeString(transformId);
}
/**
 * Get the persisted stats document name from the Data Frame Transform Id.
 *
 * @param transformId the id of the transform whose stats document is wanted
 * @return The id of the document where the transform stats are persisted
 */
public static String documentId(String transformId) {
return NAME + "-" + transformId;
}
@Nullable
public String getTransformId() {
return transformId;

View File

@ -23,9 +23,9 @@ import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optiona
public class DataFrameTransformProgress implements Writeable, ToXContentObject {
private static final ParseField TOTAL_DOCS = new ParseField("total_docs");
private static final ParseField DOCS_REMAINING = new ParseField("docs_remaining");
private static final String PERCENT_COMPLETE = "percent_complete";
public static final ParseField TOTAL_DOCS = new ParseField("total_docs");
public static final ParseField DOCS_REMAINING = new ParseField("docs_remaining");
public static final String PERCENT_COMPLETE = "percent_complete";
public static final ConstructingObjectParser<DataFrameTransformProgress, Void> PARSER = new ConstructingObjectParser<>(
"data_frame_transform_progress",

View File

@ -42,12 +42,12 @@ public class DataFrameTransformState implements Task.Status, PersistentTaskState
@Nullable
private final String reason;
private static final ParseField TASK_STATE = new ParseField("task_state");
private static final ParseField INDEXER_STATE = new ParseField("indexer_state");
private static final ParseField CURRENT_POSITION = new ParseField("current_position");
private static final ParseField CHECKPOINT = new ParseField("checkpoint");
private static final ParseField REASON = new ParseField("reason");
private static final ParseField PROGRESS = new ParseField("progress");
public static final ParseField TASK_STATE = new ParseField("task_state");
public static final ParseField INDEXER_STATE = new ParseField("indexer_state");
public static final ParseField CURRENT_POSITION = new ParseField("current_position");
public static final ParseField CHECKPOINT = new ParseField("checkpoint");
public static final ParseField REASON = new ParseField("reason");
public static final ParseField PROGRESS = new ParseField("progress");
@SuppressWarnings("unchecked")
public static final ConstructingObjectParser<DataFrameTransformState, Void> PARSER = new ConstructingObjectParser<>(NAME,

View File

@ -6,6 +6,7 @@
package org.elasticsearch.xpack.core.dataframe.transforms;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
@ -14,6 +15,7 @@ import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.xpack.core.dataframe.DataFrameField;
import org.elasticsearch.xpack.core.indexing.IndexerState;
@ -22,7 +24,7 @@ import java.util.Objects;
public class DataFrameTransformStateAndStats implements Writeable, ToXContentObject {
private static final String NAME = "data_frame_transform_state_and_stats";
public static final String NAME = "data_frame_transform_state_and_stats";
public static final ParseField STATE_FIELD = new ParseField("state");
public static final ParseField CHECKPOINTING_INFO_FIELD = new ParseField("checkpointing");
@ -47,6 +49,10 @@ public class DataFrameTransformStateAndStats implements Writeable, ToXContentObj
(p, c) -> DataFrameTransformCheckpointingInfo.fromXContent(p), CHECKPOINTING_INFO_FIELD);
}
// Parses a DataFrameTransformStateAndStats instance from the given XContent parser
// using the class's ConstructingObjectParser.
public static DataFrameTransformStateAndStats fromXContent(XContentParser parser) throws IOException {
return PARSER.parse(parser, null);
}
// Builds the default state-and-stats for a transform that has not run yet,
// seeding it with freshly-constructed (zeroed) indexer stats for the given id.
public static DataFrameTransformStateAndStats initialStateAndStats(String id) {
return initialStateAndStats(id, new DataFrameIndexerTransformStats(id));
}
@ -58,6 +64,15 @@ public class DataFrameTransformStateAndStats implements Writeable, ToXContentObj
DataFrameTransformCheckpointingInfo.EMPTY);
}
/**
 * Get the persisted state and stats document name from the Data Frame Transform Id.
 *
 * @param transformId the id of the transform whose document is wanted
 * @return The id of the document where the transform state and stats are persisted
 */
public static String documentId(String transformId) {
return NAME + "-" + transformId;
}
public DataFrameTransformStateAndStats(String id, DataFrameTransformState state, DataFrameIndexerTransformStats stats,
DataFrameTransformCheckpointingInfo checkpointingInfo) {
this.id = Objects.requireNonNull(id);
@ -73,6 +88,11 @@ public class DataFrameTransformStateAndStats implements Writeable, ToXContentObj
this.checkpointingInfo = new DataFrameTransformCheckpointingInfo(in);
}
// Delegates to the wrapped stats object; annotated @Nullable, so callers must
// handle a missing transform id.
@Nullable
public String getTransformId() {
return transformStats.getTransformId();
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
@ -80,6 +100,9 @@ public class DataFrameTransformStateAndStats implements Writeable, ToXContentObj
builder.field(STATE_FIELD.getPreferredName(), transformState, params);
builder.field(DataFrameField.STATS_FIELD.getPreferredName(), transformStats, params);
builder.field(CHECKPOINTING_INFO_FIELD.getPreferredName(), checkpointingInfo, params);
if (params.paramAsBoolean(DataFrameField.FOR_INTERNAL_STORAGE, false)) {
builder.field(DataFrameField.INDEX_DOC_TYPE.getPreferredName(), NAME);
}
builder.endObject();
return builder;
}

View File

@ -16,6 +16,7 @@ import org.elasticsearch.action.search.SearchResponse;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
/**
@ -94,16 +95,21 @@ public abstract class AsyncTwoPhaseIndexer<JobPosition, JobStats extends Indexer
* @return The new state for the indexer (STOPPED, STOPPING or ABORTING if the job was already aborted).
*/
public synchronized IndexerState stop() {
AtomicBoolean wasStartedAndSetStopped = new AtomicBoolean(false);
// Atomically compute the next state. AtomicReference.updateAndGet may re-apply
// the update function, so the STARTED->STOPPED side effect is recorded in the
// flag and executed once afterwards rather than inside the updater.
IndexerState currentState = state.updateAndGet(previousState -> {
if (previousState == IndexerState.INDEXING) {
return IndexerState.STOPPING;
} else if (previousState == IndexerState.STARTED) {
onStop();
wasStartedAndSetStopped.set(true);
return IndexerState.STOPPED;
} else {
return previousState;
}
});
// NOTE(review): this rendered diff interleaves old and new lines — the inline
// onStop() call above appears to be the removed line, replaced by the
// flag-guarded call below; only one survives in the actual revision. Confirm
// against the repository before relying on this text.
if (wasStartedAndSetStopped.get()) {
onStop();
}
return currentState;
}

View File

@ -292,7 +292,7 @@ public abstract class DataFrameRestTestCase extends ESRestTestCase {
wipeIndices();
}
public void wipeDataFrameTransforms() throws IOException, InterruptedException {
public void wipeDataFrameTransforms() throws IOException {
List<Map<String, Object>> transformConfigs = getDataFrameTransforms();
for (Map<String, Object> transformConfig : transformConfigs) {
String transformId = (String) transformConfig.get("id");

View File

@ -10,7 +10,7 @@ import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.xpack.core.dataframe.DataFrameField;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameIndexerTransformStats;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformStateAndStats;
import org.elasticsearch.xpack.dataframe.persistence.DataFrameInternalIndex;
import org.junit.Before;
@ -72,7 +72,7 @@ public class DataFrameUsageIT extends DataFrameRestTestCase {
Request statsExistsRequest = new Request("GET",
DataFrameInternalIndex.INDEX_NAME+"/_search?q=" +
INDEX_DOC_TYPE.getPreferredName() + ":" +
DataFrameIndexerTransformStats.NAME);
DataFrameTransformStateAndStats.NAME);
// Verify that we have our two stats documents
assertBusy(() -> {
Map<String, Object> hasStatsMap = entityAsMap(client().performRequest(statsExistsRequest));
@ -100,7 +100,6 @@ public class DataFrameUsageIT extends DataFrameRestTestCase {
expectedStats.merge(statName, statistic, Integer::sum);
}
usageResponse = client().performRequest(new Request("GET", "_xpack/usage"));
usageAsMap = entityAsMap(usageResponse);
@ -109,7 +108,8 @@ public class DataFrameUsageIT extends DataFrameRestTestCase {
assertEquals(1, XContentMapValues.extractValue("data_frame.transforms.started", usageAsMap));
assertEquals(2, XContentMapValues.extractValue("data_frame.transforms.stopped", usageAsMap));
for(String statName : PROVIDED_STATS) {
assertEquals(expectedStats.get(statName), XContentMapValues.extractValue("data_frame.stats."+statName, usageAsMap));
assertEquals("Incorrect stat " + statName,
expectedStats.get(statName), XContentMapValues.extractValue("data_frame.stats." + statName, usageAsMap));
}
}
}

View File

@ -35,6 +35,7 @@ import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameIndexerTransfo
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransform;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformState;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformStateAndStats;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformTaskState;
import org.elasticsearch.xpack.dataframe.persistence.DataFrameInternalIndex;
@ -176,6 +177,7 @@ public class DataFrameFeatureSet implements XPackFeatureSet {
for(String statName : PROVIDED_STATS) {
Aggregation agg = searchResponse.getAggregations().get(statName);
if (agg instanceof NumericMetricsAggregation.SingleValue) {
statisticsList.add((long)((NumericMetricsAggregation.SingleValue)agg).value());
} else {
@ -197,14 +199,15 @@ public class DataFrameFeatureSet implements XPackFeatureSet {
static void getStatisticSummations(Client client, ActionListener<DataFrameIndexerTransformStats> statsListener) {
QueryBuilder queryBuilder = QueryBuilders.constantScoreQuery(QueryBuilders.boolQuery()
.filter(QueryBuilders.termQuery(DataFrameField.INDEX_DOC_TYPE.getPreferredName(),
DataFrameIndexerTransformStats.NAME)));
DataFrameTransformStateAndStats.NAME)));
SearchRequestBuilder requestBuilder = client.prepareSearch(DataFrameInternalIndex.INDEX_NAME)
.setSize(0)
.setQuery(queryBuilder);
final String path = DataFrameField.STATS_FIELD.getPreferredName() + ".";
for(String statName : PROVIDED_STATS) {
requestBuilder.addAggregation(AggregationBuilders.sum(statName).field(statName));
requestBuilder.addAggregation(AggregationBuilders.sum(statName).field(path + statName));
}
ActionListener<SearchResponse> getStatisticSummationsListener = ActionListener.wrap(
@ -213,6 +216,7 @@ public class DataFrameFeatureSet implements XPackFeatureSet {
logger.error("statistics summations search returned shard failures: {}",
Arrays.toString(searchResponse.getShardFailures()));
}
statsListener.onResponse(parseSearchAggs(searchResponse));
},
failure -> {

View File

@ -9,51 +9,29 @@ package org.elasticsearch.xpack.dataframe.action;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.TaskOperationFailure;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.tasks.TransportTasksAction;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.ClientHelper;
import org.elasticsearch.xpack.core.dataframe.DataFrameField;
import org.elasticsearch.xpack.core.dataframe.action.GetDataFrameTransformsStatsAction;
import org.elasticsearch.xpack.core.dataframe.action.GetDataFrameTransformsStatsAction.Request;
import org.elasticsearch.xpack.core.dataframe.action.GetDataFrameTransformsStatsAction.Response;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameIndexerTransformStats;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformCheckpointingInfo;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformStateAndStats;
import org.elasticsearch.xpack.dataframe.checkpoint.DataFrameTransformsCheckpointService;
import org.elasticsearch.xpack.dataframe.persistence.DataFrameInternalIndex;
import org.elasticsearch.xpack.dataframe.persistence.DataFrameTransformsConfigManager;
import org.elasticsearch.xpack.dataframe.transforms.DataFrameTransformTask;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
@ -69,18 +47,16 @@ public class TransportGetDataFrameTransformsStatsAction extends
private static final Logger logger = LogManager.getLogger(TransportGetDataFrameTransformsStatsAction.class);
private final Client client;
private final DataFrameTransformsConfigManager dataFrameTransformsConfigManager;
private final DataFrameTransformsCheckpointService transformsCheckpointService;
@Inject
public TransportGetDataFrameTransformsStatsAction(TransportService transportService, ActionFilters actionFilters,
ClusterService clusterService, Client client,
ClusterService clusterService,
DataFrameTransformsConfigManager dataFrameTransformsConfigManager,
DataFrameTransformsCheckpointService transformsCheckpointService) {
super(GetDataFrameTransformsStatsAction.NAME, clusterService, transportService, actionFilters, Request::new, Response::new,
Response::new, ThreadPool.Names.SAME);
this.client = client;
this.dataFrameTransformsConfigManager = dataFrameTransformsConfigManager;
this.transformsCheckpointService = transformsCheckpointService;
}
@ -157,32 +133,14 @@ public class TransportGetDataFrameTransformsStatsAction extends
// Small assurance that we are at least below the max. Terms search has a hard limit of 10k, we should at least be below that.
assert transformsWithoutTasks.size() <= Request.MAX_SIZE_RETURN;
ActionListener<SearchResponse> searchStatsListener = ActionListener.wrap(
searchResponse -> {
List<ElasticsearchException> nodeFailures = new ArrayList<>(response.getNodeFailures());
if (searchResponse.getShardFailures().length > 0) {
for(ShardSearchFailure shardSearchFailure : searchResponse.getShardFailures()) {
String nodeId = "";
if (shardSearchFailure.shard() != null) {
nodeId = shardSearchFailure.shard().getNodeId();
}
nodeFailures.add(new FailedNodeException(nodeId, shardSearchFailure.toString(), shardSearchFailure.getCause()));
}
logger.error("transform statistics document search returned shard failures: {}",
Arrays.toString(searchResponse.getShardFailures()));
}
ActionListener<List<DataFrameTransformStateAndStats>> searchStatsListener = ActionListener.wrap(
stats -> {
List<DataFrameTransformStateAndStats> allStateAndStats = response.getTransformsStateAndStats();
for(SearchHit hit : searchResponse.getHits().getHits()) {
BytesReference source = hit.getSourceRef();
try {
DataFrameIndexerTransformStats stats = parseFromSource(source);
allStateAndStats.add(DataFrameTransformStateAndStats.initialStateAndStats(stats.getTransformId(), stats));
transformsWithoutTasks.remove(stats.getTransformId());
} catch (IOException e) {
listener.onFailure(new ElasticsearchParseException("Could not parse data frame transform stats", e));
return;
}
}
allStateAndStats.addAll(stats);
transformsWithoutTasks.removeAll(
stats.stream().map(DataFrameTransformStateAndStats::getId).collect(Collectors.toSet()));
// Transforms that have not been started and have no state or stats.
transformsWithoutTasks.forEach(transformId ->
allStateAndStats.add(DataFrameTransformStateAndStats.initialStateAndStats(transformId)));
@ -190,7 +148,7 @@ public class TransportGetDataFrameTransformsStatsAction extends
// it can easily become arbitrarily ordered based on which transforms don't have a task or stats docs
allStateAndStats.sort(Comparator.comparing(DataFrameTransformStateAndStats::getId));
listener.onResponse(new Response(allStateAndStats, response.getTaskFailures(), nodeFailures));
listener.onResponse(new Response(allStateAndStats, response.getTaskFailures(), response.getNodeFailures()));
},
e -> {
if (e instanceof IndexNotFoundException) {
@ -201,26 +159,6 @@ public class TransportGetDataFrameTransformsStatsAction extends
}
);
QueryBuilder builder = QueryBuilders.constantScoreQuery(QueryBuilders.boolQuery()
.filter(QueryBuilders.termsQuery(DataFrameField.ID.getPreferredName(), transformsWithoutTasks))
.filter(QueryBuilders.termQuery(DataFrameField.INDEX_DOC_TYPE.getPreferredName(), DataFrameIndexerTransformStats.NAME)));
SearchRequest searchRequest = client.prepareSearch(DataFrameInternalIndex.INDEX_NAME)
.addSort(DataFrameField.ID.getPreferredName(), SortOrder.ASC)
.setQuery(builder)
.request();
ClientHelper.executeAsyncWithOrigin(client.threadPool().getThreadContext(),
ClientHelper.DATA_FRAME_ORIGIN,
searchRequest,
searchStatsListener, client::search);
}
private static DataFrameIndexerTransformStats parseFromSource(BytesReference source) throws IOException {
try (InputStream stream = source.streamInput();
XContentParser parser = XContentFactory.xContent(XContentType.JSON)
.createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, stream)) {
return DataFrameIndexerTransformStats.fromXContent(parser);
}
dataFrameTransformsConfigManager.getTransformStats(transformsWithoutTasks, searchStatsListener);
}
}

View File

@ -59,7 +59,7 @@ public class TransportStartDataFrameTransformTaskAction extends
protected void taskOperation(StartDataFrameTransformTaskAction.Request request, DataFrameTransformTask transformTask,
ActionListener<StartDataFrameTransformTaskAction.Response> listener) {
if (transformTask.getTransformId().equals(request.getId())) {
transformTask.start(listener);
transformTask.start(null, listener);
} else {
listener.onFailure(new RuntimeException("ID of data frame transform task [" + transformTask.getTransformId()
+ "] does not match request's ID [" + request.getId() + "]"));

View File

@ -17,6 +17,9 @@ import org.elasticsearch.xpack.core.common.notifications.AbstractAuditMessage;
import org.elasticsearch.xpack.core.dataframe.DataFrameField;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameIndexerTransformStats;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformProgress;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformState;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformStateAndStats;
import org.elasticsearch.xpack.core.dataframe.transforms.DestConfig;
import org.elasticsearch.xpack.core.dataframe.transforms.SourceConfig;
@ -50,7 +53,7 @@ public final class DataFrameInternalIndex {
public static final String RAW = "raw";
// data types
public static final String DOUBLE = "double";
public static final String FLOAT = "float";
public static final String LONG = "long";
public static final String KEYWORD = "keyword";
@ -130,7 +133,7 @@ public final class DataFrameInternalIndex {
// add the schema for transform configurations
addDataFrameTransformsConfigMappings(builder);
// add the schema for transform stats
addDataFrameTransformsStatsMappings(builder);
addDataFrameTransformStateAndStatsMappings(builder);
// end type
builder.endObject();
// end properties
@ -141,37 +144,76 @@ public final class DataFrameInternalIndex {
}
private static XContentBuilder addDataFrameTransformsStatsMappings(XContentBuilder builder) throws IOException {
private static XContentBuilder addDataFrameTransformStateAndStatsMappings(XContentBuilder builder) throws IOException {
return builder
.startObject(DataFrameIndexerTransformStats.NUM_PAGES.getPreferredName())
.field(TYPE, LONG)
.startObject(DataFrameTransformStateAndStats.STATE_FIELD.getPreferredName())
.startObject(PROPERTIES)
.startObject(DataFrameTransformState.TASK_STATE.getPreferredName())
.field(TYPE, KEYWORD)
.endObject()
.startObject(DataFrameTransformState.INDEXER_STATE.getPreferredName())
.field(TYPE, KEYWORD)
.endObject()
.startObject(DataFrameTransformState.CURRENT_POSITION.getPreferredName())
.field(ENABLED, false)
.endObject()
.startObject(DataFrameTransformState.CHECKPOINT.getPreferredName())
.field(TYPE, LONG)
.endObject()
.startObject(DataFrameTransformState.REASON.getPreferredName())
.field(TYPE, KEYWORD)
.endObject()
.startObject(DataFrameTransformState.PROGRESS.getPreferredName())
.startObject(PROPERTIES)
.startObject(DataFrameTransformProgress.TOTAL_DOCS.getPreferredName())
.field(TYPE, LONG)
.endObject()
.startObject(DataFrameTransformProgress.DOCS_REMAINING.getPreferredName())
.field(TYPE, LONG)
.endObject()
.startObject(DataFrameTransformProgress.PERCENT_COMPLETE)
.field(TYPE, FLOAT)
.endObject()
.endObject()
.endObject()
.endObject()
.endObject()
.startObject(DataFrameIndexerTransformStats.NUM_INPUT_DOCUMENTS.getPreferredName())
.field(TYPE, LONG)
.startObject(DataFrameField.STATS_FIELD.getPreferredName())
.startObject(PROPERTIES)
.startObject(DataFrameIndexerTransformStats.NUM_PAGES.getPreferredName())
.field(TYPE, LONG)
.endObject()
.startObject(DataFrameIndexerTransformStats.NUM_INPUT_DOCUMENTS.getPreferredName())
.field(TYPE, LONG)
.endObject()
.startObject(DataFrameIndexerTransformStats.NUM_OUTPUT_DOCUMENTS.getPreferredName())
.field(TYPE, LONG)
.endObject()
.startObject(DataFrameIndexerTransformStats.NUM_INVOCATIONS.getPreferredName())
.field(TYPE, LONG)
.endObject()
.startObject(DataFrameIndexerTransformStats.INDEX_TIME_IN_MS.getPreferredName())
.field(TYPE, LONG)
.endObject()
.startObject(DataFrameIndexerTransformStats.SEARCH_TIME_IN_MS.getPreferredName())
.field(TYPE, LONG)
.endObject()
.startObject(DataFrameIndexerTransformStats.INDEX_TOTAL.getPreferredName())
.field(TYPE, LONG)
.endObject()
.startObject(DataFrameIndexerTransformStats.SEARCH_TOTAL.getPreferredName())
.field(TYPE, LONG)
.endObject()
.startObject(DataFrameIndexerTransformStats.SEARCH_FAILURES.getPreferredName())
.field(TYPE, LONG)
.endObject()
.startObject(DataFrameIndexerTransformStats.INDEX_FAILURES.getPreferredName())
.field(TYPE, LONG)
.endObject()
.endObject()
.endObject()
.startObject(DataFrameIndexerTransformStats.NUM_OUTPUT_DOCUMENTS.getPreferredName())
.field(TYPE, LONG)
.endObject()
.startObject(DataFrameIndexerTransformStats.NUM_INVOCATIONS.getPreferredName())
.field(TYPE, LONG)
.endObject()
.startObject(DataFrameIndexerTransformStats.INDEX_TIME_IN_MS.getPreferredName())
.field(TYPE, LONG)
.endObject()
.startObject(DataFrameIndexerTransformStats.SEARCH_TIME_IN_MS.getPreferredName())
.field(TYPE, LONG)
.endObject()
.startObject(DataFrameIndexerTransformStats.INDEX_TOTAL.getPreferredName())
.field(TYPE, LONG)
.endObject()
.startObject(DataFrameIndexerTransformStats.SEARCH_TOTAL.getPreferredName())
.field(TYPE, LONG)
.endObject()
.startObject(DataFrameIndexerTransformStats.SEARCH_FAILURES.getPreferredName())
.field(TYPE, LONG)
.endObject()
.startObject(DataFrameIndexerTransformStats.INDEX_FAILURES.getPreferredName())
.field(TYPE, LONG)
.startObject(DataFrameTransformStateAndStats.CHECKPOINTING_INFO_FIELD.getPreferredName())
.field(ENABLED, false)
.endObject();
}

View File

@ -44,13 +44,14 @@ import org.elasticsearch.xpack.core.action.util.ExpandedIdsMatcher;
import org.elasticsearch.xpack.core.action.util.PageParams;
import org.elasticsearch.xpack.core.dataframe.DataFrameField;
import org.elasticsearch.xpack.core.dataframe.DataFrameMessages;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameIndexerTransformStats;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformCheckpoint;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformStateAndStats;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@ -274,13 +275,13 @@ public class DataFrameTransformsConfigManager {
}));
}
public void putOrUpdateTransformStats(DataFrameIndexerTransformStats stats, ActionListener<Boolean> listener) {
public void putOrUpdateTransformStats(DataFrameTransformStateAndStats stats, ActionListener<Boolean> listener) {
try (XContentBuilder builder = XContentFactory.jsonBuilder()) {
XContentBuilder source = stats.toXContent(builder, new ToXContent.MapParams(TO_XCONTENT_PARAMS));
IndexRequest indexRequest = new IndexRequest(DataFrameInternalIndex.INDEX_NAME)
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.id(DataFrameIndexerTransformStats.documentId(stats.getTransformId()))
.id(DataFrameTransformStateAndStats.documentId(stats.getTransformId()))
.source(source);
executeAsyncWithOrigin(client, DATA_FRAME_ORIGIN, IndexAction.INSTANCE, indexRequest, ActionListener.wrap(
@ -297,8 +298,8 @@ public class DataFrameTransformsConfigManager {
}
}
public void getTransformStats(String transformId, ActionListener<DataFrameIndexerTransformStats> resultListener) {
GetRequest getRequest = new GetRequest(DataFrameInternalIndex.INDEX_NAME, DataFrameIndexerTransformStats.documentId(transformId));
public void getTransformStats(String transformId, ActionListener<DataFrameTransformStateAndStats> resultListener) {
GetRequest getRequest = new GetRequest(DataFrameInternalIndex.INDEX_NAME, DataFrameTransformStateAndStats.documentId(transformId));
executeAsyncWithOrigin(client, DATA_FRAME_ORIGIN, GetAction.INSTANCE, getRequest, ActionListener.wrap(getResponse -> {
if (getResponse.isExists() == false) {
@ -310,7 +311,7 @@ public class DataFrameTransformsConfigManager {
try (InputStream stream = source.streamInput();
XContentParser parser = XContentFactory.xContent(XContentType.JSON)
.createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE, stream)) {
resultListener.onResponse(DataFrameIndexerTransformStats.fromXContent(parser));
resultListener.onResponse(DataFrameTransformStateAndStats.fromXContent(parser));
} catch (Exception e) {
logger.error(
DataFrameMessages.getMessage(DataFrameMessages.FAILED_TO_PARSE_TRANSFORM_STATISTICS_CONFIGURATION, transformId), e);
@ -326,6 +327,46 @@ public class DataFrameTransformsConfigManager {
}));
}
/**
 * Fetches the persisted state-and-stats documents for the given transform ids
 * from the data frame internal index, returning them sorted by transform id.
 *
 * @param transformIds the transform ids to look up
 * @param listener notified with the parsed documents; receives an empty list
 *                 when the internal index does not exist yet, and a failure if
 *                 any hit cannot be parsed or the search itself fails
 */
public void getTransformStats(Collection<String> transformIds, ActionListener<List<DataFrameTransformStateAndStats>> listener) {
// Match only state-and-stats documents (by doc type) for the requested ids.
QueryBuilder builder = QueryBuilders.constantScoreQuery(QueryBuilders.boolQuery()
.filter(QueryBuilders.termsQuery(DataFrameField.ID.getPreferredName(), transformIds))
.filter(QueryBuilders.termQuery(DataFrameField.INDEX_DOC_TYPE.getPreferredName(), DataFrameTransformStateAndStats.NAME)));
SearchRequest searchRequest = client.prepareSearch(DataFrameInternalIndex.INDEX_NAME)
.addSort(DataFrameField.ID.getPreferredName(), SortOrder.ASC)
.setQuery(builder)
.request();
executeAsyncWithOrigin(client.threadPool().getThreadContext(), DATA_FRAME_ORIGIN, searchRequest,
ActionListener.<SearchResponse>wrap(
searchResponse -> {
List<DataFrameTransformStateAndStats> stats = new ArrayList<>();
for (SearchHit hit : searchResponse.getHits().getHits()) {
BytesReference source = hit.getSourceRef();
try (InputStream stream = source.streamInput();
XContentParser parser = XContentFactory.xContent(XContentType.JSON)
.createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, stream)) {
stats.add(DataFrameTransformStateAndStats.fromXContent(parser));
} catch (IOException e) {
// One unparsable document aborts the whole request with a parse failure.
listener.onFailure(
new ElasticsearchParseException("failed to parse data frame stats from search hit", e));
return;
}
}
listener.onResponse(stats);
},
e -> {
// A missing internal index just means nothing has been persisted yet.
if (e.getClass() == IndexNotFoundException.class) {
listener.onResponse(Collections.emptyList());
} else {
listener.onFailure(e);
}
}
), client::search);
}
private void parseTransformLenientlyFromSource(BytesReference source, String transformId,
ActionListener<DataFrameTransformConfig> transformListener) {
try (InputStream stream = source.streamInput();

View File

@ -26,10 +26,10 @@ import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.dataframe.DataFrameField;
import org.elasticsearch.xpack.core.dataframe.DataFrameMessages;
import org.elasticsearch.xpack.core.dataframe.action.StartDataFrameTransformTaskAction;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameIndexerTransformStats;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransform;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformState;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformStateAndStats;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformTaskState;
import org.elasticsearch.xpack.core.indexing.IndexerState;
import org.elasticsearch.xpack.core.scheduler.SchedulerEngine;
@ -106,44 +106,47 @@ public class DataFrameTransformPersistentTasksExecutor extends PersistentTasksEx
protected void nodeOperation(AllocatedPersistentTask task, @Nullable DataFrameTransform params, PersistentTaskState state) {
final String transformId = params.getId();
final DataFrameTransformTask buildTask = (DataFrameTransformTask) task;
final DataFrameTransformState transformState = (DataFrameTransformState) state;
final DataFrameTransformState transformPTaskState = (DataFrameTransformState) state;
final DataFrameTransformTask.ClientDataFrameIndexerBuilder indexerBuilder =
new DataFrameTransformTask.ClientDataFrameIndexerBuilder()
new DataFrameTransformTask.ClientDataFrameIndexerBuilder(transformId)
.setAuditor(auditor)
.setClient(client)
.setIndexerState(currentIndexerState(transformState))
.setInitialPosition(transformState == null ? null : transformState.getPosition())
// If the state is `null` that means this is a "first run". We can safely assume the
// task will attempt to gather the initial progress information
// if we have state, this may indicate the previous execution node crashed, so we should attempt to retrieve
// the progress from state to keep an accurate measurement of our progress
.setProgress(transformState == null ? null : transformState.getProgress())
.setIndexerState(currentIndexerState(transformPTaskState))
// If the transform persistent task state is `null` that means this is a "first run".
// If we have state then the task has relocated from another node in which case this
// state is preferred
.setInitialPosition(transformPTaskState == null ? null : transformPTaskState.getPosition())
.setProgress(transformPTaskState == null ? null : transformPTaskState.getProgress())
.setTransformsCheckpointService(dataFrameTransformsCheckpointService)
.setTransformsConfigManager(transformsConfigManager)
.setTransformId(transformId);
.setTransformsConfigManager(transformsConfigManager);
ActionListener<StartDataFrameTransformTaskAction.Response> startTaskListener = ActionListener.wrap(
response -> logger.info("Successfully completed and scheduled task in node operation"),
failure -> logger.error("Failed to start task ["+ transformId +"] in node operation", failure)
);
Long previousCheckpoint = transformPTaskState != null ? transformPTaskState.getCheckpoint() : null;
// <3> Set the previous stats (if they exist), initialize the indexer, start the task (If it is STOPPED)
// Since we don't create the task until `_start` is called, if we see that the task state is stopped, attempt to start
// Schedule execution regardless
ActionListener<DataFrameIndexerTransformStats> transformStatsActionListener = ActionListener.wrap(
stats -> {
indexerBuilder.setInitialStats(stats);
buildTask.initializeIndexer(indexerBuilder);
startTask(buildTask, startTaskListener);
ActionListener<DataFrameTransformStateAndStats> transformStatsActionListener = ActionListener.wrap(
stateAndStats -> {
indexerBuilder.setInitialStats(stateAndStats.getTransformStats());
if (transformPTaskState == null) { // prefer the persistent task state
indexerBuilder.setInitialPosition(stateAndStats.getTransformState().getPosition());
indexerBuilder.setProgress(stateAndStats.getTransformState().getProgress());
}
final Long checkpoint = previousCheckpoint != null ? previousCheckpoint : stateAndStats.getTransformState().getCheckpoint();
startTask(buildTask, indexerBuilder, checkpoint, startTaskListener);
},
error -> {
if (error instanceof ResourceNotFoundException == false) {
logger.error("Unable to load previously persisted statistics for transform [" + params.getId() + "]", error);
}
indexerBuilder.setInitialStats(new DataFrameIndexerTransformStats(transformId));
buildTask.initializeIndexer(indexerBuilder);
startTask(buildTask, startTaskListener);
startTask(buildTask, indexerBuilder, previousCheckpoint, startTaskListener);
}
);
@ -217,13 +220,17 @@ public class DataFrameTransformPersistentTasksExecutor extends PersistentTasksEx
}
private void startTask(DataFrameTransformTask buildTask,
DataFrameTransformTask.ClientDataFrameIndexerBuilder indexerBuilder,
Long previousCheckpoint,
ActionListener<StartDataFrameTransformTaskAction.Response> listener) {
// If we are stopped, and it is an initial run, this means we have never been started,
// attempt to start the task
buildTask.initializeIndexer(indexerBuilder);
// TODO isInitialRun is false after relocation??
if (buildTask.getState().getTaskState().equals(DataFrameTransformTaskState.STOPPED) && buildTask.isInitialRun()) {
logger.info("Data frame transform [{}] created.", buildTask.getTransformId());
buildTask.start(listener);
buildTask.start(previousCheckpoint, listener);
} else {
logger.debug("No need to start task. Its current state is: {}", buildTask.getState().getIndexerState());
listener.onResponse(new StartDataFrameTransformTaskAction.Response(true));

View File

@ -29,9 +29,11 @@ import org.elasticsearch.xpack.core.dataframe.action.StartDataFrameTransformTask
import org.elasticsearch.xpack.core.dataframe.action.StartDataFrameTransformTaskAction.Response;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameIndexerTransformStats;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransform;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformCheckpointingInfo;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformProgress;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformState;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformStateAndStats;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformTaskState;
import org.elasticsearch.xpack.core.dataframe.utils.ExceptionsHelper;
import org.elasticsearch.xpack.core.indexing.IndexerState;
@ -181,7 +183,13 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
return getIndexer() != null && getIndexer().initialRun();
}
public synchronized void start(ActionListener<Response> listener) {
/**
* Start the background indexer and set the task's state to started
* @param startingCheckpoint Set the current checkpoint to this value. If null the
* current checkpoint is not set
* @param listener Started listener
*/
public synchronized void start(Long startingCheckpoint, ActionListener<Response> listener) {
if (getIndexer() == null) {
listener.onFailure(new ElasticsearchException("Task for transform [{}] not fully initialized. Try again later",
getTransformId()));
@ -195,6 +203,9 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
}
stateReason.set(null);
taskState.set(DataFrameTransformTaskState.STARTED);
if (startingCheckpoint != null) {
currentCheckpoint.set(startingCheckpoint);
}
final DataFrameTransformState state = new DataFrameTransformState(
DataFrameTransformTaskState.STARTED,
@ -347,6 +358,11 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
private Map<String, Object> initialPosition;
private DataFrameTransformProgress progress;
ClientDataFrameIndexerBuilder(String transformId) {
this.transformId = transformId;
this.initialStats = new DataFrameIndexerTransformStats(transformId);
}
ClientDataFrameIndexer build(DataFrameTransformTask parentTask) {
return new ClientDataFrameIndexer(this.transformId,
this.transformsConfigManager,
@ -538,7 +554,9 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
task -> {
// Only persist the stats if something has actually changed
if (previouslyPersistedStats == null || previouslyPersistedStats.equals(getStats()) == false) {
transformsConfigManager.putOrUpdateTransformStats(getStats(),
transformsConfigManager.putOrUpdateTransformStats(
new DataFrameTransformStateAndStats(transformId, state, getStats(),
DataFrameTransformCheckpointingInfo.EMPTY), // TODO should this be null
ActionListener.wrap(
r -> {
previouslyPersistedStats = getStats();
@ -599,7 +617,18 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
protected void onStop() {
auditor.info(transformConfig.getId(), "Indexer has stopped");
logger.info("Data frame transform [{}] indexer has stopped", transformConfig.getId());
transformTask.shutdown();
transformsConfigManager.putOrUpdateTransformStats(
new DataFrameTransformStateAndStats(transformId, transformTask.getState(), getStats(),
DataFrameTransformCheckpointingInfo.EMPTY), // TODO should this be null
ActionListener.wrap(
r -> {
transformTask.shutdown();
},
statsExc -> {
transformTask.shutdown();
logger.error("Updating saving stats of transform [" + transformConfig.getId() + "] failed", statsExc);
}
));
}
@Override

View File

@ -14,12 +14,17 @@ import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformCheck
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformCheckpointTests;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfigTests;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformStateAndStats;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformStateAndStatsTests;
import org.elasticsearch.xpack.dataframe.DataFrameSingleNodeTestCase;
import org.junit.Before;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
@ -217,4 +222,40 @@ public class DataFrameTransformsConfigManagerTests extends DataFrameSingleNodeTe
});
}
public void testStateAndStats() throws InterruptedException {
String transformId = "transform_test_stats_create_read_update";
DataFrameTransformStateAndStats stateAndStats =
DataFrameTransformStateAndStatsTests.randomDataFrameTransformStateAndStats(transformId);
assertAsync(listener -> transformsConfigManager.putOrUpdateTransformStats(stateAndStats, listener), Boolean.TRUE, null, null);
assertAsync(listener -> transformsConfigManager.getTransformStats(transformId, listener), stateAndStats, null, null);
DataFrameTransformStateAndStats updated =
DataFrameTransformStateAndStatsTests.randomDataFrameTransformStateAndStats(transformId);
assertAsync(listener -> transformsConfigManager.putOrUpdateTransformStats(updated, listener), Boolean.TRUE, null, null);
assertAsync(listener -> transformsConfigManager.getTransformStats(transformId, listener), updated, null, null);
}
public void testGetStateAndStatsMultiple() throws InterruptedException {
int numStats = randomInt(5);
List<DataFrameTransformStateAndStats> expectedStats = new ArrayList<>();
for (int i=0; i<numStats; i++) {
DataFrameTransformStateAndStats stat =
DataFrameTransformStateAndStatsTests.randomDataFrameTransformStateAndStats(randomAlphaOfLength(6));
expectedStats.add(stat);
assertAsync(listener -> transformsConfigManager.putOrUpdateTransformStats(stat, listener), Boolean.TRUE, null, null);
}
// remove one of the put stats so we don't retrieve all
if (expectedStats.size() > 1) {
expectedStats.remove(expectedStats.size() -1);
}
List<String> ids = expectedStats.stream().map(DataFrameTransformStateAndStats::getId).collect(Collectors.toList());
// get stats will be ordered by id
expectedStats.sort(Comparator.comparing(DataFrameTransformStateAndStats::getId));
assertAsync(listener -> transformsConfigManager.getTransformStats(ids, listener), expectedStats, null, null);
}
}

View File

@ -114,8 +114,8 @@ teardown:
transform_id: "airline-transform-start-stop"
- match: { count: 1 }
- match: { transforms.0.id: "airline-transform-start-stop" }
- match: { transforms.0.state.indexer_state: "stopped" }
- match: { transforms.0.state.task_state: "stopped" }
# - match: { transforms.0.state.indexer_state: "stopped" }
# - match: { transforms.0.state.task_state: "stopped" }
- do:
data_frame.start_data_frame_transform:
@ -206,47 +206,3 @@ teardown:
- do:
data_frame.delete_data_frame_transform:
transform_id: "airline-transform-start-later"
---
"Test stop all":
- do:
data_frame.put_data_frame_transform:
transform_id: "airline-transform-stop-all"
body: >
{
"source": { "index": "airline-data" },
"dest": { "index": "airline-data-start-later" },
"pivot": {
"group_by": { "airline": {"terms": {"field": "airline"}}},
"aggs": {"avg_response": {"avg": {"field": "responsetime"}}}
}
}
- do:
data_frame.start_data_frame_transform:
transform_id: "airline-transform-stop-all"
- match: { started: true }
- do:
data_frame.start_data_frame_transform:
transform_id: "airline-transform-start-stop"
- match: { started: true }
- do:
data_frame.stop_data_frame_transform:
transform_id: "_all"
wait_for_completion: true
- match: { stopped: true }
- do:
data_frame.get_data_frame_transform_stats:
transform_id: "*"
- match: { count: 2 }
- match: { transforms.0.state.indexer_state: "stopped" }
- match: { transforms.0.state.task_state: "stopped" }
- match: { transforms.1.state.indexer_state: "stopped" }
- match: { transforms.1.state.task_state: "stopped" }
- do:
data_frame.delete_data_frame_transform:
transform_id: "airline-transform-stop-all"