From 4836ff7bcda20d1f3c76176fd0e13d8886eeb6ee Mon Sep 17 00:00:00 2001 From: Benjamin Trent Date: Fri, 26 Apr 2019 07:18:49 -0500 Subject: [PATCH] [ML] add multi node integ tests for data frames (#41508) (#41552) * [ML] adding native-multi-node-integTests for data frames' * addressing streaming issues * formatting fixes * Addressing PR comments --- .../xpack/core/XPackClientPlugin.java | 19 +- .../GetDataFrameTransformsStatsAction.java | 18 +- .../action/StartDataFrameTransformAction.java | 2 +- .../StartDataFrameTransformTaskAction.java | 2 +- ...ameTransformsStatsActionResponseTests.java | 16 +- .../qa/multi-node-tests/build.gradle | 55 +++ .../integration/DataFrameIntegTestCase.java | 332 ++++++++++++++++++ .../integration/DataFrameTransformIT.java | 60 ++++ ...portGetDataFrameTransformsStatsAction.java | 71 ++-- ...ransportStartDataFrameTransformAction.java | 9 +- 10 files changed, 531 insertions(+), 53 deletions(-) create mode 100644 x-pack/plugin/data-frame/qa/multi-node-tests/build.gradle create mode 100644 x-pack/plugin/data-frame/qa/multi-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameIntegTestCase.java create mode 100644 x-pack/plugin/data-frame/qa/multi-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameTransformIT.java diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackClientPlugin.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackClientPlugin.java index dc8403b7bd5..a145569898e 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackClientPlugin.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackClientPlugin.java @@ -44,6 +44,14 @@ import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata; import org.elasticsearch.xpack.core.ccr.CCRFeatureSet; import org.elasticsearch.xpack.core.dataframe.DataFrameFeatureSetUsage; import org.elasticsearch.xpack.core.dataframe.DataFrameField; +import 
org.elasticsearch.xpack.core.dataframe.action.DeleteDataFrameTransformAction; +import org.elasticsearch.xpack.core.dataframe.action.GetDataFrameTransformsAction; +import org.elasticsearch.xpack.core.dataframe.action.GetDataFrameTransformsStatsAction; +import org.elasticsearch.xpack.core.dataframe.action.PreviewDataFrameTransformAction; +import org.elasticsearch.xpack.core.dataframe.action.PutDataFrameTransformAction; +import org.elasticsearch.xpack.core.dataframe.action.StartDataFrameTransformAction; +import org.elasticsearch.xpack.core.dataframe.action.StartDataFrameTransformTaskAction; +import org.elasticsearch.xpack.core.dataframe.action.StopDataFrameTransformAction; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransform; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformState; import org.elasticsearch.xpack.core.deprecation.DeprecationInfoAction; @@ -363,7 +371,16 @@ public class XPackClientPlugin extends Plugin implements ActionPlugin, NetworkPl RemoveIndexLifecyclePolicyAction.INSTANCE, MoveToStepAction.INSTANCE, RetryAction.INSTANCE, - TransportFreezeIndexAction.FreezeIndexAction.INSTANCE + TransportFreezeIndexAction.FreezeIndexAction.INSTANCE, + // Data Frame + PutDataFrameTransformAction.INSTANCE, + StartDataFrameTransformAction.INSTANCE, + StartDataFrameTransformTaskAction.INSTANCE, + StopDataFrameTransformAction.INSTANCE, + DeleteDataFrameTransformAction.INSTANCE, + GetDataFrameTransformsAction.INSTANCE, + GetDataFrameTransformsStatsAction.INSTANCE, + PreviewDataFrameTransformAction.INSTANCE ); } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/action/GetDataFrameTransformsStatsAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/action/GetDataFrameTransformsStatsAction.java index 96763153240..e9c71d3718b 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/action/GetDataFrameTransformsStatsAction.java +++ 
b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/action/GetDataFrameTransformsStatsAction.java @@ -56,7 +56,7 @@ public class GetDataFrameTransformsStatsAction extends Action expandedIds = Collections.emptyList(); + private List expandedIds; public Request(String id) { if (Strings.isNullOrEmpty(id) || id.equals("*")) { @@ -64,13 +64,14 @@ public class GetDataFrameTransformsStatsAction extends Action transformsStateAndStats; public Response(List transformsStateAndStats) { @@ -165,6 +166,11 @@ public class GetDataFrameTransformsStatsAction extends Action stats = new ArrayList<>(); - for (int i = 0; i < randomInt(10); ++i) { + int totalStats = randomInt(10); + for (int i = 0; i < totalStats; ++i) { stats.add(DataFrameTransformStateAndStatsTests.randomDataFrameTransformStateAndStats()); } - - return new Response(stats); + int totalErrors = randomInt(10); + List taskFailures = new ArrayList<>(totalErrors); + List nodeFailures = new ArrayList<>(totalErrors); + for (int i = 0; i < totalErrors; i++) { + taskFailures.add(new TaskOperationFailure("node1", randomLongBetween(1, 10), new Exception("error"))); + nodeFailures.add(new FailedNodeException("node1", "message", new Exception("error"))); + } + return new Response(stats, taskFailures, nodeFailures); } @Override diff --git a/x-pack/plugin/data-frame/qa/multi-node-tests/build.gradle b/x-pack/plugin/data-frame/qa/multi-node-tests/build.gradle new file mode 100644 index 00000000000..ab170d6be36 --- /dev/null +++ b/x-pack/plugin/data-frame/qa/multi-node-tests/build.gradle @@ -0,0 +1,55 @@ +apply plugin: 'elasticsearch.standalone-rest-test' +apply plugin: 'elasticsearch.rest-test' + +dependencies { + testCompile project(path: xpackModule('core'), configuration: 'default') + testCompile project(path: xpackModule('core'), configuration: 'testArtifacts') + testCompile project(path: xpackModule('data-frame'), configuration: 'runtime') +} + +// location for keys and certificates +File keystoreDir = new 
File(project.buildDir, 'keystore') +File nodeKey = file("$keystoreDir/testnode.pem") +File nodeCert = file("$keystoreDir/testnode.crt") +// Add key and certs to test classpath: it expects it there +task copyKeyCerts(type: Copy) { + from(project(':x-pack:plugin:core').file('src/test/resources/org/elasticsearch/xpack/security/transport/ssl/certs/simple/')) { + include 'testnode.crt', 'testnode.pem' + } + into keystoreDir +} +// Add keys and certs to test classpath: it expects it there +sourceSets.test.resources.srcDir(keystoreDir) +processTestResources.dependsOn(copyKeyCerts) + +integTestCluster { + dependsOn copyKeyCerts + setting 'xpack.security.enabled', 'true' + setting 'xpack.license.self_generated.type', 'trial' + setting 'xpack.monitoring.enabled', 'false' + setting 'xpack.security.authc.token.enabled', 'true' + setting 'xpack.security.transport.ssl.enabled', 'true' + setting 'xpack.security.transport.ssl.key', nodeKey.name + setting 'xpack.security.transport.ssl.certificate', nodeCert.name + setting 'xpack.security.transport.ssl.verification_mode', 'certificate' + setting 'xpack.security.audit.enabled', 'false' + setting 'xpack.license.self_generated.type', 'trial' + keystoreSetting 'bootstrap.password', 'x-pack-test-password' + keystoreSetting 'xpack.security.transport.ssl.secure_key_passphrase', 'testnode' + setupCommand 'setupDummyUser', + 'bin/elasticsearch-users', 'useradd', 'x_pack_rest_user', '-p', 'x-pack-test-password', '-r', 'superuser' + + numNodes = 3 + extraConfigFile nodeKey.name, nodeKey + extraConfigFile nodeCert.name, nodeCert + waitCondition = { node, ant -> + File tmpFile = new File(node.cwd, 'wait.success') + ant.get(src: "http://${node.httpUri()}/_cluster/health?wait_for_nodes=>=${numNodes}&wait_for_status=yellow", + dest: tmpFile.toString(), + username: 'x_pack_rest_user', + password: 'x-pack-test-password', + ignoreerrors: true, + retries: 10) + return tmpFile.exists() + } +} diff --git 
a/x-pack/plugin/data-frame/qa/multi-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameIntegTestCase.java b/x-pack/plugin/data-frame/qa/multi-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameIntegTestCase.java new file mode 100644 index 00000000000..b45325fffaf --- /dev/null +++ b/x-pack/plugin/data-frame/qa/multi-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameIntegTestCase.java @@ -0,0 +1,332 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.dataframe.integration; + +import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest; +import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; +import org.elasticsearch.action.bulk.BulkRequestBuilder; +import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.io.PathUtils; +import org.elasticsearch.common.network.NetworkModule; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.DeprecationHandler; +import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentHelper; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.index.query.QueryBuilder; +import org.elasticsearch.index.query.QueryBuilders; +import 
org.elasticsearch.plugins.Plugin; +import org.elasticsearch.search.SearchModule; +import org.elasticsearch.search.aggregations.AggregatorFactories; +import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval; +import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.test.SecuritySettingsSourceField; +import org.elasticsearch.transport.Netty4Plugin; +import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin; +import org.elasticsearch.xpack.core.XPackClientPlugin; +import org.elasticsearch.xpack.core.dataframe.action.DeleteDataFrameTransformAction; +import org.elasticsearch.xpack.core.dataframe.action.GetDataFrameTransformsStatsAction; +import org.elasticsearch.xpack.core.dataframe.action.PutDataFrameTransformAction; +import org.elasticsearch.xpack.core.dataframe.action.StartDataFrameTransformAction; +import org.elasticsearch.xpack.core.dataframe.action.StopDataFrameTransformAction; +import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig; +import org.elasticsearch.xpack.core.dataframe.transforms.DestConfig; +import org.elasticsearch.xpack.core.dataframe.transforms.QueryConfig; +import org.elasticsearch.xpack.core.dataframe.transforms.SourceConfig; +import org.elasticsearch.xpack.core.dataframe.transforms.pivot.AggregationConfig; +import org.elasticsearch.xpack.core.dataframe.transforms.pivot.DateHistogramGroupSource; +import org.elasticsearch.xpack.core.dataframe.transforms.pivot.GroupConfig; +import org.elasticsearch.xpack.core.dataframe.transforms.pivot.PivotConfig; +import org.elasticsearch.xpack.core.dataframe.transforms.pivot.SingleGroupSource; +import org.elasticsearch.xpack.core.security.SecurityField; + +import java.net.URISyntaxException; +import java.nio.file.Path; +import java.time.ZoneId; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import static 
org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; +import static org.hamcrest.core.Is.is; + +abstract class DataFrameIntegTestCase extends ESIntegTestCase { + + protected static final String REVIEWS_INDEX_NAME = "data_frame_reviews"; + + private Map transformConfigs = new HashMap<>(); + + protected void cleanUp() { + cleanUpTransforms(); + waitForPendingTasks(); + } + + protected void cleanUpTransforms() { + for (DataFrameTransformConfig config : transformConfigs.values()) { + stopDataFrameTransform(config.getId()); + deleteDataFrameTransform(config.getId()); + } + transformConfigs.clear(); + } + + protected StopDataFrameTransformAction.Response stopDataFrameTransform(String id) { + return client().execute(StopDataFrameTransformAction.INSTANCE, + new StopDataFrameTransformAction.Request(id, true, false, null)).actionGet(); + } + + protected StartDataFrameTransformAction.Response startDataFrameTransform(String id) { + return client().execute(StartDataFrameTransformAction.INSTANCE, + new StartDataFrameTransformAction.Request(id, false)).actionGet(); + } + + protected DeleteDataFrameTransformAction.Response deleteDataFrameTransform(String id) { + DeleteDataFrameTransformAction.Response response = client().execute(DeleteDataFrameTransformAction.INSTANCE, + new DeleteDataFrameTransformAction.Request(id)) + .actionGet(); + if (response.isDeleted()) { + transformConfigs.remove(id); + } + return response; + } + + protected AcknowledgedResponse putDataFrameTransform(DataFrameTransformConfig config) { + if (transformConfigs.keySet().contains(config.getId())) { + throw new IllegalArgumentException("data frame transform [" + config.getId() + "] is already registered"); + } + AcknowledgedResponse response = client().execute(PutDataFrameTransformAction.INSTANCE, + new PutDataFrameTransformAction.Request(config)) + .actionGet(); + if (response.isAcknowledged()) { + transformConfigs.put(config.getId(), config); + } + return response; + } + + protected 
GetDataFrameTransformsStatsAction.Response getDataFrameTransformStats(String id) { + return client().execute(GetDataFrameTransformsStatsAction.INSTANCE, new GetDataFrameTransformsStatsAction.Request(id)).actionGet(); + } + + protected void waitUntilCheckpoint(String id, long checkpoint) throws Exception { + waitUntilCheckpoint(id, checkpoint, TimeValue.timeValueSeconds(30)); + } + + protected void waitUntilCheckpoint(String id, long checkpoint, TimeValue waitTime) throws Exception { + assertBusy(() -> + assertEquals(checkpoint, getDataFrameTransformStats(id) + .getTransformsStateAndStats() + .get(0) + .getTransformState() + .getCheckpoint()), + waitTime.getMillis(), + TimeUnit.MILLISECONDS); + } + + protected DateHistogramGroupSource createDateHistogramGroupSource(String field, long interval, ZoneId zone, String format) { + DateHistogramGroupSource source = new DateHistogramGroupSource(field); + source.setFormat(format); + source.setInterval(interval); + source.setTimeZone(zone); + return source; + } + + protected DateHistogramGroupSource createDateHistogramGroupSource(String field, + DateHistogramInterval interval, + ZoneId zone, + String format) { + DateHistogramGroupSource source = new DateHistogramGroupSource(field); + source.setFormat(format); + source.setDateHistogramInterval(interval); + source.setTimeZone(zone); + return source; + } + + protected GroupConfig createGroupConfig(Map groups) throws Exception { + Map lazyParsed = new HashMap<>(groups.size()); + for(Map.Entry sgs : groups.entrySet()) { + lazyParsed.put(sgs.getKey(), Collections.singletonMap(sgs.getValue().getType().value(), toLazy(sgs.getValue()))); + } + return new GroupConfig(lazyParsed, groups); + } + + protected QueryConfig createQueryConfig(QueryBuilder queryBuilder) throws Exception { + return new QueryConfig(toLazy(queryBuilder), queryBuilder); + } + + protected AggregationConfig createAggConfig(AggregatorFactories.Builder aggregations) throws Exception { + return new 
AggregationConfig(toLazy(aggregations), aggregations); + } + + protected PivotConfig createPivotConfig(Map groups, + AggregatorFactories.Builder aggregations) throws Exception { + return new PivotConfig(createGroupConfig(groups), createAggConfig(aggregations)); + } + + protected DataFrameTransformConfig createTransformConfig(String id, + Map groups, + AggregatorFactories.Builder aggregations, + String destinationIndex, + String... sourceIndices) throws Exception { + return createTransformConfig(id, groups, aggregations, destinationIndex, QueryBuilders.matchAllQuery(), sourceIndices); + } + + protected DataFrameTransformConfig createTransformConfig(String id, + Map groups, + AggregatorFactories.Builder aggregations, + String destinationIndex, + QueryBuilder queryBuilder, + String... sourceIndices) throws Exception { + return new DataFrameTransformConfig(id, + new SourceConfig(sourceIndices, createQueryConfig(queryBuilder)), + new DestConfig(destinationIndex), + Collections.emptyMap(), + createPivotConfig(groups, aggregations)); + } + + protected void createReviewsIndex() throws Exception { + final int numDocs = 1000; + + // create the index with its mapping + try (XContentBuilder builder = jsonBuilder()) { + builder.startObject(); + { + builder.startObject("properties") + .startObject("timestamp") + .field("type", "date") + .endObject() + .startObject("user_id") + .field("type", "keyword") + .endObject() + .startObject("count") + .field("type", "integer") + .endObject() + .startObject("business_id") + .field("type", "keyword") + .endObject() + .startObject("stars") + .field("type", "integer") + .endObject() + .endObject(); + } + builder.endObject(); + CreateIndexResponse response = client().admin() + .indices() + .prepareCreate(REVIEWS_INDEX_NAME) + .addMapping("_doc", builder) + .get(); + assertThat(response.isAcknowledged(), is(true)); + } + + // index the review documents in bulk batches + BulkRequestBuilder bulk = client().prepareBulk(REVIEWS_INDEX_NAME, "_doc"); + int day = 10; + for (int i = 0; i < numDocs; 
i++) { + long user = i % 28; + int stars = (i + 20) % 5; + long business = (i + 100) % 50; + int hour = 10 + (i % 13); + int min = 10 + (i % 49); + int sec = 10 + (i % 49); + + String date_string = "2017-01-" + day + "T" + hour + ":" + min + ":" + sec + "Z"; + + StringBuilder sourceBuilder = new StringBuilder(); + sourceBuilder.append("{\"user_id\":\"") + .append("user_") + .append(user) + .append("\",\"count\":") + .append(i) + .append(",\"business_id\":\"") + .append("business_") + .append(business) + .append("\",\"stars\":") + .append(stars) + .append(",\"timestamp\":\"") + .append(date_string) + .append("\"}"); + bulk.add(new IndexRequest().source(sourceBuilder.toString(), XContentType.JSON)); + + if (i % 50 == 0) { + BulkResponse response = client().bulk(bulk.request()).get(); + assertThat(response.buildFailureMessage(), response.hasFailures(), is(false)); + bulk = client().prepareBulk(REVIEWS_INDEX_NAME, "_doc"); + day += 1; + } + } + BulkResponse response = client().bulk(bulk.request()).get(); + assertThat(response.buildFailureMessage(), response.hasFailures(), is(false)); + client().admin().indices().prepareRefresh(REVIEWS_INDEX_NAME).get(); + } + + protected Map toLazy(ToXContent parsedObject) throws Exception { + BytesReference bytes = XContentHelper.toXContent(parsedObject, XContentType.JSON, false); + try(XContentParser parser = XContentHelper.createParser(xContentRegistry(), + DeprecationHandler.THROW_UNSUPPORTED_OPERATION, + bytes, + XContentType.JSON)) { + return parser.mapOrdered(); + } + } + + private void waitForPendingTasks() { + ListTasksRequest listTasksRequest = new ListTasksRequest(); + listTasksRequest.setWaitForCompletion(true); + listTasksRequest.setDetailed(true); + listTasksRequest.setTimeout(TimeValue.timeValueSeconds(10)); + try { + admin().cluster().listTasks(listTasksRequest).get(); + } catch (Exception e) { + throw new AssertionError("Failed to wait for pending tasks to complete", e); + } + } + + @Override + protected 
NamedXContentRegistry xContentRegistry() { + SearchModule searchModule = new SearchModule(Settings.EMPTY, false, Collections.emptyList()); + return new NamedXContentRegistry(searchModule.getNamedXContents()); + } + + @Override + protected Settings externalClusterClientSettings() { + Path key; + Path certificate; + try { + key = PathUtils.get(getClass().getResource("/testnode.pem").toURI()); + certificate = PathUtils.get(getClass().getResource("/testnode.crt").toURI()); + } catch (URISyntaxException e) { + throw new IllegalStateException("error trying to get keystore path", e); + } + Settings.Builder builder = Settings.builder(); + builder.put(NetworkModule.TRANSPORT_TYPE_KEY, SecurityField.NAME4); + builder.put(SecurityField.USER_SETTING.getKey(), "x_pack_rest_user:" + SecuritySettingsSourceField.TEST_PASSWORD_SECURE_STRING); + builder.put("xpack.security.transport.ssl.enabled", true); + builder.put("xpack.security.transport.ssl.key", key.toAbsolutePath().toString()); + builder.put("xpack.security.transport.ssl.certificate", certificate.toAbsolutePath().toString()); + builder.put("xpack.security.transport.ssl.key_passphrase", "testnode"); + builder.put("xpack.security.transport.ssl.verification_mode", "certificate"); + return builder.build(); + } + + @Override + protected Collection> nodePlugins() { + return Arrays.asList(LocalStateCompositeXPackPlugin.class, Netty4Plugin.class); + } + + @Override + protected Collection> transportClientPlugins() { + return Arrays.asList(XPackClientPlugin.class, Netty4Plugin.class); + } +} diff --git a/x-pack/plugin/data-frame/qa/multi-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameTransformIT.java b/x-pack/plugin/data-frame/qa/multi-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameTransformIT.java new file mode 100644 index 00000000000..ecb2025c6a9 --- /dev/null +++ 
b/x-pack/plugin/data-frame/qa/multi-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameTransformIT.java @@ -0,0 +1,60 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.dataframe.integration; + +import org.elasticsearch.search.aggregations.AggregationBuilders; +import org.elasticsearch.search.aggregations.AggregatorFactories; +import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval; +import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig; +import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformStateAndStats; +import org.elasticsearch.xpack.core.dataframe.transforms.pivot.SingleGroupSource; +import org.elasticsearch.xpack.core.dataframe.transforms.pivot.TermsGroupSource; +import org.elasticsearch.xpack.core.indexing.IndexerState; +import org.junit.After; + +import java.util.HashMap; +import java.util.Map; + +import static org.hamcrest.Matchers.equalTo; + +public class DataFrameTransformIT extends DataFrameIntegTestCase { + + @After + public void cleanTransforms() { + cleanUp(); + } + + public void testDataFrameTransformCrud() throws Exception { + createReviewsIndex(); + + Map groups = new HashMap<>(); + groups.put("by-day", createDateHistogramGroupSource("timestamp", DateHistogramInterval.DAY, null, null)); + groups.put("by-user", new TermsGroupSource("user_id")); + groups.put("by-business", new TermsGroupSource("business_id")); + + AggregatorFactories.Builder aggs = AggregatorFactories.builder() + .addAggregator(AggregationBuilders.avg("review_score").field("stars")) + .addAggregator(AggregationBuilders.max("timestamp").field("timestamp")); + + DataFrameTransformConfig config = createTransformConfig("data-frame-transform-crud", + 
groups, + aggs, + "reviews-by-user-business-day", + REVIEWS_INDEX_NAME); + + assertTrue(putDataFrameTransform(config).isAcknowledged()); + assertTrue(startDataFrameTransform(config.getId()).isStarted()); + + waitUntilCheckpoint(config.getId(), 1L); + + DataFrameTransformStateAndStats stats = getDataFrameTransformStats(config.getId()).getTransformsStateAndStats().get(0); + + assertThat(stats.getTransformState().getIndexerState(), equalTo(IndexerState.STARTED)); + } + + +} diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportGetDataFrameTransformsStatsAction.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportGetDataFrameTransformsStatsAction.java index b350300392c..93658c2f4f4 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportGetDataFrameTransformsStatsAction.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportGetDataFrameTransformsStatsAction.java @@ -12,16 +12,14 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchParseException; import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.ActionListenerResponseHandler; import org.elasticsearch.action.FailedNodeException; import org.elasticsearch.action.TaskOperationFailure; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.search.ShardSearchFailure; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.tasks.TransportTasksAction; import org.elasticsearch.client.Client; -import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.bytes.BytesReference; import 
org.elasticsearch.common.inject.Inject; @@ -30,7 +28,6 @@ import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentType; -import org.elasticsearch.discovery.MasterNotDiscoveredException; import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.QueryBuilders; @@ -81,7 +78,7 @@ public class TransportGetDataFrameTransformsStatsAction extends DataFrameTransformsConfigManager dataFrameTransformsConfigManager, DataFrameTransformsCheckpointService transformsCheckpointService) { super(GetDataFrameTransformsStatsAction.NAME, clusterService, transportService, actionFilters, Request::new, Response::new, - Response::new, ThreadPool.Names.SAME); + Response::new, ThreadPool.Names.SAME); this.client = client; this.dataFrameTransformsConfigManager = dataFrameTransformsConfigManager; this.transformsCheckpointService = transformsCheckpointService; @@ -94,7 +91,9 @@ public class TransportGetDataFrameTransformsStatsAction extends .flatMap(r -> r.getTransformsStateAndStats().stream()) .sorted(Comparator.comparing(DataFrameTransformStateAndStats::getId)) .collect(Collectors.toList()); - return new Response(responses, taskOperationFailures, failedNodeExceptions); + List allFailedNodeExceptions = new ArrayList<>(failedNodeExceptions); + allFailedNodeExceptions.addAll(tasks.stream().flatMap(r -> r.getNodeFailures().stream()).collect(Collectors.toList())); + return new Response(responses, taskOperationFailures, allFailedNodeExceptions); } @Override @@ -110,7 +109,7 @@ public class TransportGetDataFrameTransformsStatsAction extends Collections.singletonList(new DataFrameTransformStateAndStats(task.getTransformId(), task.getState(), task.getStats(), DataFrameTransformCheckpointingInfo.EMPTY)), Collections.emptyList(), - 
Collections.singletonList(new ElasticsearchException("Failed to retrieve checkpointing info", e)))); + Collections.singletonList(new FailedNodeException("", "Failed to retrieve checkpointing info", e)))); })); } else { listener.onResponse(new Response(Collections.emptyList())); @@ -119,37 +118,24 @@ public class TransportGetDataFrameTransformsStatsAction extends @Override protected void doExecute(Task task, Request request, ActionListener finalListener) { - final ClusterState state = clusterService.state(); - final DiscoveryNodes nodes = state.nodes(); - if (nodes.isLocalNodeElectedMaster()) { - dataFrameTransformsConfigManager.expandTransformIds(request.getId(), request.getPageParams(), ActionListener.wrap( - ids -> { - request.setExpandedIds(ids); - super.doExecute(task, request, ActionListener.wrap( - response -> collectStatsForTransformsWithoutTasks(request, response, finalListener), - finalListener::onFailure - )); - }, - e -> { - // If the index to search, or the individual config is not there, just return empty - if (e instanceof ResourceNotFoundException) { - finalListener.onResponse(new Response(Collections.emptyList())); - } else { - finalListener.onFailure(e); - } + dataFrameTransformsConfigManager.expandTransformIds(request.getId(), request.getPageParams(), ActionListener.wrap( + ids -> { + request.setExpandedIds(ids); + super.doExecute(task, request, ActionListener.wrap( + response -> collectStatsForTransformsWithoutTasks(request, response, finalListener), + finalListener::onFailure + )); + }, + e -> { + // If the index to search, or the individual config is not there, just return empty + logger.error("failed to expand ids", e); + if (e instanceof ResourceNotFoundException) { + finalListener.onResponse(new Response(Collections.emptyList())); + } else { + finalListener.onFailure(e); } - )); - } else { - // Delegates GetTransforms to elected master node, so it becomes the coordinating node. 
- // Non-master nodes may have a stale cluster state that shows transforms which are cancelled - // on the master, which makes testing difficult. - if (nodes.getMasterNode() == null) { - finalListener.onFailure(new MasterNotDiscoveredException("no known master nodes")); - } else { - transportService.sendRequest(nodes.getMasterNode(), actionName, request, - new ActionListenerResponseHandler<>(finalListener, Response::new)); } - } + )); } private void collectStatsForTransformsWithoutTasks(Request request, @@ -172,10 +158,15 @@ public class TransportGetDataFrameTransformsStatsAction extends searchResponse -> { List nodeFailures = new ArrayList<>(response.getNodeFailures()); if (searchResponse.getShardFailures().length > 0) { - String msg = "transform statistics document search returned shard failures: " + - Arrays.toString(searchResponse.getShardFailures()); - logger.error(msg); - nodeFailures.add(new ElasticsearchException(msg)); + for(ShardSearchFailure shardSearchFailure : searchResponse.getShardFailures()) { + String nodeId = ""; + if (shardSearchFailure.shard() != null) { + nodeId = shardSearchFailure.shard().getNodeId(); + } + nodeFailures.add(new FailedNodeException(nodeId, shardSearchFailure.toString(), shardSearchFailure.getCause())); + } + logger.error("transform statistics document search returned shard failures: {}", + Arrays.toString(searchResponse.getShardFailures())); } List allStateAndStats = response.getTransformsStateAndStats(); for(SearchHit hit : searchResponse.getHits().getHits()) { diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStartDataFrameTransformAction.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStartDataFrameTransformAction.java index 1e65639a89f..98e70fa2578 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStartDataFrameTransformAction.java +++ 
b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStartDataFrameTransformAction.java @@ -22,6 +22,7 @@ import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.license.LicenseUtils; import org.elasticsearch.license.XPackLicenseState; @@ -44,6 +45,7 @@ import org.elasticsearch.xpack.dataframe.persistence.DataFrameTransformsConfigMa import org.elasticsearch.xpack.dataframe.persistence.DataframeIndex; import org.elasticsearch.xpack.dataframe.transforms.pivot.Pivot; +import java.io.IOException; import java.util.Collection; import java.util.Map; import java.util.function.Consumer; @@ -82,7 +84,12 @@ public class TransportStartDataFrameTransformAction extends @Override protected StartDataFrameTransformAction.Response newResponse() { - return new StartDataFrameTransformAction.Response(false); + throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable"); + } + + @Override + protected StartDataFrameTransformAction.Response read(StreamInput in) throws IOException { + return new StartDataFrameTransformAction.Response(in); } @Override