Move the initialization of persistent tasks from the machine learning level to xpack level.

Today persistent tasks is only usable from machine learning, but others like ccr will need to use it too.
With this change ccr and other will be able to make use of it too.

Original commit: elastic/x-pack-elasticsearch@c90f01d5f6
This commit is contained in:
Martijn van Groningen 2017-11-14 13:50:59 +01:00
parent 245543d5cf
commit 439830890f
2 changed files with 63 additions and 41 deletions

View File

@ -62,14 +62,6 @@ import org.elasticsearch.xpack.action.TransportXPackInfoAction;
import org.elasticsearch.xpack.action.TransportXPackUsageAction; import org.elasticsearch.xpack.action.TransportXPackUsageAction;
import org.elasticsearch.xpack.action.XPackInfoAction; import org.elasticsearch.xpack.action.XPackInfoAction;
import org.elasticsearch.xpack.action.XPackUsageAction; import org.elasticsearch.xpack.action.XPackUsageAction;
import org.elasticsearch.xpack.watcher.common.http.HttpClient;
import org.elasticsearch.xpack.watcher.common.http.HttpRequestTemplate;
import org.elasticsearch.xpack.watcher.common.http.HttpSettings;
import org.elasticsearch.xpack.watcher.common.http.auth.HttpAuthFactory;
import org.elasticsearch.xpack.watcher.common.http.auth.HttpAuthRegistry;
import org.elasticsearch.xpack.watcher.common.http.auth.basic.BasicAuth;
import org.elasticsearch.xpack.watcher.common.http.auth.basic.BasicAuthFactory;
import org.elasticsearch.xpack.watcher.common.text.TextTemplateEngine;
import org.elasticsearch.xpack.deprecation.Deprecation; import org.elasticsearch.xpack.deprecation.Deprecation;
import org.elasticsearch.xpack.extensions.XPackExtension; import org.elasticsearch.xpack.extensions.XPackExtension;
import org.elasticsearch.xpack.extensions.XPackExtensionsService; import org.elasticsearch.xpack.extensions.XPackExtensionsService;
@ -81,19 +73,14 @@ import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.MachineLearningFeatureSet; import org.elasticsearch.xpack.ml.MachineLearningFeatureSet;
import org.elasticsearch.xpack.monitoring.Monitoring; import org.elasticsearch.xpack.monitoring.Monitoring;
import org.elasticsearch.xpack.monitoring.MonitoringFeatureSet; import org.elasticsearch.xpack.monitoring.MonitoringFeatureSet;
import org.elasticsearch.xpack.watcher.notification.email.Account; import org.elasticsearch.xpack.persistent.CompletionPersistentTaskAction;
import org.elasticsearch.xpack.watcher.notification.email.EmailService; import org.elasticsearch.xpack.persistent.PersistentTasksClusterService;
import org.elasticsearch.xpack.watcher.notification.email.attachment.DataAttachmentParser; import org.elasticsearch.xpack.persistent.PersistentTasksExecutor;
import org.elasticsearch.xpack.watcher.notification.email.attachment.EmailAttachmentParser; import org.elasticsearch.xpack.persistent.PersistentTasksExecutorRegistry;
import org.elasticsearch.xpack.watcher.notification.email.attachment.EmailAttachmentsParser; import org.elasticsearch.xpack.persistent.PersistentTasksService;
import org.elasticsearch.xpack.watcher.notification.email.attachment.HttpEmailAttachementParser; import org.elasticsearch.xpack.persistent.RemovePersistentTaskAction;
import org.elasticsearch.xpack.watcher.notification.email.attachment.ReportingAttachmentParser; import org.elasticsearch.xpack.persistent.StartPersistentTaskAction;
import org.elasticsearch.xpack.watcher.notification.email.support.BodyPartSource; import org.elasticsearch.xpack.persistent.UpdatePersistentTaskStatusAction;
import org.elasticsearch.xpack.watcher.notification.hipchat.HipChatService;
import org.elasticsearch.xpack.watcher.notification.jira.JiraService;
import org.elasticsearch.xpack.watcher.notification.pagerduty.PagerDutyAccount;
import org.elasticsearch.xpack.watcher.notification.pagerduty.PagerDutyService;
import org.elasticsearch.xpack.watcher.notification.slack.SlackService;
import org.elasticsearch.xpack.rest.action.RestXPackInfoAction; import org.elasticsearch.xpack.rest.action.RestXPackInfoAction;
import org.elasticsearch.xpack.rest.action.RestXPackUsageAction; import org.elasticsearch.xpack.rest.action.RestXPackUsageAction;
import org.elasticsearch.xpack.security.InternalClient; import org.elasticsearch.xpack.security.InternalClient;
@ -107,6 +94,27 @@ import org.elasticsearch.xpack.ssl.SSLService;
import org.elasticsearch.xpack.upgrade.Upgrade; import org.elasticsearch.xpack.upgrade.Upgrade;
import org.elasticsearch.xpack.watcher.Watcher; import org.elasticsearch.xpack.watcher.Watcher;
import org.elasticsearch.xpack.watcher.WatcherFeatureSet; import org.elasticsearch.xpack.watcher.WatcherFeatureSet;
import org.elasticsearch.xpack.watcher.common.http.HttpClient;
import org.elasticsearch.xpack.watcher.common.http.HttpRequestTemplate;
import org.elasticsearch.xpack.watcher.common.http.HttpSettings;
import org.elasticsearch.xpack.watcher.common.http.auth.HttpAuthFactory;
import org.elasticsearch.xpack.watcher.common.http.auth.HttpAuthRegistry;
import org.elasticsearch.xpack.watcher.common.http.auth.basic.BasicAuth;
import org.elasticsearch.xpack.watcher.common.http.auth.basic.BasicAuthFactory;
import org.elasticsearch.xpack.watcher.common.text.TextTemplateEngine;
import org.elasticsearch.xpack.watcher.notification.email.Account;
import org.elasticsearch.xpack.watcher.notification.email.EmailService;
import org.elasticsearch.xpack.watcher.notification.email.attachment.DataAttachmentParser;
import org.elasticsearch.xpack.watcher.notification.email.attachment.EmailAttachmentParser;
import org.elasticsearch.xpack.watcher.notification.email.attachment.EmailAttachmentsParser;
import org.elasticsearch.xpack.watcher.notification.email.attachment.HttpEmailAttachementParser;
import org.elasticsearch.xpack.watcher.notification.email.attachment.ReportingAttachmentParser;
import org.elasticsearch.xpack.watcher.notification.email.support.BodyPartSource;
import org.elasticsearch.xpack.watcher.notification.hipchat.HipChatService;
import org.elasticsearch.xpack.watcher.notification.jira.JiraService;
import org.elasticsearch.xpack.watcher.notification.pagerduty.PagerDutyAccount;
import org.elasticsearch.xpack.watcher.notification.pagerduty.PagerDutyService;
import org.elasticsearch.xpack.watcher.notification.slack.SlackService;
import javax.security.auth.DestroyFailedException; import javax.security.auth.DestroyFailedException;
import java.io.IOException; import java.io.IOException;
@ -316,14 +324,24 @@ public class XPackPlugin extends Plugin implements ScriptPlugin, ActionPlugin, I
components.addAll(watcher.createComponents(getClock(), scriptService, internalClient, licenseState, components.addAll(watcher.createComponents(getClock(), scriptService, internalClient, licenseState,
httpClient, httpTemplateParser, threadPool, clusterService, cryptoService, xContentRegistry, components)); httpClient, httpTemplateParser, threadPool, clusterService, cryptoService, xContentRegistry, components));
PersistentTasksService persistentTasksService = new PersistentTasksService(settings, clusterService, threadPool, internalClient);
components.addAll(machineLearning.createComponents(internalClient, clusterService, threadPool, xContentRegistry)); components.addAll(machineLearning.createComponents(internalClient, clusterService, threadPool, xContentRegistry,
persistentTasksService));
List<PersistentTasksExecutor<?>> tasksExecutors = new ArrayList<>();
tasksExecutors.addAll(machineLearning.createPersistentTasksExecutors(clusterService));
components.addAll(logstash.createComponents(internalClient, clusterService)); components.addAll(logstash.createComponents(internalClient, clusterService));
components.addAll(upgrade.createComponents(client, clusterService, threadPool, resourceWatcherService, components.addAll(upgrade.createComponents(client, clusterService, threadPool, resourceWatcherService,
scriptService, xContentRegistry)); scriptService, xContentRegistry));
PersistentTasksExecutorRegistry registry = new PersistentTasksExecutorRegistry(settings, tasksExecutors);
PersistentTasksClusterService persistentTasksClusterService = new PersistentTasksClusterService(settings, registry, clusterService);
components.add(persistentTasksClusterService);
components.add(persistentTasksService);
components.add(registry);
// just create the reloader as it will pull all of the loaded ssl configurations and start watching them // just create the reloader as it will pull all of the loaded ssl configurations and start watching them
new SSLConfigurationReloader(settings, env, sslService, resourceWatcherService); new SSLConfigurationReloader(settings, env, sslService, resourceWatcherService);
return components; return components;
@ -443,6 +461,10 @@ public class XPackPlugin extends Plugin implements ScriptPlugin, ActionPlugin, I
List<ActionHandler<? extends ActionRequest, ? extends ActionResponse>> actions = new ArrayList<>(); List<ActionHandler<? extends ActionRequest, ? extends ActionResponse>> actions = new ArrayList<>();
actions.add(new ActionHandler<>(XPackInfoAction.INSTANCE, TransportXPackInfoAction.class)); actions.add(new ActionHandler<>(XPackInfoAction.INSTANCE, TransportXPackInfoAction.class));
actions.add(new ActionHandler<>(XPackUsageAction.INSTANCE, TransportXPackUsageAction.class)); actions.add(new ActionHandler<>(XPackUsageAction.INSTANCE, TransportXPackUsageAction.class));
actions.add(new ActionHandler<>(StartPersistentTaskAction.INSTANCE, StartPersistentTaskAction.TransportAction.class));
actions.add(new ActionHandler<>(UpdatePersistentTaskStatusAction.INSTANCE, UpdatePersistentTaskStatusAction.TransportAction.class));
actions.add(new ActionHandler<>(RemovePersistentTaskAction.INSTANCE, RemovePersistentTaskAction.TransportAction.class));
actions.add(new ActionHandler<>(CompletionPersistentTaskAction.INSTANCE, CompletionPersistentTaskAction.TransportAction.class));
actions.addAll(licensing.getActions()); actions.addAll(licensing.getActions());
actions.addAll(monitoring.getActions()); actions.addAll(monitoring.getActions());
actions.addAll(security.getActions()); actions.addAll(security.getActions());

View File

@ -5,6 +5,7 @@
*/ */
package org.elasticsearch.xpack.ml; package org.elasticsearch.xpack.ml;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.ActionResponse;
@ -128,16 +129,11 @@ import org.elasticsearch.xpack.ml.rest.results.RestGetOverallBucketsAction;
import org.elasticsearch.xpack.ml.rest.results.RestGetRecordsAction; import org.elasticsearch.xpack.ml.rest.results.RestGetRecordsAction;
import org.elasticsearch.xpack.ml.rest.validate.RestValidateDetectorAction; import org.elasticsearch.xpack.ml.rest.validate.RestValidateDetectorAction;
import org.elasticsearch.xpack.ml.rest.validate.RestValidateJobConfigAction; import org.elasticsearch.xpack.ml.rest.validate.RestValidateJobConfigAction;
import org.elasticsearch.xpack.persistent.CompletionPersistentTaskAction;
import org.elasticsearch.xpack.persistent.PersistentTaskParams; import org.elasticsearch.xpack.persistent.PersistentTaskParams;
import org.elasticsearch.xpack.persistent.PersistentTasksClusterService;
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.xpack.persistent.PersistentTasksExecutorRegistry; import org.elasticsearch.xpack.persistent.PersistentTasksExecutor;
import org.elasticsearch.xpack.persistent.PersistentTasksNodeService; import org.elasticsearch.xpack.persistent.PersistentTasksNodeService;
import org.elasticsearch.xpack.persistent.PersistentTasksService; import org.elasticsearch.xpack.persistent.PersistentTasksService;
import org.elasticsearch.xpack.persistent.RemovePersistentTaskAction;
import org.elasticsearch.xpack.persistent.StartPersistentTaskAction;
import org.elasticsearch.xpack.persistent.UpdatePersistentTaskStatusAction;
import org.elasticsearch.xpack.security.InternalClient; import org.elasticsearch.xpack.security.InternalClient;
import java.io.IOException; import java.io.IOException;
@ -178,6 +174,9 @@ public class MachineLearning implements ActionPlugin {
private final boolean tribeNode; private final boolean tribeNode;
private final boolean tribeNodeClient; private final boolean tribeNodeClient;
private final SetOnce<AutodetectProcessManager> autodetectProcessManager = new SetOnce<>();
private final SetOnce<DatafeedManager> datafeedManager = new SetOnce<>();
public MachineLearning(Settings settings, Environment env, XPackLicenseState licenseState) { public MachineLearning(Settings settings, Environment env, XPackLicenseState licenseState) {
this.settings = settings; this.settings = settings;
this.env = env; this.env = env;
@ -296,7 +295,7 @@ public class MachineLearning implements ActionPlugin {
} }
public Collection<Object> createComponents(InternalClient internalClient, ClusterService clusterService, ThreadPool threadPool, public Collection<Object> createComponents(InternalClient internalClient, ClusterService clusterService, ThreadPool threadPool,
NamedXContentRegistry xContentRegistry) { NamedXContentRegistry xContentRegistry, PersistentTasksService persistentTasksService) {
if (enabled == false || transportClientMode || tribeNode || tribeNodeClient) { if (enabled == false || transportClientMode || tribeNode || tribeNodeClient) {
return emptyList(); return emptyList();
} }
@ -337,17 +336,14 @@ public class MachineLearning implements ActionPlugin {
AutodetectProcessManager autodetectProcessManager = new AutodetectProcessManager(settings, internalClient, threadPool, AutodetectProcessManager autodetectProcessManager = new AutodetectProcessManager(settings, internalClient, threadPool,
jobManager, jobProvider, jobResultsPersister, jobDataCountsPersister, autodetectProcessFactory, jobManager, jobProvider, jobResultsPersister, jobDataCountsPersister, autodetectProcessFactory,
normalizerFactory, xContentRegistry, auditor); normalizerFactory, xContentRegistry, auditor);
PersistentTasksService persistentTasksService = new PersistentTasksService(settings, clusterService, threadPool, internalClient); this.autodetectProcessManager.set(autodetectProcessManager);
DatafeedJobBuilder datafeedJobBuilder = new DatafeedJobBuilder(internalClient, jobProvider, auditor, System::currentTimeMillis); DatafeedJobBuilder datafeedJobBuilder = new DatafeedJobBuilder(internalClient, jobProvider, auditor, System::currentTimeMillis);
DatafeedManager datafeedManager = new DatafeedManager(threadPool, internalClient, clusterService, datafeedJobBuilder, DatafeedManager datafeedManager = new DatafeedManager(threadPool, internalClient, clusterService, datafeedJobBuilder,
System::currentTimeMillis, auditor, persistentTasksService); System::currentTimeMillis, auditor, persistentTasksService);
this.datafeedManager.set(datafeedManager);
MlLifeCycleService mlLifeCycleService = new MlLifeCycleService(env, clusterService, datafeedManager, autodetectProcessManager); MlLifeCycleService mlLifeCycleService = new MlLifeCycleService(env, clusterService, datafeedManager, autodetectProcessManager);
InvalidLicenseEnforcer invalidLicenseEnforcer = InvalidLicenseEnforcer invalidLicenseEnforcer =
new InvalidLicenseEnforcer(settings, licenseState, threadPool, datafeedManager, autodetectProcessManager); new InvalidLicenseEnforcer(settings, licenseState, threadPool, datafeedManager, autodetectProcessManager);
PersistentTasksExecutorRegistry persistentTasksExecutorRegistry = new PersistentTasksExecutorRegistry(Settings.EMPTY, Arrays.asList(
new OpenJobAction.OpenJobPersistentTasksExecutor(settings, clusterService, autodetectProcessManager),
new StartDatafeedAction.StartDatafeedPersistentTasksExecutor(settings, datafeedManager)
));
return Arrays.asList( return Arrays.asList(
mlLifeCycleService, mlLifeCycleService,
@ -358,15 +354,23 @@ public class MachineLearning implements ActionPlugin {
new MlInitializationService(settings, threadPool, clusterService, internalClient), new MlInitializationService(settings, threadPool, clusterService, internalClient),
jobDataCountsPersister, jobDataCountsPersister,
datafeedManager, datafeedManager,
persistentTasksService,
persistentTasksExecutorRegistry,
new PersistentTasksClusterService(Settings.EMPTY, persistentTasksExecutorRegistry, clusterService),
auditor, auditor,
invalidLicenseEnforcer, invalidLicenseEnforcer,
new MlAssignmentNotifier(settings, auditor, clusterService) new MlAssignmentNotifier(settings, auditor, clusterService)
); );
} }
public List<PersistentTasksExecutor<?>> createPersistentTasksExecutors(ClusterService clusterService) {
if (enabled == false || transportClientMode || tribeNode || tribeNodeClient) {
return emptyList();
}
return Arrays.asList(
new OpenJobAction.OpenJobPersistentTasksExecutor(settings, clusterService, autodetectProcessManager.get()),
new StartDatafeedAction.StartDatafeedPersistentTasksExecutor(settings, datafeedManager.get())
);
}
public Collection<Module> nodeModules() { public Collection<Module> nodeModules() {
List<Module> modules = new ArrayList<>(); List<Module> modules = new ArrayList<>();
@ -466,10 +470,6 @@ public class MachineLearning implements ActionPlugin {
new ActionHandler<>(StopDatafeedAction.INSTANCE, StopDatafeedAction.TransportAction.class), new ActionHandler<>(StopDatafeedAction.INSTANCE, StopDatafeedAction.TransportAction.class),
new ActionHandler<>(IsolateDatafeedAction.INSTANCE, IsolateDatafeedAction.TransportAction.class), new ActionHandler<>(IsolateDatafeedAction.INSTANCE, IsolateDatafeedAction.TransportAction.class),
new ActionHandler<>(DeleteModelSnapshotAction.INSTANCE, DeleteModelSnapshotAction.TransportAction.class), new ActionHandler<>(DeleteModelSnapshotAction.INSTANCE, DeleteModelSnapshotAction.TransportAction.class),
new ActionHandler<>(StartPersistentTaskAction.INSTANCE, StartPersistentTaskAction.TransportAction.class),
new ActionHandler<>(UpdatePersistentTaskStatusAction.INSTANCE, UpdatePersistentTaskStatusAction.TransportAction.class),
new ActionHandler<>(CompletionPersistentTaskAction.INSTANCE, CompletionPersistentTaskAction.TransportAction.class),
new ActionHandler<>(RemovePersistentTaskAction.INSTANCE, RemovePersistentTaskAction.TransportAction.class),
new ActionHandler<>(UpdateProcessAction.INSTANCE, UpdateProcessAction.TransportAction.class), new ActionHandler<>(UpdateProcessAction.INSTANCE, UpdateProcessAction.TransportAction.class),
new ActionHandler<>(DeleteExpiredDataAction.INSTANCE, DeleteExpiredDataAction.TransportAction.class), new ActionHandler<>(DeleteExpiredDataAction.INSTANCE, DeleteExpiredDataAction.TransportAction.class),
new ActionHandler<>(ForecastJobAction.INSTANCE, ForecastJobAction.TransportAction.class) new ActionHandler<>(ForecastJobAction.INSTANCE, ForecastJobAction.TransportAction.class)