From 439830890faa3d68e00b6ccfa9ed573ca4736582 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Tue, 14 Nov 2017 13:50:59 +0100 Subject: [PATCH] Move the initialization of persistent tasks from the machine learning level to xpack level. Today persistent tasks is only usable from machine learning, but others like ccr will need to use it too. With this change ccr and other will be able to make use of it too. Original commit: elastic/x-pack-elasticsearch@c90f01d5f6e48eb675f81e0ca51731294576ba64 --- .../org/elasticsearch/xpack/XPackPlugin.java | 66 ++++++++++++------- .../xpack/ml/MachineLearning.java | 38 +++++------ 2 files changed, 63 insertions(+), 41 deletions(-) diff --git a/plugin/src/main/java/org/elasticsearch/xpack/XPackPlugin.java b/plugin/src/main/java/org/elasticsearch/xpack/XPackPlugin.java index dee875efd6d..9a23390550f 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/XPackPlugin.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/XPackPlugin.java @@ -62,14 +62,6 @@ import org.elasticsearch.xpack.action.TransportXPackInfoAction; import org.elasticsearch.xpack.action.TransportXPackUsageAction; import org.elasticsearch.xpack.action.XPackInfoAction; import org.elasticsearch.xpack.action.XPackUsageAction; -import org.elasticsearch.xpack.watcher.common.http.HttpClient; -import org.elasticsearch.xpack.watcher.common.http.HttpRequestTemplate; -import org.elasticsearch.xpack.watcher.common.http.HttpSettings; -import org.elasticsearch.xpack.watcher.common.http.auth.HttpAuthFactory; -import org.elasticsearch.xpack.watcher.common.http.auth.HttpAuthRegistry; -import org.elasticsearch.xpack.watcher.common.http.auth.basic.BasicAuth; -import org.elasticsearch.xpack.watcher.common.http.auth.basic.BasicAuthFactory; -import org.elasticsearch.xpack.watcher.common.text.TextTemplateEngine; import org.elasticsearch.xpack.deprecation.Deprecation; import org.elasticsearch.xpack.extensions.XPackExtension; import org.elasticsearch.xpack.extensions.XPackExtensionsService; @@ -81,19 +73,14 @@ import org.elasticsearch.xpack.ml.MachineLearning; import org.elasticsearch.xpack.ml.MachineLearningFeatureSet; import org.elasticsearch.xpack.monitoring.Monitoring; import org.elasticsearch.xpack.monitoring.MonitoringFeatureSet; -import org.elasticsearch.xpack.watcher.notification.email.Account; -import org.elasticsearch.xpack.watcher.notification.email.EmailService; -import org.elasticsearch.xpack.watcher.notification.email.attachment.DataAttachmentParser; -import org.elasticsearch.xpack.watcher.notification.email.attachment.EmailAttachmentParser; -import org.elasticsearch.xpack.watcher.notification.email.attachment.EmailAttachmentsParser; -import org.elasticsearch.xpack.watcher.notification.email.attachment.HttpEmailAttachementParser; -import org.elasticsearch.xpack.watcher.notification.email.attachment.ReportingAttachmentParser; -import org.elasticsearch.xpack.watcher.notification.email.support.BodyPartSource; -import org.elasticsearch.xpack.watcher.notification.hipchat.HipChatService; -import org.elasticsearch.xpack.watcher.notification.jira.JiraService; -import org.elasticsearch.xpack.watcher.notification.pagerduty.PagerDutyAccount; -import org.elasticsearch.xpack.watcher.notification.pagerduty.PagerDutyService; -import org.elasticsearch.xpack.watcher.notification.slack.SlackService; +import org.elasticsearch.xpack.persistent.CompletionPersistentTaskAction; +import org.elasticsearch.xpack.persistent.PersistentTasksClusterService; +import org.elasticsearch.xpack.persistent.PersistentTasksExecutor; +import org.elasticsearch.xpack.persistent.PersistentTasksExecutorRegistry; +import org.elasticsearch.xpack.persistent.PersistentTasksService; +import org.elasticsearch.xpack.persistent.RemovePersistentTaskAction; +import org.elasticsearch.xpack.persistent.StartPersistentTaskAction; +import org.elasticsearch.xpack.persistent.UpdatePersistentTaskStatusAction; import org.elasticsearch.xpack.rest.action.RestXPackInfoAction; import org.elasticsearch.xpack.rest.action.RestXPackUsageAction; import org.elasticsearch.xpack.security.InternalClient; @@ -107,6 +94,27 @@ import org.elasticsearch.xpack.ssl.SSLService; import org.elasticsearch.xpack.upgrade.Upgrade; import org.elasticsearch.xpack.watcher.Watcher; import org.elasticsearch.xpack.watcher.WatcherFeatureSet; +import org.elasticsearch.xpack.watcher.common.http.HttpClient; +import org.elasticsearch.xpack.watcher.common.http.HttpRequestTemplate; +import org.elasticsearch.xpack.watcher.common.http.HttpSettings; +import org.elasticsearch.xpack.watcher.common.http.auth.HttpAuthFactory; +import org.elasticsearch.xpack.watcher.common.http.auth.HttpAuthRegistry; +import org.elasticsearch.xpack.watcher.common.http.auth.basic.BasicAuth; +import org.elasticsearch.xpack.watcher.common.http.auth.basic.BasicAuthFactory; +import org.elasticsearch.xpack.watcher.common.text.TextTemplateEngine; +import org.elasticsearch.xpack.watcher.notification.email.Account; +import org.elasticsearch.xpack.watcher.notification.email.EmailService; +import org.elasticsearch.xpack.watcher.notification.email.attachment.DataAttachmentParser; +import org.elasticsearch.xpack.watcher.notification.email.attachment.EmailAttachmentParser; +import org.elasticsearch.xpack.watcher.notification.email.attachment.EmailAttachmentsParser; +import org.elasticsearch.xpack.watcher.notification.email.attachment.HttpEmailAttachementParser; +import org.elasticsearch.xpack.watcher.notification.email.attachment.ReportingAttachmentParser; +import org.elasticsearch.xpack.watcher.notification.email.support.BodyPartSource; +import org.elasticsearch.xpack.watcher.notification.hipchat.HipChatService; +import org.elasticsearch.xpack.watcher.notification.jira.JiraService; +import org.elasticsearch.xpack.watcher.notification.pagerduty.PagerDutyAccount; +import org.elasticsearch.xpack.watcher.notification.pagerduty.PagerDutyService; +import org.elasticsearch.xpack.watcher.notification.slack.SlackService; import javax.security.auth.DestroyFailedException; import java.io.IOException; @@ -316,14 +324,24 @@ public class XPackPlugin extends Plugin implements ScriptPlugin, ActionPlugin, I components.addAll(watcher.createComponents(getClock(), scriptService, internalClient, licenseState, httpClient, httpTemplateParser, threadPool, clusterService, cryptoService, xContentRegistry, components)); + PersistentTasksService persistentTasksService = new PersistentTasksService(settings, clusterService, threadPool, internalClient); - components.addAll(machineLearning.createComponents(internalClient, clusterService, threadPool, xContentRegistry)); + components.addAll(machineLearning.createComponents(internalClient, clusterService, threadPool, xContentRegistry, + persistentTasksService)); + List> tasksExecutors = new ArrayList<>(); + tasksExecutors.addAll(machineLearning.createPersistentTasksExecutors(clusterService)); components.addAll(logstash.createComponents(internalClient, clusterService)); components.addAll(upgrade.createComponents(client, clusterService, threadPool, resourceWatcherService, scriptService, xContentRegistry)); + PersistentTasksExecutorRegistry registry = new PersistentTasksExecutorRegistry(settings, tasksExecutors); + PersistentTasksClusterService persistentTasksClusterService = new PersistentTasksClusterService(settings, registry, clusterService); + components.add(persistentTasksClusterService); + components.add(persistentTasksService); + components.add(registry); + // just create the reloader as it will pull all of the loaded ssl configurations and start watching them new SSLConfigurationReloader(settings, env, sslService, resourceWatcherService); return components; @@ -443,6 +461,10 @@ public class XPackPlugin extends Plugin implements ScriptPlugin, ActionPlugin, I List> actions = new ArrayList<>(); actions.add(new ActionHandler<>(XPackInfoAction.INSTANCE, TransportXPackInfoAction.class)); actions.add(new ActionHandler<>(XPackUsageAction.INSTANCE, TransportXPackUsageAction.class)); + actions.add(new ActionHandler<>(StartPersistentTaskAction.INSTANCE, StartPersistentTaskAction.TransportAction.class)); + actions.add(new ActionHandler<>(UpdatePersistentTaskStatusAction.INSTANCE, UpdatePersistentTaskStatusAction.TransportAction.class)); + actions.add(new ActionHandler<>(RemovePersistentTaskAction.INSTANCE, RemovePersistentTaskAction.TransportAction.class)); + actions.add(new ActionHandler<>(CompletionPersistentTaskAction.INSTANCE, CompletionPersistentTaskAction.TransportAction.class)); actions.addAll(licensing.getActions()); actions.addAll(monitoring.getActions()); actions.addAll(security.getActions()); diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java index b83821313ed..dff6191a122 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java @@ -5,6 +5,7 @@ */ package org.elasticsearch.xpack.ml; +import org.apache.lucene.util.SetOnce; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionResponse; @@ -128,16 +129,11 @@ import org.elasticsearch.xpack.ml.rest.results.RestGetOverallBucketsAction; import org.elasticsearch.xpack.ml.rest.results.RestGetRecordsAction; import org.elasticsearch.xpack.ml.rest.validate.RestValidateDetectorAction; import org.elasticsearch.xpack.ml.rest.validate.RestValidateJobConfigAction; -import org.elasticsearch.xpack.persistent.CompletionPersistentTaskAction; import org.elasticsearch.xpack.persistent.PersistentTaskParams; -import org.elasticsearch.xpack.persistent.PersistentTasksClusterService; import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData; -import org.elasticsearch.xpack.persistent.PersistentTasksExecutorRegistry; +import org.elasticsearch.xpack.persistent.PersistentTasksExecutor; import org.elasticsearch.xpack.persistent.PersistentTasksNodeService; import org.elasticsearch.xpack.persistent.PersistentTasksService; -import org.elasticsearch.xpack.persistent.RemovePersistentTaskAction; -import org.elasticsearch.xpack.persistent.StartPersistentTaskAction; -import org.elasticsearch.xpack.persistent.UpdatePersistentTaskStatusAction; import org.elasticsearch.xpack.security.InternalClient; import java.io.IOException; @@ -178,6 +174,9 @@ public class MachineLearning implements ActionPlugin { private final boolean tribeNode; private final boolean tribeNodeClient; + private final SetOnce autodetectProcessManager = new SetOnce<>(); + private final SetOnce datafeedManager = new SetOnce<>(); + public MachineLearning(Settings settings, Environment env, XPackLicenseState licenseState) { this.settings = settings; this.env = env; @@ -296,7 +295,7 @@ public class MachineLearning implements ActionPlugin { } public Collection createComponents(InternalClient internalClient, ClusterService clusterService, ThreadPool threadPool, - NamedXContentRegistry xContentRegistry) { + NamedXContentRegistry xContentRegistry, PersistentTasksService persistentTasksService) { if (enabled == false || transportClientMode || tribeNode || tribeNodeClient) { return emptyList(); } @@ -337,17 +336,14 @@ public class MachineLearning implements ActionPlugin { AutodetectProcessManager autodetectProcessManager = new AutodetectProcessManager(settings, internalClient, threadPool, jobManager, jobProvider, jobResultsPersister, jobDataCountsPersister, autodetectProcessFactory, normalizerFactory, xContentRegistry, auditor); - PersistentTasksService persistentTasksService = new PersistentTasksService(settings, clusterService, threadPool, internalClient); + this.autodetectProcessManager.set(autodetectProcessManager); DatafeedJobBuilder datafeedJobBuilder = new DatafeedJobBuilder(internalClient, jobProvider, auditor, System::currentTimeMillis); DatafeedManager datafeedManager = new DatafeedManager(threadPool, internalClient, clusterService, datafeedJobBuilder, System::currentTimeMillis, auditor, persistentTasksService); + this.datafeedManager.set(datafeedManager); MlLifeCycleService mlLifeCycleService = new MlLifeCycleService(env, clusterService, datafeedManager, autodetectProcessManager); InvalidLicenseEnforcer invalidLicenseEnforcer = new InvalidLicenseEnforcer(settings, licenseState, threadPool, datafeedManager, autodetectProcessManager); - PersistentTasksExecutorRegistry persistentTasksExecutorRegistry = new PersistentTasksExecutorRegistry(Settings.EMPTY, Arrays.asList( - new OpenJobAction.OpenJobPersistentTasksExecutor(settings, clusterService, autodetectProcessManager), - new StartDatafeedAction.StartDatafeedPersistentTasksExecutor(settings, datafeedManager) - )); return Arrays.asList( mlLifeCycleService, @@ -358,15 +354,23 @@ public class MachineLearning implements ActionPlugin { new MlInitializationService(settings, threadPool, clusterService, internalClient), jobDataCountsPersister, datafeedManager, - persistentTasksService, - persistentTasksExecutorRegistry, - new PersistentTasksClusterService(Settings.EMPTY, persistentTasksExecutorRegistry, clusterService), auditor, invalidLicenseEnforcer, new MlAssignmentNotifier(settings, auditor, clusterService) ); } + public List> createPersistentTasksExecutors(ClusterService clusterService) { + if (enabled == false || transportClientMode || tribeNode || tribeNodeClient) { + return emptyList(); + } + + return Arrays.asList( + new OpenJobAction.OpenJobPersistentTasksExecutor(settings, clusterService, autodetectProcessManager.get()), + new StartDatafeedAction.StartDatafeedPersistentTasksExecutor(settings, datafeedManager.get()) + ); + } + public Collection nodeModules() { List modules = new ArrayList<>(); @@ -466,10 +470,6 @@ public class MachineLearning implements ActionPlugin { new ActionHandler<>(StopDatafeedAction.INSTANCE, StopDatafeedAction.TransportAction.class), new ActionHandler<>(IsolateDatafeedAction.INSTANCE, IsolateDatafeedAction.TransportAction.class), new ActionHandler<>(DeleteModelSnapshotAction.INSTANCE, DeleteModelSnapshotAction.TransportAction.class), - new ActionHandler<>(StartPersistentTaskAction.INSTANCE, StartPersistentTaskAction.TransportAction.class), - new ActionHandler<>(UpdatePersistentTaskStatusAction.INSTANCE, UpdatePersistentTaskStatusAction.TransportAction.class), - new ActionHandler<>(CompletionPersistentTaskAction.INSTANCE, CompletionPersistentTaskAction.TransportAction.class), - new ActionHandler<>(RemovePersistentTaskAction.INSTANCE, RemovePersistentTaskAction.TransportAction.class), new ActionHandler<>(UpdateProcessAction.INSTANCE, UpdateProcessAction.TransportAction.class), new ActionHandler<>(DeleteExpiredDataAction.INSTANCE, DeleteExpiredDataAction.TransportAction.class), new ActionHandler<>(ForecastJobAction.INSTANCE, ForecastJobAction.TransportAction.class)