Remove InternalClient and InternalSecurityClient (#3054)
This change removes the InternalClient and the InternalSecurityClient. These are replaced with usage of the ThreadContext and a transient value, `action.origin`, to indicate which component the request came from. The security code has been updated to look for this value and ensure the request is executed as the proper user. This work comes from #2808 where @s1monw suggested that we do this. While working on this, I came across index template registries and rather than updating them to use the new method, I replaced the ML one with the template upgrade framework so that we could remove this template registry. The watcher template registry is still needed as the template must be updated for rolling upgrades to work (see #2950).
This commit is contained in:
parent
4dd69951f3
commit
8521b2d11e
|
@ -21,6 +21,7 @@ package org.elasticsearch.persistent;
|
||||||
import org.elasticsearch.action.ActionListener;
|
import org.elasticsearch.action.ActionListener;
|
||||||
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest;
|
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest;
|
||||||
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksResponse;
|
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksResponse;
|
||||||
|
import org.elasticsearch.client.Client;
|
||||||
import org.elasticsearch.cluster.ClusterState;
|
import org.elasticsearch.cluster.ClusterState;
|
||||||
import org.elasticsearch.cluster.ClusterStateObserver;
|
import org.elasticsearch.cluster.ClusterStateObserver;
|
||||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||||
|
@ -34,20 +35,22 @@ import org.elasticsearch.tasks.Task;
|
||||||
import org.elasticsearch.tasks.TaskId;
|
import org.elasticsearch.tasks.TaskId;
|
||||||
import org.elasticsearch.threadpool.ThreadPool;
|
import org.elasticsearch.threadpool.ThreadPool;
|
||||||
import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask;
|
import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask;
|
||||||
import org.elasticsearch.security.InternalClient;
|
|
||||||
|
|
||||||
import java.util.function.Predicate;
|
import java.util.function.Predicate;
|
||||||
|
|
||||||
|
import static org.elasticsearch.ClientHelper.PERSISTENT_TASK_ORIGIN;
|
||||||
|
import static org.elasticsearch.ClientHelper.executeAsyncWithOrigin;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This service is used by persistent actions to propagate changes in the action state and notify about completion
|
* This service is used by persistent actions to propagate changes in the action state and notify about completion
|
||||||
*/
|
*/
|
||||||
public class PersistentTasksService extends AbstractComponent {
|
public class PersistentTasksService extends AbstractComponent {
|
||||||
|
|
||||||
private final InternalClient client;
|
private final Client client;
|
||||||
private final ClusterService clusterService;
|
private final ClusterService clusterService;
|
||||||
private final ThreadPool threadPool;
|
private final ThreadPool threadPool;
|
||||||
|
|
||||||
public PersistentTasksService(Settings settings, ClusterService clusterService, ThreadPool threadPool, InternalClient client) {
|
public PersistentTasksService(Settings settings, ClusterService clusterService, ThreadPool threadPool, Client client) {
|
||||||
super(settings);
|
super(settings);
|
||||||
this.client = client;
|
this.client = client;
|
||||||
this.clusterService = clusterService;
|
this.clusterService = clusterService;
|
||||||
|
@ -63,8 +66,8 @@ public class PersistentTasksService extends AbstractComponent {
|
||||||
StartPersistentTaskAction.Request createPersistentActionRequest =
|
StartPersistentTaskAction.Request createPersistentActionRequest =
|
||||||
new StartPersistentTaskAction.Request(taskId, taskName, params);
|
new StartPersistentTaskAction.Request(taskId, taskName, params);
|
||||||
try {
|
try {
|
||||||
client.execute(StartPersistentTaskAction.INSTANCE, createPersistentActionRequest, ActionListener.wrap(
|
executeAsyncWithOrigin(client, PERSISTENT_TASK_ORIGIN, StartPersistentTaskAction.INSTANCE, createPersistentActionRequest,
|
||||||
o -> listener.onResponse((PersistentTask<Params>) o.getTask()), listener::onFailure));
|
ActionListener.wrap(o -> listener.onResponse((PersistentTask<Params>) o.getTask()), listener::onFailure));
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
listener.onFailure(e);
|
listener.onFailure(e);
|
||||||
}
|
}
|
||||||
|
@ -77,7 +80,7 @@ public class PersistentTasksService extends AbstractComponent {
|
||||||
ActionListener<PersistentTask<?>> listener) {
|
ActionListener<PersistentTask<?>> listener) {
|
||||||
CompletionPersistentTaskAction.Request restartRequest = new CompletionPersistentTaskAction.Request(taskId, allocationId, failure);
|
CompletionPersistentTaskAction.Request restartRequest = new CompletionPersistentTaskAction.Request(taskId, allocationId, failure);
|
||||||
try {
|
try {
|
||||||
client.execute(CompletionPersistentTaskAction.INSTANCE, restartRequest,
|
executeAsyncWithOrigin(client, PERSISTENT_TASK_ORIGIN, CompletionPersistentTaskAction.INSTANCE, restartRequest,
|
||||||
ActionListener.wrap(o -> listener.onResponse(o.getTask()), listener::onFailure));
|
ActionListener.wrap(o -> listener.onResponse(o.getTask()), listener::onFailure));
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
listener.onFailure(e);
|
listener.onFailure(e);
|
||||||
|
@ -93,7 +96,8 @@ public class PersistentTasksService extends AbstractComponent {
|
||||||
cancelTasksRequest.setTaskId(new TaskId(localNode.getId(), taskId));
|
cancelTasksRequest.setTaskId(new TaskId(localNode.getId(), taskId));
|
||||||
cancelTasksRequest.setReason("persistent action was removed");
|
cancelTasksRequest.setReason("persistent action was removed");
|
||||||
try {
|
try {
|
||||||
client.admin().cluster().cancelTasks(cancelTasksRequest, listener);
|
executeAsyncWithOrigin(client.threadPool().getThreadContext(), PERSISTENT_TASK_ORIGIN, cancelTasksRequest, listener,
|
||||||
|
client.admin().cluster()::cancelTasks);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
listener.onFailure(e);
|
listener.onFailure(e);
|
||||||
}
|
}
|
||||||
|
@ -109,8 +113,8 @@ public class PersistentTasksService extends AbstractComponent {
|
||||||
UpdatePersistentTaskStatusAction.Request updateStatusRequest =
|
UpdatePersistentTaskStatusAction.Request updateStatusRequest =
|
||||||
new UpdatePersistentTaskStatusAction.Request(taskId, allocationId, status);
|
new UpdatePersistentTaskStatusAction.Request(taskId, allocationId, status);
|
||||||
try {
|
try {
|
||||||
client.execute(UpdatePersistentTaskStatusAction.INSTANCE, updateStatusRequest, ActionListener.wrap(
|
executeAsyncWithOrigin(client, PERSISTENT_TASK_ORIGIN, UpdatePersistentTaskStatusAction.INSTANCE, updateStatusRequest,
|
||||||
o -> listener.onResponse(o.getTask()), listener::onFailure));
|
ActionListener.wrap(o -> listener.onResponse(o.getTask()), listener::onFailure));
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
listener.onFailure(e);
|
listener.onFailure(e);
|
||||||
}
|
}
|
||||||
|
@ -122,8 +126,8 @@ public class PersistentTasksService extends AbstractComponent {
|
||||||
public void cancelPersistentTask(String taskId, ActionListener<PersistentTask<?>> listener) {
|
public void cancelPersistentTask(String taskId, ActionListener<PersistentTask<?>> listener) {
|
||||||
RemovePersistentTaskAction.Request removeRequest = new RemovePersistentTaskAction.Request(taskId);
|
RemovePersistentTaskAction.Request removeRequest = new RemovePersistentTaskAction.Request(taskId);
|
||||||
try {
|
try {
|
||||||
client.execute(RemovePersistentTaskAction.INSTANCE, removeRequest, ActionListener.wrap(o -> listener.onResponse(o.getTask()),
|
executeAsyncWithOrigin(client, PERSISTENT_TASK_ORIGIN, RemovePersistentTaskAction.INSTANCE, removeRequest,
|
||||||
listener::onFailure));
|
ActionListener.wrap(o -> listener.onResponse(o.getTask()), listener::onFailure));
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
listener.onFailure(e);
|
listener.onFailure(e);
|
||||||
}
|
}
|
||||||
|
|
|
@ -64,7 +64,6 @@ import org.elasticsearch.transport.TransportService;
|
||||||
import org.elasticsearch.watcher.ResourceWatcherService;
|
import org.elasticsearch.watcher.ResourceWatcherService;
|
||||||
import org.elasticsearch.persistent.PersistentTasksCustomMetaData.Assignment;
|
import org.elasticsearch.persistent.PersistentTasksCustomMetaData.Assignment;
|
||||||
import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask;
|
import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask;
|
||||||
import org.elasticsearch.security.InternalClient;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
@ -105,8 +104,7 @@ public class TestPersistentTasksPlugin extends Plugin implements ActionPlugin {
|
||||||
ResourceWatcherService resourceWatcherService, ScriptService scriptService,
|
ResourceWatcherService resourceWatcherService, ScriptService scriptService,
|
||||||
NamedXContentRegistry xContentRegistry, Environment environment,
|
NamedXContentRegistry xContentRegistry, Environment environment,
|
||||||
NodeEnvironment nodeEnvironment, NamedWriteableRegistry namedWriteableRegistry) {
|
NodeEnvironment nodeEnvironment, NamedWriteableRegistry namedWriteableRegistry) {
|
||||||
InternalClient internalClient = new InternalClient(Settings.EMPTY, threadPool, client);
|
PersistentTasksService persistentTasksService = new PersistentTasksService(Settings.EMPTY, clusterService, threadPool, client);
|
||||||
PersistentTasksService persistentTasksService = new PersistentTasksService(Settings.EMPTY, clusterService, threadPool, internalClient);
|
|
||||||
TestPersistentTasksExecutor testPersistentAction = new TestPersistentTasksExecutor(Settings.EMPTY, clusterService);
|
TestPersistentTasksExecutor testPersistentAction = new TestPersistentTasksExecutor(Settings.EMPTY, clusterService);
|
||||||
PersistentTasksExecutorRegistry persistentTasksExecutorRegistry = new PersistentTasksExecutorRegistry(Settings.EMPTY,
|
PersistentTasksExecutorRegistry persistentTasksExecutorRegistry = new PersistentTasksExecutorRegistry(Settings.EMPTY,
|
||||||
Collections.singletonList(testPersistentAction));
|
Collections.singletonList(testPersistentAction));
|
||||||
|
|
Loading…
Reference in New Issue