From cab879118d429544c1fc53b976313792317b5944 Mon Sep 17 00:00:00 2001 From: Dimitris Athanasiou Date: Fri, 28 Jun 2019 13:28:03 +0300 Subject: [PATCH] [7.x][ML] Support multiple source indices for df-analytics (#43702) (#43731) This commit adds support for multiple source indices. In order to deal with multiple indices having different mappings, it attempts a best-effort approach to merge the mappings assuming there are no conflicts. In case conflicts exist, an error will be returned. To allow users to create custom mappings for special use cases, the destination index is now allowed to exist before the analytics job runs. In addition, settings are no longer copied except for the `index.number_of_shards` and `index.number_of_replicas`. --- .../dataframe/DataFrameAnalyticsSource.java | 23 ++- .../MlClientDocumentationIT.java | 16 +- .../DataFrameAnalyticsSourceTests.java | 2 +- .../dataframe/DataFrameAnalyticsSource.java | 31 ++-- .../xpack/core/ml/job/messages/Messages.java | 3 +- .../DataFrameAnalyticsSourceTests.java | 2 +- .../ml/qa/ml-with-security/build.gradle | 3 +- .../integration/RunDataFrameAnalyticsIT.java | 115 ++++++++++++- .../xpack/ml/MachineLearning.java | 2 +- ...ransportStartDataFrameAnalyticsAction.java | 33 ++-- .../ml/dataframe/DataFrameAnalyticsIndex.java | 158 +++++++++++++----- .../dataframe/DataFrameAnalyticsManager.java | 37 +++- .../xpack/ml/dataframe/MappingsMerger.java | 100 +++++++++++ .../ml/dataframe/SourceDestValidator.java | 11 +- .../DataFrameDataExtractorFactory.java | 8 +- .../extractor/ExtractedFieldsDetector.java | 29 ++-- .../DataFrameAnalyticsIndexTests.java | 142 ++++++++++------ .../ml/dataframe/MappingsMergerTests.java | 153 +++++++++++++++++ .../dataframe/SourceDestValidatorTests.java | 20 ++- .../ExtractedFieldsDetectorTests.java | 6 +- .../test/ml/data_frame_analytics_crud.yml | 34 +++- 21 files changed, 741 insertions(+), 187 deletions(-) create mode 100644 
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/MappingsMerger.java create mode 100644 x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/MappingsMergerTests.java diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/dataframe/DataFrameAnalyticsSource.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/dataframe/DataFrameAnalyticsSource.java index c36799cd3b4..9a6de159bea 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/dataframe/DataFrameAnalyticsSource.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/dataframe/DataFrameAnalyticsSource.java @@ -28,6 +28,8 @@ import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; import java.io.IOException; +import java.util.Arrays; +import java.util.List; import java.util.Objects; public class DataFrameAnalyticsSource implements ToXContentObject { @@ -46,19 +48,19 @@ public class DataFrameAnalyticsSource implements ToXContentObject { private static ObjectParser PARSER = new ObjectParser<>("data_frame_analytics_source", true, Builder::new); static { - PARSER.declareString(Builder::setIndex, INDEX); + PARSER.declareStringArray(Builder::setIndex, INDEX); PARSER.declareObject(Builder::setQueryConfig, (p, c) -> QueryConfig.fromXContent(p), QUERY); } - private final String index; + private final String[] index; private final QueryConfig queryConfig; - private DataFrameAnalyticsSource(String index, @Nullable QueryConfig queryConfig) { + private DataFrameAnalyticsSource(String[] index, @Nullable QueryConfig queryConfig) { this.index = Objects.requireNonNull(index); this.queryConfig = queryConfig; } - public String getIndex() { + public String[] getIndex() { return index; } @@ -83,13 +85,13 @@ public class DataFrameAnalyticsSource implements ToXContentObject { if (o == null || getClass() != o.getClass()) return false; DataFrameAnalyticsSource 
other = (DataFrameAnalyticsSource) o; - return Objects.equals(index, other.index) + return Arrays.equals(index, other.index) && Objects.equals(queryConfig, other.queryConfig); } @Override public int hashCode() { - return Objects.hash(index, queryConfig); + return Objects.hash(Arrays.asList(index), queryConfig); } @Override @@ -99,16 +101,21 @@ public class DataFrameAnalyticsSource implements ToXContentObject { public static class Builder { - private String index; + private String[] index; private QueryConfig queryConfig; private Builder() {} - public Builder setIndex(String index) { + public Builder setIndex(String... index) { this.index = index; return this; } + public Builder setIndex(List index) { + this.index = index.toArray(new String[0]); + return this; + } + public Builder setQueryConfig(QueryConfig queryConfig) { this.queryConfig = queryConfig; return this; diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/MlClientDocumentationIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/MlClientDocumentationIT.java index 526e31a5da1..5c9017b7706 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/MlClientDocumentationIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/MlClientDocumentationIT.java @@ -2802,7 +2802,7 @@ public class MlClientDocumentationIT extends ESRestHighLevelClientTestCase { } public void testGetDataFrameAnalytics() throws Exception { - createIndex(DF_ANALYTICS_CONFIG.getSource().getIndex()); + createIndex(DF_ANALYTICS_CONFIG.getSource().getIndex()[0]); RestHighLevelClient client = highLevelClient(); client.machineLearning().putDataFrameAnalytics(new PutDataFrameAnalyticsRequest(DF_ANALYTICS_CONFIG), RequestOptions.DEFAULT); @@ -2851,7 +2851,7 @@ public class MlClientDocumentationIT extends ESRestHighLevelClientTestCase { } public void testGetDataFrameAnalyticsStats() throws Exception { - 
createIndex(DF_ANALYTICS_CONFIG.getSource().getIndex()); + createIndex(DF_ANALYTICS_CONFIG.getSource().getIndex()[0]); RestHighLevelClient client = highLevelClient(); client.machineLearning().putDataFrameAnalytics(new PutDataFrameAnalyticsRequest(DF_ANALYTICS_CONFIG), RequestOptions.DEFAULT); @@ -2901,7 +2901,7 @@ public class MlClientDocumentationIT extends ESRestHighLevelClientTestCase { } public void testPutDataFrameAnalytics() throws Exception { - createIndex(DF_ANALYTICS_CONFIG.getSource().getIndex()); + createIndex(DF_ANALYTICS_CONFIG.getSource().getIndex()[0]); RestHighLevelClient client = highLevelClient(); { @@ -2994,7 +2994,7 @@ public class MlClientDocumentationIT extends ESRestHighLevelClientTestCase { } public void testDeleteDataFrameAnalytics() throws Exception { - createIndex(DF_ANALYTICS_CONFIG.getSource().getIndex()); + createIndex(DF_ANALYTICS_CONFIG.getSource().getIndex()[0]); RestHighLevelClient client = highLevelClient(); client.machineLearning().putDataFrameAnalytics(new PutDataFrameAnalyticsRequest(DF_ANALYTICS_CONFIG), RequestOptions.DEFAULT); @@ -3044,9 +3044,9 @@ public class MlClientDocumentationIT extends ESRestHighLevelClientTestCase { } public void testStartDataFrameAnalytics() throws Exception { - createIndex(DF_ANALYTICS_CONFIG.getSource().getIndex()); + createIndex(DF_ANALYTICS_CONFIG.getSource().getIndex()[0]); highLevelClient().index( - new IndexRequest(DF_ANALYTICS_CONFIG.getSource().getIndex()).source(XContentType.JSON, "total", 10000) + new IndexRequest(DF_ANALYTICS_CONFIG.getSource().getIndex()[0]).source(XContentType.JSON, "total", 10000) .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE), RequestOptions.DEFAULT); RestHighLevelClient client = highLevelClient(); client.machineLearning().putDataFrameAnalytics(new PutDataFrameAnalyticsRequest(DF_ANALYTICS_CONFIG), RequestOptions.DEFAULT); @@ -3101,9 +3101,9 @@ public class MlClientDocumentationIT extends ESRestHighLevelClientTestCase { } public void 
testStopDataFrameAnalytics() throws Exception { - createIndex(DF_ANALYTICS_CONFIG.getSource().getIndex()); + createIndex(DF_ANALYTICS_CONFIG.getSource().getIndex()[0]); highLevelClient().index( - new IndexRequest(DF_ANALYTICS_CONFIG.getSource().getIndex()).source(XContentType.JSON, "total", 10000) + new IndexRequest(DF_ANALYTICS_CONFIG.getSource().getIndex()[0]).source(XContentType.JSON, "total", 10000) .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE), RequestOptions.DEFAULT); RestHighLevelClient client = highLevelClient(); client.machineLearning().putDataFrameAnalytics(new PutDataFrameAnalyticsRequest(DF_ANALYTICS_CONFIG), RequestOptions.DEFAULT); diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/dataframe/DataFrameAnalyticsSourceTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/dataframe/DataFrameAnalyticsSourceTests.java index eb254fd23de..c556b2e053c 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/dataframe/DataFrameAnalyticsSourceTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/dataframe/DataFrameAnalyticsSourceTests.java @@ -36,7 +36,7 @@ public class DataFrameAnalyticsSourceTests extends AbstractXContentTestCase createParser(boolean ignoreUnknownFields) { ConstructingObjectParser parser = new ConstructingObjectParser<>("data_frame_analytics_source", - ignoreUnknownFields, a -> new DataFrameAnalyticsSource((String) a[0], (QueryProvider) a[1])); - parser.declareString(ConstructingObjectParser.constructorArg(), INDEX); + ignoreUnknownFields, a -> new DataFrameAnalyticsSource(((List) a[0]).toArray(new String[0]), (QueryProvider) a[1])); + parser.declareStringArray(ConstructingObjectParser.constructorArg(), INDEX); parser.declareObject(ConstructingObjectParser.optionalConstructorArg(), (p, c) -> QueryProvider.fromXContent(p, ignoreUnknownFields, Messages.DATA_FRAME_ANALYTICS_BAD_QUERY_FORMAT), QUERY); return parser; } - private final 
String index; + private final String[] index; private final QueryProvider queryProvider; - public DataFrameAnalyticsSource(String index, @Nullable QueryProvider queryProvider) { + public DataFrameAnalyticsSource(String[] index, @Nullable QueryProvider queryProvider) { this.index = ExceptionsHelper.requireNonNull(index, INDEX); - if (index.isEmpty()) { - throw ExceptionsHelper.badRequestException("[{}] must be non-empty", INDEX); + if (index.length == 0) { + throw new IllegalArgumentException("source.index must specify at least one index"); + } + if (Arrays.stream(index).anyMatch(Strings::isNullOrEmpty)) { + throw new IllegalArgumentException("source.index must contain non-null and non-empty strings"); } this.queryProvider = queryProvider == null ? QueryProvider.defaultQuery() : queryProvider; } public DataFrameAnalyticsSource(StreamInput in) throws IOException { - index = in.readString(); + index = in.readStringArray(); queryProvider = QueryProvider.fromStream(in); } public DataFrameAnalyticsSource(DataFrameAnalyticsSource other) { - this.index = other.index; + this.index = Arrays.copyOf(other.index, other.index.length); this.queryProvider = new QueryProvider(other.queryProvider); } @Override public void writeTo(StreamOutput out) throws IOException { - out.writeString(index); + out.writeStringArray(index); queryProvider.writeTo(out); } @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); - builder.field(INDEX.getPreferredName(), index); + builder.array(INDEX.getPreferredName(), index); builder.field(QUERY.getPreferredName(), queryProvider.getQuery()); builder.endObject(); return builder; @@ -83,16 +88,16 @@ public class DataFrameAnalyticsSource implements Writeable, ToXContentObject { if (o == null || getClass() != o.getClass()) return false; DataFrameAnalyticsSource other = (DataFrameAnalyticsSource) o; - return Objects.equals(index, other.index) + return Arrays.equals(index, other.index) 
&& Objects.equals(queryProvider, other.queryProvider); } @Override public int hashCode() { - return Objects.hash(index, queryProvider); + return Objects.hash(Arrays.asList(index), queryProvider); } - public String getIndex() { + public String[] getIndex() { return index; } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/messages/Messages.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/messages/Messages.java index 417184f8a75..dfb95d2adac 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/messages/Messages.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/messages/Messages.java @@ -51,8 +51,7 @@ public final class Messages { public static final String DATAFEED_ID_ALREADY_TAKEN = "A datafeed with id [{0}] already exists"; public static final String DATA_FRAME_ANALYTICS_BAD_QUERY_FORMAT = "Data Frame Analytics config query is not parsable"; - public static final String DATA_FRAME_ANALYTICS_BAD_FIELD_FILTER = - "No compatible fields could be detected in index [{0}] with name [{1}]"; + public static final String DATA_FRAME_ANALYTICS_BAD_FIELD_FILTER = "No field [{0}] could be detected"; public static final String FILTER_CANNOT_DELETE = "Cannot delete filter [{0}] currently used by jobs {1}"; public static final String FILTER_CONTAINS_TOO_MANY_ITEMS = "Filter [{0}] contains too many items; up to [{1}] items are allowed"; diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/dataframe/DataFrameAnalyticsSourceTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/dataframe/DataFrameAnalyticsSourceTests.java index 7783354d425..1db3477111d 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/dataframe/DataFrameAnalyticsSourceTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/dataframe/DataFrameAnalyticsSourceTests.java @@ -44,7 +44,7 @@ public class 
DataFrameAnalyticsSourceTests extends AbstractSerializingTestCase validateListener = ActionListener.wrap( + ActionListener configListener = ActionListener.wrap( config -> memoryTracker.addDataFrameAnalyticsJobMemoryAndRefreshAllOthers( request.getId(), config.getModelMemoryLimit().getBytes(), memoryRequirementRefreshListener), listener::onFailure ); - // Validate config - ActionListener configListener = ActionListener.wrap( - config -> { - new SourceDestValidator(clusterService.state(), indexNameExpressionResolver).check(config); - DataFrameDataExtractorFactory.validateConfigAndSourceIndex(client, config, validateListener); - }, - listener::onFailure + // Get config + getConfigAndValidate(request.getId(), configListener); + } + + private void getConfigAndValidate(String id, ActionListener finalListener) { + // Validate mappings can be merged + ActionListener firstValidationListener = ActionListener.wrap( + config -> MappingsMerger.mergeMappings(client, config.getHeaders(), config.getSource().getIndex(), ActionListener.wrap( + mappings -> finalListener.onResponse(config), finalListener::onFailure)), + finalListener::onFailure ); - // Get config - configProvider.get(request.getId(), configListener); + // Validate source and dest; check data extraction is possible + ActionListener getConfigListener = ActionListener.wrap( + config -> { + new SourceDestValidator(clusterService.state(), indexNameExpressionResolver).check(config); + DataFrameDataExtractorFactory.validateConfigAndSourceIndex(client, config, firstValidationListener); + }, + finalListener::onFailure + ); + + // First, get the config + configProvider.get(id, getConfigListener); } private void waitForAnalyticsStarted(PersistentTasksCustomMetaData.PersistentTask task, diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsIndex.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsIndex.java index 11812cd4f5b..e07eb99a3f5 100644 
--- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsIndex.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsIndex.java @@ -10,23 +10,35 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.create.CreateIndexAction; import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; +import org.elasticsearch.action.admin.indices.get.GetIndexResponse; +import org.elasticsearch.action.admin.indices.mapping.put.PutMappingAction; +import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest; +import org.elasticsearch.action.admin.indices.settings.get.GetSettingsAction; +import org.elasticsearch.action.admin.indices.settings.get.GetSettingsRequest; +import org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse; +import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.client.Client; -import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.MappingMetaData; +import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.IndexSortConfig; import org.elasticsearch.search.sort.SortOrder; import org.elasticsearch.xpack.core.ClientHelper; import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig; import java.time.Clock; -import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; -import java.util.List; +import java.util.Iterator; import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; +import 
static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN; + /** * {@link DataFrameAnalyticsIndex} class encapsulates logic for creating destination index based on source index metadata. */ @@ -36,57 +48,103 @@ final class DataFrameAnalyticsIndex { private static final String META = "_meta"; /** - * Unfortunately, getting the settings of an index include internal settings that should - * not be set explicitly. There is no way to filter those out. Thus, we have to maintain - * a list of them and filter them out manually. + * We only preserve the most important settings. + * If the user needs other settings on the destination index they + * should create the destination index before starting the analytics. */ - private static final List INTERNAL_SETTINGS = Arrays.asList( - "index.creation_date", - "index.provided_name", - "index.uuid", - "index.version.created", - "index.version.upgraded" - ); + private static final String[] PRESERVED_SETTINGS = new String[] {"index.number_of_shards", "index.number_of_replicas"}; + + private DataFrameAnalyticsIndex() {} /** * Creates destination index based on source index metadata. 
*/ public static void createDestinationIndex(Client client, Clock clock, - ClusterState clusterState, DataFrameAnalyticsConfig analyticsConfig, ActionListener listener) { - String sourceIndex = analyticsConfig.getSource().getIndex(); - Map headers = analyticsConfig.getHeaders(); - IndexMetaData sourceIndexMetaData = clusterState.getMetaData().getIndices().get(sourceIndex); - if (sourceIndexMetaData == null) { - listener.onFailure(new IndexNotFoundException(sourceIndex)); - return; - } - CreateIndexRequest createIndexRequest = - prepareCreateIndexRequest(sourceIndexMetaData, analyticsConfig.getDest().getIndex(), analyticsConfig.getId(), clock); - ClientHelper.executeWithHeadersAsync( - headers, ClientHelper.ML_ORIGIN, client, CreateIndexAction.INSTANCE, createIndexRequest, listener); + ActionListener createIndexRequestListener = ActionListener.wrap( + createIndexRequest -> ClientHelper.executeWithHeadersAsync(analyticsConfig.getHeaders(), ClientHelper.ML_ORIGIN, client, + CreateIndexAction.INSTANCE, createIndexRequest, listener), + listener::onFailure + ); + + prepareCreateIndexRequest(client, clock, analyticsConfig, createIndexRequestListener); } - private static CreateIndexRequest prepareCreateIndexRequest(IndexMetaData sourceIndexMetaData, - String destinationIndex, - String analyticsId, - Clock clock) { - // Settings - Settings.Builder settingsBuilder = Settings.builder().put(sourceIndexMetaData.getSettings()); - INTERNAL_SETTINGS.forEach(settingsBuilder::remove); + private static void prepareCreateIndexRequest(Client client, Clock clock, DataFrameAnalyticsConfig config, + ActionListener listener) { + AtomicReference settingsHolder = new AtomicReference<>(); + + String[] sourceIndex = config.getSource().getIndex(); + + ActionListener> mappingsListener = ActionListener.wrap( + mappings -> listener.onResponse(createIndexRequest(clock, config, settingsHolder.get(), mappings)), + listener::onFailure + ); + + ActionListener settingsListener = ActionListener.wrap( + 
settings -> { + settingsHolder.set(settings); + MappingsMerger.mergeMappings(client, config.getHeaders(), sourceIndex, mappingsListener); + }, + listener::onFailure + ); + + ActionListener getSettingsResponseListener = ActionListener.wrap( + settingsResponse -> settingsListener.onResponse(settings(settingsResponse)), + listener::onFailure + ); + + GetSettingsRequest getSettingsRequest = new GetSettingsRequest(); + getSettingsRequest.indices(sourceIndex); + getSettingsRequest.indicesOptions(IndicesOptions.lenientExpandOpen()); + getSettingsRequest.names(PRESERVED_SETTINGS); + ClientHelper.executeWithHeadersAsync(config.getHeaders(), ML_ORIGIN, client, GetSettingsAction.INSTANCE, + getSettingsRequest, getSettingsResponseListener); + } + + private static CreateIndexRequest createIndexRequest(Clock clock, DataFrameAnalyticsConfig config, Settings settings, + ImmutableOpenMap mappings) { + // There should only be 1 type + assert mappings.size() == 1; + + String destinationIndex = config.getDest().getIndex(); + String type = mappings.keysIt().next(); + Map mappingsAsMap = mappings.valuesIt().next().sourceAsMap(); + addProperties(mappingsAsMap); + addMetaData(mappingsAsMap, config.getId(), clock); + return new CreateIndexRequest(destinationIndex, settings).mapping(type, mappingsAsMap); + } + + private static Settings settings(GetSettingsResponse settingsResponse) { + Integer maxNumberOfShards = findMaxSettingValue(settingsResponse, IndexMetaData.SETTING_NUMBER_OF_SHARDS); + Integer maxNumberOfReplicas = findMaxSettingValue(settingsResponse, IndexMetaData.SETTING_NUMBER_OF_REPLICAS); + + Settings.Builder settingsBuilder = Settings.builder(); settingsBuilder.put(IndexSortConfig.INDEX_SORT_FIELD_SETTING.getKey(), DataFrameAnalyticsFields.ID); settingsBuilder.put(IndexSortConfig.INDEX_SORT_ORDER_SETTING.getKey(), SortOrder.ASC); - Settings settings = settingsBuilder.build(); + if (maxNumberOfShards != null) { + settingsBuilder.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 
maxNumberOfShards); + } + if (maxNumberOfReplicas != null) { + settingsBuilder.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, maxNumberOfReplicas); + } + return settingsBuilder.build(); + } - // Mappings - String singleMappingType = sourceIndexMetaData.getMappings().keysIt().next(); - Map mappingsAsMap = sourceIndexMetaData.getMappings().valuesIt().next().sourceAsMap(); - addProperties(mappingsAsMap); - addMetaData(mappingsAsMap, analyticsId, clock); - - return new CreateIndexRequest(destinationIndex, settings).mapping(singleMappingType, mappingsAsMap); + @Nullable + private static Integer findMaxSettingValue(GetSettingsResponse settingsResponse, String settingKey) { + Integer maxValue = null; + Iterator settingsIterator = settingsResponse.getIndexToSettings().valuesIt(); + while (settingsIterator.hasNext()) { + Settings settings = settingsIterator.next(); + Integer indexValue = settings.getAsInt(settingKey, null); + if (indexValue != null) { + maxValue = maxValue == null ? indexValue : Math.max(indexValue, maxValue); + } + } + return maxValue; } private static void addProperties(Map mappingsAsMap) { @@ -115,6 +173,22 @@ final class DataFrameAnalyticsIndex { return value; } - private DataFrameAnalyticsIndex() {} + public static void updateMappingsToDestIndex(Client client, DataFrameAnalyticsConfig analyticsConfig, GetIndexResponse getIndexResponse, + ActionListener listener) { + // We have validated the destination index should match a single index + assert getIndexResponse.indices().length == 1; + + ImmutableOpenMap mappings = getIndexResponse.getMappings().get(getIndexResponse.indices()[0]); + String type = mappings.keysIt().next(); + + Map addedMappings = Collections.singletonMap(PROPERTIES, + Collections.singletonMap(DataFrameAnalyticsFields.ID, Collections.singletonMap("type", "keyword"))); + + PutMappingRequest putMappingRequest = new PutMappingRequest(getIndexResponse.indices()); + putMappingRequest.type(type); + putMappingRequest.source(addedMappings); + 
ClientHelper.executeWithHeadersAsync(analyticsConfig.getHeaders(), ML_ORIGIN, client, PutMappingAction.INSTANCE, + putMappingRequest, listener); + } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsManager.java index 764ca08d735..c7cfe2b6253 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsManager.java @@ -5,16 +5,20 @@ */ package org.elasticsearch.xpack.ml.dataframe; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; import org.elasticsearch.action.admin.indices.delete.DeleteIndexAction; import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; +import org.elasticsearch.action.admin.indices.get.GetIndexAction; +import org.elasticsearch.action.admin.indices.get.GetIndexRequest; +import org.elasticsearch.action.admin.indices.get.GetIndexResponse; import org.elasticsearch.action.admin.indices.refresh.RefreshAction; import org.elasticsearch.action.admin.indices.refresh.RefreshRequest; import org.elasticsearch.action.admin.indices.refresh.RefreshResponse; import org.elasticsearch.action.support.ContextPreservingActionListener; import org.elasticsearch.client.node.NodeClient; -import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.reindex.BulkByScrollResponse; @@ -40,17 +44,17 @@ import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN; public class DataFrameAnalyticsManager { - private final ClusterService clusterService; + private static 
final Logger LOGGER = LogManager.getLogger(DataFrameAnalyticsManager.class); + /** - * We need a {@link NodeClient} to be get the reindexing task and be able to report progress + * We need a {@link NodeClient} to get the reindexing task and be able to report progress */ private final NodeClient client; private final DataFrameAnalyticsConfigProvider configProvider; private final AnalyticsProcessManager processManager; - public DataFrameAnalyticsManager(ClusterService clusterService, NodeClient client, DataFrameAnalyticsConfigProvider configProvider, + public DataFrameAnalyticsManager(NodeClient client, DataFrameAnalyticsConfigProvider configProvider, AnalyticsProcessManager processManager) { - this.clusterService = Objects.requireNonNull(clusterService); this.client = Objects.requireNonNull(client); this.configProvider = Objects.requireNonNull(configProvider); this.processManager = Objects.requireNonNull(processManager); @@ -77,7 +81,6 @@ public class DataFrameAnalyticsManager { break; // The task has fully reindexed the documents and we should continue on with our analyses case ANALYZING: - // TODO apply previously stored model state if applicable startAnalytics(task, config, true); break; // If we are already at REINDEXING, we are not 100% sure if we reindexed ALL the docs. 
@@ -160,7 +163,27 @@ public class DataFrameAnalyticsManager { reindexCompletedListener::onFailure ); - DataFrameAnalyticsIndex.createDestinationIndex(client, Clock.systemUTC(), clusterService.state(), config, copyIndexCreatedListener); + // Create destination index if it does not exist + ActionListener destIndexListener = ActionListener.wrap( + indexResponse -> { + LOGGER.info("[{}] Using existing destination index [{}]", config.getId(), indexResponse.indices()[0]); + DataFrameAnalyticsIndex.updateMappingsToDestIndex(client, config, indexResponse, ActionListener.wrap( + acknowledgedResponse -> copyIndexCreatedListener.onResponse(null), + copyIndexCreatedListener::onFailure + )); + }, + e -> { + if (org.elasticsearch.ExceptionsHelper.unwrapCause(e) instanceof IndexNotFoundException) { + LOGGER.info("[{}] Creating destination index [{}]", config.getId(), config.getDest().getIndex()); + DataFrameAnalyticsIndex.createDestinationIndex(client, Clock.systemUTC(), config, copyIndexCreatedListener); + } else { + copyIndexCreatedListener.onFailure(e); + } + } + ); + + ClientHelper.executeWithHeadersAsync(config.getHeaders(), ML_ORIGIN, client, GetIndexAction.INSTANCE, + new GetIndexRequest().indices(config.getDest().getIndex()), destIndexListener); } private void startAnalytics(DataFrameAnalyticsTask task, DataFrameAnalyticsConfig config, boolean isTaskRestarting) { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/MappingsMerger.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/MappingsMerger.java new file mode 100644 index 00000000000..f007831f7cf --- /dev/null +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/MappingsMerger.java @@ -0,0 +1,100 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. 
+ */ +package org.elasticsearch.xpack.ml.dataframe; + +import com.carrotsearch.hppc.cursors.ObjectObjectCursor; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsAction; +import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequest; +import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse; +import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.metadata.MappingMetaData; +import org.elasticsearch.common.collect.ImmutableOpenMap; +import org.elasticsearch.xpack.core.ClientHelper; +import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; + +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; + +import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN; + +/** + * Merges mappings in a best effort and naive manner. + * The merge will fail if there is any conflict, i.e. the mappings of a field are not exactly the same. 
+ */ +public final class MappingsMerger { + + private MappingsMerger() {} + + public static void mergeMappings(Client client, Map headers, String[] index, + ActionListener> listener) { + ActionListener mappingsListener = ActionListener.wrap( + getMappingsResponse -> listener.onResponse(MappingsMerger.mergeMappings(getMappingsResponse)), + listener::onFailure + ); + + GetMappingsRequest getMappingsRequest = new GetMappingsRequest(); + getMappingsRequest.indices(index); + ClientHelper.executeWithHeadersAsync(headers, ML_ORIGIN, client, GetMappingsAction.INSTANCE, getMappingsRequest, mappingsListener); + } + + static ImmutableOpenMap mergeMappings(GetMappingsResponse getMappingsResponse) { + ImmutableOpenMap> indexToMappings = getMappingsResponse.getMappings(); + + String type = null; + Map mergedMappings = new HashMap<>(); + + Iterator>> iterator = indexToMappings.iterator(); + while (iterator.hasNext()) { + ObjectObjectCursor> indexMappings = iterator.next(); + Iterator> typeIterator = indexMappings.value.iterator(); + while (typeIterator.hasNext()) { + ObjectObjectCursor typeMapping = typeIterator.next(); + if (type == null) { + type = typeMapping.key; + } else { + if (type.equals(typeMapping.key) == false) { + throw ExceptionsHelper.badRequestException("source indices contain mappings for different types: [{}, {}]", + type, typeMapping.key); + } + } + Map currentMappings = typeMapping.value.getSourceAsMap(); + if (currentMappings.containsKey("properties")) { + + @SuppressWarnings("unchecked") + Map fieldMappings = (Map) currentMappings.get("properties"); + + for (Map.Entry fieldMapping : fieldMappings.entrySet()) { + if (mergedMappings.containsKey(fieldMapping.getKey())) { + if (mergedMappings.get(fieldMapping.getKey()).equals(fieldMapping.getValue()) == false) { + throw ExceptionsHelper.badRequestException("cannot merge mappings because of differences for field [{}]", + fieldMapping.getKey()); + } + } else { + mergedMappings.put(fieldMapping.getKey(), 
fieldMapping.getValue()); + } + } + } + } + } + + MappingMetaData mappingMetaData = createMappingMetaData(type, mergedMappings); + ImmutableOpenMap.Builder result = ImmutableOpenMap.builder(); + result.put(type, mappingMetaData); + return result.build(); + } + + private static MappingMetaData createMappingMetaData(String type, Map mappings) { + try { + return new MappingMetaData(type, Collections.singletonMap("properties", mappings)); + } catch (IOException e) { + throw ExceptionsHelper.serverError("Failed to parse mappings: " + mappings); + } + } +} diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/SourceDestValidator.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/SourceDestValidator.java index f607387e317..01803dc4359 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/SourceDestValidator.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/SourceDestValidator.java @@ -29,10 +29,13 @@ public class SourceDestValidator { } public void check(DataFrameAnalyticsConfig config) { - String sourceIndex = config.getSource().getIndex(); + String[] sourceIndex = config.getSource().getIndex(); String destIndex = config.getDest().getIndex(); - String[] sourceExpressions = Strings.tokenizeToStringArray(sourceIndex, ","); + String[] sourceExpressions = Arrays.stream(sourceIndex) + .map(index -> Strings.tokenizeToStringArray(index, ",")) + .flatMap(Arrays::stream) + .toArray(String[]::new); for (String sourceExpression : sourceExpressions) { if (Regex.simpleMatch(sourceExpression, destIndex)) { @@ -45,7 +48,7 @@ public class SourceDestValidator { IndicesOptions.lenientExpandOpen(), sourceExpressions))); if (concreteSourceIndexNames.isEmpty()) { - throw ExceptionsHelper.badRequestException("No index matches source index [{}]", sourceIndex); + throw ExceptionsHelper.badRequestException("No index matches source index {}", Arrays.toString(sourceIndex)); } final String[] 
concreteDestIndexNames = indexNameExpressionResolver.concreteIndexNames(clusterState, @@ -59,7 +62,7 @@ public class SourceDestValidator { if (concreteDestIndexNames.length == 1 && concreteSourceIndexNames.contains(concreteDestIndexNames[0])) { // In case the dest index is an alias, we need to check the concrete index is not matched by source throw ExceptionsHelper.badRequestException("Destination index [{}], which is an alias for [{}], " + - "must not be included in source index [{}]", destIndex, concreteDestIndexNames[0], sourceIndex); + "must not be included in source index {}", destIndex, concreteDestIndexNames[0], Arrays.toString(sourceIndex)); } } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/extractor/DataFrameDataExtractorFactory.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/extractor/DataFrameDataExtractorFactory.java index baf77c420c5..cacf00ad9e9 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/extractor/DataFrameDataExtractorFactory.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/extractor/DataFrameDataExtractorFactory.java @@ -13,6 +13,7 @@ import org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse; import org.elasticsearch.action.fieldcaps.FieldCapabilitiesAction; import org.elasticsearch.action.fieldcaps.FieldCapabilitiesRequest; import org.elasticsearch.action.fieldcaps.FieldCapabilitiesResponse; +import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.client.Client; import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.settings.Settings; @@ -73,7 +74,7 @@ public class DataFrameDataExtractorFactory { DataFrameAnalyticsConfig config, boolean isTaskRestarting, ActionListener listener) { - validateIndexAndExtractFields(client, config.getDest().getIndex(), config, isTaskRestarting, + validateIndexAndExtractFields(client, new String[] 
{config.getDest().getIndex()}, config, isTaskRestarting, ActionListener.wrap(extractedFields -> listener.onResponse(new DataFrameDataExtractorFactory( client, config.getId(), config.getDest().getIndex(), extractedFields, config.getHeaders())), listener::onFailure @@ -100,7 +101,7 @@ public class DataFrameDataExtractorFactory { } private static void validateIndexAndExtractFields(Client client, - String index, + String[] index, DataFrameAnalyticsConfig config, boolean isTaskRestarting, ActionListener listener) { @@ -120,6 +121,7 @@ public class DataFrameDataExtractorFactory { FieldCapabilitiesRequest fieldCapabilitiesRequest = new FieldCapabilitiesRequest(); fieldCapabilitiesRequest.indices(index); + fieldCapabilitiesRequest.indicesOptions(IndicesOptions.lenientExpandOpen()); fieldCapabilitiesRequest.fields("*"); ClientHelper.executeWithHeaders(config.getHeaders(), ClientHelper.ML_ORIGIN, client, () -> { client.execute(FieldCapabilitiesAction.INSTANCE, fieldCapabilitiesRequest, fieldCapabilitiesHandler); @@ -134,7 +136,7 @@ public class DataFrameDataExtractorFactory { getDocValueFieldsLimit(client, index, docValueFieldsLimitListener); } - private static void getDocValueFieldsLimit(Client client, String index, ActionListener docValueFieldsLimitListener) { + private static void getDocValueFieldsLimit(Client client, String[] index, ActionListener docValueFieldsLimitListener) { ActionListener settingsListener = ActionListener.wrap(getSettingsResponse -> { Integer minDocValueFieldsLimit = Integer.MAX_VALUE; diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/extractor/ExtractedFieldsDetector.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/extractor/ExtractedFieldsDetector.java index b36fc6f182a..d58eaebe353 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/extractor/ExtractedFieldsDetector.java +++ 
b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/extractor/ExtractedFieldsDetector.java @@ -55,13 +55,13 @@ public class ExtractedFieldsDetector { COMPATIBLE_FIELD_TYPES = Collections.unmodifiableSet(compatibleTypes); } - private final String index; + private final String[] index; private final DataFrameAnalyticsConfig config; private final boolean isTaskRestarting; private final int docValueFieldsLimit; private final FieldCapabilitiesResponse fieldCapabilitiesResponse; - ExtractedFieldsDetector(String index, DataFrameAnalyticsConfig config, boolean isTaskRestarting, int docValueFieldsLimit, + ExtractedFieldsDetector(String[] index, DataFrameAnalyticsConfig config, boolean isTaskRestarting, int docValueFieldsLimit, FieldCapabilitiesResponse fieldCapabilitiesResponse) { this.index = Objects.requireNonNull(index); this.config = Objects.requireNonNull(config); @@ -74,7 +74,7 @@ public class ExtractedFieldsDetector { Set fields = new HashSet<>(fieldCapabilitiesResponse.get().keySet()); fields.removeAll(IGNORE_FIELDS); - checkResultsFieldIsNotPresent(fields, index); + checkResultsFieldIsNotPresent(); // Ignore fields under the results object fields.removeIf(field -> field.startsWith(config.getDest().getResultsField() + ".")); @@ -87,7 +87,7 @@ public class ExtractedFieldsDetector { ExtractedFields extractedFields = ExtractedFields.build(sortedFields, Collections.emptySet(), fieldCapabilitiesResponse) .filterFields(ExtractedField.ExtractionMethod.DOC_VALUE); if (extractedFields.getAllFields().isEmpty()) { - throw ExceptionsHelper.badRequestException("No compatible fields could be detected in index [{}]", index); + throw ExceptionsHelper.badRequestException("No compatible fields could be detected in index {}", Arrays.toString(index)); } if (extractedFields.getDocValueFields().size() > docValueFieldsLimit) { extractedFields = fetchFromSourceIfSupported(extractedFields); @@ -100,11 +100,16 @@ public class ExtractedFieldsDetector { return 
extractedFields; } - private void checkResultsFieldIsNotPresent(Set fields, String index) { + private void checkResultsFieldIsNotPresent() { // If the task is restarting we do not mind the index containing the results field, we will overwrite all docs - if (isTaskRestarting == false && fields.contains(config.getDest().getResultsField())) { - throw ExceptionsHelper.badRequestException("Index [{}] already has a field that matches the {}.{} [{}];" + - " please set a different {}", index, DataFrameAnalyticsConfig.DEST.getPreferredName(), + if (isTaskRestarting) { + return; + } + + Map indexToFieldCaps = fieldCapabilitiesResponse.getField(config.getDest().getResultsField()); + if (indexToFieldCaps != null && indexToFieldCaps.isEmpty() == false) { + throw ExceptionsHelper.badRequestException("A field that matches the {}.{} [{}] already exists;" + + " please set a different {}", DataFrameAnalyticsConfig.DEST.getPreferredName(), DataFrameAnalyticsDest.RESULTS_FIELD.getPreferredName(), config.getDest().getResultsField(), DataFrameAnalyticsDest.RESULTS_FIELD.getPreferredName()); } @@ -121,7 +126,7 @@ public class ExtractedFieldsDetector { } } - private void includeAndExcludeFields(Set fields, String index) { + private void includeAndExcludeFields(Set fields, String[] index) { FetchSourceContext analyzedFields = config.getAnalyzedFields(); if (analyzedFields == null) { return; @@ -136,12 +141,14 @@ public class ExtractedFieldsDetector { // If the inclusion set does not match anything, that means the user's desired fields cannot be found in // the collection of supported field types. We should let the user know. 
Set includedSet = NameResolver.newUnaliased(fields, - (ex) -> new ResourceNotFoundException(Messages.getMessage(Messages.DATA_FRAME_ANALYTICS_BAD_FIELD_FILTER, index, ex))) + (ex) -> new ResourceNotFoundException( + Messages.getMessage(Messages.DATA_FRAME_ANALYTICS_BAD_FIELD_FILTER, ex))) .expand(includes, false); // If the exclusion set does not match anything, that means the fields are already not present // no need to raise if nothing matched Set excludedSet = NameResolver.newUnaliased(fields, - (ex) -> new ResourceNotFoundException(Messages.getMessage(Messages.DATA_FRAME_ANALYTICS_BAD_FIELD_FILTER, index, ex))) + (ex) -> new ResourceNotFoundException( + Messages.getMessage(Messages.DATA_FRAME_ANALYTICS_BAD_FIELD_FILTER, ex))) .expand(excludes, true); fields.retainAll(includedSet); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsIndexTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsIndexTests.java index 74d5526519d..80391de519e 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsIndexTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsIndexTests.java @@ -10,17 +10,21 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.create.CreateIndexAction; import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; +import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsAction; +import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequest; +import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse; +import org.elasticsearch.action.admin.indices.settings.get.GetSettingsAction; +import org.elasticsearch.action.admin.indices.settings.get.GetSettingsRequest; +import 
org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse; +import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.client.Client; -import org.elasticsearch.cluster.ClusterName; -import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MappingMetaData; -import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.json.JsonXContent; -import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig; @@ -33,27 +37,25 @@ import java.io.IOException; import java.time.Clock; import java.time.Instant; import java.time.ZoneId; +import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.Map; import static org.elasticsearch.common.xcontent.support.XContentMapValues.extractValue; +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.instanceOf; import static org.mockito.Matchers.any; import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.atLeastOnce; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; public class DataFrameAnalyticsIndexTests extends ESTestCase { - private static final String CLUSTER_NAME = "some-cluster-name"; - private static final String ANALYTICS_ID = 
"some-analytics-id"; - private static final String SOURCE_INDEX = "source-index"; + private static final String[] SOURCE_INDEX = new String[] {"source-index"}; private static final String DEST_INDEX = "dest-index"; private static final DataFrameAnalyticsConfig ANALYTICS_CONFIG = new DataFrameAnalyticsConfig.Builder(ANALYTICS_ID) @@ -71,6 +73,8 @@ public class DataFrameAnalyticsIndexTests extends ESTestCase { public void testCreateDestinationIndex() throws IOException { when(client.threadPool()).thenReturn(threadPool); when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY)); + + ArgumentCaptor createIndexRequestCaptor = ArgumentCaptor.forClass(CreateIndexRequest.class); doAnswer( invocationOnMock -> { @SuppressWarnings("unchecked") @@ -78,60 +82,102 @@ public class DataFrameAnalyticsIndexTests extends ESTestCase { listener.onResponse(null); return null; }) - .when(client).execute(any(), any(), any()); + .when(client).execute(eq(CreateIndexAction.INSTANCE), createIndexRequestCaptor.capture(), any()); + + Settings index1Settings = Settings.builder() + .put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0) + .build(); + + Settings index2Settings = Settings.builder() + .put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 5) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) + .build(); + + ArgumentCaptor getSettingsRequestCaptor = ArgumentCaptor.forClass(GetSettingsRequest.class); + ArgumentCaptor getMappingsRequestCaptor = ArgumentCaptor.forClass(GetMappingsRequest.class); + + ImmutableOpenMap.Builder indexToSettings = ImmutableOpenMap.builder(); + indexToSettings.put("index_1", index1Settings); + indexToSettings.put("index_2", index2Settings); + + GetSettingsResponse getSettingsResponse = new GetSettingsResponse(indexToSettings.build(), ImmutableOpenMap.of()); + + doAnswer( + 
invocationOnMock -> { + @SuppressWarnings("unchecked") + ActionListener listener = (ActionListener) invocationOnMock.getArguments()[2]; + listener.onResponse(getSettingsResponse); + return null; + } + ).when(client).execute(eq(GetSettingsAction.INSTANCE), getSettingsRequestCaptor.capture(), any()); + + Map index1Properties = new HashMap<>(); + index1Properties.put("field_1", "field_1_mappings"); + index1Properties.put("field_2", "field_2_mappings"); + Map index1Mappings = Collections.singletonMap("properties", index1Properties); + MappingMetaData index1MappingMetaData = new MappingMetaData("_doc", index1Mappings); + + Map index2Properties = new HashMap<>(); + index2Properties.put("field_1", "field_1_mappings"); + index2Properties.put("field_2", "field_2_mappings"); + Map index2Mappings = Collections.singletonMap("properties", index2Properties); + MappingMetaData index2MappingMetaData = new MappingMetaData("_doc", index2Mappings); + + ImmutableOpenMap.Builder index1MappingsMap = ImmutableOpenMap.builder(); + index1MappingsMap.put("_doc", index1MappingMetaData); + ImmutableOpenMap.Builder index2MappingsMap = ImmutableOpenMap.builder(); + index2MappingsMap.put("_doc", index2MappingMetaData); + + ImmutableOpenMap.Builder> mappings = ImmutableOpenMap.builder(); + mappings.put("index_1", index1MappingsMap.build()); + mappings.put("index_2", index2MappingsMap.build()); + + GetMappingsResponse getMappingsResponse = new GetMappingsResponse(mappings.build()); + + doAnswer( + invocationOnMock -> { + @SuppressWarnings("unchecked") + ActionListener listener = (ActionListener) invocationOnMock.getArguments()[2]; + listener.onResponse(getMappingsResponse); + return null; + } + ).when(client).execute(eq(GetMappingsAction.INSTANCE), getMappingsRequestCaptor.capture(), any()); - Map propertiesMapping = new HashMap<>(); - propertiesMapping.put("properties", new HashMap<>()); - ClusterState clusterState = - ClusterState.builder(new ClusterName(CLUSTER_NAME)) - 
.metaData(MetaData.builder() - .put(IndexMetaData.builder(SOURCE_INDEX) - .settings(Settings.builder() - .put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) - .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) - .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)) - .putMapping(new MappingMetaData("_doc", propertiesMapping)))) - .build(); DataFrameAnalyticsIndex.createDestinationIndex( client, clock, - clusterState, ANALYTICS_CONFIG, ActionListener.wrap( response -> {}, e -> fail(e.getMessage()))); - ArgumentCaptor createIndexRequestCaptor = ArgumentCaptor.forClass(CreateIndexRequest.class); - verify(client, atLeastOnce()).threadPool(); - verify(client).execute(eq(CreateIndexAction.INSTANCE), createIndexRequestCaptor.capture(), any()); - verifyNoMoreInteractions(client); + GetSettingsRequest capturedGetSettingsRequest = getSettingsRequestCaptor.getValue(); + assertThat(capturedGetSettingsRequest.indices(), equalTo(SOURCE_INDEX)); + assertThat(capturedGetSettingsRequest.indicesOptions(), equalTo(IndicesOptions.lenientExpandOpen())); + assertThat(Arrays.asList(capturedGetSettingsRequest.names()), contains("index.number_of_shards", "index.number_of_replicas")); + + assertThat(getMappingsRequestCaptor.getValue().indices(), equalTo(SOURCE_INDEX)); CreateIndexRequest createIndexRequest = createIndexRequestCaptor.getValue(); + + assertThat(createIndexRequest.settings().keySet(), + containsInAnyOrder("index.number_of_shards", "index.number_of_replicas", "index.sort.field", "index.sort.order")); + assertThat(createIndexRequest.settings().getAsInt("index.number_of_shards", -1), equalTo(5)); + assertThat(createIndexRequest.settings().getAsInt("index.number_of_replicas", -1), equalTo(1)); + assertThat(createIndexRequest.settings().get("index.sort.field"), equalTo("_id_copy")); + assertThat(createIndexRequest.settings().get("index.sort.order"), equalTo("asc")); + try (XContentParser parser = createParser(JsonXContent.jsonXContent, createIndexRequest.mappings().get("_doc"))) 
{ Map map = parser.map(); assertThat(extractValue("_doc.properties._id_copy.type", map), equalTo("keyword")); + assertThat(extractValue("_doc.properties.field_1", map), equalTo("field_1_mappings")); + assertThat(extractValue("_doc.properties.field_2", map), equalTo("field_2_mappings")); assertThat(extractValue("_doc._meta.analytics", map), equalTo(ANALYTICS_ID)); assertThat(extractValue("_doc._meta.creation_date_in_millis", map), equalTo(CURRENT_TIME_MILLIS)); assertThat(extractValue("_doc._meta.created_by", map), equalTo(CREATED_BY)); } } - - public void testCreateDestinationIndex_IndexNotFound() { - ClusterState clusterState = - ClusterState.builder(new ClusterName(CLUSTER_NAME)) - .metaData(MetaData.builder()) - .build(); - DataFrameAnalyticsIndex.createDestinationIndex( - client, - clock, - clusterState, - ANALYTICS_CONFIG, - ActionListener.wrap( - response -> fail("IndexNotFoundException should be thrown"), - e -> { - assertThat(e, instanceOf(IndexNotFoundException.class)); - IndexNotFoundException infe = (IndexNotFoundException) e; - assertThat(infe.getIndex().getName(), equalTo(SOURCE_INDEX)); - })); - } } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/MappingsMergerTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/MappingsMergerTests.java new file mode 100644 index 00000000000..5c7b08ba1c7 --- /dev/null +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/MappingsMergerTests.java @@ -0,0 +1,153 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. 
+ */ +package org.elasticsearch.xpack.ml.dataframe; + +import org.elasticsearch.ElasticsearchStatusException; +import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse; +import org.elasticsearch.cluster.metadata.MappingMetaData; +import org.elasticsearch.common.collect.ImmutableOpenMap; +import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.test.ESTestCase; + +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; + +public class MappingsMergerTests extends ESTestCase { + + public void testMergeMappings_GivenIndicesWithIdenticalMappings() throws IOException { + Map index1Properties = new HashMap<>(); + index1Properties.put("field_1", "field_1_mappings"); + index1Properties.put("field_2", "field_2_mappings"); + Map index1Mappings = Collections.singletonMap("properties", index1Properties); + MappingMetaData index1MappingMetaData = new MappingMetaData("_doc", index1Mappings); + + Map index2Properties = new HashMap<>(); + index2Properties.put("field_1", "field_1_mappings"); + index2Properties.put("field_2", "field_2_mappings"); + Map index2Mappings = Collections.singletonMap("properties", index2Properties); + MappingMetaData index2MappingMetaData = new MappingMetaData("_doc", index2Mappings); + + ImmutableOpenMap.Builder index1MappingsMap = ImmutableOpenMap.builder(); + index1MappingsMap.put("_doc", index1MappingMetaData); + ImmutableOpenMap.Builder index2MappingsMap = ImmutableOpenMap.builder(); + index2MappingsMap.put("_doc", index2MappingMetaData); + + ImmutableOpenMap.Builder> mappings = ImmutableOpenMap.builder(); + mappings.put("index_1", index1MappingsMap.build()); + mappings.put("index_2", index2MappingsMap.build()); + + GetMappingsResponse getMappingsResponse = new 
GetMappingsResponse(mappings.build()); + + ImmutableOpenMap mergedMappings = MappingsMerger.mergeMappings(getMappingsResponse); + + assertThat(mergedMappings.size(), equalTo(1)); + assertThat(mergedMappings.containsKey("_doc"), is(true)); + assertThat(mergedMappings.valuesIt().next().getSourceAsMap(), equalTo(index1Mappings)); + } + + public void testMergeMappings_GivenIndicesWithDifferentTypes() throws IOException { + Map index1Mappings = Collections.singletonMap("properties", + Collections.singletonMap("field_1", "field_1_mappings")); + MappingMetaData index1MappingMetaData = new MappingMetaData("type_1", index1Mappings); + + Map index2Mappings = Collections.singletonMap("properties", + Collections.singletonMap("field_1", "field_1_mappings")); + MappingMetaData index2MappingMetaData = new MappingMetaData("type_2", index2Mappings); + + ImmutableOpenMap.Builder index1MappingsMap = ImmutableOpenMap.builder(); + index1MappingsMap.put("type_1", index1MappingMetaData); + ImmutableOpenMap.Builder index2MappingsMap = ImmutableOpenMap.builder(); + index2MappingsMap.put("type_2", index2MappingMetaData); + + ImmutableOpenMap.Builder> mappings = ImmutableOpenMap.builder(); + mappings.put("index_1", index1MappingsMap.build()); + mappings.put("index_2", index2MappingsMap.build()); + + GetMappingsResponse getMappingsResponse = new GetMappingsResponse(mappings.build()); + + ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, + () -> MappingsMerger.mergeMappings(getMappingsResponse)); + assertThat(e.status(), equalTo(RestStatus.BAD_REQUEST)); + assertThat(e.getMessage(), containsString("source indices contain mappings for different types:")); + assertThat(e.getMessage(), containsString("type_1")); + assertThat(e.getMessage(), containsString("type_2")); + } + + public void testMergeMappings_GivenFieldWithDifferentMapping() throws IOException { + Map index1Mappings = Collections.singletonMap("properties", + Collections.singletonMap("field_1", 
"field_1_mappings")); + MappingMetaData index1MappingMetaData = new MappingMetaData("_doc", index1Mappings); + + Map index2Mappings = Collections.singletonMap("properties", + Collections.singletonMap("field_1", "different_field_1_mappings")); + MappingMetaData index2MappingMetaData = new MappingMetaData("_doc", index2Mappings); + + ImmutableOpenMap.Builder index1MappingsMap = ImmutableOpenMap.builder(); + index1MappingsMap.put("_doc", index1MappingMetaData); + ImmutableOpenMap.Builder index2MappingsMap = ImmutableOpenMap.builder(); + index2MappingsMap.put("_doc", index2MappingMetaData); + + ImmutableOpenMap.Builder> mappings = ImmutableOpenMap.builder(); + mappings.put("index_1", index1MappingsMap.build()); + mappings.put("index_2", index2MappingsMap.build()); + + GetMappingsResponse getMappingsResponse = new GetMappingsResponse(mappings.build()); + + ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, + () -> MappingsMerger.mergeMappings(getMappingsResponse)); + assertThat(e.status(), equalTo(RestStatus.BAD_REQUEST)); + assertThat(e.getMessage(), equalTo("cannot merge mappings because of differences for field [field_1]")); + } + + public void testMergeMappings_GivenIndicesWithDifferentMappingsButNoConflicts() throws IOException { + Map index1Properties = new HashMap<>(); + index1Properties.put("field_1", "field_1_mappings"); + index1Properties.put("field_2", "field_2_mappings"); + Map index1Mappings = Collections.singletonMap("properties", index1Properties); + MappingMetaData index1MappingMetaData = new MappingMetaData("_doc", index1Mappings); + + Map index2Properties = new HashMap<>(); + index2Properties.put("field_1", "field_1_mappings"); + index2Properties.put("field_3", "field_3_mappings"); + Map index2Mappings = Collections.singletonMap("properties", index2Properties); + MappingMetaData index2MappingMetaData = new MappingMetaData("_doc", index2Mappings); + + ImmutableOpenMap.Builder index1MappingsMap = ImmutableOpenMap.builder(); 
+ index1MappingsMap.put("_doc", index1MappingMetaData); + ImmutableOpenMap.Builder index2MappingsMap = ImmutableOpenMap.builder(); + index2MappingsMap.put("_doc", index2MappingMetaData); + + ImmutableOpenMap.Builder> mappings = ImmutableOpenMap.builder(); + mappings.put("index_1", index1MappingsMap.build()); + mappings.put("index_2", index2MappingsMap.build()); + + GetMappingsResponse getMappingsResponse = new GetMappingsResponse(mappings.build()); + + ImmutableOpenMap mergedMappings = MappingsMerger.mergeMappings(getMappingsResponse); + + assertThat(mergedMappings.size(), equalTo(1)); + assertThat(mergedMappings.containsKey("_doc"), is(true)); + Map mappingsAsMap = mergedMappings.valuesIt().next().getSourceAsMap(); + assertThat(mappingsAsMap.size(), equalTo(1)); + assertThat(mappingsAsMap.containsKey("properties"), is(true)); + + @SuppressWarnings("unchecked") + Map fieldMappings = (Map) mappingsAsMap.get("properties"); + + assertThat(fieldMappings.size(), equalTo(3)); + assertThat(fieldMappings.keySet(), containsInAnyOrder("field_1", "field_2", "field_3")); + assertThat(fieldMappings.get("field_1"), equalTo("field_1_mappings")); + assertThat(fieldMappings.get("field_2"), equalTo("field_2_mappings")); + assertThat(fieldMappings.get("field_3"), equalTo("field_3_mappings")); + } +} diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/SourceDestValidatorTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/SourceDestValidatorTests.java index fb91673b7a5..d48d079314a 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/SourceDestValidatorTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/SourceDestValidatorTests.java @@ -64,7 +64,7 @@ public class SourceDestValidatorTests extends ESTestCase { public void testCheck_GivenSimpleSourceIndexAndValidDestIndex() { DataFrameAnalyticsConfig config = new DataFrameAnalyticsConfig.Builder("test") - .setSource(new 
DataFrameAnalyticsSource("source-1", null)) + .setSource(createSource("source-1")) .setDest(new DataFrameAnalyticsDest("dest", null)) .setAnalysis(new OutlierDetection()) .build(); @@ -75,7 +75,7 @@ public class SourceDestValidatorTests extends ESTestCase { public void testCheck_GivenMissingConcreteSourceIndex() { DataFrameAnalyticsConfig config = new DataFrameAnalyticsConfig.Builder("test") - .setSource(new DataFrameAnalyticsSource("missing", null)) + .setSource(createSource("missing")) .setDest(new DataFrameAnalyticsDest("dest", null)) .setAnalysis(new OutlierDetection()) .build(); @@ -89,7 +89,7 @@ public class SourceDestValidatorTests extends ESTestCase { public void testCheck_GivenMissingWildcardSourceIndex() { DataFrameAnalyticsConfig config = new DataFrameAnalyticsConfig.Builder("test") - .setSource(new DataFrameAnalyticsSource("missing*", null)) + .setSource(createSource("missing*")) .setDest(new DataFrameAnalyticsDest("dest", null)) .setAnalysis(new OutlierDetection()) .build(); @@ -103,7 +103,7 @@ public class SourceDestValidatorTests extends ESTestCase { public void testCheck_GivenDestIndexSameAsSourceIndex() { DataFrameAnalyticsConfig config = new DataFrameAnalyticsConfig.Builder("test") - .setSource(new DataFrameAnalyticsSource("source-1", null)) + .setSource(createSource("source-1")) .setDest(new DataFrameAnalyticsDest("source-1", null)) .setAnalysis(new OutlierDetection()) .build(); @@ -117,7 +117,7 @@ public class SourceDestValidatorTests extends ESTestCase { public void testCheck_GivenDestIndexMatchesSourceIndex() { DataFrameAnalyticsConfig config = new DataFrameAnalyticsConfig.Builder("test") - .setSource(new DataFrameAnalyticsSource("source-*", null)) + .setSource(createSource("source-*")) .setDest(new DataFrameAnalyticsDest(SOURCE_2, null)) .setAnalysis(new OutlierDetection()) .build(); @@ -131,7 +131,7 @@ public class SourceDestValidatorTests extends ESTestCase { public void testCheck_GivenDestIndexMatchesOneOfSourceIndices() { 
DataFrameAnalyticsConfig config = new DataFrameAnalyticsConfig.Builder("test") - .setSource(new DataFrameAnalyticsSource("source-1,source-*", null)) + .setSource(createSource("source-1,source-*")) .setDest(new DataFrameAnalyticsDest(SOURCE_2, null)) .setAnalysis(new OutlierDetection()) .build(); @@ -145,7 +145,7 @@ public class SourceDestValidatorTests extends ESTestCase { public void testCheck_GivenDestIndexIsAliasThatMatchesMultipleIndices() { DataFrameAnalyticsConfig config = new DataFrameAnalyticsConfig.Builder("test") - .setSource(new DataFrameAnalyticsSource(SOURCE_1, null)) + .setSource(createSource(SOURCE_1)) .setDest(new DataFrameAnalyticsDest("dest-alias", null)) .setAnalysis(new OutlierDetection()) .build(); @@ -160,7 +160,7 @@ public class SourceDestValidatorTests extends ESTestCase { public void testCheck_GivenDestIndexIsAliasThatIsIncludedInSource() { DataFrameAnalyticsConfig config = new DataFrameAnalyticsConfig.Builder("test") - .setSource(new DataFrameAnalyticsSource("source-1", null)) + .setSource(createSource("source-1")) .setDest(new DataFrameAnalyticsDest("source-1-alias", null)) .setAnalysis(new OutlierDetection()) .build(); @@ -173,4 +173,8 @@ public class SourceDestValidatorTests extends ESTestCase { equalTo("Destination index [source-1-alias], which is an alias for [source-1], " + "must not be included in source index [source-1]")); } + + private static DataFrameAnalyticsSource createSource(String... 
index) { + return new DataFrameAnalyticsSource(index, null); + } } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/extractor/ExtractedFieldsDetectorTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/extractor/ExtractedFieldsDetectorTests.java index c035c44f117..1345a1fe128 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/extractor/ExtractedFieldsDetectorTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/extractor/ExtractedFieldsDetectorTests.java @@ -33,7 +33,7 @@ import static org.mockito.Mockito.when; public class ExtractedFieldsDetectorTests extends ESTestCase { - private static final String SOURCE_INDEX = "source_index"; + private static final String[] SOURCE_INDEX = new String[] { "source_index" }; private static final String DEST_INDEX = "dest_index"; private static final String RESULTS_FIELD = "ml"; @@ -154,7 +154,7 @@ public class ExtractedFieldsDetectorTests extends ESTestCase { SOURCE_INDEX, buildAnalyticsConfig(desiredFields), false, 100, fieldCapabilities); ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> extractedFieldsDetector.detect()); - assertThat(e.getMessage(), equalTo("No compatible fields could be detected in index [source_index] with name [your_field1]")); + assertThat(e.getMessage(), equalTo("No field [your_field1] could be detected")); } public void testDetectedExtractedFields_GivenExcludeAllValidFields() { @@ -202,7 +202,7 @@ public class ExtractedFieldsDetectorTests extends ESTestCase { SOURCE_INDEX, buildAnalyticsConfig(), false, 100, fieldCapabilities); ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> extractedFieldsDetector.detect()); - assertThat(e.getMessage(), equalTo("Index [source_index] already has a field that matches the dest.results_field [ml]; " + + assertThat(e.getMessage(), equalTo("A field that matches the 
dest.results_field [ml] already exists; " + "please set a different results_field")); } diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/data_frame_analytics_crud.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/data_frame_analytics_crud.yml index 01afb7714f3..168173e648e 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/data_frame_analytics_crud.yml +++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/data_frame_analytics_crud.yml @@ -50,7 +50,7 @@ setup: "analyzed_fields": [ "obj1.*", "obj2.*" ] } - match: { id: "simple-outlier-detection-with-query" } - - match: { source.index: "index-source" } + - match: { source.index: ["index-source"] } - match: { source.query: {"term" : { "user" : "Kimchy"} } } - match: { dest.index: "index-dest" } - match: { analysis: {"outlier_detection":{}} } @@ -63,7 +63,7 @@ setup: id: "simple-outlier-detection-with-query" - match: { count: 1 } - match: { data_frame_analytics.0.id: "simple-outlier-detection-with-query" } - - match: { data_frame_analytics.0.source.index: "index-source" } + - match: { data_frame_analytics.0.source.index: ["index-source"] } - match: { data_frame_analytics.0.source.query: {"term" : { "user" : "Kimchy"} } } - match: { data_frame_analytics.0.dest.index: "index-dest" } - match: { data_frame_analytics.0.analysis: {"outlier_detection":{}} } @@ -145,7 +145,7 @@ setup: "analysis": {"outlier_detection":{}} } - match: { id: "simple-outlier-detection" } - - match: { source.index: "index-source" } + - match: { source.index: ["index-source"] } - match: { source.query: {"match_all" : {} } } - match: { dest.index: "index-dest" } - match: { analysis: {"outlier_detection":{}} } @@ -175,7 +175,7 @@ setup: } } - match: { id: "custom-outlier-detection" } - - match: { source.index: "index-source" } + - match: { source.index: ["index-source"] } - match: { source.query: {"match_all" : {} } } - match: { dest.index: "index-dest" } - match: { 
analysis.outlier_detection.n_neighbors: 5 } @@ -427,16 +427,34 @@ setup: } --- -"Test put config given source with empty index": +"Test put config given source with empty index array": - do: - catch: /\[index\] must be non-empty/ + catch: /source\.index must specify at least one index/ ml.put_data_frame_analytics: id: "simple-outlier-detection" body: > { "source": { - "index": "" + "index": [] + }, + "dest": { + "index": "index-dest" + }, + "analysis": {"outlier_detection":{}} + } + +--- +"Test put config given source with empty string in index array": + + - do: + catch: /source\.index must contain non-null and non-empty strings/ + ml.put_data_frame_analytics: + id: "simple-outlier-detection" + body: > + { + "source": { + "index": [""] }, "dest": { "index": "index-dest" @@ -889,7 +907,7 @@ setup: "analyzed_fields": [ "obj1.*", "obj2.*" ] } - match: { id: "simple-outlier-detection-with-query" } - - match: { source.index: "index-source" } + - match: { source.index: ["index-source"] } - match: { source.query: {"term" : { "user" : "Kimchy"} } } - match: { dest.index: "index-dest" } - match: { analysis: {"outlier_detection":{}} }