From 592eedbf495e95b4728b370b705a2409b41af6b9 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Thu, 25 Jan 2018 21:40:47 +0100 Subject: [PATCH] Make persistent tasks work. Made persistent tasks executors pluggable. --- .../elasticsearch/action/ActionModule.java | 11 +++++- .../java/org/elasticsearch/node/Node.java | 19 +++++++++ .../plugins/PersistentTaskPlugin.java | 39 +++++++++++++++++++ .../persistent/TestPersistentTasksPlugin.java | 26 +++---------- 4 files changed, 73 insertions(+), 22 deletions(-) create mode 100644 server/src/main/java/org/elasticsearch/plugins/PersistentTaskPlugin.java diff --git a/server/src/main/java/org/elasticsearch/action/ActionModule.java b/server/src/main/java/org/elasticsearch/action/ActionModule.java index 872c217f980..51abf6b0222 100644 --- a/server/src/main/java/org/elasticsearch/action/ActionModule.java +++ b/server/src/main/java/org/elasticsearch/action/ActionModule.java @@ -312,9 +312,12 @@ import org.elasticsearch.rest.action.search.RestExplainAction; import org.elasticsearch.rest.action.search.RestMultiSearchAction; import org.elasticsearch.rest.action.search.RestSearchAction; import org.elasticsearch.rest.action.search.RestSearchScrollAction; -import org.elasticsearch.tasks.TaskManager; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.usage.UsageService; +import org.elasticsearch.persistent.CompletionPersistentTaskAction; +import org.elasticsearch.persistent.RemovePersistentTaskAction; +import org.elasticsearch.persistent.StartPersistentTaskAction; +import org.elasticsearch.persistent.UpdatePersistentTaskStatusAction; import java.util.ArrayList; import java.util.Collections; @@ -507,6 +510,12 @@ public class ActionModule extends AbstractModule { actionPlugins.stream().flatMap(p -> p.getActions().stream()).forEach(actions::register); + // Persistent tasks: + actions.register(StartPersistentTaskAction.INSTANCE, StartPersistentTaskAction.TransportAction.class); + actions.register(UpdatePersistentTaskStatusAction.INSTANCE, UpdatePersistentTaskStatusAction.TransportAction.class); + actions.register(CompletionPersistentTaskAction.INSTANCE, CompletionPersistentTaskAction.TransportAction.class); + actions.register(RemovePersistentTaskAction.INSTANCE, RemovePersistentTaskAction.TransportAction.class); + return unmodifiableMap(actions.getRegistry()); } diff --git a/server/src/main/java/org/elasticsearch/node/Node.java b/server/src/main/java/org/elasticsearch/node/Node.java index d0470ea2c42..15c0428d259 100644 --- a/server/src/main/java/org/elasticsearch/node/Node.java +++ b/server/src/main/java/org/elasticsearch/node/Node.java @@ -117,6 +117,7 @@ import org.elasticsearch.plugins.IngestPlugin; import org.elasticsearch.plugins.MapperPlugin; import org.elasticsearch.plugins.MetaDataUpgrader; import org.elasticsearch.plugins.NetworkPlugin; +import org.elasticsearch.plugins.PersistentTaskPlugin; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.plugins.PluginsService; import org.elasticsearch.plugins.RepositoryPlugin; @@ -139,6 +140,10 @@ import org.elasticsearch.transport.TransportInterceptor; import org.elasticsearch.transport.TransportService; import org.elasticsearch.usage.UsageService; import org.elasticsearch.watcher.ResourceWatcherService; +import org.elasticsearch.persistent.PersistentTasksClusterService; +import org.elasticsearch.persistent.PersistentTasksExecutor; +import org.elasticsearch.persistent.PersistentTasksExecutorRegistry; +import org.elasticsearch.persistent.PersistentTasksService; import java.io.BufferedWriter; import java.io.Closeable; @@ -461,6 +466,17 @@ public class Node implements Closeable { threadPool, scriptModule.getScriptService(), bigArrays, searchModule.getFetchPhase(), responseCollectorService); + final List> tasksExecutors = pluginsService + .filterPlugins(PersistentTaskPlugin.class).stream() + .map(p -> p.getPersistentTasksExecutor(clusterService)) + .flatMap(List::stream) + .collect(toList()); + + final PersistentTasksExecutorRegistry registry = new PersistentTasksExecutorRegistry(settings, tasksExecutors); + final PersistentTasksClusterService persistentTasksClusterService = + new PersistentTasksClusterService(settings, registry, clusterService); + final PersistentTasksService persistentTasksService = new PersistentTasksService(settings, clusterService, threadPool, client); + modules.add(b -> { b.bind(Node.class).toInstance(this); b.bind(NodeService.class).toInstance(nodeService); @@ -504,6 +520,9 @@ public class Node implements Closeable { } httpBind.accept(b); pluginComponents.stream().forEach(p -> b.bind((Class) p.getClass()).toInstance(p)); + b.bind(PersistentTasksService.class).toInstance(persistentTasksService); + b.bind(PersistentTasksClusterService.class).toInstance(persistentTasksClusterService); + b.bind(PersistentTasksExecutorRegistry.class).toInstance(registry); } ); injector = modules.createInjector(); diff --git a/server/src/main/java/org/elasticsearch/plugins/PersistentTaskPlugin.java b/server/src/main/java/org/elasticsearch/plugins/PersistentTaskPlugin.java new file mode 100644 index 00000000000..c402b907ffd --- /dev/null +++ b/server/src/main/java/org/elasticsearch/plugins/PersistentTaskPlugin.java @@ -0,0 +1,39 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.plugins; + +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.persistent.PersistentTasksExecutor; + +import java.util.Collections; +import java.util.List; + +/** + * Plugin for registering persistent tasks executors. + */ +public interface PersistentTaskPlugin { + + /** + * Returns additional persistent tasks executors added by this plugin. + */ + default List> getPersistentTasksExecutor(ClusterService clusterService) { + return Collections.emptyList(); + } + +} diff --git a/server/src/test/java/org/elasticsearch/persistent/TestPersistentTasksPlugin.java b/server/src/test/java/org/elasticsearch/persistent/TestPersistentTasksPlugin.java index ba8e2337fd7..ca3e840028c 100644 --- a/server/src/test/java/org/elasticsearch/persistent/TestPersistentTasksPlugin.java +++ b/server/src/test/java/org/elasticsearch/persistent/TestPersistentTasksPlugin.java @@ -54,6 +54,7 @@ import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.env.Environment; import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.plugins.ActionPlugin; +import org.elasticsearch.plugins.PersistentTaskPlugin; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.script.ScriptService; import org.elasticsearch.tasks.Task; @@ -87,33 +88,16 @@ import static org.junit.Assert.fail; /** * A plugin that adds a test persistent task. */ -public class TestPersistentTasksPlugin extends Plugin implements ActionPlugin { +public class TestPersistentTasksPlugin extends Plugin implements ActionPlugin, PersistentTaskPlugin { @Override public List> getActions() { - return Arrays.asList( - new ActionHandler<>(TestTaskAction.INSTANCE, TransportTestTaskAction.class), - new ActionHandler<>(StartPersistentTaskAction.INSTANCE, StartPersistentTaskAction.TransportAction.class), - new ActionHandler<>(UpdatePersistentTaskStatusAction.INSTANCE, UpdatePersistentTaskStatusAction.TransportAction.class), - new ActionHandler<>(CompletionPersistentTaskAction.INSTANCE, CompletionPersistentTaskAction.TransportAction.class), - new ActionHandler<>(RemovePersistentTaskAction.INSTANCE, RemovePersistentTaskAction.TransportAction.class) - ); + return Collections.singletonList(new ActionHandler<>(TestTaskAction.INSTANCE, TransportTestTaskAction.class)); } @Override - public Collection createComponents(Client client, ClusterService clusterService, ThreadPool threadPool, - ResourceWatcherService resourceWatcherService, ScriptService scriptService, - NamedXContentRegistry xContentRegistry, Environment environment, - NodeEnvironment nodeEnvironment, NamedWriteableRegistry namedWriteableRegistry) { - PersistentTasksService persistentTasksService = new PersistentTasksService(Settings.EMPTY, clusterService, threadPool, client); - TestPersistentTasksExecutor testPersistentAction = new TestPersistentTasksExecutor(Settings.EMPTY, clusterService); - PersistentTasksExecutorRegistry persistentTasksExecutorRegistry = new PersistentTasksExecutorRegistry(Settings.EMPTY, - Collections.singletonList(testPersistentAction)); - return Arrays.asList( - persistentTasksService, - persistentTasksExecutorRegistry, - new PersistentTasksClusterService(Settings.EMPTY, persistentTasksExecutorRegistry, clusterService) - ); + public List> getPersistentTasksExecutor(ClusterService clusterService) { + return Collections.singletonList(new TestPersistentTasksExecutor(Settings.EMPTY, clusterService)); } @Override