[ML] add multi node integ tests for data frames (#41508) (#41552)

* [ML] adding native-multi-node-integTests for data frames

* addressing streaming issues

* formatting fixes

* Addressing PR comments
Benjamin Trent 2019-04-26 07:18:49 -05:00 committed by GitHub
parent 1f00cec36f
commit 4836ff7bcd
10 changed files with 531 additions and 53 deletions


@@ -44,6 +44,14 @@ import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata;
 import org.elasticsearch.xpack.core.ccr.CCRFeatureSet;
 import org.elasticsearch.xpack.core.dataframe.DataFrameFeatureSetUsage;
 import org.elasticsearch.xpack.core.dataframe.DataFrameField;
+import org.elasticsearch.xpack.core.dataframe.action.DeleteDataFrameTransformAction;
+import org.elasticsearch.xpack.core.dataframe.action.GetDataFrameTransformsAction;
+import org.elasticsearch.xpack.core.dataframe.action.GetDataFrameTransformsStatsAction;
+import org.elasticsearch.xpack.core.dataframe.action.PreviewDataFrameTransformAction;
+import org.elasticsearch.xpack.core.dataframe.action.PutDataFrameTransformAction;
+import org.elasticsearch.xpack.core.dataframe.action.StartDataFrameTransformAction;
+import org.elasticsearch.xpack.core.dataframe.action.StartDataFrameTransformTaskAction;
+import org.elasticsearch.xpack.core.dataframe.action.StopDataFrameTransformAction;
 import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransform;
 import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformState;
 import org.elasticsearch.xpack.core.deprecation.DeprecationInfoAction;
@@ -363,7 +371,16 @@ public class XPackClientPlugin extends Plugin implements ActionPlugin, NetworkPl
                 RemoveIndexLifecyclePolicyAction.INSTANCE,
                 MoveToStepAction.INSTANCE,
                 RetryAction.INSTANCE,
-                TransportFreezeIndexAction.FreezeIndexAction.INSTANCE
+                TransportFreezeIndexAction.FreezeIndexAction.INSTANCE,
+                // Data Frame
+                PutDataFrameTransformAction.INSTANCE,
+                StartDataFrameTransformAction.INSTANCE,
+                StartDataFrameTransformTaskAction.INSTANCE,
+                StopDataFrameTransformAction.INSTANCE,
+                DeleteDataFrameTransformAction.INSTANCE,
+                GetDataFrameTransformsAction.INSTANCE,
+                GetDataFrameTransformsStatsAction.INSTANCE,
+                PreviewDataFrameTransformAction.INSTANCE
         );
     }
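
Registering these INSTANCE constants in XPackClientPlugin is what exposes the data frame actions to transport clients, which the new multi-node integration tests rely on. A minimal sketch of invoking one of them (assuming an already-connected Client, exactly as the test helpers further down do):

import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.xpack.core.dataframe.action.PutDataFrameTransformAction;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig;

// Sketch only: executes one of the newly registered actions through a transport client.
// Assumes a connected Client; mirrors the helpers in DataFrameIntegTestCase below.
final class PutTransformExample {
    static AcknowledgedResponse putTransform(Client client, DataFrameTransformConfig config) {
        return client.execute(PutDataFrameTransformAction.INSTANCE,
            new PutDataFrameTransformAction.Request(config)).actionGet();
    }
}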


@@ -56,7 +56,7 @@ public class GetDataFrameTransformsStatsAction extends Action<GetDataFrameTransf
     public static final int MAX_SIZE_RETURN = 1000;

     // used internally to expand the queried id expression
-    private List<String> expandedIds = Collections.emptyList();
+    private List<String> expandedIds;

     public Request(String id) {
         if (Strings.isNullOrEmpty(id) || id.equals("*")) {
@@ -64,13 +64,14 @@ public class GetDataFrameTransformsStatsAction extends Action<GetDataFrameTransf
         } else {
             this.id = id;
         }
+        this.expandedIds = Collections.singletonList(id);
     }

     public Request(StreamInput in) throws IOException {
         super(in);
         id = in.readString();
-        expandedIds = in.readList(StreamInput::readString);
-        pageParams = in.readOptionalWriteable(PageParams::new);
+        expandedIds = Collections.unmodifiableList(in.readStringList());
+        pageParams = new PageParams(in);
     }

     @Override
@@ -93,7 +94,7 @@ public class GetDataFrameTransformsStatsAction extends Action<GetDataFrameTransf
     }

     public final void setPageParams(PageParams pageParams) {
-        this.pageParams = pageParams;
+        this.pageParams = Objects.requireNonNull(pageParams);
     }

     public final PageParams getPageParams() {
@@ -105,7 +106,7 @@ public class GetDataFrameTransformsStatsAction extends Action<GetDataFrameTransf
         super.writeTo(out);
         out.writeString(id);
         out.writeStringCollection(expandedIds);
-        out.writeOptionalWriteable(pageParams);
+        pageParams.writeTo(out);
     }

     @Override
@@ -136,7 +137,7 @@ public class GetDataFrameTransformsStatsAction extends Action<GetDataFrameTransf
         }
     }

-    public static class Response extends BaseTasksResponse implements Writeable, ToXContentObject {
+    public static class Response extends BaseTasksResponse implements ToXContentObject {
         private List<DataFrameTransformStateAndStats> transformsStateAndStats;

         public Response(List<DataFrameTransformStateAndStats> transformsStateAndStats) {
@@ -165,6 +166,11 @@ public class GetDataFrameTransformsStatsAction extends Action<GetDataFrameTransf
         out.writeList(transformsStateAndStats);
     }

+    @Override
+    public void readFrom(StreamInput in) {
+        throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
+    }
+
     @Override
     public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
         builder.startObject();


@@ -95,7 +95,7 @@ public class StartDataFrameTransformAction extends Action<StartDataFrameTransfor
         }
     }

-    public static class Response extends BaseTasksResponse implements Writeable, ToXContentObject {
+    public static class Response extends BaseTasksResponse implements ToXContentObject {
         private final boolean started;

         public Response(StreamInput in) throws IOException {


@@ -93,7 +93,7 @@ public class StartDataFrameTransformTaskAction extends Action<StartDataFrameTran
         }
     }

-    public static class Response extends BaseTasksResponse implements Writeable, ToXContentObject {
+    public static class Response extends BaseTasksResponse implements ToXContentObject {
         private final boolean started;

         public Response(StreamInput in) throws IOException {


@@ -6,6 +6,9 @@
 package org.elasticsearch.xpack.core.dataframe.action;

+import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.action.FailedNodeException;
+import org.elasticsearch.action.TaskOperationFailure;
 import org.elasticsearch.common.io.stream.Writeable.Reader;
 import org.elasticsearch.xpack.core.dataframe.action.GetDataFrameTransformsStatsAction.Response;
 import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformStateAndStats;
@@ -18,11 +21,18 @@ public class GetDataFrameTransformsStatsActionResponseTests extends AbstractWire
     @Override
     protected Response createTestInstance() {
         List<DataFrameTransformStateAndStats> stats = new ArrayList<>();
-        for (int i = 0; i < randomInt(10); ++i) {
+        int totalStats = randomInt(10);
+        for (int i = 0; i < totalStats; ++i) {
             stats.add(DataFrameTransformStateAndStatsTests.randomDataFrameTransformStateAndStats());
         }
-        return new Response(stats);
+        int totalErrors = randomInt(10);
+        List<TaskOperationFailure> taskFailures = new ArrayList<>(totalErrors);
+        List<ElasticsearchException> nodeFailures = new ArrayList<>(totalErrors);
+        for (int i = 0; i < totalErrors; i++) {
+            taskFailures.add(new TaskOperationFailure("node1", randomLongBetween(1, 10), new Exception("error")));
+            nodeFailures.add(new FailedNodeException("node1", "message", new Exception("error")));
+        }
+        return new Response(stats, taskFailures, nodeFailures);
     }

     @Override
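
The randomized task and node failures exercise the new three-argument Response constructor through the wire-serialization harness. AbstractWireSerializingTestCase reads instances back via a Reader, presumably wired to the StreamInput constructor along these lines (a sketch; this override is not part of the hunk shown above):

// Sketch: typical Reader hookup in an AbstractWireSerializingTestCase subclass.
@Override
protected Reader<Response> instanceReader() {
    return Response::new;
}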


@@ -0,0 +1,55 @@
apply plugin: 'elasticsearch.standalone-rest-test'
apply plugin: 'elasticsearch.rest-test'
dependencies {
testCompile project(path: xpackModule('core'), configuration: 'default')
testCompile project(path: xpackModule('core'), configuration: 'testArtifacts')
testCompile project(path: xpackModule('data-frame'), configuration: 'runtime')
}
// location for keys and certificates
File keystoreDir = new File(project.buildDir, 'keystore')
File nodeKey = file("$keystoreDir/testnode.pem")
File nodeCert = file("$keystoreDir/testnode.crt")
// Copy the key and cert from core's test resources into the keystore directory
task copyKeyCerts(type: Copy) {
from(project(':x-pack:plugin:core').file('src/test/resources/org/elasticsearch/xpack/security/transport/ssl/certs/simple/')) {
include 'testnode.crt', 'testnode.pem'
}
into keystoreDir
}
// Add keys and certs to the test classpath: the tests expect them there
sourceSets.test.resources.srcDir(keystoreDir)
processTestResources.dependsOn(copyKeyCerts)
integTestCluster {
dependsOn copyKeyCerts
setting 'xpack.security.enabled', 'true'
setting 'xpack.license.self_generated.type', 'trial'
setting 'xpack.monitoring.enabled', 'false'
setting 'xpack.security.authc.token.enabled', 'true'
setting 'xpack.security.transport.ssl.enabled', 'true'
setting 'xpack.security.transport.ssl.key', nodeKey.name
setting 'xpack.security.transport.ssl.certificate', nodeCert.name
setting 'xpack.security.transport.ssl.verification_mode', 'certificate'
setting 'xpack.security.audit.enabled', 'false'
keystoreSetting 'bootstrap.password', 'x-pack-test-password'
keystoreSetting 'xpack.security.transport.ssl.secure_key_passphrase', 'testnode'
setupCommand 'setupDummyUser',
'bin/elasticsearch-users', 'useradd', 'x_pack_rest_user', '-p', 'x-pack-test-password', '-r', 'superuser'
numNodes = 3
extraConfigFile nodeKey.name, nodeKey
extraConfigFile nodeCert.name, nodeCert
waitCondition = { node, ant ->
File tmpFile = new File(node.cwd, 'wait.success')
ant.get(src: "http://${node.httpUri()}/_cluster/health?wait_for_nodes=>=${numNodes}&wait_for_status=yellow",
dest: tmpFile.toString(),
username: 'x_pack_rest_user',
password: 'x-pack-test-password',
ignoreerrors: true,
retries: 10)
return tmpFile.exists()
}
}
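
The waitCondition blocks the build until all three nodes have joined and the cluster is at least yellow, authenticating as the dummy superuser created by setupCommand. For illustration, the equivalent check with the low-level REST client (a sketch; the host and port are assumptions, not taken from the build script):

import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;

// Sketch only: the same health check the ant.get waitCondition performs.
// Assumes one of the three nodes listens on localhost:9200.
final class ClusterReadyCheck {
    public static void main(String[] args) throws Exception {
        BasicCredentialsProvider credentials = new BasicCredentialsProvider();
        credentials.setCredentials(AuthScope.ANY,
            new UsernamePasswordCredentials("x_pack_rest_user", "x-pack-test-password"));
        try (RestClient client = RestClient.builder(new HttpHost("localhost", 9200))
                .setHttpClientConfigCallback(b -> b.setDefaultCredentialsProvider(credentials))
                .build()) {
            Request health = new Request("GET", "/_cluster/health");
            health.addParameter("wait_for_nodes", ">=3");   // all three cluster nodes joined
            health.addParameter("wait_for_status", "yellow");
            Response response = client.performRequest(health);
            System.out.println(response.getStatusLine());
        }
    }
}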


@@ -0,0 +1,332 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.dataframe.integration;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.PathUtils;
import org.elasticsearch.common.network.NetworkModule;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.DeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.search.SearchModule;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.SecuritySettingsSourceField;
import org.elasticsearch.transport.Netty4Plugin;
import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin;
import org.elasticsearch.xpack.core.XPackClientPlugin;
import org.elasticsearch.xpack.core.dataframe.action.DeleteDataFrameTransformAction;
import org.elasticsearch.xpack.core.dataframe.action.GetDataFrameTransformsStatsAction;
import org.elasticsearch.xpack.core.dataframe.action.PutDataFrameTransformAction;
import org.elasticsearch.xpack.core.dataframe.action.StartDataFrameTransformAction;
import org.elasticsearch.xpack.core.dataframe.action.StopDataFrameTransformAction;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig;
import org.elasticsearch.xpack.core.dataframe.transforms.DestConfig;
import org.elasticsearch.xpack.core.dataframe.transforms.QueryConfig;
import org.elasticsearch.xpack.core.dataframe.transforms.SourceConfig;
import org.elasticsearch.xpack.core.dataframe.transforms.pivot.AggregationConfig;
import org.elasticsearch.xpack.core.dataframe.transforms.pivot.DateHistogramGroupSource;
import org.elasticsearch.xpack.core.dataframe.transforms.pivot.GroupConfig;
import org.elasticsearch.xpack.core.dataframe.transforms.pivot.PivotConfig;
import org.elasticsearch.xpack.core.dataframe.transforms.pivot.SingleGroupSource;
import org.elasticsearch.xpack.core.security.SecurityField;
import java.net.URISyntaxException;
import java.nio.file.Path;
import java.time.ZoneId;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.hamcrest.core.Is.is;
abstract class DataFrameIntegTestCase extends ESIntegTestCase {
protected static final String REVIEWS_INDEX_NAME = "data_frame_reviews";
private Map<String, DataFrameTransformConfig> transformConfigs = new HashMap<>();
protected void cleanUp() {
cleanUpTransforms();
waitForPendingTasks();
}
protected void cleanUpTransforms() {
for (DataFrameTransformConfig config : transformConfigs.values()) {
stopDataFrameTransform(config.getId());
deleteDataFrameTransform(config.getId());
}
transformConfigs.clear();
}
protected StopDataFrameTransformAction.Response stopDataFrameTransform(String id) {
return client().execute(StopDataFrameTransformAction.INSTANCE,
new StopDataFrameTransformAction.Request(id, true, false, null)).actionGet();
}
protected StartDataFrameTransformAction.Response startDataFrameTransform(String id) {
return client().execute(StartDataFrameTransformAction.INSTANCE,
new StartDataFrameTransformAction.Request(id, false)).actionGet();
}
protected DeleteDataFrameTransformAction.Response deleteDataFrameTransform(String id) {
DeleteDataFrameTransformAction.Response response = client().execute(DeleteDataFrameTransformAction.INSTANCE,
new DeleteDataFrameTransformAction.Request(id))
.actionGet();
if (response.isDeleted()) {
transformConfigs.remove(id);
}
return response;
}
protected AcknowledgedResponse putDataFrameTransform(DataFrameTransformConfig config) {
if (transformConfigs.containsKey(config.getId())) {
throw new IllegalArgumentException("data frame transform [" + config.getId() + "] is already registered");
}
AcknowledgedResponse response = client().execute(PutDataFrameTransformAction.INSTANCE,
new PutDataFrameTransformAction.Request(config))
.actionGet();
if (response.isAcknowledged()) {
transformConfigs.put(config.getId(), config);
}
return response;
}
protected GetDataFrameTransformsStatsAction.Response getDataFrameTransformStats(String id) {
return client().execute(GetDataFrameTransformsStatsAction.INSTANCE, new GetDataFrameTransformsStatsAction.Request(id)).actionGet();
}
protected void waitUntilCheckpoint(String id, long checkpoint) throws Exception {
waitUntilCheckpoint(id, checkpoint, TimeValue.timeValueSeconds(30));
}
protected void waitUntilCheckpoint(String id, long checkpoint, TimeValue waitTime) throws Exception {
assertBusy(() ->
assertEquals(checkpoint, getDataFrameTransformStats(id)
.getTransformsStateAndStats()
.get(0)
.getTransformState()
.getCheckpoint()),
waitTime.getMillis(),
TimeUnit.MILLISECONDS);
}
protected DateHistogramGroupSource createDateHistogramGroupSource(String field, long interval, ZoneId zone, String format) {
DateHistogramGroupSource source = new DateHistogramGroupSource(field);
source.setFormat(format);
source.setInterval(interval);
source.setTimeZone(zone);
return source;
}
protected DateHistogramGroupSource createDateHistogramGroupSource(String field,
DateHistogramInterval interval,
ZoneId zone,
String format) {
DateHistogramGroupSource source = new DateHistogramGroupSource(field);
source.setFormat(format);
source.setDateHistogramInterval(interval);
source.setTimeZone(zone);
return source;
}
protected GroupConfig createGroupConfig(Map<String, SingleGroupSource> groups) throws Exception {
Map<String, Object> lazyParsed = new HashMap<>(groups.size());
for(Map.Entry<String, SingleGroupSource> sgs : groups.entrySet()) {
lazyParsed.put(sgs.getKey(), Collections.singletonMap(sgs.getValue().getType().value(), toLazy(sgs.getValue())));
}
return new GroupConfig(lazyParsed, groups);
}
protected QueryConfig createQueryConfig(QueryBuilder queryBuilder) throws Exception {
return new QueryConfig(toLazy(queryBuilder), queryBuilder);
}
protected AggregationConfig createAggConfig(AggregatorFactories.Builder aggregations) throws Exception {
return new AggregationConfig(toLazy(aggregations), aggregations);
}
protected PivotConfig createPivotConfig(Map<String, SingleGroupSource> groups,
AggregatorFactories.Builder aggregations) throws Exception {
return new PivotConfig(createGroupConfig(groups), createAggConfig(aggregations));
}
protected DataFrameTransformConfig createTransformConfig(String id,
Map<String, SingleGroupSource> groups,
AggregatorFactories.Builder aggregations,
String destinationIndex,
String... sourceIndices) throws Exception {
return createTransformConfig(id, groups, aggregations, destinationIndex, QueryBuilders.matchAllQuery(), sourceIndices);
}
protected DataFrameTransformConfig createTransformConfig(String id,
Map<String, SingleGroupSource> groups,
AggregatorFactories.Builder aggregations,
String destinationIndex,
QueryBuilder queryBuilder,
String... sourceIndices) throws Exception {
return new DataFrameTransformConfig(id,
new SourceConfig(sourceIndices, createQueryConfig(queryBuilder)),
new DestConfig(destinationIndex),
Collections.emptyMap(),
createPivotConfig(groups, aggregations));
}
protected void createReviewsIndex() throws Exception {
final int numDocs = 1000;
// create mapping
try (XContentBuilder builder = jsonBuilder()) {
builder.startObject();
{
builder.startObject("properties")
.startObject("timestamp")
.field("type", "date")
.endObject()
.startObject("user_id")
.field("type", "keyword")
.endObject()
.startObject("count")
.field("type", "integer")
.endObject()
.startObject("business_id")
.field("type", "keyword")
.endObject()
.startObject("stars")
.field("type", "integer")
.endObject()
.endObject();
}
builder.endObject();
CreateIndexResponse response = client().admin()
.indices()
.prepareCreate(REVIEWS_INDEX_NAME)
.addMapping("_doc", builder)
.get();
assertThat(response.isAcknowledged(), is(true));
}
// index documents in batches, advancing the day every 50 docs
BulkRequestBuilder bulk = client().prepareBulk(REVIEWS_INDEX_NAME, "_doc");
int day = 10;
for (int i = 0; i < numDocs; i++) {
long user = i % 28;
int stars = (i + 20) % 5;
long business = (i + 100) % 50;
int hour = 10 + (i % 13);
int min = 10 + (i % 49);
int sec = 10 + (i % 49);
String date_string = "2017-01-" + day + "T" + hour + ":" + min + ":" + sec + "Z";
StringBuilder sourceBuilder = new StringBuilder();
sourceBuilder.append("{\"user_id\":\"")
.append("user_")
.append(user)
.append("\",\"count\":")
.append(i)
.append(",\"business_id\":\"")
.append("business_")
.append(business)
.append("\",\"stars\":")
.append(stars)
.append(",\"timestamp\":\"")
.append(date_string)
.append("\"}");
bulk.add(new IndexRequest().source(sourceBuilder.toString(), XContentType.JSON));
if (i % 50 == 0) {
BulkResponse response = client().bulk(bulk.request()).get();
assertThat(response.buildFailureMessage(), response.hasFailures(), is(false));
bulk = client().prepareBulk(REVIEWS_INDEX_NAME, "_doc");
day += 1;
}
}
BulkResponse response = client().bulk(bulk.request()).get();
assertThat(response.buildFailureMessage(), response.hasFailures(), is(false));
client().admin().indices().prepareRefresh(REVIEWS_INDEX_NAME).get();
}
protected Map<String, Object> toLazy(ToXContent parsedObject) throws Exception {
BytesReference bytes = XContentHelper.toXContent(parsedObject, XContentType.JSON, false);
try(XContentParser parser = XContentHelper.createParser(xContentRegistry(),
DeprecationHandler.THROW_UNSUPPORTED_OPERATION,
bytes,
XContentType.JSON)) {
return parser.mapOrdered();
}
}
private void waitForPendingTasks() {
ListTasksRequest listTasksRequest = new ListTasksRequest();
listTasksRequest.setWaitForCompletion(true);
listTasksRequest.setDetailed(true);
listTasksRequest.setTimeout(TimeValue.timeValueSeconds(10));
try {
admin().cluster().listTasks(listTasksRequest).get();
} catch (Exception e) {
throw new AssertionError("Failed to wait for pending tasks to complete", e);
}
}
@Override
protected NamedXContentRegistry xContentRegistry() {
SearchModule searchModule = new SearchModule(Settings.EMPTY, false, Collections.emptyList());
return new NamedXContentRegistry(searchModule.getNamedXContents());
}
@Override
protected Settings externalClusterClientSettings() {
Path key;
Path certificate;
try {
key = PathUtils.get(getClass().getResource("/testnode.pem").toURI());
certificate = PathUtils.get(getClass().getResource("/testnode.crt").toURI());
} catch (URISyntaxException e) {
throw new IllegalStateException("error trying to get keystore path", e);
}
Settings.Builder builder = Settings.builder();
builder.put(NetworkModule.TRANSPORT_TYPE_KEY, SecurityField.NAME4);
builder.put(SecurityField.USER_SETTING.getKey(), "x_pack_rest_user:" + SecuritySettingsSourceField.TEST_PASSWORD_SECURE_STRING);
builder.put("xpack.security.transport.ssl.enabled", true);
builder.put("xpack.security.transport.ssl.key", key.toAbsolutePath().toString());
builder.put("xpack.security.transport.ssl.certificate", certificate.toAbsolutePath().toString());
builder.put("xpack.security.transport.ssl.key_passphrase", "testnode");
builder.put("xpack.security.transport.ssl.verification_mode", "certificate");
return builder.build();
}
@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Arrays.asList(LocalStateCompositeXPackPlugin.class, Netty4Plugin.class);
}
@Override
protected Collection<Class<? extends Plugin>> transportClientPlugins() {
return Arrays.asList(XPackClientPlugin.class, Netty4Plugin.class);
}
}


@@ -0,0 +1,60 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.dataframe.integration;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformStateAndStats;
import org.elasticsearch.xpack.core.dataframe.transforms.pivot.SingleGroupSource;
import org.elasticsearch.xpack.core.dataframe.transforms.pivot.TermsGroupSource;
import org.elasticsearch.xpack.core.indexing.IndexerState;
import org.junit.After;
import java.util.HashMap;
import java.util.Map;
import static org.hamcrest.Matchers.equalTo;
public class DataFrameTransformIT extends DataFrameIntegTestCase {
@After
public void cleanTransforms() {
cleanUp();
}
public void testDataFrameTransformCrud() throws Exception {
createReviewsIndex();
Map<String, SingleGroupSource> groups = new HashMap<>();
groups.put("by-day", createDateHistogramGroupSource("timestamp", DateHistogramInterval.DAY, null, null));
groups.put("by-user", new TermsGroupSource("user_id"));
groups.put("by-business", new TermsGroupSource("business_id"));
AggregatorFactories.Builder aggs = AggregatorFactories.builder()
.addAggregator(AggregationBuilders.avg("review_score").field("stars"))
.addAggregator(AggregationBuilders.max("timestamp").field("timestamp"));
DataFrameTransformConfig config = createTransformConfig("data-frame-transform-crud",
groups,
aggs,
"reviews-by-user-business-day",
REVIEWS_INDEX_NAME);
assertTrue(putDataFrameTransform(config).isAcknowledged());
assertTrue(startDataFrameTransform(config.getId()).isStarted());
waitUntilCheckpoint(config.getId(), 1L);
DataFrameTransformStateAndStats stats = getDataFrameTransformStats(config.getId()).getTransformsStateAndStats().get(0);
assertThat(stats.getTransformState().getIndexerState(), equalTo(IndexerState.STARTED));
}
}
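
Because the base class records every registered config in transformConfigs, the @After hook stops and deletes whatever a test created, which keeps additional multi-node scenarios cheap to add. A hypothetical follow-on test reusing the same helpers (the transform id and destination index are illustrative; the final assertion assumes the stop request waits for completion, as the base-class helper appears to do):

// Hypothetical additional test; ids and index names are illustrative.
public void testStopAfterFirstCheckpoint() throws Exception {
    createReviewsIndex();
    Map<String, SingleGroupSource> groups = new HashMap<>();
    groups.put("by-business", new TermsGroupSource("business_id"));
    AggregatorFactories.Builder aggs = AggregatorFactories.builder()
        .addAggregator(AggregationBuilders.avg("review_score").field("stars"));
    DataFrameTransformConfig config = createTransformConfig("data-frame-transform-stop",
        groups,
        aggs,
        "reviews-by-business",
        REVIEWS_INDEX_NAME);
    assertTrue(putDataFrameTransform(config).isAcknowledged());
    assertTrue(startDataFrameTransform(config.getId()).isStarted());
    waitUntilCheckpoint(config.getId(), 1L);
    // the helper stops with waitForCompletion, so the indexer should be STOPPED afterwards
    stopDataFrameTransform(config.getId());
    DataFrameTransformStateAndStats stats =
        getDataFrameTransformStats(config.getId()).getTransformsStateAndStats().get(0);
    assertThat(stats.getTransformState().getIndexerState(), equalTo(IndexerState.STOPPED));
}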


@@ -12,16 +12,14 @@ import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.ElasticsearchParseException;
 import org.elasticsearch.ResourceNotFoundException;
 import org.elasticsearch.action.ActionListener;
-import org.elasticsearch.action.ActionListenerResponseHandler;
 import org.elasticsearch.action.FailedNodeException;
 import org.elasticsearch.action.TaskOperationFailure;
 import org.elasticsearch.action.search.SearchRequest;
 import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.action.search.ShardSearchFailure;
 import org.elasticsearch.action.support.ActionFilters;
 import org.elasticsearch.action.support.tasks.TransportTasksAction;
 import org.elasticsearch.client.Client;
-import org.elasticsearch.cluster.ClusterState;
-import org.elasticsearch.cluster.node.DiscoveryNodes;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.inject.Inject;
@@ -30,7 +28,6 @@ import org.elasticsearch.common.xcontent.NamedXContentRegistry;
 import org.elasticsearch.common.xcontent.XContentFactory;
 import org.elasticsearch.common.xcontent.XContentParser;
 import org.elasticsearch.common.xcontent.XContentType;
-import org.elasticsearch.discovery.MasterNotDiscoveredException;
 import org.elasticsearch.index.IndexNotFoundException;
 import org.elasticsearch.index.query.QueryBuilder;
 import org.elasticsearch.index.query.QueryBuilders;
@@ -81,7 +78,7 @@ public class TransportGetDataFrameTransformsStatsAction extends
                                                        DataFrameTransformsConfigManager dataFrameTransformsConfigManager,
                                                        DataFrameTransformsCheckpointService transformsCheckpointService) {
         super(GetDataFrameTransformsStatsAction.NAME, clusterService, transportService, actionFilters, Request::new, Response::new,
-            Response::new, ThreadPool.Names.SAME);
+                Response::new, ThreadPool.Names.SAME);
         this.client = client;
         this.dataFrameTransformsConfigManager = dataFrameTransformsConfigManager;
         this.transformsCheckpointService = transformsCheckpointService;
@@ -94,7 +91,9 @@ public class TransportGetDataFrameTransformsStatsAction extends
             .flatMap(r -> r.getTransformsStateAndStats().stream())
             .sorted(Comparator.comparing(DataFrameTransformStateAndStats::getId))
             .collect(Collectors.toList());
-        return new Response(responses, taskOperationFailures, failedNodeExceptions);
+        List<ElasticsearchException> allFailedNodeExceptions = new ArrayList<>(failedNodeExceptions);
+        allFailedNodeExceptions.addAll(tasks.stream().flatMap(r -> r.getNodeFailures().stream()).collect(Collectors.toList()));
+        return new Response(responses, taskOperationFailures, allFailedNodeExceptions);
     }

     @Override
@@ -110,7 +109,7 @@ public class TransportGetDataFrameTransformsStatsAction extends
                     Collections.singletonList(new DataFrameTransformStateAndStats(task.getTransformId(), task.getState(),
                         task.getStats(), DataFrameTransformCheckpointingInfo.EMPTY)),
                     Collections.emptyList(),
-                    Collections.singletonList(new ElasticsearchException("Failed to retrieve checkpointing info", e))));
+                    Collections.singletonList(new FailedNodeException("", "Failed to retrieve checkpointing info", e))));
             }));
         } else {
             listener.onResponse(new Response(Collections.emptyList()));
@@ -119,37 +118,24 @@ public class TransportGetDataFrameTransformsStatsAction extends
     @Override
     protected void doExecute(Task task, Request request, ActionListener<Response> finalListener) {
-        final ClusterState state = clusterService.state();
-        final DiscoveryNodes nodes = state.nodes();
-        if (nodes.isLocalNodeElectedMaster()) {
-            dataFrameTransformsConfigManager.expandTransformIds(request.getId(), request.getPageParams(), ActionListener.wrap(
-                ids -> {
-                    request.setExpandedIds(ids);
-                    super.doExecute(task, request, ActionListener.wrap(
-                        response -> collectStatsForTransformsWithoutTasks(request, response, finalListener),
-                        finalListener::onFailure
-                    ));
-                },
-                e -> {
-                    // If the index to search, or the individual config is not there, just return empty
-                    if (e instanceof ResourceNotFoundException) {
-                        finalListener.onResponse(new Response(Collections.emptyList()));
-                    } else {
-                        finalListener.onFailure(e);
-                    }
-                }
-            ));
-        } else {
-            // Delegates GetTransforms to elected master node, so it becomes the coordinating node.
-            // Non-master nodes may have a stale cluster state that shows transforms which are cancelled
-            // on the master, which makes testing difficult.
-            if (nodes.getMasterNode() == null) {
-                finalListener.onFailure(new MasterNotDiscoveredException("no known master nodes"));
-            } else {
-                transportService.sendRequest(nodes.getMasterNode(), actionName, request,
-                    new ActionListenerResponseHandler<>(finalListener, Response::new));
-            }
-        }
+        dataFrameTransformsConfigManager.expandTransformIds(request.getId(), request.getPageParams(), ActionListener.wrap(
+            ids -> {
+                request.setExpandedIds(ids);
+                super.doExecute(task, request, ActionListener.wrap(
+                    response -> collectStatsForTransformsWithoutTasks(request, response, finalListener),
+                    finalListener::onFailure
+                ));
+            },
+            e -> {
+                // If the index to search, or the individual config is not there, just return empty
+                logger.error("failed to expand ids", e);
+                if (e instanceof ResourceNotFoundException) {
+                    finalListener.onResponse(new Response(Collections.emptyList()));
+                } else {
+                    finalListener.onFailure(e);
+                }
+            }
+        ));
     }

     private void collectStatsForTransformsWithoutTasks(Request request,
@@ -172,10 +158,15 @@ public class TransportGetDataFrameTransformsStatsAction extends
             searchResponse -> {
                 List<ElasticsearchException> nodeFailures = new ArrayList<>(response.getNodeFailures());
                 if (searchResponse.getShardFailures().length > 0) {
-                    String msg = "transform statistics document search returned shard failures: " +
-                        Arrays.toString(searchResponse.getShardFailures());
-                    logger.error(msg);
-                    nodeFailures.add(new ElasticsearchException(msg));
+                    for(ShardSearchFailure shardSearchFailure : searchResponse.getShardFailures()) {
+                        String nodeId = "";
+                        if (shardSearchFailure.shard() != null) {
+                            nodeId = shardSearchFailure.shard().getNodeId();
+                        }
+                        nodeFailures.add(new FailedNodeException(nodeId, shardSearchFailure.toString(), shardSearchFailure.getCause()));
+                    }
+                    logger.error("transform statistics document search returned shard failures: {}",
+                        Arrays.toString(searchResponse.getShardFailures()));
                 }
                 List<DataFrameTransformStateAndStats> allStateAndStats = response.getTransformsStateAndStats();
                 for(SearchHit hit : searchResponse.getHits().getHits()) {


@@ -22,6 +22,7 @@ import org.elasticsearch.cluster.block.ClusterBlockLevel;
 import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.inject.Inject;
+import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.license.LicenseUtils;
 import org.elasticsearch.license.XPackLicenseState;
@@ -44,6 +45,7 @@ import org.elasticsearch.xpack.dataframe.persistence.DataFrameTransformsConfigMa
 import org.elasticsearch.xpack.dataframe.persistence.DataframeIndex;
 import org.elasticsearch.xpack.dataframe.transforms.pivot.Pivot;

+import java.io.IOException;
 import java.util.Collection;
 import java.util.Map;
 import java.util.function.Consumer;
@@ -82,7 +84,12 @@ public class TransportStartDataFrameTransformAction extends
     @Override
     protected StartDataFrameTransformAction.Response newResponse() {
-        return new StartDataFrameTransformAction.Response(false);
+        throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
+    }
+
+    @Override
+    protected StartDataFrameTransformAction.Response read(StreamInput in) throws IOException {
+        return new StartDataFrameTransformAction.Response(in);
     }

     @Override