[ML-DataFrame] version data frame transform internal index (#45375) (#45837)

Adds index versioning for the internal data frame transform index. Allows for new indices to be created and referenced; `GET` requests now query over the index pattern and take the latest doc (based on INDEX name).
This commit is contained in:
Benjamin Trent 2019-08-22 11:46:30 -05:00 committed by GitHub
parent 1dab73929f
commit e50a78cf50
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
21 changed files with 606 additions and 196 deletions

View File

@ -80,6 +80,7 @@ public abstract class AbstractTransportGetResourcesAction<Resource extends ToXCo
sourceBuilder.from(request.getPageParams().getFrom())
.size(request.getPageParams().getSize());
}
sourceBuilder.trackTotalHits(true);
IndicesOptions indicesOptions = SearchRequest.DEFAULT_INDICES_OPTIONS;
SearchRequest searchRequest = new SearchRequest(getIndices())
@ -88,7 +89,7 @@ public abstract class AbstractTransportGetResourcesAction<Resource extends ToXCo
indicesOptions.expandWildcardsOpen(),
indicesOptions.expandWildcardsClosed(),
indicesOptions))
.source(sourceBuilder.trackTotalHits(true));
.source(customSearchOptions(sourceBuilder));
executeAsyncWithOrigin(client.threadPool().getThreadContext(),
executionOrigin(),
@ -105,8 +106,12 @@ public abstract class AbstractTransportGetResourcesAction<Resource extends ToXCo
XContentParser parser = XContentFactory.xContent(XContentType.JSON).createParser(
xContentRegistry, LoggingDeprecationHandler.INSTANCE, stream)) {
Resource resource = parse(parser);
docs.add(resource);
foundResourceIds.add(extractIdFromResource(resource));
String id = extractIdFromResource(resource);
// Do not include a resource with the same ID twice
if (foundResourceIds.contains(id) == false) {
docs.add(resource);
foundResourceIds.add(id);
}
} catch (IOException e) {
this.onFailure(e);
}
@ -159,6 +164,10 @@ public abstract class AbstractTransportGetResourcesAction<Resource extends ToXCo
return boolQuery.hasClauses() ? boolQuery : QueryBuilders.matchAllQuery();
}
/**
 * Hook allowing subclasses to customize the search options (e.g. add a sort) before the
 * GET-resources search is executed. The default implementation returns the builder unchanged.
 *
 * @param searchSourceBuilder the source builder prepared by this base action
 * @return the (possibly modified) source builder used for the search request
 */
protected SearchSourceBuilder customSearchOptions(SearchSourceBuilder searchSourceBuilder) {
return searchSourceBuilder;
}
@Nullable
protected QueryBuilder additionalQuery() {
return null;

View File

@ -26,6 +26,8 @@ public final class DataFrameField {
public static final ParseField INDEX_DOC_TYPE = new ParseField("doc_type");
public static final ParseField SOURCE = new ParseField("source");
public static final ParseField DESCRIPTION = new ParseField("description");
public static final ParseField VERSION = new ParseField("version");
public static final ParseField CREATE_TIME = new ParseField("create_time");
public static final ParseField DESTINATION = new ParseField("dest");
public static final ParseField FREQUENCY = new ParseField("frequency");
public static final ParseField FORCE = new ParseField("force");
@ -65,7 +67,6 @@ public final class DataFrameField {
// strings for meta information
public static final String META_FIELDNAME = "_data_frame";
public static final String CREATION_DATE_MILLIS = "creation_date_in_millis";
public static final String VERSION = "version";
public static final String CREATED = "created";
public static final String CREATED_BY = "created_by";
public static final String TRANSFORM = "transform";

View File

@ -24,7 +24,6 @@ import java.util.Objects;
public class DataFrameTransform extends AbstractDiffable<DataFrameTransform> implements XPackPlugin.XPackPersistentTaskParams {
public static final String NAME = DataFrameField.TASK_NAME;
public static final ParseField VERSION = new ParseField(DataFrameField.VERSION);
public static final ParseField FREQUENCY = DataFrameField.FREQUENCY;
private final String transformId;
@ -36,7 +35,7 @@ public class DataFrameTransform extends AbstractDiffable<DataFrameTransform> imp
static {
PARSER.declareString(ConstructingObjectParser.constructorArg(), DataFrameField.ID);
PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), VERSION);
PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), DataFrameField.VERSION);
PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), FREQUENCY);
}
@ -90,7 +89,7 @@ public class DataFrameTransform extends AbstractDiffable<DataFrameTransform> imp
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(DataFrameField.ID.getPreferredName(), transformId);
builder.field(VERSION.getPreferredName(), version);
builder.field(DataFrameField.VERSION.getPreferredName(), version);
if (frequency != null) {
builder.field(FREQUENCY.getPreferredName(), frequency.getStringRep());
}

View File

@ -47,8 +47,6 @@ public class DataFrameTransformConfig extends AbstractDiffable<DataFrameTransfor
// types of transforms
public static final ParseField PIVOT_TRANSFORM = new ParseField("pivot");
public static final ParseField VERSION = new ParseField("version");
public static final ParseField CREATE_TIME = new ParseField("create_time");
private static final ConstructingObjectParser<DataFrameTransformConfig, String> STRICT_PARSER = createParser(false);
private static final ConstructingObjectParser<DataFrameTransformConfig, String> LENIENT_PARSER = createParser(true);
static final int MAX_DESCRIPTION_LENGTH = 1_000;
@ -98,8 +96,8 @@ public class DataFrameTransformConfig extends AbstractDiffable<DataFrameTransfor
// on strict parsing do not allow injection of headers, transform version, or create time
if (lenient == false) {
validateStrictParsingParams(args[6], HEADERS.getPreferredName());
validateStrictParsingParams(args[9], CREATE_TIME.getPreferredName());
validateStrictParsingParams(args[10], VERSION.getPreferredName());
validateStrictParsingParams(args[9], DataFrameField.CREATE_TIME.getPreferredName());
validateStrictParsingParams(args[10], DataFrameField.VERSION.getPreferredName());
}
@SuppressWarnings("unchecked")
@ -132,8 +130,9 @@ public class DataFrameTransformConfig extends AbstractDiffable<DataFrameTransfor
parser.declareObject(optionalConstructorArg(), (p, c) -> PivotConfig.fromXContent(p, lenient), PIVOT_TRANSFORM);
parser.declareString(optionalConstructorArg(), DataFrameField.DESCRIPTION);
parser.declareField(optionalConstructorArg(),
p -> TimeUtils.parseTimeFieldToInstant(p, CREATE_TIME.getPreferredName()), CREATE_TIME, ObjectParser.ValueType.VALUE);
parser.declareString(optionalConstructorArg(), VERSION);
p -> TimeUtils.parseTimeFieldToInstant(p, DataFrameField.CREATE_TIME.getPreferredName()), DataFrameField.CREATE_TIME,
ObjectParser.ValueType.VALUE);
parser.declareString(optionalConstructorArg(), DataFrameField.VERSION);
return parser;
}
@ -256,7 +255,7 @@ public class DataFrameTransformConfig extends AbstractDiffable<DataFrameTransfor
}
public DataFrameTransformConfig setCreateTime(Instant createTime) {
ExceptionsHelper.requireNonNull(createTime, CREATE_TIME.getPreferredName());
ExceptionsHelper.requireNonNull(createTime, DataFrameField.CREATE_TIME.getPreferredName());
this.createTime = Instant.ofEpochMilli(createTime.toEpochMilli());
return this;
}
@ -332,10 +331,11 @@ public class DataFrameTransformConfig extends AbstractDiffable<DataFrameTransfor
builder.field(DataFrameField.DESCRIPTION.getPreferredName(), description);
}
if (transformVersion != null) {
builder.field(VERSION.getPreferredName(), transformVersion);
builder.field(DataFrameField.VERSION.getPreferredName(), transformVersion);
}
if (createTime != null) {
builder.timeField(CREATE_TIME.getPreferredName(), CREATE_TIME.getPreferredName() + "_string", createTime.toEpochMilli());
builder.timeField(DataFrameField.CREATE_TIME.getPreferredName(), DataFrameField.CREATE_TIME.getPreferredName() + "_string",
createTime.toEpochMilli());
}
builder.endObject();
return builder;

View File

@ -42,13 +42,13 @@ public class DataFrameConfigurationIndexIT extends DataFrameRestTestCase {
builder.endObject();
final StringEntity entity = new StringEntity(Strings.toString(builder), ContentType.APPLICATION_JSON);
Request req = new Request("PUT",
DataFrameInternalIndex.INDEX_NAME + "/_doc/" + DataFrameTransformConfig.documentId(fakeTransformName));
DataFrameInternalIndex.LATEST_INDEX_NAME + "/_doc/" + DataFrameTransformConfig.documentId(fakeTransformName));
req.setEntity(entity);
client().performRequest(req);
}
// refresh the index
assertOK(client().performRequest(new Request("POST", DataFrameInternalIndex.INDEX_NAME + "/_refresh")));
assertOK(client().performRequest(new Request("POST", DataFrameInternalIndex.LATEST_INDEX_NAME + "/_refresh")));
Request deleteRequest = new Request("DELETE", DATAFRAME_ENDPOINT + fakeTransformName);
Response deleteResponse = client().performRequest(deleteRequest);

View File

@ -385,7 +385,7 @@ public abstract class DataFrameRestTestCase extends ESRestTestCase {
assertTrue(transformConfigs.isEmpty());
// the configuration index should be empty
Request request = new Request("GET", DataFrameInternalIndex.INDEX_NAME + "/_search");
Request request = new Request("GET", DataFrameInternalIndex.LATEST_INDEX_NAME + "/_search");
try {
Response searchResponse = adminClient().performRequest(request);
Map<String, Object> searchResult = entityAsMap(searchResponse);

View File

@ -0,0 +1,132 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.dataframe.integration;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.dataframe.GetDataFrameTransformRequest;
import org.elasticsearch.client.dataframe.GetDataFrameTransformResponse;
import org.elasticsearch.client.dataframe.UpdateDataFrameTransformRequest;
import org.elasticsearch.client.dataframe.UpdateDataFrameTransformResponse;
import org.elasticsearch.client.dataframe.transforms.DataFrameTransformConfigUpdate;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.search.SearchModule;
import org.elasticsearch.xpack.core.dataframe.DataFrameField;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.test.rest.ESRestTestCase;
import org.elasticsearch.xpack.dataframe.persistence.DataFrameInternalIndex;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.Collections;
import static org.elasticsearch.xpack.dataframe.persistence.DataFrameInternalIndex.addDataFrameTransformsConfigMappings;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.equalTo;
/**
 * Integration test for the versioned internal data frame transform index.
 *
 * Verifies that updating a transform whose config lives in an OLD versioned internal index
 * writes the updated config to the CURRENT index and deletes the stale doc from the old index.
 */
public class DataFrameTransformInternalIndexIT extends ESRestTestCase {
// Latest (current) internal index name, e.g. ".data-frame-internal-<current-version>".
private static final String CURRENT_INDEX = DataFrameInternalIndex.LATEST_INDEX_NAME;
// A hand-rolled "version 1" internal index simulating a config left behind by an older release.
private static final String OLD_INDEX = DataFrameInternalIndex.INDEX_PATTERN + "1";
public void testUpdateDeletesOldTransformConfig() throws Exception {
TestRestHighLevelClient client = new TestRestHighLevelClient();
// The mapping does not need to actually be the "OLD" mapping, we are testing that the old doc gets deleted, and the new one
// created.
try (XContentBuilder builder = XContentFactory.jsonBuilder()) {
builder.startObject();
builder.startObject("properties");
builder.startObject(DataFrameField.INDEX_DOC_TYPE.getPreferredName()).field("type", "keyword").endObject();
addDataFrameTransformsConfigMappings(builder);
builder.endObject();
builder.endObject();
client.indices().create(new CreateIndexRequest(OLD_INDEX).mapping(builder), RequestOptions.DEFAULT);
}
String transformIndex = "transform-index-deletes-old";
createSourceIndex(transformIndex);
String transformId = "transform-update-deletes-old-transform-config";
// Raw JSON transform config, indexed directly (bypassing the PUT transform API) so it lands in OLD_INDEX.
String config = "{\"dest\": {\"index\":\"bar\"},"
+ " \"source\": {\"index\":\"" + transformIndex + "\", \"query\": {\"match_all\":{}}},"
+ " \"id\": \""+transformId+"\","
+ " \"doc_type\": \"data_frame_transform_config\","
+ " \"pivot\": {"
+ " \"group_by\": {"
+ " \"reviewer\": {"
+ " \"terms\": {"
+ " \"field\": \"user_id\""
+ " } } },"
+ " \"aggregations\": {"
+ " \"avg_rating\": {"
+ " \"avg\": {"
+ " \"field\": \"stars\""
+ " } } } },"
+ "\"frequency\":\"1s\""
+ "}";
// Immediate refresh so the doc is visible to the subsequent GET/search.
client.index(new IndexRequest(OLD_INDEX)
.id(DataFrameTransformConfig.documentId(transformId))
.source(config, XContentType.JSON)
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE),
RequestOptions.DEFAULT);
// Sanity check: the config doc exists in the old index before the update.
GetResponse getResponse = client.get(new GetRequest(OLD_INDEX, DataFrameTransformConfig.documentId(transformId)),
RequestOptions.DEFAULT);
assertThat(getResponse.isExists(), is(true));
// The GET transform API should find the config even though it only exists in the old index.
GetDataFrameTransformResponse response = client.dataFrame()
.getDataFrameTransform(new GetDataFrameTransformRequest(transformId), RequestOptions.DEFAULT);
assertThat(response.getTransformConfigurations().get(0).getId(), equalTo(transformId));
// Updating the transform should migrate the config doc to the current index.
UpdateDataFrameTransformResponse updated = client.dataFrame().updateDataFrameTransform(
new UpdateDataFrameTransformRequest(DataFrameTransformConfigUpdate.builder().setDescription("updated").build(), transformId),
RequestOptions.DEFAULT);
assertThat(updated.getTransformConfiguration().getId(), equalTo(transformId));
assertThat(updated.getTransformConfiguration().getDescription(), equalTo("updated"));
// Old should now be gone
getResponse = client.get(new GetRequest(OLD_INDEX, DataFrameTransformConfig.documentId(transformId)), RequestOptions.DEFAULT);
assertThat(getResponse.isExists(), is(false));
// New should be here
getResponse = client.get(new GetRequest(CURRENT_INDEX, DataFrameTransformConfig.documentId(transformId)),
RequestOptions.DEFAULT);
assertThat(getResponse.isExists(), is(true));
}
@Override
protected Settings restClientSettings() {
// Authenticate as the x-pack test superuser for all REST calls made by this test.
final String token = "Basic " +
Base64.getEncoder().encodeToString(("x_pack_rest_user:x-pack-test-password").getBytes(StandardCharsets.UTF_8));
return Settings.builder()
.put(ThreadContext.PREFIX + ".Authorization", token)
.build();
}
// Creates an empty index to serve as the transform's source.
private void createSourceIndex(String index) throws IOException {
TestRestHighLevelClient client = new TestRestHighLevelClient();
client.indices().create(new CreateIndexRequest(index), RequestOptions.DEFAULT);
}
// Minimal high-level client wrapper over the test REST client with the default x-content registry.
private class TestRestHighLevelClient extends RestHighLevelClient {
TestRestHighLevelClient() {
super(client(), restClient -> {}, new SearchModule(Settings.EMPTY, false, Collections.emptyList()).getNamedXContents());
}
}
}

View File

@ -54,7 +54,7 @@ public class DataFrameUsageIT extends DataFrameRestTestCase {
stopDataFrameTransform("test_usage", false);
Request statsExistsRequest = new Request("GET",
DataFrameInternalIndex.INDEX_NAME+"/_search?q=" +
DataFrameInternalIndex.LATEST_INDEX_NAME+"/_search?q=" +
INDEX_DOC_TYPE.getPreferredName() + ":" +
DataFrameTransformStoredDoc.NAME);
// Verify that we have one stat document
@ -96,7 +96,7 @@ public class DataFrameUsageIT extends DataFrameRestTestCase {
XContentMapValues.extractValue("data_frame.stats." + statName, statsMap));
}
// Refresh the index so that statistics are searchable
refreshIndex(DataFrameInternalIndex.INDEX_TEMPLATE_NAME);
refreshIndex(DataFrameInternalIndex.LATEST_INDEX_VERSIONED_NAME);
}, 60, TimeUnit.SECONDS);

View File

@ -198,7 +198,7 @@ public class DataFrame extends Plugin implements ActionPlugin, PersistentTaskPlu
public UnaryOperator<Map<String, IndexTemplateMetaData>> getIndexTemplateMetaDataUpgrader() {
return templates -> {
try {
templates.put(DataFrameInternalIndex.INDEX_TEMPLATE_NAME, DataFrameInternalIndex.getIndexTemplateMetaData());
templates.put(DataFrameInternalIndex.LATEST_INDEX_VERSIONED_NAME, DataFrameInternalIndex.getIndexTemplateMetaData());
} catch (IOException e) {
logger.error("Error creating data frame index template", e);
}

View File

@ -154,7 +154,7 @@ public class DataFrameFeatureSet implements XPackFeatureSet {
}
);
SearchRequest totalTransformCount = client.prepareSearch(DataFrameInternalIndex.INDEX_NAME)
SearchRequest totalTransformCount = client.prepareSearch(DataFrameInternalIndex.INDEX_NAME_PATTERN)
.setTrackTotalHits(true)
.setQuery(QueryBuilders.constantScoreQuery(QueryBuilders.boolQuery()
.filter(QueryBuilders.termQuery(DataFrameField.INDEX_DOC_TYPE.getPreferredName(), DataFrameTransformConfig.NAME))))
@ -196,7 +196,7 @@ public class DataFrameFeatureSet implements XPackFeatureSet {
.filter(QueryBuilders.termQuery(DataFrameField.INDEX_DOC_TYPE.getPreferredName(),
DataFrameTransformStoredDoc.NAME)));
SearchRequestBuilder requestBuilder = client.prepareSearch(DataFrameInternalIndex.INDEX_NAME)
SearchRequestBuilder requestBuilder = client.prepareSearch(DataFrameInternalIndex.INDEX_NAME_PATTERN)
.setSize(0)
.setQuery(queryBuilder);

View File

@ -16,6 +16,8 @@ import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.ClientHelper;
@ -56,7 +58,7 @@ public class TransportGetDataFrameTransformsAction extends AbstractTransportGetR
@Override
protected String[] getIndices() {
return new String[]{DataFrameInternalIndex.INDEX_NAME};
return new String[]{DataFrameInternalIndex.INDEX_NAME_PATTERN};
}
@Override
@ -84,4 +86,10 @@ public class TransportGetDataFrameTransformsAction extends AbstractTransportGetR
protected QueryBuilder additionalQuery() {
return QueryBuilders.termQuery(INDEX_DOC_TYPE.getPreferredName(), DataFrameTransformConfig.NAME);
}
@Override
protected SearchSourceBuilder customSearchOptions(SearchSourceBuilder searchSourceBuilder) {
// Sort descending on _index so that, when the same config id exists in several versioned
// internal indices, the doc from the highest-versioned (latest) index is encountered first;
// the base action then skips later duplicates of the same id.
return searchSourceBuilder.sort("_index", SortOrder.DESC);
}
}

View File

@ -175,7 +175,7 @@ public class TransportStopDataFrameTransformAction extends TransportTasksAction<
waitResponse ->
client.admin()
.indices()
.prepareRefresh(DataFrameInternalIndex.INDEX_NAME)
.prepareRefresh(DataFrameInternalIndex.LATEST_INDEX_NAME)
.execute(ActionListener.wrap(
r -> listener.onResponse(waitResponse),
e -> {

View File

@ -6,6 +6,8 @@
package org.elasticsearch.xpack.dataframe.action;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
@ -19,6 +21,7 @@ import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.logging.LoggerMessageFormat;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.license.LicenseUtils;
import org.elasticsearch.license.XPackLicenseState;
@ -58,6 +61,7 @@ import static org.elasticsearch.xpack.dataframe.action.TransportPutDataFrameTran
public class TransportUpdateDataFrameTransformAction extends TransportMasterNodeAction<Request, Response> {
private static final Logger logger = LogManager.getLogger(TransportUpdateDataFrameTransformAction.class);
private final XPackLicenseState licenseState;
private final Client client;
private final DataFrameTransformsConfigManager dataFrameTransformsConfigManager;
@ -108,8 +112,6 @@ public class TransportUpdateDataFrameTransformAction extends TransportMasterNode
DataFrameTransformConfigUpdate update = request.getUpdate();
update.setHeaders(filteredHeaders);
String transformId = request.getId();
// GET transform and attempt to update
// We don't want the update to complete if the config changed between GET and INDEX
dataFrameTransformsConfigManager.getTransformConfigurationForUpdate(request.getId(), ActionListener.wrap(
@ -135,12 +137,12 @@ public class TransportUpdateDataFrameTransformAction extends TransportMasterNode
private void handlePrivsResponse(String username,
Request request,
DataFrameTransformConfig config,
DataFrameTransformsConfigManager.SeqNoPrimaryTermPair seqNoPrimaryTermPair,
DataFrameTransformsConfigManager.SeqNoPrimaryTermAndIndex seqNoPrimaryTermAndIndex,
ClusterState clusterState,
HasPrivilegesResponse privilegesResponse,
ActionListener<Response> listener) {
if (privilegesResponse.isCompleteMatch()) {
updateDataFrame(request, config, seqNoPrimaryTermPair, clusterState, listener);
updateDataFrame(request, config, seqNoPrimaryTermAndIndex, clusterState, listener);
} else {
List<String> indices = privilegesResponse.getIndexPrivileges()
.stream()
@ -158,7 +160,7 @@ public class TransportUpdateDataFrameTransformAction extends TransportMasterNode
private void validateAndUpdateDataFrame(Request request,
ClusterState clusterState,
DataFrameTransformConfig config,
DataFrameTransformsConfigManager.SeqNoPrimaryTermPair seqNoPrimaryTermPair,
DataFrameTransformsConfigManager.SeqNoPrimaryTermAndIndex seqNoPrimaryTermAndIndex,
ActionListener<Response> listener) {
try {
SourceDestValidator.validate(config, clusterState, indexNameExpressionResolver, request.isDeferValidation());
@ -173,17 +175,17 @@ public class TransportUpdateDataFrameTransformAction extends TransportMasterNode
final String username = securityContext.getUser().principal();
HasPrivilegesRequest privRequest = buildPrivilegeCheck(config, indexNameExpressionResolver, clusterState, username);
ActionListener<HasPrivilegesResponse> privResponseListener = ActionListener.wrap(
r -> handlePrivsResponse(username, request, config, seqNoPrimaryTermPair, clusterState, r, listener),
r -> handlePrivsResponse(username, request, config, seqNoPrimaryTermAndIndex, clusterState, r, listener),
listener::onFailure);
client.execute(HasPrivilegesAction.INSTANCE, privRequest, privResponseListener);
} else { // No security enabled, just create the transform
updateDataFrame(request, config, seqNoPrimaryTermPair, clusterState, listener);
updateDataFrame(request, config, seqNoPrimaryTermAndIndex, clusterState, listener);
}
}
private void updateDataFrame(Request request,
DataFrameTransformConfig config,
DataFrameTransformsConfigManager.SeqNoPrimaryTermPair seqNoPrimaryTermPair,
DataFrameTransformsConfigManager.SeqNoPrimaryTermAndIndex seqNoPrimaryTermAndIndex,
ClusterState clusterState,
ActionListener<Response> listener) {
@ -193,7 +195,18 @@ public class TransportUpdateDataFrameTransformAction extends TransportMasterNode
ActionListener<Boolean> putTransformConfigurationListener = ActionListener.wrap(
putTransformConfigurationResult -> {
auditor.info(config.getId(), "updated data frame transform.");
listener.onResponse(new Response(config));
dataFrameTransformsConfigManager.deleteOldTransformConfigurations(request.getId(), ActionListener.wrap(
r -> {
logger.trace("[{}] successfully deleted old transform configurations", request.getId());
listener.onResponse(new Response(config));
},
e -> {
logger.warn(
LoggerMessageFormat.format("[{}] failed deleting old transform configurations.", request.getId()),
e);
listener.onResponse(new Response(config));
}
));
},
// If we failed to INDEX AND we created the destination index, the destination index will still be around
// This is a similar behavior to _start
@ -203,7 +216,7 @@ public class TransportUpdateDataFrameTransformAction extends TransportMasterNode
// <2> Update our transform
ActionListener<Void> createDestinationListener = ActionListener.wrap(
createDestResponse -> dataFrameTransformsConfigManager.updateTransformConfiguration(config,
seqNoPrimaryTermPair,
seqNoPrimaryTermAndIndex,
putTransformConfigurationListener),
listener::onFailure
);

View File

@ -31,11 +31,23 @@ import static org.elasticsearch.xpack.core.dataframe.DataFrameField.TRANSFORM_ID
public final class DataFrameInternalIndex {
/* Changelog of internal index versions
*
* Please list changes, increase the version if you are 1st in this release cycle
*
* version 1 (7.2): initial
* version 2 (7.4): cleanup, add config::version, config::create_time, checkpoint::timestamp, checkpoint::time_upper_bound,
* progress::docs_processed, progress::docs_indexed,
* stats::exponential_avg_checkpoint_duration_ms, stats::exponential_avg_documents_indexed,
* stats::exponential_avg_documents_processed
*/
// constants for the index
public static final String INDEX_TEMPLATE_VERSION = "1";
public static final String INDEX_TEMPLATE_PATTERN = ".data-frame-internal-";
public static final String INDEX_TEMPLATE_NAME = INDEX_TEMPLATE_PATTERN + INDEX_TEMPLATE_VERSION;
public static final String INDEX_NAME = INDEX_TEMPLATE_NAME;
public static final String INDEX_VERSION = "2";
public static final String INDEX_PATTERN = ".data-frame-internal-";
public static final String LATEST_INDEX_VERSIONED_NAME = INDEX_PATTERN + INDEX_VERSION;
public static final String LATEST_INDEX_NAME = LATEST_INDEX_VERSIONED_NAME;
public static final String INDEX_NAME_PATTERN = INDEX_PATTERN + "*";
public static final String AUDIT_TEMPLATE_VERSION = "1";
public static final String AUDIT_INDEX_PREFIX = ".data-frame-notifications-";
@ -58,8 +70,8 @@ public final class DataFrameInternalIndex {
public static final String KEYWORD = "keyword";
public static IndexTemplateMetaData getIndexTemplateMetaData() throws IOException {
IndexTemplateMetaData dataFrameTemplate = IndexTemplateMetaData.builder(INDEX_TEMPLATE_NAME)
.patterns(Collections.singletonList(INDEX_TEMPLATE_NAME))
IndexTemplateMetaData dataFrameTemplate = IndexTemplateMetaData.builder(LATEST_INDEX_VERSIONED_NAME)
.patterns(Collections.singletonList(LATEST_INDEX_VERSIONED_NAME))
.version(Version.CURRENT.id)
.settings(Settings.builder()
// the configurations are expected to be small
@ -117,7 +129,7 @@ public final class DataFrameInternalIndex {
return builder;
}
private static XContentBuilder mappings() throws IOException {
public static XContentBuilder mappings() throws IOException {
XContentBuilder builder = jsonBuilder();
builder.startObject();
@ -134,6 +146,8 @@ public final class DataFrameInternalIndex {
addDataFrameTransformsConfigMappings(builder);
// add the schema for transform stats
addDataFrameTransformStoredDocMappings(builder);
// add the schema for checkpoints
addDataFrameCheckpointMappings(builder);
// end type
builder.endObject();
// end properties
@ -226,15 +240,13 @@ public final class DataFrameInternalIndex {
.field(TYPE, DOUBLE)
.endObject()
.endObject()
.endObject()
.endObject();
// This is obsolete and can be removed for future versions of the index, but is left here as a warning/reminder that
// we cannot declare this field differently in version 1 of the internal index as it would cause a mapping clash
.startObject("checkpointing")
.field(ENABLED, false)
.endObject();
// .startObject("checkpointing").field(ENABLED, false).endObject();
}
private static XContentBuilder addDataFrameTransformsConfigMappings(XContentBuilder builder) throws IOException {
public static XContentBuilder addDataFrameTransformsConfigMappings(XContentBuilder builder) throws IOException {
return builder
.startObject(DataFrameField.ID.getPreferredName())
.field(TYPE, KEYWORD)
@ -258,6 +270,22 @@ public final class DataFrameInternalIndex {
.endObject()
.startObject(DataFrameField.DESCRIPTION.getPreferredName())
.field(TYPE, TEXT)
.endObject()
.startObject(DataFrameField.VERSION.getPreferredName())
.field(TYPE, KEYWORD)
.endObject()
.startObject(DataFrameField.CREATE_TIME.getPreferredName())
.field(TYPE, DATE)
.endObject();
}
private static XContentBuilder addDataFrameCheckpointMappings(XContentBuilder builder) throws IOException {
return builder
.startObject(DataFrameField.TIMESTAMP_MILLIS.getPreferredName())
.field(TYPE, DATE)
.endObject()
.startObject(DataFrameField.TIME_UPPER_BOUND_MILLIS.getPreferredName())
.field(TYPE, DATE)
.endObject();
}

View File

@ -9,16 +9,18 @@ package org.elasticsearch.xpack.dataframe.persistence;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.ResourceAlreadyExistsException;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.get.GetAction;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.index.IndexAction;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchAction;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.Strings;
@ -37,8 +39,11 @@ import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.DeleteByQueryAction;
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
import org.elasticsearch.index.reindex.ScrollableHitSource;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.xpack.core.action.util.ExpandedIdsMatcher;
@ -54,12 +59,34 @@ import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import static org.elasticsearch.xpack.core.ClientHelper.DATA_FRAME_ORIGIN;
import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;
/**
* Place of all interactions with the internal transforms index. For configuration and mappings see @link{DataFrameInternalIndex}
*
* Versioned Index:
*
* We wrap several indexes under 1 pattern: ".data-frame-internal-1", ".data-frame-internal-2", ".data-frame-internal-n" while
* n is the _current_ version of the index.
*
- all gets/reads and dbq as well are searches on all indexes, while last-one-wins, so the result with the highest version is used
* - all puts and updates go into the _current_ version of the index, in case of updates this can leave dups behind
*
* Duplicate handling / old version cleanup
*
* As we always write to the new index, updates of older documents leave a dup in the previous versioned index behind. However,
* documents are tiny, so the impact is rather small.
*
Nevertheless, cleanup would be good; eventually we need to move old documents into new indexes after major upgrades.
*
* TODO: Provide a method that moves old docs into the current index and delete old indexes and templates
*/
public class DataFrameTransformsConfigManager {
private static final Logger logger = LogManager.getLogger(DataFrameTransformsConfigManager.class);
@ -84,7 +111,7 @@ public class DataFrameTransformsConfigManager {
try (XContentBuilder builder = XContentFactory.jsonBuilder()) {
XContentBuilder source = checkpoint.toXContent(builder, new ToXContent.MapParams(TO_XCONTENT_PARAMS));
IndexRequest indexRequest = new IndexRequest(DataFrameInternalIndex.INDEX_NAME)
IndexRequest indexRequest = new IndexRequest(DataFrameInternalIndex.LATEST_INDEX_NAME)
.opType(DocWriteRequest.OpType.INDEX)
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.id(DataFrameTransformCheckpoint.documentId(checkpoint.getTransformId(), checkpoint.getCheckpoint()))
@ -116,30 +143,91 @@ public class DataFrameTransformsConfigManager {
* but is an index operation that will fail with a version conflict
* if the current document seqNo and primaryTerm is not the same as the provided version.
* @param transformConfig the @link{DataFrameTransformConfig}
* @param seqNoPrimaryTermPair an object containing the believed seqNo and primaryTerm for the doc.
* @param seqNoPrimaryTermAndIndex an object containing the believed seqNo, primaryTerm and index for the doc.
* Used for optimistic concurrency control
* @param listener listener to call after request
*/
public void updateTransformConfiguration(DataFrameTransformConfig transformConfig,
SeqNoPrimaryTermPair seqNoPrimaryTermPair,
SeqNoPrimaryTermAndIndex seqNoPrimaryTermAndIndex,
ActionListener<Boolean> listener) {
putTransformConfiguration(transformConfig, DocWriteRequest.OpType.INDEX, seqNoPrimaryTermPair, listener);
if (seqNoPrimaryTermAndIndex.getIndex().equals(DataFrameInternalIndex.LATEST_INDEX_NAME)) {
// update the config in the same, current index using optimistic concurrency control
putTransformConfiguration(transformConfig, DocWriteRequest.OpType.INDEX, seqNoPrimaryTermAndIndex, listener);
} else {
// create the config in the current version of the index assuming there is no existing one
// this leaves a dup behind in the old index, see dup handling on the top
putTransformConfiguration(transformConfig, DocWriteRequest.OpType.CREATE, null, listener);
}
}
/**
* This deletes configuration documents that match the given transformId that are contained in old index versions.
*
* @param transformId The configuration ID potentially referencing configurations stored in the old indices
* @param listener listener to alert on completion
*/
public void deleteOldTransformConfigurations(String transformId, ActionListener<Boolean> listener) {
    // Target the config document in every versioned internal index EXCEPT the
    // latest one, so the current configuration itself is left untouched.
    QueryBuilder oldConfigsQuery = QueryBuilders.constantScoreQuery(
        QueryBuilders.boolQuery()
            .mustNot(QueryBuilders.termQuery("_index", DataFrameInternalIndex.LATEST_INDEX_NAME))
            .filter(QueryBuilders.termQuery("_id", DataFrameTransformConfig.documentId(transformId))));

    DeleteByQueryRequest request = new DeleteByQueryRequest(DataFrameInternalIndex.INDEX_NAME_PATTERN);
    request.setQuery(oldConfigsQuery);
    request.setIndicesOptions(IndicesOptions.lenientExpandOpen());

    executeAsyncWithOrigin(client, DATA_FRAME_ORIGIN, DeleteByQueryAction.INSTANCE, request, ActionListener.wrap(
        dbqResponse -> {
            // Delete-by-query reports partial failures in the response body rather
            // than throwing; surface the worst one to the caller as an exception.
            boolean hadFailures = dbqResponse.getBulkFailures().isEmpty() == false
                || dbqResponse.getSearchFailures().isEmpty() == false;
            if (hadFailures) {
                Tuple<RestStatus, Throwable> statusAndReason = getStatusAndReason(dbqResponse);
                listener.onFailure(
                    new ElasticsearchStatusException(statusAndReason.v2().getMessage(), statusAndReason.v1(), statusAndReason.v2()));
                return;
            }
            listener.onResponse(true);
        },
        listener::onFailure
    ));
}
/**
* This deletes stored state/stats documents for the given transformId that are contained in old index versions.
*
* @param transformId The transform ID referenced by the documents
* @param listener listener to alert on completion
*/
public void deleteOldTransformStoredDocuments(String transformId, ActionListener<Boolean> listener) {
    // State/stats documents live under a fixed id per transform; remove every
    // copy that sits in an index older than the latest internal index.
    DeleteByQueryRequest deleteRequest = new DeleteByQueryRequest(DataFrameInternalIndex.INDEX_NAME_PATTERN)
        .setQuery(QueryBuilders.constantScoreQuery(
            QueryBuilders.boolQuery()
                .mustNot(QueryBuilders.termQuery("_index", DataFrameInternalIndex.LATEST_INDEX_NAME))
                .filter(QueryBuilders.termQuery("_id", DataFrameTransformStoredDoc.documentId(transformId)))))
        .setIndicesOptions(IndicesOptions.lenientExpandOpen());

    executeAsyncWithOrigin(client, DATA_FRAME_ORIGIN, DeleteByQueryAction.INSTANCE, deleteRequest, ActionListener.wrap(
        dbqResponse -> {
            // Partial failures arrive inside the response; treat any bulk or
            // search failure as an overall failure of the cleanup.
            if (dbqResponse.getBulkFailures().isEmpty() && dbqResponse.getSearchFailures().isEmpty()) {
                listener.onResponse(true);
            } else {
                Tuple<RestStatus, Throwable> statusAndReason = getStatusAndReason(dbqResponse);
                listener.onFailure(
                    new ElasticsearchStatusException(statusAndReason.v2().getMessage(), statusAndReason.v1(), statusAndReason.v2()));
            }
        },
        listener::onFailure
    ));
}
private void putTransformConfiguration(DataFrameTransformConfig transformConfig,
DocWriteRequest.OpType optType,
SeqNoPrimaryTermPair seqNoPrimaryTermPair,
SeqNoPrimaryTermAndIndex seqNoPrimaryTermAndIndex,
ActionListener<Boolean> listener) {
try (XContentBuilder builder = XContentFactory.jsonBuilder()) {
XContentBuilder source = transformConfig.toXContent(builder, new ToXContent.MapParams(TO_XCONTENT_PARAMS));
IndexRequest indexRequest = new IndexRequest(DataFrameInternalIndex.INDEX_NAME)
IndexRequest indexRequest = new IndexRequest(DataFrameInternalIndex.LATEST_INDEX_NAME)
.opType(optType)
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.id(DataFrameTransformConfig.documentId(transformConfig.getId()))
.source(source);
if (seqNoPrimaryTermPair != null) {
indexRequest.setIfSeqNo(seqNoPrimaryTermPair.seqNo).setIfPrimaryTerm(seqNoPrimaryTermPair.primaryTerm);
if (seqNoPrimaryTermAndIndex != null) {
indexRequest.setIfSeqNo(seqNoPrimaryTermAndIndex.seqNo).setIfPrimaryTerm(seqNoPrimaryTermAndIndex.primaryTerm);
}
executeAsyncWithOrigin(client, DATA_FRAME_ORIGIN, IndexAction.INSTANCE, indexRequest, ActionListener.wrap(r -> {
listener.onResponse(true);
@ -170,19 +258,25 @@ public class DataFrameTransformsConfigManager {
* @param resultListener listener to call after request has been made
*/
public void getTransformCheckpoint(String transformId, long checkpoint, ActionListener<DataFrameTransformCheckpoint> resultListener) {
GetRequest getRequest = new GetRequest(DataFrameInternalIndex.INDEX_NAME,
DataFrameTransformCheckpoint.documentId(transformId, checkpoint));
executeAsyncWithOrigin(client, DATA_FRAME_ORIGIN, GetAction.INSTANCE, getRequest, ActionListener.wrap(getResponse -> {
QueryBuilder queryBuilder = QueryBuilders.termQuery("_id", DataFrameTransformCheckpoint.documentId(transformId, checkpoint));
SearchRequest searchRequest = client.prepareSearch(DataFrameInternalIndex.INDEX_NAME_PATTERN)
.setQuery(queryBuilder)
// sort descending on _index so the hit from the latest index version comes first
.addSort("_index", SortOrder.DESC)
.setSize(1)
.request();
if (getResponse.isExists() == false) {
// do not fail if checkpoint does not exist but return an empty checkpoint
logger.trace("found no checkpoint for transform [" + transformId + "], returning empty checkpoint");
resultListener.onResponse(DataFrameTransformCheckpoint.EMPTY);
return;
}
BytesReference source = getResponse.getSourceAsBytesRef();
parseCheckpointsLenientlyFromSource(source, transformId, resultListener);
}, resultListener::onFailure));
executeAsyncWithOrigin(client, DATA_FRAME_ORIGIN, SearchAction.INSTANCE, searchRequest, ActionListener.<SearchResponse>wrap(
searchResponse -> {
if (searchResponse.getHits().getHits().length == 0) {
// do not fail if checkpoint does not exist but return an empty checkpoint
logger.trace("found no checkpoint for transform [" + transformId + "], returning empty checkpoint");
resultListener.onResponse(DataFrameTransformCheckpoint.EMPTY);
return;
}
BytesReference source = searchResponse.getHits().getHits()[0].getSourceRef();
parseCheckpointsLenientlyFromSource(source, transformId, resultListener);
}, resultListener::onFailure));
}
/**
@ -193,24 +287,25 @@ public class DataFrameTransformsConfigManager {
* @param resultListener listener to call after inner request has returned
*/
public void getTransformConfiguration(String transformId, ActionListener<DataFrameTransformConfig> resultListener) {
GetRequest getRequest = new GetRequest(DataFrameInternalIndex.INDEX_NAME, DataFrameTransformConfig.documentId(transformId));
executeAsyncWithOrigin(client, DATA_FRAME_ORIGIN, GetAction.INSTANCE, getRequest, ActionListener.wrap(getResponse -> {
QueryBuilder queryBuilder = QueryBuilders.termQuery("_id", DataFrameTransformConfig.documentId(transformId));
SearchRequest searchRequest = client.prepareSearch(DataFrameInternalIndex.INDEX_NAME_PATTERN)
.setQuery(queryBuilder)
// use sort to get the last
.addSort("_index", SortOrder.DESC)
.setSize(1)
.request();
if (getResponse.isExists() == false) {
resultListener.onFailure(new ResourceNotFoundException(
DataFrameMessages.getMessage(DataFrameMessages.REST_DATA_FRAME_UNKNOWN_TRANSFORM, transformId)));
return;
}
BytesReference source = getResponse.getSourceAsBytesRef();
parseTransformLenientlyFromSource(source, transformId, resultListener);
}, e -> {
if (e.getClass() == IndexNotFoundException.class) {
resultListener.onFailure(new ResourceNotFoundException(
DataFrameMessages.getMessage(DataFrameMessages.REST_DATA_FRAME_UNKNOWN_TRANSFORM, transformId)));
} else {
resultListener.onFailure(e);
}
}));
executeAsyncWithOrigin(client, DATA_FRAME_ORIGIN, SearchAction.INSTANCE, searchRequest,
ActionListener.<SearchResponse>wrap(
searchResponse -> {
if (searchResponse.getHits().getHits().length == 0) {
resultListener.onFailure(new ResourceNotFoundException(
DataFrameMessages.getMessage(DataFrameMessages.REST_DATA_FRAME_UNKNOWN_TRANSFORM, transformId)));
return;
}
BytesReference source = searchResponse.getHits().getHits()[0].getSourceRef();
parseTransformLenientlyFromSource(source, transformId, resultListener);
}, resultListener::onFailure));
}
/**
@ -222,28 +317,30 @@ public class DataFrameTransformsConfigManager {
*/
public void getTransformConfigurationForUpdate(String transformId,
ActionListener<Tuple<DataFrameTransformConfig,
SeqNoPrimaryTermPair>> configAndVersionListener) {
GetRequest getRequest = new GetRequest(DataFrameInternalIndex.INDEX_NAME, DataFrameTransformConfig.documentId(transformId));
executeAsyncWithOrigin(client, DATA_FRAME_ORIGIN, GetAction.INSTANCE, getRequest, ActionListener.wrap(getResponse -> {
SeqNoPrimaryTermAndIndex>> configAndVersionListener) {
QueryBuilder queryBuilder = QueryBuilders.termQuery("_id", DataFrameTransformConfig.documentId(transformId));
SearchRequest searchRequest = client.prepareSearch(DataFrameInternalIndex.INDEX_NAME_PATTERN)
.setQuery(queryBuilder)
// use sort to get the last
.addSort("_index", SortOrder.DESC)
.setSize(1)
.seqNoAndPrimaryTerm(true)
.request();
if (getResponse.isExists() == false) {
configAndVersionListener.onFailure(new ResourceNotFoundException(
DataFrameMessages.getMessage(DataFrameMessages.REST_DATA_FRAME_UNKNOWN_TRANSFORM, transformId)));
return;
}
BytesReference source = getResponse.getSourceAsBytesRef();
parseTransformLenientlyFromSource(source, transformId, ActionListener.wrap(
config -> configAndVersionListener.onResponse(Tuple.tuple(config,
new SeqNoPrimaryTermPair(getResponse.getSeqNo(), getResponse.getPrimaryTerm()))),
configAndVersionListener::onFailure));
}, e -> {
if (e.getClass() == IndexNotFoundException.class) {
configAndVersionListener.onFailure(new ResourceNotFoundException(
DataFrameMessages.getMessage(DataFrameMessages.REST_DATA_FRAME_UNKNOWN_TRANSFORM, transformId)));
} else {
configAndVersionListener.onFailure(e);
}
}));
executeAsyncWithOrigin(client, DATA_FRAME_ORIGIN, SearchAction.INSTANCE, searchRequest, ActionListener.wrap(
searchResponse -> {
if (searchResponse.getHits().getHits().length == 0) {
configAndVersionListener.onFailure(new ResourceNotFoundException(
DataFrameMessages.getMessage(DataFrameMessages.REST_DATA_FRAME_UNKNOWN_TRANSFORM, transformId)));
return;
}
SearchHit hit = searchResponse.getHits().getHits()[0];
BytesReference source = hit.getSourceRef();
parseTransformLenientlyFromSource(source, transformId, ActionListener.wrap(
config -> configAndVersionListener.onResponse(Tuple.tuple(config,
new SeqNoPrimaryTermAndIndex(hit.getSeqNo(), hit.getPrimaryTerm(), hit.getIndex()))),
configAndVersionListener::onFailure));
}, configAndVersionListener::onFailure));
}
/**
@ -263,7 +360,7 @@ public class DataFrameTransformsConfigManager {
String[] idTokens = ExpandedIdsMatcher.tokenizeExpression(transformIdsExpression);
QueryBuilder queryBuilder = buildQueryFromTokenizedIds(idTokens, DataFrameTransformConfig.NAME);
SearchRequest request = client.prepareSearch(DataFrameInternalIndex.INDEX_NAME)
SearchRequest request = client.prepareSearch(DataFrameInternalIndex.INDEX_NAME_PATTERN)
.addSort(DataFrameField.ID.getPreferredName(), SortOrder.ASC)
.setFrom(pageParams.getFrom())
.setTrackTotalHits(true)
@ -275,35 +372,33 @@ public class DataFrameTransformsConfigManager {
final ExpandedIdsMatcher requiredMatches = new ExpandedIdsMatcher(idTokens, allowNoMatch);
executeAsyncWithOrigin(client.threadPool().getThreadContext(), DATA_FRAME_ORIGIN, request,
ActionListener.<SearchResponse>wrap(
searchResponse -> {
long totalHits = searchResponse.getHits().getTotalHits().value;
List<String> ids = new ArrayList<>(searchResponse.getHits().getHits().length);
for (SearchHit hit : searchResponse.getHits().getHits()) {
BytesReference source = hit.getSourceRef();
try (InputStream stream = source.streamInput();
XContentParser parser = XContentFactory.xContent(XContentType.JSON).createParser(NamedXContentRegistry.EMPTY,
LoggingDeprecationHandler.INSTANCE, stream)) {
ids.add((String) parser.map().get(DataFrameField.ID.getPreferredName()));
} catch (IOException e) {
foundIdsListener.onFailure(new ElasticsearchParseException("failed to parse search hit for ids", e));
return;
}
}
requiredMatches.filterMatchedIds(ids);
if (requiredMatches.hasUnmatchedIds()) {
// some required Ids were not found
foundIdsListener.onFailure(
new ResourceNotFoundException(
DataFrameMessages.getMessage(DataFrameMessages.REST_DATA_FRAME_UNKNOWN_TRANSFORM,
requiredMatches.unmatchedIdsString())));
executeAsyncWithOrigin(client.threadPool().getThreadContext(), DATA_FRAME_ORIGIN, request, ActionListener.<SearchResponse>wrap(
searchResponse -> {
long totalHits = searchResponse.getHits().getTotalHits().value;
// important: preserve order
Set<String> ids = new LinkedHashSet<>(searchResponse.getHits().getHits().length);
for (SearchHit hit : searchResponse.getHits().getHits()) {
BytesReference source = hit.getSourceRef();
try (InputStream stream = source.streamInput();
XContentParser parser = XContentFactory.xContent(XContentType.JSON).createParser(NamedXContentRegistry.EMPTY,
LoggingDeprecationHandler.INSTANCE, stream)) {
ids.add((String) parser.map().get(DataFrameField.ID.getPreferredName()));
} catch (IOException e) {
foundIdsListener.onFailure(new ElasticsearchParseException("failed to parse search hit for ids", e));
return;
}
foundIdsListener.onResponse(new Tuple<>(totalHits, ids));
},
foundIdsListener::onFailure
), client::search);
}
requiredMatches.filterMatchedIds(ids);
if (requiredMatches.hasUnmatchedIds()) {
// some required Ids were not found
foundIdsListener.onFailure(
new ResourceNotFoundException(
DataFrameMessages.getMessage(DataFrameMessages.REST_DATA_FRAME_UNKNOWN_TRANSFORM,
requiredMatches.unmatchedIdsString())));
return;
}
foundIdsListener.onResponse(new Tuple<>(totalHits, new ArrayList<>(ids)));
}, foundIdsListener::onFailure), client::search);
}
/**
@ -314,15 +409,14 @@ public class DataFrameTransformsConfigManager {
*/
public void deleteTransform(String transformId, ActionListener<Boolean> listener) {
DeleteByQueryRequest request = new DeleteByQueryRequest()
.setAbortOnVersionConflict(false); //since these documents are not updated, a conflict just means it was deleted previously
.setAbortOnVersionConflict(false); //since these documents are not updated, a conflict just means it was deleted previously
request.indices(DataFrameInternalIndex.INDEX_NAME);
request.indices(DataFrameInternalIndex.INDEX_NAME_PATTERN);
QueryBuilder query = QueryBuilders.termQuery(DataFrameField.ID.getPreferredName(), transformId);
request.setQuery(query);
request.setRefresh(true);
executeAsyncWithOrigin(client, DATA_FRAME_ORIGIN, DeleteByQueryAction.INSTANCE, request, ActionListener.wrap(deleteResponse -> {
if (deleteResponse.getDeleted() == 0) {
listener.onFailure(new ResourceNotFoundException(
DataFrameMessages.getMessage(DataFrameMessages.REST_DATA_FRAME_UNKNOWN_TRANSFORM, transformId)));
@ -343,9 +437,10 @@ public class DataFrameTransformsConfigManager {
try (XContentBuilder builder = XContentFactory.jsonBuilder()) {
XContentBuilder source = stats.toXContent(builder, new ToXContent.MapParams(TO_XCONTENT_PARAMS));
IndexRequest indexRequest = new IndexRequest(DataFrameInternalIndex.INDEX_NAME)
IndexRequest indexRequest = new IndexRequest(DataFrameInternalIndex.LATEST_INDEX_NAME)
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.id(DataFrameTransformStoredDoc.documentId(stats.getId()))
.opType(DocWriteRequest.OpType.INDEX)
.source(source);
executeAsyncWithOrigin(client, DATA_FRAME_ORIGIN, IndexAction.INSTANCE, indexRequest, ActionListener.wrap(
@ -363,51 +458,56 @@ public class DataFrameTransformsConfigManager {
}
public void getTransformStoredDoc(String transformId, ActionListener<DataFrameTransformStoredDoc> resultListener) {
GetRequest getRequest = new GetRequest(DataFrameInternalIndex.INDEX_NAME, DataFrameTransformStoredDoc.documentId(transformId));
executeAsyncWithOrigin(client, DATA_FRAME_ORIGIN, GetAction.INSTANCE, getRequest, ActionListener.wrap(getResponse -> {
QueryBuilder queryBuilder = QueryBuilders.termQuery("_id", DataFrameTransformStoredDoc.documentId(transformId));
SearchRequest searchRequest = client.prepareSearch(DataFrameInternalIndex.INDEX_NAME_PATTERN)
.setQuery(queryBuilder)
// use sort to get the last
.addSort("_index", SortOrder.DESC)
.setSize(1)
.request();
if (getResponse.isExists() == false) {
resultListener.onFailure(new ResourceNotFoundException(
DataFrameMessages.getMessage(DataFrameMessages.DATA_FRAME_UNKNOWN_TRANSFORM_STATS, transformId)));
return;
}
BytesReference source = getResponse.getSourceAsBytesRef();
try (InputStream stream = source.streamInput();
XContentParser parser = XContentFactory.xContent(XContentType.JSON)
.createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE, stream)) {
resultListener.onResponse(DataFrameTransformStoredDoc.fromXContent(parser));
} catch (Exception e) {
logger.error(
DataFrameMessages.getMessage(DataFrameMessages.FAILED_TO_PARSE_TRANSFORM_STATISTICS_CONFIGURATION, transformId), e);
resultListener.onFailure(e);
}
}, e -> {
if (e instanceof ResourceNotFoundException) {
resultListener.onFailure(new ResourceNotFoundException(
DataFrameMessages.getMessage(DataFrameMessages.DATA_FRAME_UNKNOWN_TRANSFORM_STATS, transformId)));
} else {
resultListener.onFailure(e);
}
}));
executeAsyncWithOrigin(client, DATA_FRAME_ORIGIN, SearchAction.INSTANCE, searchRequest, ActionListener.<SearchResponse>wrap(
searchResponse -> {
if (searchResponse.getHits().getHits().length == 0) {
resultListener.onFailure(new ResourceNotFoundException(
DataFrameMessages.getMessage(DataFrameMessages.DATA_FRAME_UNKNOWN_TRANSFORM_STATS, transformId)));
return;
}
BytesReference source = searchResponse.getHits().getHits()[0].getSourceRef();
try (InputStream stream = source.streamInput();
XContentParser parser = XContentFactory.xContent(XContentType.JSON)
.createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE, stream)) {
resultListener.onResponse(DataFrameTransformStoredDoc.fromXContent(parser));
} catch (Exception e) {
logger.error(DataFrameMessages.getMessage(DataFrameMessages.FAILED_TO_PARSE_TRANSFORM_STATISTICS_CONFIGURATION,
transformId), e);
resultListener.onFailure(e);
}
}, resultListener::onFailure));
}
public void getTransformStoredDoc(Collection<String> transformIds, ActionListener<List<DataFrameTransformStoredDoc>> listener) {
QueryBuilder builder = QueryBuilders.constantScoreQuery(QueryBuilders.boolQuery()
.filter(QueryBuilders.termsQuery(DataFrameField.ID.getPreferredName(), transformIds))
.filter(QueryBuilders.termQuery(DataFrameField.INDEX_DOC_TYPE.getPreferredName(), DataFrameTransformStoredDoc.NAME)));
.filter(QueryBuilders.termsQuery(DataFrameField.ID.getPreferredName(), transformIds))
.filter(QueryBuilders.termQuery(DataFrameField.INDEX_DOC_TYPE.getPreferredName(), DataFrameTransformStoredDoc.NAME)));
SearchRequest searchRequest = client.prepareSearch(DataFrameInternalIndex.INDEX_NAME)
.addSort(DataFrameField.ID.getPreferredName(), SortOrder.ASC)
.setQuery(builder)
.setSize(Math.min(transformIds.size(), 10_000))
.request();
SearchRequest searchRequest = client.prepareSearch(DataFrameInternalIndex.INDEX_NAME_PATTERN)
.addSort(DataFrameField.ID.getPreferredName(), SortOrder.ASC)
.addSort("_index", SortOrder.DESC)
.setQuery(builder)
// the limit for getting stats and transforms is 1000; as long as there are fewer than 10 internal index versions this size is sufficient
.setSize(Math.min(transformIds.size(), 10_000))
.request();
executeAsyncWithOrigin(client.threadPool().getThreadContext(), DATA_FRAME_ORIGIN, searchRequest,
ActionListener.<SearchResponse>wrap(
searchResponse -> {
List<DataFrameTransformStoredDoc> stats = new ArrayList<>();
for (SearchHit hit : searchResponse.getHits().getHits()) {
ActionListener.<SearchResponse>wrap(
searchResponse -> {
List<DataFrameTransformStoredDoc> stats = new ArrayList<>();
String previousId = null;
for (SearchHit hit : searchResponse.getHits().getHits()) {
// skip old versions
if (hit.getId().equals(previousId) == false) {
previousId = hit.getId();
BytesReference source = hit.getSourceRef();
try (InputStream stream = source.streamInput();
XContentParser parser = XContentFactory.xContent(XContentType.JSON)
@ -419,17 +519,11 @@ public class DataFrameTransformsConfigManager {
return;
}
}
listener.onResponse(stats);
},
e -> {
if (e.getClass() == IndexNotFoundException.class) {
listener.onResponse(Collections.emptyList());
} else {
listener.onFailure(e);
}
}
), client::search);
listener.onResponse(stats);
}, listener::onFailure
), client::search);
}
private void parseTransformLenientlyFromSource(BytesReference source, String transformId,
@ -480,13 +574,37 @@ public class DataFrameTransformsConfigManager {
return QueryBuilders.constantScoreQuery(queryBuilder);
}
public static class SeqNoPrimaryTermPair {
private static Tuple<RestStatus, Throwable> getStatusAndReason(final BulkByScrollResponse response) {
    // Surface the single "worst" failure, i.e. the one carrying the highest
    // HTTP status code. Picking the max status is somewhat arbitrary (would the
    // user care about 5xx over 4xx?), but it gives the caller one actionable
    // cause instead of an opaque list.
    RestStatus worstStatus = RestStatus.OK;
    Throwable worstReason = new Exception("Unknown error");
    for (BulkItemResponse.Failure bulkFailure : response.getBulkFailures()) {
        if (bulkFailure.getStatus().getStatus() > worstStatus.getStatus()) {
            worstStatus = bulkFailure.getStatus();
            worstReason = bulkFailure.getCause();
        }
    }
    for (ScrollableHitSource.SearchFailure searchFailure : response.getSearchFailures()) {
        RestStatus searchStatus = org.elasticsearch.ExceptionsHelper.status(searchFailure.getReason());
        if (searchStatus.getStatus() > worstStatus.getStatus()) {
            worstStatus = searchStatus;
            worstReason = searchFailure.getReason();
        }
    }
    return new Tuple<>(worstStatus, worstReason);
}
public static class SeqNoPrimaryTermAndIndex {
private final long seqNo;
private final long primaryTerm;
private final String index;
public SeqNoPrimaryTermPair(long seqNo, long primaryTerm) {
public SeqNoPrimaryTermAndIndex(long seqNo, long primaryTerm, String index) {
this.seqNo = seqNo;
this.primaryTerm = primaryTerm;
this.index = index;
}
public long getSeqNo() {
@ -496,5 +614,9 @@ public class DataFrameTransformsConfigManager {
public long getPrimaryTerm() {
return primaryTerm;
}
public String getIndex() {
return index;
}
}
}

View File

@ -100,7 +100,7 @@ public final class DataframeIndex {
.field(DataFrameField.CREATED_BY, DataFrameField.DATA_FRAME_SIGNATURE)
.startObject(DataFrameField.META_FIELDNAME)
.field(DataFrameField.CREATION_DATE_MILLIS, clock.millis())
.startObject(DataFrameField.VERSION)
.startObject(DataFrameField.VERSION.getPreferredName())
.field(DataFrameField.CREATED, Version.CURRENT)
.endObject()
.field(DataFrameField.TRANSFORM, id)

View File

@ -104,7 +104,7 @@ public class DataFrameTransformPersistentTasksExecutor extends PersistentTasksEx
IndexNameExpressionResolver resolver = new IndexNameExpressionResolver();
String[] indices = resolver.concreteIndexNames(clusterState,
IndicesOptions.lenientExpandOpen(),
DataFrameInternalIndex.INDEX_TEMPLATE_PATTERN + "*");
DataFrameInternalIndex.INDEX_NAME_PATTERN);
List<String> unavailableIndices = new ArrayList<>(indices.length);
for (String index : indices) {
IndexRoutingTable routingTable = clusterState.getRoutingTable().index(index);

View File

@ -22,6 +22,7 @@ import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.logging.LoggerMessageFormat;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.IndexNotFoundException;
@ -58,6 +59,7 @@ import org.elasticsearch.xpack.dataframe.transforms.pivot.AggregationResultUtils
import java.time.Instant;
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
@ -632,6 +634,7 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
private volatile boolean auditBulkFailures = true;
// Keeps track of the last exception that was written to our audit, keeps us from spamming the audit index
private volatile String lastAuditedExceptionMessage = null;
private final AtomicBoolean oldStatsCleanedUp = new AtomicBoolean(false);
ClientDataFrameIndexer(String transformId,
DataFrameTransformsConfigManager transformsConfigManager,
@ -896,7 +899,23 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
if (state.getTaskState().equals(DataFrameTransformTaskState.STOPPED)) {
transformTask.shutdown();
}
next.run();
// Only do this clean up once, if it succeeded, no reason to do the query again.
if (oldStatsCleanedUp.compareAndSet(false, true)) {
transformsConfigManager.deleteOldTransformStoredDocuments(transformId, ActionListener.wrap(
nil -> {
logger.trace("[{}] deleted old transform stats and state document", transformId);
next.run();
},
e -> {
String msg = LoggerMessageFormat.format("[{}] failed deleting old transform configurations.",
transformId);
logger.warn(msg, e);
// If we have failed, we should attempt the clean up again later
oldStatsCleanedUp.set(false);
next.run();
}
));
}
},
statsExc -> {
logger.error("Updating stats of transform [" + transformConfig.getId() + "] failed", statsExc);

View File

@ -29,8 +29,8 @@ public abstract class DataFrameSingleNodeTestCase extends ESSingleNodeTestCase {
public void waitForTemplates() throws Exception {
assertBusy(() -> {
ClusterState state = client().admin().cluster().prepareState().get().getState();
assertTrue("Timed out waiting for the data frame templates to be installed",
TemplateUtils.checkTemplateExistsAndVersionIsGTECurrentVersion(DataFrameInternalIndex.INDEX_TEMPLATE_NAME, state));
assertTrue("Timed out waiting for the data frame templates to be installed", TemplateUtils
.checkTemplateExistsAndVersionIsGTECurrentVersion(DataFrameInternalIndex.LATEST_INDEX_VERSIONED_NAME, state));
});
}

View File

@ -8,7 +8,16 @@ package org.elasticsearch.xpack.dataframe.persistence;
import org.elasticsearch.ResourceAlreadyExistsException;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.xpack.core.action.util.PageParams;
import org.elasticsearch.xpack.core.dataframe.DataFrameMessages;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformCheckpoint;
@ -27,6 +36,9 @@ import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import static org.elasticsearch.xpack.dataframe.persistence.DataFrameInternalIndex.mappings;
import static org.elasticsearch.xpack.dataframe.persistence.DataFrameTransformsConfigManager.TO_XCONTENT_PARAMS;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
@ -278,4 +290,71 @@ public class DataFrameTransformsConfigManagerTests extends DataFrameSingleNodeTe
expectedDocs.sort(Comparator.comparing(DataFrameTransformStoredDoc::getId));
assertAsync(listener -> transformsConfigManager.getTransformStoredDoc(ids, listener), expectedDocs, null, null);
}
public void testDeleteOldTransformConfigurations() throws Exception {
    String oldIndex = DataFrameInternalIndex.INDEX_PATTERN + "1";
    String transformId = "transform_test_delete_old_configurations";
    String docId = DataFrameTransformConfig.documentId(transformId);
    // Reuse transformId instead of repeating the string literal so the config
    // id and the id used for deletion/lookup cannot silently drift apart.
    DataFrameTransformConfig transformConfig = DataFrameTransformConfigTests
        .randomDataFrameTransformConfig(transformId);
    client().admin().indices().create(new CreateIndexRequest(oldIndex)
        .mapping(MapperService.SINGLE_MAPPING_NAME, mappings())).actionGet();

    // Plant a copy of the config in the old (previous-version) internal index.
    try (XContentBuilder builder = XContentFactory.jsonBuilder()) {
        XContentBuilder source = transformConfig.toXContent(builder, new ToXContent.MapParams(TO_XCONTENT_PARAMS));
        IndexRequest request = new IndexRequest(oldIndex)
            .source(source)
            .id(docId)
            .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
        client().index(request).actionGet();
    }

    // Writing through the manager stores the config in the latest index, so
    // both the old and the new copy exist at this point.
    assertAsync(listener -> transformsConfigManager.putTransformConfiguration(transformConfig, listener), true, null, null);
    assertThat(client().get(new GetRequest(oldIndex).id(docId)).actionGet().isExists(), is(true));
    assertThat(client().get(new GetRequest(DataFrameInternalIndex.LATEST_INDEX_NAME).id(docId)).actionGet().isExists(), is(true));

    // Cleanup must remove only the copy living in the old index.
    assertAsync(listener -> transformsConfigManager.deleteOldTransformConfigurations(transformId, listener), true, null, null);
    client().admin().indices().refresh(new RefreshRequest(DataFrameInternalIndex.INDEX_NAME_PATTERN)).actionGet();
    assertThat(client().get(new GetRequest(oldIndex).id(docId)).actionGet().isExists(), is(false));
    assertThat(client().get(new GetRequest(DataFrameInternalIndex.LATEST_INDEX_NAME).id(docId)).actionGet().isExists(), is(true));
}
public void testDeleteOldTransformStoredDocuments() throws Exception {
    String oldIndex = DataFrameInternalIndex.INDEX_PATTERN + "1";
    String transformId = "transform_test_delete_old_stored_documents";
    String docId = DataFrameTransformStoredDoc.documentId(transformId);
    DataFrameTransformStoredDoc storedDoc = DataFrameTransformStoredDocTests
        .randomDataFrameTransformStoredDoc(transformId);
    client().admin().indices().create(new CreateIndexRequest(oldIndex)
        .mapping(MapperService.SINGLE_MAPPING_NAME, mappings())).actionGet();

    // Plant a copy of the stored doc in the old (previous-version) internal index.
    try (XContentBuilder builder = XContentFactory.jsonBuilder()) {
        XContentBuilder source = storedDoc.toXContent(builder, new ToXContent.MapParams(TO_XCONTENT_PARAMS));
        IndexRequest indexRequest = new IndexRequest(oldIndex)
            .id(docId)
            .source(source);
        client().index(indexRequest).actionGet();
    }

    // Writing through the manager stores the doc in the latest index; refresh
    // so both copies are visible to the subsequent GETs.
    assertAsync(listener -> transformsConfigManager.putOrUpdateTransformStoredDoc(storedDoc, listener),
        true,
        null,
        null);
    client().admin().indices().refresh(new RefreshRequest(DataFrameInternalIndex.INDEX_NAME_PATTERN)).actionGet();
    assertThat(client().get(new GetRequest(oldIndex).id(docId)).actionGet().isExists(), is(true));
    assertThat(client().get(new GetRequest(DataFrameInternalIndex.LATEST_INDEX_NAME).id(docId)).actionGet().isExists(), is(true));

    // Cleanup must remove only the copy living in the old index.
    assertAsync(listener -> transformsConfigManager.deleteOldTransformStoredDocuments(transformId, listener),
        true,
        null,
        null);
    client().admin().indices().refresh(new RefreshRequest(DataFrameInternalIndex.INDEX_NAME_PATTERN)).actionGet();
    assertThat(client().get(new GetRequest(oldIndex).id(docId)).actionGet().isExists(), is(false));
    assertThat(client().get(new GetRequest(DataFrameInternalIndex.LATEST_INDEX_NAME).id(docId)).actionGet().isExists(), is(true));
}
}

View File

@ -135,7 +135,7 @@ public class DataFrameTransformPersistentTasksExecutorTests extends ESTestCase {
metaData = new MetaData.Builder(cs.metaData());
routingTable = new RoutingTable.Builder(cs.routingTable());
String indexToRemove = DataFrameInternalIndex.INDEX_NAME;
String indexToRemove = DataFrameInternalIndex.LATEST_INDEX_NAME;
if (randomBoolean()) {
routingTable.remove(indexToRemove);
} else {
@ -158,7 +158,7 @@ public class DataFrameTransformPersistentTasksExecutorTests extends ESTestCase {
private void addIndices(MetaData.Builder metaData, RoutingTable.Builder routingTable) {
List<String> indices = new ArrayList<>();
indices.add(DataFrameInternalIndex.AUDIT_INDEX);
indices.add(DataFrameInternalIndex.INDEX_NAME);
indices.add(DataFrameInternalIndex.LATEST_INDEX_NAME);
for (String indexName : indices) {
IndexMetaData.Builder indexMetaData = IndexMetaData.builder(indexName);
indexMetaData.settings(Settings.builder()