From 8521b2d11ea8713411090f8d317ee77840b0b2b8 Mon Sep 17 00:00:00 2001 From: Jay Modi Date: Wed, 22 Nov 2017 08:35:18 -0700 Subject: [PATCH] Remove InternalClient and InternalSecurityClient (#3054) This change removes the InternalClient and the InternalSecurityClient. These are replaced with usage of the ThreadContext and a transient value, `action.origin`, to indicate which component the request came from. The security code has been updated to look for this value and ensure the request is executed as the proper user. This work comes from #2808 where @s1monw suggested that we do this. While working on this, I came across index template registries and rather than updating them to use the new method, I replaced the ML one with the template upgrade framework so that we could remove this template registry. The watcher template registry is still needed as the template must be updated for rolling upgrades to work (see #2950). --- .../persistent/PersistentTasksService.java | 26 +++++++++++-------- .../persistent/TestPersistentTasksPlugin.java | 4 +-- 2 files changed, 16 insertions(+), 14 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksService.java b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksService.java index 220f55aff33..4958e618340 100644 --- a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksService.java +++ b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksService.java @@ -21,6 +21,7 @@ package org.elasticsearch.persistent; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest; import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksResponse; +import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateObserver; import org.elasticsearch.cluster.node.DiscoveryNode; @@ -34,20 +35,22 @@ import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask; -import org.elasticsearch.security.InternalClient; import java.util.function.Predicate; +import static org.elasticsearch.ClientHelper.PERSISTENT_TASK_ORIGIN; +import static org.elasticsearch.ClientHelper.executeAsyncWithOrigin; + /** * This service is used by persistent actions to propagate changes in the action state and notify about completion */ public class PersistentTasksService extends AbstractComponent { - private final InternalClient client; + private final Client client; private final ClusterService clusterService; private final ThreadPool threadPool; - public PersistentTasksService(Settings settings, ClusterService clusterService, ThreadPool threadPool, InternalClient client) { + public PersistentTasksService(Settings settings, ClusterService clusterService, ThreadPool threadPool, Client client) { super(settings); this.client = client; this.clusterService = clusterService; @@ -63,8 +66,8 @@ public class PersistentTasksService extends AbstractComponent { StartPersistentTaskAction.Request createPersistentActionRequest = new StartPersistentTaskAction.Request(taskId, taskName, params); try { - client.execute(StartPersistentTaskAction.INSTANCE, createPersistentActionRequest, ActionListener.wrap( - o -> listener.onResponse((PersistentTask) o.getTask()), listener::onFailure)); + executeAsyncWithOrigin(client, PERSISTENT_TASK_ORIGIN, StartPersistentTaskAction.INSTANCE, createPersistentActionRequest, + ActionListener.wrap(o -> listener.onResponse((PersistentTask) o.getTask()), listener::onFailure)); } catch (Exception e) { listener.onFailure(e); } @@ -77,7 +80,7 @@ public class PersistentTasksService extends AbstractComponent { ActionListener> listener) { CompletionPersistentTaskAction.Request restartRequest = new CompletionPersistentTaskAction.Request(taskId, allocationId, failure); try { - client.execute(CompletionPersistentTaskAction.INSTANCE, restartRequest, + executeAsyncWithOrigin(client, PERSISTENT_TASK_ORIGIN, CompletionPersistentTaskAction.INSTANCE, restartRequest, ActionListener.wrap(o -> listener.onResponse(o.getTask()), listener::onFailure)); } catch (Exception e) { listener.onFailure(e); @@ -93,7 +96,8 @@ public class PersistentTasksService extends AbstractComponent { cancelTasksRequest.setTaskId(new TaskId(localNode.getId(), taskId)); cancelTasksRequest.setReason("persistent action was removed"); try { - client.admin().cluster().cancelTasks(cancelTasksRequest, listener); + executeAsyncWithOrigin(client.threadPool().getThreadContext(), PERSISTENT_TASK_ORIGIN, cancelTasksRequest, listener, + client.admin().cluster()::cancelTasks); } catch (Exception e) { listener.onFailure(e); } @@ -109,8 +113,8 @@ public class PersistentTasksService extends AbstractComponent { UpdatePersistentTaskStatusAction.Request updateStatusRequest = new UpdatePersistentTaskStatusAction.Request(taskId, allocationId, status); try { - client.execute(UpdatePersistentTaskStatusAction.INSTANCE, updateStatusRequest, ActionListener.wrap( - o -> listener.onResponse(o.getTask()), listener::onFailure)); + executeAsyncWithOrigin(client, PERSISTENT_TASK_ORIGIN, UpdatePersistentTaskStatusAction.INSTANCE, updateStatusRequest, + ActionListener.wrap(o -> listener.onResponse(o.getTask()), listener::onFailure)); } catch (Exception e) { listener.onFailure(e); } @@ -122,8 +126,8 @@ public class PersistentTasksService extends AbstractComponent { public void cancelPersistentTask(String taskId, ActionListener> listener) { RemovePersistentTaskAction.Request removeRequest = new RemovePersistentTaskAction.Request(taskId); try { - client.execute(RemovePersistentTaskAction.INSTANCE, removeRequest, ActionListener.wrap(o -> listener.onResponse(o.getTask()), - listener::onFailure)); + executeAsyncWithOrigin(client, PERSISTENT_TASK_ORIGIN, RemovePersistentTaskAction.INSTANCE, removeRequest, + ActionListener.wrap(o -> listener.onResponse(o.getTask()), listener::onFailure)); } catch (Exception e) { listener.onFailure(e); } diff --git a/server/src/test/java/org/elasticsearch/persistent/TestPersistentTasksPlugin.java b/server/src/test/java/org/elasticsearch/persistent/TestPersistentTasksPlugin.java index b024359e209..5204a64eacb 100644 --- a/server/src/test/java/org/elasticsearch/persistent/TestPersistentTasksPlugin.java +++ b/server/src/test/java/org/elasticsearch/persistent/TestPersistentTasksPlugin.java @@ -64,7 +64,6 @@ import org.elasticsearch.transport.TransportService; import org.elasticsearch.watcher.ResourceWatcherService; import org.elasticsearch.persistent.PersistentTasksCustomMetaData.Assignment; import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask; -import org.elasticsearch.security.InternalClient; import java.io.IOException; import java.util.ArrayList; @@ -105,8 +104,7 @@ public class TestPersistentTasksPlugin extends Plugin implements ActionPlugin { ResourceWatcherService resourceWatcherService, ScriptService scriptService, NamedXContentRegistry xContentRegistry, Environment environment, NodeEnvironment nodeEnvironment, NamedWriteableRegistry namedWriteableRegistry) { - InternalClient internalClient = new InternalClient(Settings.EMPTY, threadPool, client); - PersistentTasksService persistentTasksService = new PersistentTasksService(Settings.EMPTY, clusterService, threadPool, internalClient); + PersistentTasksService persistentTasksService = new PersistentTasksService(Settings.EMPTY, clusterService, threadPool, client); TestPersistentTasksExecutor testPersistentAction = new TestPersistentTasksExecutor(Settings.EMPTY, clusterService); PersistentTasksExecutorRegistry persistentTasksExecutorRegistry = new PersistentTasksExecutorRegistry(Settings.EMPTY, Collections.singletonList(testPersistentAction));