diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/get/GetTaskAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/get/GetTaskAction.java index d1e27e49088..4e88963de4c 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/get/GetTaskAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/get/GetTaskAction.java @@ -25,6 +25,7 @@ import org.elasticsearch.action.Action; * Action for retrieving a list of currently running tasks */ public class GetTaskAction extends Action { + public static final String TASKS_ORIGIN = "tasks"; public static final GetTaskAction INSTANCE = new GetTaskAction(); public static final String NAME = "cluster:monitor/task/get"; diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/get/TransportGetTaskAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/get/TransportGetTaskAction.java index cc141d1b3b1..f914cc89a4f 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/get/TransportGetTaskAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/get/TransportGetTaskAction.java @@ -28,6 +28,7 @@ import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.HandledTransportAction; import org.elasticsearch.client.Client; +import org.elasticsearch.client.OriginSettingClient; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; @@ -51,6 +52,7 @@ import org.elasticsearch.transport.TransportService; import java.io.IOException; +import static org.elasticsearch.action.admin.cluster.node.tasks.get.GetTaskAction.TASKS_ORIGIN; import static org.elasticsearch.action.admin.cluster.node.tasks.list.TransportListTasksAction.waitForCompletionTimeout; /** @@ -77,7 +79,7 @@ public class TransportGetTaskAction extends HandledTransportAction() { @Override public void onResponse(GetResponse getResponse) { diff --git a/server/src/main/java/org/elasticsearch/client/OriginSettingClient.java b/server/src/main/java/org/elasticsearch/client/OriginSettingClient.java new file mode 100644 index 00000000000..ad5a602fbae --- /dev/null +++ b/server/src/main/java/org/elasticsearch/client/OriginSettingClient.java @@ -0,0 +1,54 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.client; + +import org.elasticsearch.action.Action; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.support.ContextPreservingActionListener; +import org.elasticsearch.common.util.concurrent.ThreadContext; + +import java.util.function.Supplier; + +/** + * A {@linkplain Client} that sends requests with the + * {@link ThreadContext#stashWithOrigin origin} set to a particular + * value and calls its {@linkplain ActionListener} in its original + * {@link ThreadContext}. + */ +public final class OriginSettingClient extends FilterClient { + + private final String origin; + + public OriginSettingClient(Client in, String origin) { + super(in); + this.origin = origin; + } + + @Override + protected + void doExecute(Action action, Request request, ActionListener listener) { + final Supplier supplier = in().threadPool().getThreadContext().newRestorableContext(false); + try (ThreadContext.StoredContext ignore = in().threadPool().getThreadContext().stashWithOrigin(origin)) { + super.doExecute(action, request, new ContextPreservingActionListener<>(supplier, listener)); + } + } +} diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/ThreadContext.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/ThreadContext.java index 96648111495..79d7c3510c2 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/ThreadContext.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/ThreadContext.java @@ -22,6 +22,8 @@ import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.LogManager; import org.apache.lucene.util.CloseableThreadLocal; import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.action.support.ContextPreservingActionListener; +import org.elasticsearch.client.OriginSettingClient; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; @@ -85,6 +87,12 @@ public final class ThreadContext implements Closeable, Writeable { public static final String PREFIX = "request.headers"; public static final Setting DEFAULT_HEADERS_SETTING = Setting.groupSetting(PREFIX + ".", Property.NodeScope); + + /** + * Name for the {@link #stashWithOrigin origin} attribute. + */ + public static final String ACTION_ORIGIN_TRANSIENT_NAME = "action.origin"; + private static final Logger logger = LogManager.getLogger(ThreadContext.class); private static final ThreadContextStruct DEFAULT_CONTEXT = new ThreadContextStruct(); private final Map defaultHeader; @@ -119,7 +127,7 @@ public final class ThreadContext implements Closeable, Writeable { /** * Removes the current context and resets a default context. The removed context can be - * restored when closing the returned {@link StoredContext} + * restored by closing the returned {@link StoredContext}. */ public StoredContext stashContext() { final ThreadContextStruct context = threadLocal.get(); @@ -127,6 +135,31 @@ public final class ThreadContext implements Closeable, Writeable { return () -> threadLocal.set(context); } + /** + * Removes the current context and resets a default context marked with as + * originating from the supplied string. The removed context can be + * restored by closing the returned {@link StoredContext}. Callers should + * be careful to save the current context before calling this method and + * restore it any listeners, likely with + * {@link ContextPreservingActionListener}. Use {@link OriginSettingClient} + * which can be used to do this automatically. + *

+ * Without security the origin is ignored, but security uses it to authorize + * actions that are made up of many sub-actions. These actions call + * {@link #stashWithOrigin} before performing on behalf of a user that + * should be allowed even if the user doesn't have permission to perform + * those actions on their own. + *

+ * For example, a user might not have permission to GET from the tasks index + * but the tasks API will perform a get on their behalf using this method + * if it can't find the task in memory. + */ + public StoredContext stashWithOrigin(String origin) { + final ThreadContext.StoredContext storedContext = stashContext(); + putTransient(ACTION_ORIGIN_TRANSIENT_NAME, origin); + return storedContext; + } + /** * Removes the current context and resets a new context that contains a merge of the current headers and the given headers. * The removed context can be restored when closing the returned {@link StoredContext}. The merge strategy is that headers diff --git a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksService.java b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksService.java index 301c0a21ea5..ef467200001 100644 --- a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksService.java +++ b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksService.java @@ -25,21 +25,19 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest; import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksResponse; -import org.elasticsearch.action.support.ContextPreservingActionListener; import org.elasticsearch.client.Client; +import org.elasticsearch.client.OriginSettingClient; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateObserver; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.node.NodeClosedException; import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.threadpool.ThreadPool; import java.util.function.Predicate; -import java.util.function.Supplier; /** * This service is used by persistent tasks and allocated persistent tasks to communicate changes @@ -50,7 +48,6 @@ public class PersistentTasksService { private static final Logger logger = LogManager.getLogger(PersistentTasksService.class); - private static final String ACTION_ORIGIN_TRANSIENT_NAME = "action.origin"; private static final String PERSISTENT_TASK_ORIGIN = "persistent_tasks"; private final Client client; @@ -58,7 +55,7 @@ public class PersistentTasksService { private final ThreadPool threadPool; public PersistentTasksService(ClusterService clusterService, ThreadPool threadPool, Client client) { - this.client = client; + this.client = new OriginSettingClient(client, PERSISTENT_TASK_ORIGIN); this.clusterService = clusterService; this.threadPool = threadPool; } @@ -98,12 +95,7 @@ public class PersistentTasksService { request.setTaskId(new TaskId(clusterService.localNode().getId(), taskId)); request.setReason(reason); try { - final ThreadContext threadContext = client.threadPool().getThreadContext(); - final Supplier supplier = threadContext.newRestorableContext(false); - - try (ThreadContext.StoredContext ignore = stashWithOrigin(threadContext, PERSISTENT_TASK_ORIGIN)) { - client.admin().cluster().cancelTasks(request, new ContextPreservingActionListener<>(supplier, listener)); - } + client.admin().cluster().cancelTasks(request, listener); } catch (Exception e) { listener.onFailure(e); } @@ -140,14 +132,8 @@ public class PersistentTasksService { private void execute(final Req request, final Action action, final ActionListener> listener) { try { - final ThreadContext threadContext = client.threadPool().getThreadContext(); - final Supplier supplier = threadContext.newRestorableContext(false); - - try (ThreadContext.StoredContext ignore = stashWithOrigin(threadContext, PERSISTENT_TASK_ORIGIN)) { - client.execute(action, request, - new ContextPreservingActionListener<>(supplier, - ActionListener.wrap(r -> listener.onResponse(r.getTask()), listener::onFailure))); - } + client.execute(action, request, + ActionListener.wrap(r -> listener.onResponse(r.getTask()), listener::onFailure)); } catch (Exception e) { listener.onFailure(e); } @@ -233,10 +219,4 @@ public class PersistentTasksService { onFailure(new IllegalStateException("Timed out when waiting for persistent task after " + timeout)); } } - - public static ThreadContext.StoredContext stashWithOrigin(ThreadContext threadContext, String origin) { - final ThreadContext.StoredContext storedContext = threadContext.stashContext(); - threadContext.putTransient(ACTION_ORIGIN_TRANSIENT_NAME, origin); - return storedContext; - } } diff --git a/server/src/main/java/org/elasticsearch/tasks/TaskResultsService.java b/server/src/main/java/org/elasticsearch/tasks/TaskResultsService.java index 8a3d237b443..64239d2e6b0 100644 --- a/server/src/main/java/org/elasticsearch/tasks/TaskResultsService.java +++ b/server/src/main/java/org/elasticsearch/tasks/TaskResultsService.java @@ -31,6 +31,7 @@ import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.client.Client; +import org.elasticsearch.client.OriginSettingClient; import org.elasticsearch.client.Requests; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetaData; @@ -50,6 +51,8 @@ import java.io.InputStream; import java.nio.charset.StandardCharsets; import java.util.Map; +import static org.elasticsearch.action.admin.cluster.node.tasks.get.GetTaskAction.TASKS_ORIGIN; + /** * Service that can store task results. */ @@ -73,7 +76,7 @@ public class TaskResultsService { @Inject public TaskResultsService(Client client, ClusterService clusterService) { - this.client = client; + this.client = new OriginSettingClient(client, TASKS_ORIGIN); this.clusterService = clusterService; } diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TasksIT.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TasksIT.java index d33fff45308..ad7b1c51b51 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TasksIT.java +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TasksIT.java @@ -27,6 +27,7 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.TaskOperationFailure; import org.elasticsearch.action.admin.cluster.health.ClusterHealthAction; import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksResponse; +import org.elasticsearch.action.admin.cluster.node.tasks.get.GetTaskRequest; import org.elasticsearch.action.admin.cluster.node.tasks.get.GetTaskResponse; import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksAction; import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse; @@ -34,7 +35,6 @@ import org.elasticsearch.action.admin.indices.refresh.RefreshAction; import org.elasticsearch.action.admin.indices.upgrade.post.UpgradeAction; import org.elasticsearch.action.admin.indices.validate.query.ValidateQueryAction; import org.elasticsearch.action.bulk.BulkAction; -import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.index.IndexAction; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.search.SearchAction; @@ -85,7 +85,6 @@ import static java.util.Collections.singleton; import static org.elasticsearch.common.unit.TimeValue.timeValueMillis; import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds; import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_MAX_HEADER_SIZE; -import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertSearchResponse; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertThrows; @@ -725,12 +724,6 @@ public class TasksIT extends ESIntegTestCase { } public void testTaskStoringSuccesfulResult() throws Exception { - // Randomly create an empty index to make sure the type is created automatically - if (randomBoolean()) { - logger.info("creating an empty results index with custom settings"); - assertAcked(client().admin().indices().prepareCreate(TaskResultsService.TASK_INDEX)); - } - registerTaskManageListeners(TestTaskPlugin.TestTaskAction.NAME); // we need this to get task id of the process // Start non-blocking test task @@ -743,23 +736,20 @@ public class TasksIT extends ESIntegTestCase { TaskInfo taskInfo = events.get(0); TaskId taskId = taskInfo.getTaskId(); - GetResponse resultDoc = client() - .prepareGet(TaskResultsService.TASK_INDEX, TaskResultsService.TASK_TYPE, taskId.toString()).get(); - assertTrue(resultDoc.isExists()); + TaskResult taskResult = client().admin().cluster() + .getTask(new GetTaskRequest().setTaskId(taskId)).get().getTask(); + assertTrue(taskResult.isCompleted()); + assertNull(taskResult.getError()); - Map source = resultDoc.getSource(); - @SuppressWarnings("unchecked") - Map task = (Map) source.get("task"); - assertEquals(taskInfo.getTaskId().getNodeId(), task.get("node")); - assertEquals(taskInfo.getAction(), task.get("action")); - assertEquals(Long.toString(taskInfo.getId()), task.get("id").toString()); - - @SuppressWarnings("unchecked") - Map result = (Map) source.get("response"); + assertEquals(taskInfo.getTaskId(), taskResult.getTask().getTaskId()); + assertEquals(taskInfo.getType(), taskResult.getTask().getType()); + assertEquals(taskInfo.getAction(), taskResult.getTask().getAction()); + assertEquals(taskInfo.getDescription(), taskResult.getTask().getDescription()); + assertEquals(taskInfo.getStartTime(), taskResult.getTask().getStartTime()); + assertEquals(taskInfo.getHeaders(), taskResult.getTask().getHeaders()); + Map result = taskResult.getResponseAsMap(); assertEquals("0", result.get("failure_count").toString()); - assertNull(source.get("failure")); - assertNoFailures(client().admin().indices().prepareRefresh(TaskResultsService.TASK_INDEX).get()); SearchResponse searchResponse = client().prepareSearch(TaskResultsService.TASK_INDEX) @@ -797,25 +787,21 @@ public class TasksIT extends ESIntegTestCase { TaskInfo failedTaskInfo = events.get(0); TaskId failedTaskId = failedTaskInfo.getTaskId(); - GetResponse failedResultDoc = client() - .prepareGet(TaskResultsService.TASK_INDEX, TaskResultsService.TASK_TYPE, failedTaskId.toString()) - .get(); - assertTrue(failedResultDoc.isExists()); + TaskResult taskResult = client().admin().cluster() + .getTask(new GetTaskRequest().setTaskId(failedTaskId)).get().getTask(); + assertTrue(taskResult.isCompleted()); + assertNull(taskResult.getResponse()); - Map source = failedResultDoc.getSource(); - @SuppressWarnings("unchecked") - Map task = (Map) source.get("task"); - assertEquals(failedTaskInfo.getTaskId().getNodeId(), task.get("node")); - assertEquals(failedTaskInfo.getAction(), task.get("action")); - assertEquals(Long.toString(failedTaskInfo.getId()), task.get("id").toString()); - - @SuppressWarnings("unchecked") - Map error = (Map) source.get("error"); + assertEquals(failedTaskInfo.getTaskId(), taskResult.getTask().getTaskId()); + assertEquals(failedTaskInfo.getType(), taskResult.getTask().getType()); + assertEquals(failedTaskInfo.getAction(), taskResult.getTask().getAction()); + assertEquals(failedTaskInfo.getDescription(), taskResult.getTask().getDescription()); + assertEquals(failedTaskInfo.getStartTime(), taskResult.getTask().getStartTime()); + assertEquals(failedTaskInfo.getHeaders(), taskResult.getTask().getHeaders()); + Map error = (Map) taskResult.getErrorAsMap(); assertEquals("Simulating operation failure", error.get("reason")); assertEquals("illegal_state_exception", error.get("type")); - assertNull(source.get("result")); - GetTaskResponse getResponse = expectFinishedTask(failedTaskId); assertNull(getResponse.getTask().getResponse()); assertEquals(error, getResponse.getTask().getErrorAsMap()); diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TestTaskPlugin.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TestTaskPlugin.java index cac1ff61b20..8357c25299a 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TestTaskPlugin.java +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TestTaskPlugin.java @@ -23,6 +23,7 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionRequestBuilder; import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.IndicesRequest; import org.elasticsearch.action.FailedNodeException; import org.elasticsearch.action.TaskOperationFailure; import org.elasticsearch.action.support.ActionFilters; @@ -40,18 +41,28 @@ import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.common.xcontent.ToXContentFragment; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.plugins.ActionPlugin; import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.plugins.NetworkPlugin; import org.elasticsearch.tasks.CancellableTask; import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.Transport; +import org.elasticsearch.transport.TransportException; +import org.elasticsearch.transport.TransportInterceptor; +import org.elasticsearch.transport.TransportRequest; import org.elasticsearch.transport.TransportService; +import org.elasticsearch.transport.TransportRequestOptions; +import org.elasticsearch.transport.TransportResponse; +import org.elasticsearch.transport.TransportResponseHandler; import java.io.IOException; import java.util.ArrayList; @@ -61,13 +72,15 @@ import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Objects; +import static org.elasticsearch.action.admin.cluster.node.tasks.get.GetTaskAction.TASKS_ORIGIN; import static org.elasticsearch.test.ESTestCase.awaitBusy; /** * A plugin that adds a cancellable blocking test task of integration testing of the task manager. */ -public class TestTaskPlugin extends Plugin implements ActionPlugin { +public class TestTaskPlugin extends Plugin implements ActionPlugin, NetworkPlugin { @Override public List> getActions() { @@ -80,6 +93,16 @@ public class TestTaskPlugin extends Plugin implements ActionPlugin { return Collections.singleton("Custom-Task-Header"); } + /** + * Intercept transport requests to verify that all of the ones that should + * have the origin set do have the origin set and the ones + * that should not have the origin set do not have it set. + */ + @Override + public List getTransportInterceptors(NamedWriteableRegistry namedWriteableRegistry, ThreadContext threadContext) { + return Collections.singletonList(new OriginAssertingInterceptor(threadContext)); + } + static class TestTask extends CancellableTask { private volatile boolean blocked = true; @@ -469,4 +492,70 @@ public class TestTaskPlugin extends Plugin implements ActionPlugin { } } + private static class OriginAssertingInterceptor implements TransportInterceptor { + private final ThreadContext threadContext; + + private OriginAssertingInterceptor(ThreadContext threadContext) { + this.threadContext = threadContext; + } + + @Override + public AsyncSender interceptSender(AsyncSender sender) { + return new AsyncSender() { + @Override + public void sendRequest( + Transport.Connection connection, String action, TransportRequest request, + TransportRequestOptions options, TransportResponseHandler handler) { + if (action.startsWith("indices:data/write/bulk[s]")) { + /* + * We can't reason about these requests because + * *sometimes* they should have the origin, if they are + * running on the node that stores the task. But + * sometimes they won't be and in that case they don't + * need the origin. Either way, the interesting work is + * done by checking that the main bulk request + * (without the [s] part) has the origin. + */ + sender.sendRequest(connection, action, request, options, handler); + return; + } + String expectedOrigin = shouldHaveOrigin(action, request) ? TASKS_ORIGIN : null; + String actualOrigin = threadContext.getTransient(ThreadContext.ACTION_ORIGIN_TRANSIENT_NAME); + if (Objects.equals(expectedOrigin, actualOrigin)) { + sender.sendRequest(connection, action, request, options, handler); + return; + } + handler.handleException(new TransportException("should have origin of [" + + expectedOrigin + "] but was [" + actualOrigin + "] action was [" + + action + "][" + request + "]")); + } + }; + } + + private boolean shouldHaveOrigin(String action, TransportRequest request) { + if (false == action.startsWith("indices:")) { + /* + * The Tasks API never uses origin with non-indices actions. + */ + return false; + } + if ( action.startsWith("indices:admin/refresh") + || action.startsWith("indices:data/read/search")) { + /* + * The test refreshes and searches to count the number of tasks + * in the index and the Tasks API never does either. + */ + return false; + } + if (false == (request instanceof IndicesRequest)) { + return false; + } + IndicesRequest ir = (IndicesRequest) request; + /* + * When the API Tasks API makes an indices request it only every + * targets the .tasks index. Other requests come from the tests. + */ + return Arrays.equals(new String[] {".tasks"}, ir.indices()); + } + } } diff --git a/server/src/test/java/org/elasticsearch/client/OriginSettingClientTests.java b/server/src/test/java/org/elasticsearch/client/OriginSettingClientTests.java new file mode 100644 index 00000000000..8347cc1c1dd --- /dev/null +++ b/server/src/test/java/org/elasticsearch/client/OriginSettingClientTests.java @@ -0,0 +1,72 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.client; + +import org.elasticsearch.action.Action; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.search.ClearScrollRequest; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.common.util.concurrent.ThreadContext; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.client.NoOpClient; + +public class OriginSettingClientTests extends ESTestCase { + public void testSetsParentId() { + String origin = randomAlphaOfLength(7); + + /* + * This mock will do nothing but verify that origin is set in the + * thread context before executing the action. + */ + NoOpClient mock = new NoOpClient(getTestName()) { + @Override + protected + void doExecute(Action action, Request request, ActionListener listener) { + assertEquals(origin, threadPool().getThreadContext().getTransient(ThreadContext.ACTION_ORIGIN_TRANSIENT_NAME)); + super.doExecute(action, request, listener); + } + }; + + try (OriginSettingClient client = new OriginSettingClient(mock, origin)) { + // All of these should have the origin set + client.bulk(new BulkRequest()); + client.search(new SearchRequest()); + client.clearScroll(new ClearScrollRequest()); + + ThreadContext threadContext = client.threadPool().getThreadContext(); + client.bulk(new BulkRequest(), listenerThatAssertsOriginNotSet(threadContext)); + client.search(new SearchRequest(), listenerThatAssertsOriginNotSet(threadContext)); + client.clearScroll(new ClearScrollRequest(), listenerThatAssertsOriginNotSet(threadContext)); + } + } + + private ActionListener listenerThatAssertsOriginNotSet(ThreadContext threadContext) { + return ActionListener.wrap( + r -> { + assertNull(threadContext.getTransient(ThreadContext.ACTION_ORIGIN_TRANSIENT_NAME)); + }, + e -> { + fail("didn't expect to fail but: " + e); + }); + } +} diff --git a/server/src/test/java/org/elasticsearch/common/util/concurrent/ThreadContextTests.java b/server/src/test/java/org/elasticsearch/common/util/concurrent/ThreadContextTests.java index a0a92cad7a8..9729aca2941 100644 --- a/server/src/test/java/org/elasticsearch/common/util/concurrent/ThreadContextTests.java +++ b/server/src/test/java/org/elasticsearch/common/util/concurrent/ThreadContextTests.java @@ -55,6 +55,31 @@ public class ThreadContextTests extends ESTestCase { assertEquals("1", threadContext.getHeader("default")); } + public void testStashWithOrigin() { + final String origin = randomAlphaOfLengthBetween(4, 16); + final ThreadContext threadContext = new ThreadContext(Settings.EMPTY); + + final boolean setOtherValues = randomBoolean(); + if (setOtherValues) { + threadContext.putTransient("foo", "bar"); + threadContext.putHeader("foo", "bar"); + } + + assertNull(threadContext.getTransient(ThreadContext.ACTION_ORIGIN_TRANSIENT_NAME)); + try (ThreadContext.StoredContext storedContext = threadContext.stashWithOrigin(origin)) { + assertEquals(origin, threadContext.getTransient(ThreadContext.ACTION_ORIGIN_TRANSIENT_NAME)); + assertNull(threadContext.getTransient("foo")); + assertNull(threadContext.getTransient("bar")); + } + + assertNull(threadContext.getTransient(ThreadContext.ACTION_ORIGIN_TRANSIENT_NAME)); + + if (setOtherValues) { + assertEquals("bar", threadContext.getTransient("foo")); + assertEquals("bar", threadContext.getHeader("foo")); + } + } + public void testStashAndMerge() { Settings build = Settings.builder().put("request.headers.default", "1").build(); ThreadContext threadContext = new ThreadContext(build); diff --git a/server/src/test/java/org/elasticsearch/persistent/PersistentTasksNodeServiceTests.java b/server/src/test/java/org/elasticsearch/persistent/PersistentTasksNodeServiceTests.java index 8aa553639cc..b5d72e3aa43 100644 --- a/server/src/test/java/org/elasticsearch/persistent/PersistentTasksNodeServiceTests.java +++ b/server/src/test/java/org/elasticsearch/persistent/PersistentTasksNodeServiceTests.java @@ -22,6 +22,7 @@ package org.elasticsearch.persistent; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksResponse; +import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; @@ -231,7 +232,9 @@ public class PersistentTasksNodeServiceTests extends ESTestCase { public void testTaskCancellation() { AtomicLong capturedTaskId = new AtomicLong(); AtomicReference> capturedListener = new AtomicReference<>(); - PersistentTasksService persistentTasksService = new PersistentTasksService(null, null, null) { + Client client = mock(Client.class); + when(client.settings()).thenReturn(Settings.EMPTY); + PersistentTasksService persistentTasksService = new PersistentTasksService(null, null, client) { @Override void sendCancelRequest(final long taskId, final String reason, final ActionListener listener) { capturedTaskId.set(taskId); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ClientHelper.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ClientHelper.java index c0a7a0b90a4..a6874a18853 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ClientHelper.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ClientHelper.java @@ -12,7 +12,7 @@ import org.elasticsearch.action.ActionRequestBuilder; import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.support.ContextPreservingActionListener; import org.elasticsearch.client.Client; -import org.elasticsearch.client.FilterClient; +import org.elasticsearch.client.OriginSettingClient; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.xpack.core.security.authc.AuthenticationField; @@ -36,7 +36,12 @@ public final class ClientHelper { public static final Set SECURITY_HEADER_FILTERS = Sets.newHashSet(AuthenticationServiceField.RUN_AS_USER_HEADER, AuthenticationField.AUTHENTICATION_KEY); - public static final String ACTION_ORIGIN_TRANSIENT_NAME = "action.origin"; + /** + * . + * @deprecated use ThreadContext.ACTION_ORIGIN_TRANSIENT_NAME + */ + @Deprecated + public static final String ACTION_ORIGIN_TRANSIENT_NAME = ThreadContext.ACTION_ORIGIN_TRANSIENT_NAME; public static final String SECURITY_ORIGIN = "security"; public static final String WATCHER_ORIGIN = "watcher"; public static final String ML_ORIGIN = "ml"; @@ -50,18 +55,20 @@ public final class ClientHelper { /** * Stashes the current context and sets the origin in the current context. The original context is returned as a stored context + * @deprecated use ThreadContext.stashWithOrigin */ + @Deprecated public static ThreadContext.StoredContext stashWithOrigin(ThreadContext threadContext, String origin) { - final ThreadContext.StoredContext storedContext = threadContext.stashContext(); - threadContext.putTransient(ACTION_ORIGIN_TRANSIENT_NAME, origin); - return storedContext; + return threadContext.stashWithOrigin(origin); } /** * Returns a client that will always set the appropriate origin and ensure the proper context is restored by listeners + * @deprecated use {@link OriginSettingClient} instead */ + @Deprecated public static Client clientWithOrigin(Client client, String origin) { - return new ClientWithOrigin(client, origin); + return new OriginSettingClient(client, origin); } /** @@ -165,24 +172,4 @@ public final class ClientHelper { threadContext.copyHeaders(headers.entrySet()); return storedContext; } - - private static final class ClientWithOrigin extends FilterClient { - - private final String origin; - - private ClientWithOrigin(Client in, String origin) { - super(in); - this.origin = origin; - } - - @Override - protected - void doExecute(Action action, Request request, ActionListener listener) { - final Supplier supplier = in().threadPool().getThreadContext().newRestorableContext(false); - try (ThreadContext.StoredContext ignore = in().threadPool().getThreadContext().stashContext()) { - in().threadPool().getThreadContext().putTransient(ACTION_ORIGIN_TRANSIENT_NAME, origin); - super.doExecute(action, request, new ContextPreservingActionListener<>(supplier, listener)); - } - } - } } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/store/ReservedRolesStore.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/store/ReservedRolesStore.java index 214e80a1ac0..046cf592549 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/store/ReservedRolesStore.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/store/ReservedRolesStore.java @@ -118,9 +118,7 @@ public class ReservedRolesStore implements BiConsumer, ActionListene RoleDescriptor.IndicesPrivileges.builder() .indices(".monitoring-*").privileges("read", "read_cross_cluster").build(), RoleDescriptor.IndicesPrivileges.builder() - .indices(".management-beats").privileges("create_index", "read", "write").build(), - RoleDescriptor.IndicesPrivileges.builder() - .indices(".tasks").privileges("create_index", "read", "create").build() + .indices(".management-beats").privileges("create_index", "read", "write").build() }, null, new ConditionalClusterPrivilege[] { new ManageApplicationPrivileges(Collections.singleton("kibana-*")) }, diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ClientHelperTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ClientHelperTests.java index 95361dbff42..1a0a8e76412 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ClientHelperTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ClientHelperTests.java @@ -42,32 +42,6 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; public class ClientHelperTests extends ESTestCase { - - public void testStashContext() { - final String origin = randomAlphaOfLengthBetween(4, 16); - final ThreadContext threadContext = new ThreadContext(Settings.EMPTY); - - final boolean setOtherValues = randomBoolean(); - if (setOtherValues) { - threadContext.putTransient("foo", "bar"); - threadContext.putHeader("foo", "bar"); - } - - assertNull(threadContext.getTransient(ClientHelper.ACTION_ORIGIN_TRANSIENT_NAME)); - ThreadContext.StoredContext storedContext = ClientHelper.stashWithOrigin(threadContext, origin); - assertEquals(origin, threadContext.getTransient(ClientHelper.ACTION_ORIGIN_TRANSIENT_NAME)); - assertNull(threadContext.getTransient("foo")); - assertNull(threadContext.getTransient("bar")); - - storedContext.close(); - assertNull(threadContext.getTransient(ClientHelper.ACTION_ORIGIN_TRANSIENT_NAME)); - - if (setOtherValues) { - assertEquals("bar", threadContext.getTransient("foo")); - assertEquals("bar", threadContext.getHeader("foo")); - } - } - public void testExecuteAsyncWrapsListener() throws Exception { final ThreadContext threadContext = new ThreadContext(Settings.EMPTY); final String headerName = randomAlphaOfLengthBetween(4, 16); diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/security/authz/store/ReservedRolesStoreTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/security/authz/store/ReservedRolesStoreTests.java index c85b5f8dc39..fc9869a1280 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/security/authz/store/ReservedRolesStoreTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/security/authz/store/ReservedRolesStoreTests.java @@ -11,11 +11,9 @@ import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteAction; import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsAction; import org.elasticsearch.action.admin.cluster.state.ClusterStateAction; import org.elasticsearch.action.admin.cluster.stats.ClusterStatsAction; -import org.elasticsearch.action.admin.indices.close.CloseIndexAction; import org.elasticsearch.action.admin.indices.create.CreateIndexAction; import org.elasticsearch.action.admin.indices.delete.DeleteIndexAction; import org.elasticsearch.action.admin.indices.get.GetIndexAction; -import org.elasticsearch.action.admin.indices.mapping.put.PutMappingAction; import org.elasticsearch.action.admin.indices.recovery.RecoveryAction; import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsAction; import org.elasticsearch.action.admin.indices.template.delete.DeleteIndexTemplateAction; @@ -279,18 +277,6 @@ public class ReservedRolesStoreTests extends ESTestCase { assertThat(kibanaRole.indices().allowedIndicesMatcher(MultiSearchAction.NAME).test(index), is(true)); assertThat(kibanaRole.indices().allowedIndicesMatcher(GetAction.NAME).test(index), is(true)); assertThat(kibanaRole.indices().allowedIndicesMatcher(READ_CROSS_CLUSTER_NAME).test(index), is(false)); - - // Tasks index - final String taskIndex = org.elasticsearch.tasks.TaskResultsService.TASK_INDEX; - // Things that kibana_system *should* be able to do - assertThat(kibanaRole.indices().allowedIndicesMatcher(CreateIndexAction.NAME).test(taskIndex), is(true)); - assertThat(kibanaRole.indices().allowedIndicesMatcher(PutMappingAction.NAME).test(taskIndex), is(true)); - assertThat(kibanaRole.indices().allowedIndicesMatcher(IndexAction.NAME).test(taskIndex), is(true)); - assertThat(kibanaRole.indices().allowedIndicesMatcher(GetAction.NAME).test(taskIndex), is(true)); - // Things that kibana_system *should not* be able to do - assertThat(kibanaRole.indices().allowedIndicesMatcher(DeleteIndexAction.NAME).test(taskIndex), is(false)); - assertThat(kibanaRole.indices().allowedIndicesMatcher(DeleteAction.NAME).test(taskIndex), is(false)); - assertThat(kibanaRole.indices().allowedIndicesMatcher(CloseIndexAction.NAME).test(taskIndex), is(false)); } public void testKibanaUserRole() { diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/AuthorizationUtils.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/AuthorizationUtils.java index 02679a1dfc0..193e3bdb3bd 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/AuthorizationUtils.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/AuthorizationUtils.java @@ -23,6 +23,7 @@ import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.function.Predicate; +import static org.elasticsearch.action.admin.cluster.node.tasks.get.GetTaskAction.TASKS_ORIGIN; import static org.elasticsearch.xpack.core.ClientHelper.DEPRECATION_ORIGIN; import static org.elasticsearch.xpack.core.ClientHelper.INDEX_LIFECYCLE_ORIGIN; import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN; @@ -113,6 +114,7 @@ public final class AuthorizationUtils { case PERSISTENT_TASK_ORIGIN: case ROLLUP_ORIGIN: case INDEX_LIFECYCLE_ORIGIN: + case TASKS_ORIGIN: // TODO use a more limited user for tasks securityContext.executeAsUser(XPackUser.INSTANCE, consumer, Version.CURRENT); break; default: diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/AuthorizationUtilsTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/AuthorizationUtilsTests.java index 66b1e9d9c2a..f1767b11c78 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/AuthorizationUtilsTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/AuthorizationUtilsTests.java @@ -23,6 +23,7 @@ import org.junit.Before; import java.util.concurrent.CountDownLatch; import java.util.function.Consumer; +import static org.elasticsearch.action.admin.cluster.node.tasks.get.GetTaskAction.TASKS_ORIGIN; import static org.hamcrest.Matchers.is; /** @@ -90,60 +91,45 @@ public class AuthorizationUtilsTests extends ESTestCase { } public void testSwitchAndExecuteXpackSecurityUser() throws Exception { - SecurityContext securityContext = new SecurityContext(Settings.EMPTY, threadContext); - final String headerName = randomAlphaOfLengthBetween(4, 16); - final String headerValue = randomAlphaOfLengthBetween(4, 16); - final CountDownLatch latch = new CountDownLatch(2); - - final ActionListener listener = ActionListener.wrap(v -> { - assertNull(threadContext.getTransient(ClientHelper.ACTION_ORIGIN_TRANSIENT_NAME)); - assertNull(threadContext.getHeader(headerName)); - assertEquals(XPackSecurityUser.INSTANCE, securityContext.getAuthentication().getUser()); - latch.countDown(); - }, e -> fail(e.getMessage())); - - final Consumer consumer = original -> { - assertNull(threadContext.getTransient(ClientHelper.ACTION_ORIGIN_TRANSIENT_NAME)); - assertNull(threadContext.getHeader(headerName)); - assertEquals(XPackSecurityUser.INSTANCE, securityContext.getAuthentication().getUser()); - latch.countDown(); - listener.onResponse(null); - }; - threadContext.putHeader(headerName, headerValue); - threadContext.putTransient(ClientHelper.ACTION_ORIGIN_TRANSIENT_NAME, ClientHelper.SECURITY_ORIGIN); - - AuthorizationUtils.switchUserBasedOnActionOriginAndExecute(threadContext, securityContext, consumer); - - latch.await(); + assertSwitchBasedOnOriginAndExecute(ClientHelper.SECURITY_ORIGIN, XPackSecurityUser.INSTANCE); } public void testSwitchAndExecuteXpackUser() throws Exception { + String origin = randomFrom(ClientHelper.ML_ORIGIN, ClientHelper.WATCHER_ORIGIN, ClientHelper.DEPRECATION_ORIGIN, + ClientHelper.MONITORING_ORIGIN, ClientHelper.PERSISTENT_TASK_ORIGIN, ClientHelper.INDEX_LIFECYCLE_ORIGIN); + assertSwitchBasedOnOriginAndExecute(origin, XPackUser.INSTANCE); + } + + public void testSwitchWithTaskOrigin() throws Exception { + assertSwitchBasedOnOriginAndExecute(TASKS_ORIGIN, XPackUser.INSTANCE); + } + + private void assertSwitchBasedOnOriginAndExecute(String origin, User user) throws Exception { SecurityContext securityContext = new SecurityContext(Settings.EMPTY, threadContext); final String headerName = randomAlphaOfLengthBetween(4, 16); final String headerValue = randomAlphaOfLengthBetween(4, 16); final CountDownLatch latch = new CountDownLatch(2); final ActionListener listener = ActionListener.wrap(v -> { - assertNull(threadContext.getTransient(ClientHelper.ACTION_ORIGIN_TRANSIENT_NAME)); + assertNull(threadContext.getTransient(ThreadContext.ACTION_ORIGIN_TRANSIENT_NAME)); assertNull(threadContext.getHeader(headerName)); - assertEquals(XPackUser.INSTANCE, securityContext.getAuthentication().getUser()); + assertEquals(user, securityContext.getAuthentication().getUser()); latch.countDown(); }, e -> fail(e.getMessage())); final Consumer consumer = original -> { - assertNull(threadContext.getTransient(ClientHelper.ACTION_ORIGIN_TRANSIENT_NAME)); + assertNull(threadContext.getTransient(ThreadContext.ACTION_ORIGIN_TRANSIENT_NAME)); assertNull(threadContext.getHeader(headerName)); - assertEquals(XPackUser.INSTANCE, securityContext.getAuthentication().getUser()); + assertEquals(user, securityContext.getAuthentication().getUser()); latch.countDown(); listener.onResponse(null); }; threadContext.putHeader(headerName, headerValue); - threadContext.putTransient(ClientHelper.ACTION_ORIGIN_TRANSIENT_NAME, - randomFrom(ClientHelper.ML_ORIGIN, ClientHelper.WATCHER_ORIGIN, ClientHelper.DEPRECATION_ORIGIN, - ClientHelper.MONITORING_ORIGIN, ClientHelper.PERSISTENT_TASK_ORIGIN, ClientHelper.INDEX_LIFECYCLE_ORIGIN)); + try (ThreadContext.StoredContext ignored = threadContext.stashWithOrigin(origin)) { + AuthorizationUtils.switchUserBasedOnActionOriginAndExecute(threadContext, securityContext, consumer); - AuthorizationUtils.switchUserBasedOnActionOriginAndExecute(threadContext, securityContext, consumer); - - latch.await(); + latch.await(); + } } + } diff --git a/x-pack/qa/reindex-tests-with-security/build.gradle b/x-pack/qa/reindex-tests-with-security/build.gradle index ea2b7d69906..0bd51f483ea 100644 --- a/x-pack/qa/reindex-tests-with-security/build.gradle +++ b/x-pack/qa/reindex-tests-with-security/build.gradle @@ -21,6 +21,7 @@ integTestCluster { test_admin: 'superuser', powerful_user: 'superuser', minimal_user: 'minimal', + minimal_with_task_user: 'minimal_with_task', readonly_user: 'readonly', dest_only_user: 'dest_only', can_not_see_hidden_docs_user: 'can_not_see_hidden_docs', diff --git a/x-pack/qa/reindex-tests-with-security/roles.yml b/x-pack/qa/reindex-tests-with-security/roles.yml index ce45f980670..f91fefaca1b 100644 --- a/x-pack/qa/reindex-tests-with-security/roles.yml +++ b/x-pack/qa/reindex-tests-with-security/roles.yml @@ -29,6 +29,26 @@ minimal: - create_index - indices:admin/refresh +# Search and write on both source and destination indices. It should work if you could just search on the source and +# write to the destination but that isn't how security works. +minimal_with_task: + cluster: + - cluster:monitor/main + - cluster:monitor/task/get + indices: + - names: source + privileges: + - read + - write + - create_index + - indices:admin/refresh + - names: dest + privileges: + - read + - write + - create_index + - indices:admin/refresh + # Read only operations on indices readonly: cluster: diff --git a/x-pack/qa/reindex-tests-with-security/src/test/resources/rest-api-spec/test/10_reindex.yml b/x-pack/qa/reindex-tests-with-security/src/test/resources/rest-api-spec/test/10_reindex.yml index a5779ff94d0..ab9f56652e3 100644 --- a/x-pack/qa/reindex-tests-with-security/src/test/resources/rest-api-spec/test/10_reindex.yml +++ b/x-pack/qa/reindex-tests-with-security/src/test/resources/rest-api-spec/test/10_reindex.yml @@ -54,7 +54,6 @@ setup: text: test - match: { hits.total: 1 } - --- "Reindex with runas user with minimal privileges works": @@ -87,7 +86,6 @@ setup: text: test - match: { hits.total: 1 } - --- "Reindex as readonly user is forbidden": @@ -316,3 +314,50 @@ setup: index: dest dest: index: source + +--- +"Reindex wait_for_completion=false as minimal with task API": + + - do: + index: + index: source + type: foo + id: 1 + body: { "text": "test" } + - do: + indices.refresh: {} + + - do: + headers: {es-security-runas-user: minimal_with_task_user} + reindex: + refresh: true + wait_for_completion: false + body: + source: + index: source + dest: + index: dest + - set: {task: task} + + - do: + headers: {es-security-runas-user: minimal_with_task_user} + tasks.get: + wait_for_completion: true + task_id: $task + - match: {response.created: 1} + + - do: + search: + index: dest + body: + query: + match: + text: test + - match: { hits.total: 1 } + + # the minimal user doesn't have permission to read the tasks API + - do: + headers: {es-security-runas-user: minimal_user} + catch: forbidden + tasks.get: + task_id: $task