From abbdf232aa82d8e216c6b39172920988b74d142f Mon Sep 17 00:00:00 2001 From: David Kyle Date: Tue, 16 May 2017 14:34:44 +0100 Subject: [PATCH] [ML] Test ML with the Transport Client (elastic/x-pack-elasticsearch#1440) * Hide ML actions for tribe node client * Remove unused parameters * Enable ML actions and rest endpoints for the transport client * Create the ML components for the transport client * Add ml transport client tests Original commit: elastic/x-pack-elasticsearch@509007ca2911e1197021b4acdfc549d1a129c50d --- .../org/elasticsearch/xpack/XPackPlugin.java | 3 +- .../xpack/ml/MachineLearning.java | 28 ++- .../xpack/ml/action/DeleteFilterAction.java | 8 +- .../xpack/ml/action/GetFiltersAction.java | 5 +- .../xpack/ml/action/PutFilterAction.java | 4 +- .../ml/action/ValidateDetectorAction.java | 3 +- .../ml/action/ValidateJobConfigAction.java | 3 +- .../ml/client/MachineLearningClient.java | 26 +++ .../xpack/ml/client/MLTransportClientIT.java | 166 +++++++++++++++--- 9 files changed, 190 insertions(+), 56 deletions(-) diff --git a/plugin/src/main/java/org/elasticsearch/xpack/XPackPlugin.java b/plugin/src/main/java/org/elasticsearch/xpack/XPackPlugin.java index 4d92f7a52ed..18b70d6e3b1 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/XPackPlugin.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/XPackPlugin.java @@ -278,8 +278,7 @@ public class XPackPlugin extends Plugin implements ScriptPlugin, ActionPlugin, I httpClient, httpTemplateParser, threadPool, clusterService, security.getCryptoService(), xContentRegistry, components)); - components.addAll(machineLearning.createComponents(internalClient, clusterService, threadPool, resourceWatcherService, - scriptService, xContentRegistry)); + components.addAll(machineLearning.createComponents(internalClient, clusterService, threadPool, xContentRegistry)); // just create the reloader as it will pull all of the loaded ssl configurations and start watching them new SSLConfigurationReloader(settings, env, sslService, resourceWatcherService); diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java index 964175df95f..43d56d66d83 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java @@ -15,6 +15,7 @@ import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.ParseField; import org.elasticsearch.common.inject.Module; +import org.elasticsearch.common.inject.util.Providers; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.IndexScopedSettings; @@ -29,12 +30,10 @@ import org.elasticsearch.license.XPackLicenseState; import org.elasticsearch.plugins.ActionPlugin; import org.elasticsearch.rest.RestController; import org.elasticsearch.rest.RestHandler; -import org.elasticsearch.script.ScriptService; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ExecutorBuilder; import org.elasticsearch.threadpool.FixedExecutorBuilder; import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.watcher.ResourceWatcherService; import org.elasticsearch.xpack.XPackPlugin; import org.elasticsearch.xpack.XPackSettings; import org.elasticsearch.xpack.ml.action.CloseJobAction; @@ -192,7 +191,7 @@ public class MachineLearning implements ActionPlugin { } public Settings additionalSettings() { - if (enabled == false || this.transportClientMode || this.tribeNode || this.tribeNodeClient) { + if (enabled == false || transportClientMode || tribeNode || tribeNodeClient) { return Settings.EMPTY; } @@ -249,9 +248,8 @@ public class MachineLearning implements ActionPlugin { } public Collection createComponents(InternalClient internalClient, ClusterService clusterService, ThreadPool threadPool, - ResourceWatcherService resourceWatcherService, ScriptService scriptService, NamedXContentRegistry xContentRegistry) { - if (this.transportClientMode || this.tribeNodeClient) { + if (transportClientMode || tribeNodeClient) { return emptyList(); } @@ -260,17 +258,18 @@ public class MachineLearning implements ActionPlugin { // Even when ML is disabled the native controller will be running if it's installed, and it // prevents graceful shutdown on Windows unless we tell it to stop. Hence when disabled we // still return a lifecycle service that will tell the native controller to stop. - if (false == enabled || this.tribeNode) { + if (enabled == false || tribeNode) { return Collections.singletonList(mlLifeCycleService); } - JobResultsPersister jobResultsPersister = new JobResultsPersister(settings, internalClient); - JobProvider jobProvider = new JobProvider(internalClient, settings); - JobDataCountsPersister jobDataCountsPersister = new JobDataCountsPersister(settings, internalClient); - Auditor auditor = new Auditor(internalClient, clusterService); + JobProvider jobProvider = new JobProvider(internalClient, settings); UpdateJobProcessNotifier notifier = new UpdateJobProcessNotifier(settings, internalClient, clusterService, threadPool); JobManager jobManager = new JobManager(settings, jobProvider, clusterService, auditor, internalClient, notifier); + + JobDataCountsPersister jobDataCountsPersister = new JobDataCountsPersister(settings, internalClient); + JobResultsPersister jobResultsPersister = new JobResultsPersister(settings, internalClient); + AutodetectProcessFactory autodetectProcessFactory; NormalizerProcessFactory normalizerProcessFactory; if (AUTODETECT_PROCESS.get(settings) && MachineLearningFeatureSet.isRunningOnMlPlatform(true)) { @@ -305,7 +304,6 @@ public class MachineLearning implements ActionPlugin { System::currentTimeMillis, auditor, persistentTasksService); InvalidLicenseEnforcer invalidLicenseEnforcer = new InvalidLicenseEnforcer(settings, licenseState, threadPool, datafeedManager, autodetectProcessManager); - PersistentTasksExecutorRegistry persistentTasksExecutorRegistry = new PersistentTasksExecutorRegistry(Settings.EMPTY, Arrays.asList( new OpenJobAction.OpenJobPersistentTasksExecutor(settings, clusterService, autodetectProcessManager), new StartDatafeedAction.StartDatafeedPersistentTasksExecutor(settings, datafeedManager) @@ -332,7 +330,7 @@ public class MachineLearning implements ActionPlugin { public Collection nodeModules() { List modules = new ArrayList<>(); - if (transportClientMode) { + if (tribeNodeClient || transportClientMode) { return modules; } @@ -348,7 +346,7 @@ public class MachineLearning implements ActionPlugin { IndexScopedSettings indexScopedSettings, SettingsFilter settingsFilter, IndexNameExpressionResolver indexNameExpressionResolver, Supplier nodesInCluster) { - if (false == enabled || tribeNodeClient) { + if (false == enabled || tribeNodeClient || tribeNode) { return emptyList(); } return Arrays.asList( @@ -388,7 +386,7 @@ public class MachineLearning implements ActionPlugin { @Override public List> getActions() { - if (false == enabled) { + if (false == enabled || tribeNodeClient || tribeNode) { return emptyList(); } return Arrays.asList( @@ -433,7 +431,7 @@ public class MachineLearning implements ActionPlugin { } public List> getExecutorBuilders(Settings settings) { - if (false == enabled || tribeNode || tribeNodeClient) { + if (false == enabled || tribeNode || tribeNodeClient || transportClientMode) { return emptyList(); } int maxNumberOfJobs = AutodetectProcessManager.MAX_RUNNING_JOBS_PER_NODE.get(settings); diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/DeleteFilterAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/DeleteFilterAction.java index a8ef60ce58d..3418140dcad 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/DeleteFilterAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/DeleteFilterAction.java @@ -18,7 +18,6 @@ import org.elasticsearch.action.support.HandledTransportAction; import org.elasticsearch.action.support.master.AcknowledgedRequest; import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder; -import org.elasticsearch.client.Client; import org.elasticsearch.client.ElasticsearchClient; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; @@ -32,12 +31,10 @@ import org.elasticsearch.rest.RestStatus; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.ml.MlMetadata; -import org.elasticsearch.xpack.ml.job.JobManager; import org.elasticsearch.xpack.ml.job.config.Detector; import org.elasticsearch.xpack.ml.job.config.Job; import org.elasticsearch.xpack.ml.job.config.MlFilter; import org.elasticsearch.xpack.ml.job.persistence.AnomalyDetectorsIndex; -import org.elasticsearch.xpack.ml.job.persistence.JobProvider; import org.elasticsearch.xpack.ml.utils.ExceptionsHelper; import java.io.IOException; @@ -152,9 +149,8 @@ public class DeleteFilterAction extends Action { @Inject - public TransportAction(Settings settings, TransportService transportService, ClusterService clusterService, ThreadPool threadPool, + public TransportAction(Settings settings, TransportService transportService, ThreadPool threadPool, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) { super(settings, ValidateDetectorAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, Request::new); diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/ValidateJobConfigAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/ValidateJobConfigAction.java index 55fddcf6eed..ba75daab328 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/ValidateJobConfigAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/ValidateJobConfigAction.java @@ -15,7 +15,6 @@ import org.elasticsearch.action.support.HandledTransportAction; import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.client.ElasticsearchClient; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; -import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -152,7 +151,7 @@ extends Action { @Inject - public TransportAction(Settings settings, TransportService transportService, ClusterService clusterService, ThreadPool threadPool, + public TransportAction(Settings settings, TransportService transportService, ThreadPool threadPool, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) { super(settings, ValidateJobConfigAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, Request::new); diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/client/MachineLearningClient.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/client/MachineLearningClient.java index c249438af26..67edfabb11a 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/client/MachineLearningClient.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/client/MachineLearningClient.java @@ -36,6 +36,8 @@ import org.elasticsearch.xpack.ml.action.StopDatafeedAction; import org.elasticsearch.xpack.ml.action.UpdateDatafeedAction; import org.elasticsearch.xpack.ml.action.UpdateJobAction; import org.elasticsearch.xpack.ml.action.UpdateModelSnapshotAction; +import org.elasticsearch.xpack.ml.action.ValidateDetectorAction; +import org.elasticsearch.xpack.ml.action.ValidateJobConfigAction; public class MachineLearningClient { @@ -355,4 +357,28 @@ public class MachineLearningClient { client.execute(UpdateModelSnapshotAction.INSTANCE, request, listener); return listener; } + + public void validateDetector(ValidateDetectorAction.Request request, + ActionListener listener) { + client.execute(ValidateDetectorAction.INSTANCE, request, listener); + } + + public ActionFuture validateDetector( + ValidateDetectorAction.Request request) { + PlainActionFuture listener = PlainActionFuture.newFuture(); + client.execute(ValidateDetectorAction.INSTANCE, request, listener); + return listener; + } + + public void validateJobConfig(ValidateJobConfigAction.Request request, + ActionListener listener) { + client.execute(ValidateJobConfigAction.INSTANCE, request, listener); + } + + public ActionFuture validateJobConfig( + ValidateJobConfigAction.Request request) { + PlainActionFuture listener = PlainActionFuture.newFuture(); + client.execute(ValidateJobConfigAction.INSTANCE, request, listener); + return listener; + } } diff --git a/qa/transport-client-tests/src/test/java/org/elasticsearch/xpack/ml/client/MLTransportClientIT.java b/qa/transport-client-tests/src/test/java/org/elasticsearch/xpack/ml/client/MLTransportClientIT.java index 7ad2d189e83..7361b3292c8 100644 --- a/qa/transport-client-tests/src/test/java/org/elasticsearch/xpack/ml/client/MLTransportClientIT.java +++ b/qa/transport-client-tests/src/test/java/org/elasticsearch/xpack/ml/client/MLTransportClientIT.java @@ -6,29 +6,167 @@ package org.elasticsearch.xpack.ml.client; import org.elasticsearch.client.Client; +import org.elasticsearch.common.bytes.BytesArray; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.xpack.XPackClient; +import org.elasticsearch.xpack.ml.action.CloseJobAction; import org.elasticsearch.xpack.ml.action.DeleteJobAction; +import org.elasticsearch.xpack.ml.action.FlushJobAction; +import org.elasticsearch.xpack.ml.action.GetBucketsAction; +import org.elasticsearch.xpack.ml.action.GetDatafeedsAction; import org.elasticsearch.xpack.ml.action.GetJobsAction; +import org.elasticsearch.xpack.ml.action.GetModelSnapshotsAction; +import org.elasticsearch.xpack.ml.action.OpenJobAction; +import org.elasticsearch.xpack.ml.action.PostDataAction; +import org.elasticsearch.xpack.ml.action.PutDatafeedAction; import org.elasticsearch.xpack.ml.action.PutJobAction; +import org.elasticsearch.xpack.ml.action.StartDatafeedAction; +import org.elasticsearch.xpack.ml.action.StopDatafeedAction; +import org.elasticsearch.xpack.ml.action.UpdateModelSnapshotAction; +import org.elasticsearch.xpack.ml.action.ValidateDetectorAction; +import org.elasticsearch.xpack.ml.action.ValidateJobConfigAction; +import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig; import org.elasticsearch.xpack.ml.job.config.AnalysisConfig; import org.elasticsearch.xpack.ml.job.config.DataDescription; import org.elasticsearch.xpack.ml.job.config.Detector; import org.elasticsearch.xpack.ml.job.config.Job; import java.util.ArrayList; +import java.util.Collections; +import java.util.Date; +import java.util.HashMap; import java.util.List; +import java.util.Map; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.notNullValue; public class MLTransportClientIT extends ESXPackSmokeClientTestCase { - public void testMLTransportClient() { + public void testMLTransportClient_JobActions() { Client client = getClient(); XPackClient xPackClient = new XPackClient(client); MachineLearningClient mlClient = xPackClient.machineLearning(); + + String jobId = "ml-transport-client-it-job"; + Job.Builder job = createJob(jobId); + + PutJobAction.Response putJobResponse = mlClient.putJob(new PutJobAction.Request(job)).actionGet(); + assertThat(putJobResponse, notNullValue()); + assertThat(putJobResponse.isAcknowledged(), equalTo(true)); + + GetJobsAction.Response getJobResponse = mlClient.getJobs(new GetJobsAction.Request(jobId)).actionGet(); + assertThat(getJobResponse, notNullValue()); + assertThat(getJobResponse.getResponse(), notNullValue()); + assertThat(getJobResponse.getResponse().count(), equalTo(1L)); + + // Open job POST data, flush, close and check a result + OpenJobAction.Response openJobResponse = mlClient.openJob(new OpenJobAction.Request(jobId)).actionGet(); + assertThat(openJobResponse.isAcknowledged(), equalTo(true)); + + String content = "{\"time\":1000, \"msg\": \"some categorical message\"}\n" + + "{\"time\":11000, \"msg\": \"some categorical message in the second bucket\"}\n" + + "{\"time\":21000, \"msg\": \"some categorical message in the third bucket\"}\n"; + PostDataAction.Request postRequest = new PostDataAction.Request(jobId); + postRequest.setContent(new BytesArray(content), XContentType.JSON); + PostDataAction.Response postResponse = mlClient.postData(postRequest).actionGet(); + assertThat(postResponse.getDataCounts(), notNullValue()); + assertThat(postResponse.getDataCounts().getInputFieldCount(), equalTo(3L)); + + FlushJobAction.Response flushResponse = mlClient.flushJob(new FlushJobAction.Request(jobId)).actionGet(); + assertThat(flushResponse.isFlushed(), equalTo(true)); + + CloseJobAction.Response closeResponse = mlClient.closeJob(new CloseJobAction.Request(jobId)).actionGet(); + assertThat(closeResponse.isClosed(), equalTo(true)); + + GetBucketsAction.Response getBucketsResponse = mlClient.getBuckets(new GetBucketsAction.Request(jobId)).actionGet(); + assertThat(getBucketsResponse.getBuckets().count(), equalTo(1L)); + + // Update a model snapshot + GetModelSnapshotsAction.Response getModelSnapshotResponse = + mlClient.getModelSnapshots(new GetModelSnapshotsAction.Request(jobId, null)).actionGet(); + assertThat(getModelSnapshotResponse.getPage().count(), equalTo(1L)); + String snapshotId = getModelSnapshotResponse.getPage().results().get(0).getSnapshotId(); + + UpdateModelSnapshotAction.Request updateModelSnapshotRequest = new UpdateModelSnapshotAction.Request(jobId, snapshotId); + updateModelSnapshotRequest.setDescription("Changed description"); + UpdateModelSnapshotAction.Response updateModelSnapshotResponse = + mlClient.updateModelSnapshot(updateModelSnapshotRequest).actionGet(); + assertThat(updateModelSnapshotResponse.getModel(), notNullValue()); + assertThat(updateModelSnapshotResponse.getModel().getDescription(), equalTo("Changed description")); + + // and delete the job + DeleteJobAction.Response deleteJobResponse = mlClient.deleteJob(new DeleteJobAction.Request(jobId)).actionGet(); + assertThat(deleteJobResponse, notNullValue()); + assertThat(deleteJobResponse.isAcknowledged(), equalTo(true)); + } + + public void testMLTransportClient_ValidateActions() { + Client client = getClient(); + XPackClient xPackClient = new XPackClient(client); + MachineLearningClient mlClient = xPackClient.machineLearning(); + + Detector.Builder detector = new Detector.Builder(); + detector.setFunction("count"); + ValidateDetectorAction.Request validateDetectorRequest = new ValidateDetectorAction.Request(detector.build()); + ValidateDetectorAction.Response validateDetectorResponse = mlClient.validateDetector(validateDetectorRequest).actionGet(); + assertThat(validateDetectorResponse.isAcknowledged(), equalTo(true)); + + Job.Builder job = createJob("ml-transport-client-it-validate-job"); + ValidateJobConfigAction.Request validateJobRequest = new ValidateJobConfigAction.Request(job.build(new Date())); + ValidateJobConfigAction.Response validateJobResponse = mlClient.validateJobConfig(validateJobRequest).actionGet(); + assertThat(validateJobResponse.isAcknowledged(), equalTo(true)); + } + + + public void testMLTransportClient_DateFeedActions() { + Client client = getClient(); + XPackClient xPackClient = new XPackClient(client); + MachineLearningClient mlClient = xPackClient.machineLearning(); + + String jobId = "ml-transport-client-it-datafeed-job"; + Job.Builder job = createJob(jobId); + + PutJobAction.Response putJobResponse = mlClient.putJob(new PutJobAction.Request(job)).actionGet(); + assertThat(putJobResponse, notNullValue()); + assertThat(putJobResponse.isAcknowledged(), equalTo(true)); + + String datafeedId = "ml-transport-client-it-datafeed"; + DatafeedConfig.Builder datafeed = new DatafeedConfig.Builder(datafeedId, jobId); + String datafeedIndex = "ml-transport-client-test"; + String datatype = "type-bar"; + datafeed.setIndices(Collections.singletonList(datafeedIndex)); + datafeed.setTypes(Collections.singletonList("type-bar")); + + PutDatafeedAction.Response putDatafeedResponse = mlClient.putDatafeed(new PutDatafeedAction.Request(datafeed.build())).actionGet(); + assertThat(putDatafeedResponse.isAcknowledged(), equalTo(true)); + + GetDatafeedsAction.Response getDatafeedResponse = mlClient.getDatafeeds(new GetDatafeedsAction.Request(datafeedId)).actionGet(); + assertThat(getDatafeedResponse.getResponse(), notNullValue()); + + // Open job before starting the datafeed + OpenJobAction.Response openJobResponse = mlClient.openJob(new OpenJobAction.Request(jobId)).actionGet(); + assertThat(openJobResponse.isAcknowledged(), equalTo(true)); + + // create the index for the data feed + Map source = new HashMap<>(); + source.put("time", new Date()); + source.put("message", "some message"); + client.prepareIndex(datafeedIndex, datatype).setSource(source).get(); + + StartDatafeedAction.Request startDatafeedRequest = new StartDatafeedAction.Request(datafeedId, new Date().getTime()); + StartDatafeedAction.Response startDataFeedResponse = mlClient.startDatafeed(startDatafeedRequest).actionGet(); + assertThat(startDataFeedResponse.isAcknowledged(), equalTo(true)); + + StopDatafeedAction.Response stopDataFeedResponse = mlClient.stopDatafeed(new StopDatafeedAction.Request(datafeedId)).actionGet(); + assertThat(stopDataFeedResponse.isStopped(), equalTo(true)); + } + + private Job.Builder createJob(String jobId) { Job.Builder job = new Job.Builder(); - job.setId("test"); + job.setId(jobId); List detectors = new ArrayList<>(); Detector.Builder detector = new Detector.Builder(); @@ -36,29 +174,9 @@ public class MLTransportClientIT extends ESXPackSmokeClientTestCase { detectors.add(detector.build()); AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(detectors); + analysisConfig.setBucketSpan(TimeValue.timeValueSeconds(10L)); job.setAnalysisConfig(analysisConfig); - job.setDataDescription(new DataDescription.Builder()); - - PutJobAction.Response putJobResponse = mlClient - .putJob(new PutJobAction.Request(job)) - .actionGet(); - - assertThat(putJobResponse, notNullValue()); - assertThat(putJobResponse.isAcknowledged(), equalTo(true)); - - GetJobsAction.Response getJobResponse = mlClient.getJobs(new GetJobsAction.Request("test")) - .actionGet(); - - assertThat(getJobResponse, notNullValue()); - assertThat(getJobResponse.getResponse(), notNullValue()); - assertThat(getJobResponse.getResponse().count(), equalTo(1L)); - - DeleteJobAction.Response deleteJobResponse = mlClient - .deleteJob(new DeleteJobAction.Request("test")) - .actionGet(); - - assertThat(deleteJobResponse, notNullValue()); - assertThat(deleteJobResponse.isAcknowledged(), equalTo(true)); + return job; } }