[ML-DataFrame]backport dataframe changes from 42202, using client instead of transport (#42468)

backport dataframe changes from #42202, using client instead of transport
This commit is contained in:
Hendrik Muhs 2019-05-24 11:05:30 +02:00 committed by GitHub
parent f472186b9f
commit 7cee294acf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 171 additions and 175 deletions

View File

@ -5,6 +5,7 @@ dependencies {
testCompile project(path: xpackModule('core'), configuration: 'default')
testCompile project(path: xpackModule('core'), configuration: 'testArtifacts')
testCompile project(path: xpackModule('data-frame'), configuration: 'runtime')
testCompile "org.elasticsearch.client:elasticsearch-rest-high-level-client:${versions.elasticsearch}"
}
// location for keys and certificates

View File

@ -7,16 +7,36 @@
package org.elasticsearch.xpack.dataframe.integration;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.core.AcknowledgedResponse;
import org.elasticsearch.client.dataframe.DeleteDataFrameTransformRequest;
import org.elasticsearch.client.dataframe.GetDataFrameTransformStatsRequest;
import org.elasticsearch.client.dataframe.GetDataFrameTransformStatsResponse;
import org.elasticsearch.client.dataframe.PutDataFrameTransformRequest;
import org.elasticsearch.client.dataframe.StartDataFrameTransformRequest;
import org.elasticsearch.client.dataframe.StartDataFrameTransformResponse;
import org.elasticsearch.client.dataframe.StopDataFrameTransformRequest;
import org.elasticsearch.client.dataframe.StopDataFrameTransformResponse;
import org.elasticsearch.client.dataframe.transforms.DataFrameTransformConfig;
import org.elasticsearch.client.dataframe.transforms.DestConfig;
import org.elasticsearch.client.dataframe.transforms.QueryConfig;
import org.elasticsearch.client.dataframe.transforms.SourceConfig;
import org.elasticsearch.client.dataframe.transforms.pivot.AggregationConfig;
import org.elasticsearch.client.dataframe.transforms.pivot.DateHistogramGroupSource;
import org.elasticsearch.client.dataframe.transforms.pivot.GroupConfig;
import org.elasticsearch.client.dataframe.transforms.pivot.PivotConfig;
import org.elasticsearch.client.dataframe.transforms.pivot.SingleGroupSource;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.client.indices.CreateIndexResponse;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.PathUtils;
import org.elasticsearch.common.network.NetworkModule;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.common.xcontent.DeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.ToXContent;
@ -26,36 +46,15 @@ import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.search.SearchModule;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.SecuritySettingsSourceField;
import org.elasticsearch.transport.Netty4Plugin;
import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin;
import org.elasticsearch.xpack.core.XPackClientPlugin;
import org.elasticsearch.xpack.core.dataframe.action.DeleteDataFrameTransformAction;
import org.elasticsearch.xpack.core.dataframe.action.GetDataFrameTransformsStatsAction;
import org.elasticsearch.xpack.core.dataframe.action.PutDataFrameTransformAction;
import org.elasticsearch.xpack.core.dataframe.action.StartDataFrameTransformAction;
import org.elasticsearch.xpack.core.dataframe.action.StopDataFrameTransformAction;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig;
import org.elasticsearch.xpack.core.dataframe.transforms.DestConfig;
import org.elasticsearch.xpack.core.dataframe.transforms.QueryConfig;
import org.elasticsearch.xpack.core.dataframe.transforms.SourceConfig;
import org.elasticsearch.xpack.core.dataframe.transforms.pivot.AggregationConfig;
import org.elasticsearch.xpack.core.dataframe.transforms.pivot.DateHistogramGroupSource;
import org.elasticsearch.xpack.core.dataframe.transforms.pivot.GroupConfig;
import org.elasticsearch.xpack.core.dataframe.transforms.pivot.PivotConfig;
import org.elasticsearch.xpack.core.dataframe.transforms.pivot.SingleGroupSource;
import org.elasticsearch.xpack.core.security.SecurityField;
import org.elasticsearch.test.rest.ESRestTestCase;
import java.net.URISyntaxException;
import java.nio.file.Path;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.ZoneId;
import java.util.Arrays;
import java.util.Collection;
import java.util.Base64;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
@ -64,18 +63,18 @@ import java.util.concurrent.TimeUnit;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.hamcrest.core.Is.is;
abstract class DataFrameIntegTestCase extends ESIntegTestCase {
abstract class DataFrameIntegTestCase extends ESRestTestCase {
protected static final String REVIEWS_INDEX_NAME = "data_frame_reviews";
private Map<String, DataFrameTransformConfig> transformConfigs = new HashMap<>();
protected void cleanUp() {
protected void cleanUp() throws IOException {
cleanUpTransforms();
waitForPendingTasks();
}
protected void cleanUpTransforms() {
protected void cleanUpTransforms() throws IOException {
for (DataFrameTransformConfig config : transformConfigs.values()) {
stopDataFrameTransform(config.getId());
deleteDataFrameTransform(config.getId());
@ -83,41 +82,42 @@ abstract class DataFrameIntegTestCase extends ESIntegTestCase {
transformConfigs.clear();
}
protected StopDataFrameTransformAction.Response stopDataFrameTransform(String id) {
return client().execute(StopDataFrameTransformAction.INSTANCE,
new StopDataFrameTransformAction.Request(id, true, false, null)).actionGet();
protected StopDataFrameTransformResponse stopDataFrameTransform(String id) throws IOException {
RestHighLevelClient restClient = new TestRestHighLevelClient();
return restClient.dataFrame().stopDataFrameTransform(new StopDataFrameTransformRequest(id, true, null), RequestOptions.DEFAULT);
}
protected StartDataFrameTransformAction.Response startDataFrameTransform(String id) {
return client().execute(StartDataFrameTransformAction.INSTANCE,
new StartDataFrameTransformAction.Request(id, false)).actionGet();
protected StartDataFrameTransformResponse startDataFrameTransform(String id, RequestOptions options) throws IOException {
RestHighLevelClient restClient = new TestRestHighLevelClient();
return restClient.dataFrame().startDataFrameTransform(new StartDataFrameTransformRequest(id), options);
}
protected AcknowledgedResponse deleteDataFrameTransform(String id) {
AcknowledgedResponse response = client().execute(DeleteDataFrameTransformAction.INSTANCE,
new DeleteDataFrameTransformAction.Request(id))
.actionGet();
protected AcknowledgedResponse deleteDataFrameTransform(String id) throws IOException {
RestHighLevelClient restClient = new TestRestHighLevelClient();
AcknowledgedResponse response =
restClient.dataFrame().deleteDataFrameTransform(new DeleteDataFrameTransformRequest(id), RequestOptions.DEFAULT);
if (response.isAcknowledged()) {
transformConfigs.remove(id);
}
return response;
}
protected AcknowledgedResponse putDataFrameTransform(DataFrameTransformConfig config) {
protected AcknowledgedResponse putDataFrameTransform(DataFrameTransformConfig config, RequestOptions options) throws IOException {
if (transformConfigs.keySet().contains(config.getId())) {
throw new IllegalArgumentException("data frame transform [" + config.getId() + "] is already registered");
}
AcknowledgedResponse response = client().execute(PutDataFrameTransformAction.INSTANCE,
new PutDataFrameTransformAction.Request(config))
.actionGet();
RestHighLevelClient restClient = new TestRestHighLevelClient();
AcknowledgedResponse response =
restClient.dataFrame().putDataFrameTransform(new PutDataFrameTransformRequest(config), options);
if (response.isAcknowledged()) {
transformConfigs.put(config.getId(), config);
}
return response;
}
protected GetDataFrameTransformsStatsAction.Response getDataFrameTransformStats(String id) {
return client().execute(GetDataFrameTransformsStatsAction.INSTANCE, new GetDataFrameTransformsStatsAction.Request(id)).actionGet();
protected GetDataFrameTransformStatsResponse getDataFrameTransformStats(String id) throws IOException {
RestHighLevelClient restClient = new TestRestHighLevelClient();
return restClient.dataFrame().getDataFrameTransformStats(new GetDataFrameTransformStatsRequest(id), RequestOptions.DEFAULT);
}
protected void waitUntilCheckpoint(String id, long checkpoint) throws Exception {
@ -136,38 +136,40 @@ abstract class DataFrameIntegTestCase extends ESIntegTestCase {
}
protected DateHistogramGroupSource createDateHistogramGroupSource(String field, long interval, ZoneId zone, String format) {
DateHistogramGroupSource source = new DateHistogramGroupSource(field);
source.setFormat(format);
source.setInterval(interval);
source.setTimeZone(zone);
return source;
DateHistogramGroupSource.Builder builder = DateHistogramGroupSource.builder()
.setField(field)
.setFormat(format)
.setInterval(interval)
.setTimeZone(zone);
return builder.build();
}
protected DateHistogramGroupSource createDateHistogramGroupSource(String field,
DateHistogramInterval interval,
ZoneId zone,
String format) {
DateHistogramGroupSource source = new DateHistogramGroupSource(field);
source.setFormat(format);
source.setDateHistogramInterval(interval);
source.setTimeZone(zone);
return source;
DateHistogramGroupSource.Builder builder = DateHistogramGroupSource.builder()
.setField(field)
.setFormat(format)
.setDateHistgramInterval(interval)
.setTimeZone(zone);
return builder.build();
}
protected GroupConfig createGroupConfig(Map<String, SingleGroupSource> groups) throws Exception {
Map<String, Object> lazyParsed = new HashMap<>(groups.size());
for(Map.Entry<String, SingleGroupSource> sgs : groups.entrySet()) {
lazyParsed.put(sgs.getKey(), Collections.singletonMap(sgs.getValue().getType().value(), toLazy(sgs.getValue())));
GroupConfig.Builder builder = GroupConfig.builder();
for (Map.Entry<String, SingleGroupSource> sgs : groups.entrySet()) {
builder.groupBy(sgs.getKey(), sgs.getValue());
}
return new GroupConfig(lazyParsed, groups);
return builder.build();
}
protected QueryConfig createQueryConfig(QueryBuilder queryBuilder) throws Exception {
return new QueryConfig(toLazy(queryBuilder), queryBuilder);
return new QueryConfig(queryBuilder);
}
protected AggregationConfig createAggConfig(AggregatorFactories.Builder aggregations) throws Exception {
return new AggregationConfig(toLazy(aggregations), aggregations);
return new AggregationConfig(aggregations);
}
protected PivotConfig createPivotConfig(Map<String, SingleGroupSource> groups,
@ -178,7 +180,11 @@ abstract class DataFrameIntegTestCase extends ESIntegTestCase {
protected PivotConfig createPivotConfig(Map<String, SingleGroupSource> groups,
AggregatorFactories.Builder aggregations,
Integer size) throws Exception {
return new PivotConfig(createGroupConfig(groups), createAggConfig(aggregations), size);
PivotConfig.Builder builder = PivotConfig.builder()
.setGroups(createGroupConfig(groups))
.setAggregationConfig(createAggConfig(aggregations))
.setMaxPageSearchSize(size);
return builder.build();
}
protected DataFrameTransformConfig createTransformConfig(String id,
@ -195,16 +201,18 @@ abstract class DataFrameIntegTestCase extends ESIntegTestCase {
String destinationIndex,
QueryBuilder queryBuilder,
String... sourceIndices) throws Exception {
return new DataFrameTransformConfig(id,
new SourceConfig(sourceIndices, createQueryConfig(queryBuilder)),
new DestConfig(destinationIndex),
Collections.emptyMap(),
createPivotConfig(groups, aggregations),
"Test data frame transform config id: " + id);
return DataFrameTransformConfig.builder()
.setId(id)
.setSource(SourceConfig.builder().setIndex(sourceIndices).setQueryConfig(createQueryConfig(queryBuilder)).build())
.setDest(new DestConfig(destinationIndex))
.setPivotConfig(createPivotConfig(groups, aggregations))
.setDescription("Test data frame transform config id: " + id)
.build();
}
protected void createReviewsIndex() throws Exception {
final int numDocs = 1000;
RestHighLevelClient restClient = new TestRestHighLevelClient();
// create mapping
try (XContentBuilder builder = jsonBuilder()) {
@ -229,16 +237,13 @@ abstract class DataFrameIntegTestCase extends ESIntegTestCase {
.endObject();
}
builder.endObject();
CreateIndexResponse response = client().admin()
.indices()
.prepareCreate(REVIEWS_INDEX_NAME)
.addMapping("_doc", builder)
.get();
CreateIndexResponse response =
restClient.indices().create(new CreateIndexRequest(REVIEWS_INDEX_NAME).mapping(builder), RequestOptions.DEFAULT);
assertThat(response.isAcknowledged(), is(true));
}
// create index
BulkRequestBuilder bulk = client().prepareBulk(REVIEWS_INDEX_NAME, "_doc");
BulkRequest bulk = new BulkRequest(REVIEWS_INDEX_NAME);
int day = 10;
for (int i = 0; i < numDocs; i++) {
long user = i % 28;
@ -267,15 +272,15 @@ abstract class DataFrameIntegTestCase extends ESIntegTestCase {
bulk.add(new IndexRequest().source(sourceBuilder.toString(), XContentType.JSON));
if (i % 50 == 0) {
BulkResponse response = client().bulk(bulk.request()).get();
BulkResponse response = restClient.bulk(bulk, RequestOptions.DEFAULT);
assertThat(response.buildFailureMessage(), response.hasFailures(), is(false));
bulk = client().prepareBulk(REVIEWS_INDEX_NAME, "_doc");
bulk = new BulkRequest(REVIEWS_INDEX_NAME);
day += 1;
}
}
BulkResponse response = client().bulk(bulk.request()).get();
BulkResponse response = restClient.bulk(bulk, RequestOptions.DEFAULT);
assertThat(response.buildFailureMessage(), response.hasFailures(), is(false));
client().admin().indices().prepareRefresh(REVIEWS_INDEX_NAME).get();
restClient.indices().refresh(new RefreshRequest(REVIEWS_INDEX_NAME), RequestOptions.DEFAULT);
}
protected Map<String, Object> toLazy(ToXContent parsedObject) throws Exception {
@ -293,8 +298,9 @@ abstract class DataFrameIntegTestCase extends ESIntegTestCase {
listTasksRequest.setWaitForCompletion(true);
listTasksRequest.setDetailed(true);
listTasksRequest.setTimeout(TimeValue.timeValueSeconds(10));
RestHighLevelClient restClient = new TestRestHighLevelClient();
try {
admin().cluster().listTasks(listTasksRequest).get();
restClient.tasks().list(listTasksRequest, RequestOptions.DEFAULT);
} catch (Exception e) {
throw new AssertionError("Failed to wait for pending tasks to complete", e);
}
@ -307,33 +313,17 @@ abstract class DataFrameIntegTestCase extends ESIntegTestCase {
}
@Override
protected Settings externalClusterClientSettings() {
Path key;
Path certificate;
try {
key = PathUtils.get(getClass().getResource("/testnode.pem").toURI());
certificate = PathUtils.get(getClass().getResource("/testnode.crt").toURI());
} catch (URISyntaxException e) {
throw new IllegalStateException("error trying to get keystore path", e);
protected Settings restClientSettings() {
final String token = "Basic " +
Base64.getEncoder().encodeToString(("x_pack_rest_user:x-pack-test-password").getBytes(StandardCharsets.UTF_8));
return Settings.builder()
.put(ThreadContext.PREFIX + ".Authorization", token)
.build();
}
private class TestRestHighLevelClient extends RestHighLevelClient {
TestRestHighLevelClient() {
super(client(), restClient -> {}, Collections.emptyList());
}
Settings.Builder builder = Settings.builder();
builder.put(NetworkModule.TRANSPORT_TYPE_KEY, SecurityField.NAME4);
builder.put(SecurityField.USER_SETTING.getKey(), "x_pack_rest_user:" + SecuritySettingsSourceField.TEST_PASSWORD_SECURE_STRING);
builder.put("xpack.security.transport.ssl.enabled", true);
builder.put("xpack.security.transport.ssl.key", key.toAbsolutePath().toString());
builder.put("xpack.security.transport.ssl.certificate", certificate.toAbsolutePath().toString());
builder.put("xpack.security.transport.ssl.key_passphrase", "testnode");
builder.put("xpack.security.transport.ssl.verification_mode", "certificate");
return builder.build();
}
@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Arrays.asList(LocalStateCompositeXPackPlugin.class, Netty4Plugin.class);
}
@Override
protected Collection<Class<? extends Plugin>> transportClientPlugins() {
return Arrays.asList(XPackClientPlugin.class, Netty4Plugin.class);
}
}

View File

@ -6,16 +6,18 @@
package org.elasticsearch.xpack.dataframe.integration;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.core.IndexerState;
import org.elasticsearch.client.dataframe.transforms.DataFrameTransformConfig;
import org.elasticsearch.client.dataframe.transforms.DataFrameTransformStateAndStats;
import org.elasticsearch.client.dataframe.transforms.pivot.SingleGroupSource;
import org.elasticsearch.client.dataframe.transforms.pivot.TermsGroupSource;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformStateAndStats;
import org.elasticsearch.xpack.core.dataframe.transforms.pivot.SingleGroupSource;
import org.elasticsearch.xpack.core.dataframe.transforms.pivot.TermsGroupSource;
import org.elasticsearch.xpack.core.indexing.IndexerState;
import org.junit.After;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
@ -24,7 +26,7 @@ import static org.hamcrest.Matchers.equalTo;
public class DataFrameTransformIT extends DataFrameIntegTestCase {
@After
public void cleanTransforms() {
public void cleanTransforms() throws IOException {
cleanUp();
}
@ -34,8 +36,8 @@ public class DataFrameTransformIT extends DataFrameIntegTestCase {
Map<String, SingleGroupSource> groups = new HashMap<>();
groups.put("by-day", createDateHistogramGroupSource("timestamp", DateHistogramInterval.DAY, null, null));
groups.put("by-user", new TermsGroupSource("user_id"));
groups.put("by-business", new TermsGroupSource("business_id"));
groups.put("by-user", TermsGroupSource.builder().setField("user_id").build());
groups.put("by-business", TermsGroupSource.builder().setField("business_id").build());
AggregatorFactories.Builder aggs = AggregatorFactories.builder()
.addAggregator(AggregationBuilders.avg("review_score").field("stars"))
@ -47,8 +49,10 @@ public class DataFrameTransformIT extends DataFrameIntegTestCase {
"reviews-by-user-business-day",
REVIEWS_INDEX_NAME);
assertTrue(putDataFrameTransform(config).isAcknowledged());
assertTrue(startDataFrameTransform(config.getId()).isStarted());
final RequestOptions options =
expectWarnings("[interval] on [date_histogram] is deprecated, use [fixed_interval] or [calendar_interval] in the future.");
assertTrue(putDataFrameTransform(config, options).isAcknowledged());
assertTrue(startDataFrameTransform(config.getId(), options).isStarted());
waitUntilCheckpoint(config.getId(), 1L);

View File

@ -5,6 +5,7 @@ dependencies {
testCompile project(path: xpackModule('core'), configuration: 'default')
testCompile project(path: xpackModule('core'), configuration: 'testArtifacts')
testCompile project(path: xpackModule('data-frame'), configuration: 'runtime')
testCompile "org.elasticsearch.client:elasticsearch-rest-high-level-client:${versions.elasticsearch}"
}
integTestCluster {

View File

@ -7,24 +7,23 @@
package org.elasticsearch.xpack.dataframe.integration;
import org.apache.lucene.util.LuceneTestCase;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.common.network.NetworkModule;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.client.indices.CreateIndexResponse;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.SecuritySettingsSourceField;
import org.elasticsearch.transport.Netty4Plugin;
import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin;
import org.elasticsearch.xpack.core.XPackClientPlugin;
import org.elasticsearch.test.rest.ESRestTestCase;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformProgress;
import org.elasticsearch.xpack.core.dataframe.transforms.DestConfig;
@ -34,11 +33,10 @@ import org.elasticsearch.xpack.core.dataframe.transforms.pivot.AggregationConfig
import org.elasticsearch.xpack.core.dataframe.transforms.pivot.GroupConfig;
import org.elasticsearch.xpack.core.dataframe.transforms.pivot.HistogramGroupSource;
import org.elasticsearch.xpack.core.dataframe.transforms.pivot.PivotConfig;
import org.elasticsearch.xpack.core.security.SecurityField;
import org.elasticsearch.xpack.dataframe.transforms.TransformProgressGatherer;
import java.util.Arrays;
import java.util.Collection;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.Collections;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
@ -47,10 +45,11 @@ import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
@LuceneTestCase.AwaitsFix( bugUrl = "https://github.com/elastic/elasticsearch/issues/42344")
public class DataFrameTransformProgressIT extends ESIntegTestCase {
public class DataFrameTransformProgressIT extends ESRestTestCase {
protected void createReviewsIndex() throws Exception {
final int numDocs = 1000;
final RestHighLevelClient restClient = new TestRestHighLevelClient();
// create mapping
try (XContentBuilder builder = jsonBuilder()) {
@ -75,16 +74,13 @@ public class DataFrameTransformProgressIT extends ESIntegTestCase {
.endObject();
}
builder.endObject();
CreateIndexResponse response = client().admin()
.indices()
.prepareCreate(REVIEWS_INDEX_NAME)
.addMapping("_doc", builder)
.get();
CreateIndexResponse response = restClient.indices()
.create(new CreateIndexRequest(REVIEWS_INDEX_NAME).mapping(builder), RequestOptions.DEFAULT);
assertThat(response.isAcknowledged(), is(true));
}
// create index
BulkRequestBuilder bulk = client().prepareBulk(REVIEWS_INDEX_NAME, "_doc");
BulkRequest bulk = new BulkRequest(REVIEWS_INDEX_NAME);
int day = 10;
for (int i = 0; i < numDocs; i++) {
long user = i % 28;
@ -113,14 +109,14 @@ public class DataFrameTransformProgressIT extends ESIntegTestCase {
bulk.add(new IndexRequest().source(sourceBuilder.toString(), XContentType.JSON));
if (i % 50 == 0) {
BulkResponse response = client().bulk(bulk.request()).get();
BulkResponse response = restClient.bulk(bulk, RequestOptions.DEFAULT);
assertThat(response.buildFailureMessage(), response.hasFailures(), is(false));
bulk = client().prepareBulk(REVIEWS_INDEX_NAME, "_doc");
bulk = new BulkRequest(REVIEWS_INDEX_NAME);
day += 1;
}
}
client().bulk(bulk.request()).get();
client().admin().indices().prepareRefresh(REVIEWS_INDEX_NAME).get();
restClient.bulk(bulk, RequestOptions.DEFAULT);
restClient.indices().refresh(new RefreshRequest(REVIEWS_INDEX_NAME), RequestOptions.DEFAULT);
}
public void testGetProgress() throws Exception {
@ -140,10 +136,11 @@ public class DataFrameTransformProgressIT extends ESIntegTestCase {
pivotConfig,
null);
PlainActionFuture<DataFrameTransformProgress> progressFuture = new PlainActionFuture<>();
TransformProgressGatherer.getInitialProgress(client(), config, progressFuture);
final RestHighLevelClient restClient = new TestRestHighLevelClient();
SearchResponse response = restClient.search(TransformProgressGatherer.getSearchRequest(config), RequestOptions.DEFAULT);
DataFrameTransformProgress progress = progressFuture.get();
DataFrameTransformProgress progress =
TransformProgressGatherer.searchResponseToDataFrameTransformProgressFunction().apply(response);
assertThat(progress.getTotalDocs(), equalTo(1000L));
assertThat(progress.getRemainingDocs(), equalTo(1000L));
@ -160,34 +157,28 @@ public class DataFrameTransformProgressIT extends ESIntegTestCase {
pivotConfig,
null);
progressFuture = new PlainActionFuture<>();
TransformProgressGatherer.getInitialProgress(client(), config, progressFuture);
progress = progressFuture.get();
response = restClient.search(TransformProgressGatherer.getSearchRequest(config), RequestOptions.DEFAULT);
progress = TransformProgressGatherer.searchResponseToDataFrameTransformProgressFunction().apply(response);
assertThat(progress.getTotalDocs(), equalTo(35L));
assertThat(progress.getRemainingDocs(), equalTo(35L));
assertThat(progress.getPercentComplete(), equalTo(0.0));
client().admin().indices().prepareDelete(REVIEWS_INDEX_NAME).get();
deleteIndex(REVIEWS_INDEX_NAME);
}
@Override
protected Settings externalClusterClientSettings() {
Settings.Builder builder = Settings.builder();
builder.put(NetworkModule.TRANSPORT_TYPE_KEY, SecurityField.NAME4);
builder.put(SecurityField.USER_SETTING.getKey(), "x_pack_rest_user:" + SecuritySettingsSourceField.TEST_PASSWORD_SECURE_STRING);
return builder.build();
protected Settings restClientSettings() {
final String token = "Basic " +
Base64.getEncoder().encodeToString(("x_pack_rest_user:x-pack-test-password").getBytes(StandardCharsets.UTF_8));
return Settings.builder()
.put(ThreadContext.PREFIX + ".Authorization", token)
.build();
}
@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Arrays.asList(LocalStateCompositeXPackPlugin.class, Netty4Plugin.class);
}
@Override
protected Collection<Class<? extends Plugin>> transportClientPlugins() {
return Arrays.asList(XPackClientPlugin.class, Netty4Plugin.class);
private class TestRestHighLevelClient extends RestHighLevelClient {
TestRestHighLevelClient() {
super(client(), restClient -> {}, Collections.emptyList());
}
}
}

View File

@ -11,10 +11,13 @@ import org.elasticsearch.action.search.SearchAction;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.xpack.core.ClientHelper;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformProgress;
import java.util.function.Function;
/**
* Utility class to gather the progress information for a given config and its cursor position
*/
@ -29,17 +32,10 @@ public final class TransformProgressGatherer {
public static void getInitialProgress(Client client,
DataFrameTransformConfig config,
ActionListener<DataFrameTransformProgress> progressListener) {
SearchRequest request = client.prepareSearch(config.getSource().getIndex())
.setSize(0)
.setAllowPartialSearchResults(false)
.setTrackTotalHits(true)
.setQuery(config.getSource().getQueryConfig().getQuery())
.request();
SearchRequest request = getSearchRequest(config);
ActionListener<SearchResponse> searchResponseActionListener = ActionListener.wrap(
searchResponse -> {
progressListener.onResponse(new DataFrameTransformProgress(searchResponse.getHits().getTotalHits().value, null));
},
searchResponse -> progressListener.onResponse(searchResponseToDataFrameTransformProgressFunction().apply(searchResponse)),
progressListener::onFailure
);
ClientHelper.executeWithHeadersAsync(config.getHeaders(),
@ -50,4 +46,17 @@ public final class TransformProgressGatherer {
searchResponseActionListener);
}
public static SearchRequest getSearchRequest(DataFrameTransformConfig config) {
SearchRequest request = new SearchRequest(config.getSource().getIndex());
request.allowPartialSearchResults(false);
request.source(new SearchSourceBuilder()
.size(0)
.trackTotalHits(true)
.query(config.getSource().getQueryConfig().getQuery()));
return request;
}
public static Function<SearchResponse, DataFrameTransformProgress> searchResponseToDataFrameTransformProgressFunction() {
return searchResponse -> new DataFrameTransformProgress(searchResponse.getHits().getTotalHits().value, null);
}
}