[7.x][ML] Support multiple source indices for df-analytics (#43702) (#43731)

This commit adds support for multiple source indices.
In order to deal with multiple indices having different mappings,
it attempts a best-effort approach to merge the mappings assuming
there are no conflicts. In case conflicts exist, an error will be
returned.

To allow users to create custom mappings for special use cases,
the destination index is now allowed to exist before the analytics
job runs. In addition, settings are no longer copied except for
the `index.number_of_shards` and `index.number_of_replicas`.
This commit is contained in:
Dimitris Athanasiou 2019-06-28 13:28:03 +03:00 committed by GitHub
parent 2cc7f5a744
commit cab879118d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
21 changed files with 741 additions and 187 deletions

View File

@ -28,6 +28,8 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
public class DataFrameAnalyticsSource implements ToXContentObject {
@ -46,19 +48,19 @@ public class DataFrameAnalyticsSource implements ToXContentObject {
private static ObjectParser<Builder, Void> PARSER = new ObjectParser<>("data_frame_analytics_source", true, Builder::new);
static {
PARSER.declareString(Builder::setIndex, INDEX);
PARSER.declareStringArray(Builder::setIndex, INDEX);
PARSER.declareObject(Builder::setQueryConfig, (p, c) -> QueryConfig.fromXContent(p), QUERY);
}
private final String index;
private final String[] index;
private final QueryConfig queryConfig;
private DataFrameAnalyticsSource(String index, @Nullable QueryConfig queryConfig) {
private DataFrameAnalyticsSource(String[] index, @Nullable QueryConfig queryConfig) {
this.index = Objects.requireNonNull(index);
this.queryConfig = queryConfig;
}
public String getIndex() {
public String[] getIndex() {
return index;
}
@ -83,13 +85,13 @@ public class DataFrameAnalyticsSource implements ToXContentObject {
if (o == null || getClass() != o.getClass()) return false;
DataFrameAnalyticsSource other = (DataFrameAnalyticsSource) o;
return Objects.equals(index, other.index)
return Arrays.equals(index, other.index)
&& Objects.equals(queryConfig, other.queryConfig);
}
@Override
public int hashCode() {
return Objects.hash(index, queryConfig);
return Objects.hash(Arrays.asList(index), queryConfig);
}
@Override
@ -99,16 +101,21 @@ public class DataFrameAnalyticsSource implements ToXContentObject {
public static class Builder {
private String index;
private String[] index;
private QueryConfig queryConfig;
private Builder() {}
public Builder setIndex(String index) {
public Builder setIndex(String... index) {
this.index = index;
return this;
}
public Builder setIndex(List<String> index) {
this.index = index.toArray(new String[0]);
return this;
}
public Builder setQueryConfig(QueryConfig queryConfig) {
this.queryConfig = queryConfig;
return this;

View File

@ -2802,7 +2802,7 @@ public class MlClientDocumentationIT extends ESRestHighLevelClientTestCase {
}
public void testGetDataFrameAnalytics() throws Exception {
createIndex(DF_ANALYTICS_CONFIG.getSource().getIndex());
createIndex(DF_ANALYTICS_CONFIG.getSource().getIndex()[0]);
RestHighLevelClient client = highLevelClient();
client.machineLearning().putDataFrameAnalytics(new PutDataFrameAnalyticsRequest(DF_ANALYTICS_CONFIG), RequestOptions.DEFAULT);
@ -2851,7 +2851,7 @@ public class MlClientDocumentationIT extends ESRestHighLevelClientTestCase {
}
public void testGetDataFrameAnalyticsStats() throws Exception {
createIndex(DF_ANALYTICS_CONFIG.getSource().getIndex());
createIndex(DF_ANALYTICS_CONFIG.getSource().getIndex()[0]);
RestHighLevelClient client = highLevelClient();
client.machineLearning().putDataFrameAnalytics(new PutDataFrameAnalyticsRequest(DF_ANALYTICS_CONFIG), RequestOptions.DEFAULT);
@ -2901,7 +2901,7 @@ public class MlClientDocumentationIT extends ESRestHighLevelClientTestCase {
}
public void testPutDataFrameAnalytics() throws Exception {
createIndex(DF_ANALYTICS_CONFIG.getSource().getIndex());
createIndex(DF_ANALYTICS_CONFIG.getSource().getIndex()[0]);
RestHighLevelClient client = highLevelClient();
{
@ -2994,7 +2994,7 @@ public class MlClientDocumentationIT extends ESRestHighLevelClientTestCase {
}
public void testDeleteDataFrameAnalytics() throws Exception {
createIndex(DF_ANALYTICS_CONFIG.getSource().getIndex());
createIndex(DF_ANALYTICS_CONFIG.getSource().getIndex()[0]);
RestHighLevelClient client = highLevelClient();
client.machineLearning().putDataFrameAnalytics(new PutDataFrameAnalyticsRequest(DF_ANALYTICS_CONFIG), RequestOptions.DEFAULT);
@ -3044,9 +3044,9 @@ public class MlClientDocumentationIT extends ESRestHighLevelClientTestCase {
}
public void testStartDataFrameAnalytics() throws Exception {
createIndex(DF_ANALYTICS_CONFIG.getSource().getIndex());
createIndex(DF_ANALYTICS_CONFIG.getSource().getIndex()[0]);
highLevelClient().index(
new IndexRequest(DF_ANALYTICS_CONFIG.getSource().getIndex()).source(XContentType.JSON, "total", 10000)
new IndexRequest(DF_ANALYTICS_CONFIG.getSource().getIndex()[0]).source(XContentType.JSON, "total", 10000)
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE), RequestOptions.DEFAULT);
RestHighLevelClient client = highLevelClient();
client.machineLearning().putDataFrameAnalytics(new PutDataFrameAnalyticsRequest(DF_ANALYTICS_CONFIG), RequestOptions.DEFAULT);
@ -3101,9 +3101,9 @@ public class MlClientDocumentationIT extends ESRestHighLevelClientTestCase {
}
public void testStopDataFrameAnalytics() throws Exception {
createIndex(DF_ANALYTICS_CONFIG.getSource().getIndex());
createIndex(DF_ANALYTICS_CONFIG.getSource().getIndex()[0]);
highLevelClient().index(
new IndexRequest(DF_ANALYTICS_CONFIG.getSource().getIndex()).source(XContentType.JSON, "total", 10000)
new IndexRequest(DF_ANALYTICS_CONFIG.getSource().getIndex()[0]).source(XContentType.JSON, "total", 10000)
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE), RequestOptions.DEFAULT);
RestHighLevelClient client = highLevelClient();
client.machineLearning().putDataFrameAnalytics(new PutDataFrameAnalyticsRequest(DF_ANALYTICS_CONFIG), RequestOptions.DEFAULT);

View File

@ -36,7 +36,7 @@ public class DataFrameAnalyticsSourceTests extends AbstractXContentTestCase<Data
public static DataFrameAnalyticsSource randomSourceConfig() {
return DataFrameAnalyticsSource.builder()
.setIndex(randomAlphaOfLengthBetween(1, 10))
.setIndex(generateRandomStringArray(10, 10, false, false))
.setQueryConfig(randomBoolean() ? null : randomQueryConfig())
.build();
}

View File

@ -8,6 +8,7 @@ package org.elasticsearch.xpack.core.ml.dataframe;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
@ -23,6 +24,7 @@ import org.elasticsearch.xpack.core.ml.utils.XContentObjectTransformer;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@ -34,44 +36,47 @@ public class DataFrameAnalyticsSource implements Writeable, ToXContentObject {
public static ConstructingObjectParser<DataFrameAnalyticsSource, Void> createParser(boolean ignoreUnknownFields) {
ConstructingObjectParser<DataFrameAnalyticsSource, Void> parser = new ConstructingObjectParser<>("data_frame_analytics_source",
ignoreUnknownFields, a -> new DataFrameAnalyticsSource((String) a[0], (QueryProvider) a[1]));
parser.declareString(ConstructingObjectParser.constructorArg(), INDEX);
ignoreUnknownFields, a -> new DataFrameAnalyticsSource(((List<String>) a[0]).toArray(new String[0]), (QueryProvider) a[1]));
parser.declareStringArray(ConstructingObjectParser.constructorArg(), INDEX);
parser.declareObject(ConstructingObjectParser.optionalConstructorArg(),
(p, c) -> QueryProvider.fromXContent(p, ignoreUnknownFields, Messages.DATA_FRAME_ANALYTICS_BAD_QUERY_FORMAT), QUERY);
return parser;
}
private final String index;
private final String[] index;
private final QueryProvider queryProvider;
public DataFrameAnalyticsSource(String index, @Nullable QueryProvider queryProvider) {
public DataFrameAnalyticsSource(String[] index, @Nullable QueryProvider queryProvider) {
this.index = ExceptionsHelper.requireNonNull(index, INDEX);
if (index.isEmpty()) {
throw ExceptionsHelper.badRequestException("[{}] must be non-empty", INDEX);
if (index.length == 0) {
throw new IllegalArgumentException("source.index must specify at least one index");
}
if (Arrays.stream(index).anyMatch(Strings::isNullOrEmpty)) {
throw new IllegalArgumentException("source.index must contain non-null and non-empty strings");
}
this.queryProvider = queryProvider == null ? QueryProvider.defaultQuery() : queryProvider;
}
public DataFrameAnalyticsSource(StreamInput in) throws IOException {
index = in.readString();
index = in.readStringArray();
queryProvider = QueryProvider.fromStream(in);
}
public DataFrameAnalyticsSource(DataFrameAnalyticsSource other) {
this.index = other.index;
this.index = Arrays.copyOf(other.index, other.index.length);
this.queryProvider = new QueryProvider(other.queryProvider);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(index);
out.writeStringArray(index);
queryProvider.writeTo(out);
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(INDEX.getPreferredName(), index);
builder.array(INDEX.getPreferredName(), index);
builder.field(QUERY.getPreferredName(), queryProvider.getQuery());
builder.endObject();
return builder;
@ -83,16 +88,16 @@ public class DataFrameAnalyticsSource implements Writeable, ToXContentObject {
if (o == null || getClass() != o.getClass()) return false;
DataFrameAnalyticsSource other = (DataFrameAnalyticsSource) o;
return Objects.equals(index, other.index)
return Arrays.equals(index, other.index)
&& Objects.equals(queryProvider, other.queryProvider);
}
@Override
public int hashCode() {
return Objects.hash(index, queryProvider);
return Objects.hash(Arrays.asList(index), queryProvider);
}
public String getIndex() {
public String[] getIndex() {
return index;
}

View File

@ -51,8 +51,7 @@ public final class Messages {
public static final String DATAFEED_ID_ALREADY_TAKEN = "A datafeed with id [{0}] already exists";
public static final String DATA_FRAME_ANALYTICS_BAD_QUERY_FORMAT = "Data Frame Analytics config query is not parsable";
public static final String DATA_FRAME_ANALYTICS_BAD_FIELD_FILTER =
"No compatible fields could be detected in index [{0}] with name [{1}]";
public static final String DATA_FRAME_ANALYTICS_BAD_FIELD_FILTER = "No field [{0}] could be detected";
public static final String FILTER_CANNOT_DELETE = "Cannot delete filter [{0}] currently used by jobs {1}";
public static final String FILTER_CONTAINS_TOO_MANY_ITEMS = "Filter [{0}] contains too many items; up to [{1}] items are allowed";

View File

@ -44,7 +44,7 @@ public class DataFrameAnalyticsSourceTests extends AbstractSerializingTestCase<D
}
public static DataFrameAnalyticsSource createRandom() {
String index = randomAlphaOfLength(10);
String[] index = generateRandomStringArray(10, 10, false, false);
QueryProvider queryProvider = null;
if (randomBoolean()) {
try {

View File

@ -53,7 +53,8 @@ integTest.runner {
'ml/data_frame_analytics_crud/Test put config with unknown top level field',
'ml/data_frame_analytics_crud/Test put config with unknown field in outlier detection analysis',
'ml/data_frame_analytics_crud/Test put config given missing source',
'ml/data_frame_analytics_crud/Test put config given source with empty index',
'ml/data_frame_analytics_crud/Test put config given source with empty index array',
'ml/data_frame_analytics_crud/Test put config given source with empty string in index array',
'ml/data_frame_analytics_crud/Test put config given source without index',
'ml/data_frame_analytics_crud/Test put config given missing dest',
'ml/data_frame_analytics_crud/Test put config given dest with empty index',

View File

@ -69,7 +69,7 @@ public class RunDataFrameAnalyticsIT extends MlNativeDataFrameAnalyticsIntegTest
}
String id = "test_outlier_detection_with_few_docs";
DataFrameAnalyticsConfig config = buildOutlierDetectionAnalytics(id, sourceIndex, null);
DataFrameAnalyticsConfig config = buildOutlierDetectionAnalytics(id, new String[] {sourceIndex}, sourceIndex + "-results", null);
registerAnalytics(config);
putAnalytics(config);
@ -130,7 +130,8 @@ public class RunDataFrameAnalyticsIT extends MlNativeDataFrameAnalyticsIntegTest
}
String id = "test_outlier_detection_with_enough_docs_to_scroll";
DataFrameAnalyticsConfig config = buildOutlierDetectionAnalytics(id, sourceIndex, "custom_ml");
DataFrameAnalyticsConfig config = buildOutlierDetectionAnalytics(
id, new String[] {sourceIndex}, sourceIndex + "-results", "custom_ml");
registerAnalytics(config);
putAnalytics(config);
@ -188,7 +189,7 @@ public class RunDataFrameAnalyticsIT extends MlNativeDataFrameAnalyticsIntegTest
}
String id = "test_outlier_detection_with_more_fields_than_docvalue_limit";
DataFrameAnalyticsConfig config = buildOutlierDetectionAnalytics(id, sourceIndex, null);
DataFrameAnalyticsConfig config = buildOutlierDetectionAnalytics(id, new String[] {sourceIndex}, sourceIndex + "-results", null);
registerAnalytics(config);
putAnalytics(config);
@ -216,7 +217,7 @@ public class RunDataFrameAnalyticsIT extends MlNativeDataFrameAnalyticsIntegTest
}
public void testStopOutlierDetectionWithEnoughDocumentsToScroll() {
String sourceIndex = "test-outlier-detection-with-enough-docs-to-scroll";
String sourceIndex = "test-stop-outlier-detection-with-enough-docs-to-scroll";
client().admin().indices().prepareCreate(sourceIndex)
.addMapping("_doc", "numeric_1", "type=double", "numeric_2", "type=float", "categorical_1", "type=keyword")
@ -236,8 +237,9 @@ public class RunDataFrameAnalyticsIT extends MlNativeDataFrameAnalyticsIntegTest
fail("Failed to index data: " + bulkResponse.buildFailureMessage());
}
String id = "test_outlier_detection_with_enough_docs_to_scroll";
DataFrameAnalyticsConfig config = buildOutlierDetectionAnalytics(id, sourceIndex, "custom_ml");
String id = "test_stop_outlier_detection_with_enough_docs_to_scroll";
DataFrameAnalyticsConfig config = buildOutlierDetectionAnalytics(
id, new String[] {sourceIndex}, sourceIndex + "-results", "custom_ml");
registerAnalytics(config);
putAnalytics(config);
@ -264,10 +266,107 @@ public class RunDataFrameAnalyticsIT extends MlNativeDataFrameAnalyticsIntegTest
}
}
private static DataFrameAnalyticsConfig buildOutlierDetectionAnalytics(String id, String sourceIndex, @Nullable String resultsField) {
public void testOutlierDetectionWithMultipleSourceIndices() throws Exception {
String sourceIndex1 = "test-outlier-detection-with-multiple-source-indices-1";
String sourceIndex2 = "test-outlier-detection-with-multiple-source-indices-2";
String destIndex = "test-outlier-detection-with-multiple-source-indices-results";
String[] sourceIndex = new String[] { sourceIndex1, sourceIndex2 };
client().admin().indices().prepareCreate(sourceIndex1)
.addMapping("_doc", "numeric_1", "type=double", "numeric_2", "type=float", "categorical_1", "type=keyword")
.get();
client().admin().indices().prepareCreate(sourceIndex2)
.addMapping("_doc", "numeric_1", "type=double", "numeric_2", "type=float", "categorical_1", "type=keyword")
.get();
BulkRequestBuilder bulkRequestBuilder = client().prepareBulk();
bulkRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
for (String index : sourceIndex) {
for (int i = 0; i < 5; i++) {
IndexRequest indexRequest = new IndexRequest(index);
indexRequest.source("numeric_1", randomDouble(), "numeric_2", randomFloat(), "categorical_1", "foo_" + i);
bulkRequestBuilder.add(indexRequest);
}
}
BulkResponse bulkResponse = bulkRequestBuilder.get();
if (bulkResponse.hasFailures()) {
fail("Failed to index data: " + bulkResponse.buildFailureMessage());
}
String id = "test_outlier_detection_with_multiple_source_indices";
DataFrameAnalyticsConfig config = buildOutlierDetectionAnalytics(id, sourceIndex, destIndex, null);
registerAnalytics(config);
putAnalytics(config);
assertState(id, DataFrameAnalyticsState.STOPPED);
startAnalytics(id);
waitUntilAnalyticsIsStopped(id);
// Check we've got all docs
SearchResponse searchResponse = client().prepareSearch(config.getDest().getIndex()).setTrackTotalHits(true).get();
assertThat(searchResponse.getHits().getTotalHits().value, equalTo((long) bulkRequestBuilder.numberOfActions()));
// Check they all have an outlier_score
searchResponse = client().prepareSearch(config.getDest().getIndex())
.setTrackTotalHits(true)
.setQuery(QueryBuilders.existsQuery("ml.outlier_score")).get();
assertThat(searchResponse.getHits().getTotalHits().value, equalTo((long) bulkRequestBuilder.numberOfActions()));
}
public void testOutlierDetectionWithPreExistingDestIndex() throws Exception {
String sourceIndex = "test-outlier-detection-with-pre-existing-dest-index";
String destIndex = "test-outlier-detection-with-pre-existing-dest-index-results";
client().admin().indices().prepareCreate(sourceIndex)
.addMapping("_doc", "numeric_1", "type=double", "numeric_2", "type=float", "categorical_1", "type=keyword")
.get();
client().admin().indices().prepareCreate(destIndex)
.addMapping("_doc", "numeric_1", "type=double", "numeric_2", "type=float", "categorical_1", "type=keyword")
.get();
BulkRequestBuilder bulkRequestBuilder = client().prepareBulk();
bulkRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
for (int i = 0; i < 5; i++) {
IndexRequest indexRequest = new IndexRequest(sourceIndex);
indexRequest.source("numeric_1", randomDouble(), "numeric_2", randomFloat(), "categorical_1", "foo_" + i);
bulkRequestBuilder.add(indexRequest);
}
BulkResponse bulkResponse = bulkRequestBuilder.get();
if (bulkResponse.hasFailures()) {
fail("Failed to index data: " + bulkResponse.buildFailureMessage());
}
String id = "test_outlier_detection_with_pre_existing_dest_index";
DataFrameAnalyticsConfig config = buildOutlierDetectionAnalytics(id, new String[] {sourceIndex}, destIndex, null);
registerAnalytics(config);
putAnalytics(config);
assertState(id, DataFrameAnalyticsState.STOPPED);
startAnalytics(id);
waitUntilAnalyticsIsStopped(id);
// Check we've got all docs
SearchResponse searchResponse = client().prepareSearch(config.getDest().getIndex()).setTrackTotalHits(true).get();
assertThat(searchResponse.getHits().getTotalHits().value, equalTo((long) bulkRequestBuilder.numberOfActions()));
// Check they all have an outlier_score
searchResponse = client().prepareSearch(config.getDest().getIndex())
.setTrackTotalHits(true)
.setQuery(QueryBuilders.existsQuery("ml.outlier_score")).get();
assertThat(searchResponse.getHits().getTotalHits().value, equalTo((long) bulkRequestBuilder.numberOfActions()));
}
private static DataFrameAnalyticsConfig buildOutlierDetectionAnalytics(String id, String[] sourceIndex, String destIndex,
@Nullable String resultsField) {
DataFrameAnalyticsConfig.Builder configBuilder = new DataFrameAnalyticsConfig.Builder(id);
configBuilder.setSource(new DataFrameAnalyticsSource(sourceIndex, null));
configBuilder.setDest(new DataFrameAnalyticsDest(sourceIndex + "-results", resultsField));
configBuilder.setDest(new DataFrameAnalyticsDest(destIndex, resultsField));
configBuilder.setAnalysis(new OutlierDetection());
return configBuilder.build();
}

View File

@ -519,7 +519,7 @@ public class MachineLearning extends Plugin implements ActionPlugin, AnalysisPlu
AnalyticsProcessManager analyticsProcessManager = new AnalyticsProcessManager(client, threadPool, analyticsProcessFactory);
DataFrameAnalyticsConfigProvider dataFrameAnalyticsConfigProvider = new DataFrameAnalyticsConfigProvider(client);
assert client instanceof NodeClient;
DataFrameAnalyticsManager dataFrameAnalyticsManager = new DataFrameAnalyticsManager(clusterService, (NodeClient) client,
DataFrameAnalyticsManager dataFrameAnalyticsManager = new DataFrameAnalyticsManager((NodeClient) client,
dataFrameAnalyticsConfigProvider, analyticsProcessManager);
this.dataFrameAnalyticsManager.set(dataFrameAnalyticsManager);

View File

@ -51,6 +51,7 @@ import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsTaskState;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.dataframe.DataFrameAnalyticsManager;
import org.elasticsearch.xpack.ml.dataframe.MappingsMerger;
import org.elasticsearch.xpack.ml.dataframe.SourceDestValidator;
import org.elasticsearch.xpack.ml.dataframe.extractor.DataFrameDataExtractorFactory;
import org.elasticsearch.xpack.ml.dataframe.persistence.DataFrameAnalyticsConfigProvider;
@ -157,23 +158,35 @@ public class TransportStartDataFrameAnalyticsAction
);
// Tell the job tracker to refresh the memory requirement for this job and all other jobs that have persistent tasks
ActionListener<DataFrameAnalyticsConfig> validateListener = ActionListener.wrap(
ActionListener<DataFrameAnalyticsConfig> configListener = ActionListener.wrap(
config -> memoryTracker.addDataFrameAnalyticsJobMemoryAndRefreshAllOthers(
request.getId(), config.getModelMemoryLimit().getBytes(), memoryRequirementRefreshListener),
listener::onFailure
);
// Validate config
ActionListener<DataFrameAnalyticsConfig> configListener = ActionListener.wrap(
config -> {
new SourceDestValidator(clusterService.state(), indexNameExpressionResolver).check(config);
DataFrameDataExtractorFactory.validateConfigAndSourceIndex(client, config, validateListener);
},
listener::onFailure
// Get config
getConfigAndValidate(request.getId(), configListener);
}
private void getConfigAndValidate(String id, ActionListener<DataFrameAnalyticsConfig> finalListener) {
// Validate mappings can be merged
ActionListener<DataFrameAnalyticsConfig> firstValidationListener = ActionListener.wrap(
config -> MappingsMerger.mergeMappings(client, config.getHeaders(), config.getSource().getIndex(), ActionListener.wrap(
mappings -> finalListener.onResponse(config), finalListener::onFailure)),
finalListener::onFailure
);
// Get config
configProvider.get(request.getId(), configListener);
// Validate source and dest; check data extraction is possible
ActionListener<DataFrameAnalyticsConfig> getConfigListener = ActionListener.wrap(
config -> {
new SourceDestValidator(clusterService.state(), indexNameExpressionResolver).check(config);
DataFrameDataExtractorFactory.validateConfigAndSourceIndex(client, config, firstValidationListener);
},
finalListener::onFailure
);
// First, get the config
configProvider.get(id, getConfigListener);
}
private void waitForAnalyticsStarted(PersistentTasksCustomMetaData.PersistentTask<StartDataFrameAnalyticsAction.TaskParams> task,

View File

@ -10,23 +10,35 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.create.CreateIndexAction;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.get.GetIndexResponse;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingAction;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsAction;
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsRequest;
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.IndexSortConfig;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.xpack.core.ClientHelper;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig;
import java.time.Clock;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN;
/**
* {@link DataFrameAnalyticsIndex} class encapsulates logic for creating destination index based on source index metadata.
*/
@ -36,57 +48,103 @@ final class DataFrameAnalyticsIndex {
private static final String META = "_meta";
/**
* Unfortunately, getting the settings of an index include internal settings that should
* not be set explicitly. There is no way to filter those out. Thus, we have to maintain
* a list of them and filter them out manually.
* We only preserve the most important settings.
* If the user needs other settings on the destination index they
* should create the destination index before starting the analytics.
*/
private static final List<String> INTERNAL_SETTINGS = Arrays.asList(
"index.creation_date",
"index.provided_name",
"index.uuid",
"index.version.created",
"index.version.upgraded"
);
private static final String[] PRESERVED_SETTINGS = new String[] {"index.number_of_shards", "index.number_of_replicas"};
private DataFrameAnalyticsIndex() {}
/**
* Creates destination index based on source index metadata.
*/
public static void createDestinationIndex(Client client,
Clock clock,
ClusterState clusterState,
DataFrameAnalyticsConfig analyticsConfig,
ActionListener<CreateIndexResponse> listener) {
String sourceIndex = analyticsConfig.getSource().getIndex();
Map<String, String> headers = analyticsConfig.getHeaders();
IndexMetaData sourceIndexMetaData = clusterState.getMetaData().getIndices().get(sourceIndex);
if (sourceIndexMetaData == null) {
listener.onFailure(new IndexNotFoundException(sourceIndex));
return;
}
CreateIndexRequest createIndexRequest =
prepareCreateIndexRequest(sourceIndexMetaData, analyticsConfig.getDest().getIndex(), analyticsConfig.getId(), clock);
ClientHelper.executeWithHeadersAsync(
headers, ClientHelper.ML_ORIGIN, client, CreateIndexAction.INSTANCE, createIndexRequest, listener);
ActionListener<CreateIndexRequest> createIndexRequestListener = ActionListener.wrap(
createIndexRequest -> ClientHelper.executeWithHeadersAsync(analyticsConfig.getHeaders(), ClientHelper.ML_ORIGIN, client,
CreateIndexAction.INSTANCE, createIndexRequest, listener),
listener::onFailure
);
prepareCreateIndexRequest(client, clock, analyticsConfig, createIndexRequestListener);
}
private static CreateIndexRequest prepareCreateIndexRequest(IndexMetaData sourceIndexMetaData,
String destinationIndex,
String analyticsId,
Clock clock) {
// Settings
Settings.Builder settingsBuilder = Settings.builder().put(sourceIndexMetaData.getSettings());
INTERNAL_SETTINGS.forEach(settingsBuilder::remove);
private static void prepareCreateIndexRequest(Client client, Clock clock, DataFrameAnalyticsConfig config,
ActionListener<CreateIndexRequest> listener) {
AtomicReference<Settings> settingsHolder = new AtomicReference<>();
String[] sourceIndex = config.getSource().getIndex();
ActionListener<ImmutableOpenMap<String, MappingMetaData>> mappingsListener = ActionListener.wrap(
mappings -> listener.onResponse(createIndexRequest(clock, config, settingsHolder.get(), mappings)),
listener::onFailure
);
ActionListener<Settings> settingsListener = ActionListener.wrap(
settings -> {
settingsHolder.set(settings);
MappingsMerger.mergeMappings(client, config.getHeaders(), sourceIndex, mappingsListener);
},
listener::onFailure
);
ActionListener<GetSettingsResponse> getSettingsResponseListener = ActionListener.wrap(
settingsResponse -> settingsListener.onResponse(settings(settingsResponse)),
listener::onFailure
);
GetSettingsRequest getSettingsRequest = new GetSettingsRequest();
getSettingsRequest.indices(sourceIndex);
getSettingsRequest.indicesOptions(IndicesOptions.lenientExpandOpen());
getSettingsRequest.names(PRESERVED_SETTINGS);
ClientHelper.executeWithHeadersAsync(config.getHeaders(), ML_ORIGIN, client, GetSettingsAction.INSTANCE,
getSettingsRequest, getSettingsResponseListener);
}
private static CreateIndexRequest createIndexRequest(Clock clock, DataFrameAnalyticsConfig config, Settings settings,
ImmutableOpenMap<String, MappingMetaData> mappings) {
// There should only be 1 type
assert mappings.size() == 1;
String destinationIndex = config.getDest().getIndex();
String type = mappings.keysIt().next();
Map<String, Object> mappingsAsMap = mappings.valuesIt().next().sourceAsMap();
addProperties(mappingsAsMap);
addMetaData(mappingsAsMap, config.getId(), clock);
return new CreateIndexRequest(destinationIndex, settings).mapping(type, mappingsAsMap);
}
private static Settings settings(GetSettingsResponse settingsResponse) {
Integer maxNumberOfShards = findMaxSettingValue(settingsResponse, IndexMetaData.SETTING_NUMBER_OF_SHARDS);
Integer maxNumberOfReplicas = findMaxSettingValue(settingsResponse, IndexMetaData.SETTING_NUMBER_OF_REPLICAS);
Settings.Builder settingsBuilder = Settings.builder();
settingsBuilder.put(IndexSortConfig.INDEX_SORT_FIELD_SETTING.getKey(), DataFrameAnalyticsFields.ID);
settingsBuilder.put(IndexSortConfig.INDEX_SORT_ORDER_SETTING.getKey(), SortOrder.ASC);
Settings settings = settingsBuilder.build();
if (maxNumberOfShards != null) {
settingsBuilder.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, maxNumberOfShards);
}
if (maxNumberOfReplicas != null) {
settingsBuilder.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, maxNumberOfReplicas);
}
return settingsBuilder.build();
}
// Mappings
String singleMappingType = sourceIndexMetaData.getMappings().keysIt().next();
Map<String, Object> mappingsAsMap = sourceIndexMetaData.getMappings().valuesIt().next().sourceAsMap();
addProperties(mappingsAsMap);
addMetaData(mappingsAsMap, analyticsId, clock);
return new CreateIndexRequest(destinationIndex, settings).mapping(singleMappingType, mappingsAsMap);
@Nullable
private static Integer findMaxSettingValue(GetSettingsResponse settingsResponse, String settingKey) {
Integer maxValue = null;
Iterator<Settings> settingsIterator = settingsResponse.getIndexToSettings().valuesIt();
while (settingsIterator.hasNext()) {
Settings settings = settingsIterator.next();
Integer indexValue = settings.getAsInt(settingKey, null);
if (indexValue != null) {
maxValue = maxValue == null ? indexValue : Math.max(indexValue, maxValue);
}
}
return maxValue;
}
private static void addProperties(Map<String, Object> mappingsAsMap) {
@ -115,6 +173,22 @@ final class DataFrameAnalyticsIndex {
return value;
}
private DataFrameAnalyticsIndex() {}
/**
 * Adds the internal id-copy field mapping to an already existing destination index,
 * keeping whatever mapping type the destination index currently uses.
 */
public static void updateMappingsToDestIndex(Client client, DataFrameAnalyticsConfig analyticsConfig, GetIndexResponse getIndexResponse,
                                             ActionListener<AcknowledgedResponse> listener) {
    // We have validated the destination index should match a single index
    assert getIndexResponse.indices().length == 1;

    // Reuse the mapping type the destination index already has
    ImmutableOpenMap<String, MappingMetaData> destMappings = getIndexResponse.getMappings().get(getIndexResponse.indices()[0]);
    String mappingType = destMappings.keysIt().next();

    // Only the id-copy keyword field is added on top of the existing mappings
    Map<String, Object> mappingsToAdd = Collections.singletonMap(PROPERTIES,
        Collections.singletonMap(DataFrameAnalyticsFields.ID, Collections.singletonMap("type", "keyword")));

    PutMappingRequest putMappingRequest = new PutMappingRequest(getIndexResponse.indices());
    putMappingRequest.type(mappingType);
    putMappingRequest.source(mappingsToAdd);

    ClientHelper.executeWithHeadersAsync(analyticsConfig.getHeaders(), ML_ORIGIN, client, PutMappingAction.INSTANCE,
        putMappingRequest, listener);
}
}

View File

@ -5,16 +5,20 @@
*/
package org.elasticsearch.xpack.ml.dataframe;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexAction;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.get.GetIndexAction;
import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
import org.elasticsearch.action.admin.indices.get.GetIndexResponse;
import org.elasticsearch.action.admin.indices.refresh.RefreshAction;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
import org.elasticsearch.action.support.ContextPreservingActionListener;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
@ -40,17 +44,17 @@ import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN;
public class DataFrameAnalyticsManager {
private final ClusterService clusterService;
private static final Logger LOGGER = LogManager.getLogger(DataFrameAnalyticsManager.class);
/**
* We need a {@link NodeClient} to be get the reindexing task and be able to report progress
* We need a {@link NodeClient} to get the reindexing task and be able to report progress
*/
private final NodeClient client;
private final DataFrameAnalyticsConfigProvider configProvider;
private final AnalyticsProcessManager processManager;
public DataFrameAnalyticsManager(ClusterService clusterService, NodeClient client, DataFrameAnalyticsConfigProvider configProvider,
public DataFrameAnalyticsManager(NodeClient client, DataFrameAnalyticsConfigProvider configProvider,
AnalyticsProcessManager processManager) {
this.clusterService = Objects.requireNonNull(clusterService);
this.client = Objects.requireNonNull(client);
this.configProvider = Objects.requireNonNull(configProvider);
this.processManager = Objects.requireNonNull(processManager);
@ -77,7 +81,6 @@ public class DataFrameAnalyticsManager {
break;
// The task has fully reindexed the documents and we should continue on with our analyses
case ANALYZING:
// TODO apply previously stored model state if applicable
startAnalytics(task, config, true);
break;
// If we are already at REINDEXING, we are not 100% sure if we reindexed ALL the docs.
@ -160,7 +163,27 @@ public class DataFrameAnalyticsManager {
reindexCompletedListener::onFailure
);
DataFrameAnalyticsIndex.createDestinationIndex(client, Clock.systemUTC(), clusterService.state(), config, copyIndexCreatedListener);
// Create destination index if it does not exist
ActionListener<GetIndexResponse> destIndexListener = ActionListener.wrap(
indexResponse -> {
LOGGER.info("[{}] Using existing destination index [{}]", config.getId(), indexResponse.indices()[0]);
DataFrameAnalyticsIndex.updateMappingsToDestIndex(client, config, indexResponse, ActionListener.wrap(
acknowledgedResponse -> copyIndexCreatedListener.onResponse(null),
copyIndexCreatedListener::onFailure
));
},
e -> {
if (org.elasticsearch.ExceptionsHelper.unwrapCause(e) instanceof IndexNotFoundException) {
LOGGER.info("[{}] Creating destination index [{}]", config.getId(), config.getDest().getIndex());
DataFrameAnalyticsIndex.createDestinationIndex(client, Clock.systemUTC(), config, copyIndexCreatedListener);
} else {
copyIndexCreatedListener.onFailure(e);
}
}
);
ClientHelper.executeWithHeadersAsync(config.getHeaders(), ML_ORIGIN, client, GetIndexAction.INSTANCE,
new GetIndexRequest().indices(config.getDest().getIndex()), destIndexListener);
}
private void startAnalytics(DataFrameAnalyticsTask task, DataFrameAnalyticsConfig config, boolean isTaskRestarting) {

View File

@ -0,0 +1,100 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.dataframe;
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsAction;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequest;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.xpack.core.ClientHelper;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN;
/**
* Merges mappings in a best effort and naive manner.
* The merge will fail if there is any conflict, i.e. the mappings of a field are not exactly the same.
*/
/**
 * Merges mappings in a best effort and naive manner.
 * The merge will fail if there is any conflict, i.e. the mappings of a field are not exactly the same.
 */
public final class MappingsMerger {

    // Utility class; not meant to be instantiated.
    private MappingsMerger() {}

    /**
     * Fetches the mappings of the given source indices and notifies the listener with a
     * single merged mapping, or with the failure if the mappings cannot be merged.
     *
     * @param client   client used to issue the get-mappings request
     * @param headers  security headers the request is executed with
     * @param index    source index names/patterns whose mappings are merged
     * @param listener notified with the merged mappings keyed by mapping type
     */
    public static void mergeMappings(Client client, Map<String, String> headers, String[] index,
                                     ActionListener<ImmutableOpenMap<String, MappingMetaData>> listener) {
        ActionListener<GetMappingsResponse> mappingsListener = ActionListener.wrap(
            getMappingsResponse -> listener.onResponse(MappingsMerger.mergeMappings(getMappingsResponse)),
            listener::onFailure
        );

        GetMappingsRequest getMappingsRequest = new GetMappingsRequest();
        getMappingsRequest.indices(index);
        ClientHelper.executeWithHeadersAsync(headers, ML_ORIGIN, client, GetMappingsAction.INSTANCE, getMappingsRequest, mappingsListener);
    }

    /**
     * Merges the per-index mappings of the response into a single mapping.
     * A field may appear in several indices but only with an identical mapping.
     *
     * @throws org.elasticsearch.ElasticsearchStatusException (400) if the indices use
     *         different mapping types, a field is mapped differently across indices,
     *         or the response contains no mappings at all
     */
    static ImmutableOpenMap<String, MappingMetaData> mergeMappings(GetMappingsResponse getMappingsResponse) {
        ImmutableOpenMap<String, ImmutableOpenMap<String, MappingMetaData>> indexToMappings = getMappingsResponse.getMappings();

        String type = null;
        Map<String, Object> mergedMappings = new HashMap<>();

        Iterator<ObjectObjectCursor<String, ImmutableOpenMap<String, MappingMetaData>>> iterator = indexToMappings.iterator();
        while (iterator.hasNext()) {
            ObjectObjectCursor<String, ImmutableOpenMap<String, MappingMetaData>> indexMappings = iterator.next();
            Iterator<ObjectObjectCursor<String, MappingMetaData>> typeIterator = indexMappings.value.iterator();
            while (typeIterator.hasNext()) {
                ObjectObjectCursor<String, MappingMetaData> typeMapping = typeIterator.next();
                if (type == null) {
                    type = typeMapping.key;
                } else if (type.equals(typeMapping.key) == false) {
                    throw ExceptionsHelper.badRequestException("source indices contain mappings for different types: [{}, {}]",
                        type, typeMapping.key);
                }
                Map<String, Object> currentMappings = typeMapping.value.getSourceAsMap();
                if (currentMappings.containsKey("properties")) {
                    @SuppressWarnings("unchecked")
                    Map<String, Object> fieldMappings = (Map<String, Object>) currentMappings.get("properties");
                    for (Map.Entry<String, Object> fieldMapping : fieldMappings.entrySet()) {
                        Object existingMapping = mergedMappings.putIfAbsent(fieldMapping.getKey(), fieldMapping.getValue());
                        if (existingMapping != null && existingMapping.equals(fieldMapping.getValue()) == false) {
                            throw ExceptionsHelper.badRequestException("cannot merge mappings because of differences for field [{}]",
                                fieldMapping.getKey());
                        }
                    }
                }
            }
        }

        if (type == null) {
            // Guard against an empty response; previously this would have failed with an
            // obscure NPE when building the mapping metadata with a null type
            throw ExceptionsHelper.badRequestException("source indices contain no mappings");
        }

        MappingMetaData mappingMetaData = createMappingMetaData(type, mergedMappings);
        ImmutableOpenMap.Builder<String, MappingMetaData> result = ImmutableOpenMap.builder();
        result.put(type, mappingMetaData);
        return result.build();
    }

    private static MappingMetaData createMappingMetaData(String type, Map<String, Object> mappings) {
        try {
            return new MappingMetaData(type, Collections.singletonMap("properties", mappings));
        } catch (IOException e) {
            // Chain the underlying cause instead of swallowing it, so the real parse
            // failure is visible in the server error
            throw ExceptionsHelper.serverError("Failed to parse mappings: " + mappings, e);
        }
    }
}

View File

@ -29,10 +29,13 @@ public class SourceDestValidator {
}
public void check(DataFrameAnalyticsConfig config) {
String sourceIndex = config.getSource().getIndex();
String[] sourceIndex = config.getSource().getIndex();
String destIndex = config.getDest().getIndex();
String[] sourceExpressions = Strings.tokenizeToStringArray(sourceIndex, ",");
String[] sourceExpressions = Arrays.stream(sourceIndex)
.map(index -> Strings.tokenizeToStringArray(index, ","))
.flatMap(Arrays::stream)
.toArray(String[]::new);
for (String sourceExpression : sourceExpressions) {
if (Regex.simpleMatch(sourceExpression, destIndex)) {
@ -45,7 +48,7 @@ public class SourceDestValidator {
IndicesOptions.lenientExpandOpen(), sourceExpressions)));
if (concreteSourceIndexNames.isEmpty()) {
throw ExceptionsHelper.badRequestException("No index matches source index [{}]", sourceIndex);
throw ExceptionsHelper.badRequestException("No index matches source index {}", Arrays.toString(sourceIndex));
}
final String[] concreteDestIndexNames = indexNameExpressionResolver.concreteIndexNames(clusterState,
@ -59,7 +62,7 @@ public class SourceDestValidator {
if (concreteDestIndexNames.length == 1 && concreteSourceIndexNames.contains(concreteDestIndexNames[0])) {
// In case the dest index is an alias, we need to check the concrete index is not matched by source
throw ExceptionsHelper.badRequestException("Destination index [{}], which is an alias for [{}], " +
"must not be included in source index [{}]", destIndex, concreteDestIndexNames[0], sourceIndex);
"must not be included in source index {}", destIndex, concreteDestIndexNames[0], Arrays.toString(sourceIndex));
}
}
}

View File

@ -13,6 +13,7 @@ import org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse;
import org.elasticsearch.action.fieldcaps.FieldCapabilitiesAction;
import org.elasticsearch.action.fieldcaps.FieldCapabilitiesRequest;
import org.elasticsearch.action.fieldcaps.FieldCapabilitiesResponse;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.settings.Settings;
@ -73,7 +74,7 @@ public class DataFrameDataExtractorFactory {
DataFrameAnalyticsConfig config,
boolean isTaskRestarting,
ActionListener<DataFrameDataExtractorFactory> listener) {
validateIndexAndExtractFields(client, config.getDest().getIndex(), config, isTaskRestarting,
validateIndexAndExtractFields(client, new String[] {config.getDest().getIndex()}, config, isTaskRestarting,
ActionListener.wrap(extractedFields -> listener.onResponse(new DataFrameDataExtractorFactory(
client, config.getId(), config.getDest().getIndex(), extractedFields, config.getHeaders())),
listener::onFailure
@ -100,7 +101,7 @@ public class DataFrameDataExtractorFactory {
}
private static void validateIndexAndExtractFields(Client client,
String index,
String[] index,
DataFrameAnalyticsConfig config,
boolean isTaskRestarting,
ActionListener<ExtractedFields> listener) {
@ -120,6 +121,7 @@ public class DataFrameDataExtractorFactory {
FieldCapabilitiesRequest fieldCapabilitiesRequest = new FieldCapabilitiesRequest();
fieldCapabilitiesRequest.indices(index);
fieldCapabilitiesRequest.indicesOptions(IndicesOptions.lenientExpandOpen());
fieldCapabilitiesRequest.fields("*");
ClientHelper.executeWithHeaders(config.getHeaders(), ClientHelper.ML_ORIGIN, client, () -> {
client.execute(FieldCapabilitiesAction.INSTANCE, fieldCapabilitiesRequest, fieldCapabilitiesHandler);
@ -134,7 +136,7 @@ public class DataFrameDataExtractorFactory {
getDocValueFieldsLimit(client, index, docValueFieldsLimitListener);
}
private static void getDocValueFieldsLimit(Client client, String index, ActionListener<Integer> docValueFieldsLimitListener) {
private static void getDocValueFieldsLimit(Client client, String[] index, ActionListener<Integer> docValueFieldsLimitListener) {
ActionListener<GetSettingsResponse> settingsListener = ActionListener.wrap(getSettingsResponse -> {
Integer minDocValueFieldsLimit = Integer.MAX_VALUE;

View File

@ -55,13 +55,13 @@ public class ExtractedFieldsDetector {
COMPATIBLE_FIELD_TYPES = Collections.unmodifiableSet(compatibleTypes);
}
private final String index;
private final String[] index;
private final DataFrameAnalyticsConfig config;
private final boolean isTaskRestarting;
private final int docValueFieldsLimit;
private final FieldCapabilitiesResponse fieldCapabilitiesResponse;
ExtractedFieldsDetector(String index, DataFrameAnalyticsConfig config, boolean isTaskRestarting, int docValueFieldsLimit,
ExtractedFieldsDetector(String[] index, DataFrameAnalyticsConfig config, boolean isTaskRestarting, int docValueFieldsLimit,
FieldCapabilitiesResponse fieldCapabilitiesResponse) {
this.index = Objects.requireNonNull(index);
this.config = Objects.requireNonNull(config);
@ -74,7 +74,7 @@ public class ExtractedFieldsDetector {
Set<String> fields = new HashSet<>(fieldCapabilitiesResponse.get().keySet());
fields.removeAll(IGNORE_FIELDS);
checkResultsFieldIsNotPresent(fields, index);
checkResultsFieldIsNotPresent();
// Ignore fields under the results object
fields.removeIf(field -> field.startsWith(config.getDest().getResultsField() + "."));
@ -87,7 +87,7 @@ public class ExtractedFieldsDetector {
ExtractedFields extractedFields = ExtractedFields.build(sortedFields, Collections.emptySet(), fieldCapabilitiesResponse)
.filterFields(ExtractedField.ExtractionMethod.DOC_VALUE);
if (extractedFields.getAllFields().isEmpty()) {
throw ExceptionsHelper.badRequestException("No compatible fields could be detected in index [{}]", index);
throw ExceptionsHelper.badRequestException("No compatible fields could be detected in index {}", Arrays.toString(index));
}
if (extractedFields.getDocValueFields().size() > docValueFieldsLimit) {
extractedFields = fetchFromSourceIfSupported(extractedFields);
@ -100,11 +100,16 @@ public class ExtractedFieldsDetector {
return extractedFields;
}
private void checkResultsFieldIsNotPresent(Set<String> fields, String index) {
private void checkResultsFieldIsNotPresent() {
// If the task is restarting we do not mind the index containing the results field, we will overwrite all docs
if (isTaskRestarting == false && fields.contains(config.getDest().getResultsField())) {
throw ExceptionsHelper.badRequestException("Index [{}] already has a field that matches the {}.{} [{}];" +
" please set a different {}", index, DataFrameAnalyticsConfig.DEST.getPreferredName(),
if (isTaskRestarting) {
return;
}
Map<String, FieldCapabilities> indexToFieldCaps = fieldCapabilitiesResponse.getField(config.getDest().getResultsField());
if (indexToFieldCaps != null && indexToFieldCaps.isEmpty() == false) {
throw ExceptionsHelper.badRequestException("A field that matches the {}.{} [{}] already exists;" +
" please set a different {}", DataFrameAnalyticsConfig.DEST.getPreferredName(),
DataFrameAnalyticsDest.RESULTS_FIELD.getPreferredName(), config.getDest().getResultsField(),
DataFrameAnalyticsDest.RESULTS_FIELD.getPreferredName());
}
@ -121,7 +126,7 @@ public class ExtractedFieldsDetector {
}
}
private void includeAndExcludeFields(Set<String> fields, String index) {
private void includeAndExcludeFields(Set<String> fields, String[] index) {
FetchSourceContext analyzedFields = config.getAnalyzedFields();
if (analyzedFields == null) {
return;
@ -136,12 +141,14 @@ public class ExtractedFieldsDetector {
// If the inclusion set does not match anything, that means the user's desired fields cannot be found in
// the collection of supported field types. We should let the user know.
Set<String> includedSet = NameResolver.newUnaliased(fields,
(ex) -> new ResourceNotFoundException(Messages.getMessage(Messages.DATA_FRAME_ANALYTICS_BAD_FIELD_FILTER, index, ex)))
(ex) -> new ResourceNotFoundException(
Messages.getMessage(Messages.DATA_FRAME_ANALYTICS_BAD_FIELD_FILTER, ex)))
.expand(includes, false);
// If the exclusion set does not match anything, that means the fields are already not present
// no need to raise if nothing matched
Set<String> excludedSet = NameResolver.newUnaliased(fields,
(ex) -> new ResourceNotFoundException(Messages.getMessage(Messages.DATA_FRAME_ANALYTICS_BAD_FIELD_FILTER, index, ex)))
(ex) -> new ResourceNotFoundException(
Messages.getMessage(Messages.DATA_FRAME_ANALYTICS_BAD_FIELD_FILTER, ex)))
.expand(excludes, true);
fields.retainAll(includedSet);

View File

@ -10,17 +10,21 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.create.CreateIndexAction;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsAction;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequest;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsAction;
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsRequest;
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig;
@ -33,27 +37,25 @@ import java.io.IOException;
import java.time.Clock;
import java.time.Instant;
import java.time.ZoneId;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import static org.elasticsearch.common.xcontent.support.XContentMapValues.extractValue;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
public class DataFrameAnalyticsIndexTests extends ESTestCase {
private static final String CLUSTER_NAME = "some-cluster-name";
private static final String ANALYTICS_ID = "some-analytics-id";
private static final String SOURCE_INDEX = "source-index";
private static final String[] SOURCE_INDEX = new String[] {"source-index"};
private static final String DEST_INDEX = "dest-index";
private static final DataFrameAnalyticsConfig ANALYTICS_CONFIG =
new DataFrameAnalyticsConfig.Builder(ANALYTICS_ID)
@ -71,6 +73,8 @@ public class DataFrameAnalyticsIndexTests extends ESTestCase {
public void testCreateDestinationIndex() throws IOException {
when(client.threadPool()).thenReturn(threadPool);
when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY));
ArgumentCaptor<CreateIndexRequest> createIndexRequestCaptor = ArgumentCaptor.forClass(CreateIndexRequest.class);
doAnswer(
invocationOnMock -> {
@SuppressWarnings("unchecked")
@ -78,60 +82,102 @@ public class DataFrameAnalyticsIndexTests extends ESTestCase {
listener.onResponse(null);
return null;
})
.when(client).execute(any(), any(), any());
.when(client).execute(eq(CreateIndexAction.INSTANCE), createIndexRequestCaptor.capture(), any());
Map<String, Object> propertiesMapping = new HashMap<>();
propertiesMapping.put("properties", new HashMap<>());
ClusterState clusterState =
ClusterState.builder(new ClusterName(CLUSTER_NAME))
.metaData(MetaData.builder()
.put(IndexMetaData.builder(SOURCE_INDEX)
.settings(Settings.builder()
Settings index1Settings = Settings.builder()
.put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0))
.putMapping(new MappingMetaData("_doc", propertiesMapping))))
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
.build();
Settings index2Settings = Settings.builder()
.put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 5)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
.build();
ArgumentCaptor<GetSettingsRequest> getSettingsRequestCaptor = ArgumentCaptor.forClass(GetSettingsRequest.class);
ArgumentCaptor<GetMappingsRequest> getMappingsRequestCaptor = ArgumentCaptor.forClass(GetMappingsRequest.class);
ImmutableOpenMap.Builder<String, Settings> indexToSettings = ImmutableOpenMap.builder();
indexToSettings.put("index_1", index1Settings);
indexToSettings.put("index_2", index2Settings);
GetSettingsResponse getSettingsResponse = new GetSettingsResponse(indexToSettings.build(), ImmutableOpenMap.of());
doAnswer(
invocationOnMock -> {
@SuppressWarnings("unchecked")
ActionListener<GetSettingsResponse> listener = (ActionListener<GetSettingsResponse>) invocationOnMock.getArguments()[2];
listener.onResponse(getSettingsResponse);
return null;
}
).when(client).execute(eq(GetSettingsAction.INSTANCE), getSettingsRequestCaptor.capture(), any());
Map<String, Object> index1Properties = new HashMap<>();
index1Properties.put("field_1", "field_1_mappings");
index1Properties.put("field_2", "field_2_mappings");
Map<String, Object> index1Mappings = Collections.singletonMap("properties", index1Properties);
MappingMetaData index1MappingMetaData = new MappingMetaData("_doc", index1Mappings);
Map<String, Object> index2Properties = new HashMap<>();
index2Properties.put("field_1", "field_1_mappings");
index2Properties.put("field_2", "field_2_mappings");
Map<String, Object> index2Mappings = Collections.singletonMap("properties", index2Properties);
MappingMetaData index2MappingMetaData = new MappingMetaData("_doc", index2Mappings);
ImmutableOpenMap.Builder<String, MappingMetaData> index1MappingsMap = ImmutableOpenMap.builder();
index1MappingsMap.put("_doc", index1MappingMetaData);
ImmutableOpenMap.Builder<String, MappingMetaData> index2MappingsMap = ImmutableOpenMap.builder();
index2MappingsMap.put("_doc", index2MappingMetaData);
ImmutableOpenMap.Builder<String, ImmutableOpenMap<String, MappingMetaData>> mappings = ImmutableOpenMap.builder();
mappings.put("index_1", index1MappingsMap.build());
mappings.put("index_2", index2MappingsMap.build());
GetMappingsResponse getMappingsResponse = new GetMappingsResponse(mappings.build());
doAnswer(
invocationOnMock -> {
@SuppressWarnings("unchecked")
ActionListener<GetMappingsResponse> listener = (ActionListener<GetMappingsResponse>) invocationOnMock.getArguments()[2];
listener.onResponse(getMappingsResponse);
return null;
}
).when(client).execute(eq(GetMappingsAction.INSTANCE), getMappingsRequestCaptor.capture(), any());
DataFrameAnalyticsIndex.createDestinationIndex(
client,
clock,
clusterState,
ANALYTICS_CONFIG,
ActionListener.wrap(
response -> {},
e -> fail(e.getMessage())));
ArgumentCaptor<CreateIndexRequest> createIndexRequestCaptor = ArgumentCaptor.forClass(CreateIndexRequest.class);
verify(client, atLeastOnce()).threadPool();
verify(client).execute(eq(CreateIndexAction.INSTANCE), createIndexRequestCaptor.capture(), any());
verifyNoMoreInteractions(client);
GetSettingsRequest capturedGetSettingsRequest = getSettingsRequestCaptor.getValue();
assertThat(capturedGetSettingsRequest.indices(), equalTo(SOURCE_INDEX));
assertThat(capturedGetSettingsRequest.indicesOptions(), equalTo(IndicesOptions.lenientExpandOpen()));
assertThat(Arrays.asList(capturedGetSettingsRequest.names()), contains("index.number_of_shards", "index.number_of_replicas"));
assertThat(getMappingsRequestCaptor.getValue().indices(), equalTo(SOURCE_INDEX));
CreateIndexRequest createIndexRequest = createIndexRequestCaptor.getValue();
assertThat(createIndexRequest.settings().keySet(),
containsInAnyOrder("index.number_of_shards", "index.number_of_replicas", "index.sort.field", "index.sort.order"));
assertThat(createIndexRequest.settings().getAsInt("index.number_of_shards", -1), equalTo(5));
assertThat(createIndexRequest.settings().getAsInt("index.number_of_replicas", -1), equalTo(1));
assertThat(createIndexRequest.settings().get("index.sort.field"), equalTo("_id_copy"));
assertThat(createIndexRequest.settings().get("index.sort.order"), equalTo("asc"));
try (XContentParser parser = createParser(JsonXContent.jsonXContent, createIndexRequest.mappings().get("_doc"))) {
Map<String, Object> map = parser.map();
assertThat(extractValue("_doc.properties._id_copy.type", map), equalTo("keyword"));
assertThat(extractValue("_doc.properties.field_1", map), equalTo("field_1_mappings"));
assertThat(extractValue("_doc.properties.field_2", map), equalTo("field_2_mappings"));
assertThat(extractValue("_doc._meta.analytics", map), equalTo(ANALYTICS_ID));
assertThat(extractValue("_doc._meta.creation_date_in_millis", map), equalTo(CURRENT_TIME_MILLIS));
assertThat(extractValue("_doc._meta.created_by", map), equalTo(CREATED_BY));
}
}
public void testCreateDestinationIndex_IndexNotFound() {
ClusterState clusterState =
ClusterState.builder(new ClusterName(CLUSTER_NAME))
.metaData(MetaData.builder())
.build();
DataFrameAnalyticsIndex.createDestinationIndex(
client,
clock,
clusterState,
ANALYTICS_CONFIG,
ActionListener.wrap(
response -> fail("IndexNotFoundException should be thrown"),
e -> {
assertThat(e, instanceOf(IndexNotFoundException.class));
IndexNotFoundException infe = (IndexNotFoundException) e;
assertThat(infe.getIndex().getName(), equalTo(SOURCE_INDEX));
}));
}
}

View File

@ -0,0 +1,153 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.dataframe;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.test.ESTestCase;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
public class MappingsMergerTests extends ESTestCase {
public void testMergeMappings_GivenIndicesWithIdenticalMappings() throws IOException {
    // Two indices whose "properties" sections are exactly the same should merge cleanly
    Map<String, Object> sharedProperties = new HashMap<>();
    sharedProperties.put("field_1", "field_1_mappings");
    sharedProperties.put("field_2", "field_2_mappings");
    Map<String, Object> sharedMappings = Collections.singletonMap("properties", sharedProperties);

    ImmutableOpenMap.Builder<String, ImmutableOpenMap<String, MappingMetaData>> indexToMappings = ImmutableOpenMap.builder();
    for (String indexName : new String[] {"index_1", "index_2"}) {
        ImmutableOpenMap.Builder<String, MappingMetaData> typeToMapping = ImmutableOpenMap.builder();
        typeToMapping.put("_doc", new MappingMetaData("_doc", sharedMappings));
        indexToMappings.put(indexName, typeToMapping.build());
    }

    GetMappingsResponse getMappingsResponse = new GetMappingsResponse(indexToMappings.build());

    ImmutableOpenMap<String, MappingMetaData> mergedMappings = MappingsMerger.mergeMappings(getMappingsResponse);

    assertThat(mergedMappings.size(), equalTo(1));
    assertThat(mergedMappings.containsKey("_doc"), is(true));
    assertThat(mergedMappings.valuesIt().next().getSourceAsMap(), equalTo(sharedMappings));
}
public void testMergeMappings_GivenIndicesWithDifferentTypes() throws IOException {
    // Identical field mappings but different mapping types across indices -> merge must be rejected
    Map<String, Object> mappingsSource = Collections.singletonMap("properties",
        Collections.singletonMap("field_1", "field_1_mappings"));

    ImmutableOpenMap.Builder<String, MappingMetaData> type1Mappings = ImmutableOpenMap.builder();
    type1Mappings.put("type_1", new MappingMetaData("type_1", mappingsSource));

    ImmutableOpenMap.Builder<String, MappingMetaData> type2Mappings = ImmutableOpenMap.builder();
    type2Mappings.put("type_2", new MappingMetaData("type_2", mappingsSource));

    ImmutableOpenMap.Builder<String, ImmutableOpenMap<String, MappingMetaData>> indexToMappings = ImmutableOpenMap.builder();
    indexToMappings.put("index_1", type1Mappings.build());
    indexToMappings.put("index_2", type2Mappings.build());

    GetMappingsResponse getMappingsResponse = new GetMappingsResponse(indexToMappings.build());

    ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class,
        () -> MappingsMerger.mergeMappings(getMappingsResponse));

    assertThat(e.status(), equalTo(RestStatus.BAD_REQUEST));
    assertThat(e.getMessage(), containsString("source indices contain mappings for different types:"));
    assertThat(e.getMessage(), containsString("type_1"));
    assertThat(e.getMessage(), containsString("type_2"));
}
public void testMergeMappings_GivenFieldWithDifferentMapping() throws IOException {
    // Same type and same field name, but the two indices disagree on the field's mapping.
    Map<String, Object> index1Mappings = Collections.singletonMap("properties",
        Collections.singletonMap("field_1", "field_1_mappings"));
    Map<String, Object> index2Mappings = Collections.singletonMap("properties",
        Collections.singletonMap("field_1", "different_field_1_mappings"));

    ImmutableOpenMap.Builder<String, MappingMetaData> index1TypeToMapping = ImmutableOpenMap.builder();
    index1TypeToMapping.put("_doc", new MappingMetaData("_doc", index1Mappings));
    ImmutableOpenMap.Builder<String, MappingMetaData> index2TypeToMapping = ImmutableOpenMap.builder();
    index2TypeToMapping.put("_doc", new MappingMetaData("_doc", index2Mappings));

    ImmutableOpenMap.Builder<String, ImmutableOpenMap<String, MappingMetaData>> indexToMappings = ImmutableOpenMap.builder();
    indexToMappings.put("index_1", index1TypeToMapping.build());
    indexToMappings.put("index_2", index2TypeToMapping.build());
    GetMappingsResponse getMappingsResponse = new GetMappingsResponse(indexToMappings.build());

    // A conflicting field mapping must be rejected with a 400 naming the field.
    ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class,
        () -> MappingsMerger.mergeMappings(getMappingsResponse));

    assertThat(e.status(), equalTo(RestStatus.BAD_REQUEST));
    assertThat(e.getMessage(), equalTo("cannot merge mappings because of differences for field [field_1]"));
}
public void testMergeMappings_GivenIndicesWithDifferentMappingsButNoConflicts() throws IOException {
    // The indices share field_1 (with identical mappings) and each contributes one unique field.
    Map<String, Object> index1Properties = new HashMap<>();
    index1Properties.put("field_1", "field_1_mappings");
    index1Properties.put("field_2", "field_2_mappings");
    Map<String, Object> index2Properties = new HashMap<>();
    index2Properties.put("field_1", "field_1_mappings");
    index2Properties.put("field_3", "field_3_mappings");

    ImmutableOpenMap.Builder<String, MappingMetaData> index1TypeToMapping = ImmutableOpenMap.builder();
    index1TypeToMapping.put("_doc",
        new MappingMetaData("_doc", Collections.singletonMap("properties", index1Properties)));
    ImmutableOpenMap.Builder<String, MappingMetaData> index2TypeToMapping = ImmutableOpenMap.builder();
    index2TypeToMapping.put("_doc",
        new MappingMetaData("_doc", Collections.singletonMap("properties", index2Properties)));

    ImmutableOpenMap.Builder<String, ImmutableOpenMap<String, MappingMetaData>> indexToMappings = ImmutableOpenMap.builder();
    indexToMappings.put("index_1", index1TypeToMapping.build());
    indexToMappings.put("index_2", index2TypeToMapping.build());
    GetMappingsResponse getMappingsResponse = new GetMappingsResponse(indexToMappings.build());

    ImmutableOpenMap<String, MappingMetaData> mergedMappings = MappingsMerger.mergeMappings(getMappingsResponse);

    // Expect a single "_doc" mapping whose properties are the union of both indices' fields.
    assertThat(mergedMappings.size(), equalTo(1));
    assertThat(mergedMappings.containsKey("_doc"), is(true));
    Map<String, Object> mergedAsMap = mergedMappings.valuesIt().next().getSourceAsMap();
    assertThat(mergedAsMap.size(), equalTo(1));
    assertThat(mergedAsMap.containsKey("properties"), is(true));
    @SuppressWarnings("unchecked")
    Map<String, Object> mergedProperties = (Map<String, Object>) mergedAsMap.get("properties");
    assertThat(mergedProperties.size(), equalTo(3));
    assertThat(mergedProperties.keySet(), containsInAnyOrder("field_1", "field_2", "field_3"));
    assertThat(mergedProperties.get("field_1"), equalTo("field_1_mappings"));
    assertThat(mergedProperties.get("field_2"), equalTo("field_2_mappings"));
    assertThat(mergedProperties.get("field_3"), equalTo("field_3_mappings"));
}
}

View File

@ -64,7 +64,7 @@ public class SourceDestValidatorTests extends ESTestCase {
public void testCheck_GivenSimpleSourceIndexAndValidDestIndex() {
DataFrameAnalyticsConfig config = new DataFrameAnalyticsConfig.Builder("test")
.setSource(new DataFrameAnalyticsSource("source-1", null))
.setSource(createSource("source-1"))
.setDest(new DataFrameAnalyticsDest("dest", null))
.setAnalysis(new OutlierDetection())
.build();
@ -75,7 +75,7 @@ public class SourceDestValidatorTests extends ESTestCase {
public void testCheck_GivenMissingConcreteSourceIndex() {
DataFrameAnalyticsConfig config = new DataFrameAnalyticsConfig.Builder("test")
.setSource(new DataFrameAnalyticsSource("missing", null))
.setSource(createSource("missing"))
.setDest(new DataFrameAnalyticsDest("dest", null))
.setAnalysis(new OutlierDetection())
.build();
@ -89,7 +89,7 @@ public class SourceDestValidatorTests extends ESTestCase {
public void testCheck_GivenMissingWildcardSourceIndex() {
DataFrameAnalyticsConfig config = new DataFrameAnalyticsConfig.Builder("test")
.setSource(new DataFrameAnalyticsSource("missing*", null))
.setSource(createSource("missing*"))
.setDest(new DataFrameAnalyticsDest("dest", null))
.setAnalysis(new OutlierDetection())
.build();
@ -103,7 +103,7 @@ public class SourceDestValidatorTests extends ESTestCase {
public void testCheck_GivenDestIndexSameAsSourceIndex() {
DataFrameAnalyticsConfig config = new DataFrameAnalyticsConfig.Builder("test")
.setSource(new DataFrameAnalyticsSource("source-1", null))
.setSource(createSource("source-1"))
.setDest(new DataFrameAnalyticsDest("source-1", null))
.setAnalysis(new OutlierDetection())
.build();
@ -117,7 +117,7 @@ public class SourceDestValidatorTests extends ESTestCase {
public void testCheck_GivenDestIndexMatchesSourceIndex() {
DataFrameAnalyticsConfig config = new DataFrameAnalyticsConfig.Builder("test")
.setSource(new DataFrameAnalyticsSource("source-*", null))
.setSource(createSource("source-*"))
.setDest(new DataFrameAnalyticsDest(SOURCE_2, null))
.setAnalysis(new OutlierDetection())
.build();
@ -131,7 +131,7 @@ public class SourceDestValidatorTests extends ESTestCase {
public void testCheck_GivenDestIndexMatchesOneOfSourceIndices() {
DataFrameAnalyticsConfig config = new DataFrameAnalyticsConfig.Builder("test")
.setSource(new DataFrameAnalyticsSource("source-1,source-*", null))
.setSource(createSource("source-1,source-*"))
.setDest(new DataFrameAnalyticsDest(SOURCE_2, null))
.setAnalysis(new OutlierDetection())
.build();
@ -145,7 +145,7 @@ public class SourceDestValidatorTests extends ESTestCase {
public void testCheck_GivenDestIndexIsAliasThatMatchesMultipleIndices() {
DataFrameAnalyticsConfig config = new DataFrameAnalyticsConfig.Builder("test")
.setSource(new DataFrameAnalyticsSource(SOURCE_1, null))
.setSource(createSource(SOURCE_1))
.setDest(new DataFrameAnalyticsDest("dest-alias", null))
.setAnalysis(new OutlierDetection())
.build();
@ -160,7 +160,7 @@ public class SourceDestValidatorTests extends ESTestCase {
public void testCheck_GivenDestIndexIsAliasThatIsIncludedInSource() {
DataFrameAnalyticsConfig config = new DataFrameAnalyticsConfig.Builder("test")
.setSource(new DataFrameAnalyticsSource("source-1", null))
.setSource(createSource("source-1"))
.setDest(new DataFrameAnalyticsDest("source-1-alias", null))
.setAnalysis(new OutlierDetection())
.build();
@ -173,4 +173,8 @@ public class SourceDestValidatorTests extends ESTestCase {
equalTo("Destination index [source-1-alias], which is an alias for [source-1], " +
"must not be included in source index [source-1]"));
}
// Convenience factory for a source over the given indices with no query (query defaults server-side).
private static DataFrameAnalyticsSource createSource(String... index) {
return new DataFrameAnalyticsSource(index, null);
}
}

View File

@ -33,7 +33,7 @@ import static org.mockito.Mockito.when;
public class ExtractedFieldsDetectorTests extends ESTestCase {
private static final String SOURCE_INDEX = "source_index";
private static final String[] SOURCE_INDEX = new String[] { "source_index" };
private static final String DEST_INDEX = "dest_index";
private static final String RESULTS_FIELD = "ml";
@ -154,7 +154,7 @@ public class ExtractedFieldsDetectorTests extends ESTestCase {
SOURCE_INDEX, buildAnalyticsConfig(desiredFields), false, 100, fieldCapabilities);
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> extractedFieldsDetector.detect());
assertThat(e.getMessage(), equalTo("No compatible fields could be detected in index [source_index] with name [your_field1]"));
assertThat(e.getMessage(), equalTo("No field [your_field1] could be detected"));
}
public void testDetectedExtractedFields_GivenExcludeAllValidFields() {
@ -202,7 +202,7 @@ public class ExtractedFieldsDetectorTests extends ESTestCase {
SOURCE_INDEX, buildAnalyticsConfig(), false, 100, fieldCapabilities);
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> extractedFieldsDetector.detect());
assertThat(e.getMessage(), equalTo("Index [source_index] already has a field that matches the dest.results_field [ml]; " +
assertThat(e.getMessage(), equalTo("A field that matches the dest.results_field [ml] already exists; " +
"please set a different results_field"));
}

View File

@ -50,7 +50,7 @@ setup:
"analyzed_fields": [ "obj1.*", "obj2.*" ]
}
- match: { id: "simple-outlier-detection-with-query" }
- match: { source.index: "index-source" }
- match: { source.index: ["index-source"] }
- match: { source.query: {"term" : { "user" : "Kimchy"} } }
- match: { dest.index: "index-dest" }
- match: { analysis: {"outlier_detection":{}} }
@ -63,7 +63,7 @@ setup:
id: "simple-outlier-detection-with-query"
- match: { count: 1 }
- match: { data_frame_analytics.0.id: "simple-outlier-detection-with-query" }
- match: { data_frame_analytics.0.source.index: "index-source" }
- match: { data_frame_analytics.0.source.index: ["index-source"] }
- match: { data_frame_analytics.0.source.query: {"term" : { "user" : "Kimchy"} } }
- match: { data_frame_analytics.0.dest.index: "index-dest" }
- match: { data_frame_analytics.0.analysis: {"outlier_detection":{}} }
@ -145,7 +145,7 @@ setup:
"analysis": {"outlier_detection":{}}
}
- match: { id: "simple-outlier-detection" }
- match: { source.index: "index-source" }
- match: { source.index: ["index-source"] }
- match: { source.query: {"match_all" : {} } }
- match: { dest.index: "index-dest" }
- match: { analysis: {"outlier_detection":{}} }
@ -175,7 +175,7 @@ setup:
}
}
- match: { id: "custom-outlier-detection" }
- match: { source.index: "index-source" }
- match: { source.index: ["index-source"] }
- match: { source.query: {"match_all" : {} } }
- match: { dest.index: "index-dest" }
- match: { analysis.outlier_detection.n_neighbors: 5 }
@ -427,16 +427,34 @@ setup:
}
---
"Test put config given source with empty index":
"Test put config given source with empty index array":
- do:
catch: /\[index\] must be non-empty/
catch: /source\.index must specify at least one index/
ml.put_data_frame_analytics:
id: "simple-outlier-detection"
body: >
{
"source": {
"index": ""
"index": []
},
"dest": {
"index": "index-dest"
},
"analysis": {"outlier_detection":{}}
}
---
"Test put config given source with empty string in index array":
- do:
catch: /source\.index must contain non-null and non-empty strings/
ml.put_data_frame_analytics:
id: "simple-outlier-detection"
body: >
{
"source": {
"index": [""]
},
"dest": {
"index": "index-dest"
@ -889,7 +907,7 @@ setup:
"analyzed_fields": [ "obj1.*", "obj2.*" ]
}
- match: { id: "simple-outlier-detection-with-query" }
- match: { source.index: "index-source" }
- match: { source.index: ["index-source"] }
- match: { source.query: {"term" : { "user" : "Kimchy"} } }
- match: { dest.index: "index-dest" }
- match: { analysis: {"outlier_detection":{}} }