Make persistent tasks work.

Made persistent tasks executors pluggable.
This commit is contained in:
Martijn van Groningen 2018-01-25 21:40:47 +01:00
parent 07e727c769
commit 592eedbf49
No known key found for this signature in database
GPG Key ID: AB236F4FCF2AF12A
4 changed files with 73 additions and 22 deletions

View File

@ -312,9 +312,12 @@ import org.elasticsearch.rest.action.search.RestExplainAction;
import org.elasticsearch.rest.action.search.RestMultiSearchAction;
import org.elasticsearch.rest.action.search.RestSearchAction;
import org.elasticsearch.rest.action.search.RestSearchScrollAction;
import org.elasticsearch.tasks.TaskManager;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.usage.UsageService;
import org.elasticsearch.persistent.CompletionPersistentTaskAction;
import org.elasticsearch.persistent.RemovePersistentTaskAction;
import org.elasticsearch.persistent.StartPersistentTaskAction;
import org.elasticsearch.persistent.UpdatePersistentTaskStatusAction;
import java.util.ArrayList;
import java.util.Collections;
@ -507,6 +510,12 @@ public class ActionModule extends AbstractModule {
actionPlugins.stream().flatMap(p -> p.getActions().stream()).forEach(actions::register);
// Persistent tasks:
actions.register(StartPersistentTaskAction.INSTANCE, StartPersistentTaskAction.TransportAction.class);
actions.register(UpdatePersistentTaskStatusAction.INSTANCE, UpdatePersistentTaskStatusAction.TransportAction.class);
actions.register(CompletionPersistentTaskAction.INSTANCE, CompletionPersistentTaskAction.TransportAction.class);
actions.register(RemovePersistentTaskAction.INSTANCE, RemovePersistentTaskAction.TransportAction.class);
return unmodifiableMap(actions.getRegistry());
}

View File

@ -117,6 +117,7 @@ import org.elasticsearch.plugins.IngestPlugin;
import org.elasticsearch.plugins.MapperPlugin;
import org.elasticsearch.plugins.MetaDataUpgrader;
import org.elasticsearch.plugins.NetworkPlugin;
import org.elasticsearch.plugins.PersistentTaskPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.PluginsService;
import org.elasticsearch.plugins.RepositoryPlugin;
@ -139,6 +140,10 @@ import org.elasticsearch.transport.TransportInterceptor;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.usage.UsageService;
import org.elasticsearch.watcher.ResourceWatcherService;
import org.elasticsearch.persistent.PersistentTasksClusterService;
import org.elasticsearch.persistent.PersistentTasksExecutor;
import org.elasticsearch.persistent.PersistentTasksExecutorRegistry;
import org.elasticsearch.persistent.PersistentTasksService;
import java.io.BufferedWriter;
import java.io.Closeable;
@ -461,6 +466,17 @@ public class Node implements Closeable {
threadPool, scriptModule.getScriptService(), bigArrays, searchModule.getFetchPhase(),
responseCollectorService);
final List<PersistentTasksExecutor<?>> tasksExecutors = pluginsService
.filterPlugins(PersistentTaskPlugin.class).stream()
.map(p -> p.getPersistentTasksExecutor(clusterService))
.flatMap(List::stream)
.collect(toList());
final PersistentTasksExecutorRegistry registry = new PersistentTasksExecutorRegistry(settings, tasksExecutors);
final PersistentTasksClusterService persistentTasksClusterService =
new PersistentTasksClusterService(settings, registry, clusterService);
final PersistentTasksService persistentTasksService = new PersistentTasksService(settings, clusterService, threadPool, client);
modules.add(b -> {
b.bind(Node.class).toInstance(this);
b.bind(NodeService.class).toInstance(nodeService);
@ -504,6 +520,9 @@ public class Node implements Closeable {
}
httpBind.accept(b);
pluginComponents.stream().forEach(p -> b.bind((Class) p.getClass()).toInstance(p));
b.bind(PersistentTasksService.class).toInstance(persistentTasksService);
b.bind(PersistentTasksClusterService.class).toInstance(persistentTasksClusterService);
b.bind(PersistentTasksExecutorRegistry.class).toInstance(registry);
}
);
injector = modules.createInjector();

View File

@ -0,0 +1,39 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.plugins;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.persistent.PersistentTasksExecutor;
import java.util.Collections;
import java.util.List;
/**
* Plugin for registering persistent tasks executors.
*/
public interface PersistentTaskPlugin {
/**
* Returns additional persistent tasks executors added by this plugin.
*/
default List<PersistentTasksExecutor<?>> getPersistentTasksExecutor(ClusterService clusterService) {
return Collections.emptyList();
}
}

View File

@ -54,6 +54,7 @@ import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.env.Environment;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.plugins.ActionPlugin;
import org.elasticsearch.plugins.PersistentTaskPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.tasks.Task;
@ -87,33 +88,16 @@ import static org.junit.Assert.fail;
/**
* A plugin that adds a test persistent task.
*/
public class TestPersistentTasksPlugin extends Plugin implements ActionPlugin {
public class TestPersistentTasksPlugin extends Plugin implements ActionPlugin, PersistentTaskPlugin {
@Override
public List<ActionHandler<? extends ActionRequest, ? extends ActionResponse>> getActions() {
return Arrays.asList(
new ActionHandler<>(TestTaskAction.INSTANCE, TransportTestTaskAction.class),
new ActionHandler<>(StartPersistentTaskAction.INSTANCE, StartPersistentTaskAction.TransportAction.class),
new ActionHandler<>(UpdatePersistentTaskStatusAction.INSTANCE, UpdatePersistentTaskStatusAction.TransportAction.class),
new ActionHandler<>(CompletionPersistentTaskAction.INSTANCE, CompletionPersistentTaskAction.TransportAction.class),
new ActionHandler<>(RemovePersistentTaskAction.INSTANCE, RemovePersistentTaskAction.TransportAction.class)
);
return Collections.singletonList(new ActionHandler<>(TestTaskAction.INSTANCE, TransportTestTaskAction.class));
}
@Override
public Collection<Object> createComponents(Client client, ClusterService clusterService, ThreadPool threadPool,
ResourceWatcherService resourceWatcherService, ScriptService scriptService,
NamedXContentRegistry xContentRegistry, Environment environment,
NodeEnvironment nodeEnvironment, NamedWriteableRegistry namedWriteableRegistry) {
PersistentTasksService persistentTasksService = new PersistentTasksService(Settings.EMPTY, clusterService, threadPool, client);
TestPersistentTasksExecutor testPersistentAction = new TestPersistentTasksExecutor(Settings.EMPTY, clusterService);
PersistentTasksExecutorRegistry persistentTasksExecutorRegistry = new PersistentTasksExecutorRegistry(Settings.EMPTY,
Collections.singletonList(testPersistentAction));
return Arrays.asList(
persistentTasksService,
persistentTasksExecutorRegistry,
new PersistentTasksClusterService(Settings.EMPTY, persistentTasksExecutorRegistry, clusterService)
);
public List<PersistentTasksExecutor<?>> getPersistentTasksExecutor(ClusterService clusterService) {
return Collections.singletonList(new TestPersistentTasksExecutor(Settings.EMPTY, clusterService));
}
@Override