diff --git a/plugin/src/main/java/org/elasticsearch/xpack/XPackPlugin.java b/plugin/src/main/java/org/elasticsearch/xpack/XPackPlugin.java index dee875efd6d..9a23390550f 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/XPackPlugin.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/XPackPlugin.java @@ -62,14 +62,6 @@ import org.elasticsearch.xpack.action.TransportXPackInfoAction; import org.elasticsearch.xpack.action.TransportXPackUsageAction; import org.elasticsearch.xpack.action.XPackInfoAction; import org.elasticsearch.xpack.action.XPackUsageAction; -import org.elasticsearch.xpack.watcher.common.http.HttpClient; -import org.elasticsearch.xpack.watcher.common.http.HttpRequestTemplate; -import org.elasticsearch.xpack.watcher.common.http.HttpSettings; -import org.elasticsearch.xpack.watcher.common.http.auth.HttpAuthFactory; -import org.elasticsearch.xpack.watcher.common.http.auth.HttpAuthRegistry; -import org.elasticsearch.xpack.watcher.common.http.auth.basic.BasicAuth; -import org.elasticsearch.xpack.watcher.common.http.auth.basic.BasicAuthFactory; -import org.elasticsearch.xpack.watcher.common.text.TextTemplateEngine; import org.elasticsearch.xpack.deprecation.Deprecation; import org.elasticsearch.xpack.extensions.XPackExtension; import org.elasticsearch.xpack.extensions.XPackExtensionsService; @@ -81,19 +73,14 @@ import org.elasticsearch.xpack.ml.MachineLearning; import org.elasticsearch.xpack.ml.MachineLearningFeatureSet; import org.elasticsearch.xpack.monitoring.Monitoring; import org.elasticsearch.xpack.monitoring.MonitoringFeatureSet; -import org.elasticsearch.xpack.watcher.notification.email.Account; -import org.elasticsearch.xpack.watcher.notification.email.EmailService; -import org.elasticsearch.xpack.watcher.notification.email.attachment.DataAttachmentParser; -import org.elasticsearch.xpack.watcher.notification.email.attachment.EmailAttachmentParser; -import org.elasticsearch.xpack.watcher.notification.email.attachment.EmailAttachmentsParser; -import org.elasticsearch.xpack.watcher.notification.email.attachment.HttpEmailAttachementParser; -import org.elasticsearch.xpack.watcher.notification.email.attachment.ReportingAttachmentParser; -import org.elasticsearch.xpack.watcher.notification.email.support.BodyPartSource; -import org.elasticsearch.xpack.watcher.notification.hipchat.HipChatService; -import org.elasticsearch.xpack.watcher.notification.jira.JiraService; -import org.elasticsearch.xpack.watcher.notification.pagerduty.PagerDutyAccount; -import org.elasticsearch.xpack.watcher.notification.pagerduty.PagerDutyService; -import org.elasticsearch.xpack.watcher.notification.slack.SlackService; +import org.elasticsearch.xpack.persistent.CompletionPersistentTaskAction; +import org.elasticsearch.xpack.persistent.PersistentTasksClusterService; +import org.elasticsearch.xpack.persistent.PersistentTasksExecutor; +import org.elasticsearch.xpack.persistent.PersistentTasksExecutorRegistry; +import org.elasticsearch.xpack.persistent.PersistentTasksService; +import org.elasticsearch.xpack.persistent.RemovePersistentTaskAction; +import org.elasticsearch.xpack.persistent.StartPersistentTaskAction; +import org.elasticsearch.xpack.persistent.UpdatePersistentTaskStatusAction; import org.elasticsearch.xpack.rest.action.RestXPackInfoAction; import org.elasticsearch.xpack.rest.action.RestXPackUsageAction; import org.elasticsearch.xpack.security.InternalClient; @@ -107,6 +94,27 @@ import org.elasticsearch.xpack.ssl.SSLService; import org.elasticsearch.xpack.upgrade.Upgrade; import org.elasticsearch.xpack.watcher.Watcher; import org.elasticsearch.xpack.watcher.WatcherFeatureSet; +import org.elasticsearch.xpack.watcher.common.http.HttpClient; +import org.elasticsearch.xpack.watcher.common.http.HttpRequestTemplate; +import org.elasticsearch.xpack.watcher.common.http.HttpSettings; +import org.elasticsearch.xpack.watcher.common.http.auth.HttpAuthFactory; +import org.elasticsearch.xpack.watcher.common.http.auth.HttpAuthRegistry; +import org.elasticsearch.xpack.watcher.common.http.auth.basic.BasicAuth; +import org.elasticsearch.xpack.watcher.common.http.auth.basic.BasicAuthFactory; +import org.elasticsearch.xpack.watcher.common.text.TextTemplateEngine; +import org.elasticsearch.xpack.watcher.notification.email.Account; +import org.elasticsearch.xpack.watcher.notification.email.EmailService; +import org.elasticsearch.xpack.watcher.notification.email.attachment.DataAttachmentParser; +import org.elasticsearch.xpack.watcher.notification.email.attachment.EmailAttachmentParser; +import org.elasticsearch.xpack.watcher.notification.email.attachment.EmailAttachmentsParser; +import org.elasticsearch.xpack.watcher.notification.email.attachment.HttpEmailAttachementParser; +import org.elasticsearch.xpack.watcher.notification.email.attachment.ReportingAttachmentParser; +import org.elasticsearch.xpack.watcher.notification.email.support.BodyPartSource; +import org.elasticsearch.xpack.watcher.notification.hipchat.HipChatService; +import org.elasticsearch.xpack.watcher.notification.jira.JiraService; +import org.elasticsearch.xpack.watcher.notification.pagerduty.PagerDutyAccount; +import org.elasticsearch.xpack.watcher.notification.pagerduty.PagerDutyService; +import org.elasticsearch.xpack.watcher.notification.slack.SlackService; import javax.security.auth.DestroyFailedException; import java.io.IOException; @@ -316,14 +324,24 @@ public class XPackPlugin extends Plugin implements ScriptPlugin, ActionPlugin, I components.addAll(watcher.createComponents(getClock(), scriptService, internalClient, licenseState, httpClient, httpTemplateParser, threadPool, clusterService, cryptoService, xContentRegistry, components)); + PersistentTasksService persistentTasksService = new PersistentTasksService(settings, clusterService, threadPool, internalClient); - components.addAll(machineLearning.createComponents(internalClient, clusterService, threadPool, xContentRegistry)); + components.addAll(machineLearning.createComponents(internalClient, clusterService, threadPool, xContentRegistry, + persistentTasksService)); + List> tasksExecutors = new ArrayList<>(); + tasksExecutors.addAll(machineLearning.createPersistentTasksExecutors(clusterService)); components.addAll(logstash.createComponents(internalClient, clusterService)); components.addAll(upgrade.createComponents(client, clusterService, threadPool, resourceWatcherService, scriptService, xContentRegistry)); + PersistentTasksExecutorRegistry registry = new PersistentTasksExecutorRegistry(settings, tasksExecutors); + PersistentTasksClusterService persistentTasksClusterService = new PersistentTasksClusterService(settings, registry, clusterService); + components.add(persistentTasksClusterService); + components.add(persistentTasksService); + components.add(registry); + // just create the reloader as it will pull all of the loaded ssl configurations and start watching them new SSLConfigurationReloader(settings, env, sslService, resourceWatcherService); return components; @@ -443,6 +461,10 @@ public class XPackPlugin extends Plugin implements ScriptPlugin, ActionPlugin, I List> actions = new ArrayList<>(); actions.add(new ActionHandler<>(XPackInfoAction.INSTANCE, TransportXPackInfoAction.class)); actions.add(new ActionHandler<>(XPackUsageAction.INSTANCE, TransportXPackUsageAction.class)); + actions.add(new ActionHandler<>(StartPersistentTaskAction.INSTANCE, StartPersistentTaskAction.TransportAction.class)); + actions.add(new ActionHandler<>(UpdatePersistentTaskStatusAction.INSTANCE, UpdatePersistentTaskStatusAction.TransportAction.class)); + actions.add(new ActionHandler<>(RemovePersistentTaskAction.INSTANCE, RemovePersistentTaskAction.TransportAction.class)); + actions.add(new ActionHandler<>(CompletionPersistentTaskAction.INSTANCE, CompletionPersistentTaskAction.TransportAction.class)); actions.addAll(licensing.getActions()); actions.addAll(monitoring.getActions()); actions.addAll(security.getActions()); diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java index b83821313ed..dff6191a122 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java @@ -5,6 +5,7 @@ */ package org.elasticsearch.xpack.ml; +import org.apache.lucene.util.SetOnce; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionResponse; @@ -128,16 +129,11 @@ import org.elasticsearch.xpack.ml.rest.results.RestGetOverallBucketsAction; import org.elasticsearch.xpack.ml.rest.results.RestGetRecordsAction; import org.elasticsearch.xpack.ml.rest.validate.RestValidateDetectorAction; import org.elasticsearch.xpack.ml.rest.validate.RestValidateJobConfigAction; -import org.elasticsearch.xpack.persistent.CompletionPersistentTaskAction; import org.elasticsearch.xpack.persistent.PersistentTaskParams; -import org.elasticsearch.xpack.persistent.PersistentTasksClusterService; import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData; -import org.elasticsearch.xpack.persistent.PersistentTasksExecutorRegistry; +import org.elasticsearch.xpack.persistent.PersistentTasksExecutor; import org.elasticsearch.xpack.persistent.PersistentTasksNodeService; import org.elasticsearch.xpack.persistent.PersistentTasksService; -import org.elasticsearch.xpack.persistent.RemovePersistentTaskAction; -import org.elasticsearch.xpack.persistent.StartPersistentTaskAction; -import org.elasticsearch.xpack.persistent.UpdatePersistentTaskStatusAction; import org.elasticsearch.xpack.security.InternalClient; import java.io.IOException; @@ -178,6 +174,9 @@ public class MachineLearning implements ActionPlugin { private final boolean tribeNode; private final boolean tribeNodeClient; + private final SetOnce autodetectProcessManager = new SetOnce<>(); + private final SetOnce datafeedManager = new SetOnce<>(); + public MachineLearning(Settings settings, Environment env, XPackLicenseState licenseState) { this.settings = settings; this.env = env; @@ -296,7 +295,7 @@ public class MachineLearning implements ActionPlugin { } public Collection createComponents(InternalClient internalClient, ClusterService clusterService, ThreadPool threadPool, - NamedXContentRegistry xContentRegistry) { + NamedXContentRegistry xContentRegistry, PersistentTasksService persistentTasksService) { if (enabled == false || transportClientMode || tribeNode || tribeNodeClient) { return emptyList(); } @@ -337,17 +336,14 @@ public class MachineLearning implements ActionPlugin { AutodetectProcessManager autodetectProcessManager = new AutodetectProcessManager(settings, internalClient, threadPool, jobManager, jobProvider, jobResultsPersister, jobDataCountsPersister, autodetectProcessFactory, normalizerFactory, xContentRegistry, auditor); - PersistentTasksService persistentTasksService = new PersistentTasksService(settings, clusterService, threadPool, internalClient); + this.autodetectProcessManager.set(autodetectProcessManager); DatafeedJobBuilder datafeedJobBuilder = new DatafeedJobBuilder(internalClient, jobProvider, auditor, System::currentTimeMillis); DatafeedManager datafeedManager = new DatafeedManager(threadPool, internalClient, clusterService, datafeedJobBuilder, System::currentTimeMillis, auditor, persistentTasksService); + this.datafeedManager.set(datafeedManager); MlLifeCycleService mlLifeCycleService = new MlLifeCycleService(env, clusterService, datafeedManager, autodetectProcessManager); InvalidLicenseEnforcer invalidLicenseEnforcer = new InvalidLicenseEnforcer(settings, licenseState, threadPool, datafeedManager, autodetectProcessManager); - PersistentTasksExecutorRegistry persistentTasksExecutorRegistry = new PersistentTasksExecutorRegistry(Settings.EMPTY, Arrays.asList( - new OpenJobAction.OpenJobPersistentTasksExecutor(settings, clusterService, autodetectProcessManager), - new StartDatafeedAction.StartDatafeedPersistentTasksExecutor(settings, datafeedManager) - )); return Arrays.asList( mlLifeCycleService, @@ -358,15 +354,23 @@ public class MachineLearning implements ActionPlugin { new MlInitializationService(settings, threadPool, clusterService, internalClient), jobDataCountsPersister, datafeedManager, - persistentTasksService, - persistentTasksExecutorRegistry, - new PersistentTasksClusterService(Settings.EMPTY, persistentTasksExecutorRegistry, clusterService), auditor, invalidLicenseEnforcer, new MlAssignmentNotifier(settings, auditor, clusterService) ); } + public List> createPersistentTasksExecutors(ClusterService clusterService) { + if (enabled == false || transportClientMode || tribeNode || tribeNodeClient) { + return emptyList(); + } + + return Arrays.asList( + new OpenJobAction.OpenJobPersistentTasksExecutor(settings, clusterService, autodetectProcessManager.get()), + new StartDatafeedAction.StartDatafeedPersistentTasksExecutor(settings, datafeedManager.get()) + ); + } + public Collection nodeModules() { List modules = new ArrayList<>(); @@ -466,10 +470,6 @@ public class MachineLearning implements ActionPlugin { new ActionHandler<>(StopDatafeedAction.INSTANCE, StopDatafeedAction.TransportAction.class), new ActionHandler<>(IsolateDatafeedAction.INSTANCE, IsolateDatafeedAction.TransportAction.class), new ActionHandler<>(DeleteModelSnapshotAction.INSTANCE, DeleteModelSnapshotAction.TransportAction.class), - new ActionHandler<>(StartPersistentTaskAction.INSTANCE, StartPersistentTaskAction.TransportAction.class), - new ActionHandler<>(UpdatePersistentTaskStatusAction.INSTANCE, UpdatePersistentTaskStatusAction.TransportAction.class), - new ActionHandler<>(CompletionPersistentTaskAction.INSTANCE, CompletionPersistentTaskAction.TransportAction.class), - new ActionHandler<>(RemovePersistentTaskAction.INSTANCE, RemovePersistentTaskAction.TransportAction.class), new ActionHandler<>(UpdateProcessAction.INSTANCE, UpdateProcessAction.TransportAction.class), new ActionHandler<>(DeleteExpiredDataAction.INSTANCE, DeleteExpiredDataAction.TransportAction.class), new ActionHandler<>(ForecastJobAction.INSTANCE, ForecastJobAction.TransportAction.class)