diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/common/validation/SourceDestValidator.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/common/validation/SourceDestValidator.java index 2ecd6b2ec65..4aaef4a25bd 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/common/validation/SourceDestValidator.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/common/validation/SourceDestValidator.java @@ -214,6 +214,7 @@ public final class SourceDestValidator { indexNameExpressionResolver.concreteIndexNames( state, DEFAULT_INDICES_OPTIONS_FOR_VALIDATION, + true, resolvedSource.toArray(new String[0]) ) ) diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/ClassificationIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/ClassificationIT.java index f02d64942a3..23c458e35c0 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/ClassificationIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/ClassificationIT.java @@ -8,6 +8,7 @@ package org.elasticsearch.xpack.ml.integration; import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchStatusException; +import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.admin.indices.refresh.RefreshRequest; import org.elasticsearch.action.bulk.BulkRequestBuilder; import org.elasticsearch.action.bulk.BulkResponse; @@ -18,6 +19,7 @@ import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.XContentType; import 
org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.rest.RestStatus; @@ -35,6 +37,7 @@ import org.elasticsearch.xpack.core.ml.dataframe.evaluation.classification.Preci import org.elasticsearch.xpack.core.ml.dataframe.evaluation.classification.Recall; import org.junit.After; +import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -141,6 +144,61 @@ public class ClassificationIT extends MlNativeDataFrameAnalyticsIntegTestCase { assertEvaluation(KEYWORD_FIELD, KEYWORD_FIELD_VALUES, "ml." + predictedClassField); } + public void testWithDatastreams() throws Exception { + initialize("classification_with_datastreams", true); + String predictedClassField = KEYWORD_FIELD + "_prediction"; + indexData(sourceIndex, 300, 50, KEYWORD_FIELD); + + DataFrameAnalyticsConfig config = buildAnalytics(jobId, sourceIndex, destIndex, null, + new Classification( + KEYWORD_FIELD, + BoostedTreeParams.builder().setNumTopFeatureImportanceValues(1).build(), + null, + null, + null, + null, + null)); + putAnalytics(config); + + assertIsStopped(jobId); + assertProgressIsZero(jobId); + + startAnalytics(jobId); + waitUntilAnalyticsIsStopped(jobId); + + client().admin().indices().refresh(new RefreshRequest(destIndex)); + SearchResponse sourceData = client().prepareSearch(sourceIndex).setTrackTotalHits(true).setSize(1000).get(); + for (SearchHit hit : sourceData.getHits()) { + Map<String, Object> destDoc = getDestDoc(config, hit); + Map<String, Object> resultsObject = getFieldValue(destDoc, "ml"); + assertThat(getFieldValue(resultsObject, predictedClassField), is(in(KEYWORD_FIELD_VALUES))); + assertThat(getFieldValue(resultsObject, "is_training"), is(destDoc.containsKey(KEYWORD_FIELD))); + assertTopClasses(resultsObject, 2, KEYWORD_FIELD, KEYWORD_FIELD_VALUES); + @SuppressWarnings("unchecked") + List<Map<String, Object>> importanceArray = (List<Map<String, Object>>)resultsObject.get("feature_importance"); + assertThat(importanceArray, 
hasSize(greaterThan(0))); + } + + assertProgressComplete(jobId); + assertThat(searchStoredProgress(jobId).getHits().getTotalHits().value, equalTo(1L)); + assertModelStatePersisted(stateDocId()); + assertInferenceModelPersisted(jobId); + assertMlResultsFieldMappings(destIndex, predictedClassField, "keyword"); + assertThatAuditMessagesMatch(jobId, + "Created analytics with analysis type [classification]", + "Estimated memory usage for this analytics to be", + "Starting analytics on node", + "Started analytics", + expectedDestIndexAuditMessage(), + "Started reindexing to destination index [" + destIndex + "]", + "Finished reindexing to destination index [" + destIndex + "]", + "Started loading data", + "Started analyzing", + "Started writing results", + "Finished analysis"); + assertEvaluation(KEYWORD_FIELD, KEYWORD_FIELD_VALUES, "ml." + predictedClassField); + } + public void testWithOnlyTrainingRowsAndTrainingPercentIsHundred() throws Exception { initialize("classification_only_training_data_and_training_percent_is_100"); String predictedClassField = KEYWORD_FIELD + "_prediction"; @@ -455,7 +513,7 @@ public class ClassificationIT extends MlNativeDataFrameAnalyticsIntegTestCase { String sourceIndex = "classification_two_jobs_with_same_randomize_seed_source"; String dependentVariable = KEYWORD_FIELD; - createIndex(sourceIndex); + createIndex(sourceIndex, false); // We use 100 rows as we can't set this too low. If too low it is possible // we only train with rows of one of the two classes which leads to a failure. 
indexData(sourceIndex, 100, 0, dependentVariable); @@ -595,28 +653,65 @@ public class ClassificationIT extends MlNativeDataFrameAnalyticsIntegTestCase { } private void initialize(String jobId) { + initialize(jobId, false); + } + + private void initialize(String jobId, boolean isDatastream) { this.jobId = jobId; this.sourceIndex = jobId + "_source_index"; this.destIndex = sourceIndex + "_results"; this.analysisUsesExistingDestIndex = randomBoolean(); - createIndex(sourceIndex); + createIndex(sourceIndex, isDatastream); if (analysisUsesExistingDestIndex) { - createIndex(destIndex); + createIndex(destIndex, false); } } - private static void createIndex(String index) { - client().admin().indices().prepareCreate(index) - .addMapping("_doc", - BOOLEAN_FIELD, "type=boolean", - NUMERICAL_FIELD, "type=double", - DISCRETE_NUMERICAL_FIELD, "type=integer", - TEXT_FIELD, "type=text", - KEYWORD_FIELD, "type=keyword", - NESTED_FIELD, "type=keyword", - ALIAS_TO_KEYWORD_FIELD, "type=alias,path=" + KEYWORD_FIELD, - ALIAS_TO_NESTED_FIELD, "type=alias,path=" + NESTED_FIELD) - .get(); + private static void createIndex(String index, boolean isDatastream) { + String mapping = "{\n" + + " \"properties\": {\n" + + " \"time\": {\n" + + " \"type\": \"date\"\n" + + " }," + + " \""+ BOOLEAN_FIELD + "\": {\n" + + " \"type\": \"boolean\"\n" + + " }," + + " \""+ NUMERICAL_FIELD + "\": {\n" + + " \"type\": \"double\"\n" + + " }," + + " \""+ DISCRETE_NUMERICAL_FIELD + "\": {\n" + + " \"type\": \"integer\"\n" + + " }," + + " \""+ TEXT_FIELD + "\": {\n" + + " \"type\": \"text\"\n" + + " }," + + " \""+ KEYWORD_FIELD + "\": {\n" + + " \"type\": \"keyword\"\n" + + " }," + + " \""+ NESTED_FIELD + "\": {\n" + + " \"type\": \"keyword\"\n" + + " }," + + " \""+ ALIAS_TO_KEYWORD_FIELD + "\": {\n" + + " \"type\": \"alias\",\n" + + " \"path\": \"" + KEYWORD_FIELD + "\"\n" + + " }," + + " \""+ ALIAS_TO_NESTED_FIELD + "\": {\n" + + " \"type\": \"alias\",\n" + + " \"path\": \"" + NESTED_FIELD + "\"\n" + + " }" + + 
" }\n" + + " }"; + if (isDatastream) { + try { + createDataStreamAndTemplate(index, "time", mapping); + } catch (IOException ex) { + throw new ElasticsearchException(ex); + } + } else { + client().admin().indices().prepareCreate(index) + .addMapping("_doc", mapping, XContentType.JSON) + .get(); + } } private static void indexData(String sourceIndex, int numTrainingRows, int numNonTrainingRows, String dependentVariable) { @@ -630,7 +725,7 @@ public class ClassificationIT extends MlNativeDataFrameAnalyticsIntegTestCase { TEXT_FIELD, KEYWORD_FIELD_VALUES.get(i % KEYWORD_FIELD_VALUES.size()), KEYWORD_FIELD, KEYWORD_FIELD_VALUES.get(i % KEYWORD_FIELD_VALUES.size()), NESTED_FIELD, KEYWORD_FIELD_VALUES.get(i % KEYWORD_FIELD_VALUES.size())); - IndexRequest indexRequest = new IndexRequest(sourceIndex).source(source.toArray()); + IndexRequest indexRequest = new IndexRequest(sourceIndex).source(source.toArray()).opType(DocWriteRequest.OpType.CREATE); bulkRequestBuilder.add(indexRequest); } for (int i = numTrainingRows; i < numTrainingRows + numNonTrainingRows; i++) { @@ -655,7 +750,7 @@ public class ClassificationIT extends MlNativeDataFrameAnalyticsIntegTestCase { if (NESTED_FIELD.equals(dependentVariable) == false) { source.addAll(Arrays.asList(NESTED_FIELD, KEYWORD_FIELD_VALUES.get(i % KEYWORD_FIELD_VALUES.size()))); } - IndexRequest indexRequest = new IndexRequest(sourceIndex).source(source.toArray()); + IndexRequest indexRequest = new IndexRequest(sourceIndex).source(source.toArray()).opType(DocWriteRequest.OpType.CREATE); bulkRequestBuilder.add(indexRequest); } BulkResponse bulkResponse = bulkRequestBuilder.get(); diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DatafeedJobsIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DatafeedJobsIT.java index 0f66ac9987b..2fcb0a3d5f5 100644 --- 
a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DatafeedJobsIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DatafeedJobsIT.java @@ -101,6 +101,50 @@ public class DatafeedJobsIT extends MlNativeAutodetectIntegTestCase { waitUntilJobIsClosed(job.getId()); } + public void testLookbackOnlyDataStream() throws Exception { + String mapping = "{\n" + + " \"properties\": {\n" + + " \"time\": {\n" + + " \"type\": \"date\"\n" + + " }" + + " }\n" + + " }"; + createDataStreamAndTemplate("datafeed_data_stream", "time", mapping); + long numDocs = randomIntBetween(32, 2048); + long now = System.currentTimeMillis(); + long oneWeekAgo = now - 604800000; + long twoWeeksAgo = oneWeekAgo - 604800000; + indexDocs(logger, "datafeed_data_stream", "_doc", numDocs, twoWeeksAgo, oneWeekAgo); + + client().admin().cluster().prepareHealth("datafeed_data_stream").setWaitForYellowStatus().get(); + + Job.Builder job = createScheduledJob("lookback-data-stream-job"); + registerJob(job); + PutJobAction.Response putJobResponse = putJob(job); + assertThat(putJobResponse.getResponse().getJobVersion(), equalTo(Version.CURRENT)); + openJob(job.getId()); + assertBusy(() -> assertEquals(getJobStats(job.getId()).get(0).getState(), JobState.OPENED)); + + DatafeedConfig datafeedConfig = createDatafeed(job.getId() + "-datafeed", + job.getId(), + Collections.singletonList("datafeed_data_stream")); + registerDatafeed(datafeedConfig); + putDatafeed(datafeedConfig); + + startDatafeed(datafeedConfig.getId(), 0L, now); + assertBusy(() -> { + DataCounts dataCounts = getDataCounts(job.getId()); + assertThat(dataCounts.getProcessedRecordCount(), equalTo(numDocs)); + assertThat(dataCounts.getOutOfOrderTimeStampCount(), equalTo(0L)); + + GetDatafeedsStatsAction.Request request = new GetDatafeedsStatsAction.Request(datafeedConfig.getId()); + GetDatafeedsStatsAction.Response response = 
client().execute(GetDatafeedsStatsAction.INSTANCE, request).actionGet(); + assertThat(response.getResponse().results().get(0).getDatafeedState(), equalTo(DatafeedState.STOPPED)); + }, 60, TimeUnit.SECONDS); + + waitUntilJobIsClosed(job.getId()); + } + public void testDatafeedTimingStats_DatafeedRecreated() throws Exception { client().admin().indices().prepareCreate("data") .addMapping("type", "time", "type=date") diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/MlNativeIntegTestCase.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/MlNativeIntegTestCase.java index be7399bc54e..e69959db49c 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/MlNativeIntegTestCase.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/MlNativeIntegTestCase.java @@ -6,11 +6,16 @@ package org.elasticsearch.xpack.ml.integration; import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest; +import org.elasticsearch.action.admin.indices.datastream.CreateDataStreamAction; +import org.elasticsearch.action.admin.indices.template.put.PutComposableIndexTemplateAction; import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterModule; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.ComposableIndexTemplate; import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.cluster.metadata.Template; +import org.elasticsearch.common.compress.CompressedXContent; import org.elasticsearch.common.io.PathUtils; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.network.NetworkModule; @@ -270,6 +275,20 @@ abstract class MlNativeIntegTestCase extends ESIntegTestCase { } } + 
protected static void createDataStreamAndTemplate(String dataStreamName, String timeField, String mapping) throws IOException { + client().execute(PutComposableIndexTemplateAction.INSTANCE, + new PutComposableIndexTemplateAction.Request(dataStreamName + "_template") + .indexTemplate(new ComposableIndexTemplate(Collections.singletonList(dataStreamName), + new Template(null, new CompressedXContent("{\"_doc\":" + mapping + "}"), null), + null, + null, + null, + null, + new ComposableIndexTemplate.DataStreamTemplate(timeField)))) + .actionGet(); + client().execute(CreateDataStreamAction.INSTANCE, new CreateDataStreamAction.Request(dataStreamName)).actionGet(); + } + public static class MockPainlessScriptEngine extends MockScriptEngine { public static final String NAME = "painless"; diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/RegressionIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/RegressionIT.java index bcb3eb64c87..bf5b36e4df7 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/RegressionIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/RegressionIT.java @@ -5,6 +5,8 @@ */ package org.elasticsearch.xpack.ml.integration; +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.bulk.BulkRequestBuilder; import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.delete.DeleteResponse; @@ -13,6 +15,7 @@ import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.rest.RestStatus; import 
org.elasticsearch.search.SearchHit; import org.elasticsearch.xpack.core.ml.action.GetDataFrameAnalyticsStatsAction; @@ -24,6 +27,8 @@ import org.elasticsearch.xpack.core.ml.dataframe.analyses.Regression; import org.junit.After; import java.util.Arrays; +import java.io.IOException; +import java.time.Instant; import java.util.List; import java.util.Map; import java.util.Set; @@ -392,6 +397,65 @@ public class RegressionIT extends MlNativeDataFrameAnalyticsIntegTestCase { assertMlResultsFieldMappings(destIndex, predictedClassField, "double"); } + public void testWithDatastream() throws Exception { + initialize("regression_with_datastream"); + String predictedClassField = DEPENDENT_VARIABLE_FIELD + "_prediction"; + indexData(sourceIndex, 300, 50, true); + + DataFrameAnalyticsConfig config = buildAnalytics(jobId, sourceIndex, destIndex, null, + new Regression( + DEPENDENT_VARIABLE_FIELD, + BoostedTreeParams.builder().setNumTopFeatureImportanceValues(1).build(), + null, + null, + null, + null, + null) + ); + putAnalytics(config); + + assertIsStopped(jobId); + assertProgressIsZero(jobId); + + startAnalytics(jobId); + waitUntilAnalyticsIsStopped(jobId); + + SearchResponse sourceData = client().prepareSearch(sourceIndex).setTrackTotalHits(true).setSize(1000).get(); + for (SearchHit hit : sourceData.getHits()) { + Map<String, Object> destDoc = getDestDoc(config, hit); + Map<String, Object> resultsObject = getMlResultsObjectFromDestDoc(destDoc); + + assertThat(resultsObject.containsKey(predictedClassField), is(true)); + assertThat(resultsObject.containsKey("is_training"), is(true)); + assertThat(resultsObject.get("is_training"), is(destDoc.containsKey(DEPENDENT_VARIABLE_FIELD))); + @SuppressWarnings("unchecked") + List<Map<String, Object>> importanceArray = (List<Map<String, Object>>)resultsObject.get("feature_importance"); + assertThat(importanceArray, hasSize(greaterThan(0))); + assertThat( + importanceArray.stream().filter(m -> NUMERICAL_FEATURE_FIELD.equals(m.get("feature_name")) + || 
DISCRETE_NUMERICAL_FEATURE_FIELD.equals(m.get("feature_name"))).findAny(), + isPresent()); + } + + assertProgressComplete(jobId); + assertThat(searchStoredProgress(jobId).getHits().getTotalHits().value, equalTo(1L)); + assertModelStatePersisted(stateDocId()); + assertInferenceModelPersisted(jobId); + assertMlResultsFieldMappings(destIndex, predictedClassField, "double"); + assertThatAuditMessagesMatch(jobId, + "Created analytics with analysis type [regression]", + "Estimated memory usage for this analytics to be", + "Starting analytics on node", + "Started analytics", + "Creating destination index [" + destIndex + "]", + "Started reindexing to destination index [" + destIndex + "]", + "Finished reindexing to destination index [" + destIndex + "]", + "Started loading data", + "Started analyzing", + "Started writing results", + "Finished analysis"); + } + private void initialize(String jobId) { this.jobId = jobId; this.sourceIndex = jobId + "_source_index"; @@ -399,12 +463,37 @@ public class RegressionIT extends MlNativeDataFrameAnalyticsIntegTestCase { } static void indexData(String sourceIndex, int numTrainingRows, int numNonTrainingRows) { - client().admin().indices().prepareCreate(sourceIndex) - .addMapping("_doc", - NUMERICAL_FEATURE_FIELD, "type=double", - DISCRETE_NUMERICAL_FEATURE_FIELD, "type=long", - DEPENDENT_VARIABLE_FIELD, "type=double") - .get(); + indexData(sourceIndex, numTrainingRows, numNonTrainingRows, false); + } + + static void indexData(String sourceIndex, int numTrainingRows, int numNonTrainingRows, boolean dataStream) { + String mapping = "{\n" + + " \"properties\": {\n" + + " \"time\": {\n" + + " \"type\": \"date\"\n" + + " }," + + " \""+ NUMERICAL_FEATURE_FIELD + "\": {\n" + + " \"type\": \"double\"\n" + + " }," + + " \"" + DISCRETE_NUMERICAL_FEATURE_FIELD + "\": {\n" + + " \"type\": \"long\"\n" + + " }," + + " \"" + DEPENDENT_VARIABLE_FIELD + "\": {\n" + + " \"type\": \"double\"\n" + + " }" + + " }\n" + + " }"; + if (dataStream) { + try { + 
createDataStreamAndTemplate(sourceIndex, "time", mapping); + } catch (IOException ex) { + throw new ElasticsearchException(ex); + } + } else { + client().admin().indices().prepareCreate(sourceIndex) + .addMapping("_doc", mapping, XContentType.JSON) + .get(); + } BulkRequestBuilder bulkRequestBuilder = client().prepareBulk() .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); @@ -412,15 +501,17 @@ public class RegressionIT extends MlNativeDataFrameAnalyticsIntegTestCase { List<Object> source = Arrays.asList( NUMERICAL_FEATURE_FIELD, NUMERICAL_FEATURE_VALUES.get(i % NUMERICAL_FEATURE_VALUES.size()), DISCRETE_NUMERICAL_FEATURE_FIELD, DISCRETE_NUMERICAL_FEATURE_VALUES.get(i % DISCRETE_NUMERICAL_FEATURE_VALUES.size()), - DEPENDENT_VARIABLE_FIELD, DEPENDENT_VARIABLE_VALUES.get(i % DEPENDENT_VARIABLE_VALUES.size())); - IndexRequest indexRequest = new IndexRequest(sourceIndex).source(source.toArray()); + DEPENDENT_VARIABLE_FIELD, DEPENDENT_VARIABLE_VALUES.get(i % DEPENDENT_VARIABLE_VALUES.size()), + "time", Instant.now().toEpochMilli()); + IndexRequest indexRequest = new IndexRequest(sourceIndex).source(source.toArray()).opType(DocWriteRequest.OpType.CREATE); bulkRequestBuilder.add(indexRequest); } for (int i = numTrainingRows; i < numTrainingRows + numNonTrainingRows; i++) { List<Object> source = Arrays.asList( NUMERICAL_FEATURE_FIELD, NUMERICAL_FEATURE_VALUES.get(i % NUMERICAL_FEATURE_VALUES.size()), - DISCRETE_NUMERICAL_FEATURE_FIELD, DISCRETE_NUMERICAL_FEATURE_VALUES.get(i % DISCRETE_NUMERICAL_FEATURE_VALUES.size())); - IndexRequest indexRequest = new IndexRequest(sourceIndex).source(source.toArray()); + DISCRETE_NUMERICAL_FEATURE_FIELD, DISCRETE_NUMERICAL_FEATURE_VALUES.get(i % DISCRETE_NUMERICAL_FEATURE_VALUES.size()), + "time", Instant.now().toEpochMilli()); + IndexRequest indexRequest = new IndexRequest(sourceIndex).source(source.toArray()).opType(DocWriteRequest.OpType.CREATE); bulkRequestBuilder.add(indexRequest); } BulkResponse bulkResponse = bulkRequestBuilder.get(); diff 
--git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/support/BaseMlIntegTestCase.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/support/BaseMlIntegTestCase.java index aeee77bd458..8901f95665f 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/support/BaseMlIntegTestCase.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/support/BaseMlIntegTestCase.java @@ -7,6 +7,7 @@ package org.elasticsearch.xpack.ml.support; import org.apache.logging.log4j.Logger; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.admin.indices.recovery.RecoveryResponse; import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest; import org.elasticsearch.action.bulk.BulkItemResponse; @@ -250,18 +251,22 @@ public abstract class BaseMlIntegTestCase extends ESIntegTestCase { } public static void indexDocs(Logger logger, String index, long numDocs, long start, long end) { + indexDocs(logger, index, "type", numDocs, start, end); + } + + public static void indexDocs(Logger logger, String index, String type, long numDocs, long start, long end) { int maxDelta = (int) (end - start - 1); BulkRequestBuilder bulkRequestBuilder = client().prepareBulk(); for (int i = 0; i < numDocs; i++) { - IndexRequest indexRequest = new IndexRequest(index, "type"); + IndexRequest indexRequest = new IndexRequest(index, type); long timestamp = start + randomIntBetween(0, maxDelta); assert timestamp >= start && timestamp < end; - indexRequest.source("time", timestamp); + indexRequest.source("time", timestamp).opType(DocWriteRequest.OpType.CREATE); bulkRequestBuilder.add(indexRequest); } BulkResponse bulkResponse = bulkRequestBuilder - .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) - .get(); + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) + .get(); if (bulkResponse.hasFailures()) { int failures = 0; for (BulkItemResponse itemResponse : 
bulkResponse) { diff --git a/x-pack/plugin/transform/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/transform/integration/TransformPivotRestIT.java b/x-pack/plugin/transform/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/transform/integration/TransformPivotRestIT.java index 6f65f12e5b5..4e8db8e977d 100644 --- a/x-pack/plugin/transform/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/transform/integration/TransformPivotRestIT.java +++ b/x-pack/plugin/transform/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/transform/integration/TransformPivotRestIT.java @@ -95,6 +95,34 @@ public class TransformPivotRestIT extends TransformRestTestCase { assertOnePivotValue(transformIndex + "/_search?q=reviewer:user_26", 3.918918918); } + public void testSimpleDataStreamPivot() throws Exception { + String indexName = "reviews_data_stream"; + createReviewsIndex(indexName, 1000, "date", true); + String transformId = "simple_data_stream_pivot"; + String transformIndex = "pivot_reviews_data_stream"; + setupDataAccessRole(DATA_ACCESS_ROLE, indexName, transformIndex); + createPivotReviewsTransform(transformId, + transformIndex, + null, + null, + BASIC_AUTH_VALUE_TRANSFORM_ADMIN_WITH_SOME_DATA_ACCESS, + indexName); + + startAndWaitForTransform(transformId, transformIndex, BASIC_AUTH_VALUE_TRANSFORM_ADMIN_WITH_SOME_DATA_ACCESS); + + // we expect 27 documents as there shall be 27 user_id's + Map<String, Object> indexStats = getAsMap(transformIndex + "/_stats"); + assertEquals(27, XContentMapValues.extractValue("_all.total.docs.count", indexStats)); + + // get and check some users + assertOnePivotValue(transformIndex + "/_search?q=reviewer:user_0", 3.776978417); + assertOnePivotValue(transformIndex + "/_search?q=reviewer:user_5", 3.72); + assertOnePivotValue(transformIndex + "/_search?q=reviewer:user_11", 3.846153846); + assertOnePivotValue(transformIndex + "/_search?q=reviewer:user_20", 3.769230769); + assertOnePivotValue(transformIndex + 
"/_search?q=reviewer:user_26", 3.918918918); + client().performRequest(new Request("DELETE", "/_data_stream/" + indexName)); + } + public void testSimplePivotWithQuery() throws Exception { String transformId = "simple_pivot_with_query"; String transformIndex = "pivot_reviews_user_id_above_20"; diff --git a/x-pack/plugin/transform/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/transform/integration/TransformRestTestCase.java b/x-pack/plugin/transform/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/transform/integration/TransformRestTestCase.java index b18df5ba8e5..70df129c674 100644 --- a/x-pack/plugin/transform/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/transform/integration/TransformRestTestCase.java +++ b/x-pack/plugin/transform/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/transform/integration/TransformRestTestCase.java @@ -80,7 +80,7 @@ public abstract class TransformRestTestCase extends ESRestTestCase { return super.buildClient(settings, hosts); } - protected void createReviewsIndex(String indexName, int numDocs, String dateType) throws IOException { + protected void createReviewsIndex(String indexName, int numDocs, String dateType, boolean isDataStream) throws IOException { int[] distributionTable = { 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 4, 4, 4, 3, 3, 2, 1, 1, 1 }; // create mapping @@ -110,10 +110,25 @@ public abstract class TransformRestTestCase extends ESRestTestCase { .endObject(); } builder.endObject(); - final StringEntity entity = new StringEntity(Strings.toString(builder), ContentType.APPLICATION_JSON); - Request req = new Request("PUT", indexName); - req.setEntity(entity); - client().performRequest(req); + if (isDataStream) { + Request createCompositeTemplate = new Request("PUT", "_index_template/" + indexName + "_template"); + createCompositeTemplate.setJsonEntity( + "{\n" + + " \"index_patterns\": [ \"" + indexName + "\" ],\n" + + " \"data_stream\": {\n" + + " \"timestamp_field\": \"timestamp\"\n" + 
+ " },\n" + + " \"template\": \n" + Strings.toString(builder) + + "}" + ); + client().performRequest(createCompositeTemplate); + client().performRequest(new Request("PUT", "_data_stream/" + indexName)); + } else { + final StringEntity entity = new StringEntity(Strings.toString(builder), ContentType.APPLICATION_JSON); + Request req = new Request("PUT", indexName); + req.setEntity(entity); + client().performRequest(req); + } } // create index @@ -122,7 +137,7 @@ public abstract class TransformRestTestCase extends ESRestTestCase { int hour = 10; int min = 10; for (int i = 0; i < numDocs; i++) { - bulk.append("{\"index\":{\"_index\":\"" + indexName + "\"}}\n"); + bulk.append("{\"create\":{\"_index\":\"" + indexName + "\"}}\n"); long user = Math.round(Math.pow(i * 31 % 1000, distributionTable[i % distributionTable.length]) % 27); int stars = distributionTable[(i * 33) % distributionTable.length]; long business = Math.round(Math.pow(user * stars, distributionTable[i % distributionTable.length]) % 13); @@ -183,7 +198,7 @@ public abstract class TransformRestTestCase extends ESRestTestCase { } protected void createReviewsIndex(String indexName) throws IOException { - createReviewsIndex(indexName, 1000, "date"); + createReviewsIndex(indexName, 1000, "date", false); } protected void createPivotReviewsTransform(String transformId, String transformIndex, String query) throws IOException { @@ -196,7 +211,7 @@ public abstract class TransformRestTestCase extends ESRestTestCase { } protected void createReviewsIndexNano() throws IOException { - createReviewsIndex(REVIEWS_DATE_NANO_INDEX_NAME, 1000, "date_nanos"); + createReviewsIndex(REVIEWS_DATE_NANO_INDEX_NAME, 1000, "date_nanos", false); } protected void createContinuousPivotReviewsTransform(String transformId, String transformIndex, String authHeader) throws IOException { @@ -226,8 +241,12 @@ public abstract class TransformRestTestCase extends ESRestTestCase { assertThat(createTransformResponse.get("acknowledged"), 
equalTo(Boolean.TRUE)); } - protected void createPivotReviewsTransform(String transformId, String transformIndex, String query, String pipeline, String authHeader) - throws IOException { + protected void createPivotReviewsTransform(String transformId, + String transformIndex, + String query, + String pipeline, + String authHeader, + String sourceIndex) throws IOException { final Request createTransformRequest = createRequestWithAuth("PUT", getTransformEndpoint() + transformId, authHeader); String config = "{"; @@ -239,9 +258,9 @@ public abstract class TransformRestTestCase extends ESRestTestCase { } if (query != null) { - config += " \"source\": {\"index\":\"" + REVIEWS_INDEX_NAME + "\", \"query\":{" + query + "}},"; + config += " \"source\": {\"index\":\"" + sourceIndex + "\", \"query\":{" + query + "}},"; } else { - config += " \"source\": {\"index\":\"" + REVIEWS_INDEX_NAME + "\"},"; + config += " \"source\": {\"index\":\"" + sourceIndex + "\"},"; } config += " \"pivot\": {" @@ -264,6 +283,11 @@ public abstract class TransformRestTestCase extends ESRestTestCase { assertThat(createTransformResponse.get("acknowledged"), equalTo(Boolean.TRUE)); } + protected void createPivotReviewsTransform(String transformId, String transformIndex, String query, String pipeline, String authHeader) + throws IOException { + createPivotReviewsTransform(transformId, transformIndex, query, pipeline, authHeader, REVIEWS_INDEX_NAME); + } + protected void startTransform(String transformId) throws IOException { startTransform(transformId, null); } diff --git a/x-pack/plugin/transform/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/transform/integration/TransformTaskFailedStateIT.java b/x-pack/plugin/transform/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/transform/integration/TransformTaskFailedStateIT.java index 7940bbf1372..a17a1c0d7e8 100644 --- 
a/x-pack/plugin/transform/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/transform/integration/TransformTaskFailedStateIT.java +++ b/x-pack/plugin/transform/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/transform/integration/TransformTaskFailedStateIT.java @@ -64,7 +64,7 @@ public class TransformTaskFailedStateIT extends TransformRestTestCase { public void testForceStopFailedTransform() throws Exception { String transformId = "test-force-stop-failed-transform"; - createReviewsIndex(REVIEWS_INDEX_NAME, 10, "date"); + createReviewsIndex(REVIEWS_INDEX_NAME, 10, "date", false); String transformIndex = "failure_pivot_reviews"; createDestinationIndexWithBadMapping(transformIndex); createContinuousPivotReviewsTransform(transformId, transformIndex, null); @@ -102,7 +102,7 @@ public class TransformTaskFailedStateIT extends TransformRestTestCase { public void testStartFailedTransform() throws Exception { String transformId = "test-force-start-failed-transform"; - createReviewsIndex(REVIEWS_INDEX_NAME, 10, "date"); + createReviewsIndex(REVIEWS_INDEX_NAME, 10, "date", false); String transformIndex = "failure_pivot_reviews"; createDestinationIndexWithBadMapping(transformIndex); createContinuousPivotReviewsTransform(transformId, transformIndex, null); diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportUpdateTransformAction.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportUpdateTransformAction.java index 174f21e0a05..66678478299 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportUpdateTransformAction.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportUpdateTransformAction.java @@ -367,6 +367,7 @@ public class TransportUpdateTransformAction extends TransportTasksAction