Tasks: Only require task permissions (#35667)

Right now using the `GET /_tasks/<taskid>` API and causing a task to opt
in to saving its result after being completed requires permissions on
the `.tasks` index. When we built this we thought that that was fine,
but we've since moved towards not leaking details like "persisting task
results after the task is completed is done by saving them into an index
named `.tasks`." A more modern way of doing this would be to save the
tasks into the index "under the hood" and to have APIs to manage the
saved tasks. This is the first step down that road: it drops the
requirement to have permissions to interact with the `.tasks` index when
fetching task statuses and when persisting statuses beyond the lifetime
of the task.

In particular, this moves the concept of the "origin" of an action into
a more prominent place in the Elasticsearch server. The origin of an
action is ignored by the server, but the security plugin uses the origin
to make requests on behalf of a user in such a way that the user need
not have permissions to perform these actions. It *can* be made to be
fairly precise. More specifically, we can create an internal user just
for the tasks API that just has permission to interact with the `.tasks`
index. This change doesn't do that, instead, it uses the ubiquitus
"xpack" user which has most permissions because it is simpler. Adding
the tasks user is something I'd like to get to in a follow up change.

Instead, the majority of this change is about moving the "origin"
concept from the security portion of x-pack into the server. This should
allow any code to use the origin. To keep the change managable I've also
opted to deprecate rather than remove the "origin" helpers in the
security code. Removing them is almost entirely mechanical and I'd like
to that in a follow up as well.

Relates to #35573
This commit is contained in:
Nik Everett 2018-11-28 09:28:27 -05:00 committed by GitHub
parent 51a7dc54ec
commit 0588dad80b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 421 additions and 173 deletions

View File

@ -25,6 +25,7 @@ import org.elasticsearch.action.Action;
* Action for retrieving a list of currently running tasks
*/
public class GetTaskAction extends Action<GetTaskResponse> {
public static final String TASKS_ORIGIN = "tasks";
public static final GetTaskAction INSTANCE = new GetTaskAction();
public static final String NAME = "cluster:monitor/task/get";

View File

@ -28,6 +28,7 @@ import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.OriginSettingClient;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
@ -51,6 +52,7 @@ import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import static org.elasticsearch.action.admin.cluster.node.tasks.get.GetTaskAction.TASKS_ORIGIN;
import static org.elasticsearch.action.admin.cluster.node.tasks.list.TransportListTasksAction.waitForCompletionTimeout;
/**
@ -77,7 +79,7 @@ public class TransportGetTaskAction extends HandledTransportAction<GetTaskReques
this.threadPool = threadPool;
this.clusterService = clusterService;
this.transportService = transportService;
this.client = client;
this.client = new OriginSettingClient(client, TASKS_ORIGIN);
this.xContentRegistry = xContentRegistry;
}
@ -210,6 +212,7 @@ public class TransportGetTaskAction extends HandledTransportAction<GetTaskReques
GetRequest get = new GetRequest(TaskResultsService.TASK_INDEX, TaskResultsService.TASK_TYPE,
request.getTaskId().toString());
get.setParentTask(clusterService.localNode().getId(), thisTask.getId());
client.get(get, new ActionListener<GetResponse>() {
@Override
public void onResponse(GetResponse getResponse) {

View File

@ -0,0 +1,54 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.client;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.support.ContextPreservingActionListener;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import java.util.function.Supplier;
/**
* A {@linkplain Client} that sends requests with the
* {@link ThreadContext#stashWithOrigin origin} set to a particular
* value and calls its {@linkplain ActionListener} in its original
* {@link ThreadContext}.
*/
public final class OriginSettingClient extends FilterClient {
private final String origin;
public OriginSettingClient(Client in, String origin) {
super(in);
this.origin = origin;
}
@Override
protected <Request extends ActionRequest, Response extends ActionResponse>
void doExecute(Action<Response> action, Request request, ActionListener<Response> listener) {
final Supplier<ThreadContext.StoredContext> supplier = in().threadPool().getThreadContext().newRestorableContext(false);
try (ThreadContext.StoredContext ignore = in().threadPool().getThreadContext().stashWithOrigin(origin)) {
super.doExecute(action, request, new ContextPreservingActionListener<>(supplier, listener));
}
}
}

View File

@ -22,6 +22,8 @@ import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;
import org.apache.lucene.util.CloseableThreadLocal;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.support.ContextPreservingActionListener;
import org.elasticsearch.client.OriginSettingClient;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
@ -85,6 +87,12 @@ public final class ThreadContext implements Closeable, Writeable {
public static final String PREFIX = "request.headers";
public static final Setting<Settings> DEFAULT_HEADERS_SETTING = Setting.groupSetting(PREFIX + ".", Property.NodeScope);
/**
* Name for the {@link #stashWithOrigin origin} attribute.
*/
public static final String ACTION_ORIGIN_TRANSIENT_NAME = "action.origin";
private static final Logger logger = LogManager.getLogger(ThreadContext.class);
private static final ThreadContextStruct DEFAULT_CONTEXT = new ThreadContextStruct();
private final Map<String, String> defaultHeader;
@ -119,7 +127,7 @@ public final class ThreadContext implements Closeable, Writeable {
/**
* Removes the current context and resets a default context. The removed context can be
* restored when closing the returned {@link StoredContext}
* restored by closing the returned {@link StoredContext}.
*/
public StoredContext stashContext() {
final ThreadContextStruct context = threadLocal.get();
@ -127,6 +135,31 @@ public final class ThreadContext implements Closeable, Writeable {
return () -> threadLocal.set(context);
}
/**
* Removes the current context and resets a default context marked with as
* originating from the supplied string. The removed context can be
* restored by closing the returned {@link StoredContext}. Callers should
* be careful to save the current context before calling this method and
* restore it any listeners, likely with
* {@link ContextPreservingActionListener}. Use {@link OriginSettingClient}
* which can be used to do this automatically.
* <p>
* Without security the origin is ignored, but security uses it to authorize
* actions that are made up of many sub-actions. These actions call
* {@link #stashWithOrigin} before performing on behalf of a user that
* should be allowed even if the user doesn't have permission to perform
* those actions on their own.
* <p>
* For example, a user might not have permission to GET from the tasks index
* but the tasks API will perform a get on their behalf using this method
* if it can't find the task in memory.
*/
public StoredContext stashWithOrigin(String origin) {
final ThreadContext.StoredContext storedContext = stashContext();
putTransient(ACTION_ORIGIN_TRANSIENT_NAME, origin);
return storedContext;
}
/**
* Removes the current context and resets a new context that contains a merge of the current headers and the given headers.
* The removed context can be restored when closing the returned {@link StoredContext}. The merge strategy is that headers

View File

@ -25,21 +25,19 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksResponse;
import org.elasticsearch.action.support.ContextPreservingActionListener;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.OriginSettingClient;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateObserver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.node.NodeClosedException;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.threadpool.ThreadPool;
import java.util.function.Predicate;
import java.util.function.Supplier;
/**
* This service is used by persistent tasks and allocated persistent tasks to communicate changes
@ -50,7 +48,6 @@ public class PersistentTasksService {
private static final Logger logger = LogManager.getLogger(PersistentTasksService.class);
private static final String ACTION_ORIGIN_TRANSIENT_NAME = "action.origin";
private static final String PERSISTENT_TASK_ORIGIN = "persistent_tasks";
private final Client client;
@ -58,7 +55,7 @@ public class PersistentTasksService {
private final ThreadPool threadPool;
public PersistentTasksService(ClusterService clusterService, ThreadPool threadPool, Client client) {
this.client = client;
this.client = new OriginSettingClient(client, PERSISTENT_TASK_ORIGIN);
this.clusterService = clusterService;
this.threadPool = threadPool;
}
@ -98,12 +95,7 @@ public class PersistentTasksService {
request.setTaskId(new TaskId(clusterService.localNode().getId(), taskId));
request.setReason(reason);
try {
final ThreadContext threadContext = client.threadPool().getThreadContext();
final Supplier<ThreadContext.StoredContext> supplier = threadContext.newRestorableContext(false);
try (ThreadContext.StoredContext ignore = stashWithOrigin(threadContext, PERSISTENT_TASK_ORIGIN)) {
client.admin().cluster().cancelTasks(request, new ContextPreservingActionListener<>(supplier, listener));
}
client.admin().cluster().cancelTasks(request, listener);
} catch (Exception e) {
listener.onFailure(e);
}
@ -140,14 +132,8 @@ public class PersistentTasksService {
private <Req extends ActionRequest, Resp extends PersistentTaskResponse>
void execute(final Req request, final Action<Resp> action, final ActionListener<PersistentTask<?>> listener) {
try {
final ThreadContext threadContext = client.threadPool().getThreadContext();
final Supplier<ThreadContext.StoredContext> supplier = threadContext.newRestorableContext(false);
try (ThreadContext.StoredContext ignore = stashWithOrigin(threadContext, PERSISTENT_TASK_ORIGIN)) {
client.execute(action, request,
new ContextPreservingActionListener<>(supplier,
ActionListener.wrap(r -> listener.onResponse(r.getTask()), listener::onFailure)));
}
client.execute(action, request,
ActionListener.wrap(r -> listener.onResponse(r.getTask()), listener::onFailure));
} catch (Exception e) {
listener.onFailure(e);
}
@ -233,10 +219,4 @@ public class PersistentTasksService {
onFailure(new IllegalStateException("Timed out when waiting for persistent task after " + timeout));
}
}
public static ThreadContext.StoredContext stashWithOrigin(ThreadContext threadContext, String origin) {
final ThreadContext.StoredContext storedContext = threadContext.stashContext();
threadContext.putTransient(ACTION_ORIGIN_TRANSIENT_NAME, origin);
return storedContext;
}
}

View File

@ -31,6 +31,7 @@ import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.OriginSettingClient;
import org.elasticsearch.client.Requests;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
@ -50,6 +51,8 @@ import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import static org.elasticsearch.action.admin.cluster.node.tasks.get.GetTaskAction.TASKS_ORIGIN;
/**
* Service that can store task results.
*/
@ -73,7 +76,7 @@ public class TaskResultsService {
@Inject
public TaskResultsService(Client client, ClusterService clusterService) {
this.client = client;
this.client = new OriginSettingClient(client, TASKS_ORIGIN);
this.clusterService = clusterService;
}

View File

@ -27,6 +27,7 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.TaskOperationFailure;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthAction;
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksResponse;
import org.elasticsearch.action.admin.cluster.node.tasks.get.GetTaskRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.get.GetTaskResponse;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksAction;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
@ -34,7 +35,6 @@ import org.elasticsearch.action.admin.indices.refresh.RefreshAction;
import org.elasticsearch.action.admin.indices.upgrade.post.UpgradeAction;
import org.elasticsearch.action.admin.indices.validate.query.ValidateQueryAction;
import org.elasticsearch.action.bulk.BulkAction;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexAction;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchAction;
@ -85,7 +85,6 @@ import static java.util.Collections.singleton;
import static org.elasticsearch.common.unit.TimeValue.timeValueMillis;
import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds;
import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_MAX_HEADER_SIZE;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertSearchResponse;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertThrows;
@ -725,12 +724,6 @@ public class TasksIT extends ESIntegTestCase {
}
public void testTaskStoringSuccesfulResult() throws Exception {
// Randomly create an empty index to make sure the type is created automatically
if (randomBoolean()) {
logger.info("creating an empty results index with custom settings");
assertAcked(client().admin().indices().prepareCreate(TaskResultsService.TASK_INDEX));
}
registerTaskManageListeners(TestTaskPlugin.TestTaskAction.NAME); // we need this to get task id of the process
// Start non-blocking test task
@ -743,23 +736,20 @@ public class TasksIT extends ESIntegTestCase {
TaskInfo taskInfo = events.get(0);
TaskId taskId = taskInfo.getTaskId();
GetResponse resultDoc = client()
.prepareGet(TaskResultsService.TASK_INDEX, TaskResultsService.TASK_TYPE, taskId.toString()).get();
assertTrue(resultDoc.isExists());
TaskResult taskResult = client().admin().cluster()
.getTask(new GetTaskRequest().setTaskId(taskId)).get().getTask();
assertTrue(taskResult.isCompleted());
assertNull(taskResult.getError());
Map<String, Object> source = resultDoc.getSource();
@SuppressWarnings("unchecked")
Map<String, Object> task = (Map<String, Object>) source.get("task");
assertEquals(taskInfo.getTaskId().getNodeId(), task.get("node"));
assertEquals(taskInfo.getAction(), task.get("action"));
assertEquals(Long.toString(taskInfo.getId()), task.get("id").toString());
@SuppressWarnings("unchecked")
Map<String, Object> result = (Map<String, Object>) source.get("response");
assertEquals(taskInfo.getTaskId(), taskResult.getTask().getTaskId());
assertEquals(taskInfo.getType(), taskResult.getTask().getType());
assertEquals(taskInfo.getAction(), taskResult.getTask().getAction());
assertEquals(taskInfo.getDescription(), taskResult.getTask().getDescription());
assertEquals(taskInfo.getStartTime(), taskResult.getTask().getStartTime());
assertEquals(taskInfo.getHeaders(), taskResult.getTask().getHeaders());
Map<?, ?> result = taskResult.getResponseAsMap();
assertEquals("0", result.get("failure_count").toString());
assertNull(source.get("failure"));
assertNoFailures(client().admin().indices().prepareRefresh(TaskResultsService.TASK_INDEX).get());
SearchResponse searchResponse = client().prepareSearch(TaskResultsService.TASK_INDEX)
@ -797,25 +787,21 @@ public class TasksIT extends ESIntegTestCase {
TaskInfo failedTaskInfo = events.get(0);
TaskId failedTaskId = failedTaskInfo.getTaskId();
GetResponse failedResultDoc = client()
.prepareGet(TaskResultsService.TASK_INDEX, TaskResultsService.TASK_TYPE, failedTaskId.toString())
.get();
assertTrue(failedResultDoc.isExists());
TaskResult taskResult = client().admin().cluster()
.getTask(new GetTaskRequest().setTaskId(failedTaskId)).get().getTask();
assertTrue(taskResult.isCompleted());
assertNull(taskResult.getResponse());
Map<String, Object> source = failedResultDoc.getSource();
@SuppressWarnings("unchecked")
Map<String, Object> task = (Map<String, Object>) source.get("task");
assertEquals(failedTaskInfo.getTaskId().getNodeId(), task.get("node"));
assertEquals(failedTaskInfo.getAction(), task.get("action"));
assertEquals(Long.toString(failedTaskInfo.getId()), task.get("id").toString());
@SuppressWarnings("unchecked")
Map<String, Object> error = (Map<String, Object>) source.get("error");
assertEquals(failedTaskInfo.getTaskId(), taskResult.getTask().getTaskId());
assertEquals(failedTaskInfo.getType(), taskResult.getTask().getType());
assertEquals(failedTaskInfo.getAction(), taskResult.getTask().getAction());
assertEquals(failedTaskInfo.getDescription(), taskResult.getTask().getDescription());
assertEquals(failedTaskInfo.getStartTime(), taskResult.getTask().getStartTime());
assertEquals(failedTaskInfo.getHeaders(), taskResult.getTask().getHeaders());
Map<?, ?> error = (Map<?, ?>) taskResult.getErrorAsMap();
assertEquals("Simulating operation failure", error.get("reason"));
assertEquals("illegal_state_exception", error.get("type"));
assertNull(source.get("result"));
GetTaskResponse getResponse = expectFinishedTask(failedTaskId);
assertNull(getResponse.getTask().getResponse());
assertEquals(error, getResponse.getTask().getErrorAsMap());

View File

@ -23,6 +23,7 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.TaskOperationFailure;
import org.elasticsearch.action.support.ActionFilters;
@ -40,18 +41,28 @@ import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.common.xcontent.ToXContentFragment;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.plugins.ActionPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.NetworkPlugin;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportInterceptor;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportResponseHandler;
import java.io.IOException;
import java.util.ArrayList;
@ -61,13 +72,15 @@ import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import static org.elasticsearch.action.admin.cluster.node.tasks.get.GetTaskAction.TASKS_ORIGIN;
import static org.elasticsearch.test.ESTestCase.awaitBusy;
/**
* A plugin that adds a cancellable blocking test task of integration testing of the task manager.
*/
public class TestTaskPlugin extends Plugin implements ActionPlugin {
public class TestTaskPlugin extends Plugin implements ActionPlugin, NetworkPlugin {
@Override
public List<ActionHandler<? extends ActionRequest, ? extends ActionResponse>> getActions() {
@ -80,6 +93,16 @@ public class TestTaskPlugin extends Plugin implements ActionPlugin {
return Collections.singleton("Custom-Task-Header");
}
/**
* Intercept transport requests to verify that all of the ones that should
* have the origin set <strong>do</strong> have the origin set and the ones
* that should not have the origin set <strong>do not</strong> have it set.
*/
@Override
public List<TransportInterceptor> getTransportInterceptors(NamedWriteableRegistry namedWriteableRegistry, ThreadContext threadContext) {
return Collections.singletonList(new OriginAssertingInterceptor(threadContext));
}
static class TestTask extends CancellableTask {
private volatile boolean blocked = true;
@ -469,4 +492,70 @@ public class TestTaskPlugin extends Plugin implements ActionPlugin {
}
}
private static class OriginAssertingInterceptor implements TransportInterceptor {
private final ThreadContext threadContext;
private OriginAssertingInterceptor(ThreadContext threadContext) {
this.threadContext = threadContext;
}
@Override
public AsyncSender interceptSender(AsyncSender sender) {
return new AsyncSender() {
@Override
public <T extends TransportResponse> void sendRequest(
Transport.Connection connection, String action, TransportRequest request,
TransportRequestOptions options, TransportResponseHandler<T> handler) {
if (action.startsWith("indices:data/write/bulk[s]")) {
/*
* We can't reason about these requests because
* *sometimes* they should have the origin, if they are
* running on the node that stores the task. But
* sometimes they won't be and in that case they don't
* need the origin. Either way, the interesting work is
* done by checking that the main bulk request
* (without the [s] part) has the origin.
*/
sender.sendRequest(connection, action, request, options, handler);
return;
}
String expectedOrigin = shouldHaveOrigin(action, request) ? TASKS_ORIGIN : null;
String actualOrigin = threadContext.getTransient(ThreadContext.ACTION_ORIGIN_TRANSIENT_NAME);
if (Objects.equals(expectedOrigin, actualOrigin)) {
sender.sendRequest(connection, action, request, options, handler);
return;
}
handler.handleException(new TransportException("should have origin of ["
+ expectedOrigin + "] but was [" + actualOrigin + "] action was ["
+ action + "][" + request + "]"));
}
};
}
private boolean shouldHaveOrigin(String action, TransportRequest request) {
if (false == action.startsWith("indices:")) {
/*
* The Tasks API never uses origin with non-indices actions.
*/
return false;
}
if ( action.startsWith("indices:admin/refresh")
|| action.startsWith("indices:data/read/search")) {
/*
* The test refreshes and searches to count the number of tasks
* in the index and the Tasks API never does either.
*/
return false;
}
if (false == (request instanceof IndicesRequest)) {
return false;
}
IndicesRequest ir = (IndicesRequest) request;
/*
* When the API Tasks API makes an indices request it only every
* targets the .tasks index. Other requests come from the tests.
*/
return Arrays.equals(new String[] {".tasks"}, ir.indices());
}
}
}

View File

@ -0,0 +1,72 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.client;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.search.ClearScrollRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.client.NoOpClient;
public class OriginSettingClientTests extends ESTestCase {
public void testSetsParentId() {
String origin = randomAlphaOfLength(7);
/*
* This mock will do nothing but verify that origin is set in the
* thread context before executing the action.
*/
NoOpClient mock = new NoOpClient(getTestName()) {
@Override
protected <Request extends ActionRequest, Response extends ActionResponse>
void doExecute(Action<Response> action, Request request, ActionListener<Response> listener) {
assertEquals(origin, threadPool().getThreadContext().getTransient(ThreadContext.ACTION_ORIGIN_TRANSIENT_NAME));
super.doExecute(action, request, listener);
}
};
try (OriginSettingClient client = new OriginSettingClient(mock, origin)) {
// All of these should have the origin set
client.bulk(new BulkRequest());
client.search(new SearchRequest());
client.clearScroll(new ClearScrollRequest());
ThreadContext threadContext = client.threadPool().getThreadContext();
client.bulk(new BulkRequest(), listenerThatAssertsOriginNotSet(threadContext));
client.search(new SearchRequest(), listenerThatAssertsOriginNotSet(threadContext));
client.clearScroll(new ClearScrollRequest(), listenerThatAssertsOriginNotSet(threadContext));
}
}
private <T> ActionListener<T> listenerThatAssertsOriginNotSet(ThreadContext threadContext) {
return ActionListener.wrap(
r -> {
assertNull(threadContext.getTransient(ThreadContext.ACTION_ORIGIN_TRANSIENT_NAME));
},
e -> {
fail("didn't expect to fail but: " + e);
});
}
}

View File

@ -55,6 +55,31 @@ public class ThreadContextTests extends ESTestCase {
assertEquals("1", threadContext.getHeader("default"));
}
public void testStashWithOrigin() {
final String origin = randomAlphaOfLengthBetween(4, 16);
final ThreadContext threadContext = new ThreadContext(Settings.EMPTY);
final boolean setOtherValues = randomBoolean();
if (setOtherValues) {
threadContext.putTransient("foo", "bar");
threadContext.putHeader("foo", "bar");
}
assertNull(threadContext.getTransient(ThreadContext.ACTION_ORIGIN_TRANSIENT_NAME));
try (ThreadContext.StoredContext storedContext = threadContext.stashWithOrigin(origin)) {
assertEquals(origin, threadContext.getTransient(ThreadContext.ACTION_ORIGIN_TRANSIENT_NAME));
assertNull(threadContext.getTransient("foo"));
assertNull(threadContext.getTransient("bar"));
}
assertNull(threadContext.getTransient(ThreadContext.ACTION_ORIGIN_TRANSIENT_NAME));
if (setOtherValues) {
assertEquals("bar", threadContext.getTransient("foo"));
assertEquals("bar", threadContext.getHeader("foo"));
}
}
public void testStashAndMerge() {
Settings build = Settings.builder().put("request.headers.default", "1").build();
ThreadContext threadContext = new ThreadContext(build);

View File

@ -22,6 +22,7 @@ package org.elasticsearch.persistent;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
@ -231,7 +232,9 @@ public class PersistentTasksNodeServiceTests extends ESTestCase {
public void testTaskCancellation() {
AtomicLong capturedTaskId = new AtomicLong();
AtomicReference<ActionListener<CancelTasksResponse>> capturedListener = new AtomicReference<>();
PersistentTasksService persistentTasksService = new PersistentTasksService(null, null, null) {
Client client = mock(Client.class);
when(client.settings()).thenReturn(Settings.EMPTY);
PersistentTasksService persistentTasksService = new PersistentTasksService(null, null, client) {
@Override
void sendCancelRequest(final long taskId, final String reason, final ActionListener<CancelTasksResponse> listener) {
capturedTaskId.set(taskId);

View File

@ -12,7 +12,7 @@ import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.support.ContextPreservingActionListener;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.FilterClient;
import org.elasticsearch.client.OriginSettingClient;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.xpack.core.security.authc.AuthenticationField;
@ -36,7 +36,12 @@ public final class ClientHelper {
public static final Set<String> SECURITY_HEADER_FILTERS = Sets.newHashSet(AuthenticationServiceField.RUN_AS_USER_HEADER,
AuthenticationField.AUTHENTICATION_KEY);
public static final String ACTION_ORIGIN_TRANSIENT_NAME = "action.origin";
/**
* .
* @deprecated use ThreadContext.ACTION_ORIGIN_TRANSIENT_NAME
*/
@Deprecated
public static final String ACTION_ORIGIN_TRANSIENT_NAME = ThreadContext.ACTION_ORIGIN_TRANSIENT_NAME;
public static final String SECURITY_ORIGIN = "security";
public static final String WATCHER_ORIGIN = "watcher";
public static final String ML_ORIGIN = "ml";
@ -50,18 +55,20 @@ public final class ClientHelper {
/**
* Stashes the current context and sets the origin in the current context. The original context is returned as a stored context
* @deprecated use ThreadContext.stashWithOrigin
*/
@Deprecated
public static ThreadContext.StoredContext stashWithOrigin(ThreadContext threadContext, String origin) {
final ThreadContext.StoredContext storedContext = threadContext.stashContext();
threadContext.putTransient(ACTION_ORIGIN_TRANSIENT_NAME, origin);
return storedContext;
return threadContext.stashWithOrigin(origin);
}
/**
* Returns a client that will always set the appropriate origin and ensure the proper context is restored by listeners
* @deprecated use {@link OriginSettingClient} instead
*/
@Deprecated
public static Client clientWithOrigin(Client client, String origin) {
return new ClientWithOrigin(client, origin);
return new OriginSettingClient(client, origin);
}
/**
@ -165,24 +172,4 @@ public final class ClientHelper {
threadContext.copyHeaders(headers.entrySet());
return storedContext;
}
private static final class ClientWithOrigin extends FilterClient {
private final String origin;
private ClientWithOrigin(Client in, String origin) {
super(in);
this.origin = origin;
}
@Override
protected <Request extends ActionRequest, Response extends ActionResponse>
void doExecute(Action<Response> action, Request request, ActionListener<Response> listener) {
final Supplier<ThreadContext.StoredContext> supplier = in().threadPool().getThreadContext().newRestorableContext(false);
try (ThreadContext.StoredContext ignore = in().threadPool().getThreadContext().stashContext()) {
in().threadPool().getThreadContext().putTransient(ACTION_ORIGIN_TRANSIENT_NAME, origin);
super.doExecute(action, request, new ContextPreservingActionListener<>(supplier, listener));
}
}
}
}

View File

@ -118,9 +118,7 @@ public class ReservedRolesStore implements BiConsumer<Set<String>, ActionListene
RoleDescriptor.IndicesPrivileges.builder()
.indices(".monitoring-*").privileges("read", "read_cross_cluster").build(),
RoleDescriptor.IndicesPrivileges.builder()
.indices(".management-beats").privileges("create_index", "read", "write").build(),
RoleDescriptor.IndicesPrivileges.builder()
.indices(".tasks").privileges("create_index", "read", "create").build()
.indices(".management-beats").privileges("create_index", "read", "write").build()
},
null,
new ConditionalClusterPrivilege[] { new ManageApplicationPrivileges(Collections.singleton("kibana-*")) },

View File

@ -42,32 +42,6 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class ClientHelperTests extends ESTestCase {
public void testStashContext() {
final String origin = randomAlphaOfLengthBetween(4, 16);
final ThreadContext threadContext = new ThreadContext(Settings.EMPTY);
final boolean setOtherValues = randomBoolean();
if (setOtherValues) {
threadContext.putTransient("foo", "bar");
threadContext.putHeader("foo", "bar");
}
assertNull(threadContext.getTransient(ClientHelper.ACTION_ORIGIN_TRANSIENT_NAME));
ThreadContext.StoredContext storedContext = ClientHelper.stashWithOrigin(threadContext, origin);
assertEquals(origin, threadContext.getTransient(ClientHelper.ACTION_ORIGIN_TRANSIENT_NAME));
assertNull(threadContext.getTransient("foo"));
assertNull(threadContext.getTransient("bar"));
storedContext.close();
assertNull(threadContext.getTransient(ClientHelper.ACTION_ORIGIN_TRANSIENT_NAME));
if (setOtherValues) {
assertEquals("bar", threadContext.getTransient("foo"));
assertEquals("bar", threadContext.getHeader("foo"));
}
}
public void testExecuteAsyncWrapsListener() throws Exception {
final ThreadContext threadContext = new ThreadContext(Settings.EMPTY);
final String headerName = randomAlphaOfLengthBetween(4, 16);

View File

@ -11,11 +11,9 @@ import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteAction;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsAction;
import org.elasticsearch.action.admin.cluster.state.ClusterStateAction;
import org.elasticsearch.action.admin.cluster.stats.ClusterStatsAction;
import org.elasticsearch.action.admin.indices.close.CloseIndexAction;
import org.elasticsearch.action.admin.indices.create.CreateIndexAction;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexAction;
import org.elasticsearch.action.admin.indices.get.GetIndexAction;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingAction;
import org.elasticsearch.action.admin.indices.recovery.RecoveryAction;
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsAction;
import org.elasticsearch.action.admin.indices.template.delete.DeleteIndexTemplateAction;
@ -279,18 +277,6 @@ public class ReservedRolesStoreTests extends ESTestCase {
assertThat(kibanaRole.indices().allowedIndicesMatcher(MultiSearchAction.NAME).test(index), is(true));
assertThat(kibanaRole.indices().allowedIndicesMatcher(GetAction.NAME).test(index), is(true));
assertThat(kibanaRole.indices().allowedIndicesMatcher(READ_CROSS_CLUSTER_NAME).test(index), is(false));
// Tasks index
final String taskIndex = org.elasticsearch.tasks.TaskResultsService.TASK_INDEX;
// Things that kibana_system *should* be able to do
assertThat(kibanaRole.indices().allowedIndicesMatcher(CreateIndexAction.NAME).test(taskIndex), is(true));
assertThat(kibanaRole.indices().allowedIndicesMatcher(PutMappingAction.NAME).test(taskIndex), is(true));
assertThat(kibanaRole.indices().allowedIndicesMatcher(IndexAction.NAME).test(taskIndex), is(true));
assertThat(kibanaRole.indices().allowedIndicesMatcher(GetAction.NAME).test(taskIndex), is(true));
// Things that kibana_system *should not* be able to do
assertThat(kibanaRole.indices().allowedIndicesMatcher(DeleteIndexAction.NAME).test(taskIndex), is(false));
assertThat(kibanaRole.indices().allowedIndicesMatcher(DeleteAction.NAME).test(taskIndex), is(false));
assertThat(kibanaRole.indices().allowedIndicesMatcher(CloseIndexAction.NAME).test(taskIndex), is(false));
}
public void testKibanaUserRole() {

View File

@ -23,6 +23,7 @@ import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Predicate;
import static org.elasticsearch.action.admin.cluster.node.tasks.get.GetTaskAction.TASKS_ORIGIN;
import static org.elasticsearch.xpack.core.ClientHelper.DEPRECATION_ORIGIN;
import static org.elasticsearch.xpack.core.ClientHelper.INDEX_LIFECYCLE_ORIGIN;
import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN;
@ -113,6 +114,7 @@ public final class AuthorizationUtils {
case PERSISTENT_TASK_ORIGIN:
case ROLLUP_ORIGIN:
case INDEX_LIFECYCLE_ORIGIN:
case TASKS_ORIGIN: // TODO use a more limited user for tasks
securityContext.executeAsUser(XPackUser.INSTANCE, consumer, Version.CURRENT);
break;
default:

View File

@ -23,6 +23,7 @@ import org.junit.Before;
import java.util.concurrent.CountDownLatch;
import java.util.function.Consumer;
import static org.elasticsearch.action.admin.cluster.node.tasks.get.GetTaskAction.TASKS_ORIGIN;
import static org.hamcrest.Matchers.is;
/**
@ -90,60 +91,45 @@ public class AuthorizationUtilsTests extends ESTestCase {
}
public void testSwitchAndExecuteXpackSecurityUser() throws Exception {
SecurityContext securityContext = new SecurityContext(Settings.EMPTY, threadContext);
final String headerName = randomAlphaOfLengthBetween(4, 16);
final String headerValue = randomAlphaOfLengthBetween(4, 16);
final CountDownLatch latch = new CountDownLatch(2);
final ActionListener<Void> listener = ActionListener.wrap(v -> {
assertNull(threadContext.getTransient(ClientHelper.ACTION_ORIGIN_TRANSIENT_NAME));
assertNull(threadContext.getHeader(headerName));
assertEquals(XPackSecurityUser.INSTANCE, securityContext.getAuthentication().getUser());
latch.countDown();
}, e -> fail(e.getMessage()));
final Consumer<ThreadContext.StoredContext> consumer = original -> {
assertNull(threadContext.getTransient(ClientHelper.ACTION_ORIGIN_TRANSIENT_NAME));
assertNull(threadContext.getHeader(headerName));
assertEquals(XPackSecurityUser.INSTANCE, securityContext.getAuthentication().getUser());
latch.countDown();
listener.onResponse(null);
};
threadContext.putHeader(headerName, headerValue);
threadContext.putTransient(ClientHelper.ACTION_ORIGIN_TRANSIENT_NAME, ClientHelper.SECURITY_ORIGIN);
AuthorizationUtils.switchUserBasedOnActionOriginAndExecute(threadContext, securityContext, consumer);
latch.await();
assertSwitchBasedOnOriginAndExecute(ClientHelper.SECURITY_ORIGIN, XPackSecurityUser.INSTANCE);
}
public void testSwitchAndExecuteXpackUser() throws Exception {
String origin = randomFrom(ClientHelper.ML_ORIGIN, ClientHelper.WATCHER_ORIGIN, ClientHelper.DEPRECATION_ORIGIN,
ClientHelper.MONITORING_ORIGIN, ClientHelper.PERSISTENT_TASK_ORIGIN, ClientHelper.INDEX_LIFECYCLE_ORIGIN);
assertSwitchBasedOnOriginAndExecute(origin, XPackUser.INSTANCE);
}
public void testSwitchWithTaskOrigin() throws Exception {
assertSwitchBasedOnOriginAndExecute(TASKS_ORIGIN, XPackUser.INSTANCE);
}
private void assertSwitchBasedOnOriginAndExecute(String origin, User user) throws Exception {
SecurityContext securityContext = new SecurityContext(Settings.EMPTY, threadContext);
final String headerName = randomAlphaOfLengthBetween(4, 16);
final String headerValue = randomAlphaOfLengthBetween(4, 16);
final CountDownLatch latch = new CountDownLatch(2);
final ActionListener<Void> listener = ActionListener.wrap(v -> {
assertNull(threadContext.getTransient(ClientHelper.ACTION_ORIGIN_TRANSIENT_NAME));
assertNull(threadContext.getTransient(ThreadContext.ACTION_ORIGIN_TRANSIENT_NAME));
assertNull(threadContext.getHeader(headerName));
assertEquals(XPackUser.INSTANCE, securityContext.getAuthentication().getUser());
assertEquals(user, securityContext.getAuthentication().getUser());
latch.countDown();
}, e -> fail(e.getMessage()));
final Consumer<ThreadContext.StoredContext> consumer = original -> {
assertNull(threadContext.getTransient(ClientHelper.ACTION_ORIGIN_TRANSIENT_NAME));
assertNull(threadContext.getTransient(ThreadContext.ACTION_ORIGIN_TRANSIENT_NAME));
assertNull(threadContext.getHeader(headerName));
assertEquals(XPackUser.INSTANCE, securityContext.getAuthentication().getUser());
assertEquals(user, securityContext.getAuthentication().getUser());
latch.countDown();
listener.onResponse(null);
};
threadContext.putHeader(headerName, headerValue);
threadContext.putTransient(ClientHelper.ACTION_ORIGIN_TRANSIENT_NAME,
randomFrom(ClientHelper.ML_ORIGIN, ClientHelper.WATCHER_ORIGIN, ClientHelper.DEPRECATION_ORIGIN,
ClientHelper.MONITORING_ORIGIN, ClientHelper.PERSISTENT_TASK_ORIGIN, ClientHelper.INDEX_LIFECYCLE_ORIGIN));
try (ThreadContext.StoredContext ignored = threadContext.stashWithOrigin(origin)) {
AuthorizationUtils.switchUserBasedOnActionOriginAndExecute(threadContext, securityContext, consumer);
AuthorizationUtils.switchUserBasedOnActionOriginAndExecute(threadContext, securityContext, consumer);
latch.await();
latch.await();
}
}
}

View File

@ -21,6 +21,7 @@ integTestCluster {
test_admin: 'superuser',
powerful_user: 'superuser',
minimal_user: 'minimal',
minimal_with_task_user: 'minimal_with_task',
readonly_user: 'readonly',
dest_only_user: 'dest_only',
can_not_see_hidden_docs_user: 'can_not_see_hidden_docs',

View File

@ -29,6 +29,26 @@ minimal:
- create_index
- indices:admin/refresh
# Search and write on both source and destination indices. It should work if you could just search on the source and
# write to the destination but that isn't how security works.
minimal_with_task:
cluster:
- cluster:monitor/main
- cluster:monitor/task/get
indices:
- names: source
privileges:
- read
- write
- create_index
- indices:admin/refresh
- names: dest
privileges:
- read
- write
- create_index
- indices:admin/refresh
# Read only operations on indices
readonly:
cluster:

View File

@ -54,7 +54,6 @@ setup:
text: test
- match: { hits.total: 1 }
---
"Reindex with runas user with minimal privileges works":
@ -87,7 +86,6 @@ setup:
text: test
- match: { hits.total: 1 }
---
"Reindex as readonly user is forbidden":
@ -316,3 +314,50 @@ setup:
index: dest
dest:
index: source
---
"Reindex wait_for_completion=false as minimal with task API":
- do:
index:
index: source
type: foo
id: 1
body: { "text": "test" }
- do:
indices.refresh: {}
- do:
headers: {es-security-runas-user: minimal_with_task_user}
reindex:
refresh: true
wait_for_completion: false
body:
source:
index: source
dest:
index: dest
- set: {task: task}
- do:
headers: {es-security-runas-user: minimal_with_task_user}
tasks.get:
wait_for_completion: true
task_id: $task
- match: {response.created: 1}
- do:
search:
index: dest
body:
query:
match:
text: test
- match: { hits.total: 1 }
# the minimal user doesn't have permission to read the tasks API
- do:
headers: {es-security-runas-user: minimal_user}
catch: forbidden
tasks.get:
task_id: $task