diff --git a/core/src/main/java/org/elasticsearch/client/ParentTaskAssigningClient.java b/core/src/main/java/org/elasticsearch/client/ParentTaskAssigningClient.java new file mode 100644 index 00000000000..44ba2b76e43 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/client/ParentTaskAssigningClient.java @@ -0,0 +1,68 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.client; + +import org.elasticsearch.action.Action; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.ActionRequestBuilder; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskId; + +/** + * A {@linkplain Client} that sets the parent task on all requests that it makes. Use this to conveniently implement actions that cause + * many other actions. + */ +public class ParentTaskAssigningClient extends FilterClient { + private final TaskId parentTask; + + /** + * Standard constructor. + */ + public ParentTaskAssigningClient(Client in, TaskId parentTask) { + super(in); + this.parentTask = parentTask; + } + + /** + * Convenience constructor for building the TaskId out of what is usually at hand. + */ + public ParentTaskAssigningClient(Client in, DiscoveryNode localNode, Task parentTask) { + this(in, new TaskId(localNode.getId(), parentTask.getId())); + } + + /** + * Fetch the wrapped client. Use this to make calls that don't set {@link ActionRequest#setParentTask(TaskId)}. + */ + public Client unwrap() { + return in(); + } + + @Override + protected < Request extends ActionRequest, + Response extends ActionResponse, + RequestBuilder extends ActionRequestBuilder + > void doExecute(Action action, Request request, ActionListener listener) { + request.setParentTask(parentTask); + super.doExecute(action, request, listener); + } +} diff --git a/core/src/test/java/org/elasticsearch/client/ParentTaskAssigningClientTests.java b/core/src/test/java/org/elasticsearch/client/ParentTaskAssigningClientTests.java new file mode 100644 index 00000000000..360137b8904 --- /dev/null +++ b/core/src/test/java/org/elasticsearch/client/ParentTaskAssigningClientTests.java @@ -0,0 +1,63 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.client; + +import org.elasticsearch.action.Action; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.ActionRequestBuilder; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.search.ClearScrollRequest; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.tasks.TaskId; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.client.NoOpClient; + +public class ParentTaskAssigningClientTests extends ESTestCase { + public void testSetsParentId() { + TaskId[] parentTaskId = new TaskId[] {new TaskId(randomAsciiOfLength(3), randomLong())}; + + // This mock will do nothing but verify that parentTaskId is set on all requests sent to it. + NoOpClient mock = new NoOpClient(getTestName()) { + @Override + protected < Request extends ActionRequest, + Response extends ActionResponse, + RequestBuilder extends ActionRequestBuilder + > void doExecute( Action action, Request request, + ActionListener listener) { + assertEquals(parentTaskId[0], request.getParentTask()); + super.doExecute(action, request, listener); + } + }; + try (ParentTaskAssigningClient client = new ParentTaskAssigningClient(mock, parentTaskId[0])) { + // All of these should have the parentTaskId set + client.bulk(new BulkRequest()); + client.search(new SearchRequest()); + client.clearScroll(new ClearScrollRequest()); + + // Now lets verify that unwrapped calls don't have the parentTaskId set + parentTaskId[0] = TaskId.EMPTY_TASK_ID; + client.unwrap().bulk(new BulkRequest()); + client.unwrap().search(new SearchRequest()); + client.unwrap().clearScroll(new ClearScrollRequest()); + } + } +} diff --git a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractAsyncBulkByScrollAction.java b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractAsyncBulkByScrollAction.java index d904eb503fd..f403178037b 100644 --- a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractAsyncBulkByScrollAction.java +++ b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractAsyncBulkByScrollAction.java @@ -35,7 +35,7 @@ import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchScrollRequest; import org.elasticsearch.action.search.ShardSearchFailure; -import org.elasticsearch.client.Client; +import org.elasticsearch.client.ParentTaskAssigningClient; import org.elasticsearch.common.Strings; import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.unit.ByteSizeValue; @@ -87,14 +87,14 @@ public abstract class AbstractAsyncBulkByScrollAction destinationIndices = Collections.newSetFromMap(new ConcurrentHashMap<>()); private final ESLogger logger; - private final Client client; + private final ParentTaskAssigningClient client; private final ThreadPool threadPool; private final SearchRequest firstSearchRequest; private final ActionListener listener; private final Retry retry; - public AbstractAsyncBulkByScrollAction(BulkByScrollTask task, ESLogger logger, Client client, ThreadPool threadPool, - Request mainRequest, SearchRequest firstSearchRequest, ActionListener listener) { + public AbstractAsyncBulkByScrollAction(BulkByScrollTask task, ESLogger logger, ParentTaskAssigningClient client, + ThreadPool threadPool, Request mainRequest, SearchRequest firstSearchRequest, ActionListener listener) { this.task = task; this.logger = logger; this.client = client; @@ -409,7 +409,11 @@ public abstract class AbstractAsyncBulkByScrollAction() { + /* + * Unwrap the client so we don't set our task as the parent. If we *did* set our ID then the clear scroll would be cancelled as + * if this task is cancelled. But we want to clear the scroll regardless of whether or not the main request was cancelled. + */ + client.unwrap().clearScroll(clearScrollRequest, new ActionListener() { @Override public void onResponse(ClearScrollResponse response) { logger.debug("Freed [{}] contexts", response.getNumFreed()); diff --git a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractAsyncBulkIndexByScrollAction.java b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractAsyncBulkIndexByScrollAction.java index 345b92df67b..1932f2ef5be 100644 --- a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractAsyncBulkIndexByScrollAction.java +++ b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractAsyncBulkIndexByScrollAction.java @@ -23,9 +23,8 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.search.SearchRequest; -import org.elasticsearch.client.Client; +import org.elasticsearch.client.ParentTaskAssigningClient; import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.index.mapper.internal.IdFieldMapper; import org.elasticsearch.index.mapper.internal.IndexFieldMapper; @@ -62,8 +61,8 @@ public abstract class AbstractAsyncBulkIndexByScrollAction< private final ScriptService scriptService; private final CompiledScript script; - public AbstractAsyncBulkIndexByScrollAction(BulkByScrollTask task, ESLogger logger, ScriptService scriptService, - ClusterState state, Client client, ThreadPool threadPool, Request mainRequest, SearchRequest firstSearchRequest, + public AbstractAsyncBulkIndexByScrollAction(BulkByScrollTask task, ESLogger logger, ScriptService scriptService, ClusterState state, + ParentTaskAssigningClient client, ThreadPool threadPool, Request mainRequest, SearchRequest firstSearchRequest, ActionListener listener) { super(task, logger, client, threadPool, mainRequest, firstSearchRequest, listener); this.scriptService = scriptService; diff --git a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/TransportReindexAction.java b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/TransportReindexAction.java index ccf72414c8b..7054fdbc4e6 100644 --- a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/TransportReindexAction.java +++ b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/TransportReindexAction.java @@ -29,6 +29,7 @@ import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.AutoCreateIndex; import org.elasticsearch.action.support.HandledTransportAction; import org.elasticsearch.client.Client; +import org.elasticsearch.client.ParentTaskAssigningClient; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.service.ClusterService; @@ -73,8 +74,8 @@ public class TransportReindexAction extends HandledTransportAction listener) { ClusterState state = clusterService.state(); validateAgainstAliases(request.getSearchRequest(), request.getDestination(), indexNameExpressionResolver, autoCreateIndex, state); - new AsyncIndexBySearchAction((BulkByScrollTask) task, logger, scriptService, client, state, threadPool, request, listener) - .start(); + ParentTaskAssigningClient client = new ParentTaskAssigningClient(this.client, clusterService.localNode(), task); + new AsyncIndexBySearchAction((BulkByScrollTask) task, logger, scriptService, client, state, threadPool, request, listener).start(); } @Override @@ -116,8 +117,9 @@ public class TransportReindexAction extends HandledTransportAction { - public AsyncIndexBySearchAction(BulkByScrollTask task, ESLogger logger, ScriptService scriptService, Client client, - ClusterState state, ThreadPool threadPool, ReindexRequest request, ActionListener listener) { + public AsyncIndexBySearchAction(BulkByScrollTask task, ESLogger logger, ScriptService scriptService, + ParentTaskAssigningClient client, ClusterState state, ThreadPool threadPool, ReindexRequest request, + ActionListener listener) { super(task, logger, scriptService, state, client, threadPool, request, request.getSearchRequest(), listener); } diff --git a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/TransportUpdateByQueryAction.java b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/TransportUpdateByQueryAction.java index 34440e8d749..38d3e3d55c3 100644 --- a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/TransportUpdateByQueryAction.java +++ b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/TransportUpdateByQueryAction.java @@ -26,6 +26,7 @@ import org.elasticsearch.action.search.ShardSearchFailure; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.HandledTransportAction; import org.elasticsearch.client.Client; +import org.elasticsearch.client.ParentTaskAssigningClient; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.service.ClusterService; @@ -66,9 +67,9 @@ public class TransportUpdateByQueryAction extends HandledTransportAction listener) { + protected void doExecute(Task task, UpdateByQueryRequest request, ActionListener listener) { ClusterState state = clusterService.state(); + ParentTaskAssigningClient client = new ParentTaskAssigningClient(this.client, clusterService.localNode(), task); new AsyncIndexBySearchAction((BulkByScrollTask) task, logger, scriptService, client, threadPool, state, request, listener) .start(); } @@ -82,8 +83,8 @@ public class TransportUpdateByQueryAction extends HandledTransportAction { - public AsyncIndexBySearchAction(BulkByScrollTask task, ESLogger logger, ScriptService scriptService, Client client, - ThreadPool threadPool, ClusterState clusterState, UpdateByQueryRequest request, + public AsyncIndexBySearchAction(BulkByScrollTask task, ESLogger logger, ScriptService scriptService, + ParentTaskAssigningClient client, ThreadPool threadPool, ClusterState clusterState, UpdateByQueryRequest request, ActionListener listener) { super(task, logger, scriptService, clusterState, client, threadPool, request, request.getSearchRequest(), listener); } diff --git a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/AsyncBulkByScrollActionTests.java b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/AsyncBulkByScrollActionTests.java index 9a581314b3c..048f95b1075 100644 --- a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/AsyncBulkByScrollActionTests.java +++ b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/AsyncBulkByScrollActionTests.java @@ -19,6 +19,7 @@ package org.elasticsearch.index.reindex; +import org.elasticsearch.Version; import org.elasticsearch.action.Action; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRequest; @@ -47,8 +48,11 @@ import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.action.update.UpdateResponse; import org.elasticsearch.client.Client; import org.elasticsearch.client.FilterClient; +import org.elasticsearch.client.ParentTaskAssigningClient; +import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.text.Text; +import org.elasticsearch.common.transport.LocalTransportAddress; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; @@ -60,6 +64,7 @@ import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.internal.InternalSearchHit; import org.elasticsearch.search.internal.InternalSearchHits; import org.elasticsearch.search.internal.InternalSearchResponse; +import org.elasticsearch.tasks.TaskId; import org.elasticsearch.tasks.TaskManager; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.client.NoOpClient; @@ -83,6 +88,7 @@ import java.util.function.Consumer; import static java.util.Collections.emptyList; import static java.util.Collections.emptyMap; +import static java.util.Collections.emptySet; import static java.util.Collections.singleton; import static org.apache.lucene.util.TestUtil.randomSimpleString; import static org.elasticsearch.action.bulk.BackoffPolicy.constantBackoff; @@ -102,30 +108,34 @@ import static org.hamcrest.Matchers.instanceOf; public class AsyncBulkByScrollActionTests extends ESTestCase { private MyMockClient client; private ThreadPool threadPool; - private DummyAbstractBulkByScrollRequest mainRequest; + private DummyAbstractBulkByScrollRequest testRequest; private SearchRequest firstSearchRequest; private PlainActionFuture listener; private String scrollId; private TaskManager taskManager; - private BulkByScrollTask task; + private BulkByScrollTask testTask; private Map expectedHeaders = new HashMap<>(); + private DiscoveryNode localNode; + private TaskId taskId; @Before public void setupForTest() { client = new MyMockClient(new NoOpClient(getTestName())); threadPool = new ThreadPool(getTestName()); - mainRequest = new DummyAbstractBulkByScrollRequest(); + testRequest = new DummyAbstractBulkByScrollRequest(); firstSearchRequest = new SearchRequest().scroll(timeValueSeconds(10)); listener = new PlainActionFuture<>(); scrollId = null; taskManager = new TaskManager(Settings.EMPTY); - task = (BulkByScrollTask) taskManager.register("don'tcare", "hereeither", mainRequest); + testTask = (BulkByScrollTask) taskManager.register("don'tcare", "hereeither", testRequest); // Fill the context with something random so we can make sure we inherited it appropriately. expectedHeaders.clear(); expectedHeaders.put(randomSimpleString(random()), randomSimpleString(random())); threadPool.getThreadContext().newStoredContext(); threadPool.getThreadContext().putHeader(expectedHeaders); + localNode = new DiscoveryNode("thenode", new LocalTransportAddress("dead.end:666"), emptyMap(), emptySet(), Version.CURRENT); + taskId = new TaskId(localNode.getId(), testTask.getId()); } @After @@ -146,14 +156,14 @@ public class AsyncBulkByScrollActionTests extends ESTestCase { public void testScrollResponseSetsTotal() { // Default is 0, meaning unstarted - assertEquals(0, task.getStatus().getTotal()); + assertEquals(0, testTask.getStatus().getTotal()); long total = randomIntBetween(0, Integer.MAX_VALUE); InternalSearchHits hits = new InternalSearchHits(null, total, 0); InternalSearchResponse searchResponse = new InternalSearchResponse(hits, null, null, null, false, false); new DummyAbstractAsyncBulkByScrollAction().onScrollResponse(timeValueSeconds(0), new SearchResponse(searchResponse, scrollId(), 5, 4, randomLong(), null)); - assertEquals(total, task.getStatus().getTotal()); + assertEquals(total, testTask.getStatus().getTotal()); } /** @@ -172,7 +182,7 @@ public class AsyncBulkByScrollActionTests extends ESTestCase { // Use assert busy because the update happens on another thread final int expectedBatches = batches; - assertBusy(() -> assertEquals(expectedBatches, task.getStatus().getBatches())); + assertBusy(() -> assertEquals(expectedBatches, testTask.getStatus().getBatches())); /* * While we're here we can check that getting a scroll response sets the last scroll start time which makes sure the wait time @@ -189,7 +199,7 @@ public class AsyncBulkByScrollActionTests extends ESTestCase { } public void testBulkResponseSetsLotsOfStatus() { - mainRequest.setAbortOnVersionConflict(false); + testRequest.setAbortOnVersionConflict(false); int maxBatches = randomIntBetween(0, 100); long versionConflicts = 0; long created = 0; @@ -230,11 +240,11 @@ public class AsyncBulkByScrollActionTests extends ESTestCase { responses[i] = new BulkItemResponse(i, opType, new IndexResponse(shardId, "type", "id" + i, randomInt(), createdResponse)); } new DummyAbstractAsyncBulkByScrollAction().onBulkResponse(new BulkResponse(responses, 0)); - assertEquals(versionConflicts, task.getStatus().getVersionConflicts()); - assertEquals(updated, task.getStatus().getUpdated()); - assertEquals(created, task.getStatus().getCreated()); - assertEquals(deleted, task.getStatus().getDeleted()); - assertEquals(versionConflicts, task.getStatus().getVersionConflicts()); + assertEquals(versionConflicts, testTask.getStatus().getVersionConflicts()); + assertEquals(updated, testTask.getStatus().getUpdated()); + assertEquals(created, testTask.getStatus().getCreated()); + assertEquals(deleted, testTask.getStatus().getDeleted()); + assertEquals(versionConflicts, testTask.getStatus().getVersionConflicts()); } } @@ -265,7 +275,7 @@ public class AsyncBulkByScrollActionTests extends ESTestCase { assertThat(client.scrollsCleared, contains(scrollId)); // When the task is rejected we don't increment the throttled timer - assertEquals(timeValueMillis(0), task.getStatus().getThrottled()); + assertEquals(timeValueMillis(0), testTask.getStatus().getThrottled()); } /** @@ -345,7 +355,7 @@ public class AsyncBulkByScrollActionTests extends ESTestCase { long retryAttempts = 0; for (int i = 0; i < bulksToTry; i++) { retryAttempts += retryTestCase(false); - assertEquals(retryAttempts, task.getStatus().getRetries()); + assertEquals(retryAttempts, testTask.getStatus().getRetries()); } } @@ -354,16 +364,16 @@ public class AsyncBulkByScrollActionTests extends ESTestCase { */ public void testBulkRejectionsRetryAndFailAnyway() throws Exception { long retryAttempts = retryTestCase(true); - assertEquals(retryAttempts, task.getStatus().getRetries()); + assertEquals(retryAttempts, testTask.getStatus().getRetries()); } public void testPerfectlyThrottledBatchTime() { DummyAbstractAsyncBulkByScrollAction action = new DummyAbstractAsyncBulkByScrollAction(); - mainRequest.setRequestsPerSecond(0); + testRequest.setRequestsPerSecond(0); assertThat((double) action.perfectlyThrottledBatchTime(randomInt()), closeTo(0f, 0f)); int total = between(0, 1000000); - task.rethrottle(1); + testTask.rethrottle(1); assertThat((double) action.perfectlyThrottledBatchTime(total), closeTo(TimeUnit.SECONDS.toNanos(total), TimeUnit.SECONDS.toNanos(1))); } @@ -389,7 +399,7 @@ public class AsyncBulkByScrollActionTests extends ESTestCase { action.setScroll(scrollId()); // We'd like to get about 1 request a second - task.rethrottle(1f); + testTask.rethrottle(1f); // Make the last scroll look nearly instant action.setLastBatchStartTime(System.nanoTime()); // The last batch had 100 documents @@ -409,11 +419,11 @@ public class AsyncBulkByScrollActionTests extends ESTestCase { // Running the command ought to increment the delay counter on the task. capturedCommand.get().run(); - assertEquals(capturedDelay.get(), task.getStatus().getThrottled()); + assertEquals(capturedDelay.get(), testTask.getStatus().getThrottled()); } private long retryTestCase(boolean failWithRejection) throws Exception { - int totalFailures = randomIntBetween(1, mainRequest.getMaxRetries()); + int totalFailures = randomIntBetween(1, testRequest.getMaxRetries()); int size = randomIntBetween(1, 100); int retryAttempts = totalFailures - (failWithRejection ? 1 : 0); @@ -487,7 +497,7 @@ public class AsyncBulkByScrollActionTests extends ESTestCase { private void refreshTestCase(Boolean refresh, boolean addDestinationIndexes, boolean shouldRefresh) { if (refresh != null) { - mainRequest.setRefresh(refresh); + testRequest.setRefresh(refresh); } DummyAbstractAsyncBulkByScrollAction action = new DummyAbstractAsyncBulkByScrollAction(); if (addDestinationIndexes) { @@ -526,7 +536,7 @@ public class AsyncBulkByScrollActionTests extends ESTestCase { public void testCancelBeforeStartNormalTermination() throws Exception { // Refresh or not doesn't matter - we don't try to refresh. - mainRequest.setRefresh(usually()); + testRequest.setRefresh(usually()); cancelTaskCase((DummyAbstractAsyncBulkByScrollAction action) -> action.startNormalTermination(emptyList(), emptyList(), false)); assertNull("No refresh was attempted", client.lastRefreshRequest.get()); } @@ -554,7 +564,7 @@ public class AsyncBulkByScrollActionTests extends ESTestCase { * that good stuff. */ if (delay.nanos() > 0) { - generic().execute(() -> taskManager.cancel(task, reason, (Set s) -> {})); + generic().execute(() -> taskManager.cancel(testTask, reason, (Set s) -> {})); } return super.schedule(delay, name, command); } @@ -587,7 +597,7 @@ public class AsyncBulkByScrollActionTests extends ESTestCase { action.setScroll(scrollId()); } String reason = randomSimpleString(random()); - taskManager.cancel(task, reason, (Set s) -> {}); + taskManager.cancel(testTask, reason, (Set s) -> {}); testMe.accept(action); assertEquals(reason, listener.get().getReasonCancelled()); if (previousScrollSet) { @@ -599,8 +609,8 @@ public class AsyncBulkByScrollActionTests extends ESTestCase { private class DummyAbstractAsyncBulkByScrollAction extends AbstractAsyncBulkByScrollAction { public DummyAbstractAsyncBulkByScrollAction() { - super(AsyncBulkByScrollActionTests.this.task, logger, client, threadPool, - AsyncBulkByScrollActionTests.this.mainRequest, firstSearchRequest, listener); + super(testTask, logger, new ParentTaskAssigningClient(client, localNode, testTask), threadPool, testRequest, firstSearchRequest, + listener); } @Override @@ -641,6 +651,11 @@ public class AsyncBulkByScrollActionTests extends ESTestCase { RequestBuilder extends ActionRequestBuilder> void doExecute( Action action, Request request, ActionListener listener) { lastHeaders.set(threadPool.getThreadContext().getHeaders()); + if (request instanceof ClearScrollRequest) { + assertEquals(TaskId.EMPTY_TASK_ID, request.getParentTask()); + } else { + assertEquals(taskId, request.getParentTask()); + } if (request instanceof RefreshRequest) { lastRefreshRequest.set((RefreshRequest) request); listener.onResponse(null); diff --git a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/ReindexMetadataTests.java b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/ReindexMetadataTests.java index 0e0bae4c1f6..186dc76804f 100644 --- a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/ReindexMetadataTests.java +++ b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/ReindexMetadataTests.java @@ -67,8 +67,7 @@ public class ReindexMetadataTests extends AbstractAsyncBulkIndexbyScrollActionMe @Override protected TransportReindexAction.AsyncIndexBySearchAction action() { - return new TransportReindexAction.AsyncIndexBySearchAction(task, logger, null, null, null, threadPool - , request(), listener()); + return new TransportReindexAction.AsyncIndexBySearchAction(task, logger, null, null, null, threadPool, request(), listener()); } @Override diff --git a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/ReindexScriptTests.java b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/ReindexScriptTests.java index 20d57c27266..a21a16deb0f 100644 --- a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/ReindexScriptTests.java +++ b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/ReindexScriptTests.java @@ -134,7 +134,6 @@ public class ReindexScriptTests extends AbstractAsyncBulkIndexByScrollActionScri @Override protected AbstractAsyncBulkIndexByScrollAction action() { - return new TransportReindexAction.AsyncIndexBySearchAction(task, logger, null, null, null, - threadPool, request(), listener()); + return new TransportReindexAction.AsyncIndexBySearchAction(task, logger, null, null, null, threadPool, request(), listener()); } } diff --git a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/UpdateByQueryWithScriptTests.java b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/UpdateByQueryWithScriptTests.java index 735d3966889..7a75759bfda 100644 --- a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/UpdateByQueryWithScriptTests.java +++ b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/UpdateByQueryWithScriptTests.java @@ -50,7 +50,6 @@ public class UpdateByQueryWithScriptTests @Override protected AbstractAsyncBulkIndexByScrollAction action() { - return new TransportUpdateByQueryAction.AsyncIndexBySearchAction(task, logger, null, null, threadPool, - null, request(), listener()); + return new TransportUpdateByQueryAction.AsyncIndexBySearchAction(task, logger, null, null, threadPool, null, request(), listener()); } }