From 99db6013eac9ac04eccfa23270d6732cd64ae00c Mon Sep 17 00:00:00 2001 From: Colin Goodheart-Smithe Date: Tue, 28 Mar 2017 09:05:55 +0100 Subject: [PATCH] [ML] Adds a test for ml transport client (elastic/x-pack-elasticsearch#851) This change also adds synchronous methods to `MachineLearningClient`. relates elastic/x-pack-elasticsearch#567 Original commit: elastic/x-pack-elasticsearch@b3a4b38a51963a878cb8d42d29cb81a271412ef9 --- .../org/elasticsearch/xpack/XPackClient.java | 15 +- .../ml/client/MachineLearningClient.java | 277 ++++++++++++++++-- qa/ml-single-node-tests/build.gradle | 2 - qa/transport-client-tests/build.gradle | 13 + .../ml/client/ESXPackSmokeClientTestCase.java | 154 ++++++++++ .../xpack/ml/client/MLTransportClientIT.java | 65 ++++ 6 files changed, 496 insertions(+), 30 deletions(-) create mode 100644 qa/transport-client-tests/build.gradle create mode 100644 qa/transport-client-tests/src/test/java/org/elasticsearch/xpack/ml/client/ESXPackSmokeClientTestCase.java create mode 100644 qa/transport-client-tests/src/test/java/org/elasticsearch/xpack/ml/client/MLTransportClientIT.java diff --git a/plugin/src/main/java/org/elasticsearch/xpack/XPackClient.java b/plugin/src/main/java/org/elasticsearch/xpack/XPackClient.java index 1e5878a3b47..95d22b0f35d 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/XPackClient.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/XPackClient.java @@ -8,14 +8,15 @@ package org.elasticsearch.xpack; import org.elasticsearch.action.ActionListener; import org.elasticsearch.client.Client; import org.elasticsearch.license.LicensingClient; +import org.elasticsearch.license.XPackInfoResponse; +import org.elasticsearch.xpack.action.XPackInfoAction; +import org.elasticsearch.xpack.action.XPackInfoRequest; +import org.elasticsearch.xpack.action.XPackInfoRequestBuilder; +import org.elasticsearch.xpack.ml.client.MachineLearningClient; import org.elasticsearch.xpack.monitoring.client.MonitoringClient; import org.elasticsearch.xpack.security.authc.support.SecuredString; import org.elasticsearch.xpack.security.client.SecurityClient; import org.elasticsearch.xpack.watcher.client.WatcherClient; -import org.elasticsearch.xpack.action.XPackInfoAction; -import org.elasticsearch.xpack.action.XPackInfoRequest; -import org.elasticsearch.xpack.action.XPackInfoRequestBuilder; -import org.elasticsearch.license.XPackInfoResponse; import java.util.Collections; import java.util.Map; @@ -31,6 +32,7 @@ public class XPackClient { private final MonitoringClient monitoringClient; private final SecurityClient securityClient; private final WatcherClient watcherClient; + private final MachineLearningClient machineLearning; public XPackClient(Client client) { this.client = client; @@ -38,6 +40,7 @@ public class XPackClient { this.monitoringClient = new MonitoringClient(client); this.securityClient = new SecurityClient(client); this.watcherClient = new WatcherClient(client); + this.machineLearning = new MachineLearningClient(client); } public Client es() { @@ -60,6 +63,10 @@ public class XPackClient { return watcherClient; } + public MachineLearningClient machineLearning() { + return machineLearning; + } + public XPackClient withHeaders(Map headers) { return new XPackClient(client.filterWithHeader(headers)); } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/client/MachineLearningClient.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/client/MachineLearningClient.java index 33745389e43..7d9d4de6e36 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/client/MachineLearningClient.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/client/MachineLearningClient.java @@ -5,7 +5,9 @@ */ package org.elasticsearch.xpack.ml.client; +import org.elasticsearch.action.ActionFuture; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.PlainListenableActionFuture; import org.elasticsearch.client.ElasticsearchClient; import org.elasticsearch.xpack.ml.action.CloseJobAction; import org.elasticsearch.xpack.ml.action.DeleteDatafeedAction; @@ -43,114 +45,341 @@ public class MachineLearningClient { this.client = client; } - public void closeJob(CloseJobAction.Request request, ActionListener listener) { + public void closeJob(CloseJobAction.Request request, + ActionListener listener) { client.execute(CloseJobAction.INSTANCE, request, listener); } - public void deleteDatafeed(DeleteDatafeedAction.Request request, ActionListener listener) { + public ActionFuture closeJob(CloseJobAction.Request request) { + PlainListenableActionFuture listener = + new PlainListenableActionFuture<>(client.threadPool()); + client.execute(CloseJobAction.INSTANCE, request, listener); + return listener; + } + + public void deleteDatafeed(DeleteDatafeedAction.Request request, + ActionListener listener) { client.execute(DeleteDatafeedAction.INSTANCE, request, listener); } - public void deleteFilter(DeleteFilterAction.Request request, ActionListener listener) { + public ActionFuture deleteDatafeed( + DeleteDatafeedAction.Request request) { + PlainListenableActionFuture listener = + new PlainListenableActionFuture<>(client.threadPool()); + client.execute(DeleteDatafeedAction.INSTANCE, request, listener); + return listener; + } + + public void deleteFilter(DeleteFilterAction.Request request, + ActionListener listener) { client.execute(DeleteFilterAction.INSTANCE, request, listener); } - public void deleteJob(DeleteJobAction.Request request, ActionListener listener) { + public ActionFuture deleteFilter( + DeleteFilterAction.Request request) { + PlainListenableActionFuture listener = + new PlainListenableActionFuture<>(client.threadPool()); + client.execute(DeleteFilterAction.INSTANCE, request, listener); + return listener; + } + + public void deleteJob(DeleteJobAction.Request request, + ActionListener listener) { client.execute(DeleteJobAction.INSTANCE, request, listener); } + public ActionFuture deleteJob(DeleteJobAction.Request request) { + PlainListenableActionFuture listener = + new PlainListenableActionFuture<>(client.threadPool()); + client.execute(DeleteJobAction.INSTANCE, request, listener); + return listener; + } + public void deleteModelSnapshot(DeleteModelSnapshotAction.Request request, ActionListener listener) { client.execute(DeleteModelSnapshotAction.INSTANCE, request, listener); } - public void flushJob(FlushJobAction.Request request, ActionListener listener) { + public ActionFuture deleteModelSnapshot( + DeleteModelSnapshotAction.Request request) { + PlainListenableActionFuture listener = + new PlainListenableActionFuture<>(client.threadPool()); + client.execute(DeleteModelSnapshotAction.INSTANCE, request, listener); + return listener; + } + + public void flushJob(FlushJobAction.Request request, + ActionListener listener) { client.execute(FlushJobAction.INSTANCE, request, listener); } - public void getBuckets(GetBucketsAction.Request request, ActionListener listener) { + public ActionFuture flushJob(FlushJobAction.Request request) { + PlainListenableActionFuture listener = + new PlainListenableActionFuture<>(client.threadPool()); + client.execute(FlushJobAction.INSTANCE, request, listener); + return listener; + } + + public void getBuckets(GetBucketsAction.Request request, + ActionListener listener) { client.execute(GetBucketsAction.INSTANCE, request, listener); } - public void getCategories(GetCategoriesAction.Request request, ActionListener listener) { + public ActionFuture getBuckets(GetBucketsAction.Request request) { + PlainListenableActionFuture listener = + new PlainListenableActionFuture<>(client.threadPool()); + client.execute(GetBucketsAction.INSTANCE, request, listener); + return listener; + } + + public void getCategories(GetCategoriesAction.Request request, + ActionListener listener) { client.execute(GetCategoriesAction.INSTANCE, request, listener); } - public void getDatafeeds(GetDatafeedsAction.Request request, ActionListener listener) { + public ActionFuture getCategories( + GetCategoriesAction.Request request) { + PlainListenableActionFuture listener = + new PlainListenableActionFuture<>(client.threadPool()); + client.execute(GetCategoriesAction.INSTANCE, request, listener); + return listener; + } + + public void getDatafeeds(GetDatafeedsAction.Request request, + ActionListener listener) { client.execute(GetDatafeedsAction.INSTANCE, request, listener); } - public void getDatafeedsStats(GetDatafeedsStatsAction.Request request, ActionListener listener) { + public ActionFuture getDatafeeds( + GetDatafeedsAction.Request request) { + PlainListenableActionFuture listener = + new PlainListenableActionFuture<>(client.threadPool()); + client.execute(GetDatafeedsAction.INSTANCE, request, listener); + return listener; + } + + public void getDatafeedsStats(GetDatafeedsStatsAction.Request request, + ActionListener listener) { client.execute(GetDatafeedsStatsAction.INSTANCE, request, listener); } - public void getFilters(GetFiltersAction.Request request, ActionListener listener) { + public ActionFuture getDatafeedsStats( + GetDatafeedsStatsAction.Request request) { + PlainListenableActionFuture listener = + new PlainListenableActionFuture<>(client.threadPool()); + client.execute(GetDatafeedsStatsAction.INSTANCE, request, listener); + return listener; + } + + public void getFilters(GetFiltersAction.Request request, + ActionListener listener) { client.execute(GetFiltersAction.INSTANCE, request, listener); } - public void getInfluencers(GetInfluencersAction.Request request, ActionListener listener) { + public ActionFuture getFilters(GetFiltersAction.Request request) { + PlainListenableActionFuture listener = + new PlainListenableActionFuture<>(client.threadPool()); + client.execute(GetFiltersAction.INSTANCE, request, listener); + return listener; + } + + public void getInfluencers(GetInfluencersAction.Request request, + ActionListener listener) { client.execute(GetInfluencersAction.INSTANCE, request, listener); } - public void getJobs(GetJobsAction.Request request, ActionListener listener) { + public ActionFuture getInfluencers( + GetInfluencersAction.Request request) { + PlainListenableActionFuture listener = + new PlainListenableActionFuture<>(client.threadPool()); + client.execute(GetInfluencersAction.INSTANCE, request, listener); + return listener; + } + + public void getJobs(GetJobsAction.Request request, + ActionListener listener) { client.execute(GetJobsAction.INSTANCE, request, listener); } - public void getJobsStats(GetJobsStatsAction.Request request, ActionListener listener) { + public ActionFuture getJobs(GetJobsAction.Request request) { + PlainListenableActionFuture listener = + new PlainListenableActionFuture<>(client.threadPool()); + client.execute(GetJobsAction.INSTANCE, request, listener); + return listener; + } + + public void getJobsStats(GetJobsStatsAction.Request request, + ActionListener listener) { client.execute(GetJobsStatsAction.INSTANCE, request, listener); } - public void getModelSnapshots(GetModelSnapshotsAction.Request request, ActionListener listener) { + public ActionFuture getJobsStats( + GetJobsStatsAction.Request request) { + PlainListenableActionFuture listener = + new PlainListenableActionFuture<>(client.threadPool()); + client.execute(GetJobsStatsAction.INSTANCE, request, listener); + return listener; + } + + public void getModelSnapshots(GetModelSnapshotsAction.Request request, + ActionListener listener) { client.execute(GetModelSnapshotsAction.INSTANCE, request, listener); } - public void getRecords(GetRecordsAction.Request request, ActionListener listener) { + public ActionFuture getModelSnapshots( + GetModelSnapshotsAction.Request request) { + PlainListenableActionFuture listener = + new PlainListenableActionFuture<>(client.threadPool()); + client.execute(GetModelSnapshotsAction.INSTANCE, request, listener); + return listener; + } + + public void getRecords(GetRecordsAction.Request request, + ActionListener listener) { client.execute(GetRecordsAction.INSTANCE, request, listener); } - public void openJob(OpenJobAction.Request request, ActionListener listener) { + public ActionFuture getRecords(GetRecordsAction.Request request) { + PlainListenableActionFuture listener = + new PlainListenableActionFuture<>(client.threadPool()); + client.execute(GetRecordsAction.INSTANCE, request, listener); + return listener; + } + + public void openJob(OpenJobAction.Request request, + ActionListener listener) { client.execute(OpenJobAction.INSTANCE, request, listener); } - public void postData(PostDataAction.Request request, ActionListener listener) { + public ActionFuture openJob(OpenJobAction.Request request) { + PlainListenableActionFuture listener = + new PlainListenableActionFuture<>(client.threadPool()); + client.execute(OpenJobAction.INSTANCE, request, listener); + return listener; + } + + public void postData(PostDataAction.Request request, + ActionListener listener) { client.execute(PostDataAction.INSTANCE, request, listener); } - public void putDatafeed(PutDatafeedAction.Request request, ActionListener listener) { + public ActionFuture postData(PostDataAction.Request request) { + PlainListenableActionFuture listener = + new PlainListenableActionFuture<>(client.threadPool()); + client.execute(PostDataAction.INSTANCE, request, listener); + return listener; + } + + public void putDatafeed(PutDatafeedAction.Request request, + ActionListener listener) { client.execute(PutDatafeedAction.INSTANCE, request, listener); } - public void putFilter(PutFilterAction.Request request, ActionListener listener) { + public ActionFuture putDatafeed(PutDatafeedAction.Request request) { + PlainListenableActionFuture listener = + new PlainListenableActionFuture<>(client.threadPool()); + client.execute(PutDatafeedAction.INSTANCE, request, listener); + return listener; + } + + public void putFilter(PutFilterAction.Request request, + ActionListener listener) { client.execute(PutFilterAction.INSTANCE, request, listener); } - public void putJob(PutJobAction.Request request, ActionListener listener) { + public ActionFuture putFilter(PutFilterAction.Request request) { + PlainListenableActionFuture listener = + new PlainListenableActionFuture<>(client.threadPool()); + client.execute(PutFilterAction.INSTANCE, request, listener); + return listener; + } + + public void putJob(PutJobAction.Request request, + ActionListener listener) { client.execute(PutJobAction.INSTANCE, request, listener); } + public ActionFuture putJob(PutJobAction.Request request) { + PlainListenableActionFuture listener = + new PlainListenableActionFuture<>(client.threadPool()); + client.execute(PutJobAction.INSTANCE, request, listener); + return listener; + } + public void revertModelSnapshot(RevertModelSnapshotAction.Request request, ActionListener listener) { client.execute(RevertModelSnapshotAction.INSTANCE, request, listener); } - public void startDatafeed(StartDatafeedAction.Request request, ActionListener listener) { + public ActionFuture revertModelSnapshot( + RevertModelSnapshotAction.Request request) { + PlainListenableActionFuture listener = + new PlainListenableActionFuture<>(client.threadPool()); + client.execute(RevertModelSnapshotAction.INSTANCE, request, listener); + return listener; + } + + public void startDatafeed(StartDatafeedAction.Request request, + ActionListener listener) { client.execute(StartDatafeedAction.INSTANCE, request, listener); } - public void stopDatafeed(StopDatafeedAction.Request request, ActionListener listener) { + public ActionFuture startDatafeed( + StartDatafeedAction.Request request) { + PlainListenableActionFuture listener = + new PlainListenableActionFuture<>(client.threadPool()); + client.execute(StartDatafeedAction.INSTANCE, request, listener); + return listener; + } + + public void stopDatafeed(StopDatafeedAction.Request request, + ActionListener listener) { client.execute(StopDatafeedAction.INSTANCE, request, listener); } - public void updateDatafeed(UpdateDatafeedAction.Request request, ActionListener listener) { + public ActionFuture stopDatafeed( + StopDatafeedAction.Request request) { + PlainListenableActionFuture listener = + new PlainListenableActionFuture<>(client.threadPool()); + client.execute(StopDatafeedAction.INSTANCE, request, listener); + return listener; + } + + public void updateDatafeed(UpdateDatafeedAction.Request request, + ActionListener listener) { client.execute(UpdateDatafeedAction.INSTANCE, request, listener); } - public void updateJob(UpdateJobAction.Request request, ActionListener listener) { + public ActionFuture updateDatafeed( + UpdateDatafeedAction.Request request) { + PlainListenableActionFuture listener = + new PlainListenableActionFuture<>(client.threadPool()); + client.execute(UpdateDatafeedAction.INSTANCE, request, listener); + return listener; + } + + public void updateJob(UpdateJobAction.Request request, + ActionListener listener) { client.execute(UpdateJobAction.INSTANCE, request, listener); } + public ActionFuture updateJob(UpdateJobAction.Request request) { + PlainListenableActionFuture listener = + new PlainListenableActionFuture<>(client.threadPool()); + client.execute(UpdateJobAction.INSTANCE, request, listener); + return listener; + } + public void updateModelSnapshot(UpdateModelSnapshotAction.Request request, ActionListener listener) { client.execute(UpdateModelSnapshotAction.INSTANCE, request, listener); } + + public ActionFuture updateModelSnapshot( + UpdateModelSnapshotAction.Request request) { + PlainListenableActionFuture listener = + new PlainListenableActionFuture<>(client.threadPool()); + client.execute(UpdateModelSnapshotAction.INSTANCE, request, listener); + return listener; + } } diff --git a/qa/ml-single-node-tests/build.gradle b/qa/ml-single-node-tests/build.gradle index 12d303e11b7..54931c74e2f 100644 --- a/qa/ml-single-node-tests/build.gradle +++ b/qa/ml-single-node-tests/build.gradle @@ -7,8 +7,6 @@ dependencies { integTestCluster { setting 'xpack.security.enabled', 'false' - setting 'xpack.ml.enabled', 'true' setting 'script.inline', 'true' - distribution = 'zip' plugin ':x-pack-elasticsearch:plugin' } diff --git a/qa/transport-client-tests/build.gradle b/qa/transport-client-tests/build.gradle new file mode 100644 index 00000000000..2c985542fa7 --- /dev/null +++ b/qa/transport-client-tests/build.gradle @@ -0,0 +1,13 @@ +apply plugin: 'elasticsearch.standalone-rest-test' +apply plugin: 'elasticsearch.rest-test' + +dependencies { + testCompile project(path: ':x-pack-elasticsearch:plugin', configuration: 'runtime') + testCompile project(path: ':x-pack-elasticsearch:transport-client', configuration: 'runtime') +} + +integTestCluster { + setting 'xpack.security.enabled', 'false' + setting 'script.inline', 'true' + plugin ':x-pack-elasticsearch:plugin' +} diff --git a/qa/transport-client-tests/src/test/java/org/elasticsearch/xpack/ml/client/ESXPackSmokeClientTestCase.java b/qa/transport-client-tests/src/test/java/org/elasticsearch/xpack/ml/client/ESXPackSmokeClientTestCase.java new file mode 100644 index 00000000000..c77715431ec --- /dev/null +++ b/qa/transport-client-tests/src/test/java/org/elasticsearch/xpack/ml/client/ESXPackSmokeClientTestCase.java @@ -0,0 +1,154 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.client; + +import org.apache.logging.log4j.Logger; +import org.apache.lucene.util.LuceneTestCase; +import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; +import org.elasticsearch.client.Client; +import org.elasticsearch.client.transport.TransportClient; +import org.elasticsearch.common.logging.ESLoggerFactory; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.transport.TransportAddress; +import org.elasticsearch.env.Environment; +import org.elasticsearch.xpack.client.PreBuiltXPackTransportClient; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.URL; +import java.nio.file.Path; +import java.util.Locale; +import java.util.concurrent.atomic.AtomicInteger; + +import static com.carrotsearch.randomizedtesting.RandomizedTest.randomAsciiOfLength; +import static org.hamcrest.Matchers.notNullValue; + +/** + * An abstract base class to run integration tests against an Elasticsearch + * cluster running outside of the test process. + *

+ * You can define a list of transport addresses from where you can reach your + * cluster by setting "tests.cluster" system property. It defaults to + * "localhost:9300". If you run this from `gradle integTest` then it will start + * the clsuter for you and set up the property. + *

+ * If you want to debug this module from your IDE, then start an external + * cluster by yourself, maybe with `gradle run`, then run JUnit. If you changed + * the default port, set "-Dtests.cluster=localhost:PORT" when running your + * test. + */ +@LuceneTestCase.SuppressSysoutChecks(bugUrl = "we log a lot on purpose") +public abstract class ESXPackSmokeClientTestCase extends LuceneTestCase { + + /** + * Key used to eventually switch to using an external cluster and provide + * its transport addresses + */ + public static final String TESTS_CLUSTER = "tests.cluster"; + + protected static final Logger logger = ESLoggerFactory + .getLogger(ESXPackSmokeClientTestCase.class.getName()); + + private static final AtomicInteger counter = new AtomicInteger(); + private static Client client; + private static String clusterAddresses; + protected String index; + + private static Client startClient(Path tempDir, TransportAddress... transportAddresses) { + Settings.Builder builder = Settings.builder() + .put("node.name", "qa_xpack_smoke_client_" + counter.getAndIncrement()) + .put("client.transport.ignore_cluster_name", true) + .put("xpack.security.enabled", false) + .put(Environment.PATH_HOME_SETTING.getKey(), tempDir); + TransportClient client = new PreBuiltXPackTransportClient(builder.build()) + .addTransportAddresses(transportAddresses); + + logger.info("--> Elasticsearch Java TransportClient started"); + + Exception clientException = null; + try { + ClusterHealthResponse health = client.admin().cluster().prepareHealth().get(); + logger.info("--> connected to [{}] cluster which is running [{}] node(s).", + health.getClusterName(), health.getNumberOfNodes()); + } catch (Exception e) { + logger.error("Error getting cluster health", e); + clientException = e; + } + + assumeNoException("Sounds like your cluster is not running at " + clusterAddresses, + clientException); + + return client; + } + + private static Client startClient() throws IOException { + String[] stringAddresses = clusterAddresses.split(","); + TransportAddress[] transportAddresses = new TransportAddress[stringAddresses.length]; + int i = 0; + for (String stringAddress : stringAddresses) { + URL url = new URL("http://" + stringAddress); + InetAddress inetAddress = InetAddress.getByName(url.getHost()); + transportAddresses[i++] = new TransportAddress( + new InetSocketAddress(inetAddress, url.getPort())); + } + return startClient(createTempDir(), transportAddresses); + } + + public static Client getClient() { + if (client == null) { + try { + client = startClient(); + } catch (IOException e) { + logger.error("can not start the client", e); + } + assertThat(client, notNullValue()); + } + return client; + } + + @BeforeClass + public static void initializeSettings() { + clusterAddresses = System.getProperty(TESTS_CLUSTER); + if (clusterAddresses == null || clusterAddresses.isEmpty()) { + fail("Must specify " + TESTS_CLUSTER + " for smoke client test"); + } + } + + @AfterClass + public static void stopTransportClient() { + if (client != null) { + client.close(); + client = null; + } + } + + @Before + public void defineIndexName() { + doClean(); + index = "qa-xpack-smoke-test-client-" + + randomAsciiOfLength(10).toLowerCase(Locale.getDefault()); + } + + @After + public void cleanIndex() { + doClean(); + } + + private void doClean() { + if (client != null) { + try { + client.admin().indices().prepareDelete(index).get(); + } catch (Exception e) { + // We ignore this cleanup exception + } + } + } +} diff --git a/qa/transport-client-tests/src/test/java/org/elasticsearch/xpack/ml/client/MLTransportClientIT.java b/qa/transport-client-tests/src/test/java/org/elasticsearch/xpack/ml/client/MLTransportClientIT.java new file mode 100644 index 00000000000..ce269795279 --- /dev/null +++ b/qa/transport-client-tests/src/test/java/org/elasticsearch/xpack/ml/client/MLTransportClientIT.java @@ -0,0 +1,65 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.client; + +import org.elasticsearch.client.Client; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.xpack.XPackClient; +import org.elasticsearch.xpack.ml.action.DeleteJobAction; +import org.elasticsearch.xpack.ml.action.GetJobsAction; +import org.elasticsearch.xpack.ml.action.PutJobAction; +import org.elasticsearch.xpack.ml.job.config.AnalysisConfig; +import org.elasticsearch.xpack.ml.job.config.Detector; +import org.elasticsearch.xpack.ml.job.config.Job; + +import java.util.ArrayList; +import java.util.Date; +import java.util.List; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.notNullValue; + +public class MLTransportClientIT extends ESXPackSmokeClientTestCase { + + public void testMLTransportClient() { + Client client = getClient(); + XPackClient xPackClient = new XPackClient(client); + MachineLearningClient mlClient = xPackClient.machineLearning(); + Job.Builder job = new Job.Builder(); + job.setId("test"); + job.setCreateTime(new Date()); + + List detectors = new ArrayList<>(); + Detector.Builder detector = new Detector.Builder(); + detector.setFunction("count"); + detectors.add(detector.build()); + + AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(detectors); + analysisConfig.setBatchSpan(TimeValue.timeValueMinutes(5)); + job.setAnalysisConfig(analysisConfig); + + PutJobAction.Response putJobResponse = mlClient + .putJob(new PutJobAction.Request(job.build())) + .actionGet(); + + assertThat(putJobResponse, notNullValue()); + assertThat(putJobResponse.isAcknowledged(), equalTo(true)); + + GetJobsAction.Response getJobResponse = mlClient.getJobs(new GetJobsAction.Request("test")) + .actionGet(); + + assertThat(getJobResponse, notNullValue()); + assertThat(getJobResponse.getResponse(), notNullValue()); + assertThat(getJobResponse.getResponse().count(), equalTo(1L)); + + DeleteJobAction.Response deleteJobResponse = mlClient + .deleteJob(new DeleteJobAction.Request("test")) + .actionGet(); + + assertThat(deleteJobResponse, notNullValue()); + assertThat(deleteJobResponse.isAcknowledged(), equalTo(true)); + } +}