[7.x] [ML] allow data streams to be expanded for analytics and transforms (#58280) (#58455)

This commit allows data streams to be a valid source for analytics and transforms.

Data streams are fairly transparent, and our `_search` and `_reindex` actions work against them without error.

For transforms, checkpointing works as desired as well: a data stream is effectively treated as an `alias`, and the names of its backing indices are stored in the checkpoint information.
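As a rough illustration of that behavior (a sketch, not code from this commit; the stream and backing-index names are made up), resolving a data stream source expands it to its concrete backing indices, which are what a transform checkpoint records. Note also that the test changes below switch bulk indexing to op_type create, since data streams only accept create operations:

    // Minimal sketch, mirroring the resolver call this commit adds: the new
    // boolean argument tells the resolver to expand data streams into their
    // backing indices. "my-data-stream" and the .ds-* names are hypothetical.
    String[] concreteIndices = indexNameExpressionResolver.concreteIndexNames(
        clusterState,
        IndicesOptions.lenientExpandOpen(),
        true,               // include (expand) data streams
        "my-data-stream");
    // concreteIndices -> { ".ds-my-data-stream-000001", ".ds-my-data-stream-000002", ... }
    // A transform checkpoint then stores these backing index names, just as it
    // stores the concrete indices behind an alias.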
Benjamin Trent, 2020-06-23 14:40:35 -04:00, committed by GitHub
parent 0cc84d3caf
commit a9b868b7a9
10 changed files with 353 additions and 45 deletions

SourceDestValidator.java

@@ -214,6 +214,7 @@ public final class SourceDestValidator {
indexNameExpressionResolver.concreteIndexNames(
state,
DEFAULT_INDICES_OPTIONS_FOR_VALIDATION,
true,
resolvedSource.toArray(new String[0])
)
)

ClassificationIT.java

@@ -8,6 +8,7 @@ package org.elasticsearch.xpack.ml.integration;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
@@ -18,6 +19,7 @@ import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.rest.RestStatus;
@@ -35,6 +37,7 @@ import org.elasticsearch.xpack.core.ml.dataframe.evaluation.classification.Precision;
import org.elasticsearch.xpack.core.ml.dataframe.evaluation.classification.Recall;
import org.junit.After;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -141,6 +144,61 @@ public class ClassificationIT extends MlNativeDataFrameAnalyticsIntegTestCase {
assertEvaluation(KEYWORD_FIELD, KEYWORD_FIELD_VALUES, "ml." + predictedClassField);
}
public void testWithDatastreams() throws Exception {
initialize("classification_with_datastreams", true);
String predictedClassField = KEYWORD_FIELD + "_prediction";
indexData(sourceIndex, 300, 50, KEYWORD_FIELD);
DataFrameAnalyticsConfig config = buildAnalytics(jobId, sourceIndex, destIndex, null,
new Classification(
KEYWORD_FIELD,
BoostedTreeParams.builder().setNumTopFeatureImportanceValues(1).build(),
null,
null,
null,
null,
null));
putAnalytics(config);
assertIsStopped(jobId);
assertProgressIsZero(jobId);
startAnalytics(jobId);
waitUntilAnalyticsIsStopped(jobId);
client().admin().indices().refresh(new RefreshRequest(destIndex));
SearchResponse sourceData = client().prepareSearch(sourceIndex).setTrackTotalHits(true).setSize(1000).get();
for (SearchHit hit : sourceData.getHits()) {
Map<String, Object> destDoc = getDestDoc(config, hit);
Map<String, Object> resultsObject = getFieldValue(destDoc, "ml");
assertThat(getFieldValue(resultsObject, predictedClassField), is(in(KEYWORD_FIELD_VALUES)));
assertThat(getFieldValue(resultsObject, "is_training"), is(destDoc.containsKey(KEYWORD_FIELD)));
assertTopClasses(resultsObject, 2, KEYWORD_FIELD, KEYWORD_FIELD_VALUES);
@SuppressWarnings("unchecked")
List<Map<String, Object>> importanceArray = (List<Map<String, Object>>)resultsObject.get("feature_importance");
assertThat(importanceArray, hasSize(greaterThan(0)));
}
assertProgressComplete(jobId);
assertThat(searchStoredProgress(jobId).getHits().getTotalHits().value, equalTo(1L));
assertModelStatePersisted(stateDocId());
assertInferenceModelPersisted(jobId);
assertMlResultsFieldMappings(destIndex, predictedClassField, "keyword");
assertThatAuditMessagesMatch(jobId,
"Created analytics with analysis type [classification]",
"Estimated memory usage for this analytics to be",
"Starting analytics on node",
"Started analytics",
expectedDestIndexAuditMessage(),
"Started reindexing to destination index [" + destIndex + "]",
"Finished reindexing to destination index [" + destIndex + "]",
"Started loading data",
"Started analyzing",
"Started writing results",
"Finished analysis");
assertEvaluation(KEYWORD_FIELD, KEYWORD_FIELD_VALUES, "ml." + predictedClassField);
}
public void testWithOnlyTrainingRowsAndTrainingPercentIsHundred() throws Exception {
initialize("classification_only_training_data_and_training_percent_is_100");
String predictedClassField = KEYWORD_FIELD + "_prediction";
@@ -455,7 +513,7 @@ public class ClassificationIT extends MlNativeDataFrameAnalyticsIntegTestCase {
String sourceIndex = "classification_two_jobs_with_same_randomize_seed_source";
String dependentVariable = KEYWORD_FIELD;
createIndex(sourceIndex);
createIndex(sourceIndex, false);
// We use 100 rows as we can't set this too low. If too low it is possible
// we only train with rows of one of the two classes which leads to a failure.
indexData(sourceIndex, 100, 0, dependentVariable);
@@ -595,29 +653,66 @@ public class ClassificationIT extends MlNativeDataFrameAnalyticsIntegTestCase {
}
private void initialize(String jobId) {
initialize(jobId, false);
}
private void initialize(String jobId, boolean isDatastream) {
this.jobId = jobId;
this.sourceIndex = jobId + "_source_index";
this.destIndex = sourceIndex + "_results";
this.analysisUsesExistingDestIndex = randomBoolean();
createIndex(sourceIndex);
createIndex(sourceIndex, isDatastream);
if (analysisUsesExistingDestIndex) {
createIndex(destIndex);
createIndex(destIndex, false);
}
}
private static void createIndex(String index) {
private static void createIndex(String index, boolean isDatastream) {
String mapping = "{\n" +
" \"properties\": {\n" +
" \"time\": {\n" +
" \"type\": \"date\"\n" +
" }," +
" \""+ BOOLEAN_FIELD + "\": {\n" +
" \"type\": \"boolean\"\n" +
" }," +
" \""+ NUMERICAL_FIELD + "\": {\n" +
" \"type\": \"double\"\n" +
" }," +
" \""+ DISCRETE_NUMERICAL_FIELD + "\": {\n" +
" \"type\": \"integer\"\n" +
" }," +
" \""+ TEXT_FIELD + "\": {\n" +
" \"type\": \"text\"\n" +
" }," +
" \""+ KEYWORD_FIELD + "\": {\n" +
" \"type\": \"keyword\"\n" +
" }," +
" \""+ NESTED_FIELD + "\": {\n" +
" \"type\": \"keyword\"\n" +
" }," +
" \""+ ALIAS_TO_KEYWORD_FIELD + "\": {\n" +
" \"type\": \"alias\",\n" +
" \"path\": \"" + KEYWORD_FIELD + "\"\n" +
" }," +
" \""+ ALIAS_TO_NESTED_FIELD + "\": {\n" +
" \"type\": \"alias\",\n" +
" \"path\": \"" + NESTED_FIELD + "\"\n" +
" }" +
" }\n" +
" }";
if (isDatastream) {
try {
createDataStreamAndTemplate(index, "time", mapping);
} catch (IOException ex) {
throw new ElasticsearchException(ex);
}
} else {
client().admin().indices().prepareCreate(index)
.addMapping("_doc",
BOOLEAN_FIELD, "type=boolean",
NUMERICAL_FIELD, "type=double",
DISCRETE_NUMERICAL_FIELD, "type=integer",
TEXT_FIELD, "type=text",
KEYWORD_FIELD, "type=keyword",
NESTED_FIELD, "type=keyword",
ALIAS_TO_KEYWORD_FIELD, "type=alias,path=" + KEYWORD_FIELD,
ALIAS_TO_NESTED_FIELD, "type=alias,path=" + NESTED_FIELD)
.addMapping("_doc", mapping, XContentType.JSON)
.get();
}
}
private static void indexData(String sourceIndex, int numTrainingRows, int numNonTrainingRows, String dependentVariable) {
BulkRequestBuilder bulkRequestBuilder = client().prepareBulk()
@@ -630,7 +725,7 @@ public class ClassificationIT extends MlNativeDataFrameAnalyticsIntegTestCase {
TEXT_FIELD, KEYWORD_FIELD_VALUES.get(i % KEYWORD_FIELD_VALUES.size()),
KEYWORD_FIELD, KEYWORD_FIELD_VALUES.get(i % KEYWORD_FIELD_VALUES.size()),
NESTED_FIELD, KEYWORD_FIELD_VALUES.get(i % KEYWORD_FIELD_VALUES.size()));
IndexRequest indexRequest = new IndexRequest(sourceIndex).source(source.toArray());
IndexRequest indexRequest = new IndexRequest(sourceIndex).source(source.toArray()).opType(DocWriteRequest.OpType.CREATE);
bulkRequestBuilder.add(indexRequest);
}
for (int i = numTrainingRows; i < numTrainingRows + numNonTrainingRows; i++) {
@@ -655,7 +750,7 @@ public class ClassificationIT extends MlNativeDataFrameAnalyticsIntegTestCase {
if (NESTED_FIELD.equals(dependentVariable) == false) {
source.addAll(Arrays.asList(NESTED_FIELD, KEYWORD_FIELD_VALUES.get(i % KEYWORD_FIELD_VALUES.size())));
}
IndexRequest indexRequest = new IndexRequest(sourceIndex).source(source.toArray());
IndexRequest indexRequest = new IndexRequest(sourceIndex).source(source.toArray()).opType(DocWriteRequest.OpType.CREATE);
bulkRequestBuilder.add(indexRequest);
}
BulkResponse bulkResponse = bulkRequestBuilder.get();

DatafeedJobsIT.java

@@ -101,6 +101,50 @@ public class DatafeedJobsIT extends MlNativeAutodetectIntegTestCase {
waitUntilJobIsClosed(job.getId());
}
public void testLookbackOnlyDataStream() throws Exception {
String mapping = "{\n" +
" \"properties\": {\n" +
" \"time\": {\n" +
" \"type\": \"date\"\n" +
" }" +
" }\n" +
" }";
createDataStreamAndTemplate("datafeed_data_stream", "time", mapping);
long numDocs = randomIntBetween(32, 2048);
long now = System.currentTimeMillis();
long oneWeekAgo = now - 604800000;
long twoWeeksAgo = oneWeekAgo - 604800000;
indexDocs(logger, "datafeed_data_stream", "_doc", numDocs, twoWeeksAgo, oneWeekAgo);
client().admin().cluster().prepareHealth("datafeed_data_stream").setWaitForYellowStatus().get();
Job.Builder job = createScheduledJob("lookback-data-stream-job");
registerJob(job);
PutJobAction.Response putJobResponse = putJob(job);
assertThat(putJobResponse.getResponse().getJobVersion(), equalTo(Version.CURRENT));
openJob(job.getId());
assertBusy(() -> assertEquals(getJobStats(job.getId()).get(0).getState(), JobState.OPENED));
DatafeedConfig datafeedConfig = createDatafeed(job.getId() + "-datafeed",
job.getId(),
Collections.singletonList("datafeed_data_stream"));
registerDatafeed(datafeedConfig);
putDatafeed(datafeedConfig);
startDatafeed(datafeedConfig.getId(), 0L, now);
assertBusy(() -> {
DataCounts dataCounts = getDataCounts(job.getId());
assertThat(dataCounts.getProcessedRecordCount(), equalTo(numDocs));
assertThat(dataCounts.getOutOfOrderTimeStampCount(), equalTo(0L));
GetDatafeedsStatsAction.Request request = new GetDatafeedsStatsAction.Request(datafeedConfig.getId());
GetDatafeedsStatsAction.Response response = client().execute(GetDatafeedsStatsAction.INSTANCE, request).actionGet();
assertThat(response.getResponse().results().get(0).getDatafeedState(), equalTo(DatafeedState.STOPPED));
}, 60, TimeUnit.SECONDS);
waitUntilJobIsClosed(job.getId());
}
public void testDatafeedTimingStats_DatafeedRecreated() throws Exception {
client().admin().indices().prepareCreate("data")
.addMapping("type", "time", "type=date")

MlNativeIntegTestCase.java

@@ -6,11 +6,16 @@
package org.elasticsearch.xpack.ml.integration;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
import org.elasticsearch.action.admin.indices.datastream.CreateDataStreamAction;
import org.elasticsearch.action.admin.indices.template.put.PutComposableIndexTemplateAction;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterModule;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.metadata.Template;
import org.elasticsearch.common.compress.CompressedXContent;
import org.elasticsearch.common.io.PathUtils;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.network.NetworkModule;
@@ -270,6 +275,20 @@ abstract class MlNativeIntegTestCase extends ESIntegTestCase {
}
}
protected static void createDataStreamAndTemplate(String dataStreamName, String timeField, String mapping) throws IOException {
client().execute(PutComposableIndexTemplateAction.INSTANCE,
new PutComposableIndexTemplateAction.Request(dataStreamName + "_template")
.indexTemplate(new ComposableIndexTemplate(Collections.singletonList(dataStreamName),
new Template(null, new CompressedXContent("{\"_doc\":" + mapping + "}"), null),
null,
null,
null,
null,
new ComposableIndexTemplate.DataStreamTemplate(timeField))))
.actionGet();
client().execute(CreateDataStreamAction.INSTANCE, new CreateDataStreamAction.Request(dataStreamName)).actionGet();
}
public static class MockPainlessScriptEngine extends MockScriptEngine {
public static final String NAME = "painless";

RegressionIT.java

@@ -5,6 +5,8 @@
*/
package org.elasticsearch.xpack.ml.integration;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteResponse;
@@ -13,6 +15,7 @@ import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.xpack.core.ml.action.GetDataFrameAnalyticsStatsAction;
@@ -24,6 +27,8 @@ import org.elasticsearch.xpack.core.ml.dataframe.analyses.Regression;
import org.junit.After;
import java.util.Arrays;
import java.io.IOException;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -392,6 +397,65 @@ public class RegressionIT extends MlNativeDataFrameAnalyticsIntegTestCase {
assertMlResultsFieldMappings(destIndex, predictedClassField, "double");
}
public void testWithDatastream() throws Exception {
initialize("regression_with_datastream");
String predictedClassField = DEPENDENT_VARIABLE_FIELD + "_prediction";
indexData(sourceIndex, 300, 50, true);
DataFrameAnalyticsConfig config = buildAnalytics(jobId, sourceIndex, destIndex, null,
new Regression(
DEPENDENT_VARIABLE_FIELD,
BoostedTreeParams.builder().setNumTopFeatureImportanceValues(1).build(),
null,
null,
null,
null,
null)
);
putAnalytics(config);
assertIsStopped(jobId);
assertProgressIsZero(jobId);
startAnalytics(jobId);
waitUntilAnalyticsIsStopped(jobId);
SearchResponse sourceData = client().prepareSearch(sourceIndex).setTrackTotalHits(true).setSize(1000).get();
for (SearchHit hit : sourceData.getHits()) {
Map<String, Object> destDoc = getDestDoc(config, hit);
Map<String, Object> resultsObject = getMlResultsObjectFromDestDoc(destDoc);
assertThat(resultsObject.containsKey(predictedClassField), is(true));
assertThat(resultsObject.containsKey("is_training"), is(true));
assertThat(resultsObject.get("is_training"), is(destDoc.containsKey(DEPENDENT_VARIABLE_FIELD)));
@SuppressWarnings("unchecked")
List<Map<String, Object>> importanceArray = (List<Map<String, Object>>)resultsObject.get("feature_importance");
assertThat(importanceArray, hasSize(greaterThan(0)));
assertThat(
importanceArray.stream().filter(m -> NUMERICAL_FEATURE_FIELD.equals(m.get("feature_name"))
|| DISCRETE_NUMERICAL_FEATURE_FIELD.equals(m.get("feature_name"))).findAny(),
isPresent());
}
assertProgressComplete(jobId);
assertThat(searchStoredProgress(jobId).getHits().getTotalHits().value, equalTo(1L));
assertModelStatePersisted(stateDocId());
assertInferenceModelPersisted(jobId);
assertMlResultsFieldMappings(destIndex, predictedClassField, "double");
assertThatAuditMessagesMatch(jobId,
"Created analytics with analysis type [regression]",
"Estimated memory usage for this analytics to be",
"Starting analytics on node",
"Started analytics",
"Creating destination index [" + destIndex + "]",
"Started reindexing to destination index [" + destIndex + "]",
"Finished reindexing to destination index [" + destIndex + "]",
"Started loading data",
"Started analyzing",
"Started writing results",
"Finished analysis");
}
private void initialize(String jobId) {
this.jobId = jobId;
this.sourceIndex = jobId + "_source_index";
@@ -399,12 +463,37 @@ public class RegressionIT extends MlNativeDataFrameAnalyticsIntegTestCase {
}
static void indexData(String sourceIndex, int numTrainingRows, int numNonTrainingRows) {
indexData(sourceIndex, numTrainingRows, numNonTrainingRows, false);
}
static void indexData(String sourceIndex, int numTrainingRows, int numNonTrainingRows, boolean dataStream) {
String mapping = "{\n" +
" \"properties\": {\n" +
" \"time\": {\n" +
" \"type\": \"date\"\n" +
" }," +
" \""+ NUMERICAL_FEATURE_FIELD + "\": {\n" +
" \"type\": \"double\"\n" +
" }," +
" \"" + DISCRETE_NUMERICAL_FEATURE_FIELD + "\": {\n" +
" \"type\": \"long\"\n" +
" }," +
" \"" + DEPENDENT_VARIABLE_FIELD + "\": {\n" +
" \"type\": \"double\"\n" +
" }" +
" }\n" +
" }";
if (dataStream) {
try {
createDataStreamAndTemplate(sourceIndex, "time", mapping);
} catch (IOException ex) {
throw new ElasticsearchException(ex);
}
} else {
client().admin().indices().prepareCreate(sourceIndex)
.addMapping("_doc",
NUMERICAL_FEATURE_FIELD, "type=double",
DISCRETE_NUMERICAL_FEATURE_FIELD, "type=long",
DEPENDENT_VARIABLE_FIELD, "type=double")
.addMapping("_doc", mapping, XContentType.JSON)
.get();
}
BulkRequestBuilder bulkRequestBuilder = client().prepareBulk()
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
@@ -412,15 +501,17 @@ public class RegressionIT extends MlNativeDataFrameAnalyticsIntegTestCase {
List<Object> source = Arrays.asList(
NUMERICAL_FEATURE_FIELD, NUMERICAL_FEATURE_VALUES.get(i % NUMERICAL_FEATURE_VALUES.size()),
DISCRETE_NUMERICAL_FEATURE_FIELD, DISCRETE_NUMERICAL_FEATURE_VALUES.get(i % DISCRETE_NUMERICAL_FEATURE_VALUES.size()),
DEPENDENT_VARIABLE_FIELD, DEPENDENT_VARIABLE_VALUES.get(i % DEPENDENT_VARIABLE_VALUES.size()));
IndexRequest indexRequest = new IndexRequest(sourceIndex).source(source.toArray());
DEPENDENT_VARIABLE_FIELD, DEPENDENT_VARIABLE_VALUES.get(i % DEPENDENT_VARIABLE_VALUES.size()),
"time", Instant.now().toEpochMilli());
IndexRequest indexRequest = new IndexRequest(sourceIndex).source(source.toArray()).opType(DocWriteRequest.OpType.CREATE);
bulkRequestBuilder.add(indexRequest);
}
for (int i = numTrainingRows; i < numTrainingRows + numNonTrainingRows; i++) {
List<Object> source = Arrays.asList(
NUMERICAL_FEATURE_FIELD, NUMERICAL_FEATURE_VALUES.get(i % NUMERICAL_FEATURE_VALUES.size()),
DISCRETE_NUMERICAL_FEATURE_FIELD, DISCRETE_NUMERICAL_FEATURE_VALUES.get(i % DISCRETE_NUMERICAL_FEATURE_VALUES.size()));
IndexRequest indexRequest = new IndexRequest(sourceIndex).source(source.toArray());
DISCRETE_NUMERICAL_FEATURE_FIELD, DISCRETE_NUMERICAL_FEATURE_VALUES.get(i % DISCRETE_NUMERICAL_FEATURE_VALUES.size()),
"time", Instant.now().toEpochMilli());
IndexRequest indexRequest = new IndexRequest(sourceIndex).source(source.toArray()).opType(DocWriteRequest.OpType.CREATE);
bulkRequestBuilder.add(indexRequest);
}
BulkResponse bulkResponse = bulkRequestBuilder.get();

BaseMlIntegTestCase.java

@@ -7,6 +7,7 @@ package org.elasticsearch.xpack.ml.support;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.admin.indices.recovery.RecoveryResponse;
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
import org.elasticsearch.action.bulk.BulkItemResponse;
@@ -250,13 +251,17 @@ public abstract class BaseMlIntegTestCase extends ESIntegTestCase {
}
public static void indexDocs(Logger logger, String index, long numDocs, long start, long end) {
indexDocs(logger, index, "type", numDocs, start, end);
}
public static void indexDocs(Logger logger, String index, String type, long numDocs, long start, long end) {
int maxDelta = (int) (end - start - 1);
BulkRequestBuilder bulkRequestBuilder = client().prepareBulk();
for (int i = 0; i < numDocs; i++) {
IndexRequest indexRequest = new IndexRequest(index, "type");
IndexRequest indexRequest = new IndexRequest(index, type);
long timestamp = start + randomIntBetween(0, maxDelta);
assert timestamp >= start && timestamp < end;
indexRequest.source("time", timestamp);
indexRequest.source("time", timestamp).opType(DocWriteRequest.OpType.CREATE);
bulkRequestBuilder.add(indexRequest);
}
BulkResponse bulkResponse = bulkRequestBuilder

TransformPivotRestIT.java

@@ -95,6 +95,34 @@ public class TransformPivotRestIT extends TransformRestTestCase {
assertOnePivotValue(transformIndex + "/_search?q=reviewer:user_26", 3.918918918);
}
public void testSimpleDataStreamPivot() throws Exception {
String indexName = "reviews_data_stream";
createReviewsIndex(indexName, 1000, "date", true);
String transformId = "simple_data_stream_pivot";
String transformIndex = "pivot_reviews_data_stream";
setupDataAccessRole(DATA_ACCESS_ROLE, indexName, transformIndex);
createPivotReviewsTransform(transformId,
transformIndex,
null,
null,
BASIC_AUTH_VALUE_TRANSFORM_ADMIN_WITH_SOME_DATA_ACCESS,
indexName);
startAndWaitForTransform(transformId, transformIndex, BASIC_AUTH_VALUE_TRANSFORM_ADMIN_WITH_SOME_DATA_ACCESS);
// we expect 27 documents as there shall be 27 user_id's
Map<String, Object> indexStats = getAsMap(transformIndex + "/_stats");
assertEquals(27, XContentMapValues.extractValue("_all.total.docs.count", indexStats));
// get and check some users
assertOnePivotValue(transformIndex + "/_search?q=reviewer:user_0", 3.776978417);
assertOnePivotValue(transformIndex + "/_search?q=reviewer:user_5", 3.72);
assertOnePivotValue(transformIndex + "/_search?q=reviewer:user_11", 3.846153846);
assertOnePivotValue(transformIndex + "/_search?q=reviewer:user_20", 3.769230769);
assertOnePivotValue(transformIndex + "/_search?q=reviewer:user_26", 3.918918918);
client().performRequest(new Request("DELETE", "/_data_stream/" + indexName));
}
public void testSimplePivotWithQuery() throws Exception {
String transformId = "simple_pivot_with_query";
String transformIndex = "pivot_reviews_user_id_above_20";

TransformRestTestCase.java

@@ -80,7 +80,7 @@ public abstract class TransformRestTestCase extends ESRestTestCase {
return super.buildClient(settings, hosts);
}
protected void createReviewsIndex(String indexName, int numDocs, String dateType) throws IOException {
protected void createReviewsIndex(String indexName, int numDocs, String dateType, boolean isDataStream) throws IOException {
int[] distributionTable = { 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 4, 4, 4, 3, 3, 2, 1, 1, 1 };
// create mapping
@@ -110,11 +110,26 @@ public abstract class TransformRestTestCase extends ESRestTestCase {
.endObject();
}
builder.endObject();
if (isDataStream) {
Request createCompositeTemplate = new Request("PUT", "_index_template/" + indexName + "_template");
createCompositeTemplate.setJsonEntity(
"{\n" +
" \"index_patterns\": [ \"" + indexName + "\" ],\n" +
" \"data_stream\": {\n" +
" \"timestamp_field\": \"timestamp\"\n" +
" },\n" +
" \"template\": \n" + Strings.toString(builder) +
"}"
);
client().performRequest(createCompositeTemplate);
client().performRequest(new Request("PUT", "_data_stream/" + indexName));
} else {
final StringEntity entity = new StringEntity(Strings.toString(builder), ContentType.APPLICATION_JSON);
Request req = new Request("PUT", indexName);
req.setEntity(entity);
client().performRequest(req);
}
}
// create index
final StringBuilder bulk = new StringBuilder();
@@ -122,7 +137,7 @@ public abstract class TransformRestTestCase extends ESRestTestCase {
int hour = 10;
int min = 10;
for (int i = 0; i < numDocs; i++) {
bulk.append("{\"index\":{\"_index\":\"" + indexName + "\"}}\n");
bulk.append("{\"create\":{\"_index\":\"" + indexName + "\"}}\n");
long user = Math.round(Math.pow(i * 31 % 1000, distributionTable[i % distributionTable.length]) % 27);
int stars = distributionTable[(i * 33) % distributionTable.length];
long business = Math.round(Math.pow(user * stars, distributionTable[i % distributionTable.length]) % 13);
@@ -183,7 +198,7 @@ public abstract class TransformRestTestCase extends ESRestTestCase {
}
protected void createReviewsIndex(String indexName) throws IOException {
createReviewsIndex(indexName, 1000, "date");
createReviewsIndex(indexName, 1000, "date", false);
}
protected void createPivotReviewsTransform(String transformId, String transformIndex, String query) throws IOException {
@@ -196,7 +211,7 @@ public abstract class TransformRestTestCase extends ESRestTestCase {
}
protected void createReviewsIndexNano() throws IOException {
createReviewsIndex(REVIEWS_DATE_NANO_INDEX_NAME, 1000, "date_nanos");
createReviewsIndex(REVIEWS_DATE_NANO_INDEX_NAME, 1000, "date_nanos", false);
}
protected void createContinuousPivotReviewsTransform(String transformId, String transformIndex, String authHeader) throws IOException {
@@ -226,8 +241,12 @@ public abstract class TransformRestTestCase extends ESRestTestCase {
assertThat(createTransformResponse.get("acknowledged"), equalTo(Boolean.TRUE));
}
protected void createPivotReviewsTransform(String transformId, String transformIndex, String query, String pipeline, String authHeader)
throws IOException {
protected void createPivotReviewsTransform(String transformId,
String transformIndex,
String query,
String pipeline,
String authHeader,
String sourceIndex) throws IOException {
final Request createTransformRequest = createRequestWithAuth("PUT", getTransformEndpoint() + transformId, authHeader);
String config = "{";
@@ -239,9 +258,9 @@ public abstract class TransformRestTestCase extends ESRestTestCase {
}
if (query != null) {
config += " \"source\": {\"index\":\"" + REVIEWS_INDEX_NAME + "\", \"query\":{" + query + "}},";
config += " \"source\": {\"index\":\"" + sourceIndex + "\", \"query\":{" + query + "}},";
} else {
config += " \"source\": {\"index\":\"" + REVIEWS_INDEX_NAME + "\"},";
config += " \"source\": {\"index\":\"" + sourceIndex + "\"},";
}
config += " \"pivot\": {"
@@ -264,6 +283,11 @@ public abstract class TransformRestTestCase extends ESRestTestCase {
assertThat(createTransformResponse.get("acknowledged"), equalTo(Boolean.TRUE));
}
protected void createPivotReviewsTransform(String transformId, String transformIndex, String query, String pipeline, String authHeader)
throws IOException {
createPivotReviewsTransform(transformId, transformIndex, query, pipeline, authHeader, REVIEWS_INDEX_NAME);
}
protected void startTransform(String transformId) throws IOException {
startTransform(transformId, null);
}

TransformTaskFailedStateIT.java

@@ -64,7 +64,7 @@ public class TransformTaskFailedStateIT extends TransformRestTestCase {
public void testForceStopFailedTransform() throws Exception {
String transformId = "test-force-stop-failed-transform";
createReviewsIndex(REVIEWS_INDEX_NAME, 10, "date");
createReviewsIndex(REVIEWS_INDEX_NAME, 10, "date", false);
String transformIndex = "failure_pivot_reviews";
createDestinationIndexWithBadMapping(transformIndex);
createContinuousPivotReviewsTransform(transformId, transformIndex, null);
@@ -102,7 +102,7 @@ public class TransformTaskFailedStateIT extends TransformRestTestCase {
public void testStartFailedTransform() throws Exception {
String transformId = "test-force-start-failed-transform";
createReviewsIndex(REVIEWS_INDEX_NAME, 10, "date");
createReviewsIndex(REVIEWS_INDEX_NAME, 10, "date", false);
String transformIndex = "failure_pivot_reviews";
createDestinationIndexWithBadMapping(transformIndex);
createContinuousPivotReviewsTransform(transformId, transformIndex, null);

TransportUpdateTransformAction.java

@@ -367,6 +367,7 @@ public class TransportUpdateTransformAction extends TransportTasksAction<Transfo
String[] src = indexNameExpressionResolver.concreteIndexNames(
clusterState,
IndicesOptions.lenientExpandOpen(),
true,
config.getSource().getIndex()
);
// If we are running, we should verify that the destination index exists and create it if it does not