[ML] Test ML with the Transport Client (elastic/x-pack-elasticsearch#1440)

* Hide ML actions for tribe node client
* Remove unused parameters
* Enable ML actions and rest endpoints for the transport client
* Create the ML components for the transport client
* Add ml transport client tests

Original commit: elastic/x-pack-elasticsearch@509007ca29
This commit is contained in:
David Kyle 2017-05-16 14:34:44 +01:00 committed by GitHub
parent dda456fb76
commit abbdf232aa
9 changed files with 190 additions and 56 deletions

View File

@ -278,8 +278,7 @@ public class XPackPlugin extends Plugin implements ScriptPlugin, ActionPlugin, I
httpClient, httpTemplateParser, threadPool, clusterService, security.getCryptoService(), xContentRegistry, components));
components.addAll(machineLearning.createComponents(internalClient, clusterService, threadPool, resourceWatcherService,
scriptService, xContentRegistry));
components.addAll(machineLearning.createComponents(internalClient, clusterService, threadPool, xContentRegistry));
// just create the reloader as it will pull all of the loaded ssl configurations and start watching them
new SSLConfigurationReloader(settings, env, sslService, resourceWatcherService);

View File

@ -15,6 +15,7 @@ import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.inject.Module;
import org.elasticsearch.common.inject.util.Providers;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.IndexScopedSettings;
@ -29,12 +30,10 @@ import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.plugins.ActionPlugin;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestHandler;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ExecutorBuilder;
import org.elasticsearch.threadpool.FixedExecutorBuilder;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.watcher.ResourceWatcherService;
import org.elasticsearch.xpack.XPackPlugin;
import org.elasticsearch.xpack.XPackSettings;
import org.elasticsearch.xpack.ml.action.CloseJobAction;
@ -192,7 +191,7 @@ public class MachineLearning implements ActionPlugin {
}
public Settings additionalSettings() {
if (enabled == false || this.transportClientMode || this.tribeNode || this.tribeNodeClient) {
if (enabled == false || transportClientMode || tribeNode || tribeNodeClient) {
return Settings.EMPTY;
}
@ -249,9 +248,8 @@ public class MachineLearning implements ActionPlugin {
}
public Collection<Object> createComponents(InternalClient internalClient, ClusterService clusterService, ThreadPool threadPool,
ResourceWatcherService resourceWatcherService, ScriptService scriptService,
NamedXContentRegistry xContentRegistry) {
if (this.transportClientMode || this.tribeNodeClient) {
if (transportClientMode || tribeNodeClient) {
return emptyList();
}
@ -260,17 +258,18 @@ public class MachineLearning implements ActionPlugin {
// Even when ML is disabled the native controller will be running if it's installed, and it
// prevents graceful shutdown on Windows unless we tell it to stop. Hence when disabled we
// still return a lifecycle service that will tell the native controller to stop.
if (false == enabled || this.tribeNode) {
if (enabled == false || tribeNode) {
return Collections.singletonList(mlLifeCycleService);
}
JobResultsPersister jobResultsPersister = new JobResultsPersister(settings, internalClient);
JobProvider jobProvider = new JobProvider(internalClient, settings);
JobDataCountsPersister jobDataCountsPersister = new JobDataCountsPersister(settings, internalClient);
Auditor auditor = new Auditor(internalClient, clusterService);
JobProvider jobProvider = new JobProvider(internalClient, settings);
UpdateJobProcessNotifier notifier = new UpdateJobProcessNotifier(settings, internalClient, clusterService, threadPool);
JobManager jobManager = new JobManager(settings, jobProvider, clusterService, auditor, internalClient, notifier);
JobDataCountsPersister jobDataCountsPersister = new JobDataCountsPersister(settings, internalClient);
JobResultsPersister jobResultsPersister = new JobResultsPersister(settings, internalClient);
AutodetectProcessFactory autodetectProcessFactory;
NormalizerProcessFactory normalizerProcessFactory;
if (AUTODETECT_PROCESS.get(settings) && MachineLearningFeatureSet.isRunningOnMlPlatform(true)) {
@ -305,7 +304,6 @@ public class MachineLearning implements ActionPlugin {
System::currentTimeMillis, auditor, persistentTasksService);
InvalidLicenseEnforcer invalidLicenseEnforcer =
new InvalidLicenseEnforcer(settings, licenseState, threadPool, datafeedManager, autodetectProcessManager);
PersistentTasksExecutorRegistry persistentTasksExecutorRegistry = new PersistentTasksExecutorRegistry(Settings.EMPTY, Arrays.asList(
new OpenJobAction.OpenJobPersistentTasksExecutor(settings, clusterService, autodetectProcessManager),
new StartDatafeedAction.StartDatafeedPersistentTasksExecutor(settings, datafeedManager)
@ -332,7 +330,7 @@ public class MachineLearning implements ActionPlugin {
public Collection<Module> nodeModules() {
List<Module> modules = new ArrayList<>();
if (transportClientMode) {
if (tribeNodeClient || transportClientMode) {
return modules;
}
@ -348,7 +346,7 @@ public class MachineLearning implements ActionPlugin {
IndexScopedSettings indexScopedSettings, SettingsFilter settingsFilter,
IndexNameExpressionResolver indexNameExpressionResolver,
Supplier<DiscoveryNodes> nodesInCluster) {
if (false == enabled || tribeNodeClient) {
if (false == enabled || tribeNodeClient || tribeNode) {
return emptyList();
}
return Arrays.asList(
@ -388,7 +386,7 @@ public class MachineLearning implements ActionPlugin {
@Override
public List<ActionHandler<? extends ActionRequest, ? extends ActionResponse>> getActions() {
if (false == enabled) {
if (false == enabled || tribeNodeClient || tribeNode) {
return emptyList();
}
return Arrays.asList(
@ -433,7 +431,7 @@ public class MachineLearning implements ActionPlugin {
}
public List<ExecutorBuilder<?>> getExecutorBuilders(Settings settings) {
if (false == enabled || tribeNode || tribeNodeClient) {
if (false == enabled || tribeNode || tribeNodeClient || transportClientMode) {
return emptyList();
}
int maxNumberOfJobs = AutodetectProcessManager.MAX_RUNNING_JOBS_PER_NODE.get(settings);

View File

@ -18,7 +18,6 @@ import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.action.support.master.AcknowledgedRequest;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
@ -32,12 +31,10 @@ import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.ml.MlMetadata;
import org.elasticsearch.xpack.ml.job.JobManager;
import org.elasticsearch.xpack.ml.job.config.Detector;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.config.MlFilter;
import org.elasticsearch.xpack.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
import org.elasticsearch.xpack.ml.utils.ExceptionsHelper;
import java.io.IOException;
@ -152,9 +149,8 @@ public class DeleteFilterAction extends Action<DeleteFilterAction.Request, Delet
@Inject
public TransportAction(Settings settings, ThreadPool threadPool,
TransportService transportService, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver, JobProvider jobProvider,
JobManager jobManager, Client client, ClusterService clusterService,
TransportBulkAction transportAction) {
IndexNameExpressionResolver indexNameExpressionResolver,
ClusterService clusterService, TransportBulkAction transportAction) {
super(settings, NAME, threadPool, transportService, actionFilters,
indexNameExpressionResolver, Request::new);
this.clusterService = clusterService;

View File

@ -225,9 +225,8 @@ public class GetFiltersAction extends Action<GetFiltersAction.Request, GetFilter
@Inject
public TransportAction(Settings settings, ThreadPool threadPool,
TransportService transportService, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver, JobProvider jobProvider,
JobManager jobManager, Client client, TransportGetAction transportGetAction,
TransportSearchAction transportSearchAction) {
IndexNameExpressionResolver indexNameExpressionResolver,
TransportGetAction transportGetAction, TransportSearchAction transportSearchAction) {
super(settings, NAME, threadPool, transportService, actionFilters,
indexNameExpressionResolver, Request::new);
this.transportGetAction = transportGetAction;

View File

@ -156,8 +156,8 @@ public class PutFilterAction extends Action<PutFilterAction.Request, PutFilterAc
@Inject
public TransportAction(Settings settings, ThreadPool threadPool,
TransportService transportService, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver, JobProvider jobProvider,
JobManager jobManager, Client client, TransportBulkAction transportBulkAction) {
IndexNameExpressionResolver indexNameExpressionResolver,
TransportBulkAction transportBulkAction) {
super(settings, NAME, threadPool, transportService, actionFilters,
indexNameExpressionResolver, Request::new);
this.transportBulkAction = transportBulkAction;

View File

@ -15,7 +15,6 @@ import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
@ -147,7 +146,7 @@ extends Action<ValidateDetectorAction.Request, ValidateDetectorAction.Response,
public static class TransportAction extends HandledTransportAction<Request, Response> {
@Inject
public TransportAction(Settings settings, TransportService transportService, ClusterService clusterService, ThreadPool threadPool,
public TransportAction(Settings settings, TransportService transportService, ThreadPool threadPool,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) {
super(settings, ValidateDetectorAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver,
Request::new);

View File

@ -15,7 +15,6 @@ import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
@ -152,7 +151,7 @@ extends Action<ValidateJobConfigAction.Request, ValidateJobConfigAction.Response
public static class TransportAction extends HandledTransportAction<Request, Response> {
@Inject
public TransportAction(Settings settings, TransportService transportService, ClusterService clusterService, ThreadPool threadPool,
public TransportAction(Settings settings, TransportService transportService, ThreadPool threadPool,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) {
super(settings, ValidateJobConfigAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver,
Request::new);

View File

@ -36,6 +36,8 @@ import org.elasticsearch.xpack.ml.action.StopDatafeedAction;
import org.elasticsearch.xpack.ml.action.UpdateDatafeedAction;
import org.elasticsearch.xpack.ml.action.UpdateJobAction;
import org.elasticsearch.xpack.ml.action.UpdateModelSnapshotAction;
import org.elasticsearch.xpack.ml.action.ValidateDetectorAction;
import org.elasticsearch.xpack.ml.action.ValidateJobConfigAction;
public class MachineLearningClient {
@ -355,4 +357,28 @@ public class MachineLearningClient {
client.execute(UpdateModelSnapshotAction.INSTANCE, request, listener);
return listener;
}
public void validateDetector(ValidateDetectorAction.Request request,
ActionListener<ValidateDetectorAction.Response> listener) {
client.execute(ValidateDetectorAction.INSTANCE, request, listener);
}
public ActionFuture<ValidateDetectorAction.Response> validateDetector(
ValidateDetectorAction.Request request) {
PlainActionFuture<ValidateDetectorAction.Response> listener = PlainActionFuture.newFuture();
client.execute(ValidateDetectorAction.INSTANCE, request, listener);
return listener;
}
public void validateJobConfig(ValidateJobConfigAction.Request request,
ActionListener<ValidateJobConfigAction.Response> listener) {
client.execute(ValidateJobConfigAction.INSTANCE, request, listener);
}
public ActionFuture<ValidateJobConfigAction.Response> validateJobConfig(
ValidateJobConfigAction.Request request) {
PlainActionFuture<ValidateJobConfigAction.Response> listener = PlainActionFuture.newFuture();
client.execute(ValidateJobConfigAction.INSTANCE, request, listener);
return listener;
}
}

View File

@ -6,29 +6,167 @@
package org.elasticsearch.xpack.ml.client;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.xpack.XPackClient;
import org.elasticsearch.xpack.ml.action.CloseJobAction;
import org.elasticsearch.xpack.ml.action.DeleteJobAction;
import org.elasticsearch.xpack.ml.action.FlushJobAction;
import org.elasticsearch.xpack.ml.action.GetBucketsAction;
import org.elasticsearch.xpack.ml.action.GetDatafeedsAction;
import org.elasticsearch.xpack.ml.action.GetJobsAction;
import org.elasticsearch.xpack.ml.action.GetModelSnapshotsAction;
import org.elasticsearch.xpack.ml.action.OpenJobAction;
import org.elasticsearch.xpack.ml.action.PostDataAction;
import org.elasticsearch.xpack.ml.action.PutDatafeedAction;
import org.elasticsearch.xpack.ml.action.PutJobAction;
import org.elasticsearch.xpack.ml.action.StartDatafeedAction;
import org.elasticsearch.xpack.ml.action.StopDatafeedAction;
import org.elasticsearch.xpack.ml.action.UpdateModelSnapshotAction;
import org.elasticsearch.xpack.ml.action.ValidateDetectorAction;
import org.elasticsearch.xpack.ml.action.ValidateJobConfigAction;
import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.ml.job.config.AnalysisConfig;
import org.elasticsearch.xpack.ml.job.config.DataDescription;
import org.elasticsearch.xpack.ml.job.config.Detector;
import org.elasticsearch.xpack.ml.job.config.Job;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.notNullValue;
public class MLTransportClientIT extends ESXPackSmokeClientTestCase {
public void testMLTransportClient() {
public void testMLTransportClient_JobActions() {
Client client = getClient();
XPackClient xPackClient = new XPackClient(client);
MachineLearningClient mlClient = xPackClient.machineLearning();
String jobId = "ml-transport-client-it-job";
Job.Builder job = createJob(jobId);
PutJobAction.Response putJobResponse = mlClient.putJob(new PutJobAction.Request(job)).actionGet();
assertThat(putJobResponse, notNullValue());
assertThat(putJobResponse.isAcknowledged(), equalTo(true));
GetJobsAction.Response getJobResponse = mlClient.getJobs(new GetJobsAction.Request(jobId)).actionGet();
assertThat(getJobResponse, notNullValue());
assertThat(getJobResponse.getResponse(), notNullValue());
assertThat(getJobResponse.getResponse().count(), equalTo(1L));
// Open job POST data, flush, close and check a result
OpenJobAction.Response openJobResponse = mlClient.openJob(new OpenJobAction.Request(jobId)).actionGet();
assertThat(openJobResponse.isAcknowledged(), equalTo(true));
String content = "{\"time\":1000, \"msg\": \"some categorical message\"}\n" +
"{\"time\":11000, \"msg\": \"some categorical message in the second bucket\"}\n" +
"{\"time\":21000, \"msg\": \"some categorical message in the third bucket\"}\n";
PostDataAction.Request postRequest = new PostDataAction.Request(jobId);
postRequest.setContent(new BytesArray(content), XContentType.JSON);
PostDataAction.Response postResponse = mlClient.postData(postRequest).actionGet();
assertThat(postResponse.getDataCounts(), notNullValue());
assertThat(postResponse.getDataCounts().getInputFieldCount(), equalTo(3L));
FlushJobAction.Response flushResponse = mlClient.flushJob(new FlushJobAction.Request(jobId)).actionGet();
assertThat(flushResponse.isFlushed(), equalTo(true));
CloseJobAction.Response closeResponse = mlClient.closeJob(new CloseJobAction.Request(jobId)).actionGet();
assertThat(closeResponse.isClosed(), equalTo(true));
GetBucketsAction.Response getBucketsResponse = mlClient.getBuckets(new GetBucketsAction.Request(jobId)).actionGet();
assertThat(getBucketsResponse.getBuckets().count(), equalTo(1L));
// Update a model snapshot
GetModelSnapshotsAction.Response getModelSnapshotResponse =
mlClient.getModelSnapshots(new GetModelSnapshotsAction.Request(jobId, null)).actionGet();
assertThat(getModelSnapshotResponse.getPage().count(), equalTo(1L));
String snapshotId = getModelSnapshotResponse.getPage().results().get(0).getSnapshotId();
UpdateModelSnapshotAction.Request updateModelSnapshotRequest = new UpdateModelSnapshotAction.Request(jobId, snapshotId);
updateModelSnapshotRequest.setDescription("Changed description");
UpdateModelSnapshotAction.Response updateModelSnapshotResponse =
mlClient.updateModelSnapshot(updateModelSnapshotRequest).actionGet();
assertThat(updateModelSnapshotResponse.getModel(), notNullValue());
assertThat(updateModelSnapshotResponse.getModel().getDescription(), equalTo("Changed description"));
// and delete the job
DeleteJobAction.Response deleteJobResponse = mlClient.deleteJob(new DeleteJobAction.Request(jobId)).actionGet();
assertThat(deleteJobResponse, notNullValue());
assertThat(deleteJobResponse.isAcknowledged(), equalTo(true));
}
public void testMLTransportClient_ValidateActions() {
Client client = getClient();
XPackClient xPackClient = new XPackClient(client);
MachineLearningClient mlClient = xPackClient.machineLearning();
Detector.Builder detector = new Detector.Builder();
detector.setFunction("count");
ValidateDetectorAction.Request validateDetectorRequest = new ValidateDetectorAction.Request(detector.build());
ValidateDetectorAction.Response validateDetectorResponse = mlClient.validateDetector(validateDetectorRequest).actionGet();
assertThat(validateDetectorResponse.isAcknowledged(), equalTo(true));
Job.Builder job = createJob("ml-transport-client-it-validate-job");
ValidateJobConfigAction.Request validateJobRequest = new ValidateJobConfigAction.Request(job.build(new Date()));
ValidateJobConfigAction.Response validateJobResponse = mlClient.validateJobConfig(validateJobRequest).actionGet();
assertThat(validateJobResponse.isAcknowledged(), equalTo(true));
}
public void testMLTransportClient_DateFeedActions() {
Client client = getClient();
XPackClient xPackClient = new XPackClient(client);
MachineLearningClient mlClient = xPackClient.machineLearning();
String jobId = "ml-transport-client-it-datafeed-job";
Job.Builder job = createJob(jobId);
PutJobAction.Response putJobResponse = mlClient.putJob(new PutJobAction.Request(job)).actionGet();
assertThat(putJobResponse, notNullValue());
assertThat(putJobResponse.isAcknowledged(), equalTo(true));
String datafeedId = "ml-transport-client-it-datafeed";
DatafeedConfig.Builder datafeed = new DatafeedConfig.Builder(datafeedId, jobId);
String datafeedIndex = "ml-transport-client-test";
String datatype = "type-bar";
datafeed.setIndices(Collections.singletonList(datafeedIndex));
datafeed.setTypes(Collections.singletonList("type-bar"));
PutDatafeedAction.Response putDatafeedResponse = mlClient.putDatafeed(new PutDatafeedAction.Request(datafeed.build())).actionGet();
assertThat(putDatafeedResponse.isAcknowledged(), equalTo(true));
GetDatafeedsAction.Response getDatafeedResponse = mlClient.getDatafeeds(new GetDatafeedsAction.Request(datafeedId)).actionGet();
assertThat(getDatafeedResponse.getResponse(), notNullValue());
// Open job before starting the datafeed
OpenJobAction.Response openJobResponse = mlClient.openJob(new OpenJobAction.Request(jobId)).actionGet();
assertThat(openJobResponse.isAcknowledged(), equalTo(true));
// create the index for the data feed
Map<String, Object> source = new HashMap<>();
source.put("time", new Date());
source.put("message", "some message");
client.prepareIndex(datafeedIndex, datatype).setSource(source).get();
StartDatafeedAction.Request startDatafeedRequest = new StartDatafeedAction.Request(datafeedId, new Date().getTime());
StartDatafeedAction.Response startDataFeedResponse = mlClient.startDatafeed(startDatafeedRequest).actionGet();
assertThat(startDataFeedResponse.isAcknowledged(), equalTo(true));
StopDatafeedAction.Response stopDataFeedResponse = mlClient.stopDatafeed(new StopDatafeedAction.Request(datafeedId)).actionGet();
assertThat(stopDataFeedResponse.isStopped(), equalTo(true));
}
private Job.Builder createJob(String jobId) {
Job.Builder job = new Job.Builder();
job.setId("test");
job.setId(jobId);
List<Detector> detectors = new ArrayList<>();
Detector.Builder detector = new Detector.Builder();
@ -36,29 +174,9 @@ public class MLTransportClientIT extends ESXPackSmokeClientTestCase {
detectors.add(detector.build());
AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(detectors);
analysisConfig.setBucketSpan(TimeValue.timeValueSeconds(10L));
job.setAnalysisConfig(analysisConfig);
job.setDataDescription(new DataDescription.Builder());
PutJobAction.Response putJobResponse = mlClient
.putJob(new PutJobAction.Request(job))
.actionGet();
assertThat(putJobResponse, notNullValue());
assertThat(putJobResponse.isAcknowledged(), equalTo(true));
GetJobsAction.Response getJobResponse = mlClient.getJobs(new GetJobsAction.Request("test"))
.actionGet();
assertThat(getJobResponse, notNullValue());
assertThat(getJobResponse.getResponse(), notNullValue());
assertThat(getJobResponse.getResponse().count(), equalTo(1L));
DeleteJobAction.Response deleteJobResponse = mlClient
.deleteJob(new DeleteJobAction.Request("test"))
.actionGet();
assertThat(deleteJobResponse, notNullValue());
assertThat(deleteJobResponse.isAcknowledged(), equalTo(true));
return job;
}
}